欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

學(xué)習(xí)視頻:動(dòng)力節(jié)點(diǎn)RabbitMQ教程|12小時(shí)學(xué)會(huì)rabbitmq消息中間件_嗶哩嗶哩_bilibili

一、RabbitMQ 運(yùn)行環(huán)境搭建

RabbitMQ 是使用 Erlang 語言開發(fā)的,所以要先下載安裝 Erlang

下載時(shí)一定要注意版本兼容性:RabbitMQ Erlang 版本要求 — 兔子MQ

二、啟動(dòng)及停止 RabbitMQ

1、啟動(dòng) RabbitMQ

進(jìn)入到安裝目錄的 sbin 目錄下

# -detached 表示在后臺(tái)啟動(dòng)運(yùn)行 rabbitmq, 不加該參數(shù)表示前臺(tái)啟動(dòng)

# rabbitmq 的運(yùn)行日志存放在安裝目錄的 var 目錄下

# 啟動(dòng)

./rabbitmq-server -detached

2、查看 RabbitMQ 狀態(tài)

進(jìn)入到安裝目錄的 sbin 目錄下

# -n rabbit 是指定節(jié)點(diǎn)名稱為 rabbit,目前只有一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)名默認(rèn)為 rabbit

# 此處 -n rabbit 也可以省略

# 查看狀態(tài)

./rabbitmqctl -n rabbit status

3、停止 RabbitMQ

進(jìn)入到安裝目錄的 sbin 目錄下

# 停止

./rabbitmqctl shutdown

4、配置 path 環(huán)境變量

打開配置文件

vim /etc/profile

進(jìn)行配置

RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11

PATH=$PATH:$RABBIT_HOME/sbin

export RABBIT_HOME PATH

刷新環(huán)境變量

source /etc/profile

三、RabbitMQ 管理命令

./rabbitmqctl 是一個(gè)管理命令,可以管理 rabbitmq 的很多操作

./rabbitmqctl help 可以查看有哪些操作

查看具體子命令,可以使用 ./rabbitmqctl help 子命令名稱

1、用戶管理

用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。

這些操作都是通過 rabbitmqct 管理命令來實(shí)現(xiàn)完成

查看幫助:rabbitmqct add_user --help

查看當(dāng)前用戶列表

rabbitmqctl list_users

新增一個(gè)用戶

# 語法:rabbitmqctl add_user Username Password

rabbitmqctl add_user admin 123456

2、設(shè)置用戶角色

設(shè)置用戶角色

# 語法:rabbitmqctl set_user_tags User Tag

# 這里設(shè)置用戶的角色為管理員角色

rabbitmqctl set_user_tags admin administrator

3、設(shè)置用戶權(quán)限

設(shè)置用戶權(quán)限

# 說明:此操作設(shè)置了 admin 用戶擁有操作虛擬主機(jī)/下的所以權(quán)限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

四、web 管理后臺(tái)

Rabbitmq 有一個(gè) web 管理后臺(tái),這個(gè)管理后臺(tái)是以插件的方式提供的,啟動(dòng)后臺(tái) web 管理功能,需要切換到安裝目錄的 sbin 目錄下進(jìn)行操作

1、啟用管理后臺(tái)

# 查看 rabbitmq 的插件列表

./rabbitmq-plugins list

# 啟用

./rabbitmq-plugins enable rabbitmq_management

# 禁用

./rabbitmq-plugins disable rabbitmq_management

2、訪問管理后臺(tái)

訪問時(shí)需要檢查虛擬機(jī)的防火 墻

使用:http://你的虛擬機(jī)ip:15672?就可以訪問了

注意:如果使用默認(rèn)用戶 guest,密碼 guest 登錄,會(huì)提示 User can only log in via localhost,說明 guest 用戶只能從 localhost 本機(jī)登錄,所以不要使用該用戶

3、新建虛擬主機(jī)

新建主機(jī)

建完后如下

五、RabbitMQ 工作模型

broker 相當(dāng)于 mysql 服務(wù)器,virtual host 相當(dāng)于數(shù)據(jù)庫(可以有多個(gè)數(shù)據(jù)庫)

queue 相當(dāng)于表,消息相當(dāng)于記錄

消息隊(duì)列有三個(gè)核心要素:消息生產(chǎn)者、消息隊(duì)列、消息消費(fèi)者

生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用;消費(fèi)者(Consumer):接收消息的應(yīng)用;

代理(Broker):就是消息服務(wù)器,RabbitMQ Server 就是 Message Broker

鏈接(Connection):鏈接 RabbitMQ 服務(wù)器的 TCP 長(zhǎng)連接

