欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:RabbitMQ 幾種模式

柚子快報激活碼778899分享:RabbitMQ 幾種模式

http://yzkb.51969.com/

一、Hello World 模式

????????在這一部分中,我們將用 Java 編寫兩個程序。發(fā)送單個消息的生產(chǎn)者和接收消息并打印出來的消費(fèi)者。模型如下所示:

????????在下圖中,“ P” 是我們的生產(chǎn)者,“ C” 是我們的消費(fèi)者。中間的框是一個隊列 RabbitMQ 代表使用者保留的消息緩沖區(qū)。

1.1 生產(chǎn)者

1.1.1 添加依賴

org.apache.maven.plugins

maven-compiler-plugin

8

8

com.rabbitmq

amqp-client

5.8.0

commons-io

commons-io

2.6

1.1.2 編寫生產(chǎn)者代碼

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

// 生產(chǎn)者,用于推送消息

public class Producer {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.146");

factory.setUsername("admin");

factory.setPassword("123");

// channel 實(shí)現(xiàn)了自動 close 接口 自動關(guān)閉 不需要顯示關(guān)閉

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

/**

* 生成一個隊列

* 1.隊列名稱

* 2.隊列里面的消息是否持久化 默認(rèn)消息存儲在內(nèi)存中

* 3.該隊列是否只供一個消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個消費(fèi)者消費(fèi)

* 4.是否自動刪除 最后一個消費(fèi)者端開連接以后 該隊列是否自動刪除 true 自動刪除

* 5.其他參數(shù)

*/

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

String message="hello world";

/**

* 發(fā)送一個消息

* 1.發(fā)送到那個交換機(jī)

* 2.路由的 key 是哪個

* 3.其他的參數(shù)信息

* 4.發(fā)送消息的消息體

*/

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

System.out.println("消息發(fā)送完畢");

}

}

1.1.3 推送數(shù)據(jù)

? ? ? ? 運(yùn)行代碼,然后在我們的 rabbitmq 管理界面上可以看到,此時我們的 hello 隊列里面,有一個消息等待被消費(fèi),如下所示:

1.2 消費(fèi)者

1.2.1 編寫消費(fèi)者代碼

import com.rabbitmq.client.*;

// 接收消息的消費(fèi)者

public class Consumer {

// 隊列的名稱

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.146");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

System.out.println("等待接收消息....");

// 接收到消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback=(consumerTag, message)->{

String resultMessage= new String(message.getBody());

System.out.println(resultMessage);

};

// 取消消費(fèi)時的回調(diào)函數(shù),比如在消費(fèi)的時候隊列被刪除掉了

CancelCallback cancelCallback=(consumerTag)->{

System.out.println("消息消費(fèi)被中斷");

};

/**

* 消費(fèi)者消費(fèi)消息

* 1.消費(fèi)哪個隊列

* 2.消費(fèi)成功之后是否要自動應(yīng)答 true 代表自動應(yīng)答 false 手動應(yīng)答

* 3.消費(fèi)者未成功消費(fèi)的回調(diào)

* 4.消費(fèi)者取消消費(fèi)的回調(diào)

*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

1.2.2 接收數(shù)據(jù)

????????啟動代碼,可以看到,我們成功的消費(fèi)了 rabbitmq 里面的消息。

二、Work Queues 模式

????????工作隊列(又稱任務(wù)隊列)的主要思想是假設(shè)生產(chǎn)者發(fā)送大量的消息,會把消息放入隊列之中,后面有多個工作線程去接收處理消息。

2.1 輪詢分發(fā)消息

????????在這個案例中我們會啟動三個工作線程,一個消息發(fā)送線程,我們來看看他們?nèi)齻€工作線程是如何工作的。輪詢模式就是你處理一個,我處理一個,按順序一個一個的進(jìn)行處理。

2.1.1 抽取工具類

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {

// 得到一個連接的 channel

public static Channel getChannel() throws Exception{

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.147");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

return channel;

}

}

2.1.2 啟動三個工作線程

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println("接收到消息:"+receivedMessage);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C1 消費(fèi)者啟動等待消費(fèi)......");

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println("接收到消息:"+receivedMessage);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C2 消費(fèi)者啟動等待消費(fèi)......");

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Worker03 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println("接收到消息:"+receivedMessage);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C3 消費(fèi)者啟動等待消費(fèi)......");

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

2.1.3 啟動一個發(fā)送線程

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

public class Task01 {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 10; i++) {

String message = "I am message " + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("發(fā)送消息完成:" + message);

}

}

}

2.1.4 結(jié)果展示

????????通過程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 10?個消息,消費(fèi)者二和消費(fèi)者三分別分得 3 個消息,消費(fèi)者一分得?4?個消息,并且是按照有序的一個接收一次消息。

2.2 消息應(yīng)答

2.2.1 概念

????????消費(fèi)者完成一個任務(wù)可能需要一段時間,如果其中一個消費(fèi)者處理一個長的任務(wù)并且只完成了一部分任務(wù),這時它突然掛掉了,會發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。在這種情況下,突然有個消費(fèi)者掛掉了,我們就會丟失正在處理的消息。后續(xù)發(fā)送給該消費(fèi)者的消息也都會丟失。

????????為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

? ? ? ? 消息應(yīng)答分為兩種,一種是自動應(yīng)答,一種是手動應(yīng)答。

2.2.2 自動應(yīng)答

????????消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,假設(shè)采用自動應(yīng)答的模式,如果消息在被消費(fèi)者接收到之前,出現(xiàn)了連接關(guān)閉或者 channel 關(guān)閉的情況,那么消息就丟失了。

? ? ? ? 假設(shè)消費(fèi)者接收的消息很多,且沒有對傳遞的消息數(shù)量進(jìn)行限制,這樣就有可能使得消費(fèi)者這邊由于接收太多還來不及處理的消息,導(dǎo)致這些消息的積壓,最終使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并以某種速率能夠處理這些消息的情況下使用。

? ? ? ? 下面方法里面的第二個參數(shù)如果設(shè)置為 true 則為自動應(yīng)答。

/**

* 消費(fèi)者消費(fèi)消息

* 1.消費(fèi)哪個隊列

* 2.消費(fèi)成功之后是否要自動應(yīng)答 true 代表自動應(yīng)答 false 手動應(yīng)答

* 3.消費(fèi)者未成功消費(fèi)的回調(diào)

* 4.消費(fèi)者取消消費(fèi)的回調(diào)

*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

2.2.3 手動應(yīng)答

????????手動應(yīng)答有三種模式,第一種是 Channel.basicAck 肯定確認(rèn) ,如下所示,RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了。

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println("接收到消息:"+receivedMessage);

/**

* 1、消息標(biāo)記 tag

* 2、false 表示只應(yīng)答接收到的傳遞消息;true 表示應(yīng)答所有傳遞過來的消息

* */

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C1 消費(fèi)者啟動等待消費(fèi)......");

// 手動應(yīng)答

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);

}

????????第二種是?Channel.basicNack 否定確認(rèn),第三種是 Channel.basicReject??也是用于否定確認(rèn),Channel.basicReject 比?Channel.basicNack 少一個參數(shù),表示不處理該消息了直接拒絕,可以將其丟棄了。

2.2.4 消息自動重新入隊

????????如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個消費(fèi)者。這樣,即使某個消費(fèi)者偶爾死亡,也可以確保不會丟失任何消息。

2.2.5 消息手動應(yīng)答編碼

????????消息生產(chǎn)者代碼如下:

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

// 記得先創(chuàng)建下這個隊列,執(zhí)行下就可以了

public class Task01 {

private static final String QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();)

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 2; i++) {

String message = "I am message " + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("發(fā)送消息完成:" + message);

}

}

}

????????兩個消費(fèi)者的代碼如下所示,系統(tǒng)默認(rèn)消息采用的是自動應(yīng)答,所以我們要想實(shí)現(xiàn)消息消費(fèi)過程中不丟失,需要把自動應(yīng)答改為手動應(yīng)答。

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {

private static final String QUEUE_NAME="ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("c1 等待接收消息時間較短");

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

try {

Thread.sleep(1000l);

} catch (InterruptedException e) {

e.printStackTrace();

}

/**

* 1、消息標(biāo)記 tag

* 2、false 表示只應(yīng)答接收到的傳遞消息;true 表示應(yīng)答所有傳遞過來的消息

* */

System.out.println("接收到消息:"+receivedMessage);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

// 手動應(yīng)答

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);

}

}

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {

private static final String QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("c2 等待接收消息時間較長");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String receivedMessage = new String(delivery.getBody());

try {

Thread.sleep(30000l);

} catch (InterruptedException e) {

e.printStackTrace();

}

/**

* 1、消息標(biāo)記 tag

* 2、false 表示只應(yīng)答接收到的傳遞消息;true 表示應(yīng)答所有傳遞過來的消息

* */

System.out.println("接收到消息:" + receivedMessage);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

CancelCallback cancelCallback = (consumerTag) -> {

System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

// 手動應(yīng)答

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

}

}

2.2.6 結(jié)果展示

? ? ? ? 啟動消費(fèi)者和生產(chǎn)者,消費(fèi)者 1 由于阻塞時間短,很快的處理完消息,而消費(fèi)者 2 則會阻塞一會,等到 30s 之后才會開始處理消息,如下所示:

????????等過了一會之后,消息成功被消費(fèi)者 2 消費(fèi),如下所示:

????????如果消費(fèi)者 2 等待的過程中,手動將其關(guān)閉的話,會發(fā)生什么?可以發(fā)現(xiàn)消息被消費(fèi)者 1 消費(fèi)了,并沒有丟失消息。

2.3 RabbitMQ 持久化

2.3.1 概念

? ? ? ? 上一節(jié)我們學(xué)習(xí)了消息應(yīng)答可以保證處理的任務(wù)不會丟失,但是如何保障當(dāng) RabbitMQ 服務(wù)停掉以后消息生產(chǎn)者發(fā)送過來的消息不丟失呢?默認(rèn)情況下 RabbitMQ 退出或由于某種原因崩潰時,隊列和消息就都丟了。除非配置相關(guān)的參數(shù)。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標(biāo)記為持久化。

2.3.2 隊列持久化

????????之前我們創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的話,該隊列就會被刪除掉,如果要隊列實(shí)現(xiàn)持久化,需要在聲明隊列的時候把 durable 參數(shù)設(shè)置為持久化,如下所示:

// 設(shè)置消息隊列持久化

boolean durable=true;

channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

????????但是需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤

????????刪除完畢之后,再次啟動生產(chǎn)者,可以看到 D 表示的就是這個隊列是持久化的。

? ? ? ? 此時,即使重啟 rabbitmq 隊列也依然存在。?

2.3.3 消息持久化

????????將消息標(biāo)記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是這里依然存在當(dāng)消息剛準(zhǔn)備存儲在磁盤的時候但是還沒有存儲完,消息還在緩存的一個間隔點(diǎn)。此時并沒有真正寫入磁盤。持久性保證并不強(qiáng),但是對于我們的簡單任務(wù)隊列而言,這已經(jīng)綽綽有余了。如果需要更強(qiáng)有力的持久化策略,后面會詳細(xì)說明。

????????要想讓消息實(shí)現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,添加如下這個屬性。

# 第三個參數(shù)為 null 表示不持久化

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

# 第三個參數(shù)設(shè)置為 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息要持久化

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2.3.4 不公平分發(fā)

????????在最開始的時候我們學(xué)習(xí)到 RabbitMQ 分發(fā)消息采用的輪詢分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費(fèi)者在處理任務(wù),其中有個消費(fèi)者 1 處理任務(wù)的速度非???,而另外一個消費(fèi)者 2 處理速度卻很慢,這個時候如果我們還是采用輪詢分發(fā)的話,就會導(dǎo)致處理速度快的這個消費(fèi)者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費(fèi)者一直在干活,這種分配方式在這種情況下其實(shí)就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進(jìn)行分發(fā)。

????????為了避免這種情況,我們可以使用不公平分發(fā),在消費(fèi)者代碼里面設(shè)置參數(shù) channel.basicQos(1) 即可。如下:

# 默認(rèn)等于0,表示輪詢,如果等于1的話就表示不公平分發(fā)

int prefetchCount = 1;

channel.basicQos(prefetchCount);

????????分別啟動消費(fèi)者和生產(chǎn)者,打開 rabbitmq 的管理界面如下:

????????意思就是如果這個任務(wù)我還沒有處理完或者我還沒有應(yīng)答你,你先別分配給我,我目前只能處理一個任務(wù),然后 rabbitmq 就會把該任務(wù)分配給沒有那么忙的那個空閑消費(fèi)者,當(dāng)然如果所有的消費(fèi)者都沒有完成手上任務(wù),隊列還在不停的添加新任務(wù),隊列有可能就會遇到隊列被撐滿的情況,這個時候就只能添加新的 worker 或者改變其他存儲任務(wù)的策略。?

2.3.5 預(yù)取值

? ? ? ? rabbitmq 本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel 上肯定不止只有一個消息,另外來自消費(fèi)者的手動確認(rèn)本質(zhì)上也是異步的。因此這里就存在一個未確認(rèn)的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。

