欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:kafka發(fā)送大消息

柚子快報激活碼778899分享:kafka發(fā)送大消息

http://yzkb.51969.com/

1 kafka消息壓縮

kafka關(guān)于消息壓縮的定義(來源于官網(wǎng)):

????????此為 Kafka 中端到端的塊壓縮功能。如果啟用,數(shù)據(jù)將由 producer 壓縮,以壓縮格式寫入服務(wù)器,并由 consumer 解壓縮。壓縮將提高 consumer 的吞吐量,但需付出一定的解壓成本。

????????壓縮就是用時間換空間,其基本理念是基于重復,將重復的片段編碼為字典,字典的 key 為重復片段,value 為更短的代碼,比如序列號,然后將原始內(nèi)容中的片段用代碼表示,達到縮短內(nèi)容的效果,壓縮后的內(nèi)容則由字典和代碼序列兩部分組成。解壓時根據(jù)字典和代碼序列可無損地還原為原始內(nèi)容。通常來講,重復越多,壓縮效果越好。比如 JSON 是 Kafka 消息中常用的序列化格式,單條消息內(nèi)可能并沒有多少重復片段,但如果是批量消息,則會有大量重復的字段名,批量中消息越多,則重復越多,這也是為什么 Kafka 更偏向塊壓縮,而不是單條消息壓縮。

2 kafka的消息壓縮類型對比

????????目前 Kafka 共支持四種主要的壓縮類型:Gzip、Snappy、Lz4 和 Zstd。關(guān)于這幾種壓縮的特性。

壓縮類型壓縮率CPU 使用率壓縮速度帶寬使用率GzipHighestHighestSlowestLowestSnappyMediumModerateModerateMediumLz4LowLowestFastestHighestZstdMediumModerateModerateMedium

????????從上表可知,Snappy 在 CPU 使用率、壓縮比、壓縮速度和網(wǎng)絡(luò)帶寬使用率之間實現(xiàn)良好的平衡,我們最終也是采用的該類型進行壓縮試點。這里值得一提的是,Zstd 是 Facebook 于 2016 年開源的新壓縮算法,壓縮率和壓縮性能都不錯,具有與 Snappy(Google 杰作)相似的特性,直到 Kafka 的 2.1.0 版本才引入支持。

3 何時需要壓縮

????????壓縮是需要額外的 CPU 代價的,并且會帶來一定的消息分發(fā)延遲,因而在壓縮前要慎重考慮是否有必要。

壓縮帶來的磁盤空間和帶寬節(jié)省遠大于額外的 CPU 代價,這樣的壓縮是值得的。數(shù)據(jù)量足夠大且具重復性。消息壓縮是批量的,低頻的數(shù)據(jù)流可能都無法填滿一個批量,會影響壓縮比。數(shù)據(jù)重復性越高,往往壓縮效果越好,例如 JSON、XML 等結(jié)構(gòu)化數(shù)據(jù);但若數(shù)據(jù)不具重復性,例如文本都是唯一的 md5 或 UUID 之類,違背了壓縮的重復性前提,壓縮效果可能不會理想。系統(tǒng)對消息分發(fā)的延遲沒有嚴苛要求,可容忍輕微的延遲增長。

4 如何開啟壓縮

????????Kafka 通過配置屬性?compression.type?控制是否壓縮。該屬性在 producer 端和 broker 端各自都有一份,也就是說,我們可以選擇在 producer 或 broker 端開啟壓縮,對應(yīng)的應(yīng)用場景各有不同。目前沒有嘗試在broker段開啟壓縮。

4.1 在broker端開啟解壓縮

????????Broker 端的?compression.type?屬性默認值為?producer,即直接繼承 producer 端所發(fā)來消息的壓縮方式,無論消息采用何種壓縮或者不壓縮,broker 都原樣存儲。、

4.1.1?broker 和 topic 兩個級別

????????在 broker 端的壓縮配置分為兩個級別:全局的 broker 級別 和 局部的 topic 級別。顧名思義,如果配置的是 broker 級別,則對于該 Kafka 集群中所有的 topic 都是生效的。但如果 topic 級別配置了自己的壓縮類型,則會覆蓋 broker 全局的配置,以 topic 自己配置的為準。