信道(Channel):鏈接中的一個(gè)虛擬通道,消息隊(duì)列發(fā)送或者接收消息時(shí),都是通過信道進(jìn)行的

虛擬主機(jī)(Virtual host):一個(gè)虛擬分組,在代碼中就是一個(gè)字符串,當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ 服務(wù)時(shí),可以劃分出多個(gè) Virtual host,每個(gè)用戶在自己的 Virtual host 創(chuàng)建 exchange/queue 等(分類比較清晰、相互隔離)

交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者接收消息,并根據(jù)交換機(jī)類型分發(fā)到對(duì)應(yīng)的消息隊(duì)列中,起到一個(gè)路由的作用

路由鍵(Routing Key):交換機(jī)根據(jù)路由鍵來決定消息分發(fā)到那個(gè)隊(duì)列,路由鍵是消息的目的地址

綁定(Binding):綁定是隊(duì)列與交換機(jī)的一個(gè)關(guān)聯(lián)鏈接(關(guān)聯(lián)關(guān)系)

隊(duì)列(Queue):存儲(chǔ)消息的緩存

消息(Message):由生產(chǎn)者通過 RabbitMQ 發(fā)送給消費(fèi)者的信息(消息可以是任何數(shù)據(jù),字符串、user 對(duì)象、json 串等)

六、RabbitMQ 交換機(jī)類型

Exchange(X)可翻譯為交換機(jī)/交換器/路由器,類型有以下幾種:

Fanout Exchange(扇形)Direct Exchange(直連)Topic Exchange(主題)Headers Exchange(頭部)

1、Fanout Exchange

1.1、介紹

Fanout 扇形,散開的;扇形交換機(jī)

投遞到所有綁定的隊(duì)列,不需要路由鍵,不需要進(jìn)行路由鍵的匹配,相當(dāng)于廣播、群發(fā)

P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列

1.2、示例

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

/*

rabbitmq三部曲

1.定義交換機(jī)

2.定義隊(duì)列

3.綁定交換機(jī)和隊(duì)列

*/

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("exchange.fanout");

}

// 2.定義隊(duì)列

@Bean

public Queue queueA() {

return new Queue("queue.fanout.a");

}

@Bean

public Queue queueB() {

return new Queue("queue.fanout.b");

}

// 3.綁定交換機(jī)和隊(duì)列

@Bean

public Binding bindingA(FanoutExchange fanoutExchange, Queue queueA) {

// 將隊(duì)列A綁定到扇形交換機(jī)

return BindingBuilder.bind(queueA).to(fanoutExchange);

}

@Bean

public Binding bindingB(FanoutExchange fanoutExchange, Queue queueB) {

// 將隊(duì)列B綁定到扇形交換機(jī)

return BindingBuilder.bind(queueB).to(fanoutExchange);

}

}

發(fā)送消息

@Component

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 定義要發(fā)送的消息

String msg = "hello world";

// 轉(zhuǎn)換并且發(fā)送

Message message = new Message(msg.getBytes());

rabbitTemplate.convertAndSend("exchange.fanout", "", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.fanout.a", "queue.fanout.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

2、Direct Exchange

2.1、介紹

根據(jù)路由鍵精確匹配(一摸一樣)進(jìn)行路由消息隊(duì)列

P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列

2.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.direct.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.direct.b").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queueA) {

return BindingBuilder.bind(queueA).to(directExchange).with("error");

}

@Bean

public Binding bindingB1(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("info");

}

@Bean

public Binding bindingB2(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("error");

}

@Bean

