欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ - 消息確認(rèn)

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ - 消息確認(rèn)

http://yzkb.51969.com/

?1. 消息確認(rèn)機(jī)制

生產(chǎn)者發(fā)送消息之后, 到達(dá)消費(fèi)端之后, 可能會(huì)有以下情況:

????????(1)消息處理成功

????????(2)消息處理失敗

RabbitMQ向消費(fèi)者發(fā)送消息之后, 就會(huì)把這條消息刪掉, 那么第二種情況, 就會(huì)造成消息丟失。

那么如何確保消費(fèi)端已經(jīng)成功接收了, 并正確處理了呢?

為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement)。

消費(fèi)者在訂閱隊(duì)列時(shí),可以指定autoAck參數(shù),根據(jù)這個(gè)參數(shù)設(shè)置,消息確認(rèn)機(jī)制分為以下兩種:

自動(dòng)確認(rèn):當(dāng)autoAck等于true時(shí),RabbitMQ ?會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息,自動(dòng)確認(rèn)模式適合對(duì)于消息可靠性要求不高的場(chǎng)景。手動(dòng)確認(rèn):當(dāng)autoAck等于false時(shí),RabbitMO會(huì)等待消費(fèi)者顯式地調(diào)用Basic.Ack命令,回復(fù)確認(rèn)信號(hào)后才從內(nèi)存(或者磁盤)中移除消息,這種模式適合對(duì)消息可靠性要求比較高的場(chǎng)景。

當(dāng)autoAck參數(shù)置為false,對(duì)于RabbitMO服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:

一是等待投遞給消費(fèi)者的消息。

二是已經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號(hào)的消息。

如果RabbitM一直沒有收到消費(fèi)者的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接。

則RabbitM會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有可能還是原來的那個(gè)消費(fèi)。

從RabbitMQ的Web管理平臺(tái)上, 也可以看到當(dāng)前隊(duì)列中Ready狀態(tài)和Unacked狀態(tài)的

Ready: 等待投遞給消費(fèi)者的消息數(shù)。

Unacked: 已經(jīng)投遞給消費(fèi)者, 但是未收到消費(fèi)者確認(rèn)信號(hào)的消息。

2. 手動(dòng)確認(rèn)消息

消費(fèi)者在收到消息之后,可以選擇確認(rèn),也可以選擇直接拒絕或者跳過,RabbitMQ也提供了不同的確認(rèn)應(yīng)答的方式,消費(fèi)者客戶端可以調(diào)用與其對(duì)應(yīng)的channel的相關(guān)方法,共有以下三種:

(1)肯定確認(rèn)?

Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMo已知道該消息并且成功的處理消息.可以將其丟棄了

參數(shù)說明:

deliveryTag:消息的唯一標(biāo)識(shí),它是一個(gè)單調(diào)遞增的64位的長(zhǎng)整型值。

multiple:是否批量確認(rèn)。

deliveryTag 是RabbitMQ中消息確認(rèn)機(jī)制的一個(gè)重要組成部分,它確保了消息傳遞的可靠性和順序性。

(2)否定確認(rèn)

Channel.basicReject(long deliveryTag, boolean requeue)

參數(shù)說明:

requeue:表示拒絕后,這條消息如何處理。

如果requeue參數(shù)設(shè)置為true,則RabbitMQ會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下一個(gè)訂閱的消費(fèi)者。

如果requeue參數(shù)設(shè)置為false,則RabbitMQ會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消者。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個(gè)命令。

3. 代碼示例

Spring-AMQP 對(duì)消息確認(rèn)機(jī)制提供了三種策略:

public enum AcknowledgeMode {

NONE,

MANUAL,

AUTO;

}

AcknowledgeMode.NONE

這種模式下,消息一旦投遞給消費(fèi)者,不管消費(fèi)者是否成功處理了消息,RabbitMO就會(huì)自動(dòng)確認(rèn)消息,從RabbitMQ隊(duì)列中移除消息。如果消費(fèi)者處理消息失敗,消息可能會(huì)丟失。

AcknowledgeMode.AUTO(默認(rèn))

