欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)

http://yzkb.51969.com/

MQ架構(gòu)設(shè)計(jì)原理

什么是消息中間件

消息中間件基于隊(duì)列模型實(shí)現(xiàn)異步/同步傳輸數(shù)據(jù)

作用:可以實(shí)現(xiàn)支撐高并發(fā)、異步解耦、流量削峰、降低耦合度。

傳統(tǒng)的http請(qǐng)求存在那些缺點(diǎn)

1.Http請(qǐng)求基于請(qǐng)求與響應(yīng)的模型,在高并發(fā)的情況下,客戶端發(fā)送大量的請(qǐng)求達(dá)到

服務(wù)器端有可能會(huì)導(dǎo)致我們服務(wù)器端處理請(qǐng)求堆積。

2.Tomcat服務(wù)器處理每個(gè)請(qǐng)求都有自己獨(dú)立的線程,如果超過(guò)最大線程數(shù)會(huì)將該請(qǐng)求緩存到隊(duì)列中,如果請(qǐng)求堆積過(guò)多的情況下,有可能會(huì)導(dǎo)致tomcat服務(wù)器崩潰的問(wèn)題。

所以一般都會(huì)在nginx入口實(shí)現(xiàn)限流,整合服務(wù)保護(hù)框架。

?3.http請(qǐng)求處理業(yè)務(wù)邏輯如果比較耗時(shí)的情況下,容易造成客戶端一直等待,阻塞等待過(guò)程中會(huì)導(dǎo)致客戶端超時(shí)發(fā)生重試策略,有可能會(huì)引發(fā)冪等性問(wèn)題。

注意事項(xiàng):接口是為http協(xié)議的情況下,最好不要處理比較耗時(shí)的業(yè)務(wù)邏輯,耗時(shí)的業(yè)務(wù)邏輯應(yīng)該單獨(dú)交給多線程或者是mq處理。

Mq應(yīng)用場(chǎng)景有那些 消息隊(duì)列(Message Queue,簡(jiǎn)稱MQ)在分布式系統(tǒng)中有著廣泛的應(yīng)用,以下是MQ的一些主要應(yīng)用場(chǎng)景: 1.異步處理: - 當(dāng)某個(gè)操作不需要立即返回結(jié)果,或者該操作非常耗時(shí)且影響主業(yè)務(wù)流程的性能時(shí),可以使用MQ將操作放入后臺(tái)異步執(zhí)行。例如,用戶注冊(cè)成功后發(fā)送歡迎郵件或短信,這些操作不需要實(shí)時(shí)同步完成,可以放入MQ中異步處理。 2.應(yīng)用解耦: - MQ允許應(yīng)用間進(jìn)行低耦合的通信。通過(guò)將消息發(fā)送到一個(gè)或多個(gè)目的地址(隊(duì)列或主題),應(yīng)用程序可以獨(dú)立地執(zhí)行和擴(kuò)展,而不需要知道彼此的實(shí)現(xiàn)細(xì)節(jié)。例如,訂單系統(tǒng)產(chǎn)生訂單后,將訂單數(shù)據(jù)發(fā)送到MQ,庫(kù)存系統(tǒng)、物流系統(tǒng)等多個(gè)系統(tǒng)監(jiān)聽(tīng)MQ中的訂單數(shù)據(jù),根據(jù)訂單數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。 3.流量削峰: - 在高并發(fā)場(chǎng)景下,如果某個(gè)應(yīng)用或系統(tǒng)突然接收到大量的請(qǐng)求,可能導(dǎo)致系統(tǒng)壓力過(guò)大甚至崩潰。通過(guò)使用MQ,可以將這些請(qǐng)求暫存起來(lái),然后按照系統(tǒng)能夠處理的速率勻速地將請(qǐng)求分發(fā)給后臺(tái)服務(wù)進(jìn)行處理。這樣可以有效地緩解系統(tǒng)壓力,避免系統(tǒng)崩潰。 4.數(shù)據(jù)同步: - MQ可以實(shí)現(xiàn)不同系統(tǒng)之間的數(shù)據(jù)同步。例如,在分布式數(shù)據(jù)庫(kù)中,可以使用MQ將數(shù)據(jù)的變更事件(如增刪改操作)發(fā)布到MQ中,其他系統(tǒng)監(jiān)聽(tīng)這些事件并據(jù)此更新自己的數(shù)據(jù)副本。 5.日志處理: - MQ可以用于收集、處理和存儲(chǔ)日志信息。通過(guò)將日志信息發(fā)送到MQ中,可以異步地將日志信息寫(xiě)入到存儲(chǔ)系統(tǒng)(如HDFS、Elasticsearch等)中,并進(jìn)行后續(xù)的分析和處理。

-6.任務(wù)調(diào)度: - MQ可以用于實(shí)現(xiàn)任務(wù)的調(diào)度和執(zhí)行。通過(guò)將任務(wù)信息發(fā)送到MQ中,并指定任務(wù)的執(zhí)行時(shí)間和執(zhí)行條件,可以實(shí)現(xiàn)定時(shí)任務(wù)、延遲任務(wù)等功能的實(shí)現(xiàn)。 7.微服務(wù)架構(gòu)中的通信: - 在微服務(wù)架構(gòu)中,服務(wù)與服務(wù)之間的通信是常見(jiàn)的需求。使用MQ可以實(shí)現(xiàn)服務(wù)之間的松耦合通信,使得服務(wù)可以獨(dú)立地?cái)U(kuò)展和升級(jí)。同時(shí),MQ還可以提供消息持久化、消息確認(rèn)等機(jī)制,保證服務(wù)之間通信的可靠性和穩(wěn)定性。 8.分布式事務(wù): - 在分布式系統(tǒng)中,實(shí)現(xiàn)跨多個(gè)服務(wù)的事務(wù)一致性是一個(gè)挑戰(zhàn)。通過(guò)使用MQ的分布式事務(wù)功能(如RabbitMQ的RabbitMQ-delayed-message-exchange插件或Kafka的事務(wù)API),可以實(shí)現(xiàn)跨多個(gè)服務(wù)的分布式事務(wù)處理。 9.消息廣播與訂閱: - MQ可以實(shí)現(xiàn)消息的廣播與訂閱功能。發(fā)布者將消息發(fā)送到特定的主題(Topic),所有訂閱了該主題的消費(fèi)者都可以接收到這些消息。這種模式常用于實(shí)時(shí)消息系統(tǒng)、新聞推送等場(chǎng)景。 10.跨語(yǔ)言通信: - MQ支持多種編程語(yǔ)言和平臺(tái),可以實(shí)現(xiàn)跨語(yǔ)言、跨平臺(tái)的通信。這使得不同語(yǔ)言開(kāi)發(fā)的應(yīng)用或服務(wù)可以方便地進(jìn)行通信和數(shù)據(jù)交換。

為什么使用MQ

消息中間件(如RabbitMQ、Kafka、ActiveMQ等)在分布式系統(tǒng)中扮演著重要的角色,它們基于隊(duì)列模型(Queue Model)實(shí)現(xiàn)數(shù)據(jù)的異步或同步傳輸。下面我將詳細(xì)解釋這些中間件如何支持高并發(fā)、異步解耦、流量削峰以及降低耦合度的作用。 1.支撐高并發(fā): - 當(dāng)系統(tǒng)面臨高并發(fā)請(qǐng)求時(shí),如果每個(gè)請(qǐng)求都直接由系統(tǒng)處理,可能會(huì)導(dǎo)致系統(tǒng)資源迅速耗盡,響應(yīng)速度下降,甚至服務(wù)崩潰。 - 消息中間件通過(guò)隊(duì)列的方式將請(qǐng)求暫存起來(lái),后臺(tái)服務(wù)可以異步地從隊(duì)列中獲取請(qǐng)求并進(jìn)行處理。這樣,即使在高并發(fā)場(chǎng)景下,系統(tǒng)也能保持穩(wěn)定的響應(yīng)速度,因?yàn)檎?qǐng)求被均勻地分散到不同的時(shí)間段內(nèi)處理。 - 消息中間件通常支持多消費(fèi)者模型,即多個(gè)服務(wù)實(shí)例可以同時(shí)從隊(duì)列中消費(fèi)消息,從而進(jìn)一步提高并發(fā)處理能力。 2.異步解耦: - 在傳統(tǒng)的同步請(qǐng)求-響應(yīng)模型中,請(qǐng)求方必須等待響應(yīng)方處理完請(qǐng)求后才能繼續(xù)執(zhí)行后續(xù)操作。這種模型下,請(qǐng)求方和響應(yīng)方之間存在緊密的耦合關(guān)系。 - 消息中間件引入了異步通信機(jī)制,使得請(qǐng)求方和響應(yīng)方可以解耦。請(qǐng)求方只需將請(qǐng)求發(fā)送到消息隊(duì)列中,然后立即返回,無(wú)需等待響應(yīng)。響應(yīng)方可以在后臺(tái)異步地處理這些請(qǐng)求,并通過(guò)其他方式(如回調(diào)函數(shù)、HTTP響應(yīng)等)將結(jié)果通知給請(qǐng)求方。 - 這種異步解耦的方式可以提高系統(tǒng)的響應(yīng)速度和吞吐量,同時(shí)減少系統(tǒng)間的依賴關(guān)系,提高系統(tǒng)的可擴(kuò)展性和可維護(hù)性。 3.流量削峰: - 在某些場(chǎng)景下,系統(tǒng)可能會(huì)在短時(shí)間內(nèi)接收到大量的請(qǐng)求,導(dǎo)致系統(tǒng)負(fù)載急劇上升,甚至超過(guò)系統(tǒng)的處理能力。 - 消息中間件可以將這些請(qǐng)求暫存到隊(duì)列中,然后按照系統(tǒng)能夠處理的速率勻速地將請(qǐng)求分發(fā)給后臺(tái)服務(wù)進(jìn)行處理。這樣,即使系統(tǒng)面臨巨大的流量沖擊,也能保持穩(wěn)定的性能和響應(yīng)速度。 - 流量削峰不僅可以保護(hù)系統(tǒng)免受流量沖擊的影響,還可以提高系統(tǒng)的可用性和容錯(cuò)能力。 4.降低耦合度: - 在復(fù)雜的分布式系統(tǒng)中,各個(gè)服務(wù)之間可能存在復(fù)雜的依賴關(guān)系,這些依賴關(guān)系可能導(dǎo)致系統(tǒng)難以維護(hù)和擴(kuò)展。 - 消息中間件通過(guò)引入消息隊(duì)列作為服務(wù)之間的通信媒介,降低了服務(wù)之間的耦合度。服務(wù)之間不再需要直接調(diào)用對(duì)方的方法或接口,而是通過(guò)發(fā)送和接收消息來(lái)進(jìn)行通信。 - 這種基于消息的通信方式使得服務(wù)可以更加獨(dú)立地設(shè)計(jì)和實(shí)現(xiàn),提高了系統(tǒng)的可維護(hù)性和可擴(kuò)展性。同時(shí),由于服務(wù)之間的耦合度降低,系統(tǒng)的容錯(cuò)能力也得到了提高,即使某個(gè)服務(wù)出現(xiàn)故障,也不會(huì)影響其他服務(wù)的正常運(yùn)行。

Mq與多線程之間區(qū)別

MQ可以實(shí)現(xiàn)異步/解耦/流量削峰問(wèn)題;

多線程也可以實(shí)現(xiàn)異步,但是消耗到cpu資源,沒(méi)有實(shí)現(xiàn)解耦。

實(shí)例:(基于springboot實(shí)現(xiàn)異步多線程)

