欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

Tiki奇趣購綜合2025-05-05470

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

讓你快速了解RabbitMQ

什么是RabbitM

RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)

消息隊(duì)列的概念

哪什么是消息隊(duì)列呢?

消息隊(duì)列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊(duì)列中,通常有生產(chǎn)者和消費(fèi)者兩個(gè)角色。生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù)到消息隊(duì)列,誰從消息隊(duì)列中取出數(shù)據(jù)處理,他不管。消費(fèi)者只負(fù)責(zé)從消息隊(duì)列中取出數(shù)據(jù)處理,他不管這是誰發(fā)送的數(shù)據(jù)。

哪消息隊(duì)列有什么作用呢

主要作用有三點(diǎn):

1.解耦

實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦,生產(chǎn)者和消費(fèi)者不直接調(diào)用,也不用關(guān)心對(duì)方如何處理,代碼的維護(hù)性提高

比如使用openfeign實(shí)現(xiàn)服務(wù)調(diào)用,被調(diào)用服務(wù)的接口發(fā)生修改,服務(wù)調(diào)用方也需要進(jìn)行修改,服務(wù)之間的耦合性就會(huì)較高,那么不利于開發(fā)和維護(hù)

2.異步

同步調(diào)用,服務(wù)A調(diào)用服務(wù)B,必須等待服務(wù)B執(zhí)行完業(yè)務(wù),服務(wù)A才能執(zhí)行其它業(yè)務(wù)

異步調(diào)用,服務(wù)A發(fā)送消息給消息隊(duì)列,馬上返回完成其它業(yè)務(wù),不用等待服務(wù)B執(zhí)行完

3.削峰

這其實(shí)是MQ一個(gè)很重要的應(yīng)用,可以通過控制消息隊(duì)列的長度來限制請(qǐng)求流量,從而達(dá)到限流保護(hù)服務(wù)器的作用

假如說系統(tǒng)A在某個(gè)時(shí)間段請(qǐng)求大量增加,有上萬條數(shù)據(jù)發(fā)送過來,系統(tǒng)A就會(huì)把發(fā)送過來的數(shù)據(jù)直接發(fā)到SQL中,mysl數(shù)據(jù)庫里執(zhí)行,如果處理不過來就會(huì)直接導(dǎo)致系統(tǒng)癱瘓,使用MQ,系統(tǒng)A不再是直接發(fā)送SQL到數(shù)據(jù)庫,而是把數(shù)據(jù)發(fā)送到MQ,MQ短時(shí)間積壓數(shù)據(jù)是可以接受的,然后由消費(fèi)者每次拉取2000條進(jìn)行處理,防止在請(qǐng)求峰值時(shí)期大量的請(qǐng)求直接發(fā)送到MySQL導(dǎo)致系統(tǒng)崩潰。

同時(shí)消息隊(duì)列也有它的缺點(diǎn)

1.系統(tǒng)的復(fù)雜性提高了

2.系統(tǒng)的可用性降低了

消息隊(duì)列的概念

生產(chǎn)者 向消息隊(duì)列發(fā)送消息的服務(wù) 消費(fèi)者 從消息隊(duì)列取消息的服務(wù) 隊(duì)列 queue 存放消息的容器,采用FIFO數(shù)據(jù)結(jié)構(gòu) 交換機(jī) exchange 實(shí)現(xiàn)消息路由,將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中 消息服務(wù)器 Broker 進(jìn)行消息通信的軟件平臺(tái)服務(wù)器 虛擬主機(jī) virtual host 類似于namespace,將不同用戶的交換機(jī)和隊(duì)列區(qū)分開來 連接 connection 網(wǎng)絡(luò)連接 通道 channel 數(shù)據(jù)通信的通道

RabbitMQ的基本使用

1.添加用戶

不同的系統(tǒng)可以使用各自的用戶登錄RabbitMQ,可以在Admin的User頁面添加新用戶

2.添加虛擬主機(jī)

虛擬主機(jī)相當(dāng)于一個(gè)獨(dú)立的MQ服務(wù),有自身的隊(duì)列、交換機(jī)、綁定策略等。 添加虛擬主機(jī)

3.添加隊(duì)列

不同的消息隊(duì)列保存不同類型的消息,如支付消息、秒殺消息、數(shù)據(jù)同步消息等。 添加隊(duì)列,需要填寫虛擬主機(jī)、類型、名稱、持久化、自動(dòng)刪除和參數(shù)等。

