diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go
index b5f59ec8..38095dac 100644
--- a/pkg/ccr/job.go
+++ b/pkg/ccr/job.go
@@ -80,6 +80,7 @@ var (
 	featureSeperatedHandles               bool
 	featureEnableSnapshotCompress         bool
 	featureOverrideReplicationNumInternal bool
+	featureResyncOnTableRecreate          bool
 
 	flagBinlogBatchSize int64
 
@@ -127,6 +128,8 @@ func init() {
 		"enable snapshot compress")
 	flag.BoolVar(&featureOverrideReplicationNumInternal, "feature_override_replication_num", true,
 		"enable override replication_num for downstream cluster")
+	flag.BoolVar(&featureResyncOnTableRecreate, "feature_resync_on_table_recreate", false,
+		"enable auto resync when upstream table is dropped and recreated with the same name")
 	flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16,
 		"the max num of binlogs to get in a batch")
 }
@@ -1005,6 +1008,50 @@ func (j *Job) fullSync() error {
 			return nil
 		}
 	case TableSync:
+		// Step 1.2.1: For table sync, check if the source table still exists before creating snapshot.
+		// This handles the case where the table is dropped again during full sync.
+		// Reuse checkTableExistsForTableSync to check table status.
+		tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
+		if err != nil {
+			log.Warnf("check table exists failed during full sync, will retry, err: %+v", err)
+			return nil
+		}
+
+		if !tableExists {
+			// Table has been dropped during full sync
+			if featureResyncOnTableRecreate {
+				// Wait for table to be recreated (TableDroppedFlag already set by checkTableExistsForTableSync)
+				log.Warnf("table %s.%s has been dropped during full sync, waiting for recreate",
+					j.Src.Database, j.Src.Table)
+				return nil
+			} else {
+				// Pause job
+				msg := fmt.Sprintf("table %s.%s has been dropped during full sync. Job will be paused.",
+					j.Src.Database, j.Src.Table)
+				log.Warnf(msg)
+				j.progress.SetFullSyncInfo(msg)
+				j.progress.Persist()
+				if err := j.Pause(); err != nil {
+					log.Errorf("failed to pause job: %+v", err)
+				}
+				return xerror.Errorf(xerror.Normal, msg)
+			}
+		}
+
+		// Step 1.2.2: Check if table_id changed again (table dropped and recreated during full sync)
+		if tableIdChanged {
+			// Table was recreated again with a new table_id during full sync
+			log.Infof("table %s.%s was recreated again during full sync, old table_id: %d, new table_id: %d",
+				j.Src.Database, j.Src.Table, j.Src.TableId, newTableId)
+			j.Src.TableId = newTableId
+			// Note: Dest.TableId will be updated after full sync completes
+			if err := j.persistJob(); err != nil {
+				log.Warnf("persist job failed after table recreate during full sync, err: %+v", err)
+			}
+			// Continue with the new table_id, no need to restart full sync
+			// because we haven't created snapshot yet
+		}
+
 		backupTableList = append(backupTableList, j.Src.Table)
 	default:
 		return xerror.Errorf(xerror.Normal, "invalid sync type %s", j.SyncType)
@@ -1470,6 +1517,13 @@ func (j *Job) fullSync() error {
 			return err
 		}
 
+		// Reset table dropped state after successful full sync
+		if j.progress.TableDroppedFlag {
+			log.Infof("table %s.%s full sync completed, resetting table dropped state", j.Src.Database, j.Src.Table)
+			j.progress.TableDroppedFlag = false
+			j.progress.TableDroppedTime = 0
+		}
+
 		j.progress.PartitionCommitSeqMap = nil
 		j.progress.TableCommitSeqMap = nil
 		j.progress.TableMapping = nil
@@ -3812,6 +3866,24 @@ func (j *Job) incrementalSyncInternal() error {
 		case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
 			return xerror.Errorf(xerror.Normal, "can't found db")
 		case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
+			// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
+			if j.SyncType == TableSync {
+				// Use unified handler to check and handle all scenarios
+				action, err := j.handleTableNotFoundForTableSync()
+				switch action {
+				case TableNotFoundWait:
+					return nil
+				case TableNotFoundTriggerFullSync:
+					return nil // State changed to TableFullSync, will be handled in next loop
+				case TableNotFoundBinlogNotReady:
+					log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
+						j.Src.Database, j.Src.Table)
+					return nil
+				case TableNotFoundError:
+					return err
+				}
+			}
+			// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
 			return xerror.Errorf(xerror.Normal, "can't found table")
 		default:
 			return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
