DataX安装与测试

1.前置条件

java 1.8、python 2.7

2.下载datax

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

3.解压

tar -zxvf datax.tar.gz

4.配置环境变量

vim /etc/profile

export DATAX_HOME=/app/hadoop/software/datax
export PATH=$DATAX_HOME/bin:$PATH

source /etc/profile

5.任务作业json配置文件

以下配置以MySQL为例,参考mysqlreader与mysqlwriter插件文档;更多DataX数据库插件请参考DataX官方文档。(示例中的 // 注释仅用于说明,标准JSON不支持注释,实际使用时建议删除)

{
    //全局配置
    "core":{
        "transport":{
            "channel":{
                "speed":{
                    //此处为数据导入的并发度,建议根据服务器硬件进行调优
                    "channel": 2,
                    //此处解除对读取行数的限制
                    "record":-1,
                    //此处解除对字节的限制
                    "byte":-1,
                    //每次读取batch的大小
                    "batchSize":2048
                }
            }
        }
    },
    "job": {
        //局部配置
        "setting": {
            //配置同步速度
            "speed": {
                //线程数
                "channel": 1
            }
        },
        "content": [
            {
                //读取器
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        //MySQL用户和密码
                        "username": "root",
                        "password": "root",
                        //读取MySQL哪些列,所有列:["*"]
                        "column": [
                            "id",
                            "name"
                        ],
                        //筛选条件
                        "where": "",
                        //配置datax以splitPk指定的字段(此处为db_id)进行数据分片,如果不指定splitPk,DataX视作使用单通道同步该表数据
                        "splitPk": "db_id",
                        //MySQL连接信息
                        "connection": [
                            {
                                //配置读取SQL脚本,配置后column、where等条件配置会被忽略;注意querySql与table不能同时配置,实际使用时二者只保留其一
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                //读取表
                                "table": [""],
                                //JDBC连接地址
                                "jdbcUrl": [
                                    "jdbc:mysql://bad_ip:3306/database",
                                    "jdbc:mysql://127.0.0.1:bad_port/database",
                                    "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                //写入配置
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        //控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
                        "writeMode": "insert",
                        //账号和密码
                        "username": "root",
                        "password": "root",
                        //写入列名(依次写入全部列:["*"])
                        "column": [
                            "id",
                            "name"
                        ],
                        //session配置
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        //在执行写入操作前执行相关的SQL操作
                        "preSql": [
                            "delete from test"
                        ],
                        //配置写入数据到目标表后执行的操作,与preSql同理(同为SQL语句数组)
                        "postSql": [],
                        //默认值1024 一次性批量提交的记录数大小
                        "batchSize": 1024,
                        "connection": [
                            {
                                //JDBC URL
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                //目标表
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

6.DataX测试

将一个MySQL实例中的一张表同步到另外一个MySQL实例中

任务配置脚本:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": ["*"],                        
                        "connection": [
                            {
                                "table": ["testtable"],
                                "jdbcUrl": [
                                    "jdbc:mysql://data-repo-06:3306/datax"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "bqs",
                        "password": "123456",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.20.101:3306/demo2",
                                "table": [
                                    "datax_test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

执行脚本:

python $DATAX_HOME/bin/datax.py datax_test.json
如果觉得我的文章对你有用,请随意赞赏