diff --git a/src/aap_eda/analytics/analytics_collectors.py b/src/aap_eda/analytics/analytics_collectors.py index d22ca3175..ee3f293c3 100644 --- a/src/aap_eda/analytics/analytics_collectors.py +++ b/src/aap_eda/analytics/analytics_collectors.py @@ -516,7 +516,9 @@ def projects_table( def rulebooks_table( since: datetime, full_path: str, until: datetime, **kwargs ) -> list[str]: - queryset = _get_query(models.Rulebook.objects, since, until) + queryset = _get_query(models.Rulebook.objects, since, until).defer( + "rulesets_sha256" + ) return _copy_table("rulebooks", queryset, full_path) diff --git a/src/aap_eda/api/serializers/activation.py b/src/aap_eda/api/serializers/activation.py index 065e2249d..86db94482 100644 --- a/src/aap_eda/api/serializers/activation.py +++ b/src/aap_eda/api/serializers/activation.py @@ -279,6 +279,7 @@ class Meta: "is_enabled", "status", "git_hash", + "restart_on_project_update", "extra_var", "decision_environment_id", "project_id", @@ -341,6 +342,7 @@ class Meta: "description", "is_enabled", "status", + "restart_on_project_update", "extra_var", "decision_environment_id", "project_id", @@ -406,6 +408,9 @@ def to_representation(self, activation): "rulebook_id": activation.rulebook_id, "extra_var": extra_var, "organization_id": activation.organization_id, + "restart_on_project_update": ( + activation.restart_on_project_update + ), "restart_policy": activation.restart_policy, "restart_count": activation.restart_count, "rulebook_name": activation.rulebook_name, @@ -441,6 +446,7 @@ class Meta: "name", "description", "is_enabled", + "restart_on_project_update", "decision_environment_id", "rulebook_id", "extra_var", @@ -509,6 +515,9 @@ def create(self, validated_data): validated_data["user_id"] = validated_data["user"].id validated_data["rulebook_name"] = rulebook.name validated_data["rulebook_rulesets"] = rulebook.rulesets + validated_data["rulebook_rulesets_sha256"] = get_rulebook_hash( + rulebook.rulesets + ) validated_data["git_hash"] = rulebook.project.git_hash validated_data["project_id"] = rulebook.project.id validated_data["log_tracking_id"] = str(uuid.uuid4()) @@ -572,6 +581,7 @@ def copy(self) -> dict: "skip_audit_events": activation.skip_audit_events, "rulebook_name": activation.rulebook.name, "rulebook_rulesets": activation.rulebook_rulesets, + "rulebook_rulesets_sha256": activation.rulebook_rulesets_sha256, "git_hash": activation.rulebook.project.git_hash, "project": activation.rulebook.project, "log_tracking_id": str(uuid.uuid4()), @@ -602,6 +612,7 @@ class Meta: "name", "description", "is_enabled", + "restart_on_project_update", "decision_environment_id", "rulebook_id", "extra_var", @@ -687,6 +698,9 @@ def prepare_update(self, activation: models.Activation): rulebook = models.Rulebook.objects.get(id=rulebook_id) self.validated_data["rulebook_name"] = rulebook.name self.validated_data["rulebook_rulesets"] = rulebook.rulesets + self.validated_data[ + "rulebook_rulesets_sha256" + ] = get_rulebook_hash(rulebook.rulesets) self.validated_data["git_hash"] = rulebook.project.git_hash self.validated_data["project_id"] = rulebook.project.id @@ -701,9 +715,11 @@ def prepare_update(self, activation: models.Activation): if yaml.safe_load(self.validated_data.get("source_mappings", "")): if not rulebook_id: # load the original ruleset + rulesets = activation.rulebook.rulesets + self.validated_data["rulebook_rulesets"] = rulesets self.validated_data[ - "rulebook_rulesets" - ] = activation.rulebook.rulesets + "rulebook_rulesets_sha256" + ] = get_rulebook_hash(rulesets) 
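The serializer changes above record a SHA-256 fingerprint of the rulebook content (rulebook_rulesets_sha256) whenever an activation is created, copied, or updated. A minimal sketch, not part of the PR, of the drift check this fingerprint enables later in this diff (the detail serializer compares the stored value against the rulebook's current rulesets_sha256); the helper name is illustrative only:

def rulebook_drifted(activation) -> bool:
    # Constant-size string compare instead of diffing the full rulesets text.
    rulebook = activation.rulebook
    return bool(
        rulebook
        and activation.rulebook_rulesets_sha256
        and rulebook.rulesets_sha256
        and activation.rulebook_rulesets_sha256 != rulebook.rulesets_sha256
    )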
_update_event_streams_and_credential(self.validated_data) else: @@ -875,6 +891,7 @@ class Meta: "decision_environment", "status", "git_hash", + "restart_on_project_update", "project", "rulebook", "extra_var", @@ -973,6 +990,28 @@ def to_representation(self, activation): else "" ) + warnings = [] + if activation.source_mappings: + if not activation.rulebook: + warnings.append( + "The rulebook associated with this activation " + "no longer exists. Source mappings may be " + "invalid." + ) + elif ( + activation.rulebook_rulesets_sha256 + and activation.rulebook.rulesets_sha256 + and ( + activation.rulebook_rulesets_sha256 + != activation.rulebook.rulesets_sha256 + ) + ): + warnings.append( + "Rulebook content has changed since event " + "stream sources were mapped. Please update " + "source mappings." + ) + return { "id": activation.id, "name": activation.name, @@ -988,6 +1027,9 @@ def to_representation(self, activation): "instances": ActivationInstanceSerializer( activation_instances, many=True ).data, + "restart_on_project_update": ( + activation.restart_on_project_update + ), "restart_policy": activation.restart_policy, "restart_count": activation.restart_count, "rulebook_name": activation.rulebook_name, @@ -1006,6 +1048,7 @@ def to_representation(self, activation): "k8s_service_name": activation.k8s_service_name, "event_streams": event_streams, "source_mappings": activation.source_mappings, + "warnings": warnings, "skip_audit_events": activation.skip_audit_events, "log_tracking_id": activation.log_tracking_id, "created_by": BasicUserSerializer(activation.created_by).data, diff --git a/src/aap_eda/api/serializers/project.py b/src/aap_eda/api/serializers/project.py index 0f5a013c0..a96a9c12c 100644 --- a/src/aap_eda/api/serializers/project.py +++ b/src/aap_eda/api/serializers/project.py @@ -70,6 +70,9 @@ class Meta: "scm_refspec", "verify_ssl", "proxy", + "update_revision_on_launch", + "scm_update_cache_timeout", + "last_synced_at", "created_by", "modified_by", *read_only_fields, @@ -119,6 +122,22 @@ class ProjectCreateRequestSerializer( validators=[validators.check_if_refspec_valid], ) + scm_update_cache_timeout = serializers.IntegerField( + required=False, + default=0, + min_value=0, + max_value=86400, # Max 1 day + help_text=( + "Cache timeout in seconds for project updates " + "(0 = no cache, max 86400)" + ), + ) + update_revision_on_launch = serializers.BooleanField( + required=False, + default=False, + help_text="Enable automatic project sync on activation launch", + ) + class Meta: model = models.Project fields = [ @@ -133,6 +152,8 @@ class Meta: "scm_type", "scm_branch", "scm_refspec", + "update_revision_on_launch", + "scm_update_cache_timeout", ] @@ -213,6 +234,19 @@ class ProjectUpdateRequestSerializer( validators.check_if_scm_url_valid, ], ) + scm_update_cache_timeout = serializers.IntegerField( + required=False, + min_value=0, + max_value=86400, # Max 1 day + help_text=( + "Cache timeout in seconds for project updates " + "(0 = no cache, max 86400)" + ), + ) + update_revision_on_launch = serializers.BooleanField( + required=False, + help_text="Enable automatic project sync on activation launch", + ) class Meta: model = models.Project @@ -227,6 +261,8 @@ class Meta: "verify_ssl", "proxy", "url", + "update_revision_on_launch", + "scm_update_cache_timeout", ] def validate(self, data): @@ -288,6 +324,9 @@ class Meta: "scm_branch", "scm_refspec", "proxy", + "update_revision_on_launch", + "scm_update_cache_timeout", + "last_synced_at", "created_by", "modified_by", *read_only_fields, 
@@ -326,6 +365,9 @@ def to_representation(self, project): "organization": organization, "eda_credential": eda_credential, "signature_validation_credential": signature_validation_credential, + "update_revision_on_launch": project.update_revision_on_launch, + "scm_update_cache_timeout": project.scm_update_cache_timeout, + "last_synced_at": project.last_synced_at, "import_state": project.import_state, "import_error": project.import_error, "created_at": project.created_at, diff --git a/src/aap_eda/api/views/activation.py b/src/aap_eda/api/views/activation.py index 3aca65666..cb2ade7f5 100644 --- a/src/aap_eda/api/views/activation.py +++ b/src/aap_eda/api/views/activation.py @@ -41,10 +41,9 @@ start_rulebook_process, stop_rulebook_process, ) +from aap_eda.tasks.project import sync_project from aap_eda.utils import str_to_bool -# RedisDependencyMixin import removed - no longer required with dispatcherd - logger = logging.getLogger(__name__) resource_name = "RulebookActivation" @@ -254,7 +253,8 @@ def destroy(self, request, *args, **kwargs): with transaction.atomic(): activation.status = ActivationStatus.DELETING - activation.save(update_fields=["status"]) + activation.awaiting_project_sync = False + activation.save(update_fields=["status", "awaiting_project_sync"]) name = activation.name delete_rulebook_process( @@ -424,6 +424,10 @@ def _start(self, request, activation: models.Activation) -> Response: # Check if activation workers are available before enabling check_dispatcherd_workers_health(raise_exceptions=True) + sync_response = self._sync_project_if_needed(activation) + if sync_response: + return sync_response + logger.info(f"Now enabling {activation.name} ...") activation.is_enabled = True @@ -497,12 +501,15 @@ def disable(self, request, pk): activation, force_disable, "Disabled" ) + if activation.awaiting_project_sync: + activation.awaiting_project_sync = False + activation.save(update_fields=["awaiting_project_sync"]) + if activation.is_enabled: # Check if activation workers are available before disabling # (unless force=true to allow operation with unhealthy workers) if not force_disable: check_dispatcherd_workers_health(raise_exceptions=True) - if activation.status in [ ActivationStatus.STARTING, ActivationStatus.RUNNING, @@ -544,7 +551,7 @@ def disable(self, request, pk): ), status.HTTP_409_CONFLICT: OpenApiResponse( None, - description="Activation blocked while Workers offline.", + description=("Activation blocked while Workers offline."), ), }, parameters=[ @@ -593,6 +600,10 @@ def restart(self, request, pk): {"errors": error}, status=status.HTTP_400_BAD_REQUEST ) + sync_response = self._sync_project_if_needed(activation) + if sync_response: + return sync_response + restart_rulebook_process( process_parent_type=ProcessParentType.ACTIVATION, process_parent_id=activation.id, @@ -657,6 +668,67 @@ def copy(self, request, pk): status=status.HTTP_201_CREATED, ) + def _sync_project_if_needed( + self, activation: models.Activation + ) -> Response | None: + """Check if project sync is needed before launch. + + Sets awaiting_project_sync flag, updates status to PENDING, + and triggers sync if project is not already syncing. + + Returns a 204 Response if sync was initiated, or None if + no sync is needed and the caller should proceed normally.
+ """ + if not activation.needs_project_update_on_launch: + return None + + activation.awaiting_project_sync = True + activation.status = ActivationStatus.PENDING + activation.status_message = "Waiting for project sync to complete" + activation.save( + update_fields=[ + "awaiting_project_sync", + "status", + "status_message", + ] + ) + + # Only submit a new sync if one isn't already running. + # If a sync IS in progress, the flag is already set and + # the running sync will resume this activation on completion. + if activation.project.import_state not in [ + models.Project.ImportState.PENDING, + models.Project.ImportState.RUNNING, + ]: + try: + sync_project(activation.project.id) + except Exception as e: + logger.error( + f"Failed to start project sync for " + f"'{activation.name}': {e}", + exc_info=True, + ) + activation.awaiting_project_sync = False + activation.status = ActivationStatus.ERROR + activation.status_message = ( + f"Failed to start project sync: {e}" + ) + activation.save( + update_fields=[ + "awaiting_project_sync", + "status", + "status_message", + ] + ) + return Response( + {"errors": str(e)}, + status=status.HTTP_400_BAD_REQUEST, + ) + + return Response( + status=status.HTTP_204_NO_CONTENT, + ) + def _check_deleting(self, activation): if activation.status == ActivationStatus.DELETING: raise exceptions.APIException( diff --git a/src/aap_eda/core/migrations/0068_add_project_sync_fields.py b/src/aap_eda/core/migrations/0068_add_project_sync_fields.py new file mode 100644 index 000000000..2aa84b389 --- /dev/null +++ b/src/aap_eda/core/migrations/0068_add_project_sync_fields.py @@ -0,0 +1,47 @@ +# Generated by Django 5.2.9 on 2026-01-27 22:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0067_decisionenvironment_pull_policy"), + ] + + operations = [ + migrations.AddField( + model_name="project", + name="last_synced_at", + field=models.DateTimeField( + blank=True, + default=None, + help_text="Timestamp of the last successful project sync", + null=True, + ), + ), + migrations.AddField( + model_name="project", + name="scm_update_cache_timeout", + field=models.IntegerField( + default=0, + help_text="Cache timeout in seconds for project updates (0 = always update, max 86400)", + ), + ), + migrations.AddField( + model_name="project", + name="update_revision_on_launch", + field=models.BooleanField( + default=False, + help_text="Enable automatic project sync on activation launch", + ), + ), + migrations.AddConstraint( + model_name="project", + constraint=models.CheckConstraint( + condition=models.Q( + ("scm_update_cache_timeout__range", (0, 86400)) + ), + name="ck_cache_timeout_range", + ), + ), + ] diff --git a/src/aap_eda/core/migrations/0069_activation_restart_and_sync_fields.py b/src/aap_eda/core/migrations/0069_activation_restart_and_sync_fields.py new file mode 100644 index 000000000..450c18084 --- /dev/null +++ b/src/aap_eda/core/migrations/0069_activation_restart_and_sync_fields.py @@ -0,0 +1,74 @@ +import hashlib + +from django.db import migrations, models + + +def _compute_sha256(content): + return hashlib.sha256((content or "").encode("utf-8")).hexdigest() + + +def populate_rulesets_sha256(apps, schema_editor): + """Compute SHA256 for existing records that lack it.""" + Rulebook = apps.get_model("core", "Rulebook") # noqa: N806 + for rb in Rulebook.objects.filter(rulesets_sha256="").iterator( + chunk_size=500 + ): + rb.rulesets_sha256 = _compute_sha256(rb.rulesets) + 
rb.save(update_fields=["rulesets_sha256"]) + + Activation = apps.get_model("core", "Activation") # noqa: N806 + for act in Activation.objects.filter(rulebook_rulesets_sha256="").iterator( + chunk_size=500 + ): + act.rulebook_rulesets_sha256 = _compute_sha256(act.rulebook_rulesets) + act.save(update_fields=["rulebook_rulesets_sha256"]) + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0068_add_project_sync_fields"), + ] + + operations = [ + migrations.AddField( + model_name="activation", + name="restart_on_project_update", + field=models.BooleanField( + default=False, + help_text="Auto-restart when rulebook changes after project sync", + ), + ), + migrations.AddField( + model_name="activation", + name="awaiting_project_sync", + field=models.BooleanField( + default=False, + help_text=( + "Activation is waiting for project " + "sync to complete before launch" + ), + ), + ), + migrations.AddField( + model_name="rulebook", + name="rulesets_sha256", + field=models.CharField( + default="", + help_text="SHA256 hash of rulesets content", + max_length=64, + ), + ), + migrations.AddField( + model_name="activation", + name="rulebook_rulesets_sha256", + field=models.CharField( + default="", + help_text="SHA256 hash of original rulebook content", + max_length=64, + ), + ), + migrations.RunPython( + populate_rulesets_sha256, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/src/aap_eda/core/models/activation.py b/src/aap_eda/core/models/activation.py index b7c069db5..2fd2b8dd7 100644 --- a/src/aap_eda/core/models/activation.py +++ b/src/aap_eda/core/models/activation.py @@ -58,6 +58,17 @@ class Meta: ) is_enabled = models.BooleanField(default=DEFAULT_ENABLED) git_hash = models.TextField(null=False, default="") + restart_on_project_update = models.BooleanField( + default=False, + help_text="Auto-restart when rulebook changes after project sync", + ) + awaiting_project_sync = models.BooleanField( + default=False, + help_text=( + "Activation is waiting for project " + "sync to complete before launch" + ), + ) # TODO(alex) Since local activations are no longer supported # this field should be mandatory. decision_environment = models.ForeignKey( @@ -92,6 +103,14 @@ class Meta: null=False, help_text="Content of the last referenced rulebook", ) + # Hash of the original rulebook rulesets content (before + # event stream source swapping). Used for change detection + # against the upstream rulebook, not the stored content. + rulebook_rulesets_sha256 = models.CharField( + max_length=64, + default="", + help_text="SHA256 hash of original rulebook content", + ) ruleset_stats = models.JSONField(default=dict) user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True) created_at = models.DateTimeField(auto_now_add=True, null=False) @@ -161,3 +180,10 @@ def get_parent_type(self) -> str: def _get_skip_audit_events(self) -> bool: """Activation can optionally skip audit events.""" return self.skip_audit_events + + @property + def needs_project_update_on_launch(self) -> bool: + """Check if activation requires project update on launch.""" + if not self.project: + return False + return self.project.needs_update_on_launch diff --git a/src/aap_eda/core/models/project.py b/src/aap_eda/core/models/project.py index df8b026ce..c706e96c1 100644 --- a/src/aap_eda/core/models/project.py +++ b/src/aap_eda/core/models/project.py @@ -24,23 +24,36 @@ # See the License for the specific language governing permissions and # limitations under the License. 
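The 0069 data migration above backfills rulesets_sha256 with a local _compute_sha256 helper rather than importing get_rulebook_hash, the usual Django practice so migrations don't depend on current application code. A small sketch, assuming both helpers stay as written in this PR, of the property the rest of the change relies on: backfilled digests must be byte-for-byte equal to what get_rulebook_hash produces at runtime, otherwise every pre-existing activation would appear drifted after the upgrade.

import hashlib

def _compute_sha256(content):  # as defined in migration 0069
    return hashlib.sha256((content or "").encode("utf-8")).hexdigest()

def get_rulebook_hash(rulebook):  # as rewritten in aap_eda.core.utils.rulebook
    return hashlib.sha256((rulebook or "").encode("utf-8")).hexdigest()

for sample in (None, "", "rules:\n  - name: r1\n"):
    assert _compute_sha256(sample) == get_rulebook_hash(sample)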
+import datetime +import logging + from django.db import models +from django.utils import timezone from aap_eda.core.utils.crypto.fields import EncryptedTextField from .base import BaseOrgModel, PrimordialModel, UniqueNamedModel +logger = logging.getLogger(__name__) + PROJECT_ARCHIVE_DIR = "projects/" class Project(BaseOrgModel, UniqueNamedModel, PrimordialModel): + # Grace period to prevent infinite sync->restart->sync cycles + SYNC_GRACE_PERIOD_SECONDS = 30 + class Meta: db_table = "core_project" constraints = [ models.CheckConstraint( check=~models.Q(name=""), name="ck_empty_project_name", - ) + ), + models.CheckConstraint( + check=models.Q(scm_update_cache_timeout__range=(0, 86400)), + name="ck_cache_timeout_range", + ), ] permissions = [ ("sync_project", "Can sync a project"), @@ -95,9 +108,78 @@ class ScmType(models.TextChoices): on_delete=models.SET_NULL, ) + # Project Auto-Sync Configuration Fields + update_revision_on_launch = models.BooleanField( + default=False, + help_text="Enable automatic project sync on activation launch", + ) + scm_update_cache_timeout = models.IntegerField( + default=0, + help_text=( + "Cache timeout in seconds for project updates" + " (0 = always update, max 86400)" + ), + ) + last_synced_at = models.DateTimeField( + null=True, + blank=True, + default=None, + help_text="Timestamp of the last successful project sync", + ) + def __repr__(self) -> str: return f"<{self.__class__.__name__}(id={self.id}, name={self.name})>" + @property + def needs_update_on_launch(self) -> bool: + """ + Determine if project needs to be updated on activation launch. + + Returns False if update_revision_on_launch is disabled. + Returns False if within SYNC_GRACE_PERIOD_SECONDS of last sync. + Returns False if cache timeout hasn't expired. + Returns True otherwise. + + The grace period prevents infinite loops when + restart_on_project_update=True on activations, even if + scm_update_cache_timeout=0 (always update). + """ + try: + if not self.update_revision_on_launch: + return False + + current_time = timezone.now() + + # Apply grace period to prevent infinite restart loops + if self.last_synced_at is not None: + grace_expiry = self.last_synced_at + datetime.timedelta( + seconds=self.SYNC_GRACE_PERIOD_SECONDS + ) + if current_time <= grace_expiry: + return False + + # If cache timeout is 0, always update (after grace period) + if self.scm_update_cache_timeout == 0: + return True + + # If never synced, update is needed + if self.last_synced_at is None: + return True + + # Check if cache timeout has expired + cache_expiry = self.last_synced_at + datetime.timedelta( + seconds=self.scm_update_cache_timeout + ) + return current_time > cache_expiry + + except (AttributeError, TypeError, ValueError) as e: + # Log error but return safe default to prevent activation failures + logger.error( + f"Error determining sync status for project {self.pk}: {e}" + ) + # Safe default: assume sync needed to prevent stale content + return True + __all__ = [ "Project", diff --git a/src/aap_eda/core/models/rulebook.py b/src/aap_eda/core/models/rulebook.py index ef2df7439..fd59b8346 100644 --- a/src/aap_eda/core/models/rulebook.py +++ b/src/aap_eda/core/models/rulebook.py @@ -41,6 +41,11 @@ class Meta: # TODO: should the content of this field be validated? 
# https://issues.redhat.com/browse/AAP-19202 rulesets = models.TextField(null=False, default="") + rulesets_sha256 = models.CharField( + max_length=64, + default="", + help_text="SHA256 hash of rulesets content", + ) project = models.ForeignKey("Project", on_delete=models.CASCADE, null=True) created_at = models.DateTimeField(auto_now_add=True, null=False) modified_at = models.DateTimeField(auto_now=True, null=False) diff --git a/src/aap_eda/core/utils/rulebook.py b/src/aap_eda/core/utils/rulebook.py index 616b0f3eb..5ba8af87a 100644 --- a/src/aap_eda/core/utils/rulebook.py +++ b/src/aap_eda/core/utils/rulebook.py @@ -72,22 +72,15 @@ def build_source_list(rulesets_data: str) -> list[dict]: def get_rulebook_hash(rulebook: str) -> str: """ - Get the hash code of rulebook. + Get the SHA256 hash of rulebook content. Args: - rulebook: string format of rulebook + rulebook: string format of rulebook content Returns: the hexadecimal representation of the hash - - """ - if isinstance(rulebook, str): - rulebook = rulebook.encode("utf-8") - - sha256 = hashlib.sha256() - sha256.update(rulebook) - - return sha256.hexdigest() + """ + return hashlib.sha256((rulebook or "").encode("utf-8")).hexdigest() def swap_event_stream_sources( diff --git a/src/aap_eda/services/project/imports.py b/src/aap_eda/services/project/imports.py index 2736dd6e4..86aa6d78f 100644 --- a/src/aap_eda/services/project/imports.py +++ b/src/aap_eda/services/project/imports.py @@ -27,6 +27,7 @@ from aap_eda.core import models from aap_eda.core.types import StrPath +from aap_eda.core.utils.rulebook import get_rulebook_hash from aap_eda.services.project.scm import ScmRepository logger = logging.getLogger(__name__) @@ -184,6 +185,7 @@ def _import_rulebook( project=project, name=rulebook_info.relpath, rulesets=rulebook_info.raw_content, + rulesets_sha256=get_rulebook_hash(rulebook_info.raw_content), organization=project.organization, ) return rulebook @@ -194,15 +196,25 @@ def _sync_rulebook( rulebook_info: RulebookInfo, git_hash: str, ): - if rulebook.rulesets == rulebook_info.raw_content: + new_sha256 = get_rulebook_hash(rulebook_info.raw_content) + if rulebook.rulesets_sha256 == new_sha256: models.Activation.objects.filter(rulebook=rulebook).update( - git_hash=git_hash, + git_hash=git_hash ) return rulebook.rulesets = rulebook_info.raw_content - rulebook.save() - models.Activation.objects.filter(rulebook=rulebook).update( + rulebook.rulesets_sha256 = new_sha256 + rulebook.save(update_fields=["rulesets", "rulesets_sha256"]) + # Update activations without auto-restart. Those with + # restart_on_project_update=True are handled by + # _auto_restart_activations which needs to detect + # the change via SHA256 comparison. + models.Activation.objects.filter( + rulebook=rulebook, + restart_on_project_update=False, + ).update( rulebook_rulesets=rulebook.rulesets, + rulebook_rulesets_sha256=new_sha256, git_hash=git_hash, ) diff --git a/src/aap_eda/tasks/project.py b/src/aap_eda/tasks/project.py index eedd667ac..8ba5c1a1f 100644 --- a/src/aap_eda/tasks/project.py +++ b/src/aap_eda/tasks/project.py @@ -13,15 +13,25 @@ # limitations under the License.
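Before the task-layer changes below, here is a worked example of the launch-time decision implemented by Project.needs_update_on_launch above. This is a standalone sketch with illustrative values (a 300-second cache timeout and the 30-second SYNC_GRACE_PERIOD_SECONDS), not code from the PR:

from datetime import datetime, timedelta, timezone

def needs_update(enabled, cache_timeout, last_synced_at, now, grace=30):
    # Mirrors the ordering in Project.needs_update_on_launch (sketch only).
    if not enabled:
        return False
    if last_synced_at is not None and now <= last_synced_at + timedelta(seconds=grace):
        return False  # grace period: suppresses sync->restart->sync loops
    if cache_timeout == 0:
        return True   # always update once past the grace period
    if last_synced_at is None:
        return True   # never synced yet
    return now > last_synced_at + timedelta(seconds=cache_timeout)

now = datetime.now(timezone.utc)
assert needs_update(True, 300, None, now) is True                          # never synced
assert needs_update(True, 300, now - timedelta(seconds=10), now) is False  # inside grace period
assert needs_update(True, 300, now - timedelta(seconds=60), now) is False  # cache still fresh
assert needs_update(True, 300, now - timedelta(seconds=600), now) is True  # cache expired
assert needs_update(True, 0, now - timedelta(seconds=10), now) is False    # grace applies even with timeout 0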
import logging +from datetime import timedelta +from typing import Optional from ansible_base.lib.utils.db import advisory_lock from dispatcherd.publish import submit_task from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist +from django.db import DatabaseError, transaction +from django.utils import timezone from aap_eda import utils from aap_eda.core import models +from aap_eda.core.enums import ActivationStatus, ProcessParentType from aap_eda.services.project import ProjectImportError, ProjectImportService -from aap_eda.tasks.orchestrator import check_rulebook_queue_health +from aap_eda.tasks.orchestrator import ( + check_rulebook_queue_health, + restart_rulebook_process, + start_rulebook_process, +) logger = logging.getLogger(__name__) PROJECT_TASKS_QUEUE = "default" @@ -68,15 +78,45 @@ def _import_project_no_lock(project_id: int): This function is intended to be run by the tasking system inside the lock. """ - logger.info(f"Task started: Import project ( {project_id=} )") + logger.info(f"Task started: Import project ({project_id=})") - project = models.Project.objects.get(pk=project_id) + error_message = None try: - ProjectImportService().import_project(project) + project = _get_project_safely(project_id) + if not project: + logger.info( + f"Task complete: Import project " + f"(project_id={project_id}) - project not found" + ) + return + + with transaction.atomic(): + ProjectImportService().import_project(project) + project.last_synced_at = timezone.now() + project.save(update_fields=["last_synced_at"]) except ProjectImportError as e: - logger.error(e, exc_info=settings.DEBUG) + logger.error( + f"Project import error for project {project_id}: {e}", + exc_info=True, + ) + error_message = f"Import failed: {str(e)}" + except DatabaseError as e: + logger.error( + f"Database error during project import {project_id}: {e}", + exc_info=True, + ) + error_message = "Database error during import" + except Exception as e: + logger.error( + f"Unexpected error during project import {project_id}: {e}", + exc_info=True, + ) + error_message = f"Unexpected error during import: {str(e)}" + finally: + if error_message: + _handle_project_error_recovery(project_id, error_message) - logger.info(f"Task complete: Import project ( project_id={project.id} )") + logger.info(f"Task complete: Import project (project_id={project_id})") def sync_project(project_id: int) -> str: @@ -105,17 +145,346 @@ def _sync_project(project_id: int): def _sync_project_no_lock(project_id: int): """Sync project without lock. - This function is intended to be run by the tasking system inside the lock. + This function is intended to be run by the tasking system + inside the lock. After sync completes, handles post-sync + activation operations (auto-restart and resume waiting). 
""" - logger.info(f"Task started: Sync project ( {project_id=} )") + logger.info(f"Task started: Sync project ({project_id=})") - project = models.Project.objects.get(pk=project_id) + error_message = None try: - ProjectImportService().sync_project(project) + project = _get_project_safely(project_id) + if not project: + logger.info( + f"Task complete: Sync project " + f"(project_id={project_id}) - project not found" + ) + _handle_sync_failure_activations( + project_id, "Project not found or deleted" + ) + return + + with transaction.atomic(): + ProjectImportService().sync_project(project) + project.last_synced_at = timezone.now() + project.save(update_fields=["last_synced_at"]) except ProjectImportError as e: - logger.error(e, exc_info=settings.DEBUG) + logger.error( + f"Project sync error for project {project_id}: {e}", + exc_info=True, + ) + error_message = f"Sync failed: {str(e)}" + except DatabaseError as e: + logger.error( + f"Database error during project sync {project_id}: {e}", + exc_info=True, + ) + error_message = "Database error during sync" + except Exception as e: + logger.error( + f"Unexpected error during project sync {project_id}: {e}", + exc_info=True, + ) + error_message = f"Unexpected error during sync: {str(e)}" + + if error_message: + _handle_project_error_recovery(project_id, error_message) + _handle_sync_failure_activations(project_id, error_message) + else: + _handle_post_sync_activations(project) + + logger.info(f"Task complete: Sync project (project_id={project_id})") + + +def _handle_post_sync_activations(project: models.Project): + """Handle activation operations after a successful sync. + + 1. Auto-restart enabled activations with changed content + 2. Resume activations waiting for project sync + """ + try: + _auto_restart_activations(project) + except Exception as e: + logger.error( + f"Auto-restart failed for project {project.id}: {e}", + exc_info=True, + ) + + try: + _resume_waiting_activations(project) + except Exception as e: + logger.error( + f"Resume waiting activations failed for " + f"project {project.id}: {e}", + exc_info=True, + ) + + +def _auto_restart_activations(project: models.Project): + """Auto-restart enabled activations when rulebook content changes. + + Excludes activations with awaiting_project_sync=True since + those will be handled by _resume_waiting_activations. + Skips activations with source_mappings when content changes, + as those require manual source mapping updates. + """ + activations = models.Activation.objects.filter( + project=project, + is_enabled=True, + restart_on_project_update=True, + awaiting_project_sync=False, + ) + + restart_count = 0 + checked_count = 0 + for activation in activations: + checked_count += 1 + try: + current_rulebook = models.Rulebook.objects.get( + id=activation.rulebook_id + ) + current_sha256 = current_rulebook.rulesets_sha256 + current_git_hash = project.git_hash + + content_changed = ( + activation.rulebook_rulesets_sha256 != current_sha256 + ) + hash_changed = activation.git_hash != current_git_hash + + if content_changed and activation.source_mappings: + logger.warning( + f"Skipping auto-restart for activation " + f"'{activation.name}' - has event stream " + f"source mappings that need manual update" + ) + continue + + # Only auto-restart activations have their cached + # rulesets updated (filtered by query above). 
+ if content_changed or hash_changed: + activation.rulebook_rulesets = current_rulebook.rulesets or "" + activation.rulebook_rulesets_sha256 = current_sha256 + activation.git_hash = current_git_hash + activation.save( + update_fields=[ + "rulebook_rulesets", + "rulebook_rulesets_sha256", + "git_hash", + ] + ) + + if content_changed: + logger.info( + f"Content changed for activation " + f"'{activation.name}', " + f"triggering auto-restart" + ) + _restart_activation(activation) + restart_count += 1 + + except ObjectDoesNotExist: + logger.warning( + f"Rulebook for activation " + f"'{activation.name}' no longer exists" + ) + except Exception as e: + logger.error( + f"Failed to check activation " + f"'{activation.name}' for " + f"auto-restart: {e}", + exc_info=True, + ) + + logger.info( + f"Auto-restart check complete: {restart_count} " + f"activations restarted out of " + f"{checked_count} checked" + ) - logger.info(f"Task complete: Sync project ( project_id={project.id} )") + +def _restart_activation(activation: models.Activation): + """Execute auto-restart for an activation.""" + try: + restart_rulebook_process( + process_parent_type=ProcessParentType.ACTIVATION, + process_parent_id=activation.id, + request_id="", + ) + except Exception as e: + logger.error( + f"Failed to restart activation " + f"'{activation.name}' after sync: {e}", + exc_info=True, + ) + try: + activation.status = ActivationStatus.ERROR + activation.status_message = ( + f"Auto-restart failed after project sync: {e}" + ) + activation.save(update_fields=["status", "status_message"]) + except Exception as save_err: + logger.error( + f"Failed to set error state for " + f"'{activation.name}': {save_err}", + exc_info=True, + ) + return + + try: + activation.restart_count += 1 + activation.save(update_fields=["restart_count"]) + except Exception as e: + logger.warning( + f"Restart succeeded for '{activation.name}' " + f"but failed to update restart_count: {e}", + exc_info=True, + ) + + logger.info(f"Auto-restarted activation '{activation.name}'") + + +def _update_activation_content( + activation: models.Activation, + project: models.Project, +): + """Update activation's cached rulebook content, hash, and git hash.""" + try: + rulebook = models.Rulebook.objects.get(id=activation.rulebook_id) + activation.rulebook_rulesets = rulebook.rulesets or "" + activation.rulebook_rulesets_sha256 = rulebook.rulesets_sha256 + except ObjectDoesNotExist: + logger.warning( + f"Rulebook for activation " + f"'{activation.name}' not found, " + f"keeping existing rulesets" + ) + activation.git_hash = project.git_hash + + +def _resume_waiting_activations(project: models.Project): + """Resume activations waiting for project sync. + + Updates cached rulebook content and git hash before + starting/restarting to ensure fresh content is used. 
+ """ + waiting = models.Activation.objects.filter( + project=project, + awaiting_project_sync=True, + ) + + for activation in waiting: + try: + # Update cached content from post-sync rulebook + _update_activation_content(activation, project) + + activation.awaiting_project_sync = False + if not activation.is_enabled: + logger.info( + f"Resuming enable for '{activation.name}' after sync" + ) + activation.is_enabled = True + activation.failure_count = 0 + activation.status = ActivationStatus.PENDING + activation.save( + update_fields=[ + "awaiting_project_sync", + "is_enabled", + "failure_count", + "status", + "rulebook_rulesets", + "rulebook_rulesets_sha256", + "git_hash", + "modified_at", + ] + ) + start_rulebook_process( + process_parent_type=(ProcessParentType.ACTIVATION), + process_parent_id=activation.id, + request_id="", + ) + else: + logger.info( + f"Resuming restart for '{activation.name}' after sync" + ) + activation.save( + update_fields=[ + "awaiting_project_sync", + "rulebook_rulesets", + "rulebook_rulesets_sha256", + "git_hash", + ] + ) + restart_rulebook_process( + process_parent_type=(ProcessParentType.ACTIVATION), + process_parent_id=activation.id, + request_id="", + ) + except Exception as e: + logger.error( + f"Failed to resume activation " + f"'{activation.name}' after sync: {e}", + exc_info=True, + ) + try: + activation.status = ActivationStatus.ERROR + activation.status_message = ( + f"Failed to start after project sync: {e}" + ) + activation.awaiting_project_sync = False + activation.save( + update_fields=[ + "status", + "status_message", + "awaiting_project_sync", + ] + ) + except Exception as save_err: + logger.error( + f"Failed to update activation status " + f"after resume failure: {save_err}", + exc_info=True, + ) + + +def _handle_sync_failure_activations(project_id: int, error_message: str): + """Handle waiting activations when sync fails.""" + try: + waiting = models.Activation.objects.filter( + project_id=project_id, + awaiting_project_sync=True, + ) + for activation in waiting: + try: + activation.status = ActivationStatus.ERROR + activation.status_message = ( + f"Project sync failed: {error_message}" + ) + activation.awaiting_project_sync = False + activation.save( + update_fields=[ + "status", + "status_message", + "awaiting_project_sync", + ] + ) + logger.info( + f"Set activation '{activation.name}' " + f"to ERROR after sync failure" + ) + except Exception as e: + logger.error( + f"Failed to update activation " + f"'{activation.name}' after " + f"sync failure: {e}", + exc_info=True, + ) + except Exception as e: + logger.error( + f"Failed to handle sync failure " + f"activations for project " + f"{project_id}: {e}", + exc_info=True, + ) # Started by the scheduler, unique concurrent execution @@ -131,33 +500,210 @@ def monitor_project_tasks(): def _monitor_project_tasks() -> None: - """Handle project tasks that are stuck. + """Handle project tasks that may be stuck in edge cases. - With dispatcherd, task monitoring is handled internally. This function - now focuses on cleaning up projects that may be in inconsistent states - due to external issues. + While dispatcherd handles task reliability internally, this function + focuses on cleaning up projects that may be in inconsistent states + due to external issues or crashes that bypass normal error handling. 
""" logger.info("Task started: Monitor project tasks") - # Find projects that have been in transition states for a long time - # Since dispatcherd handles task reliability internally, we only need - # to handle edge cases where projects might be stuck + timeout_threshold = timezone.now() - timedelta( + seconds=settings.DISPATCHERD_PROJECT_TASK_TIMEOUT * 2 + ) - # For now, this is a simplified monitoring approach - # In a dispatcherd environment, monitoring is handled by the dispatcher - unfinished_projects = models.Project.objects.filter( - import_state__in=[ - models.Project.ImportState.PENDING, - models.Project.ImportState.RUNNING, - ] + stuck_count = _recover_stuck_projects( + models.Project.ImportState.RUNNING, + timeout_threshold, + "Task appears to have been abandoned or crashed. " + "Marked as failed by monitoring system.", + ) + pending_count = _recover_stuck_projects( + models.Project.ImportState.PENDING, + timeout_threshold, + "Task was stuck in pending state. " + "Marked as failed by monitoring system.", ) - # Since dispatcherd handles task reliability, we just log the count - # The actual task recovery is handled by dispatcherd's internal mechanisms - if unfinished_projects.exists(): - logger.info( - f"Found {unfinished_projects.count()} projects in transition " - "states. Dispatcherd handles task recovery." - ) + total_stuck = stuck_count + pending_count + if total_stuck > 0: + logger.warning(f"Recovered {total_stuck} stuck project(s)") + else: + logger.info("No stuck projects found") + + orphaned = _recover_orphaned_awaiting_activations() + if orphaned > 0: + logger.warning(f"Recovered {orphaned} orphaned awaiting activation(s)") logger.info("Task complete: Monitor project tasks") + + +def _recover_stuck_projects( + expected_state: str, + timeout_threshold, + error_message: str, +) -> int: + """Recover projects stuck in the given state past the threshold. + + Uses select_for_update with a double-check pattern to avoid + race conditions with legitimate in-progress operations. + + Returns the number of projects recovered. + """ + stuck_projects = models.Project.objects.filter( + import_state=expected_state, + modified_at__lt=timeout_threshold, + ) + + recovered = 0 + for project in stuck_projects: + logger.warning( + f"Found project {project.id} stuck in " + f"{expected_state} state since " + f"{project.modified_at}. Marking as failed." + ) + try: + with transaction.atomic(): + fresh = models.Project.objects.select_for_update().get( + pk=project.id + ) + if fresh.import_state == expected_state: + fresh.import_state = models.Project.ImportState.FAILED + fresh.import_error = error_message + fresh.save( + update_fields=[ + "import_state", + "import_error", + ] + ) + logger.warning(f"Recovered stuck project {project.id}") + recovered += 1 + except ObjectDoesNotExist: + logger.warning(f"Project {project.id} was deleted during recovery") + except DatabaseError as e: + logger.error( + f"Failed to recover project {project.id}: {e}", + exc_info=True, + ) + + return recovered + + +def _recover_orphaned_awaiting_activations() -> int: + """Recover activations stuck with awaiting_project_sync=True. + + Finds activations whose project is no longer syncing + (COMPLETED, FAILED, or deleted) and either resumes them + or sets them to ERROR. This is a safety net for races + where the sync task completes before the flag is set, + or where sync_project() submission fails silently. 
+ """ + stuck = models.Activation.objects.filter( + awaiting_project_sync=True, + ).select_related("project") + + recovered = 0 + resumed_projects = set() + for activation in stuck: + project = activation.project + # Project deleted or not syncing — activation is orphaned + if project is None or project.import_state not in [ + models.Project.ImportState.PENDING, + models.Project.ImportState.RUNNING, + ]: + state_desc = "deleted" if project is None else project.import_state + logger.warning( + f"Found orphaned activation " + f"'{activation.name}' with " + f"awaiting_project_sync=True " + f"(project state: {state_desc})" + ) + try: + if ( + project is not None + and project.import_state + == models.Project.ImportState.COMPLETED + and project.id not in resumed_projects + ): + # Project synced successfully — resume + _resume_waiting_activations(project) + resumed_projects.add(project.id) + else: + # Project failed or deleted — set ERROR + error_msg = ( + "Project sync did not complete. " + "Recovered by monitoring system." + ) + activation.status = ActivationStatus.ERROR + activation.status_message = error_msg + activation.awaiting_project_sync = False + activation.save( + update_fields=[ + "status", + "status_message", + "awaiting_project_sync", + ] + ) + recovered += 1 + except Exception as e: + logger.error( + f"Failed to recover orphaned " + f"activation '{activation.name}': {e}", + exc_info=True, + ) + + return recovered + + +def _get_project_safely(project_id: int) -> Optional[models.Project]: + """Get project, returning None only if it doesn't exist. + + Returns None if project doesn't exist. + Raises DatabaseError and other exceptions to be handled by callers, + which already have appropriate handlers for these cases. + """ + try: + return models.Project.objects.get(pk=project_id) + except ObjectDoesNotExist: + logger.error(f"Project {project_id} does not exist or was deleted") + return None + + +def _handle_project_error_recovery( + project_id: int, error_message: str +) -> None: + """Handle error recovery for project operations. + + Attempts to reset project state to FAILED to allow for future + retry attempts. Uses best-effort approach with transaction safety. + + Args: + project_id: The project ID (not the object, which may be stale + after a transaction rollback). + error_message: Error message to store in import_error. 
+ """ + try: + with transaction.atomic(): + fresh_project = models.Project.objects.select_for_update().get( + pk=project_id + ) + if fresh_project.import_state in [ + models.Project.ImportState.PENDING, + models.Project.ImportState.RUNNING, + ]: + fresh_project.import_state = models.Project.ImportState.FAILED + fresh_project.import_error = error_message + fresh_project.save( + update_fields=["import_state", "import_error"] + ) + logger.info(f"Reset project {project_id} state to FAILED") + except ObjectDoesNotExist: + logger.warning( + f"Project {project_id} was deleted during error recovery" + ) + except DatabaseError as e: + logger.critical( + f"Failed to reset project {project_id} state after " + f"error (project may be stuck): {e}", + exc_info=True, + ) diff --git a/tests/integration/api/test_activation.py b/tests/integration/api/test_activation.py index a57f5fae3..cff1bd3d9 100644 --- a/tests/integration/api/test_activation.py +++ b/tests/integration/api/test_activation.py @@ -28,6 +28,7 @@ ) from aap_eda.api.serializers.project import ENCRYPTED_STRING from aap_eda.core import enums, models +from aap_eda.core.utils.rulebook import get_rulebook_hash from tests.integration.constants import api_url_v1 PROJECT_GIT_HASH = "684f62df18ce5f8d5c428e53203b9b975426eed0" @@ -1294,3 +1295,402 @@ def test_copy_activation_invalid_body( f"{api_url_v1}/activations/{a_id}/copy/", data={"name": name} ) assert response.status_code == status.HTTP_400_BAD_REQUEST + + +# ------------------------------------------------------------------ +# Project sync dependency tests +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +@mock.patch("aap_eda.api.views.activation.sync_project") +def test_enable_triggers_sync_when_project_needs_update( + mock_sync, + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Enable triggers sync when project needs update on launch.""" + mock_sync.return_value = "task-uuid" + default_project.update_revision_on_launch = True + default_project.scm_update_cache_timeout = 0 + default_project.save( + update_fields=[ + "update_revision_on_launch", + "scm_update_cache_timeout", + ] + ) + default_activation.is_enabled = False + default_activation.status = enums.ActivationStatus.STOPPED + default_activation.save(update_fields=["is_enabled", "status"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/enable/" + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is True + assert default_activation.status == enums.ActivationStatus.PENDING + mock_sync.assert_called_once_with(default_project.id) + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +@mock.patch("aap_eda.api.views.activation.sync_project") +def test_enable_skips_sync_when_project_already_syncing( + mock_sync, + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Enable sets flag but skips sync_project when already running.""" + default_project.update_revision_on_launch = True + 
default_project.scm_update_cache_timeout = 0 + default_project.import_state = models.Project.ImportState.RUNNING + default_project.save( + update_fields=[ + "update_revision_on_launch", + "scm_update_cache_timeout", + "import_state", + ] + ) + default_activation.is_enabled = False + default_activation.status = enums.ActivationStatus.STOPPED + default_activation.save(update_fields=["is_enabled", "status"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/enable/" + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is True + mock_sync.assert_not_called() + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +def test_enable_proceeds_normally_without_sync( + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Enable returns 204 when project doesn't need sync.""" + default_project.update_revision_on_launch = False + default_project.save(update_fields=["update_revision_on_launch"]) + default_activation.is_enabled = False + default_activation.status = enums.ActivationStatus.STOPPED + default_activation.save(update_fields=["is_enabled", "status"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/enable/" + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is False + assert default_activation.is_enabled is True + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +@mock.patch("aap_eda.api.views.activation.sync_project") +def test_restart_triggers_sync_when_project_needs_update( + mock_sync, + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Restart triggers sync when project needs update.""" + mock_sync.return_value = "task-uuid" + default_project.update_revision_on_launch = True + default_project.scm_update_cache_timeout = 0 + default_project.save( + update_fields=[ + "update_revision_on_launch", + "scm_update_cache_timeout", + ] + ) + default_activation.is_enabled = True + default_activation.status = enums.ActivationStatus.RUNNING + default_activation.save(update_fields=["is_enabled", "status"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/restart/" + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is True + mock_sync.assert_called_once_with(default_project.id) + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +@mock.patch("aap_eda.api.views.activation.sync_project") +def test_enable_sync_failure_resets_flag( + mock_sync, + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Enable resets flag and returns error when sync_project throws.""" + mock_sync.side_effect = RuntimeError("dispatcherd down") + 
default_project.update_revision_on_launch = True + default_project.scm_update_cache_timeout = 0 + default_project.save( + update_fields=[ + "update_revision_on_launch", + "scm_update_cache_timeout", + ] + ) + default_activation.is_enabled = False + default_activation.status = enums.ActivationStatus.STOPPED + default_activation.save(update_fields=["is_enabled", "status"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/enable/" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is False + assert default_activation.status == (enums.ActivationStatus.ERROR) + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +@mock.patch("aap_eda.api.views.activation.sync_project") +def test_disable_clears_awaiting_project_sync( + mock_sync, + mock_health, + default_activation: models.Activation, + default_project: models.Project, + admin_client: APIClient, + preseed_credential_types, +): + """Disable clears awaiting_project_sync flag.""" + mock_sync.return_value = "task-uuid" + default_project.update_revision_on_launch = True + default_project.scm_update_cache_timeout = 0 + default_project.save( + update_fields=[ + "update_revision_on_launch", + "scm_update_cache_timeout", + ] + ) + default_activation.is_enabled = False + default_activation.status = enums.ActivationStatus.STOPPED + default_activation.save(update_fields=["is_enabled", "status"]) + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/enable/" + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + # Set is_enabled so disable logic runs + default_activation.refresh_from_db() + default_activation.is_enabled = True + default_activation.save(update_fields=["is_enabled"]) + + response = admin_client.post( + f"{api_url_v1}/activations/" f"{default_activation.id}/disable/" + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + default_activation.refresh_from_db() + assert default_activation.awaiting_project_sync is False + assert default_activation.is_enabled is False + + +@pytest.mark.django_db +@mock.patch.object(settings, "RULEBOOK_WORKER_QUEUES", []) +@mock.patch( + "aap_eda.api.views.activation.check_dispatcherd_workers_health", +) +def test_destroy_clears_awaiting_project_sync( + mock_health, + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """Destroy clears awaiting_project_sync flag.""" + default_activation.awaiting_project_sync = True + default_activation.save(update_fields=["awaiting_project_sync"]) + + response = admin_client.delete( + f"{api_url_v1}/activations/" f"{default_activation.id}/" + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + +@pytest.mark.django_db +def test_activation_detail_shows_warning_on_rulebook_drift( + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """Warnings show when rulebook SHA256 drifts from activation.""" + old_hash = get_rulebook_hash("old-content") + default_activation.source_mappings = "[{source: src1, event_stream: es1}]" + default_activation.rulebook_rulesets_sha256 = old_hash + default_activation.save( + update_fields=[ + "source_mappings", + "rulebook_rulesets_sha256", + ] + ) + + # Simulate rulebook drift by updating the rulebook's SHA + rulebook = 
default_activation.rulebook + rulebook.rulesets_sha256 = get_rulebook_hash("new-content") + rulebook.save(update_fields=["rulesets_sha256"]) + + response = admin_client.get( + f"{api_url_v1}/activations/{default_activation.id}/" + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data["warnings"]) == 1 + assert "source mappings" in response.data["warnings"][0] + + +@pytest.mark.django_db +def test_activation_detail_no_warning_when_hashes_match( + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """No warnings when activation and rulebook SHA256 match.""" + matching_hash = get_rulebook_hash("same-content") + default_activation.source_mappings = "[{source: src1, event_stream: es1}]" + default_activation.rulebook_rulesets_sha256 = matching_hash + default_activation.save( + update_fields=[ + "source_mappings", + "rulebook_rulesets_sha256", + ] + ) + + rulebook = default_activation.rulebook + rulebook.rulesets_sha256 = matching_hash + rulebook.save(update_fields=["rulesets_sha256"]) + + response = admin_client.get( + f"{api_url_v1}/activations/{default_activation.id}/" + ) + assert response.status_code == status.HTTP_200_OK + assert response.data["warnings"] == [] + + +@pytest.mark.django_db +def test_activation_detail_no_warning_without_source_mappings( + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """No warnings when activation has no source_mappings.""" + default_activation.source_mappings = "" + default_activation.rulebook_rulesets_sha256 = get_rulebook_hash( + "old-content" + ) + default_activation.save( + update_fields=[ + "source_mappings", + "rulebook_rulesets_sha256", + ] + ) + + rulebook = default_activation.rulebook + rulebook.rulesets_sha256 = get_rulebook_hash("new-content") + rulebook.save(update_fields=["rulesets_sha256"]) + + response = admin_client.get( + f"{api_url_v1}/activations/{default_activation.id}/" + ) + assert response.status_code == status.HTTP_200_OK + assert response.data["warnings"] == [] + + +@pytest.mark.django_db +def test_list_activations_does_not_include_warnings( + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """List endpoint does not include warnings field.""" + default_activation.source_mappings = "[{source: src1, event_stream: es1}]" + default_activation.rulebook_rulesets_sha256 = get_rulebook_hash( + "old-content" + ) + default_activation.save( + update_fields=[ + "source_mappings", + "rulebook_rulesets_sha256", + ] + ) + + rulebook = default_activation.rulebook + rulebook.rulesets_sha256 = get_rulebook_hash("new-content") + rulebook.save(update_fields=["rulesets_sha256"]) + + response = admin_client.get(f"{api_url_v1}/activations/") + assert response.status_code == status.HTTP_200_OK + for result in response.data["results"]: + assert "warnings" not in result + + +@pytest.mark.django_db +def test_activation_detail_warning_on_deleted_rulebook( + default_activation: models.Activation, + admin_client: APIClient, + preseed_credential_types, +): + """Warning when rulebook is deleted but source_mappings exist.""" + default_activation.source_mappings = "[{source: src1, event_stream: es1}]" + default_activation.rulebook_rulesets_sha256 = get_rulebook_hash( + "old-content" + ) + default_activation.rulebook = None + default_activation.save( + update_fields=[ + "source_mappings", + "rulebook_rulesets_sha256", + "rulebook", + ] + ) + + response = admin_client.get( + 
f"{api_url_v1}/activations/{default_activation.id}/" + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data["warnings"]) == 1 + assert "no longer exists" in response.data["warnings"][0] diff --git a/tests/integration/api/test_project.py b/tests/integration/api/test_project.py index 93c76a8ff..a3d7ac132 100644 --- a/tests/integration/api/test_project.py +++ b/tests/integration/api/test_project.py @@ -16,6 +16,7 @@ from unittest import mock import pytest +from django.utils import timezone from rest_framework import status from rest_framework.test import APIClient @@ -1086,7 +1087,7 @@ def test_multiple_concurrent_health_check_calls( create_bodies = [ { "name": f"test-project-concurrent-{i}", - "url": f"https://git.example.com/test/project{i}", # noqa E231 + "url": f"https://git.example.com/test/project{i}", # noqa: E231 "organization_id": default_organization.id, } for i in range(3) @@ -1108,6 +1109,305 @@ def test_multiple_concurrent_health_check_calls( assert mock_health_check.call_count == 3 +# Test: Project Auto-Sync Configuration +# ------------------------------------- +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_default_values( + mock_health_check, + default_organization: models.Organization, + admin_client: APIClient, +): + """Test that new project sync fields have correct default values.""" + mock_health_check.return_value = True + + test_project = { + "name": "test-project-sync-defaults", + "url": "https://git.example.com/repo.git", + "organization_id": default_organization.id, + } + + response = admin_client.post(f"{api_url_v1}/projects/", data=test_project) + assert response.status_code == status.HTTP_201_CREATED + + # Verify default values in response + data = response.json() + assert data["update_revision_on_launch"] is False + assert data["scm_update_cache_timeout"] == 0 + assert data["last_synced_at"] is None + + # Verify in database + project = models.Project.objects.get(id=data["id"]) + assert project.update_revision_on_launch is False + assert project.scm_update_cache_timeout == 0 + assert project.last_synced_at is None + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_fields_in_create_request( + mock_health_check, + default_organization: models.Organization, + admin_client: APIClient, +): + """Test that project sync fields can be set in create request.""" + mock_health_check.return_value = True + + test_project = { + "name": "test-project-sync-create", + "url": "https://git.example.com/repo.git", + "organization_id": default_organization.id, + "update_revision_on_launch": True, + "scm_update_cache_timeout": 300, + } + + response = admin_client.post(f"{api_url_v1}/projects/", data=test_project) + assert response.status_code == status.HTTP_201_CREATED + + data = response.json() + assert data["update_revision_on_launch"] is True + assert data["scm_update_cache_timeout"] == 300 + + # Verify in database + project = models.Project.objects.get(id=data["id"]) + assert project.update_revision_on_launch is True + assert project.scm_update_cache_timeout == 300 + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_fields_in_update_request( + mock_health_check, default_project: models.Project, admin_client: APIClient +): + """Test that project sync fields can be updated via PATCH.""" + mock_health_check.return_value = True + + update_data = { + 
"update_revision_on_launch": True, + "scm_update_cache_timeout": 600, + } + + response = admin_client.patch( + f"{api_url_v1}/projects/{default_project.id}/", data=update_data + ) + assert response.status_code == status.HTTP_200_OK + + data = response.json() + assert data["update_revision_on_launch"] is True + assert data["scm_update_cache_timeout"] == 600 + + # Verify in database + default_project.refresh_from_db() + assert default_project.update_revision_on_launch is True + assert default_project.scm_update_cache_timeout == 600 + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_cache_timeout_validation_create( + mock_health_check, + default_organization: models.Organization, + admin_client: APIClient, +): + """Test validation of negative cache timeout values in create request.""" + mock_health_check.return_value = True + + test_project = { + "name": "test-project-negative-cache", + "url": "https://git.example.com/repo.git", + "organization_id": default_organization.id, + "scm_update_cache_timeout": -1, # Invalid negative value + } + + response = admin_client.post(f"{api_url_v1}/projects/", data=test_project) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "scm_update_cache_timeout" in response.json() + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_cache_timeout_validation_update( + mock_health_check, default_project: models.Project, admin_client: APIClient +): + """Test validation of negative cache timeout values in update request.""" + mock_health_check.return_value = True + + update_data = { + "scm_update_cache_timeout": -5, # Invalid negative value + } + + response = admin_client.patch( + f"{api_url_v1}/projects/{default_project.id}/", data=update_data + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "scm_update_cache_timeout" in response.json() + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_cache_timeout_max_validation_create( + mock_health_check, + default_organization: models.Organization, + admin_client: APIClient, +): + """Test validation of maximum cache timeout values in create request.""" + mock_health_check.return_value = True + + test_project = { + "name": "test-project-max-cache", + "url": "https://git.example.com/repo.git", + "organization_id": default_organization.id, + "scm_update_cache_timeout": 86401, # Invalid: > 86400 (1 day) + } + + response = admin_client.post(f"{api_url_v1}/projects/", data=test_project) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "scm_update_cache_timeout" in response.json() + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_sync_cache_timeout_max_validation_update( + mock_health_check, default_project: models.Project, admin_client: APIClient +): + """Test validation of maximum cache timeout values in update request.""" + mock_health_check.return_value = True + + update_data = { + "scm_update_cache_timeout": 100000, # Invalid: > 86400 (1 day) + } + + response = admin_client.patch( + f"{api_url_v1}/projects/{default_project.id}/", data=update_data + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "scm_update_cache_timeout" in response.json() + + +@pytest.mark.django_db +def test_project_needs_update_on_launch_property( + default_organization: models.Organization, +): + 
"""Test the needs_update_on_launch property logic.""" + project = models.Project.objects.create( + name="test-project-needs-update", + url="https://git.example.com/repo.git", + organization=default_organization, + git_hash="abc123", + update_revision_on_launch=False, # Initially disabled + scm_update_cache_timeout=300, # 5 minutes cache + ) + + # Test 1: When update_revision_on_launch is False, should return False + assert project.needs_update_on_launch is False + + # Test 2: Enable update_revision_on_launch but no cache (timeout=0) + project.update_revision_on_launch = True + project.scm_update_cache_timeout = 0 + assert project.needs_update_on_launch is True + + # Test 3: With cache timeout but never synced, should return True + project.scm_update_cache_timeout = 300 + project.last_synced_at = None + assert project.needs_update_on_launch is True + + # Test 4: Recently synced within cache timeout, should return False + from django.utils import timezone + + project.last_synced_at = timezone.now() - datetime.timedelta( + seconds=60 + ) # 1 minute ago + assert project.needs_update_on_launch is False + + # Test 5: Synced outside cache timeout, should return True + project.last_synced_at = timezone.now() - datetime.timedelta( + seconds=600 + ) # 10 minutes ago + assert project.needs_update_on_launch is True + + +@pytest.mark.django_db +@mock.patch("aap_eda.api.views.project.check_default_worker_health") +def test_project_retrieve_includes_sync_fields( + mock_health_check, default_project: models.Project, admin_client: APIClient +): + """Test that project retrieve includes all sync configuration fields.""" + mock_health_check.return_value = True + + # Update project with sync configuration + default_project.update_revision_on_launch = True + default_project.scm_update_cache_timeout = 900 + default_project.last_synced_at = timezone.now() + default_project.save() + + response = admin_client.get(f"{api_url_v1}/projects/{default_project.id}/") + assert response.status_code == status.HTTP_200_OK + + data = response.json() + assert "update_revision_on_launch" in data + assert "scm_update_cache_timeout" in data + assert "last_synced_at" in data + + assert data["update_revision_on_launch"] is True + assert data["scm_update_cache_timeout"] == 900 + assert data["last_synced_at"] is not None + + +@pytest.mark.django_db +def test_project_model_default_sync_values( + default_organization: models.Organization, +): + """Test that sync fields have correct default values in model.""" + project = models.Project.objects.create( + name="test-project-model-defaults", + url="https://git.example.com/repo.git", + organization=default_organization, + git_hash="abc123", + ) + + assert project.update_revision_on_launch is False + assert project.scm_update_cache_timeout == 0 + assert project.last_synced_at is None + + +@pytest.mark.django_db +def test_project_model_cache_timeout_constraint( + default_organization: models.Organization, +): + """Test database constraint prevents negative cache timeout values.""" + from django.db import IntegrityError, transaction + + with pytest.raises(IntegrityError) as excinfo: + with transaction.atomic(): + models.Project.objects.create( + name="test-project-negative-timeout", + url="https://git.example.com/repo.git", + organization=default_organization, + git_hash="abc123", + scm_update_cache_timeout=-1, # Invalid negative value + ) + assert "ck_cache_timeout_range" in str(excinfo.value) + + +@pytest.mark.django_db +def test_project_model_cache_timeout_max_constraint( + 
default_organization: models.Organization, +): + """Test database constraint prevents cache timeout values > 86400.""" + from django.db import IntegrityError, transaction + + with pytest.raises(IntegrityError) as excinfo: + with transaction.atomic(): + models.Project.objects.create( + name="test-project-max-timeout", + url="https://git.example.com/repo.git", + organization=default_organization, + git_hash="abc123", + scm_update_cache_timeout=86401, # Invalid: > 86400 (1 day) + ) + assert "ck_cache_timeout_range" in str(excinfo.value) + + # Utils # ------------------------------------- DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" diff --git a/tests/integration/core/migrations/test_0069_activation_restart_and_sync_fields.py b/tests/integration/core/migrations/test_0069_activation_restart_and_sync_fields.py new file mode 100644 index 000000000..f5ef80245 --- /dev/null +++ b/tests/integration/core/migrations/test_0069_activation_restart_and_sync_fields.py @@ -0,0 +1,115 @@ +import hashlib + +import pytest +from django.core.management import call_command +from django.db import connection +from django.db.migrations.executor import MigrationExecutor + +from aap_eda.core import models + + +def _sha256(content): + return hashlib.sha256((content or "").encode("utf-8")).hexdigest() + + +def get_historical_model(app_label, model_name, migration): + """Get model as it existed at a specific migration.""" + executor = MigrationExecutor(connection) + state = executor.loader.project_state((app_label, migration)) + return state.apps.get_model(app_label, model_name) + + +@pytest.fixture +def rollback_migration(): + """Rollback to pre-0069 state and restore after test.""" + call_command("migrate", "core", "0068") + yield + call_command("migrate") + + +@pytest.mark.django_db(transaction=True) +def test_migration_populates_rulebook_hash( + rollback_migration, + default_organization, +): + """Migration computes SHA256 for existing Rulebook rows.""" + HistoricalRulebook = get_historical_model( # noqa: N806 + "core", + "Rulebook", + "0068_add_project_sync_fields", + ) + + content = "---\n- name: test\n hosts: all\n" + rb = HistoricalRulebook.objects.create( + name="test-rb", + rulesets=content, + organization_id=default_organization.id, + ) + + call_command("migrate", "core", "0069") + + rb_fresh = models.Rulebook.objects.get(pk=rb.pk) + assert rb_fresh.rulesets_sha256 == _sha256(content) + assert len(rb_fresh.rulesets_sha256) == 64 + + +@pytest.mark.django_db(transaction=True) +def test_migration_populates_activation_hash( + rollback_migration, + default_organization, +): + """Migration computes SHA256 for existing Activation rows.""" + HistoricalRulebook = get_historical_model( # noqa: N806 + "core", + "Rulebook", + "0068_add_project_sync_fields", + ) + HistoricalActivation = get_historical_model( # noqa: N806 + "core", + "Activation", + "0068_add_project_sync_fields", + ) + + content = "rule-content" + rb = HistoricalRulebook.objects.create( + name="test-rb", + rulesets=content, + organization_id=default_organization.id, + ) + act = HistoricalActivation.objects.create( + name="test-act", + rulebook_id=rb.pk, + rulebook_name=rb.name, + rulebook_rulesets=content, + organization_id=default_organization.id, + ) + + call_command("migrate", "core", "0069") + + act_fresh = models.Activation.objects.get(pk=act.pk) + assert act_fresh.rulebook_rulesets_sha256 == (_sha256(content)) + + +@pytest.mark.django_db(transaction=True) +def test_migration_handles_empty_rulesets( + rollback_migration, + default_organization, +): + 
"""Migration handles empty rulesets content.""" + HistoricalRulebook = get_historical_model( # noqa: N806 + "core", + "Rulebook", + "0068_add_project_sync_fields", + ) + + rb = HistoricalRulebook.objects.create( + name="empty-rb", + rulesets="", + organization_id=default_organization.id, + ) + + call_command("migrate", "core", "0069") + + rb_fresh = models.Rulebook.objects.get(pk=rb.pk) + assert rb_fresh.rulesets_sha256 == _sha256("") + assert len(rb_fresh.rulesets_sha256) == 64 diff --git a/tests/integration/tasks/test_project_dispatcherd.py b/tests/integration/tasks/test_project_dispatcherd.py index 40cfa9083..21c8359c5 100644 --- a/tests/integration/tasks/test_project_dispatcherd.py +++ b/tests/integration/tasks/test_project_dispatcherd.py @@ -382,22 +382,29 @@ def test_monitor_project_tasks_internal_no_unfinished_projects(mock_logger): # No projects exist, so no unfinished ones project._monitor_project_tasks() - # Should log start and complete - assert mock_logger.info.call_count == 2 + # Should log start, no stuck projects, and complete + assert mock_logger.info.call_count == 3 start_call = mock_logger.info.call_args_list[0][0][0] - complete_call = mock_logger.info.call_args_list[1][0][0] + no_stuck_call = mock_logger.info.call_args_list[1][0][0] + complete_call = mock_logger.info.call_args_list[2][0][0] assert "Task started: Monitor project tasks" in start_call + assert "No stuck projects found" in no_stuck_call assert "Task complete: Monitor project tasks" in complete_call @pytest.mark.django_db @patch("aap_eda.tasks.project.logger") -def test_monitor_project_tasks_internal_with_unfinished_projects( +def test_monitor_project_tasks_internal_with_recent_unfinished_projects( mock_logger, default_organization ): - """Test _monitor_project_tasks with unfinished projects.""" - # Create projects in transition states + """Test _monitor_project_tasks ignores recently created projects. + + Projects in transition states (PENDING/RUNNING) that were recently + modified should not be recovered -- they may still be processing. + Only projects older than the timeout threshold are recovered. 
+ """ + # Create projects in transition states (recently modified) models.Project.objects.create( name="pending-project", url="https://github.com/test/pending.git", @@ -419,14 +426,14 @@ def test_monitor_project_tasks_internal_with_unfinished_projects( project._monitor_project_tasks() - # Should log start, unfinished count, and complete + # Recently created projects are not recovered; + # should log start, no stuck projects, and complete assert mock_logger.info.call_count == 3 start_call = mock_logger.info.call_args_list[0][0][0] - unfinished_call = mock_logger.info.call_args_list[1][0][0] + no_stuck_call = mock_logger.info.call_args_list[1][0][0] complete_call = mock_logger.info.call_args_list[2][0][0] assert "Task started: Monitor project tasks" in start_call - assert "Found 2 projects in transition states" in unfinished_call - assert "Dispatcherd handles task recovery" in unfinished_call + assert "No stuck projects found" in no_stuck_call assert "Task complete: Monitor project tasks" in complete_call diff --git a/tests/integration/tasks/test_projects.py b/tests/integration/tasks/test_projects.py index a0feaa95c..4c2586295 100644 --- a/tests/integration/tasks/test_projects.py +++ b/tests/integration/tasks/test_projects.py @@ -14,11 +14,31 @@ """Tests for project task monitoring functionality with dispatcherd.""" +from datetime import timedelta from unittest.mock import Mock, patch import pytest +from django.core.exceptions import ObjectDoesNotExist +from django.db import DatabaseError +from django.utils import timezone +from aap_eda.core import models +from aap_eda.core.enums import ActivationStatus +from aap_eda.core.utils.rulebook import get_rulebook_hash +from aap_eda.services.project import ProjectImportError from aap_eda.tasks.project import ( + _auto_restart_activations, + _get_project_safely, + _handle_post_sync_activations, + _handle_project_error_recovery, + _handle_sync_failure_activations, + _import_project_no_lock, + _monitor_project_tasks, + _recover_orphaned_awaiting_activations, + _restart_activation, + _resume_waiting_activations, + _sync_project_no_lock, + _update_activation_content, check_default_worker_health, monitor_project_tasks, ) @@ -166,3 +186,1704 @@ def test_check_default_worker_health_sanitizes_queue_name( assert result is True mock_sanitize.assert_called_once_with("default") mock_check_rulebook_health.assert_called_once_with("sanitized_default") + + +# Tests for new critical exception handling functions (AAP-64076) + + +@pytest.mark.django_db +def test_get_project_safely_success(default_organization): + """Test _get_project_safely returns project when it exists.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + + result = _get_project_safely(project.id) + + assert result is not None + assert result.id == project.id + assert result.name == "Test Project" + + +@pytest.mark.django_db +def test_get_project_safely_does_not_exist(): + """Test _get_project_safely returns None for non-existent project.""" + non_existent_id = 99999 + + result = _get_project_safely(non_existent_id) + + assert result is None + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.models.Project.objects.get") +def test_get_project_safely_database_error_propagates(mock_get): + """Test _get_project_safely lets DatabaseError propagate to callers.""" + mock_get.side_effect = DatabaseError("Connection lost") + + with pytest.raises(DatabaseError): + _get_project_safely(123) + + 
+@pytest.mark.django_db +@patch("aap_eda.tasks.project.models.Project.objects.get") +def test_get_project_safely_generic_exception_propagates(mock_get): + """Test _get_project_safely lets generic exceptions propagate.""" + mock_get.side_effect = RuntimeError("Unexpected error") + + with pytest.raises(RuntimeError): + _get_project_safely(123) + + +@pytest.mark.django_db +def test_handle_project_error_recovery_success(default_organization): + """Test _handle_project_error_recovery resets project state.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + + _handle_project_error_recovery(project.id, "Test error message") + + project.refresh_from_db() + assert project.import_state == models.Project.ImportState.FAILED + assert project.import_error == "Test error message" + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.logger") +def test_handle_project_error_recovery_project_deleted( + mock_logger, default_organization +): + """Test _handle_project_error_recovery handles deleted projects.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + + project_id = project.id + project.delete() + + _handle_project_error_recovery(project_id, "Test error") + + # Should log a warning about deleted project + mock_logger.warning.assert_called_once() + log_call = mock_logger.warning.call_args + assert "was deleted during error recovery" in log_call[0][0] + + +@pytest.mark.django_db +def test_handle_project_error_recovery_completed_project( + default_organization, +): + """Test _handle_project_error_recovery skips completed projects.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.COMPLETED, + import_error="", + ) + + _handle_project_error_recovery(project.id, "Test error message") + + project.refresh_from_db() + # Should not change completed project + assert project.import_state == models.Project.ImportState.COMPLETED + assert project.import_error == "" + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project.logger") +def test_import_project_no_lock_success( + mock_logger, mock_service, mock_get_project, default_organization +): + """Test _import_project_no_lock handles successful import.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_import_service = Mock() + mock_service.return_value = mock_import_service + + _import_project_no_lock(project.id) + + mock_get_project.assert_called_once_with(project.id) + mock_import_service.import_project.assert_called_once_with(project) + project.refresh_from_db() + assert project.last_synced_at is not None + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.logger") +def test_import_project_no_lock_project_not_found( + mock_logger, mock_get_project +): + """Test _import_project_no_lock handles missing project gracefully.""" + mock_get_project.return_value = None + + _import_project_no_lock(999) + + 
mock_get_project.assert_called_once_with(999) + # Should log task started and completed with project not found message + assert mock_logger.info.call_count == 2 + start_call = mock_logger.info.call_args_list[0] + complete_call = mock_logger.info.call_args_list[1] + assert "Task started: Import project" in start_call[0][0] + assert "project not found" in complete_call[0][0] + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_import_project_no_lock_project_import_error( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + default_organization, +): + """Test _import_project_no_lock handles ProjectImportError.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_import_service = Mock() + mock_import_service.import_project.side_effect = ProjectImportError( + "Import failed" + ) + mock_service.return_value = mock_import_service + + _import_project_no_lock(project.id) + + mock_import_service.import_project.assert_called_once_with(project) + mock_logger.error.assert_called_once() + log_call = mock_logger.error.call_args + assert "Project import error for project" in log_call[0][0] + # Should call error recovery to set FAILED state + # (transaction.atomic rollback undoes the wrapper's state save) + mock_recovery.assert_called_once_with( + project.id, "Import failed: Import failed" + ) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_import_project_no_lock_database_error( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + default_organization, +): + """Test _import_project_no_lock handles database errors.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_import_service = Mock() + mock_import_service.import_project.side_effect = DatabaseError("DB error") + mock_service.return_value = mock_import_service + + _import_project_no_lock(project.id) + + mock_recovery.assert_called_once_with( + project.id, "Database error during import" + ) + mock_logger.error.assert_called_once() + log_call = mock_logger.error.call_args + assert "Database error during project import" in log_call[0][0] + assert log_call[1]["exc_info"] is True + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_post_sync_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project.logger") +def test_sync_project_no_lock_success( + mock_logger, + mock_service, + mock_get_project, + mock_post_sync, + default_organization, +): + """Test _sync_project_no_lock handles successful sync.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_sync_service = Mock() + mock_service.return_value = mock_sync_service + + _sync_project_no_lock(project.id) + + 
mock_get_project.assert_called_once_with(project.id) + mock_sync_service.sync_project.assert_called_once_with(project) + project.refresh_from_db() + assert project.last_synced_at is not None + mock_post_sync.assert_called_once_with(project) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +def test_monitor_project_tasks_recovers_stuck_projects( + mock_settings, default_organization +): + """Test _monitor_project_tasks recovers stuck RUNNING projects.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + # Create a project stuck in RUNNING state for too long + old_time = timezone.now() - timedelta(seconds=700) # Beyond 2x timeout + project = models.Project.objects.create( + name="Stuck Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + # Manually set modified_at to simulate old timestamp + models.Project.objects.filter(id=project.id).update(modified_at=old_time) + + _monitor_project_tasks() + + project.refresh_from_db() + assert project.import_state == models.Project.ImportState.FAILED + assert "Task appears to have been abandoned" in project.import_error + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +def test_monitor_project_tasks_recovers_stuck_pending( + mock_settings, default_organization +): + """Test _monitor_project_tasks recovers stuck PENDING projects.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + # Create a project stuck in PENDING state for too long + old_time = timezone.now() - timedelta(seconds=700) + project = models.Project.objects.create( + name="Stuck Pending Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.PENDING, + ) + models.Project.objects.filter(id=project.id).update(modified_at=old_time) + + _monitor_project_tasks() + + project.refresh_from_db() + assert project.import_state == models.Project.ImportState.FAILED + assert "Task was stuck in pending state" in project.import_error + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +def test_monitor_project_tasks_ignores_recent_projects( + mock_settings, default_organization +): + """Test _monitor_project_tasks ignores recently modified projects.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + project = models.Project.objects.create( + name="Recent Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + + _monitor_project_tasks() + + project.refresh_from_db() + # Should still be running as it's recent + assert project.import_state == models.Project.ImportState.RUNNING + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +@patch("aap_eda.tasks.project.logger") +def test_monitor_project_tasks_handles_concurrent_access( + mock_logger, mock_settings, default_organization +): + """Test _monitor_project_tasks handles race conditions gracefully.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + # Create a project that will be deleted during monitoring + old_time = timezone.now() - timedelta(seconds=700) + project = models.Project.objects.create( + name="Soon to be deleted", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + models.Project.objects.filter(id=project.id).update(modified_at=old_time) + + # Mock the project to be already completed by the time we 
try to recover it + with patch( + "aap_eda.tasks.project.models.Project.objects.select_for_update" + ) as mock_select: + mock_queryset = Mock() + mock_select.return_value = mock_queryset + fresh_project = Mock() + fresh_project.import_state = models.Project.ImportState.COMPLETED + mock_queryset.get.return_value = fresh_project + + _monitor_project_tasks() + + # Should not save the project if it's already completed + fresh_project.save.assert_not_called() + + +# ------------------------------------------------------------------ +# Additional coverage: _import_project_no_lock generic Exception +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_import_project_no_lock_unexpected_exception( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + default_organization, +): + """Test _import_project_no_lock handles unexpected exceptions.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_import_service = Mock() + mock_import_service.import_project.side_effect = RuntimeError( + "Something broke" + ) + mock_service.return_value = mock_import_service + + _import_project_no_lock(project.id) + + mock_recovery.assert_called_once_with( + project.id, + "Unexpected error during import: Something broke", + ) + mock_logger.error.assert_called_once() + log_call = mock_logger.error.call_args + assert "Unexpected error during project import" in log_call[0][0] + assert log_call[1]["exc_info"] is True + + +# ------------------------------------------------------------------ +# Additional coverage: _sync_project_no_lock all error paths +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_sync_failure_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.logger") +def test_sync_project_no_lock_project_not_found( + mock_logger, mock_get_project, mock_failure_handler +): + """Test _sync_project_no_lock handles missing project.""" + mock_get_project.return_value = None + + _sync_project_no_lock(999) + + mock_get_project.assert_called_once_with(999) + info_messages = [c[0][0] for c in mock_logger.info.call_args_list] + assert any("project not found" in m for m in info_messages) + mock_failure_handler.assert_called_once_with( + 999, "Project not found or deleted" + ) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_sync_failure_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_sync_project_no_lock_project_import_error( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + mock_failure_handler, + default_organization, +): + """Test _sync_project_no_lock handles ProjectImportError.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_sync_service = Mock() + mock_sync_service.sync_project.side_effect = 
ProjectImportError( + "Sync failed" + ) + mock_service.return_value = mock_sync_service + + _sync_project_no_lock(project.id) + + mock_logger.error.assert_called_once() + log_call = mock_logger.error.call_args + assert "Project sync error for project" in log_call[0][0] + mock_recovery.assert_called_once_with( + project.id, "Sync failed: Sync failed" + ) + mock_failure_handler.assert_called_once_with( + project.id, "Sync failed: Sync failed" + ) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_sync_failure_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_sync_project_no_lock_database_error( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + mock_failure_handler, + default_organization, +): + """Test _sync_project_no_lock handles database errors.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_sync_service = Mock() + mock_sync_service.sync_project.side_effect = DatabaseError("DB error") + mock_service.return_value = mock_sync_service + + _sync_project_no_lock(project.id) + + mock_recovery.assert_called_once_with( + project.id, "Database error during sync" + ) + mock_failure_handler.assert_called_once_with( + project.id, "Database error during sync" + ) + mock_logger.error.assert_called_once() + assert mock_logger.error.call_args[1]["exc_info"] is True + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_sync_failure_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.ProjectImportService") +@patch("aap_eda.tasks.project._handle_project_error_recovery") +@patch("aap_eda.tasks.project.logger") +def test_sync_project_no_lock_unexpected_exception( + mock_logger, + mock_recovery, + mock_service, + mock_get_project, + mock_failure_handler, + default_organization, +): + """Test _sync_project_no_lock handles unexpected exceptions.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + mock_get_project.return_value = project + mock_sync_service = Mock() + mock_sync_service.sync_project.side_effect = RuntimeError( + "Something broke" + ) + mock_service.return_value = mock_sync_service + + _sync_project_no_lock(project.id) + + mock_recovery.assert_called_once_with( + project.id, + "Unexpected error during sync: Something broke", + ) + mock_failure_handler.assert_called_once_with( + project.id, + "Unexpected error during sync: Something broke", + ) + mock_logger.error.assert_called_once() + assert mock_logger.error.call_args[1]["exc_info"] is True + + +# ------------------------------------------------------------------ +# Additional coverage: _recover_stuck_projects error paths +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +@patch("aap_eda.tasks.project.logger") +def test_recover_stuck_projects_deleted_during_recovery( + mock_logger, mock_settings, default_organization +): + """Test _recover_stuck_projects handles project deleted mid-recovery.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + old_time = timezone.now() - timedelta(seconds=700) + project = 
models.Project.objects.create( + name="Will be deleted", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + models.Project.objects.filter(id=project.id).update(modified_at=old_time) + + # Simulate deletion between queryset iteration and select_for_update + with patch( + "aap_eda.tasks.project.models.Project.objects" ".select_for_update" + ) as mock_select: + mock_qs = Mock() + mock_select.return_value = mock_qs + mock_qs.get.side_effect = ObjectDoesNotExist("Project not found") + + _monitor_project_tasks() + + warning_calls = [c[0][0] for c in mock_logger.warning.call_args_list] + assert any("was deleted" in msg for msg in warning_calls) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.settings") +@patch("aap_eda.tasks.project.logger") +def test_recover_stuck_projects_database_error( + mock_logger, mock_settings, default_organization +): + """Test _recover_stuck_projects handles DatabaseError during recovery.""" + mock_settings.DISPATCHERD_PROJECT_TASK_TIMEOUT = 300 + + old_time = timezone.now() - timedelta(seconds=700) + project = models.Project.objects.create( + name="DB Error Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + models.Project.objects.filter(id=project.id).update(modified_at=old_time) + + with patch( + "aap_eda.tasks.project.models.Project.objects" ".select_for_update" + ) as mock_select: + mock_qs = Mock() + mock_select.return_value = mock_qs + mock_qs.get.side_effect = DatabaseError("Connection lost") + + _monitor_project_tasks() + + error_calls = [c[0][0] for c in mock_logger.error.call_args_list] + assert any("Failed to recover project" in msg for msg in error_calls) + + +# ------------------------------------------------------------------ +# Additional coverage: _handle_project_error_recovery +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +def test_handle_project_error_recovery_pending_state( + default_organization, +): + """Test _handle_project_error_recovery resets PENDING projects.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.PENDING, + ) + + _handle_project_error_recovery(project.id, "Error in pending state") + + project.refresh_from_db() + assert project.import_state == models.Project.ImportState.FAILED + assert project.import_error == "Error in pending state" + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.logger") +def test_handle_project_error_recovery_database_error( + mock_logger, default_organization +): + """Test _handle_project_error_recovery handles DatabaseError.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + + with patch( + "aap_eda.tasks.project.models.Project.objects" ".select_for_update" + ) as mock_select: + mock_qs = Mock() + mock_select.return_value = mock_qs + mock_qs.get.side_effect = DatabaseError("Connection lost") + + _handle_project_error_recovery(project.id, "Test error") + + # Should log at CRITICAL level + mock_logger.critical.assert_called_once() + log_call = mock_logger.critical.call_args + assert "Failed to reset project" in log_call[0][0] + assert "project may be stuck" in log_call[0][0] + 
assert log_call[1]["exc_info"] is True + + +# ------------------------------------------------------------------ +# Post-sync activation handling tests +# ------------------------------------------------------------------ + + +def _create_test_rulebook(project, organization, **kwargs): + """Helper to create a Rulebook with SHA256 auto-computed.""" + defaults = { + "name": "test-rulebook", + "rulesets": "", + "project": project, + "organization": organization, + } + defaults.update(kwargs) + defaults["rulesets_sha256"] = get_rulebook_hash(defaults["rulesets"]) + return models.Rulebook.objects.create(**defaults) + + +def _create_test_activation(project, organization, rulebook, **kwargs): + """Helper to create test activations with defaults.""" + rulesets = kwargs.get("rulebook_rulesets", rulebook.rulesets or "") + defaults = { + "name": f"test-activation-{project.id}", + "project": project, + "organization": organization, + "rulebook": rulebook, + "rulebook_name": rulebook.name, + "rulebook_rulesets": rulesets, + "rulebook_rulesets_sha256": get_rulebook_hash(rulesets), + "is_enabled": True, + "status": ActivationStatus.RUNNING, + } + defaults.update(kwargs) + return models.Activation.objects.create(**defaults) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_activations_content_changed( + mock_restart, + default_organization, +): + """Test auto-restart triggers when rulebook content changes.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc123", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="new-content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="auto-restart-activation", + restart_on_project_update=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + ) + + _auto_restart_activations(project) + + activation.refresh_from_db() + assert activation.rulebook_rulesets == "new-content" + assert activation.git_hash == "abc123" + mock_restart.assert_called_once() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_skips_activation_with_source_mappings( + mock_restart, + default_organization, +): + """Auto-restart skips activations with source_mappings.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc123", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="new-content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="source-mapped-activation", + restart_on_project_update=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + source_mappings="[{source: src1, event_stream: es1}]", + ) + + _auto_restart_activations(project) + + activation.refresh_from_db() + # Content and SHA256 should remain frozen + assert activation.rulebook_rulesets == "old-content" + assert activation.rulebook_rulesets_sha256 == get_rulebook_hash( + "old-content" + ) + assert activation.git_hash == "old-hash" + mock_restart.assert_not_called() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_works_without_source_mappings( + mock_restart, + default_organization, +): + """Auto-restart still works for activations without 
source_mappings.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc123", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="new-content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="no-source-mapping-activation", + restart_on_project_update=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + ) + + _auto_restart_activations(project) + + activation.refresh_from_db() + assert activation.rulebook_rulesets == "new-content" + assert activation.git_hash == "abc123" + mock_restart.assert_called_once() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_mixed_source_mappings_activations( + mock_restart, + default_organization, +): + """Skip only applies to source_mappings activation in loop.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc123", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="new-content", + ) + mapped = _create_test_activation( + project, + default_organization, + rulebook, + name="mapped-activation", + restart_on_project_update=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + source_mappings="[{source: src1, event_stream: es1}]", + ) + unmapped = _create_test_activation( + project, + default_organization, + rulebook, + name="unmapped-activation", + restart_on_project_update=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + ) + + _auto_restart_activations(project) + + mapped.refresh_from_db() + assert mapped.rulebook_rulesets == "old-content" + assert mapped.git_hash == "old-hash" + + unmapped.refresh_from_db() + assert unmapped.rulebook_rulesets == "new-content" + assert unmapped.git_hash == "abc123" + + mock_restart.assert_called_once() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_skips_unchanged_content( + mock_restart, + default_organization, +): + """Test auto-restart skips when content unchanged.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="same-hash", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="same-content", + ) + _create_test_activation( + project, + default_organization, + rulebook, + name="no-restart-activation", + restart_on_project_update=True, + rulebook_rulesets="same-content", + git_hash="same-hash", + ) + + _auto_restart_activations(project) + + mock_restart.assert_not_called() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.start_rulebook_process") +def test_resume_waiting_activations_enable( + mock_start, + default_organization, +): + """Test resume enables disabled activations waiting for sync.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="waiting-activation", + is_enabled=False, + awaiting_project_sync=True, + status=ActivationStatus.STOPPED, + ) + + _resume_waiting_activations(project) + + 
activation.refresh_from_db() + assert activation.is_enabled is True + assert activation.awaiting_project_sync is False + assert activation.status == ActivationStatus.PENDING + mock_start.assert_called_once() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_resume_waiting_activations_restart( + mock_restart, + default_organization, +): + """Test resume restarts enabled activations waiting for sync.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="restart-waiting", + is_enabled=True, + awaiting_project_sync=True, + ) + + _resume_waiting_activations(project) + + activation.refresh_from_db() + assert activation.awaiting_project_sync is False + mock_restart.assert_called_once() + + +@pytest.mark.django_db +def test_handle_sync_failure_sets_error( + default_organization, +): + """Test sync failure sets ERROR on waiting activations.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="failing-activation", + awaiting_project_sync=True, + ) + + _handle_sync_failure_activations(project.id, "Sync failed: error") + + activation.refresh_from_db() + assert activation.status == ActivationStatus.ERROR + assert "Sync failed" in activation.status_message + assert activation.awaiting_project_sync is False + + +@pytest.mark.django_db +def test_handle_sync_failure_ignores_non_waiting( + default_organization, +): + """Test sync failure ignores activations not awaiting sync.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="not-waiting", + awaiting_project_sync=False, + ) + + _handle_sync_failure_activations(project.id, "Sync failed") + + activation.refresh_from_db() + assert activation.status == ActivationStatus.RUNNING + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_excludes_awaiting_sync( + mock_restart, + default_organization, +): + """Auto-restart skips activations awaiting sync.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc123", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="new-content", + ) + _create_test_activation( + project, + default_organization, + rulebook, + name="both-flags", + restart_on_project_update=True, + awaiting_project_sync=True, + rulebook_rulesets="old-content", + git_hash="old-hash", + ) + + _auto_restart_activations(project) + + mock_restart.assert_not_called() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._handle_sync_failure_activations") +@patch("aap_eda.tasks.project._get_project_safely") +@patch("aap_eda.tasks.project.logger") +def 
test_sync_project_no_lock_deleted_project_cleans_up( + mock_logger, + mock_get_project, + mock_failure_handler, +): + """Deleted project clears waiting activations.""" + mock_get_project.return_value = None + + _sync_project_no_lock(999) + + mock_failure_handler.assert_called_once_with( + 999, "Project not found or deleted" + ) + + +@pytest.mark.django_db +def test_recover_orphaned_awaiting_completed_project( + default_organization, +): + """Recovery resumes activations when project completed.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.COMPLETED, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + _create_test_activation( + project, + default_organization, + rulebook, + name="orphaned-completed", + is_enabled=False, + awaiting_project_sync=True, + status=ActivationStatus.PENDING, + ) + + with patch( + "aap_eda.tasks.project._resume_waiting_activations" + ) as mock_resume: + recovered = _recover_orphaned_awaiting_activations() + + assert recovered == 1 + mock_resume.assert_called_once_with(project) + + +@pytest.mark.django_db +def test_recover_orphaned_awaiting_failed_project( + default_organization, +): + """Recovery sets ERROR when project failed.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.FAILED, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="orphaned-failed", + awaiting_project_sync=True, + ) + + recovered = _recover_orphaned_awaiting_activations() + + assert recovered == 1 + activation.refresh_from_db() + assert activation.status == ActivationStatus.ERROR + assert activation.awaiting_project_sync is False + assert "monitoring system" in activation.status_message + + +@pytest.mark.django_db +def test_recover_orphaned_skips_actively_syncing( + default_organization, +): + """Recovery skips activations whose project is still syncing.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + import_state=models.Project.ImportState.RUNNING, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + _create_test_activation( + project, + default_organization, + rulebook, + name="still-syncing", + awaiting_project_sync=True, + ) + + recovered = _recover_orphaned_awaiting_activations() + + assert recovered == 0 + + +# ------------------------------------------------------------------ +# Additional coverage: error paths and edge cases +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_restart_activation_failure_sets_error( + mock_restart, + default_organization, +): + """Failed restart sets activation to ERROR status.""" + mock_restart.side_effect = RuntimeError("worker down") + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + 
default_organization, + rulebook, + name="failing-restart", + ) + + _restart_activation(activation) + + activation.refresh_from_db() + assert activation.status == ActivationStatus.ERROR + assert "Auto-restart failed" in activation.status_message + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_hash_only_change_no_restart( + mock_restart, + default_organization, +): + """Hash-only change updates metadata but does not restart.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="new-hash", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="same-content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="hash-only", + restart_on_project_update=True, + rulebook_rulesets="same-content", + git_hash="old-hash", + ) + + _auto_restart_activations(project) + + activation.refresh_from_db() + assert activation.git_hash == "new-hash" + assert activation.rulebook_rulesets == "same-content" + mock_restart.assert_not_called() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_hash_only_change_updates_source_mappings_activation( + mock_restart, + default_organization, +): + """Hash-only change updates metadata for source_mappings activation.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="new-hash", + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="same-content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="hash-only-mapped", + restart_on_project_update=True, + rulebook_rulesets="same-content", + git_hash="old-hash", + source_mappings="[{source: src1, event_stream: es1}]", + ) + + _auto_restart_activations(project) + + activation.refresh_from_db() + # git_hash should be updated (hash-only, no content change) + assert activation.git_hash == "new-hash" + assert activation.rulebook_rulesets == "same-content" + # No restart for hash-only change + mock_restart.assert_not_called() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.start_rulebook_process") +def test_resume_multiple_activations_same_project( + mock_start, + default_organization, +): + """Resume handles multiple waiting activations for one project.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + act1 = _create_test_activation( + project, + default_organization, + rulebook, + name="waiting-1", + is_enabled=False, + awaiting_project_sync=True, + status=ActivationStatus.PENDING, + ) + act2 = _create_test_activation( + project, + default_organization, + rulebook, + name="waiting-2", + is_enabled=False, + awaiting_project_sync=True, + status=ActivationStatus.PENDING, + ) + + _resume_waiting_activations(project) + + act1.refresh_from_db() + act2.refresh_from_db() + assert act1.is_enabled is True + assert act1.awaiting_project_sync is False + assert act2.is_enabled is True + assert act2.awaiting_project_sync is False + assert mock_start.call_count == 2 + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.start_rulebook_process") +def 
test_resume_start_failure_sets_error( + mock_start, + default_organization, +): + """Resume sets ERROR when start_rulebook_process throws.""" + mock_start.side_effect = RuntimeError("queue full") + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + rulebook = _create_test_rulebook( + project, + default_organization, + rulesets="content", + ) + activation = _create_test_activation( + project, + default_organization, + rulebook, + name="resume-fails", + is_enabled=False, + awaiting_project_sync=True, + status=ActivationStatus.PENDING, + ) + + _resume_waiting_activations(project) + + activation.refresh_from_db() + assert activation.status == ActivationStatus.ERROR + assert activation.awaiting_project_sync is False + assert "Failed to start" in activation.status_message + + +# ------------------------------------------------------------------ +# Coverage: error handling paths +# ------------------------------------------------------------------ + + +@pytest.mark.django_db +def test_needs_project_update_on_launch_no_project( + default_organization, +): + """Property returns False when activation has no project.""" + rulebook = _create_test_rulebook( + None, default_organization, rulesets="content" + ) + activation = models.Activation.objects.create( + name="no-project", + rulebook=rulebook, + rulebook_name=rulebook.name, + rulebook_rulesets="content", + organization=default_organization, + project=None, + ) + assert activation.needs_project_update_on_launch is False + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._resume_waiting_activations") +@patch("aap_eda.tasks.project._auto_restart_activations") +def test_post_sync_isolates_auto_restart_failure( + mock_auto_restart, + mock_resume, + default_organization, +): + """Resume still runs when auto-restart throws.""" + mock_auto_restart.side_effect = RuntimeError("boom") + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + + _handle_post_sync_activations(project) + + mock_auto_restart.assert_called_once() + mock_resume.assert_called_once_with(project) + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project._resume_waiting_activations") +@patch("aap_eda.tasks.project._auto_restart_activations") +def test_post_sync_handles_resume_failure( + mock_auto_restart, + mock_resume, + default_organization, +): + """Post-sync logs error when resume throws.""" + mock_resume.side_effect = RuntimeError("db down") + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + ) + + _handle_post_sync_activations(project) + + mock_auto_restart.assert_called_once() + mock_resume.assert_called_once() + + +@pytest.mark.django_db +@patch("aap_eda.tasks.project.restart_rulebook_process") +def test_auto_restart_rulebook_deleted( + mock_restart, + default_organization, +): + """Auto-restart handles deleted rulebook gracefully.""" + project = models.Project.objects.create( + name="Test Project", + url="https://github.com/example/repo", + organization=default_organization, + git_hash="abc", + ) + rulebook = _create_test_rulebook( + project, default_organization, rulesets="content" + ) + _create_test_activation( + project, + default_organization, + rulebook, + name="deleted-rb", + restart_on_project_update=True, + ) + # Delete the rulebook after creating activation + rulebook.delete() + + 
+    _auto_restart_activations(project)
+
+    mock_restart.assert_not_called()
+
+
+@pytest.mark.django_db
+@patch("aap_eda.tasks.project.restart_rulebook_process")
+def test_auto_restart_generic_exception(
+    mock_restart,
+    default_organization,
+):
+    """Auto-restart handles generic exception per activation."""
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+        git_hash="abc",
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="new"
+    )
+    _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="error-activation",
+        restart_on_project_update=True,
+        rulebook_rulesets="old",
+    )
+    # Make Rulebook.objects.get raise unexpected error
+    with patch(
+        "aap_eda.tasks.project.models.Rulebook.objects.get",
+        side_effect=RuntimeError("unexpected"),
+    ):
+        _auto_restart_activations(project)
+
+    mock_restart.assert_not_called()
+
+
+@pytest.mark.django_db
+@patch("aap_eda.tasks.project.restart_rulebook_process")
+def test_restart_activation_save_error_after_failure(
+    mock_restart,
+    default_organization,
+):
+    """Secondary save failure in _restart_activation is logged."""
+    mock_restart.side_effect = RuntimeError("worker down")
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="content"
+    )
+    activation = _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="double-fail",
+    )
+
+    with patch.object(
+        type(activation),
+        "save",
+        side_effect=RuntimeError("save failed"),
+    ):
+        _restart_activation(activation)
+
+    # Should not raise despite double failure
+
+
+@pytest.mark.django_db
+@patch("aap_eda.tasks.project.restart_rulebook_process")
+def test_restart_activation_count_save_failure(
+    mock_restart,
+    default_organization,
+):
+    """Restart count save failure doesn't set ERROR."""
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="content"
+    )
+    activation = _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="count-fail",
+    )
+
+    original_save = activation.save
+
+    call_count = 0
+
+    def fail_on_second_save(**kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count > 1:
+            raise RuntimeError("count save failed")
+        original_save(**kwargs)
+
+    with patch.object(
+        type(activation), "save", side_effect=fail_on_second_save
+    ):
+        _restart_activation(activation)
+
+    # Restart succeeded, count save failed but no ERROR
+    mock_restart.assert_called_once()
+
+
+@pytest.mark.django_db
+def test_update_activation_content_rulebook_deleted(
+    default_organization,
+):
+    """Content update handles deleted rulebook."""
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+        git_hash="new-hash",
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="content"
+    )
+    activation = _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="deleted-rb-resume",
+    )
+    rulebook.delete()
+
+    _update_activation_content(activation, project)
+
+    # git_hash updated, rulesets kept as-is
+    assert activation.git_hash == "new-hash"
+    assert activation.rulebook_rulesets == "content"
+
+
+@pytest.mark.django_db
+def test_handle_sync_failure_per_activation_error(
+    default_organization,
+):
+    """Sync failure handles per-activation save error."""
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="content"
+    )
+    _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="save-error",
+        awaiting_project_sync=True,
+    )
+
+    with patch(
+        "aap_eda.tasks.project.models.Activation.objects.filter"
+    ) as mock_filter:
+        mock_qs = Mock()
+        mock_filter.return_value = mock_qs
+        mock_act = Mock()
+        mock_act.name = "save-error"
+        mock_act.save.side_effect = RuntimeError("save err")
+        mock_qs.__iter__ = Mock(return_value=iter([mock_act]))
+
+        _handle_sync_failure_activations(project.id, "sync failed")
+
+    # Should not raise
+
+
+@pytest.mark.django_db
+def test_handle_sync_failure_top_level_error(
+    default_organization,
+):
+    """Sync failure handles top-level query error."""
+    with patch(
+        "aap_eda.tasks.project.models.Activation.objects.filter",
+        side_effect=RuntimeError("db error"),
+    ):
+        _handle_sync_failure_activations(999, "sync failed")
+
+    # Should not raise
+
+
+@pytest.mark.django_db
+def test_recover_orphaned_exception_handling(
+    default_organization,
+):
+    """Recovery handles exception during activation fix."""
+    project = models.Project.objects.create(
+        name="Test Project",
+        url="https://github.com/example/repo",
+        organization=default_organization,
+        import_state=models.Project.ImportState.FAILED,
+    )
+    rulebook = _create_test_rulebook(
+        project, default_organization, rulesets="content"
+    )
+    activation = _create_test_activation(
+        project,
+        default_organization,
+        rulebook,
+        name="recover-error",
+        awaiting_project_sync=True,
+    )
+
+    with patch.object(
+        models.Activation, "save", side_effect=RuntimeError("err")
+    ):
+        recovered = _recover_orphaned_awaiting_activations()
+
+    assert recovered == 0
+    activation.refresh_from_db()
+    # Still stuck because save failed
+    assert activation.awaiting_project_sync is True