欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)

http://yzkb.51969.com/

在昨天的練習(xí)作業(yè)中,我們改造了余額支付功能,在支付成功后利用RabbitMQ通知交易服務(wù),更新業(yè)務(wù)訂單狀態(tài)為已支付。 但是大家思考一下,如果這里MQ通知失敗,支付服務(wù)中支付流水顯示支付成功,而交易服務(wù)中的訂單狀態(tài)卻顯示未支付,數(shù)據(jù)出現(xiàn)了不一致。 此時(shí)前端發(fā)送請(qǐng)求查詢(xún)支付狀態(tài)時(shí),肯定是查詢(xún)交易服務(wù)狀態(tài),會(huì)發(fā)現(xiàn)業(yè)務(wù)訂單未支付,而用戶(hù)自己知道已經(jīng)支付成功,這就導(dǎo)致用戶(hù)體驗(yàn)不一致。

因此,這里我們必須盡可能確保MQ消息的可靠性,即:消息應(yīng)該至少被消費(fèi)者處理1次 那么問(wèn)題來(lái)了:

我們?cè)撊绾未_保MQ消息的可靠性?如果真的發(fā)送失敗,有沒(méi)有其它的兜底方案?

這些問(wèn)題,在今天的學(xué)習(xí)中都會(huì)找到答案。

1.發(fā)送者的可靠性

首先,我們一起分析一下消息丟失的可能性有哪些。 消息從發(fā)送者發(fā)送消息,到消費(fèi)者處理消息,需要經(jīng)過(guò)的流程是這樣的: 消息從生產(chǎn)者到消費(fèi)者的每一步都可能導(dǎo)致消息丟失:

發(fā)送消息時(shí)丟失:

生產(chǎn)者發(fā)送消息時(shí)連接MQ失敗生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常 MQ導(dǎo)致消息丟失:

消息到達(dá)MQ,保存到隊(duì)列后,尚未消費(fèi)就突然宕機(jī) 消費(fèi)者處理消息時(shí):

消息接收后尚未處理突然宕機(jī)消息接收后處理過(guò)程中拋出異常

綜上,我們要解決消息丟失問(wèn)題,保證MQ的可靠性,就必須從3個(gè)方面入手:

確保生產(chǎn)者一定把消息發(fā)送到MQ確保MQ不會(huì)將消息弄丟確保消費(fèi)者一定要處理消息

這一章我們先來(lái)看如何確保生產(chǎn)者一定能把消息發(fā)送到MQ。

1.1.生產(chǎn)者重試機(jī)制

首先第一種情況,就是生產(chǎn)者發(fā)送消息時(shí),出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。

為了解決這個(gè)問(wèn)題,SpringAMQP提供的消息發(fā)送時(shí)的重試機(jī)制。即:當(dāng)RabbitTemplate與MQ連接超時(shí)后,多次重試。

修改publisher模塊的application.yaml文件,添加下面的內(nèi)容:

spring:

rabbitmq:

connection-timeout: 1s # 設(shè)置MQ的連接超時(shí)時(shí)間

template:

retry:

enabled: true # 開(kāi)啟超時(shí)重試機(jī)制

initial-interval: 1000ms # 失敗后的初始等待時(shí)間

multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier

max-attempts: 3 # 最大重試次數(shù)

我們利用命令停掉RabbitMQ服務(wù):

docker stop mq

然后測(cè)試發(fā)送一條消息,會(huì)發(fā)現(xiàn)會(huì)每隔1秒重試1次,總共重試了3次。消息發(fā)送的超時(shí)重試機(jī)制配置成功了!

:::warning 注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過(guò)SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說(shuō)多次重試等待的過(guò)程中,當(dāng)前線(xiàn)程是被阻塞的。 如果對(duì)于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請(qǐng)合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然也可以考慮使用異步線(xiàn)程來(lái)執(zhí)行發(fā)送消息的代碼。 :::

1.2.生產(chǎn)者確認(rèn)機(jī)制

一般情況下,只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會(huì)出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無(wú)需考慮這種問(wèn)題。 不過(guò),在少數(shù)情況下,也會(huì)出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:

MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無(wú)法路由

針對(duì)上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher Confirm和Publisher Return兩種。在開(kāi)啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會(huì)根據(jù)消息處理的情況返回不同的回執(zhí)。 具體如圖所示: 總結(jié)如下:

當(dāng)消息投遞到MQ,但是路由失敗時(shí),通過(guò)Publisher Return返回異常信息,同時(shí)返回ack的確認(rèn)信息,代表投遞成功臨時(shí)消息投遞到了MQ,并且入隊(duì)成功,返回ACK,告知投遞成功持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回ACK ,告知投遞成功其它情況都會(huì)返回NACK,告知投遞失敗

其中ack和nack屬于Publisher Confirm機(jī)制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機(jī)制。 默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過(guò)配置文件來(lái)開(kāi)啟。

1.3.實(shí)現(xiàn)生產(chǎn)者確認(rèn)

1.3.1.開(kāi)啟生產(chǎn)者確認(rèn)

在publisher模塊的application.yaml中添加配置:

spring:

rabbitmq:

publisher-confirm-type: correlated # 開(kāi)啟publisher confirm機(jī)制,并設(shè)置confirm類(lèi)型

publisher-returns: true # 開(kāi)啟publisher return機(jī)制

這里publisher-confirm-type有三種模式可選:

none:關(guān)閉confirm機(jī)制simple:同步阻塞等待MQ的回執(zhí)correlated:MQ異步回調(diào)返回回執(zhí)

一般我們推薦使用correlated,回調(diào)機(jī)制。

