柚子快報邀請碼778899分享:kafka安全機制(SASL
柚子快報邀請碼778899分享:kafka安全機制(SASL
在 Kafka 中,SASL 是一種重要的安全協(xié)議,用于提供基于身份驗證的訪問控制。Kafka 使用 SASL 來支持各種身份驗證機制,如:
PLAIN(基于用戶名和密碼的認證)GSSAPI(基于 Kerberos 的認證)SCRAM(Salted Challenge Response Authentication Mechanism)OAUTHBEARER
具體信息可以參考官網(wǎng):kafka安全機制官網(wǎng)-2.7 這里采用SCRAM用于kafka安全機制的實現(xiàn),而不是采用其他方式實現(xiàn),主要有如下原因:
SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是為 Kerberos 使用,如果當前已有 Kerberos 認證,只需要為集群中每個 Broker 和訪問用戶申請 Principle ,然后在 Kafka 配置文件中開啟 Kerberos 的支持即可,一般用于大型公司。SASL/PLAIN - starting at version 0.10.0.0。一個簡單的用戶名和密碼身份認證機制,通常與 TLS/SSL 一起用于加密,以實現(xiàn)身份驗證。是一種比較容易使用的方式,但是也有一個很明顯的缺點,這種方式會把用戶賬戶文件配置到靜態(tài)文件中,每次想要添加新的賬戶都需要重啟 Kafka 去加載靜態(tài)文件,才能生效,十分不方便。SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通過將認證信息保存在 ZooKeeper 里面,從而動態(tài)的獲取用戶信息,相當于把 ZK 用作一個認證中心使用。這種認證可以在使用過程中,使用 Kafka 提供的命令動態(tài)地創(chuàng)建和刪除用戶,無需重啟整個集群,十分方便。SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新認證機制,主要是為了實現(xiàn)與 OAuth2 框架的集成,Kafka 不提倡單純使用 OAUTHBEARER,因為它生成的不安全 Json Web Token,必須配以 SSL 加密才能在生產(chǎn)環(huán)境中使用。
一、環(huán)境搭建
1.1 環(huán)境準備
jdk1.8apache-zookeeper-3.5.9-binkafka_2.12-2.7.1
安裝順序:jdk–>zookeeper–>kafka
小知識:kafka版本命名約定 kafka: 這部分指的是 Apache Kafka,一個開源的分布式事件流平臺。Kafka 提供了一種可靠的、可擴展的發(fā)布-訂閱消息系統(tǒng),可以處理大規(guī)模的實時數(shù)據(jù)流。 2.12: 這表示 Scala 的版本。在 Kafka 的情況下,2.12意味著它是使用 Scala 2.12 編譯的。Scala 是一種運行在 Java 虛擬機上的多范式編程語言,被用于 Kafka 的實現(xiàn)。 2.7.1: 這是 Kafka 的版本號。在這個例子中,版本號是 2.7.1。版本號通常表示軟件的發(fā)布版本,新版本通常包含新功能、改進和修復之前版本的 bug。
安裝文件解壓之后,需要將kafka的libs目錄下的如下jar賦值到zookeeper的lib目錄下:
cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib
每個kafka對應的zookeeper版本不一樣,建議先下載想要的kafka版本,解壓之后,查看libs下依賴的zookeeper版本,然后去官網(wǎng)下載對應的版本進行安裝?。。?/p>
1.2 JDK的安裝與配置
Kafka從2.0.0版本開始就不再支持JDK7及以下版本
到官網(wǎng)下載jdk安裝包,并上傳至Linux的/opt目錄下解壓安裝包配置jdk環(huán)境變量,修改/etc/profile文件并向其中添加如下配置
export JAVA_HOME=/opt/jdk解壓后的文件名
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
生效配置文件,source/etc/profile命令使配置生效
source /etc/profile
通過java-version命令驗證JDK 是否已經(jīng)安裝配置成功
java -version
1.3 Zookeeper的安裝與配置
ZooKeeper是安裝Kafka集群的必要組件,Kafka通過ZooKeeper來實施對元數(shù)據(jù)信息的管理,包括集群、broker、主題、分區(qū)等內(nèi)容。
ZooKeeper是一個開源的分布式協(xié)調(diào)服務,是Google Chubby的一個開源實現(xiàn)。分布式應用程序可以基于ZooKeeper實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負載均衡、命名服務、分布式協(xié)調(diào)/通知、集群管理、Master選舉、配置維護等功能。在ZooKeeper中共有3個角色:leader、follower和observer,同一時刻 ZooKeeper集群中只會有一個leader,其他的都是follower和observer。observer不參與投票,默認情況下 ZooKeeper 中只有 leader 和 follower 兩個角色。更多相關知識可以查閱ZooKeeper官方網(wǎng)站來獲得。
1.3.1 單機模式安裝
到官網(wǎng)下載zookeeper安裝包,并上傳至Linux的/opt目錄下解壓壓縮包添加配置,向/etc/profile配置文件中添加如下內(nèi)容
export ZOOKEEPER_HOME=/opt/zookeeper解壓后的文件名
export PATH=$PATH:$ZOOKEEPER_HOME/bin
執(zhí)行source/etc/profile命令使配置生效
source /etc/profile
修改 ZooKeeper 的配置文件。首先進入$ZOOKEEPER_HOME/conf 目錄,并將zoo_sample.cfg文件修改為zoo.cfg修改zoo.cfg配置文件,zoo.cfg文件的內(nèi)容參考如下
# ZooKeeper服務器心跳時間,單位為ms
tickTime=2000
# 投票選舉新leader的初始化時間
initLimit=10
# leader與follower心跳檢測最大客忍時間,響應超過syncLimit*tickTime,leader認為
# fo11ower“死掉”,從服務器列表中別除fol1ower
syncLimit=5
# 數(shù)據(jù)目錄
dataDir=/tmp/zookeeper/data
# 日志目錄
dataLogDir=/tmp/zookeeper/log
# ZooKeeper對外服務端口
clientPort=2181
默認情況下,Linux系統(tǒng)中沒有/tmp/zookeeper/data和/tmp/zookeeper/log這兩個目錄,所以接下來還要創(chuàng)建這兩個目錄
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log
在${dataDir}目錄(也就是/tmp/zookeeper/data)下創(chuàng)建一個myid文件,并寫入一個數(shù)值,比如0。myid文件里存放的是服務器的編號通過zkServer.sh start啟動Zookeeper服務
zkServer.sh start
通過zkServer.sh status查看啟動狀態(tài)
zkServer.sh status
1.3.2 集群模式安裝
以上是關于ZooKeeper單機模式的安裝與配置,一般在生產(chǎn)環(huán)境中使用的都是集群模式,集群模式的配置也比較簡單,相比單機模式而言只需要修改一些配置即可。下面以3臺機器為例來配置一個ZooKeeper集群。首先在這3臺機器的**/etc/hosts文件中添加3臺集群的IP地址與機器域名的映射,示例如下(3個IP地址分別對應3臺機器) 然后在這3臺機器的zoo.cfg**文件中添加以下配置:
為了便于講解上面的配置,這里抽象出一個公式,即 server.A=B:C:D。其中:
A是一個數(shù)字,代表服務器的編號,就是前面所說的myid文件里面的值。集群中每臺服務器的編號都必須唯一,所以要保證每臺服務器中的myid文件中的值不同。B代表服務器的IP地址。C表示服務器與集群中的 leader 服務器交換信息的端口。D表示選舉時服務器相互通信的端口。
1.3.3 Zookeeper安全認證配置
zookeeper和kafka在默認情況下,是沒有開啟安全認證的,那么任意客戶端可以在不需要任何身份認證的情況下訪問zookeeper和kafka下的各節(jié)點,甚至可以進行節(jié)點的增加,修改以及刪除的動作。注意,前面的動作是基于客戶端能訪問服務端所在的網(wǎng)絡,如果進行了物理隔絕或者做了防火墻限制,那前述內(nèi)容就不一定成立。但是,在某些對安全加固要求比較嚴格的客戶或者生產(chǎn)環(huán)境中,那就必須開啟安全認證才行。除了最基本的身份認證以外,還有針對每個節(jié)點的權限訪問,但本文不涉及該話題。 進入正題,先從zookeeper開始配置,zookeeper官網(wǎng)提供了認證配置的參考,點擊下方官網(wǎng)地址,即可查看詳情。配置分兩種情況:
客戶端和服務端的雙向認證(3.4.0開始引入)服務端與服務端的雙向認證(2.4.10開始引入)
如果是非集群模式下,僅配置客戶端和服務端的雙向認證即可。集群模式下,則需要客戶端和服務端的認證以及zookeeper服務器之間的雙向認證。 Zookeeper 使用的是Java自帶的認證和授權服務(簡稱:JAAS),詳細內(nèi)容請看官網(wǎng),該鏈接是 Java 8 的 JAAS 的介紹。這里為zookeeper和kafka分別在對應配置文件下創(chuàng)建jass配置文件為(文件名可以隨意):
zookeeper:${ZOOKEEPER_HOME}/conf/zoo_jaas.confkafka:${KAFKA_HOME}/config/kafka-server-jaas.conf
注意:本節(jié)中的客戶端指的kafka,服務端指的是zookeeper
1.3.3.1 客戶端和服務端的雙向認證
配置zookeeper服務端
在zoo_jaas.conf添加如下配置
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret”
user_kafka="kafkasecret";
};
修改zoo.cfg配置
# 強制進行SASL認證
sessionRequireClientSASLAuth=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
增加jvm參數(shù),在${ZOOKEEPER_HOME}/bin/zkEnv.sh腳本中增加:
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"
配置客戶端
在kafka-server-jaas.conf中添加如下配置:
Client{
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafkasecret";
};
修改客戶端的啟動腳本${KAFKA_HOME}/bin/kafka-server-start.sh,增加jvm參數(shù):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"
1.3.3.2 服務端與服務端的雙向認證
修改zoo.cfg,增加如下配置:
quorum.auth.enableSasl=true # 打開sasl開關, 默認是關的
quorum.auth.learnerRequireSasl=true # ZK做為leaner的時候, 會發(fā)送認證信息
quorum.auth.serverRequireSasl=true # 設置為true的時候,learner連接的時候需要發(fā)送認證信息,否則拒絕
quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字
quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字
quorum.cnxn.threads.size=20 # 建議設置成ZK節(jié)點的數(shù)量乘2
修改zoo_jaas.conf,增加如下配置:
QuorumServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_test="test";
};
QuorumLearner {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="test"
password="test";
};
QuorumServer 和 QuorumLearner 都是配置的ZK節(jié)點之間的認證配置
1.4 Kafka的安裝與配置
1.4.1 單機模式安裝
到官網(wǎng)下載kafka安裝包,并上傳至Linux的/opt目錄下解壓壓縮包修改broker的配置文件**$KAFKA_HOME/conf/server.properties**
# broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
broker.id=0
# broker對外提供的服務入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,為了方便演示,我們假設Kafka和ZooKeeper都安裝在本機
zookeeper.connect=localhost:2181/kafka
1.4.2 集群模式安裝
如果是單機模式,那么修改完上述配置參數(shù)之后就可以啟動服務。如果是集群模式,那么只需要對單機模式的配置文件做相應的修改即可:確保集群中每個broker的broker.id配置參數(shù)的值不一樣,以及l(fā)isteners配置參數(shù)也需要修改為與broker對應的IP地址或域名,之后就可以各自啟動服務。注意,在啟動 Kafka 服務之前同樣需要確保 zookeeper.connect參數(shù)所配置的ZooKeeper服務已經(jīng)正確啟動。
1.4.3 Kafka安全認證配置
通過kafka-configs.sh腳本生成一個用戶admin作為超級管理員
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin
注意:在配置kafka的server.properties對于zookeeper的連接我們采用的是zookeeper的CHROOT,所以上述命令中也需要指定對應路徑,不然啟動的kafka時會獲取不到生成的SCRAM認證信息?。?!
小知識: ZooKeeper 中的 CHROOT 是指將 ZooKeeper 的命名空間限定在一個特定的路徑下。這就是說,ZooKeeper 的所有數(shù)據(jù)和操作都將在指定的路徑下進行,而不是整個 ZooKeeper 服務器上。CHROOT 功能允許在一個 ZooKeeper 集群上運行多個獨立的 ZooKeeper 實例,每個實例都有自己的命名空間。 在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通過配置項 chroot 來設置。例如: chroot=/myapp 在這個例子中,ZooKeeper 就會將其根路徑設置為 /myapp,而不是默認的根路徑。這樣,對于 ZooKeeper 中的所有路徑,都將以 /myapp 為根進行解釋。這就好比把 ZooKeeper 變成了一個容器,其內(nèi)部的所有路徑都相對于 /myapp 這個容器。 CHROOT 的使用場景包括:
隔離命名空間: 允許多個應用在同一個 ZooKeeper 集群上使用不同的命名空間,防止彼此之間的命名沖突。模擬多個獨立環(huán)境: 允許在同一個 ZooKeeper 集群上模擬多個獨立的環(huán)境,每個環(huán)境有自己的數(shù)據(jù)和配置。
要注意的是,如果你在使用 CHROOT,ZooKeeper 客戶端在連接到 ZooKeeper 服務器時,也需要指定相應的 CHROOT 路徑。例如,如果 CHROOT 設置為 /myapp,那么客戶端在連接時需要指定 “/myapp” 作為根路徑。
總的來說,CHROOT 提供了一種簡單而有效的方式,使得在同一個 ZooKeeper 集群上可以支持多個隔離的命名空間。
生成之后可以通過如下命令進行查看:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin
也可以添加其他SCRAM認證信息,例如SCRAM-SHA-512:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin
也可使用如下命令刪除已經(jīng)添加的認證信息:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
修改kafka-server-jaas.conf,增加kafka服務的SCRAM認證用戶信息
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
修改server.properties,新增如下配置:
# 啟用ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 設置本例中admin為超級用戶;在Zookeeper的“/kafka/config/users”下存在用戶
super.users=User:admin
# 同時啟用SCRAM和PLAIN機制
sasl.enabled.mechanisms=SCRAM-SHA-256
# 為broker間通訊開啟SCRAM機制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker間通訊使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.64.102:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092
如果是集群,上述配置每個節(jié)點都應該配置一份?。?!
1.4.4 服務啟動
啟動Kafka服務,在$KAFKA_HOME目錄下執(zhí)行下面的命令即可
bin/kafka-server-start.sh config/server.properties
如果要在后臺運行Kafka服務,那么可以在啟動命令中加入-daemon參數(shù)或&字符,示例如下:
bin/kafka-server-start.sh -daemon config/server.properties
或者
bin/kafka-server-start.sh config/server.properties &
通過jps命令查看Kafka服務進程是否已經(jīng)啟動
二、Admin API使用
2.1 SCRAM用戶操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaUserOperator {
private final AdminClient adminClient;
public KafkaUserOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
public boolean createScramUser(String username, String password) {
boolean res = false;
//指定一個協(xié)議ScramMechanism,迭代次數(shù)iterations還沒搞清楚干嘛的,設置太小會報錯
ScramCredentialInfo scramCredentialInfo = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000);
//創(chuàng)建Scram用戶憑證,用戶不存在,會先創(chuàng)建用戶
UserScramCredentialAlteration userScramCredentialUpsertion = new UserScramCredentialUpsertion(username, scramCredentialInfo, password);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
public boolean deleteScramUser(String username) {
boolean res = false;
//刪除Scram用戶憑證,刪除后用戶無權限操作kafka,zk中用戶節(jié)點還會存在
UserScramCredentialAlteration userScramCredentialDeletion = new UserScramCredentialDeletion(username, ScramMechanism.SCRAM_SHA_256);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
}
2.2 主題操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaTopicOperator {
private final Logger logger = LoggerFactory.getLogger(KafkaTopicOperator.class);
private final AdminClient adminClient;
public KafkaTopicOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 創(chuàng)建系統(tǒng)對應的topic
*
* @param topicName 主題名稱
* @param partitions 分區(qū)
* @param replicationFactor 副本
* @param retention 數(shù)據(jù)有效期
* @return boolean
*/
public boolean createTopic(String topicName, Integer partitions, Integer replicationFactor, Integer retention) {
boolean res = false;
Set
if (!topics.contains(topicName)) {
partitions = partitions == null ? 1 : partitions;
replicationFactor = replicationFactor == null ? 1 : replicationFactor;
NewTopic topic = new NewTopic(topicName, partitions, replicationFactor.shortValue());
long param = retention * 24 * 60 * 60 * 1000;
Map
configs.put("retention.ms", String.valueOf(param));
topic.configs(configs);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("創(chuàng)建topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
res = true;
logger.warn("該主題已存在,主題名稱:{}", topicName);
}
return res;
}
/**
* 修改topic數(shù)據(jù)有效期
*
* @param topicName 主題名稱
* @param retention 天數(shù)
* @return boolean
*/
public boolean updateTopic(String topicName, Integer retention) {
if (retention < 0) {
return false;
}
boolean res = false;
Map
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
//轉換為毫秒
long param = retention * 24 * 60 * 60 * 1000;
ConfigEntry configEntry = new ConfigEntry("retention.ms", String.valueOf(param));
Config config = new Config(Collections.singletonList(configEntry));
alertConfigs.put(configResource, config);
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs);
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("修改topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
/**
* 刪除topic
*
* @param topicName 主題
* @return boolean
*/
public boolean deleteTopic(String topicName) {
boolean res = false;
Set
if (topics.contains(topicName)) {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));
for (Map.Entry
KafkaFuture
try {
future.get();
} catch (Exception exc) {
logger.warn("刪除topic參數(shù)異常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
logger.info("topic不存在,名稱:{}", topicName);
res = true;
}
return res;
}
/**
* 獲取主題列表
*
* @return Set
*/
public Set
Set
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
result = listTopicsResult.names().get();
} catch (Exception e) {
logger.warn("獲取主題列表失敗,失敗原因:{}", e.getMessage());
e.printStackTrace();
}
return result;
}
}
2.3 ACL操作
package com.kafka.adminclient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class AclOperator {
private final Logger logger = LoggerFactory.getLogger(AclOperator.class);
private final AdminClient adminClient;
public AclOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 添加權限
*
* @param resourceType 資源類型
* @param resourceName 資源名稱
* @param username 用戶名
* @param operation 權限名稱
*/
public void addAclAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding));
for (Map.Entry
KafkaFuture
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("創(chuàng)建權限成功");
}
} catch (Exception exc) {
logger.warn("創(chuàng)建權限失敗,錯誤信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
/**
* 刪除權限
*
* @param resourceType 資源類型
* @param resourceName 資源名稱
* @param username 用戶名
* @param operation 權限名稱
*/
public void deleteACLAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
for (Map.Entry
KafkaFuture
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("刪除權限成功");
}
} catch (Exception exc) {
logger.warn("刪除權限失敗,錯誤信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
private AclOperation getOperation(String operation) {
AclOperation aclOperation = null;
switch (operation) {
case "CREATE":
aclOperation = AclOperation.CREATE;
break;
case "WRITE":
aclOperation = AclOperation.WRITE;
break;
case "READ":
aclOperation = AclOperation.READ;
break;
default:
break;
}
return aclOperation;
}
private ResourceType getResourceType(String type) {
ResourceType resourceType = null;
switch (type) {
case "Group":
resourceType = ResourceType.GROUP;
break;
case "Topic":
resourceType = ResourceType.TOPIC;
break;
default:
break;
}
return resourceType;
}
}
三、參考鏈接
kafka、zookeeper配置sasl認證-CSDN博客Zookeeper & Kafka 開啟安全認證的配置_kafka認證配置_IT布道的博客-CSDN博客Kafka安全(以SASL+ACL為例)_kafka 安全-CSDN博客Kafka安全認證授權配置_kafka認證配置-CSDN博客Java版 Kafka ACL使用實戰(zhàn)_java kafka acl_芒果無憂的博客-CSDN博客kafka官網(wǎng)
柚子快報邀請碼778899分享:kafka安全機制(SASL
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。