Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/core/persist/knex/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,27 @@ class ProcessKnexPersist extends KnexPersist {
}
}

async lockBatch(batch, trx) {
return await trx(this._table)
.select(`${this._table}.*`)
.join(this._state_table, "process_state.id", "process.current_state_id")
.where("current_status", "running")
.limit(batch)
.forUpdate()
.skipLocked();
}

async getAndLock(id, processStateId, trx) {
return await trx(this._table)
.select("id", "current_state_id")
.from("process")
.where("id", id)
.where("current_state_id", processStateId)
.first()
.forUpdate()
.noWait();
}

_getTasks(filters) {
return this._db
.select(
Expand Down
9 changes: 9 additions & 0 deletions src/core/persist/knex/processState.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ class ProcessStateKnexPersist extends KnexPersist {
})
.first();
}

async getAndLock(processStateId, trx) {
return await trx(this._table)
.select()
.where("id", processStateId)
.forUpdate()
.noWait()
.first();
}
}

module.exports = { ProcessStateKnexPersist };
22 changes: 7 additions & 15 deletions src/core/workflow/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ class Process extends PersistedEntity {
return last_step_number + 1;
}

static async fetchAndLockBatch(batch, trx) {
const processes = await Process.getPersist().lockBatch(batch, trx);
return _.map(processes, (process) => Process.deserialize(process));
}

constructor(workflow_data, blueprint_spec) {
super();

Expand Down Expand Up @@ -380,28 +385,15 @@ class Process extends PersistedEntity {
}

async __inerLoop(current_state_id, { custom_lisp, actor_data }, trx) {
const p_lock = await trx
.select("id", "current_state_id")
.from("process")
.where("id", this.id)
.where("current_state_id", current_state_id)
.first()
.forUpdate()
.noWait();
const p_lock = await this.getPersist().getAndLock(this.id, current_state_id, trx);
if (!p_lock) {
throw new Error(`No process found for lock, process_id [${this.id}] current_state_id [${current_state_id}]`);
}
emitter.emit("INNERLOOP.LOCK", ` LOCK PID ${p_lock.id}`, {
process_id: p_lock.id,
});

const ps_lock = await trx
.select("id")
.from("process_state")
.first()
.where("id", current_state_id)
.forUpdate()
.noWait();
const ps_lock = await ProcessState.getPersist().getAndLock(current_state_id, trx);
if (!ps_lock) {
throw new Error(`No lock for process state [${current_state_id}]`);
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/workflow/process_state.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class ProcessState extends PersistedEntity {
return ProcessState.deserialize(state);
}

static async fetchAndLock(processStateId, trx) {
const state = await this.getPersist().getAndLock(processStateId, trx);
return ProcessState.deserialize(state);
}

constructor(
process_id,
step_number,
Expand Down
21 changes: 4 additions & 17 deletions src/engine/heartbeat/process.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@

const emitter = require("../../core/utils/emitter");
const { Process } = require("../../core/workflow/process");
const { ENGINE_ID } = require("../../core/workflow/process_state");
const { ProcessState } = require("../../core/workflow/process_state");

const processHeartBeat = async () => {
const PROCESS_BATCH = process.env.PROCESS_BATCH || 10;
const processes = await Process.getPersist()._db.transaction(async (trx) => {
try {
emitter.emit("ENGINE.PROCESSES_FETCHING", ` FETCHING PROCESSES ON HEARTBEAT BATCH [${PROCESS_BATCH}]`);
const locked_processes = await trx("process")
.select("process.*")
.join("process_state", "process_state.id", "process.current_state_id")
.where("engine_id", "!=", ENGINE_ID)
.where("current_status", "running")
.limit(PROCESS_BATCH)
.forUpdate()
.skipLocked();
const locked_processes = await Process.fetchAndLockBatch(PROCESS_BATCH, trx);
emitter.emit("ENGINE.PROCESSES_FETCHED", ` FETCHED [${locked_processes.length}] PROCESSES ON HEARTBEAT`, {
processes: locked_processes.length,
});
Expand All @@ -24,18 +17,12 @@ const processHeartBeat = async () => {
emitter.emit("ENGINE.PROCESS_FETCHING", ` FETCHING PS FOR PROCESS [${process.id}] ON HEARTBEAT`, {
process_id: process.id,
});
process.state = await trx("process_state")
.select()
.where("id", process.current_state_id)
.where("engine_id", "!=", ENGINE_ID)
.forUpdate()
.noWait()
.first();
process.state = await ProcessState.fetchAndLock(process._current_state_id, trx);
emitter.emit("ENGINE.PROCESS_FETCHED", ` FETCHED PS FOR PROCESS [${process.id}] ON HEARTBEAT`, {
process_id: process.id,
});
if (process.state) {
return Process.deserialize(process);
return process;
}
})
);
Expand Down