public Binding bindingB3(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("warning");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.direct.a", "queue.direct.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

3、Topic Exchange

3.1、介紹

通配符匹配,相當(dāng)于模糊匹配

# 匹配多個(gè)單詞,用來表示任意數(shù)量(零個(gè)或多個(gè))單詞* 匹配一個(gè)單詞(必須有一個(gè),而且只有一個(gè)),用 . 隔開的為一個(gè)單詞舉例

beijing.# = beijing.queue.abc,beijing.queue.xyz.xxxbeijing.* = beijing.queue,beijing.xyz

發(fā)送時(shí)指定的路由鍵:lazy.orange.rabbit

P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列

3.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public TopicExchange topicExchange() {

return ExchangeBuilder.topicExchange("exchange.topic").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.topic.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.topic.b").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingA(TopicExchange topicExchange, Queue queueA) {

return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");

}

@Bean

public Binding bindingB1(TopicExchange topicExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");

}

@Bean

public Binding bindingB2(TopicExchange topicExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate; // 用RabbitTemplate也可以

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.topic", "lazy.orange.rabbit", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.topic.a", "queue.topic.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

4、Headers Exchange

4.1、介紹

用的比較少

基于消息內(nèi)容中的 headers 屬性進(jìn)行匹配

P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列

4.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public HeadersExchange headersExchange() {

return ExchangeBuilder.headersExchange("exchange.headers").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.headers.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.headers.b").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingA(HeadersExchange headersExchange, Queue queueA) {

Map headerValues = new HashMap<>();

headerValues.put("type", "m");

headerValues.put("status", 1);

return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();

}

@Bean

public Binding bindingB(HeadersExchange headersExchange, Queue queueB) {

Map headerValues = new HashMap<>();

headerValues.put("type", "s");

headerValues.put("status", 0);

return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 消息屬性

MessageProperties messageProperties = new MessageProperties();

Map headers = new HashMap<>();

headers.put("type", "s");

headers.put("status", 0);

// 設(shè)置消息頭

messageProperties.setHeaders(headers);

// 添加了消息屬性

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.headers", "", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.headers.a", "queue.headers.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

七、RabbitMQ 過期時(shí)間

過期時(shí)間也叫 TTL 消息,TTL:Time To Live

消息的過期時(shí)間有兩種設(shè)置方式:(過期消息)

1、設(shè)置單條消息的過期時(shí)間

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.ttl").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding binding(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("15000"); // 過期的毫秒數(shù)

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

2、隊(duì)列屬性設(shè)置消息過期時(shí)間

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

// 設(shè)置消息過期時(shí)間

Map arguments = new HashMap<>();

arguments.put("x-message-ttl", 15000); // 15秒

// 方式1

return new Queue("queue.ttl", true, false, false, arguments);

// 方式2

// return QueueBuilder.durable("queue.ttl").withArguments(arguments).build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding binding(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

3、注意

如果消息和隊(duì)列都設(shè)置了過期時(shí)間,則消息的 TTL 以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。

八、死信隊(duì)列

也有叫死信交換機(jī)、死信郵箱等說法

DLX:Dead-Letter-Exchange 死信交換機(jī),死信郵箱

注意:圖中的 3.理由key 改為 路由key

以下情況下一個(gè)消息會(huì)進(jìn)入 DLX(Dead Letter Exchange)死信交換機(jī)。

1、消息過期

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.b").build();

}

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.b")

.deadLetterExchange("exchange.dlx.b") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣

.build();

}

// 綁定交換機(jī)和隊(duì)列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機(jī)

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.b").build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.b").build();

}

// 綁定交換機(jī)和隊(duì)列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 消息屬性

MessageProperties messageProperties = new MessageProperties();

// 設(shè)置單條消息過期時(shí)間,單位為毫秒

messageProperties.setExpiration("15000");

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.normal.b", "order", message);

}

}

2、隊(duì)列過期

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.a").build();

}

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.a")

.ttl(15000) // 過期時(shí)間 15秒

.deadLetterExchange("exchange.dlx.a") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣

.build();

}

// 綁定交換機(jī)和隊(duì)列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機(jī)

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.a").build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.a").build();

}

// 綁定交換機(jī)和隊(duì)列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.a", "order", message);

}

}

3、隊(duì)列達(dá)到最大長(zhǎng)度

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.c").build();

}

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.c")

.deadLetterExchange("exchange.dlx.c") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣

.maxLength(5) // 設(shè)置隊(duì)列最大長(zhǎng)度

.build();

}

// 綁定交換機(jī)和隊(duì)列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機(jī)

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.c").build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.c").build();

}

// 綁定交換機(jī)和隊(duì)列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

for (int i = 1; i <= 10; i++) {

String str = "hello world" + i;

Message message = MessageBuilder.withBody(str.getBytes()).build();

// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.normal.c", "order", message);

}

}

}

4、消費(fèi)者拒絕消息不進(jìn)行重新投遞

從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

listener:

simple:

acknowledge-mode: manual # 啟動(dòng)手動(dòng)確認(rèn)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.d").build();

}

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.d")

.deadLetterExchange("exchange.dlx.d") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣

.build();

}

// 綁定交換機(jī)和隊(duì)列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機(jī)

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.d").build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.d").build();

}

// 綁定交換機(jī)和隊(duì)列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

String str = "hello world";

Message message = MessageBuilder.withBody(str.getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.d", "order", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.normal.d"})

