欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記

柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

RabbitMQ

RabbitMQ:高性能的異步通訊組件

1. 初識 MQ

1.1 同步操作

本次操作必須知道上次操作的結(jié)果缺點

拓展性差性能下降級聯(lián)失敗 優(yōu)點:

即時性高,即刻知道結(jié)果

1.2 異步操作

每次的操作相互獨立,互不影響三種角色

消息發(fā)送者:生產(chǎn)者消息代理(broker)消息接收:消費者 優(yōu)點

生產(chǎn)者只需生產(chǎn)一次消費者監(jiān)聽消息代理,便于拓展,解除耦合無需等待,性能變好故障隔離緩存消息,削峰填谷 缺點

及時性差,不能立即得到結(jié)果不確定下游業(yè)務(wù)是否執(zhí)行成功業(yè)務(wù)安全依賴于 broker 的可靠性

1.3 MQ

MQ:Message Queue,即消息隊列(先進先出),異步調(diào)用中的 broker

2. Rabbit MQ

2.1 安裝

Rabbit MQ 官網(wǎng):https://www.rabbitmq.com/

Docker 安裝

docker run \

-e RABBITMQ_DEFAULT_USER=itheima \

-e RABBITMQ_DEFAULT_PASS=123456 \

-v mq-plugins:/plugins \

--name mq \

--hostname mq \

-p 15672:15672 \

-p 5672:5672 \

--network hmall \

-d \

rabbitmq:3.8-management

# 15672端口:圖形化界面

# 5672端口:sh

docker run -e RABBITMQ_DEFAULT_USER=itheima -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 --network hmall -d rabbitmq:3.8-management

Publisher:生產(chǎn)者RabbitMQ Server Broker:RabbitMQ 消息代理VirtualHost:虛擬主機,起數(shù)據(jù)隔離作用,每個vh中的exchange和queue相互獨立。exchange:交換機,消息必須由exchange分發(fā)給不同的queuequeue:消息隊列,接收exchange發(fā)送的消息consumer:消費者,監(jiān)聽queue

2.2 快速入門

AMQP:Advance Message Queuing Protocol(高級消息隊列協(xié)議),與語言平臺無關(guān),即可以跨語言。

Spring AMQP:spring 基于 AMQP 協(xié)議的基礎(chǔ)上用 java 重新封裝的 api。

1. 創(chuàng)建聚合項目

# mq_demo pom文件

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

mq_demo

pom

1.0-SNAPSHOT

consumer

publisher

pom

spring-boot-starter-parent

org.springframework.boot

2.7.11

8

8

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-test

# publisher pom文件

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

publisher

mqDemo

com.example

0.0.1-SNAPSHOT

8

8

1. 父項目引入

這樣每個微服務(wù)都可以使用

org.springframework.boot

spring-boot-starter-amqp

2. 每個微服務(wù)配置MQ服務(wù)器信息

logging:

pattern:

dateformat: MM-dd HH:mm:ss:SSS

spring:

rabbitmq:

host: 192.168.138.10 # 地址

port: 5672 # 端口

virtual-host: /hall # 虛擬主機

username: hmall # 用戶名

password: 123456 # 密碼

注意:

上面的配置信息必須一一對應(yīng),需要與15672端口查看消費者與生產(chǎn)者都必須配置

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

void testSendMessage2Queue() {

// 隊列名

String queueName = "simple.queue";

// 消息

String msg = "hello, AMQP";

// 發(fā)送消息

rabbitTemplate.convertAndSend(queueName,msg);

}

3. 接收消息

SpringAMQP 提供聲明式的消息監(jiān)聽,需要通過注解在方法上聲明要監(jiān)聽的隊列名稱,將來Spring AMQP 就會把消息傳遞給當(dāng)前方法。

@Component

@Slf4j

public class MqListener {

@RabbitListener(queues = "simple")

// msg 類型和上面?zhèn)魉偷念愋鸵恢?/p>

public void ListenerSimpleQueue(String msg) {

System.out.println("消費者監(jiān)聽到了 simple 的消息:【" + msg + "】");

}

}

