欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL

http://yzkb.51969.com/

文章目錄

一、Flink SQL1、sql-client準(zhǔn)備1.1 基于yarn-session模式1.2 常用配置

2、流處理中的表2.1 動態(tài)表和持續(xù)查詢2.2 將流轉(zhuǎn)換成動態(tài)表2.3 用SQL持續(xù)查詢2.4 將動態(tài)表轉(zhuǎn)換為流

3、時間屬性3.1 事件時間3.2 處理時間

4、DDL(Data Definition Language)數(shù)據(jù)定義4.1 數(shù)據(jù)庫4.2 表

5、查詢5.1 DataGen & Print5.2 With子句5.3 SELECT & WHERE 子句5.4 分組聚合5.5 分組窗口聚合5.6 窗口表值函數(shù)(TVF)聚合5.7 Over 聚合5.8 特殊語法 —— TOP-N5.9 特殊語法 —— Deduplication去重5.10 聯(lián)結(jié)(Join)查詢常規(guī)聯(lián)結(jié)查詢間隔聯(lián)結(jié)查詢維表聯(lián)結(jié)查詢

5.11 Order by 和 limit5.12 SQL Hints5.13 集合操作5.14 系統(tǒng)函數(shù)標(biāo)量函數(shù)(Scalar Functions)聚合函數(shù)(Aggregate Functions)

5.15 Module操作

6、常用Connector讀寫6.1 Kafka6.2 File6.3 JDBC(MySQL)

7、sql-client 中使用 savepoint8、Catalog8.1 Catalog類型8.2 JdbcCatalog(MySQL)8.3 HiveCatalog

9、代碼中使用FlinkSQL9.1 環(huán)境準(zhǔn)備9.2 創(chuàng)建表環(huán)境9.3 創(chuàng)建表9.4 表的查詢9.5 輸出表9.6 表和流的轉(zhuǎn)換將流(DataStream)轉(zhuǎn)換成表(Table)將表(Table)轉(zhuǎn)換成流(DataStream)支持的數(shù)據(jù)類型綜合應(yīng)用示例

9.7 自定義函數(shù)(UDF)整體調(diào)用流程標(biāo)量函數(shù)(Scalar Functions)表函數(shù)(Table Functions)聚合函數(shù)(Aggregate Functions)表聚合函數(shù)(Table Aggregate Functions)

一、Flink SQL

Table API和SQL是最上層的API,在Flink中這兩種API被集成在一起,SQL執(zhí)行的對象也是Flink中的表(Table),所以我們一般會認為它們是一體的。Flink是批流統(tǒng)一的處理框架,無論是批處理(DataSet API)還是流處理(DataStream API),在上層應(yīng)用中都可以直接使用Table API或者SQL來實現(xiàn);這兩種API對于一張表執(zhí)行相同的查詢操作,得到的結(jié)果是完全一樣的。我們主要還是以流處理應(yīng)用為例進行講解。

SQL API 是基于 SQL 標(biāo)準(zhǔn)的 Apache Calcite 框架實現(xiàn)的,可通過純 SQL 來開發(fā)和運行一個Flink 任務(wù)

1、sql-client準(zhǔn)備

1.1 基于yarn-session模式

# 啟動Flink

/opt/module/flink/bin/yarn-session.sh -d

# 啟動Flink的sql-client

/opt/module/flink/bin/sql-client.sh embedded -s yarn-session

1.2 常用配置

# 結(jié)果顯示模式

# 默認table,還可以設(shè)置為tableau、changelog

SET sql-client.execution.result-mode=tableau;

# 執(zhí)行環(huán)境

SET execution.runtime-mode=streaming; #默認streaming,也可以設(shè)置batch

# 默認并行度

SET parallelism.default=1;

# 設(shè)置狀態(tài)TTL

SET table.exec.state.ttl=1000;

# 通過sql文件初始化

vim conf/sql-client-init.sql

SET sql-client.execution.result-mode=tableau;

CREATE DATABASE mydatabase;

# 啟動時,指定sql文件

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql

2、流處理中的表

關(guān)系型表/SQL流處理處理的數(shù)據(jù)對象字段元組的有界集合字段元組的無限序列查詢(Query)對數(shù)據(jù)的訪問可以訪問到完整的數(shù)據(jù)輸入無法訪問到所有數(shù)據(jù),必須“持續(xù)”等待流式輸入查詢終止條件生成固定大小的結(jié)果集后終止永不停止,根據(jù)持續(xù)收到的數(shù)據(jù)不斷更新查詢結(jié)果

2.1 動態(tài)表和持續(xù)查詢

動態(tài)表(Dynamic Tables)

當(dāng)流中有新數(shù)據(jù)到來,初始的表中會插入一行;而基于這個表定義的SQL查詢,就應(yīng)該在之前的基礎(chǔ)上更新結(jié)果。這樣得到的表就會不斷地動態(tài)變化,被稱為“動態(tài)表”(Dynamic Tables)

動態(tài)表是Flink在Table API和SQL中的核心概念,它為流數(shù)據(jù)處理提供了表和SQL支持。我們所熟悉的表一般用來做批處理,面向的是固定的數(shù)據(jù)集,可以認為是“靜態(tài)表”;而動態(tài)表則完全不同,它里面的數(shù)據(jù)會隨時間變化

持續(xù)查詢(Continuous Query)

動態(tài)表可以像靜態(tài)的批處理表一樣進行查詢操作。由于數(shù)據(jù)在不斷變化,因此基于它定義的SQL查詢也不可能執(zhí)行一次就得到最終結(jié)果。這樣一來,我們對動態(tài)表的查詢也就永遠不會停止,一直在隨著新數(shù)據(jù)的到來而繼續(xù)執(zhí)行。這樣的查詢就被稱作“持續(xù)查詢”(Continuous Query)。對動態(tài)表定義的查詢操作,都是持續(xù)查詢;而持續(xù)查詢的結(jié)果也會是一個動態(tài)表。

由于每次數(shù)據(jù)到來都會觸發(fā)查詢操作,因此可以認為一次查詢面對的數(shù)據(jù)集,就是當(dāng)前輸入動態(tài)表中收到的所有數(shù)據(jù)。這相當(dāng)于是對輸入動態(tài)表做了一個“快照”(snapshot),當(dāng)作有限數(shù)據(jù)集進行批處理;流式數(shù)據(jù)的到來會觸發(fā)連續(xù)不斷的快照查詢,像動畫一樣連貫起來,就構(gòu)成了“持續(xù)查詢”

持續(xù)查詢的步驟如下:

流(stream)被轉(zhuǎn)換為動態(tài)表(dynamic table);對動態(tài)表進行持續(xù)查詢(continuous query),生成新的動態(tài)表;生成的動態(tài)表被轉(zhuǎn)換成流

2.2 將流轉(zhuǎn)換成動態(tài)表

如果把流看作一張表,那么流中每個數(shù)據(jù)的到來,都應(yīng)該看作是對表的一次插入(Insert)操作,會在表的末尾添加一行數(shù)據(jù)。因為流是連續(xù)不斷的,而且之前的輸出結(jié)果無法改變、只能在后面追加;所以我們其實是通過一個只有插入操作(insert-only)的更新日志(changelog)流,來構(gòu)建一個表。

2.3 用SQL持續(xù)查詢

更新(Update)查詢

// 我們在代碼中定義了一個SQL查詢

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

當(dāng)原始動態(tài)表不停地插入新的數(shù)據(jù)時,查詢得到的urlCountTable會持續(xù)地進行更改。由于count數(shù)量可能會疊加增長,因此這里的更改操作可以是簡單的插入(Insert),也可以是對之前數(shù)據(jù)的更新(Update)。這種持續(xù)查詢被稱為更新查詢(Update Query),更新查詢得到的結(jié)果表如果想要轉(zhuǎn)換成DataStream,必須調(diào)用toChangelogStream()方法

追加(Append)查詢

// 如果我們執(zhí)行一個簡單的條件查詢,結(jié)果表中就會像原始表EventTable一樣,只有插入(Insert)操作了

Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");

// 這樣的持續(xù)查詢,就被稱為追加查詢(Append Query),它定義的結(jié)果表的更新日志(changelog)流中只有INSERT操作。

由于窗口的統(tǒng)計結(jié)果是一次性寫入結(jié)果表的,所以結(jié)果表的更新日志流中只會包含插入INSERT操作,而沒有更新UPDATE操作。所以這里的持續(xù)查詢,依然是一個追加(Append)查詢。結(jié)果表result如果轉(zhuǎn)換成DataStream,可以直接調(diào)用toDataStream()方法。

2.4 將動態(tài)表轉(zhuǎn)換為流

與關(guān)系型數(shù)據(jù)庫中的表一樣,動態(tài)表也可以通過插入(Insert)、更新(Update)和刪除(Delete)操作,進行持續(xù)的更改。將動態(tài)表轉(zhuǎn)換為流或?qū)⑵鋵懭胪獠肯到y(tǒng)時,就需要對這些更改操作進行編碼,通過發(fā)送編碼消息的方式告訴外部系統(tǒng)要執(zhí)行的操作。在Flink中,Table API和SQL支持三種編碼方式:

僅追加(Append-only)流:僅通過插入(Insert)更改來修改的動態(tài)表,可以直接轉(zhuǎn)換為“僅追加”流。這個流中發(fā)出的數(shù)據(jù),其實就是動態(tài)表中新增的每一行撤回(Retract)流:撤回流是包含兩類消息的流,添加(add)消息和撤回(retract)消息。具體的編碼規(guī)則是:INSERT插入操作編碼為add消息;DELETE刪除操作編碼為retract消息;而UPDATE更新操作則編碼為被更改行的retract消息,和更新后行(新行)的add消息。這樣,我們可以通過編碼后的消息指明所有的增刪改操作,一個動態(tài)表就可以轉(zhuǎn)換為撤回流了。

更新插入(Upsert)流:更新插入流中只包含兩種類型的消息:更新插入(upsert)消息和刪除(delete)消息。所謂的“upsert”其實是“update”和“insert”的合成詞,所以對于更新插入流來說,INSERT插入操作和UPDATE更新操作,統(tǒng)一被編碼為upsert消息;而DELETE刪除操作則被編碼為delete消息

在代碼里將動態(tài)表轉(zhuǎn)換為DataStream時,只支持僅追加(append-only)和撤回(retract)流,我們調(diào)用toChangelogStream()得到的其實就是撤回流。而連接到外部系統(tǒng)時,則可以支持不同的編碼方法,這取決于外部系統(tǒng)本身的特性

3、時間屬性

基于時間的操作(比如時間窗口),需要定義相關(guān)的時間語義和時間數(shù)據(jù)來源的信息。在Table API和SQL中,會給表單獨提供一個邏輯上的時間字段,專門用來在表處理程序中指示時間。

所以所謂的時間屬性(time attributes),其實就是每個表模式結(jié)構(gòu)(schema)的一部分。它可以在創(chuàng)建表的DDL里直接定義為一個字段,也可以在DataStream轉(zhuǎn)換成表時定義。一旦定義了時間屬性,它就可以作為一個普通字段引用,并且可以在基于時間的操作中使用。

時間屬性的數(shù)據(jù)類型必須為TIMESTAMP,它的行為類似于常規(guī)時間戳,可以直接訪問并且進行計算。按照時間語義的不同,可以把時間屬性的定義分成**事件時間(event time)和處理時間(processing time)**兩種情況

3.1 事件時間

## 事件時間屬性可以在創(chuàng)建表DDL中定義,增加一個字段,通過WATERMARK語句來定義事件時間屬性

CREATE TABLE EventTable(

user STRING,

url STRING,

ts TIMESTAMP(3),

WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

) WITH (

...

);

## 這里我們把ts字段定義為事件時間屬性,而且基于ts設(shè)置了5秒的水位線延遲

## 時間戳類型必須是 TIMESTAMP 或者TIMESTAMP_LTZ 類型。

