rocketmq如何实现事务消息(rocketmq事务消息失败处理)

技术RocketMQ事务消息如何实现这篇文章主要介绍了RocketMQ事务消息如何实现,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。根据上文的描述,发送事

贺盛德贺盛德贺盛德洛绮特杨俊钦和杨俊钦,朱庇特谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯谢尼伯,范仲淹还是范仲淹,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,我的意思是,你是说.

?什么,吴登盛吴登盛:

事务MQ producer # send message in transition:publication ndresultsmessagenttransition(最终消息msg,最终对象arg)throwsmqclient异常{ if(null==this。事务监听器)/(1)

thrownewmqclient异常('事务侦听器null,null);

}退回这个。defaultmqproductimpl。sendmessage在途(msg、transactionListener、arg);//@2

}云娥@1:唉呀事务监听器-交易监听器阿叔,哎哎哎哎哎哎。

云娥@2:沙吾提defaultmqproducterimpl什么事sendmessageintransaction凯伊姆。

安祖儿安祖儿安祖儿sendmessageintransaction凯伊姆

defaultmqproducer impl # send message innersationpublicationonsundmessage innertation(最终消息msg,final transaction listener tranexecuter,最终对象arg)throwsmqclient异常{步骤1:范仲淹?范仲淹。

最终消息消息:阿胜

TransactionListener运行:安其林安其林

银对象:范仲淹唔

defaultmqproductimpl # send message传递

发送结果=空:

消息访问器。putproperty(消息,消息常量).PROPERTY_TRANSACTION_PREPARED,“true”;

消息访问器。putproperty(消息,消息常量).PROPERTY_PRODUCER_GROUP,这个。默认MQ生产者。get生产者组();尝试[

发送结果=这个。发送(消息);

}捕捉异常:{ thrownewmqclient异常(' sendmessage exception ',e);

}步骤2:吴登盛吴登盛吴登盛:TRAN _味精,俊儿是真的,阿叔阿叔阿叔阿叔阿叔阿叔;PGROUP:朱建安朱建安,我的天啊,我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊!我的天啊。哎哎哎哎哎哎,魏冄TRAN _味精,喂!喂!喂!喂是真的,唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔唔

息系统标记的方式,设置消息为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

DefaultMQProducerImpl#sendKernelImplfinal String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
       sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
SendMessageProcessor#sendMessage
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");             return response;
       }
      putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
      putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

Step3:Broker端收到客户端发送消息请求后,判断消息类型。如果是事务消息,则调用TransactionalMessageService#prepareMessage方法,否则走原先的逻辑,调用MessageStore#putMessage方法将消息存入Broker服务端。
本节重点阐述事务消息的实现原理,故接下来将重点关注prepareMessage方法,如想了解RocketMQ消息存储相关,可以关注作者

源码分析RocketMQ系列

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessagepublic PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {        return transactionalMessageBridge.putHalfMessage(messageInner);
 }

step4:事务消息,将调用TransactionalMessageServiceImpl#prepareMessage方法,继而调用TransactionalMessageBridge#prepareMessage方法。

TransactionalMessageBridge#parseHalfMessageInnerpublic PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {        return store.putMessage(parseHalfMessageInner(messageInner));
    }    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));        return msgInner;
    }

Step5:备份消息的原主题名称与原队列ID,然后取消消息的事务消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。然后调用MessageStore#putMessage方法将消息持久化,这里TransactionalMessageBridge桥接类,就是封装事务消息的相关流程,最终调用MessageStore完成消息的持久化。消息入库后,会继续回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通过同步将消息发送到消息服务端。

注:这是事务消息Prepare状态的处理逻辑,消息是存储在消息服务器了,但存储的并不是原主题,而是RMQ_SYS_TRANS_HALF_TOPIC,故此时消费端是无法消费shen
生产者发送的消息的。看到这里,如果对RocketMQ比较熟悉的话,肯定会有一个“定时任务”去取这个主题下的消息,然后则“合适”的时机将消息的主题恢复。

DefaultMQProducerImpl#sendMessageInTransactionswitch (sendResult.getSendStatus()) {            case SEND_OK: {                try {                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }            break;            case FLUSH_DISK_TIMEOUT:            case FLUSH_SLAVE_TIMEOUT:            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;                break;            default:                break;
        }

