Skip to content

Commit 503345c

Browse files
committed
bpo-30773: Fix ag_running; prohibit running athrow/asend/aclose in parallel
1 parent 9f04f0d commit 503345c

File tree

5 files changed

+74
-64
lines changed

5 files changed

+74
-64
lines changed

Include/genobject.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ typedef struct {
7878
/* Flag is set to 1 when aclose() is called for the first time, or
7979
when a StopAsyncIteration exception is raised. */
8080
int ag_closed;
81+
82+
int ag_running_async;
8183
} PyAsyncGenObject;
8284

8385
PyAPI_DATA(PyTypeObject) PyAsyncGen_Type;

Lib/asyncio/base_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ async def shutdown_asyncgens(self):
490490
self._asyncgens.clear()
491491

492492
results = await tasks.gather(
493-
*[ag.aclose() for ag in closing_agens],
493+
*[ag.aclose() for ag in closing_agens if not ag.ag_running],
494494
return_exceptions=True,
495495
loop=self)
496496

Lib/test/test_asyncgen.py

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,26 @@ def sync_iterate(g):
111111
def async_iterate(g):
112112
res = []
113113
while True:
114+
an = g.__anext__()
114115
try:
115-
g.__anext__().__next__()
116+
while True:
117+
try:
118+
an.__next__()
119+
except StopIteration as ex:
120+
if ex.args:
121+
res.append(ex.args[0])
122+
break
123+
else:
124+
res.append('EMPTY StopIteration')
125+
break
126+
except StopAsyncIteration:
127+
raise
128+
except Exception as ex:
129+
res.append(str(type(ex)))
130+
break
116131
except StopAsyncIteration:
117132
res.append('STOP')
118133
break
119-
except StopIteration as ex:
120-
if ex.args:
121-
res.append(ex.args[0])
122-
else:
123-
res.append('EMPTY StopIteration')
124-
break
125-
except Exception as ex:
126-
res.append(str(type(ex)))
127134
return res
128135

129136
sync_gen_result = sync_iterate(sync_gen)
@@ -151,19 +158,22 @@ async def gen():
151158

152159
g = gen()
153160
ai = g.__aiter__()
154-
self.assertEqual(ai.__anext__().__next__(), ('result',))
161+
162+
an = ai.__anext__()
163+
self.assertEqual(an.__next__(), ('result',))
155164

156165
try:
157-
ai.__anext__().__next__()
166+
an.__next__()
158167
except StopIteration as ex:
159168
self.assertEqual(ex.args[0], 123)
160169
else:
161170
self.fail('StopIteration was not raised')
162171

163-
self.assertEqual(ai.__anext__().__next__(), ('result',))
172+
an = ai.__anext__()
173+
self.assertEqual(an.__next__(), ('result',))
164174

165175
try:
166-
ai.__anext__().__next__()
176+
an.__next__()
167177
except StopAsyncIteration as ex:
168178
self.assertFalse(ex.args)
169179
else:
@@ -187,10 +197,11 @@ async def gen():
187197

188198
g = gen()
189199
ai = g.__aiter__()
190-
self.assertEqual(ai.__anext__().__next__(), ('result',))
200+
an = ai.__anext__()
201+
self.assertEqual(an.__next__(), ('result',))
191202

192203
try:
193-
ai.__anext__().__next__()
204+
an.__next__()
194205
except StopIteration as ex:
195206
self.assertEqual(ex.args[0], 123)
196207
else:
@@ -590,17 +601,13 @@ async def run():
590601
gen = foo()
591602
it = gen.__aiter__()
592603
self.assertEqual(await it.__anext__(), 1)
593-
t = self.loop.create_task(it.__anext__())
594-
await asyncio.sleep(0.01, loop=self.loop)
595604
await gen.aclose()
596-
return t
597605

598-
t = self.loop.run_until_complete(run())
606+
self.loop.run_until_complete(run())
599607
self.assertEqual(DONE, 1)
600608

601609
# Silence ResourceWarnings
602610
fut.cancel()
603-
t.cancel()
604611
self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
605612

