异步 job 工作 API
jobs-work SDK使用
- 引入Maven依赖
<dependency>
<groupId>com.justep.cloud</groupId>
<artifactId>jobs-work</artifactId>
<version>1.0.0</version>
</dependency>
快速开始,调用Api执行异步工作任务
/**
* 常规用法默认feedback模式回调,如果是check回调模式时,那么需要AsyncJobHandle.returnCheckContent()方法响应check内容
*/
public void commonModelTest(JobRequestBody jobRequestBody) {
AsyncJobHandle.submit(jobRequestBody, (content, jobNotifier) -> {
// TODO 具体的业务逻辑
JSONObject contentJson = JSON.parseObject(JSON.toJSONString(content));
jobNotifier.setSumStepNumber(60); // 设置进度条总的步骤数为60步
jobNotifier.notify("hello 你好呀");
jobNotifier.addStepNumber(); // 走完一步逻辑,对进度条追加一步
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
jobNotifier.errorNotify("500", e);
}
jobNotifier.errorNotify("401", "error 401 没登录");
Map<String, String> jobRequestHeaders = jobNotifier.getJobRequestHeaders();
return "result";
});
}
/**
* 当回调模式是check时,用如下方法进行响应。
* 默认返回check内容是使用defaultCheckContentStorer存储器
*/
public void defaultCheckContentStorerTest() {
String jobId = UUID.randomUUID().toString();
Integer executeNumber = 0;
Results results = AsyncJobHandle.returnCheckContent(jobId, executeNumber, CheckContentStorer.defaultCheckContentStorer);
}
AsyncJobExecutor 接口
public interface AsyncJobExecutor {
Object action(Object content, JobNotifier jobNotifier);
// 当需要自定义线程池时需要实现该接口,默认为空,底层将使用job自定义线程池
// ThreadPoolExecutor CUSTOM_POOL = JobThreadPool.getInstance().customThreadPool();
default ExecutorService getExecutorService() {
return null;
}
/**
* 当需要自定义存储check内容时需要实现该接口,默认实现是内存存储,如果用户job work端是多实例场景下,请必须实现该接口。
* 设置check内容存储器:必须遵循 Map 嵌套队列的语义进行存储
* {@code ConcurrentHashMap<String, ResultsQueue<Results>>} checkContent
*/
default CheckContentStorer getCheckContentStorer() {
return CheckContentStorer.defaultCheckContentStorer;
}
}
CheckContentStorer 接口
/**
* 必须遵循 Map 嵌套队列的语义进行存储
* {@code ConcurrentHashMap<String, ResultsQueue<Results>>} checkContent
*/
public interface CheckContentStorer {
CheckContentStorer defaultCheckContentStorer = new DefaultCheckContentStorer();
// 保存,AsyncJobHandle.submit() 时必须实现
void save(String key, Results results);
// 获取返回check内容,AsyncJobHandle.returnCheckContent() 时必须实现
Results get(String key);
}
整体细节演示
public void customModelTest(JobRequestBody jobRequestBody) {
AsyncJobHandle.submit(jobRequestBody, new AsyncJobExecutor() {
@Override
public Object action(Object content, JobNotifier jobNotifier) {
// TODO 具体的业务逻辑
JSONObject result = new JSONObject();
result.put("result", "success");
return result;
}
@Override
public ExecutorService getExecutorService() {
// 自定义线程池
return Executors.newCachedThreadPool();
}
@Override
public CheckContentStorer getCheckContentStorer() {
// 回调是check模式时,用户对通知内容进行自定义持久化存储
return new CheckContentStorer() {
@Override
public void save(String key, Results results) {
// TODO 进行持久化存储
// submit 时这里必须实现
}
@Override
public Results get(String key) {
return null;
}
};
}
});
}
/**
* 回调是check模式时,用户对通知内容进行自定义持久化层拿取
*/
public void customCheckContentStorerTest() {
String jobId = UUID.randomUUID().toString();
Integer executeNumber = 0;
Results results = AsyncJobHandle.returnCheckContent(jobId, executeNumber, new CheckContentStorer() {
@Override
public void save(String key, Results results) {
}
@Override
public Results get(String key) {
// TODO 从持久层拿数据
// returnCheckContent 时这里必须实现
return null;
}
});
}
demo示例
添加 jobs工作端服务demo模版、jobs客服端服务demo模版,分别命名为 jobswork、jobsclient
jobsclient
- 核心类
main.service.impl.JobclientUserServiceImpl讲述如何组装 jobInfo - 必须用 BodyVo 组装 body
BodyVo body2 = BodyVo.builder()
.content("${$.student[0]}|json")
.build();
RequestTemplate requestTemplate2 = RequestTemplate.builder()
.method("POST")
.url("service://jobswork/main/workHandleStr")
.headers(headers)
.body(body2)
.build();
jobswork
- 核心类
main.controller.WorkController讲述如何执行 job 任务 - 部分示例如下
/**
* Job实现
* @param jobRequestBody
* @return
*/
@PostMapping("/main/workHandleStr")
public String workHandleStr(@RequestBody JobRequestBody jobRequestBody) {
log.info("workHandleStr jobRequestBody-->" + JSON.toJSONString(jobRequestBody));
AsyncJobHandle.submit(jobRequestBody, (content, jobNotifier) -> {
// TODO 具体的业务逻辑
return doing(content, jobNotifier);
});
return "hello workHandleStr...";
}
/**
* 如果是check模式,此处对任务调度提供 checkUrl 接口,负者响应回调内容
* @param jobId
* @param executeNumber
* @return
*/
@GetMapping("/main/provideCheckUrl/{jobId}")
public Results provideCheckUrl(@PathVariable String jobId, @RequestParam Integer executeNumber) {
return AsyncJobHandle.returnCheckContent(jobId, executeNumber, CheckContentStorer.defaultCheckContentStorer);
}
