当前位置:网站首页>线程池和生产者消费者模型
线程池和生产者消费者模型
2022-07-13 18:04:00 【Mysterious superstar】
线程池:
线程池是一个异步进程,程序员只需要把要执行的函数和参数丢进线程池,线程池就能处理这个函数,要注意的是,使用异步线程处理的函数,对临界资源的访问和修改是否满足同步和互斥,一般用异步线程处理的数据最好是线程独有的。
实现方式:首先有一个线程池类,成员变量:条件变量、锁、线程安全队列、最大线程数。成员函数:构造、析构、线程处理函数。(条件变量和锁分别保护临界资源“队列”的同步和互斥性)。一个函数任务类,通过这个任务类统筹管理要出用异步线程处理的函数,包括函数名、参数,可以直接将任务类对象传入线程池的线程安全队列,线程去安全队列中拿任务。
线程池类:
class ThreadTask
{
//统一的线程处理函数,传入要并发处理的函数和函数的参数
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};
class ThreadPool
{
public:
ThreadPool():size(MAX_THREAD_NUM)
{
pthread_mutex_init(&_mutex,NULL);//锁在创建线程之前初始化,任何线程退出的地方解锁
pthread_cond_init(&_cond,NULL);//条件变量,用于实现同步,锁实现互斥
for(int i=0;i<size;++i)
{
pthread_t tid;
int ret=pthread_create(&tid,NULL,thr_start,(void*)this);
if(ret!=0)
{
perror("create thread error");
exit(0);
}
}
}
bool Push(ThreadTask & task)
{
pthread_mutex_lock(&_mutex);//队列属于临界资源
threadqueue.push(task);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond);//唤醒线程不需要被保护,每个线程执行的顺序我们并不关心
return true;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
queue<ThreadTask> threadqueue;
int size;
static void * thr_start(void *arg)
{
ThreadPool * pool=(ThreadPool *)arg;
while(1)
{
//线程队列属于临界资源,对它进行操作的时候要加锁保护
pthread_mutex_lock(&pool->_mutex);
while(pool->threadqueue.empty())//由于条件变量没有计数,因此要循环判断
{
pthread_cond_wait(&pool->_cond, &pool->_mutex);//休眠,解锁是原子操作
}
ThreadTask task=pool->threadqueue.front();
pool->threadqueue.pop();
pthread_mutex_unlock(&pool->_mutex);
task.run();
}
}
};
任务类:
封装一个任务类,便于需要多线程处理的函数统筹管理。
void test(int i)
{
int tmp=i%5;
//sleep(1);
printf("%d %x\n",i,pthread_self());
//cout<<i<<" "<<pthread_self()<<endl;
return ;
}
class ThreadTask
{
//统一的线程处理函数,传入要并发处理的函数和函数的参数
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};整体实现:
//
// Created by didi on 2020-08-01.
//
//实现一个线程池,首先是一个函数类让线程去处理,然后是一个线程池,通过条件变量控制线程安全队列的同步和互斥、
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>
using namespace std;
typedef void (*handle)(int);
#define MAX_THREAD_NUM 5
//用户真实想要处理的函数
void test(int i)
{
int tmp=i%5;
//sleep(1);
printf("%d %x\n",i,pthread_self());
//cout<<i<<" "<<pthread_self()<<endl;
return ;
}
class ThreadTask
{
//统一的线程处理函数,传入要并发处理的函数和函数的参数
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};
class ThreadPool
{
public:
ThreadPool():size(MAX_THREAD_NUM)
{
pthread_mutex_init(&_mutex,NULL);//锁在创建线程之前初始化,任何线程退出的地方解锁
pthread_cond_init(&_cond,NULL);//条件变量,用于实现同步,锁实现互斥
for(int i=0;i<size;++i)
{
pthread_t tid;
int ret=pthread_create(&tid,NULL,thr_start,(void*)this);
if(ret!=0)
{
perror("create thread error");
exit(0);
}
}
}
bool Push(ThreadTask & task)
{
pthread_mutex_lock(&_mutex);//队列属于临界资源
threadqueue.push(task);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond);//唤醒线程不需要被保护,每个线程执行的顺序我们并不关心
return true;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
queue<ThreadTask> threadqueue;
int size;
static void * thr_start(void *arg)
{
ThreadPool * pool=(ThreadPool *)arg;
while(1)
{
//线程队列属于临界资源,对它进行操作的时候要加锁保护
pthread_mutex_lock(&pool->_mutex);
while(pool->threadqueue.empty())//由于条件变量没有计数,因此要循环判断
{
pthread_cond_wait(&pool->_cond, &pool->_mutex);//休眠,解锁是原子操作
}
ThreadTask task=pool->threadqueue.front();
pool->threadqueue.pop();
pthread_mutex_unlock(&pool->_mutex);
task.run();
}
}
};
int main()
{
ThreadPool pool;
for(int i=0;i<100;++i)
{
ThreadTask task(i,test);
pool.Push(task);
}
//exit(0);
//pthread_exit(0);
while(1)
{
sleep(1);
}
return 0;
}生产者与消费者模型:
总的来说,生产者和消费者模型是,一个场景,两种角色和三种关系:
关系分别是:生产者和生产者互斥、消费者和消费者互斥、生产者和消费者同步+互斥。
通过条件变量来实现同步,锁来实现互斥,实现一个线程安全的队列。
#define MAX_QUEUE_SIZE 10
#define MAX_THREAD_SIZE 1000
//实现一个生产者和消费者模型,一种场景,两种角色,三种关系 生产者和生产者互斥,消费者和消费者互斥,生产者和消费者同步
//用条件变量实现,根据它的实现,决定要两个条件变量,首先有一个线程安全的队列,生产者生产数据,消费者消费数据
class mutexqueue
{
public:
mutexqueue():size(MAX_QUEUE_SIZE)
{
pthread_mutex_init(&_mutex,NULL);
pthread_cond_init(&pro,NULL);//用于实现生产者的同步
pthread_cond_init(&con,NULL);//用于实现消费者的同步
}
bool Push(const int &tmp)
{
pthread_mutex_lock(&_mutex);//保护临界资源
while(qu.size()==MAX_QUEUE_SIZE)
{
pthread_cond_wait(&pro,&_mutex);//队列写满的时候阻塞等待
}
qu.push(tmp);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&con);//唤醒消费者
return true;
}
bool Pop(int * tmp)
{
pthread_mutex_lock(&_mutex);
while(qu.empty())
{
pthread_cond_wait(&con,&_mutex);//队列里面没有数据的时候阻塞等待
}
*tmp=qu.front();
qu.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&pro);//唤醒生产者
return true;
}
~mutexqueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&pro);
pthread_cond_destroy(&con);
}
private:
queue<int> qu;
int size;
pthread_mutex_t _mutex;
pthread_cond_t pro;
pthread_cond_t con;
};生产者和消费者线程和创建:
void * productor(void * arg)
{
mutexqueue * qu= (mutexqueue *)arg;
int i=0;
while(1)
{
++i;
qu->Push(i);
printf("id=%p put a mum= %d\n",pthread_self(),i);
}
return NULL;
}
void * consumer(void * arg)
{
mutexqueue * qu= (mutexqueue *)arg;
while(1)
{
int tmp;
qu->Pop(&tmp);
printf("id=%p get a num= %d\n",pthread_self(),tmp);
sleep(1);
}
return NULL;
}
int main()
{
pthread_t pro[MAX_THREAD_SIZE],con[MAX_THREAD_SIZE];
mutexqueue qu;
int num=0;
for(int i=0;i<MAX_THREAD_SIZE;++i) {
int ret = pthread_create(&pro[i], NULL, productor, (void *)&qu);
++num;
if (ret != 0)
{
printf("create productor error %d",num);
return -1;
}
}
for(int i=0;i<MAX_THREAD_SIZE;++i)
{
int ret = pthread_create(&con[i],NULL,consumer,(void *)&qu );
num++;
if(ret!=0)
{
printf("create thread error %d", num);
return -1;
}
}
for(int i=0;i<MAX_THREAD_SIZE;++i)
{
pthread_join(pro[i],NULL);//等待线程退出,防止执行到主线程的return 0;进程退出
pthread_join(con[i],NULL);
}
return 0;
}
边栏推荐
- ReentrantLock的公平与非公平核心区别
- Implementation method of three column layout (generally, it is required to write as much as possible)
- Rocket directory
- 368. Maximum divisible subset dynamic programming
- From function test to automatic test, to double the salary, I collated the super complete learning guide [with learning notes]
- Headfirst state mode source code
- Transaction module development
- 【LeetCode】307. 区域和检索 - 数组可修改
- Brief introduction to the simple seckill project
- 128. Longest continuous sequence
猜你喜欢

