欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL

柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL

http://yzkb.51969.com/

Spark SQL

一、Spark SQL架構(gòu)

能夠直接訪問現(xiàn)存的Hive數(shù)據(jù) 提供JDBC/ODBC接口供第三方工具借助Spark進行數(shù)據(jù)處理 提供更高層級的接口方便處理數(shù)據(jù) 支持多種操作方式:SQL、API編程

API編程:Spark SQL基于SQL開發(fā)了一套SQL語句的算子,名稱和標(biāo)準的SQL語句相似 支持Parquet、CSV、JSON、RDBMS、Hive、HBase等多種外部數(shù)據(jù)源。(掌握多種數(shù)據(jù)讀取方式) Spark SQL核心:是RDD+Schema(算子+表結(jié)構(gòu)),為了更方便我們操作,會將RDD+Schema發(fā)給DataFrame 數(shù)據(jù)回灌:用于將處理和清洗后的數(shù)據(jù)回寫到Hive中,以供后續(xù)分析和使用。 BI Tools:主要用于數(shù)據(jù)呈現(xiàn)。 Spark Application:開發(fā)人員使用Spark Application編寫數(shù)據(jù)處理和分析邏輯,這些應(yīng)用可以用不同的編程語言編寫,比如Python、Scala、Java等。

二、Spark SQL運行原理

Catalyst優(yōu)化器的運行流程:

Frontend(前端)

輸入:用戶可以通過SQL查詢或DataFrame API來輸入數(shù)據(jù)處理邏輯。Unresolved Logical Plan(未解析的邏輯計劃):輸入的SQL查詢或DataFrame轉(zhuǎn)換操作會首先被轉(zhuǎn)換為一個未解析的邏輯計劃,這個計劃包含了用戶請求的所有操作,但其中的表名和列名等可能尚未解析。 Catalyst Optimizer(Catalyst優(yōu)化器) Catalyst優(yōu)化器是Spark SQL的核心組件,它負責(zé)將邏輯計劃轉(zhuǎn)換為物理執(zhí)行計劃,并進行優(yōu)化。Catalyst優(yōu)化器包括以下幾個階段:

Analysis(分析):將未解析的邏輯計劃中的表名和列名解析為具體的元數(shù)據(jù),這一步依賴于Catalog(元數(shù)據(jù)存儲)。輸出是一個解析后的邏輯計劃。Logical Optimization(邏輯優(yōu)化):對解析后的邏輯計劃進行各種優(yōu)化,如投影剪切、過濾下推等。優(yōu)化后的邏輯計劃更加高效。Physical Planning(物理計劃):將優(yōu)化后的邏輯計劃轉(zhuǎn)換為一個或多個物理執(zhí)行計劃。每個物理計劃都代表了一種可能的執(zhí)行方式。Cost Model(成本模型):評估不同物理計劃的執(zhí)行成本,選擇代價最低的物理計劃作為最終的物理計劃。 Backend(后端)

Code Generation(代碼生成):將選擇的物理計劃轉(zhuǎn)換為可以在Spark上執(zhí)行的RDD操作。這一步會生成實際的執(zhí)行代碼。RDDs:最終生成的RDD操作被執(zhí)行,以完成用戶請求的數(shù)據(jù)處理任務(wù)。

一個SQL查詢在Spark SQL中的優(yōu)化流程

SELECT name FROM(

SELECT id, name FROM people

) p

WHERE p.id = 1

Filter下壓:將Filter操作推到更靠近數(shù)據(jù)源的位置,以減少不必要的數(shù)據(jù)處理。合并Projection:減少不必要的列選擇IndexLookup return:name:如果存在索引,可以直接通過索引查找并返回name列

三、Spark SQL API

SparkContext:Spark應(yīng)用的主入口,代表了與Spark集群的連接。 SQLContext:Spark SQL的編程入口,使用SQLContext可以運行SQL查詢、加載數(shù)據(jù)源和創(chuàng)建DataFrame。 HiveContext:SQLContext的一個子集,可以執(zhí)行HiveQL查詢,并且可以訪問Hive元數(shù)據(jù)和UDF。 SparkSession:Spark2.0后推薦使用,合并了SQLContext和HiveContext,提供了與Spark所有功能交互的單一入口點。 創(chuàng)建一個SparkSession就包含了一個SparkContext。 若同時需要創(chuàng)建SparkContext和SparkSession,必須先創(chuàng)建SparkContext再創(chuàng)建SparkSession。否則,會拋出如下異常,提示重復(fù)創(chuàng)建SparkContext:

