柚子快報(bào)邀請碼778899分享:大數(shù)據(jù) Hive 主要內(nèi)容一覽
柚子快報(bào)邀請碼778899分享:大數(shù)據(jù) Hive 主要內(nèi)容一覽
Hive架構(gòu)
用戶接口:Client
CLI(command-line interface)、JDBC/ODBC(jdbc訪問hive)
元數(shù)據(jù):Metastore
元數(shù)據(jù)包括:表名、表所屬的數(shù)據(jù)庫(默認(rèn)是default)、表的擁有者、列/分區(qū)字段、表的類型(是否是外部表)、表的數(shù)據(jù)所在目錄等; 默認(rèn)存儲(chǔ)在自帶的derby數(shù)據(jù)庫中,推薦使用MySQL存儲(chǔ)Metastore。
Hadoop
使用HDFS進(jìn)行存儲(chǔ),使用MapReduce進(jìn)行計(jì)算。
驅(qū)動(dòng)器:Driver
(1)解析器(SQL Parser):將SQL字符串轉(zhuǎn)換成抽象語法樹AST,這一步一般都用第三方工具庫完成,比如antlr;對AST進(jìn)行語法分析,比如表是否存在、字段是否存在、SQL語義是否有誤。
(2)編譯器(Physical Plan):將AST編譯生成邏輯執(zhí)行計(jì)劃。
(3)優(yōu)化器(Query Optimizer):對邏輯執(zhí)行計(jì)劃進(jìn)行優(yōu)化。
(4)執(zhí)行器(Execution):把邏輯執(zhí)行計(jì)劃轉(zhuǎn)換成可以運(yùn)行的物理計(jì)劃。對于Hive來說,就是MR/Spark。
Hive運(yùn)行原理
Hive通過給用戶提供的一系列交互接口,接收到用戶的指令(SQL),使用自己的Driver,結(jié)合元數(shù)據(jù)(MetaStore),將這些指令翻譯成MapReduce,提交到Hadoop中執(zhí)行,最后,將執(zhí)行返回的結(jié)果輸出到用戶交互接口。
其實(shí),還可以這樣理解:Hive要做的就是將SQL翻譯成MapReduce程序代碼。實(shí)際上,Hive內(nèi)置了很多Operator,每個(gè)Operator完成一個(gè)特定的計(jì)算過程,Hive將這些Operator構(gòu)造成一個(gè)有向無環(huán)圖DAG,然后根據(jù)這些Operator之間是否存在shuffle將其封裝到map或者reduce函數(shù)中,之后就可以提交給MapReduce執(zhí)行了。
內(nèi)部表與外部表
不同點(diǎn)
1 外部表不會(huì)加載數(shù)據(jù)到Hive,減少數(shù)據(jù)傳輸、數(shù)據(jù)還能共享。
共享的理解就是:當(dāng)我們刪除一個(gè)內(nèi)部表時(shí),Hive 也會(huì)刪除這個(gè)表中數(shù)據(jù)。內(nèi)部表不適合和其他工具共享數(shù)據(jù)。
2 Hive創(chuàng)建內(nèi)部表時(shí),會(huì)將數(shù)據(jù)移動(dòng)到數(shù)據(jù)倉庫指向的路徑。
創(chuàng)建外部表時(shí),僅記錄數(shù)據(jù)所在的路徑,不對數(shù)據(jù)的位置做任何改變。
在刪除表的時(shí)候,內(nèi)部表的元數(shù)據(jù)和數(shù)據(jù)會(huì)被一起刪除,而外部表只刪除元數(shù)據(jù),不刪除數(shù)據(jù)。這樣外部表相對來說更加安全些,數(shù)據(jù)組織也更加靈活,方便共享源數(shù)據(jù)。
場景選擇
在公司中絕大多數(shù)場景都是外部表。
自己使用的臨時(shí)表,才會(huì)創(chuàng)建內(nèi)部表。
Hive分區(qū)與分桶
Hive分區(qū)
是按照數(shù)據(jù)表的某列或者某些列分為多區(qū),在hive存儲(chǔ)上是hdfs文件,也就是文件夾形式。現(xiàn)在最常用的跑T+1數(shù)據(jù),按當(dāng)天時(shí)間分區(qū)的較多。
把每天通過sqoop或者datax拉取的一天的數(shù)據(jù)存儲(chǔ)一個(gè)區(qū),也就是所謂的文件夾與文件。在查詢時(shí)只要指定分區(qū)字段的值就可以直接從該分區(qū)查找即可。創(chuàng)建分區(qū)表的時(shí)候,要通過關(guān)鍵字 partitioned by (column name ?string)聲明該表是分區(qū)表,并且是按照字段column name進(jìn)行分區(qū),column name值一致的所有記錄存放在一個(gè)分區(qū)中,分區(qū)屬性name的類型是string類型。
當(dāng)然,可以依據(jù)多個(gè)列進(jìn)行分區(qū),即對某個(gè)分區(qū)的數(shù)據(jù)按照某些列繼續(xù)分區(qū)。
向分區(qū)表導(dǎo)入數(shù)據(jù)的時(shí)候,要通過關(guān)鍵字partition((column name="xxxx")顯示聲明數(shù)據(jù)要導(dǎo)入到表的哪個(gè)分區(qū)
設(shè)置分區(qū)的影響
首先是hive本身對分區(qū)數(shù)有限制,不過可以修改限制的數(shù)量。
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=1000;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.parallel.thread.number=264;
hdfs對單個(gè)目錄下的目錄數(shù)量或者文件數(shù)量也是有限制的,也是可以修改的;NN的內(nèi)存肯定會(huì)限制,這是最重要的,如果分區(qū)數(shù)很大,會(huì)影響NN服務(wù),進(jìn)而影響一系列依賴于NN的服務(wù)。所以最好合理設(shè)置分區(qū)規(guī)則,對小文件也可以定期合并,減少NN的壓力。
Hive的分桶
在分區(qū)數(shù)量過于龐大以至于可能導(dǎo)致文件系統(tǒng)崩潰時(shí),我們就需要使用分桶來解決問題
分桶是相對分區(qū)進(jìn)行更細(xì)粒度的劃分。分桶則是指定分桶表的某一列,讓該列數(shù)據(jù)按照哈希取模的方式隨機(jī)、均勻地分發(fā)到各個(gè)桶文件中。因?yàn)榉滞安僮餍枰鶕?jù)某一列具體數(shù)據(jù)來進(jìn)行哈希取模操作,故指定的分桶列必須基于表中的某一列(字段) 要使用關(guān)鍵字clustered by 指定分區(qū)依據(jù)的列名,還要指定分為多少桶:
create table test(id int,name string) cluster by (id) into 5 buckets .......
insert into buck select id ,name from p cluster by (id)
Hive分區(qū)分桶區(qū)別
分區(qū)是表的部分列的集合,可以為頻繁使用的數(shù)據(jù)建立分區(qū),這樣查找分區(qū)中的數(shù)據(jù)時(shí)就不需要掃描全表,這對于提高查找效率很有幫助。不同于分區(qū)對列直接進(jìn)行拆分,桶往往使用列的哈希值對數(shù)據(jù)打散,并分發(fā)到各個(gè)不同的桶中從而完成數(shù)據(jù)的分桶過程。分區(qū)和分桶最大的區(qū)別就是分桶隨機(jī)分割數(shù)據(jù)庫,分區(qū)是非隨機(jī)分割數(shù)據(jù)庫。
函數(shù)
本環(huán)節(jié)不再介紹簡單的函數(shù),比如:'if' ,'is not null' ,'=='等等這類的函數(shù)。
內(nèi)置函數(shù)
(1) NVL
給值為NULL的數(shù)據(jù)賦值,它的格式是NVL( value,default_value)。它的功能是如果value為NULL,則NVL函數(shù)返回default_value的值,否則返回value的值,如果兩個(gè)參數(shù)都為NULL ,則返回NULL
select nvl(column, 0) from xxx;
(2)行轉(zhuǎn)列
函數(shù) 描述 CONCAT(string A/col, string B/col…) 返回輸入字符串連接后的結(jié)果,支持任意個(gè)輸入字符串 CONCAT_WS(separator, str1, str2,...) 第一個(gè)參數(shù)間的分隔符,如果分隔符是 NULL,返回值也將為 NULL。這個(gè)函數(shù)會(huì)跳過分隔符參數(shù)后的任何 NULL 和空字符串。分隔符將被加到被連接的字符串之間。 COLLECT_SET(col) 將某字段的值進(jìn)行去重匯總,產(chǎn)生array類型字段 COLLECT_LIST(col) 函數(shù)只接受基本數(shù)據(jù)類型,它的主要作用是將某字段的值進(jìn)行不去重匯總,產(chǎn)生array類型字段。
(3)列轉(zhuǎn)行(一列轉(zhuǎn)多行)
Split(str, separator): 將字符串按照后面的分隔符切割,轉(zhuǎn)換成字符array。
EXPLODE(col): 將hive一列中復(fù)雜的array或者map結(jié)構(gòu)拆分成多行。
LATERAL VIEW
用法:
LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解釋:lateral view用于和split, explode等UDTF一起使用,它能夠?qū)⒁恍袛?shù)據(jù)拆成多行數(shù)據(jù),在此基礎(chǔ)上可以對拆分后的數(shù)據(jù)進(jìn)行聚合。
lateral view首先為原始表的每行調(diào)用UDTF,UDTF會(huì)把一行拆分成一或者多行,lateral view再把結(jié)果組合,產(chǎn)生一個(gè)支持別名表的虛擬表。
準(zhǔn)備數(shù)據(jù)源測試
movie category 《功勛》 記錄,劇情 《戰(zhàn)狼2》 戰(zhàn)爭,動(dòng)作,災(zāi)難
SQL
SELECT movie,category_name
FROM movie_info
lateral VIEW
explode(split(category,",")) movie_info_tmp AS category_name ;
測試結(jié)果
《功勛》 記錄
《功勛》 劇情
《戰(zhàn)狼2》 戰(zhàn)爭
《戰(zhàn)狼2》 動(dòng)作
《戰(zhàn)狼2》 災(zāi)難
窗口函數(shù)
(1)OVER()
定分析函數(shù)工作的數(shù)據(jù)窗口大小,這個(gè)數(shù)據(jù)窗口大小可能會(huì)隨著行的變而變化。
(2)CURRENT ROW(當(dāng)前行)
語法
n PRECEDING:往前n行數(shù)據(jù)
n FOLLOWING:往后n行數(shù)據(jù)
(3)UNBOUNDED(無邊界)
UNBOUNDED PRECEDING 前無邊界,表示從前面的起點(diǎn)
unbounded perceding/following
UNBOUNDED FOLLOWING后無邊界,表示到后面的終點(diǎn)
SQL案例:由起點(diǎn)到當(dāng)前行的聚合
select
sum(money) over(partition by user_id order by pay_time rows between UNBOUNDED PRECEDING and current row)
from or_order;
SQL案例:當(dāng)前行和前面一行做聚合
select
sum(money) over(partition by user_id order by pay_time rows between 1 PRECEDING and current row)
from or_order;
SQL案例:當(dāng)前行和前面一行和后一行做聚合
select
sum(money) over(partition by user_id order by pay_time rows between 1 PRECEDING AND 1 FOLLOWING )
from or_order;
SQL案例:當(dāng)前行及后面所有行
select
sum(money) over(partition by user_id order by pay_time rows between current row and UNBOUNDED FOLLOWING )
from or_order;
(4)LAG(col,n,default_val)
往前第n行數(shù)據(jù),沒有的話default_val
(5)LEAD(col,n, default_val)
往后第n行數(shù)據(jù),沒有的話default_val
SQL案例:查詢用戶購買明細(xì)以及上次的購買時(shí)間和下次購買時(shí)間
select
user_id,,pay_time,money,
lag(pay_time,1,'1970-01-01') over(PARTITION by name order by pay_time) prev_time,
lead(pay_time,1,'1970-01-01') over(PARTITION by name order by pay_time) next_time
from or_order;
(6)FIRST_VALUE(col,true/false)
當(dāng)前窗口下的第一個(gè)值,第二個(gè)參數(shù)為true,跳過空值。
(7)LAST_VALUE (col,true/false)
當(dāng)前窗口下的最后一個(gè)值,第二個(gè)參數(shù)為true,跳過空值。
SQL案例:查詢顧用戶每個(gè)月第一次的購買時(shí)間 和 每個(gè)月的最后一次購買時(shí)間。
select
FIRST_VALUE(pay_time)
over(
partition by user_id,month(pay_time) order by pay_time
rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
) first_time,
LAST_VALUE(pay_time)
over(partition by user_id,month(pay_time) order by pay_time rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING
) last_time
from or_order;
(8)NTILE(n)
把有序窗口的行分發(fā)到指定數(shù)據(jù)的組中,各個(gè)組有編號,編號從1開始,對于每一行,NTILE返回此行所屬的組的編號。(用于將分組數(shù)據(jù)按照順序切分成n片,返回當(dāng)前切片值)
SQL案例:查詢前25%時(shí)間的訂單信息
select * from (
select User_id,pay_time,money,
ntile(4) over(order by pay_time) sorted
from or_order
) t
where sorted = 1;
4個(gè)By
(1)Order By
全局排序,只有一個(gè)Reducer。
(2)Sort By
分區(qū)內(nèi)有序。
(3)Distrbute By
類似MR中Partition,進(jìn)行分區(qū),結(jié)合sort by使用。
(4) Cluster By
當(dāng)Distribute by和Sorts by字段相同時(shí),可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外還兼具Sort by的功能。但是排序只能是升序排序,不能指定排序規(guī)則為ASC或者DESC。
在生產(chǎn)環(huán)境中Order By用的比較少,容易導(dǎo)致OOM。
在生產(chǎn)環(huán)境中Sort By+ Distrbute By用的多。
排序函數(shù)
(1)RANK()
排序相同時(shí)會(huì)重復(fù),總數(shù)不會(huì)變
1
1
3
3
5
(2)DENSE_RANK()
排序相同時(shí)會(huì)重復(fù),總數(shù)會(huì)減少
1
1
2
2
3
(3)ROW_NUMBER()
會(huì)根據(jù)順序計(jì)算
1
2
3
4
5
Hive 優(yōu)化
首先要這樣優(yōu)化的原理,再去適當(dāng)去調(diào)節(jié)參數(shù)和選擇方案。
1. 表的優(yōu)化
(1) 小表、大表Join
將key相對分散,并且數(shù)據(jù)量小的表放在join的左邊,這樣可以有效減少內(nèi)存溢出錯(cuò)誤發(fā)生的概率;再進(jìn)一步,可以使用map join讓小的維度表(1000條以下的記錄條數(shù))先進(jìn)內(nèi)存。在map端完成reduce。
(2) 大表Join大表
a. 空key過濾
有時(shí)join超時(shí)是因?yàn)槟承﹌ey對應(yīng)的數(shù)據(jù)太多,而相同key對應(yīng)的數(shù)據(jù)都會(huì)發(fā)送到相同的reducer上,從而導(dǎo)致內(nèi)存不夠。此時(shí)我們應(yīng)該仔細(xì)分析這些異常的key,很多情況下,這些key對應(yīng)的數(shù)據(jù)是異常數(shù)據(jù),我們需要在SQL語句中進(jìn)行過濾。
b. 空key轉(zhuǎn)換
有時(shí)雖然某個(gè)key為空對應(yīng)的數(shù)據(jù)很多,但是相應(yīng)的數(shù)據(jù)不是異常數(shù)據(jù),必須要包含在join的結(jié)果中,此時(shí)我們可以表a中key為空的字段賦一個(gè)隨機(jī)的值,使得數(shù)據(jù)隨機(jī)均勻地分不到不同的reducer上。
(3) MapJoin
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會(huì)將Join操作轉(zhuǎn)換成Common Join,即:在Reduce階段完成join。容易發(fā)生數(shù)據(jù)傾斜。可以用MapJoin把小表全部加載到內(nèi)存在map端進(jìn)行join,避免reducer處理。
設(shè)置自動(dòng)選擇Mapjoin
set hive.auto.convert.join = true; 默認(rèn)為true
大表小表的閾值設(shè)置(默認(rèn)25M以下認(rèn)為是小表):
set hive.mapjoin.smalltable.filesize=25000000;
(4) Group By
Map階段同一Key數(shù)據(jù)分發(fā)給一個(gè)reduce,當(dāng)一個(gè)key數(shù)據(jù)過大時(shí)就傾斜了。并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端進(jìn)行部分聚合,最后在Reduce端得出最終結(jié)果。
(5) 開啟Map端聚合
// 是否在Map端進(jìn)行聚合,默認(rèn)為True
set hive.map.aggr = true
// 在Map端進(jìn)行聚合操作的條目數(shù)目
set hive.groupby.mapaggr.checkinterval = 100000
// 有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡(默認(rèn)是false)
set hive.groupby.skewindata = true
對數(shù)據(jù)傾斜負(fù)載均衡的理解
會(huì)有兩個(gè)MR Job。第一個(gè)MR Job中,Map的輸出結(jié)果會(huì)隨機(jī)分布到Reduce中,每個(gè)Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的Group By Key有可能被分發(fā)到不同的Reduce中,從而達(dá)到負(fù)載均衡的目的;第二個(gè)MR Job再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照Group By Key分布到Reduce中(這個(gè)過程可以保證相同的Group By Key被分布到同一個(gè)Reduce中),最后完成最終的聚合操作。
(6) Count(Distinct) 去重統(tǒng)計(jì)
由于COUNT DISTINCT操作需要用一個(gè)Reduce Task來完成,這一個(gè)Reduce需要處理的數(shù)據(jù)量太大,就會(huì)導(dǎo)致整個(gè)Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換,但是需要注意group by造成的數(shù)據(jù)傾斜問題。
(7) 笛卡爾積
盡量避免笛卡爾積,join的時(shí)候不加on條件,或者無效的on條件,Hive只能使用1個(gè)reducer來完成笛卡爾積。
(8) 行列過濾
列處理:在SELECT中,只拿需要的列,如果有,盡量使用分區(qū)過濾,少用SELECT *。
行處理:在分區(qū)剪裁中,當(dāng)使用外關(guān)聯(lián)時(shí),如果將副表的過濾條件寫在Where后面,那么就會(huì)先全表關(guān)聯(lián),之后再過濾
2. 合理設(shè)置Map及Reduce數(shù)
首先理清楚Map數(shù)是越多越好嗎?
邏輯:如果一個(gè)任務(wù)有很多小文件(遠(yuǎn)遠(yuǎn)小于塊大小128m),則每個(gè)小文件也會(huì)被當(dāng)作一個(gè)塊,用一個(gè)map任務(wù)來完成,而一個(gè)map任務(wù)啟動(dòng)和初始化的時(shí)間遠(yuǎn)遠(yuǎn)大于邏輯處理的時(shí)間,就會(huì)造成很大的資源浪費(fèi)。
保證每個(gè)map處理接近128m的文件塊是不是就可以了?
邏輯:比如有一個(gè)127m的文件,正常會(huì)用一個(gè)map去完成,但這個(gè)文件只有一個(gè)或者兩個(gè)小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復(fù)雜,用一個(gè)map任務(wù)去做,肯定也比較耗時(shí)
復(fù)雜文件增加Map數(shù)
原理:文件都很大,任務(wù)邏輯復(fù)雜,map執(zhí)行非常慢的時(shí)候,可以考慮增加Map數(shù),來使得每個(gè)map處理的數(shù)據(jù)量減少,從而提高任務(wù)的執(zhí)行效率。
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
調(diào)整maxSize最大值。讓maxSize最大值低于blocksize就可以增加map的個(gè)數(shù)。
小文件進(jìn)行合并,減少map數(shù)
在map執(zhí)行前合并小文件,減少map數(shù):CombineHiveInputFormat具有對小文件進(jìn)行合并的功能(系統(tǒng)默認(rèn)的格式)。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Map-Reduce的任務(wù)結(jié)束時(shí)合并小文件的設(shè)置
// 在map-only任務(wù)結(jié)束時(shí)合并小文件,默認(rèn)true
SET hive.merge.mapfiles = true;
// 在map-reduce任務(wù)結(jié)束時(shí)合并小文件,默認(rèn)false
SET hive.merge.mapredfiles = true;
// 合并文件的大小,默認(rèn)256M
SET hive.merge.size.per.task = 268435456;
//當(dāng)輸出文件的平均大小小于該值時(shí),啟動(dòng)一個(gè)獨(dú)立的map-reduce任務(wù)進(jìn)行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;
3. 合理設(shè)置Reduce數(shù)
同樣考慮是不是越多越好?
過多的啟動(dòng)和初始化reduce也會(huì)消耗時(shí)間和資源。有多少個(gè)reduce,就會(huì)有多少個(gè)輸出文件,如果生成了很多個(gè)小文件,那么如果這些小文件作為下一個(gè)任務(wù)的輸入,則也會(huì)出現(xiàn)小文件過多的問題。
(1)數(shù)據(jù)量設(shè)置
// 每個(gè)Reduce處理的數(shù)據(jù)量默認(rèn)是256MB
hive.exec.reducers.bytes.per.reducer=256000000
// 每個(gè)任務(wù)最大的reduce數(shù),默認(rèn)為1009
hive.exec.reducers.max=1009
// 計(jì)算reducer數(shù)的公式
N=min(hive.exec.reducers.max,總輸入數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer)
(2)文件配置
mapreduce.job.reduces = 15
4. 并行執(zhí)行
通過設(shè)置參數(shù)hive.exec.parallel值為true,就可以開啟并發(fā)執(zhí)行。不過,在共享集群中,需要注意下,如果job中并行階段增多,那么集群利用率就會(huì)增加。建議在數(shù)據(jù)量大,sql很長的時(shí)候使用,數(shù)據(jù)量小,sql比較的小開啟有可能還不如之前快。
//打開任務(wù)并行執(zhí)行,默認(rèn)為false
set hive.exec.parallel=true;
//同一個(gè)sql允許最大并行度,默認(rèn)為8。
set hive.exec.parallel.thread.number=16;
5. JVM重用
JVM來執(zhí)行map和Reduce任務(wù)的。這時(shí)JVM的啟動(dòng)過程可能會(huì)造成相當(dāng)大的開銷,尤其是執(zhí)行的job包含有成百上千task任務(wù)的情況。JVM重用可以使得JVM實(shí)例在同一個(gè)job中重新使用N次。
缺點(diǎn)是,開啟JVM重用將一直占用使用到的task插槽,以便進(jìn)行重用,直到任務(wù)完成后才能釋放。
set mapreduce.job.jvm.numtasks=10
6. 列式存儲(chǔ)
因?yàn)槊總€(gè)字段的數(shù)據(jù)聚集存儲(chǔ),在查詢只需要少數(shù)幾個(gè)字段的時(shí)候,能大大減少讀取的數(shù)據(jù)量;每個(gè)字段的數(shù)據(jù)類型一定是相同的,列式存儲(chǔ)可以針對性地設(shè)計(jì)更好的設(shè)計(jì)壓縮算法。
TEXTFILE和SEQUENCEFILE的存儲(chǔ)格式都是基于行存儲(chǔ)的;
ORC和PARQUET是基于列式存儲(chǔ)的。
7. 壓縮(選擇快的)
// 啟用中間數(shù)據(jù)壓縮
set hive.exec.compress.intermediate=true
// 啟用最終數(shù)據(jù)壓縮
set mapreduce.map.output.compress=true
// 設(shè)置壓縮方式
set mapreduce.map.outout.compress.codec=
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.Lz4Codec
Hive數(shù)據(jù)傾斜
Hive數(shù)據(jù)傾斜表現(xiàn)
就是單說hive自身的MR引擎:發(fā)現(xiàn)所有的map task全部完成,并且99%的reduce task完成,只剩下一個(gè)或者少數(shù)幾個(gè)reduce task一直在執(zhí)行,這種情況下一般都是發(fā)生了數(shù)據(jù)傾斜。說白了就是Hive的數(shù)據(jù)傾斜本質(zhì)上是MapReduce的數(shù)據(jù)傾斜。
Hive數(shù)據(jù)傾斜的原因
在MapReduce編程模型中十分常見,大量相同的key被分配到一個(gè)reduce里,造成一個(gè)reduce任務(wù)累死,其他reduce任務(wù)閑死。查看任務(wù)進(jìn)度,發(fā)現(xiàn)長時(shí)間停留在99%或100%,查看任務(wù)監(jiān)控界面,只有少量的reduce子任務(wù)未完成。
key分布不均衡。業(yè)務(wù)問題或者業(yè)務(wù)數(shù)據(jù)本身的問題,某些數(shù)據(jù)比較集中。
(1)join小表:其中一個(gè)表是小表,但是key比較集中,導(dǎo)致的就是某些Reduce的值偏高。
(2)空值或無意義值:如果缺失的項(xiàng)很多,在做join時(shí)這些空值就會(huì)非常集中,拖累進(jìn)度。
(3)group by:維度過小。
(4)distinct:導(dǎo)致最終只有一個(gè)Reduce任務(wù)。
Hive數(shù)據(jù)傾斜解決
group by代替distinct 要統(tǒng)計(jì)某一列的去重?cái)?shù)時(shí),如果數(shù)據(jù)量很大,count(distinct)就會(huì)非常慢,原因與order by類似,count(distinct)邏輯導(dǎo)致最終只有一個(gè)Reduce任務(wù)。對1再優(yōu)化:group by配置調(diào)整
(1)map端預(yù)聚合
(2)group by時(shí),combiner在map端做部分預(yù)聚合,可以有效減少shuffle數(shù)據(jù)量。
(3)checkinterval:設(shè)置map端預(yù)聚合的行數(shù)閾值,超過該值就會(huì)分拆job。
hive.map.aggr=true //默認(rèn)
hive.groupby.mapaggr.checkinterval=100000 // 默認(rèn)
(4)傾斜均衡配置 Hive自帶了一個(gè)均衡數(shù)據(jù)傾斜的配置項(xiàng)。
其實(shí)現(xiàn)方法是在group by時(shí)啟動(dòng)兩個(gè)MR job。第一個(gè)job會(huì)將map端數(shù)據(jù)隨機(jī)輸入reducer,每個(gè)reducer做部分聚合,相同的key就會(huì)分布在不同的reducer中。第二個(gè)job再將前面預(yù)處理過的數(shù)據(jù)按key聚合并輸出結(jié)果,這樣就起到了均衡的效果。
hive.groupby.skewindata=false // 默認(rèn)
join基礎(chǔ)優(yōu)化
(1) Hive在解析帶join的SQL語句時(shí),會(huì)默認(rèn)將最后一個(gè)表作為大表,將前面的表作為小表,將它們讀進(jìn)內(nèi)存。如果表順序?qū)懛矗绻蟊碓谇懊?,引發(fā)OOM。不過現(xiàn)在hive自帶優(yōu)化。
(2) map join:特別適合大小表join的情況,大小表join在map端直接完成join過程,沒有reduce,效率很高。
(3)多表join時(shí)key相同:會(huì)將多個(gè)join合并為一個(gè)MR job來處理,兩個(gè)join的條件不相同,就會(huì)拆成多個(gè)MR job計(jì)算。
sort by代替order by
將結(jié)果按某字段全局排序,這會(huì)導(dǎo)致所有map端數(shù)據(jù)都進(jìn)入一個(gè)reducer中,在數(shù)據(jù)量大時(shí)可能會(huì)長時(shí)間計(jì)算不完。使用sort by,那么還是會(huì)視情況啟動(dòng)多個(gè)reducer進(jìn)行排序,并且保證每個(gè)reducer內(nèi)局部有序。為了控制map端數(shù)據(jù)分配到reducer的key,往往還要配合distribute by一同使用。如果不加distribute by的話,map端數(shù)據(jù)就會(huì)隨機(jī)分配到reducer。
單獨(dú)處理傾斜key
一般來講傾斜的key都很少,我們可以將它們抽樣出來,對應(yīng)的行單獨(dú)存入臨時(shí)表中,然后打上隨機(jī)數(shù)前綴,最后再進(jìn)行聚合。或者是先對key做一層hash,先將數(shù)據(jù)隨機(jī)打散讓它的并行度變大,再匯集。其實(shí)辦法一樣。
柚子快報(bào)邀請碼778899分享:大數(shù)據(jù) Hive 主要內(nèi)容一覽
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。