在本文中,我们展示了如何使用aiohttp模块在Python中创建异步HTTP客户端和服务器。
通过异步编程,我们可以在执行主程序的同时执行任务。
使用aiohttp模块,我们可以在Python中创建异步HTTP客户端和服务器。该模块还支持websocket。它允许创建具有可插入中间件和路由的Web服务器。
Pythonaiohttp简单客户端
第一个示例是一个简单的异步HTTP客户端。
#!/usr/bin/python import aiohttp import asyncio url = 'http://webcode.me' async def main(): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print("Status:", response.status) data = await response.text() print(data) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main())
在程序中,我们连接到一个小型网站并检索其状态和内容。
import aiohttp import asyncio
我们导入了aiohttp和asyncio模块。
async with aiohttp.ClientSession() as session:
客户端会话已创建。它使用连接池进行网络连接。
async with session.get(url) as response:
我们使用get
方法创建一个GET请求。
print("Status:", response.status)
从响应对象中,我们通过status
字段获取状态代码。
data = await response.text() print(data)
我们通过text
方法获取内容,并将其打印到终端。
loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main())
在asyncio
模块的帮助下,我们设置了一个异步编程所必需的事件循环。
λ ./simple_client.py Status: 200 <!DOCTYPE html> <html lang="en"> <head> ...
Pythonaiohttp多个异步请求
在下一个示例中,我们使用aiohttp生成多个异步请求。
#!/usr/bin/python import aiohttp import asyncio async def get_async(url): async with aiohttp.ClientSession() as session: return await session.get(url) urls = ['http://webcode.me', 'https://httpbin.org/get', 'https://google.com', 'https://stackoverflow.com', 'https://github.com'] async def launch(): resps = await asyncio.gather(*map(get_async, urls)) data = [resp.status for resp in resps] for status_code in data: print(status_code) asyncio.run(launch())
在程序中,我们创建了对多个URL的异步HTTPGET请求。
urls = ['http://webcode.me', 'https://httpbin.org/get', 'https://google.com', 'https://stackoverflow.com', 'https://github.com']
这是URL列表。
async def launch(): resps = await asyncio.gather(*map(get_async, urls)) data = [resp.status for resp in resps] for status_code in data: print(status_code)
利用内置的地图功能,我们将get_async
功能应用于URL列表。使用*(星号)运算符将返回的列表解压缩为位置参数。如果所有协程都成功完成,则结果是返回值(HTML代码)的聚合列表。
asyncio.run(launch())
asyncio.run
是一个方便的函数,它简化了我们的代码。该函数创建一个事件循环,调度协程并在最后关闭循环。
$ ./multiple.py 200 200 200 200 200
Pythonaiohttp简单网络服务器
以下示例创建了一个简单的网络服务器。
#!/usr/bin/python from aiohttp import web async def home(req): return web.Response(text="home page") app = web.Application() app.add_routes([web.get('/', home)]) web.run_app(app)
Web服务器有一个返回文本消息的路由。
async def home(req): return web.Response(text="home page")
home
函数返回文本响应。
app = web.Application()
Web应用程序已创建。
app.add_routes([web.get('/', home)])
使用add_routes
添加了一条新路线。/
路径映射到home
处理程序。
web.run_app(app)
run_app
启动网络应用程序。
$ ./simple_web.py ======== Running on http://0.0.0.0:8080 ======== (Press CTRL+C to quit)
我们启动网络服务器。
$ curl localhost:8080 home page
在不同的终端中,我们使用curl
工具生成GET请求。
我们可以使用Python装饰器来定义路由。
#!/usr/bin/python from aiohttp import web routes = web.RouteTableDef() @routes.get('/') async def home(request): return web.Response(text="home page") app = web.Application() app.add_routes(routes) web.run_app(app)
这个程序创建了一个简单的网络服务器,它有一个路由。路由是用@routes
装饰器定义的。
PythonaiohttpJSON响应
JSON(JavaScript对象表示法)是一种轻量级数据交换格式。JSON的官方互联网媒体类型是application/json。
aiohttp
模块使用web.json_response
生成JSON响应。
#!/usr/bin/python from aiohttp import web routes = web.RouteTableDef() @routes.get('/') async def home(req): return web.Response(text="home page") @routes.get('/users') async def users(req): users = [{'name': 'John Doe', 'email': 'john.doe@example.org'}, {'name': 'Roger Roe', 'email': 'roger.roe@example.org'}] return web.json_response(users) app = web.Application() app.add_routes(routes) web.run_app(app)
在应用程序中,我们使用装饰器定义了两个路由。/users
路径向调用方返回JSON响应。
$ curl localhost:8080/ home page $ curl localhost:8080/users [{"name": "John Doe", "email": "john.doe@example.org"}, {"name": "Roger Roe", "email": "roger.roe@example.org"}]
在本文中,我们使用了Python的aiohttp模块。
列出所有Python教程。