异步 job 篇
- 异步 job 专为耗时作业准备,
- 异步 job 获取工作作业状态的方式有两种,一种是 checkUrl 方式,一种是 feedbackUrl 的方式,这两种方式的原理参考《技术架构和运行原理》
- 异步 job 的 requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。
checkUrl 方式
checkurl 的主要特性,即获取作业的进度或状态是主动的,由任务调度主动调用 work 进行获取
定义 job
如下 job 为例
[
{
"active": 1,
"async": true,
// check 的频率,单位是秒(s)
"checkRateSeconds": 4,
"checkUrl": {
"headers": {
"hello": "system"
},
"method": "GET",
// check轮询work服务获取作业状态,这里必须要根据schedulingJobId、schedulingExecuteNumber两个参数获取
"url": "service://jobswork/main/provideCheckUrl/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
},
"creator": "001",
"creatorName": "service-client",
"jobDescription": "走起...",
"jobId": "20220125",
"jobName": "job测试checkURL",
"notification": [
// stateEquals为空,那么所有结果集都走此策略进行通知
{
// 通知内容的body,check到的内容,然后由此处通知出去,引用通知body变量schedulingNotifyBody
"body": "${schedulingNotifyBody}|json",
"headers": {
"hello": "system"
},
"method": "POST",
// 这里通知到Client端,携带schedulingJobId、schedulingExecuteNumber方便Client做业务处理
"url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
},
// stateEquals=error1,那么error1的通知结果集将走此策略(error1的通知结果集是由checkUrl返回)
{
"stateEquals": "error1",
"body": "${schedulingNotifyBody}|json",
"headers": {
"hello": "system"
},
"method": "POST",
"url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
}
],
"qos": 0,
"request": [
{
"body": {
"classSize": 2,
"teacher": "陈同学",
"student": [
{
"number": 1,
"name": "${schedulingJobId}",
"age": 40
},
{
"number": 8,
"name": "${schedulingJobId}",
"age": 30
}
],
"describe": "师大附中..."
},
"formCommit": false,
"headers": {
"hello": "system"
},
"method": "POST",
"responseHeader": false,
// 第一个request
"url": "service://jobswork/main/resultHandleSchool"
},
{
"body": {
"school": "${$.student}|json" // 第一个request返回响应的内容,然后取里面student
},
"formCommit": false,
"headers": {
"hello": "system"
},
"method": "POST",
"responseHeader": false,
// 第二个请求,真正的作业接口
"url": "service://jobswork/main/workHandleStr"
}
],
"retry": [
// 作业状态为500时进入该重试策略
{
"backoffRate": 1.5,
"errorEquals": "500",
"intervalSeconds": 2,
"maxAttempts": 2
},
// 作业状态为timeout时进入的重试策略
{
"backoffRate": 2.0,
"errorEquals": "timeout",
"intervalSeconds": 5,
"maxAttempts": 2
},
// check得到的error码
{
"backoffRate": 2.0,
"errorEquals": "error1",
"intervalSeconds": 3,
"maxAttempts": 2
}
],
// 一次任务执行的超时时间默认60s,不等同于http请求超时时间,http请求超时时间由平台托管为60s
"timeoutSeconds": 60,
"timer": {
"dueTime": "2s",
"period": "5m"
}
}
]
work 端提供的 request 作业
注:requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。
@RestController
public class WorkController {
public static Logger log = Logger.getLogger(WorkController.class);
// 第一个request请求
@PostMapping( "/main/resultHandleSchool")
public School resultHandleSchool(@RequestBody School school){
log.info("resultHandleSchool body-->" + school.toString());
return school;
}
// 第二个request请求
@PostMapping( "/main/workHandleStr")
public String workHandleStr(@RequestBody JSONObject school){
if (school != null){
log.info("workHandleStr body-->" + school.toJSONString());
}
// 注:异步必须使用另外一个线程来委托执行作业,主线程必须马上同步响应返回
new Thread(()->{ // 线程内部执行业务作业
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
return "hello workHandleStr...";
}
}
work 端提供 checkUrl 逻辑
checkUrl 接口重点是根据 jobid(任务 Id)、executeNumber(执行次数)来返回作业的进度和状态。
@RestController
public class WorkController {
public static Logger log = Logger.getLogger(WorkController.class);
@GetMapping( "/main/provideCheckUrl/{jobid}")
public Results provideCheckUrl(@PathVariable String jobid,
@RequestParam String executeNumber){
log.info("provideCheckUrl jobid--->" + jobid);
log.info("provideCheckUrl executeNumber--->" + executeNumber);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formattedDate = sdf.format(new Date());
return Results.builder().progress("100%")
.message("成功收到消息...").status(CommonState.failed)
.error("error1") //status=failed时,用户定义的error码,该error码可以在重试策略、notification通知里使用
.data(null)
.datetime(formattedDate)
.build();
}
}
client 端提供的 notification
client 端可以通过拿到的通知内容知晓 job 作业的进度和状态情况
@RestController
public class NotifierClientController {
public static Logger log = Logger.getLogger(NotifierClientController.class);
@PostMapping("/main/notifierClient/{jobId}")
public void notifierClient(@RequestBody HttpInvokeResult<JSONObject> results ,
@RequestHeader Map<String,Object> headers,
@PathVariable String jobId,
@RequestParam String executeNumber){
log.info("notifierClient==================");
log.info("jobId-->{}"+ jobId);
log.info("executeNumber-->{}"+ executeNumber);
// headers.forEach((key, value) -> {
// log.info(String.format("Header---> '%s' = %s", key, value));
// });
String jsonStr = JSON.toJSONString(results);
log.info("notify context--->\n" + jsonStr);
}
}
feedbackUrl 方式(推荐使用)
feedbackUrl 的主要特性,即获取作业的进度或状态是被动的,由 work 服务回调任务调度服务完成,所以 feedback 的状态和进度就比较实时和准确,这里推荐开发者多使用 feedbackUrl 的方式。
job 定义
feedbackUrl 模式下 job 定义中不能有 checkUrl 字段,否则将使用 checkUrl 的模式运行 job
[
{
"jobName": "job测试feedback",
"request": [
{
"headers": {
"name": "chensc"
},
"method": "POST",
"body": {
"classSize": 2,
"teacher": "陈奕迅",
"student": [
{
"number": 1,
"name": "${jobId}",
"age": 40
},
{
"number": 8,
"name": "${jobId}",
"age": 30
}
],
"describe": "师大附中..."
},
// 第一个request请求
"url": "service://jobswork/main/resultHandleSchool"
},
{
"headers": {
"name": "chensc"
},
"method": "POST",
"body": {
"school": "${$.student}|json"
},
// 第二个请求,真正的作业接口
"url": "service://jobswork/main/workHandleSchool?feedbackUrl=${schedulingFeedbackUrl}"
}
],
"creator": "001",
"creatorName": "jobswork本身",
"active": 1,
"async": true,
"jobId": "123456789",
"notification": [
// stateEquals为空,那么所有结果集都走此策略进行通知
{
"headers": {
"name": "chensc"
},
"body": "${schedulingNotifyBody}|json",
"method": "POST",
"url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
},
// stateEquals=error1,那么error1的通知结果集将走此策略(error1的通知结果集是由checkUrl返回)
{
"stateEquals": "error1",
"body": "${schedulingNotifyBody}|json",
"headers": {
"hello": "system"
},
"method": "POST",
"url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
}
],
"timer": {
"period": "0 0/5 * * * ?"
},
"qos": 0,
// 一次任务执行的超时时间默认60s,不等同于http请求超时时间,http请求超时时间由平台托管为60s
"timeoutSeconds": 60,
"jobDescription": "走起...",
"retry": [
{
"maxAttempts": 2,
"backoffRate": 1.5,
"errorEquals": "500",
"intervalSeconds": 2
},
{
"maxAttempts": 2,
"backoffRate": 2.0,
"errorEquals": "timeout",
"intervalSeconds": 5
},
// check得到的error码
{
"maxAttempts": 2,
"backoffRate": 2.0,
"errorEquals": "error1",
"intervalSeconds": 3
}
]
}
]
work 端提供的 request 作业
- 注:requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。
- feedback 回调逻辑需要写在业务线程内,业务执行到什么进度或状态,那么就要回调什么。
@RestController
public class WorkController {
public static Logger log = Logger.getLogger(WorkController.class);
// 第一个request请求
@PostMapping( "/main/resultHandleSchool")
public School resultHandleSchool(@RequestBody School school){
log.info("resultHandleSchool body-->" + school.toString());
return school;
}
// 第二个request请求
@PostMapping( "/main/workHandleSchool")
public String workHandleSchool(@RequestParam String feedbackUrl, @RequestBody School school){
log.info("feedbackUrl-->" + feedbackUrl);
log.info("school——》" + JSON.toJSONString(school));
// 注:异步必须使用另外一个线程来委托执行作业,主线程必须马上同步响应返回
new Thread(()->{ // 线程内部执行作业
// 阻塞5后再进行回调
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
callbackRun(feedbackUrl);
}).start();
return "successful";
}
// 进行回调scheduling服务
private void callbackRun(String callbackUrl) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formattedDate = sdf.format(new Date());
Results content = Results.builder().status(CommonState.finalSucceed).data(null)
.message("任务结束最终成功").progress("100%").datetime(formattedDate).build();
// Results content = Results.builder().progress("100%")
// .message("成功收到消息...").status(CommonState.failed)
// .error("error1") //status=failed时,用户定义的error码,该error码可以在重试策略、notification通知里使用
// .data(null)
// .datetime(formattedDate)
// .build();
JSONObject body = (JSONObject) JSON.toJSON(content);
ServiceUtil.post(callbackUrl, body, null);
}
}
client 端提供的 notification
client 端可以通过拿到的通知内容知晓 job 作业的进度和状态情况
@RestController
public class NotifierClientController {
public static Logger log = Logger.getLogger(NotifierClientController.class);
@PostMapping("/main/notifierClient/{jobId}")
public void notifierClient(@RequestBody HttpInvokeResult<JSONObject> results ,
@RequestHeader Map<String,Object> headers,
@PathVariable String jobId,
@RequestParam String executeNumber){
log.info("notifierClient==================");
log.info("jobId-->{}"+ jobId);
log.info("executeNumber-->{}"+ executeNumber);
// headers.forEach((key, value) -> {
// log.info(String.format("Header---> '%s' = %s", key, value));
// });
String jsonStr = JSON.toJSONString(results);
log.info("notify context--->\n" + jsonStr);
}
}