柚子快報激活碼778899分享:分布式 RabbitMQ
讓你快速了解RabbitMQ
什么是RabbitM
RabbitMQ是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)
消息隊列的概念
哪什么是消息隊列呢?
消息隊列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊列中,通常有生產(chǎn)者和消費者兩個角色。生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù)到消息隊列,誰從消息隊列中取出數(shù)據(jù)處理,他不管。消費者只負(fù)責(zé)從消息隊列中取出數(shù)據(jù)處理,他不管這是誰發(fā)送的數(shù)據(jù)。
哪消息隊列有什么作用呢
主要作用有三點:
1.解耦
實現(xiàn)生產(chǎn)者和消費者的解耦,生產(chǎn)者和消費者不直接調(diào)用,也不用關(guān)心對方如何處理,代碼的維護性提高
比如使用openfeign實現(xiàn)服務(wù)調(diào)用,被調(diào)用服務(wù)的接口發(fā)生修改,服務(wù)調(diào)用方也需要進行修改,服務(wù)之間的耦合性就會較高,那么不利于開發(fā)和維護
2.異步
同步調(diào)用,服務(wù)A調(diào)用服務(wù)B,必須等待服務(wù)B執(zhí)行完業(yè)務(wù),服務(wù)A才能執(zhí)行其它業(yè)務(wù)
異步調(diào)用,服務(wù)A發(fā)送消息給消息隊列,馬上返回完成其它業(yè)務(wù),不用等待服務(wù)B執(zhí)行完
3.削峰
這其實是MQ一個很重要的應(yīng)用,可以通過控制消息隊列的長度來限制請求流量,從而達到限流保護服務(wù)器的作用
假如說系統(tǒng)A在某個時間段請求大量增加,有上萬條數(shù)據(jù)發(fā)送過來,系統(tǒng)A就會把發(fā)送過來的數(shù)據(jù)直接發(fā)到SQL中,mysl數(shù)據(jù)庫里執(zhí)行,如果處理不過來就會直接導(dǎo)致系統(tǒng)癱瘓,使用MQ,系統(tǒng)A不再是直接發(fā)送SQL到數(shù)據(jù)庫,而是把數(shù)據(jù)發(fā)送到MQ,MQ短時間積壓數(shù)據(jù)是可以接受的,然后由消費者每次拉取2000條進行處理,防止在請求峰值時期大量的請求直接發(fā)送到MySQL導(dǎo)致系統(tǒng)崩潰。
同時消息隊列也有它的缺點
1.系統(tǒng)的復(fù)雜性提高了
2.系統(tǒng)的可用性降低了
消息隊列的概念
生產(chǎn)者 向消息隊列發(fā)送消息的服務(wù) 消費者 從消息隊列取消息的服務(wù) 隊列 queue 存放消息的容器,采用FIFO數(shù)據(jù)結(jié)構(gòu) 交換機 exchange 實現(xiàn)消息路由,將消息分發(fā)到對應(yīng)的隊列中 消息服務(wù)器 Broker 進行消息通信的軟件平臺服務(wù)器 虛擬主機 virtual host 類似于namespace,將不同用戶的交換機和隊列區(qū)分開來 連接 connection 網(wǎng)絡(luò)連接 通道 channel 數(shù)據(jù)通信的通道
RabbitMQ的基本使用
1.添加用戶
不同的系統(tǒng)可以使用各自的用戶登錄RabbitMQ,可以在Admin的User頁面添加新用戶
2.添加虛擬主機
虛擬主機相當(dāng)于一個獨立的MQ服務(wù),有自身的隊列、交換機、綁定策略等。 添加虛擬主機
3.添加隊列
不同的消息隊列保存不同類型的消息,如支付消息、秒殺消息、數(shù)據(jù)同步消息等。 添加隊列,需要填寫虛擬主機、類型、名稱、持久化、自動刪除和參數(shù)等。
4.添加交換機
生產(chǎn)者將消息發(fā)送到交換機Exchange,再由交換機路由到一個或多個隊列中; 交換器的類型有fanout、direct、topic、headers這四種,下篇文章將詳細介紹。 添加交換機
RabbitMQ的五種消息模型
RabbitMQ提供了多種消息模型,官網(wǎng)上第6種是RPC不屬于常規(guī)的消息隊列。 屬于消息模型的是前5種:
簡單的一對一模型工作隊列模型 ,一個生產(chǎn)者將消息分發(fā)給多個消費者發(fā)布/訂閱模型 ,生產(chǎn)者發(fā)布消息,多個消費者同時收取路由模型 ,生產(chǎn)者通過關(guān)鍵字發(fā)送消息給特定消費者主題模型 ,路由模式基礎(chǔ)上,在關(guān)鍵字里加入了通配符
1.一對一模型
最基本的隊列模型: 一個生產(chǎn)者發(fā)送消息到一個隊列,一個消費者從隊列中取消息。
2.操作步驟
1)啟動Rabbitmq,在管理頁面中創(chuàng)建用戶admin 2)使用admin登錄,然后創(chuàng)建虛擬主機myhost
3.案例代碼
導(dǎo)入依賴
開發(fā)工具類
public class MQUtils {
public static final String QUEUE_NAME = "myqueue01";
public static final String QUEUE_NAME2 = "myqueue02";
public static final String EXCHANGE_NAME = "myexchange01";
public static final String EXCHANGE_NAME2 = "myexchange02";
public static final String EXCHANGE_NAME3 = "myexchange03";
/**
* 獲得MQ的連接
* @return
* @throws IOException
*/
public static Connection getConnection() throws IOException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置服務(wù)器名、端口、虛擬主機名、登錄賬號和密碼
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("myhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}
}
開發(fā)生產(chǎn)者
/**
* 生產(chǎn)者,發(fā)送簡單的消息到隊列中
*/
public class SimpleProducer {
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//定義隊列
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
String msg = "Hello World!";
//發(fā)布消息到隊列
channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());
channel.close();
connection.close();
}
}
運行生產(chǎn)者代碼,管理頁面點進myqueue01,在GetMessages中可以看到消息
開發(fā)消費者
/**
* 消費者,從隊列中讀取簡單的消息
*/
public class SimpleConsumer {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//定義隊列
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//創(chuàng)建消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費者消費通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
//讀取消息
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}
}
}
工作隊列模型
工作隊列,生產(chǎn)者將消息分發(fā)給多個消費者,如果生產(chǎn)者生產(chǎn)了100條消息,消費者1消費50條,消費者2消費50條。
1 案例代碼
開發(fā)生產(chǎn)者
/**
多對多模式的生產(chǎn)者,會發(fā)送多條消息到隊列中
*/
public class WorkProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
for(int i = 0;i < 100;i++){
String msg = "Hello-->" + i;
channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());
System.out.println("send:" + msg);
Thread.sleep(10);
}
channel.close();
connection.close();
}
}
開發(fā)消費者1
/**
* 多對多模式的消費者1
*/
public class WorkConsumer01 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費者消費通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
Thread.sleep(10);
}
}
}
開發(fā)消費者2
/**
* 多對多模式的消費者2
*/
public class WorkConsumer02 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//消費者消費通道中的消息
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
Thread.sleep(1000);
}
}
}
生產(chǎn)者發(fā)送100個消息,兩個消費者分別讀取了50條。 看消息內(nèi)容,發(fā)現(xiàn)隊列發(fā)送消息采用的是輪詢方式,也就是先發(fā)給消費者1,再發(fā)給消費者2,依次往復(fù)。
2 能者多勞
上面案例中有一個問題:消費者處理消息的速度是不一樣的,消費者1處理后睡眠10毫秒(Thread.sleep(10)),消費者2是1000毫秒,速度相差100倍,但是最后處理的消息數(shù)還是一樣的。這樣就存在效率問題:處理能力強的消費者得不到更多的消息。
因為隊列默認(rèn)采用是自動確認(rèn)機制,消息發(fā)過去后就自動確認(rèn),隊列不清楚每個消息具體什么時間處理完,所以平均分配消息數(shù)量。
實現(xiàn)能者多勞:
channel.basicQos(1);限制隊列一次發(fā)一個消息給消費者,等消費者有了反饋,再發(fā)下一條channel.basicAck 消費完消息后手動反饋,處理快的消費者就能處理更多消息basicConsume 中的參數(shù)改為false
/**
多對多模式的消費者1
*/
public class WorkConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//同一時刻服務(wù)器只發(fā)送一條消息給消費者
channel.basicQos(1);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//true是自動返回完成狀態(tài),false表示手動
channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
Thread.sleep(10);
//手動確定返回狀態(tài),不寫就是自動確認(rèn)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
/**
* 多對多模式的消費者2
*/
public class WorkConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//同一時刻服務(wù)器只發(fā)送一條消息給消費者
channel.basicQos(1);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//true是自動返回完成狀態(tài),false表示手動
channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
Thread.sleep(1000);
//手動確定返回狀態(tài),不寫就是自動確認(rèn)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
3 發(fā)布/訂閱模型
發(fā)布/訂閱模式和Work模式的區(qū)別是:Work模式只存在一個隊列,多個消費者共同消費一個隊列中的消息;而發(fā)布訂閱模式存在多個隊列,不同的消費者可以從各自的隊列中處理完全相同的消息。
1. 操作步驟
實現(xiàn)步驟:
創(chuàng)建交換機(Exchange)類型是fanout(扇出)交換機需要綁定不同的隊列不同的消費者從不同的隊列中獲得消息生產(chǎn)者發(fā)送消息到交換機再由交換機將消息分發(fā)到多個隊列
新建隊列
新建交換機
點擊交換機,在bindings里面綁定兩個隊列
2.案例代碼
生產(chǎn)者
/**
* 發(fā)布和訂閱模式的生產(chǎn)者,消息會通過交換機發(fā)到隊列
*/
public class PublishProductor {
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明fanout exchange
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");
String msg = "Hello Fanout";
//發(fā)布消息到交換機
channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費者1
/**
* 發(fā)布訂閱模式的消費者1
* 兩個消費者綁定的消息隊列不同
* 通過交換機一個消息能被不同隊列的兩個消費者同時獲取
* 一個隊列可以有多個消費者,隊列中的消息只能被一個消費者獲取
*/
public class SubscribeConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
//綁定隊列1到交換機
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("Consumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費者2
public class SubscribeConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
//綁定隊列2到交換機
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("Consumer2 receive :" + new String(delivery.getBody()));
}
}
}
路由模型
路由模式的消息隊列可以給隊列綁定不同的key,生產(chǎn)者發(fā)送消息時,給消息設(shè)置不同的key,這樣交換機在分發(fā)消息時,可以讓消息路由到key匹配的隊列中。 可以想象上圖是一個日志處理系統(tǒng),C1可以處理error日志消息,C2可以處理info\error\warining類型的日志消息,使用路由模式就很容易實現(xiàn)了。
1 操作步驟
新建direct類型的交換機
2.案例代碼
生產(chǎn)者,給myqueue01綁定了key:error,myqueue02綁定了key:debug,然后發(fā)送了key:error的消息
/**
路由模式的生產(chǎn)者,發(fā)布消息會有特定的Key,消息會被綁定特定Key的消費者獲取
*/
public class RouteProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機類型為direct
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");
String msg = "Hello-->Route";
//綁定隊列1到交換機,指定了Key為error
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");
//綁定隊列2到交換機,指定了Key為debug
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");
//error是一個指定的Key
channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費者1
/**
* 路由模式的消費者1
* 可以指定Key,消費特定的消息
*/
public class RouteConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費者2
/**
* 路由模式的消費者2
* 可以指定Key,消費特定的消息
*/
public class RouteConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));
}
}
}
主題模型
主題模式和路由模式差不多,在key中可以加入通配符:
* 匹配任意一個單詞 com.* ----> com.hopu com.blb com.baidu# 匹配.號隔開的0個或多個單詞 com.# —> com.hopu.net com.hopu com.163.xxx.xxx.xxx
1 案例代碼
生產(chǎn)者代碼
/**
主題模式的生產(chǎn)者
*/
public class TopicProductor {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機類型為topic
channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");
//綁定隊列到交換機,最后指定了Key
channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");
//綁定隊列到交換機,最后指定了Key
channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");
String msg = "Hello-->Topic";
channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());
System.out.println("send:" + msg);
channel.close();
connection.close();
}
}
消費者1
/**
* 主題模式的消費者1 ,類似路由模式,可以使用通配符對Key進行篩選
* #匹配1個或多個單詞,*匹配一個單詞
*/
public class TopicConsumer1 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));
}
}
}
消費者2
/**
* 主題模式的消費者2
*/
public class TopicConsumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));
}
}
}
SpringBoot整合RabbitMQ
創(chuàng)建兩個SpringBoot項目,一個作為生產(chǎn)者,一個作為消費者
生產(chǎn)者會發(fā)送兩種消息:保存課程(更新和添加),刪除課程
消費者監(jiān)聽兩個隊列:保存課程隊列和刪除課程隊列
2)給生產(chǎn)者和消費者服務(wù)添加依賴
3) 給生產(chǎn)者和消費者服務(wù)添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: myhost
4)生產(chǎn)者的配置,用于生成消息隊列和交換機
/**
* RabbitMQ的配置
*/
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "edu.course.exchange";
@Bean
public Queue queueCourseSave() {
return new Queue(QUEUE_COURSE_SAVE);
}
@Bean
public Queue queueCourseRemove() {
return new Queue(QUEUE_COURSE_REMOVE);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(COURSE_EXCHANGE);
}
@Bean
public Binding bindCourseSave() {
return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
}
@Bean
public Binding bindCourseRemove() {
return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
}
}
5) 生產(chǎn)者發(fā)送消息的核心代碼
@Autowired
RabbitTemplate rabbitTemplate;
//發(fā)消息的代碼
rabbitTemplate.convertAndSend(交換機的名稱,消息的key,消息內(nèi)容);
6)消費者添加監(jiān)聽器
@Slf4j
@Component
public class CourseMQListener {
public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "course.exchange";
/**
* 監(jiān)聽課程添加操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_SAVE)})
public void receiveCourseSaveMessage(String message) {
try {
log.info("課程添加:{}",message);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 監(jiān)聽課程刪除操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_REMOVE)})
public void receiveCourseDeleteMessage(Long id) {
try {
log.info("課程刪除完成:{}",id);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
CourseSaveMessage(String message) { try { log.info(“課程添加:{}”,message); } catch (Exception ex) { ex.printStackTrace(); } }
/**
* 監(jiān)聽課程刪除操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_REMOVE)})
public void receiveCourseDeleteMessage(Long id) {
try {
log.info("課程刪除完成:{}",id);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
柚子快報激活碼778899分享:分布式 RabbitMQ
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。