From b7b411ce2a016cb7ab882fbea1c9821a5f8c1605 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Mon, 24 Apr 2017 18:55:34 +0800 Subject: [PATCH] =?UTF-8?q?JobHandler=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E5=9B=9E=E8=B0=83=E7=BB=93=E6=9E=9C=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 ++- .../admin/controller/JobLogController.java | 10 +++----- .../xxl/job/admin/core/biz/AdminBizImpl.java | 8 +++--- .../template/jobinfo/jobinfo.index.ftl | 4 ++- .../main/webapp/static/js/joblog.detail.1.js | 8 +++--- .../core/biz/model/HandleCallbackParam.java | 25 ++++++------------- .../com/xxl/job/core/handler/IJobHandler.java | 10 +++++--- .../job/core/handler/impl/GlueJobHandler.java | 4 ++- .../xxl/job/core/log/XxlJobFileAppender.java | 8 +++--- .../com/xxl/job/core/thread/JobThread.java | 24 ++++++++++-------- .../service/jobhandler/DemoJobHandler.java | 4 ++- 11 files changed, 56 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index cbe193ab..dfd18297 100644 --- a/README.md +++ b/README.md @@ -772,6 +772,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 #### 6.12 版本 V1.6.2 特性(Coding) - 1、任务报表:总任务数、总调度数、调度成功比例; +- 2、JobHandler支持自定义回调结果; #### TODO LIST - 1、支持脚本JOB(源码或指定路径), 即shell/python/php等, 日志实时输出并支持在线监控;定制JobHandler实现; @@ -779,7 +780,8 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 3、任务权限管理; - 4、执行器,server启动,注册逻辑调整; - 5、调度失败重试机制; -- 6、JobHandler支持自定义回调结果; + +- 7、JobHandler开启多线程时,支持记录执行日志; ## 七、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index 0af0f5dc..59ca770d 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -120,12 +120,10 @@ public class JobLogController { ReturnT logResult = executorBiz.log(triggerTime, logId, fromLineNum); // is end - if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) { - XxlJobLog jobLog = xxlJobLogDao.load(logId); - if (jobLog.getHandleCode() > 0) { - logResult.getContent().setEnd(true); - } - } + /*XxlJobLog jobLog = xxlJobLogDao.load(logId); + if (jobLog.getHandleCode() > 0) { + logResult.getContent().setEnd(true); + }*/ return logResult; } catch (Exception e) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java index 460daf5a..a4dfe133 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java @@ -31,7 +31,7 @@ public class AdminBizImpl implements AdminBiz { // trigger success, to trigger child job, and avoid repeat trigger child job String childTriggerMsg = null; - if (ReturnT.SUCCESS_CODE==handleCallbackParam.getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { + if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { XxlJobInfo xxlJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId()); if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { childTriggerMsg = "
"; @@ -68,8 +68,8 @@ public class AdminBizImpl implements AdminBiz { if (log.getHandleMsg()!=null) { handleMsg.append(log.getHandleMsg()).append("
"); } - if (handleCallbackParam.getMsg() != null) { - handleMsg.append("执行备注:").append(handleCallbackParam.getMsg()); + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append("执行备注:").append(handleCallbackParam.getExecuteResult().getMsg()); } if (childTriggerMsg !=null) { handleMsg.append("
子任务触发备注:").append(childTriggerMsg); @@ -77,7 +77,7 @@ public class AdminBizImpl implements AdminBiz { // success, save log log.setHandleTime(new Date()); - log.setHandleCode(handleCallbackParam.getCode()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); log.setHandleMsg(handleMsg.toString()); XxlJobDynamicScheduler.xxlJobLogDao.updateHandleInfo(log); diff --git a/xxl-job-admin/src/main/webapp/WEB-INF/template/jobinfo/jobinfo.index.ftl b/xxl-job-admin/src/main/webapp/WEB-INF/template/jobinfo/jobinfo.index.ftl index a404e5b5..f96c8a7c 100644 --- a/xxl-job-admin/src/main/webapp/WEB-INF/template/jobinfo/jobinfo.index.ftl +++ b/xxl-job-admin/src/main/webapp/WEB-INF/template/jobinfo/jobinfo.index.ftl @@ -172,14 +172,16 @@ package com.xxl.job.service.handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; public class DemoGlueJobHandler extends IJobHandler { private static transient Logger logger = LoggerFactory.getLogger(DemoGlueJobHandler.class); @Override - public void execute(String... params) throws Exception { + public ReturnT execute(String... params) throws Exception { logger.info("XXL-JOB, Hello World."); + return ReturnT.SUCCESS; } } diff --git a/xxl-job-admin/src/main/webapp/static/js/joblog.detail.1.js b/xxl-job-admin/src/main/webapp/static/js/joblog.detail.1.js index af09ccf5..fa74c02b 100644 --- a/xxl-job-admin/src/main/webapp/static/js/joblog.detail.1.js +++ b/xxl-job-admin/src/main/webapp/static/js/joblog.detail.1.js @@ -8,7 +8,7 @@ $(function() { } // pull log - var fromLineNum = 0; // [from, to] + var fromLineNum = 1; // [from, to], start as 1 var pullFailCount = 0; function pullLog() { // pullFailCount, max=20 @@ -44,15 +44,17 @@ $(function() { } if (fromLineNum > data.content.toLineNum ) { console.log('pullLog already line-end'); + // valid end if (data.content.end) { logRunStop('[Rolling Log Finish]'); return; } + return; } - // append + // append content fromLineNum = data.content.toLineNum + 1; $('#logConsole').append(data.content.logContent); pullFailCount = 0; @@ -72,7 +74,7 @@ $(function() { // handler already callback, end if (handleCode > 0) { - logRunStop('[Load Log Finish]'); + logRunStop('
[Load Log Finish]'); return; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java index dae8a2d2..1c4233c5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java @@ -12,14 +12,12 @@ public class HandleCallbackParam implements Serializable { private int logId; private Set logAddress; - private int code; - private String msg; + private ReturnT executeResult; - public HandleCallbackParam(int logId, Set logAddress, int code, String msg) { + public HandleCallbackParam(int logId, Set logAddress, ReturnT executeResult) { this.logId = logId; this.logAddress = logAddress; - this.code = code; - this.msg = msg; + this.executeResult = executeResult; } public int getLogId() { @@ -38,20 +36,11 @@ public class HandleCallbackParam implements Serializable { this.logAddress = logAddress; } - public int getCode() { - return code; + public ReturnT getExecuteResult() { + return executeResult; } - public void setCode(int code) { - this.code = code; + public void setExecuteResult(ReturnT executeResult) { + this.executeResult = executeResult; } - - public String getMsg() { - return msg; - } - - public void setMsg(String msg) { - this.msg = msg; - } - } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java index 3098fa60..928dc9cb 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java @@ -1,5 +1,7 @@ package com.xxl.job.core.handler; +import com.xxl.job.core.biz.model.ReturnT; + /** * remote job handler * @author xuxueli 2015-12-19 19:06:38 @@ -7,11 +9,11 @@ package com.xxl.job.core.handler; public abstract class IJobHandler { /** - * job handler

