欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink筆記

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink筆記

http://yzkb.51969.com/

Flink筆記

一、Flink簡介

1、Spark Streaming和Flink的區(qū)別

Spark Streaming

微批處理(Micro-batching):Spark Streaming 將實時數(shù)據(jù)流分成小批次(通常每批次幾百毫秒到幾秒),然后以批處理的方式處理這些數(shù)據(jù)。每個微批次數(shù)據(jù)被看作一個 RDD(彈性分布式數(shù)據(jù)集),由 Spark 的核心引擎處理。延遲:由于微批處理的方式,延遲通常較大,通常在秒級。優(yōu)勢: 編程原語豐富,編程簡單 框架封裝層級較高,封裝性好 可以共用批處理處理邏輯,兼容性好 基于Spark,可以無縫內(nèi)嵌Spark其他子項目,如Spark Sql,MLlib等 劣勢: 微批處理,時間延遲大 穩(wěn)定性相對較差 機器性能消耗較大 Flink

真正的流處理(True Stream Processing):Flink 采用事件驅(qū)動模型,逐條處理事件,不分批次。Flink 的處理模型更接近實時,延遲通常在毫秒級。低延遲:由于逐條處理數(shù)據(jù),F(xiàn)link 的延遲比 Spark Streaming 更低,適合對延遲要求較高的場景。優(yōu)勢: Flink流處理為先的方法可提供低延遲,高吞吐率,近乎逐項處理的能力 Flink的很多組件是自行管理的 通過多種方式對工作進行分析進而優(yōu)化任務 提供了基于Web的調(diào)度視圖 主流實時計算框架對比

模型API保證次數(shù)容錯機制延時吞吐量批流統(tǒng)一業(yè)務模式易用性StormNative組合式At-least-onceRecord ACKs★★★★不支持需要其他框架★Spark StreamingMirco-batching聲明式Exectly-onceRDD Checkpoint★★★★支持需要其他框架★★Apache FlinkNative組合式Exectly-onceCheckpoint★★★★★★支持需要其他框架★★

2、什么是Flink

AApache Flink 是一個實時計算框架和分布式處理引擎,用于在無邊界和有邊界數(shù)據(jù)流上進行有狀態(tài)的計算。Flink 能在所有常見集群環(huán)境中運行,并能以內(nèi)存速度和任意規(guī)模進行計算。

這個圖展示了一個典型的數(shù)據(jù)處理架構(gòu),強調(diào)了 Apache Flink 的應用場景和功能。讓我們從左到右分步驟詳細解釋這個圖。

數(shù)據(jù)輸入

圖的左側(cè)列出了一些數(shù)據(jù)來源:

Transactions(交易)Logs(日志)IoT(物聯(lián)網(wǎng))Clicks(點擊)…(其他數(shù)據(jù)源)

這些數(shù)據(jù)源可以生成實時事件(Real-time Events)或存儲在數(shù)據(jù)庫、文件系統(tǒng)、KV-Store中。

數(shù)據(jù)處理框架

中間部分是核心,展示了 Apache Flink 如何處理這些數(shù)據(jù):

Event-driven Applications(事件驅(qū)動應用)

處理實時事件。Flink 可以處理來自各種實時數(shù)據(jù)源的事件,以構(gòu)建實時反應的應用。 Streaming Pipelines(流處理管道)

流處理工作流。Flink 提供了流處理管道,處理連續(xù)不斷的數(shù)據(jù)流,支持復雜的事件處理、數(shù)據(jù)變換和聚合。 Stream & Batch Analytics(流和批分析)

同時支持流處理和批處理分析。Flink 能夠在同一個系統(tǒng)中處理流數(shù)據(jù)和批數(shù)據(jù),提供統(tǒng)一的分析平臺。

資源與存儲

Flink 的處理依賴于各種資源和存儲系統(tǒng):

Resources(資源):

K8s(Kubernetes)YarnMesos這些資源管理平臺用于管理和調(diào)度 Flink 集群中的計算資源。 Storage(存儲):

HDFS(Hadoop Distributed File System)S3(Amazon Simple Storage Service)NFS(Network File System)這些存儲系統(tǒng)用于保存 Flink 處理過程中使用和生成的數(shù)據(jù)。

數(shù)據(jù)輸出

圖的右側(cè)展示了 Flink 處理后的數(shù)據(jù)去向:

Application(應用):處理后的數(shù)據(jù)可以直接驅(qū)動應用程序。Event Log(事件日志):處理后的事件數(shù)據(jù)可以記錄在事件日志中,用于審計或進一步分析。Database, File System, KV-Store(數(shù)據(jù)庫、文件系統(tǒng)、鍵值存儲):結(jié)果數(shù)據(jù)可以保存到不同的存儲系統(tǒng)中。

數(shù)據(jù)流動過程

數(shù)據(jù)從左側(cè)的數(shù)據(jù)源開始,經(jīng)過 Flink 的處理框架(事件驅(qū)動應用、流處理管道、流和批分析),最終輸出到應用、事件日志或存儲系統(tǒng)中。在這個過程中,F(xiàn)link 依賴資源管理平臺(如 Kubernetes、Yarn、Mesos)來調(diào)度計算資源,并使用各種存儲系統(tǒng)(如 HDFS、S3、NFS)來保存數(shù)據(jù)。

總結(jié)

這張圖展示了 Flink 在處理實時數(shù)據(jù)和批數(shù)據(jù)方面的強大能力。它可以接收多種數(shù)據(jù)源,通過流處理管道和事件驅(qū)動應用實時處理數(shù)據(jù),并能同時支持流和批處理分析。處理后的數(shù)據(jù)可以應用于不同的場景,包括驅(qū)動應用程序、記錄事件日志和保存到各種存儲系統(tǒng)中。Flink 的架構(gòu)依賴于資源管理平臺和分布式存儲系統(tǒng),確保高效、可靠地處理大量數(shù)據(jù)。

3、Flink的特性

支持高吞吐、低延遲、高性能的流處理支持帶有事件時間的窗口(Window)操作支持有狀態(tài)計算的Exactly-once語義支持高度靈活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作支持具有反壓功能的持續(xù)流模型支持基于輕量級分布式快照(Snapshot)實現(xiàn)的容錯一個運行時同時支持Batch on Streaming處理和Streaming處理Flink在JVM內(nèi)部實現(xiàn)了自己的內(nèi)存管理,避免了出現(xiàn)oom支持迭代計算支持程序自動優(yōu)化:避免特定情況下Shuffle、排序等昂貴操作,中間結(jié)果有必要進行緩存

4、Flink的組件棧

Apache Flink的組件棧: Deployment層: Apache Flink可以部署在單JVM,,獨立集群,Yarn,K8S等集群中運行。 Runtime層: 針對不同的執(zhí)行環(huán)境,F(xiàn)link 提供了一套統(tǒng)一的分布式作業(yè)執(zhí)行引擎,也就是 Flink Runtime 這層。 Flink Runtime 層采用了標準 master-slave 的結(jié)構(gòu),master(JobManager )負責管理整個集群中的資源和作業(yè);Slave(TaskManager)負責提供具體的資源并實際執(zhí)行作業(yè)。 API層&Libraries層: API層 API蹭其中包含 DataStream API(應用于有界/無界數(shù)據(jù)流場景)和 DataSet API(應用于有界數(shù)據(jù)集場景)兩部分。Core APIs 提供的流式 API(Fluent API)為數(shù)據(jù)處理提供了通用的模塊組件,例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等。此層API 中處理的數(shù)據(jù)類型在每種編程語言中都有其對應的類。Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API來實現(xiàn)自己的需求。DataSet API 還額外提供了一些原語,比如循環(huán)/迭代(loop/iteration)操作。 Libraries層 Table API是以表(Table)為中心的聲明式編程(DSL)API,例如在流式數(shù)據(jù)場景下,它可以表示一張正在動態(tài)改變的表。Table API遵循(擴展)關(guān)系模型:即表擁有 schema(類似于關(guān)系型數(shù)據(jù)庫中的 schema), SQL這層抽象在語義和程序表達式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL查詢表達式。SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的,并且 SQL 查詢語句可以在 Table API中定義的表上執(zhí)行。

5、Flink 中的 API

Flink 為流式/批式處理應用程序的開發(fā)提供了不同級別的抽象。

Flink API 最底層的抽象為有狀態(tài)實時流處理。其抽象實現(xiàn)是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中來為我們使用。它允許用戶在應用程序中自由地處理來自單流或多流的事件(數(shù)據(jù)),并提供具有全局一致性和容錯保障的狀態(tài)。此外,用戶可以在此層抽象中注冊事件時間(event time)和處理時間(processing time)回調(diào)方法,從而允許程序可以實現(xiàn)復雜計算。 Flink API 第二層抽象是 Core APIs。實際上,許多應用程序不需要使用到上述最底層抽象的 API,而是可以使用 Core APIs 進行編程:其中包含 DataStream API(應用于有界/無界數(shù)據(jù)流場景)和 DataSet API(應用于有界數(shù)據(jù)集場景)兩部分。Core APIs 提供的流式 API(Fluent API)為數(shù)據(jù)處理提供了通用的模塊組件,例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等。此層 API 中處理的數(shù)據(jù)類型在每種編程語言中都有其對應的類。 Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API 來實現(xiàn)自己的需求。DataSet API 還額外提供了一些原語,比如循環(huán)/迭代(loop/iteration)操作。 Flink API 第三層抽象是 Table API。Table API 是以表(Table)為中心的聲明式編程(DSL)API,例如在流式數(shù)據(jù)場景下,它可以表示一張正在動態(tài)改變的表。Table API 遵循(擴展)關(guān)系模型:即表擁有 schema(類似于關(guān)系型數(shù)據(jù)庫中的 schema),并且 Table API 也提供了類似于關(guān)系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以聲明的方式定義應執(zhí)行的邏輯操作,而不是確切地指定程序應該執(zhí)行的代碼。盡管 Table API 使用起來很簡潔并且可以由各種類型的用戶自定義函數(shù)擴展功能,但還是比 Core API 的表達能力差。此外,Table API 程序在執(zhí)行之前還會使用優(yōu)化器中的優(yōu)化規(guī)則對用戶編寫的表達式進行優(yōu)化。 表和 DataStream/DataSet 可以進行無縫切換,F(xiàn)link 允許用戶在編寫應用程序時將 Table API 與 DataStream/DataSet API 混合使用。 Flink API 最頂層抽象是 SQL。這層抽象在語義和程序表達式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達式。SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的,并且 SQL 查詢語句可以在 Table API 中定義的表上執(zhí)行。

6、Flink的流批處理

流處理的代碼舉例

