Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions samples/Z0DAN/zodan.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,29 +399,33 @@ def create_sub(self, node_dsn: str, subscription_name: str, provider_dsn: str,
if self.verbose:
self.info(f"Subscription {subscription_name} created remotely")

def create_replication_slot(self, node_dsn: str, slot_name: str, plugin: str = "spock_output"):
"""Create a logical replication slot on a remote node"""
def create_replication_slot(self, node_dsn: str, slot_name: str, plugin: str = "spock_output") -> Optional[str]:
"""Create a logical replication slot on a remote node and return the LSN"""
# Check if slot already exists
sql = f"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '{slot_name}';"
count = self.run_psql(node_dsn, sql, fetch=True, return_single=True)

if count and int(count.strip()) > 0:
if self.verbose:
self.info(f"Replication slot '{slot_name}' already exists. Skipping creation.")
return
sql = f"SELECT slot_name, lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');"
return None

sql = f"SELECT lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');"

if self.verbose:
self.info(f"[QUERY] {sql}")

result = self.run_psql(node_dsn, sql)

# Fetch the result to get the LSN
result = self.run_psql(node_dsn, sql, fetch=True, return_single=True)
if result is None:
if self.verbose:
self.info(f"Replication slot '{slot_name}' may already exist or creation failed.")
return None
else:
lsn = result.strip()
if self.verbose:
self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node.")
self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node (LSN: {lsn}).")
return lsn

def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: str,
new_node_name: str, new_node_dsn: str):
Expand All @@ -438,7 +442,7 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st
if rec['node_name'] == src_node_name:
continue

# Create replication slot
# Create replication slot and capture the commit LSN
dbname = "pgedge" # Default database name
if "dbname=" in rec['dsn']:
dbname = rec['dsn'].split("dbname=")[1].split()[0]
Expand All @@ -447,12 +451,17 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st
if len(slot_name) > 64:
slot_name = slot_name[:64]

self.create_replication_slot(rec['dsn'], slot_name)
self.notice(f" OK: Creating replication slot {slot_name} on node {rec['node_name']}")
commit_lsn = self.create_replication_slot(rec['dsn'], slot_name)
self.notice(f" OK: Creating replication slot {slot_name} (LSN: {commit_lsn}) on node {rec['node_name']}")

# Trigger sync event on origin node and store LSN for later use
sync_lsn = self.sync_event(rec['dsn'])
self.sync_lsns[rec['node_name']] = sync_lsn

# Store both sync_lsn and commit_lsn
self.sync_lsns[rec['node_name']] = {
'sync_lsn': sync_lsn,
'commit_lsn': commit_lsn
}
self.notice(f" OK: Triggering sync event on node {rec['node_name']} (LSN: {sync_lsn})")

# Create disabled subscription
Expand Down Expand Up @@ -576,42 +585,38 @@ def get_commit_timestamp(self, node_dsn: str, origin: str, receiver: str) -> str
result = self.run_psql(node_dsn, sql, fetch=True, return_single=True)
return result

def advance_replication_slot(self, node_dsn: str, slot_name: str, sync_timestamp: str):
"""Advance a replication slot to a specific timestamp"""
if not sync_timestamp:
def advance_replication_slot(self, node_dsn: str, slot_name: str, target_lsn: str):
"""Advance a replication slot to a specific LSN"""
if not target_lsn:
if self.verbose:
self.info(f"Commit timestamp is NULL, skipping slot advance for slot '{slot_name}'.")
self.info(f"Target LSN is NULL, skipping slot advance for slot '{slot_name}'.")
return

sql = f"""
WITH lsn_cte AS (
SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}') AS lsn
)
SELECT pg_replication_slot_advance('{slot_name}', lsn) FROM lsn_cte;
"""


sql = f"SELECT pg_replication_slot_advance('{slot_name}', '{target_lsn}'::pg_lsn);"

if self.verbose:
self.info(f"[QUERY] {sql}")

self.run_psql(node_dsn, sql)

def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str,
def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str,
new_node_name: str, new_node_dsn: str):
"""Phase 7: Check commit timestamp and advance replication slot"""
self.notice("Phase 7: Checking commit timestamp and advancing replication slot")
"""Phase 7: Check commit LSN and advance replication slot"""
self.notice("Phase 7: Checking commit LSN and advancing replication slot")

# Get all nodes from source cluster
nodes = self.get_spock_nodes(src_dsn)

for rec in nodes:
if rec['node_name'] == src_node_name:
continue

# Get commit timestamp
sync_timestamp = self.get_commit_timestamp(new_node_dsn, src_node_name, rec['node_name'])
if sync_timestamp:
self.notice(f" OK: Found commit timestamp for {src_node_name}->{rec['node_name']}: {sync_timestamp}")


# Get the stored commit LSN from when subscription was created
commit_lsn = self.sync_lsns[rec['node_name']]['commit_lsn']

if commit_lsn:
self.notice(f" OK: Found commit LSN for {rec['node_name']} (LSN: {commit_lsn})...")

# Advance replication slot
dbname = "pgedge"
if "dbname=" in rec['dsn']:
Expand All @@ -625,18 +630,15 @@ def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: s
self.info(f"[QUERY] {sql}")

current_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True)

# Get target LSN
sql = f"SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}')"
if self.verbose:
self.info(f"[QUERY] {sql}")

target_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True)


target_lsn = commit_lsn

if current_lsn and target_lsn and current_lsn >= target_lsn:
self.notice(f" - Slot {slot_name} already at or beyond target LSN (current: {current_lsn}, target: {target_lsn})")
else:
self.advance_replication_slot(rec['dsn'], slot_name, sync_timestamp)
self.advance_replication_slot(rec['dsn'], slot_name, target_lsn)
else:
self.notice(f" - No commit LSN found for {rec['node_name']}->{new_node_name}")