詳細解釋

創(chuàng)建SparkSession的代碼

val conf: SparkConf = new SparkConf()

.setMaster("local[4]")

.setAppName("SparkSql")

def main(args: Array[String]): Unit = {

SparkSession.builder()

.config(conf)

.getOrCreate()

}

優(yōu)化:減少創(chuàng)建代碼,SparkSessionBuilder工具類

package com.ybg

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.SparkSession

// 封裝SparkSession的創(chuàng)建方法

class SparkSessionBuilder(master:String,appName:String){

lazy val config:SparkConf = {

new SparkConf()

.setMaster(master)

.setAppName(appName)

}

lazy val spark:SparkSession = {

SparkSession.builder()

.config(config)

.getOrCreate()

}

lazy val sc:SparkContext = {

spark.sparkContext

}

def stop(): Unit = {

if (null != spark) {

spark.stop()

}

}

}

object SparkSessionBuilder {

def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName)

}

四、Spark SQL依賴

pom.xml

8

8

UTF-8

3.1.2

2.12

3.1.3

8.0.33

3.1.2

2.3.5

2.10.0

org.apache.spark

spark-core_${spark.scala.version}

${spark.version}

org.apache.spark

spark-sql_${spark.scala.version}

${spark.version}

若出現(xiàn)如下異常:

Caused by: com.fasterxml.jackson.databind.JsonMappingException:

Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0

追加如下依賴:

-->

com.fasterxml.jackson.core

jackson-databind

2.10.0

com.mysql

mysql-connector-j

${mysql.version}

log4j.properties

log4j.properties應(yīng)該放在資源包下。

log4j.rootLogger=ERROR, stdout, logfile # 設(shè)置可顯示的信息等級

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=log/spark_first.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

五、Spark SQL數(shù)據(jù)集

1、DataSet

簡介:

從Spark 1.6開始引入的新的抽象。是特定領(lǐng)域?qū)ο笾械膹婎愋图稀?梢允褂煤瘮?shù)式編程或SQL查詢進行操作。等于RDD + Schema。

2、DataFrame

簡介:

DataFrame是特殊的DataSet:DataFrame=DataSet[Row],行對象的集合,每一行就是一個行對象。類似于傳統(tǒng)數(shù)據(jù)的二維表格。 特性:

Schema:在RDD基礎(chǔ)上增加了Schema,描述數(shù)據(jù)結(jié)構(gòu)信息嵌套數(shù)據(jù)類型:支持struct,map,array等嵌套數(shù)據(jù)類型。API:提供類似SQL的操作接口。

詳細解釋

創(chuàng)建DataSet的代碼

val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

// 提供了一組隱式轉(zhuǎn)換,這些轉(zhuǎn)換允許將Scala的本地集合類型(如Seq、Array、List等)轉(zhuǎn)換為Spark的DataSet。

import spark.implicits._

val dsPhone: Dataset[Product] = spark.createDataset(Seq(

Product(1, "Huawei Mate60", 5888.0f),

Product(2, "IPhone", 5666.0f),

Product(3, "OPPO", 1888.0f)

))

dsPhone.printSchema()

/**

* root

* |-- id: integer (nullable = false)

* |-- name: string (nullable = true)

* |-- price: float (nullable = false)

*/

創(chuàng)建DataFrame的代碼

讀取CSV文件

對于CSV文件,在構(gòu)建DataFrame之前,必須要先創(chuàng)建一個Schema,再根據(jù)文件類型分不同情況進行導(dǎo)入。(讀取JSON文件或者數(shù)據(jù)庫表都并不需要) 注意:必須要import spark.implicits._,導(dǎo)入隱式類,才能夠識別一些隱式轉(zhuǎn)換,否則會報錯。 CSV文件在創(chuàng)建DataFrame時,可以選擇盡量模仿Hive中的OpenCSVSerDe的

