streaming flink 数据事件快速入门

前提条件

配置或安装数据库的 cdc

  • MySQL:开启 binlog
  • postgresql: 安装 Decoderbufs 插件,以支持Debezium
  • Oracle :配置开启 CDC——》LogMiner
  • SQLService:开启 cdc

非 root 账户需设置对 cdc 操作权限

  • MySQL:需要的权限 Replication client、REPLICATION SLAVE
  1. kafkasimple 集群
  2. flinks 集群
  3. dlink 服务
  4. 实时计算平台(必须在 dlink 服务之后启动)
  5. 事件中心 (配置好 kafka 类型)

事件中心设置 kafka 中间件

注:配置好后一定要重启

image-20240701152744293

image-20240701152932717

禁用 redis

1727336097074

配置 event-trigger-cdc

以下以 MySQL 同步《企业工作流》的 sa_task 表为例进行讲解。

核心技术依赖到 事件中心 的 eventBridge

开发数据流事件接收接口

在接口中会收到如下格式的数据

{
    "datacontenttype": "application/json",
    "data": {
        "op": "UPDATE_AFTER", // UPDATE_BEFORE、UPDATE_AFTER、DELETE、INSERT
        "data": {
            "before": {
                "sourceVirtualTableName": "sa_task", // 表名
                "sVars": "dsfsdf",
                "sSubPI": "sdfsd",
                "sourceVirtualDatabaseName": "202407011521170meeovvo", // 库名
                "sSubProcess": "sdfsd",
                "sourceVirtualSchemaName": "202407011521170meeovvo", //模式名
                "sID": "dfsf"
            },
            "after": {
                "sProcessVersion": "该如何",
                "sourceVirtualTableName": "sa_task",
                "sVars": "dsfsdf",
                "sSubPI": "sdfsd",
                "sourceVirtualDatabaseName": "202407011521170meeovvo",
                "sSubProcess": "sdfsd",
                "sourceVirtualSchemaName": "202407011521170meeovvo",
                "sID": "dfsf"
            }
        }
    },
    "subject": "cdc--f--wf--202407011521170meeovvo--sa_task--uid--f--CACD5FE6C430000123A053801610152A",
    "specversion": "v1",
    "id": "876364e605f1464e9b7fbf99154c9b09",
    "source": "event-trigger-cdc",
    "time": "2024-07-01 18:14:28",
    "type": "cdc:f:wf:202407011521170meeovvo:sa_task:uid:f"
}

接口中必须返回 ControllerResult.succeed,否则事件中心认为触发不成功,会再次触发

接口示例如下

image-20240701183107898

import com.justep.clients.eventing.vo.ControllerResult;

public String subscribe(String bodyParams) throws Exception {
    logger.info("cdc 事件内容--"+bodyParams);
    ControllerResult<Object> controllerResult;
    boolean condition = true;
    if (condition) {
        controllerResult = ControllerResult.succeed("成功...");
    }else {
        // 如果用户不想使用高可靠策略,可以使用  ControllerResult.succeed("失败...");
        // 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
        controllerResult = ControllerResult.failed("失败...");
    }

    String resultString = JSON.toJSONString(controllerResult);
    return resultString;
}

定义 event trigger cdc

细节详情

我们以 cdc:f:wf:202407011521170meeovvo:sa_task:uid:f 事件定义为例进行反推原由

  • 第一占位 cdc 是固定格式意为cdc数据事件类型
  • 第二占位 f 表示不使用模型资源映射,直接使用物理名,如:物理库名、物理表名、物理字段
  • 第三占位 wf 表示服务名,也就是流程服务的唯一标识

image-20240701184643636

  • 第四占位 202407011521170meeovvo 是数据库名

image-20240701184858633

  • 第五占位 sa_task 是要同步数据的表名

image-20240701185116426

  • 第六占位 uid 表示 更新、插入、删除 等操作都要进行监听
  • 第七占位 f 表示数据库字段不使用逻辑字段映射

