欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息

http://yzkb.51969.com/

RabbitMQ原生并不支持延時(shí)消息,需要我們自行實(shí)現(xiàn)。

AMQP協(xié)議

生產(chǎn)者端發(fā)布消息到MQ中的交換機(jī)Exchange,每條消息會(huì)帶一個(gè)路由鍵RoutingKey;交換機(jī)與隊(duì)列queue通過(guò)路由鍵進(jìn)行綁定binding;消息通過(guò)路由鍵路由到對(duì)應(yīng)的隊(duì)列queue,隊(duì)列與交換器沒(méi)有對(duì)應(yīng)的綁定關(guān)系則消息會(huì)丟失;消息由隊(duì)列投遞給消費(fèi)者端進(jìn)行消費(fèi)。

交換機(jī)類型

direct:直連交換機(jī)fanout:廣播交換機(jī)topic:主題交換機(jī)headers:頭交換機(jī),基本不用,由direct替代

延時(shí)消息實(shí)現(xiàn)方式

消息過(guò)期時(shí)間+死信隊(duì)列

普通隊(duì)列綁定死信交換機(jī),發(fā)送消息時(shí)設(shè)置過(guò)期時(shí)間,過(guò)期時(shí)間到了進(jìn)入死信隊(duì)列,監(jiān)聽(tīng)死信隊(duì)列消費(fèi)。 存在的問(wèn)題:消息的過(guò)期時(shí)間不一致時(shí),使用死信隊(duì)列可能起不到延時(shí)的作用。當(dāng)發(fā)送兩條不同過(guò)期時(shí)間的消息時(shí),先發(fā)送的消息1過(guò)期時(shí)間(20s)大于后發(fā)送的消息2過(guò)期時(shí)間(10s),由于消息的順序消費(fèi),消息2過(guò)期后并不會(huì)立即重新發(fā)布到死信交換機(jī),而是等消息1過(guò)期后一起被消費(fèi)。 使用場(chǎng)景:超時(shí)時(shí)間一致,如訂單超時(shí)取消。

延時(shí)插件

下載延時(shí)插件,將插件移至rabbitmq插件目錄下,通過(guò)命令應(yīng)用延時(shí)插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

聲明一個(gè)類型為x-delayed-message的交換機(jī)CustomExchange。參數(shù)添加x-delayed-type,值為交換機(jī)類型的屬性,如direct,用于路由鍵的映射。發(fā)送消息時(shí),消息頭header設(shè)置參數(shù)x-delay,值為延遲時(shí)間,單位毫秒。

云消息隊(duì)列RabbitMQ(阿里云)

阿里云消息隊(duì)列rabbitmq提供了延時(shí)消息的原生支持,只需要在消息頭header里面增加參數(shù)delay,值為延遲時(shí)間,單位毫秒。

MessageProperties messageProperties = new MessageProperties();

String msgId = "CANCEL:" + order.getId();

//此處設(shè)置的msgId才能被會(huì)轉(zhuǎn)成rabbitmq client的messageId,發(fā)送給broker

messageProperties.setMessageId(msgId);

long delay = 15 * 60 * 1000;

// 消息頭設(shè)置延時(shí)時(shí)間

messageProperties.setHeader("delay", delay);

Message message = new Message(order.getId().getBytes(), messageProperties);

rabbitTemplate.convertAndSend(ORDER_CANCEL_TOPIC, Q_CANCEL_ORDER, message);

代碼使用示例

pom文件添加amqp依賴。

org.springframework.boot

spring-boot-starter-amqp

配置文件application.yml添加mq配置。

spring:

rabbitmq:

host: 127.0.0.1

port: 5672

username: rabbitmq

password: rabbitmq

virtual-host: rabbitmq

listener:

simple:

acknowledge-mode: manual

編寫(xiě)配置類RabbitMQConfig,創(chuàng)建交換機(jī)、隊(duì)列并進(jìn)行綁定。

@Configuration

