欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:kafka安全機制(SASL

柚子快報邀請碼778899分享:kafka安全機制(SASL

http://yzkb.51969.com/

在 Kafka 中,SASL 是一種重要的安全協(xié)議,用于提供基于身份驗證的訪問控制。Kafka 使用 SASL 來支持各種身份驗證機制,如:

PLAIN(基于用戶名和密碼的認證)GSSAPI(基于 Kerberos 的認證)SCRAM(Salted Challenge Response Authentication Mechanism)OAUTHBEARER

具體信息可以參考官網(wǎng):kafka安全機制官網(wǎng)-2.7 這里采用SCRAM用于kafka安全機制的實現(xiàn),而不是采用其他方式實現(xiàn),主要有如下原因:

SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是為 Kerberos 使用,如果當前已有 Kerberos 認證,只需要為集群中每個 Broker 和訪問用戶申請 Principle ,然后在 Kafka 配置文件中開啟 Kerberos 的支持即可,一般用于大型公司。SASL/PLAIN - starting at version 0.10.0.0。一個簡單的用戶名和密碼身份認證機制,通常與 TLS/SSL 一起用于加密,以實現(xiàn)身份驗證。是一種比較容易使用的方式,但是也有一個很明顯的缺點,這種方式會把用戶賬戶文件配置到靜態(tài)文件中,每次想要添加新的賬戶都需要重啟 Kafka 去加載靜態(tài)文件,才能生效,十分不方便。SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通過將認證信息保存在 ZooKeeper 里面,從而動態(tài)的獲取用戶信息,相當于把 ZK 用作一個認證中心使用。這種認證可以在使用過程中,使用 Kafka 提供的命令動態(tài)地創(chuàng)建和刪除用戶,無需重啟整個集群,十分方便。SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新認證機制,主要是為了實現(xiàn)與 OAuth2 框架的集成,Kafka 不提倡單純使用 OAUTHBEARER,因為它生成的不安全 Json Web Token,必須配以 SSL 加密才能在生產(chǎn)環(huán)境中使用。

一、環(huán)境搭建

1.1 環(huán)境準備

jdk1.8apache-zookeeper-3.5.9-binkafka_2.12-2.7.1

安裝順序:jdk–>zookeeper–>kafka

小知識:kafka版本命名約定 kafka: 這部分指的是 Apache Kafka,一個開源的分布式事件流平臺。Kafka 提供了一種可靠的、可擴展的發(fā)布-訂閱消息系統(tǒng),可以處理大規(guī)模的實時數(shù)據(jù)流。 2.12: 這表示 Scala 的版本。在 Kafka 的情況下,2.12意味著它是使用 Scala 2.12 編譯的。Scala 是一種運行在 Java 虛擬機上的多范式編程語言,被用于 Kafka 的實現(xiàn)。 2.7.1: 這是 Kafka 的版本號。在這個例子中,版本號是 2.7.1。版本號通常表示軟件的發(fā)布版本,新版本通常包含新功能、改進和修復之前版本的 bug。

安裝文件解壓之后,需要將kafka的libs目錄下的如下jar賦值到zookeeper的lib目錄下:

cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib

cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib

cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib

cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib

cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib

每個kafka對應的zookeeper版本不一樣,建議先下載想要的kafka版本,解壓之后,查看libs下依賴的zookeeper版本,然后去官網(wǎng)下載對應的版本進行安裝?。。?/p>

1.2 JDK的安裝與配置

Kafka從2.0.0版本開始就不再支持JDK7及以下版本

到官網(wǎng)下載jdk安裝包,并上傳至Linux的/opt目錄下解壓安裝包配置jdk環(huán)境變量,修改/etc/profile文件并向其中添加如下配置

export JAVA_HOME=/opt/jdk解壓后的文件名

export JRE_HOME=$JAVA_HOME/jre

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib

生效配置文件,source/etc/profile命令使配置生效

source /etc/profile

通過java-version命令驗證JDK 是否已經(jīng)安裝配置成功

java -version

1.3 Zookeeper的安裝與配置

ZooKeeper是安裝Kafka集群的必要組件,Kafka通過ZooKeeper來實施對元數(shù)據(jù)信息的管理,包括集群、broker、主題、分區(qū)等內(nèi)容。

ZooKeeper是一個開源的分布式協(xié)調(diào)服務,是Google Chubby的一個開源實現(xiàn)。分布式應用程序可以基于ZooKeeper實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負載均衡、命名服務、分布式協(xié)調(diào)/通知、集群管理、Master選舉、配置維護等功能。在ZooKeeper中共有3個角色:leader、follower和observer,同一時刻 ZooKeeper集群中只會有一個leader,其他的都是follower和observer。observer不參與投票,默認情況下 ZooKeeper 中只有 leader 和 follower 兩個角色。更多相關知識可以查閱ZooKeeper官方網(wǎng)站來獲得。

1.3.1 單機模式安裝

到官網(wǎng)下載zookeeper安裝包,并上傳至Linux的/opt目錄下解壓壓縮包添加配置,向/etc/profile配置文件中添加如下內(nèi)容

export ZOOKEEPER_HOME=/opt/zookeeper解壓后的文件名

export PATH=$PATH:$ZOOKEEPER_HOME/bin

執(zhí)行source/etc/profile命令使配置生效

source /etc/profile

修改 ZooKeeper 的配置文件。首先進入$ZOOKEEPER_HOME/conf 目錄,并將zoo_sample.cfg文件修改為zoo.cfg修改zoo.cfg配置文件,zoo.cfg文件的內(nèi)容參考如下

# ZooKeeper服務器心跳時間,單位為ms

tickTime=2000

# 投票選舉新leader的初始化時間

initLimit=10

# leader與follower心跳檢測最大客忍時間,響應超過syncLimit*tickTime,leader認為

# fo11ower“死掉”,從服務器列表中別除fol1ower

syncLimit=5

# 數(shù)據(jù)目錄

dataDir=/tmp/zookeeper/data

# 日志目錄

dataLogDir=/tmp/zookeeper/log

# ZooKeeper對外服務端口

clientPort=2181

默認情況下,Linux系統(tǒng)中沒有/tmp/zookeeper/data和/tmp/zookeeper/log這兩個目錄,所以接下來還要創(chuàng)建這兩個目錄

mkdir -p /tmp/zookeeper/data

mkdir -p /tmp/zookeeper/log

在${dataDir}目錄(也就是/tmp/zookeeper/data)下創(chuàng)建一個myid文件,并寫入一個數(shù)值,比如0。myid文件里存放的是服務器的編號通過zkServer.sh start啟動Zookeeper服務

zkServer.sh start

通過zkServer.sh status查看啟動狀態(tài)

zkServer.sh status

1.3.2 集群模式安裝

以上是關于ZooKeeper單機模式的安裝與配置,一般在生產(chǎn)環(huán)境中使用的都是集群模式,集群模式的配置也比較簡單,相比單機模式而言只需要修改一些配置即可。下面以3臺機器為例來配置一個ZooKeeper集群。首先在這3臺機器的**/etc/hosts文件中添加3臺集群的IP地址與機器域名的映射,示例如下(3個IP地址分別對應3臺機器) 然后在這3臺機器的zoo.cfg**文件中添加以下配置:

為了便于講解上面的配置,這里抽象出一個公式,即 server.A=B:C:D。其中:

A是一個數(shù)字,代表服務器的編號,就是前面所說的myid文件里面的值。集群中每臺服務器的編號都必須唯一,所以要保證每臺服務器中的myid文件中的值不同。B代表服務器的IP地址。C表示服務器與集群中的 leader 服務器交換信息的端口。D表示選舉時服務器相互通信的端口。

1.3.3 Zookeeper安全認證配置

zookeeper和kafka在默認情況下,是沒有開啟安全認證的,那么任意客戶端可以在不需要任何身份認證的情況下訪問zookeeper和kafka下的各節(jié)點,甚至可以進行節(jié)點的增加,修改以及刪除的動作。注意,前面的動作是基于客戶端能訪問服務端所在的網(wǎng)絡,如果進行了物理隔絕或者做了防火墻限制,那前述內(nèi)容就不一定成立。但是,在某些對安全加固要求比較嚴格的客戶或者生產(chǎn)環(huán)境中,那就必須開啟安全認證才行。除了最基本的身份認證以外,還有針對每個節(jié)點的權限訪問,但本文不涉及該話題。 進入正題,先從zookeeper開始配置,zookeeper官網(wǎng)提供了認證配置的參考,點擊下方官網(wǎng)地址,即可查看詳情。配置分兩種情況:

客戶端和服務端的雙向認證(3.4.0開始引入)服務端與服務端的雙向認證(2.4.10開始引入)

如果是非集群模式下,僅配置客戶端和服務端的雙向認證即可。集群模式下,則需要客戶端和服務端的認證以及zookeeper服務器之間的雙向認證。 Zookeeper 使用的是Java自帶的認證和授權服務(簡稱:JAAS),詳細內(nèi)容請看官網(wǎng),該鏈接是 Java 8 的 JAAS 的介紹。這里為zookeeper和kafka分別在對應配置文件下創(chuàng)建jass配置文件為(文件名可以隨意):

zookeeper:${ZOOKEEPER_HOME}/conf/zoo_jaas.confkafka:${KAFKA_HOME}/config/kafka-server-jaas.conf

注意:本節(jié)中的客戶端指的kafka,服務端指的是zookeeper

1.3.3.1 客戶端和服務端的雙向認證

配置zookeeper服務端

在zoo_jaas.conf添加如下配置

Server {

org.apache.zookeeper.server.auth.DigestLoginModule required

username="zookeeper"

password="zookeepersecret”

user_kafka="kafkasecret";

};

修改zoo.cfg配置

# 強制進行SASL認證

sessionRequireClientSASLAuth=true

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

增加jvm參數(shù),在${ZOOKEEPER_HOME}/bin/zkEnv.sh腳本中增加:

export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"

配置客戶端

在kafka-server-jaas.conf中添加如下配置:

Client{

org.apache.zookeeper.server.auth.DigestLoginModule required

username="kafka"

password="kafkasecret";

};

修改客戶端的啟動腳本${KAFKA_HOME}/bin/kafka-server-start.sh,增加jvm參數(shù):

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"

1.3.3.2 服務端與服務端的雙向認證

修改zoo.cfg,增加如下配置:

quorum.auth.enableSasl=true # 打開sasl開關, 默認是關的

quorum.auth.learnerRequireSasl=true # ZK做為leaner的時候, 會發(fā)送認證信息

quorum.auth.serverRequireSasl=true # 設置為true的時候,learner連接的時候需要發(fā)送認證信息,否則拒絕

quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字

quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字

quorum.cnxn.threads.size=20 # 建議設置成ZK節(jié)點的數(shù)量乘2

修改zoo_jaas.conf,增加如下配置:

QuorumServer {

org.apache.zookeeper.server.auth.DigestLoginModule required

user_test="test";

};

QuorumLearner {

org.apache.zookeeper.server.auth.DigestLoginModule required

username="test"

password="test";

};

QuorumServer 和 QuorumLearner 都是配置的ZK節(jié)點之間的認證配置

1.4 Kafka的安裝與配置

1.4.1 單機模式安裝

到官網(wǎng)下載kafka安裝包,并上傳至Linux的/opt目錄下解壓壓縮包修改broker的配置文件**$KAFKA_HOME/conf/server.properties**

# broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同

broker.id=0

# broker對外提供的服務入口地址

listeners=PLAINTEXT://localhost:9092

# 存放消息日志文件的地址

log.dirs=/tmp/kafka-logs

# Kafka所需的ZooKeeper集群地址,為了方便演示,我們假設Kafka和ZooKeeper都安裝在本機

zookeeper.connect=localhost:2181/kafka

1.4.2 集群模式安裝

如果是單機模式,那么修改完上述配置參數(shù)之后就可以啟動服務。如果是集群模式,那么只需要對單機模式的配置文件做相應的修改即可:確保集群中每個broker的broker.id配置參數(shù)的值不一樣,以及l(fā)isteners配置參數(shù)也需要修改為與broker對應的IP地址或域名,之后就可以各自啟動服務。注意,在啟動 Kafka 服務之前同樣需要確保 zookeeper.connect參數(shù)所配置的ZooKeeper服務已經(jīng)正確啟動。

1.4.3 Kafka安全認證配置

通過kafka-configs.sh腳本生成一個用戶admin作為超級管理員

bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin

注意:在配置kafka的server.properties對于zookeeper的連接我們采用的是zookeeper的CHROOT,所以上述命令中也需要指定對應路徑,不然啟動的kafka時會獲取不到生成的SCRAM認證信息?。?!

小知識: ZooKeeper 中的 CHROOT 是指將 ZooKeeper 的命名空間限定在一個特定的路徑下。這就是說,ZooKeeper 的所有數(shù)據(jù)和操作都將在指定的路徑下進行,而不是整個 ZooKeeper 服務器上。CHROOT 功能允許在一個 ZooKeeper 集群上運行多個獨立的 ZooKeeper 實例,每個實例都有自己的命名空間。 在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通過配置項 chroot 來設置。例如: chroot=/myapp 在這個例子中,ZooKeeper 就會將其根路徑設置為 /myapp,而不是默認的根路徑。這樣,對于 ZooKeeper 中的所有路徑,都將以 /myapp 為根進行解釋。這就好比把 ZooKeeper 變成了一個容器,其內(nèi)部的所有路徑都相對于 /myapp 這個容器。 CHROOT 的使用場景包括:

隔離命名空間: 允許多個應用在同一個 ZooKeeper 集群上使用不同的命名空間,防止彼此之間的命名沖突。模擬多個獨立環(huán)境: 允許在同一個 ZooKeeper 集群上模擬多個獨立的環(huán)境,每個環(huán)境有自己的數(shù)據(jù)和配置。

要注意的是,如果你在使用 CHROOT,ZooKeeper 客戶端在連接到 ZooKeeper 服務器時,也需要指定相應的 CHROOT 路徑。例如,如果 CHROOT 設置為 /myapp,那么客戶端在連接時需要指定 “/myapp” 作為根路徑。

總的來說,CHROOT 提供了一種簡單而有效的方式,使得在同一個 ZooKeeper 集群上可以支持多個隔離的命名空間。

生成之后可以通過如下命令進行查看:

bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin

也可以添加其他SCRAM認證信息,例如SCRAM-SHA-512:

bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin

也可使用如下命令刪除已經(jīng)添加的認證信息:

bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin

修改kafka-server-jaas.conf,增加kafka服務的SCRAM認證用戶信息

KafkaServer {

org.apache.kafka.common.security.scram.ScramLoginModule required

username="admin"

password="admin"

user_admin="admin";

};

修改server.properties,新增如下配置:

# 啟用ACL

allow.everyone.if.no.acl.found=false

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# 設置本例中admin為超級用戶;在Zookeeper的“/kafka/config/users”下存在用戶

super.users=User:admin

# 同時啟用SCRAM和PLAIN機制

sasl.enabled.mechanisms=SCRAM-SHA-256

# 為broker間通訊開啟SCRAM機制,采用SCRAM-SHA-256算法

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# broker間通訊使用PLAINTEXT,本例中不演示SSL配置

security.inter.broker.protocol=SASL_PLAINTEXT

# 配置listeners使用SASL_PLAINTEXT

listeners=SASL_PLAINTEXT://192.168.64.102:9092

# 配置advertised.listeners

advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092

如果是集群,上述配置每個節(jié)點都應該配置一份?。?!

1.4.4 服務啟動

啟動Kafka服務,在$KAFKA_HOME目錄下執(zhí)行下面的命令即可

bin/kafka-server-start.sh config/server.properties

如果要在后臺運行Kafka服務,那么可以在啟動命令中加入-daemon參數(shù)或&字符,示例如下:

bin/kafka-server-start.sh -daemon config/server.properties

或者

bin/kafka-server-start.sh config/server.properties &

通過jps命令查看Kafka服務進程是否已經(jīng)啟動

二、Admin API使用

2.1 SCRAM用戶操作

package com.kafka.adminclient;

import org.apache.kafka.clients.admin.*;

import org.apache.kafka.common.KafkaFuture;

import java.util.Collections;

import java.util.Map;

/**

* @Author: Jiangxx

* @Date: 2023/11/24

* @Description:

*/

public class KafkaUserOperator {

private final AdminClient adminClient;

public KafkaUserOperator(AdminClient adminClient) {

this.adminClient = adminClient;

}

public boolean createScramUser(String username, String password) {

boolean res = false;

//指定一個協(xié)議ScramMechanism,迭代次數(shù)iterations還沒搞清楚干嘛的,設置太小會報錯

ScramCredentialInfo scramCredentialInfo = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000);

//創(chuàng)建Scram用戶憑證,用戶不存在,會先創(chuàng)建用戶

UserScramCredentialAlteration userScramCredentialUpsertion = new UserScramCredentialUpsertion(username, scramCredentialInfo, password);

AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion));

for (Map.Entry> e : alterUserScramCredentialsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

} catch (Exception exc) {

System.err.println("返回信息:" + exc.getMessage());

}

res = !future.isCompletedExceptionally();

}

