欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:Flink之SQL join

柚子快報激活碼778899分享:Flink之SQL join

http://yzkb.51969.com/

Flink SQL join

Flink sql 支持對動態(tài)表進行復(fù)雜且靈活的join操作??紤]到查詢可能需要的各種語義,flink提供了多種不同類型的join。

默認(rèn)情況下,join的順序沒有做過優(yōu)化。表是按照他們在from子句中指定的順序進行join的。你可以通過把更新頻率最低的表放在最前面,把更新頻率最高的表放在最后面,來調(diào)整連接查詢的性能。確保指定表的順序不會產(chǎn)生交叉連接(笛卡兒積),flink不支持這樣的操作,會導(dǎo)致查詢失敗。

Regular joins常規(guī)連接

Regular join是最通用的join類型。在這中連接中,任何新的記錄或者連接中任何一方的變化都是可見的,并且影響整個連接查詢的結(jié)果。例如如果左邊表有一條新的記錄產(chǎn)生,當(dāng)產(chǎn)品ID相等時,它將與右邊的所有以前和未來的記錄連接起來。

SELECT * FROM Orders

INNER JOIN Product

ON Orders.productId = Product.id

對于流式查詢,regular join的語法是最靈活的,允許任何一種更新(增、刪、改)輸入表。然鵝,這種操作具有重要的操作影響:它需要將連接輸入的兩邊永遠(yuǎn)保持在flink的狀態(tài)中。因此,計算查詢結(jié)果所需的狀態(tài)會無限增長,這取決于所有輸入表和中間連接結(jié)果的獨特輸入行數(shù)量。你可以提供一個具有適當(dāng)?shù)臓顟B(tài)生存時間(TTL)的查詢配置,以防止過度的狀態(tài)大小。注意這可能會影響查詢結(jié)果的正確性。

對于流式查詢,計算查詢結(jié)果所需的狀態(tài)可能會無限增長,這取決于聚合的類型和不同分組鍵的數(shù)量。請?zhí)峁┮粋€具有有效保留間隔的查詢配置,以防止過多的狀態(tài)大小。詳見查詢配置。

Inner Equi-JOIN

返回一個由連接條件限制的簡單笛卡爾乘積。目前,只支持等價連接,即至少有一個帶有相同謂詞的連接條件。不支持任意的交叉或θ連接。

SELECT * FROM Orders

INNER JOIN Product

ON Orders.product_id = Product.id

OUTER Equi-JOIN

返回合格的笛卡爾乘積中的所有行(即所有通過其連接條件的組合行),加上外表中連接條件與其他表的任何行不匹配的每條行的一個副本。Flink支持LEFT、RIGHT和FULL外部連接。目前,只支持等價連接,即至少有一個帶有等價謂詞的聯(lián)合條件的連接。不支持任意的交叉連接或θ連接。

SELECT *

FROM Orders

LEFT JOIN Product

ON Orders.product_id = Product.id

SELECT *

FROM Orders

RIGHT JOIN Product

ON Orders.product_id = Product.id

SELECT *

FROM Orders

FULL OUTER JOIN Product

ON Orders.product_id = Product.id

Interval Joins區(qū)間連接

返回一個由連接條件和時間約束限制的簡單笛卡爾積。一個區(qū)間連接需要至少一個等價連接謂詞和一個限制兩邊時間的連接條件。兩個適當(dāng)?shù)姆秶^詞可以定義這樣的條件(<, <=, >=, >),一個BETWEEN謂詞,或者一個比較兩個輸入表的相同類型的時間屬性(即處理時間或事件時間)的單一等價謂詞。

例如,如果訂單在收到四小時后才發(fā)貨,這個查詢將把所有的訂單與它們相應(yīng)的發(fā)貨連接起來。

SELECT *

FROM Orders o, Shipments s

WHERE o.id = s.order_id

AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

下面的謂詞是有效的區(qū)間連接條件的例子。

ltime = rtimeltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTEltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

對于流式查詢,與Regular joins相比,區(qū)間連接只支持具有時間屬性的append-only表。由于時間屬性是準(zhǔn)單調(diào)遞增的,F(xiàn)link可以從其狀態(tài)中刪除舊值而不影響結(jié)果的正確性。

Temporal Joins 時間性連接

時態(tài)表是一個隨時間演變的表–在Flink中又被稱為動態(tài)表。時態(tài)表中的行與一個或多個時態(tài)周期相關(guān),所有的Flink表都是時態(tài)(動態(tài))的。時態(tài)表包含一個或多個版本的表快照,它可以是一個變化的歷史表,跟蹤變化(例如,數(shù)據(jù)庫變化日志,包含所有快照)或一個變化的維度表,將變化具體化(例如,數(shù)據(jù)庫表,包含最新的快照)。

Event Time Temporal Join 事件時間

事件時間時間連接允許針對一個版本的表進行連接。這意味著一個表可以用不斷變化的元數(shù)據(jù)來充實,并檢索其在某個時間點的值。

