异步 job 篇

  • 异步 job 专为耗时作业准备,
  • 异步 job 获取工作作业状态的方式有两种,一种是 checkUrl 方式,一种是 feedbackUrl 的方式,这两种方式的原理参考《技术架构和运行原理
  • 异步 job 的 requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。

checkUrl 方式

checkurl 的主要特性,即获取作业的进度或状态是主动的,由任务调度主动调用 work 进行获取

定义 job

如下 job 为例

[
    {
        "active": 1,
        "async": true,
        // check 的频率,单位是秒(s)
        "checkRateSeconds": 4,
        "checkUrl": {
            "headers": {
                "hello": "system"
            },
            "method": "GET",
            // check轮询work服务获取作业状态,这里必须要根据schedulingJobId、schedulingExecuteNumber两个参数获取
            "url": "service://jobswork/main/provideCheckUrl/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
        },
        "creator": "001",
        "creatorName": "service-client",
        "jobDescription": "走起...",
        "jobId": "20220125",
        "jobName": "job测试checkURL",
        "notification": [
            // stateEquals为空,那么所有结果集都走此策略进行通知
            {
                // 通知内容的body,check到的内容,然后由此处通知出去,引用通知body变量schedulingNotifyBody
                "body": "${schedulingNotifyBody}|json",
                "headers": {
                    "hello": "system"
                },
                "method": "POST",
                // 这里通知到Client端,携带schedulingJobId、schedulingExecuteNumber方便Client做业务处理
                "url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
            },
            // stateEquals=error1,那么error1的通知结果集将走此策略(error1的通知结果集是由checkUrl返回)
            {
                "stateEquals": "error1",
                "body": "${schedulingNotifyBody}|json",
                "headers": {
                    "hello": "system"
                },
                "method": "POST",
                "url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
            }
        ],
        "qos": 0,
        "request": [
            {
                "body": {
                    "classSize": 2,
                    "teacher": "陈同学",
                    "student": [
                        {
                            "number": 1,
                            "name": "${schedulingJobId}",
                            "age": 40
                        },
                        {
                            "number": 8,
                            "name": "${schedulingJobId}",
                            "age": 30
                        }
                    ],
                    "describe": "师大附中..."
                },
                "formCommit": false,
                "headers": {
                    "hello": "system"
                },
                "method": "POST",
                "responseHeader": false,
                // 第一个request
                "url": "service://jobswork/main/resultHandleSchool"
            },
            {
                "body": {
                    "school": "${$.student}|json" // 第一个request返回响应的内容,然后取里面student
                },
                "formCommit": false,
                "headers": {
                    "hello": "system"
                },
                "method": "POST",
                "responseHeader": false,
                // 第二个请求,真正的作业接口
                "url": "service://jobswork/main/workHandleStr"
            }
        ],
        "retry": [
            // 作业状态为500时进入该重试策略
            {
                "backoffRate": 1.5,
                "errorEquals": "500",
                "intervalSeconds": 2,
                "maxAttempts": 2
            },
            // 作业状态为timeout时进入的重试策略
            {
                "backoffRate": 2.0,
                "errorEquals": "timeout",
                "intervalSeconds": 5,
                "maxAttempts": 2
            },
            // check得到的error码
            {
                "backoffRate": 2.0,
                "errorEquals": "error1",
                "intervalSeconds": 3,
                "maxAttempts": 2
            }
        ],
        // 一次任务执行的超时时间默认60s,不等同于http请求超时时间,http请求超时时间由平台托管为60s
        "timeoutSeconds": 60,
        "timer": {
            "dueTime": "2s",
            "period": "5m"
        }
    }
]

work 端提供的 request 作业

注:requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。


@RestController
public class WorkController {

    public static Logger log = Logger.getLogger(WorkController.class);

    // 第一个request请求
    @PostMapping( "/main/resultHandleSchool")
    public School resultHandleSchool(@RequestBody School school){
        log.info("resultHandleSchool body-->" + school.toString());
        return school;
    }

