柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL
柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL
文章目錄
一、Flink SQL1、sql-client準(zhǔn)備1.1 基于yarn-session模式1.2 常用配置
2、流處理中的表2.1 動態(tài)表和持續(xù)查詢2.2 將流轉(zhuǎn)換成動態(tài)表2.3 用SQL持續(xù)查詢2.4 將動態(tài)表轉(zhuǎn)換為流
3、時間屬性3.1 事件時間3.2 處理時間
4、DDL(Data Definition Language)數(shù)據(jù)定義4.1 數(shù)據(jù)庫4.2 表
5、查詢5.1 DataGen & Print5.2 With子句5.3 SELECT & WHERE 子句5.4 分組聚合5.5 分組窗口聚合5.6 窗口表值函數(shù)(TVF)聚合5.7 Over 聚合5.8 特殊語法 —— TOP-N5.9 特殊語法 —— Deduplication去重5.10 聯(lián)結(jié)(Join)查詢常規(guī)聯(lián)結(jié)查詢間隔聯(lián)結(jié)查詢維表聯(lián)結(jié)查詢
5.11 Order by 和 limit5.12 SQL Hints5.13 集合操作5.14 系統(tǒng)函數(shù)標(biāo)量函數(shù)(Scalar Functions)聚合函數(shù)(Aggregate Functions)
5.15 Module操作
6、常用Connector讀寫6.1 Kafka6.2 File6.3 JDBC(MySQL)
7、sql-client 中使用 savepoint8、Catalog8.1 Catalog類型8.2 JdbcCatalog(MySQL)8.3 HiveCatalog
9、代碼中使用FlinkSQL9.1 環(huán)境準(zhǔn)備9.2 創(chuàng)建表環(huán)境9.3 創(chuàng)建表9.4 表的查詢9.5 輸出表9.6 表和流的轉(zhuǎn)換將流(DataStream)轉(zhuǎn)換成表(Table)將表(Table)轉(zhuǎn)換成流(DataStream)支持的數(shù)據(jù)類型綜合應(yīng)用示例
9.7 自定義函數(shù)(UDF)整體調(diào)用流程標(biāo)量函數(shù)(Scalar Functions)表函數(shù)(Table Functions)聚合函數(shù)(Aggregate Functions)表聚合函數(shù)(Table Aggregate Functions)
一、Flink SQL
Table API和SQL是最上層的API,在Flink中這兩種API被集成在一起,SQL執(zhí)行的對象也是Flink中的表(Table),所以我們一般會認為它們是一體的。Flink是批流統(tǒng)一的處理框架,無論是批處理(DataSet API)還是流處理(DataStream API),在上層應(yīng)用中都可以直接使用Table API或者SQL來實現(xiàn);這兩種API對于一張表執(zhí)行相同的查詢操作,得到的結(jié)果是完全一樣的。我們主要還是以流處理應(yīng)用為例進行講解。
SQL API 是基于 SQL 標(biāo)準(zhǔn)的 Apache Calcite 框架實現(xiàn)的,可通過純 SQL 來開發(fā)和運行一個Flink 任務(wù)
1、sql-client準(zhǔn)備
1.1 基于yarn-session模式
# 啟動Flink
/opt/module/flink/bin/yarn-session.sh -d
# 啟動Flink的sql-client
/opt/module/flink/bin/sql-client.sh embedded -s yarn-session
1.2 常用配置
# 結(jié)果顯示模式
# 默認table,還可以設(shè)置為tableau、changelog
SET sql-client.execution.result-mode=tableau;
# 執(zhí)行環(huán)境
SET execution.runtime-mode=streaming; #默認streaming,也可以設(shè)置batch
# 默認并行度
SET parallelism.default=1;
# 設(shè)置狀態(tài)TTL
SET table.exec.state.ttl=1000;
# 通過sql文件初始化
vim conf/sql-client-init.sql
SET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;
# 啟動時,指定sql文件
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
2、流處理中的表
關(guān)系型表/SQL流處理處理的數(shù)據(jù)對象字段元組的有界集合字段元組的無限序列查詢(Query)對數(shù)據(jù)的訪問可以訪問到完整的數(shù)據(jù)輸入無法訪問到所有數(shù)據(jù),必須“持續(xù)”等待流式輸入查詢終止條件生成固定大小的結(jié)果集后終止永不停止,根據(jù)持續(xù)收到的數(shù)據(jù)不斷更新查詢結(jié)果
2.1 動態(tài)表和持續(xù)查詢
動態(tài)表(Dynamic Tables)
當(dāng)流中有新數(shù)據(jù)到來,初始的表中會插入一行;而基于這個表定義的SQL查詢,就應(yīng)該在之前的基礎(chǔ)上更新結(jié)果。這樣得到的表就會不斷地動態(tài)變化,被稱為“動態(tài)表”(Dynamic Tables)
動態(tài)表是Flink在Table API和SQL中的核心概念,它為流數(shù)據(jù)處理提供了表和SQL支持。我們所熟悉的表一般用來做批處理,面向的是固定的數(shù)據(jù)集,可以認為是“靜態(tài)表”;而動態(tài)表則完全不同,它里面的數(shù)據(jù)會隨時間變化
持續(xù)查詢(Continuous Query)
動態(tài)表可以像靜態(tài)的批處理表一樣進行查詢操作。由于數(shù)據(jù)在不斷變化,因此基于它定義的SQL查詢也不可能執(zhí)行一次就得到最終結(jié)果。這樣一來,我們對動態(tài)表的查詢也就永遠不會停止,一直在隨著新數(shù)據(jù)的到來而繼續(xù)執(zhí)行。這樣的查詢就被稱作“持續(xù)查詢”(Continuous Query)。對動態(tài)表定義的查詢操作,都是持續(xù)查詢;而持續(xù)查詢的結(jié)果也會是一個動態(tài)表。
由于每次數(shù)據(jù)到來都會觸發(fā)查詢操作,因此可以認為一次查詢面對的數(shù)據(jù)集,就是當(dāng)前輸入動態(tài)表中收到的所有數(shù)據(jù)。這相當(dāng)于是對輸入動態(tài)表做了一個“快照”(snapshot),當(dāng)作有限數(shù)據(jù)集進行批處理;流式數(shù)據(jù)的到來會觸發(fā)連續(xù)不斷的快照查詢,像動畫一樣連貫起來,就構(gòu)成了“持續(xù)查詢”
持續(xù)查詢的步驟如下:
流(stream)被轉(zhuǎn)換為動態(tài)表(dynamic table);對動態(tài)表進行持續(xù)查詢(continuous query),生成新的動態(tài)表;生成的動態(tài)表被轉(zhuǎn)換成流
2.2 將流轉(zhuǎn)換成動態(tài)表
如果把流看作一張表,那么流中每個數(shù)據(jù)的到來,都應(yīng)該看作是對表的一次插入(Insert)操作,會在表的末尾添加一行數(shù)據(jù)。因為流是連續(xù)不斷的,而且之前的輸出結(jié)果無法改變、只能在后面追加;所以我們其實是通過一個只有插入操作(insert-only)的更新日志(changelog)流,來構(gòu)建一個表。
2.3 用SQL持續(xù)查詢
更新(Update)查詢
// 我們在代碼中定義了一個SQL查詢
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
當(dāng)原始動態(tài)表不停地插入新的數(shù)據(jù)時,查詢得到的urlCountTable會持續(xù)地進行更改。由于count數(shù)量可能會疊加增長,因此這里的更改操作可以是簡單的插入(Insert),也可以是對之前數(shù)據(jù)的更新(Update)。這種持續(xù)查詢被稱為更新查詢(Update Query),更新查詢得到的結(jié)果表如果想要轉(zhuǎn)換成DataStream,必須調(diào)用toChangelogStream()方法
追加(Append)查詢
// 如果我們執(zhí)行一個簡單的條件查詢,結(jié)果表中就會像原始表EventTable一樣,只有插入(Insert)操作了
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");
// 這樣的持續(xù)查詢,就被稱為追加查詢(Append Query),它定義的結(jié)果表的更新日志(changelog)流中只有INSERT操作。
由于窗口的統(tǒng)計結(jié)果是一次性寫入結(jié)果表的,所以結(jié)果表的更新日志流中只會包含插入INSERT操作,而沒有更新UPDATE操作。所以這里的持續(xù)查詢,依然是一個追加(Append)查詢。結(jié)果表result如果轉(zhuǎn)換成DataStream,可以直接調(diào)用toDataStream()方法。
2.4 將動態(tài)表轉(zhuǎn)換為流
與關(guān)系型數(shù)據(jù)庫中的表一樣,動態(tài)表也可以通過插入(Insert)、更新(Update)和刪除(Delete)操作,進行持續(xù)的更改。將動態(tài)表轉(zhuǎn)換為流或?qū)⑵鋵懭胪獠肯到y(tǒng)時,就需要對這些更改操作進行編碼,通過發(fā)送編碼消息的方式告訴外部系統(tǒng)要執(zhí)行的操作。在Flink中,Table API和SQL支持三種編碼方式:
僅追加(Append-only)流:僅通過插入(Insert)更改來修改的動態(tài)表,可以直接轉(zhuǎn)換為“僅追加”流。這個流中發(fā)出的數(shù)據(jù),其實就是動態(tài)表中新增的每一行撤回(Retract)流:撤回流是包含兩類消息的流,添加(add)消息和撤回(retract)消息。具體的編碼規(guī)則是:INSERT插入操作編碼為add消息;DELETE刪除操作編碼為retract消息;而UPDATE更新操作則編碼為被更改行的retract消息,和更新后行(新行)的add消息。這樣,我們可以通過編碼后的消息指明所有的增刪改操作,一個動態(tài)表就可以轉(zhuǎn)換為撤回流了。
更新插入(Upsert)流:更新插入流中只包含兩種類型的消息:更新插入(upsert)消息和刪除(delete)消息。所謂的“upsert”其實是“update”和“insert”的合成詞,所以對于更新插入流來說,INSERT插入操作和UPDATE更新操作,統(tǒng)一被編碼為upsert消息;而DELETE刪除操作則被編碼為delete消息
在代碼里將動態(tài)表轉(zhuǎn)換為DataStream時,只支持僅追加(append-only)和撤回(retract)流,我們調(diào)用toChangelogStream()得到的其實就是撤回流。而連接到外部系統(tǒng)時,則可以支持不同的編碼方法,這取決于外部系統(tǒng)本身的特性
3、時間屬性
基于時間的操作(比如時間窗口),需要定義相關(guān)的時間語義和時間數(shù)據(jù)來源的信息。在Table API和SQL中,會給表單獨提供一個邏輯上的時間字段,專門用來在表處理程序中指示時間。
所以所謂的時間屬性(time attributes),其實就是每個表模式結(jié)構(gòu)(schema)的一部分。它可以在創(chuàng)建表的DDL里直接定義為一個字段,也可以在DataStream轉(zhuǎn)換成表時定義。一旦定義了時間屬性,它就可以作為一個普通字段引用,并且可以在基于時間的操作中使用。
時間屬性的數(shù)據(jù)類型必須為TIMESTAMP,它的行為類似于常規(guī)時間戳,可以直接訪問并且進行計算。按照時間語義的不同,可以把時間屬性的定義分成**事件時間(event time)和處理時間(processing time)**兩種情況
3.1 事件時間
## 事件時間屬性可以在創(chuàng)建表DDL中定義,增加一個字段,通過WATERMARK語句來定義事件時間屬性
CREATE TABLE EventTable(
user STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
## 這里我們把ts字段定義為事件時間屬性,而且基于ts設(shè)置了5秒的水位線延遲
## 時間戳類型必須是 TIMESTAMP 或者TIMESTAMP_LTZ 類型。
## 但是時間戳一般都是秒或者是毫秒(BIGINT 類型),這種情況可以通過如下方式轉(zhuǎn)換
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
3.2 處理時間
在定義處理時間屬性時,必須要額外聲明一個字段,專門用來保存當(dāng)前的處理時間。在創(chuàng)建表的DDL(CREATE TABLE語句)中,可以增加一個額外的字段,通過調(diào)用系統(tǒng)內(nèi)置的PROCTIME()函數(shù)來指定當(dāng)前的處理時間屬性
CREATE TABLE EventTable(
user STRING,
url STRING,
ts AS PROCTIME()
) WITH (
...
);
4、DDL(Data Definition Language)數(shù)據(jù)定義
4.1 數(shù)據(jù)庫
## 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
# 舉例
CREATE DATABASE db_flink;
## 查詢數(shù)據(jù)庫
SHOW DATABASES;
# 查詢當(dāng)前數(shù)據(jù)庫
SHOW CURRENT DATABASE;
# 修改數(shù)據(jù)庫
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
# 刪除數(shù)據(jù)庫
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
# RESTRICT:刪除非空數(shù)據(jù)庫會觸發(fā)異常。默認啟用
# CASCADE:刪除非空數(shù)據(jù)庫也會刪除所有相關(guān)的表和函數(shù)
DROP DATABASE db_flink2;
## 切換當(dāng)前數(shù)據(jù)庫
USE database_name;
4.2 表
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{
[
[
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [(
physical_column_definition
物理列是數(shù)據(jù)庫中所說的常規(guī)列。其定義了物理介質(zhì)中存儲的數(shù)據(jù)中字段的名稱、類型和順序。其他類型的列可以在物理列之間聲明,但不會影響最終的物理列的讀取
metadata_column_definition
元數(shù)據(jù)列是 SQL 標(biāo)準(zhǔn)的擴展,允許訪問數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA 關(guān)鍵字標(biāo)識。例如,我們可以使用元數(shù)據(jù)列從Kafka記錄中讀取和寫入時間戳,用于基于時間的操作(這個時間戳不是數(shù)據(jù)中的某個時間戳字段,而是數(shù)據(jù)寫入 Kafka 時,Kafka 引擎給這條數(shù)據(jù)打上的時間戳標(biāo)記)。connector和format文檔列出了每個組件可用的元數(shù)據(jù)字段
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
## 如果自定義的列名稱和 Connector 中定義 metadata 字段的名稱一樣, FROM xxx 子句可省略
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
## 如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata 字段的數(shù)據(jù)類型不一致,程序運行時會自動 cast強轉(zhuǎn),但是這要求兩種數(shù)據(jù)類型是可以強轉(zhuǎn)的
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 將時間戳強轉(zhuǎn)為 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
## 默認情況下,F(xiàn)link SQL planner 認為 metadata 列可以讀取和寫入。
## 然而,在許多情況下,外部系統(tǒng)提供的只讀元數(shù)據(jù)字段比可寫字段多。因此,可以使用VIRTUAL關(guān)鍵字排除元數(shù)據(jù)列的持久化(表示只讀)
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
computed_column_definition
計算列是使用語法column_name AS computed_column_expression生成的虛擬列。計算列就是拿已有的一些列經(jīng)過一些自定義的運算生成的新列,在物理上并不存儲在表中,只能讀不能寫。列的數(shù)據(jù)類型從給定的表達式自動派生,無需手動聲明。
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity
) WITH (
'connector' = 'kafka'
...
);
定義Watermark
Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:
(1) 嚴(yán)格升序:WATERMARK FOR rowtime_column AS rowtime_column。Flink 任務(wù)認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小于之前的,就認為是遲到的數(shù)據(jù)。
(2) 遞增:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND 。一般基本不用這種方式。如果設(shè)置此類,則允許有相同的時間戳出現(xiàn)。
(3) 有界無序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit 。此類策略就可以用于設(shè)置最大亂序時間,假如設(shè)置為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND ,則生成的是運行 5s 延遲的Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場景中,而對應(yīng)到實際的場景中,數(shù)據(jù)都是會存在亂序的,所以基本都使用此類策略。
PRIMARY KEY
主鍵約束表明表中的一列或一組列是唯一的,并且它們不包含NULL值。主鍵唯一地標(biāo)識表中的一行,只支持 not enforced
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced
) WITH (
'connector' = 'kafka'
...
);
PARTITIONED BY
創(chuàng)建分區(qū)表
with語句
用于創(chuàng)建表的表屬性,用于指定外部存儲系統(tǒng)的元數(shù)據(jù)信息。配置屬性時,表達式key1=val1的鍵和值都應(yīng)該是字符串字面值。如下是Kafka的映射表
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
一般 with 中的配置項由 Flink SQL 的 Connector(鏈接外部存儲的連接器) 來定義,每種 Connector 提供的with 配置項都是不同的。
LIKE
用于基于現(xiàn)有表的定義創(chuàng)建表。此外,用戶可以擴展原始表或排除表的某些部分??梢允褂迷撟泳渲赜?可能還會覆蓋)某些連接器屬性,或者向外部定義的表添加水印。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
AS select_statement(CTAS)
在一個create-table-as-select (CTAS)語句中,還可以通過查詢的結(jié)果創(chuàng)建和填充表。CTAS是使用單個命令創(chuàng)建數(shù)據(jù)并向表中插入數(shù)據(jù)的最簡單、最快速的方法。
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka',
...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
# 注意:CTAS有以下限制:
# 暫不支持創(chuàng)建臨時表。
# 目前還不支持指定顯式列。
# 還不支持指定顯式水印。
# 目前還不支持創(chuàng)建分區(qū)表。
# 目前還不支持指定主鍵約束
## 舉例
CREATE TABLE test(
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'print'
);
CREATE TABLE test1 (
`value` STRING
)
LIKE test;
常用操作
## 查看表
# 查看所有表
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE
# 如果沒有指定數(shù)據(jù)庫,則從當(dāng)前數(shù)據(jù)庫返回表。
# LIKE子句中sql pattern的語法與MySQL方言的語法相同:
# %匹配任意數(shù)量的字符,甚至零字符,\%匹配一個'%'字符。
# _只匹配一個字符,\_只匹配一個'_'字符
## 查看表信息
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
## 修改表
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
## 刪除表
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
5、查詢
5.1 DataGen & Print
CREATE TABLE source (
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.id.kind'='random',
'fields.id.min'='1',
'fields.id.max'='10',
'fields.ts.kind'='sequence',
'fields.ts.start'='1',
'fields.ts.end'='1000000',
'fields.vc.kind'='random',
'fields.vc.min'='1',
'fields.vc.max'='100'
);
# 這樣創(chuàng)建的 "source" 表將包含三列數(shù)據(jù):id(隨機整數(shù)在1和10之間)、
# ts(從1遞增到1000000的遞增序列)和vc(隨機整數(shù)在1和100之間)。
# 每秒鐘會生成一行這樣的模擬測試數(shù)據(jù)。您可以使用這個表作為 Flink 程序的輸入源,對數(shù)據(jù)進行處理和分析。
CREATE TABLE sink (
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'print'
);
# 查詢源表
select * from source;
# 插入sink表并查詢,這個需要去flink UI界面查看輸出
INSERT INTO sink select * from source;
select * from sink;
5.2 With子句
WITH提供了一種編寫輔助語句的方法,以便在較大的查詢中使用。這些語句通常被稱為公共表表達式(Common Table Expression, CTE),可以認為它們定義了僅為一個查詢而存在的臨時視圖
WITH
SELECT ... FROM ...;
with_item_name (column_name[, ...n]) AS (
## 舉例
WITH source_with_total AS (
SELECT id, vc+10 AS total
FROM source
)
SELECT id, SUM(total)
FROM source_with_total
GROUP BY id;
5.3 SELECT & WHERE 子句
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
# 舉例
SELECT * FROM source;
SELECT id, vc + 10 FROM source;
-- 自定義 Source 的數(shù)據(jù)
SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (id, price);
SELECT vc + 10 FROM source WHERE id >10;
5.4 分組聚合
## group聚合案例
CREATE TABLE source1 (
dim STRING,
user_id BIGINT,
price BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
CREATE TABLE sink1 (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
insert into sink1
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數(shù)
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
dim,
-- UNIX_TIMESTAMP得到秒的時間戳,將秒級別時間戳 / 60 轉(zhuǎn)化為 1min,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);
## 多維分析
# Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (
VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4)
)
-- 供應(yīng)商id、產(chǎn)品id、評級
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS(
(supplier_id, product_id, rating),
(supplier_id, product_id),
(supplier_id, rating),
(supplier_id),
(product_id, rating),
(product_id),
(rating),
()
);
5.5 分組窗口聚合
從1.13版本開始,分組窗口聚合已經(jīng)標(biāo)記為過時,鼓勵使用更強大、更有效的窗口TVF聚合,在這里簡單做個介紹。直接把窗口自身作為分組key放在GROUP BY之后的,所以也叫“分組窗口聚合”。SQL查詢的分組窗口是通過 GROUP BY 子句定義的。類似于使用常規(guī) GROUP BY 語句的查詢,窗口分組語句的 GROUP BY 子句中帶有一個窗口函數(shù)為每個分組計算出一個結(jié)果。SQL中只支持基于時間的窗口,不支持基于元素個數(shù)的窗口
分組窗口函數(shù)描述TUMBLE(time_attr, interval)定義一個滾動窗口。滾動窗口把行分配到有固定持續(xù)時間( interval )的不重疊的連續(xù)窗口。比如,5 分鐘的滾動窗口以 5 分鐘為間隔對行進行分組。滾動窗口可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。HOP(time_attr, interval, interval)定義一個跳躍的時間窗口(在 Table API 中稱為滑動窗口)?;瑒哟翱谟幸粋€固定的持續(xù)時間( 第二個 interval 參數(shù) )以及一個滑動的間隔(第一個 interval 參數(shù) )。若滑動間隔小于窗口的持續(xù)時間,滑動窗口則會出現(xiàn)重疊;因此,行將會被分配到多個窗口中。比如,一個大小為 15 分組的滑動窗口,其滑動間隔為 5 分鐘,將會把每一行數(shù)據(jù)分配到 3 個 15 分鐘的窗口中?;瑒哟翱诳梢远x在事件時間(批處理、流處理)或處理時間(流處理)上。SESSION(time_attr, interval)定義一個會話時間窗口。會話時間窗口沒有一個固定的持續(xù)時間,但是它們的邊界會根據(jù) interval 所定義的不活躍時間所確定;即一個會話時間窗口在定義的間隔時間內(nèi)沒有時間出現(xiàn),該窗口會被關(guān)閉。例如時間窗口的間隔時間是 30 分鐘,當(dāng)其不活躍的時間達到30分鐘后,若觀測到新的記錄,則會啟動一個新的會話時間窗口(否則該行數(shù)據(jù)會被添加到當(dāng)前的窗口),且若在 30 分鐘內(nèi)沒有觀測到新紀(jì)錄,這個窗口將會被關(guān)閉。會話時間窗口可以使用事件時間(批處理、流處理)或處理時間(流處理)。
# 數(shù)據(jù)準(zhǔn)備
CREATE TABLE ws (
id INT,
vc INT,
pt AS PROCTIME(), --處理時間
et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件時間
WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.id.min' = '1',
'fields.id.max' = '3',
'fields.vc.min' = '1',
'fields.vc.max' = '100'
);
# 滾動窗口示例(時間屬性字段,窗口長度)
select
id,
TUMBLE_START(et, INTERVAL '5' SECOND) wstart,
TUMBLE_END(et, INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, TUMBLE(et, INTERVAL '5' SECOND);
# 滑動窗口(時間屬性字段,滑動步長,窗口長度)
select
id,
HOP_START(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wstart,
HOP_END(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);
# 會話窗口(時間屬性字段,會話間隔)
select
id,
SESSION_START(et, INTERVAL '5' SECOND) wstart,
SESSION_END(et, INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);
5.6 窗口表值函數(shù)(TVF)聚合
對比GroupWindow,TVF窗口更有效和強大。包括:
提供更多的性能優(yōu)化手段支持GroupingSets語法可以在window聚合中使用TopN提供累積窗口
對于窗口表值函數(shù),窗口本身返回的是就是一個表,所以窗口會出現(xiàn)在FROM后面,GROUP BY后面的則是窗口新增的字段window_start和window_end
FROM TABLE(
窗口類型(TABLE 表名, DESCRIPTOR(時間字段),INTERVAL時間…)
)
GROUP BY [window_start,][window_end,] --可選
## 滾動窗口
SELECT
window_start,
window_end,
id , SUM(vc)
sumVC
FROM TABLE(
TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, id;
## 滑動窗口
# 要求: 窗口長度=滑動步長的整數(shù)倍(底層會優(yōu)化成多個小滾動窗口)
SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE(
HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;
## 累積窗口
# 累積窗口會在一定的統(tǒng)計周期內(nèi)進行累積計算。累積窗口中有兩個核心的參數(shù):最大窗口長度(max window size)和累積步長(step)。
# 所謂的最大窗口長度其實就是我們所說的“統(tǒng)計周期”,最終目的就是統(tǒng)計這段時間內(nèi)的數(shù)據(jù)
# 其實就是固定窗口間隔內(nèi)提前觸發(fā)的的滾動窗口 ,其實就是 Tumble Window + early-fire 的一個事件時間的版本。
# 例如,從每日零點到當(dāng)前這一分鐘繪制累積 UV,其中 10:00 時的 UV 表示從 00:00 到 10:00 的 UV 總數(shù)
# 累積窗口可以認為是首先開一個最大窗口大小的滾動窗口,然后根據(jù)用戶設(shè)置的觸發(fā)的時間間隔將這個滾動窗口拆分為多個窗口,這些窗口具有相同的窗口起點和不同的窗口終點
# 注意: 窗口最大長度 = 累積步長的整數(shù)倍
SELECT
window_start,
window_end,
id ,
SUM(vc) sumVC
FROM TABLE(
CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS))
GROUP BY window_start, window_end, id;
## grouping sets多維分析
SELECT
window_start,
window_end,
id ,
SUM(vc) sumVC
FROM TABLE(
TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
-- cube( (id) )
-- grouping sets( (id),() )
;
5.7 Over 聚合
OVER聚合為一系列有序行的每個輸入行計算一個聚合值。與GROUP BY聚合相比,OVER聚合不會將每個組的結(jié)果行數(shù)減少為一行。相反,OVER聚合為每個輸入行生成一個聚合值??梢栽谑录r間或處理時間,以及指定為時間間隔、或行計數(shù)的范圍內(nèi),定義Over windows
## 語法
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
# ORDER BY:必須是時間戳列(事件時間、處理時間),只能升序
# PARTITION BY:標(biāo)識了聚合窗口的聚合粒度
# range_definition:這個標(biāo)識聚合窗口的聚合數(shù)據(jù)范圍,在 Flink 中有兩種指定數(shù)據(jù)范圍的方式。第一種為按照行數(shù)聚合,第二種為按照時間區(qū)間聚合
# 舉例
# 按照時間區(qū)間聚合
# 統(tǒng)計每個傳感器前10秒到現(xiàn)在收到的水位數(shù)據(jù)條數(shù)
SELECT
id,
et,
vc,
count(vc) OVER (
PARTITION BY id
ORDER BY et
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
) AS cnt
FROM ws
# 也可以用WINDOW子句來在SELECT外部單獨定義一個OVER窗口,可以多次使用
SELECT
id,
et,
vc,
count(vc) OVER w AS cnt,
sum(vc) OVER w AS sumVC
FROM ws
WINDOW w AS (
PARTITION BY id
ORDER BY et
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
)
## 按照行數(shù)聚合
# 統(tǒng)計每個傳感器前5條到現(xiàn)在數(shù)據(jù)的平均水位
SELECT
id,
et,
vc,
avg(vc) OVER (
PARTITION BY id
ORDER BY et
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgVC
FROM ws
# 也可以用WINDOW子句來在SELECT外部單獨定義一個OVER窗口
SELECT
id,
et,
vc,
avg(vc) OVER w AS avgVC,
count(vc) OVER w AS cnt
FROM ws
WINDOW w AS (
PARTITION BY id
ORDER BY et
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
)
5.8 特殊語法 —— TOP-N
目前在Flink SQL中沒有能夠直接調(diào)用的TOP-N函數(shù),而是提供了稍微復(fù)雜些的變通實現(xiàn)方法,是固定寫法,特殊支持的over用法
## 語法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
# ROW_NUMBER() :標(biāo)識 TopN 排序子句
# PARTITION BY col1[, col2...] :標(biāo)識分區(qū)字段,代表按照這個 col 字段作為分區(qū)粒度對數(shù)據(jù)進行排序取 topN,比如下述案例中的 partition by key ,就是根據(jù)需求中的搜索關(guān)鍵詞(key)做為分區(qū)
# ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :標(biāo)識 TopN 的排序規(guī)則,是按照哪些字段、順序或逆序進行排序,可以不是時間字段,也可以降序(TopN特殊支持)
# WHERE rownum <= N :這個子句是一定需要的,只有加上了這個子句,F(xiàn)link 才能將其識別為一個TopN 的查詢,其中 N 代表 TopN 的條目數(shù)
# [AND conditions] :其他的限制條件也可以加上
# 案例
select
id,
et,
vc,
rownum
from
(
select
id,
et,
vc,
row_number() over(
partition by id
order by vc desc
) as rownum
from ws
)
where rownum<=3;
5.9 特殊語法 —— Deduplication去重
去重,也即上文介紹到的TopN 中** row_number = 1** 的場景,但是這里有一點不一樣在于其排序字段一定是時間屬性列,可以降序,不能是其他非時間屬性的普通列。
在 row_number = 1 時,如果排序字段是普通列 planner 會翻譯成 TopN 算子,如果是時間屬性列 planner 會翻譯成 Deduplication,這兩者最終的執(zhí)行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應(yīng)的優(yōu)化,性能會有很大提升??梢詮膚ebui看出是翻譯成哪種算子。
# 語法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
## 案例
# 對每個傳感器的水位值去重
select
id,
et,
vc,
rownum
from
(
select
id,
et,
vc,
row_number() over(
partition by id,vc
order by et
) as rownum
from ws
)
where rownum=1;
5.10 聯(lián)結(jié)(Join)查詢
Flink SQL中的聯(lián)結(jié)查詢大體上也可以分為兩類:SQL原生的聯(lián)結(jié)查詢方式,和流處理中特有的聯(lián)結(jié)查詢
常規(guī)聯(lián)結(jié)查詢
常規(guī)聯(lián)結(jié)(Regular Join)是SQL中原生定義的Join方式,是最通用的一類聯(lián)結(jié)操作。它的具體語法與標(biāo)準(zhǔn)SQL的聯(lián)結(jié)完全相同,通過關(guān)鍵字JOIN來聯(lián)結(jié)兩個表,后面用關(guān)鍵字ON來指明聯(lián)結(jié)條件。與標(biāo)準(zhǔn)SQL一致,F(xiàn)link SQL的常規(guī)聯(lián)結(jié)也可以分為內(nèi)聯(lián)結(jié)(INNER JOIN)和外聯(lián)結(jié)(OUTER JOIN),區(qū)別在于結(jié)果中是否包含不符合聯(lián)結(jié)條件的行。Regular Join 包含以下幾種(以 L 作為左流中的數(shù)據(jù)標(biāo)識, R 作為右流中的數(shù)據(jù)標(biāo)識):
Inner Join(Inner Equal Join):流任務(wù)中,只有兩條流 Join 到才輸出,輸出 +[L, R]Left Join(Outer Equal Join):流任務(wù)中,左流數(shù)據(jù)到達之后,無論有沒有 Join 到右流的數(shù)據(jù),都會輸出(Join 到輸出 +[L, R] ,沒 Join 到輸出 +[L, null] ),如果右流之后數(shù)據(jù)到達之后,發(fā)現(xiàn)左流之前輸出過沒有 Join 到的數(shù)據(jù),則會發(fā)起回撤流,先輸出 -[L, null] ,然后輸出 +[L, R]Right Join(Outer Equal Join):有 Left Join 一樣,左表和右表的執(zhí)行邏輯完全相反Full Join(Outer Equal Join):流任務(wù)中,左流或者右流的數(shù)據(jù)到達之后,無論有沒有 Join 到另外一條流的數(shù)據(jù),都會輸出(對右流來說:Join 到輸出 +[L, R] ,沒 Join 到輸出 +[null, R] ;對左流來說:Join 到輸出 +[L, R] ,沒 Join 到輸出 +[L, null] )。如果一條流的數(shù)據(jù)到達之后,發(fā)現(xiàn)之前另一條流之前輸出過沒有 Join 到的數(shù)據(jù),則會發(fā)起回撤流(左流數(shù)據(jù)到達為例:回撤 -[null, R] ,輸出+[L, R] ,右流數(shù)據(jù)到達為例:回撤 -[L, null] ,輸出 +[L, R]
Regular Join 的注意事項:
實時 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 區(qū)別在于, 等值 join數(shù)據(jù) shuffle 策略是 Hash,會按照 Join on 中的等值條件作為 id 發(fā)往對應(yīng)的下游; 非等值 join 數(shù)據(jù) shuffle 策略是 Global,所有數(shù)據(jù)發(fā)往一個并發(fā),按照非等值條件進行關(guān)聯(lián)流的上游是無限的數(shù)據(jù),所以要做到關(guān)聯(lián)的話,F(xiàn)link 會將兩條流的所有數(shù)據(jù)都存儲在 State 中,所以 Flink 任務(wù)的 State 會無限增大,因此你需要為 State 配置合適的 TTL,以防止 State 過大。
## 再準(zhǔn)備一張表用于join
CREATE TABLE ws1 (
id INT,
vc INT,
pt AS PROCTIME(), --處理時間
et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件時間
WATERMARK FOR et AS et - INTERVAL '0.001' SECOND --watermark
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.min' = '3',
'fields.id.max' = '5',
'fields.vc.min' = '1',
'fields.vc.max' = '100'
);
## 等值內(nèi)聯(lián)結(jié)(INNER Equi-JOIN)
# 內(nèi)聯(lián)結(jié)用INNER JOIN來定義,會返回兩表中符合聯(lián)接條件的所有行的組合,也就是所謂的笛卡爾積(Cartesian product)。目前僅支持等值聯(lián)結(jié)條件
SELECT *
FROM ws
INNER JOIN ws1
ON ws.id = ws1.id;
## 等值外聯(lián)結(jié)(OUTER Equi-JOIN)
# Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分別表示會將左側(cè)表、右側(cè)表以及雙側(cè)表中沒有任何匹配的行返回。
SELECT *
FROM ws
LEFT JOIN ws1
ON ws.id = ws1.id;
SELECT *
FROM ws
RIGHT JOIN ws1
ON ws.id = ws1.id;
SELECT *
FROM ws
FULL OUTER JOIN ws1
ON ws.id = ws1.id;
間隔聯(lián)結(jié)查詢
兩條流的Join就對應(yīng)著SQL中兩個表的Join,這是流處理中特有的聯(lián)結(jié)方式。目前Flink SQL還不支持窗口聯(lián)結(jié),而間隔聯(lián)結(jié)則已經(jīng)實現(xiàn)。間隔聯(lián)結(jié)(Interval Join)返回的,同樣是符合約束條件的兩條中數(shù)據(jù)的笛卡爾積。只不過這里的“約束條件”除了常規(guī)的聯(lián)結(jié)條件外,還多了一個時間間隔的限制。具體語法有以下要點:
兩表的聯(lián)結(jié)
間隔聯(lián)結(jié)不需要用JOIN關(guān)鍵字,直接在FROM后將要聯(lián)結(jié)的兩表列出來就可以,用逗號分隔。這與標(biāo)準(zhǔn)SQL中的語法一致,表示一個“交叉聯(lián)結(jié)”(Cross Join),會返回兩表中所有行的笛卡爾積
聯(lián)結(jié)條件
聯(lián)結(jié)條件用WHERE子句來定義,用一個等值表達式描述。交叉聯(lián)結(jié)之后再用WHERE進行條件篩選,效果跟內(nèi)聯(lián)結(jié)INNER JOIN … ON …非常類似
時間間隔限制
我們可以在WHERE子句中,聯(lián)結(jié)條件后用AND追加一個時間間隔的限制條件;做法是提取左右兩側(cè)表中的時間字段,然后用一個表達式來指明兩者需要滿足的間隔限制。具體定義方式有下面三種,這里分別用ltime和rtime表示左右表中的時間字段:
(1)ltime = rtime
(2)ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
(3)ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
SELECT *
FROM ws,ws1
WHERE ws.id = ws1. id
AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND
維表聯(lián)結(jié)查詢
Lookup Join 其實就是維表 Join,實時獲取外部緩存的 Join,Lookup 的意思就是實時查找。上面說的這幾種 Join 都是流與流之間的 Join,而 Lookup Join 是流與 Redis,Mysql,HBase 這種外部存儲介質(zhì)的 Join。僅支持處理時間字段
表A
JOIN 維度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 別名
ON xx.字段=別名.字段
## 比如維表在mysql,維表join的寫法如下
CREATE TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
'table-name' = 'customers'
);
-- order表每來一條數(shù)據(jù),都會去mysql的customers表查找維度數(shù)據(jù)
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
5.11 Order by 和 limit
## 支持 Batch\Streaming,但在實時任務(wù)中一般用的非常少
## 實時任務(wù)中,Order By 子句中必須要有時間屬性字段,并且必須寫在最前面且為升序
SELECT *
FROM ws
ORDER BY et, id desc
SELECT *
FROM ws
LIMIT 3
5.12 SQL Hints
## 在執(zhí)行查詢時,可以在表名后面添加SQL Hints來臨時修改表屬性,對當(dāng)前job生效
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
5.13 集合操作
## UNION 和 UNION ALL
# UNION:將集合合并并且去重
# UNION ALL:將集合合并,不做去重
(SELECT id FROM ws) UNION (SELECT id FROM ws1);
(SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);
## Intersect 和 Intersect All
# ntersect:交集并且去重
# Intersect ALL:交集不做去重
(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);
(SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);
## Except 和 Except All
# Except:差集并且去重
# Except ALL:差集不做去重
(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);
(SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);
# 上述 SQL 在流式任務(wù)中,如果一條左流數(shù)據(jù)先來了,沒有從右流集合數(shù)據(jù)中找到對應(yīng)的數(shù)據(jù)時會直接輸出,當(dāng)右流對應(yīng)數(shù)據(jù)后續(xù)來了之后,會下發(fā)回撤流將之前的數(shù)據(jù)給撤回。這也是一個回撤流
## In 子查詢
# In 子查詢的結(jié)果集只能有一列
SELECT id, vc
FROM ws
WHERE id IN (
SELECT id FROM ws1
)
# 上述 SQL 的 In 子句和之前介紹到的 Inner Join 類似。并且 In 子查詢也會涉及到大狀態(tài)問題,要注意設(shè)置 State 的 TTL
5.14 系統(tǒng)函數(shù)
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/
系統(tǒng)函數(shù)(System Functions)也叫內(nèi)置函數(shù)(Built-in Functions),是在系統(tǒng)中預(yù)先實現(xiàn)好的功能模塊。我們可以通過固定的函數(shù)名直接調(diào)用,實現(xiàn)想要的轉(zhuǎn)換操作。Flink SQL提供了大量的系統(tǒng)函數(shù),幾乎支持所有的標(biāo)準(zhǔn)SQL中的操作,這為我們使用SQL編寫流處理程序提供了極大的方便。Flink SQL中的系統(tǒng)函數(shù)又主要可以分為兩大類:標(biāo)量函數(shù)(Scalar Functions)和聚合函數(shù)(Aggregate Functions)。
標(biāo)量函數(shù)(Scalar Functions)
標(biāo)量函數(shù)指的就是只對輸入數(shù)據(jù)做轉(zhuǎn)換操作、返回一個值的函數(shù)。標(biāo)量函數(shù)是最常見、也最簡單的一類系統(tǒng)函數(shù),數(shù)量非常龐大,很多在標(biāo)準(zhǔn)SQL中也有定義。所以我們這里只對一些常見類型列舉部分函數(shù),做一個簡單概述,具體應(yīng)用可以查看官網(wǎng)的完整函數(shù)列表
比較函數(shù)(Comparison Functions)
比較函數(shù)其實就是一個比較表達式,用來判斷兩個值之間的關(guān)系,返回一個布爾類型的值。這個比較表達式可以是用 <、>、= 等符號連接兩個值,也可以是用關(guān)鍵字定義的某種判斷。例如:
(1)value1 = value2 判斷兩個值相等;
(2)value1 <> value2 判斷兩個值不相等
(3)value IS NOT NULL 判斷value不為空
邏輯函數(shù)(Logical Functions)
邏輯函數(shù)就是一個邏輯表達式,也就是用與(AND)、或(OR)、非(NOT)將布爾類型的值連接起來,也可以用判斷語句(IS、IS NOT)進行真值判斷;返回的還是一個布爾類型的值。例如:
(1)boolean1 OR boolean2 布爾值boolean1與布爾值boolean2取邏輯或
(2)boolean IS FALSE 判斷布爾值boolean是否為false
(3)NOT boolean 布爾值boolean取邏輯非
算術(shù)函數(shù)(Arithmetic Functions)
進行算術(shù)計算的函數(shù),包括用算術(shù)符號連接的運算,和復(fù)雜的數(shù)學(xué)運算。例如:
(1)numeric1 + numeric2 兩數(shù)相加
(2)POWER(numeric1, numeric2) 冪運算,取數(shù)numeric1的numeric2次方
(3)RAND() 返回(0.0, 1.0)區(qū)間內(nèi)的一個double類型的偽隨機數(shù)
字符串函數(shù)(String Functions)
進行字符串處理的函數(shù)。例如:
(1)string1 || string2 兩個字符串的連接
(2)UPPER(string) 將字符串string轉(zhuǎn)為全部大寫
(3)CHAR_LENGTH(string) 計算字符串string的長度
時間函數(shù)(Temporal Functions)
進行與時間相關(guān)操作的函數(shù)。例如:
(1)DATE string 按格式"yyyy-MM-dd"解析字符串string,返回類型為SQL Date
(2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回類型為SQL timestamp
(3)CURRENT_TIME 返回本地時區(qū)的當(dāng)前時間,類型為SQL time(與LOCALTIME等價)
(4)INTERVAL string range 返回一個時間間隔。
聚合函數(shù)(Aggregate Functions)
聚合函數(shù)是以表中多個行作為輸入,提取字段進行聚合操作的函數(shù),會將唯一的聚合值作為結(jié)果返回。聚合函數(shù)應(yīng)用非常廣泛,不論分組聚合、窗口聚合還是開窗(Over)聚合,對數(shù)據(jù)的聚合操作都可以用相同的函數(shù)來定義。標(biāo)準(zhǔn)SQL中常見的聚合函數(shù)Flink SQL都是支持的,目前也在不斷擴展,為流處理應(yīng)用提供更強大的功能。例如:
(1)COUNT(*) 返回所有行的數(shù)量,統(tǒng)計個數(shù)。
(2)SUM([ ALL | DISTINCT ] expression) 對某個字段進行求和操作。默認情況下省略了關(guān)鍵字ALL,表示對所有行求和;如果指定DISTINCT,則會對數(shù)據(jù)進行去重,每個值只疊加一次。
(3)RANK() 返回當(dāng)前值在一組值中的排名。
(4)ROW_NUMBER() 對一組值排序后,返回當(dāng)前值的行號。
其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。
5.15 Module操作
目前 Flink 包含了以下三種 Module:
CoreModule:CoreModule 是 Flink 內(nèi)置的 Module,其包含了目前 Flink 內(nèi)置的所有 UDF,F(xiàn)link 默認開啟的 Module 就是 CoreModule,我們可以直接使用其中的 UDFHiveModule:HiveModule 可以將 Hive 內(nèi)置函數(shù)作為 Flink 的系統(tǒng)函數(shù)提供給 SQL\Table API 用戶進行使用,比如 get_json_object 這類 Hive 內(nèi)置函數(shù)(Flink 默認的 CoreModule 是沒有的)用戶自定義 Module:用戶可以實現(xiàn) Module 接口實現(xiàn)自己的 UDF 擴展 Module
## 使用 LOAD 子句去加載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module,UNLOAD 子句去卸載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module
-- 加載
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸載
UNLOAD MODULE module_name
-- 查看
SHOW MODULES;
SHOW FULL MODULES;
在 Flink 中,Module 可以被 加載、啟用 、禁用 、卸載 Module,當(dāng)加載Module 之后,默認就是開啟的。同時支持多個 Module 的,并且根據(jù)加載 Module 的順序去按順序查找和解析 UDF,先查到的先解析使用。此外,F(xiàn)link 只會解析已經(jīng)啟用了的 Module。那么當(dāng)兩個 Module 中出現(xiàn)兩個同名的函數(shù)且都啟用時, Flink 會根據(jù)加載 Module 的順序進行解析,結(jié)果就是會使用順序為第一個的 Module 的 UDF,可以使用下面語法更改順序:USE MODULE hive,core;
USE是啟用module,沒有被use的為禁用(禁用不是卸載),除此之外還可以實現(xiàn)調(diào)整順序的效果。上面的語句會將 Hive Module 設(shè)為第一個使用及解析的 Module。
## 舉例
# 加載官方已經(jīng)提供的的 Hive Module,將 Hive 已有的內(nèi)置函數(shù)作為 Flink 的內(nèi)置函數(shù)。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一個 HiveModule
# https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hive/overview/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar /opt/module/flink/lib/
# 注意:拷貝hadoop的包,解決依賴沖突問題
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink/lib/
# 重啟flink集群和sql-client
# 啟動Flink
/opt/module/flink/bin/yarn-session.sh -d
# 啟動Flink的sql-client
/opt/module/flink/bin/sql-client.sh embedded -s yarn-session
# 加載hive module
-- hive-connector內(nèi)置了hive module,提供了hive自帶的系統(tǒng)函數(shù)
load module hive with ('hive-version'='3.1.3');
show modules;
show functions;
-- 可以調(diào)用hive的split函數(shù)
select split('a,b', ',');
6、常用Connector讀寫
DataGen和Print都是一種connector,其他connector參考官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/overview/
6.1 Kafka
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/
首先添加kafka連接器依賴,將flink-sql-connector-kafka-1.17.0.jar上傳到flink的lib目錄下,重啟yarn-session、sql-client
## 創(chuàng)建Kafka的映射表
CREATE TABLE t1(
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
--列名和元數(shù)據(jù)名一致可以省略 FROM 'xxxx', VIRTUAL表示只讀
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'hadoop103:9092',
'properties.group.id' = 'atguigu',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
'scan.startup.mode' = 'earliest-offset',
-- fixed為flink實現(xiàn)的分區(qū)器,一個并行度只寫往kafka一個分區(qū)
'sink.partitioner' = 'fixed',
'topic' = 'ws1',
'format' = 'json'
)
## 插入Kafka表
insert into t1(id,ts,vc) select * from source
## 查詢Kafka表
select * from t1
upsert-kafka表
如果當(dāng)前表存在更新操作,那么普通的kafka連接器將無法滿足,此時可以使用Upsert Kafka連接器。Upsert Kafka 連接器支持以 upsert 方式從 Kafka topic 中讀取數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka topic。
作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個更新或刪除事件。更準(zhǔn)確地說,數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一個 value 的 UPDATE,如果有這個 key(如果不存在相應(yīng)的 key,則該更新被視為 INSERT)。用表來類比,changelog 流中的數(shù)據(jù)記錄被解釋為 UPSERT,也稱為 INSERT/UPDATE,因為任何具有相同 key 的現(xiàn)有行都被覆蓋。另外,value 為空的消息將會被視作為 DELETE 消息。
作為 sink,upsert-kafka 連接器可以消費 changelog 流。它會將 INSERT/UPDATE_AFTER 數(shù)據(jù)作為正常的 Kafka 消息寫入,并將 DELETE 數(shù)據(jù)以 value 為空的 Kafka 消息寫入(表示對應(yīng) key 的消息被刪除)。Flink 將根據(jù)主鍵列的值對數(shù)據(jù)進行分區(qū),從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區(qū)中
# 創(chuàng)建upsert-kafka的映射表(必須定義主鍵)
CREATE TABLE t2(
id int ,
sumVC int ,
primary key (id) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'hadoop102:9092',
'topic' = 'ws2',
'key.format' = 'json',
'value.format' = 'json'
)
# 插入upsert-kafka表
insert into t2 select id,sum(vc) sumVC from source group by id
# 查詢upsert-kafka表
# upsert-kafka 無法從指定的偏移量讀取,只會從主題的源讀取。如此,才知道整個數(shù)據(jù)的更新過程。并且通過 -U,+U,+I 等符號來顯示數(shù)據(jù)的變化過程
select * from t2
6.2 File
## 創(chuàng)建FileSystem映射表
CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
'connector' = 'filesystem',
'path' = 'hdfs://hadoop102:8020/data/t3',
'format' = 'csv'
)
## 寫入
insert into t3 select * from source
## 查詢
select * from t3 where id = '1
# 如果報錯java.lang.classNotFoundException: org.apache.f1ink.table.planner.delegation.DialectFactory
# 因為之前l(fā)ib下放了sql-hive的連接器jar包,解決方案有兩種
# 將hive的連接器jar包挪走,重啟yarn-session、sql-client
# 和之前操作類似替換planner的jar包
6.3 JDBC(MySQL)
Flink在將數(shù)據(jù)寫入外部數(shù)據(jù)庫時使用DDL中定義的主鍵。如果定義了主鍵,則連接器以upsert模式操作,否則,連接器以追加模式操作。
在upsert模式下,F(xiàn)link會根據(jù)主鍵插入新行或更新現(xiàn)有行,F(xiàn)link這樣可以保證冪等性。為了保證輸出結(jié)果符合預(yù)期,建議為表定義主鍵,并確保主鍵是底層數(shù)據(jù)庫表的唯一鍵集或主鍵之一。在追加模式下,F(xiàn)link將所有記錄解釋為INSERT消息,如果底層數(shù)據(jù)庫中發(fā)生了主鍵或唯一約束違反,則INSERT操作可能會失敗
# mysql的test庫中建表
CREATE TABLE `ws2` (
`id` int(11) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
添加JDBC連接器依賴,上傳jdbc連接器的jar包和mysql的連接驅(qū)動包到flink/lib下:
flink-connector-jdbc-1.17.jarmysql-connector-j-8.0.31.jar
# 創(chuàng)建JDBC映射表
CREATE TABLE t4
(
id INT,
ts BIGINT,
vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
'username' = 'root',
'password' = '000000',
'connection.max-retry-timeout' = '60s',
'table-name' = 'ws2',
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '5s',
'sink.max-retries' = '3',
'sink.parallelism' = '1'
);
# 查詢
select * from t4
# 寫入
insert into t4 select * from source
7、sql-client 中使用 savepoint
## 提交一個insert作業(yè),可以給作業(yè)設(shè)置名稱
INSERT INTO sink select * from source;
## 查看job列表
SHOW JOBS;
## 停止作業(yè),觸發(fā)savepoint
SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;
## 從savepoint恢復(fù)
-- 設(shè)置從savepoint恢復(fù)的路徑
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a';
-- 之后直接提交sql,就會從savepoint恢復(fù)
--允許跳過無法還原的保存點狀態(tài)
set 'execution.savepoint.ignore-unclaimed-state' = 'true';
## 恢復(fù)后重置路徑
# 指定execution.savepoint.path后,將影響后面執(zhí)行的所有DML語句,可以使用RESET命令重置這個配置選項
RESET execution.savepoint.path;
# 如果出現(xiàn)reset沒生效的問題,可能是個bug,我們可以退出sql-client,再重新進,不需要重啟flink的集群。
8、Catalog
Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。元數(shù)據(jù)可以是臨時的,例如臨時表、UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive MetaStore 中的元數(shù)據(jù)。Catalog 提供了一個統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢語句中來訪問。
Catalog 允許用戶引用其數(shù)據(jù)存儲系統(tǒng)中現(xiàn)有的元數(shù)據(jù),并自動將其映射到 Flink 的相應(yīng)元數(shù)據(jù)。例如,F(xiàn)link 可以直接使用 Hive MetaStore 中的表的元數(shù)據(jù),不必在Flink中手動重寫ddl,也可以將 Flink SQL 中的元數(shù)據(jù)存儲到 Hive MetaStore 中。Catalog 極大地簡化了用戶開始使用 Flink 的步驟,并極大地提升了用戶體驗
8.1 Catalog類型
GenericInMemoryCatalog:基于內(nèi)存實現(xiàn)的 Catalog,所有元數(shù)據(jù)只在session 的生命周期(即一個 Flink 任務(wù)一次運行生命周期內(nèi))內(nèi)可用。默認自動創(chuàng)建,會有名為“default_catalog”的內(nèi)存Catalog,這個Catalog默認只有一個名為“default_database”的數(shù)據(jù)庫。JdbcCatalog:JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫。Postgres Catalog和MySQL Catalog是目前僅有的兩種JDBC Catalog實現(xiàn),將元數(shù)據(jù)存儲在數(shù)據(jù)庫中。HiveCatalog:有兩個用途,一是單純作為 Flink 元數(shù)據(jù)的持久化存儲,二是作為讀寫現(xiàn)有 Hive 元數(shù)據(jù)的接口。注意:Hive MetaStore 以小寫形式存儲所有元數(shù)據(jù)對象名稱。Hive Metastore以小寫形式存儲所有元對象名稱,而 GenericInMemoryCatalog會區(qū)分大小寫。用戶自定義 Catalog:用戶可以實現(xiàn) Catalog 接口實現(xiàn)自定義 Catalog。從Flink1.16開始引入了用戶類加載器,通過CatalogFactory.Context#getClassLoader訪問,否則會報錯ClassNotFoundException
8.2 JdbcCatalog(MySQL)
JdbcCatalog不支持建表,只是打通flink與mysql的連接,可以去讀寫mysql現(xiàn)有的庫表,然后需要把1.17的jdbc包放到對應(yīng)目錄
# https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc/1.17-SNAPSHOT/
cp flink-connector-jdbc-1.17.jar /opt/module/flink/lib/
cp mysql-connector-j-8.0.31.jar /opt/module/flink/lib/
# 重啟flink集群和sql-client
# 創(chuàng)建Catalog
# JdbcCatalog支持以下選項:
# name:必需,Catalog名稱。
# default-database:必需,連接到的默認數(shù)據(jù)庫。
# username: 必需,Postgres/MySQL帳戶的用戶名。
# password:必需,該帳號的密碼。
# base-url:必需,數(shù)據(jù)庫的jdbc url(不包含數(shù)據(jù)庫名)
# 對于Postgres Catalog,是"jdbc:postgresql://
# 對于MySQL Catalog,是"jdbc: mysql://
CREATE CATALOG my_jdbc_catalog WITH(
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = '000000',
'base-url' = 'jdbc:mysql://hadoop102:3306'
);
# 查看Catalog
SHOW CATALOGS;
## 查看當(dāng)前的CATALOG
SHOW CURRENT CATALOG;
# 使用指定Catalog
USE CATALOG my_jdbc_catalog;
## 查看當(dāng)前的CATALOG
SHOW CURRENT CATALOG;
8.3 HiveCatalog
# 上傳所需jar包到lib下
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink/lib/
cp mysql-connector-j-8.0.31.jar /opt/module/flink/lib/
# 更換planner依賴
# 只有在使用Hive方言或HiveServer2時才需要這樣額外的計劃器jar移動,但這是Hive集成的推薦設(shè)置
mv /opt/module/flink/opt/flink-table-planner_2.12-1.17.1.jar /opt/module/flink/lib/flink-table-planner_2.12-1.17.0.jar
mv /opt/module/flink/lib/flink-table-planner-loader-1.17.1.jar /opt/module/flink/opt/flink-table-planner-loader-1.17.0.jar
# 重啟flink集群和sql-client
# 啟動外置的hive metastore服務(wù)
# Hive metastore必須作為獨立服務(wù)運行,也就是hive-site中必須配置hive.metastore.uris
hive --service metastore &
創(chuàng)建Catalog
配置項必需默認值類型說明typeYes(none)StringCatalog類型,創(chuàng)建HiveCatalog時必須設(shè)置為’hive’。nameYes(none)StringCatalog的唯一名稱hive-conf-dirNo(none)String包含hive -site.xml的目錄,需要Hadoop文件系統(tǒng)支持。如果沒指定hdfs協(xié)議,則認為是本地文件系統(tǒng)。如果不指定該選項,則在類路徑中搜索hive-site.xml。default-databaseNodefaultStringHive Catalog使用的默認數(shù)據(jù)庫hive-versionNo(none)StringHiveCatalog能夠自動檢測正在使用的Hive版本。建議不要指定Hive版本,除非自動檢測失敗。hadoop-conf-dirNo(none)StringHadoop conf目錄的路徑。只支持本地文件系統(tǒng)路徑。設(shè)置Hadoop conf的推薦方法是通過HADOOP_CONF_DIR環(huán)境變量。只有當(dāng)環(huán)境變量不適合你時才使用該選項,例如,如果你想分別配置每個HiveCatalog。
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/module/hive/conf'
);
# 查看Catalog
SHOW CATALOGS;
--查看當(dāng)前的CATALOG
SHOW CURRENT CATALOG;
# 使用指定Catalog
USE CATALOG myhive;
--查看當(dāng)前的CATALOG
SHOW CURRENT CATALOG;
# 建表,退出sql-client重進,查看catalog和表還在
## 讀寫Hive表
SHOW DATABASES; -- 可以看到hive的數(shù)據(jù)庫
USE test; -- 可以切換到hive的數(shù)據(jù)庫
SHOW TABLES; -- 可以看到hive的表
SELECT * from ws; --可以讀取hive表
INSERT INTO ws VALUES(1,1,1); -- 可以寫入hive表
# 斷開后可以持久化保存
9、代碼中使用FlinkSQL
9.1 環(huán)境準(zhǔn)備
引入相關(guān)依賴
這里的依賴是一個Java的“橋接器”(bridge),主要就是負責(zé)Table API和下層DataStream API的連接支持,按照不同的語言分為Java版和Scala版。如果我們希望在本地的集成開發(fā)環(huán)境(IDE)里運行Table API和SQL,還需要引入以下依賴:
9.2 創(chuàng)建表環(huán)境
對于Flink這樣的流處理框架來說,數(shù)據(jù)流和表在結(jié)構(gòu)上還是有所區(qū)別的。所以使用Table API和SQL需要一個特別的運行時環(huán)境,這就是所謂的“表環(huán)境”(TableEnvironment)。它主要負責(zé):
注冊Catalog和表;執(zhí)行 SQL 查詢;注冊用戶自定義函數(shù)(UDF);DataStream 和表之間的轉(zhuǎn)換。
每個表和SQL的執(zhí)行,都必須綁定在一個表環(huán)境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口類,可以通過調(diào)用靜態(tài)的create()方法來創(chuàng)建一個表環(huán)境實例。方法需要傳入一個環(huán)境的配置參數(shù)EnvironmentSettings,它可以指定當(dāng)前表環(huán)境的執(zhí)行模式和計劃器(planner)。執(zhí)行模式有批處理和流處理兩種選擇,默認是流處理模式;計劃器默認使用blink planner。
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 使用流處理模式
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
// 對于流處理場景,其實默認配置就完全夠用了。所以我們也可以用另一種更加簡單的方式來創(chuàng)建表環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 這里我們引入了一個“流式表環(huán)境”(StreamTableEnvironment),它是繼承自TableEnvironment的子接口。
// 調(diào)用它的create()方法,只需要直接將當(dāng)前的流執(zhí)行環(huán)境(StreamExecutionEnvironment)傳入,就可以創(chuàng)建出對應(yīng)的流式表環(huán)境了
9.3 創(chuàng)建表
連接器表(Connector Tables)
最直觀的創(chuàng)建表的方式,就是通過連接器(connector)連接到一個外部系統(tǒng),然后定義出對應(yīng)的表結(jié)構(gòu)。在代碼中,我們可以調(diào)用表環(huán)境的executeSql()方法,可以傳入一個DDL作為參數(shù)執(zhí)行SQL操作。這里我們傳入一個CREATE語句進行表的創(chuàng)建,并通過WITH關(guān)鍵字指定連接到外部系統(tǒng)的連接器
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
虛擬表(Virtual Tables)
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
// 這里調(diào)用了表環(huán)境的sqlQuery()方法,直接傳入一條SQL語句作為參數(shù)執(zhí)行查詢,得到的結(jié)果是一個Table對象。
// Table是Table API中提供的核心接口類,就代表了一個Java中定義的表實例。
// 由于newTable是一個Table對象,并沒有在表環(huán)境中注冊;所以如果希望直接在SQL中使用,我們還需要將這個中間結(jié)果表注冊到環(huán)境中
tableEnv.createTemporaryView("NewTable", newTable);
// 類似于視圖
9.4 表的查詢
執(zhí)行SQL進行查詢
// 創(chuàng)建表環(huán)境
TableEnvironment tableEnv = ...;
// 創(chuàng)建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// 查詢用戶Alice的點擊事件,并提取表中前兩個字段
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
// 分組
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user, COUNT(url) " +
"FROM EventTable " +
"GROUP BY user "
);
// 直接將查詢的結(jié)果寫入到已經(jīng)注冊的表
// 注冊表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 將查詢結(jié)果輸出到OutputTable中
tableEnv.executeSql (
"INSERT INTO OutputTable " +
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
調(diào)用Table API進行查詢
// 基于環(huán)境中已注冊的表,可以通過表環(huán)境的from()方法非常容易地得到一個Table對象
Table eventTable = tableEnv.from("EventTable");
// 傳入的參數(shù)就是注冊好的表名。注意這里eventTable是一個Table對象,而EventTable是在環(huán)境中注冊的表名。
// 得到Table對象之后,就可以調(diào)用API進行各種轉(zhuǎn)換操作了,得到的是一個新的Table對象
Table maryClickTable = eventTable
.where($("user").isEqual("Alice"))
.select($("url"), $("user"));
// 這里每個方法的參數(shù)都是一個“表達式”(Expression),用方法調(diào)用的形式直觀地說明了想要表達的內(nèi)容;“$”符號用來指定表中的一個字段。上面的代碼和直接執(zhí)行SQL是等效的
兩種API的結(jié)合使用
無論是調(diào)用Table API還是執(zhí)行SQL,得到的結(jié)果都是一個Table對象;所以這兩種API的查詢可以很方便地結(jié)合在一起
無論是那種方式得到的Table對象,都可以繼續(xù)調(diào)用Table API進行查詢轉(zhuǎn)換;如果想要對一個表執(zhí)行SQL操作(用FROM關(guān)鍵字引用),必須先在環(huán)境中對它進行注冊。所以我們可以通過創(chuàng)建虛擬表的方式實現(xiàn)兩者的轉(zhuǎn)換
tableEnv.createTemporaryView("MyTable", myTable);
9.5 輸出表
表的創(chuàng)建和查詢,就對應(yīng)著流處理中的讀取數(shù)據(jù)源(Source)和轉(zhuǎn)換(Transform);而最后一個步驟Sink,也就是將結(jié)果數(shù)據(jù)輸出到外部系統(tǒng),就對應(yīng)著表的輸出操作。在代碼上,輸出一張表最直接的方法,就是調(diào)用Table的方法executeInsert()方法將一個 Table寫入到注冊過的表中,方法傳入的參數(shù)就是注冊的表名。
// 注冊表,用于輸出數(shù)據(jù)到外部系統(tǒng)
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 經(jīng)過查詢轉(zhuǎn)換,得到結(jié)果表
Table result = ...
// 將結(jié)果表寫入已注冊的輸出表中
result.executeInsert("OutputTable");
在底層,表的輸出是通過將數(shù)據(jù)寫入到TableSink來實現(xiàn)的。TableSink是Table API中提供的一個向外部系統(tǒng)寫入數(shù)據(jù)的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存儲數(shù)據(jù)庫(比如JDBC、Elasticsearch)和消息隊列(比如Kafka)
public class SqlDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 1.創(chuàng)建表環(huán)境
// 1.1 寫法一:
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
// .inStreamingMode()
// .build();
// StreamTableEnvironment tableEnv = TableEnvironment.create(settings);
// 1.2 寫法二
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 2.創(chuàng)建表
tableEnv.executeSql("CREATE TABLE source ( \n" +
" id INT, \n" +
" ts BIGINT, \n" +
" vc INT\n" +
") WITH ( \n" +
" 'connector' = 'datagen', \n" +
" 'rows-per-second'='1', \n" +
" 'fields.id.kind'='random', \n" +
" 'fields.id.min'='1', \n" +
" 'fields.id.max'='10', \n" +
" 'fields.ts.kind'='sequence', \n" +
" 'fields.ts.start'='1', \n" +
" 'fields.ts.end'='1000000', \n" +
" 'fields.vc.kind'='random', \n" +
" 'fields.vc.min'='1', \n" +
" 'fields.vc.max'='100'\n" +
");\n");
tableEnv.executeSql("CREATE TABLE sink (\n" +
" id INT, \n" +
" sumVC INT \n" +
") WITH (\n" +
"'connector' = 'print'\n" +
");\n");
// TODO 3.執(zhí)行查詢
// 3.1 使用sql進行查詢
// Table table = tableEnv.sqlQuery("select id,sum(vc) as sumVC from source where id>5 group by id ;");
// 把table對象,注冊成表名
// tableEnv.createTemporaryView("tmp", table);
// tableEnv.sqlQuery("select * from tmp where id > 7");
// 3.2 用table api來查詢
Table source = tableEnv.from("source");
Table result = source
.where($("id").isGreater(5))
.groupBy($("id"))
.aggregate($("vc").sum().as("sumVC"))
.select($("id"), $("sumVC"));
// TODO 4.輸出表
// 4.1 sql用法
// tableEnv.executeSql("insert into sink select * from tmp");
// 4.2 tableapi用法
result.executeInsert("sink");
}
}
9.6 表和流的轉(zhuǎn)換
將流(DataStream)轉(zhuǎn)換成表(Table)
調(diào)用fromDataStream()方法,想要將一個DataStream轉(zhuǎn)換成表很簡單,可以通過調(diào)用表環(huán)境的fromDataStream()方法來實現(xiàn),返回的就是一個Table對象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取表環(huán)境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 讀取數(shù)據(jù)源
SingleOutputStreamOperator
// 將數(shù)據(jù)流轉(zhuǎn)換成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);
// 由于流中的數(shù)據(jù)本身就是定義好的POJO類型WaterSensor,所以我們將流轉(zhuǎn)換成表之后,每一行數(shù)據(jù)就對應(yīng)著一個WaterSensor,而表中的列名就對應(yīng)著WaterSensor中的屬性。
// 另外,我們還可以在fromDataStream()方法中增加參數(shù),用來指定提取哪些屬性作為表中的字段名,并可以任意指定位置
// 提取Event中的timestamp和url作為表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));
// 也可以通過表達式的as()方法對字段進行重命名
// 將timestamp字段重命名為ts
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));
調(diào)用createTemporaryView()方法,調(diào)用fromDataStream()方法簡單直觀,可以直接實現(xiàn)DataStream到Table的轉(zhuǎn)換;不過如果我們希望直接在SQL中引用這張表,就還需要調(diào)用表環(huán)境的createTemporaryView()方法來創(chuàng)建虛擬視圖了。
對于這種場景,也有一種更簡潔的調(diào)用方式。我們可以直接調(diào)用createTemporaryView()方法創(chuàng)建虛擬表,傳入的兩個參數(shù),第一個依然是注冊的表名,而第二個可以直接就是DataStream。之后仍舊可以傳入多個參數(shù),用來指定表中的字段
tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));
// 接下來就可以直接在SQL中引用表sensorTable了
將表(Table)轉(zhuǎn)換成流(DataStream)
// 調(diào)用toDataStream()方法
// 經(jīng)查詢轉(zhuǎn)換得到的表aliceClickTable轉(zhuǎn)換成流打印輸出
tableEnv.toDataStream(table).print();
// 調(diào)用toChangelogStream()方法
Table table = tableEnv.sqlQuery(
"SELECT id, sum(vc) " +
"FROM source " +
"GROUP BY id "
);
// 將表轉(zhuǎn)換成更新日志流
tableEnv.toChangelogStream(table).print();
支持的數(shù)據(jù)類型
(1)原子類型
在Flink中,基礎(chǔ)數(shù)據(jù)類型(Integer、Double、String)和通用數(shù)據(jù)類型(也就是不可再拆分的數(shù)據(jù)類型)統(tǒng)一稱作“原子類型”。原子類型的DataStream,轉(zhuǎn)換之后就成了只有一列的Table,列字段(field)的數(shù)據(jù)類型可以由原子類型推斷出。另外,還可以在fromDataStream()方法里增加參數(shù),用來重新命名列字段
StreamTableEnvironment tableEnv = ...;
DataStream
// 將數(shù)據(jù)流轉(zhuǎn)換成動態(tài)表,動態(tài)表只有一個字段,重命名為myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
(2)Tuple類型
當(dāng)原子類型不做重命名時,默認的字段名就是“f0”,容易想到,這其實就是將原子類型看作了一元組Tuple1的處理結(jié)果。Table支持Flink中定義的元組類型Tuple,對應(yīng)在表中字段名默認就是元組中元素的屬性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段還可以通過調(diào)用表達式的as()方法來進行重命名。
StreamTableEnvironment tableEnv = ...;
DataStream
// 將數(shù)據(jù)流轉(zhuǎn)換成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 將數(shù)據(jù)流轉(zhuǎn)換成包含f0和f1字段的表,在表中f0和f1位置交換
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 將f1字段命名為myInt,f0命名為myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
(3)POJO 類型
Flink也支持多種數(shù)據(jù)類型組合成的“復(fù)合類型”,最典型的就是簡單Java對象(POJO 類型)。由于POJO中已經(jīng)定義好了可讀性強的字段名,這種類型的數(shù)據(jù)流轉(zhuǎn)換成Table就顯得無比順暢了。將POJO類型的DataStream轉(zhuǎn)換成Table,如果不指定字段名稱,就會直接使用原始 POJO 類型中的字段名稱。POJO中的字段同樣可以被重新排序、提卻和重命名。
StreamTableEnvironment tableEnv = ...;
DataStream
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
(4)Row類型
Flink中還定義了一個在關(guān)系型表中更加通用的數(shù)據(jù)類型——行(Row),它是Table中數(shù)據(jù)的基本組織形式。Row類型也是一種復(fù)合類型,它的長度固定,而且無法直接推斷出每個字段的類型,所以在使用時必須指明具體的類型信息;我們在創(chuàng)建Table時調(diào)用的CREATE語句就會將所有的字段名稱和類型指定,這在Flink中被稱為表的“模式結(jié)構(gòu)”(Schema)。
綜合應(yīng)用示例
public class TableStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
new WaterSensor("s1", 1L, 1),
new WaterSensor("s1", 2L, 2),
new WaterSensor("s2", 2L, 2),
new WaterSensor("s3", 3L, 3),
new WaterSensor("s3", 4L, 4)
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 1. 流轉(zhuǎn)表
Table sensorTable = tableEnv.fromDataStream(sensorDS);
tableEnv.createTemporaryView("sensor", sensorTable);
Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");
Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");
// TODO 2. 表轉(zhuǎn)流
// 2.1 追加流
tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
// 2.2 changelog流(結(jié)果需要更新)
tableEnv.toChangelogStream(sumTable ).print("sum");
// 只要代碼中調(diào)用了 DataStreamAPI,就需要 execute,否則不需要
env.execute();
}
}
9.7 自定義函數(shù)(UDF)
系統(tǒng)函數(shù)盡管龐大,也不可能涵蓋所有的功能;如果有系統(tǒng)函數(shù)不支持的需求,我們就需要用自定義函數(shù)(User Defined Functions,UDF)來實現(xiàn)了。Flink的Table API和SQL提供了多種自定義函數(shù)的接口,以抽象類的形式定義。當(dāng)前UDF主要有以下幾類:
標(biāo)量函數(shù)(Scalar Functions):將輸入的標(biāo)量值轉(zhuǎn)換成一個新的標(biāo)量值;表函數(shù)(Table Functions):將標(biāo)量值轉(zhuǎn)換成一個或多個新的行數(shù)據(jù),也就是擴展成一個表;聚合函數(shù)(Aggregate Functions):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個新的標(biāo)量值;表聚合函數(shù)(Table Aggregate Functions):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個或多個新的行數(shù)據(jù)。
整體調(diào)用流程
// (1)注冊函數(shù)
// 注冊函數(shù)時需要調(diào)用表環(huán)境的createTemporarySystemFunction()方法,傳入注冊的函數(shù)名以及UDF類的Class對象
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
// (2)使用Table API調(diào)用函數(shù)
// 在Table API中,需要使用call()方法來調(diào)用自定義函數(shù):
ableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// 這里call()方法有兩個參數(shù),一個是注冊好的函數(shù)名MyFunction,另一個則是函數(shù)調(diào)用時本身的參數(shù)。這里我們定義MyFunction在調(diào)用時,需要傳入的參數(shù)是myField字段
// (3)在SQL中調(diào)用函數(shù)
// 當(dāng)我們將函數(shù)注冊為系統(tǒng)函數(shù)之后,在SQL中的調(diào)用就與內(nèi)置系統(tǒng)函數(shù)完全一樣了
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
標(biāo)量函數(shù)(Scalar Functions)
想要實現(xiàn)自定義的標(biāo)量函數(shù),我們需要自定義一個類來繼承抽象類ScalarFunction,并實現(xiàn)叫作eval() 的求值方法。標(biāo)量函數(shù)的行為就取決于求值方法的定義,它必須是公有的(public),而且名字必須是eval。求值方法eval可以重載多次,任何數(shù)據(jù)類型都可作為求值方法的參數(shù)和返回值類型。
這里需要特別說明的是,ScalarFunction抽象類中并沒有定義eval()方法,所以我們不能直接在代碼中重寫(override);但Table API的框架底層又要求了求值方法必須名字為eval()。
public class MyScalarFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
new WaterSensor("s1", 1L, 1),
new WaterSensor("s1", 2L, 2),
new WaterSensor("s2", 2L, 2),
new WaterSensor("s3", 3L, 3),
new WaterSensor("s3", 4L, 4)
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table sensorTable = tableEnv.fromDataStream(sensorDS);
tableEnv.createTemporaryView("sensor", sensorTable);
// TODO 2.注冊函數(shù)
tableEnv.createTemporaryFunction("HashFunction", HashFunction.class);
// TODO 3.調(diào)用 自定義函數(shù)
// 3.1 sql用法
// tableEnv.sqlQuery("select HashFunction(id) from sensor")
// .execute() // 調(diào)用了 sql的execute,就不需要 env.execute()
// .print();
// 3.2 table api用法
sensorTable
.select(call("HashFunction",$("id")))
.execute()
.print();
}
// TODO 1.定義 自定義函數(shù)的實現(xiàn)類
public static class HashFunction extends ScalarFunction{
// 接受任意類型的輸入,返回 INT型輸出
public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return o.hashCode();
}
}
}
這里我們自定義了一個ScalarFunction,實現(xiàn)了eval()求值方法,將任意類型的對象傳入,得到一個Int類型的哈希值返回。當(dāng)然,具體的求哈希操作就省略了,直接調(diào)用對象的hashCode()方法即可。
另外注意,由于Table API在對函數(shù)進行解析時需要提取求值方法參數(shù)的類型引用,所以我們用DataTypeHint(inputGroup = InputGroup.ANY)對輸入?yún)?shù)的類型做了標(biāo)注,表示eval的參數(shù)可以是任意類型
表函數(shù)(Table Functions)
跟標(biāo)量函數(shù)一樣,表函數(shù)的輸入?yún)?shù)也可以是 0個、1個或多個標(biāo)量值;不同的是,它可以返回任意多行數(shù)據(jù)?!岸嘈袛?shù)據(jù)”事實上就構(gòu)成了一個表,所以“表函數(shù)”可以認為就是返回一個表的函數(shù),這是一個“一對多”的轉(zhuǎn)換關(guān)系。之前我們介紹過的窗口TVF,本質(zhì)上就是表函數(shù)。
public class MyTableFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
"hello flink",
"hello world hi",
"hello java"
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));
tableEnv.createTemporaryView("str", sensorTable);
// TODO 2.注冊函數(shù)
tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);
// TODO 3.調(diào)用 自定義函數(shù)
// 3.1 交叉聯(lián)結(jié)
tableEnv
// 3.1 交叉聯(lián)結(jié)
// .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")
// 3.2 帶 on true 條件的 左聯(lián)結(jié)
// .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")
// 重命名側(cè)向表中的字段
.sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words)) as T(newWord,newLength) on true")
.execute()
.print();
}
// TODO 1.繼承 TableFunction<返回的類型>
// 類型標(biāo)注: Row包含兩個字段:word和length
@FunctionHint(output = @DataTypeHint("ROW
public static class SplitFunction extends TableFunction
// 返回是 void,用 collect方法輸出
public void eval(String str) {
for (String word : str.split(" ")) {
collect(Row.of(word, word.length()));
}
}
}
}
這里我們直接將表函數(shù)的輸出類型定義成了ROW,這就是得到的側(cè)向表中的數(shù)據(jù)類型;每行數(shù)據(jù)轉(zhuǎn)換后也只有一行。我們分別用交叉聯(lián)結(jié)和左聯(lián)結(jié)兩種方式在SQL中進行了調(diào)用,還可以對側(cè)向表的中字段進行重命名
聚合函數(shù)(Aggregate Functions)
用戶自定義聚合函數(shù)(User Defined AGGregate function,UDAGG)會把一行或多行數(shù)據(jù)(也就是一個表)聚合成一個標(biāo)量值。這是一個標(biāo)準(zhǔn)的“多對一”的轉(zhuǎn)換。聚合函數(shù)的概念我們之前已經(jīng)接觸過多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常見的系統(tǒng)內(nèi)置聚合函數(shù)。而如果有些需求無法直接調(diào)用系統(tǒng)函數(shù)解決,我們就必須自定義聚合函數(shù)來實現(xiàn)功能了。
自定義聚合函數(shù)需要繼承抽象類AggregateFunction。AggregateFunction有兩個泛型參數(shù)
(1)首先,它需要創(chuàng)建一個累加器(accumulator),用來存儲聚合的中間結(jié)果。這與DataStream API中的AggregateFunction非常類似,累加器就可以看作是一個聚合狀態(tài)。調(diào)用createAccumulator()方法可以創(chuàng)建一個空的累加器。
(2)對于輸入的每一行數(shù)據(jù),都會調(diào)用accumulate()方法來更新累加器,這是聚合的核心過程。
(3)當(dāng)所有的數(shù)據(jù)都處理完之后,通過調(diào)用getValue()方法來計算并返回最終的結(jié)果。
所以,每個 AggregateFunction 都必須實現(xiàn)以下幾個方法:
createAccumulator()
這是創(chuàng)建累加器的方法。沒有輸入?yún)?shù),返回類型為累加器類型ACC。
accumulate()
這是進行聚合計算的核心方法,每來一行數(shù)據(jù)都會調(diào)用。它的第一個參數(shù)是確定的,就是當(dāng)前的累加器,類型為ACC,表示當(dāng)前聚合的中間狀態(tài);后面的參數(shù)則是聚合函數(shù)調(diào)用時傳入的參數(shù),可以有多個,類型也可以不同。這個方法主要是更新聚合狀態(tài),所以沒有返回類型。需要注意的是,accumulate()與之前的求值方法eval()類似,也是底層架構(gòu)要求的,必須為public,方法名必須為accumulate,且無法直接override、只能手動實現(xiàn)。
getValue()
這是得到最終返回結(jié)果的方法。輸入?yún)?shù)是ACC類型的累加器,輸出類型為T。在遇到復(fù)雜類型時,F(xiàn)link 的類型推導(dǎo)可能會無法得到正確的結(jié)果。所以AggregateFunction也可以專門對累加器和返回結(jié)果的類型進行聲明,這是通過 getAccumulatorType()和getResultType()兩個方法來指定的。
AggregateFunction 的所有方法都必須是 公有的(public),不能是靜態(tài)的(static),而且名字必須跟上面寫的完全一樣。createAccumulator、getValue、getResultType 以及 getAccumulatorType 這幾個方法是在抽象類 AggregateFunction 中定義的,可以override;而其他則都是底層架構(gòu)約定的方法。
// 學(xué)生的分?jǐn)?shù)表ScoreTable中計算每個學(xué)生的加權(quán)平均分
public class MyAggregateFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 姓名,分?jǐn)?shù),權(quán)重
DataStreamSource
Tuple3.of("zs",80, 3),
Tuple3.of("zs",90, 4),
Tuple3.of("zs",95, 4),
Tuple3.of("ls",75, 4),
Tuple3.of("ls",65, 4),
Tuple3.of("ls",85, 4)
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"),$("f1").as("score"), $("f2").as("weight"));
tableEnv.createTemporaryView("scores", scoreWeightTable);
// TODO 2.注冊函數(shù)
tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);
// TODO 3.調(diào)用 自定義函數(shù)
tableEnv
.sqlQuery("select name,WeightedAvg(score,weight) from scores group by name")
.execute()
.print();
}
// TODO 1.繼承 AggregateFunction< 返回類型,累加器類型<加權(quán)總和,權(quán)重總和> >
public static class WeightedAvg extends AggregateFunction
@Override
public Double getValue(Tuple2
return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1;
}
@Override
public Tuple2
return Tuple2.of(0, 0);
}
/**
* 累加計算的方法,每來一行數(shù)據(jù)都會調(diào)用一次
* @param acc 累加器類型
* @param score 第一個參數(shù):分?jǐn)?shù)
* @param weight 第二個參數(shù):權(quán)重
*/
public void accumulate(Tuple2
acc.f0 += score * weight; // 加權(quán)總和 = 分?jǐn)?shù)1 * 權(quán)重1 + 分?jǐn)?shù)2 * 權(quán)重2 +....
acc.f1 += weight; // 權(quán)重和 = 權(quán)重1 + 權(quán)重2 +....
}
}
}
表聚合函數(shù)(Table Aggregate Functions)
用戶自定義表聚合函數(shù)(UDTAGG)可以把一行或多行數(shù)據(jù)(也就是一個表)聚合成另一張表,結(jié)果表中可以有多行多列。很明顯,這就像表函數(shù)和聚合函數(shù)的結(jié)合體,是一個“多對多”的轉(zhuǎn)換。
自定義表聚合函數(shù)需要繼承抽象類TableAggregateFunction。TableAggregateFunction的結(jié)構(gòu)和原理與AggregateFunction非常類似,同樣有兩個泛型參數(shù)
createAccumulator()
創(chuàng)建累加器的方法,與AggregateFunction中用法相同。
accumulate()
聚合計算的核心方法,與AggregateFunction中用法相同。
emitValue()
所有輸入行處理完成后,輸出最終計算結(jié)果的方法。這個方法對應(yīng)著AggregateFunction中的getValue()方法;區(qū)別在于emitValue沒有輸出類型,而輸入?yún)?shù)有兩個:第一個是ACC類型的累加器,第二個則是用于輸出數(shù)據(jù)的“收集器”out,它的類型為Collect
表聚合函數(shù)相對比較復(fù)雜,它的一個典型應(yīng)用場景就是TOP-N查詢。比如我們希望選出一組數(shù)據(jù)排序后的前兩名,這就是最簡單的TOP-2查詢。沒有現(xiàn)成的系統(tǒng)函數(shù),那么我們就可以自定義一個表聚合函數(shù)來實現(xiàn)這個功能。在累加器中應(yīng)該能夠保存當(dāng)前最大的兩個值,每當(dāng)來一條新數(shù)據(jù)就在accumulate()方法中進行比較更新,最終在emitValue()中調(diào)用兩次out.collect()將前兩名數(shù)據(jù)輸出。
public class MyTableAggregateFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 姓名,分?jǐn)?shù),權(quán)重
DataStreamSource
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table numTable = tableEnv.fromDataStream(numDS, $("num"));
// TODO 2.注冊函數(shù)
tableEnv.createTemporaryFunction("Top2", Top2.class);
// TODO 3.調(diào)用 自定義函數(shù): 只能用 Table API
numTable
.flatAggregate(call("Top2", $("num")).as("value", "rank"))
.select( $("value"), $("rank"))
.execute().print();
}
// TODO 1.繼承 TableAggregateFunction< 返回類型,累加器類型<加權(quán)總和,權(quán)重總和> >
// 返回類型 (數(shù)值,排名) =》 (12,1) (9,2)
// 累加器類型 (第一大的數(shù),第二大的數(shù)) ===》 (12,9)
public static class Top2 extends TableAggregateFunction
@Override
public Tuple2
return Tuple2.of(0, 0);
}
/**
* 每來一個數(shù)據(jù)調(diào)用一次,比較大小,更新 最大的前兩個數(shù)到 acc中
*
* @param acc 累加器
* @param num 過來的數(shù)據(jù)
*/
public void accumulate(Tuple2
if (num > acc.f0) {
// 新來的變第一,原來的第一變第二
acc.f1 = acc.f0;
acc.f0 = num;
} else if (num > acc.f1) {
// 新來的變第二,原來的第二不要了
acc.f1 = num;
}
}
/**
* 輸出結(jié)果: (數(shù)值,排名)兩條最大的
*
* @param acc 累加器
* @param out 采集器<返回類型>
*/
public void emitValue(Tuple2
if (acc.f0 != 0) {
out.collect(Tuple2.of(acc.f0, 1));
}
if (acc.f1 != 0) {
out.collect(Tuple2.of(acc.f1, 2));
}
}
}
}
柚子快報邀請碼778899分享:大數(shù)據(jù) Flink SQL
相關(guān)文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。