## 但是時間戳一般都是秒或者是毫秒(BIGINT 類型),這種情況可以通過如下方式轉(zhuǎn)換

ts BIGINT,

time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),

3.2 處理時間

在定義處理時間屬性時,必須要額外聲明一個字段,專門用來保存當(dāng)前的處理時間。在創(chuàng)建表的DDL(CREATE TABLE語句)中,可以增加一個額外的字段,通過調(diào)用系統(tǒng)內(nèi)置的PROCTIME()函數(shù)來指定當(dāng)前的處理時間屬性

CREATE TABLE EventTable(

user STRING,

url STRING,

ts AS PROCTIME()

) WITH (

...

);

4、DDL(Data Definition Language)數(shù)據(jù)定義

4.1 數(shù)據(jù)庫

## 創(chuàng)建數(shù)據(jù)庫

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name

[COMMENT database_comment]

WITH (key1=val1, key2=val2, ...)

# 舉例

CREATE DATABASE db_flink;

## 查詢數(shù)據(jù)庫

SHOW DATABASES;

# 查詢當(dāng)前數(shù)據(jù)庫

SHOW CURRENT DATABASE;

# 修改數(shù)據(jù)庫

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

# 刪除數(shù)據(jù)庫

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

# RESTRICT:刪除非空數(shù)據(jù)庫會觸發(fā)異常。默認啟用

# CASCADE:刪除非空數(shù)據(jù)庫也會刪除所有相關(guān)的表和函數(shù)

DROP DATABASE db_flink2;

## 切換當(dāng)前數(shù)據(jù)庫

USE database_name;

4.2 表

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name

(

{ | | }[ , ...n]

[ ]

[ ][ , ...n]

)

[COMMENT table_comment]

[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]

WITH (key1=val1, key2=val2, ...)

[ LIKE source_table [( )] | AS select_query ]

physical_column_definition

物理列是數(shù)據(jù)庫中所說的常規(guī)列。其定義了物理介質(zhì)中存儲的數(shù)據(jù)中字段的名稱、類型和順序。其他類型的列可以在物理列之間聲明,但不會影響最終的物理列的讀取

metadata_column_definition

元數(shù)據(jù)列是 SQL 標(biāo)準(zhǔn)的擴展,允許訪問數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA 關(guān)鍵字標(biāo)識。例如,我們可以使用元數(shù)據(jù)列從Kafka記錄中讀取和寫入時間戳,用于基于時間的操作(這個時間戳不是數(shù)據(jù)中的某個時間戳字段,而是數(shù)據(jù)寫入 Kafka 時,Kafka 引擎給這條數(shù)據(jù)打上的時間戳標(biāo)記)。connector和format文檔列出了每個組件可用的元數(shù)據(jù)字段

