Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2925,3 +2925,99 @@ func RemoveTableFromCDCMetadata(ctx context.Context, db DBQuerier, tableName, pu

return nil
}

func GetReplicationOriginByName(ctx context.Context, db DBQuerier, originName string) (*uint32, error) {
sql, err := RenderSQL(SQLTemplates.GetReplicationOriginByName, nil)
if err != nil {
return nil, fmt.Errorf("failed to render GetReplicationOriginByName SQL: %w", err)
}

var originID uint32
err = db.QueryRow(ctx, sql, originName).Scan(&originID)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("query to get replication origin by name '%s' failed: %w", originName, err)
}

return &originID, nil
}

func CreateReplicationOrigin(ctx context.Context, db DBQuerier, originName string) (uint32, error) {
sql, err := RenderSQL(SQLTemplates.CreateReplicationOrigin, nil)
if err != nil {
return 0, fmt.Errorf("failed to render CreateReplicationOrigin SQL: %w", err)
}

var originID uint32
err = db.QueryRow(ctx, sql, originName).Scan(&originID)
if err != nil {
return 0, fmt.Errorf("query to create replication origin '%s' failed: %w", originName, err)
}

return originID, nil
}

func SetupReplicationOriginSession(ctx context.Context, db DBQuerier, originName string) error {
sql, err := RenderSQL(SQLTemplates.SetupReplicationOriginSession, nil)
if err != nil {
return fmt.Errorf("failed to render SetupReplicationOriginSession SQL: %w", err)
}

_, err = db.Exec(ctx, sql, originName)
if err != nil {
return fmt.Errorf("query to setup replication origin session for origin '%s' failed: %w", originName, err)
}

return nil
}

func ResetReplicationOriginSession(ctx context.Context, db DBQuerier) error {
sql, err := RenderSQL(SQLTemplates.ResetReplicationOriginSession, nil)
if err != nil {
return fmt.Errorf("failed to render ResetReplicationOriginSession SQL: %w", err)
}

_, err = db.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("query to reset replication origin session failed: %w", err)
}

return nil
}

func SetupReplicationOriginXact(ctx context.Context, db DBQuerier, originLSN string, originTimestamp *time.Time) error {
sql, err := RenderSQL(SQLTemplates.SetupReplicationOriginXact, nil)
if err != nil {
return fmt.Errorf("failed to render SetupReplicationOriginXact SQL: %w", err)
}

var timestampParam any
if originTimestamp != nil {
timestampParam = originTimestamp.Format(time.RFC3339)
} else {
timestampParam = nil
}

_, err = db.Exec(ctx, sql, originLSN, timestampParam)
if err != nil {
return fmt.Errorf("query to setup replication origin xact with LSN %s failed: %w", originLSN, err)
}

return nil
}

func ResetReplicationOriginXact(ctx context.Context, db DBQuerier) error {
sql, err := RenderSQL(SQLTemplates.ResetReplicationOriginXact, nil)
if err != nil {
return fmt.Errorf("failed to render ResetReplicationOriginXact SQL: %w", err)
}

_, err = db.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("query to reset replication origin xact failed: %w", err)
}

return nil
}
24 changes: 24 additions & 0 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ type Templates struct {
RemoveTableFromCDCMetadata *template.Template
GetSpockOriginLSNForNode *template.Template
GetSpockSlotLSNForNode *template.Template
GetReplicationOriginByName *template.Template
CreateReplicationOrigin *template.Template
SetupReplicationOriginSession *template.Template
ResetReplicationOriginSession *template.Template
SetupReplicationOriginXact *template.Template
ResetReplicationOriginXact *template.Template
}

var SQLTemplates = Templates{
Expand Down Expand Up @@ -1543,4 +1549,22 @@ var SQLTemplates = Templates{
ORDER BY rs.confirmed_flush_lsn DESC
LIMIT 1
`)),
GetReplicationOriginByName: template.Must(template.New("getReplicationOriginByName").Parse(`
SELECT roident FROM pg_replication_origin WHERE roname = $1
`)),
CreateReplicationOrigin: template.Must(template.New("createReplicationOrigin").Parse(`
SELECT pg_replication_origin_create($1)
`)),
SetupReplicationOriginSession: template.Must(template.New("setupReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_setup($1)
`)),
ResetReplicationOriginSession: template.Must(template.New("resetReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_reset()
`)),
SetupReplicationOriginXact: template.Must(template.New("setupReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_setup($1, $2)
`)),
ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_reset()
`)),
}
40 changes: 40 additions & 0 deletions docs/commands/repair/table-repair.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Performs repairs on tables of divergent nodes based on the diff report generated
| `--bidirectional` | `-Z` | Perform insert-only repairs in both directions | `false` |
| `--fire-triggers` | `-t` | Execute triggers (otherwise runs with `session_replication_role='replica'`) | `false` |
| `--recovery-mode` | | Enable recovery-mode repair when the diff was generated with `--against-origin`; can auto-select a source of truth using Spock LSNs | `false` |
| `--preserve-origin` | | Preserve replication origin node ID and LSN for repaired rows. When enabled, repaired rows will have commits with the original node's origin ID instead of the local node ID. Requires LSN to be available from a survivor node. | `true` |
| `--quiet` | `-q` | Suppress non-essential logging | `false` |
| `--debug` | `-v` | Enable verbose logging | `false` |

Expand Down Expand Up @@ -69,3 +70,42 @@ Diff reports share the same prefix generated by `table-diff` (for example `publi
## Fixing null-only drifts (`--fix-nulls`)

