柚子快報(bào)激活碼778899分享:flink 窗口和水位線
柚子快報(bào)激活碼778899分享:flink 窗口和水位線
一、窗口
? ? ? ? 1、什么是窗口?
????????我們的flink主要是用來處理無界數(shù)據(jù)流,一種方式就是將我們的無界數(shù)據(jù)流切割成有限的“數(shù)據(jù)塊”進(jìn)行處理,這就是我們的窗口(window)。
? ? ? ? 2、窗口分類
滾動(dòng)窗口、滑動(dòng)窗口、會(huì)話窗口
滾動(dòng)窗口:timeWindow(Time.seconds(3))? ?count-tumbling-window
滑動(dòng)窗口:timeWindow(Time.seconds(5),Time.seconds(3))? ? ? ? count-sliding-window
??????3、窗口能解決什么問題(為什么使用窗口)
? ? ? ? 首先flink是一個(gè)實(shí)現(xiàn)了流批一體的計(jì)算框架,當(dāng)我們使用批處理時(shí)我們引入了窗口計(jì)算,實(shí)現(xiàn)我們的批處理。
? ? ?4、滾動(dòng)窗口(每個(gè)區(qū)消費(fèi)總額Top3的公司)
public class CityShopNameTopN {
public static void main(String[] args) throws Exception{
// TODO: 2022/9/2創(chuàng)建Flink流式處理環(huán)境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO: 2022/9/2 設(shè)置并行度
environment.setParallelism(1);
String uu = UUID.randomUUID().toString().substring(0, 6).replace("-", "");
String groupId = "ware_goods_group"+uu;
FlinkKafkaConsumer
DataStreamSource
SingleOutputStreamOperator
//水位線
SingleOutputStreamOperator
WatermarkStrategy.
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(JSONObject element, long recordTimestamp) {
long time = 0;
try {
time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element.getString("createTime")).getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return time;
}
}));
SingleOutputStreamOperator
@Override
public Tuple3
String goodsNum = value.getString("goodsNum");
String goodsPrice = value.getString("goodsPrice");
return new Tuple3<>(value.getString("regionName"), value.getString("cityName"), Integer.valueOf(goodsNum) * Double.valueOf(goodsPrice));
}
});
SingleOutputStreamOperator
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).process(new ProcessWindowFunction
@Override
public void process(String s, Context context, Iterable
ArrayList
for (Tuple3
list.add(value);
}
list.sort(new Comparator
@Override
public int compare(Tuple3
return (int) (o2.f2 - o1.f2);
}
});
for (int i = 0; i < list.size() && i < 3; i++) {
collector.collect(list.get(i));
}
}
});
process.print();
//落地
process.addSink(new SinkPG());
? ? ? ? ?這個(gè)指標(biāo)中我們使用的是滾動(dòng)窗口,(由于數(shù)據(jù)過少)采用了每一秒把進(jìn)來的數(shù)據(jù)進(jìn)行一次計(jì)算,當(dāng)然這樣是不合乎常規(guī)的,比如我們需要統(tǒng)計(jì)一天的銷售額,我們可以使用TumblingProcessingTimeWindows.of(Time.days(1))來給他一個(gè)一天的窗口。?
?二、watermark(水位線)
????????1、什么是watermark(水位線的機(jī)制)?
? ? ? ? watermark本質(zhì)就是一個(gè)時(shí)間戳。實(shí)際上就是在原有的結(jié)束時(shí)間上再多等一個(gè)最大允許的數(shù)據(jù)延遲時(shí)間或者亂序時(shí)間,一旦有事件時(shí)間在這個(gè)多等時(shí)間刻度線后的消息事件達(dá)到就立刻觸發(fā)窗口計(jì)算。
????????2、 水位線能解決什么問題(為什么使用水位線)
????????水位線是事件時(shí)間的進(jìn)展,它是整個(gè)應(yīng)用的全局邏輯時(shí)鐘。水位線生成之后,會(huì)隨著數(shù)據(jù)在任務(wù)間流動(dòng),從而給每個(gè)任務(wù)指明當(dāng)前的事件時(shí)間;當(dāng)然水位線的時(shí)間也不是越大越好,如果過大也會(huì)降低、影響我們的實(shí)時(shí)性;如果在我們的一個(gè)最大延遲時(shí)間數(shù)據(jù)還未到,我們可以定義一個(gè)測輸出流標(biāo)簽,把遲到的數(shù)據(jù)放到我們的標(biāo)簽中
?????????3、統(tǒng)計(jì)url的訪問量
public class Test{
public static void main(String[] args) throws Exception {
//流式環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置全局并行度
env.setParallelism(1);
//設(shè)置水位線生成間隔
env.getConfig().setAutoWatermarkInterval(100);
SingleOutputStreamOperator
new MapFunction
@Override
public Event map(String value) throws Exception {
String[] split = value.split(",");
return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim()));
}
}
).returns(new TypeHint
}).assignTimestampsAndWatermarks(WatermarkStrategy
.
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
eventStream.print(" input ");
//定義輸出標(biāo)簽
OutputTag
};
//統(tǒng)計(jì) url 訪問量
SingleOutputStreamOperator
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.minutes(1)) //1 min 延遲
//遲到數(shù)據(jù)輸出到冊數(shù)出列
.sideOutputLateData(later)
.aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());
result.print(" result ");
//側(cè)輸出流
result.getSideOutput(later).print("later datas");
env.execute();
}
? ? ? ? ?統(tǒng)計(jì)訪問量我們給了一分鐘等待遲到數(shù)據(jù)如果還未到達(dá)我們就定義一個(gè)標(biāo)簽,把遲到數(shù)據(jù)放到測輸出流以便后邊的計(jì)算。
? ? ? ? 三、總結(jié)?
? ? ? ? 在一般情況下,watermark和我們的window或CEP是結(jié)合使用的,首先設(shè)置一個(gè)好的水位線,能夠最大限度的保證數(shù)據(jù)完整性以及處理計(jì)算,水位線設(shè)計(jì)可以從(1、設(shè)計(jì)時(shí)間語義時(shí)盡量選用數(shù)據(jù)原有的時(shí)間,這樣能夠更有利于體現(xiàn)用戶想要的效果;2、為了保證數(shù)據(jù)完整性,我們可以設(shè)置一個(gè)亂序時(shí)間,當(dāng)亂序時(shí)間還未到達(dá)我們可以采用手動(dòng)allowedLateness方法,可以延遲時(shí)間,盡量讓遲到的數(shù)據(jù)全部獲取到,這個(gè)方法的使用是:在我們設(shè)置的水位線亂序時(shí)間數(shù)據(jù)還未達(dá)到,我們可以使用這個(gè)方法延遲窗口關(guān)閉的時(shí)間,保證數(shù)據(jù)的完整性。3、當(dāng)然還有我們的一個(gè)兜底方法就是設(shè)置一個(gè)側(cè)輸出流標(biāo)簽,將沒有獲取到的遲到數(shù)據(jù)進(jìn)行側(cè)輸出)。
柚子快報(bào)激活碼778899分享:flink 窗口和水位線
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。