柚子快報邀請碼778899分享:大數(shù)據(jù) spark方法總結(jié)
目錄
一、Spark是什么
二、Spark的特點
1、速度快
2、易用性
3、通用性
4、兼容性
三、什么是RDD,寬依賴、窄依賴
四、定義與使用數(shù)組
(1).scala定義一個數(shù)組的方法
(2).數(shù)組常用的方法?
(3).定義與使用函數(shù)
(4).集合操作常用方法:
(5).用函數(shù)組合器
map()方法
foreach()方法
filter()方法
flatten()方法
flatMap()方法
groupBy()方法
五、創(chuàng)建RDD
從內(nèi)存中讀取數(shù)據(jù)創(chuàng)建RDD
(1).parallelize()
(2).makeRDD()
?從外部存儲系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD
(1)通過HDFS文件創(chuàng)建RDD
(2)通過Linux本地文件創(chuàng)建RDD
六、操作方法?
1.map()方法?
2.sortBy()?方法:
3.collect()?方法
?編輯?4.flatMap()方法
5.take()方法?
七、轉(zhuǎn)換操作和行動操作?
轉(zhuǎn)換操作
(1)union()方法
(2)filter()方法?
(3)distinct()方法?
(4)intersection()?方法?
(5)subtract()方法?
(6)cartesian()方法?
(7)其他方法?
行動操作?
?八、創(chuàng)建鍵值對RDD的方法
1.reduceByKey()方法
2.groupByKey()方法?
3.combineByKey()方法
九、RDD連接方法
(1)join()方法
(2)rightOuterJoin()方法?
(3)leftOuterJoin()方法?
(4)fullOuterJoin()方法?
(5)?zip()方法
(6)combineByKey()方法
(7)lookup()方法
十、查看DataFrame數(shù)據(jù)?
1.?printSchema:輸出數(shù)據(jù)模式
2.?show():查看數(shù)據(jù)??
?3.first()/head()/take()/takeAsList():獲取若干條記錄
4.collect()/collectAsList():獲取所有數(shù)據(jù)
十一、將DataFrame注冊成為臨時表,然后通過SQL語句進行查詢
1.where()/filter()方法?
a.where()方法
b.filter()方法
2.select()/selectExpr()/col()/apply()方法
a.select()方法:獲取指定字段值
b.selectExpr()方法:對指定字段進行特殊處理
c.col()/apply()方法
一、Spark是什么
Spark 是一種基于內(nèi)存的快速、通用、可擴展的大數(shù)據(jù)分析計算引擎。主要用于數(shù)據(jù)計算,經(jīng)常被認為是Hadoop框架的升級版。
二、Spark的特點
1、速度快
小生根據(jù)官方數(shù)據(jù)統(tǒng)計,與Hadoop相比,Spark基于內(nèi)存的運算效率要快100倍以上,基于硬盤的運算效率也要快10倍以上。Spark實現(xiàn)了高效的DAG執(zhí)行引擎,能夠通過內(nèi)存計算高效地處理數(shù)據(jù)流。
2、易用性
Spark編程支持Java、Python、Scala及R語言,并且還擁有超過80種高級算法,除此之外,Spark還支持交互式的Shell操作,開發(fā)人員可以方便地在Shell客戶端中使用Spark集群解決問題。
3、通用性
Spark提供了統(tǒng)一的解決方案,適用于批處理、交互式查詢(SparkSQL)、實時流處理(SparkStreaming)、機器學(xué)習(SparkMLlib)和圖計算(GraphX),它們可以在同一個應(yīng)用程序中無縫地結(jié)合使用,大大減少大數(shù)據(jù)開發(fā)和維護的人力成本和部署平臺的物力成本。
4、兼容性
Spark開發(fā)容pSpark可以運行在Hadoop模式、Mesos模式、Standalone獨立模式或Cloud中,并且還可以訪問各種數(shù)據(jù)源,包括本地文件系統(tǒng)、HDFS、Cassandra、HBase和Hive等。
三、什么是RDD,寬依賴、窄依賴
RDD:是一個容錯的、只讀的、可進行并行操作的數(shù)據(jù)結(jié)構(gòu),是一個分布在集群各個節(jié)點中的存放元素的集合。RDD的創(chuàng)建有3種不同的方法。 第一種是將程序中已存在的Seq集合(如集合、列表、數(shù)組)轉(zhuǎn)換成RDD。 第二種是對已有RDD進行轉(zhuǎn)換得到新的RDD,這兩種方法都是通過內(nèi)存中已有的集合創(chuàng)建RDD的。 第三種是直接讀取外部存儲系統(tǒng)的數(shù)據(jù)創(chuàng)建RDD。
窄依賴:表現(xiàn)為一個父RDD的分區(qū)對應(yīng)于一個子RDD的分區(qū)或者多個父RDD的分區(qū)對應(yīng)于一個子RDD的分區(qū)。
寬依賴:表現(xiàn)為存在一個父RDD的一個分區(qū)對應(yīng)一個子RDD的多個分區(qū)。
四、定義與使用數(shù)組
數(shù)組是Scala中常用的一種數(shù)據(jù)結(jié)構(gòu),數(shù)組是一種存儲了相同類型元素的固定大小的順序集合。
(1).scala定義一個數(shù)組的方法
(2).數(shù)組常用的方法?
擴展:Scala可以使用range()方法創(chuàng)建區(qū)間數(shù)組?
(3).定義與使用函數(shù)
函數(shù)是Scala的重要組成部分,Scala作為支持函數(shù)式編程的語言,可以將函數(shù)作為對象.
定義函數(shù)的語法格式如下:
匿名函數(shù):
匿名函數(shù)即在定義函數(shù)時不給出函數(shù)名的函數(shù)。Scala中匿名函數(shù)是使用箭頭“=>”定義的,箭頭的左邊是參數(shù)列表,箭頭的右邊是表達式,表達式將產(chǎn)生函數(shù)的結(jié)果。通??梢詫⒛涿瘮?shù)賦值給一個常量或變量,再通過常量名或變量名調(diào)用該函數(shù)。若函數(shù)中的每個參數(shù)在函數(shù)中最多只出現(xiàn)一次,則可以使用占位符“_”代替參數(shù)。?
?Scala中常用的查看列表元素的方法有head、init、last、tail和take()。 ? ? ? ? ? ? ? ? ?head:查看列表的第一個元素。
? ? ? ? ? ? ? ? ?tail:查看第一個元素之后的其余元素。
? ? ? ? ? ? ? ? ?last:查看列表的最后一個元素。
? ? ? ? ? ? ? ? ?Init:查看除最后一個元素外的所有元素。
? ? ? ? ? ? ? ? ?take():查看列表前n個元素。
合并兩個列表還可以使用concat()方法。 用戶可以使用contains()方法判斷列表中是否包含某個元素,若列表中存在指定的元素則返回true,否則返回false。
Scala Set(集合)是沒有重復(fù)的對象集合,所有的元素都是唯一的。
(4).集合操作常用方法:
Scala合并兩個列表時使用的是:::()或concat()方法,而合并兩個集合使用的是++()方法。
(5).用函數(shù)組合器
map()方法
可通過一個函數(shù)重新計算列表中的所有元素,并且返回一個包含相同數(shù)目元素的新列表。
foreach()方法
和map()方法類似,但是foreach()方法沒有返回值,只用于對參數(shù)的結(jié)果進行輸出。?
filter()方法
可以移除傳入函數(shù)的返回值為false的元素?
flatten()方法
可以將嵌套的結(jié)構(gòu)展開,即flatten()方法可以將一個二維的列表展開成一個一維的列表。?
flatMap()方法
結(jié)合了map()方法和flatten()方法的功能,接收一個可以處理嵌套列表的函數(shù),再對返回結(jié)果進行連接。?
groupBy()方法
可對集合中的元素進行分組操作,返回的結(jié)果是一個映射。?
五、創(chuàng)建RDD
從內(nèi)存中讀取數(shù)據(jù)創(chuàng)建RDD
(1).parallelize()
parallelize()方法有兩個輸入?yún)?shù),說明如下:
要轉(zhuǎn)化的集合,必須是Seq集合。Seq表示序列,指的是一類具有一定長度的、可迭代訪問的對象,其中每個數(shù)據(jù)元素均帶有一個從0開始的、固定的索引。分區(qū)數(shù)。若不設(shè)分區(qū)數(shù),則RDD的分區(qū)數(shù)默認為該程序分配到的資源的CPU核心數(shù)。
可以使用SparkContext的parallelize方法將一個已有的集合轉(zhuǎn)換為RDD。
基本語法:
parallelize(collection, numSlices=None)
示例:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local")
val sc = new SparkContext(conf)
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
rdd.foreach(println)
sc.stop()
效果展示:
(2).makeRDD()
makeRDD()方法有兩種使用方式:
?第一種方式的使用與parallelize()方法一致;?第二種方式是通過接收一個是Seq[(T,Seq[String])]參數(shù)類型創(chuàng)建RDD。
?從外部存儲系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD
(1)通過HDFS文件創(chuàng)建RDD
直接通過textFile()方法讀取HDFS文件的位置即可。
(2)通過Linux本地文件創(chuàng)建RDD
本地文件的讀取也是通過sc.textFile("路徑")的方法實現(xiàn)的,在路徑前面加上“file://”表示從Linux本地文件系統(tǒng)讀取。在IntelliJ IDEA開發(fā)環(huán)境中可以直接讀取本地文件;但在spark-shell中,要求在所有節(jié)點的相同位置保存該文件才可以讀取它.
示例:
六、操作方法?
1.map()方法?
是一種基礎(chǔ)的RDD轉(zhuǎn)換操作,可以對RDD中的每一個數(shù)據(jù)元素通過某種函數(shù)進行轉(zhuǎn)換并返回新的RDD。map()?方法用于對集合(如列表、數(shù)組、映射等)中的每個元素應(yīng)用一個函數(shù),并返回結(jié)果的新集合。
示例:
val list = List(1, 2, 3, 4, 5)
val incremented = list.map(x => x + 1)
2.sortBy()?方法:
?用于對標準RDD進行排序,有3個可輸入?yún)?shù)
第1個參數(shù)是一個函數(shù)f:(T) => K,左邊是要被排序?qū)ο笾械拿恳粋€元素,右邊返回的值是元素中要進行排序的值。第2個參數(shù)是ascending,決定排序后RDD中的元素是升序的還是降序的,默認是true,即升序排序,如果需要降序排序那么需要將參數(shù)的值設(shè)置為false。第3個參數(shù)是numPartitions,決定排序后的RDD的分區(qū)個數(shù),默認排序后的分區(qū)個數(shù)和排序之前的分區(qū)個數(shù)相等,即this.partitions.size。第一個參數(shù)是必須輸入的,而后面的兩個參數(shù)可以不輸入
示例:
val list = List(3, 1, 4, 1, 5, 9, 2, 6)
val sortedList = list.sortBy(x => x)
// sortedList: List[Int] = List(1, 1, 2, 3, 4, 5, 6, 9)
3.collect()?方法
是一種行動操作,可以將RDD中所有元素轉(zhuǎn)換成數(shù)組并返回到Driver端,適用于返回處理后的少量數(shù)據(jù)。
因為需要從集群各個節(jié)點收集數(shù)據(jù)到本地,經(jīng)過網(wǎng)絡(luò)傳輸,并且加載到Driver內(nèi)存中,所以如果數(shù)據(jù)量比較大,會給網(wǎng)絡(luò)傳輸造成很大的壓力。 因此,數(shù)據(jù)量較大時,盡量不使用collect()方法,否則可能導(dǎo)致Driver端出現(xiàn)內(nèi)存溢出問題。
collect()方法有以下兩種操作方式:
1.collect:直接調(diào)用collect返回該RDD中的所有元素,返回類型是一個Array[T]數(shù)組。?
2.collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。這種方式需要提供一個標準的偏函數(shù),將元素保存至一個RDD中。首先定義一個函數(shù)one,用于將collect方法得到的數(shù)組中數(shù)值為1的值替換為“one”,將其他值替換為“other”。
?4.flatMap()方法
將函數(shù)參數(shù)應(yīng)用于RDD之中的每一個元素,將返回的迭代器(如數(shù)組、列表等)中的所有元素構(gòu)成新的RDD。
示例:
5.take()方法?
用于獲取RDD的前N個元素,返回數(shù)據(jù)為數(shù)組
take()與collect()方法的原理相似,collect()方法用于獲取全部數(shù)據(jù),take()方法獲取指定個數(shù)的數(shù)據(jù)。
示例:獲取RDD的前5個元素
七、轉(zhuǎn)換操作和行動操作?
對于RDD有兩種計算方式: 轉(zhuǎn)換操作(返回值還是一個RDD)---也叫懶操作,不是立即執(zhí)行 執(zhí)行操作(返回值不是一個RDD)---立即執(zhí)行
轉(zhuǎn)換操作
(1)union()方法
用于將兩個RDD合并成一個,不進行去重操作,而且兩個RDD中每個元素中的值的個數(shù)、數(shù)據(jù)類型需要保持一致。
示例:
(2)filter()方法?
用于過濾RDD中的元素
filter()方法需要一個參數(shù),這個參數(shù)是一個用于過濾的函數(shù),該函數(shù)的返回值為Boolean類型。 filter()方法將返回值為true的元素保留,將返回值為false的元素過濾掉,最后返回一個存儲符合過濾條件的所有元素的新RDD。
示例:創(chuàng)建一個RDD,并且過濾掉每個元組第二個值小于等于1的元素。
(3)distinct()方法?
用于RDD的數(shù)據(jù)去重,去除兩個完全相同的元素,沒有參數(shù)。
示例:創(chuàng)建一個帶有重復(fù)數(shù)據(jù)的RDD,并使用distinct()方法去重。
(4)intersection()?方法?
用于求出兩個RDD的共同元素,即找出兩個RDD的交集,參數(shù)是另一個RDD,先后順序與結(jié)果無關(guān)。
示例:創(chuàng)建兩個RDD,其中有相同的元素,通過intersection()方法求出兩個RDD的交集
(5)subtract()方法?
用于將前一個RDD中在后一個RDD出現(xiàn)的元素刪除,可以認為是求補集的操作,返回值為前一個RDD去除與后一個RDD相同元素后的剩余值所組成的新的RDD。兩個RDD的順序會影響結(jié)果。
示例:創(chuàng)建兩個RDD,分別為rdd1和rdd2,包含相同元素和不同元素,通過subtract()方法求rdd1和rdd2彼此的補集。
(6)cartesian()方法?
可將兩個集合的元素兩兩組合成一組,即求笛卡兒積。
示例:創(chuàng)建兩個RDD,分別有4個元素,通過cartesian()方法求兩個RDD的笛卡兒積。
(7)其他方法?
行動操作?
執(zhí)行操作會返回結(jié)果或把RDD數(shù)據(jù)寫到存儲系統(tǒng)中。Actions是觸發(fā)Spark啟動計算的動因。
?八、創(chuàng)建鍵值對RDD的方法
1.reduceByKey()方法
用于合并具有相同鍵的值,作用對象是鍵值對,并且只對每個鍵的值進行處理,當RDD中有多個鍵相同的鍵值對時,則會對每個鍵對應(yīng)的值進行處理。
示例:
2.groupByKey()方法?
用于對具有相同鍵的值進行分組,可以對同一組的數(shù)據(jù)進行計數(shù)、求和等操作。 對于一個由類型K的鍵和類型V的值組成的RDD,通過groupByKey()方法得到的RDD類型是[K,Iterable[V]]。
3.combineByKey()方法
用于執(zhí)行基于鍵的聚合操作的高級轉(zhuǎn)換函數(shù)之一。它提供了一種靈活的方式來對每個鍵的值進行聚合,而不需要事先進行預(yù)先聚合或排序。
import org.apache.spark.{SparkConf, SparkContext}
object CombineByKeyExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CombineByKeyExample").setMaster("local[*]")
val sc = new SparkContext(conf)
// 創(chuàng)建一個包含鍵值對的RDD
val rdd = sc.parallelize(Seq(("apple", 3), ("banana", 5), ("apple", 7), ("banana", 2), ("orange", 1)))
// 使用combineByKey方法進行基于鍵的聚合操作
val aggregatedRDD = rdd.combineByKey(
createCombiner = (v: Int) => (v, 1), // 初始化值為(v, 1),其中v是值,1表示計數(shù)
mergeValue = (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // 將新值合并到已存在的聚合值中,并更新計數(shù)
mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 合并不同分區(qū)的聚合值,并更新計數(shù)
)
// 打印結(jié)果
aggregatedRDD.collect().foreach(println)
sc.stop()
}
}
?
九、RDD連接方法
(1)join()方法
用于根據(jù)鍵對兩個RDD進行內(nèi)連接,將兩個RDD中鍵相同的數(shù)據(jù)的值存放在一個元組中,最后只返回兩個RDD中都存在的鍵的連接結(jié)果。
例如,在兩個RDD中分別有鍵值對(K,V)和(K,W),通過join()方法連接會返回(K,(V,W))。
示例:創(chuàng)建兩個RDD,含有相同鍵和不同的鍵,通過join()方法進行內(nèi)連接。
(2)rightOuterJoin()方法?
用于根據(jù)鍵對兩個RDD進行右外連接,連接結(jié)果是右邊RDD的所有鍵的連接結(jié)果,不管這些鍵在左邊RDD中是否存在。
在rightOuterJoin()方法中,如果在左邊RDD中有對應(yīng)的鍵,那么連接結(jié)果中值顯示為Some類型值;如果沒有,那么顯示為None值。
示例:
(3)leftOuterJoin()方法?
用于根據(jù)鍵對兩個RDD進行左外連接,與rightOuterJoin()方法相反,返回結(jié)果保留左邊RDD的所有鍵。
示例:
(4)fullOuterJoin()方法?
用于對兩個RDD進行全外連接,保留兩個RDD中所有鍵的連接結(jié)果。
示例:
?
(5)?zip()方法
用于將兩個RDD組合成鍵值對RDD,要求兩個RDD的分區(qū)數(shù)量以及元素數(shù)量相同,否則會拋出異常。 將兩個RDD組合成Key/Value形式的RDD,這里要求兩個RDD的partition數(shù)量以及元素數(shù)量都相同,否則會拋出異常。
(6)combineByKey()方法
合并相同鍵的值 ,是Spark中一個比較核心的高級方法,鍵值對的其他一些高級方法底層均是使用combineByKey()方法實現(xiàn)的,如groupByKey()方法、reduceByKey()方法等。
combineByKey()方法用于將鍵相同的數(shù)據(jù)聚合,并且允許返回類型與輸入數(shù)據(jù)的類型不同的返回值。
combineByKey()方法的使用方式如下:
combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)
由于合并操作會遍歷分區(qū)中所有的元素,因此每個元素(這里指的是鍵值對)的鍵只有兩種情況:以前沒出現(xiàn)過或以前出現(xiàn)過。對于這兩種情況,3個參數(shù)的執(zhí)行情況描述如下。如果以前沒出現(xiàn)過,則執(zhí)行的是createCombiner()方法,createCombiner()方法會在新遇到的鍵對應(yīng)的累加器中賦予初始值,否則執(zhí)行mergeValue()方法。對于已經(jīng)出現(xiàn)過的鍵,調(diào)用mergeValue()方法進行合并操作,對該鍵的累加器對應(yīng)的當前值(C)與新值(V)進行合并。由于每個分區(qū)都是獨立處理的,因此對于同一個鍵可以有多個累加器。如果有兩個或更多的分區(qū)都有對應(yīng)同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法對各個分區(qū)的結(jié)果(全是C)進行合并。?
(7)lookup()方法
查找指定鍵的值?,用于鍵值對RDD,返回指定鍵的所有值。lookup()方法查找指定鍵的值
十、查看DataFrame數(shù)據(jù)?
查看及獲取數(shù)據(jù)的常用函數(shù)或方法
將movies.dat電影數(shù)據(jù)上傳至HDFS中,加載數(shù)據(jù)為RDD并將其轉(zhuǎn)換為DataFrame.?
1.?printSchema:輸出數(shù)據(jù)模式
printSchema函數(shù)查看數(shù)據(jù)模式,打印出列的名稱和類型
2.?show():查看數(shù)據(jù)??
show()方法與show(true)方法一樣,只顯示前20條記錄并且最多只顯示20個字符
若是要顯示所有字符,需要使用show(false)方法
?3.first()/head()/take()/takeAsList():獲取若干條記錄
4.collect()/collectAsList():獲取所有數(shù)據(jù)
collect方法可以將DataFrame中的所有數(shù)據(jù)都獲取到,并返回一個數(shù)組。
collectAsList方法可以獲取所有數(shù)據(jù),返回一個列表。
十一、將DataFrame注冊成為臨時表,然后通過SQL語句進行查詢
直接在DataFrame對象上進行查詢,DataFrame提供了很多查詢的方法
1.where()/filter()方法?
a.where()方法
DataFrame可以使用where(conditionExpr: String)根據(jù)指定條件進行查詢
參數(shù)中可以使用and或or
該方法的返回結(jié)果仍然為DataFrame類型
b.filter()方法
DataFrame還可使用filter篩選符合條件的數(shù)據(jù)
2.select()/selectExpr()/col()/apply()方法
a.select()方法:獲取指定字段值
select方法根據(jù)傳入的string類型字段名,獲取指定字段的值,以DataFrame類型返回
b.selectExpr()方法:對指定字段進行特殊處理
selectExpr:對指定字段進行特殊處理,可以對指定字段調(diào)用UDF函數(shù)或者指定別名。
selectExpr傳入String類型的參數(shù),返回DataFrame對象。
c.col()/apply()方法
col或者apply也可以獲取DataFrame指定字段
col或者apply只能獲取一個字段,并且返回對象為Column類型
3.orderBy()/sort()方法
orderBy()方法用于根據(jù)指定字段對數(shù)據(jù)進行排序,默認為升序排序。若要求降序排序, orderBy(方法的參數(shù)可以使用“desc("字段段名稱")”或“$"字段名稱".desc”,也可以在指定字段前面加“-”。使用orderBy()方法根據(jù)u serId 字段對 user對象進行降序排序,如代碼下:
# 使用 orderBy()方法根據(jù) userId字段對寸user 對象進行降序排序
val userOrderBy = user.orderBy(deesc("userId")) val ? ?userOrderBy = ?user.orderBy($" userId".desc) ? val userOrderBy =u iser.orderBy(-user("userId"))
柚子快報邀請碼778899分享:大數(shù)據(jù) spark方法總結(jié)
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。