欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:java RabbitMQ筆記

柚子快報激活碼778899分享:java RabbitMQ筆記

http://yzkb.51969.com/

文章目錄

四大核心概念下載安裝erlang和rabbitMQHellowWorld創(chuàng)建開發(fā)環(huán)境生產(chǎn)者代碼消費(fèi)者代碼

工作隊(duì)列實(shí)現(xiàn)工作隊(duì)列

消息應(yīng)答自動應(yīng)答手動應(yīng)答(建議)

持久化隊(duì)列持久化消息持久化不公平分發(fā)

預(yù)取值發(fā)布確認(rèn)單個發(fā)布確認(rèn)批量發(fā)布確認(rèn)異步發(fā)布確認(rèn)

交換機(jī)FanoutDirectTopics

死信隊(duì)列消息TTL過期隊(duì)列達(dá)到最大長度消息被拒絕

延遲隊(duì)列整合springboot隊(duì)列實(shí)現(xiàn)隊(duì)列優(yōu)化插件實(shí)現(xiàn)延遲隊(duì)列

發(fā)布確認(rèn)高級回退消息

四大核心概念

下載安裝erlang和rabbitMQ

1.安裝Erlang: https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.6/erlang-23.2.6-1.el7.x86_64.rpm

rpm -ivh erlang-23.2.6-1.el7.x86_64.rpm

#測試

erl -version

2.安裝RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.12/rabbitmq-server-3.8.12-1.el7.noarch.rpm

#先安裝依賴socat

yum install socat -y

rpm -ivh rabbitmq-server-3.8.12-1.el7.noarch.rpm

3.命令:

添加開機(jī)啟動RabbitMQ服務(wù)

chkconfig rabbitmq-server on

啟動服務(wù)

/sbin/service rabbitmq-server start

查看服務(wù)狀態(tài)

/sbin/service rabbitmq-server status

#停止服務(wù)

/sbin/service rabbitmq-server stop

#安裝可視化管理 插件

rabbitmq-plugins enable rabbitmq_management

4.重啟rabbitmq服務(wù),然后在windows客戶端進(jìn)入192.168.163.128(Linux的ip地址):15672,需要開啟端口號,順利進(jìn)入!

開啟防火墻

firewall-cmd --permanent --add-port=15672/tcp

重啟生效

firewall-cmd --reload

5.添加用戶并且設(shè)置權(quán)限

創(chuàng)建賬號

rabbitmqctl add_user admin 123

設(shè)置用戶角色

rabitmqctl set_user_tags admin administrator

設(shè)置用戶權(quán)限 格式:rabitmqctl set_permissions [-p ]

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #用戶admin具有/vhost1這個virtual host中所有資源的配置、寫、讀權(quán)限

查看用戶列表

rabbitmqctl list_users

HellowWorld

創(chuàng)建開發(fā)環(huán)境

創(chuàng)建maven工程rabbitmq-hello,pom.xml:

com.rabbitmq

amqp-client

5.8.0

commons-io

commons-io

2.6

生產(chǎn)者代碼

注意:如果報connection error,考慮是端口號沒有開放的問題。連接服務(wù),請求的端口號是5672,而可視化工具服務(wù),請求的是15672,因此需要開啟5672跟15672兩個端口,測試連接成功!

public class Producer {

// 隊(duì)列名稱

public static final String QUEUE_NAME="hello";

// 發(fā)消息

public static void main(String[] args) throws IOException, TimeoutException {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

// 工廠IP連接RabbitMQ的隊(duì)列

factory.setHost("192.168.163.128");

// 用戶名

factory.setUsername("admin");

// 密碼

factory.setPassword("123");

factory.setPort(5672);

// 創(chuàng)建連接

Connection connection = factory.newConnection();

// 獲取信道

Channel channel = connection.createChannel();

/*

* 生成一個隊(duì)列

* 參數(shù)1:隊(duì)列名稱

* 參數(shù)2:隊(duì)列里面的消息是否持久化,默認(rèn)情況下,消息存儲在內(nèi)存中

* 參數(shù)3:該隊(duì)列是否只供一個消費(fèi)者進(jìn)行消費(fèi),是否進(jìn)行消費(fèi)共享,true可以多個消費(fèi)者消費(fèi),

* false只能一個消費(fèi)者消費(fèi)

* 參數(shù)4:是否自動刪除:最后一個消費(fèi)者斷開連接之后,該隊(duì)列是否自動刪除,true則自動刪除,

* false不自動刪除

* 參數(shù)5:其他參數(shù)

* */

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

// 發(fā)消息

String message = "hello world";

/*

* 發(fā)送一個消息

* 參數(shù)1:發(fā)送到哪個交換機(jī)

* 參數(shù)2:路由的key值是那個,本次是隊(duì)列的名稱

* 參數(shù)3:其他參數(shù)信息

* 參數(shù)4:發(fā)送消息的消息體

* */

channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("消息發(fā)送完畢!");

}

}

消費(fèi)者代碼

public class Consumer {

// 隊(duì)列名稱

public static final String QUEUE_NAME = "hello";

// 接受消息

public static void main(String[] args) throws IOException, TimeoutException {

// 創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.163.128");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 聲明 接受消息

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println(new String(message.getBody()));

};

// 聲明 取消消息

CancelCallback cancelCallback = consumer -> {

System.out.println("消息消費(fèi)被中斷");

};

/*

* 消費(fèi)者接收消息

* 參數(shù)1:表示消費(fèi)哪個UI列

* 參數(shù)2:消費(fèi)成功之后,是否需要自動應(yīng)答,true表示自動應(yīng)答,false表示手動應(yīng)答

* 參數(shù)3:消費(fèi)者成功消費(fèi)的回調(diào)

* 參數(shù)4:消費(fèi)者取消消費(fèi)的回調(diào)

*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

工作隊(duì)列

工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個工作線程時,這些工作線程將一起處理這些任務(wù)。

實(shí)現(xiàn)工作隊(duì)列

抽取連接工廠工具類

/*

* 此類為連接工廠創(chuàng)建信道的工具類

* */

public class RabbitMqUtils {

// 得到一個連接的channel

public static Channel getChannel() throws IOException, TimeoutException {

// 創(chuàng)建一個連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.24.8");

factory.setUsername("admin");

factory.setPassword("123");

Connection connection = factory.newConnection();

com.rabbitmq.client.Channel channel = connection.createChannel();

return channel;

}

}

工作線程代碼

public class Worker01 {

// 隊(duì)列名稱

public static final String QUEUE_NAME = "hello";

// 接受消息

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

// 接受消息參數(shù)

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println("接受到的消息:"+message.getBody());

};

// 取消消費(fèi)參數(shù)

CancelCallback cancelCallback = consumerTag -> {

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

// 消息的接受

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

同時開啟兩個工作線程:

生產(chǎn)者代碼

public class Task01 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

//聲明一個隊(duì)列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//從控制臺輸入消息

Scanner scanner=new Scanner(System.in);

while (scanner.hasNext()){

String message = scanner.next();

channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("消息發(fā)送完成:"+message);

}

}

}

