柚子快報邀請碼778899分享:RabbitMQ - 消息確認
?1. 消息確認機制
生產(chǎn)者發(fā)送消息之后, 到達消費端之后, 可能會有以下情況:
????????(1)消息處理成功
????????(2)消息處理失敗
RabbitMQ向消費者發(fā)送消息之后, 就會把這條消息刪掉, 那么第二種情況, 就會造成消息丟失。
那么如何確保消費端已經(jīng)成功接收了, 并正確處理了呢?
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。
消費者在訂閱隊列時,可以指定autoAck參數(shù),根據(jù)這個參數(shù)設(shè)置,消息確認機制分為以下兩種:
自動確認:當autoAck等于true時,RabbitMQ ?會自動把發(fā)送出去的消息置為確認,然后從內(nèi)存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息,自動確認模式適合對于消息可靠性要求不高的場景。手動確認:當autoAck等于false時,RabbitMO會等待消費者顯式地調(diào)用Basic.Ack命令,回復(fù)確認信號后才從內(nèi)存(或者磁盤)中移除消息,這種模式適合對消息可靠性要求比較高的場景。
當autoAck參數(shù)置為false,對于RabbitMO服務(wù)端而言,隊列中的消息分成了兩個部分:
一是等待投遞給消費者的消息。
二是已經(jīng)投遞給消費者,但是還沒有收到消費者確認信號的消息。
如果RabbitM一直沒有收到消費者的確認信號,并且消費此消息的消費者已經(jīng)斷開連接。
則RabbitM會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費。
從RabbitMQ的Web管理平臺上, 也可以看到當前隊列中Ready狀態(tài)和Unacked狀態(tài)的
Ready: 等待投遞給消費者的消息數(shù)。
Unacked: 已經(jīng)投遞給消費者, 但是未收到消費者確認信號的消息。
2. 手動確認消息
消費者在收到消息之后,可以選擇確認,也可以選擇直接拒絕或者跳過,RabbitMQ也提供了不同的確認應(yīng)答的方式,消費者客戶端可以調(diào)用與其對應(yīng)的channel的相關(guān)方法,共有以下三種:
(1)肯定確認?
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMo已知道該消息并且成功的處理消息.可以將其丟棄了
參數(shù)說明:
deliveryTag:消息的唯一標識,它是一個單調(diào)遞增的64位的長整型值。
multiple:是否批量確認。
deliveryTag 是RabbitMQ中消息確認機制的一個重要組成部分,它確保了消息傳遞的可靠性和順序性。
(2)否定確認
Channel.basicReject(long deliveryTag, boolean requeue)
參數(shù)說明:
requeue:表示拒絕后,這條消息如何處理。
如果requeue參數(shù)設(shè)置為true,則RabbitMQ會重新將這條消息存入隊列,以便可以發(fā)送給下一個訂閱的消費者。
如果requeue參數(shù)設(shè)置為false,則RabbitMQ會把消息從隊列中移除,而不會把它發(fā)送給新的消者。
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令。
3. 代碼示例
Spring-AMQP 對消息確認機制提供了三種策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
AcknowledgeMode.NONE
這種模式下,消息一旦投遞給消費者,不管消費者是否成功處理了消息,RabbitMO就會自動確認消息,從RabbitMQ隊列中移除消息。如果消費者處理消息失敗,消息可能會丟失。
AcknowledgeMode.AUTO(默認)
這種模式下,消費者在消息處理成功時會自動確認消息,但如果處理過程中拋出了異常,則不會確認消息。
AcknowledgeMode.MANUAL
手動確認模式下,消費者必須在成功處理消息后顯式調(diào)用basicAck方法來確認消息。
如果消息未被確認,RabbitMO會認為消息尚未被成功處理,并且會在消費者可用時重新投遞該消息。
這種模式提高了消息處理的可靠性,因為即使消費者處理消息后失敗,消息也不會丟失,而是可以被重新處理。
(1)?AcknowledgeMode.NONE
配置確認機制
spring:
rabbitmq:
# 消息監(jiān)聽配置
listener:
simple:
acknowledge-mode: none
發(fā)送消息
交換機,隊列配置
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean("ackExchange")
public Exchange ackExchange() {
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
}
/**
* 綁定ack隊列和ack交換機
*
* @param ackExchange
* @param ackQueue
* @return
*/
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange ackExchange, @Qualifier("ackQueue") Queue ackQueue) {
return BindingBuilder.bind(ackQueue).to(ackExchange).with("ack").noargs();
}
}
通過接口發(fā)送消息
package com.example.orderservice.controller;
import com.example.orderservice.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AckController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "ack test...");
return "發(fā)送成功";
}
}
消費端邏輯
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 監(jiān)聽消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("業(yè)務(wù)處理完成");
}
}
正常情況下是能接收到消息的
將 int num = 3 / 0 的注釋放開
可以看到報異常了
但消息沒有保留,依然被處理掉了。
這種情況就可能會導致我們的消息丟失。
(2)AcknowledgeMode.AUTO(默認)
根據(jù)deliverTag可以看出,消息是一直在不停重試的
并且還是保留在隊列當中的,并沒有丟棄
(3)AcknowledgeMode.MANUAL
消費者邏輯
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 監(jiān)聽消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws Exception {
try {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("業(yè)務(wù)處理完成");
// 確認消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒絕消息,重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
此時是正常情況,消息被正常簽收。?
?將 int num = 3 / 0 的注釋放開之后,再次運行
可以看出一直在進行重新入隊操作。
如果我們將確認消息注釋掉:
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
可以看到Unacked變?yōu)?了,未確認狀態(tài)。
當我們把basicNack第三個參數(shù)設(shè)為false:
// 拒絕消息,禁止重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
可以看到消息被丟棄了。
以上就是Spring-AMQP 對消息確認機制提供了三種策略。
柚子快報邀請碼778899分享:RabbitMQ - 消息確認
好文閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。