一、DataX3.0概览
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能,如图1-1所示。
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
二、DataX3.0的使用
要求
Java版本要求:jdk1.8及以上小例子
表2-1 测试表结构
字段名 | 类型 | 备注 |
name | varchar | |
age | int | |
age_true | int |
并向其中插入40条数据,如表2-2所示。
表2-2 测试表数据
name | age | age_true |
---|---|---|
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
... | ... |
构建任务Json
DataX工具是用json文件作为配置文件的,根据官方提供文档我们构建Json文件如下所示。
{ "job": { "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "123456", "column": ["name", "age"], "where": "age<100", "connection": [{ "table": ["person"], "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"] }] } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "123456", "column": ["name", "age_true"], "connection": [{ "table": ["person"], "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8" }] } } }], "setting": { "speed": { "channel": 1, "byte": 104857600 }, "errorLimit": { "record": 10, "percentage": 0.05 } } }它由三部分组成,分别是读,写和通用配置。
表2-3 读参数表
参数名 | 解释 | 备注 |
name | 与要读取的数据库一致 | 字符串 |
jdbcUrl | 数据库链接 | 数组 会自动选择一个合法的链接 可以填写连接附件控制信息 |
username | 用户名 | 字符串,数据库的用户名 |
password | 密码 | 字符串,数据库的密码 |
table | 要同步的表名 | 数组,需保证表结构一致 |
column | 要同步的列名 | 数组 |
where | 选取的条件 | 字符串 |
querySql | 自定义查询语句 | 会自动忽略上述的同步条件 |
Writer部分,也就是写,常用以下几种参数,如表2-4所示。
参数名 | 解释 | 备注 |
name | 与要读取的数据库一致 | 字符串 |
jdbcUrl | 数据库链接 | 字符串 不和writer一样 可以填写连接附件控制信息 |
username | 用户名 | 字符串,数据库的用户名 |
password | 密码 | 字符串,数据库的密码 |
table | 要同步的表名 | 数组,需保证表结构一致 |
column | 要同步的列名 | 列名可以不对应,但是类型和总的个数要一致 |
preSql | 写入前执行的语句 | 数组,比如清空表等 |
postSql | 写入后执行的语句 | 数组 |
writeMode | 写入方式,默认为insert | insert/replace/update |
job.setting.speed(流量控制)
Job支持用户对速度的自定义控制,channel的值可以控制同步时的并发数,byte的值可以控制同步时的速度
job.setting.errorLimit(脏数据控制)
Job支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值),当Job传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job报错退出。
执行
Win+R+cmd进入命令行控制台,首先通过java –version和python查看是否满足要求,然后执行:python 空格{datax文件夹路径}\bin\datax.py 空格{json配置文件的路径},如,python D:\download\datax\bin\datax.py D:\download\datax\job\job2.json
接着控制台会打印出相应的信息,控制台乱码输入 chcp空格 65001
我们可以看到写入数据表中的数据已经发生了变化,此处是将age位置的信息写入到age_true位置上,结果如表2-5所示。
name | age | age_true |
---|---|---|
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
tom | 23 | |
... | ... |
java中使用
数据库迁移通常是定期的,所以一般情况下我们是将他用作定时任务的,所以,我们需要在java环境下用定时任务去执行。
大致思路如下:
首先获得指定文件夹下的任务配置文件,然后调用cmd执行cmd程序,具体实现如下:
声明参数,参数请根据实际去修改:
//datax.py文件的路径 @Value(value = "D:\\download\\datax\\bin\\datax.py") private String dataxPath; //任务文件夹的路径 @Value(value = "D:\\download\\datax\\job\\") private String jsonPath; //python路径 @Value(value = "C:\\Program Files\\Python27\\python.exe") private String pythonPath; //获得任务文件夹下的所有json文件 public File[] getFileList(){ File file=new File(jsonPath); File[] files=file.listFiles((File f)->f.getName().endsWith(".json")); return files; } //主程序 public void doTask(){ File[] files=getFileList(); for(File f:files){ String cmd = pythonPath+" "+dataxPath+" "+f.getAbsolutePath(); try { Process process = Runtime.getRuntime().exec(cmd); //返回信息写入流用控制台打出来 //此处转码,不然控制台中文乱码 BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(),"utf-8")); String line = null; while ((line = in.readLine()) != null) { System.out.println(line); } in.close(); process.waitFor(); } catch (Exception e){ logger.error(e.getMessage()); } } }
网友评论0