val spark: SparkSession = SparkSession.builder()

.config(conf)

.getOrCreate()

import spark.implicits._

val schema: StructType = StructType(

Seq(

StructField("user_id", LongType),

StructField("locale", StringType),

StructField("birthYear", IntegerType),

StructField("gender", StringType),

StructField("joinedAt", StringType),

StructField("location", StringType),

StructField("timezone", StringType)

)

)

val frmUsers: DataFrame = spark.read

.schema(schema)

.option("separator", ",") // 指定文件分割符

.option("header", "true") // 指定CSV文件包含表頭

.option("quoteChar", "\"")

.option("escapeChar", "\\")

.csv("C:\\Users\\lenovo\\Desktop\\users.csv")

.repartition(4)

.cache()

讀取JSON文件

val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json")

frmUsers2.show()

讀取數(shù)據(jù)庫表

val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 數(shù)據(jù)庫連接地址

val mysql = new Properties()

mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver")

mysql.setProperty("user", "root")

mysql.setProperty("password", "123456")

spark

.read

.jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,連接屬性)

.show(100)

六、Spark_SQL的兩種編碼方式

val spark: SparkSession = SparkSession.builder()

.config(conf)

.getOrCreate()

import spark.implicits._

val schema: StructType = StructType(

Seq(

StructField("user_id", LongType),

StructField("locale", StringType),

StructField("birthYear", IntegerType),

StructField("gender", StringType),

StructField("joinedAt", StringType),

StructField("location", StringType),

StructField("timezone", StringType)

)

)

val frmUsers: DataFrame = spark.read

.schema(schema)

.option("separator", ",") // 指定文件分割符

.option("header", "true") // 指定CSV文件包含表頭

.option("quoteChar", "\"")

.option("escapeChar", "\\")

.csv("C:\\Users\\lenovo\\Desktop\\users.csv")

.repartition(4)

.cache()

此處已經(jīng)創(chuàng)建好了DataFrame

1. 面向標(biāo)準SQL語句(偷懶用)

frmUsers.registerTempTable("user_info") // 此方法已過期

spark.sql(

"""

|select * from user_info

|where gender='female'

|""".stripMargin)

.show(10)

2. 使用Spark中的SQL算子(更規(guī)范)

frmUsers

.where($"birthYear">1990)

.groupBy($"locale")

.agg(

count($"locale").as("locale_count"),

round(avg($"birthYear"),2).as("avg_birth_year")

)

.where($"locale_count">=10 and $"avg_birth_year">=1993)

.orderBy($"locale_count".desc)

.select(

$"locale", $"locale_count", $"avg_birth_year",

dense_rank()

.over(win)

.as("rnk_by_locale_count"),

lag($"locale_count",1)

.over(win)

.as("last_locale_count")

)

.show(10)

七、常用算子

1.基本SQL模板

select

col,cols*,agg*

where

conditionCols

group by

col,cols*

having

condition

order by

col asc|desc

limit

n

2.select

select語句在代碼的開頭可以不寫,因為有后續(xù)的類似where和group by語句已經(jīng)對列進行了操作,指明了列名。如果后續(xù)有select語句,則優(yōu)先按照后面的select語句進行。

frmUsers.select(

$"locale",$"locale_count"

)

3.agg

.agg(

count($"locale").as("locale_count"),

round(avg($"birthYear"),2).as("avg_birth_year")

)

4.窗口函數(shù)

over子句

注意:over子句中的分區(qū)信息是可以被重用的

val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc)

frmUsers

...

.select(

dense_rank()

.over(win)

.as("rnk_by_locale_count")

)

5.show

show(N)表示顯示符合條件的至多N條數(shù)據(jù)。(不是取前N條再提取出其中符合條件的數(shù)據(jù))

frmUsers

...

.show(10)

6.條件篩選 where

newCol:Column = $"cus_state".isNull

newCol:Column = $"cus_state".isNaN

newCol:Column = $"cus_state".isNotNull

newCol:Column = $"cus_state".gt(10) <=>$"cus_state">10

newCol:Column = $"cus_state".geq(10) <=>$"cus_state">=10

newCol:Column = $"cus_state".lt(10) <=>$"cus_state"<10

