从0开始学习 RocketMQ:分布式事务消息的实现

news/2024/9/18 23:28:59 标签: rocketmq

消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。

应用场景

比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。

由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤,所以使用消息队列来异步清理购物车是更合理的设计。

在这里插入图片描述

对于订单系统来说,它做了两件事情

  1. 在订单库中插入了一条订单数据,创建了订单;
  2. 给 MQ 发送了一条订单消息。

对于购物车系统来说,它做了一件事情

  1. 接收订单消息,删除购物车库中的商品,清理购物车。

在分布式系统中,上面的这几个步骤,都有可能失败,如果失败了不做处理的话,就会造成订单数据和购物车数据不一致的情况。

比如:

  1. 创建了订单,没有清理购物车;
  2. 购物车中的商品清掉了,订单没有创建成功。

所以,我们需要做的就是,要保证在任何步骤失败的情况下,订单数据和购物车数据的一致性。

对于购物车系统,失败的处理比较简单,只有成功删除商品后再提交消费确认,如果发生失败,因为没有提交消费确认,消息队列会重试。

所以,问题的重点在于,怎么保证订单系统创建订单和发送消息的步骤,要么都成功,要么都失败,不能一个成功一个失败。

分布式事务

消息队列是如何实现分布式事务的?就要用到事务消息了。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

在这里插入图片描述半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

在上面的步骤中,如果第 4 步提交事务消息失败了(比如网络异常),怎么办?

对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。

  • Kafka :简单粗暴,直接抛出异常,让用户自行处理。可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿;
  • RocketMQ:事务反查机制。

RocketMQ方案

在 RocketMQ 的分布式事务实现中,增加了事务反查机制来解决事务消息提交失败的问题。

如果订单系统在第 4 步提交或回滚事务消息失败(如网络异常),Broker 迟迟没有收到提交或回滚的消息,Broker 会定期去订单系统上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

所以,订单系统需要提供一个反查本地事务状态的接口,即根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

在这里插入图片描述

使用限制

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

使用建议

避免大量未决事务导致超时

RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
  • 程序能正确识别正在进行中的事务。

使用示例

创建事务主题

sh bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION

生产者代码

模拟正常流程,本地事务成功提交

public class ProducerTransactionExample {


    public static void main(String[] args) throws Exception {
        String endpoint = "182.92.198.60:8080";
        String topic = "TransactionTopic";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        builder.setRequestTimeout(Duration.ofSeconds(20));
        ClientConfiguration configuration = builder.build();

        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .setTransactionChecker(messageView -> {
                    System.out.println("5.broker回查事务状态");

                    String orderId = messageView.getProperties().get("orderId");

                    if (Strings.isNullOrEmpty(orderId)) {
                        return TransactionResolution.ROLLBACK;
                    }

                    if (checkOrderById(orderId)) {
                        System.out.println("7.本地事务状态成功,提交消息");
                        return TransactionResolution.COMMIT;
                    } else {
                        System.out.println("7.本地事务状态失败,回滚消息");
                        return TransactionResolution.ROLLBACK;
                    }

                })
                .build();


        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
            System.out.println("1.开启事务");
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            System.out.println("1.事务开启失败");
            return;
        }


        // 普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("transaction")
                .addProperty("orderId", "o10086")
                // 消息体。
                .setBody(("测试事务消息,订单号o10086").getBytes())
                .build();


        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
            System.out.println("2.半消息发送成功,messageId:" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            System.out.println("2.半消息发送失败");
            return;
        }

        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
                System.out.println("4.commit事务消息");
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
                System.out.println("4.commit事务消息失败");
            }
        } else {
            try {
                transaction.rollback();
                System.out.println("4.rollback事务消息");
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
                System.out.println("4.rollback事务消息失败");
            }
        }
    }

    /**
     * 模拟本地事务的执行结果
     *
     * @return
     */
    private static boolean doLocalTransaction() {
        System.out.println("3.执行本地事务,处理中");
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("3.执行本地事务成功,提交事务");
        return true;
    }

    /**
     * 模拟本地事务反查
     *
     * @param orderId
     * @return
     */
    private static boolean checkOrderById(String orderId) {
        System.out.println("6.反查本地事务状态,订单号:" + orderId + "能查到");
        return true;
    }
}

