Process pool and callback function [easy to understand]
2022-07-24 18:58:00 [Full-stack programmer webmaster]
Hello everyone, nice to see you again. I'm your friend, Quan Jun.
The process pool

Before talking about the process pool, let's first cover a related concept: data sharing.

Data sharing

1. Inter-process communication should avoid sharing data as much as possible.
2. Data between processes is independent; processes can communicate through queues or pipes, both of which are based on message passing.

Although inter-process data is independent, data sharing can still be achieved with Manager; in fact, Manager can do more than that.
A command is a program; press Enter to run it (this is the Windows case):

tasklist                     # list processes
tasklist | findstr pycharm   # findstr filters; | is the pipe: tasklist writes its
                             # output into the pipe and findstr pycharm reads from it

There are two ways to implement inter-process communication (IPC): pipes and queues.
from multiprocessing import Manager, Process, Lock

def work(dic, mutex):
    # mutex.acquire()
    # dic['count'] -= 1
    # mutex.release()
    # Equivalently, take the lock with a context manager:
    with mutex:
        dic['count'] -= 1

if __name__ == '__main__':
    mutex = Lock()
    m = Manager()  # the dictionary is shared, so access must be locked
    share_dic = m.dict({'count': 100})
    p_l = []
    for i in range(100):
        p = Process(target=work, args=(share_dic, mutex))
        p_l.append(p)  # collect the process first
        p.start()
    for i in p_l:
        i.join()
    print(share_dic)
# Sharing means competition: data sharing
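Queues, mentioned above as one of the message-passing IPC mechanisms, need no explicit lock because the data is handed over rather than shared. A minimal sketch (the function and message here are just illustrative):

```python
from multiprocessing import Process, Queue

def worker(q):
    # The worker passes a message to the parent instead of
    # mutating shared state, so no lock is needed.
    q.put('hello from worker')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # blocks until the worker has put a message
    p.join()
```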
Why have a process pool? The concept of a process pool.

In practice there may be thousands of tasks to execute when busy, and perhaps only a few when idle. When thousands of tasks need to run, should we create thousands of processes? First, creating a process takes time, and destroying one takes time too. Second, even with thousands of processes started, the operating system cannot run them all at once, which hurts the program's efficiency. So we cannot start and stop processes without limit according to the task load. What do we do instead? Enter the process pool: define a pool holding a fixed number of processes. When a task arrives, take a process from the pool to handle it; when it finishes, the process is not shut down but is put back into the pool to wait for the next task. If there are many tasks and not enough processes in the pool, a task waits until a previous process finishes and returns, then takes over the idle process. In other words, the number of processes in the pool is fixed, so at most that fixed number of processes runs at the same time. This does not increase the operating system's scheduling burden, it saves the time of opening and closing processes, and it achieves a degree of concurrency.

So what exactly is a process pool?
A process pool controls the number of processes. ps: for advanced applications such as remote procedure calls, a process pool should be used. Pool provides a specified number of processes for users to call. When a new request is submitted to the pool, if the pool is not full, a new process is created to execute the request; but if the number of processes in the pool has already reached the specified maximum, the request waits until a process in the pool becomes free, and that process is then reused.

Structure of the process pool

Creating the process pool class: if numprocess is specified as 3, the pool creates three processes at the start, and those same three processes then serve all tasks; no other process is started.

1. Create a process pool

Pool([numprocess [, initializer [, initargs]]]): create a process pool

2. Parameters

numprocess: the number of processes to create; if omitted, it defaults to the value of cpu_count() (check it with os.cpu_count())
initializer: a callable executed when each worker process starts; defaults to None
initargs: the tuple of arguments passed to initializer
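A short sketch of how numprocess, initializer and initargs fit together (the worker and helper names are made up for illustration):

```python
from multiprocessing import Pool
import os

def init_worker(tag):
    # Runs once in every worker process as it starts up
    print('%s: worker %s ready' % (tag, os.getpid()))

def square(n):
    return n ** 2

if __name__ == '__main__':
    # numprocess=3; each worker runs init_worker('pool') on startup
    p = Pool(3, initializer=init_worker, initargs=('pool',))
    print(p.map(square, [1, 2, 3]))  # [1, 4, 9]
    p.close()
    p.join()
```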
3. Methods

p.apply(func [, args [, kwargs]]): run func(*args, **kwargs) in one pool worker process, then return the result. Note that this does not run func in every pool worker. To run func concurrently with different arguments, either call p.apply() from different threads or use p.apply_async().

p.apply_async(func [, args [, kwargs [, callback]]]): run func(*args, **kwargs) in a pool worker process and return the result. The result of this method is an instance of the AsyncResult class. callback is a callable that receives the result as its argument: when the result of func becomes available, it is passed to callback. callback must not perform any blocking operation, or it will hold up the delivery of results from other asynchronous operations.

p.close(): close the process pool; no more tasks may be submitted to it. (Note that it must be called before join().)

p.join(): wait for all worker processes to exit. This method may only be called after close() or terminate().

Application 1:
from multiprocessing import Pool
import os, time

def task(n):
    print('[%s] is running' % os.getpid())
    time.sleep(2)
    print('[%s] is done' % os.getpid())
    return n ** 2

if __name__ == '__main__':
    # print(os.cpu_count())  # number of CPUs
    p = Pool(4)  # at most four worker processes
    for i in range(1, 7):  # submit 6 tasks
        res = p.apply(task, args=(i,))  # synchronous: wait for one task to finish before running the next
        print('result of this task: %s' % res)
    p.close()  # forbid adding tasks to the pool
    p.join()   # wait for the pool
    print('main')

apply: synchronous process pool (blocking / serial)
# ----------------
# Why use a process pool? Because the pool controls how many
# processes we start. Without one, unbounded concurrency creates
# a huge number of processes and the machine lags badly; capping
# the count keeps memory usage under control.
from multiprocessing import Pool
import os, time

def walk(n):
    print('task[%s] running...' % os.getpid())
    time.sleep(3)
    return n ** 2

if __name__ == '__main__':
    p = Pool(4)
    res_obj_l = []
    for i in range(10):
        res = p.apply_async(walk, args=(i,))
        # print(res)  # prints the AsyncResult object itself
        res_obj_l.append(res)  # collect the objects now; fetch the values later with .get()
    p.close()  # forbid adding tasks to the pool
    p.join()
    # print(res_obj_l)
    print([obj.get() for obj in res_obj_l])

apply_async: asynchronous process pool (non-blocking / parallel)
Synchronous / asynchronous and serial / parallel

Synchronous / asynchronous

Synchronous: when a process makes a request that takes some time to return, the process waits and does not continue until the response arrives. Asynchronous: the process does not wait; it goes on with the work that follows, regardless of the state of other processes, and when the response arrives the system notifies it to handle the result. This improves execution efficiency.

Serial / parallel

An example: a road on which several cars can drive side by side is "parallel"; a road that fits only one car at a time is "serial". Clearly, parallel is much faster than serial. (Parallel tasks do not affect each other; serial tasks each wait for the previous one to finish.)
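The difference can be measured directly. In this sketch (sleep length chosen arbitrarily), four 1-second tasks take about 4 seconds with the synchronous apply, but only about 1 second with apply_async, because the four workers run in parallel:

```python
from multiprocessing import Pool
import time

def io_task(n):
    time.sleep(1)  # stand-in for a blocking I/O step
    return n

if __name__ == '__main__':
    p = Pool(4)

    start = time.time()
    for i in range(4):
        p.apply(io_task, args=(i,))  # blocks per task: serial
    print('apply:       %.1fs' % (time.time() - start))  # roughly 4s

    start = time.time()
    results = [p.apply_async(io_task, args=(i,)) for i in range(4)]
    print([r.get() for r in results])  # [0, 1, 2, 3]
    print('apply_async: %.1fs' % (time.time() - start))  # roughly 1s

    p.close()
    p.join()
```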
Application: using a process pool to maintain a fixed number of processes (an improvement on the earlier client and server)
from socket import *
from multiprocessing import Pool

s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # allow port reuse
s.bind(('127.0.0.1', 8081))
s.listen(5)
print('start running...')

def talk(coon, addr):
    while True:  # communication loop
        try:
            cmd = coon.recv(1024)
            print(cmd.decode('utf-8'))
            if not cmd: break
            coon.send(cmd.upper())
            print('sent %s' % cmd.upper().decode('utf-8'))
        except Exception:
            break
    coon.close()

if __name__ == '__main__':
    p = Pool(4)
    while True:  # accept loop
        coon, addr = s.accept()
        print(coon, addr)
        p.apply_async(talk, args=(coon, addr))
    s.close()
    # Because the accept loop never ends, p.join() is not needed

Server side
from socket import *

c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8081))
while True:
    cmd = input('>>:').strip()
    if not cmd: continue
    c.send(cmd.encode('utf-8'))
    data = c.recv(1024)
    print('received %s' % data.decode('utf-8'))
c.close()

Client side
Callback function

When is a callback function used? (Callbacks are most common in crawlers.) Producing the data is time-consuming, while processing it is not. As soon as a download finishes, the pool automatically notifies the main process, which hands the result to the parsing function; that is the power of the callback. The scenario that calls for one: as soon as any task in the pool completes, it tells the main process, "I am done; you can process my result now." The main process then calls a function to handle the result, and that function is the callback function.

We can put the time-consuming (blocking) tasks into the process pool and attach a callback function (executed by the main process). The main process then skips the I/O entirely and receives only each task's result.
from multiprocessing import Pool
import requests
import os
import time

def get_page(url):
    print('<%s> is getting [%s]' % (os.getpid(), url))
    response = requests.get(url)  # fetch the page
    time.sleep(2)
    print('<%s> is done [%s]' % (os.getpid(), url))
    return {'url': url, 'text': response.text}

def parse_page(res):
    '''parsing function'''
    print('<%s> parse [%s]' % (os.getpid(), res['url']))
    with open('db.txt', 'a') as f:
        parse_res = 'url:%s size:%s\n' % (res['url'], len(res['text']))
        f.write(parse_res)

if __name__ == '__main__':
    p = Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    for url in urls:
        obj = p.apply_async(get_page, args=(url,), callback=parse_page)
    p.close()
    p.join()
    print('main', os.getpid())  # no .get() needed

Callback function (a small page-download example)
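apply_async also accepts an error_callback parameter (Python 3): if the task raises, the exception is handed to error_callback in the main process instead of being silently lost. A small sketch with made-up task names:

```python
from multiprocessing import Pool

def risky(n):
    # Fails on one input on purpose, to exercise the error path
    if n == 2:
        raise ValueError('bad input: %s' % n)
    return n * 10

def on_done(res):
    print('ok:', res)

def on_error(exc):
    # Runs in the main process with the worker's exception
    print('failed:', exc)

if __name__ == '__main__':
    p = Pool(2)
    for i in range(4):
        p.apply_async(risky, args=(i,),
                      callback=on_done, error_callback=on_error)
    p.close()
    p.join()
```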
If the main process simply waits for all tasks in the pool to finish and then processes the results in one batch, no callback function is needed.
from multiprocessing import Pool
import requests
import os

def get_page(url):
    print('<%s> get [%s]' % (os.getpid(), url))
    response = requests.get(url)  # fetch the page
    return {'url': url, 'text': response.text}

if __name__ == '__main__':
    p = Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    obj_l = []
    for url in urls:
        obj = p.apply_async(get_page, args=(url,))
        obj_l.append(obj)
    p.close()
    p.join()
    print([obj.get() for obj in obj_l])

A small page-download example (no callback function needed)
Publisher: Full-stack programmer. Please credit the source when reprinting: https://javaforall.cn/124521.html Original link: https://javaforall.cn