开放的编程资料库

当前位置:我爱分享网 > Python教程 > 正文

Python多处理

Python多处理教程是Python中基于进程的并行性的入门教程。

Python多处理

multiprocessing模块允许程序员在给定机器上充分利用多个处理器。使用的API类似于经典的threading模块。它同时提供本地和远程并发。

多进程模块通过使用子进程而不是线程来避免全局解释器锁(GIL)的限制。多进程代码的执行顺序与串行代码不同。无法保证第一个创建的进程将最先完成。

蟒蛇GIL

全局解释器锁(GIL)是Python解释器中用于同步线程执行的一种机制,因此即使在多核处理器上运行,一次也只能执行一个本机线程。

C扩展,如numpy,可以手动释放GIL来加速计算。此外,GIL在可能阻止I/O操作之前发布。

请注意,Jython和IronPython都没有GIL。

并发和并行

并发意味着两个或多个计算在同一时间范围内发生。并行意味着两个或多个计算同时发生。因此,并行是并发的一种特殊情况。它需要多个CPU单元或内核。

Python中真正的并行性是通过创建多个进程来实现的,每个进程都有一个Python解释器和自己独立的GIL。

Python具有三个并发模块:multiprocessingthreadingasyncio。当任务是CPU密集型时,我们应该考虑multiprocessing模块。当任务受I/O限制并需要大量连接时,推荐使用asyncio模块。对于其他类型的任务,当库无法与asyncio配合时,可以考虑使用threading模块。

尴尬的并行

术语embarrassinblyparallel用于描述可以轻松并行运行的问题或工作负载。重要的是要认识到并非所有工作负载都可以划分为子任务并并行运行。例如那些需要在子任务之间进行大量通信的人。

完美并行计算的例子包括:

  • 蒙特卡罗分析
  • 数值积分
  • 计算机图形渲染
  • 密码学中的强力搜索
  • 遗传算法

另一种可以应用并行计算的情况是当我们运行多个不同的计算时,也就是说,我们不将问题划分为子任务。例如,我们可以使用不同的算法并行运行Ï的计算。

进程与线程

进程和线程都是独立的执行序列。下表总结了进程和线程的区别:

td>孩子可以变成僵尸

tr>

Process Thread
进程在单独的内存中运行(进程隔离) 线程共享内存
使用更多内存 使用更少内存
不可能变成僵尸
更多开销 更少开销
创建和销毁速度较慢 创建和销毁速度较快
更易于编码和调试 可以成为更难编码和调试

过程

Process对象表示在单独进程中运行的活动。multiprocessing.Process类具有threading.Thread的所有方法的等效项。Process构造函数应始终使用关键字参数调用。

构造函数的target参数是run方法调用的可调用对象。name是进程名。start方法启动进程的活动。join方法会阻塞,直到其join方法被调用的进程终止。如果提供了timeout选项,它最多阻塞超时秒数。is_alive方法返回一个布尔值,指示进程是否处于活动状态。terminate方法终止进程。

__main__守卫

Python多处理风格指南建议将多处理代码放在__name__=='__main__'习语中。这是由于在Windows上创建进程的方式。守卫是为了防止进程生成的死循环。

简单流程示例

下面是一个使用多处理的简单程序。

#!/usr/bin/python

from multiprocessing import Process


def fun(name):
    print(f'hello {name}')

def main():

    p = Process(target=fun, args=('Peter',))
    p.start()


if __name__ == '__main__':
    main()

我们创建一个新流程并向其传递一个值。

def fun(name):
    print(f'hello {name}')

函数打印传递的参数。

def main():

    p = Process(target=fun, args=('Peter',))
    p.start()

创建了一个新进程。target选项提供在新进程中运行的可调用对象。args提供要传递的数据。multiprocessing代码放在mainguard里面。进程是用start方法启动的。

if __name__ == '__main__':
    main()

代码放在__name__=='__main__'习语中。

Python多进程连接

