异步 job 工作 API

jobs-work SDK使用

  • 引入Maven依赖
    <dependency>
      <groupId>com.justep.cloud</groupId>
      <artifactId>jobs-work</artifactId>
      <version>1.0.0</version>
    </dependency>

快速开始,调用Api执行异步工作任务

    /**
     * 常规用法默认feedback模式回调,如果是check回调模式时,那么需要AsyncJobHandle.returnCheckContent()方法响应check内容
     */
    public void commonModelTest(JobRequestBody jobRequestBody) {
        AsyncJobHandle.submit(jobRequestBody, (content, jobNotifier) -> {
            // TODO  具体的业务逻辑
            JSONObject contentJson = JSON.parseObject(JSON.toJSONString(content));
            jobNotifier.setSumStepNumber(60); // 设置进度条总的步骤数为60步
            jobNotifier.notify("hello 你好呀");
            jobNotifier.addStepNumber(); // 走完一步逻辑,对进度条追加一步
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                jobNotifier.errorNotify("500", e);
            }
            jobNotifier.errorNotify("401", "error 401 没登录");
            Map<String, String> jobRequestHeaders = jobNotifier.getJobRequestHeaders();
            return "result";
        });
    }

    /**
     * 当回调模式是check时,用如下方法进行响应。
     *  默认返回check内容是使用defaultCheckContentStorer存储器
     */
    public void defaultCheckContentStorerTest() {
        String jobId = UUID.randomUUID().toString();
        Integer executeNumber = 0;
        Results results = AsyncJobHandle.returnCheckContent(jobId, executeNumber, CheckContentStorer.defaultCheckContentStorer);
    }

AsyncJobExecutor 接口

public interface AsyncJobExecutor {

    Object action(Object content, JobNotifier jobNotifier);

    // 当需要自定义线程池时需要实现该接口,默认为空,底层将使用job自定义线程池
    // ThreadPoolExecutor CUSTOM_POOL = JobThreadPool.getInstance().customThreadPool();
    default ExecutorService getExecutorService() {
        return null;
    }

    /**
     * 当需要自定义存储check内容时需要实现该接口,默认实现是内存存储,如果用户job work端是多实例场景下,请必须实现该接口。
     * 设置check内容存储器:必须遵循 Map 嵌套队列的语义进行存储
     * {@code ConcurrentHashMap<String, ResultsQueue<Results>>} checkContent
     */
    default CheckContentStorer getCheckContentStorer() {
        return CheckContentStorer.defaultCheckContentStorer;
    }
}

CheckContentStorer 接口

/**
 * 必须遵循 Map 嵌套队列的语义进行存储
 * {@code ConcurrentHashMap<String, ResultsQueue<Results>>} checkContent
 */
public interface CheckContentStorer {
    CheckContentStorer defaultCheckContentStorer = new DefaultCheckContentStorer();
    // 保存,AsyncJobHandle.submit() 时必须实现
    void save(String key, Results results);
    // 获取返回check内容,AsyncJobHandle.returnCheckContent() 时必须实现
    Results get(String key);
}

整体细节演示

    public void customModelTest(JobRequestBody jobRequestBody) {
        AsyncJobHandle.submit(jobRequestBody, new AsyncJobExecutor() {
            @Override
            public Object action(Object content, JobNotifier jobNotifier) {
                // TODO  具体的业务逻辑
                JSONObject result = new JSONObject();
                result.put("result", "success");
                return result;
            }

            @Override
            public ExecutorService getExecutorService() {
                // 自定义线程池
                return Executors.newCachedThreadPool();
            }

            @Override
            public CheckContentStorer getCheckContentStorer() {
                // 回调是check模式时,用户对通知内容进行自定义持久化存储
                return new CheckContentStorer() {
                    @Override
                    public void save(String key, Results results) {
                        // TODO 进行持久化存储
                        // submit 时这里必须实现
                    }

                    @Override
                    public Results get(String key) {
                        return null;
                    }
                };
            }
        });
    }

    /**
     * 回调是check模式时,用户对通知内容进行自定义持久化层拿取
     */
    public void customCheckContentStorerTest() {
        String jobId = UUID.randomUUID().toString();
        Integer executeNumber = 0;
        Results results = AsyncJobHandle.returnCheckContent(jobId, executeNumber, new CheckContentStorer() {
            @Override
            public void save(String key, Results results) {

            }

            @Override
            public Results get(String key) {
                // TODO 从持久层拿数据
                // returnCheckContent 时这里必须实现
                return null;
            }
        });
    }

demo示例

添加 jobs工作端服务demo模版、jobs客服端服务demo模版,分别命名为 jobswork、jobsclient

jobsclient

  • 核心类 main.service.impl.JobclientUserServiceImpl 讲述如何组装 jobInfo
  • 必须用 BodyVo 组装 body
BodyVo body2 = BodyVo.builder()
    .content("${$.student[0]}|json")
    .build();
RequestTemplate requestTemplate2 = RequestTemplate.builder()
    .method("POST")
    .url("service://jobswork/main/workHandleStr")
    .headers(headers)
    .body(body2)
    .build();

jobswork

  • 核心类 main.controller.WorkController 讲述如何执行 job 任务
  • 部分示例如下
    /**
     * Job实现
     * @param jobRequestBody
     * @return
     */
    @PostMapping("/main/workHandleStr")
    public String workHandleStr(@RequestBody JobRequestBody jobRequestBody) {
        log.info("workHandleStr jobRequestBody-->" + JSON.toJSONString(jobRequestBody));
        AsyncJobHandle.submit(jobRequestBody, (content, jobNotifier) -> {
            // TODO  具体的业务逻辑
            return doing(content, jobNotifier);
        });
        return "hello workHandleStr...";
    }

    /**
     * 如果是check模式,此处对任务调度提供 checkUrl 接口,负者响应回调内容
     * @param jobId
     * @param executeNumber
     * @return
     */
    @GetMapping("/main/provideCheckUrl/{jobId}")
    public Results provideCheckUrl(@PathVariable String jobId, @RequestParam Integer executeNumber) {
        return AsyncJobHandle.returnCheckContent(jobId, executeNumber, CheckContentStorer.defaultCheckContentStorer);
    }

results matching ""

    No results matching ""