public void receiveMsg(Message message, Channel channel) {

// 獲取消息屬性

MessageProperties messageProperties = message.getMessageProperties();

// 獲取消息的唯一標(biāo)識(shí),類似人的身份證號(hào)

long deliveryTag = messageProperties.getDeliveryTag();

try {

// 手動(dòng)加一段錯(cuò)誤代碼

int i = 1 / 0;

byte[] body = message.getBody();

String str = new String(body);

System.out.println("接收到的消息為: " + str);

// 消費(fèi)者的手動(dòng)確認(rèn)

// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息

// 確認(rèn)后服務(wù)器就可以刪了

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

try {

// 接收者出現(xiàn)問題

// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息

// requeue為true,表示重新入隊(duì),為false表示不重新入隊(duì)

// channel.basicNack(deliveryTag, false, true);

// requeue改為false,不重新入隊(duì),這時(shí)就會(huì)進(jìn)入死信隊(duì)列

channel.basicNack(deliveryTag, false, false);

} catch (IOException ex) {

throw new RuntimeException(ex);

}

throw new RuntimeException(e);

}

}

}

5、消費(fèi)者拒絕消息

開啟手動(dòng)確認(rèn)模式,并拒絕消息,不重新投遞,則進(jìn)入死信隊(duì)列

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

listener:

simple:

acknowledge-mode: manual # 啟動(dòng)手動(dòng)確認(rèn)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.e").build();

}

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.e")

.deadLetterExchange("exchange.dlx.e") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣

.build();

}

// 綁定交換機(jī)和隊(duì)列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機(jī)

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.e").build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.e").build();

}

// 綁定交換機(jī)和隊(duì)列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

String str = "hello world";

Message message = MessageBuilder.withBody(str.getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.e", "order", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.normal.e"})

public void receiveMsg(Message message, Channel channel) throws IOException {

// 獲取消息屬性

MessageProperties messageProperties = message.getMessageProperties();

// 獲取消息的唯一標(biāo)識(shí),類似人的身份證號(hào)

long deliveryTag = messageProperties.getDeliveryTag();

// 拒絕消息

// 第一個(gè)參數(shù)是消息的唯一標(biāo)識(shí)

// 第二個(gè)參數(shù)是是否重新入隊(duì)

channel.basicReject(deliveryTag, false);

}

}

九、延遲隊(duì)列

場(chǎng)景:有一個(gè)訂單,15 分鐘內(nèi)如果不支付,就把該訂單設(shè)置為交易關(guān)閉,那么就不能支付了,這類實(shí)現(xiàn)延遲任務(wù)的場(chǎng)景就可以采用延遲隊(duì)列來實(shí)現(xiàn),當(dāng)然除了延遲隊(duì)列來實(shí)現(xiàn),也可以有一些其他方法實(shí)現(xiàn);

1、采用消息中間件

RabbitMQ 本身不支持延遲隊(duì)列,可以使用 TTL 結(jié)合 DLX 的方式來實(shí)現(xiàn)消息的延遲投遞,即把 DLX 跟某個(gè)隊(duì)列綁定,到了指定時(shí)間,消息過期后,就會(huì)從 DLX 路由到這個(gè)隊(duì)列,消費(fèi)者可以從這個(gè)隊(duì)列取走消息?

代碼:正常延遲

???添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)(一機(jī)兩用,正常交換機(jī)和死信交換機(jī))

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.delay.a").build();

}

// 2.定義隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.delay.normal.a")

.ttl(25000) // 過期時(shí)間25秒

.deadLetterExchange("exchange.delay.a") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.delay.dlx.a").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingNormal(DirectExchange directExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(directExchange).with("order");

}

@Bean

public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.delay.a", "order", message);

}

}

問題:如果先發(fā)送的消息,消息延遲時(shí)間長(zhǎng),會(huì)影響后面的延遲時(shí)間段的消息的消費(fèi)

解決:不同延遲時(shí)間的消息要發(fā)到不同的隊(duì)列上,同一個(gè)隊(duì)列的消息,它的延遲時(shí)間應(yīng)該一樣

?代碼:解決延遲問題

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)(一機(jī)兩用,正常交換機(jī)和死信交換機(jī))

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.delay").build();

}

// 2.定義隊(duì)列

// 正常的訂單隊(duì)列

@Bean

