柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)
柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)
什么是順序消費(fèi) 例如:業(yè)務(wù)上產(chǎn)生者發(fā)送三條消息, 分別是對同一條數(shù)據(jù)的增加、修改、刪除操作, 如果沒有保證順序消費(fèi),執(zhí)行順序可能變成刪除、修改、增加,這就亂了。 如何保證順序性 一般我們討論如何保證消息的順序性,會從下面三個方面考慮 1:發(fā)送消息的順序 2:隊(duì)列中消息的順序 3:消費(fèi)消息的順序 發(fā)送消息的順序 消息發(fā)送端的順序,大部分業(yè)務(wù)不做要求,誰先發(fā)消息無所謂,如果遇到業(yè)務(wù)一定要發(fā)送消息也確保順序,那意味著,只能全局加鎖一個個的操作,一個個的發(fā)消息,不能并發(fā)發(fā)送消息。
隊(duì)列中消息的順序 RabbitMQ 中,消息最終會保存在隊(duì)列中,在同一個隊(duì)列中,消息是順序的,先進(jìn)先出原則,這個由 RabbitMQ 保證,通常也不需要開發(fā)關(guān)心。
不同隊(duì)列 中的消息順序,是沒有保證的,例如:進(jìn)地鐵站的時候,排了三個隊(duì)伍,不同隊(duì)伍之間的,不能確保誰先進(jìn)站。
消費(fèi)消息的順序 我們說如何保證消息順序性,通常說的就是消費(fèi)者消費(fèi)消息的順序,在多個消費(fèi)者消費(fèi)同一個消息隊(duì)列的場景,通常是無法保證消息順序的,
雖然消息隊(duì)列的消息是順序的,但是多個消費(fèi)者并發(fā)消費(fèi)消息,獲取的消息的速度、執(zhí)行業(yè)務(wù)邏輯的速度快慢、執(zhí)行異常等等原因都會導(dǎo)致消息順序不一致。 例如:消息A、B、C按順序進(jìn)入隊(duì)列,消費(fèi)者A1拿到消息A、消費(fèi)者B1拿到消息B, 結(jié)果消費(fèi)者B執(zhí)行速度快,就跑完了,又或者消費(fèi)者A1掛了,都會導(dǎo)致消息順序不一致。 解決消費(fèi)順序的問題, 通常就是一個隊(duì)列只有一個消費(fèi)者 , 這樣就可以一個個消息按順序處理, 缺點(diǎn)就是并發(fā)能力下降了,無法并發(fā)消費(fèi)消息,這是個取舍問題。
如果業(yè)務(wù)又要順序消費(fèi),又要增加并發(fā),通常思路就是開啟多個隊(duì)列,業(yè)務(wù)根據(jù)規(guī)則將消息分發(fā)到不同的隊(duì)列,通過增加隊(duì)列的數(shù)量來提高并發(fā)度,例如:電商訂單場景,只需要保證同一個用戶的訂單消息的順序性就行,不同用戶之間沒有關(guān)系,所以只要讓同一個用戶的訂單消息進(jìn)入同一個隊(duì)列就行,其他用戶的訂單消息,可以進(jìn)入不同的隊(duì)列。
以下為代碼設(shè)計(jì)過程實(shí)現(xiàn) 首先我們必須保證只有一個消費(fèi)者 那么問題就來了,我們的項(xiàng)目一般是多副本的,如何保證只有一個副本在消費(fèi)呢 這時就會用到消費(fèi)者 單活模式 x-single-active-consumer 使用下述配置實(shí)現(xiàn)
private Queue creatQueue(String name){
// 創(chuàng)建一個 單活模式 隊(duì)列
HashMap
args.put("x-single-active-consumer",true);
return new Queue(name,true,false,false,args);
}
創(chuàng)建之后,我們可以在控制臺看到 消費(fèi)者的激活狀態(tài)
=======================>配置類
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
@Bean
public Queue queue15_0() {
return creatQueue(Message15.QUEUE_0);
}
@Bean
public Queue queue15_1() {
return creatQueue(Message15.QUEUE_1);
}
@Bean
public Queue queue15_2() {
return creatQueue(Message15.QUEUE_2);
}
@Bean
public Queue queue15_3() {
return creatQueue(Message15.QUEUE_3);
}
@Bean
public DirectExchange exchange15() {
// name: 交換機(jī)名字 | durable: 是否持久化 | exclusive: 是否排它
return new DirectExchange(Message15.EXCHANGE, true, false);
}
@Bean
public Binding binding15_0() {
return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
}
@Bean
public Binding binding15_1() {
return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
}
@Bean
public Binding binding15_2() {
return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
}
@Bean
public Binding binding15_3() {
return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
}
/**
* 創(chuàng)建一個 單活 模式的隊(duì)列
* 注意 :
*
* 如果一個隊(duì)列已經(jīng)創(chuàng)建為非x-single-active-consumer,而你想更改其為x-single-active-consumer,要把之前創(chuàng)建的隊(duì)列刪除
*
* @param name
* @return queue
*/
private Queue creatQueue(String name) {
// 創(chuàng)建一個 單活模式 隊(duì)列
HashMap
args.put("x-single-active-consumer", true);
return new Queue(name, true, false, false, args);
}
=================================》生產(chǎn)者
@Component
@Slf4j
public class Producer15 {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 這里的發(fā)送是 擬投遞到多個隊(duì)列中
*
* @param id 業(yè)務(wù)id
* @param msg 業(yè)務(wù)信息
*/
public void syncSend(int id, String msg) {
Message15 message = new Message15(id, msg);
rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
}
/**
* 根據(jù) id 取余來決定丟到那個隊(duì)列中去
*
* @param id id
* @return routingKey
*/
private String getRoutingKey(int id) {
return String.valueOf(id % Message15.QUEUE_COUNT);
}
}
============================》消費(fèi)者
/**
* 要想保證消息的順序,每個隊(duì)列只能有一個消費(fèi)者
*
* @author 深漂碼農(nóng)@明哥
* @date 2024-03-18
*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {
@RabbitHandler
public void onMessage(Message15 message) throws InterruptedException {
log.info("[{}][Consumer15 onMessage][線程編號:{} 消息內(nèi)容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
// 這里隨機(jī)睡一會,模擬業(yè)務(wù)處理時候的耗時
long l = new Random(1000).nextLong();
TimeUnit.MILLISECONDS.sleep(l);
}
}
==============================》測試類
@Test
void mock() throws InterruptedException {
// 先啟動這個測試類,模擬多個副本情況下,看如何消費(fèi)
new CountDownLatch(1).await();
}
@Test
void syncSend() throws InterruptedException {
// 模擬每個隊(duì)列中扔 10 個數(shù)據(jù),看看效果
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 4; j++) {
producer15.syncSend(j, " 編號:" + j + " 第:" + i + " 條消息");
}
}
TimeUnit.SECONDS.sleep(20);
}
}
ps:測試的時候時候 先啟動 mock 方式。 在啟動 syncSend 方法,模擬多個副本同時消費(fèi),觀察是否可以 以上的是RabbitMQ之順序消費(fèi)實(shí)現(xiàn)的代碼 若不了解rabbitmq的基本使用 建議先看看我前面對應(yīng)的文章 文章鏈接:點(diǎn)我—>let’s go 若需完整代碼 可識別二維碼后 給您發(fā)代碼。
柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。