streaming Tdengine 数据订阅快速入门
前提条件
添加相应依赖服务(建议使用 Kube 部署)
- tdengine 服务(如果不使用平台内置的 tdengine 服务,请到实时计算平台的环境设置里面设置 tmq 信息即可)
- 实时计算平台
- 事件中心(如果追求高性能,建议消息中间件使用 kafka 集群,可以使用平台内置的 kafka simple 集群)
当有需求要进行的配置
- 配置外部的 Tdengine(进入应用/服务管理——》找到实时计算平台——》点击配置)
- 配置事件中心消息中间件为 kafka
- 添加 kafka simple 集群
- 配置 kafka
trigger 开发
核心技术依赖到 事件中心 的 eventBridge
先开发自己的数据流事件接收接口
public String receive(String cloudEvent) throws Exception {
//请添加你的业务代码
String res = null;
try {
/**
* cloudEvent 内容如下
{
"datacontenttype": "application/json",
// 此处为tdengine数据库值内容
"data": {
"current": 10.0,
"groupid": 1,
"location": "Q2FsaWZvcm5pYS5TYW5GcmFuY2lzY28=",
"ts": 1721815994743,
"voltage": 119
},
"subject": "tmq_topic--CAD460CCF1700001F6E1115D718F2080",
"specversion": "v1",
"id": "a950ea394dd0434cb9a457909136f5d9",
"source": "tdengine-topic",
"time": "2024-07-24 18:13:20",
"type": "tmq_topic"
}
*/
logger.info("sub-->{}", cloudEvent);
boolean succeed = true;
if (succeed) {
res = JSON.toJSONString(ControllerResult.succeed("接收消息成功..."));
} else {
res = JSON.toJSONString(ControllerResult.failed("接收消息失败...")); //如果要高可靠控制
}
} catch (Exception e) {
res = JSON.toJSONString(ControllerResult.failed("接收消息失败..." + e.getMessage())); //如果要高可靠控制
}
return res;
}
定义 event
以定义 tmq_topic 事件为例:
- tmq_ 为固定标识,意为 tdengine 数据订阅的事件模式
- tmq_topic 该主题必须与 tdengine SQL 执行创建的主题保持一致
创建主题,也就是这里的 tmqtopic,这些主题必须保证是 tmq 开头,才能与事件中心适配
create topic tmq_topic as select * from meter;
配置 trigger
配置好 trigger 后发布部署,即可生效
tdengine 服务插入数据测试观察
drop topic if exists tmq_topic;
drop database if exists meters;
create database meters;
use meters;
CREATE TABLE `meter` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24));
CREATE TABLE `d0` USING `meter` TAGS(0, 'California.LosAngles');
INSERT INTO `d0` values(now - 10s, 0.32, 116);
INSERT INTO `d0` values(now - 8s, NULL, NULL);
INSERT INTO `d1` USING `meter` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119);
INSERT INTO `d1` values (now-8s, 10, 120);
-- 创建主题
create topic tmq_topic as select * from meter;
--- 插入数据进行验证,看事件接受服务里是否接受到事件
INSERT INTO `d1` values (now - 6s, 10, 119) (now - 4s, 11.2, 118);
到事件接受服务中验证结果