Spark & Hive Small File Handling

MR jobs

Reference settings for MR jobs:

set hive.exec.reducers.max = 3;

set hive.exec.dynamic.partition = true;             -- enable dynamic partitioning; set to true when dynamic partitions are used.
set hive.exec.dynamic.partition.mode = nonstrict;   -- dynamic partition mode. Default: strict, which requires at least one static partition; nonstrict allows every partition column to be dynamic. Usually set to nonstrict.
set hive.exec.max.dynamic.partitions.pernode = 10;  -- maximum number of dynamic partitions that may be created on each MR node. Default: 100.
set hive.exec.max.dynamic.partitions = 1000;        -- maximum total number of dynamic partitions across all MR nodes. Default: 1000.
set hive.exec.max.created.files = 100000;           -- maximum number of HDFS files the whole MR job may create. Default: 100000.
set hive.error.on.empty.partition = false;          -- whether to throw an exception when an empty partition is produced. Default: false.
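When the rewrite runs through Spark rather than the Hive CLI, the same properties can be applied per session. A minimal sketch, runnable in spark-shell (where a Hive-enabled SparkSession named spark is predefined); the values shown are illustrative and should follow the table's real partition counts:

// Apply the dynamic-partition settings for the current session only.
spark.sql("set hive.exec.dynamic.partition = true")
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions.pernode = 100")
spark.sql("set hive.exec.max.dynamic.partitions = 1000")
spark.sql("set hive.exec.max.created.files = 100000")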

Why Hive produces large numbers of small files: first, the input files themselves may already be numerous and small; second, dynamic partitioning can create a large number of partitions, and therefore many small files and many mappers; third, a high reducer count, since a Hive SQL job writes exactly as many output files as it has reducers.

Impact of small files: the number and size of input files determine the number of mapper tasks; the more small files, the more mappers, and each mapper starts its own JVM, so task initialization and execution consume a lot of resources and seriously hurt performance. Each file also occupies roughly 150 bytes of NameNode memory, so a large number of small files degrades the NameNode as well. To mitigate the problem: avoid dynamic partitioning when the number of partitions cannot be predicted; if it must be used, add DISTRIBUTE BY on the partition column so rows are hashed and all rows of the same partition go to the same reducer (see the sketch below); reduce the number of reducers; and tune the parameters listed above.
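A minimal sketch of the DISTRIBUTE BY approach, again assuming a spark-shell session with Hive support; the table and column names are placeholders, not names from this post:

// All rows with the same dt are hashed to the same reduce task, so each dt
// partition ends up as a small number of output files instead of many tiny ones.
spark.sql(
  """
    |insert overwrite table tgt_db.tgt_table partition (dt)
    |select col_a, col_b, dt
    |from src_db.src_table
    |distribute by dt
    |""".stripMargin)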

HDFS file counts

Directories, files, and total size under a given path (the three columns printed by hdfs dfs -count: DIR_COUNT, FILE_COUNT, CONTENT_SIZE):

[root@mz-hadoop-01 ~]# hdfs dfs -count /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed

568 7433 6065483664 /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed

[root@mz-hadoop-01 ~]# hdfs dfs -count -h /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed

568 7.3 K 5.6 G /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
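The same numbers can be read from code through the Hadoop FileSystem API. A minimal sketch (assumptions: core-site.xml/hdfs-site.xml are on the classpath so fs.defaultFS resolves, and the path is the warehouse directory from the example above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsCountExample {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path("/user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed")
    // getContentSummary returns the same triple that `hdfs dfs -count` prints:
    // directory count, file count and total content size in bytes.
    val summary = fs.getContentSummary(path)
    println(s"dirs=${summary.getDirectoryCount} files=${summary.getFileCount} bytes=${summary.getLength}")
  }
}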

Hive file counts

The metastore records a per-partition file count (PARTITION_PARAMS rows with PARAM_KEY = 'numFiles'), so the queries below run against the metastore database rather than against Hive itself.

File count for a single table (tbl_id 97387 in this example):

SELECT TBL_ID, SUM(PARAM_VALUE) AS file_cnts
FROM (SELECT * FROM PARTITIONS WHERE TBL_ID = '97387') a
LEFT JOIN (SELECT * FROM PARTITION_PARAMS WHERE PARAM_KEY = 'numFiles') b
  ON a.PART_ID = b.PART_ID
GROUP BY TBL_ID
ORDER BY file_cnts DESC;

TBL_ID  file_cnts
------  ---------
97387        2082

Total file count (all partitioned tables)

SELECT SUM(PARAM_VALUE) AS file_cnts
FROM (SELECT * FROM PARTITIONS) a
LEFT JOIN (SELECT * FROM PARTITION_PARAMS WHERE PARAM_KEY = 'numFiles') b
  ON a.PART_ID = b.PART_ID;

file_cnts
---------
   340323
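To run the same check from code rather than a MySQL client, a minimal Scala/JDBC sketch of the total-count query above; the connection URL and credentials are placeholders, and the MySQL JDBC driver must be on the classpath:

import java.sql.DriverManager

object MetastoreFileCountExample {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details for the metastore MySQL database.
    val conn = DriverManager.getConnection("jdbc:mysql://metastore-host:3306/hive", "hive_user", "hive_password")
    try {
      val rs = conn.createStatement().executeQuery(
        """
          |SELECT SUM(b.PARAM_VALUE) AS file_cnts
          |FROM PARTITIONS a
          |LEFT JOIN (SELECT * FROM PARTITION_PARAMS WHERE PARAM_KEY = 'numFiles') b
          |  ON a.PART_ID = b.PART_ID
          |""".stripMargin)
      // Only partitioned tables are covered here; numFiles for non-partitioned
      // tables is stored in TABLE_PARAMS instead.
      while (rs.next()) println(s"total files: ${rs.getString("file_cnts")}")
    } finally {
      conn.close()
    }
  }
}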

Top tables by file count

SELECT e.*, f.*
FROM (
    SELECT c.*, d.DB_ID, d.TBL_NAME
    FROM (
        SELECT TBL_ID, SUM(PARAM_VALUE) AS file_cnts
        FROM (SELECT * FROM PARTITIONS) a
        LEFT JOIN (SELECT * FROM PARTITION_PARAMS WHERE PARAM_KEY = 'numFiles') b
          ON a.PART_ID = b.PART_ID
        GROUP BY TBL_ID
    ) c
    LEFT JOIN (SELECT * FROM TBLS) d
      ON c.TBL_ID = d.TBL_ID
) e
LEFT JOIN (
    SELECT DB_ID AS db_id2, `DESC`, DB_LOCATION_URI, NAME AS db_name, OWNER_NAME, OWNER_TYPE
    FROM DBS
) f
  ON e.DB_ID = f.db_id2
ORDER BY e.file_cnts DESC;

Top databases by file count

SELECT db_id, db_name, DB_LOCATION_URI, SUM(file_cnts) AS file_cnts
FROM (
    SELECT e.*, f.*
    FROM (
        SELECT c.*, d.DB_ID, d.TBL_NAME
        FROM (
            SELECT TBL_ID, SUM(PARAM_VALUE) AS file_cnts
            FROM (SELECT * FROM PARTITIONS) a
            LEFT JOIN (SELECT * FROM PARTITION_PARAMS WHERE PARAM_KEY = 'numFiles') b
              ON a.PART_ID = b.PART_ID
            GROUP BY TBL_ID
        ) c
        LEFT JOIN (SELECT * FROM TBLS) d
          ON c.TBL_ID = d.TBL_ID
    ) e
    LEFT JOIN (
        SELECT DB_ID AS db_id2, `DESC`, DB_LOCATION_URI, NAME AS db_name, OWNER_NAME, OWNER_TYPE
        FROM DBS
    ) f
      ON e.DB_ID = f.db_id2
) g
GROUP BY db_id, db_name, DB_LOCATION_URI
ORDER BY file_cnts DESC;

Small-file compaction job (Spark)

package com.mingzhi.common.universal

import com.mingzhi.common.interf.{IDate, MySaveMode}
import com.mingzhi.common.utils.{HiveUtil, SinkUtil, SparkUtils, TableUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * Compacts tables whose only partition column is dt.
 */
object table_compress_process {

  private var hive_dbs: String = "paascloud"
  private var hive_tables: String = "dwd_order_info_abi"
  private var dt: String = "2023-06-30"
  private var dt1: String = "2023-06-30"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val builder = SparkUtils.getBuilder

    if (System.getProperties.getProperty("os.name").contains("Windows")) {
      // Local debugging on Windows uses the defaults above.
      builder.master("local[*]")
    } else {
      // On the cluster the databases, tables and date range come from the CLI arguments.
      hive_dbs = args(0)
      hive_tables = args(1)
      dt = args(2)
      dt1 = args(3)
    }

    val spark: SparkSession = builder.appName("clean_process").getOrCreate()
    HiveUtil.openDynamicPartition(spark)
    // A single shuffle partition keeps the rewritten output down to as few files as possible.
    spark.sql("set spark.sql.shuffle.partitions=1")

    if ("all".equalsIgnoreCase(hive_dbs)) {
      // Expand "all" into a comma-separated list of every database.
      val builder = new StringBuilder()
      val frame_db = spark.sql("show databases").select("databaseName")
      frame_db.show(false)
      frame_db.collect().foreach(db => {
        builder.append(db.toString().replace("[", "").replace("]", ","))
      })
      println("dbs:" + builder.toString())
      hive_dbs = builder.toString()
    }

    hive_dbs.split(",").foreach(db => {
      if (StringUtils.isNotBlank(db)) {
        if ("all".equalsIgnoreCase(hive_tables)) {
          compress_all_table(spark, db)
        } else {
          hive_tables.split(",").foreach(t => {
            compress_the_table(spark, db, t)
          })
        }
      }
    })

    spark.stop()
  }

  private def compress_the_table(spark: SparkSession, hive_db: String, table: String): Unit = {
    println("compress_the_table======>:" + hive_db + "." + table)
    spark.sql(s"use $hive_db")
    if (TableUtils.tableExists(spark, hive_db, table)) {
      try {
        new IDate {
          override def onDate(dt: String): Unit = {
            /**
             * Tip: for an RDD that needs to be checkpointed, call persist(StorageLevel.DISK_ONLY) first.
             */
            val f1 = spark.sql(
              s"""
                 |select * from $hive_db.$table where dt='$dt'
                 |""".stripMargin)
              .persist(StorageLevel.MEMORY_ONLY)
            val r_ck: (DataFrame, String) = SparkUtils.persistDataFrame(spark, f1)
            val f2 = r_ck._1
            println("f2 show===>")
            f2.show(false)

            val type_ = TableUtils.getCompressType(spark, hive_db, table)
            if ("HiveFileFormat".equalsIgnoreCase(type_)) {
              println("sink HiveFileFormat table:" + table)
              SinkUtil.sink_to_hive_HiveFileFormat(spark, f2, hive_db, table, null)
            } else {
              // Table written by Spark: overwrite the dt partition with a single output file.
              SinkUtil.sink_to_hive(dt, spark, f2, hive_db, table, type_, MySaveMode.OverWriteByDt, 1)
            }
            // Drop the temporary table created by persistDataFrame.
            spark.sql(s"drop table ${r_ck._2} ")
          }
        }.invoke(dt, dt1)
      } catch {
        case e: org.apache.spark.sql.AnalysisException => println("exception1:" + e)
        case e: Exception => println("exception:" + e)
      }
    }
  }

  private def compress_all_table(spark: SparkSession, hive_db: String): Unit = {
    spark.sql(s"use $hive_db")
    val frame_table = spark.sql(s"show tables")
    frame_table.show(100, false)
    frame_table.printSchema()
    frame_table
      .filter(r => {
        !r.getAs[Boolean]("isTemporary")
      })
      .select("tableName").collect().foreach(r => {
        // r looks like [ads_order_topn]
        val table = r.toString().replace("[", "").replace("]", "")
        println("compress table:" + hive_db + "." + table)
        if (TableUtils.tableExists(spark, hive_db, table)) {
          try {
            new IDate {
              override def onDate(dt: String): Unit = {
                val f1 = spark.sql(
                  s"""
                     |select * from $hive_db.$table where dt='$dt'
                     |""".stripMargin)
                SinkUtil.sink_to_hive(dt, spark, f1, hive_db, table, "orc", MySaveMode.OverWriteByDt, 1)
              }
            }.invoke(dt, dt1)
          } catch {
            case e: org.apache.spark.sql.AnalysisException => println("exception1:" + e)
            case e: Exception => println("exception:" + e)
          }
        }
      })
  }
}
