欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)

柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)

http://yzkb.51969.com/

什么是順序消費(fèi) 例如:業(yè)務(wù)上產(chǎn)生者發(fā)送三條消息, 分別是對同一條數(shù)據(jù)的增加、修改、刪除操作, 如果沒有保證順序消費(fèi),執(zhí)行順序可能變成刪除、修改、增加,這就亂了。 如何保證順序性 一般我們討論如何保證消息的順序性,會從下面三個方面考慮 1:發(fā)送消息的順序 2:隊(duì)列中消息的順序 3:消費(fèi)消息的順序 發(fā)送消息的順序 消息發(fā)送端的順序,大部分業(yè)務(wù)不做要求,誰先發(fā)消息無所謂,如果遇到業(yè)務(wù)一定要發(fā)送消息也確保順序,那意味著,只能全局加鎖一個個的操作,一個個的發(fā)消息,不能并發(fā)發(fā)送消息。

隊(duì)列中消息的順序 RabbitMQ 中,消息最終會保存在隊(duì)列中,在同一個隊(duì)列中,消息是順序的,先進(jìn)先出原則,這個由 RabbitMQ 保證,通常也不需要開發(fā)關(guān)心。

不同隊(duì)列 中的消息順序,是沒有保證的,例如:進(jìn)地鐵站的時候,排了三個隊(duì)伍,不同隊(duì)伍之間的,不能確保誰先進(jìn)站。

消費(fèi)消息的順序 我們說如何保證消息順序性,通常說的就是消費(fèi)者消費(fèi)消息的順序,在多個消費(fèi)者消費(fèi)同一個消息隊(duì)列的場景,通常是無法保證消息順序的,

雖然消息隊(duì)列的消息是順序的,但是多個消費(fèi)者并發(fā)消費(fèi)消息,獲取的消息的速度、執(zhí)行業(yè)務(wù)邏輯的速度快慢、執(zhí)行異常等等原因都會導(dǎo)致消息順序不一致。 例如:消息A、B、C按順序進(jìn)入隊(duì)列,消費(fèi)者A1拿到消息A、消費(fèi)者B1拿到消息B, 結(jié)果消費(fèi)者B執(zhí)行速度快,就跑完了,又或者消費(fèi)者A1掛了,都會導(dǎo)致消息順序不一致。 解決消費(fèi)順序的問題, 通常就是一個隊(duì)列只有一個消費(fèi)者 , 這樣就可以一個個消息按順序處理, 缺點(diǎn)就是并發(fā)能力下降了,無法并發(fā)消費(fèi)消息,這是個取舍問題。

如果業(yè)務(wù)又要順序消費(fèi),又要增加并發(fā),通常思路就是開啟多個隊(duì)列,業(yè)務(wù)根據(jù)規(guī)則將消息分發(fā)到不同的隊(duì)列,通過增加隊(duì)列的數(shù)量來提高并發(fā)度,例如:電商訂單場景,只需要保證同一個用戶的訂單消息的順序性就行,不同用戶之間沒有關(guān)系,所以只要讓同一個用戶的訂單消息進(jìn)入同一個隊(duì)列就行,其他用戶的訂單消息,可以進(jìn)入不同的隊(duì)列。

以下為代碼設(shè)計(jì)過程實(shí)現(xiàn) 首先我們必須保證只有一個消費(fèi)者 那么問題就來了,我們的項(xiàng)目一般是多副本的,如何保證只有一個副本在消費(fèi)呢 這時就會用到消費(fèi)者 單活模式 x-single-active-consumer 使用下述配置實(shí)現(xiàn)

private Queue creatQueue(String name){

// 創(chuàng)建一個 單活模式 隊(duì)列

HashMap args=new HashMap<>();

args.put("x-single-active-consumer",true);

return new Queue(name,true,false,false,args);

}

創(chuàng)建之后,我們可以在控制臺看到 消費(fèi)者的激活狀態(tài)

=======================>配置類

@Configuration

@SuppressWarnings("all")

