From f57f31318fd5d7a7a6d2a0d60f4e646914c32812 Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Fri, 5 Dec 2025 10:26:09 -0500 Subject: [PATCH 1/8] update readme and deployment --- .../deploy-fleet-bicep.sh | 52 +++- .../deploy-multi-region.sh | 3 +- .../install-documentdb-operator.sh | 3 +- .../aks-fleet-deployment/main.bicep | 72 +---- .../parameters.bicepparam | 10 - .../fleet-add-region/README.md | 38 +++ .../fleet-add-region/add-region.sh | 36 +++ .../fleet-add-region/check.sh | 288 ++++++++++++++++++ .../fleet-add-region/deploy-four-region.sh | 10 + .../documentdb-resource-crp.yaml | 85 ++++++ .../documentdb-three-region.sh | 164 ++++++++++ .../fleet-add-region/remove-region.sh | 34 +++ .../controller/documentdb_controller.go | 11 + .../controller/physical_replication.go | 226 ++++++++------ operator/src/internal/utils/constants.go | 3 + .../src/internal/utils/replication_context.go | 22 +- operator/src/internal/utils/util.go | 51 ++++ 17 files changed, 925 insertions(+), 183 deletions(-) delete mode 100644 documentdb-playground/aks-fleet-deployment/parameters.bicepparam create mode 100644 documentdb-playground/fleet-add-region/README.md create mode 100755 documentdb-playground/fleet-add-region/add-region.sh create mode 100755 documentdb-playground/fleet-add-region/check.sh create mode 100755 documentdb-playground/fleet-add-region/deploy-four-region.sh create mode 100644 documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml create mode 100755 documentdb-playground/fleet-add-region/documentdb-three-region.sh create mode 100755 documentdb-playground/fleet-add-region/remove-region.sh diff --git a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh index 1f329bf4..ac62ef46 100755 --- a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh @@ -10,19 +10,11 @@ RG_LOCATION="${RG_LOCATION:-eastus2}" HUB_REGION="${HUB_REGION:-westus3}" SCRIPT_DIR="$(dirname "$0")" -# Regions for member clusters (keep in sync with parameters.bicepparam if you change it) -if [ -n "${MEMBER_REGIONS_CSV:-}" ]; then - IFS=',' read -r -a MEMBER_REGIONS <<< "$MEMBER_REGIONS_CSV" -else - MEMBER_REGIONS=("westus3" "uksouth" "eastus2") -fi - # Optional: explicitly override the VM size used by the template param vmSize. # If left empty, the template's default (currently Standard_DS2_v2) will be used. KUBE_VM_SIZE="${KUBE_VM_SIZE:-}" - -# Build JSON arrays for parameters (after any fallbacks) -MEMBER_REGIONS_JSON=$(printf '%s\n' "${MEMBER_REGIONS[@]}" | jq -R . | jq -s .) +# Optional: override the default member regions defined in main.bicep (comma-separated list) +MEMBER_REGIONS="${MEMBER_REGIONS:-}" # Wait for any in-progress AKS operations in this resource group to finish wait_for_no_inprogress() { @@ -59,16 +51,24 @@ if ! wait_for_no_inprogress "$RESOURCE_GROUP"; then echo "Exiting without changes due to in-progress operations. Re-run when provisioning completes." 
>&2 exit 1 fi + +PARAMS=() # Build parameter overrides -PARAMS=( - --parameters "$SCRIPT_DIR/parameters.bicepparam" - --parameters memberRegions="$MEMBER_REGIONS_JSON" -) if [ -n "$KUBE_VM_SIZE" ]; then echo "Overriding kubernetes VM size with: $KUBE_VM_SIZE" PARAMS+=( --parameters vmSize="$KUBE_VM_SIZE" ) fi +if [ -n "$MEMBER_REGIONS" ]; then + echo "Overriding member regions with: $MEMBER_REGIONS" + MEMBER_REGION_JSON=$(printf '%s' "$MEMBER_REGIONS" | jq -Rsc 'split(",") | map(gsub("^\\s+|\\s+$";"")) | map(select(length>0))') + if [ "$(printf '%s' "$MEMBER_REGION_JSON" | jq 'length')" -eq 0 ]; then + echo "MEMBER_REGIONS did not contain any valid entries" >&2 + exit 1 + fi + PARAMS+=( --parameters memberRegions="$MEMBER_REGION_JSON" ) +fi + DEPLOYMENT_NAME=${DEPLOYMENT_NAME:-"aks-deployment-$(date +%s)"} az deployment group create \ --name "$DEPLOYMENT_NAME" \ @@ -84,6 +84,30 @@ DEPLOYMENT_OUTPUT=$(az deployment group show \ # Extract outputs MEMBER_CLUSTER_NAMES=$(echo $DEPLOYMENT_OUTPUT | jq -r '.memberClusterNames.value[]') +VNET_NAMES=$(echo $DEPLOYMENT_OUTPUT | jq -r '.memberVnetNames.value[]') + +while read -r vnet1; do + while read -r vnet2; do + [ -z "$vnet1" ] && continue + [ -z "$vnet2" ] && continue + [ "$vnet1" = "$vnet2" ] && continue + echo "Peering VNet '$vnet1' with VNet '$vnet2'..." + az network vnet peering create \ + --name "${vnet1}-to-${vnet2}-peering" \ + --resource-group "$RESOURCE_GROUP" \ + --vnet-name "$vnet1" \ + --remote-vnet "$vnet2" \ + --allow-vnet-access true \ + --allow-forwarded-traffic true + az network vnet peering create \ + --name "${vnet2}-to-${vnet1}-peering" \ + --resource-group "$RESOURCE_GROUP" \ + --vnet-name "$vnet2" \ + --remote-vnet "$vnet1" \ + --allow-vnet-access true \ + --allow-forwarded-traffic true + done <<< "$VNET_NAMES" +done <<< "$VNET_NAMES" HUB_CLUSTER="" while read -r cluster; do diff --git a/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh b/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh index 614c1753..1062e107 100755 --- a/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh @@ -318,7 +318,8 @@ if [ "$ENABLE_AZURE_DNS" = "true" ]; then echo "Creating DNS record: $cluster" # Create service name by concatenating documentdb-preview with cluster name (max 63 chars) - SERVICE_NAME="documentdb-service-documentdb-preview" + SERVICE_NAME="documentdb-service-${cluster}" + SERVICE_NAME="${SERVICE_NAME:0:63}" # Get the external IP of the DocumentDB service EXTERNAL_IP="" diff --git a/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh b/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh index 7f5077a9..3ac3633e 100755 --- a/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh +++ b/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh @@ -10,6 +10,7 @@ set -euo pipefail RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" HUB_REGION="${HUB_REGION:-westus3}" CHART_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." 
&& pwd)/operator/documentdb-helm-chart" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" VERSION="${VERSION:-200}" VALUES_FILE="${VALUES_FILE:-}" BUILD_CHART="${BUILD_CHART:-true}" @@ -76,7 +77,7 @@ else fi fi -kubectl --context "$HUB_CLUSTER" apply -f ./documentdb-operator-crp.yaml +kubectl --context "$HUB_CLUSTER" apply -f $SCRIPT_DIR/documentdb-operator-crp.yaml # Get all member clusters diff --git a/documentdb-playground/aks-fleet-deployment/main.bicep b/documentdb-playground/aks-fleet-deployment/main.bicep index b2354332..e54f775e 100644 --- a/documentdb-playground/aks-fleet-deployment/main.bicep +++ b/documentdb-playground/aks-fleet-deployment/main.bicep @@ -19,18 +19,6 @@ param nodeCount int = 2 // Optionally include kubernetesVersion in cluster properties var maybeK8sVersion = empty(kubernetesVersion) ? {} : { kubernetesVersion: kubernetesVersion } -// Define non-overlapping address spaces for each member cluster -var memberVnetAddressSpaces = [ - '10.1.0.0/16' // westus3 - '10.2.0.0/16' // uksouth - '10.3.0.0/16' // eastus2 -] -var memberSubnetAddressSpaces = [ - '10.1.0.0/20' // westus3 - '10.2.0.0/20' // uksouth - '10.3.0.0/20' // eastus2 -] - // Member VNets resource memberVnets 'Microsoft.Network/virtualNetworks@2023-09-01' = [for (region, i) in memberRegions: { name: 'member-${region}-vnet' @@ -38,14 +26,14 @@ resource memberVnets 'Microsoft.Network/virtualNetworks@2023-09-01' = [for (regi properties: { addressSpace: { addressPrefixes: [ - memberVnetAddressSpaces[i] + '10.${i}.0.0/16' ] } subnets: [ { name: 'aks-subnet' properties: { - addressPrefix: memberSubnetAddressSpaces[i] + addressPrefix: '10.${i}.0.0/20' } } ] @@ -84,59 +72,5 @@ resource memberClusters 'Microsoft.ContainerService/managedClusters@2023-10-01' ] }] -// Create peering pairs for full mesh -var peeringPairs = [ - { - sourceIndex: 0 - targetIndex: 1 - sourceName: memberRegions[0] - targetName: memberRegions[1] - } - { - sourceIndex: 0 - targetIndex: 2 - sourceName: memberRegions[0] - targetName: memberRegions[2] - } - { - sourceIndex: 1 - targetIndex: 2 - sourceName: memberRegions[1] - targetName: memberRegions[2] - } -] - -// VNet peerings - Forward direction -resource memberPeeringsForward 'Microsoft.Network/virtualNetworks/virtualNetworkPeerings@2023-09-01' = [for pair in peeringPairs: { - name: '${pair.sourceName}-to-${pair.targetName}' - parent: memberVnets[pair.sourceIndex] - properties: { - remoteVirtualNetwork: { - id: memberVnets[pair.targetIndex].id - } - allowVirtualNetworkAccess: true - allowForwardedTraffic: true - allowGatewayTransit: false - useRemoteGateways: false - } -}] - -// VNet peerings - Reverse direction -resource memberPeeringsReverse 'Microsoft.Network/virtualNetworks/virtualNetworkPeerings@2023-09-01' = [for pair in peeringPairs: { - name: '${pair.targetName}-to-${pair.sourceName}' - parent: memberVnets[pair.targetIndex] - properties: { - remoteVirtualNetwork: { - id: memberVnets[pair.sourceIndex].id - } - allowVirtualNetworkAccess: true - allowForwardedTraffic: true - allowGatewayTransit: false - useRemoteGateways: false - } - dependsOn: [ - memberPeeringsForward - ] -}] - output memberClusterNames array = [for i in range(0, length(memberRegions)): memberClusters[i].name] +output memberVnetNames array = [for i in range(0, length(memberRegions)): memberVnets[i].name] diff --git a/documentdb-playground/aks-fleet-deployment/parameters.bicepparam b/documentdb-playground/aks-fleet-deployment/parameters.bicepparam deleted file mode 100644 index 97f91506..00000000 --- 
a/documentdb-playground/aks-fleet-deployment/parameters.bicepparam +++ /dev/null @@ -1,10 +0,0 @@ -using './main.bicep' - -param memberRegions = [ - 'westus3' - 'uksouth' - 'eastus2' -] -param kubernetesVersion = '' -param nodeCount = 2 -param vmSize = 'Standard_DS2_v2' diff --git a/documentdb-playground/fleet-add-region/README.md b/documentdb-playground/fleet-add-region/README.md new file mode 100644 index 00000000..9b1eb5df --- /dev/null +++ b/documentdb-playground/fleet-add-region/README.md @@ -0,0 +1,38 @@ +# Fleet Add Region Playground + +This playground focuses on exercising DocumentDB across changing fleet shapes. +It builds on the AKS Fleet Deployment playground (shared Bicep templates and +install scripts) but layers extra tooling to: add a region, remove a region, +verify wiring, and iterate rapidly on those flows before changes graduate to +docs or automation. + +## Goals + +- **Prove add/remove**: Validate that DocumentDB state, KubeFleet placements, +and CNPG clusters survive when a member region joins or leaves. +- **Shareable workflows**: Capture the manual commands and patches used during +prototyping so they can be replayed by others. +- **Regression surface**: Provide a safe spot to run disruptive tests (failovers, +partial rollouts, patching CRPs) without touching the core deployment guide. +- **Consistency with AKS Fleet**: Reuse credentials, hub selection, and discovery +logic from the `aks-fleet-deployment` playground to avoid divergence. + +- `deploy-four-region.sh`: Convenience wrapper to stand up a fresh four-region +fleet using the upstream deployment assets before exercising the add/remove scripts. + +## Typical Workflow + +1. **Bootstrap fleet** using the `deploy-four-region.sh` script which calls the +functions from `../aks-fleet-deployment` (Bicep deployment, cert-manager install, +operator install). All environment variables (e.g., `RESOURCE_GROUP`, `HUB_REGION`) +match the upstream playground so secrets and kubeconfigs remain reusable. +2. **Stand up baseline DocumentDB** via `documentdb-three-region.sh`, which will +make a 3-region cluster, excluding the westus2 region to start. +3. **Introduce changes**: + - Add a westus2 with `add-region.sh` to patch `DocumentDB` and `resourceplacement` + lists. + - Validate with `check.sh` and watch KubeFleet propagate CRs. + - Remove the hub region, westus3 via `remove-region.sh` and re-run `check.sh` + to confirm cleanup. +4. **Experiment repeatedly**, adjusting variables such as `EXCLUDE_REGION`, `HUB_REGION`, + or `DOCUMENTDB_PASSWORD` to simulate production scenarios. diff --git a/documentdb-playground/fleet-add-region/add-region.sh b/documentdb-playground/fleet-add-region/add-region.sh new file mode 100755 index 00000000..3eb669b3 --- /dev/null +++ b/documentdb-playground/fleet-add-region/add-region.sh @@ -0,0 +1,36 @@ +#/bin/bash + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" + +# Dynamically get member clusters from Azure +echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." 
+MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +EXCLUDE_CLUSTER="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then EXCLUDE_CLUSTER="$cluster"; fi +done + +kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ + --type='json' -p='[ + {"op": "add", "path": "/spec/clusterReplication/clusterList/3", "value":{name: "'"$EXCLUDE_CLUSTER"'", environment: "aks"}}, + ]' + +kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ + --type='json' -p='[ + {"op": "add", "path": "/spec/policy/clusterNames/3", "value": "'"$EXCLUDE_CLUSTER"'"}, + ]' diff --git a/documentdb-playground/fleet-add-region/check.sh b/documentdb-playground/fleet-add-region/check.sh new file mode 100755 index 00000000..7478ab03 --- /dev/null +++ b/documentdb-playground/fleet-add-region/check.sh @@ -0,0 +1,288 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Validates DocumentDB deployment state across fleet member clusters by comparing +# the configured replication topology with the actual CNPG resources. +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +DOCUMENTDB_NAME="${DOCUMENTDB_NAME:-documentdb-preview}" +DOCUMENTDB_NAMESPACE="${DOCUMENTDB_NAMESPACE:-documentdb-preview-ns}" +DOCUMENTDB_APP_LABEL="${DOCUMENTDB_APP_LABEL:-$DOCUMENTDB_NAME}" +CLUSTER_SELECTOR_PREFIX="${CLUSTER_SELECTOR_PREFIX:-member-}" + +declare -i OVERALL_STATUS=0 +FAILURE_MESSAGES=() + +declare -a CLUSTER_ARRAY=() +declare -a DOCUMENTDB_CLUSTER_NAMES=() +declare -A DOCUMENTDB_CLUSTER_SET=() +EXPECTED_CLUSTER_NAMES_JSON="[]" +declare -i EXPECTED_CLUSTER_COUNT=0 + +log() { + echo "[$(date '+%Y-%m-%dT%H:%M:%S%z')] $*" +} + +require_command() { + if ! command -v "$1" >/dev/null 2>&1; then + echo "Error: required command '$1' not found in PATH" >&2 + exit 1 + fi +} + +record_success() { + local cluster="$1"; shift + echo "[$cluster] ✔ $*" +} + +record_failure() { + local cluster="$1"; shift + echo "[$cluster] ✖ $*" + OVERALL_STATUS=1 + FAILURE_MESSAGES+=("[$cluster] $*") +} + +check_member_cluster() { + local cluster="$1" + + if ! kubectl --context "$cluster" get namespace "$DOCUMENTDB_NAMESPACE" >/dev/null 2>&1; then + record_failure "$cluster" "Namespace $DOCUMENTDB_NAMESPACE is missing" + return + fi + + local cnpg_list + if ! 
cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list CNPG clusters: $cnpg_list" + return + fi + + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + + if (( doc_owned_count == 0 )); then + record_failure "$cluster" "No CNPG Cluster owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + if (( doc_owned_count > 1 )); then + record_failure "$cluster" "Found $doc_owned_count CNPG Clusters owned by DocumentDB (expected 1)" + fi + + local cnpg_obj + cnpg_obj=$(echo "$doc_owned_clusters" | jq '.[0]') + local cnpg_name + cnpg_name=$(echo "$cnpg_obj" | jq -r '.metadata.name') + + local external_count + external_count=$(echo "$cnpg_obj" | jq '.spec.externalClusters // [] | length') + if (( external_count == EXPECTED_CLUSTER_COUNT )); then + record_success "$cluster" "Cluster $cnpg_name externalClusters count matches ($external_count)" + else + record_failure "$cluster" "Cluster $cnpg_name has $external_count externalClusters (expected $EXPECTED_CLUSTER_COUNT)" + fi + + local external_names_json + external_names_json=$(echo "$cnpg_obj" | jq '[.spec.externalClusters // [] | .[]? | .name] | map(select(. != null))') + local missing_names + missing_names=$(jq --argjson expected "$EXPECTED_CLUSTER_NAMES_JSON" --argjson actual "$external_names_json" -n '[ $expected[] | select(. as $item | ($actual | index($item)) | not) ]') + local missing_count + missing_count=$(echo "$missing_names" | jq 'length') + if (( missing_count > 0 )); then + local missing_list + missing_list=$(echo "$missing_names" | jq -r 'join(", ")') + record_failure "$cluster" "Cluster $cnpg_name missing externalClusters for: $missing_list" + else + record_success "$cluster" "Cluster $cnpg_name exposes all expected externalClusters" + fi + + local extra_names + extra_names=$(jq --argjson expected "$EXPECTED_CLUSTER_NAMES_JSON" --argjson actual "$external_names_json" -n '[ $actual[] | select(. as $item | ($expected | index($item)) | not) ]') + local extra_count + extra_count=$(echo "$extra_names" | jq 'length') + if (( extra_count > 0 )); then + local extra_list + extra_list=$(echo "$extra_names" | jq -r 'join(", ")') + record_failure "$cluster" "Cluster $cnpg_name has unexpected externalClusters: $extra_list" + fi + + local expected_instances + expected_instances=$(echo "$cnpg_obj" | jq '.spec.instances // 0') + local pods_json + if ! pods_json=$(kubectl --context "$cluster" get pods -n "$DOCUMENTDB_NAMESPACE" -l "cnpg.io/cluster=$cnpg_name" -o json 2>&1); then + record_failure "$cluster" "Unable to list pods for cluster $cnpg_name: $pods_json" + return + fi + local actual_pods + actual_pods=$(echo "$pods_json" | jq '.items | length') + if (( actual_pods == expected_instances )); then + record_success "$cluster" "Cluster $cnpg_name has $actual_pods pods (matches spec.instances)" + else + record_failure "$cluster" "Cluster $cnpg_name has $actual_pods pods (expected $expected_instances)" + fi + + local additional_service_count + additional_service_count=$(echo "$cnpg_obj" | jq '.spec.managed.services.additional // [] | length') + local expected_service_count=$((3 + additional_service_count)) + local services_json + if ! 
services_json=$(kubectl --context "$cluster" get svc -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list services in namespace $DOCUMENTDB_NAMESPACE: $services_json" + return + fi + local services_for_cluster + services_for_cluster=$(echo "$services_json" | jq --arg name "$cnpg_name" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="Cluster" and .name==$name))]') + local actual_service_count + actual_service_count=$(echo "$services_for_cluster" | jq 'length') + if (( actual_service_count == expected_service_count )); then + record_success "$cluster" "Cluster $cnpg_name has $actual_service_count services (expected $expected_service_count)" + else + record_failure "$cluster" "Cluster $cnpg_name has $actual_service_count services (expected $expected_service_count)" + fi +} + +check_non_member_cluster() { + local cluster="$1" + + local cnpg_list + if cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + if (( doc_owned_count == 0 )); then + record_success "$cluster" "No DocumentDB CNPG clusters present" + else + record_failure "$cluster" "Found $doc_owned_count DocumentDB CNPG cluster(s) but region is not in clusterList" + fi + else + record_success "$cluster" "CNPG CRD unavailable; treated as no DocumentDB clusters" + fi + + local pods_json + if pods_json=$(kubectl --context "$cluster" get pods -n "$DOCUMENTDB_NAMESPACE" -l "app=$DOCUMENTDB_APP_LABEL" -o json 2>/dev/null); then + local pod_count + pod_count=$(echo "$pods_json" | jq '.items | length') + if (( pod_count == 0 )); then + record_success "$cluster" "No DocumentDB pods present" + else + record_failure "$cluster" "Found $pod_count DocumentDB pods but region is not in clusterList" + fi + else + record_success "$cluster" "Namespace $DOCUMENTDB_NAMESPACE absent; no DocumentDB pods present" + fi + + local services_json + if services_json=$(kubectl --context "$cluster" get svc -n "$DOCUMENTDB_NAMESPACE" -l "app=$DOCUMENTDB_APP_LABEL" -o json 2>/dev/null); then + local service_count + service_count=$(echo "$services_json" | jq '.items | length') + if (( service_count == 0 )); then + record_success "$cluster" "No DocumentDB services present" + else + record_failure "$cluster" "Found $service_count DocumentDB services but region is not in clusterList" + fi + else + record_success "$cluster" "Namespace $DOCUMENTDB_NAMESPACE absent; no DocumentDB services present" + fi +} + +main() { + require_command az + require_command jq + require_command kubectl + + log "Discovering fleet member clusters in resource group $RESOURCE_GROUP" + local discovery_output + if ! 
discovery_output=$(az aks list -g "$RESOURCE_GROUP" -o json 2>&1); then + echo "Error: unable to list AKS clusters - $discovery_output" >&2 + exit 1 + fi + + readarray -t CLUSTER_ARRAY < <(echo "$discovery_output" | jq -r --arg prefix "$CLUSTER_SELECTOR_PREFIX" '.[] | select(.name | startswith($prefix)) | .name' | sort -u) + if (( ${#CLUSTER_ARRAY[@]} == 0 )); then + echo "Error: no member clusters found with prefix '$CLUSTER_SELECTOR_PREFIX' in resource group $RESOURCE_GROUP" >&2 + exit 1 + fi + + log "Found ${#CLUSTER_ARRAY[@]} member cluster(s):" + for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + done + + local hub_cluster="" + for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == *"$HUB_REGION"* ]]; then + hub_cluster="$cluster" + break + fi + done + + if [[ -z "$hub_cluster" ]]; then + echo "Error: unable to find hub cluster matching region substring '$HUB_REGION'" >&2 + exit 1 + fi + + log "Using hub cluster context: $hub_cluster" + + local documentdb_json + if ! documentdb_json=$(kubectl --context "$hub_cluster" get documentdb "$DOCUMENTDB_NAME" -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + echo "Error: unable to fetch DocumentDB $DOCUMENTDB_NAME from hub cluster: $documentdb_json" >&2 + exit 1 + fi + + EXPECTED_CLUSTER_NAMES_JSON=$(echo "$documentdb_json" | jq '[.spec.clusterReplication.clusterList[]? | .name] | map(select(. != null))') + EXPECTED_CLUSTER_COUNT=$(echo "$EXPECTED_CLUSTER_NAMES_JSON" | jq 'length') + readarray -t DOCUMENTDB_CLUSTER_NAMES < <(echo "$EXPECTED_CLUSTER_NAMES_JSON" | jq -r '.[]') + + if (( EXPECTED_CLUSTER_COUNT == 0 )); then + echo "Error: DocumentDB $DOCUMENTDB_NAME has an empty clusterReplication.clusterList" >&2 + exit 1 + fi + + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + DOCUMENTDB_CLUSTER_SET["$name"]=1 + done + + log "DocumentDB $DOCUMENTDB_NAME expects $EXPECTED_CLUSTER_COUNT cluster(s):" + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + echo " - $name" + done + + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + local match_found="false" + for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == "$name" ]]; then + match_found="true" + break + fi + done + if [[ "$match_found" == "false" ]]; then + record_failure "$hub_cluster" "DocumentDB references cluster '$name' that was not discovered in resource group $RESOURCE_GROUP" + fi + done + + for cluster in "${CLUSTER_ARRAY[@]}"; do + echo + if [[ -n "${DOCUMENTDB_CLUSTER_SET[$cluster]:-}" ]]; then + log "Checking DocumentDB member cluster: $cluster" + check_member_cluster "$cluster" + else + log "Checking non-member cluster: $cluster" + check_non_member_cluster "$cluster" + fi + done + + echo + if (( OVERALL_STATUS == 0 )); then + log "All checks passed across ${#CLUSTER_ARRAY[@]} cluster(s)." 
+ else + log "Completed with ${#FAILURE_MESSAGES[@]} issue(s):" + for msg in "${FAILURE_MESSAGES[@]}"; do + echo " - $msg" + done + fi + + exit $OVERALL_STATUS +} + +main "$@" diff --git a/documentdb-playground/fleet-add-region/deploy-four-region.sh b/documentdb-playground/fleet-add-region/deploy-four-region.sh new file mode 100755 index 00000000..7d8c41ac --- /dev/null +++ b/documentdb-playground/fleet-add-region/deploy-four-region.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +export MEMBER_REGIONS="westus3,uksouth,eastus2,westus2" +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-add-region-test-rg}" +SCRIPT_DIR="$(dirname "$0")" + +# Deploy the AKS fleet with four regions and install cert-manager on all +$SCRIPT_DIR/../aks-fleet-deployment/deploy-fleet-bicep.sh +$SCRIPT_DIR/../aks-fleet-deployment/install-cert-manager.sh +$SCRIPT_DIR/../aks-fleet-deployment/install-documentdb-operator.sh diff --git a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml new file mode 100644 index 00000000..43880319 --- /dev/null +++ b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml @@ -0,0 +1,85 @@ +# Namespace definition +apiVersion: v1 +kind: Namespace +metadata: + name: documentdb-preview-ns + +--- + +apiVersion: v1 +kind: Secret +metadata: + name: documentdb-credentials + namespace: documentdb-preview-ns +type: Opaque +stringData: + username: default_user + password: {{DOCUMENTDB_PASSWORD}} + +--- + +apiVersion: db.microsoft.com/preview +kind: DocumentDB +metadata: + name: documentdb-preview + namespace: documentdb-preview-ns +spec: + nodeCount: 1 + instancesPerNode: 1 + documentDBImage: ghcr.io/microsoft/documentdb/documentdb-local:16 + gatewayImage: ghcr.io/microsoft/documentdb/documentdb-local:16 + resource: + storage: + pvcSize: 10Gi + environment: aks + clusterReplication: + highAvailability: true + crossCloudNetworkingStrategy: AzureFleet + primary: {{PRIMARY_CLUSTER}} + clusterList: +{{CLUSTER_LIST}} + exposeViaService: + serviceType: LoadBalancer + logLevel: info + +--- + +apiVersion: placement.kubernetes-fleet.io/v1beta1 +kind: ClusterResourcePlacement +metadata: + name: documentdb-namespace-crp +spec: + resourceSelectors: + - group: "" + version: v1 + kind: Namespace + name: documentdb-preview-ns + selectionScope: NamespaceOnly # only namespace itself is placed, no resources within the namespace + policy: + placementType: PickAll + strategy: + type: RollingUpdate + +--- +# Application team manages resources using RP +apiVersion: placement.kubernetes-fleet.io/v1beta1 +kind: ResourcePlacement +metadata: + name: documentdb-resource-rp + namespace: documentdb-preview-ns +spec: + resourceSelectors: + - group: db.microsoft.com + kind: DocumentDB + version: preview + name: documentdb-preview + - group: "" + version: v1 + kind: Secret + name: documentdb-credentials + policy: + placementType: PickFixed + clusterNames: +{{CLUSTER_LIST_CRP}} + strategy: + type: RollingUpdate diff --git a/documentdb-playground/fleet-add-region/documentdb-three-region.sh b/documentdb-playground/fleet-add-region/documentdb-three-region.sh new file mode 100755 index 00000000..cd6ade9b --- /dev/null +++ b/documentdb-playground/fleet-add-region/documentdb-three-region.sh @@ -0,0 +1,164 @@ +#!/bin/bash +set -euo pipefail + +# Environment variables: +# RESOURCE_GROUP: Azure resource group (default: documentdb-aks-fleet-rg) +# DOCUMENTDB_PASSWORD: Database password (will be generated if not provided) + +SCRIPT_DIR="$(cd "$(dirname 
"${BASH_SOURCE[0]}")" && pwd)" + +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" + +# Set password from argument or environment variable +DOCUMENTDB_PASSWORD="${1:-${DOCUMENTDB_PASSWORD:-}}" + +# If no password provided, generate a secure one +if [ -z "$DOCUMENTDB_PASSWORD" ]; then + echo "No password provided. Generating a secure password..." + DOCUMENTDB_PASSWORD=$(openssl rand -base64 32 | tr -d "=+/" | cut -c1-25) + echo "Generated password: $DOCUMENTDB_PASSWORD" + echo "(Save this password - you'll need it to connect to the database)" + echo "" +fi + +# Export for envsubst +export DOCUMENTDB_PASSWORD + +# Dynamically get member clusters from Azure +echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." +MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +# Convert to array +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi +done + +# Select primary cluster and excluded cluster +PRIMARY_CLUSTER="${CLUSTER_ARRAY[0]}" +EXCLUDE_CLUSTER="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == *"eastus2"* ]]; then + PRIMARY_CLUSTER="$cluster" + fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then + EXCLUDE_CLUSTER="$cluster" + fi +done + +echo "" +echo "Selected primary cluster: $PRIMARY_CLUSTER" + +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Excluding cluster $cluster from DocumentDB configuration" + continue + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +# Step 1: Create cluster identification ConfigMaps on each member cluster +echo "" +echo "=======================================" +echo "Creating cluster identification ConfigMaps..." +echo "=======================================" + +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo "" + echo "Processing ConfigMap for $cluster..." + + # Check if context exists + if ! kubectl config get-contexts "$cluster" &>/dev/null; then + echo "✗ Context $cluster not found, skipping" + continue + fi + + # Create or update the cluster-name ConfigMap + kubectl --context "$cluster" create configmap cluster-name \ + -n kube-system \ + --from-literal=name="$cluster" \ + --dry-run=client -o yaml | kubectl --context "$cluster" apply -f - + + # Verify the ConfigMap was created + if kubectl --context "$cluster" get configmap cluster-name -n kube-system &>/dev/null; then + echo "✓ ConfigMap created/updated for $cluster" + else + echo "✗ Failed to create ConfigMap for $cluster" + fi +done + +# Step 2: Deploy DocumentDB resources via Fleet +echo "" +echo "=======================================" +echo "Deploying DocumentDB multi-region configuration..." 
+echo "=======================================" + +# Determine hub context +HUB_CLUSTER="${HUB_CLUSTER:-hub}" +if ! kubectl config get-contexts "$HUB_CLUSTER" &>/dev/null; then + echo "Error: Hub context not found. Please ensure you have credentials for the fleet." + exit 1 +fi + +echo "Using hub context: $HUB_CLUSTER" + +# Create a temporary file with substituted values +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo "$line" + fi +done > "$TEMP_YAML" + +# Debug: show the generated YAML section with clusterReplication +echo "" +echo "Generated configuration preview:" +echo "--------------------------------" +echo "Primary cluster: $PRIMARY_CLUSTER" +echo "Cluster list:" +echo "$CLUSTER_LIST" +echo "--------------------------------" + +# Apply the configuration +echo "" +echo "Applying DocumentDB multi-region configuration..." +kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" + +# Clean up temp file +rm -f "$TEMP_YAML" + +echo "" +echo "Connection Information:" +echo " Username: default_user" +echo " Password: $DOCUMENTDB_PASSWORD" +echo "" diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh new file mode 100755 index 00000000..c0f6e855 --- /dev/null +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -0,0 +1,34 @@ +#/bin/bash + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" + +# Dynamically get member clusters from Azure +echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." 
+MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi +done + +kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ + --type='json' -p='[ + {"op": "remove", "path": "/spec/clusterReplication/clusterList/2"}, + ]' + +kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ + --type='json' -p='[ + {"op": "add", "path": "/spec/policy/clusterNames/2"} + ]' diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index c0e9b126..844119a3 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -82,6 +82,17 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + if replicationContext.IsNotPresent() { + logger.Info("DocumentDB instance is not part of the replication setup; skipping reconciliation and deleting any present resources") + if err := r.cleanupResources(ctx, req, documentdb); err != nil { + return ctrl.Result{}, err + } + if err := util.DeleteOwnedResources(ctx, r.Client, documentdb.ObjectMeta); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + var documentDbServiceIp string // Only create/manage the service if ExposeViaService is configured diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index 0ef99b83..2df6fc2b 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -25,6 +25,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + demotionTokenPollInterval = 5 * time.Second + demotionTokenWaitTimeout = 10 * time.Minute +) + func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( ctx context.Context, documentdb *dbpreview.DocumentDB, @@ -250,14 +255,6 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de return nil, -1 } - // Update the primary if it has changed - primaryChanged := current.Spec.ReplicaCluster.Primary != desired.Spec.ReplicaCluster.Primary - - tokenNeedsUpdate, err := r.PromotionTokenNeedsUpdate(ctx, current.Namespace) - if err != nil { - return err, time.Second * 10 - } - if current.Spec.ReplicaCluster.Self != desired.Spec.ReplicaCluster.Self { return fmt.Errorf("self cannot be changed"), time.Second * 60 } @@ -265,10 +262,40 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Create JSON patch operations for all replica cluster updates var patchOps []util.JSONPatch - if tokenNeedsUpdate || primaryChanged && current.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { + // Update if the primary has changed + primaryChanged := current.Spec.ReplicaCluster.Primary != desired.Spec.ReplicaCluster.Primary + if primaryChanged { + err, refreshTime := r.getPrimaryChangePatchOps(ctx, &patchOps, current, desired, documentdb, 
replicationContext) + if refreshTime > 0 || err != nil { + return err, refreshTime + } + } + + // Update if the cluster list has changed + replicasChanged := externalClusterNamesChanged(current.Spec.ExternalClusters, desired.Spec.ExternalClusters) + if replicasChanged { + getReplicasChangePatchOps(&patchOps, desired, replicationContext) + } + + if len(patchOps) > 0 { + patch, err := json.Marshal(patchOps) + if err != nil { + return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 + } + err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) + if err != nil { + return err, time.Second * 10 + } + } + + return nil, -1 +} + +func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, patchOps *[]util.JSONPatch, current, desired *cnpgv1.Cluster, documentdb *dbpreview.DocumentDB, replicationContext *util.ReplicationContext) (error, time.Duration) { + if current.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { // Primary => replica // demote - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: desired.Spec.ReplicaCluster, @@ -279,41 +306,30 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Only add remove operation if synchronous field exists, otherwise there's an error // TODO this wouldn't be true if our "wait for token" logic wasn't reliant on a failure if current.Spec.PostgresConfiguration.Synchronous != nil { - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REMOVE, Path: util.JSON_PATCH_PATH_POSTGRES_CONFIG_SYNC, }) } - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_INSTANCES, Value: desired.Spec.Instances, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_PLUGINS, Value: desired.Spec.Plugins, }) } - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } - - log.Log.Info("Applying patch for Primary => Replica transition", "patch", string(patch), "cluster", current.Name) + log.Log.Info("Applying patch for Primary => Replica transition", "cluster", current.Name) - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 - } + // push out the promotion token when it's available + nn := types.NamespacedName{Name: current.Name, Namespace: current.Namespace} + go r.waitForDemotionTokenAndCreateService(nn, replicationContext) - // push out the promotion token - err = r.CreateTokenService(ctx, current.Status.DemotionToken, documentdb.Namespace, replicationContext) - if err != nil { - return err, time.Second * 10 - } - } else if primaryChanged && desired.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { + } else if desired.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { // Replica => primary // Look for the token if this is a managed failover oldPrimaryAvailable := slices.Contains( @@ -333,7 +349,7 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de replicaClusterConfig.PromotionToken = token } - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = 
append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: replicaClusterConfig, @@ -341,61 +357,86 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de if documentdb.Spec.ClusterReplication.HighAvailability { // need to add second instance and wal replica - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_POSTGRES_CONFIG, Value: desired.Spec.PostgresConfiguration, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_INSTANCES, Value: desired.Spec.Instances, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_PLUGINS, Value: desired.Spec.Plugins, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICATION_SLOTS, Value: desired.Spec.ReplicationSlots, }) } - - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } - - log.Log.Info("Applying patch for Replica => Primary transition", "patch", string(patch), "cluster", current.Name, "hasToken", replicaClusterConfig.PromotionToken != "") - - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 - } - } else if primaryChanged { + log.Log.Info("Applying patch for Replica => Primary transition", "cluster", current.Name, "hasToken", replicaClusterConfig.PromotionToken != "") + } else { // Replica => replica - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: desired.Spec.ReplicaCluster, }) - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } + log.Log.Info("Applying patch for Replica => Replica transition", "patch", "cluster", current.Name) + } - log.Log.Info("Applying patch for Replica => Replica transition", "patch", string(patch), "cluster", current.Name) + return nil, -1 +} - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 +func externalClusterNamesChanged(currentClusters, desiredClusters []cnpgv1.ExternalCluster) bool { + if len(currentClusters) != len(desiredClusters) { + return true + } + + if len(currentClusters) == 0 { + return false + } + + nameSet := make(map[string]bool, len(currentClusters)) + for _, cluster := range currentClusters { + nameSet[cluster.Name] = true + } + + for _, cluster := range desiredClusters { + if found := nameSet[cluster.Name]; !found { + return true } + delete(nameSet, cluster.Name) } - return nil, -1 + return len(nameSet) != 0 +} + +func getReplicasChangePatchOps(patchOps *[]util.JSONPatch, desired *cnpgv1.Cluster, replicationContext *util.ReplicationContext) { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_EXTERNAL_CLUSTERS, + Value: desired.Spec.ExternalClusters, + }) + if replicationContext.IsAzureFleetNetworking() { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: 
util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_MANAGED_SERVICES, + Value: desired.Spec.Managed.Services.Additional, + }) + } + if replicationContext.IsPrimary() { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_SYNCHRONOUS, + Value: desired.Spec.PostgresConfiguration.Synchronous, + }) + } } func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) (string, error, time.Duration) { @@ -506,24 +547,41 @@ func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, return string(token[:]), nil, -1 } -// TODO make this not have to check the configmap twice -// RETURN true if we have a configmap with a blank token -func (r *DocumentDBReconciler) PromotionTokenNeedsUpdate(ctx context.Context, namespace string) (bool, error) { - tokenServiceName := "promotion-token" - configMap := &corev1.ConfigMap{} - err := r.Get(ctx, types.NamespacedName{Name: tokenServiceName, Namespace: namespace}, configMap) - if err != nil { - // If we don't find the map, we don't need to update - if errors.IsNotFound(err) { - return false, nil +func (r *DocumentDBReconciler) waitForDemotionTokenAndCreateService(clusterNN types.NamespacedName, replicationContext *util.ReplicationContext) { + ctx := context.Background() + ticker := time.NewTicker(demotionTokenPollInterval) + timeout := time.NewTimer(demotionTokenWaitTimeout) + + for { + select { + case <-ticker.C: + done, err := r.ensureTokenServiceResources(ctx, clusterNN, replicationContext) + if err != nil { + log.Log.Error(err, "Failed to create token service resources", "cluster", clusterNN.Name) + } + if done { + return + } + case <-timeout.C: + ticker.Stop() + log.Log.Info("Timed out waiting for demotion token", "cluster", clusterNN.Name, "timeout", demotionTokenWaitTimeout) + return } - return false, err } - // Otherwise, we need to update if the value is blank - return configMap.Data["index.html"] == "", nil } -func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token string, namespace string, replicationContext *util.ReplicationContext) error { +// Returns true when token service resources are ready +func (r *DocumentDBReconciler) ensureTokenServiceResources(ctx context.Context, clusterNN types.NamespacedName, replicationContext *util.ReplicationContext) (bool, error) { + cluster := &cnpgv1.Cluster{} + if err := r.Client.Get(ctx, clusterNN, cluster); err != nil { + return false, err + } + + token := cluster.Status.DemotionToken + if token == "" { + return false, nil + } + tokenServiceName := "promotion-token" labels := map[string]string{ "app": tokenServiceName, @@ -533,7 +591,7 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str configMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, }, Data: map[string]string{ "index.html": token, @@ -546,27 +604,23 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str configMap.Data["index.html"] = token err = r.Client.Update(ctx, configMap) if err != nil { - return fmt.Errorf("failed to update token ConfigMap: %w", err) + return false, fmt.Errorf("failed to update token ConfigMap: %w", err) } } else { - return fmt.Errorf("failed to create token ConfigMap: %w", err) + return false, fmt.Errorf("failed to create token ConfigMap: %w", err) } } - if token == "" { - return fmt.Errorf("No token found yet") - } - 
// When not using cross-cloud networking, just transfer with the configmap if !replicationContext.IsAzureFleetNetworking() && !replicationContext.IsIstioNetworking() { - return nil + return true, nil } // Create nginx Pod pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, Labels: labels, }, Spec: corev1.PodSpec{ @@ -605,14 +659,14 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str err = r.Client.Create(ctx, pod) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create nginx Pod: %w", err) + return false, fmt.Errorf("failed to create nginx Pod: %w", err) } // Create Service service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, Labels: labels, }, Spec: corev1.ServiceSpec{ @@ -629,7 +683,7 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str err = r.Client.Create(ctx, service) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create Service: %w", err) + return false, fmt.Errorf("failed to create Service: %w", err) } // Create ServiceExport only for fleet networking @@ -637,15 +691,15 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str serviceExport := &fleetv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, }, } err = r.Client.Create(ctx, serviceExport) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create ServiceExport: %w", err) + return false, fmt.Errorf("failed to create ServiceExport: %w", err) } } - return nil + return true, nil } diff --git a/operator/src/internal/utils/constants.go b/operator/src/internal/utils/constants.go index 5481cb9b..25c23835 100644 --- a/operator/src/internal/utils/constants.go +++ b/operator/src/internal/utils/constants.go @@ -42,6 +42,9 @@ const ( JSON_PATCH_PATH_INSTANCES = "/spec/instances" JSON_PATCH_PATH_PLUGINS = "/spec/plugins" JSON_PATCH_PATH_REPLICATION_SLOTS = "/spec/replicationSlots" + JSON_PATCH_PATH_EXTERNAL_CLUSTERS = "/spec/externalClusters" + JSON_PATCH_PATH_MANAGED_SERVICES = "/spec/managed/services/additional" + JSON_PATCH_PATH_SYNCHRONOUS = "/spec/postgresql/synchronous" // JSON Patch operations JSON_PATCH_OP_REPLACE = "replace" diff --git a/operator/src/internal/utils/replication_context.go b/operator/src/internal/utils/replication_context.go index 66c3ff47..f6d187aa 100644 --- a/operator/src/internal/utils/replication_context.go +++ b/operator/src/internal/utils/replication_context.go @@ -40,6 +40,7 @@ const ( NoReplication replicationState = iota Primary Replica + NotPresent ) func GetReplicationContext(ctx context.Context, client client.Client, documentdb dbpreview.DocumentDB) (*ReplicationContext, error) { @@ -59,6 +60,19 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb return nil, err } + // If self is nil, then this cluster is not part of the replication setup + // This edge case can happen when the Hub cluster is also a member, and we are not + // putting the documentdb instance on it + if self == nil { + return &ReplicationContext{ + state: NotPresent, + CrossCloudNetworkingStrategy: None, + Environment: "", + StorageClass: "", + Self: "", + }, nil + } + // If no remote clusters, then just proceed with a regular cluster if len(others) == 0 { return 
&singleClusterReplicationContext, nil @@ -113,6 +127,10 @@ func (r *ReplicationContext) IsReplicating() bool { return r.state == Replica || r.state == Primary } +func (r *ReplicationContext) IsNotPresent() bool { + return r.state == NotPresent +} + // Gets the primary if you're a replica, otherwise returns the first other cluster func (r ReplicationContext) GetReplicationSource() string { if r.state == Replica { @@ -172,9 +190,9 @@ func (r ReplicationContext) GenerateOutgoingServiceNames(name, resourceGroup str // Creates the standby names list, which will be all other clusters in addition to "pg_receivewal" func (r *ReplicationContext) CreateStandbyNamesList() []string { - standbyNames := make([]string, len(r.Others)+1) + standbyNames := make([]string, len(r.Others)) copy(standbyNames, r.Others) - /* TODO re-enable when we have a WAL replica image + /* TODO re-enable when we have a WAL replica image (also add one to length) standbyNames[len(r.Others)] = "pg_receivewal" */ return standbyNames diff --git a/operator/src/internal/utils/util.go b/operator/src/internal/utils/util.go index fe71412c..e5a40e49 100644 --- a/operator/src/internal/utils/util.go +++ b/operator/src/internal/utils/util.go @@ -10,11 +10,13 @@ import ( "strconv" "time" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -313,6 +315,55 @@ func DeleteRoleBinding(ctx context.Context, c client.Client, name, namespace str return nil } +func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.ObjectMeta) error { + log := log.FromContext(ctx) + + hasOwnerReference := func(refs []metav1.OwnerReference) bool { + for _, ref := range refs { + if ref.UID == owner.UID && ref.Name == owner.Name { + return true + } + } + return false + } + + listInNamespace := client.InNamespace(owner.Namespace) + var errList []error + + var serviceList corev1.ServiceList + if err := c.List(ctx, &serviceList, listInNamespace); err != nil { + return fmt.Errorf("failed to list services: %w", err) + } + for i := range serviceList.Items { + svc := &serviceList.Items[i] + if hasOwnerReference(svc.OwnerReferences) { + if err := c.Delete(ctx, svc); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned Service", "name", svc.Name, "namespace", svc.Namespace) + errList = append(errList, fmt.Errorf("service %s/%s: %w", svc.Namespace, svc.Name, err)) + } + } + } + + var clusterList cnpgv1.ClusterList + if err := c.List(ctx, &clusterList, listInNamespace); err != nil { + return fmt.Errorf("failed to list CNPG clusters: %w", err) + } + for i := range clusterList.Items { + cluster := &clusterList.Items[i] + if hasOwnerReference(cluster.OwnerReferences) { + if err := c.Delete(ctx, cluster); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned CNPG Cluster", "name", cluster.Name, "namespace", cluster.Namespace) + errList = append(errList, fmt.Errorf("cnpg cluster %s/%s: %w", cluster.Namespace, cluster.Name, err)) + } + } + } + + if len(errList) > 0 { + return utilerrors.NewAggregate(errList) + } + return nil +} + // GenerateConnectionString returns a MongoDB connection string for the DocumentDB instance. 
// When trustTLS is true, tlsAllowInvalidCertificates is omitted for strict verification. func GenerateConnectionString(documentdb *dbpreview.DocumentDB, serviceIp string, trustTLS bool) string { From dcfa735a722b2c4ba157a5650e49a18d0d12488f Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Thu, 22 Jan 2026 15:46:18 -0500 Subject: [PATCH 2/8] bugfix: don't create cluster when not in crd --- .../aks-fleet-deployment/deploy-fleet-bicep.sh | 7 ------- .../aks-fleet-deployment/deploy-multi-region.sh | 3 +-- documentdb-playground/fleet-add-region/add-region.sh | 6 +++--- .../fleet-add-region/documentdb-resource-crp.yaml | 6 ++---- .../fleet-add-region/remove-region.sh | 7 +++---- .../src/internal/controller/documentdb_controller.go | 2 +- .../src/internal/controller/physical_replication.go | 10 +++++++--- operator/src/internal/utils/replication_context.go | 8 ++++---- 8 files changed, 21 insertions(+), 28 deletions(-) diff --git a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh index ac62ef46..3aa7ce4f 100755 --- a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh @@ -99,13 +99,6 @@ while read -r vnet1; do --remote-vnet "$vnet2" \ --allow-vnet-access true \ --allow-forwarded-traffic true - az network vnet peering create \ - --name "${vnet2}-to-${vnet1}-peering" \ - --resource-group "$RESOURCE_GROUP" \ - --vnet-name "$vnet2" \ - --remote-vnet "$vnet1" \ - --allow-vnet-access true \ - --allow-forwarded-traffic true done <<< "$VNET_NAMES" done <<< "$VNET_NAMES" diff --git a/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh b/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh index 1062e107..614c1753 100755 --- a/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-multi-region.sh @@ -318,8 +318,7 @@ if [ "$ENABLE_AZURE_DNS" = "true" ]; then echo "Creating DNS record: $cluster" # Create service name by concatenating documentdb-preview with cluster name (max 63 chars) - SERVICE_NAME="documentdb-service-${cluster}" - SERVICE_NAME="${SERVICE_NAME:0:63}" + SERVICE_NAME="documentdb-service-documentdb-preview" # Get the external IP of the DocumentDB service EXTERNAL_IP="" diff --git a/documentdb-playground/fleet-add-region/add-region.sh b/documentdb-playground/fleet-add-region/add-region.sh index 3eb669b3..88c69ebf 100755 --- a/documentdb-playground/fleet-add-region/add-region.sh +++ b/documentdb-playground/fleet-add-region/add-region.sh @@ -1,4 +1,4 @@ -#/bin/bash +#!/bin/bash SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" @@ -27,10 +27,10 @@ done kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ --type='json' -p='[ - {"op": "add", "path": "/spec/clusterReplication/clusterList/3", "value":{name: "'"$EXCLUDE_CLUSTER"'", environment: "aks"}}, + {"op": "add", "path": "/spec/clusterReplication/clusterList/3", "value":{"name": "'"$EXCLUDE_CLUSTER"'", "environment": "aks"}} ]' kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ --type='json' -p='[ - {"op": "add", "path": "/spec/policy/clusterNames/3", "value": "'"$EXCLUDE_CLUSTER"'"}, + {"op": "add", "path": "/spec/policy/clusterNames/3", "value": "'"$EXCLUDE_CLUSTER"'"} ]' diff --git a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml 
b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml index 43880319..14de7912 100644 --- a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml +++ b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml @@ -18,7 +18,7 @@ stringData: --- -apiVersion: db.microsoft.com/preview +apiVersion: documentdb.io/preview kind: DocumentDB metadata: name: documentdb-preview @@ -26,8 +26,6 @@ metadata: spec: nodeCount: 1 instancesPerNode: 1 - documentDBImage: ghcr.io/microsoft/documentdb/documentdb-local:16 - gatewayImage: ghcr.io/microsoft/documentdb/documentdb-local:16 resource: storage: pvcSize: 10Gi @@ -69,7 +67,7 @@ metadata: namespace: documentdb-preview-ns spec: resourceSelectors: - - group: db.microsoft.com + - group: documentdb.io kind: DocumentDB version: preview name: documentdb-preview diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh index c0f6e855..4bb19319 100755 --- a/documentdb-playground/fleet-add-region/remove-region.sh +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -1,10 +1,9 @@ -#/bin/bash +#!/bin/bash SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" HUB_REGION="${HUB_REGION:-westus3}" -EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" # Dynamically get member clusters from Azure echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." @@ -25,10 +24,10 @@ done kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ --type='json' -p='[ - {"op": "remove", "path": "/spec/clusterReplication/clusterList/2"}, + {"op": "remove", "path": "/spec/clusterReplication/clusterList/2"} ]' kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ --type='json' -p='[ - {"op": "add", "path": "/spec/policy/clusterNames/2"} + {"op": "remove", "path": "/spec/policy/clusterNames/2"} ]' diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index 844119a3..c37e2513 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -84,7 +84,7 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) if replicationContext.IsNotPresent() { logger.Info("DocumentDB instance is not part of the replication setup; skipping reconciliation and deleting any present resources") - if err := r.cleanupResources(ctx, req, documentdb); err != nil { + if err := r.cleanupResources(ctx, req); err != nil { return ctrl.Result{}, err } if err := util.DeleteOwnedResources(ctx, r.Client, documentdb.ObjectMeta); err != nil { diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index 2df6fc2b..97f82940 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -204,7 +204,7 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { foundServiceExport := &fleetv1alpha1.ServiceExport{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: documentdb.Namespace}, foundServiceExport) - if err != nil && errors.IsNotFound(err) { + if errors.IsNotFound(err) { 
log.Log.Info("Service Export not found. Creating a new Service Export " + serviceName) // Service Export @@ -218,6 +218,8 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, if err != nil { return err } + } else { + return err } } @@ -274,6 +276,7 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Update if the cluster list has changed replicasChanged := externalClusterNamesChanged(current.Spec.ExternalClusters, desired.Spec.ExternalClusters) if replicasChanged { + log.Log.Info("Updating external clusters") getReplicasChangePatchOps(&patchOps, desired, replicationContext) } @@ -387,7 +390,7 @@ func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, pat Value: desired.Spec.ReplicaCluster, }) - log.Log.Info("Applying patch for Replica => Replica transition", "patch", "cluster", current.Name) + log.Log.Info("Applying patch for Replica => Replica transition", "cluster", current.Name) } return nil, -1 @@ -551,6 +554,8 @@ func (r *DocumentDBReconciler) waitForDemotionTokenAndCreateService(clusterNN ty ctx := context.Background() ticker := time.NewTicker(demotionTokenPollInterval) timeout := time.NewTimer(demotionTokenWaitTimeout) + defer ticker.Stop() + defer timeout.Stop() for { select { @@ -563,7 +568,6 @@ func (r *DocumentDBReconciler) waitForDemotionTokenAndCreateService(clusterNN ty return } case <-timeout.C: - ticker.Stop() log.Log.Info("Timed out waiting for demotion token", "cluster", clusterNN.Name, "timeout", demotionTokenWaitTimeout) return } diff --git a/operator/src/internal/utils/replication_context.go b/operator/src/internal/utils/replication_context.go index f6d187aa..5eec302f 100644 --- a/operator/src/internal/utils/replication_context.go +++ b/operator/src/internal/utils/replication_context.go @@ -215,16 +215,16 @@ func getTopology(ctx context.Context, client client.Client, documentdb dbpreview } others := []string{} - var self dbpreview.MemberCluster + var self *dbpreview.MemberCluster for _, c := range documentdb.Spec.ClusterReplication.ClusterList { if c.Name != memberClusterName { others = append(others, generateCNPGClusterName(documentdb.Name, c.Name)) } else { - self = c + self = &c + self.Name = generateCNPGClusterName(documentdb.Name, self.Name) } } - self.Name = generateCNPGClusterName(documentdb.Name, self.Name) - return &self, others, state, nil + return self, others, state, nil } func GetSelfName(ctx context.Context, client client.Client) (string, error) { From b8c1d8a81ffc613bbeb9fe3eaa0edbb63e2dcdee Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Tue, 27 Jan 2026 16:46:51 -0500 Subject: [PATCH 3/8] fix hub/member service conflicts --- .../deploy-fleet-bicep.sh | 19 +- .../fleet-add-region/add-region.sh | 75 +++++++- .../fleet-add-region/check.sh | 49 ++++- .../documentdb-resource-crp.yaml | 4 + .../fleet-add-region/remove-region.sh | 49 ++++- .../templates/05_clusterrole.yaml | 2 +- .../controller/documentdb_controller.go | 22 +++ .../controller/physical_replication.go | 162 +++++++++++++++- .../controller/physical_replication_test.go | 173 ++++++++++++++++++ .../src/internal/utils/replication_context.go | 79 +++++--- operator/src/internal/utils/util_test.go | 6 +- 11 files changed, 577 insertions(+), 63 deletions(-) create mode 100644 operator/src/internal/controller/physical_replication_test.go diff --git a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh index 3aa7ce4f..eab2d86f 100755 
--- a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh @@ -114,6 +114,20 @@ git clone https://github.com/kubefleet-dev/kubefleet.git $kubeDir pushd $kubeDir # Set up HUB_CLUSTER as the hub kubectl config use-context $HUB_CLUSTER + +# Install cert manager on hub cluster +helm repo add jetstack https://charts.jetstack.io +helm repo update + +echo -e "\nInstalling cert-manager on $HUB_CLUSTER..." +helm upgrade --install cert-manager jetstack/cert-manager \ + --namespace cert-manager \ + --create-namespace \ + --set crds.enabled=true +kubectl rollout status deployment/cert-manager -n cert-manager --timeout=240s || true +echo "Pods ($HUB_CLUSTER):" +kubectl get pods -n cert-manager -o wide || true + export REGISTRY="ghcr.io/kubefleet-dev/kubefleet" export TAG=$(curl "https://api.github.com/repos/kubefleet-dev/kubefleet/tags" | jq -r '.[0].name') # Gets latest tag # Install the helm chart for running Fleet agents on the hub cluster. @@ -128,7 +142,10 @@ helm upgrade --install hub-agent ./charts/hub-agent/ \ --set logFileMaxSize=100000 \ --set MaxConcurrentClusterPlacement=200 \ --set namespace=fleet-system-hub \ - --set enableWorkload=true + --set enableWorkload=true #\ + #--set useCertManager=true \ + #--set enableWebhook=true + # Run the script. chmod +x ./hack/membership/joinMC.sh diff --git a/documentdb-playground/fleet-add-region/add-region.sh b/documentdb-playground/fleet-add-region/add-region.sh index 88c69ebf..670aa34d 100755 --- a/documentdb-playground/fleet-add-region/add-region.sh +++ b/documentdb-playground/fleet-add-region/add-region.sh @@ -4,6 +4,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" HUB_REGION="${HUB_REGION:-westus3}" +PRIMARY_REGION="${PRIMARY_REGION:-eastus2}" EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" # Dynamically get member clusters from Azure @@ -23,14 +24,72 @@ for cluster in "${CLUSTER_ARRAY[@]}"; do echo " - $cluster" if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then EXCLUDE_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$PRIMARY_REGION"* ]]; then PRIMARY_CLUSTER="$cluster"; fi done -kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ - --type='json' -p='[ - {"op": "add", "path": "/spec/clusterReplication/clusterList/3", "value":{"name": "'"$EXCLUDE_CLUSTER"'", "environment": "aks"}} - ]' +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Including cluster $cluster in DocumentDB configuration" + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo 
"$line" + fi +done > "$TEMP_YAML" + +echo "" +echo "Applying DocumentDB multi-region configuration..." + +MAX_RETRIES=60 +RETRY_INTERVAL=3 +RETRY_COUNT=0 + +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do + kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" &> /dev/null + + echo "Checking if $EXCLUDE_CLUSTER has been added to clusterReplication on the excluded cluster..." + + # Get the clusterReplication.clusters field from the DocumentDB object on the excluded cluster + CLUSTER_LIST_JSON=$(kubectl --context "$EXCLUDE_CLUSTER" get documentdb documentdb-preview -n documentdb-preview-ns -o jsonpath='{.spec.clusterReplication.clusterList[*].name}' 2>/dev/null) + + if echo "$CLUSTER_LIST_JSON" | grep -q "$EXCLUDE_CLUSTER"; then + echo "Success: $EXCLUDE_CLUSTER is now included in clusterReplication field" + break + fi + + RETRY_COUNT=$((RETRY_COUNT + 1)) + echo "Cluster not yet in clusterReplication (attempt $RETRY_COUNT/$MAX_RETRIES). Retrying in ${RETRY_INTERVAL}s..." + sleep $RETRY_INTERVAL +done + +if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then + echo "Error: Timed out waiting for $EXCLUDE_CLUSTER to appear in clusterReplication" + exit 1 +fi -kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ - --type='json' -p='[ - {"op": "add", "path": "/spec/policy/clusterNames/3", "value": "'"$EXCLUDE_CLUSTER"'"} - ]' +rm -f "$TEMP_YAML" +echo "Done." \ No newline at end of file diff --git a/documentdb-playground/fleet-add-region/check.sh b/documentdb-playground/fleet-add-region/check.sh index 7478ab03..a785f4ec 100755 --- a/documentdb-playground/fleet-add-region/check.sh +++ b/documentdb-playground/fleet-add-region/check.sh @@ -17,6 +17,7 @@ declare -a CLUSTER_ARRAY=() declare -a DOCUMENTDB_CLUSTER_NAMES=() declare -A DOCUMENTDB_CLUSTER_SET=() EXPECTED_CLUSTER_NAMES_JSON="[]" +EXPECTED_CNPG_NAMES_JSON="[]" declare -i EXPECTED_CLUSTER_COUNT=0 log() { @@ -42,6 +43,29 @@ record_failure() { FAILURE_MESSAGES+=("[$cluster] $*") } +get_cnpg_name_for_cluster() { + local cluster="$1" + + if ! kubectl --context "$cluster" get namespace "$DOCUMENTDB_NAMESPACE" >/dev/null 2>&1; then + return 1 + fi + + local cnpg_list + if ! 
cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + return 1 + fi + + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + if (( doc_owned_count == 0 )); then + return 1 + fi + + echo "$doc_owned_clusters" | jq -r '.[0].metadata.name' +} + check_member_cluster() { local cluster="$1" @@ -76,17 +100,21 @@ check_member_cluster() { cnpg_name=$(echo "$cnpg_obj" | jq -r '.metadata.name') local external_count + local expected_external_names_json + expected_external_names_json="$EXPECTED_CNPG_NAMES_JSON" + local expected_external_count + expected_external_count=$(echo "$expected_external_names_json" | jq 'length') external_count=$(echo "$cnpg_obj" | jq '.spec.externalClusters // [] | length') - if (( external_count == EXPECTED_CLUSTER_COUNT )); then + if (( external_count == expected_external_count )); then record_success "$cluster" "Cluster $cnpg_name externalClusters count matches ($external_count)" else - record_failure "$cluster" "Cluster $cnpg_name has $external_count externalClusters (expected $EXPECTED_CLUSTER_COUNT)" + record_failure "$cluster" "Cluster $cnpg_name has $external_count externalClusters (expected $expected_external_count)" fi local external_names_json external_names_json=$(echo "$cnpg_obj" | jq '[.spec.externalClusters // [] | .[]? | .name] | map(select(. != null))') local missing_names - missing_names=$(jq --argjson expected "$EXPECTED_CLUSTER_NAMES_JSON" --argjson actual "$external_names_json" -n '[ $expected[] | select(. as $item | ($actual | index($item)) | not) ]') + missing_names=$(jq --argjson expected "$expected_external_names_json" --argjson actual "$external_names_json" -n '[ $expected[] | select(. as $item | ($actual | index($item)) | not) ]') local missing_count missing_count=$(echo "$missing_names" | jq 'length') if (( missing_count > 0 )); then @@ -98,7 +126,7 @@ check_member_cluster() { fi local extra_names - extra_names=$(jq --argjson expected "$EXPECTED_CLUSTER_NAMES_JSON" --argjson actual "$external_names_json" -n '[ $actual[] | select(. as $item | ($expected | index($item)) | not) ]') + extra_names=$(jq --argjson expected "$expected_external_names_json" --argjson actual "$external_names_json" -n '[ $actual[] | select(. as $item | ($expected | index($item)) | not) ]') local extra_count extra_count=$(echo "$extra_names" | jq 'length') if (( extra_count > 0 )); then @@ -243,11 +271,24 @@ main() { DOCUMENTDB_CLUSTER_SET["$name"]=1 done + EXPECTED_CNPG_NAMES_JSON="[]" + for cluster in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + local cnpg_name + if ! cnpg_name=$(get_cnpg_name_for_cluster "$cluster"); then + record_failure "$cluster" "Unable to determine CNPG cluster name for DocumentDB $DOCUMENTDB_NAME" + continue + fi + EXPECTED_CNPG_NAMES_JSON=$(jq --arg name "$cnpg_name" '. 
+ [$name]' <<<"$EXPECTED_CNPG_NAMES_JSON") + done + log "DocumentDB $DOCUMENTDB_NAME expects $EXPECTED_CLUSTER_COUNT cluster(s):" for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do echo " - $name" done + log "Resolved CNPG cluster names:" + echo "$EXPECTED_CNPG_NAMES_JSON" | jq -r '.[]' | sed 's/^/ - /' + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do local match_found="false" for cluster in "${CLUSTER_ARRAY[@]}"; do diff --git a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml index 14de7912..63afdf0c 100644 --- a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml +++ b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml @@ -81,3 +81,7 @@ spec: {{CLUSTER_LIST_CRP}} strategy: type: RollingUpdate + applyStrategy: + type: ServerSideApply + serverSideApplyConfig: + force: true diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh index 4bb19319..e5b3d23a 100755 --- a/documentdb-playground/fleet-add-region/remove-region.sh +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -4,6 +4,8 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" HUB_REGION="${HUB_REGION:-westus3}" +PRIMARY_REGION="${PRIMARY_REGION:-eastus2}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" # Dynamically get member clusters from Azure echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." @@ -20,14 +22,45 @@ echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" for cluster in "${CLUSTER_ARRAY[@]}"; do echo " - $cluster" if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then EXCLUDE_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$PRIMARY_REGION"* ]]; then PRIMARY_CLUSTER="$cluster"; fi done -kubectl --context $HUB_CLUSTER patch documentdb documentdb-preview -n documentdb-preview-ns \ - --type='json' -p='[ - {"op": "remove", "path": "/spec/clusterReplication/clusterList/2"} - ]' +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Excluding cluster $cluster from DocumentDB configuration" + continue + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo "$line" + fi +done > "$TEMP_YAML" -kubectl --context $HUB_CLUSTER patch resourceplacement documentdb-resource-rp -n documentdb-preview-ns \ - --type='json' -p='[ - {"op": "remove", "path": "/spec/policy/clusterNames/2"} - ]' +echo "" +echo "Applying DocumentDB multi-region configuration..." 
+kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" \ No newline at end of file diff --git a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml index 54e69af2..1d92f649 100644 --- a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml +++ b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml @@ -25,7 +25,7 @@ rules: resources: ["leases"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: ["networking.fleet.azure.com"] # fleet permissions for multi-cluster services - resources: ["serviceexports", "multiclusterservices"] + resources: ["serviceexports", "multiclusterservices", "serviceimports", "internalserviceexports"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: [""] resources: ["secrets"] diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index c37e2513..89f3e0ee 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -268,6 +268,28 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } + // Check for fleet-networking issues and attempt to remediate + if replicationContext.IsAzureFleetNetworking() { + deleted, imports, err := r.CleanupMismatchedServiceImports(ctx, documentdb.Namespace, replicationContext) + if err != nil { + log.Log.Error(err, "Failed to cleanup ServiceImports") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + if deleted { + log.Log.Info("Deleted mismatched ServiceImports; requeuing to allow for proper recreation") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + reconciled, err := r.ForceReconcileInternalServiceExports(ctx, documentdb.Namespace, replicationContext, imports) + if err != nil { + log.Log.Error(err, "Failed to force reconcile InternalServiceExports") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + if reconciled { + log.Log.Info("Annotated InternalServiceExports for reconciliation; requeuing to allow fleet-networking to recreate ServiceImports") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + } + // Don't reque again unless there is a change return ctrl.Result{}, nil } diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index 97f82940..e190ffac 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "slices" + "strings" "time" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" @@ -21,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -49,13 +51,13 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( } // No more errors possible, so we can safely edit the spec - cnpgCluster.Name = replicationContext.Self + cnpgCluster.Name = replicationContext.CNPGClusterName if !replicationContext.IsPrimary() { cnpgCluster.Spec.InheritedMetadata.Labels[util.LABEL_REPLICATION_CLUSTER_TYPE] = "replica" cnpgCluster.Spec.Bootstrap = &cnpgv1.BootstrapConfiguration{ PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{ - Source: replicationContext.PrimaryCluster, + 
Source: replicationContext.PrimaryCNPGClusterName, Database: "postgres", Owner: "postgres", }, @@ -98,8 +100,8 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( cnpgCluster.Spec.ReplicaCluster = &cnpgv1.ReplicaClusterConfiguration{ Source: replicationContext.GetReplicationSource(), - Primary: replicationContext.PrimaryCluster, - Self: replicationContext.Self, + Primary: replicationContext.PrimaryCNPGClusterName, + Self: replicationContext.CNPGClusterName, } if replicationContext.IsAzureFleetNetworking() { @@ -121,10 +123,10 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( }) } } - selfHost := replicationContext.Self + "-rw." + documentdb.Namespace + ".svc" + selfHost := replicationContext.CNPGClusterName + "-rw." + documentdb.Namespace + ".svc" cnpgCluster.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ { - Name: replicationContext.Self, + Name: replicationContext.CNPGClusterName, ConnectionParameters: map[string]string{ "host": selfHost, "port": "5432", @@ -152,7 +154,7 @@ func (r *DocumentDBReconciler) CreateIstioRemoteServices(ctx context.Context, re // Create dummy -rw services for remote clusters so DNS resolution works // These services have non-matching selectors, so they have no local endpoints // Istio will automatically route traffic through the east-west gateway - for _, remoteCluster := range replicationContext.Others { + for _, remoteCluster := range replicationContext.OtherCNPGClusterNames { // Create the -rw (read-write/primary) service for each remote cluster serviceNameRW := remoteCluster + "-rw" foundServiceRW := &corev1.Service{} @@ -216,9 +218,12 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, } err = r.Create(ctx, ringServiceExport) if err != nil { + if errors.IsAlreadyExists(err) { + continue + } return err } - } else { + } else if err != nil { return err } } @@ -229,11 +234,21 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, err := r.Get(ctx, types.NamespacedName{Name: sourceServiceName, Namespace: documentdb.Namespace}, foundMCS) if err != nil && errors.IsNotFound(err) { log.Log.Info("Multi Cluster Service not found. Creating a new Multi Cluster Service") - // Multi Cluster Service + // Multi Cluster Service with owner reference to ensure cleanup foundMCS = &fleetv1alpha1.MultiClusterService{ ObjectMeta: metav1.ObjectMeta{ Name: sourceServiceName, Namespace: documentdb.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + BlockOwnerDeletion: ptr.To(true), + Controller: ptr.To(true), + }, + }, }, Spec: fleetv1alpha1.MultiClusterServiceSpec{ ServiceImport: fleetv1alpha1.ServiceImportRef{ @@ -336,7 +351,7 @@ func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, pat // Replica => primary // Look for the token if this is a managed failover oldPrimaryAvailable := slices.Contains( - replicationContext.Others, + replicationContext.OtherCNPGClusterNames, current.Spec.ReplicaCluster.Primary) replicaClusterConfig := desired.Spec.ReplicaCluster @@ -574,6 +589,133 @@ func (r *DocumentDBReconciler) waitForDemotionTokenAndCreateService(clusterNN ty } } +// CleanupMismatchedServiceImports finds and removes ServiceImports that have no ownerReferences +// and are marked as "in-use-by" the current cluster. 
+// RETURNS: Whether or not a deletion occurred, and error if any error occurs during the process +// +// There is currently an incompatibility when you use fleet-networking with a cluster that +// is both a hub and a member. The ServiceImport that is generated on the hub will sometimes +// be interpreted as a member-side ServiceImport and attach itself to the export, thus preventing +// the intended MCS from attaching to it. This function finds those offending ServiceImports and +// removes them. +func (r *DocumentDBReconciler) CleanupMismatchedServiceImports(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) (bool, *fleetv1alpha1.ServiceImportList, error) { + deleted := false + + // List all ServiceImports in the namespace + serviceImportList := &fleetv1alpha1.ServiceImportList{} + if err := r.Client.List(ctx, serviceImportList, client.InNamespace(namespace)); err != nil { + // If the CRD doesn't exist, skip cleanup + if errors.IsNotFound(err) { + return deleted, nil, nil + } + return deleted, nil, fmt.Errorf("failed to list ServiceImports: %w", err) + } + + inUseByAnnotation := "networking.fleet.azure.com/service-in-use-by" + + for i := range serviceImportList.Items { + badServiceImport := &serviceImportList.Items[i] + // If it has an OwnerReference, then it is properly being used by the cluster's MCS + if len(badServiceImport.OwnerReferences) > 0 { + continue + } + + annotations := badServiceImport.GetAnnotations() + if annotations == nil { + continue + } + + inUseBy, exists := annotations[inUseByAnnotation] + // If it has its own name as the cluster name, then it has erroneously attached itself to the export + if !exists || !containsClusterName(inUseBy, replicationContext.FleetMemberName) { + continue + } + + log.Log.Info("Found mismatched ServiceImport", "name", badServiceImport.Name, "namespace", namespace, "cluster", replicationContext.FleetMemberName) + + // Debug log the entire ServiceImport before deletion + log.Log.V(1).Info("Deleting ServiceImport", "serviceImport", fmt.Sprintf("%+v", badServiceImport)) + + if err := r.Client.Delete(ctx, badServiceImport); err != nil && !errors.IsNotFound(err) { + log.Log.Error(err, "Failed to delete ServiceImport", "name", badServiceImport.Name) + continue + } + deleted = true + + log.Log.Info("Deleted ServiceImport", "name", badServiceImport.Name, "namespace", namespace) + } + + return deleted, serviceImportList, nil +} + +// ForceReconcileInternalServiceExports finds InternalServiceExports that don't have a matching +// ServiceImport with proper owner references in the target namespace, and annotates them to +// trigger reconciliation so the fleet-networking controller can recreate the ServiceImports properly. +// Returns whether any InternalServiceExports were annotated for reconciliation, and error if any occurs. 
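
For context on the mechanism described above: the remediation relies on the fact that any metadata update generates a watch event and re-queues the object for its controller, so writing a timestamped annotation is enough to make fleet-networking look at the InternalServiceExport again. The sketch below is a generic version of that touch-an-annotation idiom; the "reconcile" key and Unix-timestamp value mirror the patch, but nothing here is fleet-specific or part of the patch itself.

package example

import (
	"context"
	"strconv"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// touchForReconcile stamps an annotation with the current time so the
// object's controller observes a change and reconciles it again.
func touchForReconcile(ctx context.Context, c client.Client, obj client.Object) error {
	ann := obj.GetAnnotations()
	if ann == nil {
		ann = map[string]string{}
	}
	ann["reconcile"] = strconv.FormatInt(time.Now().Unix(), 10)
	obj.SetAnnotations(ann)
	return c.Update(ctx, obj)
}
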
+func (r *DocumentDBReconciler) ForceReconcileInternalServiceExports(ctx context.Context, namespace string, replicationContext *util.ReplicationContext, imports *fleetv1alpha1.ServiceImportList) (bool, error) { + reconciled := false + + // Extract all serviceImport names for easy lookup + serviceImportNames := make(map[string]bool) + for i := range imports.Items { + serviceImportNames[imports.Items[i].Name] = true + } + + for fleetMemberName := range replicationContext.GenerateFleetMemberNames() { + // List all InternalServiceExports in each fleet member namespace + fleetMemberNamespace := "fleet-member-" + fleetMemberName + iseList := &fleetv1alpha1.InternalServiceExportList{} + if err := r.Client.List(ctx, iseList, client.InNamespace(fleetMemberNamespace)); err != nil { + // If the CRD doesn't exist or namespace doesn't exist, skip + if errors.IsNotFound(err) { + return reconciled, nil + } + return reconciled, fmt.Errorf("failed to list InternalServiceExports: %w", err) + } + + // Check each InternalServiceExport for a matching ServiceImport + for i := range iseList.Items { + ise := &iseList.Items[i] + + // ISE name format is: - + // Extract the service name by removing the namespace prefix + prefix := namespace + "-" + if !strings.HasPrefix(ise.Name, prefix) { + continue + } + serviceName := strings.TrimPrefix(ise.Name, prefix) + + // Check if there's a valid ServiceImport for this ISE + if serviceImportNames[serviceName] { + continue + } + + log.Log.Info("Found InternalServiceExport without import", "name", ise.Name, "namespace", fleetMemberNamespace) + + // Add reconcile annotation with current timestamp to trigger reconciliation + if ise.Annotations == nil { + ise.Annotations = make(map[string]string) + } + ise.Annotations["reconcile"] = fmt.Sprintf("%d", time.Now().Unix()) + + if err := r.Client.Update(ctx, ise); err != nil { + log.Log.Error(err, "Failed to annotate InternalServiceExport", "name", ise.Name, "namespace", fleetMemberNamespace) + continue + } + + reconciled = true + log.Log.Info("Annotated InternalServiceExport for reconciliation", "name", ise.Name, "namespace", fleetMemberNamespace) + } + } + return reconciled, nil +} + +// containsClusterName checks if the inUseBy string contains the cluster name +func containsClusterName(inUseBy, clusterName string) bool { + // The annotation value typically contains the cluster name + return strings.Contains(inUseBy, clusterName) +} + // Returns true when token service resources are ready func (r *DocumentDBReconciler) ensureTokenServiceResources(ctx context.Context, clusterNN types.NamespacedName, replicationContext *util.ReplicationContext) (bool, error) { cluster := &cnpgv1.Cluster{} diff --git a/operator/src/internal/controller/physical_replication_test.go b/operator/src/internal/controller/physical_replication_test.go new file mode 100644 index 00000000..a1106741 --- /dev/null +++ b/operator/src/internal/controller/physical_replication_test.go @@ -0,0 +1,173 @@ +package controller + +import ( + "context" + "testing" + "time" + + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + dbpreview 
"github.com/documentdb/documentdb-operator/api/preview" + util "github.com/documentdb/documentdb-operator/internal/utils" +) + +func buildDocumentDBReconciler(t *testing.T, objs ...runtime.Object) *DocumentDBReconciler { + t.Helper() + scheme := runtime.NewScheme() + require.NoError(t, dbpreview.AddToScheme(scheme)) + require.NoError(t, cnpgv1.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + require.NoError(t, rbacv1.AddToScheme(scheme)) + + builder := fake.NewClientBuilder().WithScheme(scheme) + if len(objs) > 0 { + builder = builder.WithRuntimeObjects(objs...) + clientObjs := make([]client.Object, 0, len(objs)) + for _, obj := range objs { + if co, ok := obj.(client.Object); ok { + clientObjs = append(clientObjs, co) + } + } + if len(clientObjs) > 0 { + builder = builder.WithStatusSubresource(clientObjs...) + } + } + + return &DocumentDBReconciler{Client: builder.Build(), Scheme: scheme} +} + +func TestDocumentDBReconcileSkipsWhenNotPresent(t *testing.T) { + ctx := context.Background() + namespace := "default" + + documentdb := baseDocumentDB("docdb-not-present", namespace) + documentdb.UID = types.UID("docdb-not-present-uid") + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: string(util.AzureFleet), + Primary: "member-2", + ClusterList: []dbpreview.MemberCluster{ + {Name: "member-2"}, + {Name: "member-3"}, + }, + } + + ownerRef := metav1.OwnerReference{ + APIVersion: "documentdb.io/preview", + Kind: "DocumentDB", + Name: documentdb.Name, + UID: documentdb.UID, + } + + ownedService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-service", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } + + ownedCluster := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-cnpg", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } + + clusterNameConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-name", + Namespace: "kube-system", + }, + Data: map[string]string{ + "name": "member-1", + }, + } + + reconciler := buildDocumentDBReconciler(t, documentdb, ownedService, ownedCluster, clusterNameConfigMap) + + result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: documentdb.Name, Namespace: namespace}}) + require.NoError(t, err) + require.Equal(t, ctrl.Result{}, result) + + service := &corev1.Service{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedService.Name, Namespace: namespace}, service) + require.True(t, errors.IsNotFound(err)) + + cluster := &cnpgv1.Cluster{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedCluster.Name, Namespace: namespace}, cluster) + require.True(t, errors.IsNotFound(err)) +} + +func TestTryUpdateClusterUpdatesExternalClusters(t *testing.T) { + ctx := context.Background() + namespace := "default" + + documentdb := baseDocumentDB("docdb-repl", namespace) + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: string(util.None), + Primary: documentdb.Name, + ClusterList: []dbpreview.MemberCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + }, + } + + current := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "docdb-repl", + Namespace: namespace, + }, + Spec: cnpgv1.ClusterSpec{ + ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ + Self: documentdb.Name, + Primary: documentdb.Name, + Source: documentdb.Name, + }, + ExternalClusters: 
[]cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + }, + PostgresConfiguration: cnpgv1.PostgresConfiguration{ + Synchronous: &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 1, + }, + }, + }, + } + + desired := current.DeepCopy() + desired.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + {Name: "member-3"}, + } + desired.Spec.PostgresConfiguration.Synchronous = &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 2, + } + + reconciler := buildDocumentDBReconciler(t, current) + replicationContext, err := util.GetReplicationContext(ctx, reconciler.Client, *documentdb) + require.NoError(t, err) + + err, requeue := reconciler.TryUpdateCluster(ctx, current, desired, documentdb, replicationContext) + require.NoError(t, err) + require.Equal(t, time.Duration(-1), requeue) + + updated := &cnpgv1.Cluster{} + require.NoError(t, reconciler.Client.Get(ctx, types.NamespacedName{Name: current.Name, Namespace: namespace}, updated)) + require.Len(t, updated.Spec.ExternalClusters, 3) + require.Equal(t, 2, updated.Spec.PostgresConfiguration.Synchronous.Number) +} diff --git a/operator/src/internal/utils/replication_context.go b/operator/src/internal/utils/replication_context.go index 5eec302f..a3383eda 100644 --- a/operator/src/internal/utils/replication_context.go +++ b/operator/src/internal/utils/replication_context.go @@ -15,12 +15,14 @@ import ( ) type ReplicationContext struct { - Self string - Others []string - PrimaryCluster string + CNPGClusterName string + OtherCNPGClusterNames []string + PrimaryCNPGClusterName string CrossCloudNetworkingStrategy crossCloudNetworkingStrategy Environment string StorageClass string + FleetMemberName string + OtherFleetMemberNames []string currentLocalPrimary string targetLocalPrimary string state replicationState @@ -49,7 +51,7 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb CrossCloudNetworkingStrategy: None, Environment: documentdb.Spec.Environment, StorageClass: documentdb.Spec.Resource.Storage.StorageClass, - Self: documentdb.Name, + CNPGClusterName: documentdb.Name, } if documentdb.Spec.ClusterReplication == nil { return &singleClusterReplicationContext, nil @@ -69,7 +71,7 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb CrossCloudNetworkingStrategy: None, Environment: "", StorageClass: "", - Self: "", + CNPGClusterName: "", }, nil } @@ -80,6 +82,11 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb primaryCluster := generateCNPGClusterName(documentdb.Name, documentdb.Spec.ClusterReplication.Primary) + otherCNPGClusterNames := make([]string, len(others)) + for i, other := range others { + otherCNPGClusterNames[i] = generateCNPGClusterName(documentdb.Name, other) + } + storageClass := documentdb.Spec.Resource.Storage.StorageClass if self.StorageClassOverride != "" { storageClass = self.StorageClassOverride @@ -90,13 +97,15 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb } return &ReplicationContext{ - Self: self.Name, - Others: others, + CNPGClusterName: generateCNPGClusterName(documentdb.Name, self.Name), + OtherCNPGClusterNames: otherCNPGClusterNames, CrossCloudNetworkingStrategy: crossCloudNetworkingStrategy(documentdb.Spec.ClusterReplication.CrossCloudNetworkingStrategy), - PrimaryCluster: primaryCluster, + PrimaryCNPGClusterName: 
primaryCluster, Environment: environment, StorageClass: storageClass, state: replicationState, + FleetMemberName: self.Name, + OtherFleetMemberNames: others, targetLocalPrimary: documentdb.Status.TargetPrimary, currentLocalPrimary: documentdb.Status.LocalPrimary, }, nil @@ -112,10 +121,12 @@ func (r ReplicationContext) String() string { stateStr = "Primary" case Replica: stateStr = "Replica" + case NotPresent: + stateStr = "NotPresent" } - return fmt.Sprintf("ReplicationContext{Self: %s, State: %s, Others: %v, PrimaryRegion: %s, CurrentLocalPrimary: %s, TargetLocalPrimary: %s}", - r.Self, stateStr, r.Others, r.PrimaryCluster, r.currentLocalPrimary, r.targetLocalPrimary) + return fmt.Sprintf("ReplicationContext{CNPGClusterName: %s, State: %s, Others: %v, PrimaryRegion: %s, CurrentLocalPrimary: %s, TargetLocalPrimary: %s}", + r.CNPGClusterName, stateStr, r.OtherCNPGClusterNames, r.PrimaryCNPGClusterName, r.currentLocalPrimary, r.targetLocalPrimary) } // Returns true if this instance is the primary or if there is no replication configured. @@ -134,9 +145,9 @@ func (r *ReplicationContext) IsNotPresent() bool { // Gets the primary if you're a replica, otherwise returns the first other cluster func (r ReplicationContext) GetReplicationSource() string { if r.state == Replica { - return r.PrimaryCluster + return r.PrimaryCNPGClusterName } - return r.Others[0] + return r.OtherCNPGClusterNames[0] } // EndpointEnabled returns true if the endpoint should be enabled for this DocumentDB instance. @@ -151,10 +162,10 @@ func (r ReplicationContext) EndpointEnabled() bool { func (r ReplicationContext) GenerateExternalClusterServices(name, namespace string, fleetEnabled bool) func(yield func(string, string) bool) { return func(yield func(string, string) bool) { - for _, other := range r.Others { + for _, other := range r.OtherCNPGClusterNames { serviceName := other + "-rw." 
+ namespace + ".svc" if fleetEnabled { - serviceName = namespace + "-" + generateServiceName(name, other, r.Self, namespace) + ".fleet-system.svc" + serviceName = namespace + "-" + generateServiceName(name, other, r.CNPGClusterName, namespace) + ".fleet-system.svc" } if !yield(other, serviceName) { @@ -167,8 +178,8 @@ func (r ReplicationContext) GenerateExternalClusterServices(name, namespace stri // Create an iterator that yields outgoing service names, for use in a for each loop func (r ReplicationContext) GenerateIncomingServiceNames(name, resourceGroup string) func(yield func(string) bool) { return func(yield func(string) bool) { - for _, other := range r.Others { - serviceName := generateServiceName(name, other, r.Self, resourceGroup) + for _, other := range r.OtherCNPGClusterNames { + serviceName := generateServiceName(name, other, r.CNPGClusterName, resourceGroup) if !yield(serviceName) { break } @@ -179,8 +190,8 @@ func (r ReplicationContext) GenerateIncomingServiceNames(name, resourceGroup str // Create an iterator that yields outgoing service names, for use in a for each loop func (r ReplicationContext) GenerateOutgoingServiceNames(name, resourceGroup string) func(yield func(string) bool) { return func(yield func(string) bool) { - for _, other := range r.Others { - serviceName := generateServiceName(name, r.Self, other, resourceGroup) + for _, other := range r.OtherCNPGClusterNames { + serviceName := generateServiceName(name, r.CNPGClusterName, other, resourceGroup) if !yield(serviceName) { break } @@ -188,10 +199,23 @@ func (r ReplicationContext) GenerateOutgoingServiceNames(name, resourceGroup str } } +func (r ReplicationContext) GenerateFleetMemberNames() func(yield func(string) bool) { + return func(yield func(string) bool) { + for _, other := range r.OtherFleetMemberNames { + if !yield(other) { + return + } + } + if !yield(r.FleetMemberName) { + return + } + } +} + // Creates the standby names list, which will be all other clusters in addition to "pg_receivewal" func (r *ReplicationContext) CreateStandbyNamesList() []string { - standbyNames := make([]string, len(r.Others)) - copy(standbyNames, r.Others) + standbyNames := make([]string, len(r.OtherCNPGClusterNames)) + copy(standbyNames, r.OtherCNPGClusterNames) /* TODO re-enable when we have a WAL replica image (also add one to length) standbyNames[len(r.Others)] = "pg_receivewal" */ @@ -203,7 +227,7 @@ func getTopology(ctx context.Context, client client.Client, documentdb dbpreview var err error if documentdb.Spec.ClusterReplication.CrossCloudNetworkingStrategy != string(None) { - memberClusterName, err = GetSelfName(ctx, client) + memberClusterName, err = GetFleetMemberName(ctx, client) if err != nil { return nil, nil, NoReplication, err } @@ -218,16 +242,15 @@ func getTopology(ctx context.Context, client client.Client, documentdb dbpreview var self *dbpreview.MemberCluster for _, c := range documentdb.Spec.ClusterReplication.ClusterList { if c.Name != memberClusterName { - others = append(others, generateCNPGClusterName(documentdb.Name, c.Name)) + others = append(others, c.Name) } else { - self = &c - self.Name = generateCNPGClusterName(documentdb.Name, self.Name) + self = c.DeepCopy() } } return self, others, state, nil } -func GetSelfName(ctx context.Context, client client.Client) (string, error) { +func GetFleetMemberName(ctx context.Context, client client.Client) (string, error) { clusterMapName := "cluster-name" clusterNameConfigMap := &corev1.ConfigMap{} err := client.Get(ctx, types.NamespacedName{Name: 
clusterMapName, Namespace: "kube-system"}, clusterNameConfigMap) @@ -235,11 +258,11 @@ func GetSelfName(ctx context.Context, client client.Client) (string, error) { return "", err } - self := clusterNameConfigMap.Data["name"] - if self == "" { + memberName := clusterNameConfigMap.Data["name"] + if memberName == "" { return "", fmt.Errorf("name key not found in kube-system:cluster-name configmap") } - return self, nil + return memberName, nil } func (r *ReplicationContext) IsAzureFleetNetworking() bool { diff --git a/operator/src/internal/utils/util_test.go b/operator/src/internal/utils/util_test.go index 5fcb1fec..96210715 100644 --- a/operator/src/internal/utils/util_test.go +++ b/operator/src/internal/utils/util_test.go @@ -335,9 +335,9 @@ func TestGetDocumentDBServiceDefinition_CNPGLabels(t *testing.T) { // Create a mock ReplicationContext replicationContext := &ReplicationContext{ - Self: tt.documentDBName, - Environment: "test", - state: NoReplication, // This will make EndpointEnabled() return true + CNPGClusterName: tt.documentDBName, + Environment: "test", + state: NoReplication, // This will make EndpointEnabled() return true } // If endpoint should be disabled, set a different state From 4aad5db82fec259232ed41369ccf62148b40cabd Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Wed, 28 Jan 2026 13:59:28 -0500 Subject: [PATCH 4/8] Delete MCS when region is removed --- .../fleet-add-region/remove-region.sh | 41 +++++- .../controller/documentdb_controller.go | 2 +- .../controller/physical_replication.go | 119 ++++++++++++------ operator/src/internal/utils/util.go | 47 +++++++ 4 files changed, 166 insertions(+), 43 deletions(-) diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh index e5b3d23a..04871a83 100755 --- a/documentdb-playground/fleet-add-region/remove-region.sh +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -28,7 +28,6 @@ done # Build the cluster list YAML with proper indentation CLUSTER_LIST="" -CLUSTER_LIST_CRP="" for cluster in "${CLUSTER_ARRAY[@]}"; do if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then echo "Excluding cluster $cluster from DocumentDB configuration" @@ -37,10 +36,17 @@ for cluster in "${CLUSTER_ARRAY[@]}"; do if [ -z "$CLUSTER_LIST" ]; then CLUSTER_LIST=" - name: ${cluster}" CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" - CLUSTER_LIST_CRP=" - ${cluster}" else CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + fi +done + +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ -z "$CLUSTER_LIST_CRP" ]; then + CLUSTER_LIST_CRP=" - ${cluster}" + else CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" fi done @@ -63,4 +69,33 @@ done > "$TEMP_YAML" echo "" echo "Applying DocumentDB multi-region configuration..." -kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" \ No newline at end of file + +MAX_RETRIES=60 +RETRY_INTERVAL=3 +RETRY_COUNT=0 + +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do + kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" &> /dev/null + + echo "Checking if $EXCLUDE_CLUSTER has been removed from clusterReplication on the excluded cluster..." + + # Get the clusterReplication.clusters field from the DocumentDB object on the excluded cluster + CLUSTER_LIST_JSON=$(kubectl --context "$EXCLUDE_CLUSTER" get documentdb documentdb-preview -n documentdb-preview-ns -o jsonpath='{.spec.clusterReplication.clusterList[*].name}' 2>/dev/null) + + if ! 
echo "$CLUSTER_LIST_JSON" | grep -q "$EXCLUDE_CLUSTER"; then + echo "Success: $EXCLUDE_CLUSTER is now excluded from clusterReplication field" + break + fi + + RETRY_COUNT=$((RETRY_COUNT + 1)) + echo "Cluster still in clusterReplication (attempt $RETRY_COUNT/$MAX_RETRIES). Retrying in ${RETRY_INTERVAL}s..." + sleep $RETRY_INTERVAL +done + +if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then + echo "Error: Timed out waiting for $EXCLUDE_CLUSTER to be removed from clusterReplication" + exit 1 +fi + +rm -f "$TEMP_YAML" +echo "Done." \ No newline at end of file diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index 89f3e0ee..eb4b99e3 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -286,7 +286,7 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if reconciled { log.Log.Info("Annotated InternalServiceExports for reconciliation; requeuing to allow fleet-networking to recreate ServiceImports") - return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + return ctrl.Result{RequeueAfter: RequeueAfterLong}, nil } } diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index e190ffac..8b088d5a 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -22,7 +22,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -203,50 +202,87 @@ func (r *DocumentDBReconciler) CreateIstioRemoteServices(ctx context.Context, re } func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, replicationContext *util.ReplicationContext, documentdb *dbpreview.DocumentDB) error { - for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { - foundServiceExport := &fleetv1alpha1.ServiceExport{} - err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: documentdb.Namespace}, foundServiceExport) - if errors.IsNotFound(err) { - log.Log.Info("Service Export not found. 
Creating a new Service Export " + serviceName) + // List all existing ServiceExports in the namespace + existingServiceExports := &fleetv1alpha1.ServiceExportList{} + if err := r.Client.List(ctx, existingServiceExports, client.InNamespace(documentdb.Namespace)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to list ServiceExports: %w", err) + } + } - // Service Export + // Build a map of existing ServiceExports that start with documentdb.Name + existingExports := make(map[string]*fleetv1alpha1.ServiceExport) + for i := range existingServiceExports.Items { + export := &existingServiceExports.Items[i] + if strings.HasPrefix(export.Name, documentdb.Name) { + existingExports[export.Name] = export + } + } + + for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { + _, exists := existingExports[serviceName] + if !exists { ringServiceExport := &fleetv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: documentdb.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + }, + }, }, } - err = r.Create(ctx, ringServiceExport) - if err != nil { - if errors.IsAlreadyExists(err) { - continue - } + if err := r.Create(ctx, ringServiceExport); err != nil && !errors.IsAlreadyExists(err) { return err } - } else if err != nil { - return err + } else { // if exists then we don't want to remove it + delete(existingExports, serviceName) + } + } + + // If it's still in the existingExports map, it means it's no longer needed and should be deleted + for serviceName, export := range existingExports { + if err := r.Client.Delete(ctx, export); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete ServiceExport %s: %w", serviceName, err) } } - // Below is true because this function is only called if we are fleet enabled + // List all existing MultiClusterServices in the namespace + existingMCSList := &fleetv1alpha1.MultiClusterServiceList{} + if err := r.Client.List(ctx, existingMCSList, client.InNamespace(documentdb.Namespace)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to list MultiClusterServices: %w", err) + } + } + + // Build a map of existing MultiClusterServices that start with documentdb.Name + existingMCS := make(map[string]*fleetv1alpha1.MultiClusterService) + for i := range existingMCSList.Items { + mcs := &existingMCSList.Items[i] + if strings.HasPrefix(mcs.Name, documentdb.Name) { + existingMCS[mcs.Name] = mcs + } + } + + // Below is valid because this function is only called if fleet-networking is enabled for sourceServiceName := range replicationContext.GenerateIncomingServiceNames(documentdb.Name, documentdb.Namespace) { - foundMCS := &fleetv1alpha1.MultiClusterService{} - err := r.Get(ctx, types.NamespacedName{Name: sourceServiceName, Namespace: documentdb.Namespace}, foundMCS) - if err != nil && errors.IsNotFound(err) { - log.Log.Info("Multi Cluster Service not found. 
Creating a new Multi Cluster Service") + _, exists := existingMCS[sourceServiceName] + if !exists { // Multi Cluster Service with owner reference to ensure cleanup - foundMCS = &fleetv1alpha1.MultiClusterService{ + newMCS := &fleetv1alpha1.MultiClusterService{ ObjectMeta: metav1.ObjectMeta{ Name: sourceServiceName, Namespace: documentdb.Namespace, OwnerReferences: []metav1.OwnerReference{ { - APIVersion: documentdb.APIVersion, - Kind: documentdb.Kind, - Name: documentdb.Name, - UID: documentdb.UID, - BlockOwnerDeletion: ptr.To(true), - Controller: ptr.To(true), + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, }, }, }, @@ -256,10 +292,18 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, }, }, } - err = r.Create(ctx, foundMCS) - if err != nil { + if err := r.Create(ctx, newMCS); err != nil && !errors.IsAlreadyExists(err) { return err } + } else { // if exists then we don't want to remove it + delete(existingMCS, sourceServiceName) + } + } + + // If it's still in the existingMCS map, it means it's no longer needed and should be deleted + for serviceName, mcs := range existingMCS { + if err := r.Client.Delete(ctx, mcs); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete MultiClusterService %s: %w", serviceName, err) } } @@ -291,7 +335,6 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Update if the cluster list has changed replicasChanged := externalClusterNamesChanged(current.Spec.ExternalClusters, desired.Spec.ExternalClusters) if replicasChanged { - log.Log.Info("Updating external clusters") getReplicasChangePatchOps(&patchOps, desired, replicationContext) } @@ -631,18 +674,11 @@ func (r *DocumentDBReconciler) CleanupMismatchedServiceImports(ctx context.Conte continue } - log.Log.Info("Found mismatched ServiceImport", "name", badServiceImport.Name, "namespace", namespace, "cluster", replicationContext.FleetMemberName) - - // Debug log the entire ServiceImport before deletion - log.Log.V(1).Info("Deleting ServiceImport", "serviceImport", fmt.Sprintf("%+v", badServiceImport)) - if err := r.Client.Delete(ctx, badServiceImport); err != nil && !errors.IsNotFound(err) { log.Log.Error(err, "Failed to delete ServiceImport", "name", badServiceImport.Name) continue } deleted = true - - log.Log.Info("Deleted ServiceImport", "name", badServiceImport.Name, "namespace", namespace) } return deleted, serviceImportList, nil @@ -690,8 +726,6 @@ func (r *DocumentDBReconciler) ForceReconcileInternalServiceExports(ctx context. continue } - log.Log.Info("Found InternalServiceExport without import", "name", ise.Name, "namespace", fleetMemberNamespace) - // Add reconcile annotation with current timestamp to trigger reconciliation if ise.Annotations == nil { ise.Annotations = make(map[string]string) @@ -704,7 +738,6 @@ func (r *DocumentDBReconciler) ForceReconcileInternalServiceExports(ctx context. 
} reconciled = true - log.Log.Info("Annotated InternalServiceExport for reconciliation", "name", ise.Name, "namespace", fleetMemberNamespace) } } return reconciled, nil @@ -838,6 +871,14 @@ func (r *DocumentDBReconciler) ensureTokenServiceResources(ctx context.Context, ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, Namespace: clusterNN.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: cluster.APIVersion, + Kind: cluster.Kind, + Name: cluster.Name, + UID: cluster.UID, + }, + }, }, } diff --git a/operator/src/internal/utils/util.go b/operator/src/internal/utils/util.go index e5a40e49..0762981e 100644 --- a/operator/src/internal/utils/util.go +++ b/operator/src/internal/utils/util.go @@ -8,9 +8,11 @@ import ( "fmt" "os" "strconv" + "strings" "time" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + fleetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -358,12 +360,57 @@ func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.Obj } } + var mcsList fleetv1alpha1.MultiClusterServiceList + if err := c.List(ctx, &mcsList, listInNamespace); err != nil && !errors.IsNotFound(err) { + // Ignore if CRD doesn't exist + if !isNoKindMatchError(err) { + return fmt.Errorf("failed to list MultiClusterServices: %w", err) + } + } else { + for i := range mcsList.Items { + mcs := &mcsList.Items[i] + if hasOwnerReference(mcs.OwnerReferences) { + if err := c.Delete(ctx, mcs); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned MultiClusterService", "name", mcs.Name, "namespace", mcs.Namespace) + errList = append(errList, fmt.Errorf("multiclusterservice %s/%s: %w", mcs.Namespace, mcs.Name, err)) + } + } + } + } + + var serviceExportList fleetv1alpha1.ServiceExportList + if err := c.List(ctx, &serviceExportList, listInNamespace); err != nil && !errors.IsNotFound(err) { + // Ignore if CRD doesn't exist + if !isNoKindMatchError(err) { + return fmt.Errorf("failed to list ServiceExports: %w", err) + } + } else { + for i := range serviceExportList.Items { + se := &serviceExportList.Items[i] + if hasOwnerReference(se.OwnerReferences) { + if err := c.Delete(ctx, se); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned ServiceExport", "name", se.Name, "namespace", se.Namespace) + errList = append(errList, fmt.Errorf("serviceexport %s/%s: %w", se.Namespace, se.Name, err)) + } + } + } + } + if len(errList) > 0 { return utilerrors.NewAggregate(errList) } return nil } +// isNoKindMatchError checks if the error is a "no kind match" error, which occurs when +// a CRD is not installed in the cluster +func isNoKindMatchError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "no matches for kind") +} + // GenerateConnectionString returns a MongoDB connection string for the DocumentDB instance. // When trustTLS is true, tlsAllowInvalidCertificates is omitted for strict verification. 
func GenerateConnectionString(documentdb *dbpreview.DocumentDB, serviceIp string, trustTLS bool) string { From e440e3ec3372eebceac8d5a831c7cf066f248f8a Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Wed, 28 Jan 2026 14:50:53 -0500 Subject: [PATCH 5/8] GPT5.2-codex review comments --- .../controller/physical_replication.go | 68 ++++-- .../controller/physical_replication_test.go | 229 +++++++++--------- operator/src/internal/utils/constants.go | 2 + operator/src/internal/utils/util.go | 13 +- 4 files changed, 168 insertions(+), 144 deletions(-) diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index 8b088d5a..cc4ed6d9 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -202,21 +203,23 @@ func (r *DocumentDBReconciler) CreateIstioRemoteServices(ctx context.Context, re } func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, replicationContext *util.ReplicationContext, documentdb *dbpreview.DocumentDB) error { + labels := map[string]string{ + util.LABEL_DOCUMENTDB_NAME: documentdb.Name, + } + // List all existing ServiceExports in the namespace existingServiceExports := &fleetv1alpha1.ServiceExportList{} - if err := r.Client.List(ctx, existingServiceExports, client.InNamespace(documentdb.Namespace)); err != nil { + if err := r.Client.List(ctx, existingServiceExports, client.InNamespace(documentdb.Namespace), client.MatchingLabels(labels)); err != nil { if !errors.IsNotFound(err) { return fmt.Errorf("failed to list ServiceExports: %w", err) } } - // Build a map of existing ServiceExports that start with documentdb.Name + // Build a map of existing ServiceExports that are tagged for this DocumentDB existingExports := make(map[string]*fleetv1alpha1.ServiceExport) for i := range existingServiceExports.Items { export := &existingServiceExports.Items[i] - if strings.HasPrefix(export.Name, documentdb.Name) { - existingExports[export.Name] = export - } + existingExports[export.Name] = export } for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { @@ -226,12 +229,15 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: documentdb.Namespace, + Labels: labels, OwnerReferences: []metav1.OwnerReference{ { - APIVersion: documentdb.APIVersion, - Kind: documentdb.Kind, - Name: documentdb.Name, - UID: documentdb.UID, + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), }, }, }, @@ -253,19 +259,17 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, // List all existing MultiClusterServices in the namespace existingMCSList := &fleetv1alpha1.MultiClusterServiceList{} - if err := r.Client.List(ctx, existingMCSList, client.InNamespace(documentdb.Namespace)); err != nil { + if err := r.Client.List(ctx, existingMCSList, client.InNamespace(documentdb.Namespace), client.MatchingLabels(labels)); err != nil { if !errors.IsNotFound(err) { return fmt.Errorf("failed to list 
MultiClusterServices: %w", err) } } - // Build a map of existing MultiClusterServices that start with documentdb.Name + // Build a map of existing MultiClusterServices that are tagged for this DocumentDB existingMCS := make(map[string]*fleetv1alpha1.MultiClusterService) for i := range existingMCSList.Items { mcs := &existingMCSList.Items[i] - if strings.HasPrefix(mcs.Name, documentdb.Name) { - existingMCS[mcs.Name] = mcs - } + existingMCS[mcs.Name] = mcs } // Below is valid because this function is only called if fleet-networking is enabled @@ -277,12 +281,15 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, ObjectMeta: metav1.ObjectMeta{ Name: sourceServiceName, Namespace: documentdb.Namespace, + Labels: labels, OwnerReferences: []metav1.OwnerReference{ { - APIVersion: documentdb.APIVersion, - Kind: documentdb.Kind, - Name: documentdb.Name, - UID: documentdb.UID, + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), }, }, }, @@ -400,7 +407,7 @@ func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, pat replicaClusterConfig := desired.Spec.ReplicaCluster // If the old primary is available, we can read the token from it if oldPrimaryAvailable { - token, err, refreshTime := r.ReadToken(ctx, documentdb.Namespace, replicationContext) + token, err, refreshTime := r.ReadToken(ctx, documentdb, replicationContext) if err != nil || refreshTime > 0 { return err, refreshTime } @@ -500,8 +507,9 @@ func getReplicasChangePatchOps(patchOps *[]util.JSONPatch, desired *cnpgv1.Clust } } -func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) (string, error, time.Duration) { +func (r *DocumentDBReconciler) ReadToken(ctx context.Context, documentdb *dbpreview.DocumentDB, replicationContext *util.ReplicationContext) (string, error, time.Duration) { tokenServiceName := "promotion-token" + namespace := documentdb.Namespace // If we are not using cross-cloud networking, we only need to read the token from the configmap if !replicationContext.IsAzureFleetNetworking() && !replicationContext.IsIstioNetworking() { @@ -578,6 +586,16 @@ func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, }, Spec: fleetv1alpha1.MultiClusterServiceSpec{ ServiceImport: fleetv1alpha1.ServiceImportRef{ @@ -873,10 +891,12 @@ func (r *DocumentDBReconciler) ensureTokenServiceResources(ctx context.Context, Namespace: clusterNN.Namespace, OwnerReferences: []metav1.OwnerReference{ { - APIVersion: cluster.APIVersion, - Kind: cluster.Kind, - Name: cluster.Name, - UID: cluster.UID, + APIVersion: cluster.APIVersion, + Kind: cluster.Kind, + Name: cluster.Name, + UID: cluster.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), }, }, }, diff --git a/operator/src/internal/controller/physical_replication_test.go b/operator/src/internal/controller/physical_replication_test.go index a1106741..03903aff 100644 --- a/operator/src/internal/controller/physical_replication_test.go +++ b/operator/src/internal/controller/physical_replication_test.go @@ -2,11 +2,11 @@ package 
controller import ( "context" - "testing" "time" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" - "github.com/stretchr/testify/require" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -21,13 +21,12 @@ import ( util "github.com/documentdb/documentdb-operator/internal/utils" ) -func buildDocumentDBReconciler(t *testing.T, objs ...runtime.Object) *DocumentDBReconciler { - t.Helper() +func buildDocumentDBReconciler(objs ...runtime.Object) *DocumentDBReconciler { scheme := runtime.NewScheme() - require.NoError(t, dbpreview.AddToScheme(scheme)) - require.NoError(t, cnpgv1.AddToScheme(scheme)) - require.NoError(t, corev1.AddToScheme(scheme)) - require.NoError(t, rbacv1.AddToScheme(scheme)) + Expect(dbpreview.AddToScheme(scheme)).To(Succeed()) + Expect(cnpgv1.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(rbacv1.AddToScheme(scheme)).To(Succeed()) builder := fake.NewClientBuilder().WithScheme(scheme) if len(objs) > 0 { @@ -46,128 +45,130 @@ func buildDocumentDBReconciler(t *testing.T, objs ...runtime.Object) *DocumentDB return &DocumentDBReconciler{Client: builder.Build(), Scheme: scheme} } -func TestDocumentDBReconcileSkipsWhenNotPresent(t *testing.T) { - ctx := context.Background() - namespace := "default" - - documentdb := baseDocumentDB("docdb-not-present", namespace) - documentdb.UID = types.UID("docdb-not-present-uid") - documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ - CrossCloudNetworkingStrategy: string(util.AzureFleet), - Primary: "member-2", - ClusterList: []dbpreview.MemberCluster{ - {Name: "member-2"}, - {Name: "member-3"}, - }, - } - - ownerRef := metav1.OwnerReference{ - APIVersion: "documentdb.io/preview", - Kind: "DocumentDB", - Name: documentdb.Name, - UID: documentdb.UID, - } +var _ = Describe("Physical Replication", func() { + It("deletes owned resources when DocumentDB is not present", func() { + ctx := context.Background() + namespace := "default" + + documentdb := baseDocumentDB("docdb-not-present", namespace) + documentdb.UID = types.UID("docdb-not-present-uid") + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: string(util.AzureFleet), + Primary: "member-2", + ClusterList: []dbpreview.MemberCluster{ + {Name: "member-2"}, + {Name: "member-3"}, + }, + } - ownedService := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "owned-service", - Namespace: namespace, - OwnerReferences: []metav1.OwnerReference{ownerRef}, - }, - } + ownerRef := metav1.OwnerReference{ + APIVersion: "documentdb.io/preview", + Kind: "DocumentDB", + Name: documentdb.Name, + UID: documentdb.UID, + } - ownedCluster := &cnpgv1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "owned-cnpg", - Namespace: namespace, - OwnerReferences: []metav1.OwnerReference{ownerRef}, - }, - } + ownedService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-service", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } - clusterNameConfigMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster-name", - Namespace: "kube-system", - }, - Data: map[string]string{ - "name": "member-1", - }, - } + ownedCluster := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-cnpg", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } - reconciler := buildDocumentDBReconciler(t, 
documentdb, ownedService, ownedCluster, clusterNameConfigMap) + clusterNameConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-name", + Namespace: "kube-system", + }, + Data: map[string]string{ + "name": "member-1", + }, + } - result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: documentdb.Name, Namespace: namespace}}) - require.NoError(t, err) - require.Equal(t, ctrl.Result{}, result) + reconciler := buildDocumentDBReconciler(documentdb, ownedService, ownedCluster, clusterNameConfigMap) - service := &corev1.Service{} - err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedService.Name, Namespace: namespace}, service) - require.True(t, errors.IsNotFound(err)) + result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: documentdb.Name, Namespace: namespace}}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) - cluster := &cnpgv1.Cluster{} - err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedCluster.Name, Namespace: namespace}, cluster) - require.True(t, errors.IsNotFound(err)) -} + service := &corev1.Service{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedService.Name, Namespace: namespace}, service) + Expect(errors.IsNotFound(err)).To(BeTrue()) -func TestTryUpdateClusterUpdatesExternalClusters(t *testing.T) { - ctx := context.Background() - namespace := "default" + cluster := &cnpgv1.Cluster{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedCluster.Name, Namespace: namespace}, cluster) + Expect(errors.IsNotFound(err)).To(BeTrue()) + }) - documentdb := baseDocumentDB("docdb-repl", namespace) - documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ - CrossCloudNetworkingStrategy: string(util.None), - Primary: documentdb.Name, - ClusterList: []dbpreview.MemberCluster{ - {Name: documentdb.Name}, - {Name: "member-2"}, - }, - } + It("updates external clusters and synchronous config", func() { + ctx := context.Background() + namespace := "default" - current := &cnpgv1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "docdb-repl", - Namespace: namespace, - }, - Spec: cnpgv1.ClusterSpec{ - ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ - Self: documentdb.Name, - Primary: documentdb.Name, - Source: documentdb.Name, - }, - ExternalClusters: []cnpgv1.ExternalCluster{ + documentdb := baseDocumentDB("docdb-repl", namespace) + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: string(util.None), + Primary: documentdb.Name, + ClusterList: []dbpreview.MemberCluster{ {Name: documentdb.Name}, {Name: "member-2"}, }, - PostgresConfiguration: cnpgv1.PostgresConfiguration{ - Synchronous: &cnpgv1.SynchronousReplicaConfiguration{ - Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, - Number: 1, + } + + current := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "docdb-repl", + Namespace: namespace, + }, + Spec: cnpgv1.ClusterSpec{ + ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ + Self: documentdb.Name, + Primary: documentdb.Name, + Source: documentdb.Name, + }, + ExternalClusters: []cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + }, + PostgresConfiguration: cnpgv1.PostgresConfiguration{ + Synchronous: &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 1, + }, }, }, - }, - } + } - desired := current.DeepCopy() - 
desired.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ - {Name: documentdb.Name}, - {Name: "member-2"}, - {Name: "member-3"}, - } - desired.Spec.PostgresConfiguration.Synchronous = &cnpgv1.SynchronousReplicaConfiguration{ - Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, - Number: 2, - } + desired := current.DeepCopy() + desired.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + {Name: "member-3"}, + } + desired.Spec.PostgresConfiguration.Synchronous = &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 2, + } - reconciler := buildDocumentDBReconciler(t, current) - replicationContext, err := util.GetReplicationContext(ctx, reconciler.Client, *documentdb) - require.NoError(t, err) + reconciler := buildDocumentDBReconciler(current) + replicationContext, err := util.GetReplicationContext(ctx, reconciler.Client, *documentdb) + Expect(err).ToNot(HaveOccurred()) - err, requeue := reconciler.TryUpdateCluster(ctx, current, desired, documentdb, replicationContext) - require.NoError(t, err) - require.Equal(t, time.Duration(-1), requeue) + err, requeue := reconciler.TryUpdateCluster(ctx, current, desired, documentdb, replicationContext) + Expect(err).ToNot(HaveOccurred()) + Expect(requeue).To(Equal(time.Duration(-1))) - updated := &cnpgv1.Cluster{} - require.NoError(t, reconciler.Client.Get(ctx, types.NamespacedName{Name: current.Name, Namespace: namespace}, updated)) - require.Len(t, updated.Spec.ExternalClusters, 3) - require.Equal(t, 2, updated.Spec.PostgresConfiguration.Synchronous.Number) -} + updated := &cnpgv1.Cluster{} + Expect(reconciler.Client.Get(ctx, types.NamespacedName{Name: current.Name, Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Spec.ExternalClusters).To(HaveLen(3)) + Expect(updated.Spec.PostgresConfiguration.Synchronous.Number).To(Equal(2)) + }) +}) diff --git a/operator/src/internal/utils/constants.go b/operator/src/internal/utils/constants.go index 25c23835..6aebcea6 100644 --- a/operator/src/internal/utils/constants.go +++ b/operator/src/internal/utils/constants.go @@ -24,6 +24,8 @@ const ( LABEL_NODE_INDEX = "node_index" LABEL_SERVICE_TYPE = "service_type" LABEL_REPLICATION_CLUSTER_TYPE = "replication_cluster_type" + LABEL_DOCUMENTDB_NAME = "documentdb.io/name" + LABEL_DOCUMENTDB_COMPONENT = "documentdb.io/component" DOCUMENTDB_SERVICE_PREFIX = "documentdb-service-" diff --git a/operator/src/internal/utils/util.go b/operator/src/internal/utils/util.go index 0762981e..89fd34ba 100644 --- a/operator/src/internal/utils/util.go +++ b/operator/src/internal/utils/util.go @@ -8,7 +8,6 @@ import ( "fmt" "os" "strconv" - "strings" "time" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" @@ -16,7 +15,9 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" @@ -363,7 +364,7 @@ func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.Obj var mcsList fleetv1alpha1.MultiClusterServiceList if err := c.List(ctx, &mcsList, listInNamespace); err != nil && !errors.IsNotFound(err) { // Ignore if CRD doesn't exist - if !isNoKindMatchError(err) { + if !isCRDMissing(err) { return fmt.Errorf("failed to list MultiClusterServices: %w", err) } } else { @@ 
-381,7 +382,7 @@ func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.Obj var serviceExportList fleetv1alpha1.ServiceExportList if err := c.List(ctx, &serviceExportList, listInNamespace); err != nil && !errors.IsNotFound(err) { // Ignore if CRD doesn't exist - if !isNoKindMatchError(err) { + if !isCRDMissing(err) { return fmt.Errorf("failed to list ServiceExports: %w", err) } } else { @@ -402,13 +403,13 @@ func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.Obj return nil } -// isNoKindMatchError checks if the error is a "no kind match" error, which occurs when +// isCRDMissing checks if the error is a "no kind match" error, which occurs when // a CRD is not installed in the cluster -func isNoKindMatchError(err error) bool { +func isCRDMissing(err error) bool { if err == nil { return false } - return strings.Contains(err.Error(), "no matches for kind") + return meta.IsNoMatchError(err) || runtime.IsNotRegisteredError(err) } // GenerateConnectionString returns a MongoDB connection string for the DocumentDB instance. From b817d65171744430e5120faa45dca64080d2d873 Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Wed, 28 Jan 2026 15:50:51 -0500 Subject: [PATCH 6/8] remove token from logs --- operator/src/internal/controller/physical_replication.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index cc4ed6d9..c40c9dcf 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -411,7 +411,7 @@ func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, pat if err != nil || refreshTime > 0 { return err, refreshTime } - log.Log.Info("Token read successfully", "token", token) + log.Log.Info("Token read successfully") // Update the configuration with the token replicaClusterConfig.PromotionToken = token @@ -722,7 +722,7 @@ func (r *DocumentDBReconciler) ForceReconcileInternalServiceExports(ctx context. 
if err := r.Client.List(ctx, iseList, client.InNamespace(fleetMemberNamespace)); err != nil { // If the CRD doesn't exist or namespace doesn't exist, skip if errors.IsNotFound(err) { - return reconciled, nil + continue } return reconciled, fmt.Errorf("failed to list InternalServiceExports: %w", err) } From a15693035f26dd5257eab508d532b44889e568ae Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Wed, 28 Jan 2026 16:19:07 -0500 Subject: [PATCH 7/8] Move constant to proper file --- operator/src/internal/controller/physical_replication.go | 4 +--- operator/src/internal/utils/constants.go | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index c40c9dcf..ea045c03 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -672,8 +672,6 @@ func (r *DocumentDBReconciler) CleanupMismatchedServiceImports(ctx context.Conte return deleted, nil, fmt.Errorf("failed to list ServiceImports: %w", err) } - inUseByAnnotation := "networking.fleet.azure.com/service-in-use-by" - for i := range serviceImportList.Items { badServiceImport := &serviceImportList.Items[i] // If it has an OwnerReference, then it is properly being used by the cluster's MCS @@ -686,7 +684,7 @@ func (r *DocumentDBReconciler) CleanupMismatchedServiceImports(ctx context.Conte continue } - inUseBy, exists := annotations[inUseByAnnotation] + inUseBy, exists := annotations[util.FLEET_IN_USE_BY_ANNOTATION] // If it has its own name as the cluster name, then it has erroneously attached itself to the export if !exists || !containsClusterName(inUseBy, replicationContext.FleetMemberName) { continue diff --git a/operator/src/internal/utils/constants.go b/operator/src/internal/utils/constants.go index 6aebcea6..92873d7a 100644 --- a/operator/src/internal/utils/constants.go +++ b/operator/src/internal/utils/constants.go @@ -26,6 +26,7 @@ const ( LABEL_REPLICATION_CLUSTER_TYPE = "replication_cluster_type" LABEL_DOCUMENTDB_NAME = "documentdb.io/name" LABEL_DOCUMENTDB_COMPONENT = "documentdb.io/component" + FLEET_IN_USE_BY_ANNOTATION = "networking.fleet.azure.com/service-in-use-by" DOCUMENTDB_SERVICE_PREFIX = "documentdb-service-" From 8860648cdcfd1a721c0e7229894473f2280e1293 Mon Sep 17 00:00:00 2001 From: Alexander Laye Date: Thu, 29 Jan 2026 11:22:14 -0500 Subject: [PATCH 8/8] check update --- .../fleet-add-region/add-region.sh | 4 +- .../fleet-add-region/check.sh | 125 ++++++++++++++++++ .../fleet-add-region/remove-region.sh | 4 +- 3 files changed, 131 insertions(+), 2 deletions(-) diff --git a/documentdb-playground/fleet-add-region/add-region.sh b/documentdb-playground/fleet-add-region/add-region.sh index 670aa34d..f820cad4 100755 --- a/documentdb-playground/fleet-add-region/add-region.sh +++ b/documentdb-playground/fleet-add-region/add-region.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -e + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" @@ -92,4 +94,4 @@ if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then fi rm -f "$TEMP_YAML" -echo "Done." \ No newline at end of file +echo "Done." 
diff --git a/documentdb-playground/fleet-add-region/check.sh b/documentdb-playground/fleet-add-region/check.sh index a785f4ec..92679c54 100755 --- a/documentdb-playground/fleet-add-region/check.sh +++ b/documentdb-playground/fleet-add-region/check.sh @@ -212,6 +212,126 @@ check_non_member_cluster() { else record_success "$cluster" "Namespace $DOCUMENTDB_NAMESPACE absent; no DocumentDB services present" fi + + # Check for ServiceExports that shouldn't exist on non-member clusters + local svcexport_json + if svcexport_json=$(kubectl --context "$cluster" get serviceexport -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_exports + doc_owned_exports=$(echo "$svcexport_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local export_count + export_count=$(echo "$doc_owned_exports" | jq 'length') + if (( export_count == 0 )); then + record_success "$cluster" "No DocumentDB ServiceExports present" + else + record_failure "$cluster" "Found $export_count DocumentDB ServiceExport(s) but region is not in clusterList" + fi + else + record_success "$cluster" "ServiceExport CRD unavailable; no DocumentDB ServiceExports present" + fi + + # Check for MultiClusterServices that shouldn't exist on non-member clusters + local mcs_json + if mcs_json=$(kubectl --context "$cluster" get multiclusterservice -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_mcs + doc_owned_mcs=$(echo "$mcs_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local mcs_count + mcs_count=$(echo "$doc_owned_mcs" | jq 'length') + if (( mcs_count == 0 )); then + record_success "$cluster" "No DocumentDB MultiClusterServices present" + else + record_failure "$cluster" "Found $mcs_count DocumentDB MultiClusterService(s) but region is not in clusterList" + fi + else + record_success "$cluster" "MultiClusterService CRD unavailable; no DocumentDB MultiClusterServices present" + fi +} + +check_multiclusterservices() { + local hub_cluster="$1" + + log "Checking MultiClusterServices on hub cluster: $hub_cluster" + + local mcs_json + if ! 
mcs_json=$(kubectl --context "$hub_cluster" get multiclusterservice -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$hub_cluster" "Unable to list MultiClusterServices: $mcs_json" + return + fi + + local doc_owned_mcs + doc_owned_mcs=$(echo "$mcs_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local mcs_count + mcs_count=$(echo "$doc_owned_mcs" | jq 'length') + + if (( mcs_count == 0 )); then + record_failure "$hub_cluster" "No MultiClusterServices owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + record_success "$hub_cluster" "Found $mcs_count MultiClusterService(s) owned by DocumentDB $DOCUMENTDB_NAME" + + # Check each MCS for Valid condition + local invalid_mcs + invalid_mcs=$(echo "$doc_owned_mcs" | jq '[.[] | select(.status.conditions == null or ([.status.conditions[] | select(.type=="Valid" and .status=="True")] | length == 0)) | .metadata.name]') + local invalid_count + invalid_count=$(echo "$invalid_mcs" | jq 'length') + + if (( invalid_count > 0 )); then + local invalid_list + invalid_list=$(echo "$invalid_mcs" | jq -r 'join(", ")') + record_failure "$hub_cluster" "MultiClusterServices not valid: $invalid_list" + else + record_success "$hub_cluster" "All $mcs_count MultiClusterService(s) are valid" + fi +} + +check_serviceexports() { + local cluster="$1" + + local svcexport_json + if ! svcexport_json=$(kubectl --context "$cluster" get serviceexport -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list ServiceExports: $svcexport_json" + return + fi + + local doc_owned_exports + doc_owned_exports=$(echo "$svcexport_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local export_count + export_count=$(echo "$doc_owned_exports" | jq 'length') + + if (( export_count == 0 )); then + record_failure "$cluster" "No ServiceExports owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + record_success "$cluster" "Found $export_count ServiceExport(s) owned by DocumentDB $DOCUMENTDB_NAME" + + # Check each ServiceExport for Valid condition + local invalid_exports + invalid_exports=$(echo "$doc_owned_exports" | jq '[.[] | select(.status.conditions == null or ([.status.conditions[] | select(.type=="Valid" and .status=="True")] | length == 0)) | .metadata.name]') + local invalid_count + invalid_count=$(echo "$invalid_exports" | jq 'length') + + if (( invalid_count > 0 )); then + local invalid_list + invalid_list=$(echo "$invalid_exports" | jq -r 'join(", ")') + record_failure "$cluster" "ServiceExports not valid: $invalid_list" + else + record_success "$cluster" "All $export_count ServiceExport(s) are valid" + fi + + # Check for conflicts + local conflicted_exports + conflicted_exports=$(echo "$doc_owned_exports" | jq '[.[] | select([.status.conditions[]? 
| select(.type=="Conflict" and .status=="True")] | length > 0) | .metadata.name]') + local conflict_count + conflict_count=$(echo "$conflicted_exports" | jq 'length') + + if (( conflict_count > 0 )); then + local conflict_list + conflict_list=$(echo "$conflicted_exports" | jq -r 'join(", ")') + record_failure "$cluster" "ServiceExports have conflicts: $conflict_list" + else + record_success "$cluster" "No ServiceExport conflicts detected" + fi } main() { @@ -302,11 +422,16 @@ main() { fi done + # Check MultiClusterServices on the hub cluster + echo + check_multiclusterservices "$hub_cluster" + for cluster in "${CLUSTER_ARRAY[@]}"; do echo if [[ -n "${DOCUMENTDB_CLUSTER_SET[$cluster]:-}" ]]; then log "Checking DocumentDB member cluster: $cluster" check_member_cluster "$cluster" + check_serviceexports "$cluster" else log "Checking non-member cluster: $cluster" check_non_member_cluster "$cluster" diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh index 04871a83..64931716 100755 --- a/documentdb-playground/fleet-add-region/remove-region.sh +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -e + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" @@ -98,4 +100,4 @@ if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then fi rm -f "$TEMP_YAML" -echo "Done." \ No newline at end of file +echo "Done."
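As a rough way to spot-check the new ServiceExport handling by hand, the jq filter used in check.sh can be run on its own against a single member cluster. This is only a sketch and not part of the patch series; CLUSTER, DOCUMENTDB_NAMESPACE, and DOCUMENTDB_NAME are assumed to already be set to the member cluster's kubecontext and the DocumentDB resource's namespace and name.

    # Sketch: list ServiceExports owned by the DocumentDB on one member cluster and
    # report whether each carries a Valid=True condition (mirrors check_serviceexports).
    # CLUSTER, DOCUMENTDB_NAMESPACE and DOCUMENTDB_NAME are assumed to be exported.
    kubectl --context "$CLUSTER" get serviceexport -n "$DOCUMENTDB_NAMESPACE" -o json \
      | jq --arg doc "$DOCUMENTDB_NAME" '
          [.items[]
           | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))
           | {name: .metadata.name,
              valid: ([.status.conditions[]? | select(.type=="Valid" and .status=="True")] | length > 0)}]'

An empty array on a cluster that was removed from clusterList, together with a non-empty all-valid list on the remaining members, is the outcome check.sh records as success.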