技术架构、接口和运行原理
架构
图例解释说明:
- 存储层:主要负责存储作业的任务执行信息、日志信息、状态信息,以便中台更好的使用。 数据库存储层主要由两块构成:关系型数据库、非关系型数据库。
scheduler service: 它是整个分布式任务调度的核心,里面由调度扩展 jar 、dapr actor 组成,详情说明:
- dapr actor:关于 actor 实例的知识,请参考 https://docs.dapr.io/zh-hans/developing-applications/building-blocks/actors/actors-overview/ 。dapr actor 支持弹性扩容缩容、故障转移、作业单片一致性等功能特性。
- 调度扩展 jar :它主要做 dapr actor 调度能力不足的补充和 scheduler 对外能力的暴露,它还负责给用户应用内(非分布式)开发提供调度模版
分布式任务调度中台:主要承担人机可视化交互的功能,用户可以在上面进行的操作:
- 创建/配置/删除任务
- 选择超时策略
- 配置失败重试策略
- 配置调用任务服务的定时触发
- 查看任务的运作过程日志
- 查询任务的执行信息
- 统计任务的执行情况
- ........
- 程序调用者:任务调用触发不仅仅只是通过中台触发,任务触发逻辑还可以嵌入到不同的应用程序中。比如:工作流引擎、一些 job 执行、一些必要耦合于应用程序的任务。这些在这里都能够很好的拓展,因为触发条件很简单,都是 http restful 风格的请求。
服务接口
创建任务
POST /jobs/{jobid}
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务ID,也就是 actorID,一个 job 就是一个 actor | 是、String、path | |
jobName | 任务名字 | 是、String、body | |
jobDescription | 任务描述 | 否、String、body | |
creator | 调用者,id 或唯一标识 | 是、String、body | |
creatorName | 调用者名 | 是、String、body | |
notification | 通知调用者 client 配置 | 否、JSON array、body | 这里用到 URL 模版,需事先约定 |
active | 1——调度执行 (默认) 0——不调度执行,不销毁 actor,只做 time、reminder、cron 下线 |
是、int、body | 通过 put 更新该参数暂停调度 |
timer | 调度策略:见下方 | 否、String、body | 不支持 liteJob |
request | 调用中心触发任务服务执行的 URL: 这里有两种调用模式同步和异步,如果是异步, 那么我们对目标任务服务的执行进度和状态,有两种获取方式: 1. 调度中心轮询目标任务服务指定的 URL 获取数据 2.目标任务服务回调调度中心暴露的 URL 进行通知 |
是、array JSON、body | 这里用到的 URL 模版,需事先约定 |
async | 是否异步: true——异步请求 false——同步请求(默认) |
否、Boolean、body | |
checkUrl | 调度中心轮询任务服务的 URL: 调度中心就会请求 checkUrl 轮询获取任务服务的执行 进度和状态。 如果 checkUrl 存在,那么异步回调不会走到 feedbackUrl |
否、String、body | 这里用到的 URL 模版,需事先约定 |
checkRateSeconds | check 检查轮询的周期或频率,单位 s,默认1s | 否,int、body | |
timeoutSeconds | job 任务超时时间: 调度中心调度一次目标任务服务的超时时间,单位 s,默认60s。 如果是异步,那么与 checkurl 或feedbackUrl 时长有关 注:它并不是 http 请求调用的超时时间,而是整个任务的超时时间 http 请求调用超时由服务内部托管为60s |
否、int、body | 详情在下方说明 |
retry | 重试策略 | 否,array JSON、body | |
maxRetries | 重试最大次数限制: 1.无论重试策略之间如何切换,都不能超过该最大重试次数限制 2.默认为5次 |
否、int、body | |
qos | 质量保证 0——》正常保证 1——》保证高可靠 |
必填、int、body | 不支持 liteJob |
parentJob | 父任务Id | 可选、String、body | |
parentJobExecution | 父任务执行数 | 可选、int、body | 与 parentJob 配合使用 |
notification 字段的定义(模版)
- JSON array
- 每一次 checkUrl 或 feedbackUrl 都会 notify 通知
- notify 也有重试策略(由平台内置):通知不成功后,将会进入通知重试策略,最大重试次数是3次,每次间隔20s,所以整个重试过程需耗时为60s
[
{
"method": "GET",
//通知状态匹配策略,当 stateEquals 为空时,全匹配。
//1.为异步时 checkUrl 或 feedbackUrl 得到的 status=failed 时 error 的值,如:inprogress,error1,error2... 。
//当 status=succeed 那就匹配 succeed,其中 finalSucceed 和 finalFailed 建议必须配置
//2.为同步时匹配 http 状态码 Status 200、500、404
//3.超时匹配 timeout
"stateEquals": "500",
"url": "http://example.com/notifier1",
"body": "${schedulingNotifyBody}|json",
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
}
},
{
"method": "GET",
"stateEquals": "timeout",
"url": "http://example.com/notifier3",
"body": "${schedulingNotifyBody}|json",
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
},
},
{
"method": "GET",
"stateEquals": "timeout",
"url": "service://serviceName/notifier3",
"body": "${schedulingNotifyBody}|json",
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
},
},
{
"method": "GET",
"stateEquals": "timeout",
"url": "service://main/notifier3",
"body": "${schedulingNotifyBody}|json",
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
},
}
]
注:
- service://oa/main/api 内网调用 oa 服务的 /main/api
- service://{serviceName}/main/api 内网调用自己的 /main/api
- service:/main/api 内网调用自己的 /main/api ,不写 host 部分
- http(s)://www.baidu.com/main/api
- service://{serviceGateway}/oa/main/api 服务网关,特指 entry 的 kong2
request 字段的定义(模版)
[
{
"method": "PUT", //请求方式,GET、POST、DELETE、PUT ......
"url": "http://example.com/est", // 远端的 Url
"body": {
"length": {
"width": 100,
"height": 100
}
},
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json"
},
"respEnvName": "ENV_VALUE" //该请求 Response 结果作为变量的命名,可为空。根据该命名可在其他环节进行变量引用,如:下一个request、notify里、checkUrl里。注这里该变量的生命周期只在该次execution中有效
},
{
"method": "POST", //请求方式,GET、POST、DELETE、PUT ......
"serviceName": "entry"
"url": "service://main/request1?aaa=${$.aaa}&bbb=${$.bbb}&callback=${schedulingFeedbackUrl}", // 远端的Url
//如果要将 JSON 字符串替换为 JSON,是这种写法。注:该 value 中不能同时出现多个 ${body}|json,否则会报错提示
"body": {"data":"${ENV_VALUE}|json"},
"headers": { // headers 内容
"Accept": "application/json",
"Content-Type": "application/json",
"Cookie":"dfsdhdgertgdfy5rtg"
}
}
]
注:
- 里面存在多个 Url 时,request 是同步调用,它们的执行是按照数组顺序进行的 Url1——》Url2——》Url3
- 其中 Url1 的返回值可以用在 Url2 上
- 使用的方式利用 JSONPath,如:responses[0].xxx.xx responses[1].xxx1.xx1 使用,
- 包装在变量里面就是 ${$.responses} 或 ${responses[0].xxx.xx responses[1].xxx1.xx1},
- 如果是 json 替换,那么就是 ${$.responses}|json
- 上面定义的 callback:主要是将回调 Url 传给作业服务,让作业服务在适当的时候将执行进度返回给任务调度
retry 字段的定义(模版)
[
{
"errorEquals": "error1",//状态码,1.同步调用由状态码一般如400/500/404/502 2.异步调用状态码一般由用户定义的 error 值定义 3.超时是 timeout
"intervalSeconds": 2, //整数,多久开始重试,也就是第一次重试之前等待的秒数(默认值为1)
"maxAttempts": 3, //正整数,表示重试的最大次数(默认值为2)。如果错误发生次数超过指定值,则停止重试,恢复正常错误处理
"backoffRate": 2.0 //每次尝试时重试间隔的倍数(默认值为2.0)。重试间隔是 timeoutSeconds
},{
"errorEquals": "timeout", // 同步 job 没有timeout,所以同步 job 超时的 ErrorEquals 请使用500
"intervalSeconds": 1,
"maxAttempts": 2,
"backoffRate": 2.0
},{
"errorEquals": "error2",
"intervalSeconds": 1,
"maxAttempts": 2,
"backoffRate": 2.0
},{
"errorEquals": "500",
"intervalSeconds": 1,
"maxAttempts": 2,
"backoffRate": 2.0
}
]
checkUrl 模板 (只支持 GET 请求)
{
"headers": {
"name": "chensc",
},
"method": "GET",
"url": "service://serviceName/main/provideCheckUrl/${jobId}?executeNumber=${executeNumber}"
}
timer 字段的定义(模版)
- one_time (一次调度)
如果不配 timer 或者 dueTime period 其中之一为空。就走 one_time 语义是立刻执行一次,然后结束
- timeing (指定开始时间的周期调度)
"timer":{
// 时间格式支持 time duration format、ISO 8601 duration format 和 固定的时间如:2023-10-01 09:00:01
"dueTime":"10s",
"period":"R4/PT3S" // 时间格式支持 time duration format 和 ISO 8601 duration format
}
// 使用示例
1.2023-10-01 09:00:01 执行一次
"timer":{
"dueTime":"2023-10-01 09:00:01",
"period":"0s"
}
2.2023-10-01 09:00:01 开始每隔5分钟执行一次
"timer":{
"dueTime":"2023-10-01 09:00:01",
"period":"5m"
}
3.任务创建后1小时后开始执行,并且每隔5分钟一次
"timer":{
"dueTime":"1h",
"period":"5m"
}
- cron (cron 表达式的调度)
"timer":{
// 如果是 cron 表达式,那么 dueTime 将不生效
"dueTime":"0s",
"period":"0 */1 * * * ?" // 支持 cron 表达式
}
// 使用示例
1.每隔一分钟执行一次
"timer":{
"dueTime":"0s",
"period":"0 */1 * * * ?"
}
2.2023-10-01 09:00:01 执行一次
"timer":{
"dueTime":"0s",
"period":"1 0 9 1 10 ? 2023"
}
- external (外部自定义调度托管)
"timer":{
// 时间格式支持 time duration format、ISO 8601 duration format 和 固定的时间如:2023-10-1 09:00:01,一般设为0s
"dueTime":"0s",
"period":"external" //允许手工触发多次
}
注:
- cron 如: 0 /1 ?
- time duration format 格式——》如 0h0m2s、2s、3m、2h30m、1d2h0m2s
- ISO 8601 duration format 格式——》PT2H30M、P1DT1H20M10S
- 固定的时间如:2023-10-1 09:00:01
qos 字段解释(高可靠质量保证)
质量保证:目前针对 notify 和 execution 进行高可靠质量保证,保证模式两种:0——》正常保证、1——》高可靠质量保证
- 正常保证:
为节约内存空间,check 过程采用 timer,ExecutionContext 不持久化到 redis
- 高可靠质量保证(目标最终成功):
全程采用 reminder,包括 check 过程,然后 ExecutionContext 持久化保存于 redis,当宕机崩溃恢复后执行过程还能按之前 ExecutionContext 进行执行。
平台后期可能会加强 notify 过程,将采用强事务控制(也就是必须通知成功),所有重试不成功的都去数据库跑批。
注:notify 强事务控制是指——》当 Retry 重试不成功后,将 notify 内容存储于数据库,然后再用 job 作业执行重发,达到最终成功的效果。
响应数据格式如下:
{
"resp_code": 200, // 200 表示正常、其他失败
"resp_msg": null // 错误提示消息
}
查询任务信息
GET /jobs/{jobid}
请求参数
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务 ID,也就是 actorID | 是、String |
暂停任务/重新启动
PUT /jobs/{jobid}
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务 ID,也就是 actorID | 是、String、path | |
active | 1——调度执行 (默认) 0——不调度执行,不销毁 actor,只做 timer、corn 下线,深度停止 |
是、int、Param |
响应数据格式如下:
{
"resp_code": 200,
"resp_msg": null
}
更新并执行
POST /jobs/{jobId}/update
- 如果 job 不存在则会抛出异常
- body 内容请参考上方的“创建任务”
删除 job 任务
- 彻底删除 actor
DELETE /job/{jobid}
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务 ID,也就是 actorID | 是、String、path |
notifier (调度中心通知调用者)
- job 作业执行过程中,对调用者进行进度或状态的通知
- 异步 job 作业时进度或状态的通知,是由 checkUrl、feedbackUrl 返回
POST /xxx/notifier/{jobId}
// 请求 body:
{
"httpStatus": "200", // 这个是 http 的返回状态码
"response": { // 任务服务 work 返回或回调的内容,所以里面内容由任务服务 work 决定
"status": "failed", //任务状态: failed——失败 succeed——成功 finalFailed——最终失败 finalSucceed——最终成功
"message": "执行MySQL...", // 提示信息
"error": "error1", //status=failed 时,用户定义的 error 码
"progress": "30%", //任务进度,返回形式,20/60 或者 30%
"datetime": "2022-10-26 10:23:13",
"data": {} //返回的业务数据
}
}
request(触发目标任务服务的URL模版)
开发者在 request 字段中定义
POST /xxx/request/xxx
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
都由开发者定义 |
checkUrl(调度中心轮询作业服务)
轮询获取任务执行的进度和状态
GET /xxx/xxx/{jobId}
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务 ID,也就是 actorID | 是、String、path | |
executeNumber | 执行次数 | 是、int,Param |
响应数据格式如:
{
"status":"failed", //任务状态: failed——失败 succeed——成功 为空表示正在执行
"message": "执行MySQL...", // 提示信息
"error":"error1", //status=failed 时,用户定义的 error 码,方便在重试策略里使用
"progress":"30%", //任务进度,返回形式,20/60 或者 30%
"datetime": "2022-10-26 10:23:13",
"data":{} //返回的业务数据
}
feedbackUrl(作业服务回调调度中心)
任务服务回调通知调度中心(调度中心事先定义)
在 request 中通过环境变量引入,这个 Url 引入两种,一种是微服务内部,一种是外部
POST /jobs/{jobId}/feedback/{requestCounts}
请求 body
{
"status":"failed", //任务状态: failed——失败 succeed——成功 为空表示正在执行
"message": "执行MySQL...", // 提示信息
"error":"error1", //status 为 failed 时,取用户定义的 error 码,方便在重试策略里使用
"progress":"30%", //任务进度,返回形式,20/60 或者 30%,
"datetime": "2022-10-26 10:23:13",
"data":{} //返回的业务数据
}
响应数据格式如下:
{
"resp_code": 200, // 200 表示正常、其他失败
"resp_msg": null, // 错误提示消息
}
feedbackUrl 是在 request 模版中由调度中心传递过来的,可参见上方的 request 字段中 callback=${schedulingFeedbackUrl} 的定义
对某任务进行单次触发
POST /jobs/{jobId}/trigger
语义:临时单次触发某一任务
body 参数是 Map
清理 Job 即包括相关的 execution、notification
DELETE /main/schedule/clean
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
cleanDestroyJob | 是否清理销毁的 Job | 否、Boolean、Param | 默认 true |
retainExecutionCount | 保留的执行条目数 | 否、int,Param | 默认 10 |
直接获取子任务 job 的日志(scheduling 提供)
事先已知父任务 Id 和父任务执行次数的情况下,取的是最新的日志
GET /main/getChildJobLogs
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
parentJobId | 父任务 Id | 是、String、Param | |
execution | 父任务执行次数 | 是、int,Param |
根据任务 Id 获取执行实例集合(scheduling 提供)
GET /main/execution
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
jobId | 任务 Id | 是、String、Param |
根据执行 id 获取子任务 job 的日志(scheduling 提供)
该接口需与上面的 /main/execution 配合使用
GET /main/getChildJobLogsByExecutionId
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
executionId | 执行 Id | 是、String、Param |
根据父任务销毁所有的子任务包括自己(scheduling 提供)
DELETE /main/destroyByParentJob
字段 | 字段说明 | 是否必填/类型 | 备注 |
---|---|---|---|
parentJobId | 父任务 Id | 是、String、Param |
timer 列表
特性主要是将 job 定义融入用户代码,以防止任务调度崩溃而导致的 job 丢失,所以使用 timer 将更加安全。timer 具体请在 ide 中的“定时和事件设置”里进行配置。
"timer": [{
"jobName": "job测试2",
"request": [{
"headers": {
"name": "chensc"
},
"method": "POST",
"body": {
"classSize": 2,
"teacher": "陈世超",
"student": [{
"number": 1,
"name": "${jobId}",
"age": 40
},
{
"number": 8,
"name": "${jobId}",
"age": 30
}
],
"describe": "云师大附中..."
},
"url": "service://jobswork/main/resultHandleSchool?feedbackUrl=${schedulingFeedbackUrl}"
},
{
"headers": {
"name": "chensc"
},
"method": "POST",
"body": {
"school": "${$.student}|json"
},
"url": "service://jobswork/main/workHandleStr?feedbackUrl=${schedulingFeedbackUrl}"
}
],
"creator": "001",
"creatorName": "service-client",
"active": 1,
"async": true,
"jobId": "20220125",
"notification": [{
"headers": {
"name": "chensc"
},
"body": "${schedulingNotifyBody}|json",
"method": "POST",
"url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
}],
"timer": {
"period": "0 0/5 * * * ?",
"dueTime": "2s",
"ttl": "5d3s"
},
"qos": 0,
"timeoutSeconds": 15,
"jobDescription": "走起...",
"checkRateSeconds": 4,
"checkUrl": {
"headers": {
"name": "chensc"
},
"method": "GET",
"url": "service://jobswork/main/provideCheckUrl/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
},
"retry": [{
"maxAttempts": 2,
"backoffRate": 1.5,
"errorEquals": "500",
"intervalSeconds": 2
},
{
"maxAttempts": 2,
"backoffRate": 2.0,
"errorEquals": "timeout",
"intervalSeconds": 5
},
{
"maxAttempts": 2,
"backoffRate": 2.0,
"errorEquals": "error1",
"intervalSeconds": 3
}
]
}]
timer 配置转化为 job 作业的原理
- 发布后注册到门户,此时 timer 将随之注册到门户的服务列表中,主要存储在 manager_service 表的 timers 字段里面。
- 任务调度中自带了一个系统级别的 job 作业 check-timers,该作业的逻辑是每隔120s 扫描一次注册到门户的 timer,如果有新的 timer 变化,就会进行创建或更新 job 作业。
- 这样整个 job 作业资源都来源于用户代码资源,从而保证了 job 资源的可靠性。
任务调度系统变量
这些系统变量可在 request、notification、checkUrl 中引用
- schedulingFeedbackUrl:回调的 URL feedbackUrl
- schedulingJobId :job 任务的 id
- schedulingExecuteNumber:执行次数
- schedulingRequestCounts:请求数
- schedulingNotifyBody:通知内容,格式是 JSON
支持系统环境变量,如在 request、notification、checkUrl 中可使用 env.sh 中的环境变量
扩展
actor 超时、重试、熔断策略
当 sidecar 与 app 之间出现网络问题或者调用异常问题时,都会进入预先配置好的 dapr 超时策略、重试策略、熔断策略。(该配置未对外开发,如果用户需要可以自行定制任务调度即可)
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
# 配置细节请参考官网
# 官网 https://docs.dapr.io/zh-hans/operations/resiliency/policies/
policies:
# 超时
timeouts:
important: 120s
# 重试
retries:
largeResponse:
policy: constant
duration: 60s
maxRetries: 10
# 熔断
circuitBreakers:
actorCB:
maxRequests: 1
interval: 60s
timeout: 120s
trip: consecutiveFailures >= 5
# 目标actor
targets:
actors:
job_scheduler:
timeout: important
retry: largeResponse
circuitBreaker: actorCB
circuitBreakerScope: both
circuitBreakerCacheSize: 5000
注:这里的 dapr 超时策略 timeout 与 requests 的超时 timeout 的关系和优先级
- dapr 的 timeout 是指 sidecar 进程调用 jobs 模块的超时。
- requests 的 timeout 是指 jobs 调用 timer 里面所配置的 target url 的超时。
- requests 的 timeout 由平台托管,定为60s,由于复杂性目前未对外开放,所以建议用户的业务代码逻辑阻塞时长不能大于60s,如果大于60s,业务代码逻辑请使用异步执行。