技术架构、接口和运行原理

架构

图例解释说明:

  1. 存储层:主要负责存储作业的任务执行信息、日志信息、状态信息,以便中台更好的使用。 数据库存储层主要由两块构成:关系型数据库、非关系型数据库。
  2. scheduler service: 它是整个分布式任务调度的核心,里面由调度扩展 jar 、dapr actor 组成,详情说明:

  3. 分布式任务调度中台:主要承担人机可视化交互的功能,用户可以在上面进行的操作:

    • 创建/配置/删除任务
    • 选择超时策略
    • 配置失败重试策略
    • 配置调用任务服务的定时触发
    • 查看任务的运作过程日志
    • 查询任务的执行信息
    • 统计任务的执行情况
    • ........
  4. 程序调用者:任务调用触发不仅仅只是通过中台触发,任务触发逻辑还可以嵌入到不同的应用程序中。比如:工作流引擎、一些 job 执行、一些必要耦合于应用程序的任务。这些在这里都能够很好的拓展,因为触发条件很简单,都是 http restful 风格的请求。

服务接口

创建任务

POST /jobs/{jobid}

字段 字段说明 是否必填/类型 备注
jobId 任务ID,也就是 actorID,一个 job 就是一个 actor 是、String、path
jobName 任务名字 是、String、body
jobDescription 任务描述 否、String、body
creator 调用者,id 或唯一标识 是、String、body
creatorName 调用者名 是、String、body
notification 通知调用者 client 配置 否、JSON array、body 这里用到 URL 模版,需事先约定
active 1——调度执行 (默认)
0——不调度执行,不销毁 actor,只做 time、reminder、cron 下线
是、int、body 通过 put 更新该参数暂停调度
timer 调度策略:见下方 否、String、body 不支持 liteJob
request 调用中心触发任务服务执行的 URL:
这里有两种调用模式同步和异步,如果是异步,
那么我们对目标任务服务的执行进度和状态,有两种获取方式:
1. 调度中心轮询目标任务服务指定的 URL 获取数据
2.目标任务服务回调调度中心暴露的 URL 进行通知
是、array JSON、body 这里用到的 URL 模版,需事先约定
async 是否异步:
true——异步请求
false——同步请求(默认)
否、Boolean、body
checkUrl 调度中心轮询任务服务的 URL:
调度中心就会请求 checkUrl 轮询获取任务服务的执行
进度和状态。
如果 checkUrl 存在,那么异步回调不会走到 feedbackUrl
否、String、body 这里用到的 URL 模版,需事先约定
checkRateSeconds check 检查轮询的周期或频率,单位 s,默认1s 否,int、body
timeoutSeconds job 任务超时时间:
调度中心调度一次目标任务服务的超时时间,单位 s,默认60s。
如果是异步,那么与 checkurl 或feedbackUrl 时长有关
注:它并不是 http 请求调用的超时时间,而是整个任务的超时时间
http 请求调用超时由服务内部托管为60s
否、int、body 详情在下方说明
retry 重试策略 否,array JSON、body
maxRetries 重试最大次数限制:
1.无论重试策略之间如何切换,都不能超过该最大重试次数限制
2.默认为5次
否、int、body
qos 质量保证
0——》正常保证
1——》保证高可靠
必填、int、body 不支持 liteJob
parentJob 父任务Id 可选、String、body
parentJobExecution 父任务执行数 可选、int、body 与 parentJob 配合使用

notification 字段的定义(模版)

  • JSON array
  • 每一次 checkUrl 或 feedbackUrl 都会 notify 通知
  • notify 也有重试策略(由平台内置):通知不成功后,将会进入通知重试策略,最大重试次数是3次,每次间隔20s,所以整个重试过程需耗时为60s
