柚子快報(bào)邀請(qǐng)碼778899分享:kafka安全機(jī)制(SASL
柚子快報(bào)邀請(qǐng)碼778899分享:kafka安全機(jī)制(SASL
在 Kafka 中,SASL 是一種重要的安全協(xié)議,用于提供基于身份驗(yàn)證的訪問(wèn)控制。Kafka 使用 SASL 來(lái)支持各種身份驗(yàn)證機(jī)制,如:
PLAIN(基于用戶名和密碼的認(rèn)證)GSSAPI(基于 Kerberos 的認(rèn)證)SCRAM(Salted Challenge Response Authentication Mechanism)OAUTHBEARER
具體信息可以參考官網(wǎng):kafka安全機(jī)制官網(wǎng)-2.7 這里采用SCRAM用于kafka安全機(jī)制的實(shí)現(xiàn),而不是采用其他方式實(shí)現(xiàn),主要有如下原因:
SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是為 Kerberos 使用,如果當(dāng)前已有 Kerberos 認(rèn)證,只需要為集群中每個(gè) Broker 和訪問(wèn)用戶申請(qǐng) Principle ,然后在 Kafka 配置文件中開啟 Kerberos 的支持即可,一般用于大型公司。SASL/PLAIN - starting at version 0.10.0.0。一個(gè)簡(jiǎn)單的用戶名和密碼身份認(rèn)證機(jī)制,通常與 TLS/SSL 一起用于加密,以實(shí)現(xiàn)身份驗(yàn)證。是一種比較容易使用的方式,但是也有一個(gè)很明顯的缺點(diǎn),這種方式會(huì)把用戶賬戶文件配置到靜態(tài)文件中,每次想要添加新的賬戶都需要重啟 Kafka 去加載靜態(tài)文件,才能生效,十分不方便。SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通過(guò)將認(rèn)證信息保存在 ZooKeeper 里面,從而動(dòng)態(tài)的獲取用戶信息,相當(dāng)于把 ZK 用作一個(gè)認(rèn)證中心使用。這種認(rèn)證可以在使用過(guò)程中,使用 Kafka 提供的命令動(dòng)態(tài)地創(chuàng)建和刪除用戶,無(wú)需重啟整個(gè)集群,十分方便。SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新認(rèn)證機(jī)制,主要是為了實(shí)現(xiàn)與 OAuth2 框架的集成,Kafka 不提倡單純使用 OAUTHBEARER,因?yàn)樗傻牟话踩?Json Web Token,必須配以 SSL 加密才能在生產(chǎn)環(huán)境中使用。
一、環(huán)境搭建
1.1 環(huán)境準(zhǔn)備
jdk1.8apache-zookeeper-3.5.9-binkafka_2.12-2.7.1
安裝順序:jdk–>zookeeper–>kafka
小知識(shí):kafka版本命名約定 kafka: 這部分指的是 Apache Kafka,一個(gè)開源的分布式事件流平臺(tái)。Kafka 提供了一種可靠的、可擴(kuò)展的發(fā)布-訂閱消息系統(tǒng),可以處理大規(guī)模的實(shí)時(shí)數(shù)據(jù)流。 2.12: 這表示 Scala 的版本。在 Kafka 的情況下,2.12意味著它是使用 Scala 2.12 編譯的。Scala 是一種運(yùn)行在 Java 虛擬機(jī)上的多范式編程語(yǔ)言,被用于 Kafka 的實(shí)現(xiàn)。 2.7.1: 這是 Kafka 的版本號(hào)。在這個(gè)例子中,版本號(hào)是 2.7.1。版本號(hào)通常表示軟件的發(fā)布版本,新版本通常包含新功能、改進(jìn)和修復(fù)之前版本的 bug。
安裝文件解壓之后,需要將kafka的libs目錄下的如下jar賦值到zookeeper的lib目錄下:
cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib
每個(gè)kafka對(duì)應(yīng)的zookeeper版本不一樣,建議先下載想要的kafka版本,解壓之后,查看libs下依賴的zookeeper版本,然后去官網(wǎng)下載對(duì)應(yīng)的版本進(jìn)行安裝?。?!
1.2 JDK的安裝與配置
Kafka從2.0.0版本開始就不再支持JDK7及以下版本
到官網(wǎng)下載jdk安裝包,并上傳至Linux的/opt目錄下解壓安裝包配置jdk環(huán)境變量,修改/etc/profile文件并向其中添加如下配置
export JAVA_HOME=/opt/jdk解壓后的文件名
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
生效配置文件,source/etc/profile命令使配置生效
source /etc/profile
通過(guò)java-version命令驗(yàn)證JDK 是否已經(jīng)安裝配置成功
java -version
1.3 Zookeeper的安裝與配置
ZooKeeper是安裝Kafka集群的必要組件,Kafka通過(guò)ZooKeeper來(lái)實(shí)施對(duì)元數(shù)據(jù)信息的管理,包括集群、broker、主題、分區(qū)等內(nèi)容。
ZooKeeper是一個(gè)開源的分布式協(xié)調(diào)服務(wù),是Google Chubby的一個(gè)開源實(shí)現(xiàn)。分布式應(yīng)用程序可以基于ZooKeeper實(shí)現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負(fù)載均衡、命名服務(wù)、分布式協(xié)調(diào)/通知、集群管理、Master選舉、配置維護(hù)等功能。在ZooKeeper中共有3個(gè)角色:leader、follower和observer,同一時(shí)刻 ZooKeeper集群中只會(huì)有一個(gè)leader,其他的都是follower和observer。observer不參與投票,默認(rèn)情況下 ZooKeeper 中只有 leader 和 follower 兩個(gè)角色。更多相關(guān)知識(shí)可以查閱ZooKeeper官方網(wǎng)站來(lái)獲得。
1.3.1 單機(jī)模式安裝
到官網(wǎng)下載zookeeper安裝包,并上傳至Linux的/opt目錄下解壓壓縮包添加配置,向/etc/profile配置文件中添加如下內(nèi)容
export ZOOKEEPER_HOME=/opt/zookeeper解壓后的文件名
export PATH=$PATH:$ZOOKEEPER_HOME/bin
執(zhí)行source/etc/profile命令使配置生效
source /etc/profile
修改 ZooKeeper 的配置文件。首先進(jìn)入$ZOOKEEPER_HOME/conf 目錄,并將zoo_sample.cfg文件修改為zoo.cfg修改zoo.cfg配置文件,zoo.cfg文件的內(nèi)容參考如下
# ZooKeeper服務(wù)器心跳時(shí)間,單位為ms
tickTime=2000
# 投票選舉新leader的初始化時(shí)間
initLimit=10
# leader與follower心跳檢測(cè)最大客忍時(shí)間,響應(yīng)超過(guò)syncLimit*tickTime,leader認(rèn)為
# fo11ower“死掉”,從服務(wù)器列表中別除fol1ower
syncLimit=5
# 數(shù)據(jù)目錄
dataDir=/tmp/zookeeper/data
# 日志目錄
dataLogDir=/tmp/zookeeper/log
# ZooKeeper對(duì)外服務(wù)端口
clientPort=2181
默認(rèn)情況下,Linux系統(tǒng)中沒有/tmp/zookeeper/data和/tmp/zookeeper/log這兩個(gè)目錄,所以接下來(lái)還要?jiǎng)?chuàng)建這兩個(gè)目錄
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log
在${dataDir}目錄(也就是/tmp/zookeeper/data)下創(chuàng)建一個(gè)myid文件,并寫入一個(gè)數(shù)值,比如0。myid文件里存放的是服務(wù)器的編號(hào)通過(guò)zkServer.sh start啟動(dòng)Zookeeper服務(wù)
zkServer.sh start
通過(guò)zkServer.sh status查看啟動(dòng)狀態(tài)
zkServer.sh status
1.3.2 集群模式安裝
以上是關(guān)于ZooKeeper單機(jī)模式的安裝與配置,一般在生產(chǎn)環(huán)境中使用的都是集群模式,集群模式的配置也比較簡(jiǎn)單,相比單機(jī)模式而言只需要修改一些配置即可。下面以3臺(tái)機(jī)器為例來(lái)配置一個(gè)ZooKeeper集群。首先在這3臺(tái)機(jī)器的**/etc/hosts文件中添加3臺(tái)集群的IP地址與機(jī)器域名的映射,示例如下(3個(gè)IP地址分別對(duì)應(yīng)3臺(tái)機(jī)器) 然后在這3臺(tái)機(jī)器的zoo.cfg**文件中添加以下配置:
為了便于講解上面的配置,這里抽象出一個(gè)公式,即 server.A=B:C:D。其中:
A是一個(gè)數(shù)字,代表服務(wù)器的編號(hào),就是前面所說(shuō)的myid文件里面的值。集群中每臺(tái)服務(wù)器的編號(hào)都必須唯一,所以要保證每臺(tái)服務(wù)器中的myid文件中的值不同。B代表服務(wù)器的IP地址。C表示服務(wù)器與集群中的 leader 服務(wù)器交換信息的端口。D表示選舉時(shí)服務(wù)器相互通信的端口。
1.3.3 Zookeeper安全認(rèn)證配置
zookeeper和kafka在默認(rèn)情況下,是沒有開啟安全認(rèn)證的,那么任意客戶端可以在不需要任何身份認(rèn)證的情況下訪問(wèn)zookeeper和kafka下的各節(jié)點(diǎn),甚至可以進(jìn)行節(jié)點(diǎn)的增加,修改以及刪除的動(dòng)作。注意,前面的動(dòng)作是基于客戶端能訪問(wèn)服務(wù)端所在的網(wǎng)絡(luò),如果進(jìn)行了物理隔絕或者做了防火墻限制,那前述內(nèi)容就不一定成立。但是,在某些對(duì)安全加固要求比較嚴(yán)格的客戶或者生產(chǎn)環(huán)境中,那就必須開啟安全認(rèn)證才行。除了最基本的身份認(rèn)證以外,還有針對(duì)每個(gè)節(jié)點(diǎn)的權(quán)限訪問(wèn),但本文不涉及該話題。 進(jìn)入正題,先從zookeeper開始配置,zookeeper官網(wǎng)提供了認(rèn)證配置的參考,點(diǎn)擊下方官網(wǎng)地址,即可查看詳情。配置分兩種情況:
客戶端和服務(wù)端的雙向認(rèn)證(3.4.0開始引入)服務(wù)端與服務(wù)端的雙向認(rèn)證(2.4.10開始引入)
如果是非集群模式下,僅配置客戶端和服務(wù)端的雙向認(rèn)證即可。集群模式下,則需要客戶端和服務(wù)端的認(rèn)證以及zookeeper服務(wù)器之間的雙向認(rèn)證。 Zookeeper 使用的是Java自帶的認(rèn)證和授權(quán)服務(wù)(簡(jiǎn)稱:JAAS),詳細(xì)內(nèi)容請(qǐng)看官網(wǎng),該鏈接是 Java 8 的 JAAS 的介紹。這里為zookeeper和kafka分別在對(duì)應(yīng)配置文件下創(chuàng)建jass配置文件為(文件名可以隨意):
zookeeper:${ZOOKEEPER_HOME}/conf/zoo_jaas.confkafka:${KAFKA_HOME}/config/kafka-server-jaas.conf
注意:本節(jié)中的客戶端指的kafka,服務(wù)端指的是zookeeper
1.3.3.1 客戶端和服務(wù)端的雙向認(rèn)證
配置zookeeper服務(wù)端
在zoo_jaas.conf添加如下配置
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret”
user_kafka="kafkasecret";
};
修改zoo.cfg配置
# 強(qiáng)制進(jìn)行SASL認(rèn)證
sessionRequireClientSASLAuth=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
增加jvm參數(shù),在${ZOOKEEPER_HOME}/bin/zkEnv.sh腳本中增加:
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"
配置客戶端
在kafka-server-jaas.conf中添加如下配置:
Client{
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafkasecret";
};
修改客戶端的啟動(dòng)腳本${KAFKA_HOME}/bin/kafka-server-start.sh,增加jvm參數(shù):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"
1.3.3.2 服務(wù)端與服務(wù)端的雙向認(rèn)證
修改zoo.cfg,增加如下配置:
quorum.auth.enableSasl=true # 打開sasl開關(guān), 默認(rèn)是關(guān)的
quorum.auth.learnerRequireSasl=true # ZK做為leaner的時(shí)候, 會(huì)發(fā)送認(rèn)證信息
quorum.auth.serverRequireSasl=true # 設(shè)置為true的時(shí)候,learner連接的時(shí)候需要發(fā)送認(rèn)證信息,否則拒絕
quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字
quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字
quorum.cnxn.threads.size=20 # 建議設(shè)置成ZK節(jié)點(diǎn)的數(shù)量乘2
修改zoo_jaas.conf,增加如下配置:
QuorumServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_test="test";
};
QuorumLearner {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="test"
password="test";
};
QuorumServer 和 QuorumLearner 都是配置的ZK節(jié)點(diǎn)之間的認(rèn)證配置
1.4 Kafka的安裝與配置
1.4.1 單機(jī)模式安裝
到官網(wǎng)下載kafka安裝包,并上傳至Linux的/opt目錄下解壓壓縮包修改broker的配置文件**$KAFKA_HOME/conf/server.properties**
# broker的編號(hào),如果集群中有多個(gè)broker,則每個(gè)broker的編號(hào)需要設(shè)置的不同
broker.id=0
# broker對(duì)外提供的服務(wù)入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,為了方便演示,我們假設(shè)Kafka和ZooKeeper都安裝在本機(jī)
zookeeper.connect=localhost:2181/kafka
1.4.2 集群模式安裝
如果是單機(jī)模式,那么修改完上述配置參數(shù)之后就可以啟動(dòng)服務(wù)。如果是集群模式,那么只需要對(duì)單機(jī)模式的配置文件做相應(yīng)的修改即可:確保集群中每個(gè)broker的broker.id配置參數(shù)的值不一樣,以及l(fā)isteners配置參數(shù)也需要修改為與broker對(duì)應(yīng)的IP地址或域名,之后就可以各自啟動(dòng)服務(wù)。注意,在啟動(dòng) Kafka 服務(wù)之前同樣需要確保 zookeeper.connect參數(shù)所配置的ZooKeeper服務(wù)已經(jīng)正確啟動(dòng)。
1.4.3 Kafka安全認(rèn)證配置
通過(guò)kafka-configs.sh腳本生成一個(gè)用戶admin作為超級(jí)管理員
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin
注意:在配置kafka的server.properties對(duì)于zookeeper的連接我們采用的是zookeeper的CHROOT,所以上述命令中也需要指定對(duì)應(yīng)路徑,不然啟動(dòng)的kafka時(shí)會(huì)獲取不到生成的SCRAM認(rèn)證信息?。?!
小知識(shí): ZooKeeper 中的 CHROOT 是指將 ZooKeeper 的命名空間限定在一個(gè)特定的路徑下。這就是說(shuō),ZooKeeper 的所有數(shù)據(jù)和操作都將在指定的路徑下進(jìn)行,而不是整個(gè) ZooKeeper 服務(wù)器上。CHROOT 功能允許在一個(gè) ZooKeeper 集群上運(yùn)行多個(gè)獨(dú)立的 ZooKeeper 實(shí)例,每個(gè)實(shí)例都有自己的命名空間。 在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通過(guò)配置項(xiàng) chroot 來(lái)設(shè)置。例如: chroot=/myapp 在這個(gè)例子中,ZooKeeper 就會(huì)將其根路徑設(shè)置為 /myapp,而不是默認(rèn)的根路徑。這樣,對(duì)于 ZooKeeper 中的所有路徑,都將以 /myapp 為根進(jìn)行解釋。這就好比把 ZooKeeper 變成了一個(gè)容器,其內(nèi)部的所有路徑都相對(duì)于 /myapp 這個(gè)容器。 CHROOT 的使用場(chǎng)景包括:
隔離命名空間: 允許多個(gè)應(yīng)用在同一個(gè) ZooKeeper 集群上使用不同的命名空間,防止彼此之間的命名沖突。模擬多個(gè)獨(dú)立環(huán)境: 允許在同一個(gè) ZooKeeper 集群上模擬多個(gè)獨(dú)立的環(huán)境,每個(gè)環(huán)境有自己的數(shù)據(jù)和配置。
要注意的是,如果你在使用 CHROOT,ZooKeeper 客戶端在連接到 ZooKeeper 服務(wù)器時(shí),也需要指定相應(yīng)的 CHROOT 路徑。例如,如果 CHROOT 設(shè)置為 /myapp,那么客戶端在連接時(shí)需要指定 “/myapp” 作為根路徑。
總的來(lái)說(shuō),CHROOT 提供了一種簡(jiǎn)單而有效的方式,使得在同一個(gè) ZooKeeper 集群上可以支持多個(gè)隔離的命名空間。
生成之后可以通過(guò)如下命令進(jìn)行查看:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin
也可以添加其他SCRAM認(rèn)證信息,例如SCRAM-SHA-512:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin
也可使用如下命令刪除已經(jīng)添加的認(rèn)證信息:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
修改kafka-server-jaas.conf,增加kafka服務(wù)的SCRAM認(rèn)證用戶信息
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
修改server.properties,新增如下配置:
# 啟用ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 設(shè)置本例中admin為超級(jí)用戶;在Zookeeper的“/kafka/config/users”下存在用戶
super.users=User:admin
# 同時(shí)啟用SCRAM和PLAIN機(jī)制
sasl.enabled.mechanisms=SCRAM-SHA-256
# 為broker間通訊開啟SCRAM機(jī)制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker間通訊使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.64.102:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092
如果是集群,上述配置每個(gè)節(jié)點(diǎn)都應(yīng)該配置一份?。。?/p>
1.4.4 服務(wù)啟動(dòng)
啟動(dòng)Kafka服務(wù),在$KAFKA_HOME目錄下執(zhí)行下面的命令即可
bin/kafka-server-start.sh config/server.properties
如果要在后臺(tái)運(yùn)行Kafka服務(wù),那么可以在啟動(dòng)命令中加入-daemon參數(shù)或&字符,示例如下:
bin/kafka-server-start.sh -daemon config/server.properties
或者
bin/kafka-server-start.sh config/server.properties &
通過(guò)jps命令查看Kafka服務(wù)進(jìn)程是否已經(jīng)啟動(dòng)
二、Admin API使用
2.1 SCRAM用戶操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaUserOperator {
private final AdminClient adminClient;
public KafkaUserOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
public boolean createScramUser(String username, String password) {
boolean res = false;
//指定一個(gè)協(xié)議ScramMechanism,迭代次數(shù)iterations還沒搞清楚干嘛的,設(shè)置太小會(huì)報(bào)錯(cuò)
ScramCredentialInfo scramCredentialInfo = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000);
//創(chuàng)建Scram用戶憑證,用戶不存在,會(huì)先創(chuàng)建用戶
UserScramCredentialAlteration userScramCredentialUpsertion = new UserScramCredentialUpsertion(username, scramCredentialInfo, password);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
public boolean deleteScramUser(String username) {
boolean res = false;
//刪除Scram用戶憑證,刪除后用戶無(wú)權(quán)限操作kafka,zk中用戶節(jié)點(diǎn)還會(huì)存在
UserScramCredentialAlteration userScramCredentialDeletion = new UserScramCredentialDeletion(username, ScramMechanism.SCRAM_SHA_256);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
}
2.2 主題操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaTopicOperator {
private final Logger logger = LoggerFactory.getLogger(KafkaTopicOperator.class);
private final AdminClient adminClient;
public KafkaTopicOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 創(chuàng)建系統(tǒng)對(duì)應(yīng)的topic
*
* @param topicName 主題名稱
* @param partitions 分區(qū)
* @param replicationFactor 副本
* @param retention 數(shù)據(jù)有效期
* @return boolean
*/
public boolean createTopic(String topicName, Integer partitions, Integer replicationFactor, Integer retention) {
boolean res = false;
Set
if (!topics.contains(topicName)) {
partitions = partitions == null ? 1 : partitions;
replicationFactor = replicationFactor == null ? 1 : replicationFactor;
NewTopic topic = new NewTopic(topicName, partitions, replicationFactor.shortValue());
long param = retention * 24 * 60 * 60 * 1000;
Map
configs.put("retention.ms", String.valueOf(param));
topic.configs(configs);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("創(chuàng)建topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
res = true;
logger.warn("該主題已存在,主題名稱:{}", topicName);
}
return res;
}
/**
* 修改topic數(shù)據(jù)有效期
*
* @param topicName 主題名稱
* @param retention 天數(shù)
* @return boolean
*/
public boolean updateTopic(String topicName, Integer retention) {
if (retention < 0) {
return false;
}
boolean res = false;
Map
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
//轉(zhuǎn)換為毫秒
long param = retention * 24 * 60 * 60 * 1000;
ConfigEntry configEntry = new ConfigEntry("retention.ms", String.valueOf(param));
Config config = new Config(Collections.singletonList(configEntry));
alertConfigs.put(configResource, config);
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs);
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("修改topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
/**
* 刪除topic
*
* @param topicName 主題
* @return boolean
*/
public boolean deleteTopic(String topicName) {
boolean res = false;
Set
if (topics.contains(topicName)) {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("刪除topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
logger.info("topic不存在,名稱:{}", topicName);
res = true;
}
return res;
}
/**
* 獲取主題列表
*
* @return Set
*/
public Set
Set
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
result = listTopicsResult.names().get();
} catch (Exception e) {
logger.warn("獲取主題列表失敗,失敗原因:{}", e.getMessage());
e.printStackTrace();
}
return result;
}
}
2.3 ACL操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class AclOperator {
private final Logger logger = LoggerFactory.getLogger(AclOperator.class);
private final AdminClient adminClient;
public AclOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 添加權(quán)限
*
* @param resourceType 資源類型
* @param resourceName 資源名稱
* @param username 用戶名
* @param operation 權(quán)限名稱
*/
public void addAclAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding));
for (Map.Entry
KafkaFuture
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("創(chuàng)建權(quán)限成功");
}
} catch (Exception exc) {
logger.warn("創(chuàng)建權(quán)限失敗,錯(cuò)誤信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
/**
* 刪除權(quán)限
*
* @param resourceType 資源類型
* @param resourceName 資源名稱
* @param username 用戶名
* @param operation 權(quán)限名稱
*/
public void deleteACLAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
for (Map.Entry
KafkaFuture
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("刪除權(quán)限成功");
}
} catch (Exception exc) {
logger.warn("刪除權(quán)限失敗,錯(cuò)誤信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
private AclOperation getOperation(String operation) {
AclOperation aclOperation = null;
switch (operation) {
case "CREATE":
aclOperation = AclOperation.CREATE;
break;
case "WRITE":
aclOperation = AclOperation.WRITE;
break;
case "READ":
aclOperation = AclOperation.READ;
break;
default:
break;
}
return aclOperation;
}
private ResourceType getResourceType(String type) {
ResourceType resourceType = null;
switch (type) {
case "Group":
resourceType = ResourceType.GROUP;
break;
case "Topic":
resourceType = ResourceType.TOPIC;
break;
default:
break;
}
return resourceType;
}
}
三、參考鏈接
kafka、zookeeper配置sasl認(rèn)證-CSDN博客Zookeeper & Kafka 開啟安全認(rèn)證的配置_kafka認(rèn)證配置_IT布道的博客-CSDN博客Kafka安全(以SASL+ACL為例)_kafka 安全-CSDN博客Kafka安全認(rèn)證授權(quán)配置_kafka認(rèn)證配置-CSDN博客Java版 Kafka ACL使用實(shí)戰(zhàn)_java kafka acl_芒果無(wú)憂的博客-CSDN博客kafka官網(wǎng)
柚子快報(bào)邀請(qǐng)碼778899分享:kafka安全機(jī)制(SASL
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。