Big Data: The Flink Speculative Execution Mechanism
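1. Configuration
    execution.batch.speculative.enabled (default: false): switch for speculative execution. It only takes effect when the AdaptiveBatchScheduler is used.
    execution.batch.speculative.max-concurrent-executions (default: 2): maximum number of execution attempts of one subtask that may run at the same time, including the original attempt.
    execution.batch.speculative.block-slow-node-duration (default: 1 minute): slow nodes are added to a blocklist; this controls how long they stay on it.
    slow-task-detector.check-interval (default: 1 second): interval between slow-task checks.
    slow-task-detector.execution-time.baseline-lower-bound (default: 1 minute): lower bound of the slow-task detection baseline.
    slow-task-detector.execution-time.baseline-ratio (default: 0.75): finished-task ratio at which detection starts. Once 75% of the subtasks have finished, the remaining ones are checked for slowness.
    slow-task-detector.execution-time.baseline-multiplier (default: 1.5): multiplier applied to the baseline execution time.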
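    To see how the three detector options interact, here is a minimal sketch that uses only the JDK. It assumes a simplified model: the baseline is the median execution time of the finished subtasks times the multiplier, and a running subtask is flagged only when its elapsed time reaches both that baseline and the lower bound. The class and method names (SlowTaskThresholdSketch, detectionActive, baselineMillis, isSlow) are hypothetical and are not Flink APIs.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of the slow-task threshold; not Flink's implementation. */
class SlowTaskThresholdSketch {
    static final double BASELINE_RATIO = 0.75;           // baseline-ratio default
    static final double BASELINE_MULTIPLIER = 1.5;       // baseline-multiplier default
    static final long BASELINE_LOWER_BOUND_MS = 60_000;  // baseline-lower-bound default (1 minute)

    /** Detection only starts once enough subtasks have finished. */
    static boolean detectionActive(int finished, int total) {
        return (double) finished / total >= BASELINE_RATIO;
    }

    /** Assumed baseline: median finished execution time times the multiplier. */
    static long baselineMillis(long[] finishedTimesMs) {
        long[] sorted = finishedTimesMs.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        return (long) (median * BASELINE_MULTIPLIER);
    }

    /** Assumed rule: flag a running subtask once it reaches both the lower bound and the baseline. */
    static boolean isSlow(long runningMs, long baselineMs) {
        return runningMs >= BASELINE_LOWER_BOUND_MS && runningMs >= baselineMs;
    }

    public static void main(String[] args) {
        // 6 of 8 subtasks have finished, so the 0.75 ratio is reached.
        long[] finished = {40_000, 45_000, 50_000, 52_000, 55_000, 60_000};
        System.out.println(detectionActive(6, 8));     // true
        long baseline = baselineMillis(finished);      // 52_000 * 1.5 = 78_000
        System.out.println(baseline);
        System.out.println(isSlow(70_000, baseline));  // false: below the baseline
        System.out.println(isSlow(90_000, baseline));  // true: above baseline and lower bound
    }
}
```

    With short tasks the lower bound dominates: if the finished subtasks took around 20 seconds, the multiplied baseline is 30 seconds, yet under this model a subtask running for 45 seconds is still not flagged because it has not reached the 1 minute lower bound.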
2. SpeculativeScheduler
    Speculative execution is used together with the AdaptiveBatchScheduler. When AdaptiveBatchSchedulerFactory creates the scheduler and speculative execution is enabled, it creates a SpeculativeScheduler.
if (enableSpeculativeExecution) {
    return new SpeculativeScheduler(
            log,
            jobGraph,
            ioExecutor,
            jobMasterConfiguration,
            // ... (the excerpt ends here; the remaining constructor arguments are not shown)
2.1. Startup
    On startup the scheduler does three things: 1) it registers metrics; 2) it runs the parent class's common startup flow, which includes some operator initialization; 3) it starts the slow-task detector.
protected void startSchedulingInternal() {
    registerMetrics(jobManagerJobMetricGroup);
    super.startSchedulingInternal();
    slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
}
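2.2. SlowTaskDetector
    SlowTaskDetector is responsible for detecting slow tasks. Its implementation is ExecutionTimeBasedSlowTaskDetector, which runs the check as a scheduled task: each run reports the slow tasks to the listener and then schedules the next run.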
this.scheduledDetectionFuture =
        mainThreadExecutor.schedule(
                () -> {
                    listener.notifySlowTasks(findSlowTasks(executionGraph));
                    scheduleTask(executionGraph, listener, mainThreadExecutor);
                },
                checkIntervalMillis,
                TimeUnit.MILLISECONDS);
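    The self-rescheduling pattern in this excerpt can be sketched with the plain JDK. The example below is only an illustration: it uses a ScheduledExecutorService in place of Flink's main thread executor, and the names SelfReschedulingCheck and countRuns are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a check that schedules its own next run after each run. */
class SelfReschedulingCheck {
    private final ScheduledExecutorService executor;
    private final long intervalMillis;
    private final Runnable check;
    private volatile boolean stopped;

    SelfReschedulingCheck(ScheduledExecutorService executor, long intervalMillis, Runnable check) {
        this.executor = executor;
        this.intervalMillis = intervalMillis;
        this.check = check;
    }

    void start() {
        scheduleNext();
    }

    void stop() {
        stopped = true;
    }

    /** Schedules one delayed run; the run itself schedules the following one. */
    private void scheduleNext() {
        if (stopped) {
            return;
        }
        executor.schedule(
                () -> {
                    if (stopped) {
                        return;
                    }
                    check.run();
                    scheduleNext();
                },
                intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Demo helper: waits until the check has run at least {@code runs} times, then stops it. */
    static int countRuns(int runs, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runs);
        SelfReschedulingCheck task =
                new SelfReschedulingCheck(
                        executor,
                        intervalMillis,
                        () -> {
                            counter.incrementAndGet();
                            latch.countDown();
                        });
        task.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            task.stop();
            executor.shutdownNow();
        }
        return counter.get();
    }
}
```

    Because the next run is scheduled only after the current one has completed, two checks never overlap, and the interval is measured from the end of one check to the start of the next.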
    The core is findSlowTasks. It first collects the job vertices that need to be checked:
// The rest of the method signature is missing in the original excerpt; "..." marks the gap.
private List<ExecutionJobVertex> ... {
    return IterableUtils.toStream(executionGraph.getVerticesTopologically())
            .filter(ExecutionJobVertex::isInitialized)
            .filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED)
            .filter(ejv -> getFinishedRatio(ejv) >= baselineRatio)
            .collect(Collectors.toList());
}
    getFinishedRatio returns the ratio of finished subtasks to total subtasks of a job vertex. Only vertices whose ratio has reached the baseline ratio are checked.
private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
    checkState(executionJobVertex.getTaskVertices().length > 0);
    long finishedCount =
            Arrays.stream(executionJobVertex.getTaskVertices())
                    .filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED)
                    .count();
    return (double) finishedCount / executionJobVertex.getTaskVertices().length;
}
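    Next, the baseline is computed and the executions that exceed it are found, in getBaseline and findExecutionsExceedingBaseline. In essence this compares execution time against the baseline. Note that the input byte count is used along with the time, so detection appears to be based on throughput (execution time per input byte) rather than on time alone.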
private ExecutionTimeWithInputBytes getBaseline(
        final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
    final ExecutionTimeWithInputBytes weightedExecutionTimeMedian =
            calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
    long multipliedBaseline =
            (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
    return new ExecutionTimeWithInputBytes(
            multipliedBaseline, weightedExecutionTimeMedian.getInputBytes());
}
// Comparison of two ExecutionTimeWithInputBytes values: execution time per input byte
return Double.compare(
        (double) executionTime / Math.max(inputBytes, Double.MIN_VALUE),
        (double) other.getExecutionTime()
                / Math.max(other.getInputBytes(), Double.MIN_VALUE));
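    To make the time-per-byte comparison concrete, here is a small stand-alone sketch. TimePerByte is a hypothetical stand-in that only mirrors the Double.compare expression from the excerpt; it is not the Flink class and leaves out any other handling the real class may have.

```java
/** Hypothetical stand-in for a (time, input bytes) pair; mirrors the comparison in the excerpt. */
final class TimePerByte implements Comparable<TimePerByte> {
    final long executionTimeMs;
    final long inputBytes;

    TimePerByte(long executionTimeMs, long inputBytes) {
        this.executionTimeMs = executionTimeMs;
        this.inputBytes = inputBytes;
    }

    /** Execution time normalized by input size; the max() guards against division by zero. */
    double millisPerByte() {
        return (double) executionTimeMs / Math.max(inputBytes, Double.MIN_VALUE);
    }

    @Override
    public int compareTo(TimePerByte other) {
        return Double.compare(millisPerByte(), other.millisPerByte());
    }

    public static void main(String[] args) {
        TimePerByte baseline = new TimePerByte(90_000, 1_800_000);     // 0.05 ms per byte
        TimePerByte smallInput = new TimePerByte(60_000, 600_000);     // 0.10 ms per byte
        TimePerByte largeInput = new TimePerByte(120_000, 4_800_000);  // 0.025 ms per byte

        // Shorter wall-clock time, but slower per byte than the baseline.
        System.out.println(smallInput.compareTo(baseline) >= 0);  // true
        // Longer wall-clock time, but faster per byte than the baseline.
        System.out.println(largeInput.compareTo(baseline) >= 0);  // false
    }
}
```

    Under this comparison, a subtask with a large input is not penalized for running longer, while a subtask that is slow relative to its small input can still exceed the baseline.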
2.3. notifySlowTasks
    Once the slow tasks have been found, SlowTaskDetector notifies its listener. The handling is implemented in SpeculativeScheduler.notifySlowTasks.
    First, the slow nodes are added to the blocklist.
// add slow nodes to blocklist before scheduling new speculative executions
blockSlowNodes(slowTasks, currentTimestamp);
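    A time-limited blocklist of this kind can be sketched as follows. This is a hypothetical illustration of the idea behind block-slow-node-duration, not Flink's blocklist implementation; SlowNodeBlocklist and its methods are made-up names, and node IDs are plain strings here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical time-limited blocklist; names are illustrative, not Flink APIs. */
class SlowNodeBlocklist {
    private final long blockDurationMs;
    private final Map<String, Long> blockedUntil = new HashMap<>();

    SlowNodeBlocklist(long blockDurationMs) {
        this.blockDurationMs = blockDurationMs;
    }

    /** Blocks the node until nowMs + blockDurationMs, keeping the later expiry if already blocked. */
    void block(String nodeId, long nowMs) {
        blockedUntil.merge(nodeId, nowMs + blockDurationMs, Long::max);
    }

    /** A node is blocked while its expiry timestamp lies in the future; expired entries are dropped. */
    boolean isBlocked(String nodeId, long nowMs) {
        Long until = blockedUntil.get(nodeId);
        if (until == null) {
            return false;
        }
        if (nowMs >= until) {
            blockedUntil.remove(nodeId);
            return false;
        }
        return true;
    }
}
```

    Blocking the slow nodes before the new attempts are scheduled, as the comment in the excerpt says, keeps the speculative attempts from landing on the same slow node.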
    Next, it checks whether the task supports speculative (concurrent) execution attempts; by default it does.
if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {
    continue;
}
    New Executions are then created for the slow task, using the current timestamp.
final Collection<Execution> attempts =
        IntStream.range(0, newSpeculativeExecutionsToDeploy)
                .mapToObj(
                        i ->
                                executionVertex.createNewSpeculativeExecution(
                                        currentTimestamp))
                .collect(Collectors.toList());
    After that, the new attempts are set up and recorded, together with their vertex, for deployment.
setupSubtaskGatewayForAttempts(executionVertex, attempts);
verticesToDeploy.add(executionVertexId);
newSpeculativeExecutions.addAll(attempts);
    Finally, scheduling is triggered.
executionDeployer.allocateSlotsAndDeploy(
        newSpeculativeExecutions,
        executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
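    The excerpts do not show how newSpeculativeExecutionsToDeploy is computed. The sketch below assumes it is the configured max-concurrent-executions minus the attempts already running for the slow subtask, which is only a guess based on the variable name and the configuration option. SpeculativeAttemptPlanner and plan are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical planner; assumes new attempts = concurrency cap minus attempts already running. */
class SpeculativeAttemptPlanner {

    /**
     * Returns, for each slow subtask, how many extra attempts still fit under the cap.
     * Subtasks that already run the maximum number of attempts are left out.
     */
    static Map<String, Integer> plan(
            Map<String, Integer> currentAttemptsBySlowSubtask, int maxConcurrentExecutions) {
        Map<String, Integer> toDeploy = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : currentAttemptsBySlowSubtask.entrySet()) {
            int room = maxConcurrentExecutions - entry.getValue();
            if (room > 0) {
                toDeploy.put(entry.getKey(), room);
            }
        }
        return toDeploy;
    }

    public static void main(String[] args) {
        Map<String, Integer> current = new LinkedHashMap<>();
        current.put("subtask-0", 1); // only the original attempt is running
        current.put("subtask-1", 2); // a speculative attempt already exists
        System.out.println(plan(current, 2)); // {subtask-0=1}
    }
}
```

    With the default cap of 2, this model gives each slow subtask at most one speculative attempt alongside the original one.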
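3. Task Completion
    Job completion centers on DefaultExecutionGraph.jobFinished. The check starts one level up, in ExecutionJobVertex.executionVertexFinished, which counts finished subtasks against the parallelism: once all subtasks have finished, the job vertex is considered finished.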
void executionVertexFinished() {
    checkState(isInitialized());
    numExecutionVertexFinished++;
    if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {
        getGraph().jobVertexFinished();
    }
}
    This call is triggered by Execution, so it happens once each time a subtask finishes.
if (transitionState(current, FINISHED)) {
    try {
        finishPartitionsAndUpdateConsumers();
        updateAccumulatorsAndMetrics(userAccumulators, metrics);
        releaseAssignedResource(null);
        vertex.getExecutionGraphAccessor().deregisterExecution(this);
    } finally {
        vertex.executionFinished(this);
    }
    return;
}
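    Finally, each finished jobVertex (one task of the job, which has subtasks according to its parallelism) is counted in jobVertexFinished. Once all job vertices have finished, the graph waits for all executions to terminate and then calls jobFinished.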
public void jobVertexFinished() {
    assertRunningInJobMasterMainThread();
    final int numFinished = ++numFinishedJobVertices;
    if (numFinished == numJobVerticesTotal) {
        FutureUtils.assertNoException(
                waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));
    }
}
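    The two-level counting described in this section can be sketched in isolation. The class below is a hypothetical model, not the Flink classes: it only keeps the counters and leaves out the wait for all executions to terminate that the real code performs before jobFinished.

```java
/** Hypothetical two-level completion counter modeled on the excerpts; not the Flink classes. */
class CompletionCounters {
    private final int[] parallelism;       // number of subtasks per job vertex
    private final int[] finishedSubtasks;  // finished subtasks per job vertex
    private int finishedJobVertices;
    private boolean jobFinished;

    CompletionCounters(int... parallelism) {
        this.parallelism = parallelism.clone();
        this.finishedSubtasks = new int[parallelism.length];
    }

    /** Called once per finished subtask, in the role of executionVertexFinished(). */
    void subtaskFinished(int jobVertexIndex) {
        finishedSubtasks[jobVertexIndex]++;
        if (finishedSubtasks[jobVertexIndex] == parallelism[jobVertexIndex]) {
            jobVertexFinished();
        }
    }

    /** Called once per finished job vertex; the job is done when every vertex is done. */
    private void jobVertexFinished() {
        finishedJobVertices++;
        if (finishedJobVertices == parallelism.length) {
            jobFinished = true;
        }
    }

    boolean isJobFinished() {
        return jobFinished;
    }
}
```

    For a job with two vertices of parallelism 2 and 1, the job is reported as finished only after the third subtaskFinished call, whichever order the subtasks complete in.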