Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b4321eb
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
d19096d
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
a32901a
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
42ee2ba
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
c976caa
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
570d670
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 10, 2025
5915428
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
e7da7e5
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
b0f381c
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
804055d
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
2e8578e
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
2c44cee
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 10, 2025
925c384
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
3974048
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
a349f62
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
3e7bc6f
fix(server): remove server.id
Tsukilc Jan 11, 2026
e6cc98b
Merge branch 'fix/scheduler' of https://github.com/hugegraph/hugegrap…
Tsukilc Jan 11, 2026
e6f6487
fix(server): remove task.scheduler_type
Tsukilc Jan 11, 2026
b325dba
fix(server): remove task.scheduler_type
Tsukilc Jan 11, 2026
68b906a
Merge branch 'master' of https://github.com/Tsukilc/incubator-hugegra…
Tsukilc Jan 11, 2026
1113520
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
5ffd20b
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
a110112
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
a31e937
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
d89b9bd
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
5807fb7
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
f8fc58a
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
6dd52e4
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
af85bef
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
b332674
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
cac78f4
Merge branch 'master' into pr/2937
imbajin Feb 3, 2026
28e0390
fix(server): fix some issues of the distributed scheduler
Tsukilc Feb 10, 2026
5e30cac
Merge branch 'fix/scheduler' of https://github.com/Tsukilc/incubator-…
Tsukilc Feb 10, 2026
b70788f
Revert "fix(server): fix some issues of the distributed scheduler"
Tsukilc Feb 11, 2026
7ba40bd
fix(server): fix some issues of the distributed scheduler
Tsukilc Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker/configs/server2-conf/graphs/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serializer=binary
pd.peers=127.0.0.1:8686,127.0.0.1:8687,127.0.0.1:8688

# task config
task.scheduler_type=local
task.schedule_period=10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 这里已经移除了 task.scheduler_type,但同目录的 server1-conf / server3-conf 仍保留该配置项。由于本 PR 同时删除了 CoreOptions.SCHEDULER_TYPE,这会导致示例配置不一致,并在启动时产生冗余配置告警。建议同步清理另外两个 docker 配置文件,避免误导运维配置。

