当前位置:网站首页>Database connection pool: implementation of connection pool function point

Database connection pool: implementation of connection pool function point

2022-06-22 15:05:00 _ Soren

Connection pool function

It mainly contains the following functions :

  1. Connection pooling in singleton mode , Constructor needs to be privatized
  2. You need a function to get the connection
  3. Functions to load configuration files
  4. Producer thread , Used to produce connections
  5. A function used to reclaim redundant idle connections , Independent threads

Data members have :

  1. Information required by the database
  2. The initial and maximum connections of the connection pool
  3. The maximum waiting time of the connection pool
  4. The timeout for the connection pool to obtain connections
  5. Storage MySQL Connected queue
  6. Maintain the mutex of the connection queue
  7. Atomic integer operations , Record the number of connections
  8. Condition variables, , For communication between producer and consumer threads .

Icon :
 Insert picture description here

Therefore, header information can be written :

Connection pool header file

#pragma once
//  Implement the connection pool function module 
#include<iostream>
#include<string>
#include<queue>
#include<mutex>
#include<atomic>
#include<thread>
#include<memory>
#include<functional>
#include<condition_variable>
#include "Connection.h"

using namespace std;

class ConnectionPool
{
    
public:
	//  Get connection pool object instance   Static 
	static ConnectionPool* getConnectionPool();

	//  Provide an interface to the outside , Provide an idle connection 
	shared_ptr<Connection> getConnection();

private:
	ConnectionPool(); //  The singleton pattern   Constructor privatization 

	bool loadConfigFile(); //  Load profile 

	//  Producer thread function ,  Responsible for the production of new connections 
	void produceConnectionTask();

	//  Recycling thread functions , Responsible for recycling redundant idle connections 
	void scannerConnectionTask();

	string _ip;				// MySQL Of IP Address 
	unsigned short _port;	// MySQL Port number 
	string _username;		// MySQL Login user name of 
	string _password;		// MySQL Login password for 
	string _dbname;			//  Database name 
	
	int _initSize;			//  The initial number of connections to the connection pool 
	int _maxSize;			//  The maximum number of connections in the connection pool 
	int _maxIdleTime;		//  The maximum waiting time of the connection pool 
	int _connectionTimeOut;	//  The timeout for the connection pool to obtain connections 

	queue<Connection*> _connectionQue; //  Storage MySQL Connected queue 
	
	mutex _queueMutex;			//  Maintain thread safe mutexes for connection queues 
	atomic_int _connectionCnt;	//  Record the... Created by the connection connection Total connections for , No more than _maxSize
	condition_variable cv;		//  Connect the communication between the production thread and the consumption thread 
};

Function realization

Connection pooling in singleton mode

The lazy singleton mode is used here , The compiler performs the locking operation itself

//  Thread safe lazy singleton mode function interface 
ConnectionPool* ConnectionPool::getConnectionPool()
{
    
	//  Static local variables , Compiler automatic lock and unlock
	static ConnectionPool pool; 
	return& pool;
}

MySQL The configuration file

stay Windows The suffix of this file is MySQL.ini

#  Configuration file of database connection pool 
ip=127.0.0.1
port=3306
username=root
password=123456
dbname=chat
initSize=10
maxSize=1024
#  The maximum idle time is seconds by default 
maxIdleTime=60
#  Connection timeout , In milliseconds 
connectionTimeOut=100

Read configuration file

//  Load configuration items from the configuration file 
bool ConnectionPool::loadConfigFile()
{
    
	FILE* pf = fopen("mysql.ini", "r");
	if (pf == nullptr)
	{
    
		LOG("mysql.ini file is not exist!");
		return false;
	}

	while (!feof(pf))
	{
    
		char line[1024] = {
     0 };
		fgets(line, 1024, pf);
		string str = line;
		int idx = str.find('=', 0);
		if (-1 == idx) //  Invalid configuration 
		{
    
			continue;
		}

		int endidx = str.find('\n', idx);
		string key = str.substr(0, idx);
		string value = str.substr(idx + 1, endidx - idx - 1);

		if (key == "ip")
		{
    
			_ip = value;
		}
		else if (key == "port")
		{
    
			_port = atoi(value.c_str());
		}
		else if (key == "username")
		{
    
			_username = value;
		}
		else if (key == "password")
		{
    
			_password = value;
		}
		else if (key == "dbname")
		{
    
			_dbname = value;
		}
		else if (key == "initSize")
		{
    
			_initSize = atoi(value.c_str());
		}
		else if (key == "maxSize")
		{
    
			_maxSize = atoi(value.c_str());
		}
		else if (key == "maxIdleTime")
		{
    
			_maxIdleTime = atoi(value.c_str());
		}
		else if (key == "connectionTimeOut")
		{
    
			_connectionTimeOut = atoi(value.c_str());
		}
	}
	return true;
}