newCol:Column = $"cus_state".leq(10) <=>$"cus_state"<=10

newCol:Column = $"cus_state".eq(10) <=>$"cus_state"===10

newCol:Column = $"cus_state".ne(10) <=>$"cus_state"=!=10

newCol:Column = $"cus_state".between(10,20)

newCol:Column = $"cus_state".like("張%")

newCol:Column = $"cus_state".rlike("\\d+")

newCol:Column = $"cus_state".isin(list:Any*)

newCol:Column = $"cus_state".isInCollection(values:Itrable[_])

多條件:

newCol:Column = ColOne and ColTwo

newCol:Column = ColOne or ColTwo

在Spark SQL中,不存在Having子句,Where子句的實際作用根據(jù)相對于分組語句的前后決定。

7.分組

// 多重分組

/**

rollup的效果:

select birthYear,count(*) from user group by birthYear

union all

select gender,birthYear,count(*) from user group by gender,birthYear

存在"字段不對應(yīng)"的情況:

空缺的字段會自動補全為null

*/

frmUsers

.rollup("gender", "birthYear")

.count()

.show(100)

// 為了方便查找到每個數(shù)據(jù)行所對應(yīng)的分組方式

spark.sql(

"""

|select grouping__id,gender,birthYear,count(8) as cnt from user_info

|group by gender,birthday,

|grouping sets(gender,birthday,(gender,birthYear))

|""".stripMargin)

.show(100)

// 這里的group by子句定義了分組的列,到grouping sets明確指定了分組的組合

// 因而,在數(shù)倉設(shè)計的過程中,我們能夠?qū)Σ煌纸M依據(jù)下的不同數(shù)據(jù)依據(jù)grouping__id做分區(qū)。

RollUp和Cube的區(qū)別 假設(shè)有三列:1, 2, 3,使用CUBE(1, 2, 3),會生成以下組合:

GROUP BY ()(不分組,整體聚合)GROUP BY (1)GROUP BY (2)GROUP BY (3)GROUP BY (1, 2)GROUP BY (1, 3)GROUP BY (2, 3)GROUP BY (1, 2, 3) ROLLUP生成的分組組合是層級的,它從最詳細的分組開始,一步步減少分組的列,直到整體聚合。 假設(shè)有三列:1, 2, 3,使用ROLLUP(1, 2, 3),會生成以下組合:

GROUP BY (1, 2, 3)(最詳細的分組)GROUP BY (1, 2)GROUP BY (1)GROUP BY ()(不分組,整體聚合)

8.關(guān)聯(lián)查詢

val frmClass: DataFrame = spark.createDataFrame(

Seq(

Class(1, "yb12211"),

Class(2, "yb12309"),

Class(3, "yb12401")

)

)

val frmStu: DataFrame = spark.createDataFrame(

Seq(

Student("henry", 1),

Student("ariel", 2),

Student("jack", 1),

Student("rose", 4),

Student("jerry", 2),

Student("mary", 1)

)

)

// 1.笛卡爾積(默認情況下)

frmStu.as("S")

.join(frmClass.as("C"))

.show(100)

/**

+-----+-------+-------+---------+

| name|classId|classId|className|

+-----+-------+-------+---------+

|henry| 1 | 1 | yb12211|

|henry| 1 | 2 | yb12309|

|henry| 1 | 3 | yb12401|

|ariel| 2 | 1 | yb12211|

|ariel| 2 | 2 | yb12309|

|ariel| 2 | 3 | yb12401|

| jack| 1 | 1 | yb12211|

| jack| 1 | 2 | yb12309|

| jack| 1 | 3 | yb12401|

| rose| 4 | 1 | yb12211|

| rose| 4 | 2 | yb12309|

| rose| 4 | 3 | yb12401|

|jerry| 2 | 1 | yb12211|

|jerry| 2 | 2 | yb12309|

|jerry| 2 | 3 | yb12401|

| mary| 1 | 1 | yb12211|

| mary| 1 | 2 | yb12309|

| mary| 1 | 3 | yb12401|

+-----+-------+-------+---------+

*/

// 2.內(nèi)連接

frmStu.as("S")

.join(frmClass.as("C"), $"S.classId" === $"C.classId","inner")

