Skip to content

  • 体验新版
    • 正在加载...
  • 登录
  • KnowledgePlanet
  • docdoc
  • Issue
  • #27

doc
doc
  • 项目概览

KnowledgePlanet / doc

通知 1303
Star 822
Fork 117
  • 代码
    • 文件
    • 提交
    • 分支
    • Tags
    • 贡献者
    • 分支图
    • Diff
  • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
  • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • Wiki 2
    • Wiki
  • 分析
    • 仓库
    • DevOps
  • 项目成员
  • Pages
doc
doc
  • 项目概览
    • 项目概览
    • 详情
    • 发布
  • 仓库
    • 仓库
    • 文件
    • 提交
    • 分支
    • 标签
    • 贡献者
    • 分支图
    • 比较
  • Issue 42
    • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
    • 合并请求 0
  • Pages
  • DevOps
    • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • 分析
    • 分析
    • 仓库分析
    • DevOps
  • Wiki 2
    • Wiki
  • 成员
    • 成员
  • 收起侧边栏
  • 动态
  • 分支图
  • 创建新Issue
  • 流水线任务
  • 提交
  • Issue看板
已关闭
开放中
Opened 8月 18, 2023 by 小傅哥@Yao__Shun__Yu⛹Owner

Redis 延迟队列

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Tuple;

import java.util.Set;

public class DelayedConsumer {

    private static final String DELAYED_QUEUE_KEY = "delayed_queue";
    private static final String READY_QUEUE_KEY = "ready_queue";

    public static void main(String[] args) {
        // 创建Redis连接
        Jedis jedis = new Jedis("localhost");

        // 创建延迟消息生产者
        DelayedProducer producer = new DelayedProducer(jedis);

        // 创建延迟消息消费者
        DelayedConsumer consumer = new DelayedConsumer(jedis);

        // 启动延迟消息消费者
        consumer.start();

        // 发送延迟消息
        producer.sendDelayedMessage("message1", 5000); // 5秒后消费
        producer.sendDelayedMessage("message2", 10000); // 10秒后消费

        // 关闭Redis连接
        jedis.close();
    }

    private Jedis jedis;

    public DelayedConsumer(Jedis jedis) {
        this.jedis = jedis;
    }

    public void start() {
        new Thread(() -> {
            while (true) {
                // 从延迟队列中获取到期的消息
                Set<Tuple> messages = jedis.zrangeWithScores(DELAYED_QUEUE_KEY, 0, 0);
                if (messages.isEmpty()) {
                    continue;
                }

                // 获取消息的到期时间
                long score = messages.iterator().next().getScore();
                long currentTime = System.currentTimeMillis();

                // 如果消息还未到期,则等待一段时间后重新检查
                if (score > currentTime) {
                    try {
                        Thread.sleep(score - currentTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }

                // 将消息从延迟队列移动到就绪队列
                String message = messages.iterator().next().getElement();
                jedis.zrem(DELAYED_QUEUE_KEY, message);
                jedis.rpush(READY_QUEUE_KEY, message);

                System.out.println("消费消息:" + message);
            }
        }).start();
    }
}

class DelayedProducer {

    private Jedis jedis;

    public DelayedProducer(Jedis jedis) {
        this.jedis = jedis;
    }

    public void sendDelayedMessage(String message, long delay) {
        // 计算消息的到期时间
        long currentTime = System.currentTimeMillis();
        long expireTime = currentTime + delay;

        // 将消息添加到延迟队列
        jedis.zadd(DELAYED_QUEUE_KEY, expireTime, message);
    }
}
指派人
分配到
无
里程碑
无
分配里程碑
工时统计
无
截止日期
无
标识: KnowledgePlanet/doc#27
渝ICP备2023009037号

京公网安备11010502055752号

网络110报警服务 Powered by GitLab CE v13.7
开源知识
Git 入门 Pro Git 电子书 在线学 Git
Markdown 基础入门 IT 技术知识开源图谱
帮助
使用手册 反馈建议 博客
《GitCode 隐私声明》 《GitCode 服务条款》 关于GitCode
Powered by GitLab CE v13.7