柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL
柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL
Spark SQL
一、Spark SQL架構(gòu)
能夠直接訪問現(xiàn)存的Hive數(shù)據(jù) 提供JDBC/ODBC接口供第三方工具借助Spark進行數(shù)據(jù)處理 提供更高層級的接口方便處理數(shù)據(jù) 支持多種操作方式:SQL、API編程
API編程:Spark SQL基于SQL開發(fā)了一套SQL語句的算子,名稱和標(biāo)準的SQL語句相似 支持Parquet、CSV、JSON、RDBMS、Hive、HBase等多種外部數(shù)據(jù)源。(掌握多種數(shù)據(jù)讀取方式) Spark SQL核心:是RDD+Schema(算子+表結(jié)構(gòu)),為了更方便我們操作,會將RDD+Schema發(fā)給DataFrame 數(shù)據(jù)回灌:用于將處理和清洗后的數(shù)據(jù)回寫到Hive中,以供后續(xù)分析和使用。 BI Tools:主要用于數(shù)據(jù)呈現(xiàn)。 Spark Application:開發(fā)人員使用Spark Application編寫數(shù)據(jù)處理和分析邏輯,這些應(yīng)用可以用不同的編程語言編寫,比如Python、Scala、Java等。
二、Spark SQL運行原理
Catalyst優(yōu)化器的運行流程:
Frontend(前端)
輸入:用戶可以通過SQL查詢或DataFrame API來輸入數(shù)據(jù)處理邏輯。Unresolved Logical Plan(未解析的邏輯計劃):輸入的SQL查詢或DataFrame轉(zhuǎn)換操作會首先被轉(zhuǎn)換為一個未解析的邏輯計劃,這個計劃包含了用戶請求的所有操作,但其中的表名和列名等可能尚未解析。 Catalyst Optimizer(Catalyst優(yōu)化器) Catalyst優(yōu)化器是Spark SQL的核心組件,它負責(zé)將邏輯計劃轉(zhuǎn)換為物理執(zhí)行計劃,并進行優(yōu)化。Catalyst優(yōu)化器包括以下幾個階段:
Analysis(分析):將未解析的邏輯計劃中的表名和列名解析為具體的元數(shù)據(jù),這一步依賴于Catalog(元數(shù)據(jù)存儲)。輸出是一個解析后的邏輯計劃。Logical Optimization(邏輯優(yōu)化):對解析后的邏輯計劃進行各種優(yōu)化,如投影剪切、過濾下推等。優(yōu)化后的邏輯計劃更加高效。Physical Planning(物理計劃):將優(yōu)化后的邏輯計劃轉(zhuǎn)換為一個或多個物理執(zhí)行計劃。每個物理計劃都代表了一種可能的執(zhí)行方式。Cost Model(成本模型):評估不同物理計劃的執(zhí)行成本,選擇代價最低的物理計劃作為最終的物理計劃。 Backend(后端)
Code Generation(代碼生成):將選擇的物理計劃轉(zhuǎn)換為可以在Spark上執(zhí)行的RDD操作。這一步會生成實際的執(zhí)行代碼。RDDs:最終生成的RDD操作被執(zhí)行,以完成用戶請求的數(shù)據(jù)處理任務(wù)。
一個SQL查詢在Spark SQL中的優(yōu)化流程
SELECT name FROM(
SELECT id, name FROM people
) p
WHERE p.id = 1
Filter下壓:將Filter操作推到更靠近數(shù)據(jù)源的位置,以減少不必要的數(shù)據(jù)處理。合并Projection:減少不必要的列選擇IndexLookup return:name:如果存在索引,可以直接通過索引查找并返回name列
三、Spark SQL API
SparkContext:Spark應(yīng)用的主入口,代表了與Spark集群的連接。 SQLContext:Spark SQL的編程入口,使用SQLContext可以運行SQL查詢、加載數(shù)據(jù)源和創(chuàng)建DataFrame。 HiveContext:SQLContext的一個子集,可以執(zhí)行HiveQL查詢,并且可以訪問Hive元數(shù)據(jù)和UDF。 SparkSession:Spark2.0后推薦使用,合并了SQLContext和HiveContext,提供了與Spark所有功能交互的單一入口點。 創(chuàng)建一個SparkSession就包含了一個SparkContext。 若同時需要創(chuàng)建SparkContext和SparkSession,必須先創(chuàng)建SparkContext再創(chuàng)建SparkSession。否則,會拋出如下異常,提示重復(fù)創(chuàng)建SparkContext:
詳細解釋
創(chuàng)建SparkSession的代碼
val conf: SparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName("SparkSql")
def main(args: Array[String]): Unit = {
SparkSession.builder()
.config(conf)
.getOrCreate()
}
優(yōu)化:減少創(chuàng)建代碼,SparkSessionBuilder工具類
package com.ybg
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
// 封裝SparkSession的創(chuàng)建方法
class SparkSessionBuilder(master:String,appName:String){
lazy val config:SparkConf = {
new SparkConf()
.setMaster(master)
.setAppName(appName)
}
lazy val spark:SparkSession = {
SparkSession.builder()
.config(config)
.getOrCreate()
}
lazy val sc:SparkContext = {
spark.sparkContext
}
def stop(): Unit = {
if (null != spark) {
spark.stop()
}
}
}
object SparkSessionBuilder {
def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName)
}
四、Spark SQL依賴
pom.xml
若出現(xiàn)如下異常:
Caused by: com.fasterxml.jackson.databind.JsonMappingException:
Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
追加如下依賴:
-->
log4j.properties
log4j.properties應(yīng)該放在資源包下。
log4j.rootLogger=ERROR, stdout, logfile # 設(shè)置可顯示的信息等級
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/spark_first.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
五、Spark SQL數(shù)據(jù)集
1、DataSet
簡介:
從Spark 1.6開始引入的新的抽象。是特定領(lǐng)域?qū)ο笾械膹婎愋图稀?梢允褂煤瘮?shù)式編程或SQL查詢進行操作。等于RDD + Schema。
2、DataFrame
簡介:
DataFrame是特殊的DataSet:DataFrame=DataSet[Row],行對象的集合,每一行就是一個行對象。類似于傳統(tǒng)數(shù)據(jù)的二維表格。 特性:
Schema:在RDD基礎(chǔ)上增加了Schema,描述數(shù)據(jù)結(jié)構(gòu)信息嵌套數(shù)據(jù)類型:支持struct,map,array等嵌套數(shù)據(jù)類型。API:提供類似SQL的操作接口。
詳細解釋
創(chuàng)建DataSet的代碼
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 提供了一組隱式轉(zhuǎn)換,這些轉(zhuǎn)換允許將Scala的本地集合類型(如Seq、Array、List等)轉(zhuǎn)換為Spark的DataSet。
import spark.implicits._
val dsPhone: Dataset[Product] = spark.createDataset(Seq(
Product(1, "Huawei Mate60", 5888.0f),
Product(2, "IPhone", 5666.0f),
Product(3, "OPPO", 1888.0f)
))
dsPhone.printSchema()
/**
* root
* |-- id: integer (nullable = false)
* |-- name: string (nullable = true)
* |-- price: float (nullable = false)
*/
創(chuàng)建DataFrame的代碼
讀取CSV文件
對于CSV文件,在構(gòu)建DataFrame之前,必須要先創(chuàng)建一個Schema,再根據(jù)文件類型分不同情況進行導(dǎo)入。(讀取JSON文件或者數(shù)據(jù)庫表都并不需要) 注意:必須要import spark.implicits._,導(dǎo)入隱式類,才能夠識別一些隱式轉(zhuǎn)換,否則會報錯。 CSV文件在創(chuàng)建DataFrame時,可以選擇盡量模仿Hive中的OpenCSVSerDe的
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
val schema: StructType = StructType(
Seq(
StructField("user_id", LongType),
StructField("locale", StringType),
StructField("birthYear", IntegerType),
StructField("gender", StringType),
StructField("joinedAt", StringType),
StructField("location", StringType),
StructField("timezone", StringType)
)
)
val frmUsers: DataFrame = spark.read
.schema(schema)
.option("separator", ",") // 指定文件分割符
.option("header", "true") // 指定CSV文件包含表頭
.option("quoteChar", "\"")
.option("escapeChar", "\\")
.csv("C:\\Users\\lenovo\\Desktop\\users.csv")
.repartition(4)
.cache()
讀取JSON文件
val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json")
frmUsers2.show()
讀取數(shù)據(jù)庫表
val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 數(shù)據(jù)庫連接地址
val mysql = new Properties()
mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver")
mysql.setProperty("user", "root")
mysql.setProperty("password", "123456")
spark
.read
.jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,連接屬性)
.show(100)
六、Spark_SQL的兩種編碼方式
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
val schema: StructType = StructType(
Seq(
StructField("user_id", LongType),
StructField("locale", StringType),
StructField("birthYear", IntegerType),
StructField("gender", StringType),
StructField("joinedAt", StringType),
StructField("location", StringType),
StructField("timezone", StringType)
)
)
val frmUsers: DataFrame = spark.read
.schema(schema)
.option("separator", ",") // 指定文件分割符
.option("header", "true") // 指定CSV文件包含表頭
.option("quoteChar", "\"")
.option("escapeChar", "\\")
.csv("C:\\Users\\lenovo\\Desktop\\users.csv")
.repartition(4)
.cache()
此處已經(jīng)創(chuàng)建好了DataFrame
1. 面向標(biāo)準SQL語句(偷懶用)
frmUsers.registerTempTable("user_info") // 此方法已過期
spark.sql(
"""
|select * from user_info
|where gender='female'
|""".stripMargin)
.show(10)
2. 使用Spark中的SQL算子(更規(guī)范)
frmUsers
.where($"birthYear">1990)
.groupBy($"locale")
.agg(
count($"locale").as("locale_count"),
round(avg($"birthYear"),2).as("avg_birth_year")
)
.where($"locale_count">=10 and $"avg_birth_year">=1993)
.orderBy($"locale_count".desc)
.select(
$"locale", $"locale_count", $"avg_birth_year",
dense_rank()
.over(win)
.as("rnk_by_locale_count"),
lag($"locale_count",1)
.over(win)
.as("last_locale_count")
)
.show(10)
七、常用算子
1.基本SQL模板
select
col,cols*,agg*
where
conditionCols
group by
col,cols*
having
condition
order by
col asc|desc
limit
n
2.select
select語句在代碼的開頭可以不寫,因為有后續(xù)的類似where和group by語句已經(jīng)對列進行了操作,指明了列名。如果后續(xù)有select語句,則優(yōu)先按照后面的select語句進行。
frmUsers.select(
$"locale",$"locale_count"
)
3.agg
.agg(
count($"locale").as("locale_count"),
round(avg($"birthYear"),2).as("avg_birth_year")
)
4.窗口函數(shù)
over子句
注意:over子句中的分區(qū)信息是可以被重用的
val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc)
frmUsers
...
.select(
dense_rank()
.over(win)
.as("rnk_by_locale_count")
)
5.show
show(N)表示顯示符合條件的至多N條數(shù)據(jù)。(不是取前N條再提取出其中符合條件的數(shù)據(jù))
frmUsers
...
.show(10)
6.條件篩選 where
newCol:Column = $"cus_state".isNull
newCol:Column = $"cus_state".isNaN
newCol:Column = $"cus_state".isNotNull
newCol:Column = $"cus_state".gt(10) <=>$"cus_state">10
newCol:Column = $"cus_state".geq(10) <=>$"cus_state">=10
newCol:Column = $"cus_state".lt(10) <=>$"cus_state"<10
newCol:Column = $"cus_state".leq(10) <=>$"cus_state"<=10
newCol:Column = $"cus_state".eq(10) <=>$"cus_state"===10
newCol:Column = $"cus_state".ne(10) <=>$"cus_state"=!=10
newCol:Column = $"cus_state".between(10,20)
newCol:Column = $"cus_state".like("張%")
newCol:Column = $"cus_state".rlike("\\d+")
newCol:Column = $"cus_state".isin(list:Any*)
newCol:Column = $"cus_state".isInCollection(values:Itrable[_])
多條件:
newCol:Column = ColOne and ColTwo
newCol:Column = ColOne or ColTwo
在Spark SQL中,不存在Having子句,Where子句的實際作用根據(jù)相對于分組語句的前后決定。
7.分組
// 多重分組
/**
rollup的效果:
select birthYear,count(*) from user group by birthYear
union all
select gender,birthYear,count(*) from user group by gender,birthYear
存在"字段不對應(yīng)"的情況:
空缺的字段會自動補全為null
*/
frmUsers
.rollup("gender", "birthYear")
.count()
.show(100)
// 為了方便查找到每個數(shù)據(jù)行所對應(yīng)的分組方式
spark.sql(
"""
|select grouping__id,gender,birthYear,count(8) as cnt from user_info
|group by gender,birthday,
|grouping sets(gender,birthday,(gender,birthYear))
|""".stripMargin)
.show(100)
// 這里的group by子句定義了分組的列,到grouping sets明確指定了分組的組合
// 因而,在數(shù)倉設(shè)計的過程中,我們能夠?qū)Σ煌纸M依據(jù)下的不同數(shù)據(jù)依據(jù)grouping__id做分區(qū)。
RollUp和Cube的區(qū)別 假設(shè)有三列:1, 2, 3,使用CUBE(1, 2, 3),會生成以下組合:
GROUP BY ()(不分組,整體聚合)GROUP BY (1)GROUP BY (2)GROUP BY (3)GROUP BY (1, 2)GROUP BY (1, 3)GROUP BY (2, 3)GROUP BY (1, 2, 3) ROLLUP生成的分組組合是層級的,它從最詳細的分組開始,一步步減少分組的列,直到整體聚合。 假設(shè)有三列:1, 2, 3,使用ROLLUP(1, 2, 3),會生成以下組合:
GROUP BY (1, 2, 3)(最詳細的分組)GROUP BY (1, 2)GROUP BY (1)GROUP BY ()(不分組,整體聚合)
8.關(guān)聯(lián)查詢
val frmClass: DataFrame = spark.createDataFrame(
Seq(
Class(1, "yb12211"),
Class(2, "yb12309"),
Class(3, "yb12401")
)
)
val frmStu: DataFrame = spark.createDataFrame(
Seq(
Student("henry", 1),
Student("ariel", 2),
Student("jack", 1),
Student("rose", 4),
Student("jerry", 2),
Student("mary", 1)
)
)
// 1.笛卡爾積(默認情況下)
frmStu.as("S")
.join(frmClass.as("C"))
.show(100)
/**
+-----+-------+-------+---------+
| name|classId|classId|className|
+-----+-------+-------+---------+
|henry| 1 | 1 | yb12211|
|henry| 1 | 2 | yb12309|
|henry| 1 | 3 | yb12401|
|ariel| 2 | 1 | yb12211|
|ariel| 2 | 2 | yb12309|
|ariel| 2 | 3 | yb12401|
| jack| 1 | 1 | yb12211|
| jack| 1 | 2 | yb12309|
| jack| 1 | 3 | yb12401|
| rose| 4 | 1 | yb12211|
| rose| 4 | 2 | yb12309|
| rose| 4 | 3 | yb12401|
|jerry| 2 | 1 | yb12211|
|jerry| 2 | 2 | yb12309|
|jerry| 2 | 3 | yb12401|
| mary| 1 | 1 | yb12211|
| mary| 1 | 2 | yb12309|
| mary| 1 | 3 | yb12401|
+-----+-------+-------+---------+
*/
// 2.內(nèi)連接
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId","inner")
.show(100)
/**
+-----+-------+-------+---------+
| name|classId|classId|className|
+-----+-------+-------+---------+
|henry| 1 | 1 | yb12211|
|ariel| 2 | 2 | yb12309|
| jack| 1 | 1 | yb12211|
|jerry| 2 | 2 | yb12309|
| mary| 1 | 1 | yb12211|
+-----+-------+-------+---------+
*/
// 啟用using:使用Seq("Column")代表關(guān)聯(lián)字段
frmStu.as("S")
.join(frmClass.as("C"), Seq("classId"),"right")
.show(100)
// 3.外連接
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId","outer") // left | right | outer
.show(100)
/**
+-----+-------+-------+---------+
| name|classId|classId|className|
+-----+-------+-------+---------+
|henry| 1 | 1 | yb12211|
| jack| 1 | 1 | yb12211|
| mary| 1 | 1 | yb12211|
| null| null | 3 | yb12401|
| rose| 4 | null | null|
|ariel| 2 | 2 | yb12309|
|jerry| 2 | 2 | yb12309|
+-----+-------+-------+---------+
*/
// 4.反連接:返回左數(shù)據(jù)集中所有沒有關(guān)聯(lián)字段匹配記錄的左數(shù)據(jù)集的行
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId","anti")
.show(100)
/**
+----+-------+
|name|classId|
+----+-------+
|rose| 4 |
+----+-------+
*/
// 5.半連接:返回左數(shù)據(jù)集中所有有關(guān)聯(lián)字段匹配記錄的左數(shù)據(jù)集的行
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId","semi")
.show(100)
/**
+-----+-------+
| name|classId|
+-----+-------+
|henry| 1 |
|ariel| 2 |
| jack| 1 |
|jerry| 2 |
| mary| 1 |
+-----+-------+
*/
9.排序
frmStu.orderBy(cols:Column*)
10.數(shù)據(jù)截取
frmStu.tail(n:Int)
frmStu.take(n:Int)
八.SQL函數(shù)
常用函數(shù)
$"NAME" = col("NAME") // 取列值
as("ALIAS_NAME") // 別名
as(alias:Seq[String]) // 多個別名
when(CONDITION,V1) // 條件
.when(...)
.otherwise(VN)
lit(V) // 常量列
withColumn(colName:String,col:Column) // 擴展列(通常用于使用窗口函數(shù)做擴展列)
cast(DataType) // 類型轉(zhuǎn)換
常用函數(shù)案例
spark.createDataFrame(Seq(
Test(1,Array("money","freedom"),Map("java"->85,"c++"->92)),
Test(2,Array("beauty","writing"),Map("math"->91,"English"->88)),
Test(3,Array("movie","draw"),Map("Sql"->100,"LLM"->77))
))
// 多個explode不能寫在一個select中
.select($"id",explode($"hobbies").as("hobby"),$"scores")
.select($"id",$"hobby",explode($"scores").as(Seq("course","score")))
.select($"id",$"hobby",$"course",$"score".cast("Integer"))
.withColumn("score_rank",
when($"score">=90,lit("A")))
when($"score">=80,lit("B"))
when($"score">=70,lit("C"))
when($"score">=60,lit("D"))
.otherwise(lit("E"))
集合函數(shù)
array
size(collectCol:Column) // 計算數(shù)組大小
array(cols:Column*) // 一行中的多列轉(zhuǎn)為單列數(shù)組類型
array_sort(arrayCol:Column) // 對數(shù)組列中的元素進行排序
array_contains(arrayCol:Column,value:Any) // 依次判斷數(shù)組列的各個元素是否含有特定值
array_distinct(arrayCol:Column) // 對數(shù)組列的各個元素進行去重并返回去重后的結(jié)果
array_join(arrayCol:Column,sep:String,nullReplacement:String) // 對數(shù)組列的各個元素進行拼接
array_except(arrayCol:Column)
array_intersect(arrayCol:Column)
array_union(arrayCol:Column)
map
map_keys(mapCol:Column)
map_values(mapCol:Column)
map_entries(mapCol:Column)
集合函數(shù)案例
data.select($"id",size($"hobbies").as("hobbies_cnt")).show()
data.select($"id",array_sort($"hobbies").as("hobbies_sort")).show()
data.select($"id",array_contains($"hobbies","money")).show()
data.select($"id",array_distinct($"hobbies").as("unique_hobbies")).show()
data.select($"id",array_join($"hobbies",",","Unknown Value").as("union_hobby")).show()
data.withColumn("next_hobbies",lead($"hobbies",1) over(Window.orderBy("id")))
.where($"next_hobbies".isNotNull) // 提前做條件篩選
.select(
array_intersect($"hobbies",$"next_hobbies").as("intersect_hobbies")
)
.show(10)
data.select($"id",
map_keys($"scores").as("course_list"),
map_values($"scores").as("scores"),
map_entries($"scores").as("course_score_list")
).show()
// 考java的學(xué)生人數(shù)有多少
val num: Long = data.select(
array_contains(map_keys($"scores"), "java").as("isJava")
).filter($"isJava").count()
字符串函數(shù)
// 提取
// 1、提取 json
json_tuple(jsonCol:Column, fields:String*) // $"jsonString" => field1,field2 獲取單層Json字段
get_json_object(jsonCol:Column, path:String) // $"jsonString" => $.field1[.field2] 獲取多層嵌套Json字段
// 2、正則分組
regexp_extract(col:Column, pattern:String, groupId:Int)
// 3、分裂與截取
split(col:Column,pattern:String)
substring(col:Column,pos:Int,len:Int)
substring_index(col:Column,sep:String,groupId:Int)
// groupId +N 從左向右前N個
// groupId -N 從右向左前N個
// 第N個 substring_index(substring_index(COL,SEP,+N),SEP,-1)
// 4、子字符串在字段中的位置(表示子字符串的第一個字符在字符串中的索引位置)
locate(subStr:String,col:Column) // 有則>0,否則=0,
instr(col:Column,subStr:String)
// 5、字符串拼接
concat(cols:Column*)
concat_ws(sep:String,cols:Column*)
// 6、內(nèi)容長度
length(col:Column) // 字符長度
// 字節(jié)長度,未提供算子,需要通過 spark.sql(""" select octet_length(...)""") 實現(xiàn)
// 7、定長填充
lpad(col:Column,len:Int,pad:String)
rpad(col:Column,len:Int,pad:String)
// 8、清除兩端空格
ltrim(col:Column)
rtrim(col:Column)
trim(col:Column)
// 9、大小寫轉(zhuǎn)換
initcap(col:Column) // 每個單詞首字母大寫
upper(col:Column) // 全大寫
lower(col:Column) // 全小寫
hash(col:Column) // 去哈希值
regexp_replace(col:Column,pattern:String,replace:String) // 正則替換
translate(col:Column,from:String,to:String) // 按字母轉(zhuǎn)換
reverse(col:Column) // 翻轉(zhuǎn)
// 10、轉(zhuǎn)碼
encode(col:Column, charSet:String)
decode(col:Column, charSet:String)
// 11、非對稱加密
sha1(col:Column)
md5(col:Column)
字符串函數(shù)案例
val frm: DataFrame = spark
.createDataFrame(Seq(
Json(1, """{"name":"henry","age":22,"hobbies":["beauty","money","power"],"address":{"province":"jiangsu","city":"nanjing"}}"""),
Json(2, """{"name":"jack","age":23,"hobbies":["beauty","power"],"address":{"province":"jiangsu","city":"wuxi"}}"""),
Json(3, """{"name":"tom","age":24,"hobbies":["beauty","money"],"address":{"province":"jiangsu","city":"yancheng"}}""")
))
frm.select($"id",
json_tuple($"json","name","age","hobbies").as(Seq("name","age","hobbies")),
get_json_object($"json","$.address.province").as("province"),
get_json_object($"json","$.address.city").as("city")
).show(10)
// 通過正則提取獲取特定的日志信息
val regex_line = "(.*?) (INFO|WARN|ERROR) (.*?):(.*)"
val regex_log = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3} (INFO|WARN|ERROR) .*"
val frm: DataFrame = spark
.read
.text("spark-warehouse/datanode.log")
.toDF("line")
frm
.where($"line".rlike(regex_log))
.select(
regexp_extract($"line",regex_line,1).as("log_in_time"),
regexp_extract($"line",regex_line,2).as("log_type"),
regexp_extract($"line",regex_line,3).as("log_full_pack"),
regexp_extract($"line",regex_line,4).as("log_detail")
)
// 獲取錯誤日志信息中錯誤類別及其所占數(shù)量
.where($"log_type".equalTo("ERROR"))
.groupBy($"log_detail")
.count()
.show(100)
Spark自定義函數(shù)流程
自定義函數(shù)流程:定義-注冊-調(diào)用
建議將自定義函數(shù)實現(xiàn),單獨建對象保存 import java.nio.charset.{StandardCharsets}
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
object SparkUtil {
/**
* 處理密鑰
* @param secret 密鑰
*/
private def secretInit(secret:String)={
// 對密鑰長度進行約束
val allowNumBits: Array[Int] = Array(16, 24, 32)
// 如果密鑰長度符合,將密鑰轉(zhuǎn)換為AES密鑰對象
if (allowNumBits.contains(secret.size)) {
new SecretKeySpec(
secret.getBytes(StandardCharsets.UTF_8),"AES")
}else{
throw new RuntimeException(
s"AES secret size of numBits ${secret.size} not in
permitted values (${allowNumBits.mkString(",")})")
}
}
/**
* 加密函數(shù)
* @param src 源數(shù)據(jù)
* @param secret 密鑰
*/
def encrypt(src:String,secret:String)={
// 獲取加密算法實例
val cipher: Cipher = Cipher.getInstance("AES")
// 初始化加密模式,使用給定的密鑰(需要先用key()對密鑰進行處理)
cipher.init(Cipher.ENCRYPT_MODE,secretInit(secret))
// 執(zhí)行加密操作
val bytes: Array[Byte] = cipher.doFinal(src.getBytes(StandardCharset.UTF_8))
// 返回加密后的數(shù)據(jù)
Base64.getEncoder().encodeToString(bytes)
}
/**
* 解密函數(shù)
* @param dest 待解密數(shù)據(jù)
* @param secret 密鑰
*/
def decrypt(dest:String,secret:String)={
val cipher: Cipher = Cipher.getInstance("AES")
cipher.init(Cipher.DECRYPT_MODE,secretInit(secret))
val bytes: Array[Byte] = cipher.doFinal(
Base64.getDecoder.decode(dest))
new String(bytes, StandardCharsets.UTF_8)
}
}
在 Spark 環(huán)境下導(dǎo)入對象實現(xiàn)的方法,并在 SparkSession 中注冊 UDF 函數(shù) import core.SparkUtil.{encrypt,decrypt}
spark.udf.register(
"aes_encrypt",
(src:String,secret:String)
=>encrypt(src, secret),StringType)
spark.udf.register(
"aes_decrypt",
(src:String,secret:String)
=>decrypt(src, secret),StringType)
在 SparkSql 中調(diào)用注冊函數(shù) val frm: DataFrame = spark.createDataFrame(Seq(
Test(1,Array("money","freedom"),Map("java"->85,"mysql"->67)),
Test(2,Array("beauty","beauty"),Map("java"->72,"mysql"->90)),
Test(3,Array("sports","beauty"),Map("java"->76,"html"->52))
))
val secret = "henryyb2211ariel"
val frmEncrypt: DataFrame = frm
.select($"id",
callUDF(
"aes_encrypt",
array_join($"hobbies", ","),
lit(secret)
).as("encrypted_hobbies")
)
val frmDecrypt: DataFrame = frmEncrypt
.select($"id",
split(
callUDF(
"aes_decrypt",
$"encrypted_hobbies",
lit(secret)
),
","
).as("hobbies")
).show()
柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL
好文閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。