diff --git a/docker/configs/server2-conf/graphs/hugegraph.properties b/docker/configs/server2-conf/graphs/hugegraph.properties index 66cbccb731..b48bab5ea8 100644 --- a/docker/configs/server2-conf/graphs/hugegraph.properties +++ b/docker/configs/server2-conf/graphs/hugegraph.properties @@ -13,7 +13,6 @@ serializer=binary pd.peers=127.0.0.1:8686,127.0.0.1:8687,127.0.0.1:8688 # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template b/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template index 005031fe60..f97e365748 100644 --- a/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template +++ b/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template @@ -45,7 +45,6 @@ store=hugegraph pd.peers=$PD_PEERS_LIST$ # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index 278542854b..f699ac199c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder { public static final ConfigOption SERVER_ID = new ConfigOption<>( "server.id", - "The id of hugegraph-server.", - disallowEmpty(), - "server-1" + "The id of hugegraph-server, auto-generated if not specified.", + null, + "" ); public static final ConfigOption SERVER_ROLE = new ConfigOption<>( diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index eda050e16b..b92d5c536f 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -68,6 +69,7 @@ import org.apache.hugegraph.config.TypedOption; import org.apache.hugegraph.event.EventHub; import org.apache.hugegraph.exception.ExistedException; +import org.apache.hugegraph.exception.NotFoundException; import org.apache.hugegraph.exception.NotSupportException; import org.apache.hugegraph.io.HugeGraphSONModule; import org.apache.hugegraph.k8s.K8sDriver; @@ -195,7 +197,17 @@ public final class GraphManager { public GraphManager(HugeConfig conf, EventHub hub) { LOG.info("Init graph manager"); E.checkArgumentNotNull(conf, "The config can't be null"); + + // Auto-generate server.id if not configured. + // Random generation is to prevent duplicate id error reports.This id is currently + // meaningless and needs to be completely removed serverInfoManager in + // the future String server = conf.get(ServerOptions.SERVER_ID); + if (StringUtils.isEmpty(server)) { + server = "server-" + UUID.randomUUID().toString().substring(0, 8); + LOG.info("Auto-generated server.id: {}", server); + conf.setProperty(ServerOptions.SERVER_ID.name(), server); + } String role = conf.get(ServerOptions.SERVER_ROLE); this.config = conf; @@ -206,10 +218,6 @@ public GraphManager(HugeConfig conf, EventHub hub) { conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S); this.startIgnoreSingleGraphError = conf.get( ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR); - E.checkArgument(server != null && !server.isEmpty(), - "The server name can't be null or empty"); - E.checkArgument(role != null && !role.isEmpty(), - "The server role can't be null or empty"); this.graphsDir = conf.get(ServerOptions.GRAPHS); this.cluster = conf.get(ServerOptions.CLUSTER); this.graphSpaces = new ConcurrentHashMap<>(); @@ -276,7 +284,7 @@ private static String serviceId(String graphSpace, Service.ServiceType type, .replace("_", "-").toLowerCase(); } - private boolean usePD() { + public boolean usePD() { return this.PDExist; } @@ -1557,6 +1565,14 @@ private void loadGraph(String name, String graphConfPath) { String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS); config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), raftGroupPeers); + + // Transfer `pd.peers` from server config to graph config + // Only inject if not already configured in graph config + if (!config.containsKey("pd.peers")) { + String pdPeers = this.conf.get(ServerOptions.PD_PEERS); + config.addProperty("pd.peers", pdPeers); + } + this.transferRoleWorkerConfig(config); Graph graph = GraphFactory.open(config); @@ -1637,10 +1653,6 @@ private void checkBackendVersionOrExit(HugeConfig config) { private void initNodeRole() { String id = config.get(ServerOptions.SERVER_ID); String role = config.get(ServerOptions.SERVER_ROLE); - E.checkArgument(StringUtils.isNotEmpty(id), - "The server name can't be null or empty"); - E.checkArgument(StringUtils.isNotEmpty(role), - "The server role can't be null or empty"); NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); boolean supportRoleElection = !nodeRole.computer() && @@ -1960,7 +1972,7 @@ public HugeGraph graph(String graphSpace, String name) { } else if (graph instanceof HugeGraph) { return (HugeGraph) graph; } - throw new NotSupportException("graph instance of %s", graph.getClass()); + throw new NotFoundException(String.format("Graph '%s' does not exist", name)); } public void dropGraphLocal(String name) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index faf97aa8d6..5864e2a615 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph { private final BackendStoreProvider storeProvider; private final TinkerPopTransaction tx; private final RamTable ramtable; - private final String schedulerType; private volatile boolean started; private volatile boolean closed; private volatile GraphMode mode; @@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) { this.closed = false; this.mode = GraphMode.NONE; this.readMode = GraphReadMode.OLTP_ONLY; - this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE); LockUtil.init(this.spaceGraphName()); @@ -315,6 +313,7 @@ public String backend() { return this.storeProvider.type(); } + @Override public BackendStoreInfo backendStoreInfo() { // Just for trigger Tx.getOrNewTransaction, then load 3 stores // TODO: pass storeProvider.metaStore() @@ -465,6 +464,7 @@ public void updateTime(Date updateTime) { this.updateTime = updateTime; } + @Override public void waitStarted() { // Just for trigger Tx.getOrNewTransaction, then load 3 stores this.schemaTransaction(); @@ -1629,7 +1629,9 @@ public void submitEphemeralJob(EphemeralJob job) { @Override public String schedulerType() { - return StandardHugeGraph.this.schedulerType; + // Use distributed scheduler for hstore backend, otherwise use local + // After the merger of rocksdb and hstore, consider whether to change this logic + return StandardHugeGraph.this.isHstore() ? "distributed" : "local"; } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index ba4d4a1c0e..72a2da9324 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder { rangeInt(1, 500), 1 ); - public static final ConfigOption SCHEDULER_TYPE = - new ConfigOption<>( - "task.scheduler_type", - "The type of scheduler used in distribution system.", - allowValues("local", "distributed"), - "local" - ); + public static final ConfigOption TASK_SYNC_DELETION = new ConfigOption<>( "task.sync_deletion", diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java index c345c50e60..4856744459 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java @@ -22,7 +22,7 @@ import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.E; -// TODO: rename to GlobalNodeRoleInfo +// TODO: We need to completely delete the startup of master-worker public final class GlobalMasterInfo { private static final NodeInfo NO_MASTER = new NodeInfo(false, ""); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java index dbbea6d91e..74515dacec 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java @@ -17,12 +17,12 @@ package org.apache.hugegraph.masterelection; -import java.util.Objects; - import org.apache.hugegraph.task.TaskManager; import org.apache.hugegraph.util.Log; import org.slf4j.Logger; +import java.util.Objects; + public class StandardRoleListener implements RoleListener { private static final Logger LOG = Log.logger(StandardRoleListener.class); @@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener { public StandardRoleListener(TaskManager taskManager, GlobalMasterInfo roleInfo) { this.taskManager = taskManager; - this.taskManager.enableRoleElection(); this.roleInfo = roleInfo; this.selfIsMaster = false; } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java index b4bba2ea12..7c143fb33d 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java @@ -19,7 +19,9 @@ import java.util.Iterator; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,6 +50,7 @@ import org.slf4j.Logger; public class DistributedTaskScheduler extends TaskAndResultScheduler { + private static final Logger LOG = Log.logger(DistributedTaskScheduler.class); private final long schedulePeriod; private final ExecutorService taskDbExecutor; @@ -118,6 +121,11 @@ private static boolean sleep(long ms) { public void cronSchedule() { // Perform periodic scheduling tasks + // Check closed flag first to exit early + if (this.closed.get()) { + return; + } + if (!this.graph.started() || this.graph.closed()) { return; } @@ -253,6 +261,10 @@ public Future schedule(HugeTask task) { return this.ephemeralTaskExecutor.submit(task); } + // Validate task state before saving to ensure correct exception type + E.checkState(task.type() != null, "Task type can't be null"); + E.checkState(task.name() != null, "Task name can't be null"); + // Process schema task // Handle gremlin task // Handle OLAP calculation tasks @@ -284,14 +296,41 @@ protected void initTaskParams(HugeTask task) { } } + /** + * Note: This method will update the status of the input task. + * + * @param task + * @param + */ @Override public void cancel(HugeTask task) { - // Update status to CANCELLING - if (!task.completed()) { - // Task not completed, can only execute status not CANCELLING - this.updateStatus(task.id(), null, TaskStatus.CANCELLING); + E.checkArgumentNotNull(task, "Task can't be null"); + + if (task.completed() || task.cancelling()) { + return; + } + + LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); + + // Check if task is running locally, cancel it directly if so + HugeTask runningTask = this.runningTasks.get(task.id()); + if (runningTask != null) { + boolean cancelled = runningTask.cancel(true); + if (cancelled) { + task.overwriteStatus(TaskStatus.CANCELLED); + } + LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled); + return; + } + + // Task not running locally, update status to CANCELLING + // for cronSchedule() or other nodes to handle + TaskStatus currentStatus = task.status(); + if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) { + LOG.info("Failed to cancel task '{}', status may have changed from {}", + task.id(), currentStatus); } else { - LOG.info("cancel task({}) error, task has completed", task.id()); + task.overwriteStatus(TaskStatus.CANCELLING); } } @@ -316,14 +355,18 @@ protected HugeTask deleteFromDB(Id id) { @Override public HugeTask delete(Id id, boolean force) { - if (!force) { - // Change status to DELETING, perform the deletion operation through automatic - // scheduling. + HugeTask task = this.taskWithoutResult(id); + + if (!force && !task.completed()) { + // Check task status: can't delete running tasks without force this.updateStatus(id, null, TaskStatus.DELETING); return null; - } else { - return this.deleteFromDB(id); + // Already in DELETING status, delete directly from DB + // Completed tasks can also be deleted directly } + + // Delete from DB directly for completed/DELETING tasks or force=true + return this.deleteFromDB(id); } @Override @@ -353,6 +396,18 @@ public boolean close() { cronFuture.cancel(false); } + // Wait for cron task to complete to ensure all transactions are closed + try { + cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS); + } catch (CancellationException e) { + // Task was cancelled, this is expected + LOG.debug("Cron task was cancelled"); + } catch (TimeoutException e) { + LOG.warn("Cron task did not complete in time when closing scheduler"); + } catch (ExecutionException | InterruptedException e) { + LOG.warn("Exception while waiting for cron task to complete", e); + } + if (!this.taskDbExecutor.isShutdown()) { this.call(() -> { try { @@ -363,7 +418,10 @@ public boolean close() { this.graph.closeTx(); }); } - return true; + + //todo: serverInfoManager section should be removed in the future. + return this.serverManager().close(); + //return true; } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java index 71feb3f688..f0485f6656 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java @@ -209,14 +209,6 @@ public static HugeServerInfo fromVertex(Vertex vertex) { return serverInfo; } - public boolean suitableFor(HugeTask task, long now) { - if (task.computer() != this.role.computer()) { - return false; - } - return this.updateTime.getTime() + EXPIRED_INTERVAL >= now && - this.load() + task.load() <= this.maxLoad; - } - public static Schema schema(HugeGraphParams graph) { return new Schema(graph); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index bcef869017..d4b0f27ad2 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -19,7 +19,6 @@ import static org.apache.hugegraph.backend.query.Query.NO_LIMIT; -import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.concurrent.Callable; @@ -35,7 +34,6 @@ import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.exception.ConnectionException; -import org.apache.hugegraph.iterator.ListIterator; import org.apache.hugegraph.iterator.MapperIterator; import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.schema.PropertyKey; @@ -64,7 +62,6 @@ public class ServerInfoManager { private volatile GlobalMasterInfo globalNodeInfo; - private volatile boolean onlySingleNode; private volatile boolean closed; public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { @@ -76,7 +73,6 @@ public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { this.globalNodeInfo = null; - this.onlySingleNode = false; this.closed = false; } @@ -115,7 +111,7 @@ public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) { try { Thread.sleep(existed.expireTime() - now + 1); } catch (InterruptedException e) { - throw new HugeException("Interrupted when waiting for server info expired", e); + throw new HugeException("Interrupted when waiting for server info expired", e); } } E.checkArgument(existed == null || !existed.alive(), @@ -176,11 +172,6 @@ public boolean selfIsMaster() { return this.selfNodeRole() != null && this.selfNodeRole().master(); } - public boolean onlySingleNode() { - // Only exists one node in the whole master - return this.onlySingleNode; - } - public synchronized void heartbeat() { assert this.graphIsReady(); @@ -212,13 +203,6 @@ public synchronized void heartbeat() { assert serverInfo != null; } - public synchronized void decreaseLoad(int load) { - assert load > 0 : load; - HugeServerInfo serverInfo = this.selfServerInfo(); - serverInfo.increaseLoad(-load); - this.save(serverInfo); - } - public int calcMaxLoad() { // TODO: calc max load based on CPU and Memory resources return 10000; @@ -228,48 +212,6 @@ protected boolean graphIsReady() { return !this.closed && this.graph.started() && this.graph.initialized(); } - protected synchronized HugeServerInfo pickWorkerNode(Collection servers, - HugeTask task) { - HugeServerInfo master = null; - HugeServerInfo serverWithMinLoad = null; - int minLoad = Integer.MAX_VALUE; - boolean hasWorkerNode = false; - long now = DateUtil.now().getTime(); - - // Iterate servers to find suitable one - for (HugeServerInfo server : servers) { - if (!server.alive()) { - continue; - } - if (server.role().master()) { - master = server; - continue; - } - hasWorkerNode = true; - if (!server.suitableFor(task, now)) { - continue; - } - if (server.load() < minLoad) { - minLoad = server.load(); - serverWithMinLoad = server; - } - } - - boolean singleNode = !hasWorkerNode; - if (singleNode != this.onlySingleNode) { - LOG.info("Switch only_single_node to {}", singleNode); - this.onlySingleNode = singleNode; - } - - // Only schedule to master if there are no workers and master are suitable - if (!hasWorkerNode) { - if (master != null && master.suitableFor(task, now)) { - serverWithMinLoad = master; - } - } - return serverWithMinLoad; - } - private GraphTransaction tx() { assert Thread.currentThread().getName().contains("server-info-db-worker"); return this.graph.systemTransaction(); @@ -299,33 +241,6 @@ private Id save(HugeServerInfo serverInfo) { }); } - private int save(Collection serverInfos) { - return this.call(() -> { - if (serverInfos.isEmpty()) { - return 0; - } - HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); - if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { - throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER); - } - // Save server info in batch - GraphTransaction tx = this.tx(); - int updated = 0; - for (HugeServerInfo server : serverInfos) { - if (!server.updated()) { - continue; - } - HugeVertex vertex = tx.constructVertex(false, server.asArray()); - tx.addVertex(vertex); - updated++; - } - // NOTE: actually it is auto-commit, to be improved - tx.commitOrRollback(); - - return updated; - }); - } - private V call(Callable callable) { assert !Thread.currentThread().getName().startsWith( "server-info-db-worker") : "can't call by itself"; @@ -388,24 +303,6 @@ private HugeServerInfo removeServerInfo(Id serverId) { }); } - protected void updateServerInfos(Collection serverInfos) { - this.save(serverInfos); - } - - protected Collection allServerInfos() { - Iterator infos = this.serverInfos(NO_LIMIT, null); - try (ListIterator iter = new ListIterator<>( - MAX_SERVERS, infos)) { - return iter.list(); - } catch (Exception e) { - throw new HugeException("Failed to close server info iterator", e); - } - } - - protected Iterator serverInfos(String page) { - return this.serverInfos(ImmutableMap.of(), PAGE_SIZE, page); - } - protected Iterator serverInfos(long limit, String page) { return this.serverInfos(ImmutableMap.of(), limit, page); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 5f60792af1..79dd98c0f4 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -18,7 +18,6 @@ package org.apache.hugegraph.task; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -125,11 +124,9 @@ private TaskTransaction tx() { // NOTE: only the owner thread can access task tx if (this.taskTx == null) { /* - * NOTE: don't synchronized(this) due to scheduler thread hold - * this lock through scheduleTasks(), then query tasks and wait - * for db-worker thread after call(), the tx may not be initialized - * but can't catch this lock, then cause deadlock. - * We just use this.serverManager as a monitor here + * NOTE: don't synchronized(this) to avoid potential deadlock + * when multiple threads are accessing task transaction. + * We use this.serverManager as a monitor here for thread safety. */ synchronized (this.serverManager) { if (this.taskTx == null) { @@ -146,9 +143,9 @@ private TaskTransaction tx() { @Override public void restoreTasks() { - Id selfServer = this.serverManager().selfNodeId(); List> taskList = new ArrayList<>(); // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order. + // Single-node mode: restore all pending tasks without server filtering for (TaskStatus status : TaskStatus.PENDING_STATUSES) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { @@ -156,9 +153,7 @@ public void restoreTasks() { for (iter = this.findTask(status, PAGE_SIZE, page); iter.hasNext(); ) { HugeTask task = iter.next(); - if (selfServer.equals(task.server())) { - taskList.add(task); - } + taskList.add(task); } if (page != null) { page = PageInfo.pageInfo(iter); @@ -211,30 +206,9 @@ public Future schedule(HugeTask task) { return this.submitTask(task); } - // Check this is on master for normal task schedule - this.checkOnMasterNode("schedule"); - if (this.serverManager().onlySingleNode() && !task.computer()) { - /* - * Speed up for single node, submit the task immediately, - * this code can be removed without affecting code logic - */ - task.status(TaskStatus.QUEUED); - task.server(this.serverManager().selfNodeId()); - this.save(task); - return this.submitTask(task); - } else { - /* - * Just set the SCHEDULING status and save the task, - * it will be scheduled by periodic scheduler worker - */ - task.status(TaskStatus.SCHEDULING); - this.save(task); - - // Notify master server to schedule and execute immediately - TaskManager.instance().notifyNewTask(task); - - return task; - } + task.status(TaskStatus.QUEUED); + this.save(task); + return this.submitTask(task); } private Future submitTask(HugeTask task) { @@ -273,7 +247,6 @@ public void initTaskCallable(HugeTask task) { @Override public synchronized void cancel(HugeTask task) { E.checkArgumentNotNull(task, "Task can't be null"); - this.checkOnMasterNode("cancel"); if (task.completed() || task.cancelling()) { return; @@ -281,31 +254,15 @@ public synchronized void cancel(HugeTask task) { LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); - if (task.server() == null) { - // The task not scheduled to workers, set canceled immediately - assert task.status().code() < TaskStatus.QUEUED.code(); - if (task.status(TaskStatus.CANCELLED)) { - this.save(task); - return; - } - } else if (task.status(TaskStatus.CANCELLING)) { - // The task scheduled to workers, let the worker node to cancel + HugeTask memTask = this.tasks.get(task.id()); + if (memTask != null) { + boolean cancelled = memTask.cancel(true); + LOG.info("Task '{}' cancel result: {}", task.id(), cancelled); + return; + } + + if (task.status(TaskStatus.CANCELLED)) { this.save(task); - assert task.server() != null : task; - assert this.serverManager().selfIsMaster(); - if (!task.server().equals(this.serverManager().selfNodeId())) { - /* - * Remove the task from memory if it's running on worker node, - * but keep the task in memory if it's running on master node. - * Cancel-scheduling will read the task from backend store, if - * removed this instance from memory, there will be two task - * instances with the same id, and can't cancel the real task that - * is running but removed from memory. - */ - this.remove(task); - } - // Notify master server to schedule and execute immediately - TaskManager.instance().notifyNewTask(task); return; } @@ -318,128 +275,11 @@ public ServerInfoManager serverManager() { return this.serverManager; } - protected synchronized void scheduleTasksOnMaster() { - // Master server schedule all scheduling tasks to suitable worker nodes - Collection serverInfos = this.serverManager().allServerInfos(); - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - if (task.server() != null) { - // Skip if already scheduled - continue; - } - - if (!this.serverManager.selfIsMaster()) { - return; - } - - HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task); - if (server == null) { - LOG.info("The master can't find suitable servers to " + - "execute task '{}', wait for next schedule", task.id()); - continue; - } - - // Found suitable server, update task status - assert server.id() != null; - task.server(server.id()); - task.status(TaskStatus.SCHEDULED); - this.save(task); - - // Update server load in memory, it will be saved at the ending - server.increaseLoad(task.load()); - - LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id()); - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - - // Save to store - this.serverManager().updateServerInfos(serverInfos); - } - - protected void executeTasksOnWorker(Id server) { - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - this.initTaskCallable(task); - Id taskServer = task.server(); - if (taskServer == null) { - LOG.warn("Task '{}' may not be scheduled", task.id()); - continue; - } - HugeTask memTask = this.tasks.get(task.id()); - if (memTask != null) { - assert memTask.status().code() > task.status().code(); - continue; - } - if (taskServer.equals(server)) { - task.status(TaskStatus.QUEUED); - this.save(task); - this.submitTask(task); - } - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - } - - protected void cancelTasksOnWorker(Id server) { - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - Id taskServer = task.server(); - if (taskServer == null) { - LOG.warn("Task '{}' may not be scheduled", task.id()); - continue; - } - if (!taskServer.equals(server)) { - continue; - } - /* - * Task may be loaded from backend store and not initialized. - * like: A task is completed but failed to save in the last - * step, resulting in the status of the task not being - * updated to storage, the task is not in memory, so it's not - * initialized when canceled. - */ - HugeTask memTask = this.tasks.get(task.id()); - if (memTask != null) { - task = memTask; - } else { - this.initTaskCallable(task); - } - boolean cancelled = task.cancel(true); - LOG.info("Server '{}' cancel task '{}' with cancelled={}", - server, task.id(), cancelled); - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - } - @Override public void taskDone(HugeTask task) { this.remove(task); - - Id selfServerId = this.serverManager().selfNodeId(); - try { - this.serverManager().decreaseLoad(task.load()); - } catch (Throwable e) { - LOG.error("Failed to decrease load for task '{}' on server '{}'", - task.id(), selfServerId, e); - } - LOG.debug("Task '{}' done on server '{}'", task.id(), selfServerId); + // Single-node mode: no need to manage load + LOG.debug("Task '{}' done", task.id()); } protected void remove(HugeTask task) { @@ -738,10 +578,9 @@ public V call(Callable callable) { } } + @Deprecated private void checkOnMasterNode(String op) { - if (!this.serverManager().selfIsMaster()) { - throw new HugeException("Can't %s task on non-master server", op); - } + // Single-node mode: all operations are allowed, no role check needed } private boolean supportsPaging() { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java index 2ba3fd8a6d..6c99ef156d 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java @@ -46,6 +46,7 @@ * Base class of task & result scheduler */ public abstract class TaskAndResultScheduler implements TaskScheduler { + /** * Which graph the scheduler belongs to */ @@ -61,8 +62,8 @@ public abstract class TaskAndResultScheduler implements TaskScheduler { private final ServerInfoManager serverManager; public TaskAndResultScheduler( - HugeGraphParams graph, - ExecutorService serverInfoDbExecutor) { + HugeGraphParams graph, + ExecutorService serverInfoDbExecutor) { E.checkNotNull(graph, "graph"); this.graph = graph; @@ -90,7 +91,7 @@ public void save(HugeTask task) { // Save result outcome if (rawResult != null) { HugeTaskResult result = - new HugeTaskResult(HugeTaskResult.genId(task.id())); + new HugeTaskResult(HugeTaskResult.genId(task.id())); result.result(rawResult); this.call(() -> { @@ -164,7 +165,7 @@ protected Iterator> queryTask(Map conditions, } Iterator vertices = this.tx().queryTaskInfos(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -180,16 +181,16 @@ protected Iterator> queryTask(Map conditions, protected Iterator> queryTask(List ids) { ListIterator> ts = this.call( - () -> { - Object[] idArray = ids.toArray(new Id[ids.size()]); - Iterator vertices = this.tx() - .queryTaskInfos(idArray); - Iterator> tasks = - new MapperIterator<>(vertices, - HugeTask::fromVertex); - // Convert iterator to list to avoid across thread tx accessed - return QueryResults.toList(tasks); - }); + () -> { + Object[] idArray = ids.toArray(new Id[ids.size()]); + Iterator vertices = this.tx() + .queryTaskInfos(idArray); + Iterator> tasks = + new MapperIterator<>(vertices, + HugeTask::fromVertex); + // Convert iterator to list to avoid across thread tx accessed + return QueryResults.toList(tasks); + }); Iterator results = queryTaskResult(ids); @@ -201,7 +202,7 @@ protected Iterator> queryTask(List ids) { return new MapperIterator<>(ts, (task) -> { HugeTaskResult taskResult = - resultCaches.get(HugeTaskResult.genId(task.id())); + resultCaches.get(HugeTaskResult.genId(task.id())); if (taskResult != null) { task.result(taskResult); } @@ -219,6 +220,10 @@ protected HugeTask taskWithoutResult(Id id) { return HugeTask.fromVertex(vertex); }); + if (result == null) { + throw new NotFoundException("Can't find task with id '%s'", id); + } + return result; } @@ -227,7 +232,7 @@ protected Iterator> tasksWithoutResult(List ids) { Object[] idArray = ids.toArray(new Id[ids.size()]); Iterator vertices = this.tx().queryTaskInfos(idArray); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -250,7 +255,7 @@ protected Iterator> queryTaskWithoutResult(String key, } protected Iterator> queryTaskWithoutResult(Map conditions, long limit, String page) { + Object> conditions, long limit, String page) { return this.call(() -> { ConditionQuery query = new ConditionQuery(HugeType.TASK); if (page != null) { @@ -268,7 +273,7 @@ protected Iterator> queryTaskWithoutResult(Map vertices = this.tx().queryTaskInfos(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -277,7 +282,7 @@ protected Iterator> queryTaskWithoutResult(Map { Iterator vertices = - this.tx().queryTaskInfos(HugeTaskResult.genId(taskid)); + this.tx().queryTaskInfos(HugeTaskResult.genId(taskid)); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; @@ -292,12 +297,12 @@ protected HugeTaskResult queryTaskResult(Id taskid) { protected Iterator queryTaskResult(List taskIds) { return this.call(() -> { Object[] idArray = - taskIds.stream().map(HugeTaskResult::genId).toArray(); + taskIds.stream().map(HugeTaskResult::genId).toArray(); Iterator vertices = this.tx() .queryTaskInfos(idArray); Iterator tasks = - new MapperIterator<>(vertices, - HugeTaskResult::fromVertex); + new MapperIterator<>(vertices, + HugeTaskResult::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 277822a386..9ce9762743 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -18,7 +18,6 @@ package org.apache.hugegraph.task; import java.util.Map; -import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -33,7 +32,6 @@ import org.apache.hugegraph.util.Consumers; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.ExecutorUtil; -import org.apache.hugegraph.util.LockUtil; import org.apache.hugegraph.util.Log; import org.slf4j.Logger; @@ -76,8 +74,6 @@ public final class TaskManager { private final ExecutorService ephemeralTaskExecutor; private final PausableScheduledThreadPool distributedSchedulerExecutor; - private boolean enableRoleElected = false; - public static TaskManager instance() { return MANAGER; } @@ -102,11 +98,6 @@ private TaskManager(int pool) { // For a schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); - // Start after 10x period time waiting for HugeGraphServer startup - this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob, - 10 * SCHEDULE_PERIOD, - SCHEDULE_PERIOD, - TimeUnit.MILLISECONDS); } public void addScheduler(HugeGraphParams graph) { @@ -230,14 +221,6 @@ private void closeDistributedSchedulerTx(HugeGraphParams graph) { } } - public void pauseScheduledThreadPool() { - this.schedulerExecutor.pauseSchedule(); - } - - public void resumeScheduledThreadPool() { - this.schedulerExecutor.resumeSchedule(); - } - public TaskScheduler getScheduler(HugeGraphParams graph) { return this.schedulers.get(graph); } @@ -349,10 +332,6 @@ public int pendingTasks() { return size; } - public void enableRoleElection() { - this.enableRoleElected = true; - } - public void onAsRoleMaster() { try { for (TaskScheduler entry : this.schedulers.values()) { @@ -385,91 +364,6 @@ public void onAsRoleWorker() { } } - void notifyNewTask(HugeTask task) { - Queue queue = this.schedulerExecutor - .getQueue(); - if (queue.size() <= 1) { - /* - * Notify to schedule tasks initiatively when have new task - * It's OK to not notify again if there are more than one task in - * queue(like two, one is timer task, one is immediate task), - * we don't want too many immediate tasks to be inserted into queue, - * one notify will cause all the tasks to be processed. - */ - this.schedulerExecutor.submit(this::scheduleOrExecuteJob); - } - } - - private void scheduleOrExecuteJob() { - // Called by scheduler timer - try { - for (TaskScheduler entry : this.schedulers.values()) { - // Maybe other threads close&remove scheduler at the same time - synchronized (entry) { - this.scheduleOrExecuteJobForGraph(entry); - } - } - } catch (Throwable e) { - LOG.error("Exception occurred when schedule job", e); - } - } - - private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) { - E.checkNotNull(scheduler, "scheduler"); - - if (scheduler instanceof StandardTaskScheduler) { - StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler); - ServerInfoManager serverManager = scheduler.serverManager(); - String spaceGraphName = scheduler.spaceGraphName(); - - LockUtil.lock(spaceGraphName, LockUtil.GRAPH_LOCK); - try { - /* - * Skip if: - * graph is closed (iterate schedulers before graph is closing) - * or - * graph is not initialized(maybe truncated or cleared). - * - * If graph is closing by other thread, current thread get - * serverManager and try lock graph, at the same time other - * thread deleted the lock-group, current thread would get - * exception 'LockGroup xx does not exists'. - * If graph is closed, don't call serverManager.initialized() - * due to it will reopen graph tx. - */ - if (!serverManager.graphIsReady()) { - return; - } - - // Update server heartbeat - serverManager.heartbeat(); - - /* - * Master will schedule tasks to suitable servers. - * Note a Worker may become to a Master, so elected-Master also needs to - * execute tasks assigned by previous Master when enableRoleElected=true. - * However, when enableRoleElected=false, a Master is only set by the - * config assignment, assigned-Master always stays the same state. - */ - if (serverManager.selfIsMaster()) { - standardTaskScheduler.scheduleTasksOnMaster(); - if (!this.enableRoleElected && !serverManager.onlySingleNode()) { - // assigned-Master + non-single-node don't need to execute tasks - return; - } - } - - // Execute queued tasks scheduled to current server - standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId()); - - // Cancel tasks scheduled to current server - standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId()); - } finally { - LockUtil.unlock(spaceGraphName, LockUtil.GRAPH_LOCK); - } - } - } - private static final ThreadLocal CONTEXTS = new ThreadLocal<>(); public static void setContext(String context) { diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template index d3834baf5c..fd2782a87d 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template @@ -31,7 +31,6 @@ store=hugegraph pd.peers=127.0.0.1:8686 # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties index b77cacb2de..3727919bbb 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties @@ -30,7 +30,6 @@ store=hugegraph #pd.peers=127.0.0.1:8686 # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties index ad3e2700f8..eba2ed1f5d 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties @@ -23,9 +23,6 @@ arthas.disabled_commands=jad #auth.admin_pa=pa #auth.graph_store=hugegraph -# lightweight load balancing (TODO: legacy mode, remove soon) -server.id=server-1 -server.role=master # use pd # usePD=true diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java index 4fae0f76c6..5c34236857 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java @@ -248,7 +248,7 @@ public void testCreateGraphsWithInvalidNames() { @Test public void testCreateGraphsWithSameName() { - List graphs = openGraphs("g", "g", "G"); + List graphs = openGraphs("gg", "gg", "GG"); HugeGraph g1 = graphs.get(0); HugeGraph g2 = graphs.get(1); HugeGraph g3 = graphs.get(2); diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java index 212ccc0588..3811a46f02 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java @@ -17,8 +17,8 @@ package org.apache.hugegraph.core; -import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -33,6 +33,7 @@ import org.apache.hugegraph.job.GremlinJob; import org.apache.hugegraph.job.JobBuilder; import org.apache.hugegraph.task.HugeTask; +import org.apache.hugegraph.task.StandardTaskScheduler; import org.apache.hugegraph.task.TaskCallable; import org.apache.hugegraph.task.TaskScheduler; import org.apache.hugegraph.task.TaskStatus; @@ -76,12 +77,14 @@ public void testTask() throws TimeoutException { Assert.assertEquals(id, task.id()); Assert.assertFalse(task.completed()); - Assert.assertThrows(IllegalArgumentException.class, () -> { - scheduler.delete(id, false); - }, e -> { - Assert.assertContains("Can't delete incomplete task '88888'", - e.getMessage()); - }); + if (scheduler.getClass().equals(StandardTaskScheduler.class)) { + Assert.assertThrows(IllegalArgumentException.class, () -> { + scheduler.delete(id, false); + }, e -> { + Assert.assertContains("Can't delete incomplete task '88888'", + e.getMessage()); + }); + } task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(id, task.id()); @@ -89,7 +92,7 @@ public void testTask() throws TimeoutException { Assert.assertEquals(TaskStatus.SUCCESS, task.status()); Assert.assertEquals("test-task", scheduler.task(id).name()); - Assert.assertEquals("test-task", scheduler.tasks(Arrays.asList(id)) + Assert.assertEquals("test-task", scheduler.tasks(List.of(id)) .next().name()); Iterator> iter = scheduler.tasks(ImmutableList.of(id)); @@ -196,13 +199,18 @@ public Object execute() throws Exception { Assert.assertEquals("test", task.type()); Assert.assertFalse(task.completed()); - HugeTask task2 = scheduler.waitUntilTaskCompleted(task.id(), 10); + // Ephemeral tasks are node-local and not persisted to DB. + // Use Future.get() to wait for completion instead of ID-based lookup. + try { + task.get(10, java.util.concurrent.TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Ephemeral task execution failed", e); + } + Assert.assertEquals(TaskStatus.SUCCESS, task.status()); Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task.result()); - Assert.assertEquals(TaskStatus.SUCCESS, task2.status()); - Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task2.result()); - + // Ephemeral tasks are not stored in DB, so these should throw NotFoundException Assert.assertThrows(NotFoundException.class, () -> { scheduler.waitUntilTaskCompleted(task.id(), 10); }); @@ -557,7 +565,12 @@ public void testGremlinJobAndCancel() throws TimeoutException { scheduler.cancel(task); task = scheduler.task(task.id()); - Assert.assertEquals(TaskStatus.CANCELLING, task.status()); + // For DistributedTaskScheduler, local cancel may result in CANCELLED directly + // (task thread updates status after being interrupted) + // or CANCELLING (if task hasn't processed the interrupt yet) + Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(), + task.status() == TaskStatus.CANCELLING || + task.status() == TaskStatus.CANCELLED); task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(TaskStatus.CANCELLED, task.status()); @@ -629,46 +642,51 @@ public void testGremlinJobAndRestore() throws Exception { scheduler.cancel(task); task = scheduler.task(task.id()); - Assert.assertEquals(TaskStatus.CANCELLING, task.status()); + Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(), + task.status() == TaskStatus.CANCELLING || + task.status() == TaskStatus.CANCELLED); task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(TaskStatus.CANCELLED, task.status()); Assert.assertTrue("progress=" + task.progress(), 0 < task.progress() && task.progress() < 10); Assert.assertEquals(0, task.retries()); - Assert.assertEquals(null, task.result()); + Assert.assertNull(task.result()); HugeTask finalTask = task; - Assert.assertThrows(IllegalArgumentException.class, () -> { - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, - finalTask); - }, e -> { - Assert.assertContains("No need to restore completed task", - e.getMessage()); - }); - HugeTask task2 = scheduler.task(task.id()); - Assert.assertThrows(IllegalArgumentException.class, () -> { + // because Distributed do nothing in restore, so only test StandardTaskScheduler here + if (scheduler.getClass().equals(StandardTaskScheduler.class)) { + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, + finalTask); + }, e -> { + Assert.assertContains("No need to restore completed task", + e.getMessage()); + }); + + HugeTask task2 = scheduler.task(task.id()); + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); + }, e -> { + Assert.assertContains("No need to restore completed task", + e.getMessage()); + }); + + Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING); Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - }, e -> { - Assert.assertContains("No need to restore completed task", - e.getMessage()); - }); - - Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING); - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - Assert.assertThrows(IllegalArgumentException.class, () -> { - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - }, e -> { - Assert.assertContains("is already in the queue", e.getMessage()); - }); - - scheduler.waitUntilTaskCompleted(task2.id(), 10); - sleepAWhile(500); - Assert.assertEquals(10, task2.progress()); - Assert.assertEquals(1, task2.retries()); - Assert.assertEquals("100", task2.result()); + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); + }, e -> { + Assert.assertContains("is already in the queue", e.getMessage()); + }); + scheduler.waitUntilTaskCompleted(task2.id(), 10); + sleepAWhile(500); + Assert.assertEquals(10, task2.progress()); + Assert.assertEquals(1, task2.retries()); + Assert.assertEquals("100", task2.result()); + } } private HugeTask runGremlinJob(String gremlin) { diff --git a/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java b/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java index 849539419b..caf0146bb9 100644 --- a/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java +++ b/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java @@ -295,13 +295,7 @@ public class CoreOptions extends OptionHolder { rangeInt(1, 500), 1 ); - public static final ConfigOption SCHEDULER_TYPE = - new ConfigOption<>( - "task.scheduler_type", - "The type of scheduler used in distribution system.", - allowValues("local", "distributed"), - "local" - ); + public static final ConfigOption TASK_SYNC_DELETION = new ConfigOption<>( "task.sync_deletion",