背景 在大数据实时同步的场景中,一个常见场景是从 mysql OLTP 数据库同步到 doris OLAP 数据库,前者属于业务系统通常情况下使用的数据库,后者提供给业务的同事进行高并发、大批量的数据计算和分析
今年年初时,我们这里的部署方案还是 flink cdc 2.4 + flink sql,任务在 yarn 上运行,架构如下:
这个架构在需要同步的表不多的情况下,维护起来不算麻烦,不过也有两个实践中很关键的问题: 通过 SQL 提交后每张表都会单独拉取 binlog,造成资源浪费,以及频繁拉取造成 MySQL 压力较大;其次是不支持表结构同步,比如 mysql 新增了字段,虽然不会直接导致同步任务报错(修改字段类型除外),但是一旦业务说需要用到这个字段,那我就需要在 flink sql 中加上这个字段,下游的 doris 手动执行 alter 去添加字段,再把 flink 任务重启,这些操作,比创建一个新的任务还要麻烦
于是在看到 flink 3.0 对 schema evolution 和批量表同步支持之后,我也是第一时间去尝鲜,根据官方文档: Streaming ELT 同步 MySQL 到 Doris ,在本地运行了整套 mysql 、 doris 、 flink 以及 flink cdc 任务。本文总结了整套服务部署过程,也提示一些遇到的小坑,希望助大家丝滑体验 flink cdc
flink cdc 3.0 之后的任务架构如下:
flink cdc 功能速览 flink cdc  是一个支持实时同步的数据流框架,相比其他第三方实现的实时同步框架,如 seatunnel、chunjun 等,它的优势在于 flink 社区原生支持,版本迭代和新功能发布更快,不足点在于缺少任务管理功能,需要自己做好任务监控和自动恢复机制
3.0 架构:
图片来源-A Glimpse into Flink CDC 3.0 
3.0 之后 cdc 整体的架构就很清晰了,架构中几个比较重要的概念如下:
CDC Client: 新的用于提交任务的客户端封装
Data Pipeline: 整个 flink cdc 数据流称为 pipeline,内部包括必须配置的 Data Source 和 Data Sink,以及可选的 router
Pipeline Connector: 支持 schema evolution 的数据源,目前有 mysql ( debezium  ) 、 paimon ( paimon flink conector  ) 、 kafka ( kafka flink connector  ) 、doris ( doris flink connector  ) 、 starrocks ( starrocks flink connector  ) 和 elasticsearch ( elasticsearch flink connector  )
Source Connector: 包括 pipeline connector 以及其他暂不支持 pipeline 的其他数据源,如 pgsql (debezium) 和 tidb ( tikv client java  ) 等,可直接引用或通过 flink sql 使用
Data Source: 数据读取端封装
Data Sink: 数据写入端封装
Table ID: 标识同步数据源时需要指定的表标识,如 mysql 就是表名,kafka 就是 topic
Transform: 通过 SQL 或者 UDF 注入字段转换的特殊逻辑
Route: 路由,支持设置不同表名映射、多表对单表映射等
环境准备 需要部署的 mysql, doris 和 flink 都可以单点模式运行,因此接下来的所有部署操作都在一个 centos8 容器内执行
1 2 3 4 #  3306: mysql #  9030: doris fe #  8081: flink web docker run -it -p 3306:3306 -p 9030:9030 -p 8081:8081 -d --name test_flink centos8 
flink local singleton flink 选择 1.18 版本, 本地启动 jobmanager 和 taskmanager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #  下载安装包 wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz tar -xvf flink-1.18.1-bin-scala_2.12.tgz #  设置默认配置 #  web ui port rest.port: 24001 #  web ui bind  address rest.bind-address: 0.0.0.0 # #  启动 jobmanager 和 taskmanager bash /opt/modules/flink/bin/jobmanager.sh start bash /opt/modules/flink/bin/taskmanager.sh start #  运行 wordcount example /opt/modules/flink/bin/flink run /opt/modules/flink/examples/batch/WordCount.jar 
flink cdc 3.0 
3.0 之后的 flink cdc 可以直接通过它自己封装的 Client 提交任务了,因此除了和 2.4 同样的要下载数据源对应的 jar 包外,还需用到官方提供的客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #  从 release 中下载并安装 flink cdc wget https://dlcdn.apache.org/flink/flink-cdc-3.1.1/flink-cdc-3.1.1-bin.tar.gz tar -xzvf flink-cdc-3.1.1-bin.tar.gz mv flink-cdc-3.1.1 flink-cdc #  注意: lib 下面只有 cdc 本身的依赖,数据库相关的依赖包需要自行下载 #  运行创建 flink jdbc connector 的任务需要以下依赖 wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar -O lib/mysql-connector-java-8.0.30.jar wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.18/flink-connector-jdbc-3.1.2-1.18.jar -O lib/flink-connector-jdbc-3.1.2-1.18.jar #  运行从 mysql 同步到 doris 的 CDC 任务还需要以下依赖 wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/flink-cdc-pipeline-connector-mysql-3.1.1.jar -O lib/flink-cdc-pipeline-connector-mysql-3.1.1.jar wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.1/flink-cdc-pipeline-connector-doris-3.1.1.jar -O lib/flink-cdc-pipeline-connector-doris-3.1.1.jar 
注意: flink cdc 的版本要和 connector 的版本对应,否则可能会在提交任务时报 Cannot find factory with identifier “mysql” in the classpath 
 