4.添加交換機(jī)

生產(chǎn)者將消息發(fā)送到交換機(jī)Exchange,再由交換機(jī)路由到一個(gè)或多個(gè)隊(duì)列中; 交換器的類型有fanout、direct、topic、headers這四種,下篇文章將詳細(xì)介紹。 添加交換機(jī)

RabbitMQ的五種消息模型

RabbitMQ提供了多種消息模型,官網(wǎng)上第6種是RPC不屬于常規(guī)的消息隊(duì)列。 屬于消息模型的是前5種:

簡單的一對(duì)一模型工作隊(duì)列模型 ,一個(gè)生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者發(fā)布/訂閱模型 ,生產(chǎn)者發(fā)布消息,多個(gè)消費(fèi)者同時(shí)收取路由模型 ,生產(chǎn)者通過關(guān)鍵字發(fā)送消息給特定消費(fèi)者主題模型 ,路由模式基礎(chǔ)上,在關(guān)鍵字里加入了通配符

1.一對(duì)一模型

最基本的隊(duì)列模型: 一個(gè)生產(chǎn)者發(fā)送消息到一個(gè)隊(duì)列,一個(gè)消費(fèi)者從隊(duì)列中取消息。

2.操作步驟

1)啟動(dòng)Rabbitmq,在管理頁面中創(chuàng)建用戶admin 2)使用admin登錄,然后創(chuàng)建虛擬主機(jī)myhost

3.案例代碼

導(dǎo)入依賴

com.rabbitmq

amqp-client

3.4.1

開發(fā)工具類

public class MQUtils {

public static final String QUEUE_NAME = "myqueue01";

public static final String QUEUE_NAME2 = "myqueue02";

public static final String EXCHANGE_NAME = "myexchange01";

public static final String EXCHANGE_NAME2 = "myexchange02";

public static final String EXCHANGE_NAME3 = "myexchange03";

/**

* 獲得MQ的連接

* @return

* @throws IOException

*/

public static Connection getConnection() throws IOException {

ConnectionFactory connectionFactory = new ConnectionFactory();

//配置服務(wù)器名、端口、虛擬主機(jī)名、登錄賬號(hào)和密碼

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("myhost");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("123456");

return connectionFactory.newConnection();

}

}

開發(fā)生產(chǎn)者

/**

* 生產(chǎn)者,發(fā)送簡單的消息到隊(duì)列中

*/

public class SimpleProducer {

public static void main(String[] args) throws IOException {

Connection connection = MQUtils.getConnection();

//創(chuàng)建通道

Channel channel = connection.createChannel();

//定義隊(duì)列

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

String msg = "Hello World!";

//發(fā)布消息到隊(duì)列

channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());

channel.close();

connection.close();

}

}

運(yùn)行生產(chǎn)者代碼,管理頁面點(diǎn)進(jìn)myqueue01,在GetMessages中可以看到消息

開發(fā)消費(fèi)者

/**

* 消費(fèi)者,從隊(duì)列中讀取簡單的消息

*/

public class SimpleConsumer {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//定義隊(duì)列

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//創(chuàng)建消費(fèi)者

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

//讀取消息

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println(new String(delivery.getBody()));

}

}

}

工作隊(duì)列模型

工作隊(duì)列,生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者,如果生產(chǎn)者生產(chǎn)了100條消息,消費(fèi)者1消費(fèi)50條,消費(fèi)者2消費(fèi)50條。

1 案例代碼

開發(fā)生產(chǎn)者

/**

多對(duì)多模式的生產(chǎn)者,會(huì)發(fā)送多條消息到隊(duì)列中

*/

public class WorkProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

for(int i = 0;i < 100;i++){

String msg = "Hello-->" + i;

channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());

System.out.println("send:" + msg);

Thread.sleep(10);

}

channel.close();

connection.close();

}

}

開發(fā)消費(fèi)者1

/**

* 多對(duì)多模式的消費(fèi)者1

*/

public class WorkConsumer01 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));

Thread.sleep(10);

}

}

}

開發(fā)消費(fèi)者2

/**

* 多對(duì)多模式的消費(fèi)者2

*/

public class WorkConsumer02 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));

Thread.sleep(1000);

}

}

}

生產(chǎn)者發(fā)送100個(gè)消息,兩個(gè)消費(fèi)者分別讀取了50條。 看消息內(nèi)容,發(fā)現(xiàn)隊(duì)列發(fā)送消息采用的是輪詢方式,也就是先發(fā)給消費(fèi)者1,再發(fā)給消費(fèi)者2,依次往復(fù)。

