后端事件推送 eventBridge
透析 trigger
trigger 即是事件主题的定义,和事件路由规则的定义。
triggerInfo 对象属性
| 字段 | 字段说明 | 类型 | 备注 |
|---|---|---|---|
| id | 唯一标识 | String | |
| event | 事件 | String | 建议定义规范:服务名:模块名:类名:方法名 |
| describe | 描述 | String | 非必填可以选 |
| target | 用户配置的目标模版 | Object | 包括目标的地址、参数、body、QL脚本 |
event 字段说明
- event 的定义如下:
entry:main:massage:sendmessage, - 真正映射到 mqService 中的主题为 entry--main--massage--sendmessage,
- 转换原则是把 : 转为 --
注:转为的 entry--main--massage--sendmessage 形式的主题(是中间件 mqService 中的主题),如果需要事件中心与中间件 mqService 直接对接,那么需要这样做,例如:kafka 中已存在历史主题 entry--main--massage--sendmessage,如果需要把该主题接入平台,那么事件中心中的 event 定义就应该是 entry:main:massage:sendmessage
target 字段(标准requestInfo模版)
{
"timeoutSeconds": 60,
"request":[{
"method": "POST", //支持 POST、DELETE、PUT
"url": "http://example.com/request", // 远端的Url
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
}
}]
}
request 字段说明 详细
[
{
"method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
"url": "http://example.com/request", // 远端的Url
"body": { // body 内容,以${}形式传递系统参数
"recipe": "${workflow.input.recipe}", // ${workflow.input.recipe}是系统预先准备好的变量库里的变量
"length": {
"width": 100,
"height": 100
}
},
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
}
},
{
"method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
"url": "service://example/request1?aaa=${$.aaa}&bbb=${$.bbb}", // 远端的Url
"body": "${body}|json", //如果要将JSON字符串替换为JSON,是这种写法。注:该value中不能同时出现多个${body}|json,否则会报错提示
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json",
"Cookie":"dfsdhdgertgdfy5rtg"
}
}
]
特别说明:
- 里面存在多个 Url 时,只针对 request 模式是同步调用,它们的执行是按照数组顺序进行的 Url1 ——> Url2 ——> Url3
- 其中 Url1 的返回值可以用在 Url2 上
- 前一个请求的结果可以作为后一个请求的 JSONPath 使用,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},如果是 json 替换,那么就是 ${$.responses}|json
- 如果环境变量是 json,那么也可以使用 JSONPath,如:${
<variable>$.responses} 、${<variable>responses[0].xxx.xx}
target 支持QL脚本
QL request支持 详细
code:
// 导入的包必须放到顶部
import org.apache.commons.lang.StringUtils;
import com.justep.util.net.ServiceUrlUtil;
//========== 标准类型的request使用示例
requestInfo =
'{
"request": [{
"method": "POST",
"url": "service://message/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 60
}';
vars =
'{
"hello": "world"
}';
// 这里执行request函数逻辑
result = request(requestInfo,vars);
logger("响应body: " + result.responseBody);
logger("响应headers: " + result.responseHeaders);
// 这里必须返回事件接收的响应格式
res = '{
"resp_code": 200,
"resp_msg": "接收事件成功..."
}'
/*
res = '{
"resp_code": 500,
"resp_msg": "接收事件失败..."
}'
*/
cloudEvent 剖析
| 字段 | 字段说明 | 字段类型 | 备注 |
|---|---|---|---|
| id | 事件Id,唯一标识 | String | |
| source | 发送事件的源,也就是调用者 | String | |
| type | 事件类型,与消息massageInfo中的event保持一致 | String | |
| event | 事件 | String | |
| specversion | 版本号 | String | |
| datacontenttype | 这次支持三种 application/json、application/jsonArray、text/plain | String | |
| data | 事件中要发送的数据内容 | String | |
| time | 时间 | String | |
| subject | 主题 | String | |
| traceparent | String | ||
| traceid | String | ||
| tracestate | String | ||
| pubsubname | String |
// cloudEvent在request body中的使用
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json" // 是被抽取一个变量,可以使用requests的语法对cloudEvent进行引用
}]
完整 trigger 列表示例
[
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
"body": "${cloudEvent}|json"
}],
// 此处的超时时间设置建议小于 dapr 的自身的超时策略,dapr默认超时是120s
"timeoutSeconds": 30
}
},
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b1",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "service://message/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 30
}
},
{
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b2",
"event": "service:main:massage:sendmessage",
"describe": "发送消息描述说明",
"target": {
"request": [{
"headers": {
"hello": "world"
},
"method": "POST",
"url": "service:/main/subReceive",
"body": "${cloudEvent}|json"
}],
"timeoutSeconds": 30
}
}
]
注:上述 url 支持 service 协议
- service://oa/main/api ——内网调用 oa 服务的 /main/api
- service://{serviceName}/main/api ——内网调用当前服务的 /main/api
- service:/main/api ——内网调用当前服务的 /main/api,不写host部分
- http(s)\://www.baidu.com/main/api
- service://{serviceGateway}/oa/main/api ——服务网关,特指 entry的 kong2
${cloudEvent} 是事件中心 eventing 的系统环境变量
如果要在开发环境下进行联调,那么这里的 headers 头需要透传,获取头的方法有
Map<String, String> extHeaders = ServiceUtil.getExtHeaders(SpringWebUtil.getRequest());
// 将此处的 header 拷贝到上方的 trigger 列表 JSON 中的 headers,即可实现开发环境联调测试
String header = JSON.toJSONString(extHeaders);
更复杂的多个request一起使用的场景可以参考文档:前端消息通讯 centrifugoService ——》通过 trigger 直接对接前端
对外OpenAPI
POST /eventBridge/sendList
发送批量事件
// body
[{
"event": "事件唯一标识(必填)",
"source": "事件源头用于后期的事件追踪(可选)",
"data": "要发送的数据,可以是任何数据类型(必填)"
}, {
"event": "aaa",
"source": "serviceA",
"data": {}
}, {
"event": "bbb",
"source": "serviceB",
"data": []
}]
// 响应
{
"resp_code": 200,
"resp_msg": "发生事件成功..."
}
POST /eventBridge/send
发送单个事件
// body
{
"event": "aaa",
"source": "serviceA",
"data": {}
}
// 响应
{
"resp_code": 200,
"resp_msg": "发生事件成功..."
}
trigger 调用的业务接口
- 必须提供能够接受 cloudEvent 的 body,比如是POST、PUT之类的接口
- 必须要按规范响应,响应内容决定事件是否需要重试
// 成功接受事件,并告诉事件中心不必进行重试
{
"resp_code": 200,
"resp_msg": "接收事件成功..."
}
// 成功接受事件,并告诉事件中心要进行重试
{
"resp_code": 500,
"resp_msg": "接收事件失败..."
}
注:resp_code=500 表示要告诉事件中心进行事件重试策略
POST /eventBridge/registerUserTriggers
用户动态注册 trigger 列表
body元素参数:
| 字段 | 字段说明 | 类型 | 备注 |
|---|---|---|---|
| id | 唯一标识(可选) | String | 如果为空由后台自动生成 |
| event | 事件(必填) | String | 建议定义规范:服务名:模块名:类名:方法名 |
| describe | 描述(可选) | String | |
| target | 用户配置要触发的目标模版(必填) | Object | 包括目标的地址、参数、body、QL脚本 |
| creator | 创建者(可选) | String | |
| serviceId | 所属服务id(可选) | String | 如果为空后台会根据serviceName查询得到并填充此Id |
| serviceName | 所属服务的name(必填) | String | |
| serviceLabel | 所属服务的label(可选) | String | 如果为空后台会根据serviceName查询得到并填充此label |
// 示例
curl -X POST -H 'Content-Type: application/json' 'http://127.0.0.1/eventBridge/registerUserTriggers' -d '[{"event":"service:main:test1","describe":"又是一个新事件","creator":"陈陈","serviceName":"jobsclient","target":{"request":[{"headers":{"hello":"world"},"method":"POST","url":"http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive","body":"${cloudEvent}|json"}]}}]'
// 返回
{
"resp_code": 200,
"resp_msg": "动态注册用户trigger成功。最长2分钟生效"
}
死信队列
当事件异常进入重试策略并且最终失败后,所有事件消息都会进入死信队列,平台自身指定的死信事件名是 eventing:poison:messages 。
- 默认平台会把所有死信消息都存储管理到“死信监控页下”,最大容量存储 5000 条。注:此处平台只是临时存储,不保证高可靠
- 如果要保证高可靠,需要用户自定义管理死信队列,通过订阅
eventing:poison:messages事件来完成。
// 用户可以自定义注册trigger来订阅 eventing:poison:messages 事件,完成对死队列的管理
// trigger 定义示例如下
[{
// 自己的id
"id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
// 死信队列事件名
"event": "eventing:poison:messages",
"describe": "xxx死信队列事件",
// 自定义自己的target目标
"target": {
"request": [{
"method": "POST",
"url": "service://xxxService/main/poisonessages", // 用户自己处理死信消息的业务逻辑
"body": "${<cloudEvent>$.data}|json"
}],
"timeoutSeconds": 60
}
}]
dapr pubsub 高可靠策略
无论是网络问题或者业务逻辑主动异常 ControllerResult.failed("失败..."),都会对该条事件都进行高可靠保证,会进入事先配置好的dapr超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制事件中心即可)
# Pubsub高可靠保障策略如下:
# 官方参考文档:https://docs.dapr.io/zh-hans/operations/resiliency/policies/
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
policies:
# 超时
timeouts:
important: 120s
# 重试:
retries:
retryForever:
policy: exponential
maxInterval: 15s
maxRetries: -1
circuitBreakers:
pubsubCB:
maxRequests: 1
interval: 60s
timeout: 120s
trip: consecutiveFailures > 8
# 目标 Pubsub
targets:
components:
eventBridges:
outbound: # sidecar 到 mq中间
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
inbound: # sidecar 到 app
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
# 如下提供了通过环境变量动态运行时修改的手段
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
policies:
timeouts:
important: ${DAPR_RESILIENCY_TIMEOUTS}
retries:
retryForever:
policy: exponential
maxInterval: ${DAPR_RESILIENCY_RETRIES_MAXINTERVAL}
maxRetries: ${DAPR_RESILIENCY_RETRIES_MAXRETRIES}
circuitBreakers:
pubsubCB:
maxRequests: ${DAPR_RESILIENCY_CIRCUITBREAKERS_MAXREQUESTS}
interval: ${DAPR_RESILIENCY_CIRCUITBREAKERS_INTERVAL}
timeout: ${DAPR_RESILIENCY_CIRCUITBREAKERS_TIMEOUT}
trip: ${DAPR_RESILIENCY_CIRCUITBREAKERS_TRIP}
targets:
components:
eventBridges:
outbound:
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
inbound:
timeout: important
retry: retryForever
circuitBreaker: pubsubCB
注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级
- dapr 的 timeout 是指 sidecar 调用 eventBridge 的超时。
- requests 的 timeout 是指 eventBridge 调用 trigger 里面所配置的 target url 的超时。
- requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。
- 整个流转过程:sidecar 进程订阅到消息——》调用 eventBridge 组件进程——》根据 trigger 配置路由——》调用到目标业务服务
问题排查和处理
- 由于trigger接收端一直异常或者出现逻辑变动时,导致事件消息一直不断重发,可以采用以下方式直接删除中间件主题来解决
# redis中间件
# 批量删除 eventing 前缀的所有主题
redis-cli -a fsMCbzaX_48 -h redis-svc.newdao-console -p 6259 KEYS "eventing*" | xargs redis-cli -a fsMCbzaX_48 -h redis-svc.newdao-console -p 6259 DEL
# kafka中间件
# 批量删除 eventing 前缀的所有主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list |grep "^eventing" | while read line; do
echo "删除主题: $line"
kafka-topics.sh --delete --topic $line --bootstrap-server 127.0.0.1:9092
done