怎么正确使用RabbitMQ异步编程

技术怎么正确使用RabbitMQ异步编程这篇文章主要介绍“怎么正确使用RabbitMQ异步编程”,在日常操作中,相信很多人在怎么正确使用RabbitMQ异步编程问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法

本文主要介绍“如何正确使用RabbitMQ异步编程”。在日常操作中,相信很多人对于如何正确使用RabbitMQ异步编程有疑问。边肖查阅了各种资料,整理出简单易用的操作方法,希望能帮助大家解决“如何正确使用RabbitMQ异步编程”的疑惑!接下来,请和边肖一起学习!

1 适用场景

1.1 服务于主流程的分支流程

在注册过程中,将数据写入DB是主要流程,但注册后向用户发送优惠券或欢迎消息是分支流程,时效性不强。

1.2 用户无需实时看到结果

比如,取单后的配送和发货过程可以异步处理,每一阶段处理完成后,可以通过发送推送或短信的方式通知用户。

1.3 MQ

任务缓冲区分配、流量削峰、服务解耦和消息广播。

当然,异步处理不仅是通过MQ实现的,还有其他方式。

例如,启动一个新线程来执行并返回到未来。

还有各种异步框架,比如Vertx,都是通过回调实现的。

2 异步处理之坑

2.1 异步处理需做消息补偿以闭环

虽然RabbitMQ可以将消息放入磁盘,但即使MQ异常消息数据不会丢失,在消息发送、传输和处理的异步过程中也可能发生消息丢失。不能保证所有的MQ都是100%可用的,业务设计应该考虑异步流程在不可用时将如何继续。

因此,对于异步处理流,需要考虑补偿或建立主备主动流。

2.1.1 案例

用户注册后异步发送欢迎消息。

用户注册数据库是一个同步的过程。

成员在收到消息后发送欢迎消息是一个异步过程。

怎么正确使用RabbitMQ异步编程

蓝线

异步处理(主线),消息可能丢失(虚线代表异步调用)。

绿线

补偿作业定期消息补偿(备用线路),用于补偿主线丢失的消息。

考虑极端的MQ中间件故障场景。

要求备用线的处理吞吐能力达到主线性能。

代码示例

用户控制器注册发送异步消息。该注册方法一次注册10个用户,用户注册消息无法发出的概率为50%。

怎么正确使用RabbitMQ异步编程

MemberService成员服务监视用户注册成功的消息,并发送欢迎消息。ConcurrentHashMap用于存储已发送短信的用户的ID,以实现幂等性,从而避免同一用户补偿时重复发送短信。

怎么正确使用RabbitMQ异步编程

对于MQ消费程序,处理逻辑必须考虑重复数据消除(支持幂等性):

由于配置错误和中间件本身的稳定性,MQ消息可能会重复。

自动补偿重复。

例如,在这个例子中,同一个消息可能同时经过MQ和补偿,这肯定会被重复。考虑到高凝聚力,薪酬工作本身不会重复。

手动补偿重复。

当消息累积发生时,异步处理流程将不可避免地被延迟。如果提供补偿功能,当处理遇到延迟时,很可能会先手动进行补偿,一段时间后,处理程序会再次收到消息,重复处理。

有几次MQ失败,数十万条发放资金的消息堆积在MQ中,导致业务无法及时处理。如果操作认为程序错误,会先在后台手动处理。结果,消息在MQ系统恢复后被再次处理,导致大量资金被重复发放。

异步处理必须考虑消息重复的可能性,因此处理逻辑必须实现幂等性以防止重复处理。

接着定义补偿Job即备线操作。

计划任务,每5秒补偿一次,因为作业不知道注册了哪些用户。

消息可能丢失,所以是全量补偿。

  • 补偿逻辑

         每5秒补偿一次,按顺序一次补偿5个用户,下一次补偿操作从上一次补偿的最后一个用户ID开始

         补偿任务提交到线程池以“异步”处理,提高处理能力

怎么正确使用RabbitMQ异步编程

为实现高内聚,主线和备线处理消息,最好使用同一方法。本案例的MemberService监听到MQ消息和CompensationJob补偿,调用的都是welcome。

这里的补偿逻辑简单仅为 demo,实际生产代码须:

  • 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适值,以满足补偿的吞吐量

  • 考虑备线补偿数据进行适当延迟

  • 比如,对注册时间在30s前的用户再进行补偿,以方便和主线MQ实时流程错开,避免冲突

  • 诸如当前补偿到哪个用户的offset数据,需要落地DB

  • 补偿Job本身须高可用,可使用类似xxl-job或ElasticJob等任务系统。

运行程序,执行注册方法注册10个用户,查看日志

怎么正确使用RabbitMQ异步编程

