提交 8b71e2ce 编写于 作者: X xueli.xue

init

上级 39a0205e
......@@ -6,10 +6,10 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.core.handler.HandlerRepository.ActionEnum;
import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum;
import com.xxl.job.core.util.HttpUtil;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.router.HandlerRouter.ActionRepository;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.stereotype.Controller;
......@@ -110,22 +110,22 @@ public class JobLogController {
if (log == null) {
return new ReturnT<String>(500, "参数异常");
}
if (!RemoteCallBack.SUCCESS.equals(log.getTriggerStatus())) {
if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) {
return new ReturnT<String>(500, "调度失败,无法查看执行日志");
}
// trigger id, trigger time
Map<String, String> reqMap = new HashMap<String, String>();
reqMap.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis()));
reqMap.put(HandlerParamEnum.ACTION.name(), ActionEnum.LOG.name());
reqMap.put(HandlerParamEnum.LOG_ID.name(), String.valueOf(id));
reqMap.put(HandlerParamEnum.LOG_DATE.name(), String.valueOf(log.getTriggerTime().getTime()));
RemoteCallBack callBack = HttpUtil.post(HttpUtil.addressToUrl(log.getExecutorAddress()), reqMap);
if (HttpUtil.RemoteCallBack.SUCCESS.equals(callBack.getStatus())) {
return new ReturnT<String>(callBack.getMsg());
RequestModel requestModel = new RequestModel();
requestModel.setTimestamp(System.currentTimeMillis());
requestModel.setAction(ActionRepository.LOG.name());
requestModel.setLogId(id);
requestModel.setLogDateTim(log.getTriggerTime().getTime());
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel);
if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) {
return new ReturnT<String>(responseModel.getMsg());
} else {
return new ReturnT<String>(500, callBack.getMsg());
return new ReturnT<String>(500, responseModel.getMsg());
}
}
......@@ -145,27 +145,26 @@ public class JobLogController {
if (log == null || jobInfo==null) {
return new ReturnT<String>(500, "参数异常");
}
if (!RemoteCallBack.SUCCESS.equals(log.getTriggerStatus())) {
if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) {
return new ReturnT<String>(500, "调度失败,无法终止日志");
}
// request
Map<String, String> reqMap = new HashMap<String, String>();
reqMap.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis()));
reqMap.put(HandlerParamEnum.ACTION.name(), ActionEnum.KILL.name());
reqMap.put(HandlerParamEnum.JOB_GROUP.name(), log.getJobGroup());
reqMap.put(HandlerParamEnum.JOB_NAME.name(), log.getJobName());
reqMap.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch()));
// request of kill
RequestModel requestModel = new RequestModel();
requestModel.setTimestamp(System.currentTimeMillis());
requestModel.setAction(ActionRepository.KILL.name());
requestModel.setJobGroup(log.getJobGroup());
requestModel.setJobName(log.getJobName());
RemoteCallBack callBack = HttpUtil.post(HttpUtil.addressToUrl(log.getExecutorAddress()), reqMap);
if (HttpUtil.RemoteCallBack.SUCCESS.equals(callBack.getStatus())) {
log.setHandleStatus(HttpUtil.RemoteCallBack.FAIL);
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel);
if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) {
log.setHandleStatus(ResponseModel.FAIL);
log.setHandleMsg("人为操作主动终止");
log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(callBack.getMsg());
return new ReturnT<String>(responseModel.getMsg());
} else {
return new ReturnT<String>(500, callBack.getMsg());
return new ReturnT<String>(500, responseModel.getMsg());
}
}
}
package com.xxl.job.admin.core.callback;
import java.io.IOException;
import java.util.Date;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.util.JacksonUtil;
import java.io.IOException;
import java.util.Date;
/**
* Created by xuxueli on 2016-5-22 11:15:42
......@@ -27,31 +25,31 @@ public class XxlJobLogCallbackServerHandler extends AbstractHandler {
httpServletRequest.setCharacterEncoding("UTF-8");
httpServletResponse.setCharacterEncoding("UTF-8");
// parse param
String log_id = httpServletRequest.getParameter("log_id");
String status = httpServletRequest.getParameter("status");
String msg = httpServletRequest.getParameter("msg");
// parse hex-json to request model
String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
// process
RemoteCallBack callBack = new RemoteCallBack();
callBack.setStatus(RemoteCallBack.FAIL);
if (StringUtils.isNumeric(log_id) && StringUtils.isNotBlank(status)) {
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(Integer.valueOf(log_id));
if (log!=null) {
log.setHandleTime(new Date());
log.setHandleStatus(status);
log.setHandleMsg(msg);
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
callBack.setStatus(RemoteCallBack.SUCCESS);
}
ResponseModel responseModel = null;
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId());
if (log!=null) {
log.setHandleTime(new Date());
log.setHandleStatus(requestModel.getStatus());
log.setHandleMsg(requestModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
responseModel = new ResponseModel(ResponseModel.SUCCESS, null);
} else {
responseModel = new ResponseModel(ResponseModel.FAIL, "log item not found.");
}
String resp = JacksonUtil.writeValueAsString(callBack);
// format response model to hex-json
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
// response
httpServletResponse.setContentType("text/html;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(resp);
httpServletResponse.getWriter().println(responseHex);
}
}
package com.xxl.job.admin.core.jobbean;
//package com.xxl.job.service.job;
//package com.xxl.job.action.job;
//
//import java.io.PrintWriter;
//import java.io.StringWriter;
......@@ -15,8 +15,8 @@ package com.xxl.job.admin.core.jobbean;
//import org.slf4j.LoggerFactory;
//import org.springframework.scheduling.quartz.QuartzJobBean;
//
//import com.xxl.job.client.handler.HandlerRepository;
//import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
//import com.xxl.job.client.handler.HandlerRouter;
//import com.xxl.job.client.util.XxlJobNetCommUtil.RemoteCallBack;
//import com.xxl.job.client.util.JacksonUtil;
//import com.xxl.job.core.model.XxlJobInfo;
//import com.xxl.job.core.model.XxlJobLog;
......@@ -55,7 +55,7 @@ package com.xxl.job.admin.core.jobbean;
// logger.info(">>>>>>>>>>> xxl-job trigger start, jobLog:{}", jobLog);
//
// // trigger request
// String handler_params = jobDataMap.get(HandlerRepository.HANDLER_PARAMS);
// String handler_params = jobDataMap.get(HandlerRouter.HANDLER_PARAMS);
// String[] handlerParams = null;
// if (StringUtils.isNotBlank(handler_params)) {
// handlerParams = handler_params.split(",");
......@@ -72,7 +72,7 @@ package com.xxl.job.admin.core.jobbean;
// jobLog.setHandleStatus(RemoteCallBack.SUCCESS);
// jobLog.setHandleMsg(JacksonUtil.writeValueAsString(responseMsg));
// } catch (Exception e) {
// logger.info("HandlerThread Exception:", e);
// logger.info("JobThread Exception:", e);
// StringWriter out = new StringWriter();
// e.printStackTrace(new PrintWriter(out));
//
......
......@@ -5,10 +5,10 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.handler.HandlerRepository.ActionEnum;
import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum;
import com.xxl.job.core.util.HttpUtil;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.router.HandlerRouter.ActionRepository;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
......@@ -18,7 +18,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.text.MessageFormat;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* http job bean
......@@ -43,30 +46,27 @@ public class RemoteHttpJobBean extends QuartzJobBean {
logger.info(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// trigger request
HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis()));
params.put(HandlerParamEnum.ACTION.name(), ActionEnum.RUN.name());
params.put(HandlerParamEnum.LOG_ADDRESS.name(), XxlJobLogCallbackServer.getTrigger_log_address());
params.put(HandlerParamEnum.LOG_ID.name(), String.valueOf(jobLog.getId()));
params.put(HandlerParamEnum.JOB_GROUP.name(), jobInfo.getJobGroup());
params.put(HandlerParamEnum.JOB_NAME.name(), jobInfo.getJobName());
params.put(HandlerParamEnum.EXECUTOR_HANDLER.name(), jobInfo.getExecutorHandler());
params.put(HandlerParamEnum.EXECUTOR_PARAMS.name(), jobInfo.getExecutorParam());
params.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch()));
RequestModel requestModel = new RequestModel();
requestModel.setTimestamp(System.currentTimeMillis());
requestModel.setAction(ActionRepository.RUN.name());
requestModel.setJobGroup(jobInfo.getJobGroup());
requestModel.setJobName(jobInfo.getJobName());
requestModel.setExecutorHandler(jobInfo.getExecutorHandler());
requestModel.setExecutorParams(jobInfo.getExecutorParam());
requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
requestModel.setLogAddress(XxlJobLogCallbackServer.getTrigger_log_address());
requestModel.setLogId(jobLog.getId());
// failover trigger
RemoteCallBack callback = failoverTrigger(jobInfo.getExecutorAddress(), params, jobLog);
ResponseModel responseModel = failoverTrigger(jobInfo.getExecutorAddress(), requestModel, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, callback:{}", jobLog.getId(), callback);
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString());
// update trigger info
jobLog.setTriggerTime(new Date());
jobLog.setTriggerStatus(callback.getStatus());
jobLog.setTriggerMsg(callback.getMsg());
jobLog.setTriggerStatus(responseModel.getStatus());
jobLog.setTriggerMsg(responseModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);
// monitor triger
......@@ -81,7 +81,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
* @param handler_address
* @return
*/
public RemoteCallBack failoverTrigger(String handler_address, HashMap<String, String> handler_params, XxlJobLog jobLog){
public ResponseModel failoverTrigger(String handler_address, RequestModel requestModel, XxlJobLog jobLog){
if (handler_address.split(",").length > 1) {
// for ha
......@@ -92,33 +92,43 @@ public class RemoteHttpJobBean extends QuartzJobBean {
String failoverMessage = "";
for (String address : addressList) {
if (StringUtils.isNotBlank(address)) {
HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerParamEnum.TIMESTAMP.name(), String.valueOf(System.currentTimeMillis()));
params.put(HandlerParamEnum.ACTION.name(), ActionEnum.BEAT.name());
RemoteCallBack beatResult = HttpUtil.post(HttpUtil.addressToUrl(address), params);
// beat check
RequestModel beatRequest = new RequestModel();
beatRequest.setTimestamp(System.currentTimeMillis());
beatRequest.setAction(ActionRepository.BEAT.name());
ResponseModel beatResult = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), beatRequest);
failoverMessage += MessageFormat.format("BEAT running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, beatResult.getStatus(), beatResult.getMsg());
if (RemoteCallBack.SUCCESS.equals(beatResult.getStatus())) {
// beat success, trigger do
if (beatResult.SUCCESS.equals(beatResult.getStatus())) {
// store real address
jobLog.setExecutorAddress(address);
RemoteCallBack triggerCallback = HttpUtil.post(HttpUtil.addressToUrl(address), handler_params);
triggerCallback.setStatus(RemoteCallBack.SUCCESS);
// real trigger
ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
failoverMessage += MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg());
triggerCallback.setMsg(failoverMessage);
return triggerCallback;
}
}
}
RemoteCallBack result = new RemoteCallBack();
result.setStatus(RemoteCallBack.FAIL);
ResponseModel result = new ResponseModel();
result.setStatus(ResponseModel.FAIL);
result.setMsg(failoverMessage);
return result;
} else {
// store real address
jobLog.setExecutorAddress(handler_address);
RemoteCallBack triggerCallback = HttpUtil.post(HttpUtil.addressToUrl(handler_address), handler_params);
ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(handler_address), requestModel);
String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", handler_address, triggerCallback.getStatus(), triggerCallback.getMsg());
triggerCallback.setMsg(failoverMessage);
return triggerCallback;
}
}
}
\ No newline at end of file
package com.xxl.job.admin.core.jobbean.impl;
//package com.xxl.job.service.job.impl;
//package com.xxl.job.action.job.impl;
//
//import java.util.concurrent.TimeUnit;
//
//import org.quartz.DisallowConcurrentExecution;
//
//import com.xxl.job.service.job.LocalNomalJobBean;
//import com.xxl.job.action.job.LocalNomalJobBean;
//
///**
// * demo job bean for no-concurrent
......
package com.xxl.job.admin.core.jobbean.impl;
//package com.xxl.job.service.job.impl;
//package com.xxl.job.action.job.impl;
//
//import java.util.concurrent.TimeUnit;
//
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import com.xxl.job.service.job.LocalNomalJobBean;
//import com.xxl.job.action.job.LocalNomalJobBean;
//
///**
// * demo job bean for concurrent
......
......@@ -4,7 +4,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.admin.core.util.MailUtil;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.router.model.ResponseModel;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,7 +40,7 @@ public class JobMonitorHelper {
logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId);
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId);
if (log!=null) {
if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) {
if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
......@@ -48,10 +48,10 @@ public class JobMonitorHelper {
}
JobMonitorHelper.monitor(jobLogId);
}
if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && RemoteCallBack.SUCCESS.equals(log.getHandleStatus())) {
if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && ResponseModel.SUCCESS.equals(log.getHandleStatus())) {
// pass
}
if (RemoteCallBack.FAIL.equals(log.getTriggerStatus()) || RemoteCallBack.FAIL.equals(log.getHandleStatus())) {
if (ResponseModel.FAIL.equals(log.getTriggerStatus()) || ResponseModel.FAIL.equals(log.getHandleStatus())) {
XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
......
......@@ -5,7 +5,7 @@ import java.util.Map;
import com.xxl.job.admin.core.model.ReturnT;
/**
* core job service for xxl-job
* core job action for xxl-job
*
* @author xuxueli 2016-5-28 15:30:33
*/
......
......@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
/**
* core job service for xxl-job
* core job action for xxl-job
* @author xuxueli 2016-5-28 15:30:33
*/
@Service
......
......@@ -265,7 +265,9 @@ $(function() {
$.post(base_url + "/jobinfo/add", $("#addModal .form").serialize(), function(data, status) {
if (data.code == "200") {
ComAlert.show(1, "新增任务成功", function(){
window.location.reload();
//window.location.reload();
$('#addModal').modal('hide');
jobTable.fnDraw();
});
} else {
if (data.msg) {
......@@ -395,7 +397,9 @@ $(function() {
$.post(base_url + "/jobinfo/reschedule", $("#updateModal .form").serialize(), function(data, status) {
if (data.code == "200") {
ComAlert.show(1, "更新成功", function(){
window.location.reload();
//window.location.reload();
$('#updateModal').modal('hide');
jobTable.fnDraw();
});
} else {
if (data.msg) {
......
......@@ -71,6 +71,7 @@ $(function() {
var obj = {};
obj.jobGroup = $('#jobGroup').val();
obj.jobName = $('#jobName').val();
obj.filterTime = $('#filterTime').val();
obj.start = d.start;
obj.length = d.length;
return obj;
......
package com.xxl.job.dao.impl;
import java.util.List;
import javax.annotation.Resource;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import javax.annotation.Resource;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:applicationcontext-*.xml")
......
package com.xxl.job.dao.impl;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.router.model.ResponseModel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:applicationcontext-*.xml")
......@@ -38,7 +36,7 @@ public class XxlJobLogTest {
public void updateTriggerInfo(){
XxlJobLog xxlJobLog = xxlJobLogDao.load(29);
xxlJobLog.setTriggerTime(new Date());
xxlJobLog.setTriggerStatus(RemoteCallBack.SUCCESS);
xxlJobLog.setTriggerStatus(ResponseModel.SUCCESS);
xxlJobLog.setTriggerMsg("trigger msg");
xxlJobLogDao.updateTriggerInfo(xxlJobLog);
}
......
package com.xxl.job.core.executor.jetty;
import com.xxl.job.core.handler.HandlerRepository;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander;
import org.eclipse.jetty.server.Connector;
......@@ -81,7 +81,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
/**
* init job handler service
* init job handler action
*/
public void initJobHandler(){
Map<String, Object> serviceBeanMap = XxlJobExecutor.applicationContext.getBeansWithAnnotation(JobHander.class);
......@@ -90,7 +90,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
if (serviceBean instanceof IJobHandler){
String name = serviceBean.getClass().getAnnotation(JobHander.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
HandlerRepository.registJobHandler(name, handler);
HandlerRouter.registJobHandler(name, handler);
}
}
}
......
package com.xxl.job.core.executor.jetty;
import com.xxl.job.core.handler.HandlerRepository;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by xuxueli on 2016/3/2 21:23.
*/
public class XxlJobExecutorHandler extends AbstractHandler {
private static Logger logger = LoggerFactory.getLogger(XxlJobExecutorHandler.class);
@Override
public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
......@@ -22,22 +26,31 @@ public class XxlJobExecutorHandler extends AbstractHandler {
httpServletRequest.setCharacterEncoding("UTF-8");
httpServletResponse.setCharacterEncoding("UTF-8");
Map<String, String> _param = new HashMap<String, String>();
if (httpServletRequest.getParameterMap()!=null && httpServletRequest.getParameterMap().size()>0) {
for (Object paramKey : httpServletRequest.getParameterMap().keySet()) {
if (paramKey!=null) {
String paramKeyStr = paramKey.toString();
_param.put(paramKeyStr, httpServletRequest.getParameter(paramKeyStr));
}
}
}
String resp = HandlerRepository.service(_param);
httpServletResponse.setContentType("text/html;charset=utf-8");
// parse hex-json to request model
String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
ResponseModel responseModel = null;
if (requestHex!=null && requestHex.trim().length()>0) {
try {
// route trigger
RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
responseModel = HandlerRouter.route(requestModel);
} catch (Exception e) {
logger.error("", e);
responseModel = new ResponseModel(ResponseModel.SUCCESS, e.getMessage());
}
}
if (responseModel == null) {
responseModel = new ResponseModel(ResponseModel.SUCCESS, "系统异常");
}
// format response model to hex-json
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
// return
httpServletResponse.setContentType("text/plain;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(resp);
httpServletResponse.getWriter().println(responseHex);
}
}
......@@ -10,7 +10,7 @@
//import javax.servlet.http.HttpServletRequest;
//import javax.servlet.http.HttpServletResponse;
//
//import com.xxl.job.client.handler.HandlerRepository;
//import com.xxl.job.client.handler.HandlerRouter;
//
//
///**
......@@ -45,7 +45,7 @@
// }
// }
//
// String resp = HandlerRepository.service(_param);
// String resp = HandlerRouter.action(_param);
// response.getWriter().append(resp);
// return;
// }
......
......@@ -59,7 +59,7 @@ public class GlueFactory implements ApplicationContextAware {
}
/**
* inject service of spring
* inject action of spring
* @param instance
*/
public void injectService(Object instance){
......
package com.xxl.job.core.handler;
import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.HttpUtil;
import com.xxl.job.core.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.util.JacksonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* handler repository
* @author xuxueli 2015-12-19 19:28:44
*/
public class HandlerRepository {
private static Logger logger = LoggerFactory.getLogger(HandlerRepository.class);
public enum HandlerParamEnum{
/**
* trigger timestamp
*/
TIMESTAMP,
/**
* trigger action
*/
ACTION,
/**
* job group
*/
JOB_GROUP,
/**
* job name
*/
JOB_NAME,
/**
* params of jobhandler
*/
EXECUTOR_HANDLER,
/**
* params of jobhandler
*/
EXECUTOR_PARAMS,
/**
* switch of glue job: 0-no,1-yes
*/
GLUE_SWITCH,
/**
* address for callback log
*/
LOG_ADDRESS,
/**
* log id
*/
LOG_ID,
/**
* log date
*/
LOG_DATE
}
public enum ActionEnum{RUN, KILL, LOG, BEAT}
// jobhandler repository
private static ConcurrentHashMap<String, IJobHandler> handlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static void registJobHandler(String name, IJobHandler jobHandler){
handlerRepository.put(name, jobHandler);
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
}
// thread repository of jobhandler
public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();
public static HandlerThread registJobHandlerThread(String jobkey, IJobHandler handler){
HandlerThread handlerThread = new HandlerThread(handler);
handlerThread.start();
logger.info(">>>>>>>>>>> xxl-job regist handler success, jobkey:{}, handler:{}", new Object[]{jobkey, handler});
return handlerTreadMap.put(jobkey, handlerThread); // putIfAbsent
}
// handler push to queue
public static String service(Map<String, String> _param) {
logger.debug(">>>>>>>>>>> xxl-job service start, _param:{}", new Object[]{_param});
// callback
RemoteCallBack callback = new RemoteCallBack();
callback.setStatus(RemoteCallBack.FAIL);
// check namespace
String namespace = _param.get(HandlerParamEnum.ACTION.name());
if (namespace==null || namespace.trim().length()==0) {
callback.setMsg("param[NAMESPACE] can not be null.");
return JacksonUtil.writeValueAsString(callback);
}
// encryption check
long timestamp = _param.get(HandlerParamEnum.TIMESTAMP.name())!=null?Long.valueOf(_param.get(HandlerParamEnum.TIMESTAMP.name())):-1;
if (System.currentTimeMillis() - timestamp > 60000) {
callback.setMsg("Timestamp check failed.");
return JacksonUtil.writeValueAsString(callback);
}
// parse namespace
if (namespace.equals(ActionEnum.RUN.name())) {
// generate jobKey
String job_group = _param.get(HandlerParamEnum.JOB_GROUP.name());
String job_name = _param.get(HandlerParamEnum.JOB_NAME.name());
if (job_group == null || job_group.trim().length()==0 || job_name == null || job_name.trim().length()==0) {
callback.setMsg("JOB_GROUP or JOB_NAME is null.");
return JacksonUtil.writeValueAsString(callback);
}
// glue switch
String handler_glue_switch = _param.get(HandlerParamEnum.GLUE_SWITCH.name());
if (handler_glue_switch==null || handler_glue_switch.trim().length()==0){
callback.setMsg("GLUE_SWITCH is null.");
return JacksonUtil.writeValueAsString(callback);
}
// load old thread
String jobKey = job_group.concat("_").concat(job_name);
HandlerThread handlerThread = handlerTreadMap.get(jobKey);
if ("0".equals(handler_glue_switch)) {
// bean model
// handler name
String executor_handler = _param.get(HandlerParamEnum.EXECUTOR_HANDLER.name());
if (executor_handler==null || executor_handler.trim().length()==0){
callback.setMsg("EXECUTOR_HANDLER is null.");
return JacksonUtil.writeValueAsString(callback);
}
// handler instance
IJobHandler jobHandler = handlerRepository.get(executor_handler);
if (handlerThread == null) {
// jobhandler match
if (jobHandler==null) {
callback.setMsg("handler for jobKey=[" + jobKey + "] not found.");
return JacksonUtil.writeValueAsString(callback);
}
handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler);
} else {
if (handlerThread.getHandler() != jobHandler) {
handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler);
}
}
} else {
// glue
if (handlerThread == null) {
handlerThread = HandlerRepository.registJobHandlerThread(jobKey, new GlueJobHandler(job_group, job_name));
}
}
// push data to queue
handlerThread.pushData(_param);
callback.setStatus(RemoteCallBack.SUCCESS);
} else if (namespace.equals(ActionEnum.KILL.name())) {
// generate jobKey
String job_group = _param.get(HandlerParamEnum.JOB_GROUP.name());
String job_name = _param.get(HandlerParamEnum.JOB_NAME.name());
if (job_group == null || job_group.trim().length()==0 || job_name == null || job_name.trim().length()==0) {
callback.setMsg("JOB_GROUP or JOB_NAME is null.");
return JacksonUtil.writeValueAsString(callback);
}
String jobKey = job_group.concat("_").concat(job_name);
// kill handlerThread, and create new one
HandlerThread handlerThread = handlerTreadMap.get(jobKey);
if (handlerThread != null) {
IJobHandler handler = handlerThread.getHandler();
handlerThread.toStop();
handlerThread.interrupt();
HandlerRepository.registJobHandlerThread(jobKey, handler);
callback.setStatus(RemoteCallBack.SUCCESS);
} else {
callback.setMsg("handler for jobKey=[" + jobKey + "] not found.");
}
} else if (namespace.equals(ActionEnum.LOG.name())) {
String log_id = _param.get(HandlerParamEnum.LOG_ID.name());
String log_date = _param.get(HandlerParamEnum.LOG_DATE.name());
if (log_id==null || log_date==null) {
callback.setMsg("LOG_ID | LOG_DATE can not be null.");
return JacksonUtil.writeValueAsString(callback);
}
int logId = -1;
Date triggerDate = null;
try {
logId = Integer.valueOf(log_id);
triggerDate = new Date(Long.valueOf(log_date));
} catch (Exception e) {
}
if (logId<=0 || triggerDate==null) {
callback.setMsg("LOG_ID | LOG_DATE parse error.");
return JacksonUtil.writeValueAsString(callback);
}
String logConteng = XxlJobFileAppender.readLog(triggerDate, log_id);
callback.setStatus(RemoteCallBack.SUCCESS);
callback.setMsg(logConteng);
} else if (namespace.equals(ActionEnum.BEAT.name())) {
callback.setStatus(RemoteCallBack.SUCCESS);
callback.setMsg(null);
} else {
callback.setMsg("param[Action] is not valid.");
return JacksonUtil.writeValueAsString(callback);
}
logger.debug(">>>>>>>>>>> xxl-job service end, triggerData:{}");
return JacksonUtil.writeValueAsString(callback);
}
// ----------------------- for callback log -----------------------
private static LinkedBlockingQueue<HashMap<String, String>> callBackQueue = new LinkedBlockingQueue<HashMap<String, String>>();
static {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
HashMap<String, String> item = callBackQueue.take();
if (item != null) {
RemoteCallBack callback = null;
try {
callback = HttpUtil.post(item.get("_address"), item);
} catch (Exception e) {
logger.info("HandlerThread Exception:", e);
}
logger.info(">>>>>>>>>>> xxl-job callback , params:{}, result:{}", new Object[]{item, callback});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
public static void pushCallBack(String address, HashMap<String, String> params){
params.put("_address", address);
callBackQueue.add(params);
}
}
package com.xxl.job.core.handler;
import com.xxl.job.core.router.HandlerRouter;
/**
* remote job handler
* @author xuxueli 2015-12-19 19:06:38
*/
public abstract class IJobHandler extends HandlerRepository{
public abstract class IJobHandler extends HandlerRouter {
/**
* job handler <br><br>
......@@ -23,11 +25,7 @@ public abstract class IJobHandler extends HandlerRepository{
/**
* handle fail
*/
FAIL,
/**
* handle not found
*/
NOT_FOUND;
FAIL;
}
}
package com.xxl.job.core.log;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.spi.LoggingEvent;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* store trigger log in each log-file
* @author xuxueli 2016-3-12 19:25:12
*/
public class XxlJobFileAppender extends AppenderSkeleton {
// for HandlerThread
// for JobThread
public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
......@@ -112,8 +105,8 @@ public class XxlJobFileAppender extends AppenderSkeleton {
* @param trigger_log_id
* @return
*/
public static String readLog(Date triggerDate, String trigger_log_id ){
if (triggerDate==null || trigger_log_id==null || trigger_log_id.trim().length()==0) {
public static String readLog(Date triggerDate, int trigger_log_id ){
if (triggerDate==null || trigger_log_id<=0) {
return null;
}
......@@ -131,7 +124,7 @@ public class XxlJobFileAppender extends AppenderSkeleton {
}
// filePath/yyyy-MM-dd/9999.log
String logFileName = trigger_log_id.concat(".log");
String logFileName = String.valueOf(trigger_log_id).concat(".log");
File logFile = new File(filePathDateDir, logFileName);
if (!logFile.exists()) {
return null;
......
package com.xxl.job.core.router;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.router.action.BeatAction;
import com.xxl.job.core.router.action.KillAction;
import com.xxl.job.core.router.action.LogAction;
import com.xxl.job.core.router.action.RunAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.router.thread.JobThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
* handler repository
* @author xuxueli 2015-12-19 19:28:44
*/
public class HandlerRouter {
private static Logger logger = LoggerFactory.getLogger(HandlerRouter.class);
/**
* job handler repository
*/
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return HandlerRouter.jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return HandlerRouter.jobHandlerRepository.get(name);
}
/**
* job thread repository
*/
private static ConcurrentHashMap<String, JobThread> JobThreadRepository = new ConcurrentHashMap<String, JobThread>();
public static JobThread registJobThread(String jobkey, IJobHandler handler){
JobThread handlerThread = new JobThread(handler);
handlerThread.start();
logger.info(">>>>>>>>>>> xxl-job regist handler success, jobkey:{}, handler:{}", new Object[]{jobkey, handler});
return HandlerRouter.JobThreadRepository.put(jobkey, handlerThread); // putIfAbsent
}
public static JobThread loadJobThread(String jobKey){
return HandlerRouter.JobThreadRepository.get(jobKey);
}
/**
* route action repository
*/
public enum ActionRepository {
RUN(new RunAction()),
KILL(new KillAction()),
LOG(new LogAction()),
BEAT(new BeatAction());
private IAction action;
private ActionRepository(IAction action){
this.action = action;
}
/**
* match Action by enum name
* @param name
* @return
*/
public static IAction matchAction(String name){
if (name!=null && name.trim().length()>0) {
for (ActionRepository item : ActionRepository.values()) {
if (item.name().equals(name)) {
return item.action;
}
}
}
return null;
}
}
// handler push to queue
public static ResponseModel route(RequestModel requestModel) {
logger.debug(">>>>>>>>>>> xxl-job route, RequestModel:{}", new Object[]{requestModel.toString()});
// timestamp check
if (System.currentTimeMillis() - requestModel.getTimestamp() > 60000) {
return new ResponseModel(ResponseModel.SUCCESS, "Timestamp Timeout.");
}
// match action
IAction action = ActionRepository.matchAction(requestModel.getAction());
if (action == null) {
return new ResponseModel(ResponseModel.SUCCESS, "Action match fail.");
}
return action.execute(requestModel);
}
}
package com.xxl.job.core.router;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
/**
* Created by xuxueli on 16/7/22.
*/
public abstract class IAction {
public abstract ResponseModel execute(RequestModel requestModel);
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
/**
* Created by xuxueli on 16/7/22.
*/
public class BeatAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
return new ResponseModel(ResponseModel.SUCCESS, "i am alive.");
}
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.router.thread.JobThread;
/**
* Created by xuxueli on 16/7/22.
*/
public class KillAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
// generate jobKey
String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName());
// kill handlerThread, and create new one
JobThread jobThread = HandlerRouter.loadJobThread(jobKey);
if (jobThread != null) {
IJobHandler handler = jobThread.getHandler();
jobThread.toStop("人工手动终止");
jobThread.interrupt();
HandlerRouter.registJobThread(jobKey, handler);
return new ResponseModel(ResponseModel.SUCCESS, "job thread kull success.");
}
return new ResponseModel(ResponseModel.FAIL, "job thread not found.");
}
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import java.util.Date;
/**
* Created by xuxueli on 16/7/22.
*/
public class LogAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
String logConteng = XxlJobFileAppender.readLog(new Date(requestModel.getLogDateTim()), requestModel.getLogId());
return new ResponseModel(ResponseModel.SUCCESS, logConteng);
}
}
package com.xxl.job.core.router.action;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.router.HandlerRouter;
import com.xxl.job.core.router.IAction;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.router.thread.JobThread;
/**
* Created by xuxueli on 16/7/22.
*/
public class RunAction extends IAction {
@Override
public ResponseModel execute(RequestModel requestModel) {
// generate jobKey
String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName());
// load old thread
JobThread jobThread = HandlerRouter.loadJobThread(jobKey);
if (!requestModel.isGlueSwitch()) {
// bean model
// handler instance
IJobHandler jobHandler = HandlerRouter.loadJobHandler(requestModel.getExecutorHandler());
if (jobThread == null) {
// jobhandler match
if (jobHandler==null) {
return new ResponseModel(ResponseModel.FAIL, "job handler for jobKey=[" + jobKey + "] not found.");
}
jobThread = HandlerRouter.registJobThread(jobKey, jobHandler);
} else {
// job handler update, kill old job thread
if (jobThread.getHandler() != jobHandler) {
// kill old job thread
jobThread.toStop("人工手动终止");
jobThread.interrupt();
// new thread, with new job handler
jobThread = HandlerRouter.registJobThread(jobKey, jobHandler);
}
}
} else {
// glue model
if (jobThread == null) {
jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName()));
}
}
// sometime, cmap.get can not return given value, i do not know why
jobThread = HandlerRouter.loadJobThread(jobKey);
// push data to queue
jobThread.pushTriggerQueue(requestModel);
return new ResponseModel(ResponseModel.SUCCESS, null);
}
}
package com.xxl.job.core.router.model;
/**
* Created by xuxueli on 16/7/22.
*/
public class RequestModel {
private long timestamp;
private String action;
private String jobGroup;
private String jobName;
private String executorHandler;
private String executorParams;
private boolean glueSwitch;
private String logAddress;
private int logId;
private long logDateTim;
private String status;
private String msg;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getJobGroup() {
return jobGroup;
}
public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
public String getExecutorParams() {
return executorParams;
}
public void setExecutorParams(String executorParams) {
this.executorParams = executorParams;
}
public boolean isGlueSwitch() {
return glueSwitch;
}
public void setGlueSwitch(boolean glueSwitch) {
this.glueSwitch = glueSwitch;
}
public String getLogAddress() {
return logAddress;
}
public void setLogAddress(String logAddress) {
this.logAddress = logAddress;
}
public int getLogId() {
return logId;
}
public void setLogId(int logId) {
this.logId = logId;
}
public long getLogDateTim() {
return logDateTim;
}
public void setLogDateTim(long logDateTim) {
this.logDateTim = logDateTim;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "RequestModel{" +
"timestamp=" + timestamp +
", action='" + action + '\'' +
", jobGroup='" + jobGroup + '\'' +
", jobName='" + jobName + '\'' +
", executorHandler='" + executorHandler + '\'' +
", executorParams='" + executorParams + '\'' +
", glueSwitch=" + glueSwitch +
", logAddress='" + logAddress + '\'' +
", logId=" + logId +
", logDateTim=" + logDateTim +
", status='" + status + '\'' +
", msg='" + msg + '\'' +
'}';
}
}
package com.xxl.job.core.router.model;
/**
* Created by xuxueli on 16/7/22.
*/
public class ResponseModel {
public static final String SUCCESS = "SUCCESS";
public static final String FAIL = "FAIL";
private String status;
private String msg;
public ResponseModel() {
}
public ResponseModel(String status, String msg) {
this.status = status;
this.msg = msg;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "ResponseModel{" +
"status='" + status + '\'' +
", msg='" + msg + '\'' +
'}';
}
}
package com.xxl.job.core.handler;
import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum;
import com.xxl.job.core.handler.IJobHandler.JobHandleStatus;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.HttpUtil;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* handler thread
* @author xuxueli 2016-1-16 19:52:47
*/
public class HandlerThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(HandlerThread.class);
private IJobHandler handler;
private LinkedBlockingQueue<Map<String, String>> handlerDataQueue;
private ConcurrentHashSet<String> logIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private boolean toStop = false;
public HandlerThread(IJobHandler handler) {
this.handler = handler;
handlerDataQueue = new LinkedBlockingQueue<Map<String,String>>();
logIdSet = new ConcurrentHashSet<String>();
}
public IJobHandler getHandler() {
return handler;
}
public void toStop() {
/**
* Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
*/
this.toStop = true;
}
public void pushData(Map<String, String> param) {
if (param.get(HandlerParamEnum.LOG_ID.name())!=null && !logIdSet.contains(param.get(HandlerParamEnum.LOG_ID.name()))) {
handlerDataQueue.offer(param);
}
}
int i = 1;
@Override
public void run() {
while(!toStop){
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
Map<String, String> handlerData = handlerDataQueue.poll(3L, TimeUnit.SECONDS);
if (handlerData!=null) {
i= 0;
String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name());
String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name());
String handler_params = handlerData.get(HandlerParamEnum.EXECUTOR_PARAMS.name());
logIdSet.remove(log_id);
// parse param
String[] handlerParams = null;
if (handler_params!=null && handler_params.trim().length()>0) {
handlerParams = handler_params.split(",");
} else {
handlerParams = new String[0];
}
// handle job
JobHandleStatus _status = JobHandleStatus.FAIL;
String _msg = null;
try {
XxlJobFileAppender.contextHolder.set(log_id);
logger.info(">>>>>>>>>>> xxl-job handle start.");
_status = handler.execute(handlerParams);
} catch (Exception e) {
logger.info("HandlerThread Exception:", e);
StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
_msg = out.toString();
}
logger.info(">>>>>>>>>>> xxl-job handle end, handlerParams:{}, _status:{}, _msg:{}",
new Object[]{handlerParams, _status, _msg});
// callback handler info
if (!toStop) {
HashMap<String, String> params = new HashMap<String, String>();
params.put("log_id", log_id);
params.put("status", _status.name());
params.put("msg", _msg);
HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
} else {
HashMap<String, String> params = new HashMap<String, String>();
params.put("log_id", log_id);
params.put("status", JobHandleStatus.FAIL.name());
params.put("msg", "人工手动终止[业务运行中,被强制终止]");
HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
}
}
} catch (Exception e) {
logger.info("HandlerThread Exception:", e);
}
}
// callback trigger request in queue
while(handlerDataQueue!=null && handlerDataQueue.size()>0){
Map<String, String> handlerData = handlerDataQueue.poll();
if (handlerData!=null) {
String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name());
String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name());
HashMap<String, String> params = new HashMap<String, String>();
params.put("log_id", log_id);
params.put("status", JobHandleStatus.FAIL.name());
params.put("msg", "人工手动终止[任务尚未执行,在调度队列中被终止]");
HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
}
}
logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread());
}
}
package com.xxl.job.core.router.thread;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.IJobHandler.JobHandleStatus;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* handler thread
* @author xuxueli 2016-1-16 19:52:47
*/
public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private IJobHandler handler;
private LinkedBlockingQueue<RequestModel> triggerQueue;
private ConcurrentHashSet<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private boolean toStop = false;
private String stopReason;
public JobThread(IJobHandler handler) {
this.handler = handler;
triggerQueue = new LinkedBlockingQueue<RequestModel>();
triggerLogIdSet = new ConcurrentHashSet<Integer>();
}
public IJobHandler getHandler() {
return handler;
}
public void pushTriggerQueue(RequestModel requestModel) {
if (triggerLogIdSet.contains(requestModel.getLogId())) {
logger.info("repeate trigger job, logId:{}", requestModel.getLogId());
return;
}
triggerLogIdSet.add(requestModel.getLogId());
triggerQueue.add(requestModel);
}
public void toStop(String stopReason) {
/**
* Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
*/
this.toStop = true;
this.stopReason = stopReason;
}
int i = 1;
@Override
public void run() {
while(!toStop){
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
RequestModel triggerDate = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerDate!=null) {
triggerLogIdSet.remove(triggerDate.getLogId());
// parse param
String[] handlerParams = (triggerDate.getExecutorParams()!=null && triggerDate.getExecutorParams().trim().length()>0)
? (String[])(Arrays.asList(triggerDate.getExecutorParams().split(",")).toArray()) : null;
// handle job
JobHandleStatus _status = JobHandleStatus.FAIL;
String _msg = null;
try {
XxlJobFileAppender.contextHolder.set(String.valueOf(triggerDate.getLogId()));
logger.info("----------- xxl-job job handle start -----------");
_status = handler.execute(handlerParams);
} catch (Exception e) {
logger.info("JobThread Exception:", e);
StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
_msg = out.toString();
}
logger.info("----------- xxl-job job handle end ----------- <br>: ExecutorParams:{}, Status:{}, Msg:{}",
new Object[]{handlerParams, _status, _msg});
// callback handler info
if (!toStop) {
// commonm
triggerDate.setStatus(_status.name());
triggerDate.setMsg(_msg);
TriggerCallbackThread.pushCallBack(triggerDate);
} else {
// is killed
triggerDate.setStatus(JobHandleStatus.FAIL.name());
triggerDate.setMsg(stopReason + "人工手动终止[业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(triggerDate);
}
}
} catch (Exception e) {
logger.info("JobThread Exception:", e);
}
}
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
RequestModel triggerDate = triggerQueue.poll();
if (triggerDate!=null) {
// is killed
RequestModel callback = new RequestModel();
callback.setLogAddress(XxlJobNetCommUtil.addressToUrl(triggerDate.getLogAddress()));
callback.setLogId(triggerDate.getLogId());
callback.setStatus(JobHandleStatus.FAIL.name());
callback.setMsg(stopReason + "[任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(callback);
}
}
logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread());
}
}
package com.xxl.job.core.router.thread;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import com.xxl.job.core.util.XxlJobNetCommUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static LinkedBlockingQueue<RequestModel> callBackQueue = new LinkedBlockingQueue<RequestModel>();
static {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
RequestModel callback = callBackQueue.take();
if (callback != null) {
try {
ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(callback.getLogAddress()), callback);
logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()});
} catch (Exception e) {
logger.info("JobThread Exception:", e);
}
}
} catch (Exception e) {
logger.error("", e);
}
}
}
}).start();
}
public static void pushCallBack(RequestModel callback){
callBackQueue.add(callback);
}
}
package com.xxl.job.core.util;
import java.math.BigInteger;
/**
* hex/byte util
* @author xuxueli 2015-11-14 22:47:28
*/
public class ByteHexConverter {
/**
* byte - to - radix, use BigInteger
*/
private static final String hex_tables = "0123456789ABCDEF";
public static String byte2hex (byte[] iBytes) {
StringBuilder hex = new StringBuilder(iBytes.length * 2);
for (int index = 0; index < iBytes.length; index++) {
hex.append(hex_tables.charAt((iBytes[index] & 0xf0) >> 4));
hex.append(hex_tables.charAt((iBytes[index] & 0x0f) >> 0));
}
return hex.toString();
}
public static byte[] hex2Byte(String hexString) {
if (hexString == null || hexString.equals("")) {
return null;
}
byte[] res = new byte[hexString.length() / 2];
char[] chs = hexString.toCharArray();
for (int i = 0, c = 0; i < chs.length; i += 2, c++) {
res[c] = (byte) (Integer.parseInt(new String(chs, i, 2), 16));
}
return res;
}
/**
* byte - to - radix, use BigInteger
*/
public static final int HEX = 16;
public static String byte2radix(byte[] iBytes, int radix){
return new BigInteger(1, iBytes).toString(radix);
}
public static byte[] radix2byte(String val, int radix){
return new BigInteger(val, radix).toByteArray();
}
public static void main(String[] args) {
// hex - byte[] 方案A:位移
String temp = "1111111111113d1f3a51sd3f1a32sd1f32as1df2a13sd21f3a2s1df32a13sd2f123s2a3d13fa13sd9999999999";
System.out.println("明文:" + new String(temp.getBytes()));
System.out.println("编码:" + byte2hex(temp.getBytes()));
System.out.println("解码:" + new String(hex2Byte(byte2hex(temp.getBytes()))));
// hex - byte[] 方案B:BigInteger
System.out.println("编码:" + byte2radix(temp.getBytes(), HEX));
System.out.println("解码:" + new String(radix2byte(byte2radix(temp.getBytes(), HEX), HEX)));
}
}
\ No newline at end of file
package com.xxl.job.core.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
/**
* http util to send data
* @author xuxueli
* @version 2015-11-28 15:30:59
*/
public class HttpUtil {
/**
* http remote callback
*/
public static class RemoteCallBack{
public static final String SUCCESS = "SUCCESS";
public static final String FAIL = "FAIL";
private String status;
private String msg;
public void setStatus(String status) {
this.status = status;
}
public String getStatus() {
return status;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
@Override
public String toString() {
return "RemoteCallBack [status=" + status + ", msg=" + msg + "]";
}
}
/**
* http post request
* @param reqURL
* @param params
* @return [0]=responseMsg, [1]=exceptionMsg
*/
public static RemoteCallBack post(String reqURL, Map<String, String> params){
RemoteCallBack callback = new RemoteCallBack();
callback.setStatus(RemoteCallBack.FAIL);
// do post
HttpPost httpPost = null;
CloseableHttpClient httpClient = null;
try{
httpPost = new HttpPost(reqURL);
if (params != null && !params.isEmpty()) {
List<NameValuePair> formParams = new ArrayList<NameValuePair>();
for(Map.Entry<String,String> entry : params.entrySet()){
formParams.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8"));
}
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();
httpPost.setConfig(requestConfig);
//httpClient = HttpClients.createDefault(); // default retry 3 times
httpClient = HttpClients.custom().disableAutomaticRetries().build();
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (response.getStatusLine().getStatusCode() == 200) {
if (null != entity) {
String responseMsg = EntityUtils.toString(entity, "UTF-8");
callback = JacksonUtil.readValue(responseMsg, RemoteCallBack.class);
if (callback == null) {
callback = new RemoteCallBack();
callback.setStatus(RemoteCallBack.FAIL);
callback.setMsg("responseMsg parse json fail, responseMsg:" + responseMsg);
}
EntityUtils.consume(entity);
}
} else {
callback.setMsg("http statusCode error, statusCode:" + response.getStatusLine().getStatusCode());
}
} catch (Exception e) {
e.printStackTrace();
/*StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
callback.setMsg(out.toString());*/
callback.setMsg(e.getMessage());
} finally{
if (httpPost!=null) {
httpPost.releaseConnection();
}
if (httpClient!=null) {
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return callback;
}
/**
* parse address ip:port to url http://.../
* @param address
* @return
*/
public static String addressToUrl(String address){
return "http://" + address + "/";
}
}
package com.xxl.job.core.util;
import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* http util to send data
* @author xuxueli
* @version 2015-11-28 15:30:59
*/
public class XxlJobNetCommUtil {
private static Logger logger = LoggerFactory.getLogger(XxlJobNetCommUtil.class);
// hex param key
public static final String HEX = "hex";
/**
* format object to hex-json
* @param obj
* @return
*/
public static String formatObj2HexJson(Object obj){
String json = JacksonUtil.writeValueAsString(obj);
String hex = ByteHexConverter.byte2hex(json.getBytes());
return hex;
}
/**
* parse hex-json to object
* @param hex
* @param clazz
* @return
*/
public static <T> T parseHexJson2Obj(String hex, Class<T> clazz){
String json = new String(ByteHexConverter.hex2Byte(hex));
T obj = JacksonUtil.readValue(json, clazz);
return obj;
}
public static void main(String[] args) {
System.out.println(parseHexJson2Obj("7B22737461747573223A2253554343455353222C226D7367223A2254696D657374616D702054696D656F75742E227D", ResponseModel.class));
}
/**
* http post request
* @param reqURL
*/
public static ResponseModel postHex(String reqURL, RequestModel requestModel){
// parse RequestModel to hex-json
String requestHex = XxlJobNetCommUtil.formatObj2HexJson(requestModel);
// msg
String failMsg = null;
// do post
HttpPost httpPost = null;
CloseableHttpClient httpClient = null;
try{
httpPost = new HttpPost(reqURL);
List<NameValuePair> formParams = new ArrayList<NameValuePair>();
formParams.add(new BasicNameValuePair(XxlJobNetCommUtil.HEX, requestHex));
httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8"));
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();
httpPost.setConfig(requestConfig);
//httpClient = HttpClients.createDefault(); // default retry 3 times
httpClient = HttpClients.custom().disableAutomaticRetries().build();
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (response.getStatusLine().getStatusCode() == 200 && null != entity) {
String responseHex = EntityUtils.toString(entity, "UTF-8");
EntityUtils.consume(entity);
// i do not know why
responseHex = responseHex.replace("\n", "");
// parse hex-json to ResponseModel
ResponseModel responseModel = XxlJobNetCommUtil.parseHexJson2Obj(responseHex, ResponseModel.class);
if (responseModel!=null) {
return responseModel;
}
} else {
failMsg = "http statusCode error, statusCode:" + response.getStatusLine().getStatusCode();
}
} catch (Exception e) {
logger.info("", e);
/*StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out));
callback.setMsg(out.toString());*/
failMsg = e.getMessage();
} finally{
if (httpPost!=null) {
httpPost.releaseConnection();
}
if (httpClient!=null) {
try {
httpClient.close();
} catch (IOException e) {
logger.info("", e);
}
}
}
// other, default fail
ResponseModel callback = new ResponseModel();
callback.setStatus(ResponseModel.FAIL);
callback.setMsg(failMsg);
return callback;
}
/**
* parse address ip:port to url http://.../
* @param address
* @return
*/
public static String addressToUrl(String address){
return "http://" + address + "/";
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册