fix(server): fix the scheduler and the scheduler selection logic #2937
Tsukilc wants to merge 35 commits into apache:master from Tsukilc:fix/scheduler
Conversation
…into fix/scheduler
Codecov Report: ❌ Patch coverage is …
Additional details and impacted files

```diff
@@             Coverage Diff             @@
##             master    #2937       +/-   ##
=============================================
- Coverage     39.38%   29.23%     -10.16%
+ Complexity      456      264        -192
=============================================
  Files           812      801         -11
  Lines         68660    67341       -1319
  Branches       8968     8734        -234
=============================================
- Hits          27044    19689       -7355
- Misses        38824    45423       +6599
+ Partials       2792     2229        -563
```

☔ View full report in Codecov by Sentry.
… into fix/scheduler
47ec466 to 68b906a
Pull request overview
This PR disables the master-worker scheduling logic in StandardTaskScheduler, transitioning to a simplified task execution model. The scheduler type is now auto-determined based on storage backend (distributed for hstore, local otherwise), and server.id is auto-generated if not specified.
Changes:
- Removed SCHEDULER_TYPE configuration option and master-role scheduling logic from TaskManager and StandardTaskScheduler
- Removed server.id and server.role configuration requirements from rest-server.properties
- Added auto-generation of server.id using UUID when not explicitly configured
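For reviewers skimming the change list, the standalone sketch below condenses the two behaviors described above, based on the fragments quoted later in this thread (the isHstore()-based ternary in StandardHugeGraph and the UUID fallback in GraphManager). The class and method names here are illustrative only, not the PR's actual structure.

```java
import java.util.UUID;

// Illustrative sketch only; see StandardHugeGraph.schedulerType() and
// GraphManager in the PR for the real code paths.
public class SchedulerSelectionSketch {

    // Mirrors the quoted selection rule: distributed scheduler for the
    // hstore backend, local scheduler for every other backend.
    static String schedulerType(String backend) {
        return "hstore".equalsIgnoreCase(backend) ? "distributed" : "local";
    }

    // Mirrors the quoted fallback: auto-generate server.id when it is not
    // explicitly configured (empty default).
    static String serverId(String configuredId) {
        if (configuredId == null || configuredId.isEmpty()) {
            return "server-" + UUID.randomUUID().toString().substring(0, 8);
        }
        return configuredId;
    }

    public static void main(String[] args) {
        System.out.println(schedulerType("hstore"));   // distributed
        System.out.println(schedulerType("rocksdb"));  // local
        System.out.println(serverId(""));              // e.g. server-1a2b3c4d
    }
}
```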
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 7 comments.
Summary per file:
| File | Description |
|---|---|
| CoreOptions.java (hugegraph-struct) | Removed SCHEDULER_TYPE configuration option |
| CoreOptions.java (hugegraph-core) | Removed SCHEDULER_TYPE configuration option |
| TaskCoreTest.java | Updated tests with conditional assertions for DistributedTaskScheduler and code formatting changes |
| rest-server.properties | Removed server.id and server.role configuration entries |
| hugegraph.properties | Removed task.scheduler_type configuration entry |
| hstore.properties.template | Removed task.scheduler_type configuration entry |
| TaskManager.java | Removed scheduling thread pool and master/worker role-based scheduling logic |
| StandardTaskScheduler.java | Simplified to remove multi-node task distribution, server filtering, and load balancing |
| ServerInfoManager.java | Removed pickWorkerNode, updateServerInfos, and load management methods |
| HugeServerInfo.java | Removed suitableFor method for task-server matching |
| StandardRoleListener.java | Removed enableRoleElection call during initialization |
| GlobalMasterInfo.java | Updated TODO comment |
| StandardHugeGraph.java | Changed schedulerType to be dynamically determined based on backend type |
| GraphManager.java | Added auto-generation of server.id and removed validation requirements |
| ServerOptions.java | Changed SERVER_ID default to empty string and updated description |
| hugegraph.properties.template | Removed task.scheduler_type from cluster test configuration |
| server2-conf/hugegraph.properties | Removed task.scheduler_type from Docker configuration |
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java (outdated; resolved)
```java
HugeTask<?> memTask = this.tasks.get(task.id());
if (memTask != null) {
    boolean cancelled = memTask.cancel(true);
    LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
```
The cancel method no longer saves the task to storage when cancelling a task that's in memory. If the task is running in memory and gets cancelled via memTask.cancel(true), the cancelled status won't be persisted to storage until the task naturally completes. This could cause issues if the server restarts before the task finishes - the task would be restored and re-executed. Consider saving the task status to storage after successfully calling memTask.cancel(true).
Suggested change:
```java
LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
if (cancelled) {
    this.save(memTask);
}
```
this.callable.cancelled() will take care of invoking the task scheduler to persist the status.
...aph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java (resolved)
...aph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java (resolved)
...aph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java (resolved)
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java (outdated; resolved)
5416720 to f8fc58a
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
```diff
@@ -284,14 +295,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
 }
```
In the DistributedTaskScheduler.delete() method (lines 286-305), the deletion logic differs significantly from the original implementation.
Original logic:
- force=false: set the status to DELETING and return null
- force=true: delete directly from the database
New logic:
```java
if (!force) {
    if (!task.completed() && task.status() != TaskStatus.DELETING) {
        throw new IllegalArgumentException(
                String.format("Can't delete incomplete task '%s' in status %s, " +
                              "Please try to cancel the task first",
                              id, task.status()));
    }
}
return this.deleteFromDB(id);
```
Issues:
- The logic that marks tasks as DELETING has been removed, which may break code that relies on the periodic cleanup
- A non-force delete now removes completed tasks directly instead of first marking them as DELETING
- The behavior may be inconsistent with the StandardTaskScheduler implementation
Suggestion: reconsider the deletion flow to stay compatible with the original behavior, or state this behavior change explicitly in the PR description.
```java
LOG.info("cancel task({}) error, task has completed", task.id());
    task.overwriteStatus(TaskStatus.CANCELLING);
    }
}
```
The TODO comment at line 334 is too vague:
```java
//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
```
Issues:
- It does not explain why serverInfoManager should be removed
- It does not state when or under what preconditions the removal should happen
- Commented-out code should be deleted rather than kept
Suggested change:
```java
// TODO(issue-XXX): Remove serverInfoManager.close() after migrating to
// pure single-node architecture. Currently kept for backward compatibility.
return this.serverManager().close();
```
Code review summary: thank you for submitting this important architecture-simplification PR! I have reviewed all the changes in detail; besides the comments already posted on specific code lines, the following key issues need attention:
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 12 comments.
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java (resolved)
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java (resolved)
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
```java
@Override
public String schedulerType() {
    return StandardHugeGraph.this.schedulerType;
    // Use distributed scheduler for hstore backend, otherwise use local
```
The schedulerType() method now determines the scheduler type based on whether the backend is hstore. However, this logic change is undocumented. Consider adding a comment explaining why hstore backends require distributed scheduling while other backends use local scheduling, as this is an important architectural decision.
Suggested change:
```java
/*
 * HStore is a distributed backend: data and tasks may be handled by
 * multiple graph servers that must coordinate scheduling and state.
 * For this reason we require a distributed task scheduler when the
 * backend is hstore so that jobs can be balanced and recovered
 * across nodes. For other backends, the graph is served by a single
 * server instance and tasks are executed locally, so a local
 * in-process scheduler is sufficient and avoids the overhead of
 * distributed coordination.
 */
```
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
...aph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java (resolved)
…hugegraph into fix/scheduler
This reverts commit 28e0390.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.
hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java (resolved)
```java
// Transfer `pd.peers` from server config to graph config
// Only inject if not already configured in graph config
if (!config.containsKey("pd.peers")) {
```
The pd.peers configuration is being transferred from server config to graph config (lines 1571-1574), but this happens unconditionally for all graphs during loadGraph. This means even graphs that don't need PD will get this configuration injected. Consider checking if the graph actually uses PD (e.g., by checking if it's an hstore backend) before injecting this configuration, or document why all graphs need this configuration.
Suggested change:
```java
// Transfer `pd.peers` from server config to graph config when PD is used
// Only inject if not already configured in graph config and backend uses PD
String backend = config.get(CoreOptions.BACKEND);
boolean backendUsesPd = "hstore".equalsIgnoreCase(backend);
if (backendUsesPd && !config.containsKey("pd.peers")) {
```
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java (resolved)
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java (resolved)
hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java (resolved)
```java
// Task not running locally, update status to CANCELLING
// for cronSchedule() or other nodes to handle
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
```
Using currentStatus for the CAS update has an obvious race: between the caller reading the task and executing cancel(), the task may transition from QUEUED to RUNNING, so updateStatus(..., currentStatus, CANCELLING) fails and the method returns directly, silently dropping the cancel request. Suggestion: when the CAS fails, reload the latest status and do one fallback update (move to CANCELLING as long as the task has not completed).
Suggested change:
```java
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
    HugeTask<?> latest = this.taskWithoutResult(task.id());
    if (!latest.completed() && !latest.cancelling()) {
        this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
        task.overwriteStatus(TaskStatus.CANCELLING);
    }
} else {
    task.overwriteStatus(TaskStatus.CANCELLING);
}
```
```java
    return task;
}
task.status(TaskStatus.QUEUED);
```
The task is set to QUEUED and submitted, but task.server is no longer written. Combined with restoreTasks() no longer filtering by server, in multi-instance deployments that share a backend (such as MySQL/PostgreSQL/Cassandra), every node may restore and execute the same batch of pending tasks after a restart, causing duplicate execution. Suggest keeping the owner binding, or at least allowing the restore phase to isolate tasks by owner.
Suggested change:
```java
task.status(TaskStatus.QUEUED);
task.server(this.serverManager().selfNodeId());
this.save(task);
return this.submitTask(task);
```
```properties
# task config
task.scheduler_type=local
task.schedule_period=10
```
task.scheduler_type is removed here (server2-conf), but server1-conf / server3-conf in the same directory still keep this option. Since this PR also removes CoreOptions.SCHEDULER_TYPE, the sample configurations become inconsistent and will produce redundant-config warnings at startup. Suggest cleaning up the other two docker config files as well to avoid misleading operators.
```java
// Process schema task
// Handle gremlin task
// Handle OLAP calculation tasks
```
In DistributedTaskScheduler.java:268-270:
```java
if (this.closed.get()) {
    return;
}
```
Issue: although the closed check was added, this.graph.started() and this.graph.closed() may already have changed by the time they are evaluated afterwards, which is a TOCTOU (time-of-check to time-of-use) problem.
Suggested change:
```java
if (this.closed.get() || !this.graph.started() || this.graph.closed()) {
    return;
}
```
Merging all of the state checks into a single if statement reduces the race window.
```java
} else {
    return this.deleteFromDB(id);
// Already in DELETING status, delete directly from DB
// Completed tasks can also be deleted directly
```
In DistributedTaskScheduler.java:355-366:
```java
if (!force && !task.completed()) {
    this.updateStatus(id, null, TaskStatus.DELETING);
    return null;
    // The code below can never be reached
}
```
Issues:
- The comment "Already in DELETING status" on line 360 sits after return null and can never be reached
- The logic is unclear: "incomplete tasks" and "tasks already in DELETING status" should be handled as distinct cases
Suggested change:
```java
HugeTask<?> task = this.taskWithoutResult(id);
if (!force && !task.completed()) {
    // Non-force mode: mark incomplete tasks as DELETING for async cleanup
    this.updateStatus(id, null, TaskStatus.DELETING);
    return null;
}
// Force mode OR task is completed OR already DELETING: delete directly
return this.deleteFromDB(id);
```
```java
} catch (ExecutionException | InterruptedException e) {
    LOG.warn("Exception while waiting for cron task to complete", e);
}
```
In DistributedTaskScheduler.java:399-410:
```java
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
```
Issues:
- If the cron task times out, the subsequent taskDbExecutor cleanup may run while the cron task still holds a transaction
- The comment says "ensure all transactions are closed", but that cannot be guaranteed on timeout
- schedulePeriod can be large (e.g. 60 seconds), which makes shutdown wait too long
Suggested change:
```java
// Wait for cron task with reasonable timeout
long waitTime = Math.min(schedulePeriod + 5, 15); // Cap at 15 seconds
try {
    cronFuture.get(waitTime, TimeUnit.SECONDS);
} catch (CancellationException e) {
    LOG.debug("Cron task was cancelled");
} catch (TimeoutException e) {
    LOG.warn("Cron task did not complete in {}s, proceeding with shutdown", waitTime);
    // Force interrupt the cron task thread if possible
    cronFuture.cancel(true);
} catch (ExecutionException | InterruptedException e) {
    LOG.warn("Exception while waiting for cron task to complete", e);
}
```
```java
//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
```
🧹 The TODO comment should be tracked by an issue
In DistributedTaskScheduler.java:422-424:
```java
//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
```
Suggestion: link the TODO to a GitHub issue so it can be tracked:
```java
// TODO(#issue-number): Remove serverInfoManager completely after full single-node migration
return this.serverManager().close();
```
and delete the commented-out code //return true;.
```java
// for cronSchedule() or other nodes to handle
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
    LOG.info("Failed to cancel task '{}', status may have changed from {}",
```
In DistributedTaskScheduler.java:323-330:
```java
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
    LOG.info("Failed to cancel task '{}', status may have changed from {}",
             task.id(), currentStatus);
} else {
    task.overwriteStatus(TaskStatus.CANCELLING);
}
```
Issues:
- When updateStatus returns false (the database update fails), the in-memory task object's status is not synchronized
- The caller may believe the cancel succeeded even though the status in the database did not change
- Error handling is missing: the latest status should be re-read, or an exception thrown
Suggested change:
```java
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
    // Status changed concurrently, reload from DB
    HugeTask<?> latestTask = this.taskWithoutResult(task.id());
    LOG.info("Failed to cancel task '{}': status changed from {} to {}",
             task.id(), currentStatus, latestTask.status());
    task.overwriteStatus(latestTask.status());
} else {
    task.overwriteStatus(TaskStatus.CANCELLING);
}
```
imbajin left a comment
Code Review Summary
I have completed a detailed code review of PR #2937. The key findings are below.
‼️ Core Logic Issues (High Priority)
1. Auto-generated server.id may break task recovery
Location: GraphManager.java:72-76
When server.id is empty, a random UUID is generated:
```java
server = "server-" + UUID.randomUUID().toString().substring(0, 8);
```
Issues:
- After a restart the server gets a new server.id, so tasks previously assigned to that node can no longer be recovered
- StandardTaskScheduler.restoreTasks() no longer filters with selfServer.equals(task.server()) and now restores all tasks, which is incompatible with distributed deployments
Suggestion: if multi-node scheduling is being removed for good, remove the server field from HugeTask as well; otherwise derive a stable server.id from a persisted identifier (see the sketch below).
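One possible shape of the "stable persisted identifier" alternative is sketched below. This is not code from the PR; the helper class, method name, and file-based storage are assumptions chosen purely for illustration. The idea is to generate the UUID once and cache it on disk so a restarted server keeps the same server.id.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical helper: persists the generated id so a restarted server
// keeps the same server.id and can recover tasks assigned to it.
final class StableServerId {

    static String loadOrCreate(Path idFile) throws IOException {
        if (Files.exists(idFile)) {
            // Reuse the id written on a previous startup
            return Files.readString(idFile, StandardCharsets.UTF_8).trim();
        }
        // First startup: generate once, then persist for later restarts
        String id = "server-" + UUID.randomUUID().toString().substring(0, 8);
        Path parent = idFile.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.writeString(idFile, id, StandardCharsets.UTF_8);
        return id;
    }
}
```

The same effect could be achieved with any durable store the server already owns; the point is only that the identity must survive restarts if tasks remain bound to a server.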
2. Inappropriate exception type
Location: GraphManager.java:1131
The old code threw NotSupportException to signal a failed type cast; the new code throws NotFoundException, even though the graph may exist and only the type fails to match.
Suggestion: keep using NotSupportException, or use a clearer exception message stating that this is a type mismatch.
3. Scheduler-type selection logic is incomplete
Location: StandardHugeGraph.java:1177-1178
```java
return StandardHugeGraph.this.isHstore() ? "distributed" : "local";
```
Issues:
- The decision is based only on the backend type and does not consider multi-node RocksDB clusters coordinated via PD
- GraphManager.usePD() is already available and should be preferred for this decision
Suggestion: change it to return graphManager.usePD() ? "distributed" : "local"; (a hedged sketch follows).
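For clarity, a minimal hedged sketch of the usePD()-based selection suggested above; how graphManager (or an equivalent PD-availability flag) would be wired into StandardHugeGraph is assumed here and is not shown in this PR.

```java
// Hypothetical sketch: decide the scheduler type from "is this deployment
// coordinated by PD" rather than from "is the backend hstore", so that
// PD-coordinated multi-node clusters also get the distributed scheduler.
final class SchedulerTypeByPd {

    private final boolean pdCoordinated;

    SchedulerTypeByPd(boolean pdCoordinated) {
        this.pdCoordinated = pdCoordinated;
    }

    String schedulerType() {
        return this.pdCoordinated ? "distributed" : "local";
    }
}
```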
⚠️ Important Issues (Medium Priority)
4. Incorrect task-deletion logic
Location: DistributedTaskScheduler.java:355-366
The comment "Already in DELETING status" on line 360 sits after return null and can never be reached. The logic is unclear; incomplete tasks and tasks that are already in DELETING status should be handled as distinct cases.
5. Inconsistent task state after a failed status update
Location: DistributedTaskScheduler.cancel()
When updateStatus returns false (the database update fails), the in-memory task object's status is not synchronized. The latest status should be re-read, or an exception thrown.
6. Wait timeout may leak transactions
Location: DistributedTaskScheduler.close():399-410
If the cron task times out, the subsequent cleanup may run while the cron task still holds a transaction. schedulePeriod can be large, so the maximum wait time should be capped.
7. Relaxed test assertions may mask underlying problems
Location: TaskCoreTest.java:1290-1296
The test used to expect the status CANCELLING; it now accepts either CANCELLING or CANCELLED. This nondeterminism can make race conditions hard to debug.
🧹 Code Quality Suggestions (Low Priority)
8. Config injection may override user settings
Location: GraphManager.java:1105-1111
pdPeers should be validated before being injected:
```java
if (!config.containsKey("pd.peers")) {
    String pdPeers = this.conf.get(ServerOptions.PD_PEERS);
    if (StringUtils.isNotEmpty(pdPeers)) {
        config.addProperty("pd.peers", pdPeers);
    }
}
```
9. TODO comments should be tracked by issues
Link TODOs to GitHub issues so they can be tracked, and delete commented-out code.
Summary
The core change in this PR simplifies task scheduling from a multi-node model to a single-node model, but it carries the following key risks:
- Backward compatibility: a randomly generated server.id breaks task recovery after a restart
- Scheduler selection: it should be decided by the presence of PD rather than by the backend type
- State consistency: several status updates lack synchronization handling after failure
- Test coverage: assertions were relaxed and may mask concurrency problems
I suggest focusing on core-logic issues 1-3 before merging.
New Features
Server IDs now support automatic generation, eliminating the need for manual configuration.
Refactoring
Significantly simplified the task scheduling architecture by adopting a single-node scheduling path by default, removing multi-node scheduling and role-election–related controls.
Streamlined server information and scheduling management logic by eliminating redundant multi-node–related workflows.
Configuration Changes
Removed explicit scheduling type configuration; the scheduling mode is now determined by the runtime environment.
Bug Fixes
Adjusted the handling logic for task cancellation/deletion and missing tasks, improving exception handling and state management.
Tests
Updated test cases to accommodate the new scheduling behavior and serialization differences.