柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記
柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記
學(xué)習(xí)視頻:動(dòng)力節(jié)點(diǎn)RabbitMQ教程|12小時(shí)學(xué)會(huì)rabbitmq消息中間件_嗶哩嗶哩_bilibili
一、RabbitMQ 運(yùn)行環(huán)境搭建
RabbitMQ 是使用 Erlang 語言開發(fā)的,所以要先下載安裝 Erlang
下載時(shí)一定要注意版本兼容性:RabbitMQ Erlang 版本要求 — 兔子MQ
二、啟動(dòng)及停止 RabbitMQ
1、啟動(dòng) RabbitMQ
進(jìn)入到安裝目錄的 sbin 目錄下
# -detached 表示在后臺(tái)啟動(dòng)運(yùn)行 rabbitmq, 不加該參數(shù)表示前臺(tái)啟動(dòng)
# rabbitmq 的運(yùn)行日志存放在安裝目錄的 var 目錄下
# 啟動(dòng)
./rabbitmq-server -detached
2、查看 RabbitMQ 狀態(tài)
進(jìn)入到安裝目錄的 sbin 目錄下
# -n rabbit 是指定節(jié)點(diǎn)名稱為 rabbit,目前只有一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)名默認(rèn)為 rabbit
# 此處 -n rabbit 也可以省略
# 查看狀態(tài)
./rabbitmqctl -n rabbit status
3、停止 RabbitMQ
進(jìn)入到安裝目錄的 sbin 目錄下
# 停止
./rabbitmqctl shutdown
4、配置 path 環(huán)境變量
打開配置文件
vim /etc/profile
進(jìn)行配置
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbin
export RABBIT_HOME PATH
刷新環(huán)境變量
source /etc/profile
三、RabbitMQ 管理命令
./rabbitmqctl 是一個(gè)管理命令,可以管理 rabbitmq 的很多操作
./rabbitmqctl help 可以查看有哪些操作
查看具體子命令,可以使用 ./rabbitmqctl help 子命令名稱
1、用戶管理
用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。
這些操作都是通過 rabbitmqct 管理命令來實(shí)現(xiàn)完成
查看幫助:rabbitmqct add_user --help
查看當(dāng)前用戶列表
rabbitmqctl list_users
新增一個(gè)用戶
# 語法:rabbitmqctl add_user Username Password
rabbitmqctl add_user admin 123456
2、設(shè)置用戶角色
設(shè)置用戶角色
# 語法:rabbitmqctl set_user_tags User Tag
# 這里設(shè)置用戶的角色為管理員角色
rabbitmqctl set_user_tags admin administrator
3、設(shè)置用戶權(quán)限
設(shè)置用戶權(quán)限
# 說明:此操作設(shè)置了 admin 用戶擁有操作虛擬主機(jī)/下的所以權(quán)限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
四、web 管理后臺(tái)
Rabbitmq 有一個(gè) web 管理后臺(tái),這個(gè)管理后臺(tái)是以插件的方式提供的,啟動(dòng)后臺(tái) web 管理功能,需要切換到安裝目錄的 sbin 目錄下進(jìn)行操作
1、啟用管理后臺(tái)
# 查看 rabbitmq 的插件列表
./rabbitmq-plugins list
# 啟用
./rabbitmq-plugins enable rabbitmq_management
# 禁用
./rabbitmq-plugins disable rabbitmq_management
2、訪問管理后臺(tái)
訪問時(shí)需要檢查虛擬機(jī)的防火 墻
使用:http://你的虛擬機(jī)ip:15672?就可以訪問了
注意:如果使用默認(rèn)用戶 guest,密碼 guest 登錄,會(huì)提示 User can only log in via localhost,說明 guest 用戶只能從 localhost 本機(jī)登錄,所以不要使用該用戶
3、新建虛擬主機(jī)
新建主機(jī)
建完后如下
五、RabbitMQ 工作模型
broker 相當(dāng)于 mysql 服務(wù)器,virtual host 相當(dāng)于數(shù)據(jù)庫(可以有多個(gè)數(shù)據(jù)庫)
queue 相當(dāng)于表,消息相當(dāng)于記錄
消息隊(duì)列有三個(gè)核心要素:消息生產(chǎn)者、消息隊(duì)列、消息消費(fèi)者
生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用;消費(fèi)者(Consumer):接收消息的應(yīng)用;
代理(Broker):就是消息服務(wù)器,RabbitMQ Server 就是 Message Broker
鏈接(Connection):鏈接 RabbitMQ 服務(wù)器的 TCP 長(zhǎng)連接
信道(Channel):鏈接中的一個(gè)虛擬通道,消息隊(duì)列發(fā)送或者接收消息時(shí),都是通過信道進(jìn)行的
虛擬主機(jī)(Virtual host):一個(gè)虛擬分組,在代碼中就是一個(gè)字符串,當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ 服務(wù)時(shí),可以劃分出多個(gè) Virtual host,每個(gè)用戶在自己的 Virtual host 創(chuàng)建 exchange/queue 等(分類比較清晰、相互隔離)
交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者接收消息,并根據(jù)交換機(jī)類型分發(fā)到對(duì)應(yīng)的消息隊(duì)列中,起到一個(gè)路由的作用
路由鍵(Routing Key):交換機(jī)根據(jù)路由鍵來決定消息分發(fā)到那個(gè)隊(duì)列,路由鍵是消息的目的地址
綁定(Binding):綁定是隊(duì)列與交換機(jī)的一個(gè)關(guān)聯(lián)鏈接(關(guān)聯(lián)關(guān)系)
隊(duì)列(Queue):存儲(chǔ)消息的緩存
消息(Message):由生產(chǎn)者通過 RabbitMQ 發(fā)送給消費(fèi)者的信息(消息可以是任何數(shù)據(jù),字符串、user 對(duì)象、json 串等)
六、RabbitMQ 交換機(jī)類型
Exchange(X)可翻譯為交換機(jī)/交換器/路由器,類型有以下幾種:
Fanout Exchange(扇形)Direct Exchange(直連)Topic Exchange(主題)Headers Exchange(頭部)
1、Fanout Exchange
1.1、介紹
Fanout 扇形,散開的;扇形交換機(jī)
投遞到所有綁定的隊(duì)列,不需要路由鍵,不需要進(jìn)行路由鍵的匹配,相當(dāng)于廣播、群發(fā)
P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列
1.2、示例
添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
/*
rabbitmq三部曲
1.定義交換機(jī)
2.定義隊(duì)列
3.綁定交換機(jī)和隊(duì)列
*/
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchange.fanout");
}
// 2.定義隊(duì)列
@Bean
public Queue queueA() {
return new Queue("queue.fanout.a");
}
@Bean
public Queue queueB() {
return new Queue("queue.fanout.b");
}
// 3.綁定交換機(jī)和隊(duì)列
@Bean
public Binding bindingA(FanoutExchange fanoutExchange, Queue queueA) {
// 將隊(duì)列A綁定到扇形交換機(jī)
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
public Binding bindingB(FanoutExchange fanoutExchange, Queue queueB) {
// 將隊(duì)列B綁定到扇形交換機(jī)
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}
發(fā)送消息
@Component
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 定義要發(fā)送的消息
String msg = "hello world";
// 轉(zhuǎn)換并且發(fā)送
Message message = new Message(msg.getBytes());
rabbitTemplate.convertAndSend("exchange.fanout", "", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.fanout.a", "queue.fanout.b"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收到的消息為: " + msg);
}
}
2、Direct Exchange
2.1、介紹
根據(jù)路由鍵精確匹配(一摸一樣)進(jìn)行路由消息隊(duì)列
P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列
2.2、示例
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.direct").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queueA() {
return QueueBuilder.durable("queue.direct.a").build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable("queue.direct.b").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingA(DirectExchange directExchange, Queue queueA) {
return BindingBuilder.bind(queueA).to(directExchange).with("error");
}
@Bean
public Binding bindingB1(DirectExchange directExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(directExchange).with("info");
}
@Bean
public Binding bindingB2(DirectExchange directExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(directExchange).with("error");
}
@Bean
public Binding bindingB3(DirectExchange directExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(directExchange).with("warning");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.direct", "info", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.direct.a", "queue.direct.b"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收到的消息為: " + msg);
}
}
3、Topic Exchange
3.1、介紹
通配符匹配,相當(dāng)于模糊匹配
# 匹配多個(gè)單詞,用來表示任意數(shù)量(零個(gè)或多個(gè))單詞* 匹配一個(gè)單詞(必須有一個(gè),而且只有一個(gè)),用 . 隔開的為一個(gè)單詞舉例
beijing.# = beijing.queue.abc,beijing.queue.xyz.xxxbeijing.* = beijing.queue,beijing.xyz
發(fā)送時(shí)指定的路由鍵:lazy.orange.rabbit
P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列
3.2、示例
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange("exchange.topic").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queueA() {
return QueueBuilder.durable("queue.topic.a").build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable("queue.topic.b").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingA(TopicExchange topicExchange, Queue queueA) {
return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");
}
@Bean
public Binding bindingB1(TopicExchange topicExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");
}
@Bean
public Binding bindingB2(TopicExchange topicExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate; // 用RabbitTemplate也可以
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.topic", "lazy.orange.rabbit", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.topic.a", "queue.topic.b"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收到的消息為: " + msg);
}
}
4、Headers Exchange
4.1、介紹
用的比較少
基于消息內(nèi)容中的 headers 屬性進(jìn)行匹配
P 表示生產(chǎn)者X 表示交換機(jī)紅色部分表示隊(duì)列
4.2、示例
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange("exchange.headers").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queueA() {
return QueueBuilder.durable("queue.headers.a").build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable("queue.headers.b").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingA(HeadersExchange headersExchange, Queue queueA) {
Map
headerValues.put("type", "m");
headerValues.put("status", 1);
return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
}
@Bean
public Binding bindingB(HeadersExchange headersExchange, Queue queueB) {
Map
headerValues.put("type", "s");
headerValues.put("status", 0);
return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 消息屬性
MessageProperties messageProperties = new MessageProperties();
Map
headers.put("type", "s");
headers.put("status", 0);
// 設(shè)置消息頭
messageProperties.setHeaders(headers);
// 添加了消息屬性
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)
rabbitTemplate.convertAndSend("exchange.headers", "", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.headers.a", "queue.headers.b"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收到的消息為: " + msg);
}
}
七、RabbitMQ 過期時(shí)間
過期時(shí)間也叫 TTL 消息,TTL:Time To Live
消息的過期時(shí)間有兩種設(shè)置方式:(過期消息)
1、設(shè)置單條消息的過期時(shí)間
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.direct").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
return QueueBuilder.durable("queue.ttl").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); // 過期的毫秒數(shù)
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
rabbitTemplate.convertAndSend("exchange.direct", "info", message);
}
}
2、隊(duì)列屬性設(shè)置消息過期時(shí)間
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.direct").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
// 設(shè)置消息過期時(shí)間
Map
arguments.put("x-message-ttl", 15000); // 15秒
// 方式1
return new Queue("queue.ttl", true, false, false, arguments);
// 方式2
// return QueueBuilder.durable("queue.ttl").withArguments(arguments).build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.direct", "info", message);
}
}
3、注意
如果消息和隊(duì)列都設(shè)置了過期時(shí)間,則消息的 TTL 以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。
八、死信隊(duì)列
也有叫死信交換機(jī)、死信郵箱等說法
DLX:Dead-Letter-Exchange 死信交換機(jī),死信郵箱
注意:圖中的 3.理由key 改為 路由key
以下情況下一個(gè)消息會(huì)進(jìn)入 DLX(Dead Letter Exchange)死信交換機(jī)。
1、消息過期
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange("exchange.normal.b").build();
}
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.b")
.deadLetterExchange("exchange.dlx.b") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣
.build();
}
// 綁定交換機(jī)和隊(duì)列(正常)
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
// 分割線
// 死信交換機(jī)
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange("exchange.dlx.b").build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.dlx.b").build();
}
// 綁定交換機(jī)和隊(duì)列(死信)
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 消息屬性
MessageProperties messageProperties = new MessageProperties();
// 設(shè)置單條消息過期時(shí)間,單位為毫秒
messageProperties.setExpiration("15000");
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)
rabbitTemplate.convertAndSend("exchange.normal.b", "order", message);
}
}
2、隊(duì)列過期
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange("exchange.normal.a").build();
}
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.a")
.ttl(15000) // 過期時(shí)間 15秒
.deadLetterExchange("exchange.dlx.a") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣
.build();
}
// 綁定交換機(jī)和隊(duì)列(正常)
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
// 分割線
// 死信交換機(jī)
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange("exchange.dlx.a").build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.dlx.a").build();
}
// 綁定交換機(jī)和隊(duì)列(死信)
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.a", "order", message);
}
}
3、隊(duì)列達(dá)到最大長(zhǎng)度
??添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange("exchange.normal.c").build();
}
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.c")
.deadLetterExchange("exchange.dlx.c") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣
.maxLength(5) // 設(shè)置隊(duì)列最大長(zhǎng)度
.build();
}
// 綁定交換機(jī)和隊(duì)列(正常)
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
// 分割線
// 死信交換機(jī)
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange("exchange.dlx.c").build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.dlx.c").build();
}
// 綁定交換機(jī)和隊(duì)列(死信)
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 1; i <= 10; i++) {
String str = "hello world" + i;
Message message = MessageBuilder.withBody(str.getBytes()).build();
// 對(duì)于頭部交換機(jī),路由key無所謂(不需要)
rabbitTemplate.convertAndSend("exchange.normal.c", "order", message);
}
}
}
4、消費(fèi)者拒絕消息不進(jìn)行重新投遞
從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列
??添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
listener:
simple:
acknowledge-mode: manual # 啟動(dòng)手動(dòng)確認(rèn)
配置類
@Configuration
public class RabbitConfig {
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange("exchange.normal.d").build();
}
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.d")
.deadLetterExchange("exchange.dlx.d") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣
.build();
}
// 綁定交換機(jī)和隊(duì)列(正常)
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
// 分割線
// 死信交換機(jī)
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange("exchange.dlx.d").build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.dlx.d").build();
}
// 綁定交換機(jī)和隊(duì)列(死信)
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.d", "order", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.normal.d"})
public void receiveMsg(Message message, Channel channel) {
// 獲取消息屬性
MessageProperties messageProperties = message.getMessageProperties();
// 獲取消息的唯一標(biāo)識(shí),類似人的身份證號(hào)
long deliveryTag = messageProperties.getDeliveryTag();
try {
// 手動(dòng)加一段錯(cuò)誤代碼
int i = 1 / 0;
byte[] body = message.getBody();
String str = new String(body);
System.out.println("接收到的消息為: " + str);
// 消費(fèi)者的手動(dòng)確認(rèn)
// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息
// 確認(rèn)后服務(wù)器就可以刪了
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// 接收者出現(xiàn)問題
// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息
// requeue為true,表示重新入隊(duì),為false表示不重新入隊(duì)
// channel.basicNack(deliveryTag, false, true);
// requeue改為false,不重新入隊(duì),這時(shí)就會(huì)進(jìn)入死信隊(duì)列
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
5、消費(fèi)者拒絕消息
開啟手動(dòng)確認(rèn)模式,并拒絕消息,不重新投遞,則進(jìn)入死信隊(duì)列
??添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
listener:
simple:
acknowledge-mode: manual # 啟動(dòng)手動(dòng)確認(rèn)
配置類
@Configuration
public class RabbitConfig {
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange("exchange.normal.e").build();
}
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.e")
.deadLetterExchange("exchange.dlx.e") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機(jī)和死信隊(duì)列綁定的key一樣
.build();
}
// 綁定交換機(jī)和隊(duì)列(正常)
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
// 分割線
// 死信交換機(jī)
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange("exchange.dlx.e").build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.dlx.e").build();
}
// 綁定交換機(jī)和隊(duì)列(死信)
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.e", "order", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.normal.e"})
public void receiveMsg(Message message, Channel channel) throws IOException {
// 獲取消息屬性
MessageProperties messageProperties = message.getMessageProperties();
// 獲取消息的唯一標(biāo)識(shí),類似人的身份證號(hào)
long deliveryTag = messageProperties.getDeliveryTag();
// 拒絕消息
// 第一個(gè)參數(shù)是消息的唯一標(biāo)識(shí)
// 第二個(gè)參數(shù)是是否重新入隊(duì)
channel.basicReject(deliveryTag, false);
}
}
九、延遲隊(duì)列
場(chǎng)景:有一個(gè)訂單,15 分鐘內(nèi)如果不支付,就把該訂單設(shè)置為交易關(guān)閉,那么就不能支付了,這類實(shí)現(xiàn)延遲任務(wù)的場(chǎng)景就可以采用延遲隊(duì)列來實(shí)現(xiàn),當(dāng)然除了延遲隊(duì)列來實(shí)現(xiàn),也可以有一些其他方法實(shí)現(xiàn);
1、采用消息中間件
RabbitMQ 本身不支持延遲隊(duì)列,可以使用 TTL 結(jié)合 DLX 的方式來實(shí)現(xiàn)消息的延遲投遞,即把 DLX 跟某個(gè)隊(duì)列綁定,到了指定時(shí)間,消息過期后,就會(huì)從 DLX 路由到這個(gè)隊(duì)列,消費(fèi)者可以從這個(gè)隊(duì)列取走消息?
代碼:正常延遲
???添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)(一機(jī)兩用,正常交換機(jī)和死信交換機(jī))
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.delay.a").build();
}
// 2.定義隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.delay.normal.a")
.ttl(25000) // 過期時(shí)間25秒
.deadLetterExchange("exchange.delay.a") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 死信路由key
.build();
}
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.delay.dlx.a").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingNormal(DirectExchange directExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(directExchange).with("order");
}
@Bean
public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.delay.a", "order", message);
}
}
問題:如果先發(fā)送的消息,消息延遲時(shí)間長(zhǎng),會(huì)影響后面的延遲時(shí)間段的消息的消費(fèi)
解決:不同延遲時(shí)間的消息要發(fā)到不同的隊(duì)列上,同一個(gè)隊(duì)列的消息,它的延遲時(shí)間應(yīng)該一樣
?代碼:解決延遲問題
?添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)(一機(jī)兩用,正常交換機(jī)和死信交換機(jī))
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.delay").build();
}
// 2.定義隊(duì)列
// 正常的訂單隊(duì)列
@Bean
public Queue normalOrderQueue() {
return QueueBuilder.durable("queue.delay.normal.order")
.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 死信路由key
.build();
}
// 正常的支付隊(duì)列
@Bean
public Queue normalPayQueue() {
return QueueBuilder.durable("queue.delay.normal.pay")
.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機(jī)
.deadLetterRoutingKey("error") // 死信路由key
.build();
}
// 死信隊(duì)列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("queue.delay.dlx").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
// 綁定正常的訂單隊(duì)列
@Bean
public Binding bindingNormalOrderQueue(DirectExchange directExchange, Queue normalOrderQueue) {
return BindingBuilder.bind(normalOrderQueue).to(directExchange).with("order");
}
// 綁定正常的支付隊(duì)列
@Bean
public Binding bindingNormalPayQueue(DirectExchange directExchange, Queue normalPayQueue) {
return BindingBuilder.bind(normalPayQueue).to(directExchange).with("pay");
}
// 綁定死信隊(duì)列
@Bean
public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 第一條消息
Message orderMsg = MessageBuilder.withBody("這是一條訂單消息 20秒過期 ".getBytes()).setExpiration("20000").build();
// 第二條消息
Message payMsg = MessageBuilder.withBody("這是一條支付消息 10秒過期 ".getBytes()).setExpiration("10000").build();
rabbitTemplate.convertAndSend("exchange.delay", "order", orderMsg);
System.out.println("訂單消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
rabbitTemplate.convertAndSend("exchange.delay", "pay", payMsg);
System.out.println("支付消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.delay.dlx"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收的消息是: " + msg + "接收的時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
2、使用延遲插件
使用 rabbitmq-delayed-message-exchange 延遲插件
下載
選擇對(duì)應(yīng)的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:
Community Plugins — RabbitMQ
將插件拷貝到 RabbitMQ 服務(wù)器 plugins 目錄下解壓
// 如果 unzip 沒有安裝,先安裝一下
// yum install unzip -y
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
啟用插件
// 開啟插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟 rabbitmq 使其生效(此處也可以不重啟)
消息發(fā)送后不會(huì)直接投遞到隊(duì)列,而是存儲(chǔ)到 Mnesia(嵌入式數(shù)據(jù)庫),檢查 x-delay 時(shí)間(消息頭部);
延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運(yùn)行環(huán)境;
Mnesia 是一個(gè)小型數(shù)據(jù)庫,不適合于大量延遲消息的實(shí)現(xiàn)解決了消息過期時(shí)間不一致出現(xiàn)的問題
代碼實(shí)現(xiàn)
添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public CustomExchange customExchange() {
Map
arguments.put("x-delayed-type", "direct"); // 放一個(gè)參數(shù)
return new CustomExchange("exchange.elay.b", "x-delayed-message", true, false, arguments);
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
return QueueBuilder.durable("queue.delay.b").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingNormalOrderQueue(CustomExchange customExchange, Queue queue) {
// 綁定,
return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 第一條消息
MessageProperties messageProperties1 = new MessageProperties();
messageProperties1.setHeader("x-delay", 25000); // 設(shè)置延遲消息
Message message1 = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties1).build();
// 第二條消息
MessageProperties messageProperties2 = new MessageProperties();
messageProperties2.setHeader("x-delay", 15000); // 設(shè)置延遲消息
Message message2 = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties2).build();
// 發(fā)送消息
rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message1);
System.out.println("訂單消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message2);
System.out.println("支付消息發(fā)送消息時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.delay.b"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收的消息是: " + msg + "接收的時(shí)間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
十、消息 Confirm 模式
消息的 confirm 確認(rèn)機(jī)制,是指生產(chǎn)者投遞消息后,到達(dá)了消息服務(wù)器 Broker 里面的 exchange 交換機(jī),則會(huì)給生產(chǎn)者一個(gè)應(yīng)答,生產(chǎn)者接收到應(yīng)答,用來確定這條消息是否正常的發(fā)送到 Broker 的 exchange 中,這也是消息可靠性投遞的重要保障;
添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.confirm").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
return QueueBuilder.durable("queue.confirm").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingA(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用
public void init() {
// 第一個(gè)參數(shù): 關(guān)聯(lián)數(shù)據(jù)
// 第二個(gè)參數(shù): 是否到達(dá)交換機(jī)
// 第三個(gè)參數(shù): 原因
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
// 打印一下關(guān)聯(lián)數(shù)據(jù)
System.out.println("關(guān)聯(lián)數(shù)據(jù): " + correlationData);
if (ack) {
System.out.println("消息正確到達(dá)交換機(jī)");
}
if (!ack) {
System.out.println("消息沒有到達(dá)交換機(jī),原因: " + cause);
}
});
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)
correlationData.setId("order_123456");
rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);
}
}
十一、消息 Return 模式
rabbitmq 整個(gè)消息投遞的路徑為:
producer —> exchange —> queue —> consumer
消息從 producer 到 exchange 則會(huì)返回一個(gè) confirmCallback消息從 exchange -> queue 投遞失敗則會(huì)返回一個(gè) returnCallback
我們可以利用這兩個(gè) callback 控制消息的可靠性傳遞
添加依賴
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
publisher-returns: true # 開啟return模式
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.return").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
return QueueBuilder.durable("queue.return").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingA(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用
public void init() {
rabbitTemplate.setReturnsCallback(message -> {
System.out.println("消息從交換機(jī)沒有正確的路由到(投遞到)隊(duì)列,原因: " + message.getReplyText());
});
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)
correlationData.setId("order_654321");
// 發(fā)送正確不會(huì)回調(diào),只有發(fā)送錯(cuò)誤才會(huì)回調(diào)
rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);
}
}
十二、交換機(jī)詳細(xì)屬性
Name:交換機(jī)名稱;就是一個(gè)字符串Type:交換機(jī)類型,direct、topic、fanout、headers 四種Durability:持久化,聲明交換機(jī)是否持久化,代表交換機(jī)在服務(wù)器重啟后是否還存在Auto delete:是否自動(dòng)刪除,曾經(jīng)有隊(duì)列綁定到該交換機(jī),后來解綁了,那就會(huì)自動(dòng)刪除該交換機(jī)Internal:內(nèi)部使用的,如果是 yes,客戶端無法直接發(fā)消息到此交換機(jī),他只能用于交換機(jī)與交換機(jī)的綁定(用的很少)Arguments:只有一個(gè)取值 alternate-exchange,表示備用交換機(jī),當(dāng)正常交換機(jī)的消息發(fā)送不到正常隊(duì)列時(shí),消息就會(huì)往備用交換機(jī)里面發(fā)
添加依賴
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
/*
return ExchangeBuilder
.directExchange("exchange.properties.1") // 交換機(jī)名字
.durable(false) // 是否持久化,一般都是持久化
.autoDelete() // 設(shè)置自動(dòng)刪除(當(dāng)隊(duì)列跟他解綁后是否自動(dòng)刪除),一般不是自動(dòng)刪除
.alternate("") // 設(shè)置備用交換機(jī)名字
.build();
*/
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
// 正常交換機(jī)
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.
directExchange("exchange.normal.1")
.alternate("exchange.alternate.1") // 設(shè)置備用交換機(jī)
.build();
}
// 備用交換機(jī)
@Bean
public FanoutExchange alternateExchange() {
return ExchangeBuilder.fanoutExchange("exchange.alternate.1").build();
}
// 2.定義隊(duì)列
// 正常隊(duì)列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("queue.normal.1").build();
}
// 備用隊(duì)列
@Bean
public Queue alternateQueue() {
return QueueBuilder.durable("queue.alternate.1").build();
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
// 正常交換機(jī)與正常隊(duì)列綁定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("info");
}
// 備用交換機(jī)與備用隊(duì)列綁定
@Bean
public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {
return BindingBuilder.bind(alternateQueue).to(alternateExchange);
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
// 發(fā)送正確不會(huì)回調(diào),只有發(fā)送錯(cuò)誤才會(huì)回調(diào)
rabbitTemplate.convertAndSend("exchange.normal.1", "error", message);
}
}
十三、隊(duì)列詳細(xì)屬性
Type:隊(duì)列類型,一般是 ClassicName:隊(duì)列名稱,就是一個(gè)字符串,隨便一個(gè)字符串就可以Durability:聲明隊(duì)列是否持久化,代表隊(duì)列在服務(wù)器重啟后是否還存在Auto delete:是否自動(dòng)刪除,如果為 true,當(dāng)沒有消費(fèi)者連接到這個(gè)隊(duì)列的時(shí)候,隊(duì)列會(huì)自動(dòng)刪除Exclusive:exclusive 屬性的隊(duì)列只對(duì)首次聲明它的連接可見,并且在連接斷開時(shí)自動(dòng)刪除;基本不設(shè)置它,設(shè)置成 falseArguments:隊(duì)列的其他屬性,例如指定 DLX(死信交換機(jī)等)
1. x-expires:Number
當(dāng) Queue(隊(duì)列)在指定的時(shí)間未被訪問,則隊(duì)列將被自動(dòng)刪除
2.?x-message-ttl:Number
發(fā)布的消息在隊(duì)列中存在多長(zhǎng)時(shí)間后被取消(單位毫秒)
3.?x-overflow:String
設(shè)置隊(duì)列溢出行為,當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí),消息會(huì)發(fā)生什么,有效值為?Drop Head?或?Reject Publish
4. x-max-length:Number
隊(duì)列所能容下消息的最大長(zhǎng)度,當(dāng)超出長(zhǎng)度后,新消息將會(huì)覆蓋最前面的消息,類似于Redis的LRU算法
5. x-single-active-consumer:默認(rèn)為false
激活單一的消費(fèi)者,也就是該隊(duì)列只能有一個(gè)消息者消費(fèi)消息
6.?x-max-length-bytes:Number
限定隊(duì)列的最大占用空間,當(dāng)超出后也使用類似于Redis的LRU算法
7.?x-dead-letter-exchange:String
指定隊(duì)列關(guān)聯(lián)的死信交換機(jī),有時(shí)候我們希望當(dāng)隊(duì)列的消息達(dá)到上限后溢出的消息不會(huì)被刪除掉,而是走到另一個(gè)隊(duì)列中保存起來
8. x-dead-letter-routing-key:String
指定死信交換機(jī)的路由鍵,一般和6一起定義
9. x-max-priority:Number
如果將一個(gè)隊(duì)列加上優(yōu)先級(jí)參數(shù),那么該隊(duì)列為優(yōu)先級(jí)隊(duì)列;
(1)、給隊(duì)列加上優(yōu)先級(jí)參數(shù)使其成為優(yōu)先級(jí)隊(duì)列
x-max-priority=10【0-255取值范圍】
(2)、給消息加上優(yōu)先級(jí)屬性
通過優(yōu)先級(jí)特性,將一個(gè)隊(duì)列實(shí)現(xiàn)插隊(duì)消費(fèi)
MessageProperties messageProperties=new MessageProperties();
messageProperties.setPriority(8);
10.?x-queue-mode:String(理解下即可)
隊(duì)列類型x-queue-mode=lazy懶隊(duì)列,在磁盤上盡可能多地保留消息以減少RAM使用,如果未設(shè)置,則隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息
11.?x-queue-master-locator:String(用的較少,不講)
在集群模式下設(shè)置隊(duì)列分配到的主節(jié)點(diǎn)位置信息;
每個(gè)queue都有一個(gè)master節(jié)點(diǎn),所有對(duì)于queue的操作都是事先在master上完成,之后再slave上進(jìn)行相同的操作;
每個(gè)不同的queue可以坐落在不同的集群節(jié)點(diǎn)上,這些queue如果配置了鏡像隊(duì)列,那么會(huì)有1個(gè)master和多個(gè)slave。
基本上所有的操作都落在master上,那么如果這些queues的master都落在個(gè)別的服務(wù)節(jié)點(diǎn)上,而其他的節(jié)點(diǎn)又很空閑,這樣就無法做到負(fù)載均衡,那么勢(shì)必會(huì)影響性能;
關(guān)于master queue host 的分配有幾種策略,可以在queue聲明的時(shí)候使用x-queue-master-locator參數(shù),或者在policy上設(shè)置queue-master-locator,或者直接在rabbitmq的配置文件中定義queue_master_locator,有三種可供選擇的策略:
(1)min-masters:選擇master queue數(shù)最少的那個(gè)服務(wù)節(jié)點(diǎn)host;
(2)client-local:選擇與client相連接的那個(gè)服務(wù)節(jié)點(diǎn)host;
(3)random:隨機(jī)分配;
添加依賴
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式
application.yml 配置文件
spring:
rabbitmq:
host: 192.168.224.133 # ip
port: 5672 # 端口
username: admin # 用戶名
password: 123456 # 密碼
virtual-host: powernode # 虛擬主機(jī)
配置類
@Configuration
public class RabbitConfig {
// 1.定義交換機(jī)
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("exchange.queue.properties").build();
}
// 2.定義隊(duì)列
@Bean
public Queue queue() {
// String name 隊(duì)列名稱
// boolean durable 是否持久化
// boolean exclusive 排他隊(duì)列
// boolean autoDelete 自動(dòng)刪除
// @Nullable Map
return new Queue("queue.properties.1", true, false, false);
}
// 3.交換機(jī)和隊(duì)列進(jìn)行綁定
@Bean
public Binding bindingNormal(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}
發(fā)送消息
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.queue.properties", "info", message);
}
}
接收消息
@Component
public class ReceiveMessage {
@RabbitListener(queues = {"queue.properties.1"})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("接收到的消息為: " + msg);
}
}
十四、消息可靠性投遞
消息的可靠性投遞就是要保證消息投遞過程中每一個(gè)環(huán)節(jié)都要成功,那么這肯定要犧牲一些性能,性能與可靠性是無法兼得的;
如果業(yè)務(wù)實(shí)時(shí)一致性要求不是特別高的場(chǎng)景,可以犧牲一些可靠性來換取性能。
1.代表消息從生產(chǎn)者發(fā)送到 Exchange2.代表消息從 Exchange 路由到 Queue3.代表消息在 Queue 中存儲(chǔ)4.代表消費(fèi)者監(jiān)聽 Queue 并消費(fèi)消息
1、確保消息發(fā)送到 RabbitMQ 服務(wù)器的交換機(jī)上
可能因?yàn)榫W(wǎng)絡(luò)或者 Broker 的問題導(dǎo)致 1 失敗,而此時(shí)應(yīng)該讓生產(chǎn)者知道消息是否正確發(fā)送到了 Broker 的 exchange 中
有兩種解決方案:
第一種是開啟Confirm(確認(rèn))模式;(異步)
第二種是開啟Transaction(事務(wù))模式;(性能低,實(shí)際項(xiàng)目中很少用)
2、確保消息路由到正確的隊(duì)列
可能因?yàn)槁酚申P(guān)鍵字錯(cuò)誤,或者隊(duì)列不存在,或者隊(duì)列名稱錯(cuò)誤導(dǎo)致②失敗。
使用return模式,可以實(shí)現(xiàn)消息無法路由的時(shí)候返回給生產(chǎn)者;
當(dāng)然在實(shí)際生產(chǎn)環(huán)境下,我們不會(huì)出現(xiàn)這種問題,我們都會(huì)進(jìn)行嚴(yán)格測(cè)試才會(huì)上線(很少有這種問題);
另一種方式就是使用備份交換機(jī)(alternate-exchange),無法路由的消息會(huì)發(fā)送到這個(gè)備用交換機(jī)上
3、確保消息在隊(duì)列正確地存儲(chǔ)
可能因?yàn)橄到y(tǒng)宕機(jī)、重啟、關(guān)閉等等情況導(dǎo)致存儲(chǔ)在隊(duì)列的消息丟失,即 3 出現(xiàn)問題;
解決方案:
隊(duì)列持久化
QueueBuilder.durable(QUEUE).build();
交換機(jī)持久化
ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
消息持久化
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 默認(rèn)就是持久化的
集群,鏡像隊(duì)列,高可用
確保消息從隊(duì)列正確地投遞到消費(fèi)者
采用消息消費(fèi)時(shí)的手動(dòng)ack確認(rèn)機(jī)制來保證;
如果消費(fèi)者收到消息后未來得及處理即發(fā)生異常,或者處理過程中發(fā)生異常,會(huì)導(dǎo)致④失敗。
為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement);
#開啟手動(dòng)ack消息消費(fèi)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費(fèi)者在訂閱隊(duì)列時(shí),通過上面的配置,不自動(dòng)確認(rèn),采用手動(dòng)確認(rèn),RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)后才從隊(duì)列中刪除消息;
如果消息消費(fèi)失敗,也可以調(diào)用basicReject()或者basicNack()來拒絕當(dāng)前消息而不是確認(rèn)。如果requeue參數(shù)設(shè)置為true,可以把這條消息重新存入隊(duì)列,以便發(fā)給下一個(gè)消費(fèi)者(當(dāng)然,只有一個(gè)消費(fèi)者的時(shí)候,這種方式可能會(huì)出現(xiàn)無限循環(huán)重復(fù)消費(fèi)的情況,可以投遞到新的隊(duì)列中,或者只打印異常日志);
十五、消息的冪等性
消息消費(fèi)時(shí)的冪等性(消息不被重復(fù)消費(fèi))
同一個(gè)消息,第一次接收,正常處理業(yè)務(wù),如果該消息第二次再接收,那就不能再處理業(yè)務(wù),否則就處理重復(fù)了
冪等性是:對(duì)于一個(gè)資源,不管你請(qǐng)求一次還是請(qǐng)求多次,對(duì)該資源本身造成的影響應(yīng)該是相同的,不能因?yàn)橹貜?fù)的請(qǐng)求而對(duì)該資源重復(fù)造成影響;
以接口冪等性舉例:
接口冪等性是指:一個(gè)接口用同樣的參數(shù)反復(fù)調(diào)用,不會(huì)造成業(yè)務(wù)錯(cuò)誤,那么這個(gè)接口就是具有冪等性的,比如:注冊(cè)接口、發(fā)送短信驗(yàn)證碼接口;
比如同一個(gè)訂單我支付兩次,但是只會(huì)扣款一次,第二次支付不會(huì)扣款,這說明這個(gè)支付接口是具有冪等性的
如何避免消息的重復(fù)消費(fèi)問題?(消息消費(fèi)時(shí)d額冪等性)
全局唯一 ID + Redis
生產(chǎn)者在發(fā)送消息時(shí),為每條消息設(shè)置一個(gè)全局唯一的 messageId,消費(fèi)者拿到消息后,使用setnx 命令,將 messageId 作為 key 放到 redis 中:setnx(messageId, 1),若返回1,說明之前沒有消費(fèi)過,正常消費(fèi);若返回0,說明這條消息之前已消費(fèi)過,拋棄;
參考代碼
//1、把消息的唯一ID寫入redis
boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中該key不存在,那么就設(shè)置,存在就不設(shè)置
if (flag) { //key不存在返回true
//相當(dāng)于是第一次消費(fèi)該消息
//TODO 處理業(yè)務(wù)
System.out.println("正常處理業(yè)務(wù)....." + orders.getId());
}
柚子快報(bào)激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記
好文鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。