Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/aap_eda/analytics/analytics_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,9 @@ def projects_table(
def rulebooks_table(
    since: datetime, full_path: str, until: datetime, **kwargs
) -> list[str]:
    """Export rulebook rows changed between *since* and *until*.

    Defers loading of the ``rulesets_sha256`` column — presumably the
    content hash is not part of the analytics export payload, so
    skipping it avoids fetching the extra field per row (confirm the
    export schema if this changes).
    """
    # NOTE: a single assignment here — the pre-diff duplicate line that
    # built the queryset without .defer() was dead code (its result was
    # immediately overwritten) and has been removed.
    queryset = _get_query(models.Rulebook.objects, since, until).defer(
        "rulesets_sha256"
    )

    return _copy_table("rulebooks", queryset, full_path)

Expand Down
47 changes: 45 additions & 2 deletions src/aap_eda/api/serializers/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class Meta:
"is_enabled",
"status",
"git_hash",
"restart_on_project_update",
"extra_var",
"decision_environment_id",
"project_id",
Expand Down Expand Up @@ -341,6 +342,7 @@ class Meta:
"description",
"is_enabled",
"status",
"restart_on_project_update",
"extra_var",
"decision_environment_id",
"project_id",
Expand Down Expand Up @@ -406,6 +408,9 @@ def to_representation(self, activation):
"rulebook_id": activation.rulebook_id,
"extra_var": extra_var,
"organization_id": activation.organization_id,
"restart_on_project_update": (
activation.restart_on_project_update
),
"restart_policy": activation.restart_policy,
"restart_count": activation.restart_count,
"rulebook_name": activation.rulebook_name,
Expand Down Expand Up @@ -441,6 +446,7 @@ class Meta:
"name",
"description",
"is_enabled",
"restart_on_project_update",
"decision_environment_id",
"rulebook_id",
"extra_var",
Expand Down Expand Up @@ -509,6 +515,9 @@ def create(self, validated_data):
validated_data["user_id"] = validated_data["user"].id
validated_data["rulebook_name"] = rulebook.name
validated_data["rulebook_rulesets"] = rulebook.rulesets
validated_data["rulebook_rulesets_sha256"] = get_rulebook_hash(
rulebook.rulesets
)
validated_data["git_hash"] = rulebook.project.git_hash
validated_data["project_id"] = rulebook.project.id
validated_data["log_tracking_id"] = str(uuid.uuid4())
Expand Down Expand Up @@ -572,6 +581,7 @@ def copy(self) -> dict:
"skip_audit_events": activation.skip_audit_events,
"rulebook_name": activation.rulebook.name,
"rulebook_rulesets": activation.rulebook_rulesets,
"rulebook_rulesets_sha256": activation.rulebook_rulesets_sha256,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the revision hash? If so, is this what controller calls it? It doesn't seem familiar to me as a field that controller sends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is not the Git revision hash (that's git_hash). rulebook_rulesets_sha256 is a SHA256 hash of the rulebook's YAML rulesets content, computed locally by EDA using get_rulebook_hash(). It's not sent by controller — it's generated at activation create/update time and stored so we can later detect if the upstream rulebook content changed after a project sync. The naming follows the existing rulebook_rulesets field, with _sha256 appended to indicate it's a content hash of that field.

"git_hash": activation.rulebook.project.git_hash,
"project": activation.rulebook.project,
"log_tracking_id": str(uuid.uuid4()),
Expand Down Expand Up @@ -602,6 +612,7 @@ class Meta:
"name",
"description",
"is_enabled",
"restart_on_project_update",
"decision_environment_id",
"rulebook_id",
"extra_var",
Expand Down Expand Up @@ -687,6 +698,9 @@ def prepare_update(self, activation: models.Activation):
rulebook = models.Rulebook.objects.get(id=rulebook_id)
self.validated_data["rulebook_name"] = rulebook.name
self.validated_data["rulebook_rulesets"] = rulebook.rulesets
self.validated_data[
"rulebook_rulesets_sha256"
] = get_rulebook_hash(rulebook.rulesets)
self.validated_data["git_hash"] = rulebook.project.git_hash
self.validated_data["project_id"] = rulebook.project.id

Expand All @@ -701,9 +715,11 @@ def prepare_update(self, activation: models.Activation):
if yaml.safe_load(self.validated_data.get("source_mappings", "")):
if not rulebook_id:
# load the original ruleset
rulesets = activation.rulebook.rulesets
self.validated_data["rulebook_rulesets"] = rulesets
self.validated_data[
"rulebook_rulesets"
] = activation.rulebook.rulesets
"rulebook_rulesets_sha256"
] = get_rulebook_hash(rulesets)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument in the definition of get_rulebook_hash is called 'rulebook'. Here we call it rulesets, which implies plural rule sets, whereas the function definition argument implies a singular rulebook. I understand that this function returns a hash, but I don't really understand how that hash applies to project sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation on the naming. The get_rulebook_hash function parameter is called rulebook because it takes the string content of a rulebook. Here rulesets refers to activation.rulebook.rulesets, which is the model field that stores the YAML content string — so it's the same data, the variable name just follows the field name. We could rename the variable for clarity but it matches the existing pattern from the previous code.

