欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:flink 窗口和水位線

柚子快報(bào)激活碼778899分享:flink 窗口和水位線

http://yzkb.51969.com/

一、窗口

? ? ? ? 1、什么是窗口?

????????我們的flink主要是用來處理無界數(shù)據(jù)流,一種方式就是將我們的無界數(shù)據(jù)流切割成有限的“數(shù)據(jù)塊”進(jìn)行處理,這就是我們的窗口(window)。

? ? ? ? 2、窗口分類

滾動(dòng)窗口、滑動(dòng)窗口、會(huì)話窗口

滾動(dòng)窗口:timeWindow(Time.seconds(3))? ?count-tumbling-window

滑動(dòng)窗口:timeWindow(Time.seconds(5),Time.seconds(3))? ? ? ? count-sliding-window

??????3、窗口能解決什么問題(為什么使用窗口)

? ? ? ? 首先flink是一個(gè)實(shí)現(xiàn)了流批一體的計(jì)算框架,當(dāng)我們使用批處理時(shí)我們引入了窗口計(jì)算,實(shí)現(xiàn)我們的批處理。

? ? ?4、滾動(dòng)窗口(每個(gè)區(qū)消費(fèi)總額Top3的公司)

public class CityShopNameTopN {

public static void main(String[] args) throws Exception{

// TODO: 2022/9/2創(chuàng)建Flink流式處理環(huán)境

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

// TODO: 2022/9/2 設(shè)置并行度

environment.setParallelism(1);

String uu = UUID.randomUUID().toString().substring(0, 6).replace("-", "");

String groupId = "ware_goods_group"+uu;

FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource("dwd_foo_order_detail",groupId);

DataStreamSource order_detail = environment.addSource(kafkaSource);

SingleOutputStreamOperator map1 = order_detail.map(d -> JSON.parseObject(d));

//水位線

SingleOutputStreamOperator watermarks = map1.assignTimestampsAndWatermarks(

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))

.withTimestampAssigner(new SerializableTimestampAssigner() {

@Override

public long extractTimestamp(JSONObject element, long recordTimestamp) {

long time = 0;

try {

time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element.getString("createTime")).getTime();

} catch (ParseException e) {

e.printStackTrace();

}

return time;

}

}));

SingleOutputStreamOperator> map = watermarks.map(new MapFunction>() {

@Override

public Tuple3 map(JSONObject value) throws Exception {

String goodsNum = value.getString("goodsNum");

String goodsPrice = value.getString("goodsPrice");

return new Tuple3<>(value.getString("regionName"), value.getString("cityName"), Integer.valueOf(goodsNum) * Double.valueOf(goodsPrice));

}

});

SingleOutputStreamOperator> process = map.keyBy(data -> data.f0 + "," + data.f1).sum(2).keyBy(data -> data.f0 + "," + data.f1)

.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).process(new ProcessWindowFunction, Tuple3, String, TimeWindow>() {

@Override

public void process(String s, Context context, Iterable> iterable, Collector> collector) throws Exception {

ArrayList> list = new ArrayList<>();

for (Tuple3 value : iterable) {

list.add(value);

}

list.sort(new Comparator>() {

@Override

public int compare(Tuple3 o1, Tuple3 o2) {

return (int) (o2.f2 - o1.f2);

}

});

for (int i = 0; i < list.size() && i < 3; i++) {

collector.collect(list.get(i));

}

}

});

process.print();

//落地

process.addSink(new SinkPG());

? ? ? ? ?這個(gè)指標(biāo)中我們使用的是滾動(dòng)窗口,(由于數(shù)據(jù)過少)采用了每一秒把進(jìn)來的數(shù)據(jù)進(jìn)行一次計(jì)算,當(dāng)然這樣是不合乎常規(guī)的,比如我們需要統(tǒng)計(jì)一天的銷售額,我們可以使用TumblingProcessingTimeWindows.of(Time.days(1))來給他一個(gè)一天的窗口。?

?二、watermark(水位線)

????????1、什么是watermark(水位線的機(jī)制)?