mysql 这里我们部署 8.0.39 版本,作为数据源
注: 一开始部署 8.4.3 最新版本 mysql ,发现读取 binlog 信息的 SHOW MASTER STATUS 竟然执行不了,替换成了 SHOW BINARY LOG STATUS ,而 flink cdc 还没有做适配,因此只能把 mysql 回退重装了。看来什么都用社区最新版还是有一些踩坑的风险的😂
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #  从官网下载安装包,注意下载适配系统架构的版本 #  https://dev.mysql.com/downloads/mysql #  注意: 系统若已有 mariadb 相关的安装包,安装会报冲突,需要先卸载 mariadb 相关包 #  下载 arm 的 mysql 官方安装包 wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-common-8.0.39-1.el8.aarch64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-community-client-plugins-8.0.39-1.el8.aarch64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-community-libs-8.0.39-1.el8.aarch64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-community-client-8.0.39-1.el8.aarch64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-community-icu-data-files-8.0.39-1.el8.aarch64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-community-server-8.0.39-1.el8.aarch64.rpm #  x86 wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-common-8.0.39-1.el8.x86_64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-client-plugins-8.0.39-1.el8.x86_64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-libs-8.0.39-1.el8.x86_64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-client-8.0.39-1.el8.x86_64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-icu-data-files-8.0.39-1.el8.x86_64.rpm wget https://dev.mysql.com/get/Downloads/MySQL-8.4/mysql-community-server-8.0.39-1.el8.x86_64.rpm #  安装 rpm -ivh mysql-community-common-8.0.39-1.el8.aarch64.rpm rpm -ivh mysql-community-client-plugins-8.0.39-1.el8.aarch64.rpm rpm -ivh mysql-community-libs-8.0.39-1.el8.aarch64.rpm rpm -ivh mysql-community-client-8.0.39-1.el8.aarch64.rpm rpm -ivh mysql-community-icu-data-files-8.0.39-1.el8.aarch64.rpm rpm -ivh mysql-community-server-8.0.39-1.el8.aarch64.rpm #  初始化、首次启动 mysql mysqld --initialize mysqld --user=root #  从 /var/log /mysqld.log 中获得 mysql 初始密码 cat /var/log/mysqld.log | grep "A temporary password" #  从另一个窗口登录 mysql mysql -h127.0.0.1 -P3306 -uroot -p{mysql初始密码} #  修改 root 密码 SET PASSWORD = '{mysql密码}'; 
doris doris  是一个具备高性能实时计算和查询性能的OLAP数据库,经常用于从业务数据库实时同步数据后,进行海量数据分析、高并发即席查询、日志分析、用户画像分析等。在企业内最常用的场景是制作各种业务报表,提供给数据分析师和管理者。相关技术主要有存储模型、物化视图、向量化引擎、CBO优化等
doris 的基本组件为 fe 和 be,fe 负责管理集群,接收和处理用户请求,分析并得到SQL执行计划,be 负责数据存储和SQL的具体计算逻辑执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #  doris 3.0 版本需要用 jdk 17 #  adoptium 版本的开源 openjdk 可以从这里下载 #  https://github.com/adoptium/temurin17-binaries/releases wget https://github.com/adoptium/temurin17-binaries/releases/download/jdk-17.0.9%2B9/OpenJDK17U-jdk_aarch64_linux_hotspot_17.0.9_9.tar.gz #  设置环境变量 export JAVA_HOME=/opt/modules/java/jdk-17.0.9+9 #  下载 doris wget https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-3.0.2-bin-arm64.tar.gz #  修改配置 #  fe: http_port: frontend 界面地址,默认 9030 #  fe: meta_dir: frontend 元数据存储路径 vim /opt/modules/doris/fe/conf/fe.conf meta_dir = /opt/modules/doris/fe/meta #  be: storage_root_path: 存储路径 vim /opt/modules/doris/be/conf/be.conf storage_root_path = /opt/modules/doris/be/data #  启动 fe mkdir -p /opt/modules/doris/fe/meta /opt/modules/doris/fe/bin/start_fe.sh --daemon #  查看启动日志 tail -f /opt/modules/doris/fe/log/fe.log #  启动 be mkdir -p /opt/modules/doris/be/data /opt/modules/doris/be/bin/start_be.sh --daemon #  查看启动日志 tail -f /opt/modules/doris/be/log/be.INFO #  启动后,首次登录 fe 不需要密码,设置 root 密码并添加 be mysql -h127.0.0.1 -P9030 -uroot SET PASSWORD FOR 'root' = PASSWORD('{Doris密码}'); ALTER SYSTEM ADD BACKEND "localhost:9050"; 
注: 容器内启动可能会遇到报错 Please set vm.max_map_count to be 2000000 under root using ‘sysctl -w vm.max_map_count=2000000’ ,为了测试可以先注释掉这段判断逻辑,在 /opt/modules/doris/be/bin/start_be.sh 中
 