Regarding project sync: when a project sync occurs, it re-imports the git repo and updates each Rulebook.rulesets_sha256. The activation stores its own copy in rulebook_rulesets_sha256 from when it was created/updated. During project sync, we compare the two hashes — if they differ, the rulebook content has changed and the activation either auto-restarts (if restart_on_project_update=True) or surfaces a warning in the API response indicating source mappings may be stale.


_update_event_streams_and_credential(self.validated_data)
else:
Expand Down Expand Up @@ -875,6 +891,7 @@ class Meta:
"decision_environment",
"status",
"git_hash",
"restart_on_project_update",
"project",
"rulebook",
"extra_var",
Expand Down Expand Up @@ -973,6 +990,28 @@ def to_representation(self, activation):
else ""
)

warnings = []
if activation.source_mappings:
if not activation.rulebook:
warnings.append(
"The rulebook associated with this activation "
"no longer exists. Source mappings may be "
"invalid."
)
elif (
activation.rulebook_rulesets_sha256
and activation.rulebook.rulesets_sha256
and (
activation.rulebook_rulesets_sha256
!= activation.rulebook.rulesets_sha256
)
):
warnings.append(
"Rulebook content has changed since event "
"stream sources were mapped. Please update "
"source mappings."
)

return {
"id": activation.id,
"name": activation.name,
Expand All @@ -988,6 +1027,9 @@ def to_representation(self, activation):
"instances": ActivationInstanceSerializer(
activation_instances, many=True
).data,
"restart_on_project_update": (
activation.restart_on_project_update
),
"restart_policy": activation.restart_policy,
"restart_count": activation.restart_count,
"rulebook_name": activation.rulebook_name,
Expand All @@ -1006,6 +1048,7 @@ def to_representation(self, activation):
"k8s_service_name": activation.k8s_service_name,
"event_streams": event_streams,
"source_mappings": activation.source_mappings,
"warnings": warnings,
"skip_audit_events": activation.skip_audit_events,
"log_tracking_id": activation.log_tracking_id,
"created_by": BasicUserSerializer(activation.created_by).data,
Expand Down
42 changes: 42 additions & 0 deletions src/aap_eda/api/serializers/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class Meta:
"scm_refspec",
"verify_ssl",
"proxy",
"update_revision_on_launch",
"scm_update_cache_timeout",
"last_synced_at",
"created_by",
"modified_by",
*read_only_fields,
Expand Down Expand Up @@ -119,6 +122,22 @@ class ProjectCreateRequestSerializer(
validators=[validators.check_if_refspec_valid],
)

scm_update_cache_timeout = serializers.IntegerField(
required=False,
default=0,
min_value=0,
max_value=86400, # Max 1 day
help_text=(
"Cache timeout in seconds for project updates "
"(0 = no cache, max 86400)"
),
)
update_revision_on_launch = serializers.BooleanField(
required=False,
default=False,
help_text="Enable automatic project sync on activation launch",
)

class Meta:
model = models.Project
fields = [
Expand All @@ -133,6 +152,8 @@ class Meta:
"scm_type",
"scm_branch",
"scm_refspec",
"update_revision_on_launch",
"scm_update_cache_timeout",
]


Expand Down Expand Up @@ -213,6 +234,19 @@ class ProjectUpdateRequestSerializer(
validators.check_if_scm_url_valid,
],
)
scm_update_cache_timeout = serializers.IntegerField(
required=False,
min_value=0,
max_value=86400, # Max 1 day
help_text=(
"Cache timeout in seconds for project updates "
"(0 = no cache, max 86400)"
),
)
update_revision_on_launch = serializers.BooleanField(
required=False,
help_text="Enable automatic project sync on activation launch",
)

class Meta:
model = models.Project
Expand All @@ -227,6 +261,8 @@ class Meta:
"verify_ssl",
"proxy",
"url",
"update_revision_on_launch",
"scm_update_cache_timeout",
]

def validate(self, data):
Expand Down Expand Up @@ -288,6 +324,9 @@ class Meta:
"scm_branch",
"scm_refspec",
"proxy",
"update_revision_on_launch",
"scm_update_cache_timeout",
"last_synced_at",
"created_by",
"modified_by",
*read_only_fields,
Expand Down Expand Up @@ -326,6 +365,9 @@ def to_representation(self, project):
"organization": organization,
"eda_credential": eda_credential,
"signature_validation_credential": signature_validation_credential,
"update_revision_on_launch": project.update_revision_on_launch,
"scm_update_cache_timeout": project.scm_update_cache_timeout,
"last_synced_at": project.last_synced_at,
"import_state": project.import_state,
"import_error": project.import_error,
"created_at": project.created_at,
Expand Down
82 changes: 77 additions & 5 deletions src/aap_eda/api/views/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@
start_rulebook_process,
stop_rulebook_process,
)
from aap_eda.tasks.project import sync_project
from aap_eda.utils import str_to_bool

