欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:Golang學(xué)習(xí)筆記

柚子快報(bào)激活碼778899分享:Golang學(xué)習(xí)筆記

http://yzkb.51969.com/

RabbitMQ 簡(jiǎn)介

實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(Advanced Message Queuing Protcol)AMQP消息隊(duì)列中間件的作用(Redis實(shí)現(xiàn)MQ里面有寫過(guò),這里簡(jiǎn)單帶過(guò))

解耦削峰異步處理緩存消息通信提高擴(kuò)展性

RabbitMQ 架構(gòu)理解

#mermaid-svg-qMZy3zJ1gpoZK2gq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-icon{fill:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker{fill:#333333;stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker.cross{stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label text,#mermaid-svg-qMZy3zJ1gpoZK2gq span{fill:#333;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node rect,#mermaid-svg-qMZy3zJ1gpoZK2gq .node circle,#mermaid-svg-qMZy3zJ1gpoZK2gq .node ellipse,#mermaid-svg-qMZy3zJ1gpoZK2gq .node polygon,#mermaid-svg-qMZy3zJ1gpoZK2gq .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node .label{text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node.clickable{cursor:pointer;}#mermaid-svg-qMZy3zJ1gpoZK2gq .arrowheadPath{fill:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-qMZy3zJ1gpoZK2gq :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

channel

binding

channel

channel

channel

Producer 生產(chǎn)者

Exchange交換機(jī)

Queue消息隊(duì)列

Consumer消費(fèi)者

Consumer消費(fèi)者

Consumer消費(fèi)者

binding(綁定):交換機(jī)將消息路由給Queue所遵循的規(guī)則,可以定義一個(gè)路由鍵,用于交換機(jī)篩選特定的Queue

Routing_Key(路由鍵):Producer 和 Consumer 協(xié)商一致的 key 策略。主要在交換機(jī)的 direct(直連)和 topic(主題) 模式下使用,fanout(廣播)模式下不使用Routing_Key Exchange(交換機(jī)):主要功能是分發(fā)消息給特定的Queue,只負(fù)責(zé)轉(zhuǎn)發(fā),不具備存儲(chǔ)消息的功能。Exchange有以下四種模式:

direct(直連模式),根據(jù)攜帶的Routing_Key來(lái)篩選特定的Queue進(jìn)行消息投遞。是RabbitMQ的默認(rèn)類型,可以不指定Routing_Key,在創(chuàng)建時(shí)會(huì)默認(rèn)生成與Queue重名。hander(頭模式),使用場(chǎng)景不多,消息路由涉及多個(gè)屬性的時(shí)候,交換機(jī)使用多屬性來(lái)代替Routing_key建立路由規(guī)則,還可以定義匹配單詞的個(gè)數(shù),例如any為有一個(gè)單詞滿足條件就匹配成功。all為所有單詞都滿足條件才匹配成功。fanout(廣播模式),不看Routing_Key。只根據(jù)Exchange和Queue的binding情況來(lái)分發(fā)信息。所有與之binding的queue都將接收到同一條消息。topic(主題模式),相當(dāng)于模糊查詢。topic的routing_key是使用 . 來(lái)進(jìn)行隔斷的。有兩種匹配方法:

" * " 匹配一個(gè)單詞,例子如下" # " 匹配0個(gè)~多個(gè)單詞,例子如下

rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic

rabbitMQ.# == rabbit.topic == rabbit.topic.topic

Queue(消息隊(duì)列的存儲(chǔ)數(shù)據(jù)結(jié)構(gòu)):

存儲(chǔ)方式:

持久化,在Server本地硬盤存儲(chǔ)一份臨時(shí)隊(duì)列,重啟后丟失數(shù)據(jù)自動(dòng)刪除,不存在用戶連接則刪除queue 隊(duì)列對(duì)ACK請(qǐng)求的不同情況

consumer 接收并 ack,queue 刪除數(shù)據(jù)并向 consumer 發(fā)送新消息consumer 接收但是未 ack 就斷開(kāi)了連接,queue 會(huì)認(rèn)為消息并未傳送成功,consumer 再次連接時(shí)會(huì)重新發(fā)送消息如果consumer 接收消息成功 ,但是忘記 ack 則 queue 不會(huì)重復(fù)發(fā)送消息如果 consumer 拒收消息,則 queue 會(huì)向另外滿足條件的 consumer 繼續(xù)發(fā)送這條消息