????????這個時候就可以通過使用 basic.qos 方法設(shè)置 “預(yù)取計數(shù)” 值來完成的。該值定義通道上允許的未確認(rèn)消息的最大數(shù)量。一旦數(shù)量達(dá)到配置的數(shù)量,RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認(rèn),類似于 channel 的最大容量。

????????例如,假設(shè)在通道上有未確認(rèn)的消息 5、6、7,8,并且通道的預(yù)取計數(shù)設(shè)置為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何消息,除非至少有一個未應(yīng)答的消息被 ack。比方說 tag=6 這個消息剛剛被確認(rèn) ACK,RabbitMQ 將會感知這個情況到并再發(fā)送一條消息。消息應(yīng)答和 QoS 預(yù)取值對用戶吞吐量有重大影響。

????????通常,增加預(yù)取值將提高向消費(fèi)者傳遞消息的速度。雖然自動應(yīng)答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的消息的數(shù)量也會增加,從而增加了消費(fèi)者的 RAM 消耗(隨機(jī)存取存儲器),應(yīng)該小心使用具有無限預(yù)處理的自動確認(rèn)模式或手動確認(rèn)模式,消費(fèi)者消費(fèi)了大量的消息如果沒有確認(rèn)的話,會導(dǎo)致消費(fèi)者連接節(jié)點(diǎn)的內(nèi)存消耗變大,所以找到合適的預(yù)取值是一個反復(fù)試驗(yàn)的過程。

????????不同的負(fù)載預(yù)取值也不同, 100 到 300 范圍內(nèi)的值通??商峁┳罴训耐掏铝?,并且不會給消費(fèi)者帶來太大的風(fēng)險。預(yù)取值為 1 是最保守的。當(dāng)然這將使吞吐量變得很低,特別是消費(fèi)者連接延遲很嚴(yán)重的情況下,特別是在消費(fèi)者連接等待時間較長的環(huán)境中。對于大多數(shù)應(yīng)用來說,稍微高一點(diǎn)的值將是最佳的。

????????說白了就是根據(jù)你的消費(fèi)者的性能優(yōu)劣,動態(tài)的去設(shè)置?channel ,性能好的,處理速度快的設(shè)置大些,性能不好的設(shè)置小些,通過?channel.basicQos 參數(shù)進(jìn)行設(shè)置,默認(rèn)等于 0 表示輪詢,等于 1 表示不公平分發(fā),設(shè)置成其他值則表示預(yù)約值,即消息緩沖區(qū)的大小。

三、發(fā)布確認(rèn)

3.1 發(fā)布確認(rèn)概念

? ? ? ? 我們在前面的幾個章節(jié)說過,為了防止數(shù)據(jù)丟失,我們可以將隊列和消息都進(jìn)行持久化的操作。但是,這還是不夠好的,如果生產(chǎn)者再向 rabbitmq 發(fā)送消息,而 rabbitmq 還沒來得及存儲到磁盤的時候崩了,消息就丟失了,這個時候該怎么辦?此時就引入了發(fā)布確認(rèn)的概念,即生產(chǎn)者向 rabbitmq 發(fā)送消息,而 rabbitmq 給生產(chǎn)者個反饋,無論是否可以成功的接收到消息,都給一個反饋,這樣就比較合理了。

3.2 發(fā)布確認(rèn)策略

3.2.1 開啟發(fā)布確認(rèn)

???發(fā)布確認(rèn)默認(rèn)是沒有開啟的,如果要開啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布確認(rèn),都需要在 channel 上調(diào)用該方法。

// 只需要在創(chuàng)建完 channel 信道之后創(chuàng)建即可

channel.confirmSelect();

3.2.2 單個確認(rèn)發(fā)布

????????這是一種最簡單的同步確認(rèn)發(fā)布的方式,只有前面發(fā)送的消息被確認(rèn)發(fā)布了,后續(xù)的消息才能繼續(xù)發(fā)布。

????????這種確認(rèn)方式有一個最大的缺點(diǎn)就是:發(fā)布速度特別的慢,因?yàn)槿绻麤]有確認(rèn)發(fā)布的消息就會阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對于某些應(yīng)用程序來說這可能已經(jīng)足夠了。

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task01 {

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個隊列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName,false,false,false,null);

// 開啟發(fā)布確認(rèn)

channel.confirmSelect();

long begin = System.currentTimeMillis();

for(int i=0;i<1000;i++){

String message = i +"";

channel.basicPublish("",queueName,null,message.getBytes());

// 單個消息馬上進(jìn)行發(fā)布確認(rèn),可以通過返回值進(jìn)行判斷是否通知成功

boolean flag = channel.waitForConfirms();

}

long end = System.currentTimeMillis();

System.out.println("發(fā)布1000個單獨(dú)確認(rèn)消息,耗時:"+(end-begin)+"ms");

}

}

3.2.3 批量確認(rèn)發(fā)布

????????上面那種方式非常慢,與單個等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,不知道是哪個消息出現(xiàn)問題了,我們必須將整個批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布。

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task02 {

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個隊列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName, false, false, false, null);

// 開啟發(fā)布確認(rèn)

channel.confirmSelect();

// 批量確認(rèn)消息大小

int batchSize = 100;

// 未確認(rèn)消息個數(shù)

int outstandingMessageCount = 0;

long begin = System.currentTimeMillis();

for (int i = 0; i < 1000; i++) {

String message = i + "";

channel.basicPublish("", queueName, null, message.getBytes());

outstandingMessageCount++;

if (outstandingMessageCount == batchSize) {

channel.waitForConfirms();

outstandingMessageCount = 0;

}

}

long end = System.currentTimeMillis();

System.out.println("發(fā)布1000個批量確認(rèn)消息,耗時:" + (end - begin) + "ms");

}

}

3.2.4 異步確認(rèn)發(fā)布

????????異步確認(rèn)雖然編程邏輯比上兩個要復(fù)雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的,這個中間件也是通過函數(shù)回調(diào)來保證是否投遞成功,下面就讓我們來詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。

? ? ? ? 如下圖,首先消息生產(chǎn)者發(fā)送消息到隊列中(信道),數(shù)據(jù)在信道中的存儲類型類似于 map ,key 存儲消息序號,value 存儲具體的消息內(nèi)容。在信道里面會根據(jù) key 為消息排列順序,這樣的好處是消息推送的成功與否完全可以根據(jù) key 來識別出來。

? ? ? ?broker 為 rabbitmq 的消息實(shí)體,當(dāng)它接收到 1 號消息的時候,它就會進(jìn)行一次確認(rèn)收到的回調(diào)函數(shù),告訴消息生產(chǎn)者消息我收到了,如果沒有收到 1 號消息,它就會調(diào)用未收到消息的回調(diào)函數(shù)通知生產(chǎn)者,消息我沒有收到。

? ? ? ? 所以作為消息生產(chǎn)者,我只管一直發(fā)送消息即可,將來會由 broker 告訴我哪些消息接收到了,哪些消息沒有接收到。只需要將沒接收到的消息重新發(fā)送即可。收到的不做任何處理。且 broker 的通知類型為異步,速度會很快。

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmCallback;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task03 {

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個隊列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName, false, false, false, null);

// 開啟發(fā)布確認(rèn)

channel.confirmSelect();

// 開始的時間

long begin = System.currentTimeMillis();

// 消息確認(rèn)成功的回調(diào)函數(shù)

ConfirmCallback ackCallback = (deliveryTag,multiple)->{

System.out.println("確認(rèn)的消息:"+deliveryTag);

};

// 消息確認(rèn)失敗的回調(diào)函數(shù)

// 第一個參數(shù):消息的標(biāo)記。第二個參數(shù):是否為批量確認(rèn)

ConfirmCallback nackCallback = (deliveryTag,multiple)->{

System.out.println("未確認(rèn)的消息:"+deliveryTag);

};

// 添加一個消息的監(jiān)聽器,用于監(jiān)聽哪些消息成功了,哪些消息失敗了

// 異步通知

// 第一個參數(shù):監(jiān)聽哪些消息成功了。第二個參數(shù):監(jiān)聽哪些消息失敗了

channel.addConfirmListener(ackCallback,nackCallback);

// 批量發(fā)送消息

for(int i=0;i<1000;i++){

String message = i+"";

channel.basicPublish("",queueName,null,message.getBytes());

}

// 結(jié)束的時間

long end = System.currentTimeMillis();

System.out.println("發(fā)布1000個異步確認(rèn)消息,耗時:" + (end - begin) + "ms");

}

}

3.2.5 處理異步未確認(rèn)消息

????????最好的解決的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存且能被發(fā)布線程訪問的隊列,比如說用 ConcurrentLinkedQueue, 這個隊列可以在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞。

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmCallback;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

import java.util.concurrent.ConcurrentNavigableMap;

import java.util.concurrent.ConcurrentSkipListMap;

public class Task03 {

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個隊列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName, false, false, false, null);

// 開啟發(fā)布確認(rèn)

channel.confirmSelect();

/** 創(chuàng)建一個線程安全有序的哈希表,適用于高并發(fā)的情況

* 1、將序號與消息進(jìn)行關(guān)聯(lián)

* 2、可以根據(jù)需要批量的刪除消息條目

* 3、支持高并發(fā)

*/

ConcurrentSkipListMap outstandingConfirms =new ConcurrentSkipListMap<>();

// 開始的時間

long begin = System.currentTimeMillis();

// 消息確認(rèn)成功的回調(diào)函數(shù)

ConfirmCallback ackCallback = (deliveryTag,multiple)->{

if(multiple){

// 第二步:刪除掉已經(jīng)確認(rèn)的消息,剩下的就是未確認(rèn)的消息

ConcurrentNavigableMap confirmed = outstandingConfirms.headMap(deliveryTag);

confirmed.clear();

}else{

outstandingConfirms.remove(deliveryTag);

}

System.out.println("確認(rèn)的消息:"+deliveryTag);

};

// 消息確認(rèn)失敗的回調(diào)函數(shù)

// 第一個參數(shù):消息的標(biāo)記。第二個參數(shù):是否為批量確認(rèn)

ConfirmCallback nackCallback = (deliveryTag,multiple)->{

// 第三步:打印未確認(rèn)的消息有哪些

String message = outstandingConfirms.get(deliveryTag);

System.out.println("未確認(rèn)的消息是:"+message+":::未確認(rèn)消息的標(biāo)記為:"+deliveryTag);

};

// 添加一個消息的監(jiān)聽器,用于監(jiān)聽哪些消息成功了,哪些消息失敗了

// 異步通知

// 第一個參數(shù):監(jiān)聽哪些消息成功了。第二個參數(shù):監(jiān)聽哪些消息失敗了

channel.addConfirmListener(ackCallback,nackCallback);

// 批量發(fā)送消息

for(int i=0;i<1000;i++){

String message = i+"";

// 第一步:記錄下所有要發(fā)送的消息

outstandingConfirms.put(channel.getNextPublishSeqNo(), message);

channel.basicPublish("",queueName,null,message.getBytes());

}

// 結(jié)束的時間

long end = System.currentTimeMillis();

System.out.println("發(fā)布1000個異步確認(rèn)消息,耗時:" + (end - begin) + "ms");

}

}

3.2.6 三種發(fā)布確認(rèn)速度對比

????????單獨(dú)發(fā)布消息:同步等待確認(rèn),簡單,但吞吐量非常有限。

????????批量發(fā)布消息:批量同步等待確認(rèn),簡單,合理的吞吐量,一旦出現(xiàn)問題但很難推斷出是那條消息出現(xiàn)了問題。

????????異步發(fā)布處理:最佳性能和資源使用,在出現(xiàn)錯誤的情況下可以很好地控制,但是實(shí)現(xiàn)起來稍微難些。

四、Exchanges 交換機(jī)

????????我們在前面講解了 Hello World 模式 和?Work Queues 模式,這兩種都屬于簡單的隊列模式,即生產(chǎn)者將消息直接推送到 rabbitmq 的隊列里面,消費(fèi)者直接從隊列里面競爭消費(fèi)。

? ? ? ? 我們接下來要講解的模式為”發(fā)布/訂閱“,即生產(chǎn)者生產(chǎn)完消息,只有它的訂閱者才可以獲取消息,想要使用這種模式就需要先認(rèn)識一下交換機(jī)。

4.1 Exchanges

4.1.1 Exchanges 概念

????????RabbitMQ 消息傳遞模型的核心思想是:生產(chǎn)者生產(chǎn)的消息不會直接發(fā)送到隊列。而實(shí)際上生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。

????????相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡單,一方面接收來自生產(chǎn)者的消息,另一方面將它們推入隊列。交換機(jī)必須清楚的知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊列、還是說把他們放到許多隊列中、還是說應(yīng)該丟棄它們。這就需要由交換機(jī)的類型來決定。

4.1.2 Exchanges 類型

????????總共包含以下幾種類型:直連交換機(jī)(direct)、主題交換機(jī)(topic)、頭交換機(jī)(headers)、扇出交換機(jī)(fanout)。

4.1.3 無名 Exchanges