1.3.2.定義ReturnCallback

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此我們可以在配置類(lèi)中統(tǒng)一設(shè)置。我們?cè)趐ublisher模塊定義一個(gè)配置類(lèi): 內(nèi)容如下:

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j

@AllArgsConstructor

@Configuration

public class MqConfig {

private final RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

@Override

public void returnedMessage(ReturnedMessage returned) {

log.error("觸發(fā)return callback,");

log.debug("exchange: {}", returned.getExchange());

log.debug("routingKey: {}", returned.getRoutingKey());

log.debug("message: {}", returned.getMessage());

log.debug("replyCode: {}", returned.getReplyCode());

log.debug("replyText: {}", returned.getReplyText());

}

});

}

}

1.3.3.定義ConfirmCallback

由于每個(gè)消息發(fā)送時(shí)的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時(shí)定義。具體來(lái)說(shuō),是在調(diào)用RabbitTemplate中的convertAndSend方法時(shí),多傳遞一個(gè)參數(shù): 這里的CorrelationData中包含兩個(gè)核心的東西:

id:消息的唯一標(biāo)示,MQ對(duì)不同的消息的回執(zhí)以此做判斷,避免混淆SettableListenableFuture:回執(zhí)結(jié)果的Future對(duì)象

將來(lái)MQ的回執(zhí)就會(huì)通過(guò)這個(gè)Future來(lái)返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來(lái)處理消息回執(zhí):

我們新建一個(gè)測(cè)試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback:

@Test

void testPublisherConfirm() {

// 1.創(chuàng)建CorrelationData

CorrelationData cd = new CorrelationData();

// 2.給Future添加ConfirmCallback

cd.getFuture().addCallback(new ListenableFutureCallback() {

@Override

public void onFailure(Throwable ex) {

// 2.1.Future發(fā)生異常時(shí)的處理邏輯,基本不會(huì)觸發(fā)

log.error("send message fail", ex);

}

@Override

public void onSuccess(CorrelationData.Confirm result) {

// 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容

if(result.isAck()){ // result.isAck(),boolean類(lèi)型,true代表ack回執(zhí),false 代表 nack回執(zhí)

log.debug("發(fā)送消息成功,收到 ack!");

}else{ // result.getReason(),String類(lèi)型,返回nack時(shí)的異常描述

log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());

}

}

});

// 3.發(fā)送消息

rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);

}

執(zhí)行結(jié)果如下: 可以看到,由于傳遞的RoutingKey是錯(cuò)誤的,路由失敗后,觸發(fā)了return callback,同時(shí)也收到了ack。 當(dāng)我們修改為正確的RoutingKey以后,就不會(huì)觸發(fā)return callback了,只收到ack。 而如果連交換機(jī)都是錯(cuò)誤的,則只會(huì)收到nack。

:::warning 注意: 開(kāi)啟生產(chǎn)者確認(rèn)比較消耗MQ性能,一般不建議開(kāi)啟。而且大家思考一下觸發(fā)確認(rèn)的幾種情況:

路由失敗:一般是因?yàn)镽outingKey錯(cuò)誤導(dǎo)致,往往是編程導(dǎo)致交換機(jī)名稱(chēng)錯(cuò)誤:同樣是編程錯(cuò)誤導(dǎo)致MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對(duì)消息可靠性要求非常高的業(yè)務(wù)才需要開(kāi)啟,而且僅僅需要開(kāi)啟ConfirmCallback處理nack就可以了。 :::

2.MQ的可靠性

消息到達(dá)MQ以后,如果MQ不能及時(shí)保存,也會(huì)導(dǎo)致消息丟失,所以MQ的可靠性也非常重要。

2.1.數(shù)據(jù)持久化

為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲(chǔ)的臨時(shí)數(shù)據(jù),重啟后就會(huì)消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:

交換機(jī)持久化隊(duì)列持久化消息持久化

我們以控制臺(tái)界面為例來(lái)說(shuō)明。

2.1.1.交換機(jī)持久化

在控制臺(tái)的Exchanges頁(yè)面,添加交換機(jī)時(shí)可以配置交換機(jī)的Durability參數(shù): 設(shè)置為Durable就是持久化模式,Transient就是臨時(shí)模式。

2.1.2.隊(duì)列持久化

在控制臺(tái)的Queues頁(yè)面,添加隊(duì)列時(shí),同樣可以配置隊(duì)列的Durability參數(shù): 除了持久化以外,你可以看到隊(duì)列還有很多其它參數(shù),有一些我們會(huì)在后期學(xué)習(xí)。

2.1.3.消息持久化

在控制臺(tái)發(fā)送消息的時(shí)候,可以添加很多參數(shù),而消息的持久化是要配置一個(gè)properties:

:::warning 說(shuō)明:在開(kāi)啟持久化機(jī)制以后,如果同時(shí)還開(kāi)啟了生產(chǎn)者確認(rèn),那么MQ會(huì)在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。 不過(guò)出于性能考慮,為了減少I(mǎi)O次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫(kù)的,而是每隔一段時(shí)間批量持久化。一般間隔在100毫秒左右,這就會(huì)導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。 :::

2.2.LazyQueue

在默認(rèn)情況下,RabbitMQ會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會(huì)導(dǎo)致消息積壓,比如:

消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障消息發(fā)送量激增,超過(guò)了消費(fèi)者處理速度消費(fèi)者處理業(yè)務(wù)發(fā)生阻塞

一旦出現(xiàn)消息堆積問(wèn)題,RabbitMQ的內(nèi)存占用就會(huì)越來(lái)越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時(shí)RabbitMQ會(huì)將內(nèi)存消息刷到磁盤(pán)上,這個(gè)行為成為PageOut. PageOut會(huì)耗費(fèi)一段時(shí)間,并且會(huì)阻塞隊(duì)列進(jìn)程。因此在這個(gè)過(guò)程中RabbitMQ不會(huì)再處理新的消息,生產(chǎn)者的所有請(qǐng)求都會(huì)被阻塞。

