柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ - 消息確認(rèn)
?1. 消息確認(rèn)機(jī)制
生產(chǎn)者發(fā)送消息之后, 到達(dá)消費(fèi)端之后, 可能會(huì)有以下情況:
????????(1)消息處理成功
????????(2)消息處理失敗
RabbitMQ向消費(fèi)者發(fā)送消息之后, 就會(huì)把這條消息刪掉, 那么第二種情況, 就會(huì)造成消息丟失。
那么如何確保消費(fèi)端已經(jīng)成功接收了, 并正確處理了呢?
為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement)。
消費(fèi)者在訂閱隊(duì)列時(shí),可以指定autoAck參數(shù),根據(jù)這個(gè)參數(shù)設(shè)置,消息確認(rèn)機(jī)制分為以下兩種:
自動(dòng)確認(rèn):當(dāng)autoAck等于true時(shí),RabbitMQ ?會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息,自動(dòng)確認(rèn)模式適合對(duì)于消息可靠性要求不高的場(chǎng)景。手動(dòng)確認(rèn):當(dāng)autoAck等于false時(shí),RabbitMO會(huì)等待消費(fèi)者顯式地調(diào)用Basic.Ack命令,回復(fù)確認(rèn)信號(hào)后才從內(nèi)存(或者磁盤)中移除消息,這種模式適合對(duì)消息可靠性要求比較高的場(chǎng)景。
當(dāng)autoAck參數(shù)置為false,對(duì)于RabbitMO服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:
一是等待投遞給消費(fèi)者的消息。
二是已經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號(hào)的消息。
如果RabbitM一直沒有收到消費(fèi)者的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接。
則RabbitM會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有可能還是原來的那個(gè)消費(fèi)。
從RabbitMQ的Web管理平臺(tái)上, 也可以看到當(dāng)前隊(duì)列中Ready狀態(tài)和Unacked狀態(tài)的
Ready: 等待投遞給消費(fèi)者的消息數(shù)。
Unacked: 已經(jīng)投遞給消費(fèi)者, 但是未收到消費(fèi)者確認(rèn)信號(hào)的消息。
2. 手動(dòng)確認(rèn)消息
消費(fèi)者在收到消息之后,可以選擇確認(rèn),也可以選擇直接拒絕或者跳過,RabbitMQ也提供了不同的確認(rèn)應(yīng)答的方式,消費(fèi)者客戶端可以調(diào)用與其對(duì)應(yīng)的channel的相關(guān)方法,共有以下三種:
(1)肯定確認(rèn)?
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMo已知道該消息并且成功的處理消息.可以將其丟棄了
參數(shù)說明:
deliveryTag:消息的唯一標(biāo)識(shí),它是一個(gè)單調(diào)遞增的64位的長(zhǎng)整型值。
multiple:是否批量確認(rèn)。
deliveryTag 是RabbitMQ中消息確認(rèn)機(jī)制的一個(gè)重要組成部分,它確保了消息傳遞的可靠性和順序性。
(2)否定確認(rèn)
Channel.basicReject(long deliveryTag, boolean requeue)
參數(shù)說明:
requeue:表示拒絕后,這條消息如何處理。
如果requeue參數(shù)設(shè)置為true,則RabbitMQ會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下一個(gè)訂閱的消費(fèi)者。
如果requeue參數(shù)設(shè)置為false,則RabbitMQ會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消者。
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個(gè)命令。
3. 代碼示例
Spring-AMQP 對(duì)消息確認(rèn)機(jī)制提供了三種策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
AcknowledgeMode.NONE
這種模式下,消息一旦投遞給消費(fèi)者,不管消費(fèi)者是否成功處理了消息,RabbitMO就會(huì)自動(dòng)確認(rèn)消息,從RabbitMQ隊(duì)列中移除消息。如果消費(fèi)者處理消息失敗,消息可能會(huì)丟失。
AcknowledgeMode.AUTO(默認(rèn))
這種模式下,消費(fèi)者在消息處理成功時(shí)會(huì)自動(dòng)確認(rèn)消息,但如果處理過程中拋出了異常,則不會(huì)確認(rèn)消息。
AcknowledgeMode.MANUAL
手動(dòng)確認(rèn)模式下,消費(fèi)者必須在成功處理消息后顯式調(diào)用basicAck方法來確認(rèn)消息。
如果消息未被確認(rèn),RabbitMO會(huì)認(rèn)為消息尚未被成功處理,并且會(huì)在消費(fèi)者可用時(shí)重新投遞該消息。
這種模式提高了消息處理的可靠性,因?yàn)榧词瓜M(fèi)者處理消息后失敗,消息也不會(huì)丟失,而是可以被重新處理。
(1)?AcknowledgeMode.NONE
配置確認(rèn)機(jī)制
spring:
rabbitmq:
# 消息監(jiān)聽配置
listener:
simple:
acknowledge-mode: none
發(fā)送消息
交換機(jī),隊(duì)列配置
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean("ackExchange")
public Exchange ackExchange() {
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
}
/**
* 綁定ack隊(duì)列和ack交換機(jī)
*
* @param ackExchange
* @param ackQueue
* @return
*/
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange ackExchange, @Qualifier("ackQueue") Queue ackQueue) {
return BindingBuilder.bind(ackQueue).to(ackExchange).with("ack").noargs();
}
}
通過接口發(fā)送消息
package com.example.orderservice.controller;
import com.example.orderservice.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AckController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "ack test...");
return "發(fā)送成功";
}
}
消費(fèi)端邏輯
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 監(jiān)聽消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("業(yè)務(wù)處理完成");
}
}
正常情況下是能接收到消息的
將 int num = 3 / 0 的注釋放開
可以看到報(bào)異常了
但消息沒有保留,依然被處理掉了。
這種情況就可能會(huì)導(dǎo)致我們的消息丟失。
(2)AcknowledgeMode.AUTO(默認(rèn))
根據(jù)deliverTag可以看出,消息是一直在不停重試的
并且還是保留在隊(duì)列當(dāng)中的,并沒有丟棄
(3)AcknowledgeMode.MANUAL
消費(fèi)者邏輯
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 監(jiān)聽消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws Exception {
try {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("業(yè)務(wù)處理完成");
// 確認(rèn)消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒絕消息,重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
此時(shí)是正常情況,消息被正常簽收。?
?將 int num = 3 / 0 的注釋放開之后,再次運(yùn)行
可以看出一直在進(jìn)行重新入隊(duì)操作。
如果我們將確認(rèn)消息注釋掉:
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
可以看到Unacked變?yōu)?了,未確認(rèn)狀態(tài)。
當(dāng)我們把basicNack第三個(gè)參數(shù)設(shè)為false:
// 拒絕消息,禁止重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
可以看到消息被丟棄了。
以上就是Spring-AMQP 對(duì)消息確認(rèn)機(jī)制提供了三種策略。
柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ - 消息確認(rèn)
好文閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。