Skip to content

Commit 00329e0

Browse files
authored
Refactor BatchLogRecordProcessor and associated tests (#4535)
1 parent adbec50 commit 00329e0

File tree

3 files changed

+200
-269
lines changed

3 files changed

+200
-269
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 81 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
import threading
2323
import weakref
2424
from os import environ, linesep
25-
from time import time_ns
26-
from typing import IO, Callable, Deque, List, Optional, Sequence
25+
from typing import IO, Callable, Deque, Optional, Sequence
2726

2827
from opentelemetry.context import (
2928
_SUPPRESS_INSTRUMENTATION_KEY,
@@ -56,6 +55,12 @@ class LogExportResult(enum.Enum):
5655
FAILURE = 1
5756

5857

58+
class BatchLogExportStrategy(enum.Enum):
    """Controls when ``_export`` stops pulling batches off the queue."""

    # Drain the queue completely (used by force_flush and on shutdown).
    EXPORT_ALL = 0
    # Keep exporting while at least one full batch remains queued
    # (used when the worker is woken early by emit).
    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
    # Export one batch, then stop unless still above the batch threshold
    # (used on a normal schedule-delay timer tick).
    EXPORT_AT_LEAST_ONE_BATCH = 2
62+
63+
5964
class LogExporter(abc.ABC):
6065
"""Interface for exporting logs.
6166
@@ -141,14 +146,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
141146
return True
142147

143148

144-
class _FlushRequest:
145-
__slots__ = ["event", "num_log_records"]
146-
147-
def __init__(self):
148-
self.event = threading.Event()
149-
self.num_log_records = 0
150-
151-
152149
_BSP_RESET_ONCE = Once()
153150

154151

@@ -167,8 +164,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
167164
"""
168165

169166
_queue: Deque[LogData]
170-
_flush_request: _FlushRequest | None
171-
_log_records: List[LogData | None]
172167

173168
def __init__(
174169
self,
@@ -190,7 +185,7 @@ def __init__(
190185
max_export_batch_size = (
191186
BatchLogRecordProcessor._default_max_export_batch_size()
192187
)
193-
188+
# Not used. No way currently to pass timeout to export.
194189
if export_timeout_millis is None:
195190
export_timeout_millis = (
196191
BatchLogRecordProcessor._default_export_timeout_millis()
@@ -202,27 +197,45 @@ def __init__(
202197

203198
self._exporter = exporter
204199
self._max_queue_size = max_queue_size
205-
self._schedule_delay_millis = schedule_delay_millis
200+
self._schedule_delay = schedule_delay_millis / 1e3
206201
self._max_export_batch_size = max_export_batch_size
202+
# Not used. No way currently to pass timeout to export.
203+
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
207204
self._export_timeout_millis = export_timeout_millis
205+
# Deque is thread safe.
208206
self._queue = collections.deque([], max_queue_size)
209207
self._worker_thread = threading.Thread(
210208
name="OtelBatchLogRecordProcessor",
211209
target=self.worker,
212210
daemon=True,
213211
)
214-
self._condition = threading.Condition(threading.Lock())
212+
215213
self._shutdown = False
216-
self._flush_request = None
217-
self._log_records = [None] * self._max_export_batch_size
214+
self._export_lock = threading.Lock()
215+
self._worker_awaken = threading.Event()
218216
self._worker_thread.start()
219217
if hasattr(os, "register_at_fork"):
220218
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
221219
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
222220
self._pid = os.getpid()
223221

222+
def _should_export_batch(
    self, batch_strategy: BatchLogExportStrategy, num_iterations: int
) -> bool:
    """Return True if ``_export`` should pull one more batch off the queue.

    Args:
        batch_strategy: the caller's export strategy.
        num_iterations: number of batches already exported in this
            ``_export`` call.
    """
    # Nothing queued: never export, regardless of strategy.
    if not self._queue:
        return False
    # Always continue to export while queue length exceeds max batch size.
    if len(self._queue) >= self._max_export_batch_size:
        return True
    if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
        return True
    # EXPORT_AT_LEAST_ONE_BATCH: only the first iteration exports;
    # any other strategy stops once below the batch threshold.
    return (
        batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
        and num_iterations == 0
    )
235+
224236
def _at_fork_reinit(self):
225-
self._condition = threading.Condition(threading.Lock())
237+
self._export_lock = threading.Lock()
238+
self._worker_awaken = threading.Event()
226239
self._queue.clear()
227240
self._worker_thread = threading.Thread(
228241
name="OtelBatchLogRecordProcessor",
@@ -233,152 +246,75 @@ def _at_fork_reinit(self):
233246
self._pid = os.getpid()
234247

235248
def worker(self):
    """Background loop: sleep, then export queued logs; drain on shutdown.

    The sleep is interrupted either by shutdown() or by emit() once the
    queue holds at least one full batch. After the loop exits, one final
    export drains whatever is left in the queue.
    """
    while not self._shutdown:
        # Lots of strategies in the spec for setting next timeout.
        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
        # Shutdown will interrupt this sleep. Emit will interrupt this sleep
        # only if the queue is bigger than the batch threshold.
        sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
        if self._shutdown:
            break
        if sleep_interrupted:
            # Woken early: a full batch is waiting, so keep exporting
            # while the queue stays above the threshold.
            strategy = BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
        else:
            # Timer elapsed: ship what accumulated, at least one batch.
            strategy = BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
        self._export(strategy)
        self._worker_awaken.clear()
    # Final drain so no records are lost on shutdown.
    self._export(BatchLogExportStrategy.EXPORT_ALL)
263+
264+
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
    """Export batches from the queue according to *batch_strategy*.

    Safe to call concurrently from the worker thread and force_flush:
    the export lock serializes the actual exporter calls.
    """
    with self._export_lock:
        iteration = 0
        # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
        # once the lock is obtained to see if we still need to make the requested export.
        while self._should_export_batch(batch_strategy, iteration):
            iteration += 1
            token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
            try:
                self._exporter.export(
                    [
                        # Oldest records are at the back, so pop from there.
                        self._queue.pop()
                        for _ in range(
                            min(
                                self._max_export_batch_size,
                                len(self._queue),
                            )
                        )
                    ]
                )
            except Exception:  # pylint: disable=broad-exception-caught
                _logger.exception("Exception while exporting logs.")
            finally:
                # Always restore the context, even if export raises a
                # BaseException (e.g. KeyboardInterrupt); otherwise the
                # instrumentation-suppression flag would leak to
                # unrelated code running on this thread.
                detach(token)
346288

347289
def emit(self, log_data: LogData) -> None:
    """Enqueue *log_data* for asynchronous export.

    Drops the record if the processor is shut down; wakes the worker
    once a full batch has accumulated. The bounded deque evicts the
    oldest record when the queue is already full.
    """
    if self._shutdown:
        _logger.info("Shutdown called, ignoring log.")
        return
    # Re-create worker state if we are running in a forked child.
    if os.getpid() != self._pid:
        _BSP_RESET_ONCE.do_once(self._at_fork_reinit)

    if len(self._queue) == self._max_queue_size:
        _logger.warning("Queue full, dropping log.")
    self._queue.appendleft(log_data)

    # Wake the worker early once a full batch is available.
    if len(self._queue) >= self._max_export_batch_size:
        self._worker_awaken.set()
360301

361302
def shutdown(self):
    """Shut the processor down: stop the worker and shut down the exporter.

    Idempotent — a second call returns immediately. Blocks until the
    worker thread has performed its final drain of the queue and exited.
    """
    if self._shutdown:
        return
    # Prevents emit and force_flush from further calling export.
    self._shutdown = True
    # Interrupts sleep in the worker, if it's sleeping.
    self._worker_awaken.set()
    # Main worker loop should exit after one final export call with flush all strategy.
    self._worker_thread.join()
    self._exporter.shutdown()
367312

368313
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
    """Synchronously export everything currently in the queue.

    Args:
        timeout_millis: accepted for API compatibility but currently
            ignored — the blocking export has no timeout mechanism.

    Returns:
        False if the processor has already been shut down, True once
        the blocking export of all queued records completed.
    """
    # Fix: the function is annotated -> bool but previously returned
    # None on every path; callers checking the documented boolean
    # contract would always see a falsy result.
    if self._shutdown:
        return False
    # Blocking call to export.
    self._export(BatchLogExportStrategy.EXPORT_ALL)
    return True
382318

383319
@staticmethod
384320
def _default_max_queue_size():

opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT
8888
8989
The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
90+
This environment variable currently does nothing, see https://github.com/open-telemetry/opentelemetry-python/issues/4555.
9091
Default: 30000
9192
"""
9293

0 commit comments

Comments
 (0)