為了解決這個(gè)問(wèn)題,從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

接收到消息后直接存入磁盤(pán)而非內(nèi)存消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存(也就是懶加載)支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)

而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級(jí)MQ為3.12版本或者所有隊(duì)列都設(shè)置為L(zhǎng)azyQueue模式。

2.2.1.控制臺(tái)配置Lazy模式

在添加隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊(duì)列為L(zhǎng)azy模式:

2.2.2.代碼配置Lazy模式

在利用SpringAMQP聲明隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)也可設(shè)置隊(duì)列為L(zhǎng)azy模式:

@Bean

public Queue lazyQueue(){

return QueueBuilder

.durable("lazy.queue")

.lazy() // 開(kāi)啟Lazy模式

.build();

}

這里是通過(guò)QueueBuilder的lazy()函數(shù)配置Lazy模式,底層源碼如下:

當(dāng)然,我們也可以基于注解來(lái)聲明隊(duì)列并設(shè)置為L(zhǎng)azy模式:

@RabbitListener(queuesToDeclare = @Queue(

name = "lazy.queue",

durable = "true",

arguments = @Argument(name = "x-queue-mode", value = "lazy")

))

public void listenLazyQueue(String msg){

log.info("接收到 lazy.queue的消息:{}", msg);

}

2.2.3.更新已有隊(duì)列為lazy模式

對(duì)于已經(jīng)存在的隊(duì)列,也可以配置為lazy模式,但是要通過(guò)設(shè)置policy實(shí)現(xiàn)。 可以基于命令行設(shè)置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解讀:

rabbitmqctl :RabbitMQ的命令行工具set_policy :添加一個(gè)策略L(fǎng)azy :策略名稱(chēng),可以自定義"^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字'{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式--apply-to queues:策略的作用對(duì)象,是所有的隊(duì)列

當(dāng)然,也可以在控制臺(tái)配置policy,進(jìn)入在控制臺(tái)的Admin頁(yè)面,點(diǎn)擊Policies,即可添加配置:

3.消費(fèi)者的可靠性

當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因?yàn)橄⑼哆f給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:

消息投遞的過(guò)程中出現(xiàn)了網(wǎng)絡(luò)故障消費(fèi)者接收到消息后突然宕機(jī)消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異?!?/p>

一旦發(fā)生上述情況,消息也會(huì)丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。 但問(wèn)題來(lái)了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?

本章我們就一起研究一下消費(fèi)者處理消息時(shí)的可靠性解決方案。

2.1.消費(fèi)者確認(rèn)機(jī)制

為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:

ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息nack:消息處理失敗,RabbitMQ需要再次投遞消息reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息

一般reject方式用的較少,除非是消息格式有問(wèn)題,那就是開(kāi)發(fā)問(wèn)題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過(guò)try catch機(jī)制捕獲,消息處理成功時(shí)返回ack,處理失敗時(shí)返回nack.

由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實(shí)現(xiàn)了消息確認(rèn)。并允許我們通過(guò)配置文件設(shè)置ACK處理方式,有三種模式:

**none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用**manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活**auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:

如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack;如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;

返回Reject的常見(jiàn)異常有:

Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message but Message is received.java.lang.NoSuchMethodException: Added in version 1.6.3.java.lang.ClassCastException: Added in version 1.6.3.

通過(guò)下面的配置可以修改SpringAMQP的ACK處理方式:

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: none # 不做處理

修改consumer服務(wù)的SpringRabbitListener類(lèi)中的方法,模擬一個(gè)消息處理的異常:

@RabbitListener(queues = "simple.queue")

public void listenSimpleQueueMessage(String msg) throws InterruptedException {

log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");

if (true) {

throw new MessageConversionException("故意的");

}

log.info("消息處理完成");

}

測(cè)試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時(shí),消息依然被RabbitMQ刪除了。

我們?cè)俅伟汛_認(rèn)機(jī)制修改為auto:

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: auto # 自動(dòng)ack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)): 放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會(huì)自動(dòng)返回reject,所以消息依然會(huì)被刪除:

我們將異常改為RuntimeException類(lèi)型:

@RabbitListener(queues = "simple.queue")

public void listenSimpleQueueMessage(String msg) throws InterruptedException {

log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");

if (true) {

throw new RuntimeException("故意的");

}

log.info("消息處理完成");

}

在異常位置打斷點(diǎn),然后再次發(fā)送消息測(cè)試,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)): 放行以后,由于拋出的是業(yè)務(wù)異常,所以Spring返回ack,最終消息恢復(fù)至Ready狀態(tài),并且沒(méi)有被RabbitMQ刪除: 當(dāng)我們把配置改為auto時(shí),消息處理失敗后,會(huì)回到RabbitMQ,并重新投遞到消費(fèi)者。

2.2.失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯(cuò),消息會(huì)再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。 極端情況就是消費(fèi)者一直無(wú)法執(zhí)行成功,那么消息requeue就會(huì)無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:

當(dāng)然,上述極端情況發(fā)生的概率還是非常低的,不過(guò)不怕一萬(wàn)就怕萬(wàn)一。為了應(yīng)對(duì)上述情況Spring又提供了消費(fèi)者失敗重試機(jī)制:在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:

rabbitmq:

listener:

simple:

retry:

enabled: true # 開(kāi)啟消費(fèi)者失敗重試

initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒

multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)

stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測(cè)試。可以發(fā)現(xiàn):

消費(fèi)者在失敗后消息沒(méi)有重新回到MQ無(wú)限重新投遞,而是在本地重試了3次本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是reject

