欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ

柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

一、初識(shí) MQ

1. 同步通訊

時(shí)效性強(qiáng),立即獲取結(jié)果

微服務(wù)間基于 Feign 的調(diào)用就屬于同步

方式,存在一些問題:

① 耦合度高

② 性能和吞吐能力不如異步

③ 額外資源消耗

④ 級(jí)聯(lián)失敗問題

2. 異步通訊

異步調(diào)用常見實(shí)現(xiàn)就是事件驅(qū)動(dòng)模式

優(yōu)點(diǎn):

① 服務(wù)解耦

② 性能提升,吞吐量提高

③ 服務(wù)沒有強(qiáng)依賴,不擔(dān)心級(jí)聯(lián)問題

④ 流量削峰

缺點(diǎn)

① 依賴 Broker(事件代理者) 的可靠性、

? ? 安全性、吞吐能力

② 架構(gòu)復(fù)雜的情況下,業(yè)務(wù)沒有明顯

? ? 的流程線,不好追蹤管理

3. MQ 常見框架

MQ (MessageQueue),中文是消息隊(duì)列,

字面來看就是存放消息的隊(duì)列,也就是事

件驅(qū)動(dòng)架構(gòu)中的 Broker

二、RabbitMQ 快速入門

1. RabbitMQ 概述

RabbitMQ 是基于 Erlang 語言開發(fā)的開源

消息通信中間件

官網(wǎng)地址:https://www.rabbitmq.com/

channel(通道):操作 MQ 的工具

exchange(交換機(jī)):路由消息到隊(duì)列中

queue(隊(duì)列):緩存消息

virtual host:虛擬主機(jī),是對 queue、

? ? ? ? ? ? ?exchange 等資源的邏輯分組

2. 常見消息模型

(1) 基本消息隊(duì)列(BasicQueue)

?1) 官方的 HelloWorld 是基于最基礎(chǔ)的

? ? 消息隊(duì)列模型來實(shí)現(xiàn)的,只包括三個(gè)

? ? 角色:

publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列

queue queue:消息隊(duì)列,負(fù)責(zé)接受并緩存消息

consumer:訂閱隊(duì)列,處理隊(duì)列中的消息

? 2) 基本消息隊(duì)列的消息發(fā)送流程:

① 建立 connection

② 創(chuàng)建 channel

③ 利用 channel 聲明隊(duì)列

④ 利用 channel 向隊(duì)列發(fā)送消息

? 3) 基本消息隊(duì)列的消息接收流程:

① 建立 connection

② 創(chuàng)建 channel

③ 利用 channel 聲明隊(duì)列

④ 定義 consumer 的消費(fèi)行為

? ??handleDelivery()

⑤ 利用 channel 將消費(fèi)者與隊(duì)列

? ? ?綁定?

(2) 工作消息隊(duì)列(WorkQueue)

可以提高消息處理速度,避免隊(duì)列消

息堆積?

(3) 發(fā)布訂閱(Publish、Subscribe)

允許將同一消息發(fā)送給多個(gè)消費(fèi)者

實(shí)現(xiàn)方式是加入了 exchange(交換機(jī))

注意:exchange 負(fù)責(zé)消息路由,而

? ? ? 不是存儲(chǔ),路由失敗則消息丟失

根據(jù)交換機(jī)類型不同分為三種:

1) Fanout Exchange:廣播

將接收到的消息廣播到每一個(gè)跟其

綁定的 queue

??

2) Direct Exchange:路由

將接收到的消息根據(jù)規(guī)則路由到指定

的 Queue,因此稱為路由模式(routes)

① 每一個(gè) Queue 都與 Exchange 設(shè)置

? ? 一個(gè) BindingKey

② 發(fā)布者發(fā)送消息時(shí),指定消息的

? ? RoutingKey

③ Exchange 將消息路由到 BindingKey

? ? 與消息 RoutingKey 一致的隊(duì)列

??

3) Topic Exchange:主題

TopicExchange 與 DirectExchange 類

似,區(qū)別在于 routingKey 必須是多個(gè)

單詞的列表,并且以 . 分割

Queue 與 Exchange 指定 BindingKey 時(shí)

可以使用通配符:

#:代指0個(gè)或多個(gè)單詞

*:代指一個(gè)單詞

三、SpringAMQP

SpringAmqp 的官方地址:

https://spring.io/projects/spring-amqp

AMQP,即 Advanced Message Queuing

Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用

層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,基于此協(xié)議的

客戶端與消息中間件可傳遞消息,并不受

客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言

等條件的限制