软件测试真的干不到35岁吗?那咋办呢...我36了...

Shut up that thing
![[sword finger offer] special summary of linked list](/img/5d/7b951a75caf8818cb1191f83b7fa94.png)
[sword finger offer] special summary of linked list

小米举办第五届IoT安全峰会,助力行业安全隐私保护

七、SAN和NAS环境下的安全实施方案实验报告

26岁,干了三年自动化,月薪才12k,能跳槽找到一个更高薪资的工作吗?

太香了, 终于明白为什么这么多人要转行软件测试了~

二、实现软件RAID实验报告

Scheduledthreadpoolexecutor source code and error explanation

SAP DUMP CALLBACK_ REJECTED_ BY_ WHITELIST - SE51, RSSCREENPAINTER
随机推荐
Multi graph detailed blocking queue - synchronousqueue
作为测试开发岗的面试官,我都是怎么选人的?
C#-Mathf
Title: the nearest common ancestor of binary tree
软件测试真的干不到35岁吗?那咋办呢...我36了...
都说软件测试工资高,那么软件测试如何才能月薪过10k呢..
小米举办第五届IoT安全峰会,助力行业安全隐私保护
SAP ABAP BP batch maintenance email address
SAP DUMP CX_ SY_ CONVERSION_ NO_ NUMBER
Implementation of hash table linear detection class template
【LeetCode】676. Implement a magic dictionary
【LeetCode】1217. Play chips
从功能测试到自动化测试,实现薪资翻倍,我整理的超全学习指南【附学习笔记】
Implementation of [priority queue (heap)] binary heap class template
Combined mode application
【LeetCode】954. Double pair array
Unity foundation to getting started - Navigation
【LeetCode】933. Recent requests
SAP BW extraction layer error s:aa 821 (bukrs)
2021/12/12 攻防世界 crypto做题记录