后端事件推送 eventBridge
透析 trigger
trigger 即是事件主题的定义,和事件路由规则的定义。
triggerInfo 对象属性
字段 | 字段说明 | 类型 | 备注 |
---|---|---|---|
id | 唯一标识 | String | |
event | 事件 | String | 建议定义规范:服务名:模块名:类名:方法名 |
describe | 描述 | String | 非必填可以选 |
target | 用户配置的目标模版 | array、JSON | 包括目标的地址、参数、body |
event 字段说明
- event 的定义如下:
entry:main:massage:sendmessage
, - 真正映射到 mqService 中的主题为 entry--main--massage--sendmessage,
- 转换原则是把 : 转为 --
注:转为的 entry--main--massage--sendmessage 形式的主题(是中间件 mqService 中的主题),如果需要事件中心与中间件 mqService 直接对接,那么需要这样做,例如:kafka 中已存在历史主题 entry--main--massage--sendmessage,如果需要把该主题接入平台,那么事件中心中的 event 定义就应该是 entry:main:massage:sendmessage
target 字段的定义(模版)
{
"timeoutSeconds": 60,
"request":[{
"method": "POST", //支持 POST、DELETE、PUT
"url": "http://example.com/request", // 远端的Url
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
}
}]
}
request 字段简要说明 详细
[
{
"method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
"url": "http://example.com/request", // 远端的Url
"body": { // body 内容,以${}形式传递系统参数
"recipe": "${workflow.input.recipe}", // ${workflow.input.recipe}是系统预先准备好的变量库里的变量
"length": {
"width": 100,
"height": 100
}
},
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
}
},
{
"method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
"url": "service://example/request1?aaa=${$.aaa}&bbb=${$.bbb}", // 远端的Url
"body": "${body}|json", //如果要将JSON字符串替换为JSON,是这种写法。注:该value中不能同时出现多个${body}|json,否则会报错提示
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json",
"Cookie":"dfsdhdgertgdfy5rtg"
}
}
]
特别说明:
- 里面存在多个 Url 时,只针对 request 模式是同步调用,它们的执行是按照数组顺序进行的 Url1 ——> Url2 ——> Url3
- 其中 Url1 的返回值可以用在 Url2 上
- 前一个请求的结果可以作为后一个请求的 JSONPath 使用,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},如果是 json 替换,那么就是 ${$.responses}|json
- 如果环境变量是 json,那么也可以使用 JSONPath,如:${
<variable>
$.responses} 、${<variable>
responses[0].xxx.xx}
cloudEvent 剖析
字段 | 字段说明 | 字段类型 | 备注 |
---|---|---|---|
id | 事件Id,唯一标识 | String | |
source | 发送事件的源,也就是调用者 | String | |
type | 与消息massageInfo中的event保持一致 | String | |
specversion | 版本号 | String | |
datacontenttype | 这次支持三种 application/json、application/jsonArray、text/plain | String | |
data | 事件中要发送的数据内容 | String | |
time | 时间 | String | |
subject | 主题 | String |
// cloudEvent在request body中的使用
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json" // 是被抽取一个变量,可以使用requests的语法对cloudEvent进行引用
}]
完整 trigger 列表示例
"trigger": [
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 100
}
},
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b1",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "service://message/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 100
}
},
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b2",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world" //可为空
},
"method": "POST",
"url": "service:/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 100
}
}
]
注:上述 url 支持 service 协议
- service://oa/main/api ——内网调用 oa 服务的 /main/api
- service://{serviceName}/main/api ——内网调用当前服务的 /main/api
- service:/main/api ——内网调用当前服务的 /main/api,不写host部分
- http(s)\://www.baidu.com/main/api
- service://{serviceGateway}/oa/main/api ——服务网关,特指 entry的 kong2
${cloudEvent} 是事件中心 eventing 的系统环境变量
如果要在开发环境下进行联调,那么这里的 headers 头需要透传,获取头的方法有
Map<String, String> extHeaders = ServiceUtil.getExtHeaders(SpringWebUtil.getRequest());
// 将此处的 header 拷贝到上方的 trigger 列表 JSON 中的 headers,即可实现开发环境联调测试
String header = JSON.toJSONString(extHeaders);
更复杂的多个request一起使用的场景可以参考文档:前端消息通讯 centrifugoService ——》通过 trigger 直接对接前端
使用的先决条件
通过事件中心实现消息队列分为三个步骤
- 在应用服务管理中添加事件中心
- 配置好 mq 消息中间件的连接信息
- 中间件的部署,平台只内置了两种 redis 和 kafka
- 添加一个应用作为事件触发服务
- 添加 clients maven 依赖
- 写事件发送逻辑
- 添加一个应用作为事件接收服务
- 配置 trigger 列表
- 添加 clients maven 依赖
- 写事件接收逻辑
事件中心的添加及配置
基本流程是:租户管理员登录——》打开应用/服务管理——》添加事件中心——》配置 mq 中间件环境变量——》注册到门户
添加事件中心
配置中间件环境变量
注:
- redis 和 kafka 是平台内置的中间件。
- redis 模式如果选择了,那么平台 rds 将为事件中心分配,所以中间件默认模式是 redis,如果用户想自行定义,请注意 redis 版本必须大于5.
- kafka 模式需要手动添加 kafkasimple 集群应用,如果用户想自定义,则需自己手动配置 kafka 集群这个环境变量。
- natsstreaming 平台并没有内置,所以需要事先用户安装好 NATS 服务端,然后再通过 NATS 环境变量配置连接信息。
- mqtt 平台也没有内置,用户需要事先安装好 mqtt 服务端,然后再通过 MQTT 环境变量即可连接上。
事件接收端
Maven依赖
<dependency>
<groupId>com.justep</groupId>
<artifactId>clients</artifactId>
<version>1.0.0</version>
</dependency>
这里平台已自带 clients.jar, pom 中可以不加入
编写代码接收消息
添加一个服务请求,参数是事件中心传过来的事件参数
- 请求中接收的参数是事件触发过来的消息体,json 格式
- 请求必须返回 ControllerResult<?>,以通知事件中心事件接收情况。包括两种情况:
- ControllerResult 中 succeed 的含义,通知事件中心事件已成功收到
- ControllerResult 中 failed 的含义,通知事件中心收到的事件有问题,需要重新派发事件
示例代码如下
import com.justep.clients.eventing.vo.ControllerResult;
import com.alibaba.fastjson.JSON;
public String eventReceive(String cloudEvent) throws Exception {
System.out.println("事件参数--"+cloudEvent);
ControllerResult<Object> controllerResult;
boolean condition = true;
if (condition) {
controllerResult = ControllerResult.succeed("成功...");
}else {
// 如果用户不想使用高可靠策略,可以使用 ControllerResult.succeed("失败...");
// 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
controllerResult = ControllerResult.failed("失败...");
}
String resultString = JSON.toJSONString(controllerResult);
return resultString;
}
配置 trigger 定义事件主题
定时和事件设置
打开事件派发设置菜单
配置 trigger 列表
配置后保存好,整个 trigger 会存储到 UI2\pcx\timerAndTrigger.serviceMetaInfo.json 文件中,如果想要更多细节的定义,请到此处定义。
事件发送端
Maven依赖
<dependency>
<groupId>com.justep</groupId>
<artifactId>clients</artifactId>
<version>1.0.0</version>
</dependency>
这里平台已自带 clients.jar, pom 中可以不加入
编写消息发送代码
添加服务
核心是 massageInfo 对象,包括主题 event、来源 source 和数据 data。示例代码如下:
import com.justep.clients.eventing.EventBridgeClient;
import com.justep.clients.eventing.api.EventBridgeApi;
import com.justep.clients.eventing.vo.MassageInfo;
public String eventSend(String context) throws Exception {
// 事件,定义详细请看上方 triggerInfo 的含义
String event = "text:main:fuwu:eventreceive";
// clients包中的工具类,想了解细节请查看源码
EventBridgeApi eventBridgeApi = EventBridgeClient.getInstance();
// 核心 massageInfo 对象
MassageInfo<String> massageInfo = MassageInfo.<String>builder()
.event(event)
.source("eventsendchensc6")
.data(context)
.build();
eventBridgeApi.send(massageInfo);
return "事件发送成功...";
}
pubsub 高可靠策略
无论是网络问题或者业务逻辑主动异常 ControllerResult.failed("失败...")
,都会对该条事件都进行高可靠保证,会进入事先配置好的dapr超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制事件中心即可)
# Pubsub高可靠保障策略如下:
# 官方参考文档:https://docs.dapr.io/zh-hans/operations/resiliency/policies/
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
policies:
# 超时
timeouts:
important: 120s
# 重试:
retries:
retryForever:
policy: exponential
maxInterval: 15s
maxRetries: -1
circuitBreakers:
pubsubCB:
maxRequests: 1
interval: 60s
timeout: 120s
trip: consecutiveFailures > 8
# 目标 Pubsub
targets:
components:
eventBridges:
outbound: # sidecar 到 mq中间
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
inbound: # sidecar 到 app
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级
- dapr 的 timeout 是指 sidecar 调用 eventBridge 的超时。
- requests 的 timeout 是指 eventBridge 调用 trigger 里面所配置的 target url 的超时。
- requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。
- 整个流转过程:sidecar 进程订阅到消息——》调用 eventBridge 组件进程——》根据 trigger 配置路由——》调用到目标业务服务