柚子快報(bào)激活碼778899分享:分布式 RabbitMQ
讓你快速了解RabbitMQ
什么是RabbitM
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)
消息隊(duì)列的概念
哪什么是消息隊(duì)列呢?
消息隊(duì)列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊(duì)列中,通常有生產(chǎn)者和消費(fèi)者兩個(gè)角色。生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù)到消息隊(duì)列,誰從消息隊(duì)列中取出數(shù)據(jù)處理,他不管。消費(fèi)者只負(fù)責(zé)從消息隊(duì)列中取出數(shù)據(jù)處理,他不管這是誰發(fā)送的數(shù)據(jù)。
哪消息隊(duì)列有什么作用呢
主要作用有三點(diǎn):
1.解耦
實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦,生產(chǎn)者和消費(fèi)者不直接調(diào)用,也不用關(guān)心對(duì)方如何處理,代碼的維護(hù)性提高
比如使用openfeign實(shí)現(xiàn)服務(wù)調(diào)用,被調(diào)用服務(wù)的接口發(fā)生修改,服務(wù)調(diào)用方也需要進(jìn)行修改,服務(wù)之間的耦合性就會(huì)較高,那么不利于開發(fā)和維護(hù)
2.異步
同步調(diào)用,服務(wù)A調(diào)用服務(wù)B,必須等待服務(wù)B執(zhí)行完業(yè)務(wù),服務(wù)A才能執(zhí)行其它業(yè)務(wù)
異步調(diào)用,服務(wù)A發(fā)送消息給消息隊(duì)列,馬上返回完成其它業(yè)務(wù),不用等待服務(wù)B執(zhí)行完
3.削峰
這其實(shí)是MQ一個(gè)很重要的應(yīng)用,可以通過控制消息隊(duì)列的長度來限制請(qǐng)求流量,從而達(dá)到限流保護(hù)服務(wù)器的作用
假如說系統(tǒng)A在某個(gè)時(shí)間段請(qǐng)求大量增加,有上萬條數(shù)據(jù)發(fā)送過來,系統(tǒng)A就會(huì)把發(fā)送過來的數(shù)據(jù)直接發(fā)到SQL中,mysl數(shù)據(jù)庫里執(zhí)行,如果處理不過來就會(huì)直接導(dǎo)致系統(tǒng)癱瘓,使用MQ,系統(tǒng)A不再是直接發(fā)送SQL到數(shù)據(jù)庫,而是把數(shù)據(jù)發(fā)送到MQ,MQ短時(shí)間積壓數(shù)據(jù)是可以接受的,然后由消費(fèi)者每次拉取2000條進(jìn)行處理,防止在請(qǐng)求峰值時(shí)期大量的請(qǐng)求直接發(fā)送到MySQL導(dǎo)致系統(tǒng)崩潰。
同時(shí)消息隊(duì)列也有它的缺點(diǎn)
1.系統(tǒng)的復(fù)雜性提高了
2.系統(tǒng)的可用性降低了
消息隊(duì)列的概念
生產(chǎn)者 向消息隊(duì)列發(fā)送消息的服務(wù) 消費(fèi)者 從消息隊(duì)列取消息的服務(wù) 隊(duì)列 queue 存放消息的容器,采用FIFO數(shù)據(jù)結(jié)構(gòu) 交換機(jī) exchange 實(shí)現(xiàn)消息路由,將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中 消息服務(wù)器 Broker 進(jìn)行消息通信的軟件平臺(tái)服務(wù)器 虛擬主機(jī) virtual host 類似于namespace,將不同用戶的交換機(jī)和隊(duì)列區(qū)分開來 連接 connection 網(wǎng)絡(luò)連接 通道 channel 數(shù)據(jù)通信的通道
RabbitMQ的基本使用
1.添加用戶
不同的系統(tǒng)可以使用各自的用戶登錄RabbitMQ,可以在Admin的User頁面添加新用戶
2.添加虛擬主機(jī)
虛擬主機(jī)相當(dāng)于一個(gè)獨(dú)立的MQ服務(wù),有自身的隊(duì)列、交換機(jī)、綁定策略等。 添加虛擬主機(jī)
3.添加隊(duì)列
不同的消息隊(duì)列保存不同類型的消息,如支付消息、秒殺消息、數(shù)據(jù)同步消息等。 添加隊(duì)列,需要填寫虛擬主機(jī)、類型、名稱、持久化、自動(dòng)刪除和參數(shù)等。
4.添加交換機(jī)
生產(chǎn)者將消息發(fā)送到交換機(jī)Exchange,再由交換機(jī)路由到一個(gè)或多個(gè)隊(duì)列中; 交換器的類型有fanout、direct、topic、headers這四種,下篇文章將詳細(xì)介紹。 添加交換機(jī)
RabbitMQ的五種消息模型
RabbitMQ提供了多種消息模型,官網(wǎng)上第6種是RPC不屬于常規(guī)的消息隊(duì)列。 屬于消息模型的是前5種:
簡單的一對(duì)一模型工作隊(duì)列模型 ,一個(gè)生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者發(fā)布/訂閱模型 ,生產(chǎn)者發(fā)布消息,多個(gè)消費(fèi)者同時(shí)收取路由模型 ,生產(chǎn)者通過關(guān)鍵字發(fā)送消息給特定消費(fèi)者主題模型 ,路由模式基礎(chǔ)上,在關(guān)鍵字里加入了通配符
1.一對(duì)一模型
最基本的隊(duì)列模型: 一個(gè)生產(chǎn)者發(fā)送消息到一個(gè)隊(duì)列,一個(gè)消費(fèi)者從隊(duì)列中取消息。
2.操作步驟
1)啟動(dòng)Rabbitmq,在管理頁面中創(chuàng)建用戶admin 2)使用admin登錄,然后創(chuàng)建虛擬主機(jī)myhost
3.案例代碼
導(dǎo)入依賴
開發(fā)工具類
public class MQUtils {
public static final String QUEUE_NAME = "myqueue01";
public static final String QUEUE_NAME2 = "myqueue02";
public static final String EXCHANGE_NAME = "myexchange01";
public static final String EXCHANGE_NAME2 = "myexchange02";
public static final String EXCHANGE_NAME3 = "myexchange03";
/**
* 獲得MQ的連接
* @return
* @throws IOException
*/
public static Connection getConnection() throws IOException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置服務(wù)器名、端口、虛擬主機(jī)名、登錄賬號(hào)和密碼
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("myhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}
}
開發(fā)生產(chǎn)者
/**
* 生產(chǎn)者,發(fā)送簡單的消息到隊(duì)列中
*/
public class SimpleProducer {
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//定義隊(duì)列
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
String msg = "Hello World!";
//發(fā)布消息到隊(duì)列
channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());
channel.close();
connection.close();
}
}
運(yùn)行生產(chǎn)者代碼,管理頁面點(diǎn)進(jìn)myqueue01,在GetMessages中可以看到消息
開發(fā)消費(fèi)者
/**
* 消費(fèi)者,從隊(duì)列中讀取簡單的消息
*/
public class SimpleConsumer {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//定義隊(duì)列
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費(fèi)者消費(fèi)通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
//讀取消息
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}
}
}
工作隊(duì)列模型
工作隊(duì)列,生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者,如果生產(chǎn)者生產(chǎn)了100條消息,消費(fèi)者1消費(fèi)50條,消費(fèi)者2消費(fèi)50條。
1 案例代碼
開發(fā)生產(chǎn)者
/**
多對(duì)多模式的生產(chǎn)者,會(huì)發(fā)送多條消息到隊(duì)列中
*/
public class WorkProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
for(int i = 0;i < 100;i++){
String msg = "Hello-->" + i;
channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());
System.out.println("send:" + msg);
Thread.sleep(10);
}
channel.close();
connection.close();
}
}
開發(fā)消費(fèi)者1
/**
* 多對(duì)多模式的消費(fèi)者1
*/
public class WorkConsumer01 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費(fèi)者消費(fèi)通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
Thread.sleep(10);
}
}
}
開發(fā)消費(fèi)者2
/**
* 多對(duì)多模式的消費(fèi)者2
*/
public class WorkConsumer02 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費(fèi)者消費(fèi)通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
Thread.sleep(1000);
}
}
}
生產(chǎn)者發(fā)送100個(gè)消息,兩個(gè)消費(fèi)者分別讀取了50條。 看消息內(nèi)容,發(fā)現(xiàn)隊(duì)列發(fā)送消息采用的是輪詢方式,也就是先發(fā)給消費(fèi)者1,再發(fā)給消費(fèi)者2,依次往復(fù)。
2 能者多勞
上面案例中有一個(gè)問題:消費(fèi)者處理消息的速度是不一樣的,消費(fèi)者1處理后睡眠10毫秒(Thread.sleep(10)),消費(fèi)者2是1000毫秒,速度相差100倍,但是最后處理的消息數(shù)還是一樣的。這樣就存在效率問題:處理能力強(qiáng)的消費(fèi)者得不到更多的消息。
因?yàn)殛?duì)列默認(rèn)采用是自動(dòng)確認(rèn)機(jī)制,消息發(fā)過去后就自動(dòng)確認(rèn),隊(duì)列不清楚每個(gè)消息具體什么時(shí)間處理完,所以平均分配消息數(shù)量。
實(shí)現(xiàn)能者多勞:
channel.basicQos(1);限制隊(duì)列一次發(fā)一個(gè)消息給消費(fèi)者,等消費(fèi)者有了反饋,再發(fā)下一條channel.basicAck 消費(fèi)完消息后手動(dòng)反饋,處理快的消費(fèi)者就能處理更多消息basicConsume 中的參數(shù)改為false
/**
多對(duì)多模式的消費(fèi)者1
*/
public class WorkConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者
channel.basicQos(1);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)
channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
Thread.sleep(10);
//手動(dòng)確定返回狀態(tài),不寫就是自動(dòng)確認(rèn)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
/**
* 多對(duì)多模式的消費(fèi)者2
*/
public class WorkConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者
channel.basicQos(1);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)
channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
Thread.sleep(1000);
//手動(dòng)確定返回狀態(tài),不寫就是自動(dòng)確認(rèn)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
3 發(fā)布/訂閱模型
發(fā)布/訂閱模式和Work模式的區(qū)別是:Work模式只存在一個(gè)隊(duì)列,多個(gè)消費(fèi)者共同消費(fèi)一個(gè)隊(duì)列中的消息;而發(fā)布訂閱模式存在多個(gè)隊(duì)列,不同的消費(fèi)者可以從各自的隊(duì)列中處理完全相同的消息。
1. 操作步驟
實(shí)現(xiàn)步驟:
創(chuàng)建交換機(jī)(Exchange)類型是fanout(扇出)交換機(jī)需要綁定不同的隊(duì)列不同的消費(fèi)者從不同的隊(duì)列中獲得消息生產(chǎn)者發(fā)送消息到交換機(jī)再由交換機(jī)將消息分發(fā)到多個(gè)隊(duì)列
新建隊(duì)列
新建交換機(jī)
點(diǎn)擊交換機(jī),在bindings里面綁定兩個(gè)隊(duì)列
2.案例代碼
生產(chǎn)者
/**
* 發(fā)布和訂閱模式的生產(chǎn)者,消息會(huì)通過交換機(jī)發(fā)到隊(duì)列
*/
public class PublishProductor {
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明fanout exchange
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");
String msg = "Hello Fanout";
//發(fā)布消息到交換機(jī)
channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費(fèi)者1
/**
* 發(fā)布訂閱模式的消費(fèi)者1
* 兩個(gè)消費(fèi)者綁定的消息隊(duì)列不同
* 通過交換機(jī)一個(gè)消息能被不同隊(duì)列的兩個(gè)消費(fèi)者同時(shí)獲取
* 一個(gè)隊(duì)列可以有多個(gè)消費(fèi)者,隊(duì)列中的消息只能被一個(gè)消費(fèi)者獲取
*/
public class SubscribeConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//綁定隊(duì)列1到交換機(jī)
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("Consumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費(fèi)者2
public class SubscribeConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
//綁定隊(duì)列2到交換機(jī)
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("Consumer2 receive :" + new String(delivery.getBody()));
}
}
}
路由模型
路由模式的消息隊(duì)列可以給隊(duì)列綁定不同的key,生產(chǎn)者發(fā)送消息時(shí),給消息設(shè)置不同的key,這樣交換機(jī)在分發(fā)消息時(shí),可以讓消息路由到key匹配的隊(duì)列中。 可以想象上圖是一個(gè)日志處理系統(tǒng),C1可以處理error日志消息,C2可以處理info\error\warining類型的日志消息,使用路由模式就很容易實(shí)現(xiàn)了。
1 操作步驟
新建direct類型的交換機(jī)
2.案例代碼
生產(chǎn)者,給myqueue01綁定了key:error,myqueue02綁定了key:debug,然后發(fā)送了key:error的消息
/**
路由模式的生產(chǎn)者,發(fā)布消息會(huì)有特定的Key,消息會(huì)被綁定特定Key的消費(fèi)者獲取
*/
public class RouteProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機(jī)類型為direct
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");
String msg = "Hello-->Route";
//綁定隊(duì)列1到交換機(jī),指定了Key為error
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");
//綁定隊(duì)列2到交換機(jī),指定了Key為debug
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");
//error是一個(gè)指定的Key
channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費(fèi)者1
/**
* 路由模式的消費(fèi)者1
* 可以指定Key,消費(fèi)特定的消息
*/
public class RouteConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費(fèi)者2
/**
* 路由模式的消費(fèi)者2
* 可以指定Key,消費(fèi)特定的消息
*/
public class RouteConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));
}
}
}
主題模型
主題模式和路由模式差不多,在key中可以加入通配符:
* 匹配任意一個(gè)單詞 com.* ----> com.hopu com.blb com.baidu# 匹配.號(hào)隔開的0個(gè)或多個(gè)單詞 com.# —> com.hopu.net com.hopu com.163.xxx.xxx.xxx
1 案例代碼
生產(chǎn)者代碼
/**
主題模式的生產(chǎn)者
*/
public class TopicProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機(jī)類型為topic
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");
//綁定隊(duì)列到交換機(jī),最后指定了Key
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");
//綁定隊(duì)列到交換機(jī),最后指定了Key
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");
String msg = "Hello-->Topic";
channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費(fèi)者1
/**
* 主題模式的消費(fèi)者1 ,類似路由模式,可以使用通配符對(duì)Key進(jìn)行篩選
* #匹配1個(gè)或多個(gè)單詞,*匹配一個(gè)單詞
*/
public class TopicConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費(fèi)者2
/**
* 主題模式的消費(fèi)者2
*/
public class TopicConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));
}
}
}
SpringBoot整合RabbitMQ
創(chuàng)建兩個(gè)SpringBoot項(xiàng)目,一個(gè)作為生產(chǎn)者,一個(gè)作為消費(fèi)者
生產(chǎn)者會(huì)發(fā)送兩種消息:保存課程(更新和添加),刪除課程
消費(fèi)者監(jiān)聽兩個(gè)隊(duì)列:保存課程隊(duì)列和刪除課程隊(duì)列
2)給生產(chǎn)者和消費(fèi)者服務(wù)添加依賴
3) 給生產(chǎn)者和消費(fèi)者服務(wù)添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: myhost
4)生產(chǎn)者的配置,用于生成消息隊(duì)列和交換機(jī)
/**
* RabbitMQ的配置
*/
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "edu.course.exchange";
@Bean
public Queue queueCourseSave() {
return new Queue(QUEUE_COURSE_SAVE);
}
@Bean
public Queue queueCourseRemove() {
return new Queue(QUEUE_COURSE_REMOVE);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(COURSE_EXCHANGE);
}
@Bean
public Binding bindCourseSave() {
return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
}
@Bean
public Binding bindCourseRemove() {
return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
}
}
5) 生產(chǎn)者發(fā)送消息的核心代碼
@Autowired
RabbitTemplate rabbitTemplate;
//發(fā)消息的代碼
rabbitTemplate.convertAndSend(交換機(jī)的名稱,消息的key,消息內(nèi)容);
6)消費(fèi)者添加監(jiān)聽器
@Slf4j
@Component
public class CourseMQListener {
public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "course.exchange";
/**
* 監(jiān)聽課程添加操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_SAVE)})
public void receiveCourseSaveMessage(String message) {
try {
log.info("課程添加:{}",message);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 監(jiān)聽課程刪除操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_REMOVE)})
public void receiveCourseDeleteMessage(Long id) {
try {
log.info("課程刪除完成:{}",id);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
CourseSaveMessage(String message) { try { log.info(“課程添加:{}”,message); } catch (Exception ex) { ex.printStackTrace(); } }
/**
* 監(jiān)聽課程刪除操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_REMOVE)})
public void receiveCourseDeleteMessage(Long id) {
try {
log.info("課程刪除完成:{}",id);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。