package com.shujia.core;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1StreamWordCount {

public static void main(String[] args) throws Exception {

//加載flink環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//設(shè)置任務的并行度;一個并行度相當于一個task

env.setParallelism(2);

//設(shè)置延遲時間,默認的時間是200毫秒,單位是毫秒

env.setBufferTimeout(100);

//讀取數(shù)據(jù)

DataStream dataStream = env.socketTextStream("master", 12345);

//計算每個單詞的個數(shù)

DataStream> wordDS = dataStream.map((word) -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

//通過keyBy進行分組

KeyedStream, String> keyByDS = wordDS.keyBy((kv) -> kv.f0);

//通過sum進行聚合

DataStream> wordCountDS = keyByDS.sum(1);

//打印數(shù)據(jù)

wordCountDS.print();

//觸發(fā)程序執(zhí)行

env.execute();

}

}

flink持續(xù)流處理的模型

批處理的代碼

package com.shujia.core;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2BatchWordCount {

public static void main(String[] args) throws Exception {

//加載flink環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//設(shè)置任務的并行度;一個并行度相當于一個task

env.setParallelism(2);

//設(shè)置延遲時間,默認的時間是200毫秒,單位是毫秒

env.setBufferTimeout(100);

/*

*處理模式

* RuntimeExecutionMode.BATCH:批處理模式(MapReduce模型)

* 1、輸出最終結(jié)果

* 2、批處理模式只能用于處理有界流

*

* RuntimeExecutionMode.STREAMING:流處理模式(持續(xù)流模型)

* 1、輸出連續(xù)結(jié)果

* 2、流處理模式,有界流核無界流都可以處理

*/

//設(shè)置數(shù)據(jù)的處理模式

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

//讀取數(shù)據(jù)

DataStream dataStream = env.readTextFile("flink/data/words.txt");

//計算每個單詞的個數(shù)

DataStream> wordDS = dataStream.map((word) -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

//通過keyBy進行分組

KeyedStream, String> keyByDS = wordDS.keyBy((kv) -> kv.f0);

//通過sum進行聚合

DataStream> wordCountDS = keyByDS.sum(1);

//打印數(shù)據(jù)

wordCountDS.print();

//觸發(fā)程序執(zhí)行

env.execute();

}

}

二、Data Sources

1、分為四類

Flink 在流處理和批處理上的 source 大概有 4 類: 基于本地集合的 source、 基于文件的 source、 基于網(wǎng)絡(luò)套接字的 source、 自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。

2、具體介紹

2.1、基于本地集合的source

fromCollection(Collection) - 從 Java Java.util.Collection 創(chuàng)建數(shù)據(jù)流。集合中的所有元素必須屬于同一類型。

//創(chuàng)建flink環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//創(chuàng)建List集合

ArrayList wordsList = new ArrayList<>();

//添加集合元素

wordsList.add("java");

wordsList.add("java");

wordsList.add("flink");

wordsList.add("flink");

wordsList.add("hadoop");

wordsList.add("hadoop");

/*

*基于集合的source --- 有界流

*/

//讀取數(shù)據(jù)

DataStream wordDS = env.fromCollection(wordsList);

//打印結(jié)果

wordDS.print();

//啟動任務

env.execute();

2.2、基于文件的source

readTextFile(path) - 讀取文本文件,例如遵守 TextInputFormat 規(guī)范的文件,逐行讀取并將它們作為字符串返回。 readFile(fileInputFormat, path) - 按照指定的文件輸入格式讀?。ㄒ淮危┪募?。 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是前兩個方法內(nèi)部調(diào)用的方法。它基于給定的 fileInputFormat 讀取路徑 path 上的文件。根據(jù)提供的 watchType 的不同,source 可能定期(每 interval 毫秒)監(jiān)控路徑上的新數(shù)據(jù)(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當前路徑中的數(shù)據(jù)然后退出(watchType 為 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用戶可以進一步排除正在處理的文件。 實現(xiàn): 在底層,F(xiàn)link 將文件讀取過程拆分為兩個子任務,即 目錄監(jiān)控 和 數(shù)據(jù)讀取。每個子任務都由一個單獨的實體實現(xiàn)。監(jiān)控由單個非并行(并行度 = 1)任務實現(xiàn),而讀取由多個并行運行的任務執(zhí)行。后者的并行度和作業(yè)的并行度相等。單個監(jiān)控任務的作用是掃描目錄(定期或僅掃描一次,取決于 watchType),找到要處理的文件,將它們劃分為 分片,并將這些分片分配給下游 reader。Reader 是將實際獲取數(shù)據(jù)的角色。每個分片只能被一個 reader 讀取,而一個 reader 可以一個一個地讀取多個分片。 重要提示:

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,當一個文件被修改時,它的內(nèi)容會被完全重新處理。這可能會打破 “精確一次” 的語義,因為在文件末尾追加數(shù)據(jù)將導致重新處理文件的所有內(nèi)容。如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE,source 掃描一次路徑然后退出,無需等待 reader 讀完文件內(nèi)容。當然,reader 會繼續(xù)讀取數(shù)據(jù),直到所有文件內(nèi)容都讀完。關(guān)閉 source 會導致在那之后不再有檢查點。這可能會導致節(jié)點故障后恢復速度變慢,因為作業(yè)將從最后一個檢查點恢復讀取。

//加載flink環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/*

* 老版本讀取數(shù)據(jù)

*/

//讀取數(shù)據(jù)

DataStream wordDs = env.readTextFile("flink/data/words.txt");

//打印數(shù)據(jù)

// wordDs.print();

/*

* 新版本讀取數(shù)據(jù)的方式,可以讀取有界流和無界流

*/

//讀取有界流

// FileSource fileSource = FileSource.forRecordStreamFormat(

// new TextLineInputFormat("UTF-8"),

// new Path("flink/data/words.txt")

// ).build();

//

// DataStreamSource fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSourceDS");

//

// fileSourceDS.print();

//讀取無界流

FileSource fileSource = FileSource.forRecordStreamFormat(

//指定讀取數(shù)據(jù)的編碼

new TextLineInputFormat("UTF-8"),

//指定讀取數(shù)據(jù)的路徑

new Path("flink/data/stu")

)

每隔一段時間讀取目錄下新的文件,構(gòu)建無界流

.monitorContinuously(Duration.ofSeconds(5))

.build();

DataStream fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSourceDS");

DataStream> clazzDS = fileSourceDS.map(lines -> Tuple2.of(lines.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = clazzDS.keyBy(kv -> kv.f0);

DataStream> countDS = keyByDS.sum(1);

countDS.print();

//啟動任務

env.execute();

2.3、基于套接字的source

通過 nc -lk post(端口號) 啟動一個輸入流socketTextStream - 從套接字讀取。元素可以由分隔符分隔。

//加載flink環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//設(shè)置任務的并行度;一個并行度相當于一個task

env.setParallelism(2);

//設(shè)置延遲時間,默認的時間是200毫秒,單位是毫秒

env.setBufferTimeout(100);

//讀取數(shù)據(jù)

DataStream dataStream = env.socketTextStream("master", 12345);

2.4、基于自定義的source

addSource - 關(guān)聯(lián)一個新的 source function。

public class Demo3MySource {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//讀取自定義數(shù)據(jù)

DataStreamSource sourceFunctionDS = env.addSource(new MySource());

//打印結(jié)果

sourceFunctionDS.print();

//啟動任務

env.execute();

}

}

/**

* 自定義source

* 實現(xiàn)SourceFunction接口,實現(xiàn)接口中的run方法

*/

class MySource implements SourceFunction {

/**

* flink啟動的時候會執(zhí)行一次,再run方法中讀取外部的數(shù)據(jù),將數(shù)據(jù)發(fā)送到下游

*/

@Override

public void run(SourceContext ctx) throws Exception {

while (true) {

ctx.collect(100);

Thread.sleep(100);

}

}

//cancel方法再任務取消的時候執(zhí)行,一般用于回收資源

@Override

public void cancel() {

}

}

可以通過自定義的source連接到MySQL數(shù)據(jù)庫等

public class Demo4MySQLSource {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//讀取自定義數(shù)據(jù)

DataStreamSource sourceFunctionDS = env.addSource(new MySQLSource());

sourceFunctionDS.print();

env.execute();

}

}

@Data

@AllArgsConstructor

class Student {

private int id;

private String name;

private int age;

private String gender;

private String clazz;

}

/**

* 自定義source 讀取mysql中的數(shù)據(jù)

* 練習:自定義source 讀取redis中的數(shù)據(jù)

*/

class MySQLSource implements SourceFunction {

//run方法在任務啟動的時候執(zhí)行一次

@Override

public void run(SourceContext ctx) throws Exception {

//加載驅(qū)動

Class.forName("com.mysql.jdbc.Driver");

//創(chuàng)建連接對象

Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");

//獲取操作對象

PreparedStatement statement = conn.prepareStatement("select * from students");

//執(zhí)行SQL語句

ResultSet resultSet = statement.executeQuery();

while (resultSet.next()) {

int id = resultSet.getInt(1);

String name = resultSet.getString(2);

int age = resultSet.getInt(3);

String gender = resultSet.getString(4);

String clazz = resultSet.getString(5);

//封裝為學生對象

Student student = new Student(id, name, age, gender, clazz);

//將數(shù)據(jù)發(fā)送到下游

ctx.collect(student);

}

//關(guān)閉資源

statement.close();

conn.close();

}

@Override

public void cancel() {

}

}

三、DataStream Transformations(算子)

1、什么是算子

用戶通過算子能將一個或多個 DataStream 轉(zhuǎn)換成新的 DataStream,在應用程序中可以將多個數(shù)據(jù)轉(zhuǎn)換算子合并成一個復雜的數(shù)據(jù)流拓撲。

2、數(shù)據(jù)流轉(zhuǎn)換

2.1、Map

DataStream → DataStream # 輸入一個元素同時輸出一個元素。下面是將輸入流中元素數(shù)值加倍的 map function:

public class Demo1Map {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream linesDS = env.socketTextStream("master", 12345);

//匿名內(nèi)部類的方式

// SingleOutputStreamOperator> wordDS = linesDS.map(new MapFunction>() {

//

// @Override

// public Tuple2 map(String value) throws Exception {

// return Tuple2.of(value, 1);

// }

// });

//聲明式寫法

DataStream> wordDS = linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = wordDS.keyBy(kv -> kv.f0);

DataStream> countDS = keyByDS.sum(1);

countDS.print();

env.execute();

}

}

2.2、FlatMap

DataStream → DataStream # 輸入一個元素同時產(chǎn)生零個、一個或多個元素。下面是將句子拆分為單詞的 flatmap function:

public class Demo2FlatMap {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream linesDS = env.socketTextStream("master", 12345);

//匿名內(nèi)部類方式

// SingleOutputStreamOperator> flatMapDS = linesDS.flatMap(new FlatMapFunction>() {

//

// @Override

// public void flatMap(String line, Collector> out) throws Exception {

// String[] words = line.split(" ");

// for (String word : words) {

// out.collect(Tuple2.of(word, 1));

//

// }

// }

// });

//聲明式要指定數(shù)據(jù)類型

SingleOutputStreamOperator>flatMapDS = linesDS.flatMap((line, out) -> {

String[] words = line.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word, 1));

}

}, Types.TUPLE(Types.STRING,Types.INT));

KeyedStream, String> keyByDS = flatMapDS.keyBy(kv -> kv.f0);

SingleOutputStreamOperator> countDS = keyByDS.sum(1);

countDS.print();

env.execute();

}

}

2.3、Filter

DataStream → DataStream # 為每個元素執(zhí)行一個布爾 function,并保留那些 function 輸出值為 true 的元素。下面是過濾掉零值的 filter:

public class Demo3Filter {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource studentDS = env.readTextFile("flink/data/students.csv");

//匿名內(nèi)部類方式

SingleOutputStreamOperator filterDS = studentDS.filter(new FilterFunction() {

@Override

public boolean filter(String line) throws Exception {

return "文科一班".equals(line.split(",")[4]);

}

});

//聲明式

// SingleOutputStreamOperator filterDS = studentDS.filter(line -> "文科一班".equals(line.split(",")[4]));

filterDS.print();

env.execute();

}

}

2.4、KeyBy

DataStream → KeyedStream # 在邏輯上將流劃分為不相交的分區(qū)。具有相同 key 的記錄都分配到同一個分區(qū)。在內(nèi)部, keyBy() 是通過哈希分區(qū)實現(xiàn)的。有多種指定 key 的方式。 以下情況,一個類不能作為 key:

它是一種 POJO 類,但沒有重寫 hashCode() 方法而是依賴于 Object.hashCode() 實現(xiàn)。它是任意類的數(shù)組。

public class Demo4KeyBy {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource linesDS = env.socketTextStream("master", 12345);

//匿名內(nèi)部類方式

KeyedStream keyByDS = linesDS.keyBy(new KeySelector() {

@Override

public String getKey(String word) throws Exception {

return word;

}

});

keyByDS.print();

//lambda表達式

//keyBy:將相同的key發(fā)送到同一個task中

// KeyedStream keyByDS = linesDS.keyBy(word -> word);

env.execute();

}

}

2.5、Reduce

KeyedStream → DataStream # 在相同 key 的數(shù)據(jù)流上“滾動”執(zhí)行 reduce。將當前元素與最后一次 reduce 得到的值組合然后輸出新值。

public class Demo5Reduce {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream streamSource = env.readTextFile("flink/data/words.txt");

DataStream flatMapDS = streamSource.flatMap((FlatMapFunction) (line, out) -> {

String[] words = line.split("\\|");

for (String word : words) {

out.collect(word);

}

},Types.STRING);;

DataStream> mapDS = flatMapDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = mapDS.keyBy(kv -> kv.f0);

//匿名內(nèi)部類

// DataStream> reduceDS = keyByDS.reduce(new ReduceFunction>() {

// @Override

// public Tuple2 reduce(Tuple2 kv1, Tuple2 kv2) throws Exception {

// return Tuple2.of(kv1.f0, kv1.f1 + kv2.f1);

// }

// });

//聲明式

SingleOutputStreamOperator> reduceDS = keyByDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));

reduceDS.print();

env.execute();

}

}

2.6、Window

KeyedStream → WindowedStream # 可以在已經(jīng)分區(qū)的 KeyedStreams 上定義 Window。Window 根據(jù)某些特征(例如,最近 5 秒內(nèi)到達的數(shù)據(jù))對每個 key Stream 中的數(shù)據(jù)進行分組。請參閱 windows 獲取有關(guān) window 的完整說明。

public class Demo6Window {

public static void main(String[] args) throws Exception {

//加載環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//讀取數(shù)據(jù)

DataStream streamSource = env.socketTextStream("master", 12345);

//轉(zhuǎn)換kv

DataStream> mapDS = streamSource.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

//分組

KeyedStream, String> keyByDS = mapDS.keyBy(kv -> kv.f0);

//設(shè)置滑動窗口

//SlidingProcessingTimeWindows:滑動的處理時間窗口

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(

//傳入窗口的大小以及滑動的距離

SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5))

);

//計算每個單詞的個數(shù)

DataStream> countDS = windowDS.sum(1);

//打印結(jié)果

countDS.print();

//啟動任務

env.execute();

}

}

2.7、Union

DataStream → DataStream #* 將兩個或多個數(shù)據(jù)流聯(lián)合來創(chuàng)建一個包含所有流中數(shù)據(jù)的新流。注意:如果一個數(shù)據(jù)流和自身進行聯(lián)合,這個流中的每個數(shù)據(jù)將在合并后的流中出現(xiàn)兩次。

public class Demo7Union {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

DataStreamSource streamSource1 = env.socketTextStream("master", 8888);

/*

* union:合并兩個DS

* 在數(shù)據(jù)層面并沒有合并,只是在邏輯層面合并了

*/

DataStream unionDS = streamSource.union(streamSource1);

unionDS.print();

env.execute();

}

}

2.8、ProcessFunction

ProcessFunction是一種低級流處理操作,可以訪問所有(非循環(huán))流應用程序的基本構(gòu)建塊

public class Demo8ProcessFunction {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

/*

* process算子是flink的底層算子嗎,可以代替map flatMap filter算子

*/

SingleOutputStreamOperator> processDS = streamSource.process(new ProcessFunction>() {

/**

* processElement:相當于flatMap。每一條數(shù)據(jù)執(zhí)行一次,可以返回一條也可以返回多條

* line:一行數(shù)據(jù)

* ctx:上下文對象(代表flink的執(zhí)行環(huán)境)

* out: 用于將數(shù)據(jù)發(fā)送到下游

*/

@Override

public void processElement(String line, ProcessFunction>.Context ctx, Collector> out) throws Exception {

String[] words = line.split(" ");

for (String word : words) {

Tuple2 tuple2 = Tuple2.of(word, 1);

out.collect(tuple2);

}

}

});

processDS.print();

env.execute();

}

}

四、Data Sinks

1、概述

在 Apache Flink 中,數(shù)據(jù)接收器(Data Sinks)用于將處理后的數(shù)據(jù)發(fā)送到外部系統(tǒng)

2、分為四大類

寫入文件、 打印出來、 寫入 socket 、 自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 sink。

3、具體介紹

主要介紹常用的兩類

3.1、FileSink

public class Demo1FileSink {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource wordDs = env.socketTextStream("master", 12345);

FileSink fileSink = FileSink

.forRowFormat(new Path("flink/data/words"), new SimpleStringEncoder<>("UTF-8"))

//指定滾動策略

.withRollingPolicy(

DefaultRollingPolicy.builder()

//包含了至少10秒的數(shù)據(jù)量

.withRolloverInterval(Duration.ofSeconds(10))

//從沒接收延時10秒之外的新紀錄

.withInactivityInterval(Duration.ofSeconds(10))

//文件大小已經(jīng)達到 1MB(寫入最后一條記錄之后)

.withMaxPartSize(MemorySize.ofMebiBytes(1))

.build()

).build();

wordDs.sinkTo(fileSink);

env.execute();

}

}