CREATE TABLE MyTable (

`user_id` BIGINT,

`name` STRING,

`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'

) WITH (

'connector' = 'kafka'

...

);

## 如果自定義的列名稱和 Connector 中定義 metadata 字段的名稱一樣, FROM xxx 子句可省略

CREATE TABLE MyTable (

`user_id` BIGINT,

`name` STRING,

`timestamp` TIMESTAMP_LTZ(3) METADATA

) WITH (

'connector' = 'kafka'

...

);

## 如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata 字段的數(shù)據(jù)類型不一致,程序運行時會自動 cast強轉(zhuǎn),但是這要求兩種數(shù)據(jù)類型是可以強轉(zhuǎn)的

CREATE TABLE MyTable (

`user_id` BIGINT,

`name` STRING,

-- 將時間戳強轉(zhuǎn)為 BIGINT

`timestamp` BIGINT METADATA

) WITH (

'connector' = 'kafka'

...

);

## 默認情況下,F(xiàn)link SQL planner 認為 metadata 列可以讀取和寫入。

## 然而,在許多情況下,外部系統(tǒng)提供的只讀元數(shù)據(jù)字段比可寫字段多。因此,可以使用VIRTUAL關(guān)鍵字排除元數(shù)據(jù)列的持久化(表示只讀)

CREATE TABLE MyTable (

`timestamp` BIGINT METADATA,

`offset` BIGINT METADATA VIRTUAL,

`user_id` BIGINT,

`name` STRING,

) WITH (

'connector' = 'kafka'

...

);

computed_column_definition

計算列是使用語法column_name AS computed_column_expression生成的虛擬列。計算列就是拿已有的一些列經(jīng)過一些自定義的運算生成的新列,在物理上并不存儲在表中,只能讀不能寫。列的數(shù)據(jù)類型從給定的表達式自動派生,無需手動聲明。

CREATE TABLE MyTable (

`user_id` BIGINT,

`price` DOUBLE,

`quantity` DOUBLE,

`cost` AS price * quanitity

) WITH (

'connector' = 'kafka'

...

);

定義Watermark

Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:

(1) 嚴(yán)格升序:WATERMARK FOR rowtime_column AS rowtime_column。Flink 任務(wù)認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小于之前的,就認為是遲到的數(shù)據(jù)。

(2) 遞增:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND 。一般基本不用這種方式。如果設(shè)置此類,則允許有相同的時間戳出現(xiàn)。

(3) 有界無序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit 。此類策略就可以用于設(shè)置最大亂序時間,假如設(shè)置為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND ,則生成的是運行 5s 延遲的Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場景中,而對應(yīng)到實際的場景中,數(shù)據(jù)都是會存在亂序的,所以基本都使用此類策略。

PRIMARY KEY

主鍵約束表明表中的一列或一組列是唯一的,并且它們不包含NULL值。主鍵唯一地標(biāo)識表中的一行,只支持 not enforced

CREATE TABLE MyTable (

`user_id` BIGINT,

`name` STRING,

PARYMARY KEY(user_id) not enforced

) WITH (

'connector' = 'kafka'

...

);

PARTITIONED BY

創(chuàng)建分區(qū)表

with語句

用于創(chuàng)建表的表屬性,用于指定外部存儲系統(tǒng)的元數(shù)據(jù)信息。配置屬性時,表達式key1=val1的鍵和值都應(yīng)該是字符串字面值。如下是Kafka的映射表

CREATE TABLE KafkaTable (

`user_id` BIGINT,

`name` STRING,

`ts` TIMESTAMP(3) METADATA FROM 'timestamp'

) WITH (

'connector' = 'kafka',

'topic' = 'user_behavior',

'properties.bootstrap.servers' = 'localhost:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv'

)

一般 with 中的配置項由 Flink SQL 的 Connector(鏈接外部存儲的連接器) 來定義,每種 Connector 提供的with 配置項都是不同的。

LIKE

用于基于現(xiàn)有表的定義創(chuàng)建表。此外,用戶可以擴展原始表或排除表的某些部分??梢允褂迷撟泳渲赜?可能還會覆蓋)某些連接器屬性,或者向外部定義的表添加水印。

CREATE TABLE Orders (

`user` BIGINT,

product STRING,

order_time TIMESTAMP(3)

) WITH (

'connector' = 'kafka',

'scan.startup.mode' = 'earliest-offset'

);

CREATE TABLE Orders_with_watermark (

-- Add watermark definition

WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

) WITH (

-- Overwrite the startup-mode

'scan.startup.mode' = 'latest-offset'

)

LIKE Orders;

AS select_statement(CTAS)

在一個create-table-as-select (CTAS)語句中,還可以通過查詢的結(jié)果創(chuàng)建和填充表。CTAS是使用單個命令創(chuàng)建數(shù)據(jù)并向表中插入數(shù)據(jù)的最簡單、最快速的方法。

CREATE TABLE my_ctas_table

WITH (

'connector' = 'kafka',

...

)

AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

# 注意:CTAS有以下限制:

# 暫不支持創(chuàng)建臨時表。

# 目前還不支持指定顯式列。

# 還不支持指定顯式水印。

# 目前還不支持創(chuàng)建分區(qū)表。

# 目前還不支持指定主鍵約束

## 舉例

CREATE TABLE test(

id INT,

ts BIGINT,

vc INT

) WITH (

'connector' = 'print'

);

CREATE TABLE test1 (

`value` STRING

)

LIKE test;

常用操作

## 查看表

# 查看所有表

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE ]

# 如果沒有指定數(shù)據(jù)庫,則從當(dāng)前數(shù)據(jù)庫返回表。

# LIKE子句中sql pattern的語法與MySQL方言的語法相同:

# %匹配任意數(shù)量的字符,甚至零字符,\%匹配一個'%'字符。

# _只匹配一個字符,\_只匹配一個'_'字符

## 查看表信息

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

## 修改表

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

## 刪除表

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

5、查詢

5.1 DataGen & Print

CREATE TABLE source (

id INT,

ts BIGINT,

vc INT

) WITH (

'connector' = 'datagen',

'rows-per-second'='1',

'fields.id.kind'='random',

'fields.id.min'='1',

'fields.id.max'='10',

'fields.ts.kind'='sequence',

'fields.ts.start'='1',

'fields.ts.end'='1000000',

'fields.vc.kind'='random',

'fields.vc.min'='1',

'fields.vc.max'='100'

);

# 這樣創(chuàng)建的 "source" 表將包含三列數(shù)據(jù):id(隨機整數(shù)在1和10之間)、

# ts(從1遞增到1000000的遞增序列)和vc(隨機整數(shù)在1和100之間)。

# 每秒鐘會生成一行這樣的模擬測試數(shù)據(jù)。您可以使用這個表作為 Flink 程序的輸入源,對數(shù)據(jù)進行處理和分析。

CREATE TABLE sink (

id INT,

ts BIGINT,

vc INT

) WITH (

'connector' = 'print'

);

# 查詢源表

select * from source;

# 插入sink表并查詢,這個需要去flink UI界面查看輸出

INSERT INTO sink select * from source;

select * from sink;

5.2 With子句

WITH提供了一種編寫輔助語句的方法,以便在較大的查詢中使用。這些語句通常被稱為公共表表達式(Common Table Expression, CTE),可以認為它們定義了僅為一個查詢而存在的臨時視圖

WITH [ , ... ]

SELECT ... FROM ...;

:

with_item_name (column_name[, ...n]) AS ( )

## 舉例

WITH source_with_total AS (

SELECT id, vc+10 AS total

FROM source

)

SELECT id, SUM(total)

FROM source_with_total

GROUP BY id;

5.3 SELECT & WHERE 子句

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

# 舉例

SELECT * FROM source;

SELECT id, vc + 10 FROM source;

-- 自定義 Source 的數(shù)據(jù)

SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (id, price);

SELECT vc + 10 FROM source WHERE id >10;

5.4 分組聚合

## group聚合案例

CREATE TABLE source1 (

dim STRING,

user_id BIGINT,

price BIGINT,

row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),

WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'datagen',

'rows-per-second' = '10',

'fields.dim.length' = '1',

'fields.user_id.min' = '1',

'fields.user_id.max' = '100000',

'fields.price.min' = '1',

'fields.price.max' = '100000'

);

CREATE TABLE sink1 (

dim STRING,

pv BIGINT,

sum_price BIGINT,

max_price BIGINT,

min_price BIGINT,

uv BIGINT,

window_start bigint

) WITH (

'connector' = 'print'

);

insert into sink1

select dim,

count(*) as pv,

sum(price) as sum_price,

max(price) as max_price,

min(price) as min_price,

-- 計算 uv 數(shù)

count(distinct user_id) as uv,

cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start

from source1

group by

dim,

-- UNIX_TIMESTAMP得到秒的時間戳,將秒級別時間戳 / 60 轉(zhuǎn)化為 1min,

cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);

## 多維分析

# Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets

SELECT

supplier_id

, rating

, product_id

, COUNT(*)

FROM (

VALUES

('supplier1', 'product1', 4),

('supplier1', 'product2', 3),

('supplier2', 'product3', 3),

('supplier2', 'product4', 4)

)

-- 供應(yīng)商id、產(chǎn)品id、評級

AS Products(supplier_id, product_id, rating)

GROUP BY GROUPING SETS(

(supplier_id, product_id, rating),

(supplier_id, product_id),

(supplier_id, rating),

(supplier_id),

(product_id, rating),

(product_id),

(rating),

()

);

5.5 分組窗口聚合

從1.13版本開始,分組窗口聚合已經(jīng)標(biāo)記為過時,鼓勵使用更強大、更有效的窗口TVF聚合,在這里簡單做個介紹。直接把窗口自身作為分組key放在GROUP BY之后的,所以也叫“分組窗口聚合”。SQL查詢的分組窗口是通過 GROUP BY 子句定義的。類似于使用常規(guī) GROUP BY 語句的查詢,窗口分組語句的 GROUP BY 子句中帶有一個窗口函數(shù)為每個分組計算出一個結(jié)果。SQL中只支持基于時間的窗口,不支持基于元素個數(shù)的窗口

分組窗口函數(shù)描述TUMBLE(time_attr, interval)定義一個滾動窗口。滾動窗口把行分配到有固定持續(xù)時間( interval )的不重疊的連續(xù)窗口。比如,5 分鐘的滾動窗口以 5 分鐘為間隔對行進行分組。滾動窗口可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。HOP(time_attr, interval, interval)定義一個跳躍的時間窗口(在 Table API 中稱為滑動窗口)?;瑒哟翱谟幸粋€固定的持續(xù)時間( 第二個 interval 參數(shù) )以及一個滑動的間隔(第一個 interval 參數(shù) )。若滑動間隔小于窗口的持續(xù)時間,滑動窗口則會出現(xiàn)重疊;因此,行將會被分配到多個窗口中。比如,一個大小為 15 分組的滑動窗口,其滑動間隔為 5 分鐘,將會把每一行數(shù)據(jù)分配到 3 個 15 分鐘的窗口中?;瑒哟翱诳梢远x在事件時間(批處理、流處理)或處理時間(流處理)上。SESSION(time_attr, interval)定義一個會話時間窗口。會話時間窗口沒有一個固定的持續(xù)時間,但是它們的邊界會根據(jù) interval 所定義的不活躍時間所確定;即一個會話時間窗口在定義的間隔時間內(nèi)沒有時間出現(xiàn),該窗口會被關(guān)閉。例如時間窗口的間隔時間是 30 分鐘,當(dāng)其不活躍的時間達到30分鐘后,若觀測到新的記錄,則會啟動一個新的會話時間窗口(否則該行數(shù)據(jù)會被添加到當(dāng)前的窗口),且若在 30 分鐘內(nèi)沒有觀測到新紀(jì)錄,這個窗口將會被關(guān)閉。會話時間窗口可以使用事件時間(批處理、流處理)或處理時間(流處理)。

# 數(shù)據(jù)準(zhǔn)備

CREATE TABLE ws (

id INT,

vc INT,

pt AS PROCTIME(), --處理時間

et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件時間

WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark

) WITH (

'connector' = 'datagen',

'rows-per-second' = '10',

'fields.id.min' = '1',

'fields.id.max' = '3',

'fields.vc.min' = '1',

'fields.vc.max' = '100'

);

# 滾動窗口示例(時間屬性字段,窗口長度)

select

id,

TUMBLE_START(et, INTERVAL '5' SECOND) wstart,

TUMBLE_END(et, INTERVAL '5' SECOND) wend,

sum(vc) sumVc

from ws

group by id, TUMBLE(et, INTERVAL '5' SECOND);

# 滑動窗口(時間屬性字段,滑動步長,窗口長度)

select

id,

HOP_START(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wstart,

HOP_END(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wend,

sum(vc) sumVc

from ws

group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);

# 會話窗口(時間屬性字段,會話間隔)

select

id,

SESSION_START(et, INTERVAL '5' SECOND) wstart,

SESSION_END(et, INTERVAL '5' SECOND) wend,

sum(vc) sumVc

from ws

group by id, SESSION(et, INTERVAL '5' SECOND);

5.6 窗口表值函數(shù)(TVF)聚合

對比GroupWindow,TVF窗口更有效和強大。包括:

提供更多的性能優(yōu)化手段支持GroupingSets語法可以在window聚合中使用TopN提供累積窗口

對于窗口表值函數(shù),窗口本身返回的是就是一個表,所以窗口會出現(xiàn)在FROM后面,GROUP BY后面的則是窗口新增的字段window_start和window_end

FROM TABLE(

窗口類型(TABLE 表名, DESCRIPTOR(時間字段),INTERVAL時間…)

)

GROUP BY [window_start,][window_end,] --可選

## 滾動窗口

SELECT

window_start,

window_end,

id , SUM(vc)

sumVC

FROM TABLE(

TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))

GROUP BY window_start, window_end, id;

## 滑動窗口

# 要求: 窗口長度=滑動步長的整數(shù)倍(底層會優(yōu)化成多個小滾動窗口)

SELECT window_start, window_end, id , SUM(vc) sumVC

FROM TABLE(

HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))

GROUP BY window_start, window_end, id;

## 累積窗口

# 累積窗口會在一定的統(tǒng)計周期內(nèi)進行累積計算。累積窗口中有兩個核心的參數(shù):最大窗口長度(max window size)和累積步長(step)。

# 所謂的最大窗口長度其實就是我們所說的“統(tǒng)計周期”,最終目的就是統(tǒng)計這段時間內(nèi)的數(shù)據(jù)

# 其實就是固定窗口間隔內(nèi)提前觸發(fā)的的滾動窗口 ,其實就是 Tumble Window + early-fire 的一個事件時間的版本。

# 例如,從每日零點到當(dāng)前這一分鐘繪制累積 UV,其中 10:00 時的 UV 表示從 00:00 到 10:00 的 UV 總數(shù)

# 累積窗口可以認為是首先開一個最大窗口大小的滾動窗口,然后根據(jù)用戶設(shè)置的觸發(fā)的時間間隔將這個滾動窗口拆分為多個窗口,這些窗口具有相同的窗口起點和不同的窗口終點

# 注意: 窗口最大長度 = 累積步長的整數(shù)倍

SELECT

window_start,

window_end,

id ,

SUM(vc) sumVC

FROM TABLE(

CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS))

GROUP BY window_start, window_end, id;

## grouping sets多維分析

SELECT

window_start,

window_end,

id ,

SUM(vc) sumVC

FROM TABLE(

TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))

GROUP BY window_start, window_end,

rollup( (id) )

-- cube( (id) )

-- grouping sets( (id),() )

;

5.7 Over 聚合

OVER聚合為一系列有序行的每個輸入行計算一個聚合值。與GROUP BY聚合相比,OVER聚合不會將每個組的結(jié)果行數(shù)減少為一行。相反,OVER聚合為每個輸入行生成一個聚合值??梢栽谑录r間或處理時間,以及指定為時間間隔、或行計數(shù)的范圍內(nèi),定義Over windows

## 語法

SELECT

agg_func(agg_col) OVER (

[PARTITION BY col1[, col2, ...]]

ORDER BY time_col

range_definition),

...

FROM ...

# ORDER BY:必須是時間戳列(事件時間、處理時間),只能升序

# PARTITION BY:標(biāo)識了聚合窗口的聚合粒度

# range_definition:這個標(biāo)識聚合窗口的聚合數(shù)據(jù)范圍,在 Flink 中有兩種指定數(shù)據(jù)范圍的方式。第一種為按照行數(shù)聚合,第二種為按照時間區(qū)間聚合

# 舉例

# 按照時間區(qū)間聚合

# 統(tǒng)計每個傳感器前10秒到現(xiàn)在收到的水位數(shù)據(jù)條數(shù)

SELECT

id,

et,

vc,

count(vc) OVER (

PARTITION BY id

ORDER BY et

RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW

) AS cnt

FROM ws

# 也可以用WINDOW子句來在SELECT外部單獨定義一個OVER窗口,可以多次使用

SELECT

id,

et,

vc,

count(vc) OVER w AS cnt,

sum(vc) OVER w AS sumVC

FROM ws

WINDOW w AS (

PARTITION BY id

ORDER BY et

RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW

)

## 按照行數(shù)聚合

# 統(tǒng)計每個傳感器前5條到現(xiàn)在數(shù)據(jù)的平均水位

SELECT

id,

et,

vc,

avg(vc) OVER (

PARTITION BY id

ORDER BY et

ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

) AS avgVC

FROM ws

# 也可以用WINDOW子句來在SELECT外部單獨定義一個OVER窗口

SELECT

id,

et,

vc,

avg(vc) OVER w AS avgVC,

count(vc) OVER w AS cnt

FROM ws

WINDOW w AS (

PARTITION BY id

ORDER BY et

ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

)

5.8 特殊語法 —— TOP-N

目前在Flink SQL中沒有能夠直接調(diào)用的TOP-N函數(shù),而是提供了稍微復(fù)雜些的變通實現(xiàn)方法,是固定寫法,特殊支持的over用法

## 語法

SELECT [column_list]

FROM (

SELECT [column_list],

ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum

FROM table_name)

WHERE rownum <= N [AND conditions]

# ROW_NUMBER() :標(biāo)識 TopN 排序子句

# PARTITION BY col1[, col2...] :標(biāo)識分區(qū)字段,代表按照這個 col 字段作為分區(qū)粒度對數(shù)據(jù)進行排序取 topN,比如下述案例中的 partition by key ,就是根據(jù)需求中的搜索關(guān)鍵詞(key)做為分區(qū)

# ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :標(biāo)識 TopN 的排序規(guī)則,是按照哪些字段、順序或逆序進行排序,可以不是時間字段,也可以降序(TopN特殊支持)

# WHERE rownum <= N :這個子句是一定需要的,只有加上了這個子句,F(xiàn)link 才能將其識別為一個TopN 的查詢,其中 N 代表 TopN 的條目數(shù)

# [AND conditions] :其他的限制條件也可以加上

# 案例

select

id,

et,

vc,

rownum

from

(

select

id,

et,

vc,

row_number() over(

partition by id

order by vc desc

) as rownum

from ws

)

where rownum<=3;

5.9 特殊語法 —— Deduplication去重

去重,也即上文介紹到的TopN 中** row_number = 1** 的場景,但是這里有一點不一樣在于其排序字段一定是時間屬性列,可以降序,不能是其他非時間屬性的普通列。

在 row_number = 1 時,如果排序字段是普通列 planner 會翻譯成 TopN 算子,如果是時間屬性列 planner 會翻譯成 Deduplication,這兩者最終的執(zhí)行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應(yīng)的優(yōu)化,性能會有很大提升??梢詮膚ebui看出是翻譯成哪種算子。

# 語法

SELECT [column_list]

FROM (

SELECT [column_list],

ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]

ORDER BY time_attr [asc|desc]) AS rownum

FROM table_name)

WHERE rownum = 1

## 案例

# 對每個傳感器的水位值去重

select

id,

et,

vc,

rownum

from

(

select

id,

et,

vc,

row_number() over(

partition by id,vc

order by et

) as rownum

from ws

)

where rownum=1;

5.10 聯(lián)結(jié)(Join)查詢

Flink SQL中的聯(lián)結(jié)查詢大體上也可以分為兩類:SQL原生的聯(lián)結(jié)查詢方式,和流處理中特有的聯(lián)結(jié)查詢

常規(guī)聯(lián)結(jié)查詢

常規(guī)聯(lián)結(jié)(Regular Join)是SQL中原生定義的Join方式,是最通用的一類聯(lián)結(jié)操作。它的具體語法與標(biāo)準(zhǔn)SQL的聯(lián)結(jié)完全相同,通過關(guān)鍵字JOIN來聯(lián)結(jié)兩個表,后面用關(guān)鍵字ON來指明聯(lián)結(jié)條件。與標(biāo)準(zhǔn)SQL一致,F(xiàn)link SQL的常規(guī)聯(lián)結(jié)也可以分為內(nèi)聯(lián)結(jié)(INNER JOIN)和外聯(lián)結(jié)(OUTER JOIN),區(qū)別在于結(jié)果中是否包含不符合聯(lián)結(jié)條件的行。Regular Join 包含以下幾種(以 L 作為左流中的數(shù)據(jù)標(biāo)識, R 作為右流中的數(shù)據(jù)標(biāo)識):

Inner Join(Inner Equal Join):流任務(wù)中,只有兩條流 Join 到才輸出,輸出 +[L, R]Left Join(Outer Equal Join):流任務(wù)中,左流數(shù)據(jù)到達之后,無論有沒有 Join 到右流的數(shù)據(jù),都會輸出(Join 到輸出 +[L, R] ,沒 Join 到輸出 +[L, null] ),如果右流之后數(shù)據(jù)到達之后,發(fā)現(xiàn)左流之前輸出過沒有 Join 到的數(shù)據(jù),則會發(fā)起回撤流,先輸出 -[L, null] ,然后輸出 +[L, R]Right Join(Outer Equal Join):有 Left Join 一樣,左表和右表的執(zhí)行邏輯完全相反Full Join(Outer Equal Join):流任務(wù)中,左流或者右流的數(shù)據(jù)到達之后,無論有沒有 Join 到另外一條流的數(shù)據(jù),都會輸出(對右流來說:Join 到輸出 +[L, R] ,沒 Join 到輸出 +[null, R] ;對左流來說:Join 到輸出 +[L, R] ,沒 Join 到輸出 +[L, null] )。如果一條流的數(shù)據(jù)到達之后,發(fā)現(xiàn)之前另一條流之前輸出過沒有 Join 到的數(shù)據(jù),則會發(fā)起回撤流(左流數(shù)據(jù)到達為例:回撤 -[null, R] ,輸出+[L, R] ,右流數(shù)據(jù)到達為例:回撤 -[L, null] ,輸出 +[L, R]

Regular Join 的注意事項:

實時 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 區(qū)別在于, 等值 join數(shù)據(jù) shuffle 策略是 Hash,會按照 Join on 中的等值條件作為 id 發(fā)往對應(yīng)的下游; 非等值 join 數(shù)據(jù) shuffle 策略是 Global,所有數(shù)據(jù)發(fā)往一個并發(fā),按照非等值條件進行關(guān)聯(lián)流的上游是無限的數(shù)據(jù),所以要做到關(guān)聯(lián)的話,F(xiàn)link 會將兩條流的所有數(shù)據(jù)都存儲在 State 中,所以 Flink 任務(wù)的 State 會無限增大,因此你需要為 State 配置合適的 TTL,以防止 State 過大。

## 再準(zhǔn)備一張表用于join

CREATE TABLE ws1 (

id INT,

vc INT,

pt AS PROCTIME(), --處理時間

et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件時間

WATERMARK FOR et AS et - INTERVAL '0.001' SECOND --watermark

) WITH (

'connector' = 'datagen',

'rows-per-second' = '1',

'fields.id.min' = '3',

'fields.id.max' = '5',

'fields.vc.min' = '1',

'fields.vc.max' = '100'

);

## 等值內(nèi)聯(lián)結(jié)(INNER Equi-JOIN)

# 內(nèi)聯(lián)結(jié)用INNER JOIN來定義,會返回兩表中符合聯(lián)接條件的所有行的組合,也就是所謂的笛卡爾積(Cartesian product)。目前僅支持等值聯(lián)結(jié)條件

SELECT *

FROM ws

INNER JOIN ws1

ON ws.id = ws1.id;

## 等值外聯(lián)結(jié)(OUTER Equi-JOIN)

# Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分別表示會將左側(cè)表、右側(cè)表以及雙側(cè)表中沒有任何匹配的行返回。

SELECT *

FROM ws

LEFT JOIN ws1

ON ws.id = ws1.id;

SELECT *

FROM ws

RIGHT JOIN ws1

ON ws.id = ws1.id;

SELECT *

FROM ws

FULL OUTER JOIN ws1

ON ws.id = ws1.id;

間隔聯(lián)結(jié)查詢

兩條流的Join就對應(yīng)著SQL中兩個表的Join,這是流處理中特有的聯(lián)結(jié)方式。目前Flink SQL還不支持窗口聯(lián)結(jié),而間隔聯(lián)結(jié)則已經(jīng)實現(xiàn)。間隔聯(lián)結(jié)(Interval Join)返回的,同樣是符合約束條件的兩條中數(shù)據(jù)的笛卡爾積。只不過這里的“約束條件”除了常規(guī)的聯(lián)結(jié)條件外,還多了一個時間間隔的限制。具體語法有以下要點:

兩表的聯(lián)結(jié)

間隔聯(lián)結(jié)不需要用JOIN關(guān)鍵字,直接在FROM后將要聯(lián)結(jié)的兩表列出來就可以,用逗號分隔。這與標(biāo)準(zhǔn)SQL中的語法一致,表示一個“交叉聯(lián)結(jié)”(Cross Join),會返回兩表中所有行的笛卡爾積

聯(lián)結(jié)條件

聯(lián)結(jié)條件用WHERE子句來定義,用一個等值表達式描述。交叉聯(lián)結(jié)之后再用WHERE進行條件篩選,效果跟內(nèi)聯(lián)結(jié)INNER JOIN … ON …非常類似

時間間隔限制

我們可以在WHERE子句中,聯(lián)結(jié)條件后用AND追加一個時間間隔的限制條件;做法是提取左右兩側(cè)表中的時間字段,然后用一個表達式來指明兩者需要滿足的間隔限制。具體定義方式有下面三種,這里分別用ltime和rtime表示左右表中的時間字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE

(3)ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

SELECT *

FROM ws,ws1

WHERE ws.id = ws1. id

AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND

維表聯(lián)結(jié)查詢

Lookup Join 其實就是維表 Join,實時獲取外部緩存的 Join,Lookup 的意思就是實時查找。上面說的這幾種 Join 都是流與流之間的 Join,而 Lookup Join 是流與 Redis,Mysql,HBase 這種外部存儲介質(zhì)的 Join。僅支持處理時間字段

表A

JOIN 維度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 別名

ON xx.字段=別名.字段

## 比如維表在mysql,維表join的寫法如下

CREATE TABLE Customers (

id INT,

name STRING,

country STRING,

zip STRING

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://hadoop102:3306/customerdb',

'table-name' = 'customers'

);

-- order表每來一條數(shù)據(jù),都會去mysql的customers表查找維度數(shù)據(jù)

SELECT o.order_id, o.total, c.country, c.zip

FROM Orders AS o

JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c

ON o.customer_id = c.id;

5.11 Order by 和 limit

## 支持 Batch\Streaming,但在實時任務(wù)中一般用的非常少

## 實時任務(wù)中,Order By 子句中必須要有時間屬性字段,并且必須寫在最前面且為升序

SELECT *

FROM ws

ORDER BY et, id desc

SELECT *

FROM ws

LIMIT 3

5.12 SQL Hints

## 在執(zhí)行查詢時,可以在表名后面添加SQL Hints來臨時修改表屬性,對當(dāng)前job生效

select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;

5.13 集合操作

## UNION 和 UNION ALL

# UNION:將集合合并并且去重

# UNION ALL:將集合合并,不做去重

(SELECT id FROM ws) UNION (SELECT id FROM ws1);

(SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);

## Intersect 和 Intersect All

# ntersect:交集并且去重

# Intersect ALL:交集不做去重

(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);

(SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);

## Except 和 Except All

# Except:差集并且去重

# Except ALL:差集不做去重

(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);

(SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);

# 上述 SQL 在流式任務(wù)中,如果一條左流數(shù)據(jù)先來了,沒有從右流集合數(shù)據(jù)中找到對應(yīng)的數(shù)據(jù)時會直接輸出,當(dāng)右流對應(yīng)數(shù)據(jù)后續(xù)來了之后,會下發(fā)回撤流將之前的數(shù)據(jù)給撤回。這也是一個回撤流

## In 子查詢

# In 子查詢的結(jié)果集只能有一列

SELECT id, vc

FROM ws

WHERE id IN (

SELECT id FROM ws1

)

# 上述 SQL 的 In 子句和之前介紹到的 Inner Join 類似。并且 In 子查詢也會涉及到大狀態(tài)問題,要注意設(shè)置 State 的 TTL

5.14 系統(tǒng)函數(shù)

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/

系統(tǒng)函數(shù)(System Functions)也叫內(nèi)置函數(shù)(Built-in Functions),是在系統(tǒng)中預(yù)先實現(xiàn)好的功能模塊。我們可以通過固定的函數(shù)名直接調(diào)用,實現(xiàn)想要的轉(zhuǎn)換操作。Flink SQL提供了大量的系統(tǒng)函數(shù),幾乎支持所有的標(biāo)準(zhǔn)SQL中的操作,這為我們使用SQL編寫流處理程序提供了極大的方便。Flink SQL中的系統(tǒng)函數(shù)又主要可以分為兩大類:標(biāo)量函數(shù)(Scalar Functions)和聚合函數(shù)(Aggregate Functions)。

標(biāo)量函數(shù)(Scalar Functions)

標(biāo)量函數(shù)指的就是只對輸入數(shù)據(jù)做轉(zhuǎn)換操作、返回一個值的函數(shù)。標(biāo)量函數(shù)是最常見、也最簡單的一類系統(tǒng)函數(shù),數(shù)量非常龐大,很多在標(biāo)準(zhǔn)SQL中也有定義。所以我們這里只對一些常見類型列舉部分函數(shù),做一個簡單概述,具體應(yīng)用可以查看官網(wǎng)的完整函數(shù)列表

比較函數(shù)(Comparison Functions)

比較函數(shù)其實就是一個比較表達式,用來判斷兩個值之間的關(guān)系,返回一個布爾類型的值。這個比較表達式可以是用 <、>、= 等符號連接兩個值,也可以是用關(guān)鍵字定義的某種判斷。例如:

(1)value1 = value2 判斷兩個值相等;

(2)value1 <> value2 判斷兩個值不相等

(3)value IS NOT NULL 判斷value不為空

邏輯函數(shù)(Logical Functions)

邏輯函數(shù)就是一個邏輯表達式,也就是用與(AND)、或(OR)、非(NOT)將布爾類型的值連接起來,也可以用判斷語句(IS、IS NOT)進行真值判斷;返回的還是一個布爾類型的值。例如:

(1)boolean1 OR boolean2 布爾值boolean1與布爾值boolean2取邏輯或

(2)boolean IS FALSE 判斷布爾值boolean是否為false

(3)NOT boolean 布爾值boolean取邏輯非

算術(shù)函數(shù)(Arithmetic Functions)

進行算術(shù)計算的函數(shù),包括用算術(shù)符號連接的運算,和復(fù)雜的數(shù)學(xué)運算。例如:

(1)numeric1 + numeric2 兩數(shù)相加

(2)POWER(numeric1, numeric2) 冪運算,取數(shù)numeric1的numeric2次方

(3)RAND() 返回(0.0, 1.0)區(qū)間內(nèi)的一個double類型的偽隨機數(shù)

字符串函數(shù)(String Functions)

進行字符串處理的函數(shù)。例如:

(1)string1 || string2 兩個字符串的連接

(2)UPPER(string) 將字符串string轉(zhuǎn)為全部大寫

(3)CHAR_LENGTH(string) 計算字符串string的長度

時間函數(shù)(Temporal Functions)

進行與時間相關(guān)操作的函數(shù)。例如:

(1)DATE string 按格式"yyyy-MM-dd"解析字符串string,返回類型為SQL Date

(2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回類型為SQL timestamp

(3)CURRENT_TIME 返回本地時區(qū)的當(dāng)前時間,類型為SQL time(與LOCALTIME等價)

(4)INTERVAL string range 返回一個時間間隔。

聚合函數(shù)(Aggregate Functions)

聚合函數(shù)是以表中多個行作為輸入,提取字段進行聚合操作的函數(shù),會將唯一的聚合值作為結(jié)果返回。聚合函數(shù)應(yīng)用非常廣泛,不論分組聚合、窗口聚合還是開窗(Over)聚合,對數(shù)據(jù)的聚合操作都可以用相同的函數(shù)來定義。標(biāo)準(zhǔn)SQL中常見的聚合函數(shù)Flink SQL都是支持的,目前也在不斷擴展,為流處理應(yīng)用提供更強大的功能。例如:

(1)COUNT(*) 返回所有行的數(shù)量,統(tǒng)計個數(shù)。

(2)SUM([ ALL | DISTINCT ] expression) 對某個字段進行求和操作。默認情況下省略了關(guān)鍵字ALL,表示對所有行求和;如果指定DISTINCT,則會對數(shù)據(jù)進行去重,每個值只疊加一次。

(3)RANK() 返回當(dāng)前值在一組值中的排名。

(4)ROW_NUMBER() 對一組值排序后,返回當(dāng)前值的行號。

其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。

5.15 Module操作

目前 Flink 包含了以下三種 Module:

CoreModule:CoreModule 是 Flink 內(nèi)置的 Module,其包含了目前 Flink 內(nèi)置的所有 UDF,F(xiàn)link 默認開啟的 Module 就是 CoreModule,我們可以直接使用其中的 UDFHiveModule:HiveModule 可以將 Hive 內(nèi)置函數(shù)作為 Flink 的系統(tǒng)函數(shù)提供給 SQL\Table API 用戶進行使用,比如 get_json_object 這類 Hive 內(nèi)置函數(shù)(Flink 默認的 CoreModule 是沒有的)用戶自定義 Module:用戶可以實現(xiàn) Module 接口實現(xiàn)自己的 UDF 擴展 Module

## 使用 LOAD 子句去加載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module,UNLOAD 子句去卸載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module

-- 加載

LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

-- 卸載

UNLOAD MODULE module_name

-- 查看

SHOW MODULES;

SHOW FULL MODULES;

在 Flink 中,Module 可以被 加載、啟用 、禁用 、卸載 Module,當(dāng)加載Module 之后,默認就是開啟的。同時支持多個 Module 的,并且根據(jù)加載 Module 的順序去按順序查找和解析 UDF,先查到的先解析使用。此外,F(xiàn)link 只會解析已經(jīng)啟用了的 Module。那么當(dāng)兩個 Module 中出現(xiàn)兩個同名的函數(shù)且都啟用時, Flink 會根據(jù)加載 Module 的順序進行解析,結(jié)果就是會使用順序為第一個的 Module 的 UDF,可以使用下面語法更改順序:USE MODULE hive,core;

USE是啟用module,沒有被use的為禁用(禁用不是卸載),除此之外還可以實現(xiàn)調(diào)整順序的效果。上面的語句會將 Hive Module 設(shè)為第一個使用及解析的 Module。

## 舉例

# 加載官方已經(jīng)提供的的 Hive Module,將 Hive 已有的內(nèi)置函數(shù)作為 Flink 的內(nèi)置函數(shù)。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一個 HiveModule

# https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hive/overview/

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar /opt/module/flink/lib/

# 注意:拷貝hadoop的包,解決依賴沖突問題

cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink/lib/

# 重啟flink集群和sql-client

# 啟動Flink

/opt/module/flink/bin/yarn-session.sh -d

# 啟動Flink的sql-client

/opt/module/flink/bin/sql-client.sh embedded -s yarn-session

# 加載hive module

-- hive-connector內(nèi)置了hive module,提供了hive自帶的系統(tǒng)函數(shù)

load module hive with ('hive-version'='3.1.3');

show modules;

show functions;

-- 可以調(diào)用hive的split函數(shù)

select split('a,b', ',');

6、常用Connector讀寫

DataGen和Print都是一種connector,其他connector參考官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/overview/

6.1 Kafka

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/

首先添加kafka連接器依賴,將flink-sql-connector-kafka-1.17.0.jar上傳到flink的lib目錄下,重啟yarn-session、sql-client

## 創(chuàng)建Kafka的映射表

CREATE TABLE t1(

`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',

--列名和元數(shù)據(jù)名一致可以省略 FROM 'xxxx', VIRTUAL表示只讀

`partition` BIGINT METADATA VIRTUAL,

`offset` BIGINT METADATA VIRTUAL,

id int,

ts bigint ,

vc int )

