柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù) Flink性能優(yōu)化小結(jié)
柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù) Flink性能優(yōu)化小結(jié)
jvm內(nèi)存優(yōu)化 內(nèi)存優(yōu)化 netty優(yōu)化 akka優(yōu)化 并行度優(yōu)化 對(duì)象重用 checkpoint優(yōu)化 網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu) 狀態(tài)優(yōu)化 flink數(shù)據(jù)傾斜優(yōu)化 flink背壓
jvm內(nèi)存參數(shù)調(diào)優(yōu)
Flink是依賴內(nèi)存計(jì)算,計(jì)算過程中內(nèi)存不夠?qū)link的執(zhí)行效率影響很大。可以通過監(jiān)控GC(Garbage Collection),評(píng)估內(nèi)存使用及剩余情況來判斷內(nèi)存是否變成性能瓶頸,并根據(jù)情況優(yōu)化。
監(jiān)控節(jié)點(diǎn)進(jìn)程的YARN的Container GC日志,如果頻繁出現(xiàn)Full GC,需要優(yōu)化GC。
GC的配置:在客戶端的"conf/flink-conf.yaml"配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):
-Xloggc:
此處默認(rèn)已經(jīng)添加GC日志。
調(diào)整老年代和新生代的比值。在客戶端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置項(xiàng)中添加參數(shù):“-XX:NewRatio”。如“ -XX:NewRatio=2”,則表示老年代與新生代的比值為2:1,新生代占整個(gè)堆空間的1/3,老年代占2/3。
可以通過設(shè)置?jobmanager.memory.enable-jvm-direct-memory-limit?對(duì) JobManager 進(jìn)程的?JVM 直接內(nèi)存進(jìn)行限制
Flink內(nèi)存調(diào)優(yōu)
flink進(jìn)程內(nèi)存
?jobmanager相關(guān)配置:
taskamanger相關(guān)配置:
yarn相關(guān)的配置:
yarn.appmaster.vcores YARN應(yīng)用程序主機(jī)使用的虛擬核心(vcore)的數(shù)量。yarn.containers.vcores 每個(gè)YARN容器的虛擬核心數(shù)(vcore)。默認(rèn)情況下,vcore數(shù)設(shè)置為每個(gè)TaskManager的插槽數(shù)(如果已設(shè)置),否則設(shè)置為1。為了使用此參數(shù),您的群集必須啟用CPU調(diào)度。您可以通過設(shè)置來做到這一點(diǎn)org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。
yarn.scheduler.maximum-allocation-vcoresyarn.scheduler.minimum-allocation-vcoresFlink單個(gè)task manager的slot數(shù)量必須介于這兩個(gè)值之間
yarn.scheduler.maximum-allocation-mbyarn.scheduler.minimum-allocation-mbFlink的job manager 和task manager內(nèi)存不得超過container最大分配內(nèi)存大小。
yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)
netty優(yōu)化
Netty Shuffle環(huán)境
Network Communication (via Netty)
akka優(yōu)化
akka.ask.callstack ???捕獲異步請(qǐng)求的調(diào)用堆棧。注意,如果有數(shù)百萬個(gè)并發(fā)RPC調(diào)用,這可能會(huì)增加內(nèi)存占用。
akka.ask.timeout 用于期望并阻止Akka呼叫的超時(shí)。如果Flink由于超時(shí)而失敗,則應(yīng)嘗試增加此值。超時(shí)可能是由于計(jì)算機(jī)運(yùn)行緩慢或網(wǎng)絡(luò)擁塞引起的。超時(shí)值需要一個(gè)時(shí)間單位說明符(ms / s / min / h / d)。
akka.client-socket-worker-pool.pool-size-factor 池大小因子使用以下公式確定線程池大小:ceil(available processors * factor)。然后,結(jié)果大小受pool-size-min和pool-size-max值限制。
akka.client-socket-worker-pool.pool-size-max??要限制基于因素的最大線程數(shù)。
akka.client-socket-worker-pool.pool-size-min??最小線程數(shù)以上限為基礎(chǔ)。
akka.client.timeout 60s 客戶端上所有阻塞呼叫的超時(shí)。
akka.fork-join-executor.parallelism-factor??并行度因子用于通過以下公式確定線程池大?。篶eil(available processors * factor)。然后,所得到的大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max 最大線程數(shù)上限為基于因子的并行數(shù)。
akka.fork-join-executor.parallelism-min??最小線程數(shù)以基于因素的并行度為上限。
akka.framesize 10485760b(10MB) 在JobManager和TaskManager之間發(fā)送的消息的最大大小。如果Flink因消息超出此限制而失敗,則應(yīng)增加該限制。消息大小需要大小單位說明符。
akka.fork-join-executor.parallelism-factor 并行度因子用于使用以下公式確定線程池大?。篶eil(可用處理器*因子)。然后,結(jié)果大小由并行度最小值和并行度最大值限制。
akka.fork-join-executor.parallelism-max 基于并行度的最大線程數(shù)上限
akka.fork-join-executor.parallelism-min 基于并行度的最大線程數(shù)下限
akka.framesize JobManager和TaskManager之間發(fā)送的最大消息大小。如果Flink失敗是因?yàn)橄⒊^了這個(gè)限制,那么您應(yīng)該增加它。消息大小需要大小單位說明符。
akka.retry-gate-closed-for 遠(yuǎn)程連接斷開后,閘門應(yīng)關(guān)閉幾毫秒。
akka.server-socket-worker-pool.pool-size-factor 池大小因子用于使用以下公式確定線程池大?。篶eil(可用處理器*因子)。然后,結(jié)果大小由池大小最小值和池大小最大值限定。
akka.server-socket-worker-pool.pool-size-max 基于上限因子的最大線程數(shù)。
akka.server-socket-worker-pool.pool-size-min 基于上限因子的最小線程數(shù)
akka.tcp.timeout 所有出站連接超時(shí)。如果由于網(wǎng)絡(luò)速度慢而在連接TaskManager時(shí)遇到問題,則應(yīng)增加此值。
akka.startup-timeout 超時(shí)之后,遠(yuǎn)程組件的啟動(dòng)被視為失敗。
并行度優(yōu)化
當(dāng)分區(qū)導(dǎo)致數(shù)據(jù)傾斜時(shí),需要考慮優(yōu)化分區(qū)。避免非并行度操作,有些對(duì)DataStream的操作會(huì)導(dǎo)致無法并行,例如WindowAll。keyBy盡量不要使用String。
并行度控制任務(wù)的數(shù)量,影響操作后數(shù)據(jù)被切分成的塊數(shù)。調(diào)整并行度讓任務(wù)的數(shù)量和每個(gè)任務(wù)處理的數(shù)據(jù)與機(jī)器的處理能力達(dá)到最優(yōu)。查看CPU使用情況和內(nèi)存占用情況,當(dāng)任務(wù)和數(shù)據(jù)不是平均分布在各節(jié)點(diǎn),而是集中在個(gè)別節(jié)點(diǎn)時(shí),可以增大并行度使任務(wù)和數(shù)據(jù)更均勻的分布在各個(gè)節(jié)點(diǎn)。增加任務(wù)的并行度,充分利用集群機(jī)器的計(jì)算能力,一般并行度設(shè)置為集群CPU核數(shù)總和的2-3倍。
taskmanger 個(gè)數(shù):
num_of_tm = ceil(parallelism / slot) 即并行度除以slot個(gè)數(shù),結(jié)果向上取整。
算子層面并行度設(shè)置:
通過調(diào)用setParallelism()方法來指定
執(zhí)行環(huán)境層次
Flink程序運(yùn)行在執(zhí)行環(huán)境中。執(zhí)行環(huán)境為所有執(zhí)行的算子、數(shù)據(jù)源、data sink定義了一個(gè)默認(rèn)的并行度。
執(zhí)行環(huán)境的默認(rèn)并行度可以通過調(diào)用setParallelism()方法指定。例如:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream
DataStream
wordCounts.print();
env.execute("Word Count Example");
客戶端層次
并行度可以在客戶端將job提交到Flink時(shí)設(shè)定。對(duì)于CLI客戶端,可以通過“-p”參數(shù)指定并行度。例如:./bin/flink run -p 10 ../examples/WordCount-java.jar
對(duì)象重用
對(duì)象重用的本質(zhì)就是在算子鏈中的下游算子使用上游對(duì)象的淺拷貝。若關(guān)閉對(duì)象重用,則必須經(jīng)過一輪序列化和反序列化,相當(dāng)于深拷貝,所以就不能100%地發(fā)揮算子鏈的優(yōu)化效果。
但正所謂魚與熊掌不可兼得,若啟用了對(duì)象重用,那么我們的業(yè)務(wù)代碼中必然不能出現(xiàn)以下兩種情況,以免造成混亂:
在下游修改上游發(fā)射的對(duì)象,或者上游存入其State中的對(duì)象; 同一條流直接對(duì)接多個(gè)處理邏輯(如stream.map(new AFunc())的同時(shí)還有stream.map(new BFunc()))。
總之,在enableObjectReuse()之前,需要謹(jǐn)慎評(píng)估業(yè)務(wù)代碼是否會(huì)帶來副作用。社區(qū)大佬David Anderson曾在Stack Overflow上給出了一個(gè)簡(jiǎn)單明晰的回答,可參見這里。
env.getConfig().enableObjectReuse();
當(dāng)調(diào)用了 enableObjectReuse 方法后, Flink 會(huì)把中間深拷貝的步驟都省略掉,SourceFunction 產(chǎn)生的數(shù)據(jù)直接作為 MapFunction 的輸入,可以減少 gc 壓力。但需要特別注意的是,這個(gè)方法不能隨便調(diào)用,必須要確保下游 Function 只有一種(也就是一個(gè)流只會(huì)被一個(gè)算子處理),或者下游的多個(gè) Function 均不會(huì)改變對(duì)象內(nèi)部的值。否則可能會(huì)有線程安全的問題。
checkpoint優(yōu)化
監(jiān)控checkpoint?
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/monitoring/checkpoint_monitoring/
Checkpoint 時(shí)間間隔,需要根據(jù)業(yè)務(wù)場(chǎng)景對(duì)時(shí)效性的要求而定。如果時(shí)效性要求不高,可以設(shè)置到分鐘級(jí)別,比如5分鐘、10分鐘;如果對(duì)時(shí)效性要求很高,結(jié)合 flink ?控制頁面 Checkpoints 的Summary 中的 End to End Duration,通過最大值、最小值和平均值,合理設(shè)置時(shí)間間隔。注意,時(shí)間間隔需要比 End to End Duration 的時(shí)間要長(zhǎng),否則,可能會(huì)導(dǎo)致上一個(gè) checkpoint 沒結(jié)束,下一個(gè) checkpoint 已經(jīng)開始。為了避免這一情況的發(fā)生,除了設(shè)置時(shí)間間隔,兩次 checkpoint 的最小時(shí)間間隔也可以起到作用,該配置決定在上一次 checkpoint 結(jié)束之后,至少等待多長(zhǎng)時(shí)間開始下一次的 checkpoint。
設(shè)置原則:
Checkpoint 時(shí)間間隔不易過大。一般來說,Checkpoint 時(shí)間間隔越長(zhǎng),需要生產(chǎn)的 State 就越大。如此一來,當(dāng)失敗恢復(fù)時(shí),需要更長(zhǎng)的追趕時(shí)間。 Checkpoint 時(shí)間間隔不易過小。如果 Checkpoint 時(shí)間間隔太小,那么 Flink 應(yīng)用程序就會(huì)頻繁 Checkpoint,導(dǎo)致部分資源被占有,無法專注地進(jìn)行數(shù)據(jù)處理。 Checkpoint 時(shí)間間隔大于 Checkpoint 的生產(chǎn)時(shí)間。當(dāng) Checkpoint 時(shí)間間隔比 Checkpoint 生產(chǎn)時(shí)間長(zhǎng)時(shí),在上次 Checkpoint 完成時(shí),不會(huì)立刻進(jìn)行下一次 Checkpoint,而是會(huì)等待一段時(shí)間,之后再進(jìn)行新的 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立即開始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源被 Checkpoint 占用,而真正任務(wù)計(jì)算的資源就會(huì)變少。 開啟本地恢復(fù)。如果 Flink State 很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)上讀取 State 進(jìn)行恢復(fù),如果 State 文件過大,此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,大量的時(shí)間浪費(fèi)在網(wǎng)絡(luò)傳輸方面。此時(shí)可以設(shè)置 Flink 應(yīng)用程序本地 State 恢復(fù),應(yīng)用程序 State 本地恢復(fù)默認(rèn)沒有開啟,可以設(shè)置參數(shù) state.backend.local-recovery 值為 true 進(jìn)行激活。 設(shè)置 Checkpoint 保存數(shù)。Checkpoint 保存數(shù)默認(rèn)是 1,也就是只保存最新的 Checkpoint 的 State 文件,當(dāng)進(jìn)行 State 恢復(fù)時(shí),如果最新的 Checkpoint 文件不可用時(shí) (比如文件損壞或者其他原因),那么 State 恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù) 3,即使最新的 Checkpoint 恢復(fù)失敗,那么 Flink 也會(huì)回滾到上一次 Checkpoint 的狀態(tài)文件進(jìn)行恢復(fù)??紤]到這種情況,可以通過 state.checkpoints.num-retained 設(shè)置 Checkpoint 保存數(shù)。
// 使? RocksDBStateBackend 做為狀態(tài)后端,并開啟增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 開啟 Checkpoint,間隔為 1 分鐘
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小間隔 2 分鐘
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2))
// 超時(shí)時(shí)間 10 分鐘
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
配置Task本地恢復(fù) Task 本地恢復(fù)?默認(rèn)禁用,可以通過 Flink 的?CheckpointingOptions.LOCAL_RECOVERY?配置中指定的鍵 state.backend.local-recovery 來啟用。此設(shè)置的值可以是?true?以啟用或?false(默認(rèn))以禁用本地恢復(fù)。
注意,unaligned checkpoints 目前不支持 task 本地恢復(fù)。
參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)
參考官網(wǎng):
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/large_state_tuning/
網(wǎng)絡(luò)內(nèi)存調(diào)優(yōu)
緩沖消脹機(jī)制(Buffer Debloating)
緩沖消脹機(jī)制嘗試通過自動(dòng)調(diào)整緩沖數(shù)據(jù)量到一個(gè)合理值來解決這個(gè)問題。
緩沖消脹功能計(jì)算 subtask 可能達(dá)到的最大吞吐(始終保持繁忙狀態(tài)時(shí))并且通過調(diào)整緩沖數(shù)據(jù)量來使得數(shù)據(jù)的消費(fèi)時(shí)間達(dá)到配置值。
可以通過設(shè)置?taskmanager.network.memory.buffer-debloat.enabled?為?true?來開啟緩沖消脹機(jī)制。通過設(shè)置?taskmanager.network.memory.buffer-debloat.target?為?duration?類型的值來指定消費(fèi)緩沖數(shù)據(jù)的目標(biāo)時(shí)間。默認(rèn)值應(yīng)該能滿足大多數(shù)場(chǎng)景。
這個(gè)功能使用過去的吞吐數(shù)據(jù)來預(yù)測(cè)消費(fèi)剩余緩沖數(shù)據(jù)的時(shí)間。如果預(yù)測(cè)不準(zhǔn),緩沖消脹機(jī)制會(huì)導(dǎo)致以下問題:
沒有足夠的緩存數(shù)據(jù)來提供全量吞吐。 有太多緩沖數(shù)據(jù)對(duì) checkpoint barrier 推進(jìn)或者非對(duì)齊的 checkpoint 的大小造成不良影響。
如果您的作業(yè)負(fù)載經(jīng)常變化(即,突如其來的數(shù)據(jù)尖峰,定期的窗口聚合觸發(fā)或者 join ),您可能需要調(diào)整以下設(shè)置:
taskmanager.network.memory.buffer-debloat.period:這是緩沖區(qū)大小重算的最小時(shí)間周期。周期越小,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是必要的計(jì)算會(huì)消耗更多的CPU。 taskmanager.network.memory.buffer-debloat.samples:調(diào)整用于計(jì)算平均吞吐量的采樣數(shù)。采集樣本的頻率可以通過 taskmanager.network.memory.buffer-debloat.period 來設(shè)置。樣本數(shù)越少,緩沖消脹機(jī)制的反應(yīng)時(shí)間就越快,但是當(dāng)吞吐量突然飆升或者下降時(shí),緩沖消脹機(jī)制計(jì)算的最佳緩沖數(shù)據(jù)量會(huì)更容易出錯(cuò)。 taskmanager.network.memory.buffer-debloat.threshold-percentages:防止緩沖區(qū)大小頻繁改變的優(yōu)化(比如,新的大小跟舊的大小相差不大)。
您可以使用以下指標(biāo)來監(jiān)控當(dāng)前的緩沖區(qū)大?。?/p>
estimatedTimeToConsumeBuffersMs:消費(fèi)所有輸入通道(input channel)中數(shù)據(jù)的總時(shí)間。 debloatedBufferSize:當(dāng)前的緩沖區(qū)大小。
限制
多個(gè)輸入和合并?
當(dāng)前,吞吐計(jì)算和緩沖消脹發(fā)生在 subtask 層面。
如果您的 subtask 有很多不同的輸入或者有一個(gè)合并的輸入,緩沖消脹可能會(huì)導(dǎo)致低吞吐的輸入有太多緩沖數(shù)據(jù),而高吞吐輸入的緩沖區(qū)數(shù)量可能太少而不夠維持當(dāng)前吞吐。當(dāng)不同的輸入吞吐差別比較大時(shí),這種現(xiàn)象會(huì)更加的明顯。我們推薦您在測(cè)試這個(gè)功能時(shí)重點(diǎn)關(guān)注這種 subtask。
緩沖區(qū)的尺寸和個(gè)數(shù)?
當(dāng)前,緩沖消脹僅在使用的緩沖區(qū)大小上設(shè)置上限。實(shí)際的緩沖區(qū)大小和個(gè)數(shù)保持不變。這意味著緩沖消脹機(jī)制不會(huì)減少作業(yè)的內(nèi)存使用。您應(yīng)該手動(dòng)減少緩沖區(qū)的大小或者個(gè)數(shù)。
此外,如果您想減少緩沖數(shù)據(jù)量使其低于緩沖消脹當(dāng)前允許的量,您可能需要手動(dòng)的設(shè)置緩沖區(qū)的個(gè)數(shù)。
高并行度
目前,使用默認(rèn)配置,緩沖區(qū)去塊機(jī)制可能無法在高并行度(約200以上)下正確執(zhí)行。如果您觀察到吞吐量降低或檢查點(diǎn)時(shí)間高于預(yù)期,我們建議將浮動(dòng)緩沖區(qū)(taskmanager.network.memory.foating buffers per gate)的數(shù)量從默認(rèn)值增加到至少等于并行度的數(shù)量。
發(fā)生問題的并行度的實(shí)際值因作業(yè)而異,但通常應(yīng)該超過幾百。
網(wǎng)絡(luò)緩沖生命周期?
Flink 有多個(gè)本地緩沖區(qū)池 —— 每個(gè)輸出和輸入流對(duì)應(yīng)一個(gè)。每個(gè)緩沖區(qū)池的大小被限制為
channels?*?taskmanager.network.memory.buffers-per-channel?+?taskmanager.network.memory.floating-buffers-per-gate
緩沖區(qū)的大小可以通過?taskmanager.memory.segment-size?來設(shè)置。
輸入網(wǎng)絡(luò)緩沖?
輸入通道中的緩沖區(qū)被分為獨(dú)占緩沖區(qū)(exclusive buffer)和流動(dòng)緩沖區(qū)(floating buffer)。每個(gè)獨(dú)占緩沖區(qū)只能被一個(gè)特定的通道使用。一個(gè)通道可以從輸入流的共享緩沖區(qū)池中申請(qǐng)額外的流動(dòng)緩沖區(qū)。剩余的流動(dòng)緩沖區(qū)是可選的并且只有資源足夠的時(shí)候才能獲取。
在初始階段:
Flink 會(huì)為每一個(gè)輸入通道獲取配置數(shù)量的獨(dú)占緩沖區(qū)。 所有的獨(dú)占緩沖區(qū)都必須被滿足,否則作業(yè)會(huì)拋異常失敗。 Flink 至少要有一個(gè)流動(dòng)緩沖區(qū)才能運(yùn)行。
輸出網(wǎng)絡(luò)緩沖?
不像輸入緩沖區(qū)池,輸出緩沖區(qū)池只有一種類型的緩沖區(qū)被所有的 subpartitions 共享。
為了避免過多的數(shù)據(jù)傾斜,每個(gè) subpartition 的緩沖區(qū)數(shù)量可以通過?taskmanager.network.memory.max-buffers-per-channel?來限制。
不同于輸入緩沖區(qū)池,這里配置的獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)只被當(dāng)作推薦值。如果沒有足夠的緩沖區(qū),每個(gè)輸出 subpartition 可以只使用一個(gè)獨(dú)占緩沖區(qū)而沒有流動(dòng)緩沖區(qū)。
透支緩沖區(qū)(Overdraft buffers)?
另外,每個(gè) subtask 輸出數(shù)據(jù)時(shí)可以至多請(qǐng)求?taskmanager.network.memory.max-overdraft-buffers-per-gate?(默認(rèn) 5)個(gè)額外的透支緩沖區(qū)(overdraft buffers)。只有當(dāng)前 subtask 被下游 subtasks 反壓且當(dāng)前 subtask 需要 請(qǐng)求超過 1 個(gè)網(wǎng)絡(luò)緩沖區(qū)(network buffer)才能完成當(dāng)前的操作時(shí),透支緩沖區(qū)才會(huì)被使用??赡馨l(fā)生在以下情況:
序列化非常大的 records,不能放到單個(gè)網(wǎng)絡(luò)緩沖區(qū)中。 類似 flatmap 的算子,即:處理單個(gè) record 時(shí)可能會(huì)生產(chǎn)多個(gè) records。 周期性地或某些事件觸發(fā)產(chǎn)生大量 records 的算子(例如:WindowOperator?的觸發(fā))。
在這些情況下,如果沒有透支緩沖區(qū),F(xiàn)link 的 subtask 線程會(huì)被阻塞在反壓,從而阻止例如 Unaligned Checkpoint 的完成。為了緩解這種情況,增加了透支緩沖區(qū)的概念。這些透支緩沖區(qū)是可選的,F(xiàn)link 可以僅僅使用常規(guī)的緩沖區(qū)逐漸取得進(jìn)展,也就是 說?0?是?taskmanager.network.memory.max-overdraft-buffers-per-gate?可以接受的配置值。
緩沖區(qū)的數(shù)量?
獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)配置應(yīng)該足以應(yīng)對(duì)最大吞吐。如果想要最小化緩沖數(shù)據(jù)量,那么可以將獨(dú)占緩沖區(qū)設(shè)置為?0,同時(shí)減小內(nèi)存段的大小。
選擇緩沖區(qū)的大小?
在往下游 subtask 發(fā)送數(shù)據(jù)部分時(shí),緩沖區(qū)通過匯集 record 來優(yōu)化網(wǎng)絡(luò)開銷。下游 subtask 應(yīng)該在接收到完整的 record 后才開始處理它。
如果緩沖區(qū)太小,或者緩沖區(qū)刷新太頻繁(execution.buffer-timeout配置參數(shù)),這可能會(huì)導(dǎo)致吞吐量降低,因?yàn)樵贔link的運(yùn)行時(shí),每個(gè)緩沖區(qū)的開銷明顯高于每個(gè)記錄的開銷。
根據(jù)經(jīng)驗(yàn),我們不建議考慮增加緩沖區(qū)大小或緩沖區(qū)超時(shí),除非您可以在實(shí)際工作負(fù)載中觀察到網(wǎng)絡(luò)瓶頸(下游操作員空閑、上游背壓、輸出緩沖區(qū)隊(duì)列已滿、下游輸入隊(duì)列為空)。
如果緩沖區(qū)太大,會(huì)導(dǎo)致:
內(nèi)存使用高 大量的 checkpoint 數(shù)據(jù)量(針對(duì)非對(duì)齊的 checkpoints) 漫長(zhǎng)的 checkpoint 時(shí)間(針對(duì)對(duì)齊的 checkpoints) execution.buffer-timeout?較小時(shí)內(nèi)存分配使用率會(huì)比較低,因?yàn)榫彌_區(qū)還沒被塞滿數(shù)據(jù)就被發(fā)送下去了。
選擇緩沖區(qū)的數(shù)量?
緩沖區(qū)的數(shù)量是通過?taskmanager.network.memory.buffers-per-channel?和?taskmanager.network.memory.floating-buffers-per-gate?來配置的。
為了最好的吞吐率,我們建議使用獨(dú)占緩沖區(qū)和流動(dòng)緩沖區(qū)的默認(rèn)值(except you have one of limit cases)。如果緩沖數(shù)據(jù)量存在問題,更建議打開緩沖消脹。
您可以人工地調(diào)整網(wǎng)絡(luò)緩沖區(qū)的個(gè)數(shù),但是需要注意:
您應(yīng)該根據(jù)期待的吞吐量(單位?bytes/second)來調(diào)整緩沖區(qū)的數(shù)量。協(xié)調(diào)數(shù)據(jù)傳輸量(大約兩個(gè)節(jié)點(diǎn)之間的兩個(gè)往返消息)。延遲也取決于您的網(wǎng)絡(luò)。
使用 buffer 往返時(shí)間(大概?1ms?在正常的本地網(wǎng)絡(luò)中),緩沖區(qū)大小和期待的吞吐,您可以通過下面的公式計(jì)算維持吞吐所需要的緩沖區(qū)數(shù)量:
number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size
比如,期待吞吐為?320MB/s,往返延遲為?1ms,內(nèi)存段為默認(rèn)大小,為了維持吞吐需要使用10個(gè)活躍的緩沖區(qū):
number_of_buffers = 320MB/s * 1ms / 32KB = 10
流動(dòng)緩沖區(qū)的目的是為了處理數(shù)據(jù)傾斜。理想情況下,流動(dòng)緩沖區(qū)的數(shù)量(默認(rèn)8個(gè))和每個(gè)通道獨(dú)占緩沖區(qū)的數(shù)量(默認(rèn)2個(gè))能夠使網(wǎng)絡(luò)吞吐量飽和。但這并不總是可行和必要的。所有 subtask 中只有一個(gè)通道被使用也是非常罕見的。 獨(dú)占緩沖區(qū)的目的是提供一個(gè)流暢的吞吐量。當(dāng)一個(gè)緩沖區(qū)在傳輸數(shù)據(jù)時(shí),另一個(gè)緩沖區(qū)被填充。當(dāng)吞吐量比較高時(shí),獨(dú)占緩沖區(qū)的數(shù)量是決定 Flink 中緩沖數(shù)據(jù)的主要因素。
當(dāng)?shù)屯掏铝肯鲁霈F(xiàn)反壓時(shí),您應(yīng)該考慮減少獨(dú)占緩沖區(qū)。
總結(jié):
可以通過開啟緩沖消脹機(jī)制來簡(jiǎn)化 Flink 網(wǎng)絡(luò)的內(nèi)存配置調(diào)整。您也可能需要調(diào)整它。
如果這不起作用,您可以關(guān)閉緩沖消脹機(jī)制并且人工地配置內(nèi)存段的大小和緩沖區(qū)個(gè)數(shù)。針對(duì)第二種場(chǎng)景,我們推薦:
使用默認(rèn)值以獲得最大吞吐 減少內(nèi)存段大小、獨(dú)占緩沖區(qū)的數(shù)量來加快 checkpoint 并減少網(wǎng)絡(luò)棧消耗的內(nèi)存量
flink狀態(tài)優(yōu)化
使用rocksdb狀態(tài)后端開啟增量檢查點(diǎn)
Tuning MemTable
memtable 作為 LSM Tree 體系里的讀寫緩存,對(duì)寫性能有較大的影響。以下是一些值得注意的參數(shù)。為方便對(duì)比,下文都會(huì)將 RocksDB 的原始參數(shù)名與 Flink 配置中的參數(shù)名一并列出,用豎線分割。
write_buffer_size | state.backend.rocksdb.writebuffer.size單個(gè) memtable 的大小,默認(rèn)是64MB。當(dāng) memtable 大小達(dá)到此閾值時(shí),就會(huì)被標(biāo)記為不可變。一般來講,適當(dāng)增大這個(gè)參數(shù)可以減小寫放大帶來的影響,但同時(shí)會(huì)增大 flush 后 L0、L1 層的壓力,所以還需要配合修改 compaction 參數(shù),后面再提。 max_write_buffer_number | state.backend.rocksdb.writebuffer.countmemtable?的最大數(shù)量(包含活躍的和不可變的),默認(rèn)是2。當(dāng)全部 memtable 都寫滿但是 flush 速度較慢時(shí),就會(huì)造成寫停頓,所以如果內(nèi)存充足或者使用的是機(jī)械硬盤,建議適當(dāng)調(diào)大這個(gè)參數(shù),如4。 min_write_buffer_number_to_merge | state.backend.rocksdb.writebuffer.number-to-merge在 flush 發(fā)生之前被合并的 memtable 最小數(shù)量,默認(rèn)是1。舉個(gè)例子,如果此參數(shù)設(shè)為2,那么當(dāng)有至少兩個(gè)不可變 memtable 時(shí),才有可能觸發(fā) flush(亦即如果只有一個(gè)不可變 memtable,就會(huì)等待)。調(diào)大這個(gè)值的好處是可以使更多的更改在 flush 前就被合并,降低寫放大,但同時(shí)又可能增加讀放大,因?yàn)樽x取數(shù)據(jù)時(shí)要檢查的 memtable 變多了。經(jīng)測(cè)試,該參數(shù)設(shè)為2或3相對(duì)較好。
Tuning Block/Block Cache
block 是 sstable 的基本存儲(chǔ)單位。block cache 則扮演讀緩存的角色,采用 LRU 算法存儲(chǔ)最近使用的 block,對(duì)讀性能有較大的影響。
block_size | state.backend.rocksdb.block.blocksizeblock 的大小,默認(rèn)值為4KB。在生產(chǎn)環(huán)境中總是會(huì)適當(dāng)調(diào)大一些,一般32KB比較合適,對(duì)于機(jī)械硬盤可以再增大到128~256KB,充分利用其順序讀取能力。但是需要注意,如果 block 大小增大而 block cache 大小不變,那么緩存的 block 數(shù)量會(huì)減少,無形中會(huì)增加讀放大。 block_cache_size | state.backend.rocksdb.block.cache-sizeblock cache 的大小,默認(rèn)為8MB。由上文所述的讀寫流程可知,較大的 block cache 可以有效避免熱數(shù)據(jù)的讀請(qǐng)求落到 sstable 上,所以若內(nèi)存余量充足,建議設(shè)置到128MB甚至256MB,讀性能會(huì)有非常明顯的提升。 ?
Tuning Compaction
compaction 在所有基于 LSM Tree 的存儲(chǔ)引擎中都是開銷最大的操作,弄不好的話會(huì)非常容易阻塞讀寫。建議看官先讀讀前面那篇關(guān)于 RocksDB 的 compaction 策略的文章,獲取一些背景知識(shí),這里不再贅述。
compaction_style | state.backend.rocksdb.compaction.stylecompaction 算法,使用默認(rèn)的 LEVEL(即 leveled compaction)即可,下面的參數(shù)也是基于此。 target_file_size_base | state.backend.rocksdb.compaction.level.target-file-size-baseL1層單個(gè) sstable 文件的大小閾值,默認(rèn)值為64MB。每向上提升一級(jí),閾值會(huì)乘以因子 target_file_size_multiplier(但默認(rèn)為1,即每級(jí)sstable最大都是相同的)。顯然,增大此值可以降低 compaction 的頻率,減少寫放大,但是也會(huì)造成舊數(shù)據(jù)無法及時(shí)清理,從而增加讀放大。此參數(shù)不太容易調(diào)整,一般不建議設(shè)為256MB以上。 max_bytes_for_level_base | state.backend.rocksdb.compaction.level.max-size-level-baseL1層的數(shù)據(jù)總大小閾值,默認(rèn)值為256MB。每向上提升一級(jí),閾值會(huì)乘以因子 max_bytes_for_level_multiplier(默認(rèn)值為10)。由于上層的大小閾值都是以它為基礎(chǔ)推算出來的,所以要小心調(diào)整。建議設(shè)為 target_file_size_base 的倍數(shù),且不能太小,例如5~10倍。 level_compaction_dynamic_level_bytes | state.backend.rocksdb.compaction.level.use-dynamic-size這個(gè)參數(shù)之前講過。當(dāng)開啟之后,上述閾值的乘法因子會(huì)變成除法因子,能夠動(dòng)態(tài)調(diào)整每層的數(shù)據(jù)量閾值,使得較多的數(shù)據(jù)可以落在最高一層,能夠減少空間放大,整個(gè) LSM Tree 的結(jié)構(gòu)也會(huì)更穩(wěn)定。對(duì)于機(jī)械硬盤的環(huán)境,強(qiáng)烈建議開啟。
Generic Parameters
max_open_files | state.backend.rocksdb.files.open顧名思義,是 RocksDB 實(shí)例能夠打開的最大文件數(shù),默認(rèn)為-1,表示不限制。由于sstable的索引和布隆過濾器默認(rèn)都會(huì)駐留內(nèi)存,并占用文件描述符,所以如果此值太小,索引和布隆過濾器無法正常加載,就會(huì)嚴(yán)重拖累讀取性能。 max_background_compactions/max_background_flushes|state.backend.rocksdb.thread.num 后臺(tái)負(fù)責(zé) flush 和 compaction 的最大并發(fā)線程數(shù),默認(rèn)為1。注意 Flink 將這兩個(gè)參數(shù)合二為一處理(對(duì)應(yīng) DBOptions.setIncreaseParallelism() 方法),鑒于 flush 和 compaction 都是相對(duì)重的操作,如果 CPU 余量比較充足,建議調(diào)大,在我們的實(shí)踐中一般設(shè)為4。
參考公眾號(hào)鏈接:flink狀態(tài)調(diào)優(yōu)
flink數(shù)據(jù)傾斜優(yōu)化
?參考公眾號(hào)鏈接:flink數(shù)據(jù)傾斜常見優(yōu)化指南
背壓優(yōu)化
?反壓監(jiān)控指標(biāo):
backPressureTimeMsPerSecond,subtask 被反壓的時(shí)間 idleTimeMsPerSecond,subtask 等待某類處理的時(shí)間 busyTimeMsPerSecond,subtask 實(shí)際工作時(shí)間 在任何時(shí)間點(diǎn),這三個(gè)指標(biāo)相加都約等于1000ms。
web ui觀測(cè):
閑置的 tasks 為藍(lán)色,完全被反壓的 tasks 為黑色,完全繁忙的 tasks 被標(biāo)記為紅色。中間的所有值都表示為這三種顏色之間的過渡色。
Job Overview觀測(cè)反壓狀態(tài):
常見解決方式:
消除背壓源頭,通過優(yōu)化 Flink 作業(yè),通過調(diào)整 Flink 或 JVM 參數(shù),或是通過擴(kuò)容。
減少 Flink 作業(yè)中緩沖在 In-flight 數(shù)據(jù)的數(shù)據(jù)量。
啟用非對(duì)齊 Checkpoints。這些選項(xiàng)并不是互斥的,可以組合在一起。本文檔重點(diǎn)介紹后兩個(gè)選項(xiàng)。
禁用/合并算子鏈chain或者資源槽共享
先贊批,再寫入(滿足實(shí)時(shí)性要求的情況下,異步 io + 熱緩存來優(yōu)化讀寫性能
增加并行度,增加資源。checkpoint時(shí)長(zhǎng)合理設(shè)置
緩沖區(qū) Debloating
Flink 1.14 引入了一個(gè)新的工具,用于自動(dòng)控制在 Flink 算子/子任務(wù)之間緩沖的 In-flight 數(shù)據(jù)的數(shù)據(jù)量。緩沖區(qū) Debloating 機(jī) 制可以通過將屬性taskmanager.network.memory.buffer-debloat.enabled設(shè)置為true來啟用。
此特性對(duì)對(duì)齊和非對(duì)齊 Checkpoint 都生效,并且在這兩種情況下都能縮短 Checkpointing 的時(shí)間,不過 Debloating 的效果對(duì)于 對(duì)齊 Checkpoint 最明顯。當(dāng)在非對(duì)齊 Checkpoint 情況下使用緩沖區(qū) Debloating 時(shí),額外的好處是 Checkpoint 大小會(huì)更小,并且恢復(fù)時(shí)間更快 (需要保存 和恢復(fù)的 In-flight 數(shù)據(jù)更少)。
非對(duì)齊checkpoint
從Flink 1.11開始,Checkpoint 可以是非對(duì)齊的。Unaligned checkpoints 包含 In-flight 數(shù)據(jù)(例如,存儲(chǔ)在緩沖區(qū)中的數(shù)據(jù))作為 Checkpoint State的一部分,允許 Checkpoint Barrier 跨越這些緩沖區(qū)。因此, Checkpoint 時(shí)長(zhǎng)變得與當(dāng)前吞吐量無關(guān),因?yàn)?Checkpoint Barrier 實(shí)際上已經(jīng)不再嵌入到數(shù)據(jù)流當(dāng)中。
如果您的 Checkpointing 由于背壓導(dǎo)致周期非常的長(zhǎng),您應(yīng)該使用非對(duì)齊 Checkpoint。這樣,Checkpointing 時(shí)間基本上就與 端到端延遲無關(guān)。請(qǐng)注意,非對(duì)齊 Checkpointing 會(huì)增加狀態(tài)存儲(chǔ)的 I/O,因此當(dāng)狀態(tài)存儲(chǔ)的 I/O 是 整個(gè) Checkpointing 過程當(dāng)中真 正的瓶頸時(shí),您不應(yīng)當(dāng)使用非對(duì)齊 Checkpointing。
為了啟用非對(duì)齊 Checkpoint,您可以:
// 啟用非對(duì)齊 Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();
限制?
并發(fā) Checkpoint?
Flink 當(dāng)前并不支持并發(fā)的非對(duì)齊 Checkpoint。然而,由于更可預(yù)測(cè)的和更短的 Checkpointing 時(shí)長(zhǎng),可能也根本就不需要并發(fā)的 Checkpoint。此外,Savepoint 也不能與非對(duì)齊 Checkpoint 同時(shí)發(fā)生,因此它們將會(huì)花費(fèi)稍長(zhǎng)的時(shí)間。
與 Watermark 的相互影響?
非對(duì)齊 Checkpoint 在恢復(fù)的過程中改變了關(guān)于 Watermark 的一個(gè)隱式保證。目前,F(xiàn)link 確保了 Watermark 作為恢復(fù)的第一步, 而不是將最近的 Watermark 存放在 Operator 中,以方便擴(kuò)縮容。在非對(duì)齊 Checkpoint 中,這意味著當(dāng)恢復(fù)時(shí),F(xiàn)link 會(huì)在恢復(fù) In-flight 數(shù)據(jù)后再生成 Watermark。如果您的 Pipeline 中使用了對(duì)每條記錄都應(yīng)用最新的 Watermark 的算子將會(huì)相對(duì)于 使用對(duì)齊 Checkpoint產(chǎn)生不同的結(jié)果。如果您的 Operator 依賴于最新的 Watermark 始終可用,解決辦法是將 Watermark 存放在 OperatorState 中。在這種情況下,Watermark 應(yīng)該使用單鍵 group 存放在 UnionState 以方便擴(kuò)縮容。
?與長(zhǎng)時(shí)間運(yùn)行的記錄處理相互作用
即使非對(duì)齊的檢查點(diǎn),但障礙物能夠超過隊(duì)列中的所有其他記錄。如果當(dāng)前記錄需要大量時(shí)間來處理,則該屏障的處理仍可能延遲。這種情況可能發(fā)生在同時(shí)觸發(fā)多個(gè)計(jì)時(shí)器時(shí),例如在窗口操作中。當(dāng)處理單個(gè)輸入記錄時(shí),系統(tǒng)被阻止等待多個(gè)網(wǎng)絡(luò)緩沖區(qū)可用性時(shí),可能會(huì)出現(xiàn)第二種問題。Flink不能中斷單個(gè)輸入記錄的處理,未對(duì)齊的檢查點(diǎn)必須等待當(dāng)前處理的記錄被完全處理。這可能在兩種情況下造成問題。由于串行化了一個(gè)不適合單個(gè)網(wǎng)絡(luò)緩沖區(qū)的大記錄,或者在flatMap操作中,一個(gè)輸入記錄產(chǎn)生了多個(gè)輸出記錄。在這種情況下,背壓可以阻止未對(duì)齊的檢查點(diǎn),直到處理單個(gè)輸入記錄所需的所有網(wǎng)絡(luò)緩沖區(qū)都可用。在處理單個(gè)記錄需要一段時(shí)間的任何其他情況下,也可能發(fā)生這種情況。因此,檢查點(diǎn)的時(shí)間可能比預(yù)期的要長(zhǎng),也可能會(huì)有所不同。?
某些數(shù)據(jù)分布模式?jīng)]有檢查點(diǎn)
有一部分包含屬性的的連接無法與 Channel 中的數(shù)據(jù)一樣保存在 Checkpoint 中。為了保留這些特性并且確保沒有狀態(tài)沖突或 非預(yù)期的行為,非對(duì)齊 Checkpoint 對(duì)于這些類型的連接是禁用的。所有其他的交換仍然執(zhí)行非對(duì)齊 Checkpoint。
?點(diǎn)對(duì)點(diǎn)連接
我們目前沒有任何對(duì)于點(diǎn)對(duì)點(diǎn)連接中有關(guān)數(shù)據(jù)有序性的強(qiáng)保證。然而,由于數(shù)據(jù)已經(jīng)被以前置的 Source 或是 KeyBy 相同的方式隱式 組織,一些用戶會(huì)依靠這種特性在提供的有序性保證的同時(shí)將計(jì)算敏感型的任務(wù)劃分為更小的塊。
只要并行度不變,非對(duì)齊 Checkpoint(UC) 將會(huì)保留這些特性。但是如果加上UC的伸縮容,這些特性將會(huì)被改變。
如果我們想將并行度從 p=2 擴(kuò)容到 p=3,那么需要根據(jù) KeyGroup 將 KeyBy 的 Channel 中的數(shù)據(jù)突然的劃分到3個(gè) Channel 中去。這 很容易做到,通過使用 Operator 的 KeyGroup 范圍和確定記錄屬于某個(gè) Key(group) 的方法(不管實(shí)際使用的是什么方法)。對(duì)于 Forward 的 Channel,我們根本沒有 KeyContext。Forward Channel 里也沒有任何記錄被分配了任何 KeyGroup;也無法計(jì)算它,因?yàn)闊o法保證 Key仍然存在。
廣播 Connections
廣播 Connection 帶來了另一個(gè)問題。無法保證所有 Channel 中的記錄都以相同的速率被消費(fèi)。這可能導(dǎo)致某些 Task 已經(jīng)應(yīng)用了與 特定廣播事件對(duì)應(yīng)的狀態(tài)變更,而其他任務(wù)則沒有,如圖所示。
廣播分區(qū)通常用于實(shí)現(xiàn)廣播狀態(tài),它應(yīng)該跨所有 Operator 都相同。Flink 實(shí)現(xiàn)廣播狀態(tài),通過僅 Checkpointing 有狀態(tài)算子的 SubTask 0 中狀態(tài)的單份副本。在恢復(fù)時(shí),我們將該份副本發(fā)往所有的 Operator。因此,可能會(huì)發(fā)生以下情況:某個(gè)算子將很快從它的 Checkpointed Channel 消費(fèi)數(shù)據(jù)并將修改應(yīng)有于記錄來獲得狀態(tài)。
柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù) Flink性能優(yōu)化小結(jié)
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。