From c62a127cdc32dc066e11ea18997f25aad4fc8474 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Fri, 6 Oct 2023 00:53:09 +0200 Subject: [PATCH] rework eventbus scheduled rule execution --- localstack/services/events/dispatcher.py | 143 ++++++++++++ localstack/services/events/provider.py | 271 ++++++++++------------ localstack/services/events/scheduler.py | 281 ++++++++++++++++------- localstack/utils/json.py | 2 +- localstack/utils/scheduler.py | 18 +- 5 files changed, 484 insertions(+), 231 deletions(-) create mode 100644 localstack/services/events/dispatcher.py diff --git a/localstack/services/events/dispatcher.py b/localstack/services/events/dispatcher.py new file mode 100644 index 0000000000000..2310befed2b82 --- /dev/null +++ b/localstack/services/events/dispatcher.py @@ -0,0 +1,143 @@ +import abc +import dataclasses +import json +import logging +from datetime import datetime +from typing import TypedDict + +from localstack.aws.api.events import Target +from localstack.aws.connect import connect_to +from localstack.utils.aws.arns import parse_arn, sqs_queue_url_for_arn +from localstack.utils.aws.client_types import ServicePrincipal +from localstack.utils.json import extract_jsonpath +from localstack.utils.strings import long_uid, truncate +from localstack.utils.time import TIMESTAMP_FORMAT_TZ + +LOG = logging.getLogger(__name__) + +EventDict = TypedDict( + "EventDict", + { + "version": str, + "id": str, + "detail-type": str, + "source": str, + "account": str, + "time": str, + "region": str, + "resources": list[str], + "detail": dict, + }, +) + + +@dataclasses.dataclass +class Event: + detail_type: str + source: str + account: str + region: str + resources: str | list[str] + details: dict = dataclasses.field(default_factory=dict) + time: datetime = dataclasses.field(default_factory=datetime.utcnow) + id: str = dataclasses.field(default_factory=long_uid) + + def to_event_dict(self) -> EventDict: + return { + "version": "0", + "id": self.id, + "detail-type": self.detail_type, + "source": self.source, + "account": self.account, + "time": self.time.strftime(TIMESTAMP_FORMAT_TZ), + "region": self.region, + "resources": self.resources if isinstance(self.resources, list) else [self.resources], + "detail": self.details, + } + + +class EventDispatcher(abc.ABC): + target_service: str + + def dispatch(self, event: Event, target: Target): + raise NotImplementedError + + @staticmethod + def dispatcher_for_target(target_arn: str) -> "EventDispatcher": + service = parse_arn(target_arn)["service"] + + # TODO: split out `send_event_to_target` into individual dispatcher classes + if service == "sqs": + return SqsEventDispatcher() + + return LegacyScheduledEventDispatcher() + + +class LegacyScheduledEventDispatcher(EventDispatcher): + target_service = None + + def dispatch(self, event: Event, target: Target): + from localstack.utils.aws.message_forwarding import send_event_to_target + from localstack.utils.collections import pick_attributes + + # TODO generate event matching aws in case no Input has been specified + event_str = target.get("Input") + event_data = json.loads(event_str) if event_str is not None else event.to_event_dict() + attr = pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"]) + + try: + LOG.debug( + "Event rule %s sending event to target %s: %s", + event.resources[0], + target["Arn"], + event, + ) + + send_event_to_target( + target["Arn"], + event_data, + target_attributes=attr, + role=target.get("RoleArn"), + target=target, + source_arn=event.resources[0], + source_service=ServicePrincipal.events, + ) + except Exception as e: + LOG.error( + "Unable to send event notification %s to target %s: %s", + truncate(event_data), + target, + e, + exc_info=e if LOG.isEnabledFor(logging.DEBUG) else None, + ) + + +class SqsEventDispatcher(EventDispatcher): + target_service = "sqs" + + def dispatch(self, event: Event, target: Target): + if input_ := target.get("Input"): + body = input_ + else: + body = json.dumps(self.create_event(event, target)) + + request = { + "QueueUrl": self.get_queue_url(target), + "MessageBody": body, + **target.get("SqsParameters", {}), + } + + connect_to().sqs.send_message(**request) + + def get_queue_url(self, target: Target) -> str: + return sqs_queue_url_for_arn(target["Arn"]) + + def create_event(self, event: Event, target: Target) -> dict: + event_data = event.to_event_dict() + if input_path := target.get("InputPath"): + event_data = extract_jsonpath(event_data, input_path) + + if target.get("InputTransformer"): + LOG.warning("InputTransformer is currently not supported for SQS") + + return event_data diff --git a/localstack/services/events/provider.py b/localstack/services/events/provider.py index 1b96756c4b206..8bdf253244d65 100644 --- a/localstack/services/events/provider.py +++ b/localstack/services/events/provider.py @@ -7,7 +7,6 @@ import time from typing import Any, Dict, List, Optional -from moto.events import events_backends from moto.events.responses import EventsHandler as MotoEventsHandler from werkzeug import Request from werkzeug.exceptions import NotFound @@ -40,8 +39,9 @@ from localstack.constants import APPLICATION_AMZ_JSON_1_1 from localstack.http import route from localstack.services.edge import ROUTER +from localstack.services.events.dispatcher import Event, EventDispatcher from localstack.services.events.models import EventsStore, events_stores -from localstack.services.events.scheduler import JobScheduler +from localstack.services.events.scheduler import JobId, JobScheduler, parse_schedule_expression from localstack.services.moto import call_moto from localstack.services.plugins import ServiceLifecycleHook from localstack.utils.aws.arns import event_bus_arn, parse_arn @@ -51,7 +51,6 @@ from localstack.utils.common import TMP_FILES, mkdir, save_file, truncate from localstack.utils.json import extract_jsonpath from localstack.utils.strings import long_uid, short_uid -from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp LOG = logging.getLogger(__name__) @@ -72,15 +71,16 @@ class ValidationException(ServiceException): class EventsProvider(EventsApi, ServiceLifecycleHook): def __init__(self): apply_patches() + self.job_scheduler = JobScheduler() def on_after_init(self): ROUTER.add(self.trigger_scheduled_rule) def on_before_start(self): - JobScheduler.start() + self.job_scheduler.start() def on_before_stop(self): - JobScheduler.shutdown() + self.job_scheduler.shutdown() @route("/_aws/events/rules//trigger") def trigger_scheduled_rule(self, request: Request, rule_arn: str): @@ -93,13 +93,14 @@ def trigger_scheduled_rule(self, request: Request, rule_arn: str): job_id = events_stores[account_id][region].rule_scheduled_jobs.get(rule_name) if not job_id: raise NotFound() - job = JobScheduler().instance().get_job(job_id) + job = self.job_scheduler.get_job(job_id) if not job: raise NotFound() + if not job.task: + raise NotFound(f"Job {job_id} not started") - # TODO: once job scheduler is refactored, we can update the deadline of the task instead of running - # it here - job.run() + job.task.deadline = 0 + self.job_scheduler.scheduler.notify() @staticmethod def get_store(context: RequestContext) -> EventsStore: @@ -119,134 +120,6 @@ def test_event_pattern( return TestEventPatternResponse(Result=False) return TestEventPatternResponse(Result=True) - @staticmethod - def get_scheduled_rule_func( - store: EventsStore, - rule_name: RuleName, - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, - ): - def func(*args, **kwargs): - account_id = store._account_id - region = store._region_name - moto_backend = events_backends[account_id][region] - event_bus_name = get_event_bus_name(event_bus_name_or_arn) - event_bus = moto_backend.event_buses[event_bus_name] - rule = event_bus.rules.get(rule_name) - if not rule: - LOG.info("Unable to find rule `%s` for event bus `%s`", rule_name, event_bus_name) - return - if rule.targets: - LOG.debug( - "Notifying %s targets in response to triggered Events rule %s", - len(rule.targets), - rule_name, - ) - - default_event = { - "version": "0", - "id": long_uid(), - "detail-type": "Scheduled Event", - "source": "aws.events", - "account": account_id, - "time": timestamp(format=TIMESTAMP_FORMAT_TZ), - "region": region, - "resources": [rule.arn], - "detail": {}, - } - - for target in rule.targets: - arn = target.get("Arn") - - if input_ := target.get("Input"): - event = json.loads(input_) - else: - event = default_event - if target.get("InputPath"): - event = filter_event_with_target_input_path(target, event) - if target.get("InputTransformer"): - LOG.warning( - "InputTransformer is currently not supported for scheduled rules" - ) - - attr = pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"]) - - try: - send_event_to_target( - arn, - event, - target_attributes=attr, - role=target.get("RoleArn"), - target=target, - source_arn=rule.arn, - source_service=ServicePrincipal.events, - ) - except Exception as e: - LOG.info( - "Unable to send event notification %s to target %s: %s", - truncate(event), - target, - e, - ) - - return func - - @staticmethod - def convert_schedule_to_cron(schedule): - """Convert Events schedule like "cron(0 20 * * ? *)" or "rate(5 minutes)" """ - cron_regex = r"\s*cron\s*\(([^\)]*)\)\s*" - if re.match(cron_regex, schedule): - cron = re.sub(cron_regex, r"\1", schedule) - return cron - rate_regex = r"\s*rate\s*\(([^\)]*)\)\s*" - if re.match(rate_regex, schedule): - rate = re.sub(rate_regex, r"\1", schedule) - value, unit = re.split(r"\s+", rate.strip()) - - value = int(value) - if value < 1: - raise ValueError("Rate value must be larger than 0") - # see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rate-expressions.html - if value == 1 and unit.endswith("s"): - raise ValueError("If the value is equal to 1, then the unit must be singular") - if value > 1 and not unit.endswith("s"): - raise ValueError("If the value is greater than 1, the unit must be plural") - - if "minute" in unit: - return "*/%s * * * *" % value - if "hour" in unit: - return "0 */%s * * *" % value - if "day" in unit: - return "0 0 */%s * *" % value - raise ValueError("Unable to parse events schedule expression: %s" % schedule) - return schedule - - @staticmethod - def put_rule_job_scheduler( - store: EventsStore, - name: Optional[RuleName], - state: Optional[RuleState], - schedule_expression: Optional[ScheduleExpression], - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, - ): - if not schedule_expression: - return - - try: - cron = EventsProvider.convert_schedule_to_cron(schedule_expression) - except ValueError as e: - LOG.error("Error parsing schedule expression: %s", e) - raise ValidationException("Parameter ScheduleExpression is not valid.") from e - - job_func = EventsProvider.get_scheduled_rule_func( - store, name, event_bus_name_or_arn=event_bus_name_or_arn - ) - LOG.debug("Adding new scheduled Events rule with cron schedule %s", cron) - - enabled = state != "DISABLED" - job_id = JobScheduler.instance().add_job(job_func, cron, enabled) - rule_scheduled_jobs = store.rule_scheduled_jobs - rule_scheduled_jobs[name] = job_id - def put_rule( self, context: RequestContext, @@ -259,11 +132,23 @@ def put_rule( tags: TagList = None, event_bus_name: EventBusNameOrArn = None, ) -> PutRuleResponse: - store = self.get_store(context) - self.put_rule_job_scheduler( - store, name, state, schedule_expression, event_bus_name_or_arn=event_bus_name - ) - return call_moto(context) + response = call_moto(context) + + # rules are defined with either an event_pattern or a schedule_expression. the event_pattern case + # is currently handled with moto patches, and the scheduled_expression case is handled here + # explicitly. + if schedule_expression: + job_id = self._schedule_rule_job( + region=context.region, + account_id=context.account_id, + event_bus_name_or_arn=event_bus_name, + rule_name=name, + rule_state=state, + schedule_expression=schedule_expression, + ) + self.get_store(context).rule_scheduled_jobs[name] = job_id + + return response def delete_rule( self, @@ -273,10 +158,10 @@ def delete_rule( force: Boolean = None, ) -> None: rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs - job_id = rule_scheduled_jobs.get(name) + job_id = rule_scheduled_jobs.pop(name, None) if job_id: - LOG.debug("Removing scheduled Events: {} | job_id: {}".format(name, job_id)) - JobScheduler.instance().cancel_job(job_id=job_id) + LOG.debug("Removing rule: %s (job_id: %s)", name, job_id) + self.job_scheduler.cancel_job(job_id=job_id) call_moto(context) def disable_rule( @@ -285,8 +170,18 @@ def disable_rule( rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs job_id = rule_scheduled_jobs.get(name) if job_id: - LOG.debug("Disabling Rule: {} | job_id: {}".format(name, job_id)) - JobScheduler.instance().disable_job(job_id=job_id) + LOG.debug("Disabling rule %s (job_id: %s)", name, job_id) + self.job_scheduler.disable_job(job_id=job_id) + call_moto(context) + + def enable_rule( + self, context: RequestContext, name: RuleName, event_bus_name: EventBusNameOrArn = None + ) -> None: + rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs + job_id = rule_scheduled_jobs.get(name) + if job_id: + LOG.debug("Enabling rule %s (job_id: %s)", name, job_id) + self.job_scheduler.enable_job(job_id=job_id) call_moto(context) def create_connection( @@ -349,6 +244,87 @@ def put_targets( return call_moto(context) + def _schedule_rule_job( + self, + region: str, + account_id: str, + event_bus_name_or_arn: EventBusNameOrArn, + rule_name: RuleName, + rule_state: RuleState, + schedule_expression: ScheduleExpression, + ) -> JobId | None: + """Used when PutRule is used with a ScheduleExpression. It creates a RuleEventDispatcher and + schedules it using the JobScheduler. Returns the JobId assigned by the JobScheduler.""" + try: + # guard against invalid expressions + parse_schedule_expression(schedule_expression) + except ValueError as e: + LOG.error("Error parsing schedule expression %s: %s", schedule_expression, e) + raise ValidationException("Parameter ScheduleExpression is not valid.") from e + + dispatcher = ScheduledRuleDispatcher( + region, account_id, get_event_bus_name(event_bus_name_or_arn), rule_name + ) + + enabled = rule_state != "DISABLED" + job_id = self.job_scheduler.add_job(dispatcher, schedule_expression, enabled) + return job_id + + +class ScheduledRuleDispatcher: + """ + Callable used as a Job function for a Rule that was scheduled using a schedule expression. + """ + + def __init__(self, region: str, account_id: str, event_bus_name: str, rule_name: str): + self.region = region + self.account_id = account_id + self.event_bus_name = event_bus_name + self.rule_name = rule_name + + def __call__(self, *args, **kwargs): + # look up rule on every call from moto to pick up any updated targets + from moto.events import events_backends + + moto_backend = events_backends[self.account_id][self.region] + event_bus = moto_backend.event_buses[self.event_bus_name] + rule = event_bus.rules.get(self.rule_name) + + if not rule: + # this should not happen, since the dispatcher is scheduled only after the rule has been + # created, and is removed once the rule is removed. but we guard against the case anyway. + LOG.warning( + "Event rule %s was not found in event bus %s", self.rule_name, self.event_bus_name + ) + return + + if not rule.targets: + LOG.debug("Event rule %s was triggered but has no targets", self.rule_name) + return + + # event id and timestamp should stay the same across targets + event = Event( + source="aws.events", + detail_type="Scheduled Event", + resources=rule.arn, + account=self.account_id, + region=self.region, + ) + + for target in rule.targets: + # TODO: RetryPolicy + dispatcher = EventDispatcher.dispatcher_for_target(target.get("Arn")) + try: + dispatcher.dispatch(event, target) + except Exception as e: + LOG.error( + "Failed to dispatch event notification rule %s to %s: %s", + rule.name, + target, + e, + exc_info=e if LOG.isEnabledFor(logging.DEBUG) else None, + ) + def _get_events_tmp_dir(): return os.path.join(config.dirs.tmp, EVENTS_TMP_DIR) @@ -622,6 +598,7 @@ def get_event_bus_name(event_bus_name_or_arn: Optional[EventBusNameOrArn] = None # specific logic for put_events which forwards matching events to target listeners def events_handler_put_events(self): + # TODO: replace with EventDispatcher entries = self._get_param("Entries") # keep track of events for local integration testing diff --git a/localstack/services/events/scheduler.py b/localstack/services/events/scheduler.py index d6dfa66c06ec9..573dbffcab1e0 100644 --- a/localstack/services/events/scheduler.py +++ b/localstack/services/events/scheduler.py @@ -1,101 +1,222 @@ import logging +import re import threading +import time +from concurrent.futures.thread import ThreadPoolExecutor +from datetime import timedelta +from typing import Callable from crontab import CronTab from localstack.utils.common import short_uid -from localstack.utils.run import FuncThread +from localstack.utils.scheduler import ScheduledTask, Scheduler LOG = logging.getLogger(__name__) -class Job: - def __init__(self, job_func, schedule, enabled): - self.job_func = job_func +class CronScheduledTask(ScheduledTask): + """ + Special implementation of a ScheduledTask that dynamically determines the next deadline based on a + CronTab expression. + """ + + schedule: CronTab + + def __init__( + self, task: Callable, schedule: CronTab, on_error: Callable[[Exception], None] = None + ) -> None: + super().__init__(task, on_error=on_error) self.schedule = schedule + self.set_next_deadline() + + def is_periodic(self) -> bool: + return True + + def set_next_deadline(self): + delay = self.schedule.next() + self.deadline = time.time() + delay + + +JobId = str + + +class Job: + """Glue between JobScheduler and Scheduler API""" + + func: Callable + schedule_expression: str + job_id: JobId + enabled: bool + task: ScheduledTask | None + schedule: CronTab | timedelta + + def __init__(self, func: Callable, scheduler_expression: str): + self.func = func + self.schedule_expression = scheduler_expression self.job_id = short_uid() - self.is_enabled = enabled + self.task = None + self.schedule = parse_schedule_expression(scheduler_expression) - def run(self): - try: - if self.should_run_now() and self.is_enabled: - self.do_run() - except Exception as e: - LOG.debug("Unable to run scheduled function %s: %s", self.job_func, e) + @property + def enabled(self) -> bool: + return self.task is not None - def should_run_now(self): - schedule = CronTab(self.schedule) - delay_secs = schedule.next() - return delay_secs is not None and delay_secs < 60 + def disable(self): + if self.task: + LOG.debug("Disabling job %s", self.job_id) + self.task.cancel() + self.task = None - def do_run(self): - FuncThread(self.job_func, name="events-job-run").start() + def enable(self, scheduler: Scheduler): + if self.task: + return + + schedule = parse_schedule_expression(self.schedule_expression) + + if isinstance(schedule, CronTab): + LOG.debug("Scheduling job %s with crontab %s", self.job_id, schedule) + self.task = CronScheduledTask( + self.func, + schedule=CronTab(self.schedule_expression), + on_error=self.on_execute_error, + ) + elif isinstance(schedule, timedelta): + LOG.debug("Scheduling job %s every %d seconds", self.job_id, schedule.seconds) + self.task = ScheduledTask( + self.func, + period=schedule.seconds, + on_error=self.on_execute_error, + ) + else: + raise ValueError(f"unexpected return type {type(schedule)}") + + scheduler.schedule_task(self.task) + + def on_execute_error(self, exception: Exception): + LOG.error("Error executing job %s", self.job_id, exc_info=exception) class JobScheduler: - _instance = None + """ + A singleton wrapper around a Scheduler that allows you to toggle scheduled tasks based on a unique job id. + """ def __init__(self): - # TODO: introduce RLock for mutating jobs list - self.jobs = [] - self.thread = None - self._stop_event = threading.Event() - - def add_job(self, job_func, schedule, enabled=True): - job = Job(job_func, schedule, enabled=enabled) - self.jobs.append(job) - return job.job_id - - def get_job(self, job_id) -> Job | None: - for job in self.jobs: - if job.job_id == job_id: - return job - - def disable_job(self, job_id): - for job in self.jobs: - if job.job_id == job_id: - job.is_enabled = False - break - - def cancel_job(self, job_id): - i = 0 - while i < len(self.jobs): - if self.jobs[i].job_id == job_id: - del self.jobs[i] - else: - i += 1 - - def loop(self, *args): - while not self._stop_event.is_set(): + self.jobs: dict[str, Job] = {} + self.mutex = threading.RLock() + self.executor = ThreadPoolExecutor(10, thread_name_prefix="events-jobscheduler-worker") + self.scheduler = Scheduler(executor=self.executor) + + def add_job(self, job_func: Callable, schedule_expression: str, enabled: bool = True) -> JobId: + with self.mutex: + job = Job(job_func, schedule_expression) + self.jobs[job.job_id] = job + if enabled: + job.enable(self.scheduler) + return job.job_id + + def get_job(self, job_id: JobId) -> Job | None: + return self.jobs.get(job_id) + + def enable_job(self, job_id: JobId): + with self.mutex: try: - for job in list(self.jobs): - job.run() - except Exception: - pass - # This is a simple heuristic to cause the loop to run apprx every minute - # TODO: we should keep track of jobs execution times, to avoid duplicate executions - self._stop_event.wait(timeout=59.9) - - def start_loop(self): - self.thread = FuncThread(self.loop, name="events-jobscheduler-loop") - self.thread.start() - - @classmethod - def instance(cls): - if not cls._instance: - cls._instance = JobScheduler() - return cls._instance - - @classmethod - def start(cls): - instance = cls.instance() - if not instance.thread: - instance.start_loop() - return instance - - @classmethod - def shutdown(cls): - instance = cls.instance() - if not instance.thread: - return - instance._stop_event.set() + self.jobs[job_id].enable(self.scheduler) + except KeyError: + raise ValueError(f"No job with id {job_id}") + + def disable_job(self, job_id: JobId): + with self.mutex: + try: + self.jobs[job_id].disable() + except KeyError: + raise ValueError(f"No job with id {job_id}") + + def cancel_job(self, job_id: JobId): + with self.mutex: + try: + self.jobs.pop(job_id).disable() + except KeyError: + raise ValueError(f"No job with id {job_id}") + + def shutdown(self): + self.scheduler.close() + self.executor.shutdown(cancel_futures=True) + + def start(self): + thread = threading.Thread( + target=self.scheduler.run, + name="events-jobscheduler-loop", + daemon=True, + ) + thread.start() + + +def parse_schedule_expression(expression: str) -> CronTab | timedelta: + """ + Parses a scheduling expression which can either be ``cron()`` or + ``rate( )``. In the first case, a ``CronTab`` object will be returned, and in the second case + a ``timedelta`` object will be returned. + + See https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule-schedule.html + + :param expression: the expression + :return: a CronTab or timedelta + """ + if expression.startswith("cron"): + return parse_cron_expression(expression) + if expression.startswith("rate"): + return parse_rate_expression(expression) + + raise ValueError("Syntax error in expression") + + +def parse_rate_expression(expression: str) -> timedelta: + """ + Parses a rate expression as defined in + https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rate-expressions.html. + + :param expression: a rate expression, e.g. rate(5 minutes) + :return: a timedelta describing the rate + """ + rate_pattern = r"rate\(([0-9]+) (minutes?|hours?|days?)\)" + + if matcher := re.match(rate_pattern, expression): + value = int(matcher.group(1)) + unit = matcher.group(2) + + if value < 1: + raise ValueError("Value needs to be larger than 0") + if value == 1 and unit.endswith("s"): + raise ValueError("If the value is equal to 1, then the unit must be singular") + if value > 1 and not unit.endswith("s"): + raise ValueError("If the value is greater than 1, the unit must be plural") + + if unit.startswith("minute"): + return timedelta(minutes=value) + elif unit.startswith("hour"): + return timedelta(hours=value) + elif unit.startswith("day"): + return timedelta(days=value) + else: + raise ValueError(f"Unknown rate unit {unit}") + + raise ValueError(f"Rate expression did not match pattern {rate_pattern}") + + +def parse_cron_expression(expression: str) -> CronTab: + """ + Parses a crontab expression as defined in + https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-cron-expressions.html. + + :param expression: a cron expression, e.g., cron(0 12 * * ? *) + :return: a CronTab instance + """ + if not expression.startswith("cron(") or not expression.endswith(")"): + raise ValueError("Cron expression did not match pattern cron()") + + expression = expression[5:-1] + if expression.startswith(" ") or expression.endswith(" "): + raise ValueError("Superfluous whitespaces in cron expression") + + return CronTab(expression) diff --git a/localstack/utils/json.py b/localstack/utils/json.py index 2f1d64303ec94..b17d79da31cb7 100644 --- a/localstack/utils/json.py +++ b/localstack/utils/json.py @@ -159,7 +159,7 @@ def canonical_json(obj): return json.dumps(obj, sort_keys=True) -def extract_jsonpath(value, path): +def extract_jsonpath(value: Any, path: str) -> Any | list[Any]: from jsonpath_rw import parse jsonpath_expr = parse(path) diff --git a/localstack/utils/scheduler.py b/localstack/utils/scheduler.py index c295ef70ced7a..75091176df6cf 100644 --- a/localstack/utils/scheduler.py +++ b/localstack/utils/scheduler.py @@ -126,11 +126,13 @@ def schedule( def schedule_task(self, task: ScheduledTask) -> None: """ - Schedules the given task and sets the deadline of the task to either ``task.start`` or the current time. + Schedules the given task and, unless the task already has one, sets the deadline of the task to + either ``task.start`` or the current time. :param task: the task to schedule """ - task.deadline = max(task.start or 0, time.time()) + if task.deadline is None: + task.deadline = max(task.start or 0, time.time()) self.add(task) def add(self, task: ScheduledTask) -> None: @@ -140,7 +142,7 @@ def add(self, task: ScheduledTask) -> None: :param task: the task to schedule. """ if task.deadline is None: - raise ValueError + raise ValueError("Task has no deadline. Run schedule_task instead or set a deadline.") task._cancelled = False @@ -148,6 +150,16 @@ def add(self, task: ScheduledTask) -> None: self._queue.put((task.deadline, task)) self._condition.notify() + def cancel(self, task: ScheduledTask) -> None: + task._cancelled = True + + def notify(self): + """ + Notify the run loop that something in the schedule has happened. + """ + with self._condition: + self._condition.notify() + def close(self) -> None: """ Terminates the run loop.