streaming flink数据事件概念和架构及原理
flink数据事件主要能力是实时扫描数据库数据的变化,将变化的数据增删改实时事件触发给用户。
典型应用场景:
- 整库同步,将平台a的数据库同步到平台b使用
- 对特定数据清洗或收集:比如对删除的数据进行归档历史
- 用数据事件作为驱动引擎
- SQL缓存服务的缓存刷新引擎
架构
图例说明:
flink集群 https://flink.apache.org/
主要能力是实时扫描数据库数据的变化,形成数据事件。
实时计算平台
- 动态创建flink job:扫描到所有配置的trigger列表得到event——》将所得的event分析得到库表名称——》根据库表名称rds中获取库表详细连接信息——》进而就可以创建flink job
- 数据的逻辑与物理映射:flink cdc从数据库中拿到的数据都是物理字段,而咱们平台的数据模型内部定义的都是逻辑字段,这时候就需要提供一个选项将数据从物理字段映射到逻辑字段
- 主题分发、事件分发:从kafka中拿到的第一手数据,进行逻辑执行后会分发到事件订阅端
事件中心(详情):一个是订阅kafka的flink事件主题,另一个是承接实时计算平台分发事件,然后最终把消息推送到sub端
event-trigger-cdc
用户通过配置 event 事件,可以将数据流事件最终路由到业务接口上
数据事件 cdc trigger 定义
{
"trigger": [{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
"event": "cdc:t:flinkcdcdemo:database:appdef,app:uid:t",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://first-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json"
}]
}
}]
}
上述event cdc:t:flinkcdcdemo:database:appdef,app:uid:t·
语义:cdc:是否使用模型资源映射:服务名:库名:表名集合:操作集合:映射选项 ,所有占位都用冒号隔开
- 第一个占位cdc表示是数据事件类型,用它能与其他普通事件进行区分(必填)。
- 第二个占位是否需要模型资源映射(必填)
- t——表示使用模型资源映射,之后databaseid、tableName都将使用模型资源内容,都是逻辑库名、逻辑表名
- f——表示不使用模型资源映射,直接使用物理名,如:物理库名、物理表名
- 第三个占位是服务名(必填)。
- 第四个占位库名(必填):
- 如果是使用模型资源映射,这里就是《业务数据库》组件的id,如:database、comp.database2、comp.database3
- 如果是不使用模型资源映射,这里就是模式或物理库名
- MySQL 使用库名
- oracle 使用模式名
- sqlService 使用库名
- Postgres 使用模式名
- 各类型数据库cdc开启的注意事项参考文档
- 第五个占位是表名,这里可以是多个,多个用逗号隔开(可选):
- 如果是使用模型资源映射,这里就是逻辑表名
- 如果是不使用模型资源映射,这里就物理表名
- 第六个占位是数据的操作,这里可以是多个。uid语义如下(可选):
- u——表示只处理更新的事件
- i——表示只处理插入的事件
- d——表示只处理删除的事件
- 第七个占位是映射选项,物理字段是否映射为逻辑字段,该占位只针对使用模型资源映射的场景(可选)
- f——表示不映射,使用物理字段 physics
- t——表示映射,使用逻辑字段 logic
注:event底层主题转换逻辑
- cdc:serviceName ——》cdc--serviceName 里面的冒号变为两个中横杠
- appdef,app ——》appdef__app 里面的逗号转为两个下划线
- comp.database2 ——》compdatabase2 里面的comp. 整个转为comp
注:占位必填项和缺省值
- 占位1~4必填
- 占位5开始如果不填(例如:
cdc:t:flinkcdcdemo:database
):代表要停止该event对应的dlink job
业务服务sub端收到的消息内容
//======== 收到的 cloudEvent ========
{
"datacontenttype": "application/json",
"data": {
// 数据事件的具体内容
"data": {
"id": 111,
"name": "hello 110",
"description": "world",
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products",
"sourceVirtualSchemaName": "C_FLINKUSER"
},
"op": "UPDATE_BEFORE" // 一般四种类型 UPDATE_BEFORE、UPDATE_AFTER、DELETE、INSERT
},
"subject": "cdc--f--wf--202407011521170meeovvo--sa_task--uid--f--CACD5FE6C430000123A053801610152A",
"specversion": "v1",
"id": "876364e605f1464e9b7fbf99154c9b09",
"source": "event-trigger-cdc",
"time": "2024-07-01 18:14:28",
"type": "cdc:f:wf:202407011521170meeovvo:sa_task:uid:f"
}
//======== 数据事件的具体内容 ========
// UPDATE_BEFORE
{
"data": {
"id": 111,
"name": "hello 110",
"description": "world",
"sourceVirtualDatabaseName": "mydb", // 库名
"sourceVirtualTableName": "products", // 表名
"sourceVirtualSchemaName": "C_FLINKUSER" // 模式名
},
"op": "UPDATE_BEFORE"
}
// UPDATE_AFTER通用容错
{
"data": {
"id": 111,
"name": "hello 110",
"description": "world",
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products,
"sourceVirtualSchemaName": "C_FLINKUSER"
},
"op": "UPDATE_AFTER"
}
// UPDATE_AFTER定制(按照我们的规范设计表结构和写特定flinkSQL所得到的结果)
{
"data": {
"before": {
"name": "hello 110",
"description": "world",
"id": 111,
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products,
"sourceVirtualSchemaName": "C_FLINKUSER"
},
"after": {
"id": 111,
"name": "hello 110",
"description": "world 221",
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products,
"sourceVirtualSchemaName": "C_FLINKUSER"
}
},
"op": "UPDATE_AFTER"
}
// DELETE
{
"data": {
"id": 111,
"name": "hello 110",
"description": "world 221",
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products,
"sourceVirtualSchemaName": "C_FLINKUSER"
},
"op": "DELETE"
}
// INSERT
{
"data": {
"id": 111,
"name": "hello",
"description": "world",
"sourceVirtualDatabaseName": "mydb",
"sourceVirtualTableName": "products,
"sourceVirtualSchemaName": "C_FLINKUSER"
},
"op": "INSERT"
}