这个包使用事件循环驱动的协程实现并发

asyncio包使用的协程是比较严格的定义,适合 asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield;或者把协程传给 asyncio 包中的某个函数,例如 asyncio.async(...) ,从而驱动协程

asyncio.gatherasyncio.wait

asyncio.gatherasyncio.wait都是同时运行aws中的awaitable objects,但是有一点点区别

asyncio.gather(*aws, loop=None, return_exceptions=False)

如果所有的任务都成功完成了,按照aws中的顺序返回值的汇总列表
如果return_exceptionsFalse(默认),那么第一个异常会冒泡到gather(),其他的任务不会被取消并且继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio


@asyncio.coroutine
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'

@asyncio.coroutine
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
if n == 3:
raise Exception(f'err:{n}')
print(m)
print(f'te {n} end')
return 1


loop = asyncio.get_event_loop()
a = asyncio.gather(*[te(i) for i in range(5)])
try:
done = loop.run_until_complete(a)
except Exception as e:
print(e)
###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 4 end
testcoro
te 1 end
err:3

如果return_exceptionsTrue,那么异常会像成功的结果一样添加到结果列表中
1
2
3
4
5
a = asyncio.gather(*[te(i) for i in range(5)],return_exceptions=True)
done = loop.run_until_complete(a)
print(done)
###
[1, 1, 1, Exception('err:3',), 1]

如果gather()被取消。所有没完成的任务也会被取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio


@asyncio.coroutine
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'

@asyncio.coroutine
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
if n == 3:
a.cancel()
print(m)
print(f'te {n} end')
return 1


loop = asyncio.get_event_loop()
ls = [te(i) for i in range(5)]
a = asyncio.gather(*ls)
try:
done = loop.run_until_complete(a)
except Exception as e:
print(e)

###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 3 end

asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

同时运行aws中的awaitable objects阻塞,直到return_when指定的条件
loop参数将被废弃
timeout可以指定等待的最大秒数,这个不会引发asyncio.TimeoutError,未完成的任务会在pending中返回
return_when必须是以下常量之一:

描述
FIRST_COMPLETED 任何一个完成或者取消时返回
FIRST_EXCEPTION 任何一个因为引发异常完成时返回,如果都没有与ALL_COMPLETED相同
ALL_COMPLETED 所有的结束或者取消时返回

返回两个列表,done和pending,需通过future.result调用Task的result。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio


@asyncio.coroutine
def sleepcoro(n):
yield from asyncio.sleep(1) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'

@asyncio.coroutine
def te(n):
print(f'te {n} begin')
m = yield from sleepcoro(n)
print(m)
print(f'te {n} end')
return n


loop = asyncio.get_event_loop()
ls = [te(i) for i in range(5)]
a = asyncio.wait(ls)
done, pending = loop.run_until_complete(a)
for i in done:
print(i.result())

###
te 2 begin
te 0 begin
te 3 begin
te 4 begin
te 1 begin
testcoro
te 2 end
testcoro
te 0 end
testcoro
te 3 end
testcoro
te 4 end
testcoro
te 1 end
0
3
4
2
1


aws直接传递协程对象已经被弃用

run_until_complete(future)

参数是一个futrue对象,如果传入了一个协程对象,会被隐式的转换成asyncio.Task

asyncio.ensure_future(obj, *, loop=None)

obj是一个Future-like object(isfuture())
会返回一个asyncio.Task实例,Task 对象可以取消,即调用实例的cancel方法;取消后会在协程当前暂停的 yield 处抛出 asyncio.CancelledError 异常。协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。Python3.7之后推荐用create_task()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio


@asyncio.coroutine
def sleepcoro(n):
yield from asyncio.sleep(n) # 如果使用time.sleep()会阻塞整个应用
return 'testcoro'


@asyncio.coroutine
def te():
while True:
print('o',end='')
try:
m = yield from sleepcoro(.1)
except Exception as e:
print()
print(e.__class__)
break


@asyncio.coroutine
def main():
a = asyncio.ensure_future(te())
print('a:',a)
res = yield from sleepcoro(1)
a.cancel()
return res


if __name__ == '__main__':
loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
print(res)

###

a: <Task pending coro=<te() running at test.py:10>>
oooooooooo
<class 'concurrent.futures._base.CancelledError'>
testcoro

asyncio.run_in_executor(executor, func, *args)

第一个参数是 Executor 实例;如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例。

阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio

import time


async def sl():
with open('t.txt','w') as f:
s = 'a'*1000000000
f.write(s)
return 1,3,5


async def main():
result = await sl()
t3(*result)

async def main2():
result = await sl()
loop = asyncio.get_event_loop()
loop.run_in_executor(None,t3,*result)

def t3(a,b,c):
pass


if __name__ == '__main__':
st = time.time()
l = asyncio.get_event_loop()
l.run_until_complete(asyncio.gather(*[main(),main(),main()]))
print(time.time()-st)
st = time.time()
l = asyncio.get_event_loop()
l.run_until_complete(asyncio.gather(*[main2(),main2(),main2()]))
l.close()
print(time.time()-st)

###

7.47020411491394
6.83022403717041