python-lib-asyncio
这个包使用事件循环驱动的协程实现并发
asyncio
包使用的协程是比较严格的定义,适合 asyncio API
的协程在定义体中必须使用 yield from
,而不能使用 yield
;或者把协程传给 asyncio
包中的某个函数,例如 asyncio.async(...)
,从而驱动协程
asyncio.gather
和asyncio.wait
asyncio.gather
和asyncio.wait
都是同时运行aws
中的awaitable objects
,但是有一点点区别
asyncio.gather(*aws, loop=None, return_exceptions=False)
如果所有的任务都成功完成了,按照aws
中的顺序返回值的汇总列表
如果return_exceptions
是False
(默认),那么第一个异常会冒泡到gather()
,其他的任务不会被取消并且继续执行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40import asyncio
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
if n == 3:
raise Exception(f'err:{n}')
print(m)
print(f'te {n} end')
return 1
loop = asyncio.get_event_loop()
a = asyncio.gather(*[te(i) for i in range(5)])
try:
done = loop.run_until_complete(a)
except Exception as e:
print(e)
###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 4 end
testcoro
te 1 end
err:3
如果return_exceptions
是True
,那么异常会像成功的结果一样添加到结果列表中1
2
3
4
5a = asyncio.gather(*[te(i) for i in range(5)],return_exceptions=True)
done = loop.run_until_complete(a)
print(done)
###
[1, 1, 1, Exception('err:3',), 1]
如果gather()
被取消。所有没完成的任务也会被取消1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40import asyncio
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
if n == 3:
a.cancel()
print(m)
print(f'te {n} end')
return 1
loop = asyncio.get_event_loop()
ls = [te(i) for i in range(5)]
a = asyncio.gather(*ls)
try:
done = loop.run_until_complete(a)
except Exception as e:
print(e)
###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 3 end
asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同时运行aws
中的awaitable objects
并阻塞,直到return_when
指定的条件loop
参数将被废弃timeout
可以指定等待的最大秒数,这个不会引发asyncio.TimeoutError
,未完成的任务会在pending中返回return_when
必须是以下常量之一:
值 | 描述 |
---|---|
FIRST_COMPLETED | 任何一个完成或者取消时返回 |
FIRST_EXCEPTION | 任何一个因为引发异常完成时返回,如果都没有与ALL_COMPLETED相同 |
ALL_COMPLETED | 所有的结束或者取消时返回 |
返回两个列表,done和pending,需通过future.result
调用Task的result。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46import asyncio
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
print(m)
print(f'te {n} end')
return n
loop = asyncio.get_event_loop()
ls = [te(i) for i in range(5)]
a = asyncio.wait(ls)
done, pending = loop.run_until_complete(a)
for i in done:
print(i.result())
###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 3 end
testcoro
te 4 end
testcoro
te 1 end
0
3
4
2
1已经被弃用aws
直接传递协程对象
run_until_complete(future)
参数是一个futrue对象,如果传入了一个协程对象,会被隐式的转换成asyncio.Task
asyncio.ensure_future(obj, *, loop=None)
obj
是一个Future-like object
(isfuture()
)
会返回一个asyncio.Task
实例,Task 对象可以取消,即调用实例的cancel
方法;取消后会在协程当前暂停的 yield
处抛出 asyncio.CancelledError
异常。协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。Python3.7之后推荐用create_task()
方法
1 | import asyncio |
asyncio.run_in_executor(executor, func, *args)
第一个参数是 Executor
实例;如果设为 None
,使用事件循环的默认 ThreadPoolExecutor
实例。
阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。asyncio
的事件循环在背后维护着一个 ThreadPoolExecutor
对象,我们可以调用 run_in_executor
方法,把可调用的对象发给它执行
1 | import asyncio |