提交 824dcb96 编写于 作者: C Calvin

#171 Redis增加session timer方案, 基于redis的job系统的最终版了吧

上级 12922de6
......@@ -94,6 +94,7 @@ public class RedisMassInsertionBenchmark extends ConcurrentBenchmark {
printProgressMessage(i);
}
}
pl.sync();
} finally {
onThreadFinish();
pool.returnResource(jedis);
......
package org.springside.examples.showcase.demos.redis.sessionTimer;
package org.springside.examples.showcase.demos.redis.job;
import java.util.List;
import java.util.concurrent.ExecutorService;
......@@ -17,7 +17,7 @@ import com.google.common.util.concurrent.RateLimiter;
*
* @author calvin
*/
public class RedisSessionTimerConsumer implements Runnable {
public class JobConsumer implements Runnable {
private static final int THREAD_COUNT = 10;
private static final int PRINT_BETWEEN_SECONDS = 10;
......@@ -38,27 +38,31 @@ public class RedisSessionTimerConsumer implements Runnable {
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
RedisSessionTimerConsumer consumer = new RedisSessionTimerConsumer();
JobConsumer consumer = new JobConsumer();
threadPool.submit(consumer);
}
System.out.println("Hit enter to stop");
System.in.read();
System.out.println("Shuting down");
threadPool.shutdownNow();
boolean shutdownSucess = threadPool.awaitTermination(5, TimeUnit.SECONDS);
tearDown();
if (!shutdownSucess) {
System.out.println("Forcing exiting.");
System.exit(-1);
while (true) {
char c = (char) System.in.read();
if (c == '\n') {
System.out.println("Shuting down");
threadPool.shutdownNow();
boolean shutdownSucess = threadPool.awaitTermination(5, TimeUnit.SECONDS);
tearDown();
if (!shutdownSucess) {
System.out.println("Forcing exiting.");
System.exit(-1);
}
}
}
}
public static void setUp() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxActive(THREAD_COUNT);
pool = new JedisPool(poolConfig, RedisSessionTimerDistributor.HOST, RedisSessionTimerDistributor.PORT,
RedisSessionTimerDistributor.TIMEOUT);
pool = new JedisPool(poolConfig, JobManager.HOST, JobManager.PORT, JobManager.TIMEOUT);
}
public static void tearDown() {
......@@ -72,11 +76,11 @@ public class RedisSessionTimerConsumer implements Runnable {
//Jedis的brpop 不会被中断, 所以下面的判断基本没用, 全靠外围的强行退出.
while (!Thread.currentThread().isInterrupted()) {
//fetch job
List<String> result = jedis.brpop(0, RedisSessionTimerDistributor.JOB_KEY);
List<String> result = jedis.brpop(0, JobManager.JOB_KEY);
String id = result.get(1);
//ack job
jedis.zrem(RedisSessionTimerDistributor.ACK_KEY, id);
jedis.zrem(JobManager.ACK_KEY, id);
int count = counter.incrementAndGet();
int localCount = localCounter.incrementAndGet();
......
package org.springside.examples.showcase.demos.redis.sessionTimer;
package org.springside.examples.showcase.demos.redis.job;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
......@@ -15,38 +16,44 @@ import com.google.common.util.concurrent.RateLimiter;
*
* @author calvin
*/
public class RedisSessionTimerDistributor implements Runnable {
public class JobManager implements Runnable {
public static final String TIMER_KEY = "ss.timer";
public static final String JOB_KEY = "ss.job";
public static final String ACK_KEY = "ss.ack";
public static final String TIMER_KEY = "ss.job:schedule";
public static final String JOB_KEY = "ss.job:queue";
public static final String ACK_KEY = "ss.job:ack";
public static final int EXPECT_TPS = 2500;
public static final int DELAY_SECONDS = 10;
public static final String HOST = "localhost";
public static final int PORT = Protocol.DEFAULT_PORT;
public static final int TIMEOUT = 5000;
private static final int PRINT_BETWEEN_SECONDS = 20;
private static int BATCH_SIZE = 2500;
private Jedis jedis;
private String scriptSha;
private int loop = 1;
private long totalTime;
private AtomicLong totalTime = new AtomicLong(0);
private RateLimiter printRate = RateLimiter.create(1d / PRINT_BETWEEN_SECONDS);
public static void main(String[] args) throws Exception {
RedisSessionTimerDistributor distributor = new RedisSessionTimerDistributor();
JobManager distributor = new JobManager();
distributor.setUp();
try {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(distributor, 5, 1, TimeUnit.SECONDS);
threadPool.scheduleAtFixedRate(distributor, 10, 1, TimeUnit.SECONDS);
System.out.println("Hit enter to stop.");
System.in.read();
System.out.println("Shuting down");
threadPool.shutdownNow();
threadPool.awaitTermination(3, TimeUnit.SECONDS);
while (true) {
char c = (char) System.in.read();
if (c == '\n') {
System.out.println("Shuting down");
threadPool.shutdownNow();
threadPool.awaitTermination(3, TimeUnit.SECONDS);
}
}
} finally {
distributor.tearDown();
}
......@@ -54,16 +61,16 @@ public class RedisSessionTimerDistributor implements Runnable {
public void setUp() {
jedis = new Jedis(HOST, PORT, TIMEOUT);
String script = "local jobWithScores=redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'withscores')\n";
String script = "local jobWithScores=redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'withscores')\n";
script += " redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1])\n";
script += " for i=1,ARGV[2] do \n";
script += " redis.call('lpush', KEYS[2], jobWithScores[i*2-1])\n";
script += " end\n";
script += " for i=1,ARGV[2] do \n";
script += " redis.call('zadd', KEYS[3], jobWithScores[i*2], jobWithScores[i*2-1])\n";
script += " end\n";
script += " redis.call('zremrangebyscore', KEYS[1], 0, ARGV[1])";
System.out.println(script);
System.out.println("Lua Scripts is:\n" + script);
scriptSha = jedis.scriptLoad(script);
}
......@@ -74,13 +81,16 @@ public class RedisSessionTimerDistributor implements Runnable {
@Override
public void run() {
long startTime = System.currentTimeMillis();
jedis.evalsha(scriptSha, Lists.newArrayList(TIMER_KEY, JOB_KEY, ACK_KEY),
Lists.newArrayList(String.valueOf(loop * BATCH_SIZE - 1), String.valueOf(BATCH_SIZE)));
loop++;
Lists.newArrayList(String.valueOf(startTime), String.valueOf(EXPECT_TPS)));
long spendTime = System.currentTimeMillis() - startTime;
totalTime += spendTime;
totalTime.addAndGet(spendTime);
loop++;
if (printRate.tryAcquire()) {
System.out.printf("Average time %d ms \n", totalTime / loop);
System.out.printf("Last time %,d ms, average time %,d ms \n", spendTime, totalTime.longValue() / loop);
}
}
}
package org.springside.examples.showcase.demos.redis.sessionTimer;
package org.springside.examples.showcase.demos.redis.job;
import java.util.concurrent.atomic.AtomicLong;
import org.springside.modules.test.benchmark.BenchmarkTask;
import org.springside.modules.test.benchmark.ConcurrentBenchmark;
......@@ -8,23 +10,26 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* 将Timer放入ss.timer(Sorted set).
* 将Job放入ss.job:schedule(sorted set).
*
* @author calvin
*/
public class RedisSessionTimerProducer extends ConcurrentBenchmark {
private static final int THREAD_COUNT = 10;
private static final long LOOP_COUNT = 2500 * 60;
public class JobProducer extends ConcurrentBenchmark {
private static final int THREAD_COUNT = 5;
private static final long LOOP_COUNT = 5000 * 60;
private static final int PRINT_BETWEEN_SECONDS = 10;
private static AtomicLong idGenerator = new AtomicLong(0);
private static long expireTime;
private JedisPool pool;
public static void main(String[] args) throws Exception {
RedisSessionTimerProducer benchmark = new RedisSessionTimerProducer(THREAD_COUNT, LOOP_COUNT);
JobProducer benchmark = new JobProducer(THREAD_COUNT, LOOP_COUNT);
benchmark.run();
}
public RedisSessionTimerProducer(int threadCount, long loopCount) {
public JobProducer(int threadCount, long loopCount) {
super(threadCount, loopCount);
}
......@@ -33,8 +38,8 @@ public class RedisSessionTimerProducer extends ConcurrentBenchmark {
//create jedis pool
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxActive(THREAD_COUNT);
pool = new JedisPool(poolConfig, RedisSessionTimerDistributor.HOST, RedisSessionTimerDistributor.PORT,
RedisSessionTimerDistributor.TIMEOUT);
pool = new JedisPool(poolConfig, JobManager.HOST, JobManager.PORT, JobManager.TIMEOUT);
expireTime = System.currentTimeMillis() + JobManager.DELAY_SECONDS * 1000;
}
@Override
......@@ -44,12 +49,12 @@ public class RedisSessionTimerProducer extends ConcurrentBenchmark {
@Override
protected BenchmarkTask createTask(int index) {
return new SessionTimerProducerTask(index, this, PRINT_BETWEEN_SECONDS);
return new JobProducerTask(index, this, PRINT_BETWEEN_SECONDS);
}
public class SessionTimerProducerTask extends BenchmarkTask {
public class JobProducerTask extends BenchmarkTask {
public SessionTimerProducerTask(int index, ConcurrentBenchmark parent, int printBetweenSeconds) {
public JobProducerTask(int index, ConcurrentBenchmark parent, int printBetweenSeconds) {
super(index, parent, printBetweenSeconds);
}
......@@ -59,10 +64,14 @@ public class RedisSessionTimerProducer extends ConcurrentBenchmark {
onThreadStart();
try {
for (int i = 0; i < loopCount; i++) {
long scheduleTime = taskSequence * loopCount + i;
jedis.zadd(RedisSessionTimerDistributor.TIMER_KEY, scheduleTime, String.valueOf(scheduleTime));
long jobId = idGenerator.getAndIncrement();
jedis.zadd(JobManager.TIMER_KEY, expireTime, String.valueOf(jobId));
//达到TPS上限后,expireTime往后滚动一秒
if (jobId % JobManager.EXPECT_TPS == 0) {
expireTime += 1000;
}
printProgressMessage(i);
}
} finally {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册