????????在學(xué)習(xí)本章節(jié)之前我們對 exchange 一無所知,但仍然能夠?qū)⑾l(fā)送到隊列。之前能實(shí)現(xiàn)的原因是因?yàn)槲覀兪褂玫氖悄J(rèn)交換機(jī),是通過空字符串 (“”) 進(jìn)行標(biāo)識。

# 第一個參數(shù)是交換機(jī)的名稱。空字符串表示默認(rèn)或無名稱交換機(jī)

channel.basicPublish("",queueName,null,message.getBytes());

????????這個 AMQP default 就是默認(rèn)的交換機(jī),他的類型是直連交換機(jī)。?

4.2 臨時隊列

????????我們在前兩章的學(xué)習(xí)中創(chuàng)建了 hello 隊列和 ack_queue 隊列,這些都是指定名稱的隊列,隊列的名稱很重要,因?yàn)槲覀冃枰鶕?jù)隊列的名稱來指定消費(fèi)者去哪里消費(fèi)。

? ? ? ? 在實(shí)際的生產(chǎn)環(huán)境中,我們一般都會創(chuàng)建一個隨機(jī)名稱的臨時隊列,它的特點(diǎn)是:當(dāng)消費(fèi)者斷開連接之后,該隊列會被自動刪除,創(chuàng)建臨時隊列的代碼如下:

String queueName = channel.queueDeclare().getQueue();

????????創(chuàng)建之后的樣子如下:?

4.3 bindings 綁定

????????binding 是 exchange 和 queue 之間的橋梁,它告訴?exchange 和哪個隊列進(jìn)行了綁定。比如說下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進(jìn)行了綁定。

4.4 Fanout exchange?扇出交換機(jī)

4.4.1 簡介

? ? ? ? 路由廣播的形式,這個交換機(jī)沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機(jī)在接收到消息后,會直接轉(zhuǎn)發(fā)到綁定到它上面的所有隊列。

? ? ? ? 路由鍵就是 RoutingKey,即扇出交換機(jī)不需要配置?RoutingKey,只需要將交換機(jī)和隊列綁定即可。

4.4.2 Fanout 實(shí)戰(zhàn)

? ? ? ? 我們根據(jù)下圖來編寫代碼實(shí)現(xiàn)下:EmitLog 作為生產(chǎn)者,將消息發(fā)送到名字為 logs 的?fanout 交換機(jī)上,而交換機(jī)綁定了兩個隊列,隊列里面的消息又分別被 ReceiveLogs01 和?ReceiveLogs02 接收。

????????編寫兩個消費(fèi)者的代碼,如下所示:

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs01 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為 fanout

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

// 聲明一個臨時隊列,隊列名稱隨機(jī),好處是當(dāng)消費(fèi)者與隊列斷開連接的時候,隊列就自動刪除了

String queueName = channel.queueDeclare().getQueue();

// 交換機(jī)和隊列進(jìn)行綁定,第三個參數(shù) routingKey,有沒有值都是被無視的

channel.queueBind(queueName,EXCHANGE_NAME,"");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("控制臺打印接收到的消息"+message);

};

channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

}

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs02 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為 fanout

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

// 聲明一個臨時隊列,隊列名稱隨機(jī),好處是當(dāng)消費(fèi)者與隊列斷開連接的時候,隊列就自動刪除了

String queueName = channel.queueDeclare().getQueue();

// 交換機(jī)和隊列進(jìn)行綁定,第三個參數(shù) routingKey,有沒有值都是被無視的

channel.queueBind(queueName,EXCHANGE_NAME,"");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("控制臺打印接收到的消息"+message);

};

channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

}

}

????????編寫生產(chǎn)者的代碼,如下所示,作為生產(chǎn)者,只需要知道將消息發(fā)送給哪個交換機(jī)即可。

// 負(fù)責(zé)發(fā)送消息給交換機(jī)

public class EmitLog {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

for(int i=0;i<10;i++) {

String message = i + "";

channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());

System.out.println("生產(chǎn)者發(fā)送的消息為:"+message);

}

}

}

????????分別啟動兩個消費(fèi)者和生產(chǎn)者,結(jié)果如下所示,可以發(fā)現(xiàn)兩個消費(fèi)者都分別獲取到了生產(chǎn)者生產(chǎn)的消息。

4.5?Direct exchange 直連交換機(jī)

4.5.1 簡介

????????RabbitMQ?默認(rèn)的交換機(jī)模式,也是最簡單的模式。初始化時隊列綁定到一個直連交換機(jī)上,同時賦予一個路由鍵?BindingKey。當(dāng)發(fā)送者發(fā)送消息的時候它會攜帶著路由值?Key。當(dāng)?Key?和消息隊列的?BindingKey?一致的時候,消息將會被發(fā)送到該消息隊列中。

4.5.2 多重綁定

????????在下面這張圖中,我們可以看到 X 交換機(jī)綁定了兩個隊列,綁定類型是 direct。隊列 Q1 綁定鍵為 orange,隊列 Q2 綁定鍵有兩個:一個綁定鍵為 black,另一個綁定鍵為 green。

????????在這種綁定情況下,生產(chǎn)者發(fā)布消息到 exchange 上,綁定鍵為 orange 的消息會被發(fā)布到隊列 Q1。綁定鍵為 black 和 green 的消息會被發(fā)布到隊列 Q2,其他消息類型的消息將被丟棄。

4.5.3 Direct 實(shí)戰(zhàn)

? ? ? ? 我們根據(jù)下圖來編寫代碼實(shí)現(xiàn)下:P?作為生產(chǎn)者,將消息發(fā)送到名字為 X?的 direct?交換機(jī)上,交換機(jī) X 根據(jù) RoutingKey 為 error 綁定 disk 隊列、交換機(jī) X 根據(jù) RoutingKey 為 info 和 warning?綁定 console?隊列, disk 隊列里面的消息被 C2 接收,console 隊列里面的消息被 C1?接收。

????????編寫兩個消費(fèi)者的代碼,如下所示:

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs01 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="direct_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為直連

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 聲明一個名字為 console 的隊列

channel.queueDeclare("console",false,false,false,null);

// 交換機(jī)和隊列進(jìn)行綁定,routingKey 為 info

channel.queueBind("console",EXCHANGE_NAME,"info");

// 針對同一個隊列,再寫一行即可實(shí)現(xiàn)多重綁定,routingKey 為 warning

channel.queueBind("console",EXCHANGE_NAME,"warning");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("控制臺打印接收到的消息"+message);

};

channel.basicConsume("console",true,deliverCallback,consumerTag ->{});

}

}

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs02 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="direct_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為直連

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 聲明一個名字為 disk 的隊列

channel.queueDeclare("disk",false,false,false,null);

// 交換機(jī)和隊列進(jìn)行綁定,routingKey 為 error

channel.queueBind("disk",EXCHANGE_NAME,"error");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("控制臺打印接收到的消息"+message);

};

channel.basicConsume("disk",true,deliverCallback,consumerTag ->{});

}

}

????????編寫生產(chǎn)者的代碼,如下所示,作為生產(chǎn)者,只需要知道將消息發(fā)送給哪個交換機(jī)即可。

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

// 負(fù)責(zé)發(fā)送消息給交換機(jī)

public class DirectLogs {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="direct_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

for(int i=0;i<10;i++) {

String message = i + "";

// 程序需要運(yùn)行三遍,將數(shù)據(jù)分別發(fā)送給不同的 Routing Key 的交換機(jī)

channel.basicPublish(EXCHANGE_NAME,"error",null, message.getBytes());

// channel.basicPublish(EXCHANGE_NAME,"info",null, message.getBytes());

// channel.basicPublish(EXCHANGE_NAME,"warning",null, message.getBytes());

System.out.println("生產(chǎn)者發(fā)送的消息為:"+message);

}

}

}

????????分別啟動兩個消費(fèi)者和生產(chǎn)者,結(jié)果如下所示,可以發(fā)現(xiàn)兩個消費(fèi)者都分別獲取到了生產(chǎn)者生產(chǎn)的消息。

4.6 Topics 主題交換機(jī)

4.6.1 簡介

????????主題交換機(jī),轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符 + 字符串),而當(dāng)發(fā)送消息的時候,只有指定的 Key 和該模式相匹配的時候,消息才會被發(fā)送到該消息隊列中。

4.6.2 要求

????????topic 交換機(jī)的 routing_key 不能隨意寫,它必須是一個單詞列表,以點(diǎn)號分隔開。這些單詞可以是任意單詞,比如說:"stock.usd.nyse","nyse.vmw",?"quick.orange.rabbit" 這種類型的。當(dāng)然這個單詞列表最多不能超過 255 個字節(jié)。

? ? ? ? * 可以代替一個單詞。

? ? ? ? # 可以代替零個或多個單詞。

4.6.3 Topic 實(shí)戰(zhàn)

? ? ? ? 下圖是一個隊列綁定關(guān)系圖,我們可以發(fā)現(xiàn) Q1 隊列綁定的是:一共有?3 個單詞且中間是?orange 的字符串。Q2 隊列綁定的是:最后一個單詞是 rabbit 的 3 個單詞和第一個單詞是 lazy 的多個單詞。

????????我們分別測試底下這些 routing_key 和我們預(yù)想的結(jié)果是否一致。?

Routing key綁定關(guān)系quick.orange.rabbit被隊列 Q1 和 Q2 接收到lazy.orange.elephant被隊列 Q1 和 Q2 接收到quick.orange.fox被隊列 Q1 接收到lazy.brown.fox被隊列 Q2 接收到lazy.pink.rabbit雖然滿足兩個綁定但只被隊列 Q2 接收一次quick.brown.fox不匹配任何綁定會被丟棄quick.orange.male.rabbit不匹配任何綁定會被丟棄lazy.orange.male.rabbit被隊列 Q2 接收到

????????兩個消費(fèi)者的代碼如下所示:

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogsTopic01 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="topic_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為直連

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 聲明一個名字為 Q1 的隊列

String queueName = "Q1";

channel.queueDeclare(queueName,false,false,false,null);

// 交換機(jī)和隊列進(jìn)行綁定,routingKey 為 *.orange.*

channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("接收隊列:"+queueName+"綁定的鍵值為:"+delivery.getEnvelope().getRoutingKey()+"::消息為:"+message);

};

channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

}

}

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogsTopic02 {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME="topic_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個交換機(jī),類型為直連

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 聲明一個名字為 Q2 的隊列

String queueName = "Q2";

channel.queueDeclare(queueName,false,false,false,null);

// 交換機(jī)和隊列進(jìn)行綁定,routingKey 為 *.orange.*

channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");

// 針對同一個隊列,再寫一行即可實(shí)現(xiàn)多重綁定,routingKey 為 lazy.#

channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("接收隊列:"+queueName+"綁定的鍵值為:"+delivery.getEnvelope().getRoutingKey()+"::消息為:"+message);

};

channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

}

}

????????編寫生產(chǎn)者的代碼,如下所示,作為生產(chǎn)者,只需要知道將消息發(fā)送給哪個交換機(jī)即可。

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;

import java.util.Map;

// 負(fù)責(zé)發(fā)送消息給交換機(jī)

public class TopicLogs {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {

Map map = new HashMap<>();

map.put("quick.orange.rabbit", "被隊列 Q1 和 Q2 接收到");

map.put("lazy.orange.elephant", "被隊列 Q1 和 Q2 接收到");

map.put("quick.orange.fox", "被隊列 Q1 接收到");

map.put("lazy.brown.fox", "被隊列 Q2 接收到");

map.put("lazy.pink.rabbit", "雖然滿足兩個綁定但只被隊列 Q2 接收一次");

map.put("quick.brown.fox", "不匹配任何綁定會被丟棄");

map.put("quick.orange.male.rabbit", "不匹配任何綁定會被丟棄");

map.put("lazy.orange.male.rabbit", "被隊列 Q2 接收到");

Channel channel = RabbitMqUtils.getChannel();

for(Map.Entry entry:map.entrySet()){

String bindingKey = entry.getKey();

String message = entry.getValue();

channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes());

System.out.println("生產(chǎn)者發(fā)出消息:"+message);

}

}

}

????????結(jié)果如下所示,是可以和表格里面的數(shù)據(jù)對上的。

五、死信隊列

5.1 概念

????????死信,顧名思義就是無法被消費(fèi)的消息。一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時候由于特定的原因?qū)е?queue 中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。

????????應(yīng)用場景:比如為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時,將消息投入死信隊列中。還比如說: 用戶在商城下單成功并點(diǎn)擊去支付后在指定時間未支付時自動失效。

5.2 來源

? ? ? ? 來源一:消息 TTL 過期(time to live)

? ? ? ? 來源二:隊列達(dá)到最大長度(隊列滿了,無法再添加數(shù)據(jù)到 mq 中)

? ? ? ? 來源三:消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false

5.3 死信實(shí)戰(zhàn)

5.3.1 代碼架構(gòu)圖

? ? ? ? 如下圖所示,Producer 生產(chǎn)者發(fā)送消息到類型為 direct 的 normal_exchange 交換機(jī)上,而交換機(jī)又通過 routingKey 為 zhangsan 和 normal-queue 隊列進(jìn)行綁定,消息最終被消費(fèi)者 C1 消費(fèi)。

