[TOC]
0x00 基础概述 1.什么是DataX? DataX 是阿里云商用产品 DataWorks 数据集成的开源版本,它是一个异构
数据源的离线数据同步
工具/平台(ETL工具)。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构
数据源之间高效的数据同步功能。
DataX.Logo
Tips : 异构即不同类型的应用或者数据源,例如MySQL/Oracle/DB2/MongDB
等不同类型的数据库源 Tips : 离线数据同步以及CDC实时数据复制,前者常用Sqoop以及DataX工具。 Tips : ETL(Extract-Transform-Load
)工具描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)
至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据
,其常用在数据仓库,但其对象并不限于数据仓库(DW)。
前面我们说到 DataX 的前身是阿里云商业化产品 DataWorks, 其稳定、高效、支持多样化等优点就不言而喻, DataWorks 致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks 数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。商业版本参见(https://www.aliyun.com/product/bigdata/ide )
DataX它有何特点?
答: DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
Github 仓库: https://github.com/alibaba/DataX Gitee 国内仓库: https://gitee.com/mirrors/DataX
2.DataX的设计思想 描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
简单得说,DataX就像中间商一样为每一个服务对象进行需求供应。
weiyigeek.top-DataX设计思想
3.DataX的框架设计 描述: DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。
Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
例如,将MySQL中的数据离线同步到HDFS之中来展示DataX的框架设计结构。
weiyigeek.top-DataX框架设计
Tips : DataX架构设计流程类似source(数据来源)-> channel(数据存储池中转通道) -> sink (目的地)
流程,
4.DataX的运行原理 描述: DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
weiyigeek.top-DataX运行原理
DataX 调度流程: 描述: DataX完成单个数据同步的作业(Job),当DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。在Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出,否则异常退出,并且进程退出值非0
。
核心模块解析:
DataX Job 模块 : 是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataX Task : 由Job切分而来, 是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作(包含Reader—>Channel—>Writer
流程)。
DataX Schedule 模块 : 将Task组成TaskGroup ,注意单个组的默认并发数量为5(动态概念即同时有5个在运行
)。
DataX TaskGroup : 负责启动以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
举例说明,当用户提交一个Datax Job 并且配置了20并非数,目的是将一个100张分别的MySQL数据同步到odps中。
(1) 首先根据分库分表切分成为100个Task。
(2) 根据要达到20个并发,此时我们需要分配4个TaskGroup,简单的说20并发除以每个TaskGroup的默认并发5得到4。
(3) 此时每一个TaskGroup负责以5并发数,共计运行25个Task,简单的说100Task除以4个TaskGroup就得到每个组需要执行的Task数。
5.DataX支持的数据源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据源如下表 (https://github.com/alibaba/DataX#support-data-channels )
类型
数据源
Reader(读)
Writer(写)
文档
RDBMS 关系型数据库
MySQL
√
√
读 、写
Oracle
√
√
读 、写
OceanBase
√
√
读 、写
SQLServer
√
√
读 、写
PostgreSQL
√
√
读 、写
DRDS
√
√
读 、写
通用RDBMS(支持所有关系型数据库)
√
√
读 、写
阿里云数仓数据存储
ODPS
√
√
读 、写
ADS
√
写
OSS
√
√
读 、写
OCS
√
√
读 、写
NoSQL数据存储
OTS
√
√
读 、写
Hbase0.94
√
√
读 、写
Hbase1.1
√
√
读 、写
Phoenix4.x
√
√
读 、写
Phoenix5.x
√
√
读 、写
MongoDB
√
√
读 、写
Hive
√
√
读 、写
Cassandra
√
√
读 、写
无结构化数据存储
TxtFile
√
√
读 、写
FTP
√
√
读 、写
HDFS
√
√
读 、写
Elasticsearch
√
写
时间序列数据库
OpenTSDB
√
读
TSDB
√
√
读 、写
Tips : DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
6.为何选择DataX? 描述: 我们可以从DataX 3.0六大核心优势入手了解我们为何选择它。
(1) 可靠的数据质量监控
完美解决数据传输个别类型失真问题
提供作业全链路的流量、数据量运行时监控
提供脏数据探测
(2) 丰富的数据转换功能 : DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。
(3) 精准的速度控制 : 新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
[TOC]
0x00 基础概述 1.什么是DataX? DataX 是阿里云商用产品 DataWorks 数据集成的开源版本,它是一个异构
数据源的离线数据同步
工具/平台(ETL工具)。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构
数据源之间高效的数据同步功能。
DataX.Logo
Tips : 异构即不同类型的应用或者数据源,例如MySQL/Oracle/DB2/MongDB
等不同类型的数据库源 Tips : 离线数据同步以及CDC实时数据复制,前者常用Sqoop以及DataX工具。 Tips : ETL(Extract-Transform-Load
)工具描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)
至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据
,其常用在数据仓库,但其对象并不限于数据仓库(DW)。
前面我们说到 DataX 的前身是阿里云商业化产品 DataWorks, 其稳定、高效、支持多样化等优点就不言而喻, DataWorks 致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks 数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。商业版本参见(https://www.aliyun.com/product/bigdata/ide )
DataX它有何特点?
答: DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
Github 仓库: https://github.com/alibaba/DataX Gitee 国内仓库: https://gitee.com/mirrors/DataX
2.DataX的设计思想 描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
简单得说,DataX就像中间商一样为每一个服务对象进行需求供应。
weiyigeek.top-DataX设计思想
3.DataX的框架设计 描述: DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。
Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
例如,将MySQL中的数据离线同步到HDFS之中来展示DataX的框架设计结构。
weiyigeek.top-DataX框架设计
Tips : DataX架构设计流程类似source(数据来源)-> channel(数据存储池中转通道) -> sink (目的地)
流程,
4.DataX的运行原理 描述: DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
weiyigeek.top-DataX运行原理
DataX 调度流程: 描述: DataX完成单个数据同步的作业(Job),当DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。在Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出,否则异常退出,并且进程退出值非0
。
核心模块解析:
DataX Job 模块 : 是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataX Task : 由Job切分而来, 是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作(包含Reader—>Channel—>Writer
流程)。
DataX Schedule 模块 : 将Task组成TaskGroup ,注意单个组的默认并发数量为5(动态概念即同时有5个在运行
)。
DataX TaskGroup : 负责启动以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
举例说明,当用户提交一个Datax Job 并且配置了20并非数,目的是将一个100张分别的MySQL数据同步到odps中。
(1) 首先根据分库分表切分成为100个Task。
(2) 根据要达到20个并发,此时我们需要分配4个TaskGroup,简单的说20并发除以每个TaskGroup的默认并发5得到4。
(3) 此时每一个TaskGroup负责以5并发数,共计运行25个Task,简单的说100Task除以4个TaskGroup就得到每个组需要执行的Task数。
5.DataX支持的数据源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据源如下表 (https://github.com/alibaba/DataX#support-data-channels )
类型
数据源
Reader(读)
Writer(写)
文档
RDBMS 关系型数据库
MySQL
√
√
读 、写
Oracle
√
√
读 、写
OceanBase
√
√
读 、写
SQLServer
√
√
读 、写
PostgreSQL
√
√
读 、写
DRDS
√
√
读 、写
通用RDBMS(支持所有关系型数据库)
√
√
读 、写
阿里云数仓数据存储
ODPS
√
√
读 、写
ADS
√
写
OSS
√
√
读 、写
OCS
√
√
读 、写
NoSQL数据存储
OTS
√
√
读 、写
Hbase0.94
√
√
读 、写
Hbase1.1
√
√
读 、写
Phoenix4.x
√
√
读 、写
Phoenix5.x
√
√
读 、写
MongoDB
√
√
读 、写
Hive
√
√
读 、写
Cassandra
√
√
读 、写
无结构化数据存储
TxtFile
√
√
读 、写
FTP
√
√
读 、写
HDFS
√
√
读 、写
Elasticsearch
√
写
时间序列数据库
OpenTSDB
√
读
TSDB
√
√
读 、写
Tips : DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
6.为何选择DataX? 描述: 我们可以从DataX 3.0六大核心优势入手了解我们为何选择它。
(1) 可靠的数据质量监控
完美解决数据传输个别类型失真问题
提供作业全链路的流量、数据量运行时监控
提供脏数据探测
(2) 丰富的数据转换功能 : DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。
(3) 精准的速度控制 : 新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。1 2 3 4 5 "speed" : { "channel" : 5, "byte" : 1048576, "record" : 10000 }
(4) 强劲的同步性能 : DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长
(5) 健壮的容错机制
(6) 极简的使用体验
易用: 开箱即用支持linux和windows,只需要短短几步骤就可以完成数据的传输
详细: 在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况
weiyigeek.top-详细日志输出
DataX 与 Sqoop 间的对比(VS)
Function
DataX
Sqoop
运行模式
单进程多线程 (可以独立启动多个进程)
MR模型
文件格式
ORC 支持
orc 不支持
分布式
不支持(但可以通过调度系统扩充)
支持
流量控制
支持
需要定制
统计信息
基本详细的统计信息
没有(分布式的数据采集不方便)
数据校验
在core部分有校验功能
没有
数据监控
需要定制
需要定制
社区支持
开源时间短社区活跃一般
开源时间长社区活跃,核心部分变动少
MySQL读写
单机压力大、读写粒度容易控制
MR 模式错误处理麻烦
Hive 读写
单机压力大
均衡写入很好
DataX 应用场景与适用人群。
应用于企业应用中数据迁移备份,以及供不同接入的应用数据库的应用进行数据的访问。
适用于从事数据采集工作人员,以及企业中从0到1建设阶段IT运维、以及DBA运维管理人员。
0x01 Datax 安装使用 1.快速开始 描述: DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS
等各种异构数据源之间高效的数据同步功能。
系统环境依赖-System Requirements
Linux
JDK ( 1.8以上,推荐1.8 )
Python ( 推荐 Python2.6.X )1 2 3 4 5 6 7 8 9 10 11 12 export PYTHON_HOME="/usr/local/python27" wget https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz tar -zxf Python-2.7.18.tgz -C /usr/local / apt install gcc g++ make cd /usr/local /Python-2.7.18./configure --prefix=/usr/local /python27 make && make install ln -s /usr/local /python27/bin/python2.7 /usr/bin/python ln -s /usr/local /python27/bin/python2.7 /usr/bin/python2 python --version && python2 --version Python 2.7.18
Apache Maven 3.x (Compile DataX)
安装部署方式
Datax 解压后其结构如下:1 2 3 4 5 6 7 8 9 10 11 12 13 $ cd /usr/local/datax $ /usr/local/datax# tree -d -L 2 ├── bin # 可执行的Python脚本 ├── conf # Datax 配置文件 ├── job # 离线同步任务 ├── lib # 依赖库 ├── log # 任务执行过程日志 ├── log_perf ├── plugin # 各类数据库读写插件 │ ├── reader │ └── writer ├── script # 脚本存放 └── tmp # 临时目录
运行测试 描述: 采用 Datax 自带的 job/job.json 进行运行测试验证安装环境。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 /usr/local /datax 2021-10-26 11:20:54.301 [main] INFO Engine - the machine info => osInfo: Eclipse Foundation 16 16.0.2+7 jvmInfo: Linux amd64 5.4.0-88-generic cpu num: 4 totalPhysicalMemory: -0.00G freePhysicalMemory: -0.00G maxFileDescriptorCount: -1 currentOpenFileDescriptorCount: -1 GC Names [G1 Young Generation, G1 Old Generation] MEMORY_NAME | allocation_size | init_size CodeHeap 'profiled nmethods' | 117.21MB | 2.44MB G1 Old Gen | 1,024.00MB | 970.00MB G1 Survivor Space | -0.00MB | 0.00MB CodeHeap 'non-profiled nmethods' | 117.22MB | 2.44MB Compressed Class Space | 1,024.00MB | 0.00MB Metaspace | -0.00MB | 0.00MB G1 Eden Space | -0.00MB | 54.00MB CodeHeap 'non-nmethods' | 5.57MB | 2.44MB 2021-10-26 11:21:04.364 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTim 2021-10-26 11:21:04.367 [job-0] INFO JobContainer - [total cpu info] => averageCpu | maxDeltaCpu | minDeltaCpu -1.00% | -1.00% | -1.00% [total gc info] => NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime G1 Young Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s G1 Old Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s 2021-10-26 11:21:04.367 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTime 0.041s | Percentage 100.00% 2021-10-26 11:21:04.368 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-26 11:20:54 任务结束时刻 : 2021-10-26 11:21:04 任务总计耗时 : 10s 任务平均流量 : 253.91KB/s 记录写入速度 : 10000rec/s 读出记录总数 : 100000 读写失败总数 : 0
Tips : 上面 Job 读写输出数据为DataX 19890604 1989-06-03 23:00:00 true test
2.基础使用 描述: 我们可以通过DataX数据源参考指南(https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)来查看具体每个插件需要或者可选的插件。
插件示例获取: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 $ ./bin/datax.py -r streamreader -w streamwriter # (1) 此处将会显示 读写 插件的使用文档说明 Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md # (2) 命令执行示例 python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json # (3) Job 任务配置示例 Json 格式 (以下参数我简单描述) tee job/stream2stream.json <<'EOF' { "job" : { "content" : [ { // 读插件 "reader": { "name": "streamreader", // 指定插件名称 "parameter": { "column": [ // 字段类与值 (必须进行指定) { "value" : "WeiyiGeek" , "type" : "string" }, { "value" : 2021 , "type" : "long" }, { "value" : "2021-01-01 00:00:00" , "type" : "date" }, { "value" : true , "type" : "bool" }, { "value" : "test" , "type" : "bytes" } ], "sliceRecordCount": "10" // 切片记录计数 } }, // 写插件 "writer": { "name": "streamwriter", // 指定使用的插件名称 "parameter": { "encoding": "UTF-8", // 编码格式 "print": true // 是否终端打印 } } } ], "setting": { "speed": { // 同步速度采用的类型 "channel": "2" // 并发数 //"byte": 10485760 // 字节数 } } } } EOF
执行结果: (执行时请删除上述备注)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 python bin/datax.py job/stream2stream.json 2021-10-26 16:28:33.568 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks. 2021-10-26 16:28:33.579 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started 2021-10-26 16:28:33.595 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] attemptCount[1] is started WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test WeiyiGeek 2021 2021-01-01 00:00:00 true test 2021-10-26 16:28:43.576 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 520 bytes | Speed 52B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReade rTime 0.002s | Percentage 100.00% 2021-10-26 16:28:43.576 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-26 16:28:33 任务结束时刻 : 2021-10-26 16:28:43 任务总计耗时 : 10s 任务平均流量 : 52B/s 记录写入速度 : 2rec/s 读出记录总数 : 20 读写失败总数 : 0
Tips : 执行后的日志除了终端打印还会在本地日志目录中存放(/usr/local/datax/log/2021-10-26/b_stream2stream_json-16_28_33.312.log
)文件。
Tips : 非常执行同步写入的总次数为setting.speed.channel * sliceRecordCount
。
0x02 Datax 实战使用 1.MySQL-To-HDFS 环境&准备说明: 描述: 为了快速搭建测试的数据库环境,本系列将采用docker容器
进行搭建部署,如没有安装docker 和 docker-compose
请参照本博客中的Docker系列课程。
(1) MySQL 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 version: '3.1' services: db8: image: mysql container_name: mysql8.x command : --default-authentication-plugin=mysql_native_password restart: always environment: MYSQL_ROOT_PASSWORD: www.weiyigeek.top MYSQL_DATABASE: test MYSQL_USER: test MYSQL_PASSWORD: www.weiyigeek.top volumes: - "/app/mysql8:/var/lib/mysql" ports: - 3306:3306 docker pull singularities/hadoop docker-compose up -d DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `uid` int(0) NOT NULL AUTO_INCREMENT COMMENT '用户id' , `name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户名称' , `age` int(0) NOT NULL COMMENT '用户年龄' , `hobby` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户爱好' , `operation_time` datetime(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '插入时间' , PRIMARY KEY (`uid`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (1, 'WeiyiGeek' , 20, 'Network,Computer' , '2021-10-12 14:34:03' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (2, 'Elastic' , 18, '数据分析,数据采集,数据处理' , '2021-10-12 17:16:34' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (3, 'Logstash' , 20, '日志采集,日志过滤' , '2021-10-12 17:16:59' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (4, 'Beats' , 10, '通用日志采集' , '2021-10-12 17:17:06' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (5, 'Kibana' , 19, '数据分析,日志搜寻,日志数据展示,可视化' , '2021-10-12 17:27:38' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (6, 'C' , 25, '面向过程编程语言' , '2021-10-13 02:43:30' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (7, 'C++' , 25, '面向对象' , '2021-10-13 10:44:59' ); INSERT INTO `test `.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (8, 'Python' , 26, '编程语言' , '2021-10-13 10:48:45' ); mysql> select * from test.user; +-----+-----------+-----+---------------------------------------+---------------------+ | uid | name | age | hobby | operation_time | +-----+-----------+-----+---------------------------------------+---------------------+ | 1 | WeiyiGeek | 20 | Network,Computer | 2021-10-12 14:34:03 | | 2 | Elastic | 18 | 数据分析,数据采集,数据处理 | 2021-10-12 17:16:34 | | 3 | Logstash | 20 | 日志采集,日志过滤 | 2021-10-12 17:16:59 | | 4 | Beats | 10 | 通用日志采集 | 2021-10-12 17:17:06 | | 5 | Kibana | 19 | 数据分析,日志搜寻,日志数据展示,可视化 | 2021-10-12 17:27:38 | | 6 | C | 25 | 面向过程编程语言 | 2021-10-13 02:43:30 | | 7 | C++ | 25 | 面向对象 | 2021-10-13 10:44:59 | | 8 | Python | 26 | 编程语言 | 2021-10-13 10:48:45 | +-----+-----------+-----+---------------------------------------+---------------------+ 8 rows in set (0.04 sec) mysql> desc test.user; +----------------+--------------+------+-----+---------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +----------------+--------------+------+-----+---------+-----------------------------+ | uid | int | NO | PRI | NULL | auto_increment | | name | varchar(32) | NO | | NULL | | | age | int | NO | | NULL | | | hobby | varchar(255) | NO | | NULL | | | operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP | +----------------+--------------+------+-----+---------+-----------------------------+ 5 rows in set (0.04 sec)
(2) HDFS Docker HDFS镜像参考地址: https://registry.hub.docker.com/r/gradiant/hdfs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 tee /tmp/hdfs-site.xml <<'EOF' <?xml version="1.0" encoding="UTF-8" ?> <?xml-stylesheet type ="text/xsl" href="configuration.xsl" ?> <configuration> <property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property> <property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property> <property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property> <property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property> <property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property> <property><name>dfs.client.use.datanode.hostname</name><value>true </value></property> <property><name>dfs.datanode.use.datanode.hostname</name><value>true </value></property> <property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false </value></property> <property><name>dfs.permissions.enabled</name><value>false </value></property> </configuration> EOF docker pull gradiant/hdfs-namenode docker pull gradiant/hdfs-datanode docker run -d --name hdfs-namenode -v /tmp/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml \ -p "8020:8020" \ -p "14000:14000" \ -p "50070:50070" \ -p "50075:50075" \ -p "10020:10020" \ -p "13562:13562" \ -p "19888:19888" gradiant/hdfs-namenode docker run -d --link hdfs-namenode --name hdfs-datanode1 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode docker run -d --link hdfs-namenode --name hdfs-datanode2 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode docker run -d --link hdfs-namenode --name hdfs-datanode3 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode docker exec -ti hdfs-namenode hadoop fs -mkdir /hdfs docker exec -ti hdfs-namenode hadoop fs -mkdir -p /hdfs/d1/d2 docker exec -ti hdfs-namenode hadoop fs -ls / hadoop fs hadoop fs -mkdir /hdfs hadoop fs -mkdir -p /hdfs/d1/d2 echo "hello world" >> local.txt hadoop fs -put local.txt /hdfs/ hadoop fs -get /hdfs/local.txt hadoop fs -rm /hdfs/local.txt hadoop fs -rmdir /hdfs/d1/d2
weiyigeek.top-docker-hdfsTest
mysqlreader 快速使用说明与配置样例
(1) 关键参数&类型转换:
jdbcUrl: 描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。 username: 数据源的用户名 password: 数据源指定用户名的密码 table: 所选取的需要同步的表,支持多张表同时抽取,用户自己需保证多张表是同一schema结构. column: 所配置的表中需要同步的列名集合,用户使用*代表默认使用所有列配置,例如['*']
一般不会采用此种方式。 where: 筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取,注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。 splitPk: 表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,使得提高数据同步的效能,目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型
querySql: 用户可自定义定义筛选SQL, 此参数会忽略 table、column 、where 选项,其优先级最高
MysqlReader针对Mysql类型转换列表: (注意除下述罗列字段类型外,其他类型均不支持。)
DataX 内部类型
Mysql 数据类型
Long
int, tinyint, smallint, mediumint, int, bigint
Double
float, double, decimal
String
varchar, char, tinytext, text, mediumtext, longtext, year
Date
date, datetime, timestamp, time
Boolean
bit, bool
Bytes
tinyblob, mediumblob, blob, longblob, varbinary
重点注意:
除上述罗列字段类型外,其他类型均不支持
。
tinyint(1) DataX视作为整形
。
year DataX视作为字符串类型
bit DataX属于未定义行为
。
Tips : mysqlreader 插件默认不支持MySQL8.X由于其 jdbc_driver_class驱动名称"com.mysql.cj.jdbc.Driver"
(2) 配置样例 (2.1) 配置一个从Mysql数据库同步抽取数据到本地的作业:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 tee job/mysql2stream.json <<'EOF' { "job" : { "content" : [ { "reader" : { "name" : "mysqlreader" , "parameter" : { "column" : [ "uid" , "name" , "operation_time" ], "connection" : [ { "jdbcUrl" : ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" ], "table" : ["user" ] } ], "username" : "test5" , "password" : "weiyigeek.top" , "where" : "uid > 0" } }, "writer" : { "name" : "streamwriter" , "parameter" : { "print" : true , "encoding" : "UTF-8" } } } ], "setting" : { "speed" : { "channel" : "2" } } } } EOF
执行结果:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 2021-10-26 21:43:04.419 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,operation_time from user where (uid > 0)] jdbcUrl:[jdbc:mysql://********&yearIsDateType=false &zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false &rewriteBatchedStatements=true ]. 1 WeiyiGeek 2021-10-12 14:34:03 2 Elastic 2021-10-12 17:16:34 3 Logstash 2021-10-12 17:16:59 4 Beats 2021-10-12 17:17:06 5 Kibana 2021-10-12 17:27:38 6 C 2021-10-13 02:43:30 7 C++ 2021-10-13 10:44:59 8 Python 2021-10-13 10:48:45 ..... 2021-10-26 21:43:04.497 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[103]ms 2021-10-26 21:43:14.393 [job-0] INFO StandAloneJobContainerCommunicator - Total 8 records, 117 bytes | Speed 11B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-10-26 21:43:14.394 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-26 21:43:04 任务结束时刻 : 2021-10-26 21:43:14 任务总计耗时 : 10s 任务平均流量 : 11B/s 记录写入速度 : 0rec/s 读出记录总数 : 8 读写失败总数 : 0
(2.2) 配置一个自定义SQL的数据库同步任务到本地内容的作业:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 cat job/mysql2stream1.json { "job" : { "content" : [ { "reader" : { "name" : "mysqlreader" , "parameter" : { "connection" : [ { "jdbcUrl" : ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" ], "querySql" : ["select uid,name,hobby,operation_time from user where (uid > 3);" ], } ], "username" : "test5" , "password" : "weiyigeek.top" , } }, "writer" : { "name" : "streamwriter" , "parameter" : { "print" : true , "encoding" : "UTF-8" } } } ], "setting" : { "speed" : { "channel" : "1" } } } }
执行结果:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 2021-10-26 21:50:58.904 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,hobby,operation_time from user where (uid > 3);] jdbcUrl:[jdbc:mysql://*******&rewriteBatchedStatements=true ]. 4 Beats 通用日志采集 2021-10-12 17:17:06 5 Kibana 数据分析,日志搜寻,日志数据展示,可视化 2021-10-12 17:27:38 6 C 面向过程编程语言 2021-10-13 02:43:30 7 C++ 面向对象 2021-10-13 10:44:59 8 Python 编程语言 2021-10-13 10:48:45 2021-10-26 21:51:08.877 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-26 21:50:58 任务结束时刻 : 2021-10-26 21:51:08 任务总计耗时 : 10s 任务平均流量 : 10B/s 记录写入速度 : 0rec/s 读出记录总数 : 5 读写失败总数 : 0
hdfswriter 快速使用说明与配置样例
(1) 关键参数&类型转换: HdfsWriter提供向HDFS文件系统指定路径中写入TEXTFile文件和ORCFile文件,文件内容可与hive中表关联。
defaultFS:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口 fileType: 文件的类型,目前只支持用户配置为”text”(textfile文件格式)或”orc”(orc表示orcfile文件格式)。 path: 存储到Hadoop hdfs文件系统的路径信息,HdfsWriter会根据并发配置在Path目录下写入多个文件 fileName: HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。 column: 写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型。 writeMode: 写入前数据清理处理模式,append 写入前不做任何处理, onConflict,如果目录下有fileName前缀的文件,直接报错。 fieldDelimiter: 写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据 compress: 写入文件压缩类型,默认没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)。 encoding: 写文件的编码配置, 默认值 utf-8 慎重修改。 haveKerberos : 是否有Kerberos认证默认false,如果为True则配置项kerberosKeytabFilePath,kerberosPrincipal
为必填。 kerberosKeytabFilePath: Kerberos认证 keytab文件路径,绝对路径 kerberosPrincipal: Kerberos认证Principal名,如xxxx/hadoopclient@xxx.xxx hadoopConfig: HadoopConfig 高级HA配置:
1 2 3 4 5 6 7 8 // 名称空间: testDfs "hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.aliDfs.namenode1": "主机名:端口", "dfs.namenode.rpc-address.aliDfs.namenode2": "主机名:端口", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
HdfsWriter 针对 Hive 数据类型转换列表: | DataX 内部类型| HIVE 数据类型 | | ——– | —– | | Long |TINYINT,SMALLINT,INT,BIGINT | | Double |FLOAT,DOUBLE | | String |STRING,VARCHAR,CHAR | | Boolean |BOOLEAN | | Date |DATE,TIMESTAMP |
(2) 配置样例 (2.1) 从MySQL同步数据HDFS中实例演示:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 // 示例生成 bin/datax.py -r mysqlreader -w hdfswriter > mysql2hdfs.json // 最终示例 tee job/mysql2hdfs3.json <<'EOF' { "job" : { "content" : [ { "reader" : { "name" : "mysqlreader" , "parameter" : { "connection" : [ { "jdbcUrl" : ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" ], "querySql" : ["select uid,name,hobby,operation_time from user where (uid > 3);" ], } ], "username" : "test5" , "password" : "weiyigeek.top" , } }, "writer" : { "name" : "hdfswriter" , "parameter" : { "column" : [ {"name" :"uid" ,"type" :"Long" }, {"name" :"name" ,"type" :"string" }, {"name" :"hobby" ,"type" :"string" }, {"name" :"operation_time" ,"type" :"Date" }, ], "compress" : "gzip" , "defaultFS" : "hdfs://10.10.107.225:8020" , "fieldDelimiter" : "|" , "fileName" : "mysql-test-user" , "fileType" : "test" , "path" : "/hdfs" , "writeMode" : "append" , "encoding" : "UTF-8" } } } ], "setting" : { "speed" : { "channel" : "1" } } } } EOF
执行结果:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 /usr/local /datax 2021-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work . 2021-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do prepare work . 2021-10-27 18:25:25.853 [job-0] INFO HdfsWriter$Job - 由于您配置了writeMode append, 写入前不做清理工作, [/] 目录下写入相应文件名前缀 [mysql-test-user] 的文件 2021-10-27 18:25:25.895 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql、 2021-10-27 18:25:25.910 [0-0-0-writer] INFO HdfsWriter$Task - begin do write... 2021-10-27 18:25:25.911 [0-0-0-writer] INFO HdfsWriter$Task - write to file : [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8] 2021-10-27 18:25:25.923 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql 2021-10-27 18:25:35.885 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do post work. 2021-10-27 18:25:35.885 [job-0] INFO HdfsWriter$Job - start rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz]. 2021-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - finish rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz]. 2021-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] . 2021-10-27 18:25:35.911 [job-0] INFO HdfsWriter$Job - finish delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] . 2021-10-27 18:25:35.912 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. 任务启动时刻 : 2021-10-27 18:25:24 任务结束时刻 : 2021-10-27 18:25:36 任务总计耗时 : 11s 任务平均流量 : 6B/s 记录写入速度 : 0rec/s 读出记录总数 : 5 读写失败总数 : 0 $ docker inspect hdfs-datanode1 | grep '"IPAddress"' | head -n 1 "IPAddress" : "172.17.0.3" ,$ curl "http://172.17.0.3:50075/webhdfs/v1/mysql-test-user__7630c0e0_d169_43cf_a808_272ad7d907bc?op=OPEN&namenoderpcaddress=0f3e052efe21:8020&offset=0" 4|Beats|通用日志采集 5|Kibana|数据分析,日志搜寻,日志数据展示,可视化 6|C|面向过程编程语言 7|C++|面向对象 8|Python|编程语言
weiyigeek.top-WriteDataHDFSResult
Tips : 总结Datax针对HDFS的写入流程,首先将数据写入到一个临时文件,如果全部成功则重命名(移动)
临时文件名、并删除临时目录。如果个别数据失败则Job任务失败,删除临时目录和临时文件。 Tips : 从上面结果可以看出HDFS实际执行时会在该文件名后添加随机的后缀作为每个线程的实际写入文件名。
2.HDFS-To-MySQL hdfsreader 快速使用说明与配置样例
(1) 快速介绍和参数说明 HdfsReader支持的文件格式有textfile(text)、orcfile(orc)、rcfile(rc)、sequence file(seq)和普通逻辑二维表(csv)
类型格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
参数说明:
path: 要读取的文件路径,如果要读取多个文件,可以使用正则表达式”“,注意这里可以支持填写多个路径,比如需要读取表名叫mytable01下分区day为20150820这一天的所有数据,则配置如下:`”path”: “/user/hive/warehouse/mytable01/20150820/ “defaultFS: Hadoop hdfs文件系统namenode节点地址
fileType: 文件的类型,目前只支持用户配置为
“text”、”orc”、”rc”、”seq”、”csv”column: 读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,如
{ “type”: “long”, “index”: 0}, { “type”: “string”, “value”: “alibaba”}` fieldDelimiter: 读取的字段分隔符 encoding: 读取文件的编码配置 nullFormat: 文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null。 haveKerberos:是否有Kerberos认证,默认false,例如如果用户配置true,则配置项kerberosKeytabFilePath,kerberosPrincipal为必填。 kerberosKeytabFilePath: Kerberos认证 keytab文件路径,绝对路径 kerberosPrincipal: Kerberos认证Principal名,如xxxx/hadoopclient@xxx.xxx compress: 当fileType(文件类型)为csv下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snappy、framing-snappy压缩; csvReaderConfig: 取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 "csvReaderConfig" :{ "safetySwitch" : false , "skipEmptyRecords" : false , "useTextQualifier" : false } boolean caseSensitive = true ; char textQualifier = 34; boolean trimWhitespace = true ; boolean useTextQualifier = true ;//是否使用csv转义字符 char delimiter = 44;//分隔符 char recordDelimiter = 0; char comment = 35; boolean useComments = false ; int escapeMode = 1; boolean safetySwitch = true ;//单列长度是否限制100000字符 boolean skipEmptyRecords = true ;//是否跳过空行 boolean captureRawRecord = true ;
HdfsReader提供了类型转换的建议表如下:
DataX 内部类型
Hive表 数据类型
Long
TINYINT,SMALLINT,INT,BIGINT
Double
FLOAT,DOUBLE
String
String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY
Boolean
BOOLEAN
Date
Date,TIMESTAMP
其中:
Long是指Hdfs文件文本中使用整形的字符串表示形式,例如”123456789”。
Double是指Hdfs文件文本中使用Double的字符串表示形式,例如”3.1415”。
Boolean是指Hdfs文件文本中使用Boolean的字符串表示形式,例如”true”、”false”。不区分大小写。
Date是指Hdfs文件文本中使用Date的字符串表示形式,例如”2014-12-31”。
特别提醒:
Hive支持的数据类型TIMESTAMP可以精确到纳秒级别,所以textfile、orcfile中TIMESTAMP存放的数据类似于”2015-08-21 22:40:47.397898389”,如果转换的类型配置为DataX的Date,转换之后会导致纳秒部分丢失,所以如果需要保留纳秒部分的数据,请配置转换类型为DataX的String类型。
Tips: 目前HdfsReader不支持对Hive元数据数据库进行访问查询,因此用户在进行类型转换的时候,必须指定数据类型,如果用户配置的column为"*",则所有column默认转换为string类型
。
mysqlwriter 快速使用说明与配置样例
(1) 描述: 我们使用 MysqlWriter 从数仓导入数据到 Mysql,同时 MysqlWriter 亦可以作为数据迁移工具为DBA等用户提供服务。
MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成insert into...(当主键/唯一性索引冲突时会写不进去冲突的行)
或者replace into...(没有遇到主键/唯一性索引冲突时
,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。
MysqlWriter 可用参数:
jdbcUrl: 目的数据库的 JDBC 连接信息。 username: 目的数据库的用户名。 password: 目的数据库的密码。 table: 目的表的表名称。注意:table 和 jdbcUrl 必须包含在 connection 配置单元中
column: 目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: “column”: [“id”,”name”,”age”]。如果要依次写入全部列,使用表示, 例如: “column”: `[“ “]`。 session: DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性。 preSql: 写入数据到目的表前,会先执行这里的标准语句 postSql: 写入数据到目的表后,会执行这里的标准语句。(原理同 preSql ) writeMode: 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句 batchSize: 一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
MysqlWriter 针对 Mysql 类型转换列表: Long、Double、String、Date、Boolean、Bytes
与Mysqlreader插件是一致的。
配置示例演示
(1) 从HDFS同步数据到MySQL中实例演示:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 /usr/local /datax mysql> CREATE TABLE IF NOT EXISTS test.hdfsreader like test.user; Query OK, 0 rows affected (0.48 sec) mysql> DESC test.hdfsreader; +----------------+--------------+------+-----+---------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +----------------+--------------+------+-----+---------+-----------------------------+ | uid | int(11) | NO | PRI | NULL | auto_increment | | name | varchar(32) | NO | | NULL | | | hobby | varchar(255) | NO | | NULL | | | operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP | +----------------+--------------+------+-----+---------+-----------------------------+ 4 rows in set (0.03 sec) $ bin/datax.py -r hdfsreader -w mysqlwriter tee job/hdfs2mysql.json <<'EOF' { "job" : { "content" : [ { "reader" : { "name" : "hdfsreader" , "parameter" : { "column" : [ {"index" :1,"type" :"string" }, {"index" :2,"type" :"string" } ], "defaultFS" : "hdfs://10.10.107.225:8020" , "encoding" : "UTF-8" , "fieldDelimiter" : "|" , "fileType" : "text" , "path" : "/mysql-test-user.txt" } }, "writer" : { "name" : "mysqlwriter" , "parameter" : { "column" : [ "name" , "hobby" ], "connection" : [ { "jdbcUrl" : "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" , "table" : ["hdfsreader" ] } ], "username" : "test5" , "password" : "weiyigeek.top" , "writeMode" : "insert" } } } ], "setting" : { "speed" : { "channel" : "1" } } } } EOF
执行结果:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 $ bin/datax.py job/hdfs2mysql.json 2021-10-27 23:20:44.255 [job-0] INFO OriginalConfPretreatmentUtil - table:[hdfsreader] all columns:[uid,name,hobby,operation_time]. 2021-10-27 23:20:44.277 [job-0] INFO OriginalConfPretreatmentUtil - Write data [insert INTO %s (name,hobby) VALUES(?,?)], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test ?useSSL=false &useUnicode=true &characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false &zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false &rewriteBatchedStatements=true ] 2021-10-27 23:20:44.763 [job-0] INFO HdfsReader$Job - [hdfs://10.10.107.225:8020/mysql-test-user.txt]是[text]类型的文件, 将该文件加入source files列表 2021-10-27 23:20:44.764 [job-0] INFO HdfsReader$Job - 您即将读取的文件数为: [1], 列表为: [hdfs://10.10.107.225:8020/mysql-test-user.txt] 2021-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - read start 2021-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - reading file : [hdfs://10.10.107.225:8020/mysql-test-user.txt] 2021-10-27 23:20:44.833 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord" :true ,"columnCount" :0,"comment" :"#" ,"currentRecord" :-1,"delimiter" :"|" ,"escapeMode" :1,"headerCount" :0,"rawRecord" :"" ,"recordDelimiter" :"\u0000" ,"safetySwitch" :false ,"skipEmptyRecords" :true ,"textQualifier" :"\"" ,"trimWhitespace" :true ,"useComments" :false ,"useTextQualifier" :true ,"values" :[]}],csvReaderConfig值为[null] 2021-10-27 23:20:44.836 [0-0-0-reader] INFO Reader$Task - end read source files... 任务启动时刻 : 2021-10-27 23:20:43 任务结束时刻 : 2021-10-27 23:20:54 任务总计耗时 : 10s 任务平均流量 : 6B/s 记录写入速度 : 0rec/s 读出记录总数 : 5 读写失败总数 : 0 mysql> select * from test.hdfsreader; +-----+--------+---------------------------------------+----------------+ | uid | name | hobby | operation_time | +-----+--------+---------------------------------------+----------------+ | 1 | Beats | 通用日志采集 | NULL | | 2 | Kibana | 数据分析,日志搜寻,日志数据展示,可视化 | NULL | | 3 | C | 面向过程编程语言 | NULL | | 4 | C++ | 面向对象 | NULL | | 5 | Python | 编程语言 | NULL | +-----+--------+---------------------------------------+----------------+ 5 rows in set (0.04 sec)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 create TABLE if not EXISTS mysqlwriter LIKE hdfsreader; bin/datax.py -r mysqlreader -w mysqlwriter tee job/mysql2mysql.json <<'EOF' { "job" : { "content" : [ { "reader" : { "name" : "mysqlreader" , "parameter" : { "connection" : [ { "jdbcUrl" : ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" ], "querySql" : ["select name,hobby,operation_time from user where (uid > 5);" ], } ], "username" : "test5" , "password" : "weiyigeek.top" , } }, "writer" : { "name" : "mysqlwriter" , "parameter" : { "column" : [ "name" , "hobby" , "operation_time" ], "connection" : [ { "jdbcUrl" : "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai" , "table" : ["mysqlwriter" ] } ], "session" : [ "set session sql_mode='ANSI'" ], "preSql" : [ "delete from mysqlwriter" ], "username" : "test5" , "password" : "weiyigeek.top" , "writeMode" : "insert" } } } ], "setting" : { "speed" : { "channel" : "1" } } } } EOF
执行结果:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 2021-10-28 09:57:05.554 [job-0] INFO OriginalConfPretreatmentUtil - table:[mysqlwriter] all columns:[uid,name,hobby,operation_time]. 2021-10-28 09:57:05.582 [job-0] INFO OriginalConfPretreatmentUtil - Write data [ insert INTO %s (name,hobby,operation_time) VALUES(?,?,?) ], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test ?useSSL=false &useUnicode=true &characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false &zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false &rewriteBatchedStatements=true ] 2021-10-28 09:57:05.628 [job-0] INFO CommonRdbmsWriter$Job - Begin to execute preSqls:[delete from mysqlwriter]. context info:jdbc:mysql://10.20.172.248:3305/test ?useSSL=false &useUnicode=true &characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false &zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false &rewriteBatchedStatements=true . 任务启动时刻 : 2021-10-28 09:57:05 任务结束时刻 : 2021-10-28 09:57:15 任务总计耗时 : 10s 任务平均流量 : 5B/s 记录写入速度 : 0rec/s 读出记录总数 : 3 读写失败总数 : 0 SELECT * from test.mysqlwriter; +-----+--------+------------------+---------------------+ | uid | name | hobby | operation_time | +-----+--------+------------------+---------------------+ | 1 | C | 面向过程编程语言 | 2021-10-13 02:43:30 | | 2 | C++ | 面向对象 | 2021-10-13 10:44:59 | | 3 | Python | 编程语言 | 2021-10-13 10:48:45 | +-----+--------+------------------+---------------------+ 3 rows in set (0.03 sec)
Tips : 非常注意读取和写入的字段数需要一致。
至此本章完毕,下节更精彩。