异构数据源离线同步工具DataX

一、DataX3.0概览

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能,如图1-1所示。

异构数据源离线同步工具<a href='/tag/datax.html'>DataX</a>


DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。


异构数据源离线同步工具<a href='/tag/datax.html'>DataX</a>

核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

异构数据源离线同步工具<a href='/tag/datax.html'>DataX</a>

DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

二、DataX3.0的使用

要求

Java版本要求:jdk1.8及以上
Python版本要求:2.7.X,DataX未更新至Python3
DataX下载:DataX下载地址
当然你还可以去Github去下载源码进行编译,通过此种方式你可以自行选择你需要的配置进行打包,如只需要Mysql的,那么其他的相关支持你都不需要用到,也就不需要都打包,更轻量使用。与此同时,你还需要maven工具进行打包,在pom.Xml中删除你不需要的模块,再执行maven命令:mvn -U clean package assembly:assembly -Dmaven.test.skip=true,生成的文件在/target/datax/datax/下
确保上述步骤都通过后则继续往下看。

小例子

表2-1 测试表结构

字段名

类型

备注

name

varchar


age

int


age_true

int


并向其中插入40条数据,如表2-2所示。
表2-2 测试表数据

name age age_true
tom 23  
tom 23  
tom 23  
tom 23  
tom 23  
tom 23  
tom 23  
tom 23  
... ...  

构建任务Json
DataX工具是用json文件作为配置文件的,根据官方提供文档我们构建Json文件如下所示。

{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "root",
                    "password": "123456",
                    "column": ["name", "age"],
                    "where": "age<100",
                    "connection": [{
                        "table": ["person"],
                        "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"]
                    }]
                }
            },
            "writer": {
                "name": "mysqlwriter",
                "parameter": {
                    "username": "root",
                    "password": "123456",
                    "column": ["name", "age_true"],
                    "connection": [{
                        "table": ["person"],
                        "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"
                    }]
                }
            }
        }],
        "setting": {
            "speed": {
                "channel": 1,
                "byte": 104857600
            },
            "errorLimit": {
                "record": 10,
                "percentage": 0.05
            }
        }
    }

它由三部分组成,分别是读,写和通用配置。
Reader部分,也就是读,常用以下几种参数,如表2-3所示。

表2-3 读参数表

参数名

解释

备注

name

与要读取的数据库一致

字符串

jdbcUrl

数据库链接

数组

会自动选择一个合法的链接

可以填写连接附件控制信息

username

用户名

字符串,数据库的用户名

password

密码

字符串,数据库的密码

table

要同步的表名

数组,需保证表结构一致

column

要同步的列名

数组

where

选取的条件

字符串

querySql

自定义查询语句

会自动忽略上述的同步条件

Writer部分,也就是写,常用以下几种参数,如表2-4所示。

参数名

解释

备注

name

与要读取的数据库一致

字符串

jdbcUrl

数据库链接

字符串

不和writer一样

可以填写连接附件控制信息

username

用户名

字符串,数据库的用户名

password

密码

字符串,数据库的密码

table

要同步的表名

数组,需保证表结构一致

column

要同步的列名

列名可以不对应,但是类型和总的个数要一致

preSql

写入前执行的语句

数组,比如清空表等

postSql

写入后执行的语句

数组

writeMode

写入方式,默认为insert

insert/replace/update

job.setting.speed(流量控制)
Job支持用户对速度的自定义控制,channel的值可以控制同步时的并发数,byte的值可以控制同步时的速度
job.setting.errorLimit(脏数据控制)
Job支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值),当Job传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job报错退出。

执行

Win+R+cmd进入命令行控制台,首先通过java –version和python查看是否满足要求,然后执行:python 空格{datax文件夹路径}\bin\datax.py 空格{json配置文件的路径},如,python D:\download\datax\bin\datax.py D:\download\datax\job\job2.json
接着控制台会打印出相应的信息,控制台乱码输入 chcp空格 65001

我们可以看到写入数据表中的数据已经发生了变化,此处是将age位置的信息写入到age_true位置上,结果如表2-5所示。

name age age_true
tom 23  
tom 23  
tom 23  
tom 23  
tom    23
tom    23
tom    23
tom    23
... ...

java中使用

数据库迁移通常是定期的,所以一般情况下我们是将他用作定时任务的,所以,我们需要在java环境下用定时任务去执行。

大致思路如下:

首先获得指定文件夹下的任务配置文件,然后调用cmd执行cmd程序,具体实现如下:

声明参数,参数请根据实际去修改:

//datax.py文件的路径

    @Value(value = "D:\\download\\datax\\bin\\datax.py")

    private String dataxPath;

    //任务文件夹的路径

    @Value(value = "D:\\download\\datax\\job\\")

    private String jsonPath;

    //python路径

    @Value(value = "C:\\Program Files\\Python27\\python.exe")

    private String pythonPath;



//获得任务文件夹下的所有json文件

    public File[] getFileList(){

        File file=new File(jsonPath);

        File[] files=file.listFiles((File f)->f.getName().endsWith(".json"));

        return files;

    }

//主程序

public void doTask(){

        File[] files=getFileList();

        for(File f:files){

            String cmd = pythonPath+" "+dataxPath+" "+f.getAbsolutePath();

            try {

                Process process = Runtime.getRuntime().exec(cmd);

                //返回信息写入流用控制台打出来

//此处转码,不然控制台中文乱码

                BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(),"utf-8"));

                String line = null;

                while ((line = in.readLine()) != null) {

                    System.out.println(line);

                }

                in.close();

                process.waitFor();

            }

            catch (Exception e){

                logger.error(e.getMessage());

            }

        }

    }

{{collectdata}}

网友评论0