.show(100)

/**

+-----+-------+-------+---------+

| name|classId|classId|className|

+-----+-------+-------+---------+

|henry| 1 | 1 | yb12211|

|ariel| 2 | 2 | yb12309|

| jack| 1 | 1 | yb12211|

|jerry| 2 | 2 | yb12309|

| mary| 1 | 1 | yb12211|

+-----+-------+-------+---------+

*/

// 啟用using:使用Seq("Column")代表關(guān)聯(lián)字段

frmStu.as("S")

.join(frmClass.as("C"), Seq("classId"),"right")

.show(100)

// 3.外連接

frmStu.as("S")

.join(frmClass.as("C"), $"S.classId" === $"C.classId","outer") // left | right | outer

.show(100)

/**

+-----+-------+-------+---------+

| name|classId|classId|className|

+-----+-------+-------+---------+

|henry| 1 | 1 | yb12211|

| jack| 1 | 1 | yb12211|

| mary| 1 | 1 | yb12211|

| null| null | 3 | yb12401|

| rose| 4 | null | null|

|ariel| 2 | 2 | yb12309|

|jerry| 2 | 2 | yb12309|

+-----+-------+-------+---------+

*/

// 4.反連接:返回左數(shù)據(jù)集中所有沒有關(guān)聯(lián)字段匹配記錄的左數(shù)據(jù)集的行

frmStu.as("S")

.join(frmClass.as("C"), $"S.classId" === $"C.classId","anti")

.show(100)

/**

+----+-------+

|name|classId|

+----+-------+

|rose| 4 |

+----+-------+

*/

// 5.半連接:返回左數(shù)據(jù)集中所有有關(guān)聯(lián)字段匹配記錄的左數(shù)據(jù)集的行

frmStu.as("S")

.join(frmClass.as("C"), $"S.classId" === $"C.classId","semi")

.show(100)

/**

+-----+-------+

| name|classId|

+-----+-------+

|henry| 1 |

|ariel| 2 |

| jack| 1 |

|jerry| 2 |

| mary| 1 |

+-----+-------+

*/

9.排序

frmStu.orderBy(cols:Column*)

10.數(shù)據(jù)截取

frmStu.tail(n:Int)

frmStu.take(n:Int)

八.SQL函數(shù)

常用函數(shù)

$"NAME" = col("NAME") // 取列值

as("ALIAS_NAME") // 別名

as(alias:Seq[String]) // 多個別名

when(CONDITION,V1) // 條件

.when(...)

.otherwise(VN)

lit(V) // 常量列

withColumn(colName:String,col:Column) // 擴展列(通常用于使用窗口函數(shù)做擴展列)

cast(DataType) // 類型轉(zhuǎn)換

常用函數(shù)案例

spark.createDataFrame(Seq(

Test(1,Array("money","freedom"),Map("java"->85,"c++"->92)),

Test(2,Array("beauty","writing"),Map("math"->91,"English"->88)),

Test(3,Array("movie","draw"),Map("Sql"->100,"LLM"->77))

))

// 多個explode不能寫在一個select中

.select($"id",explode($"hobbies").as("hobby"),$"scores")

.select($"id",$"hobby",explode($"scores").as(Seq("course","score")))

.select($"id",$"hobby",$"course",$"score".cast("Integer"))

.withColumn("score_rank",

when($"score">=90,lit("A")))

when($"score">=80,lit("B"))

when($"score">=70,lit("C"))

when($"score">=60,lit("D"))

.otherwise(lit("E"))

集合函數(shù)

array

size(collectCol:Column) // 計算數(shù)組大小

array(cols:Column*) // 一行中的多列轉(zhuǎn)為單列數(shù)組類型

array_sort(arrayCol:Column) // 對數(shù)組列中的元素進行排序

array_contains(arrayCol:Column,value:Any) // 依次判斷數(shù)組列的各個元素是否含有特定值

array_distinct(arrayCol:Column) // 對數(shù)組列的各個元素進行去重并返回去重后的結(jié)果

array_join(arrayCol:Column,sep:String,nullReplacement:String) // 對數(shù)組列的各個元素進行拼接

array_except(arrayCol:Column)

array_intersect(arrayCol:Column)

