基于 DataX 的 TDengine 数据迁移工具

基于 DataX,我们实现了 TDengine Database 的数据迁移工具,目前可以做到 OpenTSDB、MySQL、TDengine 等不同数据源之间的数据迁移。这篇文章的目的是,让用户能够快速了解如何使用这个数据迁移工具。

1、介绍

基于 DataX,我们完成了 TDengine 的适配,实现了 TDengineReader 和 TDengineWriter 两个插件。

TDengineReader 提供的功能:

  1. 支持通过 SQL 进行数据筛选;
  2. 根据时间间隔进行任务切分;
  3. 支持 TDengine 的全部数据类型;
  4. 支持批量读取,通过 batchSize 参数控制批量拉取结果集的大小,提高读取性能。

TDengineWriter 支持的功能:

  1. 支持 OpenTSDB 的 json 格式的行协议,使用 TDengine 的 schemaless 方式写入 TDengine。
  2. 支持批量写入,通过 batchSize 参数控制批量写入的数量,提高写入性能。

2、实现原理

TDengineReader:使用 JNI 方式从 TDengine 拉取数据。

TDengineWriter:使用 JNI 方式写数据到 TDengine。对 OpenTSDB 等使用 schemaless 写入,对于 MySQL 等关系型数据库,使用批量 stmt 写入。

3、使用方法

3.1 环境准备

(1)需要安装 TDengine 客户端

(2)需要安装 JDK 1.8 环境(运行 DataX)

(3)需要安装 Python 环境(运行 DataX)

(4)需要 maven 编译环境(如果不编译 DataX 则可以不安装 maven)

3.2 安装

下载源码

git clone https://github.com/taosdata/DataX.git

编译打包

cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true

安装

cp target/datax.tar.gz your_install_dir
cd your_install_dir
tar -zxvf dataX.tar.gz

3.3 数据迁移 Job 的配置

3.3.1 时序数据的迁移配置

以一个从 OpenTSDB 到 TDengine 的数据迁移任务为例,配置文件 opentsdb2tdengine.json 如下:

 {
   "job":{
     "content":[{
       "reader": {
         "name": "opentsdbreader",
         "parameter": {
           "endpoint": "http://192.168.1.180:4242",
           "column": ["weather_temperature"],
           "beginDateTime": "2021-01-01 00:00:00",
           "endDateTime": "2021-01-01 01:00:00"
         }
       },
     "writer": {
       "name": "tdenginewriter",
         "parameter": {
           "host": "192.168.1.101",
           "port": 6030,
           "dbname": "test",
           "user": "root",
           "password": "taosdata",
           "batchSize": 1000
         }
       }
     }],
     "setting": {
       "speed": {
         "channel": 1
       }
     }
   }
 } 

配置说明:

  • 上面的配置表示,从 192.168.1.180 的 OpenTSDB,到 192.168.1.101 的 TDengine 的迁移。迁移 metric 为 weather_temperature,时间从 2021-01-01 00:00:00 开始,到 2021-01-01 01:00:00 结束的数据。
  • reader 使用 datax 的 opentsdbreader,parameter 的配置请参考:opentsdbreader.md#配置参数
  • tdenginewriter 的 parameter 中,host,port,dbname,user,password 都为必须项,没有默认值。batchSize 不是必须项,默认值为 1。详细参考:tdenginewriter.md#配置参数
  • TDengine 中,如果 dbname 指定的 database 不存在,则需要在迁移前创建数据库。

3.3.2 关系型数据的迁移配置

以一个从 MySQL 到 TDengine 的数据迁移任务为例,配置文件 mysql2tdengine.json 如下:

 {
   "job": {
     "content": [{
       "reader": {
         "name": "mysqlreader",
         "parameter": {
           "username": "root",
           "password": "root",
           "column": ["id","name"],
           "splitPk": "id",
           "connection": [{
             "table": ["test"],
             "jdbcUrl": ["jdbc:mysql://192.168.1.101:3306/db"]
           }]
         }
       },
       "writer": {
         "name": "tdenginewriter",
         "parameter": {
           "host": "192.168.1.105",
           "port": 6030,
           "dbname": "test",
           "user": "root",
           "password": "taosdata",
           "batchSize": 1000
         }
       }
     }],
     "setting": {
       "speed": {
         "channel": 1
       }
     }
   }
 } 

配置说明:

  • 上面的配置表示,从 192.168.1.101 的 MySQL,到 192.168.1.105 的 TDengine 的迁移。迁移 test 表中 id、name 两列到 TDengine,使用 id 列作为任务划分的列。
  • reader 使用 datax 的 mysqlreader,parameter 的配置请参考:mysqlreader.md

3.3.3 TDengine 之间的迁移配置

以一个从 TDengine 到 TDengine 的数据迁移为例,配置文件 tdengine2tdengine.json 如下:

 {
   "job": {
     "content": [{
       "reader": {
         "name": "tdenginereader",
         "parameter": {
           "host": "192.168.1.82",
           "port": 6030,
           "db": "test",
           "user": "root",
           "password": "taosdata",
           "sql": "select * from weather",
           "beginDateTime": "2021-01-01 00:00:00",
           "endDateTime": "2021-01-02 00:00:00",
           "splitInterval": "1h"
         }
       },
       "writer": {
         "name": "tdenginewriter",
         "parameter": {
           "host": "192.168.1.105",‘
           "port": 6030,
           "dbname": "test",
           "user": "root",
           "password": "taosdata",
           "batchSize": 1000
         }
       }
     }],
     "setting": {
       "speed": {
         "channel": 1
       }
     }
   }
 } 

配置说明:

  • 上面的配置表示,从 192.168.1.82 到 192.168.1.105 的 TDengine 之间的数据迁移。tdenginereader 根据 sql、begieDateTime、endDateTime 过滤数据,使用 splitInteval 进行任务划分。
  • reader 使用 tdenginereader,parameter 的配置请参考:tdenginereader.md#配置参数

3.4 执行迁移任务

将上面写好的配置文件保存在 datax/job 目录下,执行下面的命令,启动数据迁移任务:

python bin/datax.py job/opentsdb2tdengine.json

4、限制条件

(1)目前,DataX 自带的 opentsdbreader 仅支持 OpenTSDB-2.3.X 版本。详细参考:opentsdbreader#约束限制

(2)数据迁移工具依赖 TDengine 客户端中的 libtaos.so/taos.dll/libtaos.dylib,需要 TDengine-client-2.3.1.0 以上版本。

5、FAQ

(1)如何估算一个数据迁移任务所需要的资源

DataX 的每个 reader 按照自己的 task 切分策略进行任务划分,具体请参考 DataX 的任务调度规则。在估算资源是,需要按照数据迁移的数据量,任务切分规则和网络带宽限制等综合考虑,最好以实际数据迁移测试结果为准。

(2)TDengineWriter 的 batchSize 设置多大效率最高?

batchSize 是控制批量写入的参数,在获取 batchSize 行纪录后,TDengineWriter 会向 TDengine 发送一次写入请求,这减少了与 TDengine 交互次数,从而提高了性能。从测试结果来看,batchSize 在 500-1000 范围内效率最高。

(3)job 的配置中 channel 数为多少合适?

job 中的 channel 数为流量控制的参数,每个 channel 都需要开辟一块内存,用来缓存数据。如果 channel 设置过大,会引起 OOM,所以 channel 数并不是越大越好。增加 channel 数后,需要提高 JVM 内存大小。从测试结果来看,channel 在 1~6 的范围内都是合适,能够保证 DataX 的流量最大化即可。