2 能者多勞

上面案例中有一個(gè)問題:消費(fèi)者處理消息的速度是不一樣的,消費(fèi)者1處理后睡眠10毫秒(Thread.sleep(10)),消費(fèi)者2是1000毫秒,速度相差100倍,但是最后處理的消息數(shù)還是一樣的。這樣就存在效率問題:處理能力強(qiáng)的消費(fèi)者得不到更多的消息。

因?yàn)殛?duì)列默認(rèn)采用是自動(dòng)確認(rèn)機(jī)制,消息發(fā)過去后就自動(dòng)確認(rèn),隊(duì)列不清楚每個(gè)消息具體什么時(shí)間處理完,所以平均分配消息數(shù)量。

實(shí)現(xiàn)能者多勞:

channel.basicQos(1);限制隊(duì)列一次發(fā)一個(gè)消息給消費(fèi)者,等消費(fèi)者有了反饋,再發(fā)下一條channel.basicAck 消費(fèi)完消息后手動(dòng)反饋,處理快的消費(fèi)者就能處理更多消息basicConsume 中的參數(shù)改為false

/**

多對(duì)多模式的消費(fèi)者1

*/

public class WorkConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者

channel.basicQos(1);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)

channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));

Thread.sleep(10);

//手動(dòng)確定返回狀態(tài),不寫就是自動(dòng)確認(rèn)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

/**

* 多對(duì)多模式的消費(fèi)者2

*/

public class WorkConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者

channel.basicQos(1);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)

channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));

Thread.sleep(1000);

//手動(dòng)確定返回狀態(tài),不寫就是自動(dòng)確認(rèn)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

3 發(fā)布/訂閱模型

發(fā)布/訂閱模式和Work模式的區(qū)別是:Work模式只存在一個(gè)隊(duì)列,多個(gè)消費(fèi)者共同消費(fèi)一個(gè)隊(duì)列中的消息;而發(fā)布訂閱模式存在多個(gè)隊(duì)列,不同的消費(fèi)者可以從各自的隊(duì)列中處理完全相同的消息。

1. 操作步驟

實(shí)現(xiàn)步驟:

創(chuàng)建交換機(jī)(Exchange)類型是fanout(扇出)交換機(jī)需要綁定不同的隊(duì)列不同的消費(fèi)者從不同的隊(duì)列中獲得消息生產(chǎn)者發(fā)送消息到交換機(jī)再由交換機(jī)將消息分發(fā)到多個(gè)隊(duì)列

新建隊(duì)列

新建交換機(jī)

點(diǎn)擊交換機(jī),在bindings里面綁定兩個(gè)隊(duì)列

2.案例代碼

生產(chǎn)者

/**

* 發(fā)布和訂閱模式的生產(chǎn)者,消息會(huì)通過交換機(jī)發(fā)到隊(duì)列

*/

public class PublishProductor {

public static void main(String[] args) throws IOException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明fanout exchange

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");

String msg = "Hello Fanout";

//發(fā)布消息到交換機(jī)

channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 發(fā)布訂閱模式的消費(fèi)者1

* 兩個(gè)消費(fèi)者綁定的消息隊(duì)列不同

* 通過交換機(jī)一個(gè)消息能被不同隊(duì)列的兩個(gè)消費(fèi)者同時(shí)獲取

* 一個(gè)隊(duì)列可以有多個(gè)消費(fèi)者,隊(duì)列中的消息只能被一個(gè)消費(fèi)者獲取

*/

public class SubscribeConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//綁定隊(duì)列1到交換機(jī)

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("Consumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

public class SubscribeConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

//綁定隊(duì)列2到交換機(jī)

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("Consumer2 receive :" + new String(delivery.getBody()));

}

}

}

路由模型

路由模式的消息隊(duì)列可以給隊(duì)列綁定不同的key,生產(chǎn)者發(fā)送消息時(shí),給消息設(shè)置不同的key,這樣交換機(jī)在分發(fā)消息時(shí),可以讓消息路由到key匹配的隊(duì)列中。 可以想象上圖是一個(gè)日志處理系統(tǒng),C1可以處理error日志消息,C2可以處理info\error\warining類型的日志消息,使用路由模式就很容易實(shí)現(xiàn)了。

1 操作步驟

新建direct類型的交換機(jī)

