柚子快報(bào)邀請碼778899分享:查看kafka消息消費(fèi)堆積情況
柚子快報(bào)邀請碼778899分享:查看kafka消息消費(fèi)堆積情況
查看主題命令
展示topic列表
./kafka-topics.sh --list --zookeeper zookeeper_ip:2181
描述topic
./kafka-topics.sh --describe --zookeeper zookeeper_ip:2181 --topic topic_name
查看topic某分區(qū)偏移量最大(小)值
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic_name --time -1 --broker-list broker_ip:9092 --partitions 0
增加topic分區(qū)數(shù)
./kafka-topics.sh --zookeeper zookeeper_ip:2181 --alter --topic test --partitions 10
刪除topic:慎用,只會刪除zookeeper中的元數(shù)據(jù),消息文件須手動刪除
方法一:
./kafka-topics.sh --delete --zookeeper zookeeper_ip:2181 --topic topic_name
方法二:待驗(yàn)證
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper zookeeper_ip:2181 --topic topic_name
查看topic消費(fèi)進(jìn)度,必須參數(shù)為–group, 不指定–topic,默認(rèn)為所有topic,
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group_name
列出所有主題中的所有用戶組:
./kafka-consumer-groups.sh --bootstrap-server broker_ip:9092 --list
要使用ConsumerOffsetChecker查看上一個示例中消費(fèi)者組的偏移量
按如下所示“describe”消費(fèi)者組:
./kafka-consumer-groups.sh --bootstrap-server 10.1.3.84:9098 --describe --group group_name
統(tǒng)計(jì)指定group下對應(yīng)各個topic的消息量
./kafka-consumer-groups.sh --bootstrap-server 10.1.3.84:9098 --group group_name --describe | awk '{print $2, $6}' | grep -v "LAG\|^ "|awk '{topics[$1] += $2} END {for (topic in topics) print topic ": " topics[topic]}'
例如
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group SPOT_MARKET --describe | awk '{print $2, $6}' | grep -v "LAG\|^ "|awk '{topics[$1] += $2} END {for (topic in topics) print topic ": " topics[topic]}'
NEEX_SPOT_DISPATCH_ORDER_FILL: 0
NEEX_SPOT_DISPATCH_LEVEL2: 12
NEEX_SPOT_DISPATCH_ORDER: 11
-members: 此選項(xiàng)提供使用者組中所有活動成員的列表。
./kafka-consumer-groups.sh --bootstrap-server broker_ip:9092 --describe --group group_name --members
按topic和分區(qū)統(tǒng)計(jì)各個topic的分區(qū)的消息堆積情況
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group SPOT_MARKET --describe | awk 'BEGIN {OFS="\t"} NR==1 {print "Topic", "Partition", "LAG"} NR>2 {topics[$2 ":" $3] += $6} END {for (topic in topics) {split(topic, parts, ":"); print parts[1] , parts[2] , topics[topic]}}' | awk 'NR==1; NR>1 {print $0 | "sort -k1,1 -k2,2rn"}'
Topic Partition LAG
NEEX_SPOT_DISPATCH_LEVEL2 49 0
NEEX_SPOT_DISPATCH_LEVEL2 48 0
NEEX_SPOT_DISPATCH_LEVEL2 47 0
NEEX_SPOT_DISPATCH_LEVEL2 46 0
NEEX_SPOT_DISPATCH_LEVEL2 45 0
NEEX_SPOT_DISPATCH_LEVEL2 44 0
NEEX_SPOT_DISPATCH_LEVEL2 43 0
NEEX_SPOT_DISPATCH_LEVEL2 42 0
NEEX_SPOT_DISPATCH_LEVEL2 41 0
NEEX_SPOT_DISPATCH_LEVEL2 40 0
NEEX_SPOT_DISPATCH_LEVEL2 39 0
NEEX_SPOT_DISPATCH_LEVEL2 38 0
NEEX_SPOT_DISPATCH_LEVEL2 37 0
NEEX_SPOT_DISPATCH_LEVEL2 36 0
NEEX_SPOT_DISPATCH_LEVEL2 35 0
NEEX_SPOT_DISPATCH_LEVEL2 34 0
NEEX_SPOT_DISPATCH_LEVEL2 33 0
NEEX_SPOT_DISPATCH_LEVEL2 32 0
NEEX_SPOT_DISPATCH_LEVEL2 31 0
NEEX_SPOT_DISPATCH_LEVEL2 30 0
NEEX_SPOT_DISPATCH_LEVEL2 29 0
NEEX_SPOT_DISPATCH_LEVEL2 28 0
NEEX_SPOT_DISPATCH_LEVEL2 27 0
NEEX_SPOT_DISPATCH_LEVEL2 26 0
NEEX_SPOT_DISPATCH_LEVEL2 25 0
NEEX_SPOT_DISPATCH_LEVEL2 24 0
NEEX_SPOT_DISPATCH_LEVEL2 23 0
NEEX_SPOT_DISPATCH_LEVEL2 22 0
NEEX_SPOT_DISPATCH_LEVEL2 21 0
NEEX_SPOT_DISPATCH_LEVEL2 20 0
NEEX_SPOT_DISPATCH_LEVEL2 19 0
NEEX_SPOT_DISPATCH_LEVEL2 18 0
NEEX_SPOT_DISPATCH_LEVEL2 17 0
NEEX_SPOT_DISPATCH_LEVEL2 16 0
NEEX_SPOT_DISPATCH_LEVEL2 15 0
NEEX_SPOT_DISPATCH_LEVEL2 14 0
NEEX_SPOT_DISPATCH_LEVEL2 13 0
NEEX_SPOT_DISPATCH_LEVEL2 12 4
NEEX_SPOT_DISPATCH_LEVEL2 11 0
NEEX_SPOT_DISPATCH_LEVEL2 10 0
NEEX_SPOT_DISPATCH_LEVEL2 9 0
NEEX_SPOT_DISPATCH_LEVEL2 8 0
NEEX_SPOT_DISPATCH_LEVEL2 7 0
NEEX_SPOT_DISPATCH_LEVEL2 6 0
NEEX_SPOT_DISPATCH_LEVEL2 5 0
NEEX_SPOT_DISPATCH_LEVEL2 4 0
NEEX_SPOT_DISPATCH_LEVEL2 3 0
NEEX_SPOT_DISPATCH_LEVEL2 2 0
NEEX_SPOT_DISPATCH_LEVEL2 1 0
NEEX_SPOT_DISPATCH_LEVEL2 0 0
查看kafka消息消費(fèi)情況
消息堆積是消費(fèi)滯后(Lag)的一種表現(xiàn)形式,消息中間件服務(wù)端中所留存的消息與消費(fèi)掉的消息之間的差值即為消息堆積量,也稱之為消費(fèi)滯后(Lag)量。 對于Kafka而言,消息被發(fā)送至Topic中,而Topic又分成了多個分區(qū)(Partition),每一個Partition都有一個預(yù)寫式的日志文件,雖然Partition可以繼續(xù)細(xì)分為若干個段文件(Segment),但是對于上層應(yīng)用來說可以將Partition看成最小的存儲單元(一個由多個Segment文件拼接的“巨型文件”)。 每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition中。 我們來看下圖,其就是Partition的一個真實(shí)寫照: 上圖中有四個概念: LogStartOffset:表示一個Partition的起始位移,初始為0,雖然消息的增加以及日志清除策略的影響,這個值會階段性的增大。 ConsumerOffset:消費(fèi)位移,表示Partition的某個消費(fèi)者消費(fèi)到的位移位置。 HighWatermark:簡稱HW,代表消費(fèi)端所能“觀察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。 LogEndOffset:簡稱LEO, 代表Partition的最高日志位移,其值對消費(fèi)者不可見。 比如在ISR(In-Sync-Replicas)副本數(shù)等于3的情況下(如下圖所示),消息發(fā)送到Leader A之后會更新LEO的值,F(xiàn)ollower B和Follower C也會實(shí)時拉取Leader A中的消息來更新自己,HW就表示A、B、C三者同時達(dá)到的日志位移,也就是A、B、C三者中LEO最小的那個值。由于B、C拉取A消息之間延時問題,所以HW必然不會一直與Leader的LEO相等,即LEO>=HW。 要計(jì)算Kafka中某個消費(fèi)者的滯后量很簡單,首先看看其消費(fèi)了幾個Topic,然后針對每個Topic來計(jì)算其中每個Partition的Lag,每個Partition的Lag計(jì)算就顯得非常的簡單了,參考下圖: 由圖可知消費(fèi)Lag=HW - ConsumerOffset。Kafka中自帶的kafka-consumer_groups.sh腳本中就有Lag的信息,示例如下:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDdepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL2000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL5000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL1000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL3000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL6000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL0110consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL4000consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9edepthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL11000consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617depthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL7000consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617depthNEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL10000consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617
柚子快報(bào)邀請碼778899分享:查看kafka消息消費(fèi)堆積情況
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。