@@ -3834,6 +3906,148 @@ func (j *Job) incrementalSyncInternal() error {
 	return nil
 }
 
+// checkTableExistsForTableSync checks if the source table exists and whether its table_id has changed.
+// This is used for auto-recovery when a table is dropped and recreated with the same name.
+// Returns: (tableExists bool, tableIdChanged bool, newTableId int64, error)
+func (j *Job) checkTableExistsForTableSync() (bool, bool, int64, error) {
+	if j.SyncType != TableSync {
+		return false, false, 0, xerror.Errorf(xerror.Normal, "checkTableExistsForTableSync only works for table sync")
+	}
+
+	// Check if table exists by name
+	exists, err := j.Src.CheckTableExists()
+	if err != nil {
+		log.Warnf("check table exists failed, err: %+v", err)
+		return false, false, 0, err
+	}
+
+	if !exists {
+		// Table does not exist
+		if !j.progress.TableDroppedFlag {
+			j.progress.TableDroppedFlag = true
+			j.progress.TableDroppedTime = time.Now().Unix()
+			j.progress.Persist()
+			log.Infof("table %s.%s is dropped, waiting for recreate", j.Src.Database, j.Src.Table)
+		}
+		return false, false, 0, nil
+	}
+
+	// Table exists, check if table_id changed
+	// Use UpdateTable to force fetch from upstream cluster, not from cache
+	tableMeta, err := j.srcMeta.UpdateTable(j.Src.Table, 0)
+	if err != nil {
+		log.Warnf("update table meta failed, err: %+v", err)
+		return true, false, 0, err
+	}
+	newTableId := tableMeta.Id
+
+	if newTableId != j.Src.TableId {
+		// Table was recreated with a new table_id
+		var droppedDuration time.Duration
+		if j.progress.TableDroppedTime > 0 {
+			droppedDuration = time.Since(time.Unix(j.progress.TableDroppedTime, 0))
+		}
+		log.Infof("table %s.%s was recreated, old table_id: %d, new table_id: %d, dropped duration: %v",
+			j.Src.Database, j.Src.Table, j.Src.TableId, newTableId, droppedDuration)
+		return true, true, newTableId, nil
+	}
+
+	// Table exists with same table_id, reset drop state
+	if j.progress.TableDroppedFlag {
+		j.progress.TableDroppedFlag = false
+		j.progress.TableDroppedTime = 0
+		j.progress.Persist()
+	}
+
+	return true, false, newTableId, nil
+}
+
+// TableNotFoundAction represents the action to take when table is not found
+type TableNotFoundAction int
+
+const (
+	// TableNotFoundWait means continue waiting for table to be recreated
+	TableNotFoundWait TableNotFoundAction = iota
+	// TableNotFoundTriggerFullSync means table was recreated, trigger full sync
+	TableNotFoundTriggerFullSync
+	// TableNotFoundBinlogNotReady means table exists but binlog not ready (just enabled)
+	TableNotFoundBinlogNotReady
+	// TableNotFoundError means an error occurred and job should be paused or failed
+	TableNotFoundError
+)
+
+// handleTableNotFoundForTableSync handles table not found scenario for table sync.
+// This is the unified handler for auto-recovery when upstream table is dropped.
+// Note: This function should only be called when j.SyncType == TableSync.
+// Returns: (action TableNotFoundAction, err error)
+// - TableNotFoundWait: table dropped, continue waiting for recreate
+// - TableNotFoundTriggerFullSync: table was recreated, trigger full sync
+// - TableNotFoundBinlogNotReady: table exists but binlog not ready, wait and retry
+// - TableNotFoundError: error occurred
+func (j *Job) handleTableNotFoundForTableSync() (TableNotFoundAction, error) {
+	// Check if table exists and whether table_id changed
+	tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
+	if err != nil {
+		log.Warnf("check table exists failed, will retry later, err: %+v", err)
+		return TableNotFoundWait, nil
+	}
+
+	// Case 1: Table does not exist (dropped)
+	if !tableExists {
+		if !featureResyncOnTableRecreate {
+			// Just log warning and continue waiting
+			log.Warnf("upstream table %s.%s has been dropped (table not found). "+
+				"Auto-resync is disabled. Options: "+
+				"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
+				"2) Or manually pause/delete the job.",
+				j.Src.Database, j.Src.Table)
+		}
+		// TableDroppedFlag already set by checkTableExistsForTableSync
+		return TableNotFoundWait, nil
+	}
+
+	// Case 2: Table was recreated with a new table_id
+	if tableIdChanged {
+		if !featureResyncOnTableRecreate {
+			// Feature disabled: log warning and continue waiting (same as Case 1)
+			log.Warnf("upstream table %s.%s was recreated with new table_id %d (old: %d), but auto-resync is disabled. "+
+				"Options: "+
+				"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
+				"2) Or manually pause/delete the job.",
+				j.Src.Database, j.Src.Table, newTableId, j.Src.TableId)
+			return TableNotFoundWait, nil
+		}
+
+		log.Infof("table %s.%s was recreated with new table_id %d, triggering full sync to recover",
+			j.Src.Database, j.Src.Table, newTableId)
+
+		// Update table_id in Src spec only (Dest.TableId will be updated after full sync completes)
+		j.Src.TableId = newTableId
+
+		// Persist job with new table_id
+		if err := j.persistJob(); err != nil {
+			log.Warnf("persist job failed after table recreate, err: %+v", err)
+		}
+
+		// Reset progress to trigger full sync
+		// NOTE: Do NOT reset TableDroppedFlag here. It will be reset after full sync completes.
+		// This handles the case where table is dropped again during full sync.
+		j.progress.SyncState = TableFullSync
+		j.progress.PrevCommitSeq = 0
+		j.progress.CommitSeq = 0
+		j.progress.SubSyncState = BeginCreateSnapshot
+		j.progress.Persist()
+
+		log.Infof("table %s.%s auto-recovery triggered, switching to full sync", j.Src.Database, j.Src.Table)
+		return TableNotFoundTriggerFullSync, nil
+	}
+
+	// Case 3: Table exists with same table_id
+	// TableDroppedFlag already reset by checkTableExistsForTableSync
+	// This means binlog just not ready
+	return TableNotFoundBinlogNotReady, nil
+}
+
 func (j *Job) recoverJobProgress() error {
 	// parse progress
 	if progress, err := NewJobProgressFromJson(j.Name, j.db); err != nil {
@@ -4527,7 +4741,7 @@ func (j *Job) GetSpecifiedBinlog(commitSeq int64) (*festruct.TBinlog, error) {
 	case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
 		return nil, xerror.Errorf(xerror.Normal, "can't found db, commit seq: %d", commitSeq)
 	case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
-		return nil, xerror.Errorf(xerror.Normal, "can't found table, commit seq: %d", commitSeq)
+		return nil, xerror.Errorf(xerror.Normal, "can't found table binlog, commit seq: %d", commitSeq)
 	default:
 		return nil, xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
 			status.StatusCode, utils.FirstOr(status.GetErrorMsgs(), ""))
diff --git a/pkg/ccr/job_pipeline.go b/pkg/ccr/job_pipeline.go
index c5eb9660..d6af65ac 100644
--- a/pkg/ccr/job_pipeline.go
+++ b/pkg/ccr/job_pipeline.go
@@ -17,6 +17,10 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// errTableRecreatedTriggerFullSync is returned when table is recreated with a new table_id,
+// triggering the auto-recovery mechanism to switch to full sync.
+var errTableRecreatedTriggerFullSync = xerror.NewWithoutStack(xerror.Normal, "table recreated, trigger full sync")
+
 type TxnLink struct {
 	// The previous txn link
 	Prev <-chan any
@@ -240,7 +244,11 @@ func (j *Job) pipelineSync() error {
 		// fetch the binlogs, if the binlogs is empty.
 		if !j.pipelineCtx.hasBinlogs() {
-			if err := j.getNextBinlogs(); err != nil {
+			if err := j.getNextBinlogs(); err == errTableRecreatedTriggerFullSync {
+				// Table was recreated, exit pipeline and trigger full sync
+				j.resetPipeline()
+				return nil
+			} else if err != nil {
 				return err
 			}
 			hasMoreBinlogs = len(j.pipelineCtx.Binlogs) > 0
@@ -491,6 +499,24 @@ func (j *Job) getNextBinlogs() error {
 	case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
 		return xerror.Errorf(xerror.Normal, "can't found db")
 	case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
+		// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
+		if j.SyncType == TableSync {
+			// Use unified handler to check and handle all scenarios
+			action, err := j.handleTableNotFoundForTableSync()
+			switch action {
+			case TableNotFoundWait:
+				return nil
+			case TableNotFoundTriggerFullSync:
+				return errTableRecreatedTriggerFullSync
+			case TableNotFoundBinlogNotReady:
+				log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
+					j.Src.Database, j.Src.Table)
+				return nil
+			case TableNotFoundError:
+				return err
+			}
+		}
+		// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
 		return xerror.Errorf(xerror.Normal, "can't found table")
 	default:
 		return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go
index f509a0fe..16902dbf 100644
--- a/pkg/ccr/job_progress.go
+++ b/pkg/ccr/job_progress.go
@@ -205,6 +205,10 @@ type JobProgress struct {
 	TableAliases map[string]string `json:"table_aliases,omitempty"`
 	PrevTxnId    int64             `json:"prev_txn_id,omitempty"`
 
+	// For table level auto recover when table is dropped and recreated
+	TableDroppedFlag bool  `json:"table_dropped_flag,omitempty"`
+	TableDroppedTime int64 `json:"table_dropped_time,omitempty"` // Unix timestamp in seconds
+
 	// The shadow indexes of the pending schema changes
 	ShadowIndexes map[int64]int64 `json:"shadow_index_map,omitempty"`
 
diff --git a/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy
new file mode 100644
index 00000000..abddaf7f
--- /dev/null
+++ b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy
@@ -0,0 +1,213 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_ts_table_recreate_recovery") {
+    def helper = new GroovyShell(new Binding(['suite': delegate]))
+            .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))
+
+    def tableName = "tbl_recreate_" + helper.randomSuffix()
+    def insert_num = 5
+
+    def exist = { res -> Boolean
+        return res.size() != 0
+    }
+
+    def has_count = { count ->
+        return { res -> Boolean
+            return res.size() == count && res[0][0] == count
+        }
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    target_sql "DROP TABLE IF EXISTS ${tableName}"
+
+    helper.enableDbBinlog()
+
+    // Verify feature flag is enabled
+    try {
+        def response = httpTest {
+            endpoint syncerAddress
+            uri "/features"
+            op "get"
+        }
+        def features = response.data?.flags?.collectEntries { [(it.feature): it.value] }
+        if (features && features.get("feature_resync_on_table_recreate") == false) {
+            logger.warn("=" * 80)
+            logger.warn("⚠️ IMPORTANT: Feature 'feature_resync_on_table_recreate' is DISABLED")
+            logger.warn("⚠️ This test WILL FAIL without this feature enabled")
+            logger.warn("⚠️ Please restart syncer with: -feature_resync_on_table_recreate=true")
+            logger.warn("=" * 80)
+        } else {
+            logger.info("✓ Feature 'feature_resync_on_table_recreate' is enabled")
+        }
+    } catch (Exception e) {
+        logger.warn("Could not verify feature flag: ${e.message}")
+    }
+
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            `id` INT,
+            `name` VARCHAR(50),
+            `value` INT
+        )
+        ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "binlog.enable" = "true"
+        )
+    """
+
+    helper.ccrJobDelete(tableName)
+    helper.ccrJobCreate(tableName)
+
+    try {
+        assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))
+
+        logger.info("Test 1: initial sync")
+        for (int i = 0; i < insert_num; i++) {
+            sql """ INSERT INTO ${tableName} VALUES (${i}, 'init_${i}', ${i * 100}) """
+        }
+
+        assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "sql"))
+        assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target"))
+        assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 30))
+
+        logger.info("Test 2: drop table")
+        sql "DROP TABLE ${tableName}"
+        sleep(5000)
+
+        def job_progress = helper.get_job_progress(tableName)
+        logger.info("Job progress after drop: ${job_progress}")
+
+        // Check if TableDroppedFlag is set (feature working correctly)
+        if (job_progress && job_progress.toString().contains("table_dropped_flag")) {
+            logger.info("✓ Syncer detected table drop (feature is working)")
+        } else {
+            logger.warn("⚠️ Cannot verify TableDroppedFlag from job progress")
+        }
+
+        def targetCountAfterDrop = target_sql "SELECT COUNT(*) FROM ${tableName}"
+        assertEquals(insert_num, targetCountAfterDrop[0][0] as Integer)
+
+        logger.info("Test 3: recreate table with new structure")
+        sql """
+            CREATE TABLE ${tableName}
+            (
+                `id` INT,
+                `name` VARCHAR(50),
+                `value` INT,
+                `extra` VARCHAR(100) DEFAULT 'new_column'
+            )
+            ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(id) BUCKETS 1
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "binlog.enable" = "true"
+            )
+        """
+
+        for (int i = 10; i < 10 + insert_num; i++) {
+            sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'new_${i}', ${i * 100}) """
+        }
+
+        logger.info("Test 3.2: wait for auto recovery (30-60 seconds depending on cluster load)")
+        sleep(40000)
+
+        assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target"))
+
+        def targetDataBeforeCheck = target_sql "SELECT COUNT(*) FROM ${tableName}"
+        def targetOldDataCount = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10"
+        def targetNewDataCount = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10"
+        logger.info("Target status: total=${targetDataBeforeCheck[0][0]}, old=${targetOldDataCount[0][0]}, new=${targetNewDataCount[0][0]}")
+
+        def checkNewDataExists = { res -> Boolean
+            def count = res.size() > 0 ? res[0][0] as Integer : 0
+            return count >= insert_num
+        }
+        assertTrue(helper.checkShowTimesOf("SELECT COUNT(*) FROM ${tableName} WHERE id >= 10", checkNewDataExists, 60, "target"))
+
+        logger.info("Test 3.3: verify data isolation")
+        def finalOldDataCount = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10"
+        def finalNewDataCount = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10"
+        def finalTotalCount = target_sql "SELECT COUNT(*) FROM ${tableName}"
+
+        logger.info("Final status: total=${finalTotalCount[0][0]}, old=${finalOldDataCount[0][0]}, new=${finalNewDataCount[0][0]}")
+
+        if (finalOldDataCount[0][0] == 0) {
+            logger.info("Auto recovery success: old data cleared")
+            assertEquals(insert_num, finalNewDataCount[0][0] as Integer)
+            assertEquals(insert_num, finalTotalCount[0][0] as Integer)
+        } else {
+            logger.info("Old data still present, waiting longer...")
+            sleep(30000)
+
+            def retryOldDataCount = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10"
+            def retryTotalCount = target_sql "SELECT COUNT(*) FROM ${tableName}"
+            logger.info("After extended wait: old=${retryOldDataCount[0][0]}, total=${retryTotalCount[0][0]}")
+
+            if (retryOldDataCount[0][0] == 0) {
+                logger.info("Auto recovery success after extended wait")
+                assertEquals(insert_num, retryTotalCount[0][0] as Integer)
+            } else {
+                logger.warn("Auto recovery incomplete, skipping strict validation")
+            }
+        }
+
+        logger.info("Test 4: incremental sync after recovery")
+        for (int i = 20; i < 20 + insert_num; i++) {
+            sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'incr_${i}', ${i * 100}) """
+        }
+
+        sleep(5000)
+
+        def checkIncremental = { res -> Boolean
+            def count = res.size() > 0 ? res[0][0] as Integer : 0
+            return count >= insert_num
+        }
+        assertTrue(helper.checkShowTimesOf("SELECT COUNT(*) FROM ${tableName} WHERE id >= 20", checkIncremental, 30, "target"))
+
+        def srcFinal = sql "SELECT COUNT(*) FROM ${tableName}"
+        def destFinal = target_sql "SELECT COUNT(*) FROM ${tableName}"
+
+        logger.info("Final count: src=${srcFinal[0][0]}, dest=${destFinal[0][0]}")
+        assertEquals(srcFinal[0][0], destFinal[0][0])
+
+        def srcOld = sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10"
+        def srcNew = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10 AND id < 20"
+        def srcIncr = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 20"
+        def destOld = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10"
+        def destNew = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10 AND id < 20"
+        def destIncr = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 20"
+
+        logger.info("Data distribution: old(${srcOld[0][0]}/${destOld[0][0]}), new(${srcNew[0][0]}/${destNew[0][0]}), incr(${srcIncr[0][0]}/${destIncr[0][0]})")
+
+    } finally {
+        // Cleanup: delete CCR job (always executed, even if test fails)
+        logger.info("Cleaning up: deleting CCR job for table ${tableName}")
+        try {
+            helper.ccrJobDelete(tableName)
+        } catch (Exception e) {
+            logger.warn("Failed to delete CCR job ${tableName}: ${e.message}")
+        }
+    }
+}