柚子快報(bào)激活碼778899分享:數(shù)據(jù)庫(kù) 再學(xué)DataX
柚子快報(bào)激活碼778899分享:數(shù)據(jù)庫(kù) 再學(xué)DataX
一、DataX簡(jiǎn)介
DataX官網(wǎng)文檔:https://github.com/alibaba/DataX/blob/master/introduction.md
DataX 是一個(gè)異構(gòu)數(shù)據(jù)源離線同步工具,致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫(kù)(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能。
1.1、DataX 3.0框架設(shè)計(jì)
DataX本身作為離線數(shù)據(jù)同步框架,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件,納入到整個(gè)同步框架中。
Reader:Reader為數(shù)據(jù)采集模塊,負(fù)責(zé)采集數(shù)據(jù)源的數(shù)據(jù),將數(shù)據(jù)發(fā)送給Framework。 Writer: Writer為數(shù)據(jù)寫入模塊,負(fù)責(zé)不斷向Framework取數(shù)據(jù),并將數(shù)據(jù)寫入到目的端。 Framework:Framework用于連接reader和writer,作為兩者的數(shù)據(jù)傳輸通道,并處理緩沖,流控,并發(fā),數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問(wèn)題。
1.2、DataX3.0核心架構(gòu)
DataX 3.0 開(kāi)源版本支持單機(jī)多線程模式完成同步作業(yè)運(yùn)行
1、Job
DataX完成單個(gè)數(shù)據(jù)同步的作業(yè)。一個(gè)Job對(duì)應(yīng)一個(gè)進(jìn)程。Job模塊負(fù)責(zé)task切分,TaskGroup管理等。
2、Task
Task是DataX的最小單元,每個(gè)task負(fù)責(zé)一部分?jǐn)?shù)據(jù)同步工作。
3、TaskGroup
Job在切分完多個(gè)Task后,會(huì)調(diào)用DataX的scheduler模塊,根據(jù)配置的并發(fā)量,將拆分成的多個(gè)Task分配到不同的TaskGroup中,每個(gè)TaskGroup負(fù)責(zé)以一定并發(fā)運(yùn)行分配給他的全部Task,每個(gè)TaskGroup默認(rèn)的并發(fā)量是5.
4、Task的執(zhí)行流程
每個(gè)Task由TaskGroup啟動(dòng),每個(gè)Task對(duì)固定啟動(dòng)Reader—>Channel—>Writer的線程來(lái)完成數(shù)據(jù)同步工作。
DataX Job運(yùn)行起來(lái)后,由Job監(jiān)控并等待每個(gè)TaskGroup的task執(zhí)行完成,等所有TaskGroup任務(wù)執(zhí)行完成后,Job成功退出。否則,異常退出。
5、DataX調(diào)度流程
舉例來(lái)說(shuō),用戶提交了一個(gè)DataX作業(yè),并且配置了20個(gè)并發(fā),目的是將一個(gè)100張分表的mysql數(shù)據(jù)同步到odps里面。
DataX的調(diào)度決策思路是:
1)DataX Job根據(jù)分庫(kù)分表切分成了100個(gè)Task。
2)由于配置了20個(gè)并發(fā),每個(gè)TaskGroup默認(rèn)并發(fā)度是5,所以需要4個(gè)TaskGroup
3)由4個(gè)TaskGroup平均切分100個(gè)Task,每個(gè)TaskGroup被分到了25個(gè)Task,共啟動(dòng)5個(gè)并發(fā)。
1.3、DataX優(yōu)勢(shì)
1、可靠的監(jiān)控
2、數(shù)據(jù)轉(zhuǎn)換功能豐富
3、精準(zhǔn)的流控
4、同步性能好
5、容錯(cuò)機(jī)制健壯
6、使用體驗(yàn)好
二、DataX源碼解讀
2.1、入口類:Engine
入口類為com.alibaba.datax.core.Engine.java main函數(shù)
1、解析args入?yún)ⅲ?/p>
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
針對(duì)命令行參數(shù)采用了org.apache.commons的BasicParser解析,針對(duì)任務(wù)的配置文件則通過(guò)其本身的ConfigParser進(jìn)行解析(可以支持本地和網(wǎng)絡(luò)文件)。
2、啟動(dòng)Engine 參數(shù)啟動(dòng)完畢后,調(diào)用Engine.start方法啟動(dòng)
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
然后選擇是Job模式還是TaskGroup模式:
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
實(shí)際上基本都是Job模式,后續(xù)我們主要以JobContainer為切入點(diǎn),另一個(gè)則為TaskGroupContainer。兩者均繼承自AbstractContainer基類,并通過(guò)調(diào)用他們的start方法進(jìn)行啟動(dòng)。
2.2、JobContainer
JobContainer.start方法是入口
preHander
Job前置操作,即初始化preHandler插件并執(zhí)行其preHandler
1)init 初始化reader和writer,實(shí)際方法中根據(jù)讀寫插件各自執(zhí)行了對(duì)應(yīng)的初始化方法:
//必須先Reader ,后Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
2)prepare 全局準(zhǔn)備工作,比如odpswriter清空目標(biāo)表。由于讀寫插件的特殊性質(zhì),其方法內(nèi)部主要也是執(zhí)行了各類型插件的方法來(lái)實(shí)現(xiàn)準(zhǔn)備工作
this.prepareJobReader();
this.prepareJobWriter();
3)split 拆分Task,參數(shù)adviceNumber為建議的拆分?jǐn)?shù)。除此之外我們還可以通過(guò)字節(jié)和事務(wù)的限速來(lái)進(jìn)行控制,從而決定Channel的數(shù)量。
job.setting.speed.byte:總BPS限速,如果存在值則單個(gè)Channel的BPS不能為空,通過(guò)總限速除以單個(gè)Channel限速得出Channel的需求數(shù)量;core.transport.channel.speed.byte:?jiǎn)蝹€(gè)Channel的BPS限速;job.setting.speed.record:總TPS限速,如果存在則單個(gè)Channel的TPS不能為空,通過(guò)總限速除以單個(gè)Channel限速得出Channel的需求數(shù)量;core.transport.channel.speed.record:?jiǎn)蝹€(gè)Channel的TPS限速;
4)schedule schedule首先完成的工作是把上一步reader和writer split的結(jié)果整合到具體taskGroupContainer中, 同時(shí)不同的執(zhí)行模式調(diào)用不同的調(diào)度策略,將所有任務(wù)調(diào)度起來(lái)
由于實(shí)際任務(wù)是由TaskGroupContainer執(zhí)行,為此我們還需要?jiǎng)澐謱?duì)應(yīng)TaskGroup需要運(yùn)行的Task,該參數(shù)通過(guò)core.container.taskGroup.channel進(jìn)行配置,默認(rèn)為5。決定每個(gè)Group運(yùn)行那些Task的則由以下方法進(jìn)行決定,將直接返回對(duì)應(yīng)任務(wù)組的配置參數(shù)。
/**
* 通過(guò)獲取配置信息得到每個(gè)taskGroup需要運(yùn)行哪些tasks任務(wù)
*/
List
this.needChannelNumber, channelsPerTaskGroup);
完成任務(wù)分配后我們就需要根據(jù)運(yùn)行模式?jīng)Q定調(diào)度器,通過(guò)這里的源碼可以明顯看出其DataX 3.0是經(jīng)過(guò)了閹割,僅保留了單機(jī)運(yùn)行模式。
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
后續(xù)我們僅能描述單機(jī)模式下關(guān)于任務(wù)調(diào)度的工作原理: Step1:調(diào)度器初始化的核心方法initStandaloneScheduler,其方法主要是初始化了StandAloneJobContainerCommunicator類用于通信(其中collect由ProcessInnerCollector提供,reporter由ProcessInnerReporter提供),StandAloneScheduler則為實(shí)際調(diào)度器。
最終執(zhí)行:
scheduler.schedule(taskGroupConfigs);
在AbstractScheduler的schedule中通過(guò)StandAloneJobContainerCommunicator類調(diào)用了其collect方法:
public Communication collect() {
return super.getCollector().collectFromTaskGroup();
}
該類為ProcessInnerCollector類,其對(duì)應(yīng)的方法依然是LocalTGCommunicationManager靜態(tài)類其中一個(gè)靜態(tài)方法。
public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication();
}
其內(nèi)部也是將之前每個(gè)TaskGroup所創(chuàng)建的Communication維護(hù)了一個(gè)靜態(tài)字典并在需要的時(shí)候進(jìn)行合并。
public static Communication getJobCommunication() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskGroupCommunication :
taskGroupCommunicationMap.values()) {
communication.mergeFrom(taskGroupCommunication);
}
return communication;
}
柚子快報(bào)激活碼778899分享:數(shù)據(jù)庫(kù) 再學(xué)DataX
精彩文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。