柚子快報激活碼778899分享:ELK+kafka日志采集
ElasticSeach(存儲日志信息) Logstash(搬運工) Kibana 連接ElasticSeach圖形化界面查詢?nèi)罩?/p>
ELK采集日志的原理:
在每個服務器上安裝LogstashLogstash需要配置固定讀取某個日志文件Logstash將日志文件格式化為json的格式輸出到es中開發(fā)者使用Kibana連接到ElasticSeach 查詢存儲日志內(nèi)容
為什么將日志存儲在ElasticSeach 其底層使用到倒排索引 搜索效率高
為什么需要使用elk+kafka 如果單純的使用elk的話,服務器節(jié)點擴容時需要在每個服務器上安裝 Logstash 步驟十分冗余。 Logstash讀取本地日志文件,可能會對本地的磁盤io性能會有一定影響。
elk+kafka采集日志的原理:
springboot項目基于aop的方式攔截系統(tǒng)中日志將該日志投遞到 kafka 中,該過程一定要采用異步的形式Logstash 訂閱 kafka 的主題獲取日志消息內(nèi)容在將日志消息內(nèi)容輸出到es中存放開發(fā)者使用Kibana連接到ElasticSeach 查詢存儲日志內(nèi)容
logstash
Logstash是一個開源數(shù)據(jù)收集引擎,具有實時管道功能。 Logstash可以動態(tài)地將來自不同數(shù)據(jù)源的數(shù)據(jù)統(tǒng)一起來,并將數(shù)據(jù)標準化到你所選擇的目的地
進入 logstash 目錄,執(zhí)行命令安裝輸入輸出插件
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch
添加配置文件:logstash/config/kafka.conf
# 輸入
input {
kafka {
bootstrap_servers => "192.168.10.110:9091"
topics => "主題名稱"
}
}
# 過濾排除一些不需要寫入的日志
filter {
#Only matched data are send to output.
}
# 輸出
output {
elasticsearch {
action => "index" #The operation on ES
hosts => "192.168.10.110:9200" #ElasticSearch host, can be array.
index => "索引名稱" #The index to write data to.
}
}
啟動logstash:./logstash -f …/config/kafka.conf
Aop攔截日志
@Aspect
@Component
public class AopLogAspect {
@Value("${server.port}")
private String serverPort;
@Autowired
private KafkaTemplate
@Pointcut("execution(* com.example.service.*.*(..))")
private void serviceAspect() {}
@Autowired
private LogContainer logContainer;
// 異常通知
@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
public void serviceAspect(JoinPoint point, Exception e) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 設置日期格式
jsonObject.put("request_time", df.format(new Date()));
jsonObject.put("request_url", request.getRequestURL().toString());
jsonObject.put("request_method", request.getMethod());
jsonObject.put("signature", point.getSignature());
jsonObject.put("request_args", Arrays.toString(point.getArgs()));
jsonObject.put("error", e.toString());
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
JSONObject requestJsonObject = new JSONObject();
requestJsonObject.put("request", jsonObject);
// 將日志信息投遞到kafka中
String log = requestJsonObject.toJSONString();
logContainer.put(log);
}
}
使用隊列+線程實現(xiàn)異步
@Component
public class LogContainer {
private static BlockingDeque
@Autowired
private KafkaTemplate
public LogContainer() {
new LogThreadKafka().start();
}
// 存入日志
public void put(String log) {
logDeque.offer(log);
}
// 只需要創(chuàng)建一次線程
class LogThreadKafka extends Thread {
@Override
public void run() {
while (true) {
String log = logDeque.poll();
if (!StringUtils.isEmpty(log)) {
// 將消息投遞kafka中
kafkaTemplate.send("xxx-log", log);
}
}
}
}
}
柚子快報激活碼778899分享:ELK+kafka日志采集
相關(guān)閱讀
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。