后端事件推送 eventBridge

透析 trigger

trigger 即是事件主题的定义,和事件路由规则的定义。

triggerInfo 对象属性

字段 字段说明 类型 备注
id 唯一标识 String
event 事件 String 建议定义规范:服务名:模块名:类名:方法名
describe 描述 String 非必填可以选
target 用户配置的目标模版 Object 包括目标的地址、参数、body、QL脚本

event 字段说明

  • event 的定义如下: entry:main:massage:sendmessage
  • 真正映射到 mqService 中的主题为 entry--main--massage--sendmessage,
  • 转换原则是把 : 转为 --

注:转为的 entry--main--massage--sendmessage 形式的主题(是中间件 mqService 中的主题),如果需要事件中心与中间件 mqService 直接对接,那么需要这样做,例如:kafka 中已存在历史主题 entry--main--massage--sendmessage,如果需要把该主题接入平台,那么事件中心中的 event 定义就应该是 entry:main:massage:sendmessage

target 字段(标准requestInfo模版)

{
  "timeoutSeconds": 60,
  "request":[{
    "method": "POST", //支持 POST、DELETE、PUT
    "url": "http://example.com/request", // 远端的Url
    "headers": { // headers 内容
      "Accept": "application/json",
      "Content-Type": "application/json"
    }
  }]
}

request 字段说明 详细

[
  {
        "method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
        "url": "http://example.com/request", // 远端的Url
        "body": { // body 内容,以${}形式传递系统参数
            "recipe": "${workflow.input.recipe}", // ${workflow.input.recipe}是系统预先准备好的变量库里的变量
            "length": {
                "width": 100,
                "height": 100
            }
        },
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
    },
    {
        "method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
        "url": "service://example/request1?aaa=${$.aaa}&bbb=${$.bbb}", // 远端的Url
        "body": "${body}|json", //如果要将JSON字符串替换为JSON,是这种写法。注:该value中不能同时出现多个${body}|json,否则会报错提示
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Cookie":"dfsdhdgertgdfy5rtg"
        }
    }
]

特别说明:

  • 里面存在多个 Url 时,只针对 request 模式是同步调用,它们的执行是按照数组顺序进行的 Url1 ——> Url2 ——> Url3
  • 其中 Url1 的返回值可以用在 Url2 上
  • 前一个请求的结果可以作为后一个请求的 JSONPath 使用,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},如果是 json 替换,那么就是 ${$.responses}|json
  • 如果环境变量是 json,那么也可以使用 JSONPath,如:${<variable>$.responses} 、${<variable>responses[0].xxx.xx}

target 支持QL脚本

QL request支持 详细

code:
// 导入的包必须放到顶部
import org.apache.commons.lang.StringUtils;
import com.justep.util.net.ServiceUrlUtil;

//========== 标准类型的request使用示例
requestInfo =
'{
    "request": [{
        "method": "POST",
        "url": "service://message/main/subReceive",
        "body": "${cloudEvent}|json"
    }],
    "timeoutSeconds": 60
}';

vars =
'{
    "hello": "world"
}';

// 这里执行request函数逻辑
result = request(requestInfo,vars);
logger("响应body: " + result.responseBody);
logger("响应headers: " + result.responseHeaders);

// 这里必须返回事件接收的响应格式
res = '{
    "resp_code": 200,
    "resp_msg": "接收事件成功..."
}'
/*
res = '{
    "resp_code": 500,
    "resp_msg": "接收事件失败..."
}'
*/

cloudEvent 剖析

字段 字段说明 字段类型 备注
id 事件Id,唯一标识 String
source 发送事件的源,也就是调用者 String
type 事件类型,与消息massageInfo中的event保持一致 String
event 事件 String
specversion 版本号 String
datacontenttype 这次支持三种 application/json、application/jsonArray、text/plain String
data 事件中要发送的数据内容 String
time 时间 String
subject 主题 String
traceparent String
traceid String
tracestate String
pubsubname String
// cloudEvent在request body中的使用
"request": [{
    "headers": {
        "hello": "world"
    },
    "method": "POST",
    "url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
    "body": "${cloudEvent}|json" // 是被抽取一个变量,可以使用requests的语法对cloudEvent进行引用
}]

