柚子快報(bào)邀請(qǐng)碼778899分享:c# RabbitMQ基礎(chǔ)應(yīng)用
柚子快報(bào)邀請(qǐng)碼778899分享:c# RabbitMQ基礎(chǔ)應(yīng)用
簡(jiǎn)介
????????RabbitMQ是一種消息中間件,它能接收并發(fā)送消息。為方便理解,可以將消息中間件認(rèn)為是郵局。用戶只需要將郵件送到郵局,郵局就能將郵件運(yùn)送到收件人手中,與此類似消息中間件能夠接收用戶發(fā)送的消息并將消息轉(zhuǎn)發(fā)給接收者。
作用
通信解耦:在RabbitMQ中,消息生產(chǎn)者和消費(fèi)者不需要感知彼此的存在,只需要與消息隊(duì)列交互,降低了生產(chǎn)者和消費(fèi)者的耦合程度。在微服務(wù)系統(tǒng)中有廣泛的應(yīng)用。流量削峰:服務(wù)器訪問(wèn)量過(guò)于龐大時(shí),可以使用RabbitMQ將請(qǐng)求暫時(shí)儲(chǔ)存在消息隊(duì)列中,之后服務(wù)器按一定的頻率從隊(duì)列中取出請(qǐng)求并處理,避免大量請(qǐng)求直接沖擊服務(wù)器。負(fù)載均衡:當(dāng)系統(tǒng)中存在多個(gè)消費(fèi)者時(shí),可以使用消息中間件將消息分發(fā)給多個(gè)消費(fèi)者處理,避免單個(gè)消費(fèi)者過(guò)載提升系統(tǒng)的處理能力。
RabbitMQ涉及的對(duì)象
產(chǎn)生消息的用戶稱為生產(chǎn)者(Producer),用字母P表示:
? ??
消息存儲(chǔ)在消息隊(duì)列中,多個(gè)生產(chǎn)者可以往同一個(gè)對(duì)列發(fā)送消息;多個(gè)消費(fèi)者也可以從同一個(gè)對(duì)列中獲取數(shù)據(jù)。消息隊(duì)列用以下符號(hào)表示:
? ??
接收消息的用戶稱為消費(fèi)者(Consumer),用字母C表示:
? ??
補(bǔ)充:生產(chǎn)者,消費(fèi)者,消息中間件不一定處于同一個(gè)服務(wù)器上。在大多數(shù)場(chǎng)景中他們都運(yùn)行在不同的服務(wù)器上,某一個(gè)應(yīng)用既可以是生產(chǎn)者也可以是消費(fèi)者。
案例一
????????以下是一個(gè)C#代碼編寫的極簡(jiǎn)RabbitMQ實(shí)例,生產(chǎn)者發(fā)送字符串“Holl World”到消息隊(duì)列中,消費(fèi)者從消息隊(duì)列中接收消息并顯示。
前置條件:本地已安裝RabbitMQ軟件,生產(chǎn)者端和消費(fèi)者端都安裝了RabbitMQ依賴包。
Nuget下載依賴包:NuGet\Install-Package RabbitMQ.Client -Version 7.0.0-rc.4
1、生產(chǎn)者端代碼
創(chuàng)建連接(connection)
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
本地安裝的RabbitMQ默認(rèn)運(yùn)行在localhost:5627端口,創(chuàng)建連接時(shí)指定參數(shù)HostName = "localhost"連接本地安裝的RabbitMQ。若要連接遠(yuǎn)程的RabbitMQ,在此修改HostName的值。
創(chuàng)建信道(channel)
var channel = await connection.CreateChannelAsync();
信道對(duì)象包含了大多數(shù)操作API。
創(chuàng)建隊(duì)列
await channel.QueueDeclareAsync(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
隊(duì)列的名稱為“hello”,其余參數(shù)保持默認(rèn)值。
發(fā)送消息
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange:string.Empty,routingKey:"hello", body);
將消息發(fā)送給指定隊(duì)列“hello”,注意routingKey的值與隊(duì)列的名稱相同。
完整代碼Producer.cs如下:
using RabbitMQ.Client;
using System.Text;
namespace Producer
{
internal class Program
{
static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange:string.Empty,routingKey:"hello", body);
Console.WriteLine($"Producer發(fā)送消息:{message}");
Console.ReadLine();
}
}
}
2、消費(fèi)者端代碼
創(chuàng)建連接和信道
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
創(chuàng)建隊(duì)列
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
消費(fèi)者端可能比生產(chǎn)者端先執(zhí)行,在消費(fèi)者端重新聲明隊(duì)列可以保證消費(fèi)者接收消息前隊(duì)列存在。且消息隊(duì)列是冪等的,多次聲明不會(huì)對(duì)已存在的隊(duì)列產(chǎn)生影響。
創(chuàng)建consumer對(duì)象并編寫回調(diào)函數(shù)
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Consumer接收信息:{message}");
};
編寫處理消息的回調(diào)函數(shù)。并將回調(diào)函數(shù)合并到consumer對(duì)象的事件中。
為消息隊(duì)列指定消費(fèi)者對(duì)象
await channel.BasicConsumeAsync(queue: "hello",autoAck: true,consumer: consumer);
消費(fèi)者對(duì)象接收到消息時(shí)會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。
完整代碼Consumer.cs如下:
namespace Consumer
{
internal class Program
{
static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Consumer接收信息:{message}");
};
await channel.BasicConsumeAsync(queue: "hello",autoAck: true,consumer: consumer);
Console.ReadLine();
}
}
}
補(bǔ)充:最新的RabbitMQ依賴包將方法全都優(yōu)化為異步方法,因此上述使用了大量Async方法。
案例二
? ? ? ? 使用RabbitMQ實(shí)現(xiàn) “發(fā)布/訂閱” 模型,一個(gè)生產(chǎn)者同時(shí)給兩個(gè)消費(fèi)者發(fā)送不同的消息。
? ? ? ? 在案例一中我們實(shí)現(xiàn)了生產(chǎn)者直接發(fā)送消息給隊(duì)列,隊(duì)列又將消息被轉(zhuǎn)發(fā)給了一個(gè)明確的消費(fèi)者,實(shí)際上這并不是RabbitMQ列常見(jiàn)的工作模式。
? ? ? ? 一般情況下生產(chǎn)者并不會(huì)直接給消息隊(duì)列發(fā)送消息,也不知道消息會(huì)被送到哪一個(gè)隊(duì)列中。真實(shí)的情況是生產(chǎn)者將消息發(fā)送給交換機(jī)(Exchanges)。交換機(jī)就好比是郵遞員,顧客寄郵件不需要知道郵局的地址,只需要將郵件交給上門取件的郵遞員就能完成寄件。生產(chǎn)者只需要將消息送給交換機(jī),后續(xù)由交換機(jī)決定接下來(lái)的操作。是將消息添加到一個(gè)特定的隊(duì)列中?還是將添加到多個(gè)隊(duì)列中?亦或是將消息丟棄?在聲明交換機(jī)時(shí)可以通過(guò)ExchangeType參數(shù)配置。
交換機(jī)(Exchange)用字母X表示:
? ??
監(jiān)聽(tīng)交換機(jī)
rabbitmqctl list_exchanges
在命令行窗口輸入以上指令,查看RabbitMQ節(jié)點(diǎn)上的交換機(jī)。
補(bǔ)充:
使用rabbitmqctl指令前需要安裝Erlang環(huán)境,并將RabbitMQ的sbin路徑添加進(jìn)環(huán)境變量中。建議安裝Erlang和RabbitMQ時(shí)采用默認(rèn)的安裝路徑,否則可能出現(xiàn)本地erlang.cookie和RabbitMQ節(jié)點(diǎn)的erlang.cookie不一致,導(dǎo)致無(wú)法使用命令行指令。自定義安裝路徑導(dǎo)致cookie校驗(yàn)無(wú)法通過(guò)時(shí),可以嘗試以下操作:
??找到以下兩個(gè)文件,用后者的cookie文件替換掉前者的cookie文件。
C:\Windows\System32\config\systemprofile\.erlang.cookie
C:\用戶\你的用戶名\.erlang.cookie。
默認(rèn)交換機(jī)
在案例一中我們并未聲明交換機(jī)依然可以將消息發(fā)送到隊(duì)列中,是因?yàn)樯a(chǎn)者在發(fā)送消息時(shí)使用了RabbitMQ服務(wù)器默認(rèn)的交換機(jī)。上述以 amq.* 標(biāo)識(shí)的交換機(jī)就是默認(rèn)交換機(jī)。
await channel.BasicPublishAsync(exchange:string.Empty,routingKey:"hello", body);
第一個(gè)參數(shù)是交換機(jī)的名稱,值為空串表示將消息發(fā)送給默認(rèn)的交換機(jī)。再由默認(rèn)的交換將消息發(fā)送給與routingKey同名的隊(duì)列。
注意:routingKey只是值與隊(duì)列名稱相同,二者是不同的屬性。
創(chuàng)建交換機(jī)
await channel.ExchangeDeclareAsync(exchange:"X", ExchangeType.Direct);
ExchangeType有4種類型
1、Direct:生產(chǎn)者發(fā)送消息使用的routingKey與交換機(jī)綁定隊(duì)列使用的routingKey相同時(shí),交換機(jī)才會(huì)發(fā)送消息到該隊(duì)列。
2、Fanout:忽略routingKey的值,交換機(jī)將消息發(fā)送到所有與自己綁定的隊(duì)列(廣播通信)。
3、Topic:使用routingKey進(jìn)行通配符匹配,類似模糊查詢的匹配方式。
4、Headers:交換機(jī)不使用routingKey進(jìn)行匹配,轉(zhuǎn)而使用消息的頭部消息進(jìn)行匹配。
創(chuàng)建隊(duì)列
await channel.QueueDeclareAsync(queue:"Q1",durable: false, exclusive: false, autoDelete: false);
聲明隊(duì)列方法有4個(gè)參數(shù)
1、queue:設(shè)置隊(duì)列名稱。
2、durable:RabbitMQ節(jié)點(diǎn)重啟時(shí)隊(duì)列是否保留。
3、exclusive:沒(méi)有消費(fèi)者連接時(shí)隊(duì)列時(shí)刪除隊(duì)列。
4、auto-delete:沒(méi)有消費(fèi)者訂閱隊(duì)列時(shí)刪除隊(duì)列。
RabbitMQ參數(shù)文檔:Queues | RabbitMQ
交換機(jī)與隊(duì)列綁定
創(chuàng)建了交換機(jī)和隊(duì)列后,我們需要告訴交換機(jī)將消息發(fā)送給哪個(gè)隊(duì)列。交換機(jī)和隊(duì)列間的關(guān)系稱為綁定。
await channel.QueueBindAsync(queue: "Q1", exchange: "X",routingKey:"Q1_key");
生產(chǎn)者發(fā)送消息
await channel.BasicPublishAsync(exchange: "X",body:Encoding.UTF8.GetBytes("C1你好"), routingKey: "Q1_key");
發(fā)送消息的routingKey需要與交換機(jī)綁定隊(duì)列的routingKey保持一致。
1、生產(chǎn)者端代碼
namespace P
{
internal class P
{
static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync("X", ExchangeType.Direct);
await channel.QueueDeclareAsync(queue:"Q1",durable: false, exclusive: false, autoDelete: false);
await channel.QueueDeclareAsync(queue: "Q2", durable: false, exclusive: false, autoDelete: false);
await channel.QueueBindAsync(queue: "Q1", exchange: "X",routingKey:"Q1_key");
await channel.QueueBindAsync(queue: "Q2", exchange: "X", routingKey: "Q2_key");
string message1 = "C1你好";
string message2 = "C2你好";
await channel.BasicPublishAsync(exchange: "X",body:Encoding.UTF8.GetBytes(message1), routingKey: "Q1_key");
Console.WriteLine($"P發(fā)送消息:{message1}");
await channel.BasicPublishAsync(exchange: "X", body: Encoding.UTF8.GetBytes(message2), routingKey: "Q2_key");
Console.WriteLine($"P發(fā)送消息:{message2}");
Console.ReadLine();
}
}
}
2、消費(fèi)者C1
namespace C1
{
internal class C1
{
static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync("X", ExchangeType.Direct);
await channel.QueueDeclareAsync(queue: "Q1", durable: false, exclusive: false, autoDelete: false);
await channel.QueueBindAsync(queue: "Q1", exchange: "X", routingKey: "Q1_key");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"C1接收到信息:{message}");
};
await channel.BasicConsumeAsync(queue: "Q1", autoAck: true, consumer: consumer);
Console.ReadLine();
}
}
}
3、消費(fèi)者C2
namespace C2
{
internal class C2
{
static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync("X", ExchangeType.Direct);
await channel.QueueDeclareAsync(queue: "Q2", durable: false, exclusive: false, autoDelete: false);
await channel.QueueBindAsync(queue: "Q2", exchange: "X", routingKey: "Q2_key");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"C2接收到信息:{message}");
};
await channel.BasicConsumeAsync(queue: "Q2", autoAck: true, consumer: consumer);
Console.ReadLine();
}
}
}
補(bǔ)充:消費(fèi)者端創(chuàng)建隊(duì)列和交換機(jī)的步驟是可以省略,前提是保證生產(chǎn)者端的代碼先執(zhí)行。
4、輸出結(jié)果
補(bǔ)充
RabbitMQ官方文檔:RabbitMQ Documentation | RabbitMQhttps://www.rabbitmq.com/docs
柚子快報(bào)邀請(qǐng)碼778899分享:c# RabbitMQ基礎(chǔ)應(yīng)用
相關(guān)文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。