在本文中,我们展示了如何使用ThreadPoolExecutor在Python中进行并发编程。
并发编程的主要目标是使我们的代码更高效。并发可以通过线程、并行或异步来实现。在本教程中,我们使用ThreadPoolExecutor处理线程。
线程是独立执行的代码。线程用于IO绑定任务,例如下载文件或执行数据库命令。由于Python强制执行GIL,真正的并行性(大部分)不可能通过线程实现。对于并行编程,我们应该考虑多处理/ThreadPoolExecutor。
全局解释器锁(GIL)是Python解释器中用于同步线程执行的一种机制,因此即使在多核处理器上运行,一次也只能执行一个本机线程。这是为了防止并发错误。
threading模块为并发使用线程提供了一个基本接口。
线程池执行器
concurrent.futures模块为并发执行可调用对象提供了一个高级接口。ThreadPoolExecutor是模块的一部分。ThreadPoolExecutor简化了线程的创建和管理。它管理一个工作池,使线程的创建、管理和关闭的整个过程更加高效。
创建新线程时会有一些显着的开销。工作线程旨在在任务完成后重新使用。
未来
Future是一个表示异步操作最终结果的对象。未来以结果或例外结束。它的result函数返回并发操作的结果。
Python线程池执行器提交
submit函数安排要执行的可调用对象,并返回表示可调用对象执行的未来对象。
#!/usr/bin/python
from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading
def task(id, n):
print(f"thread {id} started")
print(f"thread {id} : {threading.get_ident()}")
sleep(n)
print(f"thread {id} completed")
with ThreadPoolExecutor() as executor:
executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)
在示例中,我们提交了三个要执行的任务。
def task(id, n):
print(f"thread {id} started")
print(f"thread {id} : {threading.get_ident()}")
sleep(n)
print(f"thread {id} completed")
任务是一个打印线程和一些基本消息的函数。它还会休眠给定的秒数。time.sleep函数常用于模拟一些长时间运行的任务。
with ThreadPoolExecutor() as executor:
创建了一个新的执行器。它用作上下文管理器,以便在最后关闭。
executor.submit(task, 1, 4) executor.submit(task, 2, 3) executor.submit(task, 3, 2)
我们使用submit函数提交三个任务。
$ ./submitfun.py thread 1 started thread 1 : 140563097032256 thread 2 started thread 2 : 140563088639552 thread 3 started thread 3 : 140563005306432 thread 3 completed thread 2 completed thread 1 completed
PythonThreadPoolExecutor图
map函数将给定函数应用于可迭代对象中的每个元素。该函数可以接受多个可迭代对象。
#!/usr/bin/python
from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading
def task(id, n):
print(f"thread {id} started")
print(f"thread {id} : {threading.get_ident()}")
sleep(n)
print(f"thread {id} completed")
with ThreadPoolExecutor() as executor:
executor.map(task, [1, 2, 3], [4, 3, 2])
我们使用map重写了前面的例子。它接受两个可迭代对象:id和持续时间。
PythonThreadPoolExecutorFuture.result
Future表示并发操作的最终结果。未来的result函数返回可调用的值;它会阻塞,直到与未来相关的任务完成。
#!/usr/bin/python
from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor
def task(tid):
r = random.randint(1, 5)
print(f'task {tid} started, sleeping {r} secs')
sleep(r)
return f'finished task {tid}, slept {r}'
start = perf_counter()
with ThreadPoolExecutor() as executor:
t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)
print(t1.result())
print(t2.result())
print(t3.result())
finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
我们的任务随机休眠几秒。我们从每个任务中得到一条消息。此外,我们使用time模块来计算经过的时间。
return f'finished task {tid}, slept {r}'
返回值将通过result函数调用获得。
start = perf_counter()
使用perf_counter,我们计算经过的时间。
t1 = executor.submit(task, 1) t2 = executor.submit(task, 2) t3 = executor.submit(task, 3) print(t1.result()) print(t2.result()) print(t3.result())
我们提交了三个任务并检索了它们的结果。请注意,result函数是阻塞的;因此,我们按照原来的调度顺序得到任务结果。
$ ./resultfun.py task 1 started, sleeping 3 secs task 2 started, sleeping 4 secs task 3 started, sleeping 1 secs finished task 1, slept 3 finished task 2, slept 4 finished task 3, slept 1 It took 4.005295900977217 second(s) to finish.
整个过程的持续时间与其最长的任务一样长(加上一些开销)。任务按计划的顺序完成,因为result函数处于阻塞状态。在下一个示例中,我们将解决此问题。
PythonThreadPoolExecutoras_completed
as_completed函数返回一个迭代器,该迭代器会在完成时产生未来。
不幸的是,无法将map与as_completed一起使用。
#!/usr/bin/python
from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(tid):
r = random.randint(1, 5)
print(f'task {tid} started, sleeping {r} secs')
sleep(r)
return f'finished task {tid}, slept {r}'
start = perf_counter()
with ThreadPoolExecutor() as executor:
tids = [1, 2, 3]
futures = []
for tid in tids:
futures.append(executor.submit(task, tid))
for res in as_completed(futures):
print(res.result())
finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
在这个例子中,我们按照任务完成的顺序获取任务的结果。
$ ./as_completed.py task 1 started, sleeping 3 secs task 2 started, sleeping 4 secs task 3 started, sleeping 2 secs finished task 3, slept 2 finished task 1, slept 3 finished task 2, slept 4 It took 4.00534593896009 second(s) to finish.
多个并发HTTP请求
在下一个示例中,我们使用ThreadPoolExecutor生成多个HTTP请求。requests库用于生成HTTP请求。
#!/usr/bin/python
import requests
import concurrent.futures
import time
def get_status(url):
resp = requests.get(url=url)
return resp.status_code
urls = ['http://webcode.me', 'https://httpbin.org/get',
'https://google.com', 'https://stackoverflow.com',
'https://github.com', 'https://clojure.org',
'https://fsharp.org']
tm1 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in urls:
futures.append(executor.submit(get_status, url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')
该示例同时检查多个网站的HTTP状态代码。
$ ./web_requests.py 200 200 200 200 200 200 200 elapsed 0.81 seconds
并发ping
在下一个示例中,我们使用外部程序ping给定的网站。
#!/usr/bin/python
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
def task(url):
ok, _ = subprocess.getstatusoutput(
[f'ping -c 3 -w 10 {url}'])
return ok == 0, url
urls = ['webcode.me', 'clojure.org', 'fsharp.org',
'www.perl.org', 'python.org', 'go.dev', 'raku.org']
start = perf_counter()
with ThreadPoolExecutor() as executor:
futures = []
for url in urls:
futures.append(executor.submit(task, url))
for future in as_completed(futures):
r, u = future.result()
if r:
print(f'OK -> {u}')
else:
print(f'failed -> {u}')
finish = perf_counter()
print(f"elapsed {finish-start} second(s)")
该示例使用subprocess模块来执行外部程序。
ok, _ = subprocess.getstatusoutput(
[f'ping -c 3 -w 10 {url}'])
getstatusoutput函数返回已执行命令的(退出代码,输出)。ping是一个标准的Unix程序,它向网络主机发送ICMPECHO_REQUEST。-c选项确定发送的数据包数。-w选项以秒为单位设置截止日期。
$ ./pinging.py OK -> go.dev OK -> fsharp.org OK -> www.perl.org OK -> python.org OK -> raku.org OK -> clojure.org OK -> webcode.me elapsed 2.384801392967347 second(s)
在本文中,我们使用了ThreadPoolExecutor。
列出所有Python教程。
