Conversation

@Guosmilesmile
Contributor

This PR is a continuation of #15042.

In this approach, the lock is maintained inside TriggerManagerOperator, and TableMaintenanceCoordinator acts as a bridge: it holds a reference to TriggerManagerOperator in a static map, receives events from LockRemoverOperator, and uses that reference to release the lock held by TriggerManagerOperator.

Normal flow

  1. TriggerManagerOperator decides whether to fire a trigger and, if so, acquires the lock internally and sends the trigger downstream.
  2. After the task is finished, the trigger or the watermark reaches LockRemoverOperator.
  3. LockRemoverOperator sends an event to TableMaintenanceCoordinator. TableMaintenanceCoordinator uses the reference stored in its static map to find the corresponding TriggerManagerOperator and sends a “release lock” event to it.
  4. TriggerManagerOperator releases the lock.

Recovery flow

  1. During initializeState, TriggerManagerOperator marks itself as “in recovery” and then sends a recovery trigger downstream. While the “in recovery” flag is set, other tasks are not allowed to operate on the lock.
  2. When the recovery trigger or the watermark reaches LockRemoverOperator, it sends an event to TableMaintenanceCoordinator to release the lock.
  3. TriggerManagerOperator releases the lock.
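To make the flow concrete, here is a minimal, self-contained sketch of the lock life cycle described above. It is not the PR's actual code; the class and method names (MaintenanceLock, tryAcquire, markRecovery, release) are hypothetical and only model how the trigger manager could acquire the lock before firing, how the coordinator-relayed release clears it, and how the recovery flag blocks new triggers until the recovery trigger has drained.

// Hypothetical model of the lock life cycle; not the PR's actual classes.
final class MaintenanceLock {
  private boolean lockHeld = false;   // set by TriggerManagerOperator before it fires a trigger
  private boolean inRecovery = false; // set in initializeState until the recovery trigger returns

  // Normal flow, step 1: fire a trigger only if nothing else is running.
  synchronized boolean tryAcquire() {
    if (lockHeld || inRecovery) {
      return false; // another task (or the recovery trigger) still owns the lock
    }
    lockHeld = true;
    return true;
  }

  // Recovery flow, step 1: block new triggers until the recovery trigger is drained.
  synchronized void markRecovery() {
    inRecovery = true;
  }

  // Steps 3-4: the coordinator forwards the release event coming from LockRemoverOperator.
  synchronized void release(boolean fromRecoveryTrigger) {
    if (fromRecoveryTrigger) {
      inRecovery = false;
    } else {
      lockHeld = false;
    }
  }
}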

@github-actions github-actions bot added the flink label Jan 27, 2026
void handleLockReleaseResult(LockReleasedEvent event) {
  if (event.lockId().equals(tableName)) {
    this.lockHeld = false;
    this.restoreTasks = false;
Contributor

This might not be correct on recovery.
If we have an ongoing task and the recovery lock, then this might be only the "task" lock release.
Let's imagine this sequence:

  • Restore with ongoing T1. We send a Trigger for restore. restoreTasks is set to true.
  • T1 lock release arrives. restoreTasks is set to false.
  • We trigger T2 and set lockHeld to true.
  • The restore Trigger arrives at the LockRemover. We receive a LockReleasedEvent and set lockHeld to false.
  • We trigger T3 - which is wrong

We need to differentiate between the restoreLock release and the lockHeld release

Contributor Author

@Guosmilesmile Guosmilesmile Jan 28, 2026

I think we can differentiate between the restoreLock release and the lockHeld release based on the source: processWatermark vs. processElement, where processWatermark means recovery. So I added a flag to LockReleasedEvent, and TriggerManagerOperator only releases the restoreTasks lock when the event comes from a watermark.
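A rough sketch of this idea, assuming a boolean flag on the event (the field names and constructor shape below are illustrative, not the PR's exact API):

// Illustrative only: a release event that records whether it was emitted from
// processWatermark (recovery path) or processElement (regular task completion).
class LockReleasedEvent implements java.io.Serializable {
  final String lockId;
  final long timestamp;
  final boolean fromWatermark;

  LockReleasedEvent(String lockId, long timestamp, boolean fromWatermark) {
    this.lockId = lockId;
    this.timestamp = timestamp;
    this.fromWatermark = fromWatermark;
  }
}

// On the TriggerManagerOperator side, only a watermark-sourced release would clear the
// recovery lock; element-sourced releases clear the regular task lock.
class TriggerManagerLockState {
  private final String tableName;
  private boolean lockHeld;
  private boolean restoreTasks;

  TriggerManagerLockState(String tableName) {
    this.tableName = tableName;
  }

  void handleLockReleaseResult(LockReleasedEvent event) {
    if (!event.lockId.equals(tableName)) {
      return;
    }
    if (event.fromWatermark) {
      restoreTasks = false;
    } else {
      lockHeld = false;
    }
  }
}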

Comment on lines 100 to 101
operatorEventGateway.sendEventToCoordinator(
    new LockReleasedEvent(tableName, streamRecord.getTimestamp()));
Contributor

One of the LockReleasedEvents is unnecessary.

Contributor Author

For now I have kept this code because, as explained above, I use processWatermark vs. processElement to differentiate between the restoreLock release and the lockHeld release.

Contributor

I'm not sure about the cost of sending a message, but theoretically we could differentiate between the lock release requests simply by the lockId and the timestamp. If we keep track of the release trigger timestamp on the TriggerManager side, then we could decide if this is a recovery or a task lock.

With the current solution we always send 2 release requests when a normal task is finished, which is unnecessary and might also cause concurrency issues if the TriggerManager triggers another task between the 2 release messages.

Contributor Author

I changed this part to use a Long “lockTime” instead of two boolean flags, and kept only one place that sends the release event.
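A minimal sketch of the lockTime idea, assuming the lock counts as held while lockTime is non-null and a release only applies if its timestamp matches the acquiring trigger (all names below are illustrative, not the PR's exact code):

// Illustrative only: a single nullable lockTime replaces the lockHeld/restoreTasks booleans.
class LockTimeState {
  private final String tableName;
  private Long lockTime; // null = no lock held; otherwise the timestamp of the acquiring trigger

  LockTimeState(String tableName) {
    this.tableName = tableName;
  }

  boolean tryAcquire(long triggerTimestamp) {
    if (lockTime != null) {
      return false; // a task (or the recovery trigger) still holds the lock
    }
    lockTime = triggerTimestamp;
    return true;
  }

  // A release only applies when it matches the timestamp that took the lock, so a stale
  // release for an older trigger cannot unlock a newer task.
  void handleLockReleaseResult(String lockId, long releasedTimestamp) {
    if (lockId.equals(tableName) && lockTime != null && lockTime == releasedTimestamp) {
      lockTime = null;
    }
  }
}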


@SuppressWarnings("FutureReturnValueIgnored")
private void registerTriggerManagerReceiveReleaseEvent(LockRegisterEvent lockRegisterEvent) {
  LOCK_RELEASE_CONSUMERS.put(
Contributor

Could we check that we don't accidentally overwrite any other consumer?
We use the JobManager's JVM, so we might get accidental issues if multiple jobs are registered on the same JM, or on job restart.

@mxm: Any ideas about the possible issues here?

Contributor

There is classloader-level isolation between coordinators from different jobs, which means that there will be multiple instances of the static field because every job will load this class through its user class loader.

+1 for checking for existing consumers though. Is the locking meant to be re-entrant? Even so, we should check that it's actually re-entrant and not coming from a different subtask.

Contributor Author

TriggerManagerOperator and LockRemoverOperator are both forceNonParallel, so they only have 1 subtask; for now I haven't taken registrations from different subtasks into account.

If the job restarts, it will re-register during initializeState, overwriting the previous one so that we always have the latest reference. So is it still necessary to do any checks here, and if so, what exactly should we be checking for?
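If a guard is still wanted, a defensive registration could look roughly like the sketch below. It keeps the "last registration wins" behavior described above but surfaces the overwrite, which would make accidental double registrations visible; the map key/value types are assumptions, not the PR's actual ones.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative only: register one lock-release consumer per table and warn when replacing one.
class LockReleaseConsumerRegistry {
  private static final Map<String, Consumer<Long>> LOCK_RELEASE_CONSUMERS =
      new ConcurrentHashMap<>();

  static void register(String tableName, Consumer<Long> releaseConsumer) {
    Consumer<Long> previous = LOCK_RELEASE_CONSUMERS.put(tableName, releaseConsumer);
    if (previous != null) {
      // Expected on job restart (re-registration from initializeState); unexpected otherwise.
      System.err.printf("Replacing existing lock-release consumer for table %s%n", tableName);
    }
  }
}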

Comment on lines +155 to +166
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
  // We don’t need to track how many locks are currently held, because when recovering from state,
  // a `recover lock` will be issued to ensure all tasks finish running and then release all
  // locks.
  // The `TriggerManagerOperator` already keeps the `TableChange` state and related information,
  // so there’s no need to store additional state here.
  runInCoordinatorThread(
      () -> {
        resultFuture.complete(new byte[0]);
      },
      String.format(Locale.ROOT, "taking checkpoint %d", checkpointId));
}
Contributor

Why don't we checkpoint all the locks? This will give us a precise snapshot of the current locking state. That way, we could also get rid of the recovery lock.

Contributor Author

In my understanding, some tasks are stateless and will not continue execution after a job failure and restart. If we remove the recovery lock and restore such a held lock from state, that lock would never be released, if I understand correctly.

Contributor

Stateless tasks that do not continue after restore should send a lock release message on restore. This will ensure that the lock gets released if it was previously held.

In general, if we want table maintenance to be stateful, the lock state should be included as well. We have the option to do this now because we maintain the lock state here.

See also #15042 (comment).

Contributor Author

https://github.com/Guosmilesmile/iceberg/blob/14c82af75296a63b8552a6fc01f88c8e7713f627/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java#L176-L185

In this PR, we only have lockTime instead of a boolean flag. After restore, I reset the lock time to the current time in the TriggerManagerOperator, which is equivalent to releasing the lock. Therefore, there is no need to release the lock again from the coordinator; it has already been handled inside TriggerManagerOperator.
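A plain-Java sketch of that restore step (the linked code in the PR is the source of truth; the names here are illustrative):

// Illustrative only: on restore the operator resets lockTime to "now", which is treated as
// releasing any lock held before the failure; the recovery trigger sent downstream afterwards
// makes sure in-flight work finishes before new tasks fire.
class TriggerManagerRestoreSketch {
  private long lockTime;

  void onRestore() {
    this.lockTime = System.currentTimeMillis();
  }
}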

Contributor Author

Regarding recovery strategies for stateful tasks, here are two ideas:

  1. Current approach

The TriggerManagerOperator does not persist the historical lock state. Instead, upon recovery it acquires a new lock and sends a recover trigger downstream. Once all tasks have finished, the downstream will release the lock.

  2. Alternative approach

If we persist the lock state in TriggerManagerOperator, then upon task recovery there are two scenarios:

  1. Stateless tasks will not release the lock.
  2. Stateful tasks will release the lock when the task completes.

In that case, we would need to determine explicitly, based on the task type, whether we should manually trigger a lock release during recovery. Stateful tasks must not trigger this manually. This feels like it would couple the design more tightly to the task implementation, and each task would need to be modified. By contrast, approach 1 looks more generic and reusable.

Is my understanding correct? I’d like to hear your thoughts. @mxm @pvary

Contributor

I hadn't considered stateless tasks as much as stateful tasks, but I don't think it is a bad idea to make all tasks stateful in the sense that they are guaranteed to send a lock release on restore. Maybe using a recovery approach is simpler after all; I just didn't find it to be very straightforward.



@SuppressWarnings("FutureReturnValueIgnored")
private void registerTriggerManagerReceiveReleaseEvent(LockRegisterEvent lockRegisterEvent) {
  LOCK_RELEASE_CONSUMERS.put(
Contributor

Have we considered a different flow, where we confirm the lock registration? Why are we assuming that every lock register event succeeds?

}

// register the lock register event
operatorEventGateway.sendEventToCoordinator(new LockRegisterEvent(tableName, current));
Contributor

AFAIK we register once on state restore, but never again, even after the lock has been released. Am I missing something here?

Contributor Author

Here we are mainly registering something like a hook that can receive the “lock release” signal from the coordinator.
It is registered once when the job starts for the first time or when state is restored, and this registration overwrites the previous one.

Contributor

@mxm mxm left a comment

Does this PR supersede #15042? Or are we currently evaluating both approaches?

Comment on lines 50 to 51
* TriggerManagerOperator sends events to the coordinator to acquire a lock, then waits for the
* response. If the response indicates that the lock has been acquired, it fires a trigger;
Contributor

AFAIK this is no longer true for this PR.

Contributor Author

You are right, I had still kept the old text; I have changed it to the new version. Thanks for pointing it out.


@Guosmilesmile
Contributor Author

@mxm This PR is an improvement based on the comments in #15042, and I hope it will replace #15042. However, I’m worried that #15042 might still be in use, so I haven’t closed it for now. Please treat this PR as the primary one, and I apologize for any confusion this may cause.

Contributor

@mxm mxm left a comment

Thanks @Guosmilesmile! I appreciate your work on this.

I think we can remove the recovery lock, if we switch to this design:

[Diagram: Flink locking]

The important parts are that we (a) introduce ACK messages between both coordinators and the corresponding operators, and (b) checkpoint the lock state on just the TriggerManager coordinator.

On the LockRemover operator side, we will need to persist outgoing lock releases in Flink state, which is checkpointed. Only once a LockReleased message has been received can we clear this lock state. On restore, we read the pending lock release state and send corresponding LockRelease messages to the LockRemover coordinator, which clears the corresponding lock and acks the request.

On the TriggerManager operator side, we acquire the lock and will be notified by the LockAcquired message from the coordinator once the lock is available.

The TriggerManager coordinator maintains the source of truth for the lock state. Modifications to the shared lock state must be synchronized. No modifications can take place during coordinator checkpointing. The order in which coordinators are checkpointed isn't defined, so whichever coordinator comes first must lock state updates. Any updates must be deferred until after checkpointing.
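A rough sketch of the LockRemover side of this proposal, assuming pending releases are kept in checkpointed operator state and cleared only on ACK (everything below is illustrative, not an existing implementation):

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative only: keep each outgoing release until the coordinator acknowledges it,
// snapshot the pending set into checkpointed state, and re-send pending releases on restore.
class PendingLockReleases {
  private final Set<Long> pending = new LinkedHashSet<>(); // release timestamps awaiting an ACK
  private final Consumer<Long> sendToCoordinator;           // e.g. wraps the operator event gateway

  PendingLockReleases(Consumer<Long> sendToCoordinator) {
    this.sendToCoordinator = sendToCoordinator;
  }

  void releaseLock(long triggerTimestamp) {
    pending.add(triggerTimestamp); // persisted via the operator's checkpointed state
    sendToCoordinator.accept(triggerTimestamp);
  }

  void onLockReleasedAck(long triggerTimestamp) {
    pending.remove(triggerTimestamp); // only now may the entry leave the snapshot
  }

  void onRestore(Iterable<Long> restoredPending) {
    for (Long timestamp : restoredPending) {
      pending.add(timestamp);
      sendToCoordinator.accept(timestamp); // re-send releases that were never acknowledged
    }
  }
}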

@Override
public OperatorCoordinator.Provider getCoordinatorProvider(
    String operatorName, OperatorID operatorID) {
  return new TableMaintenanceCoordinatorProvider(operatorName, operatorID);
Contributor

I noticed that we're now using the same coordinator code for both the TriggerManager and the LockRemover coordinator, albeit parameterized with the actual operator name. I found that confusing because the code executes different branches depending on which role it fulfills.

@pvary
Contributor

pvary commented Jan 30, 2026

> Thanks @Guosmilesmile! I appreciate your work on this.
>
> I think we can remove the recovery lock, if we switch to this design:

I don't really like the solution:

  • It would need way more messages than a simple release lock solution
  • It would mean that we need to have constant communication with the Job Manager when we are waiting for the lock

I don't think the recovery lock is so complicated that it's worth accepting the messaging overhead and also adding state.
