欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:分布式 Kafka詳解

柚子快報激活碼778899分享:分布式 Kafka詳解

http://yzkb.51969.com/

一、Kafka介紹

Kafka包括producer、consumer、broker、topic、Partition、Group

1、Producer

生產(chǎn)者即數(shù)據(jù)的發(fā)布者,該角色將消息發(fā)布到Kafka的topic中。broker接收到生產(chǎn)者發(fā)送的消息后, broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中。生產(chǎn)者發(fā)送的消息,存儲到一個partition 中,生產(chǎn)者也可以指定數(shù)據(jù)存儲的partition。

2、Consumer

消費(fèi)者可以從broker中讀取數(shù)據(jù)。消費(fèi)者可以消費(fèi)多個topic中的數(shù)據(jù)。

3、Topic

在Kafka中,使用一個類別屬性來劃分?jǐn)?shù)據(jù)的所屬類,劃分?jǐn)?shù)據(jù)的這個類稱為topic。如果把Kafka看做 為一個數(shù)據(jù)庫,topic可以理解為數(shù)據(jù)庫中的一張表,topic的名字即為表名。

4、Partition

一個Topic下面會有多個Partition(分區(qū)),每個Partition都是一個有序隊列,Partition中的每條消息都會被分配一個有序的id。每個topic至少有一個partition。partition中的數(shù)據(jù)是有序的,partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序。如果 topic有多個partition,消費(fèi)數(shù)據(jù)時就不能保證數(shù)據(jù)的順序。在需要嚴(yán)格保證消息的消費(fèi)順序的場景下, 需要將partition數(shù)目設(shè)為1。

5、Partition o?set

每條消息都有一個當(dāng)前Partition下唯一的64字節(jié)的o?set,它指明了這條消息的起始位置。

6、Replicas of partition

副本是一個分區(qū)的備份。副本不會被消費(fèi)者消費(fèi),副本只用于防止數(shù)據(jù)丟失,即消費(fèi)者不從為follower 的partition中消費(fèi)數(shù)據(jù),而是從為leader的partition中讀取數(shù)據(jù)。副本之間是一主多從的關(guān)系。

7、Broker

Kafka 集群包含一個或多個服務(wù)器,服務(wù)器節(jié)點(diǎn)稱為broker。broker存儲topic的數(shù)據(jù)。如果某topic有 N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。如果某topic有N個 partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個 broker不存儲該topic的partition數(shù)據(jù)。如果某topic有N個partition,集群中broker數(shù)目少于N個,那么?一個broker存儲該topic的一個或多個partition。在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種 情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。

8、Leader

每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當(dāng)前負(fù)責(zé)數(shù)據(jù)的讀寫的 partition。

二、Kafka架構(gòu)組件

1)topic:消息存放的目錄即主題

2)Producer:生產(chǎn)消息到topic的一方

3)Consumer:訂閱topic消費(fèi)消息的一方

4)Broker:Kafka的服務(wù)實例就是一個broker

三、Kafka特點(diǎn)

Kafka:內(nèi)存、磁盤、數(shù)據(jù)庫、支持大量堆積Kafka:支持負(fù)載均衡集群方式,天然的‘Leader-Slave’無狀態(tài)集群,每臺服務(wù)器既是Master也是Slave。

四、Kafka配置

在kafka解壓目錄下下有一個config的文件夾,里面放置的是我們的配置文件

1、consumer.properites 消費(fèi)者配置

2、producer.properties 生產(chǎn)者配置?

3、server.properties kafka服務(wù)器的配置?

1)broker.id 申明當(dāng)前kafka服務(wù)器在集群中的唯一ID,需配置為integer,并且集群中的每一個kafka服務(wù)器的id都應(yīng)是唯一的,我們這里采用默認(rèn)配置即可

2)listeners 申明此kafka服務(wù)器需要監(jiān)聽的端口號,如果是在本機(jī)上跑虛擬機(jī)運(yùn)行可以不用配置本項,默認(rèn)會使用localhost的地址,如果是在遠(yuǎn)程服務(wù)器上運(yùn)行則必須配置。

????????例如:listeners=PLAINTEXT:// 192.168.180.128:9092。并確保服務(wù)器的9092端口能夠訪問

3)zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由于本次使用的是kafka高版本中自帶zookeeper,使用默認(rèn)配置即可zookeeper.connect=localhost:2181

??

五、常用配置項?

1、broker配置

配置項 作用 broker.id broker的唯一標(biāo)識 auto.create.topics.auto 設(shè)置成true,就是遇到?jīng)]有的topic自動創(chuàng)建topic。 log.dirs log的目錄數(shù),目錄里面放partition,當(dāng)生成新的partition時,會挑目錄里partition數(shù)最少的目錄放。

