柚子快報(bào)激活碼778899分享:rabbitMQ(1)
柚子快報(bào)激活碼778899分享:rabbitMQ(1)
AMQP協(xié)議解釋,官網(wǎng)地址:https://www.rabbitmq.com/tutorials/amqp-concepts rabbitMQ是個(gè)消息隊(duì)列中間件,它默認(rèn)的消息傳輸協(xié)議是AMQP 0-9-1,AMQP是 Advanced Message Queuing Protocol的縮寫,意思為高級(jí)消息隊(duì)列協(xié)議。而且Spring對(duì)rabbitMQ的支持也是spring-amqp模塊。 該協(xié)議有最基本的三個(gè)組件:1,publisher(發(fā)布者),2,Messaging broker(消息代理),3,consumer(消費(fèi)者)。 其中消息代理有三個(gè)概念:1,消息隊(duì)列(queue),2,Exchange(交換機(jī)),3,bindings(綁定規(guī)則)。 所以整個(gè)流程就有了,發(fā)布者僅將消息發(fā)布到消息代理的交換機(jī),然后交換機(jī)會(huì)根據(jù)綁定規(guī)則將消息轉(zhuǎn)發(fā)到綁定到自身的隊(duì)列里面,然后通過(guò)push或者pull api 送到消費(fèi)者那里。 一、交換機(jī) Exchange 這個(gè)Exchange算是里面最重要的概念之一,我們只需要知道,一個(gè)消息代理可以有多個(gè)交換機(jī),但是發(fā)布者只能綁定一個(gè),而且只能把消息發(fā)布到這個(gè)交換機(jī)里面,然后這個(gè)交換機(jī)并不存儲(chǔ)消息,而是把消息直接推送到隊(duì)列里面。 發(fā)布者發(fā)布消息比較簡(jiǎn)單,就一條線,消息帶著參數(shù)直接扔給交換機(jī)就結(jié)束了,但是交換機(jī)可以綁定0個(gè)或者多個(gè)隊(duì)列,隊(duì)列是真正存儲(chǔ)消息的地方。如何推送消息到隊(duì)列取決于交換機(jī)的類型,及路由規(guī)則,路由規(guī)則在消費(fèi)者端也叫綁定規(guī)則。 交換機(jī)一共四種類型。還有一些屬性,其中最重要的屬性:1,name 靠這個(gè)跟發(fā)布者和消費(fèi)者綁定,2,durability 是否持久化,注意這里的持久化是只這個(gè)交換機(jī)在消息代理重啟之后是否還存在,而不是消息的持久化,3,Auto-delete 這里的自動(dòng)刪除也是 這個(gè)交換機(jī)在沒(méi)有隊(duì)列綁定之后的自動(dòng)刪除而不是消息的自動(dòng)刪除。 1,直接交換機(jī)(direct Exchange):該類型的交換機(jī)轉(zhuǎn)發(fā)消息就靠一個(gè) routing key,如果發(fā)布者和消費(fèi)者綁定都綁定到同一個(gè)直接交換機(jī)上并且 routing key 一樣,那么消息就能從發(fā)布者到該交換機(jī)然后再到消費(fèi)者。如果有多個(gè)消費(fèi)者綁定相同的routing key,默認(rèn)使用輪詢策略。 注意,rabbitMQ提前聲明了一個(gè)默認(rèn)的交換機(jī),該交換機(jī)name屬性是空字符串,它是個(gè)直接交換機(jī),消費(fèi)者創(chuàng)建出來(lái)所有的沒(méi)有綁定規(guī)則的隊(duì)列,都直接自動(dòng)綁定在這個(gè)默認(rèn)的交易鎖上。發(fā)布者只需要拿這個(gè)隊(duì)列的名字作為routing key 就可以給對(duì)應(yīng)的消費(fèi)者發(fā)送消息。這里如果有多個(gè)消費(fèi)者聲明了同一個(gè)隊(duì)列,那么會(huì)輪詢發(fā)布消息。 2,扇形交換機(jī)(fanout Exchange):這個(gè)交換機(jī)不關(guān)心routing key,只要有隊(duì)列綁定在該類型的交換機(jī)上,這個(gè)隊(duì)列就可以收到該交換機(jī)里面的信息。所以這個(gè)類型的交換機(jī)適合做廣播路由。比如用于排行榜或積分的實(shí)時(shí)更新,分布式系統(tǒng)里面的配置的實(shí)時(shí)發(fā)布更新。群聊里面分發(fā)消息等。 3,主題交換機(jī)(Topic Exchange):這個(gè)類型的交換機(jī)根據(jù)路由規(guī)則可以講消息分發(fā)到一個(gè)或者多個(gè)隊(duì)列里面,常用語(yǔ)消息的組播路由。 4,標(biāo)頭交換機(jī)(Headers Exchange):這個(gè)類型的交換機(jī)不根據(jù)routing key 來(lái)路由而是根據(jù)消息頭里面的屬性。 二、Queue 隊(duì)列 隊(duì)列就是真正存儲(chǔ)消息的一個(gè)先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),想要使用隊(duì)列必須先聲明,如果隊(duì)列不存在就會(huì)創(chuàng)建一個(gè),如果存在且隊(duì)列的屬性相同則啥也不做,如果存在但是屬性不一樣就會(huì)保存。 隊(duì)列的重要屬性有:1,name,聲明綁定都需要,可以交給消息代理自動(dòng)生成。2,durable,持久化,消息代理重啟后該隊(duì)列是否還存在,也就是消息是否持久化到硬盤,否則的話只存在內(nèi)存,重啟就沒(méi)有了。3,exclusive,獨(dú)有的,僅被一個(gè)連接使用的隊(duì)列,當(dāng)連接關(guān)閉后自動(dòng)刪除該隊(duì)列。4,auto-delete,當(dāng)沒(méi)有消費(fèi)者綁定時(shí)自動(dòng)刪除 三、綁定 bindings, 交換機(jī)想要將消息發(fā)布到隊(duì)列里面得有一些路由規(guī)則,就是交換機(jī)要綁定一個(gè)或者多個(gè)隊(duì)列,如果沒(méi)有綁定可以配置消息是直接丟棄還是放入私信隊(duì)列。 (為什么不能堅(jiān)持呢,這個(gè)東西很難嗎?不難,那么是什么在阻止你?是枯燥乏味,是精力不濟(jì),一切都會(huì)好起來(lái)的。) 四,關(guān)于消費(fèi)者, Consumers 消費(fèi)者有兩種方式從隊(duì)列里面獲取消息一個(gè)是訂閱,另一個(gè)是拉取,其中拉取叫耗費(fèi)資源不建議使用。每一個(gè)消費(fèi)者都會(huì)有個(gè)字符串標(biāo)識(shí)叫做 consumerTag,可以用來(lái)取消訂閱。 消息確認(rèn)機(jī)制有兩種,1,消息代理發(fā)送消息給消費(fèi)者 之后接著刪除消息(自動(dòng)確認(rèn)模型),2,消費(fèi)者消費(fèi)完消息之后發(fā)送信號(hào)給消息代理再刪除消息,可以保證消息不丟失(顯示確認(rèn)模型)。如果消費(fèi)者在確認(rèn)消息之前掛了,消息代理會(huì)將消息傳給另外的消費(fèi)者或者等待一個(gè)消費(fèi)者。 五、關(guān)于消息 Message 消息有很多的屬性(Attributes)比如 content-type,攜帶的數(shù)據(jù)稱之為有效負(fù)載,可以是空負(fù)載。消息可以被持久化到磁盤,但會(huì)影響性能。 六,關(guān)于連接 Connections AMQP的連接是長(zhǎng)連接 AMQP是應(yīng)用層協(xié)議,使用TCP協(xié)議傳輸,支持TLS, 七,關(guān)于通道,Channels 通道就是共享一個(gè)TCP連接的輕量級(jí)連接,通道與通道之間完全隔離,每個(gè)都會(huì)有一個(gè)ID,通道只存在與連接的上下文中,連接關(guān)閉,通道也就沒(méi)了。對(duì)于多線程系統(tǒng)而言,建議每個(gè)線程創(chuàng)建一個(gè)通道,而不是共享它。 八,關(guān)于虛擬主機(jī) Virtual Hosts 同一個(gè)消息代理可以通過(guò)虛擬主機(jī)創(chuàng)建多個(gè)隔離的環(huán)境共不同的系統(tǒng)使用。 ?九,demo代碼 根據(jù)之前對(duì)AMQP協(xié)議的簡(jiǎn)介,我們知道rabbitMQ共有四種exchange,其中最常用的一種就是topic,他的功能最強(qiáng)大,所以一般情況下,推薦直接使用topic。 topic類型的exchange最特別的就是routing key有一定的規(guī)則,必須是使用"."連接的多個(gè)單詞,可以通過(guò)* 和 # 來(lái)模糊匹配。其中*匹配單個(gè)單詞,#匹配0個(gè)或者多個(gè)單詞。 1,首先是安裝rabbitMq,自己測(cè)試推薦使用docker安裝: docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management 運(yùn)行之后 進(jìn)入容器內(nèi)部運(yùn)行命令: rabbitmq-plugins enable rabbitmq_management 開啟rabbitMQ界面管理插件,然后訪問(wèn)地址:http://localhost:15672 使用 guest:guest 登陸后可以查看UI管理界面。 2,依賴
3,生產(chǎn)者代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
運(yùn)行參數(shù):"kern.critical" "A critical kernel error" 4,消費(fèi)者代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue(); //這是聲明一個(gè)默認(rèn)的隊(duì)列
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
運(yùn)行參數(shù)可以是:"#" "kern.*" "*.critical" 或者 "kern.critical" 多運(yùn)行幾個(gè)消費(fèi)者代碼可以感受到其中的設(shè)計(jì)。 代碼地址:https://gitee.com/luxiao_gitee/rabbitmq.git ?
柚子快報(bào)激活碼778899分享:rabbitMQ(1)
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。