????????當(dāng) normal-queue 隊列滿足 消息被拒絕、消息 TTL 過期或者隊列達(dá)到最大長度的這三個條件之一時,會將消息轉(zhuǎn)發(fā)到一個類型為 direct 的 dead_exchange 交換機(jī)上,而 dead_exchange 又通過?routingKey 為 lisi 和 dead-queue 隊列進(jìn)行綁定,最終消息被 C2 消費(fèi)者消費(fèi)。

5.3.2 消息 TTL 過期

????????根據(jù)上面的架構(gòu)圖,我們看下滿足 消息TTL 過期 條件觸發(fā)的死信隊列該如何實(shí)現(xiàn)。消費(fèi)者?Consumer01和消費(fèi)者?Consumer02 代碼如下所示:

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;

import java.util.Map;

public class Consumer01 {

// 設(shè)置普通交換機(jī)的名字

public static final String NORMAL_EXCHANGE="normal_exchange";

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置普通隊列的名字

public static final String NORMAL_QUEUE="normal_queue";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個普通交換機(jī),類型為直連

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

Map arguments = new HashMap<>();

// 正常隊列設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);

// 設(shè)置死信 RoutingKey

arguments.put("x-dead-letter-routing-key","lisi");

// 設(shè)置過期時間,單位為 ms,一般情況下不在這個地方配置,都在生產(chǎn)者代碼里配置

// arguments.put("x-message-ttl",10000);

// 聲明一個普通隊列

channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定普通交換機(jī)和普通隊列

channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer01 接收的消息是:"+message);

};

channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag ->{});

}

}

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Consumer02 {

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer02 接收的消息是:"+message);

};

channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag ->{});

}

}

????????生產(chǎn)者的代碼如下所示:

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

public class Producer {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME = "normal_exchange";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 死信消息,設(shè)置 TTL 時間,單位為 ms

AMQP.BasicProperties properties =

new AMQP.BasicProperties()

.builder().expiration("10000").build();

for(int i=1;i<11;i++) {

String message = "info"+i;

channel.basicPublish(EXCHANGE_NAME,"zhangsan",properties, message.getBytes());

System.out.println("生產(chǎn)者發(fā)送的消息為:"+message);

}

}

}

????????首先啟動消費(fèi)者?Consumer01 用于創(chuàng)建交換機(jī)和隊列,可以通過 rabbitmq 的管理界面查看是否創(chuàng)建成功,如下所示:

????????查看下死信隊列和普通隊列的綁定關(guān)系,如下圖所示:

????????此時為了模擬消息 TTL 過期的場景,需要先關(guān)閉消費(fèi)者?Consumer01,執(zhí)行生產(chǎn)者 Producer 的代碼。

? ? ? ? 執(zhí)行完生產(chǎn)者之后,可以看到數(shù)據(jù)都存儲在 normal_queue 隊列里面等待消費(fèi)者的消費(fèi)。?

?????????等待 10s 之后,再次查看 rabbitmq 的管理界面,可以發(fā)現(xiàn)消息被存放到了死信隊列里面,如下圖。

????????此時執(zhí)行消費(fèi)者?Consumer02 ,如下圖,可以看到,消息都被死信隊列進(jìn)行接收處理了。

5.3.3 隊列達(dá)到最大長度

????????首先,先在?rabbitmq 的管理界面刪除掉我們上一節(jié)創(chuàng)建的兩個交換機(jī)和隊列,它的存在會對我們接下來的測試有影響。

? ? ? ? 需要先將消費(fèi)者 Producer 代碼去掉 TTL 屬性,如下所示:

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

public class Producer {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME = "normal_exchange";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

for(int i=1;i<11;i++) {

String message = "info"+i;

channel.basicPublish(EXCHANGE_NAME,"zhangsan",null, message.getBytes());

System.out.println("生產(chǎn)者發(fā)送的消息為:"+message);

}

}

}

????????修改 Customer01 的代碼如下所示,僅僅添加了一個隊列長度的限制。而 Customer02 的代碼不需要改動。

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;

import java.util.Map;

public class Consumer01 {

// 設(shè)置普通交換機(jī)的名字

public static final String NORMAL_EXCHANGE="normal_exchange";

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置普通隊列的名字

public static final String NORMAL_QUEUE="normal_queue";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個普通交換機(jī),類型為直連

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

Map arguments = new HashMap<>();

// 正常隊列設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);

// 設(shè)置死信 RoutingKey

arguments.put("x-dead-letter-routing-key","lisi");

// 設(shè)置正常隊列的長度限制

arguments.put("x-max-length",6);

// 聲明一個普通隊列

channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定普通交換機(jī)和普通隊列

channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer01 接收的消息是:"+message);

};

channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag ->{});

}

}

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Consumer02 {

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer02 接收的消息是:"+message);

};

channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag ->{});

}

}

? ? ? ? 還是老樣子,先啟動?Customer01,然后關(guān)掉?Customer01 ,啟動生產(chǎn)者 Producer,如下可以看到,一共發(fā)送了 10 條消息,其中 6 條消息在 normal_queue 里面,剩下的 4 條消息在 dead_queue 里面。

????????再次啟動 Customer01 和 Customer02 ,他們分別消費(fèi)了自己隊列里面的消息,如下,隊列的特點(diǎn)是先進(jìn)先出,所以將前面存儲的消息擠進(jìn)了死信隊列。

5.3.4 消息被拒絕

????????首先,先在?rabbitmq 的管理界面刪除掉我們上一節(jié)創(chuàng)建的兩個交換機(jī)和隊列,它的存在會對我們接下來的測試有影響。

? ? ? ? 消息的生產(chǎn)者 Producer 和消費(fèi)者 Customer02 的代碼不變,如下所示:

import com.rabbitmq.client.Channel;

import com.rabbitmq.utils.RabbitMqUtils;

public class Producer {

// 設(shè)置交換機(jī)的名字

public static final String EXCHANGE_NAME = "normal_exchange";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

for(int i=1;i<11;i++) {

String message = "info"+i;

channel.basicPublish(EXCHANGE_NAME,"zhangsan",null, message.getBytes());

System.out.println("生產(chǎn)者發(fā)送的消息為:"+message);

}

}

}

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

public class Consumer02 {

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者2等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer02 接收的消息是:"+message);

};

channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag ->{});

}

}

????????修改生產(chǎn)者 Customer01 的代碼如下所示,修改為手動應(yīng)答,并拒絕指定的消息。

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;

import java.util.Map;

public class Consumer01 {

// 設(shè)置普通交換機(jī)的名字

public static final String NORMAL_EXCHANGE="normal_exchange";

// 設(shè)置死信交換機(jī)的名字

public static final String DEAD_EXCHANGE="dead_exchange";

// 設(shè)置普通隊列的名字

public static final String NORMAL_QUEUE="normal_queue";

// 設(shè)置死信隊列的名字

public static final String DEAD_QUEUE="dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

// 聲明一個普通交換機(jī),類型為直連

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明一個死信普通交換機(jī),類型為直連

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

Map arguments = new HashMap<>();

// 正常隊列設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);

// 設(shè)置死信 RoutingKey

arguments.put("x-dead-letter-routing-key","lisi");

// 聲明一個普通隊列

channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

// 聲明一個死信隊列

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

// 綁定普通交換機(jī)和普通隊列

channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

// 綁定死信交換機(jī)和死信隊列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("消費(fèi)者1等待接收消息.....");

// 接收消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

if(message.equals("info5")){

System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");

// 第二個參數(shù) request 設(shè)置為 false,表示拒絕重新入隊,該隊列如果設(shè)置了死信交換機(jī)則將發(fā)送到死信隊列中

channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);

}else{

System.out.println("Consumer01 接收到消息"+message);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

};

boolean autoAck = false;

channel.basicConsume(NORMAL_QUEUE,autoAck,deliverCallback,consumerTag ->{});

}

}

????????首先啟動消費(fèi)者 Customer01,然后關(guān)閉??Customer01,啟動生產(chǎn)者 Producer,最后分別啟動生產(chǎn)者 Customer01 和 Customer02,如下所示:

六、延遲隊列

6.1 概念

????????延時隊列其實(shí)就是死信隊列當(dāng)中的?“消息TTL 過期” 這一種,這個可能不是太好理解,我們還是分析下我們前面講過的那張架構(gòu)圖,如下。

? ? ? ? 生產(chǎn)者發(fā)送消息給 normal_exchange 交換機(jī),而交換機(jī)把數(shù)據(jù)放到 normal-queue 里面,如果消費(fèi)者 C1 始終無法工作的話,一旦消息過期,就肯定會成為死信,最終被 C2 消費(fèi)。

? ? ? ? 這里面就存在一個問題,生產(chǎn)者生產(chǎn)消息到被 C2 消費(fèi)一共經(jīng)歷了多長時間?我們當(dāng)初配置的是 10s ,即生產(chǎn)者生產(chǎn)完消息 10s 之后才被轉(zhuǎn)發(fā)到消費(fèi)者 C2 這里進(jìn)行消費(fèi)。

? ? ? ? 站在生產(chǎn)者和 C2 的角度上來看,它們兩個經(jīng)歷的時間就是 10s,其實(shí)這個 10s 就是一個延遲。假設(shè) C1 永久消失,那么對于生產(chǎn)者和 C2 來說,他們之間的消息就是延遲消息,延遲為 10s。下圖就是沒有 C1 消費(fèi)者了,只有 C2 消費(fèi)者。

? ? ? ? 延遲隊列,隊列內(nèi)部是有序的,最重要的特點(diǎn)是體現(xiàn)在它的延遲屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單的說,延遲隊列就是用來存放需要在指定時間處理元素的隊列。

6.2 使用場景

? ? ? ? 1、訂單在十分鐘之內(nèi)未支付則自動取消

? ? ? ? 2、新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。

? ? ? ? 3、用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。

? ? ? ? 4、用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。

? ? ? ? 5、預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前十分鐘通知各個與會人員參加會議

? ? ? ? 以上的這些場景都有一個特點(diǎn),需要在某個事件發(fā)生之后或者之前的指定時間點(diǎn)完成某一項任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;看起來似乎使用定時任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對于“如果賬單一周內(nèi)未支付則進(jìn)行自動結(jié)算”這樣的需求,如果對于時間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個可行的方案。

????????但對于數(shù)據(jù)量比較大,并且時效性較強(qiáng)的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會有很多,活動期間甚至?xí)_(dá)到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。所以延遲隊列就是我們目前最好的選擇。

6.3 整合 springboot

6.3.1 創(chuàng)建項目

6.3.2 添加依賴

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.3.11.RELEASE

com

springboot-rabbitmq

0.0.1-SNAPSHOT

springboot-rabbitmq

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

com.alibaba

fastjson

1.2.47

org.projectlombok

lombok

io.springfox

springfox-swagger2

2.9.2

io.springfox

springfox-swagger-ui

2.9.2

org.springframework.amqp

spring-rabbit-test

test

org.springframework.boot

spring-boot-maven-plugin

6.3.3 修改配置文件

spring.rabbitmq.host=192.168.229.149

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=123

6.4 隊列 TTL

6.4.1 代碼架構(gòu)圖

? ? ? ? 我們的前期準(zhǔn)備工作已經(jīng)準(zhǔn)備完了,接下來我們根據(jù)底下的這個代碼架構(gòu)圖來實(shí)現(xiàn)下延遲隊列,如下所示。

????????消費(fèi)者 P 將消息推送給類型為 direct 的 X 交換機(jī),X 交換機(jī)通過 routingKey 為 XA 綁定 QA 隊列,并設(shè)置消息存活時間為 10s;X 交換機(jī)通過 routingKey 為 XB?綁定 QB?隊列,并設(shè)置消息存活時間為 40s。當(dāng) QA 隊列里面的消息超過存活時間之后,會將消息轉(zhuǎn)發(fā)給類型為 direct 的 Y 交換機(jī),routingKey 為 YD。當(dāng) QB?隊列里面的消息超過存活時間之后,也會將消息轉(zhuǎn)發(fā)給類型為 direct 的?Y 交換機(jī),routingKey 也為 YD。Y?交換機(jī)通過 routingKey 為 YD?綁定 QD?隊列,最終被消費(fèi)者 C 消費(fèi)。

6.4.2 配置文件類代碼

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class TtlQueueConfig {

// 普通交換機(jī)的名稱

public static final String X_EXCHANGE ="X";

// 死信交換機(jī)的名稱

public static final String Y_DEAD_LETTER__EXCHANGE ="Y";

// 普通隊列的名稱

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

// 死信隊列的名稱

public static final String DEAD_LETTER_QUEUE_D = "QD";

// 聲明 xExchange

@Bean("xExchange")

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

// 聲明 yExchange

@Bean("yExchange")

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER__EXCHANGE);

}

// 聲明普通隊列A,ttl 為 10s

@Bean("queueA")

public Queue queueA(){

Map arguments = new HashMap<>();

// 設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER__EXCHANGE);

// 設(shè)置死信 routingKey

arguments.put("x-dead-letter-routing-key","YD");

// 設(shè)置過期時間,單位為 ms

arguments.put("x-message-ttl",10000);

return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

}

