柚子快報(bào)激活碼778899分享:分布式 RabbitMQ
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ
一、簡介
在微服務(wù)中,模塊之間的通訊有兩種方式,一種是同步通訊,發(fā)送請求之后,希望立即得到結(jié)果,例如feign,一種是異步通訊,發(fā)送請求之后,不需要立即得到結(jié)果,例如RabbitMQ。
(一) 同步通訊
微服務(wù)之間使用Feign調(diào)用就屬于同步方式,雖然調(diào)用可以實(shí)時(shí)獲取到結(jié)果,但是存在以下問題:
程序耦合度高性能下降資源浪費(fèi)級(jí)聯(lián)失敗
(二) 異步通訊
使用異步調(diào)用的方式可以避免同步調(diào)用所帶來的問題,不需要立即得到返回結(jié)果,同時(shí)也不用關(guān)心接收方對數(shù)據(jù)的處理,調(diào)用方只需要發(fā)送消息,不關(guān)心消息是如何被處理的,接收方,只需要接收消息,不需要返回結(jié)果。帶來的好處就是吞吐量提升、故障隔離、調(diào)用間沒有阻塞、耦合度低以及流量消峰,缺點(diǎn)就是架構(gòu)復(fù)雜、需要依賴消息隊(duì)列的可靠性、安全性以及性能。
(三) 技術(shù)對比
MQ,中文是消息隊(duì)列(MessageQueue),字面來看就是存放消息的隊(duì)列。也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。
比較常見的MQ實(shí)現(xiàn):
ActiveMQRabbitMQRocketMQKafka
幾種常見MQ的對比:
RabbitMQ ActiveMQ RocketMQ Kafka 公司/社區(qū) Rabbit Apache 阿里 Apache 開發(fā)語言 Erlang Java Java Scala&Java 協(xié)議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協(xié)議 自定義協(xié)議 可用性 高 一般 高 高 單機(jī)吞吐量 一般 差 高 非常高 消息延遲 微秒級(jí) 毫秒級(jí) 毫秒級(jí) 毫秒以內(nèi) 消息可靠性 高 一般 高 一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ追求可靠性:RabbitMQ、RocketMQ追求吞吐能力:RocketMQ、Kafka追求消息低延遲:RabbitMQ、Kafka
二、安裝
這里使用docker容器化部署的方式進(jìn)行安裝,默認(rèn)賬號(hào):root,密碼:123456
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-di \
rabbitmq:3.8-management
http端口為5672,可以通過5672端口來訪問管理后臺(tái)。
三、快速入門
(一) 基本概念
在RabbitMQ中,一些基本的概念和名詞:
publisher:生產(chǎn)者consumer:消費(fèi)者exchange:交換機(jī),負(fù)責(zé)消息路由queue:隊(duì)列,存儲(chǔ)消息virtualHost:虛擬主機(jī),隔離不同租戶的exchange、queue、消息的隔離
。。。
RabbitMQ概念詳細(xì)說明
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
Publisher
消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
Exchange
交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。Exchange有4種類型:direct(默認(rèn)),fanout, topic, 和headers,不同類型的Exchange轉(zhuǎn)發(fā)消息的策略有所區(qū)別。
Queue
消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
Binding
綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。Exchange 和Queue的綁定可以是多對多的關(guān)系。
Connection
網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
Channel
信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)的虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷τ诓僮飨到y(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
Consumer
消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
Virtual Host
虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。
Broker
表示消息隊(duì)列服務(wù)器實(shí)體
MQ的基本結(jié)構(gòu):
RabbitMQ官方提供了5個(gè)不同的示例,對應(yīng)了不同的消息模型:
對于提供的5種消息模型,在后面進(jìn)行介紹,這里先說以下RabbitMQ的實(shí)現(xiàn),實(shí)現(xiàn)RabbitMQ采用的是SpringAMQP,它是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實(shí)現(xiàn)了自動(dòng)裝配,使用起來非常方便。SpringAMQP的官方地址:Spring AMQP
在SpringAMQP中提供了兩個(gè)非常重要的組件:AmqpAdmin和RabbitTemplate,分別用來管理隊(duì)列和交換機(jī)以及發(fā)送消息。對于SpringAMQP提供的API更詳細(xì)、更強(qiáng)大的用法,可以參考官網(wǎng)進(jìn)行使用。
(二) 引入依賴
(三) 添加配置
spring:
rabbitmq:
host: 192.168.0.240
port: 5672
virtual-host: /
username: root
password: 123456
(四) 消息發(fā)送
這里以簡單隊(duì)列模式進(jìn)行測試:
// 隊(duì)列名稱
String queueName = "simple.queue";
// 使用AmqpAdmin聲明隊(duì)列
amqpAdmin.declareQueue(new Queue(queueName, true, false, false, null));
// 消息
String message = "hello, spring amqp!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(queueName, message);
這個(gè)時(shí)候,就可以在Queue中看到自己創(chuàng)建的隊(duì)列和消息:
(五) 消息接收
剛剛發(fā)送消息的時(shí)候,是往simple.queue的隊(duì)列中發(fā)送了一條消息,消息內(nèi)容為:hello, spring amqp!,需要獲取到隊(duì)列中的消息。
@EnableRabbit
package cn.yichangqiao.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
}
}
這里消息接收的參數(shù)類型(public void listenSimpleQueueMessage(String msg))可以是AMQP提供的Message對象(包含了全部消息內(nèi)容,消息頭和消息體等)、發(fā)送消息的類型、Channel(當(dāng)前傳輸?shù)耐ǖ溃?,并且三種參數(shù)不分?jǐn)?shù)量和順序。
此外,對于監(jiān)聽隊(duì)列的消息,SpringAMQP提供了兩個(gè)注解,分別為RabbitListener和RabbitHandler,不同之處在于RabbitListener主要用在類和方法上,用于指明監(jiān)聽哪些隊(duì)列,而RabbitHandler主要用在方法上,用于重載接收不同類型的消息。
(六) 消息轉(zhuǎn)換器
Spring會(huì)把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時(shí)候,還會(huì)把字節(jié)反序列化為Java對象。默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
數(shù)據(jù)體積過大有安全漏洞可讀性差
因此我們需要修改配置為JSON轉(zhuǎn)換器,在publisher和consumer兩個(gè)服務(wù)中都引入依賴:
配置消息轉(zhuǎn)換器。
在啟動(dòng)類中添加一個(gè)Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
四、消息模型
在MQ的官方文檔中,給出了5個(gè)消息隊(duì)列的模型。
(一) 基本消息隊(duì)列
基本消息隊(duì)列中接收消息的消費(fèi)者,只綁定一個(gè)隊(duì)列,也就是這個(gè)消費(fèi)者只消費(fèi)這一個(gè)隊(duì)列中的消息,與隊(duì)列之間是一對一的綁定關(guān)系。由消息的發(fā)送者,往隊(duì)列中發(fā)送一條消息后,由綁定這個(gè)隊(duì)列的消費(fèi)者進(jìn)行消費(fèi),這種模式就是簡單隊(duì)列模式。
/**
* 基本消息隊(duì)列 - 消息發(fā)送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 9:44
*/
@Slf4j
@Component
public class BasicQueuePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 發(fā)送消息到簡單隊(duì)列
*/
public void sendMessageToBasicQueue() {
// 創(chuàng)建隊(duì)列
String queueName = "basic.queue";
// 創(chuàng)建隊(duì)列的參數(shù):
// String name (隊(duì)列名稱), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自動(dòng)刪除)
amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
// 發(fā)送消息
String message = "發(fā)送的消息內(nèi)容為:" + LocalDateTime.now();
rabbitTemplate.convertAndSend(queueName, message);
log.info("發(fā)送消息到簡單隊(duì)列成功");
}
}
/**
* 簡單消息隊(duì)列 - 消息消費(fèi)者
* @Author: yichangqiao
* @Date: 2024/2/20 10:08
*/
@Slf4j
@Component
public class BasicQueueListener {
@RabbitListener(queues = "basic.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
}
}
(二) 工作消息隊(duì)列
Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。
/**
* 工作消息隊(duì)列 - 消息發(fā)送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:23
*/
@Slf4j
@Component
public class WorkQueuePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 發(fā)送消息到簡單隊(duì)列
*/
public void sendMessageToWorkQueue() {
// 創(chuàng)建隊(duì)列
String queueName = "work.queue";
// 創(chuàng)建隊(duì)列的參數(shù):
// String name (隊(duì)列名稱), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自動(dòng)刪除)
amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
// 發(fā)送消息
String message = "_發(fā)送的消息內(nèi)容為:" + LocalDateTime.now();
for (int i = 0; i < 500; i++) {
rabbitTemplate.convertAndSend(queueName, i + message);
}
log.info("發(fā)送消息到工作隊(duì)列成功");
}
}
/**
* 工作消息隊(duì)列 - 消息消費(fèi)者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:28
*/
@Slf4j
@Component
public class WorkQueueListener {
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
}
這里使用兩個(gè)消費(fèi)者,共同綁定了work.queue這一個(gè)隊(duì)列,來共同消費(fèi)隊(duì)列中的消息,但是在listenWorkQueue1中消費(fèi)消息后休眠了20S,在listenWorkQueue2中消費(fèi)消息后休眠了200S,這兩個(gè)消費(fèi)者消費(fèi)消息的能力是不同的,listenWorkQueue1的消費(fèi)能力是比listenWorkQueue2要強(qiáng)的,但是在實(shí)際的消費(fèi)過程中,消息是平均分配給每個(gè)消費(fèi)者,并沒有考慮到消費(fèi)者的處理能力。這樣顯然是有問題的。
在spring中有一個(gè)簡單的配置,可以解決這個(gè)問題。修改consumer服務(wù)的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息
(三) 發(fā)布訂閱
在發(fā)布訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:
Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給交換機(jī)Exchange:交換機(jī)。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列Direct:定向,把消息交給符合指定routing key 的隊(duì)列Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒有變化。Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
Spring提供了一個(gè)接口Exchange,來表示所有不同類型的交換機(jī):
1. 廣播
在廣播模式下,消息發(fā)送流程是這樣的:
1) ?可以有多個(gè)隊(duì)列2) ?每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))3) ?生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定4) ?交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列5) ?訂閱隊(duì)列的消費(fèi)者都能拿到消息
@Configuration
public class FanoutConfig {
/**
* 聲明交換機(jī)
* @return Fanout類型交換機(jī)
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange.fanout");
}
/**
* 第1個(gè)隊(duì)列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 綁定隊(duì)列和交換機(jī)
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個(gè)隊(duì)列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 綁定隊(duì)列和交換機(jī)
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
/**
* 廣播模式 - 消息發(fā)送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:50
*/
@Slf4j
@Component
public class FanoutExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 發(fā)送消息 - 廣播
*/
public void sendMessageToFanoutExchange() {
// 交換機(jī)
String exchangeName = "exchange.fanout";
// 消息
String message = "hello, everyone!" + LocalDateTime.now();
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費(fèi)者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費(fèi)者2接收到Fanout消息:【" + msg + "】");
}
2. 路由
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在Direct模型下:
隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息。
/**
* 路由模式 - 消息消費(fèi)者
* @Author: yichangqiao
* @Date: 2024/2/20 11:05
*/
@Slf4j
@Component
public class DirectExchangeListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費(fèi)者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費(fèi)者接收到direct.queue2的消息:【" + msg + "】");
}
}
/**
* 路由模式 - 消息發(fā)送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 11:03
*/
@Slf4j
@Component
public class DirectExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
public void sendMessageToDirectExchange() {
// 交換機(jī)名稱
String exchangeName = "exchange.direct";
// 消息
String message = "紅色警報(bào)!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
}
3. 主題
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!
Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#:匹配一個(gè)或多個(gè)詞
*:匹配不多不少恰好1個(gè)詞
舉例:
item.#:能夠匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
圖示:
解釋:
Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會(huì)被匹配到。包括china.news和china.weatherQueue2:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配。包括china.news和japan.news
/**
* 主題模式 - 消息發(fā)送者
* @Author: yichangqiao
* @Date: 2024/2/20 11:12
*/
@Slf4j
@Component
public class TopicExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
public void sendMessageToTopicExchange() {
// 交換機(jī)名稱
String exchangeName = "exchange.topic";
// 消息
String message = "喜報(bào)!孫悟空大戰(zhàn)哥斯拉,勝!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
}
/**
* 主題模式 - 消息消費(fèi)者
* @Author: yichangqiao
* @Date: 2024/2/20 11:13
*/
@Slf4j
@Component
public class TopicExchangeListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費(fèi)者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費(fèi)者接收到topic.queue2的消息:【" + msg + "】");
}
}
五、消息可靠性
消息隊(duì)列在實(shí)際的使用過程中,其實(shí)有很多的問題需要思考。
從消息發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷很多的過程。
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
發(fā)送時(shí)丟失:
生產(chǎn)者發(fā)送的消息未送達(dá)exchange消息到達(dá)exchange后未到達(dá)queue
MQ宕機(jī),queue將消息丟失consumer接收到消息后未消費(fèi)就宕機(jī)
針對這些問題,RabbitMQ分別給出了解決方案:
生產(chǎn)者確認(rèn)機(jī)制mq持久化消費(fèi)者確認(rèn)機(jī)制失敗重試機(jī)制
(一) 生產(chǎn)者確認(rèn)機(jī)制
RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
publisher-confirm,發(fā)送者確認(rèn)
消息成功投遞到交換機(jī),返回ack。消息未投遞到交換機(jī),返回nack。
publisher-return,發(fā)送者回執(zhí)
消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每一個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ACK沖突。
publisher-return(發(fā)送者回執(zhí))
如果消息正確投遞到了交換機(jī),但是沒有路由到隊(duì)列,可以使用發(fā)送者回執(zhí)機(jī)制,返回ACK,以及路由失敗的原因。
spring:
rabbitmq:
publisher-confirm-type: correlated # 開啟發(fā)布者確認(rèn)機(jī)制 異步回調(diào)機(jī)制
publisher-returns: true # 開啟消費(fèi)者返回機(jī)制
template:
mandatory: true # 消息路由失敗后的策略,true代表的是手動(dòng)處理
說明:
publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
simple:同步等待confirm結(jié)果,直到超時(shí)correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback默認(rèn)為none
publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallbacktemplate.mandatory:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業(yè)務(wù)需要,可以重發(fā)消息
});
}
}
publisher-confirm(發(fā)送者確認(rèn))
發(fā)送者確認(rèn)機(jī)制用來保證消息能夠投遞到交換機(jī),如果沒有正確的投遞到交換機(jī),返回ACK。
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息體
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失敗
log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一會(huì)兒,等待ack回執(zhí)
Thread.sleep(2000);
}
(二) 消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。
交換機(jī)持久化隊(duì)列持久化消息持久化
交換機(jī)持久化
SpringAMQP中可以通過代碼指定交換機(jī)持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("simple.direct", true, false);
}
隊(duì)列持久化
SpringAMQP中可以通過代碼指定隊(duì)列持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
消息持久化
利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
1:非持久化2:持久化
用java代碼指定:
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
(三) 消費(fèi)者確認(rèn)機(jī)制
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。而RabbitMQ是通過消費(fèi)者回執(zhí)來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場景:
1)RabbitMQ投遞消息給消費(fèi)者2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ3)RabbitMQ刪除消息4)消費(fèi)者宕機(jī),消息尚未處理
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。而SpringAMQP則允許配置三種確認(rèn)模式:
?manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
?auto:自動(dòng)ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
?none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除
由此可知:
none模式下,消息投遞是不可靠的,可能丟失auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒有異常,返回ackmanual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack
一般,我們都是使用默認(rèn)的auto即可。
none模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關(guān)閉ack
auto模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # Spring自動(dòng)處理ack
(四) 失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力。
本地重試
可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費(fèi)者失敗重試
initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長為1秒
multiplier: 1 # 失敗的等待時(shí)長倍數(shù),下次等待時(shí)長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試,重試達(dá)到最大次數(shù)后,Spring會(huì)返回ack,消息會(huì)被丟棄。
失敗策略
在本地重試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實(shí)現(xiàn):
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
重試失敗之后,消息會(huì)從simple.queue隊(duì)列中自動(dòng)入隊(duì)到error.queue隊(duì)列中存儲(chǔ)。
六、死信交換機(jī)
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):
消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false消息是一個(gè)過期消息,超時(shí)無人消費(fèi)要投遞的隊(duì)列消息滿了,無法投遞
如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,檢查DLX)。
在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會(huì)在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄??梢越osimple.queue添加一個(gè)死信交換機(jī),給死信交換機(jī)綁定一個(gè)隊(duì)列。這樣消息變成死信后也不會(huì)丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊(duì)列。
// 聲明普通的 simple.queue隊(duì)列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定隊(duì)列名稱,并持久化
.deadLetterExchange("dl.direct") // 指定死信交換機(jī)
.build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲(chǔ)死信的隊(duì)列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 將死信隊(duì)列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變?yōu)樗佬?,超時(shí)分為兩種情況:
● 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
● 消息本身設(shè)置了超時(shí)時(shí)間
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
.ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)
.build();
}
注意,這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.direct
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
@Test
public void testTTLQueue() {
// 創(chuàng)建消息
String message = "hello, ttl queue";
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 記錄日志
log.debug("發(fā)送消息成功");
}
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("發(fā)送消息成功");
}
當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信。
七、延遲隊(duì)列
利用死信交換機(jī)和TTL可以實(shí)現(xiàn)延遲隊(duì)列效果,實(shí)現(xiàn)了消息發(fā)出后,消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。
延遲隊(duì)列的使用場景包括:
延遲發(fā)送短信用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動(dòng)取消預(yù)約工作會(huì)議,20分鐘后自動(dòng)通知所有參會(huì)人員
因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。
這個(gè)插件就是DelayExchange插件。參考RabbitMQ的插件列表頁面:
Community Plugins | RabbitMQ
使用方式可以參考官網(wǎng)地址:Scheduling Messages with RabbitMQ | RabbitMQ
DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:
接收消息判斷消息是否具備x-delay屬性如果有x-delay屬性,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時(shí)間返回routing not found結(jié)果給消息發(fā)送者x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列
插件的使用也非常簡單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。
聲明DelayExchange交換機(jī):
發(fā)送消息:
八、惰性隊(duì)列
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。
解決消息堆積有兩種思路:
增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說的work queue模式擴(kuò)大隊(duì)列容積,提高堆積上限
要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
接收到消息后直接存入磁盤而非內(nèi)存消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存支持?jǐn)?shù)百萬條的消息存儲(chǔ)
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
基于@Bean聲明lazy-queue
基于@RabbitListener聲明LazyQueue
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。