注意:

啟動的是consumer 的啟動類,不是測試類consumer 也得配置 MQ 服務(wù)器信息

3. Work Queues

任務(wù)模型:讓多個消費者綁定一個消息隊列,共同消費隊列中的信息(每條信息只會被其中之一的消費者消費)

創(chuàng)建 work.queue 消息隊列 生產(chǎn)者一秒鐘產(chǎn)生 50 個消息 c1 一秒消費一條 c2 兩秒消費一條

3.1. 初始化

1. 創(chuàng)建 work.queue 消息隊列

2. 生產(chǎn)者生產(chǎn)消息

@Test

void testWorkQueue() throws InterruptedException {

String queueName = "work.queue";

for (int i = 1; i <= 50; i++) {

String msg = "hello, worker, message_" + i;

rabbitTemplate.convertAndSend(queueName,msg);

Thread.sleep(20);

}

}

3. 消費者消費

@RabbitListener(queues = "work.queue")

public void ListenerWorkQueue1(String msg) {

System.out.println("消費者 1 監(jiān)聽到了 work 的消息:【" + msg + "】");

}

@RabbitListener(queues = "work.queue")

public void ListenerWorkQueue(String msg) {

System.err.println("消費者 2 監(jiān)聽到了 work 的消息.........:【" + msg + "】");

}

觀察到:

消費者 1 全是奇數(shù),消費者 2 全是偶數(shù)信息只會被消費一次消費者 1 和消費者 2 所有消費的信息之和 = 生產(chǎn)所有的信息

修改代碼:

假設(shè)消費者 1 的性能好,消費者 2 的性能相對弱一點

@RabbitListener(queues = "work.queue")

public void ListenerWorkQueue1(String msg) throws InterruptedException {

System.out.println("消費者 1 監(jiān)聽到了 work 的消息:【" + msg + "】");

Thread.sleep(20);

}

@RabbitListener(queues = "work.queue")

public void ListenerWorkQueue(String msg) throws InterruptedException {

System.err.println("消費者 2 監(jiān)聽到了 work 的消息.........:【" + msg + "】");

Thread.sleep(200);

}

觀察到:即使是性能不同

消費者1 和 消費者2 都是對半的消息數(shù)量同時消費者 1 消費奇數(shù),消費者 2 消費偶數(shù)消費者1 消費過了,消費者2 不會消費

4. 思考

輪詢的結(jié)果是一人投一個,如果想讓性能好的機器多消費一點,性能差的機器消費少一點怎么辦?

3.2. 消費者消息推送限制

默認(rèn)情況下,RabbitMQ會將消息依次輪詢投遞給綁定在隊列上的每一個消費者。但并沒有考慮到消費者是否已經(jīng)處理完消息,可能出現(xiàn)消息堆積。

因此我們需要修改 application.yml ,設(shè)置 preFectch 值為1,確保同一時刻最多投遞給消費者 1 條消息

spring:

rabbitmq:

listener:

simple:

prefetch: 1 # 每次只能取 1 條信息,信息處理完成才能獲取下一條(消費者端開啟)

觀察到:

序號變成順序 性能好的多接收消息,性能差的處理的消息少 也就是處理完了一條消息,下一條消息才發(fā)放出來

意義

當(dāng)產(chǎn)生的信息在隊列中遠(yuǎn)遠(yuǎn)超過單個消費者消費的能力,這時候出現(xiàn)消費堆積

處理消費堆積的方法之一:增加消費者,同時消費者的消費能力有大小,所以根據(jù)消費者的性能來消費消息意義重大。

3.3. 總結(jié)

Work 模型的使用:

多個消費者綁定一個隊列,可以加快消息處理速度同一條消息只會被一個消費者處理通過設(shè)置 prefetch 來控制消費者預(yù)取的消息數(shù)量,處理完一條在處理下一條,實現(xiàn)能者多勞

