216 changes: 215 additions & 1 deletion pkg/ccr/job.go
@@ -80,6 +80,7 @@ var (
featureSeperatedHandles bool
featureEnableSnapshotCompress bool
featureOverrideReplicationNumInternal bool
featureResyncOnTableRecreate bool

flagBinlogBatchSize int64

@@ -127,6 +128,8 @@ func init() {
"enable snapshot compress")
flag.BoolVar(&featureOverrideReplicationNumInternal, "feature_override_replication_num", true,
"enable override replication_num for downstream cluster")
flag.BoolVar(&featureResyncOnTableRecreate, "feature_resync_on_table_recreate", false,
"enable auto resync when upstream table is dropped and recreated with the same name")

flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch")
}
@@ -1005,6 +1008,50 @@ func (j *Job) fullSync() error {
return nil
}
case TableSync:
// Step 1.2.1: For table sync, check if the source table still exists before creating snapshot.
// This handles the case where the table is dropped again during full sync.
// Reuse checkTableExistsForTableSync to check table status.
tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
if err != nil {
log.Warnf("check table exists failed during full sync, will retry, err: %+v", err)
return nil
}

if !tableExists {
// Table has been dropped during full sync
if featureResyncOnTableRecreate {
// Wait for table to be recreated (TableDroppedFlag already set by checkTableExistsForTableSync)
log.Warnf("table %s.%s has been dropped during full sync, waiting for recreate",
j.Src.Database, j.Src.Table)
return nil
} else {
// Pause job
msg := fmt.Sprintf("table %s.%s has been dropped during full sync. Job will be paused.",
j.Src.Database, j.Src.Table)
log.Warn(msg)
j.progress.SetFullSyncInfo(msg)
j.progress.Persist()
if err := j.Pause(); err != nil {
log.Errorf("failed to pause job: %+v", err)
}
return xerror.Errorf(xerror.Normal, "%s", msg)
}
}

// Step 1.2.2: Check if table_id changed again (table dropped and recreated during full sync)
if tableIdChanged {
// Table was recreated again with a new table_id during full sync
log.Infof("table %s.%s was recreated again during full sync, old table_id: %d, new table_id: %d",
j.Src.Database, j.Src.Table, j.Src.TableId, newTableId)
j.Src.TableId = newTableId
// Note: Dest.TableId will be updated after full sync completes
if err := j.persistJob(); err != nil {
log.Warnf("persist job failed after table recreate during full sync, err: %+v", err)
}
// Continue with the new table_id, no need to restart full sync
// because we haven't created snapshot yet
}

backupTableList = append(backupTableList, j.Src.Table)
default:
return xerror.Errorf(xerror.Normal, "invalid sync type %s", j.SyncType)
@@ -1470,6 +1517,13 @@ func (j *Job) fullSync() error {
return err
}

// Reset table dropped state after successful full sync
if j.progress.TableDroppedFlag {
log.Infof("table %s.%s full sync completed, resetting table dropped state", j.Src.Database, j.Src.Table)
j.progress.TableDroppedFlag = false
j.progress.TableDroppedTime = 0
}

j.progress.PartitionCommitSeqMap = nil
j.progress.TableCommitSeqMap = nil
j.progress.TableMapping = nil
@@ -3812,6 +3866,24 @@ func (j *Job) incrementalSyncInternal() error {
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
return xerror.Errorf(xerror.Normal, "can't found db")
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
if j.SyncType == TableSync {
// Use unified handler to check and handle all scenarios
action, err := j.handleTableNotFoundForTableSync()
switch action {
case TableNotFoundWait:
return nil
case TableNotFoundTriggerFullSync:
return nil // State changed to TableFullSync, will be handled in next loop
case TableNotFoundBinlogNotReady:
log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
j.Src.Database, j.Src.Table)
return nil
case TableNotFoundError:
return err
}
}
// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
return xerror.Errorf(xerror.Normal, "can't found table")
default:
return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
@@ -3834,6 +3906,148 @@ func (j *Job) incrementalSyncInternal() error {
return nil
}

// checkTableExistsForTableSync checks if the source table exists and whether its table_id has changed.
// This is used for auto-recovery when a table is dropped and recreated with the same name.
// Returns: (tableExists bool, tableIdChanged bool, newTableId int64, error)
func (j *Job) checkTableExistsForTableSync() (bool, bool, int64, error) {
if j.SyncType != TableSync {
return false, false, 0, xerror.Errorf(xerror.Normal, "checkTableExistsForTableSync only works for table sync")
}

// Check if table exists by name
exists, err := j.Src.CheckTableExists()
if err != nil {
log.Warnf("check table exists failed, err: %+v", err)
return false, false, 0, err
}

if !exists {
// Table does not exist
if !j.progress.TableDroppedFlag {
j.progress.TableDroppedFlag = true
j.progress.TableDroppedTime = time.Now().Unix()
j.progress.Persist()
log.Infof("table %s.%s is dropped, waiting for recreate", j.Src.Database, j.Src.Table)
}
return false, false, 0, nil
}

// Table exists, check if table_id changed
// Use UpdateTable to force fetch from upstream cluster, not from cache
tableMeta, err := j.srcMeta.UpdateTable(j.Src.Table, 0)
if err != nil {
log.Warnf("update table meta failed, err: %+v", err)
return true, false, 0, err
}
newTableId := tableMeta.Id

if newTableId != j.Src.TableId {
// Table was recreated with a new table_id
var droppedDuration time.Duration
if j.progress.TableDroppedTime > 0 {
droppedDuration = time.Since(time.Unix(j.progress.TableDroppedTime, 0))
}
log.Infof("table %s.%s was recreated, old table_id: %d, new table_id: %d, dropped duration: %v",
j.Src.Database, j.Src.Table, j.Src.TableId, newTableId, droppedDuration)
return true, true, newTableId, nil
}

// Table exists with same table_id, reset drop state
if j.progress.TableDroppedFlag {
j.progress.TableDroppedFlag = false
j.progress.TableDroppedTime = 0
j.progress.Persist()
}

return true, false, newTableId, nil
}

// TableNotFoundAction represents the action to take when the upstream table is not found
type TableNotFoundAction int

const (
// TableNotFoundWait means continue waiting for table to be recreated
TableNotFoundWait TableNotFoundAction = iota
// TableNotFoundTriggerFullSync means table was recreated, trigger full sync
TableNotFoundTriggerFullSync
// TableNotFoundBinlogNotReady means table exists but binlog not ready (just enabled)
TableNotFoundBinlogNotReady
// TableNotFoundError means an error occurred and job should be paused or failed
TableNotFoundError
)

// handleTableNotFoundForTableSync handles table not found scenario for table sync.
// This is the unified handler for auto-recovery when upstream table is dropped.
// Note: This function should only be called when j.SyncType == TableSync.
// Returns: (action TableNotFoundAction, err error)
// - TableNotFoundWait: table dropped, continue waiting for recreate
// - TableNotFoundTriggerFullSync: table was recreated, trigger full sync
// - TableNotFoundBinlogNotReady: table exists but binlog not ready, wait and retry
// - TableNotFoundError: error occurred
func (j *Job) handleTableNotFoundForTableSync() (TableNotFoundAction, error) {
// Check if table exists and whether table_id changed
tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
if err != nil {
log.Warnf("check table exists failed, will retry later, err: %+v", err)
return TableNotFoundWait, nil
}

// Case 1: Table does not exist (dropped)
if !tableExists {
if !featureResyncOnTableRecreate {
// Just log warning and continue waiting
log.Warnf("upstream table %s.%s has been dropped (table not found). "+
"Auto-resync is disabled. Options: "+
"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
"2) Or manually pause/delete the job.",
j.Src.Database, j.Src.Table)
}
// TableDroppedFlag already set by checkTableExistsForTableSync
return TableNotFoundWait, nil
}

// Case 2: Table was recreated with a new table_id
if tableIdChanged {
if !featureResyncOnTableRecreate {
// Feature disabled: log warning and continue waiting (same as Case 1)
log.Warnf("upstream table %s.%s was recreated with new table_id %d (old: %d), but auto-resync is disabled. "+
"Options: "+
"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
"2) Or manually pause/delete the job.",
j.Src.Database, j.Src.Table, newTableId, j.Src.TableId)
return TableNotFoundWait, nil
}

log.Infof("table %s.%s was recreated with new table_id %d, triggering full sync to recover",
j.Src.Database, j.Src.Table, newTableId)

// Update table_id in Src spec only (Dest.TableId will be updated after full sync completes)
j.Src.TableId = newTableId

// Persist job with new table_id
if err := j.persistJob(); err != nil {
log.Warnf("persist job failed after table recreate, err: %+v", err)
}

// Reset progress to trigger full sync
// NOTE: Do NOT reset TableDroppedFlag here. It will be reset after full sync completes.
// This handles the case where table is dropped again during full sync.
j.progress.SyncState = TableFullSync
j.progress.PrevCommitSeq = 0
j.progress.CommitSeq = 0
j.progress.SubSyncState = BeginCreateSnapshot
j.progress.Persist()

log.Infof("table %s.%s auto-recovery triggered, switching to full sync", j.Src.Database, j.Src.Table)
return TableNotFoundTriggerFullSync, nil
}

// Case 3: Table exists with same table_id
// TableDroppedFlag already reset by checkTableExistsForTableSync
// This means binlog just not ready
return TableNotFoundBinlogNotReady, nil
}

func (j *Job) recoverJobProgress() error {
// parse progress
if progress, err := NewJobProgressFromJson(j.Name, j.db); err != nil {
@@ -4527,7 +4741,7 @@ func (j *Job) GetSpecifiedBinlog(commitSeq int64) (*festruct.TBinlog, error) {
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
return nil, xerror.Errorf(xerror.Normal, "can't found db, commit seq: %d", commitSeq)
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
return nil, xerror.Errorf(xerror.Normal, "can't found table, commit seq: %d", commitSeq)
return nil, xerror.Errorf(xerror.Normal, "can't found table binlog, commit seq: %d", commitSeq)
default:
return nil, xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
status.StatusCode, utils.FirstOr(status.GetErrorMsgs(), ""))
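
For context on how the new switch is surfaced, here is a small stand-alone sketch of the flag wiring. It mirrors the flag.BoolVar call added to init() above; the package layout and main function are illustrative only and not part of this change.

package main

import (
	"flag"
	"fmt"
)

// Illustrative stand-in for the switch added in pkg/ccr/job.go; the flag name
// and help text are copied from the diff above, everything else is a sketch.
var featureResyncOnTableRecreate bool

func main() {
	flag.BoolVar(&featureResyncOnTableRecreate, "feature_resync_on_table_recreate", false,
		"enable auto resync when upstream table is dropped and recreated with the same name")
	flag.Parse()

	// Run with: go run . -feature_resync_on_table_recreate=true
	fmt.Println("resync on table recreate:", featureResyncOnTableRecreate)
}

The default stays false, so the new resync path is strictly opt-in; with the flag off, a dropped table still leads to the existing pause-or-wait behaviour shown above.
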
28 changes: 27 additions & 1 deletion pkg/ccr/job_pipeline.go
@@ -17,6 +17,10 @@ import (
log "github.com/sirupsen/logrus"
)

// errTableRecreatedTriggerFullSync is returned when table is recreated with a new table_id,
// triggering the auto-recovery mechanism to switch to full sync.
var errTableRecreatedTriggerFullSync = xerror.NewWithoutStack(xerror.Normal, "table recreated, trigger full sync")

type TxnLink struct {
// The previous txn link
Prev <-chan any
@@ -240,7 +244,11 @@ func (j *Job) pipelineSync() error {

// fetch the binlogs, if the binlogs is empty.
if !j.pipelineCtx.hasBinlogs() {
if err := j.getNextBinlogs(); err != nil {
if err := j.getNextBinlogs(); err == errTableRecreatedTriggerFullSync {
// Table was recreated, exit pipeline and trigger full sync
j.resetPipeline()
return nil
} else if err != nil {
return err
}
hasMoreBinlogs = len(j.pipelineCtx.Binlogs) > 0
@@ -491,6 +499,24 @@ func (j *Job) getNextBinlogs() error {
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
return xerror.Errorf(xerror.Normal, "can't found db")
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
if j.SyncType == TableSync {
// Use unified handler to check and handle all scenarios
action, err := j.handleTableNotFoundForTableSync()
switch action {
case TableNotFoundWait:
return nil
case TableNotFoundTriggerFullSync:
return errTableRecreatedTriggerFullSync
case TableNotFoundBinlogNotReady:
log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
j.Src.Database, j.Src.Table)
return nil
case TableNotFoundError:
return err
}
}
// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
return xerror.Errorf(xerror.Normal, "can't found table")
default:
return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
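
The pipeline change relies on a package-level sentinel error to unwind cleanly out of pipelineSync when a recreate is detected. Below is a minimal, self-contained sketch of that pattern; it uses the standard errors package in place of xerror.NewWithoutStack, which is what the real code uses.

package main

import (
	"errors"
	"fmt"
)

// errSentinel plays the role of errTableRecreatedTriggerFullSync; the real
// sentinel is built with xerror.NewWithoutStack so it carries no stack trace.
var errSentinel = errors.New("table recreated, trigger full sync")

// getNext stands in for getNextBinlogs: it returns the sentinel when the
// upstream table has been recreated and a full sync should take over.
func getNext(recreated bool) error {
	if recreated {
		return errSentinel
	}
	return nil
}

func main() {
	err := getNext(true)
	// The diff compares with `==`, which works because the sentinel is returned
	// unwrapped; errors.Is gives the same result and also tolerates wrapping.
	if errors.Is(err, errSentinel) {
		fmt.Println("reset pipeline and fall back to full sync")
	}
}

Returning a dedicated sentinel instead of an ordinary error lets pipelineSync distinguish "reset and fall back to full sync" from genuine failures that should propagate.
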
4 changes: 4 additions & 0 deletions pkg/ccr/job_progress.go
@@ -205,6 +205,10 @@ type JobProgress struct {
TableAliases map[string]string `json:"table_aliases,omitempty"`
PrevTxnId int64 `json:"prev_txn_id,omitempty"`

// For table-level auto recovery when the table is dropped and recreated
TableDroppedFlag bool `json:"table_dropped_flag,omitempty"`
TableDroppedTime int64 `json:"table_dropped_time,omitempty"` // Unix timestamp in seconds

// The shadow indexes of the pending schema changes
ShadowIndexes map[int64]int64 `json:"shadow_index_map,omitempty"`

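
The two new fields persist the drop-detection state across syncer restarts through the job's JSON progress record. A small sketch of how they serialize; the struct below is a stripped-down stand-in for JobProgress with only the added fields, and the JSON tags are copied from the diff above.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// progressSnippet carries only the two fields added in this change; the JSON
// tags match job_progress.go, the struct itself is illustrative.
type progressSnippet struct {
	TableDroppedFlag bool  `json:"table_dropped_flag,omitempty"`
	TableDroppedTime int64 `json:"table_dropped_time,omitempty"` // Unix timestamp in seconds
}

func main() {
	p := progressSnippet{TableDroppedFlag: true, TableDroppedTime: time.Now().Unix()}
	out, _ := json.Marshal(p)
	fmt.Println(string(out)) // e.g. {"table_dropped_flag":true,"table_dropped_time":1718000000}

	// With omitempty, a cleared drop state (false/0) serializes to {}, so older
	// progress records that lack these fields keep loading unchanged.
	out, _ = json.Marshal(progressSnippet{})
	fmt.Println(string(out)) // {}
}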