消息應(yīng)答

消費(fèi)者完成一個任務(wù)可能需要一段時間,如果其中一個消費(fèi)者處理一個長的任務(wù)并僅只完成了部分突然它掛掉了,會發(fā)生什么情況。RabbitMQ一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。在這種情況下,突然有個消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù)發(fā)送給該消費(fèi)這的消息,國為它無法接收到。 為了保證消息在發(fā)送過程中不丟失,RabbitMQ引入消息應(yīng)答機(jī)制,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后,告訴RabbitMQ它已經(jīng)處理了,RabbitMQ可以把該消息刪除了。

自動應(yīng)答

消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者channel關(guān)閉,那么消息就丟失了,當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過載的消息,沒有對傳遞的消息數(shù)量進(jìn)行限制,當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來不及處理的消息,導(dǎo)致這些消息的積壓,最終使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并以某種速率能夠處理這些消息的情況下使用。

手動應(yīng)答(建議)

消息應(yīng)答的方法

Channel.basicAck(用于肯定確認(rèn)),RabbitMQ已知道該消息并且成功處理,可以將其丟棄

Channel.basicNack(用于否定確認(rèn))

Channel.basicReject(用于否定確認(rèn)),與Channel.basicNack相比少了一個參數(shù)Multiple,不處理該消息了,直接拒絕,可以將其丟棄了。

Multiple的解釋(一般為false) 手動應(yīng)答的好處是可以批量應(yīng)答并且減少網(wǎng)絡(luò)擁堵

multiple的true和false是不同的意思: true表示批量應(yīng)答channel上未應(yīng)答的消息,比如channel上有傳送tag的消息5,6,7,8,,當(dāng)前tag是8,那么此時5-8的這些還未應(yīng)答的消息就會被確認(rèn)收到消息應(yīng)答. false同上面相比只會應(yīng)答tag=8的消息,5,6,7這三個消息依然不會被確認(rèn)收到消息應(yīng)答.

消息自動重新入隊(duì) 如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或TCP連接丟失),導(dǎo)致消息未發(fā)送ACK確認(rèn),RabbitMQ將了解到消息未完全處理,并將對其重新排隊(duì)。如果此時其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個消費(fèi)者。這樣,即使某個消費(fèi)者偶爾死亡,也可以確保不會丟失任何消息。

生產(chǎn)者代碼:

/*

* 消息在手動應(yīng)答時是不丟失、放回隊(duì)列中重新消費(fèi)

* */

public class Task2 {

// 隊(duì)列名稱

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

// 聲明隊(duì)列

channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()){

String message = scanner.next();

channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("生產(chǎn)者發(fā)出消息:"+message);

}

}

}

消費(fèi)者1代碼

/*

* 消費(fèi)者

* */

public class Worker02 {

// 隊(duì)列名稱

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("C1等待接受消息處理時間較短");

DeliverCallback deliverCallback = (consumerTag,message) -> {

//沉睡1s

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("接受到的消息是:"+new String(message.getBody()));

//進(jìn)行手動應(yīng)答

/*

* 參數(shù)1:消息的標(biāo)記 tag

* 參數(shù)2:是否批量應(yīng)答,false:不批量應(yīng)答 true:批量

* */

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

};

// 采用手動應(yīng)答

boolean autoAck = false;

channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag) -> {

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

});

}

}

消費(fèi)者2代碼

/*

* 消費(fèi)者

* */

public class Worker03 {

// 隊(duì)列名稱

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("C2等待接受消息處理時間較長");

DeliverCallback deliverCallback = (consumerTag,message) -> {

//沉睡30s

try {

Thread.sleep(30000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("接受到的消息是:"+new String(message.getBody()));

//進(jìn)行手動應(yīng)答

/*

* 參數(shù)1:消息的標(biāo)記 tag

* 參數(shù)2:是否批量應(yīng)答,false:不批量應(yīng)答 true:批量

* */

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

};

// 采用手動應(yīng)答

boolean autoAck = false;

channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag) -> {

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

});

}

}

持久化

剛剛我們已經(jīng)看到了如何處理任務(wù)不丟失的情況,但是如何保障當(dāng)RabbitMQ服務(wù)停掉以后消息生產(chǎn)者發(fā)送過來的消息不丟失。默認(rèn)情況下RabbitMQ退出或由于某種原因崩潰時,它忽視隊(duì)列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久化。

隊(duì)列持久化

之前我們創(chuàng)建的隊(duì)列都是非持久化的,rabbitmq如果重啟的話,該隊(duì)列就會被刪除掉,如果要隊(duì)列實(shí)現(xiàn)持久化需要在聲明隊(duì)列的時候把durable參數(shù)設(shè)置為持久化。

// 聲明隊(duì)列

// 持久化 需要讓Queue持久化

boolean durable = true;

channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);

需要注意的就是如果之前聲明的隊(duì)列不是持久化的,需要把原先隊(duì)列先刪除或者重新創(chuàng)建一個持久化的隊(duì)列,不然就會出現(xiàn)錯誤。

消息持久化

