柚子快報激活碼778899分享:Kafka分布式數(shù)據(jù)處理平臺
柚子快報激活碼778899分享:Kafka分布式數(shù)據(jù)處理平臺
目錄
一.消息隊列基本介紹
1.為什么需要消息隊列
2.使用消息隊列的好處
2.1 解耦
耦合(非解耦)
解耦
2.2 可恢復(fù)性
2.3 緩沖
2.4 靈活性 & 峰值處理能力
2.5 異步通信
3.消息隊列的兩種模式
3.1?點對點模式
3.2?發(fā)布/訂閱模式
二.Kafka基本介紹
1.Kafka是什么?
2.Kafka的特性
3.Kafka系統(tǒng)架構(gòu)
3.1 Broker(服務(wù)代理節(jié)點)
3.2 Producer(生產(chǎn)者)
3.3 Consumer(消費者)
3.4 Consumer Group(消費組)
3.5 ZooKeeper
3.6 Topic(主題)
3.7 Partition(分區(qū))
3.8 Replica(副本)
3.9 Leader and Follower
3.10 Offset(偏移量)
三.部署ZooKeeper+Kafka集群
1.環(huán)境準備
2.下載安裝安裝包
?編輯
3.修改配置文件
4.設(shè)置環(huán)境變量
5.配置ZooKeeper啟動腳本
?編輯6.設(shè)置開機自啟并啟動
7.Kafka命令行操作
7.1?創(chuàng)建topic
7.2?查看當前服務(wù)器中的所有topic
7.3 查看某個topic的詳情
7.4?發(fā)布消息
7.5 消費消息
7.6?修改分區(qū)數(shù)
7.7?刪除topic
一.消息隊列基本介紹
1.為什么需要消息隊列
主要原因是由于在高并發(fā)環(huán)境下,同步請求來不及處理,請求往往會發(fā)生阻塞。比如大量的請求并發(fā)訪問數(shù)據(jù)庫,導(dǎo)致行鎖表鎖,最后請求線程會堆積過多, 從而觸發(fā) too many connection 錯誤, 引發(fā)雪崩效應(yīng)。 我們使用消息隊列,通過異步處理請求,從而緩解系統(tǒng)的壓力。消息隊列常應(yīng)用于異步處理,流量削峰,應(yīng)用解耦,消息通訊等場景當前比較常見的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
2.使用消息隊列的好處
2.1 解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
耦合(非解耦)
耦合是指兩個或兩個以上的體系或兩種運動形式間通過相互作用而彼此影響以至聯(lián)合起來的現(xiàn)象。在軟件工程中,對象之間的耦合度就是對象之間的依賴性。對象之間的耦合越高,維護成本越高,因此對象的設(shè)計應(yīng)使類和構(gòu)件之間的耦合最小。分類:有軟硬件之間的耦合,還有軟件各模塊之間的耦合。耦合性是程序結(jié)構(gòu)中各個模塊之間相互關(guān)聯(lián)的度量。它取決于各個模塊之間的接口的復(fù)雜程度、調(diào)用模塊的方式以及哪些信息通過接口。
解耦
解耦,字面意思就是解除耦合關(guān)系。在軟件工程中,降低耦合度即可以理解為解耦,模塊間有依賴關(guān)系必然存在耦合,理論上的絕對零耦合是做不到的,但可以通過一些現(xiàn)有的方法將耦合度降至最低。設(shè)計的核心思想:盡可能減少代碼耦合,如果發(fā)現(xiàn)代碼耦合,就要采取解耦技術(shù)。讓數(shù)據(jù)模型,業(yè)務(wù)邏輯和視圖顯示三層之間彼此降低耦合,把關(guān)聯(lián)依賴降到最低,而不至于牽一發(fā)而動全身。原則就是A功能的代碼不要寫在B的功能代碼中,如果兩者之間需要交互,可以通過接口,通過消息,甚至可以引入框架,但總之就是不要直接交叉寫。觀察者模式:觀察者模式存在的意義就是「解耦」,它使觀察者和被觀察者的邏輯不再攪在一起,而是彼此獨立、互不依賴。比如網(wǎng)易新聞的夜間模式,當用戶切換成夜間模式之后,被觀察者會通知所有的觀察者「設(shè)置改變了,大家快蒙上遮罩吧」。QQ消息推送來了之后,既要在通知欄上彈個推送,又要在桌面上標個小紅點,也是觀察者與被觀察者的巧妙配合。
2.2 可恢復(fù)性
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
2.3 緩沖
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
2.4 靈活性 & 峰值處理能力
在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
2.5 異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
?
同步是指:發(fā)送方發(fā)出數(shù)據(jù)后,等接收方發(fā)回響應(yīng)以后才發(fā)下一個數(shù)據(jù)包的通訊方式。 ?異步是指:發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包的通訊方式
3.消息隊列的兩種模式
3.1?點對點模式
一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除
消息生產(chǎn)者生產(chǎn)消息發(fā)送到消息隊列中, 然后消息消費者從消息隊列中取出并且消費消息。 消息被消費以后, 消息隊列中不再有存儲,所以消息消費者不可能消費到已經(jīng)被消費的消息。消息隊列支持存在多個消費者, 但是對一個消息而言,只會有一個消費者可以消費。
?
每個消息只有一個接收者(Consumer)(即一旦被消費,消息就不再在消息隊列中) 發(fā)送者和接收者間沒有依賴性,發(fā)送者發(fā)送消息之后,不管有沒有接收者在運行,都不會影響到發(fā)送者下次發(fā)送消息 接收者在成功接收消息之后需向隊列應(yīng)答成功,以便消息隊列刪除當前接收的消息
3.2?發(fā)布/訂閱模式
一對多, 又叫觀察者模式,消費者消費數(shù)據(jù)之后不會清除消息
?
每個消息可以有多個訂閱者 發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息 為了消費消息,訂閱者需要提前訂閱該角色主題,并保持在線運行
二.Kafka基本介紹
1.Kafka是什么?
Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(MQ,Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。 最初由 Linkedin 公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于 Zookeeper 協(xié)調(diào)的分布式消息中間件系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景,比如基于 hadoop 的批處理系統(tǒng)、低延遲的實時系統(tǒng)、Spark/Flink 流式處理引擎,nginx 訪問日志,消息服務(wù)等等,用 scala 語言編寫,Linkedin 于 2010 年貢獻給了 Apache 基金會并成為頂級開源項目。
2.Kafka的特性
高吞吐量、低延遲 Kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒。每個 topic 可以分多個 Partition,Consumer Group 對 Partition 進行消費操作,提高負載均衡能力和消費能力。 可擴展性 kafka 集群支持熱擴展 持久性、可靠性 消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失 容錯性 允許集群中節(jié)點失敗(多副本情況下,若副本數(shù)量為 n,則允許 n-1 個節(jié)點失敗) 高并發(fā) 支持數(shù)干個客戶端同時讀寫
3.Kafka系統(tǒng)架構(gòu)
?
生產(chǎn)者生產(chǎn)數(shù)據(jù)傳給broker即kafka服務(wù)器集群kafka集群將數(shù)據(jù)存儲在topic主題中,每個topic主題中有多個分片(分片做了備份在其他topic)分片中存儲數(shù)據(jù),kafka集群注冊在zookeeper中,zookeeper通知消費者kafka服務(wù)器在線列表消費者收到zookeeper通知的在線列表,從broker中拉取數(shù)據(jù)消費者保存偏移量到zookeeper中,以便記錄自己宕機消費到什么地方
3.1 Broker(服務(wù)代理節(jié)點)
服務(wù)代理節(jié)點,其實就是一個kafka實例或服務(wù)節(jié)點,多個broker構(gòu)成了kafka集群一臺 kafka 服務(wù)器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic
3.2 Producer(生產(chǎn)者)
生產(chǎn)者,也就是寫入消息的一方,將消息寫入broker中即數(shù)據(jù)的發(fā)布者,該角色將消息 push 發(fā)布到 Kafka 的 topic 中broker 接收到生產(chǎn)者發(fā)送的消息后,broker 將該消息追加到當前用于追加數(shù)據(jù)的 segment 文件中生產(chǎn)者發(fā)送的消息,存儲到一個 partition 中,生產(chǎn)者也可以指定數(shù)據(jù)存儲的 partition
3.3 Consumer(消費者)
消費者,也就是讀取消息的一方,從broker中pull 拉取數(shù)據(jù)可以消費多個 topic 中的數(shù)據(jù)
3.4 Consumer Group(消費組)
消費者組,由多個 consumer 組成所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者??蔀槊總€消費者指定組名,若不指定組名則屬于默認的組將多個消費者集中到一起去處理某一個 Topic 的數(shù)據(jù),可以更快的提高數(shù)據(jù)的消費能力消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費,防止數(shù)據(jù)被重復(fù)讀取消費者組之間互不影響消費組。一個或多個消費者構(gòu)成一個消費組,不同的消費組可以訂閱同一個主題的消息且互不影響
3.5 ZooKeeper
kafka使用zookeeper來管理集群的元數(shù)據(jù) meta 信息,以及控制器的選舉等操作由于 consumer 在消費過程中可能會出現(xiàn)斷電宕機等故障,consumer 恢復(fù)后,需要從故障前的位置的繼續(xù)消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復(fù)后繼續(xù)消費zookeeper的作用就是,生產(chǎn)者push數(shù)據(jù)到kafka集群,就必須要找到kafka集群的節(jié)點在哪里,這些都是通過zookeeper去尋找的。消費者消費哪一條數(shù)據(jù),也需要zookeeper的支持,從zookeeper獲得offset,offset記錄上一次消費的數(shù)據(jù)消費到哪里,這樣就可以接著下一條數(shù)據(jù)進行消費
3.6 Topic(主題)
可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個 topic。類似于數(shù)據(jù)庫的表名或者 ES 的 index物理上不同 topic 的消息分開存儲
3.7 Partition(分區(qū))
分區(qū),同一個主題下的消息還可以繼續(xù)分成多個分區(qū),一個分區(qū)只屬于一個主題為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上,一個 topic 可以分割為一個或多個 partition,每個 partition 是一個有序的隊列。Kafka 只保證 partition 內(nèi)的記錄是有序的,而不保證 topic 中不同 partition 的順序每個 topic 至少有一個 partition,當生產(chǎn)者產(chǎn)生數(shù)據(jù)的時候,會根據(jù)分配策略選擇分區(qū),然后將消息追加到指定的分區(qū)的隊列末尾
Partation 數(shù)據(jù)路由規(guī)則:
1.指定了 patition,則直接使用 2.未指定 patition 但指定 key(相當于消息中某個屬性),通過對 key 的 value 進行 hash 取模,選出一個 patition 3.patition 和 key 都未指定,使用輪詢選出一個 patition
每條消息都會有一個自增的編號,用于標識消息的偏移量,標識順序從 0 開始。
每個 partition 中的數(shù)據(jù)使用多個 segment 文件存儲。
如果 topic 有多個 partition,消費數(shù)據(jù)時就不能保證數(shù)據(jù)的順序。嚴格保證消息的消費順序的場景下(例如商品秒殺、 搶紅包),需要將 partition 數(shù)目設(shè)為 1。
broker 存儲 topic 的數(shù)據(jù)。如果某 topic 有 N 個 partition,集群有 N 個 broker,那么每個 broker 存儲該 topic 的一個 partition。如果某 topic 有 N 個 partition,集群有 (N+M) 個 broker,那么其中有 N 個 broker 存儲 topic 的一個 partition, 剩下的 M 個 broker 不存儲該 topic 的 partition 數(shù)據(jù)。如果某 topic 有 N 個 partition,集群中 broker 數(shù)目少于 N 個,那么一個 broker 存儲該 topic 的一個或多個 partition。在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致 Kafka 集群數(shù)據(jù)不均衡
3.8 Replica(副本)
副本,一個分區(qū)可以有多個副本來提高容災(zāi)性為保證集群中的某個節(jié)點發(fā)生故障時,該節(jié)點上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機制,一個 topic 的每個分區(qū)都有若干個副本,一個 leader 和若干個 follower
3.9 Leader and Follower
分區(qū)有了多個副本,那么就需要有同步方式。kafka使用一主多從進行消息同步,主副本提供讀寫的能力,而從副本不提供讀寫,僅僅作為主副本的備份每個 partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責(zé)數(shù)據(jù)的讀寫的 partitionFollower 跟隨 Leader,所有寫請求都通過 Leader 路由,數(shù)據(jù)變更會廣播給所有 Follower,F(xiàn)ollower 與 Leader 保持數(shù)據(jù)同步。Follower 只負責(zé)備份,不負責(zé)數(shù)據(jù)的讀寫。如果 Leader 故障,則從 Follower 中選舉出一個新的 Leader。當 Follower 掛掉、卡住或者同步太慢,Leader 會把這個 Follower 從 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合) 列表中刪除,重新創(chuàng)建一個 Follower
3.10 Offset(偏移量)
可以唯一的標識一條消息,分區(qū)中的每一條消息都有一個所在分區(qū)的偏移量,這個偏移量唯一標識了該消息在當前這個分區(qū)的位置,并保證了在這個分區(qū)的順序性,不過不保證跨分區(qū)的順序性偏移量決定讀取數(shù)據(jù)的位置,不會有線程安全的問題,消費者通過偏移量來決定下次讀取的消息(即消費位置)消息被消費之后,并不被馬上刪除,這樣多個業(yè)務(wù)就可以重復(fù)使用 Kafka 的消息某一個業(yè)務(wù)也可以通過修改偏移量達到重新讀取消息的目的,偏移量由用戶控制消息最終還是會被刪除的,默認生命周期為 1 周(7*24小時)
三.部署ZooKeeper+Kafka集群
1.環(huán)境準備
服務(wù)器類型IP地址Zookeeper服務(wù)器1192.168.21.10Zookeeper服務(wù)器2192.168.21.30Zookeeper服務(wù)器3192.168.21.40
2.下載安裝安裝包
1. #下載安裝包
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
1.1 #有壓縮包就直接拖進來
cd /opt
rz -E
2. #安裝Kafka
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
3.修改配置文件
1. #移動并將配置文件進行備份
cd /usr/local/kafka/config/
cp server.properties{,.bak}
2. #修改
vim server.properties
-------------------------------------------
broker.id=0
#21行,broker的全局唯一編號,每個broker不能重復(fù),因此要在其他機器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.21.10:9092
#31行,指定監(jiān)聽的IP和端口,如果修改每個broker的IP需區(qū)分開來,也可保持默認配置不用修改
num.network.threads=3
#42行,broker 處理網(wǎng)絡(luò)請求的線程數(shù)量,一般情況下不需要去修改
num.io.threads=8
#45行,用來處理磁盤IO的線程數(shù)量,數(shù)值應(yīng)該大于硬盤數(shù)
socket.send.buffer.bytes=102400 #48行,發(fā)送套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600 #54行,請求套接字的緩沖區(qū)大小
log.dirs=/usr/local/kafka/logs #60行,kafka運行日志存放的路徑,也是數(shù)據(jù)存放的路徑
num.partitions=1
#65行,topic在當前broker上的默認分區(qū)個數(shù),會被topic創(chuàng)建時的指定參數(shù)覆蓋
num.recovery.threads.per.data.dir=1 #69行,用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
log.retention.hours=168
#103行,segment文件(數(shù)據(jù)文件)保留的最長時間,單位為小時,默認為7天,超時將被刪除
log.segment.bytes=1073741824
#110行,一個segment文件最大的大小,默認為 1G,超出將新建一個新的segment文件
zookeeper.connect=192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181
#123行,配置連接Zookeeper集群地址
------------------------------------------------
4.設(shè)置環(huán)境變量
1. #修改環(huán)境變量
vim /etc/profile
----------------------------------------
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-----------------------------------------
2. #刷新配置文件
source /etc/profile
3. #查看環(huán)境變量
echo $PATN
5.配置ZooKeeper啟動腳本
vim /etc/init.d/kafka
------------------------------------------------
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 啟動 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 狀態(tài) ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
------------------------------------------------------------------
6.設(shè)置開機自啟并啟動
1. #設(shè)置開機自啟
chmod +x /etc/init.d/kafka
chkconfig --add kafka
2. #分別啟動 Kafka
service kafka start
7.Kafka命令行操作
7.1?創(chuàng)建topic
kafka-topics.sh --create --zookeeper 192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181 --replication-factor 2 --partitions 3 --topic test
############################################
--zookeeper:定義 zookeeper 集群服務(wù)器地址,如果有多個 IP 地址使用逗號分割,一般使用一個 IP 即可
--replication-factor:定義分區(qū)副本數(shù),1 代表單副本,建議為 2
--partitions:定義分區(qū)數(shù)
--topic:定義 topic 名稱
7.2?查看當前服務(wù)器中的所有topic
kafka-topics.sh --describe --zookeeper 192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181
7.3 查看某個topic的詳情
kafka-topics.sh --describe --zookeeper 192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181
7.4?發(fā)布消息
kafka-console-producer.sh --broker-list 192.168.21.10:9092,192.168.21.30:9092,192.168.21.40:9092 --topic test
7.5 消費消息
kafka-console-consumer.sh --bootstrap-server 192.168.21.10:9092,192.168.21.30:9092,192.168.21.40:9092 --topic test --from-beginning
#--from-beginning:會把主題中以往所有的數(shù)據(jù)都讀取出來
7.6?修改分區(qū)數(shù)
kafka-topics.sh --zookeeper 192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181 --alter --topic test --partitions 6
7.7?刪除topic
kafka-topics.sh --delete --zookeeper 192.168.21.10:2181,192.168.21.30:2181,192.168.21.40:2181 --topic test
柚子快報激活碼778899分享:Kafka分布式數(shù)據(jù)處理平臺
推薦文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。