柚子快報邀請碼778899分享:大數(shù)據(jù)之Shuffle詳解
柚子快報邀請碼778899分享:大數(shù)據(jù)之Shuffle詳解
1、什么是 shuffle?
shuffle?單從英文意思上來看,是 “洗牌” ?的意思,但在大數(shù)據(jù)分布式計算中指的是集群范圍內跨節(jié)點、跨進程的數(shù)據(jù)分發(fā)。?也就是 2 個 Stage 之間,數(shù)據(jù)進行傳遞的過程就叫做 Shuffle。
那為什么需要 shuffle 呢?
這往往和大數(shù)據(jù)的很多計算場景相關,比如我們一直在說的 WordCount ,把相同的單詞進行計數(shù)。因此,需要對數(shù)據(jù)進行聚合操作,聚合自然要對相同的 key 進行聚合,所以需要通過 shuffle 把各個節(jié)點相同的 key 拉到一起,可以結合下圖理解一下。
?
當然,WordCount 是我舉的一個小栗子。其實,在 Spark 中,聚合、排序是一個通用的計算場景(都有相應的算子),而且很多時候數(shù)據(jù)量會很大,因此,必須得設計 shuffle 來支持這些不同類型的計算。
那 shuffle 到底解決了什么問題?
我們已經(jīng)知道 Shuffle 指的是集群范圍內跨節(jié)點、跨進程數(shù)據(jù)分發(fā),其實分發(fā)并不是上游 Stage 直接把數(shù)據(jù)分發(fā)給下游,而是先輸出一個中間文件,然后下游 Stage 去讀取中間文件。
所以,shuffle?要解決的問題就是如何將這些中間文件數(shù)據(jù)重新組織,使其能夠在上下游 Task 之間進行數(shù)據(jù)傳遞和計算。
?2、shuffle 工作原理
Shuffle?分為?Shuffle Write?和?Shuffle Read?兩個階段,前者主要解決上游 stage 輸出數(shù)據(jù)的分區(qū)問題,后者主要解決下游 stage 從上游 stage 獲取數(shù)據(jù)、重新組織、并為后續(xù)操作提供數(shù)據(jù)源的問題。
干說有點晦澀,為了你方便理解,我還是用?WordCount?這個例子來說明。在?WordCount?中,要實現(xiàn)相同的單詞進行分組計數(shù),需要引入 shuffle,其流程圖如下:
?
WordCount?中主要是?reduceByKey?產生了 Shuffle ,從上圖可知,以 Shuffle 為邊界把計算切分成 2 ?個 Stage, Shuffle 的上游叫做 Map 階段,Shuffle 的下游叫做 Reduce 階段。
Map 階段,每個分區(qū)內先把自己的數(shù)據(jù)做一個初步的 combine,又稱為 Map 端聚合;然后不同的單詞會根據(jù)分區(qū)函數(shù)計算出來的分區(qū)號,被分發(fā)到下游對應的分區(qū)
最后?Reduce?階段以單詞為 key 執(zhí)行聚合函數(shù)做第二次聚合,這里稱為全局聚合,從而實現(xiàn)了單詞計數(shù)的功能。
上述流程值得注意的是,在 Shuffle 環(huán)節(jié),數(shù)據(jù)不是直接分發(fā)到下游,而是先寫到中間文件中,然后?Reduce?階段會去讀取這個中間文件的數(shù)據(jù)。
總的來說,Map 和 Reduce 類似于一個生產者-消費者模型,Map 階段生產 shuffle 中間文件,reduce 消費 shuffle 中間文件,從而銜接了 shuffle 的上下游,實現(xiàn)了數(shù)據(jù)的交換。
那么,疑問又來了,什么是 shuffle 中間文件?shuffle 中間文件又是怎樣生成的?
?
DAGScheduler?會從數(shù)據(jù)依賴圖后往前回溯,以 shuffle 為邊界切割 job ,生成 Stage,并會為每個 Stage 創(chuàng)建任務集合?TaskSet,每一個?TaskSet?又都包含多個分布式任務 Task。
在 Map 階段,每個 Task 都會生成 2 個文件:一個 data 文件,一個 index 文件。
這兩個文統(tǒng)稱為 Shuffle 中間文件,并且以 Map 端的 Task 粒度生成,有多少個并行的 Task ,
就會生成多少份 Shuffle 中間文件(最后小文件個數(shù):2 x?Map Task?個數(shù))。
shuffle 中間文件到底長啥樣?我們已經(jīng)說了,它是 data 文件和 index 文件的一個統(tǒng)稱。我們以?WordCount?為例來說明:
從上圖可知,data 文件記錄的就是原始的
標識了目標分區(qū)所屬數(shù)據(jù)的起始索引。在圖中,為了簡潔描述,我只用了 2 條數(shù)據(jù)示意,實際上更復雜。
我們知道了 shuffle 文件的樣子,下面看看它是如何產生的?
3、shuffle write
?
在 Spark shuffle 機制中,Shuffle Write 主要就是生成 shuffle 中間文件的一個過程。
在 shuffle 中間文件生成的過程中,Shuffle Writer?主要承擔的功能有:數(shù)據(jù)分區(qū)、聚合和排序 3 個功能。
關于數(shù)據(jù)分區(qū),主要考慮 2 個問題。
第一個問題,如何確定分區(qū)個數(shù)?
一般來說,分區(qū)個數(shù)與下游 Stage 的 Reduc Task 個數(shù)一致。在 shuffle 時,用戶通過指定分區(qū)個數(shù):numPartitions
相應的會有同樣個數(shù)的 reduce task 來處理相應的數(shù)據(jù);如果用戶沒有自定義,則分區(qū)個數(shù)默認為 parent RDD 中分區(qū)個數(shù)的最大值。
第二個問題,對于 map task 輸出的 record 如何分區(qū)?
公式:partitionId = Hash(key) % numpartitions?。即每計算出一條 record,計算其 key 的哈希值,并與分區(qū)個數(shù)取模,則得到該條數(shù)據(jù)的分區(qū) ID
關于數(shù)據(jù)聚合,本來聚合應該是?shuffle read??完數(shù)據(jù)之后要做的,之所以在?shuffle write?時做 combine ,是考慮到數(shù)據(jù)量大的時候,提前聚合以減少數(shù)據(jù)量,從而減少網(wǎng)絡 IO。
這里要注意的是,shuffle read?聚合是屬于全局聚合,shuffle write?聚合只針對當前分區(qū)做聚合。
關于排序,Spark 采取的是先聚合,再排序。實現(xiàn)主要是設計了一個特殊的數(shù)據(jù)結構,類似?HashMap+Array?,完成先聚合,再排序的功能。
針對上述三個功能,Spark 設計了一個通用的 Shuffle Write 框架,該框架的執(zhí)行順序為:map 輸出 record ?---> ? 數(shù)據(jù)聚合 ---> 排序 ---> 分區(qū)。其中數(shù)據(jù)聚合、排序是可選項。
這里還是使用 WordCount 例子給大家做說明:
如圖所示,Spark 采用的實現(xiàn)方法是建立一個類似?HashMap + Array?的內存數(shù)據(jù)結構,對?map()?輸出的 record 進行聚合。
類?HashMap??結構中的 Key 由?“partitionId+Key”?組成, Value 是經(jīng)過相同 Key combine 的結果。
在圖中,map 端聚合是?sum()?函數(shù),那么 Value 中存放的便是多個 record 對應的 Value 相加的結果。聚合完成后,Spark 對類似?HashMap?中的 record 進行排序。
如果需要按 Key 進行排序,那么按?partitionId+Key?進行排序。最后,將排序后的 record 寫入一個分區(qū)文件中。
如果該數(shù)據(jù)結構存放不下,則會先擴容為 2 倍大小,如果還存放不下,就將類?HashMap?中的 record 排序后 spill 到磁盤上。
此時,HashMap?被清空,可以繼續(xù)對?map()?輸出的 record 進行聚合,如果內存再次不夠用,那么繼續(xù)spill到磁盤上,此過程可以重復多次。
當?map()?輸出完成以后,將此時?HashMap?中的 reocrd 與磁盤上已排序的 record 進行再次聚合(merge),得到最終的?record?,并輸出到相應的分區(qū)文件中,該分區(qū)文件,便是上文中提到的 Shuffle 中間文件。
好了,我們知道 shuffle 中間文件是如何產生的,接下來我們繼續(xù)看 reduce task 如何讀取 shuffle 中間文件?
4、shuffle read
?
當?Shuffle Write?輸出了 shuffle 中間文件后,就到了?Shuffle Read?階段。Shuffle Read?主要需要實現(xiàn) 3 個功能:跨節(jié)點拉取數(shù)據(jù),聚合和排序。
同樣的,Spark 為了支持這三個功能,設計了一個通用的 Shuffle Read 框架,它的計算順序:跨節(jié)點數(shù)據(jù)拉取 ---> ? 數(shù)據(jù)聚合 ---> 排序 ---> 輸出。
這里值得注意的點是,對于每一個?Map Task?生成的中間文件,其中生成的分區(qū)數(shù)量由下游的 Reduce 階段的 Task 個數(shù)決定,并且 index 文件標識了目標分區(qū)所屬數(shù)據(jù)的起始索引。
為了講清楚?Shuffle Read?的細節(jié),小林還是使用?WordCount?例子,給大家揭開?Shuffle Read?的神秘面紗。
如圖所示,Reduce Task 不斷的從各個?shuffle?中間文件拉取數(shù)據(jù),并將數(shù)據(jù)輸出到一個 buffer 中。
獲取 record 后,Spark 建立一個類似?HashMap?的內存數(shù)據(jù)結構(ExternalAppendOnlyMap),對 buffer 中的 record 進行聚合,類?HashMap?中的 Key 是 record 中的 Key,類?HashMap?中的 Value 是經(jīng)過相同聚合函數(shù)(func())計算后的結果。
在圖中,聚合函數(shù)是?sum()?函數(shù),那么 Value 中存放的是多個 record 對應 Value 相加后的結果。之后,按照 Key進行排序,如圖所示,則建立一個Array 結構,讀取類?HashMap?中的 record,并對 record 按 Key (圖中是按 key 的首字母)進行排序,排序完成后,將結果輸出或者傳遞給下一步操作。
如果類?HashMap?存放不下,則會先擴容為兩倍大小,如果還存放不下,就將?HashMap?中的 record 排序后 spill 到磁盤上。此時,HashMap?被清空,可以繼續(xù)對?buffer?中的 record 進行聚合。如果內存再次不夠用,那么繼續(xù) spill 到磁盤上,此過程可以重復多次。
當聚合完成以后,將此時?HashMap?中的 reocrd 與磁盤上已排序的 record 進行再次聚合,得到最終的 record,輸出到分區(qū)文件中,供其它數(shù)據(jù)操作。
好了,至此小林已經(jīng)把 Shuffle 的整個全貌都講清楚了。
柚子快報邀請碼778899分享:大數(shù)據(jù)之Shuffle詳解
文章鏈接
本文內容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。