diff --git a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh index 1f329bf4..eab2d86f 100755 --- a/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh +++ b/documentdb-playground/aks-fleet-deployment/deploy-fleet-bicep.sh @@ -10,19 +10,11 @@ RG_LOCATION="${RG_LOCATION:-eastus2}" HUB_REGION="${HUB_REGION:-westus3}" SCRIPT_DIR="$(dirname "$0")" -# Regions for member clusters (keep in sync with parameters.bicepparam if you change it) -if [ -n "${MEMBER_REGIONS_CSV:-}" ]; then - IFS=',' read -r -a MEMBER_REGIONS <<< "$MEMBER_REGIONS_CSV" -else - MEMBER_REGIONS=("westus3" "uksouth" "eastus2") -fi - # Optional: explicitly override the VM size used by the template param vmSize. # If left empty, the template's default (currently Standard_DS2_v2) will be used. KUBE_VM_SIZE="${KUBE_VM_SIZE:-}" - -# Build JSON arrays for parameters (after any fallbacks) -MEMBER_REGIONS_JSON=$(printf '%s\n' "${MEMBER_REGIONS[@]}" | jq -R . | jq -s .) +# Optional: override the default member regions defined in main.bicep (comma-separated list) +MEMBER_REGIONS="${MEMBER_REGIONS:-}" # Wait for any in-progress AKS operations in this resource group to finish wait_for_no_inprogress() { @@ -59,16 +51,24 @@ if ! wait_for_no_inprogress "$RESOURCE_GROUP"; then echo "Exiting without changes due to in-progress operations. Re-run when provisioning completes." >&2 exit 1 fi + +PARAMS=() # Build parameter overrides -PARAMS=( - --parameters "$SCRIPT_DIR/parameters.bicepparam" - --parameters memberRegions="$MEMBER_REGIONS_JSON" -) if [ -n "$KUBE_VM_SIZE" ]; then echo "Overriding kubernetes VM size with: $KUBE_VM_SIZE" PARAMS+=( --parameters vmSize="$KUBE_VM_SIZE" ) fi +if [ -n "$MEMBER_REGIONS" ]; then + echo "Overriding member regions with: $MEMBER_REGIONS" + MEMBER_REGION_JSON=$(printf '%s' "$MEMBER_REGIONS" | jq -Rsc 'split(",") | map(gsub("^\\s+|\\s+$";"")) | map(select(length>0))') + if [ "$(printf '%s' "$MEMBER_REGION_JSON" | jq 'length')" -eq 0 ]; then + echo "MEMBER_REGIONS did not contain any valid entries" >&2 + exit 1 + fi + PARAMS+=( --parameters memberRegions="$MEMBER_REGION_JSON" ) +fi + DEPLOYMENT_NAME=${DEPLOYMENT_NAME:-"aks-deployment-$(date +%s)"} az deployment group create \ --name "$DEPLOYMENT_NAME" \ @@ -84,6 +84,23 @@ DEPLOYMENT_OUTPUT=$(az deployment group show \ # Extract outputs MEMBER_CLUSTER_NAMES=$(echo $DEPLOYMENT_OUTPUT | jq -r '.memberClusterNames.value[]') +VNET_NAMES=$(echo $DEPLOYMENT_OUTPUT | jq -r '.memberVnetNames.value[]') + +while read -r vnet1; do + while read -r vnet2; do + [ -z "$vnet1" ] && continue + [ -z "$vnet2" ] && continue + [ "$vnet1" = "$vnet2" ] && continue + echo "Peering VNet '$vnet1' with VNet '$vnet2'..." + az network vnet peering create \ + --name "${vnet1}-to-${vnet2}-peering" \ + --resource-group "$RESOURCE_GROUP" \ + --vnet-name "$vnet1" \ + --remote-vnet "$vnet2" \ + --allow-vnet-access true \ + --allow-forwarded-traffic true + done <<< "$VNET_NAMES" +done <<< "$VNET_NAMES" HUB_CLUSTER="" while read -r cluster; do @@ -97,6 +114,20 @@ git clone https://github.com/kubefleet-dev/kubefleet.git $kubeDir pushd $kubeDir # Set up HUB_CLUSTER as the hub kubectl config use-context $HUB_CLUSTER + +# Install cert manager on hub cluster +helm repo add jetstack https://charts.jetstack.io +helm repo update + +echo -e "\nInstalling cert-manager on $HUB_CLUSTER..." 
+helm upgrade --install cert-manager jetstack/cert-manager \ + --namespace cert-manager \ + --create-namespace \ + --set crds.enabled=true +kubectl rollout status deployment/cert-manager -n cert-manager --timeout=240s || true +echo "Pods ($HUB_CLUSTER):" +kubectl get pods -n cert-manager -o wide || true + export REGISTRY="ghcr.io/kubefleet-dev/kubefleet" export TAG=$(curl "https://api.github.com/repos/kubefleet-dev/kubefleet/tags" | jq -r '.[0].name') # Gets latest tag # Install the helm chart for running Fleet agents on the hub cluster. @@ -111,7 +142,10 @@ helm upgrade --install hub-agent ./charts/hub-agent/ \ --set logFileMaxSize=100000 \ --set MaxConcurrentClusterPlacement=200 \ --set namespace=fleet-system-hub \ - --set enableWorkload=true + --set enableWorkload=true #\ + #--set useCertManager=true \ + #--set enableWebhook=true + # Run the script. chmod +x ./hack/membership/joinMC.sh diff --git a/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh b/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh index 7f5077a9..3ac3633e 100755 --- a/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh +++ b/documentdb-playground/aks-fleet-deployment/install-documentdb-operator.sh @@ -10,6 +10,7 @@ set -euo pipefail RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" HUB_REGION="${HUB_REGION:-westus3}" CHART_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)/operator/documentdb-helm-chart" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" VERSION="${VERSION:-200}" VALUES_FILE="${VALUES_FILE:-}" BUILD_CHART="${BUILD_CHART:-true}" @@ -76,7 +77,7 @@ else fi fi -kubectl --context "$HUB_CLUSTER" apply -f ./documentdb-operator-crp.yaml +kubectl --context "$HUB_CLUSTER" apply -f $SCRIPT_DIR/documentdb-operator-crp.yaml # Get all member clusters diff --git a/documentdb-playground/aks-fleet-deployment/main.bicep b/documentdb-playground/aks-fleet-deployment/main.bicep index b2354332..e54f775e 100644 --- a/documentdb-playground/aks-fleet-deployment/main.bicep +++ b/documentdb-playground/aks-fleet-deployment/main.bicep @@ -19,18 +19,6 @@ param nodeCount int = 2 // Optionally include kubernetesVersion in cluster properties var maybeK8sVersion = empty(kubernetesVersion) ? 
{} : { kubernetesVersion: kubernetesVersion } -// Define non-overlapping address spaces for each member cluster -var memberVnetAddressSpaces = [ - '10.1.0.0/16' // westus3 - '10.2.0.0/16' // uksouth - '10.3.0.0/16' // eastus2 -] -var memberSubnetAddressSpaces = [ - '10.1.0.0/20' // westus3 - '10.2.0.0/20' // uksouth - '10.3.0.0/20' // eastus2 -] - // Member VNets resource memberVnets 'Microsoft.Network/virtualNetworks@2023-09-01' = [for (region, i) in memberRegions: { name: 'member-${region}-vnet' @@ -38,14 +26,14 @@ resource memberVnets 'Microsoft.Network/virtualNetworks@2023-09-01' = [for (regi properties: { addressSpace: { addressPrefixes: [ - memberVnetAddressSpaces[i] + '10.${i}.0.0/16' ] } subnets: [ { name: 'aks-subnet' properties: { - addressPrefix: memberSubnetAddressSpaces[i] + addressPrefix: '10.${i}.0.0/20' } } ] @@ -84,59 +72,5 @@ resource memberClusters 'Microsoft.ContainerService/managedClusters@2023-10-01' ] }] -// Create peering pairs for full mesh -var peeringPairs = [ - { - sourceIndex: 0 - targetIndex: 1 - sourceName: memberRegions[0] - targetName: memberRegions[1] - } - { - sourceIndex: 0 - targetIndex: 2 - sourceName: memberRegions[0] - targetName: memberRegions[2] - } - { - sourceIndex: 1 - targetIndex: 2 - sourceName: memberRegions[1] - targetName: memberRegions[2] - } -] - -// VNet peerings - Forward direction -resource memberPeeringsForward 'Microsoft.Network/virtualNetworks/virtualNetworkPeerings@2023-09-01' = [for pair in peeringPairs: { - name: '${pair.sourceName}-to-${pair.targetName}' - parent: memberVnets[pair.sourceIndex] - properties: { - remoteVirtualNetwork: { - id: memberVnets[pair.targetIndex].id - } - allowVirtualNetworkAccess: true - allowForwardedTraffic: true - allowGatewayTransit: false - useRemoteGateways: false - } -}] - -// VNet peerings - Reverse direction -resource memberPeeringsReverse 'Microsoft.Network/virtualNetworks/virtualNetworkPeerings@2023-09-01' = [for pair in peeringPairs: { - name: '${pair.targetName}-to-${pair.sourceName}' - parent: memberVnets[pair.targetIndex] - properties: { - remoteVirtualNetwork: { - id: memberVnets[pair.sourceIndex].id - } - allowVirtualNetworkAccess: true - allowForwardedTraffic: true - allowGatewayTransit: false - useRemoteGateways: false - } - dependsOn: [ - memberPeeringsForward - ] -}] - output memberClusterNames array = [for i in range(0, length(memberRegions)): memberClusters[i].name] +output memberVnetNames array = [for i in range(0, length(memberRegions)): memberVnets[i].name] diff --git a/documentdb-playground/aks-fleet-deployment/parameters.bicepparam b/documentdb-playground/aks-fleet-deployment/parameters.bicepparam deleted file mode 100644 index 97f91506..00000000 --- a/documentdb-playground/aks-fleet-deployment/parameters.bicepparam +++ /dev/null @@ -1,10 +0,0 @@ -using './main.bicep' - -param memberRegions = [ - 'westus3' - 'uksouth' - 'eastus2' -] -param kubernetesVersion = '' -param nodeCount = 2 -param vmSize = 'Standard_DS2_v2' diff --git a/documentdb-playground/fleet-add-region/README.md b/documentdb-playground/fleet-add-region/README.md new file mode 100644 index 00000000..9b1eb5df --- /dev/null +++ b/documentdb-playground/fleet-add-region/README.md @@ -0,0 +1,38 @@ +# Fleet Add Region Playground + +This playground focuses on exercising DocumentDB across changing fleet shapes. 
+It builds on the AKS Fleet Deployment playground (shared Bicep templates and
+install scripts) but layers on extra tooling to add a region, remove a region,
+verify wiring, and iterate quickly on those flows before changes graduate to
+docs or automation.
+
+## Goals
+
+- **Prove add/remove**: Validate that DocumentDB state, KubeFleet placements,
+and CNPG clusters survive when a member region joins or leaves.
+- **Shareable workflows**: Capture the manual commands and patches used during
+prototyping so they can be replayed by others.
+- **Regression surface**: Provide a safe spot to run disruptive tests (failovers,
+partial rollouts, patching CRPs) without touching the core deployment guide.
+- **Consistency with AKS Fleet**: Reuse credentials, hub selection, and discovery
+logic from the `aks-fleet-deployment` playground to avoid divergence.
+
+- `deploy-four-region.sh`: Convenience wrapper to stand up a fresh four-region
+fleet using the upstream deployment assets before exercising the add/remove scripts.
+
+## Typical Workflow
+
+1. **Bootstrap the fleet** using `deploy-four-region.sh`, which invokes the scripts
+from `../aks-fleet-deployment` (Bicep deployment, cert-manager install,
+operator install). All environment variables (e.g., `RESOURCE_GROUP`, `HUB_REGION`)
+match the upstream playground, so secrets and kubeconfigs remain reusable.
+2. **Stand up a baseline DocumentDB** via `documentdb-three-region.sh`, which
+creates a three-region cluster, excluding the westus2 region to start.
+3. **Introduce changes**:
+   - Add westus2 with `add-region.sh`, which patches the `DocumentDB` and
+     `ResourcePlacement` cluster lists.
+   - Validate with `check.sh` and watch KubeFleet propagate CRs.
+   - Remove the hub region (westus3) via `remove-region.sh` (set `EXCLUDE_REGION=westus3`)
+     and re-run `check.sh` to confirm cleanup.
+4. **Experiment repeatedly**, adjusting variables such as `EXCLUDE_REGION`, `HUB_REGION`,
+   or `DOCUMENTDB_PASSWORD` to simulate production scenarios.
diff --git a/documentdb-playground/fleet-add-region/add-region.sh b/documentdb-playground/fleet-add-region/add-region.sh
new file mode 100755
index 00000000..f820cad4
--- /dev/null
+++ b/documentdb-playground/fleet-add-region/add-region.sh
@@ -0,0 +1,97 @@
+#!/bin/bash
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}"
+HUB_REGION="${HUB_REGION:-westus3}"
+PRIMARY_REGION="${PRIMARY_REGION:-eastus2}"
+EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}"
+
+# Dynamically get member clusters from Azure
+echo "Discovering member clusters in resource group: $RESOURCE_GROUP..."
+MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +EXCLUDE_CLUSTER="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then EXCLUDE_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$PRIMARY_REGION"* ]]; then PRIMARY_CLUSTER="$cluster"; fi +done + +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Including cluster $cluster in DocumentDB configuration" + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo "$line" + fi +done > "$TEMP_YAML" + +echo "" +echo "Applying DocumentDB multi-region configuration..." + +MAX_RETRIES=60 +RETRY_INTERVAL=3 +RETRY_COUNT=0 + +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do + kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" &> /dev/null + + echo "Checking if $EXCLUDE_CLUSTER has been added to clusterReplication on the excluded cluster..." + + # Get the clusterReplication.clusters field from the DocumentDB object on the excluded cluster + CLUSTER_LIST_JSON=$(kubectl --context "$EXCLUDE_CLUSTER" get documentdb documentdb-preview -n documentdb-preview-ns -o jsonpath='{.spec.clusterReplication.clusterList[*].name}' 2>/dev/null) + + if echo "$CLUSTER_LIST_JSON" | grep -q "$EXCLUDE_CLUSTER"; then + echo "Success: $EXCLUDE_CLUSTER is now included in clusterReplication field" + break + fi + + RETRY_COUNT=$((RETRY_COUNT + 1)) + echo "Cluster not yet in clusterReplication (attempt $RETRY_COUNT/$MAX_RETRIES). Retrying in ${RETRY_INTERVAL}s..." + sleep $RETRY_INTERVAL +done + +if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then + echo "Error: Timed out waiting for $EXCLUDE_CLUSTER to appear in clusterReplication" + exit 1 +fi + +rm -f "$TEMP_YAML" +echo "Done." diff --git a/documentdb-playground/fleet-add-region/check.sh b/documentdb-playground/fleet-add-region/check.sh new file mode 100755 index 00000000..92679c54 --- /dev/null +++ b/documentdb-playground/fleet-add-region/check.sh @@ -0,0 +1,454 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Validates DocumentDB deployment state across fleet member clusters by comparing +# the configured replication topology with the actual CNPG resources. 
+RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +DOCUMENTDB_NAME="${DOCUMENTDB_NAME:-documentdb-preview}" +DOCUMENTDB_NAMESPACE="${DOCUMENTDB_NAMESPACE:-documentdb-preview-ns}" +DOCUMENTDB_APP_LABEL="${DOCUMENTDB_APP_LABEL:-$DOCUMENTDB_NAME}" +CLUSTER_SELECTOR_PREFIX="${CLUSTER_SELECTOR_PREFIX:-member-}" + +declare -i OVERALL_STATUS=0 +FAILURE_MESSAGES=() + +declare -a CLUSTER_ARRAY=() +declare -a DOCUMENTDB_CLUSTER_NAMES=() +declare -A DOCUMENTDB_CLUSTER_SET=() +EXPECTED_CLUSTER_NAMES_JSON="[]" +EXPECTED_CNPG_NAMES_JSON="[]" +declare -i EXPECTED_CLUSTER_COUNT=0 + +log() { + echo "[$(date '+%Y-%m-%dT%H:%M:%S%z')] $*" +} + +require_command() { + if ! command -v "$1" >/dev/null 2>&1; then + echo "Error: required command '$1' not found in PATH" >&2 + exit 1 + fi +} + +record_success() { + local cluster="$1"; shift + echo "[$cluster] ✔ $*" +} + +record_failure() { + local cluster="$1"; shift + echo "[$cluster] ✖ $*" + OVERALL_STATUS=1 + FAILURE_MESSAGES+=("[$cluster] $*") +} + +get_cnpg_name_for_cluster() { + local cluster="$1" + + if ! kubectl --context "$cluster" get namespace "$DOCUMENTDB_NAMESPACE" >/dev/null 2>&1; then + return 1 + fi + + local cnpg_list + if ! cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + return 1 + fi + + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + if (( doc_owned_count == 0 )); then + return 1 + fi + + echo "$doc_owned_clusters" | jq -r '.[0].metadata.name' +} + +check_member_cluster() { + local cluster="$1" + + if ! kubectl --context "$cluster" get namespace "$DOCUMENTDB_NAMESPACE" >/dev/null 2>&1; then + record_failure "$cluster" "Namespace $DOCUMENTDB_NAMESPACE is missing" + return + fi + + local cnpg_list + if ! 
cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list CNPG clusters: $cnpg_list" + return + fi + + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + + if (( doc_owned_count == 0 )); then + record_failure "$cluster" "No CNPG Cluster owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + if (( doc_owned_count > 1 )); then + record_failure "$cluster" "Found $doc_owned_count CNPG Clusters owned by DocumentDB (expected 1)" + fi + + local cnpg_obj + cnpg_obj=$(echo "$doc_owned_clusters" | jq '.[0]') + local cnpg_name + cnpg_name=$(echo "$cnpg_obj" | jq -r '.metadata.name') + + local external_count + local expected_external_names_json + expected_external_names_json="$EXPECTED_CNPG_NAMES_JSON" + local expected_external_count + expected_external_count=$(echo "$expected_external_names_json" | jq 'length') + external_count=$(echo "$cnpg_obj" | jq '.spec.externalClusters // [] | length') + if (( external_count == expected_external_count )); then + record_success "$cluster" "Cluster $cnpg_name externalClusters count matches ($external_count)" + else + record_failure "$cluster" "Cluster $cnpg_name has $external_count externalClusters (expected $expected_external_count)" + fi + + local external_names_json + external_names_json=$(echo "$cnpg_obj" | jq '[.spec.externalClusters // [] | .[]? | .name] | map(select(. != null))') + local missing_names + missing_names=$(jq --argjson expected "$expected_external_names_json" --argjson actual "$external_names_json" -n '[ $expected[] | select(. as $item | ($actual | index($item)) | not) ]') + local missing_count + missing_count=$(echo "$missing_names" | jq 'length') + if (( missing_count > 0 )); then + local missing_list + missing_list=$(echo "$missing_names" | jq -r 'join(", ")') + record_failure "$cluster" "Cluster $cnpg_name missing externalClusters for: $missing_list" + else + record_success "$cluster" "Cluster $cnpg_name exposes all expected externalClusters" + fi + + local extra_names + extra_names=$(jq --argjson expected "$expected_external_names_json" --argjson actual "$external_names_json" -n '[ $actual[] | select(. as $item | ($expected | index($item)) | not) ]') + local extra_count + extra_count=$(echo "$extra_names" | jq 'length') + if (( extra_count > 0 )); then + local extra_list + extra_list=$(echo "$extra_names" | jq -r 'join(", ")') + record_failure "$cluster" "Cluster $cnpg_name has unexpected externalClusters: $extra_list" + fi + + local expected_instances + expected_instances=$(echo "$cnpg_obj" | jq '.spec.instances // 0') + local pods_json + if ! 
pods_json=$(kubectl --context "$cluster" get pods -n "$DOCUMENTDB_NAMESPACE" -l "cnpg.io/cluster=$cnpg_name" -o json 2>&1); then + record_failure "$cluster" "Unable to list pods for cluster $cnpg_name: $pods_json" + return + fi + local actual_pods + actual_pods=$(echo "$pods_json" | jq '.items | length') + if (( actual_pods == expected_instances )); then + record_success "$cluster" "Cluster $cnpg_name has $actual_pods pods (matches spec.instances)" + else + record_failure "$cluster" "Cluster $cnpg_name has $actual_pods pods (expected $expected_instances)" + fi + + local additional_service_count + additional_service_count=$(echo "$cnpg_obj" | jq '.spec.managed.services.additional // [] | length') + local expected_service_count=$((3 + additional_service_count)) + local services_json + if ! services_json=$(kubectl --context "$cluster" get svc -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list services in namespace $DOCUMENTDB_NAMESPACE: $services_json" + return + fi + local services_for_cluster + services_for_cluster=$(echo "$services_json" | jq --arg name "$cnpg_name" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="Cluster" and .name==$name))]') + local actual_service_count + actual_service_count=$(echo "$services_for_cluster" | jq 'length') + if (( actual_service_count == expected_service_count )); then + record_success "$cluster" "Cluster $cnpg_name has $actual_service_count services (expected $expected_service_count)" + else + record_failure "$cluster" "Cluster $cnpg_name has $actual_service_count services (expected $expected_service_count)" + fi +} + +check_non_member_cluster() { + local cluster="$1" + + local cnpg_list + if cnpg_list=$(kubectl --context "$cluster" get clusters.postgresql.cnpg.io -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_clusters + doc_owned_clusters=$(echo "$cnpg_list" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local doc_owned_count + doc_owned_count=$(echo "$doc_owned_clusters" | jq 'length') + if (( doc_owned_count == 0 )); then + record_success "$cluster" "No DocumentDB CNPG clusters present" + else + record_failure "$cluster" "Found $doc_owned_count DocumentDB CNPG cluster(s) but region is not in clusterList" + fi + else + record_success "$cluster" "CNPG CRD unavailable; treated as no DocumentDB clusters" + fi + + local pods_json + if pods_json=$(kubectl --context "$cluster" get pods -n "$DOCUMENTDB_NAMESPACE" -l "app=$DOCUMENTDB_APP_LABEL" -o json 2>/dev/null); then + local pod_count + pod_count=$(echo "$pods_json" | jq '.items | length') + if (( pod_count == 0 )); then + record_success "$cluster" "No DocumentDB pods present" + else + record_failure "$cluster" "Found $pod_count DocumentDB pods but region is not in clusterList" + fi + else + record_success "$cluster" "Namespace $DOCUMENTDB_NAMESPACE absent; no DocumentDB pods present" + fi + + local services_json + if services_json=$(kubectl --context "$cluster" get svc -n "$DOCUMENTDB_NAMESPACE" -l "app=$DOCUMENTDB_APP_LABEL" -o json 2>/dev/null); then + local service_count + service_count=$(echo "$services_json" | jq '.items | length') + if (( service_count == 0 )); then + record_success "$cluster" "No DocumentDB services present" + else + record_failure "$cluster" "Found $service_count DocumentDB services but region is not in clusterList" + fi + else + record_success "$cluster" "Namespace $DOCUMENTDB_NAMESPACE absent; no DocumentDB services 
present" + fi + + # Check for ServiceExports that shouldn't exist on non-member clusters + local svcexport_json + if svcexport_json=$(kubectl --context "$cluster" get serviceexport -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_exports + doc_owned_exports=$(echo "$svcexport_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local export_count + export_count=$(echo "$doc_owned_exports" | jq 'length') + if (( export_count == 0 )); then + record_success "$cluster" "No DocumentDB ServiceExports present" + else + record_failure "$cluster" "Found $export_count DocumentDB ServiceExport(s) but region is not in clusterList" + fi + else + record_success "$cluster" "ServiceExport CRD unavailable; no DocumentDB ServiceExports present" + fi + + # Check for MultiClusterServices that shouldn't exist on non-member clusters + local mcs_json + if mcs_json=$(kubectl --context "$cluster" get multiclusterservice -n "$DOCUMENTDB_NAMESPACE" -o json 2>/dev/null); then + local doc_owned_mcs + doc_owned_mcs=$(echo "$mcs_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local mcs_count + mcs_count=$(echo "$doc_owned_mcs" | jq 'length') + if (( mcs_count == 0 )); then + record_success "$cluster" "No DocumentDB MultiClusterServices present" + else + record_failure "$cluster" "Found $mcs_count DocumentDB MultiClusterService(s) but region is not in clusterList" + fi + else + record_success "$cluster" "MultiClusterService CRD unavailable; no DocumentDB MultiClusterServices present" + fi +} + +check_multiclusterservices() { + local hub_cluster="$1" + + log "Checking MultiClusterServices on hub cluster: $hub_cluster" + + local mcs_json + if ! mcs_json=$(kubectl --context "$hub_cluster" get multiclusterservice -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$hub_cluster" "Unable to list MultiClusterServices: $mcs_json" + return + fi + + local doc_owned_mcs + doc_owned_mcs=$(echo "$mcs_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local mcs_count + mcs_count=$(echo "$doc_owned_mcs" | jq 'length') + + if (( mcs_count == 0 )); then + record_failure "$hub_cluster" "No MultiClusterServices owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + record_success "$hub_cluster" "Found $mcs_count MultiClusterService(s) owned by DocumentDB $DOCUMENTDB_NAME" + + # Check each MCS for Valid condition + local invalid_mcs + invalid_mcs=$(echo "$doc_owned_mcs" | jq '[.[] | select(.status.conditions == null or ([.status.conditions[] | select(.type=="Valid" and .status=="True")] | length == 0)) | .metadata.name]') + local invalid_count + invalid_count=$(echo "$invalid_mcs" | jq 'length') + + if (( invalid_count > 0 )); then + local invalid_list + invalid_list=$(echo "$invalid_mcs" | jq -r 'join(", ")') + record_failure "$hub_cluster" "MultiClusterServices not valid: $invalid_list" + else + record_success "$hub_cluster" "All $mcs_count MultiClusterService(s) are valid" + fi +} + +check_serviceexports() { + local cluster="$1" + + local svcexport_json + if ! 
svcexport_json=$(kubectl --context "$cluster" get serviceexport -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + record_failure "$cluster" "Unable to list ServiceExports: $svcexport_json" + return + fi + + local doc_owned_exports + doc_owned_exports=$(echo "$svcexport_json" | jq --arg doc "$DOCUMENTDB_NAME" '[.items[] | select(any(.metadata.ownerReferences[]?; .kind=="DocumentDB" and .name==$doc))]') + local export_count + export_count=$(echo "$doc_owned_exports" | jq 'length') + + if (( export_count == 0 )); then + record_failure "$cluster" "No ServiceExports owned by DocumentDB $DOCUMENTDB_NAME" + return + fi + + record_success "$cluster" "Found $export_count ServiceExport(s) owned by DocumentDB $DOCUMENTDB_NAME" + + # Check each ServiceExport for Valid condition + local invalid_exports + invalid_exports=$(echo "$doc_owned_exports" | jq '[.[] | select(.status.conditions == null or ([.status.conditions[] | select(.type=="Valid" and .status=="True")] | length == 0)) | .metadata.name]') + local invalid_count + invalid_count=$(echo "$invalid_exports" | jq 'length') + + if (( invalid_count > 0 )); then + local invalid_list + invalid_list=$(echo "$invalid_exports" | jq -r 'join(", ")') + record_failure "$cluster" "ServiceExports not valid: $invalid_list" + else + record_success "$cluster" "All $export_count ServiceExport(s) are valid" + fi + + # Check for conflicts + local conflicted_exports + conflicted_exports=$(echo "$doc_owned_exports" | jq '[.[] | select([.status.conditions[]? | select(.type=="Conflict" and .status=="True")] | length > 0) | .metadata.name]') + local conflict_count + conflict_count=$(echo "$conflicted_exports" | jq 'length') + + if (( conflict_count > 0 )); then + local conflict_list + conflict_list=$(echo "$conflicted_exports" | jq -r 'join(", ")') + record_failure "$cluster" "ServiceExports have conflicts: $conflict_list" + else + record_success "$cluster" "No ServiceExport conflicts detected" + fi +} + +main() { + require_command az + require_command jq + require_command kubectl + + log "Discovering fleet member clusters in resource group $RESOURCE_GROUP" + local discovery_output + if ! discovery_output=$(az aks list -g "$RESOURCE_GROUP" -o json 2>&1); then + echo "Error: unable to list AKS clusters - $discovery_output" >&2 + exit 1 + fi + + readarray -t CLUSTER_ARRAY < <(echo "$discovery_output" | jq -r --arg prefix "$CLUSTER_SELECTOR_PREFIX" '.[] | select(.name | startswith($prefix)) | .name' | sort -u) + if (( ${#CLUSTER_ARRAY[@]} == 0 )); then + echo "Error: no member clusters found with prefix '$CLUSTER_SELECTOR_PREFIX' in resource group $RESOURCE_GROUP" >&2 + exit 1 + fi + + log "Found ${#CLUSTER_ARRAY[@]} member cluster(s):" + for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + done + + local hub_cluster="" + for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == *"$HUB_REGION"* ]]; then + hub_cluster="$cluster" + break + fi + done + + if [[ -z "$hub_cluster" ]]; then + echo "Error: unable to find hub cluster matching region substring '$HUB_REGION'" >&2 + exit 1 + fi + + log "Using hub cluster context: $hub_cluster" + + local documentdb_json + if ! documentdb_json=$(kubectl --context "$hub_cluster" get documentdb "$DOCUMENTDB_NAME" -n "$DOCUMENTDB_NAMESPACE" -o json 2>&1); then + echo "Error: unable to fetch DocumentDB $DOCUMENTDB_NAME from hub cluster: $documentdb_json" >&2 + exit 1 + fi + + EXPECTED_CLUSTER_NAMES_JSON=$(echo "$documentdb_json" | jq '[.spec.clusterReplication.clusterList[]? | .name] | map(select(. 
!= null))') + EXPECTED_CLUSTER_COUNT=$(echo "$EXPECTED_CLUSTER_NAMES_JSON" | jq 'length') + readarray -t DOCUMENTDB_CLUSTER_NAMES < <(echo "$EXPECTED_CLUSTER_NAMES_JSON" | jq -r '.[]') + + if (( EXPECTED_CLUSTER_COUNT == 0 )); then + echo "Error: DocumentDB $DOCUMENTDB_NAME has an empty clusterReplication.clusterList" >&2 + exit 1 + fi + + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + DOCUMENTDB_CLUSTER_SET["$name"]=1 + done + + EXPECTED_CNPG_NAMES_JSON="[]" + for cluster in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + local cnpg_name + if ! cnpg_name=$(get_cnpg_name_for_cluster "$cluster"); then + record_failure "$cluster" "Unable to determine CNPG cluster name for DocumentDB $DOCUMENTDB_NAME" + continue + fi + EXPECTED_CNPG_NAMES_JSON=$(jq --arg name "$cnpg_name" '. + [$name]' <<<"$EXPECTED_CNPG_NAMES_JSON") + done + + log "DocumentDB $DOCUMENTDB_NAME expects $EXPECTED_CLUSTER_COUNT cluster(s):" + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + echo " - $name" + done + + log "Resolved CNPG cluster names:" + echo "$EXPECTED_CNPG_NAMES_JSON" | jq -r '.[]' | sed 's/^/ - /' + + for name in "${DOCUMENTDB_CLUSTER_NAMES[@]}"; do + local match_found="false" + for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == "$name" ]]; then + match_found="true" + break + fi + done + if [[ "$match_found" == "false" ]]; then + record_failure "$hub_cluster" "DocumentDB references cluster '$name' that was not discovered in resource group $RESOURCE_GROUP" + fi + done + + # Check MultiClusterServices on the hub cluster + echo + check_multiclusterservices "$hub_cluster" + + for cluster in "${CLUSTER_ARRAY[@]}"; do + echo + if [[ -n "${DOCUMENTDB_CLUSTER_SET[$cluster]:-}" ]]; then + log "Checking DocumentDB member cluster: $cluster" + check_member_cluster "$cluster" + check_serviceexports "$cluster" + else + log "Checking non-member cluster: $cluster" + check_non_member_cluster "$cluster" + fi + done + + echo + if (( OVERALL_STATUS == 0 )); then + log "All checks passed across ${#CLUSTER_ARRAY[@]} cluster(s)." 
+ else + log "Completed with ${#FAILURE_MESSAGES[@]} issue(s):" + for msg in "${FAILURE_MESSAGES[@]}"; do + echo " - $msg" + done + fi + + exit $OVERALL_STATUS +} + +main "$@" diff --git a/documentdb-playground/fleet-add-region/deploy-four-region.sh b/documentdb-playground/fleet-add-region/deploy-four-region.sh new file mode 100755 index 00000000..7d8c41ac --- /dev/null +++ b/documentdb-playground/fleet-add-region/deploy-four-region.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +export MEMBER_REGIONS="westus3,uksouth,eastus2,westus2" +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-add-region-test-rg}" +SCRIPT_DIR="$(dirname "$0")" + +# Deploy the AKS fleet with four regions and install cert-manager on all +$SCRIPT_DIR/../aks-fleet-deployment/deploy-fleet-bicep.sh +$SCRIPT_DIR/../aks-fleet-deployment/install-cert-manager.sh +$SCRIPT_DIR/../aks-fleet-deployment/install-documentdb-operator.sh diff --git a/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml new file mode 100644 index 00000000..63afdf0c --- /dev/null +++ b/documentdb-playground/fleet-add-region/documentdb-resource-crp.yaml @@ -0,0 +1,87 @@ +# Namespace definition +apiVersion: v1 +kind: Namespace +metadata: + name: documentdb-preview-ns + +--- + +apiVersion: v1 +kind: Secret +metadata: + name: documentdb-credentials + namespace: documentdb-preview-ns +type: Opaque +stringData: + username: default_user + password: {{DOCUMENTDB_PASSWORD}} + +--- + +apiVersion: documentdb.io/preview +kind: DocumentDB +metadata: + name: documentdb-preview + namespace: documentdb-preview-ns +spec: + nodeCount: 1 + instancesPerNode: 1 + resource: + storage: + pvcSize: 10Gi + environment: aks + clusterReplication: + highAvailability: true + crossCloudNetworkingStrategy: AzureFleet + primary: {{PRIMARY_CLUSTER}} + clusterList: +{{CLUSTER_LIST}} + exposeViaService: + serviceType: LoadBalancer + logLevel: info + +--- + +apiVersion: placement.kubernetes-fleet.io/v1beta1 +kind: ClusterResourcePlacement +metadata: + name: documentdb-namespace-crp +spec: + resourceSelectors: + - group: "" + version: v1 + kind: Namespace + name: documentdb-preview-ns + selectionScope: NamespaceOnly # only namespace itself is placed, no resources within the namespace + policy: + placementType: PickAll + strategy: + type: RollingUpdate + +--- +# Application team manages resources using RP +apiVersion: placement.kubernetes-fleet.io/v1beta1 +kind: ResourcePlacement +metadata: + name: documentdb-resource-rp + namespace: documentdb-preview-ns +spec: + resourceSelectors: + - group: documentdb.io + kind: DocumentDB + version: preview + name: documentdb-preview + - group: "" + version: v1 + kind: Secret + name: documentdb-credentials + policy: + placementType: PickFixed + clusterNames: +{{CLUSTER_LIST_CRP}} + strategy: + type: RollingUpdate + applyStrategy: + type: ServerSideApply + serverSideApplyConfig: + force: true diff --git a/documentdb-playground/fleet-add-region/documentdb-three-region.sh b/documentdb-playground/fleet-add-region/documentdb-three-region.sh new file mode 100755 index 00000000..cd6ade9b --- /dev/null +++ b/documentdb-playground/fleet-add-region/documentdb-three-region.sh @@ -0,0 +1,164 @@ +#!/bin/bash +set -euo pipefail + +# Environment variables: +# RESOURCE_GROUP: Azure resource group (default: documentdb-aks-fleet-rg) +# DOCUMENTDB_PASSWORD: Database password (will be generated if not provided) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + 
+RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" + +# Set password from argument or environment variable +DOCUMENTDB_PASSWORD="${1:-${DOCUMENTDB_PASSWORD:-}}" + +# If no password provided, generate a secure one +if [ -z "$DOCUMENTDB_PASSWORD" ]; then + echo "No password provided. Generating a secure password..." + DOCUMENTDB_PASSWORD=$(openssl rand -base64 32 | tr -d "=+/" | cut -c1-25) + echo "Generated password: $DOCUMENTDB_PASSWORD" + echo "(Save this password - you'll need it to connect to the database)" + echo "" +fi + +# Export for envsubst +export DOCUMENTDB_PASSWORD + +# Dynamically get member clusters from Azure +echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." +MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +# Convert to array +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi +done + +# Select primary cluster and excluded cluster +PRIMARY_CLUSTER="${CLUSTER_ARRAY[0]}" +EXCLUDE_CLUSTER="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [[ "$cluster" == *"eastus2"* ]]; then + PRIMARY_CLUSTER="$cluster" + fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then + EXCLUDE_CLUSTER="$cluster" + fi +done + +echo "" +echo "Selected primary cluster: $PRIMARY_CLUSTER" + +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Excluding cluster $cluster from DocumentDB configuration" + continue + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +# Step 1: Create cluster identification ConfigMaps on each member cluster +echo "" +echo "=======================================" +echo "Creating cluster identification ConfigMaps..." +echo "=======================================" + +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo "" + echo "Processing ConfigMap for $cluster..." + + # Check if context exists + if ! kubectl config get-contexts "$cluster" &>/dev/null; then + echo "✗ Context $cluster not found, skipping" + continue + fi + + # Create or update the cluster-name ConfigMap + kubectl --context "$cluster" create configmap cluster-name \ + -n kube-system \ + --from-literal=name="$cluster" \ + --dry-run=client -o yaml | kubectl --context "$cluster" apply -f - + + # Verify the ConfigMap was created + if kubectl --context "$cluster" get configmap cluster-name -n kube-system &>/dev/null; then + echo "✓ ConfigMap created/updated for $cluster" + else + echo "✗ Failed to create ConfigMap for $cluster" + fi +done + +# Step 2: Deploy DocumentDB resources via Fleet +echo "" +echo "=======================================" +echo "Deploying DocumentDB multi-region configuration..." 
+echo "=======================================" + +# Determine hub context +HUB_CLUSTER="${HUB_CLUSTER:-hub}" +if ! kubectl config get-contexts "$HUB_CLUSTER" &>/dev/null; then + echo "Error: Hub context not found. Please ensure you have credentials for the fleet." + exit 1 +fi + +echo "Using hub context: $HUB_CLUSTER" + +# Create a temporary file with substituted values +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo "$line" + fi +done > "$TEMP_YAML" + +# Debug: show the generated YAML section with clusterReplication +echo "" +echo "Generated configuration preview:" +echo "--------------------------------" +echo "Primary cluster: $PRIMARY_CLUSTER" +echo "Cluster list:" +echo "$CLUSTER_LIST" +echo "--------------------------------" + +# Apply the configuration +echo "" +echo "Applying DocumentDB multi-region configuration..." +kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" + +# Clean up temp file +rm -f "$TEMP_YAML" + +echo "" +echo "Connection Information:" +echo " Username: default_user" +echo " Password: $DOCUMENTDB_PASSWORD" +echo "" diff --git a/documentdb-playground/fleet-add-region/remove-region.sh b/documentdb-playground/fleet-add-region/remove-region.sh new file mode 100755 index 00000000..64931716 --- /dev/null +++ b/documentdb-playground/fleet-add-region/remove-region.sh @@ -0,0 +1,103 @@ +#!/bin/bash + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +RESOURCE_GROUP="${RESOURCE_GROUP:-documentdb-aks-fleet-rg}" +HUB_REGION="${HUB_REGION:-westus3}" +PRIMARY_REGION="${PRIMARY_REGION:-eastus2}" +EXCLUDE_REGION="${EXCLUDE_REGION:-westus2}" + +# Dynamically get member clusters from Azure +echo "Discovering member clusters in resource group: $RESOURCE_GROUP..." 
+MEMBER_CLUSTERS=$(az aks list -g "$RESOURCE_GROUP" -o json | jq -r '.[] | select(.name|startswith("member-")) | .name' | sort) + +if [ -z "$MEMBER_CLUSTERS" ]; then + echo "Error: No member clusters found in resource group $RESOURCE_GROUP" + echo "Please ensure the fleet is deployed first" + exit 1 +fi + +CLUSTER_ARRAY=($MEMBER_CLUSTERS) +echo "Found ${#CLUSTER_ARRAY[@]} member clusters:" +for cluster in "${CLUSTER_ARRAY[@]}"; do + echo " - $cluster" + if [[ "$cluster" == *"$HUB_REGION"* ]]; then HUB_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$EXCLUDE_REGION"* ]]; then EXCLUDE_CLUSTER="$cluster"; fi + if [[ "$cluster" == *"$PRIMARY_REGION"* ]]; then PRIMARY_CLUSTER="$cluster"; fi +done + +# Build the cluster list YAML with proper indentation +CLUSTER_LIST="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ "$cluster" == "$EXCLUDE_CLUSTER" ]; then + echo "Excluding cluster $cluster from DocumentDB configuration" + continue + fi + if [ -z "$CLUSTER_LIST" ]; then + CLUSTER_LIST=" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + else + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" - name: ${cluster}" + CLUSTER_LIST="${CLUSTER_LIST}"$'\n'" environment: aks" + fi +done + +CLUSTER_LIST_CRP="" +for cluster in "${CLUSTER_ARRAY[@]}"; do + if [ -z "$CLUSTER_LIST_CRP" ]; then + CLUSTER_LIST_CRP=" - ${cluster}" + else + CLUSTER_LIST_CRP="${CLUSTER_LIST_CRP}"$'\n'" - ${cluster}" + fi +done + +TEMP_YAML=$(mktemp) + +# Use sed for safer substitution +sed -e "s/{{DOCUMENTDB_PASSWORD}}/$DOCUMENTDB_PASSWORD/g" \ + -e "s/{{PRIMARY_CLUSTER}}/$PRIMARY_CLUSTER/g" \ + "$SCRIPT_DIR/documentdb-resource-crp.yaml" | \ +while IFS= read -r line; do + if [[ "$line" == '{{CLUSTER_LIST}}' ]]; then + echo "$CLUSTER_LIST" + elif [[ "$line" == '{{CLUSTER_LIST_CRP}}' ]]; then + echo "$CLUSTER_LIST_CRP" + else + echo "$line" + fi +done > "$TEMP_YAML" + +echo "" +echo "Applying DocumentDB multi-region configuration..." + +MAX_RETRIES=60 +RETRY_INTERVAL=3 +RETRY_COUNT=0 + +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do + kubectl --context "$HUB_CLUSTER" apply -f "$TEMP_YAML" &> /dev/null + + echo "Checking if $EXCLUDE_CLUSTER has been removed from clusterReplication on the excluded cluster..." + + # Get the clusterReplication.clusters field from the DocumentDB object on the excluded cluster + CLUSTER_LIST_JSON=$(kubectl --context "$EXCLUDE_CLUSTER" get documentdb documentdb-preview -n documentdb-preview-ns -o jsonpath='{.spec.clusterReplication.clusterList[*].name}' 2>/dev/null) + + if ! echo "$CLUSTER_LIST_JSON" | grep -q "$EXCLUDE_CLUSTER"; then + echo "Success: $EXCLUDE_CLUSTER is now excluded from clusterReplication field" + break + fi + + RETRY_COUNT=$((RETRY_COUNT + 1)) + echo "Cluster still in clusterReplication (attempt $RETRY_COUNT/$MAX_RETRIES). Retrying in ${RETRY_INTERVAL}s..." + sleep $RETRY_INTERVAL +done + +if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then + echo "Error: Timed out waiting for $EXCLUDE_CLUSTER to be removed from clusterReplication" + exit 1 +fi + +rm -f "$TEMP_YAML" +echo "Done." 
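The playground scripts above are designed to run as a loop: deploy the fleet, stand up the three-region baseline, add the fourth region, verify, remove it again, and re-verify. The snippet below is a minimal sketch of one pass through that loop. It assumes the commands are run from `documentdb-playground/fleet-add-region/`, that `az`, `kubectl`, `jq`, and `openssl` are installed and authenticated, and that exporting `RESOURCE_GROUP` up front (an assumption, not something the wrapper does itself) is acceptable so `deploy-four-region.sh` and the helper scripts all target the same resource group.

```bash
# One pass through the add/remove loop (sketch; defaults match the scripts above).
export RESOURCE_GROUP="documentdb-add-region-test-rg"   # shared by the wrapper and helper scripts
export HUB_REGION="westus3"
export PRIMARY_REGION="eastus2"
export EXCLUDE_REGION="westus2"
export DOCUMENTDB_PASSWORD="$(openssl rand -base64 32 | tr -d '=+/' | cut -c1-25)"

./deploy-four-region.sh              # four-region fleet + cert-manager + DocumentDB operator
./documentdb-three-region.sh         # baseline DocumentDB, westus2 excluded
./add-region.sh                      # patch westus2 into the DocumentDB / placement lists
./check.sh                           # verify CNPG clusters, ServiceExports, MCS wiring
./remove-region.sh                   # drop westus2 again
./check.sh                           # confirm cleanup on the removed region
```

Because `check.sh` exits non-zero whenever a cluster diverges from the DocumentDB `clusterReplication.clusterList`, each step of the loop can be gated on its exit code.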
diff --git a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml index 54e69af2..1d92f649 100644 --- a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml +++ b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml @@ -25,7 +25,7 @@ rules: resources: ["leases"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: ["networking.fleet.azure.com"] # fleet permissions for multi-cluster services - resources: ["serviceexports", "multiclusterservices"] + resources: ["serviceexports", "multiclusterservices", "serviceimports", "internalserviceexports"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: [""] resources: ["secrets"] diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index c0e9b126..eb4b99e3 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -82,6 +82,17 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + if replicationContext.IsNotPresent() { + logger.Info("DocumentDB instance is not part of the replication setup; skipping reconciliation and deleting any present resources") + if err := r.cleanupResources(ctx, req); err != nil { + return ctrl.Result{}, err + } + if err := util.DeleteOwnedResources(ctx, r.Client, documentdb.ObjectMeta); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + var documentDbServiceIp string // Only create/manage the service if ExposeViaService is configured @@ -257,6 +268,28 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } + // Check for fleet-networking issues and attempt to remediate + if replicationContext.IsAzureFleetNetworking() { + deleted, imports, err := r.CleanupMismatchedServiceImports(ctx, documentdb.Namespace, replicationContext) + if err != nil { + log.Log.Error(err, "Failed to cleanup ServiceImports") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + if deleted { + log.Log.Info("Deleted mismatched ServiceImports; requeuing to allow for proper recreation") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + reconciled, err := r.ForceReconcileInternalServiceExports(ctx, documentdb.Namespace, replicationContext, imports) + if err != nil { + log.Log.Error(err, "Failed to force reconcile InternalServiceExports") + return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil + } + if reconciled { + log.Log.Info("Annotated InternalServiceExports for reconciliation; requeuing to allow fleet-networking to recreate ServiceImports") + return ctrl.Result{RequeueAfter: RequeueAfterLong}, nil + } + } + // Don't reque again unless there is a change return ctrl.Result{}, nil } diff --git a/operator/src/internal/controller/physical_replication.go b/operator/src/internal/controller/physical_replication.go index 0ef99b83..ea045c03 100644 --- a/operator/src/internal/controller/physical_replication.go +++ b/operator/src/internal/controller/physical_replication.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "slices" + "strings" "time" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" @@ -21,10 +22,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" 
"sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + demotionTokenPollInterval = 5 * time.Second + demotionTokenWaitTimeout = 10 * time.Minute +) + func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( ctx context.Context, documentdb *dbpreview.DocumentDB, @@ -44,13 +51,13 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( } // No more errors possible, so we can safely edit the spec - cnpgCluster.Name = replicationContext.Self + cnpgCluster.Name = replicationContext.CNPGClusterName if !replicationContext.IsPrimary() { cnpgCluster.Spec.InheritedMetadata.Labels[util.LABEL_REPLICATION_CLUSTER_TYPE] = "replica" cnpgCluster.Spec.Bootstrap = &cnpgv1.BootstrapConfiguration{ PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{ - Source: replicationContext.PrimaryCluster, + Source: replicationContext.PrimaryCNPGClusterName, Database: "postgres", Owner: "postgres", }, @@ -93,8 +100,8 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( cnpgCluster.Spec.ReplicaCluster = &cnpgv1.ReplicaClusterConfiguration{ Source: replicationContext.GetReplicationSource(), - Primary: replicationContext.PrimaryCluster, - Self: replicationContext.Self, + Primary: replicationContext.PrimaryCNPGClusterName, + Self: replicationContext.CNPGClusterName, } if replicationContext.IsAzureFleetNetworking() { @@ -116,10 +123,10 @@ func (r *DocumentDBReconciler) AddClusterReplicationToClusterSpec( }) } } - selfHost := replicationContext.Self + "-rw." + documentdb.Namespace + ".svc" + selfHost := replicationContext.CNPGClusterName + "-rw." + documentdb.Namespace + ".svc" cnpgCluster.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ { - Name: replicationContext.Self, + Name: replicationContext.CNPGClusterName, ConnectionParameters: map[string]string{ "host": selfHost, "port": "5432", @@ -147,7 +154,7 @@ func (r *DocumentDBReconciler) CreateIstioRemoteServices(ctx context.Context, re // Create dummy -rw services for remote clusters so DNS resolution works // These services have non-matching selectors, so they have no local endpoints // Istio will automatically route traffic through the east-west gateway - for _, remoteCluster := range replicationContext.Others { + for _, remoteCluster := range replicationContext.OtherCNPGClusterNames { // Create the -rw (read-write/primary) service for each remote cluster serviceNameRW := remoteCluster + "-rw" foundServiceRW := &corev1.Service{} @@ -196,37 +203,95 @@ func (r *DocumentDBReconciler) CreateIstioRemoteServices(ctx context.Context, re } func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, replicationContext *util.ReplicationContext, documentdb *dbpreview.DocumentDB) error { - for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { - foundServiceExport := &fleetv1alpha1.ServiceExport{} - err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: documentdb.Namespace}, foundServiceExport) - if err != nil && errors.IsNotFound(err) { - log.Log.Info("Service Export not found. 
Creating a new Service Export " + serviceName) + labels := map[string]string{ + util.LABEL_DOCUMENTDB_NAME: documentdb.Name, + } + + // List all existing ServiceExports in the namespace + existingServiceExports := &fleetv1alpha1.ServiceExportList{} + if err := r.Client.List(ctx, existingServiceExports, client.InNamespace(documentdb.Namespace), client.MatchingLabels(labels)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to list ServiceExports: %w", err) + } + } + + // Build a map of existing ServiceExports that are tagged for this DocumentDB + existingExports := make(map[string]*fleetv1alpha1.ServiceExport) + for i := range existingServiceExports.Items { + export := &existingServiceExports.Items[i] + existingExports[export.Name] = export + } - // Service Export + for serviceName := range replicationContext.GenerateOutgoingServiceNames(documentdb.Name, documentdb.Namespace) { + _, exists := existingExports[serviceName] + if !exists { ringServiceExport := &fleetv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: documentdb.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, }, } - err = r.Create(ctx, ringServiceExport) - if err != nil { + if err := r.Create(ctx, ringServiceExport); err != nil && !errors.IsAlreadyExists(err) { return err } + } else { // if exists then we don't want to remove it + delete(existingExports, serviceName) + } + } + + // If it's still in the existingExports map, it means it's no longer needed and should be deleted + for serviceName, export := range existingExports { + if err := r.Client.Delete(ctx, export); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete ServiceExport %s: %w", serviceName, err) + } + } + + // List all existing MultiClusterServices in the namespace + existingMCSList := &fleetv1alpha1.MultiClusterServiceList{} + if err := r.Client.List(ctx, existingMCSList, client.InNamespace(documentdb.Namespace), client.MatchingLabels(labels)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to list MultiClusterServices: %w", err) } } - // Below is true because this function is only called if we are fleet enabled + // Build a map of existing MultiClusterServices that are tagged for this DocumentDB + existingMCS := make(map[string]*fleetv1alpha1.MultiClusterService) + for i := range existingMCSList.Items { + mcs := &existingMCSList.Items[i] + existingMCS[mcs.Name] = mcs + } + + // Below is valid because this function is only called if fleet-networking is enabled for sourceServiceName := range replicationContext.GenerateIncomingServiceNames(documentdb.Name, documentdb.Namespace) { - foundMCS := &fleetv1alpha1.MultiClusterService{} - err := r.Get(ctx, types.NamespacedName{Name: sourceServiceName, Namespace: documentdb.Namespace}, foundMCS) - if err != nil && errors.IsNotFound(err) { - log.Log.Info("Multi Cluster Service not found. 
Creating a new Multi Cluster Service") - // Multi Cluster Service - foundMCS = &fleetv1alpha1.MultiClusterService{ + _, exists := existingMCS[sourceServiceName] + if !exists { + // Multi Cluster Service with owner reference to ensure cleanup + newMCS := &fleetv1alpha1.MultiClusterService{ ObjectMeta: metav1.ObjectMeta{ Name: sourceServiceName, Namespace: documentdb.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, }, Spec: fleetv1alpha1.MultiClusterServiceSpec{ ServiceImport: fleetv1alpha1.ServiceImportRef{ @@ -234,10 +299,18 @@ func (r *DocumentDBReconciler) CreateServiceImportAndExport(ctx context.Context, }, }, } - err = r.Create(ctx, foundMCS) - if err != nil { + if err := r.Create(ctx, newMCS); err != nil && !errors.IsAlreadyExists(err) { return err } + } else { // if exists then we don't want to remove it + delete(existingMCS, sourceServiceName) + } + } + + // If it's still in the existingMCS map, it means it's no longer needed and should be deleted + for serviceName, mcs := range existingMCS { + if err := r.Client.Delete(ctx, mcs); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete MultiClusterService %s: %w", serviceName, err) } } @@ -250,14 +323,6 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de return nil, -1 } - // Update the primary if it has changed - primaryChanged := current.Spec.ReplicaCluster.Primary != desired.Spec.ReplicaCluster.Primary - - tokenNeedsUpdate, err := r.PromotionTokenNeedsUpdate(ctx, current.Namespace) - if err != nil { - return err, time.Second * 10 - } - if current.Spec.ReplicaCluster.Self != desired.Spec.ReplicaCluster.Self { return fmt.Errorf("self cannot be changed"), time.Second * 60 } @@ -265,10 +330,40 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Create JSON patch operations for all replica cluster updates var patchOps []util.JSONPatch - if tokenNeedsUpdate || primaryChanged && current.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { + // Update if the primary has changed + primaryChanged := current.Spec.ReplicaCluster.Primary != desired.Spec.ReplicaCluster.Primary + if primaryChanged { + err, refreshTime := r.getPrimaryChangePatchOps(ctx, &patchOps, current, desired, documentdb, replicationContext) + if refreshTime > 0 || err != nil { + return err, refreshTime + } + } + + // Update if the cluster list has changed + replicasChanged := externalClusterNamesChanged(current.Spec.ExternalClusters, desired.Spec.ExternalClusters) + if replicasChanged { + getReplicasChangePatchOps(&patchOps, desired, replicationContext) + } + + if len(patchOps) > 0 { + patch, err := json.Marshal(patchOps) + if err != nil { + return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 + } + err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) + if err != nil { + return err, time.Second * 10 + } + } + + return nil, -1 +} + +func (r *DocumentDBReconciler) getPrimaryChangePatchOps(ctx context.Context, patchOps *[]util.JSONPatch, current, desired *cnpgv1.Cluster, documentdb *dbpreview.DocumentDB, replicationContext *util.ReplicationContext) (error, time.Duration) { + if current.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { // Primary => replica // demote - patchOps = append(patchOps, 
util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: desired.Spec.ReplicaCluster, @@ -279,61 +374,50 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de // Only add remove operation if synchronous field exists, otherwise there's an error // TODO this wouldn't be true if our "wait for token" logic wasn't reliant on a failure if current.Spec.PostgresConfiguration.Synchronous != nil { - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REMOVE, Path: util.JSON_PATCH_PATH_POSTGRES_CONFIG_SYNC, }) } - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_INSTANCES, Value: desired.Spec.Instances, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_PLUGINS, Value: desired.Spec.Plugins, }) } - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } - - log.Log.Info("Applying patch for Primary => Replica transition", "patch", string(patch), "cluster", current.Name) + log.Log.Info("Applying patch for Primary => Replica transition", "cluster", current.Name) - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 - } + // push out the promotion token when it's available + nn := types.NamespacedName{Name: current.Name, Namespace: current.Namespace} + go r.waitForDemotionTokenAndCreateService(nn, replicationContext) - // push out the promotion token - err = r.CreateTokenService(ctx, current.Status.DemotionToken, documentdb.Namespace, replicationContext) - if err != nil { - return err, time.Second * 10 - } - } else if primaryChanged && desired.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { + } else if desired.Spec.ReplicaCluster.Primary == current.Spec.ReplicaCluster.Self { // Replica => primary // Look for the token if this is a managed failover oldPrimaryAvailable := slices.Contains( - replicationContext.Others, + replicationContext.OtherCNPGClusterNames, current.Spec.ReplicaCluster.Primary) replicaClusterConfig := desired.Spec.ReplicaCluster // If the old primary is available, we can read the token from it if oldPrimaryAvailable { - token, err, refreshTime := r.ReadToken(ctx, documentdb.Namespace, replicationContext) + token, err, refreshTime := r.ReadToken(ctx, documentdb, replicationContext) if err != nil || refreshTime > 0 { return err, refreshTime } - log.Log.Info("Token read successfully", "token", token) + log.Log.Info("Token read successfully") // Update the configuration with the token replicaClusterConfig.PromotionToken = token } - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: replicaClusterConfig, @@ -341,65 +425,91 @@ func (r *DocumentDBReconciler) TryUpdateCluster(ctx context.Context, current, de if documentdb.Spec.ClusterReplication.HighAvailability { // need to add second instance and wal replica - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_POSTGRES_CONFIG, Value: 
desired.Spec.PostgresConfiguration, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_INSTANCES, Value: desired.Spec.Instances, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_PLUGINS, Value: desired.Spec.Plugins, }) - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICATION_SLOTS, Value: desired.Spec.ReplicationSlots, }) } - - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } - - log.Log.Info("Applying patch for Replica => Primary transition", "patch", string(patch), "cluster", current.Name, "hasToken", replicaClusterConfig.PromotionToken != "") - - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 - } - } else if primaryChanged { + log.Log.Info("Applying patch for Replica => Primary transition", "cluster", current.Name, "hasToken", replicaClusterConfig.PromotionToken != "") + } else { // Replica => replica - patchOps = append(patchOps, util.JSONPatch{ + *patchOps = append(*patchOps, util.JSONPatch{ Op: util.JSON_PATCH_OP_REPLACE, Path: util.JSON_PATCH_PATH_REPLICA_CLUSTER, Value: desired.Spec.ReplicaCluster, }) - patch, err := json.Marshal(patchOps) - if err != nil { - return fmt.Errorf("failed to marshal patch operations: %w", err), time.Second * 10 - } + log.Log.Info("Applying patch for Replica => Replica transition", "cluster", current.Name) + } - log.Log.Info("Applying patch for Replica => Replica transition", "patch", string(patch), "cluster", current.Name) + return nil, -1 +} - err = r.Client.Patch(ctx, current, client.RawPatch(types.JSONPatchType, patch)) - if err != nil { - return err, time.Second * 10 +func externalClusterNamesChanged(currentClusters, desiredClusters []cnpgv1.ExternalCluster) bool { + if len(currentClusters) != len(desiredClusters) { + return true + } + + if len(currentClusters) == 0 { + return false + } + + nameSet := make(map[string]bool, len(currentClusters)) + for _, cluster := range currentClusters { + nameSet[cluster.Name] = true + } + + for _, cluster := range desiredClusters { + if found := nameSet[cluster.Name]; !found { + return true } + delete(nameSet, cluster.Name) } - return nil, -1 + return len(nameSet) != 0 +} + +func getReplicasChangePatchOps(patchOps *[]util.JSONPatch, desired *cnpgv1.Cluster, replicationContext *util.ReplicationContext) { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_EXTERNAL_CLUSTERS, + Value: desired.Spec.ExternalClusters, + }) + if replicationContext.IsAzureFleetNetworking() { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_MANAGED_SERVICES, + Value: desired.Spec.Managed.Services.Additional, + }) + } + if replicationContext.IsPrimary() { + *patchOps = append(*patchOps, util.JSONPatch{ + Op: util.JSON_PATCH_OP_REPLACE, + Path: util.JSON_PATCH_PATH_SYNCHRONOUS, + Value: desired.Spec.PostgresConfiguration.Synchronous, + }) + } } -func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) (string, error, time.Duration) { +func (r *DocumentDBReconciler) 
ReadToken(ctx context.Context, documentdb *dbpreview.DocumentDB, replicationContext *util.ReplicationContext) (string, error, time.Duration) { tokenServiceName := "promotion-token" + namespace := documentdb.Namespace // If we are not using cross-cloud networking, we only need to read the token from the configmap if !replicationContext.IsAzureFleetNetworking() && !replicationContext.IsIstioNetworking() { @@ -476,6 +586,16 @@ func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: documentdb.APIVersion, + Kind: documentdb.Kind, + Name: documentdb.Name, + UID: documentdb.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, }, Spec: fleetv1alpha1.MultiClusterServiceSpec{ ServiceImport: fleetv1alpha1.ServiceImportRef{ @@ -506,24 +626,157 @@ func (r *DocumentDBReconciler) ReadToken(ctx context.Context, namespace string, return string(token[:]), nil, -1 } -// TODO make this not have to check the configmap twice -// RETURN true if we have a configmap with a blank token -func (r *DocumentDBReconciler) PromotionTokenNeedsUpdate(ctx context.Context, namespace string) (bool, error) { - tokenServiceName := "promotion-token" - configMap := &corev1.ConfigMap{} - err := r.Get(ctx, types.NamespacedName{Name: tokenServiceName, Namespace: namespace}, configMap) - if err != nil { - // If we don't find the map, we don't need to update +func (r *DocumentDBReconciler) waitForDemotionTokenAndCreateService(clusterNN types.NamespacedName, replicationContext *util.ReplicationContext) { + ctx := context.Background() + ticker := time.NewTicker(demotionTokenPollInterval) + timeout := time.NewTimer(demotionTokenWaitTimeout) + defer ticker.Stop() + defer timeout.Stop() + + for { + select { + case <-ticker.C: + done, err := r.ensureTokenServiceResources(ctx, clusterNN, replicationContext) + if err != nil { + log.Log.Error(err, "Failed to create token service resources", "cluster", clusterNN.Name) + } + if done { + return + } + case <-timeout.C: + log.Log.Info("Timed out waiting for demotion token", "cluster", clusterNN.Name, "timeout", demotionTokenWaitTimeout) + return + } + } +} + +// CleanupMismatchedServiceImports finds and removes ServiceImports that have no ownerReferences +// and are marked as "in-use-by" the current cluster. +// RETURNS: Whether or not a deletion occurred, and error if any error occurs during the process +// +// There is currently an incompatibility when you use fleet-networking with a cluster that +// is both a hub and a member. The ServiceImport that is generated on the hub will sometimes +// be interpreted as a member-side ServiceImport and attach itself to the export, thus preventing +// the intended MCS from attaching to it. This function finds those offending ServiceImports and +// removes them. 
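// [Editor's sketch] waitForDemotionTokenAndCreateService above runs in its own goroutine and
// leans on two package constants, demotionTokenPollInterval and demotionTokenWaitTimeout, whose
// values are defined outside this hunk. The following minimal, self-contained version of the same
// ticker-plus-timeout pattern is illustrative only; pollUntilReady and the literal durations are
// assumptions, not part of the operator.
func pollUntilReady(check func(context.Context) (bool, error)) {
    ctx := context.Background()
    ticker := time.NewTicker(5 * time.Second)  // assumed poll interval
    timeout := time.NewTimer(10 * time.Minute) // assumed overall deadline
    defer ticker.Stop()
    defer timeout.Stop()

    for {
        select {
        case <-ticker.C:
            done, err := check(ctx)
            if err != nil {
                log.Log.Error(err, "poll attempt failed; will retry")
            }
            if done {
                return
            }
        case <-timeout.C:
            log.Log.Info("timed out waiting for condition")
            return
        }
    }
}
// Usage would mirror the call above, e.g.
// go pollUntilReady(func(ctx context.Context) (bool, error) {
//     return r.ensureTokenServiceResources(ctx, nn, replicationContext)
// })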
+func (r *DocumentDBReconciler) CleanupMismatchedServiceImports(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) (bool, *fleetv1alpha1.ServiceImportList, error) { + deleted := false + + // List all ServiceImports in the namespace + serviceImportList := &fleetv1alpha1.ServiceImportList{} + if err := r.Client.List(ctx, serviceImportList, client.InNamespace(namespace)); err != nil { + // If the CRD doesn't exist, skip cleanup if errors.IsNotFound(err) { - return false, nil + return deleted, nil, nil + } + return deleted, nil, fmt.Errorf("failed to list ServiceImports: %w", err) + } + + for i := range serviceImportList.Items { + badServiceImport := &serviceImportList.Items[i] + // If it has an OwnerReference, then it is properly being used by the cluster's MCS + if len(badServiceImport.OwnerReferences) > 0 { + continue + } + + annotations := badServiceImport.GetAnnotations() + if annotations == nil { + continue + } + + inUseBy, exists := annotations[util.FLEET_IN_USE_BY_ANNOTATION] + // If it has its own name as the cluster name, then it has erroneously attached itself to the export + if !exists || !containsClusterName(inUseBy, replicationContext.FleetMemberName) { + continue + } + + if err := r.Client.Delete(ctx, badServiceImport); err != nil && !errors.IsNotFound(err) { + log.Log.Error(err, "Failed to delete ServiceImport", "name", badServiceImport.Name) + continue + } + deleted = true + } + + return deleted, serviceImportList, nil +} + +// ForceReconcileInternalServiceExports finds InternalServiceExports that don't have a matching +// ServiceImport with proper owner references in the target namespace, and annotates them to +// trigger reconciliation so the fleet-networking controller can recreate the ServiceImports properly. +// Returns whether any InternalServiceExports were annotated for reconciliation, and error if any occurs. 
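// [Editor's sketch] CleanupMismatchedServiceImports and ForceReconcileInternalServiceExports look
// designed to run as a pair (the cleanup returns the ServiceImportList precisely so it can be fed
// into the force-reconcile step), but the call site is not part of this diff. One plausible wiring,
// with repairFleetServiceImports as an illustrative name only:
func (r *DocumentDBReconciler) repairFleetServiceImports(ctx context.Context, namespace string, replicationContext *util.ReplicationContext) error {
    // Drop ServiceImports that attached themselves to the export without an owning MCS.
    _, imports, err := r.CleanupMismatchedServiceImports(ctx, namespace, replicationContext)
    if err != nil {
        return err
    }
    if imports == nil {
        // The fleet-networking CRDs are not installed, so there is nothing to repair.
        return nil
    }
    // Nudge any InternalServiceExport without a matching ServiceImport so the fleet-networking
    // controller recreates the import with a proper owner reference.
    _, err = r.ForceReconcileInternalServiceExports(ctx, namespace, replicationContext, imports)
    return err
}
// Editorial observation, worth confirming: the list returned by the cleanup still contains the
// names of the imports it just deleted, so their InternalServiceExports would be skipped on this
// pass and only re-annotated on a later reconcile.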
+func (r *DocumentDBReconciler) ForceReconcileInternalServiceExports(ctx context.Context, namespace string, replicationContext *util.ReplicationContext, imports *fleetv1alpha1.ServiceImportList) (bool, error) { + reconciled := false + + // Extract all serviceImport names for easy lookup + serviceImportNames := make(map[string]bool) + for i := range imports.Items { + serviceImportNames[imports.Items[i].Name] = true + } + + for fleetMemberName := range replicationContext.GenerateFleetMemberNames() { + // List all InternalServiceExports in each fleet member namespace + fleetMemberNamespace := "fleet-member-" + fleetMemberName + iseList := &fleetv1alpha1.InternalServiceExportList{} + if err := r.Client.List(ctx, iseList, client.InNamespace(fleetMemberNamespace)); err != nil { + // If the CRD doesn't exist or namespace doesn't exist, skip + if errors.IsNotFound(err) { + continue + } + return reconciled, fmt.Errorf("failed to list InternalServiceExports: %w", err) + } + + // Check each InternalServiceExport for a matching ServiceImport + for i := range iseList.Items { + ise := &iseList.Items[i] + + // ISE name format is: - + // Extract the service name by removing the namespace prefix + prefix := namespace + "-" + if !strings.HasPrefix(ise.Name, prefix) { + continue + } + serviceName := strings.TrimPrefix(ise.Name, prefix) + + // Check if there's a valid ServiceImport for this ISE + if serviceImportNames[serviceName] { + continue + } + + // Add reconcile annotation with current timestamp to trigger reconciliation + if ise.Annotations == nil { + ise.Annotations = make(map[string]string) + } + ise.Annotations["reconcile"] = fmt.Sprintf("%d", time.Now().Unix()) + + if err := r.Client.Update(ctx, ise); err != nil { + log.Log.Error(err, "Failed to annotate InternalServiceExport", "name", ise.Name, "namespace", fleetMemberNamespace) + continue + } + + reconciled = true } - return false, err } - // Otherwise, we need to update if the value is blank - return configMap.Data["index.html"] == "", nil + return reconciled, nil } -func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token string, namespace string, replicationContext *util.ReplicationContext) error { +// containsClusterName checks if the inUseBy string contains the cluster name +func containsClusterName(inUseBy, clusterName string) bool { + // The annotation value typically contains the cluster name + return strings.Contains(inUseBy, clusterName) +} + +// Returns true when token service resources are ready +func (r *DocumentDBReconciler) ensureTokenServiceResources(ctx context.Context, clusterNN types.NamespacedName, replicationContext *util.ReplicationContext) (bool, error) { + cluster := &cnpgv1.Cluster{} + if err := r.Client.Get(ctx, clusterNN, cluster); err != nil { + return false, err + } + + token := cluster.Status.DemotionToken + if token == "" { + return false, nil + } + tokenServiceName := "promotion-token" labels := map[string]string{ "app": tokenServiceName, @@ -533,7 +786,7 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str configMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, }, Data: map[string]string{ "index.html": token, @@ -546,27 +799,23 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str configMap.Data["index.html"] = token err = r.Client.Update(ctx, configMap) if err != nil { - return fmt.Errorf("failed to update token ConfigMap: %w", err) + return 
false, fmt.Errorf("failed to update token ConfigMap: %w", err) } } else { - return fmt.Errorf("failed to create token ConfigMap: %w", err) + return false, fmt.Errorf("failed to create token ConfigMap: %w", err) } } - if token == "" { - return fmt.Errorf("No token found yet") - } - // When not using cross-cloud networking, just transfer with the configmap if !replicationContext.IsAzureFleetNetworking() && !replicationContext.IsIstioNetworking() { - return nil + return true, nil } // Create nginx Pod pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, Labels: labels, }, Spec: corev1.PodSpec{ @@ -605,14 +854,14 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str err = r.Client.Create(ctx, pod) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create nginx Pod: %w", err) + return false, fmt.Errorf("failed to create nginx Pod: %w", err) } // Create Service service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, Labels: labels, }, Spec: corev1.ServiceSpec{ @@ -629,7 +878,7 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str err = r.Client.Create(ctx, service) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create Service: %w", err) + return false, fmt.Errorf("failed to create Service: %w", err) } // Create ServiceExport only for fleet networking @@ -637,15 +886,25 @@ func (r *DocumentDBReconciler) CreateTokenService(ctx context.Context, token str serviceExport := &fleetv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Name: tokenServiceName, - Namespace: namespace, + Namespace: clusterNN.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: cluster.APIVersion, + Kind: cluster.Kind, + Name: cluster.Name, + UID: cluster.UID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, }, } err = r.Client.Create(ctx, serviceExport) if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create ServiceExport: %w", err) + return false, fmt.Errorf("failed to create ServiceExport: %w", err) } } - return nil + return true, nil } diff --git a/operator/src/internal/controller/physical_replication_test.go b/operator/src/internal/controller/physical_replication_test.go new file mode 100644 index 00000000..03903aff --- /dev/null +++ b/operator/src/internal/controller/physical_replication_test.go @@ -0,0 +1,174 @@ +package controller + +import ( + "context" + "time" + + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + dbpreview "github.com/documentdb/documentdb-operator/api/preview" + util "github.com/documentdb/documentdb-operator/internal/utils" +) + +func buildDocumentDBReconciler(objs ...runtime.Object) *DocumentDBReconciler { + scheme := runtime.NewScheme() + Expect(dbpreview.AddToScheme(scheme)).To(Succeed()) + Expect(cnpgv1.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(rbacv1.AddToScheme(scheme)).To(Succeed()) + + builder := fake.NewClientBuilder().WithScheme(scheme) + if len(objs) > 0 { + builder = builder.WithRuntimeObjects(objs...) + clientObjs := make([]client.Object, 0, len(objs)) + for _, obj := range objs { + if co, ok := obj.(client.Object); ok { + clientObjs = append(clientObjs, co) + } + } + if len(clientObjs) > 0 { + builder = builder.WithStatusSubresource(clientObjs...) + } + } + + return &DocumentDBReconciler{Client: builder.Build(), Scheme: scheme} +} + +var _ = Describe("Physical Replication", func() { + It("deletes owned resources when DocumentDB is not present", func() { + ctx := context.Background() + namespace := "default" + + documentdb := baseDocumentDB("docdb-not-present", namespace) + documentdb.UID = types.UID("docdb-not-present-uid") + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: string(util.AzureFleet), + Primary: "member-2", + ClusterList: []dbpreview.MemberCluster{ + {Name: "member-2"}, + {Name: "member-3"}, + }, + } + + ownerRef := metav1.OwnerReference{ + APIVersion: "documentdb.io/preview", + Kind: "DocumentDB", + Name: documentdb.Name, + UID: documentdb.UID, + } + + ownedService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-service", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } + + ownedCluster := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "owned-cnpg", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ownerRef}, + }, + } + + clusterNameConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-name", + Namespace: "kube-system", + }, + Data: map[string]string{ + "name": "member-1", + }, + } + + reconciler := buildDocumentDBReconciler(documentdb, ownedService, ownedCluster, clusterNameConfigMap) + + result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: documentdb.Name, Namespace: namespace}}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + + service := &corev1.Service{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedService.Name, Namespace: namespace}, service) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + cluster := &cnpgv1.Cluster{} + err = reconciler.Client.Get(ctx, types.NamespacedName{Name: ownedCluster.Name, Namespace: namespace}, cluster) + Expect(errors.IsNotFound(err)).To(BeTrue()) + }) + + It("updates external clusters and synchronous config", func() { + ctx := context.Background() + namespace := "default" + + documentdb := baseDocumentDB("docdb-repl", namespace) + documentdb.Spec.ClusterReplication = &dbpreview.ClusterReplication{ + CrossCloudNetworkingStrategy: 
string(util.None), + Primary: documentdb.Name, + ClusterList: []dbpreview.MemberCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + }, + } + + current := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "docdb-repl", + Namespace: namespace, + }, + Spec: cnpgv1.ClusterSpec{ + ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ + Self: documentdb.Name, + Primary: documentdb.Name, + Source: documentdb.Name, + }, + ExternalClusters: []cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + }, + PostgresConfiguration: cnpgv1.PostgresConfiguration{ + Synchronous: &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 1, + }, + }, + }, + } + + desired := current.DeepCopy() + desired.Spec.ExternalClusters = []cnpgv1.ExternalCluster{ + {Name: documentdb.Name}, + {Name: "member-2"}, + {Name: "member-3"}, + } + desired.Spec.PostgresConfiguration.Synchronous = &cnpgv1.SynchronousReplicaConfiguration{ + Method: cnpgv1.SynchronousReplicaConfigurationMethodAny, + Number: 2, + } + + reconciler := buildDocumentDBReconciler(current) + replicationContext, err := util.GetReplicationContext(ctx, reconciler.Client, *documentdb) + Expect(err).ToNot(HaveOccurred()) + + err, requeue := reconciler.TryUpdateCluster(ctx, current, desired, documentdb, replicationContext) + Expect(err).ToNot(HaveOccurred()) + Expect(requeue).To(Equal(time.Duration(-1))) + + updated := &cnpgv1.Cluster{} + Expect(reconciler.Client.Get(ctx, types.NamespacedName{Name: current.Name, Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Spec.ExternalClusters).To(HaveLen(3)) + Expect(updated.Spec.PostgresConfiguration.Synchronous.Number).To(Equal(2)) + }) +}) diff --git a/operator/src/internal/utils/constants.go b/operator/src/internal/utils/constants.go index 5481cb9b..92873d7a 100644 --- a/operator/src/internal/utils/constants.go +++ b/operator/src/internal/utils/constants.go @@ -24,6 +24,9 @@ const ( LABEL_NODE_INDEX = "node_index" LABEL_SERVICE_TYPE = "service_type" LABEL_REPLICATION_CLUSTER_TYPE = "replication_cluster_type" + LABEL_DOCUMENTDB_NAME = "documentdb.io/name" + LABEL_DOCUMENTDB_COMPONENT = "documentdb.io/component" + FLEET_IN_USE_BY_ANNOTATION = "networking.fleet.azure.com/service-in-use-by" DOCUMENTDB_SERVICE_PREFIX = "documentdb-service-" @@ -42,6 +45,9 @@ const ( JSON_PATCH_PATH_INSTANCES = "/spec/instances" JSON_PATCH_PATH_PLUGINS = "/spec/plugins" JSON_PATCH_PATH_REPLICATION_SLOTS = "/spec/replicationSlots" + JSON_PATCH_PATH_EXTERNAL_CLUSTERS = "/spec/externalClusters" + JSON_PATCH_PATH_MANAGED_SERVICES = "/spec/managed/services/additional" + JSON_PATCH_PATH_SYNCHRONOUS = "/spec/postgresql/synchronous" // JSON Patch operations JSON_PATCH_OP_REPLACE = "replace" diff --git a/operator/src/internal/utils/replication_context.go b/operator/src/internal/utils/replication_context.go index 66c3ff47..a3383eda 100644 --- a/operator/src/internal/utils/replication_context.go +++ b/operator/src/internal/utils/replication_context.go @@ -15,12 +15,14 @@ import ( ) type ReplicationContext struct { - Self string - Others []string - PrimaryCluster string + CNPGClusterName string + OtherCNPGClusterNames []string + PrimaryCNPGClusterName string CrossCloudNetworkingStrategy crossCloudNetworkingStrategy Environment string StorageClass string + FleetMemberName string + OtherFleetMemberNames []string currentLocalPrimary string targetLocalPrimary string state replicationState @@ -40,6 +42,7 @@ const ( NoReplication replicationState = iota Primary 
Replica + NotPresent ) func GetReplicationContext(ctx context.Context, client client.Client, documentdb dbpreview.DocumentDB) (*ReplicationContext, error) { @@ -48,7 +51,7 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb CrossCloudNetworkingStrategy: None, Environment: documentdb.Spec.Environment, StorageClass: documentdb.Spec.Resource.Storage.StorageClass, - Self: documentdb.Name, + CNPGClusterName: documentdb.Name, } if documentdb.Spec.ClusterReplication == nil { return &singleClusterReplicationContext, nil @@ -59,6 +62,19 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb return nil, err } + // If self is nil, then this cluster is not part of the replication setup + // This edge case can happen when the Hub cluster is also a member, and we are not + // putting the documentdb instance on it + if self == nil { + return &ReplicationContext{ + state: NotPresent, + CrossCloudNetworkingStrategy: None, + Environment: "", + StorageClass: "", + CNPGClusterName: "", + }, nil + } + // If no remote clusters, then just proceed with a regular cluster if len(others) == 0 { return &singleClusterReplicationContext, nil @@ -66,6 +82,11 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb primaryCluster := generateCNPGClusterName(documentdb.Name, documentdb.Spec.ClusterReplication.Primary) + otherCNPGClusterNames := make([]string, len(others)) + for i, other := range others { + otherCNPGClusterNames[i] = generateCNPGClusterName(documentdb.Name, other) + } + storageClass := documentdb.Spec.Resource.Storage.StorageClass if self.StorageClassOverride != "" { storageClass = self.StorageClassOverride @@ -76,13 +97,15 @@ func GetReplicationContext(ctx context.Context, client client.Client, documentdb } return &ReplicationContext{ - Self: self.Name, - Others: others, + CNPGClusterName: generateCNPGClusterName(documentdb.Name, self.Name), + OtherCNPGClusterNames: otherCNPGClusterNames, CrossCloudNetworkingStrategy: crossCloudNetworkingStrategy(documentdb.Spec.ClusterReplication.CrossCloudNetworkingStrategy), - PrimaryCluster: primaryCluster, + PrimaryCNPGClusterName: primaryCluster, Environment: environment, StorageClass: storageClass, state: replicationState, + FleetMemberName: self.Name, + OtherFleetMemberNames: others, targetLocalPrimary: documentdb.Status.TargetPrimary, currentLocalPrimary: documentdb.Status.LocalPrimary, }, nil @@ -98,10 +121,12 @@ func (r ReplicationContext) String() string { stateStr = "Primary" case Replica: stateStr = "Replica" + case NotPresent: + stateStr = "NotPresent" } - return fmt.Sprintf("ReplicationContext{Self: %s, State: %s, Others: %v, PrimaryRegion: %s, CurrentLocalPrimary: %s, TargetLocalPrimary: %s}", - r.Self, stateStr, r.Others, r.PrimaryCluster, r.currentLocalPrimary, r.targetLocalPrimary) + return fmt.Sprintf("ReplicationContext{CNPGClusterName: %s, State: %s, Others: %v, PrimaryRegion: %s, CurrentLocalPrimary: %s, TargetLocalPrimary: %s}", + r.CNPGClusterName, stateStr, r.OtherCNPGClusterNames, r.PrimaryCNPGClusterName, r.currentLocalPrimary, r.targetLocalPrimary) } // Returns true if this instance is the primary or if there is no replication configured. 
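// [Editor's sketch] The new NotPresent state covers the hub-that-is-also-a-member case where the
// DocumentDB is not scheduled onto this cluster. The controller-side handling is outside this hunk,
// so the following only illustrates how the state is presumably consumed, together with
// util.DeleteOwnedResources (added in util.go below) and the "deletes owned resources when
// DocumentDB is not present" spec in physical_replication_test.go; handleNotPresent is an
// illustrative name, not part of the change.
func (r *DocumentDBReconciler) handleNotPresent(ctx context.Context, documentdb *dbpreview.DocumentDB, replicationContext *util.ReplicationContext) (bool, error) {
    if !replicationContext.IsNotPresent() {
        return false, nil
    }
    // This cluster should not host the instance: remove anything previously created for it
    // (Services, CNPG Clusters, fleet resources) and stop reconciling.
    if err := util.DeleteOwnedResources(ctx, r.Client, documentdb.ObjectMeta); err != nil {
        return true, err
    }
    return true, nil
}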
@@ -113,12 +138,16 @@ func (r *ReplicationContext) IsReplicating() bool { return r.state == Replica || r.state == Primary } +func (r *ReplicationContext) IsNotPresent() bool { + return r.state == NotPresent +} + // Gets the primary if you're a replica, otherwise returns the first other cluster func (r ReplicationContext) GetReplicationSource() string { if r.state == Replica { - return r.PrimaryCluster + return r.PrimaryCNPGClusterName } - return r.Others[0] + return r.OtherCNPGClusterNames[0] } // EndpointEnabled returns true if the endpoint should be enabled for this DocumentDB instance. @@ -133,10 +162,10 @@ func (r ReplicationContext) EndpointEnabled() bool { func (r ReplicationContext) GenerateExternalClusterServices(name, namespace string, fleetEnabled bool) func(yield func(string, string) bool) { return func(yield func(string, string) bool) { - for _, other := range r.Others { + for _, other := range r.OtherCNPGClusterNames { serviceName := other + "-rw." + namespace + ".svc" if fleetEnabled { - serviceName = namespace + "-" + generateServiceName(name, other, r.Self, namespace) + ".fleet-system.svc" + serviceName = namespace + "-" + generateServiceName(name, other, r.CNPGClusterName, namespace) + ".fleet-system.svc" } if !yield(other, serviceName) { @@ -149,8 +178,8 @@ func (r ReplicationContext) GenerateExternalClusterServices(name, namespace stri // Create an iterator that yields outgoing service names, for use in a for each loop func (r ReplicationContext) GenerateIncomingServiceNames(name, resourceGroup string) func(yield func(string) bool) { return func(yield func(string) bool) { - for _, other := range r.Others { - serviceName := generateServiceName(name, other, r.Self, resourceGroup) + for _, other := range r.OtherCNPGClusterNames { + serviceName := generateServiceName(name, other, r.CNPGClusterName, resourceGroup) if !yield(serviceName) { break } @@ -161,8 +190,8 @@ func (r ReplicationContext) GenerateIncomingServiceNames(name, resourceGroup str // Create an iterator that yields outgoing service names, for use in a for each loop func (r ReplicationContext) GenerateOutgoingServiceNames(name, resourceGroup string) func(yield func(string) bool) { return func(yield func(string) bool) { - for _, other := range r.Others { - serviceName := generateServiceName(name, r.Self, other, resourceGroup) + for _, other := range r.OtherCNPGClusterNames { + serviceName := generateServiceName(name, r.CNPGClusterName, other, resourceGroup) if !yield(serviceName) { break } @@ -170,11 +199,24 @@ func (r ReplicationContext) GenerateOutgoingServiceNames(name, resourceGroup str } } +func (r ReplicationContext) GenerateFleetMemberNames() func(yield func(string) bool) { + return func(yield func(string) bool) { + for _, other := range r.OtherFleetMemberNames { + if !yield(other) { + return + } + } + if !yield(r.FleetMemberName) { + return + } + } +} + // Creates the standby names list, which will be all other clusters in addition to "pg_receivewal" func (r *ReplicationContext) CreateStandbyNamesList() []string { - standbyNames := make([]string, len(r.Others)+1) - copy(standbyNames, r.Others) - /* TODO re-enable when we have a WAL replica image + standbyNames := make([]string, len(r.OtherCNPGClusterNames)) + copy(standbyNames, r.OtherCNPGClusterNames) + /* TODO re-enable when we have a WAL replica image (also add one to length) standbyNames[len(r.Others)] = "pg_receivewal" */ return standbyNames @@ -185,7 +227,7 @@ func getTopology(ctx context.Context, client client.Client, documentdb dbpreview var err 
error if documentdb.Spec.ClusterReplication.CrossCloudNetworkingStrategy != string(None) { - memberClusterName, err = GetSelfName(ctx, client) + memberClusterName, err = GetFleetMemberName(ctx, client) if err != nil { return nil, nil, NoReplication, err } @@ -197,19 +239,18 @@ func getTopology(ctx context.Context, client client.Client, documentdb dbpreview } others := []string{} - var self dbpreview.MemberCluster + var self *dbpreview.MemberCluster for _, c := range documentdb.Spec.ClusterReplication.ClusterList { if c.Name != memberClusterName { - others = append(others, generateCNPGClusterName(documentdb.Name, c.Name)) + others = append(others, c.Name) } else { - self = c + self = c.DeepCopy() } } - self.Name = generateCNPGClusterName(documentdb.Name, self.Name) - return &self, others, state, nil + return self, others, state, nil } -func GetSelfName(ctx context.Context, client client.Client) (string, error) { +func GetFleetMemberName(ctx context.Context, client client.Client) (string, error) { clusterMapName := "cluster-name" clusterNameConfigMap := &corev1.ConfigMap{} err := client.Get(ctx, types.NamespacedName{Name: clusterMapName, Namespace: "kube-system"}, clusterNameConfigMap) @@ -217,11 +258,11 @@ func GetSelfName(ctx context.Context, client client.Client) (string, error) { return "", err } - self := clusterNameConfigMap.Data["name"] - if self == "" { + memberName := clusterNameConfigMap.Data["name"] + if memberName == "" { return "", fmt.Errorf("name key not found in kube-system:cluster-name configmap") } - return self, nil + return memberName, nil } func (r *ReplicationContext) IsAzureFleetNetworking() bool { diff --git a/operator/src/internal/utils/util.go b/operator/src/internal/utils/util.go index fe71412c..89fd34ba 100644 --- a/operator/src/internal/utils/util.go +++ b/operator/src/internal/utils/util.go @@ -10,11 +10,16 @@ import ( "strconv" "time" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + fleetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -313,6 +318,100 @@ func DeleteRoleBinding(ctx context.Context, c client.Client, name, namespace str return nil } +func DeleteOwnedResources(ctx context.Context, c client.Client, owner metav1.ObjectMeta) error { + log := log.FromContext(ctx) + + hasOwnerReference := func(refs []metav1.OwnerReference) bool { + for _, ref := range refs { + if ref.UID == owner.UID && ref.Name == owner.Name { + return true + } + } + return false + } + + listInNamespace := client.InNamespace(owner.Namespace) + var errList []error + + var serviceList corev1.ServiceList + if err := c.List(ctx, &serviceList, listInNamespace); err != nil { + return fmt.Errorf("failed to list services: %w", err) + } + for i := range serviceList.Items { + svc := &serviceList.Items[i] + if hasOwnerReference(svc.OwnerReferences) { + if err := c.Delete(ctx, svc); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned Service", "name", svc.Name, "namespace", svc.Namespace) + errList = append(errList, fmt.Errorf("service %s/%s: %w", svc.Namespace, svc.Name, err)) + } + } + } + + var clusterList cnpgv1.ClusterList + if err := 
c.List(ctx, &clusterList, listInNamespace); err != nil { + return fmt.Errorf("failed to list CNPG clusters: %w", err) + } + for i := range clusterList.Items { + cluster := &clusterList.Items[i] + if hasOwnerReference(cluster.OwnerReferences) { + if err := c.Delete(ctx, cluster); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned CNPG Cluster", "name", cluster.Name, "namespace", cluster.Namespace) + errList = append(errList, fmt.Errorf("cnpg cluster %s/%s: %w", cluster.Namespace, cluster.Name, err)) + } + } + } + + var mcsList fleetv1alpha1.MultiClusterServiceList + if err := c.List(ctx, &mcsList, listInNamespace); err != nil && !errors.IsNotFound(err) { + // Ignore if CRD doesn't exist + if !isCRDMissing(err) { + return fmt.Errorf("failed to list MultiClusterServices: %w", err) + } + } else { + for i := range mcsList.Items { + mcs := &mcsList.Items[i] + if hasOwnerReference(mcs.OwnerReferences) { + if err := c.Delete(ctx, mcs); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned MultiClusterService", "name", mcs.Name, "namespace", mcs.Namespace) + errList = append(errList, fmt.Errorf("multiclusterservice %s/%s: %w", mcs.Namespace, mcs.Name, err)) + } + } + } + } + + var serviceExportList fleetv1alpha1.ServiceExportList + if err := c.List(ctx, &serviceExportList, listInNamespace); err != nil && !errors.IsNotFound(err) { + // Ignore if CRD doesn't exist + if !isCRDMissing(err) { + return fmt.Errorf("failed to list ServiceExports: %w", err) + } + } else { + for i := range serviceExportList.Items { + se := &serviceExportList.Items[i] + if hasOwnerReference(se.OwnerReferences) { + if err := c.Delete(ctx, se); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete owned ServiceExport", "name", se.Name, "namespace", se.Namespace) + errList = append(errList, fmt.Errorf("serviceexport %s/%s: %w", se.Namespace, se.Name, err)) + } + } + } + } + + if len(errList) > 0 { + return utilerrors.NewAggregate(errList) + } + return nil +} + +// isCRDMissing checks if the error is a "no kind match" error, which occurs when +// a CRD is not installed in the cluster +func isCRDMissing(err error) bool { + if err == nil { + return false + } + return meta.IsNoMatchError(err) || runtime.IsNotRegisteredError(err) +} + // GenerateConnectionString returns a MongoDB connection string for the DocumentDB instance. // When trustTLS is true, tlsAllowInvalidCertificates is omitted for strict verification. func GenerateConnectionString(documentdb *dbpreview.DocumentDB, serviceIp string, trustTLS bool) string { diff --git a/operator/src/internal/utils/util_test.go b/operator/src/internal/utils/util_test.go index 5fcb1fec..96210715 100644 --- a/operator/src/internal/utils/util_test.go +++ b/operator/src/internal/utils/util_test.go @@ -335,9 +335,9 @@ func TestGetDocumentDBServiceDefinition_CNPGLabels(t *testing.T) { // Create a mock ReplicationContext replicationContext := &ReplicationContext{ - Self: tt.documentDBName, - Environment: "test", - state: NoReplication, // This will make EndpointEnabled() return true + CNPGClusterName: tt.documentDBName, + Environment: "test", + state: NoReplication, // This will make EndpointEnabled() return true } // If endpoint should be disabled, set a different state
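// [Editor's sketch] The new externalClusterNamesChanged helper in physical_replication.go is an
// order-insensitive comparison of cluster names. A plain table-driven test for it, sketched here as
// a suggestion (it is not part of this change; it would live in the controller package, which
// already imports cnpgv1, and uses the standard testing package alongside the Ginkgo suite):
func TestExternalClusterNamesChanged_Sketch(t *testing.T) {
    mk := func(names ...string) []cnpgv1.ExternalCluster {
        out := make([]cnpgv1.ExternalCluster, 0, len(names))
        for _, n := range names {
            out = append(out, cnpgv1.ExternalCluster{Name: n})
        }
        return out
    }

    cases := []struct {
        name     string
        current  []cnpgv1.ExternalCluster
        desired  []cnpgv1.ExternalCluster
        expected bool
    }{
        {"same order", mk("a", "b"), mk("a", "b"), false},
        {"different order", mk("a", "b"), mk("b", "a"), false},
        {"member added", mk("a", "b"), mk("a", "b", "c"), true},
        {"member swapped", mk("a", "b"), mk("a", "c"), true},
        {"both empty", nil, nil, false},
    }

    for _, tc := range cases {
        if got := externalClusterNamesChanged(tc.current, tc.desired); got != tc.expected {
            t.Errorf("%s: expected changed=%v, got %v", tc.name, tc.expected, got)
        }
    }
}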