快速入门后端事件推送 eventBridge

使用的先决条件

通过事件中心实现消息队列分为三个步骤

  • 在应用服务管理中添加事件中心
    • 配置好 mq 消息中间件的连接信息
    • 中间件的部署,平台只内置了两种 redis 和 kafka
  • 添加一个应用作为事件触发服务
    • 添加 clients maven 依赖
    • 写事件发送逻辑
  • 添加一个应用作为事件接收服务
    • 配置 trigger 列表
    • 添加 clients maven 依赖
    • 写事件接收逻辑

事件中心的添加及配置

基本流程是:租户管理员登录——》打开应用/服务管理——》添加事件中心——》配置 mq 中间件环境变量——》注册到门户

添加事件中心

Alt text

配置中间件环境变量

Alt text

Alt text

Alt text

注:

  • redis 和 kafka 是平台内置的中间件。
  • redis 模式如果选择了,那么平台 rds 将为事件中心分配,所以中间件默认模式是 redis,如果用户想自行定义,请注意 redis 版本必须大于5.
  • kafka 模式需要手动添加 kafkasimple 集群应用,如果用户想自定义,则需自己手动配置 kafka 集群这个环境变量。
  • natsstreaming 平台并没有内置,所以需要事先用户安装好 NATS 服务端,然后再通过 NATS 环境变量配置连接信息。
  • mqtt 平台也没有内置,用户需要事先安装好 mqtt 服务端,然后再通过 MQTT 环境变量即可连接上。

事件接收端

Maven依赖

<dependency>
    <groupId>com.justep</groupId>
    <artifactId>clients</artifactId>
    <version>1.0.0</version>
</dependency>

这里平台已自带 clients.jar, pom 中可以不加入

编写代码接收消息

添加一个服务请求,参数是事件中心传过来的事件参数

Alt text

  • 请求中接收的参数是事件触发过来的消息体,json 格式
  • 请求必须返回 ControllerResult<?>,以通知事件中心事件接收情况。包括两种情况:
    • ControllerResult 中 succeed 的含义,通知事件中心事件已成功收到
    • ControllerResult 中 failed 的含义,通知事件中心收到的事件有问题,需要重新派发事件

示例代码如下

import com.justep.clients.eventing.vo.ControllerResult;
import com.alibaba.fastjson.JSON;

public String eventReceive(String cloudEvent) throws Exception {
    System.out.println("事件参数--"+cloudEvent);
    ControllerResult<Object> controllerResult;
    boolean condition = true;
    if (condition) {
        controllerResult = ControllerResult.succeed("成功...");
    }else {
        // 如果用户不想使用高可靠策略,可以使用  ControllerResult.succeed("失败...");
        // 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
        controllerResult = ControllerResult.failed("失败...");
    }

    String resultString = JSON.toJSONString(controllerResult);
    return resultString;
}

配置 trigger 定义事件主题

定时和事件设置

Alt text

打开事件派发设置菜单

Alt text

配置 trigger 列表

Alt text

配置后保存好,整个 trigger 会存储到 UI2\pcx\timerAndTrigger.serviceMetaInfo.json 文件中,如果想要更多细节的定义,请到此处定义。

Alt text

// timerAndTrigger.serviceMetaInfo.json  文件内容一般如下:
{
  "trigger": [
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 30
        }
      }
  ],
  "serviceInfo": {
    "name": "{serviceName}",
    "label": "{serviceLabel}"
  },
  "menu": {},
  "authrize": {}
}

事件发送端

Maven依赖

<dependency>
    <groupId>com.justep</groupId>
    <artifactId>clients</artifactId>
    <version>1.0.0</version>
</dependency>

这里平台已自带 clients.jar, pom 中可以不加入

编写消息发送代码

添加服务

Alt text

核心是 massageInfo 对象,包括主题 event、来源 source 和数据 data。示例代码如下:

import com.justep.clients.eventing.EventBridgeClient;
import com.justep.clients.eventing.api.EventBridgeApi;
import com.justep.clients.eventing.vo.MassageInfo;

public String eventSend(String context) throws Exception {
    // 事件,定义详细请看上方 triggerInfo 的含义
    String event = "text:main:fuwu:eventreceive";
    // clients包中的工具类,想了解细节请查看源码
    EventBridgeApi eventBridgeApi = EventBridgeClient.getInstance();
    // 核心 massageInfo 对象
    MassageInfo<String> massageInfo = MassageInfo.<String>builder()
            .event(event)
            .source("eventsendchensc6")
            .data(context)
            .build();
    eventBridgeApi.send(massageInfo);
    return "事件发送成功...";
}

results matching ""

    No results matching ""