任务配置与运行 前面的环境安装都是为了这里的任务运行做铺垫
mysql 模拟表 在 mysql 创建一张“业务”表
1 2 3 4 5 6 7 8 9 10 CREATE  TABLE  `d_music`.`album` (  `id` varchar (255 ) NOT  NULL ,   `create_time` datetime DEFAULT  CURRENT_TIMESTAMP ,   `name` varchar (255 ) NOT  NULL  DEFAULT  '' ,   `album_artist` varchar (255 ) NOT  NULL  DEFAULT  '' ,   `year ` int  NOT  NULL  DEFAULT  '0' ,   `song_count` int  NOT  NULL  DEFAULT  '0'    PRIMARY  KEY (`id`) ) ENGINE= InnoDB DEFAULT  CHARSET= utf8mb4; 
通过 dagagen 实时向 mysql 写入数据 flink connector  数据源有各种连接器,其中 datagen  是可以随机生成数据的 connector
针对刚才 mysql 创建的测试表,我们创建一个定时写入数据的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 CREATE  TEMPORARY TABLE  album_datagen (    id STRING,     name STRING,     album_artist STRING,     `year ` INT ,     song_count INT  ) WITH  (     'connector'  =  'datagen' ,     'rows-per-second'  =  '3'  ); CREATE  TABLE  album_mysql (    id STRING,     name STRING,     album_artist STRING,     `year ` INT ,     song_count INT ,   PRIMARY  KEY (id) NOT  ENFORCED ) WITH  (    'connector'  =  'jdbc' ,    'url'  =  'jdbc:mysql://localhost:3306/d_music' ,    'username'  =  'root' ,    'password'  =  '{mysql密码}' ,    'table-name'  =  'album'  ); insert  into  `album_mysql` select  *  from  album_datagen;
注: 这里 flink sql 的表没有设置 create_time 字段,为了让其能使用 mysql 表的默认值
 
注: rows-per-second 配置为datagen每秒产生的数据量,默认为1w
 
