本文csdn地址
背景
用户需求: 需要将 mongodb 的数据同步到 hive 表,共 2 亿+条数据,总数据量约 30G
查阅一些博客后,了解到同步方法有以下几种
通过 datax 同步数据
任务定义
json 配置文件参考官方文档进行适配即可
注意: hdfswriter 的 defaultFS 如果指定的是集群地址,比如 hdfs://cluster_name,可能会遇到解析不了的问题,需要把 hdfs 的配置文件打入到 hdfswriter-0.0.1-SNAPSHOT.jar 包中去
参考issue
jar 打包指令参考
datax 任务定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| { "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": [ "host1:27017", "host2:27017", "host3:27017" ], "collectionName": "collection_name", "column": [ { "index": 0, "name": "id", "type": "string" }, ... ], "dbName": "db", "userName": "user", "userPassword": "password" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "string" } ... ], "compress": "SNAPPY", "defaultFS": "hdfs://cluster_name", "fieldDelimiter": "\u0001", "fileName": "$table", "fileType": "orc", "path": "/user/hive/warehouse/db_name.db/table_name", "writeMode": "truncate" } } } ], "setting": { "speed": { "channel": "2" } } } }
|
效果
启动任务
1
| python bin/datax.py tmp/mongo_to_hive.json
|
等待同步完成后,查看 hdfs 对应表路径的数据,可以看到生成的 txt 文件
然后创建 hive 表关联这个路径即可
1 2 3 4 5 6 7 8 9
| create external table table_name ( id string, ... ) COMMENT '同步表' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS ORC LOCATION '/user/hive/warehouse/db_name.db/table_name';
|
通过 seatunnel 同步数据
环境准备
不同于 datax,seatunnel 的安装过程稍微复杂点,而且和 hive 相关的ja包需要手动导入,因此在这里特别说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| # 下载 seatunnel 后,需要先安装插件(即各个connector) sh bin/install-plugin.sh 2.3.3
# 下载 hive 2.3.9 版本 (主要是为了拿到 jar 包,也可以从 maven 仓库单独下载下面的每个 jar 包) wget https://downloads.apache.org/hive/hive-2.3.9/apache-hive-2.3.9-bin.tar.gz
# 解压后,把以下 jar 包拷贝到 seatunnel 的 lib 下
seatunnel_home=/opt/modules/seatunnel
cd /opt/modules/apache-hive-2.3.9-bin/lib
cp hive-metastore-2.3.9.jar ${seatunnel_home}/lib
cp hive-common-2.3.9.jar ${seatunnel_home}/lib
cp hive-exec-2.3.9.jar ${seatunnel_home}/lib
cp libthrift-0.9.3.jar ${seatunnel_home}/lib
cp libfb303-0.9.3.jar ${seatunnel_home}/lib
|
任务定义
配置示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| env { execution.parallelism = 1 job.mode = "BATCH" }
source { MongoDB { uri = "mongodb://user:password@host1,host2,host3" database = "db" collection = "collection" schema = { fields { id = string create_time = string update_time = string name = string ... } } } }
sink { Hive { table_name = "db.table" metastore_uri = "thrift://metastore_host:9083" hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml" } }
|
效果
执行任务:
1
| ./bin/seatunnel.sh --config mongo_to_hive.conf -e local"
|
目前还存在 空值无法转换的问题,报错: Unable to convert to <STRING> from nullable value null,需要等待社区解决
相关 issue: [Bug] [MongoDB-CDC] 数据中有字段值为 Null 直接异常. #5340
和 datax 的对比
配置: 都是 hive reader 和 mongo writer 的两部分数据源配置方式
写入方式: 都是把数据写入到 hdfs 路径,hive 表需要用户自己创建
seatunnel 的 hive connector 也是先从 hive 中读取 location,拿到 hdfs 的路径,参考: HiveSinkAggregatedCommitter 继承了 FileSinkAggregatedCommitter, HiveSink 继承了 BaseHdfsFileSink
运行环境: datax 只能本地运行,seatunnel 支持 local、flink、spark 等运行模式