def enable_sub(self, node_dsn: str, sub_name: str, immediate: bool = True):
"""Enable a subscription on a remote node"""
Expand Down Expand Up @@ -747,7 +749,8 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,
# Wait for the sync event that was captured when subscription was created
# This ensures the subscription starts replicating from the correct sync point
timeout_ms = 1200 # 20 minutes
sync_lsn = self.sync_lsns.get(rec['node_name']) # Use stored sync LSN from Phase 3
sync_lsn = self.sync_lsns[rec['node_name']]['sync_lsn'] # Use stored sync LSN from Phase 3

if sync_lsn:
self.notice(f" OK: Using stored sync event from origin node {rec['node_name']} (LSN: {sync_lsn})...")

Expand Down
55 changes: 29 additions & 26 deletions samples/Z0DAN/zodan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,7 @@ DECLARE
dbname text;
slot_name text;
sub_name text;
_commit_lsn pg_lsn;
BEGIN
RAISE NOTICE 'Phase 3: Creating disabled subscriptions and slots';

Expand All @@ -1204,7 +1205,8 @@ BEGIN
-- Create temporary table to store sync LSNs
CREATE TEMP TABLE IF NOT EXISTS temp_sync_lsns (
origin_node text PRIMARY KEY,
sync_lsn text NOT NULL
sync_lsn text NOT NULL,
commit_lsn pg_lsn
);

-- Check if there are any "other" nodes (not source, not new)
Expand Down Expand Up @@ -1276,8 +1278,11 @@ BEGIN
RAISE NOTICE ' Remote SQL for slot creation: %', remotesql;
END IF;

PERFORM * FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn);
RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name, 120, ' ');
SELECT lsn INTO _commit_lsn
FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn);
UPDATE temp_sync_lsns SET commit_lsn = _commit_lsn
WHERE origin_node = rec.node_name;
RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' (LSN: ' || _commit_lsn || ')' || ' on node ' || rec.node_name, 120, ' ');
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE ' ✗ %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
Expand Down Expand Up @@ -1662,7 +1667,7 @@ CREATE OR REPLACE PROCEDURE spock.check_commit_timestamp_and_advance_slot(
) LANGUAGE plpgsql AS $$
DECLARE
rec RECORD;
start_point record;
commit_lsn pg_lsn;
slot_name text;
dbname text;
remotesql text;
Expand All @@ -1677,27 +1682,23 @@ BEGIN

-- Multi-node scenario: check commit timestamp for "other" nodes to new node
FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP
-- Check commit timestamp for lag from "other" node to new node
BEGIN
remotesql := format('SELECT commit_lsn, commit_timestamp
FROM spock.lag_tracker
WHERE origin_name = %L AND receiver_name = %L',
rec.node_name, new_node_name);
IF verb THEN
RAISE NOTICE ' Remote SQL for commit timestamp check: %', remotesql;
END IF;

SELECT * FROM dblink(new_node_dsn, remotesql) AS t(lsn pg_lsn, ts timestamp) INTO start_point;

IF start_point.ts IS NOT NULL THEN
RAISE NOTICE ' OK: %', rpad('Found commit timestamp for ' || rec.node_name || '->' || new_node_name || ': ' || start_point.ts, 120, ' ');
ELSE
RAISE NOTICE ' - %', rpad('No commit timestamp found for ' || rec.node_name || '->' || new_node_name, 120, ' ');
CONTINUE;
IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'temp_sync_lsns' AND relpersistence = 't') THEN
-- Get the stored sync LSN from when subscription was created
SELECT tsl.commit_lsn INTO commit_lsn
FROM temp_sync_lsns tsl
WHERE tsl.origin_node = rec.node_name;

IF commit_lsn IS NOT NULL THEN
RAISE NOTICE ' OK: %', rpad('Found commit LSN for ' || rec.node_name || ' (LSN: ' || commit_lsn || ')...', 120, ' ');
ELSE
RAISE NOTICE ' - %', rpad('No commit LSN found for ' || rec.node_name || '->' || new_node_name, 120, ' ');
CONTINUE;
END IF;
END IF;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE ' ✗ %', rpad('Checking commit timestamp for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
RAISE NOTICE ' ✗ %', rpad('Checking commit LSN for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
CONTINUE;
END;

Expand Down Expand Up @@ -1727,6 +1728,7 @@ BEGIN

DECLARE
current_lsn pg_lsn;
target_lsn pg_lsn;
BEGIN
SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO current_lsn;

Expand All @@ -1735,23 +1737,24 @@ BEGIN
CONTINUE;
END IF;

IF start_point.lsn IS NULL OR start_point.lsn <= current_lsn THEN
RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, start_point.lsn;
target_lsn := commit_lsn;
IF target_lsn IS NULL OR target_lsn <= current_lsn THEN
RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, target_lsn;
CONTINUE;
END IF;

-- Advance the slot
remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, start_point.lsn);
remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, target_lsn);
IF verb THEN
RAISE NOTICE ' Remote SQL for slot advancement: %', remotesql;
END IF;

PERFORM * FROM dblink(rec.dsn, remotesql) AS t(result text);
RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || start_point.lsn, 120, ' ');
RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || target_lsn, 120, ' ');
END;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to timestamp ' || start_point.ts || ' (error: ' || SQLERRM || ')', 120, ' ');
RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to LSN ' || commit_lsn || ' (error: ' || SQLERRM || ')', 120, ' ');
-- Continue with other nodes even if this one fails
END;
END LOOP;
Expand Down