提交 flink sql:
1 /opt/modules/flink/bin/sql-client.sh -f /opt/modules/flink/album.sql 
提交后可以在 flink 界面看到 datagen 任务正常运行,并在 mysql 中查询到每秒写入的随机数据
1 mysql -hlocalhost -P3306 -uroot -p'{mysql密码}'  -e "select * from d_music.artist order by create_time desc limit 10"  
flink cdc 任务配置与提交 按照官方教程,修改 mysql 地址和 doris 地址、需要同步的库表即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 source:   type:  mysql    hostname:  localhost    port:  3306    username:  root    password:  {mysql密码 }   tables:  d_music.album    scan.incremental.snapshot.chunk.size:  1000    scan.snapshot.fetch.size:  1000    scan.startup.mode:  initial    server-time-zone:  UTC+8  sink:   type:  doris    fenodes:  localhost:8030    username:  root    password:  {doris密码 }   table.create.properties.light_schema_change:  true    table.create.properties.replication_num:  1  pipeline:   name:  sr_cdc_sink_doris_example    parallelism:  1  
注: 若任务运行的本地时区和 mysql 服务器时区不一致,server-time-zone 需要指定为本地时区
 
注: fenodes 对应 doris fe 的 http_port
 
注: scan.startup.mode: 类似 canal 的同步模式配置,initial 表示从头同步所有数据,还支持从指定时间戳同步
 
然后我们提交这个任务, 正常等待个几秒,在 doris 就能查到从 mysql 同步过来的数据了
1 2 3 4 #  注意需要显式声明 FLINK_HOME 即 flink 安装路径,在 flink-cdc.sh 脚本中用到 export FLINK_HOME=/opt/modules/flink /opt/modules/flink-cdc/bin/flink-cdc.sh /opt/modules/flink-cdc/album.yaml 
1 2 mysql - h127.0 .0 .1  - P9030 - uroot - p{Doris密码} - e "select * from d_music.album order by create_time desc limit 10" 
我们还能发现,Doris 这边是不需要提前建表的,flink cdc 已经帮我们建好了一张主键表,其中副本数之类的基本配置根据cdc 任务中的 table.create.properties.replication_num 来设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 show  create  table  d_music.album;CREATE  TABLE  `album` (  `id` varchar (765 ) NULL ,   `create_time` datetime NULL ,   `name` varchar (765 ) NULL ,   `album_artist` varchar (765 ) NULL ,   `year ` int  NULL ,   `song_count` int  NULL  ) ENGINE= OLAP UNIQUE  KEY(`id`)DISTRIBUTED BY  HASH(`id`) BUCKETS AUTO PROPERTIES ( "replication_allocation" =  "tag.location.default: 1", "min_load_replica_num" =  "-1", "is_being_synced" =  "false", "storage_medium" =  "hdd", "storage_format" =  "V2", "inverted_index_storage_format" =  "V2", "enable_unique_key_merge_on_write" =  "true", "light_schema_change" =  "true", "disable_auto_compaction" =  "false", "enable_single_replica_compaction" =  "false", "group_commit_interval_ms" =  "10000", "group_commit_data_bytes" =  "134217728", "enable_mow_light_delete" =  "false" ); 
当然,除了主键模型,doris 还支持聚合模型(Aggregate)和明细模型(Duplicate)表,参考 data models 。但是 flink cdc 对于同步源表中主键的表,会默认创建主键模型表。因此有创建其他模型表的话就需要提前建了
关于 flink cdc 实现提前建表和 schema evolution 的实现可以参考对各个数据源的 MetadataApplier 实现,如 DorisMetadataApplier 
schema evolution 铺垫了这么久,终于来到了测试表结构变更同步的环节
1 2 3 4 5 6 7 8 9 10 11 #  添加字段 mysql -hlocalhost -P3306 -uroot -p'{mysql密码}' -e "ALTER TABLE d_music.album ADD COLUMN duration DOUBLE AFTER song_count" #  修改后确认 doris 表结构是否更新 mysql -h127.0.0.1 -P9030 -uroot -p{doris密码} -e "show create table d_music.album" #  修改 mysql 字段名称 mysql -hlocalhost -P3306 -uroot -p'{mysql密码}' -e "ALTER TABLE d_music.album RENAME COLUMN duration TO duration_total" #  删除 mysql 字段 mysql -hlocalhost -P3306 -uroot -p'{mysql密码}' -e "ALTER TABLE d_music.album DROP COLUMN duration_total" 
从 flink taskmanager 日志中我们可以看到表结构变更时,会触发 SchemaChangeEvent 事件,后续的其他数据写入或变更操作都必须等待当前的 schema change 完成后才会继续进行,以保证数据一致性
注: 表字段类型变更的同步在 3.2.0  版本之后才支持,本文所用的 3.1 不支持
 
