
Multithreading and high concurrency Day11

2022-07-23 18:55:00 Dream qiusun~



Preface

These are personal study notes, for reference only. Corrections are welcome!


One. Java Microbenchmark Harness (JMH)

1. Introduction

JMH is a benchmarking tool for Java. It is provided and maintained by the OpenJDK project, which makes its results comparatively reliable.

2. Official website

JMH official website: http://openjdk.java.net/projects/code-tools/jmh/

3. Creating a JMH test

  1. Create a Maven project and add the dependencies:
   <?xml version="1.0" encoding="UTF-8"?>
   <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
   
       <properties>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
           <encoding>UTF-8</encoding>
           <java.version>1.8</java.version>
           <maven.compiler.source>1.8</maven.compiler.source>
           <maven.compiler.target>1.8</maven.compiler.target>
       </properties>
   
       <groupId>mashibing.com</groupId>
       <artifactId>HelloJMH2</artifactId>
       <version>1.0-SNAPSHOT</version>
      
       <dependencies>
           <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core -->
           <dependency>
               <groupId>org.openjdk.jmh</groupId>
               <artifactId>jmh-core</artifactId>
               <version>1.21</version>
           </dependency>
   
           <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess -->
           <dependency>
               <groupId>org.openjdk.jmh</groupId>
               <artifactId>jmh-generator-annprocess</artifactId>
               <version>1.21</version>
               <scope>test</scope>
           </dependency>
       </dependencies>
   
   </project>
  2. In IntelliJ IDEA, install the JMH plugin (JMH plugin v1.0.3).

  3. Because JMH relies on annotations, enable annotation processing in the IDE settings:

    compiler -> Annotation Processors -> Enable Annotation Processing

  4. Define the class to be tested, PS (ParallelStream):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class PS {

    static List<Integer> nums = new ArrayList<>();

    static {
        Random r = new Random();
        for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
    }

    static void foreach() {
        nums.forEach(v -> isPrime(v));
    }

    static void parallel() {
        nums.parallelStream().forEach(PS::isPrime);
    }

    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }
}
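  5. Write the unit test:
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Warmup;

public class PSTest {

    @Benchmark
    @Warmup(iterations = 1, time = 3)      // one warm-up iteration of 3 seconds
    @Fork(5)                               // run the benchmark in 5 forked JVMs
    @BenchmarkMode(Mode.Throughput)        // report operations per unit of time
    @Measurement(iterations = 1, time = 3) // one measured iteration of 3 seconds
    public void testForEach() {
        PS.foreach();
    }
}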
  6. Run the test class. You may encounter the following error:

   ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock ( Access denied .), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
   	at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
   	at org.openjdk.jmh.Main.main(Main.java:71)

This error occurs because JMH needs access to the system TMP directory when it runs. The fix is:

Open RunConfiguration -> Environment Variables -> include system environment variables

  7. Read the test report.
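
Separately from the JMH steps above, it can be useful to sanity-check that the sequential and parallel variants of the PS workload do the same work. The sketch below is not from the original notes: PrimeCountSketch, randomNumbers, countSequential and countParallel are hypothetical names, and unlike PS it returns a count (and uses a fixed seed) so the two variants can be compared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical variant of the PS class from the notes: it returns a prime
// count so that the sequential and parallel paths can be compared.
class PrimeCountSketch {

    // Same trial-division check as PS.isPrime.
    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    // Builds a reproducible list of numbers in [1_000_000, 2_000_000).
    static List<Integer> randomNumbers(int howMany, long seed) {
        Random r = new Random(seed);
        List<Integer> nums = new ArrayList<>();
        for (int i = 0; i < howMany; i++) nums.add(1_000_000 + r.nextInt(1_000_000));
        return nums;
    }

    // Counts primes on the calling thread.
    static long countSequential(List<Integer> nums) {
        return nums.stream().filter(PrimeCountSketch::isPrime).count();
    }

    // Counts primes using the common fork-join pool.
    static long countParallel(List<Integer> nums) {
        return nums.parallelStream().filter(PrimeCountSketch::isPrime).count();
    }

    public static void main(String[] args) {
        List<Integer> nums = randomNumbers(1000, 42L);
        System.out.println("sequential: " + countSequential(nums));
        System.out.println("parallel:   " + countParallel(nums));
    }
}
```

Both lines should report the same count; only the elapsed time is expected to differ, and measuring that difference is what the JMH benchmark is for.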

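4. Basic concepts in JMH

  1. Warmup
    Warm-up runs. Because the JVM optimizes frequently executed code (for example, by JIT-compiling it to native code), warming up matters a great deal for the test results.

  2. Measurement
    How many measured iterations are performed in total.

  3. Timeout

  4. Threads
    The number of threads that run the benchmark, set with @Threads. (@Fork, by contrast, sets how many separate JVM processes the benchmark runs in.)

  5. Benchmark mode
    The benchmarking mode, for example throughput (Mode.Throughput).

  6. Benchmark
    Marks which piece of code to test.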
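
To make the difference between warm-up and measurement concrete, here is a hand-rolled sketch that uses only the JDK. It is not how JMH works internally (JMH also forks JVMs, guards against dead-code elimination and computes statistics); WarmupSketch, workload and averageNanos are hypothetical names introduced for illustration.

```java
// Hypothetical illustration only: separates warm-up rounds, whose timings
// are discarded, from measured rounds, whose timings are averaged.
class WarmupSketch {

    // Same trial-division check as PS.isPrime in the notes above.
    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    // One unit of work; returning the count keeps the loop from being
    // discarded as dead code.
    static int workload() {
        int primes = 0;
        for (int n = 1_000_000; n < 1_000_100; n++) {
            if (isPrime(n)) primes++;
        }
        return primes;
    }

    // Runs warm-up rounds first, then returns the average nanoseconds
    // per measured round.
    static double averageNanos(int warmupRounds, int measuredRounds) {
        int sink = 0;
        for (int i = 0; i < warmupRounds; i++) sink += workload();
        long start = System.nanoTime();
        for (int i = 0; i < measuredRounds; i++) sink += workload();
        long elapsed = System.nanoTime() - start;
        if (sink < 0) System.out.println(sink); // keeps 'sink' observably used
        return (double) elapsed / measuredRounds;
    }

    public static void main(String[] args) {
        System.out.printf("no warm-up:   %.0f ns per round%n", averageNanos(0, 5));
        System.out.printf("with warm-up: %.0f ns per round%n", averageNanos(5, 5));
    }
}
```

The numbers this prints are noisy and depend on the machine, which is exactly why a harness such as JMH is preferable to a loop like this for real measurements.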

5. Official examples

Official examples: http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/

Two. Disruptor

1. Introduction

Home page: http://lmax-exchange.github.io/disruptor/

Source code: https://github.com/LMAX-Exchange/disruptor

Getting Started: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

API: http://lmax-exchange.github.io/disruptor/docs/index.html

Maven: https://mvnrepository.com/artifact/com.lmax/disruptor


2. Characteristics of Disruptor

For comparison, ConcurrentLinkedQueue is implemented as a linked list.

The JDK has no ConcurrentArrayQueue.

Disruptor is implemented on top of an array.

It is lock-free and built for high concurrency. It uses a ring buffer and overwrites old data directly (nothing needs to be cleaned up), which reduces GC frequency.

It implements an event-based producer-consumer model (observer pattern).

3. RingBuffer


A circular queue.

The RingBuffer keeps a sequence number that points to the next available element.

It is implemented with an array and has no head or tail pointers.

Compared with ConcurrentLinkedQueue, the array-based implementation is faster.

If the length is 8, which slot does the 12th element go into? It is determined by 12 % 8.

When the buffer is full, whether to overwrite or wait is decided by the producer.

The length is set to a power of 2, which allows the index to be computed with a bitwise operation, for example: 12 % 8 = 12 & (8 - 1), or in general pos = num & (size - 1)
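
The index calculation can be sketched in a few lines of plain Java. RingIndex and its methods are hypothetical helpers written for these notes, not part of the Disruptor API; they only show why the mask works when the size is a power of 2.

```java
// Hypothetical helper, not part of the Disruptor API.
class RingIndex {

    // True when size is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int size) {
        return size > 0 && (size & (size - 1)) == 0;
    }

    // Maps an ever-increasing sequence number onto a slot index.
    static int indexOf(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        System.out.println(indexOf(12, 8)); // slot chosen by the mask
        System.out.println(12 % 8);         // slot chosen by the modulo
    }
}
```

For a size that is not a power of 2 the mask and the modulo no longer agree (for example 12 & (6 - 1) is 4 while 12 % 6 is 0), which is why the sketch rejects such sizes.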

4. Disruptor development steps

  1. Define the Event: the element in the queue that needs to be processed
public class LongEvent {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                "}";
    }
}
  2. Define the Event factory, which is used to fill the queue
import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

Efficiency comes into play here: when the Disruptor is initialized, it calls the Event factory to allocate the memory for the ringBuffer in advance.

As a result, GC happens less often.
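
The idea of filling the ring once and then reusing the slots can be sketched with the JDK alone. PreallocatedRing is a hypothetical class for illustration and is much simpler than Disruptor's real RingBuffer (no sequencing, no concurrency control); it assumes a Supplier plays the role of the Event factory.

```java
import java.util.function.Supplier;

// Hypothetical sketch of pre-allocation; not Disruptor's actual RingBuffer.
class PreallocatedRing<T> {
    private final Object[] slots;
    private final int mask;

    PreallocatedRing(Supplier<T> factory, int size) {
        if (size <= 0 || (size & (size - 1)) != 0) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Object[size];
        mask = size - 1;
        // Every slot is created exactly once, up front, by the factory.
        for (int i = 0; i < size; i++) slots[i] = factory.get();
    }

    // Returns the pre-allocated object that owns this sequence number.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<long[]> ring = new PreallocatedRing<>(() -> new long[1], 8);
        ring.get(3)[0] = 42L;  // "publish" by mutating the slot in place
        ring.get(11)[0] = 43L; // sequence 11 wraps onto slot 3 and overwrites it
        System.out.println(ring.get(3)[0]);
        System.out.println(ring.get(3) == ring.get(11)); // same object reused
    }
}
```

Because publishing only mutates an existing object, no new event objects are allocated after construction, which is the reason given above for the reduced GC activity.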

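  3. Define the EventHandler (consumer), which processes the elements in the container
import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    // Number of events handled so far. A plain static counter is only
    // accurate while a single handler thread updates it.
    public static long count = 0;

    /**
     * @param event      the event published to the ring buffer
     * @param sequence   the sequence number of the event
     * @param endOfBatch whether this is the last event of the current batch
     * @throws Exception if the event cannot be processed
     */
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        count++;
        System.out.println("[" + Thread.currentThread().getName() + "]" + event + " sequence: " + sequence);
    }
}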
  4. Main01, the official routine:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

public class Main01 {

    public static void main(String[] args) {

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2
        int ringBufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // Official routine
        long sequence = ringBuffer.next(); // Grab the next sequence
        try {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
            event.setValue(8888L); // Fill with data
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
  5. A wrapped-up example:
    Modify the Main program and write a producer that produces the tasks; everything else is the same as in step 4. LongEventProducer:
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer buffer) {
        long sequence = ringBuffer.next();
        try {
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(buffer.getLong(0));
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

Main02:

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;

public class Main02 {

    public static void main(String[] args) {

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2
        int ringBufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; l < 100; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        disruptor.shutdown();
    }
}
  6. Using a Translator:
    Everything else is unchanged from step 5; only the Producer is modified:
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
            event.setValue(bb.getLong(0));
        }
    };

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer buffer) {
        ringBuffer.publishEvent(TRANSLATOR, buffer);
    }
}

This step prepares the ground for Java 8 lambda expressions. A Translator can also take multiple parameters.

  7. Using lambdas: this simplifies the code a great deal. Only the following two parts are needed:
    Event:
public class LongEvent {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                "}";
    }
}

Main:

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class Main {

    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("[" + Thread.currentThread().getName() + "]" + event + " sequence: " + sequence);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.setValue(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(Main::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(Main::translate, bb);
            Thread.sleep(1000);
        }
    }
}

5. ProducerType: producer thread mode

ProducerType has two modes: ProducerType.MULTI and ProducerType.SINGLE

The default is MULTI, which means sequences are generated in a way that is safe for multiple producer threads

If you are sure there is only a single producer thread, you can specify SINGLE, which is more efficient

What happens if there are multiple producers (multiple threads) but the mode is specified as SINGLE?
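
One way to reason about that question, without depending on the library, is to compare a plain counter with an atomic one when several threads claim sequence numbers. The sketch below is an analogy under the assumption that a single-producer mode skips the synchronization a multi-producer mode needs; SequenceClaimSketch and its methods are hypothetical and do not reproduce Disruptor's sequencer classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: contrasts an unsynchronised sequence counter with an
// atomic one. It is an analogy, not Disruptor's implementation.
class SequenceClaimSketch {
    private long plainNext = 0;                        // safe for one producer only
    private final AtomicLong atomicNext = new AtomicLong();

    long claimPlain()  { return plainNext++; }         // read-modify-write, not atomic
    long claimAtomic() { return atomicNext.getAndIncrement(); }

    long plainTotal()  { return plainNext; }
    long atomicTotal() { return atomicNext.get(); }

    // Starts 'threads' producers that each claim 'perThread' sequences
    // from both counters, then waits for all of them to finish.
    static SequenceClaimSketch run(int threads, int perThread) {
        SequenceClaimSketch s = new SequenceClaimSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    s.claimPlain();
                    s.claimAtomic();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return s;
    }

    public static void main(String[] args) {
        SequenceClaimSketch s = run(4, 100_000);
        System.out.println("expected claims: " + 4 * 100_000);
        System.out.println("atomic counter : " + s.atomicTotal());
        System.out.println("plain counter  : " + s.plainTotal()); // may be lower
    }
}
```

When the plain counter ends up lower than expected, two threads were handed the same sequence number. In a ring buffer that would mean two producers writing to the same slot, so one event is lost. This suggests that specifying SINGLE with several producer threads risks overwritten or missing events, and it is worth confirming by running the multi-producer example from these notes with ProducerType.SINGLE.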

6. Wait strategies

1. (Commonly used) BlockingWaitStrategy: the thread blocks and waits for the producer to wake it up. Once woken, it loops again to check whether the sequence it depends on has been consumed.

2. BusySpinWaitStrategy: the thread spins continuously while waiting, which can consume a lot of CPU.

3. LiteBlockingWaitStrategy: the thread blocks and waits for the producer to wake it up. Compared with BlockingWaitStrategy, the difference lies in signalNeeded.getAndSet: when one thread is in waitFor while another calls signalAll, the number of lock acquisitions can be reduced.

4. LiteTimeoutBlockingWaitStrategy: compared with LiteBlockingWaitStrategy, a blocking timeout is set, and an exception is thrown once it elapses.

5. PhasedBackoffWaitStrategy: decides which wait strategy to use based on the time parameters and the wait strategy passed in.

6. TimeoutBlockingWaitStrategy: compared with BlockingWaitStrategy, a wait time is set, and an exception is thrown once it is exceeded.

7. (Commonly used) YieldingWaitStrategy: tries 100 times, then calls Thread.yield() to give up the CPU.

8. (Commonly used) SleepingWaitStrategy: sleeps while waiting.
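
The "try 100 times, then yield" idea from strategy 7 can be sketched with the JDK alone. This is a simplified illustration rather than the library's implementation; SpinThenYieldSketch, SPIN_TRIES and waitFor are hypothetical names, and an AtomicLong stands in for the producer's cursor.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a spin-then-yield wait; simplified and not the
// library's implementation.
class SpinThenYieldSketch {
    static final int SPIN_TRIES = 100;

    // Waits until 'cursor' reaches 'wanted' and returns the value observed.
    static long waitFor(long wanted, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < wanted) {
            if (counter > 0) {
                counter--;       // busy-spin phase: just re-check the cursor
            } else {
                Thread.yield();  // afterwards, hint the scheduler to run others
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (long seq = 0; seq <= 5; seq++) cursor.set(seq);
        });
        producer.start();
        System.out.println("consumer saw sequence " + waitFor(5, cursor));
        producer.join();
    }
}
```

The trade-off is the usual one: spinning gives low latency at the cost of CPU, while blocking or sleeping frees the CPU at the cost of wake-up latency.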

7. Specifying consumers

Now let's see how to specify multiple consumers. By default there is only one consumer, and adding more is simple. In the code below I define two consumers, h1 and h2. disruptor.handleEventsWith(h1, h2) takes a variable number of arguments, so to have multiple consumers you just pass them all in. The consumers run in separate threads, and every handler passed to handleEventsWith receives every event.

import java.util.concurrent.*;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.ProducerType;

public class Main06_MultiConsumer {

    public static void main(String[] args) throws Exception {

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new SleepingWaitStrategy());

        // Connect the handlers
        LongEventHandler h1 = new LongEventHandler();
        LongEventHandler h2 = new LongEventHandler();
        disruptor.handleEventsWith(h1, h2);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //========================================================================
        final int threadCount = 10;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(() -> {
                System.out.printf("Thread %s ready to start!\n", threadNum);
                try {
                    // Wait until all producer threads are ready
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 10; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        // Fill the pre-allocated event with this producer's number
                        event.setValue(threadNum);
                        System.out.println("Produced " + threadNum);
                    });
                }
            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandler.count);
    }
}

8. Consumer exception handling

Default: disruptor.setDefaultExceptionHandler()

Override: disruptor.handleExceptionsFor().with()

Look at the code below. The EventHandler in this method is our consumer: it prints the event and then immediately throws an exception. When one consumer fails, the whole thread must not stop, and it is not acceptable for one faulty consumer to keep the other consumers from working. handleExceptionsFor(h1) specifies which consumer the exception handler applies to, and .with(...) supplies our ExceptionHandler, which defines what to do after an exception occurs. It overrides three methods: the first, handleEventException, simply prints the exception raised while handling an event; the second, handleOnStartException, is called if something goes wrong during startup; the third, handleOnShutdownException, is called if something goes wrong during shutdown.

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.*;

public class Main07_ExceptionHandler {

    public static void main(String[] args) throws InterruptedException {

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new SleepingWaitStrategy());

        // Connect the handler: a consumer that always fails after printing the event
        EventHandler<LongEvent> h1 = (event, sequence, end) -> {
            System.out.println(event);
            throw new Exception("The consumer is abnormal");
        };
        disruptor.handleEventsWith(h1);

        disruptor.handleExceptionsFor(h1).with(new ExceptionHandler<LongEvent>() {
            @Override
            public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
                throwable.printStackTrace();
            }

            @Override
            public void handleOnStartException(Throwable throwable) {
                System.out.println("Exception Start to Handle!");
            }

            @Override
            public void handleOnShutdownException(Throwable throwable) {
                System.out.println("Exception End to Handle!");
            }
        });

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //========================================================================
        final int threadCount = 1;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(() -> {
                System.out.printf("Thread %s ready to start!\n", threadNum);
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 10; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        // Fill the pre-allocated event with this producer's number
                        event.setValue(threadNum);
                        System.out.println("Produced " + threadNum);
                    });
                }
            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        // LongEventHandler is not registered in this example, so its counter stays at 0
        System.out.println(LongEventHandler.count);
    }
}
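
The principle behind the example above, that a failing handler should not stop the consuming loop, can be sketched without the library. GuardedConsumerSketch and consumeAll are hypothetical names, and the BiConsumer only stands in for the role an ExceptionHandler plays; this is not how Disruptor's event processor is implemented.

```java
import java.util.function.BiConsumer;
import java.util.function.LongConsumer;

// Hypothetical sketch: a consuming loop that hands failures to a separate
// callback and keeps going, instead of letting one bad event end the loop.
class GuardedConsumerSketch {

    // Feeds each event to 'handler'. Failures are passed to 'onError'
    // together with the offending event; returns the number of failures.
    static int consumeAll(long[] events, LongConsumer handler, BiConsumer<Throwable, Long> onError) {
        int failures = 0;
        for (long event : events) {
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                failures++;
                onError.accept(e, event);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        long[] events = {1, 2, 3, 4};
        int failures = consumeAll(
                events,
                v -> {
                    System.out.println("handling " + v);
                    if (v % 2 == 0) throw new IllegalStateException("cannot handle " + v);
                },
                (error, v) -> System.out.println("recovered from: " + error.getMessage()));
        System.out.println("failures: " + failures);
    }
}
```

All four events are offered to the handler even though two of them fail, which mirrors the goal stated above: one consumer's exception must not bring processing to a halt.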

Summary

The Disruptor is a ring, and multiple producers can produce into it. The ring design is what makes it so efficient. When writing a program, the steps are as follows. First, define the Event, that is, the format of the message. Then define the message factory, which is used to initialize the whole ring: at initialization, the message object at every position is created with new up front, so its space is already allocated. During production we only need to take the pre-allocated object at the current position and fill in the value. Once the value is filled in, consumers can consume it, and after consumption producers can continue producing into that position. If the producer produces faster than the consumer consumes and the ring fills up, the various wait strategies come into play. When a consumer fails, an ExceptionHandler can deal with it.

Original site

Copyright notice
This article was written by [Dream qiusun~]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/204/202207231638433063.html