当前位置:网站首页>Thread pool and producer consumer model
Thread pool and producer consumer model
2022-07-16 07:37:00 【Mysterious superstar】
Thread pool :
Thread pool is an asynchronous process , The programmer only needs to throw the functions and parameters to be executed into the thread pool , The thread pool can handle this function , It should be noted that , Functions processed with asynchronous threads , Whether the access and modification of critical resources meet the requirements of synchronization and mutual exclusion , Generally, the data processed by asynchronous threads should be unique to threads .
Realization way : First, there is a thread pool class , Member variables : Condition variables, 、 lock 、 Thread safe queues 、 Maximum number of threads . Member functions : structure 、 destructor 、 Thread handler .( Condition variables and locks protect critical resources respectively “ queue ” Synchronization and mutual exclusion of ). A function task class , Through this task class, we can manage the functions that need to be processed with asynchronous threads , Include function name 、 Parameters , You can directly transfer the task class object to the thread safe queue of the thread pool , The thread goes to the safe queue to get the task .
Thread pool class :
class ThreadTask
{
// Unified thread processing function , Pass in the function to be processed concurrently and the parameters of the function
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};
class ThreadPool
{
public:
ThreadPool():size(MAX_THREAD_NUM)
{
pthread_mutex_init(&_mutex,NULL);// The lock is initialized before the thread is created , Unlock where any thread exits
pthread_cond_init(&_cond,NULL);// Condition variables, , For synchronization , Lock implements mutual exclusion
for(int i=0;i<size;++i)
{
pthread_t tid;
int ret=pthread_create(&tid,NULL,thr_start,(void*)this);
if(ret!=0)
{
perror("create thread error");
exit(0);
}
}
}
bool Push(ThreadTask & task)
{
pthread_mutex_lock(&_mutex);// Queues belong to critical resources
threadqueue.push(task);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond);// Wakeup threads do not need to be protected , We don't care about the order in which each thread executes
return true;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
queue<ThreadTask> threadqueue;
int size;
static void * thr_start(void *arg)
{
ThreadPool * pool=(ThreadPool *)arg;
while(1)
{
// Thread queues belong to critical resources , Lock it when operating
pthread_mutex_lock(&pool->_mutex);
while(pool->threadqueue.empty())// Because the condition variable is not counted , Therefore, it is necessary to judge circularly
{
pthread_cond_wait(&pool->_cond, &pool->_mutex);// Sleep , Unlocking is an atomic operation
}
ThreadTask task=pool->threadqueue.front();
pool->threadqueue.pop();
pthread_mutex_unlock(&pool->_mutex);
task.run();
}
}
};
The task class :
Encapsulate a task class , It is convenient for the overall management of functions requiring multi-threaded processing .
void test(int i)
{
int tmp=i%5;
//sleep(1);
printf("%d %x\n",i,pthread_self());
//cout<<i<<" "<<pthread_self()<<endl;
return ;
}
class ThreadTask
{
// Unified thread processing function , Pass in the function to be processed concurrently and the parameters of the function
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};Overall implementation :
//
// Created by didi on 2020-08-01.
//
// Implement a thread pool , First, a function class lets the thread handle , Then there is a thread pool , Control the synchronization and mutual exclusion of thread safety queues through conditional variables 、
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>
using namespace std;
typedef void (*handle)(int);
#define MAX_THREAD_NUM 5
// The function that the user really wants to handle
void test(int i)
{
int tmp=i%5;
//sleep(1);
printf("%d %x\n",i,pthread_self());
//cout<<i<<" "<<pthread_self()<<endl;
return ;
}
class ThreadTask
{
// Unified thread processing function , Pass in the function to be processed concurrently and the parameters of the function
public:
ThreadTask(int i,handle _handle):num(i),handle_t(_handle)
{}
void run()
{
return handle_t(num);
}
private:
int num;
handle handle_t;
};
class ThreadPool
{
public:
ThreadPool():size(MAX_THREAD_NUM)
{
pthread_mutex_init(&_mutex,NULL);// The lock is initialized before the thread is created , Unlock where any thread exits
pthread_cond_init(&_cond,NULL);// Condition variables, , For synchronization , Lock implements mutual exclusion
for(int i=0;i<size;++i)
{
pthread_t tid;
int ret=pthread_create(&tid,NULL,thr_start,(void*)this);
if(ret!=0)
{
perror("create thread error");
exit(0);
}
}
}
bool Push(ThreadTask & task)
{
pthread_mutex_lock(&_mutex);// Queues belong to critical resources
threadqueue.push(task);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond);// Wakeup threads do not need to be protected , We don't care about the order in which each thread executes
return true;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
queue<ThreadTask> threadqueue;
int size;
static void * thr_start(void *arg)
{
ThreadPool * pool=(ThreadPool *)arg;
while(1)
{
// Thread queues belong to critical resources , Lock it when operating
pthread_mutex_lock(&pool->_mutex);
while(pool->threadqueue.empty())// Because the condition variable is not counted , Therefore, it is necessary to judge circularly
{
pthread_cond_wait(&pool->_cond, &pool->_mutex);// Sleep , Unlocking is an atomic operation
}
ThreadTask task=pool->threadqueue.front();
pool->threadqueue.pop();
pthread_mutex_unlock(&pool->_mutex);
task.run();
}
}
};
int main()
{
ThreadPool pool;
for(int i=0;i<100;++i)
{
ThreadTask task(i,test);
pool.Push(task);
}
//exit(0);
//pthread_exit(0);
while(1)
{
sleep(1);
}
return 0;
}The producer and consumer model :
in general , The producer and consumer model is , A scene , Two roles and three relationships :
The relationships are : Producers and producers are mutually exclusive 、 Consumers and consumers are mutually exclusive 、 Producers and consumers synchronize + Mutually exclusive .
Synchronization is achieved through conditional variables , Lock to realize mutual exclusion , Implement a thread safe queue .
#define MAX_QUEUE_SIZE 10
#define MAX_THREAD_SIZE 1000
// Implement a producer and consumer model , A scene , Two characters , Three relationships Producers and producers are mutually exclusive , Consumers and consumers are mutually exclusive , Producers and consumers synchronize
// Use conditional variables to realize , According to its implementation , Decide to have two conditional variables , First, there is a thread safe queue , Producer production data , Consumer consumption data
class mutexqueue
{
public:
mutexqueue():size(MAX_QUEUE_SIZE)
{
pthread_mutex_init(&_mutex,NULL);
pthread_cond_init(&pro,NULL);// Used to synchronize producers
pthread_cond_init(&con,NULL);// Used to synchronize consumers
}
bool Push(const int &tmp)
{
pthread_mutex_lock(&_mutex);// Protect critical resources
while(qu.size()==MAX_QUEUE_SIZE)
{
pthread_cond_wait(&pro,&_mutex);// Block waiting when the queue is full
}
qu.push(tmp);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&con);// Wake up consumers
return true;
}
bool Pop(int * tmp)
{
pthread_mutex_lock(&_mutex);
while(qu.empty())
{
pthread_cond_wait(&con,&_mutex);// Block waiting when there is no data in the queue
}
*tmp=qu.front();
qu.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&pro);// Wake up producers
return true;
}
~mutexqueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&pro);
pthread_cond_destroy(&con);
}
private:
queue<int> qu;
int size;
pthread_mutex_t _mutex;
pthread_cond_t pro;
pthread_cond_t con;
};Producer and consumer threads and creation :
void * productor(void * arg)
{
mutexqueue * qu= (mutexqueue *)arg;
int i=0;
while(1)
{
++i;
qu->Push(i);
printf("id=%p put a mum= %d\n",pthread_self(),i);
}
return NULL;
}
void * consumer(void * arg)
{
mutexqueue * qu= (mutexqueue *)arg;
while(1)
{
int tmp;
qu->Pop(&tmp);
printf("id=%p get a num= %d\n",pthread_self(),tmp);
sleep(1);
}
return NULL;
}
int main()
{
pthread_t pro[MAX_THREAD_SIZE],con[MAX_THREAD_SIZE];
mutexqueue qu;
int num=0;
for(int i=0;i<MAX_THREAD_SIZE;++i) {
int ret = pthread_create(&pro[i], NULL, productor, (void *)&qu);
++num;
if (ret != 0)
{
printf("create productor error %d",num);
return -1;
}
}
for(int i=0;i<MAX_THREAD_SIZE;++i)
{
int ret = pthread_create(&con[i],NULL,consumer,(void *)&qu );
num++;
if(ret!=0)
{
printf("create thread error %d", num);
return -1;
}
}
for(int i=0;i<MAX_THREAD_SIZE;++i)
{
pthread_join(pro[i],NULL);// Wait for thread to exit , Prevent execution to the main thread return 0; Process exits
pthread_join(con[i],NULL);
}
return 0;
}
边栏推荐
猜你喜欢

MySQL - data page

Obtaining control coordinates and control properties in appium

TCP协议详解

【LeetCode】1217. Play chips

Unity3d record case dots

基于SonarQube代码质量检查

Men should be "strong", not "soft", "weak" and "empty"

30岁转行软件测试靠谱吗?一个过来人的心路历程送给迷茫的你

2年时间,涨薪20k,从外包手工到测试经理的蜕变...

Mvcc multi version concurrency control
随机推荐
【软件质量保障笔记】软件质量保障
6、 Configuration experiment report of data backup software
RAID磁盘阵列
基于SonarQube代码质量检查
【LeetCode】676. 实现一个魔法字典
Week4
jmeter中设置登录接口只调用一次
攻防世界web
Men should be "strong", not "soft", "weak" and "empty"
丑数
Chrome realizes automated testing: recording and playback web page actions
【MySQL】分页查询踩坑
Mvcc multi version concurrency control
How to solve the relationship between the two use cases?
无重叠区间
【LeetCode】1217. Play chips
通俗讲Cookie,Session,Token区别
Set up in Jenkins to show the summary of allure Report
都说软件测试工资高,那么软件测试如何才能月薪过10k呢..
数据存储与容灾(第2版)主编 鲁先志 武春岭综合训练答案