join方法会阻塞主进程的执行,直到其join方法被调用的进程终止。如果没有join方法,主进程将不会等到进程终止。

#!/usr/bin/python

from multiprocessing import Process
import time

def fun():

    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def main():

    p = Process(target=fun)
    p.start()
    p.join()


if __name__ == '__main__':

    print('starting main')
    main()
    print('finishing main')

该示例在新创建的进程上调用join

$ ./joining.py
starting main
starting fun
finishing fun
finishing main

finishingmain消息在子进程完成后打印。

$ ./joining.py
starting main
finishing main
starting fun
finishing fun

当我们注释掉join方法时,主进程在子进程之前完成。

start方法之后调用join方法很重要。

#!/usr/bin/python

from multiprocessing import Process
import time

def fun(val):

    print(f'starting fun with {val} s')
    time.sleep(val)
    print(f'finishing fun with {val} s')


def main():

    p1 = Process(target=fun, args=(3, ))
    p1.start()
    # p1.join()

    p2 = Process(target=fun, args=(2, ))
    p2.start()
    # p2.join()

    p3 = Process(target=fun, args=(1, ))
    p3.start()
    # p3.join()

    p1.join()
    p2.join()
    p3.join()

    print('finished main')

if __name__ == '__main__':

    main()

如果我们错误地调用了join方法,那么我们实际上是按顺序运行进程。(不正确的方式已注释掉。)

Python多进程is_alive

is_alive方法确定进程是否正在运行。

#!/usr/bin/python

from multiprocessing import Process
import time

def fun():

    print('calling fun')
    time.sleep(2)

def main():

    print('main fun')

    p = Process(target=fun)
    p.start()
    p.join()

    print(f'Process p is alive: {p.is_alive()}')


if __name__ == '__main__':
    main()

当我们用join方法等待子进程结束时,检查时进程已经挂了。如果我们注释掉join,进程仍然存在。

Python多处理进程ID

os.getpid返回当前进程Id,而os.getppid返回父进程Id。

#!/usr/bin/python

from multiprocessing import Process
import os

def fun():

    print('--------------------------')

    print('calling fun')
    print('parent process id:', os.getppid())
    print('process id:', os.getpid())

def main():

    print('main fun')
    print('process id:', os.getpid())

    p1 = Process(target=fun)
    p1.start()
    p1.join()

    p2 = Process(target=fun)
    p2.start()
    p2.join()


if __name__ == '__main__':
    main()

该示例运行两个子进程。它打印他们的Id和他们父母的Id。

$ ./parent_id.py
main fun
process id: 7605
--------------------------
calling fun
parent process id: 7605
process id: 7606
--------------------------
calling fun
parent process id: 7605
process id: 7607

父进程Id相同,每个子进程的进程Id不同。

命名过程

通过Processname属性,我们可以为worker指定一个特定的名称。否则,模块会创建自己的名称。

#!/usr/bin/python

from multiprocessing import Process, current_process
import time

def worker():

    name = current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

def service():

    name = current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

if __name__ == '__main__':

    service = Process(name='Service 1', target=service)
    worker1 = Process(name='Worker 1', target=worker)
    worker2 = Process(target=worker) # use default name

    worker1.start()
    worker2.start()
    service.start()

在示例中,我们创建了三个进程;其中两个被赋予了自定义名称。

$ ./naming_workers.py
Worker 1 Starting
Process-3 Starting
Service 1 Starting
Worker 1 Exiting
Process-3 Exiting
Service 1 Exiting

子类化过程

当我们子类化Process时,我们覆盖了run方法。

#!/usr/bin/python

import time
from multiprocessing import Process


class Worker(Process):

    def run(self):

        print(f'In {self.name}')
        time.sleep(2)

def main():

    worker = Worker()
    worker.start()

    worker2 = Worker()
    worker2.start()

    worker.join()
    worker2.join()

if __name__ == '__main__':
    main()

我们创建了一个继承自ProcessWorker类。在run方法中,我们编写了worker的代码。

Python多处理池

