
Data warehouse 4.0 notes - user behavior data collection IV

2022-07-23 11:44:00 Silky

1 Log Collection Flume Installation

 

[atguigu@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

[atguigu@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume

Delete guava-11.0.2.jar from the lib folder so Flume is compatible with Hadoop 3.1.3:

[atguigu@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar

With the jar removed, Hadoop can still work normally.
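To double-check that the conflict is resolved, list both lib directories (a quick sketch; the Hadoop path assumes the same /opt/module layout used above):

# Flume's bundled Guava should no longer be listed
ls /opt/module/flume/lib | grep guava
# Hadoop 3.1.3 ships a newer Guava (guava-27.0-jre.jar), which Flume will use instead
ls /opt/module/hadoop-3.1.3/share/hadoop/common/lib | grep guava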

Rename flume-env.sh.template under flume/conf to flume-env.sh, and configure the flume-env.sh file:

[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh

[atguigu@hadoop102 conf]$ vi flume-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_212

Distribute it:

[atguigu@hadoop102 module]$ xsync flume/


2 Log Collection Flume Configuration

The specific Flume configuration is as follows:

Create a file-flume-kafka.conf file under the /opt/module/flume/conf directory:

[atguigu@hadoop102 conf]$ vim file-flume-kafka.conf

Configure the file as follows (write it down first; the interceptor is built and configured afterwards):

# Name the components
a1.sources = r1
a1.channels = c1

# Describe the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# Configure the interceptor (ETL cleaning: judge whether each event's JSON is complete)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhang.flume.interceptor.ETLInterceptor$Builder

# Describe the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel (the Kafka channel delivers to Kafka itself, so no sink is needed)
a1.sources.r1.channels = c1
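If topic auto-creation is disabled on the Kafka cluster, topic_log must exist before the agent starts. A sketch of creating it (the partition and replication counts here are illustrative, not from the original notes):

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic topic_log --partitions 3 --replication-factor 2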

Create a Maven project flume-interceptor and add the following to its pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Create a package: com.zhang.flume.interceptor

Create a JSONUtils class under the com.zhang.flume.interceptor package:

package com.zhang.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    // Returns true if the log line parses as valid JSON, false otherwise
    public static boolean isJSONValidate(String log){
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e){
            return false;
        }
    }
}

Create an ETLInterceptor class under the com.zhang.flume.interceptor package:

package com.zhang.flume.interceptor;

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // Keep the event only if its body is complete, valid JSON
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        // Remove every event that the single-event intercept() rejected
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

Package and compile the project.
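Packaging is the standard Maven run (a sketch, from the project root):

mvn clean package
# the assembly plugin builds target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar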

Put the packaged jar into the /opt/module/flume/lib folder on hadoop102.

[atguigu@hadoop102 module]$ cd flume/lib/

Upload the file, then filter the listing to confirm it is there:

[atguigu@hadoop102 lib]$ ls | grep interceptor

Distribute it:

[atguigu@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

Get the interceptor's full class name, com.zhang.flume.interceptor.ETLInterceptor$Builder, and put it into the initial configuration file.

Now put the configuration file on the cluster:

[atguigu@hadoop102 conf]$ vim file-flume-kafka.conf

Distribute it:

[atguigu@hadoop102 conf]$ xsync file-flume-kafka.conf

Start Flume:

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

 

[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

hadoop103 also started successfully.

3 Testing the Flume-Kafka Channel

Generate logs

[atguigu@hadoop102 flume]$ lg.sh

Consume the Kafka data and watch whether the console picks it up:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

You can see the corresponding logs.

 

If you close the hadoop102 window, you will find that Flume has been shut down.

[atguigu@hadoop102 ~]$ cd /opt/module/flume/

Start it in the foreground:

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf

It launches successfully, but after closing the client it is shut down again.

Add nohup:

[atguigu@hadoop102 flume]$ nohup bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf

 

nohup lets a command keep running after you log out of the account or close the terminal. nohup means "no hang up": run the command without hanging up on it.
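In practice it is worth redirecting the output too, since plain nohup appends everything to nohup.out in the working directory; the start/stop script below uses exactly this pattern:

nohup bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf >/opt/module/flume/log1.txt 2>&1 &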

How do we stop it? In other words, how do we get its PID (13901 in this run)?

[atguigu@hadoop102 kafka]$ ps -ef | grep Application

[atguigu@hadoop102 kafka]$ ps -ef | grep Application | grep -v grep

[atguigu@hadoop102 kafka]$ ps -ef | grep Application | grep -v grep | awk '{print $2}'

[atguigu@hadoop102 kafka]$ ps -ef | grep Application | grep -v grep | awk '{print $2}' | xargs

[atguigu@hadoop102 kafka]$ ps -ef | grep Application | grep -v grep | awk '{print $2}' | xargs -n1 kill -9

"Application" may also match other processes with the same name, so we need a string that uniquely identifies this Flume agent.
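The config file name is such an identifier, and it is what the f1.sh script below greps for:

# file-flume-kafka appears only in this agent's command line
ps -ef | grep file-flume-kafka | grep -v grep | awk '{print $2}' | xargs -n1 kill -9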


4 Log Collection Flume Start/Stop Script

Create the script f1.sh in the /home/atguigu/bin directory:

[atguigu@hadoop102 bin]$ vim f1.sh

Fill in the script as follows:

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- starting $i collection flume -------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
        done
};;
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- stopping $i collection flume -------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done
};;
esac

 

[atguigu@hadoop102 bin]$ chmod 777 f1.sh

Test that stop and start work normally.
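For example:

f1.sh stop     # kills the agents on hadoop102 and hadoop103 by their config file name
f1.sh start    # starts them again in the background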

5 Flume for Consuming Kafka Data


The timestamp interceptor matters here: it solves the midnight ("zero point") drift problem. The HDFS sink partitions files by day using the timestamp header, and by default that header reflects processing time, so a log generated at 23:59 but delivered after midnight would land in the next day's directory; the interceptor overwrites the header with the ts field carried in the log itself.

Consumer Flume configuration

Create a kafka-flume-hdfs.conf file under the /opt/module/flume/conf directory on hadoop104.

(The value for a1.sources.r1.interceptors.i1.type, the interceptor's full class name, is obtained later and then filled in.)

[atguigu@hadoop104 conf]$ vim kafka-flume-hdfs.conf

## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log

## Timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhang.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

# Control the generation of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Output compressed files (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## Assemble
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume timestamp interceptor (solves the midnight drift problem)

Create a TimeStampInterceptor class under the com.zhang.flume.interceptor package:

package com.zhang.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    // Reused buffer for each batch; cleared at the start of every intercept(List) call
    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // Copy the log's own ts field into the "timestamp" header, so the HDFS
        // sink partitions by event time rather than by processing time
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}


Come to hadoop104. Transfer the new jar over, then check whether it is there:

[atguigu@hadoop104 lib]$ ls | grep interceptor

Delete the previous one:

[atguigu@hadoop104 lib]$ rm -rf flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar.0

Be careful: check the timestamps first. Here the .0 file is the newly generated one, not the old one.

Complete the configuration file written earlier with the full class name:

 com.zhang.flume.interceptor.TimeStampInterceptor

Now we can actually write the configuration file.

 

Start Flume:

[atguigu@hadoop104 flume]$ nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &

Check the date on hadoop104, then look again at the log date configured earlier on hadoop102.

Now let's see whether the date in HDFS is the time carried by the log, or the local system time of hadoop104.

First open HDFS, then generate logs:

[atguigu@hadoop102 applog]$ lg.sh

Check the time: it is the time carried by the log.
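A quick way to confirm this from the shell (the path comes from the sink configuration above):

# directories are named by day; the date should match the ts carried in the logs
hadoop fs -ls /origin_data/gmall/log/topic_log/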

An aside: at first my origin_data directory would not appear even though I had followed the video step by step. The problem turned out to be the jar import: flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar was the old file and the .0 file was the latest one, so the old file should be deleted and the .0 file kept. If you hit the same problem, go back, delete the jar, and re-import it.

 

Consumer Flume start/stop script

Create the script f2.sh in the /home/zhang/bin directory:

[atguigu@hadoop102 bin]$ vim f2.sh

Fill in the script as follows:

#!/bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " -------- starting $i consumer flume -------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " -------- stopping $i consumer flume -------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
        done
};;
esac

[atguigu@hadoop102 bin]$ chmod 777 f2.sh

 

Project experience: Flume memory optimization

Modify Flume's memory parameter settings.
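The setting goes into conf/flume-env.sh on each node. A sketch (the 4 GB heap is a typical choice for this project, not a value fixed by these notes):

# in /opt/module/flume/conf/flume-env.sh, then xsync to the other nodes;
# keeping Xms equal to Xmx avoids heap resizing and the full GCs it can trigger
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"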

6 Collection Channel Start/Stop Script

Create the script cluster.sh in the /home/zhang/bin directory:

[atguigu@hadoop102 bin]$ vim cluster.sh

Fill in the script as follows. (Pay attention to the shutdown order: stopping Kafka takes time, and if Zookeeper is closed right after Kafka, the delay can leave Kafka unable to shut down properly.)

#!/bin/bash

case $1 in
"start"){
        echo ================== starting the cluster ==================

        # Start the Zookeeper cluster
        zk.sh start

        # Start the Hadoop cluster
        myhadoop.sh start

        # Start the Kafka cluster
        kf.sh start

        # Start the Flume collection agents
        f1.sh start

        # Start the Flume consumer agent
        f2.sh start

};;
"stop"){
        echo ================== stopping the cluster ==================

        # Stop the Flume consumer agent
        f2.sh stop

        # Stop the Flume collection agents
        f1.sh stop

        # Stop the Kafka cluster
        kf.sh stop

        # Stop the Hadoop cluster
        myhadoop.sh stop

        # Stop the Zookeeper cluster
        zk.sh stop

};;
esac

 

[atguigu@hadoop102 bin]$ chmod 777 cluster.sh

It can be stopped and started normally.
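For example:

cluster.sh stop     # Flume consumer -> Flume collectors -> Kafka -> Hadoop -> Zookeeper
cluster.sh start    # Zookeeper -> Hadoop -> Kafka -> Flume collectors -> Flume consumer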

 

7 Common Problems and Solutions

When visiting the 2NN page at http://hadoop104:9868, the details cannot be seen.

 

Find the file to modify:

[atguigu@hadoop104 ~]$ cd /opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps/static/

[atguigu@hadoop104 static]$ vim dfs-dust.js

Find line 61 and change it to:

return new Date(Number(v)).toLocaleString();

Force a refresh (More tools -> Clear browsing data).

 

 


Copyright notice: this article was created by [Silky]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/204/202207230538336752.html