RabbitMQ基础

RabbitMQ 一、MQ简介 消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。 消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就

RabbitMQ

一、MQ简介

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
简单来说:消息队列可以实现异步之间的通信

微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式

1.1 消息队列特点

  1. 异步
    • 消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。
  2. 解耦
    • 消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
  3. 广播
    • 消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
  4. 流量削峰与流控
    • 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的”载体”。在下游有能力处理的时候,再进行分发与处理。

1.2 异步调用

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

发送方 ---发布消息---> 消息代理 ---订阅消息---> 接收者

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。 这样,发送消息的人和接收消息的人就完全解耦了。

1.2.1 异步优缺点

  1. 优点:
    • 耦合度更低
    • 性能更好
    • 业务拓展性强
    • 故障隔离,避免级联失败
  2. 缺点
    • 完全依赖于Broker的可靠性、安全性和性能
    • 架构复杂,后期维护和调试麻烦

1.3 常用消息中间件

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ. 目比较常见的MQ实现:

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量
    1fc7f3f617018db5a6e9ce3c20aa045.png
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ
  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

二、RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
08bb86bdd50e8733d2e253239de63c4.png

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

RabbitMQ集群架构参考:RabbitMQ 的4种集群架构

2.1 安装

2.1.1 Windows安装

Erlang和Rabbitmq版本对照表详情:(https://www.rabbitmq.com/docs/which-erlang)
Rabbitmq全部版本安装地址:(https://www.rabbitmq.com/release-information)
Erlang全部版本安装地址:(https://www.erlang.org/downloads)

都需要配置环境变量,详情可以查看windows安装rabbitmq和环境erlang
进入mq的sbin目录,cmd窗口输入:rabbitmq-server.bat -detached,后台运行服务,前台窗口运行不加后面的-detached即可

2.1.2 Linux安装

由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。在本教程中我们将安装最新版本的Erlang到服务器中。 安装环境:Ubuntu 20.04,使用如下命令安装。

  1. 安装Erlang
apt-get install erlang
  1. 验证Erlang是否安装成功
erl	# 查看erlang版本,并启动。成功执行则说明erlnag安装成功
  1. 安装RabbitMQ
apt-get install rabbitmq-server
  1. 验证RabbitMQ是否安装成功
systemctl status rebbitmq-server # Active: active (running) 说明处于运行状态
  1. web端可视化操作界面
rabbitmq-plugins enable rabbitmq_management
# 重启服务
systemctl restart rabbitmq-server
  1. 此时,应该可以通过 http://localhost:15672 查看,使用默认账户guest/guest 登录。
  2. 添加用户,并赋予权限(方便我们以后做其他操作)。
rabbitmqctl add_user "username" "password"	# 添加用户
rabbitmqctl list_users	# 列出所有用户
rabbitmqctl set_user_tags "username" administrator	# 设置为管理员

可以用docker来拉取镜像使用

2.2 RabbitMQ交换机类型

2.2.1 Direct exchange 订阅

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:

  1. 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  2. 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  3. Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
    515ca997e3f7d4ed697aaccf03ca9e9.png

2.2.2 Fanout exchange 广播

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的N个队列
27b97166b27fa216a631a2d09891c19.png

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

2.2.3 Topic exchange 通配符订阅

主题交换机(topic exchanges)中,队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。
Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以“ .”分割,例如: sys.insert
扇型交换机和主题交换机异同:

  • 对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上
  • 对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上

介绍一下规则

  • (星号) 用来表示一个单词 (必须出现的)
    # (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为*.TT.* 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

2.3 数据隔离

2.3.1 用户管理

83856d82be035d65abe66334ea7020d.png
Name:guest也就是用户名
Tags:administrator,说明itheima用户是超级管理员,拥有所有权限
Can access virtual host: /,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

2.3.2 virtual host

  • RabbitMQ 中有一个概念叫做多租户,我们安装一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
  • 本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。
  • 我们并不需要特别的去看待 vhost,他就跟普通的物理 RabbitMQ 一样,不同的 vhost 能够提供逻辑上的分离,确保不同的应用消息队列能够安全独立运行。

三、 SpirngAMQP

由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址: Spring AMQP SpringAMQP提供了三个功能:

自动声明队列、交换机及其绑定关系
基于注解的监听器模式,异步接收消息
封装了RabbitTemplate工具,用于发送消息

3.1 API声明队列和交换机

3.1.1 基本API

SpringAMQP提供了一个Queue类,用来创建队列:
public class Queue extends AbstractDeclarable implements Cloneable

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:
e93f8bd8391f9996cacbc2c39e6a408.png

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:
16d389aac12aa35cdea82978c4a8237.png

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:
a03fe655af47f472c1f6b72c11a65a6.png

3.1.2 fanout示例

在consumer中创建一个配置类,声明队列和交换机:

@Configuratino
public class FanoutConfig{
  //创建fanout交换机
  @Bean
  public FanoutExchange fanoutExchange(){
      return new FanoutExchange("test1.fanout");
  }

  //创建队列
  @Bean
  public Queue fanoutQueue1(){
    return new Queue(fanout.queue1);
  }

  //绑定
  @Bean
  public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  }

}

3.1.3 direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding,也就是绑定Binding类中最后.with条件:

@Configration
public class DirectConfig{
  @Bean
  public DirectExchange(){
    return ExchangeBuilder.directExchange("test1.direct").builder();
}

  @Bean
  public Queue directQueue1(){
        return new Queue("direct.queue1");
    }
  @Bean
  public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }
}

3.1.4 注解声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
我们同样声明Direct模式的交换机和队列:
为什么在消费端声明交换机和队列:谁使用谁管理

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
 
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3.1.5 Topic

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
 
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

3.2 消息转换器

Spring的消息发送代码接收的消息体是一个Object
4916df0fc2d3020310388714a1b3eee.png
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

3.2.1配置JSON转换器

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
注意:spring-boot-starter-web中包含了jackson依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

3.1 案例入门

  1. 消费者,生产者导入依赖
  2. 消费者,生产者配置yml文件
  3. 在生产者中创建RabbitConfig配置交换机并用@Bean或者注解进行注册
  4. 然后创建个消息推送接口,利用自动注入@Autowired来注入RabbitTemplate,提供了接收/发送等等方法,来绑定交换机,并把消息发送到交换机
  5. 交换机会根据自己的类型来选择队列,把消息发送到对应的队列中
  6. 消费者创建消息接受监听类,也就是利用@RabbitListener注释来绑定自己的队列。
  7. 生产者消息绑定交换机(消费者绑定队列)---> 交换机通过配置来动态选择队列 ---> 队列获取监听自己的消费者并把消息发送----->消费者接收

添加依赖:

  <!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

配置MQ地址:

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
LICENSED UNDER CC BY-NC-SA 4.0
Comment