結(jié)論:

開(kāi)啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄

2.3.失敗處理策略

在之前的測(cè)試中,本地測(cè)試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄。這在某些對(duì)于消息可靠性要求較高的業(yè)務(wù)場(chǎng)景下,顯然不太合適了。 因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個(gè)策略是由MessageRecovery接口來(lái)定義的,它有3個(gè)不同實(shí)現(xiàn):

RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專(zhuān)門(mén)存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

完整代碼如下:

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.retry.MessageRecoverer;

import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;

import org.springframework.context.annotation.Bean;

@Configuration

@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")

public class ErrorMessageConfig {

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

}

2.4.業(yè)務(wù)冪等性

何為冪等性? 冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來(lái)描述是這樣的:f(x) = f(f(x)),例如求絕對(duì)值函數(shù)。 在程序開(kāi)發(fā)中,則是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。例如:

根據(jù)id刪除數(shù)據(jù)查詢(xún)數(shù)據(jù)新增數(shù)據(jù)

但數(shù)據(jù)的更新往往不是冪等的,如果重復(fù)執(zhí)行可能造成不一樣的后果。比如:

取消訂單,恢復(fù)庫(kù)存的業(yè)務(wù)。如果多次恢復(fù)就會(huì)出現(xiàn)庫(kù)存重復(fù)增加的情況退款業(yè)務(wù)。重復(fù)退款對(duì)商家而言會(huì)有經(jīng)濟(jì)損失。

所以,我們要盡可能避免業(yè)務(wù)被重復(fù)執(zhí)行。 然而在實(shí)際業(yè)務(wù)場(chǎng)景中,由于意外經(jīng)常會(huì)出現(xiàn)業(yè)務(wù)被重復(fù)執(zhí)行的情況,例如:

頁(yè)面卡頓時(shí)頻繁刷新導(dǎo)致表單重復(fù)提交服務(wù)間調(diào)用的重試MQ消息的重復(fù)投遞

我們?cè)谟脩?hù)支付成功后會(huì)發(fā)送MQ消息到交易服務(wù),修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復(fù)投遞的情況。如果消費(fèi)者不做判斷,很有可能導(dǎo)致消息被消費(fèi)多次,出現(xiàn)業(yè)務(wù)故障。 舉例:

假如用戶(hù)剛剛支付完成,并且投遞消息到交易服務(wù),交易服務(wù)更改訂單為已支付狀態(tài)。由于某種原因,例如網(wǎng)絡(luò)故障導(dǎo)致生產(chǎn)者沒(méi)有得到確認(rèn),隔了一段時(shí)間后重新投遞給交易服務(wù)。但是,在新投遞的消息被消費(fèi)之前,用戶(hù)選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。退款完成后,新投遞的消息才被消費(fèi),那么訂單狀態(tài)會(huì)被再次改為已支付。業(yè)務(wù)異常。

因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:

唯一消息ID業(yè)務(wù)狀態(tài)判斷

2.4.1.唯一消息ID

這個(gè)思路非常簡(jiǎn)單:

每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者。消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫(kù)如果下次又收到相同消息,去數(shù)據(jù)庫(kù)查詢(xún)判斷是否存在,存在則為重復(fù)消息放棄處理。

我們?cè)撊绾谓o消息添加唯一ID呢? 其實(shí)很簡(jiǎn)單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開(kāi)啟這個(gè)功能即可。 以Jackson的消息轉(zhuǎn)換器為例:

@Bean

public MessageConverter messageConverter(){

// 1.定義消息轉(zhuǎn)換器

Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();

// 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息

jjmc.setCreateMessageIds(true);

return jjmc;

}

2.4.2.業(yè)務(wù)判斷

業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來(lái)判斷是否是重復(fù)的請(qǐng)求或消息,不同的業(yè)務(wù)場(chǎng)景判斷的思路也不一樣。 例如我們當(dāng)前案例中,處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時(shí)判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過(guò),無(wú)需重復(fù)處理。

相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫(kù),所以我更推薦使用業(yè)務(wù)判斷的方案。

以支付修改訂單的業(yè)務(wù)為例,我們需要修改OrderServiceImpl中的markOrderPaySuccess方法:

@Override

public void markOrderPaySuccess(Long orderId) {

// 1.查詢(xún)訂單

Order old = getById(orderId);

// 2.判斷訂單狀態(tài)

if (old == null || old.getStatus() != 1) {

// 訂單不存在或者訂單狀態(tài)不是1,放棄處理

return;

}

// 3.嘗試更新訂單

Order order = new Order();

order.setId(orderId);

order.setStatus(2);

order.setPayTime(LocalDateTime.now());

updateById(order);

}

上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動(dòng)作,因此在極小概率下可能存在線(xiàn)程安全問(wèn)題。

我們可以合并上述操作為這樣:

@Override

public void markOrderPaySuccess(Long orderId) {

// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

lambdaUpdate()

.set(Order::getStatus, 2)

.set(Order::getPayTime, LocalDateTime.now())

.eq(Order::getId, orderId)

.eq(Order::getStatus, 1)

.update();

}

注意看,上述代碼等同于這樣的SQL語(yǔ)句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我們?cè)趙here條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說(shuō)明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會(huì)執(zhí)行。

2.5.兜底方案

雖然我們利用各種機(jī)制盡可能增加了消息的可靠性,但也不好說(shuō)能保證消息100%的可靠。萬(wàn)一真的MQ通知失敗該怎么辦呢? 有沒(méi)有其它兜底方案,能夠確保訂單的支付狀態(tài)一致呢?