public class DirectExchangeConfiguration {

@Bean

public Queue queue15_0() {

return creatQueue(Message15.QUEUE_0);

}

@Bean

public Queue queue15_1() {

return creatQueue(Message15.QUEUE_1);

}

@Bean

public Queue queue15_2() {

return creatQueue(Message15.QUEUE_2);

}

@Bean

public Queue queue15_3() {

return creatQueue(Message15.QUEUE_3);

}

@Bean

public DirectExchange exchange15() {

// name: 交換機(jī)名字 | durable: 是否持久化 | exclusive: 是否排它

return new DirectExchange(Message15.EXCHANGE, true, false);

}

@Bean

public Binding binding15_0() {

return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");

}

@Bean

public Binding binding15_1() {

return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");

}

@Bean

public Binding binding15_2() {

return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");

}

@Bean

public Binding binding15_3() {

return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");

}

/**

* 創(chuàng)建一個 單活 模式的隊(duì)列

* 注意 :

*

* 如果一個隊(duì)列已經(jīng)創(chuàng)建為非x-single-active-consumer,而你想更改其為x-single-active-consumer,要把之前創(chuàng)建的隊(duì)列刪除

*

* @param name

* @return queue

*/

private Queue creatQueue(String name) {

// 創(chuàng)建一個 單活模式 隊(duì)列

HashMap args = new HashMap<>();

args.put("x-single-active-consumer", true);

return new Queue(name, true, false, false, args);

}

=================================》生產(chǎn)者

@Component

@Slf4j

public class Producer15 {

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 這里的發(fā)送是 擬投遞到多個隊(duì)列中

*

* @param id 業(yè)務(wù)id

* @param msg 業(yè)務(wù)信息

*/

public void syncSend(int id, String msg) {

Message15 message = new Message15(id, msg);

rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);

}

/**

* 根據(jù) id 取余來決定丟到那個隊(duì)列中去

*

* @param id id

* @return routingKey

*/

private String getRoutingKey(int id) {

return String.valueOf(id % Message15.QUEUE_COUNT);

}

}

============================》消費(fèi)者

/**

* 要想保證消息的順序,每個隊(duì)列只能有一個消費(fèi)者

*

* @author 深漂碼農(nóng)@明哥

* @date 2024-03-18

*/

@Component

@RabbitListener(queues = Message15.QUEUE_0)

@RabbitListener(queues = Message15.QUEUE_1)

@RabbitListener(queues = Message15.QUEUE_2)

@RabbitListener(queues = Message15.QUEUE_3)

@Slf4j

public class Consumer15 {

@RabbitHandler

public void onMessage(Message15 message) throws InterruptedException {

log.info("[{}][Consumer15 onMessage][線程編號:{} 消息內(nèi)容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);

// 這里隨機(jī)睡一會,模擬業(yè)務(wù)處理時候的耗時

long l = new Random(1000).nextLong();

TimeUnit.MILLISECONDS.sleep(l);

}

}

==============================》測試類

@Test

void mock() throws InterruptedException {

// 先啟動這個測試類,模擬多個副本情況下,看如何消費(fèi)

new CountDownLatch(1).await();

}

@Test

void syncSend() throws InterruptedException {

// 模擬每個隊(duì)列中扔 10 個數(shù)據(jù),看看效果

for (int i = 0; i < 10; i++) {

for (int j = 0; j < 4; j++) {

producer15.syncSend(j, " 編號:" + j + " 第:" + i + " 條消息");

}

}

TimeUnit.SECONDS.sleep(20);

}

}

ps:測試的時候時候 先啟動 mock 方式。 在啟動 syncSend 方法,模擬多個副本同時消費(fèi),觀察是否可以 以上的是RabbitMQ之順序消費(fèi)實(shí)現(xiàn)的代碼 若不了解rabbitmq的基本使用 建議先看看我前面對應(yīng)的文章 文章鏈接:點(diǎn)我—>let’s go 若需完整代碼 可識別二維碼后 給您發(fā)代碼。

柚子快報(bào)邀請碼778899分享:RabbitMQ之順序消費(fèi)

http://yzkb.51969.com/

參考文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19033442.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