后端事件推送 eventBridge

透析 trigger

trigger 即是事件主题的定义,和事件路由规则的定义。

triggerInfo 对象属性

字段 字段说明 类型 备注
id 唯一标识 String
event 事件 String 建议定义规范:服务名:模块名:类名:方法名
describe 描述 String 非必填可以选
target 用户配置的目标模版 array、JSON 包括目标的地址、参数、body

event 字段说明

  • event 的定义如下: entry:main:massage:sendmessage
  • 真正映射到 mqService 中的主题为 entry--main--massage--sendmessage,
  • 转换原则是把 : 转为 --

注:转为的 entry--main--massage--sendmessage 形式的主题(是中间件 mqService 中的主题),如果需要事件中心与中间件 mqService 直接对接,那么需要这样做,例如:kafka 中已存在历史主题 entry--main--massage--sendmessage,如果需要把该主题接入平台,那么事件中心中的 event 定义就应该是 entry:main:massage:sendmessage

target 字段的定义(模版)

{
  "timeoutSeconds": 60,
  "request":[{
    "method": "POST", //支持 POST、DELETE、PUT
    "url": "http://example.com/request", // 远端的Url
    "headers": { // headers 内容
      "Accept": "application/json",
      "Content-Type": "application/json"
    }
  }]
}

request 字段简要说明 详细

[
  {
        "method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
        "url": "http://example.com/request", // 远端的Url
        "body": { // body 内容,以${}形式传递系统参数
            "recipe": "${workflow.input.recipe}", // ${workflow.input.recipe}是系统预先准备好的变量库里的变量
            "length": {
                "width": 100,
                "height": 100
            }
        },
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
    },
    {
        "method": "POST", //什么请求方式,GET、POST、DELETE、PUT ......
        "url": "service://example/request1?aaa=${$.aaa}&bbb=${$.bbb}", // 远端的Url
        "body": "${body}|json", //如果要将JSON字符串替换为JSON,是这种写法。注:该value中不能同时出现多个${body}|json,否则会报错提示
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Cookie":"dfsdhdgertgdfy5rtg"
        }
    }
]

特别说明:

  • 里面存在多个 Url 时,只针对 request 模式是同步调用,它们的执行是按照数组顺序进行的 Url1 ——> Url2 ——> Url3
  • 其中 Url1 的返回值可以用在 Url2 上
  • 前一个请求的结果可以作为后一个请求的 JSONPath 使用,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},如果是 json 替换,那么就是 ${$.responses}|json
  • 如果环境变量是 json,那么也可以使用 JSONPath,如:${<variable>$.responses} 、${<variable>responses[0].xxx.xx}

cloudEvent 剖析

字段 字段说明 字段类型 备注
id 事件Id,唯一标识 String
source 发送事件的源,也就是调用者 String
type 与消息massageInfo中的event保持一致 String
specversion 版本号 String
datacontenttype 这次支持三种 application/json、application/jsonArray、text/plain String
data 事件中要发送的数据内容 String
time 时间 String
subject 主题 String
// cloudEvent在request body中的使用
"request": [{
    "headers": {
        "hello": "world"
    },
    "method": "POST",
    "url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
    "body": "${cloudEvent}|json" // 是被抽取一个变量,可以使用requests的语法对cloudEvent进行引用
}]

完整 trigger 列表示例

  "trigger": [
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50bb",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "http://newmessagechensc6-chensc-ide.trunk2.xcaas.com/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 100
        }
      },
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b1",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world"
                },
                "method": "POST",
                "url": "service://message/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 100
        }
      },
    {
        "id": "ce985329-d8f4-4c6c-9f4e-5597e89c50b2",
        "event": "service:main:massage:sendmessage",
        "describe": "发送消息描述说明",
        "target": {
            "request": [{
                "headers": {
                    "hello": "world" //可为空
                },
                "method": "POST",
                "url": "service:/main/subReceive",
                "body": "${cloudEvent}|json"
            }],
            "timeoutSeconds": 100
        }
      }
]

注:上述 url 支持 service 协议

  1. service://oa/main/api ——内网调用 oa 服务的 /main/api
  2. service://{serviceName}/main/api ——内网调用当前服务的 /main/api
  3. service:/main/api ——内网调用当前服务的 /main/api,不写host部分
  4. http(s)\://www.baidu.com/main/api
  5. service://{serviceGateway}/oa/main/api ——服务网关,特指 entry的 kong2

${cloudEvent} 是事件中心 eventing 的系统环境变量

如果要在开发环境下进行联调,那么这里的 headers 头需要透传,获取头的方法有

Map<String, String> extHeaders = ServiceUtil.getExtHeaders(SpringWebUtil.getRequest());
// 将此处的 header 拷贝到上方的 trigger 列表 JSON 中的 headers,即可实现开发环境联调测试
String header = JSON.toJSONString(extHeaders);

更复杂的多个request一起使用的场景可以参考文档:前端消息通讯 centrifugoService ——》通过 trigger 直接对接前端

使用的先决条件

通过事件中心实现消息队列分为三个步骤

  • 在应用服务管理中添加事件中心
    • 配置好 mq 消息中间件的连接信息
    • 中间件的部署,平台只内置了两种 redis 和 kafka
  • 添加一个应用作为事件触发服务
    • 添加 clients maven 依赖
    • 写事件发送逻辑
  • 添加一个应用作为事件接收服务
    • 配置 trigger 列表
    • 添加 clients maven 依赖
    • 写事件接收逻辑