606613
def test_async_gen_asyncio_gc_aclose_09(self):
@@ -997,46 +1004,16 @@ async def wait():
9971004

9981005
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
9991006

1000-
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
1001-
self.assertEqual(finalized, 2)
1002-
10031007
# Silence warnings
10041008
t1.cancel()
10051009
t2.cancel()
1006-
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
1007-
1008-
def test_async_gen_asyncio_shutdown_02(self):
1009-
logged = 0
1010-
1011-
def logger(loop, context):
1012-
nonlocal logged
1013-
self.assertIn('asyncgen', context)
1014-
expected = 'an error occurred during closing of asynchronous'
1015-
if expected in context['message']:
1016-
logged += 1
1017-
1018-
async def waiter(timeout):
1019-
try:
1020-
await asyncio.sleep(timeout, loop=self.loop)
1021-
yield 1
1022-
finally:
1023-
1 / 0
1024-
1025-
async def wait():
1026-
async for _ in waiter(1):
1027-
pass
1028-
1029-
t = self.loop.create_task(wait())
1030-
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
1010+
with self.assertRaises(asyncio.CancelledError):
1011+
self.loop.run_until_complete(t1)
1012+
with self.assertRaises(asyncio.CancelledError):
1013+
self.loop.run_until_complete(t2)
10311014

1032-
self.loop.set_exception_handler(logger)
10331015
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
1034-
1035-
self.assertEqual(logged, 1)
1036-
1037-
# Silence warnings
1038-
t.cancel()
1039-
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
1016+
self.assertEqual(finalized, 2)
10401017

10411018
def test_async_gen_expression_01(self):
10421019
async def arange(n):
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Prohibit parallel running of aclose() / asend() / athrow(). Fix ag_running
2+
to reflect the actual running sttaus of the AG.

Objects/genobject.c

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,8 @@ static PyGetSetDef async_gen_getsetlist[] = {
13401340

13411341
static PyMemberDef async_gen_memberlist[] = {
13421342
{"ag_frame", T_OBJECT, offsetof(PyAsyncGenObject, ag_frame), READONLY},
1343-
{"ag_running", T_BOOL, offsetof(PyAsyncGenObject, ag_running), READONLY},
1343+
{"ag_running", T_BOOL, offsetof(PyAsyncGenObject, ag_running_async),
1344+
READONLY},
13441345
{"ag_code", T_OBJECT, offsetof(PyAsyncGenObject, ag_code), READONLY},
13451346
{NULL} /* Sentinel */
13461347
};
@@ -1435,6 +1436,7 @@ PyAsyncGen_New(PyFrameObject *f, PyObject *name, PyObject *qualname)
14351436
o->ag_finalizer = NULL;
14361437
o->ag_closed = 0;
14371438
o->ag_hooks_inited = 0;
1439+
o->ag_running_async = 0;
14381440
return (PyObject*)o;
14391441
}
14401442

@@ -1482,13 +1484,15 @@ async_gen_unwrap_value(PyAsyncGenObject *gen, PyObject *result)
14821484
gen->ag_closed = 1;
14831485
}
14841486

1487+
gen->ag_running_async = 0;
14851488
return NULL;
14861489
}
14871490

14881491
if (_PyAsyncGenWrappedValue_CheckExact(result)) {
14891492
/* async yield */
14901493
_PyGen_SetStopIterationValue(((_PyAsyncGenWrappedValue*)result)->agw_val);
14911494
Py_DECREF(result);
1495+
gen->ag_running_async = 0;
14921496
return NULL;
14931497
}
14941498

@@ -1533,12 +1537,20 @@ async_gen_asend_send(PyAsyncGenASend *o, PyObject *arg)
15331537
}
15341538

15351539
if (o->ags_state == AWAITABLE_STATE_INIT) {
1540+
if (o->ags_gen->ag_running_async) {
1541+
PyErr_SetString(
1542+
PyExc_RuntimeError,
1543+
"anext(): asynchronous generator is already running");
1544+
return NULL;
1545+
}
1546+
15361547
if (arg == NULL || arg == Py_None) {
15371548
arg = o->ags_sendval;
15381549
}
15391550
o->ags_state = AWAITABLE_STATE_ITER;
15401551
}
15411552