Replication hiccups can leave some columns NULL on one node while populated on another. The `--fix-nulls` mode cross-fills those NULLs in both directions using values from the paired node(s); it does **not** require a source-of-truth. Use it when the diff shows only NULL/NOT NULL mismatches and you want to reconcile columns without preferring a single node.

## Preserving replication origin (`--preserve-origin`)

By default, `--preserve-origin` is enabled. When repairing rows, this ensures that the repaired rows maintain the correct replication origin node ID and LSN from the original transaction, rather than using the local node's ID. This is particularly important in recovery scenarios where:

- A node fails and rows are repaired from a survivor
- The failed node may come back online
- Without origin tracking, the repaired rows would have the local node's origin ID, which could cause conflicts when the original node resumes replication

### How it works

1. **Origin extraction**: ACE extracts the `node_origin` and `commit_ts` from the diff file metadata for each row being repaired.

2. **LSN retrieval**: For each origin node, ACE queries a survivor node to obtain the origin LSN. This LSN must be available - if it's not, the repair will fail (as required for data consistency).

3. **Replication origin session**: Before executing repairs for each origin group, ACE:
- Gets or creates a replication origin for the origin node
- Sets up a replication origin session
- Configures the session with the origin LSN and timestamp
- Executes the repairs
- Resets the session

4. **Grouping**: Rows are automatically grouped by origin node to minimize session setup overhead.

### Requirements and limitations

- **LSN availability**: The origin LSN must be available from at least one survivor node. If not available, the repair will fail with an error.
- **Survivor nodes**: At least one survivor node must be accessible to fetch the origin LSN.
- **Privileges**: Replication origin functions require superuser or replication privileges on the target database.
- **Missing metadata**: If origin metadata is missing from the diff file for some rows, those rows will be repaired without origin tracking (a warning will be logged).

### When to disable

You may want to disable `--preserve-origin` with `--no-preserve-origin` if:
- You're certain the origin node will not come back online
- You've permanently removed the origin node from the cluster
- You want repaired rows to be treated as local writes

**Note**: Disabling origin preservation should only be done when you're certain about the node's status, as it can cause replication conflicts if the origin node returns.
5 changes: 5 additions & 0 deletions internal/api/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type tableRepairRequest struct {
GenerateReport bool `json:"generate_report"`
FixNulls bool `json:"fix_nulls"`
Bidirectional bool `json:"bidirectional"`
PreserveOrigin *bool `json:"preserve_origin,omitempty"`
}

type spockDiffRequest struct {
Expand Down Expand Up @@ -434,6 +435,10 @@ func (s *APIServer) handleTableRepair(w http.ResponseWriter, r *http.Request) {
task.GenerateReport = req.GenerateReport
task.FixNulls = req.FixNulls
task.Bidirectional = req.Bidirectional
// PreserveOrigin defaults to true if not explicitly set
if req.PreserveOrigin != nil {
task.PreserveOrigin = *req.PreserveOrigin
}
task.Ctx = r.Context()
task.ClientRole = clientInfo.role
task.InvokeMethod = "api"
Expand Down
6 changes: 6 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func SetupCLI() *cli.App {
Usage: "Enable recovery-mode repair using origin-only diffs",
Value: false,
},
&cli.BoolFlag{
Name: "preserve-origin",
Usage: "Preserve replication origin node ID and LSN for repaired rows (default: true)",
Value: true,
},
&cli.BoolFlag{
Name: "fix-nulls",
Aliases: []string{"X"},
Expand Down Expand Up @@ -1199,6 +1204,7 @@ func TableRepairCLI(ctx *cli.Context) error {
task.Bidirectional = ctx.Bool("bidirectional")
task.GenerateReport = ctx.Bool("generate-report")
task.RecoveryMode = ctx.Bool("recovery-mode")
task.PreserveOrigin = ctx.Bool("preserve-origin")

if err := task.ValidateAndPrepare(); err != nil {
return fmt.Errorf("validation failed: %w", err)
Expand Down
Loading
Loading