当前位置:网站首页>Multithreading and high concurrency Day11
Multithreading and high concurrency Day11
2022-07-23 18:55:00 【Dream qiusun~】
Tips : When the article is finished , Directories can be generated automatically , How to generate it, please refer to the help document on the right
List of articles
Preface
Personal study notes , For reference only ! Welcome to correct !
One 、Java Microbenchmark Harness(JMH)
1. brief introduction
yes Java A tool for benchmarking , The tool consists of OpenJDK Provide and maintain , The test results are reliable .
2. Official website
JMH Official website :http://openjdk.java.net/projects/code-tools/jmh/
3. establish JMH test
- establish Maven project , Add dependency :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<encoding>UTF-8</encoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<groupId>mashibing.com</groupId>
<artifactId>HelloJMH2</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.21</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
idea install JMH plug-in unit JMH plugin v1.0.3
Due to the use of annotations , Open the running program annotation configuration
compiler -> Annotation Processors -> Enable Annotation Processing
Define the class that needs to be tested PS (ParallelStream)
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class PS {
static List<Integer> nums = new ArrayList<>();
static {
Random r = new Random();
for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
}
static void foreach() {
nums.forEach(v->isPrime(v));
}
static void parallel() {
nums.parallelStream().forEach(PS::isPrime);
}
static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) return false;
}
return true;
}
}
- Write unit tests :
public class PSTest {
@Benchmark
@Warmup(iterations = 1,time = 3)
@Fork(5)
@BenchmarkMode(Mode.Throughput)
@Measurement(iterations = 1,time = 3)
public void testForEach() {
PS.foreach();
}
}
- Run test class , If you encounter the following error :
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock ( Access denied .), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
at org.openjdk.jmh.Main.main(Main.java:71)
This mistake is because JMH Run the that requires access to the system TMP Catalog , The solution is :
open RunConfiguration -> Environment Variables -> include system environment viables
- Read the test report
4.JMH Basic concepts in
Warmup
preheating , because JVM There will be optimizations for specific code in ( localization ), Preheating is important for test resultsMesurement
How many tests are performed in totalTimeout
Threads
Number of threads , from fork AppointBenchmark mode
Benchmarking patternsBenchmark
Which piece of code to test
5. Official example
Official example :http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/
Two 、Disruptor
1. Introduce
Home page :http://lmax-exchange.github.io/disruptor/
Source code :https://github.com/LMAX-Exchange/disruptor
GettingStarted: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
api: http://lmax-exchange.github.io/disruptor/docs/index.html
maven: https://mvnrepository.com/artifact/com.lmax/disruptor

2.Disruptor Characteristics
contrast ConcurrentLinkedQueue : Linked list implementation
JDK There is no ConcurrentArrayQueue
Disruptor It's an array implementation
unlocked , High concurrency , Use the ring Buffer, Directly covered ( No need to clean up ) Old data , Reduce GC frequency
The event based producer consumer model is realized ( Observer mode )
3.RingBuffer