[
    {
        "method": "GET",
         //通知状态匹配策略,当 stateEquals 为空时,全匹配。 
         //1.为异步时 checkUrl 或 feedbackUrl 得到的 status=failed 时 error 的值,如:inprogress,error1,error2... 。
         //当 status=succeed 那就匹配 succeed,其中 finalSucceed 和 finalFailed 建议必须配置 
         //2.为同步时匹配 http 状态码 Status 200、500、404 
         //3.超时匹配 timeout
        "stateEquals": "500",
        "url": "http://example.com/notifier1", 
        "body": "${schedulingNotifyBody}|json",
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
    },
    {
        "method": "GET",
        "stateEquals": "timeout",
        "url": "http://example.com/notifier3", 
        "body": "${schedulingNotifyBody}|json",
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        },
    },
    {
        "method": "GET",
        "stateEquals": "timeout",
        "url": "service://serviceName/notifier3", 
        "body": "${schedulingNotifyBody}|json",
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        },
    },
    {
        "method": "GET",
        "stateEquals": "timeout",
        "url": "service://main/notifier3", 
        "body": "${schedulingNotifyBody}|json",
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        },
    }
]

注:

  1. service://oa/main/api 内网调用 oa 服务的 /main/api
  2. service://{serviceName}/main/api 内网调用自己的 /main/api
  3. service:/main/api 内网调用自己的 /main/api ,不写 host 部分
  4. http(s)://www.baidu.com/main/api
  5. service://{serviceGateway}/oa/main/api 服务网关,特指 entry 的 kong2

request 字段的定义(模版)

requests 详情

[
    {
        "method": "PUT", //请求方式,GET、POST、DELETE、PUT ......
        "url": "http://example.com/est", // 远端的 Url
        "body": {
            "length": {
                "width": 100,
                "height": 100
            }
        },
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json"
        },
        "respEnvName": "ENV_VALUE" //该请求 Response 结果作为变量的命名,可为空。根据该命名可在其他环节进行变量引用,如:下一个request、notify里、checkUrl里。注这里该变量的生命周期只在该次execution中有效
    },
    {
        "method": "POST", //请求方式,GET、POST、DELETE、PUT ......
        "serviceName": "entry"
        "url": "service://main/request1?aaa=${$.aaa}&bbb=${$.bbb}&callback=${schedulingFeedbackUrl}", // 远端的Url
         //如果要将 JSON 字符串替换为 JSON,是这种写法。注:该 value 中不能同时出现多个 ${body}|json,否则会报错提示
        "body": {"data":"${ENV_VALUE}|json"},
        "headers": { // headers 内容
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Cookie":"dfsdhdgertgdfy5rtg"
        }
    }
]

注:

  • 里面存在多个 Url 时,request 是同步调用,它们的执行是按照数组顺序进行的 Url1——》Url2——》Url3
  • 其中 Url1 的返回值可以用在 Url2 上
  • 使用的方式利用 JSONPath,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,
  • 包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},
  • 如果是 json 替换,那么就是 ${$.responses}|json
  • 上面定义的 callback:主要是将回调 Url 传给作业服务,让作业服务在适当的时候将执行进度返回给任务调度

retry 字段的定义(模版)

[
    {
        "errorEquals": "error1",//状态码,1.同步调用由状态码一般如400/500/404/502 2.异步调用状态码一般由用户定义的 error 值定义 3.超时是 timeout
        "intervalSeconds": 2,  //整数,多久开始重试,也就是第一次重试之前等待的秒数(默认值为1)
        "maxAttempts": 3, //正整数,表示重试的最大次数(默认值为2)。如果错误发生次数超过指定值,则停止重试,恢复正常错误处理
        "backoffRate": 2.0  //每次尝试时重试间隔的倍数(默认值为2.0)。重试间隔是 timeoutSeconds
    },{
        "errorEquals": "timeout", // 同步 job  没有timeout,所以同步 job 超时的 ErrorEquals 请使用500
        "intervalSeconds": 1,
        "maxAttempts": 2,
        "backoffRate": 2.0
    },{
        "errorEquals": "error2",
        "intervalSeconds": 1,
        "maxAttempts": 2,
        "backoffRate": 2.0
    },{
        "errorEquals": "500",
        "intervalSeconds": 1,
        "maxAttempts": 2,
        "backoffRate": 2.0
    }
]

checkUrl 模板 (只支持 GET 请求)

{
    "headers": {
        "name": "chensc",
    },
    "method": "GET",
    "url": "service://serviceName/main/provideCheckUrl/${jobId}?executeNumber=${executeNumber}"
}

timer 字段的定义(模版)

  • one_time (一次调度)