- * the return Object will be and stored + * job handler * @param params - * @throws Exception default sussecc, fail if catch exception + * @return + * @throws Exception */ - public abstract void execute(String... params) throws Exception; + public abstract ReturnT execute(String... params) throws Exception; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java index 8ea891e5..de18c40a 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java @@ -1,5 +1,6 @@ package com.xxl.job.core.handler.impl; +import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,9 +23,10 @@ public class GlueJobHandler extends IJobHandler { } @Override - public void execute(String... params) throws Exception { + public ReturnT execute(String... params) throws Exception { logger.info("----------- glue.version:{} -----------", glueUpdatetime); jobHandler.execute(params); + return ReturnT.SUCCESS; } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index 63c5cf3e..9bb1cf1f 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -122,12 +122,12 @@ public class XxlJobFileAppender extends AppenderSkeleton { // valid log file if (logFileName==null || logFileName.trim().length()==0) { - return new LogResult(fromLineNum, -1, "readLog fail, logFile not found", true); + return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); } File logFile = new File(filePath, logFileName); if (!logFile.exists()) { - return new LogResult(fromLineNum, -1, "readLog fail, logFile not exists", true); + return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); } // read file @@ -139,8 +139,8 @@ public class XxlJobFileAppender extends AppenderSkeleton { String line = null; while ((line = reader.readLine())!=null) { - toLineNum++; - if (reader.getLineNumber() >= fromLineNum) { + toLineNum = reader.getLineNumber(); // [from, to], start as 1 + if (toLineNum >= fromLineNum) { logContentBuffer.append(line).append("\n"); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index d1d3c231..2d31f15d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -74,33 +74,36 @@ public class JobThread extends Thread{ ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; // handle job - int _code = ReturnT.SUCCESS_CODE; - String _msg = null; - + ReturnT executeResult = null; try { // log filename: yyyy-MM-dd/9999.log String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); logger.info("----------- xxl-job job execute start -----------"); - handler.execute(handlerParams); + + executeResult = handler.execute(handlerParams); + if (executeResult == null) { + executeResult = ReturnT.FAIL; + } } catch (Exception e) { logger.error("JobThread Exception:", e); - _code = ReturnT.FAIL_CODE; StringWriter out = new StringWriter(); e.printStackTrace(new PrintWriter(out)); - _msg = out.toString(); + + executeResult = new ReturnT(ReturnT.FAIL_CODE, out.toString()); } logger.info("----------- xxl-job job execute end -----------
Look : ExecutorParams:{}, Code:{}, Msg:{}", - new Object[]{handlerParams, _code, _msg}); + new Object[]{handlerParams, executeResult.getCode(), executeResult.getMsg()}); // callback handler info if (!toStop) { // commonm - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), _code, _msg)); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), executeResult)); } else { // is killed - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]")); + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult)); } } } catch (Exception e) { @@ -113,7 +116,8 @@ public class JobThread extends Thread{ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]")); + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult)); } } diff --git a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java index 8d1924b4..635eddb7 100644 --- a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java +++ b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java @@ -1,5 +1,6 @@ package com.xxl.job.executor.service.jobhandler; +import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; import org.slf4j.Logger; @@ -25,13 +26,14 @@ public class DemoJobHandler extends IJobHandler { private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class); @Override - public void execute(String... params) throws Exception { + public ReturnT execute(String... params) throws Exception { logger.info("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { logger.info("beat at:{}", i); TimeUnit.SECONDS.sleep(2); } + return ReturnT.SUCCESS; } } -- GitLab