streaming Tdengine 数据订阅概念和架构及原理

原理

  • 通过配置事件中心 trigger,就能够将 TDengine MQ 消息流转到事件中心,然后又通过事件中心流转到用户端
  • 用户只需要知晓主题即可订阅 TMQ 消息
  • Tdengine 数据订阅,官方文档 https://docs.taosdata.com/develop/tmq/

流程图

Alt text

TMQ trigger

核心技术依赖到 事件中心 的 eventBridge

以定义 tmq_topic 事件为例:

  • tmq_ 为固定标识,意为 tdengine 数据订阅的事件模式
  • tmq_topic 该主题必须与 tdengine SQL 执行创建的主题保持一致

创建主题,也就是这里的 tmqtopic,这些主题必须保证是 tmq 开头,才能与事件中心适配

create topic tmq_topic as select * from meter;

trigger示例大致如下:

{
    "trigger": [{
        "id": "CAD460CCF1700001F6E1115D718F2080",
        "event": "tmq_topic", // TMQ事件定义
        "target": {
            "request": [{
                "method": "POST",
                "body": "${cloudEvent}|json",
                "url": "service://cdcreceive/main/sub/receive"
            }]
        }
    }]
}

业务服务接受到的消息内容格式

{
    "datacontenttype": "application/json",
    // 此处为tdengine数据库值内容
    "data": {
        "current": 10.0,
        "groupid": 1,
        "location": "Q2FsaWZvcm5pYS5TYW5GcmFuY2lzY28=",
        "ts": 1721815994743,
        "voltage": 119
    },
    "subject": "tmq_topic--CAD460CCF1700001F6E1115D718F2080",
    "specversion": "v1",
    "id": "a950ea394dd0434cb9a457909136f5d9",
    "source": "tdengine-topic",
    "time": "2024-07-24 18:13:20",
    "type": "tmq_topic"
}

开放的接口

POST /tmq/execute/sql

执行 tdengine SQL,注:如果是 select 将没有返回结果

// body json 如下
{
    "sqls": ["sql1", "sql2", "sql3"]
}

POST /tmq/disable/topic

主题销毁

注:125秒内生效

// body
{
    "topics": ["topic1", "topic2", "topic3"]
}

POST /tmq/create/topic

主题创建

// body
{
    "database": "database1",
    "topicSqls": ["executeTopicSql1", "executeTopicSql2", "CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;"]
}

results matching ""

    No results matching ""