Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/lovely-dodos-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/runtime': minor
---

Allow globals to be passed into the execution environment
6 changes: 4 additions & 2 deletions packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,18 @@ const setupDevAPI = (
attemptId: string,
fn: (evt: any) => void,
once = true
) => {
): (() => void) => {
const unsubscribe = () => state.events.removeListener(event, handler);
function handler(e: any) {
if (e.attemptId && e.attemptId === attemptId) {
if (once) {
state.events.removeListener(event, handler);
unsubscribe();
}
fn(e);
}
}
state.events.addListener(event, handler);
return unsubscribe;
};
};

Expand Down
5 changes: 4 additions & 1 deletion packages/runtime/src/execute/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ const freezeAll = (

// Build a safe and helpful execution context
// This will be shared by all jobs
export default (state: State, options: Pick<Options, 'jobLogger'>) => {
export default (state: State, options: Pick<Options, 'jobLogger' | 'globals'>) => {
const logger = options.jobLogger ?? console;
const globals = options.globals || {};
const context = vm.createContext(
freezeAll(
{
...globals,
// Note that these globals will be overridden
console: logger,
clearInterval,
clearTimeout,
Expand Down
3 changes: 3 additions & 0 deletions packages/runtime/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ export type Options = {
linker?: LinkerOptions;

callbacks?: ExecutionCallbacks;

// inject globals into the environment
globals?: any;
};

const defaultState = { data: {}, configuration: {} };
Expand Down
44 changes: 44 additions & 0 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,47 @@ test('import from a module', async (t) => {

t.is(result.data, 'test');
});

test('inject globals', async (t) => {
const expression =
'export default [(s) => Object.assign(s, { data: { x } })]';

const result: any = await run(
expression,
{},
{
globals: {
x: 90210,
},
}
);
t.is(result.data.x, 90210);
});

test("injected globals can't override special functions", async (t) => {
const panic = () => {
throw new Error('illegal override');
};

const globals = {
console: panic,
clearInterval: panic,
clearTimeout: panic,
parseFloat: panic,
parseInt: panic,
setInterval: panic,
setTimeout: panic,
};
const expression = `export default [(s) => {
parseFloat();
parseInt();
const i = setInterval(() => {}, 1000);
clearInterval(i);
const t = setTimeout(() => {}, 1000);
clearTimeout(t);
return s;
}]`;

const result: any = await run(expression, {}, { globals });
t.falsy(result.errors);
});
175 changes: 75 additions & 100 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import crypto from 'node:crypto';
import { EventEmitter } from 'node:events';
import type { ExecutionPlan, JobNode } from '@openfn/runtime';
import run, { ExecutionPlan } from '@openfn/runtime';
import * as engine from '@openfn/engine-multi';
import mockResolvers from './resolvers';

Expand All @@ -26,6 +25,19 @@ export type WorkflowErrorEvent = {
message: string;
};

// this is basically a fake adaptor
// these functions will be injected into scope
const helpers = {
fn: (f: Function) => (s: any) => f(s),
wait: (duration: number) => (s: any) =>
new Promise((resolve) => setTimeout(() => resolve(s), duration)),
};

// The mock runtime engine creates a fake engine interface
// around a real runtime engine
// Note that it does not dispatch runtime logs and only supports console.log
// This gives us real eventing in the worker tests
// TODO - even better would be to re-use the engine's event map or something
async function createMock() {
const activeWorkflows = {} as Record<string, true>;
const bus = new EventEmitter();
Expand Down Expand Up @@ -53,115 +65,78 @@ async function createMock() {
listeners[planId] = events;
};

const executeJob = async (
workflowId: string,
job: JobNode,
initialState = {},
resolvers: engine.Resolvers = mockResolvers
) => {
const { id, expression, configuration, adaptor } = job;

// If no expression or adaptor, this is (probably) a trigger node.
// Silently do nothing
if (!expression && !adaptor) {
return initialState;
const execute = async (
xplan: ExecutionPlan,
options: { resolvers?: engine.Resolvers; throw?: boolean } = {
resolvers: mockResolvers,
}
) => {
const { id, jobs } = xplan;
activeWorkflows[id!] = true;

const runId = crypto.randomUUID();
for (const job of jobs) {
if (typeof job.configuration === 'string') {
// Call the crendtial callback, but don't do anything with it
job.configuration = await options.resolvers?.credential?.(
job.configuration
);
}

const jobId = id;
if (typeof configuration === 'string') {
// Fetch the credential but do nothing with it
// Maybe later we use it to assemble state
await resolvers.credential?.(configuration);
// Fake compilation
if (
typeof job.expression === 'string' &&
!(job.expression as string).match(/export default \[/)
) {
job.expression = `export default [${job.expression}];`;
}
}

const info = (...message: any[]) => {
dispatch('workflow-log', {
workflowId,
message: message,
level: 'info',
time: (BigInt(Date.now()) * BigInt(1e3)).toString(),
name: 'mck',
});
// TODO do I need a more sophisticated solution here?
const jobLogger = {
log: (...args: any[]) => {
dispatch('workflow-log', {
workflowId: id,
level: 'info',
json: true,
message: args,
time: Date.now(),
});
},
};

// Get the job details from lightning
// start instantly and emit as it goes
dispatch('job-start', { workflowId, jobId, runId });
info('Running job ' + jobId);
let nextState = initialState;

// @ts-ignore
if (expression?.startsWith?.('wait@')) {
const [_, delay] = (expression as string).split('@');
nextState = initialState;
await new Promise<void>((resolve) => {
setTimeout(() => resolve(), parseInt(delay));
});
} else {
// Try and parse the expression as JSON, in which case we use it as the final state
const opts = {
strict: false,
jobLogger,
...options,
globals: helpers,
callbacks: {
notify: (name: any, payload: any) => {
dispatch(name, {
workflowId: id,
...payload,
});
},
},
};
setTimeout(async () => {
dispatch('workflow-start', { workflowId: id });

try {
// @ts-ignore
nextState = JSON.parse(expression);
// What does this look like? Should be a logger object
info('Parsing expression as JSON state');
info(nextState);
} catch (e) {
// Do nothing, it's fine
nextState = initialState;
await run(xplan, undefined, opts as any);
} catch (e: any) {
dispatch('workflow-error', {
workflowId: id,
type: e.name,
message: e.message,
});
}
}

dispatch('job-complete', {
workflowId,
jobId,
state: nextState,
runId,
next: [], // TODO hmm. I think we need to do better than this.
});

return nextState;
};

// Start executing an ExecutionPlan
// The mock uses lots of timeouts to make testing a bit easier and simulate asynchronicity
const execute = (
xplan: ExecutionPlan,
options: { resolvers?: engine.Resolvers; throw?: boolean } = {
resolvers: mockResolvers,
}
) => {
// This is just an easy way to test the options gets fed through to execute
// Also lets me test error handling!
if (options.throw) {
throw new Error('test error');
}

const { id, jobs, initialState } = xplan;
const workflowId = id;
activeWorkflows[id!] = true;

// TODO do we want to load a globals dataclip from job.state here?
// This isn't supported right now
// We would need to use resolvers.dataclip if we wanted it

setTimeout(() => {
dispatch('workflow-start', { workflowId });
setTimeout(async () => {
let state = initialState || {};
// Trivial job reducer in our mock
for (const job of jobs) {
state = await executeJob(id!, job, state, options.resolvers);
}
setTimeout(() => {
delete activeWorkflows[id!];
dispatch('workflow-complete', { workflowId });
// TODO on workflow complete we should maybe tidy the listeners?
// Doesn't really matter in the mock though
}, 1);
}, 1);
delete activeWorkflows[id!];
dispatch('workflow-complete', { workflowId: id });
}, 1);

// Technically the engine should return an event emitter
// But as I don't think we use it, I'm happy to ignore this
};

// return a list of jobs in progress
Expand Down
16 changes: 8 additions & 8 deletions packages/ws-worker/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import {
import createMockRTE from '../../src/mock/runtime-engine';
import { mockChannel } from '../../src/mock/sockets';
import { stringify, createAttemptState } from '../../src/util';
import { ExecutionPlan } from '@openfn/runtime';

import type { ExecutionPlan } from '@openfn/runtime';
import type { AttemptState } from '../../src/types';

const enc = new TextEncoder();
Expand Down Expand Up @@ -407,7 +408,7 @@ test('execute should pass the final result to onFinish', async (t) => {
id: 'a',
jobs: [
{
expression: JSON.stringify({ done: true }),
expression: 'fn(() => ({ done: true }))',
},
],
};
Expand All @@ -431,7 +432,7 @@ test('execute should return a context object', async (t) => {
id: 'a',
jobs: [
{
expression: JSON.stringify({ done: true }),
expression: 'fn(() => ({ done: true }))',
},
],
};
Expand Down Expand Up @@ -476,7 +477,7 @@ test('execute should lazy-load a credential', async (t) => {
jobs: [
{
configuration: 'abc',
expression: JSON.stringify({ done: true }),
expression: 'fn(() => ({ done: true }))',
},
],
};
Expand Down Expand Up @@ -511,7 +512,7 @@ test('execute should lazy-load initial state', async (t) => {
initialState: 'abc',
jobs: [
{
expression: JSON.stringify({ done: true }),
expression: 'fn(() => ({ done: true }))',
},
],
};
Expand Down Expand Up @@ -545,7 +546,7 @@ test('execute should call all events on the socket', async (t) => {
// GET_DATACLIP, // TODO not really implemented properly yet
ATTEMPT_START,
RUN_START,
ATTEMPT_LOG, // This won't log with the mock logger
ATTEMPT_LOG,
RUN_COMPLETE,
ATTEMPT_COMPLETE,
];
Expand All @@ -558,8 +559,8 @@ test('execute should call all events on the socket', async (t) => {
{
id: 'trigger',
configuration: 'a',
expression: 'fn(a => a)',
adaptor: '@openfn/language-common@1.0.0',
expression: 'fn(() => console.log("x"))',
},
],
};
Expand All @@ -568,7 +569,6 @@ test('execute should call all events on the socket', async (t) => {

return new Promise((done) => {
execute(channel, engine, logger, plan, options, (result) => {
// console.log(events);
// Check that events were passed to the socket
// This is deliberately crude
t.assert(allEvents.every((e) => events[e]));
Expand Down
Loading