4. 交換機 exchange

真正生產(chǎn)環(huán)境都會經(jīng)過 exchange 來發(fā)送信息,而不是直接發(fā)送到隊列,交換機的類型有以下三種:

Fanout:廣播Direct:定向Topic:話題

4.1 Fanout 交換機

Fanout exchange 會將接收到的消息廣播到每一個跟其綁定的 queue ,所以也叫廣播模式。

4.1.1 測試

在可視化頁面中創(chuàng)建,隊列 fanout.queue1 和 fanout.queue2在可視化頁面中創(chuàng)建,交換機 hmall.fanout,將兩個隊列將其綁定在 consumer 服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽 fanout.queue1 和 fanout.queue2在 publisher 服務(wù)中,編寫測試方法,向 hmall.fanout 發(fā)送消息

1. 在可視化頁面中創(chuàng)建,隊列 fanout.queue1 和 fanout.queue2

2. 在可視化頁面中創(chuàng)建,交換機 hmall.fanout,將兩個隊列將其綁定

3. 在 consumer 服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽 fanout.queue1 和 fanout.queue2

@RabbitListener(queues = "fanout.queue1")

public void listenFanoutQueue1(String msg) {

System.out.println("消費者 1 監(jiān)聽到了 fanout.queue1 的消息:【" + msg + "】");

}

@RabbitListener(queues = "fanout.queue2")

public void listenFanoutQueue2(String msg) {

System.out.println("消費者 2 監(jiān)聽到了 fanout.queue2 的消息:【" + msg + "】");

}

4. 在 publisher 服務(wù)中,編寫測試方法,向 hmall.fanout 發(fā)送消息

@Test

void testFanoutExchange() {

String exchangeName = "hmall.fanout";

String msg = "hello, everyone";

// routingKey 暫未設(shè)置,可以為 null 或 ""

rabbitTemplate.convertAndSend(exchangeName,"",msg);

}

4.1.2 總結(jié)

交換機的作用是什么?

接收 publisher 生產(chǎn)的消息將消息按照規(guī)則路由到與之綁定的隊列FanoutExchange的會將消息路由到每個綁定的隊列

4.2 Direct 交換機

Direct exchange 會將接收到的消息根據(jù)規(guī)則路由到指定的 Queue,因此成為定向路由

每個 Queue 都與 Exchange 設(shè)置一個 BindingKey發(fā)布者發(fā)布消息時,指定消息的 RoutingKeyExchange 將消息路由到 BindingKey 和 RoutingKey 一致的隊列

4.2.1 案例

在可視化頁面創(chuàng)建,隊列 direct.queue1 和 direct.queue2在可視化頁面創(chuàng)建,交換機 hmall.direct,將兩個隊列與其綁定在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 direct.queue1 和 direct.queue2在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.direct 發(fā)送消息

1. 在可視化頁面創(chuàng)建,隊列 direct.queue1 和 direct.queue2

**2. 在可視化頁面創(chuàng)建,交換機 hmall.direct,將兩個隊列與其綁定 **

綁定:

一次 RoutingKey 只能寫一個兩個的寫兩次

3. 在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 direct.queue1 和 direct.queue2

@RabbitListener(queues = "direct.queue1")

public void listenDirectQueue1(String msg) {

System.out.println("消費者 1 接收到 direct.queue1 的消息:【" + msg + "】");

}

@RabbitListener(queues = "direct.queue2")

public void listenDirectQueue2(String msg) {

System.out.println("消費者 2 接收到 direct.queue2 的消息:【" + msg + "】");

}

4. 在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.direct 發(fā)送消息

@Test

void testDirectExchange() {

String exchangeName = "hmall.direct";

String red_msg = "紅色消息";

String yellow_msg = "黃色消息";

String blue_msg = "藍(lán)色消息";

rabbitTemplate.convertAndSend(exchangeName,"red",red_msg);

rabbitTemplate.convertAndSend(exchangeName,"yellow",yellow_msg);

rabbitTemplate.convertAndSend(exchangeName,"blue",blue_msg);

}

