柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)
在昨天的練習(xí)作業(yè)中,我們改造了余額支付功能,在支付成功后利用RabbitMQ通知交易服務(wù),更新業(yè)務(wù)訂單狀態(tài)為已支付。 但是大家思考一下,如果這里MQ通知失敗,支付服務(wù)中支付流水顯示支付成功,而交易服務(wù)中的訂單狀態(tài)卻顯示未支付,數(shù)據(jù)出現(xiàn)了不一致。 此時(shí)前端發(fā)送請(qǐng)求查詢(xún)支付狀態(tài)時(shí),肯定是查詢(xún)交易服務(wù)狀態(tài),會(huì)發(fā)現(xiàn)業(yè)務(wù)訂單未支付,而用戶(hù)自己知道已經(jīng)支付成功,這就導(dǎo)致用戶(hù)體驗(yàn)不一致。
因此,這里我們必須盡可能確保MQ消息的可靠性,即:消息應(yīng)該至少被消費(fèi)者處理1次 那么問(wèn)題來(lái)了:
我們?cè)撊绾未_保MQ消息的可靠性?如果真的發(fā)送失敗,有沒(méi)有其它的兜底方案?
這些問(wèn)題,在今天的學(xué)習(xí)中都會(huì)找到答案。
1.發(fā)送者的可靠性
首先,我們一起分析一下消息丟失的可能性有哪些。 消息從發(fā)送者發(fā)送消息,到消費(fèi)者處理消息,需要經(jīng)過(guò)的流程是這樣的: 消息從生產(chǎn)者到消費(fèi)者的每一步都可能導(dǎo)致消息丟失:
發(fā)送消息時(shí)丟失:
生產(chǎn)者發(fā)送消息時(shí)連接MQ失敗生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常 MQ導(dǎo)致消息丟失:
消息到達(dá)MQ,保存到隊(duì)列后,尚未消費(fèi)就突然宕機(jī) 消費(fèi)者處理消息時(shí):
消息接收后尚未處理突然宕機(jī)消息接收后處理過(guò)程中拋出異常
綜上,我們要解決消息丟失問(wèn)題,保證MQ的可靠性,就必須從3個(gè)方面入手:
確保生產(chǎn)者一定把消息發(fā)送到MQ確保MQ不會(huì)將消息弄丟確保消費(fèi)者一定要處理消息
這一章我們先來(lái)看如何確保生產(chǎn)者一定能把消息發(fā)送到MQ。
1.1.生產(chǎn)者重試機(jī)制
首先第一種情況,就是生產(chǎn)者發(fā)送消息時(shí),出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。
為了解決這個(gè)問(wèn)題,SpringAMQP提供的消息發(fā)送時(shí)的重試機(jī)制。即:當(dāng)RabbitTemplate與MQ連接超時(shí)后,多次重試。
修改publisher模塊的application.yaml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
connection-timeout: 1s # 設(shè)置MQ的連接超時(shí)時(shí)間
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重試機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier
max-attempts: 3 # 最大重試次數(shù)
我們利用命令停掉RabbitMQ服務(wù):
docker stop mq
然后測(cè)試發(fā)送一條消息,會(huì)發(fā)現(xiàn)會(huì)每隔1秒重試1次,總共重試了3次。消息發(fā)送的超時(shí)重試機(jī)制配置成功了!
:::warning 注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過(guò)SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說(shuō)多次重試等待的過(guò)程中,當(dāng)前線(xiàn)程是被阻塞的。 如果對(duì)于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請(qǐng)合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然也可以考慮使用異步線(xiàn)程來(lái)執(zhí)行發(fā)送消息的代碼。 :::
1.2.生產(chǎn)者確認(rèn)機(jī)制
一般情況下,只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會(huì)出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無(wú)需考慮這種問(wèn)題。 不過(guò),在少數(shù)情況下,也會(huì)出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:
MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無(wú)法路由
針對(duì)上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher Confirm和Publisher Return兩種。在開(kāi)啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會(huì)根據(jù)消息處理的情況返回不同的回執(zhí)。 具體如圖所示: 總結(jié)如下:
當(dāng)消息投遞到MQ,但是路由失敗時(shí),通過(guò)Publisher Return返回異常信息,同時(shí)返回ack的確認(rèn)信息,代表投遞成功臨時(shí)消息投遞到了MQ,并且入隊(duì)成功,返回ACK,告知投遞成功持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回ACK ,告知投遞成功其它情況都會(huì)返回NACK,告知投遞失敗
其中ack和nack屬于Publisher Confirm機(jī)制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機(jī)制。 默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過(guò)配置文件來(lái)開(kāi)啟。
1.3.實(shí)現(xiàn)生產(chǎn)者確認(rèn)
1.3.1.開(kāi)啟生產(chǎn)者確認(rèn)
在publisher模塊的application.yaml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 開(kāi)啟publisher confirm機(jī)制,并設(shè)置confirm類(lèi)型
publisher-returns: true # 開(kāi)啟publisher return機(jī)制
這里publisher-confirm-type有三種模式可選:
none:關(guān)閉confirm機(jī)制simple:同步阻塞等待MQ的回執(zhí)correlated:MQ異步回調(diào)返回回執(zhí)
一般我們推薦使用correlated,回調(diào)機(jī)制。
1.3.2.定義ReturnCallback
每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此我們可以在配置類(lèi)中統(tǒng)一設(shè)置。我們?cè)趐ublisher模塊定義一個(gè)配置類(lèi): 內(nèi)容如下:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("觸發(fā)return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
1.3.3.定義ConfirmCallback
由于每個(gè)消息發(fā)送時(shí)的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時(shí)定義。具體來(lái)說(shuō),是在調(diào)用RabbitTemplate中的convertAndSend方法時(shí),多傳遞一個(gè)參數(shù): 這里的CorrelationData中包含兩個(gè)核心的東西:
id:消息的唯一標(biāo)示,MQ對(duì)不同的消息的回執(zhí)以此做判斷,避免混淆SettableListenableFuture:回執(zhí)結(jié)果的Future對(duì)象
將來(lái)MQ的回執(zhí)就會(huì)通過(guò)這個(gè)Future來(lái)返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來(lái)處理消息回執(zhí):
我們新建一個(gè)測(cè)試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {
// 1.創(chuàng)建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.給Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback
@Override
public void onFailure(Throwable ex) {
// 2.1.Future發(fā)生異常時(shí)的處理邏輯,基本不會(huì)觸發(fā)
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容
if(result.isAck()){ // result.isAck(),boolean類(lèi)型,true代表ack回執(zhí),false 代表 nack回執(zhí)
log.debug("發(fā)送消息成功,收到 ack!");
}else{ // result.getReason(),String類(lèi)型,返回nack時(shí)的異常描述
log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
執(zhí)行結(jié)果如下: 可以看到,由于傳遞的RoutingKey是錯(cuò)誤的,路由失敗后,觸發(fā)了return callback,同時(shí)也收到了ack。 當(dāng)我們修改為正確的RoutingKey以后,就不會(huì)觸發(fā)return callback了,只收到ack。 而如果連交換機(jī)都是錯(cuò)誤的,則只會(huì)收到nack。
:::warning 注意: 開(kāi)啟生產(chǎn)者確認(rèn)比較消耗MQ性能,一般不建議開(kāi)啟。而且大家思考一下觸發(fā)確認(rèn)的幾種情況:
路由失敗:一般是因?yàn)镽outingKey錯(cuò)誤導(dǎo)致,往往是編程導(dǎo)致交換機(jī)名稱(chēng)錯(cuò)誤:同樣是編程錯(cuò)誤導(dǎo)致MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對(duì)消息可靠性要求非常高的業(yè)務(wù)才需要開(kāi)啟,而且僅僅需要開(kāi)啟ConfirmCallback處理nack就可以了。 :::
2.MQ的可靠性
消息到達(dá)MQ以后,如果MQ不能及時(shí)保存,也會(huì)導(dǎo)致消息丟失,所以MQ的可靠性也非常重要。
2.1.數(shù)據(jù)持久化
為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲(chǔ)的臨時(shí)數(shù)據(jù),重啟后就會(huì)消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:
交換機(jī)持久化隊(duì)列持久化消息持久化
我們以控制臺(tái)界面為例來(lái)說(shuō)明。
2.1.1.交換機(jī)持久化
在控制臺(tái)的Exchanges頁(yè)面,添加交換機(jī)時(shí)可以配置交換機(jī)的Durability參數(shù): 設(shè)置為Durable就是持久化模式,Transient就是臨時(shí)模式。
2.1.2.隊(duì)列持久化
在控制臺(tái)的Queues頁(yè)面,添加隊(duì)列時(shí),同樣可以配置隊(duì)列的Durability參數(shù): 除了持久化以外,你可以看到隊(duì)列還有很多其它參數(shù),有一些我們會(huì)在后期學(xué)習(xí)。
2.1.3.消息持久化
在控制臺(tái)發(fā)送消息的時(shí)候,可以添加很多參數(shù),而消息的持久化是要配置一個(gè)properties:
:::warning 說(shuō)明:在開(kāi)啟持久化機(jī)制以后,如果同時(shí)還開(kāi)啟了生產(chǎn)者確認(rèn),那么MQ會(huì)在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。 不過(guò)出于性能考慮,為了減少I(mǎi)O次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫(kù)的,而是每隔一段時(shí)間批量持久化。一般間隔在100毫秒左右,這就會(huì)導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。 :::
2.2.LazyQueue
在默認(rèn)情況下,RabbitMQ會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會(huì)導(dǎo)致消息積壓,比如:
消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障消息發(fā)送量激增,超過(guò)了消費(fèi)者處理速度消費(fèi)者處理業(yè)務(wù)發(fā)生阻塞
一旦出現(xiàn)消息堆積問(wèn)題,RabbitMQ的內(nèi)存占用就會(huì)越來(lái)越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時(shí)RabbitMQ會(huì)將內(nèi)存消息刷到磁盤(pán)上,這個(gè)行為成為PageOut. PageOut會(huì)耗費(fèi)一段時(shí)間,并且會(huì)阻塞隊(duì)列進(jìn)程。因此在這個(gè)過(guò)程中RabbitMQ不會(huì)再處理新的消息,生產(chǎn)者的所有請(qǐng)求都會(huì)被阻塞。
為了解決這個(gè)問(wèn)題,從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
接收到消息后直接存入磁盤(pán)而非內(nèi)存消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存(也就是懶加載)支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級(jí)MQ為3.12版本或者所有隊(duì)列都設(shè)置為L(zhǎng)azyQueue模式。
2.2.1.控制臺(tái)配置Lazy模式
在添加隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊(duì)列為L(zhǎng)azy模式:
2.2.2.代碼配置Lazy模式
在利用SpringAMQP聲明隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)也可設(shè)置隊(duì)列為L(zhǎng)azy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 開(kāi)啟Lazy模式
.build();
}
這里是通過(guò)QueueBuilder的lazy()函數(shù)配置Lazy模式,底層源碼如下:
當(dāng)然,我們也可以基于注解來(lái)聲明隊(duì)列并設(shè)置為L(zhǎng)azy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
2.2.3.更新已有隊(duì)列為lazy模式
對(duì)于已經(jīng)存在的隊(duì)列,也可以配置為lazy模式,但是要通過(guò)設(shè)置policy實(shí)現(xiàn)。 可以基于命令行設(shè)置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
rabbitmqctl :RabbitMQ的命令行工具set_policy :添加一個(gè)策略L(fǎng)azy :策略名稱(chēng),可以自定義"^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字'{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式--apply-to queues:策略的作用對(duì)象,是所有的隊(duì)列
當(dāng)然,也可以在控制臺(tái)配置policy,進(jìn)入在控制臺(tái)的Admin頁(yè)面,點(diǎn)擊Policies,即可添加配置:
3.消費(fèi)者的可靠性
當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因?yàn)橄⑼哆f給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:
消息投遞的過(guò)程中出現(xiàn)了網(wǎng)絡(luò)故障消費(fèi)者接收到消息后突然宕機(jī)消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異?!?/p>
一旦發(fā)生上述情況,消息也會(huì)丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。 但問(wèn)題來(lái)了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?
本章我們就一起研究一下消費(fèi)者處理消息時(shí)的可靠性解決方案。
2.1.消費(fèi)者確認(rèn)機(jī)制
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:
ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息nack:消息處理失敗,RabbitMQ需要再次投遞消息reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息
一般reject方式用的較少,除非是消息格式有問(wèn)題,那就是開(kāi)發(fā)問(wèn)題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過(guò)try catch機(jī)制捕獲,消息處理成功時(shí)返回ack,處理失敗時(shí)返回nack.
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實(shí)現(xiàn)了消息確認(rèn)。并允許我們通過(guò)配置文件設(shè)置ACK處理方式,有三種模式:
**none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用**manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活**auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack;如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;
返回Reject的常見(jiàn)異常有:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message but Message is received.java.lang.NoSuchMethodException: Added in version 1.6.3.java.lang.ClassCastException: Added in version 1.6.3.
通過(guò)下面的配置可以修改SpringAMQP的ACK處理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做處理
修改consumer服務(wù)的SpringRabbitListener類(lèi)中的方法,模擬一個(gè)消息處理的異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息處理完成");
}
測(cè)試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時(shí),消息依然被RabbitMQ刪除了。
我們?cè)俅伟汛_認(rèn)機(jī)制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自動(dòng)ack
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)): 放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會(huì)自動(dòng)返回reject,所以消息依然會(huì)被刪除:
我們將異常改為RuntimeException類(lèi)型:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息處理完成");
}
在異常位置打斷點(diǎn),然后再次發(fā)送消息測(cè)試,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)): 放行以后,由于拋出的是業(yè)務(wù)異常,所以Spring返回ack,最終消息恢復(fù)至Ready狀態(tài),并且沒(méi)有被RabbitMQ刪除: 當(dāng)我們把配置改為auto時(shí),消息處理失敗后,會(huì)回到RabbitMQ,并重新投遞到消費(fèi)者。
2.2.失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯(cuò),消息會(huì)再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。 極端情況就是消費(fèi)者一直無(wú)法執(zhí)行成功,那么消息requeue就會(huì)無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:
當(dāng)然,上述極端情況發(fā)生的概率還是非常低的,不過(guò)不怕一萬(wàn)就怕萬(wàn)一。為了應(yīng)對(duì)上述情況Spring又提供了消費(fèi)者失敗重試機(jī)制:在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開(kāi)啟消費(fèi)者失敗重試
initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒
multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer服務(wù),重復(fù)之前的測(cè)試。可以發(fā)現(xiàn):
消費(fèi)者在失敗后消息沒(méi)有重新回到MQ無(wú)限重新投遞,而是在本地重試了3次本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是reject
結(jié)論:
開(kāi)啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄
2.3.失敗處理策略
在之前的測(cè)試中,本地測(cè)試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄。這在某些對(duì)于消息可靠性要求較高的業(yè)務(wù)場(chǎng)景下,顯然不太合適了。 因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個(gè)策略是由MessageRecovery接口來(lái)定義的,它有3個(gè)不同實(shí)現(xiàn):
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專(zhuān)門(mén)存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代碼如下:
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
2.4.業(yè)務(wù)冪等性
何為冪等性? 冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來(lái)描述是這樣的:f(x) = f(f(x)),例如求絕對(duì)值函數(shù)。 在程序開(kāi)發(fā)中,則是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。例如:
根據(jù)id刪除數(shù)據(jù)查詢(xún)數(shù)據(jù)新增數(shù)據(jù)
但數(shù)據(jù)的更新往往不是冪等的,如果重復(fù)執(zhí)行可能造成不一樣的后果。比如:
取消訂單,恢復(fù)庫(kù)存的業(yè)務(wù)。如果多次恢復(fù)就會(huì)出現(xiàn)庫(kù)存重復(fù)增加的情況退款業(yè)務(wù)。重復(fù)退款對(duì)商家而言會(huì)有經(jīng)濟(jì)損失。
所以,我們要盡可能避免業(yè)務(wù)被重復(fù)執(zhí)行。 然而在實(shí)際業(yè)務(wù)場(chǎng)景中,由于意外經(jīng)常會(huì)出現(xiàn)業(yè)務(wù)被重復(fù)執(zhí)行的情況,例如:
頁(yè)面卡頓時(shí)頻繁刷新導(dǎo)致表單重復(fù)提交服務(wù)間調(diào)用的重試MQ消息的重復(fù)投遞
我們?cè)谟脩?hù)支付成功后會(huì)發(fā)送MQ消息到交易服務(wù),修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復(fù)投遞的情況。如果消費(fèi)者不做判斷,很有可能導(dǎo)致消息被消費(fèi)多次,出現(xiàn)業(yè)務(wù)故障。 舉例:
假如用戶(hù)剛剛支付完成,并且投遞消息到交易服務(wù),交易服務(wù)更改訂單為已支付狀態(tài)。由于某種原因,例如網(wǎng)絡(luò)故障導(dǎo)致生產(chǎn)者沒(méi)有得到確認(rèn),隔了一段時(shí)間后重新投遞給交易服務(wù)。但是,在新投遞的消息被消費(fèi)之前,用戶(hù)選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。退款完成后,新投遞的消息才被消費(fèi),那么訂單狀態(tài)會(huì)被再次改為已支付。業(yè)務(wù)異常。
因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:
唯一消息ID業(yè)務(wù)狀態(tài)判斷
2.4.1.唯一消息ID
這個(gè)思路非常簡(jiǎn)單:
每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者。消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫(kù)如果下次又收到相同消息,去數(shù)據(jù)庫(kù)查詢(xún)判斷是否存在,存在則為重復(fù)消息放棄處理。
我們?cè)撊绾谓o消息添加唯一ID呢? 其實(shí)很簡(jiǎn)單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開(kāi)啟這個(gè)功能即可。 以Jackson的消息轉(zhuǎn)換器為例:
@Bean
public MessageConverter messageConverter(){
// 1.定義消息轉(zhuǎn)換器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
2.4.2.業(yè)務(wù)判斷
業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來(lái)判斷是否是重復(fù)的請(qǐng)求或消息,不同的業(yè)務(wù)場(chǎng)景判斷的思路也不一樣。 例如我們當(dāng)前案例中,處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時(shí)判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過(guò),無(wú)需重復(fù)處理。
相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫(kù),所以我更推薦使用業(yè)務(wù)判斷的方案。
以支付修改訂單的業(yè)務(wù)為例,我們需要修改OrderServiceImpl中的markOrderPaySuccess方法:
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查詢(xún)訂單
Order old = getById(orderId);
// 2.判斷訂單狀態(tài)
if (old == null || old.getStatus() != 1) {
// 訂單不存在或者訂單狀態(tài)不是1,放棄處理
return;
}
// 3.嘗試更新訂單
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}
上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動(dòng)作,因此在極小概率下可能存在線(xiàn)程安全問(wèn)題。
我們可以合并上述操作為這樣:
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
注意看,上述代碼等同于這樣的SQL語(yǔ)句:
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我們?cè)趙here條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說(shuō)明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會(huì)執(zhí)行。
2.5.兜底方案
雖然我們利用各種機(jī)制盡可能增加了消息的可靠性,但也不好說(shuō)能保證消息100%的可靠。萬(wàn)一真的MQ通知失敗該怎么辦呢? 有沒(méi)有其它兜底方案,能夠確保訂單的支付狀態(tài)一致呢?
其實(shí)思想很簡(jiǎn)單:既然MQ通知不一定發(fā)送到交易服務(wù),那么交易服務(wù)就必須自己主動(dòng)去查詢(xún)支付狀態(tài)。這樣即便支付服務(wù)的MQ通知失敗,我們依然能通過(guò)主動(dòng)查詢(xún)來(lái)保證訂單狀態(tài)的一致。 流程如下:
圖中黃色線(xiàn)圈起來(lái)的部分就是MQ通知失敗后的兜底處理方案,由交易服務(wù)自己主動(dòng)去查詢(xún)支付狀態(tài)。
不過(guò)需要注意的是,交易服務(wù)并不知道用戶(hù)會(huì)在什么時(shí)候支付,如果查詢(xún)的時(shí)機(jī)不正確(比如查詢(xún)的時(shí)候用戶(hù)正在支付中),可能查詢(xún)到的支付狀態(tài)也不正確。 那么問(wèn)題來(lái)了,我們到底該在什么時(shí)間主動(dòng)查詢(xún)支付狀態(tài)呢?
這個(gè)時(shí)間是無(wú)法確定的,因此,通常我們采取的措施就是利用定時(shí)任務(wù)定期查詢(xún),例如每隔20秒就查詢(xún)一次,并判斷支付狀態(tài)。如果發(fā)現(xiàn)訂單已經(jīng)支付,則立刻更新訂單狀態(tài)為已支付即可。 定時(shí)任務(wù)大家之前學(xué)習(xí)過(guò),具體的實(shí)現(xiàn)這里就不再贅述了。
至此,消息可靠性的問(wèn)題已經(jīng)解決了。
綜上,支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性是如何保證的?
首先,支付服務(wù)會(huì)正在用戶(hù)支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞的可靠性最后,我們還在交易服務(wù)設(shè)置了定時(shí)任務(wù),定期查詢(xún)訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。
4.延遲消息
在電商的支付業(yè)務(wù)中,對(duì)于一些庫(kù)存有限的商品,為了更好的用戶(hù)體驗(yàn),通常都會(huì)在用戶(hù)下單時(shí)立刻扣減商品庫(kù)存。例如電影院購(gòu)票、高鐵購(gòu)票,下單后就會(huì)鎖定座位資源,其他人無(wú)法重復(fù)購(gòu)買(mǎi)。
但是這樣就存在一個(gè)問(wèn)題,假如用戶(hù)下單后一直不付款,就會(huì)一直占有庫(kù)存資源,導(dǎo)致其他客戶(hù)無(wú)法正常交易,最終導(dǎo)致商戶(hù)利益受損!
因此,電商中通常的做法就是:對(duì)于超過(guò)一定時(shí)間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫(kù)存。
例如,訂單支付超時(shí)時(shí)間為30分鐘,則我們應(yīng)該在用戶(hù)下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫(kù)存。
但問(wèn)題來(lái)了:如何才能準(zhǔn)確的實(shí)現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?
像這種在一段時(shí)間以后才執(zhí)行的任務(wù),我們稱(chēng)之為延遲任務(wù),而要實(shí)現(xiàn)延遲任務(wù),最簡(jiǎn)單的方案就是利用MQ的延遲消息了。
在RabbitMQ中實(shí)現(xiàn)延遲消息也有兩種方案:
死信交換機(jī)+TTL延遲消息插件
這一章我們就一起研究下這兩種方案的實(shí)現(xiàn)方式,以及優(yōu)缺點(diǎn)。
4.1.死信交換機(jī)和延遲消息
首先我們來(lái)學(xué)習(xí)一下基于死信交換機(jī)的延遲消息方案。
4.1.1.死信交換機(jī)
什么是死信?
當(dāng)一個(gè)隊(duì)列中的消息滿(mǎn)足下列情況之一時(shí),可以成為死信(dead letter):
消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)要投遞的隊(duì)列消息滿(mǎn)了,無(wú)法投遞
如果一個(gè)隊(duì)列中的消息已經(jīng)成為死信,并且這個(gè)隊(duì)列通過(guò)**dead-letter-exchange**屬性指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)就稱(chēng)為死信交換機(jī)(Dead Letter Exchange)。而此時(shí)加入有隊(duì)列與死信交換機(jī)綁定,則最終死信就會(huì)被投遞到這個(gè)隊(duì)列中。
死信交換機(jī)有什么作用呢?
收集那些因處理失敗而被拒絕的消息收集那些因隊(duì)列滿(mǎn)了而被拒絕的消息收集因TTL(有效期)到期的消息
4.1.2.延遲消息
前面兩種作用場(chǎng)景可以看做是把死信交換機(jī)當(dāng)做一種消息處理的最終兜底方案,與消費(fèi)者重試時(shí)講的RepublishMessageRecoverer作用類(lèi)似。
而最后一種場(chǎng)景,大家設(shè)想一下這樣的場(chǎng)景: 如圖,有一組綁定的交換機(jī)(ttl.fanout)和隊(duì)列(ttl.queue)。但是ttl.queue沒(méi)有消費(fèi)者監(jiān)聽(tīng),而是設(shè)定了死信交換機(jī)hmall.direct,而隊(duì)列direct.queue1則與死信交換機(jī)綁定,RoutingKey是blue:
假如我們現(xiàn)在發(fā)送一條消息到ttl.fanout,RoutingKey為blue,并設(shè)置消息的有效期為5000毫秒: :::warning 注意:盡管這里的ttl.fanout不需要RoutingKey,但是當(dāng)消息變?yōu)樗佬挪⑼哆f到死信交換機(jī)時(shí),會(huì)沿用之前的RoutingKey,這樣hmall.direct才能正確路由消息。 :::
消息肯定會(huì)被投遞到ttl.queue之后,由于沒(méi)有消費(fèi)者,因此消息無(wú)人消費(fèi)。5秒之后,消息的有效期到期,成為死信: 死信被再次投遞到死信交換機(jī)hmall.direct,并沿用之前的RoutingKey,也就是blue: 由于direct.queue1與hmall.direct綁定的key是blue,因此最終消息被成功路由到direct.queue1,如果此時(shí)有消費(fèi)者與direct.queue1綁定, 也就能成功消費(fèi)消息了。但此時(shí)已經(jīng)是5秒鐘以后了: 也就是說(shuō),publisher發(fā)送了一條消息,但最終consumer在5秒后才收到消息。我們成功實(shí)現(xiàn)了延遲消息。
4.1.3.總結(jié)
:::warning 注意: RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后不一定會(huì)被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理。 當(dāng)隊(duì)列中消息堆積很多的時(shí)候,過(guò)期消息可能不會(huì)被按時(shí)處理,因此你設(shè)置的TTL時(shí)間不一定準(zhǔn)確。 :::
4.2.DelayExchange插件
基于死信隊(duì)列雖然可以實(shí)現(xiàn)延遲消息,但是太麻煩了。因此RabbitMQ社區(qū)提供了一個(gè)延遲消息插件來(lái)實(shí)現(xiàn)相同的效果。 官方文檔說(shuō)明: Scheduling Messages with RabbitMQ | RabbitMQ - Blog
4.2.1.下載
插件下載地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ 由于我們安裝的MQ是3.8版本,因此這里下載3.8.17版本: 當(dāng)然,也可以直接使用課前資料提供好的插件:
4.2.2.安裝
因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。
docker volume inspect mq-plugins
結(jié)果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data這個(gè)目錄,我們上傳插件到該目錄下。
接下來(lái)執(zhí)行命令,安裝插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
運(yùn)行結(jié)果如下:
4.2.3.聲明延遲交換機(jī)
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延遲消息:{}", msg);
}
基于@Bean的方式:
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交換機(jī)類(lèi)型和名稱(chēng)
.delayed() // 設(shè)置delay的屬性為true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
4.2.4.發(fā)送延遲消息
發(fā)送消息時(shí),必須通過(guò)x-delay屬性設(shè)定延遲時(shí)間:
@Test
void testPublisherDelayMessage() {
// 1.創(chuàng)建消息
String message = "hello, delayed message";
// 2.發(fā)送消息,利用消息后置處理器添加消息頭
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延遲消息屬性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
:::warning 注意: 延遲消息插件內(nèi)部會(huì)維護(hù)一個(gè)本地?cái)?shù)據(jù)庫(kù)表,同時(shí)使用Elang Timers功能實(shí)現(xiàn)計(jì)時(shí)。如果消息的延遲時(shí)間設(shè)置較長(zhǎng),可能會(huì)導(dǎo)致堆積的延遲消息非常多,會(huì)帶來(lái)較大的CPU開(kāi)銷(xiāo),同時(shí)延遲消息的時(shí)間會(huì)存在誤差。 因此,不建議設(shè)置延遲時(shí)間過(guò)長(zhǎng)的延遲消息。 :::
4.5.訂單狀態(tài)同步問(wèn)題
接下來(lái),我們就在交易服務(wù)中利用延遲消息實(shí)現(xiàn)訂單支付狀態(tài)的同步。其大概思路如下:
假如訂單超時(shí)支付時(shí)間為30分鐘,理論上說(shuō)我們應(yīng)該在下單時(shí)發(fā)送一條延遲消息,延遲時(shí)間為30分鐘。這樣就可以在接收到消息時(shí)檢驗(yàn)訂單支付狀態(tài),關(guān)閉未支付訂單。 但是大多數(shù)情況下用戶(hù)支付都會(huì)在1分鐘內(nèi)完成,我們發(fā)送的消息卻要在MQ中停留30分鐘,額外消耗了MQ的資源。因此,我們最好多檢測(cè)幾次訂單支付狀態(tài),而不是在最后第30分鐘才檢測(cè)。 例如:我們?cè)谟脩?hù)下單后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、…30分分別設(shè)置延遲消息,如果提前發(fā)現(xiàn)訂單已經(jīng)支付,則后續(xù)的檢測(cè)取消即可。 這樣就可以有效避免對(duì)MQ資源的浪費(fèi)了。
優(yōu)化后的實(shí)現(xiàn)思路如下:
由于我們要多次發(fā)送延遲消息,因此需要先定義一個(gè)記錄消息延遲時(shí)間的消息體,處于通用性考慮,我們將其定義到hm-common模塊下: 代碼如下:
package com.hmall.common.domain;
import com.hmall.common.utils.CollUtils;
import lombok.Data;
import java.util.List;
@Data
public class MultiDelayMessage
/**
* 消息體
*/
private T data;
/**
* 記錄延遲時(shí)間的集合
*/
private List
public MultiDelayMessage(T data, List
this.data = data;
this.delayMillis = delayMillis;
}
public static
return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
}
/**
* 獲取并移除下一個(gè)延遲時(shí)間
* @return 隊(duì)列中的第一個(gè)延遲時(shí)間
*/
public Long removeNextDelay(){
return delayMillis.remove(0);
}
/**
* 是否還有下一個(gè)延遲時(shí)間
*/
public boolean hasNextDelay(){
return !delayMillis.isEmpty();
}
}
4.5.1.定義常量
無(wú)論是消息發(fā)送還是接收都是在交易服務(wù)完成,因此我們?cè)趖rade-service中定義一個(gè)常量類(lèi),用于記錄交換機(jī)、隊(duì)列、RoutingKey等常量: 內(nèi)容如下:
package com.hmall.trade.constants;
public interface MqConstants {
String DELAY_EXCHANGE = "trade.delay.topic";
String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
String DELAY_ORDER_ROUTING_KEY = "order.query";
}
4.5.2.抽取共享mq配置
我們將mq的配置抽取到nacos中,方便各個(gè)微服務(wù)共享配置。 在nacos中定義一個(gè)名為shared-mq.xml的配置文件,內(nèi)容如下:
spring:
rabbitmq:
host: ${hm.mq.host:192.168.150.101} # 主機(jī)名
port: ${hm.mq.port:5672} # 端口
virtual-host: ${hm.mq.vhost:/hmall} # 虛擬主機(jī)
username: ${hm.mq.un:hmall} # 用戶(hù)名
password: ${hm.mq.pw:123} # 密碼
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息
這里只添加一些基礎(chǔ)配置,至于生產(chǎn)者確認(rèn),消費(fèi)者確認(rèn)配置則由微服務(wù)根據(jù)業(yè)務(wù)自己決定。
在trade-service模塊添加共享配置:
4.5.3.改造下單業(yè)務(wù)
接下來(lái),我們改造下單業(yè)務(wù),在下單完成后,發(fā)送延遲消息,查詢(xún)支付狀態(tài)。
1)引入依賴(lài) 在trade-service模塊的pom.xml中引入amqp的依賴(lài):
2)改造下單業(yè)務(wù) 修改trade-service模塊的com.hmall.trade.service.impl.OrderServiceImpl類(lèi)的createOrder方法,添加消息發(fā)送的代碼:
4.5.4.編寫(xiě)查詢(xún)支付狀態(tài)接口
由于MQ消息處理時(shí)需要查詢(xún)支付狀態(tài),因此我們要在pay-service模塊定義一個(gè)這樣的接口,并提供對(duì)應(yīng)的FeignClient. 首先,在hm-api模塊定義三個(gè)類(lèi): 說(shuō)明:
PayOrderDTO:支付單的數(shù)據(jù)傳輸實(shí)體PayClient:支付系統(tǒng)的Feign客戶(hù)端PayClientFallback:支付系統(tǒng)的fallback邏輯
PayOrderDTO代碼如下:
package com.hmall.api.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
/**
*
* 支付訂單
*
*/
@Data
@ApiModel(description = "支付單數(shù)據(jù)傳輸實(shí)體")
public class PayOrderDTO {
@ApiModelProperty("id")
private Long id;
@ApiModelProperty("業(yè)務(wù)訂單號(hào)")
private Long bizOrderNo;
@ApiModelProperty("支付單號(hào)")
private Long payOrderNo;
@ApiModelProperty("支付用戶(hù)id")
private Long bizUserId;
@ApiModelProperty("支付渠道編碼")
private String payChannelCode;
@ApiModelProperty("支付金額,單位分")
private Integer amount;
@ApiModelProperty("付類(lèi)型,1:h5,2:小程序,3:公眾號(hào),4:掃碼,5:余額支付")
private Integer payType;
@ApiModelProperty("付狀態(tài),0:待提交,1:待支付,2:支付超時(shí)或取消,3:支付成功")
private Integer status;
@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨(dú)處理的字段")
private String expandJson;
@ApiModelProperty("第三方返回業(yè)務(wù)碼")
private String resultCode;
@ApiModelProperty("第三方返回提示信息")
private String resultMsg;
@ApiModelProperty("支付成功時(shí)間")
private LocalDateTime paySuccessTime;
@ApiModelProperty("支付超時(shí)時(shí)間")
private LocalDateTime payOverTime;
@ApiModelProperty("支付二維碼鏈接")
private String qrCodeUrl;
@ApiModelProperty("創(chuàng)建時(shí)間")
private LocalDateTime createTime;
@ApiModelProperty("更新時(shí)間")
private LocalDateTime updateTime;
}
PayClient代碼如下:
package com.hmall.api.client;
import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
/**
* 根據(jù)交易訂單id查詢(xún)支付單
* @param id 業(yè)務(wù)訂單id
* @return 支付單信息
*/
@GetMapping("/pay-orders/biz/{id}")
PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
PayClientFallback代碼如下:
package com.hmall.api.client.fallback;
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
@Slf4j
public class PayClientFallback implements FallbackFactory
@Override
public PayClient create(Throwable cause) {
return new PayClient() {
@Override
public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
return null;
}
};
}
}
最后,在pay-service模塊的PayController中實(shí)現(xiàn)該接口:
@ApiOperation("根據(jù)id查詢(xún)支付單")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
4.5.5.消息監(jiān)聽(tīng)
接下來(lái),我們?cè)趖rader-service編寫(xiě)一個(gè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)延遲消息,查詢(xún)訂單支付狀態(tài): 代碼如下:
package com.hmall.trade.listener;
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.common.domain.MultiDelayMessage;
import com.hmall.trade.constants.MqConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {
private final IOrderService orderService;
private final PayClient payClient;
private final RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
key = MqConstants.DELAY_ORDER_ROUTING_KEY
))
public void listenOrderCheckDelayMessage(MultiDelayMessage
// 1.獲取消息中的訂單id
Long orderId = msg.getData();
// 2.查詢(xún)訂單,判斷狀態(tài):1是未支付,大于1則是已支付或已關(guān)閉
Order order = orderService.getById(orderId);
if (order == null || order.getStatus() > 1) {
// 訂單不存在或交易已經(jīng)結(jié)束,放棄處理
return;
}
// 3.可能是未支付,查詢(xún)支付服務(wù)
PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
if (payOrder != null && payOrder.getStatus() == 3) {
// 支付成功,更新訂單狀態(tài)
orderService.markOrderPaySuccess(orderId);
return;
}
// 4.確定未支付,判斷是否還有剩余延遲時(shí)間
if (msg.hasNextDelay()) {
// 4.1.有延遲時(shí)間,需要重發(fā)延遲消息,先獲取延遲時(shí)間的int值
int delayVal = msg.removeNextDelay().intValue();
// 4.2.發(fā)送延遲消息
rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
message -> {
message.getMessageProperties().setDelay(delayVal);
return message;
});
return;
}
// 5.沒(méi)有剩余延遲時(shí)間了,說(shuō)明訂單超時(shí)未支付,需要取消訂單
orderService.cancelOrder(orderId);
}
}
注意,這里要在OrderServiceImpl中實(shí)現(xiàn)cancelOrder方法,留作作業(yè)大家自行實(shí)現(xiàn)。
5.作業(yè)
5.1.取消訂單
在處理超時(shí)未支付訂單時(shí),如果發(fā)現(xiàn)訂單確實(shí)超時(shí)未支付,最終需要關(guān)閉該訂單。 關(guān)閉訂單需要完成兩件事情:
將訂單狀態(tài)修改為已關(guān)閉恢復(fù)訂單中已經(jīng)扣除的庫(kù)存
這部分功能尚未實(shí)現(xiàn)。 大家要在IOrderService接口中定義cancelOrder方法:
void cancelOrder(Long orderId);
并且在OrderServiceImpl中實(shí)現(xiàn)該方法。實(shí)現(xiàn)過(guò)程中要注意業(yè)務(wù)冪等性判斷。
5.2.抽取MQ工具
MQ在企業(yè)開(kāi)發(fā)中的常見(jiàn)應(yīng)用我們就學(xué)習(xí)完畢了,除了收發(fā)消息以外,消息可靠性的處理、生產(chǎn)者確認(rèn)、消費(fèi)者確認(rèn)、延遲消息等等編碼還是相對(duì)比較復(fù)雜的。 因此,我們需要將這些常用的操作封裝為工具,方便在項(xiàng)目中使用。要求如下:
在hm-commom模塊下編寫(xiě)發(fā)送消息的工具類(lèi)RabbitMqHelper定義一個(gè)自動(dòng)配置類(lèi)MqConsumeErrorAutoConfiguration,內(nèi)容包括:
聲明一個(gè)交換機(jī),名為error.direct,類(lèi)型為direct聲明一個(gè)隊(duì)列,名為:微服務(wù)名 + error.queue,也就是說(shuō)要?jiǎng)討B(tài)獲取將隊(duì)列與交換機(jī)綁定,綁定時(shí)的RoutingKey就是微服務(wù)名聲明RepublishMessageRecoverer,消費(fèi)失敗消息投遞到上述交換機(jī)給配置類(lèi)添加條件,當(dāng)spring.rabbitmq.listener.simple.retry.enabled為true時(shí)觸發(fā)
RabbitMqHelper的結(jié)構(gòu)如下:
public class RabbitMqHelper {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object msg){
}
public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay){
}
public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries){
}
}
5.3.改造業(yè)務(wù)
利用你編寫(xiě)的工具,改造支付服務(wù)、購(gòu)物車(chē)服務(wù)、交易服務(wù)中消息發(fā)送功能,并且添加消息確認(rèn)或消費(fèi)者重試機(jī)制,確保消息的可靠性。
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)
文章來(lái)源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀(guān)點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。