From a91a20bdfd7352179139009f45c1d6ba9defe5d3 Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Mon, 5 Jan 2026 16:58:50 +0500 Subject: [PATCH 1/2] Fix Z0DAN sync origin advancement to use slot creation LSN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When adding a third node in a Z0DAN sync scenario, the origin advancement logic in spock.check_commit_timestamp_and_advance_slot was using spock.lag_tracker to retrieve commit timestamps and convert them back to LSNs. This approach no longer works because spock.progress is now an in-memory HTAB instead of a catalog table, so lag_tracker doesn't retain historical data after the SYNC process COPY operation. Root Cause: The procedure spock.create_disable_subscriptions_and_slots creates logical slots on existing nodes (e.g., n2) when adding a new node (n3). In v5, the commit LSN/timestamp from the source node (n1) was copied to n3 via lag_tracker during SYNC, and spock.check_commit_timestamp_and_advance_slot would use this to advance the origin. With the HTAB-based progress table, this data is no longer available after COPY. The Fix: 1. Capture the LSN returned by pg_create_logical_replication_slot in create_disable_subscriptions_and_slots and store it in temp_sync_lsns 2. Use this LSN directly in check_commit_timestamp_and_advance_slot to advance the origin, eliminating the dependency on lag_tracker and the timestamp→LSN conversion This approach is more reliable because it uses the authoritative LSN from slot creation - the exact point where the apply/sync process will begin decoding when the subscription is enabled. --- samples/Z0DAN/zodan.sql | 55 ++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index 93439c30..ef8b0fbc 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -1195,6 +1195,7 @@ DECLARE dbname text; slot_name text; sub_name text; + _commit_lsn pg_lsn; BEGIN RAISE NOTICE 'Phase 3: Creating disabled subscriptions and slots'; @@ -1204,7 +1205,8 @@ BEGIN -- Create temporary table to store sync LSNs CREATE TEMP TABLE IF NOT EXISTS temp_sync_lsns ( origin_node text PRIMARY KEY, - sync_lsn text NOT NULL + sync_lsn text NOT NULL, + commit_lsn pg_lsn ); -- Check if there are any "other" nodes (not source, not new) @@ -1276,8 +1278,11 @@ BEGIN RAISE NOTICE ' Remote SQL for slot creation: %', remotesql; END IF; - PERFORM * FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn); - RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name, 120, ' '); + SELECT lsn INTO _commit_lsn + FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn); + UPDATE temp_sync_lsns SET commit_lsn = _commit_lsn + WHERE origin_node = rec.node_name; + RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' (LSN: ' || _commit_lsn || ')' || ' on node ' || rec.node_name, 120, ' '); EXCEPTION WHEN OTHERS THEN RAISE NOTICE ' ✗ %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); @@ -1662,7 +1667,7 @@ CREATE OR REPLACE PROCEDURE spock.check_commit_timestamp_and_advance_slot( ) LANGUAGE plpgsql AS $$ DECLARE rec RECORD; - start_point record; + commit_lsn pg_lsn; slot_name text; dbname text; remotesql text; @@ -1677,27 +1682,23 @@ BEGIN -- Multi-node scenario: check commit timestamp for "other" nodes to new node FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP - -- Check commit timestamp for lag from "other" node to new node BEGIN - remotesql := format('SELECT commit_lsn, commit_timestamp - FROM spock.lag_tracker - WHERE origin_name = %L AND receiver_name = %L', - rec.node_name, new_node_name); - IF verb THEN - RAISE NOTICE ' Remote SQL for commit timestamp check: %', remotesql; - END IF; - - SELECT * FROM dblink(new_node_dsn, remotesql) AS t(lsn pg_lsn, ts timestamp) INTO start_point; - - IF start_point.ts IS NOT NULL THEN - RAISE NOTICE ' OK: %', rpad('Found commit timestamp for ' || rec.node_name || '->' || new_node_name || ': ' || start_point.ts, 120, ' '); - ELSE - RAISE NOTICE ' - %', rpad('No commit timestamp found for ' || rec.node_name || '->' || new_node_name, 120, ' '); - CONTINUE; + IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'temp_sync_lsns' AND relpersistence = 't') THEN + -- Get the stored sync LSN from when subscription was created + SELECT tsl.commit_lsn INTO commit_lsn + FROM temp_sync_lsns tsl + WHERE tsl.origin_node = rec.node_name; + + IF commit_lsn IS NOT NULL THEN + RAISE NOTICE ' OK: %', rpad('Found commit LSN for ' || rec.node_name || ' (LSN: ' || commit_lsn || ')...', 120, ' '); + ELSE + RAISE NOTICE ' - %', rpad('No commit LSN found for ' || rec.node_name || '->' || new_node_name, 120, ' '); + CONTINUE; + END IF; END IF; EXCEPTION WHEN OTHERS THEN - RAISE NOTICE ' ✗ %', rpad('Checking commit timestamp for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' '); + RAISE NOTICE ' ✗ %', rpad('Checking commit LSN for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' '); CONTINUE; END; @@ -1727,6 +1728,7 @@ BEGIN DECLARE current_lsn pg_lsn; + target_lsn pg_lsn; BEGIN SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO current_lsn; @@ -1735,23 +1737,24 @@ BEGIN CONTINUE; END IF; - IF start_point.lsn IS NULL OR start_point.lsn <= current_lsn THEN - RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, start_point.lsn; + target_lsn := commit_lsn; + IF target_lsn IS NULL OR target_lsn <= current_lsn THEN + RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, target_lsn; CONTINUE; END IF; -- Advance the slot - remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, start_point.lsn); + remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, target_lsn); IF verb THEN RAISE NOTICE ' Remote SQL for slot advancement: %', remotesql; END IF; PERFORM * FROM dblink(rec.dsn, remotesql) AS t(result text); - RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || start_point.lsn, 120, ' '); + RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || target_lsn, 120, ' '); END; EXCEPTION WHEN OTHERS THEN - RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to timestamp ' || start_point.ts || ' (error: ' || SQLERRM || ')', 120, ' '); + RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to LSN ' || commit_lsn || ' (error: ' || SQLERRM || ')', 120, ' '); -- Continue with other nodes even if this one fails END; END LOOP; From 49d81343d63dd58ddbfb59b4b6feb8350f0e4eae Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Wed, 7 Jan 2026 15:04:35 +0500 Subject: [PATCH 2/2] Port Z0DAN sync LSN fix to Python script Apply the same fix from commit 86acd7b to zodan.py: - Use LSN from pg_create_logical_replication_slot() - Advance slot to stored commit LSN instead of querying lag_tracker - Store both sync_lsn and commit_lsn for later use --- samples/Z0DAN/zodan.py | 93 ++++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/samples/Z0DAN/zodan.py b/samples/Z0DAN/zodan.py index 40605aab..f6ab72e2 100755 --- a/samples/Z0DAN/zodan.py +++ b/samples/Z0DAN/zodan.py @@ -399,8 +399,8 @@ def create_sub(self, node_dsn: str, subscription_name: str, provider_dsn: str, if self.verbose: self.info(f"Subscription {subscription_name} created remotely") - def create_replication_slot(self, node_dsn: str, slot_name: str, plugin: str = "spock_output"): - """Create a logical replication slot on a remote node""" + def create_replication_slot(self, node_dsn: str, slot_name: str, plugin: str = "spock_output") -> Optional[str]: + """Create a logical replication slot on a remote node and return the LSN""" # Check if slot already exists sql = f"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '{slot_name}';" count = self.run_psql(node_dsn, sql, fetch=True, return_single=True) @@ -408,20 +408,24 @@ def create_replication_slot(self, node_dsn: str, slot_name: str, plugin: str = " if count and int(count.strip()) > 0: if self.verbose: self.info(f"Replication slot '{slot_name}' already exists. Skipping creation.") - return - - sql = f"SELECT slot_name, lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');" - + return None + + sql = f"SELECT lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');" + if self.verbose: self.info(f"[QUERY] {sql}") - - result = self.run_psql(node_dsn, sql) + + # Fetch the result to get the LSN + result = self.run_psql(node_dsn, sql, fetch=True, return_single=True) if result is None: if self.verbose: self.info(f"Replication slot '{slot_name}' may already exist or creation failed.") + return None else: + lsn = result.strip() if self.verbose: - self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node.") + self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node (LSN: {lsn}).") + return lsn def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: str, new_node_name: str, new_node_dsn: str): @@ -438,7 +442,7 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st if rec['node_name'] == src_node_name: continue - # Create replication slot + # Create replication slot and capture the commit LSN dbname = "pgedge" # Default database name if "dbname=" in rec['dsn']: dbname = rec['dsn'].split("dbname=")[1].split()[0] @@ -447,12 +451,17 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st if len(slot_name) > 64: slot_name = slot_name[:64] - self.create_replication_slot(rec['dsn'], slot_name) - self.notice(f" OK: Creating replication slot {slot_name} on node {rec['node_name']}") + commit_lsn = self.create_replication_slot(rec['dsn'], slot_name) + self.notice(f" OK: Creating replication slot {slot_name} (LSN: {commit_lsn}) on node {rec['node_name']}") # Trigger sync event on origin node and store LSN for later use sync_lsn = self.sync_event(rec['dsn']) - self.sync_lsns[rec['node_name']] = sync_lsn + + # Store both sync_lsn and commit_lsn + self.sync_lsns[rec['node_name']] = { + 'sync_lsn': sync_lsn, + 'commit_lsn': commit_lsn + } self.notice(f" OK: Triggering sync event on node {rec['node_name']} (LSN: {sync_lsn})") # Create disabled subscription @@ -576,42 +585,38 @@ def get_commit_timestamp(self, node_dsn: str, origin: str, receiver: str) -> str result = self.run_psql(node_dsn, sql, fetch=True, return_single=True) return result - def advance_replication_slot(self, node_dsn: str, slot_name: str, sync_timestamp: str): - """Advance a replication slot to a specific timestamp""" - if not sync_timestamp: + def advance_replication_slot(self, node_dsn: str, slot_name: str, target_lsn: str): + """Advance a replication slot to a specific LSN""" + if not target_lsn: if self.verbose: - self.info(f"Commit timestamp is NULL, skipping slot advance for slot '{slot_name}'.") + self.info(f"Target LSN is NULL, skipping slot advance for slot '{slot_name}'.") return - - sql = f""" - WITH lsn_cte AS ( - SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}') AS lsn - ) - SELECT pg_replication_slot_advance('{slot_name}', lsn) FROM lsn_cte; - """ - + + sql = f"SELECT pg_replication_slot_advance('{slot_name}', '{target_lsn}'::pg_lsn);" + if self.verbose: self.info(f"[QUERY] {sql}") self.run_psql(node_dsn, sql) - def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str, + def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str, new_node_name: str, new_node_dsn: str): - """Phase 7: Check commit timestamp and advance replication slot""" - self.notice("Phase 7: Checking commit timestamp and advancing replication slot") - + """Phase 7: Check commit LSN and advance replication slot""" + self.notice("Phase 7: Checking commit LSN and advancing replication slot") + # Get all nodes from source cluster nodes = self.get_spock_nodes(src_dsn) for rec in nodes: if rec['node_name'] == src_node_name: continue - - # Get commit timestamp - sync_timestamp = self.get_commit_timestamp(new_node_dsn, src_node_name, rec['node_name']) - if sync_timestamp: - self.notice(f" OK: Found commit timestamp for {src_node_name}->{rec['node_name']}: {sync_timestamp}") - + + # Get the stored commit LSN from when subscription was created + commit_lsn = self.sync_lsns[rec['node_name']]['commit_lsn'] + + if commit_lsn: + self.notice(f" OK: Found commit LSN for {rec['node_name']} (LSN: {commit_lsn})...") + # Advance replication slot dbname = "pgedge" if "dbname=" in rec['dsn']: @@ -625,18 +630,15 @@ def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: s self.info(f"[QUERY] {sql}") current_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True) - - # Get target LSN - sql = f"SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}')" - if self.verbose: - self.info(f"[QUERY] {sql}") - - target_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True) - + + target_lsn = commit_lsn + if current_lsn and target_lsn and current_lsn >= target_lsn: self.notice(f" - Slot {slot_name} already at or beyond target LSN (current: {current_lsn}, target: {target_lsn})") else: - self.advance_replication_slot(rec['dsn'], slot_name, sync_timestamp) + self.advance_replication_slot(rec['dsn'], slot_name, target_lsn) + else: + self.notice(f" - No commit LSN found for {rec['node_name']}->{new_node_name}") def enable_sub(self, node_dsn: str, sub_name: str, immediate: bool = True): """Enable a subscription on a remote node""" @@ -747,7 +749,8 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str, # Wait for the sync event that was captured when subscription was created # This ensures the subscription starts replicating from the correct sync point timeout_ms = 1200 # 20 minutes - sync_lsn = self.sync_lsns.get(rec['node_name']) # Use stored sync LSN from Phase 3 + sync_lsn = self.sync_lsns[rec['node_name']]['sync_lsn'] # Use stored sync LSN from Phase 3 + if sync_lsn: self.notice(f" OK: Using stored sync event from origin node {rec['node_name']} (LSN: {sync_lsn})...")