柚子快報激活碼778899分享:分布式 RabbitMQ進(jìn)階
柚子快報激活碼778899分享:分布式 RabbitMQ進(jìn)階
RabbitMQ進(jìn)階
1 RabbitMQ 的高級特性
1.1TTL(Time-To-Live)生存時間
TTL 允許設(shè)置消息的生存時間,超過指定時間仍未被消費者處理的消息將被視為過期。過期消息可以進(jìn)入死信隊列,用于后續(xù)處理或分析。這一特性有助于系統(tǒng)資源的合理利用和消息的有效管理。
@Test
public void test2(){
//1.創(chuàng)建具有過期時間的消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message("單條消息過期".getBytes(),messageProperties);
//2.發(fā)送消息
rabbitTemplate.convertAndSend("ttl-exchange","ttl-routingKey",message);
}
**注意: **
如果設(shè)置了單條消息的存活時間,也設(shè)置了隊列的存活時間,以時間短的為準(zhǔn)。消息過期后,并不會馬上移除消息,只有消息消費到隊列頂端時,才會移除該消息。
1.2. 消息優(yōu)先級
RabbitMQ 支持為消息設(shè)置優(yōu)先級,確保重要消息得到更快的處理。生產(chǎn)者可以通過設(shè)置消息的優(yōu)先級,讓代理在投遞消息時考慮這一因素,提高關(guān)鍵消息的傳遞速度。
/**
* 隊列優(yōu)先級
*/
@Configuration
public class PriorityConfig {
@Bean("pri-exchange")
public Exchange ackExchange(){
return ExchangeBuilder.topicExchange("pri-exchange").durable(true).build();
}
@Bean("pri-queue")
public Queue ackQueue(){
return QueueBuilder
.durable("pri-queue")
//設(shè)置隊列的最大優(yōu)先級,最大可以設(shè)置到255,官網(wǎng)推薦不要超過10,,如果設(shè)置太高比較浪費資源
.maxPriority(10)
.build();
}
@Bean
public Binding binding(@Qualifier("pri-queue")Queue queue,
@Qualifier("pri-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("pri-routingKey").noargs();
}
}
1.3. 消息事務(wù)
消息事務(wù)是 RabbitMQ 提供的一種機制,用于確保消息的可靠傳遞。生產(chǎn)者可以開啟事務(wù),在事務(wù)中發(fā)布消息,只有在事務(wù)提交時,消息才會被真正發(fā)送到隊列。這有助于確保消息的原子性傳遞。
1.4. 集群與高可用性
RabbitMQ 支持構(gòu)建集群,將多個 RabbitMQ 節(jié)點組合在一起,提高系統(tǒng)的可用性和容錯能力。集群中的節(jié)點可以共享隊列和交換機的信息,確保在某個節(jié)點故障時,其他節(jié)點能夠繼續(xù)提供服務(wù)。
1.5消費限流
之前我們講過MQ可以對請求進(jìn)行“削峰填谷”,即通過消費端限流的方式限制消息的拉取速度,達(dá)到保護(hù)消費端的目的。
消費端限流的寫法如下:
spring:
rabbitmq:
host: 192.168.184.136
port: 5672
virtual-host: /cvdf
username: cvdf
password: cvdf
# 手動簽收消息
listener:
simple:
# 限流機制必須開啟手動簽收
acknowledge-mode: manual
# 消費端最多拉取5條消息消費,簽收后不滿5條才會繼續(xù)拉取消息
prefetch: 5
1.6死信隊列
在MQ中,當(dāng)消息成為死信(Dead message)后,消息中間件可以將其從當(dāng)前隊列發(fā)送到另一個隊列中,這個隊列就是死信隊列。而在RabbitMQ中,由于有交換機的概念,實際是將死信發(fā)送給了死信交換機(Dead Letter Exchange,簡稱DLX)。死信交換機和死信隊列和普通的沒有區(qū)別。 消息成為死信的情況:
隊列消息長度到達(dá)限制。消費者拒簽消息,并且不把消息重新放入原隊列。消息到達(dá)存活時間未被消費。
死信隊列實現(xiàn)如下
/**
* 死信隊列相關(guān)配置
*/
@Configuration
public class DLXConfig {
//1.死信交換機、隊列、binding
@Bean("dead-exchange")
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange("dead-exchange").durable(true).build();
}
@Bean("dead-queue")
public Queue deadQueue(){
return QueueBuilder
.durable("dead-queue")
.build();
}
@Bean
public Binding deadBinding(@Qualifier("dead-queue")Queue queue,
@Qualifier("dead-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("dead-routingKey").noargs();
}
//2.正常交換機、隊列、binding
@Bean("normal-exchange")
public Exchange ackExchange(){
return ExchangeBuilder.topicExchange("normal-exchange").durable(true).build();
}
@Bean("normal-queue")
public Queue ackQueue(){
return QueueBuilder
.durable("normal-queue")
.deadLetterExchange("dead-exchange")//綁定死信交換機
.deadLetterRoutingKey("dead-routingKey")//死信routingKey
//過期時間
.ttl(20000)
.maxLength(10) //隊列最大長度
.build();
}
@Bean
public Binding binding(@Qualifier("normal-queue")Queue queue,
@Qualifier("normal-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("normal-routingKey").noargs();
}
}
#生產(chǎn)者發(fā)送消息
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//演示消息超過隊列長度成為死信
for (int i = 1; i <=20 ; i++) {
rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
}
}
@Test
public void test2(){
//演示消息過期成為死信
for (int i = 1; i <=2 ; i++) {
rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
}
}
}
/**
* 消費者拒收成為死信隊列
*/
@Component
public class DLXListener {
@RabbitListener(queues = "normal-queue")
public void ack(Message message, Channel channel) throws IOException, InterruptedException {
//獲得消息投遞序號,消息每次投遞該值都會+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//拒簽消息:消息投遞序號,是否一次拒簽多條消息,拒簽消息是否重回隊列
channel.basicNack(deliveryTag,true,false);
}
}
死信隊列實現(xiàn)延遲隊列 延遲隊列,即消息進(jìn)入隊列后不會立即被消費,只有到達(dá)指定時間后,才會被消費。 但RabbitMQ中并未提供延遲隊列功能,我們可以使用死信隊列實現(xiàn)延遲隊列的效果。設(shè)置相應(yīng)的過期時間,到期都進(jìn)去死信隊列,在死信隊列中被消費,從而形成延時效果。 弊端:在使用死信隊列實現(xiàn)延遲隊列時,會遇到一個問題:RabbitMQ只會移除隊列頂端的過期消息,如果第一個消息的存活時長較長,而第二個消息的存活時長較短,則第二個消息并不會及時執(zhí)行。 解決:RabbitMQ雖然本身不能使用延遲隊列,但官方提供了延遲隊列插件,安裝后可直接使用延遲隊列。 【*】下載插件
RabbitMQ有一個官方的插件社區(qū),地址為:https://www.rabbitmq.com/community-plugins.html
其中包含各種各樣的插件,包括我們要使用的DelayExchange插件。
2. RabbitMQ 應(yīng)用場景
RabbitMQ 可以廣泛應(yīng)用于各種場景,其中一些主要應(yīng)用包括:
異步通信: 通過消息隊列實現(xiàn)應(yīng)用程序之間的解耦,提高系統(tǒng)的響應(yīng)速度和并發(fā)處理能力。
任務(wù)調(diào)度: 將任務(wù)以消息的形式發(fā)送到隊列,由消費者進(jìn)行處理,實現(xiàn)任務(wù)的異步執(zhí)行和調(diào)度。
日志處理: 將系統(tǒng)產(chǎn)生的日志通過消息隊列發(fā)送到日志處理系統(tǒng),實現(xiàn)日志的集中管理和分析。
微服務(wù)通信: 在微服務(wù)架構(gòu)中,通過消息隊列進(jìn)行服務(wù)之間的通信,確保服務(wù)之間的松耦合和可擴展性。
事件驅(qū)動架構(gòu): 使用消息隊列實現(xiàn)事件的發(fā)布和訂閱,實現(xiàn)系統(tǒng)中不同組件之間的松耦合通信。
3. RabbitMQ 與 Spring 集成
RabbitMQ 與 Spring 框架的集成相對簡單,Spring 提供了 spring-amqp 模塊用于支持 RabbitMQ。通過 Spring 提供的注解和配置,開發(fā)者可以方便地在 Spring 項目中使用 RabbitMQ。
4. 安全性與權(quán)限控制
RabbitMQ 提供了一系列的安全性特性,包括身份驗證、訪問控制列表(ACL)和加密通信等。通過合理配置這些安全性選項,可以確保 RabbitMQ 系統(tǒng)的數(shù)據(jù)安全和隱私保護(hù)。
5. 性能調(diào)優(yōu)和監(jiān)控
為了保持 RabbitMQ 系統(tǒng)的高性能,開發(fā)者需要進(jìn)行性能調(diào)優(yōu)和監(jiān)控。通過監(jiān)控工具,如 Prometheus 和 Grafana,以及 RabbitMQ 提供的管理插件,可以實時監(jiān)測系統(tǒng)性能、隊列狀態(tài)和節(jié)點健康情況,從而及時發(fā)現(xiàn)和解決潛在問題。
6 總結(jié)
RabbitMQ 作為一款強大的消息代理系統(tǒng),為構(gòu)建分布式系統(tǒng)和微服務(wù)提供了可靠的消息傳遞解決方案。本博客從 RabbitMQ 的基礎(chǔ)概念、核心特性、高級特性、應(yīng)用場景、集成與安全性等方面進(jìn)行了詳盡的探討。深入理解 RabbitMQ,將有助于開發(fā)者更好地應(yīng)用它解決實際問題,并構(gòu)建高性能、可擴展、可靠的分布式應(yīng)用系統(tǒng)。希望讀者通過本文能夠?qū)?RabbitMQ 有更深入的了解,從而更好地應(yīng)用于實際開發(fā)中。 以上就是全部內(nèi)容,如果你有任何問題、意見或建議,都?xì)g迎在評論中分享。讓我們繼續(xù)分享知識,共同成長,一起走向更加美好的未來。感謝你們的閱讀,祝愿你們在未來的道路上一帆風(fēng)順!
柚子快報激活碼778899分享:分布式 RabbitMQ進(jìn)階
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。