柚子快報邀請碼778899分享:spark Hive小文件處理
柚子快報邀請碼778899分享:spark Hive小文件處理
MR任務
mr任務參考鏈接
set hive.exec.reducers.max=3
set hive.exec.dynamic.partition.mode = true; --使用動態(tài)分區(qū)時,設置為ture。
set hive.exec.dynamic.partition.mode = nonstrict; --動態(tài)分區(qū)模式,默認值:strict,表示必須指定一個分區(qū)為靜態(tài)分區(qū); nonstrict模式表示允許所有的分區(qū)字段都可以使用動態(tài)分區(qū)。一般需要設置為nonstrict。
set hive.exec.max.dynamic.partitions.pernode =10; --在每個執(zhí)行MR的節(jié)點上,最多可以創(chuàng)建多少個動態(tài)分區(qū),默認值:100。 set hive.exec.max.dynamic.partitions =1000; --在所有執(zhí)行MR的節(jié)點上,最多一共可以創(chuàng)建多少個動態(tài)分區(qū),默認值:1000。 set hive.exec.max.created.files = 100000; --整個MR Job中最多可以創(chuàng)建多少個HDFS文件,默認值:100000。 set hive.error.on.empty.partition = false; --當有空分區(qū)產(chǎn)生時,是否拋出異常,默認值:false。
Hive文件產(chǎn)生大量小文件的原因: 一是文件本身的原因:小文件多,以及文件的大小; 二是使用動態(tài)分區(qū),可能會導致產(chǎn)生大量分區(qū),從而產(chǎn)生很多小文件,也會導致產(chǎn)生很多Mapper; 三是Reduce數(shù)量較多,Hive SQL輸出文件的數(shù)量和Reduce的個數(shù)是一樣的。
小文件帶來的影響: 文件的數(shù)量和大小決定Mapper任務的數(shù)量,小文件越多,Mapper任務越多,每一個Mapper都會啟動一個JVM來運行,所以這些任務的初始化和執(zhí)行會花費大量的資源,嚴重影響性能。 在NameNode中每個文件大約占150字節(jié),小文件多,會嚴重影響NameNode性能。 解決小文件問題: 如果動態(tài)分區(qū)數(shù)量不可預測,最好不用。 如果用,最好使用distributed by分區(qū)字段,這樣會對字段進行一個hash操作,把相同的分區(qū)給同一個Reduce處理; 減少Reduce數(shù)量; 進行以一些參數(shù)調(diào)整。
Hdfs文件數(shù)
指定目錄下的文件夾,文件,容量大小
[root@mz-hadoop-01 ~]# hdfs dfs -count /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
568 7433 6065483664 /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
[root@mz-hadoop-01 ~]# hdfs dfs -count -h /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
568 7.3 K 5.6 G /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
Hive文件數(shù)
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS WHERE tbl_id='97387'
) a
LEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_id
GROUP BY tbl_id
ORDER BY file_cnts DESC;
TBL_ID file_cnts
------ -----------
97387 2082
所有文件數(shù)
SELECT SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) a
LEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_id
file_cnts
-----------
340323
表文件數(shù)topN
SELECT e.*,f.*
FROM
(
SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) a
LEFT JOIN (
SELECT * FROM partition_params WHERE PARAM_KEY='numFiles'
) b
ON a.part_id=b.part_id
GROUP BY tbl_id
ORDER BY file_cnts DESC
) c
LEFT JOIN (
SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id
) e LEFT JOIN
(
SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2
庫文件數(shù)topN
select
db_id,db_name,DB_LOCATION_URI,sum(file_cnts) as file_cnts
from (
SELECT e.*,f.*
FROM
(
SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) a
LEFT JOIN (
SELECT * FROM partition_params WHERE PARAM_KEY='numFiles'
) b
ON a.part_id=b.part_id
GROUP BY tbl_id
ORDER BY file_cnts DESC
) c
LEFT JOIN (
SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id
) e LEFT JOIN
(
SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2
)g group by db_id,db_name,DB_LOCATION_URI order by file_cnts desc
小文件壓縮任務
package com.mingzhi.common.universal
import com.mingzhi.common.interf.{IDate, MySaveMode}
import com.mingzhi.common.utils.{HiveUtil, SinkUtil, SparkUtils, TableUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 處理只有一個分區(qū)dt的表
*/
object table_compress_process {
private var hive_dbs: String = "paascloud"
private var hive_tables: String = "dwd_order_info_abi"
private var dt: String = "2023-06-30"
private var dt1: String = "2023-06-30"
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val builder = SparkUtils.getBuilder
if (System.getProperties.getProperty("os.name").contains("Windows")) {
builder.master("local[*]")
} else {
hive_dbs = args(0)
hive_tables = args(1)
dt = args(2)
dt1 = args(3)
}
val spark: SparkSession = builder.appName("clean_process").getOrCreate()
HiveUtil.openDynamicPartition(spark)
spark.sql("set spark.sql.shuffle.partitions=1")
if ("all".equalsIgnoreCase(hive_dbs)) {
val builder = new StringBuilder()
val frame_db = spark.sql("show databases").select("databaseName")
frame_db.show(false)
frame_db.collect().foreach(db => {
builder.append(db.toString().replace("[", "").replace("]", ","))
})
println("dbs:" + builder.toString())
hive_dbs = builder.toString()
}
hive_dbs.split(",").foreach(db => {
if (StringUtils.isNotBlank(db)) {
if ("all".equalsIgnoreCase(hive_tables)) {
compress_all_table(spark, db)
} else {
hive_tables.split(",").foreach(t => {
compress_the_table(spark, db, t)
})
}
}
})
spark.stop()
}
private def compress_the_table(spark: SparkSession, hive_db: String, table: String): Unit = {
println("compress_the_table======>:" + hive_db + "." + table)
spark.sql(s"use $hive_db")
if (TableUtils.tableExists(spark, hive_db, table)) {
try {
new IDate {
override def onDate(dt: String): Unit = {
/**
* 建議:對需要checkpoint的RDD,先執(zhí)行persist(StorageLevel.DISK_ONLY)
*/
val f1 = spark.sql(
s"""
|
|select * from $hive_db.$table where dt='$dt'
|""".stripMargin)
.persist(StorageLevel.MEMORY_ONLY)
val r_ck: (DataFrame, String) = SparkUtils.persistDataFrame(spark, f1)
val f2 = r_ck._1
println("f2 show===>")
f2.show(false)
val type_ = TableUtils.getCompressType(spark, hive_db, table)
if ("HiveFileFormat".equalsIgnoreCase(type_)) {
println("sink HiveFileFormat table:" + table)
SinkUtil.sink_to_hive_HiveFileFormat(spark, f2, hive_db, table, null)
} else {
//spark表
SinkUtil.sink_to_hive(dt
, spark
, f2
, hive_db
, table
, type_
, MySaveMode.OverWriteByDt
, 1)
}
spark.sql(s"drop table ${r_ck._2} ")
}
}.invoke(dt, dt1)
} catch {
case e: org.apache.spark.sql.AnalysisException => {
println("exception1:" + e)
}
case e: Exception => println("exception:" + e)
}
}
}
private def compress_all_table(spark: SparkSession, hive_db: String): Unit = {
spark.sql(s"use $hive_db")
val frame_table = spark.sql(s"show tables")
frame_table.show(100, false)
frame_table.printSchema()
frame_table
.filter(r => {
!r.getAs[Boolean]("isTemporary")
})
.select("tableName").collect().foreach(r => {
//r:[ads_order_topn]
val table = r.toString().replace("[", "").replace("]", "")
println("compress table:" + hive_db + "." + table)
if (TableUtils.tableExists(spark, hive_db, table)) {
try {
new IDate {
override def onDate(dt: String): Unit = {
val f1 = spark.sql(
s"""
|
|select * from $hive_db.$table where dt='$dt'
|""".stripMargin)
SinkUtil.sink_to_hive(dt, spark, f1, hive_db, table, "orc", MySaveMode.OverWriteByDt, 1)
}
}.invoke(dt, dt1)
} catch {
case e: org.apache.spark.sql.AnalysisException => {
println("exception1:" + e)
}
case e: Exception => println("exception:" + e)
}
}
})
}
}
柚子快報邀請碼778899分享:spark Hive小文件處理
好文鏈接
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。