Doris中沒有kafka引擎表,如何實現(xiàn)同步kafka的數(shù)據(jù)到Doris? kafka direct
Digimart數(shù)碼優(yōu)選跨境問答2025-04-252280
要在Doris中實現(xiàn)同步Kafka數(shù)據(jù),可以使用以下步驟:
- 確保已經(jīng)安裝了Kafka和Doris的Java驅(qū)動。在Doris的配置文件(例如
doris_conf.properties
)中添加以下內(nèi)容:
# Kafka driver class path
KAFKA_DRIVER_CLASS_PATH=/path/to/kafka-java-driver-class.jar
- 在Doris中創(chuàng)建一個表,用于存儲Kafka數(shù)據(jù)。例如,創(chuàng)建一個名為
kafka_data
的表:
CREATE TABLE kafka_data (
id INT NOT NULL,
topic STRING,
message STRING,
time TIMESTAMP,
PRIMARY KEY (id)
);
- 編寫一個Python腳本,用于從Kafka中讀取數(shù)據(jù)并將其插入到Doris表中。以下是一個示例腳本:
from kafka import KafkaConsumer
import pymysql
# Kafka消費者配置
consumer_configs = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'test_group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
}
# Doris連接配置
doris_conn_configs = {
'host': 'localhost',
'port': 3306,
'username': 'root',
'password': '',
}
# Kafka消費者實例
consumer = KafkaConsumer(consumer_configs)
# 循環(huán)處理Kafka消息
while True:
try:
# 從Kafka獲取消息
message = consumer.next()
if message is not None:
# 將消息插入到Doris表
with pymysql.connect(**doris_conn_configs) as conn:
cursor = conn.cursor()
insert_sql = "INSERT INTO kafka_data (topic, message, time) VALUES (%s, %s, %s)"
cursor.execute(insert_sql, (message['topic'], message['value'], message['timestamp']))
conn.commit()
except Exception as e:
print(f"Error processing message: {e}")
continue
- 運行Python腳本,從Kafka中讀取數(shù)據(jù)并將其插入到Doris表中。請確保Kafka集群正常運行,并且Doris服務(wù)正在運行。
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。