public Queue normalOrderQueue() {

return QueueBuilder.durable("queue.delay.normal.order")

.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

// 正常的支付隊(duì)列

@Bean

public Queue normalPayQueue() {

return QueueBuilder.durable("queue.delay.normal.pay")

.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機(jī)

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

// 死信隊(duì)列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.delay.dlx").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

// 綁定正常的訂單隊(duì)列

@Bean

public Binding bindingNormalOrderQueue(DirectExchange directExchange, Queue normalOrderQueue) {

return BindingBuilder.bind(normalOrderQueue).to(directExchange).with("order");

}

// 綁定正常的支付隊(duì)列

@Bean

public Binding bindingNormalPayQueue(DirectExchange directExchange, Queue normalPayQueue) {

return BindingBuilder.bind(normalPayQueue).to(directExchange).with("pay");

}

// 綁定死信隊(duì)列

@Bean

public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 第一條消息

Message orderMsg = MessageBuilder.withBody("這是一條訂單消息 20秒過期 ".getBytes()).setExpiration("20000").build();

// 第二條消息

Message payMsg = MessageBuilder.withBody("這是一條支付消息 10秒過期 ".getBytes()).setExpiration("10000").build();

rabbitTemplate.convertAndSend("exchange.delay", "order", orderMsg);

System.out.println("訂單消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

rabbitTemplate.convertAndSend("exchange.delay", "pay", payMsg);

System.out.println("支付消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.delay.dlx"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收的消息是: " + msg + "接收的時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

2、使用延遲插件

使用 rabbitmq-delayed-message-exchange 延遲插件

下載

選擇對(duì)應(yīng)的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:

Community Plugins — RabbitMQ

將插件拷貝到 RabbitMQ 服務(wù)器 plugins 目錄下解壓

// 如果 unzip 沒有安裝,先安裝一下

// yum install unzip -y

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

啟用插件

// 開啟插件

./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重啟 rabbitmq 使其生效(此處也可以不重啟)

消息發(fā)送后不會(huì)直接投遞到隊(duì)列,而是存儲(chǔ)到 Mnesia(嵌入式數(shù)據(jù)庫),檢查 x-delay 時(shí)間(消息頭部);

延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運(yùn)行環(huán)境;

Mnesia 是一個(gè)小型數(shù)據(jù)庫,不適合于大量延遲消息的實(shí)現(xiàn)解決了消息過期時(shí)間不一致出現(xiàn)的問題

代碼實(shí)現(xiàn)

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public CustomExchange customExchange() {

Map arguments = new HashMap<>();

arguments.put("x-delayed-type", "direct"); // 放一個(gè)參數(shù)

return new CustomExchange("exchange.elay.b", "x-delayed-message", true, false, arguments);

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.delay.b").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingNormalOrderQueue(CustomExchange customExchange, Queue queue) {

// 綁定,

return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 第一條消息

MessageProperties messageProperties1 = new MessageProperties();

messageProperties1.setHeader("x-delay", 25000); // 設(shè)置延遲消息

Message message1 = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties1).build();

// 第二條消息

MessageProperties messageProperties2 = new MessageProperties();

messageProperties2.setHeader("x-delay", 15000); // 設(shè)置延遲消息

Message message2 = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties2).build();

// 發(fā)送消息

rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message1);

System.out.println("訂單消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message2);

System.out.println("支付消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.delay.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收的消息是: " + msg + "接收的時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

十、消息 Confirm 模式

消息的 confirm 確認(rèn)機(jī)制,是指生產(chǎn)者投遞消息后,到達(dá)了消息服務(wù)器 Broker 里面的 exchange 交換機(jī),則會(huì)給生產(chǎn)者一個(gè)應(yīng)答,生產(chǎn)者接收到應(yīng)答,用來確定這條消息是否正常的發(fā)送到 Broker 的 exchange 中,這也是消息可靠性投遞的重要保障;

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.confirm").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.confirm").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用

public void init() {

// 第一個(gè)參數(shù): 關(guān)聯(lián)數(shù)據(jù)

// 第二個(gè)參數(shù): 是否到達(dá)交換機(jī)

// 第三個(gè)參數(shù): 原因

rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {

// 打印一下關(guān)聯(lián)數(shù)據(jù)

System.out.println("關(guān)聯(lián)數(shù)據(jù): " + correlationData);

if (ack) {

System.out.println("消息正確到達(dá)交換機(jī)");

}

if (!ack) {

System.out.println("消息沒有到達(dá)交換機(jī),原因: " + cause);

}

});

}

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)

correlationData.setId("order_123456");

rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);

}

}

十一、消息 Return 模式

rabbitmq 整個(gè)消息投遞的路徑為:

producer —> exchange —> queue —> consumer

消息從 producer 到 exchange 則會(huì)返回一個(gè) confirmCallback消息從 exchange -> queue 投遞失敗則會(huì)返回一個(gè) returnCallback

我們可以利用這兩個(gè) callback 控制消息的可靠性傳遞

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

publisher-returns: true # 開啟return模式

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.return").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.return").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用