如果不配 timer 或者 dueTime period 其中之一为空。就走 one_time 语义是立刻执行一次,然后结束

  • timeing (指定开始时间的周期调度)
"timer":{
  // 时间格式支持 time duration format、ISO 8601 duration format 和 固定的时间如:2023-10-01 09:00:01
  "dueTime":"10s", 
  "period":"R4/PT3S" // 时间格式支持 time duration format 和 ISO 8601 duration format
}
// 使用示例
1.2023-10-01 09:00:01 执行一次
"timer":{
  "dueTime":"2023-10-01 09:00:01", 
  "period":"0s"
}
2.2023-10-01 09:00:01 开始每隔5分钟执行一次
"timer":{
  "dueTime":"2023-10-01 09:00:01", 
  "period":"5m"
}
3.任务创建后1小时后开始执行,并且每隔5分钟一次
"timer":{
  "dueTime":"1h", 
  "period":"5m"
}
  • cron (cron 表达式的调度)
"timer":{
   // 如果是 cron 表达式,那么 dueTime 将不生效
  "dueTime":"0s", 
  "period":"0 */1 * * * ?"  // 支持 cron 表达式
}
// 使用示例
1.每隔一分钟执行一次
"timer":{
  "dueTime":"0s", 
  "period":"0 */1 * * * ?"
}
2.2023-10-01 09:00:01 执行一次
"timer":{
  "dueTime":"0s", 
  "period":"1 0 9 1 10 ? 2023"
}
  • external (外部自定义调度托管)
"timer":{
  // 时间格式支持 time duration format、ISO 8601 duration format 和 固定的时间如:2023-10-1 09:00:01,一般设为0s
  "dueTime":"0s",
  "period":"external"  //允许手工触发多次
}

注:

  • cron 如: 0 /1 ?
  • time duration format 格式——》如 0h0m2s、2s、3m、2h30m、1d2h0m2s
  • ISO 8601 duration format 格式——》PT2H30M、P1DT1H20M10S
  • 固定的时间如:2023-10-1 09:00:01

qos 字段解释(高可靠质量保证)

质量保证:目前针对 notify 和 execution 进行高可靠质量保证,保证模式两种:0——》正常保证、1——》高可靠质量保证

  1. 正常保证:

为节约内存空间,check 过程采用 timer,ExecutionContext 不持久化到 redis

  1. 高可靠质量保证(目标最终成功):

全程采用 reminder,包括 check 过程,然后 ExecutionContext 持久化保存于 redis,当宕机崩溃恢复后执行过程还能按之前 ExecutionContext 进行执行。

平台后期可能会加强 notify 过程,将采用强事务控制(也就是必须通知成功),所有重试不成功的都去数据库跑批。

注:notify 强事务控制是指——》当 Retry 重试不成功后,将 notify 内容存储于数据库,然后再用 job 作业执行重发,达到最终成功的效果。

响应数据格式如下:

{
    "resp_code": 200,     // 200 表示正常、其他失败
    "resp_msg": null      // 错误提示消息
}

查询任务信息

GET /jobs/{jobid}

请求参数

字段 字段说明 是否必填/类型 备注
jobId 任务 ID,也就是 actorID 是、String

暂停任务/重新启动

PUT /jobs/{jobid}

字段 字段说明 是否必填/类型 备注
jobId 任务 ID,也就是 actorID 是、String、path
active 1——调度执行 (默认)
0——不调度执行,不销毁 actor,只做 timer、corn 下线,深度停止
是、int、Param

响应数据格式如下:

{
    "resp_code": 200,
    "resp_msg": null 
}

更新并执行

POST /jobs/{jobId}/update

  • 如果 job 不存在则会抛出异常
  • body 内容请参考上方的“创建任务”

删除 job 任务

  • 彻底删除 actor

DELETE /job/{jobid}

字段 字段说明 是否必填/类型 备注
jobId 任务 ID,也就是 actorID 是、String、path

notifier (调度中心通知调用者)

  • job 作业执行过程中,对调用者进行进度或状态的通知
  • 异步 job 作业时进度或状态的通知,是由 checkUrl、feedbackUrl 返回

POST /xxx/notifier/{jobId}