要想讓消息實(shí)現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,MessageProperties,PERSISTENT_TEXT_PLAIN添加這個屬性。 將消息標(biāo)記為持久化并不能完全保證不會丟失消息。盡管它告訴RabbitMQ將消息保存到磁盤,但是這里依然存在當(dāng)消息剛準(zhǔn)備存儲在磁盤的時候但是還沒有存儲完,消息還在緩存的一個間隔點(diǎn)。此時并沒有真正寫入磁盤。持久性保證并不強(qiáng),但是對于我們的簡單任務(wù)隊(duì)列而言,這已經(jīng)綽綽有余了。

//設(shè)置生產(chǎn)者發(fā)送消息為持久化消息(要求保存到磁盤上)

channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN

,message.getBytes(StandardCharsets.UTF_8));

System.out.println("生產(chǎn)者發(fā)出消息:"+message);

不公平分發(fā)

在最開始的時候我們學(xué)習(xí)到RabbitMQ.分發(fā)消息采用的輪訓(xùn)分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費(fèi)者在處理任務(wù),其中有個消費(fèi)者1處理任務(wù)的速度非常快,而另外一個消費(fèi)者2處理速度卻很慢,這個時候我們還是采用輪訓(xùn)分發(fā)的化就會到這處理速度快的這個消費(fèi)者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費(fèi)者一直在干活,這種分配方式在這種情況下其實(shí)就不太好,但是RabbitMQ并不知道這種情況,它依然很公平的進(jìn)行分發(fā)。 為了避免這種情況,我們可以設(shè)置參數(shù)channel.basicQos(1)。

// 消費(fèi)者設(shè)置不公平分發(fā)

int prefetchCount = 1;

channel.basicQos(prefetchCount);

prefetchCount默認(rèn)是0,代表是輪詢分發(fā)消息;1是不公平分發(fā),能這多勞;其他的是代表著預(yù)期值。

預(yù)取值

本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel上肯定不止只有一個消息另外來自消費(fèi)者的手動確認(rèn)本質(zhì)上也是異步的。因此這里就存在一個未確認(rèn)的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。這個時候就可以通過使用basic.gos.方法設(shè)置“預(yù)取計(jì)數(shù)”值來完成的。該值定義通道上允許的未確認(rèn)消息的最大數(shù)量。一旦數(shù)量達(dá)到配置的數(shù)量,RabbitMQ將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認(rèn),例如,假設(shè)在通道上有未確認(rèn)的消息5、6、7,8,并且通道的預(yù)取計(jì)數(shù)設(shè)置為4,此時RabbitMQ.將不會在該通道上再傳遞任何消息,除非至少有一個未應(yīng)答的消息被ack。比方說tag=6這個消息剛剛被確認(rèn)ACK,RabbitMQ將會感知這個情況到并再發(fā)送一條消息。

發(fā)布確認(rèn)

原理: 生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號,此外 broker也可以設(shè)置basic.ack 的multiple域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理。 confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack.消息。

發(fā)布確認(rèn)策略: 開啟發(fā)布確認(rèn)的方法 發(fā)布確認(rèn)默認(rèn)是沒有開啟的,如果要開啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布角認(rèn),都需要在channel上調(diào)用該方法

Channel channel = connection.createChannel();

channel.confirmSelect();

單個發(fā)布確認(rèn)

這是一種簡單的確認(rèn)方式,它是一種同步確認(rèn)發(fā)布的方式,也就是發(fā)布一個消息之后只有它被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個方法只有在消息被確認(rèn)的時候才返回,如果在指定時間范圍內(nèi)這個消息沒有被確認(rèn)那么它將拋出異常。 這種確認(rèn)方式有一個最大的缺點(diǎn)就是:發(fā)布速度特別的慢,因?yàn)槿绻麤]有確認(rèn)發(fā)布的消息潁會阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對于某些應(yīng)用程序來說這可能已經(jīng)足夠了。

//消息的個數(shù)

private static final int MESSAGE_COUNT=1000;

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms

}

public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {

Channel channel = RabbitMqUtils.getChannel();

//聲明隊(duì)列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName,true,false,false,null);

//開啟發(fā)布確認(rèn)

channel.confirmSelect();

//開啟時間

long begin = System.currentTimeMillis();

//批量發(fā)送消息

for (int i=0;i

String message =i+"";

channel.basicPublish("",queueName,null,message.getBytes());

//單個消息就馬上發(fā)布確認(rèn)

boolean flag = channel.waitForConfirms();

if (flag) {

System.out.println("消息發(fā)送成功");

}

}

long time = System.currentTimeMillis() - begin;

System.out.println("發(fā)布:"+time);

}

批量發(fā)布確認(rèn)

上面那種方式非常慢,與單個等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,不知道是哪個消息出現(xiàn)問題了,我們必須將整個批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布。

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

// publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms

publishMessageBatch();//批量確認(rèn)發(fā)布1000條數(shù)據(jù)需要1358ms

}

public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {

Channel channel = RabbitMqUtils.getChannel();

//聲明隊(duì)列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName,true,false,false,null);

//開啟發(fā)布確認(rèn)

channel.confirmSelect();

//開啟時間

long begin = System.currentTimeMillis();

//批量確認(rèn)消息大小

int batchSize = 100;

//批量發(fā)送消息 批量發(fā)布確認(rèn)

for (int i=0;i

String message =i+"";

channel.basicPublish("",queueName,null,message.getBytes());

if (i % 100 == 0) {

boolean flag = channel.waitForConfirms();

if (flag) {

System.out.println("消息發(fā)送成功");

}

}

}

long time = System.currentTimeMillis() - begin;

System.out.println("發(fā)布:"+time);

}

異步發(fā)布確認(rèn)

異步確認(rèn)雖然編程邏輯比上兩個要復(fù)雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的,這個中間件也是通過函數(shù)回調(diào)來保證是否投遞成功,下面就讓我們來詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

// publishMessageIndividually();//單個確認(rèn)發(fā)布1000條數(shù)據(jù)需要7396ms

// publishMessageBatch();//批量確認(rèn)發(fā)布1000條數(shù)據(jù)需要1358ms

publishMessageAsync();//異步確認(rèn)發(fā)布1000條數(shù)據(jù)需要537ms

}