添加 trigger

从市场下载“定时和事件设置”组件

image-20240701181857227

打开全局设置中的事件派发设置

image-20240701182012709

新增 trigger

  • 事件 event :输入上一节讲解的事件 cdc:f:wf:202407011521170meeovvo:sa_task:uid:f
  • 目标 target:选择前面讲解的事件接收接口

1727333976496

配置信息保存到 UI2/pcx/timerAndTrigger.serviceMetaInfo.json 文件中

image-20240701182338322

发布应用,修改企业工作流 sa_task 表中的数据,会调用到事件接收接口

问题排查及注意事项

MySQL 必须要开启 binlog

开 MySQL conf 配置加入以下配置

    log-bin = /var/lib/mysql/mysql-bin
    binlog_format = ROW
    expire-logs-days=7
    max-binlog-size=500M

数据库普通用户要设置 cdc 权限(root 账户除外)

MySQL 数据库加入 binlog 读权限:需要两个权限 Replication client、Replication slave

以设置企业工作流 system 数据库为例说明

获取数据库信息

在应用服务管理中,查看企业工作流-资源-数据库管理-system 数据库的类型、数据库地址、用户名和数据库名称

1727331168331

获取 root 密码

本例企业工作流使用平台分配的 mysql 数据库,如果使用外部数据库,跳过下面的步骤,直接看授权 SQL

用 system 登录控制台,打开“中间件管理-中间件系统管理-数据库实例管理”,找到主机=数据库地址的行,记录 root 密码

1727331033638

打开“运维管理-数据库管理”

1727331937611

服务器输入数据库地址、用户名输入 root、密码输入 root 密码、数据库输入数据库名称,登录数据库

1727332025841

点“权限”

1727332044526

点用户名右侧的“编辑”按钮

1727332063446

.* 改为 *.*,选中 Replication client、Replicationslave 两个权限

1727331835906

点击下方的保存按钮,完成对用户的授权

1727331878869

授权 SQL

不使用权限界面授权,也可以执行下面的 SQL 授权

GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'user'@'%';

flink 中出现如下错误日志:

image-20240701164811213

处理方式,需对/data/storageDir文件夹加权限
1. 先查询到/data/storageDir挂载的nas是谁
flink@flink-jobmanager-6b686bbbf7-kj52l:~$ df -h
Filesystem                                            Size  Used Avail Use% Mounted on
overlay                                                97G   36G   57G  39% /
192.168.20.2:/nas/storage/flinkschensc6-clusterflink  1.5T  927G  475G  67% /data/storageDir

2. 再去nas中找到对应的文件夹给权限
chmod -R 777 /nas/storage/flinkschensc6-clusterflink

用户名密码:admin/admin

image-20240701185811236

image-20240701190347158

image-20240701190656519

image-20240701190756761

image-20240701190848329

kafka集群常用的几个操作指令

# 创建主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic topic:name:a --replication-factor 1 --partitions 1
# 列出主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 删除主题
kafka-topics.sh --delete --topic service--main--massage --bootstrap-server 127.0.0.1:9092
# 查看主题详情
kafka-topics.sh --describe  --topic service--main--massage --bootstrap-server 127.0.0.1:9092
# 列出消费分组
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# 订阅某主题,进行实时打印消息
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "sink_log_kafka" --from-beginning --property print.key=true
# 向某主题发送消息
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic "sink_log_kafka" xxxx

# 查看主题配置详细
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic topic-demo01 --all --describe
# 设置某主题的配置
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic topic-demo01 --alter --add-config max.message.bytes=10485760 

# 批量删除 eventing 前缀的主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list |grep '^eventing' | while read line; do
  echo "删除主题: $line"
  kafka-topics.sh --delete --topic $line --bootstrap-server 127.0.0.1:9092
done

results matching ""

    No results matching ""