
Big Data Learning - Spark

1.Spark-core

1.Demo1WordCount

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/*

RDD: Resilient Distributed Dataset

*/

object Demo1WordCount {

def main(args: Array[String]): Unit = {

//1. Create the Spark environment

//1.1 Create the configuration object

val conf: SparkConf = new SparkConf()

//1.2 Specify the run mode (local, Standalone, Mesos, YARN)

conf.setMaster("local") //可以執(zhí)行所運行需要核數(shù)資源local[2],不指定的話默認使用所有的資源執(zhí)行程序

//1.3 Give the Spark job a name

conf.setAppName("wc")

//2. Create the Spark runtime context object

val sparkContext: SparkContext = new SparkContext(conf)

//3. Read the file data

val wordsLine: RDD[String] = sparkContext.textFile("spark/data/words.txt")

//4. Split each line on the | delimiter

val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))

val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))

val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1)

val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

wordCount.saveAsTextFile("spark/data/word_count") //save the final result to a local directory

}

}

2.Demo2Partition

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

/**

* During a Spark job, pulling records with the same key into the same partition is called a shuffle.
* Note: Spark's shuffle works on the same principle as MapReduce's shuffle - both spill to disk.
*
* RDD: Resilient Distributed Dataset
* Resilient: the data an RDD computes over can be large or small
* Distributed: the data can be spread across many servers; an RDD's partitions come from blocks, and those blocks can live on different DataNodes
* Dataset: an RDD does not store data itself - it only describes the computation; data flows between RDDs when a job is triggered

*

*/

object Demo2Partition {

def main(args: Array[String]): Unit = {

//1. Create the Spark environment

//1.1 Create the configuration object

val conf: SparkConf = new SparkConf()

//1.2 Specify the run mode (local, Standalone, Mesos, YARN)

conf.setMaster("local") //可以執(zhí)行所運行需要核數(shù)資源local[2],不指定的話默認使用所有的資源執(zhí)行程序

//1.3 Give the Spark job a name

conf.setAppName("wc")

//2. Create the Spark runtime context object

val sparkContext: SparkContext = new SparkContext(conf)

//3. Read the file data

// val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)

val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")

println(s"wordsLineRDD分區(qū)數(shù)是:${wordsLine.getNumPartitions}")

//4. Split each line on the | delimiter

val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))

println(s"wordsRDD分區(qū)數(shù)是:${words.getNumPartitions}")

val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))

println(s"wordsTuple2RDD分區(qū)數(shù)是:${wordsTuple2.getNumPartitions}")

//the partition count can be set separately on operators that cause a shuffle

val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)

println(s"wordsTuple2GroupRDD分區(qū)數(shù)是:${wordsTuple2Group.getNumPartitions}")

val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

println(s"wordCountRDD分區(qū)數(shù)是:${wordCount.getNumPartitions}")

wordCount.saveAsTextFile("spark/data/word_count2")

}

}

The five key properties of an RDD (a must for interviews! - a short sketch follows the list):

1) An RDD is made up of partitions. When reading a file, the RDD gets one partition per block. Note: by default every RDD in the lineage has the same number of partitions, before and after a shuffle alike; it is decided when the data is first loaded.

2) Functions actually operate on the partitions of an RDD; each partition is processed by one task, so there are as many tasks as there are partitions. Note: in Spark these functions are called operators (transformation operators: RDD -> RDD; action operators: RDD -> other data types).

3) RDDs depend on one another: the data of a later RDD depends on the computed result of the previous one, and data flows between RDDs like water. Notes:
3.1 There are two kinds of dependencies: a. narrow dependency - a partition of the later RDD maps to exactly one partition of the previous RDD (1-to-1); b. wide dependency - a partition of the later RDD is built from several partitions of the previous RDD (1-to-many, shuffle).
3.2 These dependencies split the job into stages: numStages = numWideDependencies + 1.
3.3 The partition count cannot change across a narrow dependency (it is fixed by the first RDD); across a wide dependency it can be set on the shuffle operator.

4) Partitioner-based operators only work on key-value RDDs, e.g. groupByKey, reduceByKey.

5) Spark provides preferred locations for task computation: move the computation, not the data.
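A minimal sketch of points 1-3, assuming the sparkContext and words.txt file from Demo1WordCount above (the partition counts are illustrative):

import org.apache.spark.rdd.RDD

val lines: RDD[String] = sparkContext.textFile("spark/data/words.txt", minPartitions = 4)
val pairs: RDD[(String, Int)] = lines.flatMap(_.split("\\|")).map((_, 1)) //narrow dependencies, partition count stays at 4
val counts: RDD[(String, Int)] = pairs.reduceByKey((x: Int, y: Int) => x + y, 2) //wide dependency (shuffle), partition count can change here
println(counts.getNumPartitions) //2
println(counts.toDebugString) //the indented lineage shows the stage boundary introduced by the shuffle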

3.Demo3Map

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/*

map operator: a transformation operator
A Spark job is triggered by its final action operator; without an action operator the job does not run at all.
RDDs are lazily evaluated.

*/

object Demo3Map {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("map算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

/**

* map operator: takes the records of the RDD one by one and passes them to the map function; map returns a new RDD and does not change the total number of records

*/

val splitRDD: RDD[List[String]] = studentRDD.map((s: String) => {

println("============數(shù)加防偽碼================")

s.split(",").toList

})

// splitRDD.foreach(println)

}

}

4.Demo4Filter

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo4Filter {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("map算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

/**

* filter: passes the records of the RDD one by one to the given function; if the function returns true the record is kept, otherwise it is dropped
*
* filter usually reduces the number of records

*/

val filterRDD: RDD[String] = studentRDD.filter((s: String) => {

val strings: Array[String] = s.split(",")

"男".equals(strings(3))

})

filterRDD.foreach(println)

}

}

5.Demo5flatMap

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo5flatMap {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("flatMap算子演示")

val context = new SparkContext(conf)

//====================================================

val linesRDD: RDD[String] = context.textFile("spark/data/words.txt")

/**

* flatMap operator: passes the records of the RDD one by one to the given function, whose return value must be a collection; the collections are then flattened into a new RDD

*/

val wordsRDD: RDD[String] = linesRDD.flatMap((line: String) => line.split("\\|"))

wordsRDD.foreach(println)

}

}

6.Demo6sample

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo6sample {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("flatMap算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

/**

* sample operator: samples a fraction of the data from the previous RDD
*
* the fraction is not exact - the result fluctuates around it; e.g. sampling 10% of 1000 records returns roughly 100 records

*/

val sampleRDD: RDD[String] = studentRDD.sample(withReplacement = true, 0.1)

sampleRDD.foreach(println)

}

}

7.Demo7GroupBy

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo7GroupBy {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("groupBy算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

//Requirement: compute the average age of each class

//use pattern matching to extract the class and the age

val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {

case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)

}

/**

* groupBy: groups by the given field and returns a key-value RDD whose key is the grouping field and whose value is an iterator over the original records
*
* key: the grouping field
* value: a Spark iterator
* the data behind the iterator is not fully loaded into memory, and the iterator can only be traversed once
*
* groupBy causes a shuffle

*/

//group by class

//val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)

val kvRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)

val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {

case (clazz: String, itr: Iterable[(String, Int)]) =>

//CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))

//CompactBuffer(21,23,21,23,21,21,24)

val allAge: Iterable[Int] = itr.map((kv: (String, Int)) => kv._2)

val avgAge: Double = allAge.sum.toDouble / allAge.size

(clazz, avgAge)

}

clazzAvgAgeRDD.foreach(println)

while (true){

}

}

}

8.Demo8GroupByKey

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo8GroupByKey {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("groupByKey算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

//Requirement: compute the average age of each class

//use pattern matching to extract the class and the age

val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {

case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)

}

/**

* groupByKey: groups by the key and returns the values as an iterator
* whenever you see an RDD of the form RDD[(xx, xxx)] in Spark, it is a key-value RDD
* only key-value RDDs can call the groupByKey operator

*

*/

val kvRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {

case (clazz: String, ageItr: Iterable[Int]) =>

(clazz, ageItr.sum.toDouble / ageItr.size)

}

clazzAvgAgeRDD.foreach(println)

while (true){

}

}

}

Differences between groupBy and groupByKey (a common Spark interview question):

* 1. At the code level: any RDD can call groupBy, but only key-value RDDs can call groupByKey.

* 2. The RDD produced by groupByKey has a simpler structure, which makes downstream processing easier.

* 3. groupByKey performs better and runs faster, because it shuffles less data than groupBy (see the sketch below).
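A minimal sketch of what actually gets shuffled, assuming the clazzWithAgeRDD: RDD[(String, Int)] of (clazz, age) pairs built in the demos above:

import org.apache.spark.rdd.RDD

//groupBy keeps the whole record as the value, so the full (clazz, age) tuples are shuffled:
val byClazz: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)

//groupByKey shuffles only the values, so just the ages move across the network:
val byAge: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()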

9.Demo9ReduceByKey

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo9ReduceByKey {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("reduceByKey算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

//求每個班級的人數(shù)

val clazzKVRDD: RDD[(String, Int)] = splitRDD.map {

case Array(_, _, _, _, clazz: String) => (clazz, 1)

}

/**

* 利用groupByKey實現(xiàn)

*/

// val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()

// val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {

// case (clazz: String, n: Iterable[Int]) =>

// (clazz, n.sum)

// }

// clazzAvgAgeRDD.foreach(println)

/**

* implementation using reduceByKey: aggregates the values directly by key; the aggregation function must be passed in
* reduceByKey can likewise only be called on key-value RDDs

*

*

*/

val countRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey((x: Int, y: Int) => x + y)

countRDD.foreach(println)

// clazzKVRDD.groupByKey()

// .map(kv=>(kv._1,kv._2.sum))

// .foreach(println)

while (true){

}

/**

*/

}

}

Differences between reduceByKey and groupByKey:

1. reduceByKey adds a map-side pre-aggregation (combine) step that groupByKey does not have, so far less data is shuffled and performance is better.

2. In terms of flexibility, reduceByKey is less flexible than groupByKey: for example, a variance cannot be computed with reduceByKey alone, while it is straightforward after groupByKey (see the sketch below).
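A minimal sketch of point 2, assuming the same clazzWithAgeRDD: RDD[(String, Int)] of (clazz, age) pairs as in Demo8GroupByKey:

import org.apache.spark.rdd.RDD

val clazzAgeVarianceRDD: RDD[(String, Double)] = clazzWithAgeRDD
  .groupByKey()
  .mapValues((ages: Iterable[Int]) => {
    val mean: Double = ages.sum.toDouble / ages.size
    ages.map((a: Int) => (a - mean) * (a - mean)).sum / ages.size //population variance of the ages in one class
  })
clazzAgeVarianceRDD.foreach(println)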

10.Demo10Union

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo10Union {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Union算子演示")

val context = new SparkContext(conf)

//====================================================

val w1RDD: RDD[String] = context.textFile("spark/data/ws/w1.txt") // 1

val w2RDD: RDD[String] = context.textFile("spark/data/ws/w2.txt") // 1

/**

* union: stacks two RDDs vertically; the element types of the two RDDs must match, and the result is not deduplicated
*
* Note: the merge is only logical - physically the data is not actually merged

*/

val unionRDD: RDD[String] = w1RDD.union(w2RDD)

println(unionRDD.getNumPartitions) // 2

unionRDD.foreach(println)

while (true){

}

}

}

11.Demo11Join

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo11Join {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Join算子演示")

val context = new SparkContext(conf)

//====================================================

//兩個kv類型的RDD之間的關(guān)聯(lián)

//通過scala中的集合構(gòu)建RDD

val rdd1: RDD[(String, String)] = context.parallelize(

List(

("1001", "尚平"),

("1002", "丁義杰"),

("1003", "徐昊宇"),

("1004", "包旭"),

("1005", "朱大牛"),

("1006","汪權(quán)")

)

)

val rdd2: RDD[(String, String)] = context.parallelize(

List(

("1001", "崩壞"),

("1002", "原神"),

("1003", "王者"),

("1004", "修仙"),

("1005", "學(xué)習(xí)"),

("1007", "敲代碼")

)

)

/**

* inner join: join
* left join: leftOuterJoin
* right join: rightOuterJoin
* full join: fullOuterJoin

*/

//內(nèi)連接

// val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)

// //加工一下RDD

// val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {

// case (id: String, (name: String, like: String)) => (id, name, like)

// }

// innerJoinRDD2.foreach(println)

//左連接

val leftJoinRDD: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)

//加工一下RDD

val leftJoinRDD2: RDD[(String, String, String)] = leftJoinRDD.map {

case (id: String, (name: String, Some(like))) => (id, name, like)

case (id: String, (name: String, None)) => (id, name, "無愛好")

}

leftJoinRDD2.foreach(println)

println("=================================")

//right join: left as an exercise (see the sketch after this listing)

//TODO: try rightOuterJoin yourself

//全連接

val fullJoinRDD: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)

//加工一下RDD

val fullJoinRDD2: RDD[(String, String, String)] = fullJoinRDD.map {

case (id: String, (Some(name), Some(like))) => (id, name, like)

case (id: String, (Some(name), None)) => (id, name, "無愛好")

case (id: String, (None, Some(like))) => (id, "無姓名", like)

}

fullJoinRDD2.foreach(println)

}

}
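A minimal sketch of the right join left as an exercise above, assuming the same rdd1 and rdd2 as in Demo11Join:

//rightOuterJoin keeps every key of the right RDD; the left value becomes an Option
val rightJoinRDD: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
val rightJoinRDD2: RDD[(String, String, String)] = rightJoinRDD.map {
  case (id: String, (Some(name), like: String)) => (id, name, like)
  case (id: String, (None, like: String)) => (id, "無姓名", like)
}
rightJoinRDD2.foreach(println)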

12.Demo12Student

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo12Student {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Join算子演示")

val context = new SparkContext(conf)

//====================================================

//Requirement: for the top 10 students by total score in the grade, list their scores in every subject

//read the score file

val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 讀取數(shù)據(jù)文件

.map((s: String) => s.split(",")) // 切分數(shù)據(jù)

.filter((arr: Array[String]) => arr.length == 3) // 過濾掉臟數(shù)據(jù)

.map {

//整理數(shù)據(jù),進行模式匹配取出數(shù)據(jù)

case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)

}

//計算每個學(xué)生的總分

val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {

case (sid: String, _: String, score: String) => (sid, score.toInt)

}.reduceByKey((x: Int, y: Int) => x + y)

//按照總分排序

val sumScoreTop10: Array[(String, Int)] = sumScoreWithSidRDD.sortBy(-_._2).take(10)

//取出前10的學(xué)生學(xué)號

val ids: Array[String] = sumScoreTop10.map(_._1)

//取出每個學(xué)生各科分數(shù)

val top10StuScore: RDD[(String, String, String)] = scoreRDD.filter {

case (id: String, _, _) => ids.contains(id)

}

top10StuScore.foreach(println)

}

}

13.Demo13MapValues

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo13MapValues {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Join算子演示")

val context = new SparkContext(conf)

//====================================================

//需求:統(tǒng)計總分年級排名前10的學(xué)生的各科分數(shù)

//讀取分數(shù)文件數(shù)據(jù)

val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 讀取數(shù)據(jù)文件

.map((s: String) => s.split(",")) // 切分數(shù)據(jù)

.filter((arr: Array[String]) => arr.length == 3) // 過濾掉臟數(shù)據(jù)

.map {

//整理數(shù)據(jù),進行模式匹配取出數(shù)據(jù)

case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)

}

//計算每個學(xué)生的總分

val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {

case (sid: String, _: String, score: String) => (sid, score.toInt)

}.reduceByKey((x: Int, y: Int) => x + y)

/**

* mapValues operator: also works on key-value RDDs
* it leaves the key unchanged and transforms only the value

*/

val resRDD: RDD[(String, Int)] = sumScoreWithSidRDD.mapValues(_ + 1000)

resRDD.foreach(println)

//等同于

val res2RDD: RDD[(String, Int)] = sumScoreWithSidRDD.map((kv: (String, Int)) => (kv._1, kv._2 + 1000))

}

}

14.Demo14mapPartition

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

object Demo14mapPartition {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("mapPartition算子演示")

val context = new SparkContext(conf)

//====================================================

//需求:統(tǒng)計總分年級排名前10的學(xué)生的各科分數(shù)

//讀取分數(shù)文件數(shù)據(jù)

val scoreRDD: RDD[String] = context.textFile("spark/data/ws/*") // 讀取數(shù)據(jù)文件

println(scoreRDD.getNumPartitions)

/**

* mapPartitions: processes one partition at a time, handing the whole partition to the function
*
* the iterator it receives holds the data of one partition

*/

// val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {

//

// println(s"====================當(dāng)前處理的分區(qū)====================")

// //這里寫的邏輯是作用在一個分區(qū)上的所有數(shù)據(jù)

// val words: Iterator[String] = itr.flatMap(_.split("\\|"))

// words

// })

// mapPartitionRDD.foreach(println)

scoreRDD.mapPartitionsWithIndex{

case (index:Int,itr: Iterator[String]) =>

println(s"當(dāng)前所處理的分區(qū)編號是:${index}")

itr.flatMap(_.split("\\|"))

}.foreach(println)

}

}

15.Demo15Actions

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo15Actions {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Action算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

/**

* transformation operators: turn one RDD into another RDD; they are lazy and need an action operator to trigger execution
*
* action operators: trigger job execution; each action operator triggers one job

*/

println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")

val studentsRDD: RDD[(String, String, String, String, String)] = studentRDD.map(_.split(","))

.map {

case Array(id: String, name: String, age: String, gender: String, clazz: String) =>

println("**************************** 數(shù)加防偽碼 ^_^ ********************************")

(id, name, age, gender, clazz)

}

println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")

// foreach其實就是一個action算子

// studentsRDD.foreach(println)

// println("="*100)

// studentsRDD.foreach(println)

// while (true){

//

// }

/**

* collect() action operator: pulls the RDD back to the driver as a Scala collection (an Array)

*

*/

val tuples: Array[(String, String, String, String, String)] = studentsRDD.collect()

}

}

16.Demo16Catch

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

import org.apache.spark.storage.StorageLevel

object Demo16Catch {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("Action算子演示")

val context = new SparkContext(conf)

//set the checkpoint path; in production this would be a path on HDFS

context.setCheckpointDir("spark/data/checkpoint")

//====================================================

val linesRDD: RDD[String] = context.textFile("spark/data/students.csv")

val splitRDD: RDD[Array[String]] = linesRDD.map(_.split(","))

//處理數(shù)據(jù)

val studentsRDD: RDD[(String, String, String, String, String)] = splitRDD.map {

case Array(id: String, name: String, age: String, gender: String, clazz: String) =>

(id, name, age, gender, clazz)

}

//對studentsRDD進行緩存

/**

* The catch with caching: whether the data is kept in memory only or in memory and on disk, it is all lost once the program ends.
*
* For that scenario Spark offers a solution: the RDD's runtime data can be persisted permanently on HDFS. This is called checkpointing and requires setting the checkpoint directory on the Spark context.

*/

// studentsRDD.cache() //默認情況下,是將數(shù)據(jù)緩存在內(nèi)存中

// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)

studentsRDD.checkpoint()

//統(tǒng)計每個班級的人數(shù)

val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map {

case (_, _, _, _, clazz: String) => (clazz, 1)

}

val clazzNumRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey(_ + _)

clazzNumRDD.saveAsTextFile("spark/data/clazz_num")

//統(tǒng)計性別的人數(shù)

val genderKVRDD: RDD[(String, Int)] = studentsRDD.map {

case (_, _, _, gender: String, _) => (gender, 1)

}

val genderNumRDD: RDD[(String, Int)] = genderKVRDD.reduceByKey(_ + _)

genderNumRDD.saveAsTextFile("spark/data/gender_num")

//

// while (true){

//

// }

}

}

17.Demo17SparkStandaloneSubmit

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo17SparkStandaloneSubmit {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

/**

* When this runs on the Linux cluster, setMaster must not be set here

*/

// conf.setMaster("local")

val sparkContext = new SparkContext(conf)

val linesRDD: RDD[String] = sparkContext.parallelize(List("java,hello,world", "hello,scala,spark", "java,hello,spark"))

val wordRDD: RDD[String] = linesRDD.flatMap(_.split(","))

val wordKVRDD: RDD[(String, Int)] = wordRDD.map((_, 1))

val countRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey(_ + _)

countRDD.foreach(println)

/**

* Package the project, copy it to the Spark cluster and run it in standalone mode

* standalone client

* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 100

*

* standalone cluster

* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 100

*

*/

}

}

18.Demo18SparkYarnSubmit

package com.shujia.core

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

* Because the job is submitted to YARN, it can read and write data on HDFS

*/

object Demo18SparkYarnSubmit {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

/**

* When submitting to YARN, this setting must again be left unset

*/

// conf.setMaster("local")

conf.setAppName("yarn submit")

val context = new SparkContext(conf)

//讀取hdfs上數(shù)據(jù)

val linesRDD: RDD[String] = context.textFile("/bigdata29/data/students.csv")

println("="*100)

println(s"分區(qū)數(shù)為:${linesRDD.getNumPartitions}")

println("="*100)

val classKVRDD: RDD[(String, Int)] = linesRDD.map((line: String) => {

val clazz: String = line.split(",")(4)

(clazz, 1)

})

//統(tǒng)計班級人數(shù)

val clazzNumRDD: RDD[(String, Int)] = classKVRDD.reduceByKey(_ + _)

//整理一下要寫到結(jié)果文件中的數(shù)據(jù)格式

val resRDD: RDD[String] = clazzNumRDD.map((kv: (String, Int)) => s"${kv._1}\t${kv._2}")

//刪除已經(jīng)存在的路徑

val hadoopConf = new Configuration()

val fileSystem: FileSystem = FileSystem.get(hadoopConf)

//判斷路徑是否存在

if(fileSystem.exists(new Path("/bigdata29/sparkout1"))){

fileSystem.delete(new Path("/bigdata29/sparkout1"),true)

}

//將RDD中的數(shù)據(jù)保存到HDFS上的文件中

resRDD.saveAsTextFile("/bigdata29/sparkout1")

/**

* spark-submit --class com.shujia.core.Demo18SparkYarnSubmit --master yarn --deploy-mode client spark-1.0.jar

*/

}

}

19.Demo19PI

package com.shujia.core

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo19PI {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

/**

* When submitting to YARN, this setting must again be left unset

*/

// conf.setMaster("local")

conf.setAppName("yarn submit")

val context = new SparkContext(conf)

//設(shè)置生成點的個數(shù) 10000

val list: Range.Inclusive = 0 to 1000000000

//將scala的序列集合變成rdd

val rangeRDD: RDD[Int] = context.parallelize(list)

//隨機生成正方形內(nèi)的點

val dianRDD: RDD[(Double, Double)] = rangeRDD.map((i: Int) => {

val x: Double = Random.nextDouble() * 2 - 1

val y: Double = Random.nextDouble() * 2 - 1

(x, y)

})

// println(dianRDD.count())

//取出圓中點的個數(shù)

val yuanZuoRDD: RDD[(Double, Double)] = dianRDD.filter {

case (x: Double, y: Double) =>

x * x + y * y < 1

}

// println(yuanZuoRDD.count())

//計算PI

println("="*100)

println(s"PI的值為:${(yuanZuoRDD.count().toDouble / dianRDD.count()) * 4}")

println("="*100)

/**

* spark-submit --class com.shujia.core.Demo19PI --master yarn --deploy-mode client spark-1.0.jar

*/

}

}

20.Demo20Accumulator

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

import org.apache.spark.util.LongAccumulator

object Demo20Accumulator {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("map算子演示")

val context = new SparkContext(conf)

//====================================================

val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")

// var count = 0

// studentRDD.foreach((line:String)=>{

// count+=1

// println("-------------------------")

// println(count)

// println("-------------------------")

// })

// println(s"count的值為:${count}")

/**

* Accumulators
*
* Created by the SparkContext.
* Notes:
* 1. Accumulator updates run inside RDD operations, which execute on the Executors, so an action operator is needed to trigger the job before the updates happen.
* 2. One RDD cannot be used inside another RDD's operations.
* 3. The SparkContext cannot be used inside an RDD, because it is not serializable and cannot be shipped over the network to the Executors.

*/

// val accumulator: LongAccumulator = context.longAccumulator

// studentRDD.foreach((line:String)=>{

// accumulator.add(1)

// })

// studentRDD.map((line:String)=>{

// accumulator.add(1)

// }).collect()

// println(s"accumulator的值為:${accumulator.value}")

// val value: RDD[RDD[(String, String)]] = studentRDD.map((stuLine: String) => {

// scoreRDD.map((scoreLine: String) => {

// val strings: Array[String] = scoreLine.split(",")

// val strings1: Array[String] = stuLine.split(",")

// val str1: String = strings.mkString("|")

// val str2: String = strings1.mkString("|")

// (str1, str2)

// })

// })

// value.foreach(println)

// val value: RDD[RDD[String]] = studentRDD.map((stuLine: String) => {

// val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")

// scoreRDD

// })

// value.foreach(println)

}

}
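Since most of the accumulator code above is commented out, here is a minimal runnable sketch of the notes, assuming the same context and studentRDD as in Demo20Accumulator:

import org.apache.spark.util.LongAccumulator

val rowCounter: LongAccumulator = context.longAccumulator("rowCounter")
studentRDD.foreach((_: String) => rowCounter.add(1)) //foreach is an action, so the add calls actually run on the Executors
println(s"rows counted: ${rowCounter.value}") //read the value on the Driver only after the action has finished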

21.Demo21Broadcast

package com.shujia.core

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

object Demo21Broadcast {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

conf.setMaster("local")

conf.setAppName("廣播變量演示")

val context = new SparkContext(conf)

//====================================================

//read the student file with plain Scala and build a Map keyed by student id; this is an ordinary variable on the Driver side

val studentsMap: Map[String, String] = Source.fromFile("spark/data/students.csv")

.getLines()

.toList

.map((line: String) => {

val infos: Array[String] = line.split(",")

val stuInfo: String = infos.mkString(",")

(infos(0), stuInfo)

}).toMap

val scoresRDD: RDD[String] = context.textFile("spark/data/score.txt")

/**

* Turn studentsMap into a broadcast variable so that every Executor that will perform the join has one copy of studentsMap.
* This avoids shipping a separate copy with every Task, so fetching is faster and so is execution.
*
* Broadcast the large variable.

*/

val studentsMapBroadcast: Broadcast[Map[String, String]] = context.broadcast(studentsMap)

/**

* Join the score RDD read by Spark with the external student Map.
* Iterate over scoresRDD and attach the student info with the matching student id.

*/

// val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {

// val id: String = score.split(",")(0)

// //使用學(xué)號到學(xué)生map集合中獲取學(xué)生信息

// val studentInfo: String = studentsMap.getOrElse(id, "無學(xué)生信息")

// (score, studentInfo)

// })

// resMapRDD.foreach(println)

/**

* 使用廣播變量進行關(guān)聯(lián)

*/

val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {

val id: String = score.split(",")(0)

val stuMap: Map[String, String] = studentsMapBroadcast.value

//使用學(xué)號到學(xué)生map集合中獲取學(xué)生信息

val studentInfo: String = stuMap.getOrElse(id, "無學(xué)生信息")

(score, studentInfo)

})

resMapRDD.foreach(println)

}

}

2.Spark-sql

Demo1WordCount

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

object Demo1WordCount {

def main(args: Array[String]): Unit = {

/**

* In newer versions of Spark, writing Spark SQL requires the new entry point class: SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("wc spark sql")

.getOrCreate()

/**

* The core data type of Spark SQL differs from Spark Core's
*
* 1. Read the data into a DataFrame, which is essentially a table

*/

val linesDF: DataFrame = sparkSession.read

.format("csv") //指定讀取數(shù)據(jù)的格式

.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割

.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號

.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑

// println(linesDF)

// linesDF.show() //查看DF中的數(shù)據(jù)內(nèi)容(表內(nèi)容)

// linesDF.printSchema() //查看DF表結(jié)構(gòu)

/**

* 2. SQL cannot be run directly against a DataFrame; it must first be registered as a temporary view, and then SQL can be written for analysis

*/

linesDF.createOrReplaceTempView("lines") // give the view a name so the SQL below can query it

/**

* 3. Now the SQL statement can be written (count the words)
* Spark SQL is largely compatible with Hive SQL

*/

val resDF: DataFrame = sparkSession.sql(

"""

|select

|t1.word as word,

|count(1) as counts

|from

|(select

| explode(split(line,'\\|')) as word from lines) t1

| group by t1.word

|""".stripMargin)

/**

* 4、將計算的結(jié)果DF保存到HDFS上

*/

val resDS: Dataset[Row] = resDF.repartition(1)

resDS.write

.format("csv") //指定輸出數(shù)據(jù)文件格式

.option("sep","\t") // 指定列之間的分隔符

.mode(SaveMode.Overwrite) // 使用SaveMode枚舉類,設(shè)置為覆蓋寫

.save("spark/data/sqlout1") // 指定輸出的文件夾

}

}

Demo2DSLWordCount

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo2DSLWordCount {

def main(args: Array[String]): Unit = {

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("wc spark sql")

.getOrCreate()

/**

* spark sql和spark core的核心數(shù)據(jù)類型不太一樣

*

* 1、讀取數(shù)據(jù)構(gòu)建一個DataFrame,相當(dāng)于一張表

*/

val linesDF: DataFrame = sparkSession.read

.format("csv") //指定讀取數(shù)據(jù)的格式

.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割

.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號

.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑

/**

* DSL: an SQL-like API that sits between plain code and pure SQL
*
* In the DSL API, Spark turns the pure-SQL functions into Scala functions via implicit conversions;
* to use these functions in the DSL, the implicits must be imported

*

*/

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

import org.apache.spark.sql.functions._

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理

import sparkSession.implicits._

// linesDF.select(explode(split($"line","\\|")) as "word")

// .groupBy($"word")

// .count().show()

val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word")

.groupBy($"word")

.agg(count($"word") as "counts")

/**

* 保存數(shù)據(jù)

*/

resultDF

.repartition(1)

.write

.format("csv")

.option("sep","\t")

.mode(SaveMode.Overwrite)

.save("spark/data/sqlout2")

}

}

Demo3DSLAPI

package com.shujia.sql

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo3DSLAPI {

def main(args: Array[String]): Unit = {

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("dsl語法api演示")

.config("spark.sql.shuffle.partitions",1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

import org.apache.spark.sql.functions._

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理

import sparkSession.implicits._

/**

* DSL api

*/

//新版本的讀取方式,讀取一個json數(shù)據(jù),不需要手動指定列名

// val stuDF1: DataFrame = sparkSession.read

// .json("spark/data/students.json")

//以前版本讀取方式,靈活度要高一些

val stuDF: DataFrame = sparkSession.read

.format("json")

.load("spark/data/students.json")

// stuDF2.show(100, truncate = false) //傳入展示總條數(shù),并完全顯示數(shù)據(jù)內(nèi)容

/**

* select 函數(shù):選擇數(shù)據(jù)【字段】,和純sql語句中select意思基本是一樣,在數(shù)據(jù)的前提上選擇要留下的列

*

*/

//根據(jù)字段的名字選擇要查詢的字段

// stuDF.select("id","name","age").show(1)

// //根據(jù)字段的名字選擇要查詢的字段,selectExpr 可以傳入表達式字符串形式

// stuDF.selectExpr("id","name","age","age + 1 as new_age").show()

//使用隱式轉(zhuǎn)換中的$函數(shù)將字段變成一個對象

// stuDF.select($"id",$"name",$"age").show(10)

//使用對象做處理

// stuDF.select($"id",$"name",$"age" + 1 as "new_age").show(10)

//可以在select中使用sql的函數(shù)

//下面的操作等同于sql:select id,name,age+1 as new_age,substring(clazz,0,2) as km from lines;

// stuDF.select($"id",$"name",$"age" + 1 as "new_age",substring($"clazz",0,2) as "km").show(10)

/**

* where 函數(shù):過濾數(shù)據(jù)

*/

//直接將sql中where語句以字符串的形式傳參

// stuDF.where("gender='女' and age=23").show()

//使用$列對象的形式過濾

// =!= 不等于

// === 等于

// stuDF.where($"gender" === "女" and $"age" === 23).show()

// stuDF.where($"gender" =!= "男" and $"age" === 23).show()

//過濾文科的學(xué)生

// stuDF.where(substring($"clazz", 0, 2) === "文科").show()

/**

* groupBy: grouping function
* agg: aggregation function
* grouping and aggregation are used together
* the DataFrame produced by a group-aggregate contains only the grouping columns and the aggregated columns
* columns that are not grouped on cannot appear in the select

*/

//根據(jù)班級分組,求每個班級的人數(shù)和平均年齡

// stuDF.groupBy($"clazz")

// .agg(count($"clazz") as "number",round(avg($"age"),2) as "avg_age").show()

/**

* orderBy: 排序

*/

// stuDF.groupBy($"clazz")

// .agg(count($"clazz") as "number")

// .orderBy($"number").show()

/**

* join: 表關(guān)聯(lián)

*/

val scoreDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,subject_id STRING,score INT")

.load("spark/data/score.txt")

// scoreDF.show()

//關(guān)聯(lián)場景1:所關(guān)聯(lián)的字段名字不一樣的時候

// stuDF.join(scoreDF, $"id" === $"sid", "inner").show()

//關(guān)聯(lián)場景2:所關(guān)聯(lián)的字段名字一樣的時候

// stuDF.join(scoreDF,"id").show()

/**

* Window functions
* Find the top 3 students by total score in each class
*
* A window function does not change the number of rows; its result is added as a new column
* withColumn adds a new column

*/

val joinStuAndScoreWithIDDF: DataFrame = stuDF.join(scoreDF, "id")

joinStuAndScoreWithIDDF.groupBy($"id", $"clazz") //根據(jù)學(xué)號和班級一起分組

.agg(sum($"score") as "sumScore") //計算總分

.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))

//.select($"id", $"clazz", $"sumScore", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc) as "rn")

.where($"rn" <= 3)

.show()

}

}

Demo4DataSourceAPI

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo4DataSourceAPI {

def main(args: Array[String]): Unit = {

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("dsl語法api演示")

.config("spark.sql.shuffle.partitions",1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理

/**

* 讀取csv格式的數(shù)據(jù),默認是以英文逗號分割的

*

*/

// val stuCsvDF: DataFrame = sparkSession.read

// .format("csv")

// .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

// .option("sep", ",")

// .load("spark/data/students.csv")

// //求每個班級的人數(shù),保存到文件中

// stuCsvDF.groupBy($"clazz")

// .agg(count($"clazz") as "number")

// .write

// .format("csv")

// .option("sep",",")

// .mode(SaveMode.Overwrite)

// .save("spark/data/souceout1")

/**

* read json-format data: since json records are key-value pairs, the keys automatically become column names and the values become column values, so no schema has to be set by hand

*/

val stuJsonDF: DataFrame = sparkSession.read

.format("json")

.load("spark/data/students2.json")

// //統(tǒng)計每個性別的人數(shù)

// stuJsonDF.groupBy($"gender")

// .agg(count($"gender") as "number")

// .write

// .format("json")

// .mode(SaveMode.Overwrite)

// .save("spark/data/jsonout1")

/**

* parquet
* the achievable compression ratio is governed by the information entropy of the data

*/

// stuJsonDF.write

// .format("parquet")

// .mode(SaveMode.Overwrite)

// .save("spark/data/parquetout2")

//讀取parquet格式文件的時候,也是不需要手動指定表結(jié)構(gòu)

// val stuParquetDF: DataFrame = sparkSession.read

// .format("parquet")

// .load("spark/data/parquetout1/part-00000-c5917bb6-172b-49bd-a90c-90b7f09b69d6-c000.snappy.parquet")

// stuParquetDF.show()

/**

* 讀取數(shù)據(jù)庫中的數(shù)據(jù),mysql

*

*/

val jdDF: DataFrame = sparkSession.read

.format("jdbc")

.option("url", "jdbc:mysql://192.168.220.100:3306")

.option("dbtable", "bigdata29.jd_goods")

.option("user", "root")

.option("password", "123456")

.load()

jdDF.show(10,truncate = false)

}

}

Demo5RDDToDF

package com.shujia.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo5RDDToDF {

def main(args: Array[String]): Unit = {

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("RDD和DF互相轉(zhuǎn)換演示")

.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換

import sparkSession.implicits._

/**

* 使用SparkContext讀取數(shù)據(jù)封裝成RDD

*

* SparkSession包含了SparkContext

*/

//使用SparkSession獲取SparkContext

val sc: SparkContext = sparkSession.sparkContext

val linesRDD: RDD[String] = sc.textFile("spark/data/students.csv")

val studentsRDD: RDD[(String, String, Int, String, String)] = linesRDD.map((line: String) => line.split(","))

.map {

//1500100001,施笑槐,22,女,文科六班

case Array(id: String, name: String, age: String, gender: String, clazz: String) =>

(id, name, age.toInt, gender, clazz)

}

/**

* RDD轉(zhuǎn)DF

*/

val studentsDF: DataFrame = studentsRDD.toDF("id", "name", "age", "gender", "clazz")

studentsDF.createOrReplaceTempView("students")

val resultDF: DataFrame = sparkSession.sql(

"""

|select

|clazz,

|count(1) as number

|from

|students

|group by clazz

|""".stripMargin)

/**

* In the Row data type, the integer values come back as Long and the fractional values as Double
* convert the DataFrame back to an RDD

*/

val studentsRDD2: RDD[Row] = resultDF.rdd

// studentsRDD2.map((row:Row)=>{

// val clazz: String = row.getAs[String]("clazz")

// val number: Long = row.getAs[Long]("number")

// (clazz,number)

// }).foreach(println)

studentsRDD2.map{

case Row(clazz:String,number:Long)=>

(clazz,number)

}.foreach(println)

}

}

Demo6Window

package com.shujia.sql

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.{DataFrame, SparkSession}

/**

* Window functions
* Aggregate window functions: sum, count, avg, min, max
* Ranking/offset window functions: row_number, rank, dense_rank, lag (previous row), lead (next row)

*/

object Demo6Window {

def main(args: Array[String]): Unit = {

//創(chuàng)建SparkSession對象

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("開窗函數(shù)DSL API演示")

.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

import org.apache.spark.sql.functions._

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換

import sparkSession.implicits._

//學(xué)生表

val studentsDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

.load("spark/data/students.csv")

//成績表

val scoresDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("sid STRING,subject_id STRING,score INT")

.load("spark/data/score.txt")

//科目表

val subjectDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("subject_id STRING,subject_name STRING,subject_sum_score INT")

.load("spark/data/subject.csv")

//將學(xué)生數(shù)據(jù)與成績數(shù)據(jù)進行關(guān)聯(lián)

val joinDF: DataFrame = studentsDF.join(scoresDF, $"id" === $"sid")

// joinDF.show(10)

/**

* 1. For the top 10 students by total score in the grade, list their scores in each subject
*
* Without an orderBy in the window, all rows of the window are aggregated into one result;
* with an orderBy, the aggregation is applied cumulatively from top to bottom

*

*/

joinDF

// sum(score) over(partition by id ) as sumScore

.withColumn("sumScore", sum($"score") over Window.partitionBy($"id"))

.orderBy($"sumScore".desc)

.limit(60)

//.show(60)

/**

* 3、統(tǒng)計每科都及格的學(xué)生

*/

scoresDF

.join(subjectDF, "subject_id")

.where($"score" >= $"subject_sum_score" * 0.6)

//統(tǒng)計學(xué)生及格的科目數(shù)

.withColumn("jiGeCounts", count($"sid") over Window.partitionBy($"sid"))

.where($"jiGeCounts" === 6)

// .show(100)

/**

* 2、統(tǒng)計總分大于年級平均分的學(xué)生

*/

joinDF

//計算每個學(xué)生的總分,新增一列

.withColumn("sumScore", sum($"score") over Window.partitionBy($"id"))

.withColumn("avgScore", avg($"sumScore") over Window.partitionBy(substring($"clazz", 0, 2)))

.where($"sumScore" > $"avgScore")

// .show(200)

/**

* 統(tǒng)計每個班級的每個名次之間的分數(shù)差

*

*/

joinDF

.groupBy($"id", $"clazz")

.agg(sum($"score") as "sumScore")

//開窗,班級開窗,總分降序排序,排個名次

.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))

//開窗,取出前一名的總分

.withColumn("front_score", lag($"sumScore",1,750) over Window.partitionBy($"clazz").orderBy($"sumScore".desc))

.withColumn("cha",$"front_score" - $"sumScore")

.show(100)

}

}

Demo7BurksTest1

package com.shujia.sql

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Demo7BurksTest1 {

def main(args: Array[String]): Unit = {

//創(chuàng)建SparkSession對象

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("公司營收額數(shù)據(jù)需求演示")

.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

import org.apache.spark.sql.functions._

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換

import sparkSession.implicits._

//讀取數(shù)據(jù)

val burksDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("burk STRING,year STRING" +

",tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE" +

",tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE" +

",tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")

.load("spark/data/burks.txt")

/**

* 1. For each company and year, compute the month-by-month cumulative revenue: unpivot the columns to rows --> sum window function
* Output columns:
* company code, year, month, revenue of the month, cumulative revenue

*

*/

//純sql的方式實現(xiàn)

// burksDF.createOrReplaceTempView("burks")

// sparkSession.sql(

// """

// |select

// |t1.burk as burk,

// |t1.year as year,

// |t1.month as month,

// |t1.tsl as tsl,

// |sum(t1.tsl) over(partition by t1.burk,t1.year order by t1.month) as leijia

// |from

// |(select

// | burk,

// | year,

// | month,

// | tsl

// | from

// | burks

// | lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,tsl

// |) t1

// |""".stripMargin).show()

// 使用DSL API實現(xiàn)

val m: Column = map(

expr("1"), $"tsl01",

expr("2"), $"tsl02",

expr("3"), $"tsl03",

expr("4"), $"tsl04",

expr("5"), $"tsl05",

expr("6"), $"tsl06",

expr("7"), $"tsl07",

expr("8"), $"tsl08",

expr("9"), $"tsl09",

expr("10"), $"tsl10",

expr("11"), $"tsl11",

expr("12"), $"tsl12"

)

burksDF

.select($"burk",$"year",explode(m) as Array("month","tsl"))

//按月累計

.withColumn("leijia",sum($"tsl") over Window.partitionBy($"burk",$"year").orderBy($"month"))

.show()

/**

* 2. For each company, compute the year-over-year growth rate for each month: unpivot to rows --> lag window function
* company code, year, month, growth rate (revenue of the month / revenue of the same month last year - 1)

*/

}

}

Demo8SubmitYarn

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo8SubmitYarn {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.config("spark.sql.shuffer.partitions", 1)

.getOrCreate()

import sparkSession.implicits._

import org.apache.spark.sql.functions._

val stuendsDF: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

.load("/bigdata29/spark_in/data/student")

val genderCountsDF: DataFrame = stuendsDF.groupBy($"gender")

.agg(count($"gender") as "counts")

genderCountsDF.write.format("csv").option("sep",",").mode(SaveMode.Overwrite).save("/bigdata29/spark_out/out2")

}

}

Demo9SparkOnHive

package com.shujia.sql

import org.apache.spark.sql.SparkSession

object Demo9SparkOnHive {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("spark讀取hive數(shù)據(jù)")

.enableHiveSupport()

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import sparkSession.implicits._

import org.apache.spark.sql.functions._

sparkSession.sql("use bigdata29")

sparkSession.sql("select clazz,count(1) as counts from students group by clazz").show()

}

}

Demo10Student

package com.shujia.sql

import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}

/**

* 1、行列轉(zhuǎn)換

*

* 表1

* 姓名,科目,分數(shù)

* name,item,score

* 張三,數(shù)學(xué),33

* 張三,英語,77

* 李四,數(shù)學(xué),66

* 李四,英語,78

*

*

* 表2

* 姓名,數(shù)學(xué),英語

* name,math,english

* 張三,33,77

* 李四,66,78

*

* 1、將表1轉(zhuǎn)化成表2

* 2、將表2轉(zhuǎn)化成表1

*/

object Demo10Student {

def main(args: Array[String]): Unit = {

//創(chuàng)建SparkSession對象

/**

* 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("行列互相轉(zhuǎn)換演示")

.config("spark.sql.shuffle.partitions", 1) //默認分區(qū)的數(shù)量是200個

.getOrCreate()

//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)

import org.apache.spark.sql.functions._

//導(dǎo)入另一個隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進行處理,如果需要做RDD和DF之間的轉(zhuǎn)換

import sparkSession.implicits._

/**

* 列轉(zhuǎn)行

*/

val tb1DF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("name STRING,item STRING,score INT")

.load("/bigdata29/tb1.txt")

/*

張三,數(shù)學(xué),33

張三,英語,77

李四,數(shù)學(xué),66

李四,英語,78

張三,33,77

李四,66,78

*/

// val res1DF: DataFrame = tb1DF.groupBy($"name")

// .agg(

// sum(when($"item" === "數(shù)學(xué)", $"score").otherwise(0)) as "math",

// sum(when($"item" === "英語", $"score").otherwise(0)) as "english")

// res1DF.write

// .format("csv")

// .option("sep",",")

// .mode(SaveMode.Overwrite)

// .save("/bigdata29/out7")

// res1DF.show()

/**

* 行轉(zhuǎn)列

*/

// val tb2DF: DataFrame = sparkSession.read

// .format("csv")

// .option("sep", ",")

// .schema("name STRING,math STRING,english INT")

// .load("/bigdata29/out7/*")

//

// val m: Column = map(

// expr("'數(shù)學(xué)'"), $"math",

// expr("'語文'"), $"english"

// )

// tb2DF.select($"name",explode(m) as Array("item","score")).show()

}

}

Demo11UDF

package com.shujia.sql

import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo11UDF {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("udf函數(shù)演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import org.apache.spark.sql.functions._

/**

* 1、在使用DSL的時候使用自定義函數(shù)

*/

val studentsDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

.load("spark/data/students.csv")

// studentsDF.show()

//編寫自定義函數(shù)

//udf中編寫的是scala代碼

val shujia_fun1: UserDefinedFunction = udf((str: String) => "數(shù)加:" + str)

// studentsDF.select(shujia_fun1($"clazz")).show()

/**

* 1、使用SQL語句中使用自定函數(shù)

*/

studentsDF.createOrReplaceTempView("students")

//將自定義的函數(shù)變量注冊成一個函數(shù)

sparkSession.udf.register("shujia_str",shujia_fun1)

sparkSession.sql(

"""

|select clazz,shujia_str(clazz) as new_clazz from students

|""".stripMargin).show()

}

}

Demo12ShuJiaStr

package com.shujia.sql

import org.apache.hadoop.hive.ql.exec.UDF

class Demo12ShuJiaStr extends UDF {

def evaluate(str: String): String = {

"shujia: " + str

}

}

/**

* 1. Package the class into a jar and put it under Spark's jars directory on the Linux machine
* 2. Open the spark-sql client
* 3. Create a function from the UDF class in the uploaded jar:
* create function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr';

*/
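As a hedged sketch, the same function can also be registered and used from code rather than the spark-sql client, assuming Hive support is enabled and the jar is already on the classpath (the session setup below is an assumption, not part of the original listing):

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("hive udf sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("create temporary function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr'")
spark.sql("select clazz, shujia_str(clazz) as new_clazz from students").show()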

Demo13SheBao

package com.shujia.sql

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo13SheBao {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("作業(yè)社保演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import org.apache.spark.sql.functions._

import sparkSession.implicits._

//讀取數(shù)據(jù)

val sheBaoDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,burk STRING,sdate STRING")

.load("spark/data/shebao.txt")

//統(tǒng)計每個員工的工作經(jīng)歷

sheBaoDF

//取出員工上一個月所在公司

.withColumn("last_burk", lag($"burk", 1) over Window.partitionBy($"id").orderBy($"sdate"))

//.show()

//在每一行后新增一列,標(biāo)記列,如果換工作了,標(biāo)記1 否則標(biāo)記0

.withColumn("flag", when($"burk" === $"last_burk", 0).otherwise(1))//.show()

//以用戶開窗,將后面的flag值加起來

.withColumn("tmp",sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))//.show()

.groupBy($"id",$"burk",$"tmp")

.agg(min($"sdate") as "start_date",max($"sdate") as "end_start")

.show(100)

}

}

package com.shujia.sql

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo14MaYi {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("作業(yè)社保演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import org.apache.spark.sql.functions._

import sparkSession.implicits._

//用戶碳排放量表

val ant_user_low_carbon: DataFrame = sparkSession.read

.format("csv")

.option("sep", "\t")

.schema("user_id STRING,data_dt STRING,low_carbon DOUBLE")

.load("spark/data/ant_user_low_carbon.txt")

val ant_plant_carbon: DataFrame = sparkSession.read

.format("csv")

.option("sep", "\t")

.schema("plant_id STRING,plant_name STRING,plant_carbon DOUBLE")

.load("spark/data/ant_plant_carbon.txt")

// ant_user_low_carbon.show()

// ant_plant_carbon.show()

/**

* Ant Forest plant-claim statistics
* Problem: assume low-carbon data (user_low_carbon) has been recorded since 2017-01-01, and that before 2017-10-01 every user who qualified claimed one "p004-胡楊" (poplar),
* with the remaining energy all spent on claiming "p002-沙柳" (sand willow).
* As of October 1st, list the top 10 users by cumulative "p002-沙柳" claims, and how many more sand willows each claimed than the next-ranked user.
* The expected output follows the table layout below:

*/

//the user-energy table and the plant-energy table share no join key, so the energy cost of the poplar and the sand willow is first pulled out of the plant table into plain variables

val huYangCarbon: Double = ant_plant_carbon

//過濾條件是胡楊

.where($"plant_name" === "胡楊")

//選擇能量的一列

.select($"plant_carbon")

//轉(zhuǎn)rdd

.rdd

//轉(zhuǎn)scala數(shù)組

.collect()

.map {

case Row(plant_carbon: Double) => plant_carbon

}.head

val shaLiuCarbon: Double = ant_plant_carbon

//filter condition: sand willow (沙柳)

.where($"plant_name" === "沙柳")

//選擇能量的一列

.select($"plant_carbon")

//轉(zhuǎn)rdd

.rdd

//轉(zhuǎn)scala數(shù)組

.collect()

.map {

case Row(plant_carbon: Double) => plant_carbon

}.head

println(s"胡楊所需的碳排放量:${huYangCarbon},沙柳所需要的碳排放量:${shaLiuCarbon}")

println("---------------------------------------------------------------------------")

ant_user_low_carbon

//取出2017年1月1日到2017年10月1日

.where($"data_dt" >= "2017/1/1" and $"data_dt" <= "2017/10/1")

//按照用戶分組,求用戶的總的排放量

.groupBy($"user_id")

.agg(sum($"low_carbon") as "sum_low_carbon")

//判斷能量是否滿足一個胡楊的能量,如果滿足直接減去胡楊的能量,得到剩余的能量

.withColumn("shengYu_carbon", when($"sum_low_carbon" >= huYangCarbon, $"sum_low_carbon" - huYangCarbon).otherwise($"sum_low_carbon"))

//計算剩余的能量可以領(lǐng)取多少個沙柳

.withColumn("number", floor($"shengYu_carbon" / shaLiuCarbon))

//.show()

//獲取后一名用戶的領(lǐng)取沙柳的個數(shù)

.withColumn("lead_number", lead($"number", 1, 0) over Window.orderBy($"number".desc))

//.show()

.withColumn("duo", $"number" - $"lead_number")

.limit(10)

//.show()

/**

* Ant Forest low-carbon user ranking analysis
* Problem: return the daily records from user_low_carbon where:
* in 2017, over a stretch of three or more consecutive days,
* the user's daily carbon reduction (low_carbon) exceeded 100g each day.
* Return the matching rows of user_low_carbon.
* For example, user u_002 qualifies because on the four consecutive days 2017/1/2 - 2017/1/5 the daily carbon reduction is at least 100g:

*/

//("user_id STRING,data_dt STRING,low_carbon DOUBLE")

ant_user_low_carbon

//過濾出2017年的數(shù)據(jù)

.where(substring($"data_dt", 0, 4) === "2017")

//計算用戶每天積攢的能量 用戶id和日期一起分組

.groupBy($"user_id", $"data_dt")

.agg(sum($"low_carbon") as "sum_low_carbon")

//過濾掉能量小于等于100的

.where($"sum_low_carbon" > 100)

//window by user id, order by date ascending, and number the rows with row_number

.withColumn("rn", row_number() over Window.partitionBy($"user_id").orderBy($"data_dt"))

//subtract the row number (as days) from the date: rows from consecutive days collapse to the same value

.withColumn("flag_dt", date_sub(regexp_replace($"data_dt", "/", "-"), $"rn"))

//count the consecutive days (rows sharing the same user_id and flag_dt)

.withColumn("lianxu_days", count($"data_dt") over Window.partitionBy($"user_id", $"flag_dt"))

//keep only streaks of 3 or more consecutive days

.where($"lianxu_days" >= 3)

//關(guān)聯(lián)流水記錄表,得到每個符合條件的數(shù)據(jù)

.join(ant_user_low_carbon,List("user_id","data_dt"))

.select($"user_id",$"data_dt",$"low_carbon")

.show(100)

}

}

Test

package com.shujia.sql

import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.spark.sql.{DataFrame, SparkSession}

object Test {

def main(args: Array[String]): Unit = {

//1、創(chuàng)建Spark sql環(huán)境

val spark: SparkSession = SparkSession

.builder()

.master("local[2]")

.appName("sql")

.config("spark.sql.shuffle.partitions", 1) //默認在集群中時200個

.getOrCreate()

import org.apache.spark.sql.functions._

//定義UDF

val str_split: UserDefinedFunction = udf((line: String) => {

"數(shù)加:"+line

})

// val value2: Broadcast[UserDefinedFunction] = spark.sparkContext.broadcast(str_split)

// spark.udf.register("str_split", str_split)

/**

* 1、在使用DSL的時候使用自定義函數(shù)

*/

val studentsDF: DataFrame = spark.read

.format("csv")

.option("sep", ",")

.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

.load("spark/data/students.csv")

//在使用DSL時,使用自定義函數(shù)

// studentsDF.select(str_split($"clazz")).show()

studentsDF.createOrReplaceTempView("lines")

spark.udf.register("str_split", (line: String) => "數(shù)加:"+line)

spark.sql(

"""

|select str_split(clazz) from lines

|""".stripMargin).show()

/**

* 在 sql中使用自定義函數(shù)

*/

// studentsDF.createOrReplaceTempView("lines")

//

// //注冊自定義函數(shù)

// spark.udf.register("str_split", (line: String) => "數(shù)加:"+line)

//

// spark.sql(

// """

// |select * from

// |lines

// |lateral view explode(str_split(line,',')) T as word

// |

// |""".stripMargin).show()

}

}

3.Spark-streaming

Demo1WordCount

package com.shujia.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo1WordCount {

def main(args: Array[String]): Unit = {

/**

* spark core: SparkContext --> core data type: RDD
* spark sql: SparkSession --> core data type: DataFrame
* spark streaming: StreamingContext --> core data type: DStream (built on RDDs)
*
* Creating a Spark Streaming environment requires a StreamingContext object

*/

val conf = new SparkConf()

conf.setMaster("local[2]")

conf.setAppName("spark streaming 單詞統(tǒng)計案例")

val sparkContext = new SparkContext(conf)

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))

val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))

val resultDS: DStream[(String, Int)] = wordsKVDS.reduceByKey((x: Int, y: Int) => x + y)

resultDS.print()

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo2UpdateStateByKey

package com.shujia.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo2UpdateStateByKey {

def main(args: Array[String]): Unit = {

/**

* spark core: SparkContext --> core data type: RDD
* spark sql: SparkSession --> core data type: DataFrame
* spark streaming: StreamingContext --> core data type: DStream (built on RDDs)
*
* Creating a Spark Streaming environment requires a StreamingContext object

*/

//spark streaming depends on the spark core environment

//because the core data type of spark streaming wraps RDDs underneath

val conf = new SparkConf()

conf.setMaster("local[2]")

conf.setAppName("spark streaming 單詞統(tǒng)計案例1")

val sparkContext = new SparkContext(conf)

// def this(sparkContext: SparkContext, batchDuration: Duration) //傳入一個spark core 環(huán)境對象以及一個接收時間的范圍

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

//設(shè)置checkpoint路徑

streamingContext.checkpoint("spark/data/stream_state_wc_checkpoint")

/**

* Because Spark Streaming is a near-real-time compute engine, once the program is written it keeps running and receiving data until an exception occurs or it is stopped manually

*

* 1. Here we use the nc tool to produce a continuous stream of data on a port that Spark monitors

* 2. yum install -y nc to install the nc tool

* 3、nc -lk xxxx

*/

val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))

val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))

val res2DS: DStream[(String, Int)] = wordsKVDS.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {

val currentCount: Int = seq.sum

val count: Int = option.getOrElse(0)

Option(currentCount + count)

})

res2DS.print()

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo3ReduceByKeyAndWindow

package com.shujia.streaming

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo3ReduceByKeyAndWindow {

def main(args: Array[String]): Unit = {

/**

* 舊版本創(chuàng)建sparkContext的方式

*/

// val conf = new SparkConf()

// conf.setMaster("local[2]")

// conf.setAppName("spark streaming 單詞統(tǒng)計案例1")

// val sparkContext = new SparkContext(conf)

// // def this(sparkContext: SparkContext, batchDuration: Duration) //傳入一個spark core 環(huán)境對象以及一個接收時間的范圍

// val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

/**

* 新版本中推薦使用SparkSession對象來獲取

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("窗口演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

//創(chuàng)建SparkStreaming對象

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

//將數(shù)據(jù)封裝成kv格式,將來可以調(diào)用kv類型的算子來進行操作

val wordsKVDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))

//1. If you only need the current batch, just use reduceByKey

//2. If you need to aggregate over all data since the start, use the stateful operator updateStateByKey together with a checkpoint

//3. If you need to compute over a recent time range, use a window operator such as reduceByKeyAndWindow

/**

* Sliding window

*/

// val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(

// (x: Int, y: Int) => x + y,

// Durations.seconds(15), // window length

// Durations.seconds(5)) // slide interval

/**

* Tumbling window (window length equals the slide interval)

*/

val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(

(x: Int, y: Int) => x + y,

Durations.seconds(10), // window length

Durations.seconds(10)) // slide interval

res1DS.print()

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo4DStreamToRDD

package com.shujia.streaming

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo4DStreamToRDD {

def main(args: Array[String]): Unit = {

/**

* 創(chuàng)建SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("DS2RDD演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import sparkSession.implicits._

import org.apache.spark.sql.functions._

val sparkContext: SparkContext = sparkSession.sparkContext

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

/**

* 通過spark streaming讀取數(shù)據(jù)得到一個DS

*/

//hello hello world java hello hadoop

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

val new_linesDS: DStream[String] = linesDS.window(Durations.seconds(15), Durations.seconds(5))

/**

* A DStream is backed by RDDs: each batch interval, the received data is wrapped into an RDD

* The data received in each batch is different

*/

new_linesDS.foreachRDD((rdd: RDD[String]) => {

println("===================================================")

println("正在處理當(dāng)前批次的數(shù)據(jù).....")

println("===================================================")

//在這里面直接寫處理rdd的代碼

// rdd.flatMap(_.split(" "))

// .map((_, 1))

// .groupBy(_._1)

// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

// .foreach(println)

/**

* Since we can operate on the RDD here, and an RDD can be converted to a DataFrame, each batch of data in spark streaming can also be analyzed with SQL

*/

//def toDF(colNames: String*)

val linesDF: DataFrame = rdd.toDF("line")

linesDF.createOrReplaceTempView("words")

sparkSession.sql(

"""

|select

|t1.word as word,

|count(1) as number

|from

|(select

| explode(split(line,' ')) as word

|from

|words) t1

|group by t1.word

|""".stripMargin).show()

})

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo5RDDToDStream

package com.shujia.streaming

import org.apache.spark.{SparkContext, rdd}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import org.apache.spark.streaming.{Durations, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo5RDDToDStream {

def main(args: Array[String]): Unit = {

/**

* 創(chuàng)建SparkSession

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("DS2RDD演示")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

import sparkSession.implicits._

import org.apache.spark.sql.functions._

val sparkContext: SparkContext = sparkSession.sparkContext

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

/**

* 通過spark streaming讀取數(shù)據(jù)得到一個DS

*/

//hello hello world java hello hadoop

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

//in_rdd:hello hello world java hello hadoop

//out_rdd:

// hello 3

// world 1

// java 1

// hadoop 1

val resDS: DStream[(String, Long)] = linesDS.transform((rdd: RDD[String]) => {

/**

* rdd處理之后會得到一個新的rdd進行返回

*/

// val resultRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))

// .map((_, 1))

// .groupBy(_._1)

// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

// resultRDD

//def toDF(colNames: String*)

val linesDF: DataFrame = rdd.toDF("line")

linesDF.createOrReplaceTempView("words")

val resRDD: RDD[(String, Long)] = sparkSession.sql(

"""

|select

|t1.word as word,

|count(1) as number

|from

|(select

| explode(split(line,' ')) as word

|from

|words) t1

|group by t1.word

|""".stripMargin)

.rdd

.map((row: Row) => {

(row.getAs[String](0), row.getAs[Long](1))

})

resRDD

})

resDS.print()

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo6Submit

package com.shujia.streaming

import org.apache.spark.SparkContext

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.dstream.ReceiverInputDStream

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo6Submit {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.appName("提交命令執(zhí)行")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

linesDS

.flatMap(_.split(" "))

.map((_,1))

.reduceByKey(_+_)

.print()

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo7SaveToFile

package com.shujia.streaming

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo7SaveToFile {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("提交命令執(zhí)行")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

val resultDS: DStream[(String, Int)] = linesDS

.flatMap(_.split(" "))

.map((_, 1))

.reduceByKey(_ + _)

.transform((rdd:RDD[(String,Int)])=>{

println("=======================")

println("正在處理批次數(shù)據(jù)")

rdd

})

//The target path is a directory; the file names are generated by the system, but a suffix can be specified

//Each batch writes its own result file, rolled over batch by batch

resultDS.saveAsTextFiles("spark/data/streams/stream","txt")

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}

Demo8SaveToMysql

package com.shujia.streaming

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.{Durations, StreamingContext}

import org.apache.spark.streaming.dstream.ReceiverInputDStream

import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo8SaveToMysql {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("提交命令執(zhí)行")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

linesDS.foreachRDD((rdd:RDD[String])=>{

println("------------正在處理一批數(shù)據(jù)-------------------")

println(s"該批次的分區(qū)數(shù):${rdd.getNumPartitions}")

/**

* foreachPartition processes the logic once per partition (a batched-insert variant follows after this object)

*/

rdd.foreachPartition((itr:Iterator[String])=>{

println("------------數(shù)加 防偽碼-------------------")

//創(chuàng)建與數(shù)據(jù)庫的連接對象

//1、注冊驅(qū)動

Class.forName("com.mysql.jdbc.Driver")

//2、獲取數(shù)據(jù)庫連接對象

val conn: Connection = DriverManager.getConnection(

"jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",

"root",

"123456"

)

//獲取數(shù)據(jù)庫操作對象

val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")

itr.foreach((line:String)=>{

println("....正在處理一條數(shù)據(jù)....")

//1500100019,婁曦之,24,男,理科三班

val infos: Array[String] = line.split(",")

val id: Int = infos(0).toInt

val name: String = infos(1)

val age: Int = infos(2).toInt

val gender: String = infos(3)

val clazz: String = infos(4)

//給預(yù)編譯對象傳參

ps.setInt(1,id)

ps.setString(2,name)

ps.setInt(3,age)

ps.setString(4,gender)

ps.setString(5,clazz)

//執(zhí)行sql語句

ps.executeUpdate()

})

//釋放資源

ps.close()

conn.close()

println()

})

})

streamingContext.start()

streamingContext.awaitTermination()

streamingContext.stop()

}

}
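
Inserting one row per executeUpdate means one round trip to MySQL per record. A common refinement is to queue the rows of a partition with addBatch and flush them once with executeBatch; both are standard JDBC PreparedStatement methods. The fragment below is a hedged sketch of the per-partition body only, meant to drop into the foreachRDD above, reusing the demo's imports, table, and connection settings.

// Hedged sketch (not in the original): batch the inserts of one partition instead of one executeUpdate per row
rdd.foreachPartition((itr: Iterator[String]) => {
  Class.forName("com.mysql.jdbc.Driver")
  val conn: Connection = DriverManager.getConnection(
    "jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",
    "root",
    "123456")
  val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")
  itr.foreach((line: String) => {
    val infos: Array[String] = line.split(",")
    ps.setInt(1, infos(0).toInt)
    ps.setString(2, infos(1))
    ps.setInt(3, infos(2).toInt)
    ps.setString(4, infos(3))
    ps.setString(5, infos(4))
    ps.addBatch() // queue the row instead of executing it immediately
  })
  ps.executeBatch() // one round trip to MySQL for the whole partition
  ps.close()
  conn.close()
})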

Demo9CardCondition

package com.shujia.streaming

import com.alibaba.fastjson.{JSON, JSONObject}

import com.alibaba.fastjson.serializer.JSONObjectCodec

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Durations, StreamingContext}

import java.lang

import java.sql.{Connection, DriverManager, PreparedStatement}

import java.text.SimpleDateFormat

import java.util.Date

object Demo9CardCondition {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local[2]")

.appName("提交命令執(zhí)行")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val sparkStreaming: StreamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

/**

*

* Requirement: compute traffic statistics per checkpoint (card)

* by reading the vehicle-passing monitoring data

* {"car":"皖A(yù)9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}

*/

val InfoDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 12345)

/**

* Option 1: set the window on the source DStream

* Compute the checkpoint statistics of the last 15 seconds, every 5 seconds

*/

//設(shè)置滑動窗口

val carJsonDS: DStream[String] = InfoDS.window(Durations.seconds(15), Durations.seconds(5))

/**

* 1. Parse the received JSON data

* with the third-party fastjson library

*

* In JSON, a value in double quotes is a string; an unquoted whole number is usually read as a long, and a decimal as a double

*

*/

val cardAndSpeedDS: DStream[(Long, (Double, Int))] = carJsonDS.map((line: String) => {

//使用fastjson將字符串轉(zhuǎn)成Json對象

val jSONObject: JSONObject = JSON.parseObject(line)

//根據(jù)需求,只需要獲取卡口的值和車速

val cardId: Long = jSONObject.getLong("card")

val carSpeed: Double = jSONObject.getDouble("speed")

(cardId, (carSpeed, 1)) //假設(shè)每次產(chǎn)生的數(shù)據(jù)都不是同一輛車

})

/**

* 2. Compute the average speed and the number of vehicles per checkpoint in near real time

* Option 2: set the window when calling reduceByKeyAndWindow on the DStream

*

*/

// cardAndSpeedDS.reduceByKeyAndWindow()

val cardConditionDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS.reduceByKey((kv1: (Double, Int), kv2: (Double, Int)) => {

//將同一卡口的1加起來,就得到這個批次中的車的數(shù)量

val carNumber: Int = kv1._2 + kv2._2

//將同一卡口的速度加起來 / 車的數(shù)量 = 這一批次的卡口平均速度

val sumSpeed: Double = kv1._1 + kv2._1

val avgSpeed: Double = sumSpeed / carNumber

(avgSpeed, carNumber)

})

/**

* 將結(jié)果保存到數(shù)據(jù)庫中

*/

cardConditionDS.foreachRDD((rdd: RDD[(Long, (Double, Int))]) => {

rdd.foreachPartition((itr: Iterator[(Long, (Double, Int))]) => {

println("------------數(shù)加 防偽碼-------------------")

//創(chuàng)建與數(shù)據(jù)庫的連接對象

//1、注冊驅(qū)動

Class.forName("com.mysql.jdbc.Driver")

//2、獲取數(shù)據(jù)庫連接對象

val conn: Connection = DriverManager.getConnection(

"jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false",

"root",

"123456"

)

//獲取數(shù)據(jù)庫操作對象

val ps: PreparedStatement = conn.prepareStatement("insert into card_condition values(?,?,?,?)")

//獲取數(shù)據(jù)處理時間

val piCiTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())

itr.foreach((f: (Long, (Double, Int))) => {

val cardId: Long = f._1

val avgSpeed: Double = f._2._1

val carNumber: Int = f._2._2

ps.setLong(1, cardId)

ps.setDouble(2, avgSpeed)

ps.setInt(3, carNumber)

ps.setString(4, piCiTime)

ps.executeUpdate()

})

//釋放資源

ps.close()

conn.close()

println()

})

})

sparkStreaming.start()

sparkStreaming.awaitTermination()

sparkStreaming.stop()

}

}

4.Spark-opt

Demo1Cache

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import org.apache.spark.storage.StorageLevel

object Demo1Cache {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")

/**

* Try to avoid building the same RDD twice: caching the reused RDD avoids recomputing all of its upstream RDDs (an explicit unpersist sketch follows after this object)

*

* rdd

* df

* sql

*

*/

//針對被復(fù)用的rdd進行緩存

// studentsRDD.cache() // 默認是MEMORY_ONLY

studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

/**

* 統(tǒng)計每個班級的人數(shù)

*/

studentsRDD.map((line: String) => (line.split(",")(4), 1))

.reduceByKey(_ + _)

.saveAsTextFile("spark/data/opt/clazz_num")

/**

* Count the number of students per gender

*

* The first job that uses studentsRDD reads the original full data set; when the second job also uses studentsRDD, the data is fetched from the cache instead

*/

studentsRDD.map((line: String) => (line.split(",")(3), 1))

.reduceByKey(_ + _)

.saveAsTextFile("spark/data/opt/gender_num")

while (true){

}

}

}
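
Once both jobs that reuse studentsRDD have finished, the cached blocks can be released explicitly; unpersist is a standard RDD method. A small hedged addition to the demo above, to be placed before the closing while loop:

// Hedged addition: release the cached data once it is no longer reused
studentsRDD.unpersist()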

Demo2AggregateByKey

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

object Demo2AggregateByKey {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")

val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => (line.split(",")(4), 1))

/**

* reduceByKey: grouped aggregation; the same function is used for the map-side combine and for the reduce-side aggregation

*

*

* aggregateByKey: grouped aggregation that lets you set the reduce-side aggregation and a map-side pre-aggregation separately

* zeroValue: the initial value

* seqOp: (U, V) => U : map-side aggregation

* combOp: (U, U) => U : reduce-side aggregation

*

* Use this operator when the map-side and reduce-side aggregation logic differ and need to be specified separately (a sketch with genuinely different seqOp and combOp follows after this object)

*

*/

//aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

// clazzKVRDD.aggregateByKey(0)(

// (u: Int, v: Int) => u + v, // 在map端做聚合操作的函數(shù)

// (u1: Int, u2: Int) => u1 + u2 //在reduce端做聚合操作

// ).foreach(println)

//reduceByKey(func: (V, V) => V)

clazzKVRDD.reduceByKey(_ + _).foreach(println)

}

}
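
The aggregateByKey call in the demo is commented out, and its seqOp and combOp happen to be identical, so it does not show why the two sides would ever differ. Below is a hedged sketch, not from the original, where they genuinely differ: computing the average age per class by carrying (sum, count) pairs. It reuses studentsRDD from the demo; the names clazzAgeRDD and avgAgeRDD are made up.

// Hedged sketch: aggregateByKey where map-side (seqOp) and reduce-side (combOp) logic really differ.
// Average age per class, carrying (age sum, student count) pairs; reuses studentsRDD from the demo above.
val clazzAgeRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => {
  val infos: Array[String] = line.split(",")
  (infos(4), infos(2).toInt) // (clazz, age)
})

val avgAgeRDD: RDD[(String, Double)] = clazzAgeRDD
  .aggregateByKey((0, 0))( // zeroValue: (age sum, count)
    (acc: (Int, Int), age: Int) => (acc._1 + age, acc._2 + 1), // seqOp: fold one record into the partial result (map side)
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // combOp: merge partial results from different partitions (reduce side)
  )
  .mapValues((sc: (Int, Int)) => sc._1.toDouble / sc._2)

avgAgeRDD.foreach(println)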

Demo3MapPartition

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat

import java.util.Date

object Demo3MapPartition {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

.config("spark.sql.shuffle.partitions", 2)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val dataRDD: RDD[String] = sparkContext.textFile("spark/data/ant_user_low_carbon.txt")

val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions((itr: Iterator[String]) => {

itr.map((line: String) => {

val infos: Array[String] = line.split("\t")

(infos(0), infos(1), infos(2))

})

})

// map的邏輯是RDD中的處理每一條數(shù)據(jù)

// val resRDD: RDD[(String, Long, String)] = kvRDD.map((kv: (String, String, String)) => {

// //這句話是是在map中的,所以針對每一個數(shù)據(jù)都要new一個SimpleDateFormat對象

// val sdf = new SimpleDateFormat("yyyy/MM/dd")

// println("----------------創(chuàng)建了一個SimpleDateFormat對象----------------")

//

// val dateObj: Date = sdf.parse(kv._2)

// val ts: Long = dateObj.getTime

// (kv._1, ts, kv._3)

// })

//

// resRDD.foreach(println)

//mapPartitions針對一個分區(qū)的數(shù)據(jù)進行處理

val resRDD2: RDD[(String, Long, String)] = kvRDD.mapPartitions((itr: Iterator[(String, String, String)]) => {

/**

* 將時間字段轉(zhuǎn)成時間戳

*/

val sdf = new SimpleDateFormat("yyyy/MM/dd")

println("----------------創(chuàng)建了一個SimpleDateFormat對象----------------")

itr.map((kv: (String, String, String)) => {

val dateObj: Date = sdf.parse(kv._2)

val ts: Long = dateObj.getTime

(kv._1, ts, kv._3)

})

})

resRDD2.foreach(println)

}

}

Demo4Coalesce

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

/**

* Repartitioning

*

* repartition

*

* coalesce

*

* Interview question: how can you change the number of partitions of an RDD, and what is the difference between the two ways?

*

*/

object Demo4Coalesce {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

// .config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv") // 1000條數(shù)據(jù)

println(s"studentsRDD的分區(qū)數(shù)量為:${studentsRDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量

/**

* repartition: repartitions the RDD and returns a new RDD

* repartition can either increase or decrease the number of partitions

*

* Whether to increase or decrease partitions depends on the available resources

* If resources are sufficient, increasing the partitions raises the number of tasks and the parallelism, speeding up the whole Spark job

*

* If each partition of the parent RDD maps to exactly one partition of the child RDD there is no shuffle; if a parent partition feeds multiple child partitions, that is a shuffle

* Increasing partitions with repartition produces a shuffle: the data is redistributed and no longer in the original order

* Decreasing partitions with repartition also produces a shuffle

*

*/

// val students2RDD: RDD[String] = studentsRDD.repartition(10)

// students2RDD.foreach(println)

// println(s"students2RDD的分區(qū)數(shù)量為:${students2RDD.getNumPartitions}")

//

// val students3RDD: RDD[String] = students2RDD.repartition(1)

// println(s"students3RDD的分區(qū)數(shù)量為:${students3RDD.getNumPartitions}")

// students3RDD.foreach(println)

/**

* coalesce: repartitioning that can increase or decrease the number of partitions

*

* Takes two arguments: numPartitions: Int, the target number of partitions, and shuffle: Boolean = false, whether to produce a shuffle

*

* Increasing partitions with coalesce requires a shuffle

* Decreasing partitions with coalesce can be done with or without a shuffle. Typical scenario: to merge small files, use coalesce without a shuffle

*/

val studentsRDD2: RDD[String] = studentsRDD.coalesce(10, shuffle = true)

println(s"studentsRDD2的分區(qū)數(shù)量為:${studentsRDD2.getNumPartitions}")

studentsRDD2.foreach(println)

val studentRDD3: RDD[String] = studentsRDD2.coalesce(2, shuffle = false)

println(s"studentRDD3的分區(qū)數(shù)量為:${studentRDD3.getNumPartitions}")

studentRDD3.foreach(println)

while (true) {

}

}

}

Demo5Coalesce2

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

object Demo5Coalesce2 {

def main(args: Array[String]): Unit = {

/**

* 使用coalesce合并小文件

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

// .config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/studentsinfo/*") // 1000條數(shù)據(jù)

println(s"studentsRDD的分區(qū)數(shù)量為:${studentsRDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量

//合并小文件

val students2RDD: RDD[String] = studentsRDD.coalesce(1, shuffle = false)

println(s"students2RDD的分區(qū)數(shù)量為:${students2RDD.getNumPartitions}") //一開始RDD中的分區(qū)數(shù)量取決于block塊的數(shù)量

students2RDD.saveAsTextFile("spark/data/studentsinfo2")

}

}

Demo6MapJoin

package com.shujia.opt

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6MapJoin {

def main(args: Array[String]): Unit = {

/**

* Map-side join of a large table with a small table via broadcast

*/

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

.config("spark.sql.shuffle.partitions", 1)

.getOrCreate()

val studentsDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")

.load("spark/data/students.csv")

val scoresDF: DataFrame = sparkSession.read

.format("csv")

.option("sep", ",")

.schema("id STRING,subject_id STRING,score INT")

.load("spark/data/score.txt")

/**

* In local mode Spark optimizes the SQL and broadcasts the small table automatically; the idea is similar to a map join

*

* When running on a cluster, the data to broadcast may need to be marked manually

* In DSL code this is done with the hint syntax (an alternative using the broadcast() function follows after this object)

*

* In general, for a large table joining a small table, broadcast the small table with a hint in the DSL; "small" usually means less than about 1 GB

*

* Two jobs are started:

* job 1: pull the small table to the Driver and broadcast it from there

* job 2: join with the broadcast data inside each Executor

*

*/

val resDF: DataFrame = scoresDF.join(studentsDF.hint("broadcast"), "id")

resDF.show()

while (true){

}

}

}
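
Besides the string hint used above, the small side can also be marked with the broadcast() function from org.apache.spark.sql.functions, which is part of the public Spark SQL API. A short hedged alternative, reusing scoresDF and studentsDF from the demo:

// Hedged alternative to .hint("broadcast"): mark the small DataFrame with the broadcast() function
import org.apache.spark.sql.functions.broadcast

val resDF2: DataFrame = scoresDF.join(broadcast(studentsDF), "id")
resDF2.show()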

Demo7Kryo

package com.shujia.opt

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession

import org.apache.spark.storage.StorageLevel

/**

* Kryo is a serialization format supported by Spark that is tuned to improve the performance of Spark computations

*

* Serialization improves performance, speeds up computation, and reduces the memory taken up by the data

*/

case class Student(id: String, name: String, age: Int, gender: String, clazz: String)

object Demo7Kryo {

def main(args: Array[String]): Unit = {

val sparkSession: SparkSession = SparkSession.builder()

.master("local")

.appName("cache")

.config("spark.sql.shuffle.partitions", 1)

//將序列化方式設(shè)置為Kryo的序列化方式

.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

//自定義一個序列化類,指定要序列化的東西

.config("spark.kryo.registrator", "com.shujia.opt.Demo8KryoRegistrator")

.getOrCreate()

val sparkContext: SparkContext = sparkSession.sparkContext

val studentsRDD: RDD[Student] = sparkContext.textFile("spark/data/students.csv").map((line: String) => {

val infos: Array[String] = line.split(",")

Student(infos(0), infos(1), infos(2).toInt, infos(3), infos(4))

})

/**

* Without serialization: 238.3 KiB

* With the default Java serialization: 55.7 KiB

* With Kryo serialization: 43.0 KiB

*

*/

//針對被復(fù)用的rdd進行緩存

// studentsRDD.cache() // 默認是MEMORY_ONLY

// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// studentsRDD.persist(StorageLevel.MEMORY_ONLY)

studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

/**

* 統(tǒng)計每個班級的人數(shù)

*/

studentsRDD.map((stu: Student) => (stu.clazz, 1))

.reduceByKey(_ + _)

.saveAsTextFile("spark/data/opt/clazz_num")

/**

* Count the number of students per gender

*

* The first job that uses studentsRDD reads the original full data set; when the second job also uses studentsRDD, the data is fetched from the cache instead

*/

studentsRDD.map((stu: Student) => (stu.gender, 1))

.reduceByKey(_ + _)

.saveAsTextFile("spark/data/opt/gender_num")

while (true) {

}

}

}

Demo8KryoRegistrator

package com.shujia.opt

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.serializer.KryoRegistrator

/**

* A custom registrator class that tells Kryo which classes to serialize (an alternative using SparkConf.registerKryoClasses follows after this class)

*/

class Demo8KryoRegistrator extends KryoRegistrator{

override def registerClasses(kryo: Kryo): Unit = {

/**

* 在這個重寫的方法中指定要序列化的東西

*

*/

kryo.register(classOf[Student])

kryo.register(classOf[String])

kryo.register(classOf[Int])

}

}
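
A custom KryoRegistrator is one way to register classes; SparkConf also exposes registerKryoClasses, which does the same thing without a separate registrator class. A hedged sketch of that alternative setup, mirroring the configuration in Demo7Kryo (only the conf construction is shown):

// Hedged alternative: register the Kryo classes directly on the SparkConf
import org.apache.spark.SparkConf

val conf: SparkConf = new SparkConf()
  .setMaster("local")
  .setAppName("kryo register classes")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student])) // Student is the case class defined in Demo7Kryo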

Demo9FilterKey

package com.shujia.opt

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo9FilterKey {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()

.setMaster("local")

.setAppName("app")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("spark/data/ws/*")

println("第一個RDD分區(qū)數(shù)量:" + lines.getNumPartitions)

val countRDD: RDD[(String, Int)] = lines

.flatMap(_.split("\\|"))

.map((_, 1))

.groupByKey() // 這里會產(chǎn)生shuffle

.map((x: (String, Iterable[Int])) => (x._1, x._2.toList.sum))

println("聚合之后RDD分區(qū)的數(shù)量" + countRDD.getNumPartitions)

// countRDD.foreach(println)

/**

* Sample the keys, then filter out the keys that cause data skew and have little impact on the business

*

*/

val wordRDD: RDD[(String, Int)] = lines

.flatMap(_.split(","))

.map((_, 1))

//

// val top1: Array[(String, Int)] = wordRDD

// .sample(true, 0.1)

// .reduceByKey(_ + _)

// .sortBy(-_._2)

// .take(1)

//the key that causes the data skew

// val key: String = top1(0)._1

//過濾導(dǎo)致傾斜的key

wordRDD

.filter(t => !"null".equals(t._1))

.groupByKey()

.map(x => (x._1, x._2.toList.sum))

.foreach(println)

while (true) {

}

}

}

Demo10DoubleReduce

package com.shujia.opt

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo10DoubleReduce {

/**

* Two-stage (salted) aggregation

* Generally suitable when the business logic is not complex (a reduceByKey-based variant follows after this object)

*

*/

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")

val sc: SparkContext = new SparkContext(conf)

val lines: RDD[String] = sc.textFile("data/word")

val wordRDD: RDD[String] = lines

.flatMap(_.split(","))

.filter(!_.equals(""))

/* wordRDD

.map((_, 1))

.groupByKey()

.map(kv => (kv._1, kv._2.size))

.foreach(println)*/

//預(yù)聚合可以避免數(shù)據(jù)傾斜

/* wordRDD

.map((_, 1))

.reduceByKey(_ + _)

.foreach(println)*/

// 對每一個key打上隨機5以內(nèi)前綴

wordRDD

.map(word => {

val pix: Int = Random.nextInt(5)

(pix + "-" + word, 1)

})

.groupByKey() //第一次聚合

.map(t => (t._1, t._2.toList.sum))

.map(t => {

///去掉隨機前綴

(t._1.split("-")(1), t._2)

})

.groupByKey() //第二次聚合

.map(t => (t._1, t._2.toList.sum))

.foreach(println)

while (true) {

}

}

}
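
The demo uses groupByKey for both stages, which ships every single (key, 1) pair through the shuffle. The same two-stage salting also works with reduceByKey, which combines on the map side and moves much less data. A hedged variant, reusing wordRDD from the demo:

// Hedged variant: the same two-stage salted aggregation, but with reduceByKey instead of groupByKey
import scala.util.Random

wordRDD
  .map(word => (Random.nextInt(5) + "-" + word, 1)) // stage 1 key: random prefix + word
  .reduceByKey(_ + _) // first aggregation, the hot key is spread across 5 salted keys
  .map(t => (t._1.split("-")(1), t._2)) // strip the random prefix
  .reduceByKey(_ + _) // second aggregation on the real key
  .foreach(println)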

Demo11DoubleJoin

package com.shujia.opt

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo11DoubleJoin {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local")

val sc = new SparkContext(conf)

val dataList1 = List(

("java", 1),

("shujia", 2),

("shujia", 3),

("shujia", 1),

("shujia", 1))

val dataList2 = List(

("java", 100),

("java", 99),

("shujia", 88),

("shujia", 66))

val RDD1: RDD[(String, Int)] = sc.parallelize(dataList1)

val RDD2: RDD[(String, Int)] = sc.parallelize(dataList2)

//采樣傾斜的key

val sampleRDD: RDD[(String, Int)] = RDD1.sample(false, 1.0)

//skewedKey 導(dǎo)致數(shù)據(jù)傾斜的key shujia

val skewedKey: String = sampleRDD.map(x => (x._1, 1))

.reduceByKey(_ + _)

.map(x => (x._2, x._1))

.sortByKey(ascending = false)

.take(1)(0)._2

//導(dǎo)致數(shù)據(jù)傾斜key的RDD

val skewedRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {

tuple._1.equals(skewedKey)

})

//沒有傾斜的key

val commonRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {

!tuple._1.equals(skewedKey)

})

val skewedRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {

tuple._1.equals(skewedKey)

})

val commonRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {

!tuple._1.equals(skewedKey)

})

val n = 2

//對產(chǎn)生數(shù)據(jù)傾斜的key 使用mapjoin

val skewedMap: Map[String, Int] = skewedRDD2.collect().toMap

val bro: Broadcast[Map[String, Int]] = sc.broadcast(skewedMap)

val resultRDD1: RDD[(String, (Int, Int))] = skewedRDD1.map(kv => {

val word: String = kv._1

val i: Int = bro.value.getOrElse(word, 0)

(word, (kv._2, i))

})

//沒有數(shù)據(jù)傾斜的RDD 正常join

val resultRDD2: RDD[(String, (Int, Int))] = commonRDD1.join(commonRDD2)

//將兩個結(jié)果拼接

resultRDD1.union(resultRDD2)

.foreach(println)

}

}
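
In the demo, val n = 2 is declared but never used; it is presumably intended for the other standard treatment of a skewed join, in which the skewed keys on one side receive a random prefix in [0, n) while the other side is expanded n times with every possible prefix, spreading the hot key over n tasks. The following is a hedged sketch of that idea, not in the original, reusing skewedRDD1, skewedRDD2, commonRDD1, commonRDD2 and n from the demo; the "#" separator is an arbitrary choice.

// Hedged sketch: salted join for the skewed key, spreading it across n tasks
import scala.util.Random

val saltedLeft: RDD[(String, Int)] = skewedRDD1.map(kv => (Random.nextInt(n) + "#" + kv._1, kv._2))

// expand the other side n times, once per possible prefix, so every salted key finds its match
val expandedRight: RDD[(String, Int)] = skewedRDD2.flatMap(kv => (0 until n).map(i => (i + "#" + kv._1, kv._2)))

val saltedJoined: RDD[(String, (Int, Int))] = saltedLeft
  .join(expandedRight)
  .map(t => (t._1.split("#")(1), t._2)) // drop the prefix after the join

// combine with the normally joined non-skewed part, as in the demo
saltedJoined.union(commonRDD1.join(commonRDD2)).foreach(println)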