# RedisDependencyMixin import removed - no longer required with dispatcherd

logger = logging.getLogger(__name__)

resource_name = "RulebookActivation"
Expand Down Expand Up @@ -254,7 +253,8 @@ def destroy(self, request, *args, **kwargs):

with transaction.atomic():
activation.status = ActivationStatus.DELETING
activation.save(update_fields=["status"])
activation.awaiting_project_sync = False
activation.save(update_fields=["status", "awaiting_project_sync"])
name = activation.name

delete_rulebook_process(
Expand Down Expand Up @@ -424,6 +424,10 @@ def _start(self, request, activation: models.Activation) -> Response:
# Check if activation workers are available before enabling
check_dispatcherd_workers_health(raise_exceptions=True)

sync_response = self._sync_project_if_needed(activation)
if sync_response:
return sync_response

logger.info(f"Now enabling {activation.name} ...")

activation.is_enabled = True
Expand Down Expand Up @@ -497,12 +501,15 @@ def disable(self, request, pk):
activation, force_disable, "Disabled"
)

if activation.awaiting_project_sync:
activation.awaiting_project_sync = False
activation.save(update_fields=["awaiting_project_sync"])

if activation.is_enabled:
# Check if activation workers are available before disabling
# (unless force=true to allow operation with unhealthy workers)
if not force_disable:
check_dispatcherd_workers_health(raise_exceptions=True)

if activation.status in [
ActivationStatus.STARTING,
ActivationStatus.RUNNING,
Expand Down Expand Up @@ -544,7 +551,7 @@ def disable(self, request, pk):
),
status.HTTP_409_CONFLICT: OpenApiResponse(
None,
description="Activation blocked while Workers offline.",
description=("Activation blocked while Workers offline."),
),
},
parameters=[
Expand Down Expand Up @@ -593,6 +600,10 @@ def restart(self, request, pk):
{"errors": error}, status=status.HTTP_400_BAD_REQUEST
)

sync_response = self._sync_project_if_needed(activation)
if sync_response:
return sync_response

restart_rulebook_process(
process_parent_type=ProcessParentType.ACTIVATION,
process_parent_id=activation.id,
Expand Down Expand Up @@ -657,6 +668,67 @@ def copy(self, request, pk):
status=status.HTTP_201_CREATED,
)

def _sync_project_if_needed(
    self, activation: models.Activation
) -> Response | None:
    """Check if project sync is needed before launch.

    Sets the ``awaiting_project_sync`` flag, moves the activation to
    PENDING, and triggers a project sync if one is not already in
    flight.

    Returns:
        - a 204 Response if the launch is now gated on a project sync
          (the caller must stop and not start the activation itself;
          the sync completion path is expected to resume it),
        - a 400 Response if submitting the sync job failed,
        - None if no sync is needed and the caller should proceed
          normally.
    """
    # Fast path: nothing to do unless the activation/project is
    # configured to refresh the project on launch.
    if not activation.needs_project_update_on_launch:
        return None

    # Gate the launch: persist the waiting state before touching the
    # sync machinery so a crash cannot lose the "resume me" marker.
    activation.awaiting_project_sync = True
    activation.status = ActivationStatus.PENDING
    activation.status_message = "Waiting for project sync to complete"
    activation.save(
        update_fields=[
            "awaiting_project_sync",
            "status",
            "status_message",
        ]
    )

    # Only submit a new sync if one isn't already running.
    # If a sync IS in progress, the flag is already set and
    # the running sync will resume this activation on completion.
    if activation.project.import_state not in [
        models.Project.ImportState.PENDING,
        models.Project.ImportState.RUNNING,
    ]:
        try:
            sync_project(activation.project.id)
        except Exception as e:
            logger.error(
                f"Failed to start project sync for "
                f"'{activation.name}': {e}",
                exc_info=True,
            )
            # Roll back the gate: with no sync submitted, nothing
            # would ever resume the activation, so surface the error
            # instead of leaving it stuck in PENDING.
            activation.awaiting_project_sync = False
            activation.status = ActivationStatus.ERROR
            activation.status_message = (
                f"Failed to start project sync: {e}"
            )
            activation.save(
                update_fields=[
                    "awaiting_project_sync",
                    "status",
                    "status_message",
                ]
            )
            return Response(
                {"errors": str(e)},
                status=status.HTTP_400_BAD_REQUEST,
            )

    return Response(
        status=status.HTTP_204_NO_CONTENT,
    )

def _check_deleting(self, activation):
if activation.status == ActivationStatus.DELETING:
raise exceptions.APIException(
Expand Down
Loading