這種模式下,消費(fèi)者在消息處理成功時(shí)會(huì)自動(dòng)確認(rèn)消息,但如果處理過程中拋出了異常,則不會(huì)確認(rèn)消息。

AcknowledgeMode.MANUAL

手動(dòng)確認(rèn)模式下,消費(fèi)者必須在成功處理消息后顯式調(diào)用basicAck方法來確認(rèn)消息。

如果消息未被確認(rèn),RabbitMO會(huì)認(rèn)為消息尚未被成功處理,并且會(huì)在消費(fèi)者可用時(shí)重新投遞該消息。

這種模式提高了消息處理的可靠性,因?yàn)榧词瓜M(fèi)者處理消息后失敗,消息也不會(huì)丟失,而是可以被重新處理。

(1)?AcknowledgeMode.NONE

配置確認(rèn)機(jī)制

spring:

rabbitmq:

# 消息監(jiān)聽配置

listener:

simple:

acknowledge-mode: none

發(fā)送消息

交換機(jī),隊(duì)列配置

@Configuration

public class RabbitMQConfig {

@Bean("ackQueue")

public Queue ackQueue() {

return QueueBuilder.durable(Constant.ACK_QUEUE).build();

}

@Bean("ackExchange")

public Exchange ackExchange() {

return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();

}

/**

* 綁定ack隊(duì)列和ack交換機(jī)

*

* @param ackExchange

* @param ackQueue

* @return

*/

@Bean("ackBinding")

public Binding ackBinding(@Qualifier("ackExchange") Exchange ackExchange, @Qualifier("ackQueue") Queue ackQueue) {

return BindingBuilder.bind(ackQueue).to(ackExchange).with("ack").noargs();

}

}

通過接口發(fā)送消息

package com.example.orderservice.controller;

import com.example.orderservice.Constant;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class AckController {

@Autowired

private RabbitTemplate rabbitTemplate;

@RequestMapping("/ack")

public String ack() {

rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "ack test...");

return "發(fā)送成功";

}

}

消費(fèi)端邏輯

package com.example.materialflowservice.listener;

import com.example.orderservice.Constant;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j

@Component

public class AckListener {

/**

* 監(jiān)聽消息

*

* @param message

*/

@RabbitListener(queues = Constant.ACK_QUEUE)

public void listener(Message message, Channel channel) {

log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());

//int num = 3 / 0;

log.info("業(yè)務(wù)處理完成");

}

}

正常情況下是能接收到消息的

將 int num = 3 / 0 的注釋放開

可以看到報(bào)異常了

但消息沒有保留,依然被處理掉了。

這種情況就可能會(huì)導(dǎo)致我們的消息丟失。

(2)AcknowledgeMode.AUTO(默認(rèn))

根據(jù)deliverTag可以看出,消息是一直在不停重試的

并且還是保留在隊(duì)列當(dāng)中的,并沒有丟棄

(3)AcknowledgeMode.MANUAL

消費(fèi)者邏輯

package com.example.materialflowservice.listener;

import com.example.orderservice.Constant;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.nio.charset.StandardCharsets;

@Slf4j

@Component

public class AckListener {

/**

* 監(jiān)聽消息

*

* @param message

*/

@RabbitListener(queues = Constant.ACK_QUEUE)

public void listener(Message message, Channel channel) throws Exception {

try {

log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());

//int num = 3 / 0;

log.info("業(yè)務(wù)處理完成");

// 確認(rèn)消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

// 拒絕消息,重新入隊(duì)

channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);

}

}

}

此時(shí)是正常情況,消息被正常簽收。?

?將 int num = 3 / 0 的注釋放開之后,再次運(yùn)行

可以看出一直在進(jìn)行重新入隊(duì)操作。

如果我們將確認(rèn)消息注釋掉:

// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

可以看到Unacked變?yōu)?了,未確認(rèn)狀態(tài)。

當(dāng)我們把basicNack第三個(gè)參數(shù)設(shè)為false:

// 拒絕消息,禁止重新入隊(duì)

channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);

可以看到消息被丟棄了。

以上就是Spring-AMQP 對(duì)消息確認(rèn)機(jī)制提供了三種策略。

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ - 消息確認(rèn)

http://yzkb.51969.com/

好文閱讀

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19504528.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