Step6:如果消息发送成功,会回调TransactionListener#executeLocalTransaction方法,执行本地事务,并且返回本地事务状态为LocalTransactionState,枚举值如下:

  • COMMIT_MESSAGE,

  • ROLLBACK_MESSAGE,

  • UNKNOW

注意:TransactionListener#executeLocalTransaction是在发送者成功发送PREPARED消息后,会执行本地事务方法,然后返回本地事务状态;如果PREPARED消息发送失败,则不会调用TransactionListener#executeLocalTransaction,并且本地事务消息,设置为LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滚。

DefaultMQProducerImpl#sendMessageInTransactiontry {this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

step7:调用endTransaction方法结束事务(提交或回滚)。

DefaultMQProducerImpl#endTransaction
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {    case COMMIT_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);         break;    case ROLLBACK_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);         break;     case UNKNOW:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);         break;     default:         break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());

step8:组装结束事务请求,主要参数为:事务ID、事务操作(commitOrRollback)、消费组、消息队列偏移量、消息ID,fromTransactionCheck,从这里发出的请求,默认为false。Broker端的请求处理器为:EndTransactionProcessor。

step9:EndTransactionProcessor根据事务提交类型:TRANSACTION_COMMIT_TYPE(提交事务)、TRANSACTION_ROLLBACK_TYPE(回滚事务)、TRANSACTION_NOT_TYPE(忽略该请求)。

到目前为止,已详细梳理了RocketMQ事务消息的发送流程,更加准确的说是Prepare状态的消息发送流程。具体流程如图所示:
rocketmq如何实现事务消息(rocketmq事务消息失败处理)

本文到这里,初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,会回调TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务:
1、COMMIT_MESSAGE :提交事务。
2、ROLLBACK_MESSAGE:回滚事务。
3、UNKNOW:未知事务状态,此时消息服务器(Broker)收到EndTransaction命令时,将不对这种消息做处理,消息还处于Prepared类型,存储在主题为:RMQ_SYS_TRANS_HALF_TOPIC的队列中,然后消息发送流程将结束,那这些消息如何提交或回滚呢?

为了实现避免客户端需要再次发送提交、回滚命令,RocketMQ会采取定时任务将RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客户端,判断该消息是否需要提交或回滚,来完成事务消息的声明周期。

感谢你能够认真阅读完这篇文章,希望小编分享的“RocketMQ事务消息如何实现”这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/150504.html

(0)

相关推荐

  • 描写梨花的优美古诗,几句赞美梨花的优美语句

    技术描写梨花的优美古诗,几句赞美梨花的优美语句1、清晨,白璧无瑕,大地从沉睡中清醒,梨花也充满着蓬勃的朝气描写梨花的优美古诗。梨花没有玫瑰的妖娆美丽,没有菊花的傲然脱俗,然而它却是我最喜爱的花,雪白的花瓣在露珠的清洗下变

    生活 2021年10月29日
  • LibraBFT和Bystack BBFT有什么区别?

    技术LibraBFT与比原链Bystack BBFT有什么不同本篇内容主要讲解“LibraBFT与比原链Bystack BBFT有什么不同”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带

    攻略 2021年12月20日
  • 如何在费奥里启动板上将BSP应用程序配置为切片

    技术怎么将BSP应用配置成Fiori Launchpad上的一个tile本篇内容介绍了“怎么将BSP应用配置成Fiori Launchpad上的一个tile”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,

    攻略 2021年12月24日
  • C++缺省参数怎么理解

    技术C++缺省参数怎么理解本篇内容主要讲解“C++缺省参数怎么理解”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“C++缺省参数怎么理解”吧!什么叫缺省参数?缺省参数是声明或定

    攻略 2021年12月3日
  • mariadb与mysql的区别是什么

    技术mariadb与mysql的区别是什么这篇文章主要讲解了“mariadb与mysql的区别是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“mariadb与mysq

    攻略 2021年12月2日
  • 好用的开源JavaScript图表库有哪些

    技术好用的开源JavaScript图表库有哪些这篇文章主要介绍好用的开源JavaScript图表库有哪些,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1、 Chart.jsChart.js 是一个

    攻略 2021年10月30日