3.2、自定義Sink

public class Demo2MySink {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource wordsDs = env.socketTextStream("master", 12345);

wordsDs.addSink(new MySink());

env.execute();

}

}

//自定義sink

class MySink implements SinkFunction {

@Override

public void invoke(String value, Context context) throws Exception {

//自定義數(shù)據(jù)sink的位置

System.out.println("自定義sink:" + value);

}

}

通過自定義Sink將數(shù)據(jù)保存到MySQL數(shù)據(jù)庫中

public class Demo3MySQLSink {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream studentDS = env.addSource(new MySQLSource());

DataStream> genderDS = studentDS.map(stu -> Tuple2.of(stu.getGender(), 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = genderDS.keyBy(kv -> kv.f0);

SingleOutputStreamOperator> sumDS = keyByDS.sum(1);

sumDS.addSink(new MySQLSink());

env.execute();

}

}

@Data

@AllArgsConstructor

class Student {

private int id;

private String name;

private int age;

private String gender;

private String clazz;

}

/**

* 自定義source 讀取mysql中的數(shù)據(jù)

*

*/

class MySQLSource implements SourceFunction {

//run方法在任務啟動的時候執(zhí)行一次

@Override

public void run(SourceContext ctx) throws Exception {

//加載驅(qū)動

Class.forName("com.mysql.jdbc.Driver");

//創(chuàng)建連接對象

Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false&characterEncoding=UTF-8", "root", "123456");

//獲取操作對象

PreparedStatement statement = conn.prepareStatement("select * from students");

//執(zhí)行SQL語句

ResultSet resultSet = statement.executeQuery();

while (resultSet.next()) {

int id = resultSet.getInt(1);

String name = resultSet.getString(2);

int age = resultSet.getInt(3);

String gender = resultSet.getString(4);

String clazz = resultSet.getString(5);

//封裝為學生對象

Student student = new Student(id, name, age, gender, clazz);

//將數(shù)據(jù)發(fā)送到下游

ctx.collect(student);

}

//關(guān)閉資源

statement.close();

conn.close();

}

@Override

public void cancel() {

}

}

class MySQLSink extends RichSinkFunction> {

private Connection conn;

private PreparedStatement statement;

@Override

public void open(Configuration parameters) throws Exception {

//加載驅(qū)動

Class.forName("com.mysql.jdbc.Driver");

//創(chuàng)建連接對象

conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false&characterEncoding=UTF-8", "root", "123456");

//獲取操作對象

statement = conn.prepareStatement("replace into gender_count values(?,?)");

}

@Override

public void close() throws Exception {

statement.close();

conn.close();

}

@Override

public void invoke(Tuple2 kv, Context context) throws Exception {

statement.setString(1, kv.f0);

statement.setInt(2, kv.f1);

statement.execute();

}

}

五、Flink的集群搭建

1、Standalone(獨立集群)

1.1、上傳解壓配置環(huán)境變量

# 解壓

tar -xvf flink-1.15.2-bin-scala_2.12.tgz

# 配置環(huán)境變量

vim /etc/profile

export FLINK_HOME=/usr/local/soft/flink-1.15.2

export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

1.2、修改配置文件

flink-conf.yaml

jobmanager.rpc.address: master

jobmanager.bind-host: 0.0.0.0

taskmanager.bind-host: 0.0.0.0

taskmanager.host: localhost # noe1和node2需要單獨修改

taskmanager.numberOfTaskSlots: 4

rest.address: master

rest.bind-address: 0.0.0.0

masters

master:8081

workers

node1

node2

1.3、同步到所有節(jié)點

scp -r flink-1.15.2 node1:`pwd`

scp -r flink-1.15.2 node2:`pwd`

# 修改node1和node2中地taskmanager.host

taskmanager.host: node1

taskmanager.host: node2

1.4、啟動Flink獨立集群

start-cluster.sh

# stop-cluster.sh

# flink web ui

http://master:8081

1.5、提交任務

將代碼打包上傳到服務器提交

flink run -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar

在flink web ui中直接提交

2、Flink on yarn

flink on yarn模式:將flink地任務提交到y(tǒng)arn上運行

2.1、整合到y(tǒng)arn

# 在環(huán)境變量中配置HADOOP_CLASSSPATH

vim /etc/profile

export HADOOP_CLASSPATH=`hadoop classpath`

source /etc/profile

2.2、Flink on yarn部署模式

部署模式總共有三種

Application Mode

1、將任務提交到y(tǒng)arn上運行,yarn會為每一個flink地任務啟動一個jobmanager和一個或者多個taskmanasger 2、代碼main函數(shù)不再本地運行,dataFlow不再本地構(gòu)建,如果代碼報錯在本地看不到詳細地錯誤日志

flink run-application -t yarn-application -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar

# 查看yarn的日志

yarn logs -applicationId application_1717039073374_0001

Per-Job Cluster Mode

1、將任務提交到y(tǒng)arn上運行,yarn會為每一個flink地任務啟動一個jobmanager和一個或者多個taskmanasger 2、代碼地main函數(shù)在本地啟動,在本地構(gòu)建dataflow,再將dataflow提交給jobmanager,如果代碼報錯再本地可以爛到部分錯誤日志

flink run -t yarn-per-job -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar

Session Mode

1、先再yarn中啟動一個jobmanager, 不啟動taskmanager 2、提交任務地時候再動態(tài)申請taskmanager 3、所有使用session模式提交的任務共享同一個jobmanager 4、類似獨立集群,只是集群在yarn中啟動了,可以動態(tài)申請資源 5、一般用于測試

# 1、先啟動會話集群

yarn-session.sh -d

# 2、在提交任務

flink run -t yarn-session -Dyarn.application.id=application_1717068381486_0003 -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar

# 在網(wǎng)頁中直接提交

六、Flink的架構(gòu)及其工作原理

1、解析Flink集群

Flink 運行時由兩種類型的進程組成:一個 JobManager 和一個或者多個 TaskManager。

Flink Program:

Program Code:這是用戶編寫的Flink程序,包含數(shù)據(jù)處理的邏輯。Optimizer / Graph Builder:當用戶提交Flink程序時,首先由客戶端(Client)解析程序并構(gòu)建一個數(shù)據(jù)流圖(Dataflow Graph)。 Client:

Actor System:Flink的Client通過Actor System與JobManager進行通信。Dataflow Graph:客戶端生成的數(shù)據(jù)流圖(Dataflow Graph)會被提交給JobManager。 JobManager:

Actor System:JobManager也通過Actor System與Client和TaskManager進行通信。Dataflow Graph:JobManager接收到數(shù)據(jù)流圖后,負責將其轉(zhuǎn)換為實際的執(zhí)行圖。Scheduler:調(diào)度器負責分配任務,并決定在集群中的哪個TaskManager上運行哪些任務。Checkpoint Coordinator:負責協(xié)調(diào)檢查點操作,以確保在出現(xiàn)故障時能夠從最近的檢查點恢復。 TaskManager:

Task Slots:每個TaskManager擁有多個任務槽(Task Slot),每個任務槽可以獨立執(zhí)行一個任務(Task)。Memory & I/O Manager:管理內(nèi)存和I/O操作,以確保高效的數(shù)據(jù)處理。Network Manager:管理網(wǎng)絡(luò)通信,確保TaskManager之間的數(shù)據(jù)流傳輸順暢。Actor System:與JobManager和其他TaskManager進行通信。

2、TaskSlot和并行度

2.1、TaskSlot

Slot 是指 TaskManager 最大能并發(fā)執(zhí)行的能力

Task Slot 是 Flink 中用于執(zhí)行并行任務的物理資源單元。它們可以被視為一個 TaskManager(任務管理器)上的資源分配單位。

每個 Task Slot 可以運行一個并行任務,但也可以配置多個 Task Slot 來支持更高的并行度。

Task Slot 通常與計算資源相關(guān)聯(lián),比如一個 CPU 核心、一段內(nèi)存等。

2.2、Parallelism(并行度)

parallelism 是指 TaskManager 實際使用的并發(fā)能力

Parallelism 是指在同一時間處理數(shù)據(jù)的任務數(shù)量。在 Flink 中,它可以應用于整個作業(yè)(全局并行度)或單個算子(局部并行度)。

Flink 作業(yè)的并行度可以在作業(yè)提交時指定,也可以在算子級別進行設(shè)置。通過設(shè)置并行度,可以控制作業(yè)的性能和資源利用率。

2.3、設(shè)置并行度的方式

在代碼中設(shè)置:env.setParallelism(2) 在提交任務是通過參數(shù)設(shè)置 -p (推薦使用) 在配置文件中統(tǒng)一設(shè)置 每一個算子可以單獨設(shè)置并行度

算子設(shè)置并行度 > env 設(shè)置并行度 > 配置文件默認并行度。

2.4、Flink的共享資源

1、flink需要資源的數(shù)量和task數(shù)量無關(guān) 2、一個并行度對應一個資源(slot) 3、上游task的下游task共享同一個資源

2.5、并行度設(shè)置的原則

1、實時計算的任務并行度取決于數(shù)據(jù)的吞吐量 2、聚合計算(有shuffle)的代碼一個并行度大概一秒可以處理10000條數(shù)據(jù)左右 3、非聚合計算是,一個并行度大概一秒可以處理10萬條左右

3、Event Time(事件時間) and Processing Time (處理時間)

3.1、Processing Time

處理時間是指執(zhí)行相應操作的機器的系統(tǒng)時間。

public class Demo4ProcTime {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream wordsDS = env.socketTextStream("master", 8888);

//轉(zhuǎn)換成kv

DataStream> kvDS = wordsDS

.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

//按照單詞分組

KeyedStream, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

//劃分窗口

//TumblingProcessingTimeWindows:股東的處理時間窗口

WindowedStream, String, TimeWindow> windowDS = keyByDS

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

//統(tǒng)計單詞的數(shù)量

DataStream> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

}

3.2、Event Time

事件時間是每個單獨事件在其產(chǎn)生設(shè)備上發(fā)生的時間。這個時間通常在記錄進入Flink之前嵌入到記錄中,并且可以從每個記錄中提取事件時間戳。在事件時間中,時間的進展取決于數(shù)據(jù),而不是任何操作機器的系統(tǒng)時間(例如:window系統(tǒng)時間)。事件時間程序必須指定如何生成事件時間水位線,這是在事件時間中表示進程的機制。

3.3、水位線

水位線等于最新一條數(shù)據(jù)的時間戳,在Flink中測量事件時間進展的機制是水位線。水位線作為數(shù)據(jù)流的一部分,并攜帶時間戳t。水位線(t)聲明事件時間在該流中已經(jīng)達到時間t,這意味著流中不應該再有時間戳t <= t的元素(即時間戳比水位線更早或等于水位線的事件)。這種情況只對于數(shù)據(jù)流是按照事件中的時間戳順序排列的

但是,數(shù)據(jù)流可能是無序的,這會導致watermark 一旦越過窗口結(jié)束的 timestamp,小于watermark 的事件時間不能觸發(fā)窗口,從而導致數(shù)據(jù)丟失,可以設(shè)置一些參數(shù)讓水位線延遲,使小于水位線的數(shù)據(jù)可以再次觸發(fā)窗口

水位線的對齊

1、當上游有多個task時,下游task會取上游task水位線的最小值,如果數(shù)據(jù)量小。水位線就很難對齊,窗口就不會觸發(fā)計算

public class Demo5EventTime {

public static void main(String[] args) throws Exception {

/*

* 事件時間:數(shù)據(jù)中有一個時間字段,使用數(shù)據(jù)的時間字段觸發(fā)計算,代替真實的時間,可以反應數(shù)據(jù)真實發(fā)生的順序,計算更有意義

*/

/*

java,1685433130000

java,1685433131000

java,1685433132000

java,1685433134000

java,1685433135000

java,1685433137000

java,1685433139000

java,1685433140000

java,1685433170000

*/

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/*

*水位線對齊

* 1、當上游有多個task時,下游task會取上游task水位線的最小值,如果數(shù)據(jù)量小。水位線就很難對齊,窗口就不會觸發(fā)計算

*/

env.setParallelism(1);

DataStream linesDS = env.socketTextStream("master", 8888);

//解析數(shù)據(jù)

DataStream> tsDS = linesDS.map(line -> {

String[] split = line.split(",");

String word = split[0];

long ts = Long.parseLong(split[1]);

return Tuple2.of(word, ts);

}, Types.TUPLE(Types.STRING, Types.LONG));

/*

* 指定時間字段和水位線生成策略

*/

DataStream> assDS = tsDS

.assignTimestampsAndWatermarks(

WatermarkStrategy

//指定水位線生產(chǎn)策略,水位線等于最新一條數(shù)據(jù)的時間戳,如果數(shù)據(jù)亂序可能會丟失數(shù)據(jù)

//.>forMonotonousTimestamps()

//水位線前移時間(數(shù)據(jù)最大亂序時間)

.>forBoundedOutOfOrderness(Duration.ofSeconds(5))

//指定時間字段

.withTimestampAssigner((event, ts) -> event.f1)

);

/*

*每隔5秒統(tǒng)計單詞的數(shù)量

*/

DataStream> kvDS = assDS

.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = kvDS

.keyBy(kv -> kv.f0);

//TumblingEventTimeWindows:滾動的事件時間窗口

WindowedStream, String, TimeWindow> windowDS = keyByDS

.window(TumblingEventTimeWindows.of(Time.seconds(5)));

windowDS.sum(1).print();

env.execute();

}

}

4、窗口

窗口總共分為三大類:

1、Time Window

2、Count Window

3、Session Window

4.1、Time Window

時間窗口分為四種:

1、滾動處理時間窗口(TumblingProcessingTimeWindows)

2、滾動事件時間窗口(TumblingEventTimeWindows)

3、滑動處理時間窗口(SlidingProcessingTimeWindows)

4、滑動事件時間窗口(SlidingEventTimeWindows)

滾動處理時間窗口(TumblingProcessingTimeWindows)

滾動窗口的大小是固定的,且各自范圍之間不重疊。時間是根據(jù)系統(tǒng)時間來計算的

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> wordDS = streamSource.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = wordDS.keyBy(kv -> kv.f0);

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

SingleOutputStreamOperator> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

滾動事件時間窗口(TumblingEventTimeWindows)

滾動窗口的大小是固定的,且各自范圍之間不重疊。時間是根據(jù)所帶的時間字段來計算的

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/*

*水位線對齊

* 1、當上游有多個task時,下游task會取上游task水位線的最小值,如果數(shù)據(jù)量小。水位線就很難對齊,窗口就不會觸發(fā)計算

*因此并行度設(shè)為1

*/

env.setParallelism(1);

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> wordDS = streamSource.map(lines -> {

String[] line = lines.split(",");

String word = line[0];

long ts = Long.parseLong(line[1]);

return Tuple2.of(word, ts);

}, Types.TUPLE(Types.STRING, Types.LONG));

SingleOutputStreamOperator> eventDS = wordDS.assignTimestampsAndWatermarks(WatermarkStrategy

//設(shè)置水位線延遲時間

.>forBoundedOutOfOrderness(Duration.ofSeconds(2))

//不設(shè)置水位線的延遲

//.>forMonotonousTimestamps()

//指定時間字段

.withTimestampAssigner((event, ts) -> event.f1)

);

SingleOutputStreamOperator> mapDS = eventDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = mapDS.keyBy(kv -> kv.f0);

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));