Spring AMQP 項(xiàng)目將核心 Spring 概念應(yīng)

用于基于 AMQP 的消息傳遞解決方案的

開發(fā)的一套 API 規(guī)范,它提供了一個(gè)模板

作為發(fā)送和接收消息的高級(jí)抽象

其中 spring-amqp 是基礎(chǔ)抽象,

spring-rabbit 是底層的默認(rèn)實(shí)現(xiàn)

1. 利用 SpringAMQP 實(shí)現(xiàn) HelloWorld 中的基

? ? 礎(chǔ)消息隊(duì)列功能

? (1) 在父工程中引入 spring-amqp 的依賴

org.springframework.boot

spring-boot-starter-amqp

? (2) 在 publisher 服務(wù)中利用 RabbitTemplate

? ? ? 發(fā)送消息到 simple.queue 這個(gè)隊(duì)列

? ?① 在 publisher 服務(wù)中編寫 application.yml,

? ? ? ?添加 mq 連接信息:

spring:

rabbitmq:

host: 192.168.150.110 # 主機(jī)名

port: 5672 # 端口

virtual-host: / # 虛擬主機(jī)

username: root

password: 123456

? ②?在 publisher 服務(wù)中新建一個(gè)測試類,

? ? ? ?編寫測試方法:

public class PublisherTest {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

public void testSimpleQueue() {

String queueName = "simple.queue";

String message = "hello, spring amqp";

rabbitTemplate.convertAndSend(queueName, message);

}

}

(3) 在 consumer 服務(wù)中編寫消費(fèi)邏輯,綁定

? ? simple.queue 這個(gè)隊(duì)列?

? ① 在 consumer 服務(wù)中編寫 application.yml,

? ? ? 添加 mq 連接信息:

spring:

rabbitmq:

host: 192.168.150.110 # 主機(jī)名

port: 5672 # 端口

virtual-host: / # 虛擬主機(jī)

username: root

password: 123456

? ② 在 consumer 服務(wù)中新建一個(gè)類,編寫消

? ? ? 費(fèi)邏輯:?

@Component

public class SpringRabbitListener {

@RabbitListener(queues = {"simple.queue"})

public void listenSimpleQueue(String msg) {

System.out.println(msg);

}

}

注意:消息一旦消費(fèi)就會(huì)從隊(duì)列刪除,

? ? ? ? ? RabbitMQ 沒有消息回溯功能

2. 使用 WorkQueue 實(shí)現(xiàn)一個(gè)隊(duì)列綁定多個(gè)

? ? 消費(fèi)者

? (1) 生產(chǎn)者循環(huán)發(fā)送消息到 simple.queue

@Test

public void testSimpleQueue() throws InterruptedException {

String queueName = "simple.queue";

String message = "hello, spring amqp";

for (int i = 0; i < 50; i++) {

rabbitTemplate.convertAndSend(queueName, message + i);

Thread.sleep(20);

}

}

? (2) 編寫兩個(gè)消費(fèi)者,都監(jiān)聽 simple.queue

@Component

public class SpringRabbitListener {

@RabbitListener(queues = {"simple.queue"})

public void listenSimpleQueue1(String msg) throws InterruptedException {

System.out.println("消費(fèi)者1" + "【" + msg + "】" + LocalTime.now());

Thread.sleep(20);

}

@RabbitListener(queues = {"simple.queue"})

public void listenSimpleQueue2(String msg) throws InterruptedException {

System.err.println("消費(fèi)者2" + "【" + msg + "】" + LocalTime.now());

Thread.sleep(200);

}

}

AMQP 有一個(gè)消息預(yù)取機(jī)制,設(shè)置 preFetch

這個(gè)值,可以控制預(yù)取消息的上限:

spring:

rabbitmq:

host: 190.92.246.107 # 主機(jī)名

port: 5672 # 端口

virtual-host: / # 虛擬主機(jī)

username: root

password: 123456

listener:

simple:

prefetch: 1

3.?FanoutExchange 的使用

(1) 在 consumer 服務(wù)中,利用代碼聲明隊(duì)

? ? ?列、交換機(jī),并將兩者綁定

@Configuration

public class FanoutConfig {

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("root.fanout");

}

@Bean

public Queue fanoutQueue1() {

return new Queue("fanout.queue1");

}

@Bean

public Queue fanoutQueue2() {

return new Queue("fanout.queue2");

}

@Bean

public Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);

}

@Bean

public Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);

}

}

