streaming flink 数据事件快速入门
前提条件
配置或安装数据库的 cdc
- MySQL:开启 binlog
- postgresql: 安装 Decoderbufs 插件,以支持Debezium
- Oracle :配置开启 CDC——》LogMiner
- SQLService:开启 cdc
非 root 账户需设置对 cdc 操作权限
- MySQL:需要的权限 Replication client、REPLICATION SLAVE
添加 flink 生态和依赖服务(建议都使用 kube 部署)
- kafkasimple 集群
- flinks 集群
- dlink 服务
- 实时计算平台(必须在 dlink 服务之后启动)
- 事件中心 (配置好 kafka 类型)
事件中心设置 kafka 中间件
注:配置好后一定要重启
禁用 redis
配置 event-trigger-cdc
以下以 MySQL 同步《企业工作流》的 sa_task 表为例进行讲解。
核心技术依赖到 事件中心 的 eventBridge
开发数据流事件接收接口
在接口中会收到如下格式的数据
{
"datacontenttype": "application/json",
"data": {
"op": "UPDATE_AFTER", // UPDATE_BEFORE、UPDATE_AFTER、DELETE、INSERT
"data": {
"before": {
"sourceVirtualTableName": "sa_task", // 表名
"sVars": "dsfsdf",
"sSubPI": "sdfsd",
"sourceVirtualDatabaseName": "202407011521170meeovvo", // 库名
"sSubProcess": "sdfsd",
"sourceVirtualSchemaName": "202407011521170meeovvo", //模式名
"sID": "dfsf"
},
"after": {
"sProcessVersion": "该如何",
"sourceVirtualTableName": "sa_task",
"sVars": "dsfsdf",
"sSubPI": "sdfsd",
"sourceVirtualDatabaseName": "202407011521170meeovvo",
"sSubProcess": "sdfsd",
"sourceVirtualSchemaName": "202407011521170meeovvo",
"sID": "dfsf"
}
}
},
"subject": "cdc--f--wf--202407011521170meeovvo--sa_task--uid--f--CACD5FE6C430000123A053801610152A",
"specversion": "v1",
"id": "876364e605f1464e9b7fbf99154c9b09",
"source": "event-trigger-cdc",
"time": "2024-07-01 18:14:28",
"type": "cdc:f:wf:202407011521170meeovvo:sa_task:uid:f"
}
接口中必须返回 ControllerResult.succeed,否则事件中心认为触发不成功,会再次触发
接口示例如下
import com.justep.clients.eventing.vo.ControllerResult;
public String subscribe(String bodyParams) throws Exception {
logger.info("cdc 事件内容--"+bodyParams);
ControllerResult<Object> controllerResult;
boolean condition = true;
if (condition) {
controllerResult = ControllerResult.succeed("成功...");
}else {
// 如果用户不想使用高可靠策略,可以使用 ControllerResult.succeed("失败...");
// 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
controllerResult = ControllerResult.failed("失败...");
}
String resultString = JSON.toJSONString(controllerResult);
return resultString;
}
定义 event trigger cdc
我们以 cdc:f:wf:202407011521170meeovvo:sa_task:uid:f
事件定义为例进行反推原由
- 第一占位 cdc 是固定格式意为cdc数据事件类型
- 第二占位 f 表示不使用模型资源映射,直接使用物理名,如:物理库名、物理表名、物理字段
- 第三占位 wf 表示服务名,也就是流程服务的唯一标识
- 第四占位 202407011521170meeovvo 是数据库名
- 第五占位 sa_task 是要同步数据的表名
- 第六占位 uid 表示 更新、插入、删除 等操作都要进行监听
- 第七占位 f 表示数据库字段不使用逻辑字段映射
添加 trigger
从市场下载“定时和事件设置”组件
打开全局设置中的事件派发设置
新增 trigger
- 事件 event :输入上一节讲解的事件
cdc:f:wf:202407011521170meeovvo:sa_task:uid:f
- 目标 target:选择前面讲解的事件接收接口
配置信息保存到 UI2/pcx/timerAndTrigger.serviceMetaInfo.json 文件中
发布应用,修改企业工作流 sa_task 表中的数据,会调用到事件接收接口
问题排查及注意事项
MySQL 必须要开启 binlog
开 MySQL conf 配置加入以下配置
log-bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
expire-logs-days=7
max-binlog-size=500M
数据库普通用户要设置 cdc 权限(root 账户除外)
MySQL 数据库加入 binlog 读权限:需要两个权限 Replication client、Replication slave
以设置企业工作流 system 数据库为例说明
获取数据库信息
在应用服务管理中,查看企业工作流-资源-数据库管理-system 数据库的类型、数据库地址、用户名和数据库名称
获取 root 密码
本例企业工作流使用平台分配的 mysql 数据库,如果使用外部数据库,跳过下面的步骤,直接看授权 SQL
用 system 登录控制台,打开“中间件管理-中间件系统管理-数据库实例管理”,找到主机=数据库地址的行,记录 root 密码
打开“运维管理-数据库管理”
服务器输入数据库地址、用户名输入 root、密码输入 root 密码、数据库输入数据库名称,登录数据库
点“权限”
点用户名右侧的“编辑”按钮
将 .*
改为 *.*
,选中 Replication client、Replicationslave 两个权限
点击下方的保存按钮,完成对用户的授权
授权 SQL
不使用权限界面授权,也可以执行下面的 SQL 授权
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'user'@'%';
flink 中 /data/storageDir/default 目录无操作权限问题
flink 中出现如下错误日志:
处理方式,需对/data/storageDir文件夹加权限
1. 先查询到/data/storageDir挂载的nas是谁
flink@flink-jobmanager-6b686bbbf7-kj52l:~$ df -h
Filesystem Size Used Avail Use% Mounted on
overlay 97G 36G 57G 39% /
192.168.20.2:/nas/storage/flinkschensc6-clusterflink 1.5T 927G 475G 67% /data/storageDir
2. 再去nas中找到对应的文件夹给权限
chmod -R 777 /nas/storage/flinkschensc6-clusterflink
dlink 中台监控界面
用户名密码:admin/admin
flink dashbord 界面
kafka集群常用的几个操作指令
# 创建主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic topic:name:a --replication-factor 1 --partitions 1
# 列出主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 删除主题
kafka-topics.sh --delete --topic service--main--massage --bootstrap-server 127.0.0.1:9092
# 查看主题详情
kafka-topics.sh --describe --topic service--main--massage --bootstrap-server 127.0.0.1:9092
# 列出消费分组
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# 订阅某主题,进行实时打印消息
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "sink_log_kafka" --from-beginning --property print.key=true
# 向某主题发送消息
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic "sink_log_kafka" xxxx
# 查看主题配置详细
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic topic-demo01 --all --describe
# 设置某主题的配置
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic topic-demo01 --alter --add-config max.message.bytes=10485760
# 批量删除 eventing 前缀的主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list |grep '^eventing' | while read line; do
echo "删除主题: $line"
kafka-topics.sh --delete --topic $line --bootstrap-server 127.0.0.1:9092
done