package com.example.rabbitmq.mq;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Async;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class MemberService {

@Autowired

public MemberServiceAsync memberServiceAsync;

//@Bean

@RequestMapping("/addMember")

public String addMember(){

//1.數(shù)據(jù)庫(kù)插入數(shù)據(jù) log.info(">01<");

System.out.println(">01<");

// sms();

memberServiceAsync.smsAsync();

System.out.println(">04<");

return "用戶注冊(cè)成功??!";

}

public String sms(){

System.out.println(">02<");

try{

System.out.println(">正在發(fā)送短信<");

Thread.sleep(3000);

}catch(Exception e){

e.printStackTrace();

}

System.out.println(">03<");

return "短信發(fā)送完成!";

}

}

package com.example.rabbitmq.mq;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Component;

@Component

public class MemberServiceAsync {

@Async

public void smsAsync(){ // 注意返回類型為void,因?yàn)楫惒椒椒ㄍǔ2环祷亟Y(jié)果給調(diào)用者

System.out.println(">02<");

try{

System.out.println(">正在發(fā)送短信<");

Thread.sleep(3000);

}catch(Exception e){

e.printStackTrace();

}

System.out.println(">03<");

// 注意:這里不應(yīng)該返回任何值給調(diào)用者,因?yàn)檎{(diào)用是異步的

}

}

啟動(dòng)類

package com.example.rabbitmq;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.annotation.Bean;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync

@SpringBootApplication

public class RabbitMqApplication {

public static void main(String[] args) {

SpringApplication.run(RabbitMqApplication.class, args);

}

}

效果

基于多線程隊(duì)列簡(jiǎn)單實(shí)現(xiàn)mq

package com.example.rabbitmq.mq;

import org.json.JSONObject;

import java.util.concurrent.LinkedBlockingDeque;

/**

* 沒(méi)有網(wǎng)絡(luò)的情況下實(shí)現(xiàn)MQ

* 利用多線程制造生產(chǎn)者和消費(fèi)者

*/

public class BoyatopThreadMQ {

//MQ服務(wù)器 初始化消息的隊(duì)列

private static LinkedBlockingDeque msgs=new LinkedBlockingDeque();

//主函數(shù) 程序入口

public static void main(String[] args) {

//生產(chǎn)者 生產(chǎn)線程

Thread producerThread = new Thread(new Runnable() {

@Override

public void run() {

try {

while(true){

Thread.sleep(1000);

JSONObject data = new JSONObject();

data.put("userId","12345");

//存入消息隊(duì)列

msgs.offer(data);

}

} catch (Exception e) {

e.printStackTrace();

}

}

},"生產(chǎn)者");

producerThread.start();

//消費(fèi)端 消費(fèi)線程

Thread consumerThread = new Thread(new Runnable() {

@Override

public void run() {

try {

while(true){

JSONObject data = msgs.poll();

if(data != null){

System.out.println(Thread.currentThread().getName() + ",獲取到數(shù)據(jù):" + data);

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

},"消費(fèi)者");

consumerThread.start();

}

}

效果:

基于netty實(shí)現(xiàn)mq

Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,專門用于構(gòu)建高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用程序。它提供了一個(gè)快速簡(jiǎn)便的方法來(lái)構(gòu)建網(wǎng)絡(luò)服務(wù)器和客戶端,并且不需要關(guān)注復(fù)雜的網(wǎng)絡(luò)編程細(xì)節(jié)。Netty支持多種網(wǎng)絡(luò)協(xié)議,包括TCP、UDP、HTTP、WebSocket等,使其適用于各種應(yīng)用場(chǎng)景。Netty在許多大型互聯(lián)網(wǎng)公司和開(kāi)源項(xiàng)目中被廣泛使用,特別是在需要處理大量并發(fā)連接和低延遲的場(chǎng)景中。

消費(fèi)者netty客戶端與nettyServer端MQ服務(wù)器端保持長(zhǎng)連接,MQ服務(wù)器端保存消費(fèi)者連接。

生產(chǎn)者netty客戶端發(fā)送請(qǐng)求給nettyServer端MQ服務(wù)器端,MQ服務(wù)器端在將該消息內(nèi)容發(fā)送給消費(fèi)者

導(dǎo)入相關(guān)依賴

com.alibaba

fastjson

1.2.62

io.netty

netty-all

4.0.23.Final

com.alibaba

fastjson

1.2.62

org.apache.commons

commons-lang3

3.11

com.rabbitmq

amqp-client

3.3.4

Netty生產(chǎn)端

package com.example.rabbitmq.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

* @ClassName BoyatopNettyMQProducer

* @Author

* @Version V1.0

**/

public class NettyProducer {

public void connect(int port, String host) throws Exception {

//配置客戶端NIO 線程組

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap client = new Bootstrap();

try {

client.group(group)

// 設(shè)置為Netty客戶端

.channel(NioSocketChannel.class)

/**

* ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)。

* Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到來(lái),組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該算法有效提高了網(wǎng)絡(luò)的有效負(fù)載,但是卻造成了延時(shí)。

* 而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸。和TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送數(shù)據(jù),適用于文件傳輸。

*/

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new NettyProducer.NettyClientHandler());

1. 演示LineBasedFrameDecoder編碼器

// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

// ch.pipeline().addLast(new StringDecoder());

}

});

//綁定端口, 異步連接操作

ChannelFuture future = client.connect(host, port).sync();

//等待客戶端連接端口關(guān)閉

future.channel().closeFuture().sync();

} finally {

//優(yōu)雅關(guān)閉 線程組

group.shutdownGracefully();

}

}

public static void main(String[] args) {

int port = 9008;

NettyProducer client = new NettyProducer();

try {

client.connect(port, "127.0.0.1");

} catch (Exception e) {

e.printStackTrace();

}

}

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

JSONObject data = new JSONObject();

data.put("type", "producer");

JSONObject msg = new JSONObject();

msg.put("userId", "123456");

msg.put("age", "23");

data.put("msg", msg);

// 生產(chǎn)發(fā)送數(shù)據(jù)

byte[] req = data.toJSONString().getBytes();

ByteBuf firstMSG = Unpooled.buffer(req.length);

firstMSG.writeBytes(req);

ctx.writeAndFlush(firstMSG);

}

/**

* 客戶端讀取到服務(wù)器端數(shù)據(jù)

*

* @param ctx

* @param msg

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

System.out.println("客戶端接收到服務(wù)器端請(qǐng)求:" + body);

}

// tcp屬于雙向傳輸

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

Netty服務(wù)端

package com.example.rabbitmq.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import org.apache.commons.lang3.StringUtils;

import java.io.UnsupportedEncodingException;

import java.util.ArrayList;

import java.util.concurrent.LinkedBlockingDeque;

/**

* @ClassName

* @Author

* @Version V1.0

**/

public class NettyMQServer {

public void bind(int port) throws Exception {

/**

* Netty 抽象出兩組線程池BossGroup和WorkerGroup

* BossGroup專門負(fù)責(zé)接收客戶端的連接, WorkerGroup專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫(xiě)。

*/

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();

try {

bootstrap.group(bossGroup, workerGroup)

// 設(shè)定NioServerSocketChannel 為服務(wù)器端

.channel(NioServerSocketChannel.class)

//BACKLOG用于構(gòu)造服務(wù)端套接字ServerSocket對(duì)象,標(biāo)識(shí)當(dāng)服務(wù)器請(qǐng)求處理線程全滿時(shí),

//用于臨時(shí)存放已完成三次握手的請(qǐng)求的隊(duì)列的最大長(zhǎng)度。如果未設(shè)置或所設(shè)置的值小于1,Java將使用默認(rèn)值50。

.option(ChannelOption.SO_BACKLOG, 100)

// 服務(wù)器端監(jiān)聽(tīng)數(shù)據(jù)回調(diào)Handler

.childHandler(new NettyMQServer.ChildChannelHandler());

//綁定端口, 同步等待成功;

ChannelFuture future = bootstrap.bind(port).sync();

System.out.println("當(dāng)前服務(wù)器端啟動(dòng)成功...");

//等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉

future.channel().closeFuture().sync();

} catch (Exception e) {

e.printStackTrace();

} finally {

//優(yōu)雅關(guān)閉 線程組

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

private class ChildChannelHandler extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

// 設(shè)置異步回調(diào)監(jiān)聽(tīng)

ch.pipeline().addLast(new NettyMQServer.MayiktServerHandler());

}

}

public static void main(String[] args) throws Exception {

int port = 9008;

new NettyMQServer().bind(port);

}

private static final String type_consumer = "consumer";

private static final String type_producer = "producer";

private static LinkedBlockingDeque msgs = new LinkedBlockingDeque<>();

private static ArrayList ctxs = new ArrayList<>();

// 生產(chǎn)者投遞消息的:topicName

public class MayiktServerHandler extends SimpleChannelInboundHandler {

/**

* 服務(wù)器接收客戶端請(qǐng)求

*

* @param ctx

* @param data

* @throws Exception

*/

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object data)

throws Exception {

JSONObject clientMsg = getData(data);

String type = clientMsg.getString("type");

switch (type) {

case type_producer:

producer(clientMsg);

break;

case type_consumer:

consumer(ctx);

break;

}

}

private void consumer(ChannelHandlerContext ctx) {

// 保存消費(fèi)者連接

ctxs.add(ctx);

// 主動(dòng)拉取mq服務(wù)器端緩存中沒(méi)有被消費(fèi)的消息

String data = msgs.poll();

if (StringUtils.isEmpty(data)) {

return;

}

// 將該消息發(fā)送給消費(fèi)者

byte[] req = data.getBytes();

ByteBuf firstMSG = Unpooled.buffer(req.length);

firstMSG.writeBytes(req);

ctx.writeAndFlush(firstMSG);

}

private void producer(JSONObject clientMsg) {

// 緩存生產(chǎn)者投遞 消息

String msg = clientMsg.getString("msg");

msgs.offer(msg);

//需要將該消息推送消費(fèi)者

ctxs.forEach((ctx) -> {

// 將該消息發(fā)送給消費(fèi)者

String data = msgs.poll();

if (data == null) {

return;

}

byte[] req = data.getBytes();

ByteBuf firstMSG = Unpooled.buffer(req.length);

firstMSG.writeBytes(req);

ctx.writeAndFlush(firstMSG);

});

}

private JSONObject getData(Object data) throws UnsupportedEncodingException {

ByteBuf buf = (ByteBuf) data;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

return JSONObject.parseObject(body);

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

ctx.close();

}

}

}

Netty消費(fèi)端

package com.example.rabbitmq.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

* @ClassName BoyatopNettyMQProducer

* @Author

* @Version V1.0

**/

public class NettyConsumer {

public void connect(int port, String host) throws Exception {

//配置客戶端NIO 線程組

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap client = new Bootstrap();

try {

client.group(group)

// 設(shè)置為Netty客戶端

.channel(NioSocketChannel.class)

/**

* ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)。

* Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到來(lái),組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該算法有效提高了網(wǎng)絡(luò)的有效負(fù)載,但是卻造成了延時(shí)。

* 而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸。和TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送數(shù)據(jù),適用于文件傳輸。

*/

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new NettyConsumer.NettyClientHandler());

1. 演示LineBasedFrameDecoder編碼器

// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

// ch.pipeline().addLast(new StringDecoder());

}

});

//綁定端口, 異步連接操作

ChannelFuture future = client.connect(host, port).sync();

//等待客戶端連接端口關(guān)閉

future.channel().closeFuture().sync();

} finally {

//優(yōu)雅關(guān)閉 線程組

group.shutdownGracefully();

}

}