RabbitMQ 工作流程

Producer方向

Producer 與 RabbitMQ Broker 建立連接,開(kāi)啟一個(gè)信道 channel聲明交換機(jī)并設(shè)置屬性(交換機(jī)類型、持久化等)聲明Queue并設(shè)置屬性(持久化,自動(dòng)刪除等)通過(guò)Routing_key來(lái)binding交換機(jī)和Queue發(fā)送信息給交換,交換機(jī)根據(jù)Routing_key來(lái)確認(rèn)投遞的queue查找成功后將消息存到queue查找失敗將消息丟棄或拋回給生產(chǎn)者關(guān)閉channel

Consumer方向

與 queue 建立連接,開(kāi)啟channel向queue請(qǐng)求隊(duì)列中的msg等待queue回應(yīng),開(kāi)始接收消息消息處理完成后 返回回調(diào)確認(rèn)ackqueue 將確認(rèn)的消息從隊(duì)列中刪除關(guān)閉channel

RabbitMQ的兩種部署方式

Meta Data : 元數(shù)據(jù)(描述數(shù)據(jù)的數(shù)據(jù))

vhost meta data : 為Queue、Exchange、Binding提供命名空間級(jí)別的隔離exchange meta data:記錄路由的名稱類型和屬性binding mate data:映射 routing_key和queue之間的綁定關(guān)系queue mate data:表隊(duì)列名稱和屬性

普通模式

對(duì)于該模式的兩個(gè)節(jié)點(diǎn),消息只會(huì)存在其中一個(gè)節(jié)點(diǎn),另一個(gè)節(jié)點(diǎn)只保存mate data,當(dāng)consumer 連接節(jié)點(diǎn)2訪問(wèn)節(jié)點(diǎn)1的數(shù)據(jù)信息時(shí),消息會(huì)在兩個(gè)節(jié)點(diǎn)中傳遞。 該模式下p和c應(yīng)盡量連接每個(gè)節(jié)點(diǎn),這樣起到線性拓展的作用。 但存在一個(gè)問(wèn)題,如果節(jié)點(diǎn)上還有未消費(fèi)的消息,但是節(jié)點(diǎn)掛了。如果節(jié)點(diǎn)設(shè)置了持久化,則需要在節(jié)點(diǎn)重啟的時(shí)候消息才會(huì)恢復(fù)。如果未設(shè)置持久化,則消息會(huì)丟失。

鏡像模式

消息存在多個(gè)節(jié)點(diǎn)中,消息會(huì)在節(jié)點(diǎn)與節(jié)點(diǎn)之間同步,可實(shí)現(xiàn)高可用(當(dāng)一個(gè)節(jié)點(diǎn)掛了,另一個(gè)節(jié)點(diǎn)可以接替其位置,繼續(xù)工作)但會(huì)降低性能,因?yàn)榇罅肯⑦M(jìn)入和同步,會(huì)占用大量帶寬,但是為了保證高可靠性需要取舍。

面試題

Q:如何保證消息不被重復(fù)消費(fèi)?

A:MQ通過(guò)確認(rèn)機(jī)制ACK,進(jìn)行確認(rèn)。確認(rèn)后消息從queue中刪除,保證消息不被重復(fù)消費(fèi)的。如果因?yàn)榫W(wǎng)絡(luò)原因ack沒(méi)有成功發(fā)出,導(dǎo)致消息重新投遞??梢允褂萌治ㄒ幌d來(lái)避免。

消息發(fā)送者發(fā)送消息時(shí)攜帶一個(gè)全局唯一的消息id消費(fèi)者監(jiān)聽(tīng)到消息后,根據(jù)id在redis或者db中查詢是否存在消費(fèi)記錄如果沒(méi)有消費(fèi)就正常消費(fèi),消費(fèi)完畢后,寫入redis或者db如果消息消費(fèi)過(guò)則直接丟棄 Q:如何保證消息的消費(fèi)順序?

