柚子快報(bào)邀請碼778899分享:大數(shù)據(jù)之-Flink學(xué)習(xí)筆記
目錄
Flink
簡介
Flink架構(gòu)
并行度
Flink部署
flink部署模式
會(huì)話模式(Session Mode)
單作業(yè)模式(Per-Job Mode)
應(yīng)用模式(Application Mode)
Standalone運(yùn)行模式
會(huì)話模式部署
應(yīng)用模式部署
Yarn運(yùn)行模式
會(huì)話模式部署
應(yīng)用模式部署
DataStream API
1、獲取執(zhí)行環(huán)境
執(zhí)行模式
2、數(shù)據(jù)源(源算子)
從集合中讀取數(shù)據(jù)
從文件中讀取數(shù)據(jù)
從socket中讀取數(shù)據(jù)
從Kafka讀取數(shù)據(jù)
3、轉(zhuǎn)換算子
基本轉(zhuǎn)換算子
分區(qū)算子
簡單聚合
4、輸出算子sink
輸出到文件
輸出到MySQL
flink窗口
1、窗口分類
滾動(dòng)窗口
滑動(dòng)窗口
會(huì)話窗口
全局窗口
代碼定義
2、窗口函數(shù)
增量聚合函數(shù)
歸約函數(shù)(ReduceFunction)
聚合函數(shù)(AggregateFunction)
全窗口函數(shù)
觸發(fā)器
移除器
flink時(shí)間
一些報(bào)錯(cuò)
Flink
Apache Flink? — Stateful Computations over Data Streams | Apache Flink
簡介
Apache Flink — 數(shù)據(jù)流上的有狀態(tài)計(jì)算。
Apache Flink 是一個(gè)框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算處理。
任何類型的數(shù)據(jù)都以事件流的形式生成。信用卡交易、傳感器測量、機(jī)器日志或網(wǎng)站或移動(dòng)應(yīng)用程序 2上的用戶交互,所有這些數(shù)據(jù)都以流的形式生成。
數(shù)據(jù)可以作為無界或有界流進(jìn)行處理。
無界數(shù)據(jù)流:有定義流的開始,但是沒有定義結(jié)束。會(huì)一直提供數(shù)據(jù),沒有結(jié)束。所以要一直連續(xù)的處理無界流,所以一旦有數(shù)據(jù)到來就要立即處理,不能等數(shù)據(jù)都到再處理,因?yàn)檩斎胧菬o限的。處理無界數(shù)據(jù)通常需要按特定順序(如數(shù)據(jù)引入的順序),以便能夠推斷結(jié)果的完整性。
有界數(shù)據(jù)流:有具體的開始和結(jié)束。有界流的處理也稱為批處理。有界數(shù)據(jù)可以等待所有數(shù)據(jù)到達(dá)之后再進(jìn)行計(jì)算處理。有界數(shù)據(jù)不需要按順序引入,因?yàn)榭梢詫τ薪绲臄?shù)據(jù)集進(jìn)行排序。
有狀態(tài)處理
流處理和批處理
Flink架構(gòu)
Flink 運(yùn)行時(shí)由兩種類型的進(jìn)程組成:一個(gè)JobManager和一個(gè)或多個(gè)TaskManager。
Client不是運(yùn)行時(shí)和程序執(zhí)行的一部分,而是用于準(zhǔn)備數(shù)據(jù)流并將其發(fā)送到 JobManager?。之后,客戶端可以斷開連接(分離模式),或保持連接以接收進(jìn)度報(bào)告
JobManager 和 TaskManager 可以通過多種方式啟動(dòng):作為獨(dú)立集群直接在機(jī)器上啟動(dòng)、在容器中啟動(dòng)或由YARN等資源框架進(jìn)行管理。TaskManager 連接到 JobManager,宣布自己可用,并被分配工作。
JobManager?
JobManager具有與協(xié)調(diào) Flink 應(yīng)用程序的分布式執(zhí)行相關(guān)的許多職責(zé):它決定何時(shí)安排下一個(gè)任務(wù)(或一組任務(wù))、對已完成的任務(wù)或執(zhí)行失敗做出反應(yīng)、協(xié)調(diào)檢查點(diǎn)以及協(xié)調(diào)故障恢復(fù)等其他的。該過程由三個(gè)不同的部分組成:
ResourceManager資源管理器
ResourceManager負(fù)責(zé) Flink 集群中的資源取消/分配和配置——它管理任務(wù)槽,任務(wù)槽是 Flink 集群中資源調(diào)度的單位(請參閱TaskManagers)。Flink 針對不同的環(huán)境和資源提供者(例如 YARN、Kubernetes 和獨(dú)立部署)實(shí)現(xiàn)了多個(gè) ResourceManager。在獨(dú)立設(shè)置中,ResourceManager 只能分配可用 TaskManager 的插槽,而無法自行啟動(dòng)新的 TaskManager。
ResourceManager主要負(fù)責(zé)資源的分配和管理,在Flink 集群中只有一個(gè)。所謂“資源”,主要是指TaskManager的任務(wù)槽(task slots)。任務(wù)槽就是Flink集群中的資源調(diào)配單元,包含了機(jī)器用來執(zhí)行計(jì)算的一組CPU和內(nèi)存資源。每一個(gè)任務(wù)(Task)都需要分配到一個(gè)slot上執(zhí)行。
Dispatcher分發(fā)器
Dispatcher提供 REST 接口來提交 Flink 應(yīng)用程序執(zhí)行,并為每個(gè)提交的作業(yè)啟動(dòng)一個(gè)新的 JobMaster?。它還運(yùn)行 Flink WebUI 以提供有關(guān)作業(yè)執(zhí)行的信息。
JobMaster
JobMaster負(fù)責(zé)管理單個(gè)?JobGraph的執(zhí)行。Flink 集群中可以同時(shí)運(yùn)行多個(gè)作業(yè),每個(gè)作業(yè)都有自己的 JobMaster。
所以JobMaster和具體的Job是一一對應(yīng)的,多個(gè)Job可以同時(shí)運(yùn)行在一個(gè)Flink集群中, 每個(gè)Job都有一個(gè)自己的JobMaster。
在作業(yè)提交時(shí),JobMaster會(huì)先接收到要執(zhí)行的應(yīng)用。JobMaster會(huì)把JobGraph轉(zhuǎn)換成一個(gè)物理層面的數(shù)據(jù)流圖,這個(gè)圖被叫作“執(zhí)行圖”(ExecutionGraph),它包含了所有可以并發(fā)執(zhí)行的任務(wù)。JobMaster會(huì)向資源管理器(ResourceManager)發(fā)出請求,申請執(zhí)行任務(wù)必要的資源。一旦它獲取到了足夠的資源,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager上。
總是至少有一個(gè) JobManager。高可用性設(shè)置可能有多個(gè) JobManager,其中一個(gè)始終是領(lǐng)導(dǎo)者,其他是?備用(請參閱高可用性 (HA))。
TaskManager?
必須始終至少有一個(gè) TaskManager。TaskManager 中資源調(diào)度的最小單位是任務(wù)槽。TaskManager 中的任務(wù)槽數(shù)表示并發(fā)處理任務(wù)的數(shù)量。請注意,多個(gè)運(yùn)算符可以在一個(gè)任務(wù)槽中執(zhí)行(請參閱任務(wù)和運(yùn)算符鏈)。
一個(gè)任務(wù)槽對應(yīng)一個(gè)任務(wù)。
TaskManager是Flink中的工作進(jìn)程,數(shù)據(jù)流的具體計(jì)算就是它來做的。Flink集群中必須至少有一個(gè)TaskManager;每一個(gè)TaskManager都包含了一定數(shù)量的任務(wù)槽(task slots)。
Slot是資源調(diào)度的最小單位,slot的數(shù)量限制了TaskManager能夠并行處理的任務(wù)數(shù)量。
啟動(dòng)之后,TaskManager會(huì)向資源管理器注冊它的slots;收到資源管理器的指令后,TaskManager就會(huì)將一個(gè)或者多個(gè)槽位提供給JobMaster調(diào)用,JobMaster就可以分配任務(wù)來執(zhí)行了。
在執(zhí)行過程中,TaskManager可以緩沖數(shù)據(jù),還可以跟其他運(yùn)行同一應(yīng)用的TaskManager交換數(shù)據(jù)。
并行度
一個(gè)flink程序處理數(shù)據(jù)的流程中一般會(huì)包含多個(gè)算子,每個(gè)算子都會(huì)處理數(shù)據(jù),如果要處理的數(shù)據(jù)特別大,可能就會(huì)導(dǎo)致算子處理數(shù)據(jù)的負(fù)擔(dān)過大,導(dǎo)致速度太慢。為了應(yīng)對這種現(xiàn)象,我們可以把要處理的數(shù)據(jù)分流,分到不同的節(jié)點(diǎn)上同時(shí)處理,而并行度就是同時(shí)處理數(shù)據(jù)的節(jié)點(diǎn)數(shù)。
在Flink執(zhí)行過程中,每一個(gè)算子(operator)可以包含一個(gè)或多個(gè)子任務(wù)(operator subtask),這些子任務(wù)在不同的線程、不同的物理機(jī)或不同的容器中完全獨(dú)立地執(zhí)行。
比如:sum.print().setParallelism(2)比如這個(gè),我們給print輸出算子設(shè)置了并行度2,sum數(shù)據(jù)流的輸出任務(wù)就會(huì)被分成兩個(gè)子任務(wù),到不同的物理機(jī)上執(zhí)行輸出:
并行度設(shè)置
代碼中設(shè)置 我們在代碼中,可以很簡單地在算子后跟著調(diào)用 setParallelism()方法,來設(shè)置當(dāng)前算子的并行度。這種方式設(shè)置的并行度,只針對當(dāng)前算子有效,可以看哪個(gè)算子處理數(shù)據(jù)負(fù)擔(dān)比較大,就可以將該算子的并行度設(shè)置大一點(diǎn)。
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); // 給map設(shè)置并行度2設(shè)置當(dāng)前環(huán)境的并行度 這樣代碼中所有算子,默認(rèn)的并行度就都為 2 了。我們一般不會(huì)在程序中設(shè)置全局并行度, 因?yàn)槿绻诔绦蛑袑θ植⑿卸冗M(jìn)行硬編碼,會(huì)導(dǎo)致無法動(dòng)態(tài)擴(kuò)容。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2);
提交應(yīng)用時(shí)設(shè)置 在使用 flink run 命令提交應(yīng)用時(shí),可以增加-p 參數(shù)來指定當(dāng)前應(yīng)用程序執(zhí)行的并行度, 它的作用類似于執(zhí)行環(huán)境的全局設(shè)置。如果我們直接在Web UI 上提交作業(yè),也可以在對應(yīng)輸入框中直接添加并行度。
bin/flink run –p 2 –c 全類名 jar包配置文件中設(shè)置 我們還可以直接在集群的配置文件 flink-conf.yaml 中直接更改默認(rèn)并行度: parallelism.default: 2 ?
并行度設(shè)置優(yōu)先級從上而下遞減。
Flink部署
flink部署模式
Flink集群的部署方式是靈活的,支持在Local,Standalone,Yarn,Mesos,Docker,Kubernetes部署等。
而具體的如何跟這些資源交互,例如通信,回收,隔離等,則需要根據(jù)Flink job的提交方式,或者說是運(yùn)行方式來決定。
flink的部署方式:
在應(yīng)用程序模式下,在會(huì)話模式下,在 Per-Job 模式下(已棄用)。
它們的區(qū)別主要在于:集群的生命周期以及資源的分配方式;以及應(yīng)用的main方法到底在哪里執(zhí)行——客戶端(Client)還是JobManager。
會(huì)話模式(Session Mode)
集群生命周期:在 Flink Session 集群中,客戶端連接到一個(gè)預(yù)先存在的、長期運(yùn)行的flink集群,該集群可以接受多個(gè)作業(yè)提交。即使所有作業(yè)完成后,集群(和 JobManager)仍將繼續(xù)運(yùn)行直到手動(dòng)停止 session 為止。因此,F(xiàn)link Session 集群的壽命不受任何 Flink 作業(yè)壽命的約束。 資源隔離:TaskManager slot 由 ResourceManager 在提交作業(yè)時(shí)分配,并在作業(yè)完成時(shí)釋放。由于所有作業(yè)都共享同一集群,因此在集群資源方面存在一些競爭 — 例如提交工作階段的網(wǎng)絡(luò)帶寬。此共享設(shè)置的局限性在于,如果 TaskManager 崩潰,則在此 TaskManager 上運(yùn)行 task 的所有作業(yè)都將失??;類似的,如果 JobManager 上發(fā)生一些致命錯(cuò)誤,它將影響集群中正在運(yùn)行的所有作業(yè)。 其他注意事項(xiàng):擁有一個(gè)預(yù)先存在的集群可以節(jié)省大量時(shí)間申請資源和啟動(dòng) TaskManager。有種場景很重要,作業(yè)執(zhí)行時(shí)間短并且啟動(dòng)時(shí)間長會(huì)對端到端的用戶體驗(yàn)產(chǎn)生負(fù)面的影響 — 就像對簡短查詢的交互式分析一樣,希望作業(yè)可以使用現(xiàn)有資源快速執(zhí)行計(jì)算。
單作業(yè)模式(Per-Job Mode)
應(yīng)用模式(Application Mode)
集群生命周期:Flink 應(yīng)用程序集群是一個(gè)專用的 Flink 集群,僅執(zhí)行來自一個(gè) Flink 應(yīng)用程序的作業(yè),并且該?main()方法在集群上運(yùn)行,而不是在客戶端上運(yùn)行。作業(yè)提交是一個(gè)一步過程:您不需要先啟動(dòng) Flink 集群,然后將作業(yè)提交到現(xiàn)有集群會(huì)話;相反,您將應(yīng)用程序邏輯和依賴項(xiàng)打包到可執(zhí)行作業(yè) JAR 中,集群入口點(diǎn) (?ApplicationClusterEntryPoint) 負(fù)責(zé)調(diào)用該main()方法來提取 JobGraph。例如,這允許您像 Kubernetes 上的任何其他應(yīng)用程序一樣部署 Flink 應(yīng)用程序。因此,F(xiàn)link 應(yīng)用程序集群的生命周期與 Flink 應(yīng)用程序的生命周期綁定在一起。 資源隔離:在 Flink 應(yīng)用程序集群中,ResourceManager 和 Dispatcher 的范圍僅限于單個(gè) Flink 應(yīng)用程序,這比 Flink 會(huì)話集群提供了更好的關(guān)注點(diǎn)分離。
這里我們所講到的部署模式,相對是比較抽象的概念。實(shí)際應(yīng)用時(shí),一般需要和資源管理平臺(tái)結(jié)合起來,選擇特定的模式來分配資源、部署應(yīng)用。接下來,我們就針對不同的資源提供者的場景,具體介紹Flink的部署方式。
Standalone運(yùn)行模式
會(huì)話模式部署
當(dāng)集群啟動(dòng)后,TaskManager會(huì)向資源管理器注冊它的任務(wù)槽slots;收到資源管理器的指令后,TaskManager就會(huì)將一個(gè)或者多個(gè)任務(wù)槽位提供給JobMaster調(diào)用,JobMaster就可以分配任務(wù)來執(zhí)行了。
當(dāng)用戶提交一個(gè)flink計(jì)算任務(wù)時(shí),分發(fā)器啟動(dòng)一個(gè)新的 JobMaster,JobMaster會(huì)先接收到要執(zhí)行的任務(wù)應(yīng)用,JobMaster會(huì)把JobGraph轉(zhuǎn)換成一個(gè)物理層面的數(shù)據(jù)流圖,這個(gè)圖被叫作“執(zhí)行圖”(ExecutionGraph),它包含了所有可以并發(fā)執(zhí)行的任務(wù)。
JobMaster會(huì)向資源管理器(ResourceManager)發(fā)出請求,申請執(zhí)行任務(wù)必要的資源任務(wù)槽。一旦它獲取到了足夠的資源,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager(我的理解是哪個(gè)TaskManager有空閑的任務(wù)槽slot)上。
0)集群規(guī)劃
節(jié)點(diǎn)服務(wù)器hadoop102hadoop103hadoop104角色充當(dāng)JobManager和TaskManagerTaskManagerTaskManager
具體安裝部署步驟如下:
*1)下載并解壓安裝包*
(1)https://flink.apache.org/downloads/ 下載安裝包flink-1.17.0-bin-scala_2.12.tgz,將該jar包上傳到hadoop102節(jié)點(diǎn)服務(wù)器的/opt/software路徑上。
(2)在/opt/software路徑上解壓flink-1.17.0-bin-scala_2.12.tgz到/opt/module路徑上。
[atguigu@hadoop102 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
*2)修改集群配置
(1)進(jìn)入conf路徑,修改flink-conf.yaml文件,指定hadoop102節(jié)點(diǎn)服務(wù)器為JobManager
[atguigu@hadoop102 conf]$ vim flink-conf.yaml
修改如下內(nèi)容:
# JobManager節(jié)點(diǎn)地址.
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
\# TaskManager節(jié)點(diǎn)地址.需要配置為當(dāng)前機(jī)器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102
(2)修改workers文件,指定hadoop102、hadoop103和hadoop104為TaskManager
[atguigu@hadoop102 conf]$ vim workers
修改如下內(nèi)容:
hadoop102
hadoop103
hadoop104
(3)修改masters文件
[atguigu@hadoop102 conf]$ vim masters
修改如下內(nèi)容:
hadoop102:8081
(4)另外,在flink-conf.yaml文件中還可以對集群中的JobManager和TaskManager組件進(jìn)行優(yōu)化配置,主要配置項(xiàng)如下:
l jobmanager.memory.process.size:對JobManager進(jìn)程可使用到的全部內(nèi)存進(jìn)行配置,包括JVM元空間和其他開銷,默認(rèn)為1600M,可以根據(jù)集群規(guī)模進(jìn)行適當(dāng)調(diào)整。
l taskmanager.memory.process.size:對TaskManager進(jìn)程可使用到的全部內(nèi)存進(jìn)行配置,包括JVM元空間和其他開銷,默認(rèn)為1728M,可以根據(jù)集群規(guī)模進(jìn)行適當(dāng)調(diào)整。
l taskmanager.numberOfTaskSlots:對每個(gè)TaskManager能夠分配的Slot數(shù)量進(jìn)行配置,默認(rèn)為1,可根據(jù)TaskManager所在的機(jī)器能夠提供給Flink的CPU數(shù)量決定。所謂Slot就是TaskManager中具體運(yùn)行一個(gè)任務(wù)所分配的計(jì)算資源。
l parallelism.default:Flink任務(wù)執(zhí)行的并行度,默認(rèn)為1。優(yōu)先級低于代碼中進(jìn)行的并行度配置和任務(wù)提交時(shí)使用參數(shù)指定的并行度數(shù)量。
關(guān)于Slot和并行度的概念,我們會(huì)在下一章做詳細(xì)講解。
*3)分發(fā)安裝目錄*
(1)配置修改完畢后,將Flink安裝目錄發(fā)給另外兩個(gè)節(jié)點(diǎn)服務(wù)器。
[atguigu@hadoop102 module]$ xsync flink-1.17.0/
(2)修改hadoop103的 taskmanager.host
[atguigu@hadoop103 conf]$ vim flink-conf.yaml
修改如下內(nèi)容:
# TaskManager節(jié)點(diǎn)地址.需要配置為當(dāng)前機(jī)器名
taskmanager.host: hadoop103
(3)修改hadoop104的 taskmanager.host
[atguigu@hadoop104 conf]$ vim flink-conf.yaml
修改如下內(nèi)容:
# TaskManager節(jié)點(diǎn)地址.需要配置為當(dāng)前機(jī)器名
taskmanager.host: hadoop104
*4)啟動(dòng)集群*
(1)在hadoop102節(jié)點(diǎn)服務(wù)器上執(zhí)行start-cluster.sh啟動(dòng)Flink集群:
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
(2)查看進(jìn)程情況:
[atguigu@hadoop102 flink-1.17.0]$ jpsall
=============== hadoop102 ===============
4453 StandaloneSessionClusterEntrypoint
4458 TaskManagerRunner
4533 Jps
=============== hadoop103 ===============
2872 TaskManagerRunner
2941 Jps
=============== hadoop104 ===============
2948 Jps
2876 TaskManagerRunner
*5)訪問Web UI*
啟動(dòng)成功后,同樣可以訪問http://hadoop102:8081對flink集群和任務(wù)進(jìn)行監(jiān)控管理。
這里可以明顯看到,當(dāng)前集群的TaskManager數(shù)量為3;由于默認(rèn)每個(gè)TaskManager的Slot數(shù)量為1,所以總Slot數(shù)和可用Slot數(shù)都為3。
向集群提交作業(yè)
上傳要計(jì)算的任務(wù),這邊是寫了個(gè)計(jì)算單詞出現(xiàn)個(gè)數(shù)的程序,數(shù)據(jù)源來自socket。具體步驟如下:
*1**)環(huán)境準(zhǔn)備*
在hadoop102中執(zhí)行以下命令啟動(dòng)netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
*2**)打包程序jar
(1)在我們編寫的Flink入門程序的pom.xml文件中添加打包插件的配置,具體如下:
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
(2)插件配置完畢后,可以使用IDEA的Maven工具執(zhí)行package命令,出現(xiàn)如下提示即表示打包成功。
-------------------------------------------------------------------
[INFO] BUILD SUCCESS
-------------------------------------------------------------------
打包完成后,在target目錄下即可找到所需JAR包,JAR包會(huì)有兩個(gè),F(xiàn)linkTutorial-1.0-SNAPSHOT.jar和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因?yàn)榧褐幸呀?jīng)具備任務(wù)運(yùn)行所需的所有依賴,所以建議使用FlinkTutorial-1.0-SNAPSHOT.jar。比較大的帶有依賴。
*3)**在Web*?*UI上提交作業(yè)*
(1)任務(wù)打包完成后,我們打開Flink的WEB UI頁面,在右側(cè)導(dǎo)航欄點(diǎn)擊“Submit New Job”,然后點(diǎn)擊按鈕“+ Add New”,選擇要上傳運(yùn)行的JAR包,如下圖所示。
JAR包上傳完成,如下圖所示:
(2)點(diǎn)擊該JAR包,出現(xiàn)任務(wù)配置頁面,進(jìn)行相應(yīng)配置。
主要配置程序入口主類的全類名,任務(wù)運(yùn)行的并行度,任務(wù)運(yùn)行所需的配置參數(shù)和保存點(diǎn)路徑等,如下圖所示,配置完成后,即可點(diǎn)擊按鈕“Submit”,將任務(wù)提交到集群運(yùn)行。
(3)任務(wù)提交成功之后,可點(diǎn)擊左側(cè)導(dǎo)航欄的“Running Jobs”查看程序運(yùn)行列表情況。
(4)測試
? ①在socket端口中輸入hello
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
hello
②先點(diǎn)擊Task Manager,然后點(diǎn)擊右側(cè)的192.168.10.104服務(wù)器節(jié)點(diǎn)
? ③點(diǎn)擊Stdout,就可以看到hello單詞的統(tǒng)計(jì)
? 注意:如果hadoop104節(jié)點(diǎn)沒有統(tǒng)計(jì)單詞數(shù)據(jù),可以去其他TaskManager節(jié)點(diǎn)查看。
(4)點(diǎn)擊該任務(wù),可以查看任務(wù)運(yùn)行的具體情況,也可以通過點(diǎn)擊“Cancel Job”結(jié)束任務(wù)運(yùn)行。
4)命令行提交作業(yè)
除了通過WEB UI界面提交任務(wù)之外,也可以直接通過命令行來提交任務(wù)。這里為方便起見,我們可以先把jar包直接上傳到目錄flink-1.17.0下
(1)首先需要啟動(dòng)集群。
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
(2)在hadoop102中執(zhí)行以下命令啟動(dòng)netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
(3)將flink程序運(yùn)行jar包上傳到/opt/module/flink-1.17.0路徑。
(4)進(jìn)入到flink的安裝路徑下,在命令行使用flink run命令提交作業(yè)。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
這里的參數(shù) -m指定了提交到的JobManager,-c指定了入口類。
(5)在瀏覽器中打開Web UI,http://hadoop102:8081查看應(yīng)用執(zhí)行情況。
用netcat輸入數(shù)據(jù),可以在TaskManager的標(biāo)準(zhǔn)輸出(Stdout)看到對應(yīng)的統(tǒng)計(jì)結(jié)果。
(6)在/opt/module/flink-1.17.0/log路徑中,可以查看TaskManager節(jié)點(diǎn)。
[atguigu@hadoop102 log]$ cat flink-atguigu-standalonesession-0-hadoop102.out
(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)
應(yīng)用模式部署
Yarn運(yùn)行模式
在將Flink任務(wù)部署至YARN集群之前,需要確認(rèn)集群是否安裝有Hadoop,保證Hadoop版本至少在2.2以上,并且集群中安裝有HDFS服務(wù)。
會(huì)話模式部署
應(yīng)用模式部署
DataStream API
DataStream API是Flink的核心層API,使用API實(shí)現(xiàn)對數(shù)據(jù)流的計(jì)算和處理。
一個(gè)Flink程序,其實(shí)就是對數(shù)據(jù)流DataStream的各種轉(zhuǎn)換。具體來說,代碼基本上都由以下幾部分構(gòu)成:
獲得一個(gè)execution environment,加載/創(chuàng)建初始數(shù)據(jù),指定此數(shù)據(jù)的轉(zhuǎn)換,指定放置計(jì)算結(jié)果的位置,觸發(fā)程序執(zhí)行
/**
* 計(jì)算單詞出現(xiàn)個(gè)數(shù)
*
* flink處理無界數(shù)據(jù)流
* 程序會(huì)一直運(yùn)行,一有數(shù)據(jù)來就處理
*
* @author shkstart
* @create 2023-09-10 16:44
*/
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1.創(chuàng)建flink流式處理環(huán)境 StreamExecutionEnvironment
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.接收要待處理的數(shù)據(jù)
DataStreamSource
// 3.處理數(shù)據(jù) 數(shù)據(jù)處理后格式:(word,2)單詞和對應(yīng)出現(xiàn)的次數(shù)
/**
* flatMap(FlatMapFunction
* 為數(shù)據(jù)流的每一個(gè)元素調(diào)用flatMapper
*/
System.out.println("原始數(shù)據(jù)流:" + dateStream);
// FlatMapFunction轉(zhuǎn)換,處理數(shù)據(jù)流元素
FlatMapFunction
SingleOutputStreamOperator
dateStream.flatMap(flatMapFunction);
System.out.println("處理后的數(shù)據(jù)流:" + transformedDataStream);
// 按照word分組 按string分組 將Integer累加
SingleOutputStreamOperator
// 4.展示
sum.print();
// 5.執(zhí)行 開始處理
// 代碼末尾需要調(diào)用 流式處理環(huán)境 的execute方法,開始執(zhí)行任務(wù)
see.execute();
}
}
public class FlatMapFunctionImpl implements FlatMapFunction
/**
* 轉(zhuǎn)換數(shù)據(jù)流元素
* @param value 輸入的元素
* @param out 輸出的元素
* @throws Exception
*/
@Override
public void flatMap(String value, Collector
// 切分
String[] words = value.split(" ");
// 收集
for (String word : words) {
out.collect(Tuple2.of(word,1));
}
}
}
1、獲取執(zhí)行環(huán)境
Flink程序可以在各種上下文環(huán)境中運(yùn)行:我們可以在本地JVM中執(zhí)行程序,也可以提交到遠(yuǎn)程集群上運(yùn)行。
不同的環(huán)境,代碼的提交運(yùn)行的過程會(huì)有所不同。這就要求我們在提交作業(yè)執(zhí)行計(jì)算時(shí),首先必須獲取當(dāng)前Flink的運(yùn)行環(huán)境,從而建立起與Flink框架之間的聯(lián)系。
可以通過以下三種方法獲取執(zhí)行環(huán)境:
// 這個(gè)最常用的,官方也是推薦這個(gè)。這個(gè)會(huì)自動(dòng)識(shí)別是本地運(yùn)行還是遠(yuǎn)程集群運(yùn)行,從而創(chuàng)建對應(yīng)的執(zhí)行環(huán)境。
StreamExecutionEnvironment.getExecutionEnvironment();
// 這個(gè)方法返回一個(gè)本地執(zhí)行環(huán)境。可以在調(diào)用時(shí)傳入一個(gè)參數(shù),指定默認(rèn)的并行度;
// 如果不傳入,則默認(rèn)并行度就是本地的CPU核心數(shù)。
StreamExecutionEnvironment.createLocalEnvironment();
// 這個(gè)方法返回集群執(zhí)行環(huán)境。需要在調(diào)用時(shí)指定JobManager的主機(jī)名和端口號(hào),并指定要在集群中運(yùn)行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager主機(jī)名
1234, // JobManager進(jìn)程端口號(hào)
"path/to/jarFile.jar" // 提交給JobManager的JAR包
);
// 在獲取到程序執(zhí)行環(huán)境后,我們還可以對執(zhí)行環(huán)境進(jìn)行靈活的設(shè)置。
// 比如可以全局設(shè)置程序的并行度、禁用算子鏈,還可以定義程序的時(shí)間語義、配置容錯(cuò)機(jī)制。
執(zhí)行模式
DataStream API執(zhí)行模式包括:流執(zhí)行模式、批執(zhí)行模式和自動(dòng)模式。
流執(zhí)行模式(Streaming)
這是DataStream API最經(jīng)典的模式,一般用于需要持續(xù)實(shí)時(shí)處理的無界數(shù)據(jù)流。默認(rèn)情況下,程序使用的就是Streaming執(zhí)行模式。
// 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
批執(zhí)行模式(Batch)
專門用于批處理(處理有界數(shù)據(jù))的執(zhí)行模式。
自動(dòng)模式(AutoMatic)
在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來自動(dòng)選擇執(zhí)行模式。
批執(zhí)行模式的使用。主要有兩種方式:
(1)通過命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
bin/flink run -Dexecution.runtime-mode=BATCH -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
在通過命令行提交作業(yè)時(shí),增加execution.runtime-mode參數(shù),指定值為BATCH。
(2)通過代碼配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代碼中,直接基于執(zhí)行環(huán)境調(diào)用setRuntimeMode方法,傳入BATCH模式。
實(shí)際應(yīng)用中一般不會(huì)在代碼中配置,而是使用命令行,這樣更加靈活。
2、數(shù)據(jù)源(源算子)
flink處理的數(shù)據(jù)來源,可以從各種數(shù)據(jù)源獲取數(shù)據(jù),然后構(gòu)建DataStream來轉(zhuǎn)換數(shù)據(jù),最后將結(jié)果寫入保存。
從集合中讀取數(shù)據(jù)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("=========");
List
DataStreamSource
source.print();
env.execute();
從文件中讀取數(shù)據(jù)
public class MyFileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/**
* 文件數(shù)據(jù)源
* 文件可以來自本地系統(tǒng),還可以從HDFS目錄下讀取,使用路徑hdfs://...
*/
FileSource
FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();
DataStreamSource
fileDataStream.print();
env.execute();
}
}
從socket中讀取數(shù)據(jù)
不論從集合還是文件,我們讀取的其實(shí)都是有界數(shù)據(jù)。在流處理的場景中,數(shù)據(jù)往往是無界的。
我們之前用到的讀取socket文本流,就是流處理場景。但是這種方式由于吞吐量小、穩(wěn)定性較差,一般也是用于測試。
DataStream
從Kafka讀取數(shù)據(jù)
flink可以和kafka配合構(gòu)建實(shí)時(shí)數(shù)據(jù)處理和流式數(shù)據(jù)計(jì)算的架構(gòu)。
生產(chǎn)者將數(shù)據(jù)源中的數(shù)據(jù)發(fā)送到kafka中的主題topic中,然后flink連接到kafka取topic中的數(shù)據(jù)來處理,然后將處理結(jié)果輸出到外部系統(tǒng)比如mysql中。
我有個(gè)疑問,為啥flink不直接連接數(shù)據(jù)源,直接從數(shù)據(jù)源中取數(shù)據(jù)處理,用kafka作為中間件的好處?
第一個(gè)解耦:如果flink直接連接數(shù)據(jù)源取數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)源和flink處理程序的耦合性太高,而引入kafka作為中間件,數(shù)據(jù)源只要將數(shù)據(jù)發(fā)給kafka就好,而無需知道數(shù)據(jù)具體用來干嘛,這樣數(shù)據(jù)源和flink處理程序的耦合性就降低了,可以更靈活的擴(kuò)展和維護(hù)程序。
第二個(gè)容錯(cuò)可靠性: Kafka 允許數(shù)據(jù)源將實(shí)時(shí)數(shù)據(jù)發(fā)送到主題,并在其中持久化。這樣,即使 Flink 應(yīng)用程序暫時(shí)不可用或者出故障了,數(shù)據(jù)仍然保存在 Kafka 中,避免了數(shù)據(jù)丟失,flink恢復(fù)后可以繼續(xù)消費(fèi)數(shù)據(jù)。
第三個(gè)水平擴(kuò)展:Kafka 允許水平擴(kuò)展,使其能夠處理大量的數(shù)據(jù),可以增加分區(qū),將數(shù)據(jù)分發(fā)到不同的節(jié)點(diǎn)上。flink可以增加任務(wù)并行度來擴(kuò)展處理能力。
kafka的優(yōu)點(diǎn)和缺點(diǎn):
高吞吐量:Kafka 可以處理非常高的吞吐量,每秒數(shù)百萬的消息。這使得它適用于需要大規(guī)模數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)流的場景。可擴(kuò)展性:Kafka 允許將數(shù)據(jù)分布式地存儲(chǔ)和處理在多個(gè)節(jié)點(diǎn)上。它支持在集群中動(dòng)態(tài)地添加和刪除節(jié)點(diǎn),以適應(yīng)不同的負(fù)載需求。持久性:Kafka 將消息持久化到磁盤,因此即使消費(fèi)者離線,也不會(huì)丟失數(shù)據(jù)。它可以作為可靠的數(shù)據(jù)存儲(chǔ)和消息傳遞系統(tǒng)。實(shí)時(shí)處理:Kafka 提供了低延遲的消息傳遞,使得實(shí)時(shí)數(shù)據(jù)流處理成為可能。它支持發(fā)布-訂閱模型和流處理模型,方便實(shí)時(shí)數(shù)據(jù)分析和處理。
盡管 Kafka 具有許多優(yōu)點(diǎn),但也有一些潛在的缺點(diǎn):
復(fù)雜性:Kafka 是一個(gè)分布式系統(tǒng),需要配置和管理多個(gè)節(jié)點(diǎn)。這對于初學(xué)者來說可能有一定的學(xué)習(xí)曲線,而且需要更復(fù)雜的運(yùn)維。存儲(chǔ)需求:由于 Kafka 持久化消息到磁盤,因此它需要占用較多的存儲(chǔ)空間。這可能導(dǎo)致存儲(chǔ)成本的增加。配置和監(jiān)控:為了確保 Kafka 集群的正常運(yùn)行,需要進(jìn)行適當(dāng)?shù)呐渲煤捅O(jiān)控。這需要一些管理和維護(hù)工作,以確保高可用性和性能。
總之,Kafka 是一個(gè)強(qiáng)大的分布式流處理平臺(tái),具有高吞吐量、可靠性、實(shí)時(shí)處理和可擴(kuò)展性等優(yōu)點(diǎn)。它在處理大規(guī)模實(shí)時(shí)數(shù)據(jù)流方面表現(xiàn)出色,但也需要考慮到復(fù)雜性和存儲(chǔ)需求等潛在的缺點(diǎn)。
flink的kafka消費(fèi)者:
Flink官方提供了連接工具flink-connector-kafka,直接幫我們實(shí)現(xiàn)了一個(gè)消費(fèi)者FlinkKafkaConsumer,它就是用來讀取Kafka數(shù)據(jù)的SourceFunction。
所以想要以Kafka作為數(shù)據(jù)源獲取數(shù)據(jù),我們只需要引入Kafka連接器的依賴。Flink官方提供的是一個(gè)通用的Kafka連接器,它會(huì)自動(dòng)跟蹤最新版本的Kafka客戶端。目前最新版本只支持0.10.0版本以上的Kafka。這里我們需要導(dǎo)入的依賴如下。
代碼如下:
public class MyKafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource
.setBootstrapServers("192.168.10.102:9092")
.setTopics("fi")
.setGroupId("flink-consumer")
.setValueOnlyDeserializer(new SimpleStringSchema()) // 反序列器,只反序列值
// 可以設(shè)置key 和 value的反序列器
// .setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
// .setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
// flink消費(fèi)kafka的策略: earliest從頭開始消費(fèi) latest消費(fèi)最新數(shù)據(jù)
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource
kafkaDataStreamSource.print("kafka");
env.execute();
}
}
3、轉(zhuǎn)換算子
從數(shù)據(jù)源讀入數(shù)據(jù)之后,我們就可以使用各種轉(zhuǎn)換算子,將數(shù)據(jù)分析、計(jì)算、處理成我們想要的樣子。
基本轉(zhuǎn)換算子
(map/?filter/?flatMap)?
// map映射算子 來一條數(shù)據(jù)處理產(chǎn)出一條數(shù)據(jù),這里來一條String,處理成一個(gè)person
SingleOutputStreamOperator
String[] split = value.split(",");
Person person = null;
try {
person = new Person(Integer.parseInt(split[0]), Integer.parseInt(split[1]), split[2]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
return person;
});
分區(qū)算子
對于Flink而言,DataStream是沒有直接進(jìn)行聚合的API的。因?yàn)槲覀儗A繑?shù)據(jù)做聚合肯定要進(jìn)行分區(qū)并行處理,這樣才能提高效率。所以在Flink中,要做聚合,需要先進(jìn)行分區(qū);這個(gè)操作就是通過keyBy來完成的。
keyBy是聚合前必須要用到的一個(gè)算子。keyBy通過指定鍵(key),可以將一條流從邏輯上劃分成不同的分區(qū)(partitions)。這里所說的分區(qū),其實(shí)就是并行處理的子任務(wù)。
基于不同的key,流中的數(shù)據(jù)將被分配到不同的分區(qū)中去;這樣一來,所有具有相同的key的數(shù)據(jù),都將被發(fā)往同一個(gè)分區(qū)。
public class TransKeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);
// 根據(jù)id進(jìn)行分區(qū)
// 方式一:使用Lambda表達(dá)式
KeyedStream
// 方式二:使用匿名類實(shí)現(xiàn)KeySelector
KeyedStream
@Override
public String getKey(WaterSensor e) throws Exception {
return e.id;
}
});
env.execute();
}
}
簡單聚合
4、輸出算子sink
flink作為數(shù)據(jù)的處理框架,支持將最后的分析、計(jì)算、處理結(jié)果輸出到外部系統(tǒng),供其他使用。Flink程序中所有對外的輸出操作,一般都是利用Sink算子完成的。
輸出到文件
public class SinkFile {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每個(gè)目錄中,都有 并行度個(gè)數(shù)的 文件在寫入 。并行度多少,就會(huì)有多少個(gè)文件在同時(shí)寫入。
// env.setParallelism(2);
// 必須開啟checkpoint,否則后綴一直都是 .inprogress
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
// 數(shù)據(jù)生成器DataGeneratorSource
DataGeneratorSource
new GeneratorFunction
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1000),
Types.STRING
);
DataStreamSource
env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
// 輸出到文件系統(tǒng)
FileSink
// 輸出行式存儲(chǔ)的文件,指定路徑、指定編碼
.
// 輸出文件的一些配置: 文件名的前綴、后綴
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("lin-")
.withPartSuffix(".log")
.build()
)
// 按照目錄分桶:如下,就是每個(gè)小時(shí)一個(gè)目錄
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滾動(dòng)策略: 1分鐘 或 1m
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withMaxPartSize(new MemorySize(1024*1024))
.build()
)
.build();
// dataGen.print();
dataGen.sinkTo(fieSink).setParallelism(3);
env.execute();
}
}
輸出到MySQL
public class SinkSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
// map映射算子 來一條數(shù)據(jù)處理產(chǎn)出一條數(shù)據(jù),這里來一條String,處理成一個(gè)person
SingleOutputStreamOperator
String[] split = value.split(",");
Person person = null;
try {
person = new Person(Integer.parseInt(split[0]), Integer.parseInt(split[1]), split[2]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
return person;
});
/**
* JdbcSink.sink(
* sqlDmlStatement, // mandatory
* jdbcStatementBuilder, // mandatory
* jdbcExecutionOptions, // optional
* jdbcConnectionOptions // mandatory
* );
*
* 第一個(gè)參數(shù): 執(zhí)行的sql,一般就是 insert into
* 第二個(gè)參數(shù): 預(yù)編譯sql, 對占位符填充值
* 第三個(gè)參數(shù): 執(zhí)行選項(xiàng) ---》 攢批、重試
* 第四個(gè)參數(shù): 連接選項(xiàng) ---》 url、用戶名、密碼
*/
SinkFunction
"insert into person values(?,?,?)",
new JdbcStatementBuilder
@Override
public void accept(PreparedStatement preparedStatement, Person o) throws SQLException {
// 每收到來自datastream的一條數(shù)據(jù),如何處理
if(o == null) return;
preparedStatement.setInt(1, o.getId());
preparedStatement.setInt(2, o.getAge());
preparedStatement.setString(3, o.getName());
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3) // 重試次數(shù)
.withBatchSize(100) // 批次的大?。簵l數(shù)
.withBatchIntervalMs(3000) // 批次的時(shí)間
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://192.168.31.47:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("123456")
.withConnectionCheckTimeoutSeconds(60) // 重試之間的超時(shí)時(shí)間,jdbc連接超時(shí)會(huì)重試,重試超過60s沒超過就超時(shí)異常。
.build()
);
// 指定數(shù)據(jù)流處理完流向哪個(gè)sink
processedDataStream.addSink(mysqlSink);
env.execute();
}
}
flink窗口
flink一般用于流式數(shù)據(jù)的處理,而流式數(shù)據(jù)就是說數(shù)據(jù)會(huì)源源不斷的進(jìn)來,然后來一條數(shù)據(jù)處理一條,但是這樣子處理不是非常高效,一般可以將數(shù)據(jù)攢一批,一批一批處理更加高效,將無限的數(shù)據(jù)切成一個(gè)一個(gè)數(shù)據(jù)塊進(jìn)行處理。flink中就是用一個(gè)一個(gè)窗口根據(jù)一定范圍將數(shù)據(jù)切割成一個(gè)個(gè)數(shù)據(jù)塊。
水桶接水:有一個(gè)水龍頭一直在流水,拿水桶去接,可以等待一段時(shí)間接一桶,然后換個(gè)桶繼續(xù)接,或者達(dá)到一定水量換一桶,這里桶就可以理解為窗口,水就是數(shù)據(jù)。
1、窗口分類
時(shí)間窗口
達(dá)到一定時(shí)間劃分一個(gè)窗口
計(jì)數(shù)窗口
達(dá)到一定數(shù)據(jù)量劃分一個(gè)窗口
窗口的具體實(shí)現(xiàn)可以分為4類:滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)、會(huì)話窗口(Session Window),以及全局窗口(Global Window)
滾動(dòng)窗口
根據(jù)時(shí)間或者數(shù)據(jù)個(gè)數(shù)進(jìn)行滾動(dòng),當(dāng)達(dá)到指定時(shí)間或者數(shù)據(jù)個(gè)數(shù)就滾動(dòng)生成一個(gè)窗口,然后對窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算處理。比如:計(jì)算每個(gè)小時(shí)的訂單數(shù)。
滑動(dòng)窗口
例如,我們定義一個(gè)長度為 1 小時(shí)、滑動(dòng)步長為 5 分鐘的滑動(dòng)窗口,那么就會(huì)統(tǒng)計(jì) 1 小時(shí)內(nèi)的數(shù)據(jù),每 5 分鐘統(tǒng)計(jì)一次。同樣,滑動(dòng)窗口可以基于時(shí)間定義,也可以基于數(shù)據(jù)個(gè)數(shù)定義。
比如:統(tǒng)計(jì)最近一個(gè)小時(shí)訂單數(shù),每5分鐘輸出一次,那就可以定義一個(gè)長度為 1 小時(shí)、滑動(dòng)步長為 5 分鐘的滑動(dòng)窗口。
會(huì)話窗口
數(shù)據(jù)相鄰時(shí)間超過指定時(shí)間,就會(huì)另起一個(gè)窗口來裝數(shù)據(jù)。
全局窗口
需要自定義觸發(fā)器,觸發(fā)窗口結(jié)束開始計(jì)算。
代碼定義
// 基于時(shí)間的滾動(dòng)窗口 窗口大小30s 30s新開一個(gè)窗口
AllWindowedStream
// 基于時(shí)間的滑動(dòng)窗口 窗口大小是30s 滑動(dòng)步長5s(5s計(jì)算一次)
AllWindowedStream
// 基于時(shí)間的會(huì)話窗口 會(huì)話時(shí)長30s 30s沒數(shù)據(jù)來就新開一個(gè)窗口
AllWindowedStream
// 全局窗口需要配合自定義觸發(fā)器
mapData.windowAll(GlobalWindows.create());
2、窗口函數(shù)
增量聚合函數(shù)
數(shù)據(jù)來一條處理一條,數(shù)據(jù)來一條就和上一條數(shù)據(jù)執(zhí)行增量聚合函數(shù),然后在窗口大小結(jié)束后輸出計(jì)算結(jié)果,因?yàn)楫吘筬link還是流式的處理,所以這邊就是我還沒到窗口大小,但是我一來數(shù)據(jù)我就處理,然后在窗口大小結(jié)束后輸出結(jié)果,否則你在窗口大小結(jié)束在計(jì)算窗口中的數(shù)據(jù)顯然效率不好。
歸約函數(shù)(ReduceFunction)
來一條數(shù)據(jù)就和上一條數(shù)據(jù)執(zhí)行歸約函數(shù),然后窗口大小結(jié)束輸出一次計(jì)算結(jié)果。
/**
* 輸入的數(shù)據(jù):
* id age name
* 1,1,1
* 1,1,2
* 1,1,3
* 1,1,4
* 1,1,5
* 1,1,6
* 1,1,7
* 1,18,8
*
* 輸出:
* reduce函數(shù)執(zhí)行了1--2
* reduce函數(shù)執(zhí)行了2--3
* reduce函數(shù)執(zhí)行了3--4
* reduce函數(shù)執(zhí)行了4--5
* reduce函數(shù)執(zhí)行了5--6
* 11> Person{id=1, age=6, name='6'} 30s窗口計(jì)算一次輸出
* reduce函數(shù)執(zhí)行了7--8
* 12> Person{id=1, age=19, name='8'} 然后就新開一個(gè)窗口也是30s計(jì)算輸出,看前面的12>就知道不是同一個(gè)線程
*/
SingleOutputStreamOperator
-> {
System.out.println("reduce函數(shù)執(zhí)行了" + value1.getName() + "--" + value2.getName());
return new Person(value2.getId(), value1.getAge() + value2.getAge(), value2.getName());
});
reducedData.print();
env.execute();
聚合函數(shù)(AggregateFunction)
ReduceFunction可以解決大多數(shù)歸約聚合的問題,但是這個(gè)接口有一個(gè)限制,就是聚合狀態(tài)的類型、輸出結(jié)果的類型都必須和輸入數(shù)據(jù)類型一樣。
Flink Window API中的aggregate就突破了這個(gè)限制,可以定義更加靈活的窗口聚合操作。這個(gè)方法需要傳入一個(gè)AggregateFunction的實(shí)現(xiàn)類作為參數(shù)。
全窗口函數(shù)
有些場景下,我們要做的計(jì)算必須基于全部的數(shù)據(jù)才有效,這時(shí)做增量聚合就沒什么意義了。 全窗口函數(shù)是數(shù)據(jù)來了先不計(jì)算,在內(nèi)部緩存起來,等到窗口大小結(jié)束要輸出結(jié)果的時(shí)候再取出數(shù)據(jù)進(jìn)行計(jì)算。
/**
* 全窗口函數(shù)
*/
public class AllWindowFunc {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 源數(shù)據(jù)流
DataStreamSource
// map過的數(shù)據(jù)流
SingleOutputStreamOperator
String[] split = value.split(",");
try {
return new Person(Integer.parseInt(split[0]), Integer.parseInt(split[1]), split[2]);
} catch (NumberFormatException e) {
e.printStackTrace();
System.out.println("發(fā)生異常");
return null;
}
});
// 基于時(shí)間的滾動(dòng)窗口
AllWindowedStream
/**
* ProcessAllWindowFunction
*
*
*
*/
SingleOutputStreamOperator
/**
*
* @param context 上下文對象,可以獲取窗口對象,側(cè)輸出等等
* @param elements 窗口收集的數(shù)據(jù)
* @param out 收集器,收集器里面就是這個(gè)窗口的計(jì)算處理結(jié)果
* @throws Exception
*/
@Override
public void process(Context context, Iterable
System.out.println("開始計(jì)算數(shù)據(jù)");
TimeWindow window = context.window();
String winStartTime = MyUtil.parseLongTime(window.getStart(), "yyyy-MM-dd HH:mm:ss");
String winEndTime = MyUtil.parseLongTime(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
out.collect("窗口開始時(shí)間:" + winStartTime + ",窗口結(jié)束時(shí)間:" + winEndTime + ",窗口數(shù)據(jù)有:" + elements.toString());
}
});
/**
* 輸入的數(shù)據(jù):
* id age name
* 1,1,1lin
* 1,1,2lin 30s
*
* 1,1,3lin
* 1,1,4lin
* 1,1,5lin
* 1,1,6lin 30s
*
* 輸出的數(shù)據(jù):
* 開始計(jì)算數(shù)據(jù)
* 12> 窗口開始時(shí)間:2023-12-15 11:00:00,窗口結(jié)束時(shí)間:2023-12-15 11:00:30,窗口數(shù)據(jù)有:[Person{id=1, age=1, name='1lin'}, Person{id=1, age=1, name='2lin'}]
* 開始計(jì)算數(shù)據(jù)
* 1> 窗口開始時(shí)間:2023-12-15 11:00:30,窗口結(jié)束時(shí)間:2023-12-15 11:01:00,窗口數(shù)據(jù)有:[Person{id=1, age=1, name='3lin'}, Person{id=1, age=1, name='4lin'}, Person{id=1, age=1, name='5lin'}, Person{id=1, age=1, name='6lin'}]
*/
allWData.print();
env.execute();
}
}
觸發(fā)器
觸發(fā)器主要是用來控制窗口什么時(shí)候觸發(fā)計(jì)算。所謂的“觸發(fā)計(jì)算”,本質(zhì)上就是執(zhí)行窗口函數(shù),所以可以認(rèn)為是計(jì)算得到結(jié)果并輸出的過程。
基于WindowedStream調(diào)用.trigger()方法,就可以傳入一個(gè)自定義的窗口觸發(fā)器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
移除器
移除器主要用來定義移除某些數(shù)據(jù)的邏輯?;赪indowedStream調(diào)用.evictor()方法,就可以傳入一個(gè)自定義的移除器(Evictor)。Evictor是一個(gè)接口,不同的窗口類型都有各自預(yù)實(shí)現(xiàn)的移除器。
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
flink時(shí)間
一些報(bào)錯(cuò)
提交任務(wù)后報(bào)錯(cuò):
2023-12-12 10:21:35,243 WARN ?akka.remote.ReliableDeliverySupervisor ? ? ? ? ? ? ? ? ? ? ? [] - Association with remote system [akka.tcp://flink-metrics@hadoop102:46371] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@hadoop102:46371]] Caused by: [java.net.ConnectException: 拒絕連接: hadoop102/192.168.10.102:46371]
因?yàn)閒link程序中通過socket連接到9999端口,但是這個(gè)服務(wù)沒開,就導(dǎo)致連接失敗。通過nc -lk 9999?這將使 netcat 在本地監(jiān)聽端口 9999,并保持連接??梢酝ㄟ^在另一個(gè)終端窗口中使用 netcat 或其他工具建立到該端口的連接,實(shí)現(xiàn)簡單的網(wǎng)絡(luò)通信。?
編碼實(shí)操
// 1. 創(chuàng)建流式執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
在Apache Flink的默認(rèn)配置下,當(dāng)你調(diào)用StreamExecutionEnvironment.getExecutionEnvironment()時(shí),它會(huì)自動(dòng)使用本地環(huán)境進(jìn)行執(zhí)行,也就是說會(huì)在本地啟動(dòng)一個(gè)Flink集群,使用默認(rèn)配置。
這種方式對于本地開發(fā)和測試是非常方便的,因?yàn)樗鼰o需額外的配置,你可以立即開始編寫和運(yùn)行Flink程序。但是在生產(chǎn)環(huán)境中,你通常需要配置Flink集群的地址和端口等參數(shù),以便連接到真實(shí)的Flink集群。
柚子快報(bào)邀請碼778899分享:大數(shù)據(jù)之-Flink學(xué)習(xí)筆記
好文鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。