当前位置:网站首页>Système de collecte des journaux
Système de collecte des journaux
2022-06-27 04:06:00 【Alex CYL】
1. Contexte du projet
- Chaque système a un journal,Quand il y a un problème avec le système,Le problème doit être résolu par le journal
- Quand il y a moins de machines système,Connectez - vous au serveur pour voir les journaux pour répondre à vos besoins
- Quand les machines du système sont énormes,Il est presque impossible de se connecter à la machine pour voir les journaux
2. Solutions
a. Recueillir les journaux sur la machine en temps réel,Stockage unifié àSystème central
b. Et ensuite sur ces journauxCréer un index,Le journal correspondant peut être trouvé par la recherche
c. En offrant une interface convivialewebInterface,AdoptionwebLa recherche de journaux peut être effectuée
3. Problèmes rencontrés/Défis à relever
a. La quantité de journaux en temps réel est très importante,Des milliards par jour
b. Collecte quasi en temps réel des journaux,Le délai est contrôlé au niveau des minutes
c. Extensible horizontalement
4. Programmes de l'industrieELK
4.1 ELKIntroduction
En termes simples,ELKC'est parElasticsearch(Moteur de recherche flexible)、Logstash(Collecte des journaux)、Kibana(Voir le journal/VisualiséwebInterface)Une combinaison de trois logiciels libres,ELK- Oui.elasticUne collection complète de journaux développée par l'entreprise、Solutions d'entreprise analysées et présentées,Parmi ces trois logiciels,,Chaque logiciel est utilisé pour remplir différentes fonctions,ELKAussi appeléELKstack.
Voir le diagramme d'architecture , Regardons comment ces trois bêtes fonctionnent.

1. L'utilisateur envoie la demande au serveur
2. Le serveur transmet les données du Journal à enregistrer par demande réseau à logstash
3. logstash Après filtrage et nettoyage des données ,À transmettreElasticsearch
4. Elasticsearch Responsable de l'indexation des données ,Stockage
5. Utilisateurs accédantkibanaDewebPage,En temps réel( Délai inférieur à une seconde )Voir le journal
reference:
elkConfiguration
elkDétails
4.2 elk Questions relatives au programme
a. Coûts élevés d'exploitation et d'entretien, Pour chaque nouvelle collection de journaux , Nécessite une modification manuelle de la configuration
b. Surveillance manquante , Impossible d'obtenir avec précision logstashÉtat de
5. Aveckafka Conception du système de journalisation

Introduction à chaque composant:
a. Log Agent,Client de collecte de journaux, Utilisé pour collecter des journaux sur le serveur , Un sur chaque serveur log Agent
b. Kafka,Débit élevéDeFile d'attente distribuée,linkinDéveloppement,apacheProjets open source de haut niveau
c. ES,elasticsearch,Moteur de recherche open source,Base d'approvisionnementÀhttp restfulDewebInterface
d. Hadoop,Cadre informatique distribué, Plate - forme de traitement distribué de grandes quantités de données
5.1 kafkaScénario d'application:
1.Traitement asynchrone, Prends ça.Non critique Processus asynchrone , Améliorer le système Temps de réponseEtRobustesse

