当前位置:网站首页>multiprocessing. Detailed explanation of pool

multiprocessing. Detailed explanation of pool

2022-06-27 06:33:00 Startled

because python Global lock limit , If you want to use multi-core , You need to use multi process modules , But the module has many pits , This article records its usage and the pits it has trodden .

One 、map、apply、apply_async contrast

First post a comparison chart , To quote multiprocessin.pool

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.apply        | yes          no             yes          yes
Pool.apply_async  | yes          yes            no           no

Multi-args intend task Can I import different function;
Ordered-results Consciousness is whether the result is orderly .

See how to use it :

apply()

import multiprocessing
import os
import time,datetime

# task
def square(n):
    print(f'process id is {
      os.getpid()}')
    if n == 5:
        time.sleep(5)
    else:
    	time.sleep(1)
    return n*n

def _apply():
    pool = multiprocessing.Pool()
    for i in l:
        res = pool.apply(square, args=(i,))
        print(res)

if __name__ == '__main__':
    start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {
      os.getpid()}')
    _apply()
    end_time = datetime.datetime.now()
    print(' when : ',end_time-start_time)

Output :

main process id is 585033
child process id is 585034
25
child process id is 585035
0
child process id is 585036
1
child process id is 585037
4
child process id is 585038
9
child process id is 585039
16
 when :  0:00:11.024689

The whole process took 11s, It takes about the same time as sequential execution , And the calculation result is consistent with the parameter transfer sequence , So we can come to the conclusion that :

  • pool.apply() It's blocked , Before all child processes return , Will block the main process
  • Multiple child processes are executed sequentially

further , We can conclude that :

  • pool.apply() Cannot achieve concurrency . And the reason is that , At the same time , Only one subprocess is actually running the task . therefore , This function is really chicken ribs , I can't think of any scenarios that will be applied to it

apply_async()

def division(n):
    print(f'child process id is {
      os.getpid()}')
    time.sleep(1)
    res = 10/n
    return res
    
def _apply_async():
    #  must close+join, Otherwise, the main process runs out , The subprocess is not finished yet , You're going to report a mistake 
    pool = multiprocessing.Pool()
    for i in l:
        # proc_lst.append(pool.apply_async(square, args=(i,)))
        pool.apply_async(division, args=(i,), callback=print)
    pool.close()
    pool.join()
    
  start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {
      os.getpid()}')
    # _apply()
    _apply_async()
    end_time = datetime.datetime.now()
    print(' when : ',end_time-start_time)

Output :

main process id is 586731
child process id is 586732
child process id is 586733
child process id is 586734
child process id is 586735
child process id is 586736
child process id is 586737
10.0
2.0
5.0
3.3333333333333335
2.5
 when :  0:00:01.016798

At first glance , Total time 1s The clock , It shows that concurrency is indeed implemented . We'll find that ,l In all, there are 6 Parameters , But why is one result missing from the output ? This is it. apply_async() Where the pit is located , After in-depth study, it is found that this function has the following characteristics :

  • You can see from the name , It's asynchronous . The so-called asynchronous , The object of comparison is the main process , That is, the main process does not have to wait for the results of the child process , You can move on , This feature is achieved by adding apply_async() The function is designed to be non blocking , When calling apply_async() when , Immediately return a child process object , At this point, the subprocess may not have actually finished running , But it does not affect the main process to continue to execute .
  • apply_async() Medium callback The argument is , When the subprocess is executed , Automatically call apply_async() The function represented by , In the above example is print, So the results will be printed out . It can also be understood from this example Callback This is the concept . And if you don't show it callback Parameters , What to do if you want to get the result ? So you need to call that apply_async().get() 了 , But the function is blocked , That is, the main process will be blocked until the child process ends , So if you want to implement concurrency , It is best to start after all child processes , Go again get result .
  • pool.close() and pool.join() What's the usage? ? The former means that the process pool is closed ( Do not receive new processes , But the original process does not affect ), The latter means that the block waits for all child processes to end . Why do we have to join? As mentioned before ,apply_async() It's non blocking , If you don't join, It is possible that the main process has finished running and the sub process has not finished yet , Then those subprocesses cannot be recycled , The program will report an error , So there must be join. Some friends still have questions , What then? join You have to close? This is actually the standard way of writing , These two must be used together .
  • Compared with the parameter transfer sequence , The result is disorder .
  • Last question , Why is one of the results in the above example missing ? On closer inspection , Less 0 The corresponding result , because 10/0 Illegal exception will pop up . But why didn't you see the error report ? This is one of the pits ,apply_async() Exception in child process of function , The main process is senseless . therefore , When debugging code , Don't feel that everything is all right when you see that there is no error reported , There may be a hidden pit waiting for you !

map()

def _map():
    pool = multiprocessing.Pool()
    res = pool.map(square, l)
    print(res)

if __name__ == '__main__':
    start_time = datetime.datetime.now()
    l = [5, 0, 1, 2, 3, 4]
    print(f'main process id is {os.getpid()}')
    # _apply()
    # _apply_async()
    _map()
    end_time = datetime.datetime.now()
    print(' when : ',end_time-start_time)

Output :

main process id is 588059
child process id is 588060
child process id is 588061
child process id is 588062
child process id is 588063
child process id is 588064
child process id is 588065
[25, 0, 1, 4, 9, 16]
 when :  0:00:06.018487

when 6s about , And the result is one-time , The following conclusions can be drawn :

  • map Is to start a child process with the same number of iteratible objects at one time , Therefore, concurrency can be realized
  • This function is blocked , That is, wait until all the child processes have been executed , The main process can continue to execute
  • The results are orderly .

Two 、 Multi process data sharing Manager

this Manager Another big hole , Use with caution ! Fill the pit when you have time

原网站

版权声明
本文为[Startled]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/178/202206270609405654.html