1553+
o->ags_gen->ag_running_async = 1;
15421554
result = gen_send_ex((PyGenObject*)o->ags_gen, arg, 0, 0);
15431555
result = async_gen_unwrap_value(o->ags_gen, result);
15441556

@@ -1802,8 +1814,23 @@ async_gen_athrow_send(PyAsyncGenAThrow *o, PyObject *arg)
18021814
}
18031815

18041816
if (o->agt_state == AWAITABLE_STATE_INIT) {
1817+
if (o->agt_gen->ag_running_async) {
1818+
if (o->agt_args == NULL) {
1819+
PyErr_SetString(
1820+
PyExc_RuntimeError,
1821+
"aclose(): asynchronous generator is already running");
1822+
}
1823+
else {
1824+
PyErr_SetString(
1825+
PyExc_RuntimeError,
1826+
"athrow(): asynchronous generator is already running");
1827+
}
1828+
return NULL;
1829+
}
1830+
18051831
if (o->agt_gen->ag_closed) {
1806-
PyErr_SetNone(PyExc_StopIteration);
1832+
o->agt_state == AWAITABLE_STATE_CLOSED;
1833+
PyErr_SetNone(PyExc_StopAsyncIteration);
18071834
return NULL;
18081835
}
18091836

@@ -1813,6 +1840,7 @@ async_gen_athrow_send(PyAsyncGenAThrow *o, PyObject *arg)
18131840
}
18141841

18151842
o->agt_state = AWAITABLE_STATE_ITER;
1843+
o->agt_gen->ag_running_async = 1;
18161844

18171845
if (o->agt_args == NULL) {
18181846
/* aclose() mode */
@@ -1858,6 +1886,7 @@ async_gen_athrow_send(PyAsyncGenAThrow *o, PyObject *arg)
18581886
/* aclose() mode */
18591887
if (retval) {
18601888
if (_PyAsyncGenWrappedValue_CheckExact(retval)) {
1889+
o->agt_gen->ag_running_async = 0;
18611890
Py_DECREF(retval);
18621891
goto yield_close;
18631892
}
@@ -1871,12 +1900,16 @@ async_gen_athrow_send(PyAsyncGenAThrow *o, PyObject *arg)
18711900
}
18721901

18731902
yield_close:
1903+
o->agt_gen->ag_running_async = 0;
18741904
PyErr_SetString(
18751905
PyExc_RuntimeError, ASYNC_GEN_IGNORED_EXIT_MSG);
18761906
return NULL;
18771907

18781908
check_error:
1879-
if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration)) {
1909+
o->agt_gen->ag_running_async = 0;
1910+
if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration) ||
1911+
PyErr_ExceptionMatches(PyExc_GeneratorExit))
1912+
{
18801913
o->agt_state = AWAITABLE_STATE_CLOSED;
18811914
if (o->agt_args == NULL) {
18821915
/* when aclose() is called we don't want to propagate
@@ -1886,11 +1919,6 @@ async_gen_athrow_send(PyAsyncGenAThrow *o, PyObject *arg)
18861919
PyErr_SetNone(PyExc_StopIteration);
18871920
}
18881921
}
1889-
else if (PyErr_ExceptionMatches(PyExc_GeneratorExit)) {
1890-
o->agt_state = AWAITABLE_STATE_CLOSED;
1891-
PyErr_Clear(); /* ignore these errors */
1892-
PyErr_SetNone(PyExc_StopIteration);
1893-
}
18941922
return NULL;
18951923
}
18961924

@@ -1916,6 +1944,7 @@ async_gen_athrow_throw(PyAsyncGenAThrow *o, PyObject *args)
19161944
} else {
19171945
/* aclose() mode */
19181946
if (retval && _PyAsyncGenWrappedValue_CheckExact(retval)) {
1947+
o->agt_gen->ag_running_async = 0;
19191948
Py_DECREF(retval);
19201949
PyErr_SetString(PyExc_RuntimeError, ASYNC_GEN_IGNORED_EXIT_MSG);
19211950
return NULL;

0 commit comments

Comments
 (0)