2.案例代碼

生產(chǎn)者,給myqueue01綁定了key:error,myqueue02綁定了key:debug,然后發(fā)送了key:error的消息

/**

路由模式的生產(chǎn)者,發(fā)布消息會(huì)有特定的Key,消息會(huì)被綁定特定Key的消費(fèi)者獲取

*/

public class RouteProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明交換機(jī)類型為direct

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");

String msg = "Hello-->Route";

//綁定隊(duì)列1到交換機(jī),指定了Key為error

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");

//綁定隊(duì)列2到交換機(jī),指定了Key為debug

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");

//error是一個(gè)指定的Key

channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 路由模式的消費(fèi)者1

* 可以指定Key,消費(fèi)特定的消息

*/

public class RouteConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

/**

* 路由模式的消費(fèi)者2

* 可以指定Key,消費(fèi)特定的消息

*/

public class RouteConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));

}

}

}

主題模型

主題模式和路由模式差不多,在key中可以加入通配符:

* 匹配任意一個(gè)單詞 com.* ----> com.hopu com.blb com.baidu# 匹配.號(hào)隔開的0個(gè)或多個(gè)單詞 com.# —> com.hopu.net com.hopu com.163.xxx.xxx.xxx

1 案例代碼

生產(chǎn)者代碼

/**

主題模式的生產(chǎn)者

*/

public class TopicProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明交換機(jī)類型為topic

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");

//綁定隊(duì)列到交換機(jī),最后指定了Key

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");

//綁定隊(duì)列到交換機(jī),最后指定了Key

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");

String msg = "Hello-->Topic";

channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 主題模式的消費(fèi)者1 ,類似路由模式,可以使用通配符對(duì)Key進(jìn)行篩選

* #匹配1個(gè)或多個(gè)單詞,*匹配一個(gè)單詞

*/

public class TopicConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

/**

* 主題模式的消費(fèi)者2

*/

public class TopicConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));

}

}

}

SpringBoot整合RabbitMQ

創(chuàng)建兩個(gè)SpringBoot項(xiàng)目,一個(gè)作為生產(chǎn)者,一個(gè)作為消費(fèi)者

生產(chǎn)者會(huì)發(fā)送兩種消息:保存課程(更新和添加),刪除課程

消費(fèi)者監(jiān)聽兩個(gè)隊(duì)列:保存課程隊(duì)列和刪除課程隊(duì)列

2)給生產(chǎn)者和消費(fèi)者服務(wù)添加依賴

org.springframework.boot

spring-boot-starter-amqp

3) 給生產(chǎn)者和消費(fèi)者服務(wù)添加配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

virtual-host: myhost

4)生產(chǎn)者的配置,用于生成消息隊(duì)列和交換機(jī)

/**

* RabbitMQ的配置

*/

@Configuration

public class RabbitMQConfig {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "edu.course.exchange";

@Bean

public Queue queueCourseSave() {

return new Queue(QUEUE_COURSE_SAVE);

}

@Bean

public Queue queueCourseRemove() {

return new Queue(QUEUE_COURSE_REMOVE);

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange(COURSE_EXCHANGE);

}

@Bean

public Binding bindCourseSave() {

return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);

}

@Bean

public Binding bindCourseRemove() {

return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);

}

}

5) 生產(chǎn)者發(fā)送消息的核心代碼

@Autowired

RabbitTemplate rabbitTemplate;

//發(fā)消息的代碼

rabbitTemplate.convertAndSend(交換機(jī)的名稱,消息的key,消息內(nèi)容);

6)消費(fèi)者添加監(jiān)聽器

@Slf4j

@Component

public class CourseMQListener {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "course.exchange";

/**

* 監(jiān)聽課程添加操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_SAVE)})

public void receiveCourseSaveMessage(String message) {

try {

log.info("課程添加:{}",message);

} catch (Exception ex) {

ex.printStackTrace();

}

}

/**

* 監(jiān)聽課程刪除操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_REMOVE)})

public void receiveCourseDeleteMessage(Long id) {

try {

log.info("課程刪除完成:{}",id);

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

CourseSaveMessage(String message) { try { log.info(“課程添加:{}”,message); } catch (Exception ex) { ex.printStackTrace(); } }

/**

* 監(jiān)聽課程刪除操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_REMOVE)})

public void receiveCourseDeleteMessage(Long id) {

try {

log.info("課程刪除完成:{}",id);

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

文章來源

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19056276.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