可以使用Pool对象简化工作进程的管理。它控制可以提交作业的工作进程池。该池的map方法将给定的可迭代对象分成许多块,将这些块作为单独的任务提交给进程池。池的map是内置map方法的并行等价物。map阻塞主执行,直到所有计算完成。

Pool可以把进程数作为一个参数。这是一个我们可以试验的值。如果我们不提供任何值,则使用os.cpu_count返回的数字。

#!/usr/bin/python

import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count


def square(n):

    time.sleep(2)

    return n * n


def main():

    start = timer()

    print(f'starting computations on {cpu_count()} cores')

    values = (2, 4, 6, 8)

    with Pool() as pool:
        res = pool.map(square, values)
        print(res)

    end = timer()
    print(f'elapsed time: {end - start}')

if __name__ == '__main__':
    main()

在示例中,我们创建了一个进程池并在square函数上应用了值。核心数由cpu_unit函数决定。

$ ./worker_pool.py
starting computations on 4 cores
[4, 16, 36, 64]
elapsed time: 2.0256662130013865

在具有四核的计算机上,完成四次计算需要略多于2秒的时间,每次持续两秒。

$ ./worker_pool.py
starting computations on 4 cores
[4, 16, 36, 64, 100]
elapsed time: 4.029600699999719

当我们添加要计算的附加值时,时间增加到超过四秒。

多个参数

要将多个参数传递给辅助函数,我们可以使用starmap方法。可迭代对象的元素应该是作为参数解包的可迭代对象。

#!/usr/bin/python

import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count


def power(x, n):

    time.sleep(1)

    return x ** n


def main():

    start = timer()

    print(f'starting computations on {cpu_count()} cores')

    values = ((2, 2), (4, 3), (5, 5))

    with Pool() as pool:
        res = pool.starmap(power, values)
        print(res)

    end = timer()
    print(f'elapsed time: {end - start}')


if __name__ == '__main__':
    main()

在此示例中,我们将两个值传递给power函数:thevalue和exponent。

$ ./multi_args.py
starting computations on 4 cores
[4, 64, 3125]
elapsed time: 1.0230950259974634

多功能

以下示例展示了如何在池中运行多个函数。

#!/usr/bin/python

from multiprocessing import Pool
import functools


def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

def smap(f):
    return f()


def main():

    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)

    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])

        print(res)


if __name__ == '__main__':
    main()

我们有三个函数,它们在一个池中独立运行。我们使用functools.partial在函数执行前准备函数及其参数。

$ ./multiple_functions.py
[5, 1, 7]

Python多处理运算

Ï是任何圆的周长与圆的直径之比。Ï是一个无理数,其小数形式既不结束也不重复。它约等于3.14159。有几个公式可以计算Ï。

计算μ的近似值可能需要很长时间,因此我们可以利用并行计算。我们使用BaileyâBorweinâPlouffe公式来计算Ï。

#!/usr/bin/python

from decimal import Decimal, getcontext
from timeit import default_timer as timer

def pi(precision):

    getcontext().prec = precision

    return sum(1/Decimal(16)**k *
        (Decimal(4)/(8*k+1) -
         Decimal(2)/(8*k+4) -
         Decimal(1)/(8*k+5) -
         Decimal(1)/(8*k+6)) for k in range (precision))


start = timer()
values = (1000, 1500, 2000)
data = list(map(pi, values))
print(data)

end = timer()
print(f'sequentially: {end - start}')

首先,我们依次计算三个近似值。精度是计算出的μ的位数。

$ ./calc_pi.py
...
sequentially: 0.5738053179993585

在我们的机器上,计算这三个近似值需要0.57381秒。

在下面的示例中,我们使用进程池来计算三个近似值。

#!/usr/bin/python

from decimal import Decimal, getcontext
from timeit import default_timer as timer
from multiprocessing import Pool, current_process
import time


def pi(precision):

    getcontext().prec=precision

    return sum(1/Decimal(16)**k *
        (Decimal(4)/(8*k+1) -
         Decimal(2)/(8*k+4) -
         Decimal(1)/(8*k+5) -
         Decimal(1)/(8*k+6)) for k in range (precision))