// 聲明普通隊列B,ttl 為 40s

@Bean("queueB")

public Queue queueB(){

Map arguments = new HashMap<>();

// 設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER__EXCHANGE);

// 設(shè)置死信 routingKey

arguments.put("x-dead-letter-routing-key","YD");

// 設(shè)置過期時間,單位為 ms

arguments.put("x-message-ttl",40000);

return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

}

// 聲明死信隊列

@Bean("queueD")

public Queue queueD(){

return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueABindingX(@Qualifier("queueA") Queue queueA,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueB).to(xExchange).with("XB");

}

// 將死信隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,

@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

}

6.4.3 生產(chǎn)者代碼

// 發(fā)送延遲消息

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{message}")

public void sendMsg(@PathVariable String message){

log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊列:{}", new Date(), message);

rabbitTemplate.convertAndSend("X","XA","我是來自 ttl 為10s的數(shù)據(jù):"+message);

rabbitTemplate.convertAndSend("X","XB","我是來自 ttl 為40s的數(shù)據(jù):"+message);

}

}

6.4.4 消費(fèi)者代碼

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.util.Date;

// 隊列 ttl 的消費(fèi)者

@Slf4j

@Component

public class DeadLetterConsumer {

// 接收消息

@RabbitListener(queues="QD")

public void receiveD(Message message, Channel channel) throws Exception{

String msg = new String(message.getBody());

log.info("當(dāng)前時間:{},收到死信隊列信息{}", new Date().toString(), msg);

}

}

????????發(fā)送一個請求:http://localhost:8080/ttl/sendMsg/我愛你?

????????第一條消息在 10S 后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在 40S 之后變成了死信消息,然后被消費(fèi)掉,這樣一個延時隊列就打造完成了。?

????????不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有 10S 和 40S 兩個時間選項,如果需要一個小時后處理,那么就需要增加 TTL 為一個小時的隊列,如果是預(yù)定會議室然后提前通知這樣的場景,豈不是要增加無數(shù)個隊列才能滿足需求?

6.5 延時隊列優(yōu)化

6.5.1 代碼架構(gòu)圖

? ? ? ? 假設(shè)此時我們新增加了一個 QC 隊列,該隊列不設(shè)置 TTL 時間,那該如何設(shè)置消息過期?在生產(chǎn)者發(fā)送消息的時候設(shè)置過期時間即可。

6.5.2 配置文件類代碼

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class TtlQueueConfig {

// 普通交換機(jī)的名稱

public static final String X_EXCHANGE ="X";

// 死信交換機(jī)的名稱

public static final String Y_DEAD_LETTER__EXCHANGE ="Y";

// 普通隊列的名稱

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

// 死信隊列的名稱

public static final String DEAD_LETTER_QUEUE_D = "QD";

// 普通隊列的名稱

public static final String QUEUE_C = "QC";

// 聲明 xExchange

@Bean("xExchange")

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

// 聲明 yExchange

@Bean("yExchange")

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER__EXCHANGE);

}

// 聲明普通隊列A,ttl 為 10s

@Bean("queueA")

public Queue queueA(){

Map arguments = new HashMap<>();

// 設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER__EXCHANGE);

// 設(shè)置死信 routingKey

arguments.put("x-dead-letter-routing-key","YD");

// 設(shè)置過期時間,單位為 ms

arguments.put("x-message-ttl",10000);

return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

}

// 聲明普通隊列B,ttl 為 40s

@Bean("queueB")

public Queue queueB(){

Map arguments = new HashMap<>();

// 設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER__EXCHANGE);

// 設(shè)置死信 routingKey

arguments.put("x-dead-letter-routing-key","YD");

// 設(shè)置過期時間,單位為 ms

arguments.put("x-message-ttl",40000);

return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

}

// 聲明死信隊列

@Bean("queueD")

public Queue queueD(){

return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();

}

// 聲明普通隊列C,不再設(shè)置 ttl

@Bean("queueC")

public Queue queueC(){

Map arguments = new HashMap<>();

// 設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER__EXCHANGE);

// 設(shè)置死信 routingKey

arguments.put("x-dead-letter-routing-key","YD");

return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueABindingX(@Qualifier("queueA") Queue queueA,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueB).to(xExchange).with("XB");

}

// 將死信隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,

@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueC).to(xExchange).with("XC");

}

}

6.5.3? 生產(chǎn)者代碼

// 發(fā)送延遲消息

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{message}")

public void sendMsg(@PathVariable String message){

log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊列:{}", new Date(), message);

rabbitTemplate.convertAndSend("X","XA","我是來自 ttl 為10s的數(shù)據(jù):"+message);

rabbitTemplate.convertAndSend("X","XB","我是來自 ttl 為40s的數(shù)據(jù):"+message);

}

@GetMapping("/sendExpireMsg/{message}/{ttlTime}")

public void sendMsg(@PathVariable String message,

@PathVariable String ttlTime){

rabbitTemplate.convertAndSend("X","XC",message,correlationData ->{

correlationData.getMessageProperties().setExpiration(ttlTime);

return correlationData;

});

log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(),ttlTime, message);

}

}

6.5.4 消費(fèi)者代碼

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.util.Date;

// 隊列 ttl 的消費(fèi)者

@Slf4j

@Component

public class DeadLetterConsumer {

// 接收消息

@RabbitListener(queues="QD")

public void receiveD(Message message, Channel channel) throws Exception{

String msg = new String(message.getBody());

log.info("當(dāng)前時間:{},收到死信隊列信息{}", new Date().toString(), msg);

}

}

????????此時發(fā)送兩個請求:?

????????http://localhost:8080/ttl/sendExpireMsg/你好1/20000

????????http://localhost:8080/ttl/sendExpireMsg/你好2/2000

? ? ? ? 因?yàn)榈诙€消息過期時間我們只設(shè)置了 2s,所以我們認(rèn)為它會第一個打印出來,但實(shí)際上還是第一個消息先打印出來的。?第二個消息等到第一個消息打印之后才打印。

????????看起來似乎沒什么問題,但是如果在消息屬性上設(shè)置 TTL ,消息可能并不會按時 “死亡“ ,因?yàn)?RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行。

6.6 插件實(shí)現(xiàn)延遲隊列

? ? ? ? 在上一節(jié)中存在一個消息排隊的問題,它確實(shí)是一個問題,如果不能實(shí)現(xiàn)在消息粒度上的 TTL,并使其在設(shè)置的 TTL 時間及時死亡,就無法設(shè)計成一個通用的延時隊列。這時就需要使用插件來解決這個問題。

6.6.1 安裝延遲插件

????????在官網(wǎng)上下載?rabbitmq_delayed_message_exchange 插件,下載完成之后按照下面的步驟進(jìn)行安裝。

# 1、切換到 rabbitmq 的插件目錄

[root@localhost /]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/

# 2、將下載下來的插件復(fù)制到這個目錄下

# 3、安裝這個插件

[root@localhost plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Enabling plugins on node rabbit@localhost:

rabbitmq_delayed_message_exchange

The following plugins have been configured:

rabbitmq_delayed_message_exchange

rabbitmq_management

rabbitmq_management_agent

rabbitmq_web_dispatch

Applying plugin configuration to rabbit@localhost...

The following plugins have been enabled:

rabbitmq_delayed_message_exchange

started 1 plugins.

# 4、重啟 rabbitmq 服務(wù)

[root@localhost plugins]# systemctl restart rabbitmq-server

????????安裝完成之后,打開 rabbitmq 的管理界面,當(dāng)我們手動新建一個交換機(jī)的時候出現(xiàn)了?x-delayed-message 選項,就證明安裝成功了。

6.6.2 代碼架構(gòu)圖

????????在這里我們新增一個隊列 delayed.queue,一個自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:

6.6.3 配置文件類代碼

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class DelayedQueueConfig {

// 隊列

public static final String DELAYED_QUEUE_NAME ="delayed.queue";

// 交換機(jī)

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

// routingkey

public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@Bean

public Queue delayedQueue(){

return new Queue(DELAYED_QUEUE_NAME);

}

// 聲明一個基于插件的延遲交換機(jī)

@Bean

public CustomExchange delayedExchange(){

Map arguments = new HashMap<>();

// 延遲類型為 direct,因?yàn)?routingkey 為一個固定的值

arguments.put("x-delayed-type","direct");

/**

* 1、交換機(jī)的名稱

* 2、交換機(jī)的類型

* 3、是否需要持久化

* 4、是否需要自動刪除

* 5、其他的參數(shù)

*/

return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);

}

// 將普通隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,

@Qualifier("delayedExchange") CustomExchange delayedExchange){

return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();

}

}

6.6.4 生產(chǎn)者代碼

// 發(fā)送延遲消息

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("sendDelayMsg/{message}/{delayTime}")

public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {

rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message,

correlationData -> {

correlationData.getMessageProperties().setDelay(delayTime);

return correlationData;

});

log.info(" 當(dāng) 前 時 間 : {}, 發(fā)送一條延遲 {} 毫秒的信息給隊列 delayed.queue:{}", new

Date(), delayTime, message);

}

}

6.6.5 消費(fèi)者代碼

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j

@Component

public class DelayQueueConsumer {

@RabbitListener(queues = "delayed.queue")

public void receiveDelayedQueue(Message message){

String msg = new String(message.getBody());

log.info("當(dāng)前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg);

}

}

?????????此時發(fā)送兩個請求:?

????????http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

? ? ? ?http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

????????第二個消息被優(yōu)先消費(fèi)掉了,符合我們的預(yù)期效果。

6.7 總結(jié)

????????延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實(shí)現(xiàn)延時隊列可以很好的利用 RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問題,不會因?yàn)閱蝹€節(jié)點(diǎn)掛掉導(dǎo)致延時隊列不可用或者消息丟失。

????????當(dāng)然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的時間輪,這些方式各有特點(diǎn),看需要適用的場景。

七、消息與 rabbitmq 的發(fā)布確認(rèn)

????????在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq 重啟,在 rabbitMQ 重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動處理和恢復(fù)。于是,我們開始思考,如何才能進(jìn)行 RabbitMQ 的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ 集群不可用的時候,無法投遞的消息該如何處理。

????????我們在第五章節(jié)的時候講過使用死信隊列來解決 rabbitmq 和消費(fèi)者之間進(jìn)行可靠投遞的問題。接下來我們講下生產(chǎn)者與 rabbitmq 如何實(shí)現(xiàn)消息的可靠投遞。

7.1 生產(chǎn)者和交換機(jī)發(fā)布確認(rèn)

7.1.1 確認(rèn)機(jī)制方案

? ? ? ? 當(dāng)消費(fèi)者發(fā)送消息給 rabbitmq 時,添加一個消息確認(rèn)的機(jī)制,即 rabbitmq 無法正常接收生產(chǎn)者的消息時,給生產(chǎn)者一個反饋,生產(chǎn)者再將投遞失敗的消息放到緩存中來,等待 rabbitmq 正常了再繼續(xù)推送。下面這個流程就是在生產(chǎn)過程中處理類似情況的方案。

7.1.2 代碼架構(gòu)圖

????????接下來我們模擬下 rabbitmq 無法工作時的自動反饋機(jī)制,如下,先搭建一個基礎(chǔ)的工程,生產(chǎn)者 P 發(fā)送消息給類型為 direct 類型名字為 confirm.exchange 交換機(jī),交換機(jī)通過 key1 與 confirm.queue 隊列進(jìn)行綁定,最終被消費(fèi)者 confirm.consumer 消費(fèi)者消費(fèi)。

7.1.3 添加配置類

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ConfirmConfig {

// 定義交換機(jī)

public static final String CONFIRM_EXCHANGE ="confirm.exchange";

// 定義隊列

public static final String CONFIRM_QUEUE = "confirm.queue";

// 定義路由 key

public static final String ROUTING_KEY = "key1";

// 聲明交換機(jī)

@Bean

public DirectExchange confirmExchange(){

return new DirectExchange(CONFIRM_EXCHANGE);

}

// 聲明隊列

@Bean

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE).build();

}

// 隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue,

@Qualifier("confirmExchange") DirectExchange confirmExchange){

return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);

}

}

7.1.4 回調(diào)接口

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j

@Component

public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

// 注入當(dāng)前回調(diào)函數(shù)充當(dāng)實(shí)現(xiàn)類

rabbitTemplate.setConfirmCallback(this);

}