public void init() {

rabbitTemplate.setReturnsCallback(message -> {

System.out.println("消息從交換機(jī)沒有正確的路由到(投遞到)隊(duì)列,原因: " + message.getReplyText());

});

}

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)

correlationData.setId("order_654321");

// 發(fā)送正確不會(huì)回調(diào),只有發(fā)送錯(cuò)誤才會(huì)回調(diào)

rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);

}

}

十二、交換機(jī)詳細(xì)屬性

Name:交換機(jī)名稱;就是一個(gè)字符串Type:交換機(jī)類型,direct、topic、fanout、headers 四種Durability:持久化,聲明交換機(jī)是否持久化,代表交換機(jī)在服務(wù)器重啟后是否還存在Auto delete:是否自動(dòng)刪除,曾經(jīng)有隊(duì)列綁定到該交換機(jī),后來解綁了,那就會(huì)自動(dòng)刪除該交換機(jī)Internal:內(nèi)部使用的,如果是 yes,客戶端無法直接發(fā)消息到此交換機(jī),他只能用于交換機(jī)與交換機(jī)的綁定(用的很少)Arguments:只有一個(gè)取值 alternate-exchange,表示備用交換機(jī),當(dāng)正常交換機(jī)的消息發(fā)送不到正常隊(duì)列時(shí),消息就會(huì)往備用交換機(jī)里面發(fā)

添加依賴

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

/*

return ExchangeBuilder

.directExchange("exchange.properties.1") // 交換機(jī)名字

.durable(false) // 是否持久化,一般都是持久化

.autoDelete() // 設(shè)置自動(dòng)刪除(當(dāng)隊(duì)列跟他解綁后是否自動(dòng)刪除),一般不是自動(dòng)刪除

.alternate("") // 設(shè)置備用交換機(jī)名字

.build();

*/

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

// 正常交換機(jī)

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.

directExchange("exchange.normal.1")

.alternate("exchange.alternate.1") // 設(shè)置備用交換機(jī)

.build();

}

// 備用交換機(jī)

@Bean

public FanoutExchange alternateExchange() {

return ExchangeBuilder.fanoutExchange("exchange.alternate.1").build();

}

// 2.定義隊(duì)列

// 正常隊(duì)列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.1").build();

}

// 備用隊(duì)列

@Bean

public Queue alternateQueue() {

return QueueBuilder.durable("queue.alternate.1").build();

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

// 正常交換機(jī)與正常隊(duì)列綁定

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("info");

}

// 備用交換機(jī)與備用隊(duì)列綁定

@Bean

public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {

return BindingBuilder.bind(alternateQueue).to(alternateExchange);

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

// 發(fā)送正確不會(huì)回調(diào),只有發(fā)送錯(cuò)誤才會(huì)回調(diào)

rabbitTemplate.convertAndSend("exchange.normal.1", "error", message);

}

}

十三、隊(duì)列詳細(xì)屬性

Type:隊(duì)列類型,一般是 ClassicName:隊(duì)列名稱,就是一個(gè)字符串,隨便一個(gè)字符串就可以Durability:聲明隊(duì)列是否持久化,代表隊(duì)列在服務(wù)器重啟后是否還存在Auto delete:是否自動(dòng)刪除,如果為 true,當(dāng)沒有消費(fèi)者連接到這個(gè)隊(duì)列的時(shí)候,隊(duì)列會(huì)自動(dòng)刪除Exclusive:exclusive 屬性的隊(duì)列只對(duì)首次聲明它的連接可見,并且在連接斷開時(shí)自動(dòng)刪除;基本不設(shè)置它,設(shè)置成 falseArguments:隊(duì)列的其他屬性,例如指定 DLX(死信交換機(jī)等)

1. x-expires:Number

當(dāng) Queue(隊(duì)列)在指定的時(shí)間未被訪問,則隊(duì)列將被自動(dòng)刪除

2.?x-message-ttl:Number

發(fā)布的消息在隊(duì)列中存在多長(zhǎng)時(shí)間后被取消(單位毫秒)

3.?x-overflow:String

設(shè)置隊(duì)列溢出行為,當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí),消息會(huì)發(fā)生什么,有效值為?Drop Head?或?Reject Publish

4. x-max-length:Number

隊(duì)列所能容下消息的最大長(zhǎng)度,當(dāng)超出長(zhǎng)度后,新消息將會(huì)覆蓋最前面的消息,類似于Redis的LRU算法

5. x-single-active-consumer:默認(rèn)為false

激活單一的消費(fèi)者,也就是該隊(duì)列只能有一個(gè)消息者消費(fèi)消息

6.?x-max-length-bytes:Number

限定隊(duì)列的最大占用空間,當(dāng)超出后也使用類似于Redis的LRU算法

