Python多处理教程是Python中基于进程的并行性的入门教程。
Python多处理
multiprocessing
模块允许程序员在给定机器上充分利用多个处理器。使用的API类似于经典的threading
模块。它同时提供本地和远程并发。
多进程模块通过使用子进程而不是线程来避免全局解释器锁(GIL)的限制。多进程代码的执行顺序与串行代码不同。无法保证第一个创建的进程将最先完成。
蟒蛇GIL
全局解释器锁(GIL)是Python解释器中用于同步线程执行的一种机制,因此即使在多核处理器上运行,一次也只能执行一个本机线程。
C扩展,如numpy,可以手动释放GIL来加速计算。此外,GIL在可能阻止I/O操作之前发布。
请注意,Jython和IronPython都没有GIL。
并发和并行
并发意味着两个或多个计算在同一时间范围内发生。并行意味着两个或多个计算同时发生。因此,并行是并发的一种特殊情况。它需要多个CPU单元或内核。
Python中真正的并行性是通过创建多个进程来实现的,每个进程都有一个Python解释器和自己独立的GIL。
Python具有三个并发模块:multiprocessing
、threading
和asyncio
。当任务是CPU密集型时,我们应该考虑multiprocessing
模块。当任务受I/O限制并需要大量连接时,推荐使用asyncio
模块。对于其他类型的任务,当库无法与asyncio
配合时,可以考虑使用threading
模块。
尴尬的并行
术语embarrassinblyparallel用于描述可以轻松并行运行的问题或工作负载。重要的是要认识到并非所有工作负载都可以划分为子任务并并行运行。例如那些需要在子任务之间进行大量通信的人。
完美并行计算的例子包括:
- 蒙特卡罗分析
- 数值积分
- 计算机图形渲染
- 密码学中的强力搜索
- 遗传算法
另一种可以应用并行计算的情况是当我们运行多个不同的计算时,也就是说,我们不将问题划分为子任务。例如,我们可以使用不同的算法并行运行Ï的计算。
进程与线程
进程和线程都是独立的执行序列。下表总结了进程和线程的区别:
Process | Thread |
---|---|
进程在单独的内存中运行(进程隔离) | 线程共享内存 |
使用更多内存 | 使用更少内存 |
不可能变成僵尸 | |
更多开销 | 更少开销 | 创建和销毁速度较慢 | 创建和销毁速度较快 |
更易于编码和调试 | 可以成为更难编码和调试 |
过程
Process
对象表示在单独进程中运行的活动。multiprocessing.Process
类具有threading.Thread
的所有方法的等效项。Process
构造函数应始终使用关键字参数调用。
构造函数的target
参数是run
方法调用的可调用对象。name
是进程名。start
方法启动进程的活动。join
方法会阻塞,直到其join
方法被调用的进程终止。如果提供了timeout
选项,它最多阻塞超时秒数。is_alive
方法返回一个布尔值,指示进程是否处于活动状态。terminate
方法终止进程。
__main__守卫
Python多处理风格指南建议将多处理代码放在__name__=='__main__'
习语中。这是由于在Windows上创建进程的方式。守卫是为了防止进程生成的死循环。
简单流程示例
下面是一个使用多处理
的简单程序。
#!/usr/bin/python from multiprocessing import Process def fun(name): print(f'hello {name}') def main(): p = Process(target=fun, args=('Peter',)) p.start() if __name__ == '__main__': main()
我们创建一个新流程并向其传递一个值。
def fun(name): print(f'hello {name}')
函数打印传递的参数。
def main(): p = Process(target=fun, args=('Peter',)) p.start()
创建了一个新进程。target
选项提供在新进程中运行的可调用对象。args
提供要传递的数据。multiprocessing代码放在mainguard里面。进程是用start
方法启动的。
if __name__ == '__main__': main()
代码放在__name__=='__main__'
习语中。
Python多进程连接
join
方法会阻塞主进程的执行,直到其join
方法被调用的进程终止。如果没有join
方法,主进程将不会等到进程终止。
#!/usr/bin/python from multiprocessing import Process import time def fun(): print('starting fun') time.sleep(2) print('finishing fun') def main(): p = Process(target=fun) p.start() p.join() if __name__ == '__main__': print('starting main') main() print('finishing main')
该示例在新创建的进程上调用join
。
$ ./joining.py starting main starting fun finishing fun finishing main
finishingmain消息在子进程完成后打印。
$ ./joining.py starting main finishing main starting fun finishing fun
当我们注释掉join
方法时,主进程在子进程之前完成。
在start
方法之后调用join
方法很重要。
#!/usr/bin/python from multiprocessing import Process import time def fun(val): print(f'starting fun with {val} s') time.sleep(val) print(f'finishing fun with {val} s') def main(): p1 = Process(target=fun, args=(3, )) p1.start() # p1.join() p2 = Process(target=fun, args=(2, )) p2.start() # p2.join() p3 = Process(target=fun, args=(1, )) p3.start() # p3.join() p1.join() p2.join() p3.join() print('finished main') if __name__ == '__main__': main()
如果我们错误地调用了join
方法,那么我们实际上是按顺序运行进程。(不正确的方式已注释掉。)
Python多进程is_alive
is_alive
方法确定进程是否正在运行。
#!/usr/bin/python from multiprocessing import Process import time def fun(): print('calling fun') time.sleep(2) def main(): print('main fun') p = Process(target=fun) p.start() p.join() print(f'Process p is alive: {p.is_alive()}') if __name__ == '__main__': main()
当我们用join
方法等待子进程结束时,检查时进程已经挂了。如果我们注释掉join
,进程仍然存在。
Python多处理进程ID
os.getpid
返回当前进程Id,而os.getppid
返回父进程Id。
#!/usr/bin/python from multiprocessing import Process import os def fun(): print('--------------------------') print('calling fun') print('parent process id:', os.getppid()) print('process id:', os.getpid()) def main(): print('main fun') print('process id:', os.getpid()) p1 = Process(target=fun) p1.start() p1.join() p2 = Process(target=fun) p2.start() p2.join() if __name__ == '__main__': main()
该示例运行两个子进程。它打印他们的Id和他们父母的Id。
$ ./parent_id.py main fun process id: 7605 -------------------------- calling fun parent process id: 7605 process id: 7606 -------------------------- calling fun parent process id: 7605 process id: 7607
父进程Id相同,每个子进程的进程Id不同。
命名过程
通过Process
的name
属性,我们可以为worker指定一个特定的名称。否则,模块会创建自己的名称。
#!/usr/bin/python from multiprocessing import Process, current_process import time def worker(): name = current_process().name print(name, 'Starting') time.sleep(2) print(name, 'Exiting') def service(): name = current_process().name print(name, 'Starting') time.sleep(3) print(name, 'Exiting') if __name__ == '__main__': service = Process(name='Service 1', target=service) worker1 = Process(name='Worker 1', target=worker) worker2 = Process(target=worker) # use default name worker1.start() worker2.start() service.start()
在示例中,我们创建了三个进程;其中两个被赋予了自定义名称。
$ ./naming_workers.py Worker 1 Starting Process-3 Starting Service 1 Starting Worker 1 Exiting Process-3 Exiting Service 1 Exiting
子类化过程
当我们子类化Process
时,我们覆盖了run
方法。
#!/usr/bin/python import time from multiprocessing import Process class Worker(Process): def run(self): print(f'In {self.name}') time.sleep(2) def main(): worker = Worker() worker.start() worker2 = Worker() worker2.start() worker.join() worker2.join() if __name__ == '__main__': main()
我们创建了一个继承自Process
的Worker
类。在run
方法中,我们编写了worker的代码。
Python多处理池
可以使用Pool
对象简化工作进程的管理。它控制可以提交作业的工作进程池。该池的map
方法将给定的可迭代对象分成许多块,将这些块作为单独的任务提交给进程池。池的map
是内置map
方法的并行等价物。map
阻塞主执行,直到所有计算完成。
Pool
可以把进程数作为一个参数。这是一个我们可以试验的值。如果我们不提供任何值,则使用os.cpu_count
返回的数字。
#!/usr/bin/python import time from timeit import default_timer as timer from multiprocessing import Pool, cpu_count def square(n): time.sleep(2) return n * n def main(): start = timer() print(f'starting computations on {cpu_count()} cores') values = (2, 4, 6, 8) with Pool() as pool: res = pool.map(square, values) print(res) end = timer() print(f'elapsed time: {end - start}') if __name__ == '__main__': main()
在示例中,我们创建了一个进程池并在square
函数上应用了值。核心数由cpu_unit
函数决定。
$ ./worker_pool.py starting computations on 4 cores [4, 16, 36, 64] elapsed time: 2.0256662130013865
在具有四核的计算机上,完成四次计算需要略多于2秒的时间,每次持续两秒。
$ ./worker_pool.py starting computations on 4 cores [4, 16, 36, 64, 100] elapsed time: 4.029600699999719
当我们添加要计算的附加值时,时间增加到超过四秒。
多个参数
要将多个参数传递给辅助函数,我们可以使用starmap
方法。可迭代对象的元素应该是作为参数解包的可迭代对象。
#!/usr/bin/python import time from timeit import default_timer as timer from multiprocessing import Pool, cpu_count def power(x, n): time.sleep(1) return x ** n def main(): start = timer() print(f'starting computations on {cpu_count()} cores') values = ((2, 2), (4, 3), (5, 5)) with Pool() as pool: res = pool.starmap(power, values) print(res) end = timer() print(f'elapsed time: {end - start}') if __name__ == '__main__': main()
在此示例中,我们将两个值传递给power
函数:thevalue和exponent。
$ ./multi_args.py starting computations on 4 cores [4, 64, 3125] elapsed time: 1.0230950259974634
多功能
以下示例展示了如何在池中运行多个函数。
#!/usr/bin/python from multiprocessing import Pool import functools def inc(x): return x + 1 def dec(x): return x - 1 def add(x, y): return x + y def smap(f): return f() def main(): f_inc = functools.partial(inc, 4) f_dec = functools.partial(dec, 2) f_add = functools.partial(add, 3, 4) with Pool() as pool: res = pool.map(smap, [f_inc, f_dec, f_add]) print(res) if __name__ == '__main__': main()
我们有三个函数,它们在一个池中独立运行。我们使用functools.partial
在函数执行前准备函数及其参数。
$ ./multiple_functions.py [5, 1, 7]
Python多处理运算
Ï是任何圆的周长与圆的直径之比。Ï是一个无理数,其小数形式既不结束也不重复。它约等于3.14159。有几个公式可以计算Ï。
计算μ的近似值可能需要很长时间,因此我们可以利用并行计算。我们使用BaileyâBorweinâPlouffe公式来计算Ï。
#!/usr/bin/python from decimal import Decimal, getcontext from timeit import default_timer as timer def pi(precision): getcontext().prec = precision return sum(1/Decimal(16)**k * (Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) - Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)) for k in range (precision)) start = timer() values = (1000, 1500, 2000) data = list(map(pi, values)) print(data) end = timer() print(f'sequentially: {end - start}')
首先,我们依次计算三个近似值。精度是计算出的μ的位数。
$ ./calc_pi.py ... sequentially: 0.5738053179993585
在我们的机器上,计算这三个近似值需要0.57381秒。
在下面的示例中,我们使用进程池来计算三个近似值。
#!/usr/bin/python from decimal import Decimal, getcontext from timeit import default_timer as timer from multiprocessing import Pool, current_process import time def pi(precision): getcontext().prec=precision return sum(1/Decimal(16)**k * (Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) - Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)) for k in range (precision)) def main(): start = timer() with Pool(3) as pool: values = (1000, 1500, 2000) data = pool.map(pi, values) print(data) end = timer() print(f'paralelly: {end - start}') if __name__ == '__main__': main()
我们在三个进程的池中运行计算,我们获得了一些小的效率提升。
./calc_pi2.py ... paralelly: 0.38216479000038817
当我们并行运行计算时,耗时0.38216479秒。
在一个进程中分离内存
在多进程中,每个worker都有自己的内存。内存不像线程那样共享。
#!/usr/bin/python from multiprocessing import Process, current_process data = [1, 2] def fun(): global data data.extend((3, 4, 5)) print(f'Result in {current_process().name}: {data}') def main(): worker = Process(target=fun) worker.start() worker.join() print(f'Result in main: {data}') if __name__ == '__main__': main()
我们创建了一个worker,我们将全局data
列表传递给它。我们向worker中的列表添加了额外的值,但主进程中的原始列表没有被修改。
$ ./own_memory_space.py Result in Process-1: [1, 2, 3, 4, 5] Result in main: [1, 2]
从输出中我们可以看出,这两个列表是分开的。
进程间共享状态
可以使用Value
或Array
将数据存储在共享内存中。
#!/usr/bin/python from multiprocessing import Process, Value from time import sleep def f(counter): sleep(1) with counter.get_lock(): counter.value += 1 print(f'Counter: {counter.value}') def main(): counter = Value('i', 0) processes = [Process(target=f, args=(counter, )) for _ in range(30)] for p in processes: p.start() for p in processes: p.join() if __name__ == '__main__': main()
该示例创建了一个在进程之间共享的计数器对象。每个进程都会增加计数器。
with counter.get_lock(): counter.value += 1
每个进程都必须为自己获取一个锁。
使用队列传递消息
消息传递是进程间通信的首选方式。消息传递避免了必须使用同步原语,例如锁,这些同步原语在复杂情况下难以使用且容易出错。
为了传递消息,我们可以利用管道连接两个进程。队列允许多个生产者和消费者。
#!/usr/bin/python from multiprocessing import Process, Queue import random def rand_val(queue): num = random.random() queue.put(num) def main(): queue = Queue() processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)] for p in processes: p.start() for p in processes: p.join() results = [queue.get() for _ in processes] print(results) if __name__ == "__main__": main()
在示例中,我们创建了四个进程。每个进程生成一个随机值并将其放入队列中。所有进程完成后,我们从队列中获取所有值。
processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)]
队列作为参数传递给进程。
results = [queue.get() for _ in processes]
get
方法从队列中删除并返回项目。
$ ./simple_queue.py [0.7829025790441544, 0.46465345633928223, 0.4804438310782676, 0.7146952404346074]
该示例生成一个包含四个随机值的列表。
在下面的示例中,我们将单词放入队列中。创建的进程从队列中读取单词。
#!/usr/bin/python from multiprocessing import Queue, Process, current_process def worker(queue): name = current_process().name print(f'{name} data received: {queue.get()}') def main(): queue = Queue() queue.put("wood") queue.put("sky") queue.put("cloud") queue.put("ocean") processes = [Process(target=worker, args=(queue,)) for _ in range(4)] for p in processes: p.start() for p in processes: p.join() if __name__ == "__main__": main()
创建了四个进程;他们每个人都从队列中读取一个单词并打印出来。
$ ./simple_queue2.py Process-1 data received: wood Process-2 data received: sky Process-3 data received: cloud Process-4 data received: ocean
排队顺序
在多处理中,不能保证进程按特定顺序完成。
#!/usr/bin/python from multiprocessing import Process, Queue import time import random def square(idx, x, queue): time.sleep(random.randint(1, 3)) queue.put((idx, x * x)) def main(): data = [2, 4, 6, 3, 5, 8, 9, 7] queue = Queue() processes = [Process(target=square, args=(idx, val, queue)) for idx, val in enumerate(data)] for p in processes: p.start() for p in processes: p.join() unsorted_result = [queue.get() for _ in processes] result = [val[1] for val in sorted(unsorted_result)] print(result) if __name__ == '__main__': main()
我们有计算值平方的过程。输入数据是按一定顺序排列的,我们需要维护这个顺序。为了解决这个问题,我们为每个输入值保留了一个额外的索引。
def square(idx, x, queue): time.sleep(random.randint(1, 3)) queue.put((idx, x * x))
为了说明变化,我们使用sleep
方法随机减慢计算速度。我们将索引放入具有计算平方的队列中。
unsorted_result = [queue.get() for _ in processes]
我们得到了结果。此时,元组的顺序是随机的。
result = [val[1] for val in sorted(unsorted_result)]
我们根据索引值对结果数据进行排序。
$ ./queue_order.py [4, 16, 36, 9, 25, 64, 81, 49]
我们得到对应于初始数据的平方值。
用蒙特卡洛方法计算μ
蒙特卡洛方法是一类广泛的计算算法,它依赖于重复随机抽样来获得数值结果。基本概念是使用随机性来解决原则上可能是确定性的问题。
以下公式用于计算ϯ的近似值:
Ï退4â孟鈈MN
M是正方形中生成的点数,N是点的总数。
虽然这种μ计算方法很有趣并且非常适合学校示例,但它不是很准确。有更好的算法来获得Ï。
#!/usr/bin/python from random import random from math import sqrt from timeit import default_timer as timer def pi(n): count = 0 for i in range(n): x, y = random(), random() r = sqrt(pow(x, 2) + pow(y, 2)) if r < 1: count += 1 return 4 * count / n start = timer() pi_est = pi(100_000_000) end = timer() print(f'elapsed time: {end - start}') print(f'Ï estimate: {pi_est}')
在示例中,我们使用一亿个生成的随机点来计算μ值的近似值。
$ ./monte_carlo_pi.py elapsed time: 44.7768127549989 Ï estimate: 3.14136588
计算μ的近似值用了44.78秒
现在我们将μ计算的整个任务划分为子任务。
#!/usr/bin/python import random from multiprocessing import Pool, cpu_count from math import sqrt from timeit import default_timer as timer def pi_part(n): print(n) count = 0 for i in range(int(n)): x, y = random.random(), random.random() r = sqrt(pow(x, 2) + pow(y, 2)) if r < 1: count += 1 return count def main(): start = timer() np = cpu_count() print(f'You have {np} cores') n = 100_000_000 part_count = [n/np for i in range(np)] with Pool(processes=np) as pool: count = pool.map(pi_part, part_count) pi_est = sum(count) / (n * 1.0) * 4 end = timer() print(f'elapsed time: {end - start}') print(f'Ï estimate: {pi_est}') if __name__=='__main__': main()
在示例中,我们找出核心数并将随机采样划分为子任务。每个任务将独立计算随机值。
n = 100_000_000 part_count = [n/np for i in range(np)]
不是一次性计算100_000_000,而是每个子任务计算其中的一部分。
count = pool.map(pi_part, part_count) pi_est = sum(count) / (n * 1.0) * 4
部分计算被传递给count
变量,然后在最终公式中使用总和。
$ ./monte_carlo_pi_mul.py You have 4 cores 25000000.0 25000000.0 25000000.0 25000000.0 elapsed time: 29.45832426099878 Ï estimate: 3.1414868
当使用四核并行运行示例时,计算耗时29.46秒。
在本教程中,我们使用了multiprocessing
模块。
列出所有Python教程。