
[Python] multiprocessing multi-process examples



Overview

The following code has been tested by hand and runs under Python 3.5.

Case 1: using a multiprocessing Pool with map

# coding: utf-8
import multiprocessing
import sys

def f(x):
    return x * x

if __name__ == "__main__":
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    xs = range(5)

    # method 1: map -- blocks until every result is ready
    print(pool.map(f, xs))  # prints [0, 1, 4, 9, 16]

    # method 2: imap -- lazy iterator, results in input order
    for y in pool.imap(f, xs):
        print(y)  # 0, 1, 4, 9, 16, respectively

    # method 3: imap_unordered -- results arrive in completion order
    for y in pool.imap_unordered(f, xs):
        print(y)  # may be in any order

    # imap_unordered also works well for progress reporting
    cnt = 0
    for _ in pool.imap_unordered(f, xs):
        cnt += 1
        sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))

Or, alternatively:

import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    result = []
    for i in range(5):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()
    pool.join()
    for res in result:
        print(res.get())  # blocks until that task's result is available
    print("Sub-process(es) done.")


Case 2: using multiprocessing (compared with threading)

# Similarity and difference of multi-thread vs. multi-process
# Written by Vamei

import os
import threading
import multiprocessing

# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()

if __name__ == "__main__":
    # Main
    print('Main:', os.getpid())

    # Multi-thread
    record = []
    lock = threading.Lock()
    for i in range(5):
        thread = threading.Thread(target=worker, args=('thread', lock))
        thread.start()
        record.append(thread)

    for thread in record:
        thread.join()

    # Multi-process
    record = []
    lock = multiprocessing.Lock()
    for i in range(5):
        process = multiprocessing.Process(target=worker, args=('process', lock))
        process.start()
        record.append(process)

    for process in record:
        process.join()

Note: when using these shared APIs, keep the following points in mind.

On UNIX platforms, when a process terminates it needs to be waited on (wait) by its parent process, otherwise it becomes a zombie process. It is therefore necessary to call the join() method on every Process object (which is effectively equivalent to wait). For multi-threading there is no such need, since there is only one process.

multiprocessing provides IPC mechanisms that the threading package does not have (such as Pipe and Queue), and they are more efficient. Prefer Pipe and Queue over synchronization primitives such as Lock/Event/Semaphore/Condition (because what the former occupy are not resources of the user process). A minimal Pipe sketch follows below.

Multiple processes should avoid sharing resources. With multiple threads we can share resources fairly easily, for example through global variables or by passing arguments. With multiple processes those approaches no longer work, because each process has its own independent memory space. In that case we can share resources through shared memory or a Manager, but doing so increases the program's complexity and lowers its efficiency because of the synchronization it requires. A minimal Manager sketch also follows below.

Process.pid holds the PID; if the process has not yet been start()ed, pid is None.

Case 3: using a multiprocessing Queue

# Written by Vamei
import os
import multiprocessing
import time

# ==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue, lock):
    info = queue.get()
    lock.acquire()
    print(str(os.getpid()) + '(get):' + info)
    lock.release()

# ===================
# Main
if __name__ == "__main__":
    record1 = []  # store input processes
    record2 = []  # store output processes
    lock = multiprocessing.Lock()  # to prevent messy print output
    queue = multiprocessing.Queue(3)

    # input processes
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # output processes
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue, lock))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    queue.close()  # no more objects will come; close the queue

    for p in record2:
        p.join()



Reference: http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html