public static void main(String[] args) {

int port = 9008;

NettyConsumer client = new NettyConsumer();

try {

client.connect(port, "127.0.0.1");

} catch (Exception e) {

e.printStackTrace();

}

}

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

JSONObject data = new JSONObject();

data.put("type", "consumer");

// 生產(chǎn)發(fā)送數(shù)據(jù)

byte[] req = data.toJSONString().getBytes();

ByteBuf firstMSG = Unpooled.buffer(req.length);

firstMSG.writeBytes(req);

ctx.writeAndFlush(firstMSG);

}

/**

* 客戶端讀取到服務(wù)器端數(shù)據(jù)

*

* @param ctx

* @param msg

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

System.out.println("客戶端接收到服務(wù)器端請(qǐng)求:" + body);

}

// tcp屬于雙向傳輸

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

netty和rabbitmq的區(qū)別

首先,從設(shè)計(jì)目的和功能上看,Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,主要用于構(gòu)建高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用程序。它提供了一個(gè)快速簡(jiǎn)便的方法來(lái)構(gòu)建網(wǎng)絡(luò)服務(wù)器和客戶端,并且不需要關(guān)注復(fù)雜的網(wǎng)絡(luò)編程細(xì)節(jié)。而RabbitMQ是一個(gè)開(kāi)源的消息隊(duì)列系統(tǒng),主要用于在不同的應(yīng)用程序之間進(jìn)行消息傳遞和異步通信。它實(shí)現(xiàn)了一種分布式消息傳遞機(jī)制,使得應(yīng)用程序可以獨(dú)立地工作,并通過(guò)消息隊(duì)列來(lái)交換數(shù)據(jù)。

其次,從使用場(chǎng)景上看,Netty更側(cè)重于底層網(wǎng)絡(luò)通信的開(kāi)發(fā),可以處理TCP、UDP等底層網(wǎng)絡(luò)協(xié)議,適用于需要高性能網(wǎng)絡(luò)通信的場(chǎng)景。而RabbitMQ則更側(cè)重于業(yè)務(wù)層次的消息傳遞,它支持多種消息傳遞協(xié)議,如AMQP、MQTT、STOMP等,并提供了可靠的消息傳遞機(jī)制、靈活的消息路由和隊(duì)列管理功能,適用于需要異步通信、解耦和流量削峰等場(chǎng)景。

此外,Netty和RabbitMQ在性能、可擴(kuò)展性、可靠性等方面也有所不同。Netty通過(guò)異步事件驅(qū)動(dòng)的方式和高效的線程模型,實(shí)現(xiàn)了高性能的網(wǎng)絡(luò)通信。而RabbitMQ則通過(guò)分布式架構(gòu)和持久化機(jī)制,實(shí)現(xiàn)了高可靠性和可擴(kuò)展性。同時(shí),RabbitMQ還提供了可視化的管理界面和多種編程語(yǔ)言的客戶端庫(kù),方便用戶進(jìn)行管理和使用。

Mq消息中間件名詞

Producer 生產(chǎn)者:投遞消息到MQ服務(wù)器端;

Consumer?消費(fèi)者:從MQ服務(wù)器端獲取消息處理業(yè)務(wù)邏輯;

Broker??MQ服務(wù)器端

Topic 主題:分類業(yè)務(wù)邏輯發(fā)送短信主題、發(fā)送優(yōu)惠券主題

Queue 存放消息模型隊(duì)列 先進(jìn)先出 后進(jìn)后出原則 數(shù)組/鏈表

Message 生產(chǎn)者投遞消息報(bào)文:json

主流mq區(qū)別對(duì)比

特性 ActiveMQ RabbitMQ RocketMQ kafka 開(kāi)發(fā)語(yǔ)言 java erlang java scala 單機(jī)吞吐量 萬(wàn)級(jí) 萬(wàn)級(jí) 10萬(wàn)級(jí) 10萬(wàn)級(jí) 時(shí)效性 ms級(jí) us級(jí) ms級(jí) ms級(jí)以內(nèi) 可用性 高(主從架構(gòu)) 高(主從架構(gòu)) 非常高(分布式架構(gòu)) 非常高(分布式架構(gòu)) 功能特性 成熟的產(chǎn)品,在很多公司得到應(yīng)用;有較多的文檔;各種協(xié)議支持較好 基于erlang開(kāi)發(fā),所以并發(fā)能力很強(qiáng),性能極其好,延時(shí)很低管理界面較豐富 MQ功能比較完備,擴(kuò)展性佳 只支持主要的MQ功能,像一些消息查詢,消息回溯等功能沒(méi)有提供,畢竟是為大數(shù)據(jù)準(zhǔn)備的,在大數(shù)據(jù)領(lǐng)域應(yīng)用廣。

centos安裝rabbitmq

命令安裝

1、更新系統(tǒng)

yum update -y

2、安裝依賴包erlang

yum install erlang -y

3、添加rabbitMQ倉(cāng)庫(kù)

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

4、安裝rabbitMQ

yum install rabbitmq-server -y

erlang安裝成功

添加rabbitMQ倉(cāng)庫(kù)

安裝rabbitMQ成功

手動(dòng)安裝

1.官網(wǎng)地址

Installing RabbitMQ | RabbitMQ

2.文件上傳

上傳到/usr/local/software 目錄下(如果沒(méi)有 software 需要自己創(chuàng)建)

3.安裝文件(分別按照以下順序安裝)

rpm -ivh erlang-21.3-1.el7.x86_64.rpm

yum install socat -y

rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令(按照以下順序執(zhí)行)

添加開(kāi)機(jī)啟動(dòng) RabbitMQ 服務(wù)

chkconfig rabbitmq-server on

systemctl enable rabbitmq-server.service

啟動(dòng)服務(wù)

/sbin/service rabbitmq-server start

查看服務(wù)狀態(tài)

/sbin/service rabbitmq-server status

停止服務(wù)(選擇執(zhí)行)

/sbin/service rabbitmq-server stop

開(kāi)啟 web 管理插件

rabbitmq-plugins enable rabbitmq_management

用默認(rèn)賬號(hào)密碼(guest)訪問(wèn)地址http://62.234.175.16:15672/出現(xiàn)權(quán)限問(wèn)題

添加一個(gè)新的用戶

創(chuàng)建賬號(hào)

rabbitmqctl add_user admin 123

設(shè)置用戶角色

rabbitmqctl set_user_tags admin administrator

設(shè)置用戶權(quán)限

#set_permissions [-p ]

rabbitmqctl set_permissions -p "/" admin "." "." ".*"

用戶 user_admin 具有/vhost1 這個(gè) virtual host 中所有資源的配置、寫(xiě)、讀權(quán)限的

查看當(dāng)前用戶和角色

rabbitmqctl list_users

5.再次利用 admin 用戶登錄

重置命令

關(guān)閉應(yīng)用的命令為

rabbitmqctl stop_app

清除的命令為

rabbitmqctl reset

重新啟動(dòng)命令為

rabbitmqctl start_app

簡(jiǎn)單案例

1、導(dǎo)入依賴

com.rabbitmq

amqp-client

5.8.0

commons-io

commons-io

2.6

2、消息生產(chǎn)者

package com.example.rabbitmq.demo1;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Producer {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

// 創(chuàng)建一個(gè)連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1");

factory.setUsername("guest");

factory.setPassword("guest");

//channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉

try(Connection connection = factory.newConnection(); Channel channel =

connection.createChannel()) {

/**

* 生成一個(gè)隊(duì)列

* 1. 隊(duì)列名稱

* 2. 隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中

* 3. 該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)

* 4. 是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除

* 5. 其他參數(shù)

*/

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

String message="hello world";

/**

* 發(fā)送一個(gè)消息

* 1. 發(fā)送到那個(gè)交換機(jī)

* 2. 路由的 key 是哪個(gè)

* 3. 其他的參數(shù)信息

* 4. 發(fā)送消息的消息體

*/

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

System.out.println(" 消息發(fā)送完畢");

}

}

}

3、消息消費(fèi)者

package com.example.rabbitmq.demo1;

import com.rabbitmq.client.*;

public class Consumer {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

System.out.println(" 等待接收消息....");

// 推送的消息如何進(jìn)行消費(fèi)的接口回調(diào)

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String message= new String(delivery.getBody());

System.out.println(message);

};

// 取消消費(fèi)的一個(gè)回調(diào)接口 如在消費(fèi)的時(shí)候隊(duì)列被刪除掉了

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(" 消息消費(fèi)被中斷");

};

/**

* 消費(fèi)者消費(fèi)消息

* 1. 消費(fèi)哪個(gè)隊(duì)列

* 2. 消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答

* 3. 消費(fèi)者未成功消費(fèi)的回調(diào)

*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

RabbitMQ的六種消息模式

1.簡(jiǎn)單模式(Simple): - 這是一個(gè)一對(duì)一的消息模式。生產(chǎn)者將消息發(fā)送到隊(duì)列,一個(gè)消費(fèi)者從隊(duì)列中獲取消息。當(dāng)多個(gè)消費(fèi)者同時(shí)監(jiān)聽(tīng)一個(gè)隊(duì)列時(shí),他們并不能同時(shí)消費(fèi)一條消息,而是隨機(jī)消費(fèi)消息。一旦消息被消費(fèi)者接收,它就會(huì)從隊(duì)列中刪除。 2.工作隊(duì)列模式(Work Queues): - 這是一個(gè)一對(duì)多的消息模式。生產(chǎn)者將消息發(fā)送到隊(duì)列,多個(gè)消費(fèi)者從隊(duì)列中獲取消息。消息會(huì)在消費(fèi)者之間平均分配,但具體的分發(fā)機(jī)制(輪詢或公平分發(fā))可以配置。這種模式適用于執(zhí)行資源密集型任務(wù),單個(gè)消費(fèi)者處理不過(guò)來(lái),需要多個(gè)消費(fèi)者進(jìn)行處理。 3.發(fā)布訂閱模式(Publish/Subscribe 或 Fanout): - 這是一個(gè)廣播式的消息傳遞方式。生產(chǎn)者發(fā)送消息到交換機(jī),交換機(jī)將消息廣播到所有與之綁定的隊(duì)列,每個(gè)隊(duì)列對(duì)應(yīng)的消費(fèi)者都可以接收到消息。 4.路由模式(Routing): - 類似于發(fā)布訂閱模式,但增加了路由鍵(Routing Key)的概念。生產(chǎn)者發(fā)送消息時(shí),會(huì)指定一個(gè)路由鍵。交換機(jī)根據(jù)路由鍵將消息發(fā)送到匹配的隊(duì)列,再由隊(duì)列對(duì)應(yīng)的消費(fèi)者消費(fèi)。 5.主題模式(Topics): - 這是路由模式的擴(kuò)展。路由鍵可以是多個(gè)單詞的字符串,用.分隔。消費(fèi)者可以通過(guò)通配符(* 和 #)來(lái)匹配路由鍵,從而接收消息。* 匹配一個(gè)單詞,# 匹配一個(gè)或多個(gè)單詞。 6.RPC模式(Remote Procedure Call): - RPC 是一種通信機(jī)制,允許客戶端發(fā)送請(qǐng)求到遠(yuǎn)程服務(wù),并等待響應(yīng)。RabbitMQ 可以用于實(shí)現(xiàn) RPC,客戶端發(fā)送請(qǐng)求消息到隊(duì)列,服務(wù)器從隊(duì)列中獲取請(qǐng)求并處理,然后將結(jié)果發(fā)送回客戶端。

1、簡(jiǎn)單模式(點(diǎn)對(duì)點(diǎn))

。在這個(gè)模式中,一個(gè)生產(chǎn)者(Producer)發(fā)送消息到一個(gè)隊(duì)列(Queue),然后一個(gè)消費(fèi)者(Consumer)從該隊(duì)列中接收并處理這些消息。這個(gè)模式?jīng)]有使用交換機(jī)(Exchange),而是直接將消息發(fā)送到隊(duì)列。

案例參照上述簡(jiǎn)單案例

2、工作隊(duì)列模式(Work Queues)

工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。

相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)

程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。

1)輪詢分發(fā)消息

案例:一個(gè)生產(chǎn)者發(fā)送消息,兩個(gè)消費(fèi)者接收消息

①抽取工具類

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {

// 得到一個(gè)連接的 channel

public static Channel getChannel() throws Exception{

// 創(chuàng)建一個(gè)連接工廠

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1");

factory.setUsername("guest");

factory.setPassword("guest");

//設(shè)置VirtualHost

// factory.setVirtualHost("/jxHosts");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

return channel;

}

}

②啟動(dòng)兩個(gè)工作線程

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* 演示消息輪詢機(jī)制

*/

