提交 510b397c 编写于 作者: X xueli.xue

升级大版本V1.3.x

上级 9beda409
......@@ -55,12 +55,19 @@ public class RemoteHttpJobBean extends QuartzJobBean {
// trigger request
HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
params.put(HandlerRepository.NAMESPACE, HandlerRepository.NameSpaceEnum.RUN.name());
params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL));
params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId()));
params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME));
params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS));
params.put(HandlerRepository.HANDLER_GLUE_SWITCH, String.valueOf(jobInfo.getGlueSwitch()));
params.put(HandlerRepository.HANDLER_JOB_GROUP, jobInfo.getJobGroup());
params.put(HandlerRepository.HANDLER_JOB_NAME, jobInfo.getJobName());
// handler address, jetty (servlet dead)
String handler_address = jobDataMap.get(HandlerRepository.HANDLER_ADDRESS);
......
......@@ -51,6 +51,30 @@
<version>1.7.5</version>
</dependency>
<!-- c3p0 -->
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<!-- mybatis-spring -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.2.8</version>
</dependency>
<!-- mysql-connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
</dependency>
<!-- xxl-job-client -->
<dependency>
<groupId>com.xxl</groupId>
......
package com.xxl.job.dao;
import com.xxl.job.dao.model.XxlJobInfo;
/**
* job log for glue
* @author xuxueli 2016-5-19 18:04:56
*/
public interface IXxlJobInfoDao {
public XxlJobInfo load(String jobGroup, String jobName);
}
package com.xxl.job.dao.impl;
import java.util.HashMap;
import javax.annotation.Resource;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Repository;
import com.xxl.job.dao.IXxlJobInfoDao;
import com.xxl.job.dao.model.XxlJobInfo;
/**
* job log for glue
* @author xuxueli 2016-5-19 18:17:52
*/
@Repository
public class XxlJobInfoDaoImpl implements IXxlJobInfoDao {
@Resource
public SqlSessionTemplate sqlSessionTemplate;
@Override
public XxlJobInfo load(String jobGroup, String jobName) {
HashMap<String, Object> params = new HashMap<String, Object>();
params.put("jobGroup", jobGroup);
params.put("jobName", jobName);
return sqlSessionTemplate.selectOne("XxlJobInfoMapper.load", params);
}
}
package com.xxl.job.dao.model;
/**
* xxl-job info
* @author xuxueli 2016-5-19 17:57:46
*/
public class XxlJobInfo {
private String jobGroup;
private String jobName;
private String glueSource;
public String getJobGroup() {
return jobGroup;
}
public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getGlueSource() {
return glueSource;
}
public void setGlueSource(String glueSource) {
this.glueSource = glueSource;
}
}
......@@ -7,7 +7,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.xxl.job.client.handler.IJobHandler;
import com.xxl.job.client.handler.JobHander;
import com.xxl.job.client.handler.IJobHandler.JobHandleStatus;
import com.xxl.job.client.handler.annotation.JobHander;
/**
* demo job handler
......@@ -30,7 +31,7 @@ public class DemoJobHandler extends IJobHandler {
public static void main(String[] args) {
System.out.println(DemoJobHandler.class.getName());
System.out.println(DemoJobHandler.class);
System.out.println(JobHandleStatus.class);
}
}
package com.xxl.job.service.loader;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.xxl.job.client.glue.loader.GlueLoader;
import com.xxl.job.dao.IXxlJobInfoDao;
import com.xxl.job.dao.model.XxlJobInfo;
@Service("dbGlueLoader")
public class DbGlueLoader implements GlueLoader {
@Resource
private IXxlJobInfoDao xxlJobInfoDao;
@Override
public String load(String job_group, String job_name) {
XxlJobInfo glue = xxlJobInfoDao.load(job_group, job_name);
return glue!=null?glue.getGlueSource():null;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<context:component-scan base-package="com.xxl.job.service, com.xxl.job.dao" />
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="fileEncoding" value="utf-8" />
<property name="locations">
<list>
<value>classpath*:jdbc.properties</value>
</list>
</property>
</bean>
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
<property name="driverClass" value="${c3p0.driverClass}" />
<property name="jdbcUrl" value="${c3p0.url}" />
<property name="user" value="${c3p0.user}" />
<property name="password" value="${c3p0.password}" />
<property name="initialPoolSize" value="3" />
<property name="minPoolSize" value="2" />
<property name="maxPoolSize" value="10" />
<property name="maxIdleTime" value="60" />
<property name="acquireRetryDelay" value="1000" />
<property name="acquireRetryAttempts" value="10" />
<property name="preferredTestQuery" value="SELECT 1" />
</bean>
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="mapperLocations" value="classpath*:mybatis-mapper/*.xml"/>
</bean>
<!-- scope must be "prototype" when junit -->
<bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate" scope="prototype">
<constructor-arg index="0" ref="sqlSessionFactory" />
</bean>
</beans>
\ No newline at end of file
......@@ -15,5 +15,10 @@
<bean id="xxlJobJettyServer" class="com.xxl.job.client.netcom.jetty.XxlJobJettyServer" init-method="start">
<property name="port" value="9999" />
</bean>
<bean id="glueFactory" class="com.xxl.job.client.glue.GlueFactory">
<property name="cacheTimeout" value="5000" />
<property name="glueLoader" ref="dbGlueLoader" />
</bean>
</beans>
\ No newline at end of file
c3p0.driverClass=com.mysql.jdbc.Driver
c3p0.url=jdbc:mysql://localhost:3306/xxl-job?Unicode=true&amp;characterEncoding=UTF-8
c3p0.user=root
c3p0.password=root_pwd
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="XxlJobInfoMapper">
<resultMap id="XxlJobInfo" type="com.xxl.job.dao.model.XxlJobInfo" >
<result column="job_group" property="jobGroup" />
<result column="job_name" property="jobName" />
<result column="glue_source" property="glueSource" />
</resultMap>
<sql id="Base_Column_List">
t.job_group,
t.job_name,
t.glue_source
</sql>
<select id="load" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" />
FROM xxl_job_qrtz_trigger_info AS t
WHERE t.job_group = #{jobGroup}
AND t.job_name = #{jobName}
</select>
</mapper>
\ No newline at end of file
......@@ -60,6 +60,13 @@
<version>3.2.14.RELEASE</version>
<scope>compile</scope>
</dependency>
<!-- groovy-all -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
......
package com.xxl.job.client.glue;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import com.xxl.job.client.glue.cache.LocalCache;
import com.xxl.job.client.glue.loader.GlueLoader;
import com.xxl.job.client.handler.IJobHandler;
import com.xxl.job.client.handler.IJobHandler.JobHandleStatus;
import groovy.lang.GroovyClassLoader;
/**
* glue factory, product class/object by name
* @author xuxueli 2016-1-2 20:02:27
*/
public class GlueFactory implements ApplicationContextAware {
private static Logger logger = LoggerFactory.getLogger(GlueFactory.class);
/**
* groovy class loader
*/
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
/**
* glue cache timeout / second
*/
private long cacheTimeout = 5000;
public void setCacheTimeout(long cacheTimeout) {
this.cacheTimeout = cacheTimeout;
}
/**
* code source loader
*/
private GlueLoader glueLoader;
public void setGlueLoader(GlueLoader glueLoader) {
this.glueLoader = glueLoader;
}
// ----------------------------- spring support -----------------------------
private static ApplicationContext applicationContext;
private static GlueFactory glueFactory;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
GlueFactory.applicationContext = applicationContext;
GlueFactory.glueFactory = (GlueFactory) applicationContext.getBean("glueFactory");
}
/**
* inject service of spring
* @param instance
*/
public void injectService(Object instance){
if (instance==null) {
return;
}
Field[] fields = instance.getClass().getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
Object fieldBean = null;
// with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired
if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
try {
fieldBean = applicationContext.getBean(field.getName());
} catch (Exception e) {
}
if (fieldBean==null ) {
fieldBean = applicationContext.getBean(field.getType());
}
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
fieldBean = applicationContext.getBean(field.getType());
}
if (fieldBean!=null) {
field.setAccessible(true);
try {
field.set(instance, fieldBean);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
// ----------------------------- load instance -----------------------------
// load class,
public static String generateClassCacheKey(String job_group, String job_name){
return job_group.concat("_").concat(job_name).concat("_class");
}
public Class<?> loadClass(String job_group, String job_name) throws Exception{
if (job_group==null || job_group.trim().length()==0 || job_name==null || job_name.trim().length()==0) {
return null;
}
String cacheClassKey = generateClassCacheKey(job_group, job_name);
Object cacheClass = LocalCache.getInstance().get(cacheClassKey);
if (cacheClass != null) {
return (Class<?>) cacheClass;
}
String codeSource = glueLoader.load(job_group, job_name);
if (codeSource!=null && codeSource.trim().length()>0) {
Class<?> clazz = groovyClassLoader.parseClass(codeSource);
if (clazz!=null) {
LocalCache.getInstance().set(cacheClassKey, clazz, cacheTimeout);
logger.info(">>>>>>>>>>>> xxl-glue, fresh class, cacheClassKey:{}", cacheClassKey);
return clazz;
}
}
return null;
}
// load new instance, prototype
public IJobHandler loadNewInstance(String job_group, String job_name) throws Exception{
if (job_group==null || job_group.trim().length()==0 || job_name==null || job_name.trim().length()==0) {
return null;
}
Class<?> clazz = loadClass(job_group, job_name);
if (clazz!=null) {
Object instance = clazz.newInstance();
if (instance!=null) {
if (!(instance instanceof IJobHandler)) {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
this.injectService(instance);
return (IJobHandler) instance;
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
// // load instance, singleton
public static String generateInstanceCacheKey(String job_group, String job_name){
return job_group.concat("_").concat(job_name).concat("_instance");
}
public IJobHandler loadInstance(String job_group, String job_name) throws Exception{
if (job_group==null || job_group.trim().length()==0 || job_name==null || job_name.trim().length()==0) {
return null;
}
String cacheInstanceKey = generateInstanceCacheKey(job_group, job_name);
Object cacheInstance = LocalCache.getInstance().get(cacheInstanceKey);
if (cacheInstance!=null) {
if (!(cacheInstance instanceof IJobHandler)) {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, "
+ "cannot convert from cacheClass["+ cacheInstance.getClass() +"] to IJobHandler");
}
return (IJobHandler) cacheInstance;
}
Object instance = loadNewInstance(job_group, job_name);
if (instance!=null) {
if (!(instance instanceof IJobHandler)) {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
LocalCache.getInstance().set(cacheInstanceKey, instance, cacheTimeout);
logger.info(">>>>>>>>>>>> xxl-glue, fresh instance, cacheInstanceKey:{}", cacheInstanceKey);
return (IJobHandler) instance;
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, instance is null");
}
// ----------------------------- util -----------------------------
public static JobHandleStatus glue(String job_group, String job_name, String... params) throws Exception{
return GlueFactory.glueFactory.loadInstance(job_group, job_name).handle(params);
}
}
package com.xxl.job.client.glue.cache;
/**
* chche interface
* @author xuxueli 2016-1-8 15:57:27
*/
public interface ICache {
public boolean set(String key, Object value);
public boolean set(String key, Object value, long timeout);
public Object get(String key);
public boolean remove(String key);
}
package com.xxl.job.client.glue.cache;
import java.util.concurrent.ConcurrentHashMap;
/**
* local interface
* @author Administrator
*/
public class LocalCache implements ICache{
private static final LocalCache instance = new LocalCache();
public static LocalCache getInstance(){
return instance;
}
private static final ConcurrentHashMap<String, Object> cacheMap = new ConcurrentHashMap<String, Object>();
private static final long CACHE_TIMEOUT = 5000;
private static String makeTimKey(String key){
return key.concat("_tim");
}
private static String makeDataKey(String key){
return key.concat("_data");
}
@Override
public boolean set(String key, Object value) {
cacheMap.put(makeTimKey(key), System.currentTimeMillis() + CACHE_TIMEOUT);
cacheMap.put(makeDataKey(key), value);
return true;
}
@Override
public boolean set(String key, Object value, long timeout) {
cacheMap.put(makeTimKey(key), System.currentTimeMillis() + timeout);
cacheMap.put(makeDataKey(key), value);
return true;
}
@Override
public Object get(String key) {
Object tim = cacheMap.get(makeTimKey(key));
if (tim != null && System.currentTimeMillis() < Long.parseLong(tim.toString())) {
return cacheMap.get(makeDataKey(key));
}
return null;
}
@Override
public boolean remove(String key) {
cacheMap.remove(makeTimKey(key));
cacheMap.remove(makeDataKey(key));
return true;
}
public static void main(String[] args) {
String key = "key01";
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().set(key, "v1");
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().set(key, "v2");
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().remove(key);
System.out.println(LocalCache.getInstance().get(key));
}
}
package com.xxl.job.client.glue.loader;
/**
* code source loader
* @author xuxueli 2016-1-2 20:01:39
*/
public interface GlueLoader {
/**
* load code source by name, ensure every load is the latest.
* @param name
* @return
*/
public String load(String job_group, String job_name);
}
......@@ -7,8 +7,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.client.handler.impl.GlueJobHandler;
import com.xxl.job.client.log.XxlJobFileAppender;
import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.client.util.JacksonUtil;
/**
......@@ -22,9 +23,13 @@ public class HandlerRepository {
public enum NameSpaceEnum{RUN, KILL, LOG}
public static final String HANDLER_ADDRESS = "handler_address";
public static final String HANDLER_NAME = "handler_name";
public static final String HANDLER_PARAMS = "handler_params";
public static final String HANDLER_GLUE_SWITCH = "handler_glue_switch";
public static final String HANDLER_NAME = "handler_name";
public static final String HANDLER_JOB_GROUP = "handler_job_group";
public static final String HANDLER_JOB_NAME = "handler_job_name";
public static final String TRIGGER_LOG_ID = "trigger_log_id";
public static final String TRIGGER_LOG_URL = "trigger_log_url";
public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
......@@ -64,19 +69,38 @@ public class HandlerRepository {
}
// push data to queue
String handler_name = _param.get(HandlerRepository.HANDLER_NAME);
if (handler_name!=null && handler_name.trim().length()>0) {
HandlerThread handlerThread = handlerTreadMap.get(handler_name);
if (handlerThread != null) {
handlerThread.pushData(_param);
callback.setStatus(RemoteCallBack.SUCCESS);
} else {
String handler_glue_switch = _param.get(HandlerRepository.HANDLER_GLUE_SWITCH);
HandlerThread handlerThread = null;
if ("0".equals(handler_glue_switch)) {
// bean model
String handler_name = _param.get(HandlerRepository.HANDLER_NAME);
if (handler_name == null || handler_name.trim().length()==0) {
callback.setMsg("bean model handler[HANDLER_NAME] not found.");
return JacksonUtil.writeValueAsString(callback);
}
handlerThread = handlerTreadMap.get(handler_name);
if (handlerThread == null) {
callback.setMsg("handler[" + handler_name + "] not found.");
return JacksonUtil.writeValueAsString(callback);
}
}else{
callback.setMsg("param[HANDLER_NAME] can not be null.");
} else {
// glue
String handler_job_group = _param.get(HandlerRepository.HANDLER_JOB_GROUP);
String handler_job_name = _param.get(HandlerRepository.HANDLER_JOB_NAME);
if (handler_job_group == null || handler_job_group.trim().length()==0 || handler_job_name == null || handler_job_name.trim().length()==0) {
callback.setMsg("glue model handler[job group or name] is null.");
return JacksonUtil.writeValueAsString(callback);
}
String glueHandleName = "glue_".concat(handler_job_group).concat("_").concat(handler_job_name);
handlerThread = handlerTreadMap.get(glueHandleName);
if (handlerThread==null) {
HandlerRepository.regist(glueHandleName, new GlueJobHandler(handler_job_group, handler_job_name));
}
handlerThread = handlerTreadMap.get(glueHandleName);
}
handlerThread.pushData(_param);
callback.setStatus(RemoteCallBack.SUCCESS);
} else if (namespace.equals(HandlerRepository.NameSpaceEnum.LOG.name())) {
String trigger_log_id = _param.get(HandlerRepository.TRIGGER_LOG_ID);
String trigger_timestamp = _param.get(HandlerRepository.TRIGGER_TIMESTAMP);
......@@ -128,7 +152,7 @@ public class HandlerRepository {
return JacksonUtil.writeValueAsString(callback);
}
logger.info(">>>>>>>>>>> xxl-job service end, triggerData:{}", new Object[]{callback});
logger.info(">>>>>>>>>>> xxl-job service end, triggerData:{}");
return JacksonUtil.writeValueAsString(callback);
}
......
......@@ -15,7 +15,7 @@ public abstract class IJobHandler extends HandlerRepository{
*/
public abstract JobHandleStatus handle(String... params) throws Exception;
public enum JobHandleStatus{
public static enum JobHandleStatus{
/**
* handle success
*/
......
package com.xxl.job.client.handler;
package com.xxl.job.client.handler.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
......
package com.xxl.job.client.handler.impl;
import com.xxl.job.client.glue.GlueFactory;
import com.xxl.job.client.handler.IJobHandler;
/**
* glue job handler
* @author xuxueli 2016-5-19 21:05:45
*/
public class GlueJobHandler extends IJobHandler {
private String job_group;
private String job_name;
public GlueJobHandler(String job_group, String job_name) {
this.job_group = job_group;
this.job_name = job_name;
}
@Override
public JobHandleStatus handle(String... params) throws Exception {
return GlueFactory.glue(job_group, job_name, params);
}
}
......@@ -16,7 +16,7 @@ import org.springframework.context.ApplicationContextAware;
import com.xxl.job.client.handler.HandlerRepository;
import com.xxl.job.client.handler.IJobHandler;
import com.xxl.job.client.handler.JobHander;
import com.xxl.job.client.handler.annotation.JobHander;
/**
* Created by xuxueli on 2016/3/2 21:14.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册