背景 来自用户的需求: 用户有一部分数据来自 elasticsearch ,我们已经支持了通过 presto 查询 es 数据。但是用户需要将 es 表 和 hive 表做关联查询,而 presto 是不能跨数据源进行 join 查询的。所以需要先把 es 数据导入到 hive 中
用户对数据同步周期的要求并不高 一天1-2次就可以了,所以继续使用我们emr集群中已有的 azkaban 服务进行调度,把 es 数据同步到 hive 的过程写到 azkaban 中,实现了7张表的定期同步
hive 创建 es 外表 参考教程-Elasticsearch-Hive
hive 引入 elasticsearch-hadoop 依赖包 hive 默认不支持创建 es 外表,需要引入 elasticsearch-hadoop 依赖包
修改 hive.aux.jars.path 配置, 多个可以用逗号分隔,如下:
1 hive.aux.jars.path=file:///usr/hdp/2.6.3.0-235/hive/auxlib/elasticsearch-hadoop-hive-8.8.0.jar
创建 hive 外表 sql 示例:
1 2 3 4 5 6 7 CREATE EXTERNAL TABLE temp.es_external_table ( fieldNameA STRING, fieldNameB STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource' = 'es索引名' , 'es.nodes' = 'es_host' , 'es.port' = 'es_port' , 'es.mapping.names' = 'fieldNameA:fieldNameA,fieldNameB:fieldNameB' );
这里踩了一个坑: 由于 hive 创建表会忽略大小写,不管 sql 中定义的字段是什么样子,都会统一转成小写。所以导致es 中的驼峰名称字段 会映射失败,最后查出的数据都是 null
类似的坑 -创建mongodb 外表时遇到的
因此需要显式地通过 es.mapping.names 配置指定字段名称的关联关系,像示例那样
同步脚本 从 es 表到 hive 表,大致步骤为: 创建 hive 外表,关联 es 数据 => 创建 hive 内表 => 同步外表数据到内表
过程写到脚本中如下: (create_hive_to_es_table.sh)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 get_index_field_ret=`curl http://${es_address} /${index_name} ?pretty=true ` field_arr=`echo ${get_index_field_ret} | jq -r ".${index_name} .mappings.properties | keys | join(\" \")" ` temp_table_name="temp.es_${index_name} " temp_rename_table_name="${hive_db} .es_${index_name} _bak" actual_table_name="${hive_db} .es_${index_name} " create_external_table_sql="CREATE EXTERNAL TABLE ${temp_table_name} (" for current_field in ${field_arr[@]} do create_external_table_sql="${create_external_table_sql} ${current_field} STRING," done create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g' ` create_external_table_sql="${create_external_table_sql} ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource'='${index_name} ','es.nodes'='${es_host} ','es.port'='${es_port} ','es.mapping.names'='" for current_field in ${field_arr[@]} do create_external_table_sql="${create_external_table_sql} ${current_field} :${current_field} ," done create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g' ` create_external_table_sql="${create_external_table_sql} ')" drop_external_table_sql="drop table if exists ${temp_table_name} " echo "create external sql: ${create_external_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${drop_external_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${create_external_table_sql} " create_temp_table_sql="CREATE TABLE ${temp_rename_table_name} AS SELECT * FROM ${temp_table_name} " drop_temp_table_sql="drop table if exists ${temp_rename_table_name} " echo "create temp table sql: ${create_temp_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${drop_temp_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${create_temp_table_sql} " create_actual_table_sql="ALTER TABLE ${temp_rename_table_name} RENAME TO ${actual_table_name} " drop_actual_table_sql="drop table if exists ${actual_table_name} " echo "create actual table sql: ${create_actual_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${drop_actual_table_sql} " beeline -n ${hive_user} -u ${hive_server} -e "${create_actual_table_sql} "
azkaban 任务 定义任务流程 需要重建7张表,因此定义成 父任务 -> 7个子任务
1 2 3 4 5 6 type =commandcommand =echo "es to hive success!" dependencies =table1,table2,table3,table4,table5,table6,table7
因为前面具体外表的创建流程 已经写在脚本中了,所以子任务这里直接调用 create_hive_to_es_table.sh 就行
1 2 3 4 5 6 7 8 9 10 11 12 13 type =flowjob.name =table1flow.name =ES_TO_HIVEindex.name =es索引名hive_db =目标 hive 库名type =commandcommand =sh create_hive_to_es_table.sh ${es.address} ${index.name} ${hive.server} ${hive.user} ${hive.db}
总结 基于目前的资料搜索 这种方案应该是 es数据同步到 hive 比较通用的。但是确实不适合大批量数据同步的场景,也不能直接同步增量数据
想同步增量数据的话 应该需要从数据源头入手了,比如 es 数据是来自 kafka 的,那么需要通过类似 canal 的服务来同步增量数据,架构和这里说到的远远不同