時間連接采取一個任意的表(左側(cè)輸入/探測點),并將每一行與版本表(右側(cè)輸入/構(gòu)建點)中相應(yīng)行的相關(guān)版本相關(guān)聯(lián)。Flink使用SQL:2011標(biāo)準(zhǔn)中的FOR SYSTEM_TIME AS OF的SQL語法來執(zhí)行這一操作。時態(tài)連接的語法如下。

SELECT [column_list]

FROM table1 [AS ]

[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS ]

ON table1.column-name1 = table2.column-name1

有了事件時間屬性(即行時間屬性),就有可能檢索到一個鍵的值,因為它是在過去的某個時間點上。這允許在一個共同的時間點上連接兩個表。版本表將存儲自最后一個水印以來的所有版本–通過時間識別。

例如,假設(shè)我們有一個訂單表,每個訂單都有不同貨幣的價格。為了正確地將此表規(guī)范化為單一貨幣,如美元,每個訂單都需要用下單時的時間點的適當(dāng)貨幣轉(zhuǎn)換率來連接。

-- 創(chuàng)建一個訂單表

-- 訂單表是一個 append-only 動態(tài)表.

CREATE TABLE orders (

order_id STRING,

price DECIMAL(32,2),

currency STRING,

order_time TIMESTAMP(3),

WATERMARK FOR order_time AS order_time

) WITH (/* ... */);

-- Define a versioned table of currency rates.

-- This could be from a change-data-capture(CDC)

-- such as Debezium, a compacted Kafka topic, or any other

-- way of defining a versioned table.

CREATE TABLE currency_rates (

currency STRING,

conversion_rate DECIMAL(32, 2),

update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,

WATERMARK FOR update_time AS update_time,

PRIMARY KEY(currency) NOT ENFORCED

) WITH (

'connector' = 'kafka',

'value.format' = 'debezium-json',

/* ... */

);

SELECT

order_id,

price,

currency,

conversion_rate,

order_time

FROM orders

LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time

ON orders.currency = currency_rates.currency;

order_id price currency conversion_rate order_time

======== ===== ======== =============== =========

o_001 11.11 EUR 1.14 12:00:00

o_002 12.51 EUR 1.10 12:06:00

注意:事件時間的時間連接是由左右兩邊的水印watermark觸發(fā)的;請確保連接的兩邊都正確設(shè)置水印watermark。

注意:事件時間時間連接要求時間連接條件的等價條件中包含主鍵,例如,表currency_rates的主鍵currency_rates.currency在條件orders.currency = currency_rates.currency中被約束。

與常規(guī)連接相比,盡管構(gòu)建方有變化,但之前的時態(tài)表結(jié)果不會受到影響。與區(qū)間連接相比,時間表連接并不定義記錄被連接的時間窗口。來自探測方的記錄總是在時間屬性指定的時間內(nèi)與構(gòu)建方的版本連接。因此,構(gòu)建方的記錄可能是任意老的。隨著時間的推移,不再需要的記錄版本(對于給定的主鍵)將被從狀態(tài)中刪除。

Processing Time Temporal Join 處理時間

一個處理時間的臨時表連接使用一個處理時間屬性來將記錄與外部版本表中的鍵的最新版本相關(guān)聯(lián)。

根據(jù)定義,有了處理時間屬性,該連接將總是返回一個給定鍵的最新值。我們可以把查找表看作是一個簡單的HashMap,它存儲了來自構(gòu)建方的所有記錄。這種連接的力量在于,當(dāng)在Flink內(nèi)部將表具體化為動態(tài)表不可行時,它允許Flink直接針對外部系統(tǒng)工作。

下面的處理時間時態(tài)表連接例子顯示了一個append-only 表orders,它應(yīng)該與表LatestRates連接。LatestRates是一個維度表(例如HBase表),為最新的rate。在時間10:15, 10:30, 10:52,LatestRates的內(nèi)容看起來如下。

10:15> SELECT * FROM LatestRates;

currency rate

======== ======

US Dollar 102

Euro 114

Yen 1

10:30> SELECT * FROM LatestRates;

currency rate

======== ======

US Dollar 102

Euro 114

Yen 1

10:52> SELECT * FROM LatestRates;

currency rate

======== ======

US Dollar 102

Euro 116 <==== changed from 114 to 116

Yen 1

LastestRates在10:15和10:30的內(nèi)容是相同的。歐元匯率在10:52時從114變?yōu)?16。

訂單是一個append-only表,代表給定金額和給定貨幣的付款。例如,在10:15,有一個金額為2歐元的訂單。

SELECT * FROM Orders;

amount currency

====== =========

2 Euro <== arrived at time 10:15

1 US Dollar <== arrived at time 10:30

2 Euro <== arrived at time 10:52

鑒于這些表格,我們想計算所有訂單轉(zhuǎn)換為一種通用貨幣

amount currency rate amount*rate

====== ========= ======= ============

2 Euro 114 228 <== arrived at time 10:15

1 US Dollar 102 102 <== arrived at time 10:30