The circular queue
RingBuffer The serial number of , Point to the next available element
Using arrays to implement , No head and tail pointers
contrast ConcurrentLinkedQueue, It's faster to implement with arrays
If the length is 8, When added to section 12 On which sequence number is an element ? use 12%8 decision
When Buffer Whether to cover or wait when it is filled , from Producer decision
The length is set to 2 Of n The next power , Conducive to binary computing , for example :12%8 = 12 & (8 - 1) pos = num & (size -1)
4.Disruptor Development steps
- Definition Event - The elements in the queue that need to be processed
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value="+value+
"}";
}
}
- Definition Event factory , Used to fill the queue
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
Efficiency is involved here :disruptor When initializing , Would call Event factory , Yes ringBuffer Allocate memory in advance
GC The frequency of production will decrease
- Definition EventHandler( consumer ), Dealing with elements in containers
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
/** * * @param event * @param sequence * @param endOfBatch * @throws Exception */
public static long count = 0;
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
count++;
System.out.println("["+Thread.currentThread().getName()+"]"+event+" Serial number :"+sequence);
}
}
- Main01: Official routine :
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
public class Main01 {
public static void main(String[] args) {
//The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer,must be power of 2
int ringBufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());
//Construct the handler
disruptor.handleEventsWith(new LongEventHandler());
//Start the Disruptor ,start all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// Official routine
long sequence = ringBuffer.next(); // Grab the next sequence
try{
LongEvent event = ringBuffer.get(sequence);// Get the entry in the Disruptor
event.setValue(8888L); //Fill with data
}finally {
ringBuffer.publish(sequence);
}
}
}
- Examples after encapsulation :
modify Main Program and write a producer to produce tasks , With other 4 In the same way ===》LongEventProducer:
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.setValue(buffer.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
Main02:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class Main02 {
public static void main(String[] args) {
//The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer,must be power of 2
int ringBufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, Executors.defaultThreadFactory());
//Construct the handler
disruptor.handleEventsWith(new LongEventHandler());
//Start the Disruptor ,start all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long l = 0; l<100; l++) {
bb.putLong(0, l);
producer.onData(bb);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
disruptor.shutdown();
}
}
- Use Translator The way :
Other unchanged and 5 identical ,Producer modify :
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.setValue(bb.getLong(0));
}
};
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer) {
ringBuffer.publishEvent(TRANSLATOR, buffer);
}
}
This step is for java8 Of lambda Expression preparation ,Translator You can also use multiple parameters .
- Use Lambda The way : It can simplify the code a lot , Only the following two parts are needed :
Event:
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value="+value+
"}";
}
}
Main:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class Main {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("["+Thread.currentThread().getName()+"]"+event+" Serial number :"+sequence);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(Main::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(Main::translate, bb);
Thread.sleep(1000);
}
}
}
5.ProducerType Producer thread mode
ProducerType There are two patterns Producer.MULTI and Producer.SINGLE
The default is MULTI, Indicates that the generated in multithreaded mode sequence
If you confirm that you are a single threaded producer , Then you can specify SINGLE, Efficiency will increase
If there are multiple producers ( Multithreading ), But the mode is specified as SINGLE, What's going to happen ?
6. Wait strategy
1,( Commonly used )BlockingWaitStrategy: Through thread blocking , Wait for the producer to wake up , After being awakened , The recirculation check depends on sequence Have you consumed .
2,BusySpinWaitStrategy: The thread has been spinning and waiting , It may be more expensive cpu
3,LiteBlockingWaitStrategy: Thread blocking waits for the producer to wake up , And BlockingWaitStrategy comparison , The difference is signalNeeded.getAndSet, If two threads access one at the same time waitfor, A visit signalAll when , Can reduce the lock Lock times .
4,LiteTimeoutBlockingWaitStrategy: And LiteBlockingWaitStrategy comparison , Blocking time is set , Throw an exception after time .
5,PhasedBackoffWaitStrategy: According to the time parameter and the incoming waiting policy, which waiting policy to use
6,TimeoutBlockingWaitStrategy: be relative to BlockingWaitStrategy Come on , Set the waiting time , Beyond the backward throw exception
7,( Commonly used )YieldingWaitStrategy: Try 100 Time , then Thread.yield() Give up cpu
8,( Commonly used )SleepingWaitStrategy : sleep
7. Consumer designation
Let's see how multiple consumers specify , By default, there is only one consumer , It's also very simple when you want to have multiple consumers , Looking at the code below, I define two consumers h1、h2,disruptor.handleEventsWith(h1,h2) Here is a variable parameter , So if you want to have multiple consumers, put it in , Multiple consumers are located in multiple threads .
import java.util.concurrent.*;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.ProducerType;
public class Main06_MultiConsumer{
public static void main(String[] args) throws Exception{
//the factory for the event
LongEventFactory factory = new LongEventFactory();
//Specify the of the ring buffer,must be power of 2.
int bufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory,bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI,new SleepingWaitStrategy());
//Connect the handlers
LongEventHandler h1 = new LongEventHandler();
LongEventHandler h2 = new LongEventHandler();
disruptor.handleEventsWith(h1,h2);
//Start the Disruptor,start all threads running
disruptor.start();
//Get the ring buffer form the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//========================================================================
final int threadCount = 10;
CyclicBarrier barrier=new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for(long i=0; i<threadCount; i++){
final long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);
try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println(" Produced "+threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
8. Consumer exception handling
Default :disruptor.setDefaultExceptionHandler()
Cover :disruptor.handleExceptionFor().with()
Look at the code below , In this method, there is a EventHandler It's our consumers , Printed in the consumer event Then immediately threw an exception , When our consumer has an exception, you can't stop the whole thread , If one consumer is abnormal, the other consumers won't work , Definitely not .handleExceptionsFor Specify... For consumers Exception processor (h1).with In the back is our ExceptionHandler What should I do after an exception occurs , Rewrite three methods , The first one is that when an exception occurs, it is simply printed out ; The second is handleOnStart If something goes wrong during startup ; Third handleOnShutdown What should you do with .
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
public class Main07_ExceptionHandler{
public static void main(String[] args) throws InterruptedException {
//the factory for the event
LongEventFactory factory = new LongEventFactory();
//Specify the of the ring buffer,must be power of 2.
int bufferSize = 1024;
//Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory,bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI,new SleepingWaitStrategy());
//Connect the handlers
EventHandler h1 = (event, sequence, end)->{
System.out.println(event);
throw new Exception(" The consumer is abnormal ");
};
disruptor.handleEventsWith(h1);
disruptor.handleExceptionsFor(h1).with(new ExceptionHandler<LongEvent>(){
@Override
public void handleEventException(Throwable throwable,long l,LongEvent longEvent){
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable){
System.out.println("Exception Start to Handle!");
}
@Override
public void handleOnShutdownException(Throwable throwable){
System.out.println("Exception End to Handle!");
}
});
//Start the Disruptor,start all threads running
disruptor.start();
//Get the ring buffer form the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//========================================================================
final int threadCount = 1;
CyclicBarrier barrier=new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for(long i=0; i<threadCount; i++){
final long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);
try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println(" Produced "+threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
summary
disruptor It's a ring. , Then there are multiple producers in this ring that can produce , Because it is a ring design, the efficiency will be very high , This is what we wrote when we wrote the program , First, you define it yourself Event Format of message , Then define the message factory , The message factory is used to initialize the whole ring. When the message factory is used to initialize the whole ring, all kinds of different messages in some corresponding positions are sent to it first new come out ,new Take up space after you come out , During production, we only need to take out the default space in this position and fill in the value , After filling in the value, consumers can spend inside , After consumption, producers can continue to produce inside , If your producer consumes faster , Consumption is slow , What to do when it's full , Is to use all kinds of waiting strategies , When consumers have problems, they can use ExceptionHandler To process .
边栏推荐
- 类的基础
- The first layer of OSI model: physical layer, the cornerstone of existence!
- Completion report of communication software development and Application
- Modeling just learning is very confused. How to learn the next generation role modeling process?
- EmguCV 常用函数功能说明「建议收藏」
- Use three JS realize the 'ice cream' earth, and let the earth cool for a summer
- [2020] [paper notes] phase change materials and Hypersurfaces——
- The original path is not original [if there is infringement, please contact the original blogger to delete]
- Past weaving of zero one-2022
- Database modeling
猜你喜欢
![[2020] [paper notes] phase change materials and Hypersurfaces——](/img/cc/a69afb3acd4b73a17dbbe95896404d.png)
[2020] [paper notes] phase change materials and Hypersurfaces——

【2018】【论文笔记】石墨烯场效应管及【2】——石墨烯的制备、转移

As a senior 3D modeler, I give some suggestions to novice learning partners to use
![[heavyweight] focusing on the terminal business of securities companies, Borui data released a new generation of observable platform for the core business experience of securities companies' terminals](/img/28/8d9f33ad6f54d6344429a687a7d1d7.png)
[heavyweight] focusing on the terminal business of securities companies, Borui data released a new generation of observable platform for the core business experience of securities companies' terminals

Error reporting caused by the use of responsebodyadvice interface and its solution

类的基础

LeetCode 剑指 Offer II 115.重建序列:图解 - 拓扑排序

Google正在改进所有产品中的肤色表现 践行“图像公平”理念

Analysis on the implementation of Flink exactly once delivery

零基础要学建模该从何开始?如何才能学好游戏建模?
随机推荐
识别引擎ocropy-&gt;ocropy2-&gt;OCRopus3总结
[onnx] the problem of dynamic input size (multi output / multi input)
Where should we start to learn modeling from zero foundation? How to learn game modeling well?
LM393 low power dual voltage comparator parameters, pins, application details
Jumpserver administrator account is locked
Common problems of sklearn classifier
[heavyweight] focusing on the terminal business of securities companies, Borui data released a new generation of observable platform for the core business experience of securities companies' terminals
Have a safe summer vacation, no holidays! Please keep these summer safety tips
《通信软件开发与应用》课程结业报告
并非原创的原文路径【如有侵权 请原博主联系删除】
学次世代建模是场景好还是角色好?选对职业薪资多一半
【2013】【论文笔记】太赫兹波段纳米颗粒表面增强拉曼——
Spark 安装与启动
JUC并发编程【详解及演示】
[2020] [paper notes] Based on Rydberg atom——
398. Random number index hash table method
图的存储结构与方法(二)
LeetCode 剑指 Offer II 115.重建序列:图解 - 拓扑排序
[attack and defense world web] difficulty four-star 12 point advanced question: cat
电子元件-电阻