7.?x-dead-letter-exchange:String

指定隊(duì)列關(guān)聯(lián)的死信交換機(jī),有時(shí)候我們希望當(dāng)隊(duì)列的消息達(dá)到上限后溢出的消息不會(huì)被刪除掉,而是走到另一個(gè)隊(duì)列中保存起來

8. x-dead-letter-routing-key:String

指定死信交換機(jī)的路由鍵,一般和6一起定義

9. x-max-priority:Number

如果將一個(gè)隊(duì)列加上優(yōu)先級(jí)參數(shù),那么該隊(duì)列為優(yōu)先級(jí)隊(duì)列;

(1)、給隊(duì)列加上優(yōu)先級(jí)參數(shù)使其成為優(yōu)先級(jí)隊(duì)列

x-max-priority=10【0-255取值范圍】

(2)、給消息加上優(yōu)先級(jí)屬性

通過優(yōu)先級(jí)特性,將一個(gè)隊(duì)列實(shí)現(xiàn)插隊(duì)消費(fèi)

MessageProperties messageProperties=new MessageProperties();

messageProperties.setPriority(8);

10.?x-queue-mode:String(理解下即可)

隊(duì)列類型x-queue-mode=lazy懶隊(duì)列,在磁盤上盡可能多地保留消息以減少RAM使用,如果未設(shè)置,則隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息

11.?x-queue-master-locator:String(用的較少,不講)

在集群模式下設(shè)置隊(duì)列分配到的主節(jié)點(diǎn)位置信息;

每個(gè)queue都有一個(gè)master節(jié)點(diǎn),所有對(duì)于queue的操作都是事先在master上完成,之后再slave上進(jìn)行相同的操作;

每個(gè)不同的queue可以坐落在不同的集群節(jié)點(diǎn)上,這些queue如果配置了鏡像隊(duì)列,那么會(huì)有1個(gè)master和多個(gè)slave。

基本上所有的操作都落在master上,那么如果這些queues的master都落在個(gè)別的服務(wù)節(jié)點(diǎn)上,而其他的節(jié)點(diǎn)又很空閑,這樣就無法做到負(fù)載均衡,那么勢(shì)必會(huì)影響性能;

關(guān)于master queue host 的分配有幾種策略,可以在queue聲明的時(shí)候使用x-queue-master-locator參數(shù),或者在policy上設(shè)置queue-master-locator,或者直接在rabbitmq的配置文件中定義queue_master_locator,有三種可供選擇的策略:

(1)min-masters:選擇master queue數(shù)最少的那個(gè)服務(wù)節(jié)點(diǎn)host;

(2)client-local:選擇與client相連接的那個(gè)服務(wù)節(jié)點(diǎn)host;

(3)random:隨機(jī)分配;

添加依賴

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機(jī)

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(jī)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.queue.properties").build();

}

// 2.定義隊(duì)列

@Bean

public Queue queue() {

// String name 隊(duì)列名稱

// boolean durable 是否持久化

// boolean exclusive 排他隊(duì)列

// boolean autoDelete 自動(dòng)刪除

// @Nullable Map arguments

return new Queue("queue.properties.1", true, false, false);

}

// 3.交換機(jī)和隊(duì)列進(jìn)行綁定

@Bean

public Binding bindingNormal(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.queue.properties", "info", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.properties.1"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

十四、消息可靠性投遞

消息的可靠性投遞就是要保證消息投遞過程中每一個(gè)環(huán)節(jié)都要成功,那么這肯定要犧牲一些性能,性能與可靠性是無法兼得的;

如果業(yè)務(wù)實(shí)時(shí)一致性要求不是特別高的場(chǎng)景,可以犧牲一些可靠性來換取性能。

1.代表消息從生產(chǎn)者發(fā)送到 Exchange2.代表消息從 Exchange 路由到 Queue3.代表消息在 Queue 中存儲(chǔ)4.代表消費(fèi)者監(jiān)聽 Queue 并消費(fèi)消息

1、確保消息發(fā)送到 RabbitMQ 服務(wù)器的交換機(jī)上

可能因?yàn)榫W(wǎng)絡(luò)或者 Broker 的問題導(dǎo)致 1 失敗,而此時(shí)應(yīng)該讓生產(chǎn)者知道消息是否正確發(fā)送到了 Broker 的 exchange 中

有兩種解決方案:

第一種是開啟Confirm(確認(rèn))模式;(異步)

第二種是開啟Transaction(事務(wù))模式;(性能低,實(shí)際項(xiàng)目中很少用)

2、確保消息路由到正確的隊(duì)列

