柚子快報激活碼778899分享:分布式 Kafka詳解
柚子快報激活碼778899分享:分布式 Kafka詳解
一、Kafka介紹
Kafka包括producer、consumer、broker、topic、Partition、Group
1、Producer
生產(chǎn)者即數(shù)據(jù)的發(fā)布者,該角色將消息發(fā)布到Kafka的topic中。broker接收到生產(chǎn)者發(fā)送的消息后, broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中。生產(chǎn)者發(fā)送的消息,存儲到一個partition 中,生產(chǎn)者也可以指定數(shù)據(jù)存儲的partition。
2、Consumer
消費(fèi)者可以從broker中讀取數(shù)據(jù)。消費(fèi)者可以消費(fèi)多個topic中的數(shù)據(jù)。
3、Topic
在Kafka中,使用一個類別屬性來劃分?jǐn)?shù)據(jù)的所屬類,劃分?jǐn)?shù)據(jù)的這個類稱為topic。如果把Kafka看做 為一個數(shù)據(jù)庫,topic可以理解為數(shù)據(jù)庫中的一張表,topic的名字即為表名。
4、Partition
一個Topic下面會有多個Partition(分區(qū)),每個Partition都是一個有序隊列,Partition中的每條消息都會被分配一個有序的id。每個topic至少有一個partition。partition中的數(shù)據(jù)是有序的,partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序。如果 topic有多個partition,消費(fèi)數(shù)據(jù)時就不能保證數(shù)據(jù)的順序。在需要嚴(yán)格保證消息的消費(fèi)順序的場景下, 需要將partition數(shù)目設(shè)為1。
5、Partition o?set
每條消息都有一個當(dāng)前Partition下唯一的64字節(jié)的o?set,它指明了這條消息的起始位置。
6、Replicas of partition
副本是一個分區(qū)的備份。副本不會被消費(fèi)者消費(fèi),副本只用于防止數(shù)據(jù)丟失,即消費(fèi)者不從為follower 的partition中消費(fèi)數(shù)據(jù),而是從為leader的partition中讀取數(shù)據(jù)。副本之間是一主多從的關(guān)系。
7、Broker
Kafka 集群包含一個或多個服務(wù)器,服務(wù)器節(jié)點(diǎn)稱為broker。broker存儲topic的數(shù)據(jù)。如果某topic有 N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。如果某topic有N個 partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個 broker不存儲該topic的partition數(shù)據(jù)。如果某topic有N個partition,集群中broker數(shù)目少于N個,那么?一個broker存儲該topic的一個或多個partition。在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種 情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。
8、Leader
每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當(dāng)前負(fù)責(zé)數(shù)據(jù)的讀寫的 partition。
二、Kafka架構(gòu)組件
1)topic:消息存放的目錄即主題
2)Producer:生產(chǎn)消息到topic的一方
3)Consumer:訂閱topic消費(fèi)消息的一方
4)Broker:Kafka的服務(wù)實例就是一個broker
三、Kafka特點(diǎn)
Kafka:內(nèi)存、磁盤、數(shù)據(jù)庫、支持大量堆積Kafka:支持負(fù)載均衡集群方式,天然的‘Leader-Slave’無狀態(tài)集群,每臺服務(wù)器既是Master也是Slave。
四、Kafka配置
在kafka解壓目錄下下有一個config的文件夾,里面放置的是我們的配置文件
1、consumer.properites 消費(fèi)者配置
2、producer.properties 生產(chǎn)者配置?
3、server.properties kafka服務(wù)器的配置?
1)broker.id 申明當(dāng)前kafka服務(wù)器在集群中的唯一ID,需配置為integer,并且集群中的每一個kafka服務(wù)器的id都應(yīng)是唯一的,我們這里采用默認(rèn)配置即可
2)listeners 申明此kafka服務(wù)器需要監(jiān)聽的端口號,如果是在本機(jī)上跑虛擬機(jī)運(yùn)行可以不用配置本項,默認(rèn)會使用localhost的地址,如果是在遠(yuǎn)程服務(wù)器上運(yùn)行則必須配置。
????????例如:listeners=PLAINTEXT:// 192.168.180.128:9092。并確保服務(wù)器的9092端口能夠訪問
3)zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由于本次使用的是kafka高版本中自帶zookeeper,使用默認(rèn)配置即可zookeeper.connect=localhost:2181
??
五、常用配置項?
1、broker配置
配置項 作用 broker.id broker的唯一標(biāo)識 auto.create.topics.auto 設(shè)置成true,就是遇到?jīng)]有的topic自動創(chuàng)建topic。 log.dirs log的目錄數(shù),目錄里面放partition,當(dāng)生成新的partition時,會挑目錄里partition數(shù)最少的目錄放。
2、topic配置?
配置項 作用 num.partitions 新建一個topic,會有幾個partition。 log.retention.ms 對應(yīng)的還有minutes,hours的單位。日志保留時間,因為刪除是文件維度而不是消息維度,看的是日志文件的mtime。 log.retention.bytes partion最大的容量,超過就清理老的。注意這個是partion維度,就是說如果你的topic有8個partition,配置1G,那么平均分配下,topic理論最大值8G。 log.segment.bytes 一個segment的大小。超過了就滾動。 log.segment.ms 一個segment的打開時間,超過了就滾動。 message.max.bytes message最大多大
六、啟動
1、啟動ZooKeeper
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2、啟動Kafka
.\kafka-server-start.bat ..\..\config\server.properties?
?七、linux環(huán)境下創(chuàng)建topic
[root@iZ2zegzlkedbo3e64vkbefZ ~]# cd /usr/local/kafka-cluster/kafka1/bin/
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
Created topic topic1.
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
Created topic topic2.
當(dāng)然我們也可以不手動創(chuàng)建topic,在執(zhí)行代碼kafkaTemplate.send("topic1", normalMessage)發(fā)送消息時,kafka會幫我們自動完成topic的創(chuàng)建工作,但這種情況下創(chuàng)建的topic默認(rèn)只有一個分區(qū),分區(qū)也沒有副本。
所以,我們可以在項目中新建一個配置類專門用來初始化topic,如下,
@Configuration
public class KafkaInitialConfiguration {
// 創(chuàng)建一個名為testtopic的Topic并設(shè)置分區(qū)數(shù)為8,分區(qū)副本數(shù)為2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 1 );
}
?
// 如果要修改分區(qū)數(shù),只需修改配置值重啟項目即可
// 修改分區(qū)數(shù)并不會導(dǎo)致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
}
八、Kafka API AdminClient的使用
1、創(chuàng)建kafka隊列
2、修改kafka分區(qū)數(shù)?
?3、查詢所有的topic
4、查詢topic是否存在
5、查詢Topic的配置信息
6、獲取指定topic的分區(qū)數(shù)?
?
?九、Springboot集成kafka
1、springboot引入kafka依賴
2、application.propertise配置
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生產(chǎn)者配置】###########
# 重試次數(shù)
spring.kafka.producer.retries=0
# 應(yīng)答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延時
spring.kafka.producer.properties.linger.ms=0
# 當(dāng)生產(chǎn)端積累的消息達(dá)到batch-size或接收到消息linger.ms后,生產(chǎn)者就會將消息提交給kafka
# linger.ms為0表示每接收到一條消息就提交給kafka,這時候batch-size其實就沒用了
?
# 生產(chǎn)端緩沖區(qū)大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定義分區(qū)器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
?
###########【初始化消費(fèi)者配置】###########
# 默認(rèn)的消費(fèi)組ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自動提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延時(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 當(dāng)kafka中沒有初始o(jì)ffset或offset超出范圍時將自動重置offset
# earliest:重置為分區(qū)中最小的offset;
# latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù));
# none:只要有一個分區(qū)不存在已提交的offset,就拋出異常;
spring.kafka.consumer.auto-offset-reset=latest
# 消費(fèi)會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消費(fèi)請求超時時間
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化類
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消費(fèi)端監(jiān)聽的topic不存在時,項目啟動會報錯(關(guān)掉)
spring.kafka.listener.missing-topics-fatal=false
# 設(shè)置批量消費(fèi)
# spring.kafka.listener.type=batch
# 批量消費(fèi)每次最多消費(fèi)多少條消息
# spring.kafka.consumer.max-poll-records=50
3、Kafka簡單操作
1)生產(chǎn)者
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {
@Autowired
private KafkaTemplate
?
// 發(fā)送消息
@GetMapping("/normal")
public void sendMessage1() {
kafkaTemplate.send("topic1", normalMessage);
}
}
2)消費(fèi)者
@Component
public class KafkaConsumer {
// 消費(fèi)監(jiān)聽
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord, ?> record){
// 消費(fèi)的哪個topic、partition的消息,打印出消息內(nèi)容
System.out.println("簡單消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
消息確認(rèn)機(jī)制: 為確保消息被成功處理,可以使用消息確認(rèn)機(jī)制。例如,在消費(fèi)者中手動確認(rèn)消息:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupName = "my-group")
public void consume(String message) {
System.out.println("Consumed: " + message);
// 手動確認(rèn)消息已處理完成。
kafkaTemplate.acknowledge(Collections.singletonList(message)); // 如果是手動確認(rèn)模式。
}
}
3)帶回調(diào)的生產(chǎn)者
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息發(fā)送到的topic
String topic = success.getRecordMetadata().topic();
// 消息發(fā)送到的分區(qū)
int partition = success.getRecordMetadata().partition();
// 消息在分區(qū)內(nèi)的offset
long offset = success.getRecordMetadata().offset();
System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("發(fā)送消息失敗:" + failure.getMessage());
});
}
4)指定topic、partition、offset消費(fèi)
/**
* @Title 指定topic、partition、offset消費(fèi)
* @Description 同時監(jiān)聽topic1和topic2,監(jiān)聽topic1的0號分區(qū)、topic2的 "0號和1號" 分區(qū),指向1號分區(qū)的offset初始值為8
* @Author long.yuan
* @Date 2020/3/22 13:38
* @Param [record]
* @return void
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "1" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage1(ConsumerRecord, ?> record){
// 消費(fèi)的哪個topic、partition的消息,打印出消息內(nèi)容
System.out.println("簡單消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()+"-"+record.offset());
}
5)消息過濾器
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
?
// 消息過濾器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被過濾的消息將被丟棄
factory.setAckDiscarded(true);
// 消息過濾策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息則被過濾
return true;
});
return factory;
}
?
// 消息過濾監(jiān)聽
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord, ?> record) {
System.out.println(record.value());
}
}
6)消息轉(zhuǎn)發(fā)
從topic1接收到的消息經(jīng)過處理后轉(zhuǎn)發(fā)到topic2
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord, ?> record) {
return record.value()+"-forward message";
}
十、Kafka保證消息不丟失
?Kafka 要保證消息不會丟失,需要在producer、broker、consumer共同保證消息不丟失
1、producer生產(chǎn)者配置
1)producer端使用producer.send(msg,callback) 帶有回調(diào) send 的方法,而不是producer.send(msg)方法,根據(jù)callback 回調(diào),一旦消息提交失敗,就可以針對性的補(bǔ)償處理。 2)設(shè)置ack=all,表面所有的broker上的副本都已經(jīng)落盤成功了,才算是“已提交” 3)retries >1自動重試的次數(shù),當(dāng)出現(xiàn)網(wǎng)絡(luò)問題時,消息可能會發(fā)送失敗,配置了retries 能夠自動重試,盡量避免消息丟失。最嚴(yán)謹(jǐn)?shù)姆绞绞鞘〉南罩居涗浕蛘呷霂?,然后定時重發(fā)。
2、Broker配置
1)unclean.leader.election.enable =false,禁止ISR之外的副本參與選舉,否則就有可能丟丟失消息 2)replication-factor >=3,需要三個以上的副本 3)min.insync.replicas>1,broker端的參數(shù),至少寫入多少個ISR中副本才算是“已提交”,大于1 可以提升消息的持久性,推薦設(shè)置replication-factor=min.insync.replicas+1
3、consumer消費(fèi)者配置
1)確保消息已經(jīng)消費(fèi)完成在提交
2)enable.auto.commit 設(shè)置成false,并自己來處理offset的提交更新
//文件KafkaReceiver中的消息接收,新增Acknowledgment 接收字段
@KafkaListener(id = "rollback_default_test", topics = {"topic.quick.default"})
public void receiveSk(ConsumerRecord
System.out.println(record);
System.out.println("我收到了普通消息");
// 手動確認(rèn)消息被 消費(fèi)
ack.acknowledge();
// ack.nack(1000); 拒收當(dāng)前消息,并睡眠10秒鐘后再重新接收消息
// ack.nack(100,1000); 拒收當(dāng)前消息,并睡眠10秒鐘后接收第100條之后的消息
}
十一、常見消息中間件的介紹和對比
????Kafka、RabbitMQ、RocketMQ常見消息中間件的介紹和對比
1、Kafka
????????Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache頂級項目。Kafka主要為高吞吐量的訂閱發(fā)布系統(tǒng)而設(shè)計,追求速度與持久化。kafka中的消息由鍵、值、時間戳組成,kafka不記錄每個消息被誰使用,只通過偏移量記錄哪些消息是未讀的,kafka中可以指定消費(fèi)組來實現(xiàn)訂閱發(fā)布的功能。?
2、RabbitMQ
????????RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn)。AMQP的主要特征是面向消息、隊列、路由(包括點(diǎn)對點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。?
3、RocketMQ
????????RocketMQ是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計算、消息推送、日志流式處理、binglog分發(fā)等場景。支持的客戶端語言不多,目前是Java及C++,其中C++還不成熟;
?4、對比概覽
1、Rabbitmq比kafka可靠,kafka更適合IO高吞吐的處理,比如ELK日志收集。
2、kafka具有高的吞吐量,內(nèi)部采用消息的批量處理,zero-copy機(jī)制,數(shù)據(jù)的存儲和獲取是本地磁盤順序批量操作,具有O(1)的復(fù)雜度,消息處理的效率很高。rabbitMQ在吞吐量方面稍遜于kafka,他們的出發(fā)點(diǎn)不一樣,rabbitMQ支持對消息的可靠的傳遞,支持事務(wù),不支持批量的操作;基于存儲的可靠性的要求存儲可以采用內(nèi)存或者硬盤。
十二、docker啟動kafka
1、拉取zookeeper鏡像
docker pull?bitnami/zookeeper
2、拉取kafka鏡像
?docker pull?bitnami/kafka
3、docker-compose.yml?
version: "3"
services:
zookeeper:
image: bitnami/zookeeper:latest
container_name: zookeeper
# user: root
restart: always
ports:
- 2181:2181
environment:
# 匿名登錄--必須開啟
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- ./zookeeper:/bitnami/zookeeper
# 該鏡像具體配置參考 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
kafka:
image: bitnami/kafka:latest
container_name: kafka
restart: always
hostname: kafka
# user: root
ports:
- 9092:9092
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
# 客戶端訪問地址,更換成自己的
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.4.252:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# 允許使用PLAINTEXT協(xié)議(鏡像中默認(rèn)為關(guān)閉,需要手動開啟)
- ALLOW_PLAINTEXT_LISTENER=yes
# 關(guān)閉自動創(chuàng)建 topic 功能
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# 全局消息過期時間 6 小時(測試時可以設(shè)置短一點(diǎn))
- KAFKA_CFG_LOG_RETENTION_HOURS=6
# 開啟JMX監(jiān)控
# - JMX_PORT=9999
volumes:
- ./kafka:/bitnami/kafka
depends_on:
- zookeeper
# Web 管理界面 另外也可以用exporter+prometheus+grafana的方式來監(jiān)控 https://github.com/danielqsj/kafka_exporter
# kafdrop:
# image: obsidiandynamics/kafdrop:latest
# ports:
# - 9000:9000
# restart: always
# extra_hosts:
# - kafka1:192.168.4.252
# environment:
# KAFKA_BROKERCONNECT: "kafka:9092"
# depends_on:
# - zookeeper
# - kafka
# container_name: kafdrop
# cpus: '1'
# mem_limit: 1024m
柚子快報激活碼778899分享:分布式 Kafka詳解
好文閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。