其實(shí)思想很簡(jiǎn)單:既然MQ通知不一定發(fā)送到交易服務(wù),那么交易服務(wù)就必須自己主動(dòng)去查詢(xún)支付狀態(tài)。這樣即便支付服務(wù)的MQ通知失敗,我們依然能通過(guò)主動(dòng)查詢(xún)來(lái)保證訂單狀態(tài)的一致。 流程如下:

圖中黃色線(xiàn)圈起來(lái)的部分就是MQ通知失敗后的兜底處理方案,由交易服務(wù)自己主動(dòng)去查詢(xún)支付狀態(tài)。

不過(guò)需要注意的是,交易服務(wù)并不知道用戶(hù)會(huì)在什么時(shí)候支付,如果查詢(xún)的時(shí)機(jī)不正確(比如查詢(xún)的時(shí)候用戶(hù)正在支付中),可能查詢(xún)到的支付狀態(tài)也不正確。 那么問(wèn)題來(lái)了,我們到底該在什么時(shí)間主動(dòng)查詢(xún)支付狀態(tài)呢?

這個(gè)時(shí)間是無(wú)法確定的,因此,通常我們采取的措施就是利用定時(shí)任務(wù)定期查詢(xún),例如每隔20秒就查詢(xún)一次,并判斷支付狀態(tài)。如果發(fā)現(xiàn)訂單已經(jīng)支付,則立刻更新訂單狀態(tài)為已支付即可。 定時(shí)任務(wù)大家之前學(xué)習(xí)過(guò),具體的實(shí)現(xiàn)這里就不再贅述了。

至此,消息可靠性的問(wèn)題已經(jīng)解決了。

綜上,支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性是如何保證的?

首先,支付服務(wù)會(huì)正在用戶(hù)支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞的可靠性最后,我們還在交易服務(wù)設(shè)置了定時(shí)任務(wù),定期查詢(xún)訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。

4.延遲消息

在電商的支付業(yè)務(wù)中,對(duì)于一些庫(kù)存有限的商品,為了更好的用戶(hù)體驗(yàn),通常都會(huì)在用戶(hù)下單時(shí)立刻扣減商品庫(kù)存。例如電影院購(gòu)票、高鐵購(gòu)票,下單后就會(huì)鎖定座位資源,其他人無(wú)法重復(fù)購(gòu)買(mǎi)。

但是這樣就存在一個(gè)問(wèn)題,假如用戶(hù)下單后一直不付款,就會(huì)一直占有庫(kù)存資源,導(dǎo)致其他客戶(hù)無(wú)法正常交易,最終導(dǎo)致商戶(hù)利益受損!

因此,電商中通常的做法就是:對(duì)于超過(guò)一定時(shí)間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫(kù)存。

例如,訂單支付超時(shí)時(shí)間為30分鐘,則我們應(yīng)該在用戶(hù)下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫(kù)存。

但問(wèn)題來(lái)了:如何才能準(zhǔn)確的實(shí)現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?

像這種在一段時(shí)間以后才執(zhí)行的任務(wù),我們稱(chēng)之為延遲任務(wù),而要實(shí)現(xiàn)延遲任務(wù),最簡(jiǎn)單的方案就是利用MQ的延遲消息了。

在RabbitMQ中實(shí)現(xiàn)延遲消息也有兩種方案:

死信交換機(jī)+TTL延遲消息插件

這一章我們就一起研究下這兩種方案的實(shí)現(xiàn)方式,以及優(yōu)缺點(diǎn)。

4.1.死信交換機(jī)和延遲消息

首先我們來(lái)學(xué)習(xí)一下基于死信交換機(jī)的延遲消息方案。

4.1.1.死信交換機(jī)

什么是死信?

當(dāng)一個(gè)隊(duì)列中的消息滿(mǎn)足下列情況之一時(shí),可以成為死信(dead letter):

消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)要投遞的隊(duì)列消息滿(mǎn)了,無(wú)法投遞

如果一個(gè)隊(duì)列中的消息已經(jīng)成為死信,并且這個(gè)隊(duì)列通過(guò)**dead-letter-exchange**屬性指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)就稱(chēng)為死信交換機(jī)(Dead Letter Exchange)。而此時(shí)加入有隊(duì)列與死信交換機(jī)綁定,則最終死信就會(huì)被投遞到這個(gè)隊(duì)列中。

死信交換機(jī)有什么作用呢?

收集那些因處理失敗而被拒絕的消息收集那些因隊(duì)列滿(mǎn)了而被拒絕的消息收集因TTL(有效期)到期的消息

4.1.2.延遲消息

前面兩種作用場(chǎng)景可以看做是把死信交換機(jī)當(dāng)做一種消息處理的最終兜底方案,與消費(fèi)者重試時(shí)講的RepublishMessageRecoverer作用類(lèi)似。

而最后一種場(chǎng)景,大家設(shè)想一下這樣的場(chǎng)景: 如圖,有一組綁定的交換機(jī)(ttl.fanout)和隊(duì)列(ttl.queue)。但是ttl.queue沒(méi)有消費(fèi)者監(jiān)聽(tīng),而是設(shè)定了死信交換機(jī)hmall.direct,而隊(duì)列direct.queue1則與死信交換機(jī)綁定,RoutingKey是blue:

假如我們現(xiàn)在發(fā)送一條消息到ttl.fanout,RoutingKey為blue,并設(shè)置消息的有效期為5000毫秒: :::warning 注意:盡管這里的ttl.fanout不需要RoutingKey,但是當(dāng)消息變?yōu)樗佬挪⑼哆f到死信交換機(jī)時(shí),會(huì)沿用之前的RoutingKey,這樣hmall.direct才能正確路由消息。 :::