SingleOutputStreamOperator> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

滑動處理時間窗口(SlidingProcessingTimeWindows)

與滾動窗口類似,滑動窗口的 assigner 分發(fā)元素到指定大小的窗口,窗口大小通過 window size 參數(shù)設(shè)置。 滑動窗口需要一個額外的滑動距離(window slide)參數(shù)來控制生成新窗口的頻率,時間是根據(jù)系統(tǒng)時間來計算的。

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> lineDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = lineDS.keyBy(kv -> kv.f0);

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));

SingleOutputStreamOperator> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

滑動事件時間窗口(SlidingEventTimeWindows)

與滾動窗口類似,滑動窗口的 assigner 分發(fā)元素到指定大小的窗口,窗口大小通過 window size 參數(shù)設(shè)置。 滑動窗口需要一個額外的滑動距離(window slide)參數(shù)來控制生成新窗口的頻率,時間是根據(jù)所帶的時間字段來計算的。

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> wordDS = streamSource.map(line -> {

String[] words = line.split(",");

String word = words[0];

long ts = Long.parseLong(words[1]);

return Tuple2.of(word, ts);

}, Types.TUPLE(Types.STRING, Types.LONG));

SingleOutputStreamOperator> watermarks = wordDS.assignTimestampsAndWatermarks(WatermarkStrategy

.>forBoundedOutOfOrderness(Duration.ofSeconds(2))

.withTimestampAssigner((event, ts) -> event.f1)

);

SingleOutputStreamOperator> mapDS = watermarks.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = mapDS.keyBy(kv -> kv.f0);

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)));

SingleOutputStreamOperator> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

4.2、Count Window

計數(shù)窗口基于元素的個數(shù)來截取數(shù)據(jù),到達固定的個數(shù)時就觸發(fā)計算并關(guān)閉窗口

下面的代碼是基于同一個單詞達到3個就會計算

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> lineDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = lineDS.keyBy(kv -> kv.f0);

WindowedStream, String, GlobalWindow> countWindow = keyByDS.countWindow(3);

SingleOutputStreamOperator> countDS = countWindow.sum(1);

countDS.print();

env.execute();

}

4.3、Session Window

會話窗口的 assigner 會把數(shù)據(jù)按活躍的會話分組。 與滾動窗口和滑動窗口不同,會話窗口不會相互重疊,且沒有固定的開始或結(jié)束時間。 會話窗口在一段時間沒有收到數(shù)據(jù)之后會關(guān)閉,即在一段不活躍的間隔之后。 會話窗口的 assigner 可以設(shè)置固定的會話間隔(session gap)或 用 session gap extractor 函數(shù)來動態(tài)地定義多長時間算作不活躍。 當超出了不活躍的時間段,當前的會話就會關(guān)閉,并且將接下來的數(shù)據(jù)分發(fā)到新的會話窗口。

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator> lineDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = lineDS.keyBy(kv -> kv.f0);

WindowedStream, String, TimeWindow> windowDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));

SingleOutputStreamOperator> countDS = windowDS.sum(1);

countDS.print();

env.execute();

}

七、Kafka(消息隊列)

1、概述

Kafka是一款分布式消息發(fā)布和訂閱系統(tǒng),它的特點是高性能、高吞吐量。

2、Kafka性能好的原因

1、kafka寫磁盤是順序的,所以不斷的往前產(chǎn)生,不斷的往后寫 2、kafka還用了sendFile的0拷貝技術(shù),提高速度 3、而且還用到了批量讀寫,一批批往里寫,64K為單位

非零拷貝

數(shù)據(jù)在內(nèi)核空間和用戶空間之間需要多次拷貝,增加了 CPU 和內(nèi)存帶寬的使用,性能較低。

非零拷貝

通過減少數(shù)據(jù)拷貝次數(shù),降低了 CPU 和內(nèi)存帶寬的使用,提高了數(shù)據(jù)傳輸性能。常見的實現(xiàn)技術(shù)包括 sendfile()、mmap() 和 DMA。

3、Kafka的架構(gòu)

Broker

Kafka集群中包含的服務器,有一個或多個服務器,這種服務器被稱為 Broker。

Broker 端不維護數(shù)據(jù)的消費狀態(tài),提升了性能。直接使用磁盤進行存儲,線性讀寫,速度快。避免了在JVM 內(nèi)存和系統(tǒng)內(nèi)存之間的復制,減少耗性能的創(chuàng)建對象和垃圾回收。

Producer(生產(chǎn)者)

負責發(fā)布消息到Kafka Broker

producer自己決定往哪個partition寫消息,可以是輪詢的負載均衡,或者是基于hash的partition策略

Consumer(消費者)

負責從Broker 拉?。╬ull)數(shù)據(jù)并進行處理。

Topic

每條發(fā)布到kafka集群的消息都有一個類別,這個類別被稱為Topic

物理上不同Topic的消息分開存儲,邏輯上一個Topic 的消息雖然保存于一個或多個Broker上但是用戶只需指定消費的Topic即課生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處。一個topic分成多個partition

Partition

Partition 是物理上的概念,每個Topic 包含一個或多個Partition。kafka分配的單位是Partition

每個partition內(nèi)部消息強有序,其中的每個消息都有一個序號叫offset 一個partition只對應一個broker,一個broker可以管多個partition

Consumer Group

每個Consumer 屬于一個特定的Consumer Group

可為每個Consumer 指定Group name,若不指定group name 則屬于默認的group

每條消息只可以被Consumer Goup 組中中的一個Consumer消費,但是可以指定多個Consumer Group

所以一個消息在Consumer Group 里面只可以被消費一次。已確定!

4、Kafka的Java API

4.1、Produce

public static void main(String[] args) {

Properties properties = new Properties();

properties.setProperty("bootstrap.servers","master:9092,node2:9092,node2:9092");

//指定key和value的數(shù)據(jù)格式

properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//創(chuàng)建生產(chǎn)者

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

//生產(chǎn)數(shù)據(jù)

kafkaProducer.send(new ProducerRecord<>("words","java"));

//刷新數(shù)據(jù)

kafkaProducer.flush();

//關(guān)閉資源

kafkaProducer.close();

}

4.2、Consumer

public static void main(String[] args) {

Properties properties = new Properties();

//kafka 集群列表

properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");

//讀取數(shù)據(jù)的格式

properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

/*

* earliest

* 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

* latest 默認

* 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)認值生的該分區(qū)下的數(shù)據(jù)

* none

* topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常

*

*/

properties.setProperty("auto.offset.reset", "earliest");

//指定消費者組,一條數(shù)據(jù)在一個組內(nèi)只消費一次

properties.setProperty("group.id", "asddffas");

//創(chuàng)建消費者

KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);

//訂閱生產(chǎn)者

ArrayList list = new ArrayList<>();

list.add("student_hash");

kafkaConsumer.subscribe(list);

while (true){

//拉去數(shù)據(jù)

ConsumerRecords consumerRecords = kafkaConsumer.poll(1000);

//獲取數(shù)據(jù)的信息

for (ConsumerRecord consumerRecord : consumerRecords) {

String topic = consumerRecord.topic();

long offset = consumerRecord.offset();

int partition = consumerRecord.partition();

String value = consumerRecord.value();

long timestamp = consumerRecord.timestamp();

System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);

}

}

}

5、Flink on Kafka

5.1、KafkaSource

起始消費位點

Kafka source 能夠通過位點初始化器(OffsetsInitializer)來指定從不同的偏移量開始消費 。內(nèi)置的位點初始化器包括:

KafkaSource.builder()

// 從消費組提交的位點開始消費,不指定位點重置策略

.setStartingOffsets(OffsetsInitializer.committedOffsets())

// 從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

// 從時間戳大于等于指定時間戳(毫秒)的數(shù)據(jù)開始消費

.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))

// 從最早位點開始消費

.setStartingOffsets(OffsetsInitializer.earliest())

// 從最末尾位點開始消費

.setStartingOffsets(OffsetsInitializer.latest());

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource source = KafkaSource.builder()

//kafka集群

.setBootstrapServers("master:9092,node1:9092,node2:9092")

//指定topic

.setTopics("students")

//指定消費者組

.setGroupId("student-group")

//指定讀取的數(shù)據(jù)的起始點

.setStartingOffsets(OffsetsInitializer.earliest())

//指定讀取數(shù)據(jù)的格式

.setValueOnlyDeserializer(new SimpleStringSchema())

.build();

DataStreamSource kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafkaSource");

kafkaSourceDS.print();

env.execute();

}

5.2、KafkaSink

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource scoreDS = env.readTextFile("flink/data/score.txt");

KafkaSink kafkaSink = KafkaSink.builder()

.setBootstrapServers("master:9092,node1:9092,node2:9092")

.setRecordSerializer(KafkaRecordSerializationSchema.builder()

.setTopic("score")

.setValueSerializationSchema(new SimpleStringSchema())

.build()

)

.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)

.build();

scoreDS.sinkTo(kafkaSink);

env.execute();

}

八、狀態(tài)與容錯

1、Flink的狀態(tài)

雖然數(shù)據(jù)流中的許多操作一次只查看一個單獨的事件(例如事件解析器),但有些操作記住跨多個事件的信息(例如窗口操作符)。這些操作稱為有狀態(tài)操作。

1.1、Keyed State

keyed state 接口提供不同類型狀態(tài)的訪問接口,這些狀態(tài)都作用于當前輸入數(shù)據(jù)的 key 下。換句話說,這些狀態(tài)僅可在 KeyedStream 上使用,在Java/Scala API上可以通過 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通過 stream.key_by(...) 得到 KeyedStream。

接下來,我們會介紹不同類型的狀態(tài),然后介紹如何使用他們。所有支持的狀態(tài)類型如下所示:

ValueState: 保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數(shù)據(jù)的 key,因此算子接收到的每個 key 都可能對應一個值)。 這個值可以通過 update(T) 進行更新,通過 T value() 進行檢索。ListState: 保存一個元素的列表。可以往這個列表中追加數(shù)據(jù),并在當前的列表上進行檢索??梢酝ㄟ^ add(T) 或者 addAll(List) 進行添加元素,通過 Iterable get() 獲得整個列表。還可以通過 update(List) 覆蓋當前的列表。ReducingState: 保存一個單值,表示添加到狀態(tài)的所有值的聚合。接口與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。AggregatingState: 保留一個單值,表示添加到狀態(tài)的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與 添加到狀態(tài)的元素的類型不同。 接口與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合。MapState: 維護了一個映射列表。 你可以添加鍵值對到狀態(tài)中,也可以獲得反映當前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 檢索特定 key。 使用 entries(),keys() 和 values() 分別檢索映射、鍵和值的可迭代視圖。你還可以通過 isEmpty() 來判斷是否包含任何鍵值對。

所有類型的狀態(tài)還有一個clear() 方法,清除當前 key 下的狀態(tài)數(shù)據(jù),也就是當前輸入元素的 key。

傳統(tǒng)求wordcount的代碼以及出現(xiàn)的問題

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream lineDS = env.socketTextStream("master", 12345);

KeyedStream keyByDS = lineDS.keyBy(word -> word);

DataStream> countDS = keyByDS.process(new ProcessFunction>() {

//保存之前統(tǒng)計的結(jié)果(狀態(tài))

//問題:同一個task中的數(shù)據(jù)共享同一個count變量

//int count = 0;

//問題:如果關(guān)閉這個程序或者系統(tǒng)壞了數(shù)據(jù)就會丟失,使用hashmap保存計算的中間結(jié)果,flink的checkpoint不會將hashmap中的數(shù)據(jù)持久化到hdfs上

final HashMap hashMap = new HashMap<>();

/**

* processElement方法每一條數(shù)據(jù)執(zhí)行一次

* @param word 一行數(shù)據(jù)

* @param ctx 上下文對象,可以獲取到flink的key和時間屬性

* @param out 用于將處理結(jié)果發(fā)送到下游

*/

@Override

public void processElement(String word, ProcessFunction>.Context ctx, Collector> out) throws Exception {

Integer count = hashMap.getOrDefault(word, 0);

count++;

out.collect(Tuple2.of(word, count));

hashMap.put(word, count);

}

});

countDS.print();

env.execute();

}

修改后的狀態(tài)代碼

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream lineDS = env.socketTextStream("master", 12345);

KeyedStream keyByDS = lineDS.keyBy(word -> word);

SingleOutputStreamOperator> countDS = keyByDS.process(new ProcessFunction>() {

ValueState state;

@Override

public void open(Configuration parameters) throws Exception {

RuntimeContext context = getRuntimeContext();

ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("count", Types.INT, 0);

state = context.getState(valueStateDescriptor);

}

@Override

public void processElement(String word, ProcessFunction>.Context ctx, Collector> out) throws Exception {

Integer count = state.value();

count++;

out.collect(Tuple2.of(word, count));

state.update(count);

}

});

countDS.print();

env.execute();

}

基于 DataStream API 實現(xiàn)欺詐檢測(官網(wǎng)案例關(guān)于狀態(tài)的)