2、topic配置?

配置項 作用 num.partitions 新建一個topic,會有幾個partition。 log.retention.ms 對應(yīng)的還有minutes,hours的單位。日志保留時間,因為刪除是文件維度而不是消息維度,看的是日志文件的mtime。 log.retention.bytes partion最大的容量,超過就清理老的。注意這個是partion維度,就是說如果你的topic有8個partition,配置1G,那么平均分配下,topic理論最大值8G。 log.segment.bytes 一個segment的大小。超過了就滾動。 log.segment.ms 一個segment的打開時間,超過了就滾動。 message.max.bytes message最大多大

六、啟動

1、啟動ZooKeeper

.\zookeeper-server-start.bat ..\..\config\zookeeper.properties

2、啟動Kafka

.\kafka-server-start.bat ..\..\config\server.properties?

?七、linux環(huán)境下創(chuàng)建topic

[root@iZ2zegzlkedbo3e64vkbefZ ~]# cd /usr/local/kafka-cluster/kafka1/bin/

[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1

Created topic topic1.

[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2

Created topic topic2.

當(dāng)然我們也可以不手動創(chuàng)建topic,在執(zhí)行代碼kafkaTemplate.send("topic1", normalMessage)發(fā)送消息時,kafka會幫我們自動完成topic的創(chuàng)建工作,但這種情況下創(chuàng)建的topic默認(rèn)只有一個分區(qū),分區(qū)也沒有副本。

所以,我們可以在項目中新建一個配置類專門用來初始化topic,如下,

@Configuration

public class KafkaInitialConfiguration {

// 創(chuàng)建一個名為testtopic的Topic并設(shè)置分區(qū)數(shù)為8,分區(qū)副本數(shù)為2

@Bean

public NewTopic initialTopic() {

return new NewTopic("testtopic",8, (short) 1 );

}

?

// 如果要修改分區(qū)數(shù),只需修改配置值重啟項目即可

// 修改分區(qū)數(shù)并不會導(dǎo)致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減小

@Bean

public NewTopic updateTopic() {

return new NewTopic("testtopic",10, (short) 2 );

}

}

八、Kafka API AdminClient的使用

1、創(chuàng)建kafka隊列

2、修改kafka分區(qū)數(shù)?

?3、查詢所有的topic

4、查詢topic是否存在

5、查詢Topic的配置信息

6、獲取指定topic的分區(qū)數(shù)?

?

?九、Springboot集成kafka

1、springboot引入kafka依賴

org.springframework.kafka

spring-kafka

2、application.propertise配置

###########【Kafka集群】###########

spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093

###########【初始化生產(chǎn)者配置】###########

# 重試次數(shù)

spring.kafka.producer.retries=0

# 應(yīng)答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)

spring.kafka.producer.acks=1

# 批量大小

spring.kafka.producer.batch-size=16384

# 提交延時

spring.kafka.producer.properties.linger.ms=0

# 當(dāng)生產(chǎn)端積累的消息達(dá)到batch-size或接收到消息linger.ms后,生產(chǎn)者就會將消息提交給kafka

# linger.ms為0表示每接收到一條消息就提交給kafka,這時候batch-size其實就沒用了

?

# 生產(chǎn)端緩沖區(qū)大小

spring.kafka.producer.buffer-memory = 33554432

# Kafka提供的序列化和反序列化類

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 自定義分區(qū)器

# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

?

###########【初始化消費(fèi)者配置】###########

# 默認(rèn)的消費(fèi)組ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

# 是否自動提交offset

spring.kafka.consumer.enable-auto-commit=true

# 提交offset延時(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

# 當(dāng)kafka中沒有初始o(jì)ffset或offset超出范圍時將自動重置offset

# earliest:重置為分區(qū)中最小的offset;

# latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù));

# none:只要有一個分區(qū)不存在已提交的offset,就拋出異常;

spring.kafka.consumer.auto-offset-reset=latest

# 消費(fèi)會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

# 消費(fèi)請求超時時間

spring.kafka.consumer.properties.request.timeout.ms=180000

# Kafka提供的序列化和反序列化類

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消費(fèi)端監(jiān)聽的topic不存在時,項目啟動會報錯(關(guān)掉)

spring.kafka.listener.missing-topics-fatal=false

# 設(shè)置批量消費(fèi)

# spring.kafka.listener.type=batch

# 批量消費(fèi)每次最多消費(fèi)多少條消息

# spring.kafka.consumer.max-poll-records=50

3、Kafka簡單操作

1)生產(chǎn)者

@RestController

@RequestMapping("/kafka")

public class KafkaProducer {

@Autowired

private KafkaTemplate kafkaTemplate;

?

// 發(fā)送消息

@GetMapping("/normal")

public void sendMessage1() {

kafkaTemplate.send("topic1", normalMessage);

}

}

2)消費(fèi)者

