欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:flink UDF使用

柚子快報邀請碼778899分享:flink UDF使用

http://yzkb.51969.com/

1. 基礎概念

??Flink UDF(User-Defined Function)是用戶自定義函數,用于擴展 Flink SQL 或 Table API 的內置函數功能。

2、分類

常見的 UDF 類型包括:

(1)flink UDF

Flink UDF(User-Defined Function) 為標量函數。

特點

單/多 字段輸入,單字段輸出,編寫函數時,繼承 Scalar Function。

使用場景

??適合數據轉換和簡單的計算。如:字符串的格式轉換,類型轉換,根據某些條件計算新的字段值。

實現與注冊

??標量函數需繼承 ScalarFunction 并實現 eval 方法。例如,實現一個字符串大寫轉換函數:

public class UpperCase extends ScalarFunction {

public String eval(String str) {

return str.toUpperCase();

}

}

注冊方式:

tableEnv.createTemporarySystemFunction("MY_UPPER", UpperCase.class);

SQL 使用:

SELECT MY_UPPER(name) FROM users;

實際場景

??根據 json 的 key,獲取對應的 value,這在數倉的業(yè)務場景中,是非?;A的一個 udf 函數,同時也是使用最廣泛、且重要的一個函數。

public class JsonExtract extends ScalarFunction {

public String eval(String json, String key) {

try {

JSONObject obj = new JSONObject(json);

return obj.getString(key);

} catch (JSONException e) {

return null;

}

}

}

SQL 使用:

SELECT JsonExtract(log, 'userId') AS userId FROM logs;

(2)flink UDTF

Flink UDTF(User-Defined Table Function) 為表函數。

特點為

單輸入/多輸入,多輸出。編寫函數時,繼承 Table Function。

使用場景

??數據拆分和數據擴展。例如:輸入一個 json,返回 json 中的多個字段?;蛘吒鶕承┮?guī)則生成額外的行數據。

實現步驟

繼承 TableFunction,定義輸出類型 T。實現 eval 方法,并通過 collect(T) 輸出多行。

示例:實現拆分字符串為多行的表函數:

public class Split extends TableFunction {

public void eval(String str) {

for (String s : str.split(",")) {

collect(s);

}

}

}

SQL 使用:

SELECT name, word FROM users, LATERAL TABLE(Split(name)) AS T(word);

(3)Flink UDAF

Flink UDAF(User-Defined Aggregate Function)為聚合函數。

特點

??對一組數據進行聚合計算??梢跃S護中間狀態(tài),逐步累積計算結果。編寫函數時,繼承 Table Function。

使用場景

常見的聚合操作:如求平均值、總和、最大值、最小值等。自定義的復雜聚合邏輯,比如計算移動平均值等。

實現步驟

??聚合函數需實現 AggregateFunction,其中 OUT 是輸出類型,ACC 是累加器類型。核心方法:

createAccumulator():初始化累加器。accumulate(ACC acc, …):累加輸入數據。getValue(ACC acc):從累加器生成最終結果。

示例:實現求平均值的聚合函數:

public class Avg extends AggregateFunction {

public AvgAccum createAccumulator() {

return new AvgAccum(0, 0);

}

public void accumulate(AvgAccum acc, Integer value) {

acc.sum += value;

acc.count++;

}

public Double getValue(AvgAccum acc) {

return acc.sum / (double) acc.count;

}

}

public static class AvgAccum {

public int sum;

public int count;

// 構造函數、getter/setter 省略

}

3、Flink UDF 中如何處理狀態(tài)(State)

UDF:通常無狀態(tài),需保證線程安全。

UDAF:通過累加器(Accumulator)隱式管理狀態(tài),Flink 自動容錯。

手動狀態(tài)管理:在 RichFunction(如 RichMapFunction)中通過 RuntimeContext 訪問狀態(tài) API,例如:

public class StatefulUDF extends RichMapFunction {

private ValueState state;

@Override

public void open(Configuration parameters) {

ValueStateDescriptor descriptor =

new ValueStateDescriptor<>("state", String.class);

state = getRuntimeContext().getState(descriptor);

}

@Override

public String map(String value) throws Exception {

String current = state.value();

// 更新狀態(tài)邏輯

state.update(newValue);

return result;

}

}

4、性能調優(yōu)

(1)避免復雜對象創(chuàng)建:在 eval 方法中重用對象,減少 GC 壓力。

(2)使用原生類型:優(yōu)先使用 int、double 而非 Integer、Double。

(3)并行度調優(yōu):根據數據量和資源調整 UDF 算子的并行度。

(4)狀態(tài)后端選擇:對狀態(tài)頻繁更新的場景,使用 RocksDB 狀態(tài)后端。

(5)禁用 Operator Chain:對計算密集型的 UDF,通過 disableChaining() 避免與上下游算子鏈化,提升資源利用率。

5. 異常處理

??如果 UDF 拋出異常,如何避免整個作業(yè)失敗呢?有如下幾種方法:

(1)Try-Catch 處理:在 eval 方法內部捕獲異常,返回默認值或錯誤標識。 (2)Side Output:將異常數據路由到側輸出流,單獨處理。 (3)重啟策略:配置 Flink 重啟策略(如 fixed-delay),但需謹慎避免無限重啟。

柚子快報邀請碼778899分享:flink UDF使用

http://yzkb.51969.com/

本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/2027324760.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