public class Worker01 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println(" 接收到消息:"+receivedMessage);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)......");

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

================================================================================

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* 演示消息輪詢機(jī)制

*/

public class Worker02 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println(" 接收到消息:"+receivedMessage);

};

CancelCallback cancelCallback=(consumerTag)->{

System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

};

System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)......");

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}

}

③啟動(dòng)一個(gè)發(fā)送線程

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**

* 演示消息輪詢機(jī)制

*/

public class Task01 {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

try(Channel channel=RabbitMqUtils.getChannel();) {

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

System.out.println("任務(wù)一,請(qǐng)輸入消息:");

// 從控制臺(tái)當(dāng)中接受信息

Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()){

String message = scanner.next();

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

System.out.println(" 發(fā)送消息完成:"+message);

}

}

}

}

④結(jié)果展示

2)消息應(yīng)答

① 概念

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成

了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消

息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù)

發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。

為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi) 者在接

收到消息并且處理該消息之后,告訴 q rabbitmq 它已經(jīng)處理了,q rabbitmq 可以把該消息刪除了。

② 自動(dòng)應(yīng)答

消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在 高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)

衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟

失了,當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制,

當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終

使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死, 所以這種模式僅適用在消費(fèi)者可以高效并

以某種速率能夠處理這些消息的情況下使用。

RabbitMQ默認(rèn)采用自動(dòng)應(yīng)答

③ 消息應(yīng)答的方法

A.Channel.basicAck(用于肯定確認(rèn))

RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

B B.Channel.basicNack(用于否定確認(rèn))

C C.Channel.basicReject(用于否定確認(rèn))

與 Channel.basicNack 相比少一個(gè)參數(shù)

不處理該消息了直接拒絕,可以將其丟棄了

④ 手動(dòng)應(yīng)答

手動(dòng)應(yīng)答的好處是可以批量應(yīng)答并且減少網(wǎng)絡(luò)擁堵

multiple 的 true 和 false 代表不同意思

true 代表批量應(yīng)答 channel 上未應(yīng)答的消息

比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答

false 同上面相比,只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答

手動(dòng)應(yīng)答的好處

如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確保不會(huì)丟失任何消息。

案例:手動(dòng)應(yīng)答,一個(gè)生產(chǎn)者,兩個(gè)消費(fèi)者

① 睡眠工具類

package com.example.rabbitmq.demo2;

public class SleepUtils {

public static void sleep(int second){

try {

Thread.sleep(1000*second);

} catch (InterruptedException _ignored) {

Thread.currentThread().interrupt();

}

}

② 生產(chǎn)者

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**

* 確認(rèn)應(yīng)答機(jī)制

*/

public class Task02 {

private static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

System.out.println("確認(rèn)應(yīng)答機(jī)制");

Scanner sc = new Scanner(System.in);

System.out.println(" 請(qǐng)輸入信息");

while (sc.hasNext()) {

String message = sc.nextLine();

channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));

System.out.println(" 生產(chǎn)者發(fā)出消息" + message);

}

}

}

}

③ 消費(fèi)者01

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

public class Work03 {

private static final String ACK_QUEUE_NAME="ack-queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("C1 等待接收消息處理時(shí)間較短");

// 消息消費(fèi)的時(shí)候如何處理消息

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String message= new String(delivery.getBody());

SleepUtils.sleep(1);

System.out.println(" 接收到消息:"+message);

/**

* 1. 消息標(biāo)記 tag

* 2. 是否批量應(yīng)答未應(yīng)答消息

*/

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

// 采用手動(dòng)應(yīng)答

boolean autoAck=false;

channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{

System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

});

}

}

④ 消費(fèi)者02

package com.example.rabbitmq.demo2;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* 確認(rèn)應(yīng)答機(jī)制

*

*/

public class Work04 {

private static final String ACK_QUEUE_NAME="ack-queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("C2 等待接收消息處理時(shí)間較長(zhǎng)");

// 消息消費(fèi)的時(shí)候如何處理消息

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String message= new String(delivery.getBody());

SleepUtils.sleep(30);

System.out.println(" 接收到消息:"+message);

/**

* 1. 消息標(biāo)記 tag

* 2. 是否批量應(yīng)答未應(yīng)答消息

*/

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

};

// 采用手動(dòng)應(yīng)答

boolean autoAck=false;

channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{

System.out.println(consumerTag+" 消費(fèi)者取消消費(fèi)接口回 調(diào)邏輯");

});

}

}

⑤ 結(jié)果展示

在發(fā)送者發(fā)送消息 dd,發(fā)出消息之后的把 C2 消費(fèi)者停掉,按理說(shuō)該 C2 來(lái)處理該消息,但是

由于它處理時(shí)間較長(zhǎng),在還未處理完,也就是說(shuō) C2 還沒(méi)有執(zhí)行 ack 代碼的時(shí)候,C2 被停掉了,

此時(shí)會(huì)看到消息被 C1 接收到了,說(shuō)明消息 dd 被重新入隊(duì),然后分配給能處理消息的 C1 處理了

RabbitMQ的四種交換機(jī)類型

1.Fanout Exchange(扇型交換機(jī)): - 這種交換機(jī)將消息廣播到與之綁定的所有隊(duì)列,無(wú)論消息的路由鍵是什么。它也通常用于發(fā)布/訂閱模式,其中一個(gè)消息被廣播給所有訂閱者。 2.Direct Exchange(直連交換機(jī)): - 這種交換機(jī)根據(jù)消息的路由鍵(Routing Key)將消息發(fā)送到與之完全匹配的隊(duì)列。 3.Topic Exchange(主題交換機(jī)): - 這種交換機(jī)根據(jù)消息的路由鍵與隊(duì)列綁定時(shí)指定的路由鍵模式(通配符)匹配程度,將消息路由到一個(gè)或多個(gè)隊(duì)列。它通常用于發(fā)布/訂閱模式和復(fù)雜的消息路由需求。 4.Headers Exchange(頭交換機(jī)): - 這種交換機(jī)根據(jù)消息的標(biāo)頭信息(Headers)來(lái)決定消息的路由,而不是使用路由鍵。它允許開(kāi)發(fā)者在消息頭中定義多個(gè)屬性,并使用這些屬性來(lái)過(guò)濾消息。

RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊(duì)列中。 相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡(jiǎn)單,一方面它接收來(lái) 自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說(shuō)把他們到許多隊(duì)列中還是說(shuō)應(yīng)該丟棄它們。這就的由交換機(jī)的類型來(lái)決定。

1、Fanout Exchange(扇型交換機(jī))

將接收到的所有消息廣播到它知道的所有隊(duì)列中

案例:

① 生產(chǎn)者

package com.example.rabbitmq.demo4;

import com.example.rabbitmq.demo3.RabbitMQConnection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class ProducerFanout {

/**

* 定義交換機(jī)的名稱

*/

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

// 創(chuàng)建Connection

Connection connection = RabbitMQConnection.getConnection();

// 創(chuàng)建Channel

Channel channel = connection.createChannel();

// 通道關(guān)聯(lián)交換機(jī)

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

String msg = "交換機(jī)樣例演示";

channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

channel.close();

connection.close();

}

}

② 郵件消費(fèi)者

package com.example.rabbitmq.demo4;

import com.example.rabbitmq.demo3.RabbitMQConnection;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class MailConsumer {

/**

* 定義郵件隊(duì)列

*/

private static final String QUEUE_NAME = "fanout_email_queue";

/**

* 定義交換機(jī)的名稱

*/

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("郵件消費(fèi)者...");

// 創(chuàng)建我們的連接

Connection connection = RabbitMQConnection.getConnection();

// 創(chuàng)建我們通道

final Channel channel = connection.createChannel();

// 關(guān)聯(lián)隊(duì)列消費(fèi)者關(guān)聯(lián)隊(duì)列

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "UTF-8");

System.out.println("郵件消費(fèi)者獲取消息:" + msg);

}

};

// 開(kāi)始監(jiān)聽(tīng)消息 自動(dòng)簽收

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}

}

③ 短信消費(fèi)者

package com.example.rabbitmq.demo4;

import com.example.rabbitmq.demo3.RabbitMQConnection;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class SmsConsumer {

/**

* 定義短信隊(duì)列

*/

private static final String QUEUE_NAME = "fanout_email_sms";

/**

* 定義交換機(jī)的名稱

*/

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("短信消費(fèi)者...");

// 創(chuàng)建我們的連接

Connection connection = RabbitMQConnection.getConnection();

// 創(chuàng)建我們通道

final Channel channel = connection.createChannel();

// 關(guān)聯(lián)隊(duì)列消費(fèi)者關(guān)聯(lián)隊(duì)列

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "UTF-8");

System.out.println("短信消費(fèi)者獲取消息:" + msg);

}

};

// 開(kāi)始監(jiān)聽(tīng)消息 自動(dòng)簽收

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}

}

④ 結(jié)果展示

2、Direct Exchange(直連交換機(jī))

消息只去到它綁定的routingKey 隊(duì)列中去

案例:一個(gè)生產(chǎn)者,兩個(gè)消費(fèi)者

①生產(chǎn)者

package com.example.rabbitmq.demo5;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import java.util.HashMap;

import java.util.Map;

public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 創(chuàng)建多個(gè) bindingKey

Map bindingKeyMap = new HashMap<>();

bindingKeyMap.put("info"," 普通 info 信息");

bindingKeyMap.put("warning"," 警告 warning 信息");

bindingKeyMap.put("error"," 錯(cuò)誤 error 信息");

//debug 沒(méi)有消費(fèi)這接收這個(gè)消息 所有就丟失了

bindingKeyMap.put("debug"," 調(diào)試 debug 信息");

for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){

String bindingKey = bindingKeyEntry.getKey();

String message = bindingKeyEntry.getValue();

channel.basicPublish(EXCHANGE_NAME,bindingKey, null,

message.getBytes("UTF-8"));

System.out.println(" 生產(chǎn)者發(fā)出消息:" + message);

}

}

}

}

②消費(fèi)者01

package com.example.rabbitmq.demo5;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import org.apache.commons.io.FileUtils;

import java.io.File;

public class ReceiveLogsDirect01 {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

String queueName = "disk";

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, EXCHANGE_NAME, "error");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

message=" 接收綁定鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message;

File file = new File("D:\\rabbitmq_info.txt");

FileUtils.writeStringToFile(file,message,"UTF-8");

System.out.println(" 錯(cuò)誤日志已經(jīng)接收");

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

});

}

}

③消費(fèi)者02