return res;

}

public boolean deleteScramUser(String username) {

boolean res = false;

//刪除Scram用戶憑證,刪除后用戶無權限操作kafka,zk中用戶節(jié)點還會存在

UserScramCredentialAlteration userScramCredentialDeletion = new UserScramCredentialDeletion(username, ScramMechanism.SCRAM_SHA_256);

AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion));

for (Map.Entry> e : alterUserScramCredentialsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

} catch (Exception exc) {

System.err.println("返回信息:" + exc.getMessage());

}

res = !future.isCompletedExceptionally();

}

return res;

}

}

2.2 主題操作

package com.kafka.adminclient;

import org.apache.kafka.clients.admin.*;

import org.apache.kafka.common.KafkaFuture;

import org.apache.kafka.common.config.ConfigResource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

/**

* @Author: Jiangxx

* @Date: 2023/11/24

* @Description:

*/

public class KafkaTopicOperator {

private final Logger logger = LoggerFactory.getLogger(KafkaTopicOperator.class);

private final AdminClient adminClient;

public KafkaTopicOperator(AdminClient adminClient) {

this.adminClient = adminClient;

}

/**

* 創(chuàng)建系統(tǒng)對應的topic

*

* @param topicName 主題名稱

* @param partitions 分區(qū)

* @param replicationFactor 副本

* @param retention 數(shù)據(jù)有效期

* @return boolean

*/

public boolean createTopic(String topicName, Integer partitions, Integer replicationFactor, Integer retention) {

boolean res = false;

Set topics = getTopicList();

if (!topics.contains(topicName)) {

partitions = partitions == null ? 1 : partitions;

replicationFactor = replicationFactor == null ? 1 : replicationFactor;

NewTopic topic = new NewTopic(topicName, partitions, replicationFactor.shortValue());

long param = retention * 24 * 60 * 60 * 1000;

Map configs = new HashMap<>();

configs.put("retention.ms", String.valueOf(param));

topic.configs(configs);

CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic));

for (Map.Entry> e : createTopicsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

} catch (Exception exc) {

logger.warn("創(chuàng)建topic參數(shù)異常,返回信息:{}", exc.getMessage());

}

res = !future.isCompletedExceptionally();

}

} else {

res = true;

logger.warn("該主題已存在,主題名稱:{}", topicName);

}

