当前位置:网站首页>日志收集系統
日志收集系統
2022-06-27 04:05:00 【ALEX_CYL】
1. 項目背景
- 每個系統都有日志,當系統出現問題時,需要通過日志解决問題
- 當系統機器較少時,登錄到服務器上查看日志即可滿足需求
- 當系統機器規模龐大時,登錄到機器上查看日志幾乎不現實
2. 解决方案
a. 把機器上的日志實時收集,統一的存儲到中心系統
b. 然後再對這些日志建立索引,通過搜索即可以找到對應日志
c. 通過提供界面友好的web界面,通過web即可以完成日志搜索
3. 面臨的問題/挑戰
a. 實時日志量非常大,每天幾十億條
b. 日志准實時收集,延遲控制在分鐘級別
c. 能够水平可擴展
4. 業界方案ELK
4.1 ELK簡介
通俗來講,ELK是由Elasticsearch(彈性搜索引擎)、Logstash(日志收集)、Kibana(查看日志/可視化的web界面)三個開源軟件的組成的一個組合體,ELK是elastic公司研發的一套完整的日志收集、分析和展示的企業級解决方案,在這三個軟件當中,每個軟件用於完成不同的功能,ELK又稱為ELKstack。
對照架構圖,我們來看下這三大神獸的工作過程

1. 用戶發送請求到服務端
2. 服務端將需要記載的日志的數據通過網絡請求傳送到logstash
3. logstash對數據進行過濾清洗後,再傳給Elasticsearch
4. Elasticsearch 負責對數據創建索引,進行存儲
5. 用戶通過訪問kibana的web頁面,能够實時(延遲低於一秒)查看日志
4.2 elk方案問題
a. 運維成本高,每增加一個日志收集,都需要手動修改配置
b. 監控缺失,無法准確獲取logstash的狀態
5. 帶有kafka的日志系統設計

各組件介紹:
a. Log Agent,日志收集客戶端,用來收集服務器上的日志,每個服務器上都有一個log Agent
b. Kafka,高吞吐量的分布式隊列,linkin開發,apache頂級開源項目
c. ES,elasticsearch,開源的搜索引擎,提供基於http restful的web接口
d. Hadoop,分布式計算框架,能够對大量數據進行分布式處理的平臺
5.1 kafka應用場景:
1.异步處理, 把非關鍵流程异步化,提高系統的響應時間和健壯性

