Skip to content

  • 体验新版
    • 正在加载...
  • 登录
  • KnowledgePlanet
  • docdoc
  • Issue
  • #6

doc
doc
  • 项目概览

KnowledgePlanet / doc

通知 1303
Star 822
Fork 117
  • 代码
    • 文件
    • 提交
    • 分支
    • Tags
    • 贡献者
    • 分支图
    • Diff
  • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
  • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • Wiki 2
    • Wiki
  • 分析
    • 仓库
    • DevOps
  • 项目成员
  • Pages
doc
doc
  • 项目概览
    • 项目概览
    • 详情
    • 发布
  • 仓库
    • 仓库
    • 文件
    • 提交
    • 分支
    • 标签
    • 贡献者
    • 分支图
    • 比较
  • Issue 42
    • Issue 42
    • 列表
    • 看板
    • 标记
    • 里程碑
  • 合并请求 0
    • 合并请求 0
  • Pages
  • DevOps
    • DevOps
    • 流水线
    • 流水线任务
    • 计划
  • 分析
    • 分析
    • 仓库分析
    • DevOps
  • Wiki 2
    • Wiki
  • 成员
    • 成员
  • 收起侧边栏
  • 动态
  • 分支图
  • 创建新Issue
  • 流水线任务
  • 提交
  • Issue看板
You need to sign in or sign up before continuing.
已关闭
开放中
Opened 3月 25, 2023 by 小傅哥@Yao__Shun__Yu⛹Owner

RocketMQ 支持使用开源的分布式事务解决方案 Apache RocketMQ Transactional Messages 来实现分布式事务。

下面是一个使用 RocketMQ 实现分布式事务的示例:

  1. 首先需要创建一个 TransactionProducer 实例来发送事务消息。
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
  1. 然后在实现 TransactionListener 接口的回调函数中进行事务处理。在这个示例中,我们使用本地文件来模拟事务操作,并在 prepare 方法中执行本地事务,并在 commit 方法中提交本地事务,rollback 方法中回滚本地事务。
class TransactionListenerImpl implements TransactionListener {
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String transactionId = msg.getTransactionId();
        localTrans.put(transactionId, 1);
        // 执行本地事务操作
        // 如果本地事务操作执行成功,则返回 COMMIT_MESSAGE,否则返回 ROLLBACK_MESSAGE
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getTransactionId();
        Integer status = localTrans.get(transactionId);
        if (status != null) {
            switch (status) {
                case 1:
                    return LocalTransactionState.UNKNOW;
                case 2:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 3:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  1. 使用 TransactionSendResult 实例来发送事务消息。
Message message = new Message("transaction_topic", "transaction_tag", "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
  1. 最后,在 TransactionListener 回调函数中根据本地事务执行结果来确定消息提交或回滚。
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    String transactionId = msg.getTransactionId();
    localTrans.put(transactionId, 1);
    // 执行本地事务操作
    boolean result = doLocalTransaction(msg);
    if (result) {
        localTrans.put(transactionId, 2);
        return LocalTransactionState.COMMIT_MESSAGE;
    } else {
        localTrans.put(transactionId, 3);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

以上就是一个简单的使用 RocketMQ 实现分布式事务的示例。在实际使用中,还需要考虑消息重发、幂等性等问题。

指派人
分配到
无
里程碑
无
分配里程碑
工时统计
无
截止日期
无
标识: KnowledgePlanet/doc#6
渝ICP备2023009037号

京公网安备11010502055752号

网络110报警服务 Powered by GitLab CE v13.7
开源知识
Git 入门 Pro Git 电子书 在线学 Git
Markdown 基础入门 IT 技术知识开源图谱
帮助
使用手册 反馈建议 博客
《GitCode 隐私声明》 《GitCode 服务条款》 关于GitCode
Powered by GitLab CE v13.7