WITH (

'connector' = 'kafka',

'properties.bootstrap.servers' = 'hadoop103:9092',

'properties.group.id' = 'atguigu',

-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'

'scan.startup.mode' = 'earliest-offset',

-- fixed為flink實現(xiàn)的分區(qū)器,一個并行度只寫往kafka一個分區(qū)

'sink.partitioner' = 'fixed',

'topic' = 'ws1',

'format' = 'json'

)

## 插入Kafka表

insert into t1(id,ts,vc) select * from source

## 查詢Kafka表

select * from t1

upsert-kafka表

如果當(dāng)前表存在更新操作,那么普通的kafka連接器將無法滿足,此時可以使用Upsert Kafka連接器。Upsert Kafka 連接器支持以 upsert 方式從 Kafka topic 中讀取數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka topic。

作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個更新或刪除事件。更準(zhǔn)確地說,數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一個 value 的 UPDATE,如果有這個 key(如果不存在相應(yīng)的 key,則該更新被視為 INSERT)。用表來類比,changelog 流中的數(shù)據(jù)記錄被解釋為 UPSERT,也稱為 INSERT/UPDATE,因為任何具有相同 key 的現(xiàn)有行都被覆蓋。另外,value 為空的消息將會被視作為 DELETE 消息。

作為 sink,upsert-kafka 連接器可以消費 changelog 流。它會將 INSERT/UPDATE_AFTER 數(shù)據(jù)作為正常的 Kafka 消息寫入,并將 DELETE 數(shù)據(jù)以 value 為空的 Kafka 消息寫入(表示對應(yīng) key 的消息被刪除)。Flink 將根據(jù)主鍵列的值對數(shù)據(jù)進行分區(qū),從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區(qū)中