可见

  • 共10个用户,MQ发送成功的用户有四个:1、5、7、8

  • 补偿任务第一次运行,补偿了用户2、3、4,第二次运行补偿了用户6、9,第三次运行补充了用户10

消息补偿闭环的最高标准

能够达到补偿全量数据的吞吐量。即若补偿备线足够完善,即使直接停机MQ,虽会稍微影响处理及时性,但至少确保流程都能正常执行。

小结

实际开发要考虑异步流程丢消息或处理中断场景。

异步流程需有备线以补偿,比如这里的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。

2.2 RabbitMQ广播、工作队列模式坑

消息模式是广播 Or 工作队列

  • 消息广播

        同一消息,不同消费者都能分别消费

  • 队列模式

        不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。

比如同一用户的注册消息

  • 会员服务需监听以发送欢迎短信

  • 营销服务需监听以发送新用户小礼物

但会员、营销服务都可能有多实例,业务需求同一用户的消息,可同时广播给不同的服务(广播模式),但对同一服务的不同实例(比如会员服务1和会员服务2),不管哪个实例来处理,处理一次即可(工作队列模式):

怎么正确使用RabbitMQ异步编程

实现代码时务必确认MQ系统的机制,确保消息的路由按期望。

RocketMQ实现类似功能比较简单直白:若消费者属于一个组,那么消息只会由同组的一个消费者消费;若消费者属不同组,每个组都能消费一遍消息。

而RabbitMQ的消息路由模式采用队列+交换器,队列是消息载体,交换器决定消息路由到队列的方式。

step1:会员服务-监听用户服务发出的新用户注册消息

若启动俩会员服务,那么同一用户的注册消息应只能被其中一个实例消费。

分别实现RabbitMQ队列、交换器、绑定三件套。

  • 队列使用匿名队列

  • 交换器使用DirectExchange,交换器绑定到匿名队列的路由Key是空字符串

收到消息之后,打印所在实例使用的端口。

  • 消息发布者、消费者、以及MQ的配置

怎么正确使用RabbitMQ异步编程

使用12345和45678两个端口启动俩程序实例后,发条消息,输出的日志,显示同一会员服务两个实例都收到了消息:

怎么正确使用RabbitMQ异步编程

怎么正确使用RabbitMQ异步编程

所以问题在于不明

RabbitMQ直接交换器和队列的绑定关系

RabbitMQ的直接交换器根据routingKey路由消息。而程序每次启动都会创建匿名(随机命名)队列,所以每个会员服务实例都对应独立的队列,以空routingKey绑定到直接交换器。

用户服务发消息时也设置了空routingKey,所以直接交换器收到消息后,发现匹配俩队列,于是都转发消息

怎么正确使用RabbitMQ异步编程

修复

对会员服务不要使用匿名队列,而使用同一队列。

将上面代码中的匿名队列换做普通队列:

private static final String QUEUE = "newuserQueue";@Beanpublic Queue queue()  { return new Queue(QUEUE);}

这样对同一消息,俩实例中只有一个实例可收到,不同消息被轮询发给不同实例。

现在的交换器和队列关系

怎么正确使用RabbitMQ异步编程

step2:用户服务-广播消息给会员、营销服务

期望会员、营销服务都能收到广播消息,但会员/营销服务中的每个实例只需收到一次消息。

声明一个队列和一个FanoutExchange,然后模拟俩用户服务和俩营销服务:

怎么正确使用RabbitMQ异步编程

注册四个用户。日志发现一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,这不是广播。可使用的明明是FanoutExchange,为什么没起效呢?

怎么正确使用RabbitMQ异步编程

因为广播交换器会忽略routingKey,广播消息到所有绑定的队列。该案例的俩会员服务和两个营销服务都绑定了同一队列,所以四服务只能收到一次消息:

怎么正确使用RabbitMQ异步编程

修复

拆分队列,会员和营销两组服务分别使用一条独立队列绑定到广播交换器

怎么正确使用RabbitMQ异步编程

现在的交换器和队列结构

怎么正确使用RabbitMQ异步编程

从日志输出可以验证,对每条MQ消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务同时,在每一个服务的两个实例中通过轮询接收:

怎么正确使用RabbitMQ异步编程 

异步的消息路由模式一旦配置出错,轻则可能导致消息重复处理,重则可能导致重要的服务无法接收到消息,最终造成业务逻辑错误。

小结

微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认MQ的消息路由配置是否满足需求,以避免消息重复或漏发问题。

2.3 死信堵塞MQ之坑

始终无法处理的死信消息,可能会引发堵塞MQ。

若线程池的任务队列无上限,最终可能导致OOM,类似的MQ也要注意任务堆积问题。对于突发流量引起的MQ堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。

2.3.1 案例

