[TOC]

0x00 基础概述

1.什么是DataX?

DataX 是阿里云商用产品 DataWorks 数据集成的开源版本,它是一个异构数据源的离线数据同步工具/平台(ETL工具)。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX.Logo

DataX.Logo

Tips : 异构即不同类型的应用或者数据源,例如MySQL/Oracle/DB2/MongDB等不同类型的数据库源
Tips : 离线数据同步以及CDC实时数据复制,前者常用Sqoop以及DataX工具。
Tips : ETL(Extract-Transform-Load)工具描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据,其常用在数据仓库,但其对象并不限于数据仓库(DW)。


前面我们说到 DataX 的前身是阿里云商业化产品 DataWorks, 其稳定、高效、支持多样化等优点就不言而喻, DataWorks 致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks 数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。商业版本参见(https://www.aliyun.com/product/bigdata/ide)


DataX它有何特点?

答: DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

Github 仓库: https://github.com/alibaba/DataX
Gitee 国内仓库: https://gitee.com/mirrors/DataX


2.DataX的设计思想

描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

简单得说,DataX就像中间商一样为每一个服务对象进行需求供应。

WeiyiGeek.DataX设计思想

WeiyiGeek.DataX设计思想


3.DataX的框架设计

描述: DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。

  • Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

  • Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

WeiyiGeek.离线(批量)同步简介

例如,将MySQL中的数据离线同步到HDFS之中来展示DataX的框架设计结构。

WeiyiGeek.DataX框架设计

WeiyiGeek.DataX框架设计

Tips : DataX架构设计流程类似source(数据来源)-> channel(数据存储池中转通道) -> sink (目的地)流程,


4.DataX的运行原理

描述: DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

WeiyiGeek.DataX运行原理

WeiyiGeek.DataX运行原理


DataX 调度流程:
描述: DataX完成单个数据同步的作业(Job),当DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。在Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出,否则异常退出,并且进程退出值非0


核心模块解析:

  • DataX Job 模块 : 是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataX Task : 由Job切分而来, 是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作(包含Reader—>Channel—>Writer流程)。
  • DataX Schedule 模块 : 将Task组成TaskGroup ,注意单个组的默认并发数量为5(动态概念即同时有5个在运行)。
  • DataX TaskGroup : 负责启动以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。


举例说明,当用户提交一个Datax Job 并且配置了20并非数,目的是将一个100张分别的MySQL数据同步到odps中。

  • (1) 首先根据分库分表切分成为100个Task。
  • (2) 根据要达到20个并发,此时我们需要分配4个TaskGroup,简单的说20并发除以每个TaskGroup的默认并发5得到4。
  • (3) 此时每一个TaskGroup负责以5并发数,共计运行25个Task,简单的说100Task除以4个TaskGroup就得到每个组需要执行的Task数。


5.DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据源如下表 (https://github.com/alibaba/DataX#support-data-channels)

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB

Tips : DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。


6.为何选择DataX?

描述: 我们可以从DataX 3.0六大核心优势入手了解我们为何选择它。

(1) 可靠的数据质量监控

  • 完美解决数据传输个别类型失真问题
  • 提供作业全链路的流量、数据量运行时监控
  • 提供脏数据探测

(2) 丰富的数据转换功能 : DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。

(3) 精准的速度控制 : 新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

1
2
3
4
5
"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}

(4) 强劲的同步性能 : DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长

(5) 健壮的容错机制

  • 线程内部重试
  • 线程级别重试

(6) 极简的使用体验

  • 易用: 开箱即用支持linux和windows,只需要短短几步骤就可以完成数据的传输
  • 详细: 在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况
WeiyiGeek.详细日志输出

WeiyiGeek.详细日志输出


DataX 与 Sqoop 间的对比(VS)

Function DataX Sqoop
运行模式 单进程多线程 (可以独立启动多个进程) MR模型
文件格式 ORC 支持 orc 不支持
分布式 不支持(但可以通过调度系统扩充) 支持
流量控制 支持 需要定制
统计信息 基本详细的统计信息 没有(分布式的数据采集不方便)
数据校验 在core部分有校验功能 没有
数据监控 需要定制 需要定制
社区支持 开源时间短社区活跃一般 开源时间长社区活跃,核心部分变动少
MySQL读写 单机压力大、读写粒度容易控制 MR 模式错误处理麻烦
Hive 读写 单机压力大 均衡写入很好


DataX 应用场景与适用人群。

  • 应用于企业应用中数据迁移备份,以及供不同接入的应用数据库的应用进行数据的访问。
  • 适用于从事数据采集工作人员,以及企业中从0到1建设阶段IT运维、以及DBA运维管理人员。


0x01 Datax 安装使用

1.快速开始

描述: DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

系统环境依赖-System Requirements

  • Linux
  • JDK ( 1.8以上,推荐1.8 )
  • Python ( 推荐 Python2.6.X )
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 国内镜像下载: https://npmmirror.com/mirrors/python/2.6.9/Python-2.6.9.tgz
    export PYTHON_HOME="/usr/local/python27"
    wget https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz
    tar -zxf Python-2.7.18.tgz -C /usr/local/
    apt install gcc g++ make
    cd /usr/local/Python-2.7.18
    ./configure --prefix=/usr/local/python27
    make && make install
    ln -s /usr/local/python27/bin/python2.7 /usr/bin/python
    ln -s /usr/local/python27/bin/python2.7 /usr/bin/python2
    python --version && python2 --version
    Python 2.7.18
  • Apache Maven 3.x (Compile DataX)


安装部署方式

  • 方法一、直接下载 DataX工具包:DataX下载地址

    下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

    1
    2
    3
    4
    5
    6
    $ export DATAX_HOME="/usr/local/datax"
    $ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
    $ tar -zxf datax.tar.gz -C /usr/local/
    $ cd ${DATAX_HOME}/bin
    $ python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/job.json
    $ ln -s ${DATAX_HOME}/bin/datax.py /usr/local/datax.py


  • 方法二、下载DataX源码,自己编译:DataX源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    # (1)、下载DataX源码:
    $ git clone git@github.com:alibaba/DataX.git


    # (2)、通过maven打包:
    $ cd {DataX_source_code_home}
    $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true


    # (3) 打包成功,日志显示如下:
    [INFO] BUILD SUCCESS
    [INFO] -----------------------------------------------------------------
    [INFO] Total time: 08:12 min
    [INFO] Finished at: 2015-12-13T16:26:48+08:00
    [INFO] Final Memory: 133M/960M
    [INFO] -----------------------------------------------------------------

    Tips : 打包成功后的DataX包位于 `{DataX_source_code_home}/target/datax/datax/ `

Datax 解压后其结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ cd /usr/local/datax
$ /usr/local/datax# tree -d -L 2
├── bin # 可执行的Python脚本
├── conf # Datax 配置文件
├── job # 离线同步任务
├── lib # 依赖库
├── log # 任务执行过程日志
├── log_perf
├── plugin # 各类数据库读写插件
│   ├── reader
│   └── writer
├── script # 脚本存放
└── tmp # 临时目录


运行测试
描述: 采用 Datax 自带的 job/job.json 进行运行测试验证安装环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/usr/local/datax# ./bin/datax.py job/job.json
# (1) 显示机器相关信息(CPU/内存、以及JVM相关信息)
2021-10-26 11:20:54.301 [main] INFO Engine - the machine info =>
osInfo: Eclipse Foundation 16 16.0.2+7
jvmInfo: Linux amd64 5.4.0-88-generic
cpu num: 4
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1

GC Names [G1 Young Generation, G1 Old Generation]
MEMORY_NAME | allocation_size | init_size
CodeHeap 'profiled nmethods' | 117.21MB | 2.44MB
G1 Old Gen | 1,024.00MB | 970.00MB
G1 Survivor Space | -0.00MB | 0.00MB
CodeHeap 'non-profiled nmethods' | 117.22MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
Metaspace | -0.00MB | 0.00MB
G1 Eden Space | -0.00MB | 54.00MB
CodeHeap 'non-nmethods' | 5.57MB | 2.44MB

# (2) Job 任务执行情况
2021-10-26 11:21:04.364 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTim

# (3) job 任务执行CPU与GC占比信息
2021-10-26 11:21:04.367 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
G1 Young Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
G1 Old Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s

# (4) Job 任务执行完毕总计数据(非常重要) 、可以验证同步的数据是否全部同步成功。
2021-10-26 11:21:04.367 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTime 0.041s | Percentage 100.00%
2021-10-26 11:21:04.368 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-26 11:20:54
任务结束时刻 : 2021-10-26 11:21:04
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0

Tips : 上面 Job 读写输出数据为DataX 19890604 1989-06-03 23:00:00 true test


2.基础使用

描述: 我们可以通过DataX数据源参考指南(https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)来查看具体每个插件需要或者可选的插件。

插件示例获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
$ ./bin/datax.py -r streamreader -w streamwriter
# (1) 此处将会显示 读写 插件的使用文档说明
Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

# (2) 命令执行示例
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json

# (3) Job 任务配置示例 Json 格式 (以下参数我简单描述)
tee job/stream2stream.json <<'EOF'
{
"job": {
"content": [
{
// 读插件
"reader": {
"name": "streamreader", // 指定插件名称
"parameter": {
"column": [ // 字段类与值 (必须进行指定)
{
"value": "WeiyiGeek",
"type": "string"
},
{
"value": 2021,
"type": "long"
},
{
"value": "2021-01-01 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": "10" // 切片记录计数
}
},
// 写插件
"writer": {
"name": "streamwriter", // 指定使用的插件名称
"parameter": {
"encoding": "UTF-8", // 编码格式
"print": true // 是否终端打印
}
}
}
],
"setting": {
"speed": { // 同步速度采用的类型
"channel": "2" // 并发数
//"byte": 10485760 // 字节数
}
}
}
}
EOF

执行结果: (执行时请删除上述备注)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
python bin/datax.py job/stream2stream.json
# (1) 两个任务进程
2021-10-26 16:28:33.568 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.

# (2) 每个任务进程执行10条 (即总数20条)
2021-10-26 16:28:33.579 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-10-26 16:28:33.595 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] attemptCount[1] is started
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test
WeiyiGeek 2021 2021-01-01 00:00:00 true test

# (3) 执行结果信息
2021-10-26 16:28:43.576 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 520 bytes | Speed 52B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReade rTime 0.002s | Percentage 100.00%
2021-10-26 16:28:43.576 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-26 16:28:33
任务结束时刻 : 2021-10-26 16:28:43
任务总计耗时 : 10s
任务平均流量 : 52B/s
记录写入速度 : 2rec/s
读出记录总数 : 20
读写失败总数 : 0

Tips : 执行后的日志除了终端打印还会在本地日志目录中存放(/usr/local/datax/log/2021-10-26/b_stream2stream_json-16_28_33.312.log)文件。

Tips : 非常执行同步写入的总次数为setting.speed.channel * sliceRecordCount


0x02 Datax 实战使用

1.MySQL-To-HDFS

环境&准备说明:
描述: 为了快速搭建测试的数据库环境,本系列将采用docker容器进行搭建部署,如没有安装docker 和 docker-compose请参照本博客中的Docker系列课程。

(1) MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# docker-compose.yml
version: '3.1'
services:
db8:
image: mysql
container_name: mysql8.x
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: www.weiyigeek.top
MYSQL_DATABASE: test
MYSQL_USER: test
MYSQL_PASSWORD: www.weiyigeek.top
volumes:
- "/app/mysql8:/var/lib/mysql"
ports:
- 3306:3306

# 部署流程 #
docker pull singularities/hadoop
docker-compose up -d

# 创建测试表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`uid` int(0) NOT NULL AUTO_INCREMENT COMMENT '用户id',
`name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户名称',
`age` int(0) NOT NULL COMMENT '用户年龄',
`hobby` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户爱好',
`operation_time` datetime(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '插入时间',
PRIMARY KEY (`uid`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (1, 'WeiyiGeek', 20, 'Network,Computer', '2021-10-12 14:34:03');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (2, 'Elastic', 18, '数据分析,数据采集,数据处理', '2021-10-12 17:16:34');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (3, 'Logstash', 20, '日志采集,日志过滤', '2021-10-12 17:16:59');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (4, 'Beats', 10, '通用日志采集', '2021-10-12 17:17:06');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (5, 'Kibana', 19, '数据分析,日志搜寻,日志数据展示,可视化', '2021-10-12 17:27:38');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (6, 'C', 25, '面向过程编程语言', '2021-10-13 02:43:30');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (7, 'C++', 25, '面向对象', '2021-10-13 10:44:59');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (8, 'Python', 26, '编程语言', '2021-10-13 10:48:45');

# 此时test表中有如下数据。
mysql> select * from test.user;
+-----+-----------+-----+---------------------------------------+---------------------+
| uid | name | age | hobby | operation_time |
+-----+-----------+-----+---------------------------------------+---------------------+
| 1 | WeiyiGeek | 20 | Network,Computer | 2021-10-12 14:34:03 |
| 2 | Elastic | 18 | 数据分析,数据采集,数据处理 | 2021-10-12 17:16:34 |
| 3 | Logstash | 20 | 日志采集,日志过滤 | 2021-10-12 17:16:59 |
| 4 | Beats | 10 | 通用日志采集 | 2021-10-12 17:17:06 |
| 5 | Kibana | 19 | 数据分析,日志搜寻,日志数据展示,可视化 | 2021-10-12 17:27:38 |
| 6 | C | 25 | 面向过程编程语言 | 2021-10-13 02:43:30 |
| 7 | C++ | 25 | 面向对象 | 2021-10-13 10:44:59 |
| 8 | Python | 26 | 编程语言 | 2021-10-13 10:48:45 |
+-----+-----------+-----+---------------------------------------+---------------------+
8 rows in set (0.04 sec)

# 表中字段类型
mysql> desc test.user;
+----------------+--------------+------+-----+---------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+--------------+------+-----+---------+-----------------------------+
| uid | int | NO | PRI | NULL | auto_increment |
| name | varchar(32) | NO | | NULL | |
| age | int | NO | | NULL | |
| hobby | varchar(255) | NO | | NULL | |
| operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP |
+----------------+--------------+------+-----+---------+-----------------------------+
5 rows in set (0.04 sec)


(2) HDFS
Docker HDFS镜像参考地址: https://registry.hub.docker.com/r/gradiant/hdfs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# hdfs-site 配置文件
tee /tmp/hdfs-site.xml <<'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
<property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
<property><name>dfs.permissions.enabled</name><value>false</value></property>
</configuration>
EOF

# 部署流程 #
docker pull gradiant/hdfs-namenode
docker pull gradiant/hdfs-datanode
docker run -d --name hdfs-namenode -v /tmp/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml \
-p "8020:8020" \
-p "14000:14000" \
-p "50070:50070" \
-p "50075:50075" \
-p "10020:10020" \
-p "13562:13562" \
-p "19888:19888" gradiant/hdfs-namenode
# 此处需要等待 hdfs-namenode 启动完毕后才执行如下命令
docker run -d --link hdfs-namenode --name hdfs-datanode1 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode
docker run -d --link hdfs-namenode --name hdfs-datanode2 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode
docker run -d --link hdfs-namenode --name hdfs-datanode3 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode

# 测试:在 hdfs 中创建和列出示例文件夹
docker exec -ti hdfs-namenode hadoop fs -mkdir /hdfs # 在根目录下创建hdfs文件夹
docker exec -ti hdfs-namenode hadoop fs -mkdir -p /hdfs/d1/d2 # 创建多级目录
docker exec -ti hdfs-namenode hadoop fs -ls / # 列出根目录下的文件列表
# Found 1 items
# drwxr-xr-x - hdfs supergroup 0 2021-10-27 08:42 /hdfs
# HDFS 常规命令帮助
hadoop fs
# 创建单级、多级目录
hadoop fs -mkdir /hdfs
hadoop fs -mkdir -p /hdfs/d1/d2
# 上传文件到HDFS
echo "hello world" >> local.txt #创建文件
hadoop fs -put local.txt /hdfs/ #上传文件到hdfs
# 下载hdfs文件
hadoop fs -get /hdfs/local.txt
# 删除hdfs中的文件
hadoop fs -rm /hdfs/local.txt
# 删除hdfs中的目录
hadoop fs -rmdir /hdfs/d1/d2

WeiyiGeek.docker-hdfsTest

WeiyiGeek.docker-hdfsTest


mysqlreader 快速使用说明与配置样例

  • (1) 关键参数&类型转换:

    jdbcUrl: 描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。
    username: 数据源的用户名
    password: 数据源指定用户名的密码
    table: 所选取的需要同步的表,支持多张表同时抽取,用户自己需保证多张表是同一schema结构.
    column: 所配置的表中需要同步的列名集合,用户使用*代表默认使用所有列配置,例如['*']一般不会采用此种方式。
    where: 筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取,注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
    splitPk: 表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,使得提高数据同步的效能,目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型
    querySql: 用户可自定义定义筛选SQL, 此参数会忽略 table、column 、where 选项,其优先级最高

MysqlReader针对Mysql类型转换列表: (注意除下述罗列字段类型外,其他类型均不支持。)

DataX 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

重点注意:

  • 除上述罗列字段类型外,其他类型均不支持
  • tinyint(1) DataX视作为整形
  • year DataX视作为字符串类型
  • bit DataX属于未定义行为

Tips : mysqlreader 插件默认不支持MySQL8.X由于其 jdbc_driver_class驱动名称"com.mysql.cj.jdbc.Driver"


  • (2) 配置样例
    (2.1) 配置一个从Mysql数据库同步抽取数据到本地的作业:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    tee job/mysql2stream.json <<'EOF'
    {
    "job": {
    "content": [
    {
    "reader": {
    "name": "mysqlreader",
    "parameter": {
    "column": [
    "uid",
    "name",
    "operation_time"
    ],
    "connection": [
    {
    "jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
    "table": ["user"]
    }
    ],
    "username": "test5",
    "password": "weiyigeek.top",
    "where": "uid > 0"
    }
    },
    "writer": {
    "name": "streamwriter",
    "parameter": {
    "print": true,
    "encoding": "UTF-8"
    }
    }
    }
    ],
    "setting": {
    "speed": {
    "channel": "2"
    }
    }
    }
    }
    EOF
    执行结果:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    2021-10-26 21:43:04.419 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,operation_time from user where (uid > 0)] 
    jdbcUrl:[jdbc:mysql://********&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
    1 WeiyiGeek 2021-10-12 14:34:03
    2 Elastic 2021-10-12 17:16:34
    3 Logstash 2021-10-12 17:16:59
    4 Beats 2021-10-12 17:17:06
    5 Kibana 2021-10-12 17:27:38
    6 C 2021-10-13 02:43:30
    7 C++ 2021-10-13 10:44:59
    8 Python 2021-10-13 10:48:45
    .....
    2021-10-26 21:43:04.497 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[103]ms
    2021-10-26 21:43:14.393 [job-0] INFO StandAloneJobContainerCommunicator - Total 8 records, 117 bytes | Speed 11B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
    2021-10-26 21:43:14.394 [job-0] INFO JobContainer -
    任务启动时刻 : 2021-10-26 21:43:04
    任务结束时刻 : 2021-10-26 21:43:14
    任务总计耗时 : 10s
    任务平均流量 : 11B/s
    记录写入速度 : 0rec/s
    读出记录总数 : 8
    读写失败总数 : 0


(2.2) 配置一个自定义SQL的数据库同步任务到本地内容的作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
cat job/mysql2stream1.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"querySql": ["select uid,name,hobby,operation_time from user where (uid > 3);"],
}
],
"username": "test5",
"password": "weiyigeek.top",
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}

执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 执行SQL与Jdbc URL
2021-10-26 21:50:58.904 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,hobby,operation_time from user where (uid > 3);]
jdbcUrl:[jdbc:mysql://*******&rewriteBatchedStatements=true].

# 打印查询到的数据到终端中
4 Beats 通用日志采集 2021-10-12 17:17:06
5 Kibana 数据分析,日志搜寻,日志数据展示,可视化 2021-10-12 17:27:38
6 C 面向过程编程语言 2021-10-13 02:43:30
7 C++ 面向对象 2021-10-13 10:44:59
8 Python 编程语言 2021-10-13 10:48:45

# 任务执行结果统计
2021-10-26 21:51:08.877 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-26 21:50:58
任务结束时刻 : 2021-10-26 21:51:08
任务总计耗时 : 10s
任务平均流量 : 10B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0


hdfswriter 快速使用说明与配置样例

  • (1) 关键参数&类型转换: HdfsWriter提供向HDFS文件系统指定路径中写入TEXTFile文件和ORCFile文件,文件内容可与hive中表关联。

    defaultFS:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口
    fileType: 文件的类型,目前只支持用户配置为”text”(textfile文件格式)或”orc”(orc表示orcfile文件格式)。
    path: 存储到Hadoop hdfs文件系统的路径信息,HdfsWriter会根据并发配置在Path目录下写入多个文件
    fileName: HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
    column: 写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型。
    writeMode: 写入前数据清理处理模式,append 写入前不做任何处理, onConflict,如果目录下有fileName前缀的文件,直接报错。
    fieldDelimiter: 写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据
    compress: 写入文件压缩类型,默认没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)。
    encoding: 写文件的编码配置, 默认值 utf-8 慎重修改。
    haveKerberos : 是否有Kerberos认证默认false,如果为True则配置项kerberosKeytabFilePath,kerberosPrincipal为必填。
    kerberosKeytabFilePath: Kerberos认证 keytab文件路径,绝对路径
    kerberosPrincipal: Kerberos认证Principal名,如xxxx/hadoopclient@xxx.xxx
    hadoopConfig: HadoopConfig 高级HA配置:

    1
    2
    3
    4
    5
    6
    7
    8
    // 名称空间: testDfs
    "hadoopConfig":{
    "dfs.nameservices": "testDfs",
    "dfs.ha.namenodes.testDfs": "namenode1,namenode2",
    "dfs.namenode.rpc-address.aliDfs.namenode1": "主机名:端口",
    "dfs.namenode.rpc-address.aliDfs.namenode2": "主机名:端口",
    "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    }

HdfsWriter 针对 Hive 数据类型转换列表:
| DataX 内部类型| HIVE 数据类型 |
| ——– | —– |
| Long |TINYINT,SMALLINT,INT,BIGINT |
| Double |FLOAT,DOUBLE |
| String |STRING,VARCHAR,CHAR |
| Boolean |BOOLEAN |
| Date |DATE,TIMESTAMP |


  • (2) 配置样例
    (2.1) 从MySQL同步数据HDFS中实例演示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    // 示例生成
    bin/datax.py -r mysqlreader -w hdfswriter > mysql2hdfs.json

    // 最终示例
    tee job/mysql2hdfs3.json <<'EOF'
    {
    "job": {
    "content": [
    {
    "reader": {
    "name": "mysqlreader",
    "parameter": {
    "connection": [
    {
    "jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
    "querySql": ["select uid,name,hobby,operation_time from user where (uid > 3);"],
    }
    ],
    "username": "test5",
    "password": "weiyigeek.top",
    }
    },
    "writer": {
    "name": "hdfswriter",
    "parameter": {
    "column": [
    {"name":"uid","type":"Long"},
    {"name":"name","type":"string"},
    {"name":"hobby","type":"string"},
    {"name":"operation_time","type":"Date"},
    ],
    "compress": "gzip",
    "defaultFS": "hdfs://10.10.107.225:8020",
    "fieldDelimiter": "|",
    "fileName": "mysql-test-user",
    "fileType": "test",
    "path": "/hdfs",
    "writeMode": "append",
    "encoding": "UTF-8"
    }
    }
    }
    ],
    "setting": {
    "speed": {
    "channel": "1"
    }
    }
    }
    }
    EOF

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/usr/local/datax# ./bin/datax.py job/mysql2hdfsjson
# 插件加载
2021-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2021-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2021-10-27 18:25:25.853 [job-0] INFO HdfsWriter$Job - 由于您配置了writeMode append, 写入前不做清理工作, [/] 目录下写入相应文件名前缀 [mysql-test-user] 的文件

# mysqlreader 开始执行SQL读取数据
2021-10-27 18:25:25.895 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql、

# hdfswriter 准备将数据写入到临时目录和文件中
2021-10-27 18:25:25.910 [0-0-0-writer] INFO HdfsWriter$Task - begin do write...
2021-10-27 18:25:25.911 [0-0-0-writer] INFO HdfsWriter$Task - write to file : [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8]

# mysqlreader 读取完成
2021-10-27 18:25:25.923 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql

# hdfswriter 将写入的临时文件进行重命名`mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz`并删除临时目录和文件
2021-10-27 18:25:35.885 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do post work.
2021-10-27 18:25:35.885 [job-0] INFO HdfsWriter$Job - start rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz].
2021-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - finish rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz].
2021-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] .
2021-10-27 18:25:35.911 [job-0] INFO HdfsWriter$Job - finish delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] .
2021-10-27 18:25:35.912 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.

# 结果统计
任务启动时刻 : 2021-10-27 18:25:24
任务结束时刻 : 2021-10-27 18:25:36
任务总计耗时 : 11s
任务平均流量 : 6B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0

# 从HDFS中读取插入的text数据
$ docker inspect hdfs-datanode1 | grep '"IPAddress"' | head -n 1
"IPAddress": "172.17.0.3",

$ curl "http://172.17.0.3:50075/webhdfs/v1/mysql-test-user__7630c0e0_d169_43cf_a808_272ad7d907bc?op=OPEN&namenoderpcaddress=0f3e052efe21:8020&offset=0"
4|Beats|通用日志采集
5|Kibana|数据分析,日志搜寻,日志数据展示,可视化
6|C|面向过程编程语言
7|C++|面向对象
8|Python|编程语言

WeiyiGeek.WriteDataHDFSResult

WeiyiGeek.WriteDataHDFSResult

Tips : 总结Datax针对HDFS的写入流程,首先将数据写入到一个临时文件,如果全部成功则重命名(移动)临时文件名、并删除临时目录。如果个别数据失败则Job任务失败,删除临时目录和临时文件。
Tips : 从上面结果可以看出HDFS实际执行时会在该文件名后添加随机的后缀作为每个线程的实际写入文件名。


2.HDFS-To-MySQL

hdfsreader 快速使用说明与配置样例

  • (1) 快速介绍和参数说明
    HdfsReader支持的文件格式有textfile(text)、orcfile(orc)、rcfile(rc)、sequence file(seq)和普通逻辑二维表(csv)类型格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。

参数说明:

path: 要读取的文件路径,如果要读取多个文件,可以使用正则表达式”“,注意这里可以支持填写多个路径,比如需要读取表名叫mytable01下分区day为20150820这一天的所有数据,则配置如下:`”path”: “/user/hive/warehouse/mytable01/20150820/defaultFS: Hadoop hdfs文件系统namenode节点地址 fileType: 文件的类型,目前只支持用户配置为“text”、”orc”、”rc”、”seq”、”csv”column: 读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,如{ “type”: “long”, “index”: 0}, { “type”: “string”, “value”: “alibaba”}`
fieldDelimiter: 读取的字段分隔符
encoding: 读取文件的编码配置
nullFormat: 文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null。
haveKerberos:是否有Kerberos认证,默认false,例如如果用户配置true,则配置项kerberosKeytabFilePath,kerberosPrincipal为必填。
kerberosKeytabFilePath: Kerberos认证 keytab文件路径,绝对路径
kerberosPrincipal: Kerberos认证Principal名,如xxxx/hadoopclient@xxx.xxx
compress: 当fileType(文件类型)为csv下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snappy、framing-snappy压缩;
csvReaderConfig: 取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"csvReaderConfig":{
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}

# 所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;


HdfsReader提供了类型转换的建议表如下:

DataX 内部类型 Hive表 数据类型
Long TINYINT,SMALLINT,INT,BIGINT
Double FLOAT,DOUBLE
String String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY
Boolean BOOLEAN
Date Date,TIMESTAMP

其中:

  • Long是指Hdfs文件文本中使用整形的字符串表示形式,例如”123456789”。
  • Double是指Hdfs文件文本中使用Double的字符串表示形式,例如”3.1415”。
  • Boolean是指Hdfs文件文本中使用Boolean的字符串表示形式,例如”true”、”false”。不区分大小写。
  • Date是指Hdfs文件文本中使用Date的字符串表示形式,例如”2014-12-31”。

特别提醒:

  • Hive支持的数据类型TIMESTAMP可以精确到纳秒级别,所以textfile、orcfile中TIMESTAMP存放的数据类似于”2015-08-21 22:40:47.397898389”,如果转换的类型配置为DataX的Date,转换之后会导致纳秒部分丢失,所以如果需要保留纳秒部分的数据,请配置转换类型为DataX的String类型。

Tips: 目前HdfsReader不支持对Hive元数据数据库进行访问查询,因此用户在进行类型转换的时候,必须指定数据类型,如果用户配置的column为"*",则所有column默认转换为string类型


mysqlwriter 快速使用说明与配置样例

  • (1) 描述: 我们使用 MysqlWriter 从数仓导入数据到 Mysql,同时 MysqlWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成insert into...(当主键/唯一性索引冲突时会写不进去冲突的行)或者replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。

MysqlWriter 可用参数:

jdbcUrl: 目的数据库的 JDBC 连接信息。
username: 目的数据库的用户名。
password: 目的数据库的密码。
table: 目的表的表名称。注意:table 和 jdbcUrl 必须包含在 connection 配置单元中
column: 目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: “column”: [“id”,”name”,”age”]。如果要依次写入全部列,使用表示, 例如: “column”: `[““]`。
session: DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性。
preSql: 写入数据到目的表前,会先执行这里的标准语句
postSql: 写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
writeMode: 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
batchSize: 一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。

MysqlWriter 针对 Mysql 类型转换列表: Long、Double、String、Date、Boolean、Bytes 与Mysqlreader插件是一致的。


配置示例演示

  • (1) 从HDFS同步数据到MySQL中实例演示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    # 操作重命名hadoop中指定文件名称
    /usr/local/datax# docker exec -it hdfs-namenode hadoop fs -mv /mysql-test-user__7630c0e0_d169_43cf_a808_272ad7d907bc /mysql-test-user.txt

    # 数据库表窗口
    mysql> CREATE TABLE IF NOT EXISTS test.hdfsreader like test.user;
    Query OK, 0 rows affected (0.48 sec)

    # 表结构
    mysql> DESC test.hdfsreader;
    +----------------+--------------+------+-----+---------+-----------------------------+
    | Field | Type | Null | Key | Default | Extra |
    +----------------+--------------+------+-----+---------+-----------------------------+
    | uid | int(11) | NO | PRI | NULL | auto_increment |
    | name | varchar(32) | NO | | NULL | |
    | hobby | varchar(255) | NO | | NULL | |
    | operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP |
    +----------------+--------------+------+-----+---------+-----------------------------+
    4 rows in set (0.03 sec)

    # hdfsreader =>> mysqlwriter 示例文件生成
    $ bin/datax.py -r hdfsreader -w mysqlwriter

    # 最终的Job配置
    tee job/hdfs2mysql.json <<'EOF'
    {
    "job": {
    "content": [
    {
    "reader": {
    "name": "hdfsreader",
    "parameter": {
    "column": [
    {"index":1,"type":"string"},
    {"index":2,"type":"string"}
    ],
    "defaultFS": "hdfs://10.10.107.225:8020",
    "encoding": "UTF-8",
    "fieldDelimiter": "|",
    "fileType": "text",
    "path": "/mysql-test-user.txt"
    }
    },
    "writer": {
    "name": "mysqlwriter",
    "parameter": {
    "column": [
    "name",
    "hobby"
    ],
    "connection": [
    {
    "jdbcUrl": "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai",
    "table": ["hdfsreader"]
    }
    ],
    "username": "test5",
    "password": "weiyigeek.top",
    "writeMode": "insert"
    }
    }
    }
    ],
    "setting": {
    "speed": {
    "channel": "1"
    }
    }
    }
    }
    EOF

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
$ bin/datax.py job/hdfs2mysql.json
# - 获取的列
2021-10-27 23:20:44.255 [job-0] INFO OriginalConfPretreatmentUtil - table:[hdfsreader] all columns:[uid,name,hobby,operation_time].

# - SQL语句生成
2021-10-27 23:20:44.277 [job-0] INFO OriginalConfPretreatmentUtil - Write data [insert INTO %s (name,hobby) VALUES(?,?)], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]

# - 读取HDFS中指定的mysql-test-user.txt文件
2021-10-27 23:20:44.763 [job-0] INFO HdfsReader$Job - [hdfs://10.10.107.225:8020/mysql-test-user.txt]是[text]类型的文件, 将该文件加入source files列表
2021-10-27 23:20:44.764 [job-0] INFO HdfsReader$Job - 您即将读取的文件数为: [1], 列表为: [hdfs://10.10.107.225:8020/mysql-test-user.txt]

# - 读取开始
2021-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - read start
2021-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - reading file : [hdfs://10.10.107.225:8020/mysql-test-user.txt]
2021-10-27 23:20:44.833 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"|","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2021-10-27 23:20:44.836 [0-0-0-reader] INFO Reader$Task - end read source files...

# - 执行结果
任务启动时刻 : 2021-10-27 23:20:43
任务结束时刻 : 2021-10-27 23:20:54
任务总计耗时 : 10s
任务平均流量 : 6B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0

# - 查看写入表的数据。
mysql> select * from test.hdfsreader;
+-----+--------+---------------------------------------+----------------+
| uid | name | hobby | operation_time |
+-----+--------+---------------------------------------+----------------+
| 1 | Beats | 通用日志采集 | NULL |
| 2 | Kibana | 数据分析,日志搜寻,日志数据展示,可视化 | NULL |
| 3 | C | 面向过程编程语言 | NULL |
| 4 | C++ | 面向对象 | NULL |
| 5 | Python | 编程语言 | NULL |
+-----+--------+---------------------------------------+----------------+
5 rows in set (0.04 sec)


  • (2) 从MySQL中读取并写入到指定表之中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 表示创建
create TABLE if not EXISTS mysqlwriter LIKE hdfsreader;
# 示例生成
bin/datax.py -r mysqlreader -w mysqlwriter
# job任务配置
tee job/mysql2mysql.json <<'EOF'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"querySql": ["select name,hobby,operation_time from user where (uid > 5);"],
}
],
"username": "test5",
"password": "weiyigeek.top",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"name",
"hobby",
"operation_time"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai",
"table": ["mysqlwriter"]
}
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from mysqlwriter"
],
"username": "test5",
"password": "weiyigeek.top",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
EOF

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -MysqlWriter 写入的字段以及SQL
2021-10-28 09:57:05.554 [job-0] INFO OriginalConfPretreatmentUtil - table:[mysqlwriter] all columns:[uid,name,hobby,operation_time].
2021-10-28 09:57:05.582 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
insert INTO %s (name,hobby,operation_time) VALUES(?,?,?)
], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]

# -写入前的SQL执行处理
2021-10-28 09:57:05.628 [job-0] INFO CommonRdbmsWriter$Job - Begin to execute preSqls:[delete from mysqlwriter]. context info:jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.

# -执行统计
任务启动时刻 : 2021-10-28 09:57:05
任务结束时刻 : 2021-10-28 09:57:15
任务总计耗时 : 10s
任务平均流量 : 5B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0

# -查询插入到数据库的中的表数据
SELECT * from test.mysqlwriter;
+-----+--------+------------------+---------------------+
| uid | name | hobby | operation_time |
+-----+--------+------------------+---------------------+
| 1 | C | 面向过程编程语言 | 2021-10-13 02:43:30 |
| 2 | C++ | 面向对象 | 2021-10-13 10:44:59 |
| 3 | Python | 编程语言 | 2021-10-13 10:48:45 |
+-----+--------+------------------+---------------------+
3 rows in set (0.03 sec)

Tips : 非常注意读取和写入的字段数需要一致。

至此本章完毕,下节更精彩。