欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:大數(shù)據 Spark SQL基礎

柚子快報激活碼778899分享:大數(shù)據 Spark SQL基礎

http://yzkb.51969.com/

Spark SQL基礎

Spark SQL介紹

? Spark SQL是一個用于結構化數(shù)據處理的Spark組件。所謂結構化數(shù)據,是指具有Schema信息的數(shù)據,例如JSON、Parquet、Avro、CSV格式的數(shù)據。與基礎的Spark RDD API不同,Spark SQL提供了對結構化數(shù)據的查詢和計算接口。

Spark SQL的主要特點:

將SQL查詢與Spark應用程序無縫組合

? Spark SQL允許使用SQL或熟悉的API在Spark程序中查詢結構化數(shù)據。與Hive不同的是,Hive是將SQL翻譯成MapReduce作業(yè),底層是基于MapReduce的;而Spark SQL底層使用的是Spark RDD。

可以連接到多種數(shù)據源

? Spark SQL提供了訪問各種數(shù)據源的通用方法,數(shù)據源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

在現(xiàn)有的數(shù)據倉庫上運行SQL或HiveQL查詢

? Spark SQL支持HiveQL語法以及Hive SerDes和UDF (用戶自定義函數(shù)) ,允許訪問現(xiàn)有的Hive倉庫。

DataFrame和DataSet

DataFrame的結構

? DataFrame是Spark SQL提供的一個編程抽象,與RDD類似,也是一個分布式的數(shù)據集合。但與RDD不同的是,DataFrame的數(shù)據都被組織到有名字的列中,就像關系型數(shù)據庫中的表一樣。

? DataFrame在RDD的基礎上添加了數(shù)據描述信息(Schema,即元信息) ,因此看起來更像是一張數(shù)據庫表。例如,在一個RDD中有3行數(shù)據,將該RDD轉成DataFrame后,其中的數(shù)據可能如圖所示:

DataSet的結構 Dataset是一個分布式數(shù)據集,是Spark 1.6中添加的一個新的API。相比于RDD, Dataset提供了強類型支持,在RDD的每行數(shù)據加了類型約束。

? 在Spark中,一個DataFrame代表的是一個元素類型為Row的Dataset,即DataFrame只是Dataset[Row]的一個類型別名。

Spark SQL的基本使用

? Spark Shell啟動時除了默認創(chuàng)建一個名為sc的SparkContext的實例外,還創(chuàng)建了一個名為spark的SparkSession實例,該spark變量可以在Spark Shell中直接使用。

? SparkSession只是在SparkContext基礎上的封裝,應用程序的入口仍然是SparkContext。SparkSession允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序,支持從不同的數(shù)據源加載數(shù)據,并把數(shù)據轉換成DataFrame,然后使用SQL語句來操作DataFrame數(shù)據。

Spark SQL基本使用案例

在HDFS中有一個文件/input/person.txt,文件內容如下:

現(xiàn)需要使用Spark SQL將該文件中的數(shù)據按照年齡降序排列,步驟如下:

進入spark-shell環(huán)境

加載數(shù)據為Dataset

val d1 = spark.read.textFile("hdfs://192.168.121.131:9000/input/person.txt")

d1.show() # 查看d1中的數(shù)據內容

? 從上述代碼的結果可以看出,Dataset將文件中的每一行看作一個元素,并且所有元素組成了一列,列名默認為value。

給Dataset添加元數(shù)據信息

? 定義一個樣例類Person,用于存放數(shù)據描述信息,代碼如下:

case class Person(id:Int,name:String,age:Int)

? 注:Scala有一種特殊的類叫做樣例類(case class)。默認情況下,樣例類一般用于不可變對象(樣例類構造參數(shù)默認聲明為val)。

? 調用Dataset的map()算子將每一個元素拆分并存入Person類中,代碼如下:

val personDataset = d1.map(line=>{

val fields = line.split(",")

val id = fields(0).toInt

val name = fields(1)

val age = fields(2).toInt

Person(id,name,age)

})