package com.example.rabbitmq.demo5;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect02 {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

String queueName = "console";

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, EXCHANGE_NAME, "info");

channel.queueBind(queueName, EXCHANGE_NAME, "warning");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" 接 收 綁 定 鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

});

}

}

④結(jié)果展示:

3、Topic Exchange(主題交換機(jī))

盡管使用 direct 交換機(jī)改進(jìn)了我們的系統(tǒng),但是它仍然存在局限性-比方說(shuō)我們想接收的日志類型有info.base 和 info.advantage,某個(gè)隊(duì)列只想 info.base 的消息,那這個(gè)時(shí)候 direct 就辦不到了。這個(gè)時(shí)候就只能使用 topic 類型

發(fā)送到類型是 topic 交換機(jī)的消息的 routing_key 不能隨意寫(xiě),必須滿足一定的要求,它必須是一個(gè)單詞列表,以點(diǎn)號(hào)分隔開(kāi)。這些單詞可以是任意單詞,比如說(shuō):"stock.usd.nyse", "nyse.vmw",

"quick.orange.rabbit".這種類型的。當(dāng)然這個(gè)單詞列表最多不能超過(guò) 255 個(gè)字節(jié)。

在這個(gè)規(guī)則列表中,其中有兩個(gè)替換符是大家需要注意的

*(星號(hào))可以代替一個(gè)單詞

#(井號(hào))可以替代零個(gè)或多個(gè)單詞

案例1:使用全匹配

生產(chǎn)者

package com.example.rabbitmq.demo6;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class Producer {

public static void main(String[] args) throws Exception {

//連接

ConnectionFactory f = new ConnectionFactory();

f.setHost("127.0.0.1"); // wht6.cn

// f.setPort(5672); //默認(rèn)端口可以省略

f.setUsername("guest");

f.setPassword("guest");

Channel c = f.newConnection().createChannel();

//定義交換機(jī)

c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);

//向交換機(jī)發(fā)消息,并攜帶路由鍵關(guān)鍵詞

while (true) {

System.out.print("輸入消息:");

String s = new Scanner(System.in).nextLine();

System.out.print("輸入路由鍵:");

String key = new Scanner(System.in).nextLine();

c.basicPublish("topic_logs", key, null, s.getBytes());

}

}

}

消費(fèi)者

package com.example.rabbitmq.demo6;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.Scanner;

import java.util.UUID;

public class Consumer {

public static void main(String[] args) throws Exception {

//連接

ConnectionFactory f = new ConnectionFactory();

f.setHost("127.0.0.1"); // wht6.cn

// f.setPort(5672); //默認(rèn)端口可以省略

f.setUsername("guest");

f.setPassword("guest");

Channel c = f.newConnection().createChannel();

// 定義隨機(jī)隊(duì)列,定義交換機(jī),用綁定鍵綁定

String queue = UUID.randomUUID().toString();

c.queueDeclare(queue, false, true, true, null);

c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);

System.out.println("輸入綁定鍵,用空格隔開(kāi):");

String s = new Scanner(System.in).nextLine();// "aa bb cc"-->["aa", "bb", "cc"]

String[] a = s.split("\\s+");

for (String key : a) {

c.queueBind(queue, "topic_logs", key);

}

// 從隨機(jī)隊(duì)列正常消費(fèi)數(shù)據(jù)

DeliverCallback deliverCallback = new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

String key = message.getEnvelope().getRoutingKey();

String s = new String(message.getBody());

System.out.println(key+" - 收到: "+s);

}

};

CancelCallback cancelCallback = new CancelCallback() {

@Override

public void handle(String consumerTag) throws IOException {

}

};

// 消費(fèi)數(shù)據(jù)

c.basicConsume(queue, true, deliverCallback, cancelCallback);

}

}

結(jié)果:

案例2

生產(chǎn)者

package com.example.rabbitmq.demo6;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.Channel;

import java.util.HashMap;

import java.util.Map;

public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

/**

* Q1--> 綁定的是

* 中間帶 orange 帶 3 個(gè)單詞的字符串 (*.orange.*)

* Q2--> 綁定的是

* 最后一個(gè)單詞是 rabbit 的 3 個(gè)單詞 (*.*.rabbit)

* 第一個(gè)單詞是 lazy 的多個(gè)單詞 (lazy.#)

*

*/

Map bindingKeyMap = new HashMap<>();

bindingKeyMap.put("quick.orange.rabbit"," 被隊(duì)列 Q1Q2 接收到");

bindingKeyMap.put("lazy.orange.elephant"," 被隊(duì)列 Q1Q2 接收到");

bindingKeyMap.put("quick.orange.fox"," 被隊(duì)列 Q1 接收到");

bindingKeyMap.put("lazy.brown.fox"," 被隊(duì)列 Q2 接收到");

bindingKeyMap.put("lazy.pink.rabbit"," 雖然滿足兩個(gè)綁定但只被隊(duì)列 Q2 接收一次");

bindingKeyMap.put("quick.brown.fox"," 不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄");

bindingKeyMap.put("quick.orange.male.rabbit"," 是四個(gè)單詞不匹配任何綁定會(huì)被丟棄");

bindingKeyMap.put("lazy.orange.male.rabbit"," 是四個(gè)單詞但匹配 Q2");

for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){

String bindingKey = bindingKeyEntry.getKey();

String message = bindingKeyEntry.getValue();

channel.basicPublish(EXCHANGE_NAME,bindingKey, null,

message.getBytes("UTF-8"));

System.out.println(" 生產(chǎn)者發(fā)出消息" + message);

}

}

}

}

消費(fèi)者01

package com.example.rabbitmq.demo6;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic01 {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 聲明 Q1 隊(duì)列與綁定關(guān)系

String queueName="Q1";

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" 接 收 隊(duì) 列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

});

}

}

消費(fèi)者02

package com.example.rabbitmq.demo6;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic02 {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 聲明 Q2 隊(duì)列與綁定關(guān)系

String queueName="Q2";

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");

channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" 接 收 隊(duì) 列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

});

}

}

結(jié)果:

SpringBoot整合RabbitMq

Maven依賴

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

org.apache.commons

commons-lang3

com.alibaba

fastjson

1.2.49

org.projectlombok

lombok

配置文件

application.yml

spring:

rabbitmq:

host: 127.0.0.1 # wht6.cn

port: 5672

username: guest

password: guest

virtual-host: /jxHosts

listener:

simple:

prefetch: 1 # 預(yù)抓取消息數(shù),spring默認(rèn)250

方式一:聚合項(xiàng)目

Main

package cn.tedu.rabbitmqspring.m5;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

import java.util.UUID;

@SpringBootApplication

public class Main {

public static void main(String[] args) {

SpringApplication.run(Main.class, args);

}

//定義交換機(jī)

@Bean

public TopicExchange logsExchange() {

// 參數(shù): 1.名稱 2.持久 3.自動(dòng)刪除

// 默認(rèn)是持久交換機(jī)

return new TopicExchange("topic_logs", false, false);

}

/*

默認(rèn)使用方法名作為對(duì)象的屬性名放入spring容器

"rndQueue" ---- Queue實(shí)例

*/

@Bean

public Queue rndQueue() {

return new Queue(UUID.randomUUID().toString(), false, true, true);

}

//用生產(chǎn)者發(fā)送消息

@Autowired

private Producer p;

/*

@PostConstruct

spring在完成掃描創(chuàng)建實(shí)例,完成所有的依賴注入后,

會(huì)自動(dòng)地執(zhí)行 @PostConstruct 方法

spring單線程執(zhí)行下面的流程:

創(chuàng)建實(shí)例 --> 完成注入 --> @PostConstruct --> 后續(xù)配置

*/

@PostConstruct

public void test() {

// new Thread(new Runnable() {

// @Override

// public void run() {

// //線程中執(zhí)行的代碼

// }

// }).start();

// lambda 表達(dá)式,匿名內(nèi)部類的簡(jiǎn)寫(xiě)

new Thread(() -> p.send()).start();

}

}

生產(chǎn)者Producer

package cn.tedu.rabbitmqspring.m5;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component

public class Producer {

// RabbitAutoConfiguration 自動(dòng)配置類中

// 自動(dòng)創(chuàng)建 AmqpTemplate 實(shí)例

@Autowired

private AmqpTemplate t;

//生產(chǎn)者的發(fā)送方法,需要手動(dòng)調(diào)用

public void send() {

while (true) {

System.out.print("輸入消息:");

String s = new Scanner(System.in).nextLine();

System.out.print("輸入路由鍵:");

String k = new Scanner(System.in).nextLine();

t.convertAndSend("topic_logs", k, s);

}

}

}

消費(fèi)者Consumer

package cn.tedu.rabbitmqspring.m5;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

// 啟動(dòng)一個(gè)消費(fèi)者,從隊(duì)列接收消息,并傳遞給這個(gè)方法進(jìn)行處理

// 消費(fèi)者自動(dòng)啟動(dòng),不需要手動(dòng)調(diào)用

// spring expression language - SPEL #{} 可以直接訪問(wèn)spring容器中的對(duì)象

// ${} OGNL

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "#{rndQueue.name}",declare="false"), //隊(duì)列,不指定參數(shù),由服務(wù)器自動(dòng)命名,非持久,獨(dú)占,自動(dòng)刪除

exchange = @Exchange(name = "topic_logs",declare="false"), //交換機(jī),declare="false"使用存在的交換機(jī)而不重復(fù)定義

key = {"*.orange.*"}

))

public void receive1(String msg) {

System.out.println("消費(fèi)者1收到:"+msg);

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue, //隊(duì)列,不指定參數(shù),由服務(wù)器自動(dòng)命名,非持久,獨(dú)占,自動(dòng)刪除

exchange = @Exchange(name = "topic_logs",declare="false"), //交換機(jī),declare="false"使用存在的交換機(jī)而不重復(fù)定義

key = {"*.*.rabbit", "lazy.#"}

))

public void receive2(String msg) {

System.out.println("消費(fèi)者2收到:"+msg);

}

}

方式二:分離項(xiàng)目

分為三個(gè)子項(xiàng)目:生產(chǎn)者,短信消費(fèi)者,郵件消費(fèi)者

消息實(shí)體類

package com.boyatop.entity;

import lombok.Data;

import java.io.Serializable;

/**

* @ClassName MsgEntity

* @Author www.boyatop.com

* @Version V1.0

**/

@Data

public class MsgEntity implements Serializable {

private String msgId;

private String userId;

private String phone;

private String email;

public MsgEntity(String msgId, String userId, String phone, String email) {

this.msgId = msgId;

this.userId = userId;

this.phone = phone;

this.email = email;

}

}

生產(chǎn)者

配置類

package com.boyatop.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

/**

* @ClassName RabbitMQConfig

* @Author www.boyatop.com

* @Version V1.0

**/

@Component

public class RabbitMQConfig {

/**

* 定義交換機(jī)

*/

private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";

/**

* 短信隊(duì)列

*/

private String FANOUT_SMS_QUEUE = "fanout_sms_queue";

/**

* 郵件隊(duì)列

*/

private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";

// 1.注入隊(duì)列和交換機(jī)注入到spring容器中

// 2.關(guān)聯(lián)交換機(jī)

/**

* 郵件和短信隊(duì)列注入到spring容器中

* @return

*/

@Bean

public Queue smsQueue() {

return new Queue(FANOUT_SMS_QUEUE);

}

@Bean

public Queue emailQueue() {

return new Queue(FANOUT_EMAIL_QUEUE);

}

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);

}

/**

* 關(guān)聯(lián)交換機(jī)

* 根據(jù)參數(shù)名稱 ioc獲取 Queue對(duì)象

*/