消息肯定會(huì)被投遞到ttl.queue之后,由于沒(méi)有消費(fèi)者,因此消息無(wú)人消費(fèi)。5秒之后,消息的有效期到期,成為死信: 死信被再次投遞到死信交換機(jī)hmall.direct,并沿用之前的RoutingKey,也就是blue: 由于direct.queue1與hmall.direct綁定的key是blue,因此最終消息被成功路由到direct.queue1,如果此時(shí)有消費(fèi)者與direct.queue1綁定, 也就能成功消費(fèi)消息了。但此時(shí)已經(jīng)是5秒鐘以后了: 也就是說(shuō),publisher發(fā)送了一條消息,但最終consumer在5秒后才收到消息。我們成功實(shí)現(xiàn)了延遲消息。

4.1.3.總結(jié)

:::warning 注意: RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后不一定會(huì)被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理。 當(dāng)隊(duì)列中消息堆積很多的時(shí)候,過(guò)期消息可能不會(huì)被按時(shí)處理,因此你設(shè)置的TTL時(shí)間不一定準(zhǔn)確。 :::

4.2.DelayExchange插件

基于死信隊(duì)列雖然可以實(shí)現(xiàn)延遲消息,但是太麻煩了。因此RabbitMQ社區(qū)提供了一個(gè)延遲消息插件來(lái)實(shí)現(xiàn)相同的效果。 官方文檔說(shuō)明: Scheduling Messages with RabbitMQ | RabbitMQ - Blog

4.2.1.下載

插件下載地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ 由于我們安裝的MQ是3.8版本,因此這里下載3.8.17版本: 當(dāng)然,也可以直接使用課前資料提供好的插件:

4.2.2.安裝

因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。

docker volume inspect mq-plugins

結(jié)果如下:

[

{

"CreatedAt": "2024-06-19T09:22:59+08:00",

"Driver": "local",

"Labels": null,

"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",

"Name": "mq-plugins",

"Options": null,

"Scope": "local"

}

]

插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data這個(gè)目錄,我們上傳插件到該目錄下。

接下來(lái)執(zhí)行命令,安裝插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

運(yùn)行結(jié)果如下:

4.2.3.聲明延遲交換機(jī)

基于注解方式:

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "delay.queue", durable = "true"),

exchange = @Exchange(name = "delay.direct", delayed = "true"),

key = "delay"

))

public void listenDelayMessage(String msg){

log.info("接收到delay.queue的延遲消息:{}", msg);

}

基于@Bean的方式:

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Slf4j

@Configuration

public class DelayExchangeConfig {

@Bean

public DirectExchange delayExchange(){

return ExchangeBuilder

.directExchange("delay.direct") // 指定交換機(jī)類(lèi)型和名稱(chēng)

.delayed() // 設(shè)置delay的屬性為true

.durable(true) // 持久化

.build();

}

@Bean

public Queue delayedQueue(){

return new Queue("delay.queue");

}

@Bean

public Binding delayQueueBinding(){

return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");

}

}

4.2.4.發(fā)送延遲消息

發(fā)送消息時(shí),必須通過(guò)x-delay屬性設(shè)定延遲時(shí)間:

@Test

void testPublisherDelayMessage() {

// 1.創(chuàng)建消息

String message = "hello, delayed message";

// 2.發(fā)送消息,利用消息后置處理器添加消息頭

rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 添加延遲消息屬性

message.getMessageProperties().setDelay(5000);

return message;

}

});

}

:::warning 注意: 延遲消息插件內(nèi)部會(huì)維護(hù)一個(gè)本地?cái)?shù)據(jù)庫(kù)表,同時(shí)使用Elang Timers功能實(shí)現(xiàn)計(jì)時(shí)。如果消息的延遲時(shí)間設(shè)置較長(zhǎng),可能會(huì)導(dǎo)致堆積的延遲消息非常多,會(huì)帶來(lái)較大的CPU開(kāi)銷(xiāo),同時(shí)延遲消息的時(shí)間會(huì)存在誤差。 因此,不建議設(shè)置延遲時(shí)間過(guò)長(zhǎng)的延遲消息。 :::

4.5.訂單狀態(tài)同步問(wèn)題

接下來(lái),我們就在交易服務(wù)中利用延遲消息實(shí)現(xiàn)訂單支付狀態(tài)的同步。其大概思路如下:

假如訂單超時(shí)支付時(shí)間為30分鐘,理論上說(shuō)我們應(yīng)該在下單時(shí)發(fā)送一條延遲消息,延遲時(shí)間為30分鐘。這樣就可以在接收到消息時(shí)檢驗(yàn)訂單支付狀態(tài),關(guān)閉未支付訂單。 但是大多數(shù)情況下用戶(hù)支付都會(huì)在1分鐘內(nèi)完成,我們發(fā)送的消息卻要在MQ中停留30分鐘,額外消耗了MQ的資源。因此,我們最好多檢測(cè)幾次訂單支付狀態(tài),而不是在最后第30分鐘才檢測(cè)。 例如:我們?cè)谟脩?hù)下單后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、…30分分別設(shè)置延遲消息,如果提前發(fā)現(xiàn)訂單已經(jīng)支付,則后續(xù)的檢測(cè)取消即可。 這樣就可以有效避免對(duì)MQ資源的浪費(fèi)了。

優(yōu)化后的實(shí)現(xiàn)思路如下:

由于我們要多次發(fā)送延遲消息,因此需要先定義一個(gè)記錄消息延遲時(shí)間的消息體,處于通用性考慮,我們將其定義到hm-common模塊下: 代碼如下:

package com.hmall.common.domain;

import com.hmall.common.utils.CollUtils;

import lombok.Data;

import java.util.List;

@Data

