Skip to content

基于流的数据集成工具——Flink CDC

前面给大家介绍了Doris,如果你正好有实时数仓的建设需求,那不妨尝试一下。

现在又有人问了,怎么把数据库里的数据同步到Doris中呢?之前也给大家介绍推荐过数据集成工具SeaTunnel。

今天则为大家推荐一款开源、免费的基于流的数据集成工具——Flink CDC。

在介绍Flink CDC之前,需要强调一下:

  • Flink CDC基于Flink实现,如果您对Flink还不了解,可以专注我的后续教程。

  • Flink CDC也是一个数据集成工具,和其它数据集成工具如:SeaTunnel、DataX等功能上没有太大区别。

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

  • 端到端的数据集成框架
  • 为数据集成的用户提供了易于构建作业的 API
  • 支持在 Source 和 Sink 中处理多个表
  • 整库同步
  • 具备表结构变更自动同步的能力(Schema Evolution)

如何定义一个数据管道(Pipeline)

Flink CDC可以用一个YAML文件来定义一个Pipeline,以下案例则表达了该Pipeline从Mysql捕获实时变更(binlog),并将他们同步到Apache Doris中:

yaml
source:
  type: mysql # 定义数据源的数据库类型
  hostname: localhost # 数据库连接地址
  port: 3306 # 数据库端口号
  username: root # 数据库账户名
  password: 123456 # 数据库账户密码
  tables: app_db.\.* # 指定需要同步的数据库以及表,此处采用正则表达式的方式实现整库同步配置,多个规则可以用逗号分隔,如:adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5400-5404 # 服务id,详细用法可以参考官方文档

sink:
  type: doris # 定义目标数据源的类型
  fenodes: 127.0.0.1:8030 # Doris集群FE的Http地址
  username: root # Doris集群的用户名
  password: "" # Doris集群的密码
  table.create.properties.light_schema_change: true # Doris表属性,是否使用light_schema_change优化
  table.create.properties.replication_num: 1 # Doris表属性,副本数

pipeline:
  name: Sync MySQL Database to Doris # 数据管道的名称
  parallelism: 2 # 数据管道的并发数

以上YAML文件就定义好了一个Pipeline,后续只需要使用flink-cds.sh提交YAML文件,一个Flink作业就会被编译并部署到指定的Flink集群了。

如何进阶

本文主要介绍了Flink CDC的作用,更详细的操作教程可以关注后续发布的系列教程。如果您想要进一步了解Flink CDC,可以关注:遇码,回复:flinkcdc,获取Flink CDC中文教程。

遇码MeetCoding 开源技术社区