diff --git a/.changeset/lovely-dodos-guess.md b/.changeset/lovely-dodos-guess.md new file mode 100644 index 000000000..cf75f4a47 --- /dev/null +++ b/.changeset/lovely-dodos-guess.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': minor +--- + +Allow globals to be passed into the execution environment diff --git a/packages/lightning-mock/src/api-dev.ts b/packages/lightning-mock/src/api-dev.ts index d3a32cbf6..6ddf7260e 100644 --- a/packages/lightning-mock/src/api-dev.ts +++ b/packages/lightning-mock/src/api-dev.ts @@ -111,16 +111,18 @@ const setupDevAPI = ( attemptId: string, fn: (evt: any) => void, once = true - ) => { + ): (() => void) => { + const unsubscribe = () => state.events.removeListener(event, handler); function handler(e: any) { if (e.attemptId && e.attemptId === attemptId) { if (once) { - state.events.removeListener(event, handler); + unsubscribe(); } fn(e); } } state.events.addListener(event, handler); + return unsubscribe; }; }; diff --git a/packages/runtime/src/execute/context.ts b/packages/runtime/src/execute/context.ts index 10bfeb624..585567199 100644 --- a/packages/runtime/src/execute/context.ts +++ b/packages/runtime/src/execute/context.ts @@ -15,11 +15,14 @@ const freezeAll = ( // Build a safe and helpful execution context // This will be shared by all jobs -export default (state: State, options: Pick) => { +export default (state: State, options: Pick) => { const logger = options.jobLogger ?? console; + const globals = options.globals || {}; const context = vm.createContext( freezeAll( { + ...globals, + // Note that these globals will be overridden console: logger, clearInterval, clearTimeout, diff --git a/packages/runtime/src/runtime.ts b/packages/runtime/src/runtime.ts index 33339d4ca..fd030e3d7 100644 --- a/packages/runtime/src/runtime.ts +++ b/packages/runtime/src/runtime.ts @@ -33,6 +33,9 @@ export type Options = { linker?: LinkerOptions; callbacks?: ExecutionCallbacks; + + // inject globals into the environment + globals?: any; }; const defaultState = { data: {}, configuration: {} }; diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index b004d09d1..74894f0ff 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -474,3 +474,47 @@ test('import from a module', async (t) => { t.is(result.data, 'test'); }); + +test('inject globals', async (t) => { + const expression = + 'export default [(s) => Object.assign(s, { data: { x } })]'; + + const result: any = await run( + expression, + {}, + { + globals: { + x: 90210, + }, + } + ); + t.is(result.data.x, 90210); +}); + +test("injected globals can't override special functions", async (t) => { + const panic = () => { + throw new Error('illegal override'); + }; + + const globals = { + console: panic, + clearInterval: panic, + clearTimeout: panic, + parseFloat: panic, + parseInt: panic, + setInterval: panic, + setTimeout: panic, + }; + const expression = `export default [(s) => { + parseFloat(); + parseInt(); + const i = setInterval(() => {}, 1000); + clearInterval(i); + const t = setTimeout(() => {}, 1000); + clearTimeout(t); + return s; + }]`; + + const result: any = await run(expression, {}, { globals }); + t.falsy(result.errors); +}); diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 9e4380294..bbf5324e6 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -1,6 +1,5 @@ -import crypto from 'node:crypto'; import { EventEmitter } from 'node:events'; -import type { ExecutionPlan, JobNode } from '@openfn/runtime'; +import run, { ExecutionPlan } from '@openfn/runtime'; import * as engine from '@openfn/engine-multi'; import mockResolvers from './resolvers'; @@ -26,6 +25,19 @@ export type WorkflowErrorEvent = { message: string; }; +// this is basically a fake adaptor +// these functions will be injected into scope +const helpers = { + fn: (f: Function) => (s: any) => f(s), + wait: (duration: number) => (s: any) => + new Promise((resolve) => setTimeout(() => resolve(s), duration)), +}; + +// The mock runtime engine creates a fake engine interface +// around a real runtime engine +// Note that it does not dispatch runtime logs and only supports console.log +// This gives us real eventing in the worker tests +// TODO - even better would be to re-use the engine's event map or something async function createMock() { const activeWorkflows = {} as Record; const bus = new EventEmitter(); @@ -53,115 +65,78 @@ async function createMock() { listeners[planId] = events; }; - const executeJob = async ( - workflowId: string, - job: JobNode, - initialState = {}, - resolvers: engine.Resolvers = mockResolvers - ) => { - const { id, expression, configuration, adaptor } = job; - - // If no expression or adaptor, this is (probably) a trigger node. - // Silently do nothing - if (!expression && !adaptor) { - return initialState; + const execute = async ( + xplan: ExecutionPlan, + options: { resolvers?: engine.Resolvers; throw?: boolean } = { + resolvers: mockResolvers, } + ) => { + const { id, jobs } = xplan; + activeWorkflows[id!] = true; - const runId = crypto.randomUUID(); + for (const job of jobs) { + if (typeof job.configuration === 'string') { + // Call the crendtial callback, but don't do anything with it + job.configuration = await options.resolvers?.credential?.( + job.configuration + ); + } - const jobId = id; - if (typeof configuration === 'string') { - // Fetch the credential but do nothing with it - // Maybe later we use it to assemble state - await resolvers.credential?.(configuration); + // Fake compilation + if ( + typeof job.expression === 'string' && + !(job.expression as string).match(/export default \[/) + ) { + job.expression = `export default [${job.expression}];`; + } } - const info = (...message: any[]) => { - dispatch('workflow-log', { - workflowId, - message: message, - level: 'info', - time: (BigInt(Date.now()) * BigInt(1e3)).toString(), - name: 'mck', - }); + // TODO do I need a more sophisticated solution here? + const jobLogger = { + log: (...args: any[]) => { + dispatch('workflow-log', { + workflowId: id, + level: 'info', + json: true, + message: args, + time: Date.now(), + }); + }, }; - // Get the job details from lightning - // start instantly and emit as it goes - dispatch('job-start', { workflowId, jobId, runId }); - info('Running job ' + jobId); - let nextState = initialState; - - // @ts-ignore - if (expression?.startsWith?.('wait@')) { - const [_, delay] = (expression as string).split('@'); - nextState = initialState; - await new Promise((resolve) => { - setTimeout(() => resolve(), parseInt(delay)); - }); - } else { - // Try and parse the expression as JSON, in which case we use it as the final state + const opts = { + strict: false, + jobLogger, + ...options, + globals: helpers, + callbacks: { + notify: (name: any, payload: any) => { + dispatch(name, { + workflowId: id, + ...payload, + }); + }, + }, + }; + setTimeout(async () => { + dispatch('workflow-start', { workflowId: id }); + try { - // @ts-ignore - nextState = JSON.parse(expression); - // What does this look like? Should be a logger object - info('Parsing expression as JSON state'); - info(nextState); - } catch (e) { - // Do nothing, it's fine - nextState = initialState; + await run(xplan, undefined, opts as any); + } catch (e: any) { + dispatch('workflow-error', { + workflowId: id, + type: e.name, + message: e.message, + }); } - } - dispatch('job-complete', { - workflowId, - jobId, - state: nextState, - runId, - next: [], // TODO hmm. I think we need to do better than this. - }); - - return nextState; - }; - - // Start executing an ExecutionPlan - // The mock uses lots of timeouts to make testing a bit easier and simulate asynchronicity - const execute = ( - xplan: ExecutionPlan, - options: { resolvers?: engine.Resolvers; throw?: boolean } = { - resolvers: mockResolvers, - } - ) => { - // This is just an easy way to test the options gets fed through to execute - // Also lets me test error handling! - if (options.throw) { - throw new Error('test error'); - } - - const { id, jobs, initialState } = xplan; - const workflowId = id; - activeWorkflows[id!] = true; - - // TODO do we want to load a globals dataclip from job.state here? - // This isn't supported right now - // We would need to use resolvers.dataclip if we wanted it - - setTimeout(() => { - dispatch('workflow-start', { workflowId }); - setTimeout(async () => { - let state = initialState || {}; - // Trivial job reducer in our mock - for (const job of jobs) { - state = await executeJob(id!, job, state, options.resolvers); - } - setTimeout(() => { - delete activeWorkflows[id!]; - dispatch('workflow-complete', { workflowId }); - // TODO on workflow complete we should maybe tidy the listeners? - // Doesn't really matter in the mock though - }, 1); - }, 1); + delete activeWorkflows[id!]; + dispatch('workflow-complete', { workflowId: id }); }, 1); + + // Technically the engine should return an event emitter + // But as I don't think we use it, I'm happy to ignore this }; // return a list of jobs in progress diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index d6af0e965..2b7c58e46 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -24,7 +24,8 @@ import { import createMockRTE from '../../src/mock/runtime-engine'; import { mockChannel } from '../../src/mock/sockets'; import { stringify, createAttemptState } from '../../src/util'; -import { ExecutionPlan } from '@openfn/runtime'; + +import type { ExecutionPlan } from '@openfn/runtime'; import type { AttemptState } from '../../src/types'; const enc = new TextEncoder(); @@ -407,7 +408,7 @@ test('execute should pass the final result to onFinish', async (t) => { id: 'a', jobs: [ { - expression: JSON.stringify({ done: true }), + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -431,7 +432,7 @@ test('execute should return a context object', async (t) => { id: 'a', jobs: [ { - expression: JSON.stringify({ done: true }), + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -476,7 +477,7 @@ test('execute should lazy-load a credential', async (t) => { jobs: [ { configuration: 'abc', - expression: JSON.stringify({ done: true }), + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -511,7 +512,7 @@ test('execute should lazy-load initial state', async (t) => { initialState: 'abc', jobs: [ { - expression: JSON.stringify({ done: true }), + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -545,7 +546,7 @@ test('execute should call all events on the socket', async (t) => { // GET_DATACLIP, // TODO not really implemented properly yet ATTEMPT_START, RUN_START, - ATTEMPT_LOG, // This won't log with the mock logger + ATTEMPT_LOG, RUN_COMPLETE, ATTEMPT_COMPLETE, ]; @@ -558,8 +559,8 @@ test('execute should call all events on the socket', async (t) => { { id: 'trigger', configuration: 'a', - expression: 'fn(a => a)', adaptor: '@openfn/language-common@1.0.0', + expression: 'fn(() => console.log("x"))', }, ], }; @@ -568,7 +569,6 @@ test('execute should call all events on the socket', async (t) => { return new Promise((done) => { execute(channel, engine, logger, plan, options, (result) => { - // console.log(events); // Check that events were passed to the socket // This is deliberately crude t.assert(allEvents.every((e) => events[e])); diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index faf004527..72cf22167 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -36,7 +36,7 @@ const getAttempt = (ext = {}, jobs?: any) => ({ { id: 'j', adaptor: '@openfn/language-common@1.0.0', - body: JSON.stringify({ answer: 42 }), + body: 'fn(() => ({ answer: 42 }))', }, ], ...ext, @@ -51,7 +51,7 @@ test.serial( id: 'attempt-1', jobs: [ { - body: JSON.stringify({ count: 122 }), + body: 'fn(() => ({ count: 122 }))', }, ], }; @@ -69,7 +69,7 @@ test.serial( test.serial('should run an attempt which returns intial state', async (t) => { return new Promise((done) => { lng.addDataclip('x', { - route: 66, + data: 66, }); const attempt = { @@ -77,13 +77,13 @@ test.serial('should run an attempt which returns intial state', async (t) => { dataclip_id: 'x', jobs: [ { - body: 'whatever', + body: 'fn((s) => s)', }, ], }; lng.waitForResult(attempt.id).then((result) => { - t.deepEqual(result, { route: 66 }); + t.deepEqual(result, { data: 66 }); done(); }); @@ -173,17 +173,17 @@ test.serial( id: 'some-job', credential_id: 'a', adaptor: '@openfn/language-common@1.0.0', - body: JSON.stringify({ answer: 42 }), + body: 'fn(() => ({ answer: 42 }))', }, ]); let didCallEvent = false; - lng.onSocketEvent(e.GET_CREDENTIAL, attempt.id, ({ payload }) => { + lng.onSocketEvent(e.GET_CREDENTIAL, attempt.id, () => { // again there's no way to check the right credential was returned didCallEvent = true; }); - lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { + lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, () => { t.true(didCallEvent); done(); }); @@ -268,11 +268,15 @@ test.serial( `events: lightning should receive a ${e.ATTEMPT_LOG} event`, (t) => { return new Promise((done) => { - const attempt = getAttempt(); - - let didCallEvent = false; + const attempt = { + id: 'attempt-1', + jobs: [ + { + body: 'fn((s) => { console.log("x"); return s })', + }, + ], + }; - // The mock runtime will put out a default log lng.onSocketEvent(e.ATTEMPT_LOG, attempt.id, ({ payload }) => { const log = payload; @@ -280,13 +284,10 @@ test.serial( t.truthy(log.attempt_id); t.truthy(log.run_id); t.truthy(log.message); - t.assert(log.message[0].startsWith('Running job')); - - didCallEvent = true; + t.deepEqual(log.message, ['x']); }); lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { - t.true(didCallEvent); done(); }); @@ -300,13 +301,14 @@ test.serial( test.serial.skip(`events: logs should have increasing timestamps`, (t) => { return new Promise((done) => { const attempt = getAttempt({}, [ - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, ]); const history: bigint[] = []; @@ -372,7 +374,7 @@ test('should register and de-register attempts to the server', async (t) => { id: 'attempt-1', jobs: [ { - body: JSON.stringify({ count: 122 }), + body: 'fn(() => ({ count: 122 }))', }, ], }; @@ -398,13 +400,14 @@ test('should register and de-register attempts to the server', async (t) => { // TODO this is a server test // What I am testing here is that the first job completes // before the second job starts -test('should not claim while at capacity', async (t) => { +// TODO add wait helper +test.skip('should not claim while at capacity', async (t) => { return new Promise((done) => { const attempt1 = { id: 'attempt-1', jobs: [ { - body: 'wait@500', + body: 'wait(500)', }, ], }; @@ -444,7 +447,85 @@ test('should not claim while at capacity', async (t) => { }); }); -// hmm, i don't even think I can test this in the mock runtime -test.skip('should pass the right dataclip when running in parallel', () => {}); +test('should pass the right dataclip when running in parallel', (t) => { + return new Promise((done) => { + const job = (id: string, next?: string) => ({ + id, + body: `fn((s) => { s.data.${id} = true; return s; })`, + }); + + const edge = (from: string, to: string) => ({ + id: `${from}-${to}`, + source_job_id: from, + target_job_id: to, + }); + + const outputDataclipIds = {}; + const inputDataclipIds = {}; + const outputs = {}; + const a = { + id: 'a', + body: 'fn(() => ({ data: { a: true } }))', + next: { j: true, k: true }, + }; + + const j = job('j', 'x'); + const k = job('k', 'y'); + const x = job('x'); + const y = job('y'); + + const attempt = { + id: 'p1', + jobs: [a, j, k, x, y], + edges: [edge('a', 'j'), edge('a', 'k'), edge('j', 'x'), edge('k', 'y')], + }; + + // Save all the input dataclip ids for each job + const unsub2 = lng.onSocketEvent( + e.RUN_START, + attempt.id, + ({ payload }) => { + inputDataclipIds[payload.job_id] = payload.input_dataclip_id; + }, + false + ); -test.todo(`should run multiple attempts`); + // Save all the output dataclips & ids for each job + const unsub1 = lng.onSocketEvent( + e.RUN_COMPLETE, + attempt.id, + ({ payload }) => { + outputDataclipIds[payload.job_id] = payload.output_dataclip_id; + outputs[payload.job_id] = JSON.parse(payload.output_dataclip); + }, + false + ); + + lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { + unsub1(); + unsub2(); + + // Now check everything was correct + + // Job a we don't really care about, but check the output anyway + t.deepEqual(outputs.a.data, { a: true }); + + // a feeds in to j and k + t.deepEqual(inputDataclipIds.j, outputDataclipIds.a); + t.deepEqual(inputDataclipIds.k, outputDataclipIds.a); + + // j feeds into x + t.deepEqual(inputDataclipIds.x, outputDataclipIds.j); + + // k feeds into y + t.deepEqual(inputDataclipIds.y, outputDataclipIds.k); + + // x and y should have divergent states + t.deepEqual(outputs.x.data, { a: true, j: true, x: true }); + t.deepEqual(outputs.y.data, { a: true, k: true, y: true }); + done(); + }); + + lng.enqueueAttempt(attempt); + }); +}); diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index eb207ab4a..14dc2b843 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -14,21 +14,24 @@ const sampleWorkflow = { { id: 'j1', adaptor: 'common@1.0.0', - expression: '{ "x": 10 }', + expression: 'fn(() => ({ data: { x: 10 } }))', }, ], } as ExecutionPlan; +let engine; + +test.before(async () => { + engine = await create(); +}); + test('getStatus() should should have no active workflows', async (t) => { - const engine = await create(); const { active } = engine.getStatus(); t.is(active, 0); }); test('Dispatch start events for a new workflow', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'workflow-start'); t.truthy(evt); @@ -36,7 +39,6 @@ test('Dispatch start events for a new workflow', async (t) => { }); test('getStatus should report one active workflow', async (t) => { - const engine = await create(); engine.execute(sampleWorkflow); const { active } = engine.getStatus(); @@ -45,8 +47,6 @@ test('getStatus should report one active workflow', async (t) => { }); test('Dispatch complete events when a workflow completes', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent( engine, @@ -57,8 +57,6 @@ test('Dispatch complete events when a workflow completes', async (t) => { }); test('Dispatch start events for a job', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'job-start'); t.truthy(evt); @@ -67,106 +65,51 @@ test('Dispatch start events for a job', async (t) => { }); test('Dispatch complete events for a job', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'job-complete'); t.truthy(evt); t.is(evt.workflowId, 'w1'); t.is(evt.jobId, 'j1'); - t.truthy(evt.state); + t.deepEqual(evt.state, { data: { x: 10 } }); }); -test('mock should evaluate expressions as JSON', async (t) => { - const engine = await create(); - - engine.execute(sampleWorkflow); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { x: 10 }); -}); - -test('mock should wait if expression starts with @wait', async (t) => { - const engine = await create(); +test('Dispatch error event for a crash', async (t) => { const wf = { - id: 'w1', + id: 'xyz', jobs: [ { id: 'j1', - expression: 'wait@100', + adaptor: 'common@1.0.0', + expression: 'fn(() => ( @~!"@£!4 )', }, ], } as ExecutionPlan; - engine.execute(wf); - const start = Date.now(); - const evt = await waitForEvent(engine, 'workflow-complete'); - const end = Date.now() - start; - t.true(end > 90); -}); -test('mock should return initial state as result state', async (t) => { - const engine = await create(); - - const wf = { - initialState: { y: 22 }, - jobs: [ - { - adaptor: 'common@1.0.0', - }, - ], - }; engine.execute(wf); + const evt = await waitForEvent(engine, 'workflow-error'); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { y: 22 }); + t.is(evt.workflowId, 'xyz'); + t.is(evt.type, 'RuntimeCrash'); + t.regex(evt.message, /invalid or unexpected token/i); }); -test('mock prefers JSON state to initial state', async (t) => { - const engine = await create(); - +test('wait function', async (t) => { const wf = { - initialState: { y: 22 }, + id: 'w1', jobs: [ { - adaptor: 'common@1.0.0', - expression: '{ "z": 33 }', + id: 'j1', + expression: 'wait(100)', }, ], - }; + } as ExecutionPlan; engine.execute(wf); + const start = Date.now(); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { z: 33 }); -}); - -test('mock should dispatch log events when evaluating JSON', async (t) => { - const engine = await create(); - - const logs = []; - engine.on('workflow-log', (l) => { - logs.push(l); - }); - - engine.execute(sampleWorkflow); - await waitForEvent(engine, 'workflow-complete'); - - t.deepEqual(logs[0].message, ['Running job j1']); - t.deepEqual(logs[1].message, ['Parsing expression as JSON state']); -}); - -test('mock should throw if the magic option is passed', async (t) => { - const engine = await create(); - - const logs = []; - engine.on('workflow-log', (l) => { - logs.push(l); - }); + await waitForEvent(engine, 'workflow-complete'); - await t.throwsAsync( - async () => engine.execute(sampleWorkflow, { throw: true }), - { - message: 'test error', - } - ); + const end = Date.now() - start; + t.true(end > 90); }); test('resolve credential before job-start if credential is a string', async (t) => { @@ -179,7 +122,6 @@ test('resolve credential before job-start if credential is a string', async (t) return {}; }; - const engine = await create(); // @ts-ignore engine.execute(wf, { resolvers: { credential } }); @@ -188,8 +130,6 @@ test('resolve credential before job-start if credential is a string', async (t) }); test('listen to events', async (t) => { - const engine = await create(); - const called = { 'job-start': false, 'job-complete': false, @@ -198,41 +138,50 @@ test('listen to events', async (t) => { 'workflow-complete': false, }; - engine.listen(sampleWorkflow.id, { + const wf = { + id: 'wibble', + jobs: [ + { + id: 'j1', + adaptor: 'common@1.0.0', + expression: 'export default [() => { console.log("x"); }]', + }, + ], + } as ExecutionPlan; + + engine.listen(wf.id, { 'job-start': ({ workflowId, jobId }) => { called['job-start'] = true; - t.is(workflowId, sampleWorkflow.id); - t.is(jobId, sampleWorkflow.jobs[0].id); + t.is(workflowId, wf.id); + t.is(jobId, wf.jobs[0].id); }, 'job-complete': ({ workflowId, jobId }) => { called['job-complete'] = true; - t.is(workflowId, sampleWorkflow.id); - t.is(jobId, sampleWorkflow.jobs[0].id); + t.is(workflowId, wf.id); + t.is(jobId, wf.jobs[0].id); // TODO includes state? }, 'workflow-log': ({ workflowId, message }) => { called['workflow-log'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); t.truthy(message); }, 'workflow-start': ({ workflowId }) => { called['workflow-start'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); }, 'workflow-complete': ({ workflowId }) => { called['workflow-complete'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); }, }); - engine.execute(sampleWorkflow); + engine.execute(wf); await waitForEvent(engine, 'workflow-complete'); t.assert(Object.values(called).every((v) => v === true)); }); test('only listen to events for the correct workflow', async (t) => { - const engine = await create(); - engine.listen('bobby mcgee', { 'workflow-start': ({ workflowId }) => { throw new Error('should not have called this!!'); @@ -250,12 +199,11 @@ test('do nothing for a job if no expression and adaptor (trigger node)', async ( jobs: [ { id: 'j1', + adaptor: '@openfn/language-common@1.0.0', }, ], } as ExecutionPlan; - const engine = await create(); - let didCallEvent = false; engine.listen(workflow.id, {