3.1 异步操作Redis
-
Python操作Redis时,链接、设置值、获取值都涉及网络IO请求,使用asycio异步的方式可以在IO等待时处理其他任务,从而提升性能
-
安装Python异步操作Redis的模块
pip3 install aioredis
-
示例1
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO操作:创建redis连接 redis = await aioredis.create_redis(address, password=password) # 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}} await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 网络IO操作:去redis中获取值 result = await redis.hgetall('car', encoding='utf-8') print(result) redis.close() # 网络IO操作:关闭redis连接 await redis.wait_closed() print("结束", address) asyncio.run(execute('redis://0.0.0.0:6379', None))
-
示例2
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO操作:创建redis连接,先去连接 本机192.168.0.101:6379,遇到IO则自动切换任务,去连接192.168.0.101:6379 redis = await aioredis.create_redis_pool(address, password=password) # 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}},遇到IO会自动切换任务 await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 网络IO操作:去redis中获取值,遇到IO会自动切换任务 result = await redis.hgetall('car', encoding='utf-8') print(result) redis.close() # 网络IO操作:关闭redis连接,遇到IO会自动切换任务 await redis.wait_closed() print("结束", address) task_list = [ execute('redis://192.168.0.101:6379', None), execute('redis://192.168.0.101:6379', None) ] asyncio.run(asyncio.wait(task_list))
3.2 异步操作MySQL
-
Python操作MySQL时,链接、设置值、获取值都涉及网络IO请求,使用asycio异步的方式可以在IO等待时处理其他任务,从而提升性能
-
安装Python异步操作MySQL的模块
pip install aiomysql
-
示例1
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO操作:创建redis连接 redis = await aioredis.create_redis(address, password=password) # 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}} await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 网络IO操作:去redis中获取值 result = await redis.hgetall('car', encoding='utf-8') print(result) redis.close() # 网络IO操作:关闭redis连接 await redis.wait_closed() print("结束", address) asyncio.run(execute('redis://0.0.0.0:6379', None))
-
示例2
import asyncio import aiomysql async def execute(host, password): print("开始", host) # 网络IO操作:连接MySQL,先去连接 192.168.0.101:3306,遇到IO则自动切换任务,去连接192.168.0.102:3306 conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql') # 网络IO操作:创建CURSOR,遇到IO会自动切换任务 cur = await conn.cursor() # 网络IO操作:执行SQL,遇到IO会自动切换任务 await cur.execute("SELECT Host,User FROM user") # 网络IO操作:获取SQL结果,遇到IO会自动切换任务 result = await cur.fetchall() print(result) # 网络IO操作:关闭链接,遇到IO会自动切换任务 await cur.close() conn.close() print("结束", host) task_list = [ execute('192.168.0.101', None), execute('192.168.0.102', None) ] asyncio.run(asyncio.wait(task_list))
3.3 FastAPI
-
FastAPI是一款用于构建API的高性能web框架,框架基于Python3.6+的
type hints
搭建。 -
接下里的异步示例以
FastAPI
和uvicorn
来讲解(uvicorn是一个支持异步的asgi)。 -
安装FastAPI web 框架
pip3 install fastapi
-
安装uvicorn,本质上为web提供socket server的支持的asgi(一般支持异步称asgi、不支持异步称wsgi)
pip3 install uvicorn
-
示例:
test_fast_api.py
import asyncio import uvicorn import aioredis from aioredis import Redis from fastapi import FastAPI app = FastAPI() REDIS_POOL = aioredis.ConnectionsPool('redis://0.0.0.0:6379', password="", minsize=1, maxsize=10) @app.get("/") def index(): """ 普通操作接口 """ return {"message": "Hello World"} @app.get("/red") async def red(): """ 异步操作接口 服务端只有一个线程,同一时刻只有一个请求被处理 当视图函数在处理第一个请求时,第二个请求此时是等待被处理的状态 当第一个请求遇到IO等待时,会自动切换去接收并处理第二个请求 一旦有请求IO执行完毕,则会再次回到指定请求向下继续执行其功能代码 """ print("请求来了") await asyncio.sleep(3) # 网络IO操作:连接池获取一个连接,遇到IO会自动切换任务 conn = await REDIS_POOL.acquire() redis = Redis(conn) # 网络IO操作:设置值,遇到IO会自动切换任务 await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 网络IO操作:读取值,遇到IO会自动切换任务 result = await redis.hgetall('car', encoding='utf-8') print(result) # 连接归还连接池 REDIS_POOL.release(conn) return result if __name__ == '__main__': uvicorn.run("test_fast_api:app", host="127.0.0.1", port=5000, log_level="info")
1.5 爬虫示例
-
在编写爬虫应用时,需要通过网络IO去请求目标数据,这种情况适合使用异步编程来提升性能,接下来我们使用支持异步编程的aiohttp模块来实现。
-
安装aiohttp模块
pip3 install aiohttp
-
示例1
import aiohttp import asyncio async def fetch(session, url): print("发送请求:", url) async with session.get(url, verify_ssl=False) as response: text = await response.text() print("得到结果:", url, len(text)) return text async def main(): async with aiohttp.ClientSession() as session: url_list = [ 'https://python.org', 'https://www.baidu.com', 'https://www.pythonav.com' ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] done, pending = await asyncio.wait(tasks) if __name__ == '__main__': asyncio.run(main())