# 創(chuàng)建upsert-kafka的映射表(必須定義主鍵)

CREATE TABLE t2(

id int ,

sumVC int ,

primary key (id) NOT ENFORCED

)

WITH (

'connector' = 'upsert-kafka',

'properties.bootstrap.servers' = 'hadoop102:9092',

'topic' = 'ws2',

'key.format' = 'json',

'value.format' = 'json'

)

# 插入upsert-kafka表

insert into t2 select id,sum(vc) sumVC from source group by id

# 查詢upsert-kafka表

# upsert-kafka 無法從指定的偏移量讀取,只會從主題的源讀取。如此,才知道整個數(shù)據(jù)的更新過程。并且通過 -U,+U,+I 等符號來顯示數(shù)據(jù)的變化過程

select * from t2

6.2 File

## 創(chuàng)建FileSystem映射表

CREATE TABLE t3( id int, ts bigint , vc int )

WITH (

'connector' = 'filesystem',

'path' = 'hdfs://hadoop102:8020/data/t3',

'format' = 'csv'

)

## 寫入

insert into t3 select * from source

## 查詢

select * from t3 where id = '1

# 如果報錯java.lang.classNotFoundException: org.apache.f1ink.table.planner.delegation.DialectFactory

# 因為之前l(fā)ib下放了sql-hive的連接器jar包,解決方案有兩種

# 將hive的連接器jar包挪走,重啟yarn-session、sql-client

# 和之前操作類似替換planner的jar包

6.3 JDBC(MySQL)

Flink在將數(shù)據(jù)寫入外部數(shù)據(jù)庫時使用DDL中定義的主鍵。如果定義了主鍵,則連接器以upsert模式操作,否則,連接器以追加模式操作。

在upsert模式下,F(xiàn)link會根據(jù)主鍵插入新行或更新現(xiàn)有行,F(xiàn)link這樣可以保證冪等性。為了保證輸出結(jié)果符合預(yù)期,建議為表定義主鍵,并確保主鍵是底層數(shù)據(jù)庫表的唯一鍵集或主鍵之一。在追加模式下,F(xiàn)link將所有記錄解釋為INSERT消息,如果底層數(shù)據(jù)庫中發(fā)生了主鍵或唯一約束違反,則INSERT操作可能會失敗

# mysql的test庫中建表