? ? ? ? watermark本質(zhì)就是一個(gè)時(shí)間戳。實(shí)際上就是在原有的結(jié)束時(shí)間上再多等一個(gè)最大允許的數(shù)據(jù)延遲時(shí)間或者亂序時(shí)間,一旦有事件時(shí)間在這個(gè)多等時(shí)間刻度線后的消息事件達(dá)到就立刻觸發(fā)窗口計(jì)算。

????????2、 水位線能解決什么問題(為什么使用水位線)

????????水位線是事件時(shí)間的進(jìn)展,它是整個(gè)應(yīng)用的全局邏輯時(shí)鐘。水位線生成之后,會(huì)隨著數(shù)據(jù)在任務(wù)間流動(dòng),從而給每個(gè)任務(wù)指明當(dāng)前的事件時(shí)間;當(dāng)然水位線的時(shí)間也不是越大越好,如果過大也會(huì)降低、影響我們的實(shí)時(shí)性;如果在我們的一個(gè)最大延遲時(shí)間數(shù)據(jù)還未到,我們可以定義一個(gè)測輸出流標(biāo)簽,把遲到的數(shù)據(jù)放到我們的標(biāo)簽中

?????????3、統(tǒng)計(jì)url的訪問量

public class Test{

public static void main(String[] args) throws Exception {

//流式環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//設(shè)置全局并行度

env.setParallelism(1);

//設(shè)置水位線生成間隔

env.getConfig().setAutoWatermarkInterval(100);

SingleOutputStreamOperator eventStream = env.socketTextStream("hadoop103", 9999).map(

new MapFunction() {

@Override

public Event map(String value) throws Exception {

String[] split = value.split(",");

return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim()));

}

}

).returns(new TypeHint() {

}).assignTimestampsAndWatermarks(WatermarkStrategy

.forBoundedOutOfOrderness(Duration.ofSeconds(2))

.withTimestampAssigner(new SerializableTimestampAssigner() {

@Override

public long extractTimestamp(Event element, long recordTimestamp) {

return element.timestamp;

}

}));

eventStream.print(" input ");

//定義輸出標(biāo)簽

OutputTag later = new OutputTag("later") {

};

//統(tǒng)計(jì) url 訪問量

SingleOutputStreamOperator result = eventStream.keyBy(data -> data.url)

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.allowedLateness(Time.minutes(1)) //1 min 延遲

//遲到數(shù)據(jù)輸出到冊數(shù)出列

.sideOutputLateData(later)

.aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());

result.print(" result ");

//側(cè)輸出流

result.getSideOutput(later).print("later datas");

env.execute();

}

? ? ? ? ?統(tǒng)計(jì)訪問量我們給了一分鐘等待遲到數(shù)據(jù)如果還未到達(dá)我們就定義一個(gè)標(biāo)簽,把遲到數(shù)據(jù)放到測輸出流以便后邊的計(jì)算。

? ? ? ? 三、總結(jié)?

? ? ? ? 在一般情況下,watermark和我們的window或CEP是結(jié)合使用的,首先設(shè)置一個(gè)好的水位線,能夠最大限度的保證數(shù)據(jù)完整性以及處理計(jì)算,水位線設(shè)計(jì)可以從(1、設(shè)計(jì)時(shí)間語義時(shí)盡量選用數(shù)據(jù)原有的時(shí)間,這樣能夠更有利于體現(xiàn)用戶想要的效果;2、為了保證數(shù)據(jù)完整性,我們可以設(shè)置一個(gè)亂序時(shí)間,當(dāng)亂序時(shí)間還未到達(dá)我們可以采用手動(dòng)allowedLateness方法,可以延遲時(shí)間,盡量讓遲到的數(shù)據(jù)全部獲取到,這個(gè)方法的使用是:在我們設(shè)置的水位線亂序時(shí)間數(shù)據(jù)還未達(dá)到,我們可以使用這個(gè)方法延遲窗口關(guān)閉的時(shí)間,保證數(shù)據(jù)的完整性。3、當(dāng)然還有我們的一個(gè)兜底方法就是設(shè)置一個(gè)側(cè)輸出流標(biāo)簽,將沒有獲取到的遲到數(shù)據(jù)進(jìn)行側(cè)輸出)。

柚子快報(bào)激活碼778899分享:flink 窗口和水位線

http://yzkb.51969.com/

參考文章

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19075512.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