array_union(arrayCol:Column)

map

map_keys(mapCol:Column)

map_values(mapCol:Column)

map_entries(mapCol:Column)

集合函數(shù)案例

data.select($"id",size($"hobbies").as("hobbies_cnt")).show()

data.select($"id",array_sort($"hobbies").as("hobbies_sort")).show()

data.select($"id",array_contains($"hobbies","money")).show()

data.select($"id",array_distinct($"hobbies").as("unique_hobbies")).show()

data.select($"id",array_join($"hobbies",",","Unknown Value").as("union_hobby")).show()

data.withColumn("next_hobbies",lead($"hobbies",1) over(Window.orderBy("id")))

.where($"next_hobbies".isNotNull) // 提前做條件篩選

.select(

array_intersect($"hobbies",$"next_hobbies").as("intersect_hobbies")

)

.show(10)

data.select($"id",

map_keys($"scores").as("course_list"),

map_values($"scores").as("scores"),

map_entries($"scores").as("course_score_list")

).show()

// 考java的學(xué)生人數(shù)有多少

val num: Long = data.select(

array_contains(map_keys($"scores"), "java").as("isJava")

).filter($"isJava").count()

字符串函數(shù)

// 提取

// 1、提取 json

json_tuple(jsonCol:Column, fields:String*) // $"jsonString" => field1,field2 獲取單層Json字段

get_json_object(jsonCol:Column, path:String) // $"jsonString" => $.field1[.field2] 獲取多層嵌套Json字段

// 2、正則分組

regexp_extract(col:Column, pattern:String, groupId:Int)

// 3、分裂與截取

split(col:Column,pattern:String)

substring(col:Column,pos:Int,len:Int)

substring_index(col:Column,sep:String,groupId:Int)

// groupId +N 從左向右前N個

// groupId -N 從右向左前N個

// 第N個 substring_index(substring_index(COL,SEP,+N),SEP,-1)

// 4、子字符串在字段中的位置(表示子字符串的第一個字符在字符串中的索引位置)

locate(subStr:String,col:Column) // 有則>0,否則=0,

instr(col:Column,subStr:String)

// 5、字符串拼接

concat(cols:Column*)

concat_ws(sep:String,cols:Column*)

// 6、內(nèi)容長度

length(col:Column) // 字符長度

// 字節(jié)長度,未提供算子,需要通過 spark.sql(""" select octet_length(...)""") 實現(xiàn)

// 7、定長填充

lpad(col:Column,len:Int,pad:String)

rpad(col:Column,len:Int,pad:String)

// 8、清除兩端空格

ltrim(col:Column)

rtrim(col:Column)

trim(col:Column)

// 9、大小寫轉(zhuǎn)換

initcap(col:Column) // 每個單詞首字母大寫

upper(col:Column) // 全大寫

lower(col:Column) // 全小寫

hash(col:Column) // 去哈希值

regexp_replace(col:Column,pattern:String,replace:String) // 正則替換

translate(col:Column,from:String,to:String) // 按字母轉(zhuǎn)換

reverse(col:Column) // 翻轉(zhuǎn)

// 10、轉(zhuǎn)碼

encode(col:Column, charSet:String)

decode(col:Column, charSet:String)

// 11、非對稱加密

sha1(col:Column)

md5(col:Column)

字符串函數(shù)案例

val frm: DataFrame = spark

.createDataFrame(Seq(

Json(1, """{"name":"henry","age":22,"hobbies":["beauty","money","power"],"address":{"province":"jiangsu","city":"nanjing"}}"""),

Json(2, """{"name":"jack","age":23,"hobbies":["beauty","power"],"address":{"province":"jiangsu","city":"wuxi"}}"""),

Json(3, """{"name":"tom","age":24,"hobbies":["beauty","money"],"address":{"province":"jiangsu","city":"yancheng"}}""")

))

frm.select($"id",

json_tuple($"json","name","age","hobbies").as(Seq("name","age","hobbies")),

get_json_object($"json","$.address.province").as("province"),

get_json_object($"json","$.address.city").as("city")

).show(10)

// 通過正則提取獲取特定的日志信息

val regex_line = "(.*?) (INFO|WARN|ERROR) (.*?):(.*)"