可能因?yàn)槁酚申P(guān)鍵字錯(cuò)誤,或者隊(duì)列不存在,或者隊(duì)列名稱錯(cuò)誤導(dǎo)致②失敗。

使用return模式,可以實(shí)現(xiàn)消息無法路由的時(shí)候返回給生產(chǎn)者;

當(dāng)然在實(shí)際生產(chǎn)環(huán)境下,我們不會(huì)出現(xiàn)這種問題,我們都會(huì)進(jìn)行嚴(yán)格測(cè)試才會(huì)上線(很少有這種問題);

另一種方式就是使用備份交換機(jī)(alternate-exchange),無法路由的消息會(huì)發(fā)送到這個(gè)備用交換機(jī)上

3、確保消息在隊(duì)列正確地存儲(chǔ)

可能因?yàn)橄到y(tǒng)宕機(jī)、重啟、關(guān)閉等等情況導(dǎo)致存儲(chǔ)在隊(duì)列的消息丟失,即 3 出現(xiàn)問題;

解決方案:

隊(duì)列持久化

QueueBuilder.durable(QUEUE).build();

交換機(jī)持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

消息持久化

MessageProperties messageProperties = new MessageProperties();

messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 默認(rèn)就是持久化的

集群,鏡像隊(duì)列,高可用

確保消息從隊(duì)列正確地投遞到消費(fèi)者

采用消息消費(fèi)時(shí)的手動(dòng)ack確認(rèn)機(jī)制來保證;

如果消費(fèi)者收到消息后未來得及處理即發(fā)生異常,或者處理過程中發(fā)生異常,會(huì)導(dǎo)致④失敗。

為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement);

#開啟手動(dòng)ack消息消費(fèi)確認(rèn)

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費(fèi)者在訂閱隊(duì)列時(shí),通過上面的配置,不自動(dòng)確認(rèn),采用手動(dòng)確認(rèn),RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)后才從隊(duì)列中刪除消息;

如果消息消費(fèi)失敗,也可以調(diào)用basicReject()或者basicNack()來拒絕當(dāng)前消息而不是確認(rèn)。如果requeue參數(shù)設(shè)置為true,可以把這條消息重新存入隊(duì)列,以便發(fā)給下一個(gè)消費(fèi)者(當(dāng)然,只有一個(gè)消費(fèi)者的時(shí)候,這種方式可能會(huì)出現(xiàn)無限循環(huán)重復(fù)消費(fèi)的情況,可以投遞到新的隊(duì)列中,或者只打印異常日志);

十五、消息的冪等性

消息消費(fèi)時(shí)的冪等性(消息不被重復(fù)消費(fèi))

同一個(gè)消息,第一次接收,正常處理業(yè)務(wù),如果該消息第二次再接收,那就不能再處理業(yè)務(wù),否則就處理重復(fù)了

冪等性是:對(duì)于一個(gè)資源,不管你請(qǐng)求一次還是請(qǐng)求多次,對(duì)該資源本身造成的影響應(yīng)該是相同的,不能因?yàn)橹貜?fù)的請(qǐng)求而對(duì)該資源重復(fù)造成影響;

以接口冪等性舉例:

接口冪等性是指:一個(gè)接口用同樣的參數(shù)反復(fù)調(diào)用,不會(huì)造成業(yè)務(wù)錯(cuò)誤,那么這個(gè)接口就是具有冪等性的,比如:注冊(cè)接口、發(fā)送短信驗(yàn)證碼接口;

比如同一個(gè)訂單我支付兩次,但是只會(huì)扣款一次,第二次支付不會(huì)扣款,這說明這個(gè)支付接口是具有冪等性的

如何避免消息的重復(fù)消費(fèi)問題?(消息消費(fèi)時(shí)d額冪等性)

全局唯一 ID + Redis

生產(chǎn)者在發(fā)送消息時(shí),為每條消息設(shè)置一個(gè)全局唯一的 messageId,消費(fèi)者拿到消息后,使用setnx 命令,將 messageId 作為 key 放到 redis 中:setnx(messageId, 1),若返回1,說明之前沒有消費(fèi)過,正常消費(fèi);若返回0,說明這條消息之前已消費(fèi)過,拋棄;

參考代碼

//1、把消息的唯一ID寫入redis

boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中該key不存在,那么就設(shè)置,存在就不設(shè)置

if (flag) { //key不存在返回true

//相當(dāng)于是第一次消費(fèi)該消息

//TODO 處理業(yè)務(wù)

System.out.println("正常處理業(yè)務(wù)....." + orders.getId());

}

柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

好文鏈接

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18380993.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