2.應用解耦,通過消息隊列
3.流量削峰
如雙十一秒殺活動,訪問量突然劇增,通過消息隊列可以有效實現流量削峰作用,即限制一次性傳入後端處理的數據量
5.2 zookeeper(分布式存儲系統)應用場景
在日志收集系統中,一般kafka會連接一個zookeeper
1. 服務注册&服務發現
當服務提供者發生擴容或縮容時,服務提供者會將服務注册到注册中心;
注册中心將服務變更通知給服務消費者,服務消費者根據注册變動信息實現自動任務調度優化(將數據分配給新增的服務提供者或將停止服務的提供者提出調用請求)
2. 配置中心(自動化配置)
1.在wep平臺修改了業務,將變動信息傳輸到zk
2.zk將服務業務變動信息發送給相應的業務應用,
3.相應的業務應用將變動信息拉到本地修改業務配置
實現自動化配置
3.分布式鎖
zookeeper是强一致的
多個客戶端同時在Zookeeper上創建相同的znode,只有一個創建成功
5.3 zookeeper與kafka安裝:
由於zookeeper和kafka基於java,先安裝JDK
sudo apt-get update
sudo apt-get install openjdk-8-jdk
zookeeper安裝:
ubuntu 安裝zookeeper
安裝zookeepr
sudo apt-get install zookeeperd
配置zookeeper
cat /etc/zookeeper/conf/zoo.cfg | more //查看zoo.cfg的配置信息
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
clientPort=2181
# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election
server.1=zookeeper1:2888:3888
啟動zookeeper
啟動server
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
查看啟動狀態
$ sudo /usr/share/zookeeper/bin/zkServer.sh status
查看啟動信息
ps -aux | grep zookeeper
鏈接server
sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
查看日志信息
日志信息是可以配置的,通過zoo.cfg,默認在:
/var/log/zookeeper/zookeeper.log
可以查看日志信息查看一些錯誤和細節
kafka安裝
安裝kafka:
ubuntu下可以用wget直接下載,我是下載到了/home/cyl/kafka目錄
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz
解壓:
tar -zxvf kafka_2.12-3.1.0.tgz
重命名
mv kafka_2.12-3.1.0 ./kafka
創建日志存儲目錄
[email protected]:~/kafka$ mkdir logs-1
修改kafka-server的配置文件
[email protected]:~/kafka/kafka$ sudo vim config/server.properties
修改配置文件中21、31、36和60行
broker.id=1
listeners=PLAINTEXT://10.141.184:9092 #為了能順利啟動broker
advertised.listeners=PLAINTEXT://10.141.184:9092
log.dirs=/home/wzj/kafka/logs-1
啟動Zookeeper
得先修改config/zookeeper.properties配置
[email protected]:~/kafka/kafka$ sudo ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
建議使用:
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
鏈接server
$ sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
啟動Kafka服務
啟動流程參考
使用 kafka-server-start.sh 啟動 kafka 服務
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-server-start.sh ./config/server.properties
創建topic
使用 kafka-topics.sh 創建單分區單副本的 topic test
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --create --bootstrap-server 10.141.65.188:9092 --replication-factor 1 --partitions 1 --topic nginxLog
此處不能使用localhost:9092,
遇到的問題
問題一:版本指令變更
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
報錯“Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”
在較新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 連接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092來替代- -zookeeper localhost:2181。
問題二:
WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
必須與在配置文件裏面的listeners保存一致,例如
listeners=PLAINTEXT://192.168.156.131:9092
# 在命令使用時也必須使用192.168.156.131:9092作為連接的地址,如下
./kafka-console-producer.sh --broker-list 192.168.156.131:9092 --topic userlog
查看 topic 列錶
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --list --bootstrap-server 10.141.65.188:9092
產生消息,創建消息生產者
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-producer.sh --broker-list 10.141.65.188:9092 --topic nginxLog
消費消息,創建消息消費者
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-consumer.sh --bootstrap-server 10.141.65.188:9092 --topic nginxLog --from-beginning
在生產消息的窗口,輸入內容,在消費窗口就可以打印出來
查看Topic消息
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --describe --bootstrap-server 10.141.65.188:9092 --topic nginxLog
Topic: nginxLog TopicId: t6M81RsMRPGj2tZVXaxltw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: nginxLog Partition: 0 Leader: 1 Replicas: 1 Isr: 1
第一行給出了所有分區的摘要,每個附加行給出了關於一個分區的信息。 由於我們只有一個分區,所以只有一行。
“Leader”: 是負責給定分區的所有讀取和寫入的節點。 每個節點將成為分區隨機選擇部分的領導者。
“Replicas”: 是複制此分區日志的節點列錶,無論它們是否是領導者,或者即使他們當前處於活動狀態。
“Isr”: 是一組“同步”副本。這是replications列錶的子集,當前活著並被引導到領導者。
删除topic
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --delete --bootstrap-server 10.141.65.188:9092 --topic nginxLogtest
啟動命令:
bin/kafka-server-start.sh -daemon config/server.properties
創建topic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2
查看topic
./kafka-topics.sh --bootstrap-server spark01:9092 --list
向指定topic中生產數據
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
例如:{
"id":"1","name":"xiaoming","age":"20"}
查看topic具體內容
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
創建消費者組
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest
查看消費者組
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list
查看消費者詳情
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe --group kafkatest
消費數據
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
6 代碼實現
logagent 實現代碼github
1.kafka demo:
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
//配置kafka環境
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{
"10.141.65.188:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
for i := 0; i < 10; i++ {
msg := &sarama.ProducerMessage{
}
msg.Topic = "nginxLogTest"
msg.Value = sarama.StringEncoder("this is a good test, my message is good~~12")
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
}
- tailf demo
package tailf
import (
"fmt"
"time"
"github.com/hpcloud/tail"
)
func main() {
//main()
filename := "./my.log"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{
Offset: 0, Whence: 2}, //定比特讀取比特置
MustExist: false, //要求文件必須存在或者暴露
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}
- config demo
package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
func main() {
conf, err := config.NewConfig("ini", "./logcollect.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
port, err := conf.Int("server::port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
}
fmt.Println("Port:", port)
log_level := conf.String("logs::log_level")
fmt.Println("log_level:", log_level)
log_port, err := conf.Int("logs::port")
if err != nil {
fmt.Println("read logs:port failed, err:", err)
return
}
fmt.Println("log_Port:", log_port)
log_path := conf.String("logs::log_path")
fmt.Println("log_path:", log_path)
}
- logs demo
package main
import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)
func main() {
config := make(map[string]interface{
})
config["filename"] = "./logcollect.log"
config["level"] = logs.LevelTrace
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
}
logs.SetLogger(logs.AdapterFile, string(configStr))
logs.Debug("this is a test, my name is %s", "stu01~~")
logs.Trace("this is a trace, my name is %s", "stu02~~")
logs.Warn("this is a warn, my name is %s", "stu03~~")
}
边栏推荐
- Argo workflows - getting started with kubernetes' workflow engine
- There are two problems when Nacos calls microservices: 1 Load balancer does not contain an instance for the service 2. Connection refused
- MATLAB | 基于分块图布局的三纵坐标图绘制
- IOS development: understanding of dynamic library shared cache (dyld)
- Il manque beaucoup de fichiers et de répertoires tels que scripts pendant et après l'installation d'anaconda3
- 面对AI人才培养的“产学研”鸿沟,昇腾AI如何做厚产业人才黑土地?
- Resnet152 pepper pest image recognition 1.0
- iOS开发:对于动态库共享缓存(dyld)的了解
- [array]bm94 rainwater connection problem - difficult
- Office VR porn, coquettish operation! The father of Microsoft hololens resigns!
猜你喜欢