    // 第二个request请求
    @PostMapping( "/main/workHandleStr")
    public String workHandleStr(@RequestBody JSONObject school){
        if (school != null){
            log.info("workHandleStr body-->" + school.toJSONString());
        }
        // 注:异步必须使用另外一个线程来委托执行作业,主线程必须马上同步响应返回
        new Thread(()->{ // 线程内部执行业务作业
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }).start();
        return "hello workHandleStr...";
    }
}

work 端提供 checkUrl 逻辑

checkUrl 接口重点是根据 jobid(任务 Id)、executeNumber(执行次数)来返回作业的进度和状态。

@RestController
public class WorkController {

    public static Logger log = Logger.getLogger(WorkController.class);

    @GetMapping( "/main/provideCheckUrl/{jobid}")
    public Results provideCheckUrl(@PathVariable String jobid,
                                   @RequestParam String executeNumber){
        log.info("provideCheckUrl jobid--->" + jobid);
        log.info("provideCheckUrl executeNumber--->" + executeNumber);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String formattedDate = sdf.format(new Date());
        return Results.builder().progress("100%")
                .message("成功收到消息...").status(CommonState.failed)
                .error("error1") //status=failed时,用户定义的error码,该error码可以在重试策略、notification通知里使用
                .data(null)
                .datetime(formattedDate)
                .build();
    }
}

client 端提供的 notification

client 端可以通过拿到的通知内容知晓 job 作业的进度和状态情况

@RestController
public class NotifierClientController {
    public static Logger log = Logger.getLogger(NotifierClientController.class);

   @PostMapping("/main/notifierClient/{jobId}")
   public void notifierClient(@RequestBody HttpInvokeResult<JSONObject> results ,
                              @RequestHeader Map<String,Object> headers,
                              @PathVariable String jobId,
                              @RequestParam String executeNumber){

        log.info("notifierClient==================");

        log.info("jobId-->{}"+ jobId);
        log.info("executeNumber-->{}"+ executeNumber);

//        headers.forEach((key, value) -> {
//            log.info(String.format("Header---> '%s' = %s", key, value));
//        });

        String jsonStr = JSON.toJSONString(results);
        log.info("notify context--->\n" + jsonStr);

    }
}

feedbackUrl 方式(推荐使用)

feedbackUrl 的主要特性,即获取作业的进度或状态是被动的,由 work 服务回调任务调度服务完成,所以 feedback 的状态和进度就比较实时和准确,这里推荐开发者多使用 feedbackUrl 的方式。

job 定义

feedbackUrl 模式下 job 定义中不能有 checkUrl 字段,否则将使用 checkUrl 的模式运行 job

[
    {
        "jobName": "job测试feedback",
        "request": [
            {
                "headers": {
                    "name": "chensc"
                },
                "method": "POST",
                "body": {
                    "classSize": 2,
                    "teacher": "陈奕迅",
                    "student": [
                        {
                            "number": 1,
                            "name": "${jobId}",
                            "age": 40
                        },
                        {
                            "number": 8,
                            "name": "${jobId}",
                            "age": 30
                        }
                    ],
                    "describe": "师大附中..."
                },
                // 第一个request请求
                "url": "service://jobswork/main/resultHandleSchool"
            },
            {
                "headers": {
                    "name": "chensc"
                },
                "method": "POST",
                "body": {
                    "school": "${$.student}|json"
                },
                // 第二个请求,真正的作业接口
                "url": "service://jobswork/main/workHandleSchool?feedbackUrl=${schedulingFeedbackUrl}"
            }
        ],
        "creator": "001",
        "creatorName": "jobswork本身",
        "active": 1,
        "async": true,
        "jobId": "123456789",
        "notification": [
            // stateEquals为空,那么所有结果集都走此策略进行通知
            {
                "headers": {
                    "name": "chensc"
                },
                "body": "${schedulingNotifyBody}|json",
                "method": "POST",
                "url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
            },
            // stateEquals=error1,那么error1的通知结果集将走此策略(error1的通知结果集是由checkUrl返回)
            {
                "stateEquals": "error1",
                "body": "${schedulingNotifyBody}|json",
                "headers": {
                    "hello": "system"
                },
                "method": "POST",
                "url": "service://jobsclient/main/notifierClient/${schedulingJobId}?executeNumber=${schedulingExecuteNumber}"
            }
        ],
        "timer": {
            "period": "0 0/5 * * * ?"
        },
        "qos": 0,
        // 一次任务执行的超时时间默认60s,不等同于http请求超时时间,http请求超时时间由平台托管为60s
        "timeoutSeconds": 60,
        "jobDescription": "走起...",
        "retry": [
            {
                "maxAttempts": 2,
                "backoffRate": 1.5,
                "errorEquals": "500",
                "intervalSeconds": 2
            },
            {
                "maxAttempts": 2,
                "backoffRate": 2.0,
                "errorEquals": "timeout",
                "intervalSeconds": 5
            },
            // check得到的error码
            {
                "maxAttempts": 2,
                "backoffRate": 2.0,
                "errorEquals": "error1",
                "intervalSeconds": 3
            }
        ]
    }
]