(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者

? ? ?方法,分別監(jiān)聽 fanout.queue1 和

? ? ?fanout.queue2

@RabbitListener(queues = {"fanout.queue1"})

public void listenFanoutQueue1(String msg) throws InterruptedException {

System.out.println("fanout.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

@RabbitListener(queues = {"fanout.queue2"})

public void listenFanoutQueue2(String msg) throws InterruptedException {

System.err.println("fanout.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

(3) 在 publisher 中編寫測試方法,向

? ? ?itcast.fanout 發(fā)送消息

@Test

public void testSendFanoutExchange() {

String exchangeName = "root.fanout";

String message = "hello everyone";

rabbitTemplate.convertAndSend(exchangeName, "", message);

}

交換機(jī)的作用:

① 接收 publisher 發(fā)送的消息

② 將消息按照規(guī)則路由到與之綁定的

? ? 隊(duì)列

③ 不能緩存消息,路由失敗,消息丟

? ? 失

④ FanoutExchange 的會(huì)將消息路由

? ? 到每個(gè)綁定的隊(duì)列

聲明隊(duì)列、交換機(jī)、綁定關(guān)系的 Bean:

Queue

FanoutExchange

Binding

4.?DirectExchange 的使用?

(1) 利用 @RabbitListener 聲明 Exchange、

? ? ?Queue、RoutingKey

(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者

? ? ?方法,分別監(jiān)聽 direct.queue1 和

? ? ?direct.queue2

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct.queue1"),

exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),

key = {

"blue",

"red"

}

))

public void listenDirectQueue1(String msg) {

System.err.println("direct.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "direct.queue2"),

exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),

key = {

"yellow",

"red"

}

))

public void listenDirectQueue2(String msg) {

System.err.println("direct.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

(3) 在 publisher 中編寫測試方法,向 itcast.

? ? ?direct發(fā)送消息

@Test

public void testSendDirectExchange() {

String exchangeName = "root.direct";

String message = "hello red";

rabbitTemplate.convertAndSend(exchangeName, "red", message);

}

Direct 交換機(jī)與 Fanout 交換機(jī)的差異:

① Fanout 交換機(jī)將消息路由給每一個(gè)

? ? 與之綁定的隊(duì)列

② Direct 交換機(jī)根據(jù) RoutingKey 判斷

? ? 路由給哪個(gè)隊(duì)列

③ 如果多個(gè)隊(duì)列具有相同的 RoutingKey,

? ? 則與 Fanout 功能類似

5.?TopicExchange 的使用

(1) 利用 @RabbitListene r聲明 Exchange、

? ? ?Queue、RoutingKey

(2) 在 consumer 服務(wù)中,編寫兩個(gè)消費(fèi)者方

? ? 法,分別監(jiān)聽topic.queue1和topic.queue2

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "topic.queue1"),

exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),

key = {

"china.#"

}

))

public void listenTopicQueue1(String msg) {

System.err.println("topic.queue1消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "topic.queue2"),

exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),

key = {

"#.news"

}

))

public void listenTopicQueue2(String msg) {

System.err.println("topic.queue2消費(fèi)者" + "【" + msg + "】" + LocalTime.now());

}

(3) 在 publisher 中編寫測試方法,向 itcast.

? ? ?topic 發(fā)送消息

@Test

public void testSendTopicExchange() {

String exchangeName = "root.topic";

String message = "hello world";

rabbitTemplate.convertAndSend(exchangeName, "china.news", message);

}

6. 消息轉(zhuǎn)換器

在 SpringAMQP 的發(fā)送方法中,接收消息

的類型是 Object,也就是說我們可以發(fā)送

任意對象類型的消息,SpringAMQP 會(huì)幫

我們序列化為字節(jié)后發(fā)送

SpringAMQP 中消息的序列化和反序列化

是利用 MessageConverter 實(shí)現(xiàn)的,默認(rèn)

是 JDK 的序列化,其中發(fā)送方與接收方必

須使用相同的 MessageConverter

推薦用 JSON 方式序列化:

① 先在 publisher 服務(wù)引入依賴

com.fasterxml.jackson.core

jackson-databind

② 在 publisher 服務(wù)聲明 MessageConverter

@Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

③?在 consumer 服務(wù)引入 Jackson 依賴:

com.fasterxml.jackson.core

jackson-databind

④?在 consumer 服務(wù)定義 MessageConverter:

@Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

⑤?定義一個(gè)消費(fèi)者,監(jiān)聽 object.queue 隊(duì)列

? ? 并消費(fèi)消息:

@RabbitListener(queues = "object.queue")

public void listenObjectQueue(Map msg) {

System.out.println("收到消息:【" + msg + "】");

}

柚子快報(bào)邀請碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

參考文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18908394.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