4.2.2 總結(jié)

描述下 Direct 交換機和 Fanout 交換機的差異?

Fanout 交換機是廣播 發(fā)送到每一個與之綁定的隊列Direct 交換機是根據(jù) RoutingKey 判斷發(fā)送給哪個隊列如果多個隊列具有相同的 RoutingKey,則與 Fanout 功能類似

4.3 Topic 交換機

Topic Exchange 與 Direct Exchange 類似,區(qū)別在于 RoutingKey 可以是多個單詞的列表,并且以 . 分割

Queue 和 Exchange 指定的 BindingKey 時可以使用通配符:

#:代指 0 或 多個單詞*****:代指一個單詞

4.3.1 案例

在可視化頁面創(chuàng)建,隊列 topic.queue1 和 topic.queue2在可視化頁面創(chuàng)建,交換機 hmall.topic,將兩個隊列與其綁定在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 topic.queue1 和 topic.queue2在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.topic發(fā)送消息

1. 在可視化頁面創(chuàng)建,隊列 topic.queue1 和 topic.queue2

2. 在可視化頁面創(chuàng)建,交換機 hmall.topic,將兩個隊列與其綁定

3. 在 consumer 服務(wù)中編寫,兩個消費者方法,分別監(jiān)聽 topic.queue1 和 topic.queue2

@RabbitListener(queues = "topic.queue1")

public void listenTopicQueue1(String msg) {

System.out.println("消費者 1 接收到 topic.queue1 的消息(china.#):【" + msg +"】");

}

@RabbitListener(queues = "topic.queue2")

public void listenTopicQueue2(String msg) {

System.out.println("消費者 2 接收到 topic.queue2 的消息(#.news):【" + msg +"】");

}

4. 在 publisher 服務(wù)中編寫,測試方法,利用不同的 RoutingKey 向 hmall.topic發(fā)送消息

@Test

void testTopicExchange() {

String exchangeName = "hmall.topic";

rabbitTemplate.convertAndSend(exchangeName,"china.weather","china.weather");

rabbitTemplate.convertAndSend(exchangeName,"china.news","china.news");

rabbitTemplate.convertAndSend(exchangeName,"japan.news","japan.news");

}

4.3.2 總結(jié)

描述下 Direct 交換機和 Topic 交換機的差異?

Topic 交換機接收的消息 RoutingKey 可以是多個單詞,以 . 分割Direct 交換機接收的消息 RoutingKey 是定死的Topic 交換機與隊列綁定時的 bindingKey 可以指定通配符#:代指 0 或 多個單詞*:代指一個單詞

5. java 聲明隊列和交換機

5.1 基于 bean 聲明

SpringAMQP 提供了幾個類,用來聲明隊列、交換機及其綁定關(guān)系:

Queue:用于聲明隊列,可以用工廠類 QueueBuilder 構(gòu)建Exchange:用于聲明交換機,可以用工廠類 ExchangeBuilder 構(gòu)建Binding:用于聲明隊列和交換機的綁定關(guān)系,可以用工廠類 BindingBuilder 構(gòu)建

Consumer 創(chuàng)建 config/FanoutConfiguration

// Consumer/config/FanoutConfiguration

@Configuration

public class FanoutConfiguration {

// 交換機創(chuàng)建

@Bean

public FanoutExchange fanoutExchange() {

// 兩種方式

// ExchangeBuilder.fanoutExchange("hmall.fanout2").build();

return new FanoutExchange("hmall.fanout2");

}

// 隊列創(chuàng)建

@Bean

public Queue fanoutQueue3() {

// 也可以使用 隊列工廠類創(chuàng)建隊列,其中 durable 代表其持久化

// QueueBuilder.durable("fanout2.queue3").build();

return new Queue("fanout2.queue3");

}

@Bean

public Queue fanoutQueue4() {

Queue queue4 = QueueBuilder.durable("fanout2.queue4").build();

return queue4;

}

// 綁定 binding

// 實現(xiàn)隊列和交換機的綁定

// 參數(shù):交換機,隊列

@Bean

public Binding fanoutBinding3(FanoutExchange fanoutExchange,Queue fanoutQueue3) {

// 綁定(bind) 隊列(Queue) 給(to) 交換機(exchange)

return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);

}

