RabbitMQ是目前非常热门的一款消息中间件。主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。具体介绍一下rabbitmq涉及到的一些关键概念
Producer(生产者)
就是投递消息的一方。生产者创建消息,然后发布到RabbitMQ中。消息一般包括2个部分:消息体和标签(Label)。消息体也可用称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然也可以进一步对消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ, RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
Consumer(消费者)
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,也不需要知道。
Broker(服务节点)
对月RabbitMQ来说,一个RabbitMQ Broker可以简单的看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
Exchange(交换器)
生产者将消息放松到Exchange(交换器),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或者直接丢弃。
RabbitMQ中的交换器由4中类型,不同的类型有着不同的路由策略:
fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct
它会把消息路由到哪些BindingKey和RoutingKey完全匹配的队列中。
topic
它与direct类型的交换器类似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里匹配规则有些不同,它约定:
- RoutingKey 为一个点号 “.” 分隔的字符串(被点号 “.” 分隔开的每一段独立的字符串称为一个单词),如”com.rabbitmq.client”、”java.util.concurrent”、”com.hidden.client”
- BindingKey和RoutingKey一样也是点号 “.” 分隔的字符串
- BindingKey中可以存在两种特殊字符串 ““ 和 “#”, 用于做模糊匹配,其中”“ 用于匹配一个单词,”#” 用于匹配多规格单词(可以是零个)。
headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
RoutingKey(路由键)
生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
Binding(绑定)
RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey), 这样RabbitMQ就知道如何正确地将消息路由到队列了。
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindingKey并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无时BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
Queue(队列)
RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息都只能存储在队列中。多个消费者可以订阅同一个队列,这时候队列中的消息会被平均分摊(即轮询)给多个消费者处理,而不是每个消费者都收到所有的消息并处理。RabbitMQ不支持队列层面的广播消费。
Connection/Channel
无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel), 每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理每条AMQP指令都是通过信道完成的。
我们完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢?试想这样一个场景,一个应用程序中有很多个线程需要从RabbitMQ中消费消息,或者生产消息,那么必然需要建立很多个Connection,也就是许多个TCP连接。然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ采用类似NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
生产者发送消息流程
- 生产者连接到RabbitMQ Broker,建立一个连接(Connection), 开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
- 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到,则将从生产者发送过来的消息存入相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关系信道
- 关闭连接
消费者接收消息流程
- 消费者连接到RabbitMQ Broker,建立一个连接(Connection), 开启一个信道(Channel)
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应得回调函数,以及做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息
- 消费者确认(ack)接收到消息
- RabbitMQ 从队列中删除相应已经确认的消息
- 关闭信道
- 关闭连接