前言
为了以后写代码运行效率更高,遂补坑。
协程
协程不是计算机提供,程序员人为创造。
协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行。例如:
def func1():
print(1)
...
print2()
def func2():
print(3)
...
print(4)
func1()
func2()
实现协程有这么几种方法:
- greenbelt,早期模块
- yield关键字
- asyncio装饰器(python3.4)
- async,await关键字(python3.5)【推荐】
greenlet
pip3 install greenlet
from greenlet import greenlet
def func1():
print(1) # 第2步:输出1
gr2.switch() # 第3步,切换到func2函数
print(2) # 第6步,输出2
gr2.switch() # 第7步,切换到func2函数
def func2():
print(3) # 第4步,输出3
gr1.switch() # 第5步,切换到func1函数
print(4) # 第8步,输出4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步,去执行func1函数
yield关键字
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
asyncio
在python3.4及之后的版本
import asyncio
@asyncio.coroutines
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)
@asyncio.coroutines
def func2():
print(3)
yield from asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
注意:遇到IO阻塞自动切换,网络请求也是IO
async & await关键字
在python3.5及之后的版本
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)
async def func2():
print(3)
await asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
协程意义
在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时间再去完成一点其它的任务
案例:爬取三个网站(网络IO)
普通方式
import requests def get_page(url): response = requests.get(url) print(len(response.text)) if __name__ == "__main__": urls = [ "https://www.ghtwf01.cn/index.php/archives/350/", "https://www.ghtwf01.cn/index.php/archives/341/", "https://www.ghtwf01.cn/index.php/archives/295/" ] for url in urls: get_page(url)
- 协程方式(异步)
import aiohttp
import asyncio
async def get_page(session, url):
async with session.get(url) as response:
html = await response.text()
print(len(html))
async def main():
urls = [
"https://www.ghtwf01.cn/index.php/archives/350/",
"https://www.ghtwf01.cn/index.php/archives/341/",
"https://www.ghtwf01.cn/index.php/archives/295/"
]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(get_page(session, url)) for url in urls]
await asyncio.wait(tasks)
if __name__ == "__main__":
asyncio.run(main())
异步编程
事件循环
理解成一个死循环,去检查并执行某些代码。
# 伪代码
任务列表 = [任务1, 任务2, 任务3]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将"可执行"和"已完成"的任务返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除已完成的任务
如果任务列表中的任务都已完成,则终止循环
import asyncio
# 去生成获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)
如何执行协程函数
协程函数:定义函数时async def 函数名
协程对象:执行协程函数()
得到的协程对象
async def func():
print("1")
result = func()
注意:执行协程函数创建协程对象,函数内部代码不会执行。
如果想要运行协程函数内部代码,必须将协程对象交给事件循环来处理
import asyncio
async def func():
print("1")
result = func()
# loop = asymcio.get_event_loop()
# loop.run_until_complete( result )
asyncio.run( result ) #python3.7
await
await + 可等待的对象(协程对象、Future、 Task对象——>暂且通俗理解为IO等待)
示例1:
import asyncio
async def func():
print(1)
response = await asyncio.sleep(2)
print("结束", response)
asyncio.run( func() )
示例2:
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
response = await others()
print("IO请求结束,结果为:", response)
asyncio.run(func())
示例3:
import asyncio
import time
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
response1 = await others()
print("IO请求结束,结果为:", response1)
response2 = await others()
print("IO请求结束,结果为:", response2)
start_time = time.time()
asyncio.run(func())
end_time = time.time()
print("耗时:",end_time-start_time)
看上去像是串行,不过据说因为有await还是会切换到其它任务执行,应该是这个案例没有太多其他任务吧,我切换成串行没发现有时间优化,暂且记住可以优化,可添加到自己的程序里。
Task对象
白话:在事件循环中添加多个任务的。
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)
方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()
函数以外,还可以用低层级的loop.create_task()
或ensure_future()
函数。不建议手动实例化Task对象。
注意:asyncio.create_task()
函数在Python3.7中被加入。在Python3.7之前
可以改用低层级的asyncio.ensure_future()
函数。
示例1:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
async def main():
print("main开始")
# 创建Task对象,将当前执行func函数任务添加到事件循环
task1 = asyncio.create_task( func() )
task2 = asyncio.create_task( func() )
# 当前执行某协程遇到IO操作时,会自动化切换执行其他任务
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
优化后示例2:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
task_list = [
asyncio.create_task( func() ),
asyncio.create_task( func() )
]
print("main结束")
done,pending = await asyncio.wait(task_list)
print(done)
asyncio.run( main() )
如果要将tasklist写在创建事件循环前,示例3:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
task_list = [
func(),
func(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)
实战案例
异步redis
在使用python代码操作redis时,链接/操作/断开都是网络IO
pip3 install aioredis
示例1:
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address)
asyncio.run( execute('redis://x.x.x.x','root') )
就是每一步网络IO操作就加上await,但因为没其它操作,所以速度优化感觉不明显,所以有示例2:
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address)
task_list = [
execute('redis://x.x.x.x','root'),
execute('redis://x.x.x.x','root')
]
asyncio.run( asyncio.wait(task_list) )
异步mysql
pip3 install aiomysql
示例1:
import asyncio
import aiomysql
async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
result = await cur.fetchall()
# 网络IO操作:关闭连接
await cur.close()
conn.close()
asyncio.run(execute())
一样示例2:
import asyncio
import aiomysql
async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
result = await cur.fetchall()
# 网络IO操作:关闭连接
await cur.close()
conn.close()
task_list = [
execute(),
execute(),
]
asyncio.run(asyncio.wait(task_list))
异步爬虫
pip3 install aiohttp
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print("得到结果:", url, len(text))
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
"https://www.ghtwf01.cn/index.php/archives/350/",
"https://www.ghtwf01.cn/index.php/archives/341/",
"https://www.ghtwf01.cn/index.php/archives/295/"
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
done, pedding = await asyncio.wait(tasks)
if __name__ == "__main__":
asyncio.run( main() )
总结
最大的意义:通过一个线程利用其IO等待时间去做一些其它的事情。