From 8b71e2cec4d4e32cd8bacd36bc6abcb3d412da91 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Fri, 22 Jul 2016 21:50:46 +0800 Subject: [PATCH] init --- .../admin/controller/JobLogController.java | 55 ++-- .../XxlJobLogCallbackServerHandler.java | 56 ++-- .../admin/core/jobbean/LocalNomalJobBean.java | 10 +- .../admin/core/jobbean/RemoteHttpJobBean.java | 78 +++--- .../jobbean/impl/DemoConcurrentJobBean.java | 4 +- .../core/jobbean/impl/DemoNomalJobBean.java | 4 +- .../admin/core/thread/JobMonitorHelper.java | 8 +- .../xxl/job/admin/service/IXxlJobService.java | 2 +- .../admin/service/impl/XxlJobServiceImpl.java | 2 +- .../main/webapp/static/js/jobinfo.index.1.js | 8 +- .../main/webapp/static/js/joblog.index.1.js | 1 + .../com/xxl/job/dao/impl/XxlJobInfoTest.java | 10 +- .../com/xxl/job/dao/impl/XxlJobLogTest.java | 18 +- .../core/executor/jetty/XxlJobExecutor.java | 6 +- .../executor/jetty/XxlJobExecutorHandler.java | 47 ++-- .../core/executor/servlet/XxlJobServlet.java | 4 +- .../com/xxl/job/core/glue/GlueFactory.java | 2 +- .../job/core/handler/HandlerRepository.java | 247 ------------------ .../xxl/job/core/handler/HandlerThread.java | 129 --------- .../com/xxl/job/core/handler/IJobHandler.java | 10 +- .../xxl/job/core/log/XxlJobFileAppender.java | 23 +- .../xxl/job/core/router/HandlerRouter.java | 99 +++++++ .../java/com/xxl/job/core/router/IAction.java | 13 + .../job/core/router/action/BeatAction.java | 17 ++ .../job/core/router/action/KillAction.java | 35 +++ .../xxl/job/core/router/action/LogAction.java | 21 ++ .../xxl/job/core/router/action/RunAction.java | 66 +++++ .../job/core/router/model/RequestModel.java | 140 ++++++++++ .../job/core/router/model/ResponseModel.java | 45 ++++ .../xxl/job/core/router/thread/JobThread.java | 128 +++++++++ .../router/thread/TriggerCallbackThread.java | 44 ++++ .../xxl/job/core/util/ByteHexConverter.java | 59 +++++ .../java/com/xxl/job/core/util/HttpUtil.java | 130 --------- .../xxl/job/core/util/XxlJobNetCommUtil.java | 141 ++++++++++ 34 files changed, 988 insertions(+), 674 deletions(-) delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerRepository.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/HttpUtil.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index 0bc38efe..2a08c31a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -6,10 +6,10 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao; -import com.xxl.job.core.handler.HandlerRepository.ActionEnum; -import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum; -import com.xxl.job.core.util.HttpUtil; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; +import com.xxl.job.core.router.HandlerRouter.ActionRepository; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.springframework.stereotype.Controller; @@ -110,22 +110,22 @@ public class JobLogController { if (log == null) { return new ReturnT(500, "参数异常"); } - if (!RemoteCallBack.SUCCESS.equals(log.getTriggerStatus())) { + if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) { return new ReturnT(500, "调度失败,无法查看执行日志"); } // trigger id, trigger time - Map reqMap = new HashMap(); - reqMap.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis())); - reqMap.put(HandlerParamEnum.ACTION.name(), ActionEnum.LOG.name()); - reqMap.put(HandlerParamEnum.LOG_ID.name(), String.valueOf(id)); - reqMap.put(HandlerParamEnum.LOG_DATE.name(), String.valueOf(log.getTriggerTime().getTime())); - - RemoteCallBack callBack = HttpUtil.post(HttpUtil.addressToUrl(log.getExecutorAddress()), reqMap); - if (HttpUtil.RemoteCallBack.SUCCESS.equals(callBack.getStatus())) { - return new ReturnT(callBack.getMsg()); + RequestModel requestModel = new RequestModel(); + requestModel.setTimestamp(System.currentTimeMillis()); + requestModel.setAction(ActionRepository.LOG.name()); + requestModel.setLogId(id); + requestModel.setLogDateTim(log.getTriggerTime().getTime()); + + ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); + if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { + return new ReturnT(responseModel.getMsg()); } else { - return new ReturnT(500, callBack.getMsg()); + return new ReturnT(500, responseModel.getMsg()); } } @@ -145,27 +145,26 @@ public class JobLogController { if (log == null || jobInfo==null) { return new ReturnT(500, "参数异常"); } - if (!RemoteCallBack.SUCCESS.equals(log.getTriggerStatus())) { + if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) { return new ReturnT(500, "调度失败,无法终止日志"); } - // request - Map reqMap = new HashMap(); - reqMap.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis())); - reqMap.put(HandlerParamEnum.ACTION.name(), ActionEnum.KILL.name()); - reqMap.put(HandlerParamEnum.JOB_GROUP.name(), log.getJobGroup()); - reqMap.put(HandlerParamEnum.JOB_NAME.name(), log.getJobName()); - reqMap.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch())); + // request of kill + RequestModel requestModel = new RequestModel(); + requestModel.setTimestamp(System.currentTimeMillis()); + requestModel.setAction(ActionRepository.KILL.name()); + requestModel.setJobGroup(log.getJobGroup()); + requestModel.setJobName(log.getJobName()); - RemoteCallBack callBack = HttpUtil.post(HttpUtil.addressToUrl(log.getExecutorAddress()), reqMap); - if (HttpUtil.RemoteCallBack.SUCCESS.equals(callBack.getStatus())) { - log.setHandleStatus(HttpUtil.RemoteCallBack.FAIL); + ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); + if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { + log.setHandleStatus(ResponseModel.FAIL); log.setHandleMsg("人为操作主动终止"); log.setHandleTime(new Date()); xxlJobLogDao.updateHandleInfo(log); - return new ReturnT(callBack.getMsg()); + return new ReturnT(responseModel.getMsg()); } else { - return new ReturnT(500, callBack.getMsg()); + return new ReturnT(500, responseModel.getMsg()); } } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java index af5ef762..7a35710c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java @@ -1,20 +1,18 @@ package com.xxl.job.admin.core.callback; -import java.io.IOException; -import java.util.Date; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.lang.StringUtils; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; - -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; -import com.xxl.job.core.util.JacksonUtil; +import java.io.IOException; +import java.util.Date; /** * Created by xuxueli on 2016-5-22 11:15:42 @@ -27,31 +25,31 @@ public class XxlJobLogCallbackServerHandler extends AbstractHandler { httpServletRequest.setCharacterEncoding("UTF-8"); httpServletResponse.setCharacterEncoding("UTF-8"); - // parse param - String log_id = httpServletRequest.getParameter("log_id"); - String status = httpServletRequest.getParameter("status"); - String msg = httpServletRequest.getParameter("msg"); - + // parse hex-json to request model + String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX); + RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class); + // process - RemoteCallBack callBack = new RemoteCallBack(); - callBack.setStatus(RemoteCallBack.FAIL); - if (StringUtils.isNumeric(log_id) && StringUtils.isNotBlank(status)) { - XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(Integer.valueOf(log_id)); - if (log!=null) { - log.setHandleTime(new Date()); - log.setHandleStatus(status); - log.setHandleMsg(msg); - DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log); - callBack.setStatus(RemoteCallBack.SUCCESS); - } + ResponseModel responseModel = null; + XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId()); + if (log!=null) { + log.setHandleTime(new Date()); + log.setHandleStatus(requestModel.getStatus()); + log.setHandleMsg(requestModel.getMsg()); + DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log); + responseModel = new ResponseModel(ResponseModel.SUCCESS, null); + } else { + responseModel = new ResponseModel(ResponseModel.FAIL, "log item not found."); } - String resp = JacksonUtil.writeValueAsString(callBack); + + // format response model to hex-json + String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel); // response httpServletResponse.setContentType("text/html;charset=utf-8"); httpServletResponse.setStatus(HttpServletResponse.SC_OK); baseRequest.setHandled(true); - httpServletResponse.getWriter().println(resp); + httpServletResponse.getWriter().println(responseHex); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/LocalNomalJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/LocalNomalJobBean.java index 7133b0a3..2d003f61 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/LocalNomalJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/LocalNomalJobBean.java @@ -1,5 +1,5 @@ package com.xxl.job.admin.core.jobbean; -//package com.xxl.job.service.job; +//package com.xxl.job.action.job; // //import java.io.PrintWriter; //import java.io.StringWriter; @@ -15,8 +15,8 @@ package com.xxl.job.admin.core.jobbean; //import org.slf4j.LoggerFactory; //import org.springframework.scheduling.quartz.QuartzJobBean; // -//import com.xxl.job.client.handler.HandlerRepository; -//import com.xxl.job.client.util.HttpUtil.RemoteCallBack; +//import com.xxl.job.client.handler.HandlerRouter; +//import com.xxl.job.client.util.XxlJobNetCommUtil.RemoteCallBack; //import com.xxl.job.client.util.JacksonUtil; //import com.xxl.job.core.model.XxlJobInfo; //import com.xxl.job.core.model.XxlJobLog; @@ -55,7 +55,7 @@ package com.xxl.job.admin.core.jobbean; // logger.info(">>>>>>>>>>> xxl-job trigger start, jobLog:{}", jobLog); // // // trigger request -// String handler_params = jobDataMap.get(HandlerRepository.HANDLER_PARAMS); +// String handler_params = jobDataMap.get(HandlerRouter.HANDLER_PARAMS); // String[] handlerParams = null; // if (StringUtils.isNotBlank(handler_params)) { // handlerParams = handler_params.split(","); @@ -72,7 +72,7 @@ package com.xxl.job.admin.core.jobbean; // jobLog.setHandleStatus(RemoteCallBack.SUCCESS); // jobLog.setHandleMsg(JacksonUtil.writeValueAsString(responseMsg)); // } catch (Exception e) { -// logger.info("HandlerThread Exception:", e); +// logger.info("JobThread Exception:", e); // StringWriter out = new StringWriter(); // e.printStackTrace(new PrintWriter(out)); // diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 300a4d1a..3f6439e0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -5,10 +5,10 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.thread.JobMonitorHelper; import com.xxl.job.admin.core.util.DynamicSchedulerUtil; -import com.xxl.job.core.handler.HandlerRepository.ActionEnum; -import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum; -import com.xxl.job.core.util.HttpUtil; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; +import com.xxl.job.core.router.HandlerRouter.ActionRepository; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; import org.apache.commons.lang.StringUtils; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -18,7 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.MessageFormat; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; /** * http job bean @@ -43,30 +46,27 @@ public class RemoteHttpJobBean extends QuartzJobBean { logger.info(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // trigger request - HashMap params = new HashMap(); - params.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis())); - params.put(HandlerParamEnum.ACTION.name(), ActionEnum.RUN.name()); - - params.put(HandlerParamEnum.LOG_ADDRESS.name(), XxlJobLogCallbackServer.getTrigger_log_address()); - params.put(HandlerParamEnum.LOG_ID.name(), String.valueOf(jobLog.getId())); - - params.put(HandlerParamEnum.JOB_GROUP.name(), jobInfo.getJobGroup()); - params.put(HandlerParamEnum.JOB_NAME.name(), jobInfo.getJobName()); - params.put(HandlerParamEnum.EXECUTOR_HANDLER.name(), jobInfo.getExecutorHandler()); - params.put(HandlerParamEnum.EXECUTOR_PARAMS.name(), jobInfo.getExecutorParam()); - - params.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch())); + RequestModel requestModel = new RequestModel(); + requestModel.setTimestamp(System.currentTimeMillis()); + requestModel.setAction(ActionRepository.RUN.name()); + requestModel.setJobGroup(jobInfo.getJobGroup()); + requestModel.setJobName(jobInfo.getJobName()); + requestModel.setExecutorHandler(jobInfo.getExecutorHandler()); + requestModel.setExecutorParams(jobInfo.getExecutorParam()); + requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); + requestModel.setLogAddress(XxlJobLogCallbackServer.getTrigger_log_address()); + requestModel.setLogId(jobLog.getId()); // failover trigger - RemoteCallBack callback = failoverTrigger(jobInfo.getExecutorAddress(), params, jobLog); + ResponseModel responseModel = failoverTrigger(jobInfo.getExecutorAddress(), requestModel, jobLog); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); - logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, callback:{}", jobLog.getId(), callback); + logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString()); // update trigger info jobLog.setTriggerTime(new Date()); - jobLog.setTriggerStatus(callback.getStatus()); - jobLog.setTriggerMsg(callback.getMsg()); + jobLog.setTriggerStatus(responseModel.getStatus()); + jobLog.setTriggerMsg(responseModel.getMsg()); DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog); // monitor triger @@ -81,7 +81,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { * @param handler_address * @return */ - public RemoteCallBack failoverTrigger(String handler_address, HashMap handler_params, XxlJobLog jobLog){ + public ResponseModel failoverTrigger(String handler_address, RequestModel requestModel, XxlJobLog jobLog){ if (handler_address.split(",").length > 1) { // for ha @@ -92,33 +92,43 @@ public class RemoteHttpJobBean extends QuartzJobBean { String failoverMessage = ""; for (String address : addressList) { if (StringUtils.isNotBlank(address)) { - HashMap params = new HashMap(); - params.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis())); - params.put(HandlerParamEnum.ACTION.name(), ActionEnum.BEAT.name()); - RemoteCallBack beatResult = HttpUtil.post(HttpUtil.addressToUrl(address), params); + + // beat check + RequestModel beatRequest = new RequestModel(); + beatRequest.setTimestamp(System.currentTimeMillis()); + beatRequest.setAction(ActionRepository.BEAT.name()); + ResponseModel beatResult = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), beatRequest); failoverMessage += MessageFormat.format("BEAT running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, beatResult.getStatus(), beatResult.getMsg()); - if (RemoteCallBack.SUCCESS.equals(beatResult.getStatus())) { + + // beat success, trigger do + if (beatResult.SUCCESS.equals(beatResult.getStatus())) { + // store real address jobLog.setExecutorAddress(address); - RemoteCallBack triggerCallback = HttpUtil.post(HttpUtil.addressToUrl(address), handler_params); - triggerCallback.setStatus(RemoteCallBack.SUCCESS); + + // real trigger + ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); failoverMessage += MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, triggerCallback.getStatus(), triggerCallback.getMsg()); triggerCallback.setMsg(failoverMessage); return triggerCallback; } + } } - - RemoteCallBack result = new RemoteCallBack(); - result.setStatus(RemoteCallBack.FAIL); + + ResponseModel result = new ResponseModel(); + result.setStatus(ResponseModel.FAIL); result.setMsg(failoverMessage); return result; } else { + // store real address jobLog.setExecutorAddress(handler_address); - RemoteCallBack triggerCallback = HttpUtil.post(HttpUtil.addressToUrl(handler_address), handler_params); + + ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(handler_address), requestModel); String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", handler_address, triggerCallback.getStatus(), triggerCallback.getMsg()); triggerCallback.setMsg(failoverMessage); return triggerCallback; } } + } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoConcurrentJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoConcurrentJobBean.java index 2bf22eb4..22a77f54 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoConcurrentJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoConcurrentJobBean.java @@ -1,11 +1,11 @@ package com.xxl.job.admin.core.jobbean.impl; -//package com.xxl.job.service.job.impl; +//package com.xxl.job.action.job.impl; // //import java.util.concurrent.TimeUnit; // //import org.quartz.DisallowConcurrentExecution; // -//import com.xxl.job.service.job.LocalNomalJobBean; +//import com.xxl.job.action.job.LocalNomalJobBean; // ///** // * demo job bean for no-concurrent diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoNomalJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoNomalJobBean.java index 8c5a05b4..7d5bf667 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoNomalJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/impl/DemoNomalJobBean.java @@ -1,12 +1,12 @@ package com.xxl.job.admin.core.jobbean.impl; -//package com.xxl.job.service.job.impl; +//package com.xxl.job.action.job.impl; // //import java.util.concurrent.TimeUnit; // //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; // -//import com.xxl.job.service.job.LocalNomalJobBean; +//import com.xxl.job.action.job.LocalNomalJobBean; // ///** // * demo job bean for concurrent diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java index 3831bd18..de42b1ca 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java @@ -4,7 +4,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.util.MailUtil; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; +import com.xxl.job.core.router.model.ResponseModel; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class JobMonitorHelper { logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId); XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId); if (log!=null) { - if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { + if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { @@ -48,10 +48,10 @@ public class JobMonitorHelper { } JobMonitorHelper.monitor(jobLogId); } - if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && RemoteCallBack.SUCCESS.equals(log.getHandleStatus())) { + if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && ResponseModel.SUCCESS.equals(log.getHandleStatus())) { // pass } - if (RemoteCallBack.FAIL.equals(log.getTriggerStatus()) || RemoteCallBack.FAIL.equals(log.getHandleStatus())) { + if (ResponseModel.FAIL.equals(log.getTriggerStatus()) || ResponseModel.FAIL.equals(log.getHandleStatus())) { XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java index 72a3fbb4..ccca803c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java @@ -5,7 +5,7 @@ import java.util.Map; import com.xxl.job.admin.core.model.ReturnT; /** - * core job service for xxl-job + * core job action for xxl-job * * @author xuxueli 2016-5-28 15:30:33 */ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 7c99a600..e86452ac 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; /** - * core job service for xxl-job + * core job action for xxl-job * @author xuxueli 2016-5-28 15:30:33 */ @Service diff --git a/xxl-job-admin/src/main/webapp/static/js/jobinfo.index.1.js b/xxl-job-admin/src/main/webapp/static/js/jobinfo.index.1.js index 55112121..87545561 100644 --- a/xxl-job-admin/src/main/webapp/static/js/jobinfo.index.1.js +++ b/xxl-job-admin/src/main/webapp/static/js/jobinfo.index.1.js @@ -265,7 +265,9 @@ $(function() { $.post(base_url + "/jobinfo/add", $("#addModal .form").serialize(), function(data, status) { if (data.code == "200") { ComAlert.show(1, "新增任务成功", function(){ - window.location.reload(); + //window.location.reload(); + $('#addModal').modal('hide'); + jobTable.fnDraw(); }); } else { if (data.msg) { @@ -395,7 +397,9 @@ $(function() { $.post(base_url + "/jobinfo/reschedule", $("#updateModal .form").serialize(), function(data, status) { if (data.code == "200") { ComAlert.show(1, "更新成功", function(){ - window.location.reload(); + //window.location.reload(); + $('#updateModal').modal('hide'); + jobTable.fnDraw(); }); } else { if (data.msg) { diff --git a/xxl-job-admin/src/main/webapp/static/js/joblog.index.1.js b/xxl-job-admin/src/main/webapp/static/js/joblog.index.1.js index ef89a948..1f982b51 100644 --- a/xxl-job-admin/src/main/webapp/static/js/joblog.index.1.js +++ b/xxl-job-admin/src/main/webapp/static/js/joblog.index.1.js @@ -71,6 +71,7 @@ $(function() { var obj = {}; obj.jobGroup = $('#jobGroup').val(); obj.jobName = $('#jobName').val(); + obj.filterTime = $('#filterTime').val(); obj.start = d.start; obj.length = d.length; return obj; diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobInfoTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobInfoTest.java index 84ac113e..f655d8a4 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobInfoTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobInfoTest.java @@ -1,16 +1,14 @@ package com.xxl.job.dao.impl; -import java.util.List; - -import javax.annotation.Resource; - +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.dao.IXxlJobInfoDao; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.dao.IXxlJobInfoDao; +import javax.annotation.Resource; +import java.util.List; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath*:applicationcontext-*.xml") diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java index b4ff2fe1..7df61b4a 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java @@ -1,19 +1,17 @@ package com.xxl.job.dao.impl; -import java.util.Date; -import java.util.List; - -import javax.annotation.Resource; - +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.dao.IXxlJobLogDao; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.router.model.ResponseModel; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.dao.IXxlJobLogDao; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath*:applicationcontext-*.xml") @@ -38,7 +36,7 @@ public class XxlJobLogTest { public void updateTriggerInfo(){ XxlJobLog xxlJobLog = xxlJobLogDao.load(29); xxlJobLog.setTriggerTime(new Date()); - xxlJobLog.setTriggerStatus(RemoteCallBack.SUCCESS); + xxlJobLog.setTriggerStatus(ResponseModel.SUCCESS); xxlJobLog.setTriggerMsg("trigger msg"); xxlJobLogDao.updateTriggerInfo(xxlJobLog); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java index 20fc5bc4..a9dea6f3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java @@ -1,6 +1,6 @@ package com.xxl.job.core.executor.jetty; -import com.xxl.job.core.handler.HandlerRepository; +import com.xxl.job.core.router.HandlerRouter; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; import org.eclipse.jetty.server.Connector; @@ -81,7 +81,7 @@ public class XxlJobExecutor implements ApplicationContextAware { } /** - * init job handler service + * init job handler action */ public void initJobHandler(){ Map serviceBeanMap = XxlJobExecutor.applicationContext.getBeansWithAnnotation(JobHander.class); @@ -90,7 +90,7 @@ public class XxlJobExecutor implements ApplicationContextAware { if (serviceBean instanceof IJobHandler){ String name = serviceBean.getClass().getAnnotation(JobHander.class).value(); IJobHandler handler = (IJobHandler) serviceBean; - HandlerRepository.registJobHandler(name, handler); + HandlerRouter.registJobHandler(name, handler); } } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java index c26d6e60..f74cd8d4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java @@ -1,20 +1,24 @@ package com.xxl.job.core.executor.jetty; -import com.xxl.job.core.handler.HandlerRepository; +import com.xxl.job.core.router.HandlerRouter; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; /** * Created by xuxueli on 2016/3/2 21:23. */ public class XxlJobExecutorHandler extends AbstractHandler { + private static Logger logger = LoggerFactory.getLogger(XxlJobExecutorHandler.class); @Override public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException { @@ -22,22 +26,31 @@ public class XxlJobExecutorHandler extends AbstractHandler { httpServletRequest.setCharacterEncoding("UTF-8"); httpServletResponse.setCharacterEncoding("UTF-8"); - Map _param = new HashMap(); - if (httpServletRequest.getParameterMap()!=null && httpServletRequest.getParameterMap().size()>0) { - for (Object paramKey : httpServletRequest.getParameterMap().keySet()) { - if (paramKey!=null) { - String paramKeyStr = paramKey.toString(); - _param.put(paramKeyStr, httpServletRequest.getParameter(paramKeyStr)); - } - } - } - - String resp = HandlerRepository.service(_param); - - httpServletResponse.setContentType("text/html;charset=utf-8"); + // parse hex-json to request model + String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX); + ResponseModel responseModel = null; + if (requestHex!=null && requestHex.trim().length()>0) { + try { + // route trigger + RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class); + responseModel = HandlerRouter.route(requestModel); + } catch (Exception e) { + logger.error("", e); + responseModel = new ResponseModel(ResponseModel.SUCCESS, e.getMessage()); + } + } + if (responseModel == null) { + responseModel = new ResponseModel(ResponseModel.SUCCESS, "系统异常"); + } + + // format response model to hex-json + String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel); + + // return + httpServletResponse.setContentType("text/plain;charset=utf-8"); httpServletResponse.setStatus(HttpServletResponse.SC_OK); baseRequest.setHandled(true); - httpServletResponse.getWriter().println(resp); + httpServletResponse.getWriter().println(responseHex); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java index 7051bb18..078eee89 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java @@ -10,7 +10,7 @@ //import javax.servlet.http.HttpServletRequest; //import javax.servlet.http.HttpServletResponse; // -//import com.xxl.job.client.handler.HandlerRepository; +//import com.xxl.job.client.handler.HandlerRouter; // // ///** @@ -45,7 +45,7 @@ // } // } // -// String resp = HandlerRepository.service(_param); +// String resp = HandlerRouter.action(_param); // response.getWriter().append(resp); // return; // } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java index 652d073f..7c6ecfeb 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java @@ -59,7 +59,7 @@ public class GlueFactory implements ApplicationContextAware { } /** - * inject service of spring + * inject action of spring * @param instance */ public void injectService(Object instance){ diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerRepository.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerRepository.java deleted file mode 100644 index 67dff129..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerRepository.java +++ /dev/null @@ -1,247 +0,0 @@ -package com.xxl.job.core.handler; - -import com.xxl.job.core.handler.impl.GlueJobHandler; -import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.job.core.util.HttpUtil; -import com.xxl.job.core.util.HttpUtil.RemoteCallBack; -import com.xxl.job.core.util.JacksonUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * handler repository - * @author xuxueli 2015-12-19 19:28:44 - */ -public class HandlerRepository { - private static Logger logger = LoggerFactory.getLogger(HandlerRepository.class); - - public enum HandlerParamEnum{ - /** - * trigger timestamp - */ - TIMESTAMP, - /** - * trigger action - */ - ACTION, - /** - * job group - */ - JOB_GROUP, - /** - * job name - */ - JOB_NAME, - /** - * params of jobhandler - */ - EXECUTOR_HANDLER, - /** - * params of jobhandler - */ - EXECUTOR_PARAMS, - /** - * switch of glue job: 0-no,1-yes - */ - GLUE_SWITCH, - /** - * address for callback log - */ - LOG_ADDRESS, - /** - * log id - */ - LOG_ID, - /** - * log date - */ - LOG_DATE - } - public enum ActionEnum{RUN, KILL, LOG, BEAT} - - // jobhandler repository - private static ConcurrentHashMap handlerRepository = new ConcurrentHashMap(); - public static void registJobHandler(String name, IJobHandler jobHandler){ - handlerRepository.put(name, jobHandler); - logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); - } - - // thread repository of jobhandler - public static ConcurrentHashMap handlerTreadMap = new ConcurrentHashMap(); - public static HandlerThread registJobHandlerThread(String jobkey, IJobHandler handler){ - HandlerThread handlerThread = new HandlerThread(handler); - handlerThread.start(); - logger.info(">>>>>>>>>>> xxl-job regist handler success, jobkey:{}, handler:{}", new Object[]{jobkey, handler}); - return handlerTreadMap.put(jobkey, handlerThread); // putIfAbsent - } - - // handler push to queue - public static String service(Map _param) { - logger.debug(">>>>>>>>>>> xxl-job service start, _param:{}", new Object[]{_param}); - - // callback - RemoteCallBack callback = new RemoteCallBack(); - callback.setStatus(RemoteCallBack.FAIL); - - // check namespace - String namespace = _param.get(HandlerParamEnum.ACTION.name()); - if (namespace==null || namespace.trim().length()==0) { - callback.setMsg("param[NAMESPACE] can not be null."); - return JacksonUtil.writeValueAsString(callback); - } - // encryption check - long timestamp = _param.get(HandlerParamEnum.TIMESTAMP.name())!=null?Long.valueOf(_param.get(HandlerParamEnum.TIMESTAMP.name())):-1; - if (System.currentTimeMillis() - timestamp > 60000) { - callback.setMsg("Timestamp check failed."); - return JacksonUtil.writeValueAsString(callback); - } - - // parse namespace - if (namespace.equals(ActionEnum.RUN.name())) { - - // generate jobKey - String job_group = _param.get(HandlerParamEnum.JOB_GROUP.name()); - String job_name = _param.get(HandlerParamEnum.JOB_NAME.name()); - if (job_group == null || job_group.trim().length()==0 || job_name == null || job_name.trim().length()==0) { - callback.setMsg("JOB_GROUP or JOB_NAME is null."); - return JacksonUtil.writeValueAsString(callback); - } - - // glue switch - String handler_glue_switch = _param.get(HandlerParamEnum.GLUE_SWITCH.name()); - if (handler_glue_switch==null || handler_glue_switch.trim().length()==0){ - callback.setMsg("GLUE_SWITCH is null."); - return JacksonUtil.writeValueAsString(callback); - } - - // load old thread - String jobKey = job_group.concat("_").concat(job_name); - HandlerThread handlerThread = handlerTreadMap.get(jobKey); - - if ("0".equals(handler_glue_switch)) { - // bean model - - // handler name - String executor_handler = _param.get(HandlerParamEnum.EXECUTOR_HANDLER.name()); - if (executor_handler==null || executor_handler.trim().length()==0){ - callback.setMsg("EXECUTOR_HANDLER is null."); - return JacksonUtil.writeValueAsString(callback); - } - - // handler instance - IJobHandler jobHandler = handlerRepository.get(executor_handler); - - if (handlerThread == null) { - // jobhandler match - if (jobHandler==null) { - callback.setMsg("handler for jobKey=[" + jobKey + "] not found."); - return JacksonUtil.writeValueAsString(callback); - } - handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler); - } else { - if (handlerThread.getHandler() != jobHandler) { - handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler); - } - } - } else { - // glue - if (handlerThread == null) { - handlerThread = HandlerRepository.registJobHandlerThread(jobKey, new GlueJobHandler(job_group, job_name)); - } - } - - // push data to queue - handlerThread.pushData(_param); - callback.setStatus(RemoteCallBack.SUCCESS); - } else if (namespace.equals(ActionEnum.KILL.name())) { - // generate jobKey - String job_group = _param.get(HandlerParamEnum.JOB_GROUP.name()); - String job_name = _param.get(HandlerParamEnum.JOB_NAME.name()); - if (job_group == null || job_group.trim().length()==0 || job_name == null || job_name.trim().length()==0) { - callback.setMsg("JOB_GROUP or JOB_NAME is null."); - return JacksonUtil.writeValueAsString(callback); - } - String jobKey = job_group.concat("_").concat(job_name); - - // kill handlerThread, and create new one - HandlerThread handlerThread = handlerTreadMap.get(jobKey); - if (handlerThread != null) { - IJobHandler handler = handlerThread.getHandler(); - handlerThread.toStop(); - handlerThread.interrupt(); - HandlerRepository.registJobHandlerThread(jobKey, handler); - callback.setStatus(RemoteCallBack.SUCCESS); - } else { - callback.setMsg("handler for jobKey=[" + jobKey + "] not found."); - } - - } else if (namespace.equals(ActionEnum.LOG.name())) { - String log_id = _param.get(HandlerParamEnum.LOG_ID.name()); - String log_date = _param.get(HandlerParamEnum.LOG_DATE.name()); - if (log_id==null || log_date==null) { - callback.setMsg("LOG_ID | LOG_DATE can not be null."); - return JacksonUtil.writeValueAsString(callback); - } - int logId = -1; - Date triggerDate = null; - try { - logId = Integer.valueOf(log_id); - triggerDate = new Date(Long.valueOf(log_date)); - } catch (Exception e) { - } - if (logId<=0 || triggerDate==null) { - callback.setMsg("LOG_ID | LOG_DATE parse error."); - return JacksonUtil.writeValueAsString(callback); - } - String logConteng = XxlJobFileAppender.readLog(triggerDate, log_id); - callback.setStatus(RemoteCallBack.SUCCESS); - callback.setMsg(logConteng); - } else if (namespace.equals(ActionEnum.BEAT.name())) { - callback.setStatus(RemoteCallBack.SUCCESS); - callback.setMsg(null); - } else { - callback.setMsg("param[Action] is not valid."); - return JacksonUtil.writeValueAsString(callback); - } - - logger.debug(">>>>>>>>>>> xxl-job service end, triggerData:{}"); - return JacksonUtil.writeValueAsString(callback); - } - - // ----------------------- for callback log ----------------------- - private static LinkedBlockingQueue> callBackQueue = new LinkedBlockingQueue>(); - static { - new Thread(new Runnable() { - @Override - public void run() { - while(true){ - try { - HashMap item = callBackQueue.take(); - if (item != null) { - RemoteCallBack callback = null; - try { - callback = HttpUtil.post(item.get("_address"), item); - } catch (Exception e) { - logger.info("HandlerThread Exception:", e); - } - logger.info(">>>>>>>>>>> xxl-job callback , params:{}, result:{}", new Object[]{item, callback}); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }).start(); - } - public static void pushCallBack(String address, HashMap params){ - params.put("_address", address); - callBackQueue.add(params); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java deleted file mode 100644 index b283b088..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.xxl.job.core.handler; - -import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum; -import com.xxl.job.core.handler.IJobHandler.JobHandleStatus; -import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.job.core.util.HttpUtil; -import org.eclipse.jetty.util.ConcurrentHashSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * handler thread - * @author xuxueli 2016-1-16 19:52:47 - */ -public class HandlerThread extends Thread{ - private static Logger logger = LoggerFactory.getLogger(HandlerThread.class); - - private IJobHandler handler; - private LinkedBlockingQueue> handlerDataQueue; - private ConcurrentHashSet logIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID - private boolean toStop = false; - - public HandlerThread(IJobHandler handler) { - this.handler = handler; - handlerDataQueue = new LinkedBlockingQueue>(); - logIdSet = new ConcurrentHashSet(); - } - - public IJobHandler getHandler() { - return handler; - } - public void toStop() { - /** - * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), - * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; - * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; - */ - this.toStop = true; - } - - public void pushData(Map param) { - if (param.get(HandlerParamEnum.LOG_ID.name())!=null && !logIdSet.contains(param.get(HandlerParamEnum.LOG_ID.name()))) { - handlerDataQueue.offer(param); - } - } - - int i = 1; - @Override - public void run() { - while(!toStop){ - try { - // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) - Map handlerData = handlerDataQueue.poll(3L, TimeUnit.SECONDS); - if (handlerData!=null) { - i= 0; - String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name()); - String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name()); - String handler_params = handlerData.get(HandlerParamEnum.EXECUTOR_PARAMS.name()); - logIdSet.remove(log_id); - - // parse param - String[] handlerParams = null; - if (handler_params!=null && handler_params.trim().length()>0) { - handlerParams = handler_params.split(","); - } else { - handlerParams = new String[0]; - } - - // handle job - JobHandleStatus _status = JobHandleStatus.FAIL; - String _msg = null; - try { - XxlJobFileAppender.contextHolder.set(log_id); - logger.info(">>>>>>>>>>> xxl-job handle start."); - _status = handler.execute(handlerParams); - } catch (Exception e) { - logger.info("HandlerThread Exception:", e); - StringWriter out = new StringWriter(); - e.printStackTrace(new PrintWriter(out)); - _msg = out.toString(); - } - logger.info(">>>>>>>>>>> xxl-job handle end, handlerParams:{}, _status:{}, _msg:{}", - new Object[]{handlerParams, _status, _msg}); - - // callback handler info - if (!toStop) { - HashMap params = new HashMap(); - params.put("log_id", log_id); - params.put("status", _status.name()); - params.put("msg", _msg); - HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); - } else { - HashMap params = new HashMap(); - params.put("log_id", log_id); - params.put("status", JobHandleStatus.FAIL.name()); - params.put("msg", "人工手动终止[业务运行中,被强制终止]"); - HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); - } - } - } catch (Exception e) { - logger.info("HandlerThread Exception:", e); - } - } - - // callback trigger request in queue - while(handlerDataQueue!=null && handlerDataQueue.size()>0){ - Map handlerData = handlerDataQueue.poll(); - if (handlerData!=null) { - String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name()); - String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name()); - - HashMap params = new HashMap(); - params.put("log_id", log_id); - params.put("status", JobHandleStatus.FAIL.name()); - params.put("msg", "人工手动终止[任务尚未执行,在调度队列中被终止]"); - HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); - } - } - - logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread()); - } -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java index 150af12a..1e4c0f6f 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java @@ -1,10 +1,12 @@ package com.xxl.job.core.handler; +import com.xxl.job.core.router.HandlerRouter; + /** * remote job handler * @author xuxueli 2015-12-19 19:06:38 */ -public abstract class IJobHandler extends HandlerRepository{ +public abstract class IJobHandler extends HandlerRouter { /** * job handler

@@ -23,11 +25,7 @@ public abstract class IJobHandler extends HandlerRepository{ /** * handle fail */ - FAIL, - /** - * handle not found - */ - NOT_FOUND; + FAIL; } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index d6b94ad2..5dd6a7b9 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -1,27 +1,20 @@ package com.xxl.job.core.log; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.text.SimpleDateFormat; -import java.util.Date; - import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Layout; import org.apache.log4j.spi.LoggingEvent; +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.Date; + /** * store trigger log in each log-file * @author xuxueli 2016-3-12 19:25:12 */ public class XxlJobFileAppender extends AppenderSkeleton { - // for HandlerThread + // for JobThread public static ThreadLocal contextHolder = new ThreadLocal(); public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); @@ -112,8 +105,8 @@ public class XxlJobFileAppender extends AppenderSkeleton { * @param trigger_log_id * @return */ - public static String readLog(Date triggerDate, String trigger_log_id ){ - if (triggerDate==null || trigger_log_id==null || trigger_log_id.trim().length()==0) { + public static String readLog(Date triggerDate, int trigger_log_id ){ + if (triggerDate==null || trigger_log_id<=0) { return null; } @@ -131,7 +124,7 @@ public class XxlJobFileAppender extends AppenderSkeleton { } // filePath/yyyy-MM-dd/9999.log - String logFileName = trigger_log_id.concat(".log"); + String logFileName = String.valueOf(trigger_log_id).concat(".log"); File logFile = new File(filePathDateDir, logFileName); if (!logFile.exists()) { return null; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java new file mode 100644 index 00000000..b1b366d9 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java @@ -0,0 +1,99 @@ +package com.xxl.job.core.router; + +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.router.action.BeatAction; +import com.xxl.job.core.router.action.KillAction; +import com.xxl.job.core.router.action.LogAction; +import com.xxl.job.core.router.action.RunAction; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.router.thread.JobThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * handler repository + * @author xuxueli 2015-12-19 19:28:44 + */ +public class HandlerRouter { + private static Logger logger = LoggerFactory.getLogger(HandlerRouter.class); + + /** + * job handler repository + */ + private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ + logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return HandlerRouter.jobHandlerRepository.put(name, jobHandler); + } + public static IJobHandler loadJobHandler(String name){ + return HandlerRouter.jobHandlerRepository.get(name); + } + + /** + * job thread repository + */ + private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); + public static JobThread registJobThread(String jobkey, IJobHandler handler){ + JobThread handlerThread = new JobThread(handler); + handlerThread.start(); + logger.info(">>>>>>>>>>> xxl-job regist handler success, jobkey:{}, handler:{}", new Object[]{jobkey, handler}); + return HandlerRouter.JobThreadRepository.put(jobkey, handlerThread); // putIfAbsent + } + public static JobThread loadJobThread(String jobKey){ + return HandlerRouter.JobThreadRepository.get(jobKey); + } + + /** + * route action repository + */ + public enum ActionRepository { + RUN(new RunAction()), + KILL(new KillAction()), + LOG(new LogAction()), + BEAT(new BeatAction()); + + private IAction action; + private ActionRepository(IAction action){ + this.action = action; + } + + /** + * match Action by enum name + * @param name + * @return + */ + public static IAction matchAction(String name){ + if (name!=null && name.trim().length()>0) { + for (ActionRepository item : ActionRepository.values()) { + if (item.name().equals(name)) { + return item.action; + } + } + } + return null; + } + + } + + // handler push to queue + public static ResponseModel route(RequestModel requestModel) { + logger.debug(">>>>>>>>>>> xxl-job route, RequestModel:{}", new Object[]{requestModel.toString()}); + + // timestamp check + if (System.currentTimeMillis() - requestModel.getTimestamp() > 60000) { + return new ResponseModel(ResponseModel.SUCCESS, "Timestamp Timeout."); + } + + // match action + IAction action = ActionRepository.matchAction(requestModel.getAction()); + if (action == null) { + return new ResponseModel(ResponseModel.SUCCESS, "Action match fail."); + } + + return action.execute(requestModel); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java new file mode 100644 index 00000000..3de0469b --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java @@ -0,0 +1,13 @@ +package com.xxl.job.core.router; + +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; + +/** + * Created by xuxueli on 16/7/22. + */ +public abstract class IAction { + + public abstract ResponseModel execute(RequestModel requestModel); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java new file mode 100644 index 00000000..3bc33c9b --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java @@ -0,0 +1,17 @@ +package com.xxl.job.core.router.action; + +import com.xxl.job.core.router.IAction; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; + +/** + * Created by xuxueli on 16/7/22. + */ +public class BeatAction extends IAction { + + @Override + public ResponseModel execute(RequestModel requestModel) { + return new ResponseModel(ResponseModel.SUCCESS, "i am alive."); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java new file mode 100644 index 00000000..a1984af3 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java @@ -0,0 +1,35 @@ +package com.xxl.job.core.router.action; + +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.router.HandlerRouter; +import com.xxl.job.core.router.IAction; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.router.thread.JobThread; + +/** + * Created by xuxueli on 16/7/22. + */ +public class KillAction extends IAction { + + @Override + public ResponseModel execute(RequestModel requestModel) { + + // generate jobKey + String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName()); + + // kill handlerThread, and create new one + JobThread jobThread = HandlerRouter.loadJobThread(jobKey); + + if (jobThread != null) { + IJobHandler handler = jobThread.getHandler(); + jobThread.toStop("人工手动终止"); + jobThread.interrupt(); + HandlerRouter.registJobThread(jobKey, handler); + return new ResponseModel(ResponseModel.SUCCESS, "job thread kull success."); + } + + return new ResponseModel(ResponseModel.FAIL, "job thread not found."); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java new file mode 100644 index 00000000..8c9923a5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java @@ -0,0 +1,21 @@ +package com.xxl.job.core.router.action; + +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.router.IAction; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; + +import java.util.Date; + +/** + * Created by xuxueli on 16/7/22. + */ +public class LogAction extends IAction { + + @Override + public ResponseModel execute(RequestModel requestModel) { + String logConteng = XxlJobFileAppender.readLog(new Date(requestModel.getLogDateTim()), requestModel.getLogId()); + return new ResponseModel(ResponseModel.SUCCESS, logConteng); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java new file mode 100644 index 00000000..7fea5bae --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java @@ -0,0 +1,66 @@ +package com.xxl.job.core.router.action; + +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.impl.GlueJobHandler; +import com.xxl.job.core.router.HandlerRouter; +import com.xxl.job.core.router.IAction; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.router.thread.JobThread; + +/** + * Created by xuxueli on 16/7/22. + */ +public class RunAction extends IAction { + + @Override + public ResponseModel execute(RequestModel requestModel) { + + // generate jobKey + String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName()); + + // load old thread + JobThread jobThread = HandlerRouter.loadJobThread(jobKey); + + if (!requestModel.isGlueSwitch()) { + // bean model + + // handler instance + IJobHandler jobHandler = HandlerRouter.loadJobHandler(requestModel.getExecutorHandler()); + + if (jobThread == null) { + // jobhandler match + if (jobHandler==null) { + return new ResponseModel(ResponseModel.FAIL, "job handler for jobKey=[" + jobKey + "] not found."); + } + jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); + } else { + + // job handler update, kill old job thread + if (jobThread.getHandler() != jobHandler) { + + // kill old job thread + jobThread.toStop("人工手动终止"); + jobThread.interrupt(); + + // new thread, with new job handler + jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); + } + } + } else { + // glue model + + if (jobThread == null) { + jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName())); + } + } + + // sometime, cmap.get can not return given value, i do not know why + jobThread = HandlerRouter.loadJobThread(jobKey); + + // push data to queue + jobThread.pushTriggerQueue(requestModel); + return new ResponseModel(ResponseModel.SUCCESS, null); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java new file mode 100644 index 00000000..91badd0c --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java @@ -0,0 +1,140 @@ +package com.xxl.job.core.router.model; + +/** + * Created by xuxueli on 16/7/22. + */ +public class RequestModel { + + private long timestamp; + private String action; + + private String jobGroup; + private String jobName; + + private String executorHandler; + private String executorParams; + + private boolean glueSwitch; + + private String logAddress; + private int logId; + private long logDateTim; + + private String status; + private String msg; + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public String getJobGroup() { + return jobGroup; + } + + public void setJobGroup(String jobGroup) { + this.jobGroup = jobGroup; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getExecutorHandler() { + return executorHandler; + } + + public void setExecutorHandler(String executorHandler) { + this.executorHandler = executorHandler; + } + + public String getExecutorParams() { + return executorParams; + } + + public void setExecutorParams(String executorParams) { + this.executorParams = executorParams; + } + + public boolean isGlueSwitch() { + return glueSwitch; + } + + public void setGlueSwitch(boolean glueSwitch) { + this.glueSwitch = glueSwitch; + } + + public String getLogAddress() { + return logAddress; + } + + public void setLogAddress(String logAddress) { + this.logAddress = logAddress; + } + + public int getLogId() { + return logId; + } + + public void setLogId(int logId) { + this.logId = logId; + } + + public long getLogDateTim() { + return logDateTim; + } + + public void setLogDateTim(long logDateTim) { + this.logDateTim = logDateTim; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + @Override + public String toString() { + return "RequestModel{" + + "timestamp=" + timestamp + + ", action='" + action + '\'' + + ", jobGroup='" + jobGroup + '\'' + + ", jobName='" + jobName + '\'' + + ", executorHandler='" + executorHandler + '\'' + + ", executorParams='" + executorParams + '\'' + + ", glueSwitch=" + glueSwitch + + ", logAddress='" + logAddress + '\'' + + ", logId=" + logId + + ", logDateTim=" + logDateTim + + ", status='" + status + '\'' + + ", msg='" + msg + '\'' + + '}'; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java new file mode 100644 index 00000000..ab202735 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java @@ -0,0 +1,45 @@ +package com.xxl.job.core.router.model; + +/** + * Created by xuxueli on 16/7/22. + */ +public class ResponseModel { + public static final String SUCCESS = "SUCCESS"; + public static final String FAIL = "FAIL"; + + private String status; + private String msg; + + public ResponseModel() { + } + + public ResponseModel(String status, String msg) { + this.status = status; + this.msg = msg; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + @Override + public String toString() { + return "ResponseModel{" + + "status='" + status + '\'' + + ", msg='" + msg + '\'' + + '}'; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java new file mode 100644 index 00000000..3a9dcee5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java @@ -0,0 +1,128 @@ +package com.xxl.job.core.router.thread; + +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.IJobHandler.JobHandleStatus; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * handler thread + * @author xuxueli 2016-1-16 19:52:47 + */ +public class JobThread extends Thread{ + private static Logger logger = LoggerFactory.getLogger(JobThread.class); + + private IJobHandler handler; + private LinkedBlockingQueue triggerQueue; + private ConcurrentHashSet triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID + + private boolean toStop = false; + private String stopReason; + + public JobThread(IJobHandler handler) { + this.handler = handler; + triggerQueue = new LinkedBlockingQueue(); + triggerLogIdSet = new ConcurrentHashSet(); + } + public IJobHandler getHandler() { + return handler; + } + + public void pushTriggerQueue(RequestModel requestModel) { + if (triggerLogIdSet.contains(requestModel.getLogId())) { + logger.info("repeate trigger job, logId:{}", requestModel.getLogId()); + return; + } + + triggerLogIdSet.add(requestModel.getLogId()); + triggerQueue.add(requestModel); + } + + public void toStop(String stopReason) { + /** + * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), + * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; + * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; + */ + this.toStop = true; + this.stopReason = stopReason; + } + + + + int i = 1; + @Override + public void run() { + while(!toStop){ + try { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + RequestModel triggerDate = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerDate!=null) { + triggerLogIdSet.remove(triggerDate.getLogId()); + + // parse param + String[] handlerParams = (triggerDate.getExecutorParams()!=null && triggerDate.getExecutorParams().trim().length()>0) + ? (String[])(Arrays.asList(triggerDate.getExecutorParams().split(",")).toArray()) : null; + + // handle job + JobHandleStatus _status = JobHandleStatus.FAIL; + String _msg = null; + + try { + XxlJobFileAppender.contextHolder.set(String.valueOf(triggerDate.getLogId())); + logger.info("----------- xxl-job job handle start -----------"); + _status = handler.execute(handlerParams); + } catch (Exception e) { + logger.info("JobThread Exception:", e); + StringWriter out = new StringWriter(); + e.printStackTrace(new PrintWriter(out)); + _msg = out.toString(); + } + logger.info("----------- xxl-job job handle end -----------
: ExecutorParams:{}, Status:{}, Msg:{}", + new Object[]{handlerParams, _status, _msg}); + + // callback handler info + if (!toStop) { + // commonm + triggerDate.setStatus(_status.name()); + triggerDate.setMsg(_msg); + TriggerCallbackThread.pushCallBack(triggerDate); + } else { + // is killed + triggerDate.setStatus(JobHandleStatus.FAIL.name()); + triggerDate.setMsg(stopReason + "人工手动终止[业务运行中,被强制终止]"); + TriggerCallbackThread.pushCallBack(triggerDate); + } + } + } catch (Exception e) { + logger.info("JobThread Exception:", e); + } + } + + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + RequestModel triggerDate = triggerQueue.poll(); + if (triggerDate!=null) { + // is killed + RequestModel callback = new RequestModel(); + callback.setLogAddress(XxlJobNetCommUtil.addressToUrl(triggerDate.getLogAddress())); + callback.setLogId(triggerDate.getLogId()); + callback.setStatus(JobHandleStatus.FAIL.name()); + callback.setMsg(stopReason + "[任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(callback); + } + } + + logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread()); + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java new file mode 100644 index 00000000..fd0352f5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java @@ -0,0 +1,44 @@ +package com.xxl.job.core.router.thread; + +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.util.XxlJobNetCommUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Created by xuxueli on 16/7/22. + */ +public class TriggerCallbackThread { + private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); + + private static LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); + static { + new Thread(new Runnable() { + @Override + public void run() { + while(true){ + try { + RequestModel callback = callBackQueue.take(); + if (callback != null) { + try { + ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(callback.getLogAddress()), callback); + logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()}); + } catch (Exception e) { + logger.info("JobThread Exception:", e); + } + } + } catch (Exception e) { + logger.error("", e); + } + } + } + }).start(); + } + public static void pushCallBack(RequestModel callback){ + callBackQueue.add(callback); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java new file mode 100644 index 00000000..85000317 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java @@ -0,0 +1,59 @@ +package com.xxl.job.core.util; + +import java.math.BigInteger; + +/** + * hex/byte util + * @author xuxueli 2015-11-14 22:47:28 + */ +public class ByteHexConverter { + + /** + * byte - to - radix, use BigInteger + */ + private static final String hex_tables = "0123456789ABCDEF"; + public static String byte2hex (byte[] iBytes) { + StringBuilder hex = new StringBuilder(iBytes.length * 2); + for (int index = 0; index < iBytes.length; index++) { + hex.append(hex_tables.charAt((iBytes[index] & 0xf0) >> 4)); + hex.append(hex_tables.charAt((iBytes[index] & 0x0f) >> 0)); + } + return hex.toString(); + } + public static byte[] hex2Byte(String hexString) { + if (hexString == null || hexString.equals("")) { + return null; + } + byte[] res = new byte[hexString.length() / 2]; + char[] chs = hexString.toCharArray(); + for (int i = 0, c = 0; i < chs.length; i += 2, c++) { + res[c] = (byte) (Integer.parseInt(new String(chs, i, 2), 16)); + } + return res; + } + + /** + * byte - to - radix, use BigInteger + */ + public static final int HEX = 16; + public static String byte2radix(byte[] iBytes, int radix){ + return new BigInteger(1, iBytes).toString(radix); + } + public static byte[] radix2byte(String val, int radix){ + return new BigInteger(val, radix).toByteArray(); + } + + public static void main(String[] args) { + // hex - byte[] 方案A:位移 + String temp = "1111111111113d1f3a51sd3f1a32sd1f32as1df2a13sd21f3a2s1df32a13sd2f123s2a3d13fa13sd9999999999"; + System.out.println("明文:" + new String(temp.getBytes())); + System.out.println("编码:" + byte2hex(temp.getBytes())); + System.out.println("解码:" + new String(hex2Byte(byte2hex(temp.getBytes())))); + + // hex - byte[] 方案B:BigInteger + System.out.println("编码:" + byte2radix(temp.getBytes(), HEX)); + System.out.println("解码:" + new String(radix2byte(byte2radix(temp.getBytes(), HEX), HEX))); + + } + +} \ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpUtil.java deleted file mode 100644 index 64508e7d..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpUtil.java +++ /dev/null @@ -1,130 +0,0 @@ -package com.xxl.job.core.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.NameValuePair; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; - -/** - * http util to send data - * @author xuxueli - * @version 2015-11-28 15:30:59 - */ -public class HttpUtil { - - /** - * http remote callback - */ - public static class RemoteCallBack{ - public static final String SUCCESS = "SUCCESS"; - public static final String FAIL = "FAIL"; - - private String status; - private String msg; - public void setStatus(String status) { - this.status = status; - } - public String getStatus() { - return status; - } - public void setMsg(String msg) { - this.msg = msg; - } - public String getMsg() { - return msg; - } - - @Override - public String toString() { - return "RemoteCallBack [status=" + status + ", msg=" + msg + "]"; - } - - } - - /** - * http post request - * @param reqURL - * @param params - * @return [0]=responseMsg, [1]=exceptionMsg - */ - public static RemoteCallBack post(String reqURL, Map params){ - RemoteCallBack callback = new RemoteCallBack(); - callback.setStatus(RemoteCallBack.FAIL); - - // do post - HttpPost httpPost = null; - CloseableHttpClient httpClient = null; - try{ - httpPost = new HttpPost(reqURL); - if (params != null && !params.isEmpty()) { - List formParams = new ArrayList(); - for(Map.Entry entry : params.entrySet()){ - formParams.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); - } - httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8")); - } - RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build(); - httpPost.setConfig(requestConfig); - - //httpClient = HttpClients.createDefault(); // default retry 3 times - httpClient = HttpClients.custom().disableAutomaticRetries().build(); - - HttpResponse response = httpClient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() == 200) { - if (null != entity) { - String responseMsg = EntityUtils.toString(entity, "UTF-8"); - callback = JacksonUtil.readValue(responseMsg, RemoteCallBack.class); - if (callback == null) { - callback = new RemoteCallBack(); - callback.setStatus(RemoteCallBack.FAIL); - callback.setMsg("responseMsg parse json fail, responseMsg:" + responseMsg); - } - EntityUtils.consume(entity); - } - } else { - callback.setMsg("http statusCode error, statusCode:" + response.getStatusLine().getStatusCode()); - } - } catch (Exception e) { - e.printStackTrace(); - /*StringWriter out = new StringWriter(); - e.printStackTrace(new PrintWriter(out)); - callback.setMsg(out.toString());*/ - callback.setMsg(e.getMessage()); - } finally{ - if (httpPost!=null) { - httpPost.releaseConnection(); - } - if (httpClient!=null) { - try { - httpClient.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - return callback; - } - - /** - * parse address ip:port to url http://.../ - * @param address - * @return - */ - public static String addressToUrl(String address){ - return "http://" + address + "/"; - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java new file mode 100644 index 00000000..101c51dc --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java @@ -0,0 +1,141 @@ +package com.xxl.job.core.util; + +import com.xxl.job.core.router.model.RequestModel; +import com.xxl.job.core.router.model.ResponseModel; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * http util to send data + * @author xuxueli + * @version 2015-11-28 15:30:59 + */ +public class XxlJobNetCommUtil { + private static Logger logger = LoggerFactory.getLogger(XxlJobNetCommUtil.class); + + // hex param key + public static final String HEX = "hex"; + + /** + * format object to hex-json + * @param obj + * @return + */ + public static String formatObj2HexJson(Object obj){ + String json = JacksonUtil.writeValueAsString(obj); + String hex = ByteHexConverter.byte2hex(json.getBytes()); + return hex; + } + + /** + * parse hex-json to object + * @param hex + * @param clazz + * @return + */ + public static T parseHexJson2Obj(String hex, Class clazz){ + String json = new String(ByteHexConverter.hex2Byte(hex)); + T obj = JacksonUtil.readValue(json, clazz); + return obj; + } + + public static void main(String[] args) { + System.out.println(parseHexJson2Obj("7B22737461747573223A2253554343455353222C226D7367223A2254696D657374616D702054696D656F75742E227D", ResponseModel.class)); + } + + /** + * http post request + * @param reqURL + */ + public static ResponseModel postHex(String reqURL, RequestModel requestModel){ + + // parse RequestModel to hex-json + String requestHex = XxlJobNetCommUtil.formatObj2HexJson(requestModel); + + // msg + String failMsg = null; + + // do post + HttpPost httpPost = null; + CloseableHttpClient httpClient = null; + try{ + httpPost = new HttpPost(reqURL); + List formParams = new ArrayList(); + formParams.add(new BasicNameValuePair(XxlJobNetCommUtil.HEX, requestHex)); + httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8")); + + + RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build(); + httpPost.setConfig(requestConfig); + + //httpClient = HttpClients.createDefault(); // default retry 3 times + httpClient = HttpClients.custom().disableAutomaticRetries().build(); + + HttpResponse response = httpClient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() == 200 && null != entity) { + String responseHex = EntityUtils.toString(entity, "UTF-8"); + EntityUtils.consume(entity); + + // i do not know why + responseHex = responseHex.replace("\n", ""); + + // parse hex-json to ResponseModel + ResponseModel responseModel = XxlJobNetCommUtil.parseHexJson2Obj(responseHex, ResponseModel.class); + + if (responseModel!=null) { + return responseModel; + } + } else { + failMsg = "http statusCode error, statusCode:" + response.getStatusLine().getStatusCode(); + } + } catch (Exception e) { + logger.info("", e); + /*StringWriter out = new StringWriter(); + e.printStackTrace(new PrintWriter(out)); + callback.setMsg(out.toString());*/ + failMsg = e.getMessage(); + } finally{ + if (httpPost!=null) { + httpPost.releaseConnection(); + } + if (httpClient!=null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.info("", e); + } + } + } + + // other, default fail + ResponseModel callback = new ResponseModel(); + callback.setStatus(ResponseModel.FAIL); + callback.setMsg(failMsg); + return callback; + } + + /** + * parse address ip:port to url http://.../ + * @param address + * @return + */ + public static String addressToUrl(String address){ + return "http://" + address + "/"; + } + +} -- GitLab