柚子快報邀請碼778899分享:分布式 RabbitMq
柚子快報邀請碼778899分享:分布式 RabbitMq
目錄
一、為什么要用到RabbitMq?
二、RabbitMq有什么作用?
1.解耦
2.異步
三、RabbitMq的模型
1.helloword模型
2.Work模型
3.發(fā)布訂閱模型
4.路由鍵模型
5.主題模型
四、RabbitMq跟SpringBoot的整合
1.導(dǎo)入依賴
2.yml配置
3.創(chuàng)建隊列、創(chuàng)建交換機、將隊列與交換機綁定并設(shè)置路由鍵
4.生產(chǎn)者發(fā)送消息
5.消費者消費消息
五、ACK機制
1.什么是消息確認機制?
2.手動開啟ACK
2.消費者應(yīng)答
六、消息的可靠性
1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?
1.1消息從生產(chǎn)者到交換機有可能會丟失。這里可以通過confirm機制來解決
1.2交換機到隊列也有可能會丟失。這里可以通過return機制來解決
1.33、從隊列到消費者也有可能會丟失。這里可以通過手動ACK解決。
2.confirm和return機制的實現(xiàn)
2.1yml配置
3.MQ是如何實現(xiàn)消息確認機制的?
4.消息補償機制
5.服務(wù)端實現(xiàn)遠程調(diào)用(RPC)
5.1通過java網(wǎng)絡(luò)編程包
5.2RestTemplate
5.3ApacheHttpClient:通過工具類即可
七、消息的重復(fù)消費
1.消息被消費多次的后果
2.怎么解決消息被重復(fù)消費
4.冪等性解決方案
八、死信隊列
1.什么是死信?
2.什么是非正常的消息
3.什么是死信隊列
九、延遲隊列
一、為什么要用到RabbitMq?
因為像我們之前的項目,代碼之間的執(zhí)行都是同步的,一個業(yè)務(wù)的處理必須等待上一個業(yè)務(wù)的完成,這樣就比較耗費時間,比如我們的用戶查詢數(shù)據(jù)的時候,對于用戶而言他只需要查尋數(shù)據(jù)這一個操作,對于我們服務(wù)端而言可能還需要做一些處理,像存入緩存、刪除緩存,只有做完這些操作我們服務(wù)端才會把數(shù)據(jù)傳給用戶,但是這些是我們業(yè)務(wù)的處理,不應(yīng)該讓用戶來承擔(dān)這樣的一個時間成本,并且用戶等待數(shù)據(jù)的時間過長,給用戶也會帶來了很不好的體驗感,同時模塊之前的耦合性很高,一個模塊宕機后,全部模塊都不能用了。所以要中間件RabbitMQ
二、RabbitMq有什么作用?
1.解耦
就是rabbitMq有一個生產(chǎn)者模塊負責(zé)發(fā)送消息到隊列中,一個消費者模塊負責(zé)從隊列中拿到數(shù)據(jù)進行消費,模塊與模塊間分離,通過RabbitMq進行數(shù)據(jù)通信
2.異步
它可以不需要等待我們代碼的全部執(zhí)行,就可以用戶所需要的數(shù)據(jù)將消息發(fā)送到隊列中,然后由隊列推給用戶
三、RabbitMq的模型
1.helloword模型
一個生產(chǎn)者,一個隊列,一個消費者
2.Work模型
?一個生產(chǎn)者,一個隊列,多個消費者
采用多個消費者是為了加快消息的消費,多個消費者之間采用輪訓(xùn)的方式。
3.發(fā)布訂閱模型
一個生產(chǎn)者,一個交換機 ,多個隊列,一個隊列上對應(yīng)一個消費者,消費者只有訂閱才能收到消息,交換機通過廣播給訂閱的隊列
4.路由鍵模型
路由鍵模型,跟發(fā)布訂閱一樣,只不過多了個路由鍵,進行一個條件的判斷
5.主題模型
主體模型跟路由鍵類似,只不過路由鍵多了兩個符號 *代表可以接單個字符,# 代表可以接多個字符
四、RabbitMq跟SpringBoot的整合
1.導(dǎo)入依賴
2.yml配置
spring:
rabbitmq:
host: 192.168.107.123 #虛擬主機的ip地址
port: 5672 #RabbitMq的端口號
username: guest #匿名用戶
password: guest
virtual-host: / # 虛擬機主機,隊列就是保存在虛擬主機中
3.創(chuàng)建隊列、創(chuàng)建交換機、將隊列與交換機綁定并設(shè)置路由鍵
// 交換機的類型是路由鍵
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Queue directQueue1() {
return new Queue("direct-queue1");
}
@Bean
public Queue directQueue2() {
return new Queue("direct-queue2");
}
// 綁定
@Bean
public Binding directQueue1Bind() {
// 給direct-queue1綁定了兩個隊列
BindingBuilder.DirectExchangeRoutingKeyConfigurer to = BindingBuilder.bind(directQueue1()).to(directExchange());
// 綁定了兩個路由鍵
to.with("error");
Binding warn = to.with("info");
return warn;
}
@Bean
public Binding directQueue2Bind() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
}
4.生產(chǎn)者發(fā)送消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("direct-exchange", "error", "toString");
System.out.println("生產(chǎn)者消息發(fā)送完成");
}
5.消費者消費消息
@Component
public class HelloQueueListener {
@RabbitListener(queues = "direct-queue1")
public void cosnuermMsg(String msg) {
System.out.println("消費者拿到的數(shù)是:" + msg);
}
}
五、ACK機制
ACK機制是消費端的一個消息確認機制
1.什么是消息確認機制?
MQServer把消息推送給消費者后,消費者開始消費,消費完成后需要把結(jié)果給MQServer應(yīng)答一下,消費結(jié)果有兩種情況:失敗、成功
消費成功:應(yīng)答ACK,MQServer手動ACK后就明白這個消息已經(jīng)被成功的消費了,可以從隊列中刪除了。
消費失敗:應(yīng)答NACK。MQServer收到NACK后知道了消費者無法消費這個消息,發(fā)送給其他的消費者進行消費。如果其他的消費者也是無法消費,此時需要這類消息全部的收集起來入庫,通知相關(guān)人員來檢查。
消費者默認自動應(yīng)答,不出異常自動應(yīng)答,出了異常應(yīng)答NACK,并且把這個消息壓入到隊列
ready:待分配(消費者)的消息的數(shù)量。
unackded:待應(yīng)答的消息數(shù)量。
total:總消息的數(shù)量。
當(dāng)出現(xiàn)異常沒有處理時候,那么被認為應(yīng)答nACK,消息回到隊列的待應(yīng)答狀態(tài),關(guān)掉消費者,則進入待分配狀態(tài)客戶端先創(chuàng)建連接對象,有了連接對象才能創(chuàng)建信道進行數(shù)據(jù)的傳輸
channel斷開后把待應(yīng)答的消息,全部變?yōu)榇峙洹?/p>
mqServer是支持持久化的,重啟后數(shù)據(jù)還有,down刪除就沒有了
2.手動開啟ACK
1.ylm配置文件
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手動ACK
2.消費者應(yīng)答
@RabbitListener(queues = "hello-queue")
public void cosnuermMsg(String msg, Channel channel, Message message) {
System.out.println("消費者拿到的數(shù)是:" + msg);
// 每個消息的標識
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 開始消費消息
Boolean flag = customerData(msg);
if (flag) {
// 確定消息消費成功,應(yīng)答ACK
// 第一個參數(shù):消息的唯一標識
// 第二個參數(shù):是否批量應(yīng)答,一般都是false
channel.basicAck(deliveryTag, false);
System.out.println("消息成功,應(yīng)答ACK");
return;
}
System.out.println("消息過程中沒有出現(xiàn)異常,但是消息沒有消費成功,應(yīng)答NACK");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消費過程中出現(xiàn)了異常,應(yīng)答NACK");
}
// 消息消費失敗,應(yīng)答NACK
// 第三個參數(shù)是:是否壓入隊列,如果設(shè)置為false該消息就丟棄了
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
// 完成消息的消費
private Boolean customerData(String msg) {
Integer i = Integer.parseInt(msg);
if (i == 0) { // 沒有插入成功,但是也沒有出現(xiàn)異常
return false;
}
return true;
}
六、消息的可靠性
1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?
1.1消息從生產(chǎn)者到交換機有可能會丟失。這里可以通過confirm機制來解決
confirm機制是RabbitMQ自己提供的一個機制,用來確認消息是否到了交換機了。
1.2交換機到隊列也有可能會丟失。這里可以通過return機制來解決
reutrn機制是RabbitMQ自己提供的一個機制,用來確認消息是否到了隊列了。
1.33、從隊列到消費者也有可能會丟失。這里可以通過手動ACK解決。
手動ACK后就明白這個消息已經(jīng)被成功的消費了,可以從隊列中刪除了
2.confirm和return機制的實現(xiàn)
2.1yml配置
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true #開啟return
publisher-confirm-type: simple # 開啟confirm
/**
* confirm機制和return機制
*/
@Component
public class MsgConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//這里要對mq的return和confirm進行覆蓋
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
// confirm機制確認消息是否到了交換機
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已經(jīng)到了交換機");
} else {
System.out.println("消息沒到了交換機");
}
}
// 確認消息是否到了隊列
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息沒有到隊列," + replyText + "," + exchange + "," + routingKey);
}
3.MQ是如何實現(xiàn)消息確認機制的?
生產(chǎn)者發(fā)送消息的時候除了交換機,數(shù)據(jù),路由鍵把回調(diào)函數(shù)也發(fā)送過去了,因為消息是否到了交換機時MQServer確定的,如果到了就調(diào)用ACK
4.消息補償機制
但是這只是通知消息是否正常到達,并沒有對消息沒有達到的情況進行一個處理,所以我們要進行消息補償,生產(chǎn)者對未發(fā)送成功的消息進行一個消息補償,特別是對業(yè)務(wù)很重要的數(shù)據(jù)必須補償,無關(guān)緊要的倒可以不必因為消息補償需要成本
也就是mq發(fā)送數(shù)據(jù)出現(xiàn)故障的時候,就可以考慮別的方式發(fā)送數(shù)據(jù)了,這就是消息補償,如:RPC遠程調(diào)用直接由生產(chǎn)者發(fā)送給消費者,不通過mq
5.服務(wù)端實現(xiàn)遠程調(diào)用(RPC)
5.1通過java網(wǎng)絡(luò)編程包
5.2RestTemplate
@Test
void contextLoads2() throws Exception {
// 這個類可以發(fā)送一個請求過去
RestTemplate restTemplate = new RestTemplate();
String info = restTemplate.getForObject("http://localhost:8080/send?msg=HTTP", String.class);
System.out.println("遠程調(diào)用的返回的結(jié)果:"+info);
}
5.3ApacheHttpClient:通過工具類即可
七、消息的重復(fù)消費
消息的重復(fù)消費指的是一個消息被同一個消費者消費了多次。
1.消息被消費多次的后果
消費者拿到消息后干的事情是扣款或者是發(fā)送短信等待。
正常來說消費一次就夠了,重復(fù)消費后就發(fā)現(xiàn)扣款了多次。
2.怎么解決消息被重復(fù)消費
控制冪等性? 冪等性:多次相同的操作不會對數(shù)據(jù)產(chǎn)生影響
接口的冪等性 就是多次相同的操作不會對數(shù)據(jù)再次改變 消息的冪等性 就是這個消息的重復(fù)消費不會對數(shù)據(jù)產(chǎn)生影響
3.為什么消息會被重復(fù)消費
消費者拿到數(shù)據(jù)開始消費,并且也消費成功了,在做ACK應(yīng)答之前網(wǎng)絡(luò)出現(xiàn)了閃斷,消費者和MQServer斷開了連接。MQServer中的待應(yīng)答就會變成待分配,此時消息已經(jīng)成功消費了,因為是閃斷,所以又再次的連接成功,MQServer在再次的把消息推送給了消費者,消費者再次拿到數(shù)據(jù)再次進行消費,這里就出現(xiàn)了重復(fù)的消費。
這個地方只需要把消費者中調(diào)用消費數(shù)據(jù)的方法控制冪等性就可以了
4.冪等性解決方案
Token機制:關(guān)于解決表單的重復(fù)提交就是服務(wù)端生成一個token帶給表單,表單中隱藏這個token,多次提交攜帶token過去,第一次token有效進行操作,然后把token設(shè)為失效了,然后后面的提交攜帶的token就是失效的了,就不會操作了
CAS保證接口冪等性
樂觀鎖實現(xiàn)冪等性
防重表
緩存隊列
select+insert
八、死信隊列
1.什么是死信?
非正常的消息就是死信
2.什么是非正常的消息
被Nack的,被拒絕的,超過隊列長度被擠出去的信息
3.什么是死信隊列
保存死信的消息的隊列就是死信隊列
被nack的消息可以壓入隊列,然后在隊列中配置了死信交換機和路由鍵
?的信息,轉(zhuǎn)到死信交換機,由死信交換機發(fā)給死信隊列,然后交給另一個消費者處理
一般無法處理的消息都是死信,會把這些死信消息全部的轉(zhuǎn)到同一個隊列來做特殊的處理,這個隊列就是死信隊列。
九、延遲隊列
有些場景不希望馬上消費,是需要延時一段時間后再去消費。比如:10分鐘后,半個小時后干點事情。
比如:訂單超過關(guān)閉,驗證碼超時失效,這些都是延時任務(wù),就可以通過延遲隊列完成
柚子快報邀請碼778899分享:分布式 RabbitMq
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。