CREATE TABLE `ws2` (

`id` int(11) NOT NULL,

`ts` bigint(20) DEFAULT NULL,

`vc` int(11) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

添加JDBC連接器依賴,上傳jdbc連接器的jar包和mysql的連接驅(qū)動包到flink/lib下:

flink-connector-jdbc-1.17.jarmysql-connector-j-8.0.31.jar

# 創(chuàng)建JDBC映射表

CREATE TABLE t4

(

id INT,

ts BIGINT,

vc INT,

PRIMARY KEY (id) NOT ENFORCED

) WITH (

'connector'='jdbc',

'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',

'username' = 'root',

'password' = '000000',

'connection.max-retry-timeout' = '60s',

'table-name' = 'ws2',

'sink.buffer-flush.max-rows' = '500',

'sink.buffer-flush.interval' = '5s',

'sink.max-retries' = '3',

'sink.parallelism' = '1'

);

# 查詢

select * from t4

# 寫入

insert into t4 select * from source

7、sql-client 中使用 savepoint

## 提交一個insert作業(yè),可以給作業(yè)設(shè)置名稱

INSERT INTO sink select * from source;

## 查看job列表

SHOW JOBS;

## 停止作業(yè),觸發(fā)savepoint

SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';

SET state.savepoints.dir='hdfs://hadoop102:8020/sp';

STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;

## 從savepoint恢復(fù)

-- 設(shè)置從savepoint恢復(fù)的路徑

SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a';

-- 之后直接提交sql,就會從savepoint恢復(fù)

--允許跳過無法還原的保存點狀態(tài)

set 'execution.savepoint.ignore-unclaimed-state' = 'true';

## 恢復(fù)后重置路徑

# 指定execution.savepoint.path后,將影響后面執(zhí)行的所有DML語句,可以使用RESET命令重置這個配置選項

RESET execution.savepoint.path;

# 如果出現(xiàn)reset沒生效的問題,可能是個bug,我們可以退出sql-client,再重新進,不需要重啟flink的集群。

8、Catalog

Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。元數(shù)據(jù)可以是臨時的,例如臨時表、UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive MetaStore 中的元數(shù)據(jù)。Catalog 提供了一個統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢語句中來訪問。

Catalog 允許用戶引用其數(shù)據(jù)存儲系統(tǒng)中現(xiàn)有的元數(shù)據(jù),并自動將其映射到 Flink 的相應(yīng)元數(shù)據(jù)。例如,F(xiàn)link 可以直接使用 Hive MetaStore 中的表的元數(shù)據(jù),不必在Flink中手動重寫ddl,也可以將 Flink SQL 中的元數(shù)據(jù)存儲到 Hive MetaStore 中。Catalog 極大地簡化了用戶開始使用 Flink 的步驟,并極大地提升了用戶體驗

8.1 Catalog類型

GenericInMemoryCatalog:基于內(nèi)存實現(xiàn)的 Catalog,所有元數(shù)據(jù)只在session 的生命周期(即一個 Flink 任務(wù)一次運行生命周期內(nèi))內(nèi)可用。默認自動創(chuàng)建,會有名為“default_catalog”的內(nèi)存Catalog,這個Catalog默認只有一個名為“default_database”的數(shù)據(jù)庫。JdbcCatalog:JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫。Postgres Catalog和MySQL Catalog是目前僅有的兩種JDBC Catalog實現(xiàn),將元數(shù)據(jù)存儲在數(shù)據(jù)庫中。HiveCatalog:有兩個用途,一是單純作為 Flink 元數(shù)據(jù)的持久化存儲,二是作為讀寫現(xiàn)有 Hive 元數(shù)據(jù)的接口。注意:Hive MetaStore 以小寫形式存儲所有元數(shù)據(jù)對象名稱。Hive Metastore以小寫形式存儲所有元對象名稱,而 GenericInMemoryCatalog會區(qū)分大小寫。用戶自定義 Catalog:用戶可以實現(xiàn) Catalog 接口實現(xiàn)自定義 Catalog。從Flink1.16開始引入了用戶類加載器,通過CatalogFactory.Context#getClassLoader訪問,否則會報錯ClassNotFoundException

8.2 JdbcCatalog(MySQL)

JdbcCatalog不支持建表,只是打通flink與mysql的連接,可以去讀寫mysql現(xiàn)有的庫表,然后需要把1.17的jdbc包放到對應(yīng)目錄

# https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc/1.17-SNAPSHOT/

cp flink-connector-jdbc-1.17.jar /opt/module/flink/lib/

cp mysql-connector-j-8.0.31.jar /opt/module/flink/lib/

# 重啟flink集群和sql-client

# 創(chuàng)建Catalog

# JdbcCatalog支持以下選項:

# name:必需,Catalog名稱。

# default-database:必需,連接到的默認數(shù)據(jù)庫。

# username: 必需,Postgres/MySQL帳戶的用戶名。

# password:必需,該帳號的密碼。

# base-url:必需,數(shù)據(jù)庫的jdbc url(不包含數(shù)據(jù)庫名)

# 對于Postgres Catalog,是"jdbc:postgresql://:<端口>"

# 對于MySQL Catalog,是"jdbc: mysql://:<端口>"

CREATE CATALOG my_jdbc_catalog WITH(

'type' = 'jdbc',

'default-database' = 'test',

'username' = 'root',

'password' = '000000',

'base-url' = 'jdbc:mysql://hadoop102:3306'

);

# 查看Catalog

SHOW CATALOGS;

## 查看當(dāng)前的CATALOG

SHOW CURRENT CATALOG;

# 使用指定Catalog

USE CATALOG my_jdbc_catalog;

## 查看當(dāng)前的CATALOG

SHOW CURRENT CATALOG;

8.3 HiveCatalog

# 上傳所需jar包到lib下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink/lib/

cp mysql-connector-j-8.0.31.jar /opt/module/flink/lib/

# 更換planner依賴

# 只有在使用Hive方言或HiveServer2時才需要這樣額外的計劃器jar移動,但這是Hive集成的推薦設(shè)置

mv /opt/module/flink/opt/flink-table-planner_2.12-1.17.1.jar /opt/module/flink/lib/flink-table-planner_2.12-1.17.0.jar

mv /opt/module/flink/lib/flink-table-planner-loader-1.17.1.jar /opt/module/flink/opt/flink-table-planner-loader-1.17.0.jar

# 重啟flink集群和sql-client

# 啟動外置的hive metastore服務(wù)

# Hive metastore必須作為獨立服務(wù)運行,也就是hive-site中必須配置hive.metastore.uris

hive.metastore.uris

thrift://hadoop102:9083

hive --service metastore &

創(chuàng)建Catalog

配置項必需默認值類型說明typeYes(none)StringCatalog類型,創(chuàng)建HiveCatalog時必須設(shè)置為’hive’。nameYes(none)StringCatalog的唯一名稱hive-conf-dirNo(none)String包含hive -site.xml的目錄,需要Hadoop文件系統(tǒng)支持。如果沒指定hdfs協(xié)議,則認為是本地文件系統(tǒng)。如果不指定該選項,則在類路徑中搜索hive-site.xml。default-databaseNodefaultStringHive Catalog使用的默認數(shù)據(jù)庫hive-versionNo(none)StringHiveCatalog能夠自動檢測正在使用的Hive版本。建議不要指定Hive版本,除非自動檢測失敗。hadoop-conf-dirNo(none)StringHadoop conf目錄的路徑。只支持本地文件系統(tǒng)路徑。設(shè)置Hadoop conf的推薦方法是通過HADOOP_CONF_DIR環(huán)境變量。只有當(dāng)環(huán)境變量不適合你時才使用該選項,例如,如果你想分別配置每個HiveCatalog。

CREATE CATALOG myhive WITH (

'type' = 'hive',

'default-database' = 'default',

'hive-conf-dir' = '/opt/module/hive/conf'

);

# 查看Catalog

SHOW CATALOGS;

--查看當(dāng)前的CATALOG

SHOW CURRENT CATALOG;

# 使用指定Catalog

USE CATALOG myhive;

--查看當(dāng)前的CATALOG

SHOW CURRENT CATALOG;

# 建表,退出sql-client重進,查看catalog和表還在

## 讀寫Hive表

SHOW DATABASES; -- 可以看到hive的數(shù)據(jù)庫

USE test; -- 可以切換到hive的數(shù)據(jù)庫

SHOW TABLES; -- 可以看到hive的表

SELECT * from ws; --可以讀取hive表

INSERT INTO ws VALUES(1,1,1); -- 可以寫入hive表

# 斷開后可以持久化保存

9、代碼中使用FlinkSQL

9.1 環(huán)境準(zhǔn)備

引入相關(guān)依賴

org.apache.flink

flink-table-api-java-bridge

${flink.version}

這里的依賴是一個Java的“橋接器”(bridge),主要就是負責(zé)Table API和下層DataStream API的連接支持,按照不同的語言分為Java版和Scala版。如果我們希望在本地的集成開發(fā)環(huán)境(IDE)里運行Table API和SQL,還需要引入以下依賴:

org.apache.flink

flink-table-planner-loader

${flink.version}

org.apache.flink

flink-table-runtime

${flink.version}

org.apache.flink

flink-connector-files

${flink.version}

9.2 創(chuàng)建表環(huán)境

對于Flink這樣的流處理框架來說,數(shù)據(jù)流和表在結(jié)構(gòu)上還是有所區(qū)別的。所以使用Table API和SQL需要一個特別的運行時環(huán)境,這就是所謂的“表環(huán)境”(TableEnvironment)。它主要負責(zé):

注冊Catalog和表;執(zhí)行 SQL 查詢;注冊用戶自定義函數(shù)(UDF);DataStream 和表之間的轉(zhuǎn)換。

每個表和SQL的執(zhí)行,都必須綁定在一個表環(huán)境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口類,可以通過調(diào)用靜態(tài)的create()方法來創(chuàng)建一個表環(huán)境實例。方法需要傳入一個環(huán)境的配置參數(shù)EnvironmentSettings,它可以指定當(dāng)前表環(huán)境的執(zhí)行模式和計劃器(planner)。執(zhí)行模式有批處理和流處理兩種選擇,默認是流處理模式;計劃器默認使用blink planner。

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 使用流處理模式

.build();

TableEnvironment tableEnv = TableEnvironment.create(setting);

// 對于流處理場景,其實默認配置就完全夠用了。所以我們也可以用另一種更加簡單的方式來創(chuàng)建表環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 這里我們引入了一個“流式表環(huán)境”(StreamTableEnvironment),它是繼承自TableEnvironment的子接口。

// 調(diào)用它的create()方法,只需要直接將當(dāng)前的流執(zhí)行環(huán)境(StreamExecutionEnvironment)傳入,就可以創(chuàng)建出對應(yīng)的流式表環(huán)境了

9.3 創(chuàng)建表

連接器表(Connector Tables)

最直觀的創(chuàng)建表的方式,就是通過連接器(connector)連接到一個外部系統(tǒng),然后定義出對應(yīng)的表結(jié)構(gòu)。在代碼中,我們可以調(diào)用表環(huán)境的executeSql()方法,可以傳入一個DDL作為參數(shù)執(zhí)行SQL操作。這里我們傳入一個CREATE語句進行表的創(chuàng)建,并通過WITH關(guān)鍵字指定連接到外部系統(tǒng)的連接器

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

虛擬表(Virtual Tables)

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

// 這里調(diào)用了表環(huán)境的sqlQuery()方法,直接傳入一條SQL語句作為參數(shù)執(zhí)行查詢,得到的結(jié)果是一個Table對象。

// Table是Table API中提供的核心接口類,就代表了一個Java中定義的表實例。

// 由于newTable是一個Table對象,并沒有在表環(huán)境中注冊;所以如果希望直接在SQL中使用,我們還需要將這個中間結(jié)果表注冊到環(huán)境中

tableEnv.createTemporaryView("NewTable", newTable);

// 類似于視圖

9.4 表的查詢

執(zhí)行SQL進行查詢

// 創(chuàng)建表環(huán)境

TableEnvironment tableEnv = ...;

// 創(chuàng)建表

tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

// 查詢用戶Alice的點擊事件,并提取表中前兩個字段

Table aliceVisitTable = tableEnv.sqlQuery(

"SELECT user, url " +

"FROM EventTable " +

"WHERE user = 'Alice' "

);

// 分組

Table urlCountTable = tableEnv.sqlQuery(

"SELECT user, COUNT(url) " +

"FROM EventTable " +

"GROUP BY user "

);

// 直接將查詢的結(jié)果寫入到已經(jīng)注冊的表

// 注冊表

tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 將查詢結(jié)果輸出到OutputTable中

tableEnv.executeSql (

"INSERT INTO OutputTable " +

"SELECT user, url " +

"FROM EventTable " +

"WHERE user = 'Alice' "

);

調(diào)用Table API進行查詢

// 基于環(huán)境中已注冊的表,可以通過表環(huán)境的from()方法非常容易地得到一個Table對象

Table eventTable = tableEnv.from("EventTable");

// 傳入的參數(shù)就是注冊好的表名。注意這里eventTable是一個Table對象,而EventTable是在環(huán)境中注冊的表名。

// 得到Table對象之后,就可以調(diào)用API進行各種轉(zhuǎn)換操作了,得到的是一個新的Table對象

Table maryClickTable = eventTable

.where($("user").isEqual("Alice"))

.select($("url"), $("user"));

// 這里每個方法的參數(shù)都是一個“表達式”(Expression),用方法調(diào)用的形式直觀地說明了想要表達的內(nèi)容;“$”符號用來指定表中的一個字段。上面的代碼和直接執(zhí)行SQL是等效的

兩種API的結(jié)合使用

無論是調(diào)用Table API還是執(zhí)行SQL,得到的結(jié)果都是一個Table對象;所以這兩種API的查詢可以很方便地結(jié)合在一起

無論是那種方式得到的Table對象,都可以繼續(xù)調(diào)用Table API進行查詢轉(zhuǎn)換;如果想要對一個表執(zhí)行SQL操作(用FROM關(guān)鍵字引用),必須先在環(huán)境中對它進行注冊。所以我們可以通過創(chuàng)建虛擬表的方式實現(xiàn)兩者的轉(zhuǎn)換

tableEnv.createTemporaryView("MyTable", myTable);

9.5 輸出表

表的創(chuàng)建和查詢,就對應(yīng)著流處理中的讀取數(shù)據(jù)源(Source)和轉(zhuǎn)換(Transform);而最后一個步驟Sink,也就是將結(jié)果數(shù)據(jù)輸出到外部系統(tǒng),就對應(yīng)著表的輸出操作。在代碼上,輸出一張表最直接的方法,就是調(diào)用Table的方法executeInsert()方法將一個 Table寫入到注冊過的表中,方法傳入的參數(shù)就是注冊的表名。

// 注冊表,用于輸出數(shù)據(jù)到外部系統(tǒng)

tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 經(jīng)過查詢轉(zhuǎn)換,得到結(jié)果表

Table result = ...

// 將結(jié)果表寫入已注冊的輸出表中

result.executeInsert("OutputTable");

在底層,表的輸出是通過將數(shù)據(jù)寫入到TableSink來實現(xiàn)的。TableSink是Table API中提供的一個向外部系統(tǒng)寫入數(shù)據(jù)的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存儲數(shù)據(jù)庫(比如JDBC、Elasticsearch)和消息隊列(比如Kafka)

public class SqlDemo {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// TODO 1.創(chuàng)建表環(huán)境

// 1.1 寫法一:

// EnvironmentSettings settings = EnvironmentSettings.newInstance()

// .inStreamingMode()

// .build();

// StreamTableEnvironment tableEnv = TableEnvironment.create(settings);

// 1.2 寫法二

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// TODO 2.創(chuàng)建表

tableEnv.executeSql("CREATE TABLE source ( \n" +

" id INT, \n" +

" ts BIGINT, \n" +

" vc INT\n" +

") WITH ( \n" +

" 'connector' = 'datagen', \n" +

" 'rows-per-second'='1', \n" +

" 'fields.id.kind'='random', \n" +

" 'fields.id.min'='1', \n" +

" 'fields.id.max'='10', \n" +

" 'fields.ts.kind'='sequence', \n" +

" 'fields.ts.start'='1', \n" +

" 'fields.ts.end'='1000000', \n" +

" 'fields.vc.kind'='random', \n" +

" 'fields.vc.min'='1', \n" +

" 'fields.vc.max'='100'\n" +

");\n");

tableEnv.executeSql("CREATE TABLE sink (\n" +

" id INT, \n" +

" sumVC INT \n" +

") WITH (\n" +

"'connector' = 'print'\n" +

");\n");

// TODO 3.執(zhí)行查詢

// 3.1 使用sql進行查詢

// Table table = tableEnv.sqlQuery("select id,sum(vc) as sumVC from source where id>5 group by id ;");

// 把table對象,注冊成表名

// tableEnv.createTemporaryView("tmp", table);

// tableEnv.sqlQuery("select * from tmp where id > 7");

// 3.2 用table api來查詢

Table source = tableEnv.from("source");

Table result = source

.where($("id").isGreater(5))

.groupBy($("id"))

.aggregate($("vc").sum().as("sumVC"))

.select($("id"), $("sumVC"));

// TODO 4.輸出表

// 4.1 sql用法

// tableEnv.executeSql("insert into sink select * from tmp");

// 4.2 tableapi用法

result.executeInsert("sink");

}

}

