柚子快報激活碼778899分享:大數(shù)據 Spark SQL基礎
Spark SQL基礎
Spark SQL介紹
? Spark SQL是一個用于結構化數(shù)據處理的Spark組件。所謂結構化數(shù)據,是指具有Schema信息的數(shù)據,例如JSON、Parquet、Avro、CSV格式的數(shù)據。與基礎的Spark RDD API不同,Spark SQL提供了對結構化數(shù)據的查詢和計算接口。
Spark SQL的主要特點:
將SQL查詢與Spark應用程序無縫組合
? Spark SQL允許使用SQL或熟悉的API在Spark程序中查詢結構化數(shù)據。與Hive不同的是,Hive是將SQL翻譯成MapReduce作業(yè),底層是基于MapReduce的;而Spark SQL底層使用的是Spark RDD。
可以連接到多種數(shù)據源
? Spark SQL提供了訪問各種數(shù)據源的通用方法,數(shù)據源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。
在現(xiàn)有的數(shù)據倉庫上運行SQL或HiveQL查詢
? Spark SQL支持HiveQL語法以及Hive SerDes和UDF (用戶自定義函數(shù)) ,允許訪問現(xiàn)有的Hive倉庫。
DataFrame和DataSet
DataFrame的結構
? DataFrame是Spark SQL提供的一個編程抽象,與RDD類似,也是一個分布式的數(shù)據集合。但與RDD不同的是,DataFrame的數(shù)據都被組織到有名字的列中,就像關系型數(shù)據庫中的表一樣。
? DataFrame在RDD的基礎上添加了數(shù)據描述信息(Schema,即元信息) ,因此看起來更像是一張數(shù)據庫表。例如,在一個RDD中有3行數(shù)據,將該RDD轉成DataFrame后,其中的數(shù)據可能如圖所示:
DataSet的結構 Dataset是一個分布式數(shù)據集,是Spark 1.6中添加的一個新的API。相比于RDD, Dataset提供了強類型支持,在RDD的每行數(shù)據加了類型約束。
? 在Spark中,一個DataFrame代表的是一個元素類型為Row的Dataset,即DataFrame只是Dataset[Row]的一個類型別名。
Spark SQL的基本使用
? Spark Shell啟動時除了默認創(chuàng)建一個名為sc的SparkContext的實例外,還創(chuàng)建了一個名為spark的SparkSession實例,該spark變量可以在Spark Shell中直接使用。
? SparkSession只是在SparkContext基礎上的封裝,應用程序的入口仍然是SparkContext。SparkSession允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序,支持從不同的數(shù)據源加載數(shù)據,并把數(shù)據轉換成DataFrame,然后使用SQL語句來操作DataFrame數(shù)據。
Spark SQL基本使用案例
在HDFS中有一個文件/input/person.txt,文件內容如下:
現(xiàn)需要使用Spark SQL將該文件中的數(shù)據按照年齡降序排列,步驟如下:
進入spark-shell環(huán)境
加載數(shù)據為Dataset
val d1 = spark.read.textFile("hdfs://192.168.121.131:9000/input/person.txt")
d1.show() # 查看d1中的數(shù)據內容
? 從上述代碼的結果可以看出,Dataset將文件中的每一行看作一個元素,并且所有元素組成了一列,列名默認為value。
給Dataset添加元數(shù)據信息
? 定義一個樣例類Person,用于存放數(shù)據描述信息,代碼如下:
case class Person(id:Int,name:String,age:Int)
? 注:Scala有一種特殊的類叫做樣例類(case class)。默認情況下,樣例類一般用于不可變對象(樣例類構造參數(shù)默認聲明為val)。
? 調用Dataset的map()算子將每一個元素拆分并存入Person類中,代碼如下:
val personDataset = d1.map(line=>{
val fields = line.split(",")
val id = fields(0).toInt
val name = fields(1)
val age = fields(2).toInt
Person(id,name,age)
})
personDataset.show() # 查看personDataset中的數(shù)據內容
可以看到,personDataset中的數(shù)據類似于一張關系型數(shù)據庫的表。
將Dataset轉為DataFrame
? Spark SQL查詢的是DataFrame中的數(shù)據,因此需要將存有元數(shù)據信息的Dataset轉為DataFrame。
? 調用Dataset的toDF()方法,將存有元數(shù)據的Dataset轉為DataFrame,代碼如下:
val pdf = personDataset.toDF()
執(zhí)行SQL查詢
? 在DataFrame上創(chuàng)建一個臨時視圖v_person,并使用SparkSession對象執(zhí)行SQL查詢,代碼如下:
pdf.createTempView("v_person")
val result = spark.sql("select * from v_person order by age desc")
result.show()
Spark SQL函數(shù)
內置函數(shù)
? Spark SQL內置了大量的函數(shù),位于API org.apache.spark.sql.functions
中。其中大部分函數(shù)與Hive中的相同。
? 使用內置函數(shù)有兩種方式:一種是通過編程的方式使用;另一種是在SQL
語句中使用。
以編程的方式使用lower()函數(shù)將用戶姓名轉為小寫/大寫,代碼如下:
df.select(lower(col("name")).as("greet")).show()
df.select(upper(col("name")).as("greet")).show()
? 上述代碼中,df指的是DataFrame對象,使用select()方法傳入需要查詢的列,使用as()方法指定列的別名。代碼col(“name”)指定要查詢的列,也可以使用$"name"代替,代碼如下:
df.select(lower($"name").as("greet")).show()
以SQL語句的方式使用lower()函數(shù),代碼如下:
df.createTempView("temp")
spark.sql("select upper(name) as greet from temp").show()
? 除了可以使用select()方法查詢指定的列外,還可以直接使用filter()、groupBy()等方法對DataFrame數(shù)據進行過濾和分組,例如以下代碼:
df.printSchema() # 打印Schema信息
df.select("name").show() # 查詢name列
# 查詢name列和age列,其中將age列的值增加1
df.select($"name",$"age"+1).show()
df.filter($"age">25).show() # 查詢age>25的所有數(shù)據
# 根據age進行分組,并求每一組的數(shù)量
df.groupBy("age").count().show()
自定義函數(shù)
? 當Spark SQL提供的內置函數(shù)不能滿足查詢需求時,用戶可以根據需求編寫自定義函數(shù)(User Defined Functions, UDF),然后在Spark SQL中調用。
? 例如有這樣一個需求:為了保護用戶的隱私,當查詢數(shù)據的時候,需要將用戶手機號的中間4位數(shù)字用星號()代替,比如手機號180***2688。這時就可以編寫一個自定義函數(shù)來實現(xiàn)這個需求,實現(xiàn)代碼如下:
?
package spark.demo.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* 用戶自定義函數(shù),隱藏手機號中間4位
*/
object SparkSQLUDF {
def main(args: Array[String]): Unit = {
//創(chuàng)建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLUDF")
.master("local[*]")
.getOrCreate()
//第一步:創(chuàng)建測試數(shù)據(或直接從文件中讀?。?/p>
//模擬數(shù)據
val arr=Array("18001292080","13578698076","13890890876")
//將數(shù)組數(shù)據轉為RDD
val rdd: RDD[String] = spark.sparkContext.parallelize(arr)
//將RDD[String]轉為RDD[Row]
val rowRDD: RDD[Row] = rdd.map(line=>Row(line))
//定義數(shù)據的schema
val schema=StructType(
List{
StructField("phone",StringType,true)
}
)
//將RDD[Row]轉為DataFrame
val df = spark.createDataFrame(rowRDD, schema)
//第二步:創(chuàng)建自定義函數(shù)(phoneHide)
val phoneUDF=(phone:String)=>{
var result = "手機號碼錯誤!"
if (phone != null && (phone.length==11)) {
val sb = new StringBuffer
sb.append(phone.substring(0, 3))
sb.append("****")
sb.append(phone.substring(7))
result = sb.toString
}
result
}
//注冊函數(shù)(第一個參數(shù)為函數(shù)名稱,第二個參數(shù)為自定義的函數(shù))
spark.udf.register("phoneHide",phoneUDF)
//第三步:調用自定義函數(shù)
df.createTempView("t_phone") //創(chuàng)建臨時視圖
spark.sql("select phoneHide(phone) as phone from t_phone").show()
// +-----------+
// | phone|
// +-----------+
// |180****2080|
// |135****8076|
// |138****0876|
// +-----------+
}
}
窗口(開窗)函數(shù)
? 開窗函數(shù)是為了既顯示聚合前的數(shù)據,又顯示聚合后的數(shù)據,即在每一行的最后一列添加聚合函數(shù)的結果。開窗口函數(shù)有以下功能:
同時具有分組和排序的功能不減少原表的行數(shù)開窗函數(shù)語法:
聚合類型開窗函數(shù)
sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
排序類型開窗函數(shù)
ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
以row_number()開窗函數(shù)為例:
? 開窗函數(shù)row_number()是Spark SQL中常用的一個窗口函數(shù),使用該函數(shù)可以在查詢結果中對每個分組的數(shù)據,按照其排列的順序添加一列行號(從1開始),根據行號可以方便地對每一組數(shù)據取前N行(分組取TopN)。row_number()函數(shù)的使用格式如下:
row_number() over (partition by 列名 order by 列名 desc) 行號列別名
上述格式說明如下:
partition by:按照某一列進行分組;
order by:分組后按照某一列進行組內排序;
desc:降序,默認升序。
例如,統(tǒng)計每一個產品類別的銷售額前3名,代碼如下:
package spark.demo.sql
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
/**
* 統(tǒng)計每一個產品類別的銷售額前3名(相當于分組求TOPN)
*/
object SparkSQLWindowFunctionDemo {
def main(args: Array[String]): Unit = {
//創(chuàng)建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLWindowFunctionDemo")
.master("local[*]")
.getOrCreate()
//第一步:創(chuàng)建測試數(shù)據(字段:日期、產品類別、銷售額)
val arr=Array(
"2019-06-01,A,500",
"2019-06-01,B,600",
"2019-06-01,C,550",
"2019-06-02,A,700",
"2019-06-02,B,800",
"2019-06-02,C,880",
"2019-06-03,A,790",
"2019-06-03,B,700",
"2019-06-03,C,980",
"2019-06-04,A,920",
"2019-06-04,B,990",
"2019-06-04,C,680"
)
//轉為RDD[Row]
val rowRDD=spark.sparkContext
.makeRDD(arr)
.map(line=>Row(
line.split(",")(0),
line.split(",")(1),
line.split(",")(2).toInt
))
//構建DataFrame元數(shù)據
val structType=StructType(Array(
StructField("date",StringType,true),
StructField("type",StringType,true),
StructField("money",IntegerType,true)
))
//將RDD[Row]轉為DataFrame
val df=spark.createDataFrame(rowRDD,structType)
//第二步:使用開窗函數(shù)取每一個類別的金額前3名
df.createTempView("t_sales") //創(chuàng)建臨時視圖
//執(zhí)行SQL查詢
spark.sql(
"select date,type,money,rank from " +
"(select date,type,money," +
"row_number() over (partition by type order by money desc) rank "+
"from t_sales) t " +
"where t.rank<=3"
).show()
}
}
柚子快報激活碼778899分享:大數(shù)據 Spark SQL基礎
文章鏈接
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。