柚子快報激活碼778899分享:java RabbitMQ筆記
柚子快報激活碼778899分享:java RabbitMQ筆記
文章目錄
四大核心概念下載安裝erlang和rabbitMQHellowWorld創(chuàng)建開發(fā)環(huán)境生產(chǎn)者代碼消費(fèi)者代碼
工作隊(duì)列實(shí)現(xiàn)工作隊(duì)列
消息應(yīng)答自動應(yīng)答手動應(yīng)答(建議)
持久化隊(duì)列持久化消息持久化不公平分發(fā)
預(yù)取值發(fā)布確認(rèn)單個發(fā)布確認(rèn)批量發(fā)布確認(rèn)異步發(fā)布確認(rèn)
交換機(jī)FanoutDirectTopics
死信隊(duì)列消息TTL過期隊(duì)列達(dá)到最大長度消息被拒絕
延遲隊(duì)列整合springboot隊(duì)列實(shí)現(xiàn)隊(duì)列優(yōu)化插件實(shí)現(xiàn)延遲隊(duì)列
發(fā)布確認(rèn)高級回退消息
四大核心概念
下載安裝erlang和rabbitMQ
1.安裝Erlang: https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.6/erlang-23.2.6-1.el7.x86_64.rpm
rpm -ivh erlang-23.2.6-1.el7.x86_64.rpm
#測試
erl -version
2.安裝RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.12/rabbitmq-server-3.8.12-1.el7.noarch.rpm
#先安裝依賴socat
yum install socat -y
rpm -ivh rabbitmq-server-3.8.12-1.el7.noarch.rpm
3.命令:
添加開機(jī)啟動RabbitMQ服務(wù)
chkconfig rabbitmq-server on
啟動服務(wù)
/sbin/service rabbitmq-server start
查看服務(wù)狀態(tài)
/sbin/service rabbitmq-server status
#停止服務(wù)
/sbin/service rabbitmq-server stop
#安裝可視化管理 插件
rabbitmq-plugins enable rabbitmq_management
4.重啟rabbitmq服務(wù),然后在windows客戶端進(jìn)入192.168.163.128(Linux的ip地址):15672,需要開啟端口號,順利進(jìn)入!
開啟防火墻
firewall-cmd --permanent --add-port=15672/tcp
重啟生效
firewall-cmd --reload
5.添加用戶并且設(shè)置權(quán)限
創(chuàng)建賬號
rabbitmqctl add_user admin 123
設(shè)置用戶角色
rabitmqctl set_user_tags admin administrator
設(shè)置用戶權(quán)限 格式:rabitmqctl set_permissions [-p
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #用戶admin具有/vhost1這個virtual host中所有資源的配置、寫、讀權(quán)限
查看用戶列表
rabbitmqctl list_users
HellowWorld
創(chuàng)建開發(fā)環(huán)境
創(chuàng)建maven工程rabbitmq-hello,pom.xml:
生產(chǎn)者代碼
注意:如果報connection error,考慮是端口號沒有開放的問題。連接服務(wù),請求的端口號是5672,而可視化工具服務(wù),請求的是15672,因此需要開啟5672跟15672兩個端口,測試連接成功!
public class Producer {
// 隊(duì)列名稱
public static final String QUEUE_NAME="hello";
// 發(fā)消息
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 工廠IP連接RabbitMQ的隊(duì)列
factory.setHost("192.168.163.128");
// 用戶名
factory.setUsername("admin");
// 密碼
factory.setPassword("123");
factory.setPort(5672);
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 獲取信道
Channel channel = connection.createChannel();
/*
* 生成一個隊(duì)列
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:隊(duì)列里面的消息是否持久化,默認(rèn)情況下,消息存儲在內(nèi)存中
* 參數(shù)3:該隊(duì)列是否只供一個消費(fèi)者進(jìn)行消費(fèi),是否進(jìn)行消費(fèi)共享,true可以多個消費(fèi)者消費(fèi),
* false只能一個消費(fèi)者消費(fèi)
* 參數(shù)4:是否自動刪除:最后一個消費(fèi)者斷開連接之后,該隊(duì)列是否自動刪除,true則自動刪除,
* false不自動刪除
* 參數(shù)5:其他參數(shù)
* */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 發(fā)消息
String message = "hello world";
/*
* 發(fā)送一個消息
* 參數(shù)1:發(fā)送到哪個交換機(jī)
* 參數(shù)2:路由的key值是那個,本次是隊(duì)列的名稱
* 參數(shù)3:其他參數(shù)信息
* 參數(shù)4:發(fā)送消息的消息體
* */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息發(fā)送完畢!");
}
}
消費(fèi)者代碼
public class Consumer {
// 隊(duì)列名稱
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.163.128");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明 接受消息
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
};
// 聲明 取消消息
CancelCallback cancelCallback = consumer -> {
System.out.println("消息消費(fèi)被中斷");
};
/*
* 消費(fèi)者接收消息
* 參數(shù)1:表示消費(fèi)哪個UI列
* 參數(shù)2:消費(fèi)成功之后,是否需要自動應(yīng)答,true表示自動應(yīng)答,false表示手動應(yīng)答
* 參數(shù)3:消費(fèi)者成功消費(fèi)的回調(diào)
* 參數(shù)4:消費(fèi)者取消消費(fèi)的回調(diào)
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作隊(duì)列
工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個工作線程時,這些工作線程將一起處理這些任務(wù)。
實(shí)現(xiàn)工作隊(duì)列
抽取連接工廠工具類
/*
* 此類為連接工廠創(chuàng)建信道的工具類
* */
public class RabbitMqUtils {
// 得到一個連接的channel
public static Channel getChannel() throws IOException, TimeoutException {
// 創(chuàng)建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.24.8");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
com.rabbitmq.client.Channel channel = connection.createChannel();
return channel;
}
}
工作線程代碼
public class Worker01 {
// 隊(duì)列名稱
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 接受消息參數(shù)
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println("接受到的消息:"+message.getBody());
};
// 取消消費(fèi)參數(shù)
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
// 消息的接受
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
同時開啟兩個工作線程:
生產(chǎn)者代碼
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//聲明一個隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制臺輸入消息
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息發(fā)送完成:"+message);
}
}
}
消息應(yīng)答
消費(fèi)者完成一個任務(wù)可能需要一段時間,如果其中一個消費(fèi)者處理一個長的任務(wù)并僅只完成了部分突然它掛掉了,會發(fā)生什么情況。RabbitMQ一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。在這種情況下,突然有個消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù)發(fā)送給該消費(fèi)這的消息,國為它無法接收到。 為了保證消息在發(fā)送過程中不丟失,RabbitMQ引入消息應(yīng)答機(jī)制,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后,告訴RabbitMQ它已經(jīng)處理了,RabbitMQ可以把該消息刪除了。
自動應(yīng)答
消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者channel關(guān)閉,那么消息就丟失了,當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過載的消息,沒有對傳遞的消息數(shù)量進(jìn)行限制,當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來不及處理的消息,導(dǎo)致這些消息的積壓,最終使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并以某種速率能夠處理這些消息的情況下使用。
手動應(yīng)答(建議)
消息應(yīng)答的方法
Channel.basicAck(用于肯定確認(rèn)),RabbitMQ已知道該消息并且成功處理,可以將其丟棄
Channel.basicNack(用于否定確認(rèn))
Channel.basicReject(用于否定確認(rèn)),與Channel.basicNack相比少了一個參數(shù)Multiple,不處理該消息了,直接拒絕,可以將其丟棄了。
Multiple的解釋(一般為false) 手動應(yīng)答的好處是可以批量應(yīng)答并且減少網(wǎng)絡(luò)擁堵
multiple的true和false是不同的意思: true表示批量應(yīng)答channel上未應(yīng)答的消息,比如channel上有傳送tag的消息5,6,7,8,,當(dāng)前tag是8,那么此時5-8的這些還未應(yīng)答的消息就會被確認(rèn)收到消息應(yīng)答. false同上面相比只會應(yīng)答tag=8的消息,5,6,7這三個消息依然不會被確認(rèn)收到消息應(yīng)答.
消息自動重新入隊(duì) 如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或TCP連接丟失),導(dǎo)致消息未發(fā)送ACK確認(rèn),RabbitMQ將了解到消息未完全處理,并將對其重新排隊(duì)。如果此時其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個消費(fèi)者。這樣,即使某個消費(fèi)者偶爾死亡,也可以確保不會丟失任何消息。
生產(chǎn)者代碼:
/*
* 消息在手動應(yīng)答時是不丟失、放回隊(duì)列中重新消費(fèi)
* */
public class Task2 {
// 隊(duì)列名稱
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 聲明隊(duì)列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生產(chǎn)者發(fā)出消息:"+message);
}
}
}
消費(fèi)者1代碼
/*
* 消費(fèi)者
* */
public class Worker02 {
// 隊(duì)列名稱
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接受消息處理時間較短");
DeliverCallback deliverCallback = (consumerTag,message) -> {
//沉睡1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接受到的消息是:"+new String(message.getBody()));
//進(jìn)行手動應(yīng)答
/*
* 參數(shù)1:消息的標(biāo)記 tag
* 參數(shù)2:是否批量應(yīng)答,false:不批量應(yīng)答 true:批量
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 采用手動應(yīng)答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag) -> {
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
});
}
}
消費(fèi)者2代碼
/*
* 消費(fèi)者
* */
public class Worker03 {
// 隊(duì)列名稱
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接受消息處理時間較長");
DeliverCallback deliverCallback = (consumerTag,message) -> {
//沉睡30s
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接受到的消息是:"+new String(message.getBody()));
//進(jìn)行手動應(yīng)答
/*
* 參數(shù)1:消息的標(biāo)記 tag
* 參數(shù)2:是否批量應(yīng)答,false:不批量應(yīng)答 true:批量
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 采用手動應(yīng)答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag) -> {
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
});
}
}
持久化
剛剛我們已經(jīng)看到了如何處理任務(wù)不丟失的情況,但是如何保障當(dāng)RabbitMQ服務(wù)停掉以后消息生產(chǎn)者發(fā)送過來的消息不丟失。默認(rèn)情況下RabbitMQ退出或由于某種原因崩潰時,它忽視隊(duì)列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久化。
隊(duì)列持久化
之前我們創(chuàng)建的隊(duì)列都是非持久化的,rabbitmq如果重啟的話,該隊(duì)列就會被刪除掉,如果要隊(duì)列實(shí)現(xiàn)持久化需要在聲明隊(duì)列的時候把durable參數(shù)設(shè)置為持久化。
// 聲明隊(duì)列
// 持久化 需要讓Queue持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
需要注意的就是如果之前聲明的隊(duì)列不是持久化的,需要把原先隊(duì)列先刪除或者重新創(chuàng)建一個持久化的隊(duì)列,不然就會出現(xiàn)錯誤。
消息持久化
要想讓消息實(shí)現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,MessageProperties,PERSISTENT_TEXT_PLAIN添加這個屬性。 將消息標(biāo)記為持久化并不能完全保證不會丟失消息。盡管它告訴RabbitMQ將消息保存到磁盤,但是這里依然存在當(dāng)消息剛準(zhǔn)備存儲在磁盤的時候但是還沒有存儲完,消息還在緩存的一個間隔點(diǎn)。此時并沒有真正寫入磁盤。持久性保證并不強(qiáng),但是對于我們的簡單任務(wù)隊(duì)列而言,這已經(jīng)綽綽有余了。
//設(shè)置生產(chǎn)者發(fā)送消息為持久化消息(要求保存到磁盤上)
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN
,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生產(chǎn)者發(fā)出消息:"+message);
不公平分發(fā)
在最開始的時候我們學(xué)習(xí)到RabbitMQ.分發(fā)消息采用的輪訓(xùn)分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費(fèi)者在處理任務(wù),其中有個消費(fèi)者1處理任務(wù)的速度非常快,而另外一個消費(fèi)者2處理速度卻很慢,這個時候我們還是采用輪訓(xùn)分發(fā)的化就會到這處理速度快的這個消費(fèi)者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費(fèi)者一直在干活,這種分配方式在這種情況下其實(shí)就不太好,但是RabbitMQ并不知道這種情況,它依然很公平的進(jìn)行分發(fā)。 為了避免這種情況,我們可以設(shè)置參數(shù)channel.basicQos(1)。
// 消費(fèi)者設(shè)置不公平分發(fā)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
prefetchCount默認(rèn)是0,代表是輪詢分發(fā)消息;1是不公平分發(fā),能這多勞;其他的是代表著預(yù)期值。
預(yù)取值
本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel上肯定不止只有一個消息另外來自消費(fèi)者的手動確認(rèn)本質(zhì)上也是異步的。因此這里就存在一個未確認(rèn)的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。這個時候就可以通過使用basic.gos.方法設(shè)置“預(yù)取計(jì)數(shù)”值來完成的。該值定義通道上允許的未確認(rèn)消息的最大數(shù)量。一旦數(shù)量達(dá)到配置的數(shù)量,RabbitMQ將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認(rèn),例如,假設(shè)在通道上有未確認(rèn)的消息5、6、7,8,并且通道的預(yù)取計(jì)數(shù)設(shè)置為4,此時RabbitMQ.將不會在該通道上再傳遞任何消息,除非至少有一個未應(yīng)答的消息被ack。比方說tag=6這個消息剛剛被確認(rèn)ACK,RabbitMQ將會感知這個情況到并再發(fā)送一條消息。
發(fā)布確認(rèn)
原理: 生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號,此外 broker也可以設(shè)置basic.ack 的multiple域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理。 confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack.消息。
發(fā)布確認(rèn)策略: 開啟發(fā)布確認(rèn)的方法 發(fā)布確認(rèn)默認(rèn)是沒有開啟的,如果要開啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布角認(rèn),都需要在channel上調(diào)用該方法
Channel channel = connection.createChannel();
channel.confirmSelect();
單個發(fā)布確認(rèn)
這是一種簡單的確認(rèn)方式,它是一種同步確認(rèn)發(fā)布的方式,也就是發(fā)布一個消息之后只有它被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個方法只有在消息被確認(rèn)的時候才返回,如果在指定時間范圍內(nèi)這個消息沒有被確認(rèn)那么它將拋出異常。 這種確認(rèn)方式有一個最大的缺點(diǎn)就是:發(fā)布速度特別的慢,因?yàn)槿绻麤]有確認(rèn)發(fā)布的消息潁會阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對于某些應(yīng)用程序來說這可能已經(jīng)足夠了。
//消息的個數(shù)
private static final int MESSAGE_COUNT=1000;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms
}
public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
//聲明隊(duì)列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
//開啟時間
long begin = System.currentTimeMillis();
//批量發(fā)送消息
for (int i=0;i String message =i+""; channel.basicPublish("",queueName,null,message.getBytes()); //單個消息就馬上發(fā)布確認(rèn) boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息發(fā)送成功"); } } long time = System.currentTimeMillis() - begin; System.out.println("發(fā)布:"+time); } 批量發(fā)布確認(rèn) 上面那種方式非常慢,與單個等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,不知道是哪個消息出現(xiàn)問題了,我們必須將整個批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布。 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms publishMessageBatch();//批量確認(rèn)發(fā)布1000條數(shù)據(jù)需要1358ms } public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); //聲明隊(duì)列 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); //開啟發(fā)布確認(rèn) channel.confirmSelect(); //開啟時間 long begin = System.currentTimeMillis(); //批量確認(rèn)消息大小 int batchSize = 100; //批量發(fā)送消息 批量發(fā)布確認(rèn) for (int i=0;i String message =i+""; channel.basicPublish("",queueName,null,message.getBytes()); if (i % 100 == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息發(fā)送成功"); } } } long time = System.currentTimeMillis() - begin; System.out.println("發(fā)布:"+time); } 異步發(fā)布確認(rèn) 異步確認(rèn)雖然編程邏輯比上兩個要復(fù)雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的,這個中間件也是通過函數(shù)回調(diào)來保證是否投遞成功,下面就讓我們來詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms // publishMessageBatch();//批量確認(rèn)發(fā)布1000條數(shù)據(jù)需要1358ms publishMessageAsync();//異步確認(rèn)發(fā)布1000條數(shù)據(jù)需要537ms } public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); //聲明隊(duì)列 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); //開啟發(fā)布確認(rèn) channel.confirmSelect(); //開啟時間 long begin = System.currentTimeMillis(); //消息確認(rèn)成功回調(diào)函數(shù) ConfirmCallback ackCallback=(deliveryTag,multiple)->{ }; //消息確認(rèn)失敗回調(diào)函數(shù) ConfirmCallback nackCallback=(deliveryTag,multiple)->{ System.out.println("未確認(rèn)的消息:"+deliveryTag); }; //準(zhǔn)備消息的監(jiān)聽器 監(jiān)聽哪些消息成功了 哪些消息失敗了 channel.addConfirmListener(ackCallback,nackCallback);//異步通知 //批量發(fā)送消息 for (int i=0;i String message =i+""; channel.basicPublish("",queueName,null,message.getBytes()); } long time = System.currentTimeMillis() - begin; System.out.println("發(fā)布:"+time); } 如何處理異步未確認(rèn)信息? 最好的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊(duì)列,比如說用ConcurrentSkipListMap在confirm callbacks與發(fā)布線程之間進(jìn)行消息的傳遞. public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); //聲明隊(duì)列 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); //開啟發(fā)布確認(rèn) channel.confirmSelect(); //開啟時間 long begin = System.currentTimeMillis(); /** * 線程安全有序的哈希表,適用于高并發(fā) * 1.輕松地將序號和消息進(jìn)行關(guān)聯(lián) * 2.輕松地批量刪除,只要給到序號 * 3.支持高并發(fā) */ ConcurrentSkipListMap //消息確認(rèn)成功回調(diào)函數(shù) ConfirmCallback ackCallback=(deliveryTag,multiple)->{ //刪除到已經(jīng)確定發(fā)布的消息,刪除哈希表中的數(shù)據(jù) if(multiple){ //如果是批量發(fā)布確認(rèn),全部刪除 ConcurrentNavigableMap confirmed.clear(); }else { outstandingConfirms.remove(deliveryTag); } }; //消息確認(rèn)失敗回調(diào)函數(shù) ConfirmCallback nackCallback=(deliveryTag,multiple)->{ //打印為確認(rèn)的消息有哪些 String message = outstandingConfirms.get(deliveryTag); System.out.println("未確認(rèn)的消息:"+deliveryTag); }; //準(zhǔn)備消息的監(jiān)聽器 監(jiān)聽哪些消息成功了 哪些消息失敗了 channel.addConfirmListener(ackCallback,nackCallback);//異步通知 //批量發(fā)送消息 for (int i=0;i String message =i+""; channel.basicPublish("",queueName,null,message.getBytes()); //記錄所有要發(fā)送消息的總和 outstandingConfirms.put(channel.getNextPublishSeqNo(),message); } long time = System.currentTimeMillis() - begin; System.out.println("發(fā)布:"+time); } 交換機(jī) RabbitMQ消息傳遞模型的核心思想是:生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。 相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡單,一方面它接收來自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說把他們到許多隊(duì)列中還是說應(yīng)該丟棄它們。這就的由交換機(jī)的類型來決定。 總共有以下幾個類型: 直接(direct)、主題(topic)、標(biāo)題(headers)、扇出(fanout) 無名交換機(jī): 在本教程的前面部分我們對exchange一無所知,但仍然能夠?qū)⑾l(fā)送到隊(duì)列。之前能實(shí)現(xiàn)的原因是因?yàn)槲覀兪褂玫氖悄J(rèn)交換,我們通過空字符串(“”)進(jìn)行標(biāo)識。 第一個參數(shù)是交換機(jī)的名稱??兆址硎灸J(rèn)或無名稱交換機(jī):消息能路由發(fā)送到隊(duì)列中其實(shí)是由routingKey(bindingkey)綁定key指定的,如果它存在的話 channel.basiPublish("","hello",null,message.getBytes()); Fanout Fanout這種類型非常簡單。正如從名稱中猜到的那樣,它是將接收到的所有消息廣播到它知道的所有隊(duì)列中。系統(tǒng)中默認(rèn)有些exchange類型. 消費(fèi)者 public class ReceiveLog01 { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明一個交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //聲明一個隊(duì)列 臨時隊(duì)列 名稱隨機(jī) 當(dāng)消費(fèi)者斷開與隊(duì)列連接時隊(duì)列自動刪除 String queue = channel.queueDeclare().getQueue(); //交換機(jī)綁定隊(duì)列 channel.queueBind(queue,EXCHANGE_NAME,""); System.out.println("等待接收消息......"); //接收信息參數(shù) DeliverCallback deliverCallback =(consumerTag, message)->{ System.out.println("ReceiveLogs01控制臺打印接受到的消息:" + new String(message.getBody())); }; //聲明取消消息 CancelCallback cancelCallback= consumerTag ->{ System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; channel.basicConsume(queue,true,deliverCallback,cancelCallback); } } 生產(chǎn)者 public class EmitLog { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明一個交換機(jī) // channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner=new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生產(chǎn)者發(fā)出的消息:"+ message); } } } Direct 生產(chǎn)者 public class DirectLogs { // 交換機(jī)的名稱 public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生產(chǎn)者發(fā)出的消息:"+ message); } } } 消費(fèi)者 public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //聲明一個隊(duì)列 channel.queueDeclare("console",false,false,false,null); //綁定交換機(jī)與隊(duì)列 channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect01控制臺打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume("console",true,deliverCallback,consumerTag -> {}); } } public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //聲明一個隊(duì)列 channel.queueDeclare("disk",false,false,false,null); //綁定交換機(jī)與隊(duì)列 channel.queueBind("disk",EXCHANGE_NAME,"error"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect02控制臺打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume("disk",true,deliverCallback,consumerTag -> {}); } } Topics 發(fā)送到類型是topic交換機(jī)的消息的routing_key不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點(diǎn)號分隔開。這些單詞可以是任意單詞,比如說: “stock.usd.nyse” , “nyse.vmw”,"quick.orange.rabbit"這種類型的。當(dāng)然這個單詞列表最多不能超過255個字節(jié)。 在這個規(guī)則列表中,其中有兩個替換符: *可以代替一個單詞 #可以代替零個或多個單詞 下圖綁定關(guān)系如下: Q1綁定的是:中間帶orange的三個單詞的字符串:*.orange.* Q2綁定的是:最后一個單詞是rabbit的單個單詞:*.*.rabbit,第一個單詞是lazy的多個單詞:lazy.# 數(shù)據(jù)接收情況如下: quick.orange.rabbit:被隊(duì)列Q1Q2接收到 quick.orange.fox:被隊(duì)列Q1接收到 lazy.brown.fox:被隊(duì)列Q2接收到 lazy.pink.rabbit:雖然滿足隊(duì)列Q2的兩個綁定但是只會被接收一次 quick.orange.male.rabbit:四個單詞不匹配任何綁定會被丟棄 結(jié)論: 當(dāng)一個隊(duì)列綁定鍵是#,那么這個隊(duì)列將接收所有數(shù)據(jù),就有點(diǎn)像fanout了; 如果隊(duì)列綁定鍵當(dāng)中沒有#和*出現(xiàn),那么該隊(duì)列綁定類型就是direct了。 生產(chǎn)者 public class EmitLogTopic { //交換機(jī)的名稱 public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); HashMap map.put("quick.orange.rabbit","被隊(duì)列Q1Q2接收到"); map.put("quick.orange.fox","被隊(duì)列Q1接收到"); map.put("lazy.brown.fox","被隊(duì)列Q2接收到 "); map.put("lazy.pink.rabbit","雖然滿足隊(duì)列Q2的兩個綁定但是只會被接收一次"); map.put("quick.orange.male.rabbit","四個單詞不匹配任何綁定會被丟棄"); for (Map.Entry String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生產(chǎn)者發(fā)送消息:"+ message ); } } } 消費(fèi)者 /* * 聲明主題交換機(jī)及相關(guān)隊(duì)列 * 消費(fèi)者C1 * */ public class ReceiveLogsTopic01 { //交換機(jī)名稱 public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //聲明隊(duì)列 String queueName = "Q1"; channel.queueDeclare(queueName,false,false,false,null); //隊(duì)列捆綁 channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String(message.getBody())); System.out.println("接收隊(duì)列:"+ queueName + "綁定鍵:" + message.getEnvelope().getRoutingKey()); }; //接收消息 channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {}); } } /* * 聲明主題交換機(jī)及相關(guān)隊(duì)列 * 消費(fèi)者C2 * */ public class ReceiveLogsTopic02 { //交換機(jī)名稱 public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //聲明隊(duì)列 String queueName = "Q2"; channel.queueDeclare(queueName,false,false,false,null); //隊(duì)列捆綁 channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"*lazy.#"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String(message.getBody())); System.out.println("接收隊(duì)列:"+ queueName + "綁定鍵:" + message.getEnvelope().getRoutingKey()); }; //接收消息 channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {}); } } 死信隊(duì)列 死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說,producer將消息投遞到 broker或者直接到queue里了,consumer 從 queue取出消息進(jìn)行消費(fèi),但某些時候由于特定的原因?qū)е聁ueue中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。 應(yīng)用場景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到RabbitMQ的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時,將消息投入死信隊(duì)列中.還有比如說:用戶在商城下單成功并點(diǎn)擊去支付后在指定時間未支付時自動失效。 來源: 消息TTL過期; 隊(duì)列達(dá)到最大長度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中); 消息被拒絕(basic.reject或basic.nack)并且requeue=false。 消息TTL過期 生產(chǎn)者 /* * 死信隊(duì)列之生產(chǎn)者代碼 * * */ public class Producer { //普通交換機(jī)的名稱 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //死信消息,設(shè)置TTL時間 單位是ms 10000ms是10s AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8)); } } } 消費(fèi)者 /* * 死信隊(duì)列實(shí)戰(zhàn) * 消費(fèi)者01 * */ public class Consumer01 { //普通交換機(jī)名稱 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機(jī)名稱 public static final String DEAD_EXCHANGE = "dead_exchange"; //普通隊(duì)列名稱 public static final String NORMAL_QUEUE = "normal_queue"; //死信隊(duì)列名稱 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明死信和普通的交換機(jī)類型為direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //聲明普通隊(duì)列 HashMap //過期時間 也可以不設(shè)置 由生產(chǎn)者設(shè)置過期時間,更靈活 //arguments.put("x-message-ttl",10000); //正常隊(duì)列設(shè)置死信隊(duì)列 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //聲明死信和普通隊(duì)列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //綁定普通的交換機(jī)與普通的隊(duì)列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); //綁定死信的交換機(jī)與死信的隊(duì)列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody())); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{}); } } /* * 死信隊(duì)列實(shí)戰(zhàn) * 消費(fèi)者02 * */ public class Consumer02 { //死信隊(duì)列名稱 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("Consumer02接收的消息是:" + new String(message.getBody())); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{}); } } 效果:啟動生產(chǎn)者后,10條消息被傳送到NORMAL_QUEUE,然后被傳送到DEAD_QUEUE,此時啟動消費(fèi)者02,消息全被接收。 隊(duì)列達(dá)到最大長度 生產(chǎn)者 public class Producer { //普通交換機(jī)名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //死信消息,設(shè)置TTL時間 單位是ms // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8)); } } } 消費(fèi)者 public class Consumer01 { //普通交換機(jī)名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機(jī)名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通隊(duì)列名稱 public static final String NORMAL_QUEUE = "normal_queue"; //死信隊(duì)列名稱 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //聲明兩個交換機(jī)類型為direct channel.exchangeDeclare(NORMAL_EXCHANGE,"direct"); channel.exchangeDeclare(DEAD_EXCHANGE,"direct"); //聲明普通隊(duì)列 HashMap //過期時間 也可以不設(shè)置 由生產(chǎn)者設(shè)置過期時間 // arguments.put("x-message-ttl",10000); //正常隊(duì)列設(shè)置死信隊(duì)列 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //設(shè)置正常隊(duì)列的長度的限制 arguments.put("x-max-length",6); channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //綁定普通交換機(jī)和普通的隊(duì)列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); //綁定死信交換機(jī)和死信隊(duì)列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息"); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("Consumer01接收的消息:"+ new String(message.getBody())); }; channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag -> {}); } } 消息被拒絕 生產(chǎn)者代碼不需要改變 消費(fèi)者 DeliverCallback deliverCallback=(consumerTag, message)->{ String msg = new String(message.getBody()); if (msg.equals("info6")){ //拒絕消息info6 System.out.println("Consumer01接收到的消息是:"+msg +":此消息被C1拒絕了"); channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else { System.out.println("Consumer01接收的消息:"+msg ); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } }; //開啟手動應(yīng)答 channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag -> {}); 延遲隊(duì)列 延時隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時屬性上,延時隊(duì)列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊(duì)列就是用來存放需要在指定時間被處理的元素的隊(duì)列。 使用場景: 訂單在十分鐘之內(nèi)未支付則自動取消。 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。 用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。 預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前十分鐘通知各個與會人員參加會議。 這些場景都有一個特點(diǎn),需要在某個事件發(fā)生之后或者之前的指定時間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;看起來似乎使用定時任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對于“如果賬單一周內(nèi)未支付則進(jìn)行自動結(jié)算”這樣的需求,如果對于時間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個可行的方案。但對于數(shù)據(jù)量比較大,并且時效性較強(qiáng)的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會有很多,活動期間甚至?xí)_(dá)到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。 整合springboot spring.rabbitmq.host=192.168.24.8 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123 @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(){ return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("rabbitmq接口文檔") .description("本文檔描述了rabbitmq微服務(wù)接口定義") .version("1.0") .contact(new Contact("enjoy6288","http://atguigu.com","123456@qq.com")) .build(); } } 隊(duì)列實(shí)現(xiàn) 配置 /* * TTL隊(duì)列 配置文件類代碼 * * */ @Configuration public class TtlQueueConfig { //普通交換機(jī)的名稱 public static final String X_EXCHANGE = "X"; //死信交換機(jī)的名稱 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通隊(duì)列的名稱 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信隊(duì)列的名稱 public static final String DEAD_LATTER_QUEUE = "QD"; //聲明xExchange @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //聲明yExchange @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //聲明隊(duì)列 @Bean("queueA") public Queue queueA(){ Map //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //設(shè)置死信Routing-key arguments.put("x-dead-letter-routing-key","YD"); //設(shè)置TTL 單位是ms arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } //聲明普通隊(duì)列 TTL為40s @Bean("queueB") public Queue queueB(){ Map //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //設(shè)置死信Routing-key arguments.put("x-dead-letter-routing-key","YD"); //設(shè)置TTL 單位是ms arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } //死信隊(duì)列 @Bean("queueD") public Queue queueD(){ return QueueBuilder.durable(DEAD_LATTER_QUEUE).build(); } //綁定 @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //綁定 @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //綁定 @Bean public Binding queueDBindingX(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } } 消費(fèi)者代碼 /* * 隊(duì)列TTL 消費(fèi)者 * */ @Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},收到死信隊(duì)列的消息:{}",new Date().toString(),msg); } } 生產(chǎn)者代碼 /* * 發(fā)送延遲消息 * */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; //開始發(fā)消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個TTL隊(duì)列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X","XA","消息來自TTL為10s的隊(duì)列:" + message); rabbitTemplate.convertAndSend("X","XB","消息來自TTL為40s的隊(duì)列:" + message); } } 隊(duì)列優(yōu)化 第一條消息在10S后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在40S之后變成了死信消息,然后被消費(fèi)掉,這樣一個延時隊(duì)列就打造完成了。 不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊(duì)列,這里只有10S和40S兩個時間選項(xiàng),如果需要一個小時后處理,那么就需要增加TTL為一個小時的隊(duì)列,如果是預(yù)定會議室然后提前通知這樣的場景,豈不是要增加無數(shù)個隊(duì)列才能滿足需求? 配置文件類 /* * TTL隊(duì)列 配置文件類代碼 * * */ @Configuration public class TtlQueueConfig { //普通交換機(jī)的名稱 public static final String X_EXCHANGE = "X"; //死信交換機(jī)的名稱 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通隊(duì)列的名稱 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //死信隊(duì)列的名稱 public static final String DEAD_LATTER_QUEUE = "QD"; //聲明QC隊(duì)列 @Bean("queueC") public Queue queueC(){ Map //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //設(shè)置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable().withArguments(arguments).build(); } @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } //聲明xExchange @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //聲明yExchange @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //聲明隊(duì)列 @Bean("queueA") public Queue queueA(){ Map //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //設(shè)置死信Routing-key arguments.put("x-dead-letter-routing-key","YD"); //設(shè)置TTL 單位是ms arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } //聲明普通隊(duì)列 TTL為40s @Bean("queueB") public Queue queueB(){ Map //設(shè)置死信交換機(jī) arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //設(shè)置死信Routing-key arguments.put("x-dead-letter-routing-key","YD"); //設(shè)置TTL 單位是ms arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } //死信隊(duì)列 @Bean("queueD") public Queue queueD(){ return QueueBuilder.durable(DEAD_LATTER_QUEUE).build(); } //綁定 @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //綁定 @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //綁定 @Bean public Binding queueDBindingX(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } } /* * 發(fā)送延遲消息 * */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; //開始發(fā)消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個TTL隊(duì)列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X","XA","消息來自TTL為10s的隊(duì)列:" + message); rabbitTemplate.convertAndSend("X","XB","消息來自TTL為40s的隊(duì)列:" + message); } //開始發(fā)消息 @GetMapping("sendExpirationMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒TTL信息給隊(duì)列QC:{}", new Date().toString(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg->{ //發(fā)送消息的時候 延遲時長 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } } 消費(fèi)者代碼無需改變 插件實(shí)現(xiàn)延遲隊(duì)列 下載延遲插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 將延遲插件放到RabbitMQ的插件目錄下: 安裝插件并重啟服務(wù) rabbitmq-plugins enable rabbitmq_delayed_message_exchange systemctl restart rabbitmq-server 配置文件類 @Configuration public class DelayedQueueConfig { //隊(duì)列 public static final String DELAYED_QUEUE_NAME = "delayed.queue"; //交換機(jī) public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; //routingKey public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; //聲明隊(duì)列 @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); }; //聲明交換機(jī) @Bean public CustomExchange delayedExchange(){ Map arguments.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message", true,false,arguments); } //綁定 @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } } // 消費(fèi)者代碼 基于插件的延遲消息 @Slf4j @Component public class DelayQueueConsumer { //監(jiān)聽消息 @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void recieveDelayQueue(Message message){ String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},收到延遲隊(duì)列的消息:{}",new Date().toString(),msg); } } /* * 發(fā)送延遲消息 * */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; //開始發(fā)消息 基于插件的 消息 及 延遲的時間 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){ log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒信息給延遲隊(duì)列delayed.queue:{}", new Date().toString(),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME ,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> { // 發(fā)送消息的時候 延遲時長 單位ms msg.getMessageProperties().setDelay(delayTime); return msg; }); } } 總結(jié): 延時隊(duì)列在需要延時處理的場景下非常有用,使用RabbitMQ來實(shí)現(xiàn)延時隊(duì)列可以很好的利用; RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問題,不會因?yàn)閱蝹€節(jié)點(diǎn)掛掉導(dǎo)致延時隊(duì)列不可用或者消息丟失; 當(dāng)然,延時隊(duì)列還有很多其它選擇,比如利用Java的DelayQueue,利用Redis的zsset,利用Quartz或者利用kafka的時間輪,這些方式各有特點(diǎn),看需要適用的場景。 發(fā)布確認(rèn)高級 在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq重啟,在RabbitMQ重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動處理和恢復(fù)。于是,我們開始思考,如何才能進(jìn)行RabbitMQ的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ集群不可用的時候,無法投遞的消息該如何處理呢? 配置文件 配置文件及消息發(fā)送方 NONE:禁用發(fā)布確認(rèn)模式,是默認(rèn)值 CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法 SIMPLE:經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會觸發(fā)回調(diào)方法, 其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms,或 waitForConfirmsOrDie方法 等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯, 要注意的點(diǎn)是waitForConfirmsOrDiea方法如果返回false則會關(guān)閉channel, 則接下來無法發(fā)送消息到broker spring.rabbitmq.publisher-confirm-type = correlated // 配置類:發(fā)布確認(rèn)(高級) @Configuration public class ConfirmConfig { //交換機(jī) public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; //隊(duì)列 public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; //RoutingKey public static final String CONFIRM_routing_key = "key1"; //聲明交換機(jī) @Bean public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //綁定 @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_routing_key); } } 生產(chǎn)者 // 開始發(fā)消息 測試確認(rèn) @RestController @Slf4j @RequestMapping("/confirm") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //發(fā)消息 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME ,ConfirmConfig.CONFIRM_routing_key ,message,correlationData); log.info("發(fā)送消息內(nèi)容:{}",message); } } 消費(fèi)者 // 接收消息 @Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage(Message message){ String msg = new String(message.getBody()); log.info("接受到的隊(duì)列confirm.queue消息:{}",msg); } } 回調(diào)接口 @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //注入 rabbitTemplate.setConfirmCallback(this); } /* * 交換機(jī)確認(rèn)回調(diào)方法,發(fā)消息后,交換機(jī)接收到了就回調(diào) * 1.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息 * 1.2 b:交換機(jī)收到消息,為true * 1.3 s:失敗原因,成功為null * * 發(fā)消息,交換機(jī)接受失敗,也回調(diào) * 2.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息 * 2.2 b:交換機(jī)沒收到消息,為false * 2.3 s:失敗的原因 * * */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData!=null ? correlationData.getId():""; if (b){ log.info("交換機(jī)已經(jīng)收到ID為:{}的信息",id); }else { log.info("交換機(jī)還未收到ID為:{}的消息,由于原因:{}",id,s); } } } 回退消息 在僅開啟了生產(chǎn)者確認(rèn)機(jī)制的情況下,交換機(jī)接收到消息后,會直接給消息生產(chǎn)者發(fā)送確認(rèn)消息,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄,此時生產(chǎn)者是不知道消息被丟棄這個事件的。那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設(shè)置mandatory參數(shù)可以在當(dāng)消息傳遞過程中不可達(dá)目的地時將消息返回給生產(chǎn)者。 配置類 spring.rabbitmq.publisher-returns=true 回退接口 @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //注入 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /* * 交換機(jī)確認(rèn)回調(diào)方法,發(fā)消息后,交換機(jī)接收到了就回調(diào) * 1.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息 * 1.2 b:交換機(jī)收到消息,為true * 1.3 s:失敗原因,成功為null * * 發(fā)消息,交換機(jī)接受失敗,也回調(diào) * 2.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息 * 2.2 b:交換機(jī)沒收到消息,為false * 2.3 s:失敗的原因 * * */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData!=null ? correlationData.getId():""; if (b){ log.info("交換機(jī)已經(jīng)收到ID為:{}的信息",id); }else { log.info("交換機(jī)還未收到ID為:{}的消息,由于原因:{}",id,s); } } //可以在當(dāng)消息傳遞過程中不可達(dá)目的的時將消息返回給生產(chǎn)者 //只有不可達(dá)目的地的時候才可回退 @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息{},被交換機(jī){}退回,退回的原因:{},路由Key:{}", new String(returnedMessage.getMessage().getBody()) , returnedMessage.getExchange() , returnedMessage.getReplyText() , returnedMessage.getRoutingKey()); } } 發(fā)布測試 // 開始發(fā)消息 測試確認(rèn) @RestController @Slf4j @RequestMapping("/confirm") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //發(fā)消息 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME ,ConfirmConfig.CONFIRM_routing_key ,message+"key1",correlationData); log.info("發(fā)送消息內(nèi)容:{}",message+"key1"); CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME ,ConfirmConfig.CONFIRM_routing_key+"2" ,message+"key12",correlationData2); log.info("發(fā)送消息內(nèi)容:{}",message+"key12"); } } 柚子快報激活碼778899分享:java RabbitMQ筆記 精彩文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。