diff --git a/src/core/persist/knex/process.js b/src/core/persist/knex/process.js index fd47b290..a9f5785b 100644 --- a/src/core/persist/knex/process.js +++ b/src/core/persist/knex/process.js @@ -198,6 +198,27 @@ class ProcessKnexPersist extends KnexPersist { } } + async lockBatch(batch, trx) { + return await trx(this._table) + .select(`${this._table}.*`) + .join(this._state_table, "process_state.id", "process.current_state_id") + .where("current_status", "running") + .limit(batch) + .forUpdate() + .skipLocked(); + } + + async getAndLock(id, processStateId, trx) { + return await trx(this._table) + .select("id", "current_state_id") + .from("process") + .where("id", id) + .where("current_state_id", processStateId) + .first() + .forUpdate() + .noWait(); + } + _getTasks(filters) { return this._db .select( diff --git a/src/core/persist/knex/processState.js b/src/core/persist/knex/processState.js index 2bfbe0cf..683d2b68 100644 --- a/src/core/persist/knex/processState.js +++ b/src/core/persist/knex/processState.js @@ -27,6 +27,15 @@ class ProcessStateKnexPersist extends KnexPersist { }) .first(); } + + async getAndLock(processStateId, trx) { + return await trx(this._table) + .select() + .where("id", processStateId) + .forUpdate() + .noWait() + .first(); + } } module.exports = { ProcessStateKnexPersist }; diff --git a/src/core/workflow/process.js b/src/core/workflow/process.js index 839c7ff7..c0dfad94 100644 --- a/src/core/workflow/process.js +++ b/src/core/workflow/process.js @@ -108,6 +108,11 @@ class Process extends PersistedEntity { return last_step_number + 1; } + static async fetchAndLockBatch(batch, trx) { + const processes = await Process.getPersist().lockBatch(batch, trx); + return _.map(processes, (process) => Process.deserialize(process)); + } + constructor(workflow_data, blueprint_spec) { super(); @@ -380,14 +385,7 @@ class Process extends PersistedEntity { } async __inerLoop(current_state_id, { custom_lisp, actor_data }, trx) { - const p_lock = await trx - .select("id", "current_state_id") - .from("process") - .where("id", this.id) - .where("current_state_id", current_state_id) - .first() - .forUpdate() - .noWait(); + const p_lock = await this.getPersist().getAndLock(this.id, current_state_id, trx); if (!p_lock) { throw new Error(`No process found for lock, process_id [${this.id}] current_state_id [${current_state_id}]`); } @@ -395,13 +393,7 @@ class Process extends PersistedEntity { process_id: p_lock.id, }); - const ps_lock = await trx - .select("id") - .from("process_state") - .first() - .where("id", current_state_id) - .forUpdate() - .noWait(); + const ps_lock = await ProcessState.getPersist().getAndLock(current_state_id, trx); if (!ps_lock) { throw new Error(`No lock for process state [${current_state_id}]`); } diff --git a/src/core/workflow/process_state.js b/src/core/workflow/process_state.js index b80a8bb6..4b6e3828 100644 --- a/src/core/workflow/process_state.js +++ b/src/core/workflow/process_state.js @@ -93,6 +93,11 @@ class ProcessState extends PersistedEntity { return ProcessState.deserialize(state); } + static async fetchAndLock(processStateId, trx) { + const state = await this.getPersist().getAndLock(processStateId, trx); + return ProcessState.deserialize(state); + } + constructor( process_id, step_number, diff --git a/src/engine/heartbeat/process.js b/src/engine/heartbeat/process.js index 71a97b91..e3055bef 100644 --- a/src/engine/heartbeat/process.js +++ b/src/engine/heartbeat/process.js @@ -1,21 +1,14 @@ const emitter = require("../../core/utils/emitter"); const { Process } = require("../../core/workflow/process"); -const { ENGINE_ID } = require("../../core/workflow/process_state"); +const { ProcessState } = require("../../core/workflow/process_state"); const processHeartBeat = async () => { const PROCESS_BATCH = process.env.PROCESS_BATCH || 10; const processes = await Process.getPersist()._db.transaction(async (trx) => { try { emitter.emit("ENGINE.PROCESSES_FETCHING", ` FETCHING PROCESSES ON HEARTBEAT BATCH [${PROCESS_BATCH}]`); - const locked_processes = await trx("process") - .select("process.*") - .join("process_state", "process_state.id", "process.current_state_id") - .where("engine_id", "!=", ENGINE_ID) - .where("current_status", "running") - .limit(PROCESS_BATCH) - .forUpdate() - .skipLocked(); + const locked_processes = await Process.fetchAndLockBatch(PROCESS_BATCH, trx); emitter.emit("ENGINE.PROCESSES_FETCHED", ` FETCHED [${locked_processes.length}] PROCESSES ON HEARTBEAT`, { processes: locked_processes.length, }); @@ -24,18 +17,12 @@ const processHeartBeat = async () => { emitter.emit("ENGINE.PROCESS_FETCHING", ` FETCHING PS FOR PROCESS [${process.id}] ON HEARTBEAT`, { process_id: process.id, }); - process.state = await trx("process_state") - .select() - .where("id", process.current_state_id) - .where("engine_id", "!=", ENGINE_ID) - .forUpdate() - .noWait() - .first(); + process.state = await ProcessState.fetchAndLock(process._current_state_id, trx); emitter.emit("ENGINE.PROCESS_FETCHED", ` FETCHED PS FOR PROCESS [${process.id}] ON HEARTBEAT`, { process_id: process.id, }); if (process.state) { - return Process.deserialize(process); + return process; } }) );