Uni app's uparse rich text parsing perfectly parses rich text!

Kotlin Compose 隐式传参 CompositionLocalProvider

Quickly master asp Net authentication framework identity - reset password by mail

办公室VR黄片,骚操作!微软HoloLens之父辞职!

How can e-commerce products be promoted and advertised on Zhihu?

MySql的开发环境

2021:Beyond Question-Based Biases:Assessing Multimodal Shortcut Learning in Visual Question Answeri

resnet152 辣椒病虫害图像识别1.0

fplan-电源规划

PAT甲级 1025 PAT Ranking
随机推荐
Semantic version 2.0.0
Static timing analysis OCV and time derive
Ledrui ldr6035 usb-c interface device supports rechargeable OTG data transmission scheme.
ERP需求和销售管理 金蝶
再探Handler(下)(Handler核心原理最全解析)
Why does C throw exceptions when accessing null fields?
mysql数据库基础:DQL数据查询语言
NestJS环境变量配置,解决如何在拦截器(interceptor)注入服务(service)的问题
PAT甲级 1019 General Palindromic Number
苹果手机证书构体知识
2016Analyzing the Behavior of Visual Question Answering Models
人间清醒:底层逻辑和顶层认知
Method of decoding iPhone certificate file
Ldr6028 OTG data transmission scheme for mobile devices while charging
Servlet and JSP final review examination site sorting 42 questions and 42 answers
Kotlin compose implicitly passes the parameter compositionlocalprovider
iOS开发:对于动态库共享缓存(dyld)的了解
2021:Beyond Question-Based Biases:Assessing Multimodal Shortcut Learning in Visual Question Answeri
Anaconda3 is missing a large number of files during and after installation, and there are no scripts and other directories
Argo Workflows —— Kubernetes的工作流引擎入门