基于流的数据集成工具——Flink CDC
前面给大家介绍了Doris,如果你正好有实时数仓的建设需求,那不妨尝试一下。
现在又有人问了,怎么把数据库里的数据同步到Doris中呢?之前也给大家介绍推荐过数据集成工具SeaTunnel。
今天则为大家推荐一款开源、免费的基于流的数据集成工具——Flink CDC。
在介绍Flink CDC之前,需要强调一下:
Flink CDC基于Flink实现,如果您对Flink还不了解,可以专注我的后续教程。
Flink CDC也是一个数据集成工具,和其它数据集成工具如:SeaTunnel、DataX等功能上没有太大区别。
什么是Flink CDC
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
Flink CDC核心功能
- 端到端的数据集成框架
- 为数据集成的用户提供了易于构建作业的 API
- 支持在 Source 和 Sink 中处理多个表
- 整库同步
- 具备表结构变更自动同步的能力(Schema Evolution)
如何定义一个数据管道(Pipeline)
Flink CDC可以用一个YAML文件来定义一个Pipeline,以下案例则表达了该Pipeline从Mysql捕获实时变更(binlog),并将他们同步到Apache Doris中:
source:
type: mysql # 定义数据源的数据库类型
hostname: localhost # 数据库连接地址
port: 3306 # 数据库端口号
username: root # 数据库账户名
password: 123456 # 数据库账户密码
tables: app_db.\.* # 指定需要同步的数据库以及表,此处采用正则表达式的方式实现整库同步配置,多个规则可以用逗号分隔,如:adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404 # 服务id,详细用法可以参考官方文档
sink:
type: doris # 定义目标数据源的类型
fenodes: 127.0.0.1:8030 # Doris集群FE的Http地址
username: root # Doris集群的用户名
password: "" # Doris集群的密码
table.create.properties.light_schema_change: true # Doris表属性,是否使用light_schema_change优化
table.create.properties.replication_num: 1 # Doris表属性,副本数
pipeline:
name: Sync MySQL Database to Doris # 数据管道的名称
parallelism: 2 # 数据管道的并发数
以上YAML文件就定义好了一个Pipeline,后续只需要使用flink-cds.sh提交YAML文件,一个Flink作业就会被编译并部署到指定的Flink集群了。
如何进阶
本文主要介绍了Flink CDC的作用,更详细的操作教程可以关注后续发布的系列教程。如果您想要进一步了解Flink CDC,可以关注:遇码,回复:flinkcdc,获取Flink CDC中文教程。