2.Découplage des applications,Via la file d'attente des messages
3.Pic de débit
Comme double 11 secondes d'activité meurtrière , Une explosion soudaine du trafic , La mise en file d'attente des messages peut efficacement réduire le trafic de pointe , C'est - à - dire limiter la quantité de données à traiter en même temps 
5.2 zookeeper(Système de stockage distribué)Scénario d'application
Dans le système de collecte des journaux ,En généralkafka Va connecter un zookeeper
1. Inscription au service&Découverte de services
Lorsque le fournisseur de services s'agrandit ou se rétrécit , Le fournisseur de services inscrit le service au Registre ;
Le registre informe les consommateurs de services des changements apportés aux services , Le consommateur de services réalise l'optimisation automatique de l'ordonnancement des tâches basée sur les informations de changement d'enregistrement ( Demande d'appel pour l'attribution de données à un nouveau fournisseur de services ou à un fournisseur qui arrêtera le Service )
2. Centre de configuration(Configuration automatisée)
1.Inwep La plateforme a modifié le Business , Transmission des informations sur les changements à zk
2.zk Envoyer l'information sur le changement d'entreprise de service à l'application d'entreprise correspondante ,
3. L'application d'affaires correspondante tire les informations de changement vers local pour modifier la configuration d'affaires
Configuration automatisée
3.Serrure distribuée
zookeeperC'est très cohérent.
Plusieurs clients en même temps Zookeeper Créer le même znode, Un seul a été créé avec succès
5.3 zookeeperAveckafkaInstallation:
Parce quezookeeperEtkafkaBasé surjava,Installer d'abordJDK
sudo apt-get update
sudo apt-get install openjdk-8-jdk
zookeeperInstallation:
ubuntu Installationzookeeper
Installationzookeepr
sudo apt-get install zookeeperd
Configurationzookeeper
cat /etc/zookeeper/conf/zoo.cfg | more //Voirzoo.cfgInformations de configuration pour
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
clientPort=2181
# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election
server.1=zookeeper1:2888:3888
Démarragezookeeper
Démarrageserver
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Voir l'état de démarrage
$ sudo /usr/share/zookeeper/bin/zkServer.sh status
Voir les informations de démarrage
ps -aux | grep zookeeper
Liensserver
sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
Afficher les informations du Journal
Les informations du journal sont configurables ,Adoptionzoo.cfg,Par défaut:
/var/log/zookeeper/zookeeper.log
Vous pouvez voir les informations du journal pour voir quelques erreurs et détails
kafkaInstallation
ubuntu18.04En bas.KafkaInstallation et déploiement
Installationkafka:
ubuntuÇa marche.wgetTéléchargement direct, J'ai téléchargé /home/cyl/kafkaTable des matières
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz
Décompresser:
tar -zxvf kafka_2.12-3.1.0.tgz
Renommer
mv kafka_2.12-3.1.0 ./kafka
Créer un répertoire de stockage de journaux
[email protected]:~/kafka$ mkdir logs-1
Modifierkafka-serverProfil pour
[email protected]:~/kafka/kafka$ sudo vim config/server.properties
Modifier le fichier de configuration21、31、36Et60D'accord
broker.id=1
listeners=PLAINTEXT://10.141.184:9092 # Pour un démarrage sans heurt broker
advertised.listeners=PLAINTEXT://10.141.184:9092
log.dirs=/home/wzj/kafka/logs-1
DémarrageZookeeper
Il faut d'abord modifierconfig/zookeeper.propertiesConfiguration
[email protected]:~/kafka/kafka$ sudo ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Utilisation recommandée:
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Liensserver
$ sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
DémarrageKafkaServices
Référence du processus de démarrage
Utiliser kafka-server-start.sh Démarrage kafka Services
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-server-start.sh ./config/server.properties
Créationtopic
Utiliser kafka-topics.sh Pour créer une copie unique d'une partition topic test
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --create --bootstrap-server 10.141.65.188:9092 --replication-factor 1 --partitions 1 --topic nginxLog
Impossible d'utiliser icilocalhost:9092,
Problèmes rencontrés
Question 1: Modification de l'instruction de version
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Erreur signalée“Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”
Dans une version plus récente (2.2 Et plus)De Kafka Plus besoin. ZooKeeper Chaîne de connexion,C'est - à - dire:- -zookeeper localhost:2181.Utiliser Kafka BrokerDe --bootstrap-server localhost:9092Pour remplacer- -zookeeper localhost:2181.
Deuxième question:
WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Doit être compatible avec listenersEnregistrer la cohérence,Par exemple
listeners=PLAINTEXT://192.168.156.131:9092
# Doit également être utilisé lors de l'utilisation de la commande 192.168.156.131:9092 Comme adresse de connexion ,Comme suit
./kafka-console-producer.sh --broker-list 192.168.156.131:9092 --topic userlog
Voir topic Liste
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --list --bootstrap-server 10.141.65.188:9092
Générer un message,Créer un producteur de messages
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-producer.sh --broker-list 10.141.65.188:9092 --topic nginxLog
Message de consommation,Créer un consommateur de messages
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-consumer.sh --bootstrap-server 10.141.65.188:9092 --topic nginxLog --from-beginning
Dans la fenêtre des messages de production ,Saisissez le contenu, Vous pouvez l'imprimer dans la fenêtre de consommation
VoirTopicMessage
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --describe --bootstrap-server 10.141.65.188:9092 --topic nginxLog
Topic: nginxLog TopicId: t6M81RsMRPGj2tZVXaxltw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: nginxLog Partition: 0 Leader: 1 Replicas: 1 Isr: 1
La première ligne résume toutes les partitions,Chaque ligne supplémentaire donne des informations sur une partition. Comme nous n'avons qu'une seule partition,Donc il n'y a qu'une seule ligne.
“Leader”: Est le noeud responsable de toutes les lectures et écritures pour une partition donnée. Chaque noeud devient le leader de la Section de sélection aléatoire de la partition .
“Replicas”: Est une liste de noeuds qui copient ce journal de partition,Qu'ils soient ou non des leaders,Ou même s'ils sont actuellement actifs.
“Isr”: C'est un groupe.“Synchroniser”Copie.C'estreplications Sous - ensemble de la liste ,Actuellement vivant et dirigé vers le leader.
Supprimertopic
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --delete --bootstrap-server 10.141.65.188:9092 --topic nginxLogtest
Commande de démarrage:
bin/kafka-server-start.sh -daemon config/server.properties
Créationtopic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2
Voirtopic
./kafka-topics.sh --bootstrap-server spark01:9092 --list
AssignertopicDonnées de production moyennes
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
Par exemple:{
"id":"1","name":"xiaoming","age":"20"}
VoirtopicContenu spécifique
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
Créer un groupe de consommateurs
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest
Voir les groupes de consommateurs
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list
Voir les détails du consommateur
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe --group kafkatest
Données sur la consommation
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
6 Mise en œuvre du Code
logagent Code de mise en œuvregithub
1.kafka demo:
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
//ConfigurationkafkaEnvironnement
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{
"10.141.65.188:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
for i := 0; i < 10; i++ {
msg := &sarama.ProducerMessage{
}
msg.Topic = "nginxLogTest"
msg.Value = sarama.StringEncoder("this is a good test, my message is good~~12")
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
}
- tailf demo
package tailf
import (
"fmt"
"time"
"github.com/hpcloud/tail"
)
func main() {
//main()
filename := "./my.log"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{
Offset: 0, Whence: 2}, // Position de lecture
MustExist: false, // Exiger que les documents existent ou soient exposés
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}
- config demo
package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
func main() {
conf, err := config.NewConfig("ini", "./logcollect.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
port, err := conf.Int("server::port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
}
fmt.Println("Port:", port)
log_level := conf.String("logs::log_level")
fmt.Println("log_level:", log_level)
log_port, err := conf.Int("logs::port")
if err != nil {
fmt.Println("read logs:port failed, err:", err)
return
}
fmt.Println("log_Port:", log_port)
log_path := conf.String("logs::log_path")
fmt.Println("log_path:", log_path)
}
- logs demo
package main
import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)
func main() {
config := make(map[string]interface{
})
config["filename"] = "./logcollect.log"
config["level"] = logs.LevelTrace
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
}
logs.SetLogger(logs.AdapterFile, string(configStr))
logs.Debug("this is a test, my name is %s", "stu01~~")
logs.Trace("this is a trace, my name is %s", "stu02~~")
logs.Warn("this is a warn, my name is %s", "stu03~~")
}
边栏推荐
- How to make ef core 6 support dateonly type
- Uni-app 之uParse 富文本解析 完美解析富文本!
- 如何让 EF Core 6 支持 DateOnly 类型
- IOS development: understanding of dynamic library shared cache (dyld)
- 1.5 conda的使用
- Anaconda3 is missing a large number of files during and after installation, and there are no scripts and other directories
- MobileNet系列(4):MobileNetv3网络详解
- 2021:AdaVQA: Overcoming Language Priors with Adapted Margin Cosine Loss∗自适应的边缘余弦损失解决语言先验
- MySQL development environment
- Promise source code class version [III. promise source code] [detailed code comments / complete test cases]
猜你喜欢