A:RabbitMQ中存在一個(gè)設(shè)置,叫獨(dú)占隊(duì)列。即在同一時(shí)間只有一個(gè)消費(fèi)者會(huì)消費(fèi)消息。從而制止了異步操作,保證消費(fèi)順序?;蛘咭粋€(gè)Producer對(duì)一個(gè)Consumer Q:如何保證數(shù)據(jù)一致性?

A:因?yàn)镸Q的使用場(chǎng)景多為分布式系統(tǒng),所以一般不追求強(qiáng)一致性。而保證最終一致性就可以。而保證數(shù)據(jù)最終一致性,可以采用消息補(bǔ)償機(jī)制。即消息在消費(fèi)者處理完之后調(diào)用生產(chǎn)者的API修改數(shù)據(jù)狀態(tài)。如未調(diào)用API則判斷為消息處理失敗或出錯(cuò)。此時(shí)間隔一段時(shí)間后重新投遞消息進(jìn)行再次操作。消費(fèi)者收到消息,處理完畢后,發(fā)送一條響應(yīng)消息給生產(chǎn)者也是消息補(bǔ)償機(jī)制,本意是確認(rèn)消費(fèi)者成功消費(fèi)消息。ACK也是處理方法

RabbitMQ的使用(Golang使用amqp包)

代碼部分參考 upup小亮的博客

代碼只是簡(jiǎn)單的操作,主要是熟悉流程。對(duì)于如何創(chuàng)建Queue和綁定Exchange之類的操作有個(gè)了解。

Simple(簡(jiǎn)單收發(fā)模式,只有一個(gè)Queue)

Simple運(yùn)行機(jī)制與WorkQueue相似,只是一個(gè)Consumer與多個(gè)Consumer的區(qū)別。多個(gè)Consumer之間存在競(jìng)爭(zhēng)關(guān)系,所以工作隊(duì)列是創(chuàng)建多個(gè)Consumer,多個(gè)競(jìng)爭(zhēng)只有一個(gè)可以獲取消息消費(fèi)。消費(fèi)成功后ack消息刪除。 演示代碼放到一起了:

WorkQueue 工作隊(duì)列

生產(chǎn)者

// simple and work queue

func main2() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建連接:%s", err)

return

}

// 默認(rèn)關(guān)閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建channel:%s", err)

return

}

// 通道關(guān)閉

defer ch.Close()

// 創(chuàng)建存儲(chǔ)隊(duì)列

queue, err := ch.QueueDeclare(

"hello", // 隊(duì)列名稱

false, // 持久化設(shè)置,可以為true根據(jù)需求選擇

false, // 自動(dòng)刪除,沒(méi)有用戶連接刪除queue一般不選用

false, //獨(dú)占

false, //等待服務(wù)器確認(rèn)

nil) //參數(shù)

if err != nil {

fmt.Println(err)

log.Fatalf("無(wú)法聲明隊(duì)列:%s", err)

return

}

var body string

// 發(fā)送信息

for i := 0; i < 10; i++ {

fmt.Println(i)

body = "Hello RabbitMQ" + string(i)

err = ch.Publish(

"",

queue.Name,

false, // 必須發(fā)送到消息隊(duì)列

false, // 不等待服務(wù)器確認(rèn)

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

})

if err != nil {

log.Fatalf("消息生產(chǎn)失敗:%s", err)

continue

}

}

}

消費(fèi)者

// create conn

// 如果同時(shí)運(yùn)行兩個(gè)這樣的consumer代碼,就是工作隊(duì)列。只有一個(gè)consumer就是simple

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建連接:%s", err)

return

}

defer conn.Close()

// create channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建channel:%s", err)

return

}

defer ch.Close()

// create queue

queue, err := ch.QueueDeclare(

"hello",

false,

false,

false,

false,

nil)

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建queue:%s", err)

return

}

// 消費(fèi)信息

msgs, err := ch.Consume(

queue.Name,

"",

true,

false,

false,

false,

nil)

if err != nil {

log.Fatalf("無(wú)法消費(fèi)信息:%s", err)

return

}