我們先實現(xiàn)第一版報警程序,對于一個賬戶,如果出現(xiàn)小于 $1 美元的交易后緊跟著一個大于 $500 的交易,就輸出一個報警信息。

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource streamSource = env.socketTextStream("master", 12345);

SingleOutputStreamOperator userDS = streamSource.map(line -> {

String[] split = line.split(",");

String id = split[0];

double price = Double.parseDouble(split[1]);

return new User(id, price);

});

KeyedStream keyByDS = userDS.keyBy(User::getId);

SingleOutputStreamOperator process = keyByDS.process(new ProcessFunction() {

ValueState state;

@Override

public void open(Configuration parameters) throws Exception {

RuntimeContext context = getRuntimeContext();

ValueStateDescriptor flag = new ValueStateDescriptor<>("flag", Types.BOOLEAN, false);

state = context.getState(flag);

}

@Override

public void processElement(User value, ProcessFunction.Context ctx, Collector out) throws Exception {

if (state.value()) {

if (value.getPrice() > 500) {

System.out.println("報警");

}

state.update(false);

}

if (value.getPrice() < 1) {

state.update(true);

}

}

});

env.execute();

}

@AllArgsConstructor

@Data

class User{

private String id;

private Double price;

}

1.2、算子狀態(tài)

算子狀態(tài)(或者非 keyed 狀態(tài))是綁定到一個并行算子實例的狀態(tài)。Kafka Connector 是 Flink 中使用算子狀態(tài)一個很具有啟發(fā)性的例子。Kafka consumer 每個并行實例維護了 topic partitions 和偏移量的 map 作為它的算子狀態(tài)。

當并行度改變的時候,算子狀態(tài)支持將狀態(tài)重新分發(fā)給各并行算子實例。處理重分發(fā)過程有多種不同的方案。

在典型的有狀態(tài) Flink 應用中你無需使用算子狀態(tài)。它大都作為一種特殊類型的狀態(tài)使用。用于實現(xiàn) source/sink,以及無法對 state 進行分區(qū)而沒有主鍵的這類場景中。

1.3、廣播狀態(tài)

廣播狀態(tài)是一種特殊的算子狀態(tài)。引入它的目的在于支持一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態(tài)用于保持所有子任務狀態(tài)相同。 該狀態(tài)接下來可在第二個處理記錄的數(shù)據(jù)流中訪問??梢栽O(shè)想包含了一系列用于處理其他流中元素規(guī)則的低吞吐量數(shù)據(jù)流,這個例子自然而然地運用了廣播狀態(tài)。 考慮到上述這類使用情形,廣播狀態(tài)和其他算子狀態(tài)的不同之處在于:

它具有 map 格式,它僅在一些特殊的算子中可用。這些算子的輸入為一個廣播數(shù)據(jù)流和非廣播數(shù)據(jù)流,這類算子可以擁有不同命名的多個廣播狀態(tài) 。

2、Checkpoint

可以定時將flink計算的狀態(tài)持久化到hdfs中,如果任務執(zhí)行失敗,可以基于hdfs中保存到的狀態(tài)恢復任務,保證之前的結(jié)果不丟失

Flink 的 checkpoint 機制會和持久化存儲進行交互,讀寫流與狀態(tài)。一般需要:

一個能夠回放一段時間內(nèi)數(shù)據(jù)的持久化數(shù)據(jù)源,例如持久化消息隊列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系統(tǒng)(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。存放狀態(tài)的持久化存儲,通常為分布式文件系統(tǒng)(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

2.1、開啟Checkpoint的方式

在代碼中單獨開啟

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 開始一次 checkpoint

env.enableCheckpointing(20000);

// 高級選項:

// 設(shè)置模式為精確一次 (這是默認值)

// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.setStateBackend(new HashMapStateBackend());

// 使用 externalized checkpoints,這樣 checkpoint 在作業(yè)取消后仍就會被保留

env.getCheckpointConfig().setExternalizedCheckpointCleanup(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 開啟實驗性的 unaligned checkpoints

env.getCheckpointConfig().setCheckpointStorage("hdfs://master:9000/flink/checkpoint");

DataStream streamSource = env.socketTextStream("master", 12345);

DataStream> wordDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = wordDS.keyBy(kv -> kv.f0);

DataStream> countDS = keyByDS.sum(1);

countDS.print();

env.execute();

// flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint flink-1.0.jar

}

在配置文件中統(tǒng)一開啟 vim flink-conf.yaml execution.checkpointing.interval: 5000

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

execution.checkpointing.max-concurrent-checkpoints: 1

execution.checkpointing.min-pause: 0

execution.checkpointing.mode: EXACTLY_ONCE

execution.checkpointing.timeout: 10min

execution.checkpointing.tolerable-failed-checkpoints: 0

execution.checkpointing.unaligned: false

state.backend: hashmap

state.checkpoints.dir: hdfs://master:9000/flink/checkpoint

2.2、使用checkpoint

第一次提交任務之間提交

flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint flink-1.0.jar

重啟任務時基于hdfs中的快照重啟

# -s 指定恢復任務的位置

flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint -s hdfs://master:9000/flink/checkpoint/303212ccf121165b0d12713d61608dc3/chk-14 flink-1.0.jar

3、Exactly Once

數(shù)據(jù)只處理一次

3.1、Kafka的Exactly Once

Kafka保證數(shù)據(jù)處理的唯一一次

1、冪等性:保證數(shù)據(jù)不重復

2、事務:保證數(shù)據(jù)不重復

3、Acks+副本:保證數(shù)據(jù)不丟失

acks機制:

acks=1(默認):當主分區(qū)寫入成功,就會返回成功,如果這時主分區(qū)掛了,剛寫入的數(shù)據(jù)就會丟失

acks=0:生產(chǎn)者只負責生產(chǎn)數(shù)據(jù),不負責驗證數(shù)據(jù)是否成功寫入,可能會造成數(shù)據(jù)丟失,但是寫入的性能好

acks=-1或者all:生產(chǎn)者生產(chǎn)數(shù)據(jù)后必須等待數(shù)據(jù)都同步到副本之后才會返回成功,但是性能差

3.2、Flink的Exactly Once

Flink 分布式快照保存數(shù)據(jù)計算的狀態(tài)(checkpoint)和消費的偏移量,保證程序重啟之后不丟失狀態(tài)和消費偏移量

3.3、Exactly Once的代碼

聚合運算

如果任務在執(zhí)行過程中失敗了,可以恢復到上一次成功的checkpoint的位置,保證計算的狀態(tài)和消費偏移量不丟失,保證數(shù)據(jù)處理的唯一一次

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource kafkaSource = KafkaSource.builder()

.setBootstrapServers("master:9092,node1:9092,node2:9092")

.setTopics("words")

.setGroupId("group-id")

.setStartingOffsets(OffsetsInitializer.earliest())

.setValueOnlyDeserializer(new SimpleStringSchema())

.build();

DataStream streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

DataStream> wordDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));

KeyedStream, String> keyByDS = wordDS.keyBy(kv -> kv.f0);

DataStream> countDS = keyByDS.sum(1);

countDS.print();

env.execute();

// flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717499706759_0001 -c com.shujia.state.Demo5ExactlyOnce flink-1.0.jar

}

非聚合運算

只利用checkpoint和偏移量不能保證數(shù)據(jù)處理是唯一一次,需要將兩次的checkpoint放到一個事務中,上一次checkpoint完成時開啟事務,下一次事務完成時提交事務,這樣做會增加數(shù)據(jù)處理的延遲,但是保證了數(shù)據(jù)處理的唯一一次

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource kafkaSource = KafkaSource.builder()

.setBootstrapServers("master:9092,node1:9092,node2:9092")

.setTopics("words")

.setGroupId("group-id")

.setStartingOffsets(OffsetsInitializer.earliest())

.setValueOnlyDeserializer(new SimpleStringSchema())

.build();

DataStream streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

DataStream filterDS = streamSource.filter(word -> !"".equals(word));

//flink處理事務的時間要小于Kafka中的事務時間15min

Properties properties = new Properties();

properties.setProperty("transaction.timeout.ms", 10 * 60 * 1000 + "");

KafkaSink kafkaSink = KafkaSink.builder()

.setBootstrapServers("master:9092,node1:9092,node2:9092")

//同步flink處理事務的時間

.setKafkaProducerConfig(properties)

.setRecordSerializer(KafkaRecordSerializationSchema.builder()

.setTopic("filter")

.setValueSerializationSchema(new SimpleStringSchema())

.build()

)

//指定數(shù)據(jù)處理的語義

.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

.build();

filterDS.sinkTo(kafkaSink);

env.execute();

}

九、FlinkSql

1、DataStream 上的關(guān)系查詢

下表比較了傳統(tǒng)的關(guān)系代數(shù)和流處理與輸入數(shù)據(jù)、執(zhí)行和輸出結(jié)果的關(guān)系。

關(guān)系代數(shù) / SQL流處理關(guān)系(或表)是有界(多)元組集合。流是一個無限元組序列。對批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)。流式查詢在啟動時不能訪問所有數(shù)據(jù),必須“等待”數(shù)據(jù)流入。批處理查詢在產(chǎn)生固定大小的結(jié)果后終止。流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果,并且始終不會結(jié)束。

2、動態(tài)表 & 連續(xù)查詢

動態(tài)表 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表不同,動態(tài)表是隨時間變化的??梢韵癫樵冹o態(tài)批處理表一樣查詢它們。查詢動態(tài)表將生成一個 連續(xù)查詢 。一個連續(xù)查詢永遠不會終止,結(jié)果會生成一個動態(tài)表。查詢不斷更新其(動態(tài))結(jié)果表,以反映其(動態(tài))輸入表上的更改。本質(zhì)上,動態(tài)表上的連續(xù)查詢非常類似于定義物化視圖的查詢。

需要注意的是,連續(xù)查詢的結(jié)果在語義上總是等價于以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果。

下圖顯示了流、動態(tài)表和連續(xù)查詢之間的關(guān)系:

將流轉(zhuǎn)換為動態(tài)表。在動態(tài)表上計算一個連續(xù)查詢,生成一個新的動態(tài)表。生成的動態(tài)表被轉(zhuǎn)換回流。

注意: 動態(tài)表首先是一個邏輯概念。在查詢執(zhí)行期間不一定(完全)物化動態(tài)表。

3、表到流的轉(zhuǎn)換

Append-only 流: 僅通過 INSERT 操作修改的動態(tài)表可以通過輸出插入的行轉(zhuǎn)換為流。Retract 流: retract 流包含兩種類型的 message: add messages 和 retract messages 。通過將INSERT 操作編碼為 add message、將 DELETE 操作編碼為 retract message、將 UPDATE 操作編碼為更新(先前)行的 retract message 和更新(新)行的 add message,將動態(tài)表轉(zhuǎn)換為 retract 流。下圖顯示了將動態(tài)表轉(zhuǎn)換為 retract 流的過程。

Upsert 流: upsert 流包含兩種類型的 message: upsert messages 和delete messages。轉(zhuǎn)換為 upsert 流的動態(tài)表需要(可能是組合的)唯一鍵。通過將 INSERT 和 UPDATE 操作編碼為 upsert message,將 DELETE 操作編碼為 delete message ,將具有唯一鍵的動態(tài)表轉(zhuǎn)換為流。消費流的算子需要知道唯一鍵的屬性,以便正確地應用 message。與 retract 流的主要區(qū)別在于 UPDATE 操作是用單個 message 編碼的,因此效率更高。下圖顯示了將動態(tài)表轉(zhuǎn)換為 upsert 流的過程。

4、SQL命令行

# 啟動flink集群

yarn-seesion.sh -d

# 進入sql命令行

sql-client.sh

# 1、創(chuàng)建表,數(shù)據(jù)源時kafka