@Bean

public Binding BindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(smsQueue).to(fanoutExchange);

}

@Bean

public Binding BindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(emailQueue).to(fanoutExchange);

}

}

生產(chǎn)者Controller

package com.boyatop.controller;

import com.boyatop.entity.MsgEntity;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**

* @ClassName ProducerController

* @Author www.boyatop.com

* @Version V1.0

**/

@RestController

public class ProducerController {

@Autowired

private AmqpTemplate amqpTemplate;

@RequestMapping("/sendMsg")

public void sendMsg() {

/**

* 參數(shù)1 交換機(jī)名稱

* 參數(shù)2 路由key

* 參數(shù)3 發(fā)送內(nèi)容

*/

MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),

"1234", "181111111", "1019902226@qq.com");

amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);

}

}

短信消費(fèi)者

package com.boyatop;

import com.boyatop.entity.MsgEntity;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* @ClassName FanoutSmsConsumer

* @Author www.boyatop.com

* @Version V1.0

**/

@Slf4j

@Component

@RabbitListener(queues = "fanout_sms_queue")

public class FanoutSmsConsumer {

@RabbitHandler

public void process(MsgEntity msgEntity) {

log.info("sms:msgEntity:" + msgEntity);

}

}

郵件消費(fèi)者

package com.boyatop;

import com.boyatop.entity.MsgEntity;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* @ClassName FanoutEmailConsumer

* @Author www.boyatop.com

* @Version V1.0

**/

@Slf4j

@Component

@RabbitListener(queues = "fanout_email_queue")

public class FanoutEmailConsumer {

@RabbitHandler

public void process(MsgEntity msgEntity) {

log.info("email:msgEntity:" + msgEntity);

}

}

SpringBoot開(kāi)啟消息確認(rèn)機(jī)制+解決冪等性問(wèn)題+生產(chǎn)者獲取消費(fèi)結(jié)果

創(chuàng)建SpringBoot項(xiàng)目,導(dǎo)入相關(guān)依賴

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.7.4

com.example

order-producer-consumer

0.0.1-SNAPSHOT

order-producer-consumer

order-producer-consumer

1.8

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-test

test

org.mybatis.spring.boot

mybatis-spring-boot-starter

1.1.1

mysql

mysql-connector-java

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-amqp

org.apache.commons

commons-lang3

com.alibaba

fastjson

1.2.49

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-maven-plugin

配置文件application.yml

spring:

rabbitmq:

####連接地址

host: 127.0.0.1

####端口號(hào)

port: 5672

####賬號(hào)

username: guest

####密碼

password: guest

### 地址

virtual-host: /jxHosts

listener:

simple:

retry:

##開(kāi)啟消費(fèi)者(程序出現(xiàn)異常的情況下)進(jìn)行重試

enabled: true

##最大重試次數(shù)

max-attempts: 5

##重試間隔時(shí)間

initial-interval: 3000

##消費(fèi)者需要手動(dòng)發(fā)送確認(rèn)信號(hào)

acknowledge-mode: manual

datasource:

driver-class-name: com.mysql.cj.jdbc.Driver

url: jdbc:mysql://localhost:3306/first?useUnicode=true&characterEncoding=utf-8

username: root

password: 123456

server:

port: 9090

RabbitMq配置類

package com.example.orderproducerconsumer.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

/**

* @ClassName RabbitMQConfig

* @Author

* @Version V1.0

**/

@Component

public class RabbitMQConfig {

/**

* 定義交換機(jī)

*/

private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_order";

/**

* 訂單隊(duì)列

*/

private String FANOUT_ORDER_QUEUE = "fanout_order_queue";

/**

* 配置orderQueue

*

* @return

*/

@Bean

public Queue orderQueue() {

return new Queue(FANOUT_ORDER_QUEUE);

}

/**

* 配置fanoutExchange

*

* @return

*/

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);

}

// 綁定交換機(jī) orderQueue

@Bean

public Binding bindingOrderFanoutExchange(Queue orderQueue, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(orderQueue).to(fanoutExchange);

}

}

消息記錄實(shí)體類

package com.example.orderproducerconsumer.entity;

import java.io.Serializable;

public class OrderEntity implements Serializable {

private int id;

private String orderName;

private String orderId;

public OrderEntity(String orderName, String orderId) {

this.orderName = orderName;

this.orderId = orderId;

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getOrderName() {

return orderName;

}

public void setOrderName(String orderName) {

this.orderName = orderName;

}

public String getOrderId() {

return orderId;

}

public void setOrderId(String orderId) {

this.orderId = orderId;

}

public OrderEntity(int id, String orderName, String orderId) {

this.id = id;

this.orderName = orderName;

this.orderId = orderId;

}

public OrderEntity() {

}

}

Mapper

package com.example.orderproducerconsumer.mapper;

import com.example.orderproducerconsumer.entity.OrderEntity;

import org.apache.ibatis.annotations.Insert;

import org.apache.ibatis.annotations.Mapper;

import org.apache.ibatis.annotations.Select;

import org.springframework.stereotype.Component;

@Mapper

public interface OrderMapper {

@Insert("insert order_info values (null,#{orderName},#{orderId})")

int addOrder(OrderEntity orderEntity);

@Select("SELECT * from order_info where orderId=#{orderId} ")

OrderEntity getOrder(String orderId);

}

controller

package com.example.orderproducerconsumer.controller;

import com.example.orderproducerconsumer.entity.OrderEntity;

import com.example.orderproducerconsumer.mapper.OrderMapper;

import com.example.orderproducerconsumer.producer.OrderProducer;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@Slf4j

@RestController

public class OrderController {

@Autowired

private OrderProducer orderProducer;

@Autowired

private OrderMapper orderMapper;

@RequestMapping("/sendOrder")

public String sendOrder() {

// 生成全局id

String orderId = System.currentTimeMillis() + "";

log.info("orderId:{}", orderId);

String orderName = " boyatop";

orderProducer.sendMsg(orderName, orderId);

return orderId;

}

/**

* 前端主動(dòng)根據(jù)orderId定時(shí)查詢

*

* @param orderId

* @return

*/

@RequestMapping("/getOrder")

public Object getOrder(String orderId) {

OrderEntity order = orderMapper.getOrder(orderId);

if (order == null) {

return "該訂單沒(méi)有被消費(fèi)或者訂單號(hào)錯(cuò)誤!";

}

return order;

}

}

生產(chǎn)者

package com.example.orderproducerconsumer.producer;

import com.alibaba.fastjson.JSONObject;

import com.example.orderproducerconsumer.entity.OrderEntity;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

* @ClassName OrderProducer

* @Author

* @Version V1.0

**/

@Component

@Slf4j

public class OrderProducer implements RabbitTemplate.ConfirmCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

String id = correlationData.getId();

log.info("id:" + id);

}

/**

* 使用mq發(fā)送消息

*

* @param orderName

* @param orderId

*/

public void sendMsg(String orderName, String orderId) {

OrderEntity orderEntity = new OrderEntity(orderName, orderId);

rabbitTemplate.convertAndSend("boyatop_order", "", orderEntity, message -> {

return message;

});

// CorrelationData correlationData = new CorrelationData();

// correlationData.setId(JSONObject.toJSONString(orderEntity));

// rabbitTemplate.convertAndSend("/boyatop_order", "", orderEntity,correlationData);

}

}

消費(fèi)者

package com.example.orderproducerconsumer.consumer;

import com.example.orderproducerconsumer.entity.OrderEntity;

import com.example.orderproducerconsumer.mapper.OrderMapper;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* @ClassName fanout_sms_queue

* @Author

* @Version V1.0

**/

@Slf4j

@Component

@RabbitListener(queues = "fanout_order_queue")

public class FanoutOrderConsumer {

@Autowired

private OrderMapper orderMapper;

@RabbitHandler

public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {

try {

log.info(">>orderEntity:{}<<", orderEntity.toString());

String orderId = orderEntity.getOrderId();

if (StringUtils.isEmpty(orderId)) {

return;

}

OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);

if (dbOrderEntity != null) {

log.info("另外消費(fèi)者已經(jīng)處理過(guò)該業(yè)務(wù)邏輯");

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

return;

}

int result = orderMapper.addOrder(orderEntity);

int i = 1 / 0;

log.info(">>插入數(shù)據(jù)庫(kù)中數(shù)據(jù)成功<<");

//開(kāi)啟消息確認(rèn)機(jī)制

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

// 記錄該消息日志形式 存放數(shù)據(jù)庫(kù)db中、后期通過(guò)定時(shí)任務(wù)實(shí)現(xiàn)消息補(bǔ)償、人工實(shí)現(xiàn)補(bǔ)償

//消息處理失敗,并將消息重新放回隊(duì)列

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

//將該消息存放到死信隊(duì)列中,單獨(dú)寫(xiě)一個(gè)死信消費(fèi)者實(shí)現(xiàn)消費(fèi)。

}

}

}

死信隊(duì)列

1、概念

死信,顧名思義就是無(wú)法被消費(fèi)的消息,字面意思可以這樣理解,一般來(lái)說(shuō),producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的原因?qū)е?queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。

應(yīng)用場(chǎng)景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中.還有比如說(shuō): 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)間未支付時(shí)自動(dòng)失效

2、產(chǎn)生原因

1.??消息投遞到MQ中存放 消息已經(jīng)過(guò)期? 消費(fèi)者沒(méi)有及時(shí)的獲取到我們消息,消息如果存放到mq服務(wù)器中過(guò)期之后,會(huì)轉(zhuǎn)移到備胎死信隊(duì)列存放。

2.??隊(duì)列達(dá)到最大的長(zhǎng)度 (隊(duì)列容器已經(jīng)滿了)

3.????消費(fèi)者消費(fèi)多次消息失敗,就會(huì)轉(zhuǎn)移存放到死信隊(duì)列中;消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false.

3、死信隊(duì)列的架構(gòu)原理

死信隊(duì)列和普通隊(duì)列區(qū)別不是很大

普通與死信隊(duì)列都有自己獨(dú)立的交換機(jī)和路由key、隊(duì)列和消費(fèi)者。

區(qū)別:

1.生產(chǎn)者投遞消息先投遞到我們普通交換機(jī)中,普通交換機(jī)在將該消息投到

普通隊(duì)列中緩存起來(lái),普通隊(duì)列對(duì)應(yīng)有自己獨(dú)立普通消費(fèi)者。

2.如果生產(chǎn)者投遞消息到普通隊(duì)列中,普通隊(duì)列發(fā)現(xiàn)該消息一直沒(méi)有被消費(fèi)者消費(fèi)

的情況下,在這時(shí)候會(huì)將該消息轉(zhuǎn)移到死信(備胎)交換機(jī)中,死信(備胎)交換機(jī)

對(duì)應(yīng)有自己獨(dú)立的 死信(備胎)隊(duì)列 對(duì)應(yīng)獨(dú)立死信(備胎)消費(fèi)者。

4、死信實(shí)戰(zhàn)

1)消息TTL過(guò)期

1.生產(chǎn)者代碼

package com.example.rabbitmq.demo7;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

/**

* TTL時(shí)間過(guò)期

*/

public class Producer {

private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 設(shè)置消息的 TTL 時(shí)間

AMQP.BasicProperties properties = new

AMQP.BasicProperties().builder().expiration("10000").build();

// 該信息是用作演示隊(duì)列個(gè)數(shù)限制

for (int i = 1; i <11 ; i++) {

String message="info"+i;

channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,

message.getBytes());

System.out.println(" 生產(chǎn)者發(fā)送消息:"+message);

}

}

}

}

2.消費(fèi)者01代碼(啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)

