diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 193b8719a68fd865d3baf3f7ffa9432a2472bc55..302bd51473ce1c25f01362de7cb97dfe6ebd915d 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -3,8 +3,8 @@ package com.xxl.job.admin.core.jobbean; import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.thread.JobMonitorHelper; +import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.router.HandlerRouter.ActionRepository; @@ -60,17 +60,12 @@ public class RemoteHttpJobBean extends QuartzJobBean { List addressList = new ArrayList(); String parseAddressMsg = null; if (StringUtils.isNotBlank(jobInfo.getExecutorAppname())) { - List xxlJobRegistryList = DynamicSchedulerUtil.xxlJobRegistryDao.findRegistrys(RegistHelper.RegistType.EXECUTOR.name(), jobInfo.getExecutorAppname()); - if (xxlJobRegistryList!=null && xxlJobRegistryList.size()>0) { - for (XxlJobRegistry item: xxlJobRegistryList) { - addressList.add(item.getRegistryValue()); - } - } - parseAddressMsg = MessageFormat.format("Parse Address (Appname注册方式)
>>>[address list] : {0}

", addressList.toArray()); + addressList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), jobInfo.getExecutorAppname()); + parseAddressMsg = MessageFormat.format("Parse Address (Appname注册方式)
>>>[address list] : {0}

", addressList); } else { List addressArr = Arrays.asList(jobInfo.getExecutorAddress().split(",")); addressList.addAll(addressArr); - parseAddressMsg = MessageFormat.format("Parse Address (地址配置方式)
>>>[address list] : {0}

", addressList.toArray()); + parseAddressMsg = MessageFormat.format("Parse Address (地址配置方式)
>>>[address list] : {0}

", addressList); } // failover trigger @@ -97,7 +92,21 @@ public class RemoteHttpJobBean extends QuartzJobBean { * @return */ public ResponseModel failoverTrigger(List addressList, RequestModel requestModel, XxlJobLog jobLog){ - if (addressList.size() > 1) { + if (addressList==null || addressList.size() < 1) { + ResponseModel result = new ResponseModel(); + result.setStatus(ResponseModel.FAIL); + result.setMsg( "Trigger error,
>>>address list is null

" ); + return result; + } else if (addressList.size() == 1) { + String address = addressList.get(0); + // store real address + jobLog.setExecutorAddress(address); + + ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); + String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, triggerCallback.getStatus(), triggerCallback.getMsg()); + triggerCallback.setMsg(failoverMessage); + return triggerCallback; + } else { // for ha Collections.shuffle(addressList); @@ -133,20 +142,6 @@ public class RemoteHttpJobBean extends QuartzJobBean { result.setStatus(ResponseModel.FAIL); result.setMsg(failoverMessage); return result; - } else if (addressList.size() == 1) { - String address = addressList.get(0); - // store real address - jobLog.setExecutorAddress(address); - - ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); - String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, triggerCallback.getStatus(), triggerCallback.getMsg()); - triggerCallback.setMsg(failoverMessage); - return triggerCallback; - } else { - ResponseModel result = new ResponseModel(); - result.setStatus(ResponseModel.FAIL); - result.setMsg( "Trigger error,
>>>address list is null

" ); - return result; } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..85140c52ec96bdda46a78dc9b7fd6e1a5dd06638 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -0,0 +1,74 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.model.XxlJobRegistry; +import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.core.registry.RegistHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * job registry helper + * @author xuxueli 2016-10-02 19:10:24 + */ +public class JobRegistryHelper { + private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class); + + private static JobRegistryHelper helper = new JobRegistryHelper(); + private ConcurrentHashMap> registMap = new ConcurrentHashMap>(); + + public JobRegistryHelper(){ + Thread registryThread = new Thread(new Runnable() { + @Override + public void run() { + int timeout = 15; + while (true) { + try { + ConcurrentHashMap> temp = new ConcurrentHashMap>(); + // do biz + DynamicSchedulerUtil.xxlJobRegistryDao.removeDead(RegistHelper.TIMEOUT*2); + List list = DynamicSchedulerUtil.xxlJobRegistryDao.findAll(RegistHelper.TIMEOUT*2); + if (list != null) { + for (XxlJobRegistry item: list) { + String groupKey = makeGroupKey(item.getRegistryGroup(), item.getRegistryKey()); + List dataSet = temp.get(groupKey); + if (dataSet == null) { + dataSet = new ArrayList(); + } + dataSet.add(item.getRegistryValue()); + temp.put(groupKey, dataSet); + } + } + // gresh registry + registMap = temp; + logger.error("job registry :{}", list); + } catch (Exception e) { + logger.error("job registry helper error:{}", e); + } + try { + TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); + } catch (InterruptedException e) { + logger.error("job registry helper error:{}", e); + } + } + } + }); + registryThread.setDaemon(true); + registryThread.start(); + + } + + private static String makeGroupKey(String registryGroup, String registryKey){ + return registryGroup.concat("_").concat(registryKey); + } + + public static List discover(String registryGroup, String registryKey){ + String groupKey = makeGroupKey(registryGroup, registryKey); + return helper.registMap.get(groupKey); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java index 068e546f22d901bf6f191435c28210b16c19d4d8..472c166b5ecf73b89408c10329e3b72db767a749 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java @@ -3,6 +3,7 @@ package com.xxl.job.admin.core.util; import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobRegistryDao; @@ -49,6 +50,9 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init } catch (Exception e) { e.printStackTrace(); } + + // init JobRegistryHelper + JobRegistryHelper.discover("g", "k"); } // destroy diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/IXxlJobRegistryDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/IXxlJobRegistryDao.java index ca67567d1da06a240d67f3bf7ae6f1797a5aea6a..34284374635d8d6ae9661f1612f3b7d503c0ce3f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/IXxlJobRegistryDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/IXxlJobRegistryDao.java @@ -8,5 +8,7 @@ import java.util.List; * Created by xuxueli on 16/9/30. */ public interface IXxlJobRegistryDao { - List findRegistrys(String registryGroup, String registryKey); + public int removeDead(int timeout); + + public List findAll(int timeout); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/impl/XxlJobRegistryDaoImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/impl/XxlJobRegistryDaoImpl.java index 36a3c38fab4f233cff396ad2f0bd70ec1f45141e..02b96eccc90c6b2aed3fc8838314d4e1d052b971 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/impl/XxlJobRegistryDaoImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/impl/XxlJobRegistryDaoImpl.java @@ -6,9 +6,7 @@ import org.mybatis.spring.SqlSessionTemplate; import org.springframework.stereotype.Repository; import javax.annotation.Resource; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Created by xuxueli on 16/9/30. @@ -20,11 +18,13 @@ public class XxlJobRegistryDaoImpl implements IXxlJobRegistryDao { public SqlSessionTemplate sqlSessionTemplate; @Override - public List findRegistrys(String registryGroup, String registryKey) { - Map params = new HashMap(); - params.put("registryGroup", registryGroup); - params.put("registryKey", registryKey); - return sqlSessionTemplate.selectList("XxlJobRegistryMapper.findRegistrys", params); + public int removeDead(int timeout) { + return sqlSessionTemplate.delete("XxlJobRegistryMapper.removeDead", timeout); + } + + @Override + public List findAll(int timeout) { + return sqlSessionTemplate.selectList("XxlJobRegistryMapper.findAll", timeout); } } diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml index 42d7d12310f31ab9ea79dd03d3635b4b9d29d1d0..46bfcc85de730c7bd3a947130fceece6d7c79f9b 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml @@ -19,17 +19,15 @@ t.update_time - SELECT FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY AS t - WHERE t.registry_group = #{registryGroup} - AND t.registry_key = #{registryKey} - AND t.update_time ]]> DATE_ADD(NOW(),INTERVAL -30 SECOND) + WHERE t.update_time ]]> DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND) - - delete from XXL_JOB_QRTZ_TRIGGER_REGISTRY - WHERE update_time DATE_ADD(NOW(),INTERVAL -30 SECOND) - - \ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java index ea9a6e6aff81d24f5ef1fe3bcd4ec5927be06ef7..765ef5d13bb13f9c61d7c3a1621129bd97245044 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java @@ -96,7 +96,7 @@ public class XxlJobExecutor implements ApplicationContextAware { try { String address = IpUtil.getIp().concat(":").concat(String.valueOf(port)); registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); - TimeUnit.SECONDS.sleep(15); + TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); } catch (Exception e) { e.printStackTrace(); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java index 8435596fc570173277ef0642e4abe03e2f1b0f37..117fc213f201823b408f7537262fb9b69cd06ed2 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java @@ -5,6 +5,7 @@ package com.xxl.job.core.registry; */ public interface RegistHelper { + public static final int TIMEOUT = 15; public enum RegistType{ EXECUTOR, ADMIN } public int registry(String registGroup, String registryKey, String registryValue);