柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ之“延時(shí)隊(duì)列”
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ之“延時(shí)隊(duì)列”
延時(shí)隊(duì)列
RabbitMQ是目前最為流行的消息隊(duì)列之一,它的高可靠性、高可用性和高性能使得它成為眾多應(yīng)用場(chǎng)景下的首選。在實(shí)際應(yīng)用中,我們經(jīng)常需要實(shí)現(xiàn)延時(shí)隊(duì)列來(lái)解決一些業(yè)務(wù)問(wèn)題,比如訂單超時(shí)未支付自動(dòng)取消等。本文將介紹如何使用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列
下面先來(lái)解釋一下
延時(shí)隊(duì)列(也可以稱為延遲隊(duì)列,其實(shí)都是一個(gè)意思):
延遲隊(duì)列存儲(chǔ)的對(duì)象是對(duì)應(yīng)的延遲消息,所謂“延遲消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立刻拿到消息,而是等待特定時(shí)間后,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi)。
延時(shí)消息:生產(chǎn)者發(fā)送消息時(shí)指定一個(gè)時(shí)間,消費(fèi)者不會(huì)立刻收到消息,而是在指定時(shí)間之后才收到消息
延時(shí)任務(wù):設(shè)置在一定時(shí)間之后才執(zhí)行的任務(wù)
死信:
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),就會(huì)成為死信(dead letter):
1、消費(fèi)者使用basic.reject或basic.nack聲明消息消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
2、消息是一個(gè)過(guò)期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過(guò)期時(shí)間),超時(shí)無(wú)人消費(fèi)
3、要投遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信
延時(shí)隊(duì)列可以用于以下場(chǎng)景:
訂單處理:在電商網(wǎng)站中,訂單處理是一個(gè)常見(jiàn)的業(yè)務(wù)流程。如果訂單需要立即處理,可以使用RabbitMQ的延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)延遲處理。例如,可以將訂單發(fā)送到一個(gè)延時(shí)隊(duì)列中,并設(shè)置一個(gè)延遲時(shí)間(例如30分鐘),然后在延遲時(shí)間到達(dá)后,將訂單從隊(duì)列中取出并進(jìn)行處理。消息推送:在移動(dòng)應(yīng)用或Web應(yīng)用程序中,可以使用RabbitMQ的延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)消息推送。例如,可以將用戶訂閱的消息發(fā)送到一個(gè)延時(shí)隊(duì)列中,并設(shè)置一個(gè)延遲時(shí)間(例如1小時(shí)),然后在延遲時(shí)間到達(dá)后,將消息從隊(duì)列中取出并推送給用戶。定時(shí)任務(wù):在分布式系統(tǒng)中,可以使用RabbitMQ的延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)定時(shí)任務(wù)。例如,可以將需要定期執(zhí)行的任務(wù)發(fā)送到一個(gè)延時(shí)隊(duì)列中,并設(shè)置一個(gè)延遲時(shí)間(例如每天),然后在延遲時(shí)間到達(dá)后,將任務(wù)從隊(duì)列中取出并執(zhí)行。數(shù)據(jù)備份:在數(shù)據(jù)庫(kù)中,可以使用RabbitMQ的延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)數(shù)據(jù)備份。例如,可以將需要備份的數(shù)據(jù)發(fā)送到一個(gè)延時(shí)隊(duì)列中,并設(shè)置一個(gè)延遲時(shí)間(例如每天),然后在延遲時(shí)間到達(dá)后,將數(shù)據(jù)從隊(duì)列中取出并進(jìn)行備份。優(yōu)惠券發(fā)放:您可以設(shè)置一個(gè)延時(shí)隊(duì)列,將優(yōu)惠券發(fā)放任務(wù)添加到隊(duì)列中,設(shè)置一定的延時(shí)時(shí)間,以保證優(yōu)惠券在特定時(shí)間后才能被消費(fèi)。動(dòng)態(tài)路由:您可以使用延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)動(dòng)態(tài)路由的功能,將消息發(fā)送到延時(shí)隊(duì)列中,并設(shè)置一定的路由規(guī)則,以實(shí)現(xiàn)消息在特定時(shí)間后被路由到不同的目標(biāo)隊(duì)列中。
業(yè)務(wù)場(chǎng)景:
我們通常會(huì)在電商網(wǎng)站中(或者app比如:京東,淘寶)進(jìn)行下單,購(gòu)買商品,但是我們由于沒(méi)喲及時(shí)支付,會(huì)出現(xiàn)訂單超時(shí)未支付自動(dòng)取消的情況
下面用一張簡(jiǎn)單的圖片來(lái)設(shè)計(jì)一下業(yè)務(wù)場(chǎng)景:
那我們?cè)撊绾稳?shí)現(xiàn)延時(shí)隊(duì)列呢,下面用一張圖片給大家解釋一下
話不多說(shuō),上代碼?。?!
作者在這里只創(chuàng)建了一個(gè)交換機(jī),這個(gè)交換機(jī)可以同時(shí)綁定兩個(gè)隊(duì)列(有兩個(gè)隊(duì)列,一個(gè)隊(duì)列設(shè)置了它的ttl(消息過(guò)期時(shí)間),同時(shí)設(shè)置了消息過(guò)期后的路由交換機(jī)和路由的routeKey,如果不設(shè)置過(guò)期策略那么消息過(guò)期之后就會(huì)進(jìn)入死信隊(duì)列,另外一個(gè)隊(duì)列是普通隊(duì)列,監(jiān)聽(tīng)的時(shí)候只用去監(jiān)聽(tīng)普通隊(duì)列,達(dá)到延遲隊(duì)列的效果。跟上圖效果一樣,消息通過(guò)這個(gè)交換機(jī)到達(dá)設(shè)置了過(guò)期時(shí)間的的隊(duì)列,這個(gè)延遲隊(duì)列沒(méi)有消費(fèi)者進(jìn)行消費(fèi),當(dāng)消息過(guò)期之后,會(huì)通過(guò)這個(gè)交換機(jī)路由到正常的隊(duì)列,然后進(jìn)行消費(fèi))
導(dǎo)入依賴
配置類
package com.atguigu.gulimall.auth.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author YanShuLing
* @Package:com.atguigu.gulimall.auth.config
* @Project: brook
* @Description TODO
* @name:RabbitMQConfig
* @Date 2024/3/8:9:56
*/
@Configuration
public class RabbitMqConfig {
//創(chuàng)建了一個(gè)簡(jiǎn)單的隊(duì)列
@Bean
public Queue createOrderReleaseQueue(){
return new Queue("gmall.order.release.queue");
}
//這個(gè)是一個(gè)延時(shí)隊(duì)列
@Bean
public Queue createOrderDeadQueue(){
Map
//隊(duì)列消息的過(guò)期時(shí)間為十秒
map.put("x-message-ttl",10000);
//交換機(jī)
map.put("x-dead-letter-exchange","gmall-order-exchange");
//路由key
map.put("x-dead-letter-routing-key","gmall.order.release.queue");
return new Queue("gmall.order.dead.queue",true,false,false,map);
}
//交換機(jī)
@Bean
public Exchange createOrderExchange(){
return new DirectExchange("gmall-order-exchange");
}
//交換機(jī)和正常隊(duì)列綁定
@Bean
public Binding createOrderReleaseBind(){
return new Binding("gmall.order.release.queue",Binding.DestinationType.QUEUE,
"gmall-order-exchange","gmall.order.release.queue",null
);
}
//交換機(jī)和延遲隊(duì)列綁定
@Bean
public Binding createOrderDeadBind(){
return new Binding("gmall.order.dead.queue",Binding.DestinationType.QUEUE,
"gmall-order-exchange","gmall.order.dead.queue",null
);
}
}
生產(chǎn)者(作者寫了一個(gè)發(fā)送驗(yàn)證碼的代碼):
@PostMapping("/createOrder")
public R createOrder(String mobile){
//生成隨機(jī)的四位數(shù)(驗(yàn)證碼)
String code = RandomUtil.randomNumbers(4);
//redis給這個(gè)驗(yàn)證碼設(shè)置過(guò)期時(shí)間為5分鐘
redisTemplate.opsForValue().set("send_sms_"+mobile,code,5, TimeUnit.MINUTES);
String content = StrFormatter.format(Constants.SMS_TEMPLATE,code);
//給這個(gè)消息生成一個(gè)唯一標(biāo)識(shí),為了解決消息重復(fù)消費(fèi)問(wèn)題
String messageId = IdUtil.randomUUID();
//生產(chǎn)者發(fā)送消息,第一個(gè)參數(shù)是路由交換機(jī),第二個(gè)參數(shù)是路由鍵,作者設(shè)置了跟死信隊(duì)列一樣的
名稱,無(wú)傷大雅
rabbitRemplate.convertAndSend("gmall-order-exchange","gmall.order.dead.queue",
JSON.toJSONstring(new SmsParamVo(mobile,content,messageId)));
//發(fā)送驗(yàn)證碼,日志打印
log.info("發(fā)送延遲消息給ttl隊(duì)列,當(dāng)前時(shí)間:{},消息內(nèi)容:{}",new Date().toString(),content);
// smsService.sendSms(mobile,content);
return R.ok("成功");
}
消費(fèi)者:用來(lái)監(jiān)聽(tīng)消息
//消費(fèi)者監(jiān)聽(tīng)隊(duì)列為gmall.order.release.queue隊(duì)列的消息
@RabbitListener(queues = {"gmall.order.release.queue"})
@Component
@Slf4j
public class SmsListener {
private final SmsService smsService;
private final RedisTemplate redisTemplate;
public SmsListener(SmsService smsService, RedisTemplate redisTemplate) {
this.smsService = smsService;
this.redisTemplate = redisTemplate;
}
@RabbitHandler
public void sendSms(String string, Channel channel, Message message){
//消息標(biāo)簽
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
SmsParamVo smsParamVo = JSON.parseObject(string, SmsParamVo.class);
if(redisTemplate.hasKey(smsParamVo.getMsgId())){
//拿到消息的唯一標(biāo)簽,如果是已經(jīng)消費(fèi)過(guò)的消息,直接拒絕簽收
channel.basicReject(deliveryTag,false);
return;
}
//打印日志
log.info("發(fā)送延遲消息給ttl隊(duì)列,當(dāng)前時(shí)間:{},消息內(nèi)容:{}",new Date().toString(),smsPar
mVo);
//調(diào)用發(fā)送短信
// smsService.sendSms(smsParamVo.getMobile(),smsParamVo.getContext());
redisTemplate.opsForValue().set(smsParamVo.getMsgId(),smsParamVo.getMsgId(),12, TimeUnit.HOURS);
//確認(rèn)簽收,消息會(huì)從隊(duì)列中刪除
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
try {
if(deliveryTag<=3){
//如果是由于某種特殊原因,消息沒(méi)有發(fā)送成功,然后重回隊(duì)列,
channel.basicNack(deliveryTag,false,true);
}
//當(dāng)重試次數(shù)達(dá)到一定的數(shù)量,就放進(jìn)死信隊(duì)列
channel.basicNack(deliveryTag,false,false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
測(cè)試發(fā)送之前,我們先來(lái)到rabbitMq可視化界面觀察一下
下面我們來(lái)測(cè)試一下,作者使用的是Postman
看看后臺(tái)日志打印,我們可以看到我們已經(jīng)實(shí)現(xiàn)了延遲消息的效果
還有一種方式也可以實(shí)現(xiàn)延遲消息
那就是延遲消息插件,RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲消息功能。該插件的原理是設(shè)計(jì)了一種支持延遲消息功能的交換機(jī),當(dāng)消息投遞到交換機(jī)后可以暫存一定時(shí)間,到期后再投遞到隊(duì)列
1、前往RabbitMQ官網(wǎng)下載往RabbitMQ添加延遲消息的插件
RabbitMQ官網(wǎng)下載插件的網(wǎng)址:Community Plugins | RabbitMQ
2、下載rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下載的插件就得是什么版本的,得對(duì)應(yīng)上,以下截圖為官方文檔的對(duì)插件版本的要求說(shuō)明
這里作者的版本是3.9.13所以,作者就下載3.9版本的
選擇3.9版本
? ? ? ? ? ? ? ??
?3、把這個(gè)插件傳輸?shù)椒?wù)器上
4、拷貝下載好的插件到容器中
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez
可以看到我已經(jīng)copy到容器內(nèi)部了
?5、安裝延遲隊(duì)列插件
進(jìn)入RabbitMQ安裝目錄的目錄下
//進(jìn)入容器內(nèi)部
docker exec -it rabbitmq /bin/bash
進(jìn)入安裝目錄
cd /opt/rabbitmq/plugins
使用如下命令啟用延遲插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如下我們就安裝好了,然后我們重啟rabbitmq容器
使用 exit 命令退出容器
使用docker restart rabbitmq 重啟容器
我們來(lái)rabbitmq的可視化界面查看
這樣說(shuō)明我們的延遲插件就安裝好啦!
到此就結(jié)束啦!希望可以幫到你,可以幫作者點(diǎn)個(gè)關(guān)注和小心心嘛!你們的支持就是我最大的動(dòng)力,以后也會(huì)努力更新的哦!
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ之“延時(shí)隊(duì)列”
好文推薦
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。