for msg := range msgs {

log.Println(string(msg.Body))

}

return

pub/sub 發(fā)布訂閱模式

發(fā)布訂閱模式可以創(chuàng)建兩個(gè)Queue,綁定到同一個(gè)Exchange中 生產(chǎn)者這邊只需要跟交換機(jī)對(duì)接,而交換機(jī)類型為fanout:

func main() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建連接:%s", err)

}

// 默認(rèn)關(guān)閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建channel:%s", err)

}

defer ch.Close()

// create exchange

ex := ch.ExchangeDeclare(

"exchange1", // 交換機(jī)名稱

"fanout", // 交換機(jī)類型

true, // 是否持久化

false, // 是否自動(dòng)刪除

false, // 是否內(nèi)部使用

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

fmt.Println(ex)

body := "Hello RabbitMQ for Pub/Sub"

err = ch.Publish(

"exchange1",

"", // routing key 可以為空,因?yàn)閒anout不看routing key

false,

false,

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

})

if err != nil {

log.Fatalf("err %s:", err)

}

log.Println(body)

}

消費(fèi)者:創(chuàng)建交換機(jī),類型為fanout,創(chuàng)建隊(duì)列,綁定交換機(jī)(創(chuàng)建多個(gè)consumer綁定同一個(gè)queue和同一個(gè)交換機(jī)。這樣發(fā)送一個(gè)消息,所有的consumer都能收到。== 發(fā)布訂閱模型)

// Pub/Sub

// Create conn

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// channel create

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// exchange create

ex := ch.ExchangeDeclare(

"exchange1",

"fanout",

true,

false,

false,

false,

nil)

fmt.Println(ex)

// queue create

queue, err := ch.QueueDeclare(

"hello",

false,

false,

false,

false,

nil)

if err != nil{

log.Fatalf(err)

}

err = ch.QueueBind(

queue.Name,

"",

"exchange1",

false,

nil)

if err != nil{

log.Fatalf(err)

}

msgs, err := ch.Consume(

queue.Name,

"",

true,

false,

false,

false,

nil)

if err != nil{

log.Fatalf(err)

}

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages. To exit press CTRL+C")

<-make(chan struct{}) // 阻塞主goroutine

}

Routing 模式(對(duì)特定的隊(duì)列投遞消息)

生產(chǎn)者

func main() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建連接:%s", err)

}

// 默認(rèn)關(guān)閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無(wú)法創(chuàng)建channel:%s", err)

}

defer ch.Close()

// create exchange

ex := ch.ExchangeDeclare(

"exchange1", // 交換機(jī)名稱

"direct", // 交換機(jī)類型

true, // 是否持久化

false, // 是否自動(dòng)刪除

false, // 是否內(nèi)部使用

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

fmt.Println(ex)

body := "Hello RabbitMQ for direct routing"

// 發(fā)布消息到交換機(jī),并指定路由鍵

err = ch.Publish(

"logs_direct", // 交換機(jī)名稱

"routing_key", // 路由鍵

false, // 是否等待服務(wù)器響應(yīng)

false, // 是否立即將消息寫入磁盤

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

},

)

if err != nil{

log.Fatalf("無(wú)法創(chuàng)建send msg:%s", err)

}

log.Printf("Sent message: %s", message)

消費(fèi)者

func main() {

// 連接到RabbitMQ服務(wù)器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf("無(wú)法創(chuàng)建send msg:%s", err)

}

defer conn.Close()

// 創(chuàng)建一個(gè)通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf("無(wú)法創(chuàng)建send msg:%s", err)

}

defer ch.Close()

// 聲明一個(gè)交換機(jī)

err = ch.ExchangeDeclare(

"logs_direct", // 交換機(jī)名稱

"direct", // 交換機(jī)類型

true, // 是否持久化

false, // 是否自動(dòng)刪除

false, // 是否內(nèi)部使用

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf("無(wú)法創(chuàng)建send msg:%s", err)

}

// 聲明一個(gè)臨時(shí)隊(duì)列