personDataset.show() # 查看personDataset中的數(shù)據內容

可以看到,personDataset中的數(shù)據類似于一張關系型數(shù)據庫的表。

將Dataset轉為DataFrame

? Spark SQL查詢的是DataFrame中的數(shù)據,因此需要將存有元數(shù)據信息的Dataset轉為DataFrame。

? 調用Dataset的toDF()方法,將存有元數(shù)據的Dataset轉為DataFrame,代碼如下:

val pdf = personDataset.toDF()

執(zhí)行SQL查詢

? 在DataFrame上創(chuàng)建一個臨時視圖v_person,并使用SparkSession對象執(zhí)行SQL查詢,代碼如下:

pdf.createTempView("v_person")

val result = spark.sql("select * from v_person order by age desc")

result.show()

Spark SQL函數(shù)

內置函數(shù)

? Spark SQL內置了大量的函數(shù),位于API org.apache.spark.sql.functions

中。其中大部分函數(shù)與Hive中的相同。

? 使用內置函數(shù)有兩種方式:一種是通過編程的方式使用;另一種是在SQL

語句中使用。

以編程的方式使用lower()函數(shù)將用戶姓名轉為小寫/大寫,代碼如下:

df.select(lower(col("name")).as("greet")).show()

df.select(upper(col("name")).as("greet")).show()

? 上述代碼中,df指的是DataFrame對象,使用select()方法傳入需要查詢的列,使用as()方法指定列的別名。代碼col(“name”)指定要查詢的列,也可以使用$"name"代替,代碼如下:

df.select(lower($"name").as("greet")).show()

以SQL語句的方式使用lower()函數(shù),代碼如下:

df.createTempView("temp")

spark.sql("select upper(name) as greet from temp").show()

? 除了可以使用select()方法查詢指定的列外,還可以直接使用filter()、groupBy()等方法對DataFrame數(shù)據進行過濾和分組,例如以下代碼:

df.printSchema() # 打印Schema信息

df.select("name").show() # 查詢name列

# 查詢name列和age列,其中將age列的值增加1

df.select($"name",$"age"+1).show()

df.filter($"age">25).show() # 查詢age>25的所有數(shù)據

# 根據age進行分組,并求每一組的數(shù)量

df.groupBy("age").count().show()

自定義函數(shù)

? 當Spark SQL提供的內置函數(shù)不能滿足查詢需求時,用戶可以根據需求編寫自定義函數(shù)(User Defined Functions, UDF),然后在Spark SQL中調用。

? 例如有這樣一個需求:為了保護用戶的隱私,當查詢數(shù)據的時候,需要將用戶手機號的中間4位數(shù)字用星號()代替,比如手機號180***2688。這時就可以編寫一個自定義函數(shù)來實現(xiàn)這個需求,實現(xiàn)代碼如下:

?

package spark.demo.sql

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.types.{StringType, StructField, StructType}

import org.apache.spark.sql.{Row, SparkSession}

/**

* 用戶自定義函數(shù),隱藏手機號中間4位

*/

object SparkSQLUDF {

def main(args: Array[String]): Unit = {

//創(chuàng)建或得到SparkSession

val spark = SparkSession.builder()

.appName("SparkSQLUDF")

.master("local[*]")

.getOrCreate()

//第一步:創(chuàng)建測試數(shù)據(或直接從文件中讀?。?/p>

//模擬數(shù)據

val arr=Array("18001292080","13578698076","13890890876")

//將數(shù)組數(shù)據轉為RDD

val rdd: RDD[String] = spark.sparkContext.parallelize(arr)

//將RDD[String]轉為RDD[Row]

val rowRDD: RDD[Row] = rdd.map(line=>Row(line))

//定義數(shù)據的schema

val schema=StructType(

List{

StructField("phone",StringType,true)

}

)

//將RDD[Row]轉為DataFrame

val df = spark.createDataFrame(rowRDD, schema)

//第二步:創(chuàng)建自定義函數(shù)(phoneHide)

val phoneUDF=(phone:String)=>{

var result = "手機號碼錯誤!"

if (phone != null && (phone.length==11)) {

val sb = new StringBuffer

sb.append(phone.substring(0, 3))

sb.append("****")

sb.append(phone.substring(7))

result = sb.toString

}

result

}

//注冊函數(shù)(第一個參數(shù)為函數(shù)名稱,第二個參數(shù)為自定義的函數(shù))

spark.udf.register("phoneHide",phoneUDF)

//第三步:調用自定義函數(shù)

df.createTempView("t_phone") //創(chuàng)建臨時視圖

spark.sql("select phoneHide(phone) as phone from t_phone").show()

// +-----------+

// | phone|

// +-----------+

// |180****2080|

// |135****8076|

// |138****0876|

// +-----------+

}

}

