当前位置:网站首页>30行自己写并发工具类(Semaphore, CyclicBarrier, CountDownLatch)
30行自己写并发工具类(Semaphore, CyclicBarrier, CountDownLatch)
2022-07-25 15:32:00 【InfoQ】
public static int[] data = new int[10];
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int temp = i;
new Thread(() -> {
Random random = new Random();
data[temp] = random.nextInt(100001);
latch.countDown();
}).start();
}
// 只有函数 latch.countDown() 至少被调用10次
// 主线程才不会被阻塞
// 这个10是在CountDownLatch初始化传递的10
latch.await();
System.out.println("求和结果为:" + Arrays.stream(data).sum());
}
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "开始等待");
// 所有线程都会调用这行代码
// 在这行代码调用的线程个数不足5
// 个的时候所有的线程都会阻塞在这里
// 只有到5的时候,这5个线程才会被放行
// 所以这行代码叫做同步点
barrier.await();
// 如果有第六个线程执行这行代码时
// 第六个线程也会被阻塞 知道第10
// 线程执行这行代码 6-10 这5个线程
// 才会被放行
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "等待完成");
}).start();
}
}
public MyCountDownLatch(int targetValue) {
// 我们需要有一个变量去保存计数器的值
this.curValue = targetValue;
}
public void countDown() {
// curValue 是一个共享变量
// 我们需要用锁保护起来
// 因此每次只有一个线程进入 lock 保护
// 的代码区域
lock.lock();
try {
// 每次执行 countDown 计数器都需要减一
// 而且如果计数器等于0我们需要唤醒哪些被
// await 函数阻塞的线程
curValue--;
if (curValue <= 0)
condition.signalAll();
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
public void await() {
lock.lock();
try {
// 如果 curValue 的值大于0
// 则说明 countDown 调用次数还不够
// 需要将线程挂起 否则直接放行
if (curValue > 0)
// 使用条件变量 condition 将线程挂起
condition.await();
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "latch执行完成");
}).start();
}
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int threadCount;
private int currentThreadNumber;
private Runnable runnable;
public MyBarrier(int count) {
threadCount = count;
}
/**
* 允许传入一个 runnable 对象
* 当放行一批线程的时候就执行这个 runnable 函数
* @param count
* @param runnable
*/
public MyBarrier(int count, Runnable runnable) {
this(count);
this.runnable = runnable;
}
public void await() {
lock.lock();
currentThreadNumber++;
try {
// 如果阻塞的线程数量不到 threadCount 需要进行阻塞
// 如果到了需要由这个线程唤醒其他线程
if (currentThreadNumber == threadCount) {
// 放行之后需要重新进行计数
// 因为放行之后 condition.await();
// 阻塞的线程个数为 0
currentThreadNumber = 0;
if (runnable != null) {
new Thread(runnable).start();
}
// 唤醒 threadCount - 1 个线程 因为当前这个线程
// 已经是在运行的状态 所以只需要唤醒 threadCount - 1
// 个被阻塞的线程
for (int i = 1; i < threadCount; i++)
condition.signal();
}else {
// 如果数目还没有达到则需要阻塞线程
condition.await();
}
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "进入阻塞");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
barrier.await();
System.out.println(Thread.currentThread().getName() + "阻塞完成");
}).start();
}
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int semCount;
private int curCount;
public MySemaphore(int semCount) {
this.semCount = semCount;
}
public void acquire() {
lock.lock();
try {
// 正在执行临界区代码的线程个数加一
curCount++;
// 如果线程个数大于指定的能够执行的线程个数
// 需要将当前这个线程阻塞起来
// 否则直接放行
if (curCount > semCount) {
condition.await();
}
}catch (Exception ignored) {}
finally {
lock.unlock();
}
}
public void release() {
lock.lock();
try {
// 线程执行完临界区的代码
// 将要离开临界区 因此 curCount
// 需要减一
curCount--;
// 如果有线程阻塞需要唤醒被阻塞的线程
// 如果没有被阻塞的线程 这个函数执行之后
// 对结果也不会产生影响 因此在这里不需要进行
// if 判断
condition.signal();
// signal函数只对在调用signal函数之前
// 被await函数阻塞的线程产生影响 如果
// 某个线程调用 await 函数在 signal 函数
// 执行之后,那么前面那次 signal 函数调用
// 不会影响后面这次 await 函数
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
边栏推荐
- GAMES101复习:变换
- Pytorch框架练习(基于Kaggle Titanic竞赛)
- 盒子躲避鼠标
- Cf888g clever dictionary tree + violent divide and conquer (XOR minimum spanning tree)
- GAMES101复习:三维变换
- MySQL—用户和权限管控
- Pat grade a 1151 LCA in a binary tree (30 points)
- IDEA—点击文件代码与目录自动同步对应
- Leetcode - 677 key value mapping (Design)*
- Leetcode - 380 o (1) time to insert, delete and get random elements (design hash table + array)
猜你喜欢

LeetCode - 677 键值映射(设计)*

LeetCode - 232 用栈实现队列 (设计 双栈实现队列)

获取键盘按下的键位对应ask码

MySQL—常用SQL语句整理总结

《图书馆管理系统——“借书还书”模块》项目研发阶段性总结

Leetcode - 380 o (1) time to insert, delete and get random elements (design hash table + array)

Get the ask code corresponding to the key pressed by the keyboard

Gary Marcus: 学习语言比你想象的更难

MATLAB读取显示图像时数据格式转换原因

Are you ready to break away from the "involution circle"?
随机推荐
Week303 of leetcode
GAMES101复习:三维变换
Cf750f1 thinking DP
使用cpolar建立一个商业网站(如何购买域名)
Window system black window redis error 20creating server TCP listening socket *: 6379: listen: unknown error19-07-28
Cf888g clever dictionary tree + violent divide and conquer (XOR minimum spanning tree)
对this对象的理解
var、let、const之间的区别
Cf365-e - Mishka and divisors, number theory +dp
Leetcode - 622 design cycle queue (Design)
Qtime定义(手工废物利用简单好看)
C # fine sorting knowledge points 9 Set 2 (recommended Collection)
PAT甲级题目目录
MySQL - user and permission control
Pytorch学习笔记-Advanced_CNN(Using Inception_Module)实现Mnist数据集分类-(注释及结果)
2021上海市赛-H-二分答案
带你详细认识JS基础语法(建议收藏)
2021 Shanghai match-b-ranked DP
Idea eye care settings
ZOJ - 4114 Flipping Game-dp,合理状态表示