// 请求 body:
{
    "httpStatus": "200", // 这个是 http 的返回状态码
    "response": { // 任务服务 work 返回或回调的内容,所以里面内容由任务服务 work 决定
        "status": "failed", //任务状态: failed——失败  succeed——成功 finalFailed——最终失败 finalSucceed——最终成功
        "message": "执行MySQL...", // 提示信息
        "error": "error1", //status=failed 时,用户定义的 error 码
        "progress": "30%", //任务进度,返回形式,20/60 或者 30%
        "datetime": "2022-10-26 10:23:13",
        "data": {} //返回的业务数据
    }
}

request(触发目标任务服务的URL模版)

开发者在 request 字段中定义

POST /xxx/request/xxx

字段 字段说明 是否必填/类型 备注
都由开发者定义

checkUrl(调度中心轮询作业服务)

轮询获取任务执行的进度和状态

GET /xxx/xxx/{jobId}

字段 字段说明 是否必填/类型 备注
jobId 任务 ID,也就是 actorID 是、String、path
executeNumber 执行次数 是、int,Param

响应数据格式如:

{
    "status":"failed",  //任务状态: failed——失败  succeed——成功  为空表示正在执行
    "message": "执行MySQL...",   // 提示信息
    "error":"error1",   //status=failed 时,用户定义的 error 码,方便在重试策略里使用
    "progress":"30%",   //任务进度,返回形式,20/60 或者 30%
    "datetime": "2022-10-26 10:23:13",
    "data":{}   //返回的业务数据
}

feedbackUrl(作业服务回调调度中心)

任务服务回调通知调度中心(调度中心事先定义)

在 request 中通过环境变量引入,这个 Url 引入两种,一种是微服务内部,一种是外部

POST /jobs/{jobId}/feedback/{requestCounts}

请求 body

{
    "status":"failed",  //任务状态: failed——失败  succeed——成功  为空表示正在执行
    "message": "执行MySQL...",   // 提示信息
    "error":"error1",   //status 为 failed 时,取用户定义的 error 码,方便在重试策略里使用
    "progress":"30%",   //任务进度,返回形式,20/60 或者 30%,
    "datetime": "2022-10-26 10:23:13",
    "data":{}   //返回的业务数据
}

响应数据格式如下:

{
    "resp_code": 200,      // 200 表示正常、其他失败
    "resp_msg": null,      // 错误提示消息 
}

feedbackUrl 是在 request 模版中由调度中心传递过来的,可参见上方的 request 字段中 callback=${schedulingFeedbackUrl} 的定义

对某任务进行单次触发

POST /jobs/{jobId}/trigger

语义:临时单次触发某一任务

body 参数是 Map vars 可为空 null,是对该次任务植入环境变量 var,可在 request 阶段进行使用

清理 Job 即包括相关的 execution、notification

DELETE /main/schedule/clean

字段 字段说明 是否必填/类型 备注
cleanDestroyJob 是否清理销毁的 Job 否、Boolean、Param 默认 true
retainExecutionCount 保留的执行条目数 否、int,Param 默认 10

直接获取子任务 job 的日志(scheduling 提供)

事先已知父任务 Id 和父任务执行次数的情况下,取的是最新的日志

GET /main/getChildJobLogs

字段 字段说明 是否必填/类型 备注
parentJobId 父任务 Id 是、String、Param
execution 父任务执行次数 是、int,Param

根据任务 Id 获取执行实例集合(scheduling 提供)

GET /main/execution

字段 字段说明 是否必填/类型 备注
jobId 任务 Id 是、String、Param

根据执行 id 获取子任务 job 的日志(scheduling 提供)

该接口需与上面的 /main/execution 配合使用

GET /main/getChildJobLogsByExecutionId

字段 字段说明 是否必填/类型 备注
executionId 执行 Id 是、String、Param

根据父任务销毁所有的子任务包括自己(scheduling 提供)

DELETE /main/destroyByParentJob

字段 字段说明 是否必填/类型 备注
parentJobId 父任务 Id 是、String、Param

timer 列表

特性主要是将 job 定义融入用户代码,以防止任务调度崩溃而导致的 job 丢失,所以使用 timer 将更加安全。timer 具体请在 ide 中的“定时和事件设置”里进行配置。

