欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)

http://yzkb.51969.com/

RabbitMq筆記

1:安裝

1.1安裝Erlang:

安裝:yum -y install esl-erlang_xx.x.x-x_centos_x_amd64.rpm 檢查:erl

1.2安裝RabbitMq

官網(wǎng)下載 https://www.rabbitmq.com/ 選擇需要下載的 點(diǎn)擊Downloading 選擇需要下載的 上傳的linux 只要能上傳到linux就行 安裝

yum -y install rabbitmq-server-x.x.x-x.elx.noarch.rpm

安裝UI插件

rabbitmq-plugins enable rabbitmq_management

啟動(dòng)rabbitMq服務(wù)

systemctl start rabbitmq-server.service

啟動(dòng)后關(guān)閉防火墻或者開啟5672和15672端口 檢查rabbitMq服務(wù)

systemctl status rabbitmq-server.service

關(guān)閉rabbitmq服務(wù):

systemctl stop rabbitmq-server.service

或者直接kill -9 xxx 直接殺死 重啟rabbitmq服務(wù):

systemctl restart rabbitmq-server.service

訪問 linux系統(tǒng)的Ip+15672 默認(rèn)賬號(hào)密碼為:guest 修改配置文件 再安裝目錄(默認(rèn):/etc/tabbitmq)創(chuàng)建一個(gè)rabbitmq.config文件。文件中添加(請(qǐng)不要忘記結(jié)尾那個(gè)“.”):

[{rabbit, [{loopback_users, []}]}].

** 重啟服務(wù)**

systemctl restart rabbitmq-server.service

1.3添加用戶權(quán)限

添加用戶,默認(rèn)用戶 guest 角色為管理員,一般開發(fā)環(huán)境下會(huì)創(chuàng)建新的用戶并對(duì)權(quán)限進(jìn)行分配。 添加新用戶并對(duì)權(quán)限進(jìn)行分配 添加用戶

用戶列表顯示用戶狀態(tài)是 No access ,代表用戶未進(jìn)行權(quán)限分配,不能進(jìn)行任何操作, 這里創(chuàng)建用戶分配權(quán)限可以類比數(shù)據(jù)庫(kù)中創(chuàng)建用戶并分配權(quán)限操作。

分配權(quán)限 創(chuàng)建 virtual hosts 可以類比創(chuàng)建數(shù)據(jù)庫(kù),分配用戶操作權(quán)限

點(diǎn)擊 admin 重新登陸即可使用新增賬號(hào)登陸

2專業(yè)術(shù)語(yǔ)

官網(wǎng)地址:http://www.rabbitmq.com/getstarted.html