public class RabbitMQConfig {

// 普通交換機(jī)

public static final String ORDER_CANCEL_EXCHANGE = "order_cancel_exchange";

public static final String ORDER_CANCEL_QUEUE = "order_cancel_queue";

// 死信交換機(jī)

public static final String DEAD_EXCHANGE = "dead_exchange";

public static final String DEAD_ROUTING_KEY = "dead_routing_key";

public static final String DEAD_QUEUE = "dead_queue";

// 延遲插件交換機(jī)

public static final String DMP_EXCHANGE = "dmp_exchange";

public static final String DMP_ROUTING_KEY = "dmp_routing_key";

public static final String DMP_QUEUE = "dmp_queue";

@Bean

public Exchange orderCancelExchange() {

return ExchangeBuilder.directExchange(ORDER_CANCEL_EXCHANGE).build();

}

@Bean

public Queue orderCancelQueue() {

return QueueBuilder.durable(ORDER_CANCEL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)

.deadLetterRoutingKey(DEAD_ROUTING_KEY).build();

}

@Bean

public Exchange deadExchange() {

return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();

}

@Bean

public Queue deadQueue() {

return QueueBuilder.durable(DEAD_QUEUE).build();

}

@Bean

public Binding orderCancelBinding(Exchange orderCancelExchange, Queue orderCancelQueue) {

return BindingBuilder.bind(orderCancelQueue).to(orderCancelExchange).with("").noargs();

}

@Bean

public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {

return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();

}

// 延時(shí)插件使用

@Bean

public CustomExchange dmpExchange() {

Map args = new HashMap<>();

args.put("x-delayed-type", "direct");

return new CustomExchange(DMP_EXCHANGE, "x-delayed-message", true, false, args);

}

@Bean

public Queue dmpQueue() {

return QueueBuilder.durable(DMP_QUEUE).build();

}

@Bean

public Binding dmpBinding(CustomExchange dmpExchange, Queue dmpQueue) {

return BindingBuilder.bind(dmpQueue).to(dmpExchange).with(DMP_ROUTING_KEY).noargs();

}

}

4.編寫(xiě)消息生產(chǎn)者M(jìn)sgSender,實(shí)現(xiàn)死信隊(duì)列、延時(shí)插件消息發(fā)送。

@Component

@Slf4j

public class MsgSender {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 通過(guò)死信隊(duì)列發(fā)送消息

* @param message

* @param time

*/

public void send(String message, Integer time) {

String expireTime = String.valueOf(time * 1000);

rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_CANCEL_EXCHANGE, "", message, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

//設(shè)置消息的過(guò)期時(shí)間,是以毫秒為單位的

message.getMessageProperties().setExpiration(expireTime);

return message;

}

});

log.info("死信隊(duì)列消息:{}發(fā)送成功,過(guò)期時(shí)間:{}秒", message, time);

}

/**

* 通過(guò)延遲插件發(fā)送消息

* @param message

* @param time

*/

public void sendByPlugin(String message, Integer time) {

rabbitTemplate.convertAndSend(RabbitMQConfig.DMP_EXCHANGE, RabbitMQConfig.DMP_ROUTING_KEY, message, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 延遲插件只需要在消息的header中添加x-delay屬性,值為過(guò)期時(shí)間,單位毫秒

message.getMessageProperties().setHeader("x-delay", time * 1000);

return message;

}

});

log.info("延遲插件消息:{}發(fā)送成功,過(guò)期時(shí)間:{}秒", message, time);

}

}

編寫(xiě)消息消費(fèi)者M(jìn)sgListener,監(jiān)聽(tīng)延時(shí)消息進(jìn)行相應(yīng)的業(yè)務(wù)處理。

@Component

@Slf4j

public class MsgListener {

@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)

public void orderCancel(Message message, Channel channel) throws IOException {

log.info("使用死信隊(duì)列,收到消息:{}", new String(message.getBody()));

// 手動(dòng)ack

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = RabbitMQConfig.DMP_QUEUE)

public void orderCancelByPlugin(Message message, Channel channel) throws IOException {

log.info("使用延遲插件,收到消息:{}", new String(message.getBody()));

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

編寫(xiě)測(cè)試類MsgController,啟動(dòng)服務(wù)發(fā)送消息測(cè)試。

@RestController

public class MsgController {

@Autowired

public MsgSender msgSender;

@GetMapping("/send")

public String send(@RequestParam String msg, Integer time) {

msgSender.send(msg, time);

return "success";

}

@GetMapping("/sendByPlugin")

public String sendByPlugin(@RequestParam String msg, Integer time) {

msgSender.sendByPlugin(msg, time);

return "success";

}

}

1)使用死信隊(duì)列發(fā)送兩次消息http://localhost:8081/send?msg=消息A&time=20 和http://localhost:8081/send?msg=消息B&time=10,消息會(huì)按發(fā)送順序消費(fèi),并沒(méi)有起到延時(shí)的效果。

2)使用延時(shí)插件發(fā)送兩次消息http://localhost:8081/sendByPlugin?msg=消息A&time=20 和http://localhost:8081/sendByPlugin?msg=消息B&time=10,消息會(huì)按照延時(shí)時(shí)間順序消費(fèi)。

柚子快報(bào)邀請(qǐng)碼778899分享:RabbitMQ實(shí)現(xiàn)延時(shí)消息

http://yzkb.51969.com/

推薦鏈接

評(píng)論可見(jiàn),查看隱藏內(nèi)容
大家都在看:

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19509477.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