提交 d842e552 编写于 作者: C Calvin

#221 加入对Jedis的封装,代码改进与注释

上级 4af636d2
......@@ -15,14 +15,14 @@ import redis.clients.jedis.exceptions.JedisDataException;
import com.google.common.collect.Maps;
/**
* Load and Run the lua scripts and support to reload the script when execution failed.
* 装载并执行Lua Script,如果服务器上因为集群多台服务器或重启等原因没有装载script,会自动重新装载后重试。
*/
public class JedisScriptExecutor {
private static Logger logger = LoggerFactory.getLogger(JedisScriptExecutor.class);
private JedisTemplate jedisTemplate;
// Map contains <Script Hash, Script Content> pair
// 以 <Script Hash, Script Content>存储装载过的script,用于重试。
private Map<String, String> hashScriptMap = Maps.newHashMap();
public JedisScriptExecutor(JedisPool jedisPool) {
......@@ -30,9 +30,10 @@ public class JedisScriptExecutor {
}
/**
* Load the script to redis, return the script hash.
* 装载Lua Script,返回Hash值。
* 如果Script出错,抛出JedisDataException。
*/
public synchronized String load(final String script) {
public synchronized String load(final String script) throws JedisDataException {
String hash = jedisTemplate.execute(new JedisTemplate.JedisAction<String>() {
@Override
public String action(Jedis jedis) {
......@@ -44,18 +45,21 @@ public class JedisScriptExecutor {
}
/**
* Execute the script, auto reload the script if it is not in redis.
* 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。
* 如果Script之前未在executor内装载,抛出IllegalArgumentException。
*/
public Object execute(final String hash, final String[] keys, final String[] args) {
public Object execute(final String hash, final String[] keys, final String[] args) throws IllegalArgumentException {
return execute(hash, Arrays.asList(keys), Arrays.asList(args));
}
/**
* Execute the script, auto reload the script if it is not in redis.
* 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。
* 如果Script之前未在executor内装载,抛出IllegalArgumentException。
*/
public Object execute(final String hash, final List<String> keys, final List<String> args) {
public Object execute(final String hash, final List<String> keys, final List<String> args)
throws IllegalArgumentException {
if (!hashScriptMap.containsKey(hash)) {
throw new IllegalArgumentException("Script hash " + hash + " is not loaded in executor");
throw new IllegalArgumentException("Script hash " + hash + " is not loaded in executor");
}
try {
......@@ -72,7 +76,7 @@ public class JedisScriptExecutor {
}
/**
* Reload the script and execute it again.
* 重新装载script并执行。
*/
private Object reloadAndExecute(final String hash, final List<String> keys, final List<String> args) {
......
......@@ -9,7 +9,9 @@ import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
/**
* JedisTemplate is the template to execute jedis actions, handle the connection with the pool correctly.
* JedisTemplate 提供了一个template方法,负责对Jedis连接的获取与归还。
* JedisAction<T> 和 JedisActionNoResult两种回调接口,适用于有无返回值两种情况。
* 同时提供一些最常用函数的封装, 如get/set/zadd等。
*/
public class JedisTemplate {
private static Logger logger = LoggerFactory.getLogger(JedisTemplate.class);
......@@ -21,7 +23,7 @@ public class JedisTemplate {
}
/**
* Execute with a call back action with result.
* 执行有返回结果的action。
*/
public <T> T execute(JedisAction<T> jedisAction) throws JedisException {
Jedis jedis = null;
......@@ -39,7 +41,7 @@ public class JedisTemplate {
}
/**
* Execute with a call back action without result.
* 执行无返回结果的action。
*/
public void execute(JedisActionNoResult jedisAction) throws JedisException {
Jedis jedis = null;
......@@ -57,7 +59,7 @@ public class JedisTemplate {
}
/**
* Return jedis connection to the pool, call different return methods depends on the conectionBroken status.
* 根据连接是否已中断的标志,分别调用returnBrokenResource或returnResource。
*/
protected void closeResource(Jedis jedis, boolean connectionBroken) {
if (jedis != null) {
......@@ -70,21 +72,21 @@ public class JedisTemplate {
}
/**
* Get the internal JedisPool.
* 获取内部的pool做进一步的动作。
*/
public JedisPool getJedisPool() {
return jedisPool;
}
/**
* Callback interface for template method.
* 有返回结果的回调接口定义。
*/
public interface JedisAction<T> {
T action(Jedis jedis);
}
/**
* Callback interface for template method without result.
* 无返回结果的回调接口定义。
*/
public interface JedisActionNoResult {
void action(Jedis jedis);
......@@ -92,13 +94,16 @@ public class JedisTemplate {
// ////////////// 常用方法的封装 ///////////////////////// //
// ////////////// Key ///////////////////////// //
public Long del(final String key) {
return execute(new JedisAction<Long>() {
// ////////////// 公共 ///////////////////////////
/**
* 删除key, 如果key存在返回true, 否则返回false。
*/
public boolean del(final String key) {
return execute(new JedisAction<Boolean>() {
@Override
public Long action(Jedis jedis) {
return jedis.del(key);
public Boolean action(Jedis jedis) {
return jedis.del(key) == 1 ? true : false;
}
});
}
......@@ -113,13 +118,13 @@ public class JedisTemplate {
});
}
// ////////////// String ///////////////////////// //
public <T> T get(final String key) {
return execute(new JedisAction<T>() {
// ////////////// 关于String ///////////////////////////
public String get(final String key) {
return execute(new JedisAction<String>() {
@Override
public T action(Jedis jedis) {
return (T) jedis.get(key);
public String action(Jedis jedis) {
return jedis.get(key);
}
});
}
......@@ -134,7 +139,30 @@ public class JedisTemplate {
});
}
public Long incr(final String key) {
public void setex(final String key, final String value, final int seconds) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.setex(key, seconds, value);
}
});
}
/**
* 如果key还不存在则进行设置,返回true,否则返回false.
*/
public boolean setnx(final String key, final String value) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.setnx(key, value) == 1 ? true : false;
}
});
}
public long incr(final String key) {
return execute(new JedisAction<Long>() {
@Override
......@@ -144,7 +172,7 @@ public class JedisTemplate {
});
}
public Long decr(final String key) {
public long decr(final String key) {
return execute(new JedisAction<Long>() {
@Override
......@@ -154,7 +182,7 @@ public class JedisTemplate {
});
}
// ////////////// List ///////////////////////// //
// ////////////// 关于List ///////////////////////////
public void lpush(final String key, final String value) {
execute(new JedisActionNoResult() {
......@@ -165,13 +193,26 @@ public class JedisTemplate {
});
}
// ////////////// Sorted Set ///////////////////////// //
public void zadd(final String key, final double score, final String value) {
// ////////////// 关于Sorted Set ///////////////////////// //
public void zadd(final String key, final double score, final String member) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.zadd(key, score, value);
jedis.zadd(key, score, member);
}
});
}
/**
* 删除sorted set中的元素,成功删除返回true,key或member不存在返回false。
*/
public boolean zrem(final String key, final String member) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.zrem(key, member) == 1 ? true : false;
}
});
}
......
......@@ -5,7 +5,7 @@ public class JedisUtils {
private static final String OK_MULTI_CODE = "+OK";
/**
* Check status is OK or +OK.
* 判断 是 OK 或 +OK.
*/
public static boolean isStatusOk(String status) {
return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status));
......
......@@ -20,7 +20,7 @@ import redis.clients.jedis.JedisPool;
import com.google.common.collect.Lists;
public class JobDispatcher implements Runnable {
public static final String DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua";
public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua";
private static Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
......@@ -33,19 +33,23 @@ public class JobDispatcher implements Runnable {
private List<String> keys;
public JobDispatcher(String jobName, JedisPool jedisPool) {
this(jobName, jedisPool, DEFAULT_DISPATCH_LUA_FILE);
}
public JobDispatcher(String jobName, JedisPool jedisPool, String scriptPath) {
this.scriptExecutor = new JedisScriptExecutor(jedisPool);
keys = Lists.newArrayList(jobName + ".job:sleeping", jobName + ".job:ready");
loadLuaScript();
loadLuaScript(scriptPath);
}
private void loadLuaScript() {
private void loadLuaScript(String scriptPath) {
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(DISPATCH_LUA_FILE);
Resource resource = resourceLoader.getResource(scriptPath);
try {
String script = FileUtils.readFileToString(resource.getFile());
scriptHash = scriptExecutor.load(script);
} catch (IOException e) {
throw new IllegalStateException(DISPATCH_LUA_FILE + "not exist", e);
throw new IllegalStateException(DEFAULT_DISPATCH_LUA_FILE + "not exist", e);
}
}
......
......@@ -9,77 +9,48 @@
*----------------------------------------------------------------------------*/
package org.springside.modules.nosql.redis.scheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.nosql.redis.JedisTemplate;
import org.springside.modules.nosql.redis.JedisTemplate.JedisAction;
import org.springside.modules.nosql.redis.JedisTemplate.JedisActionNoResult;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* This is the Redis implementation of SchedulerManager.
* 任务管理,支持任务的安排与取消。
*/
public class JobManager {
private static final int REDIS_READ_TIMEOUT = 5;
private static Logger logger = LoggerFactory.getLogger(JobManager.class);
private String sleepingJobName;
private String readyJobName;
private JedisTemplate jedisTemplate = null;
private JedisTemplate jedisTemplate;
public JobManager(String jobName, JedisPool jedisPool) {
jedisTemplate = new JedisTemplate(jedisPool);
sleepingJobName = jobName + ".job:sleeping";
readyJobName = jobName + ".job:ready";
}
public void scheduleJob(final String job, long delay, TimeUnit timeUnit) {
/**
* 安排任务.
*/
public void scheduleJob(final String job, final long delay, final TimeUnit timeUnit) {
final long delayTimeInMillisecond = System.currentTimeMillis() + timeUnit.toMillis(delay);
jedisTemplate.zadd(sleepingJobName, delayTimeInMillisecond, job);
}
public boolean cancelJob(final String jobId) {
long removeNumber = jedisTemplate.execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zrem(sleepingJobName, jobId);
}
});
/**
* 取消任务,如果任务不存在或已触发返回false, 否则返回true.
*/
public boolean cancelJob(final String job) {
boolean removed = jedisTemplate.zrem(sleepingJobName, job);
if (removeNumber == 0) {
logger.warn("Can't not cancel job by id {}", jobId);
return false;
if (!removed) {
logger.warn("Can't cancel job by value {}", job);
}
return true;
}
public void startJobListener(final JobListener jobListener) {
jedisTemplate.execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
while (true) {
List<String> nameValuePair = jedis.brpop(REDIS_READ_TIMEOUT, readyJobName);
if (!nameValuePair.isEmpty()) {
String job = nameValuePair.get(1);
jobListener.receiveJob(job);
}
}
}
});
}
public interface JobListener {
void receiveJob(String job);
return removed;
}
}
......@@ -585,6 +585,13 @@
<version>${freemarker.version}</version>
</dependency>
<!-- commons-pool -->
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<!-- email -->
<dependency>
<groupId>javax.mail</groupId>
......
......@@ -23,6 +23,7 @@ public abstract class BenchmarkTask implements Runnable {
try {
for (int i = 1; i <= parent.loopCount; i++) {
execute(i);
printProgressMessage(i);
}
} finally {
tearDown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册