def main():

    start = timer()

    with Pool(3) as pool:

        values = (1000, 1500, 2000)
        data = pool.map(pi, values)
        print(data)

    end = timer()
    print(f'paralelly: {end - start}')


if __name__ == '__main__':
    main()

我们在三个进程的池中运行计算,我们获得了一些小的效率提升。

./calc_pi2.py
...
paralelly: 0.38216479000038817

当我们并行运行计算时,耗时0.38216479秒。

在一个进程中分离内存

在多进程中,每个worker都有自己的内存。内存不像线程那样共享。

#!/usr/bin/python

from multiprocessing import Process, current_process

data = [1, 2]

def fun():

    global data

    data.extend((3, 4, 5))
    print(f'Result in {current_process().name}: {data}')

def main():

    worker = Process(target=fun)
    worker.start()
    worker.join()

    print(f'Result in main: {data}')


if __name__ == '__main__':

    main()

我们创建了一个worker,我们将全局data列表传递给它。我们向worker中的列表添加了额外的值,但主进程中的原始列表没有被修改。

$ ./own_memory_space.py
Result in Process-1: [1, 2, 3, 4, 5]
Result in main: [1, 2]

从输出中我们可以看出,这两个列表是分开的。

进程间共享状态

可以使用ValueArray将数据存储在共享内存中。

#!/usr/bin/python

from multiprocessing import Process, Value
from time import sleep


def f(counter):

    sleep(1)

    with counter.get_lock():
        counter.value += 1

    print(f'Counter: {counter.value}')

def main():

    counter = Value('i', 0)

    processes = [Process(target=f, args=(counter, )) for _ in range(30)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()

该示例创建了一个在进程之间共享的计数器对象。每个进程都会增加计数器。

with counter.get_lock():
    counter.value += 1

每个进程都必须为自己获取一个锁。

使用队列传递消息

消息传递是进程间通信的首选方式。消息传递避免了必须使用同步原语,例如锁,这些同步原语在复杂情况下难以使用且容易出错。

为了传递消息,我们可以利用管道连接两个进程。队列允许多个生产者和消费者。

#!/usr/bin/python

from multiprocessing import Process, Queue
import random

def rand_val(queue):

    num = random.random()
    queue.put(num)


def main():

    queue = Queue()

    processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    results = [queue.get() for _ in processes]
    print(results)

if __name__ == "__main__":
    main()

在示例中,我们创建了四个进程。每个进程生成一个随机值并将其放入队列中。所有进程完成后,我们从队列中获取所有值。

processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)]

队列作为参数传递给进程。

results = [queue.get() for _ in processes]

get方法从队列中删除并返回项目。

$ ./simple_queue.py
[0.7829025790441544, 0.46465345633928223, 0.4804438310782676, 0.7146952404346074]

该示例生成一个包含四个随机值的列表。

在下面的示例中,我们将单词放入队列中。创建的进程从队列中读取单词。

#!/usr/bin/python

from multiprocessing import Queue, Process, current_process


def worker(queue):
    name = current_process().name
    print(f'{name} data received: {queue.get()}')


