柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)
MQ架構(gòu)設(shè)計(jì)原理
什么是消息中間件
消息中間件基于隊(duì)列模型實(shí)現(xiàn)異步/同步傳輸數(shù)據(jù)
作用:可以實(shí)現(xiàn)支撐高并發(fā)、異步解耦、流量削峰、降低耦合度。
傳統(tǒng)的http請(qǐng)求存在那些缺點(diǎn)
1.Http請(qǐng)求基于請(qǐng)求與響應(yīng)的模型,在高并發(fā)的情況下,客戶端發(fā)送大量的請(qǐng)求達(dá)到
服務(wù)器端有可能會(huì)導(dǎo)致我們服務(wù)器端處理請(qǐng)求堆積。
2.Tomcat服務(wù)器處理每個(gè)請(qǐng)求都有自己獨(dú)立的線程,如果超過(guò)最大線程數(shù)會(huì)將該請(qǐng)求緩存到隊(duì)列中,如果請(qǐng)求堆積過(guò)多的情況下,有可能會(huì)導(dǎo)致tomcat服務(wù)器崩潰的問(wèn)題。
所以一般都會(huì)在nginx入口實(shí)現(xiàn)限流,整合服務(wù)保護(hù)框架。
?3.http請(qǐng)求處理業(yè)務(wù)邏輯如果比較耗時(shí)的情況下,容易造成客戶端一直等待,阻塞等待過(guò)程中會(huì)導(dǎo)致客戶端超時(shí)發(fā)生重試策略,有可能會(huì)引發(fā)冪等性問(wèn)題。
注意事項(xiàng):接口是為http協(xié)議的情況下,最好不要處理比較耗時(shí)的業(yè)務(wù)邏輯,耗時(shí)的業(yè)務(wù)邏輯應(yīng)該單獨(dú)交給多線程或者是mq處理。
Mq應(yīng)用場(chǎng)景有那些 消息隊(duì)列(Message Queue,簡(jiǎn)稱MQ)在分布式系統(tǒng)中有著廣泛的應(yīng)用,以下是MQ的一些主要應(yīng)用場(chǎng)景: 1.異步處理: - 當(dāng)某個(gè)操作不需要立即返回結(jié)果,或者該操作非常耗時(shí)且影響主業(yè)務(wù)流程的性能時(shí),可以使用MQ將操作放入后臺(tái)異步執(zhí)行。例如,用戶注冊(cè)成功后發(fā)送歡迎郵件或短信,這些操作不需要實(shí)時(shí)同步完成,可以放入MQ中異步處理。 2.應(yīng)用解耦: - MQ允許應(yīng)用間進(jìn)行低耦合的通信。通過(guò)將消息發(fā)送到一個(gè)或多個(gè)目的地址(隊(duì)列或主題),應(yīng)用程序可以獨(dú)立地執(zhí)行和擴(kuò)展,而不需要知道彼此的實(shí)現(xiàn)細(xì)節(jié)。例如,訂單系統(tǒng)產(chǎn)生訂單后,將訂單數(shù)據(jù)發(fā)送到MQ,庫(kù)存系統(tǒng)、物流系統(tǒng)等多個(gè)系統(tǒng)監(jiān)聽(tīng)MQ中的訂單數(shù)據(jù),根據(jù)訂單數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。 3.流量削峰: - 在高并發(fā)場(chǎng)景下,如果某個(gè)應(yīng)用或系統(tǒng)突然接收到大量的請(qǐng)求,可能導(dǎo)致系統(tǒng)壓力過(guò)大甚至崩潰。通過(guò)使用MQ,可以將這些請(qǐng)求暫存起來(lái),然后按照系統(tǒng)能夠處理的速率勻速地將請(qǐng)求分發(fā)給后臺(tái)服務(wù)進(jìn)行處理。這樣可以有效地緩解系統(tǒng)壓力,避免系統(tǒng)崩潰。 4.數(shù)據(jù)同步: - MQ可以實(shí)現(xiàn)不同系統(tǒng)之間的數(shù)據(jù)同步。例如,在分布式數(shù)據(jù)庫(kù)中,可以使用MQ將數(shù)據(jù)的變更事件(如增刪改操作)發(fā)布到MQ中,其他系統(tǒng)監(jiān)聽(tīng)這些事件并據(jù)此更新自己的數(shù)據(jù)副本。 5.日志處理: - MQ可以用于收集、處理和存儲(chǔ)日志信息。通過(guò)將日志信息發(fā)送到MQ中,可以異步地將日志信息寫(xiě)入到存儲(chǔ)系統(tǒng)(如HDFS、Elasticsearch等)中,并進(jìn)行后續(xù)的分析和處理。
-6.任務(wù)調(diào)度: - MQ可以用于實(shí)現(xiàn)任務(wù)的調(diào)度和執(zhí)行。通過(guò)將任務(wù)信息發(fā)送到MQ中,并指定任務(wù)的執(zhí)行時(shí)間和執(zhí)行條件,可以實(shí)現(xiàn)定時(shí)任務(wù)、延遲任務(wù)等功能的實(shí)現(xiàn)。 7.微服務(wù)架構(gòu)中的通信: - 在微服務(wù)架構(gòu)中,服務(wù)與服務(wù)之間的通信是常見(jiàn)的需求。使用MQ可以實(shí)現(xiàn)服務(wù)之間的松耦合通信,使得服務(wù)可以獨(dú)立地?cái)U(kuò)展和升級(jí)。同時(shí),MQ還可以提供消息持久化、消息確認(rèn)等機(jī)制,保證服務(wù)之間通信的可靠性和穩(wěn)定性。 8.分布式事務(wù): - 在分布式系統(tǒng)中,實(shí)現(xiàn)跨多個(gè)服務(wù)的事務(wù)一致性是一個(gè)挑戰(zhàn)。通過(guò)使用MQ的分布式事務(wù)功能(如RabbitMQ的RabbitMQ-delayed-message-exchange插件或Kafka的事務(wù)API),可以實(shí)現(xiàn)跨多個(gè)服務(wù)的分布式事務(wù)處理。 9.消息廣播與訂閱: - MQ可以實(shí)現(xiàn)消息的廣播與訂閱功能。發(fā)布者將消息發(fā)送到特定的主題(Topic),所有訂閱了該主題的消費(fèi)者都可以接收到這些消息。這種模式常用于實(shí)時(shí)消息系統(tǒng)、新聞推送等場(chǎng)景。 10.跨語(yǔ)言通信: - MQ支持多種編程語(yǔ)言和平臺(tái),可以實(shí)現(xiàn)跨語(yǔ)言、跨平臺(tái)的通信。這使得不同語(yǔ)言開(kāi)發(fā)的應(yīng)用或服務(wù)可以方便地進(jìn)行通信和數(shù)據(jù)交換。
為什么使用MQ
消息中間件(如RabbitMQ、Kafka、ActiveMQ等)在分布式系統(tǒng)中扮演著重要的角色,它們基于隊(duì)列模型(Queue Model)實(shí)現(xiàn)數(shù)據(jù)的異步或同步傳輸。下面我將詳細(xì)解釋這些中間件如何支持高并發(fā)、異步解耦、流量削峰以及降低耦合度的作用。 1.支撐高并發(fā): - 當(dāng)系統(tǒng)面臨高并發(fā)請(qǐng)求時(shí),如果每個(gè)請(qǐng)求都直接由系統(tǒng)處理,可能會(huì)導(dǎo)致系統(tǒng)資源迅速耗盡,響應(yīng)速度下降,甚至服務(wù)崩潰。 - 消息中間件通過(guò)隊(duì)列的方式將請(qǐng)求暫存起來(lái),后臺(tái)服務(wù)可以異步地從隊(duì)列中獲取請(qǐng)求并進(jìn)行處理。這樣,即使在高并發(fā)場(chǎng)景下,系統(tǒng)也能保持穩(wěn)定的響應(yīng)速度,因?yàn)檎?qǐng)求被均勻地分散到不同的時(shí)間段內(nèi)處理。 - 消息中間件通常支持多消費(fèi)者模型,即多個(gè)服務(wù)實(shí)例可以同時(shí)從隊(duì)列中消費(fèi)消息,從而進(jìn)一步提高并發(fā)處理能力。 2.異步解耦: - 在傳統(tǒng)的同步請(qǐng)求-響應(yīng)模型中,請(qǐng)求方必須等待響應(yīng)方處理完請(qǐng)求后才能繼續(xù)執(zhí)行后續(xù)操作。這種模型下,請(qǐng)求方和響應(yīng)方之間存在緊密的耦合關(guān)系。 - 消息中間件引入了異步通信機(jī)制,使得請(qǐng)求方和響應(yīng)方可以解耦。請(qǐng)求方只需將請(qǐng)求發(fā)送到消息隊(duì)列中,然后立即返回,無(wú)需等待響應(yīng)。響應(yīng)方可以在后臺(tái)異步地處理這些請(qǐng)求,并通過(guò)其他方式(如回調(diào)函數(shù)、HTTP響應(yīng)等)將結(jié)果通知給請(qǐng)求方。 - 這種異步解耦的方式可以提高系統(tǒng)的響應(yīng)速度和吞吐量,同時(shí)減少系統(tǒng)間的依賴關(guān)系,提高系統(tǒng)的可擴(kuò)展性和可維護(hù)性。 3.流量削峰: - 在某些場(chǎng)景下,系統(tǒng)可能會(huì)在短時(shí)間內(nèi)接收到大量的請(qǐng)求,導(dǎo)致系統(tǒng)負(fù)載急劇上升,甚至超過(guò)系統(tǒng)的處理能力。 - 消息中間件可以將這些請(qǐng)求暫存到隊(duì)列中,然后按照系統(tǒng)能夠處理的速率勻速地將請(qǐng)求分發(fā)給后臺(tái)服務(wù)進(jìn)行處理。這樣,即使系統(tǒng)面臨巨大的流量沖擊,也能保持穩(wěn)定的性能和響應(yīng)速度。 - 流量削峰不僅可以保護(hù)系統(tǒng)免受流量沖擊的影響,還可以提高系統(tǒng)的可用性和容錯(cuò)能力。 4.降低耦合度: - 在復(fù)雜的分布式系統(tǒng)中,各個(gè)服務(wù)之間可能存在復(fù)雜的依賴關(guān)系,這些依賴關(guān)系可能導(dǎo)致系統(tǒng)難以維護(hù)和擴(kuò)展。 - 消息中間件通過(guò)引入消息隊(duì)列作為服務(wù)之間的通信媒介,降低了服務(wù)之間的耦合度。服務(wù)之間不再需要直接調(diào)用對(duì)方的方法或接口,而是通過(guò)發(fā)送和接收消息來(lái)進(jìn)行通信。 - 這種基于消息的通信方式使得服務(wù)可以更加獨(dú)立地設(shè)計(jì)和實(shí)現(xiàn),提高了系統(tǒng)的可維護(hù)性和可擴(kuò)展性。同時(shí),由于服務(wù)之間的耦合度降低,系統(tǒng)的容錯(cuò)能力也得到了提高,即使某個(gè)服務(wù)出現(xiàn)故障,也不會(huì)影響其他服務(wù)的正常運(yùn)行。
Mq與多線程之間區(qū)別
MQ可以實(shí)現(xiàn)異步/解耦/流量削峰問(wèn)題;
多線程也可以實(shí)現(xiàn)異步,但是消耗到cpu資源,沒(méi)有實(shí)現(xiàn)解耦。
實(shí)例:(基于springboot實(shí)現(xiàn)異步多線程)
package com.example.rabbitmq.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MemberService {
@Autowired
public MemberServiceAsync memberServiceAsync;
//@Bean
@RequestMapping("/addMember")
public String addMember(){
//1.數(shù)據(jù)庫(kù)插入數(shù)據(jù) log.info(">01<");
System.out.println(">01<");
// sms();
memberServiceAsync.smsAsync();
System.out.println(">04<");
return "用戶注冊(cè)成功??!";
}
public String sms(){
System.out.println(">02<");
try{
System.out.println(">正在發(fā)送短信<");
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
System.out.println(">03<");
return "短信發(fā)送完成!";
}
}
package com.example.rabbitmq.mq;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class MemberServiceAsync {
@Async
public void smsAsync(){ // 注意返回類型為void,因?yàn)楫惒椒椒ㄍǔ2环祷亟Y(jié)果給調(diào)用者
System.out.println(">02<");
try{
System.out.println(">正在發(fā)送短信<");
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
System.out.println(">03<");
// 注意:這里不應(yīng)該返回任何值給調(diào)用者,因?yàn)檎{(diào)用是異步的
}
}
啟動(dòng)類
package com.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@SpringBootApplication
public class RabbitMqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqApplication.class, args);
}
}
效果
基于多線程隊(duì)列簡(jiǎn)單實(shí)現(xiàn)mq
package com.example.rabbitmq.mq;
import org.json.JSONObject;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 沒(méi)有網(wǎng)絡(luò)的情況下實(shí)現(xiàn)MQ
* 利用多線程制造生產(chǎn)者和消費(fèi)者
*/
public class BoyatopThreadMQ {
//MQ服務(wù)器 初始化消息的隊(duì)列
private static LinkedBlockingDeque
//主函數(shù) 程序入口
public static void main(String[] args) {
//生產(chǎn)者 生產(chǎn)線程
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
Thread.sleep(1000);
JSONObject data = new JSONObject();
data.put("userId","12345");
//存入消息隊(duì)列
msgs.offer(data);
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"生產(chǎn)者");
producerThread.start();
//消費(fèi)端 消費(fèi)線程
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
JSONObject data = msgs.poll();
if(data != null){
System.out.println(Thread.currentThread().getName() + ",獲取到數(shù)據(jù):" + data);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"消費(fèi)者");
consumerThread.start();
}
}
效果:
基于netty實(shí)現(xiàn)mq
Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,專門用于構(gòu)建高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用程序。它提供了一個(gè)快速簡(jiǎn)便的方法來(lái)構(gòu)建網(wǎng)絡(luò)服務(wù)器和客戶端,并且不需要關(guān)注復(fù)雜的網(wǎng)絡(luò)編程細(xì)節(jié)。Netty支持多種網(wǎng)絡(luò)協(xié)議,包括TCP、UDP、HTTP、WebSocket等,使其適用于各種應(yīng)用場(chǎng)景。Netty在許多大型互聯(lián)網(wǎng)公司和開(kāi)源項(xiàng)目中被廣泛使用,特別是在需要處理大量并發(fā)連接和低延遲的場(chǎng)景中。
消費(fèi)者netty客戶端與nettyServer端MQ服務(wù)器端保持長(zhǎng)連接,MQ服務(wù)器端保存消費(fèi)者連接。
生產(chǎn)者netty客戶端發(fā)送請(qǐng)求給nettyServer端MQ服務(wù)器端,MQ服務(wù)器端在將該消息內(nèi)容發(fā)送給消費(fèi)者
導(dǎo)入相關(guān)依賴
Netty生產(chǎn)端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class NettyProducer {
public void connect(int port, String host) throws Exception {
//配置客戶端NIO 線程組
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 設(shè)置為Netty客戶端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)。
* Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到來(lái),組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該算法有效提高了網(wǎng)絡(luò)的有效負(fù)載,但是卻造成了延時(shí)。
* 而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸。和TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送數(shù)據(jù),適用于文件傳輸。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyProducer.NettyClientHandler());
1. 演示LineBasedFrameDecoder編碼器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//綁定端口, 異步連接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客戶端連接端口關(guān)閉
future.channel().closeFuture().sync();
} finally {
//優(yōu)雅關(guān)閉 線程組
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyProducer client = new NettyProducer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("userId", "123456");
msg.put("age", "23");
data.put("msg", msg);
// 生產(chǎn)發(fā)送數(shù)據(jù)
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客戶端讀取到服務(wù)器端數(shù)據(jù)
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客戶端接收到服務(wù)器端請(qǐng)求:" + body);
}
// tcp屬于雙向傳輸
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
Netty服務(wù)端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @ClassName
* @Author
* @Version V1.0
**/
public class NettyMQServer {
public void bind(int port) throws Exception {
/**
* Netty 抽象出兩組線程池BossGroup和WorkerGroup
* BossGroup專門負(fù)責(zé)接收客戶端的連接, WorkerGroup專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫(xiě)。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 設(shè)定NioServerSocketChannel 為服務(wù)器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于構(gòu)造服務(wù)端套接字ServerSocket對(duì)象,標(biāo)識(shí)當(dāng)服務(wù)器請(qǐng)求處理線程全滿時(shí),
//用于臨時(shí)存放已完成三次握手的請(qǐng)求的隊(duì)列的最大長(zhǎng)度。如果未設(shè)置或所設(shè)置的值小于1,Java將使用默認(rèn)值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服務(wù)器端監(jiān)聽(tīng)數(shù)據(jù)回調(diào)Handler
.childHandler(new NettyMQServer.ChildChannelHandler());
//綁定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("當(dāng)前服務(wù)器端啟動(dòng)成功...");
//等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//優(yōu)雅關(guān)閉 線程組
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 設(shè)置異步回調(diào)監(jiān)聽(tīng)
ch.pipeline().addLast(new NettyMQServer.MayiktServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 9008;
new NettyMQServer().bind(port);
}
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
private static LinkedBlockingDeque
private static ArrayList
// 生產(chǎn)者投遞消息的:topicName
public class MayiktServerHandler extends SimpleChannelInboundHandler
/**
* 服務(wù)器接收客戶端請(qǐng)求
*
* @param ctx
* @param data
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data)
throws Exception {
JSONObject clientMsg = getData(data);
String type = clientMsg.getString("type");
switch (type) {
case type_producer:
producer(clientMsg);
break;
case type_consumer:
consumer(ctx);
break;
}
}
private void consumer(ChannelHandlerContext ctx) {
// 保存消費(fèi)者連接
ctxs.add(ctx);
// 主動(dòng)拉取mq服務(wù)器端緩存中沒(méi)有被消費(fèi)的消息
String data = msgs.poll();
if (StringUtils.isEmpty(data)) {
return;
}
// 將該消息發(fā)送給消費(fèi)者
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
private void producer(JSONObject clientMsg) {
// 緩存生產(chǎn)者投遞 消息
String msg = clientMsg.getString("msg");
msgs.offer(msg);
//需要將該消息推送消費(fèi)者
ctxs.forEach((ctx) -> {
// 將該消息發(fā)送給消費(fèi)者
String data = msgs.poll();
if (data == null) {
return;
}
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
});
}
private JSONObject getData(Object data) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) data;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return JSONObject.parseObject(body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
}
Netty消費(fèi)端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class NettyConsumer {
public void connect(int port, String host) throws Exception {
//配置客戶端NIO 線程組
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 設(shè)置為Netty客戶端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)。
* Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到來(lái),組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該算法有效提高了網(wǎng)絡(luò)的有效負(fù)載,但是卻造成了延時(shí)。
* 而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸。和TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送數(shù)據(jù),適用于文件傳輸。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyConsumer.NettyClientHandler());
1. 演示LineBasedFrameDecoder編碼器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//綁定端口, 異步連接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客戶端連接端口關(guān)閉
future.channel().closeFuture().sync();
} finally {
//優(yōu)雅關(guān)閉 線程組
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyConsumer client = new NettyConsumer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "consumer");
// 生產(chǎn)發(fā)送數(shù)據(jù)
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客戶端讀取到服務(wù)器端數(shù)據(jù)
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客戶端接收到服務(wù)器端請(qǐng)求:" + body);
}
// tcp屬于雙向傳輸
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
netty和rabbitmq的區(qū)別
首先,從設(shè)計(jì)目的和功能上看,Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,主要用于構(gòu)建高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用程序。它提供了一個(gè)快速簡(jiǎn)便的方法來(lái)構(gòu)建網(wǎng)絡(luò)服務(wù)器和客戶端,并且不需要關(guān)注復(fù)雜的網(wǎng)絡(luò)編程細(xì)節(jié)。而RabbitMQ是一個(gè)開(kāi)源的消息隊(duì)列系統(tǒng),主要用于在不同的應(yīng)用程序之間進(jìn)行消息傳遞和異步通信。它實(shí)現(xiàn)了一種分布式消息傳遞機(jī)制,使得應(yīng)用程序可以獨(dú)立地工作,并通過(guò)消息隊(duì)列來(lái)交換數(shù)據(jù)。
其次,從使用場(chǎng)景上看,Netty更側(cè)重于底層網(wǎng)絡(luò)通信的開(kāi)發(fā),可以處理TCP、UDP等底層網(wǎng)絡(luò)協(xié)議,適用于需要高性能網(wǎng)絡(luò)通信的場(chǎng)景。而RabbitMQ則更側(cè)重于業(yè)務(wù)層次的消息傳遞,它支持多種消息傳遞協(xié)議,如AMQP、MQTT、STOMP等,并提供了可靠的消息傳遞機(jī)制、靈活的消息路由和隊(duì)列管理功能,適用于需要異步通信、解耦和流量削峰等場(chǎng)景。
此外,Netty和RabbitMQ在性能、可擴(kuò)展性、可靠性等方面也有所不同。Netty通過(guò)異步事件驅(qū)動(dòng)的方式和高效的線程模型,實(shí)現(xiàn)了高性能的網(wǎng)絡(luò)通信。而RabbitMQ則通過(guò)分布式架構(gòu)和持久化機(jī)制,實(shí)現(xiàn)了高可靠性和可擴(kuò)展性。同時(shí),RabbitMQ還提供了可視化的管理界面和多種編程語(yǔ)言的客戶端庫(kù),方便用戶進(jìn)行管理和使用。
Mq消息中間件名詞
Producer 生產(chǎn)者:投遞消息到MQ服務(wù)器端;
Consumer?消費(fèi)者:從MQ服務(wù)器端獲取消息處理業(yè)務(wù)邏輯;
Broker??MQ服務(wù)器端
Topic 主題:分類業(yè)務(wù)邏輯發(fā)送短信主題、發(fā)送優(yōu)惠券主題
Queue 存放消息模型隊(duì)列 先進(jìn)先出 后進(jìn)后出原則 數(shù)組/鏈表
Message 生產(chǎn)者投遞消息報(bào)文:json
主流mq區(qū)別對(duì)比
特性 ActiveMQ RabbitMQ RocketMQ kafka 開(kāi)發(fā)語(yǔ)言 java erlang java scala 單機(jī)吞吐量 萬(wàn)級(jí) 萬(wàn)級(jí) 10萬(wàn)級(jí) 10萬(wàn)級(jí) 時(shí)效性 ms級(jí) us級(jí) ms級(jí) ms級(jí)以內(nèi) 可用性 高(主從架構(gòu)) 高(主從架構(gòu)) 非常高(分布式架構(gòu)) 非常高(分布式架構(gòu)) 功能特性 成熟的產(chǎn)品,在很多公司得到應(yīng)用;有較多的文檔;各種協(xié)議支持較好 基于erlang開(kāi)發(fā),所以并發(fā)能力很強(qiáng),性能極其好,延時(shí)很低管理界面較豐富 MQ功能比較完備,擴(kuò)展性佳 只支持主要的MQ功能,像一些消息查詢,消息回溯等功能沒(méi)有提供,畢竟是為大數(shù)據(jù)準(zhǔn)備的,在大數(shù)據(jù)領(lǐng)域應(yīng)用廣。
centos安裝rabbitmq
命令安裝
1、更新系統(tǒng)
yum update -y
2、安裝依賴包erlang
yum install erlang -y
3、添加rabbitMQ倉(cāng)庫(kù)
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
4、安裝rabbitMQ
yum install rabbitmq-server -y
erlang安裝成功
添加rabbitMQ倉(cāng)庫(kù)
安裝rabbitMQ成功
手動(dòng)安裝
1.官網(wǎng)地址
Installing RabbitMQ | RabbitMQ
2.文件上傳
上傳到/usr/local/software 目錄下(如果沒(méi)有 software 需要自己創(chuàng)建)
3.安裝文件(分別按照以下順序安裝)
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
常用命令(按照以下順序執(zhí)行)
添加開(kāi)機(jī)啟動(dòng) RabbitMQ 服務(wù)
chkconfig rabbitmq-server on
systemctl enable rabbitmq-server.service
啟動(dòng)服務(wù)
/sbin/service rabbitmq-server start
查看服務(wù)狀態(tài)
/sbin/service rabbitmq-server status
停止服務(wù)(選擇執(zhí)行)
/sbin/service rabbitmq-server stop
開(kāi)啟 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默認(rèn)賬號(hào)密碼(guest)訪問(wèn)地址http://62.234.175.16:15672/出現(xiàn)權(quán)限問(wèn)題
添加一個(gè)新的用戶
創(chuàng)建賬號(hào)
rabbitmqctl add_user admin 123
設(shè)置用戶角色
rabbitmqctl set_user_tags admin administrator
設(shè)置用戶權(quán)限
#set_permissions [-p
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
用戶 user_admin 具有/vhost1 這個(gè) virtual host 中所有資源的配置、寫(xiě)、讀權(quán)限的
查看當(dāng)前用戶和角色
rabbitmqctl list_users
5.再次利用 admin 用戶登錄
重置命令
關(guān)閉應(yīng)用的命令為
rabbitmqctl stop_app
清除的命令為
rabbitmqctl reset
重新啟動(dòng)命令為
rabbitmqctl start_app
簡(jiǎn)單案例
1、導(dǎo)入依賴
2、消息生產(chǎn)者
package com.example.rabbitmq.demo1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
try(Connection connection = factory.newConnection(); Channel channel =
connection.createChannel()) {
/**
* 生成一個(gè)隊(duì)列
* 1. 隊(duì)列名稱
* 2. 隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
* 3. 該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
* 4. 是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
* 5. 其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 發(fā)送一個(gè)消息
* 1. 發(fā)送到那個(gè)交換機(jī)
* 2. 路由的 key 是哪個(gè)
* 3. 其他的參數(shù)信息
* 4. 發(fā)送消息的消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" 消息發(fā)送完畢");
}
}
}
3、消息消費(fèi)者
package com.example.rabbitmq.demo1;
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println(" 等待接收消息....");
// 推送的消息如何進(jìn)行消費(fèi)的接口回調(diào)
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
// 取消消費(fèi)的一個(gè)回調(diào)接口 如在消費(fèi)的時(shí)候隊(duì)列被刪除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(" 消息消費(fèi)被中斷");
};
/**
* 消費(fèi)者消費(fèi)消息
* 1. 消費(fèi)哪個(gè)隊(duì)列
* 2. 消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
* 3. 消費(fèi)者未成功消費(fèi)的回調(diào)
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
RabbitMQ的六種消息模式
1.簡(jiǎn)單模式(Simple): - 這是一個(gè)一對(duì)一的消息模式。生產(chǎn)者將消息發(fā)送到隊(duì)列,一個(gè)消費(fèi)者從隊(duì)列中獲取消息。當(dāng)多個(gè)消費(fèi)者同時(shí)監(jiān)聽(tīng)一個(gè)隊(duì)列時(shí),他們并不能同時(shí)消費(fèi)一條消息,而是隨機(jī)消費(fèi)消息。一旦消息被消費(fèi)者接收,它就會(huì)從隊(duì)列中刪除。 2.工作隊(duì)列模式(Work Queues): - 這是一個(gè)一對(duì)多的消息模式。生產(chǎn)者將消息發(fā)送到隊(duì)列,多個(gè)消費(fèi)者從隊(duì)列中獲取消息。消息會(huì)在消費(fèi)者之間平均分配,但具體的分發(fā)機(jī)制(輪詢或公平分發(fā))可以配置。這種模式適用于執(zhí)行資源密集型任務(wù),單個(gè)消費(fèi)者處理不過(guò)來(lái),需要多個(gè)消費(fèi)者進(jìn)行處理。 3.發(fā)布訂閱模式(Publish/Subscribe 或 Fanout): - 這是一個(gè)廣播式的消息傳遞方式。生產(chǎn)者發(fā)送消息到交換機(jī),交換機(jī)將消息廣播到所有與之綁定的隊(duì)列,每個(gè)隊(duì)列對(duì)應(yīng)的消費(fèi)者都可以接收到消息。 4.路由模式(Routing): - 類似于發(fā)布訂閱模式,但增加了路由鍵(Routing Key)的概念。生產(chǎn)者發(fā)送消息時(shí),會(huì)指定一個(gè)路由鍵。交換機(jī)根據(jù)路由鍵將消息發(fā)送到匹配的隊(duì)列,再由隊(duì)列對(duì)應(yīng)的消費(fèi)者消費(fèi)。 5.主題模式(Topics): - 這是路由模式的擴(kuò)展。路由鍵可以是多個(gè)單詞的字符串,用.分隔。消費(fèi)者可以通過(guò)通配符(* 和 #)來(lái)匹配路由鍵,從而接收消息。* 匹配一個(gè)單詞,# 匹配一個(gè)或多個(gè)單詞。 6.RPC模式(Remote Procedure Call): - RPC 是一種通信機(jī)制,允許客戶端發(fā)送請(qǐng)求到遠(yuǎn)程服務(wù),并等待響應(yīng)。RabbitMQ 可以用于實(shí)現(xiàn) RPC,客戶端發(fā)送請(qǐng)求消息到隊(duì)列,服務(wù)器從隊(duì)列中獲取請(qǐng)求并處理,然后將結(jié)果發(fā)送回客戶端。
1、簡(jiǎn)單模式(點(diǎn)對(duì)點(diǎn))
。在這個(gè)模式中,一個(gè)生產(chǎn)者(Producer)發(fā)送消息到一個(gè)隊(duì)列(Queue),然后一個(gè)消費(fèi)者(Consumer)從該隊(duì)列中接收并處理這些消息。這個(gè)模式?jīng)]有使用交換機(jī)(Exchange),而是直接將消息發(fā)送到隊(duì)列。
案例參照上述簡(jiǎn)單案例
2、工作隊(duì)列模式(Work Queues)
工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。
相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)
程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。
1)輪詢分發(fā)消息
案例:一個(gè)生產(chǎn)者發(fā)送消息,兩個(gè)消費(fèi)者接收消息
①抽取工具類
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
// 得到一個(gè)連接的 channel
public static Channel getChannel() throws Exception{
// 創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
//設(shè)置VirtualHost
// factory.setVirtualHost("/jxHosts");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
②啟動(dòng)兩個(gè)工作線程
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 演示消息輪詢機(jī)制
*/
public class Worker01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
================================================================================
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 演示消息輪詢機(jī)制
*/
public class Worker02 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
③啟動(dòng)一個(gè)發(fā)送線程
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 演示消息輪詢機(jī)制
*/
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("任務(wù)一,請(qǐng)輸入消息:");
// 從控制臺(tái)當(dāng)中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" 發(fā)送消息完成:"+message);
}
}
}
}
④結(jié)果展示
2)消息應(yīng)答
① 概念
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成
了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消
息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù)
發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。
為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi) 者在接
收到消息并且處理該消息之后,告訴 q rabbitmq 它已經(jīng)處理了,q rabbitmq 可以把該消息刪除了。
② 自動(dòng)應(yīng)答
消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在 高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)
衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟
失了,當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制,
當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終
使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死, 所以這種模式僅適用在消費(fèi)者可以高效并
以某種速率能夠處理這些消息的情況下使用。
RabbitMQ默認(rèn)采用自動(dòng)應(yīng)答
③ 消息應(yīng)答的方法
A.Channel.basicAck(用于肯定確認(rèn))
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
B B.Channel.basicNack(用于否定確認(rèn))
C C.Channel.basicReject(用于否定確認(rèn))
與 Channel.basicNack 相比少一個(gè)參數(shù)
不處理該消息了直接拒絕,可以將其丟棄了
④ 手動(dòng)應(yīng)答
手動(dòng)應(yīng)答的好處是可以批量應(yīng)答并且減少網(wǎng)絡(luò)擁堵
multiple 的 true 和 false 代表不同意思
true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
false 同上面相比,只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
手動(dòng)應(yīng)答的好處
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確保不會(huì)丟失任何消息。
案例:手動(dòng)應(yīng)答,一個(gè)生產(chǎn)者,兩個(gè)消費(fèi)者
① 睡眠工具類
package com.example.rabbitmq.demo2;
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
② 生產(chǎn)者
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 確認(rèn)應(yīng)答機(jī)制
*/
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println("確認(rèn)應(yīng)答機(jī)制");
Scanner sc = new Scanner(System.in);
System.out.println(" 請(qǐng)輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" 生產(chǎn)者發(fā)出消息" + message);
}
}
}
}
③ 消費(fèi)者01
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Work03 {
private static final String ACK_QUEUE_NAME="ack-queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息處理時(shí)間較短");
// 消息消費(fèi)的時(shí)候如何處理消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println(" 接收到消息:"+message);
/**
* 1. 消息標(biāo)記 tag
* 2. 是否批量應(yīng)答未應(yīng)答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 采用手動(dòng)應(yīng)答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
});
}
}
④ 消費(fèi)者02
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 確認(rèn)應(yīng)答機(jī)制
*
*/
public class Work04 {
private static final String ACK_QUEUE_NAME="ack-queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2 等待接收消息處理時(shí)間較長(zhǎng)");
// 消息消費(fèi)的時(shí)候如何處理消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println(" 接收到消息:"+message);
/**
* 1. 消息標(biāo)記 tag
* 2. 是否批量應(yīng)答未應(yīng)答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 采用手動(dòng)應(yīng)答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回 調(diào)邏輯");
});
}
}
⑤ 結(jié)果展示
在發(fā)送者發(fā)送消息 dd,發(fā)出消息之后的把 C2 消費(fèi)者停掉,按理說(shuō)該 C2 來(lái)處理該消息,但是
由于它處理時(shí)間較長(zhǎng),在還未處理完,也就是說(shuō) C2 還沒(méi)有執(zhí)行 ack 代碼的時(shí)候,C2 被停掉了,
此時(shí)會(huì)看到消息被 C1 接收到了,說(shuō)明消息 dd 被重新入隊(duì),然后分配給能處理消息的 C1 處理了
RabbitMQ的四種交換機(jī)類型
1.Fanout Exchange(扇型交換機(jī)): - 這種交換機(jī)將消息廣播到與之綁定的所有隊(duì)列,無(wú)論消息的路由鍵是什么。它也通常用于發(fā)布/訂閱模式,其中一個(gè)消息被廣播給所有訂閱者。 2.Direct Exchange(直連交換機(jī)): - 這種交換機(jī)根據(jù)消息的路由鍵(Routing Key)將消息發(fā)送到與之完全匹配的隊(duì)列。 3.Topic Exchange(主題交換機(jī)): - 這種交換機(jī)根據(jù)消息的路由鍵與隊(duì)列綁定時(shí)指定的路由鍵模式(通配符)匹配程度,將消息路由到一個(gè)或多個(gè)隊(duì)列。它通常用于發(fā)布/訂閱模式和復(fù)雜的消息路由需求。 4.Headers Exchange(頭交換機(jī)): - 這種交換機(jī)根據(jù)消息的標(biāo)頭信息(Headers)來(lái)決定消息的路由,而不是使用路由鍵。它允許開(kāi)發(fā)者在消息頭中定義多個(gè)屬性,并使用這些屬性來(lái)過(guò)濾消息。
RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。 相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡(jiǎn)單,一方面它接收來(lái) 自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說(shuō)把他們到許多隊(duì)列中還是說(shuō)應(yīng)該丟棄它們。這就的由交換機(jī)的類型來(lái)決定。
1、Fanout Exchange(扇型交換機(jī))
將接收到的所有消息廣播到它知道的所有隊(duì)列中
案例:
① 生產(chǎn)者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerFanout {
/**
* 定義交換機(jī)的名稱
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建Connection
Connection connection = RabbitMQConnection.getConnection();
// 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 通道關(guān)聯(lián)交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String msg = "交換機(jī)樣例演示";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
② 郵件消費(fèi)者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MailConsumer {
/**
* 定義郵件隊(duì)列
*/
private static final String QUEUE_NAME = "fanout_email_queue";
/**
* 定義交換機(jī)的名稱
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("郵件消費(fèi)者...");
// 創(chuàng)建我們的連接
Connection connection = RabbitMQConnection.getConnection();
// 創(chuàng)建我們通道
final Channel channel = connection.createChannel();
// 關(guān)聯(lián)隊(duì)列消費(fèi)者關(guān)聯(lián)隊(duì)列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("郵件消費(fèi)者獲取消息:" + msg);
}
};
// 開(kāi)始監(jiān)聽(tīng)消息 自動(dòng)簽收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
③ 短信消費(fèi)者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SmsConsumer {
/**
* 定義短信隊(duì)列
*/
private static final String QUEUE_NAME = "fanout_email_sms";
/**
* 定義交換機(jī)的名稱
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消費(fèi)者...");
// 創(chuàng)建我們的連接
Connection connection = RabbitMQConnection.getConnection();
// 創(chuàng)建我們通道
final Channel channel = connection.createChannel();
// 關(guān)聯(lián)隊(duì)列消費(fèi)者關(guān)聯(lián)隊(duì)列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消費(fèi)者獲取消息:" + msg);
}
};
// 開(kāi)始監(jiān)聽(tīng)消息 自動(dòng)簽收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
④ 結(jié)果展示
2、Direct Exchange(直連交換機(jī))
消息只去到它綁定的routingKey 隊(duì)列中去
案例:一個(gè)生產(chǎn)者,兩個(gè)消費(fèi)者
①生產(chǎn)者
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 創(chuàng)建多個(gè) bindingKey
Map
bindingKeyMap.put("info"," 普通 info 信息");
bindingKeyMap.put("warning"," 警告 warning 信息");
bindingKeyMap.put("error"," 錯(cuò)誤 error 信息");
//debug 沒(méi)有消費(fèi)這接收這個(gè)消息 所有就丟失了
bindingKeyMap.put("debug"," 調(diào)試 debug 信息");
for (Map.Entry
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println(" 生產(chǎn)者發(fā)出消息:" + message);
}
}
}
}
②消費(fèi)者01
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;
import java.io.File;
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message=" 接收綁定鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message;
File file = new File("D:\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println(" 錯(cuò)誤日志已經(jīng)接收");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
③消費(fèi)者02
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 綁 定 鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
④結(jié)果展示:
3、Topic Exchange(主題交換機(jī))
盡管使用 direct 交換機(jī)改進(jìn)了我們的系統(tǒng),但是它仍然存在局限性-比方說(shuō)我們想接收的日志類型有info.base 和 info.advantage,某個(gè)隊(duì)列只想 info.base 的消息,那這個(gè)時(shí)候 direct 就辦不到了。這個(gè)時(shí)候就只能使用 topic 類型
發(fā)送到類型是 topic 交換機(jī)的消息的 routing_key 不能隨意寫(xiě),必須滿足一定的要求,它必須是一個(gè)單詞列表,以點(diǎn)號(hào)分隔開(kāi)。這些單詞可以是任意單詞,比如說(shuō):"stock.usd.nyse", "nyse.vmw",
"quick.orange.rabbit".這種類型的。當(dāng)然這個(gè)單詞列表最多不能超過(guò) 255 個(gè)字節(jié)。
在這個(gè)規(guī)則列表中,其中有兩個(gè)替換符是大家需要注意的
*(星號(hào))可以代替一個(gè)單詞
#(井號(hào))可以替代零個(gè)或多個(gè)單詞
案例1:使用全匹配
生產(chǎn)者
package com.example.rabbitmq.demo6;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//連接
ConnectionFactory f = new ConnectionFactory();
f.setHost("127.0.0.1"); // wht6.cn
// f.setPort(5672); //默認(rèn)端口可以省略
f.setUsername("guest");
f.setPassword("guest");
Channel c = f.newConnection().createChannel();
//定義交換機(jī)
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//向交換機(jī)發(fā)消息,并攜帶路由鍵關(guān)鍵詞
while (true) {
System.out.print("輸入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("輸入路由鍵:");
String key = new Scanner(System.in).nextLine();
c.basicPublish("topic_logs", key, null, s.getBytes());
}
}
}
消費(fèi)者
package com.example.rabbitmq.demo6;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
public class Consumer {
public static void main(String[] args) throws Exception {
//連接
ConnectionFactory f = new ConnectionFactory();
f.setHost("127.0.0.1"); // wht6.cn
// f.setPort(5672); //默認(rèn)端口可以省略
f.setUsername("guest");
f.setPassword("guest");
Channel c = f.newConnection().createChannel();
// 定義隨機(jī)隊(duì)列,定義交換機(jī),用綁定鍵綁定
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue, false, true, true, null);
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
System.out.println("輸入綁定鍵,用空格隔開(kāi):");
String s = new Scanner(System.in).nextLine();// "aa bb cc"-->["aa", "bb", "cc"]
String[] a = s.split("\\s+");
for (String key : a) {
c.queueBind(queue, "topic_logs", key);
}
// 從隨機(jī)隊(duì)列正常消費(fèi)數(shù)據(jù)
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String key = message.getEnvelope().getRoutingKey();
String s = new String(message.getBody());
System.out.println(key+" - 收到: "+s);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
// 消費(fèi)數(shù)據(jù)
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
結(jié)果:
案例2
生產(chǎn)者
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1--> 綁定的是
* 中間帶 orange 帶 3 個(gè)單詞的字符串 (*.orange.*)
* Q2--> 綁定的是
* 最后一個(gè)單詞是 rabbit 的 3 個(gè)單詞 (*.*.rabbit)
* 第一個(gè)單詞是 lazy 的多個(gè)單詞 (lazy.#)
*
*/
Map
bindingKeyMap.put("quick.orange.rabbit"," 被隊(duì)列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant"," 被隊(duì)列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox"," 被隊(duì)列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox"," 被隊(duì)列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit"," 雖然滿足兩個(gè)綁定但只被隊(duì)列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox"," 不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄");
bindingKeyMap.put("quick.orange.male.rabbit"," 是四個(gè)單詞不匹配任何綁定會(huì)被丟棄");
bindingKeyMap.put("lazy.orange.male.rabbit"," 是四個(gè)單詞但匹配 Q2");
for (Map.Entry
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println(" 生產(chǎn)者發(fā)出消息" + message);
}
}
}
}
消費(fèi)者01
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 聲明 Q1 隊(duì)列與綁定關(guān)系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 隊(duì) 列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
消費(fèi)者02
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 聲明 Q2 隊(duì)列與綁定關(guān)系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 隊(duì) 列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
結(jié)果:
SpringBoot整合RabbitMq
Maven依賴
配置文件
application.yml
spring:
rabbitmq:
host: 127.0.0.1 # wht6.cn
port: 5672
username: guest
password: guest
virtual-host: /jxHosts
listener:
simple:
prefetch: 1 # 預(yù)抓取消息數(shù),spring默認(rèn)250
方式一:聚合項(xiàng)目
Main
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.UUID;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//定義交換機(jī)
@Bean
public TopicExchange logsExchange() {
// 參數(shù): 1.名稱 2.持久 3.自動(dòng)刪除
// 默認(rèn)是持久交換機(jī)
return new TopicExchange("topic_logs", false, false);
}
/*
默認(rèn)使用方法名作為對(duì)象的屬性名放入spring容器
"rndQueue" ---- Queue實(shí)例
*/
@Bean
public Queue rndQueue() {
return new Queue(UUID.randomUUID().toString(), false, true, true);
}
//用生產(chǎn)者發(fā)送消息
@Autowired
private Producer p;
/*
@PostConstruct
spring在完成掃描創(chuàng)建實(shí)例,完成所有的依賴注入后,
會(huì)自動(dòng)地執(zhí)行 @PostConstruct 方法
spring單線程執(zhí)行下面的流程:
創(chuàng)建實(shí)例 --> 完成注入 --> @PostConstruct --> 后續(xù)配置
*/
@PostConstruct
public void test() {
// new Thread(new Runnable() {
// @Override
// public void run() {
// //線程中執(zhí)行的代碼
// }
// }).start();
// lambda 表達(dá)式,匿名內(nèi)部類的簡(jiǎn)寫(xiě)
new Thread(() -> p.send()).start();
}
}
生產(chǎn)者Producer
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Scanner;
@Component
public class Producer {
// RabbitAutoConfiguration 自動(dòng)配置類中
// 自動(dòng)創(chuàng)建 AmqpTemplate 實(shí)例
@Autowired
private AmqpTemplate t;
//生產(chǎn)者的發(fā)送方法,需要手動(dòng)調(diào)用
public void send() {
while (true) {
System.out.print("輸入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("輸入路由鍵:");
String k = new Scanner(System.in).nextLine();
t.convertAndSend("topic_logs", k, s);
}
}
}
消費(fèi)者Consumer
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 啟動(dòng)一個(gè)消費(fèi)者,從隊(duì)列接收消息,并傳遞給這個(gè)方法進(jìn)行處理
// 消費(fèi)者自動(dòng)啟動(dòng),不需要手動(dòng)調(diào)用
// spring expression language - SPEL #{} 可以直接訪問(wèn)spring容器中的對(duì)象
// ${} OGNL
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "#{rndQueue.name}",declare="false"), //隊(duì)列,不指定參數(shù),由服務(wù)器自動(dòng)命名,非持久,獨(dú)占,自動(dòng)刪除
exchange = @Exchange(name = "topic_logs",declare="false"), //交換機(jī),declare="false"使用存在的交換機(jī)而不重復(fù)定義
key = {"*.orange.*"}
))
public void receive1(String msg) {
System.out.println("消費(fèi)者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //隊(duì)列,不指定參數(shù),由服務(wù)器自動(dòng)命名,非持久,獨(dú)占,自動(dòng)刪除
exchange = @Exchange(name = "topic_logs",declare="false"), //交換機(jī),declare="false"使用存在的交換機(jī)而不重復(fù)定義
key = {"*.*.rabbit", "lazy.#"}
))
public void receive2(String msg) {
System.out.println("消費(fèi)者2收到:"+msg);
}
}
方式二:分離項(xiàng)目
分為三個(gè)子項(xiàng)目:生產(chǎn)者,短信消費(fèi)者,郵件消費(fèi)者
消息實(shí)體類
package com.boyatop.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName MsgEntity
* @Author www.boyatop.com
* @Version V1.0
**/
@Data
public class MsgEntity implements Serializable {
private String msgId;
private String userId;
private String phone;
private String email;
public MsgEntity(String msgId, String userId, String phone, String email) {
this.msgId = msgId;
this.userId = userId;
this.phone = phone;
this.email = email;
}
}
生產(chǎn)者
配置類
package com.boyatop.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitMQConfig
* @Author www.boyatop.com
* @Version V1.0
**/
@Component
public class RabbitMQConfig {
/**
* 定義交換機(jī)
*/
private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";
/**
* 短信隊(duì)列
*/
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/**
* 郵件隊(duì)列
*/
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
// 1.注入隊(duì)列和交換機(jī)注入到spring容器中
// 2.關(guān)聯(lián)交換機(jī)
/**
* 郵件和短信隊(duì)列注入到spring容器中
* @return
*/
@Bean
public Queue smsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
@Bean
public Queue emailQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
/**
* 關(guān)聯(lián)交換機(jī)
* 根據(jù)參數(shù)名稱 ioc獲取 Queue對(duì)象
*/
@Bean
public Binding BindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(smsQueue).to(fanoutExchange);
}
@Bean
public Binding BindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(emailQueue).to(fanoutExchange);
}
}
生產(chǎn)者Controller
package com.boyatop.controller;
import com.boyatop.entity.MsgEntity;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @ClassName ProducerController
* @Author www.boyatop.com
* @Version V1.0
**/
@RestController
public class ProducerController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/sendMsg")
public void sendMsg() {
/**
* 參數(shù)1 交換機(jī)名稱
* 參數(shù)2 路由key
* 參數(shù)3 發(fā)送內(nèi)容
*/
MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),
"1234", "181111111", "1019902226@qq.com");
amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);
}
}
短信消費(fèi)者
package com.boyatop;
import com.boyatop.entity.MsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName FanoutSmsConsumer
* @Author www.boyatop.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(MsgEntity msgEntity) {
log.info("sms:msgEntity:" + msgEntity);
}
}
郵件消費(fèi)者
package com.boyatop;
import com.boyatop.entity.MsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName FanoutEmailConsumer
* @Author www.boyatop.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")
public class FanoutEmailConsumer {
@RabbitHandler
public void process(MsgEntity msgEntity) {
log.info("email:msgEntity:" + msgEntity);
}
}
SpringBoot開(kāi)啟消息確認(rèn)機(jī)制+解決冪等性問(wèn)題+生產(chǎn)者獲取消費(fèi)結(jié)果
創(chuàng)建SpringBoot項(xiàng)目,導(dǎo)入相關(guān)依賴
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
配置文件application.yml
spring:
rabbitmq:
####連接地址
host: 127.0.0.1
####端口號(hào)
port: 5672
####賬號(hào)
username: guest
####密碼
password: guest
### 地址
virtual-host: /jxHosts
listener:
simple:
retry:
##開(kāi)啟消費(fèi)者(程序出現(xiàn)異常的情況下)進(jìn)行重試
enabled: true
##最大重試次數(shù)
max-attempts: 5
##重試間隔時(shí)間
initial-interval: 3000
##消費(fèi)者需要手動(dòng)發(fā)送確認(rèn)信號(hào)
acknowledge-mode: manual
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/first?useUnicode=true&characterEncoding=utf-8
username: root
password: 123456
server:
port: 9090
RabbitMq配置類
package com.example.orderproducerconsumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitMQConfig
* @Author
* @Version V1.0
**/
@Component
public class RabbitMQConfig {
/**
* 定義交換機(jī)
*/
private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_order";
/**
* 訂單隊(duì)列
*/
private String FANOUT_ORDER_QUEUE = "fanout_order_queue";
/**
* 配置orderQueue
*
* @return
*/
@Bean
public Queue orderQueue() {
return new Queue(FANOUT_ORDER_QUEUE);
}
/**
* 配置fanoutExchange
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
// 綁定交換機(jī) orderQueue
@Bean
public Binding bindingOrderFanoutExchange(Queue orderQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(orderQueue).to(fanoutExchange);
}
}
消息記錄實(shí)體類
package com.example.orderproducerconsumer.entity;
import java.io.Serializable;
public class OrderEntity implements Serializable {
private int id;
private String orderName;
private String orderId;
public OrderEntity(String orderName, String orderId) {
this.orderName = orderName;
this.orderId = orderId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getOrderName() {
return orderName;
}
public void setOrderName(String orderName) {
this.orderName = orderName;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public OrderEntity(int id, String orderName, String orderId) {
this.id = id;
this.orderName = orderName;
this.orderId = orderId;
}
public OrderEntity() {
}
}
Mapper
package com.example.orderproducerconsumer.mapper;
import com.example.orderproducerconsumer.entity.OrderEntity;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Component;
@Mapper
public interface OrderMapper {
@Insert("insert order_info values (null,#{orderName},#{orderId})")
int addOrder(OrderEntity orderEntity);
@Select("SELECT * from order_info where orderId=#{orderId} ")
OrderEntity getOrder(String orderId);
}
controller
package com.example.orderproducerconsumer.controller;
import com.example.orderproducerconsumer.entity.OrderEntity;
import com.example.orderproducerconsumer.mapper.OrderMapper;
import com.example.orderproducerconsumer.producer.OrderProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class OrderController {
@Autowired
private OrderProducer orderProducer;
@Autowired
private OrderMapper orderMapper;
@RequestMapping("/sendOrder")
public String sendOrder() {
// 生成全局id
String orderId = System.currentTimeMillis() + "";
log.info("orderId:{}", orderId);
String orderName = " boyatop";
orderProducer.sendMsg(orderName, orderId);
return orderId;
}
/**
* 前端主動(dòng)根據(jù)orderId定時(shí)查詢
*
* @param orderId
* @return
*/
@RequestMapping("/getOrder")
public Object getOrder(String orderId) {
OrderEntity order = orderMapper.getOrder(orderId);
if (order == null) {
return "該訂單沒(méi)有被消費(fèi)或者訂單號(hào)錯(cuò)誤!";
}
return order;
}
}
生產(chǎn)者
package com.example.orderproducerconsumer.producer;
import com.alibaba.fastjson.JSONObject;
import com.example.orderproducerconsumer.entity.OrderEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName OrderProducer
* @Author
* @Version V1.0
**/
@Component
@Slf4j
public class OrderProducer implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData.getId();
log.info("id:" + id);
}
/**
* 使用mq發(fā)送消息
*
* @param orderName
* @param orderId
*/
public void sendMsg(String orderName, String orderId) {
OrderEntity orderEntity = new OrderEntity(orderName, orderId);
rabbitTemplate.convertAndSend("boyatop_order", "", orderEntity, message -> {
return message;
});
// CorrelationData correlationData = new CorrelationData();
// correlationData.setId(JSONObject.toJSONString(orderEntity));
// rabbitTemplate.convertAndSend("/boyatop_order", "", orderEntity,correlationData);
}
}
消費(fèi)者
package com.example.orderproducerconsumer.consumer;
import com.example.orderproducerconsumer.entity.OrderEntity;
import com.example.orderproducerconsumer.mapper.OrderMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName fanout_sms_queue
* @Author
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer {
@Autowired
private OrderMapper orderMapper;
@RabbitHandler
public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
try {
log.info(">>orderEntity:{}<<", orderEntity.toString());
String orderId = orderEntity.getOrderId();
if (StringUtils.isEmpty(orderId)) {
return;
}
OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
if (dbOrderEntity != null) {
log.info("另外消費(fèi)者已經(jīng)處理過(guò)該業(yè)務(wù)邏輯");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
int result = orderMapper.addOrder(orderEntity);
int i = 1 / 0;
log.info(">>插入數(shù)據(jù)庫(kù)中數(shù)據(jù)成功<<");
//開(kāi)啟消息確認(rèn)機(jī)制
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 記錄該消息日志形式 存放數(shù)據(jù)庫(kù)db中、后期通過(guò)定時(shí)任務(wù)實(shí)現(xiàn)消息補(bǔ)償、人工實(shí)現(xiàn)補(bǔ)償
//消息處理失敗,并將消息重新放回隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//將該消息存放到死信隊(duì)列中,單獨(dú)寫(xiě)一個(gè)死信消費(fèi)者實(shí)現(xiàn)消費(fèi)。
}
}
}
死信隊(duì)列
1、概念
死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō),producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因?qū)е?queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。
應(yīng)用場(chǎng)景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中.還有比如說(shuō): 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)間未支付時(shí)自動(dòng)失效
2、產(chǎn)生原因
1.??消息投遞到MQ中存放 消息已經(jīng)過(guò)期? 消費(fèi)者沒(méi)有及時(shí)的獲取到我們消息,消息如果存放到mq服務(wù)器中過(guò)期之后,會(huì)轉(zhuǎn)移到備胎死信隊(duì)列存放。
2.??隊(duì)列達(dá)到最大的長(zhǎng)度 (隊(duì)列容器已經(jīng)滿了)
3.????消費(fèi)者消費(fèi)多次消息失敗,就會(huì)轉(zhuǎn)移存放到死信隊(duì)列中;消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false.
3、死信隊(duì)列的架構(gòu)原理
死信隊(duì)列和普通隊(duì)列區(qū)別不是很大
普通與死信隊(duì)列都有自己獨(dú)立的交換機(jī)和路由key、隊(duì)列和消費(fèi)者。
區(qū)別:
1.生產(chǎn)者投遞消息先投遞到我們普通交換機(jī)中,普通交換機(jī)在將該消息投到
普通隊(duì)列中緩存起來(lái),普通隊(duì)列對(duì)應(yīng)有自己獨(dú)立普通消費(fèi)者。
2.如果生產(chǎn)者投遞消息到普通隊(duì)列中,普通隊(duì)列發(fā)現(xiàn)該消息一直沒(méi)有被消費(fèi)者消費(fèi)
的情況下,在這時(shí)候會(huì)將該消息轉(zhuǎn)移到死信(備胎)交換機(jī)中,死信(備胎)交換機(jī)
對(duì)應(yīng)有自己獨(dú)立的 死信(備胎)隊(duì)列 對(duì)應(yīng)獨(dú)立死信(備胎)消費(fèi)者。
4、死信實(shí)戰(zhàn)
1)消息TTL過(guò)期
1.生產(chǎn)者代碼
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
/**
* TTL時(shí)間過(guò)期
*/
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 設(shè)置消息的 TTL 時(shí)間
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();
// 該信息是用作演示隊(duì)列個(gè)數(shù)限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
message.getBytes());
System.out.println(" 生產(chǎn)者發(fā)送消息:"+message);
}
}
}
}
2.消費(fèi)者01代碼(啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
* TTL時(shí)間過(guò)期
*/
public class Consumer01 {
// 普通交換機(jī)名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交換機(jī)名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 聲明死信和普通交換機(jī) 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 聲明死信隊(duì)列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信隊(duì)列綁定死信交換機(jī)與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常隊(duì)列綁定死信隊(duì)列信息
Map
// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
3.消費(fèi)者02代碼( 以上步驟完成后 啟動(dòng) 2 C2 消費(fèi)者 它消費(fèi)死信隊(duì)列里面的消息)
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* TTL時(shí)間過(guò)期
*/
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(" 等待接收死信隊(duì)列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
2)隊(duì)列達(dá)到最大長(zhǎng)度
1.消息生產(chǎn)者代碼去掉 TTL 屬性
/**
* 隊(duì)列達(dá)到最大長(zhǎng)度
*/
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 該信息是用作演示隊(duì)列個(gè)數(shù)限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println(" 生產(chǎn)者發(fā)送消息:"+message);
}
}
}
}
2.C1 消費(fèi)者修改以下代碼 ( 啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)
添加 params.put("x-max-length",6);
/**
* 隊(duì)列達(dá)到最大長(zhǎng)度
*/
public class Consumer01 {
// 普通交換機(jī)名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交換機(jī)名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 聲明死信和普通交換機(jī) 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 聲明死信隊(duì)列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信隊(duì)列綁定死信交換機(jī)與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常隊(duì)列綁定死信隊(duì)列信息
Map
// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
params.put("x-max-length",6);
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
注意此時(shí)需要把原先隊(duì)列刪除 因?yàn)閰?shù)改變了
3.C2 消費(fèi)者代碼不變( 啟動(dòng) C2 消費(fèi)者)
3)消息被拒
1.消息生產(chǎn)者代碼同上生產(chǎn)者一致
2.C1 消費(fèi)者代碼( 啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)
/**
* 消息被拒
*/
public class Consumer01 {
// 普通交換機(jī)名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交換機(jī)名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 聲明死信和普通交換機(jī) 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 聲明死信隊(duì)列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信隊(duì)列綁定死信交換機(jī)與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常隊(duì)列綁定死信隊(duì)列信息
Map
// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer01 接收到消息" + message + " 并拒絕簽收該消息");
//requeue 設(shè)置為 false 代表拒絕重新入隊(duì) 該隊(duì)列如果配置了死信交換機(jī)將發(fā)送到死信隊(duì)列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}
3.C2 消費(fèi)者代碼不變
啟動(dòng)消費(fèi)者 01 然后再啟動(dòng)消費(fèi)者 02
注意此時(shí)需要把原先隊(duì)列刪除 因?yàn)閰?shù)改變了
生產(chǎn)者如何獲取消費(fèi)結(jié)果
Rocketmq 自帶全局消息id,能夠根據(jù)該全局消息獲取消費(fèi)結(jié)果
原理: 生產(chǎn)者投遞消息到mq服務(wù)器,mq服務(wù)器端在這時(shí)候返回一個(gè)全局的消息id,當(dāng)我們消費(fèi)者消費(fèi)該消息成功之后,消費(fèi)者會(huì)給我們mq服務(wù)器端發(fā)送通知標(biāo)記該消息消費(fèi)成功。
生產(chǎn)者獲取到該消息全局id,每隔2s時(shí)間調(diào)用mq服務(wù)器端接口查詢?cè)撓⑹欠裼斜幌M(fèi)成功。
冪等性
1、概念
用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。
舉個(gè)最簡(jiǎn)單的例子,那就是支付,用戶購(gòu)買商品后支付,支付扣款成功,但是返回結(jié)果的時(shí)候網(wǎng)絡(luò)異常,此時(shí)錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時(shí)會(huì)進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額發(fā)現(xiàn)多扣錢了,流水記錄也變成了兩條。在以前的單應(yīng)用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務(wù)中即可,發(fā)生錯(cuò)誤立即回滾,但是再響應(yīng)客戶端的時(shí)候也有可能出現(xiàn)網(wǎng)絡(luò)中斷或者異常等等
消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。
2、解決冪等性問(wèn)題
MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫(xiě)個(gè)唯一標(biāo)識(shí)比如時(shí)間戳 或者 UUID 或者訂單消費(fèi)者消費(fèi) MQ 中的消息也可利用 MQ 的該 id 來(lái)判斷,或者可按自己的規(guī)則生成一個(gè)全局唯一 id,每次消費(fèi)消息時(shí)用該 id 先判斷該消息是否已消費(fèi)過(guò)。
主流的有兩種解決思路:
a.唯一 ID+指紋碼機(jī)制,利用數(shù)據(jù)庫(kù)主鍵去重
指紋碼:我們的一些規(guī)則或者時(shí)間戳加別的服務(wù)給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基本都是由我們的業(yè)務(wù)規(guī)則拼接而來(lái),但是一定要保證唯一性,然后就利用查詢語(yǔ)句進(jìn)行判斷這個(gè) id 是否存在數(shù)據(jù)庫(kù)中,優(yōu)勢(shì)就是實(shí)現(xiàn)簡(jiǎn)單就一個(gè)拼接,然后查詢判斷是否重復(fù);劣勢(shì)就是在高并發(fā)時(shí),如果是單個(gè)數(shù)據(jù)庫(kù)就會(huì)有寫(xiě)入性能瓶頸當(dāng)然也可以采用分庫(kù)分表提升性能,但也不是我們最推薦的方式。
b.利用 redis 的原子性去實(shí)現(xiàn)
利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實(shí)現(xiàn)不重復(fù)消費(fèi)
延遲隊(duì)列
1、概念
延時(shí)隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。
2、延遲隊(duì)列使用場(chǎng)景
1.訂單在十分鐘之內(nèi)未支付則自動(dòng)取消
2.新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
3.用戶注冊(cè)成功后,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒。
4.用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
5.預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議
3、RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。換句話說(shuō),如果一條消息設(shè)置了 TTL 屬性或者進(jìn)入了設(shè)置 TTL 屬性的隊(duì)列,那么這條消息如果在 TTL 設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為"死信"。如果同時(shí)配置了隊(duì)列的 TTL 和消息的TTL,那么較小的那個(gè)值將會(huì)被使用,有兩種方式設(shè)置 TTL。
① 消息設(shè)置 TTL
一種方式便是針對(duì)每條消息設(shè)置 TTL
② 隊(duì)列設(shè)置 TTL
一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性
③ 二者區(qū)別
第一種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)橄⑹欠襁^(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間;如果設(shè)置了隊(duì)列的 TTL 屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄(如果配置了死信隊(duì)列被丟到死信隊(duì)列中)。另外,還需要注意的一點(diǎn)是,如果不設(shè)置 TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期,如果將 TTL 設(shè)置為 0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。
4、延遲隊(duì)列案例
創(chuàng)建兩個(gè)隊(duì)列 QA 和 QB,兩者隊(duì)列 TTL 分別設(shè)置為 10S 和 40S,然后在創(chuàng)建一個(gè)交換機(jī) X 和死信交換機(jī) Y,它們的類型都是 direct,創(chuàng)建一個(gè)死信隊(duì)列 QD,它們的綁定關(guān)系如下:
配置類
package com.example.orderproducerconsumer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 聲明 xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 聲明 xExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 聲明隊(duì)列 A ttl 為 10s 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("queueA")
public Queue queueA(){
Map
// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 聲明隊(duì)列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 聲明隊(duì)列 A 綁定 X 交換機(jī)
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 聲明隊(duì)列 B ttl 為 40s 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("queueB")
public Queue queueB(){
Map
// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 聲明隊(duì)列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// 聲明隊(duì)列 B 綁定 X 交換機(jī)
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
// 聲明死信隊(duì)列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
// 聲明死信隊(duì)列 QD 綁定關(guān)系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
消息生產(chǎn)者
package com.example.orderproducerconsumer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info(" 當(dāng)前時(shí)間:{}, 發(fā)送一條信息給兩個(gè) TTL 隊(duì)列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", " 消息來(lái)自 ttl 為 為 10S 的隊(duì)列: "+message);
rabbitTemplate.convertAndSend("X", "XB", " 消息來(lái)自 ttl 為 為 40S 的隊(duì)列: "+message);
}
}
消息消費(fèi)者
package com.example.orderproducerconsumer.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info(" 當(dāng)前時(shí)間:{}, 收到死信隊(duì)列信息{}", new Date().toString(), msg);
}
}
發(fā)起一個(gè)請(qǐng)求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一條消息在 10S 后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在 40S 之后變成了死信消息,然后被消費(fèi)掉,這樣一個(gè)延時(shí)隊(duì)列就打造完成了。
不過(guò),如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,這里只有 10S 和 40S兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理,那么就需要增加 TTL 為一個(gè)小時(shí)的隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求?
5、延時(shí)隊(duì)列優(yōu)化
在這里新增了一個(gè)隊(duì)列 QC,綁定關(guān)系如下,該隊(duì)列不設(shè)置 TTL 時(shí)間
配置類
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
// 聲明隊(duì)列 C 死信交換機(jī)
@Bean("queueC")
public Queue queueB(){
Map
// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 聲明當(dāng)前隊(duì)列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 沒(méi)有聲明 TTL 屬性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
// 聲明隊(duì)列 B 綁定 X 交換機(jī)
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
消息生產(chǎn)者
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info(" 當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng){} 毫秒 TTL 信息給隊(duì)列 C:{}", new Date(),ttlTime, message);
}
發(fā)起請(qǐng)求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。
6、Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列
上文中提到的問(wèn)題,確實(shí)是一個(gè)問(wèn)題,如果不能實(shí)現(xiàn)在消息粒度上的 TTL,并使其在設(shè)置的 TTL 時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列。那如何解決呢,接下來(lái)我們就去解決該問(wèn)題。
安裝延時(shí)隊(duì)列插件
在官網(wǎng)上下載 Community Plugins | RabbitMQ,下載
rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。
進(jìn)入 RabbitMQ 的安裝目錄下的 plgins 目錄,執(zhí)行下面命令讓該插件生效,然后重啟 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
7、插件代碼實(shí)現(xiàn)
在這里新增了一個(gè)隊(duì)列 delayed.queue,一個(gè)自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:
在我們自定義的交換機(jī)中,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并不會(huì)立即投遞到目標(biāo)隊(duì)列中,而是存儲(chǔ)在 mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時(shí)間時(shí),才投遞到目標(biāo)隊(duì)列中。
配置類
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定義交換機(jī) 我們?cè)谶@里定義的是一個(gè)延遲交換機(jī)
@Bean
public CustomExchange delayedExchange() {
Map
// 自定義交換機(jī)的類型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
消息生產(chǎn)者
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData ->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 當(dāng) 前 時(shí) 間 : {}, 發(fā) 送 一 條 延 遲 {} 毫 秒 的 信 息 給 隊(duì) 列 delayed.queue:{}", new
Date(),delayTime, message);
}
消息消費(fèi)者
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info(" 當(dāng)前時(shí)間:{}, 收到延時(shí)隊(duì)列的消息:{}", new Date().toString(), msg);
}
發(fā)起請(qǐng)求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期
8、總結(jié)
延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
優(yōu)先級(jí)隊(duì)列
使用場(chǎng)景
在我們系統(tǒng)中有一個(gè)訂單催付的場(chǎng)景,我們的客戶在天貓下的訂單,淘寶會(huì)及時(shí)將訂單推送給我們,如果在用戶設(shè)定的時(shí)間內(nèi)未付款那么就會(huì)給用戶推送一條短信提醒,很簡(jiǎn)單的一個(gè)功能對(duì)吧,但是,tmall商家對(duì)我們來(lái)說(shuō),肯定是要分大客戶和小客戶的對(duì)吧,比如像蘋果,小米這樣大商家一年起碼能給我們創(chuàng)造很大的利潤(rùn),所以理應(yīng)當(dāng)然,他們的訂單必須得到優(yōu)先處理,而曾經(jīng)我們的后端系統(tǒng)是使用 redis 來(lái)存放的定時(shí)輪詢,大家都知道 redis 只能用 List 做一個(gè)簡(jiǎn)簡(jiǎn)單單的消息隊(duì)列,并不能實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)的場(chǎng)景,所以訂單量大了后采用 RabbitMQ 進(jìn)行改造和優(yōu)化,如果發(fā)現(xiàn)是大客戶的訂單給一個(gè)相對(duì)比較高的優(yōu)先級(jí),否則就是默認(rèn)優(yōu)先級(jí)。
如何添加
a.控制臺(tái)頁(yè)面添加
b.隊(duì)列中代碼添加優(yōu)先級(jí)
Map
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
c.消息中代碼添加優(yōu)先級(jí)
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build();
d.注意事項(xiàng)
要讓隊(duì)列實(shí)現(xiàn)優(yōu)先級(jí)需要做的事情有如下事情:隊(duì)列需要設(shè)置為優(yōu)先級(jí)隊(duì)列,消息需要設(shè)置消息的優(yōu)先級(jí),消費(fèi)者需要等待消息已經(jīng)發(fā)送到隊(duì)列中才去消費(fèi)因?yàn)?,這樣才有機(jī)會(huì)對(duì)消息進(jìn)行排序
優(yōu)先級(jí)隊(duì)列案例
消息生產(chǎn)者
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
// 給消息賦予一個(gè) priority 屬性
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++) {
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println(" 發(fā)送消息完成:" + message);
}
}
}
}
消息消費(fèi)者
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 設(shè)置隊(duì)列的最大優(yōu)先級(jí) 最大可以設(shè)置到 255 官網(wǎng)推薦 1-10 如果設(shè)置太高比較吃內(nèi)存和 CPU
Map
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println(" 消費(fèi)者啟動(dòng)等待消費(fèi)......");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(" 消費(fèi)者無(wú)法消費(fèi) 消息時(shí)調(diào)用,如隊(duì)列被刪除");
});
}
}
惰性隊(duì)列
1. 使用場(chǎng)景
RabbitMQ 從 3.6.0 版本開(kāi)始引入了惰性隊(duì)列的概念。惰性隊(duì)列會(huì)盡可能的將消息存入磁盤中,而在消費(fèi)者消費(fèi)到相應(yīng)的消息時(shí)才會(huì)被加載到內(nèi)存中,它的一個(gè)重要的設(shè)計(jì)目標(biāo)是能夠支持更長(zhǎng)的隊(duì)列,即支持更多的消息存儲(chǔ)。當(dāng)消費(fèi)者由于各種各樣的原因(比如消費(fèi)者下線、宕機(jī)亦或者是由于維護(hù)而關(guān)閉等)而致使長(zhǎng)時(shí)間內(nèi)不能消費(fèi)消息造成堆積時(shí),惰性隊(duì)列就很有必要了。
默認(rèn)情況下,當(dāng)生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時(shí)候,隊(duì)列中的消息會(huì)盡可能的存儲(chǔ)在內(nèi)存之中,這樣可以更加快速的將消息發(fā)送給消費(fèi)者。即使是持久化的消息,在被寫(xiě)入磁盤的同時(shí)也會(huì)在內(nèi)存中駐留一份備份。當(dāng) RabbitMQ 需要釋放內(nèi)存的時(shí)候,會(huì)將內(nèi)存中的消息換頁(yè)至磁盤中,這個(gè)操作會(huì)耗費(fèi)較長(zhǎng)的時(shí)間,也會(huì)阻塞隊(duì)列的操作,進(jìn)而無(wú)法接收新的消息。雖然 RabbitMQ 的開(kāi)發(fā)者們一直在升級(jí)相關(guān)的算法,但是效果始終不太理想,尤其是在消息量特別大的時(shí)候。
2. 兩種模式
隊(duì)列具備兩種模式:default 和 lazy。默認(rèn)的為 default 模式,在 3.6.0 之前的版本無(wú)需做任何變更。lazy模式即為惰性隊(duì)列的模式,可以通過(guò)調(diào)用 channel.queueDeclare 方法的時(shí)候在參數(shù)中設(shè)置,也可以通過(guò)Policy 的方式設(shè)置,如果一個(gè)隊(duì)列同時(shí)使用這兩種方式設(shè)置的話,那么 Policy 的方式具備更高的優(yōu)先級(jí)。
如果要通過(guò)聲明的方式改變已有隊(duì)列的模式的話,那么只能先刪除隊(duì)列,然后再重新聲明一個(gè)新的。在隊(duì)列聲明的時(shí)候可以通過(guò)“x-queue-mode”參數(shù)來(lái)設(shè)置隊(duì)列的模式,取值為“default”和“l(fā)azy”。下面示例中演示了一個(gè)惰性隊(duì)列的聲明細(xì)節(jié):
Map
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
3. 內(nèi)存開(kāi)銷對(duì)比
在發(fā)送 1 百萬(wàn)條消息,每條消息大概占 1KB 的情況下,普通隊(duì)列占用內(nèi)存是 1.2GB,而惰性隊(duì)列僅僅占用 1.5MB
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)
相關(guān)閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。