@Component

public class KafkaConsumer {

// 消費(fèi)監(jiān)聽

@KafkaListener(topics = {"topic1"})

public void onMessage1(ConsumerRecord record){

// 消費(fèi)的哪個topic、partition的消息,打印出消息內(nèi)容

System.out.println("簡單消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value());

}

}

消息確認(rèn)機(jī)制: 為確保消息被成功處理,可以使用消息確認(rèn)機(jī)制。例如,在消費(fèi)者中手動確認(rèn)消息:

@Service

public class KafkaConsumer {

@KafkaListener(topics = "my-topic", groupName = "my-group")

public void consume(String message) {

System.out.println("Consumed: " + message);

// 手動確認(rèn)消息已處理完成。

kafkaTemplate.acknowledge(Collections.singletonList(message)); // 如果是手動確認(rèn)模式。

}

}

3)帶回調(diào)的生產(chǎn)者

@GetMapping("/kafka/callbackOne/{message}")

public void sendMessage2(@PathVariable("message") String callbackMessage) {

kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {

// 消息發(fā)送到的topic

String topic = success.getRecordMetadata().topic();

// 消息發(fā)送到的分區(qū)

int partition = success.getRecordMetadata().partition();

// 消息在分區(qū)內(nèi)的offset

long offset = success.getRecordMetadata().offset();

System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);

}, failure -> {

System.out.println("發(fā)送消息失敗:" + failure.getMessage());

});

}

4)指定topic、partition、offset消費(fèi)

/**

* @Title 指定topic、partition、offset消費(fèi)

* @Description 同時監(jiān)聽topic1和topic2,監(jiān)聽topic1的0號分區(qū)、topic2的 "0號和1號" 分區(qū),指向1號分區(qū)的offset初始值為8

* @Author long.yuan

* @Date 2020/3/22 13:38

* @Param [record]

* @return void

**/

@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {

@TopicPartition(topic = "topic1", partitions = { "1" }),

@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))

})

public void onMessage1(ConsumerRecord record){

// 消費(fèi)的哪個topic、partition的消息,打印出消息內(nèi)容

System.out.println("簡單消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()+"-"+record.offset());

}

5)消息過濾器

@Component

public class KafkaConsumer {

@Autowired

ConsumerFactory consumerFactory;

?

// 消息過濾器

@Bean

public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory);

// 被過濾的消息將被丟棄

factory.setAckDiscarded(true);

// 消息過濾策略

factory.setRecordFilterStrategy(consumerRecord -> {

if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {

return false;

}

//返回true消息則被過濾

return true;

});

return factory;

}

?

// 消息過濾監(jiān)聽

@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")

public void onMessage6(ConsumerRecord record) {

System.out.println(record.value());

}

}

6)消息轉(zhuǎn)發(fā)

從topic1接收到的消息經(jīng)過處理后轉(zhuǎn)發(fā)到topic2

@KafkaListener(topics = {"topic1"})

@SendTo("topic2")

public String onMessage7(ConsumerRecord record) {

return record.value()+"-forward message";

}

十、Kafka保證消息不丟失

?Kafka 要保證消息不會丟失,需要在producer、broker、consumer共同保證消息不丟失

1、producer生產(chǎn)者配置

1)producer端使用producer.send(msg,callback) 帶有回調(diào) send 的方法,而不是producer.send(msg)方法,根據(jù)callback 回調(diào),一旦消息提交失敗,就可以針對性的補(bǔ)償處理。 2)設(shè)置ack=all,表面所有的broker上的副本都已經(jīng)落盤成功了,才算是“已提交” 3)retries >1自動重試的次數(shù),當(dāng)出現(xiàn)網(wǎng)絡(luò)問題時,消息可能會發(fā)送失敗,配置了retries 能夠自動重試,盡量避免消息丟失。最嚴(yán)謹(jǐn)?shù)姆绞绞鞘〉南罩居涗浕蛘呷霂?,然后定時重發(fā)。

2、Broker配置

1)unclean.leader.election.enable =false,禁止ISR之外的副本參與選舉,否則就有可能丟丟失消息 2)replication-factor >=3,需要三個以上的副本 3)min.insync.replicas>1,broker端的參數(shù),至少寫入多少個ISR中副本才算是“已提交”,大于1 可以提升消息的持久性,推薦設(shè)置replication-factor=min.insync.replicas+1

