开放的编程资料库

当前位置:我爱分享网 > Python教程 > 正文

Python线程池执行器

在本文中,我们展示了如何使用ThreadPoolExecutor在Python中进行并发编程。

并发编程的主要目标是使我们的代码更高效。并发可以通过线程、并行或异步来实现。在本教程中,我们使用ThreadPoolExecutor处理线程。

线程是独立执行的代码。线程用于IO绑定任务,例如下载文件或执行数据库命令。由于Python强制执行GIL,真正的并行性(大部分)不可能通过线程实现。对于并行编程,我们应该考虑多处理/ThreadPoolExecutor。

全局解释器锁(GIL)是Python解释器中用于同步线程执行的一种机制,因此即使在多核处理器上运行,一次也只能执行一个本机线程。这是为了防止并发错误。

threading模块为并发使用线程提供了一个基本接口。

线程池执行器

concurrent.futures模块为并发执行可调用对象提供了一个高级接口。ThreadPoolExecutor是模块的一部分。ThreadPoolExecutor简化了线程的创建和管理。它管理一个工作池,使线程的创建、管理和关闭的整个过程更加高效。

创建新线程时会有一些显着的开销。工作线程旨在在任务完成后重新使用。

未来

Future是一个表示异步操作最终结果的对象。未来以结果或例外结束。它的result函数返回并发操作的结果。

Python线程池执行器提交

submit函数安排要执行的可调用对象,并返回表示可调用对象执行的未来对象。

#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")


with ThreadPoolExecutor() as executor:

    executor.submit(task, 1, 4)
    executor.submit(task, 2, 3)
    executor.submit(task, 3, 2)

在示例中,我们提交了三个要执行的任务。

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

任务是一个打印线程和一些基本消息的函数。它还会休眠给定的秒数。time.sleep函数常用于模拟一些长时间运行的任务。

with ThreadPoolExecutor() as executor:

创建了一个新的执行器。它用作上下文管理器,以便在最后关闭。

executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)

我们使用submit函数提交三个任务。

$ ./submitfun.py
thread 1 started
thread 1 : 140563097032256
thread 2 started
thread 2 : 140563088639552
thread 3 started
thread 3 : 140563005306432
thread 3 completed
thread 2 completed
thread 1 completed

PythonThreadPoolExecutor图

map函数将给定函数应用于可迭代对象中的每个元素。该函数可以接受多个可迭代对象。

#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:

    executor.map(task, [1, 2, 3], [4, 3, 2])

我们使用map重写了前面的例子。它接受两个可迭代对象:id和持续时间。

PythonThreadPoolExecutorFuture.result

Future表示并发操作的最终结果。未来的result函数返回可调用的值;它会阻塞,直到与未来相关的任务完成。

#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    t1 = executor.submit(task, 1)
    t2 = executor.submit(task, 2)
    t3 = executor.submit(task, 3)

    print(t1.result())
    print(t2.result())
    print(t3.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

我们的任务随机休眠几秒。我们从每个任务中得到一条消息。此外,我们使用time模块来计算经过的时间。

return f'finished task {tid}, slept {r}'

返回值将通过result函数调用获得。

start = perf_counter()

使用perf_counter,我们计算经过的时间。

t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)

print(t1.result())
print(t2.result())
print(t3.result())

我们提交了三个任务并检索了它们的结果。请注意,result函数是阻塞的;因此,我们按照原来的调度顺序得到任务结果。

$ ./resultfun.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 1 secs
finished task 1, slept 3
finished task 2, slept 4
finished task 3, slept 1
It took 4.005295900977217 second(s) to finish.

整个过程的持续时间与其最长的任务一样长(加上一些开销)。任务按计划的顺序完成,因为result函数处于阻塞状态。在下一个示例中,我们将解决此问题。

PythonThreadPoolExecutoras_completed

as_completed函数返回一个迭代器,该迭代器会在完成时产生未来。

不幸的是,无法将mapas_completed一起使用。

#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    tids = [1, 2, 3]
    futures = []

    for tid in tids:
        futures.append(executor.submit(task, tid))

    for res in as_completed(futures):
        print(res.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

在这个例子中,我们按照任务完成的顺序获取任务的结果。

$ ./as_completed.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 2 secs
finished task 3, slept 2
finished task 1, slept 3
finished task 2, slept 4
It took 4.00534593896009 second(s) to finish.

多个并发HTTP请求

在下一个示例中,我们使用ThreadPoolExecutor生成多个HTTP请求。requests库用于生成HTTP请求。

#!/usr/bin/python

import requests
import concurrent.futures
import time

def get_status(url):

    resp = requests.get(url=url)
    return resp.status_code

urls = ['http://webcode.me', 'https://httpbin.org/get',
    'https://google.com', 'https://stackoverflow.com',
    'https://github.com', 'https://clojure.org',
    'https://fsharp.org']

tm1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(get_status, url=url))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())

tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')

该示例同时检查多个网站的HTTP状态代码。

$ ./web_requests.py 
200
200
200
200
200
200
200
elapsed 0.81 seconds

并发ping

在下一个示例中,我们使用外部程序ping给定的网站。

#!/usr/bin/python

from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess


def task(url):

    ok, _ = subprocess.getstatusoutput(
        [f'ping -c 3 -w 10 {url}'])
    
    return ok == 0, url


urls = ['webcode.me', 'clojure.org', 'fsharp.org', 
    'www.perl.org', 'python.org', 'go.dev', 'raku.org']

start = perf_counter()

with ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(task, url))

    for future in as_completed(futures):

        r, u = future.result()
        
        if r:
            print(f'OK -> {u}')
        else:
            print(f'failed -> {u}')

finish = perf_counter()

print(f"elapsed {finish-start} second(s)")

该示例使用subprocess模块来执行外部程序。

ok, _ = subprocess.getstatusoutput(
    [f'ping -c 3 -w 10 {url}'])

getstatusoutput函数返回已执行命令的(退出代码,输出)。ping是一个标准的Unix程序,它向网络主机发送ICMPECHO_REQUEST。-c选项确定发送的数据包数。-w选项以秒为单位设置截止日期。

$ ./pinging.py 
OK -> go.dev
OK -> fsharp.org
OK -> www.perl.org
OK -> python.org
OK -> raku.org
OK -> clojure.org
OK -> webcode.me
elapsed 2.384801392967347 second(s)

在本文中,我们使用了ThreadPoolExecutor。

列出所有Python教程。

未经允许不得转载:我爱分享网 » Python线程池执行器

感觉很棒!可以赞赏支持我哟~

赞(0) 打赏