柚子快報激活碼778899分享:大數(shù)據(jù) Flink-時間語義
柚子快報激活碼778899分享:大數(shù)據(jù) Flink-時間語義
1時間語義
flink種設(shè)計時間的不同概念:
1 Event Time:事件時間,指代事件創(chuàng)建的時間,指代數(shù)據(jù)中的時間錯帶指代事件時間,F(xiàn)link通過時間戳分配器訪問事件時間2 Ingestion Time: 攝入時間:指代數(shù)據(jù)進(jìn)入Flink的時間3 Processing Time:進(jìn)程時間:數(shù)據(jù)執(zhí)行算子的處理時間
1 EventTime 的引入:
在Flink的流式處理中,絕大部分的業(yè)務(wù)都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時間屬性
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調(diào)用時刻開始給env創(chuàng)建的每一個stream追加時間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
1 watermark概述:
1 在eventTime事件時間中,F(xiàn)link接收事件的數(shù)據(jù)不是嚴(yán)格按照事件時間進(jìn)行排序,會出現(xiàn)亂序,需要watermark進(jìn)行處理亂序的一種機(jī)制
2 一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時必須要有個機(jī)制來保證一個特定的時間后,必須觸發(fā)window去進(jìn)行計算了,這個特別的機(jī)制,就是Watermark。
2 watermark的理論知識
Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime- 延遲時長,
也就是說,Watermark是基于數(shù)據(jù)攜帶的時間戳生成的,
一旦Watermark比當(dāng)前未觸發(fā)的窗口的停止時間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。
由于event time是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
圖解案例:
亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)
上圖中,我們設(shè)置的允許最大延遲到達(dá)時間為2s,所以時間戳為7s的事件對應(yīng)的Watermark是5s,
時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,
窗口2是6s~10s,那么時間戳為7s的事件到達(dá)時的Watermarker恰好觸發(fā)窗口1,時間戳為12s的事件到達(dá)時的Watermark恰好觸發(fā)窗口2
Watermark就是觸發(fā)前一窗口的“關(guān)窗時間”,一旦觸發(fā)關(guān)門那么以當(dāng)前時刻為準(zhǔn)在窗口范圍內(nèi)的所有數(shù)據(jù)都會收入窗中。
只要沒有達(dá)到水位那么不管現(xiàn)實中的時間推進(jìn)了多久都不會觸發(fā)關(guān)窗。
3 理論小結(jié)
watermark是用來處理按照事件時間出現(xiàn)亂序的一種機(jī)制
4 Watermark引入
dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor
@Override
public long extractTimestamp(element: SensorReading): Long = {
return element.getTimestamp() * 1000L;//獲取事件數(shù)據(jù)的時間戳作為事件時間
}
} );
我們看到上面的例子中創(chuàng)建了一個看起來有點復(fù)雜的類,這個類實現(xiàn)的其實就是分配時間戳的接口。
Flink暴露了TimestampAssigner接口供我們實現(xiàn),使我們可以自定義如何從事件數(shù)據(jù)中抽取時間戳。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置事件時間語義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 在添加數(shù)據(jù)源時候設(shè)置時間戳以及watermark
DataStream
.assignTimestampsAndWatermarks(new MyAssigner());// 這里就可以自定義事件語義的時間戳
MyAssigner有兩種類型(分配時間戳的接口)
AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks
以上兩個接口都繼承自TimestampAssigner。
watermark生產(chǎn)的時間間隔,怎么周期性生成watermark設(shè)置
可以使用ExecutionConfig.setAutoWatermarkInterval()方法進(jìn)行設(shè)置。
// 每隔5秒產(chǎn)生一個watermark env.getConfig.setAutoWatermarkInterval(5000);
以上代碼解析:
產(chǎn)生watermark的邏輯:每隔5秒鐘,
Flink會調(diào)用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。
如果方法返回一個時間戳大于之前水位的時間戳,新的watermark會被插入到流中。
這個檢查保證了水位線是單調(diào)遞增的。如果方法返回的時間戳小于等于之前水位的時間戳,則不會產(chǎn)生新的watermark。
周期性獲取時間戳的例子:自定義周期性時間戳分配器
// 自定義周期性時間戳分配器
public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks
private Long bound = 60 * 1000L; // 延遲一分鐘
private Long maxTs = Long.MIN_VALUE; // 當(dāng)前最大時間戳
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTs - bound);
}
@Override
public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
maxTs = Math.max(maxTs, element.getTimestamp());
return element.getTime
不周期性:自定義時間戳:
如果數(shù)據(jù)是單調(diào)遞增:AscendingTimestampExtractor,這個類會直接使用數(shù)據(jù)的時間戳生成watermark。
代碼如下:
DataStream
dataStream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor
@Override
public long extractAscendingTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
亂序數(shù)據(jù)流,如果能大致估算出最大延遲時間,則使用 BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)),
這個可以根據(jù)事件的時間戳減去1S,作為時間戳
DataStream
dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
柚子快報激活碼778899分享:大數(shù)據(jù) Flink-時間語義
好文推薦
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。