return res;

}

/**

* 修改topic數(shù)據(jù)有效期

*

* @param topicName 主題名稱

* @param retention 天數(shù)

* @return boolean

*/

public boolean updateTopic(String topicName, Integer retention) {

if (retention < 0) {

return false;

}

boolean res = false;

Map alertConfigs = new HashMap<>();

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

//轉換為毫秒

long param = retention * 24 * 60 * 60 * 1000;

ConfigEntry configEntry = new ConfigEntry("retention.ms", String.valueOf(param));

Config config = new Config(Collections.singletonList(configEntry));

alertConfigs.put(configResource, config);

AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs);

for (Map.Entry> e : alterConfigsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

} catch (Exception exc) {

logger.warn("修改topic參數(shù)異常,返回信息:{}", exc.getMessage());

}

res = !future.isCompletedExceptionally();

}

return res;

}

/**

* 刪除topic

*

* @param topicName 主題

* @return boolean

*/

public boolean deleteTopic(String topicName) {

boolean res = false;

Set topics = getTopicList();

if (topics.contains(topicName)) {

DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));

for (Map.Entry> e : deleteTopicsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

} catch (Exception exc) {

logger.warn("刪除topic參數(shù)異常,返回信息:{}", exc.getMessage());

}

res = !future.isCompletedExceptionally();

}

} else {

logger.info("topic不存在,名稱:{}", topicName);

res = true;

}

