Big Data Learning - Spark
1.Spark-core
1.Demo1WordCount
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/*
RDD: 彈性的分布式數(shù)據(jù)集
*/
object Demo1WordCount {
def main(args: Array[String]): Unit = {
//1、創(chuàng)建Spark環(huán)境
//1.1 創(chuàng)建配置文件對象
val conf: SparkConf = new SparkConf()
//1.2 指定運行的模式(local Standalone Mesos YARN)
conf.setMaster("local") //可以執(zhí)行所運行需要核數(shù)資源local[2],不指定的話默認使用所有的資源執(zhí)行程序
//1.3 給spark作業(yè)起一個名字
conf.setAppName("wc")
//2、創(chuàng)建spark運行時的上下文對象
val sparkContext: SparkContext = new SparkContext(conf)
//3、讀取文件數(shù)據(jù)
val wordsLine: RDD[String] = sparkContext.textFile("spark/data/words.txt")
//4、每一行根據(jù)|分隔符進行切分
val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1)
val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
wordCount.saveAsTextFile("spark/data/word_count") //將最終的結(jié)果保存在本地目錄
}
}
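A quicker variant of the same word count, shown here only as a hedged sketch: reduceByKey (introduced later in Demo9ReduceByKey) replaces the groupBy + map pair and pre-aggregates on the map side. It assumes the same sparkContext and input file as above; the output directory name is made up for the example.
// sketch: word count with reduceByKey instead of groupBy + map
sparkContext.textFile("spark/data/words.txt")
  .flatMap(_.split("\\|"))   // split each line on |
  .map((_, 1))               // build (word, 1) pairs
  .reduceByKey(_ + _)        // sum the counts per word, with map-side pre-aggregation
  .saveAsTextFile("spark/data/word_count_reduce") // hypothetical output directory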
2.Demo2Partition
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* During a Spark job, when records with the same key are pulled into the same partition, that process is called a shuffle.
* Note: Spark's shuffle works on the same principle as MapReduce's shuffle, with intermediate data written to disk.
*
* RDD: Resilient Distributed Dataset
* Resilient: the amount of data an RDD computes over can be large or small.
* Distributed: the data can be spread across many servers; an RDD's partitions come from blocks, and those blocks can come from different DataNodes.
* Dataset: an RDD does not store data itself; it only describes the computation logic, and data flows through the RDDs once a job is triggered.
*
*/
object Demo2Partition {
def main(args: Array[String]): Unit = {
//1、創(chuàng)建Spark環(huán)境
//1.1 創(chuàng)建配置文件對象
val conf: SparkConf = new SparkConf()
//1.2 指定運行的模式(local Standalone Mesos YARN)
conf.setMaster("local") //可以執(zhí)行所運行需要核數(shù)資源local[2],不指定的話默認使用所有的資源執(zhí)行程序
//1.3 給spark作業(yè)起一個名字
conf.setAppName("wc")
//2、創(chuàng)建spark運行時的上下文對象
val sparkContext: SparkContext = new SparkContext(conf)
//3、讀取文件數(shù)據(jù)
// val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)
val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")
println(s"wordsLineRDD分區(qū)數(shù)是:${wordsLine.getNumPartitions}")
//4、每一行根據(jù)|分隔符進行切分
val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
println(s"wordsRDD分區(qū)數(shù)是:${words.getNumPartitions}")
val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
println(s"wordsTuple2RDD分區(qū)數(shù)是:${wordsTuple2.getNumPartitions}")
//產(chǎn)生shuffle的算子上可以單獨設(shè)置分區(qū)數(shù)
val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)
println(s"wordsTuple2GroupRDD分區(qū)數(shù)是:${wordsTuple2Group.getNumPartitions}")
val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
println(s"wordCountRDD分區(qū)數(shù)是:${wordCount.getNumPartitions}")
wordCount.saveAsTextFile("spark/data/word_count2")
}
}
The five key properties of an RDD (a must-know for interviews!!!):
1) An RDD consists of partitions. When a file is read, the RDD gets as many partitions as the file has blocks. Note: by default all RDDs in a job have the same number of partitions, before and after a shuffle alike; the number is decided when the data is first loaded.
2) Functions actually operate on the RDD's partitions; one partition is processed by one task, so there are as many tasks as partitions. Note: in Spark these functions are called operators (transformation operators: RDD -> RDD; action operators: RDD -> some other data type).
3) RDDs depend on each other: a later RDD's data is derived from the previous RDD's computation result, and data flows through the RDDs like water. Notes:
3.1 There are two kinds of dependencies:
a. Narrow dependency: each partition of the child RDD corresponds to exactly one partition of the parent RDD (1-to-1).
b. Wide dependency: a partition of the child RDD is built from several partitions of the parent RDD (1-to-many); this is a shuffle.
3.2 These dependencies split a job into stages: numStages = numWideDependencies + 1.
3.3 Across narrow dependencies the partition count cannot change (it is fixed by the first RDD); across a wide dependency the partition count can be set on the shuffle operator.
4) The byKey operators such as groupByKey and reduceByKey can only be applied to key-value RDDs.
5) Spark gives each task a preferred location for its computation: move the computation, not the data.
A small sketch showing how to inspect these properties follows.
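The sketch below, assuming a spark-shell style session where sparkContext already exists, makes points 1) to 3) visible: toDebugString prints the lineage with its stage boundaries, and the shuffle operator carries its own partition count.
// sketch: inspecting partitions, dependencies and stages
val pairs = sparkContext.textFile("spark/data/words.txt") // one partition per block
  .flatMap(_.split("\\|"))                                // narrow dependency
  .map((_, 1))                                            // narrow dependency
val counts = pairs.reduceByKey(_ + _, 5)                  // wide dependency (shuffle), 5 partitions afterwards
println(counts.toDebugString)    // the indentation marks the stage boundary at the shuffle
println(counts.getNumPartitions) // 5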
3.Demo3Map
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/*
map算子:轉(zhuǎn)換算子
一個spark作業(yè),由最后的一個action算子來觸發(fā)執(zhí)行的,若沒有action算子,整個作業(yè)不執(zhí)行
RDD具有懶執(zhí)行的特點
*/
object Demo3Map {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
/**
* map算子:將rdd中的數(shù)據(jù),一條一條的取出來傳入到map函數(shù)中,map會返回一個新的rdd,map不會改變總數(shù)據(jù)條數(shù)
*/
val splitRDD: RDD[List[String]] = studentRDD.map((s: String) => {
println("============數(shù)加防偽碼================")
s.split(",").toList
})
// splitRDD.foreach(println)
}
}
4.Demo4Filter
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo4Filter {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
/**
* filter: 過濾,將RDD中的數(shù)據(jù)一條一條取出傳遞給filter后面的函數(shù),如果函數(shù)的結(jié)果是true,該條數(shù)據(jù)就保留,否則丟棄
*
* filter一般情況下會減少數(shù)據(jù)的條數(shù)
*/
val filterRDD: RDD[String] = studentRDD.filter((s: String) => {
val strings: Array[String] = s.split(",")
"男".equals(strings(3))
})
filterRDD.foreach(println)
}
}
5.Demo5flatMap
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo5flatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("flatMap算子演示")
val context = new SparkContext(conf)
//====================================================
val linesRDD: RDD[String] = context.textFile("spark/data/words.txt")
/**
* flatMap算子:將RDD中的數(shù)據(jù)一條一條的取出傳遞給后面的函數(shù),函數(shù)的返回值必須是一個集合。最后會將集合展開構(gòu)成一個新的RDD
*/
val wordsRDD: RDD[String] = linesRDD.flatMap((line: String) => line.split("\\|"))
wordsRDD.foreach(println)
}
}
6.Demo6sample
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo6sample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("flatMap算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
/**
* sample算子:從前一個RDD的數(shù)據(jù)中抽樣一部分數(shù)據(jù)
*
* 抽取的比例不是正好對應(yīng)的,在抽取的比例上下浮動 比如1000條抽取10% 抽取的結(jié)果在100條左右
*/
val sampleRDD: RDD[String] = studentRDD.sample(withReplacement = true, 0.1)
sampleRDD.foreach(println)
}
}
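Since the sampled fraction is only approximate, passing a fixed seed is the usual way to make a sample repeatable between runs. A hedged sketch reusing the studentRDD from the demo above:
// sketch: reproducible sampling with an explicit seed
val sampledRDD: RDD[String] = studentRDD.sample(withReplacement = false, fraction = 0.1, seed = 42L)
println(sampledRDD.count()) // roughly 10% of the rows, and the same rows on every run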
7.Demo7GroupBy
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo7GroupBy {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("groupBy算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
//需求:求出每個班級平均年齡
//使用模式匹配的方式取出班級和年齡
val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
}
/**
* groupBy:按照指定的字段進行分組,返回的是一個鍵是分組字段,值是一個存放原本數(shù)據(jù)的迭代器的鍵值對 返回的是kv格式的RDD
*
* key: 是分組字段
* value: 是spark中的迭代器
* 迭代器中的數(shù)據(jù),不是完全被加載到內(nèi)存中計算,迭代器只能迭代一次
*
* groupBy會產(chǎn)生shuffle
*/
//按照班級進行分組
//val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)
val kvRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
case (clazz: String, itr: Iterable[(String, Int)]) =>
//CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))
//CompactBuffer(21,23,21,23,21,21,24)
val allAge: Iterable[Int] = itr.map((kv: (String, Int)) => kv._2)
val avgAge: Double = allAge.sum.toDouble / allAge.size
(clazz, avgAge)
}
clazzAvgAgeRDD.foreach(println)
while (true){
}
}
}
8.Demo8GroupByKey
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo8GroupByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("groupByKey算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
//需求:求出每個班級平均年齡
//使用模式匹配的方式取出班級和年齡
val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
}
/**
* groupByKey: 按照鍵進行分組,將value值構(gòu)成迭代器返回
* 將來你在spark中看到RDD[(xx, xxx)] 這樣的RDD就是kv鍵值對類型的RDD
* 只有kv類型鍵值對RDD才可以調(diào)用groupByKey算子
*
*/
val kvRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
case (clazz: String, ageItr: Iterable[Int]) =>
(clazz, ageItr.sum.toDouble / ageItr.size)
}
clazzAvgAgeRDD.foreach(println)
while (true){
}
}
}
Differences between groupBy and groupByKey (a common Spark interview question)
* 1. Code-wise: any RDD can call groupBy, but only a key-value RDD can call groupByKey.
* 2. The RDD produced by groupByKey has a simpler structure (the iterator holds only the values), which is easier to process afterwards.
* 3. groupByKey performs better and runs faster, because compared with groupBy it shuffles less data (only the values rather than the whole records).
A small sketch contrasting the two follows.
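A minimal sketch of that contrast, assuming a spark-shell style session with sparkContext available:
// sketch: the same pair RDD through groupBy and groupByKey
import org.apache.spark.rdd.RDD
val kv: RDD[(String, Int)] = sparkContext.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
// groupBy keeps the whole record inside the value iterator
val byField: RDD[(String, Iterable[(String, Int)])] = kv.groupBy(_._1)
// groupByKey keeps only the values, so less data is shuffled and the result is simpler
val byKey: RDD[(String, Iterable[Int])] = kv.groupByKey()
byField.foreach(println) // (a,CompactBuffer((a,1), (a,3))) ...
byKey.foreach(println)   // (a,CompactBuffer(1, 3)) ...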
9.Demo9ReduceByKey
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo9ReduceByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("reduceByKey算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
//求每個班級的人數(shù)
val clazzKVRDD: RDD[(String, Int)] = splitRDD.map {
case Array(_, _, _, _, clazz: String) => (clazz, 1)
}
/**
* 利用groupByKey實現(xiàn)
*/
// val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
// val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
// case (clazz: String, n: Iterable[Int]) =>
// (clazz, n.sum)
// }
// clazzAvgAgeRDD.foreach(println)
/**
* 利用reduceByKey實現(xiàn):按照鍵key對value值直接進行聚合,需要傳入聚合的方式
* reduceByKey算子也是只有kv類型的RDD才能調(diào)用
*
*
*/
val countRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey((x: Int, y: Int) => x + y)
countRDD.foreach(println)
// clazzKVRDD.groupByKey()
// .map(kv=>(kv._1,kv._2.sum))
// .foreach(println)
while (true){
}
/**
*/
}
}
Differences between reduceByKey and groupByKey
1. reduceByKey adds a map-side pre-aggregation (combine) step that groupByKey does not have, so far less data goes through the shuffle and performance is better.
2. In terms of flexibility, reduceByKey is less flexible than groupByKey: for example, a variance cannot be computed with reduceByKey alone, whereas after groupByKey it can (see the sketch below).
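A sketch of the variance example mentioned in point 2, again assuming a spark-shell style session with sparkContext available: after groupByKey the full set of values per key is in hand, so the variance is plain Scala.
// sketch: per-key variance after groupByKey
val scores = sparkContext.parallelize(List(("c1", 90.0), ("c1", 70.0), ("c2", 80.0), ("c2", 60.0)))
val variancePerKey = scores.groupByKey().mapValues((values: Iterable[Double]) => {
  val n = values.size
  val mean = values.sum / n
  values.map((v: Double) => (v - mean) * (v - mean)).sum / n // population variance
})
variancePerKey.foreach(println)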
10.Demo10Union
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo10Union {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Union算子演示")
val context = new SparkContext(conf)
//====================================================
val w1RDD: RDD[String] = context.textFile("spark/data/ws/w1.txt") // 1
val w2RDD: RDD[String] = context.textFile("spark/data/ws/w2.txt") // 1
/**
* union:上下合并兩個RDD,前提是兩個RDD中的數(shù)據(jù)類型要一致,合并后不會對結(jié)果進行去重
*
* 注:這里的合并只是邏輯層面上的合并,物理層面其實是沒有合并
*/
val unionRDD: RDD[String] = w1RDD.union(w2RDD)
println(unionRDD.getNumPartitions) // 2
unionRDD.foreach(println)
while (true){
}
}
}
11.Demo11Join
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo11Join {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")
val context = new SparkContext(conf)
//====================================================
//兩個kv類型的RDD之間的關(guān)聯(lián)
//通過scala中的集合構(gòu)建RDD
val rdd1: RDD[(String, String)] = context.parallelize(
List(
("1001", "尚平"),
("1002", "丁義杰"),
("1003", "徐昊宇"),
("1004", "包旭"),
("1005", "朱大牛"),
("1006","汪權(quán)")
)
)
val rdd2: RDD[(String, String)] = context.parallelize(
List(
("1001", "崩壞"),
("1002", "原神"),
("1003", "王者"),
("1004", "修仙"),
("1005", "學(xué)習(xí)"),
("1007", "敲代碼")
)
)
/**
* 內(nèi)連接:join
* 左連接:leftJoin
* 右連接:rightJoin
* 全連接:fullJoin
*/
//內(nèi)連接
// val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)
// //加工一下RDD
// val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {
// case (id: String, (name: String, like: String)) => (id, name, like)
// }
// innerJoinRDD2.foreach(println)
//左連接
val leftJoinRDD: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
//加工一下RDD
val leftJoinRDD2: RDD[(String, String, String)] = leftJoinRDD.map {
case (id: String, (name: String, Some(like))) => (id, name, like)
case (id: String, (name: String, None)) => (id, name, "無愛好")
}
leftJoinRDD2.foreach(println)
println("=================================")
//右連接自己試
//TODO:自己試右連接
//全連接
val fullJoinRDD: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
//加工一下RDD
val fullJoinRDD2: RDD[(String, String, String)] = fullJoinRDD.map {
case (id: String, (Some(name), Some(like))) => (id, name, like)
case (id: String, (Some(name), None)) => (id, name, "無愛好")
case (id: String, (None, Some(like))) => (id, "無姓名", like)
}
fullJoinRDD2.foreach(println)
}
}
12.Demo12Student
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo12Student {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")
val context = new SparkContext(conf)
//====================================================
//需求:統(tǒng)計總分年級排名前10的學(xué)生的各科分數(shù)
//讀取分數(shù)文件數(shù)據(jù)
val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 讀取數(shù)據(jù)文件
.map((s: String) => s.split(",")) // 切分數(shù)據(jù)
.filter((arr: Array[String]) => arr.length == 3) // 過濾掉臟數(shù)據(jù)
.map {
//整理數(shù)據(jù),進行模式匹配取出數(shù)據(jù)
case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
}
//計算每個學(xué)生的總分
val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
case (sid: String, _: String, score: String) => (sid, score.toInt)
}.reduceByKey((x: Int, y: Int) => x + y)
//按照總分排序
val sumScoreTop10: Array[(String, Int)] = sumScoreWithSidRDD.sortBy(-_._2).take(10)
//取出前10的學(xué)生學(xué)號
val ids: Array[String] = sumScoreTop10.map(_._1)
//取出每個學(xué)生各科分數(shù)
val top10StuScore: RDD[(String, String, String)] = scoreRDD.filter {
case (id: String, _, _) => ids.contains(id)
}
top10StuScore.foreach(println)
}
}
13.Demo13MapValues
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo13MapValues {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")
val context = new SparkContext(conf)
//====================================================
//需求:統(tǒng)計總分年級排名前10的學(xué)生的各科分數(shù)
//讀取分數(shù)文件數(shù)據(jù)
val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 讀取數(shù)據(jù)文件
.map((s: String) => s.split(",")) // 切分數(shù)據(jù)
.filter((arr: Array[String]) => arr.length == 3) // 過濾掉臟數(shù)據(jù)
.map {
//整理數(shù)據(jù),進行模式匹配取出數(shù)據(jù)
case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
}
//計算每個學(xué)生的總分
val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
case (sid: String, _: String, score: String) => (sid, score.toInt)
}.reduceByKey((x: Int, y: Int) => x + y)
/**
* mapValues算子:也是作用在kv類型的RDD上
* 主要的作用鍵不變,處理值
*/
val resRDD: RDD[(String, Int)] = sumScoreWithSidRDD.mapValues(_ + 1000)
resRDD.foreach(println)
//等同于
val res2RDD: RDD[(String, Int)] = sumScoreWithSidRDD.map((kv: (String, Int)) => (kv._1, kv._2 + 1000))
}
}
14.Demo14mapPartition
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo14mapPartition {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("mapPartition算子演示")
val context = new SparkContext(conf)
//====================================================
//需求:統(tǒng)計總分年級排名前10的學(xué)生的各科分數(shù)
//讀取分數(shù)文件數(shù)據(jù)
val scoreRDD: RDD[String] = context.textFile("spark/data/ws/*") // 讀取數(shù)據(jù)文件
println(scoreRDD.getNumPartitions)
/**
* mapPartition: 主要作用是一次處理一個分區(qū)的數(shù)據(jù),將一個分區(qū)的數(shù)據(jù)一個一個傳給后面的函數(shù)進行處理
*
* 迭代器中存放的是一個分區(qū)的數(shù)據(jù)
*/
// val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {
//
// println(s"====================當(dāng)前處理的分區(qū)====================")
// //這里寫的邏輯是作用在一個分區(qū)上的所有數(shù)據(jù)
// val words: Iterator[String] = itr.flatMap(_.split("\\|"))
// words
// })
// mapPartitionRDD.foreach(println)
scoreRDD.mapPartitionsWithIndex{
case (index:Int,itr: Iterator[String]) =>
println(s"當(dāng)前所處理的分區(qū)編號是:${index}")
itr.flatMap(_.split("\\|"))
}.foreach(println)
}
}
15.Demo15Actions
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo15Actions {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Action算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
/**
* 轉(zhuǎn)換算子:transformation 將一個RDD轉(zhuǎn)換成另一個RDD,轉(zhuǎn)換算子是懶執(zhí)行的,需要一個action算子觸發(fā)執(zhí)行
*
* 行動算子(操作算子):action算子,觸發(fā)任務(wù)執(zhí)行。一個action算子就會觸發(fā)一次任務(wù)執(zhí)行
*/
println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
val studentsRDD: RDD[(String, String, String, String, String)] = studentRDD.map(_.split(","))
.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
println("**************************** 數(shù)加防偽碼 ^_^ ********************************")
(id, name, age, gender, clazz)
}
println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")
// foreach其實就是一個action算子
// studentsRDD.foreach(println)
// println("="*100)
// studentsRDD.foreach(println)
// while (true){
//
// }
/**
* collect()行動算子 主要作用是將RDD轉(zhuǎn)成scala中的數(shù)據(jù)結(jié)構(gòu)
*
*/
val tuples: Array[(String, String, String, String, String)] = studentsRDD.collect()
}
}
16.Demo16Catch
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
object Demo16Catch {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Action算子演示")
val context = new SparkContext(conf)
//設(shè)置checkpoint路徑,將來對應(yīng)的是HDFS上的路徑
context.setCheckpointDir("spark/data/checkpoint")
//====================================================
val linesRDD: RDD[String] = context.textFile("spark/data/students.csv")
val splitRDD: RDD[Array[String]] = linesRDD.map(_.split(","))
//處理數(shù)據(jù)
val studentsRDD: RDD[(String, String, String, String, String)] = splitRDD.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
(id, name, age, gender, clazz)
}
//對studentsRDD進行緩存
/**
* The catch with caching: whether the data sits in memory only or in memory plus disk, it only lives as long as the application runs; once the program ends, all cached data is lost.
*
* For that scenario Spark provides checkpointing: an RDD's runtime data can be persisted durably on HDFS. This requires setting a checkpoint directory on the Spark context.
*/
// studentsRDD.cache() //默認情況下,是將數(shù)據(jù)緩存在內(nèi)存中
// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)
studentsRDD.checkpoint()
//統(tǒng)計每個班級的人數(shù)
val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map {
case (_, _, _, _, clazz: String) => (clazz, 1)
}
val clazzNumRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey(_ + _)
clazzNumRDD.saveAsTextFile("spark/data/clazz_num")
//統(tǒng)計性別的人數(shù)
val genderKVRDD: RDD[(String, Int)] = studentsRDD.map {
case (_, _, _, gender: String, _) => (gender, 1)
}
val genderNumRDD: RDD[(String, Int)] = genderKVRDD.reduceByKey(_ + _)
genderNumRDD.saveAsTextFile("spark/data/gender_num")
//
// while (true){
//
// }
}
}
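One detail the demo above leaves out: checkpoint() runs as a separate job after the first action and recomputes the RDD's lineage, so it is common practice to cache the RDD before checkpointing it. A hedged sketch reusing the names from Demo16Catch:
// sketch: cache before checkpoint so the lineage is not recomputed by the checkpoint job
studentsRDD.cache()      // keep the data around for the checkpoint job
studentsRDD.checkpoint() // persist it durably under the directory set with setCheckpointDir
// ... run the actions as before, then the cache can be released:
// studentsRDD.unpersist()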
17.Demo17SparkStandaloneSubmit
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo17SparkStandaloneSubmit {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 如果將來在linux集群中運行,這里就不需要設(shè)置setMaster
*/
// conf.setMaster("local")
val sparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sparkContext.parallelize(List("java,hello,world", "hello,scala,spark", "java,hello,spark"))
val wordRDD: RDD[String] = linesRDD.flatMap(_.split(","))
val wordKVRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
val countRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey(_ + _)
countRDD.foreach(println)
/**
* 將項目打包放到spark集群中使用standalone模式運行
* standalone client
* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 100
*
* standalone cluster
* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 100
*
*/
}
}
18.Demo18SparkYarnSubmit
package com.shujia.core
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 因為是提交到y(tǒng)arn上,可以對hdfs上的數(shù)據(jù)進行讀寫
*/
object Demo18SparkYarnSubmit {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 提交到y(tǒng)arn上運行,這個參數(shù)依舊不用設(shè)置
*/
// conf.setMaster("local")
conf.setAppName("yarn submit")
val context = new SparkContext(conf)
//讀取hdfs上數(shù)據(jù)
val linesRDD: RDD[String] = context.textFile("/bigdata29/data/students.csv")
println("="*100)
println(s"分區(qū)數(shù)為:${linesRDD.getNumPartitions}")
println("="*100)
val classKVRDD: RDD[(String, Int)] = linesRDD.map((line: String) => {
val clazz: String = line.split(",")(4)
(clazz, 1)
})
//統(tǒng)計班級人數(shù)
val clazzNumRDD: RDD[(String, Int)] = classKVRDD.reduceByKey(_ + _)
//整理一下要寫到結(jié)果文件中的數(shù)據(jù)格式
val resRDD: RDD[String] = clazzNumRDD.map((kv: (String, Int)) => s"${kv._1}\t${kv._2}")
//刪除已經(jīng)存在的路徑
val hadoopConf = new Configuration()
val fileSystem: FileSystem = FileSystem.get(hadoopConf)
//判斷路徑是否存在
if(fileSystem.exists(new Path("/bigdata29/sparkout1"))){
fileSystem.delete(new Path("/bigdata29/sparkout1"),true)
}
//將RDD中的數(shù)據(jù)保存到HDFS上的文件中
resRDD.saveAsTextFile("/bigdata29/sparkout1")
/**
* spark-submit --class com.shujia.core.Demo18SparkYarnSubmit --master yarn --deploy-mode client spark-1.0.jar
*/
}
}
19.Demo19PI
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object Demo19PI {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 提交到y(tǒng)arn上運行,這個參數(shù)依舊不用設(shè)置
*/
// conf.setMaster("local")
conf.setAppName("yarn submit")
val context = new SparkContext(conf)
//設(shè)置生成點的個數(shù) 10000
val list: Range.Inclusive = 0 to 1000000000
//將scala的序列集合變成rdd
val rangeRDD: RDD[Int] = context.parallelize(list)
//隨機生成正方形內(nèi)的點
val dianRDD: RDD[(Double, Double)] = rangeRDD.map((i: Int) => {
val x: Double = Random.nextDouble() * 2 - 1
val y: Double = Random.nextDouble() * 2 - 1
(x, y)
})
// println(dianRDD.count())
//取出圓中點的個數(shù)
val yuanZuoRDD: RDD[(Double, Double)] = dianRDD.filter {
case (x: Double, y: Double) =>
x * x + y * y < 1
}
// println(yuanZuoRDD.count())
//計算PI
println("="*100)
println(s"PI的值為:${(yuanZuoRDD.count().toDouble / dianRDD.count()) * 4}")
println("="*100)
/**
* spark-submit --class com.shujia.core.Demo19PI --master yarn --deploy-mode client spark-1.0.jar
*/
}
}
20.Demo20Accumulator
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
object Demo20Accumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val context = new SparkContext(conf)
//====================================================
val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")
// var count = 0
// studentRDD.foreach((line:String)=>{
// count+=1
// println("-------------------------")
// println(count)
// println("-------------------------")
// })
// println(s"count的值為:${count}")
/**
* Accumulators
*
* Created through the SparkContext.
* Notes:
* 1. Accumulator updates are executed inside RDD operations, and RDD operations run on the Executors, so an action operator is needed to trigger the job before any update happens.
* 2. One RDD cannot be used inside another RDD's operations.
* 3. The SparkContext cannot be used inside an RDD, because it is not serializable and cannot be sent over the network to the Executors.
*/
// val accumulator: LongAccumulator = context.longAccumulator
// studentRDD.foreach((line:String)=>{
// accumulator.add(1)
// })
// studentRDD.map((line:String)=>{
// accumulator.add(1)
// }).collect()
// println(s"accumulator的值為:${accumulator.value}")
// val value: RDD[RDD[(String, String)]] = studentRDD.map((stuLine: String) => {
// scoreRDD.map((scoreLine: String) => {
// val strings: Array[String] = scoreLine.split(",")
// val strings1: Array[String] = stuLine.split(",")
// val str1: String = strings.mkString("|")
// val str2: String = strings1.mkString("|")
// (str1, str2)
// })
// })
// value.foreach(println)
// val value: RDD[RDD[String]] = studentRDD.map((stuLine: String) => {
// val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")
// scoreRDD
// })
// value.foreach(println)
}
}
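A minimal working sketch of the rules listed above, assuming a spark-shell style session where context exists: the accumulator is created on the Driver, updated inside an action on the Executors, and read back on the Driver.
// sketch: counting rows with a LongAccumulator
import org.apache.spark.util.LongAccumulator
val rowCount: LongAccumulator = context.longAccumulator("rowCount")
context.textFile("spark/data/students.csv")
  .foreach((_: String) => rowCount.add(1)) // foreach is an action, so the updates actually run
println(s"rows counted by the accumulator: ${rowCount.value}") // read the result on the Driver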
21.Demo21Broadcast
package com.shujia.core
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
object Demo21Broadcast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("廣播變量演示")
val context = new SparkContext(conf)
//====================================================
//使用Scala的方式讀取學(xué)生數(shù)據(jù)文件,將其轉(zhuǎn)換以學(xué)號作為鍵的map集合,屬于在Driver端的一個變量
val studentsMap: Map[String, String] = Source.fromFile("spark/data/students.csv")
.getLines()
.toList
.map((line: String) => {
val infos: Array[String] = line.split(",")
val stuInfo: String = infos.mkString(",")
(infos(0), stuInfo)
}).toMap
val scoresRDD: RDD[String] = context.textFile("spark/data/score.txt")
/**
* 將studentsMap變成一個廣播變量,讓每一個將來需要執(zhí)行關(guān)聯(lián)的Executor中都有一份studentsMap數(shù)據(jù)
* 避免了每次Task任務(wù)拉取都要附帶一個副本,拉取的速度變快了,執(zhí)行速度也就變快了
*
* 廣播大變量
*/
val studentsMapBroadcast: Broadcast[Map[String, String]] = context.broadcast(studentsMap)
/**
* 將Spark讀取的分數(shù)RDD與外部變量學(xué)生Map集合進行關(guān)聯(lián)
* 循環(huán)遍歷scoresRDD,將學(xué)號一樣的學(xué)生信息關(guān)聯(lián)起來
*/
// val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {
// val id: String = score.split(",")(0)
// //使用學(xué)號到學(xué)生map集合中獲取學(xué)生信息
// val studentInfo: String = studentsMap.getOrElse(id, "無學(xué)生信息")
// (score, studentInfo)
// })
// resMapRDD.foreach(println)
/**
* 使用廣播變量進行關(guān)聯(lián)
*/
val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {
val id: String = score.split(",")(0)
val stuMap: Map[String, String] = studentsMapBroadcast.value
//使用學(xué)號到學(xué)生map集合中獲取學(xué)生信息
val studentInfo: String = stuMap.getOrElse(id, "無學(xué)生信息")
(score, studentInfo)
})
resMapRDD.foreach(println)
}
}
2.Spark-sql
Demo1WordCount
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
object Demo1WordCount {
def main(args: Array[String]): Unit = {
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("wc spark sql")
.getOrCreate()
/**
* spark sql和spark core的核心數(shù)據(jù)類型不太一樣
*
* 1、讀取數(shù)據(jù)構(gòu)建一個DataFrame,相當(dāng)于一張表
*/
val linesDF: DataFrame = sparkSession.read
.format("csv") //指定讀取數(shù)據(jù)的格式
.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割
.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號
.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑
// println(linesDF)
// linesDF.show() //查看DF中的數(shù)據(jù)內(nèi)容(表內(nèi)容)
// linesDF.printSchema() //查看DF表結(jié)構(gòu)
/**
* 2、DF本身是無法直接在上面寫sql的,需要將DF注冊成一個視圖,才可以寫sql數(shù)據(jù)分析
*/
linesDF.createOrReplaceTempView("lines") // 起一個表名,后面的sql語句可以做查詢分析
/**
* 3、可以編寫sql語句 (統(tǒng)計單詞的數(shù)量)
* spark sql是完全兼容hive sql
*/
val resDF: DataFrame = sparkSession.sql(
"""
|select
|t1.word as word,
|count(1) as counts
|from
|(select
| explode(split(line,'\\|')) as word from lines) t1
| group by t1.word
|""".stripMargin)
/**
* 4、將計算的結(jié)果DF保存到HDFS上
*/
val resDS: Dataset[Row] = resDF.repartition(1)
resDS.write
.format("csv") //指定輸出數(shù)據(jù)文件格式
.option("sep","\t") // 指定列之間的分隔符
.mode(SaveMode.Overwrite) // 使用SaveMode枚舉類,設(shè)置為覆蓋寫
.save("spark/data/sqlout1") // 指定輸出的文件夾
}
}
Demo2DSLWordCount
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo2DSLWordCount {
def main(args: Array[String]): Unit = {
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("wc spark sql")
.getOrCreate()
/**
* spark sql和spark core的核心數(shù)據(jù)類型不太一樣
*
* 1、讀取數(shù)據(jù)構(gòu)建一個DataFrame,相當(dāng)于一張表
*/
val linesDF: DataFrame = sparkSession.read
.format("csv") //指定讀取數(shù)據(jù)的格式
.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割
.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號
.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑
/**
* DSL: 類SQL語法 api 介于代碼和純sql之間的一種api
*
* spark在DSL語法api中,將純sql中的函數(shù)都使用了隱式轉(zhuǎn)換變成一個scala中的函數(shù)
* 如果想要在DSL語法中使用這些函數(shù),需要導(dǎo)入隱式轉(zhuǎn)換
*
*/
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
import org.apache.spark.sql.functions._
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理
import sparkSession.implicits._
// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()
val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word")
.groupBy($"word")
.agg(count($"word") as "counts")
/**
* 保存數(shù)據(jù)
*/
resultDF
.repartition(1)
.write
.format("csv")
.option("sep","\t")
.mode(SaveMode.Overwrite)
.save("spark/data/sqlout2")
}
}
Demo3DSLAPI
package com.shujia.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo3DSLAPI {
def main(args: Array[String]): Unit = {
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("dsl語法api演示")
.config("spark.sql.shuffle.partitions",1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
import org.apache.spark.sql.functions._
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理
import sparkSession.implicits._
/**
* DSL api
*/
//新版本的讀取方式,讀取一個json數(shù)據(jù),不需要手動指定列名
// val stuDF1: DataFrame = sparkSession.read
// .json("spark/data/students.json")
//以前版本讀取方式,靈活度要高一些
val stuDF: DataFrame = sparkSession.read
.format("json")
.load("spark/data/students.json")
// stuDF2.show(100, truncate = false) //傳入展示總條數(shù),并完全顯示數(shù)據(jù)內(nèi)容
/**
* select 函數(shù):選擇數(shù)據(jù)【字段】,和純sql語句中select意思基本是一樣,在數(shù)據(jù)的前提上選擇要留下的列
*
*/
//根據(jù)字段的名字選擇要查詢的字段
// stuDF.select("id","name","age").show(1)
// //根據(jù)字段的名字選擇要查詢的字段,selectExpr 可以傳入表達式字符串形式
// stuDF.selectExpr("id","name","age","age + 1 as new_age").show()
//使用隱式轉(zhuǎn)換中的$函數(shù)將字段變成一個對象
// stuDF.select($"id",$"name",$"age").show(10)
//使用對象做處理
// stuDF.select($"id",$"name",$"age" + 1 as "new_age").show(10)
//可以在select中使用sql的函數(shù)
//下面的操作等同于sql:select id,name,age+1 as new_age,substring(clazz,0,2) as km from lines;
// stuDF.select($"id",$"name",$"age" + 1 as "new_age",substring($"clazz",0,2) as "km").show(10)
/**
* where 函數(shù):過濾數(shù)據(jù)
*/
//直接將sql中where語句以字符串的形式傳參
// stuDF.where("gender='女' and age=23").show()
//使用$列對象的形式過濾
// =!= 不等于
// === 等于
// stuDF.where($"gender" === "女" and $"age" === 23).show()
// stuDF.where($"gender" =!= "男" and $"age" === 23).show()
//過濾文科的學(xué)生
// stuDF.where(substring($"clazz", 0, 2) === "文科").show()
/**
* groupBy 分組函數(shù)
* agg 聚合函數(shù)
* 分組聚合要在一起使用
* 分組聚合之后的結(jié)果DF中只會包含分組字段和聚合字段
* select中無法出現(xiàn)不是分組的字段
*/
//根據(jù)班級分組,求每個班級的人數(shù)和平均年齡
// stuDF.groupBy($"clazz")
// .agg(count($"clazz") as "number",round(avg($"age"),2) as "avg_age").show()
/**
* orderBy: 排序
*/
// stuDF.groupBy($"clazz")
// .agg(count($"clazz") as "number")
// .orderBy($"number").show()
/**
* join: 表關(guān)聯(lián)
*/
val scoreDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
// scoreDF.show()
//關(guān)聯(lián)場景1:所關(guān)聯(lián)的字段名字不一樣的時候
// stuDF.join(scoreDF, $"id" === $"sid", "inner").show()
//關(guān)聯(lián)場景2:所關(guān)聯(lián)的字段名字一樣的時候
// stuDF.join(scoreDF,"id").show()
/**
* 開窗函數(shù)
* 統(tǒng)計每個班級總分前3的學(xué)生
*
* 開窗不會改變總條數(shù)的,會以新增一列的形式加上開窗的結(jié)果
* withColumn 新增一列
*/
val joinStuAndScoreWithIDDF: DataFrame = stuDF.join(scoreDF, "id")
joinStuAndScoreWithIDDF.groupBy($"id", $"clazz") //根據(jù)學(xué)號和班級一起分組
.agg(sum($"score") as "sumScore") //計算總分
.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))
//.select($"id", $"clazz", $"sumScore", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc) as "rn")
.where($"rn" <= 3)
.show()
}
}
Demo4DataSourceAPI
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo4DataSourceAPI {
def main(args: Array[String]): Unit = {
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("dsl語法api演示")
.config("spark.sql.shuffle.partitions",1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理
/**
* 讀取csv格式的數(shù)據(jù),默認是以英文逗號分割的
*
*/
// val stuCsvDF: DataFrame = sparkSession.read
// .format("csv")
// .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
// .option("sep", ",")
// .load("spark/data/students.csv")
// //求每個班級的人數(shù),保存到文件中
// stuCsvDF.groupBy($"clazz")
// .agg(count($"clazz") as "number")
// .write
// .format("csv")
// .option("sep",",")
// .mode(SaveMode.Overwrite)
// .save("spark/data/souceout1")
/**
* 讀取json數(shù)據(jù)格式,因為json數(shù)據(jù)有鍵值對。會自動地將鍵作為列名,值作為列值,不需要手動設(shè)置表結(jié)構(gòu)
*/
val stuJsonDF: DataFrame = sparkSession.read
.format("json")
.load("spark/data/students2.json")
// //統(tǒng)計每個性別的人數(shù)
// stuJsonDF.groupBy($"gender")
// .agg(count($"gender") as "number")
// .write
// .format("json")
// .mode(SaveMode.Overwrite)
// .save("spark/data/jsonout1")
/**
* parquet
* 壓縮的比例是由【信息熵】來決定的
*/
// stuJsonDF.write
// .format("parquet")
// .mode(SaveMode.Overwrite)
// .save("spark/data/parquetout2")
//讀取parquet格式文件的時候,也是不需要手動指定表結(jié)構(gòu)
// val stuParquetDF: DataFrame = sparkSession.read
// .format("parquet")
// .load("spark/data/parquetout1/part-00000-c5917bb6-172b-49bd-a90c-90b7f09b69d6-c000.snappy.parquet")
// stuParquetDF.show()
/**
* 讀取數(shù)據(jù)庫中的數(shù)據(jù),mysql
*
*/
val jdDF: DataFrame = sparkSession.read
.format("jdbc")
.option("url", "jdbc:mysql://192.168.220.100:3306")
.option("dbtable", "bigdata29.jd_goods")
.option("user", "root")
.option("password", "123456")
.load()
jdDF.show(10,truncate = false)
}
}
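Writing a DataFrame back over JDBC is symmetrical to the read above. A hedged sketch that copies a slice of jdDF into a new table; the target table name is made up and SaveMode needs to be imported:
// sketch: writing back to MySQL over JDBC
import org.apache.spark.sql.SaveMode
jdDF.limit(100)
  .write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.220.100:3306")
  .option("dbtable", "bigdata29.jd_goods_copy") // hypothetical target table
  .option("user", "root")
  .option("password", "123456")
  .mode(SaveMode.Overwrite)
  .save()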
Demo5RDDToDF
package com.shujia.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Demo5RDDToDF {
def main(args: Array[String]): Unit = {
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("RDD和DF互相轉(zhuǎn)換演示")
.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換
import sparkSession.implicits._
/**
* 使用SparkContext讀取數(shù)據(jù)封裝成RDD
*
* SparkSession包含了SparkContext
*/
//使用SparkSession獲取SparkContext
val sc: SparkContext = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("spark/data/students.csv")
val studentsRDD: RDD[(String, String, Int, String, String)] = linesRDD.map((line: String) => line.split(","))
.map {
//1500100001,施笑槐,22,女,文科六班
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
(id, name, age.toInt, gender, clazz)
}
/**
* RDD轉(zhuǎn)DF
*/
val studentsDF: DataFrame = studentsRDD.toDF("id", "name", "age", "gender", "clazz")
studentsDF.createOrReplaceTempView("students")
val resultDF: DataFrame = sparkSession.sql(
"""
|select
|clazz,
|count(1) as number
|from
|students
|group by clazz
|""".stripMargin)
/**
* 在Row的數(shù)據(jù)類型中 所有整數(shù)類型統(tǒng)一為Long 小數(shù)類型統(tǒng)一為Double
* 轉(zhuǎn)RDD
*/
val studentsRDD2: RDD[Row] = resultDF.rdd
// studentsRDD2.map((row:Row)=>{
// val clazz: String = row.getAs[String]("clazz")
// val number: Long = row.getAs[Long]("number")
// (clazz,number)
// }).foreach(println)
studentsRDD2.map{
case Row(clazz:String,number:Long)=>
(clazz,number)
}.foreach(println)
}
}
Demo6Window
package com.shujia.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Window functions
* Aggregate window functions: sum, count, avg, min, max
* Ranking/offset window functions: row_number, rank, dense_rank, lag (look back at previous rows), lead (look ahead at following rows)
*/
object Demo6Window {
def main(args: Array[String]): Unit = {
//創(chuàng)建SparkSession對象
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("開窗函數(shù)DSL API演示")
.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
import org.apache.spark.sql.functions._
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換
import sparkSession.implicits._
//學(xué)生表
val studentsDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.csv")
//成績表
val scoresDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("sid STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
//科目表
val subjectDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("subject_id STRING,subject_name STRING,subject_sum_score INT")
.load("spark/data/subject.csv")
//將學(xué)生數(shù)據(jù)與成績數(shù)據(jù)進行關(guān)聯(lián)
val joinDF: DataFrame = studentsDF.join(scoresDF, $"id" === $"sid")
// joinDF.show(10)
/**
* 1、統(tǒng)計總分年級排名前十學(xué)生各科的分數(shù)
*
* 未排序之前,是將開窗中所有數(shù)據(jù)一起聚合得到一個結(jié)果
* 若排序了,依次從上到下聚合得到一個結(jié)果
*
*/
joinDF
// sum(score) over(partition by id ) as sumScore
.withColumn("sumScore", sum($"score") over Window.partitionBy($"id"))
.orderBy($"sumScore".desc)
.limit(60)
//.show(60)
/**
* 3、統(tǒng)計每科都及格的學(xué)生
*/
scoresDF
.join(subjectDF, "subject_id")
.where($"score" >= $"subject_sum_score" * 0.6)
//統(tǒng)計學(xué)生及格的科目數(shù)
.withColumn("jiGeCounts", count($"sid") over Window.partitionBy($"sid"))
.where($"jiGeCounts" === 6)
// .show(100)
/**
* 2、統(tǒng)計總分大于年級平均分的學(xué)生
*/
joinDF
//計算每個學(xué)生的總分,新增一列
.withColumn("sumScore", sum($"score") over Window.partitionBy($"id"))
.withColumn("avgScore", avg($"sumScore") over Window.partitionBy(substring($"clazz", 0, 2)))
.where($"sumScore" > $"avgScore")
// .show(200)
/**
* 統(tǒng)計每個班級的每個名次之間的分數(shù)差
*
*/
joinDF
.groupBy($"id", $"clazz")
.agg(sum($"score") as "sumScore")
//開窗,班級開窗,總分降序排序,排個名次
.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))
//開窗,取出前一名的總分
.withColumn("front_score", lag($"sumScore",1,750) over Window.partitionBy($"clazz").orderBy($"sumScore".desc))
.withColumn("cha",$"front_score" - $"sumScore")
.show(100)
}
}
Demo7BurksTest1
package com.shujia.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Demo7BurksTest1 {
def main(args: Array[String]): Unit = {
//創(chuàng)建SparkSession對象
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("公司營收額數(shù)據(jù)需求演示")
.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
import org.apache.spark.sql.functions._
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換
import sparkSession.implicits._
//讀取數(shù)據(jù)
val burksDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("burk STRING,year STRING" +
",tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE" +
",tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE" +
",tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")
.load("spark/data/burks.txt")
/**
* 1、統(tǒng)計每個公司每年按月累計收入 行轉(zhuǎn)列 --> sum窗口函數(shù)
* 輸出結(jié)果
* 公司代碼,年度,月份,當(dāng)月收入,累計收入
*
*/
//純sql的方式實現(xiàn)
// burksDF.createOrReplaceTempView("burks")
// sparkSession.sql(
// """
// |select
// |t1.burk as burk,
// |t1.year as year,
// |t1.month as month,
// |t1.tsl as tsl,
// |sum(t1.tsl) over(partition by t1.burk,t1.year order by t1.month) as leijia
// |from
// |(select
// | burk,
// | year,
// | month,
// | tsl
// | from
// | burks
// | lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,tsl
// |) t1
// |""".stripMargin).show()
// 使用DSL API實現(xiàn)
val m: Column = map(
expr("1"), $"tsl01",
expr("2"), $"tsl02",
expr("3"), $"tsl03",
expr("4"), $"tsl04",
expr("5"), $"tsl05",
expr("6"), $"tsl06",
expr("7"), $"tsl07",
expr("8"), $"tsl08",
expr("9"), $"tsl09",
expr("10"), $"tsl10",
expr("11"), $"tsl11",
expr("12"), $"tsl12"
)
burksDF
.select($"burk",$"year",explode(m) as Array("month","tsl"))
//按月累計
.withColumn("leijia",sum($"tsl") over Window.partitionBy($"burk",$"year").orderBy($"month"))
.show()
/**
* 2、統(tǒng)計每個公司當(dāng)月比上年同期增長率 行轉(zhuǎn)列 --> lag窗口函數(shù)
* 公司代碼,年度,月度,增長率(當(dāng)月收入/上年當(dāng)月收入 - 1)
*/
}
}
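Requirement 2 above (growth of each month versus the same month of the previous year) is left unimplemented in the demo; a hedged sketch of one way to do it with lag, reusing burksDF and the map m defined in Demo7BurksTest1:
// sketch: year-over-year growth per company and month
burksDF
  .select($"burk", $"year", explode(m) as Array("month", "tsl"))
  // look one year back for the same company and month
  .withColumn("last_year_tsl", lag($"tsl", 1) over Window.partitionBy($"burk", $"month").orderBy($"year"))
  // growth rate = current month / same month last year - 1 (null for the first year on record)
  .withColumn("growth_rate", $"tsl" / $"last_year_tsl" - 1)
  .select($"burk", $"year", $"month", $"growth_rate")
  .show()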
Demo8SubmitYarn
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo8SubmitYarn {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.config("spark.sql.shuffer.partitions", 1)
.getOrCreate()
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val stuendsDF: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("/bigdata29/spark_in/data/student")
val genderCountsDF: DataFrame = stuendsDF.groupBy($"gender")
.agg(count($"gender") as "counts")
genderCountsDF.write.format("csv").option("sep",",").mode(SaveMode.Overwrite).save("/bigdata29/spark_out/out2")
}
}
Demo9SparkOnHive
package com.shujia.sql
import org.apache.spark.sql.SparkSession
object Demo9SparkOnHive {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("spark讀取hive數(shù)據(jù)")
.enableHiveSupport()
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import sparkSession.implicits._
import org.apache.spark.sql.functions._
sparkSession.sql("use bigdata29")
sparkSession.sql("select clazz,count(1) as counts from students group by clazz").show()
}
}
Demo10Student
package com.shujia.sql
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
/**
* 1、行列轉(zhuǎn)換
*
* 表1
* 姓名,科目,分數(shù)
* name,item,score
* 張三,數(shù)學(xué),33
* 張三,英語,77
* 李四,數(shù)學(xué),66
* 李四,英語,78
*
*
* 表2
* 姓名,數(shù)學(xué),英語
* name,math,english
* 張三,33,77
* 李四,66,78
*
* 1、將表1轉(zhuǎn)化成表2
* 2、將表2轉(zhuǎn)化成表1
*/
object Demo10Student {
def main(args: Array[String]): Unit = {
//創(chuàng)建SparkSession對象
/**
* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("行列互相轉(zhuǎn)換演示")
.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個
.getOrCreate()
//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)
import org.apache.spark.sql.functions._
//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換
import sparkSession.implicits._
/**
* 列轉(zhuǎn)行
*/
val tb1DF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("name STRING,item STRING,score INT")
.load("/bigdata29/tb1.txt")
/*
張三,數(shù)學(xué),33
張三,英語,77
李四,數(shù)學(xué),66
李四,英語,78
張三,33,77
李四,66,78
*/
// val res1DF: DataFrame = tb1DF.groupBy($"name")
// .agg(
// sum(when($"item" === "數(shù)學(xué)", $"score").otherwise(0)) as "math",
// sum(when($"item" === "英語", $"score").otherwise(0)) as "english")
// res1DF.write
// .format("csv")
// .option("sep",",")
// .mode(SaveMode.Overwrite)
// .save("/bigdata29/out7")
// res1DF.show()
/**
* 行轉(zhuǎn)列
*/
// val tb2DF: DataFrame = sparkSession.read
// .format("csv")
// .option("sep", ",")
// .schema("name STRING,math STRING,english INT")
// .load("/bigdata29/out7/*")
//
// val m: Column = map(
// expr("'數(shù)學(xué)'"), $"math",
// expr("'語文'"), $"english"
// )
// tb2DF.select($"name",explode(m) as Array("item","score")).show()
}
}
Demo11UDF
package com.shujia.sql
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo11UDF {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("udf函數(shù)演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import org.apache.spark.sql.functions._
/**
* 1、在使用DSL的時候使用自定義函數(shù)
*/
val studentsDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.csv")
// studentsDF.show()
//編寫自定義函數(shù)
//udf中編寫的是scala代碼
val shujia_fun1: UserDefinedFunction = udf((str: String) => "數(shù)加:" + str)
// studentsDF.select(shujia_fun1($"clazz")).show()
/**
* 1、使用SQL語句中使用自定函數(shù)
*/
studentsDF.createOrReplaceTempView("students")
//將自定義的函數(shù)變量注冊成一個函數(shù)
sparkSession.udf.register("shujia_str",shujia_fun1)
sparkSession.sql(
"""
|select clazz,shujia_str(clazz) as new_clazz from students
|""".stripMargin).show()
}
}
Demo12ShuJiaStr
package com.shujia.sql
import org.apache.hadoop.hive.ql.exec.UDF
class Demo12ShuJiaStr extends UDF {
def evaluate(str: String): String = {
"shujia: " + str
}
}
/**
* 1. Package this class into a jar and put it in Spark's jars directory on the Linux server
* 2. Start the spark-sql client
* 3. Create a function from the UDF class in the uploaded jar:
* create function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr';
*/
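A hedged sketch of using the function after those three steps, either in the spark-sql client or from code through SparkSession.sql; it assumes a SparkSession named sparkSession with Hive support enabled (as in Demo9SparkOnHive) and an existing students table:
// sketch: registering and calling the jar-based UDF
sparkSession.sql("create function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr'")
sparkSession.sql("select clazz, shujia_str(clazz) as new_clazz from students").show()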
Demo13SheBao
package com.shujia.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo13SheBao {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("作業(yè)社保演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//讀取數(shù)據(jù)
val sheBaoDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,burk STRING,sdate STRING")
.load("spark/data/shebao.txt")
//統(tǒng)計每個員工的工作經(jīng)歷
sheBaoDF
//取出員工上一個月所在公司
.withColumn("last_burk", lag($"burk", 1) over Window.partitionBy($"id").orderBy($"sdate"))
//.show()
//在每一行后新增一列,標(biāo)記列,如果換工作了,標(biāo)記1 否則標(biāo)記0
.withColumn("flag", when($"burk" === $"last_burk", 0).otherwise(1))//.show()
//以用戶開窗,將后面的flag值加起來
.withColumn("tmp",sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))//.show()
.groupBy($"id",$"burk",$"tmp")
.agg(min($"sdate") as "start_date",max($"sdate") as "end_start")
.show(100)
}
}
Demo14MaYi
package com.shujia.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Demo14MaYi {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("作業(yè)社保演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
//用戶碳排放量表
val ant_user_low_carbon: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("user_id STRING,data_dt STRING,low_carbon DOUBLE")
.load("spark/data/ant_user_low_carbon.txt")
val ant_plant_carbon: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("plant_id STRING,plant_name STRING,plant_carbon DOUBLE")
.load("spark/data/ant_plant_carbon.txt")
// ant_user_low_carbon.show()
// ant_plant_carbon.show()
/**
* 螞蟻森林植物申領(lǐng)統(tǒng)計
* 問題:假設(shè)2017年1月1日開始記錄低碳數(shù)據(jù)(user_low_carbon),假設(shè)2017年10月1日之前滿足申領(lǐng)條件的用戶都申領(lǐng)了一顆p004-胡楊,
* 剩余的能量全部用來領(lǐng)取“p002-沙柳” 。
* 統(tǒng)計在10月1日累計申領(lǐng)“p002-沙柳” 排名前10的用戶信息;以及他比后一名多領(lǐng)了幾顆沙柳。
* 得到的統(tǒng)計結(jié)果如下表樣式:
*/
//因為用戶能量表與植物能量表沒有關(guān)聯(lián)字段,如果想要在sql中使用胡楊以及沙柳的能量的話,需要先單獨從植物能量表將胡楊以及沙柳的能量取出來,使用變量接收
val huYangCarbon: Double = ant_plant_carbon
//過濾條件是胡楊
.where($"plant_name" === "胡楊")
//選擇能量的一列
.select($"plant_carbon")
//轉(zhuǎn)rdd
.rdd
//轉(zhuǎn)scala數(shù)組
.collect()
.map {
case Row(plant_carbon: Double) => plant_carbon
}.head
val shaLiuCarbon: Double = ant_plant_carbon
//過濾條件是胡楊
.where($"plant_name" === "沙柳")
//選擇能量的一列
.select($"plant_carbon")
//轉(zhuǎn)rdd
.rdd
//轉(zhuǎn)scala數(shù)組
.collect()
.map {
case Row(plant_carbon: Double) => plant_carbon
}.head
println(s"胡楊所需的碳排放量:${huYangCarbon},沙柳所需要的碳排放量:${shaLiuCarbon}")
println("---------------------------------------------------------------------------")
ant_user_low_carbon
//取出2017年1月1日到2017年10月1日
.where($"data_dt" >= "2017/1/1" and $"data_dt" <= "2017/10/1")
//按照用戶分組,求用戶的總的排放量
.groupBy($"user_id")
.agg(sum($"low_carbon") as "sum_low_carbon")
//判斷能量是否滿足一個胡楊的能量,如果滿足直接減去胡楊的能量,得到剩余的能量
.withColumn("shengYu_carbon", when($"sum_low_carbon" >= huYangCarbon, $"sum_low_carbon" - huYangCarbon).otherwise($"sum_low_carbon"))
//計算剩余的能量可以領(lǐng)取多少個沙柳
.withColumn("number", floor($"shengYu_carbon" / shaLiuCarbon))
//.show()
//獲取后一名用戶的領(lǐng)取沙柳的個數(shù)
.withColumn("lead_number", lead($"number", 1, 0) over Window.orderBy($"number".desc))
//.show()
.withColumn("duo", $"number" - $"lead_number")
.limit(10)
//.show()
/**
* 螞蟻森林低碳用戶排名分析
* 問題:查詢user_low_carbon表中每日流水記錄,條件為:
* 用戶在2017年,連續(xù)三天(或以上)的天數(shù)里,
* 每天減少碳排放(low_carbon)都超過100g的用戶低碳流水。
* 需要查詢返回滿足以上條件的user_low_carbon表中的記錄流水。
* 例如用戶u_002符合條件的記錄如下,因為2017/1/2~2017/1/5連續(xù)四天的碳排放量之和都大于等于100g:
*/
//("user_id STRING,data_dt STRING,low_carbon DOUBLE")
ant_user_low_carbon
//過濾出2017年的數(shù)據(jù)
.where(substring($"data_dt", 0, 4) === "2017")
//計算用戶每天積攢的能量 用戶id和日期一起分組
.groupBy($"user_id", $"data_dt")
.agg(sum($"low_carbon") as "sum_low_carbon")
//過濾掉能量小于等于100的
.where($"sum_low_carbon" > 100)
//開窗有,以用戶開窗,以日期升序排序,編號 row_number
.withColumn("rn", row_number() over Window.partitionBy($"user_id").orderBy($"data_dt"))
//使用日期減去排序
.withColumn("flag_dt", date_sub(regexp_replace($"data_dt", "/", "-"), $"rn"))
//計算連續(xù)的天數(shù)
.withColumn("lianxu_days", count($"data_dt") over Window.partitionBy($"user_id", $"flag_dt"))
//過濾出連續(xù)3天以上的數(shù)據(jù)
.where($"lianxu_days" >= 3)
//關(guān)聯(lián)流水記錄表,得到每個符合條件的數(shù)據(jù)
.join(ant_user_low_carbon,List("user_id","data_dt"))
.select($"user_id",$"data_dt",$"low_carbon")
.show(100)
}
}
Test
package com.shujia.sql
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
object Test {
def main(args: Array[String]): Unit = {
//1、創(chuàng)建Spark sql環(huán)境
val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("sql")
.config("spark.sql.shuffle.partitions", 1) //默認在集群中時200個
.getOrCreate()
import org.apache.spark.sql.functions._
//定義UDF
val str_split: UserDefinedFunction = udf((line: String) => {
"數(shù)加:"+line
})
// val value2: Broadcast[UserDefinedFunction] = spark.sparkContext.broadcast(str_split)
// spark.udf.register("str_split", str_split)
/**
* 1、在使用DSL的時候使用自定義函數(shù)
*/
val studentsDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.csv")
//在使用DSL時,使用自定義函數(shù)
// studentsDF.select(str_split($"clazz")).show()
studentsDF.createOrReplaceTempView("lines")
spark.udf.register("str_split", (line: String) => "數(shù)加:"+line)
spark.sql(
"""
|select str_split(clazz) from lines
|""".stripMargin).show()
/**
* 在 sql中使用自定義函數(shù)
*/
// studentsDF.createOrReplaceTempView("lines")
//
// //注冊自定義函數(shù)
// spark.udf.register("str_split", (line: String) => "數(shù)加:"+line)
//
// spark.sql(
// """
// |select * from
// |lines
// |lateral view explode(str_split(line,',')) T as word
// |
// |""".stripMargin).show()
}
}
3.Spark-streaming
Demo1WordCount
package com.shujia.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo1WordCount {
def main(args: Array[String]): Unit = {
/**
* spark core: SparkContext --> 核心數(shù)據(jù)類型 RDD
* spark sql: SparkSession --> 核心數(shù)據(jù)類型 DataFrame
* spark streaming: StreamingContext --> 核心數(shù)據(jù)類型 DStream(RDD)
*
* 創(chuàng)建SparkStreaming的環(huán)境需要使用StreamingContext對象
*/
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("spark streaming 單詞統(tǒng)計案例")
val sparkContext = new SparkContext(conf)
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))
val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
val resultDS: DStream[(String, Int)] = wordsKVDS.reduceByKey((x: Int, y: Int) => x + y)
resultDS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo2UpdateStateByKey
package com.shujia.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo2UpdateStateByKey {
def main(args: Array[String]): Unit = {
/**
* spark core: SparkContext --> 核心數(shù)據(jù)類型 RDD
* spark sql: SparkSession --> 核心數(shù)據(jù)類型 DataFrame
* spark streaming: StreamingContext --> 核心數(shù)據(jù)類型 DStream(RDD)
*
* 創(chuàng)建SparkStreaming的環(huán)境需要使用StreamingContext對象
*/
//spark streaming依賴于spark core的環(huán)境
//因為spark streaming核心數(shù)據(jù)類型底層封裝的是RDD
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("spark streaming 單詞統(tǒng)計案例1")
val sparkContext = new SparkContext(conf)
// def this(sparkContext: SparkContext, batchDuration: Duration) //傳入一個spark core 環(huán)境對象以及一個接收時間的范圍
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
//設(shè)置checkpoint路徑
streamingContext.checkpoint("spark/data/stream_state_wc_checkpoint")
/**
* Spark Streaming is a near-real-time compute engine: once the program is written and started it stays in a running state, continuously receiving data, until it fails or is stopped manually.
*
* 1. For now we use the nc tool to feed a continuous stream of data into a port that Spark listens on
* 2. yum install -y nc   installs the nc tool
* 3. nc -lk xxxx
*/
val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))
val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
val res2DS: DStream[(String, Int)] = wordsKVDS.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
val currentCount: Int = seq.sum
val count: Int = option.getOrElse(0)
Option(currentCount + count)
})
res2DS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo3ReduceByKeyAndWindow
package com.shujia.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo3ReduceByKeyAndWindow {
def main(args: Array[String]): Unit = {
/**
* 舊版本創(chuàng)建sparkContext的方式
*/
// val conf = new SparkConf()
// conf.setMaster("local[2]")
// conf.setAppName("spark streaming 單詞統(tǒng)計案例1")
// val sparkContext = new SparkContext(conf)
// // def this(sparkContext: SparkContext, batchDuration: Duration) //傳入一個spark core 環(huán)境對象以及一個接收時間的范圍
// val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
/**
* 新版本中推薦使用SparkSession對象來獲取
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("窗口演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
//創(chuàng)建SparkStreaming對象
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
//將數(shù)據(jù)封裝成kv格式,將來可以調(diào)用kv類型的算子來進行操作
val wordsKVDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))
//1、如果只需要計算當(dāng)前批次的數(shù)據(jù),直接reduceByKey
//2、如果要從最開始的數(shù)據(jù)計算,使用有狀態(tài)算子,updateStateByKey和checkpoint配合使用
//3、如果要計算最近的時間段內(nèi)的數(shù)據(jù),使用窗口類算子,reduceByKeyAndWindow
/**
* 滑動窗口
*/
// val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(
// (x: Int, y: Int) => x + y,
// Durations.seconds(15), // 設(shè)置窗口的大小
// Durations.seconds(5)) // 設(shè)置滑動的大小
/**
* 滾動窗口
*/
val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(
(x: Int, y: Int) => x + y,
Durations.seconds(10), // 設(shè)置窗口的大小
Durations.seconds(10)) // 設(shè)置滑動的大小
res1DS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
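補充說明:reduceByKeyAndWindow還有一個帶"逆函數(shù)"的重載,窗口滑動時只對新進入窗口的批次做加法、對滑出窗口的批次做減法,不用把整個窗口的數(shù)據(jù)重新算一遍,適合窗口遠大于滑動步長的場景;這個重載要求先設(shè)置checkpoint。下面是沿用上面Demo3中的streamingContext和wordsKVDS的一個示意寫法(checkpoint路徑是假設(shè)的,僅供參考):
//帶逆函數(shù)的重載需要checkpoint保存窗口的中間狀態(tài)
streamingContext.checkpoint("spark/data/window_checkpoint") //假設(shè)的checkpoint路徑
val res2DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y, //新批次進入窗口時做加法
  (x: Int, y: Int) => x - y, //舊批次滑出窗口時做減法,實現(xiàn)增量更新
  Durations.seconds(15), //窗口大小
  Durations.seconds(5)) //滑動步長
res2DS.print()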
Demo4DStreamToRDD
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo4DStreamToRDD {
def main(args: Array[String]): Unit = {
/**
* 創(chuàng)建SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("DS2RDD演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
/**
* 通過spark streaming讀取數(shù)據(jù)得到一個DS
*/
//hello hello world java hello hadoop
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
val new_linesDS: DStream[String] = linesDS.window(Durations.seconds(15), Durations.seconds(5))
/**
* DStream底層也是RDD,每隔一個批次將接收到的數(shù)據(jù)封裝到RDD中
* 每隔一個批次,接收到的數(shù)據(jù)是不一樣的
*/
new_linesDS.foreachRDD((rdd: RDD[String]) => {
println("===================================================")
println("正在處理當(dāng)前批次的數(shù)據(jù).....")
println("===================================================")
//在這里面直接寫處理rdd的代碼
// rdd.flatMap(_.split(" "))
// .map((_, 1))
// .groupBy(_._1)
// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
// .foreach(println)
/**
* 既然這里可以操作rdd,又因為rdd可以轉(zhuǎn)df,那么在spark streaming中就可以對一個批次的數(shù)據(jù)使用sql語句來分析
*/
//def toDF(colNames: String*)
val linesDF: DataFrame = rdd.toDF("line")
linesDF.createOrReplaceTempView("words")
sparkSession.sql(
"""
|select
|t1.word as word,
|count(1) as number
|from
|(select
| explode(split(line,' ')) as word
|from
|words) t1
|group by t1.word
|""".stripMargin).show()
})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo5RDDToDStream
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object Demo5RDDToDStream {
def main(args: Array[String]): Unit = {
/**
* 創(chuàng)建SparkSession
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("DS2RDD演示")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
/**
* 通過spark streaming讀取數(shù)據(jù)得到一個DS
*/
//hello hello world java hello hadoop
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
//in_rdd:hello hello world java hello hadoop
//out_rdd:
// hello 3
// world 1
// java 1
// hadoop 1
val resDS: DStream[(String, Long)] = linesDS.transform((rdd: RDD[String]) => {
/**
* rdd處理之后會得到一個新的rdd進行返回
*/
// val resultRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))
// .map((_, 1))
// .groupBy(_._1)
// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
// resultRDD
//def toDF(colNames: String*)
val linesDF: DataFrame = rdd.toDF("line")
linesDF.createOrReplaceTempView("words")
val resRDD: RDD[(String, Long)] = sparkSession.sql(
"""
|select
|t1.word as word,
|count(1) as number
|from
|(select
| explode(split(line,' ')) as word
|from
|words) t1
|group by t1.word
|""".stripMargin)
.rdd
.map((row: Row) => {
(row.getAs[String](0), row.getAs[Long](1))
})
resRDD
})
resDS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo6Submit
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo6Submit {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.appName("提交命令執(zhí)行")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
linesDS
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo7SaveToFile
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
object Demo7SaveToFile {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("提交命令執(zhí)行")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
val resultDS: DStream[(String, Int)] = linesDS
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.transform((rdd:RDD[(String,Int)])=>{
println("=======================")
println("正在處理批次數(shù)據(jù)")
rdd
})
//目標(biāo)路徑是一個文件夾,文件名由系統(tǒng)生成,可以自己指定后綴
//每一批次的計算結(jié)果都會滾動生成一個結(jié)果文件
resultDS.saveAsTextFiles("spark/data/streams/stream","txt")
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
Demo8SaveToMysql
package com.shujia.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import java.sql.{Connection, DriverManager, PreparedStatement}
object Demo8SaveToMysql {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("提交命令執(zhí)行")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
linesDS.foreachRDD((rdd:RDD[String])=>{
println("------------正在處理一批數(shù)據(jù)-------------------")
println(s"該批次的分區(qū)數(shù):${rdd.getNumPartitions}")
/**
* foreachPartition 每一個分區(qū)處理一次邏輯
*/
rdd.foreachPartition((itr:Iterator[String])=>{
println("------------數(shù)加 防偽碼-------------------")
//創(chuàng)建與數(shù)據(jù)庫的連接對象
//1、注冊驅(qū)動
Class.forName("com.mysql.jdbc.Driver")
//2、獲取數(shù)據(jù)庫連接對象
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",
"root",
"123456"
)
//獲取數(shù)據(jù)庫操作對象
val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")
itr.foreach((line:String)=>{
println("....正在處理一條數(shù)據(jù)....")
//1500100019,婁曦之,24,男,理科三班
val infos: Array[String] = line.split(",")
val id: Int = infos(0).toInt
val name: String = infos(1)
val age: Int = infos(2).toInt
val gender: String = infos(3)
val clazz: String = infos(4)
//給預(yù)編譯對象傳參
ps.setInt(1,id)
ps.setString(2,name)
ps.setInt(3,age)
ps.setString(4,gender)
ps.setString(5,clazz)
//執(zhí)行sql語句
ps.executeUpdate()
})
//釋放資源
ps.close()
conn.close()
println()
})
})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
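補充說明:上面Demo8是每條數(shù)據(jù)executeUpdate一次,數(shù)據(jù)量大時可以在foreachPartition內(nèi)部改用JDBC的批量提交(addBatch/executeBatch),一個分區(qū)的數(shù)據(jù)只和MySQL交互一次。下面是一個示意寫法(表結(jié)構(gòu)沿用上文的student表,僅供參考):
rdd.foreachPartition((itr: Iterator[String]) => {
  Class.forName("com.mysql.jdbc.Driver")
  val conn: Connection = DriverManager.getConnection(
    "jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",
    "root", "123456")
  val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")
  itr.foreach((line: String) => {
    val infos: Array[String] = line.split(",")
    ps.setInt(1, infos(0).toInt)
    ps.setString(2, infos(1))
    ps.setInt(3, infos(2).toInt)
    ps.setString(4, infos(3))
    ps.setString(5, infos(4))
    ps.addBatch() //先攢進批里,不立即執(zhí)行
  })
  ps.executeBatch() //一個分區(qū)的數(shù)據(jù)一次性提交
  ps.close()
  conn.close()
})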
Demo9CardCondition
package com.shujia.streaming
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import java.lang
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date
object Demo9CardCondition {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("提交命令執(zhí)行")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val sparkStreaming: StreamingContext = new StreamingContext(sparkContext, Durations.seconds(5))
/**
*
* 需求:統(tǒng)計卡口流量的情況
* 讀取卡口過車的監(jiān)控數(shù)據(jù)
* {"car":"皖A(yù)9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
*/
val InfoDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 12345)
/**
* 方式1:在數(shù)據(jù)源的產(chǎn)生的時候,設(shè)置窗口
* 統(tǒng)計最近的15s之內(nèi)的卡口情況,每5秒統(tǒng)計一次
*/
//設(shè)置滑動窗口
val carJsonDS: DStream[String] = InfoDS.window(Durations.seconds(15), Durations.seconds(5))
/**
* 1、解析接收到的json數(shù)據(jù)
* fastjson第三方工具處理
*
* json中的值如果被雙引號括起來就是字符串;如果沒有引號且是整數(shù),一般用long來接收,小數(shù)一般用double
*
*/
val cardAndSpeedDS: DStream[(Long, (Double, Int))] = carJsonDS.map((line: String) => {
//使用fastjson將字符串轉(zhuǎn)成Json對象
val jSONObject: JSONObject = JSON.parseObject(line)
//根據(jù)需求,只需要獲取卡口的值和車速
val cardId: Long = jSONObject.getLong("card")
val carSpeed: Double = jSONObject.getDouble("speed")
(cardId, (carSpeed, 1)) //假設(shè)每次產(chǎn)生的數(shù)據(jù)都不是同一輛車
})
/**
* 2、實時統(tǒng)計每個卡口的平均車速和車的數(shù)量
* 方式2:在DS調(diào)用reduceByKeyAndWindow的時候設(shè)置窗口
*
*/
// cardAndSpeedDS.reduceByKeyAndWindow()
val cardConditionDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS.reduceByKey((kv1: (Double, Int), kv2: (Double, Int)) => {
//將同一卡口的1加起來,就得到這個批次中的車的數(shù)量
val carNumber: Int = kv1._2 + kv2._2
//將同一卡口的速度加起來 / 車的數(shù)量 = 這一批次的卡口平均速度
val sumSpeed: Double = kv1._1 + kv2._1
val avgSpeed: Double = sumSpeed / carNumber
(avgSpeed, carNumber)
})
/**
* 將結(jié)果保存到數(shù)據(jù)庫中
*/
cardConditionDS.foreachRDD((rdd: RDD[(Long, (Double, Int))]) => {
rdd.foreachPartition((itr: Iterator[(Long, (Double, Int))]) => {
println("------------數(shù)加 防偽碼-------------------")
//創(chuàng)建與數(shù)據(jù)庫的連接對象
//1、注冊驅(qū)動
Class.forName("com.mysql.jdbc.Driver")
//2、獲取數(shù)據(jù)庫連接對象
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",
"root",
"123456"
)
//獲取數(shù)據(jù)庫操作對象
val ps: PreparedStatement = conn.prepareStatement("insert into card_condition values(?,?,?,?)")
//獲取數(shù)據(jù)處理時間
val piCiTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
itr.foreach((f: (Long, (Double, Int))) => {
val cardId: Long = f._1
val avgSpeed: Double = f._2._1
val carNumber: Int = f._2._2
ps.setLong(1, cardId)
ps.setDouble(2, avgSpeed)
ps.setInt(3, carNumber)
ps.setString(4, piCiTime)
ps.executeUpdate()
})
//釋放資源
ps.close()
conn.close()
println()
})
})
sparkStreaming.start()
sparkStreaming.awaitTermination()
sparkStreaming.stop()
}
}
4.Spark-opt
Demo1Cache
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object Demo1Cache {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")
/**
* 盡量避免重復(fù)創(chuàng)建相同的RDD;對被復(fù)用的RDD做緩存,可以避免它前面所有RDD的重復(fù)計算
*
* rdd
* df
* sql
*
*/
//針對被復(fù)用的rdd進行緩存
// studentsRDD.cache() // 默認是MEMORY_ONLY
studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
/**
* 統(tǒng)計每個班級的人數(shù)
*/
studentsRDD.map((line: String) => (line.split(",")(4), 1))
.reduceByKey(_ + _)
.saveAsTextFile("spark/data/opt/clazz_num")
/**
* 統(tǒng)計每個性別的人數(shù)
*
* 第一個作業(yè)用到studentsRDD的時候按原本的數(shù)據(jù)量計算,當(dāng)?shù)诙€作業(yè)也用到studentsRDD的時候,就直接去緩存中讀取數(shù)據(jù)
*/
studentsRDD.map((line: String) => (line.split(",")(3), 1))
.reduceByKey(_ + _)
.saveAsTextFile("spark/data/opt/gender_num")
while (true){
}
}
}
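補充說明:緩存占用的是內(nèi)存/磁盤資源,當(dāng)被復(fù)用的RDD后面不會再用到時,可以手動釋放;緩存級別也可以按需選擇。下面是接著上面Demo1Cache中studentsRDD寫的一個示意(僅供參考):
//常見的幾種緩存級別
// studentsRDD.persist(StorageLevel.MEMORY_ONLY) //只放內(nèi)存,放不下的分區(qū)不緩存
// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK) //內(nèi)存放不下的分區(qū)落盤
// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER) //先序列化再緩存,更省內(nèi)存但有序列化開銷
//兩個作業(yè)都執(zhí)行完、不再復(fù)用之后,手動釋放緩存
studentsRDD.unpersist()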
Demo2AggregateByKey
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Demo2AggregateByKey {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")
val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => (line.split(",")(4), 1))
/**
* reduceByKey: 分組聚合,但是只會在reduce端進行聚合
*
*
* AggregateByKey: 分組聚合,不僅可以設(shè)置reduce端聚合的方式,可以提前的在map端進行聚合,相當(dāng)于預(yù)聚合
* zeroValue:初始值
* seqOp: (U, V) => U:map端的聚合
* combOp: (U, U) => U:reduce端的聚合
*
* 當(dāng)map端聚合邏輯和reduce端聚合邏輯不一樣、需要分別設(shè)置的時候,就可以使用該算子(本Demo末尾補充了一個按班級求平均年齡的示意)
*
*/
//aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
// clazzKVRDD.aggregateByKey(0)(
// (u: Int, v: Int) => u + v, // 在map端做聚合操作的函數(shù)
// (u1: Int, u2: Int) => u1 + u2 //在reduce端做聚合操作
// ).foreach(println)
//reduceByKey(func: (V, V) => V)
clazzKVRDD.reduceByKey(_ + _).foreach(println)
}
}
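補充說明:注釋里提到map端和reduce端聚合邏輯不一樣的場景,一個典型例子是按班級求平均年齡:map端把(年齡和, 人數(shù))累加,reduce端合并兩個部分結(jié)果,最后再相除。下面是基于上面Demo2中studentsRDD(字段順序:id,name,age,gender,clazz)的一個示意寫法,僅供參考:
//(班級, 年齡)的KV RDD
val clazzAgeRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => {
  val infos: Array[String] = line.split(",")
  (infos(4), infos(2).toInt)
})
//初始值是(年齡和, 人數(shù)),map端和reduce端的聚合函數(shù)不同
val clazzAvgAgeRDD: RDD[(String, Double)] = clazzAgeRDD
  .aggregateByKey((0, 0))(
    (u: (Int, Int), age: Int) => (u._1 + age, u._2 + 1), //map端:累加年齡和人數(shù)
    (u1: (Int, Int), u2: (Int, Int)) => (u1._1 + u2._1, u1._2 + u2._2) //reduce端:合并部分結(jié)果
  )
  .map((kv: (String, (Int, Int))) => (kv._1, kv._2._1.toDouble / kv._2._2))
clazzAvgAgeRDD.foreach(println)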
Demo3MapPartition
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.Date
object Demo3MapPartition {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val dataRDD: RDD[String] = sparkContext.textFile("spark/data/ant_user_low_carbon.txt")
val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions((itr: Iterator[String]) => {
itr.map((line: String) => {
val infos: Array[String] = line.split("\t")
(infos(0), infos(1), infos(2))
})
})
// map的邏輯是處理RDD中的每一條數(shù)據(jù)
// val resRDD: RDD[(String, Long, String)] = kvRDD.map((kv: (String, String, String)) => {
// //這句話是在map中的,所以針對每一條數(shù)據(jù)都要new一個SimpleDateFormat對象
// val sdf = new SimpleDateFormat("yyyy/MM/dd")
// println("----------------創(chuàng)建了一個SimpleDateFormat對象----------------")
//
// val dateObj: Date = sdf.parse(kv._2)
// val ts: Long = dateObj.getTime
// (kv._1, ts, kv._3)
// })
//
// resRDD.foreach(println)
//mapPartitions針對一個分區(qū)的數(shù)據(jù)進行處理
val resRDD2: RDD[(String, Long, String)] = kvRDD.mapPartitions((itr: Iterator[(String, String, String)]) => {
/**
* 將時間字段轉(zhuǎn)成時間戳
*/
val sdf = new SimpleDateFormat("yyyy/MM/dd")
println("----------------創(chuàng)建了一個SimpleDateFormat對象----------------")
itr.map((kv: (String, String, String)) => {
val dateObj: Date = sdf.parse(kv._2)
val ts: Long = dateObj.getTime
(kv._1, ts, kv._3)
})
})
resRDD2.foreach(println)
}
}
Demo4Coalesce
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
* 重分區(qū)
*
* repartition
*
* coalesce
*
* 面試題:如何修改rdd中的分區(qū),區(qū)別是什么?
*
*/
object Demo4Coalesce {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
// .config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv") // 1000條數(shù)據(jù)
println(s"studentsRDD的分區(qū)數(shù)量為:${studentsRDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量
/**
* repartition:對RDD進行重分區(qū),返回一個新的RDD
* repartition可以增加RDD的分區(qū)數(shù)量或者減少RDD的分區(qū)數(shù)量
*
* 是增加還是減少分區(qū)要根據(jù)資源而定
* 若資源充足,可以適當(dāng)增加分區(qū),提高task數(shù)量和并行度,加快整個spark作業(yè)的執(zhí)行
*
* 判斷是否產(chǎn)生shuffle,看前一個RDD的一個分區(qū)數(shù)據(jù)是否只流向后一個RDD的一個分區(qū):如果會被分發(fā)到后一個RDD的多個分區(qū),那就是shuffle
* 使用repartition增加分區(qū)會產(chǎn)生shuffle,數(shù)據(jù)會被重新打散,不再保持原來的順序
* 用repartition減少分區(qū)同樣會產(chǎn)生shuffle
*
*/
// val students2RDD: RDD[String] = studentsRDD.repartition(10)
// students2RDD.foreach(println)
// println(s"students2RDD的分區(qū)數(shù)量為:${students2RDD.getNumPartitions}")
//
// val students3RDD: RDD[String] = students2RDD.repartition(1)
// println(s"students3RDD的分區(qū)數(shù)量為:${students3RDD.getNumPartitions}")
// students3RDD.foreach(println)
/**
* coalesce 重分區(qū),增加分區(qū)和減少分區(qū)
*
* 傳入兩個值:numPartitions: Int 目標(biāo)分區(qū)數(shù)量, shuffle: Boolean = false 是否產(chǎn)生shuffle
*
* coalesce增加分區(qū),需要產(chǎn)生shuffle
* coalesce減少分區(qū),可以不用shuffle,也可以用。使用場景,如果要合并小文件的話,可以使用coalesce且不產(chǎn)生shuffle
*/
val studentsRDD2: RDD[String] = studentsRDD.coalesce(10, shuffle = true)
println(s"studentsRDD2的分區(qū)數(shù)量為:${studentsRDD2.getNumPartitions}")
studentsRDD2.foreach(println)
val studentRDD3: RDD[String] = studentsRDD2.coalesce(2, shuffle = false)
println(s"studentRDD3的分區(qū)數(shù)量為:${studentRDD3.getNumPartitions}")
studentRDD3.foreach(println)
while (true) {
}
}
}
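補充說明:這道面試題的一個關(guān)鍵點是,repartition底層就是調(diào)用coalesce并強制shuffle = true,所以repartition一定會產(chǎn)生shuffle;減少分區(qū)且不需要把數(shù)據(jù)打散時,優(yōu)先用coalesce(n, shuffle = false)。下面的寫法沿用上面的studentsRDD,僅作示意:
//增加分區(qū):兩種寫法效果相同,都會產(chǎn)生shuffle
val rdd1: RDD[String] = studentsRDD.repartition(10)
val rdd2: RDD[String] = studentsRDD.coalesce(10, shuffle = true)
//減少分區(qū)且不打散數(shù)據(jù):不產(chǎn)生shuffle,常用于合并小文件
val rdd3: RDD[String] = studentsRDD.coalesce(2, shuffle = false)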
Demo5Coalesce2
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Demo5Coalesce2 {
def main(args: Array[String]): Unit = {
/**
* 使用coalesce合并小文件
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
// .config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/studentsinfo/*") // 1000條數(shù)據(jù)
println(s"studentsRDD的分區(qū)數(shù)量為:${studentsRDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量
//合并小文件
val students2RDD: RDD[String] = studentsRDD.coalesce(1, shuffle = false)
println(s"students2RDD的分區(qū)數(shù)量為:${students2RDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量
students2RDD.saveAsTextFile("spark/data/studentsinfo2")
}
}
Demo6MapJoin
package com.shujia.opt
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo6MapJoin {
def main(args: Array[String]): Unit = {
/**
* 使用coalesce合并小文件
*/
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val studentsDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("spark/data/students.csv")
val scoresDF: DataFrame = sparkSession.read
.format("csv")
.option("sep", ",")
.schema("id STRING,subject_id STRING,score INT")
.load("spark/data/score.txt")
/**
* 在local模式下,會對sql進行優(yōu)化,使用廣播變量,思想類似于mapjoin
*
* 放到集群中運行,需要手動設(shè)置廣播的數(shù)據(jù)
* 在DSL代碼中使用語法糖來使用廣播數(shù)據(jù)
*
* 一般來說,大表join小表時,在DSL代碼中對小表使用hint語法將其廣播出去;這里的小表一般指小于1G的數(shù)據(jù)
*
* 開啟兩個作業(yè)job
* 第一個job:將小表數(shù)據(jù)拉到Driver端,從Driver端廣播出去
* 第二個job:在Executor內(nèi)部與廣播的數(shù)據(jù)進行數(shù)據(jù)關(guān)聯(lián)
*
*/
val resDF: DataFrame = scoresDF.join(studentsDF.hint("broadcast"), "id")
resDF.show()
while (true){
}
}
}
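補充說明:DSL里除了hint("broadcast"),也可以用org.apache.spark.sql.functions里的broadcast函數(shù)把小表標(biāo)記為廣播表,兩種寫法效果相同。示意如下(沿用上面的studentsDF和scoresDF,僅供參考):
import org.apache.spark.sql.functions.broadcast
//把小表studentsDF標(biāo)記為廣播表,大表scoresDF與它關(guān)聯(lián)時走map join
val resDF2: DataFrame = scoresDF.join(broadcast(studentsDF), "id")
resDF2.show()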
Demo7Kryo
package com.shujia.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
/**
* Kryo序列化是spark提供的一種專門適配spark計算時提高性能的一種序列化方式
*
* 序列化可以提高計算的性能,加快計算速度,減少數(shù)據(jù)所占用的內(nèi)存空間
*/
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
object Demo7Kryo {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
//將序列化方式設(shè)置為Kryo的序列化方式
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//自定義一個序列化類,指定要序列化的東西
.config("spark.kryo.registrator", "com.shujia.opt.Demo8KryoRegistrator")
.getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val studentsRDD: RDD[Student] = sparkContext.textFile("spark/data/students.csv").map((line: String) => {
val infos: Array[String] = line.split(",")
Student(infos(0), infos(1), infos(2).toInt, infos(3), infos(4))
})
/**
* 未做序列化:238.3 KiB
* 使用默認的序列化方式:55.7 KiB
* 使用kryo序列化:43.0 KiB
*
*/
//針對被復(fù)用的rdd進行緩存
// studentsRDD.cache() // 默認是MEMORY_ONLY
// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
// studentsRDD.persist(StorageLevel.MEMORY_ONLY)
studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
/**
* 統(tǒng)計每個班級的人數(shù)
*/
studentsRDD.map((stu: Student) => (stu.clazz, 1))
.reduceByKey(_ + _)
.saveAsTextFile("spark/data/opt/clazz_num")
/**
* 統(tǒng)計每個性別的人數(shù)
*
* 第一個作業(yè)用到studentsRDD的時候按原本的數(shù)據(jù)量計算,當(dāng)?shù)诙€作業(yè)也用到studentsRDD的時候,就直接去緩存中讀取數(shù)據(jù)
*/
studentsRDD.map((stu: Student) => (stu.gender, 1))
.reduceByKey(_ + _)
.saveAsTextFile("spark/data/opt/gender_num")
while (true) {
}
}
}
Demo8KryoRegistrator
package com.shujia.opt
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
/**
* 自定義一個序列化類,指定kryo要序列化的東西
*/
class Demo8KryoRegistrator extends KryoRegistrator{
override def registerClasses(kryo: Kryo): Unit = {
/**
* 在這個重寫的方法中指定要序列化的東西
*
*/
kryo.register(classOf[Student])
kryo.register(classOf[String])
kryo.register(classOf[Int])
}
}
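補充說明:如果不想單獨寫一個KryoRegistrator類,也可以直接在SparkConf上調(diào)用registerKryoClasses注冊要序列化的類,效果類似。下面是一個示意寫法(僅供參考):
import org.apache.spark.SparkConf
val conf: SparkConf = new SparkConf()
  .setMaster("local")
  .setAppName("kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  //直接注冊需要用Kryo序列化的類
  .registerKryoClasses(Array(classOf[Student]))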
Demo9FilterKey
package com.shujia.opt
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo9FilterKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("app")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("spark/data/ws/*")
println("第一個RDD分區(qū)數(shù)量:" + lines.getNumPartitions)
val countRDD: RDD[(String, Int)] = lines
.flatMap(_.split("\\|"))
.map((_, 1))
.groupByKey() // 這里會產(chǎn)生shuffle
.map((x: (String, Iterable[Int])) => (x._1, x._2.toList.sum))
println("聚合之后RDD分區(qū)的數(shù)量" + countRDD.getNumPartitions)
// countRDD.foreach(println)
/**
* 采樣key,過濾掉導(dǎo)致數(shù)據(jù)傾斜并且對業(yè)務(wù)影響不大的key
*
*/
val wordRDD: RDD[(String, Int)] = lines
.flatMap(_.split(","))
.map((_, 1))
//
// val top1: Array[(String, Int)] = wordRDD
// .sample(true, 0.1)
// .reduceByKey(_ + _)
// .sortBy(-_._2)
// .take(1)
//導(dǎo)致數(shù)據(jù)傾斜的key
// val key: String = top1(0)._1
//過濾導(dǎo)致傾斜的key
wordRDD
.filter(t => !"null".equals(t._1))
.groupByKey()
.map(x => (x._1, x._2.toList.sum))
.foreach(println)
while (true) {
}
}
}
Demo10DoubleReduce
package com.shujia.opt
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object Demo10DoubleReduce {
/**
* 雙重聚合
* 一般適用于業(yè)務(wù)不復(fù)雜的情況
*
*/
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("data/word")
val wordRDD: RDD[String] = lines
.flatMap(_.split(","))
.filter(!_.equals(""))
/* wordRDD
.map((_, 1))
.groupByKey()
.map(kv => (kv._1, kv._2.size))
.foreach(println)*/
//reduceByKey自帶map端預(yù)聚合,可以緩解聚合類操作的數(shù)據(jù)傾斜
/* wordRDD
.map((_, 1))
.reduceByKey(_ + _)
.foreach(println)*/
// 對每一個key打上隨機5以內(nèi)前綴
wordRDD
.map(word => {
val pix: Int = Random.nextInt(5)
(pix + "-" + word, 1)
})
.groupByKey() //第一次聚合
.map(t => (t._1, t._2.toList.sum))
.map(t => {
///去掉隨機前綴
(t._1.split("-")(1), t._2)
})
.groupByKey() //第二次聚合
.map(t => (t._1, t._2.toList.sum))
.foreach(println)
while (true) {
}
}
}
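補充說明:上面Demo10的兩次聚合用的都是groupByKey,實際中兩次聚合都可以換成reduceByKey,這樣map端自帶預(yù)聚合,shuffle的數(shù)據(jù)量更小。下面是沿用上面wordRDD的一個示意寫法(和原代碼一樣假設(shè)單詞本身不包含"-"):
wordRDD
  .map(word => (Random.nextInt(5) + "-" + word, 1)) //加隨機前綴,把同一個key打散到不同分區(qū)
  .reduceByKey(_ + _) //第一次聚合:帶前綴的局部聚合
  .map(t => (t._1.split("-")(1), t._2)) //去掉隨機前綴
  .reduceByKey(_ + _) //第二次聚合:得到最終結(jié)果
  .foreach(println)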
Demo11DoubleJoin
package com.shujia.opt
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo11DoubleJoin {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val dataList1 = List(
("java", 1),
("shujia", 2),
("shujia", 3),
("shujia", 1),
("shujia", 1))
val dataList2 = List(
("java", 100),
("java", 99),
("shujia", 88),
("shujia", 66))
val RDD1: RDD[(String, Int)] = sc.parallelize(dataList1)
val RDD2: RDD[(String, Int)] = sc.parallelize(dataList2)
//采樣傾斜的key
val sampleRDD: RDD[(String, Int)] = RDD1.sample(false, 1.0)
//skewedKey 導(dǎo)致數(shù)據(jù)傾斜的key shujia
val skewedKey: String = sampleRDD.map(x => (x._1, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey(ascending = false)
.take(1)(0)._2
//導(dǎo)致數(shù)據(jù)傾斜key的RDD
val skewedRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
tuple._1.equals(skewedKey)
})
//沒有傾斜的key
val commonRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
!tuple._1.equals(skewedKey)
})
val skewedRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
tuple._1.equals(skewedKey)
})
val commonRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
!tuple._1.equals(skewedKey)
})
val n = 2
//對產(chǎn)生數(shù)據(jù)傾斜的key 使用mapjoin
val skewedMap: Map[String, Int] = skewedRDD2.collect().toMap
val bro: Broadcast[Map[String, Int]] = sc.broadcast(skewedMap)
val resultRDD1: RDD[(String, (Int, Int))] = skewedRDD1.map(kv => {
val word: String = kv._1
val i: Int = bro.value.getOrElse(word, 0)
(word, (kv._2, i))
})
//沒有數(shù)據(jù)傾斜的RDD 正常join
val resultRDD2: RDD[(String, (Int, Int))] = commonRDD1.join(commonRDD2)
//將兩個結(jié)果拼接
resultRDD1.union(resultRDD2)
.foreach(println)
}
}