From 1a87f321965b763e72a96d46e51d69929afd71d1 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Tue, 20 Apr 2021 22:26:10 +0200 Subject: [PATCH 1/9] Stackless issue #239: refactor SLP_EXCHANGE_EXCINFO to become a function Use a Py_LOCAL_INLINE function instead of a complicated macro. It is much simpler to debug. In the next commit, we will context switching to this function. --- Stackless/module/scheduling.c | 61 ++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index 83b041e333f5a1..913a0a71a44911 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -712,36 +712,54 @@ new_lock(void) * the exc_info-pointer in the thread state. */ +#if 1 +Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObject *task) +{ + PyThreadState *ts_ = (tstate); + PyTaskletObject *t_ = (task); + _PyErr_StackItem *exc_info; + assert(ts_); + assert(t_); + exc_info = ts_->exc_info; + assert(exc_info); + assert(t_->exc_info); #if 0 -#define SLP_EXCHANGE_EXCINFO(tstate, task) \ + PyObject *c; + c = PyStackless_GetCurrent(); + fprintf(stderr, "SLP_EXCHANGE_EXCINFO %3d current %14p,\tset task %p = %p,\ttstate %p = %p\n", __LINE__, c, t_, exc_info, ts_, t_->exc_info); + Py_XDECREF(c); +#endif + ts_->exc_info = t_->exc_info; + t_->exc_info = exc_info; +} +#else +#define SLP_EXCHANGE_EXCINFO(tstate_, task_) \ do { \ - PyThreadState *ts_ = (tstate); \ - PyTaskletObject *t_ = (task); \ + PyThreadState *ts_ = (tstate_); \ + PyTaskletObject *t_ = (task_); \ _PyErr_StackItem *exc_info; \ - PyObject * c = PyStackless_GetCurrent(); \ assert(ts_); \ assert(t_); \ exc_info = ts_->exc_info; \ assert(exc_info); \ assert(t_->exc_info); \ - fprintf(stderr, "SLP_EXCHANGE_EXCINFO %3d current %14p,\tset task %p = %p,\ttstate %p = %p\n", __LINE__, c, t_, exc_info, ts_, t_->exc_info); \ - Py_XDECREF(c); \ ts_->exc_info = t_->exc_info; \ t_->exc_info = exc_info; \ } while(0) +#endif + +#if 1 +Py_LOCAL_INLINE(void) SLP_UPDATE_TSTATE_ON_SWITCH(PyThreadState *tstate, PyTaskletObject *prev, PyTaskletObject *next) +{ + SLP_EXCHANGE_EXCINFO(tstate, prev); + SLP_EXCHANGE_EXCINFO(tstate, next); +} #else -#define SLP_EXCHANGE_EXCINFO(tstate, task) \ +#define SLP_UPDATE_TSTATE_ON_SWITCH(tstate__, prev_, next_) \ do { \ - PyThreadState *ts_ = (tstate); \ - PyTaskletObject *t_ = (task); \ - _PyErr_StackItem *exc_info; \ - assert(ts_); \ - assert(t_); \ - exc_info = ts_->exc_info; \ - assert(exc_info); \ - assert(t_->exc_info); \ - ts_->exc_info = t_->exc_info; \ - t_->exc_info = exc_info; \ + PyThreadState *ts__ = (tstate__); \ + SLP_EXCHANGE_EXCINFO(ts__, (prev_)); \ + SLP_EXCHANGE_EXCINFO(ts__, (next_)); \ } while(0) #endif @@ -1147,10 +1165,9 @@ slp_schedule_task_prepared(PyThreadState *ts, PyObject **result, PyTaskletObject retval = slp_bomb_explode(retval); } /* no failure possible from here on */ - SLP_EXCHANGE_EXCINFO(ts, prev); + SLP_UPDATE_TSTATE_ON_SWITCH(ts, prev, next); ts->recursion_depth = next->recursion_depth; ts->st.current = next; - SLP_EXCHANGE_EXCINFO(ts, next); if (did_switch) *did_switch = 1; *result = STACKLESS_PACK(ts, retval); @@ -1175,8 +1192,7 @@ slp_schedule_task_prepared(PyThreadState *ts, PyObject **result, PyTaskletObject else transfer = slp_transfer; - SLP_EXCHANGE_EXCINFO(ts, prev); - SLP_EXCHANGE_EXCINFO(ts, next); + SLP_UPDATE_TSTATE_ON_SWITCH(ts, prev, next); transfer_result = transfer(cstprev, next->cstate, prev); /* Note: If the transfer was successful from here on "prev" holds the @@ -1224,8 +1240,7 @@ slp_schedule_task_prepared(PyThreadState *ts, PyObject **result, PyTaskletObject } else { /* Failed transfer. */ - SLP_EXCHANGE_EXCINFO(ts, next); - SLP_EXCHANGE_EXCINFO(ts, prev); + SLP_UPDATE_TSTATE_ON_SWITCH(ts, next, prev); PyFrameObject *f = SLP_CLAIM_NEXT_FRAME(ts); Py_XSETREF(next->f.frame, f); /* revert the Py_CLEAR(next->f.frame) */ kill_wrap_bad_guy(prev, next); From f9400d874fe5d50ee353e467ac6fdb3ede4fbf6f Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Tue, 20 Apr 2021 22:51:32 +0200 Subject: [PATCH 2/9] Stackless issue #239: add support for PEP 567 context variables Add a private context attribute and appropriate methods to class tasklet. Document the changes in the manual. New methods: tasklet.set_context(context), tasklet.context_run(...) New readonly attribute: tasklet.context_id --- Doc/library/stackless/pickling.rst | 12 + Doc/library/stackless/tasklets.rst | 122 +++++++ Include/internal/stackless_impl.h | 5 + Include/slp_structs.h | 1 + Python/context.c | 27 +- Stackless/changelog.txt | 7 + Stackless/module/clinic/taskletobject.c.h | 49 +++ Stackless/module/scheduling.c | 15 +- Stackless/module/taskletobject.c | 228 +++++++++++- Stackless/unittests/test_miscell.py | 423 ++++++++++++++++++++++ 10 files changed, 879 insertions(+), 10 deletions(-) create mode 100644 Stackless/module/clinic/taskletobject.c.h diff --git a/Doc/library/stackless/pickling.rst b/Doc/library/stackless/pickling.rst index 9cab8e970f1a36..a746ce05d63e59 100644 --- a/Doc/library/stackless/pickling.rst +++ b/Doc/library/stackless/pickling.rst @@ -108,6 +108,18 @@ different address than *t1*, which was displayed earlier. objects and frame objects contain code objects. And code objects are usually incompatible between different minor versions of |CPY|. +.. note:: + + If you pickle a tasklet, its :class:`~contextvars.Context` won't be pickled, + because :class:`~contextvars.Context` objects can't be pickled. See + :pep:`567` for an explanation. + + It is however possible to create a subclass of :class:`tasklet` and + overload the methods :meth:`tasklet.__reduce_ex__` and :meth:`tasklet.__setstate__` to + pickle the values of particular :class:`~contextvars.ContextVar` objects together + with a tasklet. + + ====================== Pickling other objects ====================== diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index 1f702c2fd4d9bd..2f8661d5b3888a 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -110,6 +110,13 @@ The ``tasklet`` class :meth:`tasklet.setup`. The difference is that when providing them to :meth:`tasklet.bind`, the tasklet is not made runnable yet. + .. versionadded:: 3.7.6 + + If *func* is not :data:`None`, this method also sets the + :class:`~contextvars.Context` object of this tasklet to the + :class:`~contextvars.Context` object of the current tasklet. + Therefore it is usually not required to set the context explicitly. + *func* can be :data:`None` when providing arguments, in which case a previous call to :meth:`tasklet.bind` must have provided the function. @@ -344,6 +351,47 @@ The ``tasklet`` class # Implement unsafe logic here. t.set_ignore_nesting(old_value) +.. method:: tasklet.set_context(context) + + .. versionadded:: 3.7.6 + + Set the :class:`~contextvars.Context` object to be used while this tasklet runs. + + Every tasklet has a private context attribute. + When the tasklet runs, this context becomes the current context of the thread. + + :param context: the context to be set + :type context: :class:`contextvars.Context` + :return: the tasklet itself + :rtype: :class:`tasklet` + :raises RuntimeError: if the tasklet is bound to a foreign thread and is current or scheduled. + :raises RuntimeError: if called from within :meth:`contextvars.Context.run`. + + .. note:: + + The methods :meth:`__init__`, :meth:`bind` and :meth:`__setstate__` also set the context + of the tasklet they are called on to the context of the current tasklet. Therefore it is + usually not required to set the context explicitly. + +.. method:: tasklet.context_run(callable, \*args, \*\*kwargs) + + .. versionadded:: 3.7.6 + + Execute ``callable(*args, **kwargs)`` in the context object of the tasklet + the contest_run method is called on. Return the result of the + execution or propagate an exception if one occurred. + This method is roughly equivalent following pseudo code:: + + def context_run(self, callable, *args, **kwargs): + saved_context = stackless.current._internal_get_context() + stackless.current.set_context(self._internal_get_context()) + try: + return callable(*args, **kw) + finally: + stackless.current.set_context(saved_context) + + See also :meth:`contextvars.Context.run` for additional information. + .. method:: tasklet.__del__() .. versionadded:: 3.7 @@ -365,6 +413,12 @@ The ``tasklet`` class See :meth:`object.__setstate__`. + .. versionadded:: 3.7.6 + + If the tasklet becomes alive through this + call, the :meth:`~__setstate__` also sets the :class:`~contextvars.Context` object of the + tasklet to the :class:`~contextvars.Context` object of the current tasklet. + :param state: the state as given by ``__reduce_ex__(...)[2]`` :type state: :class:`tuple` :return: self @@ -423,6 +477,13 @@ The following attributes allow checking of user set situations: This attribute is ``True`` while this tasklet is within a :meth:`tasklet.set_ignore_nesting` block +.. attribute:: tasklet.context_id + + .. versionadded:: 3.7.6 + + This attribute is the :func:`id` of the :class:`~contextvars.Context` object to be used while this tasklet runs. + + The following attributes allow identification of tasklet place: .. attribute:: tasklet.is_current @@ -511,3 +572,64 @@ state transitions these functions are roughly equivalent to the following def schedule_remove(): stackless.current.next.switch() + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Tasklets and Context Variables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. versionadded:: 3.7.6 + +Version 3.7 of the |PPL| adds context variables, see module :mod:`contextvars`. +Usually they are used in connection with +:mod:`asyncio`, but they are a useful concept for |SLP| too. +Using context variables and multiple tasklets together didn't work well in |SLP| versions 3.7.0 to +3.7.5, because all tasklets of a given thread shared the same context. + +Starting with version 3.7.6 |SLP| adds explicit support for context variables. +Design requirements were + +1. Be fully compatible with |CPY| and its design decisions. +2. Be fully compatible with previous applications of |SLP|, which are unaware of context variables. +3. Automatically share a context between related tasklets. This way a tasklet, that needs to set + a context variable, can delegate this duty to a sub-tasklet without the need to manage the + context of the sub-tasklet manually. +4. Enable the integration of tasklet-based co-routines into the :mod:`asyncio` framework. + This is an obvious application which involves context variables and tasklets. + +Now each tasklet object has a private context attribute, which is either undefined (``NULL``) or a +:class:`~contextvars.Context` object. The design goals have some consequences: + +* The active :class:`~contextvars.Context` object of a thread (as defined by the |PPL|) + is the context of the :attr:`~stackless.current` tasklet. This implies that a tasklet switch, + switches the active context of the thread. + +* In accordance with the design decisions made in :pep:`576` the context of a tasklet can't be + accessed directly, but you can use the method :meth:`tasklet.context_run` to run arbitrary code + in this context. For instance ``tasklet.context_run(contextvars.copy_context())`` returns a copy + of the context. + The attribute :attr:`tasklet.context_id` can be used to test, if two tasklets share the context. + +* A tasklet, whose context is undefined must behave identically to a tasklet, whose context is an + empty :class:`~contextvars.Context` object. [#f1]_ Therefore the |PY| API provides no way to distinguish + both states. + +* Whenever the context of a tasklet is to be shared with another tasklet and the context is initially + undefined, it must be set to a newly created :class:`~contextvars.Context` object beforehand. + This affects the methods :meth:`~tasklet.context_run`, :meth:`~tasklet.__init__`, :meth:`~tasklet.bind` + and :meth:`~tasklet.__setstate__`. + +* If the state of a tasklet changes from *not alive* to *bound* or to *alive* (methods :meth:`~tasklet.__init__`, + :meth:`~tasklet.bind` or :meth:`~tasklet.__setstate__`), the context + of the tasklet is set to the currently active context. This way a newly initialized tasklet automatically + shares the context of its creator. + +* The :mod:`contextvars` implementation of |CPY| imposes several restrictions on |SLP|. Especially the sanity checks in + :c:func:`PyContext_Enter` and :c:func:`PyContext_Exit` make it impossible to replace the current context within + the execution of the method :meth:`contextvars.Context.run`. In that case |SLP| raises :exc:`RuntimeError`. + +.. rubric:: Footnotes + +.. [#f1] Setting a context variable to a non default value sets a previously undefined + context attribute to a newly created :class:`~contextvars.Context` object. This can happen anytime in a + library call. Therefore any difference between an undefined context and an empty context causes ill defined + behavior. diff --git a/Include/internal/stackless_impl.h b/Include/internal/stackless_impl.h index 437b3b58efb527..89ffd67cfb13f5 100644 --- a/Include/internal/stackless_impl.h +++ b/Include/internal/stackless_impl.h @@ -761,6 +761,11 @@ PyTaskletTStateStruc * slp_get_saved_tstate(PyTaskletObject *task); PyObject * slp_channel_seq_callback(struct _frame *f, int throwflag, PyObject *retval); PyObject * slp_get_channel_callback(void); +/* + * contextvars related prototypes + */ +PyObject* slp_context_run_callback(PyFrameObject *f, int exc, PyObject *result); + /* macro for use when interrupting tasklets from watchdog */ #define TASKLET_NESTING_OK(task) \ (ts->st.nesting_level == 0 || \ diff --git a/Include/slp_structs.h b/Include/slp_structs.h index 6d26204af3a9c0..13e686fb44c592 100644 --- a/Include/slp_structs.h +++ b/Include/slp_structs.h @@ -134,6 +134,7 @@ typedef struct _tasklet { int recursion_depth; PyObject *def_globals; PyObject *tsk_weakreflist; + PyObject *context; /* if running: the saved context, otherwise the context for the tasklet */ } PyTaskletObject; diff --git a/Python/context.c b/Python/context.c index 54986c322e3bf2..8eb874348bc725 100644 --- a/Python/context.c +++ b/Python/context.c @@ -604,23 +604,32 @@ _contextvars_Context_copy_impl(PyContext *self) #ifdef STACKLESS -static PyObject* context_run_callback(PyFrameObject *f, int exc, PyObject *result) +PyObject* slp_context_run_callback(PyFrameObject *f, int exc, PyObject *result) { PyCFrameObject *cf = (PyCFrameObject *)f; - assert(PyContext_CheckExact(cf->ob1)); PyObject *context = cf->ob1; cf->ob1 = NULL; - if (PyContext_Exit(context)) { - Py_CLEAR(result); + if (cf->i) { + /* called by tasklet.context_run(...) */ + PyThreadState *ts = PyThreadState_GET(); + assert(ts); + assert(NULL == context || PyContext_CheckExact(context)); + Py_XSETREF(ts->context, context); + ts->context_ver++; + } else { + assert(PyContext_CheckExact(context)); + if (PyContext_Exit(context)) { + Py_CLEAR(result); + } + Py_DECREF(context); } - Py_DECREF(context); SLP_STORE_NEXT_FRAME(PyThreadState_GET(), cf->f_back); return result; } -SLP_DEF_INVALID_EXEC(context_run_callback) +SLP_DEF_INVALID_EXEC(slp_context_run_callback) #endif @@ -629,6 +638,7 @@ context_run(PyContext *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { STACKLESS_GETARG(); + assert(NULL != self); if (nargs < 1) { PyErr_SetString(PyExc_TypeError, @@ -644,11 +654,12 @@ context_run(PyContext *self, PyObject *const *args, PyThreadState *ts = PyThreadState_GET(); PyCFrameObject *f = NULL; if (stackless) { - f = slp_cframe_new(context_run_callback, 1); + f = slp_cframe_new(slp_context_run_callback, 1); if (f == NULL) return NULL; Py_INCREF(self); f->ob1 = (PyObject *)self; + assert(f->i == 0); SLP_SET_CURRENT_FRAME(ts, (PyFrameObject *)f); /* f contains the only counted reference to current frame. This reference * keeps the fame alive during the following _PyObject_FastCallKeywords(). @@ -1326,7 +1337,7 @@ _PyContext_Init(void) #ifdef STACKLESS if (slp_register_execute(&PyCFrame_Type, "context_run_callback", - context_run_callback, SLP_REF_INVALID_EXEC(context_run_callback)) != 0) + slp_context_run_callback, SLP_REF_INVALID_EXEC(slp_context_run_callback)) != 0) { return 0; } diff --git a/Stackless/changelog.txt b/Stackless/changelog.txt index 4b69b2329978e4..b4907ff5364fc5 100644 --- a/Stackless/changelog.txt +++ b/Stackless/changelog.txt @@ -9,6 +9,13 @@ What's New in Stackless 3.X.X? *Release date: 20XX-XX-XX* +- https://github.com/stackless-dev/stackless/issues/239 + Add support for PEP 567 context variables to tasklets. + Each tasklet now has a contextvars.Context object, that becomes active during + the execution of the tasklet. + New tasklet methods set_context() and context_run(). + New read only attribute tasklet.context_id. + - https://github.com/stackless-dev/stackless/issues/241 Prevent a crash, if you call tasklet.__setstate__() on a tasklet, that is alive. diff --git a/Stackless/module/clinic/taskletobject.c.h b/Stackless/module/clinic/taskletobject.c.h new file mode 100644 index 00000000000000..b96b253fff3e79 --- /dev/null +++ b/Stackless/module/clinic/taskletobject.c.h @@ -0,0 +1,49 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +#if defined(STACKLESS) + +PyDoc_STRVAR(_stackless_tasklet_set_context__doc__, +"set_context($self, /, context)\n" +"--\n" +"\n" +"Set the context to be used while this tasklet runs.\n" +"\n" +"Every tasklet has a private context attribute. When the tasklet runs,\n" +"this context becomes the current context of the thread.\n" +"\n" +"This method raises RuntimeError, if the tasklet is bound to a foreign thread and is current or scheduled.\n" +"This method raises RuntimeError, if called from within Context.run().\n" +"This method returns the tasklet it is called on."); + +#define _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF \ + {"set_context", (PyCFunction)_stackless_tasklet_set_context, METH_FASTCALL|METH_KEYWORDS, _stackless_tasklet_set_context__doc__}, + +static PyObject * +_stackless_tasklet_set_context_impl(PyTaskletObject *self, PyObject *context); + +static PyObject * +_stackless_tasklet_set_context(PyTaskletObject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"context", NULL}; + static _PyArg_Parser _parser = {"O!:set_context", _keywords, 0}; + PyObject *context; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &PyContext_Type, &context)) { + goto exit; + } + return_value = _stackless_tasklet_set_context_impl(self, context); + +exit: + return return_value; +} + +#endif /* defined(STACKLESS) */ + +#ifndef _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF + #define _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF +#endif /* !defined(_STACKLESS_TASKLET_SET_CONTEXT_METHODDEF) */ +/*[clinic end generated code: output=f45e93a186a75d8b input=a9049054013a1b77]*/ diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index 913a0a71a44911..b40fa329f5f713 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -710,6 +710,10 @@ new_lock(void) * whereas Stackless stores the exception state in the tasklet object. * When switching from one tasklet to another tasklet, we have to switch * the exc_info-pointer in the thread state. + * + * The same situation is for the current contextvars.Context. When switching + * from one tasklet to another tasklet, we have to switch the context-pointer + * in the thread state. */ #if 1 @@ -718,19 +722,23 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec PyThreadState *ts_ = (tstate); PyTaskletObject *t_ = (task); _PyErr_StackItem *exc_info; + PyObject *c; assert(ts_); assert(t_); exc_info = ts_->exc_info; assert(exc_info); assert(t_->exc_info); #if 0 - PyObject *c; c = PyStackless_GetCurrent(); fprintf(stderr, "SLP_EXCHANGE_EXCINFO %3d current %14p,\tset task %p = %p,\ttstate %p = %p\n", __LINE__, c, t_, exc_info, ts_, t_->exc_info); Py_XDECREF(c); #endif ts_->exc_info = t_->exc_info; t_->exc_info = exc_info; + c = ts_->context; + ts_->context = t_->context; + t_->context = c; + ts_->context_ver++; } #else #define SLP_EXCHANGE_EXCINFO(tstate_, task_) \ @@ -738,6 +746,7 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec PyThreadState *ts_ = (tstate_); \ PyTaskletObject *t_ = (task_); \ _PyErr_StackItem *exc_info; \ + PyObject *c; \ assert(ts_); \ assert(t_); \ exc_info = ts_->exc_info; \ @@ -745,6 +754,10 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec assert(t_->exc_info); \ ts_->exc_info = t_->exc_info; \ t_->exc_info = exc_info; \ + c = ts_->context; \ + ts_->context = t_->context; \ + t_->context = c; \ + ts_->context_ver++; \ } while(0) #endif diff --git a/Stackless/module/taskletobject.c b/Stackless/module/taskletobject.c index 01e23f4f4def7e..92c105ce3205ac 100644 --- a/Stackless/module/taskletobject.c +++ b/Stackless/module/taskletobject.c @@ -9,6 +9,15 @@ #ifdef STACKLESS #include "internal/stackless_impl.h" +#include "internal/context.h" + +/*[clinic input] +module _stackless +class _stackless.tasklet "PyTaskletObject *" "&PyTasklet_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=81570dcf604e6e6d]*/ +#include "clinic/taskletobject.c.h" + /* * Convert C-bitfield @@ -162,6 +171,7 @@ tasklet_traverse(PyTaskletObject *t, visitproc visit, void *arg) Py_VISIT(t->exc_state.exc_type); Py_VISIT(t->exc_state.exc_value); Py_VISIT(t->exc_state.exc_traceback); + Py_VISIT(t->context); return 0; } @@ -193,6 +203,7 @@ tasklet_clear(PyTaskletObject *t) tasklet_clear_frames(t); Py_CLEAR(t->tempval); Py_CLEAR(t->def_globals); + Py_CLEAR(t->context); /* unlink task from cstate */ if (t->cstate != NULL && t->cstate->task == t) @@ -328,6 +339,66 @@ PyTasklet_New(PyTypeObject *type, PyObject *func) return (PyTaskletObject*)PyObject_CallFunction((PyObject*)type, NULL); } +Py_LOCAL_INLINE(PyObject *) +_get_tasklet_context(PyTaskletObject *self) +{ + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + assert(cts); + + /* Get the context for the tasklet *self. + * If the tasklet has no context, set a new empty one. + */ + if (ts && self == ts->st.current) { + /* the tasklet *self is current */ + ctx = ts->context; + if (NULL == ctx) { + if (ts == cts) { + /* *self belongs to the current thread. Call a C-API function, that + * initializes ts->context as a side effect */ + ctx = PyContext_CopyCurrent(); + if (NULL == ctx) + return NULL; + Py_DECREF(ctx); + ctx = ts->context; + assert(NULL != ctx); + } else { + slp_runtime_error("The tasklet has no context and you can't set one from a foreign thread."); + } + } + } else { + /* the tasklet *self is not current */ + ctx = self->context; + if (NULL == ctx) { + ctx = PyContext_New(); + if (NULL == ctx) + return NULL; + self->context = ctx; + } + } + Py_INCREF(ctx); + return ctx; +} + +Py_LOCAL_INLINE(int) +_tasklet_init_context(PyTaskletObject *task) +{ + PyThreadState *cts = PyThreadState_Get(); + assert(cts); + + PyObject *ctx = _get_tasklet_context(cts->st.current); + if (NULL == ctx) + return -1; + + PyObject *obj = _stackless_tasklet_set_context_impl(task, ctx); + Py_DECREF(ctx); + if (NULL == obj) + return -1; + Py_DECREF(obj); + return 0; +} + static int impl_tasklet_setup(PyTaskletObject *task, PyObject *args, PyObject *kwds, int insert); @@ -357,6 +428,15 @@ PyTasklet_BindEx(PyTaskletObject *task, PyObject *func, PyObject *args, PyObject RUNTIME_ERROR("can't unbind the main tasklet", -1); } + /* + * Set the context to the current context. It can be changed later on. + * But only for non-main tasklest, because tasklet.set_context must not + * be used for a main tasklet. + */ + if (func && !(ts && task == ts->st.main)) + if (_tasklet_init_context(task)) + return -1; + tasklet_clear_frames(task); task->recursion_depth = 0; assert(task->flags.autoschedule == 0); /* probably unused */ @@ -475,6 +555,7 @@ tasklet_new(PyTypeObject *type, PyObject *args, PyObject *kwds) Py_INCREF(Py_None); t->tempval = Py_None; t->tsk_weakreflist = NULL; + t->context = NULL; Py_INCREF(ts->st.initial_stub); t->cstate = ts->st.initial_stub; t->def_globals = PyEval_GetGlobals(); @@ -653,7 +734,7 @@ tasklet_setstate(PyObject *self, PyObject *args) nframes = PyList_GET_SIZE(lis); TASKLET_SETVAL(t, tempval); - /* There is a unpickling race condition. While it is rare, + /* There is an unpickling race condition. While it is rare, * sometimes tasklets get their setstate call after the * channel they are blocked on. If this happens and we * do not account for it, they will be left in a broken @@ -698,6 +779,8 @@ tasklet_setstate(PyObject *self, PyObject *args) back = f; } t->f.frame = f; + if(_tasklet_init_context(t)) + return NULL; } /* walk frames again and calculate recursion_depth */ for (f = t->f.frame; f != NULL; f = f->f_back) { @@ -2197,6 +2280,143 @@ tasklet_set_profile_function(PyTaskletObject *task, PyObject *value) RUNTIME_ERROR("tasklet is not alive", -1); } +/*[clinic input] +_stackless.tasklet.set_context + + context: object(subclass_of='&PyContext_Type') + +Set the context to be used while this tasklet runs. + +Every tasklet has a private context attribute. When the tasklet runs, +this context becomes the current context of the thread. + +This method raises RuntimeError, if the tasklet is bound to a foreign thread and is current or scheduled. +This method raises RuntimeError, if called from within Context.run(). +This method returns the tasklet it is called on. +[clinic start generated code]*/ + +static PyObject * +_stackless_tasklet_set_context_impl(PyTaskletObject *self, PyObject *context) +/*[clinic end generated code: output=23061bb958da0ff9 input=3c29aedc0d51481c]*/ +{ + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + + assert(context); + assert(PyContext_CheckExact(context)); + + if (ts && self == ts->st.current) { + /* the tasklet is the current tasklet. */ + + /* I'm not sure, if setting the context for a current tasklet is really relevant, + * but it can be implemented. Therefore I'm going to implement it. */ + if (ts != cts) + goto fail_other_thread; + + /* Its context is in ts->context */ + ctx = ts->context; + if (ctx && ((PyContext *) ctx)->ctx_entered) + goto fail_ctx_entered; + Py_INCREF(context); + Py_XSETREF(ts->context, context); + ts->context_ver++; + } else { + /* the tasklet is not the current tasklet. Its context is in self->context */ + if (ts != cts && PyTasklet_Scheduled(self) && !self->flags.blocked) + goto fail_other_thread; + ctx = self->context; + if (ctx && ((PyContext *) ctx)->ctx_entered) + goto fail_ctx_entered; + Py_INCREF(context); + Py_XSETREF(self->context, context); + } + Py_INCREF(self); + return (PyObject *) self; +fail_ctx_entered: + return slp_runtime_error("the current context of the tasklet has been entered."); +fail_other_thread: + return slp_runtime_error("tasklet belongs to a different thread"); +} + +/* AFAIK argument clinic currently does not support the signature of context_run(callable, *args, **kwargs). */ +PyDoc_STRVAR(tasklet_context_run__doc__,"context_run(callable, *args, **kwargs)\n\ +\n\ +Execute callable(*args, **kwargs) code in the context object of the tasklet the contest_run method is called on.\n\ +Return the result of the execution or propagate an exception if one occurred."); + +static PyObject * +tasklet_context_run(PyTaskletObject *self, PyObject *const *args, + Py_ssize_t nargs, PyObject *kwnames) +{ + STACKLESS_GETARG(); + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + assert(cts); + + if (nargs < 1) { + PyErr_SetString(PyExc_TypeError, + "run() missing 1 required positional argument"); + return NULL; + } + + ctx = _get_tasklet_context(self); /* returns an new reference */ + + PyObject * saved_context = cts->context; + cts->context = ctx; + cts->context_ver++; + ctx = NULL; + + PyCFrameObject *f = NULL; + if (stackless) { + f = slp_cframe_new(slp_context_run_callback, 1); + if (f == NULL) { + Py_XSETREF(cts->context, saved_context); + return NULL; + } + f->i = 1; + Py_XINCREF(saved_context); + f->ob1 = saved_context; + SLP_SET_CURRENT_FRAME(ts, (PyFrameObject *)f); + /* f contains the only counted reference to current frame. This reference + * keeps the fame alive during the following _PyObject_FastCallKeywords(). + */ + } + STACKLESS_PROMOTE_ALL(); + PyObject *call_result = _PyObject_FastCallKeywords( + args[0], args + 1, nargs - 1, kwnames); + STACKLESS_ASSERT(); + + if (stackless && !STACKLESS_UNWINDING(call_result)) { + /* required, because we added a C-frame */ + assert(f); + assert((PyFrameObject *)f == SLP_CURRENT_FRAME(ts)); + SLP_STORE_NEXT_FRAME(ts, (PyFrameObject *)f); + Py_DECREF(f); + Py_XDECREF(saved_context); + return STACKLESS_PACK(ts, call_result); + } + Py_XDECREF(f); + if (STACKLESS_UNWINDING(call_result)) { + Py_XDECREF(saved_context); + return call_result; + } + Py_XSETREF(cts->context, saved_context); + cts->context_ver++; + return call_result; +} + +static PyObject * +tasklet_context_id(PyTaskletObject *self) +{ + PyObject *ctx = _get_tasklet_context(self); + PyObject *result = PyLong_FromVoidPtr(ctx); + Py_DECREF(ctx); + return result; +} + + static PyMemberDef tasklet_members[] = { {"cstate", T_OBJECT, offsetof(PyTaskletObject, cstate), READONLY, PyDoc_STR("the C stack object associated with the tasklet.\n\ @@ -2296,6 +2516,9 @@ static PyGetSetDef tasklet_getsetlist[] = { "For the current tasklet this property is equivalent to sys.gettrace()\n" "and sys.settrace().")}, + {"context_id", (getter)tasklet_context_id, NULL, + PyDoc_STR("The id of the context object of this tasklet.")}, + {0}, }; @@ -2335,6 +2558,9 @@ static PyMethodDef tasklet_methods[] = { tasklet_setstate__doc__}, {"bind_thread", (PCF)tasklet_bind_thread, METH_VARARGS, tasklet_bind_thread__doc__}, + {"context_run", (PCF)tasklet_context_run, METH_FASTCALL | METH_KEYWORDS | METH_STACKLESS, + tasklet_context_run__doc__}, + _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF {NULL, NULL} /* sentinel */ }; diff --git a/Stackless/unittests/test_miscell.py b/Stackless/unittests/test_miscell.py index d697ef7cd7ad47..dcf1d8e5f84791 100644 --- a/Stackless/unittests/test_miscell.py +++ b/Stackless/unittests/test_miscell.py @@ -12,6 +12,7 @@ import os import struct import gc +import contextvars from stackless import _test_nostacklesscall as apply_not_stackless import _teststackless @@ -1330,6 +1331,428 @@ def task(): t.kill() +class TestTaskletContext(AsTaskletTestCase): + cvar = contextvars.ContextVar('TestTaskletContext', default='unset') + + def test_contexct_id(self): + ct = stackless.current + + # prepare a context + sentinel = object() + self.cvar.set(sentinel) # make sure, a context is active + self.assertIsInstance(ct.context_id, int) + + # no context + t = stackless.tasklet() # new tasklet without context + self.assertEqual(t.context_run(self.cvar.get), "unset") + + # known context + ctx = contextvars.Context() + self.assertNotEqual(id(ctx), ct.context_id) + self.assertIs(ctx.run(stackless.getcurrent), ct) + cid = ctx.run(getattr, ct, "context_id") + self.assertEqual(id(ctx), cid) + + def test_set_context_same_thread_not_current(self): + # same thread, tasklet is not current + + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task)() + + # set the context + t.set_context(ctx) + + # validate the context + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_same_thread_not_current_entered(self): + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + tasklet_started = False + + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + stackless.schedule_remove() + self.assertEqual(t.context_id, id(ctx)) + + t.bind(ctx.run)(task) + stackless.run() + self.assertTrue(tasklet_started) + self.assertTrue(t.alive) + self.assertIsNot(t, stackless.current) + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", + t.set_context, contextvars.Context()) + t.insert() + stackless.run() + + def test_set_context_same_thread_current(self): + # same thread, current tasklet + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + self.assertIsInstance(stackless.current.context_id, int) + + # prepare another context + sentinel2 = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel2) + self.assertNotEqual(stackless.current.context_id, id(ctx)) + + # change the context of the current tasklet + stackless.current.set_context(ctx) + + # check that the new context is the context of the current tasklet + self.assertEqual(stackless.current.context_id, id(ctx)) + self.assertIs(self.cvar.get(), sentinel2) + + def test_set_context_same_thread_current_entered(self): + # entered current context on the same thread + sentinel = object() + self.cvar.set(sentinel) + cid = stackless.current.context_id + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", + contextvars.Context().run, stackless.current.set_context, contextvars.Context()) + self.assertEqual(stackless.current.context_id, cid) + self.assertIs(self.cvar.get(), sentinel) + + def test_set_context_other_thread_paused(self): + # other thread, tasklet is not current + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task, ()) # paused + + # set the context + thr = threading.Thread(target=t.set_context, args=(ctx,), name="other thread") + thr.start() + thr.join() + + # validate the context + t.insert() + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_other_thread_scheduled(self): + # other thread, tasklet is not current + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task)() # scheduled + t.set_context(ctx) + + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + # validate the result + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_other_thread_not_current_entered(self): + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + tasklet_started = False + + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + stackless.schedule_remove() + self.assertEqual(t.context_id, id(ctx)) + + t.bind(ctx.run)(task) + stackless.run() + self.assertTrue(tasklet_started) + self.assertTrue(t.alive) + self.assertIsNot(t, stackless.current) + + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + # validate the result + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered"): + raise got_exception + t.insert() + stackless.run() + + def test_set_context_other_thread_current(self): + # other thread, current tasklet + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid = stackless.current.context_id + + t = stackless.current + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + self.assertEqual(stackless.current.context_id, cid) + + def test_set_context_other_thread_current_entered(self): + # entered current context on other thread + + t = stackless.current + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + + thr = threading.Thread(target=other_thread, name="other thread") + + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + + def in_ctx(): + self.assertEqual(stackless.current.context_id, id(ctx)) + thr.start() + thr.join() + self.assertEqual(stackless.current.context_id, id(ctx)) + + ctx.run(in_ctx) + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + + + def test_context_init_null_main(self): + # test the set_context in tasklet.bind, if the current context is NULL in main tasklet + + cid = None + t = stackless.tasklet() + + def other_thread(): + nonlocal cid + self.assertIs(stackless.current, stackless.main) + + # this creates a context, because all tasklets initialized by the main-tasklet + # shall share a common context + t.bind(lambda: None) + cid = stackless.current.context_id + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + self.assertIsInstance(cid, int) + self.assertEqual(t.context_id, cid) + + def test_context_init_main(self): + # test the set_context in tasklet.bind, in main tasklet + + sentinel = object() + cid = None + t = stackless.tasklet() + + def other_thread(): + nonlocal cid + self.cvar.set(sentinel) + self.assertIs(stackless.current, stackless.main) + cid = stackless.current.context_id + self.assertIsInstance(cid, int) + t.bind(lambda: None) + self.assertEqual(stackless.current.context_id, cid) + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + self.assertEqual(t.context_id, cid) + + def test_context_init_nonmain(self): + # test the set_context in tasklet.bind + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.bind(lambda: None) + + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_id, cid) + + @staticmethod + def _test_context_setstate_alive_task(): + stackless.schedule_remove(100) + return 200 + + def test_context_setstate_alive(self): + # prepare a state of a half executed tasklet + t = stackless.tasklet(self._test_context_setstate_alive_task)() + stackless.run() + self.assertEqual(t.tempval, 100) + self.assertTrue(t.paused) + + state = t.__reduce__()[2] + for i, fw in enumerate(state[3]): + frame_factory, frame_args, frame_state = fw.__reduce__() + state[3][i] = frame_factory(*frame_args) + state[3][i].__setstate__(frame_state) + # from pprint import pprint ; pprint(state) + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.__setstate__(state) + + self.assertTrue(t.alive) + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_id, cid) + t.bind(None) + + def test_context_setstate_notalive(self): + # prepare a state of a new tasklet + state = stackless.tasklet().__reduce__()[2] + self.assertEqual(state[3], []) # no frames + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.__setstate__(state) + + self.assertFalse(t.alive) + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_run(self.cvar.get), "unset") + t.bind(None) + + def test_context_run_no_context(self): + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid0 = stackless.current.context_id + + t = stackless.tasklet() + def get_cid(): + self.assertEqual(self.cvar.get(), "unset") + return stackless.current.context_id + + cid = t.context_run(get_cid) + + self.assertEqual(stackless.current.context_id, cid0) + self.assertIsInstance(cid, int) + self.assertNotEqual(cid0, cid) + self.assertEqual(t.context_id, cid) + + def test_context_run(self): + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid0 = stackless.current.context_id + + t = stackless.tasklet() + ctx = contextvars.Context() + t.set_context(ctx) + + def get_cid(): + self.assertEqual(self.cvar.get(), "unset") + return stackless.current.context_id + + cid = t.context_run(get_cid) + + self.assertEqual(stackless.current.context_id, cid0) + self.assertIsInstance(cid, int) + self.assertNotEqual(cid0, cid) + self.assertEqual(id(ctx), cid) + + #/////////////////////////////////////////////////////////////////////////////// if __name__ == '__main__': From 4148ec3c47ab539c041c33b721d4adafac27c5c3 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Wed, 5 May 2021 14:17:33 +0200 Subject: [PATCH 3/9] Stackless issue #239: add support for PEP 567 context variables Add/improve pickling of the context of tasklets. New pickle flag "PICKLEFLAGS_PICKLE_CONTEXT", new undocumented function stackless._tasklet_get_unpicklable_state() --- Doc/library/stackless/pickling.rst | 13 +- Doc/library/stackless/stackless.rst | 15 ++ Doc/library/stackless/tasklets.rst | 20 ++- Include/internal/slp_prickelpit.h | 3 +- Lib/stackless.py | 32 +++++ Stackless/changelog.txt | 7 +- Stackless/core/cframeobject.c | 26 +++- Stackless/module/taskletobject.c | 32 ++++- Stackless/unittests/support.py | 65 +++++++-- Stackless/unittests/test_pickle.py | 204 ++++++++++++++++++++++++++++ 10 files changed, 391 insertions(+), 26 deletions(-) diff --git a/Doc/library/stackless/pickling.rst b/Doc/library/stackless/pickling.rst index a746ce05d63e59..4216493a5e9f4c 100644 --- a/Doc/library/stackless/pickling.rst +++ b/Doc/library/stackless/pickling.rst @@ -114,10 +114,17 @@ different address than *t1*, which was displayed earlier. because :class:`~contextvars.Context` objects can't be pickled. See :pep:`567` for an explanation. - It is however possible to create a subclass of :class:`tasklet` and - overload the methods :meth:`tasklet.__reduce_ex__` and :meth:`tasklet.__setstate__` to + It is sometimes possible enable pickling of :class:`~contextvars.Context` objects + in an application specific way (see for instance: :func:`copyreg.pickle` or + :attr:`pickle.Pickler.dispatch_table` or :attr:`pickle.Pickler.persistent_id`). + Such an application can set the pickle flag + :const:`~stackless.PICKLEFLAGS_PICKLE_CONTEXT` to include the + context in the pickled state of a tasklet. + + Another option is to subclass :class:`tasklet` and overload the methods + :meth:`tasklet.__reduce_ex__` and :meth:`tasklet.__setstate__` to pickle the values of particular :class:`~contextvars.ContextVar` objects together - with a tasklet. + with the tasklet. ====================== diff --git a/Doc/library/stackless/stackless.rst b/Doc/library/stackless/stackless.rst index 5ed2a97160c029..9ef12b1f343574 100644 --- a/Doc/library/stackless/stackless.rst +++ b/Doc/library/stackless/stackless.rst @@ -51,6 +51,21 @@ Constants These constants have been added on a provisional basis (see :pep:`411` for details.) +.. data:: PICKLEFLAGS_PICKLE_CONTEXT + + This constant defines an option flag for the function + :func:`pickle_flags`. + + If this flag is set, |SLP| assumes that a :class:`~contextvars.Context` object + is pickleable. As a consequence the state information returned by :meth:`tasklet.__reduce_ex__` + includes the context of the tasklet. + + .. versionadded:: 3.7.6 + + .. note:: + This constant has been added on a provisional basis (see :pep:`411` + for details.) + --------- Functions --------- diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index 2f8661d5b3888a..85b783c627487a 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -373,6 +373,10 @@ The ``tasklet`` class of the tasklet they are called on to the context of the current tasklet. Therefore it is usually not required to set the context explicitly. + .. note:: + This method has been added on a provisional basis (see :pep:`411` + for details.) + .. method:: tasklet.context_run(callable, \*args, \*\*kwargs) .. versionadded:: 3.7.6 @@ -392,6 +396,10 @@ The ``tasklet`` class See also :meth:`contextvars.Context.run` for additional information. + .. note:: + This method has been added on a provisional basis (see :pep:`411` + for details.) + .. method:: tasklet.__del__() .. versionadded:: 3.7 @@ -415,8 +423,9 @@ The ``tasklet`` class .. versionadded:: 3.7.6 - If the tasklet becomes alive through this - call, the :meth:`~__setstate__` also sets the :class:`~contextvars.Context` object of the + If the tasklet becomes alive through this call and if *state* does not contain + a :class:`~contextvars.Context` object, then :meth:`~__setstate__` also sets + the :class:`~contextvars.Context` object of the tasklet to the :class:`~contextvars.Context` object of the current tasklet. :param state: the state as given by ``__reduce_ex__(...)[2]`` @@ -483,6 +492,10 @@ The following attributes allow checking of user set situations: This attribute is the :func:`id` of the :class:`~contextvars.Context` object to be used while this tasklet runs. + .. note:: + This attribute has been added on a provisional basis (see :pep:`411` + for details.) + The following attributes allow identification of tasklet place: @@ -627,6 +640,9 @@ Now each tasklet object has a private context attribute, which is either undefin :c:func:`PyContext_Enter` and :c:func:`PyContext_Exit` make it impossible to replace the current context within the execution of the method :meth:`contextvars.Context.run`. In that case |SLP| raises :exc:`RuntimeError`. +.. note:: + Context support has been added on a provisional basis (see :pep:`411` for details.) + .. rubric:: Footnotes .. [#f1] Setting a context variable to a non default value sets a previously undefined diff --git a/Include/internal/slp_prickelpit.h b/Include/internal/slp_prickelpit.h index f98665b1c5f505..3cef7e64b9591b 100644 --- a/Include/internal/slp_prickelpit.h +++ b/Include/internal/slp_prickelpit.h @@ -49,7 +49,8 @@ Py_ssize_t slp_from_tuple_with_nulls(PyObject **start, PyObject *tup); #define SLP_PICKLEFLAGS_PRESERVE_TRACING_STATE (1U) #define SLP_PICKLEFLAGS_PRESERVE_AG_FINALIZER (1U<<1) #define SLP_PICKLEFLAGS_RESET_AG_FINALIZER (1U<<2) -#define SLP_PICKLEFLAGS__MAX_VALUE ((1<<3)-1) /* must be a signed value */ +#define SLP_PICKLEFLAGS_PICKLE_CONTEXT (1U<<3) +#define SLP_PICKLEFLAGS__MAX_VALUE ((1<<4)-1) /* must be a signed value */ /* helper functions for module dicts */ diff --git a/Lib/stackless.py b/Lib/stackless.py index f8fe2f3468597b..206125103b7393 100644 --- a/Lib/stackless.py +++ b/Lib/stackless.py @@ -24,6 +24,7 @@ def __reduce_ex__(*args): PICKLEFLAGS_PRESERVE_TRACING_STATE = 1 PICKLEFLAGS_PRESERVE_AG_FINALIZER = 2 PICKLEFLAGS_RESET_AG_FINALIZER = 4 +PICKLEFLAGS_PICKLE_CONTEXT = 8 # Backwards support for unpickling older pickles, even from 2.7 from _stackless import _wrap @@ -72,6 +73,37 @@ def __iter__(self): # expressions like "stackless.current" as well defined. current = runcount = main = debug = uncollectables = threads = pickle_with_tracing_state = None +def _tasklet_get_unpicklable_state(tasklet): + """Get a dict with additional state, that can't be pickled + + The method tasklet.__reduce_ex__() returns the picklable state and this + function returns a tuple containing the rest. + + The items in the return value are: + 'context': the context of the tasklet + + Additional items may be added later. + + Note: this function has been added on a provisional basis (see :pep:`411` for details.) + """ + if not isinstance(tasklet, _stackless.tasklet): + raise TypeError("Argument must be a tasklet") + + with atomic(): + if tasklet.is_current or tasklet.thread_id != _stackless.current.thread_id: + # - A current tasklet can't be reduced. + # - We can't set pickle_flags for a foreign thread + # To mitigate these problems, we copy the context to a new tasklet + # (implicit copy by tasklet.__init__(callable, ...)) and reduce the new + # context instead + tasklet = tasklet.context_run(_stackless.tasklet, id) # "id" is just an arbitrary callable + + flags = pickle_flags(PICKLEFLAGS_PICKLE_CONTEXT, PICKLEFLAGS_PICKLE_CONTEXT) + try: + return {'context': tasklet.__reduce__()[2][8]} + finally: + pickle_flags(flags, PICKLEFLAGS_PICKLE_CONTEXT) + def transmogrify(): """ this function creates a subclass of the ModuleType with properties. diff --git a/Stackless/changelog.txt b/Stackless/changelog.txt index e49c658b718964..b2bf92a565c6d3 100644 --- a/Stackless/changelog.txt +++ b/Stackless/changelog.txt @@ -13,8 +13,11 @@ What's New in Stackless 3.X.X? Add support for PEP 567 context variables to tasklets. Each tasklet now has a contextvars.Context object, that becomes active during the execution of the tasklet. - New tasklet methods set_context() and context_run(). - New read only attribute tasklet.context_id. + New tasklet methods "set_context()" and "context_run()". + New read only attribute "tasklet.context_id". + New constant "stackless.PICKLEFLAGS_PICKLE_CONTEXT". + And an intentionally undocumented function + "stackless._tasklet_get_unpicklable_state()" - https://github.com/stackless-dev/stackless/issues/245 Prevent a crash, if you call tasklet.__init__() or tasklet.bind() with wrong diff --git a/Stackless/core/cframeobject.c b/Stackless/core/cframeobject.c index 913dd1b472af21..3591f176a27b11 100644 --- a/Stackless/core/cframeobject.c +++ b/Stackless/core/cframeobject.c @@ -30,6 +30,7 @@ #ifdef STACKLESS #include "internal/stackless_impl.h" #include "internal/slp_prickelpit.h" +#include "internal/context.h" static PyCFrameObject *free_list = NULL; static int numfree = 0; /* number of cframes currently in free_list */ @@ -136,6 +137,8 @@ cframe_reduce(PyCFrameObject *cf) PyObject *res = NULL, *exec_name = NULL; PyObject *params = NULL; int valid = 1; + PyObject *obs[3]; + long i, n; if (cf->f_execute == execute_soft_switchable_func) { exec_name = (PyObject *) cf->any2; @@ -146,7 +149,24 @@ cframe_reduce(PyCFrameObject *cf) } else if ((exec_name = slp_find_execname((PyFrameObject *) cf, &valid)) == NULL) return NULL; - params = slp_into_tuple_with_nulls(&cf->ob1, 3); + obs[0] = cf->ob1; + obs[1] = cf->ob2; + obs[2] = cf->ob3; + i = cf->i; + n = cf->n; + + if (cf->f_execute == slp_context_run_callback && 0 == i) { + /* + * Replace a logical PyContext_Exit(context) with the equivalent + * stackless.current.set_context(). + */ + assert(PyContext_CheckExact(obs[0])); + assert(((PyContext *) obs[0])->ctx_entered); + obs[0] = (PyObject *)((PyContext *) obs[0])->ctx_prev; + i = 1; + } + + params = slp_into_tuple_with_nulls(obs, 3); if (params == NULL) goto err_exit; res = Py_BuildValue ("(O()(" cframetuplefmt "))", @@ -154,8 +174,8 @@ cframe_reduce(PyCFrameObject *cf) valid, exec_name, params, - cf->i, - cf->n); + i, + n); err_exit: Py_XDECREF(exec_name); diff --git a/Stackless/module/taskletobject.c b/Stackless/module/taskletobject.c index c17504a81c1f64..4244a3d152a683 100644 --- a/Stackless/module/taskletobject.c +++ b/Stackless/module/taskletobject.c @@ -617,6 +617,7 @@ tasklet_reduce(PyTaskletObject * t) PyFrameObject *f; PyThreadState *ts = t->cstate->tstate; PyObject *exc_type, *exc_value, *exc_traceback, *exc_info; + PyObject *context = NULL; if (ts && t == ts->st.current) RUNTIME_ERROR("You cannot __reduce__ the tasklet which is" @@ -649,6 +650,10 @@ tasklet_reduce(PyTaskletObject * t) goto err_exit; } + context = _get_tasklet_context(t); + if (NULL == context) + goto err_exit; + assert(!ts || t->exc_info != &ts->exc_state); /* Because of the test a few lines above, it is guaranteed that t is not the current tasklet. * Therefore we can simplify the line @@ -677,7 +682,8 @@ tasklet_reduce(PyTaskletObject * t) Py_INCREF(exc_type); Py_INCREF(exc_value); Py_INCREF(exc_traceback); - tup = Py_BuildValue("(O()(" TASKLET_TUPLEFMT "))", + tup = Py_BuildValue((ts && (ts->st.pickleflags & SLP_PICKLEFLAGS_PICKLE_CONTEXT)) ? + "(O()(" TASKLET_TUPLEFMT "O))" : "(O()(" TASKLET_TUPLEFMT "))", Py_TYPE(t), tasklet_flags_as_integer(t->flags), t->tempval, @@ -686,7 +692,8 @@ tasklet_reduce(PyTaskletObject * t) exc_type, exc_value, exc_traceback, - exc_info + exc_info, + context ); Py_DECREF(exc_info); Py_DECREF(exc_type); @@ -694,6 +701,7 @@ tasklet_reduce(PyTaskletObject * t) Py_DECREF(exc_traceback); err_exit: Py_XDECREF(lis); + Py_XDECREF(context); return tup; } @@ -715,6 +723,7 @@ tasklet_setstate(PyObject *self, PyObject *args) PyObject *exc_type, *exc_value, *exc_traceback; PyObject *old_type, *old_value, *old_traceback; PyObject *exc_info_obj; + PyObject *context = NULL; PyFrameObject *f; Py_ssize_t i, nframes; int j; @@ -724,7 +733,7 @@ tasklet_setstate(PyObject *self, PyObject *args) if (PyTasklet_Alive(t)) RUNTIME_ERROR("tasklet is alive", NULL); - if (!PyArg_ParseTuple(args, "iOiO!OOOO:tasklet", + if (!PyArg_ParseTuple(args, "iOiO!OOOO|O:tasklet", &flags, &tempval, &nesting_level, @@ -732,9 +741,15 @@ tasklet_setstate(PyObject *self, PyObject *args) &exc_type, &exc_value, &exc_traceback, - &exc_info_obj)) + &exc_info_obj, + &context)) return NULL; + if (Py_None == context) + context = NULL; + if (context != NULL && !PyContext_CheckExact(context)) + TYPE_ERROR("tasklet state[8] must be a contextvars.Context or None", NULL); + nframes = PyList_GET_SIZE(lis); TASKLET_SETVAL(t, tempval); @@ -783,9 +798,16 @@ tasklet_setstate(PyObject *self, PyObject *args) back = f; } t->f.frame = f; - if(_tasklet_init_context(t)) + if(NULL == context && _tasklet_init_context(t)) return NULL; } + if (context) { + PyObject *obj = _stackless_tasklet_set_context_impl(t, context); + if (NULL == obj) + return NULL; + Py_DECREF(obj); + } + /* walk frames again and calculate recursion_depth */ for (f = t->f.frame; f != NULL; f = f->f_back) { if (PyFrame_Check(f) && f->f_execute != PyEval_EvalFrameEx_slp) { diff --git a/Stackless/unittests/support.py b/Stackless/unittests/support.py index 826f1ad5e488aa..6000b20ddaec2a 100644 --- a/Stackless/unittests/support.py +++ b/Stackless/unittests/support.py @@ -34,6 +34,7 @@ import gc import os import functools +import copyreg from test.support import run_unittest # emit warnings about uncollectable objects @@ -674,21 +675,65 @@ def tasklet_is_uncollectable(self, tlet): self.assertIsInstance(tlet, stackless.tasklet) self.__uncollectable_tasklets.append(id(tlet)) - def dumps(self, obj, protocol=None, *, fix_imports=True): - if protocol is None: - protocol = self._pickle_protocol + def dumps(self, obj, protocol=None, *, fix_imports=True, external_map=None, additional_dispatch_table=None): if self._pickle_module == "P": - return pickle._dumps(obj, protocol=protocol, fix_imports=fix_imports) + cls = pickle._Pickler elif self._pickle_module == "C": - return pickle.dumps(obj, protocol=protocol, fix_imports=fix_imports) - raise ValueError("Invalid pickle module") + cls = pickle.Pickler + else: + raise ValueError("Invalid pickle module") + + if external_map is not None: + inverted_external_map = {id(v): k for k,v in external_map.items()} + class PicklerWithExternalObjects(cls): + def persistent_id(self, obj): + if external_map: + return inverted_external_map.get(id(obj)) + return None + cls = PicklerWithExternalObjects - def loads(self, s, *, fix_imports=True, encoding="ASCII", errors="strict"): + if protocol is None: + protocol = self._pickle_protocol + + f = io.BytesIO() + p=cls(f, protocol, fix_imports=fix_imports) + if additional_dispatch_table is not None: + dispatch_table = copyreg.dispatch_table.copy() + dispatch_table.update(additional_dispatch_table) + p.dispatch_table = dispatch_table + p.dump(obj) + res = f.getvalue() + assert isinstance(res, (bytes, bytearray)) + return res + + def loads(self, s, *, fix_imports=True, encoding="ASCII", errors="strict", external_map=None): + if isinstance(s, str): + raise TypeError("Can't load pickle from unicode string") if self._pickle_module == "P": - return pickle._loads(s, fix_imports=fix_imports, encoding=encoding, errors=errors) + cls = pickle._Unpickler elif self._pickle_module == "C": - return pickle.loads(s, fix_imports=fix_imports, encoding=encoding, errors=errors) - raise ValueError("Invalid pickle module") + cls = pickle.Unpickler + else: + raise ValueError("Invalid pickle module") + + if external_map is not None: + class UnPicklerWithExternalObjects(cls): + def persistent_load(self, pid): + # This method is invoked whenever a persistent ID is encountered. + # Here, pid is the tuple returned by DBPickler. + if external_map is None: + raise pickle.UnpicklingError("external_map not set") + try: + return external_map[pid] + except KeyError: + # Always raises an error if you cannot return the correct object. + # Otherwise, the unpickler will think None is the object referenced + # by the persistent ID. + raise pickle.UnpicklingError("unsupported persistent object") + cls = UnPicklerWithExternalObjects + + file = io.BytesIO(s) + return cls(file, fix_imports=fix_imports, encoding=encoding, errors=errors).load() # limited pickling support for test cases # Between setUp() and tearDown() the test-case has a diff --git a/Stackless/unittests/test_pickle.py b/Stackless/unittests/test_pickle.py index 0733e324f1bc56..51dd427a88eaac 100644 --- a/Stackless/unittests/test_pickle.py +++ b/Stackless/unittests/test_pickle.py @@ -6,6 +6,8 @@ import copy import contextlib import threading +import contextvars +import ctypes import stackless from stackless import schedule, tasklet @@ -926,6 +928,208 @@ def test_pickling1(self): self.assertTupleEqual(self.inspect(agt), ((e, ), 1, True)) +class TestContextRunCallbackPickling(StacklessTestCase): + # Actually we test cframe.__reduce__ for cframes with exec function "context_run_callback" + # We can't pickle a contextvars.Context object. + + # declare int PyContext_Exit(PyObject *octx) + PyContext_Exit = ctypes.pythonapi.PyContext_Exit + PyContext_Exit.argtypes = [ctypes.py_object] + PyContext_Exit.restype = ctypes.c_int + + # make sure, a context exists and contains an var whose value can't be pickled + cvar = contextvars.ContextVar("TestContextPickling.cvar", default="unset") + cvar.set(object()) + + @classmethod + def task(cls): + v = cls.cvar.get() + v = stackless.schedule_remove(v) + cls.cvar.set(v) + + def _test_pickle_in_context_run(self, ctx1): + # test cframe.__reduce__ for "context_run_callback" called in Context.run + ctx2 = contextvars.Context() + t = stackless.tasklet(ctx2.run)(self.task) + + stackless.run() + self.assertEqual(t.tempval, "unset") + self.assertTrue(t.alive) + self.assertTrue(t.paused) + if is_soft(): + self.assertTrue(t.restorable) + + # now ctx2 must be in state entered + self.assertRaisesRegex(RuntimeError, "cannot enter context:.*is already entered", ctx2.run, lambda: None) + frames = t.__reduce__()[2][3] + self.assertIsInstance(frames, list) + + for f in frames: + if not isinstance(f, stackless.cframe): + continue + valid, exec_name, params, i, n = f.__reduce__()[2] + if exec_name != 'context_run_callback': + continue + self.assertEqual(valid, 1) + self.assertEqual(f.n, 0) + self.assertEqual(n, 0) + # now check, that context to switch to is in the state + # the original cframe has the currently active context + self.assertIs(f.ob1, ctx2) + self.assertTupleEqual(params, ((1, 2), ctx1, None, None)) + self.assertEqual(f.i, 0) + self.assertEqual(i, 1) + break + else: + self.assertFalse(is_soft(), "no cframe 'context_run_callback'") + + if is_soft(): + t.bind(None) + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", t.set_context, contextvars.Context()) + t.context_run(self.PyContext_Exit, ctx2) + t.set_context(contextvars.Context()) + + def test_pickle_in_context_run(self): + # test cframe.__reduce__ for "context_run_callback" called in Context.run + ctx1 = contextvars.copy_context() + ctx1.run(self._test_pickle_in_context_run, ctx1) + + def _test_pickle_in_tasklet_context_run(self, ctx1): + # test cframe.__reduce__ for "context_run_callback" called in tasklet.context_run + ctx2 = contextvars.Context() + t_run = stackless.tasklet().set_context(ctx2) + t = stackless.tasklet(t_run.context_run)(self.task) + + stackless.run() + self.assertEqual(t.tempval, "unset") + self.assertTrue(t.alive) + self.assertTrue(t.paused) + if is_soft(): + self.assertTrue(t.restorable) + + # now ctx2 must not be in state entered + ctx2.run(lambda:None) + frames = t.__reduce__()[2][3] + self.assertIsInstance(frames, list) + + for f in frames: + if not isinstance(f, stackless.cframe): + continue + valid, exec_name, params, i, n = f.__reduce__()[2] + if exec_name != 'context_run_callback': + continue + self.assertEqual(valid, 1) + self.assertEqual(f.n, 0) + self.assertEqual(n, 0) + # now check, that context to switch to is in the state + # the original cframe has the currently active context + self.assertIs(f.ob1, ctx1) + self.assertTupleEqual(params, ((1, 2), ctx1, None, None)) + self.assertEqual(f.i, 1) + self.assertEqual(i, 1) + break + else: + self.assertFalse(is_soft(), "no cframe 'context_run_callback'") + + if is_soft(): + t.bind(None) + t.set_context(contextvars.Context()) + + def test_pickle_in_tasklet_context_run(self): + # test cframe.__reduce__ for "context_run_callback" called in tasklet.context_run + ctx1 = contextvars.copy_context() + ctx1.run(self._test_pickle_in_tasklet_context_run, ctx1) + + def test_tasklet_get_unpicklable_state(self): + cid = stackless.current.context_id + ctx = stackless._tasklet_get_unpicklable_state(stackless.current)['context'] + self.assertEqual(stackless.current.context_id, cid) + self.assertIsInstance(ctx, contextvars.Context) + self.assertEqual(id(ctx), cid) + + +class TestContextPickling(StacklessPickleTestCase): + # We can't pickle a contextvars.Context object. + + # declare int PyContext_Exit(PyObject *octx) + PyContext_Exit = ctypes.pythonapi.PyContext_Exit + PyContext_Exit.argtypes = [ctypes.py_object] + PyContext_Exit.restype = ctypes.c_int + + # make sure, a context exists and contains an var whose value can't be pickled + cvar = contextvars.ContextVar("TestContextPickling.cvar", default="unset") + sentinel = object() + cvar.set(sentinel) + + def setUp(self): + super().setUp() + stackless.pickle_flags(stackless.PICKLEFLAGS_PICKLE_CONTEXT, stackless.PICKLEFLAGS_PICKLE_CONTEXT) + + def tearDown(self): + super().tearDown() + stackless.pickle_flags(0, stackless.PICKLEFLAGS_PICKLE_CONTEXT) + + @classmethod + def task(cls, arg): + v = cls.cvar.get() + cls.cvar.set(arg) + v = stackless.schedule_remove(v) + cls.cvar.set(v) + + def test_pickle_tasklet_with_context(self): + ctx = contextvars.copy_context() + t = stackless.tasklet(self.task)("changed").set_context(ctx) + stackless.run() + self.assertTrue(t.alive) + self.assertIs(t.tempval, self.sentinel) + self.assertEqual(ctx[self.cvar], "changed") + + external_map = {"context ctx": ctx} + + p = self.dumps(t, external_map = external_map) + # import pickletools; pickletools.dis(pickletools.optimize(p)) + t.kill() + + t = self.loads(p, external_map = external_map) + self.assertEqual(t.context_id, id(ctx)) + self.assertTrue(t.alive) + t.insert() + t.tempval = "after unpickling" + if is_soft(): + stackless.run() + self.assertFalse(t.alive) + self.assertEqual(ctx[self.cvar], "after unpickling") + + def test_pickle_tasklet_in_tasklet_context_run(self): + ctx1 = contextvars.copy_context() + ctx2 = contextvars.Context() + t = stackless.tasklet(stackless.tasklet().set_context(ctx2).context_run)(self.task, "changed").set_context(ctx1) + self.assertEqual(t.context_id, id(ctx1)) + stackless.run() + self.assertTrue(t.alive) + self.assertIs(ctx1[self.cvar], self.sentinel) + self.assertEqual(t.tempval, "unset") + self.assertEqual(ctx2[self.cvar], "changed") + + external_map = {"context ctx1": ctx1, "context ctx2": ctx2,} + + p = self.dumps(t, external_map = external_map) + # import pickletools; pickletools.dis(pickletools.optimize(p)) + t.kill() + + t = self.loads(p, external_map = external_map) + self.assertEqual(t.context_id, id(ctx2)) + self.assertTrue(t.alive) + t.insert() + t.tempval = "after unpickling" + if is_soft(): + stackless.run() + self.assertFalse(t.alive) + self.assertEqual(t.context_id, id(ctx1)) + self.assertEqual(ctx2[self.cvar], "after unpickling") + self.assertIs(ctx1[self.cvar], self.sentinel) + + class TestCopy(StacklessTestCase): ITERATOR_TYPE = type(iter("abc")) From 266ee33a3b2beb182e8ba3160ba5a6ad8e65407b Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Wed, 5 May 2021 23:37:28 +0200 Subject: [PATCH 4/9] Stackless issue #239: fix a typo in the documentation --- Doc/library/stackless/tasklets.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index 85b783c627487a..b9dc97f30598ce 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -616,7 +616,7 @@ Now each tasklet object has a private context attribute, which is either undefin is the context of the :attr:`~stackless.current` tasklet. This implies that a tasklet switch, switches the active context of the thread. -* In accordance with the design decisions made in :pep:`576` the context of a tasklet can't be +* In accordance with the design decisions made in :pep:`567` the context of a tasklet can't be accessed directly, but you can use the method :meth:`tasklet.context_run` to run arbitrary code in this context. For instance ``tasklet.context_run(contextvars.copy_context())`` returns a copy of the context. From c02eb3e8ddcad06cd436aa0b86e752e2d62f99b5 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Sun, 9 May 2021 00:06:33 +0200 Subject: [PATCH 5/9] Stackless issue #239: fix issues found during review - better documentation of the concept - fix a left-over from the time, when setting the context of a main- tasklet was not supported - fix the context handling, when the main tasklet starts/ends. Copy the context from/to the thread state. --- Doc/library/stackless/tasklets.rst | 28 ++++++++++-------- Include/slp_structs.h | 5 +++- Stackless/module/scheduling.c | 6 ++++ Stackless/module/taskletobject.c | 4 +-- Stackless/unittests/test_miscell.py | 45 +++++++++++++++++++++++++++++ 5 files changed, 72 insertions(+), 16 deletions(-) diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index b9dc97f30598ce..30aaa25f7a9b17 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -609,27 +609,27 @@ Design requirements were 4. Enable the integration of tasklet-based co-routines into the :mod:`asyncio` framework. This is an obvious application which involves context variables and tasklets. -Now each tasklet object has a private context attribute, which is either undefined (``NULL``) or a -:class:`~contextvars.Context` object. The design goals have some consequences: +Now each tasklet object has it own private context attribute. The design goals have some consequences: * The active :class:`~contextvars.Context` object of a thread (as defined by the |PPL|) is the context of the :attr:`~stackless.current` tasklet. This implies that a tasklet switch, switches the active context of the thread. * In accordance with the design decisions made in :pep:`567` the context of a tasklet can't be - accessed directly, but you can use the method :meth:`tasklet.context_run` to run arbitrary code + accessed directly [#f1]_, but you can use the method :meth:`tasklet.context_run` to run arbitrary code in this context. For instance ``tasklet.context_run(contextvars.copy_context())`` returns a copy of the context. The attribute :attr:`tasklet.context_id` can be used to test, if two tasklets share the context. -* A tasklet, whose context is undefined must behave identically to a tasklet, whose context is an - empty :class:`~contextvars.Context` object. [#f1]_ Therefore the |PY| API provides no way to distinguish - both states. - -* Whenever the context of a tasklet is to be shared with another tasklet and the context is initially - undefined, it must be set to a newly created :class:`~contextvars.Context` object beforehand. +* If you use the C-API, the context attribute of a tasklet is stored in the field *context* of the structure + :c:type:`PyTaskletObject` or :c:type:`PyThreadState`. This field is is either undefined (``NULL``) or a pointer to a + :class:`~contextvars.Context` object. + A tasklet, whose *context* is ``NULL`` **must** behave identically to a tasklet, whose context is an + empty :class:`~contextvars.Context` object [#f2]_. Therefore the |PY| API provides no way to distinguish + both states. Whenever the context of a tasklet is to be shared with another tasklet and `tasklet->context` + is initially `NULL`, it must be set to a newly created :class:`~contextvars.Context` object beforehand. This affects the methods :meth:`~tasklet.context_run`, :meth:`~tasklet.__init__`, :meth:`~tasklet.bind` - and :meth:`~tasklet.__setstate__`. + and :meth:`~tasklet.__setstate__` as well as the attribute :attr:`tasklet.context_id`. * If the state of a tasklet changes from *not alive* to *bound* or to *alive* (methods :meth:`~tasklet.__init__`, :meth:`~tasklet.bind` or :meth:`~tasklet.__setstate__`), the context @@ -645,7 +645,11 @@ Now each tasklet object has a private context attribute, which is either undefin .. rubric:: Footnotes -.. [#f1] Setting a context variable to a non default value sets a previously undefined - context attribute to a newly created :class:`~contextvars.Context` object. This can happen anytime in a +.. [#f1] Not exactly true. The return value of :meth:`tasklet.__reduce_ex__` can contain references to class + :class:`contextvars.Context`, but it is strongly discouraged, to use them for any other purpose + than pickling. + +.. [#f2] Setting a context variable to a non default value changes the value of the field *context* from ``NULL`` + to a pointer to a newly created :class:`~contextvars.Context` object. This can happen anytime in a library call. Therefore any difference between an undefined context and an empty context causes ill defined behavior. diff --git a/Include/slp_structs.h b/Include/slp_structs.h index 13e686fb44c592..cf49dc2ef48f01 100644 --- a/Include/slp_structs.h +++ b/Include/slp_structs.h @@ -134,7 +134,10 @@ typedef struct _tasklet { int recursion_depth; PyObject *def_globals; PyObject *tsk_weakreflist; - PyObject *context; /* if running: the saved context, otherwise the context for the tasklet */ + /* If the tasklet is current: the context, the value of ts->context when the main tasklet was created. + * (The context of a current tasklet is always ints->tasklet.) + * If the tasklet is not current: the context for the tasklet */ + PyObject *context; } PyTaskletObject; diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index b40fa329f5f713..6c243efa3c93b0 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -1298,6 +1298,9 @@ slp_initialize_main_and_current(void) assert(task->exc_state.exc_traceback == NULL); assert(task->exc_state.previous_item == NULL); assert(task->exc_info == &task->exc_state); + assert(task->context == NULL); + Py_XINCREF(ts->context); + task->context = ts->context; SLP_EXCHANGE_EXCINFO(ts, task); NOTIFY_SCHEDULE(ts, NULL, task, -1); @@ -1393,6 +1396,9 @@ schedule_task_destruct(PyObject **retval, PyTaskletObject *prev, PyTaskletObject assert(ts->exc_info == &prev->exc_state); SLP_EXCHANGE_EXCINFO(ts, prev); TASKLET_CLAIMVAL(prev, retval); + Py_XINCREF(prev->context); + Py_XSETREF(ts->context, prev->context); + ts->context_ver++; if (PyBomb_Check(*retval)) *retval = slp_bomb_explode(*retval); } diff --git a/Stackless/module/taskletobject.c b/Stackless/module/taskletobject.c index 4244a3d152a683..088293582d698a 100644 --- a/Stackless/module/taskletobject.c +++ b/Stackless/module/taskletobject.c @@ -434,10 +434,8 @@ PyTasklet_BindEx(PyTaskletObject *task, PyObject *func, PyObject *args, PyObject /* * Set the context to the current context. It can be changed later on. - * But only for non-main tasklest, because tasklet.set_context must not - * be used for a main tasklet. */ - if (func && !(ts && task == ts->st.main)) + if (func) if (_tasklet_init_context(task)) return -1; diff --git a/Stackless/unittests/test_miscell.py b/Stackless/unittests/test_miscell.py index dcf1d8e5f84791..c7390082d68883 100644 --- a/Stackless/unittests/test_miscell.py +++ b/Stackless/unittests/test_miscell.py @@ -1752,6 +1752,51 @@ def get_cid(): self.assertNotEqual(cid0, cid) self.assertEqual(id(ctx), cid) + def test_main_tasklet_init(self): + # This test succeeds, if Stackless copies ts->context to into the main + # tasklet, when Stackless creates the main tasklet. + # This is important, if there is already a context set, when the interpreter + # gets called. Example: an interactive python prompt. + # See also: test_main_tasklet_fini + cid1 = stackless.current.context_id + cid2 = None + def task(): + nonlocal cid2 + cid2 = stackless.main.context_id + + stackless.tasklet(task)() + stackless.test_outside() + self.assertEqual(stackless.current.context_id, cid1) + self.assertEqual(cid1, cid2) + + def test_main_tasklet_fini(self): + # for a main tasklet of a thread initially ts->context == NULL + # This test succeeds, if Stackless copies the context of the main + # tasklet to ts->context after the main tasklet exits + # This way the last context of the main tasklet is preserved and available + # on the next invocation of the interpreter. + ctx_holder1 = None # use a tasklet to keep the context alive + ctx_holder2 = None + def task(): + nonlocal ctx_holder1 + ctx_holder1 = stackless.main.context_run(stackless.tasklet, id) + self.assertEqual(ctx_holder1.context_id, stackless.main.context_id) + + t = stackless.tasklet(task, ()) + def other_thread(): + nonlocal ctx_holder2 + t.bind_thread() + t.insert() + stackless.test_outside() + ctx_holder2 = stackless.tasklet(id) + self.assertEqual(ctx_holder2.context_id, stackless.current.context_id) + + tr = threading.Thread(target=other_thread, name="other thread") + tr.start() + tr.join() + self.assertIsNot(ctx_holder1, ctx_holder2) + self.assertEqual(ctx_holder1.context_id, ctx_holder2.context_id) + #/////////////////////////////////////////////////////////////////////////////// From 4ea261042e30b7c26a18699b229a42e764b37f5d Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Sun, 9 May 2021 01:35:54 +0200 Subject: [PATCH 6/9] Stackless issue #239: fix issues found during review, part 2 - fix the context handling, when the main tasklet starts/ends. Copy the context from/to the thread state. Fix the test case. --- Stackless/unittests/test_miscell.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/Stackless/unittests/test_miscell.py b/Stackless/unittests/test_miscell.py index c7390082d68883..39b505bb4c98b4 100644 --- a/Stackless/unittests/test_miscell.py +++ b/Stackless/unittests/test_miscell.py @@ -1758,16 +1758,27 @@ def test_main_tasklet_init(self): # This is important, if there is already a context set, when the interpreter # gets called. Example: an interactive python prompt. # See also: test_main_tasklet_fini - cid1 = stackless.current.context_id - cid2 = None + ctx_holder1 = None # use a tasklet to keep the context alive + ctx_holder2 = None def task(): - nonlocal cid2 - cid2 = stackless.main.context_id + nonlocal ctx_holder2 + ctx_holder2 = stackless.main.context_run(stackless.tasklet, id) + self.assertEqual(ctx_holder2.context_id, stackless.main.context_id) - stackless.tasklet(task)() - stackless.test_outside() - self.assertEqual(stackless.current.context_id, cid1) - self.assertEqual(cid1, cid2) + t = stackless.tasklet(task, ()) + def other_thread(): + nonlocal ctx_holder1 + t.bind_thread() + t.insert() + ctx_holder1 = stackless.tasklet(id) + self.assertEqual(ctx_holder1.context_id, stackless.current.context_id) + stackless.test_outside() + + tr = threading.Thread(target=other_thread, name="other thread") + tr.start() + tr.join() + self.assertIsNot(ctx_holder1, ctx_holder2) + self.assertEqual(ctx_holder1.context_id, ctx_holder2.context_id) def test_main_tasklet_fini(self): # for a main tasklet of a thread initially ts->context == NULL From d52488773ddc15d76870a39ad7270c15bbdcbb10 Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Sun, 9 May 2021 14:08:01 +0200 Subject: [PATCH 7/9] Stackless issue #239: adapt a test case Use stackless._stackless._test_outside instead of stackless.test_outside. --- Stackless/unittests/test_miscell.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Stackless/unittests/test_miscell.py b/Stackless/unittests/test_miscell.py index 082cc2e939a04e..a59228373e2130 100644 --- a/Stackless/unittests/test_miscell.py +++ b/Stackless/unittests/test_miscell.py @@ -1772,7 +1772,7 @@ def other_thread(): t.insert() ctx_holder1 = stackless.tasklet(id) self.assertEqual(ctx_holder1.context_id, stackless.current.context_id) - stackless.test_outside() + stackless._stackless._test_outside() tr = threading.Thread(target=other_thread, name="other thread") tr.start() @@ -1798,7 +1798,7 @@ def other_thread(): nonlocal ctx_holder2 t.bind_thread() t.insert() - stackless.test_outside() + stackless._stackless._test_outside() ctx_holder2 = stackless.tasklet(id) self.assertEqual(ctx_holder2.context_id, stackless.current.context_id) From 71d8e0e04f4727438d55a416b93a63f0086ee21c Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Thu, 13 May 2021 20:23:19 +0200 Subject: [PATCH 8/9] Stackless issue #239: simplify context switching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify context switching as sugested by Kristján. --- Include/slp_structs.h | 3 +-- Stackless/module/scheduling.c | 32 ++++++++++++++------------------ 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/Include/slp_structs.h b/Include/slp_structs.h index cf49dc2ef48f01..21ab38ac073def 100644 --- a/Include/slp_structs.h +++ b/Include/slp_structs.h @@ -134,8 +134,7 @@ typedef struct _tasklet { int recursion_depth; PyObject *def_globals; PyObject *tsk_weakreflist; - /* If the tasklet is current: the context, the value of ts->context when the main tasklet was created. - * (The context of a current tasklet is always ints->tasklet.) + /* If the tasklet is current: NULL. (The context of a current tasklet is always in ts->tasklet.) * If the tasklet is not current: the context for the tasklet */ PyObject *context; } PyTaskletObject; diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index 6c243efa3c93b0..3210ecbd26ed0c 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -722,23 +722,18 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec PyThreadState *ts_ = (tstate); PyTaskletObject *t_ = (task); _PyErr_StackItem *exc_info; - PyObject *c; assert(ts_); assert(t_); exc_info = ts_->exc_info; assert(exc_info); assert(t_->exc_info); #if 0 - c = PyStackless_GetCurrent(); + PyObject *c = PyStackless_GetCurrent(); fprintf(stderr, "SLP_EXCHANGE_EXCINFO %3d current %14p,\tset task %p = %p,\ttstate %p = %p\n", __LINE__, c, t_, exc_info, ts_, t_->exc_info); Py_XDECREF(c); #endif ts_->exc_info = t_->exc_info; t_->exc_info = exc_info; - c = ts_->context; - ts_->context = t_->context; - t_->context = c; - ts_->context_ver++; } #else #define SLP_EXCHANGE_EXCINFO(tstate_, task_) \ @@ -746,7 +741,6 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec PyThreadState *ts_ = (tstate_); \ PyTaskletObject *t_ = (task_); \ _PyErr_StackItem *exc_info; \ - PyObject *c; \ assert(ts_); \ assert(t_); \ exc_info = ts_->exc_info; \ @@ -754,10 +748,6 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec assert(t_->exc_info); \ ts_->exc_info = t_->exc_info; \ t_->exc_info = exc_info; \ - c = ts_->context; \ - ts_->context = t_->context; \ - t_->context = c; \ - ts_->context_ver++; \ } while(0) #endif @@ -766,13 +756,23 @@ Py_LOCAL_INLINE(void) SLP_UPDATE_TSTATE_ON_SWITCH(PyThreadState *tstate, PyTaskl { SLP_EXCHANGE_EXCINFO(tstate, prev); SLP_EXCHANGE_EXCINFO(tstate, next); + prev->context = tstate->context; + tstate->context = next->context; + tstate->context_ver++; + next->context = NULL; } #else #define SLP_UPDATE_TSTATE_ON_SWITCH(tstate__, prev_, next_) \ do { \ PyThreadState *ts__ = (tstate__); \ - SLP_EXCHANGE_EXCINFO(ts__, (prev_)); \ - SLP_EXCHANGE_EXCINFO(ts__, (next_)); \ + PyTaskletObject *prev__ = (prev_); \ + PyTaskletObject *next__ = (next_); \ + SLP_EXCHANGE_EXCINFO(ts__, prev__); \ + SLP_EXCHANGE_EXCINFO(ts__, next__); \ + prev__->context = ts__->context; \ + ts__->context = next__->context; \ + ts__->context_ver++; \ + next__->context = NULL; \ } while(0) #endif @@ -1299,8 +1299,6 @@ slp_initialize_main_and_current(void) assert(task->exc_state.previous_item == NULL); assert(task->exc_info == &task->exc_state); assert(task->context == NULL); - Py_XINCREF(ts->context); - task->context = ts->context; SLP_EXCHANGE_EXCINFO(ts, task); NOTIFY_SCHEDULE(ts, NULL, task, -1); @@ -1394,11 +1392,9 @@ schedule_task_destruct(PyObject **retval, PyTaskletObject *prev, PyTaskletObject /* main is exiting */ assert(ts->st.main == NULL); assert(ts->exc_info == &prev->exc_state); + assert(prev->context == NULL); SLP_EXCHANGE_EXCINFO(ts, prev); TASKLET_CLAIMVAL(prev, retval); - Py_XINCREF(prev->context); - Py_XSETREF(ts->context, prev->context); - ts->context_ver++; if (PyBomb_Check(*retval)) *retval = slp_bomb_explode(*retval); } From bdb7bc4fe10a66663dd00b2831a1968f4b71135b Mon Sep 17 00:00:00 2001 From: Anselm Kruis Date: Tue, 18 May 2021 23:19:05 +0200 Subject: [PATCH 9/9] Stackless issue #239: documentation and comment updates --- Doc/library/stackless/tasklets.rst | 12 ++++++++++-- Stackless/module/scheduling.c | 12 ++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index 30aaa25f7a9b17..1e1ca32a7469bf 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -395,6 +395,12 @@ The ``tasklet`` class stackless.current.set_context(saved_context) See also :meth:`contextvars.Context.run` for additional information. + Use this method with care, because it lets you manipulate the context of + another tasklet. Often it is sufficient to use a copy of the context + instead of the original object:: + + copied_context = tasklet.context_run(contextvars.copy_context) + copied_context.run(...) .. note:: This method has been added on a provisional basis (see :pep:`411` @@ -491,6 +497,7 @@ The following attributes allow checking of user set situations: .. versionadded:: 3.7.6 This attribute is the :func:`id` of the :class:`~contextvars.Context` object to be used while this tasklet runs. + It is intended mostly for debugging. .. note:: This attribute has been added on a provisional basis (see :pep:`411` @@ -599,7 +606,7 @@ Using context variables and multiple tasklets together didn't work well in |SLP| 3.7.5, because all tasklets of a given thread shared the same context. Starting with version 3.7.6 |SLP| adds explicit support for context variables. -Design requirements were +Design requirements were: 1. Be fully compatible with |CPY| and its design decisions. 2. Be fully compatible with previous applications of |SLP|, which are unaware of context variables. @@ -607,7 +614,8 @@ Design requirements were a context variable, can delegate this duty to a sub-tasklet without the need to manage the context of the sub-tasklet manually. 4. Enable the integration of tasklet-based co-routines into the :mod:`asyncio` framework. - This is an obvious application which involves context variables and tasklets. + This is an obvious application which involves context variables and tasklets. See + `slp-coroutine `_ for an example. Now each tasklet object has it own private context attribute. The design goals have some consequences: diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index 3210ecbd26ed0c..554950d93c93de 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -711,11 +711,9 @@ new_lock(void) * When switching from one tasklet to another tasklet, we have to switch * the exc_info-pointer in the thread state. * - * The same situation is for the current contextvars.Context. When switching - * from one tasklet to another tasklet, we have to switch the context-pointer - * in the thread state. + * With current compilers, an inline function performs no worse than a macro, + * but in the debugger single stepping it is much simpler. */ - #if 1 Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObject *task) { @@ -751,6 +749,12 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec } while(0) #endif +/* + * The inline function (or macro) SLP_UPDATE_TSTATE_ON_SWITCH encapsulates some changes + * to the thread state when Stackless switches tasklets: + * - Exchange the exception information + * - Switch the PEP 567 context + */ #if 1 Py_LOCAL_INLINE(void) SLP_UPDATE_TSTATE_ON_SWITCH(PyThreadState *tstate, PyTaskletObject *prev, PyTaskletObject *next) {