streaming Tdengine 数据订阅概念和架构及原理
原理
- 通过配置事件中心 trigger,就能够将 TDengine MQ 消息流转到事件中心,然后又通过事件中心流转到用户端
- 用户只需要知晓主题即可订阅 TMQ 消息
- Tdengine 数据订阅,官方文档 https://docs.taosdata.com/develop/tmq/
流程图
TMQ trigger
核心技术依赖到 事件中心 的 eventBridge
以定义 tmq_topic 事件为例:
- tmq_ 为固定标识,意为 tdengine 数据订阅的事件模式
- tmq_topic 该主题必须与 tdengine SQL 执行创建的主题保持一致
创建主题,也就是这里的 tmqtopic,这些主题必须保证是 tmq 开头,才能与事件中心适配
create topic tmq_topic as select * from meter;
trigger示例大致如下:
{
"trigger": [{
"id": "CAD460CCF1700001F6E1115D718F2080",
"event": "tmq_topic", // TMQ事件定义
"target": {
"request": [{
"method": "POST",
"body": "${cloudEvent}|json",
"url": "service://cdcreceive/main/sub/receive"
}]
}
}]
}
业务服务接受到的消息内容格式
{
"datacontenttype": "application/json",
// 此处为tdengine数据库值内容
"data": {
"current": 10.0,
"groupid": 1,
"location": "Q2FsaWZvcm5pYS5TYW5GcmFuY2lzY28=",
"ts": 1721815994743,
"voltage": 119
},
"subject": "tmq_topic--CAD460CCF1700001F6E1115D718F2080",
"specversion": "v1",
"id": "a950ea394dd0434cb9a457909136f5d9",
"source": "tdengine-topic",
"time": "2024-07-24 18:13:20",
"type": "tmq_topic"
}
开放的接口
POST /tmq/execute/sql
执行 tdengine SQL,注:如果是 select 将没有返回结果
// body json 如下
{
"sqls": ["sql1", "sql2", "sql3"]
}
POST /tmq/disable/topic
主题销毁
注:125秒内生效
// body
{
"topics": ["topic1", "topic2", "topic3"]
}
POST /tmq/create/topic
主题创建
// body
{
"database": "database1",
"topicSqls": ["executeTopicSql1", "executeTopicSql2", "CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;"]
}