使用 asyncio
Python 2 对协程的支持是通过生成器(Generator)实现的。利用 yield 实现生产者/消费者模型的例子如下(use_yield.py):
import random
def consumer():
r=None
while 1:
data=yield r
print 'Consuming:{}'.format(data)
r=data+1
def producer(consumer):
n=3
consumer.send(None)
while n:
data=random.choice(range(10))
print('Producing:{}'.format(data))
rs=consumer.send(data)
print 'Consumer return:{}'.format(rs)
n-=1
consumer.close()
c=consumer()
producer(c)
执行的结果如下:
> python chapter13/section5/use_yield.py Producing: 0 Consuming: 0 Consumer return: 1 Producing: 4 Consuming: 4 Consumer return: 5 Producing: 6 Consuming: 6 Consumer return: 7
c.send(None) 和 c.next() 的作用一样,都是让协程运行起来,通过之后不断 send 数据给消费者,消费者再通过“yield r”把执行结果返回给发布者这样的方式,实现一个生产/消费循环。
Python 2 内置的支持大抵如此。大量的项目(尤其是网络编程相关的项目)都是通过使用第三方库实现的协程来编写程序,如 Eventlet 和 Gevent。由于 Python 2 语言的局限,协程的实现比较原始,众多第三方库的实现并不统一,并且通常都需要使用类似 Monkey Patch 的技术才能实现非阻塞 I/O 等特性来真正提高性能。
使用 yield 语句只能将 CPU 控制权还给直接调用者,Python 3.3 中添加了“yield from”表达式,允许生成器把它的部分操作委任给另外一个生成器。但是仍有一些缺点:
- 协程与常规的生成器使用相同语法时容易混淆,尤其对于新的开发者而言。
- 一个函数是否是协程需要通过主体代码中是否使用了 yield 或者 yield from 语句进行检测,这一点容易在重构中忽略,而导致迷惑和错误。
- 对异步调用的支持被 yield 的语法限制了,不能使用更多的语法特性,比如 with 和 for。
Python 3.4 中 asyncio 被纳入了标准库,它提供了使用协程编写单线程并发代码,通过 I/O 多路复用访问套接字和其他资源,运行网络客户端和服务器等原语。而 Python 3.5 添加了 async 和 await 这两个关键字。自此,协程成为新的语法,而不再是一种生成器类型了。I/O 多路复用与协程的引入,可以极大提高高负载下程序的 I/O 性能。
async/await
async 用于声明一个协程:
async def foo():
pass
在普通的函数前加上 async 关键字后,这个函数就变成了一个协程。
await 表示等待另一个协程执行完返回,获取协程执行结果,它必须在协程内才能使用。
Python 3.5 之前协程是这样写的(old_coroutine.py):
import asyncio
@asyncio.coroutine
def slow_operation(n):
yield from asyncio.sleep(1)
print('Slow operation{}complete'.format(n))
@asyncio.coroutine
def main():
yield from asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(3),
])
loop=asyncio.get_event_loop()
loop.run_until_complete(main())
asyncio 事件循环受到了 Tornado 与 Twisted 等的影响,使用事件循环,Tornado、Twisted 以及 Gevent 可以与 asyncio 一起工作。asyncio 还为每个平台选择了最佳的 I/O 机制,比如 UNIX 和 Linux 平台上使用 selectors 库来做系统级别的 I/O 切换。
调用 get_event_loop 将返回默认的事件循环,用于负责所有协程的调度。在大量协程并发执行的过程中,除了在协程中主动使用 await,当本地协程发生 I/O 等待时,调用 asyncio.sleep,程序的控制权也会在不同的协程间切换,从而在 GIL 的限制下实现最大程度的并发执行,不会由于等待 I/O 等原因导致程序阻塞,达到较高的性能。
执行结果如下:
> time python3 chapter13/section5/old_coroutine.py
Slow operation 2 complete
Slow operation 3 complete
Slow operation 1 complete
python3 chapter13/section5/old_coroutine.py 0.10s user 0.05s system 12%cpu 1.201
total
接下来的例子都使用 Python 3.5.1 执行。
使用 async/await 关键词之后会是这样(new_coroutine.py):
import asyncio
async def slow_operation(n):
await asyncio.sleep(1)
print('Slow operation{}complete'.format(n))
async def main():
await asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(3),
])
loop=asyncio.get_event_loop()
loop.run_until_complete(main())
可以感受到,async 的使用简化了 asyncio.coroutine;await 的使用简化了 yield from。
除了“async def”,还有“async for”和“async with”关键字。
1.async for:异步迭代器语法。为了支持异步迭代,异步对象需要实现__aiter__方法,异步迭代器需要实现__anext__方法,停止迭代需要在__anext__方法内抛出 StopAsyncIteration 异常(async_for.py)。
import random
import asyncio
class AsyncIterable:
def __init__(self):
self.count=0
async def __aiter__(self):
return self
async def __anext__(self):
if self.count >= 5:
raise StopAsyncIteration
data=await self.fetch_data()
self.count+=1
return data
async def fetch_data(self):
return random.choice(range(10))
async def main():
async for data in AsyncIterable():
print(data)
loop=asyncio.get_event_loop()
loop.run_until_complete(main())
2.async with:异步上下文管理器语法。为了支持上下文管理器,需要实现__aenter__和__aexit__方法(async_with.py)。
async def log(msg):
print(msg)
class AsyncContextManager:
async def__aenter__(self):
await log('entering context')
async def__aexit__(self, exc_type, exc, tb):
await log('exiting context')
async def coro():
async with AsyncContextManager():
print('body')
除了使用事件循环,采用原来的 send(None) 方式也是可以的:
c=coro()
try:
c.send(None)
except StopIteration:
print('finished')
它的执行结果如下:
entering context body exiting context finished
Future
Future 是一种异步编程范式,它对异步过程调用的结果做了抽象,它并不关心具体的异步机制。无论是线程、网络,还是 I/O,甚至 RPC,只要是异步过程调用,都可以通过 Future 的概念统一处理。基于 Future 的接口可以简化代码编写,让各种异步操作以一种顺序的、更接近人类逻辑思维的方式编写异步代码。
asyncio.Future 几乎兼容 13.4 节介绍的 concurrent.futures.Future:
> cat chapter13/section5/async_future.py
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Done!')
loop=asyncio.get_event_loop()
future=asyncio.Future()
print('Future Done:{}'.format(future.done()))
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print('Future Done:{}'.format(future.done()))
print(future.result())
loop.close()
它的执行结果如下:
Future Done:False Future Done:True Done!
asyncio.Future 也支添加回调函数:
from functools import partial
def set_result(future, result):
print('Setting future result to{!r}'.format(result))
future.set_result(result)
def callback(who, future):
print('[{}]:returned result:{!r}'.format(who, future.result()))
event_loop=asyncio.get_event_loop()
future=asyncio.Future()
future.add_done_callback(partial(callback, 'CB1'))
event_loop.call_soon(set_result, future, 'Done!')
future.add_done_callback(partial(callback, 'CB2'))
event_loop.run_until_complete(future)
event_loop.close()
通过 call_soon 添加修改结果的回调函数,通过 add_done_callback 添加 Future 完成的回调函数,它的执行结果如下:
Setting future result to 'Done!' [CB1]: returned result: 'Done!' [CB2]: returned result: 'Done!'
可见添加的回调是一个先进先出(FIFO)的队列,保证了回调的顺序。这里强调一点,Future 的结果在设置之后就不能修改了:
... future.add_done_callback(partial(callback, 'CB1')) event_loop.call_soon(set_result, future, 'Done!') event_loop.call_soon(set_result, future, 'Done again!') event_loop.run_until_complete(future)
如果再将结果设置为“Done again!”,就会收到 InvalidStateError 错误。
使用 aiohttp
现在把 13.4 节的抓取微信文章页面的爬虫用 asyncio 来实现。首先创建一个 Python 3 的虚拟环境,再安装相关依赖:
> pyvenv-3.5 ~/venv3 > source ~/venv3/bin/activate > pip install beautifulsoup4 lxml aiohttp mongoengine fake_useragent cchardet
chardet 是一个常用的字符编码检测器,cchardet 是一个更快的实现,可以替代 chardet。
用 aiohttp 替代 requests 作为 HTTP 客户端,先感受下 aiohttp 的用法:
from asyncio import TimeoutError
import aiohttp
from aiohttp import ProxyConnectionError
async def fetch(retry=0):
proxy='http://{}'.format(Proxy.get_random()['address'])
headers={'user-agent':get_user_agent()}
conn=aiohttp.ProxyConnector(proxy=proxy)
url='http://httpbin.org/ip'
try:
with aiohttp.ClientSession(connector=conn) as session:
with aiohttp.Timeout(TIMEOUT):
async with session.get(url, headers=headers) as resp:
return await resp.json()
except (ProxyConnectionError, TimeoutError):
try:
p=Proxy.objects.get(address=proxy)
if p:
p.delete()
except DoesNotExist:
pass
retry += 1
if retry > 5:
raise TimeoutError()
await asyncio.sleep(1)
return await fetch(retry=retry)
requests 所支持的常用特性 aiohttp 也都是支持的,只是在用法上有比较大的调整。
使用事件循环调用 fetch 函数:
loop=asyncio.get_event_loop()
f=asyncio.wait([fetch()])
completed, pending=loop.run_until_complete(f)
for future in completed:
print(future.result())
其中使用 asyncio.wait 接受了一个任务列表,run_until_complete 返回的也会是一个任务结果的列表。
aiohttp 也可以用来实现 HTTP 服务,我们实现一个简单的 API 服务(aiohttp_server.py):
import json
import asyncio
import aiohttp
from aiohttp import web
REQEUST_URLS=[
'http://httpbin.org/ip',
'http://httpbin.org/user-agent',
'http://httpbin.org/headers'
]
async def handle(request):
coroutines=[aiohttp.request('get', url) for url in REQEUST_URLS]
results=await asyncio.gather(*coroutines, return_exceptions=True)
response_data={
url:not isinstance(result, Exception) and result.status==200
for url, result in zip(REQEUST_URLS, results)
}
body=json.dumps(response_data).encode('utf-8')
return web.Response(body=body, content_type="application/json")
loop=asyncio.get_event_loop()
app=web.Application(loop=loop)
app.router.add_route('GET', '/', handle)
server=loop.create_server(app.make_handler(), '0.0.0.0', 8080)
print('Server started at http://127.0.0.1:8080')
loop.run_until_complete(server)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
使用 asyncio.gather 替代 asyncio.wait 并设置 return_exceptions 为 True,会收集到全部内容,而且不会抛出异常,只是返回一个异常的对象。
使用队列
asyncio 同样支持多种原语,如锁(Lock)、事件(Event)、信号量(Semaphore)、条件变量(Condition),当然也支持队列。asyncio 中的队列的设计很像 Queue 模块,但是没有 timeout 参数,只能通过 asyncio.wait_for 在任务超时之后取消任务。队列包含如下三种。
- Queue:用生产者/消费者模型的队列,适合我们的微信文章抓取队列。
- PriorityQueue:Queue 的子类,带有优先级的队列。
- LifoQueue:Queue 的子类,最近添加优先的队列。
改写微信抓取,其实就是做如下四件事。
1.修改使用的标准库模块的引用方法。比如 url 解析相关的函数都放在 urllib.parse 下,需要修改为如下引用:
from urllib.parse import urlparse, urlsplit, parse_qs, urlencode
2.给希望改写成协程的函数添加 async 关键字。
3.在调用协程函数的地方添加 await 关键字。
4.使用事件循环。
看一下修改后的 save_article_result_with_queue 函数:
async def save_article_result_with_queue(queue):
while 1:
article=await queue.get()
if article is None:
queue.task_done()
break
await save_article_result(article, queue)
queue.task_done()
和之前的处理空队列的逻辑不同,我们没有捕获 Empty 异常然后 break,而是向队列添加一个 None,get 的时候发现消息为 None 就结束循环。这样做的原因是 queue.get 不接受 timeout 参数,这样可以简化实现。
把文章放入队列的操作,我们放在一个协程函数中来做:
async def producer(queue):
for article in Article.objects.all():
await queue.put(article)
for i in range(5): # 有几个协程就放为几个为 None 的消息
await queue.put(None)
await queue.join()
现在使用事件循环:
loop=asyncio.get_event_loop()
queue=asyncio.Queue()
consumers=[
loop.create_task(save_article_result_with_queue(queue,))
for i in range(5)
]
prod=loop.create_task(producer(queue))
loop.run_until_complete(
asyncio.wait(consumers+[prod])
)
create_task 创建的是 asyncio.tasks.Task 对象,它是 asyncio.Future 的子类,相当于 asynio 对 Future 的封装。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论