import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Tuple;
import java.util.Set;
/**
 * Moves due messages from a Redis sorted set (the delayed queue, scored by
 * due time in epoch milliseconds) to a Redis list (the ready queue).
 *
 * <p>A single {@link Jedis} connection must not be used from several threads
 * at once, so the connection handed to the constructor should be dedicated to
 * this consumer's worker thread.
 */
public class DelayedConsumer {
    private static final String DELAYED_QUEUE_KEY = "delayed_queue";
    private static final String READY_QUEUE_KEY = "ready_queue";

    /**
     * Upper bound, in milliseconds, for a single sleep of the polling loop.
     * Capping the sleep avoids a busy loop while the queue is empty and lets
     * the loop notice a newly added message that is due earlier than the
     * current head.
     */
    private static final long MAX_IDLE_SLEEP_MILLIS = 500;

    /**
     * Demo entry point: starts a consumer and enqueues two delayed messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The consumer gets its own connection: it is used from the worker
        // thread for as long as that thread runs, so it must neither be shared
        // with the producer nor closed here.
        Jedis consumerConnection = new Jedis("localhost");
        DelayedConsumer consumer = new DelayedConsumer(consumerConnection);
        consumer.start();

        // The producer connection is only needed while sending.
        try (Jedis producerConnection = new Jedis("localhost")) {
            DelayedProducer producer = new DelayedProducer(producerConnection);
            producer.sendDelayedMessage("message1", 5000);  // due in 5 seconds
            producer.sendDelayedMessage("message2", 10000); // due in 10 seconds
        }
    }

    private final Jedis jedis;

    /**
     * @param jedis connection used by the polling thread
     */
    public DelayedConsumer(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * Starts a background thread that polls the delayed queue until the
     * thread is interrupted.
     */
    public void start() {
        new Thread(this::pollLoop).start();
    }

    /** Repeatedly inspects the head of the delayed queue and promotes it once it is due. */
    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Fetch the entry with the smallest score, i.e. the earliest due time.
                Set<Tuple> messages = jedis.zrangeWithScores(DELAYED_QUEUE_KEY, 0, 0);
                if (messages.isEmpty()) {
                    Thread.sleep(MAX_IDLE_SLEEP_MILLIS);
                    continue;
                }

                Tuple head = messages.iterator().next();
                // Scores are doubles in Redis; the producer stores epoch millis.
                long dueTime = (long) head.getScore();
                long currentTime = System.currentTimeMillis();
                if (dueTime > currentTime) {
                    // Not due yet: wait, but never longer than the cap.
                    Thread.sleep(Math.min(dueTime - currentTime, MAX_IDLE_SLEEP_MILLIS));
                    continue;
                }

                // Move the message from the delayed queue to the ready queue.
                // Only the caller whose ZREM actually removed the member pushes
                // it, so concurrent consumers do not deliver it twice.
                // NOTE(review): ZREM + RPUSH is still not atomic; a crash in
                // between loses the message. Consider MULTI/EXEC or a Lua script.
                String message = head.getElement();
                if (jedis.zrem(DELAYED_QUEUE_KEY, message) > 0) {
                    jedis.rpush(READY_QUEUE_KEY, message);
                    System.out.println("消费消息:" + message);
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop polling instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Enqueues messages into the Redis sorted set used as the delayed queue.
 */
class DelayedProducer {
    // Declared here because DelayedConsumer's constant of the same name is
    // private to that class; the value must match the key the consumer polls.
    private static final String DELAYED_QUEUE_KEY = "delayed_queue";

    private final Jedis jedis;

    /**
     * @param jedis connection used to write to the delayed queue
     */
    public DelayedProducer(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * Adds a message to the delayed queue, scored by the time it becomes due.
     *
     * @param message payload stored as the sorted-set member
     * @param delay   delay in milliseconds from now until the message is due
     */
    public void sendDelayedMessage(String message, long delay) {
        // Due time = current wall-clock time in epoch millis plus the delay.
        long expireTime = System.currentTimeMillis() + delay;
        // The sorted-set score is the due time, so the earliest message sorts first.
        jedis.zadd(DELAYED_QUEUE_KEY, expireTime, message);
    }
}