streaming flink数据事件概念和架构及原理

flink数据事件主要能力是实时扫描数据库数据的变化,将变化的数据增删改实时事件触发给用户。

典型应用场景:

  • 整库同步,将平台a的数据库同步到平台b使用
  • 对特定数据清洗或收集:比如对删除的数据进行归档历史
  • 用数据事件作为驱动引擎
  • SQL缓存服务的缓存刷新引擎

架构

图例说明:

event-trigger-cdc

用户通过配置 event 事件,可以将数据流事件最终路由到业务接口上

数据事件 cdc trigger 定义

{
    "trigger": [{
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
        "event": "cdc:t:flinkcdcdemo:database:appdef,app:uid:t",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "http://first-chensc-ide.trunk2.xcaas.com/main/subReceive",
                "body": "${cloudEvent}|json"
            }]
        }
    }]
}

上述event cdc:t:flinkcdcdemo:database:appdef,app:uid:t· 语义:cdc:是否使用模型资源映射:服务名:库名:表名集合:操作集合:映射选项 ,所有占位都用冒号隔开

  • 第一个占位cdc表示是数据事件类型,用它能与其他普通事件进行区分(必填)。
  • 第二个占位是否需要模型资源映射(必填)
    • t——表示使用模型资源映射,之后databaseid、tableName都将使用模型资源内容,都是逻辑库名、逻辑表名
    • f——表示不使用模型资源映射,直接使用物理名,如:物理库名、物理表名
  • 第三个占位是服务名(必填)。
  • 第四个占位库名(必填):
    • 如果是使用模型资源映射,这里就是《业务数据库》组件的id,如:database、comp.database2、comp.database3
    • 如果是不使用模型资源映射,这里就是模式或物理库名
  • 第五个占位是表名,这里可以是多个,多个用逗号隔开(可选):
    • 如果是使用模型资源映射,这里就是逻辑表名
    • 如果是不使用模型资源映射,这里就物理表名
  • 第六个占位是数据的操作,这里可以是多个。uid语义如下(可选):
    • u——表示只处理更新的事件
    • i——表示只处理插入的事件
    • d——表示只处理删除的事件
  • 第七个占位是映射选项,物理字段是否映射为逻辑字段,该占位只针对使用模型资源映射的场景(可选)
    • f——表示不映射,使用物理字段 physics
    • t——表示映射,使用逻辑字段 logic

注:event底层主题转换逻辑

  1. cdc:serviceName ——》cdc--serviceName 里面的冒号变为两个中横杠
  2. appdef,app ——》appdef__app 里面的逗号转为两个下划线
  3. comp.database2 ——》compdatabase2 里面的comp. 整个转为comp

注:占位必填项和缺省值

  • 占位1~4必填
  • 占位5开始如果不填(例如:cdc:t:flinkcdcdemo:database):代表要停止该event对应的dlink job

业务服务sub端收到的消息内容

//======== 收到的 cloudEvent ========
{
    "datacontenttype": "application/json",
    "data": {
        // 数据事件的具体内容
        "data": {
            "id": 111,
            "name": "hello 110",
            "description": "world",
            "sourceVirtualDatabaseName": "mydb",
            "sourceVirtualTableName": "products",
            "sourceVirtualSchemaName": "C_FLINKUSER"
        },
        "op": "UPDATE_BEFORE" // 一般四种类型 UPDATE_BEFORE、UPDATE_AFTER、DELETE、INSERT
    },
    "subject": "cdc--f--wf--202407011521170meeovvo--sa_task--uid--f--CACD5FE6C430000123A053801610152A",
    "specversion": "v1",
    "id": "876364e605f1464e9b7fbf99154c9b09",
    "source": "event-trigger-cdc",
    "time": "2024-07-01 18:14:28",
    "type": "cdc:f:wf:202407011521170meeovvo:sa_task:uid:f"
}
//======== 数据事件的具体内容 ========
// UPDATE_BEFORE
{
    "data": {
        "id": 111,
        "name": "hello 110",
        "description": "world",
        "sourceVirtualDatabaseName": "mydb",  // 库名
        "sourceVirtualTableName": "products", // 表名
        "sourceVirtualSchemaName": "C_FLINKUSER" // 模式名
    },
    "op": "UPDATE_BEFORE"
}
// UPDATE_AFTER通用容错
{
    "data": {
        "id": 111,
        "name": "hello 110",
        "description": "world",
        "sourceVirtualDatabaseName": "mydb", 
        "sourceVirtualTableName": "products,
        "sourceVirtualSchemaName": "C_FLINKUSER"
    },
    "op": "UPDATE_AFTER"
}
// UPDATE_AFTER定制(按照我们的规范设计表结构和写特定flinkSQL所得到的结果)
{
  "data": {
    "before": {
      "name": "hello 110",
      "description": "world",
      "id": 111,
      "sourceVirtualDatabaseName": "mydb", 
      "sourceVirtualTableName": "products,
      "sourceVirtualSchemaName": "C_FLINKUSER"
    },
    "after": {
      "id": 111,
      "name": "hello 110",
      "description": "world 221",
      "sourceVirtualDatabaseName": "mydb", 
      "sourceVirtualTableName": "products,
      "sourceVirtualSchemaName": "C_FLINKUSER"
    }
  },
  "op": "UPDATE_AFTER"
}
// DELETE
{
    "data": {
        "id": 111,
        "name": "hello 110",
        "description": "world 221",
        "sourceVirtualDatabaseName": "mydb", 
        "sourceVirtualTableName": "products,
        "sourceVirtualSchemaName": "C_FLINKUSER"
    },
    "op": "DELETE"
}
// INSERT
{
    "data": {
        "id": 111,
        "name": "hello",
        "description": "world",
        "sourceVirtualDatabaseName": "mydb", 
        "sourceVirtualTableName": "products,
        "sourceVirtualSchemaName": "C_FLINKUSER"
    },
    "op": "INSERT"
}

results matching ""

    No results matching ""