broker級別:要配置 broker 級別的壓縮類型,可通過?configs?命令修改? ?compression.type? 配置項取值。此處要使修改生效,是否需要重啟 broker 取決于 Kafak 的版本,在 1.1.0 之前,任何配置項的改動都需要重啟 broker 才生效,而從 1.1.0 版本開始,Kafka 引入了動態(tài) broker 參數(shù),將配置項分為三類:read-only、per-broker?和?cluster-wide,第一類跟原來一樣需重啟才生效,而后面兩類都是動態(tài)生效的,只是影響范圍不同,關(guān)于 Kafka 動態(tài)參數(shù),以后單開博文介紹。從?官網(wǎng)?可以看到,compression.type?是屬于?cluster-wide?的,如果是 1.1.0 及之后的版本,則無需重啟 broker。

topic級別:topic 的配置分為兩部分,一部分是 topic 特有的,如 partitions 等,另一部分則是默認采用 broker 配置,但也可以覆蓋。如果要定義 topic 級別的壓縮,可以在 topic 創(chuàng)建時通過 --config 選項覆蓋配置項?compression.type?的取值,命令如下:

sh bin/kafka-topics.sh \

--create \

--topic my-topic \

--replication-factor 1 \

--partitions 1 \

--config compression.type=snappy

也可以通過?configs?命令修改 topic 的?compression.type?取值,命令如下:

bin/kafka-configs.sh \

--entity-type topics \

--entity-name my-topic \

--alter \

--add-config compression.type=snappy

4.2?在 Producer 端壓縮

????????跟 broker 端一樣,producer 端的壓縮配置屬性依然是?compression.type,只不過默認值和可選值有所不同。默認值為?none,表示不壓縮。直接在代碼層面更改 producer 的 config。但需要注意的是,改完 config 之后,需要重啟 producer 端的應(yīng)用程序,壓縮才會生效。

代碼示例如下:

public class KafkaProducerTest {

public static void main(String[] args) {

String brokerList = "127.0.0.1:9092";

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"2097245");

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");

KafkaProducer producer = new KafkaProducer<>(properties);

String topic = "mytestTopic1";

int sizeInMb = 2; // 設(shè)置字符串大小為2MB

int sizeInBytes = sizeInMb * 1024 * 1024; // 轉(zhuǎn)換為字節(jié)數(shù)

StringBuilder largeString = new StringBuilder(sizeInBytes);

largeString.append(":");

for (int i = 0; i < sizeInBytes; i++) {

largeString.append("A"); // 使用大寫字母"A"來構(gòu)建字符串

}

String msg = largeString.toString();

try {

for (int i = 0; i < 100; i++) {

String msg1 = i+msg;

producer.send(new ProducerRecord<>(topic, msg1));

Thread.sleep(500);

}

}catch (Exception e){

e.printStackTrace();

}

}

}

上面示例特意制造了一個大字符串作為消息,測試壓縮,需要注意的是,配置壓縮的時候同時也需要配置消息的最大值。即:max.request.size。

5 解壓縮

可能發(fā)生解壓的地方依然是兩處:consumer 端和 broker 端。

consumer端:consumer 端發(fā)生解壓的唯一條件就是從 broker 端拉取到的消息是帶壓縮的。此時,consumer 會根據(jù)?recordBatch?中?compressionType?來對消息進行解壓。

broker端:broker 端是否發(fā)生解壓取決于 producer 發(fā)過來的批量消息?recordBatch?是否是壓縮的:如果 producer 開啟了壓縮,則會發(fā)生解壓,否則不會。原因簡單說下,在 broker 端持久化消息前,會對消息做各種驗證,此時必然會迭代?recordBatch,而在迭代的過程中,會直接采用?recordBatch?上的?compressionType?對消息字節(jié)流進行處理,是否解壓取決于?compressionType?是否是壓縮類型。關(guān)于這點,可以在? ?LogValidator?的?validateMessagesAndAssignOffsets?方法實現(xiàn)中可以看到,在?convertAndAssignOffsetsNonCompressed、assignOffsetsNonCompressed? 和?validateMessagesAndAssignOffsetsCompressed?三個不同的分支中,都會看到?records.batches.forEach {...}?的身影,而在后面的源碼分析中會發(fā)現(xiàn),在?recordBatch?的迭代器邏輯中,直接采用的?compressionType?的解壓邏輯對消息字節(jié)流讀取的。也就是說,如果?recordBatch?是壓縮的 ,只要對其進行了迭代訪問,則會自動觸發(fā)解壓邏輯。

通俗一點講:producer端配置了壓縮,consumer自動解壓縮。

柚子快報激活碼778899分享:kafka發(fā)送大消息

http://yzkb.51969.com/

好文閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19475359.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