def main():

    queue = Queue()
    queue.put("wood")
    queue.put("sky")
    queue.put("cloud")
    queue.put("ocean")

    processes = [Process(target=worker, args=(queue,)) for _ in range(4)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

if __name__ == "__main__":
    main()

创建了四个进程;他们每个人都从队列中读取一个单词并打印出来。

$ ./simple_queue2.py
Process-1 data received: wood
Process-2 data received: sky
Process-3 data received: cloud
Process-4 data received: ocean

排队顺序

在多处理中,不能保证进程按特定顺序完成。

#!/usr/bin/python

from multiprocessing import Process, Queue
import time
import random

def square(idx, x, queue):

    time.sleep(random.randint(1, 3))
    queue.put((idx, x * x))


def main():

    data = [2, 4, 6, 3, 5, 8, 9, 7]
    queue = Queue()
    processes = [Process(target=square, args=(idx, val, queue))
                 for idx, val in enumerate(data)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    unsorted_result = [queue.get() for _ in processes]

    result = [val[1] for val in sorted(unsorted_result)]
    print(result)


if __name__ == '__main__':
    main()

我们有计算值平方的过程。输入数据是按一定顺序排列的,我们需要维护这个顺序。为了解决这个问题,我们为每个输入值保留了一个额外的索引。

def square(idx, x, queue):

    time.sleep(random.randint(1, 3))
    queue.put((idx, x * x))

为了说明变化,我们使用sleep方法随机减慢计算速度。我们将索引放入具有计算平方的队列中。

unsorted_result = [queue.get() for _ in processes]

我们得到了结果。此时,元组的顺序是随机的。

result = [val[1] for val in sorted(unsorted_result)]

我们根据索引值对结果数据进行排序。

$ ./queue_order.py
[4, 16, 36, 9, 25, 64, 81, 49]

我们得到对应于初始数据的平方值。

用蒙特卡洛方法计算μ

蒙特卡洛方法是一类广泛的计算算法,它依赖于重复随机抽样来获得数值结果。基本概念是使用随机性来解决原则上可能是确定性的问题。

以下公式用于计算ϯ的近似值:

Ï退4â孟鈈MN

M是正方形中生成的点数,N是点的总数。

虽然这种μ计算方法很有趣并且非常适合学校示例,但它不是很准确。有更好的算法来获得Ï。

#!/usr/bin/python

from random import random
from math import sqrt
from timeit import default_timer as timer


def pi(n):

    count = 0

    for i in range(n):

        x, y = random(), random()

        r = sqrt(pow(x, 2) + pow(y, 2))

        if r < 1:
            count += 1

    return 4 * count / n


start = timer()
pi_est = pi(100_000_000)
end = timer()

print(f'elapsed time: {end - start}')
print(f'π estimate: {pi_est}')

在示例中,我们使用一亿个生成的随机点来计算μ值的近似值。

$ ./monte_carlo_pi.py
elapsed time: 44.7768127549989
π estimate: 3.14136588

计算μ的近似值用了44.78秒

现在我们将μ计算的整个任务划分为子任务。

#!/usr/bin/python

import random
from multiprocessing import Pool, cpu_count
from math import sqrt
from timeit import default_timer as timer


def pi_part(n):
    print(n)

    count = 0

    for i in range(int(n)):

        x, y = random.random(), random.random()

        r = sqrt(pow(x, 2) + pow(y, 2))

        if r < 1:
            count += 1

    return count


def main():

    start = timer()

    np = cpu_count()
    print(f'You have {np} cores')

    n = 100_000_000

    part_count = [n/np for i in range(np)]

    with Pool(processes=np) as pool:

        count = pool.map(pi_part, part_count)
        pi_est = sum(count) / (n * 1.0) * 4

        end = timer()

        print(f'elapsed time: {end - start}')
        print(f'π estimate: {pi_est}')

if __name__=='__main__':
    main()

在示例中,我们找出核心数并将随机采样划分为子任务。每个任务将独立计算随机值。

n = 100_000_000

part_count = [n/np for i in range(np)]

不是一次性计算100_000_000,而是每个子任务计算其中的一部分。

count = pool.map(pi_part, part_count)
pi_est = sum(count) / (n * 1.0) * 4

部分计算被传递给count变量,然后在最终公式中使用总和。

$ ./monte_carlo_pi_mul.py
You have 4 cores
25000000.0
25000000.0
25000000.0
25000000.0
elapsed time: 29.45832426099878
π estimate: 3.1414868

当使用四核并行运行示例时,计算耗时29.46秒。

在本教程中,我们使用了multiprocessing模块。

列出所有Python教程。

未经允许不得转载:我爱分享网 » Python多处理

感觉很棒!可以赞赏支持我哟~

赞(0) 打赏