完整 trigger 列表示例


  [
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            // 此处的超时时间设置建议小于 dapr 的自身的超时策略,dapr默认超时是120s
            "timeoutSeconds": 30
        }
      },
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b1",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "service://message/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 30
        }
      },
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b2",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "service:/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 30
        }
      }
  ]

注:上述 url 支持 service 协议

  1. service://oa/main/api ——内网调用 oa 服务的 /main/api
  2. service://{serviceName}/main/api ——内网调用当前服务的 /main/api
  3. service:/main/api ——内网调用当前服务的 /main/api,不写host部分
  4. http(s)\://www.baidu.com/main/api
  5. service://{serviceGateway}/oa/main/api ——服务网关,特指 entry的 kong2

${cloudEvent} 是事件中心 eventing 的系统环境变量

如果要在开发环境下进行联调,那么这里的 headers 头需要透传,获取头的方法有

Map<String, String> extHeaders = ServiceUtil.getExtHeaders(SpringWebUtil.getRequest());
// 将此处的 header 拷贝到上方的 trigger 列表 JSON 中的 headers,即可实现开发环境联调测试
String header = JSON.toJSONString(extHeaders);

更复杂的多个request一起使用的场景可以参考文档:前端消息通讯 centrifugoService ——》通过 trigger 直接对接前端

对外OpenAPI

POST /eventBridge/sendList

发送批量事件

// body
[{
    "event": "事件唯一标识(必填)",
    "source": "事件源头用于后期的事件追踪(可选)",
    "data": "要发送的数据,可以是任何数据类型(必填)"
}, {
    "event": "aaa",
    "source": "serviceA",
    "data": {}
}, {
    "event": "bbb",
    "source": "serviceB",
    "data": []
}]
// 响应
{
    "resp_code": 200,
    "resp_msg": "发生事件成功..."
}

POST /eventBridge/send

发送单个事件

// body
{
    "event": "aaa",
    "source": "serviceA",
    "data": {}
}
// 响应
{
    "resp_code": 200,
    "resp_msg": "发生事件成功..."
}

trigger 调用的业务接口

  • 必须提供能够接受 cloudEvent 的 body,比如是POST、PUT之类的接口
  • 必须要按规范响应,响应内容决定事件是否需要重试
// 成功接受事件,并告诉事件中心不必进行重试
{
    "resp_code": 200,
    "resp_msg": "接收事件成功..."
}
// 成功接受事件,并告诉事件中心要进行重试
{
    "resp_code": 500,
    "resp_msg": "接收事件失败..."
}

注:resp_code=500 表示要告诉事件中心进行事件重试策略

POST /eventBridge/registerUserTriggers

用户动态注册 trigger 列表

body元素参数:

字段 字段说明 类型 备注
id 唯一标识(可选) String 如果为空由后台自动生成
event 事件(必填) String 建议定义规范:服务名:模块名:类名:方法名
describe 描述(可选) String
target 用户配置要触发的目标模版(必填) Object 包括目标的地址、参数、body、QL脚本
creator 创建者(可选) String
serviceId 所属服务id(可选) String 如果为空后台会根据serviceName查询得到并填充此Id
serviceName 所属服务的name(必填) String
serviceLabel 所属服务的label(可选) String 如果为空后台会根据serviceName查询得到并填充此label
// 示例
curl -X POST -H 'Content-Type: application/json' 'http://127.0.0.1/eventBridge/registerUserTriggers' -d '[{"event":"service:main:test1","describe":"又是一个新事件","creator":"陈陈","serviceName":"jobsclient","target":{"request":[{"headers":{"hello":"world"},"method":"POST","url":"http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive","body":"${cloudEvent}|json"}]}}]'

// 返回
{
    "resp_code": 200,
    "resp_msg": "动态注册用户trigger成功。最长2分钟生效"
}