9.6 表和流的轉(zhuǎn)換

將流(DataStream)轉(zhuǎn)換成表(Table)

調(diào)用fromDataStream()方法,想要將一個DataStream轉(zhuǎn)換成表很簡單,可以通過調(diào)用表環(huán)境的fromDataStream()方法來實現(xiàn),返回的就是一個Table對象

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 獲取表環(huán)境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 讀取數(shù)據(jù)源

SingleOutputStreamOperator sensorDS = env.fromSource(...)

// 將數(shù)據(jù)流轉(zhuǎn)換成表

Table sensorTable = tableEnv.fromDataStream(sensorDS);

// 由于流中的數(shù)據(jù)本身就是定義好的POJO類型WaterSensor,所以我們將流轉(zhuǎn)換成表之后,每一行數(shù)據(jù)就對應(yīng)著一個WaterSensor,而表中的列名就對應(yīng)著WaterSensor中的屬性。

// 另外,我們還可以在fromDataStream()方法中增加參數(shù),用來指定提取哪些屬性作為表中的字段名,并可以任意指定位置

// 提取Event中的timestamp和url作為表中的列

Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

// 也可以通過表達式的as()方法對字段進行重命名

// 將timestamp字段重命名為ts

Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

調(diào)用createTemporaryView()方法,調(diào)用fromDataStream()方法簡單直觀,可以直接實現(xiàn)DataStream到Table的轉(zhuǎn)換;不過如果我們希望直接在SQL中引用這張表,就還需要調(diào)用表環(huán)境的createTemporaryView()方法來創(chuàng)建虛擬視圖了。

對于這種場景,也有一種更簡潔的調(diào)用方式。我們可以直接調(diào)用createTemporaryView()方法創(chuàng)建虛擬表,傳入的兩個參數(shù),第一個依然是注冊的表名,而第二個可以直接就是DataStream。之后仍舊可以傳入多個參數(shù),用來指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

// 接下來就可以直接在SQL中引用表sensorTable了

將表(Table)轉(zhuǎn)換成流(DataStream)

// 調(diào)用toDataStream()方法

// 經(jīng)查詢轉(zhuǎn)換得到的表aliceClickTable轉(zhuǎn)換成流打印輸出

tableEnv.toDataStream(table).print();

// 調(diào)用toChangelogStream()方法

Table table = tableEnv.sqlQuery(

"SELECT id, sum(vc) " +

"FROM source " +

"GROUP BY id "

);

// 將表轉(zhuǎn)換成更新日志流

tableEnv.toChangelogStream(table).print();

支持的數(shù)據(jù)類型

(1)原子類型

在Flink中,基礎(chǔ)數(shù)據(jù)類型(Integer、Double、String)和通用數(shù)據(jù)類型(也就是不可再拆分的數(shù)據(jù)類型)統(tǒng)一稱作“原子類型”。原子類型的DataStream,轉(zhuǎn)換之后就成了只有一列的Table,列字段(field)的數(shù)據(jù)類型可以由原子類型推斷出。另外,還可以在fromDataStream()方法里增加參數(shù),用來重新命名列字段

StreamTableEnvironment tableEnv = ...;

DataStream stream = ...;

// 將數(shù)據(jù)流轉(zhuǎn)換成動態(tài)表,動態(tài)表只有一個字段,重命名為myLong

Table table = tableEnv.fromDataStream(stream, $("myLong"));

(2)Tuple類型

當(dāng)原子類型不做重命名時,默認的字段名就是“f0”,容易想到,這其實就是將原子類型看作了一元組Tuple1的處理結(jié)果。Table支持Flink中定義的元組類型Tuple,對應(yīng)在表中字段名默認就是元組中元素的屬性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段還可以通過調(diào)用表達式的as()方法來進行重命名。

StreamTableEnvironment tableEnv = ...;

DataStream> stream = ...;

// 將數(shù)據(jù)流轉(zhuǎn)換成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream, $("f1"));

// 將數(shù)據(jù)流轉(zhuǎn)換成包含f0和f1字段的表,在表中f0和f1位置交換

Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 將f1字段命名為myInt,f0命名為myLong

Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));

(3)POJO 類型

Flink也支持多種數(shù)據(jù)類型組合成的“復(fù)合類型”,最典型的就是簡單Java對象(POJO 類型)。由于POJO中已經(jīng)定義好了可讀性強的字段名,這種類型的數(shù)據(jù)流轉(zhuǎn)換成Table就顯得無比順暢了。將POJO類型的DataStream轉(zhuǎn)換成Table,如果不指定字段名稱,就會直接使用原始 POJO 類型中的字段名稱。POJO中的字段同樣可以被重新排序、提卻和重命名。

StreamTableEnvironment tableEnv = ...;

DataStream stream = ...;

Table table = tableEnv.fromDataStream(stream);

Table table = tableEnv.fromDataStream(stream, $("user"));

Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));

(4)Row類型

Flink中還定義了一個在關(guān)系型表中更加通用的數(shù)據(jù)類型——行(Row),它是Table中數(shù)據(jù)的基本組織形式。Row類型也是一種復(fù)合類型,它的長度固定,而且無法直接推斷出每個字段的類型,所以在使用時必須指明具體的類型信息;我們在創(chuàng)建Table時調(diào)用的CREATE語句就會將所有的字段名稱和類型指定,這在Flink中被稱為表的“模式結(jié)構(gòu)”(Schema)。

綜合應(yīng)用示例

public class TableStreamDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource sensorDS = env.fromElements(

new WaterSensor("s1", 1L, 1),

new WaterSensor("s1", 2L, 2),

new WaterSensor("s2", 2L, 2),

new WaterSensor("s3", 3L, 3),

new WaterSensor("s3", 4L, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// TODO 1. 流轉(zhuǎn)表

Table sensorTable = tableEnv.fromDataStream(sensorDS);

tableEnv.createTemporaryView("sensor", sensorTable);

Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");

Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");

// TODO 2. 表轉(zhuǎn)流

// 2.1 追加流

tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");

// 2.2 changelog流(結(jié)果需要更新)

tableEnv.toChangelogStream(sumTable ).print("sum");

// 只要代碼中調(diào)用了 DataStreamAPI,就需要 execute,否則不需要

env.execute();

}

}

9.7 自定義函數(shù)(UDF)

系統(tǒng)函數(shù)盡管龐大,也不可能涵蓋所有的功能;如果有系統(tǒng)函數(shù)不支持的需求,我們就需要用自定義函數(shù)(User Defined Functions,UDF)來實現(xiàn)了。Flink的Table API和SQL提供了多種自定義函數(shù)的接口,以抽象類的形式定義。當(dāng)前UDF主要有以下幾類:

標(biāo)量函數(shù)(Scalar Functions):將輸入的標(biāo)量值轉(zhuǎn)換成一個新的標(biāo)量值;表函數(shù)(Table Functions):將標(biāo)量值轉(zhuǎn)換成一個或多個新的行數(shù)據(jù),也就是擴展成一個表;聚合函數(shù)(Aggregate Functions):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個新的標(biāo)量值;表聚合函數(shù)(Table Aggregate Functions):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個或多個新的行數(shù)據(jù)。

整體調(diào)用流程

// (1)注冊函數(shù)

// 注冊函數(shù)時需要調(diào)用表環(huán)境的createTemporarySystemFunction()方法,傳入注冊的函數(shù)名以及UDF類的Class對象

tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

// (2)使用Table API調(diào)用函數(shù)

// 在Table API中,需要使用call()方法來調(diào)用自定義函數(shù):

ableEnv.from("MyTable").select(call("MyFunction", $("myField")));

// 這里call()方法有兩個參數(shù),一個是注冊好的函數(shù)名MyFunction,另一個則是函數(shù)調(diào)用時本身的參數(shù)。這里我們定義MyFunction在調(diào)用時,需要傳入的參數(shù)是myField字段

// (3)在SQL中調(diào)用函數(shù)

// 當(dāng)我們將函數(shù)注冊為系統(tǒng)函數(shù)之后,在SQL中的調(diào)用就與內(nèi)置系統(tǒng)函數(shù)完全一樣了

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

標(biāo)量函數(shù)(Scalar Functions)

想要實現(xiàn)自定義的標(biāo)量函數(shù),我們需要自定義一個類來繼承抽象類ScalarFunction,并實現(xiàn)叫作eval() 的求值方法。標(biāo)量函數(shù)的行為就取決于求值方法的定義,它必須是公有的(public),而且名字必須是eval。求值方法eval可以重載多次,任何數(shù)據(jù)類型都可作為求值方法的參數(shù)和返回值類型。

這里需要特別說明的是,ScalarFunction抽象類中并沒有定義eval()方法,所以我們不能直接在代碼中重寫(override);但Table API的框架底層又要求了求值方法必須名字為eval()。

public class MyScalarFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource sensorDS = env.fromElements(

new WaterSensor("s1", 1L, 1),

new WaterSensor("s1", 2L, 2),

new WaterSensor("s2", 2L, 2),

new WaterSensor("s3", 3L, 3),

new WaterSensor("s3", 4L, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table sensorTable = tableEnv.fromDataStream(sensorDS);

tableEnv.createTemporaryView("sensor", sensorTable);

// TODO 2.注冊函數(shù)

tableEnv.createTemporaryFunction("HashFunction", HashFunction.class);

// TODO 3.調(diào)用 自定義函數(shù)

// 3.1 sql用法

// tableEnv.sqlQuery("select HashFunction(id) from sensor")

// .execute() // 調(diào)用了 sql的execute,就不需要 env.execute()

// .print();

// 3.2 table api用法

sensorTable

.select(call("HashFunction",$("id")))

.execute()

.print();

}

// TODO 1.定義 自定義函數(shù)的實現(xiàn)類

public static class HashFunction extends ScalarFunction{

// 接受任意類型的輸入,返回 INT型輸出

public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {

return o.hashCode();

}

}

}

