柚子快報激活碼778899分享:分布式 rabbitmq
一,什么是消息中間件
MQ全稱為Message Queue,本質上是個隊列,F(xiàn)IFO先入先出。是在消息的傳輸過程中保存消息的容器??梢杂糜趹贸绦蚝蛻贸绦蛑g的通信方法。多用于分布式系統(tǒng)之間進行通信,在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節(jié)省了服務器的請求響應時間,從而提高了系統(tǒng)的吞吐量
二,為什么要使用消息中間件
消息中間件優(yōu)點:異步,解耦,限流
同步通信:耗時長,受網絡波動影響,不能保證高成功率,耦合性高。
同步,異步
并發(fā):一段時間(1S)多個請求數(shù)
并行:時間節(jié)點,多個指令同時被執(zhí)行
串行:順序執(zhí)行
1.異步處理:
將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理,提高了應用程序的響應時間。
消息隊列:Redis 發(fā)布訂閱(pub/sub)
異步方式:用戶點擊完下單按鈕后,只需等待25ms就能得到下單響應 (20 + 5 = 25ms)。也就是說,訂單消息提交到MQ,MQ回饋一個消息成功,然后再把訂單提交到數(shù)據(jù)庫20ms,就完成了。至于MQ通知庫存、支付、物流系統(tǒng)所花費的時間和訂單系統(tǒng)成功沒有關系了。 這樣這個訂單系統(tǒng)提升用戶體驗和系統(tǒng)吞吐量(單位時間內處理請求的數(shù)目)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-fw7ZuYfR-1691115209630)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713152842364.png)]
同步處理(耗時長):
同步方式的問題:當一個用戶提交訂單到成功需要300ms+300ms+300ms+20ms = 920ms,這是不能容忍的。也就是說庫存、支付、物流、最后保存數(shù)據(jù)庫全部成功,訂單的提交才算完成。
2.系統(tǒng)的耦合性越高,容錯性就越低,可維護性就越低
服務與之間耦合度,比如訂單服務與用戶積分服務(需求:下單成功,增加積分)
如果不用消息隊列,訂單服務和積分服務就要通信,下單后調用積分服務的接口通知積分服務進行處理(或者定時掃描之類的),那么調用接口失敗,或者延時等等…一系列的問題要考慮處理,非常繁瑣
用了消息隊列,用戶A下單成功后下單服務通過redis發(fā)布(mq的生產者)一消息,就不用管了.用戶積分服務redis訂閱了(mq的消費者),就會受到這用戶A下單的消息,進行處理.這就降低了多個服務之間的耦合,即使積分服務發(fā)生異常,也不會影響用戶正常下單.處理起來就非常的絲滑,各干各的互不影響.
解決方案:應用程序解耦合
MQ相當于一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
使用消息隊列的方式:使用 MQ 使得應用間解耦,提升容錯性和可維護性。庫存和支付和物流直接去MQ取到訂單的信息即可,即使庫存系統(tǒng)報錯,沒關系,等到庫存修復后再次從MQ中去取就可以了
3.高并發(fā)(分批處理請求)
訂單系統(tǒng),在下單的時候就會往數(shù)據(jù)庫寫數(shù)據(jù)。但是數(shù)據(jù)庫只能支撐每秒1000左右的并發(fā)寫入,并發(fā)量再高就容易宕機。低峰期的時候并發(fā)也就100多個,但是在高峰期時候,并發(fā)量會突然激增到5000以上,這個時候數(shù)據(jù)庫肯定卡死了。但不一定宕機,只會很慢,一旦宕機就會有消息丟失。
解決方案:削峰填谷
消息被MQ保存起來了,5000條數(shù)據(jù)對于MQ,簡直是小意思,然后系統(tǒng)就可以按照自己的消費能力來消費,比如每秒1000個數(shù)據(jù),這樣慢慢寫入數(shù)據(jù)庫,這樣就不會卡死數(shù)據(jù)庫了
4,MQ的劣勢
1、系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦 MQ 宕機,就會對業(yè)務造成影響。如何保證MQ的高可用?
2、系統(tǒng)復雜度提高:MQ 的加入大大增加了系統(tǒng)的復雜度,以前系統(tǒng)間是同步的遠程調用,現(xiàn)在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
3、一致性問題:A 系統(tǒng)處理完業(yè)務,通過 MQ 給B、C、D三個系統(tǒng)發(fā)消息數(shù)據(jù),如果 B 系統(tǒng)、C 系統(tǒng)處理成功,D 系統(tǒng)處理失敗。如何保證消息數(shù)據(jù)處理的一致性?
既然 MQ 有優(yōu)勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?
生產者不需要從消費者處獲得反饋。引入消息隊列之前的直接調用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續(xù)往后走,即所謂異步成為了可能。 容許短暫的不一致性。 確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。
常見的 MQ 產品
目前業(yè)界有很多的 MQ 產品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充當消息隊列的案例,而這些消息隊列產品,各有側重,在實際選型時,需要結合自身需求及 MQ 產品特征,綜合考慮。
** **RabbitMQActiveMQRocketMQKafka公司/社區(qū)RabbitApache阿里Apache開發(fā)語言ErlangJavaJavaScala&Java協(xié)議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義自定義協(xié)議,社區(qū)封裝了http協(xié)議支持客戶端支持語言官方支持Erlang,Java,Ruby等,社區(qū)產出多種API,幾乎支持所有語言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社區(qū)產出多種API,如PHP,Python等單機吞吐量萬級(其次)萬級(最差)十萬級(最好)十萬級(次之)消息延遲微秒級毫秒級毫秒級毫秒以內功能特性并發(fā)能力強,性能極其好,延時低,社區(qū)活躍,管理界面豐富老牌產品,成熟度高,文檔較多MQ功能比較完備,擴展性佳只支持主要的MQ功能,畢竟是為大數(shù)據(jù)領域準備的。
三,RabbitMQ簡介
RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊列協(xié)議)協(xié)議實現(xiàn)的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應用非常廣泛。2007年,Rabbit 技術公司基于 AMQP 標準開發(fā)的 RabbitMQ 1.0 發(fā)布。 (Erlang 語言由 Ericson 設計,專門為開發(fā)高并發(fā)和分布式系統(tǒng)的一種語言,在電信領域使用廣泛)
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ 基礎架構如下圖:
上圖說明:
1、Broker:接收和分發(fā)消息的應用,就是一個中介,RabbitMQ Server就是 Message Broker 2、Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創(chuàng)建 exchange/queue 等 3、Connection:publisher/consumer 和 broker 之間的 TCP 連接 4、Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創(chuàng)建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
5、Exchange:message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 6、Queue:消息最終被送到這里等待 consumer 取走 7、Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)
RabbitMQ提供了6種模式:簡單模式,work模式,Publish/Subscribe發(fā)布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠程調用模式(遠程調用,不太算MQ;暫不作介紹);官網對應模式介紹:https://www.rabbitmq.com/getstarted.html , 點擊手冊按鈕 RabbitMQ Tutorials
2. 安裝及配置RabbitMQ
RabbitMQ 官方地址:http://www.rabbitmq.com/
win10安裝
安裝erlang和rabbitmq后,進入rabbitmq安裝目錄的sbin目錄執(zhí)行下面命令
rabbitmq-plugins.bat enable rabbitmq_management
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-tUbZI5GZ-1691115209633)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713154626768.png)]
登錄URL:
http://localhost:15672/
用戶名密碼:guest/guest
問題:
? 登錄用戶是中文解決方案:
? 1、創(chuàng)建用戶為英文,再安裝相關環(huán)境
? 2、修改相應的目錄
? 用管理員執(zhí)行CMD
rabbitmq-service.bat remove
set RABBITMQ_BASE=D:\rabbitmq_server\data
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
查看進程
tasklist | find /i "erl"
關閉進程
taskkill /pid 7300 -t -f //將7300改成對應端口號
3. RabbitMQ快速入門
3.1 生產方工程搭建
1.添加相關依賴
修改pom.xml文件內容為如下:
2.啟動類
package com.woniu.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class);
}
}
3.配置RabbitMQ
創(chuàng)建application.yml,內容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /woniu
username: woniu
password: woniu
創(chuàng)建隊列參數(shù)說明:
參數(shù)說明name字符串值,queue的名稱。durable布爾值,表示該 queue 是否持久化。 持久化意味著當 RabbitMQ 重啟后,該 queue 是否會恢復/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也會被持久化。exclusive布爾值,表示該 queue 是否排它式使用。排它式使用意味著僅聲明他的連接可見/可用,其它連接不可見/不可用。autoDelete布爾值,表示當該 queue 沒“人”(connection)用時,是否會被自動刪除。
不指定 durable、exclusive 和 autoDelete 時,默認為 true 、 false 和 false 。表示持久化、非排它、不用自動刪除。
創(chuàng)建交換機參數(shù)說明
參數(shù)說明name字符串值,exchange 的名稱。durable布爾值,表示該 exchage 是否持久化。 持久化意味著當 RabbitMQ 重啟后,該 exchange 是否會恢復/仍存在。autoDelete布爾值,表示當該 exchange 沒“人”(queue)用時,是否會被自動刪除。
不指定 durable 和 autoDelete 時,默認為 true 和 false 。表示持久化、不用自動刪除
4. AMQP
4.1. 相關概念介紹
AMQP 一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。AMQP是一個二進制協(xié)議,擁有一些現(xiàn)代化特點:多信道、協(xié)商式,異步,安全,擴平臺,中立,高效。
RabbitMQ是AMQP協(xié)議的Erlang的實現(xiàn)。
概念說明連接Connection一個網絡連接,比如TCP/IP套接字連接。會話Session端點之間的命名對話。在一個會話上下文中,保證“恰好傳遞一次”。信道Channel多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。為會話提供物理傳輸介質??蛻舳薈lientAMQP連接或者會話的發(fā)起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。服務節(jié)點Broker消息中間件的服務節(jié)點;一般情況下可以將一個RabbitMQ Broker看作一臺RabbitMQ 服務器。端點AMQP對話的任意一方。一個AMQP連接包括兩個端點(一個是客戶端,一個是服務器)。消費者Consumer一個從消息隊列里請求消息的客戶端程序。生產者Producer一個向交換機發(fā)布消息的客戶端應用程序。
4.2. RabbitMQ運轉流程
在入門案例中:
生產者發(fā)送消息
生產者創(chuàng)建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker;聲明隊列并設置屬性;如是否排它,是否持久化,是否自動刪除;將路由鍵(空字符串)與隊列綁定起來;發(fā)送消息至RabbitMQ Broker;關閉信道;關閉連接; 消費者接收消息
消費者創(chuàng)建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker向Broker 請求消費相應隊列中的消息,設置相應的回調函數(shù);等待Broker回應投遞隊列中的消息,消費者接收消息;確認(ack,自動確認)接收到的消息;RabbitMQ從隊列中刪除相應已經被確認的消息;關閉信道;關閉連接;
4.3. 生產者流轉過程說明
客戶端與代理服務器Broker建立連接。會調用newConnection() 方法,這個方法會進一步封裝Protocol Header 0-9-1 的報文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的交互。 客戶端調用connection.createChannel方法。此方法開啟信道,其包裝的channel.open命令發(fā)送給Broker; channel.basicPublish方法對應的AMQP命令為Basic.Publish,這個命令包含了content Header 和content Body()。content Header 包含了消息體的屬性,例如:投遞模式,優(yōu)先級等,content Body 包含了消息體本身。 客戶端發(fā)送完消息需要關閉資源時,涉及到Channel.Close和Channel.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。
4.4. 消費者流轉過程說明
消費者客戶端與代理服務器Broker建立連接。會調用newConnection() 方法,這個方法會進一步封裝Protocol Header 0-9-1 的報文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的交互。消費者客戶端調用connection.createChannel方法。和生產者客戶端一樣,協(xié)議涉及Channel . Open/Open-Ok命令。在真正消費之前,消費者客戶端需要向Broker 發(fā)送Basic.Consume 命令(即調用channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker響應Basic . Consume - Ok 以告訴消費者客戶端準備好消費消息。Broker 向消費者客戶端推送(Push) 消息,即Basic.Deliver 命令,這個命令和Basic.Publish 命令一樣會攜帶Content Header 和Content Body。消費者接收到消息并正確消費之后,向Broker 發(fā)送確認,即Basic.Ack 命令??蛻舳税l(fā)送完消息需要關閉資源時,涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。
5. RabbitMQ工作模式
5.1. Work queues工作隊列模式
simple模式: 一個生產者一個消費者 定義rabbitconfig: 創(chuàng)建消息隊列,交換機及其之間綁定 @Configuration
public class RabbitmqConfig {
/**
* simple 隊列
*/
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simpleQueue").build();
//return new Queue("simpleQueue");
}
}
定義生產者
/**
* 往消息隊列返送消息
*/
@Component
public class SimpleProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
rabbitTemplate.convertAndSend("simpleQueue",msg);
}
}
? 定義消費者
@Component
public class SimpleConsumer {
/**
* 消費消息
*/
@RabbitListener(queues = "simpleQueue")
private void recevie(String msg){
System.out.println("消費者接收到:"+msg);
}
}
**work模式:**一個生產者多個消費者,也稱之為競爭模式 創(chuàng)建兩個消費者監(jiān)聽隊列
Work Queues與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。它們處于競爭者的關系,一條消息只會被一個消費者接收,rabbit采用輪詢的方式將消息是平均發(fā)送給消費者的;消費者在處理完某條消息后,才會收到下一條消息。
應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。如生產者生產一千條消息,那么c1和c2各消費500條,隊列消費消息是均衡分配
復制消費方代碼,重新編寫一個消費端,然后啟動兩個消費端,進行測試
5.2. 訂閱模式類型
訂閱模式示例圖:
在訂閱模型中,多了一個exchange角色,而且過程略有變化:
P:生產者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X(交換機)C:消費者,消息的接受者,會一直等待消息到來。Queue:消息隊列,接收消息、緩存消息。Exchange:交換機,圖中的X。一方面,接收生產者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列Direct:定向,把消息交給符合指定routing key 的隊列Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
5.2.1 廣播模式
1.創(chuàng)建RabbitMQ隊列與交換機綁定的配置類com…rabbitmq.config.RabbitMQConfig
/**
* 負責:創(chuàng)建消息隊列,交換機及其之間綁定
*/
@Configuration
public class RabbitmqConfig {
/**
* fanout 模式
*/
@Bean
public Queue fanoutQueueA(){
return QueueBuilder.durable("fanoutQueueA").build();
}
@Bean
public Queue fanoutQueueB(){
return QueueBuilder.durable("fanoutQueueB").build();
}
/**
* 創(chuàng)建交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 把消息隊列,綁定到交換機上
* IOC 在調用配置中的方法時,如果有參數(shù),默認以形參的名字找到IOC中對應的方法
*/
@Bean
public Binding fanoutQueueAToFanoutExchange( Queue fanoutQueueA,
FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}
@Bean
public Binding fanoutQueueToFanoutExchange( Queue fanoutQueueB,
FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}
}
1、實現(xiàn)生產者
/**
* 往消息隊列返送消息
*/
@Component
public class FanoutProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg){
rabbitTemplate.convertAndSend("fanoutExchange","",msg);
}
}
創(chuàng)建交換機參數(shù)說明:
參數(shù)說明exchange字符串值,交換機名稱type交換機的類型,有三種類型:FANOUT、DIRECT、TOPICdurable交換機是否持久化,表示當rabbitmq重啟時或者意外宕機,這個交換機還在不在autoDelete是否自動刪除,表示當該交換機沒人發(fā)消息時,是否會被自動刪除。internal內部使用,一般為falsearguments其它參數(shù)
發(fā)送消息參數(shù)說明
參數(shù)說明exchange字符串值,交換機名稱routingKey如果交換機類型是fanout,則routingKey為""props消息基本屬性配置body要發(fā)送的消息的內容
2、消費方實現(xiàn)
@Component
public class FanoutConsumer {
/**
* 消費消息
*/
@RabbitListener(queues = "fanoutQueueA")
private void recevie(String msg){
System.out.println("消費者A接收到:"+msg);
}
/**
* 消費消息
*/
@RabbitListener(queues = "fanoutQueueB")
private void recevieB(String msg){
System.out.println("消費者B接收到:"+msg);
}
}
發(fā)布訂閱模式與工作隊列模式的區(qū)別
1、工作隊列模式不用定義交換機,而發(fā)布/訂閱模式需要定義交換機。
2、發(fā)布/訂閱模式的生產方是面向交換機發(fā)送消息,工作隊列模式的生產方是面向隊列發(fā)送消息(底層使用默認交換機)。
3、發(fā)布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。
5.2.2 Routing路由模式
路由模式特點:
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息
在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機的類型為:Direct,還有隊列綁定交換機的時候需要指定routing key。
1、創(chuàng)建RabbitMQ隊列與交換機綁定的配置類com…rabbitmq.config.RabbitMQConfig
/**
* 負責:創(chuàng)建消息隊列,交換機及其之間綁定
*/
@Configuration
public class RabbitmqConfig {
//==================route===
/**
* 路由模式:通過路由key將消息發(fā)送給指定消息隊列
* 1個交換機,2個消息隊列,1個生產者,2個消費者
*/
@Bean
public Queue routeQueueA(){
return QueueBuilder.durable("routeQueueA").build();
}
@Bean
public Queue routeQueueB(){
return QueueBuilder.durable("routeQueueB").build();
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
//綁定
@Bean
public Binding routeQueueToDirectExchange(Queue routeQueueA,
DirectExchange directExchange){
return BindingBuilder.bind(routeQueueA).to(directExchange).with("red");
}
@Bean
public Binding routeQueueBToDirectExchange(Queue routeQueueB,
DirectExchange directExchange){
return BindingBuilder.bind(routeQueueB).to(directExchange).with("blue");
}
}
2、生產方實現(xiàn)
*/
@Component
public class DirectProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg,String routekey){
rabbitTemplate.convertAndSend("directExchange",routekey,msg);
}
}
3.消費方實現(xiàn)
創(chuàng)建2個消費方并啟動,然后使用生產者發(fā)送消息;在消費者對應的控制臺可以查看到生產者發(fā)送對應routing key對應隊列的消息;到達按照需要接收的效果
@Component
public class DirectConsumer {
/**
* 消費消息
*/
@RabbitListener(queues = "routeQueueA")
private void recevie(String msg){
System.out.println("消費者A接收到:"+msg);
}
/**
* 消費消息
*/
@RabbitListener(queues = "routeQueueB")
private void recevieB(String msg){
System.out.println("消費者B接收到:"+msg);
}
}
5.2.3. Topics通配符模式(模糊路由)
Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#:匹配0個或多個詞
*:匹配不多不少恰好1個詞
舉例:
item.#:能夠匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
創(chuàng)建RabbitMQ隊列與交換機綁定的配置類com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Topics通配符模式
@Bean
public Queue tt01(){
return new Queue("tt01");
}
@Bean
public Queue tt02(){
return new Queue("tt02");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("tte");
}
@Bean
public Binding ttBinding01(){
return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");
}
@Bean
public Binding ttBinding02(){
return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");
}
}
1、生產方代碼實現(xiàn)
使用topic類型的Exchange
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@RequestMapping("/sendT1MT1/{msg}")
public String sendT1MT1(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error", msg);
return "success";
}
@RequestMapping("/sendT1MT2/{msg}")
public String sendT1MT2(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.22.error", msg);
return "success";
}
@RequestMapping("/sendT1MF/{msg}")
public String sendT1MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error.22", msg);
return "success";
}
@RequestMapping("/sendT2MF/{msg}")
public String sendT2MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.11.22", msg);
return "success";
}
@RequestMapping("/sendT2MT/{msg}")
public String sendT2MT(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.1", msg);
return "success";
}
}
消費者
@Component
public class DirectComsume {
@RabbitListener(queues = "tt01")
public void received(String msg){
System.out.println("接收到消息:"+msg);
}
@RabbitListener(queues = "tt02")
public void receivedB(String msg){
System.out.println("接收到消息:"+msg);
}
}
創(chuàng)建2個消費方并啟動,然后使用生產者發(fā)送消息;在消費者對應的控制臺可以查看到生產者發(fā)送對應routing key對應隊列的消息;到達按照需要接收的效果;并且這些routing key可以使用通配符。
四,RabbitMQ高級
1.消息的可靠投遞(生產者端)
在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
confirm 確認模式return 退回模式
rabbitmq 整個消息投遞的路徑為:producer—>rabbitmq broker—>exchange—>queue—>consumer
消息從 producer 到 exchange,不管exchange是否收到生產者消息,都會返回一個 confirmCallback 。消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback 。
我們將利用這兩個 callback 控制消息的可靠性投遞
1.1.confirmCallback確認模式
1.在配置文件中 添加publisher-confirm-type: correlated配置
spring:
rabbitmq:
host: localhost
port: 5672
username: woniu
password: woniu
virtual-host: /woniu
publisher-confirm-type: correlated #發(fā)布消息成功到交換器后會觸發(fā)回調方法
publisher-returns: true #返回確認信息
在springboot2.2.0.RELEASE版本之前(spring.rabbitmq.publisher-confirm發(fā)布確認屬性配置)是amqp正式支持的屬性,用來配置消息發(fā)送到交換器之后是否觸發(fā)回調方法,在2.2.0及之后該屬性過期使用spring.rabbitmq.publisher-confirm-type屬性配置代替,用來配置更多的確認類型;
NONE值是禁用發(fā)布確認模式,是默認值; CORRELATED值是發(fā)布消息成功到交換器后會觸發(fā)回調方法; SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發(fā)回調方法,其二在發(fā)布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點返回發(fā)送結果,根據(jù)返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發(fā)送消息到broker。
2、編寫Product類
package com.woniu.product;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 往消息隊列返送消息
*/
@Component
public class ConfirmProduct implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg,String routekey){
rabbitTemplate.setConfirmCallback(this); //設置回調
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.convertAndSend("confimExchange",routekey,msg);
}
/**
*
* @param correlationData 消息的唯一標識,如果發(fā)送失敗,可以根據(jù)這個標識補發(fā)信息
* @param status 交換機是否成功收到信息,true:成功
* @param reason 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean status, String reason) {
System.out.println("進入confirm方法");
System.out.println(status);
System.out.println(reason);
}
/**
* 只有消息路由失敗進入,比如:找不到路由等
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
returnedMessage.getMessage(); //失敗的原因
returnedMessage.getReplyCode(); //失敗的狀態(tài)碼
returnedMessage.getReplyText(); //失敗的原因
returnedMessage.getExchange(); //交換機
returnedMessage.getRoutingKey(); //路由
}
}
失敗演示可以把convertAndSend的交換機名字寫錯
1.2.ReturnCallBack確認模式
在上個例子的基礎上,再添加一個測試方法returnedMessage
由于路由鍵不正確 022,故交換機的消息無法發(fā)送到消息隊列,setReturnCallback()方法,也就是Exchange路由到Queue失敗時執(zhí)行,這個前提是必須設置 rabbitTemplate.setMandatory(true);如果不加這句話,意味著交換機處理消息模式采用默認的模式,模式模式是直接丟掉該消息,不會執(zhí)行setReturnCallback()方法。 當然如果交換機發(fā)送消息到隊列,如果成功了也不會執(zhí)行該方法,因為setReturnCallback是交換機發(fā)送消息到隊列失敗才執(zhí)行的。
失敗演示生產者可以把發(fā)送的routekey寫錯
消息的可靠投遞小結
設置ConnectionFactory的publisher-confirms=“true” 開啟 確認模式。 使用rabbitTemplate.setConfirmCallback設置回調函數(shù)。當消息發(fā)送到exchange后回調confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
設置ConnectionFactory的publisher-returns=“true” 開啟 退回模式。使用rabbitTemplate.setReturnCallback設置退回函數(shù),當消息從exchange路由到queue失敗后,如果設置了rabbitTemplate.setMandatory(true)參數(shù),則會將消息退回給producer。并執(zhí)行回調函數(shù)returnedMessage。
在RabbitMQ中也提供了事務機制,但是性能較差,此處不做講解。 使用channel下列方法,完成事務控制: txSelect(), 用于將當前channel設置成transaction模式 txCommit(),用于提交事務 txRollback(),用于回滾事務
2.Consumer ACK(消費者端)
ack指Acknowledge(翻譯為:應答),表示消費端收到消息后的確認方式。有三種確認方式:
自動確認:acknowledge=“none”手動確認:acknowledge=“manual”根據(jù)異常情況確認:acknowledge=“auto”,(這種方式使用麻煩,不作講解)
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業(yè)務處理中,很可能消息接收到,業(yè)務處理出現(xiàn)異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業(yè)務處理成功后,調用channel.basicAck(),手動確認,如果出現(xiàn)異常,則調用channel.basicNack()方法,讓其自動重新發(fā)送消息。
2.1 消費方工程搭建
1.在配置文件中 添加手動確認的配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 默認一批一批消費的,提高消息,默認250個
2.編寫Ack監(jiān)聽器
@Component
public class ConfirmConsumer {
/**
* 消費消息
*/
@RabbitListener(queues = "confirmQueue")
private void recevie(String msg, Message message, Channel channel) throws IOException {
//arg1:消息的id
//arg2:是否批量確認這些信息 false:否
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
/**
* arg1:消息的id
* arg2:是否批量確認這些信息 false:否
* arg3:是否重新將消息放回隊列中,false:不放會,一般都是false
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
System.out.println("消費者A接收到:"+msg);
}
}
basicAck的批量應答問題說明:
channel.basicAck(8,true) 如果前面還有4,6,7的deliveryTag未被確認,則會一起確認,減少網絡流量,當然當前deliveryTag=8這條消息也會確認,如果沒有前面沒有未被確認的消息,則只會確認當前消息,也就是說可以一次性確認某個隊列小于等于delivery_tag值的所有消息
basicNack的參數(shù)說明:
第一個參數(shù)為deliveryTag,也就是每個消息標記index,消息標記值從1開始,依次遞增 第二個參數(shù)為multiple,表示是否批量,如果為true,那么小于或者等于該消息標記的消息(如果還沒有簽收)都會拒絕簽收 第三個參數(shù)為requeue,表示被拒絕的消息是否重回隊列,如果設置為true,則消息重新回到queue,那么broker會重新推送該消息給消費端,如果設置為false,則消息在隊列中被刪除,即消息會被直接丟失(當然如果為false,還有一種情況就是放到死信隊列)
啟動之前的生產者發(fā)送消息給test_queue_confirm隊列,如果拋出異常則該消息一直重發(fā)
2.2 消息可靠性總結
持久化
exchange要持久化queue要持久化message要持久化
生產方確認Confirm 消費方確認Ack Broker高可用
3. 死信隊列
TTL(time to live)
TTL是rabbitmq 中一個消息或者隊列的屬性,表明一條信息或者該隊列中的所有消息的最大存活時間。單位是毫秒,換句話說,如果一條信息設置了TTL 屬性或者設置TTL屬性的隊列,那么這個條信息如果在TTL設置的時間沒有被消費,則會成為死信。如果同時配置了隊列的TTL和消息的TTL, 那么較小的那個值將會被使用。
死信隊列
英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息在隊列成為Dead message后,通過該隊列把這條死信消息發(fā)給另一個交換機,這個交換機就是DLX。
消息成為死信的三種情況(面試常問):
隊列消息長度到達限制(淘汰最早的消息);消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;原隊列存在消息過期設置,消息到達超時時間未被消費;
隊列綁定死信交換機: 給隊列設置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key
5.1 死信隊列實現(xiàn)過程
1、創(chuàng)建RabbitMQ隊列與交換機綁定的配置類com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 1. 創(chuàng)建DLX交換機
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange("dlx_exchange");
}
// 2. ttl隊列
@Bean
public Queue ttlQueue(){
Map
// 1、正常隊列綁定死信交換機-->
// 1.1 x-dead-letter-exchange 死信交換機的名稱
args.put("x-dead-letter-exchange", "dlx_exchange");
// 1.2 x-dead-letter-routing-key 正常隊列發(fā)送消息到死信 交換機的routingKey
args.put("x-dead-letter-routing-key", "dlx01");
// 2 消息成為死信的三種情況
// 2.1 設置隊列的過期時間 ttl x-message-ttl
args.put("x-message-ttl", 1000 * 10);
// 2.2 設置隊列的長度限制 x-max-length 10條消息,超過進死信
args.put("x-max-length", 10);
// 2.3 消費者拒接消費消息,并且不重回隊列 這種情況后面在消費工程測試
return QueueBuilder.durable("ttlQueue").withArguments(args).build();
}
// 3. 死信隊列
@Bean
public Queue dlxQ(){
return new Queue("dlxQ");
}
// 4.dlxQ綁定DXL交換機
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQ()).to(dlxDirectExchange()).with("dlx01");
}
}
2、生產者工程測試:
//死信隊列測試
@Test
public void testDlx(){
//1、測試過期時間,死信消息
//rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會死嗎");
//2、測試隊列長度限制,消息死信
for (int i = 0; i < 20 ; i++) {
rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會死嗎");
}
//前兩步測試結果:死信隊列會有21條記錄 1(過期) + 10(限制)+10(正常隊列過期后的10條)
}
3、消息成為死信的第三種情況實現(xiàn)
1).在配置文件中 添加手動確認的配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2).添加正常隊列的監(jiān)聽器
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*
* @Date:2023/2/28
* @Description:
*/
@Component
@RabbitListener(queues = "ttlQueue")
public class TtlListener {
@SneakyThrows
@RabbitHandler
public void dlxQ(String msg, Message message, Channel channel){
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("處理業(yè)務邏輯");
int m = 1/0;
channel.basicAck(deliveryTag,true);
}catch(Exception ex){
/** basicNack(long deliveryTag, boolean multiple, boolean requeue)
* multiple是否批量. true:將一次性拒絕所有小于或者等于deliveryTag的消息。
* requeue:被拒絕的消息是否重回隊列。如果設置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費端,如果 * 為 requeue=false,不重回隊列,則消息發(fā)送最終到死信隊列
*/
channel.basicNack(deliveryTag,true,false);
}
}
}
3).在生產端的testDlx方法再次給正常交換機發(fā)送消息
//死信隊列測試
@Test
public void testDlx(){
rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會死嗎?");
}
4. 延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費者調用,只有到達指定時間后,才會被調用者調用消費。
如下需求:
下單后,30分鐘未支付,取消訂單,回滾庫存。
當用戶提交訂單后,數(shù)據(jù)庫保存訂單信息,同時庫存表相應的庫存減少,然后消息隊列保存訂單的信息(如訂單Id),此時庫存系統(tǒng)監(jiān)聽隊列,隊列不會把消息立刻發(fā)送給庫存,而是過30分鐘再把信息發(fā)送給庫存系統(tǒng),庫存系統(tǒng)去查詢訂單數(shù)據(jù)庫,根據(jù)訂單id查詢,如果該訂單還沒有支付,則取消訂單,回滾庫存,如果支付過了,則庫存表什么都不用做。也就是給用戶30分鐘的機會,一個訂單在30分鐘后還沒有支付,則該訂單的庫存信息直接回滾。
新用戶注冊成功7天后,發(fā)送短信問候。
實現(xiàn)方式:
定時器:我們可以寫一段代碼,在某個時間段查詢訂單表的支付情況。把提交訂單的時間查出來和當前系統(tǒng)時間比較,30分鐘之類如果訂單狀態(tài)為支付,則取消該訂單,大家思考一下有什么問題?延遲隊列:很可惜,在RabbitMQ中并未提供延遲隊列功能。但是可以使用:TTL+死信隊列 組合實現(xiàn)延遲隊列的效果。
6.1 延遲隊列實現(xiàn)過程
通過插件實現(xiàn)
a, 把插件放入rabbitmq安裝目錄的plugins目錄
b, 進入rabbitmq 安裝目錄的sbin 目錄
執(zhí)行下面命令讓改插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代碼實現(xiàn)
1:定義個一個延遲交換機,一個隊列
// 延遲交換機,消息隊列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("delay").build();
}
/**
* CustomExchange: 自定義交換機 ,是fanout,direct,topic 交換機
* @return
*/
@Bean
public CustomExchange customExchange(){
Map map = new HashMap();
//指定交換機類型
map.put("x-delayed-type","direct");
/**
* arg1:交換機名字
* arg2: 交換機信息類型,延遲消息
* arg3: 是否持久化,是否將沒有被消費的消息持久化
* arg4: 沒有隊列綁定到交換機,交換機是否刪除。
* arg5: 初始化參數(shù)
*/
return new CustomExchange("customExchange","x-delayed-message",
true,false,map);
}
/**
* 交換機綁定隊列
* @param delayQueue
* @param customExchange
* @return
*/
@Bean
public Binding delayQueueTocustomExchange(Queue delayQueue,CustomExchange customExchange){
return BindingBuilder.bind(delayQueue).to(customExchange).with("delay").noargs();
}
2: 定義個生產者
/**
* 往消息隊列返送消息
*/
@Component
@Slf4j
public class DelayProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg,int delayTime){
log.info("發(fā)送消息");
rabbitTemplate.convertAndSend("customExchange", "delay", msg,
new MessagePostProcessor() {
/**
* 在消息發(fā)送到消息隊列之前對消息進行處理
* @param message
* @return
* @throws AmqpException
*/
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 給消息設置過期時間,單位是毫秒
message.getMessageProperties().setDelay(delayTime);
return message;
}
});
}
}
3: 定義個消費者
@Component
@Slf4j
public class DelayConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "delay")
public void recevied(String msg,Channel channel,Message message) throws IOException {
log.info("消費消息:"+ msg);
/**
* 消費信息的id.
* 是否批量確認信息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
5. 日志與監(jiān)控
7.1 RabbitMQ日志
RabbitMQ默認日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務節(jié)點名稱、cookie的hash值、RabbitMQ配置文件地址、內存限制、磁盤限制、默認賬戶guest的創(chuàng)建以及權限配置等等。
7.2 rabbitmq常用命令
1、查看隊列
rabbitmqctl list_queues #查看所有虛擬主機里面的隊列
rabbitmqctl list_queues -p /vhost #查看某個虛擬主機里面的隊列
2、刪除所有隊列
rabbitmqctl stop_app #關閉應用
rabbitmqctl reset #清除隊列中的消息
rabbitmqctl start_app # 再次啟動此應用
注意:此方式,會同時刪除一些配置信息,需要慎用
3、查看rabbitmq中的交換機
rabbitmqctl list_exchanges [-p vhost]
4、rabbitmq的用戶操作命令
rabbitmqctl list_users
rabbitmqctl add_user 用戶名 密碼
rabbitmqctl delete_user 用戶名
5、查看未被確認的隊列
rabbitmqctl list_queues name messages_unacknowledged
6、查看隊列環(huán)境變量
rabbitmqctl environment
7、查看隊列消費者信息
rabbitmqctl list_consumers
8、查看隊列連接
rabbitmqctl list_connections
9、查看準備就緒的隊列
rabbitmqctl list_queues name messages_ready
10、查看單個隊列的內存使用
rabbitmqctl list_queues name memory
11、列出所有虛擬主機
rabbitmqctl list_vhosts
rabbitmqctl status | grep rabbit ##查看rabbitmq的版本
6 消息追蹤
在使用任何消息中間件的過程中,難免會出現(xiàn)某條消息異常丟失的情況。
對于RabbitMQ而言,可能是因為生產者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認機制;也有可能是因為交換器與隊列之間不同的轉發(fā)策略;甚至是交換器并沒有與任何隊列進行綁定,生產者又不感知或者沒有采取相應的措施;另外RabbitMQ本身的集群策略也可能導致消息的丟失。這個時候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運維人員進行問題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現(xiàn)消息追蹤。
8.1 消息追蹤-Firehose(了解)
firehose的機制是將生產者投遞給隊列的消息,以及隊列投遞給消費者的消息按照指定的格式發(fā)送到默認的exchange上。這個默認的exchange的名稱為amq.rabbitmq.trace,它是一個topic類型的exchange。發(fā)送到這個exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實際交換機和隊列的名稱,分別對應生產者投遞到exchange的消息,和消費者從queue上獲取的消息。
1、打開trace 功能
rabbitmqctl trace_on [-p vhost] ##開啟Firehose命令
打開 trace 會影響消息寫入功能,適當打開后請關閉,關閉Firehose命令:rabbitmqctl trace_off [-p vhost],打開后會多一個交換機,如下圖
2、新建一個消息隊列,并給該交換機綁定一個消息隊列
3、打開任何一個其他的隊列,并往隊列發(fā)送一條消息,則這個test_trace隊列也會有其他隊列的消息
8.2 消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。
1、啟用插件:
[root@localhost ~]# rabbitmq-plugins list ###查詢插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
1、新建一個trace,將來所有的消息都被trace保存起來,文件的默認路徑為/var/tmp/rabbitmq-tracing
不管在哪個隊列發(fā)送消息,都會保存到日志文件mytrace.log中
如果是用其它的用戶創(chuàng)建這個消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內容:創(chuàng)建的用戶名和密碼
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重啟消息隊列服務器即可
7.RabbitMQ應用問題
消息可靠性保障、消息冪等性處理 、微服務中用消息隊列實現(xiàn)微服務的異步調用,而用openfeign采用的同步
9.1 消息可靠性保障-消息補償
消息補償機制
需求:100%確保消息發(fā)送成功
9.2 消息冪等性保障-樂觀鎖(了解)
冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果。也就是說,其任意多次執(zhí)行對資源本身所產生的影響均與一次執(zhí)行的影響相同。MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。
樂觀鎖解決方案
第一次生產者發(fā)送一條消息,但是消費方系統(tǒng)宕機,即不能立即消費,于是回調檢查服務監(jiān)聽不到Q2的響應消息,也不會寫入數(shù)據(jù)庫MDB,當隔一段時間后,生產者又發(fā)送一條延遲消息到Q3隊列,回調檢查服務能監(jiān)聽到Q3隊列消息,于是和MDB去比較是否有,由于消費方的失敗,消息最終沒有入庫MDB,這個時候回調檢查服務和MDB數(shù)據(jù)庫比對失敗,于是通知生產者,重新發(fā)送一條消息給消費者,那么這個時候Q1就有2條消息了,當消費方正常運行的時候,由于監(jiān)聽的Q1是兩條2消息,怎么辦呢?樂觀鎖
第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息積壓問題
實際場景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個小時了還沒消息,怎么辦?
[root@localhost ~]# rabbitmq-plugins list ###查詢插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
1、新建一個trace,將來所有的消息都被trace保存起來,文件的默認路徑為/var/tmp/rabbitmq-tracing
不管在哪個隊列發(fā)送消息,都會保存到日志文件mytrace.log中
如果是用其它的用戶創(chuàng)建這個消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內容:創(chuàng)建的用戶名和密碼
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重啟消息隊列服務器即可
7.RabbitMQ應用問題
消息可靠性保障、消息冪等性處理 、微服務中用消息隊列實現(xiàn)微服務的異步調用,而用openfeign采用的同步
9.1 消息可靠性保障-消息補償
消息補償機制
需求:100%確保消息發(fā)送成功
9.2 消息冪等性保障-樂觀鎖(了解)
冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果。也就是說,其任意多次執(zhí)行對資源本身所產生的影響均與一次執(zhí)行的影響相同。MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。
樂觀鎖解決方案
第一次生產者發(fā)送一條消息,但是消費方系統(tǒng)宕機,即不能立即消費,于是回調檢查服務監(jiān)聽不到Q2的響應消息,也不會寫入數(shù)據(jù)庫MDB,當隔一段時間后,生產者又發(fā)送一條延遲消息到Q3隊列,回調檢查服務能監(jiān)聽到Q3隊列消息,于是和MDB去比較是否有,由于消費方的失敗,消息最終沒有入庫MDB,這個時候回調檢查服務和MDB數(shù)據(jù)庫比對失敗,于是通知生產者,重新發(fā)送一條消息給消費者,那么這個時候Q1就有2條消息了,當消費方正常運行的時候,由于監(jiān)聽的Q1是兩條2消息,怎么辦呢?樂觀鎖
第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息積壓問題
實際場景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個小時了還沒消息,怎么辦?
這種時候只好采用 “丟棄+批量重導” 的方式來解決了,臨時寫個程序,連接到mq里面消費數(shù)據(jù),收到消息之后直接將其丟棄,快速消費掉積壓的消息,降低MQ的壓力?;蛘叨鄦讉€消費端。
柚子快報激活碼778899分享:分布式 rabbitmq
參考閱讀
本文內容根據(jù)網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。