柚子快報邀請碼778899分享:Zookeeper
柚子快報邀請碼778899分享:Zookeeper
基本介紹
框架特征
Zookeeper 是 Apache Hadoop 項目子項目,為分布式框架提供協(xié)調(diào)服務(wù),是一個樹形目錄服務(wù) Zookeeper 是基于觀察者模式設(shè)計的分布式服務(wù)管理框架,負責存儲和管理共享數(shù)據(jù),接受觀察者的注冊監(jiān)控,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 會通知觀察者
Zookeeper 是一個領(lǐng)導者(Leader),多個跟隨者(Follower)組成的集群集群中只要有半數(shù)以上節(jié)點存活就能正常服務(wù),所以 Zookeeper 適合部署奇數(shù)臺服務(wù)器全局數(shù)據(jù)一致,每個 Server 保存一份相同的數(shù)據(jù)副本,Client 無論連接到哪個 Server,數(shù)據(jù)都是一致更新的請求順序執(zhí)行,來自同一個 Client 的請求按其發(fā)送順序依次執(zhí)行數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗實時性,在一定的時間范圍內(nèi),Client 能讀到最新數(shù)據(jù)心跳檢測,會定時向各個服務(wù)提供者發(fā)送一個請求(實際上建立的是一個 Socket 長連接) 參考視頻:【尚硅谷】大數(shù)據(jù)技術(shù)之Zookeeper 3.5.7版本教程_嗶哩嗶哩_bilibili
應(yīng)用場景
Zookeeper 提供的主要功能包括:統(tǒng)一命名服務(wù)、統(tǒng)一配置管理、統(tǒng)一集群管理、服務(wù)器節(jié)點動態(tài)上下線、軟負載均衡、分布式鎖等
在分布式環(huán)境中,經(jīng)常對應(yīng)用/服務(wù)進行統(tǒng)一命名,便于識別,例如域名相對于 IP 地址更容易被接收
/service/www.baidu.com # 節(jié)點路徑
192.168.1.1 192.168.1.2 # 節(jié)點值
如果在節(jié)點中記錄每臺服務(wù)器的訪問數(shù),讓訪問數(shù)最少的服務(wù)器去處理最新的客戶端請求,可以實現(xiàn)負載均衡
192.168.1.1 10 # 次數(shù)
192.168.1.1 15
配置文件同步可以通過 Zookeeper 實現(xiàn),將配置信息寫入某個 ZNode,其他客戶端監(jiān)視該節(jié)點,當節(jié)點數(shù)據(jù)被修改,通知各個客戶端服務(wù)器集群環(huán)境中,需要實時掌握每個集群節(jié)點的狀態(tài),可以將這些信息放入 ZNode,通過監(jiān)控通知的機制實現(xiàn)實現(xiàn)客戶端實時觀察服務(wù)器上下線的變化,通過心跳檢測實現(xiàn)
基本操作
安裝搭建
安裝步驟:
安裝 JDK拷貝 apache-zookeeper-3.5.7-bin.tar.gz 安裝包到 Linux 系統(tǒng)下,并解壓到指定目錄conf 目錄下的配置文件重命名:
mv zoo_sample.cfg zoo.cfg
修改配置文件:
vim zoo.cfg
# 修改內(nèi)容
dataDir=/home/seazean/SoftWare/zookeeper-3.5.7/zkData
在對應(yīng)目錄創(chuàng)建 zkData 文件夾:
mkdir zkData
Zookeeper 中的配置文件 zoo.cfg 中參數(shù)含義解讀:
tickTime = 2000:通信心跳時間,Zookeeper 服務(wù)器與客戶端心跳時間,單位毫秒initLimit = 10:Leader 與 Follower 初始通信時限,初始連接時能容忍的最多心跳次數(shù)syncLimit = 5:Leader 與 Follower 同步通信時限,LF 通信時間超過 syncLimit * tickTime,Leader 認為 Follwer 下線dataDir:保存 Zookeeper 中的數(shù)據(jù)目錄,默認是 tmp目錄,容易被 Linux 系統(tǒng)定期刪除,所以建議修改clientPort = 2181:客戶端連接端口,通常不做修改
操作命令
服務(wù)端
Linux 命令:
啟動 ZooKeeper 服務(wù):./zkServer.sh start查看 ZooKeeper 服務(wù):./zkServer.sh status停止 ZooKeeper 服務(wù):./zkServer.sh stop重啟 ZooKeeper 服務(wù):./zkServer.sh restart查看進程是否啟動:jps
客戶端
Linux 命令:
連接 ZooKeeper 服務(wù)端:
./zkCli.sh # 直接啟動
./zkCli.sh –server ip:port # 指定 host 啟動
客戶端命令:
基礎(chǔ)操作:
quit # 停止連接
help # 查看命令幫助
創(chuàng)建命令:/ 代表根目錄
create /path value # 創(chuàng)建節(jié)點,value 可選
create -e /path value # 創(chuàng)建臨時節(jié)點
create -s /path value # 創(chuàng)建順序節(jié)點
create -es /path value # 創(chuàng)建臨時順序節(jié)點,比如node10000012 刪除12后也會繼續(xù)從13開始,只會增加
查詢命令:
ls /path # 顯示指定目錄下子節(jié)點
ls –s /path # 查詢節(jié)點詳細信息
ls –w /path # 監(jiān)聽子節(jié)點數(shù)量的變化
stat /path # 查看節(jié)點狀態(tài)
get –s /path # 查詢節(jié)點詳細信息
get –w /path # 監(jiān)聽節(jié)點數(shù)據(jù)的變化
# 屬性,分為當前節(jié)點的屬性和子節(jié)點屬性
czxid: 節(jié)點被創(chuàng)建的事務(wù)ID, 是ZooKeeper中所有修改總的次序,每次修改都有唯一的 zxid,誰小誰先發(fā)生
ctime: 被創(chuàng)建的時間戳
mzxid: 最后一次被更新的事務(wù)ID
mtime: 最后修改的時間戳
pzxid: 子節(jié)點列表最后一次被更新的事務(wù)ID
cversion: 子節(jié)點的變化號,修改次數(shù)
dataversion: 節(jié)點的數(shù)據(jù)變化號,數(shù)據(jù)的變化次數(shù)
aclversion: 節(jié)點的訪問控制列表變化號
ephemeralOwner: 用于臨時節(jié)點,代表節(jié)點擁有者的 session id,如果為持久節(jié)點則為0
dataLength: 節(jié)點存儲的數(shù)據(jù)的長度
numChildren: 當前節(jié)點的子節(jié)點數(shù)量
刪除命令:
delete /path # 刪除節(jié)點
deleteall /path # 遞歸刪除節(jié)點
數(shù)據(jù)結(jié)構(gòu)
ZooKeeper 是一個樹形目錄服務(wù),類似 Unix 的文件系統(tǒng),每一個節(jié)點都被稱為 ZNode,每個 ZNode 默認存儲 1MB 的數(shù)據(jù),節(jié)點上會保存數(shù)據(jù)和節(jié)點信息,每個 ZNode 都可以通過其路徑唯一標識 節(jié)點可以分為四大類:
PERSISTENT:持久化節(jié)點EPHEMERAL:臨時節(jié)點,客戶端和服務(wù)器端斷開連接后,創(chuàng)建的節(jié)點刪除PERSISTENT_SEQUENTIAL:持久化順序節(jié)點,創(chuàng)建 znode 時設(shè)置順序標識,節(jié)點名稱后會附加一個值,順序號是一個單調(diào)遞增的計數(shù)器,由父節(jié)點維護EPHEMERAL_SEQUENTIAL:臨時順序節(jié)點 注意:在分布式系統(tǒng)中,順序號可以被用于為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序
代碼實現(xiàn)
添加 Maven 依賴:
實現(xiàn)代碼:
public static void main(String[] args) {
// 參數(shù)一:連接地址
// 參數(shù)二:會話超時時間
// 參數(shù)三:監(jiān)聽器
ZooKeeper zkClient = new ZooKeeper("192.168.3.128:2181", 20000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("監(jiān)聽處理函數(shù)");
}
});
}
集群介紹
相關(guān)概念
Zookeepe 集群三個角色:
Leader 領(lǐng)導者:處理客戶端事務(wù)請求,負責集群內(nèi)部各服務(wù)器的調(diào)度Follower 跟隨者:處理客戶端非事務(wù)請求,轉(zhuǎn)發(fā)事務(wù)請求給 Leader 服務(wù)器,參與 Leader 選舉投票O(jiān)bserver 觀察者:觀察集群的最新狀態(tài)的變化,并將這些狀態(tài)進行同步;處理非事務(wù)性請求,事務(wù)性請求會轉(zhuǎn)發(fā)給 Leader 服務(wù)器進行處理;不會參與任何形式的投票。只提供非事務(wù)性的服務(wù),通常用于在不影響集群事務(wù)處理能力的前提下,提升集群的非事務(wù)處理能力(提高集群讀的能力,但是也降低了集群選主的復(fù)雜程度) 相關(guān)屬性:SID:服務(wù)器 ID,用來唯一標識一臺集群中的機器,和 myid 一致ZXID:事務(wù) ID,用來標識一次服務(wù)器狀態(tài)的變更,在某一時刻集群中每臺機器的 ZXID 值不一定完全一致,這和 ZooKeeper 服務(wù)器對于客戶端更新請求的處理邏輯有關(guān)Epoch:每個 Leader 任期的代號,同一輪選舉投票過程中的該值是相同的,投完一次票就增加 選舉機制:半數(shù)機制,超過半數(shù)的投票就通過第一次啟動選舉規(guī)則:投票過半數(shù)時,服務(wù)器 ID 大的勝出第二次啟動選舉規(guī)則:
EPOCH 大的直接勝出EPOCH 相同,事務(wù) ID 大的勝出(事務(wù) ID 越大,數(shù)據(jù)越新)事務(wù) ID 相同,服務(wù)器 ID 大的勝出
初次選舉
選舉過程:
服務(wù)器 1 啟動,發(fā)起一次選舉,服務(wù)器 1 投自己一票,票數(shù)不超過半數(shù),選舉無法完成,服務(wù)器 1 狀態(tài)保持為 LOOKING服務(wù)器 2 啟動,再發(fā)起一次選舉,服務(wù)器 1 和 2 分別投自己一票并交換選票信息,此時服務(wù)器 1 會發(fā)現(xiàn)服務(wù)器 2 的 SID 比自己投票推舉的(服務(wù)器 1)大,更改選票為推舉服務(wù)器 2。投票結(jié)果為服務(wù)器 1 票數(shù) 0 票,服務(wù)器 2 票數(shù) 2 票,票數(shù)不超過半數(shù),選舉無法完成,服務(wù)器 1、2 狀態(tài)保持 LOOKING服務(wù)器 3 啟動,發(fā)起一次選舉,此時服務(wù)器 1 和 2 都會更改選票為服務(wù)器 3,投票結(jié)果為服務(wù)器 3 票數(shù) 3 票,此時服務(wù)器 3 的票數(shù)已經(jīng)超過半數(shù),服務(wù)器 3 當選 Leader,服務(wù)器 1、2 更改狀態(tài)為 FOLLOWING,服務(wù)器 3 更改狀態(tài)為 LEADING服務(wù)器 4 啟動,發(fā)起一次選舉,此時服務(wù)器 1、2、3 已經(jīng)不是 LOOKING 狀態(tài),不會更改選票信息,交換選票信息結(jié)果后服務(wù)器 3 為 3 票,服務(wù)器 4 為 1 票,此時服務(wù)器 4 更改選票信息為服務(wù)器 3,并更改狀態(tài)為 FOLLOWING服務(wù)器 5 啟動,同 4 一樣
再次選舉
ZooKeeper 集群中的一臺服務(wù)器出現(xiàn)以下情況之一時,就會開始進入 Leader 選舉:
服務(wù)器初始化啟動服務(wù)器運行期間無法和 Leader 保持連接 當一臺服務(wù)器進入 Leader 選舉流程時,當前集群可能會處于以下兩種狀態(tài):集群中本來就已經(jīng)存在一個 Leader,服務(wù)器試圖去選舉 Leader 時會被告知當前服務(wù)器的 Leader 信息,對于該服務(wù)器來說,只需要和 Leader 服務(wù)器建立連接,并進行狀態(tài)同步即可集群中確實不存在 Leader,假設(shè)服務(wù)器 3 和 5 出現(xiàn)故障,開始進行 Leader 選舉,SID 為 1、2、4 的機器投票情況
(EPOCH,ZXID,SID): (1, 8, 1), (1, 8, 2), (1, 7, 4)
根據(jù)選舉規(guī)則,服務(wù)器 2 勝出
數(shù)據(jù)寫入
寫操作就是事務(wù)請求,寫入請求直接發(fā)送給 Leader 節(jié)點:Leader 會先將數(shù)據(jù)寫入自身,同時通知其他 Follower 寫入,當集群中有半數(shù)以上節(jié)點寫入完成,Leader 節(jié)點就會響應(yīng)客戶端數(shù)據(jù)寫入完成
寫入請求直接發(fā)送給 Follower 節(jié)點:Follower 沒有寫入權(quán)限,會將寫請求轉(zhuǎn)發(fā)給 Leader,Leader 將數(shù)據(jù)寫入自身,通知其他 Follower 寫入,當集群中有半數(shù)以上節(jié)點寫入完成,Leader 會通知 Follower 寫入完成,由 Follower 響應(yīng)客戶端數(shù)據(jù)寫入完成
底層協(xié)議
Paxos
Paxos 算法:基于消息傳遞且具有高度容錯特性的一致性算法 優(yōu)點:快速正確的在一個分布式系統(tǒng)中對某個數(shù)據(jù)值達成一致,并且保證不論發(fā)生任何異常,都不會破壞整個系統(tǒng)的一致性 缺陷:在網(wǎng)絡(luò)復(fù)雜的情況下,可能很久無法收斂,甚至陷入活鎖的情況
ZAB
算法介紹
ZAB 協(xié)議借鑒了 Paxos 算法,是為 Zookeeper 設(shè)計的支持崩潰恢復(fù)的原子廣播協(xié)議,基于該協(xié)議 Zookeeper 設(shè)計為只有一臺客戶端(Leader)負責處理外部的寫事務(wù)請求,然后 Leader 將數(shù)據(jù)同步到其他 Follower 節(jié)點 Zab 協(xié)議包括兩種基本的模式:消息廣播、崩潰恢復(fù)
消息廣播
ZAB 協(xié)議針對事務(wù)請求的處理過程類似于一個兩階段提交過程:廣播事務(wù)階段、廣播提交操作
客戶端發(fā)起寫操作請求,Leader 服務(wù)器將請求轉(zhuǎn)化為事務(wù) Proposal 提案,同時為 Proposal 分配一個全局的 ID,即 ZXIDLeader 服務(wù)器為每個 Follower 分配一個單獨的隊列,將廣播的 Proposal 依次放到隊列中去,根據(jù) FIFO 策略進行消息發(fā)送Follower 接收到 Proposal 后,將其以事務(wù)日志的方式寫入本地磁盤中,寫入成功后向 Leader 反饋一個 ACK 響應(yīng)消息Leader 接收到超過半數(shù)以上 Follower 的 ACK 響應(yīng)消息后,即認為消息發(fā)送成功,可以發(fā)送 Commit 消息Leader 向所有 Follower 廣播 commit 消息,同時自身也會完成事務(wù)提交,F(xiàn)ollower 接收到 Commit 后,將上一條事務(wù)提交 兩階段提交模型可能因為 Leader 宕機帶來數(shù)據(jù)不一致:Leader 發(fā)起一個事務(wù) Proposal 后就宕機,F(xiàn)ollower 都沒有 ProposalLeader 收到半數(shù) ACK 宕機,沒來得及向 Follower 發(fā)送 Commit
崩潰恢復(fù)
Leader 服務(wù)器出現(xiàn)崩潰或者由于網(wǎng)絡(luò)原因?qū)е?Leader 服務(wù)器失去了與過半 Follower的聯(lián)系,那么就會進入崩潰恢復(fù)模式,崩潰恢復(fù)主要包括兩部分:Leader 選舉和數(shù)據(jù)恢復(fù) Zab 協(xié)議崩潰恢復(fù)要求滿足以下兩個要求:
已經(jīng)被 Leader 提交的提案 Proposal,必須最終被所有的 Follower 服務(wù)器正確提交丟棄已經(jīng)被 Leader 提出的,但是沒有被提交的 Proposal Zab 協(xié)議需要保證選舉出來的 Leader 需要滿足以下條件:新選舉的 Leader 不能包含未提交的 Proposal,即新 Leader 必須都是已經(jīng)提交了 Proposal 的 Follower 節(jié)點新選舉的 Leader 節(jié)點含有最大的 ZXID,可以避免 Leader 服務(wù)器檢查 Proposal 的提交和丟棄工作 數(shù)據(jù)恢復(fù)階段:完成 Leader 選舉后,在正式開始工作之前(接收事務(wù)請求提出新的 Proposal),Leader 服務(wù)器會首先確認事務(wù)日志中的所有 Proposal 是否已經(jīng)被集群中過半的服務(wù)器 CommitLeader 服務(wù)器需要確保所有的 Follower 服務(wù)器能夠接收到每一條事務(wù)的 Proposal,并且能將所有已經(jīng)提交的事務(wù) Proposal 應(yīng)用到內(nèi)存數(shù)據(jù)中,所以只有當 Follower 將所有尚未同步的事務(wù) Proposal 都從 Leader 服務(wù)器上同步,并且應(yīng)用到內(nèi)存數(shù)據(jù)后,Leader 才會把該 Follower 加入到真正可用的 Follower 列表中
異常處理
Zab 的事務(wù)編號 zxid 設(shè)計:
zxid 是一個 64 位的數(shù)字,低 32 位是一個簡單的單增計數(shù)器,針對客戶端每一個事務(wù)請求,Leader 在產(chǎn)生新的 Proposal 事務(wù)時,都會對該計數(shù)器加 1,而高 32 位則代表了 Leader 周期的 epoch 編號epoch 為當前集群所處的代或者周期,每次 Leader 變更后都會在 epoch 的基礎(chǔ)上加 1,F(xiàn)ollower 只服從 epoch 最高的 Leader 命令,所以舊的 Leader 崩潰恢復(fù)之后,其他 Follower 就不會繼續(xù)追隨每次選舉產(chǎn)生一個新的 Leader,就會從新 Leader 服務(wù)器上取出本地事務(wù)日志中最大編號 Proposal 的 zxid,從 zxid 中解析得到對應(yīng)的 epoch 編號,然后再對其加 1 后作為新的 epoch 值,并將低 32 位數(shù)字歸零,由 0 開始重新生成 zxid Zab 協(xié)議通過 epoch 編號來區(qū)分 Leader 變化周期,能夠有效避免不同的 Leader 錯誤的使用了相同的 zxid 編號提出了不一樣的 Proposal 的異常情況 Zab 數(shù)據(jù)同步過程:數(shù)據(jù)同步階段要以 Leader 服務(wù)器為準一個包含了上個 Leader 周期中尚未提交過的事務(wù) Proposal 的服務(wù)器啟動時,這臺機器加入集群中會以 Follower 角色連上 LeaderLeader 會根據(jù)自己服務(wù)器上最后提交的 Proposal 和 Follower 服務(wù)器的 Proposal 進行比對,讓 Follower 進行一個回退或者前進操作,到一個已經(jīng)被集群中過半機器 Commit 的最新 Proposal(源碼解析部分詳解)
CAP
CAP 理論指的是在一個分布式系統(tǒng)中,Consistency(一致性)、Availability(可用性)、Partition Tolerance(分區(qū)容錯性)不能同時成立,ZooKeeper 保證的是 CP
ZooKeeper 不能保證每次服務(wù)請求的可用性,在極端環(huán)境下可能會丟棄一些請求,消費者程序需要重新請求才能獲得結(jié)果進行 Leader 選舉時集群都是不可用 CAP 三個基本需求,因為 P 是必須的,因此分布式系統(tǒng)選擇就在 CP 或者 AP 中:一致性:指數(shù)據(jù)在多個副本之間是否能夠保持數(shù)據(jù)一致的特性,當一個系統(tǒng)在數(shù)據(jù)一致的狀態(tài)下執(zhí)行更新操作后,也能保證系統(tǒng)的數(shù)據(jù)仍然處于一致的狀態(tài)可用性:指系統(tǒng)提供的服務(wù)必須一直處于可用的狀態(tài),即使集群中一部分節(jié)點故障,對于用戶的每一個操作請求總是能夠在有限的時間內(nèi)返回結(jié)果分區(qū)容錯性:分布式系統(tǒng)在遇到任何網(wǎng)絡(luò)分區(qū)故障時,仍然能夠保證對外提供服務(wù),不會宕機,除非是整個網(wǎng)絡(luò)環(huán)境都發(fā)生了故障
監(jiān)聽機制
實現(xiàn)原理
ZooKeeper 中引入了 Watcher 機制來實現(xiàn)了發(fā)布/訂閱功能,客戶端注冊監(jiān)聽目錄節(jié)點,在特定事件觸發(fā)時,ZooKeeper 會通知所有關(guān)注該事件的客戶端,保證 ZooKeeper 保存的任何的數(shù)據(jù)的任何改變都能快速的響應(yīng)到監(jiān)聽應(yīng)用程序 監(jiān)聽命令:只能生效一次,接收一次通知,再次監(jiān)聽需要重新注冊
ls –w /path # 監(jiān)聽【子節(jié)點數(shù)量】的變化
get –w /path # 監(jiān)聽【節(jié)點數(shù)據(jù)】的變化
工作流程:
在主線程中創(chuàng)建 Zookeeper 客戶端,這時就會創(chuàng)建兩個線程,一個負責網(wǎng)絡(luò)連接通信(connet),一個負責監(jiān)聽(listener)通過 connect 線程將注冊的監(jiān)聽事件發(fā)送給 Zookeeper在 Zookeeper 的注冊監(jiān)聽器列表中將注冊的監(jiān)聽事件添加到列表中Zookeeper 監(jiān)聽到有數(shù)據(jù)或路徑變化,將消息發(fā)送給 listener 線程listener 線程內(nèi)部調(diào)用 process() 方法 Curator 框架引入了 Cache 來實現(xiàn)對 ZooKeeper 服務(wù)端事件的監(jiān)聽,三種 Watcher:NodeCache:只是監(jiān)聽某一個特定的節(jié)點PathChildrenCache:監(jiān)控一個 ZNode 的子節(jié)點TreeCache:可以監(jiān)控整個樹上的所有節(jié)點,類似于 PathChildrenCache 和 NodeCache 的組合
監(jiān)聽案例
整體架構(gòu)
客戶端實時監(jiān)聽服務(wù)器動態(tài)上下線
代碼實現(xiàn)
客戶端:先啟動客戶端進行監(jiān)聽
public class DistributeClient {
private String connectString = "192.168.3.128:2181";
private int sessionTimeout = 20000;
private ZooKeeper zk;
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
// 1 獲取zk連接
client.getConnect();
// 2 監(jiān)聽/servers下面子節(jié)點的增加和刪除
client.getServerList();
// 3 業(yè)務(wù)邏輯
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
ArrayList
// 獲取所有子節(jié)點,true 代表觸發(fā)監(jiān)聽操作
List
for (String child : children) {
// 獲取子節(jié)點的數(shù)據(jù)
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
getServerList();
}
});
}
}
服務(wù)端:啟動時需要 Program arguments
public class DistributeServer {
private String connectString = "192.168.3.128:2181";
private int sessionTimeout = 20000;
private ZooKeeper zk;
public static void main(String[] args) throws Exception {
DistributeServer server = new DistributeServer();
// 1 獲取 zookeeper 連接
server.getConnect();
// 2 注冊服務(wù)器到 zk 集群,注意參數(shù)
server.register(args[0]);
// 3 啟動業(yè)務(wù)邏輯
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
// OPEN_ACL_UNSAFE: ACL 開放
// EPHEMERAL_SEQUENTIAL: 臨時順序節(jié)點
String create = zk.create("/servers/" + hostname, hostname.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
}
分布式鎖
實現(xiàn)原理
分布式鎖可以實現(xiàn)在分布式系統(tǒng)中多個進程有序的訪問該臨界資源,多個進程之間不會相互干擾 核心思想:當客戶端要獲取鎖,則創(chuàng)建節(jié)點,使用完鎖,則刪除該節(jié)點
客戶端獲取鎖時,在 /locks 節(jié)點下創(chuàng)建臨時順序節(jié)點
使用臨時節(jié)點是為了防止當服務(wù)器或客戶端宕機以后節(jié)點無法刪除(持久節(jié)點),導致鎖無法釋放使用順序節(jié)點是為了系統(tǒng)自動編號排序,找最小的節(jié)點,防止客戶端饑餓現(xiàn)象,保證公平
獲取 /locks 目錄的所有子節(jié)點,判斷自己的子節(jié)點序號是否最小,成立則客戶端獲取到鎖,使用完鎖后將該節(jié)點刪除反之客戶端需要找到比自己小的節(jié)點,對其注冊事件監(jiān)聽器,監(jiān)聽刪除事件客戶端的 Watcher 收到刪除事件通知,就會重新判斷當前節(jié)點是否是子節(jié)點中序號最小,如果是則獲取到了鎖, 如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個節(jié)點并注冊監(jiān)聽
Curator
Curator 實現(xiàn)分布式鎖 API,在 Curator 中有五種鎖方案:
InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)InterProcessMutex:分布式可重入排它鎖InterProcessReadWriteLock:分布式讀寫鎖InterProcessMultiLock:將多個鎖作為單個實體管理的容器InterProcessSemaphoreV2:共享信號量
public class CuratorLock {
public static CuratorFramework getCuratorFramework() {
// 重試策略對象
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
// 構(gòu)建客戶端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.3.128:2181")
.connectionTimeoutMs(2000) // 連接超時時間
.sessionTimeoutMs(20000) // 會話超時時間 單位ms
.retryPolicy(policy) // 重試策略
.build();
// 啟動客戶端
client.start();
System.out.println("zookeeper 啟動成功");
return client;
}
public static void main(String[] args) {
// 創(chuàng)建分布式鎖1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 創(chuàng)建分布式鎖2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
lock1.acquire();
System.out.println("線程1 獲取到鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("線程1 釋放鎖");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
lock2.acquire();
System.out.println("線程2 獲取到鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("線程2 釋放鎖");
}
}).start();
}
}
源碼解析
服務(wù)端
服務(wù)端程序的入口 QuorumPeerMain
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
initializeAndRun 的工作:
解析啟動參數(shù)提交周期任務(wù),定時刪除過期的快照初始化通信模型,默認是 NIO 通信
// QuorumPeerMain#runFromConfig
public void runFromConfig(QuorumPeerConfig config) {
// 通信信組件初始化,默認是 NIO 通信
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
// 初始化NIO 服務(wù)端socket,綁定2181 端口,可以接收客戶端請求
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
// 啟動 zk
quorumPeer.start();
}
啟動 zookeeper
// QuorumPeer#start
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 冷啟動數(shù)據(jù)恢復(fù),將快照中數(shù)據(jù)恢復(fù)到 DataTree
loadDataBase();
// 啟動通信工廠實例對象
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 準備選舉環(huán)境
startLeaderElection();
// 執(zhí)行選舉
super.start();
}
選舉機制
環(huán)境準備
QuorumPeer#startLeaderElection 初始化選舉環(huán)境:
synchronized public void startLeaderElection() {
try {
// Looking 狀態(tài),需要選舉
if (getPeerState() == ServerState.LOOKING) {
// 選票組件: myid (serverid), zxid, epoch
// 開始選票時,serverid 是自己,【先投自己】
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
// 響應(yīng)投票結(jié)果線程
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// 創(chuàng)建選舉算法實例
this.electionAlg = createElectionAlgorithm(electionType);
}
// zk總的發(fā)送和接收隊列準備好
protected Election createElectionAlgorithm(int electionAlgorithm){
// 負責選舉過程中的所有網(wǎng)絡(luò)通信,創(chuàng)建各種隊列和集合
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// 啟動監(jiān)聽線程, 調(diào)用 client = ss.accept()阻塞,等待處理請求
listener.start();
// 準備好發(fā)送和接收隊列準備
FastLeaderElection fle = new FastLeaderElection(this, qcm);
// 啟動選舉線程,【W(wǎng)orkerSender 和 WorkerReceiver】
fle.start();
le = fle;
}
}
選舉源碼
當 Zookeeper 啟動后,首先都是 Looking 狀態(tài),通過選舉讓其中一臺服務(wù)器成為 Leader 執(zhí)行 super.start() 相當于執(zhí)行 QuorumPeer#run() 方法
public void run() {
case LOOKING:
// 進行選舉,選舉結(jié)束返回最終成為 Leader 勝選的那張選票
setCurrentVote(makeLEStrategy().lookForLeader());
}
FastLeaderElection 類:
lookForLeader:選舉
public Vote lookForLeader() {
// 正常啟動中其他服務(wù)器都會向我發(fā)送一個投票,保存每個服務(wù)器的最新合法有效的投票
HashMap
// 存儲合法選舉之外的投票結(jié)果
HashMap
// 一次選舉的最大等待時間,默認值是0.2s
int notTimeout = finalizeWait;
// 每發(fā)起一輪選舉,logicalclock++,在沒有合法的epoch 數(shù)據(jù)之前,都使用邏輯時鐘代替
synchronized(this){
// 更新邏輯時鐘,每進行一次選舉,都需要更新邏輯時鐘
logicalclock.incrementAndGet();
// 更新選票(serverid, zxid, epoch)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 廣播選票,把自己的選票發(fā)給其他服務(wù)器
sendNotifications();
// 一輪一輪的選舉直到選舉成功
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ }
}
sendNotifications:廣播選票
private void sendNotifications() {
// 遍歷投票參與者,給每臺服務(wù)器發(fā)送選票
for (long sid : self.getCurrentAndNextConfigVoters()) {
// 創(chuàng)建發(fā)送選票
ToSend notmsg = new ToSend(...);
// 把發(fā)送選票放入發(fā)送隊列
sendqueue.offer(notmsg);
}
}
FastLeaderElection 中有 WorkerSender 線程:
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS):阻塞獲取要發(fā)送的選票process(m):處理要發(fā)送的選票 manager.toSend(m.sid, requestBuffer):發(fā)送選票
if (this.mySid == sid):如果消息的接收者 sid 是自己,直接進入自己的 RecvQueue(自己投自己)else:如果接收者是其他服務(wù)器,創(chuàng)建對應(yīng)的發(fā)送隊列或者復(fù)用已經(jīng)存在的發(fā)送隊列,把消息放入該隊列connectOne(sid):建立連接
sock.connect(electionAddr, cnxTO):建立與 sid 服務(wù)器的連接initiateConnection(sock, sid):初始化連接 startConnection(sock, sid):創(chuàng)建并啟動發(fā)送器線程和接收器線程
dout = new DataOutputStream(buf):獲取 Socket 輸出流,向服務(wù)器發(fā)送數(shù)據(jù)din = new DataInputStream(new BIS(sock.getInputStream()))):通過輸入流讀取對方發(fā)送過來的選票if (sid > self.getId()):接收者 sid 比我的大,沒有資格給對方發(fā)送連接請求的,直接關(guān)閉自己的客戶端SendWorker sw:初始化發(fā)送器,并啟動發(fā)送器線程,線程 run 方法
while (running && !shutdown && sock != null):連接沒有斷開就一直運行ByteBuffer b = pollSendQueue():從發(fā)送隊列 SendQueue 中獲取發(fā)送消息lastMessageSent.put(sid, b):更新對于 sid 這臺服務(wù)器的最近一條消息send(b):執(zhí)行發(fā)送
RecvWorker rw:初始化接收器,并啟動接收器線程
din.readFully(msgArray, 0, length):輸入流接收消息addToRecvQueue(new Message(messagg, sid)):將消息放入接收消息 recvQueue 隊列 FastLeaderElection 中有 WorkerReceiver 線程
response = manager.pollRecvQueue():從 RecvQueue 中阻塞獲取出選舉投票消息(其他服務(wù)器發(fā)送過來的)
狀態(tài)同步
選舉結(jié)束后,每個節(jié)點都需要根據(jù)角色更新自己的狀態(tài),Leader 更新狀態(tài)為 Leader,其他節(jié)點更新狀態(tài)為 Follower,整體流程:
Follower 需要讓 Leader 知道自己的狀態(tài) (sid, epoch, zxid)Leader 接收到信息,根據(jù)信息構(gòu)建新的 epoch,要返回對應(yīng)的信息給 Follower,F(xiàn)ollower 更新自己的 epochLeader 需要根據(jù) Follower 的狀態(tài),確定何種方式的數(shù)據(jù)同步 DIFF、TRUNC、SNAP,就是要以 Leader 服務(wù)器數(shù)據(jù)為準
DIFF:Leader 提交的 zxid 比 Follower 的 zxid 大,發(fā)送 Proposal 給 Follower 提交執(zhí)行TRUNC:Follower 的 zxid 比leader 的 zxid 大,F(xiàn)ollower 要進行回滾SNAP:Follower 沒有任何數(shù)據(jù),直接全量同步
執(zhí)行數(shù)據(jù)同步,當 Leader 接收到超過半數(shù) Follower 的 Ack 之后,進入正常工作狀態(tài),集群啟動完成 核心函數(shù)解析:Leader 更新狀態(tài)入口:Leader.lead()
zk.loadData():恢復(fù)數(shù)據(jù)到內(nèi)存cnxAcceptor = new LearnerCnxAcceptor():啟動通信組件
s = ss.accept():等待其他 Follower 節(jié)點向 Leader 節(jié)點發(fā)送同步狀態(tài)LearnerHandler fh:接收到 Follower 的請求,就創(chuàng)建 LearnerHandler 對象fh.start():啟動線程,通過 switch-case 語法判斷接收的命令,執(zhí)行相應(yīng)的操作
Follower 更新狀態(tài)入口:Follower.followerLeader()
QuorumServer leaderServer = findLeader():查找 LeaderconnectToLeader(addr, hostname):與 Leader 建立連接long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO):向 Leader 注冊
主從工作
Leader:主服務(wù)的工作流程
Follower:從服務(wù)的工作流程,核心函數(shù)為 Follower#followLeader()
readPacket(qp):讀取信息processPacket(qp):處理信息
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
break;
case Leader.PROPOSAL:
break;
case Leader.COMMIT:
break;
case Leader.COMMITANDACTIVATE:
break;
case Leader.UPTODATE:
break;
case Leader.REVALIDATE:
break;
case Leader.SYNC:
break;
default:
break;
}
}
客戶端
柚子快報邀請碼778899分享:Zookeeper
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。