task.retry=0
task.wait_timeout=10
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ store=hugegraph
pd.peers=$PD_PEERS_LIST$

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder {
public static final ConfigOption<String> SERVER_ID =
new ConfigOption<>(
"server.id",
"The id of hugegraph-server.",
disallowEmpty(),
"server-1"
"The id of hugegraph-server, auto-generated if not specified.",
null,
""
);
public static final ConfigOption<String> SERVER_ROLE =
new ConfigOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.exception.ExistedException;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.io.HugeGraphSONModule;
import org.apache.hugegraph.k8s.K8sDriver;
Expand Down Expand Up @@ -195,7 +197,17 @@ public final class GraphManager {
public GraphManager(HugeConfig conf, EventHub hub) {
LOG.info("Init graph manager");
E.checkArgumentNotNull(conf, "The config can't be null");

// Auto-generate server.id if not configured.
// Random generation is to prevent duplicate id error reports.This id is currently
// meaningless and needs to be completely removed serverInfoManager in
// the future
String server = conf.get(ServerOptions.SERVER_ID);
if (StringUtils.isEmpty(server)) {
server = "server-" + UUID.randomUUID().toString().substring(0, 8);
LOG.info("Auto-generated server.id: {}", server);
conf.setProperty(ServerOptions.SERVER_ID.name(), server);
}
String role = conf.get(ServerOptions.SERVER_ROLE);

this.config = conf;
Expand All @@ -206,10 +218,6 @@ public GraphManager(HugeConfig conf, EventHub hub) {
conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S);
this.startIgnoreSingleGraphError = conf.get(
ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR);
E.checkArgument(server != null && !server.isEmpty(),
"The server name can't be null or empty");
E.checkArgument(role != null && !role.isEmpty(),
"The server role can't be null or empty");
this.graphsDir = conf.get(ServerOptions.GRAPHS);
this.cluster = conf.get(ServerOptions.CLUSTER);
this.graphSpaces = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -276,7 +284,7 @@ private static String serviceId(String graphSpace, Service.ServiceType type,
.replace("_", "-").toLowerCase();
}

private boolean usePD() {
public boolean usePD() {
return this.PDExist;
}

Expand Down Expand Up @@ -1557,6 +1565,14 @@ private void loadGraph(String name, String graphConfPath) {
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

// Transfer `pd.peers` from server config to graph config
// Only inject if not already configured in graph config
if (!config.containsKey("pd.peers")) {
Comment on lines +1569 to +1571
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pd.peers configuration is being transferred from server config to graph config (lines 1571-1574), but this happens unconditionally for all graphs during loadGraph. This means even graphs that don't need PD will get this configuration injected. Consider checking if the graph actually uses PD (e.g., by checking if it's an hstore backend) before injecting this configuration, or document why all graphs need this configuration.

Suggested change
// Transfer `pd.peers` from server config to graph config
// Only inject if not already configured in graph config
if (!config.containsKey("pd.peers")) {
// Transfer `pd.peers` from server config to graph config when PD is used
// Only inject if not already configured in graph config and backend uses PD
String backend = config.get(CoreOptions.BACKEND);
boolean backendUsesPd = "hstore".equalsIgnoreCase(backend);
if (backendUsesPd && !config.containsKey("pd.peers")) {

Copilot uses AI. Check for mistakes.
String pdPeers = this.conf.get(ServerOptions.PD_PEERS);
config.addProperty("pd.peers", pdPeers);
}

this.transferRoleWorkerConfig(config);

Graph graph = GraphFactory.open(config);
Expand Down Expand Up @@ -1637,10 +1653,6 @@ private void checkBackendVersionOrExit(HugeConfig config) {
private void initNodeRole() {
String id = config.get(ServerOptions.SERVER_ID);
String role = config.get(ServerOptions.SERVER_ROLE);
E.checkArgument(StringUtils.isNotEmpty(id),
"The server name can't be null or empty");
E.checkArgument(StringUtils.isNotEmpty(role),
"The server role can't be null or empty");

NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
boolean supportRoleElection = !nodeRole.computer() &&
Expand Down Expand Up @@ -1960,7 +1972,7 @@ public HugeGraph graph(String graphSpace, String name) {
} else if (graph instanceof HugeGraph) {
return (HugeGraph) graph;
}
throw new NotSupportException("graph instance of %s", graph.getClass());
throw new NotFoundException(String.format("Graph '%s' does not exist", name));
}

public void dropGraphLocal(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph {
private final BackendStoreProvider storeProvider;
private final TinkerPopTransaction tx;
private final RamTable ramtable;
private final String schedulerType;
private volatile boolean started;
private volatile boolean closed;
private volatile GraphMode mode;
Expand Down Expand Up @@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) {
this.closed = false;
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);

LockUtil.init(this.spaceGraphName());

Expand Down Expand Up @@ -315,6 +313,7 @@ public String backend() {
return this.storeProvider.type();
}

@Override
public BackendStoreInfo backendStoreInfo() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
// TODO: pass storeProvider.metaStore()
Expand Down Expand Up @@ -465,6 +464,7 @@ public void updateTime(Date updateTime) {
this.updateTime = updateTime;
}

@Override
public void waitStarted() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
Expand Down Expand Up @@ -1629,7 +1629,9 @@ public <T> void submitEphemeralJob(EphemeralJob<T> job) {

@Override
public String schedulerType() {
return StandardHugeGraph.this.schedulerType;
// Use distributed scheduler for hstore backend, otherwise use local
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schedulerType() method now determines the scheduler type based on whether the backend is hstore. However, this logic change is undocumented. Consider adding a comment explaining why hstore backends require distributed scheduling while other backends use local scheduling, as this is an important architectural decision.

Suggested change
// Use distributed scheduler for hstore backend, otherwise use local
/*
* HStore is a distributed backend: data and tasks may be handled by
* multiple graph servers that must coordinate scheduling and state.
* For this reason we require a distributed task scheduler when the
* backend is hstore so that jobs can be balanced and recovered
* across nodes. For other backends, the graph is served by a single
* server instance and tasks are executed locally, so a local
* in-process scheduler is sufficient and avoids the overhead of
* distributed coordination.
*/

Copilot uses AI. Check for mistakes.
// After the merger of rocksdb and hstore, consider whether to change this logic
return StandardHugeGraph.this.isHstore() ? "distributed" : "local";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder {
rangeInt(1, 500),
1
);
public static final ConfigOption<String> SCHEDULER_TYPE =
new ConfigOption<>(
"task.scheduler_type",
"The type of scheduler used in distribution system.",
allowValues("local", "distributed"),
"local"
);

public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
new ConfigOption<>(
"task.sync_deletion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.E;

// TODO: rename to GlobalNodeRoleInfo
// TODO: We need to completely delete the startup of master-worker
public final class GlobalMasterInfo {

private static final NodeInfo NO_MASTER = new NodeInfo(false, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.hugegraph.masterelection;

import java.util.Objects;

import org.apache.hugegraph.task.TaskManager;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import java.util.Objects;

public class StandardRoleListener implements RoleListener {

private static final Logger LOG = Log.logger(StandardRoleListener.class);
Expand All @@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener {
public StandardRoleListener(TaskManager taskManager,
GlobalMasterInfo roleInfo) {
this.taskManager = taskManager;
this.taskManager.enableRoleElection();
this.roleInfo = roleInfo;
this.selfIsMaster = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.slf4j.Logger;

public class DistributedTaskScheduler extends TaskAndResultScheduler {

private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
private final long schedulePeriod;
private final ExecutorService taskDbExecutor;
Expand Down Expand Up @@ -118,6 +121,11 @@ private static boolean sleep(long ms) {
public void cronSchedule() {
// Perform periodic scheduling tasks

// Check closed flag first to exit early
if (this.closed.get()) {
return;
}

if (!this.graph.started() || this.graph.closed()) {
return;
}
Expand Down Expand Up @@ -253,6 +261,10 @@ public <V> Future<?> schedule(HugeTask<V> task) {
return this.ephemeralTaskExecutor.submit(task);
}

// Validate task state before saving to ensure correct exception type
E.checkState(task.type() != null, "Task type can't be null");
E.checkState(task.name() != null, "Task name can't be null");

// Process schema task
// Handle gremlin task
// Handle OLAP calculation tasks
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 并发安全问题: 缺少 volatile 或同步保护

DistributedTaskScheduler.java:268-270:

if (this.closed.get()) {
    return;
}

问题:
虽然添加了 closed 检查,但 this.graph.started()this.graph.closed() 在后续调用时可能已经改变,存在 TOCTOU (Time-of-check to time-of-use) 问题。

建议:

Suggested change
// Handle OLAP calculation tasks
if (this.closed.get() || !this.graph.started() || this.graph.closed()) {
return;
}

将所有状态检查合并到一个 if 语句中,减少竞态窗口。

Expand Down Expand Up @@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 业务逻辑设计问题 - delete 方法的强制删除逻辑不一致

DistributedTaskScheduler.delete() 方法第 286-305 行,删除逻辑与原实现存在重大差异:

原逻辑:

  • force=false: 设置状态为 DELETING,返回 null
  • force=true: 直接从数据库删除

新逻辑:

if (!force) {
    if (!task.completed() && task.status() != TaskStatus.DELETING) {
        throw new IllegalArgumentException(
                String.format("Can't delete incomplete task '%s' in status %s, " +
                              "Please try to cancel the task first",
                              id, task.status()));
    }
}
return this.deleteFromDB(id);

问题:

  1. 移除了 DELETING 状态的设置逻辑,这可能破坏依赖定时清理的代码
  2. 非强制删除现在会直接删除完成的任务,而不是先标记为 DELETING
  3. StandardTaskScheduler 的实现可能不一致

建议:重新考虑删除流程,保持与原有逻辑的兼容性,或在 PR 描述中明确说明此行为变更

}

/**
* Note: This method will update the status of the input task.
*
* @param task
* @param <V>
*/
@Override
public <V> void cancel(HugeTask<V> task) {
// Update status to CANCELLING
if (!task.completed()) {
// Task not completed, can only execute status not CANCELLING
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
E.checkArgumentNotNull(task, "Task can't be null");

if (task.completed() || task.cancelling()) {
return;
}

LOG.info("Cancel task '{}' in status {}", task.id(), task.status());

// Check if task is running locally, cancel it directly if so
HugeTask<?> runningTask = this.runningTasks.get(task.id());
if (runningTask != null) {
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
task.overwriteStatus(TaskStatus.CANCELLED);
Comment on lines +318 to +320
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancel method now updates the task status directly by calling task.overwriteStatus(TaskStatus.CANCELLED) on line 320 when the task is running locally. However, this doesn't persist the status change to the database. The task status should be saved to ensure consistency between in-memory state and persisted state.

Suggested change
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
task.overwriteStatus(TaskStatus.CANCELLED);
TaskStatus previousStatus = task.status();
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
if (this.updateStatus(task.id(), previousStatus,
TaskStatus.CANCELLED)) {
task.overwriteStatus(TaskStatus.CANCELLED);
} else {
LOG.info("Failed to persist cancelled status for task '{}', " +
"status may have changed from {}",
task.id(), previousStatus);
}

Copilot uses AI. Check for mistakes.
}
LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
return;
}

// Task not running locally, update status to CANCELLING
// for cronSchedule() or other nodes to handle
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 这里使用 currentStatus 做 CAS 更新有明显竞态:调用方读取任务后到执行 cancel() 之间,任务可能从 QUEUED 变成 RUNNING,导致 updateStatus(..., currentStatus, CANCELLING) 失败并直接返回,取消请求被静默丢弃。建议 CAS 失败后读取最新状态并做一次兜底更新(只要未完成就转 CANCELLING)。

Suggested change
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
HugeTask<?> latest = this.taskWithoutResult(task.id());
if (!latest.completed() && !latest.cancelling()) {
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
task.overwriteStatus(TaskStatus.CANCELLING);
}
} else {
task.overwriteStatus(TaskStatus.CANCELLING);
}

LOG.info("Failed to cancel task '{}', status may have changed from {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 状态更新失败后的任务状态不一致

DistributedTaskScheduler.java:323-330:

if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
    LOG.info("Failed to cancel task '{}', status may have changed from {}",
             task.id(), currentStatus);
} else {
    task.overwriteStatus(TaskStatus.CANCELLING);
}

问题:

  • updateStatus 返回 false 时(数据库更新失败),内存中的 task 对象状态未同步
  • 调用者可能认为 cancel 成功,但数据库中状态未改变
  • 缺少错误处理:应该重新读取最新状态或抛出异常

建议:

Suggested change
LOG.info("Failed to cancel task '{}', status may have changed from {}",
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
// Status changed concurrently, reload from DB
HugeTask<?> latestTask = this.taskWithoutResult(task.id());
LOG.info("Failed to cancel task '{}': status changed from {} to {}",
task.id(), currentStatus, latestTask.status());
task.overwriteStatus(latestTask.status());
} else {
task.overwriteStatus(TaskStatus.CANCELLING);
}

task.id(), currentStatus);
} else {
LOG.info("cancel task({}) error, task has completed", task.id());
task.overwriteStatus(TaskStatus.CANCELLING);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 代码可维护性 - TODO 注释应该更具体

第 334 行的 TODO 注释过于模糊:

//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;

问题:

  1. 未说明为什么要移除 serverInfoManager
  2. 未说明移除的时间节点或前提条件
  3. 注释掉的代码应该删除,而不是保留

建议:

Suggested change
}
// TODO(issue-XXX): Remove serverInfoManager.close() after migrating to
// pure single-node architecture. Currently kept for backward compatibility.
return this.serverManager().close();


Expand All @@ -316,14 +355,18 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {

@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
if (!force) {
// Change status to DELETING, perform the deletion operation through automatic
// scheduling.
HugeTask<?> task = this.taskWithoutResult(id);

if (!force && !task.completed()) {
// Check task status: can't delete running tasks without force
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
} else {
return this.deleteFromDB(id);
// Already in DELETING status, delete directly from DB
// Completed tasks can also be deleted directly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 任务删除逻辑错误

DistributedTaskScheduler.java:355-366:

if (!force && !task.completed()) {
    this.updateStatus(id, null, TaskStatus.DELETING);
    return null;
    // 下面的代码永远无法执行
}

问题:

  1. Line 360 的注释 "Already in DELETING status" 放在了 return null 之后,永远无法到达
  2. 逻辑不清晰:应该区分「未完成的任务」和「已经是 DELETING 状态的任务」

建议:

Suggested change
// Completed tasks can also be deleted directly
HugeTask<?> task = this.taskWithoutResult(id);
if (!force && !task.completed()) {
// Non-force mode: mark incomplete tasks as DELETING for async cleanup
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
}
// Force mode OR task is completed OR already DELETING: delete directly
return this.deleteFromDB(id);

}

// Delete from DB directly for completed/DELETING tasks or force=true
return this.deleteFromDB(id);
}

@Override
Expand Down Expand Up @@ -353,6 +396,18 @@ public boolean close() {
cronFuture.cancel(false);
}

// Wait for cron task to complete to ensure all transactions are closed
try {
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
} catch (CancellationException e) {
// Task was cancelled, this is expected
LOG.debug("Cron task was cancelled");
} catch (TimeoutException e) {
LOG.warn("Cron task did not complete in time when closing scheduler");
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception while waiting for cron task to complete", e);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 等待超时可能导致数据库事务泄漏

DistributedTaskScheduler.java:399-410:

cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);

问题:

  1. 如果 cron 任务超时,后续的 taskDbExecutor 清理可能在 cron 任务仍持有事务时执行
  2. 注释说 "ensure all transactions are closed",但超时时无法保证
  3. schedulePeriod 可能很大(如 60 秒),导致关闭等待过长

建议:

Suggested change
// Wait for cron task with reasonable timeout
long waitTime = Math.min(schedulePeriod + 5, 15); // Cap at 15 seconds
try {
cronFuture.get(waitTime, TimeUnit.SECONDS);
} catch (CancellationException e) {
LOG.debug("Cron task was cancelled");
} catch (TimeoutException e) {
LOG.warn("Cron task did not complete in {}s, proceeding with shutdown", waitTime);
// Force interrupt the cron task thread if possible
cronFuture.cancel(true);
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception while waiting for cron task to complete", e);
}

if (!this.taskDbExecutor.isShutdown()) {
this.call(() -> {
try {
Expand All @@ -363,7 +418,10 @@ public boolean close() {
this.graph.closeTx();
});
}
return true;

//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 TODO 注释应该追踪到 Issue

DistributedTaskScheduler.java:422-424:

//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;

建议:
将 TODO 关联到 GitHub Issue,方便追踪:

Suggested change
//return true;
// TODO(#issue-number): Remove serverInfoManager completely after full single-node migration
return this.serverManager().close();

并删除注释掉的代码 //return true;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,6 @@ public static HugeServerInfo fromVertex(Vertex vertex) {
return serverInfo;
}

public <V> boolean suitableFor(HugeTask<V> task, long now) {
if (task.computer() != this.role.computer()) {
return false;
}
return this.updateTime.getTime() + EXPIRED_INTERVAL >= now &&
this.load() + task.load() <= this.maxLoad;
}

public static Schema schema(HugeGraphParams graph) {
return new Schema(graph);
}
Expand Down
Loading
Loading