窗口(開窗)函數(shù)

? 開窗函數(shù)是為了既顯示聚合前的數(shù)據,又顯示聚合后的數(shù)據,即在每一行的最后一列添加聚合函數(shù)的結果。開窗口函數(shù)有以下功能:

同時具有分組和排序的功能不減少原表的行數(shù)開窗函數(shù)語法:

聚合類型開窗函數(shù)

sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

排序類型開窗函數(shù)

ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

以row_number()開窗函數(shù)為例:

? 開窗函數(shù)row_number()是Spark SQL中常用的一個窗口函數(shù),使用該函數(shù)可以在查詢結果中對每個分組的數(shù)據,按照其排列的順序添加一列行號(從1開始),根據行號可以方便地對每一組數(shù)據取前N行(分組取TopN)。row_number()函數(shù)的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行號列別名

上述格式說明如下:

partition by:按照某一列進行分組;

order by:分組后按照某一列進行組內排序;

desc:降序,默認升序。

例如,統(tǒng)計每一個產品類別的銷售額前3名,代碼如下:

package spark.demo.sql

import org.apache.spark.sql.types._

import org.apache.spark.sql.{Row, SparkSession}

/**

* 統(tǒng)計每一個產品類別的銷售額前3名(相當于分組求TOPN)

*/

object SparkSQLWindowFunctionDemo {

def main(args: Array[String]): Unit = {

//創(chuàng)建或得到SparkSession

val spark = SparkSession.builder()

.appName("SparkSQLWindowFunctionDemo")

.master("local[*]")

.getOrCreate()

//第一步:創(chuàng)建測試數(shù)據(字段:日期、產品類別、銷售額)

val arr=Array(

"2019-06-01,A,500",

"2019-06-01,B,600",

"2019-06-01,C,550",

"2019-06-02,A,700",

"2019-06-02,B,800",

"2019-06-02,C,880",

"2019-06-03,A,790",

"2019-06-03,B,700",

"2019-06-03,C,980",

"2019-06-04,A,920",

"2019-06-04,B,990",

"2019-06-04,C,680"

)

//轉為RDD[Row]

val rowRDD=spark.sparkContext

.makeRDD(arr)

.map(line=>Row(

line.split(",")(0),

line.split(",")(1),

line.split(",")(2).toInt

))

//構建DataFrame元數(shù)據

val structType=StructType(Array(

StructField("date",StringType,true),

StructField("type",StringType,true),

StructField("money",IntegerType,true)

))

//將RDD[Row]轉為DataFrame

val df=spark.createDataFrame(rowRDD,structType)

//第二步:使用開窗函數(shù)取每一個類別的金額前3名

df.createTempView("t_sales") //創(chuàng)建臨時視圖

//執(zhí)行SQL查詢

spark.sql(

"select date,type,money,rank from " +

"(select date,type,money," +

"row_number() over (partition by type order by money desc) rank "+

"from t_sales) t " +

"where t.rank<=3"

).show()

}

}

柚子快報激活碼778899分享:大數(shù)據 Spark SQL基礎

http://yzkb.51969.com/

文章鏈接

評論可見,查看隱藏內容

本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19282698.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