diff --git a/src/cronpal/scheduler.py b/src/cronpal/scheduler.py new file mode 100644 index 0000000..2de205e --- /dev/null +++ b/src/cronpal/scheduler.py @@ -0,0 +1,281 @@ +"""Scheduler for calculating cron expression run times.""" + +from datetime import datetime, timedelta +from typing import Optional + +from cronpal.exceptions import CronPalError +from cronpal.models import CronExpression +from cronpal.time_utils import ( + get_next_day, + get_next_hour, + get_next_minute, + get_next_month, + get_weekday, + is_valid_day_in_month, + round_to_next_minute, +) + + +class CronScheduler: + """Calculator for cron expression run times.""" + + def __init__(self, cron_expr: CronExpression): + """ + Initialize the scheduler with a cron expression. + + Args: + cron_expr: The CronExpression to calculate times for. + """ + self.cron_expr = cron_expr + self._validate_expression() + + def _validate_expression(self): + """Validate that the expression has all required fields.""" + if not self.cron_expr.is_valid(): + raise CronPalError("Invalid or incomplete cron expression") + + def get_next_run(self, after: Optional[datetime] = None) -> datetime: + """ + Calculate the next run time for the cron expression. + + Args: + after: The datetime to start searching from. + Defaults to current time if not provided. + + Returns: + The next datetime when the cron expression will run. + """ + if after is None: + after = datetime.now() + + # Round up to next minute if needed + current = round_to_next_minute(after) + + # Maximum iterations to prevent infinite loops + max_iterations = 10000 + iterations = 0 + + while iterations < max_iterations: + iterations += 1 + + # Check if current time matches the cron expression + if self._matches_time(current): + return current + + # Move to next possible time + current = self._advance_to_next_possible(current) + + raise CronPalError("Could not find next run time within reasonable limits") + + def _matches_time(self, dt: datetime) -> bool: + """ + Check if a datetime matches the cron expression. + + Args: + dt: The datetime to check. + + Returns: + True if the datetime matches all cron fields. + """ + # Check minute + if dt.minute not in self.cron_expr.minute.parsed_values: + return False + + # Check hour + if dt.hour not in self.cron_expr.hour.parsed_values: + return False + + # Check month + if dt.month not in self.cron_expr.month.parsed_values: + return False + + # Check day of month - but only if it's valid for this month + if not is_valid_day_in_month(dt.year, dt.month, dt.day): + return False + + # For day fields, we need to check if EITHER day of month OR day of week matches + # This is the standard cron behavior + day_of_month_match = dt.day in self.cron_expr.day_of_month.parsed_values + day_of_week_match = get_weekday(dt) in self.cron_expr.day_of_week.parsed_values + + # If both day of month and day of week are restricted (not wildcards), + # then we match if EITHER matches (OR logic) + if not self.cron_expr.day_of_month.is_wildcard() and not self.cron_expr.day_of_week.is_wildcard(): + return day_of_month_match or day_of_week_match + + # Otherwise both must match + return day_of_month_match and day_of_week_match + + def _advance_to_next_possible(self, dt: datetime) -> datetime: + """ + Advance datetime to the next possible matching time. + + Args: + dt: The current datetime. + + Returns: + The next datetime that could potentially match. + """ + # Try to advance minute first + next_minute = self._get_next_minute(dt) + if next_minute is not None: + return next_minute + + # If no valid minute in this hour, try next hour + next_hour = self._get_next_hour(dt) + if next_hour is not None: + return next_hour + + # If no valid hour today, try next day + next_day = self._get_next_day(dt) + if next_day is not None: + return next_day + + # If no valid day this month, try next month + return self._get_next_month(dt) + + def _get_next_minute(self, dt: datetime) -> Optional[datetime]: + """ + Get the next valid minute after the current time. + + Args: + dt: The current datetime. + + Returns: + Next valid minute in the same hour, or None if no valid minute. + """ + current_minute = dt.minute + valid_minutes = sorted(self.cron_expr.minute.parsed_values) + + for minute in valid_minutes: + if minute > current_minute: + return dt.replace(minute=minute, second=0, microsecond=0) + + return None + + def _get_next_hour(self, dt: datetime) -> Optional[datetime]: + """ + Get the next valid hour after the current time. + + Args: + dt: The current datetime. + + Returns: + Next valid hour in the same day, or None if no valid hour. + """ + current_hour = dt.hour + valid_hours = sorted(self.cron_expr.hour.parsed_values) + valid_minutes = sorted(self.cron_expr.minute.parsed_values) + + # First minute of the next valid hour + first_minute = valid_minutes[0] if valid_minutes else 0 + + for hour in valid_hours: + if hour > current_hour: + return dt.replace(hour=hour, minute=first_minute, second=0, microsecond=0) + + return None + + def _get_next_day(self, dt: datetime) -> Optional[datetime]: + """ + Get the next valid day after the current time. + + Args: + dt: The current datetime. + + Returns: + Next valid day in the same month, or None if no valid day. + """ + current_day = dt.day + valid_hours = sorted(self.cron_expr.hour.parsed_values) + valid_minutes = sorted(self.cron_expr.minute.parsed_values) + + # First time of the day + first_hour = valid_hours[0] if valid_hours else 0 + first_minute = valid_minutes[0] if valid_minutes else 0 + + # Check remaining days in the month + for day in range(current_day + 1, 32): + if not is_valid_day_in_month(dt.year, dt.month, day): + break + + test_dt = dt.replace(day=day, hour=first_hour, minute=first_minute, + second=0, microsecond=0) + + # Check if this day matches day constraints + day_of_month_match = day in self.cron_expr.day_of_month.parsed_values + day_of_week_match = get_weekday(test_dt) in self.cron_expr.day_of_week.parsed_values + + # Apply OR logic for day fields if both are restricted + if not self.cron_expr.day_of_month.is_wildcard() and not self.cron_expr.day_of_week.is_wildcard(): + if day_of_month_match or day_of_week_match: + return test_dt + else: + if day_of_month_match and day_of_week_match: + return test_dt + + return None + + def _get_next_month(self, dt: datetime) -> datetime: + """ + Get the first valid time in the next valid month. + + Args: + dt: The current datetime. + + Returns: + First valid time in the next valid month. + """ + valid_months = sorted(self.cron_expr.month.parsed_values) + valid_hours = sorted(self.cron_expr.hour.parsed_values) + valid_minutes = sorted(self.cron_expr.minute.parsed_values) + + # First time of any day + first_hour = valid_hours[0] if valid_hours else 0 + first_minute = valid_minutes[0] if valid_minutes else 0 + + # Try remaining months this year + for month in valid_months: + if month > dt.month: + # Find first valid day in this month + for day in range(1, 32): + if not is_valid_day_in_month(dt.year, month, day): + break + + test_dt = datetime(dt.year, month, day, first_hour, first_minute, 0, 0) + + # Check day constraints + day_of_month_match = day in self.cron_expr.day_of_month.parsed_values + day_of_week_match = get_weekday(test_dt) in self.cron_expr.day_of_week.parsed_values + + if not self.cron_expr.day_of_month.is_wildcard() and not self.cron_expr.day_of_week.is_wildcard(): + if day_of_month_match or day_of_week_match: + return test_dt + else: + if day_of_month_match and day_of_week_match: + return test_dt + + # No valid month found this year, try next year + next_year = dt.year + 1 + first_month = valid_months[0] + + # Find first valid day in the first month of next year + for day in range(1, 32): + if not is_valid_day_in_month(next_year, first_month, day): + break + + test_dt = datetime(next_year, first_month, day, first_hour, first_minute, 0, 0) + + # Check day constraints + day_of_month_match = day in self.cron_expr.day_of_month.parsed_values + day_of_week_match = get_weekday(test_dt) in self.cron_expr.day_of_week.parsed_values + + if not self.cron_expr.day_of_month.is_wildcard() and not self.cron_expr.day_of_week.is_wildcard(): + if day_of_month_match or day_of_week_match: + return test_dt + else: + if day_of_month_match and day_of_week_match: + return test_dt + + # This should rarely happen unless the cron expression is very restrictive + raise CronPalError("Could not find valid next month") \ No newline at end of file diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..22da88c --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,313 @@ +"""Tests for the cron scheduler.""" + +import sys +from datetime import datetime +from pathlib import Path + +import pytest + +# Add src to path for testing +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from cronpal.exceptions import CronPalError +from cronpal.field_parser import FieldParser +from cronpal.models import CronExpression +from cronpal.scheduler import CronScheduler + + +def create_cron_expression(expr_str: str) -> CronExpression: + """Helper to create a parsed CronExpression.""" + fields = expr_str.split() + expr = CronExpression(expr_str) + parser = FieldParser() + + expr.minute = parser.parse_minute(fields[0]) + expr.hour = parser.parse_hour(fields[1]) + expr.day_of_month = parser.parse_day_of_month(fields[2]) + expr.month = parser.parse_month(fields[3]) + expr.day_of_week = parser.parse_day_of_week(fields[4]) + + return expr + + +class TestCronScheduler: + """Tests for CronScheduler class.""" + + def test_initialization(self): + """Test scheduler initialization.""" + expr = create_cron_expression("0 0 * * *") + scheduler = CronScheduler(expr) + assert scheduler.cron_expr == expr + + def test_initialization_invalid_expression(self): + """Test scheduler with invalid expression.""" + expr = CronExpression("0 0 * * *") + # Expression without parsed fields + with pytest.raises(CronPalError, match="Invalid or incomplete"): + CronScheduler(expr) + + def test_every_minute(self): + """Test expression that runs every minute.""" + expr = create_cron_expression("* * * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 30, 0) + next_run = scheduler.get_next_run(start) + + # Should be the same time (already on a minute boundary) + assert next_run == start + + def test_every_minute_with_seconds(self): + """Test every minute with seconds in start time.""" + expr = create_cron_expression("* * * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 30, 45) + next_run = scheduler.get_next_run(start) + + # Should round up to next minute + assert next_run == datetime(2024, 1, 15, 10, 31, 0) + + def test_specific_minute(self): + """Test specific minute of every hour.""" + expr = create_cron_expression("15 * * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 10, 15, 0) + + def test_specific_minute_next_hour(self): + """Test specific minute when current minute is past.""" + expr = create_cron_expression("15 * * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 30, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 11, 15, 0) + + def test_specific_hour(self): + """Test specific hour of every day.""" + expr = create_cron_expression("0 14 * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 14, 0, 0) + + def test_specific_hour_next_day(self): + """Test specific hour when current hour is past.""" + expr = create_cron_expression("0 14 * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 16, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 16, 14, 0, 0) + + def test_specific_day_of_month(self): + """Test specific day of month.""" + expr = create_cron_expression("0 0 15 * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 0, 0, 0) + + def test_specific_day_of_month_next_month(self): + """Test specific day when current day is past.""" + expr = create_cron_expression("0 0 15 * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 20, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 2, 15, 0, 0, 0) + + def test_specific_month(self): + """Test specific month.""" + expr = create_cron_expression("0 0 1 6 *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 6, 1, 0, 0, 0) + + def test_specific_month_next_year(self): + """Test specific month when current month is past.""" + expr = create_cron_expression("0 0 1 6 *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 7, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2025, 6, 1, 0, 0, 0) + + def test_specific_weekday(self): + """Test specific day of week (Monday).""" + expr = create_cron_expression("0 0 * * 1") + scheduler = CronScheduler(expr) + + # Start on Sunday, January 14, 2024 + start = datetime(2024, 1, 14, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # Next Monday is January 15 + assert next_run == datetime(2024, 1, 15, 0, 0, 0) + assert next_run.weekday() == 0 # Monday in Python + + def test_weekday_and_monthday(self): + """Test expression with both day of month and day of week.""" + # Run on 15th OR Mondays (OR logic) + expr = create_cron_expression("0 0 15 * 1") + scheduler = CronScheduler(expr) + + # Start on January 13 (Saturday) + start = datetime(2024, 1, 13, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # Next should be January 15 (Monday) - matches both + assert next_run == datetime(2024, 1, 15, 0, 0, 0) + + def test_every_15_minutes(self): + """Test every 15 minutes.""" + expr = create_cron_expression("*/15 * * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 5, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 10, 15, 0) + + def test_business_hours(self): + """Test business hours (9-17 on weekdays).""" + expr = create_cron_expression("0 9-17 * * 1-5") + scheduler = CronScheduler(expr) + + # Start Friday afternoon + start = datetime(2024, 1, 12, 16, 30, 0) # Friday + next_run = scheduler.get_next_run(start) + + # Next is 5 PM same day + assert next_run == datetime(2024, 1, 12, 17, 0, 0) + + def test_business_hours_weekend_skip(self): + """Test business hours skipping weekend.""" + expr = create_cron_expression("0 9 * * 1-5") + scheduler = CronScheduler(expr) + + # Start Friday after business hours + start = datetime(2024, 1, 12, 18, 0, 0) # Friday evening + next_run = scheduler.get_next_run(start) + + # Next is Monday morning + assert next_run == datetime(2024, 1, 15, 9, 0, 0) + assert next_run.weekday() == 0 # Monday + + def test_last_day_of_month(self): + """Test handling of last day of month.""" + expr = create_cron_expression("0 0 31 * *") + scheduler = CronScheduler(expr) + + # Start in February (no 31st) + start = datetime(2024, 2, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # Should skip to March 31 + assert next_run == datetime(2024, 3, 31, 0, 0, 0) + + def test_february_29(self): + """Test February 29 in leap year.""" + expr = create_cron_expression("0 0 29 2 *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # 2024 is a leap year + assert next_run == datetime(2024, 2, 29, 0, 0, 0) + + def test_february_29_non_leap_year(self): + """Test February 29 in non-leap year.""" + expr = create_cron_expression("0 0 29 2 *") + scheduler = CronScheduler(expr) + + start = datetime(2023, 1, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # 2023 is not a leap year, skip to 2024 + assert next_run == datetime(2024, 2, 29, 0, 0, 0) + + def test_complex_expression(self): + """Test complex expression with multiple constraints.""" + expr = create_cron_expression("30 2 1-15 * MON-FRI") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 1, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # First occurrence is January 1 (Monday) at 2:30 AM + assert next_run == datetime(2024, 1, 1, 2, 30, 0) + + def test_range_with_step(self): + """Test range with step values.""" + expr = create_cron_expression("0 */4 * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 30, 0) + next_run = scheduler.get_next_run(start) + + # Next 4-hour interval is 12:00 + assert next_run == datetime(2024, 1, 15, 12, 0, 0) + + def test_list_of_values(self): + """Test list of specific values.""" + expr = create_cron_expression("0 9,12,15 * * *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 0, 0) + next_run = scheduler.get_next_run(start) + + assert next_run == datetime(2024, 1, 15, 12, 0, 0) + + def test_current_time_matches(self): + """Test when current time already matches.""" + expr = create_cron_expression("30 10 15 1 *") + scheduler = CronScheduler(expr) + + start = datetime(2024, 1, 15, 10, 30, 0) + next_run = scheduler.get_next_run(start) + + # Current time matches, so it should return the same time + assert next_run == start + + def test_sunday_as_zero(self): + """Test Sunday as day 0.""" + expr = create_cron_expression("0 0 * * 0") + scheduler = CronScheduler(expr) + + # Start on Saturday + start = datetime(2024, 1, 13, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # Next Sunday is January 14 + assert next_run == datetime(2024, 1, 14, 0, 0, 0) + assert next_run.weekday() == 6 # Sunday in Python + + def test_sunday_as_seven(self): + """Test Sunday as day 7 (should be same as 0).""" + expr = create_cron_expression("0 0 * * 7") + scheduler = CronScheduler(expr) + + # Start on Saturday + start = datetime(2024, 1, 13, 0, 0, 0) + next_run = scheduler.get_next_run(start) + + # Next Sunday is January 14 + assert next_run == datetime(2024, 1, 14, 0, 0, 0) + assert next_run.weekday() == 6 # Sunday in Python \ No newline at end of file