@Bean

public Binding fanoutBinding4() {

// 直接使用方法傳參

return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());

}

}

若想實現(xiàn) Direct 交換機創(chuàng)建,代碼參考如下

// Consumer/config/DirectConfiguration

@Configuration

public class DirectConfiguration {

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("hmall.direct2").build();

}

@Bean

public Queue directQueue1() {

return QueueBuilder.durable("direct2.queue1").build();

}

@Bean

public Queue directQueue2() {

return QueueBuilder.durable("direct2.queue2").build();

}

@Bean

public Binding directBinding1Red() {

return BindingBuilder.bind(directQueue1()).to(directExchange()).with("red");

}

@Bean

public Binding directBinding1Blue() {

return BindingBuilder.bind(directQueue1()).to(directExchange()).with("bule");

}

@Bean

public Binding directBinding2Red() {

return BindingBuilder.bind(directQueue2()).to(directExchange()).with("red");

}

@Bean

public Binding directBinding2Yellow() {

return BindingBuilder.bind(directQueue2()).to(directExchange()).with("yellow");

}

}

從以上代碼不難看出,創(chuàng)建 Direct 交換機和隊列代碼繁雜,因此,接下來提出基于注解的聲明。

5.2 基于注解的聲明

SpringAMQP 還提供了基于 @RabbitListener 注解來聲明隊列和交換機的方式

例如:使用 注解 創(chuàng)建上例交換機和隊列

ctrl + p:可用于提示參數(shù)列表

// Consumer/listeners/MqListener

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct3.queue1", durable = "true"),

exchange = @Exchange(name = "hmall.direct3", type = ExchangeTypes.DIRECT),

key = {"red","blue"}

))

public void listenDirectQueue3(String msg) {

System.out.println("消費者 1 收到了來自 " +

"交換機(hmall.direct3)中隊列(direct3.queue1)的消息:【" + msg + "】");

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct3.queue2",durable = "true"),

exchange = @Exchange(name = "hmall.direct3",type = "direct"),

key = {"red","yellow"}

))

public void listenDirectQueue4(String msg) {

System.out.println("消費者 2 收到了來自 " +

"交換機(hmall.direct3)中隊列(direct3.queue2)的消息:【" + msg + "】");

}

5.3 總結(jié)

聲明隊列、交換機、綁定關(guān)系的 Bean 是什么?

QueueFanoutExchange、DirectExchange、TopicExchangeBinding

基于 @RabbitListenner 注解創(chuàng)建隊列和交換機有哪些常見注解?

@Queue@Exchange

6. 消息轉(zhuǎn)換器

Spring 對消息對象的處理是基于 JDK 的ObjectOutputStream 完成序列化的。存在以下問題:

JDK 的序列化有安全風(fēng)險JDK 序列化的消息太大JDK 序列化的消息可讀性差

建議采用 JSON 序列化代替默認(rèn)的 JDK 序列化,要做兩件事:

在 publisher 和 consumer 中都要引入 jackson 依賴

com.fasterxml.jackson.core

jackson-databind

在 publisher 和 consumer 中都要配置 MessageConverter @Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

publisher 測試代碼

@Test

void testSendObject() {

Map msg = new HashMap<>(2);

msg.put("name","zhangsan");

msg.put("age",18);

rabbitTemplate.convertAndSend("object.queue",msg);

}

Consumer 接收消息

@RabbitListener(queues = "object.queue")

// 接收類型和傳送類型一致

public void listenObjectQueue(Map msg) {

System.out.println("消費者接收到 object.queue 的消息:【" + msg +"】");

}

柚子快報邀請碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

文章來源

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19187981.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