柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMq學(xué)習(xí)
Springboot整合RabbitMq
使用步驟
1、引入spring-boot-starter-amqp的依賴,并配置host主機(jī)地址、port端口、virtualHost虛擬主機(jī)、用戶名、密碼等
2、聲明交換機(jī)、隊(duì)列、交換機(jī)與隊(duì)列的綁定關(guān)系
3、使用RabbitTemplate的convertAndSend方法將消息發(fā)送給交換機(jī),交換機(jī)收到消息后,路由給所綁定的隊(duì)列
4、消費(fèi)者使用@RabbitListener注解方法監(jiān)聽(tīng)隊(duì)列,當(dāng)收到消息時(shí),回調(diào)此注解方法
補(bǔ)充(詳細(xì))
1、虛擬主機(jī)有什么用?
用于數(shù)據(jù)隔離,不同項(xiàng)目可以創(chuàng)建不同的虛擬主機(jī)。但是配置時(shí),需要有對(duì)應(yīng)虛擬主機(jī)權(quán)限的用戶才可以使用指定的虛擬主機(jī)
用于數(shù)據(jù)隔離,不同項(xiàng)目可以創(chuàng)建不同的虛擬主機(jī)。但是配置時(shí),需要有對(duì)應(yīng)虛擬主機(jī)權(quán)限的用戶才可以使用指定的虛擬主機(jī)
2、當(dāng)在rabbitmq中已經(jīng)在web管理后臺(tái),通過(guò)手動(dòng)的方式創(chuàng)建了隊(duì)列,交換機(jī)時(shí),啟動(dòng)項(xiàng)目時(shí)又去聲明隊(duì)列,交換機(jī)時(shí),然后項(xiàng)目停了,然后又去啟動(dòng)項(xiàng)目又去聲明隊(duì)列交換機(jī)時(shí)?;蛘吆竺鎲?dòng)項(xiàng)目時(shí),聲明的交換機(jī)類(lèi)型改變時(shí)?原來(lái)已經(jīng)存在的交換機(jī)或隊(duì)列是否會(huì)被刪除?原來(lái)里面已經(jīng)存在的消息是否會(huì)被清空掉?
如果消息代理中已經(jīng)存在對(duì)應(yīng)名稱的交換機(jī),那么如果修改了代碼中此交換機(jī)類(lèi)型的定義,那么啟動(dòng)的時(shí)候就會(huì)報(bào)錯(cuò)。
3、rabbitTemplate的convertAndSend發(fā)送消息時(shí),可以不指定交換機(jī),直接發(fā)給隊(duì)列(會(huì)使用默認(rèn)的交換機(jī),默認(rèn)的交換機(jī)就是根據(jù)消息發(fā)送時(shí)所指定的routekey找到與此routeKey名稱相同的消息隊(duì)列)
4、rabbitTemplate的convertAndSend發(fā)送消息時(shí),可以指定交換機(jī),交換機(jī)會(huì)根據(jù)當(dāng)前交換機(jī)類(lèi)型和此交換機(jī)的綁定關(guān)系將消息路由給對(duì)應(yīng)綁定的隊(duì)列
5、rabbitTemplate的convertAndSend發(fā)送不同的java類(lèi)型消息,可以指定消息轉(zhuǎn)換器。在使用@RabbitListener注解方法監(jiān)聽(tīng)消息時(shí),聲明發(fā)送的java類(lèi)型即可
6、在同一項(xiàng)目中可以使用@RabbitmListener注解的1個(gè)方法來(lái)監(jiān)聽(tīng)指定的多個(gè)消息隊(duì)列。
7、在同一項(xiàng)目中的多個(gè)方法都使用了@RabbitListener注解,并且這幾個(gè)方法監(jiān)聽(tīng)的消息隊(duì)列中有相同的,那么當(dāng)這些相同消息隊(duì)列中收到消息時(shí),會(huì)負(fù)載均衡的交給這幾個(gè)方法處理(也就是1個(gè)消息只會(huì)給到其中1個(gè)方法處理),這就是work queues工作隊(duì)列模式。
8、在同一項(xiàng)目中的多個(gè)方法都使用了@RabbitListener注解,并且這幾個(gè)方法監(jiān)聽(tīng)的消息隊(duì)列中有相同的,即使這幾個(gè)方法中處理的效率有高有低(故意在其中某個(gè)方法中睡它10s,這個(gè)方法的處理效率就低了),但是他們收到的消息數(shù)量仍然是按負(fù)載均衡分發(fā)的。那么肯定要解決這個(gè)問(wèn)題,因此可以加上配置:spring.rabbit.listener.simple.prefetch=1,意思就是消費(fèi)者每次拉取1條消息,這條消息處理完成之后,消息代理才會(huì)將下1條消息發(fā)過(guò)來(lái),這樣就不是按照負(fù)載均衡的方式發(fā)給這多個(gè)方法了,而是能者多勞。
9、在不同的項(xiàng)目中都監(jiān)聽(tīng)了同1個(gè)消息隊(duì)列,本來(lái)這個(gè)隊(duì)列中有消息時(shí),是按負(fù)載均衡1個(gè)消費(fèi)者1個(gè)來(lái)輪流發(fā),但是如果其中某個(gè)消費(fèi)者掛了,剩余的消息是否還按輪流來(lái)?當(dāng)它再次上線,是否繼續(xù)讓它輪流來(lái)?
10、在同一隊(duì)列上有多個(gè)消費(fèi)者在監(jiān)聽(tīng),當(dāng)消息發(fā)給某個(gè)消費(fèi)者時(shí),這個(gè)消費(fèi)者在處理時(shí)發(fā)生了異常,這個(gè)消息會(huì)被忽略掉?還是交給其它消費(fèi)者?默認(rèn)會(huì)被忽略掉
11、@RabbitListener可以標(biāo)注在方法上,指定需要監(jiān)聽(tīng)的消息隊(duì)列(可指定多個(gè)消息隊(duì)列),然后在方法參數(shù)位置上聲明所要處理的消息類(lèi)型(消息的發(fā)送者所發(fā)送的類(lèi)型)。
12、@RabbitListener可以標(biāo)注在類(lèi)上,然后在這個(gè)類(lèi)中的某個(gè)方法上使用@RabbitHandler注解,并在這個(gè) 方法的方法參數(shù)位置聲明byte[] data來(lái)接收消息數(shù)據(jù)。
13、1個(gè)消息隊(duì)列有多個(gè)消費(fèi)者在監(jiān)聽(tīng),每個(gè)消息都會(huì)只發(fā)給其中某1個(gè)消費(fèi)者處理,這樣的模型叫工作隊(duì)列模式
14、交換機(jī)類(lèi)型有默認(rèn)交換機(jī)類(lèi)型、Fanout類(lèi)型、Direct類(lèi)型、Topic類(lèi)型。
Fanout類(lèi)型交換機(jī):1個(gè)Fanout類(lèi)型交換機(jī)可以綁定多個(gè)消息隊(duì)列,當(dāng)Fanout類(lèi)型收到1個(gè)消息時(shí),會(huì)直接發(fā)給所綁定的每1個(gè)消息隊(duì)列 Direct類(lèi)型,1個(gè)Direct類(lèi)型交換機(jī)可以綁定多個(gè)消息隊(duì)列,每綁定1個(gè)消息隊(duì)列時(shí),需要指定對(duì)應(yīng)的路由key(routeKey),當(dāng)Direct類(lèi)型交換機(jī)收到1個(gè)消息時(shí),會(huì)根據(jù)消息發(fā)送時(shí)所指定的路由key發(fā)送給routeKey完全匹配到的消息隊(duì)列, Topic類(lèi)型,1個(gè)Topic類(lèi)型交換機(jī)可以綁定多個(gè)消息隊(duì)列,每綁定1個(gè)消息隊(duì)列時(shí),需要指定對(duì)應(yīng)的通配符(#代指0個(gè)或多個(gè)單詞和*代指1個(gè)單詞),當(dāng)Topic類(lèi)型交換機(jī)收到1個(gè)消息時(shí),會(huì)根據(jù)消息發(fā)送時(shí)所指定的路由key發(fā)送給routeKey通配匹配到的消息隊(duì)列,
15、Spring Amqp提供了聲明隊(duì)列、交換機(jī)、隊(duì)列和交換機(jī)綁定關(guān)系的類(lèi)。在sprigboot中如何使用呢?只需要將它們以bean的形式定義出來(lái),并且項(xiàng)目中必須至少使用了1個(gè)@RabbitListener注解,那么springboot會(huì)幫助我們?cè)趓abbitmq消息代理服務(wù)器中創(chuàng)建對(duì)應(yīng)的隊(duì)列、交換機(jī)、隊(duì)列和交換機(jī)綁定關(guān)系(注意前提:要想以@bean的方式創(chuàng)建隊(duì)列和交換機(jī),必須至少有一個(gè)監(jiān)聽(tīng)者@RabbitListener,否則即使聲明了Queue、Exchange、Binding這些bean,也不會(huì)創(chuàng)建成功的隊(duì)列和交換機(jī)的)
可以使用QueueBuilder來(lái)聲明隊(duì)列
可考慮定義為持久當(dāng)項(xiàng)目重啟過(guò)程中時(shí),原來(lái)已存在的隊(duì)列仍然能正常收到消息,并且這些消息能正常消費(fèi),不會(huì)被刪除或清空 可以使用ExchangeBuilder來(lái)聲明交換機(jī)
不同的交換機(jī)類(lèi)型有不同的實(shí)現(xiàn)類(lèi)如果在項(xiàng)目啟動(dòng)前就已經(jīng)存在了該名稱的交換機(jī),并且類(lèi)型相同,那么就不會(huì)創(chuàng)建。如果已經(jīng)存在了該名稱的交換機(jī),但是現(xiàn)在項(xiàng)目代碼中又把這個(gè)類(lèi)型改成了其它類(lèi)型,那么創(chuàng)建此交換機(jī)不會(huì)成功,僅會(huì)打印錯(cuò)誤創(chuàng)建的日志,不影響啟動(dòng) 可以使用BindingBuilder來(lái)聲明隊(duì)列和交換機(jī)的綁定關(guān)系
聲明代碼示例
@Configuration
public class QueueConfig {
@Bean
public Queue queue3() {
return QueueBuilder.durable("direct.queue3").build();
}
@Bean
public Exchange exchange3() {
return ExchangeBuilder.directExchange("direct.exchange3").build();
}
@Bean
public Binding binding3() {
return BindingBuilder.bind(queue3()).to(exchange3()).with("A3").noargs();
}
}
16、Spring Amqp還可以使用@RabbitListener注解來(lái)聲明隊(duì)列和交換機(jī)(這個(gè)不需要前提,直接如下聲明就會(huì)創(chuàng)建),如下聲明會(huì)創(chuàng)建對(duì)應(yīng)的消息隊(duì)列,交換機(jī),交換機(jī)和隊(duì)列的綁定,并且當(dāng)消息隊(duì)列中有消息時(shí),被注解的方法將會(huì)被回調(diào)
@RabbitListener(bindings = {
@QueueBinding(
// 隊(duì)列是否持久: 當(dāng)消息代理重啟時(shí), 非持舊隊(duì)列將會(huì)干掉了。不設(shè)置時(shí),默認(rèn)是持久的。
value = @Queue(value = "direct.queue1",durable = "true"),
// 交換機(jī)是否持久: 當(dāng)消息代理重啟時(shí), 非持久隊(duì)列將會(huì)干掉了。
// 不設(shè)置時(shí),默認(rèn)是持久的。
// 默認(rèn)就是DIRECT類(lèi)型交換機(jī)
exchange = @Exchange(value = "direct.exchange1",type = ExchangeTypes.DIRECT),
// 當(dāng)發(fā)送到direct.exchange1交換機(jī)的消息時(shí),所指定的routeKey是red或者是blue時(shí),由此方法處理
key = {"red","blue"}
)
})
public void listenQueue(AddUser addUser) {
// ...
}
17、使用rabbitmqTemplate#convert(exchange, routeKey, object)發(fā)送消息時(shí),所發(fā)送消息的類(lèi)型是Object類(lèi)型。默認(rèn)支持的類(lèi)型是Message類(lèi)型,它有2個(gè)屬性byte[] body和MessageProperties messageProperties。
如果發(fā)送的消息的類(lèi)型不是Message類(lèi)型,那么會(huì)使用1個(gè)消息轉(zhuǎn)換器(org.springframework.amqp.support.converter.MessageConverter),將object對(duì)象轉(zhuǎn)換為Message對(duì)象。
在spring amqp中默認(rèn)使用的是SimpleMessageConverter對(duì)此消息作序列化處理,它會(huì)在object是byte[]時(shí),直接就創(chuàng)建Message,是String時(shí),直接獲取字符串轉(zhuǎn)為utf8編碼的字節(jié),實(shí)現(xiàn)了Serializable時(shí),使用jkd的序列化機(jī)制轉(zhuǎn)為byte[],在這3種情況下,都會(huì)設(shè)置MessageProperties#setContentType為對(duì)應(yīng)的內(nèi)容類(lèi)型。在rabbitmq的web后臺(tái)管理頁(yè)面看到的是字節(jié)數(shù)組轉(zhuǎn)base64字符串的形式。在反序列化時(shí),也會(huì)使用對(duì)應(yīng)的逆向方式去作反序列化。
可以使用jackson序列化的方式,引入jackson的依賴com.fasterxml.jackson.core的jackson-databind,然后定義org.springframework.amqp.support.converter.Jackson2JsonMessageConverter的bean即可,它會(huì)自動(dòng)生效的。也可以直接創(chuàng)建它,然后將它直接設(shè)置給RabbitTemplate就行了
18、docker安裝rabbitmq
docker run \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management
19、在集群模式下如何保證使用@RabbitListener標(biāo)注的方法只存在1個(gè)消費(fèi)者呢?只是在聲明隊(duì)列的時(shí)候的參數(shù)(arguments參數(shù)),將x-single-active-consumer設(shè)置為T(mén)rue即可。參考:RabbitMQ多消費(fèi)者實(shí)例時(shí),保證只有一個(gè)消費(fèi)者進(jìn)行消費(fèi)(單活消費(fèi)者模式)。
如果要保證消息順序,可以這樣考慮:可以通過(guò)添加多個(gè)消息隊(duì)列,每個(gè)消息隊(duì)列只允許1個(gè)消費(fèi)者,并且開(kāi)啟手動(dòng)確認(rèn),并且設(shè)置prefetch為1,也就是每個(gè)隊(duì)列每次拉取1條消息,當(dāng)手動(dòng)確認(rèn)完了這條消息,再來(lái)處下1條消息,來(lái)保證處理消息的順序。但這樣會(huì)大大限制并發(fā)能力,可以將同1批要保證順序的消息按順序的發(fā)往同1個(gè)消息隊(duì)列,這可以通過(guò)對(duì)這1批消息通過(guò)某種哈希運(yùn)算得出相同消息隊(duì)列id,只要消息的發(fā)送是按順序的,那么消息的消費(fèi)就是按順序的,并且增加多個(gè)隊(duì)列就相當(dāng)于在增加并發(fā)度
20、消息可靠性,消息從發(fā)送者到mq,再?gòu)膍q到消費(fèi)者,其中每個(gè)環(huán)節(jié)都可能發(fā)生問(wèn)題。
發(fā)送者的可靠性,這部分的可靠性由生產(chǎn)者重連機(jī)制和生產(chǎn)者確認(rèn)機(jī)制來(lái)保證。
生產(chǎn)者重連:由于網(wǎng)絡(luò)波動(dòng)的存在,可能會(huì)出現(xiàn)客戶端連接mq失敗的情況。spring amqp提供了開(kāi)啟連接失敗后的重連機(jī)制(注意:這不是消息發(fā)送失敗的重試機(jī)制,是連接失敗的重試機(jī)制)。但是,這個(gè)重連是阻塞式的重試,在多次重試等待的過(guò)程中,當(dāng)前線程是被阻塞的,因此如果對(duì)業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,就要合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然,也可以考慮使用異步線程來(lái)執(zhí)行發(fā)送消息的代碼 (注意:這里說(shuō)的重連是項(xiàng)目啟動(dòng)之后,使用rabbitmqTemplate發(fā)送消息時(shí),肯定需要連接mq,是這個(gè)時(shí)候的重連,不是指的,項(xiàng)目啟動(dòng)時(shí)的重連。并且使用rabbitmqTemplate發(fā)送消息的下1行代碼在重連期間是不會(huì)執(zhí)行的,當(dāng)在試完最大嘗試次數(shù)還沒(méi)有連接成功后,就會(huì)在當(dāng)前線程拋出異常,下1行代碼不會(huì)執(zhí)行了) spring:
rabbitmq:
host: xxx.xx.xx.xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xxxxxx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
生產(chǎn)者確認(rèn):rabbitmq提供了Publisher Confiirm和Publisher Return這2種確認(rèn)機(jī)制。開(kāi)啟確認(rèn)機(jī)制后,在mq成功收到消息后,會(huì)返回確認(rèn)消息給生產(chǎn)者。返回的額結(jié)果有以下幾種情況:
消息投遞到了mq,但是路由失敗。此時(shí)會(huì)通過(guò)publisher Return返回路由異常原因,然后返回ack,告知投遞成功臨時(shí)消息投遞到了mq,并且入隊(duì)成功,返回ack,告知投遞成功持久消息投遞到了mq,并且入隊(duì)完成持久化,返回ack告知投遞成功其它情況都會(huì)返回nack,告知投遞失敗 配置如下: spring:
rabbitmq:
host: xxx.xx.xx.xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xxxxxx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟確認(rèn)機(jī)制(注意開(kāi)啟確認(rèn)機(jī)制后,對(duì)效率有所影響的哦,
# 演示發(fā)送大量消息時(shí),建議關(guān)掉)
publisher-returns: true # 開(kāi)啟確認(rèn)機(jī)制
設(shè)置confirmCallback和returnCallback @Slf4j
@Configuration
public class RabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void postProcessTemplate() {
// 設(shè)置returnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/* 1. 當(dāng)發(fā)送消息給mq的交換機(jī), 并且指定1個(gè)在此交換機(jī)上不存在的綁定關(guān)系的routeKey時(shí), 此方法會(huì)被回調(diào)
2. 當(dāng)發(fā)送消息給mq的交換機(jī), 并且交換機(jī)能夠根據(jù)綁定關(guān)系將消息路由到隊(duì)列時(shí), 這個(gè)方法是不會(huì)回調(diào)的
*/
@Override
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey) {
log.info("統(tǒng)一收到returnedMessage, message: {}, replayCode:{}, replyText: {}," +
"exchange: {}, routeKey:{}",
message, replyCode, replyText, exchange, routingKey);
}
});
// 設(shè)置confirmCallback
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/* 1. 當(dāng)發(fā)送消息給指定的交換機(jī), mq交換機(jī)收到消息時(shí), 會(huì)回調(diào)此方法, 并且傳過(guò)來(lái)的ack為true
(無(wú)論此交換機(jī)后面是否能將此消息路由到隊(duì)列, 都會(huì)回調(diào)此方法)
2. 當(dāng)發(fā)送消息給1個(gè)不存在交換機(jī)時(shí), 會(huì)回調(diào)此方法, 傳過(guò)來(lái)的ack為false
3. 當(dāng)發(fā)送1個(gè)消息時(shí), 此時(shí)斷網(wǎng)的情況下, 經(jīng)過(guò)一小段時(shí)間后, 會(huì)回調(diào)此方法, 傳過(guò)來(lái)的ack為false
*/
@Override
public void confirm(CorrelationData correlationData, // 能夠從此對(duì)象中拿到發(fā)送消息時(shí)的信息
boolean ack,
String cause) {
log.info("統(tǒng)一收到回執(zhí), ack: {}, correlationData: {}, cause: {}",
ack, correlationData, cause);
}
});
log.info("已設(shè)置confirmCallback和returnCallback");
}
}
發(fā)送消息 @RequestMapping("rabbit")
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendToMq2")
public Object sendToMq2(String content, String targetExchage, String routeKey) {
HashMap
data.put("content", content);
rabbitTemplate.convertAndSend(targetExchage, routeKey, data);
return "ok";
}
@GetMapping("sendToMq3")
public Object sendToMq3(String content, String targetExchage, String routeKey) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
correlationData
.getFuture()
.addCallback(new ListenableFutureCallback
@Override
public void onFailure(Throwable ex) {
log.info("失敗...");
}
/* 接收到回執(zhí)時(shí), 該方法觸發(fā)。
*/
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.info("接收到回執(zhí), 是否ack: {}, 原因: {}",
result.isAck(), result.getReason());
}
});
HashMap
data.put("content", content);
rabbitTemplate.convertAndSend(targetExchage, routeKey, data, correlationData);
return "ok";
}
}
上面我用的spring-boot-starter-amqp版本是2.1.8.RELEASE,在2.7.12版本中,配置有所不同,應(yīng)如下配置: spring:
rabbitmq:
publisher-confirm_type: correlated # 開(kāi)啟publisher confirm機(jī)制, 并設(shè)置confirm類(lèi)型
# publisher-confirm-type有3中模式可選
# - none 關(guān)閉confirm機(jī)制
# - simple 同步阻塞等待mq的繪制消息
# - correlated 異步回調(diào)方式返回回執(zhí)消息
publisher-returns: true # 開(kāi)啟publisher return機(jī)制
其中,當(dāng)publisher-confirm_type: simple時(shí),發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì)關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker; mq的可靠性:在默認(rèn)情況下,rabbitmq會(huì)將收到的消息保存到內(nèi)存中以降低消息收發(fā)的延遲。這樣會(huì)導(dǎo)致2個(gè)問(wèn)題。1個(gè)是:一旦mq宕機(jī),內(nèi)存中的消息會(huì)丟失。第二個(gè)是:內(nèi)存空間有限,當(dāng)消費(fèi)者故障或處理過(guò)慢時(shí),會(huì)導(dǎo)致消息積壓,引發(fā)mq阻塞。
數(shù)據(jù)持久化
交換機(jī)持久化(如果是非持久化,當(dāng)mq服務(wù)器重啟時(shí),會(huì)丟失;spring已設(shè)置默認(rèn)持久化,通過(guò)durable屬性設(shè)置)隊(duì)列持久化 (如果是非持久化,當(dāng)mq服務(wù)器重啟時(shí),會(huì)丟失;spring已設(shè)置默認(rèn)持久化,通過(guò)durable屬性設(shè)置)消息持久化
如果是非持久化,當(dāng)mq服務(wù)器重啟時(shí),會(huì)丟失;需要發(fā)送消息時(shí)設(shè)置deliveryMode,1為非持久化,2為持久化,可以參考MessageProperties中的deliveryMode屬性中使用的MessageDeliveryMode這個(gè)枚舉類(lèi),默認(rèn)是持久化的??梢允褂胷abbitTemplate#convertAndSend發(fā)送時(shí),指定MessagePostProcessor消息后置處理器,從Message對(duì)象中拿到MessageProperties,然后設(shè)置MessageProperties的deliveryMode屬性)也可以使用MessageBuilder這個(gè)構(gòu)建者來(lái)構(gòu)建消息 LazyQueue:從RabbitMO的3.6.0版本開(kāi)始,就增加了Lazy Queue的概念,也就是惰性隊(duì)列。性能較之前有很大提升
惰性隊(duì)列的特征如下
接收到消息后直接存入磁盤(pán)而非內(nèi)存(內(nèi)存中只保留最近的消息,默認(rèn)2048條)消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)在3.12版本后,所有隊(duì)列都是Lazy Queue模式,無(wú)法更改。 如何創(chuàng)建惰性隊(duì)列
在rabbitmq后臺(tái)管理頁(yè),在聲明隊(duì)列時(shí),指定Aruguements參數(shù)中,添加x-queue-mode為lazy即可 代碼的方式 @Bean
public Queue queue4() {
return QueueBuilder
.durable("direct.queue4")
// .lazy() // 需要2.2版本以上才有直接設(shè)置lazy的方法,不過(guò)沒(méi)事,用下面的也是一樣的
.withArgument("x-queue-mode","lazy")
.build();
}
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue2",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void lazyQueue2(String msg) {
log.info("消費(fèi)消息: {}", msg);
}
rabbitmq如何保證消息的可靠性?
首先通過(guò)配置可以讓交換機(jī)、隊(duì)列、以及發(fā)送的消息都持久化。這樣隊(duì)列中的消息會(huì)持久化到磁盤(pán),MQ重啟消息依然存在RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后會(huì)稱為隊(duì)列的默認(rèn)模式。LazyQueue會(huì)將所有消息都持久化,并且性能有很大提升(測(cè)試1000000條,19s就完成了,不會(huì)出現(xiàn)page out,并且不會(huì)阻塞mq接收新的消息)。開(kāi)啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMO只有在消息持久化完成后才會(huì)給生產(chǎn)者返回ACK回執(zhí) 消費(fèi)者的可靠性
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)。回執(zhí)有三種可選值:
ack:成功處理消息,RabbitMO從隊(duì)列中刪除該消息nack:消息處理失敗,RabbitMo需要再次投遞消息reject:消息處理失敗并拒絕該消息,RabbitMO從隊(duì)列中刪除該消息 SpringAMQP已經(jīng)實(shí)現(xiàn)了消息確認(rèn)功能。并允許我們通過(guò)配置文件選擇ACK處理方式(通過(guò):spring.rabbitmq.listener.simple.acknowledge-mode屬性來(lái)配置),有如下三種方式
none: 不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從Mq刪除,不管監(jiān)聽(tīng)方法是否出現(xiàn)異常。 manual: 手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,可以捕獲異??刂浦卦嚧螖?shù),甚至可以控制失敗消息的處理方式,存在業(yè)務(wù)入侵,但更靈活
代碼示例 @Component
@RabbitListener(queues = "test.queue1")
public class MessageConsumer {
@RabbitHandler
public void recivedMessage(Message msg,
OrderReturnApplyEntity orderReturnApplyEntity,
Channel channel) throws IOException {
try {
System.out.println("接收到消息:" + msg);
int i = 1 / 0;
// 確認(rèn)收到消息,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
// 當(dāng)前作為mq的消費(fèi)端有1個(gè)consumeTag的消費(fèi)者標(biāo)識(shí), mq每次發(fā)送消息時(shí)都會(huì)標(biāo)識(shí)這條消息的deliveryTag, 這個(gè)投遞標(biāo)識(shí)會(huì)遞增。
// 當(dāng)消費(fèi)者斷開(kāi)連接后, 又連接上了mq, 此時(shí)devliveryTag會(huì)從1開(kāi)始繼續(xù)遞增
// 所以如果要唯一標(biāo)識(shí)消息的話, 就要在發(fā)送消息的時(shí)候, 指定correlationData的id
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (msg.getMessageProperties().getRedelivered()) {
System.out.println("消息重試后依然失敗,拒絕再次接收");
// 拒絕消息,不再重新入隊(duì)
// (如果綁定了死信隊(duì)列消息會(huì)進(jìn)入死信隊(duì)列,沒(méi)有綁定死信隊(duì)列則消息被丟棄,
// 也可以把失敗消息記錄到redis或者mysql中),也可以設(shè)置為true再重試。
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息消費(fèi)時(shí)出現(xiàn)異常,即將再次返回隊(duì)列處理");
// Nack消息,重新入隊(duì)(重試一次)參數(shù)二表示是否批量,參數(shù)三表示是否重新入隊(duì)列
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),
false, true);
}
log.error("處理消息發(fā)生錯(cuò)誤: {}", e);
}
}
}
配置如下: server:
port: 8081
spring:
rabbitmq:
host: 119.23.61.24
port: 5672
virtual-host: /demo-vh
username: guest
password: 17E821zj
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener:
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: munual # 消費(fèi)者確認(rèn)機(jī)制為手動(dòng)
問(wèn)題1:假設(shè)已經(jīng)配置為manual手動(dòng)確認(rèn)模式,在recivedMessage方法中,忘記basicAck或者basicReject或者basicNack, 會(huì)怎么樣? 測(cè)試步驟:在rabbitmq后臺(tái)手動(dòng)發(fā)1條數(shù)據(jù),到test.queue1隊(duì)列中,在recivedMessage處理消息的方法中,聲明該消息,但就是不去(basicAck或者basicReject或者basicNack)。發(fā)現(xiàn),消息處理方法收到了1次消息,因此方法只調(diào)用了1次,在rabbitmq的web后臺(tái)該消息一直處于unacked狀態(tài)。此時(shí),關(guān)閉消費(fèi)者服務(wù),在rabbitmq的web后臺(tái)該消息處于ready狀態(tài),即待投遞。然后,再次啟動(dòng)消費(fèi)者,此消息又投遞了過(guò)來(lái),消費(fèi)者方法又調(diào)用了1次,此時(shí)再以同樣的配置和代碼啟動(dòng)另外1個(gè)消費(fèi)者,這個(gè)新啟動(dòng)的消費(fèi)者沒(méi)有收到這個(gè)消息(說(shuō)明它不會(huì)將已投遞但未確認(rèn)的消息投遞給這個(gè)新的消費(fèi)者)。然后將原來(lái)的消費(fèi)者停掉,此時(shí)發(fā)現(xiàn)新啟動(dòng)的消費(fèi)者立刻收到了這條消息,不過(guò)消息仍處于unacked狀態(tài)。這證明這個(gè)消息在發(fā)送給1個(gè)消費(fèi)者之后,會(huì)等待消費(fèi)者的回執(zhí),如果消費(fèi)者遲遲不給回執(zhí),那就一直等,直到這個(gè)消費(fèi)者掛了,消息才會(huì)變?yōu)閞eady待投遞狀態(tài),才會(huì)投遞給其它的消費(fèi)者。 測(cè)試2:使用basicAck確認(rèn)收到消息后,消息將從隊(duì)列中刪除。如下代碼測(cè)試,當(dāng)收到消息時(shí),使用basicAck(消息投遞標(biāo)記,是否批量確認(rèn)),批量確認(rèn)指的是,將deliveryTag小于當(dāng)前消息投遞標(biāo)記的消息一并確認(rèn),這樣broker就會(huì)清理掉之前未確認(rèn)的消息,這可以適用于某些情況:既然最后面的消息都確認(rèn)了,之前的消息確不確認(rèn)也就沒(méi)啥關(guān)系的情況。 @Slf4j
@Configuration
public class RabbitConfig {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2",durable = "true"),
exchange = @Exchange(value = "direct.exchange2",
type = ExchangeTypes.FANOUT),
key = {"red","blue"}
)
})
public void listenQueue(Message message, String msg, Channel channel) {
log.info("收到消息=====================");
log.info("channel:{}", channel);
log.info("msg:{}", msg);
log.info("message:{},", new String(message.getBody()));
// receivedDeliveryMode-是否持久化的消息,
// redelivered-是否重新投遞的消息,
// receivedRoutingKey-路由key,
// deliveryTag-投遞唯一標(biāo)記(從1開(kāi)始遞增, 每次消費(fèi)者重啟后, 繼續(xù)從1開(kāi)始)
// consumerTag-當(dāng)前消費(fèi)者唯一標(biāo)記(每個(gè)消費(fèi)者都有自己的唯一標(biāo)記,每次消費(fèi)者重連后,生成新的標(biāo)記)
// consumerQueue-當(dāng)前消費(fèi)者收到消息的隊(duì)列
log.info("messageProperties:{}", message.getMessageProperties());
// deliveryTag-投遞標(biāo)記(broker用于標(biāo)記此消息),
// multiple-是否批量確認(rèn)(批量確認(rèn)會(huì)讓broker將小于當(dāng)前消息的deliveryTag的消息給確認(rèn)掉刪了)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("處理結(jié)束=====================");
}
測(cè)試3:使用basicNack(deliveryTag, mulitiple, requeue) 拒絕簽收該消息,第3個(gè)參數(shù)決定是否讓消息重新回到隊(duì)列,如果消息回到隊(duì)列后,重新變?yōu)閞eady待投遞狀態(tài),會(huì)選擇消費(fèi)者再次進(jìn)行進(jìn)行投遞。如果不回到隊(duì)列,那么broker將會(huì)刪除此消息,但是如果此隊(duì)列還綁定了死信交換機(jī),那么此消息將會(huì)發(fā)給死信交換機(jī)。 @Slf4j
@Configuration
public class RabbitConfig {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2",durable = "true"),
exchange = @Exchange(value = "direct.exchange2",
type = ExchangeTypes.FANOUT),
key = {"red","blue"}
)
})
public void listenQueue(Message message, String msg, Channel channel) {
log.info("收到消息=====================");
log.info("channel:{}", channel);
log.info("msg:{}", msg);
log.info("message:{},", new String(message.getBody()));
// receivedDeliveryMode-是否持久化的消息,
// redelivered-是否重新投遞的消息,
// receivedRoutingKey-路由key,
// deliveryTag-投遞唯一標(biāo)記(從1開(kāi)始遞增, 每次消費(fèi)者重啟后, 繼續(xù)從1開(kāi)始)
// consumerTag-當(dāng)前消費(fèi)者唯一標(biāo)記(每個(gè)消費(fèi)者都有自己的唯一標(biāo)記,每次消費(fèi)者重連后,生成新的標(biāo)記)
// consumerQueue-當(dāng)前消費(fèi)者收到消息的隊(duì)列
log.info("messageProperties:{}", message.getMessageProperties());
// deliveryTag-投遞標(biāo)記(broker用于標(biāo)記此消息),
// multiple-是否批量確認(rèn)(批量確認(rèn)會(huì)讓broker將小于當(dāng)前消息的deliveryTag的消息給確認(rèn)掉刪了)
// requeue-是否繼續(xù)入隊(duì),
// =====================以下是2種情況的代碼及對(duì)應(yīng)的解釋=====================
// 如果不繼續(xù)入隊(duì), 那么broker將會(huì)刪除這個(gè)消息,
// 但是如果這個(gè)隊(duì)列綁定了死信交換機(jī),那么會(huì)發(fā)到該私信交換機(jī)中
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 如果繼續(xù)入隊(duì), 那么消息重新回到隊(duì)列處于待投遞狀態(tài), 然后又會(huì)投遞給當(dāng)前消費(fèi)者,
// 然后當(dāng)前消費(fèi)者又去讓這個(gè)消息去入隊(duì)待投遞, 然后又投遞給當(dāng)前消費(fèi)者,
// 然后就成了死循環(huán)了。
// 此時(shí), 再開(kāi)1個(gè)一樣的消費(fèi)者,再監(jiān)聽(tīng)此隊(duì)列,結(jié)果2個(gè)消費(fèi)者都死循環(huán)了。
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.info("處理結(jié)束=====================");
}
}
測(cè)試4:使用basicReject(deliveryTag, requeue) 拒絕該消息,與上面使用basicNack(deliveryTag, multiple, requeue)一樣的測(cè)試結(jié)果,只是basicNack方法中比basicReject多了個(gè)multiple的參數(shù)。 @Slf4j
@Configuration
public class RabbitConfig {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2",durable = "true"),
exchange = @Exchange(value = "direct.exchange2",
type = ExchangeTypes.FANOUT),
key = {"red","blue"}
)
})
public void listenQueue(Message message, String msg, Channel channel) {
log.info("收到消息=====================");
log.info("channel:{}", channel);
log.info("msg:{}", msg);
log.info("message:{},", new String(message.getBody()));
// receivedDeliveryMode-是否持久化的消息,
// redelivered-是否重新投遞的消息,
// receivedRoutingKey-路由key,
// deliveryTag-投遞唯一標(biāo)記(從1開(kāi)始遞增, 每次消費(fèi)者重啟后, 繼續(xù)從1開(kāi)始)
// consumerTag-當(dāng)前消費(fèi)者唯一標(biāo)記(每個(gè)消費(fèi)者都有自己的唯一標(biāo)記,每次消費(fèi)者重連后,生成新的標(biāo)記)
// consumerQueue-當(dāng)前消費(fèi)者收到消息的隊(duì)列
log.info("messageProperties:{}", message.getMessageProperties());
// deliveryTag-投遞標(biāo)記(broker用于標(biāo)記此消息),
// requeue-是否繼續(xù)入隊(duì),
// =====================以下是2種情況的代碼及對(duì)應(yīng)的解釋=====================
// 如果不繼續(xù)入隊(duì), 那么broker將會(huì)刪除這個(gè)消息,
// 但是如果這個(gè)隊(duì)列綁定了死信交換機(jī),那么會(huì)發(fā)到該私信交換機(jī)中
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 如果繼續(xù)入隊(duì), 那么消息重新回到隊(duì)列處于待投遞狀態(tài), 然后又會(huì)投遞給當(dāng)前消費(fèi)者,
// 然后當(dāng)前消費(fèi)者又去讓這個(gè)消息去入隊(duì)待投遞, 然后又投遞給當(dāng)前消費(fèi)者,
// 然后就成了死循環(huán)了。
// 此時(shí), 再開(kāi)1個(gè)一樣的消費(fèi)者,再監(jiān)聽(tīng)此隊(duì)列,結(jié)果2個(gè)消費(fèi)者都死循環(huán)了。
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
log.info("處理結(jié)束=====================");
}
}
auto:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),
當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack。當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack,并重新入隊(duì)(就會(huì)導(dǎo)致無(wú)限重試,導(dǎo)致程序死循環(huán))
測(cè)試:當(dāng)消費(fèi)者監(jiān)聽(tīng)方法在處理消息的過(guò)程中發(fā)生NullPointer異常時(shí),會(huì)自動(dòng)對(duì)該消息進(jìn)行nack(并重新入隊(duì)),然后mq服務(wù)器又會(huì)再次將此消息投遞給此消費(fèi)者,并且此時(shí)mq管理后臺(tái)中對(duì)應(yīng)隊(duì)列中該消息的狀態(tài)時(shí)nack(消費(fèi)者一旦nack,這個(gè)消息就變成ready待投遞了,然后再次馬上又投遞給消費(fèi)者了,就馬上變成了nack。測(cè)試時(shí),我又開(kāi)了1個(gè)一樣的消費(fèi)者服務(wù),結(jié)果2個(gè)消費(fèi)者服務(wù)都一直不斷的拋出異常),當(dāng)把消費(fèi)者停了的時(shí)候,此消息又變成ready待投遞了 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject,消息會(huì)被刪除,不會(huì)重新入隊(duì),不會(huì)導(dǎo)致死循環(huán)(在監(jiān)聽(tīng)方法中,手動(dòng)拋出MessageConversionException,那么也是跟reject并且不重新入隊(duì),一樣的效果,消息會(huì)被刪除,不會(huì)導(dǎo)致死循環(huán)) 測(cè)試代碼:
配置 server:
port: 8081
spring:
rabbitmq:
host: 119.23.61.24
port: 5672
virtual-host: /demo-vh
username: guest
password: 17E821zj
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener:
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: auto # 消費(fèi)者確認(rèn)機(jī)制
代碼:下面代碼就是,故意在確認(rèn)機(jī)制已經(jīng)是自動(dòng)確認(rèn)的配置下,依然確認(rèn)或拒絕,檢查不同情況下,消息的流轉(zhuǎn)情況 @Slf4j
@Configuration
public class RabbitConfig {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2",durable = "true"),
exchange = @Exchange(value = "direct.exchange2",
type = ExchangeTypes.FANOUT),
key = {"red","blue"}
)
})
public void listenQueue(Message message, String msg, Channel channel) {
log.info("收到消息=====================");
log.info("channel:{}", channel);
log.info("msg:{}", msg);
log.info("message:{},", new String(message.getBody()));
// receivedDeliveryMode-是否持久化的消息,
// redelivered-是否重新投遞的消息,
// receivedRoutingKey-路由key,
// deliveryTag-投遞唯一標(biāo)記(從1開(kāi)始遞增, 每次消費(fèi)者重啟后, 繼續(xù)從1開(kāi)始)
// consumerTag-當(dāng)前消費(fèi)者唯一標(biāo)記(每個(gè)消費(fèi)者都有自己的唯一標(biāo)記,每次消費(fèi)者重連后,生成新的標(biāo)記)
// consumerQueue-當(dāng)前消費(fèi)者收到消息的隊(duì)列
log.info("messageProperties:{}", message.getMessageProperties());
String body = new String(message.getBody());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 以下的原意是指: 方法本身的作用, 并不是在確認(rèn)模式是自動(dòng)確認(rèn)下調(diào)用這些方法的作用
if ("1".equals(body)) {
// 原意: 確認(rèn), 不批量
//(channel會(huì)shutdown, 然后會(huì)重新連接, 然后消息被刪除)
channel.basicAck(deliveryTag, false);
}
else if ("2".equals(body)) {
// 原意: 拒絕, 重新入隊(duì)
//(channel會(huì)shutdown, 然后會(huì)重新連接, 消息會(huì)重新入隊(duì),
// 然后消費(fèi)者再次收到此消息, 然后不斷循環(huán), 并且中間的過(guò)程會(huì)拋出異常)
channel.basicReject(deliveryTag, true);
}
else if ("3".equals(body)) {
// 原意: 拒絕, 不重新入隊(duì)(broker將會(huì)刪除此消息,
// 如果該隊(duì)列還綁定了死信交換機(jī),那么會(huì)發(fā)往此交換機(jī))
//(收到1次消息后, channel會(huì)shutdown, 然后會(huì)重新連接, 不會(huì)再次收到該消息,
// 因?yàn)橄⒁呀?jīng)被刪除了)
channel.basicReject(deliveryTag, false);
}
else if ("4".equals(body)) {
// 原意: 拒絕, 不批量, 重新入隊(duì)(會(huì)再次投遞給消費(fèi)者)
//(與2表現(xiàn)幾乎一致)
channel.basicNack(deliveryTag, false, true);
}
else if ("5".equals(body)) {
// 原意: 拒絕, 批量, 重新入隊(duì)(對(duì)于之前未確認(rèn)的消息,批量拒絕并重新入隊(duì))
//(與2表現(xiàn)幾乎一致)
channel.basicNack(deliveryTag, true, true);
}
else if ("6".equals(body)) {
// 拋出空指針異常,
//(收到1次消息, 然后這里拋出異常, 然后會(huì)自動(dòng)nack并重新入隊(duì), 然后又收到該消息, 不斷循環(huán))
throw new NullPointerException("666...");
}
else if ("7".equals(body)) {
// 拋出消息轉(zhuǎn)換異常,
//(拋出異常,消息會(huì)被刪除, 并且不會(huì)重新入隊(duì), 不會(huì)死循環(huán)。
// 同basicReject拒絕消息并且設(shè)置不重新入隊(duì))
throw new MessageConversionException("777...");
}
else {
// 模擬正常處理
System.out.println("自動(dòng)確認(rèn)模式正常處理情況...");
}
log.info("處理結(jié)束=====================");
}
}
失敗重試機(jī)制:
當(dāng)消費(fèi)者出現(xiàn)異常后,如果消費(fèi)者設(shè)置的參數(shù)讓此消息再次回到隊(duì)列,那么消息會(huì)requeue(重新入隊(duì))到隊(duì)列,等待投遞,然后就會(huì)再投遞給消費(fèi)者,消費(fèi)者收到該消息后再次異常,由于消費(fèi)者設(shè)置的參數(shù)又讓此消息再次回到隊(duì)列,因此就會(huì)無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力。我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列,嘗試作如下配置來(lái)設(shè)置消費(fèi)者。 server:
port: 8081
spring:
rabbitmq:
host: xx.xx.xx.xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template: #(消息生產(chǎn)者的配置)
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener: # (消息消費(fèi)者的配置)
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: auto # 消費(fèi)者確認(rèn)機(jī)制
## ===========添加失敗重試機(jī)制===========
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000ms # 初始的失敗等待時(shí)長(zhǎng)為1s
multiplier: 1 # 下次失敗的等待時(shí)長(zhǎng)倍數(shù),
# 下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)(不設(shè)置的話, 默認(rèn)配置的就是3次)
stateless: true # true無(wú)狀態(tài), false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù), 這里改為false
當(dāng)消費(fèi)者添加如上失敗重試機(jī)制前,我們發(fā)現(xiàn)本來(lái)消費(fèi)者的監(jiān)聽(tīng)方法拋出空指針異常,然后一直在不斷的nack消息(并且設(shè)置重新入隊(duì)參數(shù)為true),然后mq又投遞過(guò)來(lái)然后又導(dǎo)致空指針異常,然后又nack消息并重新入隊(duì),然后不斷的死循環(huán)的跑著(同上1個(gè)例子配置確認(rèn)模式為auto,并且監(jiān)聽(tīng)方法中拋出NullPointerException異常的例子)。加上失敗重試機(jī)制的配置后,同樣是在確認(rèn)模式為auto,并且監(jiān)聽(tīng)方法中拋出NullPointerException異常的情況下,發(fā)現(xiàn)消費(fèi)者就拉取了1次消息,然后在本地重試了3次,在這期間mq也并沒(méi)有投遞消息過(guò)來(lái),當(dāng)重試3次都失敗后,此消息從消息隊(duì)列中刪除了(重試次數(shù)耗盡都失敗之后,直接拒絕了該消息,并且不重新入隊(duì))。 失敗消息處理策略:在開(kāi)啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecoverer接口來(lái)處理,它包含三種不同的實(shí)現(xiàn)
實(shí)現(xiàn)方式分類(lèi)
RejectAndDontRequeueRecoverer: 重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式(當(dāng)重試都失敗之后,直接丟棄該消息)ImmediateRequeueMessageRecoverer: 重試耗盡后,返回nack,消息重新入隊(duì)(當(dāng)重試都失敗之后,重新入隊(duì),接著繼續(xù)接收該消息,如果重試都失敗之后,又重新入隊(duì))RepublishMessageRecoverer: 重試耗盡后,將失敗消息投遞到指定的交換機(jī)(當(dāng)重試都失敗之后,發(fā)送到指定的交換機(jī),將此消息路由到與此交換機(jī)所綁定的隊(duì)列) RepublishMessageRecoverer使用示例 示例描述:當(dāng)向direct.queue2發(fā)送1個(gè)payload為7的消息時(shí),消費(fèi)者就會(huì)只拉取1次該消息,并且會(huì)在本地重試3次,如果重試3次都失?。ū纠兄灰⑹?就會(huì)拋出空指針異常)之后,就會(huì)發(fā)送到error.direct交換機(jī),然后根據(jù)路由key路由到error.queue消息隊(duì)列,其中消息的內(nèi)容就是異常棧的字符串,這樣就可以讓人工介入處理。并且使用失敗消息處理策略后,不會(huì)出現(xiàn)無(wú)限制:失敗重試,然后重發(fā),繼續(xù)失敗重試。 server:
port: 8081
spring:
rabbitmq:
host: xx.xx.xx.xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xxx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener:
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: auto # 消費(fèi)者確認(rèn)機(jī)制
## ===========添加失敗重試機(jī)制===========
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000ms # 初始的失敗等待時(shí)長(zhǎng)為1s
multiplier: 1 # 下次失敗的等待時(shí)長(zhǎng)倍數(shù),
# 下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)(不設(shè)置的話, 默認(rèn)配置的就是3次)
stateless: true # true無(wú)狀態(tài), false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù), 這里改為false
@Slf4j
@Configuration
public class RabbitConfig {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2",durable = "true"),
exchange = @Exchange(value = "direct.exchange2",
type = ExchangeTypes.FANOUT),
key = {"red","blue"}
)
})
public void listenQueue(Message message, String msg, Channel channel) {
log.info("收到消息=====================");
log.info("channel:{}", channel);
log.info("msg:{}", msg);
log.info("message:{},", new String(message.getBody()));
// receivedDeliveryMode-是否持久化的消息,
// redelivered-是否重新投遞的消息,
// receivedRoutingKey-路由key,
// deliveryTag-投遞唯一標(biāo)記(從1開(kāi)始遞增, 每次消費(fèi)者重啟后, 繼續(xù)從1開(kāi)始)
// consumerTag-當(dāng)前消費(fèi)者唯一標(biāo)記(每個(gè)消費(fèi)者都有自己的唯一標(biāo)記,每次消費(fèi)者重連后,生成新的標(biāo)記)
// consumerQueue-當(dāng)前消費(fèi)者收到消息的隊(duì)列
log.info("messageProperties:{}", message.getMessageProperties());
String body = new String(message.getBody());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if ("7".equals(body)) {
// 拋出消息轉(zhuǎn)換異常,
//(拋出異常,消息會(huì)被刪除, 并且不會(huì)重新入隊(duì), 不會(huì)死循環(huán)。
// 同basicReject拒絕消息并且設(shè)置不重新入隊(duì))
throw new MessageConversionException("777...");
}
log.info("處理結(jié)束=====================");
}
}
@Slf4j
@Configuration
// 當(dāng)開(kāi)啟了消費(fèi)者失敗重試時(shí), 當(dāng)前配置類(lèi)才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled",
havingValue = "true")
public class ErrorConfiguration {
// 定義1個(gè)直連交換機(jī)
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.direct");
}
// 定義1個(gè)消息隊(duì)列
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
// 綁定
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
// 消息消費(fèi)時(shí)重試耗盡并且都失敗(例如: 確認(rèn)模式為auto,并且監(jiān)聽(tīng)方法中拋出NullPointerException異常)時(shí)
// 的后續(xù)處理策略,
// 因?yàn)檫@里返回的是RepublishMessageRecoverer,所以在重試耗盡時(shí)發(fā)送到指定交換機(jī),并攜帶指定的路由key
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
log.debug("加載RepublishMessageRecoverer");
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
業(yè)務(wù)冪等性:通過(guò)以上所有的手段,我們可以保證消息至少被消費(fèi)者消費(fèi)1次。但是由于網(wǎng)絡(luò)波動(dòng)等原因?qū)е孪M(fèi)者消費(fèi)同一消息多次,這個(gè)時(shí)候,就需要保證消息的冪等性。所謂的冪等性指的是,消費(fèi)同一消息多次產(chǎn)生的效果與消費(fèi)該消息1次的效果是相同的,或者說(shuō)對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。
唯一消息id方案:
給每個(gè)消息都設(shè)置一個(gè)唯一id,利用id區(qū)分是否是重復(fù)消息:
每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫(kù)如果下次又收到相同消息,去數(shù)據(jù)庫(kù)查詢判斷是否存在,存在則為重復(fù)消息放棄處理 使用步驟:消費(fèi)者和生產(chǎn)者中都配置如下的消息轉(zhuǎn)換器,并且設(shè)置createMessageIds屬性為true @Bean
public MessageConverter jacksonMessageConvertor(){
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 1. 設(shè)置此屬性后, 會(huì)在使用rabbitTemplate發(fā)送消息時(shí),
// 當(dāng)未設(shè)置消息屬性MessageProperties#messageId時(shí),對(duì)消息對(duì)象的MessageProperties的messageId設(shè)
// 置1個(gè)uuid值, 用來(lái)作為這條消息的標(biāo)識(shí)。
// 2. 當(dāng)然也可以在使用rabbitTemplate發(fā)送消息時(shí), 指定1個(gè)MessagePostProcessor,
// 來(lái)設(shè)置MessageProperties#messageId的值
jjmc.setCreateMessageIds(true);
return jjmc;
}
業(yè)務(wù)判斷
結(jié)合業(yè)務(wù)邏輯,基于業(yè)務(wù)本身作判斷
以我們的業(yè)務(wù)為例:我們要在支付后修改訂單狀態(tài)為已支付,應(yīng)該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否是未支付。只有未支付訂單才需要修改,其它狀態(tài)不做處理: @Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "mark.order.pay.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
key = "pay.success"
))
public void listenOrderPay(Long orderId) {
/*
// 1.查詢訂單
Order order = orderService.getById(orderId);
// 2.判斷訂單狀態(tài)是否為未支付
if(order == null || order.getStatus() != 1){
// 訂單不存在,或者狀態(tài)異常
return;
}
// 3.如果未支付,標(biāo)記訂單狀態(tài)為已支付
orderService.markOrderPaySuccess(orderId);
*/
// 其實(shí)可以使用下面的一步搞定(類(lèi)似于樂(lè)觀鎖機(jī)制)
// update order set status = 2 where id = ? AND status = 1
orderService.lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
}
如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?
首先,支付服務(wù)會(huì)正在用戶支付成功以后利用MQ消息通知交易服務(wù)完成訂單狀態(tài)同步。其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞和處理的可靠性。同時(shí)也開(kāi)啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失最后,我們還在交易服務(wù)更新訂單狀態(tài)時(shí)做了業(yè)務(wù)冪等判斷,避免因消息重復(fù)消費(fèi)導(dǎo)致訂單狀態(tài)異常。 如果交易服務(wù)消息處理失敗,有沒(méi)有什么兜底方案?
我們可以在交易服務(wù)設(shè)置定時(shí)任務(wù),定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。 延遲消息:勝場(chǎng)這發(fā)送消息時(shí),指定1個(gè)時(shí)間,消費(fèi)者不會(huì)立刻收到消息,而是在指定時(shí)間之后才收到消息
死信交換機(jī)方案
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),就會(huì)成為死信 (dead letter)
消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false消息是一個(gè)過(guò)期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過(guò)期時(shí)間),超時(shí)無(wú)人消費(fèi)要投遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信 如果隊(duì)列通過(guò)dead-letter-exchange屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中。這個(gè)交換機(jī)稱為該隊(duì)列的死信交換機(jī) (Dead Letter Exchange,簡(jiǎn)稱DLX) 示例 配置如下: server:
port: 8081
spring:
rabbitmq:
host: xx.xx.xx.xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xxx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener:
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: auto # 消費(fèi)者確認(rèn)機(jī)制
## ===========添加失敗重試機(jī)制===========
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000ms # 初始的失敗等待時(shí)長(zhǎng)為1s
multiplier: 1 # 下次失敗的等待時(shí)長(zhǎng)倍數(shù),
# 下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)(不設(shè)置的話, 默認(rèn)配置的就是3次)
stateless: true # true無(wú)狀態(tài), false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù), 這里改為false
代碼如下 @Slf4j
@Configuration
public class RabbitConfig {
/* 死信處理的交換機(jī)、隊(duì)列、綁定等定義 */
@Bean
public org.springframework.amqp.core.Exchange dlxExchange() {
Exchange exchange = ExchangeBuilder.directExchange("dlx.directExchange")
.durable(true)
.build();
return exchange;
}
@Bean
public org.springframework.amqp.core.Queue dlxQueue() {
Queue queue = QueueBuilder.durable("dlx.queue").build();
return queue;
}
@Bean
public Binding dlxExAndQueueBinding() {
// 建立 dlx.directExchange交換機(jī) 到 dlx.queue隊(duì)列 的綁定關(guān)系, 并指定路由key為red
Binding binding = BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with("red")
.noargs();
return binding;
}
/* 讓消息成為死信的交換機(jī)、隊(duì)列、綁定等定義 */
// 當(dāng)向direct.timedExchange交換機(jī)發(fā)送消息,并且攜帶red作為路由key,那么此消息會(huì)被路由到direct.queue隊(duì)列
// 并且, 當(dāng)這個(gè)消息設(shè)置了過(guò)期時(shí)間(通過(guò)設(shè)置MessageProperties#expiration屬性), 同時(shí)direct.queue又沒(méi)有消費(fèi)者,
// 那么, 當(dāng)?shù)搅诉^(guò)期時(shí)間時(shí), 這個(gè)消息會(huì)被發(fā)送到該隊(duì)列所綁定的死信交換機(jī), 并攜帶原消息原來(lái)的路由key,
// 然后, 我們?cè)谙旅娴谋O(jiān)聽(tīng)方法中監(jiān)聽(tīng)死信隊(duì)列
@Bean
public org.springframework.amqp.core.Exchange directTimedExchange() {
Exchange exchange = ExchangeBuilder.directExchange("direct.timedExchange")
.durable(true)
.build();
return exchange;
}
@Bean
public org.springframework.amqp.core.Queue directQueue() {
org.springframework.amqp.core.Queue queue = QueueBuilder.durable("direct.queue")
// 通過(guò)設(shè)置參數(shù), 來(lái)指定該隊(duì)列的死信交換機(jī)
.withArgument("x-dead-letter-exchange", "dlx.directExchange")
.build();
return queue;
}
@Bean
public Binding exAndQueueBinding() {
// 建立 direct.timedExchange 交換機(jī) 到 direct.queue 隊(duì)列 的綁定關(guān)系, 并指定路由key為red
Binding binding = BindingBuilder.bind(directQueue())
.to(directTimedExchange())
.with("red")
.noargs();
return binding;
}
/* 監(jiān)聽(tīng)死信隊(duì)列 */
@RabbitListener(queues = {"dlx.queue"})
public void handleDlxMsg(Message message) {
log.info("收到消息=====================");
// 可以在此處觀察日志的輸出時(shí)間, 和消息的數(shù)據(jù)(我設(shè)置的消息的數(shù)據(jù)就是消息的發(fā)送時(shí)間)
log.info("message:{},", new String(message.getBody()));
}
}
@Slf4j
@RequestMapping("rabbit")
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("orderMsg")
public Object orderMsg(String expiration, String exchange, String routeKey) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
String content = sdf.format(new Date());
rabbitTemplate.convertAndSend(exchange, routeKey, content, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設(shè)置消息過(guò)期時(shí)間
message.getMessageProperties().setExpiration(expiration);
return message;
}
});
return "ok";
}
}
測(cè)試步驟: 第一步:發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)確實(shí)是在10秒后收到消息 第二步:發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)確實(shí)是在5秒后收到消息 第三步:發(fā)送完http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,接著隔1-2秒發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)1個(gè)是在5s后收到消息,1個(gè)是在10s后收到消息 第四步:發(fā)送完http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,接著隔1-2秒發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,2個(gè)消息都是隔10s才收到的消息 這足以證明如果采取這種方案是有問(wèn)題的,必須是處于消息隊(duì)列頂端的消息隊(duì)列到期時(shí),才會(huì)立馬進(jìn)入死信隊(duì)列。所以如果要用這種方案的話,最好是分超時(shí)隊(duì)列,不同的超時(shí)時(shí)間發(fā)送的不同的隊(duì)列,這樣就能保證,最先進(jìn)入隊(duì)列的消息先超時(shí),后面的消息也都能正常延遲消費(fèi)。 延遲插件
介紹
RabbitMO的官方也推出了一個(gè)插件,原生支持延遲消息功能。該插件的原理是設(shè)計(jì)了一種支持延遲消息功能的交換機(jī),當(dāng)消息投遞到交換機(jī)后可以在交換機(jī)中暫存一定時(shí)間,到期后再投遞到隊(duì)列。下載rabbitmq延遲消息插件地址:rabbitmq-delayed-message-exchangerabbitmq官方文檔中對(duì)延遲消息插件的介紹及使用說(shuō)明github上README.md關(guān)于延遲消息插件的安裝使用說(shuō)明 安裝
官方介紹的步驟
第一步,先下載延遲消息插件第二步,如果需要找到rabbitmq的插件目錄,可以執(zhí)行:rabbitmq-plugins directories -s第三步,將下載的延遲消息插件復(fù)制到插件目錄第四步,執(zhí)行開(kāi)啟插件命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange 自己的安裝步驟(使用docker安裝rabbitmq時(shí),未掛載插件目錄),由于自己之前安裝rabbitmq之前沒(méi)有將插件目錄掛載出來(lái),所以步驟不一樣
docker exec -it rabbitmq /bin/bash進(jìn)入到rabbitmq容器中,然后在當(dāng)前的 / 目錄下有個(gè)plugins目錄,進(jìn)入可以看到很多.ez結(jié)尾的插件docker cp ./rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez 037a6fed1d41:/plugins/,將當(dāng)前的延遲消息插件復(fù)制到rabbitmq容器中的/plugins目錄中docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange,在容器外部執(zhí)行此命令,讓rabbitmq容器啟用此插件 docker已掛載插件目錄的rabbitmq容器安裝步驟
安裝rabbitmq docker run \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management
docker inspect mq查看,可以看到在Mounts節(jié)點(diǎn)中,已將source掛載到了容器中的Destination(即/plugins目錄中),然后將.ez的延遲消息插件拷貝到source所代表的文件位置即可 docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange,在容器外部執(zhí)行此命令,讓rabbitmq容器啟用此插件 使用:使用的時(shí)候,只需要在聲明交換機(jī)時(shí),設(shè)置此交換機(jī)的delayed屬性為true即可(spring-amqp包須>=1.6),而在發(fā)送消息的時(shí)候,需要設(shè)置MessageProperties#setDelay(Integer)傳入需要延遲的時(shí)間,單位:毫秒,其實(shí)就是設(shè)置x-delay頭。 代碼如下: server:
port: 8081
spring:
rabbitmq:
host: xx
port: 5672
virtual-host: /demo-vh
username: guest
password: xx
connection-timeout: 1s # 設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重連機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù)
max-attempts: 3 # 最大重連次數(shù)
publisher-confirms: true # 開(kāi)啟消息發(fā)送確認(rèn)機(jī)制
publisher-returns: true # 開(kāi)啟消息return機(jī)制
listener:
simple:
prefetch: 1 # 每次拉取1個(gè)消息, 處理完成后, 再拉取下1個(gè)消息, 能者多勞
acknowledge-mode: auto # 消費(fèi)者確認(rèn)機(jī)制
## ===========添加失敗重試機(jī)制===========
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000ms # 初始的失敗等待時(shí)長(zhǎng)為1s
multiplier: 1 # 下次失敗的等待時(shí)長(zhǎng)倍數(shù),
# 下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)(不設(shè)置的話, 默認(rèn)配置的就是3次)
stateless: true # true無(wú)狀態(tài), false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù), 這里改為false
@Slf4j
@Configuration
public class RabbitConfig {
/* 第1個(gè)延遲隊(duì)列的相關(guān)交換機(jī)、隊(duì)列、綁定、監(jiān)聽(tīng)方法定義 */
@RabbitListener(bindings = {
@QueueBinding(exchange = @Exchange(name = "dly.direct.ex",delayed = "true",durable = "true"),
key = "dly",
value = @Queue(name = "dly.queue", durable = "true")
)
})
public void listenDelayedMsg(Message message, String msg) {
log.info("收到消息=====================1111");
// 可以在此處觀察日志的輸出時(shí)間, 和消息的數(shù)據(jù)(我設(shè)置的消息的數(shù)據(jù)就是消息的發(fā)送時(shí)間)
log.info("message: {}, msg: {}", new String(message.getBody()), msg);
}
/* 第2個(gè)延遲隊(duì)列的相關(guān)交換機(jī)、隊(duì)列、綁定、監(jiān)聽(tīng)方法定義 */
@Bean
public org.springframework.amqp.core.Exchange dly2DirectExchange() {
return ExchangeBuilder.directExchange("dly2.direct.ex").delayed().build();
}
@Bean
public org.springframework.amqp.core.Queue dly2Queue() {
return QueueBuilder.durable("dly2.queue").build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(dly2Queue()).to(dly2DirectExchange()).with("dly2").noargs();
}
@RabbitListener(queues = {"dly2.queue"})
public void listenDelayedMsg2(Message message, String msg) {
log.info("收到消息=====================2222");
// 可以在此處觀察日志的輸出時(shí)間, 和消息的數(shù)據(jù)(我設(shè)置的消息的數(shù)據(jù)就是消息的發(fā)送時(shí)間)
log.info("message: {}, msg: {}", new String(message.getBody()), msg);
}
}
@Slf4j
@RequestMapping("rabbit")
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("delayMsg")
public Object delayMsg(Integer delayTimeMillis, String exchange, String routeKey) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
String content = sdf.format(new Date());
/* 發(fā)送消息至指定的交換機(jī), 并指定路由key, 注意延遲消息須如下設(shè)置延遲時(shí)間 */
rabbitTemplate.convertAndSend(exchange, routeKey, content, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設(shè)置延遲時(shí)間
message.getMessageProperties().setDelay(delayTimeMillis);
return message;
}
});
return "ok";
}
}
測(cè)試: 第1個(gè)延遲消息測(cè)試: http://localhost:8081/rabbit/delayMsg?delayTimeMillis=5000&exchange=dly.direct.ex&routeKey=dly, http://localhost:8081/rabbit/delayMsg?delayTimeMillis=10000&exchange=dly.direct.ex&routeKey=dly 第2個(gè)延遲消息測(cè)試: http://localhost:8081/rabbit/delayMsg?delayTimeMillis=5000&exchange=dly2.direct.ex&routeKey=dly2, http://localhost:8081/rabbit/delayMsg?delayTimeMillis=10000&exchange=dly2.direct.ex&routeKey=dly2 總結(jié):這種延遲消息都是需要消耗性能的,每來(lái)1個(gè)延遲消息,它都需要在mq的內(nèi)部維護(hù)1個(gè)時(shí)鐘,時(shí)鐘的運(yùn)行需要CPU不斷的運(yùn)算。當(dāng)延遲消息很多的時(shí)候,對(duì)CPU的占用就越高。而延遲消息指定的延遲時(shí)間設(shè)置的過(guò)長(zhǎng),就會(huì)給CPU造成額外的壓力。因此,延遲消息適用于指定延遲的時(shí)間較短的消息。 延遲消息優(yōu)化:上面,我們說(shuō)到延遲消息適用于指定延遲的時(shí)間較短的消息。針對(duì)延遲時(shí)間較長(zhǎng)的消息,我們可以對(duì)延遲消息做個(gè)優(yōu)化,將1個(gè)長(zhǎng)時(shí)間的延遲消息拆分成若干個(gè)一小段一小段時(shí)間的延遲消息,然后針對(duì)業(yè)務(wù)做邏輯。 這個(gè)就是待發(fā)送的消息,data是數(shù)據(jù),delayMillis中維護(hù)了一堆的時(shí)間段序列,每次要發(fā)消息到mq時(shí),先從這個(gè)時(shí)間段序列中獲取該時(shí)間段序列作為消息的延遲時(shí)間 @Data
public class MultiDelayMessage
/**
* 消息體
*/
private T data;
/**
* 記錄延遲時(shí)間的集合
*/
private List
public MultiDelayMessage(T data, List
this.data = data;
this.delayMillis = delayMillis;
}
public static
return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
}
/**
* 獲取并移除下一個(gè)延遲時(shí)間
* @return 隊(duì)列中的第一個(gè)延遲時(shí)間
*/
public Long removeNextDelay(){
return delayMillis.remove(0);
}
/**
* 是否還有下一個(gè)延遲時(shí)間
*/
public boolean hasNextDelay(){
return !delayMillis.isEmpty();
}
}
下單成功后,我們發(fā)送第1個(gè)延遲消息到rabbitmq中, // 1.訂單數(shù)據(jù)
// 2.保存訂單詳情
// 3.扣減庫(kù)存
// 4.清理購(gòu)物車(chē)商品
// 5.延遲檢測(cè)訂單狀態(tài)消息
try {
MultiDelayMessage
10000L, 10000L, 10000L,
15000L, 15000L, 30000L, 30000L);
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE,
MqConstants.DELAY_ORDER_ROUTING_KEY,
msg,
new DelayMessageProcessor(msg.removeNextDelay().intValue())
);
} catch (AmqpException e) {
log.error("延遲消息發(fā)送異常!", e);
}
監(jiān)聽(tīng)延遲消息交換機(jī)所綁定的隊(duì)列,當(dāng)接收到的消息體中,還存在時(shí)間段序列時(shí),繼續(xù)發(fā),如果不存在了,那就說(shuō)明所有的時(shí)間都消耗了 @Component
@RequiredArgsConstructor
public class OrderStatusCheckListener {
private final IOrderService orderService;
private final RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE,
delayed = "true",
type = ExchangeTypes.TOPIC),
key = MqConstants.DELAY_ORDER_ROUTING_KEY
))
public void listenOrderDelayMessage(MultiDelayMessage
// 1.查詢訂單狀態(tài)
Order order = orderService.getById(msg.getData());
// 2.判斷是否已經(jīng)支付
if (order == null || order.getStatus() == 2) {
// 訂單不存在或者已經(jīng)被處理
return;
}
// TODO 3.去支付服務(wù)查詢真正的支付狀態(tài)
boolean isPay = false;
// 3.1.已支付,標(biāo)記訂單狀態(tài)為已支付
if (isPay) {
orderService.markOrderPaySuccess(order.getId());
return;
}
// 4.判斷是否存在延遲時(shí)間
if (msg.hasNextDelay()) {
// 4.1.存在,重發(fā)延遲消息
Long nextDelay = msg.removeNextDelay();
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,
msg, new DelayMessageProcessor(nextDelay.intValue()));
return;
}
// 5.不存在,取消訂單
orderService.cancelOrder(order.getId());
}
}
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMq學(xué)習(xí)
參考鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。