柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)
RabbitMq筆記
1:安裝
1.1安裝Erlang:
安裝:yum -y install esl-erlang_xx.x.x-x_centos_x_amd64.rpm 檢查:erl
1.2安裝RabbitMq
官網(wǎng)下載 https://www.rabbitmq.com/ 選擇需要下載的 點(diǎn)擊Downloading 選擇需要下載的 上傳的linux 只要能上傳到linux就行 安裝
yum -y install rabbitmq-server-x.x.x-x.elx.noarch.rpm
安裝UI插件
rabbitmq-plugins enable rabbitmq_management
啟動(dòng)rabbitMq服務(wù)
systemctl start rabbitmq-server.service
啟動(dòng)后關(guān)閉防火墻或者開啟5672和15672端口 檢查rabbitMq服務(wù)
systemctl status rabbitmq-server.service
關(guān)閉rabbitmq服務(wù):
systemctl stop rabbitmq-server.service
或者直接kill -9 xxx 直接殺死 重啟rabbitmq服務(wù):
systemctl restart rabbitmq-server.service
訪問 linux系統(tǒng)的Ip+15672 默認(rèn)賬號(hào)密碼為:guest 修改配置文件 再安裝目錄(默認(rèn):/etc/tabbitmq)創(chuàng)建一個(gè)rabbitmq.config文件。文件中添加(請(qǐng)不要忘記結(jié)尾那個(gè)“.”):
[{rabbit, [{loopback_users, []}]}].
** 重啟服務(wù)**
systemctl restart rabbitmq-server.service
1.3添加用戶權(quán)限
添加用戶,默認(rèn)用戶 guest 角色為管理員,一般開發(fā)環(huán)境下會(huì)創(chuàng)建新的用戶并對(duì)權(quán)限進(jìn)行分配。 添加新用戶并對(duì)權(quán)限進(jìn)行分配 添加用戶
用戶列表顯示用戶狀態(tài)是 No access ,代表用戶未進(jìn)行權(quán)限分配,不能進(jìn)行任何操作, 這里創(chuàng)建用戶分配權(quán)限可以類比數(shù)據(jù)庫(kù)中創(chuàng)建用戶并分配權(quán)限操作。
分配權(quán)限 創(chuàng)建 virtual hosts 可以類比創(chuàng)建數(shù)據(jù)庫(kù),分配用戶操作權(quán)限
點(diǎn)擊 admin 重新登陸即可使用新增賬號(hào)登陸
2專業(yè)術(shù)語(yǔ)
官網(wǎng)地址:http://www.rabbitmq.com/getstarted.html
2.1 Producing Producing意思不僅僅事發(fā)送消息。發(fā)送消息的程序叫做producer生產(chǎn)者 2.2Queue Queue是一個(gè)消息盒子的名稱。它存活在RabbitMq里。雖然消息流經(jīng)Rabbitm和你的應(yīng)用程序,但是他們只能在Queue里才能被保存。Queue沒有任何邊界的限制,想存多少消息都可以。本質(zhì)上是一個(gè)無(wú)限的緩存。許多生產(chǎn)者都可以向一個(gè)Queue里發(fā)送消息,許多消費(fèi)者都可以從一個(gè)Queue里接收消息。 2.3Consuming Consuming的意思和接收類似。等待接收消息的程序叫做消費(fèi)者。 注意:生產(chǎn)者、消費(fèi)者和代理人不一定非要在同一臺(tái)機(jī)器上 2.4ConnectionFactory、Connection、Channel ConnectionFactory、Connection、Channe都是RabbutMQ對(duì)外體提供的API中最基本的對(duì)象。 Connection是RabbitMQ的socket連接,它封裝了socket協(xié)議相關(guān)部分的邏輯。 ConnectionFactory為Connecton的制造工廠。 Channel是我們與RabbitMQ打交道的最重要的一個(gè)接口,我們大部分的業(yè)務(wù)操作是在Channel這個(gè)接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Excheange、發(fā)布消息等。 2.5Message acknowledgment 實(shí)際應(yīng)用中,可能會(huì)發(fā)生消費(fèi)者收到Queue中的消息,但沒有處理完后就宕機(jī)(或其他意外)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknwledgment)后才將該消息從Queue移除;如果RabbitMQ沒有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開,則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理。合理不存在timeout概念,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者,除非它的RabbitMQ連接斷開。 這里會(huì)產(chǎn)生另外一個(gè)問題,如果我們的開發(fā)人員在處理完業(yè)務(wù)邏輯后,忘記發(fā)送回執(zhí)給RabbitMQ,這將導(dǎo)致嚴(yán)重bug–Queue中堆積的消息會(huì)越來(lái)越多;消費(fèi)者重啟后會(huì)重復(fù)新消費(fèi)這些消息并重新執(zhí)行業(yè)務(wù)邏輯… 2.6Message durability 如果我們希望即使在RabbitMQ服務(wù)重啟的情況下,也不會(huì)丟失消息,我們可以將Queue與Message都設(shè)置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會(huì)丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務(wù)器已經(jīng)接收到生產(chǎn)者的消息,但還沒來(lái)得及持久化該消息時(shí)RabbitMQ服務(wù)器就斷電了),如果我們需要對(duì)這種小概率事件也要管理起來(lái),那么我們要用到事務(wù) 2.7Prefetch count 如果有很多消費(fèi)者同時(shí)訂閱同一個(gè)Queue中的消息,Queue中的消息會(huì)被平攤給多個(gè)消費(fèi)者,這是如果每個(gè)消息的處理時(shí)間不同,就有可能會(huì)導(dǎo)致某些消費(fèi)者一直在忙,而另外一些消費(fèi)者很快就處理完手頭工作并一直空閑情況。我們可以設(shè)置prefetchCount 來(lái)限制Queue每次發(fā)送給每個(gè)消費(fèi)者的消息數(shù),比如我們?cè)O(shè)置pefetchCount=1,則Queue每次給每個(gè)消費(fèi)者發(fā)送一條消息;消費(fèi)者處理完這條消息后Queue會(huì)再給該消費(fèi)者發(fā)送一條消息。 2.8Exchange 生產(chǎn)者將消息投遞到Queue中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會(huì)發(fā)生。實(shí)際的情況時(shí),生產(chǎn)者將消息發(fā)送到Exchange(交換器),有Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)。 Exchange是通過Binding將Exchange與Queue關(guān)聯(lián)起來(lái)的,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。 RabbitMQ中的Exchange有四種類型(fanout、direct、tpic、headers),不同類型有著不同的路由策略 fanout:fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。 direct:direct類型的Exchange路由規(guī)則也會(huì)很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。 topic:topic類型的Exchange再匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類型的Exchange相似,也是將消息路由到binding key 與 outing key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定: (1)routing key為一個(gè)句點(diǎn)號(hào),分割的字符串(將被句點(diǎn)號(hào)".“分割開的每一段獨(dú)立的字符串成為一個(gè)單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” (2)binding key 與 routing key 一樣也是句點(diǎn)號(hào)”."分割的字符串 (3)binding key中可以存在兩種特殊字符”“與”#“,用于做模糊匹配。其中"“用于匹配一個(gè)的單詞。#永固匹配多個(gè)單詞(可以是0個(gè)) headers:headers類型的Exchange不依賴于 routing key 與 binding key的匹配規(guī)則來(lái)路由消息,二十根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配 再綁定Queue與Exchange時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到Exchange時(shí),RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)ing是),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì);如果完全匹配則消息會(huì)路由到該Queue,否則不會(huì)路由到該Queue。 2.9routing key 生產(chǎn)者再將消息發(fā)送給Exchange的時(shí)候,一版會(huì)指定一個(gè)routing key,來(lái)指定這個(gè)消息的路由規(guī)則,而routing key需要與Exchange Type及Binding key聯(lián)合使用才能最終生效, 再Exchange Type與binding key固定的情況下(在正常使用一般這些內(nèi)容都是固定配置好的),我們的生產(chǎn)者就可以在發(fā)送消息給Exchange時(shí),通過指定routing key來(lái)決定消息流向哪里。 RabbitMQ為routing key設(shè)定的長(zhǎng)度限制為255bytes 2.10Binding RabbitMQ中哦通過binding將Exchange與Queue關(guān)聯(lián)起來(lái),這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了 2.11Binding Key 在綁定(Binding)Exchange與Queue的同時(shí),一般會(huì)指定一個(gè)binding key;消費(fèi)者將消息發(fā)送給Exchange時(shí),一般會(huì)指定一個(gè)routing key;當(dāng)binding key 與routing key相匹配時(shí),消息將會(huì)被路由到對(duì)應(yīng)的Queue中 在綁定多個(gè)Queue到同一個(gè)Exchange的時(shí)候,這些Binding允許使用相同的binding key binding key 并不是在所有情況下都生效,它依賴于Exchange Type ,比如fanout類型的Exchange就會(huì)無(wú)視binding key,而是將消息路由到所偶綁定到Exchange的Queue。 2.12ExchangeTypes RabbitMQ常用的Exchange Type有 fanout 、 direct 、 topic 、 headers 這四種(AMQP規(guī)范里還 提到兩種Exchange Type,分別為 system 與 自定義 ,) fanout:fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。 direct:direct類型的Exchange路由規(guī)則也會(huì)很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。 topic:topic類型的Exchange再匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類型的Exchange相似,也是將消息路由到binding key 與 outing key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定: (1)routing key為一個(gè)句點(diǎn)號(hào),分割的字符串(將被句點(diǎn)號(hào)”.“分割開的每一段獨(dú)立的字符串成為一個(gè)單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” (2)binding key 與 routing key 一樣也是句點(diǎn)號(hào)”."分割的字符串 (3)binding key中可以存在兩種特殊字符”“與”#“,用于做模糊匹配。其中""用于匹配一個(gè)的單詞。#永固匹配多個(gè)單詞(可以是0個(gè)) headers:headers類型的Exchange不依賴于 routing key 與 binding key的匹配規(guī)則來(lái)路由消息,二十根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配 再綁定Queue與Exchange時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到Exchange時(shí),RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)ing是),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì);如果完全匹配則消息會(huì)路由到該Queue,否則不會(huì)路由到該Queue。 RabbitMQ中實(shí)現(xiàn)RPC機(jī)制 1.客戶端發(fā)送請(qǐng)求(消息)時(shí),在消息的屬性(MessageProperties,在AMQP協(xié)議中定義了14種properties,這些屬性會(huì)隨著消息一起發(fā)送中設(shè)置兩個(gè)值replyTo(一個(gè)Queue名稱,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將此屬性返還,客戶端將根據(jù)這個(gè)id了解那條請(qǐng)求被成執(zhí)行了或執(zhí)行失敗) 2.服務(wù)器端收到消息并處理 3.服務(wù)器端處理完消息后,將生成一條應(yīng)答消息到replyTo指定的Queue,同時(shí)帶上corelationId屬性 4.客戶端之前已訂閱replyTo指定的Queue,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correaltionId屬性分析那條請(qǐng)求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理
3.簡(jiǎn)單隊(duì)列
需要用Java寫兩個(gè)程序;一個(gè)是生產(chǎn)者,他發(fā)送一個(gè)消息,另一個(gè)時(shí)消費(fèi)者,他接收消息,并且把消息打印出來(lái),會(huì)忽略一些Java API的細(xì)節(jié), 創(chuàng)建項(xiàng)目 創(chuàng)建一個(gè)maven項(xiàng)目 添加如下依賴:
消息發(fā)送
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 簡(jiǎn)單模式隊(duì)列-消息發(fā)送者
*/
@Component
class MqSendTest{
//隊(duì)列名稱
public final static String QUUE_NAME = "ycMq02";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
try {
// 通過工廠創(chuàng)建連接
connection = factory.newConnection();
//獲取通道
channel = connection.createChannel();
/**
* 聲明隊(duì)列
* 第一個(gè)參數(shù)queue:隊(duì)列名
* 第二個(gè)擦?xí)鴇urable:是否持久化
* 第三額參數(shù)Exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列對(duì)首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)珊瑚蟲
* 需要注意三點(diǎn):
* 1:排他隊(duì)列時(shí)基于連接可見的,同一連接的不同通道時(shí)可以同時(shí)訪問同一個(gè)連接創(chuàng)建的排他隊(duì)列的
* 2:”首次“,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的,這個(gè)與普通隊(duì)列不同
* 3:即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊(duì)列都會(huì)被自動(dòng)刪除的
* 這種隊(duì)列適用于只限于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場(chǎng)景
* 第四個(gè)參數(shù)Auto-delete:自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除
* 這種隊(duì)列只適用于臨時(shí)隊(duì)列
*/
channel.queueDeclare(QUUE_NAME,false,false,false,null);
//創(chuàng)建消息
String message="Hello World??!";
//將產(chǎn)生的消息放入隊(duì)列
channel.basicPublish("",QUUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '"+message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉通道
if (null!=channel&&channel.isOpen()){
channel.close();
}
//關(guān)閉連接
if (null!=connection&&connection.isOpen()){
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消息發(fā)送成后可以通過RabitMQ管理界面查看 消息接收
/**
* 簡(jiǎn)單模式隊(duì)列-消息接收者
*/
class MqRecvTest{
//隊(duì)列名稱
public final static String QUUE_NAME = "ycMq02";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
//通過工廠創(chuàng)建連接
Connection connection=factory.newConnection();
//獲取通道
Channel channel=connection.createChannel();
//指定隊(duì)列
channel.queueDeclare(QUUE_NAME,false,false,false,null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//獲取消息
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message=new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//監(jiān)聽隊(duì)列
channel.basicConsume(QUUE_NAME,true,deliverCallback,consumerTag->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息接收成功后可以通過RabbitMQ管理界面查看 測(cè)試 發(fā)送: 接收 總結(jié) 簡(jiǎn)單隊(duì)列處理消息效率不高,吞吐量較低,如果任務(wù)量很大,會(huì)造成消息得不到消費(fèi),出現(xiàn)內(nèi)存溢出,消息丟失等問題,這個(gè)時(shí)候就需要配置多個(gè)消費(fèi)者消費(fèi)消息
4.Work queues-工作模式隊(duì)列
4.1消息輪詢
簡(jiǎn)單隊(duì)列中存在的角色:生產(chǎn)者、消費(fèi)者、隊(duì)列,當(dāng)消費(fèi)者只有一個(gè),消息量非常大的時(shí)候,單個(gè)消費(fèi)者處理消息就會(huì)變慢,同時(shí)給節(jié)點(diǎn)頁(yè)帶來(lái)很大壓力,導(dǎo)致消息堆積越來(lái)越多。這是需要使用RabbitMQ提供的工作隊(duì)列模式,通過工作隊(duì)列提供多個(gè)消費(fèi)者,對(duì)MQ產(chǎn)生的消息進(jìn)行消費(fèi),提高M(jìn)Q消息的吞吐量,降低消息的處理時(shí)間 消息發(fā)送
/**
* 工作模式隊(duì)列-輪詢發(fā)布-消息發(fā)送者
*/
class MqSentRound {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_roundRobin";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
try {
//通過工廠創(chuàng)建連接
connection = factory.newConnection();
//獲取通道
channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//創(chuàng)建消息
for (int i = 1; i < 21; i++) {
String message = "Hello World! ------ " + i;
//將產(chǎn)生的消息放入隊(duì)列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Received '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉通道
if (null != channel && channel.isOpen()) {
channel.close();
}
//關(guān)閉連接
if (null != connection && connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
RabitMQ管理界面查看 結(jié)果
消息接收
/**
* 工作模式隊(duì)列-輪詢發(fā)布-消息接收者
*/
class MqRecvRound01 {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_roundRobin";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
//通過工廠創(chuàng)建連接
Connection connection = factory.newConnection();
//獲取通道
Channel channel = connection.createChannel();
//指定隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//獲取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received01 '" + message + "'");
//模擬程序執(zhí)行所耗時(shí)
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
};
//監(jiān)聽隊(duì)列
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag ->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 工作模式隊(duì)列-輪詢發(fā)布-消息接收者
*/
class MqRecvRound02 {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_roundRobin";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory =new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
//創(chuàng)建連接工廠
Connection connection=factory.newConnection();
//獲取通道
Channel channel =connection.createChannel();
//綁定隊(duì)列
channel.queueDeclare(QUEUE_NAME,false, false,false,null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//獲取消息
DeliverCallback deliverCallback = (consumerTag , delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received02 '" + message + "'");
//模擬程序執(zhí)行所耗時(shí)
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
};
//監(jiān)聽隊(duì)列
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag ->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
RabitMQ管理界面查看 總結(jié) 任務(wù)量很大,消息雖然得到了及時(shí)消費(fèi),單位時(shí)間呢i處理速度加快,提高了吞吐量;可是不同消費(fèi)者處理消息的時(shí)間不同,導(dǎo)致部分消費(fèi)者的資源被浪費(fèi)
4.2消息公平分發(fā)
消息發(fā)布采用的是默認(rèn)輪詢分發(fā),消息應(yīng)答采用的是自動(dòng)應(yīng)答模式,當(dāng)消息進(jìn)入隊(duì)列,RabbitMq就會(huì)分派消息。不會(huì)看消費(fèi)者的應(yīng)答數(shù)目,盲目的將第n條消息發(fā)給第n個(gè)消費(fèi)者。這個(gè)時(shí)候需要使用bassicQos(prefetchCount = 1)方法,來(lái)限制RabbitMQ只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者。當(dāng)消息處理完畢后,有了反饋,才會(huì)進(jìn)行第二次發(fā)送
消息發(fā)送
/**
* 工作模式隊(duì)列-公平分發(fā)-消息發(fā)送者
*/
class MqSendFair {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
try {
//創(chuàng)建連接
connection = factory.newConnection();
//獲取通道
channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//創(chuàng)建消息
for (int i = 1; i < 22; i++) {
String message = "Hello World! ------ " + i;
//將產(chǎn)生的消息放入隊(duì)列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("[x] Sent '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉通道
if (null != channel && channel.isOpen()) {
channel.close();
}
//關(guān)閉連接
if (null != connection && connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
RabitMQ管理界面查看 消息接收
/**
* 工作模式隊(duì)列-公平分發(fā)-消息接收者
*/
class MqRecvFair01 {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
//通過工廠創(chuàng)建連接
final Connection connection = factory.newConnection();
//獲取通道
final Channel channel = connection.createChannel();
//指定隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 限制RabbitMq只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者
* 當(dāng)消息處理完畢后,有了發(fā)聵,才會(huì)進(jìn)行第二次發(fā)送
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//獲取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received01 '" + message + "'");
//模擬程序執(zhí)行所耗時(shí)
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//手動(dòng)回執(zhí)消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//監(jiān)聽隊(duì)列
/**
* autoAck = true 代表自動(dòng)確認(rèn)消息
* autoAck = false 代表手動(dòng)確認(rèn)消息
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 工作模式隊(duì)列-公平分發(fā)-消息接收者
*/
class MqRecvFair02 {
//隊(duì)列名稱
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] args) {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
//通過工廠創(chuàng)建連接
final Connection connection = factory.newConnection();
//創(chuàng)建隊(duì)列
final Channel channel = connection.createChannel();
//指定隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 限制RabbitMQ只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者
* 當(dāng)消息處理完畢后,有了反饋,才會(huì)進(jìn)行第二次發(fā)送
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//獲取消息
DeliverCallback deliverCallback = (consumeTab, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received02 '" + message + "'");
//模擬程序執(zhí)行所耗時(shí)
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
//手動(dòng)回執(zhí)消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//監(jiān)聽隊(duì)列
/**
* autoAck = true 代表自動(dòng)確認(rèn)消息
* autoAck = false 代表手動(dòng)確認(rèn)消息
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
總結(jié) 公平輪詢發(fā)布-根據(jù)不同消費(fèi)者處理消息的速度不同,收到的數(shù)量也不同,處理速度快的消息數(shù)量比較多,最大化使用計(jì)算機(jī)資源。生產(chǎn)者產(chǎn)生的消息只可以被一個(gè)消費(fèi)者消費(fèi)
5.Publish/Subscribe- 發(fā)布與訂閱模式
當(dāng)生產(chǎn)者把消息推送出去后,不同的消費(fèi)者均可以對(duì)該消息進(jìn)行消費(fèi),而不是消息被一個(gè)消費(fèi)者消費(fèi)后就立即從隊(duì)列中刪除,這中處理方式稱之為消息的發(fā)布與訂閱模式
消息發(fā)送
/**
* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息發(fā)送者
*/
class MqSendFanout {
//隊(duì)列名稱
//如果不生命隊(duì)列,會(huì)使用默認(rèn)值,RabbitMQ會(huì)創(chuàng)建一個(gè)排他隊(duì)列二,連接斷開后自動(dòng)刪除
//private final static String QUEUE_NAME="exchange_fanout";
//交換機(jī)
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
Connection connection = null;
Channel channel = null;
try {
//2.創(chuàng)建連接
connection = factory.newConnection();
//3.獲取通道
channel = connection.createChannel();
/**
* 4.綁定交換機(jī) fanout:廣播模式
* 交換機(jī)名,模式
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//5.創(chuàng)建消息,模擬發(fā)送手機(jī)號(hào)碼喝右鍵地址
String message = "110|110!qq.com";
//6.將產(chǎn)生的消息發(fā)送至交換機(jī)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//7.關(guān)閉連接 先關(guān)閉通道,再關(guān)閉連接,順序很重要
//關(guān)閉通道
if (null != channel && channel.isOpen()) {
channel.close();
}
//關(guān)閉連接
if (null != connection && connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
發(fā)送結(jié)果 消息接收
/**
* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息接收者01
*/
class MqRecvFanout01 {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//設(shè)置虛擬主機(jī)
try {
//2.創(chuàng)建連接
final Connection connection = factory.newConnection();
//3.獲取通道
final Channel channel = connection.createChannel();
/**
* 4.綁定交換機(jī) fanount:廣播模式
* 交換機(jī)名稱 、 模式設(shè)置
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//5.獲取隊(duì)列名
String queueName = channel.queueDeclare().getQueue();
/**
* 6.綁定隊(duì)列
* 隊(duì)列名稱。交換機(jī)名稱?!啊?/p>
*/
channel.queueBind(queueName, EXCHANGE_NAME, "");
//7.現(xiàn)在一個(gè)消費(fèi)者處理完畢,發(fā)出反饋之后,才會(huì)進(jìn)行第二次發(fā)送
channel.basicQos(1);
//8.獲取消息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received01 '" + message + "'");
//9.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* 10.監(jiān)聽隊(duì)列
* 隊(duì)列名。true自動(dòng)確認(rèn)/false手動(dòng)確認(rèn)
*/
channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息接收者02
*/
class MqRecvFanout02 {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//設(shè)置虛擬主機(jī)名
try {
//2.創(chuàng)建連接
final Connection connection = factory.newConnection();
//3.獲取通道
final Channel channel = connection.createChannel();
//4.綁定交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
//5.獲取隊(duì)列名稱
String queueName = channel.queueDeclare().getQueue();
//6.交換機(jī)喝隊(duì)列綁定
channel.queueBind(queueName,EXCHANGE_NAME,"");
//7.給一個(gè)消費(fèi)者發(fā)送的消息不超過1條,消費(fèi)者處理完,發(fā)送反饋后,再進(jìn)行第二次發(fā)送
channel.basicQos(1);
//8.獲取消息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received01 '" + message + "'");
//9.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//10.監(jiān)聽隊(duì)列 隊(duì)列名,手動(dòng)確認(rèn)false
channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
結(jié)果 總結(jié) 生產(chǎn)者發(fā)送了一條消息,所有的消費(fèi)者均可以收到消息,但沒有指定具體的消費(fèi)者
6.Routing-路由模式隊(duì)列
生產(chǎn)者產(chǎn)生的消息投給交換機(jī),交換機(jī)發(fā)送消息時(shí)的Exchange Types為direct(路由模式)類型,消息通過定義的Routing Key被路由到指定的隊(duì)列進(jìn)行后續(xù)消費(fèi) 消息發(fā)送
/**
* direct路由模式隊(duì)列-消息發(fā)送者
*/
class MqSendDirect {
//交換機(jī)名稱
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
Connection connection = null;
Channel channel = null;
try {
//2.創(chuàng)建連接
connection = factory.newConnection();
//3.獲取通道
channel = connection.createChannel();
/**
* 4.綁定交換機(jī)
* 交換機(jī)名稱 , 模式
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//5.創(chuàng)建不同類型的消息
String message = "INFO消息";
// String message="WARNING消息";
// String message="ERROR消息";
String rountingKey = "info";
// String rountingKey ="error";
/**
* 6.將消息放入交換機(jī)
* 交換機(jī)名 , 路由(rountungKey) , ,消息
*/
channel.basicPublish(EXCHANGE_NAME, rountingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
//7.關(guān)閉連接,先關(guān)閉通道,再關(guān)閉連接
try {
//關(guān)閉通道
if (null != channel && channel.isOpen()) {
channel.close();
}
//關(guān)閉連接
if (null != connection && connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
結(jié)果 消息接收 消費(fèi)者對(duì)消息進(jìn)行后續(xù)消費(fèi)時(shí),對(duì)于接收消息的隊(duì)列在對(duì)消息進(jìn)行接收時(shí),綁定到每一個(gè)交換機(jī)上的隊(duì)列均會(huì)指定其Routing Key規(guī)則,通過路由規(guī)則將消息路由到執(zhí)行隊(duì)列中
/**
* direct路由模式隊(duì)列-消息接收者01
*/
class MqRecvDirect01 {
//交換機(jī)名稱
private static final String ECHANGE_NAME = "exchange_direct";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
try {
//2.創(chuàng)建連接
Connection connection = factory.newConnection();
//3.獲取通道
Channel channel = connection.createChannel();
/**
* 4.綁定交換機(jī)
* 交換機(jī)名稱,模式
*/
channel.exchangeDeclare(ECHANGE_NAME, BuiltinExchangeType.DIRECT);
//5.獲取隊(duì)列名
String queueName = channel.queueDeclare().getQueue();
/**
* 6.隊(duì)列與交換機(jī)綁定
* 隊(duì)列名,交換機(jī)名,路由
*/
String routingKeyInfo = "info";
String routingKeyWarning = "warning";
channel.queueBind(queueName, ECHANGE_NAME, routingKeyInfo);
channel.queueBind(queueName, ECHANGE_NAME, routingKeyWarning);
//7.一個(gè)消費(fèi)者只能同時(shí)處理一條消息
channel.basicQos(1);
//8.獲取消息
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [X] Direct01 '" + message + "'");
//9.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* 10.監(jiān)聽隊(duì)列
* 隊(duì)列名,手動(dòng)確認(rèn),消息回執(zhí)
*/
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
結(jié)果01
/**
* direct路由模式隊(duì)列-消息接收者01
*/
class MqRecvDirect02 {
//交換機(jī)名稱
private static final String ECHANGE_NAME = "exchange_direct";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
try {
//2.創(chuàng)建鏈接
Connection connection =factory.newConnection();
//3.獲取通道
Channel channel = connection.createChannel();
/**
* 4.邦迪交換機(jī)
* 交換機(jī)名,模式
*/
channel.exchangeDeclare(ECHANGE_NAME,BuiltinExchangeType.DIRECT);
//5.獲取隊(duì)列
String queueName = channel.queueDeclare().getQueue();
//6.綁定隊(duì)列
String routingKeyError = "error";
channel.queueBind(queueName, ECHANGE_NAME, routingKeyError);
//7.一個(gè)消費(fèi)者只能同時(shí)處理一條消息
channel.basicQos(1);
//8.獲取消息
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [X] Direct02 '" + message + "'");
//9.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
/**
* 10.監(jiān)聽隊(duì)列
* 隊(duì)列名,手動(dòng)確認(rèn),消息回調(diào)
*/
channel.basicConsume(queueName,false,deliverCallback,cosumerTag->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
結(jié)果02 總結(jié) 生產(chǎn)者發(fā)送了多條設(shè)置了不同路由規(guī)則的消息,消費(fèi)者可以根據(jù)具體的路由規(guī)則消費(fèi)對(duì)應(yīng)隊(duì)列中的消息,而不是所有消費(fèi)者都可以消費(fèi)所有消息
7 Topics-主題模式隊(duì)列
當(dāng)生產(chǎn)者產(chǎn)生的消息如果場(chǎng)景需求過多,需要設(shè)置很多路由規(guī)則的時(shí)候,需要采用主題模式隊(duì)列;
routingkey為一個(gè)句點(diǎn)號(hào)'.'分割的字符串,我們將被句點(diǎn)號(hào)'.'分割開的每一段獨(dú)立的字符串稱為一個(gè)單詞。如:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;
routingkey中可以存在兩種特殊字符*與'#',用于做模糊匹配,*用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè))
消息發(fā)送
/**
* topic主題模式隊(duì)列-消息發(fā)送者
*/
class MqSendTopic01 {
//交換機(jī)名稱
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
Connection connection = null;
Channel channel = null;
try {
//2.創(chuàng)建連接
connection = factory.newConnection();
//3.獲取通道
channel = connection.createChannel();
/**
* 4.綁定交換機(jī)
* 交換機(jī)名稱,主題
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//5.創(chuàng)建消息并設(shè)置理由
String message = "查詢操作";
// String message = "更新操作";
String routingKey = "select.goods.byId";
// String routingKey="update.goods.byId.addName";
//6.將產(chǎn)生的消息發(fā)送給至交換機(jī)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
;
System.out.println(" [x] SentTopic '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
//7.關(guān)閉連接,先關(guān)閉通道,再關(guān)閉連接
try {
//關(guān)閉通道
if (null != channel && channel.isOpen()) {
channel.close();
}
//關(guān)閉連接
if (null != connection && connection.isOpen()) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
結(jié)果
消息接收 消息接收01
/**
* topic主題模式-消息接收者01
*/
class MqRecvTopic01 {
//交換機(jī)名稱
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
try {
//2.創(chuàng)建連接
Connection connection = factory.newConnection();
//3.獲取通道
Channel channel = connection.createChannel();
/**
* 4.綁定交換機(jī)
* 交換
* 機(jī)名,模式名稱
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//5.獲取隊(duì)列名
String queueName = channel.queueDeclare().getQueue();
//6.設(shè)置路由routingKey
String routingKey = "select.goods.*";
/**
* 7.綁定隊(duì)列
* 隊(duì)列名 ,交換機(jī)名,路由
*/
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
//8.一個(gè)消費(fèi)之只能同時(shí)處理1條
channel.basicQos(1);
//9.獲取消息
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] RecvTopic01 '" + message + "'");
//10.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* 11.監(jiān)聽隊(duì)列
* 隊(duì)列名 手動(dòng)確認(rèn) ,消息回調(diào)
*/
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
結(jié)果01
/**
* topic主題模式-消息接收者02
*/
class MqRecvTopic02 {
//交換機(jī)名稱
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//綁定虛擬主機(jī)
try {
//2.創(chuàng)建連接
Connection connection = factory.newConnection();
//3.獲取通道
Channel channel = connection.createChannel();
/**
* 4.綁定交換機(jī)
* 交換機(jī)名稱, 隊(duì)列模式
*/
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//5.獲取隊(duì)列名
String queueName= channel.queueDeclare().getQueue();
//6.設(shè)置路由routingKey
String routingKey = "update.goods.#";
/**
* 7.綁定隊(duì)列
* 隊(duì)列名,交換機(jī)名,路由
*/
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
//8.一個(gè)消費(fèi)者只能同時(shí)消費(fèi)一條消息
channel.basicQos(1);
//9.獲取消息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag ,delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received02 '" + message + "'");
//10.手動(dòng)回執(zhí)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
/**
* 11.監(jiān)聽隊(duì)列
* 隊(duì)列名,手動(dòng)確認(rèn)消息,消息回調(diào)
*/
channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
總結(jié) 生產(chǎn)者發(fā)送了多條設(shè)置了匹配規(guī)則的消息,根據(jù)不同的路由匹配規(guī)則,將消息根據(jù)指定的路由到匹配的隊(duì)列中
8.RPC-遠(yuǎn)程過程調(diào)用模式隊(duì)列
mq是基于異步進(jìn)行消息處理,生產(chǎn)者將消息發(fā)送到mq后不知道消費(fèi)者處理成功或者失敗,甚至都不知道有沒有消費(fèi)者來(lái)處理。而實(shí)際應(yīng)用過程中,可能需要同步處理,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理。這相當(dāng)于RPC(Remote Procedure call,遠(yuǎn)程過程調(diào)用)。再RabbitMQ中也支持RPC
RabbitMQ中實(shí)現(xiàn)RPC的機(jī)制是: 1.客服端發(fā)哦是那個(gè)請(qǐng)求,再消息的屬性(AMQP協(xié)議中定義了14中propertiec,這14種屬性會(huì)隨著消息一起發(fā)送)中設(shè)置了兩個(gè)值replyTo(一個(gè)Queue名稱,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個(gè)Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將次屬性返回,客戶端將根據(jù)這個(gè)id了解那條請(qǐng)求被成功執(zhí)行了或執(zhí)行失?。?2.服務(wù)器端收到消息并處理 3.服務(wù)器端處理完消息后,將生成一條應(yīng)答消息到replyTo指定Queue,同時(shí)攜帶correlationId屬性
客戶端之前已訂閱repluTo指定的Queue,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correlationId屬性分析那條請(qǐng)求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理。
服務(wù)端
/**
* RPC模式隊(duì)列-服務(wù)器
*/
class MqRPCServer01 {
//隊(duì)列名稱
private static final String RPC_QUEUE_NAME = "rpc_queue";
/**
* 計(jì)算斐波那契數(shù)列
*
* @param n
* @return
*/
private static int fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] args) {
//1.創(chuàng)建工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
try {
//2.創(chuàng)建連接
final Connection connection = factory.newConnection();
//3.獲取通道
final Channel channel = connection.createChannel();
/**4.聲明隊(duì)列
* 隊(duì)列名 是否持久化 是否排他 是否自動(dòng)刪除 長(zhǎng)度限制/存活時(shí)間/優(yōu)先級(jí)等限制條件
*/
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//5.清空指定隊(duì)列的內(nèi)容,而不刪除隊(duì)列
channel.queuePurge(RPC_QUEUE_NAME);
//6.限制mq只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者,當(dāng)消息處理完畢后,有了反饋才會(huì)進(jìn)行第二次發(fā)送
channel.basicQos(1);
//7.獲取消息
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//8.獲取replyTo隊(duì)列和correlationId請(qǐng)求標(biāo)識(shí)
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
//9.接收客戶端消息
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
//10.服務(wù)端根據(jù)業(yè)務(wù)需求處理
response += fib(n);
} catch (Exception e) {
e.printStackTrace();
System.out.println(" [.] " + e.toString());
} finally {
//11.將處理結(jié)果發(fā)送至repluTo隊(duì)列同時(shí)攜帶correlationId屬性
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
/**
* 12.手動(dòng)回執(zhí)
* 通道中的信息 是否自動(dòng)
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//13.RabbitMq消費(fèi)者工作線程通知RPC服務(wù)器其他所有線程運(yùn)行
synchronized (monitor) {
monitor.notify();
}
}
};
/**
* 13,監(jiān)聽隊(duì)列
* 隊(duì)列名,是否自動(dòng)確認(rèn)消息 消息回執(zhí)
*/
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
//14.線程等待并準(zhǔn)備接收來(lái)自RPC客戶端的消息
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端
/**
* RPC模式隊(duì)列-客戶端
*/
class MqRPCClient01 implements AutoCloseable {
private Connection connection;
private Channel channel;
//隊(duì)列名稱
private String requestQueueName = "rpc_queue";
//初始化連接
public MqRPCClient01() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.85.131");//ip
factory.setPort(5672);//端口
factory.setUsername("shop");//賬號(hào)
factory.setPassword("shop");//密碼
factory.setVirtualHost("/shop");//虛擬主機(jī)
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] args) {
try (MqRPCClient01 fibonacciRpc = new MqRPCClient01()) {
for (int i = 0; i < 10; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
//請(qǐng)求服務(wù)端
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
//請(qǐng)求服務(wù)端
public String call(String message) throws Exception {
//1.correlationId請(qǐng)求標(biāo)識(shí)ID
final String corrId = UUID.randomUUID().toString();
//2.獲取隊(duì)列名稱
String replyQueueName = channel.queueDeclare().getQueue();
//3.設(shè)置replyTo隊(duì)列和correlationId請(qǐng)求標(biāo)識(shí)
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//4.發(fā)送消息至隊(duì)列
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//5.設(shè)置線程池等待,每次只接收一個(gè)響應(yīng)結(jié)果
final BlockingQueue
//6.接受服務(wù)器返回結(jié)果
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
//關(guān)閉連接
public void close() throws Exception {
connection.close();
}
}
運(yùn)行完客戶端后的服務(wù)端效果
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)
相關(guān)文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。