Skip to content

Commit 0fd0b1b

Browse files
potiukephraimbuddy
authored andcommitted
Change the storage of frame to use threadLocal rather than Dict (#21993)
There is a very probable WeakKeyDict bug in Python standard library (to be confirmed and investigated further) that manifests itself in a very rare failure of the test_stacktrace_on_failure_starts_with_task_execute_method This turned out to be related to an unexpected behaviour (and most likely a bug - to be confirmed) of WeakKeyDict when you have potentially two different objects with the same `equals` and `hash` values added to the same WeakKeyDict as keys. More info on similar report (but raised for a bit different reason) bug in Python can be found here: https://bugs.python.org/issue44140 We submitted a PR to fix the problem found python/cpython#31685 (cherry picked from commit 1949f5d)
1 parent fd3b4e9 commit 0fd0b1b

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

airflow/models/taskinstance.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222
import os
2323
import pickle
2424
import signal
25+
import threading
2526
import warnings
2627
from collections import defaultdict
2728
from datetime import datetime, timedelta
2829
from functools import partial
2930
from inspect import currentframe
3031
from tempfile import NamedTemporaryFile
31-
from types import FrameType
32+
from types import TracebackType
3233
from typing import IO, TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
3334
from urllib.parse import quote
34-
from weakref import WeakKeyDictionary
3535

3636
import dill
3737
import jinja2
@@ -118,7 +118,7 @@
118118
from airflow.models.baseoperator import BaseOperator
119119
from airflow.models.dag import DAG, DagModel, DagRun
120120

121-
_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" = WeakKeyDictionary()
121+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()
122122

123123

124124
@contextlib.contextmanager
@@ -1536,7 +1536,7 @@ def _execute_task(self, context, task_copy):
15361536
else:
15371537
result = execute_callable(context=context)
15381538
except: # noqa: E722
1539-
_EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
1539+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
15401540
raise
15411541
# If the task returns a result, push an XCom containing it
15421542
if task_copy.do_xcom_push and result is not None:
@@ -1724,6 +1724,36 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=
17241724
session.commit()
17251725
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
17261726

1727+
def get_truncated_error_traceback(self, error: BaseException) -> Optional[TracebackType]:
1728+
"""
1729+
Returns truncated error traceback.
1730+
1731+
This method returns traceback of the error truncated to the
1732+
frame saved by earlier try/except along the way. If the frame
1733+
is found, the traceback will be truncated to below the frame.
1734+
1735+
:param error: exception to get traceback from
1736+
:return: traceback to print
1737+
"""
1738+
tb = error.__traceback__
1739+
try:
1740+
execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
1741+
except AttributeError:
1742+
self.log.warning(
1743+
"We expected to get frame set in local storage but it was not."
1744+
" Please report this as an issue with full logs"
1745+
" at https://github.com/apache/airflow/issues/new",
1746+
exc_info=True,
1747+
)
1748+
return tb
1749+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None
1750+
while tb is not None:
1751+
if tb.tb_frame is execution_frame:
1752+
tb = tb.tb_next
1753+
break
1754+
tb = tb.tb_next
1755+
return tb or error.__traceback__
1756+
17271757
@provide_session
17281758
def handle_failure(
17291759
self,
@@ -1739,14 +1769,7 @@ def handle_failure(
17391769

17401770
if error:
17411771
if isinstance(error, Exception):
1742-
execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
1743-
tb = error.__traceback__
1744-
while tb is not None:
1745-
if tb.tb_frame is execution_frame:
1746-
tb = tb.tb_next
1747-
break
1748-
tb = tb.tb_next
1749-
tb = tb or error.__traceback__
1772+
tb = self.get_truncated_error_traceback(error)
17501773
self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
17511774
else:
17521775
self.log.error("%s", error)

0 commit comments

Comments
 (0)