public class MultiDelayMessage {

/**

* 消息體

*/

private T data;

/**

* 記錄延遲時(shí)間的集合

*/

private List delayMillis;

public MultiDelayMessage(T data, List delayMillis) {

this.data = data;

this.delayMillis = delayMillis;

}

public static MultiDelayMessage of(T data, Long ... delayMillis){

return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));

}

/**

* 獲取并移除下一個(gè)延遲時(shí)間

* @return 隊(duì)列中的第一個(gè)延遲時(shí)間

*/

public Long removeNextDelay(){

return delayMillis.remove(0);

}

/**

* 是否還有下一個(gè)延遲時(shí)間

*/

public boolean hasNextDelay(){

return !delayMillis.isEmpty();

}

}

4.5.1.定義常量

無(wú)論是消息發(fā)送還是接收都是在交易服務(wù)完成,因此我們?cè)趖rade-service中定義一個(gè)常量類(lèi),用于記錄交換機(jī)、隊(duì)列、RoutingKey等常量: 內(nèi)容如下:

package com.hmall.trade.constants;

public interface MqConstants {

String DELAY_EXCHANGE = "trade.delay.topic";

String DELAY_ORDER_QUEUE = "trade.order.delay.queue";

String DELAY_ORDER_ROUTING_KEY = "order.query";

}

4.5.2.抽取共享mq配置

我們將mq的配置抽取到nacos中,方便各個(gè)微服務(wù)共享配置。 在nacos中定義一個(gè)名為shared-mq.xml的配置文件,內(nèi)容如下:

spring:

rabbitmq:

host: ${hm.mq.host:192.168.150.101} # 主機(jī)名

port: ${hm.mq.port:5672} # 端口

virtual-host: ${hm.mq.vhost:/hmall} # 虛擬主機(jī)

username: ${hm.mq.un:hmall} # 用戶(hù)名

password: ${hm.mq.pw:123} # 密碼

listener:

simple:

prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息

這里只添加一些基礎(chǔ)配置,至于生產(chǎn)者確認(rèn),消費(fèi)者確認(rèn)配置則由微服務(wù)根據(jù)業(yè)務(wù)自己決定。

在trade-service模塊添加共享配置:

4.5.3.改造下單業(yè)務(wù)

接下來(lái),我們改造下單業(yè)務(wù),在下單完成后,發(fā)送延遲消息,查詢(xún)支付狀態(tài)。

1)引入依賴(lài) 在trade-service模塊的pom.xml中引入amqp的依賴(lài):

org.springframework.boot

spring-boot-starter-amqp

2)改造下單業(yè)務(wù) 修改trade-service模塊的com.hmall.trade.service.impl.OrderServiceImpl類(lèi)的createOrder方法,添加消息發(fā)送的代碼:

4.5.4.編寫(xiě)查詢(xún)支付狀態(tài)接口

由于MQ消息處理時(shí)需要查詢(xún)支付狀態(tài),因此我們要在pay-service模塊定義一個(gè)這樣的接口,并提供對(duì)應(yīng)的FeignClient. 首先,在hm-api模塊定義三個(gè)類(lèi): 說(shuō)明:

PayOrderDTO:支付單的數(shù)據(jù)傳輸實(shí)體PayClient:支付系統(tǒng)的Feign客戶(hù)端PayClientFallback:支付系統(tǒng)的fallback邏輯

PayOrderDTO代碼如下:

package com.hmall.api.dto;

import io.swagger.annotations.ApiModel;

import io.swagger.annotations.ApiModelProperty;

import lombok.Data;

import java.time.LocalDateTime;

/**

*

* 支付訂單

*

*/

@Data

@ApiModel(description = "支付單數(shù)據(jù)傳輸實(shí)體")

public class PayOrderDTO {

@ApiModelProperty("id")

private Long id;

@ApiModelProperty("業(yè)務(wù)訂單號(hào)")

private Long bizOrderNo;

@ApiModelProperty("支付單號(hào)")

private Long payOrderNo;

@ApiModelProperty("支付用戶(hù)id")

private Long bizUserId;

@ApiModelProperty("支付渠道編碼")

private String payChannelCode;

@ApiModelProperty("支付金額,單位分")

private Integer amount;

@ApiModelProperty("付類(lèi)型,1:h5,2:小程序,3:公眾號(hào),4:掃碼,5:余額支付")

private Integer payType;

@ApiModelProperty("付狀態(tài),0:待提交,1:待支付,2:支付超時(shí)或取消,3:支付成功")

private Integer status;

@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨(dú)處理的字段")

private String expandJson;

@ApiModelProperty("第三方返回業(yè)務(wù)碼")

private String resultCode;

@ApiModelProperty("第三方返回提示信息")

private String resultMsg;

@ApiModelProperty("支付成功時(shí)間")

private LocalDateTime paySuccessTime;

@ApiModelProperty("支付超時(shí)時(shí)間")

private LocalDateTime payOverTime;

@ApiModelProperty("支付二維碼鏈接")

private String qrCodeUrl;

@ApiModelProperty("創(chuàng)建時(shí)間")

private LocalDateTime createTime;

@ApiModelProperty("更新時(shí)間")

private LocalDateTime updateTime;

}

PayClient代碼如下:

package com.hmall.api.client;

import com.hmall.api.client.fallback.PayClientFallback;

import com.hmall.api.dto.PayOrderDTO;

import org.springframework.cloud.openfeign.FeignClient;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)

public interface PayClient {

/**

* 根據(jù)交易訂單id查詢(xún)支付單

* @param id 業(yè)務(wù)訂單id

* @return 支付單信息

*/

@GetMapping("/pay-orders/biz/{id}")

PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);

}

PayClientFallback代碼如下:

package com.hmall.api.client.fallback;

import com.hmall.api.client.PayClient;

import com.hmall.api.dto.PayOrderDTO;

import lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.openfeign.FallbackFactory;

@Slf4j

public class PayClientFallback implements FallbackFactory {

@Override

public PayClient create(Throwable cause) {

return new PayClient() {

@Override

public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {

return null;

}

};

}

}

最后,在pay-service模塊的PayController中實(shí)現(xiàn)該接口:

@ApiOperation("根據(jù)id查詢(xún)支付單")

@GetMapping("/biz/{id}")

public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){

PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();

return BeanUtils.copyBean(payOrder, PayOrderDTO.class);

}

4.5.5.消息監(jiān)聽(tīng)

接下來(lái),我們?cè)趖rader-service編寫(xiě)一個(gè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)延遲消息,查詢(xún)訂單支付狀態(tài): 代碼如下:

package com.hmall.trade.listener;

import com.hmall.api.client.PayClient;

import com.hmall.api.dto.PayOrderDTO;

import com.hmall.common.domain.MultiDelayMessage;

import com.hmall.trade.constants.MqConstants;

import com.hmall.trade.domain.po.Order;

import com.hmall.trade.service.IOrderService;

import lombok.RequiredArgsConstructor;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.stereotype.Component;

@Slf4j

@Component

@RequiredArgsConstructor

public class OrderStatusListener {

private final IOrderService orderService;

private final PayClient payClient;

private final RabbitTemplate rabbitTemplate;

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),

exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),

key = MqConstants.DELAY_ORDER_ROUTING_KEY

))

public void listenOrderCheckDelayMessage(MultiDelayMessage msg) {

// 1.獲取消息中的訂單id

Long orderId = msg.getData();

// 2.查詢(xún)訂單,判斷狀態(tài):1是未支付,大于1則是已支付或已關(guān)閉

Order order = orderService.getById(orderId);

if (order == null || order.getStatus() > 1) {

// 訂單不存在或交易已經(jīng)結(jié)束,放棄處理

return;

}

// 3.可能是未支付,查詢(xún)支付服務(wù)

PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);

if (payOrder != null && payOrder.getStatus() == 3) {

// 支付成功,更新訂單狀態(tài)

orderService.markOrderPaySuccess(orderId);

return;

}

// 4.確定未支付,判斷是否還有剩余延遲時(shí)間

if (msg.hasNextDelay()) {

// 4.1.有延遲時(shí)間,需要重發(fā)延遲消息,先獲取延遲時(shí)間的int值

int delayVal = msg.removeNextDelay().intValue();

// 4.2.發(fā)送延遲消息

rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,

message -> {

message.getMessageProperties().setDelay(delayVal);

return message;

});

return;

}

// 5.沒(méi)有剩余延遲時(shí)間了,說(shuō)明訂單超時(shí)未支付,需要取消訂單

orderService.cancelOrder(orderId);

}

}

注意,這里要在OrderServiceImpl中實(shí)現(xiàn)cancelOrder方法,留作作業(yè)大家自行實(shí)現(xiàn)。

5.作業(yè)

5.1.取消訂單

在處理超時(shí)未支付訂單時(shí),如果發(fā)現(xiàn)訂單確實(shí)超時(shí)未支付,最終需要關(guān)閉該訂單。 關(guān)閉訂單需要完成兩件事情:

將訂單狀態(tài)修改為已關(guān)閉恢復(fù)訂單中已經(jīng)扣除的庫(kù)存

這部分功能尚未實(shí)現(xiàn)。 大家要在IOrderService接口中定義cancelOrder方法:

void cancelOrder(Long orderId);

并且在OrderServiceImpl中實(shí)現(xiàn)該方法。實(shí)現(xiàn)過(guò)程中要注意業(yè)務(wù)冪等性判斷。

5.2.抽取MQ工具

MQ在企業(yè)開(kāi)發(fā)中的常見(jiàn)應(yīng)用我們就學(xué)習(xí)完畢了,除了收發(fā)消息以外,消息可靠性的處理、生產(chǎn)者確認(rèn)、消費(fèi)者確認(rèn)、延遲消息等等編碼還是相對(duì)比較復(fù)雜的。 因此,我們需要將這些常用的操作封裝為工具,方便在項(xiàng)目中使用。要求如下:

在hm-commom模塊下編寫(xiě)發(fā)送消息的工具類(lèi)RabbitMqHelper定義一個(gè)自動(dòng)配置類(lèi)MqConsumeErrorAutoConfiguration,內(nèi)容包括:

聲明一個(gè)交換機(jī),名為error.direct,類(lèi)型為direct聲明一個(gè)隊(duì)列,名為:微服務(wù)名 + error.queue,也就是說(shuō)要?jiǎng)討B(tài)獲取將隊(duì)列與交換機(jī)綁定,綁定時(shí)的RoutingKey就是微服務(wù)名聲明RepublishMessageRecoverer,消費(fèi)失敗消息投遞到上述交換機(jī)給配置類(lèi)添加條件,當(dāng)spring.rabbitmq.listener.simple.retry.enabled為true時(shí)觸發(fā)

RabbitMqHelper的結(jié)構(gòu)如下:

public class RabbitMqHelper {

private final RabbitTemplate rabbitTemplate;

public void sendMessage(String exchange, String routingKey, Object msg){

}

public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay){

}

public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries){

}

}

5.3.改造業(yè)務(wù)

利用你編寫(xiě)的工具,改造支付服務(wù)、購(gòu)物車(chē)服務(wù)、交易服務(wù)中消息發(fā)送功能,并且添加消息確認(rèn)或消費(fèi)者重試機(jī)制,確保消息的可靠性。

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ 高級(jí)

http://yzkb.51969.com/

文章來(lái)源

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀(guān)點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19086292.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪(fǎng)問(wèn)

文章目錄