在本文中,我们展示了如何使用asyncio模块在Python中进行异步编程。
通过异步编程,我们可以在执行主程序的同时执行任务。
异步模块
asyncio
是一个用Python编写异步程序的库。
该模块提供高级API以:
- 并发运行Python协程
- 执行网络IO和IPC
- 控制子进程
- 通过队列分配任务
- 同步并发代码
并发编程用于两种任务:IO-bound任务和CPU-boud任务。从网络请求数据、访问数据库或读取和写入都是IO绑定任务。CPU-boud任务是计算量大的任务,例如数学计算或图形处理。
为了在Python中进行异步编程,我们使用事件循环、协程和未来。事件循环是负责管理异步任务并分配它们执行的主要任务。协程是安排事件执行的函数。Futures是协程执行的结果。结果可能以异常结束。
协程是一种用于协作式多任务处理的Python函数,它们可以在其中暂停和恢复。async
关键字用于创建Python协程。await
关键字暂停协程的执行,直到它完成并返回结果数据。
Pythonasyncio简单例子
下一个是使用asyncio的简单示例。
#!/usr/bin/python import asyncio async def mul(x, y): return x * y loop = asyncio.get_event_loop() try: res2 = loop.run_until_complete(mul(5, 5)) print(res2) finally: loop.close()
该程序创建并运行一个将两个数字相乘的异步函数。
async def mul(x, y): return x * y
协程是使用async
修饰符声明的函数。
loop = asyncio.get_event_loop()
get_event_loop
返回一个异步事件循环。需要一个事件循环来执行异步代码。
res = loop.run_until_complete(mul(5, 5))
run_until_complete
函数运行事件循环,直到future完成。它返回未来的结果,或引发其异常。Future表示异步操作的最终结果。
Python异步创建任务
create_task
函数将给定的协程包装到任务中并安排其执行。它返回任务对象。任务在get_running_loop
返回的循环中执行。
协程被包装到任务中以获得额外的功能,例如任务取消或检查就绪状态。
#!/usr/bin/python import asyncio async def mul(x, y): return x * y loop = asyncio.get_event_loop() try: task = loop.create_task(mul(10, 10)) res = loop.run_until_complete(task) print(res) finally: loop.close()
该示例创建一个任务并安排它执行。它打印最终结果。
Pythonasyncio.sleep
asyncio.sleep
函数使当前协程休眠指定的秒数。该函数通常用于模拟长时间运行的任务。
#!/usr/bin/python import asyncio import time async def task(tid, n): await asyncio.sleep(n) print(f'task {tid} finished') loop = asyncio.get_event_loop() tm1 = time.perf_counter() try: t1 = loop.create_task(task(1, 3)) loop.run_until_complete(t1) t2 = loop.create_task(task(2, 2)) loop.run_until_complete(t2) t3 = loop.create_task(task(3, 1)) loop.run_until_complete(t3) finally: loop.close() tm2 = time.perf_counter() print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
在示例中,我们创建并安排了三个任务。这些任务使用asyncio.sleep
模拟一些工作。每个任务都与主程序异步运行,但任务本身在主程序中按顺序运行。我们使用time.perf_counter
测量经过的时间。
$ ./sleep.py task 1 finished task 2 finished task 3 finished Total time elapsed: 6.01 seconds
Python异步收集
asyncio.gather
函数用于并发调度多个协程。
#!/usr/bin/python import asyncio import time async def task(tid, n): await asyncio.sleep(n) print(f'task {tid} finished') loop = asyncio.get_event_loop() tm1 = time.perf_counter() try: tasks = [ loop.create_task(task(1, 3)), loop.create_task(task(2, 2)), loop.create_task(task(3, 1)) ] loop.run_until_complete(asyncio.gather(*tasks)) finally: loop.close() tm2 = time.perf_counter() print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
在示例中,我们使用asyncio.gather
同时运行这三个任务。
$ ./asyncio_gather.py task 3 finished task 2 finished task 1 finished Total time elapsed: 3.00 seconds
Pythonasyncio.run
asyncio.run
是一个方便的函数,它简化了我们的代码。该函数创建一个事件循环,调度协程并在最后关闭循环。
#!/usr/bin/python import asyncio import time async def task(tid, n): await asyncio.sleep(n) print(f'task {tid} finished') async def main(): t1 = asyncio.create_task(task(1, 3)) t2 = asyncio.create_task(task(2, 2)) t3 = asyncio.create_task(task(3, 1)) await asyncio.gather(t1, t2, t3) tm1 = time.perf_counter() asyncio.run(main()) tm2 = time.perf_counter() print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
使用asyncio.run
,代码更紧凑。
Python异步回显服务器
在下面的例子中,我们创建了一个回显服务器。回显服务器从客户端发回消息。
#!/usr/bin/python import asyncio class EchoProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport def data_received(self, data): print(f'received: {data}') self.transport.write(data) async def main(host, port): print(f'starting server on port {port}') loop = asyncio.get_running_loop() server = await loop.create_server(EchoProtocol, host, port) await server.serve_forever() try: asyncio.run(main('127.0.0.1', 8001)) except KeyboardInterrupt: print('terminated')
create_server
函数返回一个协程,它创建一个绑定到主机和端口的TCP服务器。serve_forever
函数开始接受连接,直到协程被取消。协程取消时服务器关闭。
$ nc 127.0.0.1 8001 Hello! Hello! Hi! Hi!
我们使用nc
命令测试服务器。
多个HTTP请求
要发出多个异步HTTP请求,我们使用httpx
模块。该模块提供了一个包含同步和异步API的HTTP客户端,并支持HTTP/1.1和HTTP/2。
#!/usr/bin/python import httpx import asyncio async def get_async(url): async with httpx.AsyncClient() as client: return await client.get(url) urls = ['http://webcode.me', 'https://httpbin.org/get', 'https://google.com', 'https://stackoverflow.com', 'https://github.com'] async def launch(): resps = await asyncio.gather(*map(get_async, urls)) data = [resp.status_code for resp in resps] for status_code in data: print(status_code) asyncio.run(launch())
该示例向五个网站发出异步HTTP请求。它打印所有提供的url的状态代码。
$ ./async_req.py 200 200 200 200 200
在本文中,我们使用了Python的asyncio模块。
列出所有Python教程。