2.1 Producing Producing意思不僅僅事發(fā)送消息。發(fā)送消息的程序叫做producer生產(chǎn)者 2.2Queue Queue是一個(gè)消息盒子的名稱。它存活在RabbitMq里。雖然消息流經(jīng)Rabbitm和你的應(yīng)用程序,但是他們只能在Queue里才能被保存。Queue沒有任何邊界的限制,想存多少消息都可以。本質(zhì)上是一個(gè)無(wú)限的緩存。許多生產(chǎn)者都可以向一個(gè)Queue里發(fā)送消息,許多消費(fèi)者都可以從一個(gè)Queue里接收消息。 2.3Consuming Consuming的意思和接收類似。等待接收消息的程序叫做消費(fèi)者。 注意:生產(chǎn)者、消費(fèi)者和代理人不一定非要在同一臺(tái)機(jī)器上 2.4ConnectionFactory、Connection、Channel ConnectionFactory、Connection、Channe都是RabbutMQ對(duì)外體提供的API中最基本的對(duì)象。 Connection是RabbitMQ的socket連接,它封裝了socket協(xié)議相關(guān)部分的邏輯。 ConnectionFactory為Connecton的制造工廠。 Channel是我們與RabbitMQ打交道的最重要的一個(gè)接口,我們大部分的業(yè)務(wù)操作是在Channel這個(gè)接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Excheange、發(fā)布消息等。 2.5Message acknowledgment 實(shí)際應(yīng)用中,可能會(huì)發(fā)生消費(fèi)者收到Queue中的消息,但沒有處理完后就宕機(jī)(或其他意外)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknwledgment)后才將該消息從Queue移除;如果RabbitMQ沒有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開,則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理。合理不存在timeout概念,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者,除非它的RabbitMQ連接斷開。 這里會(huì)產(chǎn)生另外一個(gè)問題,如果我們的開發(fā)人員在處理完業(yè)務(wù)邏輯后,忘記發(fā)送回執(zhí)給RabbitMQ,這將導(dǎo)致嚴(yán)重bug–Queue中堆積的消息會(huì)越來(lái)越多;消費(fèi)者重啟后會(huì)重復(fù)新消費(fèi)這些消息并重新執(zhí)行業(yè)務(wù)邏輯… 2.6Message durability 如果我們希望即使在RabbitMQ服務(wù)重啟的情況下,也不會(huì)丟失消息,我們可以將Queue與Message都設(shè)置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會(huì)丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務(wù)器已經(jīng)接收到生產(chǎn)者的消息,但還沒來(lái)得及持久化該消息時(shí)RabbitMQ服務(wù)器就斷電了),如果我們需要對(duì)這種小概率事件也要管理起來(lái),那么我們要用到事務(wù) 2.7Prefetch count 如果有很多消費(fèi)者同時(shí)訂閱同一個(gè)Queue中的消息,Queue中的消息會(huì)被平攤給多個(gè)消費(fèi)者,這是如果每個(gè)消息的處理時(shí)間不同,就有可能會(huì)導(dǎo)致某些消費(fèi)者一直在忙,而另外一些消費(fèi)者很快就處理完手頭工作并一直空閑情況。我們可以設(shè)置prefetchCount 來(lái)限制Queue每次發(fā)送給每個(gè)消費(fèi)者的消息數(shù),比如我們?cè)O(shè)置pefetchCount=1,則Queue每次給每個(gè)消費(fèi)者發(fā)送一條消息;消費(fèi)者處理完這條消息后Queue會(huì)再給該消費(fèi)者發(fā)送一條消息。 2.8Exchange 生產(chǎn)者將消息投遞到Queue中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會(huì)發(fā)生。實(shí)際的情況時(shí),生產(chǎn)者將消息發(fā)送到Exchange(交換器),有Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)。 Exchange是通過Binding將Exchange與Queue關(guān)聯(lián)起來(lái)的,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。 RabbitMQ中的Exchange有四種類型(fanout、direct、tpic、headers),不同類型有著不同的路由策略 fanout:fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。 direct:direct類型的Exchange路由規(guī)則也會(huì)很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。 topic:topic類型的Exchange再匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類型的Exchange相似,也是將消息路由到binding key 與 outing key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定: (1)routing key為一個(gè)句點(diǎn)號(hào),分割的字符串(將被句點(diǎn)號(hào)".“分割開的每一段獨(dú)立的字符串成為一個(gè)單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” (2)binding key 與 routing key 一樣也是句點(diǎn)號(hào)”."分割的字符串 (3)binding key中可以存在兩種特殊字符”“與”#“,用于做模糊匹配。其中"“用于匹配一個(gè)的單詞。#永固匹配多個(gè)單詞(可以是0個(gè)) headers:headers類型的Exchange不依賴于 routing key 與 binding key的匹配規(guī)則來(lái)路由消息,二十根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配 再綁定Queue與Exchange時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到Exchange時(shí),RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)ing是),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì);如果完全匹配則消息會(huì)路由到該Queue,否則不會(huì)路由到該Queue。 2.9routing key 生產(chǎn)者再將消息發(fā)送給Exchange的時(shí)候,一版會(huì)指定一個(gè)routing key,來(lái)指定這個(gè)消息的路由規(guī)則,而routing key需要與Exchange Type及Binding key聯(lián)合使用才能最終生效, 再Exchange Type與binding key固定的情況下(在正常使用一般這些內(nèi)容都是固定配置好的),我們的生產(chǎn)者就可以在發(fā)送消息給Exchange時(shí),通過指定routing key來(lái)決定消息流向哪里。 RabbitMQ為routing key設(shè)定的長(zhǎng)度限制為255bytes 2.10Binding RabbitMQ中哦通過binding將Exchange與Queue關(guān)聯(lián)起來(lái),這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了 2.11Binding Key 在綁定(Binding)Exchange與Queue的同時(shí),一般會(huì)指定一個(gè)binding key;消費(fèi)者將消息發(fā)送給Exchange時(shí),一般會(huì)指定一個(gè)routing key;當(dāng)binding key 與routing key相匹配時(shí),消息將會(huì)被路由到對(duì)應(yīng)的Queue中 在綁定多個(gè)Queue到同一個(gè)Exchange的時(shí)候,這些Binding允許使用相同的binding key binding key 并不是在所有情況下都生效,它依賴于Exchange Type ,比如fanout類型的Exchange就會(huì)無(wú)視binding key,而是將消息路由到所偶綁定到Exchange的Queue。 2.12ExchangeTypes RabbitMQ常用的Exchange Type有 fanout 、 direct 、 topic 、 headers 這四種(AMQP規(guī)范里還 提到兩種Exchange Type,分別為 system 與 自定義 ,) fanout:fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。 direct:direct類型的Exchange路由規(guī)則也會(huì)很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。 topic:topic類型的Exchange再匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類型的Exchange相似,也是將消息路由到binding key 與 outing key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定: (1)routing key為一個(gè)句點(diǎn)號(hào),分割的字符串(將被句點(diǎn)號(hào)”.“分割開的每一段獨(dú)立的字符串成為一個(gè)單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” (2)binding key 與 routing key 一樣也是句點(diǎn)號(hào)”."分割的字符串 (3)binding key中可以存在兩種特殊字符”“與”#“,用于做模糊匹配。其中""用于匹配一個(gè)的單詞。#永固匹配多個(gè)單詞(可以是0個(gè)) headers:headers類型的Exchange不依賴于 routing key 與 binding key的匹配規(guī)則來(lái)路由消息,二十根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配 再綁定Queue與Exchange時(shí)指定一組鍵值對(duì);當(dāng)消息發(fā)送到Exchange時(shí),RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)ing是),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì);如果完全匹配則消息會(huì)路由到該Queue,否則不會(huì)路由到該Queue。 RabbitMQ中實(shí)現(xiàn)RPC機(jī)制 1.客戶端發(fā)送請(qǐng)求(消息)時(shí),在消息的屬性(MessageProperties,在AMQP協(xié)議中定義了14種properties,這些屬性會(huì)隨著消息一起發(fā)送中設(shè)置兩個(gè)值replyTo(一個(gè)Queue名稱,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將此屬性返還,客戶端將根據(jù)這個(gè)id了解那條請(qǐng)求被成執(zhí)行了或執(zhí)行失敗) 2.服務(wù)器端收到消息并處理 3.服務(wù)器端處理完消息后,將生成一條應(yīng)答消息到replyTo指定的Queue,同時(shí)帶上corelationId屬性 4.客戶端之前已訂閱replyTo指定的Queue,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correaltionId屬性分析那條請(qǐng)求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理

3.簡(jiǎn)單隊(duì)列

需要用Java寫兩個(gè)程序;一個(gè)是生產(chǎn)者,他發(fā)送一個(gè)消息,另一個(gè)時(shí)消費(fèi)者,他接收消息,并且把消息打印出來(lái),會(huì)忽略一些Java API的細(xì)節(jié), 創(chuàng)建項(xiàng)目 創(chuàng)建一個(gè)maven項(xiàng)目 添加如下依賴:

com.rabbitmq

amqp-client

消息發(fā)送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**

* 簡(jiǎn)單模式隊(duì)列-消息發(fā)送者

*/

@Component

class MqSendTest{

//隊(duì)列名稱

public final static String QUUE_NAME = "ycMq02";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

Connection connection = null;

Channel channel = null;

try {

// 通過工廠創(chuàng)建連接

connection = factory.newConnection();

//獲取通道

channel = connection.createChannel();

/**

* 聲明隊(duì)列

* 第一個(gè)參數(shù)queue:隊(duì)列名

* 第二個(gè)擦?xí)鴇urable:是否持久化

* 第三額參數(shù)Exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列對(duì)首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)珊瑚蟲

* 需要注意三點(diǎn):

* 1:排他隊(duì)列時(shí)基于連接可見的,同一連接的不同通道時(shí)可以同時(shí)訪問同一個(gè)連接創(chuàng)建的排他隊(duì)列的

* 2:”首次“,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的,這個(gè)與普通隊(duì)列不同

* 3:即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊(duì)列都會(huì)被自動(dòng)刪除的

* 這種隊(duì)列適用于只限于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場(chǎng)景

* 第四個(gè)參數(shù)Auto-delete:自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除

* 這種隊(duì)列只適用于臨時(shí)隊(duì)列

*/

channel.queueDeclare(QUUE_NAME,false,false,false,null);

//創(chuàng)建消息

String message="Hello World??!";

//將產(chǎn)生的消息放入隊(duì)列

channel.basicPublish("",QUUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] Sent '"+message + "'");

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

//關(guān)閉通道

if (null!=channel&&channel.isOpen()){

channel.close();

}

//關(guān)閉連接

if (null!=connection&&connection.isOpen()){

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

消息發(fā)送成后可以通過RabitMQ管理界面查看 消息接收

/**

* 簡(jiǎn)單模式隊(duì)列-消息接收者

*/

class MqRecvTest{

//隊(duì)列名稱

public final static String QUUE_NAME = "ycMq02";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

try {

//通過工廠創(chuàng)建連接

Connection connection=factory.newConnection();

//獲取通道

Channel channel=connection.createChannel();

//指定隊(duì)列

channel.queueDeclare(QUUE_NAME,false,false,false,null);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//獲取消息

DeliverCallback deliverCallback=(consumerTag,delivery)->{

String message=new String(delivery.getBody(),"UTF-8");

System.out.println(" [x] Received '" + message + "'");

};

//監(jiān)聽隊(duì)列

channel.basicConsume(QUUE_NAME,true,deliverCallback,consumerTag->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

消息接收成功后可以通過RabbitMQ管理界面查看 測(cè)試 發(fā)送: 接收 總結(jié) 簡(jiǎn)單隊(duì)列處理消息效率不高,吞吐量較低,如果任務(wù)量很大,會(huì)造成消息得不到消費(fèi),出現(xiàn)內(nèi)存溢出,消息丟失等問題,這個(gè)時(shí)候就需要配置多個(gè)消費(fèi)者消費(fèi)消息

4.Work queues-工作模式隊(duì)列

4.1消息輪詢

簡(jiǎn)單隊(duì)列中存在的角色:生產(chǎn)者、消費(fèi)者、隊(duì)列,當(dāng)消費(fèi)者只有一個(gè),消息量非常大的時(shí)候,單個(gè)消費(fèi)者處理消息就會(huì)變慢,同時(shí)給節(jié)點(diǎn)頁(yè)帶來(lái)很大壓力,導(dǎo)致消息堆積越來(lái)越多。這是需要使用RabbitMQ提供的工作隊(duì)列模式,通過工作隊(duì)列提供多個(gè)消費(fèi)者,對(duì)MQ產(chǎn)生的消息進(jìn)行消費(fèi),提高M(jìn)Q消息的吞吐量,降低消息的處理時(shí)間 消息發(fā)送

/**

* 工作模式隊(duì)列-輪詢發(fā)布-消息發(fā)送者

*/

class MqSentRound {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_roundRobin";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

Connection connection = null;

Channel channel = null;

try {

//通過工廠創(chuàng)建連接

connection = factory.newConnection();

//獲取通道

channel = connection.createChannel();

//聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//創(chuàng)建消息

for (int i = 1; i < 21; i++) {

String message = "Hello World! ------ " + i;

//將產(chǎn)生的消息放入隊(duì)列

channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] Received '" + message + "'");

}

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

//關(guān)閉通道

if (null != channel && channel.isOpen()) {

channel.close();

}

//關(guān)閉連接

if (null != connection && connection.isOpen()) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

RabitMQ管理界面查看 結(jié)果

消息接收

/**

* 工作模式隊(duì)列-輪詢發(fā)布-消息接收者

*/

class MqRecvRound01 {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_roundRobin";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

try {

//通過工廠創(chuàng)建連接

Connection connection = factory.newConnection();

//獲取通道

Channel channel = connection.createChannel();

//指定隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//獲取消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received01 '" + message + "'");

//模擬程序執(zhí)行所耗時(shí)

try {

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

}

};

//監(jiān)聽隊(duì)列

channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag ->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

/**

* 工作模式隊(duì)列-輪詢發(fā)布-消息接收者

*/

class MqRecvRound02 {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_roundRobin";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory =new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

try {

//創(chuàng)建連接工廠

Connection connection=factory.newConnection();

//獲取通道

Channel channel =connection.createChannel();

//綁定隊(duì)列

channel.queueDeclare(QUEUE_NAME,false, false,false,null);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//獲取消息

DeliverCallback deliverCallback = (consumerTag , delivery)->{

String message = new String(delivery.getBody(),"UTF-8");

System.out.println(" [x] Received02 '" + message + "'");

//模擬程序執(zhí)行所耗時(shí)

try {

Thread.sleep(2000);

} catch (Exception e) {

e.printStackTrace();

}

};

//監(jiān)聽隊(duì)列

channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag ->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

RabitMQ管理界面查看 總結(jié) 任務(wù)量很大,消息雖然得到了及時(shí)消費(fèi),單位時(shí)間呢i處理速度加快,提高了吞吐量;可是不同消費(fèi)者處理消息的時(shí)間不同,導(dǎo)致部分消費(fèi)者的資源被浪費(fèi)

4.2消息公平分發(fā)

消息發(fā)布采用的是默認(rèn)輪詢分發(fā),消息應(yīng)答采用的是自動(dòng)應(yīng)答模式,當(dāng)消息進(jìn)入隊(duì)列,RabbitMq就會(huì)分派消息。不會(huì)看消費(fèi)者的應(yīng)答數(shù)目,盲目的將第n條消息發(fā)給第n個(gè)消費(fèi)者。這個(gè)時(shí)候需要使用bassicQos(prefetchCount = 1)方法,來(lái)限制RabbitMQ只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者。當(dāng)消息處理完畢后,有了反饋,才會(huì)進(jìn)行第二次發(fā)送

消息發(fā)送

/**

* 工作模式隊(duì)列-公平分發(fā)-消息發(fā)送者

*/

class MqSendFair {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_fair";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

Connection connection = null;

Channel channel = null;

try {

//創(chuàng)建連接

connection = factory.newConnection();

//獲取通道

channel = connection.createChannel();

//聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//創(chuàng)建消息

for (int i = 1; i < 22; i++) {

String message = "Hello World! ------ " + i;

//將產(chǎn)生的消息放入隊(duì)列

channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

System.out.println("[x] Sent '" + message + "'");

}

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

//關(guān)閉通道

if (null != channel && channel.isOpen()) {

channel.close();

}

//關(guān)閉連接

if (null != connection && connection.isOpen()) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

RabitMQ管理界面查看 消息接收

/**

* 工作模式隊(duì)列-公平分發(fā)-消息接收者

*/

class MqRecvFair01 {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_fair";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

try {

//通過工廠創(chuàng)建連接

final Connection connection = factory.newConnection();

//獲取通道

final Channel channel = connection.createChannel();

//指定隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/**

* 限制RabbitMq只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者

* 當(dāng)消息處理完畢后,有了發(fā)聵,才會(huì)進(jìn)行第二次發(fā)送

*/

int prefetchCount = 1;

channel.basicQos(prefetchCount);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//獲取消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received01 '" + message + "'");

//模擬程序執(zhí)行所耗時(shí)

try {

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

}

//手動(dòng)回執(zhí)消息

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

//監(jiān)聽隊(duì)列

/**

* autoAck = true 代表自動(dòng)確認(rèn)消息

* autoAck = false 代表手動(dòng)確認(rèn)消息

*/

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {

});

} catch (Exception e) {

e.printStackTrace();

}

}

}

/**

* 工作模式隊(duì)列-公平分發(fā)-消息接收者

*/

class MqRecvFair02 {

//隊(duì)列名稱

private final static String QUEUE_NAME = "work_fair";

public static void main(String[] args) {

//創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");

factory.setPort(5672);

factory.setUsername("shop");

factory.setPassword("shop");

factory.setVirtualHost("/shop");

try {

//通過工廠創(chuàng)建連接

final Connection connection = factory.newConnection();

//創(chuàng)建隊(duì)列

final Channel channel = connection.createChannel();

//指定隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/**

* 限制RabbitMQ只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者

* 當(dāng)消息處理完畢后,有了反饋,才會(huì)進(jìn)行第二次發(fā)送

*/

int prefetchCount = 1;

channel.basicQos(prefetchCount);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//獲取消息

DeliverCallback deliverCallback = (consumeTab, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received02 '" + message + "'");

//模擬程序執(zhí)行所耗時(shí)

try {

Thread.sleep(2000);

} catch (Exception e) {

e.printStackTrace();

}

//手動(dòng)回執(zhí)消息

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

//監(jiān)聽隊(duì)列

/**

* autoAck = true 代表自動(dòng)確認(rèn)消息

* autoAck = false 代表手動(dòng)確認(rèn)消息

*/

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {

});

} catch (Exception e) {

e.printStackTrace();

}

}

}

總結(jié) 公平輪詢發(fā)布-根據(jù)不同消費(fèi)者處理消息的速度不同,收到的數(shù)量也不同,處理速度快的消息數(shù)量比較多,最大化使用計(jì)算機(jī)資源。生產(chǎn)者產(chǎn)生的消息只可以被一個(gè)消費(fèi)者消費(fèi)

5.Publish/Subscribe- 發(fā)布與訂閱模式

當(dāng)生產(chǎn)者把消息推送出去后,不同的消費(fèi)者均可以對(duì)該消息進(jìn)行消費(fèi),而不是消息被一個(gè)消費(fèi)者消費(fèi)后就立即從隊(duì)列中刪除,這中處理方式稱之為消息的發(fā)布與訂閱模式

消息發(fā)送

/**

* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息發(fā)送者

*/

class MqSendFanout {

//隊(duì)列名稱

//如果不生命隊(duì)列,會(huì)使用默認(rèn)值,RabbitMQ會(huì)創(chuàng)建一個(gè)排他隊(duì)列二,連接斷開后自動(dòng)刪除

//private final static String QUEUE_NAME="exchange_fanout";

//交換機(jī)

private final static String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

Connection connection = null;

Channel channel = null;

try {

//2.創(chuàng)建連接

connection = factory.newConnection();

//3.獲取通道

channel = connection.createChannel();

/**

* 4.綁定交換機(jī) fanout:廣播模式

* 交換機(jī)名,模式

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

//5.創(chuàng)建消息,模擬發(fā)送手機(jī)號(hào)碼喝右鍵地址

String message = "110|110!qq.com";

//6.將產(chǎn)生的消息發(fā)送至交換機(jī)

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] Sent '" + message + "'");

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

//7.關(guān)閉連接 先關(guān)閉通道,再關(guān)閉連接,順序很重要

//關(guān)閉通道

if (null != channel && channel.isOpen()) {

channel.close();

}

//關(guān)閉連接

if (null != connection && connection.isOpen()) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

發(fā)送結(jié)果 消息接收

/**

* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息接收者01

*/

class MqRecvFanout01 {

//交換機(jī)名稱

private final static String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//設(shè)置虛擬主機(jī)

try {

//2.創(chuàng)建連接

final Connection connection = factory.newConnection();

//3.獲取通道

final Channel channel = connection.createChannel();

/**

* 4.綁定交換機(jī) fanount:廣播模式

* 交換機(jī)名稱 、 模式設(shè)置

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

//5.獲取隊(duì)列名

String queueName = channel.queueDeclare().getQueue();

/**

* 6.綁定隊(duì)列

* 隊(duì)列名稱。交換機(jī)名稱?!啊?/p>

*/

channel.queueBind(queueName, EXCHANGE_NAME, "");

//7.現(xiàn)在一個(gè)消費(fèi)者處理完畢,發(fā)出反饋之后,才會(huì)進(jìn)行第二次發(fā)送

channel.basicQos(1);

//8.獲取消息

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received01 '" + message + "'");

//9.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

/**

* 10.監(jiān)聽隊(duì)列

* 隊(duì)列名。true自動(dòng)確認(rèn)/false手動(dòng)確認(rèn)

*/

channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

/**

* 發(fā)布于訂閱模式隊(duì)列-fanout廣播模式-消息接收者02

*/

class MqRecvFanout02 {

//交換機(jī)名稱

private final static String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//設(shè)置主機(jī)ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//設(shè)置虛擬主機(jī)名

try {

//2.創(chuàng)建連接

final Connection connection = factory.newConnection();

//3.獲取通道

final Channel channel = connection.createChannel();

//4.綁定交換機(jī)

channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);

//5.獲取隊(duì)列名稱

String queueName = channel.queueDeclare().getQueue();

//6.交換機(jī)喝隊(duì)列綁定

channel.queueBind(queueName,EXCHANGE_NAME,"");

//7.給一個(gè)消費(fèi)者發(fā)送的消息不超過1條,消費(fèi)者處理完,發(fā)送反饋后,再進(jìn)行第二次發(fā)送

channel.basicQos(1);

//8.獲取消息

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag,delivery)->{

String message = new String(delivery.getBody(),"UTF-8");

System.out.println(" [x] Received01 '" + message + "'");

//9.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

//10.監(jiān)聽隊(duì)列 隊(duì)列名,手動(dòng)確認(rèn)false

channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

結(jié)果 總結(jié) 生產(chǎn)者發(fā)送了一條消息,所有的消費(fèi)者均可以收到消息,但沒有指定具體的消費(fèi)者

6.Routing-路由模式隊(duì)列

生產(chǎn)者產(chǎn)生的消息投給交換機(jī),交換機(jī)發(fā)送消息時(shí)的Exchange Types為direct(路由模式)類型,消息通過定義的Routing Key被路由到指定的隊(duì)列進(jìn)行后續(xù)消費(fèi) 消息發(fā)送

/**

* direct路由模式隊(duì)列-消息發(fā)送者

*/

class MqSendDirect {

//交換機(jī)名稱

private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

Connection connection = null;

Channel channel = null;

try {

//2.創(chuàng)建連接

connection = factory.newConnection();

//3.獲取通道

channel = connection.createChannel();

/**

* 4.綁定交換機(jī)

* 交換機(jī)名稱 , 模式

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

//5.創(chuàng)建不同類型的消息

String message = "INFO消息";

// String message="WARNING消息";

// String message="ERROR消息";

String rountingKey = "info";

// String rountingKey ="error";

/**

* 6.將消息放入交換機(jī)

* 交換機(jī)名 , 路由(rountungKey) , ,消息

*/

channel.basicPublish(EXCHANGE_NAME, rountingKey, null, message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] Sent '" + message + "'");

} catch (Exception e) {

e.printStackTrace();

} finally {

//7.關(guān)閉連接,先關(guān)閉通道,再關(guān)閉連接

try {

//關(guān)閉通道

if (null != channel && channel.isOpen()) {

channel.close();

}

//關(guān)閉連接

if (null != connection && connection.isOpen()) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

結(jié)果 消息接收 消費(fèi)者對(duì)消息進(jìn)行后續(xù)消費(fèi)時(shí),對(duì)于接收消息的隊(duì)列在對(duì)消息進(jìn)行接收時(shí),綁定到每一個(gè)交換機(jī)上的隊(duì)列均會(huì)指定其Routing Key規(guī)則,通過路由規(guī)則將消息路由到執(zhí)行隊(duì)列中

/**

* direct路由模式隊(duì)列-消息接收者01

*/

class MqRecvDirect01 {

//交換機(jī)名稱

private static final String ECHANGE_NAME = "exchange_direct";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

try {

//2.創(chuàng)建連接

Connection connection = factory.newConnection();

//3.獲取通道

Channel channel = connection.createChannel();

/**

* 4.綁定交換機(jī)

* 交換機(jī)名稱,模式

*/

channel.exchangeDeclare(ECHANGE_NAME, BuiltinExchangeType.DIRECT);

//5.獲取隊(duì)列名

String queueName = channel.queueDeclare().getQueue();

/**

* 6.隊(duì)列與交換機(jī)綁定

* 隊(duì)列名,交換機(jī)名,路由

*/

String routingKeyInfo = "info";

String routingKeyWarning = "warning";

channel.queueBind(queueName, ECHANGE_NAME, routingKeyInfo);

channel.queueBind(queueName, ECHANGE_NAME, routingKeyWarning);

//7.一個(gè)消費(fèi)者只能同時(shí)處理一條消息

channel.basicQos(1);

//8.獲取消息

System.out.println(" [*] Waiting for message. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [X] Direct01 '" + message + "'");

//9.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

/**

* 10.監(jiān)聽隊(duì)列

* 隊(duì)列名,手動(dòng)確認(rèn),消息回執(zhí)

*/

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {

});

} catch (Exception e) {

e.printStackTrace();

}

}

}

結(jié)果01

/**

* direct路由模式隊(duì)列-消息接收者01

*/

class MqRecvDirect02 {

//交換機(jī)名稱

private static final String ECHANGE_NAME = "exchange_direct";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

try {

//2.創(chuàng)建鏈接

Connection connection =factory.newConnection();

//3.獲取通道

Channel channel = connection.createChannel();

/**

* 4.邦迪交換機(jī)

* 交換機(jī)名,模式

*/

channel.exchangeDeclare(ECHANGE_NAME,BuiltinExchangeType.DIRECT);

//5.獲取隊(duì)列

String queueName = channel.queueDeclare().getQueue();

//6.綁定隊(duì)列

String routingKeyError = "error";

channel.queueBind(queueName, ECHANGE_NAME, routingKeyError);

//7.一個(gè)消費(fèi)者只能同時(shí)處理一條消息

channel.basicQos(1);

//8.獲取消息

System.out.println(" [*] Waiting for message. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag,delivery)->{

String message = new String(delivery.getBody(),"UTF-8");

System.out.println(" [X] Direct02 '" + message + "'");

//9.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

/**

* 10.監(jiān)聽隊(duì)列

* 隊(duì)列名,手動(dòng)確認(rèn),消息回調(diào)

*/

channel.basicConsume(queueName,false,deliverCallback,cosumerTag->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

結(jié)果02 總結(jié) 生產(chǎn)者發(fā)送了多條設(shè)置了不同路由規(guī)則的消息,消費(fèi)者可以根據(jù)具體的路由規(guī)則消費(fèi)對(duì)應(yīng)隊(duì)列中的消息,而不是所有消費(fèi)者都可以消費(fèi)所有消息

7 Topics-主題模式隊(duì)列

當(dāng)生產(chǎn)者產(chǎn)生的消息如果場(chǎng)景需求過多,需要設(shè)置很多路由規(guī)則的時(shí)候,需要采用主題模式隊(duì)列;

routingkey為一個(gè)句點(diǎn)號(hào)'.'分割的字符串,我們將被句點(diǎn)號(hào)'.'分割開的每一段獨(dú)立的字符串稱為一個(gè)單詞。如:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;

routingkey中可以存在兩種特殊字符*與'#',用于做模糊匹配,*用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè))

消息發(fā)送

/**

* topic主題模式隊(duì)列-消息發(fā)送者

*/

class MqSendTopic01 {

//交換機(jī)名稱

private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

Connection connection = null;

Channel channel = null;

try {

//2.創(chuàng)建連接

connection = factory.newConnection();

//3.獲取通道

channel = connection.createChannel();

/**

* 4.綁定交換機(jī)

* 交換機(jī)名稱,主題

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//5.創(chuàng)建消息并設(shè)置理由

String message = "查詢操作";

// String message = "更新操作";

String routingKey = "select.goods.byId";

// String routingKey="update.goods.byId.addName";

//6.將產(chǎn)生的消息發(fā)送給至交換機(jī)

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

;

System.out.println(" [x] SentTopic '" + message + "'");

} catch (Exception e) {

e.printStackTrace();

} finally {

//7.關(guān)閉連接,先關(guān)閉通道,再關(guān)閉連接

try {

//關(guān)閉通道

if (null != channel && channel.isOpen()) {

channel.close();

}

//關(guān)閉連接

if (null != connection && connection.isOpen()) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

結(jié)果

消息接收 消息接收01

/**

* topic主題模式-消息接收者01

*/

class MqRecvTopic01 {

//交換機(jī)名稱

private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

try {

//2.創(chuàng)建連接

Connection connection = factory.newConnection();

//3.獲取通道

Channel channel = connection.createChannel();

/**

* 4.綁定交換機(jī)

* 交換

* 機(jī)名,模式名稱

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//5.獲取隊(duì)列名

String queueName = channel.queueDeclare().getQueue();

//6.設(shè)置路由routingKey

String routingKey = "select.goods.*";

/**

* 7.綁定隊(duì)列

* 隊(duì)列名 ,交換機(jī)名,路由

*/

channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

//8.一個(gè)消費(fèi)之只能同時(shí)處理1條

channel.basicQos(1);

//9.獲取消息

System.out.println(" [*] Waiting for message. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] RecvTopic01 '" + message + "'");

//10.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

/**

* 11.監(jiān)聽隊(duì)列

* 隊(duì)列名 手動(dòng)確認(rèn) ,消息回調(diào)

*/

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {

});

} catch (Exception e) {

e.printStackTrace();

}

}

}

結(jié)果01

/**

* topic主題模式-消息接收者02

*/

class MqRecvTopic02 {

//交換機(jī)名稱

private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] args) {

//1.創(chuàng)建連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//綁定虛擬主機(jī)

try {

//2.創(chuàng)建連接

Connection connection = factory.newConnection();

//3.獲取通道

Channel channel = connection.createChannel();

/**

* 4.綁定交換機(jī)

* 交換機(jī)名稱, 隊(duì)列模式

*/

channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

//5.獲取隊(duì)列名

String queueName= channel.queueDeclare().getQueue();

//6.設(shè)置路由routingKey

String routingKey = "update.goods.#";

/**

* 7.綁定隊(duì)列

* 隊(duì)列名,交換機(jī)名,路由

*/

channel.queueBind(queueName,EXCHANGE_NAME,routingKey);

//8.一個(gè)消費(fèi)者只能同時(shí)消費(fèi)一條消息

channel.basicQos(1);

//9.獲取消息

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag ,delivery)->{

String message = new String(delivery.getBody(),"UTF-8");

System.out.println(" [x] Received02 '" + message + "'");

//10.手動(dòng)回執(zhí)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

/**

* 11.監(jiān)聽隊(duì)列

* 隊(duì)列名,手動(dòng)確認(rèn)消息,消息回調(diào)

*/

channel.basicConsume(queueName,false,deliverCallback,consumerTag->{});

} catch (Exception e) {

e.printStackTrace();

}

}

}

總結(jié) 生產(chǎn)者發(fā)送了多條設(shè)置了匹配規(guī)則的消息,根據(jù)不同的路由匹配規(guī)則,將消息根據(jù)指定的路由到匹配的隊(duì)列中

8.RPC-遠(yuǎn)程過程調(diào)用模式隊(duì)列

mq是基于異步進(jìn)行消息處理,生產(chǎn)者將消息發(fā)送到mq后不知道消費(fèi)者處理成功或者失敗,甚至都不知道有沒有消費(fèi)者來(lái)處理。而實(shí)際應(yīng)用過程中,可能需要同步處理,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理。這相當(dāng)于RPC(Remote Procedure call,遠(yuǎn)程過程調(diào)用)。再RabbitMQ中也支持RPC

RabbitMQ中實(shí)現(xiàn)RPC的機(jī)制是: 1.客服端發(fā)哦是那個(gè)請(qǐng)求,再消息的屬性(AMQP協(xié)議中定義了14中propertiec,這14種屬性會(huì)隨著消息一起發(fā)送)中設(shè)置了兩個(gè)值replyTo(一個(gè)Queue名稱,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個(gè)Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將次屬性返回,客戶端將根據(jù)這個(gè)id了解那條請(qǐng)求被成功執(zhí)行了或執(zhí)行失?。?2.服務(wù)器端收到消息并處理 3.服務(wù)器端處理完消息后,將生成一條應(yīng)答消息到replyTo指定Queue,同時(shí)攜帶correlationId屬性

客戶端之前已訂閱repluTo指定的Queue,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correlationId屬性分析那條請(qǐng)求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理。

服務(wù)端

/**

* RPC模式隊(duì)列-服務(wù)器

*/

class MqRPCServer01 {

//隊(duì)列名稱

private static final String RPC_QUEUE_NAME = "rpc_queue";

/**

* 計(jì)算斐波那契數(shù)列

*

* @param n

* @return

*/

private static int fib(int n) {

if (n == 0) {

return 0;

}

if (n == 1) {

return 1;

}

return fib(n - 1) + fib(n - 2);

}

public static void main(String[] args) {

//1.創(chuàng)建工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

try {

//2.創(chuàng)建連接

final Connection connection = factory.newConnection();

//3.獲取通道

final Channel channel = connection.createChannel();

/**4.聲明隊(duì)列

* 隊(duì)列名 是否持久化 是否排他 是否自動(dòng)刪除 長(zhǎng)度限制/存活時(shí)間/優(yōu)先級(jí)等限制條件

*/

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

//5.清空指定隊(duì)列的內(nèi)容,而不刪除隊(duì)列

channel.queuePurge(RPC_QUEUE_NAME);

//6.限制mq只發(fā)不超過1條的消息給同一個(gè)消費(fèi)者,當(dāng)消息處理完畢后,有了反饋才會(huì)進(jìn)行第二次發(fā)送

channel.basicQos(1);

//7.獲取消息

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object();

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

//8.獲取replyTo隊(duì)列和correlationId請(qǐng)求標(biāo)識(shí)

AMQP.BasicProperties replyProps = new AMQP.BasicProperties

.Builder()

.correlationId(delivery.getProperties().getCorrelationId())

.build();

String response = "";

try {

//9.接收客戶端消息

String message = new String(delivery.getBody(), "UTF-8");

int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");

//10.服務(wù)端根據(jù)業(yè)務(wù)需求處理

response += fib(n);

} catch (Exception e) {

e.printStackTrace();

System.out.println(" [.] " + e.toString());

} finally {

//11.將處理結(jié)果發(fā)送至repluTo隊(duì)列同時(shí)攜帶correlationId屬性

channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));

/**

* 12.手動(dòng)回執(zhí)

* 通道中的信息 是否自動(dòng)

*/

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

//13.RabbitMq消費(fèi)者工作線程通知RPC服務(wù)器其他所有線程運(yùn)行

synchronized (monitor) {

monitor.notify();

}

}

};

/**

* 13,監(jiān)聽隊(duì)列

* 隊(duì)列名,是否自動(dòng)確認(rèn)消息 消息回執(zhí)

*/

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {

});

//14.線程等待并準(zhǔn)備接收來(lái)自RPC客戶端的消息

while (true) {

synchronized (monitor) {

try {

monitor.wait();

} catch (Exception e) {

e.printStackTrace();

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

客戶端

/**

* RPC模式隊(duì)列-客戶端

*/

class MqRPCClient01 implements AutoCloseable {

private Connection connection;

private Channel channel;

//隊(duì)列名稱

private String requestQueueName = "rpc_queue";

//初始化連接

public MqRPCClient01() throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.85.131");//ip

factory.setPort(5672);//端口

factory.setUsername("shop");//賬號(hào)

factory.setPassword("shop");//密碼

factory.setVirtualHost("/shop");//虛擬主機(jī)

connection = factory.newConnection();

channel = connection.createChannel();

}

public static void main(String[] args) {

try (MqRPCClient01 fibonacciRpc = new MqRPCClient01()) {

for (int i = 0; i < 10; i++) {

String i_str = Integer.toString(i);

System.out.println(" [x] Requesting fib(" + i_str + ")");

//請(qǐng)求服務(wù)端

String response = fibonacciRpc.call(i_str);

System.out.println(" [.] Got '" + response + "'");

}

} catch (Exception e) {

e.printStackTrace();

}

}

//請(qǐng)求服務(wù)端

public String call(String message) throws Exception {

//1.correlationId請(qǐng)求標(biāo)識(shí)ID

final String corrId = UUID.randomUUID().toString();

//2.獲取隊(duì)列名稱

String replyQueueName = channel.queueDeclare().getQueue();

//3.設(shè)置replyTo隊(duì)列和correlationId請(qǐng)求標(biāo)識(shí)

AMQP.BasicProperties props = new AMQP.BasicProperties

.Builder()

.correlationId(corrId)

.replyTo(replyQueueName)

.build();

//4.發(fā)送消息至隊(duì)列

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

//5.設(shè)置線程池等待,每次只接收一個(gè)響應(yīng)結(jié)果

final BlockingQueue response = new ArrayBlockingQueue<>(1);

//6.接受服務(wù)器返回結(jié)果

String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {

if (delivery.getProperties().getCorrelationId().equals(corrId)) {

response.offer(new String(delivery.getBody(), "UTF-8"));

}

}, consumerTag -> {

});

String result = response.take();

channel.basicCancel(ctag);

return result;

}

//關(guān)閉連接

public void close() throws Exception {

connection.close();

}

}

運(yùn)行完客戶端后的服務(wù)端效果

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq學(xué)習(xí)

http://yzkb.51969.com/

相關(guān)文章

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19163264.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