欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Flink推測(cè)機(jī)制

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Flink推測(cè)機(jī)制

http://yzkb.51969.com/

1、配置

????execution.batch.speculative.enabled:false,推測(cè)機(jī)制開(kāi)關(guān),必須在AdaptiveBatchScheduler模式下使用

????execution.batch.speculative.max-concurrent-executions:2,同時(shí)最多幾次執(zhí)行

????execution.batch.speculative.block-slow-node-duration:1分鐘,慢速節(jié)點(diǎn)會(huì)如黑名單,控制在黑名單中的時(shí)長(zhǎng)

????slow-task-detector.check-interval:1秒,慢任務(wù)檢查間隔

????slow-task-detector.execution-time.baseline-lower-bound:1分鐘,慢任務(wù)檢測(cè)基線的下限

????slow-task-detector.execution-time.baseline-ratio:0.75,開(kāi)始檢測(cè)慢任務(wù)基線的任務(wù)完成率,即有75%任務(wù)完成后,開(kāi)始計(jì)算剩下的任務(wù)是否為慢任務(wù)

????slow-task-detector.execution-time.baseline-multiplier:1.5,慢任務(wù)基線乘數(shù)

2、SpeculativeScheduler

????推測(cè)機(jī)制在AdaptiveBatchScheduler模式下使用,在AdaptiveBatchSchedulerFactory當(dāng)中,創(chuàng)建調(diào)度器時(shí),如果開(kāi)啟了推測(cè)機(jī)制,會(huì)創(chuàng)建SpeculativeScheduler

if?(enableSpeculativeExecution)?{

????return?new?SpeculativeScheduler(

????????????log,

????????????jobGraph,

????????????ioExecutor,

????????????jobMasterConfiguration,

2.1、啟動(dòng)

????調(diào)度器啟動(dòng)時(shí)有三個(gè)操作:1、注冊(cè)指標(biāo);2、父類(lèi)通用的啟動(dòng)流程,會(huì)有算子的一些初始化;3、啟動(dòng)慢任務(wù)檢測(cè)任務(wù)

protected?void?startSchedulingInternal()?{

????registerMetrics(jobManagerJobMetricGroup);

????super.startSchedulingInternal();

????slowTaskDetector.start(getExecutionGraph(),?this,?getMainThreadExecutor());

}

2.2、SlowTaskDetector

????SlowTaskDetector負(fù)責(zé)檢測(cè)慢任務(wù),實(shí)現(xiàn)類(lèi)是ExecutionTimeBasedSlowTaskDetector,基于schedule進(jìn)行檢測(cè)

this.scheduledDetectionFuture?=

????????mainThreadExecutor.schedule(

????????????????()?->?{

????????????????????listener.notifySlowTasks(findSlowTasks(executionGraph));

????????????????????scheduleTask(executionGraph,?listener,?mainThreadExecutor);

????????????????},

????????????????checkIntervalMillis,

????????????????TimeUnit.MILLISECONDS);

? ? 核心是findSlowTasks,首先是獲取需要校驗(yàn)的拓?fù)浼?/p>

private?List?getJobVerticesToCheck(final?ExecutionGraph?executionGraph)?{

????return?IterableUtils.toStream(executionGraph.getVerticesTopologically())

????????????.filter(ExecutionJobVertex::isInitialized)

????????????.filter(ejv?->?ejv.getAggregateState()?!=?ExecutionState.FINISHED)

????????????.filter(ejv?->?getFinishedRatio(ejv)?>=?baselineRatio)

????????????.collect(Collectors.toList());

}

????getFinishedRatio就是獲取完成任務(wù)數(shù)超過(guò)基線比率的,就是拓?fù)浼型瓿扇蝿?wù)數(shù)和總?cè)蝿?wù)數(shù)的比值

private?double?getFinishedRatio(final?ExecutionJobVertex?executionJobVertex)?{

????checkState(executionJobVertex.getTaskVertices().length?>?0);

????long?finishedCount?=

????????????Arrays.stream(executionJobVertex.getTaskVertices())

????????????????????.filter(ev?->?ev.getExecutionState()?==?ExecutionState.FINISHED)

????????????????????.count();

????return?(double)?finishedCount?/?executionJobVertex.getTaskVertices().length;

}

? ? 接下來(lái)是獲取基線和在基線基礎(chǔ)上計(jì)算慢速任務(wù)的,接口是getBaseline和findExecutionsExceedingBaseline,本質(zhì)就是執(zhí)行時(shí)間和基線的對(duì)比,注意這里不僅用到了時(shí)間,還用到了輸入字節(jié)數(shù),所以慢任務(wù)的檢測(cè)可能是基于吞吐來(lái)的

