Flink尝鲜Apache Paimon入门案例
本文为大家讲解如何使用Flink完成Paimon官方的入门案例,建议大家收藏(对英文文档有恐惧感)。
本文会用到Flink环境,还不清楚如何部署Flink的同学可以查看:
《基于Flink CDC实现Mysql实时同步到Doris系列教程一:Flink环境的部署》
本文演示使用的Ubuntu环境。
下载并解压Flink
下载最新版本的Flink
wget https://dlcdn.apache.org/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz
解压后会得到flink-1.19.1
文件夹,并cd进入该目录。后续所有的操作都是在该目录下执行。
tar -xzf flink-1.19.1-bin-scala_2.12.tgz
cd flink-1.19.1
下载Paimon的jar包
使用Paimon需要用到paimon-flink.jar
和flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
jar包,下载与Flink相对应的版本,并复制到flink-1.19.1/lib
目录下。
wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.19/0.9-SNAPSHOT/paimon-flink-1.19-0.9-20240628.002224-23.jar
mv paimon-flink-1.19-0.9-20240628.002224-23.jar lib/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
mv flink-shaded-hadoop-2-uber-2.8.3-10.0.jar lib/
启动Flink
在启动Flink之前,一定要注意,需要修改一个配置,不然后续会报错(非常打击士气)。
在flink-1.19.1/conf/conf.yaml
中找到taskmanager
,修改numberOfTaskSlots
配置为2,numberOfTaskSlots表示可以同时运行的任务数,也可以修改的更大一点。
启动Flink环境
./bin/start-cluster.sh
访问Flink Web UIhttp://localhost:8081
。
启动Flink Sql客户端
./bin/sql-client.sh
出现上图则代表成功了。
创建Catalog和表
create catalog my_catalog with (
'type'='paimon',
'warehouse'='file:/home/liuyq/paimon'
);
注意,此处file:/home/liuyq/paimon
为本地的一个绝对路径,会自动创建paimon
文件夹。
可以看到,paimon文件夹下会有default.db
,代表我们创建的my_catalog下有一个默认的default数据库。
切换到我们要使用的my_catalog下
use catalog my_catalog;
创建word_count
表
create table word_count (
word string primary key not enforced,
cnt bigint
);
此时,我们观察一下文件夹,发现多出来word_count文件夹,对应的正好就是我们刚刚创建好的表。
我们再看一下文件夹内的文件
写入数据
创建一个临时表,用于模拟生成单词
create temporary table word_table (
word string
) with (
'connector' = 'datagen',
'fields.word.length' = '1'
);
datagen
连接器用于模拟生成数据,该表中只有一个字符串字段word
,并且使用fields.word.length
指定生成的数据长度为1,注意此处1要带引号,不然会报错。生成的值如:a、b、c、1、2
等。
设置checkpoint
时长并写入数据
set 'execution.checkpointing.interval' = '10 s';
insert into word_count select word, count(*) from word_table group by word;
insert into
操作相当于给flink中提交了一个实时计算的任务。该任务会把10s内word_table
新产生的word数据聚合计算个数后把结果存入到word_count
表中。
OLAP查询
设置查询结果的样式为tableau
set 'sql-client.execution.result-mode' = 'tableau';
从流处理模式切换到批处理模式
reset 'execution.checkpointing.interval';
set 'execution.runtime-mode' = 'batch';
查询word_count
表中的数据
select * from word_count;
如何你卡在这里没有任何反应,说明你忘记修改numberOfTaskSlots
配置了,只需要退出后修改,再重复上面的操作即可。
批处理模式下的一次查询,也相当于给Flink中提交了一个任务,只不过该任务执行一次完成后就结束了。
流式查询
切换到流模式下
set 'execution.runtime-mode' = 'streaming';
实时查询
select `interval`, count(*) as interval_cnt from (select cnt / 10000 as `interval` from word_count) group by `interval`;
拉到最下面,仔细观察,你会发现数据每个大约10s会有变动。
此时,也相当于在Flink中提交了一个任务
在界面上取消任务,就会看到刚才的实时查询也中断了。
退出
退出SQL客户端
exit;
停止Flink环境
./bin/stop-cluster.sh
至此,一个我们完整体验了在Flink中使用Paimon。
进阶
现在,如果问你Paimon是什么?你心中会有怎样的答案呢。我们再看一下/paimon/default.db/word_count
下面有什么
这时我们是不是可以说Paimon是具有严格结构的一组文件夹和文件的组合,这些文件会描述出数据库、数据表、数据等,用这种方式表达数据库也正是数据湖。
有一点点绕,希望大家可以体会到。想要学习更多Paimon相关的内容,可以关注:遇码,回复Paimon获取官方文档。