public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {

Channel channel = RabbitMqUtils.getChannel();

//聲明隊(duì)列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName,true,false,false,null);

//開啟發(fā)布確認(rèn)

channel.confirmSelect();

//開啟時間

long begin = System.currentTimeMillis();

//消息確認(rèn)成功回調(diào)函數(shù)

ConfirmCallback ackCallback=(deliveryTag,multiple)->{

};

//消息確認(rèn)失敗回調(diào)函數(shù)

ConfirmCallback nackCallback=(deliveryTag,multiple)->{

System.out.println("未確認(rèn)的消息:"+deliveryTag);

};

//準(zhǔn)備消息的監(jiān)聽器 監(jiān)聽哪些消息成功了 哪些消息失敗了

channel.addConfirmListener(ackCallback,nackCallback);//異步通知

//批量發(fā)送消息

for (int i=0;i

String message =i+"";

channel.basicPublish("",queueName,null,message.getBytes());

}

long time = System.currentTimeMillis() - begin;

System.out.println("發(fā)布:"+time);

}

如何處理異步未確認(rèn)信息? 最好的解決方案就是把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊(duì)列,比如說用ConcurrentSkipListMap在confirm callbacks與發(fā)布線程之間進(jìn)行消息的傳遞.

public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {

Channel channel = RabbitMqUtils.getChannel();

//聲明隊(duì)列

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName,true,false,false,null);

//開啟發(fā)布確認(rèn)

channel.confirmSelect();

//開啟時間

long begin = System.currentTimeMillis();

/**

* 線程安全有序的哈希表,適用于高并發(fā)

* 1.輕松地將序號和消息進(jìn)行關(guān)聯(lián)

* 2.輕松地批量刪除,只要給到序號

* 3.支持高并發(fā)

*/

ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();

//消息確認(rèn)成功回調(diào)函數(shù)

ConfirmCallback ackCallback=(deliveryTag,multiple)->{

//刪除到已經(jīng)確定發(fā)布的消息,刪除哈希表中的數(shù)據(jù)

if(multiple){

//如果是批量發(fā)布確認(rèn),全部刪除

ConcurrentNavigableMap confirmed = outstandingConfirms.headMap(deliveryTag);

confirmed.clear();

}else {

outstandingConfirms.remove(deliveryTag);

}

};

//消息確認(rèn)失敗回調(diào)函數(shù)

ConfirmCallback nackCallback=(deliveryTag,multiple)->{

//打印為確認(rèn)的消息有哪些

String message = outstandingConfirms.get(deliveryTag);

System.out.println("未確認(rèn)的消息:"+deliveryTag);

};

//準(zhǔn)備消息的監(jiān)聽器 監(jiān)聽哪些消息成功了 哪些消息失敗了

channel.addConfirmListener(ackCallback,nackCallback);//異步通知

//批量發(fā)送消息

for (int i=0;i

String message =i+"";

channel.basicPublish("",queueName,null,message.getBytes());

//記錄所有要發(fā)送消息的總和

outstandingConfirms.put(channel.getNextPublishSeqNo(),message);

}

long time = System.currentTimeMillis() - begin;

System.out.println("發(fā)布:"+time);

}

交換機(jī)

RabbitMQ消息傳遞模型的核心思想是:生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。 相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡單,一方面它接收來自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說把他們到許多隊(duì)列中還是說應(yīng)該丟棄它們。這就的由交換機(jī)的類型來決定。 總共有以下幾個類型: 直接(direct)、主題(topic)、標(biāo)題(headers)、扇出(fanout) 無名交換機(jī): 在本教程的前面部分我們對exchange一無所知,但仍然能夠?qū)⑾l(fā)送到隊(duì)列。之前能實(shí)現(xiàn)的原因是因?yàn)槲覀兪褂玫氖悄J(rèn)交換,我們通過空字符串(“”)進(jìn)行標(biāo)識。 第一個參數(shù)是交換機(jī)的名稱??兆址硎灸J(rèn)或無名稱交換機(jī):消息能路由發(fā)送到隊(duì)列中其實(shí)是由routingKey(bindingkey)綁定key指定的,如果它存在的話

channel.basiPublish("","hello",null,message.getBytes());

Fanout

Fanout這種類型非常簡單。正如從名稱中猜到的那樣,它是將接收到的所有消息廣播到它知道的所有隊(duì)列中。系統(tǒng)中默認(rèn)有些exchange類型. 消費(fèi)者

public class ReceiveLog01 {

private static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明一個交換機(jī)

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

//聲明一個隊(duì)列 臨時隊(duì)列 名稱隨機(jī) 當(dāng)消費(fèi)者斷開與隊(duì)列連接時隊(duì)列自動刪除

String queue = channel.queueDeclare().getQueue();

//交換機(jī)綁定隊(duì)列

channel.queueBind(queue,EXCHANGE_NAME,"");

System.out.println("等待接收消息......");

//接收信息參數(shù)

DeliverCallback deliverCallback =(consumerTag, message)->{

System.out.println("ReceiveLogs01控制臺打印接受到的消息:" + new String(message.getBody()));

};

//聲明取消消息

CancelCallback cancelCallback= consumerTag ->{

System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

channel.basicConsume(queue,true,deliverCallback,cancelCallback);

}

}

生產(chǎn)者

public class EmitLog {

private static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明一個交換機(jī)

// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

Scanner scanner=new Scanner(System.in);

while (scanner.hasNext()){

String message = scanner.next();

channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("生產(chǎn)者發(fā)出的消息:"+ message);

}

}

}

Direct

生產(chǎn)者

public class DirectLogs {

// 交換機(jī)的名稱

public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()){

String message = scanner.next();

channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("生產(chǎn)者發(fā)出的消息:"+ message);

}

}

}

消費(fèi)者

public class ReceiveLogsDirect01 {

public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

//聲明一個隊(duì)列

channel.queueDeclare("console",false,false,false,null);

//綁定交換機(jī)與隊(duì)列

channel.queueBind("console",EXCHANGE_NAME,"info");

channel.queueBind("console",EXCHANGE_NAME,"warning");

DeliverCallback deliverCallback = (consumerTag, message) -> {

System.out.println("ReceiveLogsDirect01控制臺打印接受到的消息:" + new String(message.getBody()));

};

channel.basicConsume("console",true,deliverCallback,consumerTag -> {});

}

}

