在本文中,我们展示了如何使用ThreadPoolExecutor在Python中进行并发编程。
并发编程的主要目标是使我们的代码更高效。并发可以通过线程、并行或异步来实现。在本教程中,我们使用ThreadPoolExecutor
处理线程。
线程是独立执行的代码。线程用于IO绑定任务,例如下载文件或执行数据库命令。由于Python强制执行GIL,真正的并行性(大部分)不可能通过线程实现。对于并行编程,我们应该考虑多处理/ThreadPoolExecutor。
全局解释器锁(GIL)是Python解释器中用于同步线程执行的一种机制,因此即使在多核处理器上运行,一次也只能执行一个本机线程。这是为了防止并发错误。
threading
模块为并发使用线程提供了一个基本接口。
线程池执行器
concurrent.futures
模块为并发执行可调用对象提供了一个高级接口。ThreadPoolExecutor
是模块的一部分。ThreadPoolExecutor
简化了线程的创建和管理。它管理一个工作池,使线程的创建、管理和关闭的整个过程更加高效。
创建新线程时会有一些显着的开销。工作线程旨在在任务完成后重新使用。
未来
Future是一个表示异步操作最终结果的对象。未来以结果或例外结束。它的result
函数返回并发操作的结果。
Python线程池执行器提交
submit
函数安排要执行的可调用对象,并返回表示可调用对象执行的未来对象。
#!/usr/bin/python from time import sleep from concurrent.futures import ThreadPoolExecutor import threading def task(id, n): print(f"thread {id} started") print(f"thread {id} : {threading.get_ident()}") sleep(n) print(f"thread {id} completed") with ThreadPoolExecutor() as executor: executor.submit(task, 1, 4) executor.submit(task, 2, 3) executor.submit(task, 3, 2)
在示例中,我们提交了三个要执行的任务。
def task(id, n): print(f"thread {id} started") print(f"thread {id} : {threading.get_ident()}") sleep(n) print(f"thread {id} completed")
任务是一个打印线程和一些基本消息的函数。它还会休眠给定的秒数。time.sleep
函数常用于模拟一些长时间运行的任务。
with ThreadPoolExecutor() as executor:
创建了一个新的执行器。它用作上下文管理器,以便在最后关闭。
executor.submit(task, 1, 4) executor.submit(task, 2, 3) executor.submit(task, 3, 2)
我们使用submit
函数提交三个任务。
$ ./submitfun.py thread 1 started thread 1 : 140563097032256 thread 2 started thread 2 : 140563088639552 thread 3 started thread 3 : 140563005306432 thread 3 completed thread 2 completed thread 1 completed
PythonThreadPoolExecutor图
map
函数将给定函数应用于可迭代对象中的每个元素。该函数可以接受多个可迭代对象。
#!/usr/bin/python from time import sleep from concurrent.futures import ThreadPoolExecutor import threading def task(id, n): print(f"thread {id} started") print(f"thread {id} : {threading.get_ident()}") sleep(n) print(f"thread {id} completed") with ThreadPoolExecutor() as executor: executor.map(task, [1, 2, 3], [4, 3, 2])
我们使用map
重写了前面的例子。它接受两个可迭代对象:id和持续时间。
PythonThreadPoolExecutorFuture.result
Future表示并发操作的最终结果。未来的result
函数返回可调用的值;它会阻塞,直到与未来相关的任务完成。
#!/usr/bin/python from time import sleep, perf_counter import random from concurrent.futures import ThreadPoolExecutor def task(tid): r = random.randint(1, 5) print(f'task {tid} started, sleeping {r} secs') sleep(r) return f'finished task {tid}, slept {r}' start = perf_counter() with ThreadPoolExecutor() as executor: t1 = executor.submit(task, 1) t2 = executor.submit(task, 2) t3 = executor.submit(task, 3) print(t1.result()) print(t2.result()) print(t3.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
我们的任务随机休眠几秒。我们从每个任务中得到一条消息。此外,我们使用time
模块来计算经过的时间。
return f'finished task {tid}, slept {r}'
返回值将通过result
函数调用获得。
start = perf_counter()
使用perf_counter
,我们计算经过的时间。
t1 = executor.submit(task, 1) t2 = executor.submit(task, 2) t3 = executor.submit(task, 3) print(t1.result()) print(t2.result()) print(t3.result())
我们提交了三个任务并检索了它们的结果。请注意,result
函数是阻塞的;因此,我们按照原来的调度顺序得到任务结果。
$ ./resultfun.py task 1 started, sleeping 3 secs task 2 started, sleeping 4 secs task 3 started, sleeping 1 secs finished task 1, slept 3 finished task 2, slept 4 finished task 3, slept 1 It took 4.005295900977217 second(s) to finish.
整个过程的持续时间与其最长的任务一样长(加上一些开销)。任务按计划的顺序完成,因为result
函数处于阻塞状态。在下一个示例中,我们将解决此问题。
PythonThreadPoolExecutoras_completed
as_completed
函数返回一个迭代器,该迭代器会在完成时产生未来。
不幸的是,无法将map
与as_completed
一起使用。
#!/usr/bin/python from time import sleep, perf_counter import random from concurrent.futures import ThreadPoolExecutor, as_completed def task(tid): r = random.randint(1, 5) print(f'task {tid} started, sleeping {r} secs') sleep(r) return f'finished task {tid}, slept {r}' start = perf_counter() with ThreadPoolExecutor() as executor: tids = [1, 2, 3] futures = [] for tid in tids: futures.append(executor.submit(task, tid)) for res in as_completed(futures): print(res.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
在这个例子中,我们按照任务完成的顺序获取任务的结果。
$ ./as_completed.py task 1 started, sleeping 3 secs task 2 started, sleeping 4 secs task 3 started, sleeping 2 secs finished task 3, slept 2 finished task 1, slept 3 finished task 2, slept 4 It took 4.00534593896009 second(s) to finish.
多个并发HTTP请求
在下一个示例中,我们使用ThreadPoolExecutor
生成多个HTTP请求。requests
库用于生成HTTP请求。
#!/usr/bin/python import requests import concurrent.futures import time def get_status(url): resp = requests.get(url=url) return resp.status_code urls = ['http://webcode.me', 'https://httpbin.org/get', 'https://google.com', 'https://stackoverflow.com', 'https://github.com', 'https://clojure.org', 'https://fsharp.org'] tm1 = time.perf_counter() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in urls: futures.append(executor.submit(get_status, url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) tm2 = time.perf_counter() print(f'elapsed {tm2-tm1:0.2f} seconds')
该示例同时检查多个网站的HTTP状态代码。
$ ./web_requests.py 200 200 200 200 200 200 200 elapsed 0.81 seconds
并发ping
在下一个示例中,我们使用外部程序ping给定的网站。
#!/usr/bin/python from time import perf_counter from concurrent.futures import ThreadPoolExecutor, as_completed import subprocess def task(url): ok, _ = subprocess.getstatusoutput( [f'ping -c 3 -w 10 {url}']) return ok == 0, url urls = ['webcode.me', 'clojure.org', 'fsharp.org', 'www.perl.org', 'python.org', 'go.dev', 'raku.org'] start = perf_counter() with ThreadPoolExecutor() as executor: futures = [] for url in urls: futures.append(executor.submit(task, url)) for future in as_completed(futures): r, u = future.result() if r: print(f'OK -> {u}') else: print(f'failed -> {u}') finish = perf_counter() print(f"elapsed {finish-start} second(s)")
该示例使用subprocess
模块来执行外部程序。
ok, _ = subprocess.getstatusoutput( [f'ping -c 3 -w 10 {url}'])
getstatusoutput
函数返回已执行命令的(退出代码,输出)。ping
是一个标准的Unix程序,它向网络主机发送ICMPECHO_REQUEST。-c
选项确定发送的数据包数。-w
选项以秒为单位设置截止日期。
$ ./pinging.py OK -> go.dev OK -> fsharp.org OK -> www.perl.org OK -> python.org OK -> raku.org OK -> clojure.org OK -> webcode.me elapsed 2.384801392967347 second(s)
在本文中,我们使用了ThreadPoolExecutor。
列出所有Python教程。