"timer": [{
    "jobName": "job测试2",
    "request": [{
            "headers": {
                "name": "chensc"
            },
            "method": "POST",
            "body": {
                "classSize": 2,
                "teacher": "陈世超",
                "student": [{
                        "number": 1,
                        "name": "${jobId}",
                        "age": 40
                    },
                    {
                        "number": 8,
                        "name": "${jobId}",
                        "age": 30
                    }
                ],
                "describe": "云师大附中..."
            },
            "url": "service://jobswork/main/resultHandleSchool?feedbackUrl=${schedulingFeedbackUrl}"
        },
        {
            "headers": {
                "name": "chensc"
            },
            "method": "POST",
            "body": {
                "school": "${$.student}|json"
            },
            "url": "service://jobswork/main/workHandleStr?feedbackUrl=${schedulingFeedbackUrl}"
        }
    ],
    "creator": "001",
    "creatorName": "service-client",
    "active": 1,
    "async": true,
    "jobId": "20220125",
    "notification": [{
        "headers": {
            "name": "chensc"
        },
        "body": "${schedulingNotifyBody}|json",
        "method": "POST",
        "url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
    }],
    "timer": {
        "period": "0 0/5 * * * ?",
        "dueTime": "2s",
        "ttl": "5d3s"
    },
    "qos": 0,
    "timeoutSeconds": 15,
    "jobDescription": "走起...",
    "checkRateSeconds": 4,
    "checkUrl": {
        "headers": {
            "name": "chensc"
        },
        "method": "GET",
        "url": "service://jobswork/main/provideCheckUrl/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
    },
    "retry": [{
            "maxAttempts": 2,
            "backoffRate": 1.5,
            "errorEquals": "500",
            "intervalSeconds": 2
        },
        {
            "maxAttempts": 2,
            "backoffRate": 2.0,
            "errorEquals": "timeout",
            "intervalSeconds": 5
        },
        {
            "maxAttempts": 2,
            "backoffRate": 2.0,
            "errorEquals": "error1",
            "intervalSeconds": 3
        }
    ]
}]

timer 配置转化为 job 作业的原理

  1. 发布后注册到门户,此时 timer 将随之注册到门户的服务列表中,主要存储在 manager_service 表的 timers 字段里面。
  2. 任务调度中自带了一个系统级别的 job 作业 check-timers,该作业的逻辑是每隔120s 扫描一次注册到门户的 timer,如果有新的 timer 变化,就会进行创建或更新 job 作业。
  3. 这样整个 job 作业资源都来源于用户代码资源,从而保证了 job 资源的可靠性。

任务调度系统变量

这些系统变量可在 request、notification、checkUrl 中引用

  • schedulingFeedbackUrl:回调的 URL feedbackUrl
  • schedulingJobId :job 任务的 id
  • schedulingExecuteNumber:执行次数
  • schedulingRequestCounts:请求数
  • schedulingNotifyBody:通知内容,格式是 JSON

支持系统环境变量,如在 request、notification、checkUrl 中可使用 env.sh 中的环境变量

扩展

actor 超时、重试、熔断策略

当 sidecar 与 app 之间出现网络问题或者调用异常问题时,都会进入预先配置好的 dapr 超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制任务调度即可)

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  # 配置细节请参考官网
  # 官网 https://docs.dapr.io/zh-hans/operations/resiliency/policies/
  policies:
    # 超时
    timeouts:
      important: 120s
    # 重试
    retries:
      largeResponse:
        policy: constant
        duration: 60s
        maxRetries: 10
    # 熔断
    circuitBreakers:
      actorCB:
        maxRequests: 1
        interval: 60s
        timeout: 120s
        trip: consecutiveFailures >= 5
  # 目标actor
  targets:
    actors:
      job_scheduler:
        timeout: important
        retry: largeResponse
        circuitBreaker: actorCB
        circuitBreakerScope: both
        circuitBreakerCacheSize: 5000

注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级

  • dapr 的 timeout 是指 sidecar 进程调用 jobs 模块的超时。
  • requests 的 timeout 是指 jobs 调用 timer 里面所配置的 target url 的超时。
  • requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。

results matching ""

    No results matching ""