multiprocessing --- 基于进程的并行 — Python 3.8.2 文档

概述

multiprocessing 是一个用于产生进程的包,具有与 threading 模块相似API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多核。可运行于 Unix 和 Windows 。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool

from multiprocessing import Pool

def f(x):

return x*x

if __name__ == '__main__':

with Pool(5) as p:
    print(p.map(f, \[1, 2, 3\]))

将在标准输出中打印

[1, 4, 9]

Process

multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 Processthreading.Thread API 相同。 一个简单的多进程程序示例是:

from multiprocessing import Process

def f(name):

print('hello', name)

if __name__ == '__main__':

p = Process(target=f, args=('bob',))
p.start()
p.join()

要显示所涉及的各个进程ID,这是一个扩展示例:

from multiprocessing import Process
import os

def info(title):

print(title)
print('module name:', \_\_name\_\_)
print('parent process:', os.getppid())
print('process id:', os.getpid())

def f(name):

info('function f')
print('hello', name)

if __name__ == '__main__':

info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()

关于为什么 if __name__ == '__main__' 部分是必需的解释,请参见 编程指导

上下文和启动方法

根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法

spawn

父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 _forkserver_,使用这个方法启动进程相当慢。

可在Unix和Windows上使用。 Windows上的默认设置。

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。

只存在于Unix。Unix中的默认值。

forkserver

程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。

可在Unix平台上使用,支持通过Unix管道传递文件描述符。

在 3.8 版更改: 对于 macOS,_spawn_ 启动方式是默认方式。 因为 fork 可能导致subprocess崩溃,被认为是不安全的,查看 bpo-33725

在 3.4 版更改: 在所有unix平台上添加支持了 spawn ,并且为一些unix平台添加了 forkserver 。在Windows上子进程不再继承所有可继承的父进程句柄。

在 Unix 上通过 spawnforkserver 方式启动多进程会同时启动一个 资源追踪 进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory 对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)

要选择一个启动方法,你应该在主模块的 if __name__ == '__main__' 子句中调用 set_start_method() 。例如:

import multiprocessing as mp

def foo(q):

q.put('hello')

if __name__ == '__main__':

mp.set\_start\_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()

在程序中 set_start_method() 不应该被多次调用。

或者,你可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。:

import multiprocessing as mp

def foo(q):

q.put('hello')

if __name__ == '__main__':

ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()

请注意,关联到不同上下文的对象和进程之前可能不兼容。特别是,使用 fork 上下文创建的锁不能传递给使用 spawnforkserver 启动方法启动的进程。

想要使用特定启动方法的库应该使用 get_context() 以避免干扰库用户的选择。

警告

'spawn''forkserver' 启动方法当前不能在Unix上和“冻结的”可执行内容一同使用(例如,有类似 PyInstallercx_Freeze 的包产生的二进制文件)。 'fork' 启动方法可以使用。

在进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:

队列

Queue 类是一个近似 queue.Queue 的克隆。 例如:

from multiprocessing import Process, Queue

def f(q):

q.put(\[42, None, 'hello'\])

if __name__ == '__main__':

q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get())    \# prints "\[42, None, 'hello'\]"
p.join()

队列是线程和进程安全的。

管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

from multiprocessing import Process, Pipe

def f(conn):

conn.send(\[42, None, 'hello'\])
conn.close()

if __name__ == '__main__':

parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())   \# prints "\[42, None, 'hello'\]"
p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

进程间同步

对于所有在 threading 存在的同步原语,multiprocessing 中都有类似的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):

l.acquire()
try:
    print('hello world', i)
finally:
    l.release()

if __name__ == '__main__':

lock = Lock()

for num in range(10):
    Process(target=f, args=(lock, num)).start()

不使用锁的情况下,来自于多进程的输出很容易产生混淆。

进程间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。例如,以下代码:

from multiprocessing import Process, Value, Array

def f(n, a):

n.value = 3.1415927
for i in range(len(a)):
    a\[i\] = -a\[i\]

if __name__ == '__main__':

num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr\[:\])

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务进程

Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 。例如

from multiprocessing import Process, Manager

def f(d, l):

d\[1\] = '1'
d\['2'\] = 2
d\[0.25\] = None
l.reverse()

if __name__ == '__main__':

with Manager() as manager:
    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print(d)
    print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用工作进程

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):

return x*x

if __name__ == '__main__':

\# start 4 worker processes
with Pool(processes=4) as pool:

    \# print "\[0, 1, 4,..., 81\]"
    print(pool.map(f, range(10)))

    \# print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print(i)

    \# evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      \# runs in \*only\* one process
    print(res.get(timeout=1))             \# prints "400"

    \# evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) \# runs in \*only\* one process
    print(res.get(timeout=1))             \# prints the PID of that process

    \# launching multiple evaluations asynchronously \*may\* use more processes
    multiple_results = \[pool.apply_async(os.getpid, ()) for i in range(4)\]
    print(\[res.get(timeout=1) for res in multiple_results\])

    \# make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print(res.get(timeout=1))
    except TimeoutError:
        print("We lacked patience and got a multiprocessing.TimeoutError")

    print("For the moment, the pool remains available for more work")

\# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")

请注意,进程池的方法只能由创建它的进程使用。

注解

这个包中的功能要求子进程可以导入 __main__ 模块。虽然这在 编程指导 中有描述,但还是需要提前说明一下。这意味着一些示例在交互式解释器中不起作用,比如 multiprocessing.pool.Pool 示例。例如:

> from multiprocessing import Pool
> p = Pool(5)
> def f(x):
... return x*x
...
> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果尝试执行上面的代码,它会以一种半随机的方式将三个完整的堆栈内容交替输出,然后你只能以某种方式停止父进程。)


Original url: Access
Created at: 2020-03-27 10:46:10
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论