/**

*交換機(jī)確認(rèn)消息的回調(diào)方法

* 情況一:交換機(jī)接收生產(chǎn)者發(fā)送的消息成功,進(jìn)行回調(diào)

* 1、CorrelationData 保存了回調(diào)消息的 ID 和其他信息

* 2、ack = true, 表示接收到了消息

* 3、cause = '', 表示沒有失敗的原因

* 情況二:交換機(jī)接收生產(chǎn)者發(fā)送的消息失敗,進(jìn)行回調(diào)

* 1、CorrelationData 保存了回調(diào)消息的 ID 和其他信息

* 2、ack = false, 表示沒有接收到了消息

* 3、cause , 存儲的是失敗的原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

String id = correlationData !=null?correlationData.getId():"";

if(ack){

log.info("交換機(jī)已經(jīng)成功收到 ID 為:{} 的消息",id);

}else{

log.info("交換機(jī)沒有接收到 ID 為:{} 的消息,原因?yàn)椋簕}",id,cause);

}

}

}

7.1.5?修改配置文件

? ? ? ? ?在 application.properties 的配置文件中添加如下的標(biāo)簽:

# none :默認(rèn)值,禁用發(fā)布確認(rèn)模式

# correlated : 發(fā)布消息成功到交換器會觸發(fā)該回調(diào)方法

# simple : 經(jīng)測試有兩種效果: 效果一:和 CORRELATED 值一樣會觸發(fā)回調(diào)方法。

# 效果二:在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果,

# 根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是 waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel,則接下來無法發(fā)送消息到 broker

spring.rabbitmq.publisher-confirm-type=correlated

7.1.6?消息消費(fèi)者

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Slf4j

@Component

public class ConfirmConsumer {

public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

@RabbitListener(queues =CONFIRM_QUEUE_NAME)

public void receiveMsg(Message message){

String msg=new String(message.getBody());

log.info("消費(fèi)者成功接收到隊列 confirm.queue 的消息:{}",msg);

}

}

7.1.7?消息生產(chǎn)者

import com.springbootrabbitmq.config.ConfirmConfig;

import com.springbootrabbitmq.config.MyCallBack;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@RequestMapping("/confirm")

@Slf4j

public class ProducerController {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private MyCallBack myCallBack;

@GetMapping("/sendMessage1/{message}")

public void sendMessage1(@PathVariable String message){

// 測試發(fā)送正常消息的回調(diào)函數(shù),會出現(xiàn)什么情況

// 指定消息 id 為 1

CorrelationData correlationData=new CorrelationData("1");

String routingKey="key1";

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,routingKey,message+routingKey,correlationData);

log.info("key1:發(fā)送消息內(nèi)容:{}",message);

}

@GetMapping("/sendMessage2/{message}")

public void sendMessage2(@PathVariable String message){

// 測試出現(xiàn)未找到交換機(jī)的情況下,會出現(xiàn)什么情況

CorrelationData correlationData=new CorrelationData("2");

String routingKey="key1";

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+"123",routingKey,message+routingKey,correlationData);

log.info("key2:發(fā)送消息內(nèi)容:{}",message);

}

@GetMapping("/sendMessage3/{message}")

public void sendMessage3(@PathVariable String message){

// 測試出現(xiàn)找到交換機(jī)但未找到隊列的情況下,會出現(xiàn)什么情況

CorrelationData correlationData=new CorrelationData("3");

String routingKey="key2";

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,routingKey,message+routingKey,correlationData);

log.info("key2:發(fā)送消息內(nèi)容:{}",message);

}

????????輸入?http://localhost:8080/confirm/sendMessage1/大家好,來測試正常情況下的輸出和打印,如下所示,可以看到,消費(fèi)者成功的發(fā)送消息到交換機(jī),交換機(jī)也觸發(fā)了回調(diào)函數(shù),最終消息也被消費(fèi)者消費(fèi)了,整個流程沒有什么問題。

????????輸入?http://localhost:8080/confirm/sendMessage2/大家好,來測試交換機(jī)名稱錯誤的情況下的輸出和打印,如下所示,可以看到,消費(fèi)者成功的發(fā)送消息到交換機(jī),交換機(jī)也觸發(fā)了回調(diào)函數(shù),打印了失敗的原因。即未找到交換機(jī)。

????????輸入?http://localhost:8080/confirm/sendMessage3/大家好,來測試隊列名稱錯誤的情況下的輸出和打印,如下所示,可以看到,消費(fèi)者成功的發(fā)送消息到交換機(jī),交換機(jī)觸發(fā)了回調(diào)函數(shù),但是隊列沒有應(yīng)答也沒有確認(rèn)。這個不是我們想要的結(jié)果。

7.2 生產(chǎn)者和隊列發(fā)布確認(rèn)

7.2.1?Mandatory 參數(shù)

? ? ? ? 上面一章節(jié)在僅開啟了生產(chǎn)者確認(rèn)機(jī)制的情況下,交換機(jī)接收到消息后,會直接給消息生產(chǎn)者發(fā)送確認(rèn)消息,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄,此時生產(chǎn)者是不知道消息被丟棄這個事件的。

????????那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設(shè)置 mandatory 參數(shù)可以在當(dāng)消息傳遞過程中不可達(dá)目的地時將消息返回給生產(chǎn)者。

# 當(dāng)隊列的消息無法路由到隊列時,會將消息回退給生產(chǎn)者

spring.rabbitmq.publisher-returns=true

7.2.2 修改配置類代碼

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j

@Component

public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

// 注入當(dāng)前實(shí)現(xiàn)類

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnCallback(this);

}

/**

*交換機(jī)確認(rèn)消息的回調(diào)方法

* 情況一:交換機(jī)接收生產(chǎn)者發(fā)送的消息成功,進(jìn)行回調(diào)

* 1、CorrelationData 保存了回調(diào)消息的 ID 和其他信息

* 2、ack = true, 表示接收到了消息

* 3、cause = '', 表示沒有失敗的原因

* 情況二:交換機(jī)接收生產(chǎn)者發(fā)送的消息失敗,進(jìn)行回調(diào)

* 1、CorrelationData 保存了回調(diào)消息的 ID 和其他信息

* 2、ack = false, 表示沒有接收到了消息

* 3、cause , 存儲的是失敗的原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

String id = correlationData !=null?correlationData.getId():"";

if(ack){

log.info("交換機(jī)已經(jīng)成功收到 ID 為:{} 的消息",id);

}else{

log.info("交換機(jī)沒有接收到 ID 為:{} 的消息,原因?yàn)椋簕}",id,cause);

}

}

// 只有不能達(dá)到目的地的時候才會進(jìn)行回退

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.info("消息:{}被服務(wù)器退回,退回原因:{}, 交換機(jī)是:{}, 路由 key:{}",

new String(message.getBody()),replyText, exchange, routingKey);

}

}

7.2.3 結(jié)果分析

???????輸入?http://localhost:8080/confirm/sendMessage3/大家好,來測試隊列名稱錯誤的情況下的輸出和打印,如下所示,可以看到,消費(fèi)者成功的發(fā)送消息到交換機(jī),交換機(jī)觸發(fā)了回調(diào)函數(shù),隊列也觸發(fā)了回調(diào)函數(shù),打印了路由失敗的原因,如下,這個就是我們想要的結(jié)果。

7.3 備份交換機(jī)

????????有了 mandatory 參數(shù)和回退消息,我們獲得了對無法投遞消息的感知能力,有機(jī)會在生產(chǎn)者的消息無法被投遞時發(fā)現(xiàn)并處理。但有時候,我們并不知道該如何處理這些無法路由的消息,最多打個日志,然后觸發(fā)報警,再來手動處理。

????????而通過日志來處理這些無法路由的消息是很不優(yōu)雅的做法,特別是當(dāng)生產(chǎn)者所在的服務(wù)有多臺機(jī)器的時候,手動復(fù)制日志會更加麻煩而且容易出錯。而且設(shè)置 mandatory 參數(shù)會增加生產(chǎn)者的復(fù)雜性,需要添加處理這些被退回的消息的邏輯。

????????如果既不想丟失消息,又不想增加生產(chǎn)者的復(fù)雜性,該怎么做呢?前面在設(shè)置死信隊列的文章中,我們提到,可以為隊列設(shè)置死信交換機(jī)來存儲那些處理失敗的消息,可是這些不可路由消息根本沒有機(jī)會進(jìn)入到隊列,因此無法使用死信隊列來保存消息。

????????在 RabbitMQ 中,有一種備份交換機(jī)的機(jī)制存在,可以很好的應(yīng)對這個問題。什么是備份交換機(jī)呢?備份交換機(jī)可以理解為 RabbitMQ 中交換機(jī)的“備胎”,當(dāng)我們?yōu)槟骋粋€交換機(jī)聲明一個對應(yīng)的備份交換機(jī)時,就是為它創(chuàng)建一個備胎,當(dāng)交換機(jī)接收到一條不可路由消息時,將會把這條消息轉(zhuǎn)發(fā)到備份交換機(jī)中,由備份交換機(jī)來進(jìn)行轉(zhuǎn)發(fā)和處理,

????????通常備份交換機(jī)的類型為 Fanout ,這樣就能把所有消息都投遞到與其綁定的隊列中,然后我們在備份交換機(jī)下綁定一個隊列,這樣所有那些原交換機(jī)無法被路由的消息,就會都進(jìn)入這個隊列了。當(dāng)然,我們還可以建立一個報警隊列,用獨(dú)立的消費(fèi)者來進(jìn)行監(jiān)測和報警。?

7.3.1 代碼架構(gòu)圖

????????先搭建一個基礎(chǔ)的工程,生產(chǎn)者 P 發(fā)送消息給類型為 direct 類型名字為 confirm.exchange 交換機(jī),交換機(jī)通過 key1 與 confirm.queue 隊列進(jìn)行綁定,最終被消費(fèi)者 confirm.consumer 消費(fèi)者消費(fèi)。

? ? ? ? 當(dāng) confim.exchange 交換機(jī)遇到無法投遞的消息時,就將消息轉(zhuǎn)發(fā)給類型為 fanout 的 back.exchange 交換機(jī),交換機(jī)又與 backup.queue 和 warning.queue 隊列進(jìn)行綁定。其中 warning.queue 隊列里面的消息最終被 warning.consumer 消費(fèi)者消費(fèi)。

7.3.2 修改配置類

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ConfirmConfig {

// 定義交換機(jī)

public static final String CONFIRM_EXCHANGE ="confirm.exchange";

// 定義隊列

public static final String CONFIRM_QUEUE = "confirm.queue";

// 定義路由 key

public static final String ROUTING_KEY = "key1";

// 定義備份交換機(jī)

public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";

// 定義備份隊列

public static final String BACKUP_QUEUE_NAME = "backup.queue";

// 定義警告隊列

public static final String WARNING_QUEUE_NAME = "warning.queue";

// 聲明交換機(jī),并綁定備份交換機(jī)

@Bean

public DirectExchange confirmExchange(){

return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true)

.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();

}

// 聲明備份 Exchange,并將其綁定到備份交換機(jī)上

@Bean("backupExchange")

public FanoutExchange backupExchange(){

return new FanoutExchange(BACKUP_EXCHANGE_NAME);

}

// 聲明隊列

@Bean

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE).build();

}

// 聲明備份隊列

@Bean

public Queue backupQueue(){

return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();

}

// 聲明警告隊列

@Bean

public Queue warningQueue(){

return QueueBuilder.durable(WARNING_QUEUE_NAME).build();

}

// 隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue,

@Qualifier("confirmExchange") DirectExchange confirmExchange){

return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);

}

// 隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue,

@Qualifier("backupExchange") FanoutExchange backupExchange){

return BindingBuilder.bind(backupQueue).to(backupExchange);

}

// 隊列和交換機(jī)進(jìn)行綁定

@Bean

public Binding warningQueueBinding(@Qualifier("warningQueue") Queue warningQueue,

@Qualifier("backupExchange") FanoutExchange backupExchange){

return BindingBuilder.bind(warningQueue).to(backupExchange);

}

}

7.3.3 報警消費(fèi)者

@Slf4j

@Component

public class WarningConsumer {

public static final String CONFIRM_QUEUE_NAME = "warning.queue";

@RabbitListener(queues =CONFIRM_QUEUE_NAME)

public void receiveMsg(Message message){

String msg=new String(message.getBody());

log.error("報警發(fā)現(xiàn)不可路由消息:{}", msg);

}

}

7.3.4 測試解析

????????在進(jìn)行測試之間需要先在 rabbitmq 的管理界面刪除掉我們前面幾個章節(jié)創(chuàng)建的交換機(jī),否則結(jié)果會收到影響。

??????????輸入?http://localhost:8080/confirm/sendMessage3/大家好,來測試隊列名稱錯誤的情況下的輸出和打印,如下所示,可以看到,消費(fèi)者成功的發(fā)送消息到交換機(jī),交換機(jī)直接將消息轉(zhuǎn)發(fā)給了 back.exchange 交換機(jī),最終消息被 warning.customer 消費(fèi)者消費(fèi)。這個就是我們想要的結(jié)果。

? ? ? ? 因?yàn)槲覀兊拇a是?Mandatory 參數(shù)與備份交換機(jī)一起使用的,如果兩者同時開啟,則備份交換機(jī)優(yōu)先級高。即日志并沒有打印 7.1.5 章節(jié)的日志信息。

?八、RabbitMQ 其他知識點(diǎn)

8.1 冪等性

8.1.1 概念

????????用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。舉個支付的例子,用戶購買商品后支付,支付扣款成功,但是返回結(jié)果的時候網(wǎng)絡(luò)異常,此時錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時會進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額發(fā)現(xiàn)多扣錢了,流水記錄也變成了兩條。在以前的單應(yīng)用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務(wù)中即可,發(fā)生錯誤立即回滾,但是再響應(yīng)客戶端的時候也有可能出現(xiàn)網(wǎng)絡(luò)中斷或者異常等等。

8.1.2 重復(fù)消費(fèi)

????????消費(fèi)者在消費(fèi) MQ 中的消息時,MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。

8.1.3 解決思路

????????MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫個唯一標(biāo)識,比如:時間戳或者?UUID 等, 也可以按自己的規(guī)則生成一個全局唯一 id,每次消費(fèi)消息時用該 id 先判斷該消息是否已消費(fèi)過。

8.1.4 消費(fèi)端的冪等性保障

????????在海量訂單生成的業(yè)務(wù)高峰期,生產(chǎn)端有可能就會重復(fù)發(fā)送消息,這時候消費(fèi)端就要實(shí)現(xiàn)冪等性保障,這就意味著我們的消息永遠(yuǎn)不會被消費(fèi)多次。

????????業(yè)界主流的冪等性有兩種操作:a. 唯一 ID + 指紋碼機(jī)制,利用數(shù)據(jù)庫主鍵去重。?b. 利用 redis 的原子性去實(shí)現(xiàn)。

8.1.4.1?唯一 ID + 指紋碼機(jī)制

????????指紋碼:我們的一些規(guī)則或者時間戳加別的服務(wù)給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基本都是由我們的業(yè)務(wù)規(guī)則拼接而來,但是一定要保證唯一性,然后就利用查詢語句進(jìn)行判斷,這個 id 是否存在數(shù)據(jù)庫中。

????????優(yōu)勢就是實(shí)現(xiàn)簡單就一個拼接,然后查詢判斷是否重復(fù);劣勢就是在高并發(fā)時,如果是單個數(shù)據(jù)庫就會有寫入性能瓶頸當(dāng)然也可以采用分庫分表提升性能,但也不是我們最推薦的方式。

8.1.4.2 Redis 原子性

????????利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實(shí)現(xiàn)不重復(fù)消費(fèi)。

8.2 優(yōu)先級隊列

8.2.1 使用場景

????????假設(shè)現(xiàn)在有一個訂單催付的場景,顧客在淘寶下的訂單,淘寶會及時將訂單推送給我們,如果用戶在設(shè)定的時間內(nèi)未付款那么就會給用戶推送一條短信提醒。但是對于淘寶來說,商家肯定是要分大客戶和小客戶,比如像蘋果,小米這樣的就屬于大客戶,所以他們的訂單必須得到優(yōu)先處理,即優(yōu)先給這些大客戶的消費(fèi)者發(fā)送短信提醒。

? ? ? ? 以前后端系統(tǒng)是使用 redis 來存放的定時輪詢,但是 redis 的?List 只能做一個簡簡單單的消息隊列,并不能實(shí)現(xiàn)一個優(yōu)先級的場景,所以訂單量大了后采用 RabbitMQ 進(jìn)行改造和優(yōu)化,如果發(fā)現(xiàn)是大客戶的訂單給一個相對比較高的優(yōu)先級,否則就是默認(rèn)優(yōu)先級。

8.2.2 如何添加

? ? ? ? 第一種方式:可以在 rabbitmq 的管理界面進(jìn)行手動的添加,其中優(yōu)先級的取值范圍為 0 到 255 之間,我們在實(shí)際的開發(fā)中最大值設(shè)置為 10 就足夠了。

????????第二種方式:在生產(chǎn)者代碼里面添加,一共需要修改兩處,第一處是在隊列代碼中添加優(yōu)先級;?第二處是在發(fā)送消息的時候,需要設(shè)置消息的優(yōu)先級。

Map arguments = new HashMap<>();

// 官方允許的范圍為 0-255,此處設(shè)置為 10,即允許的范圍為 0-10,不宜設(shè)置過大,會浪費(fèi) CPU 內(nèi)存

arguments.put("x-max-priority",10);

channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();

channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());

? ? ? ? 如果想要讓隊列實(shí)現(xiàn)優(yōu)先級:首先隊列需要設(shè)置為優(yōu)先級隊列,消息需要設(shè)置消息的優(yōu)先級,消費(fèi)者需要等待消息已經(jīng)發(fā)送到隊列中才去消費(fèi)因?yàn)椋@樣才有機(jī)會對消息進(jìn)行排序。?

8.2.3 實(shí)戰(zhàn)

????????生產(chǎn)者代碼如下:

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;

import java.util.Map;

public class Producer {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.150");

factory.setUsername("admin");

factory.setPassword("123");

// channel 實(shí)現(xiàn)了自動 close 接口 自動關(guān)閉 不需要顯示關(guān)閉

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

Map arguments = new HashMap<>();

// 官方允許的范圍為 0-255,此處設(shè)置為 10,即允許的范圍為 0-10,不宜設(shè)置過大,會浪費(fèi) CPU 內(nèi)存

arguments.put("x-max-priority",10);

channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);

for(int i=0;i<10;i++){

String message="info"+i;

if(i==5){

AMQP.BasicProperties properties =

new AMQP.BasicProperties().builder().priority(5).build();

channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());

}else{

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

}

}

System.out.println("消息發(fā)送完畢");

}

}

????????消費(fèi)者代碼如下:

import com.rabbitmq.client.*;

// 接收消息的消費(fèi)者

public class Consumer {

// 隊列的名稱

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.150");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

System.out.println("等待接收消息....");

// 接收到消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback=(consumerTag, message)->{

String resultMessage= new String(message.getBody());

System.out.println(resultMessage);

};

// 取消消費(fèi)時的回調(diào)函數(shù),比如在消費(fèi)的時候隊列被刪除掉了

CancelCallback cancelCallback=(consumerTag)->{

System.out.println("消息消費(fèi)被中斷");

};

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

????????先啟動消費(fèi)者,如下

????????再啟動消費(fèi)者,如下,可以看到 info5 的消息被提前消費(fèi)掉了,滿足我們的預(yù)期。

8.3 惰性隊列

8.3.1 使用場景

????????RabbitMQ 從 3.6.0 版本開始引入了惰性隊列的概念。惰性隊列會盡可能的將消息存入磁盤中,而在消費(fèi)者消費(fèi)到相應(yīng)的消息時才會被加載到內(nèi)存中,它的一個重要的設(shè)計目標(biāo)是能夠支持更長的隊列,即支持更多的消息存儲。當(dāng)消費(fèi)者由于各種各樣的原因(比如消費(fèi)者下線、宕機(jī)亦或者是由于維護(hù)而關(guān)閉等)而致使長時間內(nèi)不能消費(fèi)消息造成堆積時,惰性隊列就很有必要了。

????????默認(rèn)情況下,當(dāng)生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時候,隊列中的消息會盡可能的存儲在內(nèi)存之中,這樣可以更加快速的將消息發(fā)送給消費(fèi)者。即使是持久化的消息,在被寫入磁盤的同時也會在內(nèi)存中駐留一份備份。當(dāng) RabbitMQ 需要釋放內(nèi)存的時候,會將內(nèi)存中的消息換頁至磁盤中,這個操作會耗費(fèi)較長的時間,也會阻塞隊列的操作,進(jìn)而無法接收新的消息。雖然 RabbitMQ 的開發(fā)者們一直在升級相關(guān)的算法,但是效果始終不太理想,尤其是在消息量特別大的時候。

8.3.2 兩種模式

????????隊列具備兩種模式:default 和 lazy。默認(rèn)的為 default 模式,在 3.6.0 之前的版本無需做任何變更。lazy 模式即為惰性隊列的模式,可以通過調(diào)用 channel.queueDeclare()?方法的時候在參數(shù)中設(shè)置,也可以通過 Policy 的方式設(shè)置,如果一個隊列同時使用這兩種方式設(shè)置的話,那么 Policy 的方式具備更高的優(yōu)先級。如果要通過聲明的方式改變已有隊列的模式的話,那么只能先刪除隊列,然后再重新聲明一個新的。

????????在隊列聲明的時候可以通過 “x-queue-mode” 參數(shù)來設(shè)置隊列的模式,取值為 “default” 和 “l(fā)azy” 。下面示例中演示了一個惰性隊列的聲明細(xì)節(jié):

Map arguments = new HashMap<>();

arguments.put("x-queue-mode","lazy");

channel.queueDeclare("myqueue",true,false,false,arguments);

8.3.3 內(nèi)存開銷對比

????????在發(fā)送 1 百萬 條消息,每條消息大概占 1KB 的情況下,普通隊列占用內(nèi)存是 1.2GB,而惰性隊列僅僅占用 1.5MB。

九、RabbitMQ 集群

9.1 clustering

9.1.1 使用集群的原因

????????最開始我們介紹了如何安裝及運(yùn)行 RabbitMQ 服務(wù),不過這些是單機(jī)版的,無法滿足目前真實(shí)應(yīng)用的要求。如果 RabbitMQ 服務(wù)器遇到內(nèi)存崩潰、機(jī)器掉電或者主板故障等情況,該怎么辦?單臺 RabbitMQ 服務(wù)器可以滿足每秒 1000 條消息的吞吐量,那么如果應(yīng)用需要 RabbitMQ 服務(wù)滿足每秒 10 萬條消息的吞吐量呢?購買昂貴的服務(wù)器來增強(qiáng)單機(jī) RabbitMQ 服務(wù)的性能顯得捉襟見肘,搭建一個 RabbitMQ 集群才是解決實(shí)際問題的關(guān)鍵。

9.1.2 搭建步驟

? ? ? ? 我們準(zhǔn)備搭建一個 rabbitmq 的集群,如下圖所示,搭建的步驟如下

????????1、準(zhǔn)備 3 臺服務(wù)器,并確保 3 臺服務(wù)器都已經(jīng)安裝了 rabbitmq 的服務(wù)。

????????2、修改 3 臺服務(wù)器的主機(jī)名稱,改成如下效果即可。

# 第一步:修改主機(jī)名稱

vim /etc/hostname

# 第二步:執(zhí)行重啟命令使配置生效

reboot

? ? ???

????????3、?配置各個節(jié)點(diǎn)的 hosts 文件,讓各個節(jié)點(diǎn)都能互相識別對方

# 修改配置文件

vim /etc/hosts

# 每個節(jié)點(diǎn)都添加以下的信息

192.168.229.150 node1

192.168.229.152 node2

192.168.229.151 node3

????????4、確保各個節(jié)點(diǎn)的 cookie 文件使用的是同一個值

# 只在 node1 節(jié)點(diǎn)上執(zhí)行遠(yuǎn)程操作命令

scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie

scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie

????????5、啟動 Rabbitmq 服務(wù)

# 在三臺節(jié)點(diǎn)上分別執(zhí)行以下命令

rabbitmq-server -detached

? ? ? ? ?6、在節(jié)點(diǎn) 2 執(zhí)行以下的命令

# 先關(guān)閉服務(wù)

rabbitmqctl stop_app

# 重置服務(wù)

rabbitmqctl reset

# 把自己加入到 1 號節(jié)點(diǎn)當(dāng)中

# 如果執(zhí)行此條語句報錯,有可能是節(jié)點(diǎn) 1 的防火墻未關(guān)閉,需要先關(guān)閉防火墻

rabbitmqctl join_cluster rabbit@node1

# 重啟

rabbitmqctl start_app(只啟動應(yīng)用服務(wù))

? ? ? ? 7、在節(jié)點(diǎn) 3 執(zhí)行以下的命令?

# 先關(guān)閉服務(wù)

rabbitmqctl stop_app

# 重置服務(wù)

rabbitmqctl reset

# 把自己加入到 2 號節(jié)點(diǎn)當(dāng)中

# 如果執(zhí)行此條語句報錯,有可能是節(jié)點(diǎn) 2 的防火墻未關(guān)閉,需要先關(guān)閉防火墻

rabbitmqctl join_cluster rabbit@node2

# 重啟

rabbitmqctl start_app(只啟動應(yīng)用服務(wù))

? ? ? ? ?8、查看集群的狀態(tài)

# 任一節(jié)點(diǎn)執(zhí)行都可以

rabbitmqctl cluster_status

? ? ? ? 9、重新設(shè)置賬戶信息

# 任一賬號執(zhí)行均可

# 創(chuàng)建賬號

rabbitmqctl add_user admin 123

# 設(shè)置用戶角色

rabbitmqctl set_user_tags admin administrator

# 設(shè)置用戶權(quán)限

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

? ? ? ? ?10、解除集群節(jié)點(diǎn)命令

# 刪除節(jié)點(diǎn) 2 需要執(zhí)行的命令

rabbitmqctl stop_app

rabbitmqctl reset

rabbitmqctl start_app

rabbitmqctl cluster_status

# 此條命令需要在 node1 機(jī)器上執(zhí)行

rabbitmqctl forget_cluster_node rabbit@node2

# 刪除節(jié)點(diǎn) 3 需要執(zhí)行的命令

rabbitmqctl stop_app

rabbitmqctl reset

rabbitmqctl start_app

rabbitmqctl cluster_status

# 此條命令需要在 node2 機(jī)器上執(zhí)行

rabbitmqctl forget_cluster_node rabbit@node3

????????登錄 rabbitmq 的管理界面,可以看到,此時我們有了 3 臺服務(wù)的集群

9.2 鏡像隊列

? ? ? ? 根據(jù)上一節(jié)的步驟創(chuàng)建的 rabbitmq 集群是不可復(fù)用的,不可服用的意思是在 node1 上創(chuàng)建的隊列,在 node2 上是沒有的,意味著只要 node1 宕機(jī)了,那么這個隊列就消失了,它不會因?yàn)橛?3 臺機(jī)器就會有 3 個隊列。我們實(shí)際來感受下。

? ? ? ?首選使用 node1 來創(chuàng)建一個 hello 隊列并發(fā)送一條消息,代碼如下

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Producer {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.150");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

String message = "HELLO WORLD";

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

System.out.println("消息發(fā)送完畢");

}

}

? ? ? ? 打開 rabbitmq 的管理界面可以看到 hello 隊列是屬于 node1 節(jié)點(diǎn)的,在 node2 和 node3 上是沒有的。

? ? ? ? 如果此時 node1 節(jié)點(diǎn)宕機(jī)了,那消息是不是就丟失了呢?答案是會丟失的,我們來測試以下,我們執(zhí)行下面的命令來手動使 node1 節(jié)點(diǎn)宕機(jī)。

rabbitmqctl stop_app

????????執(zhí)行完命令之后,刷新下 rabbitmq 的管理界面,發(fā)現(xiàn)連接不上了,因?yàn)樗礄C(jī)了,現(xiàn)在只能連接另外的兩臺機(jī)器,可以看到,此時 node1 的狀態(tài)時不可用的狀態(tài)。

????????再來看看之間創(chuàng)建的 hello 隊列,我們發(fā)現(xiàn)隊列還在,但是里面的消息數(shù)量不顯示了?

????????此時,我們嘗試是否可以消費(fèi) hello 隊列里面的消息,代碼如下:

import com.rabbitmq.client.*;

// 接收消息的消費(fèi)者

public class Consumer {

// 隊列的名稱

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

// 由于 node1 節(jié)點(diǎn)掛掉了,此時只能連接另外的兩個節(jié)點(diǎn)

factory.setHost("192.168.229.151");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

System.out.println("等待接收消息....");

// 接收到消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback=(consumerTag, message)->{

String resultMessage= new String(message.getBody());

System.out.println(resultMessage);

};

// 取消消費(fèi)時的回調(diào)函數(shù),比如在消費(fèi)的時候隊列被刪除掉了

CancelCallback cancelCallback=(consumerTag)->{

System.out.println("消息消費(fèi)被中斷");

};

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

????????根據(jù)日志的輸出打印可以看出,node1 節(jié)點(diǎn)已經(jīng)掛機(jī)了,無法取出數(shù)據(jù)了,即消息丟失了

????????此時,我們輸入以下的命令重新啟動 node1 節(jié)點(diǎn),再次刷新 rabbitmq,發(fā)現(xiàn),消息丟失了。

rabbitmqctl start_app

????????如果我們在發(fā)送消息的時候,將消息進(jìn)行了持久化,參考 2.3.3 章節(jié),rabbitmq 宕機(jī)之后重啟消息是不會丟失的,這里就不在贅述了。?

9.2.1 使用鏡像的原因

? ? ? ? 鏡像隊列是什么意思是?就是備份,我們應(yīng)該給每一個 node 都進(jìn)行備份,只要生產(chǎn)者發(fā)送消息不應(yīng)該只在某一個節(jié)點(diǎn)上存在,這樣就可以避免當(dāng)一個 node 宕機(jī)了消息就丟失了。

? ? ? ? 我們想要的是生產(chǎn)者發(fā)送消息給 node1 節(jié)點(diǎn),node1 節(jié)點(diǎn)再備份一份到 node2 節(jié)點(diǎn),這樣信息就有兩份了,一份在 node1 ,一份在 node2 ,下面我們就來具體的搭建一下。

9.2.2 搭建步驟

? ? ? ? 1、啟動三臺集群節(jié)點(diǎn)

? ? ? ? 2、隨便找一個節(jié)點(diǎn)添加 policy

# 名字,隨便起

Name

# 規(guī)則,正則表達(dá)式,^mirrior:表示以 mirrior 為前綴的隊列和交換機(jī)

Pattern

# 應(yīng)用的一些參數(shù)

Definition

# 備份模式:指定模式,指定備份幾份

ha-mode = exactly

# 由此參數(shù)指定,指定兩份(主一份備一份)

ha-params = 2

# 同步的模式:自動同步

ha-syn-mode= automatic

????????此時我們測試下鏡像隊列是否生效,生產(chǎn)者代碼如下:

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.MessageProperties;

public class Producer {

private final static String QUEUE_NAME = "mirrior-hello";

public static void main(String[] args) throws Exception {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.229.150");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

String message = "HELLO WORLD";

channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

System.out.println("消息發(fā)送完畢");

}

}

????????啟動生產(chǎn)者,打開 rabbitmq 的管理界面,這個 +1 就是備份的意思。

????????可以點(diǎn)進(jìn)去看一眼,看看哪兩個節(jié)點(diǎn)涉及到了備份,如下??

????????此時輸入 rabbitmqctl stop_app 命令關(guān)閉 node1 ,再次打開 rabbitmq 的管理界面,可以看到 node1 節(jié)點(diǎn)掛掉了。

????????可以看到,此時主變?yōu)榱?node3 ,備份又在?node2 上了,因?yàn)?node1 宕機(jī)了。?

????????此時再去獲取消息,消息是不會丟失的,如下:

import com.rabbitmq.client.*;

// 接收消息的消費(fèi)者

public class Consumer {

// 隊列的名稱

private final static String QUEUE_NAME = "mirrior-hello";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

// 由于 node1 節(jié)點(diǎn)掛掉了,此時只能連接另外的兩個節(jié)點(diǎn)

factory.setHost("192.168.229.151");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

System.out.println("等待接收消息....");

// 接收到消息時的回調(diào)函數(shù)

DeliverCallback deliverCallback=(consumerTag, message)->{

String resultMessage= new String(message.getBody());

System.out.println(resultMessage);

};

// 取消消費(fèi)時的回調(diào)函數(shù),比如在消費(fèi)的時候隊列被刪除掉了

CancelCallback cancelCallback=(consumerTag)->{

System.out.println("消息消費(fèi)被中斷");

};

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

????????就算整個集群只剩下一臺機(jī)器了,依然能消費(fèi)隊列里面的消息,說明隊列里面的消息被鏡像隊列傳遞到相應(yīng)機(jī)器里面了。?

9.3 Haproxy+Keepalive 實(shí)現(xiàn)高可用負(fù)載均衡

9.3.1 整體架構(gòu)圖

? ? ? ? 生產(chǎn)者發(fā)消息給 VIP,平時 VIP 走主機(jī),主機(jī) node11 再進(jìn)行轉(zhuǎn)發(fā)給其他的節(jié)點(diǎn),一旦 node11 宕機(jī)了,它里面是有一個 keepalive 軟件用于監(jiān)控,發(fā)現(xiàn) node11 宕機(jī)了,就會將 ip 進(jìn)行飄移到另外一個備機(jī) node21 上,備機(jī)再進(jìn)行轉(zhuǎn)發(fā)和處理;當(dāng)然了,備機(jī)也會時不時的去判斷主機(jī)是否存活。

9.3.2 Haproxy 實(shí)現(xiàn)負(fù)載均衡

????????HAProxy 提供高可用性、負(fù)載均衡及基于 TCP、HTTP 應(yīng)用的代理,支持虛擬主機(jī),它是免費(fèi)、快速并且可靠的一種解決方案,包括 Twitter、Reddit、StackOverflow、GitHub 在內(nèi)的多家知名互聯(lián)網(wǎng)公司在使用。HAProxy 實(shí)現(xiàn)了一種事件驅(qū)動、單一進(jìn)程模型,此模型支持非常大的井發(fā)連接數(shù)。

????????nginx、lvs 和 haproxy 之間的區(qū)別: http://www.ha97.com/5646.html

9.4 Federation Exchange?

? ? ? ? 聯(lián)邦交換機(jī),又名聯(lián)合交換機(jī)。

9.4.1 使用它的原因

? ? ? ? 我們公司有兩個機(jī)房,一個在北京,一個在深圳。彼此之間相距甚遠(yuǎn),網(wǎng)絡(luò)延遲是一個不得不面對的問題。如下圖,有兩個交換機(jī) A 和 B,如果北京的客戶想要訪問北京的交換機(jī)是沒有問題的;但是如果北京的客戶想訪問深圳的交換機(jī)就會出現(xiàn)網(wǎng)絡(luò)延遲,因?yàn)榫嚯x太遠(yuǎn)了,網(wǎng)絡(luò)傳輸需要一定的時間。

? ? ? ? 我們希望的是北京的客戶訪問北京的 rabbitmq,深圳的客戶訪問深圳的 rabbitmq,但是北京客戶如果想要訪問深圳的 rabbitmq,就有可能會存在數(shù)據(jù)不一致的問題,怎么辦呢?應(yīng)該讓兩臺交換機(jī)互相的同步數(shù)據(jù),保證北京和深圳的交換機(jī)數(shù)據(jù)是一致的。

? ? ? ? 那么如何保證一致呢?就需要使用?Federation 插件,搭建步驟在下一節(jié)。

9.4.2 搭建步驟

? ? ? ? 1、需要保證每臺節(jié)點(diǎn)單獨(dú)運(yùn)行

? ? ? ? 2、在每一臺服務(wù)器上執(zhí)行以下的命令

rabbitmq-plugins enable rabbitmq_federation

rabbitmq-plugins enable rabbitmq_federation_management

????????在 rabbitmq 的管理界面出現(xiàn)這兩個菜單就證明安裝成功了。?

? ? ? ? 3、原理圖如下,我們把上半部分理解為北京節(jié)點(diǎn) node1,下半部分理解為深圳節(jié)點(diǎn) node2,我們認(rèn)為 node1 為上游節(jié)點(diǎn),node2 為下游節(jié)點(diǎn),此時上游的數(shù)據(jù)往下游進(jìn)行同步。此時同步數(shù)據(jù)是以交換機(jī)為節(jié)點(diǎn);即 node1 的交換機(jī)同步數(shù)據(jù)給 node2 交換機(jī)。中間經(jīng)歷了兩個階段,一個階段是配置,即 node1 節(jié)點(diǎn)的交換機(jī)要配置 node2 節(jié)點(diǎn)的地址。也就是要讓 node1 可以找到 node2 節(jié)點(diǎn),自然就會把交換機(jī)里面的數(shù)據(jù)同步給 node2 節(jié)點(diǎn)中的交換機(jī)。兩個交換機(jī)的名字一樣,而且同步有個前提,node2 節(jié)點(diǎn)必須現(xiàn)有 fed_exchange 交換機(jī),如果沒有就無法進(jìn)行同步

????????4、在 downstream(node2)配置 upstream(node1)

????????5、添加 policy

????????6、在這個地方可以查看成功的狀態(tài)。?

9.5?Federation Queue

9.5.1 使用它的原因

????????聯(lián)邦隊列可以在多個 Broker 節(jié)點(diǎn)(或者集群)之間為單個隊列提供均衡負(fù)載的功能。一個聯(lián)邦隊列可以連接一個或者多個上游隊列 (upstream queue),并從這些上游隊列中獲取消息以滿足本地消費(fèi)者消費(fèi)消息的需求。

9.5.2 搭建步驟

? ? ? ? 1、原理圖如下:

9.6 Shovel

9.6.1 使用它的原因

? ? ? ? 與 Federation 具備的數(shù)據(jù)轉(zhuǎn)發(fā)功能類似,Shovel 可以可靠、持續(xù)地從一個 Broker 中的隊列(作為源端,即 source)拉取數(shù)據(jù)并轉(zhuǎn)發(fā)至另一個 Broker 中的交換器(作為目的端,即 destination)。作為源端的隊列和作為目的端的交換器可以同時位于同一個 Broker,也可以位于不同的 Broker 上。

????????Shovel 可以翻譯為 "鏟子" ,是一種比較形象的比喻,這個 "鏟子" 可以將消息從一方鏟到另一方。Shovel 行為就像優(yōu)秀的客戶端應(yīng)用程序能夠負(fù)責(zé)連接源和目的地、負(fù)責(zé)消息的讀寫及負(fù)責(zé)連接失敗問題的處理。

9.6.2 搭建步驟

????????1、開啟插件,源端和目的端都執(zhí)行下面的命令

rabbitmq-plugins enable rabbitmq_shovel

rabbitmq-plugins enable rabbitmq_shovel_management

? ? ? ? 安裝完畢之后進(jìn)入 rabbitmq 的管理界面看到下面的兩個選項就證明安裝成功了。?

? ? ? ? 2、原理圖,如下,如果先往 Q1 和 Q2 各自發(fā)一條消息,當(dāng)設(shè)置 shovel 之后,Q2 里面就有兩條消息了。

?????????3、添加 shovel 的源和目的地。

柚子快報激活碼778899分享:RabbitMQ 幾種模式

http://yzkb.51969.com/

相關(guān)閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19085242.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