柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ
柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ
一、初識(shí) MQ
1. 同步通訊
時(shí)效性強(qiáng),立即獲取結(jié)果
微服務(wù)間基于 Feign 的調(diào)用就屬于同步
方式,存在一些問題:
① 耦合度高
② 性能和吞吐能力不如異步
③ 額外資源消耗
④ 級(jí)聯(lián)失敗問題
2. 異步通訊
異步調(diào)用常見實(shí)現(xiàn)就是事件驅(qū)動(dòng)模式
優(yōu)點(diǎn):
① 服務(wù)解耦
② 性能提升,吞吐量提高
③ 服務(wù)沒有強(qiáng)依賴,不擔(dān)心級(jí)聯(lián)問題
④ 流量削峰
缺點(diǎn)
① 依賴 Broker(事件代理者) 的可靠性、
? ? 安全性、吞吐能力
② 架構(gòu)復(fù)雜的情況下,業(yè)務(wù)沒有明顯
? ? 的流程線,不好追蹤管理
3. MQ 常見框架
MQ (MessageQueue),中文是消息隊(duì)列,
字面來看就是存放消息的隊(duì)列,也就是事
件驅(qū)動(dòng)架構(gòu)中的 Broker
二、RabbitMQ 快速入門
1. RabbitMQ 概述
RabbitMQ 是基于 Erlang 語言開發(fā)的開源
消息通信中間件
官網(wǎng)地址:https://www.rabbitmq.com/
channel(通道):操作 MQ 的工具
exchange(交換機(jī)):路由消息到隊(duì)列中
queue(隊(duì)列):緩存消息
virtual host:虛擬主機(jī),是對 queue、
? ? ? ? ? ? ?exchange 等資源的邏輯分組
2. 常見消息模型
(1) 基本消息隊(duì)列(BasicQueue)
?1) 官方的 HelloWorld 是基于最基礎(chǔ)的
? ? 消息隊(duì)列模型來實(shí)現(xiàn)的,只包括三個(gè)
? ? 角色:
publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列
queue queue:消息隊(duì)列,負(fù)責(zé)接受并緩存消息
consumer:訂閱隊(duì)列,處理隊(duì)列中的消息
? 2) 基本消息隊(duì)列的消息發(fā)送流程:
① 建立 connection
② 創(chuàng)建 channel
③ 利用 channel 聲明隊(duì)列
④ 利用 channel 向隊(duì)列發(fā)送消息
? 3) 基本消息隊(duì)列的消息接收流程:
① 建立 connection
② 創(chuàng)建 channel
③ 利用 channel 聲明隊(duì)列
④ 定義 consumer 的消費(fèi)行為
? ??handleDelivery()
⑤ 利用 channel 將消費(fèi)者與隊(duì)列
? ? ?綁定?
(2) 工作消息隊(duì)列(WorkQueue)
可以提高消息處理速度,避免隊(duì)列消
息堆積?
(3) 發(fā)布訂閱(Publish、Subscribe)
允許將同一消息發(fā)送給多個(gè)消費(fèi)者
實(shí)現(xiàn)方式是加入了 exchange(交換機(jī))
注意:exchange 負(fù)責(zé)消息路由,而
? ? ? 不是存儲(chǔ),路由失敗則消息丟失
根據(jù)交換機(jī)類型不同分為三種:
1) Fanout Exchange:廣播
將接收到的消息廣播到每一個(gè)跟其
綁定的 queue
??
2) Direct Exchange:路由
將接收到的消息根據(jù)規(guī)則路由到指定
的 Queue,因此稱為路由模式(routes)
① 每一個(gè) Queue 都與 Exchange 設(shè)置
? ? 一個(gè) BindingKey
② 發(fā)布者發(fā)送消息時(shí),指定消息的
? ? RoutingKey
③ Exchange 將消息路由到 BindingKey
? ? 與消息 RoutingKey 一致的隊(duì)列
??
3) Topic Exchange:主題
TopicExchange 與 DirectExchange 類
似,區(qū)別在于 routingKey 必須是多個(gè)
單詞的列表,并且以 . 分割
Queue 與 Exchange 指定 BindingKey 時(shí)
可以使用通配符:
#:代指0個(gè)或多個(gè)單詞
*:代指一個(gè)單詞
三、SpringAMQP
SpringAmqp 的官方地址:
https://spring.io/projects/spring-amqp
AMQP,即 Advanced Message Queuing
Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用
層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,基于此協(xié)議的
客戶端與消息中間件可傳遞消息,并不受
客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言
等條件的限制
Spring AMQP 項(xiàng)目將核心 Spring 概念應(yīng)
用于基于 AMQP 的消息傳遞解決方案的
開發(fā)的一套 API 規(guī)范,它提供了一個(gè)模板
作為發(fā)送和接收消息的高級(jí)抽象
其中 spring-amqp 是基礎(chǔ)抽象,
spring-rabbit 是底層的默認(rèn)實(shí)現(xiàn)
1. 利用 SpringAMQP 實(shí)現(xiàn) HelloWorld 中的基
? ? 礎(chǔ)消息隊(duì)列功能
? (1) 在父工程中引入 spring-amqp 的依賴
? (2) 在 publisher 服務(wù)中利用 RabbitTemplate
? ? ? 發(fā)送消息到 simple.queue 這個(gè)隊(duì)列
? ?① 在 publisher 服務(wù)中編寫 application.yml,
? ? ? ?添加 mq 連接信息:
spring:
rabbitmq:
host: 192.168.150.110 # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: root
password: 123456
? ②?在 publisher 服務(wù)中新建一個(gè)測試類,
? ? ? ?編寫測試方法:
public class PublisherTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp";
rabbitTemplate.convertAndSend(queueName, message);
}
}
(3) 在 consumer 服務(wù)中編寫消費(fèi)邏輯,綁定
? ? simple.queue 這個(gè)隊(duì)列?
? ① 在 consumer 服務(wù)中編寫 application.yml,
? ? ? 添加 mq 連接信息:
spring:
rabbitmq:
host: 192.168.150.110 # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: root
password: 123456
? ② 在 consumer 服務(wù)中新建一個(gè)類,編寫消
? ? ? 費(fèi)邏輯:?
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue(String msg) {
System.out.println(msg);
}
}
注意:消息一旦消費(fèi)就會(huì)從隊(duì)列刪除,
? ? ? ? ? RabbitMQ 沒有消息回溯功能
2. 使用 WorkQueue 實(shí)現(xiàn)一個(gè)隊(duì)列綁定多個(gè)
? ? 消費(fèi)者
? (1) 生產(chǎn)者循環(huán)發(fā)送消息到 simple.queue
@Test
public void testSimpleQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, spring amqp";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
? (2) 編寫兩個(gè)消費(fèi)者,都監(jiān)聽 simple.queue
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1" + "【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2" + "【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
}
AMQP 有一個(gè)消息預(yù)取機(jī)制,設(shè)置 preFetch
這個(gè)值,可以控制預(yù)取消息的上限:
spring:
rabbitmq:
host: 190.92.246.107 # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: root
password: 123456
listener:
simple:
prefetch: 1
3.?FanoutExchange 的使用
(1) 在 consumer 服務(wù)中,利用代碼聲明隊(duì)
? ? ?列、交換機(jī),并將兩者綁定
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("root.fanout");
}
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者
? ? ?方法,分別監(jiān)聽 fanout.queue1 和
? ? ?fanout.queue2
@RabbitListener(queues = {"fanout.queue1"})
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.out.println("fanout.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
@RabbitListener(queues = {"fanout.queue2"})
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("fanout.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
(3) 在 publisher 中編寫測試方法,向
? ? ?itcast.fanout 發(fā)送消息
@Test
public void testSendFanoutExchange() {
String exchangeName = "root.fanout";
String message = "hello everyone";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
交換機(jī)的作用:
① 接收 publisher 發(fā)送的消息
② 將消息按照規(guī)則路由到與之綁定的
? ? 隊(duì)列
③ 不能緩存消息,路由失敗,消息丟
? ? 失
④ FanoutExchange 的會(huì)將消息路由
? ? 到每個(gè)綁定的隊(duì)列
聲明隊(duì)列、交換機(jī)、綁定關(guān)系的 Bean:
Queue
FanoutExchange
Binding
4.?DirectExchange 的使用?
(1) 利用 @RabbitListener 聲明 Exchange、
? ? ?Queue、RoutingKey
(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者
? ? ?方法,分別監(jiān)聽 direct.queue1 和
? ? ?direct.queue2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
key = {
"blue",
"red"
}
))
public void listenDirectQueue1(String msg) {
System.err.println("direct.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
key = {
"yellow",
"red"
}
))
public void listenDirectQueue2(String msg) {
System.err.println("direct.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
(3) 在 publisher 中編寫測試方法,向 itcast.
? ? ?direct發(fā)送消息
@Test
public void testSendDirectExchange() {
String exchangeName = "root.direct";
String message = "hello red";
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
Direct 交換機(jī)與 Fanout 交換機(jī)的差異:
① Fanout 交換機(jī)將消息路由給每一個(gè)
? ? 與之綁定的隊(duì)列
② Direct 交換機(jī)根據(jù) RoutingKey 判斷
? ? 路由給哪個(gè)隊(duì)列
③ 如果多個(gè)隊(duì)列具有相同的 RoutingKey,
? ? 則與 Fanout 功能類似
5.?TopicExchange 的使用
(1) 利用 @RabbitListene r聲明 Exchange、
? ? ?Queue、RoutingKey
(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者方
? ? 法,分別監(jiān)聽topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
key = {
"china.#"
}
))
public void listenTopicQueue1(String msg) {
System.err.println("topic.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
key = {
"#.news"
}
))
public void listenTopicQueue2(String msg) {
System.err.println("topic.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());
}
(3) 在 publisher 中編寫測試方法,向 itcast.
? ? ?topic 發(fā)送消息
@Test
public void testSendTopicExchange() {
String exchangeName = "root.topic";
String message = "hello world";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
6. 消息轉(zhuǎn)換器
在 SpringAMQP 的發(fā)送方法中,接收消息
的類型是 Object,也就是說我們可以發(fā)送
任意對象類型的消息,SpringAMQP 會(huì)幫
我們序列化為字節(jié)后發(fā)送
SpringAMQP 中消息的序列化和反序列化
是利用 MessageConverter 實(shí)現(xiàn)的,默認(rèn)
是 JDK 的序列化,其中發(fā)送方與接收方必
須使用相同的 MessageConverter
推薦用 JSON 方式序列化:
① 先在 publisher 服務(wù)引入依賴
② 在 publisher 服務(wù)聲明 MessageConverter
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
③?在 consumer 服務(wù)引入 Jackson 依賴:
④?在 consumer 服務(wù)定義 MessageConverter:
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
⑤?定義一個(gè)消費(fèi)者,監(jiān)聽 object.queue 隊(duì)列
? ? 并消費(fèi)消息:
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map
System.out.println("收到消息:【" + msg + "】");
}
柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。