val regex_log = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3} (INFO|WARN|ERROR) .*"

val frm: DataFrame = spark

.read

.text("spark-warehouse/datanode.log")

.toDF("line")

frm

.where($"line".rlike(regex_log))

.select(

regexp_extract($"line",regex_line,1).as("log_in_time"),

regexp_extract($"line",regex_line,2).as("log_type"),

regexp_extract($"line",regex_line,3).as("log_full_pack"),

regexp_extract($"line",regex_line,4).as("log_detail")

)

// 獲取錯誤日志信息中錯誤類別及其所占數(shù)量

.where($"log_type".equalTo("ERROR"))

.groupBy($"log_detail")

.count()

.show(100)

Spark自定義函數(shù)流程

自定義函數(shù)流程:定義-注冊-調(diào)用

建議將自定義函數(shù)實現(xiàn),單獨建對象保存 import java.nio.charset.{StandardCharsets}

import java.util.Base64

import javax.crypto.Cipher

import javax.crypto.spec.SecretKeySpec

object SparkUtil {

/**

* 處理密鑰

* @param secret 密鑰

*/

private def secretInit(secret:String)={

// 對密鑰長度進行約束

val allowNumBits: Array[Int] = Array(16, 24, 32)

// 如果密鑰長度符合,將密鑰轉(zhuǎn)換為AES密鑰對象

if (allowNumBits.contains(secret.size)) {

new SecretKeySpec(

secret.getBytes(StandardCharsets.UTF_8),"AES")

}else{

throw new RuntimeException(

s"AES secret size of numBits ${secret.size} not in

permitted values (${allowNumBits.mkString(",")})")

}

}

/**

* 加密函數(shù)

* @param src 源數(shù)據(jù)

* @param secret 密鑰

*/

def encrypt(src:String,secret:String)={

// 獲取加密算法實例

val cipher: Cipher = Cipher.getInstance("AES")

// 初始化加密模式,使用給定的密鑰(需要先用key()對密鑰進行處理)

cipher.init(Cipher.ENCRYPT_MODE,secretInit(secret))

// 執(zhí)行加密操作

val bytes: Array[Byte] = cipher.doFinal(src.getBytes(StandardCharset.UTF_8))

// 返回加密后的數(shù)據(jù)

Base64.getEncoder().encodeToString(bytes)

}

/**

* 解密函數(shù)

* @param dest 待解密數(shù)據(jù)

* @param secret 密鑰

*/

def decrypt(dest:String,secret:String)={

val cipher: Cipher = Cipher.getInstance("AES")

cipher.init(Cipher.DECRYPT_MODE,secretInit(secret))

val bytes: Array[Byte] = cipher.doFinal(

Base64.getDecoder.decode(dest))

new String(bytes, StandardCharsets.UTF_8)

}

}

在 Spark 環(huán)境下導(dǎo)入對象實現(xiàn)的方法,并在 SparkSession 中注冊 UDF 函數(shù) import core.SparkUtil.{encrypt,decrypt}

spark.udf.register(

"aes_encrypt",

(src:String,secret:String)

=>encrypt(src, secret),StringType)

spark.udf.register(

"aes_decrypt",

(src:String,secret:String)

=>decrypt(src, secret),StringType)

在 SparkSql 中調(diào)用注冊函數(shù) val frm: DataFrame = spark.createDataFrame(Seq(

Test(1,Array("money","freedom"),Map("java"->85,"mysql"->67)),

Test(2,Array("beauty","beauty"),Map("java"->72,"mysql"->90)),

Test(3,Array("sports","beauty"),Map("java"->76,"html"->52))

))

val secret = "henryyb2211ariel"

val frmEncrypt: DataFrame = frm

.select($"id",

callUDF(

"aes_encrypt",

array_join($"hobbies", ","),

lit(secret)

).as("encrypted_hobbies")

)

val frmDecrypt: DataFrame = frmEncrypt

.select($"id",

split(

callUDF(

"aes_decrypt",

$"encrypted_hobbies",

lit(secret)

),

","

).as("hobbies")

).show()

柚子快報邀請碼778899分享:大數(shù)據(jù) Spark SQL

http://yzkb.51969.com/

好文閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19177103.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