柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記
柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記
RabbitMQ
RabbitMQ:高性能的異步通訊組件
1. 初識 MQ
1.1 同步操作
本次操作必須知道上次操作的結(jié)果缺點
拓展性差性能下降級聯(lián)失敗 優(yōu)點:
即時性高,即刻知道結(jié)果
1.2 異步操作
每次的操作相互獨立,互不影響三種角色
消息發(fā)送者:生產(chǎn)者消息代理(broker)消息接收:消費者 優(yōu)點
生產(chǎn)者只需生產(chǎn)一次消費者監(jiān)聽消息代理,便于拓展,解除耦合無需等待,性能變好故障隔離緩存消息,削峰填谷 缺點
及時性差,不能立即得到結(jié)果不確定下游業(yè)務(wù)是否執(zhí)行成功業(yè)務(wù)安全依賴于 broker 的可靠性
1.3 MQ
MQ:Message Queue,即消息隊列(先進先出),異步調(diào)用中的 broker
2. Rabbit MQ
2.1 安裝
Rabbit MQ 官網(wǎng):https://www.rabbitmq.com/
Docker 安裝
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management
# 15672端口:圖形化界面
# 5672端口:sh
docker run -e RABBITMQ_DEFAULT_USER=itheima -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 --network hmall -d rabbitmq:3.8-management
Publisher:生產(chǎn)者RabbitMQ Server Broker:RabbitMQ 消息代理VirtualHost:虛擬主機,起數(shù)據(jù)隔離作用,每個vh中的exchange和queue相互獨立。exchange:交換機,消息必須由exchange分發(fā)給不同的queuequeue:消息隊列,接收exchange發(fā)送的消息consumer:消費者,監(jiān)聽queue
2.2 快速入門
AMQP:Advance Message Queuing Protocol(高級消息隊列協(xié)議),與語言平臺無關(guān),即可以跨語言。
Spring AMQP:spring 基于 AMQP 協(xié)議的基礎(chǔ)上用 java 重新封裝的 api。
1. 創(chuàng)建聚合項目
# mq_demo pom文件
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
# publisher pom文件
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
1. 父項目引入
這樣每個微服務(wù)都可以使用
2. 每個微服務(wù)配置MQ服務(wù)器信息
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 192.168.138.10 # 地址
port: 5672 # 端口
virtual-host: /hall # 虛擬主機
username: hmall # 用戶名
password: 123456 # 密碼
注意:
上面的配置信息必須一一對應(yīng),需要與15672端口查看消費者與生產(chǎn)者都必須配置
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
// 隊列名
String queueName = "simple.queue";
// 消息
String msg = "hello, AMQP";
// 發(fā)送消息
rabbitTemplate.convertAndSend(queueName,msg);
}
3. 接收消息
SpringAMQP 提供聲明式的消息監(jiān)聽,需要通過注解在方法上聲明要監(jiān)聽的隊列名稱,將來Spring AMQP 就會把消息傳遞給當(dāng)前方法。
@Component
@Slf4j
public class MqListener {
@RabbitListener(queues = "simple")
// msg 類型和上面?zhèn)魉偷念愋鸵恢?/p>
public void ListenerSimpleQueue(String msg) {
System.out.println("消費者監(jiān)聽到了 simple 的消息:【" + msg + "】");
}
}
注意:
啟動的是consumer 的啟動類,不是測試類consumer 也得配置 MQ 服務(wù)器信息
3. Work Queues
任務(wù)模型:讓多個消費者綁定一個消息隊列,共同消費隊列中的信息(每條信息只會被其中之一的消費者消費)
創(chuàng)建 work.queue 消息隊列 生產(chǎn)者一秒鐘產(chǎn)生 50 個消息 c1 一秒消費一條 c2 兩秒消費一條
3.1. 初始化
1. 創(chuàng)建 work.queue 消息隊列
2. 生產(chǎn)者生產(chǎn)消息
@Test
void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
for (int i = 1; i <= 50; i++) {
String msg = "hello, worker, message_" + i;
rabbitTemplate.convertAndSend(queueName,msg);
Thread.sleep(20);
}
}
3. 消費者消費
@RabbitListener(queues = "work.queue")
public void ListenerWorkQueue1(String msg) {
System.out.println("消費者 1 監(jiān)聽到了 work 的消息:【" + msg + "】");
}
@RabbitListener(queues = "work.queue")
public void ListenerWorkQueue(String msg) {
System.err.println("消費者 2 監(jiān)聽到了 work 的消息.........:【" + msg + "】");
}
觀察到:
消費者 1 全是奇數(shù),消費者 2 全是偶數(shù)信息只會被消費一次消費者 1 和消費者 2 所有消費的信息之和 = 生產(chǎn)所有的信息
修改代碼:
假設(shè)消費者 1 的性能好,消費者 2 的性能相對弱一點
@RabbitListener(queues = "work.queue")
public void ListenerWorkQueue1(String msg) throws InterruptedException {
System.out.println("消費者 1 監(jiān)聽到了 work 的消息:【" + msg + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void ListenerWorkQueue(String msg) throws InterruptedException {
System.err.println("消費者 2 監(jiān)聽到了 work 的消息.........:【" + msg + "】");
Thread.sleep(200);
}
觀察到:即使是性能不同
消費者1 和 消費者2 都是對半的消息數(shù)量同時消費者 1 消費奇數(shù),消費者 2 消費偶數(shù)消費者1 消費過了,消費者2 不會消費
4. 思考
輪詢的結(jié)果是一人投一個,如果想讓性能好的機器多消費一點,性能差的機器消費少一點怎么辦?
3.2. 消費者消息推送限制
默認(rèn)情況下,RabbitMQ會將消息依次輪詢投遞給綁定在隊列上的每一個消費者。但并沒有考慮到消費者是否已經(jīng)處理完消息,可能出現(xiàn)消息堆積。
因此我們需要修改 application.yml ,設(shè)置 preFectch 值為1,確保同一時刻最多投遞給消費者 1 條消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能取 1 條信息,信息處理完成才能獲取下一條(消費者端開啟)
觀察到:
序號變成順序 性能好的多接收消息,性能差的處理的消息少 也就是處理完了一條消息,下一條消息才發(fā)放出來
意義
當(dāng)產(chǎn)生的信息在隊列中遠(yuǎn)遠(yuǎn)超過單個消費者消費的能力,這時候出現(xiàn)消費堆積
處理消費堆積的方法之一:增加消費者,同時消費者的消費能力有大小,所以根據(jù)消費者的性能來消費消息意義重大。
3.3. 總結(jié)
Work 模型的使用:
多個消費者綁定一個隊列,可以加快消息處理速度同一條消息只會被一個消費者處理通過設(shè)置 prefetch 來控制消費者預(yù)取的消息數(shù)量,處理完一條在處理下一條,實現(xiàn)能者多勞
4. 交換機 exchange
真正生產(chǎn)環(huán)境都會經(jīng)過 exchange 來發(fā)送信息,而不是直接發(fā)送到隊列,交換機的類型有以下三種:
Fanout:廣播Direct:定向Topic:話題
4.1 Fanout 交換機
Fanout exchange 會將接收到的消息廣播到每一個跟其綁定的 queue ,所以也叫廣播模式。
4.1.1 測試
在可視化頁面中創(chuàng)建,隊列 fanout.queue1 和 fanout.queue2在可視化頁面中創(chuàng)建,交換機 hmall.fanout,將兩個隊列將其綁定在 consumer 服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽 fanout.queue1 和 fanout.queue2在 publisher 服務(wù)中,編寫測試方法,向 hmall.fanout 發(fā)送消息
1. 在可視化頁面中創(chuàng)建,隊列 fanout.queue1 和 fanout.queue2
2. 在可視化頁面中創(chuàng)建,交換機 hmall.fanout,將兩個隊列將其綁定
3. 在 consumer 服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽 fanout.queue1 和 fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費者 1 監(jiān)聽到了 fanout.queue1 的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費者 2 監(jiān)聽到了 fanout.queue2 的消息:【" + msg + "】");
}
4. 在 publisher 服務(wù)中,編寫測試方法,向 hmall.fanout 發(fā)送消息
@Test
void testFanoutExchange() {
String exchangeName = "hmall.fanout";
String msg = "hello, everyone";
// routingKey 暫未設(shè)置,可以為 null 或 ""
rabbitTemplate.convertAndSend(exchangeName,"",msg);
}
4.1.2 總結(jié)
交換機的作用是什么?
接收 publisher 生產(chǎn)的消息將消息按照規(guī)則路由到與之綁定的隊列FanoutExchange的會將消息路由到每個綁定的隊列
4.2 Direct 交換機
Direct exchange 會將接收到的消息根據(jù)規(guī)則路由到指定的 Queue,因此成為定向路由
每個 Queue 都與 Exchange 設(shè)置一個 BindingKey發(fā)布者發(fā)布消息時,指定消息的 RoutingKeyExchange 將消息路由到 BindingKey 和 RoutingKey 一致的隊列
4.2.1 案例
在可視化頁面創(chuàng)建,隊列 direct.queue1 和 direct.queue2在可視化頁面創(chuàng)建,交換機 hmall.direct,將兩個隊列與其綁定在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 direct.queue1 和 direct.queue2在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.direct 發(fā)送消息
1. 在可視化頁面創(chuàng)建,隊列 direct.queue1 和 direct.queue2
**2. 在可視化頁面創(chuàng)建,交換機 hmall.direct,將兩個隊列與其綁定 **
綁定:
一次 RoutingKey 只能寫一個兩個的寫兩次
3. 在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 direct.queue1 和 direct.queue2
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消費者 1 接收到 direct.queue1 的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消費者 2 接收到 direct.queue2 的消息:【" + msg + "】");
}
4. 在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.direct 發(fā)送消息
@Test
void testDirectExchange() {
String exchangeName = "hmall.direct";
String red_msg = "紅色消息";
String yellow_msg = "黃色消息";
String blue_msg = "藍(lán)色消息";
rabbitTemplate.convertAndSend(exchangeName,"red",red_msg);
rabbitTemplate.convertAndSend(exchangeName,"yellow",yellow_msg);
rabbitTemplate.convertAndSend(exchangeName,"blue",blue_msg);
}
4.2.2 總結(jié)
描述下 Direct 交換機和 Fanout 交換機的差異?
Fanout 交換機是廣播 發(fā)送到每一個與之綁定的隊列Direct 交換機是根據(jù) RoutingKey 判斷發(fā)送給哪個隊列如果多個隊列具有相同的 RoutingKey,則與 Fanout 功能類似
4.3 Topic 交換機
Topic Exchange 與 Direct Exchange 類似,區(qū)別在于 RoutingKey 可以是多個單詞的列表,并且以 . 分割
Queue 和 Exchange 指定的 BindingKey 時可以使用通配符:
#:代指 0 或 多個單詞*****:代指一個單詞
4.3.1 案例
在可視化頁面創(chuàng)建,隊列 topic.queue1 和 topic.queue2在可視化頁面創(chuàng)建,交換機 hmall.topic,將兩個隊列與其綁定在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 topic.queue1 和 topic.queue2在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.topic發(fā)送消息
1. 在可視化頁面創(chuàng)建,隊列 topic.queue1 和 topic.queue2
2. 在可視化頁面創(chuàng)建,交換機 hmall.topic,將兩個隊列與其綁定
3. 在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 topic.queue1 和 topic.queue2
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消費者 1 接收到 topic.queue1 的消息(china.#):【" + msg +"】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.out.println("消費者 2 接收到 topic.queue2 的消息(#.news):【" + msg +"】");
}
4. 在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.topic發(fā)送消息
@Test
void testTopicExchange() {
String exchangeName = "hmall.topic";
rabbitTemplate.convertAndSend(exchangeName,"china.weather","china.weather");
rabbitTemplate.convertAndSend(exchangeName,"china.news","china.news");
rabbitTemplate.convertAndSend(exchangeName,"japan.news","japan.news");
}
4.3.2 總結(jié)
描述下 Direct 交換機和 Topic 交換機的差異?
Topic 交換機接收的消息 RoutingKey 可以是多個單詞,以 . 分割Direct 交換機接收的消息 RoutingKey 是定死的Topic 交換機與隊列綁定時的 bindingKey 可以指定通配符#:代指 0 或 多個單詞*:代指一個單詞
5. java 聲明隊列和交換機
5.1 基于 bean 聲明
SpringAMQP 提供了幾個類,用來聲明隊列、交換機及其綁定關(guān)系:
Queue:用于聲明隊列,可以用工廠類 QueueBuilder 構(gòu)建Exchange:用于聲明交換機,可以用工廠類 ExchangeBuilder 構(gòu)建Binding:用于聲明隊列和交換機的綁定關(guān)系,可以用工廠類 BindingBuilder 構(gòu)建
Consumer 創(chuàng)建 config/FanoutConfiguration
// Consumer/config/FanoutConfiguration
@Configuration
public class FanoutConfiguration {
// 交換機創(chuàng)建
@Bean
public FanoutExchange fanoutExchange() {
// 兩種方式
// ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
return new FanoutExchange("hmall.fanout2");
}
// 隊列創(chuàng)建
@Bean
public Queue fanoutQueue3() {
// 也可以使用 隊列工廠類創(chuàng)建隊列,其中 durable 代表其持久化
// QueueBuilder.durable("fanout2.queue3").build();
return new Queue("fanout2.queue3");
}
@Bean
public Queue fanoutQueue4() {
Queue queue4 = QueueBuilder.durable("fanout2.queue4").build();
return queue4;
}
// 綁定 binding
// 實現(xiàn)隊列和交換機的綁定
// 參數(shù):交換機,隊列
@Bean
public Binding fanoutBinding3(FanoutExchange fanoutExchange,Queue fanoutQueue3) {
// 綁定(bind) 隊列(Queue) 給(to) 交換機(exchange)
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding4() {
// 直接使用方法傳參
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}
若想實現(xiàn) Direct 交換機創(chuàng)建,代碼參考如下
// Consumer/config/DirectConfiguration
@Configuration
public class DirectConfiguration {
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("hmall.direct2").build();
}
@Bean
public Queue directQueue1() {
return QueueBuilder.durable("direct2.queue1").build();
}
@Bean
public Queue directQueue2() {
return QueueBuilder.durable("direct2.queue2").build();
}
@Bean
public Binding directBinding1Red() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("red");
}
@Bean
public Binding directBinding1Blue() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("bule");
}
@Bean
public Binding directBinding2Red() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("red");
}
@Bean
public Binding directBinding2Yellow() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("yellow");
}
}
從以上代碼不難看出,創(chuàng)建 Direct 交換機和隊列代碼繁雜,因此,接下來提出基于注解的聲明。
5.2 基于注解的聲明
SpringAMQP 還提供了基于 @RabbitListener 注解來聲明隊列和交換機的方式
例如:使用 注解 創(chuàng)建上例交換機和隊列
ctrl + p:可用于提示參數(shù)列表
// Consumer/listeners/MqListener
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct3.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct3", type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue3(String msg) {
System.out.println("消費者 1 收到了來自 " +
"交換機(hmall.direct3)中隊列(direct3.queue1)的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct3.queue2",durable = "true"),
exchange = @Exchange(name = "hmall.direct3",type = "direct"),
key = {"red","yellow"}
))
public void listenDirectQueue4(String msg) {
System.out.println("消費者 2 收到了來自 " +
"交換機(hmall.direct3)中隊列(direct3.queue2)的消息:【" + msg + "】");
}
5.3 總結(jié)
聲明隊列、交換機、綁定關(guān)系的 Bean 是什么?
QueueFanoutExchange、DirectExchange、TopicExchangeBinding
基于 @RabbitListenner 注解創(chuàng)建隊列和交換機有哪些常見注解?
@Queue@Exchange
6. 消息轉(zhuǎn)換器
Spring 對消息對象的處理是基于 JDK 的ObjectOutputStream 完成序列化的。存在以下問題:
JDK 的序列化有安全風(fēng)險JDK 序列化的消息太大JDK 序列化的消息可讀性差
建議采用 JSON 序列化代替默認(rèn)的 JDK 序列化,要做兩件事:
在 publisher 和 consumer 中都要引入 jackson 依賴
在 publisher 和 consumer 中都要配置 MessageConverter @Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
publisher 測試代碼
@Test
void testSendObject() {
Map
msg.put("name","zhangsan");
msg.put("age",18);
rabbitTemplate.convertAndSend("object.queue",msg);
}
Consumer 接收消息
@RabbitListener(queues = "object.queue")
// 接收類型和傳送類型一致
public void listenObjectQueue(Map
System.out.println("消費者接收到 object.queue 的消息:【" + msg +"】");
}
柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。