文章目录
- 前言
- 开始准备
- 运行配置
- 开始运行
- JobContainer
- 1.进入init
- prepare
- schedule
- post阶段
- this.invokeHooks();
- 总结
前言
在用dataX一段时间后,还是想去了解一下它的具体原理,读源码还是得做一些笔记,可能我写的不是很好,如果有写错的话,还请大家多多指教。
开始准备
首先在阅读源码之前,需要先把DataX的源码clone下来,然后再用maven来进行打包,我这边的话在准备时也遇到一些jar找不到的问题,如果有需要的jar包的话,可以在下面提出来,我这边就放一个stream的jar包就好。
git clone https://github.com/alibaba/DataX.git然后再IDEA里面打开,然后,在datax-all使用maven进行打包,选择clean和package两个选项,进行打包

然后在core部分就可以看到打包出来了,我们可以将这个文件拷出来,然后再新建一个目录plugin文件,再建两个子目录,然后把相应需要的writer和reader的jar添加到相应的文件夹下。

完成上面后,我们需要修改一些配置文件,首先需要修改core.json,位于core打包出来的conf文件夹里面,byte不能设为-1
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CtJGVQhQ-1661089484962)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-30-17-49-23-image.png)] dataX源码学习_json_03](https://file.cfanz.cn/uploads/png/2022/08/27/16/0b87760617.png)
添加需要的jar包,我把stream的jar放在下面的链接中了。
运行配置
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1KuNWgDJ-1661089484964)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-30-17-51-34-image.png)] dataX源码学习_json_04](https://file.cfanz.cn/uploads/png/2022/08/27/16/Kc33f3b657.png)
需要的配置,将你的配置文件位置设置出来和你刚刚打包出来的dataX的位置,我们的是放在E盘
-Ddatax.home=E:/datax
-mode standalone -jobid -1 -job E:/datax/job/job.json
然后点击运行就可以查看到结果了。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E5a76r5Q-1661089484964)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-17-28-image.png)] dataX源码学习_java_05](https://file.cfanz.cn/uploads/png/2022/08/27/16/6e75VXA81H.png)
开始运行
首先我们从Engine主类,位于datax-core包下面,java包下面,我们可以找到入口Engine类
我们先从args获取到我们输入 的参数。
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
exitCode = 1;
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}
System.exit(exitCode);
}
System.exit(exitCode);
}
通过ConfigParser来进行job.json的解析,先从参数中获取到工作文件的路径,然后再通过FileUtils.readFileToString部分读入job.json配置文件
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode") ;
// jobPath是job.json的配置路径
Configuration configuration = ConfigParser.parse(jobPath);
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-38IPSqG2-1661089484965)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-29-04-image.png)] dataX源码学习_配置文件_06](https://file.cfanz.cn/uploads/png/2022/08/27/16/caS2UCdL3D.png)
然后在ConfigParser.java中将配置文件读入进来,获取到我们从json中提交的content部分和settings
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-24VqAnFY-1661089484966)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-41-54-image.png)] dataX源码学习_json_07](https://file.cfanz.cn/uploads/png/2022/08/27/16/Yf96dbQVA2.png)
这里它是怎么获取这些配置呢,package com.alibaba.datax.core.util.container.CoreConstant中会中会有一些全局的变量配置,我列举一些出来
// reader 的名称
public static final String DATAX_JOB_CONTENT_READER_NAME = "job.content[0].reader.name";
// reader的参数
public static final String DATAX_JOB_CONTENT_READER_PARAMETER = "job.content[0].reader.parameter";
// writer的名称
public static final String DATAX_JOB_CONTENT_WRITER_NAME = "job.content[0].writer.name";
// writer的参数
public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER = "job.content[0].writer.parameter";
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ntr8D1XV-1661089484966)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-48-53-image.png)] dataX源码学习_配置文件_08](https://file.cfanz.cn/uploads/png/2022/08/27/16/60c110516S.png)
获取到需要的reader和writer参数,通过官方的注释我们也可以看出这里是为了获取对应插件的名称,我们提交的官方的一个demo,所以这里获取到的也是我们定义的streamreader和streamwriter
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rLpXETfB-1661089484967)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-50-14-image.png)] dataX源码学习_配置文件_09](https://file.cfanz.cn/uploads/png/2022/08/27/16/LHDY7Z3N36.png)
获取到配置文件后,下面就用doValidate来校验这个这些配置文件
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VbRliVHm-1661089484967)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-58-03-image.png)] dataX源码学习_jar包_10](https://file.cfanz.cn/uploads/png/2022/08/27/16/U0cNYUXW52.png)
下面为Validate的内容,这里看到都是空的,这个是为什么,因为我们的是stream流,限制不多,如果是在前面是涉及采集其他数据库的话,可能这里就需要校验一些配置是否合理了。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sUr7iaid-1661089484968)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-17-59-05-image.png)] dataX源码学习_json_11](https://file.cfanz.cn/uploads/png/2022/08/27/16/Va752567b1.png)
校验好配置文件后,调用start方法,开始进行容器执行,首先先判断是不是一个job文件,然后如果是job的话,就启动一个JobContainer,否则就启动一个TaskGroupContainer,在获取到需要的配置后,就调用一个start方法,执行这个容器。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DVexE3JG-1661089484969)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-06-09-image.png)] dataX源码学习_配置文件_12](https://file.cfanz.cn/uploads/png/2022/08/27/16/a3FLb31CfD.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D7a8XgHk-1661089484969)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-09-15-image.png)] dataX源码学习_java_13](https://file.cfanz.cn/uploads/png/2022/08/27/16/608Ef4cUEM.png)
从下面这张图可以看出,我们提交的配置文件信息都保存到这个configuration这个配置对象中了。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-611IUhc5-1661089484970)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-12-22-image.png)] dataX源码学习_jar包_14](https://file.cfanz.cn/uploads/png/2022/08/27/16/6Cc43566eS.png)
JobContainer
JobContainer主要有以下的过程
- doPrepare,首先需要先处理前操作,
- init,初始化Reader和Writer,先Reader再Writer
- prepare
- split
- scheduler
1.进入init
初始化Reader和Writer,先是Reader,然后再是Writer, 通过获取到Reader,然后可以
然后解析content里面reader的模块,
调用initReader读取reader配置文件
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eYjw9CZ5-1661089484971)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-20-16-image.png)] dataX源码学习_json_15](https://file.cfanz.cn/uploads/png/2022/08/27/16/04A2XMMa51.png)
首先处理好列,可以获取到我们设定列的信息
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mDxNP4If-1661089484971)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-27-33-image.png)] dataX源码学习_json_16](https://file.cfanz.cn/uploads/png/2022/08/27/16/HK5PAQ4Xe7.png)
writer也是同样的道理
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mrt2m4sf-1661089484972)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-18-32-15-image.png)] dataX源码学习_配置文件_17](https://file.cfanz.cn/uploads/png/2022/08/27/16/6Z5K166T61.png)
prepare
this.prepareJobReader();
this.prepareJobWriter();
prepare阶段需要对数据进行spilt
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
计算需要的任务的数量,reader和writer的数量应该是一致的
然后再得出Reader和Writer的分组配置
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Jn2ndSa-1661089484973)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-36-12-image.png)] dataX源码学习_配置文件_18](https://file.cfanz.cn/uploads/png/2022/08/27/16/E5Y1516e0I.png)
分割后我们可以看到,任务的reader和writer的信息和对应的TaskId都被整合到一起了。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Xqr2ky3f-1661089484974)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-39-59-image.png)] dataX源码学习_配置文件_19](https://file.cfanz.cn/uploads/png/2022/08/27/16/68MV990cN4.png)
schedule
这里CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL,为我们定义的通道的值,如果没有设置就默认设置为5,需要的通道数量就通过一个task的数量和需要通道的数量取一个最小值
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vq8jBQc9-1661089484974)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-43-46-image.png)] dataX源码学习_java_20](https://file.cfanz.cn/uploads/png/2022/08/27/16/ab0T40Y11c.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K5vCBqqT-1661089484975)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-48-36-image.png)] dataX源码学习_json_21](https://file.cfanz.cn/uploads/png/2022/08/27/16/34Sf6K1SWA.png)
post阶段
这个阶段会触发Reader和Writer的post阶段
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wFoMxroj-1661089484975)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-52-55-image.png)] dataX源码学习_java_22](https://file.cfanz.cn/uploads/png/2022/08/27/16/656X140L98.png)
this.invokeHooks();
打印运行情况
包括Cpu和GC的统计,包括错误记录的条数呀,速度等打印出来。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7USqQwDG-1661089484976)(C:\Users\大勇\AppData\Roaming\marktext\images\2022-07-27-19-57-02-image.png)] dataX源码学习_json_23](https://file.cfanz.cn/uploads/png/2022/08/27/16/b4BNUC06cf.png)
总结
做一些笔记吧,这样可以可以让自己的印象更加深刻一下,这里并没有详细介绍调度执行的部分,写的比较糙,但是如果一直跟着debug下来还是可以对dataX的源码有一些更加好的理解的。