public class ReceiveLogsDirect02 {

public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

//聲明一個隊(duì)列

channel.queueDeclare("disk",false,false,false,null);

//綁定交換機(jī)與隊(duì)列

channel.queueBind("disk",EXCHANGE_NAME,"error");

DeliverCallback deliverCallback = (consumerTag, message) -> {

System.out.println("ReceiveLogsDirect02控制臺打印接受到的消息:" + new String(message.getBody()));

};

channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});

}

}

Topics

發(fā)送到類型是topic交換機(jī)的消息的routing_key不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點(diǎn)號分隔開。這些單詞可以是任意單詞,比如說: “stock.usd.nyse” , “nyse.vmw”,"quick.orange.rabbit"這種類型的。當(dāng)然這個單詞列表最多不能超過255個字節(jié)。 在這個規(guī)則列表中,其中有兩個替換符: *可以代替一個單詞 #可以代替零個或多個單詞

下圖綁定關(guān)系如下: Q1綁定的是:中間帶orange的三個單詞的字符串:*.orange.* Q2綁定的是:最后一個單詞是rabbit的單個單詞:*.*.rabbit,第一個單詞是lazy的多個單詞:lazy.#

數(shù)據(jù)接收情況如下:

quick.orange.rabbit:被隊(duì)列Q1Q2接收到

quick.orange.fox:被隊(duì)列Q1接收到

lazy.brown.fox:被隊(duì)列Q2接收到

lazy.pink.rabbit:雖然滿足隊(duì)列Q2的兩個綁定但是只會被接收一次

quick.orange.male.rabbit:四個單詞不匹配任何綁定會被丟棄

結(jié)論: 當(dāng)一個隊(duì)列綁定鍵是#,那么這個隊(duì)列將接收所有數(shù)據(jù),就有點(diǎn)像fanout了; 如果隊(duì)列綁定鍵當(dāng)中沒有#和*出現(xiàn),那么該隊(duì)列綁定類型就是direct了。

生產(chǎn)者

public class EmitLogTopic {

//交換機(jī)的名稱

public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

HashMap map = new HashMap<>();

map.put("quick.orange.rabbit","被隊(duì)列Q1Q2接收到");

map.put("quick.orange.fox","被隊(duì)列Q1接收到");

map.put("lazy.brown.fox","被隊(duì)列Q2接收到 ");

map.put("lazy.pink.rabbit","雖然滿足隊(duì)列Q2的兩個綁定但是只會被接收一次");

map.put("quick.orange.male.rabbit","四個單詞不匹配任何綁定會被丟棄");

for (Map.Entry bindingKeyEntry : map.entrySet()) {

String routingKey = bindingKeyEntry.getKey();

String message = bindingKeyEntry.getValue();

channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("生產(chǎn)者發(fā)送消息:"+ message );

}

}

}

消費(fèi)者

/*

* 聲明主題交換機(jī)及相關(guān)隊(duì)列

* 消費(fèi)者C1

* */

public class ReceiveLogsTopic01 {

//交換機(jī)名稱

public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明交換機(jī)

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//聲明隊(duì)列

String queueName = "Q1";

channel.queueDeclare(queueName,false,false,false,null);

//隊(duì)列捆綁

channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");

System.out.println("等待接收消息......");

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println(new String(message.getBody()));

System.out.println("接收隊(duì)列:"+ queueName + "綁定鍵:" + message.getEnvelope().getRoutingKey());

};

//接收消息

channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});

}

}

/*

* 聲明主題交換機(jī)及相關(guān)隊(duì)列

* 消費(fèi)者C2

* */

public class ReceiveLogsTopic02 {

//交換機(jī)名稱

public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明交換機(jī)

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//聲明隊(duì)列

String queueName = "Q2";

channel.queueDeclare(queueName,false,false,false,null);

//隊(duì)列捆綁

channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");

channel.queueBind(queueName,EXCHANGE_NAME,"*lazy.#");

System.out.println("等待接收消息......");

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println(new String(message.getBody()));

System.out.println("接收隊(duì)列:"+ queueName + "綁定鍵:" + message.getEnvelope().getRoutingKey());

};

//接收消息

channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});

}

}

死信隊(duì)列

死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說,producer將消息投遞到 broker或者直接到queue里了,consumer 從 queue取出消息進(jìn)行消費(fèi),但某些時候由于特定的原因?qū)е聁ueue中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。 應(yīng)用場景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到RabbitMQ的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時,將消息投入死信隊(duì)列中.還有比如說:用戶在商城下單成功并點(diǎn)擊去支付后在指定時間未支付時自動失效。

來源:

消息TTL過期;

隊(duì)列達(dá)到最大長度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中);

消息被拒絕(basic.reject或basic.nack)并且requeue=false。

消息TTL過期

生產(chǎn)者

/*

* 死信隊(duì)列之生產(chǎn)者代碼

*

* */

public class Producer {

//普通交換機(jī)的名稱

public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//死信消息,設(shè)置TTL時間 單位是ms 10000ms是10s

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

for (int i = 0; i < 10; i++) {

String message = "info" + i;

channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));

}

}

}

消費(fèi)者

/*

* 死信隊(duì)列實(shí)戰(zhàn)

* 消費(fèi)者01

* */

public class Consumer01 {

//普通交換機(jī)名稱

public static final String NORMAL_EXCHANGE = "normal_exchange";

//死信交換機(jī)名稱

public static final String DEAD_EXCHANGE = "dead_exchange";

//普通隊(duì)列名稱

public static final String NORMAL_QUEUE = "normal_queue";

//死信隊(duì)列名稱

public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明死信和普通的交換機(jī)類型為direct

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

//聲明普通隊(duì)列

HashMap arguments = new HashMap<>();

//過期時間 也可以不設(shè)置 由生產(chǎn)者設(shè)置過期時間,更靈活

//arguments.put("x-message-ttl",10000);

//正常隊(duì)列設(shè)置死信隊(duì)列

arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);

//設(shè)置死信RoutingKey

arguments.put("x-dead-letter-routing-key","lisi");

//聲明死信和普通隊(duì)列

channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//綁定普通的交換機(jī)與普通的隊(duì)列

channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

//綁定死信的交換機(jī)與死信的隊(duì)列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("等待接收消息......");

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));

};

channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});

}

}

/*

* 死信隊(duì)列實(shí)戰(zhàn)

* 消費(fèi)者02

* */

public class Consumer02 {

//死信隊(duì)列名稱

public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

System.out.println("等待接收消息......");

DeliverCallback deliverCallback = (consumerTag,message) -> {

System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));

};

channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});

}

}

效果:啟動生產(chǎn)者后,10條消息被傳送到NORMAL_QUEUE,然后被傳送到DEAD_QUEUE,此時啟動消費(fèi)者02,消息全被接收。

隊(duì)列達(dá)到最大長度

生產(chǎn)者

public class Producer {

//普通交換機(jī)名稱

private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//死信消息,設(shè)置TTL時間 單位是ms

// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

for (int i = 0; i < 10; i++) {

String message = "info" + i;

channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));

}

}

}

消費(fèi)者

public class Consumer01 {

//普通交換機(jī)名稱

private static final String NORMAL_EXCHANGE = "normal_exchange";

//死信交換機(jī)名稱

private static final String DEAD_EXCHANGE = "dead_exchange";

//普通隊(duì)列名稱

public static final String NORMAL_QUEUE = "normal_queue";

//死信隊(duì)列名稱

public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

//聲明兩個交換機(jī)類型為direct

channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");

channel.exchangeDeclare(DEAD_EXCHANGE,"direct");

//聲明普通隊(duì)列

HashMap arguments = new HashMap<>();

//過期時間 也可以不設(shè)置 由生產(chǎn)者設(shè)置過期時間

// arguments.put("x-message-ttl",10000);

//正常隊(duì)列設(shè)置死信隊(duì)列

arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);

//設(shè)置死信RoutingKey

arguments.put("x-dead-letter-routing-key","lisi");

//設(shè)置正常隊(duì)列的長度的限制

arguments.put("x-max-length",6);

channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//綁定普通交換機(jī)和普通的隊(duì)列

channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

//綁定死信交換機(jī)和死信隊(duì)列

channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

System.out.println("等待接收消息");

DeliverCallback deliverCallback=(consumerTag, message)->{

System.out.println("Consumer01接收的消息:"+ new String(message.getBody()));

};

channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag -> {});

}

}

消息被拒絕

生產(chǎn)者代碼不需要改變 消費(fèi)者

DeliverCallback deliverCallback=(consumerTag, message)->{

String msg = new String(message.getBody());

if (msg.equals("info6")){

//拒絕消息info6

System.out.println("Consumer01接收到的消息是:"+msg +":此消息被C1拒絕了");

channel.basicReject(message.getEnvelope().getDeliveryTag(),false);

}else {

System.out.println("Consumer01接收的消息:"+msg );

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

}

};

//開啟手動應(yīng)答

channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag -> {});

延遲隊(duì)列

延時隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時屬性上,延時隊(duì)列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊(duì)列就是用來存放需要在指定時間被處理的元素的隊(duì)列。

使用場景:

訂單在十分鐘之內(nèi)未支付則自動取消。

新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。

用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。

用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。

預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前十分鐘通知各個與會人員參加會議。

這些場景都有一個特點(diǎn),需要在某個事件發(fā)生之后或者之前的指定時間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;看起來似乎使用定時任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對于“如果賬單一周內(nèi)未支付則進(jìn)行自動結(jié)算”這樣的需求,如果對于時間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個可行的方案。但對于數(shù)據(jù)量比較大,并且時效性較強(qiáng)的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會有很多,活動期間甚至?xí)_(dá)到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。

整合springboot

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

com.alibaba

fastjson

1.2.73

org.projectlombok

lombok

io.springfox

springfox-swagger2

2.9.2

io.springfox

springfox-swagger-ui

2.9.2

org.springframework.amqp

spring-rabbit-test

test

spring.rabbitmq.host=192.168.24.8

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=123

@Configuration

@EnableSwagger2

public class SwaggerConfig {

@Bean

public Docket webApiConfig(){

return new Docket(DocumentationType.SWAGGER_2)

.groupName("webApi")

.apiInfo(webApiInfo())

.select()

.build();

}

private ApiInfo webApiInfo(){

return new ApiInfoBuilder()

.title("rabbitmq接口文檔")

.description("本文檔描述了rabbitmq微服務(wù)接口定義")

.version("1.0")

.contact(new Contact("enjoy6288","http://atguigu.com","123456@qq.com"))

.build();

}

}

隊(duì)列實(shí)現(xiàn)

配置

/*

* TTL隊(duì)列 配置文件類代碼

*

* */

@Configuration

public class TtlQueueConfig {

//普通交換機(jī)的名稱

public static final String X_EXCHANGE = "X";

//死信交換機(jī)的名稱

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

//普通隊(duì)列的名稱

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

//死信隊(duì)列的名稱

public static final String DEAD_LATTER_QUEUE = "QD";

//聲明xExchange

@Bean("xExchange")

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

//聲明yExchange

@Bean("yExchange")

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);

}

//聲明隊(duì)列

@Bean("queueA")

public Queue queueA(){

Map arguments = new HashMap<>(3);

//設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);

//設(shè)置死信Routing-key

arguments.put("x-dead-letter-routing-key","YD");

//設(shè)置TTL 單位是ms

arguments.put("x-message-ttl",10000);

return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

}

//聲明普通隊(duì)列 TTL為40s

@Bean("queueB")

public Queue queueB(){

Map arguments = new HashMap<>(3);

//設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);

//設(shè)置死信Routing-key

arguments.put("x-dead-letter-routing-key","YD");

//設(shè)置TTL 單位是ms

arguments.put("x-message-ttl",40000);

return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

}

//死信隊(duì)列

@Bean("queueD")

public Queue queueD(){

return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();

}

//綁定

@Bean

public Binding queueABindingX(@Qualifier("queueA") Queue queueA,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

//綁定

@Bean

public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueB).to(xExchange).with("XB");

}

//綁定

@Bean

public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,