package com.example.rabbitmq.demo7;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;

import java.util.Map;

/**

* TTL時(shí)間過(guò)期

*/

public class Consumer01 {

// 普通交換機(jī)名稱

private static final String NORMAL_EXCHANGE = "normal_exchange";

// 死信交換機(jī)名稱

private static final String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明死信和普通交換機(jī) 類型為 direct

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明死信隊(duì)列

String deadQueue = "dead-queue";

channel.queueDeclare(deadQueue, false, false, false, null);

// 死信隊(duì)列綁定死信交換機(jī)與 routingkey

channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

// 正常隊(duì)列綁定死信隊(duì)列信息

Map params = new HashMap<>();

// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值

params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值

params.put("x-dead-letter-routing-key", "lisi");

String normalQueue = "normal-queue";

channel.queueDeclare(normalQueue, false, false, false, params);

channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer01 接收到消息"+message);

};

channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {

});

}

}

3.消費(fèi)者02代碼( 以上步驟完成后 啟動(dòng) 2 C2 消費(fèi)者 它消費(fèi)死信隊(duì)列里面的消息)

package com.example.rabbitmq.demo7;

import com.example.rabbitmq.demo2.RabbitMqUtils;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* TTL時(shí)間過(guò)期

*/

public class Consumer02 {

private static final String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

String deadQueue = "dead-queue";

channel.queueDeclare(deadQueue, false, false, false, null);

channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

System.out.println(" 等待接收死信隊(duì)列消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);

};

channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {

});

}

}

2)隊(duì)列達(dá)到最大長(zhǎng)度

1.消息生產(chǎn)者代碼去掉 TTL 屬性

/**

* 隊(duì)列達(dá)到最大長(zhǎng)度

*/

public class Producer {

private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 該信息是用作演示隊(duì)列個(gè)數(shù)限制

for (int i = 1; i <11 ; i++) {

String message="info"+i;

channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());

System.out.println(" 生產(chǎn)者發(fā)送消息:"+message);

}

}

}

}

2.C1 消費(fèi)者修改以下代碼 ( 啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)

添加 params.put("x-max-length",6);

/**

* 隊(duì)列達(dá)到最大長(zhǎng)度

*/

public class Consumer01 {

// 普通交換機(jī)名稱

private static final String NORMAL_EXCHANGE = "normal_exchange";

// 死信交換機(jī)名稱

private static final String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明死信和普通交換機(jī) 類型為 direct

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明死信隊(duì)列

String deadQueue = "dead-queue";

channel.queueDeclare(deadQueue, false, false, false, null);

// 死信隊(duì)列綁定死信交換機(jī)與 routingkey

channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

// 正常隊(duì)列綁定死信隊(duì)列信息

Map params = new HashMap<>();

// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值

params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值

params.put("x-dead-letter-routing-key", "lisi");

params.put("x-max-length",6);

String normalQueue = "normal-queue";

channel.queueDeclare(normalQueue, false, false, false, params);

channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("Consumer01 接收到消息"+message);

};

channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {

});

}

}

注意此時(shí)需要把原先隊(duì)列刪除 因?yàn)閰?shù)改變了

3.C2 消費(fèi)者代碼不變( 啟動(dòng) C2 消費(fèi)者)

3)消息被拒

1.消息生產(chǎn)者代碼同上生產(chǎn)者一致

2.C1 消費(fèi)者代碼( 啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息)

/**

* 消息被拒

*/

public class Consumer01 {

// 普通交換機(jī)名稱

private static final String NORMAL_EXCHANGE = "normal_exchange";

// 死信交換機(jī)名稱

private static final String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] argv) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 聲明死信和普通交換機(jī) 類型為 direct

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 聲明死信隊(duì)列

String deadQueue = "dead-queue";

channel.queueDeclare(deadQueue, false, false, false, null);

// 死信隊(duì)列綁定死信交換機(jī)與 routingkey

channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

// 正常隊(duì)列綁定死信隊(duì)列信息

Map params = new HashMap<>();

// 正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值

params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

// 正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值

params.put("x-dead-letter-routing-key", "lisi");

String normalQueue = "normal-queue";

channel.queueDeclare(normalQueue, false, false, false, params);

channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

System.out.println(" 等待接收消息.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

if(message.equals("info5")){

System.out.println("Consumer01 接收到消息" + message + " 并拒絕簽收該消息");

//requeue 設(shè)置為 false 代表拒絕重新入隊(duì) 該隊(duì)列如果配置了死信交換機(jī)將發(fā)送到死信隊(duì)列中

channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

}else {

System.out.println("Consumer01 接收到消息"+message);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

};

boolean autoAck = false;

channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {

});

}

}

3.C2 消費(fèi)者代碼不變

啟動(dòng)消費(fèi)者 01 然后再啟動(dòng)消費(fèi)者 02

注意此時(shí)需要把原先隊(duì)列刪除 因?yàn)閰?shù)改變了

生產(chǎn)者如何獲取消費(fèi)結(jié)果

Rocketmq 自帶全局消息id,能夠根據(jù)該全局消息獲取消費(fèi)結(jié)果

原理: 生產(chǎn)者投遞消息到mq服務(wù)器,mq服務(wù)器端在這時(shí)候返回一個(gè)全局的消息id,當(dāng)我們消費(fèi)者消費(fèi)該消息成功之后,消費(fèi)者會(huì)給我們mq服務(wù)器端發(fā)送通知標(biāo)記該消息消費(fèi)成功。

生產(chǎn)者獲取到該消息全局id,每隔2s時(shí)間調(diào)用mq服務(wù)器端接口查詢?cè)撓⑹欠裼斜幌M(fèi)成功。

冪等性

1、概念

用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。

舉個(gè)最簡(jiǎn)單的例子,那就是支付,用戶購(gòu)買商品后支付,支付扣款成功,但是返回結(jié)果的時(shí)候網(wǎng)絡(luò)異常,此時(shí)錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時(shí)會(huì)進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額發(fā)現(xiàn)多扣錢了,流水記錄也變成了兩條。在以前的單應(yīng)用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務(wù)中即可,發(fā)生錯(cuò)誤立即回滾,但是再響應(yīng)客戶端的時(shí)候也有可能出現(xiàn)網(wǎng)絡(luò)中斷或者異常等等

消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。

2、解決冪等性問(wèn)題

MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫(xiě)個(gè)唯一標(biāo)識(shí)比如時(shí)間戳 或者 UUID 或者訂單消費(fèi)者消費(fèi) MQ 中的消息也可利用 MQ 的該 id 來(lái)判斷,或者可按自己的規(guī)則生成一個(gè)全局唯一 id,每次消費(fèi)消息時(shí)用該 id 先判斷該消息是否已消費(fèi)過(guò)。

主流的有兩種解決思路:

a.唯一 ID+指紋碼機(jī)制,利用數(shù)據(jù)庫(kù)主鍵去重

指紋碼:我們的一些規(guī)則或者時(shí)間戳加別的服務(wù)給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基本都是由我們的業(yè)務(wù)規(guī)則拼接而來(lái),但是一定要保證唯一性,然后就利用查詢語(yǔ)句進(jìn)行判斷這個(gè) id 是否存在數(shù)據(jù)庫(kù)中,優(yōu)勢(shì)就是實(shí)現(xiàn)簡(jiǎn)單就一個(gè)拼接,然后查詢判斷是否重復(fù);劣勢(shì)就是在高并發(fā)時(shí),如果是單個(gè)數(shù)據(jù)庫(kù)就會(huì)有寫(xiě)入性能瓶頸當(dāng)然也可以采用分庫(kù)分表提升性能,但也不是我們最推薦的方式。

b.利用 redis 的原子性去實(shí)現(xiàn)

利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實(shí)現(xiàn)不重復(fù)消費(fèi)

延遲隊(duì)列

1、概念

延時(shí)隊(duì)列,隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。

2、延遲隊(duì)列使用場(chǎng)景

1.訂單在十分鐘之內(nèi)未支付則自動(dòng)取消

2.新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。

3.用戶注冊(cè)成功后,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒。

4.用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。

5.預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議

3、RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。換句話說(shuō),如果一條消息設(shè)置了 TTL 屬性或者進(jìn)入了設(shè)置 TTL 屬性的隊(duì)列,那么這條消息如果在 TTL 設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為"死信"。如果同時(shí)配置了隊(duì)列的 TTL 和消息的TTL,那么較小的那個(gè)值將會(huì)被使用,有兩種方式設(shè)置 TTL。

① 消息設(shè)置 TTL

一種方式便是針對(duì)每條消息設(shè)置 TTL

② 隊(duì)列設(shè)置 TTL

一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性

③ 二者區(qū)別

第一種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)橄⑹欠襁^(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間;如果設(shè)置了隊(duì)列的 TTL 屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄(如果配置了死信隊(duì)列被丟到死信隊(duì)列中)。另外,還需要注意的一點(diǎn)是,如果不設(shè)置 TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期,如果將 TTL 設(shè)置為 0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。

4、延遲隊(duì)列案例

創(chuàng)建兩個(gè)隊(duì)列 QA 和 QB,兩者隊(duì)列 TTL 分別設(shè)置為 10S 和 40S,然后在創(chuàng)建一個(gè)交換機(jī) X 和死信交換機(jī) Y,它們的類型都是 direct,創(chuàng)建一個(gè)死信隊(duì)列 QD,它們的綁定關(guān)系如下:

配置類

package com.example.orderproducerconsumer.config;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class TtlQueueConfig {

public static final String X_EXCHANGE = "X";

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

public static final String DEAD_LETTER_QUEUE = "QD";

// 聲明 xExchange

@Bean("xExchange")

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

// 聲明 xExchange

@Bean("yExchange")

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);

}

// 聲明隊(duì)列 A ttl 為 10s 并綁定到對(duì)應(yīng)的死信交換機(jī)

@Bean("queueA")

public Queue queueA(){

Map args = new HashMap<>(3);

// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)

args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 聲明當(dāng)前隊(duì)列的死信路由 key

args.put("x-dead-letter-routing-key", "YD");

// 聲明隊(duì)列的 TTL

args.put("x-message-ttl", 10000);

return QueueBuilder.durable(QUEUE_A).withArguments(args).build();

}

// 聲明隊(duì)列 A 綁定 X 交換機(jī)

@Bean

public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

// 聲明隊(duì)列 B ttl 為 40s 并綁定到對(duì)應(yīng)的死信交換機(jī)

@Bean("queueB")

public Queue queueB(){

Map args = new HashMap<>(3);

// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)

args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 聲明當(dāng)前隊(duì)列的死信路由 key

args.put("x-dead-letter-routing-key", "YD");

// 聲明隊(duì)列的 TTL

args.put("x-message-ttl", 40000);

return QueueBuilder.durable(QUEUE_B).withArguments(args).build();

}

// 聲明隊(duì)列 B 綁定 X 交換機(jī)

@Bean

public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queue1B).to(xExchange).with("XB");

}

// 聲明死信隊(duì)列 QD

@Bean("queueD")

public Queue queueD(){

return new Queue(DEAD_LETTER_QUEUE);

}

// 聲明死信隊(duì)列 QD 綁定關(guān)系

@Bean

public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,

@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

}

消息生產(chǎn)者

package com.example.orderproducerconsumer.controller;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j

@RequestMapping("ttl")

@RestController

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("sendMsg/{message}")

