streaming Tdengine 数据订阅快速入门

前提条件

添加相应依赖服务(建议使用 Kube 部署)

  • tdengine 服务(如果不使用平台内置的 tdengine 服务,请到实时计算平台的环境设置里面设置 tmq 信息即可)
  • 实时计算平台
  • 事件中心(如果追求高性能,建议消息中间件使用 kafka 集群,可以使用平台内置的 kafka simple 集群)

当有需求要进行的配置

  • 配置外部的 Tdengine(进入应用/服务管理——》找到实时计算平台——》点击配置)

Alt text

  • 配置事件中心消息中间件为 kafka
    • 添加 kafka simple 集群
    • 配置 kafka

Alt text

trigger 开发

核心技术依赖到 事件中心 的 eventBridge

先开发自己的数据流事件接收接口

Alt text

public String receive(String cloudEvent) throws Exception {
    //请添加你的业务代码
    String res = null;
    try {
/** 
 * cloudEvent 内容如下   
{
    "datacontenttype": "application/json",
    // 此处为tdengine数据库值内容
    "data": {
        "current": 10.0,
        "groupid": 1,
        "location": "Q2FsaWZvcm5pYS5TYW5GcmFuY2lzY28=",
        "ts": 1721815994743,
        "voltage": 119
    },
    "subject": "tmq_topic--CAD460CCF1700001F6E1115D718F2080",
    "specversion": "v1",
    "id": "a950ea394dd0434cb9a457909136f5d9",
    "source": "tdengine-topic",
    "time": "2024-07-24 18:13:20",
    "type": "tmq_topic"
}
*/
        logger.info("sub-->{}", cloudEvent);

        boolean succeed = true;

        if (succeed) {
            res = JSON.toJSONString(ControllerResult.succeed("接收消息成功..."));
        } else {
            res = JSON.toJSONString(ControllerResult.failed("接收消息失败..."));  //如果要高可靠控制
        }
    } catch (Exception e) {
        res = JSON.toJSONString(ControllerResult.failed("接收消息失败..." + e.getMessage()));  //如果要高可靠控制
    }

    return res;
}

定义 event

以定义 tmq_topic 事件为例:

  • tmq_ 为固定标识,意为 tdengine 数据订阅的事件模式
  • tmq_topic 该主题必须与 tdengine SQL 执行创建的主题保持一致

创建主题,也就是这里的 tmqtopic,这些主题必须保证是 tmq 开头,才能与事件中心适配

create topic tmq_topic as select * from meter;

配置 trigger

配置好 trigger 后发布部署,即可生效

image-20240701181857227

image-20240701182012709

Alt text

Alt text

tdengine 服务插入数据测试观察

drop topic if exists tmq_topic;
drop database if exists meters;
create database meters;
use meters;
CREATE TABLE `meter` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24));
CREATE TABLE `d0` USING `meter` TAGS(0, 'California.LosAngles');
INSERT INTO `d0` values(now - 10s, 0.32, 116);
INSERT INTO `d0` values(now - 8s, NULL, NULL);
INSERT INTO `d1` USING `meter` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119);
INSERT INTO `d1` values (now-8s, 10, 120);

-- 创建主题
create topic tmq_topic as select * from meter;
--- 插入数据进行验证,看事件接受服务里是否接受到事件
INSERT INTO `d1` values (now - 6s, 10, 119) (now - 4s, 11.2, 118);

到事件接受服务中验证结果

Alt text

results matching ""

    No results matching ""