Producer thread implementation

//  producer : Running on a separate thread , Responsible for production connection 
void ConnectionPool::produceConnectionTask()
{
    
	//  loop ( Keep listening )
	for (; ;)
	{
    
		unique_lock<mutex> lock(_queueMutex);
		while (!_connectionQue.empty())
		{
    	
			cv.wait(lock); //  The queue is not empty , The production thread enters the waiting queue for the condition variable 
		}
		
		//  The number of connections has not reached the maximum , Continue creating new connections 
		if (_connectionCnt < _maxSize)
		{
    
			Connection* p = new Connection();
			p->connect(_ip, _port, _username, _password, _dbname);
			
			p->refreshAliveTime();	//  Refresh the start time of idle 
			_connectionQue.push(p);
			_connectionCnt++;
		}

		//  Wake up all threads in the waiting queue 
		cv.notify_all();	//  Notify consumer thread ,  You can consume and connect 
	}
}

Consumer function realization

//  Provide an interface to the outside , Provide an idle connection 
shared_ptr<Connection> ConnectionPool::getConnection()
{
    
	unique_lock<mutex> lock(_queueMutex);
	while (_connectionQue.empty())
	{
    
		//  Not directly sleep
		//  When I wake up after a timeout, I find that it is still empty , Just go back to nullptr
		if (cv_status::timeout == cv.wait_for(lock, std::chrono::milliseconds(_connectionTimeOut)))
		{
    
			if (_connectionQue.empty())
			{
    
				LOG(" Getting idle connection timed out ... Failed to get connection !");
				return nullptr;
			}
		}
	}

	/*  because shared_ptr At the time of deconstruction , Will be able to Connection Resource Direct delete fall ,  Equivalent to calling Connection Destructor of ,Connection I was close It fell off ,  So you need to customize how smart pointers release resources , Instead, return the resource to the queue  */
	shared_ptr<Connection> sp(_connectionQue.front(),
		[&](Connection* pcon) {
    
			//  This is called in the server application thread. , Thread safety needs to be considered 
			unique_lock<mutex> lock(_queueMutex);

			pcon->refreshAliveTime();	//  Refresh the start time of idle 
			_connectionQue.push(pcon);
		});
	_connectionQue.pop();

	//  Consume the last one in the queue Connection, The producer thread is notified of the production connection 
	cv.notify_all(); 
	return sp;
}

Redundant connection recycling , Independent threads

Need here MySQL Implement two functions in the connected class :

clock_t _aliveTime; //  Record the survival time after entering the idle state 

//  Refresh the starting idle time of the connection 
void Connection::refreshAliveTime()
{
    
	_aliveTime = clock();
}

//  Return to the time of survival 
clock_t Connection::getAliveTime() const
{
    
	return clock() - _aliveTime;
}

Then let's look at the recycling function

//  Scan over maxIdleTime Of free connections , Recycle excess connections 
void ConnectionPool::scannerConnectionTask()
{
    
	for (; ;)
	{
    
		//  adopt sleep Simulate the timing effect 
		this_thread::sleep_for(std::chrono::seconds(_maxIdleTime));

		//  Scan the entire queue , Release more than connections 
		unique_lock<mutex> lock(_queueMutex);
		while (_connectionCnt > _initSize)
		{
    
			Connection* p = _connectionQue.front();
			if (p->getAliveTime() >= (_maxIdleTime * 1000))
			{
    
				_connectionQue.pop();
				_connectionCnt--;
				delete p;	//  call  ~Connection();
			}
			else 
			{
    
				break; //  The connection of the team head does not exceed _maxIdleTime, There must be none in the back 
			}
		}
	}
}
原网站

版权声明
本文为[_ Soren]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/173/202206221339138451.html