本文将详细说明如何实现RabbitMQ消息中间件的工作原理和使用方法。这篇文章的内容质量很高,所以边肖会分享给大家作为参考。希望你看完这篇文章后有所了解。
00-1010目前主流的消息中间件主要有:ActiveMQ、Kafka、RabbitMQ、RocketMQ等.................它是基于AMQP协议实现的。AMQP的主要特点是面向消息、面向队列、路由(包括点对点和发布/订阅)、可靠性和安全性。RabbitMQ支持多种语言,并有消息确认机制和持久化机制,保证可靠性和可用性而不丢失数据。
00-1010消息是指应用于应用程序之间传输的数据。消息的类型包括文本字符串、JSON、XML、嵌入对象等等。
所谓的消息中间件 / 消息队列(消息队列中间件(MQ)利用高效可靠的消息传输机制进行数据通信,同时可以继承基于数据通信的分布式系统。消息中间件的传输模式一般有两种:基于队列的点对点(Point-to-Point)模式和发布/订阅(Pub/Sub)模式,点对点模式,消息生产者向队列发送消息,消息消费者从队列接收消息。队列的存在使得消息的异步传输成为可能。发布-订阅模式定义了如何向内容节点发布和订阅内容,这称为主题。这种模式可以满足消费者发布一条消息,多个消费者同时消费同一信息的需求。
简介
消息与消息队列
AMQP的全称是高级消息队列协议,它是消息队列的规范,其中定义了很多核心概念。AMQP类似于JMS(Java消息服务)Java平台的专业技术规范,它还提供了许多面向中间件的API,用于在两个应用程序或分布式系统之间发送消息进行异步通信。
性能对比
解释:生产者(生产者)将信息传递到服务器端的rabbtmq的Exchange(流程:消息-服务器-虚拟主机-rabbtmq-Exchange),消费者(消费者)只需要订阅Message Queue,每次消息传递到队列队列时,都会通知消费者进行消费。
生产者只需要将消息投递到Exchange交换机中,不需要关注消息被投递到哪个队列。
消费者只需要监听队列来消费消息,不需要关注消息来自于哪个Exchange。
Exchange和Message Queue存在着绑定的关系,一个Exchage可以绑定多个消息队列。
什么是AMQP协议?
rabbtmq主要使用四种类型的开关:直接开关、扇出开关、主题开关和报头开关。当Exchange根据不同类型分发邮件时,分发策略是不同的。标头匹配AMQP邮件的标头,而不是路由密钥。此外,Headers开关和direct开关是完全一致的,但是它们的性能很差,目前几乎没有用。这里只描述其他三种类型:
1.direct
direct类型的交换器理由规则需要遵循严格的完全匹配规则,他会把消息路由到那些BindingKey和RoutingKey完全匹配的队里中,如果消息中的路由键(RoutingKey)如果和 Binding 中的 BindingKey 一致, 交换器就将消息发到对应的队列中。比如:
-
在发送消息的时候设置路由键为“info”或者“debug”,消息只会路由到Queue2,如果以其他路邮件发送消息,则消息不会路由到这两个队里中,这就是路由键和Binding key的完全匹配。
direct类型模式的特点:
(1) 不需要将Exchange进行任何绑定(binding)操作
(2) 消息传递时需要一个“RoutingKey”,可以简单的理解为要发送到的队列名字。
(3) 如果vhost中不存在RoutingKey中指定的队列名,则该消息会被抛弃。
2.fanout
它会将所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
这种模式的特点:
(1) 不需要RoutingKey,我们可以将路由键设置为空即可。
(2) 需要提前将Exchange和Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同时与多个Exchange进行绑定("多对多关系")。
(3) 如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
3.topic
direct类型的交换器理由规则需要遵循严格的完全匹配规则,这种严格的匹配方式有时候不能满足实际的业务需求,topic就是在这种规则上进行了扩展,topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,但是这里的匹配规则有所不同,它的约定如下:
-
RoutingKey为一个点号 ".",分隔的字符串(被点号 "."号分隔开的一段独立的字符串称为单词),比如:"com.rabbit.client"、"java.util.Map";
-
BindingKey和RoutingKey为一个点号 ".",分隔的字符串;
-
BindingKey中可以存在两种特殊的字符串 "*"和 "#",用于做模糊匹配,其中"*"用于匹配一个单词, "#" 用于匹配多规则单词;
-
"com.#" 可以匹配到 com.rabbitmq.aaa
-
"com.*" 可以匹配到 com.rabbitmq
举个实例:
-
路由键为 "com.rabbitmq.client" 的消息会同时路由到 Queue1 和 Queue2
-
路由键为 "com.hidden.client" 的消息会只会路由到 Queue2
-
路由键为 "com.hidden.data" 的消息会只会路由到 Queue2
-
路由键为 "java.rabbitmq.data" 的消息会只会路由到 Queue1
-
路由键为 "java.util.map" 的消息将会被丢弃或者返回给生产者,因为它没有匹配任何的路由键。
RabbitMQ特点
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
-
可靠性
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 -
灵活的路由(Flexible Routing)
在消息进入队列之前,通过Exchange来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的Exchange 。 -
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 -
高可用
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 -
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 -
多语言客户端
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 -
管理界面
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 -
跟踪机制
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 -
插件机制
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。。
RabbitMQ 中的概念模型
消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
消息流
RabbitMQ 基本概念
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
RabbitMQ 内部结构
RabbitMQ的作用和使用场景
RabbitMQ的核心组件
Hello RabbitMQ World
“Hello RabbitMQ World!”,学习一门技术,先出hello world开始,我们来编写一个Java项目来使用RabbitMQ来实现消息的生产和消费,这样能让我们能够更好的理解RabbitMQ的作用和原理,RabbitMQ是消息代理,它负责接收并转发消息,我们可以将它视为邮局,将要发送的邮件都放在邮箱中,可以确保 Mailperson 先生或者女生最终将邮件传递给别人,因此:RabbitMQ是一个邮箱,一个邮局和一个邮递员的实例。
-
生产意味着发送,发送消息的程序是生产者
-
队列就是RabbitMQ内部的邮箱名称,消息是存储在队列中的,尽管消息流经RabbitMQ和你的应用程序,生产者可以发送一个队列信息,许多消费者可以尝试从一个队列里接收数据
-
消费与接受者是同一个身份,一个消费者是一个程序,主要是等待接收信息
pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
我们称其为消息发布者(发送者)MessageProducer和我们的消息消费者(接收者) MessageConsumer。发布者将连接到RabbitMQ,发送响应的消息:
MassageProducer (消息生产者)
/** * @Author 林必昭 * @Date 2019/11/29 22:12 * @descr */ public class MassageProducer { private final static String QUEUE_NAME = "myQueue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ所在主机的ip或者主机名 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); Connection conn = factory.newConnection(); //创建一个通道 Channel channel = conn.createChannel(); //指定一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定义要发送的消息 String message = "Hello RabbitMQ World!"; //往队列里发送message消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [生产者] 发送消息: '" + message + "'"); } }
MessageConsumer (消息消费者)
/** * @Author 林必昭 * @Date 2019/11/29 22:12 * @descr */ public class MessageConsumer { private final static String QUEUE_NAME = "myQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("消费者正在等待消息,退出请按CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [消费者] 接收到了: '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
在运行程序之前我们需要先启动RabbitMQ服务,前提需要安装好Erlang和RabbitMQ,这里安装的教程就不展开说明了:
下载Erlang软件包 (这个网址下载速度比较快)
RabbitMQ官网
-
注意的是:这里必须下载安装Erlang安装包,因为RabbitMQ是基于Erlang进行开发的。
启动服务之后,如图所示:
接着我们运行我们的MessgeProducer.java和MessageConsumer.java,先运行MessgeProcucer,接着运行MessageConsumer,观察控制台的输出:
至此,我们的第一个RabbitMQ服务程序编程测试成功了 !
RabbitMQ web客户端
启动之后,我们可以登录RabbitMQ的web客户端查看我们的信息:http://localhost:15672/,通过客户端我们可以查看RabbitMQ的版本信息,连接信息,Channel、Exchange连接等等...
我们也可以通过客户端新增Exchange并指定类型:
同样也可以手动添加队列,具体操作我们可以自己尝试登陆客户端来设置!
在终端通过命令查看所有的Exchange:rabbitmqctl list_exchanges
关于怎么实现RabbitMQ消息中间件的工作原理和使用就分享到这里了,希望
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/133182.html