Skip to content

Commit 91b95ea

Browse files
potiukephraimbuddy
authored andcommitted
Change the storage of frame to use threadLocal rather than Dict (#21993)
There is a very probable WeakKeyDict bug in Python standard library (to be confirmed and investigated further) that manifests itself in a very rare failure of the test_stacktrace_on_failure_starts_with_task_execute_method This turned out to be related to an unexpected behaviour (and most likely a bug - to be confirmed) of WeakKeyDict when you have potentially two different objects with the same `equals` and `hash` values added to the same WeakKeyDict as keys. More info on similar report (but raised for a bit different reason) bug in Python can be found here: https://bugs.python.org/issue44140 We submitted a PR to fix the problem found python/cpython#31685 (cherry picked from commit 1949f5d)
1 parent 068d1be commit 91b95ea

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

airflow/models/taskinstance.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222
import os
2323
import pickle
2424
import signal
25+
import threading
2526
import warnings
2627
from collections import defaultdict
2728
from datetime import datetime, timedelta
2829
from functools import partial
2930
from inspect import currentframe
3031
from tempfile import NamedTemporaryFile
31-
from types import FrameType
32+
from types import TracebackType
3233
from typing import IO, TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
3334
from urllib.parse import quote
34-
from weakref import WeakKeyDictionary
3535

3636
import dill
3737
import jinja2
@@ -118,7 +118,7 @@
118118
from airflow.models.baseoperator import BaseOperator
119119
from airflow.models.dag import DAG, DagModel, DagRun
120120

121-
_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" = WeakKeyDictionary()
121+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()
122122

123123

124124
@contextlib.contextmanager
@@ -1537,7 +1537,7 @@ def _execute_task(self, context, task_copy):
15371537
else:
15381538
result = execute_callable(context=context)
15391539
except: # noqa: E722
1540-
_EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
1540+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
15411541
raise
15421542
# If the task returns a result, push an XCom containing it
15431543
if task_copy.do_xcom_push and result is not None:
@@ -1725,6 +1725,36 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=
17251725
session.commit()
17261726
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
17271727

1728+
def get_truncated_error_traceback(self, error: BaseException) -> Optional[TracebackType]:
1729+
"""
1730+
Returns truncated error traceback.
1731+
1732+
This method returns traceback of the error truncated to the
1733+
frame saved by earlier try/except along the way. If the frame
1734+
is found, the traceback will be truncated to below the frame.
1735+
1736+
:param error: exception to get traceback from
1737+
:return: traceback to print
1738+
"""
1739+
tb = error.__traceback__
1740+
try:
1741+
execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
1742+
except AttributeError:
1743+
self.log.warning(
1744+
"We expected to get frame set in local storage but it was not."
1745+
" Please report this as an issue with full logs"
1746+
" at https://github.com/apache/airflow/issues/new",
1747+
exc_info=True,
1748+
)
1749+
return tb
1750+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None
1751+
while tb is not None:
1752+
if tb.tb_frame is execution_frame:
1753+
tb = tb.tb_next
1754+
break
1755+
tb = tb.tb_next
1756+
return tb or error.__traceback__
1757+
17281758
@provide_session
17291759
def handle_failure(
17301760
self,
@@ -1740,14 +1770,7 @@ def handle_failure(
17401770

17411771
if error:
17421772
if isinstance(error, Exception):
1743-
execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
1744-
tb = error.__traceback__
1745-
while tb is not None:
1746-
if tb.tb_frame is execution_frame:
1747-
tb = tb.tb_next
1748-
break
1749-
tb = tb.tb_next
1750-
tb = tb or error.__traceback__
1773+
tb = self.get_truncated_error_traceback(error)
17511774
self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
17521775
else:
17531776
self.log.error("%s", error)

0 commit comments

Comments
 (0)