Multithreading and high concurrency Day11
2022-07-23 18:55:00 【Dream qiusun~】
Preface
Personal study notes, for reference only. Corrections are welcome.
I. Java Microbenchmark Harness (JMH)
1. Introduction
JMH is a tool for benchmarking Java code. It is provided and maintained by OpenJDK, so its test results are reliable.
2. Official website
JMH official website: http://openjdk.java.net/projects/code-tools/jmh/
3. Creating a JMH test
- Create a Maven project and add the dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<encoding>UTF-8</encoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<groupId>mashibing.com</groupId>
<artifactId>HelloJMH2</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.21</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
- Install the JMH plugin in IntelliJ IDEA (JMH plugin v1.0.3).
- Because annotations are used, enable annotation processing:
Compiler -> Annotation Processors -> Enable Annotation Processing
- Define the class to be tested, PS (ParallelStream):
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class PS {
static List<Integer> nums = new ArrayList<>();
static {
Random r = new Random();
for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
}
static void foreach() {
nums.forEach(v->isPrime(v));
}
static void parallel() {
nums.parallelStream().forEach(PS::isPrime);
}
static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) return false;
}
return true;
}
}
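- Write the unit test:
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Warmup;
public class PSTest {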
@Benchmark
@Warmup(iterations = 1,time = 3)
@Fork(5)
@BenchmarkMode(Mode.Throughput)
@Measurement(iterations = 1,time = 3)
public void testForEach() {
PS.foreach();
}
}
- Run the test class. You may encounter the following error:
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock ( Access denied .), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
at org.openjdk.jmh.Main.main(Main.java:71)
This error occurs because JMH needs access to the system TMP directory at run time. The solution is:
Open Run Configuration -> Environment Variables -> Include system environment variables
- Read the test report
4. Basic concepts in JMH
Warmup
Warm-up. Because the JVM optimizes specific code (compiling it to native code), warm-up matters for the test results.
Measurement
How many measurement runs are performed in total.
Timeout
Threads
Number of threads. Note that @Fork sets the number of forked JVM processes, while the number of threads is set with @Threads.
Benchmark mode
The benchmarking mode.
Benchmark
Which piece of code to test.
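To make the difference between the warm-up and measurement phases concrete, here is a minimal hand-rolled sketch. The class WarmupThenMeasure and its methods are hypothetical names used only for illustration; this is not how JMH is implemented, and real benchmarks should still go through JMH, which also handles forking, dead-code elimination and statistics.

```java
// Sketch only: WarmupThenMeasure is a hypothetical helper, not part of JMH.
final class WarmupThenMeasure {
    // Runs the task for warmupMillis (results discarded), then for measureMillis,
    // and returns operations per second over the measurement phase only.
    static double opsPerSecond(Runnable task, long warmupMillis, long measureMillis) {
        runFor(task, warmupMillis);               // warm-up: lets the JIT compile hot code
        long ops = runFor(task, measureMillis);   // measurement: only this phase is counted
        return ops * 1000.0 / measureMillis;
    }

    // Repeats the task until the deadline passes and returns how many times it ran.
    private static long runFor(Runnable task, long millis) {
        long deadline = System.nanoTime() + millis * 1_000_000L;
        long ops = 0;
        while (System.nanoTime() < deadline) {
            task.run();
            ops++;
        }
        return ops;
    }
}
```

The warm-up call plays the role of @Warmup and the counted call plays the role of @Measurement in the PSTest example above.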
5. Official examples
Official examples: http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/
II. Disruptor
1. Introduction
Home page: http://lmax-exchange.github.io/disruptor/
Source code: https://github.com/LMAX-Exchange/disruptor
Getting Started: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
API: http://lmax-exchange.github.io/disruptor/docs/index.html
Maven: https://mvnrepository.com/artifact/com.lmax/disruptor

2. Disruptor characteristics
For comparison, ConcurrentLinkedQueue is implemented as a linked list
The JDK has no ConcurrentArrayQueue
Disruptor is implemented with an array
Lock-free and highly concurrent; it uses a ring buffer and directly overwrites old data (no cleanup needed), which reduces GC frequency
It implements an event-based producer-consumer model (observer pattern)
3. RingBuffer

A circular queue
The RingBuffer's sequence number points to the next available element
Implemented with an array, with no head and tail pointers
Compared with ConcurrentLinkedQueue, the array implementation is faster
If the length is 8, in which slot does the 12th element land? It is determined by 12 % 8
When the buffer is full, the producer decides whether to overwrite or to wait
The length is set to a power of 2, which allows a bitwise computation, for example: 12 % 8 = 12 & (8 - 1), that is, pos = num & (size - 1)
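The index calculation described above can be sketched as follows. RingIndex and its methods are hypothetical names for illustration and are not part of the Disruptor API; the sketch assumes non-negative sequence numbers.

```java
// Sketch only: RingIndex is a hypothetical helper, not part of the Disruptor API.
final class RingIndex {
    // True when size is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int size) {
        return size > 0 && (size & (size - 1)) == 0;
    }

    // Maps an ever-increasing sequence number onto a slot of the ring.
    // For a power-of-two size and a non-negative sequence,
    // (sequence & (size - 1)) equals (sequence % size).
    static int indexFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        System.out.println(indexFor(12, 8)); // same slot as 12 % 8
    }
}
```

With a length of 8, indexFor(12, 8) selects slot 4, the same result as 12 % 8, but computed with a single bitwise AND.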
4. Disruptor development steps
- Define the Event: the element in the queue that needs to be processed
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value="+value+
"}";
}
}
- Define the Event factory, which is used to fill the queue
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
This matters for efficiency: when the Disruptor is initialized, it calls the Event factory to allocate memory for the ringBuffer in advance,
so GC is triggered less often.
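The effect of preallocation can be illustrated with a small stand-alone sketch. PreallocatedRing is a hypothetical class written for this note, not the Disruptor RingBuffer: the factory fills every slot once at construction, and later "publishing" only mutates an existing object instead of allocating a new one.

```java
import java.util.function.Supplier;

// Sketch only: PreallocatedRing is a hypothetical class, not the Disruptor RingBuffer.
final class PreallocatedRing<T> {
    private final Object[] slots;
    private final int mask;

    // Calls the factory once per slot, so no event objects are allocated afterwards.
    PreallocatedRing(Supplier<T> factory, int size) {
        if (size <= 0 || (size & (size - 1)) != 0) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        this.slots = new Object[size];
        this.mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    // Returns the preallocated object for this sequence; the caller overwrites its fields.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }
}
```

Because sequence 0 and sequence 4 map to the same slot in a ring of size 4, the same object is reused on every lap, which is why old data is simply overwritten and nothing is left for the garbage collector.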
- Define the EventHandler (consumer), which processes the elements in the container
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
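public static long count = 0;
/**
 * @param event      the event published to the RingBuffer
 * @param sequence   the sequence number of the event
 * @param endOfBatch whether this is the last event in the current batch
 * @throws Exception if the handler fails
 */
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {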
count++;
System.out.println("["+Thread.currentThread().getName()+"]"+event+" Serial number :"+sequence);
}
}
- Main01: the official example:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
public class Main01 {
public static void main(String[] args) {
//The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer,must be power of 2
int ringBufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());
//Construct the handler
disruptor.handleEventsWith(new LongEventHandler());
//Start the Disruptor ,start all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// Official routine
long sequence = ringBuffer.next(); // Grab the next sequence
try{
LongEvent event = ringBuffer.get(sequence);// Get the entry in the Disruptor
event.setValue(8888L); //Fill with data
}finally {
ringBuffer.publish(sequence);
}
}
}
- Encapsulated example:
Modify the main program and write a producer that produces tasks; everything else is the same as in the previous example. LongEventProducer:
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.setValue(buffer.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
Main02:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class Main02 {
public static void main(String[] args) {
//The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer,must be power of 2
int ringBufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());
//Construct the handler
disruptor.handleEventsWith(new LongEventHandler());
//Start the Disruptor ,start all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long l = 0; l<100; l++) {
bb.putLong(0, l);
producer.onData(bb);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
disruptor.shutdown();
}
}
- Using a Translator:
Everything else is unchanged from the previous example; only the Producer is modified:
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.setValue(bb.getLong(0));
}
};
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer) {
ringBuffer.publishEvent(TRANSLATOR, buffer);
}
}
This step prepares for Java 8 lambda expressions. A Translator can also take multiple parameters.
- Using lambdas: this simplifies the code considerably; only the following two parts are needed:
Event:
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value="+value+
"}";
}
}
Main:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class Main {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("["+Thread.currentThread().getName()+"]"+event+" Serial number :"+sequence);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(Main::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(Main::translate, bb);
Thread.sleep(1000);
}
}
}
5. ProducerType: producer thread mode
ProducerType has two modes: ProducerType.MULTI and ProducerType.SINGLE
The default is MULTI, which means sequences are generated in a way that is safe for multiple threads
If you are sure there is only a single producer thread, you can specify SINGLE, which is more efficient
If there are multiple producers (multiple threads) but the mode is set to SINGLE, what happens?
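One way to reason about the question above is to compare an unsynchronized sequence counter with an atomic one. The sketch below is an assumption-laden illustration, not the Disruptor's sequencer code: PlainSequence stands in for a claim strategy that is only safe with one producer thread, and AtomicSequence stands in for one that is safe with many. All class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: these classes are hypothetical and do not mirror the Disruptor internals.
final class SequenceClaimDemo {
    // Plain counter: correct only when a single thread claims sequences.
    static final class PlainSequence {
        private long next = 0;
        long claim() { return next++; }
        long claimedSoFar() { return next; }
    }

    // Atomic counter: every claim is a distinct value even under contention.
    static final class AtomicSequence {
        private final AtomicLong next = new AtomicLong();
        long claim() { return next.getAndIncrement(); }
        long claimedSoFar() { return next.get(); }
    }

    // Lets several threads claim from both counters; returns {plain total, atomic total}.
    static long[] run(int threads, int perThread) {
        PlainSequence plain = new PlainSequence();
        AtomicSequence atomic = new AtomicSequence();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    plain.claim();
                    atomic.claim();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new long[] { plain.claimedSoFar(), atomic.claimedSoFar() };
    }

    public static void main(String[] args) {
        long[] totals = run(4, 100_000);
        System.out.println("plain=" + totals[0] + " atomic=" + totals[1]);
    }
}
```

The atomic total always equals threads * perThread. The plain total may come out lower, because two threads can read the same value and claim the same sequence; in a ring buffer that would mean two producers writing into the same slot, so one event overwrites the other.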
6. Wait strategies
1, (Commonly used) BlockingWaitStrategy: blocks the thread and waits for the producer to wake it up; once woken, it loops again to check whether the sequence it depends on has been consumed.
2, BusySpinWaitStrategy: the thread keeps spinning while it waits, which may consume more CPU.
3, LiteBlockingWaitStrategy: blocks the thread and waits for the producer to wake it up. Compared with BlockingWaitStrategy, the difference is signalNeeded.getAndSet: when one thread is in waitFor and another is in signalAll at the same time, the number of lock acquisitions can be reduced.
4, LiteTimeoutBlockingWaitStrategy: compared with LiteBlockingWaitStrategy, it sets a blocking timeout and throws an exception once the timeout is exceeded.
5, PhasedBackoffWaitStrategy: decides which wait strategy to use based on the time parameters and the wait strategy passed in.
6, TimeoutBlockingWaitStrategy: compared with BlockingWaitStrategy, it sets a wait time and throws an exception once that time is exceeded.
7, (Commonly used) YieldingWaitStrategy: tries 100 times, then calls Thread.yield() to give up the CPU.
8, (Commonly used) SleepingWaitStrategy: sleeps.
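The "try 100 times, then yield" idea from item 7 can be sketched as below. This is a simplified illustration under stated assumptions, not the library's YieldingWaitStrategy source: SpinThenYieldWait, waitFor and the cursor parameter are hypothetical names, and the cursor is assumed to be an AtomicLong that the producer advances when it publishes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: a simplified spin-then-yield wait loop, not the Disruptor implementation.
final class SpinThenYieldWait {
    private static final int SPIN_TRIES = 100;

    // Waits until the cursor has reached the requested sequence
    // and returns the cursor value that was observed.
    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 0) {
                counter--;       // busy-spin phase: re-check immediately
            } else {
                Thread.yield();  // afterwards, offer the CPU to other threads between checks
            }
        }
        return available;
    }
}
```

A blocking strategy would replace the loop body with a lock and condition wait, trading lower CPU usage for higher wake-up latency.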
7. Specifying consumers
By default there is only one consumer, and having several is just as simple. The code below defines two consumers, h1 and h2, and passes them to disruptor.handleEventsWith(h1, h2). The method takes a variable number of arguments, so you can pass in as many consumers as you need. Each consumer runs on its own thread.
import java.util.concurrent.*;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.ProducerType;
public class Main06_MultiConsumer{
public static void main(String[] args) throws Exception{
//the factory for the event
LongEventFactory factory = new LongEventFactory();
//Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory,bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI,new SleepingWaitStrategy());
//Connect the handlers
LongEventHandler h1 = new LongEventHandler();
LongEventHandler h2 = new LongEventHandler();
disruptor.handleEventsWith(h1,h2);
//Start the Disruptor,start all threads running
disruptor.start();
//Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//========================================================================
final int threadCount = 10;
CyclicBarrier barrier=new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for(long i=0; i<threadCount; i++){
final long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);
try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.setValue(threadNum);
System.out.println(" Produced "+threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
8. Consumer exception handling
Default: disruptor.setDefaultExceptionHandler()
Override: disruptor.handleExceptionsFor().with()
In the code below, the EventHandler is our consumer: it prints the event and then immediately throws an exception. When a consumer throws, the whole thread must not stop, and an exception in one consumer must not keep the other consumers from working. handleExceptionsFor(h1) selects the consumer to attach an exception handler to, and with(...) supplies the ExceptionHandler that decides what to do after an exception occurs. It overrides three methods: handleEventException, which here simply prints the exception; handleOnStartException, for an exception during startup; and handleOnShutdownException, for an exception during shutdown.
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
public class Main07_ExceptionHandler{
public static void main(String[] args) throws InterruptedException {
//the factory for the event
LongEventFactory factory = new LongEventFactory();
//Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory,bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI,new SleepingWaitStrategy());
//Connect the handlers
EventHandler<LongEvent> h1 = (event, sequence, end)->{
System.out.println(event);
throw new Exception(" The consumer is abnormal ");
};
disruptor.handleEventsWith(h1);
disruptor.handleExceptionsFor(h1).with(new ExceptionHandler<LongEvent>(){
@Override
public void handleEventException(Throwable throwable,long l,LongEvent longEvent){
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable){
System.out.println("Exception Start to Handle!");
}
@Override
public void handleOnShutdownException(Throwable throwable){
System.out.println("Exception End to Handle!");
}
});
//Start the Disruptor,start all threads running
disruptor.start();
//Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//========================================================================
final int threadCount = 1;
CyclicBarrier barrier=new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for(long i=0; i<threadCount; i++){
final long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);
try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.setValue(threadNum);
System.out.println(" Produced "+threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
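The reason a per-consumer exception handler matters can be shown with a small stand-alone sketch. ResilientConsumer and its two nested interfaces are hypothetical names that only imitate the shape of the EventHandler and ExceptionHandler callbacks; this is not how the Disruptor's event processor is written.

```java
// Sketch only: hypothetical types that imitate the handler/exception-handler split.
final class ResilientConsumer {
    interface Handler {
        void onEvent(long value) throws Exception;
    }

    interface ErrorHandler {
        void handleEventException(Throwable error, long sequence, long value);
    }

    // Processes every event in order. A failing event is reported to onError
    // and the loop moves on, so one bad event does not stop the consumer.
    // Returns the number of events whose handler threw.
    static int consume(long[] events, Handler handler, ErrorHandler onError) {
        int failures = 0;
        for (int sequence = 0; sequence < events.length; sequence++) {
            try {
                handler.onEvent(events[sequence]);
            } catch (Exception e) {
                failures++;
                onError.handleEventException(e, sequence, events[sequence]);
            }
        }
        return failures;
    }
}
```

If the exception were allowed to escape the loop instead, every event after the failing one would go unprocessed, which is the situation the ExceptionHandler in Main07_ExceptionHandler is meant to prevent.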
Summary
The Disruptor is a ring on which multiple producers can publish, and the ring design is what makes it so efficient. When writing a program with it, you first define the format of your own Event message, then define the message factory. The factory is used to initialize the whole ring: every slot gets its message object created up front, so the space is already allocated. During production you only take the preallocated object at the claimed position and fill in its value. Once the value is filled in, consumers can consume it, and after consumption producers can reuse the slot. If the producer produces quickly and consumers consume slowly, the ring fills up, and the various wait strategies decide what happens then. When a consumer fails, an ExceptionHandler can be used to handle the problem.