事件中心的添加及配置

基本流程是:租户管理员登录——》打开应用/服务管理——》添加事件中心——》配置 mq 中间件环境变量——》注册到门户

添加事件中心

Alt text

配置中间件环境变量

Alt text

Alt text

Alt text

注:

  • redis 和 kafka 是平台内置的中间件。
  • redis 模式如果选择了,那么平台 rds 将为事件中心分配,所以中间件默认模式是 redis,如果用户想自行定义,请注意 redis 版本必须大于5.
  • kafka 模式需要手动添加 kafkasimple 集群应用,如果用户想自定义,则需自己手动配置 kafka 集群这个环境变量。
  • natsstreaming 平台并没有内置,所以需要事先用户安装好 NATS 服务端,然后再通过 NATS 环境变量配置连接信息。
  • mqtt 平台也没有内置,用户需要事先安装好 mqtt 服务端,然后再通过 MQTT 环境变量即可连接上。

事件接收端

Maven依赖

<dependency>
    <groupId>com.justep</groupId>
    <artifactId>clients</artifactId>
    <version>1.0.0</version>
</dependency>

这里平台已自带 clients.jar, pom 中可以不加入

编写代码接收消息

添加一个服务请求,参数是事件中心传过来的事件参数

Alt text

  • 请求中接收的参数是事件触发过来的消息体,json 格式
  • 请求必须返回 ControllerResult<?>,以通知事件中心事件接收情况。包括两种情况:
    • ControllerResult 中 succeed 的含义,通知事件中心事件已成功收到
    • ControllerResult 中 failed 的含义,通知事件中心收到的事件有问题,需要重新派发事件

示例代码如下

import com.justep.clients.eventing.vo.ControllerResult;
import com.alibaba.fastjson.JSON;

public String eventReceive(String cloudEvent) throws Exception {
    System.out.println("事件参数--"+cloudEvent);
    ControllerResult<Object> controllerResult;
    boolean condition = true;
    if (condition) {
        controllerResult = ControllerResult.succeed("成功...");
    }else {
        // 如果用户不想使用高可靠策略,可以使用  ControllerResult.succeed("失败...");
        // 因为一旦返回给事件中心failed,事件中心就会理解为要进行高可靠保证,将该条消息进行记录,然后加入重试队列,进入重试机制。
        controllerResult = ControllerResult.failed("失败...");
    }

    String resultString = JSON.toJSONString(controllerResult);
    return resultString;
}

配置 trigger 定义事件主题

定时和事件设置

Alt text

打开事件派发设置菜单

Alt text

配置 trigger 列表

Alt text

配置后保存好,整个 trigger 会存储到 UI2\pcx\timerAndTrigger.serviceMetaInfo.json 文件中,如果想要更多细节的定义,请到此处定义。

Alt text

事件发送端

Maven依赖

<dependency>
    <groupId>com.justep</groupId>
    <artifactId>clients</artifactId>
    <version>1.0.0</version>
</dependency>

这里平台已自带 clients.jar, pom 中可以不加入

编写消息发送代码

添加服务

Alt text

核心是 massageInfo 对象,包括主题 event、来源 source 和数据 data。示例代码如下:

import com.justep.clients.eventing.EventBridgeClient;
import com.justep.clients.eventing.api.EventBridgeApi;
import com.justep.clients.eventing.vo.MassageInfo;

public String eventSend(String context) throws Exception {
    // 事件,定义详细请看上方 triggerInfo 的含义
    String event = "text:main:fuwu:eventreceive";
    // clients包中的工具类,想了解细节请查看源码
    EventBridgeApi eventBridgeApi = EventBridgeClient.getInstance();
    // 核心 massageInfo 对象
    MassageInfo<String> massageInfo = MassageInfo.<String>builder()
            .event(event)
            .source("eventsendchensc6")
            .data(context)
            .build();
    eventBridgeApi.send(massageInfo);
    return "事件发送成功...";
}

pubsub 高可靠策略

无论是网络问题或者业务逻辑主动异常 ControllerResult.failed("失败..."),都会对该条事件都进行高可靠保证,会进入事先配置好的dapr超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制事件中心即可)

# Pubsub高可靠保障策略如下:
# 官方参考文档:https://docs.dapr.io/zh-hans/operations/resiliency/policies/
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  policies:
    # 超时
    timeouts:
      important: 120s
    # 重试:
    retries:
      retryForever:
        policy: exponential
        maxInterval: 15s
        maxRetries: -1
    circuitBreakers:
      pubsubCB:
        maxRequests: 1
        interval: 60s
        timeout: 120s
        trip: consecutiveFailures > 8
  # 目标 Pubsub
  targets:
    components:
      eventBridges:
        outbound: # sidecar 到 mq中间
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB
        inbound: # sidecar 到 app
          timeout: important
          retry: retryForever
          circuitBreaker: pubsubCB

注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级

  • dapr 的 timeout 是指 sidecar 调用 eventBridge 的超时。
  • requests 的 timeout 是指 eventBridge 调用 trigger 里面所配置的 target url 的超时。
  • requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。
  • 整个流转过程:sidecar 进程订阅到消息——》调用 eventBridge 组件进程——》根据 trigger 配置路由——》调用到目标业务服务

results matching ""

    No results matching ""