return res;

}

/**

* 獲取主題列表

*

* @return Set

*/

public Set getTopicList() {

Set result = null;

ListTopicsResult listTopicsResult = adminClient.listTopics();

try {

result = listTopicsResult.names().get();

} catch (Exception e) {

logger.warn("獲取主題列表失敗,失敗原因:{}", e.getMessage());

e.printStackTrace();

}

return result;

}

}

2.3 ACL操作

package com.kafka.adminclient;

import org.apache.kafka.clients.admin.AdminClient;

import org.apache.kafka.clients.admin.CreateAclsResult;

import org.apache.kafka.clients.admin.DeleteAclsResult;

import org.apache.kafka.common.KafkaFuture;

import org.apache.kafka.common.acl.*;

import org.apache.kafka.common.resource.PatternType;

import org.apache.kafka.common.resource.ResourcePattern;

import org.apache.kafka.common.resource.ResourceType;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.Collections;

import java.util.Map;

/**

* @Author: Jiangxx

* @Date: 2023/11/24

* @Description:

*/

public class AclOperator {

private final Logger logger = LoggerFactory.getLogger(AclOperator.class);

private final AdminClient adminClient;

public AclOperator(AdminClient adminClient) {

this.adminClient = adminClient;

}

/**

* 添加權限

*

* @param resourceType 資源類型

* @param resourceName 資源名稱

* @param username 用戶名

* @param operation 權限名稱

*/

public void addAclAuth(String resourceType, String resourceName, String username, String operation) {

ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);

AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);