q, err := ch.QueueDeclare(

"", // 隊(duì)列名稱,留空表示由RabbitMQ自動(dòng)生成,因?yàn)槎x了key所以隊(duì)列名可以是隨意的,畢竟是依靠key來(lái)進(jìn)行匹配的

false, // 是否持久化

false, // 是否自動(dòng)刪除(當(dāng)沒(méi)有任何消費(fèi)者連接時(shí))

true, // 是否排他隊(duì)列(僅限于當(dāng)前連接)

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

// 將隊(duì)列綁定到交換機(jī)上,并指定要接收的路由鍵

err = ch.QueueBind(

q.Name, // 隊(duì)列名稱

"routing_key", // 路由鍵

"logs_direct", // 交換機(jī)名稱

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf("無(wú)法創(chuàng)建send msg:%s", err)

}

// 訂閱消息

msgs, err := ch.Consume(

q.Name, // 隊(duì)列名稱

"", // 消費(fèi)者標(biāo)識(shí)符,留空表示由RabbitMQ自動(dòng)生成

true, // 是否自動(dòng)應(yīng)答

false, // 是否獨(dú)占模式(僅限于當(dāng)前連接)

false, // 是否等待服務(wù)器響應(yīng)

false, // 其他屬性

nil, // 其他屬性

)

failOnError(err, "Failed to register a consumer")

// 接收消息的goroutine

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages. To exit press CTRL+C")

<-make(chan struct{}) // 阻塞主goroutine

topic

func main() {

// 連接到RabbitMQ服務(wù)器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// 創(chuàng)建一個(gè)通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// 聲明一個(gè)交換機(jī)

err = ch.ExchangeDeclare(

"logs_topic", // 交換機(jī)名稱

"topic", // 交換機(jī)類型

true, // 是否持久化

false, // 是否自動(dòng)刪除

false, // 是否內(nèi)部使用

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 定義要發(fā)送的消息的路由鍵和內(nèi)容

routingKey := "example.key.das"

message := "Hello, RabbitMQ!"

// 發(fā)布消息到交換機(jī),并指定路由鍵

err = ch.Publish(

"logs_topic", // 交換機(jī)名稱

routingKey, // 路由鍵

false, // 是否等待服務(wù)器響應(yīng)

false, // 是否立即發(fā)送

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(message),

},

)

if err != nil{

log.Fatalf(err)

}

log.Printf("Sent message: %s", message)

}

消費(fèi)者

func main() {

// 連接到RabbitMQ服務(wù)器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// 創(chuàng)建一個(gè)通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// 聲明一個(gè)交換機(jī)

err = ch.ExchangeDeclare(

"logs_topic", // 交換機(jī)名稱

"topic", // 交換機(jī)類型

true, // 是否持久化

false, // 是否自動(dòng)刪除

false, // 是否內(nèi)部使用

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 聲明一個(gè)臨時(shí)隊(duì)列

q, err := ch.QueueDeclare(

"", // 隊(duì)列名稱,留空表示由RabbitMQ自動(dòng)生成

false, // 是否持久化

false, // 是否自動(dòng)刪除(當(dāng)沒(méi)有任何消費(fèi)者連接時(shí))

true, // 是否排他隊(duì)列(僅限于當(dāng)前連接)

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 將隊(duì)列綁定到交換機(jī)上,并指定要接收的路由鍵

err = ch.QueueBind(

q.Name, // 隊(duì)列名稱

"example.#", // 路由鍵,可以使用通配符*匹配一個(gè)單詞

"logs_topic", // 交換機(jī)名稱

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 創(chuàng)建一個(gè)消費(fèi)者通道

msgs, err := ch.Consume(

q.Name, // 隊(duì)列名稱

"", // 消費(fèi)者標(biāo)識(shí)符,留空表示由RabbitMQ自動(dòng)生成

true, // 是否自動(dòng)應(yīng)答

false, // 是否排他消費(fèi)者

false, // 是否阻塞

false, // 是否等待服務(wù)器響應(yīng)

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 接收和處理消息

forever := make(chan bool)

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages...")

// 阻塞

<-forever

}

柚子快報(bào)激活碼778899分享:Golang學(xué)習(xí)筆記

http://yzkb.51969.com/

好文閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19497780.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