柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMQ的使用
一、消息中間件RabbitMQ---概述和概念 【一】
1、概述
1、大多應(yīng)用中,可通過消息服務(wù)中間件來提升系統(tǒng)異步通信、擴(kuò)展解耦能力 2、消息服務(wù)中兩個(gè)重要概念:
消息代理(message broker)和目的地(destination) 當(dāng)消息發(fā)送者發(fā)送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。
3、消息隊(duì)列主要有兩種形式的目的地
隊(duì)列(queue):點(diǎn)對(duì)點(diǎn)消息通信(point-to-point)主題(topic):發(fā)布(publish)/訂閱(subscribe)消息通信
4、點(diǎn)對(duì)點(diǎn)式:
消息發(fā)送者發(fā)送消息,消息代理將其放入一個(gè)隊(duì)列中,消息接收者從隊(duì)列中獲取消息內(nèi)容,消息讀取后被移出隊(duì)列消息只有唯一的發(fā)送者和接受者,但并不是說只能有一個(gè)接收者
5、發(fā)布訂閱式:
發(fā)送者(發(fā)布者)發(fā)送消息到主題,多個(gè)接收者(訂閱者)監(jiān)聽(訂閱)這個(gè)主題,那么就會(huì)在消息到達(dá)時(shí)同時(shí)收到消息
6、JMS(Java Message Service)JAVA消息服務(wù):
基于JVM消息代理的規(guī)范。ActiveMQ、HornetMQ是JMS實(shí)現(xiàn)
7、 AMQP(Advanced Message Queuing Protocol)
高級(jí)消息隊(duì)列協(xié)議,也是一個(gè)消息代理的規(guī)范,兼容JMSRabbitMQ是AMQP的實(shí)現(xiàn)
2、RabbitMQ概念
2.1 RabbitMQ簡(jiǎn)介
RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanved Message Queue Protocol)的開源實(shí)現(xiàn)
2.2 核心概念
Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。Exchange有4種類型:direct(默認(rèn)),fanout, topic, 和headers,不同類型的Exchange轉(zhuǎn)發(fā)消息的策略有所區(qū)別
Queue:消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
Binding綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。Exchange 和Queue的綁定可以是多對(duì)多的關(guān)系。
Connection:網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)的虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。
Broker:表示消息隊(duì)列服務(wù)器實(shí)體
二、消息中間件RabbitMQ---Docker安裝RabbitMQ、以及RabbitMQ的基本使用【二】
消息中間件RabbitMQ---Docker安裝RabbitMQ、以及RabbitMQ的基本使用【二】
1、安裝
1.1 拉取鏡像
docker pull rabbitmq:management
1.2 查看鏡像
docker images
1.3 創(chuàng)建容器
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
1.4 瀏覽器訪問
登錄進(jìn)去后的效果
2、測(cè)試使用(基本步驟)
2.1 創(chuàng)建交換機(jī)
2.2 創(chuàng)建隊(duì)列
2.3 綁定
3 、direct 交換器
3.1 基本介紹
????????消息中的路由鍵(routing key)如果和Binding 中的 binding key 一致, 交換器就將消息發(fā)到對(duì)應(yīng)的隊(duì)列中。路由鍵與隊(duì)列名完全匹配,如果一個(gè)隊(duì)列綁定到交換機(jī)要求路由鍵為“dog”,則只轉(zhuǎn)發(fā) routing key 標(biāo)記為“dog”的消息,不會(huì)轉(zhuǎn)發(fā)“dog.puppy”,也不會(huì)轉(zhuǎn)發(fā)“dog.guard” 等等。它是完全匹配、單播的模式。
3.2 測(cè)試
3.2.1 創(chuàng)建direct類型交換機(jī)
3.2.2 交換機(jī)和隊(duì)列綁定
3.2.3 發(fā)布消息
3.2.4 隊(duì)列查看
4、fanout 交換器
4.1 基本介紹
????????每個(gè)發(fā)到 fanout 類型交換器的消息都會(huì)分到所有綁定的隊(duì)列上去。fanout 交換器不處理路由鍵,只是簡(jiǎn)單的將隊(duì)列綁定到交換器上,每個(gè)發(fā)送到交換器的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換器綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。fanout 類型轉(zhuǎn)發(fā)消息是最快的。
4.2 測(cè)試
4.2.1 創(chuàng)建
4.2.2 綁定
4.2.3 發(fā)布消息
4.2.4 隊(duì)列查看
5、topic 交換器
5.1 基本介紹
???????? topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個(gè)模式進(jìn)行匹配,此時(shí)隊(duì)列需要綁定到一個(gè)模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點(diǎn)隔開。它同樣也會(huì)識(shí)別兩個(gè)通配符:符號(hào)“#”和符號(hào)“”。#匹配0個(gè)或多個(gè)單詞,匹配一個(gè)單詞。
5.2 測(cè)試
5.2.1 創(chuàng)建
5.2.2 綁定
5.2.3 發(fā)布消息(都能收到)
5.2.4 發(fā)布消息 (部分收到)
三、消息中間件RabbitMQ---SpringBoot整合RabbitMQ【三】
消息中間件RabbitMQ---SpringBoot整合RabbitMQ【三】
1、SpringBoot整合
1.1、pom依賴
1.2、配置文件
# RabbitMQ配置
spring.rabbitmq.host=192.168.202.211
spring.rabbitmq.port=5672
# 虛擬主機(jī)配置
spring.rabbitmq.virtual-host=/
1.3、啟動(dòng)類開啟
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
2、單元測(cè)試
2.1 創(chuàng)建交換器
/**
* 1、如何創(chuàng)建Exchange、Queue、Binding
* 1)、使用AmqpAdmin進(jìn)行創(chuàng)建
*/
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]創(chuàng)建成功:","hello-java-exchange");
}
使用代碼創(chuàng)建和工具創(chuàng)建類似。
創(chuàng)建結(jié)果
2.2 創(chuàng)建隊(duì)列
@Test
public void testCreateQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]創(chuàng)建成功:","hello-java-queue");
}
創(chuàng)建結(jié)果
2.3 創(chuàng)建綁定
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]創(chuàng)建成功:","hello-java-binding");
}
結(jié)果
2.4 發(fā)送消息1 (字符串)
@Test
public void sendMessageTestDemo1(){
String msg = "hello java";
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
log.info("消息發(fā)送完成{}",msg);
}
查看接收的消息
2.2 發(fā)送消息2 (對(duì)象)
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、發(fā)送消息,如果發(fā)送的消息是個(gè)對(duì)象,會(huì)使用序列化機(jī)制,將對(duì)象寫出去,對(duì)象必須實(shí)現(xiàn)Serializable接口
//2、發(fā)送的對(duì)象類型的消息,可以是一個(gè)json
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息發(fā)送完成:{}",reasonEntity);
}
由于是序列化,需要添加配置,經(jīng)過轉(zhuǎn)換后才可以看到對(duì)象數(shù)據(jù)
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
2.3 接收數(shù)據(jù)
/**
* queues:聲明需要監(jiān)聽的隊(duì)列
* channel:當(dāng)前傳輸數(shù)據(jù)的通道
*/
@RabbitListener(queues = {"hello-java-queue"})
public void revieveMessage(Message message,
OrderReturnReasonEntity content) {
//拿到主體內(nèi)容
byte[] body = message.getBody();
//拿到的消息頭屬性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...內(nèi)容" + message + "===內(nèi)容:" + content);
}
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMQ的使用
精彩內(nèi)容
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。