柚子快報邀請碼778899分享:分布式 rabbitMq
柚子快報邀請碼778899分享:分布式 rabbitMq
AMQP
??????????AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標準高級消息隊列是什協(xié)議,是應(yīng)用層協(xié)議的一個開放標準,為面向消息的中間件設(shè)計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。Erlang中的實現(xiàn)有RabbitMQ等。
(應(yīng)用層協(xié)議,開放標準,與實現(xiàn)無關(guān))
RabbitMQ是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語言編寫的,而集群和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
RabbitMQ 是一個可靠且成熟的消息傳遞和流代理,易于部署在云環(huán)境、本地和本地計算機上。它目前被全球數(shù)百萬人使用。
AMQP定義網(wǎng)絡(luò)協(xié)議和代理服務(wù)如下
一套確定的消息交換功能,也就是“高級消息交換協(xié)議模型”?路由、存儲、消息交換
AMQP模型
術(shù)語
連接(Connection):一個網(wǎng)絡(luò)連接,比如TCP/IP套接字連接。
會話(Session):端點之間的命名對話。在一個會話上下文中,保證“恰好傳遞一次”。
信道(Channel):多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道。為會話提供物理傳輸介質(zhì)。
客戶端(Client):AMQP連接或者會話的發(fā)起者。AMQP是非對稱的,客戶端生產(chǎn)和消費消息,服務(wù)器存儲和路由這些消息。
服務(wù)器(Server):接受客戶端連接,實現(xiàn)AMQP消息隊列和路由功能的進程。也稱為“消息代理”。
交換器(Exchange):服務(wù)器中的實體,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。
交換器類型(Exchange Type):基于不同路由語義的交換器類。
消息隊列(Message Queue):一個命名實體,用來保存消息直到發(fā)送給消費者。
綁定器(Binding):消息隊列和交換器之間的關(guān)聯(lián)。
綁定器關(guān)鍵字(Binding Key):綁定的名稱。一些交換器類型可能使用這個名稱作為定義綁定器路由行為的模式。
路由關(guān)鍵字(Routing Key):一個消息頭,交換器可以用這個消息頭決定如何路由某條消息。
持久存儲(Durable):一種服務(wù)器資源,當服務(wù)器重啟時,保存的消息數(shù)據(jù)不會丟失。
臨時存儲(Transient):一種服務(wù)器資源,當服務(wù)器重啟時,保存的消息數(shù)據(jù)會丟失。
持久化(Persistent):服務(wù)器將消息保存在可靠磁盤存儲中,當服務(wù)器重啟時,消息不會丟失。
非持久化(Non-Persistent):服務(wù)器將消息保存在內(nèi)存中,當服務(wù)器重啟時,消息可能丟失。
虛擬主機(Virtual Host):一批交換器、消息隊列和相關(guān)對象。虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務(wù)器域。客戶端應(yīng)用程序在登錄到服務(wù)器之后,可以選擇一個虛擬主機。
主題:通常指發(fā)布消息;AMQP規(guī)范用一種或多種交換器來實現(xiàn)主題。
許可證
自 2007 年首次發(fā)布以來,RabbitMQ 是免費和開源軟件。此外,Broadcom 還提供一系列商業(yè)產(chǎn)品。
RabbitMQ 在 Apache 許可證 2.0 和 Mozilla 公共許可證 2 下獲得雙重許可。您可以隨心所欲地使用和修改 RabbitMQ。
github
RabbitMQ · GitHub
rabbitmq-server?
?rabbitmq-java-client?
rabbitmq-website?
安裝
最新版本是?RabbitMQ?的 3.13.3。
?docker 鏡像:hub.docker.com
# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
RabbitMQ 服務(wù)器
?Erlang 版本要求:? Erlang 和 ????????.26.x? ?25.x
Installing on Windows | RabbitMQ
rabbitmq-server-generic-unix-3.13.3.tar.xzhttps://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.3/rabbitmq-server-generic-unix-3.13.3.tar.xz
CLI 工具
RabbitMQ 節(jié)點通常使用?PowerShell?中的?CLI 工具進行管理、檢查和操作。
在 Windows 上,與其他平臺相比,CLI 工具具有后綴。例如,在 Windows 上被調(diào)用為 ..batrabbitmqctlrabbitmqctl.bat
要了解各種 RabbitMQ CLI 工具提供的命令,請使用以下命令:help
# lists commands provided by rabbitmqctl.bat
rabbitmqctl.bat help
# lists commands provided by rabbitmq-diagnostics.bat
rabbitmq-diagnostics.bat help
# ...you guessed it!
rabbitmq-plugins.bat help
管理 RabbitMQ 節(jié)點
管理服務(wù)
可以在“開始”菜單中找到指向 RabbitMQ 目錄的鏈接。
還有一個指向命令提示符窗口的鏈接,該窗口 將在 sbin 目錄的“開始”菜單中啟動。這是 運行命令行工具的最便捷方式。
請注意,CLI 工具必須對目標 RabbitMQ 節(jié)點進行身份驗證。
停止節(jié)點
要停止代理或檢查其狀態(tài),請使用 in(以管理員身份)。rabbitmqctl.batsbin
rabbitmqctl.bat stop
檢查節(jié)點狀態(tài)
以下?CLI 命令運行基本運行狀況檢查,并顯示有關(guān)節(jié)點的一些信息(如果節(jié)點正在運行)。
# A basic health check of both the node and CLI tool connectivity/authentication
rabbitmqctl.bat status
為了讓它工作, 必須滿足兩個條件:
節(jié)點必須正在運行rabbitmqctl.bat必須能夠向節(jié)點進行身份驗證
1、發(fā)布者、交換機、隊列、消費者都可以有多個。同時因為 AMQP 是一個網(wǎng)絡(luò)協(xié)議,所以這個過程中的發(fā)布者,消費者,消息代理 可以分別存在于不同的設(shè)備上。
2、發(fā)布者發(fā)布消息時可以給消息指定各種消息屬性(Message Meta-data)。有些屬性有可能會被消息代理(Brokers)使用,然而其他的屬性則是完全不透明的,它們只能被接收消息的應(yīng)用所使用。
3、從安全角度考慮,網(wǎng)絡(luò)是不可靠的,又或是消費者在處理消息的過程中意外掛掉,這樣沒有處理成功的消息就會丟失。基于此原因,AMQP 模塊包含了一個消息確認(Message Acknowledgements)機制:當一個消息從隊列中投遞給消費者后,不會立即從隊列中刪除,直到它收到來自消費者的確認回執(zhí)(Acknowledgement)后,才完全從隊列中刪除。
4、在某些情況下,例如當一個消息無法被成功路由時(無法從交換機分發(fā)到隊列),消息或許會被返回給發(fā)布者并被丟棄?;蛘撸绻⒋韴?zhí)行了延期操作,消息會被放入一個所謂的死信隊列中。此時,消息發(fā)布者可以選擇某些參數(shù)來處理這些特殊情況。
Exchange交換機
交換機是用來發(fā)送消息的 AMQP 實體。
交換機拿到一個消息之后將它路由給一個或零個隊列。
它使用哪種路由算法是由交換機類型和綁定(Bindings)規(guī)則所決定的。
交換機類型
Direct Exchange(直連交換機) (Empty String) and amq.direct
Fanout Exchange(扇形交換機) amq.fanout
Topic Exchange(主題交換機) amq.topic
Headers Exchange(頭交換機) amq.match(and amq.headers in rabbitMQ)
除交換機類型外,在聲明交換機時還可以附帶其它屬性,分別是:
Name
Durability (消息代理重啟后,交換機是否還存在)
Auto-delete (當所有與之綁定的隊列都完成了對此交換機的使用后,刪除它)
Aruguments (依賴代理本身)
交換機狀態(tài)
持久(durable)、暫存(transient)。
durable交換機消息代理重啟后依舊存在
暫存的交換機則不會(它們需要在代理再次上線后重新被聲明)
并不是所有的應(yīng)用場景都需要持久化的交換機。
默認交換機
默認交換機(default exchange)實際上是一個由消息代理預(yù)先聲明好的沒有名字(名字為空字符串)的直連交換機(direct exchange)。
它有一個特殊的屬性使得它對于簡單應(yīng)用特別有用處:那就是每個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
隊列教程(Queue tutorials)
1. "Hello World!"??
編寫程序以發(fā)送和接收來自命名隊列的消息
Sending
We'll call our message publisher (sender)?Send?and our message consumer (receiver)?Recv. The publisher will connect to RabbitMQ, send a single message, then exit.
rabbitmq-tutorials/java/Send.java at main · rabbitmq/rabbitmq-tutorials · GitHub
Send.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
// 設(shè)置類并命名隊列:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
...
}
}
//創(chuàng)建與服務(wù)器的連接:
x
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
}
//要發(fā)送,我們必須聲明一個隊列供我們發(fā)送到;然后我們可以發(fā)布一條消息 到隊列中,所有這些都在 try-with-resources 語句中
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
/**全部send代碼**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
接收
們的消費者會收聽來自 RabbitMQ,所以與發(fā)布單個消息的發(fā)布者不同,我們將 讓使用者保持運行以偵聽消息并將其打印出來。
Recv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
//設(shè)置與發(fā)布者相同;我們打開一個連接和一個 channel,并聲明我們將要從中使用的隊列。 請注意,這與發(fā)布到的隊列匹配。
//注意,我們也在此處聲明隊列。因為我們可能會開始 消費者先于發(fā)布者,我們要確保隊列存在 在我們嘗試從中消費消息之前。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
//我們將告訴服務(wù)器將來自 隊列。由于它將異步推送我們消息,因此我們提供了一個 以對象的形式進行回調(diào),該對象將緩沖消息,直到 我們已準備好使用它們。這就是子類的作用。DeliverCallback
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
//完整Recv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
2. Work Queues(任務(wù)隊列)
創(chuàng)建一個工作隊列,用于分發(fā) 多個工作人員之間的耗時任務(wù)。
工作隊列(又名:任務(wù)隊列):背后的主要思想是避免 立即執(zhí)行資源密集型任務(wù),而必須等待 它完成。相反,我們將任務(wù)安排在以后完成。我們將任務(wù)封裝為消息并將其發(fā)送到隊列。正在運行的工作進程 在后臺將彈出任務(wù)并最終執(zhí)行 工作。當您運行許多工作線程時,任務(wù)將在它們之間共享。
示例:
NewTask.java
Worker.java
NewTask.java 將任務(wù)安排到我們的工作隊列中
String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Worker.java 它將處理 傳遞消息并執(zhí)行任務(wù)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? String message = new String(delivery.getBody(), "UTF-8");
? System.out.println(" [x] Received '" + message + "'");
? try {
? ? doWork(message);
? } finally {
? ? System.out.println(" [x] Done");
? }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
模擬執(zhí)行時間的假任務(wù):
private static void doWork(String task) throws InterruptedException {
? ? for (char ch: task.toCharArray()) {
? ? ? ? if (ch == '.') Thread.sleep(1000);
? ? }
}
消息確認 autoAck=true false
// accept only one unack-ed message at a time (see below)
channel.basicQos(1);?
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? String message = new String(delivery.getBody(), "UTF-8");
? System.out.println(" [x] Received '" + message + "'");
? try {
? ? doWork(message);
? } finally {
? ? System.out.println(" [x] Done");
? ? channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
? }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
消息持久性(已經(jīng)聲明相同名稱的隊列,此定義不生效)
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
//定義另外一個隊列 此更改需要應(yīng)用于生產(chǎn)者 和消費者代碼。queueDeclare
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
//我們需要將消息標記為持久性task_queue
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
? ? ? ? ? ? MessageProperties.PERSISTENT_TEXT_PLAIN,
? ? ? ? ? ? message.getBytes());
Fair dispatch(公平調(diào)度)
在有兩個工人的情況下,當所有 奇數(shù)消息很重,偶數(shù)消息很輕,一個工人會 一直很忙,另一個人幾乎不做任何工作。井 RabbitMQ 對此一無所知,仍然會調(diào)度 消息均勻。
發(fā)生這種情況是因為 RabbitMQ 只是在消息時調(diào)度消息 進入隊列。它不看未確認的數(shù)量 給消費者的消息。它只是盲目地發(fā)送每 n 條消息 到第 n 個消費者。
為了解決這個問題,we can use the?basicQos?method with the?prefetchCount?=?1?setting.。這告訴 RabbitMQ 不要給超過 一次向工作人員發(fā)送一條消息?;蛘?,換句話說,不要派遣 向工作人員發(fā)送一條新消息,直到它處理并確認 上一個。相反,它會將其分派給下一個尚未忙碌的工作人員。basicQosprefetchCount1
int prefetchCount = 1;
channel.basicQos(prefetchCount);
關(guān)于隊列大小的注意事項
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
以上內(nèi)容集成在一起?完整代碼 NewTask.java ?Worker.java
NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Worker.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
3. Publish/Subscribe(發(fā)布/訂閱)
一次向多個消費者發(fā)送消息
它將由兩個程序組成 - 第一個程序?qū)l(fā)出日志 消息,第二個將接收并打印它們。
在我們的日志記錄系統(tǒng)中,接收器程序的每個運行副本都將 獲取消息。這樣,我們將能夠運行一個接收器,并且 將日志定向到磁盤;同時,我們將能夠運行 另一個接收器,并在屏幕上查看日志。
從本質(zhì)上講,已發(fā)布的日志消息將廣播給所有人 接收器。
示例:構(gòu)建一個簡單的日志記錄 系統(tǒng)
回顧一下我們在前面的教程中介紹的內(nèi)容:
生產(chǎn)者是發(fā)送消息的用戶應(yīng)用程序。隊列是存儲消息的緩沖區(qū)。使用者是接收消息的用戶應(yīng)用程序。
RabbitMQ 中消息傳遞模型的核心思想是生產(chǎn)者 從不直接向隊列發(fā)送任何消息。實際上,很多時候 生產(chǎn)者甚至不知道消息是否會傳遞給任何 完全排隊。
相反,生產(chǎn)者只能向exchange發(fā)送消息。一個exchange是一件非常簡單的事情。一方面,它接收來自 生產(chǎn)者,另一方面,它將他們推到隊列中。交易所 必須確切地知道如何處理它收到的消息。應(yīng)該是這樣嗎 附加到特定隊列?是否應(yīng)該將其附加到許多隊列中? 或者它應(yīng)該被丟棄。其規(guī)則由交換類型定義。
There are a few exchange types available:?direct,?topic,?headers?and?fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it?logs:
channel.exchangeDeclare("logs", "fanout");
The fanout exchange is very simple. (fanout exchange)
Listing exchanges
sudo rabbitmqctl list_exchanges
在此列表中,將有一些交換和默認(未命名) 交換。這些是默認創(chuàng)建的,但不太可能需要 目前使用它們。amq.*
In this list there will be some?amq.*?exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.
Nameless exchange
在前幾部分中,我們對交換一無所知, 但仍然能夠?qū)⑾l(fā)送到隊列。這是可能的 因為我們使用的是默認交換,我們用空字符串 () 來標識它。""
In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").
回想一下我們之前是如何發(fā)布消息的:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數(shù)是exchange的名稱。 空字符串表示默認或無名交換:消息是 路由到名稱為 指定的隊列(如果存在)。routingKey
現(xiàn)在,我們可以改為發(fā)布到我們命名的exchange:
channel.basicPublish( "logs", "", null, message.getBytes());
Temporary queues(臨時隊列)
以前我們使用的隊列具有 特定名稱(還記得和?能夠命名 排隊對我們來說至關(guān)重要——我們需要將工人指向 相同的隊列。當您 希望在生產(chǎn)者和消費者之間共享隊列。hello?task_queue
但對于我們的記錄器來說,情況并非如此。我們想聽聽所有 日志消息,而不僅僅是其中的子集。我們是 也只對當前流動的消息感興趣,而不是對舊的消息感興趣 的。為了解決這個問題,我們需要兩件事。
首先,每當我們連接到 Rabbit 時,我們都需要一個新的空隊列。 為此,我們可以創(chuàng)建一個具有隨機名稱的隊列,或者, 更好的是 - 讓服務(wù)器為我們選擇一個隨機的隊列名稱。
其次,一旦我們斷開消費者的連接,隊列應(yīng)該是 自動刪除。
我們不提供任何參數(shù)時,我們會創(chuàng)建一個非持久的、獨占的、具有生成名稱的自動刪除隊列:queueDeclare()
String queueName = channel.queueDeclare().getQueue();
可以了解有關(guān)標志和其他隊列的更多信息 隊列指南中的屬性。exclusive
此時包含一個隨機隊列名稱。例如 它可能看起來像.queueNameamq.gen-JzTY20BRgKO-HjmUJj0wLg
We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a?binding.
已經(jīng)創(chuàng)建一個 fanout exchange 和一個 queue. 現(xiàn)在需要告訴exchange 為了發(fā)送消息到隊列,
這種exchange 和 隊列的關(guān)系叫做綁定
channel.queueBind(queueName, "logs", "");
從現(xiàn)在開始,交換會將消息附加到我們的隊列中。logs
列表綁定 Listing bindings(展示綁定關(guān)系)
可以使用以下方法列出現(xiàn)有綁定,
rabbitmqctl list_bindings
發(fā)出日志消息的 producer 程序看起來并不多?
我們現(xiàn)在希望將消息發(fā)布到我們的exchange,而不是 無名的。發(fā)送時我們需要提供一個,但它 交換的值被忽略。這是程序的代碼:logs?routingKey?fanout?
EmitLog.java
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
如您所見,在建立連接后,我們聲明了 交換。此步驟是必需的,因為發(fā)布到不存在的 禁止交換。
如果還沒有隊列綁定到交換,則消息將丟失, 但這對我們來說沒關(guān)系;如果還沒有消費者在聽,我們可以安全地丟棄該消息。
ReceiveLogs.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
驗證代碼是否實際 根據(jù)需要創(chuàng)建綁定和隊列。運行兩個程序后,您應(yīng)該會看到如下內(nèi)容:rabbitmqctl
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
4. Routing (路由)
創(chuàng)建了綁定
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是交換和隊列之間的關(guān)系。這可以 簡單地理解為:隊列對來自以下消息的消息感興趣 交換。
綁定可以采用額外的參數(shù)。為了避免 與參數(shù)混淆,我們將其稱為 .這就是我們?nèi)绾问褂面I創(chuàng)建綁定的方法:routingKey?basic_publish?binding key
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key含義取決于交換類型。我們之前使用的交易所只是忽略了它的 價值。fanout
Direct exchange
上一教程中的日志記錄系統(tǒng)廣播所有消息 給所有消費者。我們希望擴展它以允許過濾消息 基于其嚴重性。例如,我們可能想要一個程序 將日志消息寫入磁盤以僅接收嚴重錯誤,以及 不要在警告或信息日志消息上浪費磁盤空間。
我們使用的是fanout?exchange,這并沒有給我們太多 靈活性 - 它只能進行無意識的廣播。
我們將改用direct? exchange。背后的路由算法 direct?exchange很簡單 - 消息進入與消息完全匹配的隊列。
We will use a?direct?exchange instead. The routing algorithm behind a?direct?exchange is simple - a message goes to the queues whose?binding key?exactly matches the?routing key?of the message.
為了說明這一點,請考慮以下設(shè)置:
在此設(shè)置中,我們可以看到綁定了兩個隊列的交換 到它。第一個隊列綁定了綁定鍵,第二個隊列綁定了綁定鍵 有兩個綁定,一個帶有綁定鍵,另一個帶有綁定鍵 跟。directXorangeblackgreen
In this setup, we can see the?direct?exchange?X?with two queues bound to it. The first queue is bound with binding key?orange, and the second has two bindings, one with binding key?black?and the other one with?green.
在這樣的設(shè)置中,使用routing key orange發(fā)布到exchange的郵件將被路由到隊列Q1。routing key 為 black或green?將會到Q2。所有其他消息將被丟棄。
In such a setup a message published to the exchange with a routing key?orange?will be routed to queue?Q1. Messages with a routing key of?black?or?green?will go to?Q2. All other messages will be discarded.
Multiple bindings (多綁定)
It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between?X?and?Q1?with binding key?black. In that case, the?direct?exchange will behave like?fanout?and will broadcast the message to all the matching queues. A message with routing key?black?will be delivered to both?Q1?and?Q2.
合法的用相同的routing key 綁定多個隊列,在我們的例子中,我們能夠添加一個黑色的bingding key? 從X到Q1隊列,在這個例子中,direct exchange 的行為像fanout 廣播message 到所有的匹配隊列,一個消息,將被傳遞到Q1 和 Q2
發(fā)出日志 (Emitting logs)
我們將此模型用于日志記錄系統(tǒng)。取代fanout exchange。我們將發(fā)送一個direct exchange.
我們申請這個log作為一個routing key ,這樣接收程序?qū)⒖赡苓x擇想接收的。
As always, we need to create an exchange first:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
And we're ready to send a message:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
To?simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.
訂閱 Subscribing
接收消息的工作方式與上一教程中的工作方式相同,with one exception - we're going to create a new binding for each severity we're interested in.
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
Putting it all together
?EmitLogDirect.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
//..
}
ReceiveLogsDirect.java
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
5. Topics
在上一個教程中,我們改進了 記錄系統(tǒng)。而不是使用只能 虛擬廣播,我們用了一個,并獲得了一種可能性 有選擇地接收日志。fanout?direct
通過使用direct exchange 提升我們的系統(tǒng)柜,它仍然具有限制-不能基于multiple 條件進行路由
在我們的日志系統(tǒng)中,我們可能不僅要subscribe 訂閱日志 基于嚴重性,但也基于發(fā)出日志的源。 您可能從?syslog?unix 工具中知道這個概念,該工具 根據(jù)嚴重性(信息/警告/暴擊等)和設(shè)施路由日志 (auth/cron/kern...)。
這將給我們很大的靈活性——我們可能想聽聽 只有來自“cron”的嚴重錯誤,還有來自“kern”的所有日志。
為了在我們的日志系統(tǒng)中實現(xiàn)這一點,我們需要了解更多 topic exchange。
Topic exchange
信息發(fā)送到一個topic exchange 不能有任意值routing_key,必須是一個單詞列表,但通常它們指定一些特征 連接到消息,一些有效的路由,例如: "stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".可以有 路由鍵中的多個單詞,最多 255 個 字節(jié)
binding key也必須采用相同的形式。topic?exchange 背后的邏輯類似于一個 - 使用 direct的路由密鑰將傳遞到所有隊列 使用匹配的綁定鍵綁定。但是,有兩個重要的 綁定鍵的特殊情況:topic?direct
*(星號)可以完全代替一個單詞。#(hash) 可以替換零個或多個單詞。
在示例中解釋這一點是最簡單的:
我們創(chuàng)建了三個綁定:Q1 綁定了綁定鍵 “” Q2 帶有 “” 和 “”。*.orange.**.*.rabbitlazy.#
這些綁定可以概括為:
Q1 is interested in all the orange animals.Q2 wants to hear everything about rabbits, and everything about lazy animals.
路由鍵設(shè)置為“”的郵件 將傳送到兩個隊列。消息 “”也會去他們倆。另一方面 “” 將只轉(zhuǎn)到第一個隊列,并且 “”只到第二個。 僅傳遞到第二個隊列一次,即使它與兩個綁定匹配。 “” 與任何綁定都不匹配,因此它將被丟棄。quick.orange.rabbitlazy.orange.elephantquick.orange.foxlazy.brown.foxlazy.pink.rabbitquick.brown.fox
Putting it all together
EmitLogTopic.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}
ReceiveLogsTopic.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
6. RPC
,我們將使用 RabbitMQ 來構(gòu)建一個 RPC 系統(tǒng): 客戶端和可伸縮的 RPC 服務(wù)器。因為我們沒有任何耗時的東西 值得分發(fā)的任務(wù),我們將創(chuàng)建一個虛擬 RPC 返回斐波那契數(shù)列的服務(wù)。
客戶端界面
為了說明如何使用 RPC 服務(wù),我們將 創(chuàng)建一個簡單的客戶端類。它將公開一個名為 RPC 請求的方法,該方法將發(fā)送 RPC 請求并阻止,直到收到答案:call
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
回調(diào)隊列
一般來說,在 RabbitMQ 上執(zhí)行 RPC 很容易??蛻舳税l(fā)送請求 消息,服務(wù)器使用響應(yīng)消息進行回復(fù)。為了 收到我們需要發(fā)送的“回調(diào)”隊列地址的響應(yīng),其中包含 請求。我們可以使用默認隊列(在 Java 客戶端中是獨占的)。 讓我們試試看:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
我們需要這個新的導(dǎo)入:
import com.rabbitmq.client.AMQP.BasicProperties;
消息屬性
AMQP 0-9-1 協(xié)議預(yù)定義了一組 14 個屬性,這些屬性與 一條消息。大多數(shù)屬性很少使用,但 以下內(nèi)容:
deliveryMode:將消息標記為持久性(值為 ) 或瞬態(tài)(任何其他值)。你可能還記得這家酒店 從第二個教程開始。contentType:用于描述編碼的 mime 類型。 例如,對于常用的JSON編碼,這是一個很好的做法 將此屬性設(shè)置為: 。application/jsonreplyTo:常用于命名回調(diào)隊列。correlationId:用于將 RPC 響應(yīng)與請求相關(guān)聯(lián)。 RPC 將按如下方式工作: 對于 RPC 請求,客戶端發(fā)送一條具有兩個屬性的消息: ,設(shè)置為創(chuàng)建的匿名獨占隊列 只是為了請求,并且 設(shè)置為每個請求的唯一值。replyTocorrelationId請求將發(fā)送到隊列。rpc_queueRPC 工作線程(又名:服務(wù)器)正在等待該隊列上的請求。 當請求出現(xiàn)時,它會完成工作并發(fā)送一條消息,其中包含 結(jié)果返回給客戶端,使用字段中的隊列。replyTo客戶端等待應(yīng)答隊列中的數(shù)據(jù)。當消息 出現(xiàn)時,它會檢查屬性。如果匹配 請求中的值,它將響應(yīng)返回給 應(yīng)用。correlationId
RPCServer.java
import com.rabbitmq.client.*;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e);
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
}
}
RPCClient.java
mport com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.*;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException, ExecutionException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final CompletableFuture
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.complete(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.get();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
7. Publisher Confirms
使用發(fā)布者確認來使 確定已發(fā)布的消息已安全到達代理。我們會的 涵蓋使用發(fā)布者確認和解釋的幾種策略 他們的優(yōu)點和缺點。
Enabling Publisher Confirms on a Channel (在頻道上啟用發(fā)布確認)
發(fā)布者確認是 AMQP 0.9.1 協(xié)議的 RabbitMQ 擴展, 因此,默認情況下不啟用它們。發(fā)布者確認是 使用以下方法在通道級別啟用:confirmSelect
Channel channel = connection.createChannel();
channel.confirmSelect();
必須在您希望使用發(fā)布服務(wù)器的每個通道上調(diào)用此方法 證實。確認應(yīng)僅啟用一次,而不是針對發(fā)布的每條消息啟用。
策略#1:單獨發(fā)布消息
讓我們從最簡單的方法開始,使用確認進行發(fā)布, 也就是說,發(fā)布一條消息并同步等待其確認:
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
我們像往常一樣發(fā)布一條消息并等待其 用方法確認。 確認消息后,該方法將立即返回。如果 消息在超時內(nèi)未確認,或者如果它被 nack-ed(意思是 由于某種原因,經(jīng)紀人無法處理它),該方法將 拋出異常。異常的處理通常包括 在記錄錯誤消息和/或重試發(fā)送消息時。Channel#waitForConfirmsOrDie(long)
發(fā)布者確認是異步的嗎?
我們在開頭提到經(jīng)紀人確認發(fā)布 消息是異步的,但在第一個示例中,代碼等待 同步,直到消息得到確認??蛻魧嶋H上是 接收異步確認并相應(yīng)地取消阻止對 的調(diào)用。將其視為同步幫助程序 它依賴于引擎蓋下的異步通知。waitForConfirmsOrDie?waitForConfirmsOrDie
策略 #2:批量發(fā)布消息
為了改進我們之前的示例,我們可以發(fā)布一個批處理 的消息,并等待整個批次得到確認。 以下示例使用一個批次 100:
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
等待一批消息得到確認可以大大提高吞吐量 等待單個消息的確認(使用遠程 RabbitMQ 節(jié)點最多 20-30 次)。 一個缺點是,如果發(fā)生故障,我們不知道到底出了什么問題, 因此,我們可能需要在內(nèi)存中保留一整批內(nèi)容來記錄有意義的東西或 以重新發(fā)布消息。而且這個解決方案仍然是同步的,所以它 阻止消息的發(fā)布。
策略 #3:異步處理發(fā)布者確認
代理異步確認已發(fā)布的消息,只需要 要在客戶端上注冊回調(diào)以收到以下確認的通知:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
有 2 個回調(diào):一個用于確認的消息,一個用于 nack-ed 消息 (代理可能認為丟失的消息)。每個回調(diào)都有 2 參數(shù):
在發(fā)布之前,可以通過以下方式獲得序列號:Channel#getNextPublishSeqNo()
int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
將消息與序列號關(guān)聯(lián)的一種簡單方法是使用 地圖。假設(shè)我們想要發(fā)布字符串,因為它們很容易變成 用于發(fā)布的字節(jié)數(shù)組。下面是一個代碼示例,它使用映射來執(zhí)行以下操作 將發(fā)布序列號與郵件的字符串正文相關(guān)聯(lián):
ConcurrentNavigableMap
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
發(fā)布代碼現(xiàn)在使用地圖跟蹤出站消息。我們需要 在確認到達時清理此地圖并執(zhí)行諸如記錄警告之類的操作 當消息被 nack-ed:
ConcurrentNavigableMap
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
前面的示例包含一個回調(diào),該回調(diào)在以下情況下清理映射 確認到達。請注意,此回調(diào)同時處理單個和多個 證實。當確認到達時,將使用此回調(diào)(作為 的第一個參數(shù))。nack-ed 消息的回調(diào) 檢索郵件正文并發(fā)出警告。然后,它重用 上一次回調(diào)清理未完成的映射確認(是否 消息已確認或已確認,它們在地圖中的對應(yīng)條目 必須刪除。Channel#addConfirmListener
如何跟蹤未完成的確認?
我們的樣品使用 a 來跟蹤未完成的確認。 出于多種原因,這種數(shù)據(jù)結(jié)構(gòu)很方便。它允許 輕松將序列號與消息相關(guān)聯(lián)(無論消息數(shù)據(jù)如何) is)并輕松清理條目,直至給定的序列 ID(用于處理 多次確認/嘮叨)。最后,它支持并發(fā)訪問,因為 確認回調(diào)是在客戶端庫擁有的線程中調(diào)用的,該線程 應(yīng)與發(fā)布線程保持不同。ConcurrentNavigableMap
除了 復(fù)雜的映射實現(xiàn),例如使用簡單的并發(fā)哈希映射 以及一個用于跟蹤發(fā)布序列下限的變量,但 他們通常參與更多,不屬于教程。
總而言之,異步處理發(fā)布者確認通常需要 以下步驟:
重新發(fā)布 nack-ed 消息?
從相應(yīng)的 回調(diào),但應(yīng)避免這種情況,因為確認回調(diào)是 在不應(yīng)該有通道的 I/O 線程中調(diào)度 進行操作。更好的解決方案是在內(nèi)存中對消息進行排隊 由發(fā)布線程輪詢的隊列。像這樣的類是確認回調(diào)之間傳輸消息的一個很好的候選者 和發(fā)布線程。ConcurrentLinkedQueue
總結(jié)
某些應(yīng)用程序中,確保已發(fā)布的消息到達代理可能是必不可少的。 發(fā)布者確認是有助于滿足此要求的 RabbitMQ 功能。發(fā)行人 確認本質(zhì)上是異步的,但也可以同步處理它們。 沒有明確的方法來實現(xiàn)發(fā)布者確認,這通常會下降 應(yīng)用程序和整個系統(tǒng)中的約束。典型的技術(shù)包括:
PublisherConfirms.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BooleanSupplier;
public class PublisherConfirms {
static final int MESSAGE_COUNT = 50_000;
static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("localhost");
cf.setUsername("guest");
cf.setPassword("guest");
return cf.newConnection();
}
public static void main(String[] args) throws Exception {
publishMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
static void publishMessagesIndividually() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static void publishMessagesInBatch() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static void handlePublishConfirmsAsynchronously() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
ConcurrentNavigableMap
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
ch.basicPublish("", queue, null, body.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
}
long end = System.nanoTime();
System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
int waited = 0;
while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
Thread.sleep(100L);
waited += 100;
}
return condition.getAsBoolean();
}
}
MULTIPLE:這是一個布爾值。如果為 false,則僅確認/編輯一條消息,如果 true,所有序列號較低或相等的消息都將被確認/nack-ed。
提供一種將發(fā)布序列號與郵件相關(guān)聯(lián)的方法。在頻道上注冊確認監(jiān)聽器,以便在以下情況下收到通知 發(fā)布者 ack/nacks 到達以執(zhí)行適當?shù)牟僮?,例?記錄或重新發(fā)布 nack-ed 消息。發(fā)送到消息的序列號 在此步驟中,關(guān)聯(lián)機制可能還需要進行一些清理。在發(fā)布郵件之前跟蹤發(fā)布序列號。 序列號:標識已確認的數(shù)字 或嘮叨的消息。我們很快就會看到如何將其與已發(fā)布的消息相關(guān)聯(lián)。
單獨發(fā)布消息,同步等待確認:簡單,但非常 吞吐量有限。批量發(fā)布消息,等待批量同步確認:簡單、合理 吞吐量,但很難推理何時出現(xiàn)問題。異步處理:最佳性能和資源使用,在出錯時控制良好,但 可以參與正確實施。參考源碼:https://github.com/rabbitmq
柚子快報邀請碼778899分享:分布式 rabbitMq
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。