在这里插入图片描述
消费端在第4步后可以消费到消息。

在这里插入图片描述

模拟异常流程,将第4步提交/回滚的代码注释掉
在这里插入图片描述
消费端在第7步后可以消费到消息。

在这里插入图片描述

设置第一次事务回查时间
CHECK_IMMUNITY_TIME_IN_SECONDS 属性定义了从事务消息发送到 Broker 后,Broker 在多长时间内不会对这条消息发起回查。这个时间窗口为生产者提供了一个缓冲期,以确保即使在网络延迟或短暂的服务中断情况下,事务消息也不会被过早地回查。

Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("transaction")
                .addProperty("orderId", "o10086")
                .addProperty("CHECK_IMMUNITY_TIME_IN_SECONDS", "300")
                // 消息体。
                .setBody(("测试事务消息,订单号o10086").getBytes())
                .build();

在这里插入图片描述
消费端消费

在这里插入图片描述


http://www.niftyadmin.cn/n/5664697.html

相关文章

C++: 高效使用智能指针的8个建议

前言:智能指针是C11的新特性,它基于RAII实现,可以自动管理内存资源,避免内存泄漏的发生,但是智能指针也并不是万能的,如果不正确使用智能指针,也会导致内存泄漏的发生,因此&#xff…

UDP聊天室项目

代码思路 服务器 #include <stdio.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h> #include <stdlib.h> #include <unistd.h> #include <arpa/inet.h>…

实时(按帧)处理的低通滤波C语言实现

写在前面&#xff1a; 低通滤波采用一般的FIR滤波器&#xff0c;因为本次任务&#xff0c;允许的延迟较多&#xff0c;或者说前面损失的信号可以较多&#xff0c;因此&#xff0c;涉及一个很高阶的FIR滤波器&#xff0c;信号起始段的信号点可以不处理&#xff0c;以及&#xf…

p14 使用阿里云服务器的docker部署NGINX

拉取NGINX的镜像 这里因为之前已经配置过从阿里云的镜像仓库里面拿镜像所以这里直接就执行docker pull nginx拉取NGINX镜像就OK了 运行NGINX镜像 这里执行docker run -d --name nginx01 -p 3344:80 nginx这里3344是服务器访问的端口80是容器内部的端口&#xff0c;可以看到…

C语言操作数据库

目录 一、引言 二、环境准备 三、C语言操作数据库步骤 1.数据库连接 2.数据库查询 3.数据库插入、更新和删除 四、总结 本文将详细介绍如何在C语言中操作数据库&#xff0c;包括数据库的连接、查询、插入、更新和删除等操作。通过本文的学习&#xff0c;读者可以掌握C语言操…

ruby和python哪个好学

Ruby和python都挺好学的。建议学习Python&#xff0c;语法的话&#xff0c;Python相对更简洁。而且Python应用场合更广泛&#xff0c;运维、网站开发、数据处理、科学研究都可以。 Ruby和Python十分相似&#xff0c;有很多共同点&#xff0c;但也有一些不同之外&#xff0c;以…

二叉树OJ题——对称二叉树

文章目录 一、题目链接二、解题思路三、解题代码 一、题目链接 对称二叉树 二、解题思路 三、解题代码

代码随想录算法训练营第三十六天 | 1049. 最后一块石头的重量 II,494. 目标和,474.一和零

第三十六天打卡&#xff0c;今天的题还是比较抽象&#xff0c;特别是后面两题&#xff0c;他们是01背包问题&#xff0c;只是递推公式变了&#xff0c;这点不容易想到 1049.最后一块石头的重量Ⅱ 题目链接 解题过程 dp[j]表示容量&#xff08;这里说容量更形象&#xff0c;其…