public void sendMsg(@PathVariable String message){

log.info(" 當(dāng)前時(shí)間:{}, 發(fā)送一條信息給兩個(gè) TTL 隊(duì)列:{}", new Date(), message);

rabbitTemplate.convertAndSend("X", "XA", " 消息來(lái)自 ttl 為 為 10S 的隊(duì)列: "+message);

rabbitTemplate.convertAndSend("X", "XB", " 消息來(lái)自 ttl 為 為 40S 的隊(duì)列: "+message);

}

}

消息消費(fèi)者

package com.example.orderproducerconsumer.controller;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.Date;

@Slf4j

@Component

public class DeadLetterQueueConsumer {

@RabbitListener(queues = "QD")

public void receiveD(Message message, Channel channel) throws IOException {

String msg = new String(message.getBody());

log.info(" 當(dāng)前時(shí)間:{}, 收到死信隊(duì)列信息{}", new Date().toString(), msg);

}

}

發(fā)起一個(gè)請(qǐng)求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻

第一條消息在 10S 后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在 40S 之后變成了死信消息,然后被消費(fèi)掉,這樣一個(gè)延時(shí)隊(duì)列就打造完成了。

不過(guò),如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,這里只有 10S 和 40S兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理,那么就需要增加 TTL 為一個(gè)小時(shí)的隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求?

5、延時(shí)隊(duì)列優(yōu)化

在這里新增了一個(gè)隊(duì)列 QC,綁定關(guān)系如下,該隊(duì)列不設(shè)置 TTL 時(shí)間

配置類

@Component

public class MsgTtlQueueConfig {

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

public static final String QUEUE_C = "QC";

// 聲明隊(duì)列 C 死信交換機(jī)

@Bean("queueC")

public Queue queueB(){

Map args = new HashMap<>(3);

// 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)

args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 聲明當(dāng)前隊(duì)列的死信路由 key

args.put("x-dead-letter-routing-key", "YD");

// 沒(méi)有聲明 TTL 屬性

return QueueBuilder.durable(QUEUE_C).withArguments(args).build();

}

// 聲明隊(duì)列 B 綁定 X 交換機(jī)

@Bean

public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,

@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueC).to(xExchange).with("XC");

}

}

消息生產(chǎn)者

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")

public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {

rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{

correlationData.getMessageProperties().setExpiration(ttlTime);

return correlationData;

});

log.info(" 當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng){} 毫秒 TTL 信息給隊(duì)列 C:{}", new Date(),ttlTime, message);

}

發(fā)起請(qǐng)求

http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000

http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。

6、Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列

上文中提到的問(wèn)題,確實(shí)是一個(gè)問(wèn)題,如果不能實(shí)現(xiàn)在消息粒度上的 TTL,并使其在設(shè)置的 TTL 時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列。那如何解決呢,接下來(lái)我們就去解決該問(wèn)題。

安裝延時(shí)隊(duì)列插件

在官網(wǎng)上下載 Community Plugins | RabbitMQ,下載

rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。

進(jìn)入 RabbitMQ 的安裝目錄下的 plgins 目錄,執(zhí)行下面命令讓該插件生效,然后重啟 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

7、插件代碼實(shí)現(xiàn)

在這里新增了一個(gè)隊(duì)列 delayed.queue,一個(gè)自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:

在我們自定義的交換機(jī)中,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并不會(huì)立即投遞到目標(biāo)隊(duì)列中,而是存儲(chǔ)在 mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時(shí)間時(shí),才投遞到目標(biāo)隊(duì)列中。

配置類

@Configuration

public class DelayedQueueConfig {

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@Bean

public Queue delayedQueue() {

return new Queue(DELAYED_QUEUE_NAME);

}

// 自定義交換機(jī) 我們?cè)谶@里定義的是一個(gè)延遲交換機(jī)

@Bean

public CustomExchange delayedExchange() {

Map args = new HashMap<>();

// 自定義交換機(jī)的類型

args.put("x-delayed-type", "direct");

return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,

args);

}

@Bean

public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,

@Qualifier("delayedExchange") CustomExchange

delayedExchange) {

return

BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();

}

}

消息生產(chǎn)者

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@GetMapping("sendDelayMsg/{message}/{delayTime}")

public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {

rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,

correlationData ->{

correlationData.getMessageProperties().setDelay(delayTime);

return correlationData;

});

log.info(" 當(dāng) 前 時(shí) 間 : {}, 發(fā) 送 一 條 延 遲 {} 毫 秒 的 信 息 給 隊(duì) 列 delayed.queue:{}", new

Date(),delayTime, message);

}

消息消費(fèi)者

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

@RabbitListener(queues = DELAYED_QUEUE_NAME)

public void receiveDelayedQueue(Message message){

String msg = new String(message.getBody());

log.info(" 當(dāng)前時(shí)間:{}, 收到延時(shí)隊(duì)列的消息:{}", new Date().toString(), msg);

}

發(fā)起請(qǐng)求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期

8、總結(jié)

延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。

優(yōu)先級(jí)隊(duì)列

使用場(chǎng)景

在我們系統(tǒng)中有一個(gè)訂單催付的場(chǎng)景,我們的客戶在天貓下的訂單,淘寶會(huì)及時(shí)將訂單推送給我們,如果在用戶設(shè)定的時(shí)間內(nèi)未付款那么就會(huì)給用戶推送一條短信提醒,很簡(jiǎn)單的一個(gè)功能對(duì)吧,但是,tmall商家對(duì)我們來(lái)說(shuō),肯定是要分大客戶和小客戶的對(duì)吧,比如像蘋果,小米這樣大商家一年起碼能給我們創(chuàng)造很大的利潤(rùn),所以理應(yīng)當(dāng)然,他們的訂單必須得到優(yōu)先處理,而曾經(jīng)我們的后端系統(tǒng)是使用 redis 來(lái)存放的定時(shí)輪詢,大家都知道 redis 只能用 List 做一個(gè)簡(jiǎn)簡(jiǎn)單單的消息隊(duì)列,并不能實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)的場(chǎng)景,所以訂單量大了后采用 RabbitMQ 進(jìn)行改造和優(yōu)化,如果發(fā)現(xiàn)是大客戶的訂單給一個(gè)相對(duì)比較高的優(yōu)先級(jí),否則就是默認(rèn)優(yōu)先級(jí)。

如何添加

a.控制臺(tái)頁(yè)面添加

b.隊(duì)列中代碼添加優(yōu)先級(jí)

Map params = new HashMap();

params.put("x-max-priority", 10);

channel.queueDeclare("hello", true, false, false, params);

c.消息中代碼添加優(yōu)先級(jí)

AMQP.BasicProperties properties = new

AMQP.BasicProperties().builder().priority(5).build();

d.注意事項(xiàng)

要讓隊(duì)列實(shí)現(xiàn)優(yōu)先級(jí)需要做的事情有如下事情:隊(duì)列需要設(shè)置為優(yōu)先級(jí)隊(duì)列,消息需要設(shè)置消息的優(yōu)先級(jí),消費(fèi)者需要等待消息已經(jīng)發(fā)送到隊(duì)列中才去消費(fèi)因?yàn)?,這樣才有機(jī)會(huì)對(duì)消息進(jìn)行排序

優(yōu)先級(jí)隊(duì)列案例

消息生產(chǎn)者

public class Producer {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel();) {

// 給消息賦予一個(gè) priority 屬性

AMQP.BasicProperties properties = new

AMQP.BasicProperties().builder().priority(5).build();

for (int i = 1; i <11; i++) {

String message = "info"+i;

if(i==5){

channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

}else{

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

}

System.out.println(" 發(fā)送消息完成:" + message);

}

}

}

}

消息消費(fèi)者

public class Consumer {

private static final String QUEUE_NAME="hello";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

// 設(shè)置隊(duì)列的最大優(yōu)先級(jí) 最大可以設(shè)置到 255 官網(wǎng)推薦 1-10 如果設(shè)置太高比較吃內(nèi)存和 CPU

Map params = new HashMap();

params.put("x-max-priority", 10);

channel.queueDeclare(QUEUE_NAME, true, false, false, params);

System.out.println(" 消費(fèi)者啟動(dòng)等待消費(fèi)......");

DeliverCallback deliverCallback=(consumerTag, delivery)->{

String receivedMessage = new String(delivery.getBody());

System.out.println(" 接收到消息:"+receivedMessage);

};

channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{

System.out.println(" 消費(fèi)者無(wú)法消費(fèi) 消息時(shí)調(diào)用,如隊(duì)列被刪除");

});

}

}

惰性隊(duì)列

1. 使用場(chǎng)景

RabbitMQ 從 3.6.0 版本開(kāi)始引入了惰性隊(duì)列的概念。惰性隊(duì)列會(huì)盡可能的將消息存入磁盤中,而在消費(fèi)者消費(fèi)到相應(yīng)的消息時(shí)才會(huì)被加載到內(nèi)存中,它的一個(gè)重要的設(shè)計(jì)目標(biāo)是能夠支持更長(zhǎng)的隊(duì)列,即支持更多的消息存儲(chǔ)。當(dāng)消費(fèi)者由于各種各樣的原因(比如消費(fèi)者下線、宕機(jī)亦或者是由于維護(hù)而關(guān)閉等)而致使長(zhǎng)時(shí)間內(nèi)不能消費(fèi)消息造成堆積時(shí),惰性隊(duì)列就很有必要了。

默認(rèn)情況下,當(dāng)生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時(shí)候,隊(duì)列中的消息會(huì)盡可能的存儲(chǔ)在內(nèi)存之中,這樣可以更加快速的將消息發(fā)送給消費(fèi)者。即使是持久化的消息,在被寫(xiě)入磁盤的同時(shí)也會(huì)在內(nèi)存中駐留一份備份。當(dāng) RabbitMQ 需要釋放內(nèi)存的時(shí)候,會(huì)將內(nèi)存中的消息換頁(yè)至磁盤中,這個(gè)操作會(huì)耗費(fèi)較長(zhǎng)的時(shí)間,也會(huì)阻塞隊(duì)列的操作,進(jìn)而無(wú)法接收新的消息。雖然 RabbitMQ 的開(kāi)發(fā)者們一直在升級(jí)相關(guān)的算法,但是效果始終不太理想,尤其是在消息量特別大的時(shí)候。

2. 兩種模式

隊(duì)列具備兩種模式:default 和 lazy。默認(rèn)的為 default 模式,在 3.6.0 之前的版本無(wú)需做任何變更。lazy模式即為惰性隊(duì)列的模式,可以通過(guò)調(diào)用 channel.queueDeclare 方法的時(shí)候在參數(shù)中設(shè)置,也可以通過(guò)Policy 的方式設(shè)置,如果一個(gè)隊(duì)列同時(shí)使用這兩種方式設(shè)置的話,那么 Policy 的方式具備更高的優(yōu)先級(jí)。

如果要通過(guò)聲明的方式改變已有隊(duì)列的模式的話,那么只能先刪除隊(duì)列,然后再重新聲明一個(gè)新的。在隊(duì)列聲明的時(shí)候可以通過(guò)“x-queue-mode”參數(shù)來(lái)設(shè)置隊(duì)列的模式,取值為“default”和“l(fā)azy”。下面示例中演示了一個(gè)惰性隊(duì)列的聲明細(xì)節(jié):

Map args = new HashMap();

args.put("x-queue-mode", "lazy");

channel.queueDeclare("myqueue", false, false, false, args);

3. 內(nèi)存開(kāi)銷對(duì)比

在發(fā)送 1 百萬(wàn)條消息,每條消息大概占 1KB 的情況下,普通隊(duì)列占用內(nèi)存是 1.2GB,而惰性隊(duì)列僅僅占用 1.5MB

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ知識(shí)點(diǎn)

http://yzkb.51969.com/

相關(guān)閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18963736.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