PAT甲级 1026 Table Tennis

PostgreSQL basic command tutorial: create a new user admin to access PostgreSQL

静态时序分析-OCV和time derate

math_数集(数集符号)和集合论

Matlab | drawing of three ordinate diagram based on block diagram layout

PostgreSQL基础命令教程:创建新用户admin来访问PostgreSQL

There are two problems when Nacos calls microservices: 1 Load balancer does not contain an instance for the service 2. Connection refused

fplan-Powerplan实例

1.5 conda的使用

PAT甲级 1020 Tree Traversals
随机推荐
通信中的机器学习最佳阅读资料列表
How to make ef core 6 support dateonly type
promise源码-class版本【三、Promise源码】【代码详细注释/测试案例完整】
投资理财产品的钱有保障吗?会不会没有了?
[array]bm94 rainwater connection problem - difficult
2021:Zero-shot Visual Question Answering using Knowledge Graphs使用知识图的零次视觉问答
2021:passage retrieval for outside knowledgevisual question answering
Ledrui ldr6035 usb-c interface device supports rechargeable OTG data transmission scheme.
文旅灯光秀打破时空限制,展现景区夜游魅力
Building lightweight target detection based on mobilenet-yolov4
WPF 开源控件库Extended WPF Toolkit介绍(经典)
Semantic version 2.0.0
与STM32或GD32替换说明
敏捷开发篇--Agile Development-自用
PAT甲级 1018 Public Bike Management
Common sense of Apple's unique map architecture
Matlab | visualization of mathematical properties related to three interesting circles
Implementation of window encryption shell
math_ Number set (number set symbol) and set theory
I found a JSON visualization tool artifact. I love it!