柚子快報激活碼778899分享:kafka發(fā)送大消息
柚子快報激活碼778899分享:kafka發(fā)送大消息
1 kafka消息壓縮
kafka關(guān)于消息壓縮的定義(來源于官網(wǎng)):
????????此為 Kafka 中端到端的塊壓縮功能。如果啟用,數(shù)據(jù)將由 producer 壓縮,以壓縮格式寫入服務(wù)器,并由 consumer 解壓縮。壓縮將提高 consumer 的吞吐量,但需付出一定的解壓成本。
????????壓縮就是用時間換空間,其基本理念是基于重復,將重復的片段編碼為字典,字典的 key 為重復片段,value 為更短的代碼,比如序列號,然后將原始內(nèi)容中的片段用代碼表示,達到縮短內(nèi)容的效果,壓縮后的內(nèi)容則由字典和代碼序列兩部分組成。解壓時根據(jù)字典和代碼序列可無損地還原為原始內(nèi)容。通常來講,重復越多,壓縮效果越好。比如 JSON 是 Kafka 消息中常用的序列化格式,單條消息內(nèi)可能并沒有多少重復片段,但如果是批量消息,則會有大量重復的字段名,批量中消息越多,則重復越多,這也是為什么 Kafka 更偏向塊壓縮,而不是單條消息壓縮。
2 kafka的消息壓縮類型對比
????????目前 Kafka 共支持四種主要的壓縮類型:Gzip、Snappy、Lz4 和 Zstd。關(guān)于這幾種壓縮的特性。
壓縮類型壓縮率CPU 使用率壓縮速度帶寬使用率GzipHighestHighestSlowestLowestSnappyMediumModerateModerateMediumLz4LowLowestFastestHighestZstdMediumModerateModerateMedium
????????從上表可知,Snappy 在 CPU 使用率、壓縮比、壓縮速度和網(wǎng)絡(luò)帶寬使用率之間實現(xiàn)良好的平衡,我們最終也是采用的該類型進行壓縮試點。這里值得一提的是,Zstd 是 Facebook 于 2016 年開源的新壓縮算法,壓縮率和壓縮性能都不錯,具有與 Snappy(Google 杰作)相似的特性,直到 Kafka 的 2.1.0 版本才引入支持。
3 何時需要壓縮
????????壓縮是需要額外的 CPU 代價的,并且會帶來一定的消息分發(fā)延遲,因而在壓縮前要慎重考慮是否有必要。
壓縮帶來的磁盤空間和帶寬節(jié)省遠大于額外的 CPU 代價,這樣的壓縮是值得的。數(shù)據(jù)量足夠大且具重復性。消息壓縮是批量的,低頻的數(shù)據(jù)流可能都無法填滿一個批量,會影響壓縮比。數(shù)據(jù)重復性越高,往往壓縮效果越好,例如 JSON、XML 等結(jié)構(gòu)化數(shù)據(jù);但若數(shù)據(jù)不具重復性,例如文本都是唯一的 md5 或 UUID 之類,違背了壓縮的重復性前提,壓縮效果可能不會理想。系統(tǒng)對消息分發(fā)的延遲沒有嚴苛要求,可容忍輕微的延遲增長。
4 如何開啟壓縮
????????Kafka 通過配置屬性?compression.type?控制是否壓縮。該屬性在 producer 端和 broker 端各自都有一份,也就是說,我們可以選擇在 producer 或 broker 端開啟壓縮,對應(yīng)的應(yīng)用場景各有不同。目前沒有嘗試在broker段開啟壓縮。
4.1 在broker端開啟解壓縮
????????Broker 端的?compression.type?屬性默認值為?producer,即直接繼承 producer 端所發(fā)來消息的壓縮方式,無論消息采用何種壓縮或者不壓縮,broker 都原樣存儲。、
4.1.1?broker 和 topic 兩個級別
????????在 broker 端的壓縮配置分為兩個級別:全局的 broker 級別 和 局部的 topic 級別。顧名思義,如果配置的是 broker 級別,則對于該 Kafka 集群中所有的 topic 都是生效的。但如果 topic 級別配置了自己的壓縮類型,則會覆蓋 broker 全局的配置,以 topic 自己配置的為準。
broker級別:要配置 broker 級別的壓縮類型,可通過?configs?命令修改? ?compression.type? 配置項取值。此處要使修改生效,是否需要重啟 broker 取決于 Kafak 的版本,在 1.1.0 之前,任何配置項的改動都需要重啟 broker 才生效,而從 1.1.0 版本開始,Kafka 引入了動態(tài) broker 參數(shù),將配置項分為三類:read-only、per-broker?和?cluster-wide,第一類跟原來一樣需重啟才生效,而后面兩類都是動態(tài)生效的,只是影響范圍不同,關(guān)于 Kafka 動態(tài)參數(shù),以后單開博文介紹。從?官網(wǎng)?可以看到,compression.type?是屬于?cluster-wide?的,如果是 1.1.0 及之后的版本,則無需重啟 broker。
topic級別:topic 的配置分為兩部分,一部分是 topic 特有的,如 partitions 等,另一部分則是默認采用 broker 配置,但也可以覆蓋。如果要定義 topic 級別的壓縮,可以在 topic 創(chuàng)建時通過 --config 選項覆蓋配置項?compression.type?的取值,命令如下:
sh bin/kafka-topics.sh \
--create \
--topic my-topic \
--replication-factor 1 \
--partitions 1 \
--config compression.type=snappy
也可以通過?configs?命令修改 topic 的?compression.type?取值,命令如下:
bin/kafka-configs.sh \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config compression.type=snappy
4.2?在 Producer 端壓縮
????????跟 broker 端一樣,producer 端的壓縮配置屬性依然是?compression.type,只不過默認值和可選值有所不同。默認值為?none,表示不壓縮。直接在代碼層面更改 producer 的 config。但需要注意的是,改完 config 之后,需要重啟 producer 端的應(yīng)用程序,壓縮才會生效。
代碼示例如下:
public class KafkaProducerTest {
public static void main(String[] args) {
String brokerList = "127.0.0.1:9092";
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"2097245");
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
KafkaProducer
String topic = "mytestTopic1";
int sizeInMb = 2; // 設(shè)置字符串大小為2MB
int sizeInBytes = sizeInMb * 1024 * 1024; // 轉(zhuǎn)換為字節(jié)數(shù)
StringBuilder largeString = new StringBuilder(sizeInBytes);
largeString.append(":");
for (int i = 0; i < sizeInBytes; i++) {
largeString.append("A"); // 使用大寫字母"A"來構(gòu)建字符串
}
String msg = largeString.toString();
try {
for (int i = 0; i < 100; i++) {
String msg1 = i+msg;
producer.send(new ProducerRecord<>(topic, msg1));
Thread.sleep(500);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
上面示例特意制造了一個大字符串作為消息,測試壓縮,需要注意的是,配置壓縮的時候同時也需要配置消息的最大值。即:max.request.size。
5 解壓縮
可能發(fā)生解壓的地方依然是兩處:consumer 端和 broker 端。
consumer端:consumer 端發(fā)生解壓的唯一條件就是從 broker 端拉取到的消息是帶壓縮的。此時,consumer 會根據(jù)?recordBatch?中?compressionType?來對消息進行解壓。
broker端:broker 端是否發(fā)生解壓取決于 producer 發(fā)過來的批量消息?recordBatch?是否是壓縮的:如果 producer 開啟了壓縮,則會發(fā)生解壓,否則不會。原因簡單說下,在 broker 端持久化消息前,會對消息做各種驗證,此時必然會迭代?recordBatch,而在迭代的過程中,會直接采用?recordBatch?上的?compressionType?對消息字節(jié)流進行處理,是否解壓取決于?compressionType?是否是壓縮類型。關(guān)于這點,可以在? ?LogValidator?的?validateMessagesAndAssignOffsets?方法實現(xiàn)中可以看到,在?convertAndAssignOffsetsNonCompressed、assignOffsetsNonCompressed? 和?validateMessagesAndAssignOffsetsCompressed?三個不同的分支中,都會看到?records.batches.forEach {...}?的身影,而在后面的源碼分析中會發(fā)現(xiàn),在?recordBatch?的迭代器邏輯中,直接采用的?compressionType?的解壓邏輯對消息字節(jié)流讀取的。也就是說,如果?recordBatch?是壓縮的 ,只要對其進行了迭代訪問,則會自動觸發(fā)解壓邏輯。
通俗一點講:producer端配置了壓縮,consumer自動解壓縮。
柚子快報激活碼778899分享:kafka發(fā)送大消息
好文閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。