提交 7cc72a01 编写于 作者: C Calvin

#221 加入对Jedis的封装,代码改进与注释

上级 d842e552
......@@ -53,7 +53,7 @@ public class JobListenerDemo implements JobHandler {
if (c == '\n') {
System.out.println("Shutting down");
threadPool.shutdownNow();
boolean shutdownSucess = threadPool.awaitTermination(JobListener.POPUP_TIMEOUT + 1,
boolean shutdownSucess = threadPool.awaitTermination(JobListener.DEFAULT_POPUP_TIMEOUT + 1,
TimeUnit.SECONDS);
if (!shutdownSucess) {
......
......@@ -58,6 +58,9 @@ public class JedisScriptExecutor {
*/
public Object execute(final String hash, final List<String> keys, final List<String> args)
throws IllegalArgumentException {
final long start = System.currentTimeMillis();
if (!hashScriptMap.containsKey(hash)) {
throw new IllegalArgumentException("Script hash " + hash + " is not loaded in executor。");
}
......@@ -66,12 +69,16 @@ public class JedisScriptExecutor {
return jedisTemplate.execute(new JedisAction<Object>() {
@Override
public Object action(Jedis jedis) {
return jedis.evalsha(hash, keys, args);
Object result = jedis.evalsha(hash, keys, args);
logger.debug("Script hash {} execution time is {}ms", hash, System.currentTimeMillis() - start);
return result;
}
});
} catch (JedisDataException e) {
logger.warn("Lua execution error, try to reload the script.", e);
return reloadAndExecute(hash, keys, args);
logger.warn("Script hash {} is not loaded yet, try to reload and run it again", hash, e);
Object result = reloadAndExecute(hash, keys, args);
logger.debug("Script hash {} reload and execution time is {}ms", hash, System.currentTimeMillis() - start);
return result;
}
}
......
......@@ -11,7 +11,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springside.modules.nosql.redis.JedisScriptExecutor;
import org.springside.modules.utils.Threads.WrapExceptionRunnable;
......@@ -19,12 +18,17 @@ import redis.clients.jedis.JedisPool;
import com.google.common.collect.Lists;
/**
* 定时分发任务。 启动线程定时从sleeping job sorted set 中取出到期的任务放入ready job list.
*
* @author calvin
*/
public class JobDispatcher implements Runnable {
public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua";
private static Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
private ScheduledExecutorService threadPool;
private ScheduledExecutorService scheduledThreadPool;
private JedisScriptExecutor scriptExecutor;
......@@ -37,31 +41,39 @@ public class JobDispatcher implements Runnable {
}
public JobDispatcher(String jobName, JedisPool jedisPool, String scriptPath) {
keys = Lists.newArrayList(Keys.getSleepingJobKey(jobName), Keys.getReadyJobKey(jobName));
this.scriptExecutor = new JedisScriptExecutor(jedisPool);
keys = Lists.newArrayList(jobName + ".job:sleeping", jobName + ".job:ready");
loadLuaScript(scriptPath);
}
private void loadLuaScript(String scriptPath) {
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(scriptPath);
String script;
try {
String script = FileUtils.readFileToString(resource.getFile());
scriptHash = scriptExecutor.load(script);
Resource resource = new DefaultResourceLoader().getResource(scriptPath);
script = FileUtils.readFileToString(resource.getFile());
} catch (IOException e) {
throw new IllegalStateException(DEFAULT_DISPATCH_LUA_FILE + "not exist", e);
throw new IllegalArgumentException(scriptPath + " is not exist.", e);
}
scriptHash = scriptExecutor.load(script);
}
/**
* 启动分发线程, 自行创建scheduler线程池.
*/
public void start(long periodMilliseconds) {
threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, periodMilliseconds, TimeUnit.MILLISECONDS);
this.scheduledThreadPool = Executors.newScheduledThreadPool(1);
scheduledThreadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, periodMilliseconds,
TimeUnit.MILLISECONDS);
}
/**
* 停止分发任务, 默认最多延时10秒等候线程关闭.
*/
public void stop() {
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
try {
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
if (!scheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
logger.error("Job dispatcher terminate failed!");
}
} catch (InterruptedException e) {
......@@ -69,12 +81,13 @@ public class JobDispatcher implements Runnable {
}
}
/**
* 以当前时间为参数执行Lua Script分发任务。
*/
@Override
public void run() {
long currTime = System.currentTimeMillis();
List<String> args = Lists.newArrayList(String.valueOf(currTime));
scriptExecutor.execute(scriptHash, keys, args);
long luaExecTime = System.currentTimeMillis() - currTime;
logger.debug("Execution Time={}ms.", luaExecTime);
}
}
......@@ -18,21 +18,21 @@ import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* This is the Redis implementation of SchedulerManager.
* 阻塞接收任务的Runnable.
*/
public class JobListener implements Runnable {
public static final int POPUP_TIMEOUT = 5;
public static final int DEFAULT_POPUP_TIMEOUT = 5;
private String readyJobName;
private String readyJobKey;
private JedisTemplate jedisTemplate = null;
private JedisTemplate jedisTemplate;
private final JobHandler jobHandler;
public JobListener(String jobName, JedisPool jedisPool, JobHandler jobHandler) {
jedisTemplate = new JedisTemplate(jedisPool);
readyJobName = jobName + ".job:ready";
readyJobKey = Keys.getReadyJobKey(jobName);
this.jobHandler = jobHandler;
}
......@@ -42,7 +42,7 @@ public class JobListener implements Runnable {
@Override
public void action(Jedis jedis) {
while (!Thread.currentThread().isInterrupted()) {
List<String> nameValuePair = jedis.brpop(POPUP_TIMEOUT, readyJobName);
List<String> nameValuePair = jedis.brpop(DEFAULT_POPUP_TIMEOUT, readyJobKey);
if ((nameValuePair != null) && !nameValuePair.isEmpty()) {
String job = nameValuePair.get(1);
jobHandler.handleJob(job);
......
......@@ -24,13 +24,13 @@ public class JobManager {
private static Logger logger = LoggerFactory.getLogger(JobManager.class);
private String sleepingJobName;
private String sleepingJobKey;
private JedisTemplate jedisTemplate;
public JobManager(String jobName, JedisPool jedisPool) {
jedisTemplate = new JedisTemplate(jedisPool);
sleepingJobName = jobName + ".job:sleeping";
sleepingJobKey = Keys.getSleepingJobKey(jobName);
}
/**
......@@ -38,14 +38,14 @@ public class JobManager {
*/
public void scheduleJob(final String job, final long delay, final TimeUnit timeUnit) {
final long delayTimeInMillisecond = System.currentTimeMillis() + timeUnit.toMillis(delay);
jedisTemplate.zadd(sleepingJobName, delayTimeInMillisecond, job);
jedisTemplate.zadd(sleepingJobKey, delayTimeInMillisecond, job);
}
/**
* 取消任务,如果任务不存在或已触发返回false, 否则返回true.
* 取消任务, 如果任务不存在或已被触发返回false, 否则返回true.
*/
public boolean cancelJob(final String job) {
boolean removed = jedisTemplate.zrem(sleepingJobName, job);
boolean removed = jedisTemplate.zrem(sleepingJobKey, job);
if (!removed) {
logger.warn("Can't cancel job by value {}", job);
......
package org.springside.modules.nosql.redis.scheduler;
public class Keys {
public static String getSleepingJobKey(String jobName) {
return new StringBuilder().append("job:").append(jobName).append(":sleepingjob").toString();
}
public static String getReadyJobKey(String jobName) {
return new StringBuilder().append("job:").append(jobName).append(":sleepingjob").toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册