AclBinding aclBinding = new AclBinding(resource, accessControlEntry);

CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding));

for (Map.Entry> e : createAclsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

boolean success = !future.isCompletedExceptionally();

if (success) {

logger.info("創(chuàng)建權限成功");

}

} catch (Exception exc) {

logger.warn("創(chuàng)建權限失敗,錯誤信息:{}", exc.getMessage());

exc.printStackTrace();

}

}

}

/**

* 刪除權限

*

* @param resourceType 資源類型

* @param resourceName 資源名稱

* @param username 用戶名

* @param operation 權限名稱

*/

public void deleteACLAuth(String resourceType, String resourceName, String username, String operation) {

ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);

AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);

AclBinding aclBinding = new AclBinding(resource, accessControlEntry);

DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));

for (Map.Entry> e : deleteAclsResult.values().entrySet()) {

KafkaFuture future = e.getValue();

try {

future.get();

boolean success = !future.isCompletedExceptionally();

if (success) {

logger.info("刪除權限成功");

}

} catch (Exception exc) {

logger.warn("刪除權限失敗,錯誤信息:{}", exc.getMessage());

exc.printStackTrace();

}

}

}

private AclOperation getOperation(String operation) {

AclOperation aclOperation = null;

switch (operation) {

case "CREATE":

aclOperation = AclOperation.CREATE;

break;

case "WRITE":

aclOperation = AclOperation.WRITE;

break;

case "READ":

aclOperation = AclOperation.READ;

break;

default:

break;

}

return aclOperation;

}

private ResourceType getResourceType(String type) {

ResourceType resourceType = null;

switch (type) {

case "Group":

resourceType = ResourceType.GROUP;

break;

case "Topic":

resourceType = ResourceType.TOPIC;

break;

default:

break;

}

return resourceType;

}

}

三、參考鏈接

kafka、zookeeper配置sasl認證-CSDN博客Zookeeper & Kafka 開啟安全認證的配置_kafka認證配置_IT布道的博客-CSDN博客Kafka安全(以SASL+ACL為例)_kafka 安全-CSDN博客Kafka安全認證授權配置_kafka認證配置-CSDN博客Java版 Kafka ACL使用實戰(zhàn)_java kafka acl_芒果無憂的博客-CSDN博客kafka官網(wǎng)

柚子快報邀請碼778899分享:kafka安全機制(SASL

http://yzkb.51969.com/

參考閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19122356.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