CREATE TABLE students (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

# 2、編寫sql進行連續(xù)查詢

select

clazz,count(1)as num

from students

group by clazz;

# 3、生產(chǎn)數(shù)據(jù)

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students

5、SQL命令行打印結(jié)果模式

5.1表格模式(table mode)默認

在內(nèi)存中實體化結(jié)果,并將結(jié)果用規(guī)則的分頁表格可視化展示出來。執(zhí)行如下命令啟用:

SET 'sql-client.execution.result-mode' = 'table';

5.2、變更日志模式(changelog mode)

不會實體化和可視化結(jié)果,而是由插入(+)和撤銷(-)組成的持續(xù)查詢產(chǎn)生結(jié)果流。

SET 'sql-client.execution.result-mode' = 'changelog';

5.3、Tableau模式(tableau mode)

更接近傳統(tǒng)的數(shù)據(jù)庫,會將執(zhí)行的結(jié)果以制表的形式直接打在屏幕之上。具體顯示的內(nèi)容會取決于作業(yè) 執(zhí)行模式的不同(execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

6、處理模式

6.1、流處理

1、可以用于處理有界流和無界流 2、流處理模式輸出連續(xù)結(jié)果 3、流處理模式底層時持續(xù)流模型

SET 'execution.runtime-mode' = 'streaming';

6.2、批處理

1、批處理模式只能用于處理有界流 2、輸出最終結(jié)果 3、底層是MapReduce模型

SET 'execution.runtime-mode' = 'batch';

7、連接器

7.1、Kafka

KafkaSource

-- 創(chuàng)建表 --- 無界流

-- TIMESTAMP(3): 時flink總的時間字段

CREATE TABLE students_kafka (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING,

`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 獲取kfka時間戳

`partition` BIGINT METADATA VIRTUAL, -- 獲取kafka數(shù)據(jù)所在的分區(qū)

`offset` BIGINT METADATA VIRTUAL,-- 偏移量

-- 指定時間字段和水位線生成策略

WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'students',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

select id,name,event_time,`partition`,`offset` from students_kafka;

-- 每隔5秒統(tǒng)計每個班級的人數(shù)

select

clazz,

TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,

TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,

count(id) as num

from

students_kafka

group by

clazz,

-- 滾動的事件時間窗口

TUMBLE(event_time,INTERVAL '5' SECOND);

kafka sink

CREATE TABLE students_kafka_num (

clazz STRING,

win_start TIMESTAMP(3),

win_end TIMESTAMP(3),

num BIGINT

) WITH (

'connector' = 'kafka',

'topic' = 'clazz_num',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

insert into students_kafka_num

select

clazz,

TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,

TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,

count(id) as num

from

students_kafka

group by

clazz,

-- 滾動的事件時間窗口

TUMBLE(event_time,INTERVAL '5' SECOND);

7.2、JDBC

整合

# 將依賴包上傳到flink的lib目錄下

flink-connector-jdbc-1.15.2.jar

mysql-connector-java-5.1.47.jar

# 依賴更新后需要重啟集群才會生效

yarn application -list

yarn application -kill [appid]

yarn-session.sh -d

sql-client.sh

MySQL Source

CREATE TABLE students_mysql (

id INT,

name STRING,

age INT,

gender STRING,

clazz STRING

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8',

'table-name' = 'students',

'username'='root',

'password'='123456'

);

-- 求每個班的人數(shù)

select

clazz,

count(1) as num

from

students_mysql

group by

clazz;

MySQL Sink

注意:[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

這個錯誤是這是一個Retract 流有更新和刪除的作用為了數(shù)據(jù)的最終結(jié)果,要保證主鍵的唯一性

CREATE TABLE clazz_num (

clazz STRING,

num BIGINT,

PRIMARY KEY (clazz) NOT ENFORCED

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8',

'table-name' = 'clazz_num',

'username'='root',

'password'='123456'

);

insert into clazz_num

select

clazz,

count(1) as num

from

students_mysql

group by

clazz;

7.3、HDFS

HDFS Source

CREATE TABLE students_hdfs (

id INT,

name STRING,

age INT,

gender STRING,

clazz STRING

) WITH (

'connector' = 'filesystem',

'path' = 'hdfs://master:9000/bigdata29/data/students.txt',

'format' ='csv'

);

-- 求每個班的男生和女生各有多少人

select

clazz,

gender,

count(1) as num

from

students_hdfs

group by

clazz,

gender;

HDFS Sink

--僅追加的寫入到HDFS中

CREATE TABLE clazz_gender_hdfs (

id INT,

name STRING,

age INT,

gender STRING,

clazz STRING

) WITH (

'connector' = 'filesystem',

'path' = 'hdfs://master:9000/bigdata29/flink/data',

'format' ='csv'

);

insert into clazz_gender_hdfs

select id,name,age,gender,clazz from students_mysql;

---- 2、將更新更改的結(jié)果寫入hdfs

CREATE TABLE clazz_gender_num_hdfs (

clazz STRING,

gender STRING,

num BIGINT

) WITH (

'connector' = 'filesystem', -- 必選:指定連接器類型

'path' = 'hdfs://master:9000/bigdata29/flink/sqlSink/clazz_gender_num_hdfs',--必選:指定路徑

'format' ='canal-json' -- 必選:文件系統(tǒng)連接器指定 format

);

insert into clazz_gender_num_hdfs

-- 求每個班的男生和女生各有多少人

select

clazz,

gender,

count(1) as num

from

students_hdfs

group by

clazz,

gender;

7.4、HBase

整合

# 將依賴包上傳到flink的lib目錄下

flink-sql-connector-hbase-2.2-1.15.2.jar

# 依賴更新后需要重啟集群才會生效

yarn application -list

yarn application -kill [appid]

yarn-session.sh -d

sql-client.sh

HBase sink

--創(chuàng)建hbase表

create 'students_flink','info'

-- 創(chuàng)建hbase sink表

CREATE TABLE students_hbase (

id INT,

info ROW, -- 指定列簇中的列

PRIMARY KEY (id) NOT ENFORCED -- 設(shè)置hbaserowkey

) WITH (

'connector' = 'hbase-2.2',

'table-name' = 'students_flink',

'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'

);

insert into students_hbase

select

id,

ROW(name,age,gender,clazz) as info

from students_mysql;

--查看結(jié)果

select * from students_hbase;

scan 'students_flink'

7.5、datagen

用于生成測試數(shù)據(jù),可以用于高性能測試

CREATE TABLE students_datagen (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'datagen',

'rows-per-second'='5', -- 指定每秒生成的數(shù)據(jù)量

'fields.id.length'='5',

'fields.name.length'='3',

'fields.age.min'='1',

'fields.age.max'='100',

'fields.sex.length'='1',

'fields.clazz.length'='4'

);

7.6、print

在task manager中打印結(jié)果

CREATE TABLE print_table (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'print'

);

CREATE TABLE print_table

WITH ('connector' = 'print')

-- 應用目標表的字段創(chuàng)建新的

LIKE students_datagen (EXCLUDING ALL);

insert into print_table

select * from students_datagen;

7.7、BlackHole

用于高性能測試

CREATE TABLE blackhole_table (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'blackhole'

);

insert into blackhole_table

select * from students_datagen;

8、數(shù)據(jù)格式

8.1、csv

數(shù)據(jù)中字段的順序需要和建表語句字段的順序保持一致 (順序映射) 默認按照逗號分割

CREATE TABLE students_csv (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = ',' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

8.2、json

link表中的字段和類型需要和json中保持一致(同名映射)

CREATE TABLE cars (

car STRING,

city_code STRING,

county_code STRING,

card BIGINT,

camera_id STRING,

orientation STRING,

road_id BIGINT,

`time` BIGINT,

speed DOUBLE

) WITH (

'connector' = 'kafka',

'topic' = 'cars', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'json', -- 指定數(shù)據(jù)的格式

'json.ignore-parse-errors' ='true'

);

8.3、canal-json

用于保存更新更改的結(jié)果流

CREATE TABLE clazz_num (

clazz STRING,

num BIGINT

) WITH (

'connector' = 'kafka',

'topic' = 'clazz_num',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'canal-json'

);

insert into clazz_num

select

clazz,

count(1) as num

from

students_kafka

group by

clazz;

9、時間屬性

9.1、處理時間

處理時間是基于機器的本地時間來處理數(shù)據(jù),它是最簡單的一種時間概念,但是它不能提供確定性。它既不需要從數(shù)據(jù)里獲取時間,也不需要生成 watermark。

-- PROCTIME() 生成處理時間的函數(shù)

CREATE TABLE words (

word STRING,

proctime AS PROCTIME() -- 聲明一個額外的列作為處理時間屬性

) WITH (

'connector' = 'kafka',

'topic' = 'words',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

-- 實時統(tǒng)計每個單詞最近5秒單詞的數(shù)量

select

word,

TUMBLE_START(proctime,INTERVAL '5' SECOND) win_start,

TUMBLE_END(proctime,INTERVAL '5' SECOND) win _end,

count(word) as num

from

words

group by

word,

TUMBLE(proctime,INTERVAL '5' SECOND);

9.2、事件時間

事件時間允許程序按照數(shù)據(jù)中包含的時間來處理,這樣可以在有亂序或者晚到的數(shù)據(jù)的情況下產(chǎn)生一致的處理結(jié)果。它可以保證從外部存儲讀取數(shù)據(jù)后產(chǎn)生可以復現(xiàn)(replayable)的結(jié)果。

除此之外,事件時間可以讓程序在流式和批式作業(yè)中使用同樣的語法。在流式程序中的事件時間屬性,在批式程序中就是一個正常的時間字段。

為了能夠處理亂序的事件,并且區(qū)分正常到達和晚到的事件,F(xiàn)link 需要從事件中獲取事件時間并且產(chǎn)生 watermark(watermarks)。

事件時間屬性也有類似于處理時間的三種定義方式:在DDL中定義、在 DataStream 到 Table 轉(zhuǎn)換時定義、用 TableSource 定義。

CREATE TABLE word_event_time (

word STRING,

word_event_time TIMESTAMP(3),

-- 聲明 user_action_time 是事件時間屬性,并且用 延遲 5 秒的策略來生成 watermark

WATERMARK FOR word_event_time AS word_event_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'words_event_time',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

select

word,

TUMBLE_START(word_event_time ,INTERVAL '5' SECOND) win_start,

TUMBLE_END(word_event_time ,INTERVAL '5' SECOND) win_end,

count(word) as num

from

word_event_time

group by

word,

TUMBLE(word_event_time ,INTERVAL '5' SECOND);

10、SQL語法

10.1、Hints

動態(tài)表選擇:可以在查詢表的時候動態(tài)修改表的參數(shù)配置

CREATE TABLE students (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

select * from students /*+ OPTIONS('csv.ignore-parse-errors' ='true') */;

-- latest-offset: 讀取任務啟動之后生產(chǎn)的數(shù)據(jù)

select * from students /*+ OPTIONS('csv.ignore-parse-errors' ='true','scan.startup.mode' = 'latest-offset') */;

CREATE TABLE students_hdfs_stream (

id int,

name STRING,

age INT,

gender STRING,

clazz STRING

) WITH (

'connector' = 'filesystem', -- 必選:指定連接器類型

'path' = 'hdfs://master:9000/data/students', -- 必選:指定路徑

'format' = 'csv' -- 必選:文件系統(tǒng)連接器指定 format

);

select * from students_hdfs_stream /*+OPTIONS('source.monitor-interval' = '5000') */

10.2、WITH 子句

WITH 子句提供了一種用于更大查詢而編寫輔助語句的方法。這些編寫的語句通常被稱為公用表表達式,表達式可以理解為僅針對某個查詢而存在的臨時視圖

with tmp as (

select

id,name,age,clazz

from

students_mysql

where age > 22

)

select * from tmp

union all

select * from tmp;

10.3、SELECT WHERE

select * from students_hdfs_stream

where

age > 21

and gender in ('男','女');

10.4、SELECT DISTINCT

對于流處理的問題 1、flink會將之前的數(shù)據(jù)保存在狀態(tài)中,用于判斷是否重復 2、如果表的數(shù)據(jù)量很大,隨著時間的推移狀態(tài)會越來越大,狀態(tài)的數(shù)據(jù)時先保存在TM的內(nèi)存中的,時間長了可能會出問題

select distinct * from students /*+ OPTIONS('csv.ignore-parse-errors' ='true','scan.startup.mode' = 'latest-offset') */;

10.5、窗口函數(shù)

TUMBLE(滾動窗口)

TUMBLE函數(shù)將每個元素分配給指定窗口大小的窗口。翻滾窗口有固定的大小,不重疊。例如,假設(shè)您指定了一個大小為5分鐘的滾動窗口。在這種情況下,F(xiàn)link將評估當前窗口,并且每五分鐘啟動一個新窗口,如下圖所示。

TUMBLE函數(shù)根據(jù)時間屬性字段為關(guān)系的每一行分配一個窗口。在流模式下,時間屬性字段必須是事件或處理時間屬性。在批處理模式下,窗口表函數(shù)的時間屬性字段必須是TIMESTAMP或TIMESTAMP_LTZ類型的屬性。TUMBLE的返回值是一個新的關(guān)系,它包括原關(guān)系的所有列,以及額外的名為“window_start”,“window_end”,“window_time”的3列,以表示分配的窗口。原來的時間屬性“timecol”將是一個常規(guī)的時間戳列

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

data:是一個表參數(shù),可以與時間屬性列有任意關(guān)系。timecol:是一個列描述符,指示應將數(shù)據(jù)的哪些時間屬性列映射到滾動窗口。size:是指定滾動窗口寬度的持續(xù)時間。offset:是一個可選參數(shù),用于指定窗口起始位置的偏移量。

--創(chuàng)建bid的表

CREATE TABLE bid (

item STRING,

price DECIMAL(10, 2),

bidtime TIMESTAMP(3),

WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'bid',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

--生產(chǎn)數(shù)據(jù)

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid

C,4.00,2020-04-15 08:05:01

A,2.00,2020-04-15 08:07:01

D,5.00,2020-04-15 08:09:01

B,3.00,2020-04-15 08:11:01

E,1.00,2020-04-15 08:13:01

F,6.00,2020-04-15 08:17:01

-- TUMBLE:滾動窗口函數(shù),在原表的基礎(chǔ)上增加窗口開始時間,窗口結(jié)束時間,窗口時間

SELECT item,price,bidtime,window_start,window_end,window_time FROM TABLE(

TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)

);

--或者

SELECT * FROM TABLE(

TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

-- 或者

SQL> SELECT * FROM TABLE(

TUMBLE(

DATA => TABLE Bid,

TIMECOL => DESCRIPTOR(bidtime),

SIZE => INTERVAL '10' MINUTES));

--每隔十分鐘統(tǒng)計總價格

SELECT window_start, window_end, SUM(price)

FROM TABLE(

TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

GROUP BY window_start, window_end;

HOP(滑動窗口)

HOP函數(shù)將元素分配給固定長度的窗口。與TUMBLE窗口函數(shù)一樣,窗口的大小由窗口大小參數(shù)配置。另一個窗口滑動參數(shù)控制啟動跳窗的頻率。因此,如果幻燈片小于窗口大小,則跳躍窗口可以重疊。在這種情況下,元素被分配給多個窗口。跳窗也被稱為“滑動窗”。

例如,您可以設(shè)置大小為10分鐘的窗口,每隔5分鐘滑動一次。這樣,每隔5分鐘就會出現(xiàn)一個窗口,其中包含最近10分鐘內(nèi)到達的事件,如下圖所示。

HOP函數(shù)分配的窗口覆蓋大小間隔內(nèi)的行,并根據(jù)時間屬性字段移動每張幻燈片。在流模式下,時間屬性字段必須是事件或處理時間屬性。在批處理模式下,窗口表函數(shù)的時間屬性字段必須是TIMESTAMP或TIMESTAMP_LTZ類型的屬性。HOP的返回值是一個新的關(guān)系,它包含了原關(guān)系的所有列,并增加了名為“window_start”,“window_end”,“window_time”的3列,以表示分配的窗口。原始的時間屬性“timecol”將是窗口TVF之后的一個常規(guī)時間戳列。

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

data:是一個表參數(shù),可以與時間屬性列有任意關(guān)系。timecol:是列描述符,指示應將數(shù)據(jù)的哪些時間屬性列映射到跳躍窗口。slide:是指定連續(xù)跳躍窗口開始之間的持續(xù)時間size:是指定跳躍窗口寬度的持續(xù)時間。offset:是一個可選參數(shù),用于指定窗口起始位置的偏移量。

CREATE TABLE bid_proctime (

item STRING,

price DECIMAL(10, 2),

proctime AS PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'bid_proctime',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_proctime

C,4.00

A,2.00

D,5.00

B,3.00

E,1.00

F,6.00

-- HOP: 滑動窗口函數(shù)

SELECT item,price,proctime,window_start,window_end,window_time FROM TABLE(

HOP(TABLE bid_proctime, DESCRIPTOR(proctime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)

);

-- 窗口聚合

SELECT

window_start,

window_end,

avg(price) as avg_price

FROM

TABLE(

HOP(TABLE bid_proctime, DESCRIPTOR(proctime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)

)

group by

window_start,

window_end;

CUMULATE (累積窗口)

累積窗口(Cumulative Window,也稱為累積窗口或滾動累積窗口)是一種用于處理流數(shù)據(jù)的窗口類型,它允許窗口的長度隨時間逐步增加,直到達到指定的最大長度。

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

data:是一個表參數(shù),可以與時間屬性列有任意關(guān)系。timecol:是一個列描述符,指示應將數(shù)據(jù)的哪些時間屬性列映射到累積窗口。step:是指定連續(xù)累積窗口末尾之間增加的窗口大小的持續(xù)時間。size:是指定累積窗口最大寬度的持續(xù)時間。size必須是的整數(shù)倍step。offset:是一個可選參數(shù),用于指定窗口起始位置的偏移量。

CREATE TABLE bid (

item STRING,

price DECIMAL(10, 2),

bidtime TIMESTAMP(3),

WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'bid',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid

C,4.00,2020-04-15 08:05:01

A,2.00,2020-04-15 08:07:01

D,5.00,2020-04-15 08:09:01

B,3.00,2020-04-15 08:11:01

E,1.00,2020-04-15 08:13:01

F,6.00,2020-04-15 08:17:01

SELECT * FROM TABLE(

CUMULATE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)

);

--每2分鐘累積一次數(shù)據(jù),直到累積時間達到10分鐘

SELECT window_start, window_end, SUM(price)

FROM TABLE(

CUMULATE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))

GROUP BY window_start, window_end;

會話窗口

CREATE TABLE bid_proctime (

item STRING,

price DECIMAL(10, 2),

proctime AS PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'bid_proctime',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_proctime

C,4.00

C,2.00

C,5.00

C,3.00

C,1.00

C,6.00

-- 實時統(tǒng)計每個商品的總的金額,隔5秒沒有數(shù)據(jù)開始統(tǒng)計

select

item,

SESSION_START(proctime,INTERVAL '5' SECOND) as session_start,

SESSION_END(proctime,INTERVAL '5' SECOND) as session_end,

sum(price) as sum_price

from

bid_proctime

group by

item,

SESSION(proctime,INTERVAL '5' SECOND);

10.6、GROUP BY

-- 出啊關(guān)鍵datagen source表

CREATE TABLE words_datagen (

word STRING

) WITH (

'connector' = 'datagen',

'rows-per-second'='50000', -- 指定每秒生成的數(shù)據(jù)量

'fields.word.length'='5'

);

CREATE TABLE blackhole_table (

word STRING,

num BIGINT

) WITH (

'connector' = 'blackhole'

);

-- 分組聚合需要將之前的計算結(jié)果保存在狀態(tài)中,

-- 如果狀態(tài)無限增長,會導致checkpoint時間拉長,如果checkpoint超時失敗了,也會導致任務失敗

insert into blackhole_table

select

word,

count(1)as num

from

words_datagen /*+ OPTIONS('fields.word.length'='7') */

group by

word;

10.7、OVER

sum max min avg count

CREATE TABLE `order` (

order_id STRING,

amount DECIMAL(10, 2),

product STRING,

order_time TIMESTAMP(3),

WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'order',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic order

1,4.00,001,2020-04-15 08:05:01

2,2.00,001,2020-04-15 08:07:01

3,5.00,001,2020-04-15 08:09:01

4,3.00,001,2020-04-15 08:11:01

5,1.00,001,2020-04-15 08:13:01

6,6.00,001,2020-04-15 08:17:01

6,6.00,001,2020-04-15 08:20:01

6,6.00,001,2020-04-15 08:21:01

6,10.00,001,2020-04-15 08:21:02

6,11.00,001,2020-04-15 08:21:03

6,12.00,001,2020-04-15 08:21:04

-- 1、實時統(tǒng)計每個商品的累計總金額,將總金額放在每一條數(shù)據(jù)的后面

-- 流處理的問題

-- a、sum over必須按照時間升序排序,因為數(shù)據(jù)時一條一套過來的,只能做累加求和,不能做全局求和

-- b、只能按照時間升序排序,如果按照其他的字段排序,每來一條數(shù)據(jù)都需要重新排序,計算代價太大,影響性能

select

order_id,

amount,

product,

order_time,

sum(amount) over(

partition by product

order by order_time

)

from

`order`

;

/**

RANGE 間隔

RANGE ORDER BY 列的值上定義了一個間隔,在 Flink 中,該間隔始終是時間屬性。以下 RANGE 間隔定義所有時間屬性比當前行小最多 30 分鐘的行都包含在聚合中。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

*/

-- 2、實時統(tǒng)計每個商品的累計總金額,將總金額放在每一條數(shù)據(jù)的后面,只統(tǒng)計最近10分鐘的數(shù)據(jù)

select

order_id,

amount,

product,

order_time,

sum(amount) over(

partition by product

order by order_time

-- 統(tǒng)計10分鐘前到當前行的數(shù)據(jù)

RANGE BETWEEN INTERVAL '10' MINUTES PRECEDING AND CURRENT ROW

)

from

`order`

;

/**

行間隔 #

間隔ROWS是基于計數(shù)的間隔。它精確定義了聚合中包含多少行。以下ROWS間隔定義了當前行和當前行之前的 10 行(因此總共 11 行)包含在聚合中。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

*/

-- 3、實時統(tǒng)計每個商品的累計總金額,將總金額放在每一條數(shù)據(jù)的后面,計算最近5條數(shù)據(jù)

select

order_id,

amount,

product,

order_time,

sum(amount) over(

partition by product

order by order_time

-- 從前4條數(shù)據(jù)到當前行

ROWS BETWEEN 4 PRECEDING AND CURRENT ROW

)

from

`order`

;

-- 4、實時統(tǒng)計每個商品的最大金額,將總金額放在每一條數(shù)據(jù)的后面,計算最近5條數(shù)據(jù)

select

order_id,

amount,

product,

order_time,

max(amount) over(

partition by product

order by order_time

-- 從前4條數(shù)據(jù)到當前行

ROWS BETWEEN 4 PRECEDING AND CURRENT ROW

)

from

`order`

;

10.8、TOP-N

row_number

-- 如果只是增加排名,只能按照時間字段升序排序

select

order_id,

amount,

product,

order_time,

row_number() over(partition by product order by order_time) as r

from

`order`

;

-- 實時統(tǒng)計每個商品金額最高的前兩個商品 -- TOPN

-- 去完topn之后需要計算的排名的數(shù)據(jù)較少了,計算代價降低了

select *

from (

select

order_id,

amount,

product,

order_time,

row_number() over(partition by product order by amount desc) as r

from

`order`

)

where r <= 2;

10.9、ORDER BY

-- 子流處理模式中,order by 需要按照時間字段升序排序

select * from

`order`

order by

order_time,amount

-- 加上limit ,計算代價就不高了,就可以按照普通字段進行排序了

select * from

`order`

order by

amount

limit 2;

10.10、模式檢測(CEP)

案例1

我們先實現(xiàn)第一版報警程序,對于一個賬戶,如果出現(xiàn)小于 $1 美元的交易后緊跟著一個大于 $500 的交易,就輸出一個報警信息。

CREATE TABLE tran (

id STRING,

amount DECIMAL(10, 2),

proctime as PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'tran',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'latest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic tran

1,4.00

1,2.00

1,5.00

1,0.90

1,600.00

1,4.00

1,2.00

1,0.10

1,200.00

1,700.00

-- MATCH_RECOGNIZE(模式檢測)

-- 在數(shù)據(jù)流上對數(shù)據(jù)進行匹配,當數(shù)滿足我們定義的規(guī)則后,返回匹配的結(jié)果

-- 我們先實現(xiàn)第一版報警程序,對于一個賬戶,如果出現(xiàn)小于 $1 美元的交易后緊跟著一個大于 $500 的交易,就輸出一個報警信息。

SELECT *

FROM tran

MATCH_RECOGNIZE (

PARTITION BY id -- 分組字段

ORDER BY proctime -- 排序字段,只能按照時間字段升序排序

MEASURES -- 相當于select

A.amount as min_amount,

A.proctime as min_proctime,

B.amount as max_amount,

B.proctime as max_proctime

PATTERN (A B) -- 定義規(guī)則

DEFINE -- 定義條件

A as amount < 1,

B as amount > 500

) AS T

-- 我們先實現(xiàn)第一版報警程序,對于一個賬戶,如果出現(xiàn)小于 $1 美元的交易后緊跟著一個大于 $500 的交易,就輸出一個報警信,兩次事件需要在10秒內(nèi)出現(xiàn)

SELECT *

FROM tran

MATCH_RECOGNIZE (

PARTITION BY id -- 分組字段

ORDER BY proctime -- 排序字段,只能按照時間字段升序排序

MEASURES -- 相當于select

A.amount as min_amount,

A.proctime as min_proctime,

B.amount as max_amount,

B.proctime as max_proctime

PATTERN (A B) WITHIN INTERVAL '5' SECOND -- 定義規(guī)則,增加事件約束,需要在5秒內(nèi)匹配出結(jié)果

DEFINE -- 定義條件

A as amount < 1,

B as amount > 500

) AS T;

-- 我們先實現(xiàn)第一版報警程序,對于一個賬戶,如果連續(xù)出現(xiàn)三次出現(xiàn)小于 $1 美元的交易后緊跟著一個大于 $500 的交易,就輸出一個報警信息

SELECT *

FROM tran

MATCH_RECOGNIZE (

PARTITION BY id -- 分組字段

ORDER BY proctime -- 排序字段,只能按照時間字段升序排序

MEASURES -- 相當于select

A.amount as a_amount, -- 獲取最后一條

min(A.amount) as min_a_amount, -- 取最小的

max(A.amount) as max_a_amount, -- 取最大的

sum(A.amount) as sum_a_amount, -- 求和

avg(A.amount) as avg_a_amount, -- 平均

FIRST(A.amount) AS first_a_amount, -- 取前面第一條

LAST(A.amount) AS LAST_a_amount, -- 取后面第一條

B.amount as b_amount

PATTERN (A{3} B) -- 定義規(guī)則

DEFINE -- 定義條件

A as amount < 1,

B as amount > 500

) AS T;

1,0.90

1,0.10

1,0.20

1,600.00

案例2

找出一個單一股票價格不斷下降的時期

CREATE TABLE ticker (

symbol STRING,

rowtime TIMESTAMP(3), -- 時間字段

price DECIMAL(10, 2) ,

tax DECIMAL(10, 2),

-- 指定時間字段和水位線生成策略

WATERMARK FOR rowtime AS rowtime

) WITH (

'connector' = 'kafka',

'topic' = 'ticker',

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'latest-offset',

'format' = 'csv',

'csv.ignore-parse-errors' ='true' -- 當有臟數(shù)據(jù)時是否跳過當前行

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic ticker

ACME,2024-06-04 10:00:00,12,1

ACME,2024-06-04 10:00:01,17,2

ACME,2024-06-04 10:00:02,19,1

ACME,2024-06-04 10:00:03,21,3

ACME,2024-06-04 10:00:04,25,2

ACME,2024-06-04 10:00:05,18,1

ACME,2024-06-04 10:00:06,15,1

ACME,2024-06-04 10:00:07,14,2

ACME,2024-06-04 10:00:08,24,2

ACME,2024-06-04 10:00:09,25,2

ACME,2024-06-04 10:00:10,19,1

-- 找出一個單一股票價格不斷下降的時期

select * from

ticker

MATCH_RECOGNIZE (

PARTITION BY symbol -- 分組字段

ORDER BY rowtime -- 排序字段,只能按照時間字段升序排序

MEASURES -- 相當于select

A.price as a_price,

FIRST(B.price) as FIRST_b_price,

LAST(B.price) as last_b_price,

C.price as c_price

AFTER MATCH SKIP PAST LAST ROW -- 從當前匹配成功止嘔的下一行開始匹配

PATTERN (A B+ C) -- 定義規(guī)則

DEFINE -- 定義條件

-- 如果時第一個B,就和A比較,如果時后面的B,就和前一個B比較

B as (LAST(B.price,1)is null and B.price < A.price) or B.price < LAST(B.price,1),

C as C.price > LAST(B.price)

) AS T;

10.11、Joins

Regular Joins

和hive sql中的join是一樣的,

CREATE TABLE students (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students

1500100001,施笑槐,22,女,文科六班

1500100002,呂金鵬,24,男,文科六班

1500100003,單樂蕊,22,女,理科六班

1500100004,葛德曜,24,男,理科三班

1500100005,宣谷芹,22,女,理科五班

1500100006,邊昂雄,21,男,理科二班

1500100007,尚孤風,23,女,文科六班

CREATE TABLE scores (

sid STRING,

cid STRING,

score INT

) WITH (

'connector' = 'kafka',

'topic' = 'scores', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores

1500100001,1000001,98

1500100001,1000002,5

1500100001,1000003,137

1500100001,1000004,29

1500100001,1000005,85

1500100001,1000006,52

1500100002,1000001,139

1500100002,1000002,102

-- inner jion(內(nèi)連接)

select

a.id,a.name,b.sid,b.score

from

students as a

inner join

scores as b

on a.id=b.sid;

--外連接

-- left join (左連接)

select

a.id,a.name,b.sid,b.score

from

students as a

left join

scores as b

on a.id=b.sid;

-- full join (全連接)

select

a.id,a.name,b.sid,b.score

from

students as a

full join

scores as b

on a.id=b.sid;

-- 常規(guī)的關(guān)聯(lián)方式,會將兩個表的數(shù)據(jù)一直保存在狀態(tài)中,時間長了,狀態(tài)會越來越大,導致任務執(zhí)行失敗

-- 狀態(tài)有效期,狀態(tài)在flink中保存的事件,狀態(tài)保留多久需要根據(jù)實際業(yè)務分析

SET 'table.exec.state.ttl' = '10000';

Interval Joins

Interval Join 用于在兩個流之間進行時間間隔內(nèi)的 Join。它允許你指定一個時間間隔,在這個時間間隔內(nèi)匹配流中的元素。

CREATE TABLE students_proctime (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING,

proctime AS PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students

1500100001,施笑槐,22,女,文科六班

1500100002,呂金鵬,24,男,文科六班

1500100003,單樂蕊,22,女,理科六班

1500100004,葛德曜,24,男,理科三班

1500100005,宣谷芹,22,女,理科五班

1500100006,邊昂雄,21,男,理科二班

1500100007,尚孤風,23,女,文科六班

CREATE TABLE scores_proctime (

sid STRING,

cid STRING,

score INT,

proctime AS PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'scores', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores

1500100001,1000001,98

1500100001,1000002,5

1500100001,1000003,137

1500100001,1000004,29

1500100001,1000005,85

1500100001,1000006,52

1500100002,1000001,139

1500100002,1000002,102

select a.id,a.name,b.sid,b.score from

students_proctime a, scores_proctime b

where a.id=b.sid

-- a表的時間需要在b表時間10秒內(nèi)

and (

a.proctime BETWEEN b.proctime - INTERVAL '10' SECOND AND b.proctime

or b.proctime BETWEEN a.proctime - INTERVAL '10' SECOND AND a.proctime

);

Temporal Joins

時態(tài)表是一個隨時間變化的表,在Flink中也稱為動態(tài)表。時態(tài)表中的行與一個或多個時態(tài)周期相關(guān)聯(lián),所有Flink表都是時態(tài)的(動態(tài)的)。時態(tài)表包含一個或多個版本表快照,它可以是一個不斷變化的歷史表,用于跟蹤變化(例如:數(shù)據(jù)庫變更日志,包含所有快照)或一個變化的維度表,它具體化了變化(例如:包含最新快照的數(shù)據(jù)庫表)。

CREATE TABLE orders (

order_id STRING,

price DECIMAL(32,2),

currency STRING,

order_time TIMESTAMP(3),

WATERMARK FOR order_time AS order_time --設(shè)置水位線

) WITH (

'connector' = 'kafka',

'topic' = 'orders', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders

o_001,1,EUR,2024-06-06 12:00:00

o_002,100,EUR,2024-06-06 12:00:07

o_003,200,EUR,2024-06-06 12:00:16

o_004,10,EUR,2024-06-06 12:00:21

o_005,20,EUR,2024-06-06 12:00:25

-- 匯率表

CREATE TABLE currency_rates (

currency STRING,

conversion_rate DECIMAL(32, 2),

update_time TIMESTAMP(3),

WATERMARK FOR update_time AS update_time,--設(shè)置水位線

PRIMARY KEY(currency) NOT ENFORCED -- 主鍵,區(qū)分不同的匯率

) WITH (

'connector' = 'kafka',

'topic' = 'currency_rates1', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'canal-json' -- 指定數(shù)據(jù)的格式

);

insert into currency_rates

values

('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),

('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),

('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),

('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- 使用常規(guī)關(guān)聯(lián)方式關(guān)聯(lián)時態(tài)表只能關(guān)聯(lián)到最新的數(shù)據(jù)

select

a.price,a.order_time,b.conversion_rate,b.update_time

from

orders as a

join

currency_rates as b

on a.currency=b.currency;

-- 時態(tài)表join

-- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的時間到b表中查詢對應時間段的數(shù)據(jù)

select

a.price,a.order_time,b.conversion_rate,b.update_time

from

orders as a

join

currency_rates FOR SYSTEM_TIME AS OF a.order_time as b

on a.currency=b.currency;

Lookup Joins

用于流表關(guān)聯(lián)維度表 流表:動態(tài)表 維度表:不怎么變化的變,維度表的數(shù)據(jù)一般可以放在hdfs或者mysql

CREATE TABLE scores (

sid INT,

cid STRING,

score INT,

proctime AS PROCTIME()

) WITH (

'connector' = 'kafka',

'topic' = 'scores', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'latest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv' -- 指定數(shù)據(jù)的格式

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores

1500100001,1000001,98

1500100002,1000002,5

1500100001,1000003,137

CREATE TABLE students (

id INT,

name STRING,

age INT,

gender STRING,

clazz STRING

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://master:3306/bigdata29',

'table-name' = 'student',

'username' ='root',

'password' = '123456',

'lookup.cache.max-rows' = '1000', -- 最大緩存行數(shù)

'lookup.cache.ttl' ='10000' -- 緩存過期時間

);

--1、使用常規(guī)關(guān)聯(lián)方式

-- 維表的數(shù)據(jù)只在任務啟動的時候讀取一次,后面不再實時讀取,

-- 只能關(guān)聯(lián)到任務啟動時讀取的數(shù)據(jù)

select a.sid,a.score,b.id,b.name from

scores as a

left join

students as b

on a.sid=b.id;

-- lookup join

-- 當流表每來一條數(shù)據(jù)時,使用關(guān)聯(lián)字段到維表的數(shù)據(jù)源中查詢

-- 每一次都需要查詢數(shù)據(jù)庫,性能會降低

select a.sid,a.score,b.id,b.name from

scores as a

left join

students FOR SYSTEM_TIME AS OF a.proctime as b

on a.sid=b.id;

10.12、整合hive

整合

# 上傳依賴到flink的lib目錄下

flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar

# 重啟flink集群

yarn application -list

yarn application -kill XXX

yarn-session.sh -d

sql-client.sh

hive catalog

catalog—>database—>table---->字段---->數(shù)據(jù) catalog是數(shù)據(jù)庫上面的一個概念,一個cataloglog中可以i有多個database, catalog就是flink抽象的元數(shù)據(jù)層

default_catalog:是flink默認的元數(shù)據(jù),將元數(shù)據(jù)保存在jobmanager的內(nèi)存中

-- 1、啟動hive的元數(shù)據(jù)服務

nohup hive --service metastore &

-- 2、創(chuàng)建hive catalog

CREATE CATALOG hive_catalog WITH (

'type' = 'hive',

'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf'

);

show catalogs;

--3、切換catalog

use catalog hive_catalog;

-- 查詢hive中的表

select * from hive_catalog.bigdata29.students;

-- 創(chuàng)建數(shù)據(jù)庫

create database flink;

-- flink可以查詢hive的表,hive不能查詢flink創(chuàng)建的動態(tài)表

-- 在hive cagalog 中保存flink的動態(tài)表

CREATE TABLE students_csv (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = ',' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

hive function

-- 加載hive函數(shù)

LOAD MODULE hive WITH ('hive-version' = '3.1.2');

select split('java,flink',',');

CREATE TABLE lines (

line STRING

) WITH (

'connector' = 'kafka',

'topic' = 'lines', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = '|' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic lines

java,java,flink

select

word,count(1) as num

from

lines,

lateral table(explode(split(line,','))) t(word)

group by

word;

10.13、Checkpoint

編寫sql文件

vim word_count.sql

-- 1、創(chuàng)建source表

CREATE TABLE lines (

line STRING

) WITH (

'connector' = 'kafka',

'topic' = 'lines', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = '|' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

-- 創(chuàng)建sink表

CREATE TABLE print_table (

word STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

-- 加載hive函數(shù)

LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- 執(zhí)行sql

insert into print_table

select

word,count(1) as num

from

lines,

lateral table(explode(split(line,','))) t(word)

group by

word;

執(zhí)行SQL文件

-- 第一次直接提交任務

sql-client.sh -f word_count.sql

失敗重啟

-- 基于hdfs中保存的快照重啟任務

-- 在inert into 語句的前面增加

SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoint/d915e6278f156a9278156e67105f914e/chk-36';

-- 重啟任務

sql-client.sh -f word_count.sql

10.14、一個表被多次使用的時候

vim student.sql

CREATE TABLE students_csv (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = ',' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

-- 創(chuàng)建sink表

CREATE TABLE clazz_num (

clazz STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

CREATE TABLE sex_num (

sex STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

-- 執(zhí)行一組sql,如果多個sql中使用了同一張表,flink只會讀取一次

EXECUTE STATEMENT SET

BEGIN

insert into clazz_num

select

clazz,

count(1) as num

from

students_csv

group by

clazz;

insert into sex_num

select

sex,

count(1) as num

from

students_csv

group by

sex;

END;

10.15、反壓

如果你看到一個 task 發(fā)生 反壓警告(例如: High),意味著它生產(chǎn)數(shù)據(jù)的速率比下游 task 消費數(shù)據(jù)的速率要快。 在工作流中數(shù)據(jù)記錄是從上游向下游流動的(例如:從 Source 到 Sink)。反壓沿著相反的方向傳播,沿著數(shù)據(jù)流向上游傳播。

測試反壓

-- 出啊關(guān)鍵datagen source表

CREATE TABLE words_datagen (

word STRING

) WITH (

'connector' = 'datagen',

'rows-per-second'='50000', -- 指定每秒生成的數(shù)據(jù)量

'fields.word.length'='5'

);

CREATE TABLE blackhole_table (

word STRING,

num BIGINT

) WITH (

'connector' = 'blackhole'

);

-- 反壓發(fā)生情況

--1、單詞太多,狀態(tài)太大導致反壓

insert into blackhole_table

select

word,

count(1)as num

from

words_datagen /*+ OPTIONS('fields.word.length'='6') */

group by

word;

--2、數(shù)據(jù)量太大導致反壓

insert into blackhole_table

select

word,

count(1)as num

from

words_datagen /*+ OPTIONS('fields.word.length'='5','rows-per-second'='400000') */

group by

word;

解決反壓的方法

增加資源 -- 1、增加Taskmanager的內(nèi)存

-- 啟動汲取設(shè)置tm的內(nèi)存

yarn-session.sh -tm 6G -d

-- 2、增加并行度

SET 'parallelism.default' = '8';

預聚合 -- 開啟微批處理

set 'table.exec.mini-batch.enabled' ='true';

set 'table.exec.mini-batch.allow-latency' = '5 s';

set 'table.exec.mini-batch.size' ='100000';

-- 開啟預聚合

set 'table.optimizer.agg-phase-strategy' ='TWO_PHASE';

‘csv.field-delimiter’ = ‘|’ ,-- 指定分隔符 ‘csv.ignore-parse-errors’ =‘true’ – 跳過臟數(shù)據(jù) );

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic lines java,java,flink

select word,count(1) as num from lines, lateral table(explode(split(line,‘,’))) t(word) group by word;

#### 10.13、Checkpoint

- 編寫sql文件

> vim word_count.sql

~~~sql

-- 1、創(chuàng)建source表

CREATE TABLE lines (

line STRING

) WITH (

'connector' = 'kafka',

'topic' = 'lines', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = '|' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

-- 創(chuàng)建sink表

CREATE TABLE print_table (

word STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

-- 加載hive函數(shù)

LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- 執(zhí)行sql

insert into print_table

select

word,count(1) as num

from

lines,

lateral table(explode(split(line,','))) t(word)

group by

word;

執(zhí)行SQL文件

-- 第一次直接提交任務

sql-client.sh -f word_count.sql

失敗重啟

-- 基于hdfs中保存的快照重啟任務

-- 在inert into 語句的前面增加

SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoint/d915e6278f156a9278156e67105f914e/chk-36';

-- 重啟任務

sql-client.sh -f word_count.sql

10.14、一個表被多次使用的時候

vim student.sql

CREATE TABLE students_csv (

id STRING,

name STRING,

age INT,

sex STRING,

clazz STRING

) WITH (

'connector' = 'kafka',

'topic' = 'students', -- 指定topic

'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表

'properties.group.id' = 'testGroup', -- 指定消費者組

'scan.startup.mode' = 'earliest-offset', -- 指定讀取數(shù)據(jù)的位置

'format' = 'csv', -- 指定數(shù)據(jù)的格式

'csv.field-delimiter' = ',' ,-- 指定分隔符

'csv.ignore-parse-errors' ='true' -- 跳過臟數(shù)據(jù)

);

-- 創(chuàng)建sink表

CREATE TABLE clazz_num (

clazz STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

CREATE TABLE sex_num (

sex STRING,

num BIGINT

) WITH (

'connector' = 'print'

);

-- 執(zhí)行一組sql,如果多個sql中使用了同一張表,flink只會讀取一次

EXECUTE STATEMENT SET

BEGIN

insert into clazz_num

select

clazz,

count(1) as num

from

students_csv

group by

clazz;

insert into sex_num

select

sex,

count(1) as num

from

students_csv

group by

sex;

END;

10.15、反壓

如果你看到一個 task 發(fā)生 反壓警告(例如: High),意味著它生產(chǎn)數(shù)據(jù)的速率比下游 task 消費數(shù)據(jù)的速率要快。 在工作流中數(shù)據(jù)記錄是從上游向下游流動的(例如:從 Source 到 Sink)。反壓沿著相反的方向傳播,沿著數(shù)據(jù)流向上游傳播。

測試反壓

-- 出啊關(guān)鍵datagen source表

CREATE TABLE words_datagen (

word STRING

) WITH (

'connector' = 'datagen',

'rows-per-second'='50000', -- 指定每秒生成的數(shù)據(jù)量

'fields.word.length'='5'

);

CREATE TABLE blackhole_table (

word STRING,

num BIGINT

) WITH (

'connector' = 'blackhole'

);

-- 反壓發(fā)生情況

--1、單詞太多,狀態(tài)太大導致反壓

insert into blackhole_table

select

word,

count(1)as num

from

words_datagen /*+ OPTIONS('fields.word.length'='6') */

group by

word;

--2、數(shù)據(jù)量太大導致反壓

insert into blackhole_table

select

word,

count(1)as num

from

words_datagen /*+ OPTIONS('fields.word.length'='5','rows-per-second'='400000') */

group by

word;

解決反壓的方法

增加資源 -- 1、增加Taskmanager的內(nèi)存

-- 啟動汲取設(shè)置tm的內(nèi)存

yarn-session.sh -tm 6G -d

-- 2、增加并行度

SET 'parallelism.default' = '8';

預聚合 -- 開啟微批處理

set 'table.exec.mini-batch.enabled' ='true';

set 'table.exec.mini-batch.allow-latency' = '5 s';

set 'table.exec.mini-batch.size' ='100000';

-- 開啟預聚合

set 'table.optimizer.agg-phase-strategy' ='TWO_PHASE';

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink筆記

http://yzkb.51969.com/

好文推薦

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19200336.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