@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

}

消費(fèi)者代碼

/*

* 隊(duì)列TTL 消費(fèi)者

* */

@Slf4j

@Component

public class DeadLetterQueueConsumer {

//接收消息

@RabbitListener(queues = "QD")

public void receiveD(Message message, Channel channel) throws Exception {

String msg = new String(message.getBody());

log.info("當(dāng)前時間:{},收到死信隊(duì)列的消息:{}",new Date().toString(),msg);

}

}

生產(chǎn)者代碼

/*

* 發(fā)送延遲消息

* */

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

//開始發(fā)消息

@GetMapping("/sendMsg/{message}")

public void sendMsg(@PathVariable String message){

log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個TTL隊(duì)列:{}",new Date().toString(),message);

rabbitTemplate.convertAndSend("X","XA","消息來自TTL為10s的隊(duì)列:" + message);

rabbitTemplate.convertAndSend("X","XB","消息來自TTL為40s的隊(duì)列:" + message);

}

}

隊(duì)列優(yōu)化

第一條消息在10S后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在40S之后變成了死信消息,然后被消費(fèi)掉,這樣一個延時隊(duì)列就打造完成了。 不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊(duì)列,這里只有10S和40S兩個時間選項(xiàng),如果需要一個小時后處理,那么就需要增加TTL為一個小時的隊(duì)列,如果是預(yù)定會議室然后提前通知這樣的場景,豈不是要增加無數(shù)個隊(duì)列才能滿足需求?

配置文件類

/*

* TTL隊(duì)列 配置文件類代碼

*

* */

@Configuration

public class TtlQueueConfig {

//普通交換機(jī)的名稱

public static final String X_EXCHANGE = "X";

//死信交換機(jī)的名稱

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

//普通隊(duì)列的名稱

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

public static final String QUEUE_C = "QC";

//死信隊(duì)列的名稱

public static final String DEAD_LATTER_QUEUE = "QD";

//聲明QC隊(duì)列

@Bean("queueC")

public Queue queueC(){

Map arguments = new HashMap<>();

//設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);

//設(shè)置死信RoutingKey

arguments.put("x-dead-letter-routing-key","YD");

return QueueBuilder.durable().withArguments(arguments).build();

}

@Bean

public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueC).to(xExchange).with("XC");

}

//聲明xExchange

@Bean("xExchange")

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

//聲明yExchange

@Bean("yExchange")

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);

}

//聲明隊(duì)列

@Bean("queueA")

public Queue queueA(){

Map arguments = new HashMap<>(3);

//設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);

//設(shè)置死信Routing-key

arguments.put("x-dead-letter-routing-key","YD");

//設(shè)置TTL 單位是ms

arguments.put("x-message-ttl",10000);

return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

}

//聲明普通隊(duì)列 TTL為40s

@Bean("queueB")

public Queue queueB(){

Map arguments = new HashMap<>(3);

//設(shè)置死信交換機(jī)

arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);

//設(shè)置死信Routing-key

arguments.put("x-dead-letter-routing-key","YD");

//設(shè)置TTL 單位是ms

arguments.put("x-message-ttl",40000);

return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

}

//死信隊(duì)列

@Bean("queueD")

public Queue queueD(){

return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();

}

//綁定

@Bean

public Binding queueABindingX(@Qualifier("queueA") Queue queueA,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

//綁定

@Bean

public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueB).to(xExchange).with("XB");

}

//綁定

@Bean

public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,

@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

}

/*

* 發(fā)送延遲消息

* */

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

//開始發(fā)消息

@GetMapping("/sendMsg/{message}")

public void sendMsg(@PathVariable String message){

log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個TTL隊(duì)列:{}",new Date().toString(),message);

rabbitTemplate.convertAndSend("X","XA","消息來自TTL為10s的隊(duì)列:" + message);

rabbitTemplate.convertAndSend("X","XB","消息來自TTL為40s的隊(duì)列:" + message);

}

//開始發(fā)消息

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")

public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){

log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒TTL信息給隊(duì)列QC:{}",

new Date().toString(),ttlTime,message);

rabbitTemplate.convertAndSend("X","XC",message,msg->{

//發(fā)送消息的時候 延遲時長

msg.getMessageProperties().setExpiration(ttlTime);

return msg;

});

}

}

消費(fèi)者代碼無需改變

插件實(shí)現(xiàn)延遲隊(duì)列

下載延遲插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

將延遲插件放到RabbitMQ的插件目錄下:

安裝插件并重啟服務(wù)

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

systemctl restart rabbitmq-server

配置文件類

@Configuration

public class DelayedQueueConfig {

//隊(duì)列

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

//交換機(jī)

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

//routingKey

public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

//聲明隊(duì)列

@Bean

public Queue delayedQueue(){

return new Queue(DELAYED_QUEUE_NAME);

};

//聲明交換機(jī)

@Bean

public CustomExchange delayedExchange(){

Map arguments = new HashMap<>();

arguments.put("x-delayed-type","direct");

return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",

true,false,arguments);

}

//綁定

@Bean

public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,

@Qualifier("delayedExchange") CustomExchange delayedExchange){

return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();

}

}

// 消費(fèi)者代碼 基于插件的延遲消息

@Slf4j

@Component

public class DelayQueueConsumer {

//監(jiān)聽消息

@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)

public void recieveDelayQueue(Message message){

String msg = new String(message.getBody());

log.info("當(dāng)前時間:{},收到延遲隊(duì)列的消息:{}",new Date().toString(),msg);

}

}

/*

* 發(fā)送延遲消息

* */

@Slf4j

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

//開始發(fā)消息 基于插件的 消息 及 延遲的時間

@GetMapping("/sendDelayMsg/{message}/{delayTime}")

public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){

log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒信息給延遲隊(duì)列delayed.queue:{}",

new Date().toString(),delayTime,message);

rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME

,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {

// 發(fā)送消息的時候 延遲時長 單位ms

msg.getMessageProperties().setDelay(delayTime);

return msg;

});

}

}