2 Euro 116 232 <== arrived at time 10:52

目前,在最新版本的任何視圖/表的時態(tài)連接中使用的FOR SYSTEM_TIME AS OF語法還不支持,你可以使用以下時態(tài)表函數(shù)語法Temporal Table Function Join。

SELECT

o_amount, r_rate

FROM

Orders,

LATERAL TABLE (Rates(o_proctime))

WHERE

r_currency = o_currency

注意 不支持任何表/視圖的最新版本在時態(tài)連接中使用的FOR SYSTEM_TIME AS OF語法的原因只是語義上的考慮,因為左流的連接處理并不等待時態(tài)表的完整快照,這可能會誤導(dǎo)生產(chǎn)環(huán)境中的用戶。通過時態(tài)表函數(shù)處理時態(tài)連接也存在同樣的語義問題,但它已經(jīng)存在很長時間了,因此我們從兼容性的角度支持它。

其結(jié)果在處理時間上是不確定的。處理時間的時間連接最常用于用外部表(即維度表)來充實流。

與常規(guī)連接相比,盡管構(gòu)建方有變化,但之前的時態(tài)表結(jié)果不會受到影響。與區(qū)間連接相比,時態(tài)表連接不定義記錄加入的時間窗口,即舊的記錄不存儲在狀態(tài)中。

Temporal Table Function Join時態(tài)表函數(shù)連接

用時態(tài)表函數(shù)連接表 temporal table function的語法與用表函數(shù)連接Table Function的語法相同。

注意:目前只支持時態(tài)表的內(nèi)連接和左外連接。

假設(shè)利率是一個時態(tài)表函數(shù),連接可以用SQL表示如下。

SELECT

o_amount, r_rate

FROM

Orders,

LATERAL TABLE (Rates(o_proctime))

WHERE

r_currency = o_currency

上述時態(tài)表DDL和時態(tài)表函數(shù)的主要區(qū)別是。

時態(tài)表DDL可以在SQL中定義,但時態(tài)表函數(shù)不能。時態(tài)表DDL和時態(tài)表函數(shù)都支持時態(tài)連接版本變化versioned 表,但只有時態(tài)表函數(shù)可以時態(tài)連接任何表/視圖的最新版本。

Lookup Join查找連接

查詢連接通常用于用從外部系統(tǒng)查詢的數(shù)據(jù)來充實一個表。該連接要求一個表有一個處理時間屬性,另一個表有一個查詢源連接器的支持。

The lookup join uses the above Processing Time Temporal Join syntax with the right table to be backed by a lookup source connector.

查詢連接使用上面的處理時間時間連接語法,右邊的表要由查詢源連接器來支持。

下面的例子顯示了指定一個查找連接的語法。

-- Customers is backed by the JDBC connector and can be used for lookup joins

CREATE TEMPORARY TABLE Customers (

id INT,

name STRING,

country STRING,

zip STRING

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',

'table-name' = 'customers'

);

-- enrich each order with customer information

SELECT o.order_id, o.total, c.country, c.zip

FROM Orders AS o

JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c

ON o.customer_id = c.id;

In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The FOR SYSTEM_TIME AS OF clause with the subsequent processing time attribute ensures that each row of the Orders table is joined with those Customers rows that match the join predicate at the point in time when the Orders row is processed by the join operator. It also prevents that the join result is updated when a joined Customer row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above o.customer_id = c.id.在上面的例子中,訂單表被駐扎在MySQL數(shù)據(jù)庫中的客戶表的數(shù)據(jù)所充實。帶有后續(xù)處理時間屬性的FOR SYSTEM_TIME AS OF子句確保訂單表的每條記錄與那些在訂單行被連接操作者處理的時間點上符合連接謂詞的客戶行被連接起來。它還可以防止在將來更新連接的客戶行時更新連接的結(jié)果。查找連接也需要一個強制性的平等連接謂詞,在上面的例子中,o.customer_id = c.id。

Array Expansion

為給定數(shù)組中的每個元素返回一個新行。目前還不支持用ORDINALITY進行嵌套。

SELECT order_id, tag

FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Table Function 表函數(shù)

用一個表函數(shù)的結(jié)果來連接一個表。左側(cè)(外部)表的每一行都與表函數(shù)的相應(yīng)調(diào)用所產(chǎn)生的所有行連接。用戶定義的表函數(shù)必須在使用前注冊。

INNER JOIN

如果左表(外表)的表函數(shù)調(diào)用返回一個空的結(jié)果,那么左表(外表)的行將被放棄。

SELECT order_id, res

FROM Orders,

LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

如果一個表函數(shù)的調(diào)用返回一個空的結(jié)果,那么相應(yīng)的外層行將被保留,并且結(jié)果將被填充為空值。目前,針對橫向表的左外連接要求在ON子句中有一個TRUE字樣。

SELECT order_id, res

FROM Orders

LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)

ON TRUE

柚子快報激活碼778899分享:Flink之SQL join

http://yzkb.51969.com/

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/2027353594.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