柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息
RabbitMQ原生并不支持延時(shí)消息,需要我們自行實(shí)現(xiàn)。
AMQP協(xié)議
生產(chǎn)者端發(fā)布消息到MQ中的交換機(jī)Exchange,每條消息會(huì)帶一個(gè)路由鍵RoutingKey;交換機(jī)與隊(duì)列queue通過(guò)路由鍵進(jìn)行綁定binding;消息通過(guò)路由鍵路由到對(duì)應(yīng)的隊(duì)列queue,隊(duì)列與交換器沒(méi)有對(duì)應(yīng)的綁定關(guān)系則消息會(huì)丟失;消息由隊(duì)列投遞給消費(fèi)者端進(jìn)行消費(fèi)。
交換機(jī)類型
direct:直連交換機(jī)fanout:廣播交換機(jī)topic:主題交換機(jī)headers:頭交換機(jī),基本不用,由direct替代
延時(shí)消息實(shí)現(xiàn)方式
消息過(guò)期時(shí)間+死信隊(duì)列
普通隊(duì)列綁定死信交換機(jī),發(fā)送消息時(shí)設(shè)置過(guò)期時(shí)間,過(guò)期時(shí)間到了進(jìn)入死信隊(duì)列,監(jiān)聽(tīng)死信隊(duì)列消費(fèi)。 存在的問(wèn)題:消息的過(guò)期時(shí)間不一致時(shí),使用死信隊(duì)列可能起不到延時(shí)的作用。當(dāng)發(fā)送兩條不同過(guò)期時(shí)間的消息時(shí),先發(fā)送的消息1過(guò)期時(shí)間(20s)大于后發(fā)送的消息2過(guò)期時(shí)間(10s),由于消息的順序消費(fèi),消息2過(guò)期后并不會(huì)立即重新發(fā)布到死信交換機(jī),而是等消息1過(guò)期后一起被消費(fèi)。 使用場(chǎng)景:超時(shí)時(shí)間一致,如訂單超時(shí)取消。
延時(shí)插件
下載延時(shí)插件,將插件移至rabbitmq插件目錄下,通過(guò)命令應(yīng)用延時(shí)插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
聲明一個(gè)類型為x-delayed-message的交換機(jī)CustomExchange。參數(shù)添加x-delayed-type,值為交換機(jī)類型的屬性,如direct,用于路由鍵的映射。發(fā)送消息時(shí),消息頭header設(shè)置參數(shù)x-delay,值為延遲時(shí)間,單位毫秒。
云消息隊(duì)列RabbitMQ(阿里云)
阿里云消息隊(duì)列rabbitmq提供了延時(shí)消息的原生支持,只需要在消息頭header里面增加參數(shù)delay,值為延遲時(shí)間,單位毫秒。
MessageProperties messageProperties = new MessageProperties();
String msgId = "CANCEL:" + order.getId();
//此處設(shè)置的msgId才能被會(huì)轉(zhuǎn)成rabbitmq client的messageId,發(fā)送給broker
messageProperties.setMessageId(msgId);
long delay = 15 * 60 * 1000;
// 消息頭設(shè)置延時(shí)時(shí)間
messageProperties.setHeader("delay", delay);
Message message = new Message(order.getId().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(ORDER_CANCEL_TOPIC, Q_CANCEL_ORDER, message);
代碼使用示例
pom文件添加amqp依賴。
配置文件application.yml添加mq配置。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: rabbitmq
password: rabbitmq
virtual-host: rabbitmq
listener:
simple:
acknowledge-mode: manual
編寫(xiě)配置類RabbitMQConfig,創(chuàng)建交換機(jī)、隊(duì)列并進(jìn)行綁定。
@Configuration
public class RabbitMQConfig {
// 普通交換機(jī)
public static final String ORDER_CANCEL_EXCHANGE = "order_cancel_exchange";
public static final String ORDER_CANCEL_QUEUE = "order_cancel_queue";
// 死信交換機(jī)
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_ROUTING_KEY = "dead_routing_key";
public static final String DEAD_QUEUE = "dead_queue";
// 延遲插件交換機(jī)
public static final String DMP_EXCHANGE = "dmp_exchange";
public static final String DMP_ROUTING_KEY = "dmp_routing_key";
public static final String DMP_QUEUE = "dmp_queue";
@Bean
public Exchange orderCancelExchange() {
return ExchangeBuilder.directExchange(ORDER_CANCEL_EXCHANGE).build();
}
@Bean
public Queue orderCancelQueue() {
return QueueBuilder.durable(ORDER_CANCEL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
}
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding orderCancelBinding(Exchange orderCancelExchange, Queue orderCancelQueue) {
return BindingBuilder.bind(orderCancelQueue).to(orderCancelExchange).with("").noargs();
}
@Bean
public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
// 延時(shí)插件使用
@Bean
public CustomExchange dmpExchange() {
Map
args.put("x-delayed-type", "direct");
return new CustomExchange(DMP_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue dmpQueue() {
return QueueBuilder.durable(DMP_QUEUE).build();
}
@Bean
public Binding dmpBinding(CustomExchange dmpExchange, Queue dmpQueue) {
return BindingBuilder.bind(dmpQueue).to(dmpExchange).with(DMP_ROUTING_KEY).noargs();
}
}
4.編寫(xiě)消息生產(chǎn)者M(jìn)sgSender,實(shí)現(xiàn)死信隊(duì)列、延時(shí)插件消息發(fā)送。
@Component
@Slf4j
public class MsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 通過(guò)死信隊(duì)列發(fā)送消息
* @param message
* @param time
*/
public void send(String message, Integer time) {
String expireTime = String.valueOf(time * 1000);
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_CANCEL_EXCHANGE, "", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設(shè)置消息的過(guò)期時(shí)間,是以毫秒為單位的
message.getMessageProperties().setExpiration(expireTime);
return message;
}
});
log.info("死信隊(duì)列消息:{}發(fā)送成功,過(guò)期時(shí)間:{}秒", message, time);
}
/**
* 通過(guò)延遲插件發(fā)送消息
* @param message
* @param time
*/
public void sendByPlugin(String message, Integer time) {
rabbitTemplate.convertAndSend(RabbitMQConfig.DMP_EXCHANGE, RabbitMQConfig.DMP_ROUTING_KEY, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 延遲插件只需要在消息的header中添加x-delay屬性,值為過(guò)期時(shí)間,單位毫秒
message.getMessageProperties().setHeader("x-delay", time * 1000);
return message;
}
});
log.info("延遲插件消息:{}發(fā)送成功,過(guò)期時(shí)間:{}秒", message, time);
}
}
編寫(xiě)消息消費(fèi)者M(jìn)sgListener,監(jiān)聽(tīng)延時(shí)消息進(jìn)行相應(yīng)的業(yè)務(wù)處理。
@Component
@Slf4j
public class MsgListener {
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
public void orderCancel(Message message, Channel channel) throws IOException {
log.info("使用死信隊(duì)列,收到消息:{}", new String(message.getBody()));
// 手動(dòng)ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = RabbitMQConfig.DMP_QUEUE)
public void orderCancelByPlugin(Message message, Channel channel) throws IOException {
log.info("使用延遲插件,收到消息:{}", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
編寫(xiě)測(cè)試類MsgController,啟動(dòng)服務(wù)發(fā)送消息測(cè)試。
@RestController
public class MsgController {
@Autowired
public MsgSender msgSender;
@GetMapping("/send")
public String send(@RequestParam String msg, Integer time) {
msgSender.send(msg, time);
return "success";
}
@GetMapping("/sendByPlugin")
public String sendByPlugin(@RequestParam String msg, Integer time) {
msgSender.sendByPlugin(msg, time);
return "success";
}
}
1)使用死信隊(duì)列發(fā)送兩次消息http://localhost:8081/send?msg=消息A&time=20 和http://localhost:8081/send?msg=消息B&time=10,消息會(huì)按發(fā)送順序消費(fèi),并沒(méi)有起到延時(shí)的效果。
2)使用延時(shí)插件發(fā)送兩次消息http://localhost:8081/sendByPlugin?msg=消息A&time=20 和http://localhost:8081/sendByPlugin?msg=消息B&time=10,消息會(huì)按照延時(shí)時(shí)間順序消費(fèi)。
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。