注: 不同数据源对字段变更的适配完善程度有所不同,如 Doris 在 cdc 最新版支持字段类型变更,但 Starrocks  还不支持
 
整库同步 我们在 mysql 中再创建两张表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 create  table  if not  exists  d_music.artist(     `id` varchar (255 ) NOT  NULL ,     `create_time` datetime DEFAULT  CURRENT_TIMESTAMP ,     `name` varchar (255 ) NOT  NULL  DEFAULT  '' ,     `album_count` int  NOT  NULL  DEFAULT  '0' ,     PRIMARY  KEY (`id`) )ENGINE= InnoDB DEFAULT  CHARSET= utf8mb4; create  table  d_music.playlist(     `id` varchar (255 ) NOT  NULL ,     `create_time` datetime DEFAULT  CURRENT_TIMESTAMP ,     `name` varchar (255 ) NOT  NULL  DEFAULT  '' ,     `comment` varchar (255 ) NOT  NULL  DEFAULT  '' ,     `song_count` int  NOT  NULL  DEFAULT  '0' ,     PRIMARY  KEY (`id`) )ENGINE= InnoDB DEFAULT  CHARSET= utf8mb4; 
对应的,我们在 flink 也再提交这两张新表的 datagen
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 CREATE  TEMPORARY TABLE  artist_datagen (    id STRING,     name STRING,     album_count INT  ) WITH  (     'connector'  =  'datagen' ,     'rows-per-second'  =  '3'  ); CREATE  TEMPORARY TABLE  playlist_datagen (    id STRING,     name STRING,     `comment` STRING,     song_count INT  ) WITH  (     'connector'  =  'datagen' ,     'rows-per-second'  =  '3'  ); CREATE  TABLE  artist_mysql (    id STRING,     name STRING,     album_count INT ,   PRIMARY  KEY (id) NOT  ENFORCED ) WITH  (    'connector'  =  'jdbc' ,    'url'  =  'jdbc:mysql://localhost:3306/d_music' ,    'username'  =  'root' ,    'password'  =  '{mysql密码}' ,    'table-name'  =  'artist'  ); CREATE  TABLE  playlist_mysql (    id STRING,     name STRING,     `comment` STRING,     song_count INT ,   PRIMARY  KEY (id) NOT  ENFORCED ) WITH  (    'connector'  =  'jdbc' ,    'url'  =  'jdbc:mysql://localhost:3306/d_music' ,    'username'  =  'root' ,    'password'  =  '{mysql密码}' ,    'table-name'  =  'playlist'  ); insert  into  `artist_mysql` select  *  from  artist_datagen;insert  into  `playlist_mysql` select  *  from  playlist_datagen;
最后我们再提交同步这三张表 flink cdc ,相比刚才同步一张表的配置只需要修改 tables,按照正则表达式配置即可。从 MySqlDataSourceFactory  可以找到对 tables 配置的解析逻辑
提交所有任务后,flink 上将会有四个任务,三个 datagen 对应三张表,而 flink cdc 对应的所有表都在一个任务中同步
另外,我们还可以注意 mysql 的连接数: 和 2.4 版本对比,这次一整个任务只需要一个连接,其读取 binlog 会自动解析给对应表的同步,而不需要每张表都创建一个连接
1 2 mysql - hlocalhost - P3306 - uroot - p'{mysql密码}'  - e "show processlist" |  grep "Binlog Dump" 
但注意: 有哪些表需要同步,是任务在创建的时候就解析的,后续源库再添加表,不会让现有的任务把这个新表也加入同步
总结 3.0 之后的 cdc 在功能上更加成熟,不仅是架构更清楚,通过 yaml 配置 cdc 任务的方式也更简化,和 flink 本身更统一了
不过从整个过程也能看到,在生产环境面对大量业务表需要同步的时候,先不说任务怎么做好监控,本身任务要做到高可用就必须把 checkpoint 打开,另外,不是所有 mysql 的表结构变更都能成功同步到下游,面对无法同步的特殊情况,还需要手动设置指定同步时间点恢复任务,一些繁琐的维护过程还是无法避免
后续有机会再探索一下这些优化的思路吧