用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在MQ中回荡的同一条消息,就是死信。

随着MQ被越来越多的死信填满,消费者需花费大量时间反复处理死信,导致正常消息的消费受阻,最终MQ可能因数据量过大而崩溃。

定义一个队列、一个直接交换器,然后把队列绑定到交换器

怎么正确使用RabbitMQ异步编程

sendMessage发送消息到MQ,访问一次提交一条消息,使用自增标识作为消息内容

怎么正确使用RabbitMQ异步编程

收到消息后,直接NPE,模拟处理出错

怎么正确使用RabbitMQ异步编程

调用sendMessage接口发送两条消息,然后来到RabbitMQ管理台,可以看到这两条消息始终在队列,不断被重新投递,导致重新投递QPS达到1063。

怎么正确使用RabbitMQ异步编程

在日志中也可看到大量异常信息。

修复方案

  • 解决死信无限重复进入队列最简单方案

        程序处理出错时,直接抛AmqpRejectAndDontRequeueException,避免消息重新进入队列

throw new AmqpRejectAndDontRequeueException("error");

但更希望对同一消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,若依旧失败,再把消息投递到专门设置的DLX。对于来自DLX的数据,可能只是记录日志发送报警,即使出现异常也不会再重复投递。

逻辑如下

怎么正确使用RabbitMQ异步编程

针对该问题,我们来看

Spring AMQP的简便解决方案

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. 定义死信交换器、死信队列。其实都是普通交换器和队列,只不过专门用于处理死信消息

  3. 通过RetryInterceptorBuilder构建一个RetryOperationsInterceptor以处理失败时候的重试。策略是最多尝试5次(重试4次);并且采取指数退避重试,首次重试延迟1秒,第二次2秒,以此类推,最大延迟是10秒;如果第4次重试还是失败,则使用RepublishMessageRecoverer把消息重新投入一个DLX

  4. 定义死信队列的处理程序。本案例只记录日志

代码

怎么正确使用RabbitMQ异步编程

执行程序,发送两条消息,查看日志:

怎么正确使用RabbitMQ异步编程

  • msg2的4次重试间隔分别是1秒、2秒、4秒、8秒,再加上首次的失败,所以最大尝试次数是5

  • 4次重试后,RepublishMessageRecoverer把消息发往DLX

  • 死信处理程序输出了got dead message msg2。

虽然几乎同时发俩消息,但msg2在msg1四次重试全部结束后才开始处理,因为默认SimpleMessageListenerContainer只有一个消费线程。可通过增加消费线程避免性能问题:

直接设置concurrentConsumers参数为10,来增加到10个工作线程

怎么正确使用RabbitMQ异步编程

也可设置maxConcurrentConsumers参数,让SimpleMessageListenerContainer动态调整消费者线程数。

到此,关于“怎么正确使用RabbitMQ异步编程”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/43713.html

(0)

相关推荐

  • CPU基础知识-CPU的组成 运算器、控制器、寄存器

    技术CPU基础知识-CPU的组成 运算器、控制器、寄存器 CPU基础知识-CPU的组成 运算器、控制器、寄存器1 参考资料
    计算机硬件系统—CPU(运算器和控制器)(一) https://www.jia

    礼包 2021年12月13日
  • python怎么实现最新气候分区掩膜

    技术python怎么实现最新气候分区掩膜这篇文章主要介绍“python怎么实现最新气候分区掩膜”,在日常操作中,相信很多人在python怎么实现最新气候分区掩膜问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法

    攻略 2021年11月23日
  • oracle11g dataguard如何切换

    技术oracle11g dataguard如何切换这篇文章主要介绍“oracle11g dataguard如何切换”,在日常操作中,相信很多人在oracle11g dataguard如何切换问题上存在疑惑,小编查阅了各式

    攻略 2021年11月11日
  • mshflexgrid数据表格怎样绑定数据库

    技术mshflexgrid数据表格怎样绑定数据库这篇文章将为大家详细讲解有关mshflexgrid数据表格怎样绑定数据库,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。ms

    攻略 2021年12月1日
  • Docker笔记,狂神说)

    技术Docker笔记,狂神说) Docker笔记(狂神说)1、Docker 入门狂神说教程:https://www.bilibili.com/video/BV1og4y1q7M4share_source

    礼包 2021年10月20日
  • 舍瑟而作,一段诸子百家中的古文求译文

    技术舍瑟而作,一段诸子百家中的古文求译文1.天下有道舍瑟而作,丘不与易也【课文翻译】1.二三子何患于丧乎?天下之无道也久矣,天将以夫子为木铎。
    诸位何必为孔子丧失官位担忧呢?天下没有德政已经很久了,上天将借孔子来宣传大道

    生活 2021年10月30日