Flink CDC提交任务并验证
本文是本系列教程的第五部分:Flink CDC提交任务并验证。
前言
本文也是该系列教程的最后一篇,期望本系列教程可以实实在在的帮助到你。
本文将介绍如何把YAML任务文件使用flink-cdc.sh
提交到Flink环境中,最后我们再验证doris数据库中的数据,查看是否实时可查询。
数据准备
在提交任务之前,我们还需要准备好测试库和测试数据。
在第二篇中我们已经介绍了如何部署Mysql和Doris的环境,本文不再赘述。此处确保容器已经处于可用状态。
在Mysql中准备数据
- 进入Mysql容器
使用命令:
docker-compose exec mysql mysql -uroot -p123456
进入到容器内部
另外也可以用数据库管理工具连接数据库
- 创建数据库app_db和表orders, products, shipments,并插入数据
依次执行下面的SQL语句,完成数据库的创建和测试数据的插入
-- 创建数据库
CREATE DATABASE app_db;
USE app_db;
-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入数据
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
至此,Mysql的准备工作就完成了。
在Doris中创建数据库
Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
- 进入 Doris Web UI。
访问http://localhost:8030/
,默认的用户名为 root,默认密码为空。
- 通过 Web UI 创建 app_db 数据库
create database app_db;
至此,Doris的准备工作就完成了。
通过Flink CDC CLI提交任务
同步任务依赖连接器,提交任务前确保flink-cdc-3.1.0/lib
目录下有如下连接器:
- flink-cdc-pipeline-connector-mysql-3.1.0.jar
- flink-cdc-pipeline-connector-doris-3.1.0.jar
如果没有则下载,下载地址为:
# 下载mysql连接器
wget https://repo1.maven.org/maven2/org/apache/flink/fli
nk-cdc-pipeline-connector-mysql/3.1.0/flink-cdc-pipeline-connector-mysql-3.1.0.jar
# 下载doris连接器
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.0/flink-cdc-pipeline-connector-doris-3.1.0.jar
另外,还需要确保Flink的flink-1.19.0/lib
下包含MySQL驱动,
如果没有则需要下载
# 下载MySQL驱动
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
提交任务
./bin/flink-cdc.sh mysql-to-doris.yaml
直接执行提交任务的命令,你是否又一次碰到了这个错误。别着急,你只是忘记了告诉Flink CDC环境你的Flink环境在哪里。
明确了问题,调整以下命令
./bin/flink-cdc.sh mysql-to-doris.yaml --flink-home /home/liuyq/flink-1.19.0
如果提交任务出现了如下图错误
请想一想,确保你的Flink服务已经启动了
验证
访问Flink Web UIhttp://localhost:8081/
,如果Running Jobs中出现下图中任务,则说明我们的实时数据同步任务已经成功上线了。
接下来,你就可以畅玩了,试一试在Mysql的表中插入一些数据,然后再以非常快的速度在Doris中查一下数据吧,看看是你的动作快,还是数据同步的快。
结语
本系列教程到这里就结束了,用非常有限的图文带领大家体验了Flink CDC的能力。但是对于我们个人的学习而言则才刚刚开始。
在教程陆续发布的这段时间,也非常感谢大家的各种提问,我尽量抽出时间帮大家解决问题。但是也希望大家可以理解,大家的环境不同、操作不同,我也没有办法一一帮助大家解决所有的问题,期望大家遇到问题不要着急,慢慢来,解决问题的过程也是深度学习的过程。
最后再次强调——多动手。
想要了解更多Flink CDC的内容,可以关注:遇码,回复flinkcdc,获取官方文档。