当前位置:网站首页>Using reentrantlock and synchronized to implement blocking queue
Using ReentrantLock and synchronized to implement a blocking queue
2022-06-25 15:20:00 【xiao326791055】
This article implements a blocking queue twice: once with JUC (java.util.concurrent) classes and once without them.
1: Implementing a blocking queue with the JUC class ReentrantLock
1.1: The implementation calls lock.newCondition() twice to create two Condition objects, blocks with Condition.await(), and wakes waiting threads with signalAll(). Condition's await/signalAll are analogous to Object's wait/notifyAll. A separate follow-up article will cover Condition in more detail.
package com.xiao.bloackQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded FIFO blocking queue built on a {@link ReentrantLock} and two
 * {@link Condition}s (one for producers, one for consumers).
 *
 * <p>All state is guarded by {@code lock}. Producers wait on
 * {@code putCondition} while the queue is full; consumers wait on
 * {@code takeCondition} while it is empty.
 *
 * <p>Because {@link #take()} returns {@code null} when the calling thread is
 * interrupted, storing {@code null} elements makes that case ambiguous.
 *
 * @param <E> element type
 */
public class BlockingQueueJuc<E> {
    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_MAX_COUNT = 1 << 30;
    /** Upper limit for the initial size of the backing list. */
    private static final int INITIAL_CAPACITY = 16;

    /** Maximum number of elements the queue may hold. */
    private final int maxcount;
    /** Backing store; index 0 is the head of the queue. */
    private final List<E> containers;
    private final Lock lock = new ReentrantLock();
    /** Signalled when an element becomes available. */
    private final Condition takeCondition = lock.newCondition();
    /** Signalled when a free slot becomes available. */
    private final Condition putCondition = lock.newCondition();

    /**
     * Creates a queue holding at most {@code maxcount} elements.
     *
     * @param maxcount maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code maxcount} is not positive
     */
    public BlockingQueueJuc(int maxcount) {
        if (maxcount <= 0) {
            throw new IllegalArgumentException("maxcount must be positive: " + maxcount);
        }
        this.maxcount = maxcount;
        // Do not pre-allocate the full bound: a large bound would exhaust the heap up front.
        this.containers = new ArrayList<>(Math.min(maxcount, INITIAL_CAPACITY));
    }

    /**
     * Creates a queue with the default capacity of {@code 1 << 30} elements.
     */
    public BlockingQueueJuc() {
        this(DEFAULT_MAX_COUNT);
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted before or while waiting, the
     * element is NOT added, the thread's interrupt status is restored and the
     * method returns.
     *
     * @param node element to append
     */
    public void put(E node) {
        // Acquire outside the try/finally so that unlock() only runs when the lock is held.
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            // Loop: await() may return spuriously, and another producer may have refilled the queue.
            while (containers.size() >= maxcount) {
                putCondition.await();
            }
            containers.add(node);
            System.out.println("+++++++++++++++++ Add parameter , The current queue parameters are :"
                    + containers.size() + "+++++++++++++++++++++++++");
            // Exactly one element was added, so waking one consumer is enough.
            takeCondition.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while it is empty.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted before or while waiting (the interrupt status is
     *         restored in that case)
     */
    public E take() {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            // Loop for the same reason as in put(): re-check the predicate after every wakeup.
            while (containers.isEmpty()) {
                takeCondition.await();
            }
            E node = containers.remove(0);
            System.out.println("------------------------- Take out the parameters , The current queue parameters are :"
                    + containers.size() + "-------------------------");
            // Exactly one slot was freed, so waking one producer is enough.
            putCondition.signal();
            return node;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     *
     * @return current element count
     */
    public int size() {
        lock.lock();
        try {
            return containers.size();
        } finally {
            lock.unlock();
        }
    }
}
1.2: Code testing
package com.xiao.bloackQueue;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo driver for {@code BlockingQueueJuc}: creates 100 producer threads and
 * 100 consumer threads that share one queue, then starts them all.
 */
public class BlockingQueueJucThread {
    /**
     * Builds the producer and consumer threads and starts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Parameterized instead of raw: producers put thread names (Strings).
        BlockingQueueJuc<String> blockingQueueJuc = new BlockingQueueJuc<>();
        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(new Thread(new Produer(blockingQueueJuc)));
        }
        for (int i = 0; i < 100; i++) {
            list.add(new Thread(new Consumer(blockingQueueJuc)));
        }
        // Start only after every thread has been created, as the original did.
        list.forEach(Thread::start);
    }
}
/**
 * Runnable that takes a single element from the shared queue and discards it.
 */
class Consumer implements Runnable {
    /** Queue to consume from; the element type is irrelevant to this consumer. */
    final BlockingQueueJuc<?> blockingQueueJuc;

    /**
     * @param blockingQueueJuc queue to take one element from
     */
    public Consumer(BlockingQueueJuc<?> blockingQueueJuc) {
        this.blockingQueueJuc = blockingQueueJuc;
    }

    /** Takes one element from the queue; the result is intentionally unused. */
    @Override
    public void run() {
        blockingQueueJuc.take();
    }
}
class Produer implements Runnable{
BlockingQueueJuc blockingQueueJuc;
public Produer(BlockingQueueJuc blockingQueueJuc){
this.blockingQueueJuc = blockingQueueJuc;
}
@Override
public void run() {
blockingQueueJuc.put(Thread.currentThread().getName());
}
2: Use synchronized Implement blocking queue
2.1:synchronized Implement the blocking queue code
package com.xiao.bloackQueue;
import java.util.ArrayList;
import java.util.List;
/**
 * A bounded FIFO blocking queue built only on {@code synchronized},
 * {@link Object#wait()} and {@link Object#notifyAll()}.
 *
 * <p>All state is guarded by one private monitor. Producers and consumers
 * both wait on that monitor and always re-check their condition in a loop
 * after waking up.
 *
 * <p>Because {@link #toke()} returns {@code null} when the calling thread is
 * interrupted, storing {@code null} elements makes that case ambiguous.
 *
 * @param <E> element type
 */
public class BlockingQueueNotJuc<E> {
    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_MAX_COUNT = 1 << 30;
    /** Upper limit for the initial size of the backing list. */
    private static final int INITIAL_CAPACITY = 16;

    /** Maximum number of elements the queue may hold. */
    private final int maxcount;
    /** Backing store; index 0 is the head of the queue. */
    private final List<E> containers;
    /**
     * Single monitor guarding {@code containers} and {@code notalil}. Using one
     * monitor for both put and toke is what makes access to the shared list safe.
     */
    private final Object monitor = new Object();
    /**
     * True while at least one consumer has found the queue empty and has not
     * yet been notified by a producer. Written only while holding {@code monitor};
     * kept package-visible and volatile as in the original declaration.
     */
    volatile boolean notalil = false;

    /**
     * Creates a queue holding at most {@code maxcount} elements.
     *
     * @param maxcount maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code maxcount} is not positive
     */
    public BlockingQueueNotJuc(int maxcount) {
        if (maxcount <= 0) {
            throw new IllegalArgumentException("maxcount must be positive: " + maxcount);
        }
        this.maxcount = maxcount;
        // Do not pre-allocate the full bound: a large bound would exhaust the heap up front.
        this.containers = new ArrayList<>(Math.min(maxcount, INITIAL_CAPACITY));
    }

    /**
     * Creates a queue with the default capacity of {@code 1 << 30} elements.
     */
    public BlockingQueueNotJuc() {
        this(DEFAULT_MAX_COUNT);
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while (or before) waiting, the
     * element is NOT added, the interrupt status is restored and the method
     * returns.
     *
     * @param node element to append
     */
    public void put(E node) {
        synchronized (monitor) {
            try {
                // Loop: wait() may return spuriously, and another producer may have refilled the queue.
                while (containers.size() >= maxcount) {
                    monitor.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            containers.add(node);
            if (notalil) {
                // Every waiting consumer set the flag under this monitor before waiting,
                // so notifying only when the flag is set cannot lose a wakeup.
                System.out.println(" Wake up blocked threads ");
                notalil = false;
                monitor.notifyAll();
            }
            System.out.println("+++++++++++++++++ Add parameter , The current queue parameters are :"
                    + containers.size() + "+++++++++++++++++++++++++");
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while it is empty.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while (or before) waiting; the interrupt status is
     *         restored in that case
     */
    public E toke() {
        synchronized (monitor) {
            try {
                while (containers.isEmpty()) {
                    // Re-armed on every iteration so a producer knows a consumer is (still) waiting.
                    notalil = true;
                    System.out.println(Thread.currentThread().getName() + ": Go into blocking mode ");
                    monitor.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
            System.out.println(Thread.currentThread().getName()
                    + "-------------- To obtain parameters , The current queue parameters are :"
                    + containers.size() + "---------------------");
            boolean wasFull = containers.size() == maxcount;
            E node = containers.remove(0);
            if (wasFull) {
                // Producers only wait while the queue is full, so the full -> not-full
                // transition is the only moment they need to be woken.
                monitor.notifyAll();
            }
            return node;
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     *
     * @return current element count
     */
    public int size() {
        synchronized (monitor) {
            return containers.size();
        }
    }
}
2.2: Test code
package com.xiao.bloackQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo driver for {@code BlockingQueueNotJuc}: starts 20 producer threads and
 * 20 consumer threads that each perform 100 operations on one shared queue.
 */
public class BlockQueueThread {
    /** Number of producer threads and, separately, of consumer threads. */
    private static final int THREADS_PER_ROLE = 20;
    /** Number of put/toke calls each thread performs. */
    private static final int OPERATIONS_PER_THREAD = 100;

    /**
     * Starts the producer threads first, then the consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueueNotJuc<String> queueNotJuc = new BlockingQueueNotJuc<>();
        for (int i = 0; i < THREADS_PER_ROLE; i++) {
            new Thread(new PutThread(queueNotJuc, i + "")).start();
        }
        for (int i = 0; i < THREADS_PER_ROLE; i++) {
            new Thread(new TakeThread(queueNotJuc, i + "")).start();
        }
    }

    /** Consumer task: takes {@code OPERATIONS_PER_THREAD} elements and discards them. */
    static class TakeThread implements Runnable {
        BlockingQueueNotJuc<String> queueNotJuc;
        String name;

        /**
         * @param queueNotJuc queue to consume from
         * @param name        label for this task (stored but not used by {@link #run()})
         */
        TakeThread(BlockingQueueNotJuc<String> queueNotJuc, String name) {
            this.queueNotJuc = queueNotJuc;
            this.name = name;
        }

        @Override
        public void run() {
            for (int i = 0; i < OPERATIONS_PER_THREAD; i++) {
                queueNotJuc.toke();
            }
        }
    }

    /** Producer task: puts {@code OPERATIONS_PER_THREAD} labelled strings into the queue. */
    static class PutThread implements Runnable {
        BlockingQueueNotJuc<String> queueNotJuc;
        String name;

        /**
         * @param queueNotJuc queue to produce into
         * @param name        label embedded in every element this task puts
         */
        PutThread(BlockingQueueNotJuc<String> queueNotJuc, String name) {
            this.queueNotJuc = queueNotJuc;
            this.name = name;
        }

        @Override
        public void run() {
            for (int i = 0; i < OPERATIONS_PER_THREAD; i++) {
                queueNotJuc.put(" The thread of :" + name);
            }
        }
    }
}
边栏推荐
- Basic syntax and common commands of R language
- Go language modifies / removes multiple line breaks in strings
- Modal and modeless dialogs for QT
- Leetcode123 timing of buying and selling stocks III
- Qmake uses toplevel or topbuilddir
- 15 -- k points closest to the origin
- C language LNK2019 unresolved external symbols_ Main error
- Dynamic memory allocation
- Judging the number of leap years from 1 to N years
- google_ Breakpad crash detection
猜你喜欢
Character encoding minutes
Iterator failure condition
Common dynamic memory errors
Sequential programming 1
[paper notes] semi supervised object detection (ssod)
Mining procedure processing
How to cut the size of a moving picture? Try this online photo cropping tool
SPARQL learning notes of query, an rrdf query language
1090.Phone List
What moment makes you think there is a bug in the world?
随机推荐
QT loading third-party library basic operation
Software packaging and deployment
3. Sequential structure multiple choice questions
[C language] implementation of magic square array (the most complete)
One question per day, punch in
Qcodeeditor - QT based code editor
(2) Relational database
One question per day, a classic simulation question
Learning notes on February 18, 2022 (C language)
Judging the number of leap years from 1 to N years
Work of the first week
Qmake uses toplevel or topbuilddir
How to download and install Weka package
The last glory of the late Ming Dynasty - the battle of Korea
Use Matplotlib to draw a line chart
Luogu p5707 [deep foundation 2. example 12] late for school
Data feature analysis skills - correlation test
(1) Introduction
QT database connection
[try to hack] vulhub shooting range construction