private?ExecutionTimeWithInputBytes?getBaseline(

????????final?ExecutionJobVertex?executionJobVertex,?final?long?currentTimeMillis)?{

????final?ExecutionTimeWithInputBytes?weightedExecutionTimeMedian?=

????????????calculateFinishedTaskExecutionTimeMedian(executionJobVertex,?currentTimeMillis);

????long?multipliedBaseline?=

????????????(long)?(weightedExecutionTimeMedian.getExecutionTime()?*?baselineMultiplier);

????return?new?ExecutionTimeWithInputBytes(

????????????multipliedBaseline,?weightedExecutionTimeMedian.getInputBytes());

}

return?Double.compare(

????????(double)?executionTime?/?Math.max(inputBytes,?Double.MIN_VALUE),

????????(double)?other.getExecutionTime()

????????????????/?Math.max(other.getInputBytes(),?Double.MIN_VALUE));

2.3、notifySlowTasks

????獲取慢速任務(wù)以后,SlowTaskDetector會(huì)觸發(fā)監(jiān)聽(tīng)器,監(jiān)聽(tīng)器的處理實(shí)現(xiàn)在SpeculativeScheduler的notifySlowTasks接口

????首先把節(jié)點(diǎn)加入黑名單

//?add?slow?nodes?to?blocklist?before?scheduling?new?speculative?executions

blockSlowNodes(slowTasks,?currentTimestamp);

? ? 這邊會(huì)檢測(cè)任務(wù)是否支持推測(cè),默認(rèn)是支持

if?(!executionVertex.isSupportsConcurrentExecutionAttempts())?{

????continue;

}

? ? 基于時(shí)間戳,對(duì)慢任務(wù)新建Execution

final?Collection?attempts?=

????????IntStream.range(0,?newSpeculativeExecutionsToDeploy)

????????????????.mapToObj(

????????????????????????i?->

????????????????????????????????executionVertex.createNewSpeculativeExecution(

????????????????????????????????????????currentTimestamp))

????????????????.collect(Collectors.toList());

????之后會(huì)進(jìn)行一系列的配置,加入監(jiān)控

setupSubtaskGatewayForAttempts(executionVertex,?attempts);

verticesToDeploy.add(executionVertexId);

newSpeculativeExecutions.addAll(attempts);

????最后發(fā)起調(diào)度

executionDeployer.allocateSlotsAndDeploy(

????????newSpeculativeExecutions,

????????executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));

3、任務(wù)結(jié)束

????任務(wù)結(jié)束主要核心在DefaultExecutionGraph的jobFinished,判斷在上層ExecutionJobVertex.executionVertexFinished,這里是通過(guò)任務(wù)并行度來(lái)判斷的,所有子任務(wù)完成則認(rèn)為job完成

void?executionVertexFinished()?{

????checkState(isInitialized());

????numExecutionVertexFinished++;

????if?(numExecutionVertexFinished?==?parallelismInfo.getParallelism())?{

????????getGraph().jobVertexFinished();

????}

}

????這個(gè)的調(diào)用是由Execution觸發(fā)的,也就是每個(gè)子任務(wù)完成會(huì)去調(diào)用一次

if?(transitionState(current,?FINISHED))?{

????try?{

????????finishPartitionsAndUpdateConsumers();

????????updateAccumulatorsAndMetrics(userAccumulators,?metrics);

????????releaseAssignedResource(null);

????????vertex.getExecutionGraphAccessor().deregisterExecution(this);

????}?finally?{

????????vertex.executionFinished(this);

????}

????return;

}

????最終一個(gè)jobVertex(對(duì)應(yīng)Job的一個(gè)任務(wù),任務(wù)根據(jù)并行度有子任務(wù))完成的時(shí)候會(huì)通知所有子任務(wù)完成

public?void?jobVertexFinished()?{

????assertRunningInJobMasterMainThread();

????final?int?numFinished?=?++numFinishedJobVertices;

????if?(numFinished?==?numJobVerticesTotal)?{

????????FutureUtils.assertNoException(

????????????????waitForAllExecutionsTermination().thenAccept(ignored?->?jobFinished()));

????}

}

柚子快報(bào)激活碼778899分享:大數(shù)據(jù) Flink推測(cè)機(jī)制

http://yzkb.51969.com/

好文閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19380929.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