3、consumer消費(fèi)者配置

1)確保消息已經(jīng)消費(fèi)完成在提交

2)enable.auto.commit 設(shè)置成false,并自己來處理offset的提交更新

//文件KafkaReceiver中的消息接收,新增Acknowledgment 接收字段

@KafkaListener(id = "rollback_default_test", topics = {"topic.quick.default"})

public void receiveSk(ConsumerRecord record, Acknowledgment ack) {

System.out.println(record);

System.out.println("我收到了普通消息");

// 手動確認(rèn)消息被 消費(fèi)

ack.acknowledge();

// ack.nack(1000); 拒收當(dāng)前消息,并睡眠10秒鐘后再重新接收消息

// ack.nack(100,1000); 拒收當(dāng)前消息,并睡眠10秒鐘后接收第100條之后的消息

}

十一、常見消息中間件的介紹和對比

????Kafka、RabbitMQ、RocketMQ常見消息中間件的介紹和對比

1、Kafka

????????Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache頂級項目。Kafka主要為高吞吐量的訂閱發(fā)布系統(tǒng)而設(shè)計,追求速度與持久化。kafka中的消息由鍵、值、時間戳組成,kafka不記錄每個消息被誰使用,只通過偏移量記錄哪些消息是未讀的,kafka中可以指定消費(fèi)組來實現(xiàn)訂閱發(fā)布的功能。?

2、RabbitMQ

????????RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn)。AMQP的主要特征是面向消息、隊列、路由(包括點(diǎn)對點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。?

3、RocketMQ

????????RocketMQ是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計算、消息推送、日志流式處理、binglog分發(fā)等場景。支持的客戶端語言不多,目前是Java及C++,其中C++還不成熟;

?4、對比概覽

1、Rabbitmq比kafka可靠,kafka更適合IO高吞吐的處理,比如ELK日志收集。

2、kafka具有高的吞吐量,內(nèi)部采用消息的批量處理,zero-copy機(jī)制,數(shù)據(jù)的存儲和獲取是本地磁盤順序批量操作,具有O(1)的復(fù)雜度,消息處理的效率很高。rabbitMQ在吞吐量方面稍遜于kafka,他們的出發(fā)點(diǎn)不一樣,rabbitMQ支持對消息的可靠的傳遞,支持事務(wù),不支持批量的操作;基于存儲的可靠性的要求存儲可以采用內(nèi)存或者硬盤。

十二、docker啟動kafka

1、拉取zookeeper鏡像

docker pull?bitnami/zookeeper

2、拉取kafka鏡像

?docker pull?bitnami/kafka

3、docker-compose.yml?

version: "3"

services:

zookeeper:

image: bitnami/zookeeper:latest

container_name: zookeeper

# user: root

restart: always

ports:

- 2181:2181

environment:

# 匿名登錄--必須開啟

- ALLOW_ANONYMOUS_LOGIN=yes

volumes:

- ./zookeeper:/bitnami/zookeeper

# 該鏡像具體配置參考 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md

kafka:

image: bitnami/kafka:latest

container_name: kafka

restart: always

hostname: kafka

# user: root

ports:

- 9092:9092

environment:

- KAFKA_BROKER_ID=1

- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092

# 客戶端訪問地址,更換成自己的

- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.4.252:9092

- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181

- KAFKA_ADVERTISED_HOST_NAME=kafka

- KAFKA_ADVERTISED_PORT=9092

- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181

# 允許使用PLAINTEXT協(xié)議(鏡像中默認(rèn)為關(guān)閉,需要手動開啟)

- ALLOW_PLAINTEXT_LISTENER=yes

# 關(guān)閉自動創(chuàng)建 topic 功能

- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

# 全局消息過期時間 6 小時(測試時可以設(shè)置短一點(diǎn))

- KAFKA_CFG_LOG_RETENTION_HOURS=6

# 開啟JMX監(jiān)控

# - JMX_PORT=9999

volumes:

- ./kafka:/bitnami/kafka

depends_on:

- zookeeper

# Web 管理界面 另外也可以用exporter+prometheus+grafana的方式來監(jiān)控 https://github.com/danielqsj/kafka_exporter

# kafdrop:

# image: obsidiandynamics/kafdrop:latest

# ports:

# - 9000:9000

# restart: always

# extra_hosts:

# - kafka1:192.168.4.252

# environment:

# KAFKA_BROKERCONNECT: "kafka:9092"

# depends_on:

# - zookeeper

# - kafka

# container_name: kafdrop

# cpus: '1'

# mem_limit: 1024m

柚子快報激活碼778899分享:分布式 Kafka詳解

http://yzkb.51969.com/

好文閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19106641.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