這里我們自定義了一個ScalarFunction,實現(xiàn)了eval()求值方法,將任意類型的對象傳入,得到一個Int類型的哈希值返回。當(dāng)然,具體的求哈希操作就省略了,直接調(diào)用對象的hashCode()方法即可。

另外注意,由于Table API在對函數(shù)進行解析時需要提取求值方法參數(shù)的類型引用,所以我們用DataTypeHint(inputGroup = InputGroup.ANY)對輸入?yún)?shù)的類型做了標(biāo)注,表示eval的參數(shù)可以是任意類型

表函數(shù)(Table Functions)

跟標(biāo)量函數(shù)一樣,表函數(shù)的輸入?yún)?shù)也可以是 0個、1個或多個標(biāo)量值;不同的是,它可以返回任意多行數(shù)據(jù)?!岸嘈袛?shù)據(jù)”事實上就構(gòu)成了一個表,所以“表函數(shù)”可以認為就是返回一個表的函數(shù),這是一個“一對多”的轉(zhuǎn)換關(guān)系。之前我們介紹過的窗口TVF,本質(zhì)上就是表函數(shù)。

public class MyTableFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource strDS = env.fromElements(

"hello flink",

"hello world hi",

"hello java"

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));

tableEnv.createTemporaryView("str", sensorTable);

// TODO 2.注冊函數(shù)

tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);

// TODO 3.調(diào)用 自定義函數(shù)

// 3.1 交叉聯(lián)結(jié)

tableEnv

// 3.1 交叉聯(lián)結(jié)

// .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")

// 3.2 帶 on true 條件的 左聯(lián)結(jié)

// .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")

// 重命名側(cè)向表中的字段

.sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words)) as T(newWord,newLength) on true")

.execute()

.print();

}

// TODO 1.繼承 TableFunction<返回的類型>

// 類型標(biāo)注: Row包含兩個字段:word和length

@FunctionHint(output = @DataTypeHint("ROW"))

public static class SplitFunction extends TableFunction {

// 返回是 void,用 collect方法輸出

public void eval(String str) {

for (String word : str.split(" ")) {

collect(Row.of(word, word.length()));

}

}

}

}

這里我們直接將表函數(shù)的輸出類型定義成了ROW,這就是得到的側(cè)向表中的數(shù)據(jù)類型;每行數(shù)據(jù)轉(zhuǎn)換后也只有一行。我們分別用交叉聯(lián)結(jié)和左聯(lián)結(jié)兩種方式在SQL中進行了調(diào)用,還可以對側(cè)向表的中字段進行重命名

聚合函數(shù)(Aggregate Functions)

用戶自定義聚合函數(shù)(User Defined AGGregate function,UDAGG)會把一行或多行數(shù)據(jù)(也就是一個表)聚合成一個標(biāo)量值。這是一個標(biāo)準(zhǔn)的“多對一”的轉(zhuǎn)換。聚合函數(shù)的概念我們之前已經(jīng)接觸過多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常見的系統(tǒng)內(nèi)置聚合函數(shù)。而如果有些需求無法直接調(diào)用系統(tǒng)函數(shù)解決,我們就必須自定義聚合函數(shù)來實現(xiàn)功能了。

自定義聚合函數(shù)需要繼承抽象類AggregateFunction。AggregateFunction有兩個泛型參數(shù),T表示聚合輸出的結(jié)果類型,ACC則表示聚合的中間狀態(tài)類型。Flink SQL中的聚合函數(shù)的工作原理如下:

(1)首先,它需要創(chuàng)建一個累加器(accumulator),用來存儲聚合的中間結(jié)果。這與DataStream API中的AggregateFunction非常類似,累加器就可以看作是一個聚合狀態(tài)。調(diào)用createAccumulator()方法可以創(chuàng)建一個空的累加器。

(2)對于輸入的每一行數(shù)據(jù),都會調(diào)用accumulate()方法來更新累加器,這是聚合的核心過程。

(3)當(dāng)所有的數(shù)據(jù)都處理完之后,通過調(diào)用getValue()方法來計算并返回最終的結(jié)果。

所以,每個 AggregateFunction 都必須實現(xiàn)以下幾個方法:

createAccumulator()

這是創(chuàng)建累加器的方法。沒有輸入?yún)?shù),返回類型為累加器類型ACC。

accumulate()

這是進行聚合計算的核心方法,每來一行數(shù)據(jù)都會調(diào)用。它的第一個參數(shù)是確定的,就是當(dāng)前的累加器,類型為ACC,表示當(dāng)前聚合的中間狀態(tài);后面的參數(shù)則是聚合函數(shù)調(diào)用時傳入的參數(shù),可以有多個,類型也可以不同。這個方法主要是更新聚合狀態(tài),所以沒有返回類型。需要注意的是,accumulate()與之前的求值方法eval()類似,也是底層架構(gòu)要求的,必須為public,方法名必須為accumulate,且無法直接override、只能手動實現(xiàn)。

getValue()

這是得到最終返回結(jié)果的方法。輸入?yún)?shù)是ACC類型的累加器,輸出類型為T。在遇到復(fù)雜類型時,F(xiàn)link 的類型推導(dǎo)可能會無法得到正確的結(jié)果。所以AggregateFunction也可以專門對累加器和返回結(jié)果的類型進行聲明,這是通過 getAccumulatorType()和getResultType()兩個方法來指定的。

AggregateFunction 的所有方法都必須是 公有的(public),不能是靜態(tài)的(static),而且名字必須跟上面寫的完全一樣。createAccumulator、getValue、getResultType 以及 getAccumulatorType 這幾個方法是在抽象類 AggregateFunction 中定義的,可以override;而其他則都是底層架構(gòu)約定的方法。

// 學(xué)生的分?jǐn)?shù)表ScoreTable中計算每個學(xué)生的加權(quán)平均分

public class MyAggregateFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 姓名,分?jǐn)?shù),權(quán)重

DataStreamSource> scoreWeightDS = env.fromElements(

Tuple3.of("zs",80, 3),

Tuple3.of("zs",90, 4),

Tuple3.of("zs",95, 4),

Tuple3.of("ls",75, 4),

Tuple3.of("ls",65, 4),

Tuple3.of("ls",85, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"),$("f1").as("score"), $("f2").as("weight"));

tableEnv.createTemporaryView("scores", scoreWeightTable);

// TODO 2.注冊函數(shù)

tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);

// TODO 3.調(diào)用 自定義函數(shù)

tableEnv

.sqlQuery("select name,WeightedAvg(score,weight) from scores group by name")

.execute()

.print();

}

// TODO 1.繼承 AggregateFunction< 返回類型,累加器類型<加權(quán)總和,權(quán)重總和> >

public static class WeightedAvg extends AggregateFunction> {

@Override

public Double getValue(Tuple2 integerIntegerTuple2) {

return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1;

}

@Override

public Tuple2 createAccumulator() {

return Tuple2.of(0, 0);

}

/**

* 累加計算的方法,每來一行數(shù)據(jù)都會調(diào)用一次

* @param acc 累加器類型

* @param score 第一個參數(shù):分?jǐn)?shù)

* @param weight 第二個參數(shù):權(quán)重

*/

public void accumulate(Tuple2 acc,Integer score,Integer weight){

acc.f0 += score * weight; // 加權(quán)總和 = 分?jǐn)?shù)1 * 權(quán)重1 + 分?jǐn)?shù)2 * 權(quán)重2 +....

acc.f1 += weight; // 權(quán)重和 = 權(quán)重1 + 權(quán)重2 +....

}

}

}

表聚合函數(shù)(Table Aggregate Functions)

用戶自定義表聚合函數(shù)(UDTAGG)可以把一行或多行數(shù)據(jù)(也就是一個表)聚合成另一張表,結(jié)果表中可以有多行多列。很明顯,這就像表函數(shù)和聚合函數(shù)的結(jié)合體,是一個“多對多”的轉(zhuǎn)換。

自定義表聚合函數(shù)需要繼承抽象類TableAggregateFunction。TableAggregateFunction的結(jié)構(gòu)和原理與AggregateFunction非常類似,同樣有兩個泛型參數(shù),用一個ACC類型的累加器(accumulator)來存儲聚合的中間結(jié)果。聚合函數(shù)中必須實現(xiàn)的三個方法,在TableAggregateFunction中也必須對應(yīng)實現(xiàn):

createAccumulator()

創(chuàng)建累加器的方法,與AggregateFunction中用法相同。

accumulate()

聚合計算的核心方法,與AggregateFunction中用法相同。

emitValue()

所有輸入行處理完成后,輸出最終計算結(jié)果的方法。這個方法對應(yīng)著AggregateFunction中的getValue()方法;區(qū)別在于emitValue沒有輸出類型,而輸入?yún)?shù)有兩個:第一個是ACC類型的累加器,第二個則是用于輸出數(shù)據(jù)的“收集器”out,它的類型為Collect。另外,emitValue()在抽象類中也沒有定義,無法override,必須手動實現(xiàn)。

表聚合函數(shù)相對比較復(fù)雜,它的一個典型應(yīng)用場景就是TOP-N查詢。比如我們希望選出一組數(shù)據(jù)排序后的前兩名,這就是最簡單的TOP-2查詢。沒有現(xiàn)成的系統(tǒng)函數(shù),那么我們就可以自定義一個表聚合函數(shù)來實現(xiàn)這個功能。在累加器中應(yīng)該能夠保存當(dāng)前最大的兩個值,每當(dāng)來一條新數(shù)據(jù)就在accumulate()方法中進行比較更新,最終在emitValue()中調(diào)用兩次out.collect()將前兩名數(shù)據(jù)輸出。

public class MyTableAggregateFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 姓名,分?jǐn)?shù),權(quán)重

DataStreamSource numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table numTable = tableEnv.fromDataStream(numDS, $("num"));

// TODO 2.注冊函數(shù)

tableEnv.createTemporaryFunction("Top2", Top2.class);

// TODO 3.調(diào)用 自定義函數(shù): 只能用 Table API

numTable

.flatAggregate(call("Top2", $("num")).as("value", "rank"))

.select( $("value"), $("rank"))

.execute().print();

}

// TODO 1.繼承 TableAggregateFunction< 返回類型,累加器類型<加權(quán)總和,權(quán)重總和> >

// 返回類型 (數(shù)值,排名) =》 (12,1) (9,2)

// 累加器類型 (第一大的數(shù),第二大的數(shù)) ===》 (12,9)

public static class Top2 extends TableAggregateFunction, Tuple2> {

@Override

public Tuple2 createAccumulator() {

return Tuple2.of(0, 0);

}

/**

* 每來一個數(shù)據(jù)調(diào)用一次,比較大小,更新 最大的前兩個數(shù)到 acc中

*

* @param acc 累加器

* @param num 過來的數(shù)據(jù)

*/

public void accumulate(Tuple2 acc, Integer num) {

if (num > acc.f0) {

// 新來的變第一,原來的第一變第二

acc.f1 = acc.f0;

acc.f0 = num;

} else if (num > acc.f1) {

// 新來的變第二,原來的第二不要了

acc.f1 = num;

}

}

/**

* 輸出結(jié)果: (數(shù)值,排名)兩條最大的

*

* @param acc 累加器

* @param out 采集器<返回類型>

*/

public void emitValue(Tuple2 acc, Collector> out) {

if (acc.f0 != 0) {

out.collect(Tuple2.of(acc.f0, 1));

}

if (acc.f1 != 0) {

out.collect(Tuple2.of(acc.f1, 2));

}

}

}

}

柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL

http://yzkb.51969.com/

相關(guān)文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18872379.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