總結(jié): 延時隊(duì)列在需要延時處理的場景下非常有用,使用RabbitMQ來實(shí)現(xiàn)延時隊(duì)列可以很好的利用; RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問題,不會因?yàn)閱蝹€節(jié)點(diǎn)掛掉導(dǎo)致延時隊(duì)列不可用或者消息丟失; 當(dāng)然,延時隊(duì)列還有很多其它選擇,比如利用Java的DelayQueue,利用Redis的zsset,利用Quartz或者利用kafka的時間輪,這些方式各有特點(diǎn),看需要適用的場景。

發(fā)布確認(rèn)高級

在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq重啟,在RabbitMQ重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動處理和恢復(fù)。于是,我們開始思考,如何才能進(jìn)行RabbitMQ的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ集群不可用的時候,無法投遞的消息該如何處理呢?

配置文件

配置文件及消息發(fā)送方

NONE:禁用發(fā)布確認(rèn)模式,是默認(rèn)值

CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法

SIMPLE:經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會觸發(fā)回調(diào)方法,

其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms,或 waitForConfirmsOrDie方法

等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯,

要注意的點(diǎn)是waitForConfirmsOrDiea方法如果返回false則會關(guān)閉channel,

則接下來無法發(fā)送消息到broker

spring.rabbitmq.publisher-confirm-type = correlated

// 配置類:發(fā)布確認(rèn)(高級)

@Configuration

public class ConfirmConfig {

//交換機(jī)

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

//隊(duì)列

public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

//RoutingKey

public static final String CONFIRM_routing_key = "key1";

//聲明交換機(jī)

@Bean

public DirectExchange confirmExchange(){

return new DirectExchange(CONFIRM_EXCHANGE_NAME);

}

@Bean

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();

}

//綁定

@Bean

public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,

@Qualifier("confirmExchange")DirectExchange confirmExchange){

return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_routing_key);

}

}

生產(chǎn)者

// 開始發(fā)消息 測試確認(rèn)

@RestController

@Slf4j

@RequestMapping("/confirm")

public class ProducerController {

@Autowired

private RabbitTemplate rabbitTemplate;

//發(fā)消息

@GetMapping("/sendMessage/{message}")

public void sendMessage(@PathVariable String message){

CorrelationData correlationData = new CorrelationData("1");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME

,ConfirmConfig.CONFIRM_routing_key

,message,correlationData);

log.info("發(fā)送消息內(nèi)容:{}",message);

}

}

消費(fèi)者

// 接收消息

@Slf4j

@Component

public class Consumer {

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)

public void receiveConfirmMessage(Message message){

String msg = new String(message.getBody());

log.info("接受到的隊(duì)列confirm.queue消息:{}",msg);

}

}

回調(diào)接口

@Component

@Slf4j

public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

//注入

rabbitTemplate.setConfirmCallback(this);

}

/*

* 交換機(jī)確認(rèn)回調(diào)方法,發(fā)消息后,交換機(jī)接收到了就回調(diào)

* 1.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息

* 1.2 b:交換機(jī)收到消息,為true

* 1.3 s:失敗原因,成功為null

*

* 發(fā)消息,交換機(jī)接受失敗,也回調(diào)

* 2.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息

* 2.2 b:交換機(jī)沒收到消息,為false

* 2.3 s:失敗的原因

*

* */

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

String id = correlationData!=null ? correlationData.getId():"";

if (b){

log.info("交換機(jī)已經(jīng)收到ID為:{}的信息",id);

}else {

log.info("交換機(jī)還未收到ID為:{}的消息,由于原因:{}",id,s);

}

}

}

回退消息

在僅開啟了生產(chǎn)者確認(rèn)機(jī)制的情況下,交換機(jī)接收到消息后,會直接給消息生產(chǎn)者發(fā)送確認(rèn)消息,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄,此時生產(chǎn)者是不知道消息被丟棄這個事件的。那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設(shè)置mandatory參數(shù)可以在當(dāng)消息傳遞過程中不可達(dá)目的地時將消息返回給生產(chǎn)者。

配置類

spring.rabbitmq.publisher-returns=true

回退接口

@Component

@Slf4j

public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

//注入

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

/*

* 交換機(jī)確認(rèn)回調(diào)方法,發(fā)消息后,交換機(jī)接收到了就回調(diào)

* 1.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息

* 1.2 b:交換機(jī)收到消息,為true

* 1.3 s:失敗原因,成功為null

*

* 發(fā)消息,交換機(jī)接受失敗,也回調(diào)

* 2.1 correlationData:保存回調(diào)消息的ID及相關(guān)信息

* 2.2 b:交換機(jī)沒收到消息,為false

* 2.3 s:失敗的原因

*

* */

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

String id = correlationData!=null ? correlationData.getId():"";

if (b){

log.info("交換機(jī)已經(jīng)收到ID為:{}的信息",id);

}else {

log.info("交換機(jī)還未收到ID為:{}的消息,由于原因:{}",id,s);

}

}

//可以在當(dāng)消息傳遞過程中不可達(dá)目的的時將消息返回給生產(chǎn)者

//只有不可達(dá)目的地的時候才可回退

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

log.error("消息{},被交換機(jī){}退回,退回的原因:{},路由Key:{}",

new String(returnedMessage.getMessage().getBody())

, returnedMessage.getExchange()

, returnedMessage.getReplyText()

, returnedMessage.getRoutingKey());

}

}

發(fā)布測試

// 開始發(fā)消息 測試確認(rèn)

@RestController

@Slf4j

@RequestMapping("/confirm")

public class ProducerController {

@Autowired

private RabbitTemplate rabbitTemplate;

//發(fā)消息

@GetMapping("/sendMessage/{message}")

public void sendMessage(@PathVariable String message){

CorrelationData correlationData = new CorrelationData("1");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME

,ConfirmConfig.CONFIRM_routing_key

,message+"key1",correlationData);

log.info("發(fā)送消息內(nèi)容:{}",message+"key1");

CorrelationData correlationData2 = new CorrelationData("2");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME

,ConfirmConfig.CONFIRM_routing_key+"2"

,message+"key12",correlationData2);

log.info("發(fā)送消息內(nèi)容:{}",message+"key12");

}

}

柚子快報激活碼778899分享:java RabbitMQ筆記

http://yzkb.51969.com/

精彩文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19165896.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