快速入门后端事件推送 eventBridge
使用的先决条件
通过事件中心实现消息队列分为三个步骤
- 在应用服务管理中添加事件中心
- 配置好 mq 消息中间件的连接信息
- 中间件的部署,平台只内置了两种 redis 和 kafka
- 添加一个应用作为事件触发服务
- 添加 clients maven 依赖
- 写事件发送逻辑
- 添加一个应用作为事件接收服务
- 配置 trigger 列表
- 添加 clients maven 依赖
- 写事件接收逻辑
事件中心的添加及配置
基本流程是:租户管理员登录——》打开应用/服务管理——》添加事件中心——》配置 mq 中间件环境变量——》注册到门户
添加事件中心
配置中间件环境变量
注:
- redis 和 kafka 是平台内置的中间件。
- redis 模式如果选择了,那么平台 rds 将为事件中心分配,所以中间件默认模式是 redis,如果用户想自行定义,请注意 redis 版本必须大于5.
- kafka 模式需要手动添加 kafkasimple 集群应用,如果用户想自定义,则需自己手动配置 kafka 集群这个环境变量。
- natsstreaming 平台并没有内置,所以需要事先用户安装好 NATS 服务端,然后再通过 NATS 环境变量配置连接信息。
- mqtt 平台也没有内置,用户需要事先安装好 mqtt 服务端,然后再通过 MQTT 环境变量即可连接上。
事件接收端
Maven依赖
<dependency>
<groupId>com.justep</groupId>
<artifactId>clients</artifactId>
<version>1.0.0</version>
</dependency>
这里平台已自带 clients.jar, pom 中可以不加入
编写代码接收消息
添加一个服务请求,参数是事件中心传过来的事件参数
- 请求中接收的参数是事件触发过来的消息体,json 格式
- 请求必须返回 ControllerResult<?>,以通知事件中心事件接收情况。包括两种情况:
- ControllerResult 中 succeed 的含义,通知事件中心事件已成功收到
- ControllerResult 中 failed 的含义,通知事件中心收到的事件有问题,需要重新派发事件
示例代码如下
import com.justep.clients.eventing.vo.ControllerResult;
import com.alibaba.fastjson.JSON;
public String eventReceive(String cloudEvent) throws Exception {
System.out.println("事件参数--"+cloudEvent);
ControllerResult<Object> controllerResult;
boolean condition = true;
if (condition) {
controllerResult = ControllerResult.succeed("成功...");
}else {
// 如果用户不想使用高可靠策略,可以使用 ControllerResult.succeed("失败...");
// 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
controllerResult = ControllerResult.failed("失败...");
}
String resultString = JSON.toJSONString(controllerResult);
return resultString;
}
配置 trigger 定义事件主题
定时和事件设置
打开事件派发设置菜单
配置 trigger 列表
配置后保存好,整个 trigger 会存储到 UI2\pcx\timerAndTrigger.serviceMetaInfo.json 文件中,如果想要更多细节的定义,请到此处定义。
// timerAndTrigger.serviceMetaInfo.json 文件内容一般如下:
{
"trigger": [
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 30
}
}
],
"serviceInfo": {
"name": "{serviceName}",
"label": "{serviceLabel}"
},
"menu": {},
"authrize": {}
}
事件发送端
Maven依赖
<dependency>
<groupId>com.justep</groupId>
<artifactId>clients</artifactId>
<version>1.0.0</version>
</dependency>
这里平台已自带 clients.jar, pom 中可以不加入
编写消息发送代码
添加服务
核心是 massageInfo 对象,包括主题 event、来源 source 和数据 data。示例代码如下:
import com.justep.clients.eventing.EventBridgeClient;
import com.justep.clients.eventing.api.EventBridgeApi;
import com.justep.clients.eventing.vo.MassageInfo;
public String eventSend(String context) throws Exception {
// 事件,定义详细请看上方 triggerInfo 的含义
String event = "text:main:fuwu:eventreceive";
// clients包中的工具类,想了解细节请查看源码
EventBridgeApi eventBridgeApi = EventBridgeClient.getInstance();
// 核心 massageInfo 对象
MassageInfo<String> massageInfo = MassageInfo.<String>builder()
.event(event)
.source("eventsendchensc6")
.data(context)
.build();
eventBridgeApi.send(massageInfo);
return "事件发送成功...";
}