work 端提供的 request 作业

  • 注:requests 请求中最后一个作业请求的业务作业过程不能同步阻塞请求,必须将该业务作业过程委派给另外一个线程或进程执行,然后自身进行快速请求响应来完成异步的动作。
  • feedback 回调逻辑需要写在业务线程内,业务执行到什么进度或状态,那么就要回调什么。
@RestController
public class WorkController {

    public static Logger log = Logger.getLogger(WorkController.class);

    // 第一个request请求
    @PostMapping( "/main/resultHandleSchool")
    public School resultHandleSchool(@RequestBody School school){
        log.info("resultHandleSchool body-->" + school.toString());
        return school;
    }

    // 第二个request请求
    @PostMapping( "/main/workHandleSchool")
    public String workHandleSchool(@RequestParam String feedbackUrl, @RequestBody School school){
        log.info("feedbackUrl-->" + feedbackUrl);
        log.info("school——》" + JSON.toJSONString(school));
        // 注:异步必须使用另外一个线程来委托执行作业,主线程必须马上同步响应返回
        new Thread(()->{ // 线程内部执行作业
            // 阻塞5后再进行回调
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            callbackRun(feedbackUrl);
        }).start();

        return "successful";
    }
    // 进行回调scheduling服务
    private void callbackRun(String callbackUrl) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String formattedDate = sdf.format(new Date());
        Results content =  Results.builder().status(CommonState.finalSucceed).data(null)
                .message("任务结束最终成功").progress("100%").datetime(formattedDate).build();

//        Results content = Results.builder().progress("100%")
//                .message("成功收到消息...").status(CommonState.failed)
//                .error("error1") //status=failed时,用户定义的error码,该error码可以在重试策略、notification通知里使用
//                .data(null)
//                .datetime(formattedDate)
//                .build();

        JSONObject body = (JSONObject) JSON.toJSON(content);
        ServiceUtil.post(callbackUrl, body, null);
    }
}

client 端提供的 notification

client 端可以通过拿到的通知内容知晓 job 作业的进度和状态情况

@RestController
public class NotifierClientController {
    public static Logger log = Logger.getLogger(NotifierClientController.class);

    @PostMapping("/main/notifierClient/{jobId}")
    public void notifierClient(@RequestBody HttpInvokeResult<JSONObject> results ,
                              @RequestHeader Map<String,Object> headers,
                              @PathVariable String jobId,
                              @RequestParam String executeNumber){

        log.info("notifierClient==================");

        log.info("jobId-->{}"+ jobId);
        log.info("executeNumber-->{}"+ executeNumber);

//        headers.forEach((key, value) -> {
//            log.info(String.format("Header---> '%s' = %s", key, value));
//        });

        String jsonStr = JSON.toJSONString(results);
        log.info("notify context--->\n" + jsonStr);

    }
}

results matching ""

    No results matching ""