死信队列

当事件异常进入重试策略并且最终失败后,所有事件消息都会进入死信队列,平台自身指定的死信事件名是 eventing:poison:messages

  • 默认平台会把所有死信消息都存储管理到“死信监控页下”,最大容量存储 5000 条。注:此处平台只是临时存储,不保证高可靠
  • 如果要保证高可靠,需要用户自定义管理死信队列,通过订阅 eventing:poison:messages 事件来完成。
// 用户可以自定义注册trigger来订阅 eventing:poison:messages 事件,完成对死队列的管理
// trigger 定义示例如下
[{
    // 自己的id
    "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
    // 死信队列事件名
    "event": "eventing:poison:messages",
    "describe": "xxx死信队列事件",
    // 自定义自己的target目标
    "target": {
        "request": [{
            "method": "POST",
            "url": "service://xxxService/main/poisonessages", // 用户自己处理死信消息的业务逻辑
            "body": "${<cloudEvent>$.data}|json"
        }],
        "timeoutSeconds": 60
    }
}]

dapr pubsub 高可靠策略

无论是网络问题或者业务逻辑主动异常 ControllerResult.failed("失败..."),都会对该条事件都进行高可靠保证,会进入事先配置好的dapr超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制事件中心即可)

# Pubsub高可靠保障策略如下:
# 官方参考文档:https://docs.dapr.io/zh-hans/operations/resiliency/policies/
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  policies:
    # 超时
    timeouts:
      important: 120s
    # 重试:
    retries:
      retryForever:
        policy: exponential
        maxInterval: 15s
        maxRetries: -1
    circuitBreakers:
      pubsubCB:
        maxRequests: 1
        interval: 60s
        timeout: 120s
        trip: consecutiveFailures > 8
  # 目标 Pubsub
  targets:
    components:
      eventBridges:
        outbound: # sidecar 到 mq中间
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB
        inbound: # sidecar 到 app
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB
# 如下提供了通过环境变量动态运行时修改的手段
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  policies:
    timeouts:
      important: ${DAPR_RESILIENCY_TIMEOUTS}
    retries:
      retryForever:
        policy: exponential
        maxInterval: ${DAPR_RESILIENCY_RETRIES_MAXINTERVAL}
        maxRetries: ${DAPR_RESILIENCY_RETRIES_MAXRETRIES}
    circuitBreakers:
      pubsubCB:
        maxRequests: ${DAPR_RESILIENCY_CIRCUITBREAKERS_MAXREQUESTS}
        interval: ${DAPR_RESILIENCY_CIRCUITBREAKERS_INTERVAL}
        timeout: ${DAPR_RESILIENCY_CIRCUITBREAKERS_TIMEOUT}
        trip: ${DAPR_RESILIENCY_CIRCUITBREAKERS_TRIP}
  targets:
    components:
      eventBridges:
        outbound:
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB
        inbound:
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB

注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级

  • dapr 的 timeout 是指 sidecar 调用 eventBridge 的超时。
  • requests 的 timeout 是指 eventBridge 调用 trigger 里面所配置的 target url 的超时。
  • requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。
  • 整个流转过程:sidecar 进程订阅到消息——》调用 eventBridge 组件进程——》根据 trigger 配置路由——》调用到目标业务服务

问题排查和处理

  • 由于trigger接收端一直异常或者出现逻辑变动时,导致事件消息一直不断重发,可以采用以下方式直接删除中间件主题来解决
# redis中间件
# 批量删除 eventing 前缀的所有主题
redis-cli -a fsMCbzaX_48 -h redis-svc.newdao-console -p 6259 KEYS "eventing*" | xargs redis-cli -a fsMCbzaX_48 -h redis-svc.newdao-console -p 6259 DEL

# kafka中间件
# 批量删除 eventing 前缀的所有主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list |grep "^eventing" | while read line; do
  echo "删除主题: $line"
  kafka-topics.sh --delete --topic $line --bootstrap-server 127.0.0.1:9092
done

results matching ""

    No results matching ""