From c21a62a933d14935ecf2aaa701b41ab778879d49 Mon Sep 17 00:00:00 2001 From: Baudbot Date: Mon, 23 Feb 2026 00:12:47 -0500 Subject: [PATCH] bridge: generic event envelope dispatch for multi-source broker events Refactor processPulledMessage() in broker-bridge.mjs to support a generic envelope format alongside the legacy raw Slack event_callback format. Changes: - Add isGenericEnvelope() detection function for structured envelopes with source, type, payload, and broker_timestamp fields - Extract existing Slack handling into standalone handleSlackPayload() - Add stub handlers for dashboard and system event sources - Dispatch generic envelopes by source (slack, dashboard, system) - Unknown sources are acked to avoid blocking the queue - Legacy raw Slack payloads continue to work (backwards compat) Tests: - Add 3 integration tests: generic slack dispatch, dashboard dispatch, unknown source ack - Fix test env inheritance (cleanEnv helper) to prevent false failures when host broker token is expired Refs: modem-dev/baudbot-services#56 --- slack-bridge/broker-bridge.mjs | 73 +++- test/broker-bridge.integration.test.mjs | 484 +++++++++++++++++++++++- 2 files changed, 536 insertions(+), 21 deletions(-) diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index e9658d3..faabade 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -655,21 +655,19 @@ function isPoisonMessageError(err) { return message.includes("invalid broker envelope signature") || message.includes("failed to decrypt broker envelope"); } -async function processPulledMessage(message) { - if (!verifyBrokerEnvelope(message)) { - throw new Error("invalid broker envelope signature"); - } - - let slackEventEnvelopePayload; - try { - slackEventEnvelopePayload = decryptEnvelope(message); - markHealth("inbound_decrypt", true); - } catch (err) { - markHealth("inbound_decrypt", false, err); - throw err; - } +function isGenericEnvelope(payload) { + return ( + payload != null && + typeof payload === "object" && + typeof payload.source === "string" && + typeof payload.type === "string" && + "payload" in payload && + typeof payload.broker_timestamp === "number" + ); +} - logInfo(`📦 decrypted envelope — type: ${slackEventEnvelopePayload?.type || "unknown"}`); +async function handleSlackPayload(slackEventEnvelopePayload) { + logInfo(`📦 slack payload — type: ${slackEventEnvelopePayload?.type || "unknown"}`); if (slackEventEnvelopePayload?.type !== "event_callback") { logInfo(` ↳ ignoring non-event_callback type: ${slackEventEnvelopePayload?.type}`); @@ -712,6 +710,53 @@ async function processPulledMessage(message) { return true; } +async function handleDashboardEvent(type, payload) { + logInfo(`📊 dashboard event: ${type}`, JSON.stringify(payload).slice(0, 200)); + // TODO: implement dashboard event handling (env updates, config changes) + return true; +} + +async function handleSystemEvent(type, payload) { + logInfo(`⚙️ system event: ${type}`, JSON.stringify(payload).slice(0, 200)); + // TODO: implement system event handling + return true; +} + +async function processPulledMessage(message) { + if (!verifyBrokerEnvelope(message)) { + throw new Error("invalid broker envelope signature"); + } + + let payload; + try { + payload = decryptEnvelope(message); + markHealth("inbound_decrypt", true); + } catch (err) { + markHealth("inbound_decrypt", false, err); + throw err; + } + + // Generic envelope dispatch + if (isGenericEnvelope(payload)) { + logInfo(`📦 generic envelope — source: ${payload.source}, type: ${payload.type}`); + switch (payload.source) { + case "slack": + return handleSlackPayload(payload.payload); + case "dashboard": + return handleDashboardEvent(payload.type, payload.payload); + case "system": + return handleSystemEvent(payload.type, payload.payload); + default: + logWarn(`⚠️ unknown event source: ${payload.source} — acking to avoid blocking queue`); + return true; + } + } + + // Legacy: raw Slack event_callback (backwards compat during rollout) + logInfo(`📦 legacy envelope — type: ${payload?.type || "unknown"}`); + return handleSlackPayload(payload); +} + function getLogLinesForResponse(url) { const nParam = url.searchParams.get("n"); const filterParam = url.searchParams.get("filter"); diff --git a/test/broker-bridge.integration.test.mjs b/test/broker-bridge.integration.test.mjs index 750e935..54f1eaa 100644 --- a/test/broker-bridge.integration.test.mjs +++ b/test/broker-bridge.integration.test.mjs @@ -55,6 +55,16 @@ describe("broker pull bridge semi-integration", () => { const servers = []; const tempDirs = []; + // Strip real broker credentials from inherited env so spawned bridges don't + // fail with "broker access token is expired" when the host token is stale. + function cleanEnv(overrides = {}) { + const env = { ...process.env, ...overrides }; + if (!overrides.SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT) { + delete env.SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT; + } + return env; + } + afterEach(async () => { for (const child of children) { if (!child.killed) child.kill("SIGTERM"); @@ -117,7 +127,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: `http://127.0.0.1:${brokerAddress.port}`, SLACK_BROKER_WORKSPACE_ID: "T123BROKER", SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -238,7 +248,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: brokerUrl, SLACK_BROKER_WORKSPACE_ID: "T123BROKER", SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -422,7 +432,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), HOME: tempHome, PI_SESSION_ID: sessionId, SLACK_BROKER_URL: brokerUrl, @@ -536,7 +546,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: brokerUrl, SLACK_BROKER_WORKSPACE_ID: workspaceId, SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -623,7 +633,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: brokerUrl, SLACK_BROKER_WORKSPACE_ID: workspaceId, SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -711,7 +721,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: brokerUrl, SLACK_BROKER_WORKSPACE_ID: workspaceId, SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -795,7 +805,7 @@ describe("broker pull bridge semi-integration", () => { const bridge = spawn("node", [bridgePath], { cwd: bridgeCwd, env: { - ...process.env, + ...cleanEnv(), SLACK_BROKER_URL: brokerUrl, SLACK_BROKER_WORKSPACE_ID: workspaceId, SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11), @@ -929,4 +939,464 @@ describe("broker pull bridge semi-integration", () => { expect(exited.code).toBe(1); expect(`${bridgeStdout}\n${bridgeStderr}`).toContain("broker access token is expired"); }); + + it("dispatches generic envelope with source=slack to Slack handler", async () => { + await sodium.ready; + + const testFileDir = path.dirname(fileURLToPath(import.meta.url)); + const repoRoot = path.dirname(testFileDir); + const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs"); + const bridgeCwd = path.join(repoRoot, "slack-bridge"); + + const tempHome = mkdtempSync(path.join(tmpdir(), "baudbot-broker-test-")); + tempDirs.push(tempHome); + + const sessionDir = path.join(tempHome, ".pi", "session-control"); + mkdirSync(sessionDir, { recursive: true }); + const sessionId = "22222222-2222-2222-2222-222222222222"; + const socketFile = path.join(sessionDir, `${sessionId}.sock`); + + const receivedCommands = []; + const agentSocket = net.createServer((conn) => { + let buffer = ""; + conn.on("data", (chunk) => { + buffer += chunk.toString(); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + for (const line of lines) { + if (!line.trim()) continue; + const msg = JSON.parse(line); + receivedCommands.push(msg); + if (msg.type === "send") { + conn.write(`${JSON.stringify({ type: "response", command: "send", success: true })}\n`); + } + } + }); + }); + await new Promise((resolve) => agentSocket.listen(socketFile, resolve)); + servers.push(agentSocket); + + const serverBox = sodium.crypto_box_keypair(); + const brokerBox = sodium.crypto_box_keypair(); + const brokerSign = sodium.crypto_sign_keypair(); + const serverSignSeed = sodium.randombytes_buf(sodium.crypto_sign_SEEDBYTES); + + const testWorkspaceId = "T123BROKER"; + + // Generic envelope wrapping a Slack event_callback + const genericEnvelope = { + source: "slack", + type: "event_callback", + broker_timestamp: Math.floor(Date.now() / 1000), + payload: { + type: "event_callback", + event: { + type: "app_mention", + user: "U_ALLOWED", + channel: "C456", + ts: "1730000000.000200", + text: "<@U_BOT> generic envelope test", + }, + }, + }; + + const encrypted = sodium.crypto_box_seal( + Buffer.from(JSON.stringify(genericEnvelope)), + serverBox.publicKey, + ); + const brokerTimestamp = Math.floor(Date.now() / 1000); + const encryptedB64 = toBase64(encrypted); + const brokerSignature = toBase64( + sodium.crypto_sign_detached( + canonicalizeEnvelope(testWorkspaceId, brokerTimestamp, encryptedB64), + brokerSign.privateKey, + ), + ); + + let pullCount = 0; + let ackPayload = null; + + const broker = createServer(async (req, res) => { + if (req.method === "POST" && req.url === "/api/inbox/pull") { + pullCount += 1; + const messages = pullCount === 1 + ? [{ + message_id: "m-generic-slack-1", + workspace_id: testWorkspaceId, + encrypted: encryptedB64, + broker_timestamp: brokerTimestamp, + broker_signature: brokerSignature, + }] + : []; + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, messages })); + return; + } + + if (req.method === "POST" && req.url === "/api/inbox/ack") { + let raw = ""; + for await (const chunk of req) raw += chunk; + ackPayload = JSON.parse(raw); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, acked: ackPayload.message_ids?.length ?? 0 })); + return; + } + + if (req.method === "POST" && req.url === "/api/send") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, ts: "1234.5678" })); + return; + } + + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: false, error: "not found" })); + }); + + await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve)); + servers.push(broker); + + const address = broker.address(); + if (!address || typeof address === "string") { + throw new Error("failed to get broker test server address"); + } + + let bridgeStdout = ""; + let bridgeStderr = ""; + let bridgeExit = null; + + const bridge = spawn("node", [bridgePath], { + cwd: bridgeCwd, + env: { + ...cleanEnv(), + HOME: tempHome, + PI_SESSION_ID: sessionId, + SLACK_BROKER_URL: `http://127.0.0.1:${address.port}`, + SLACK_BROKER_WORKSPACE_ID: testWorkspaceId, + SLACK_BROKER_SERVER_PRIVATE_KEY: toBase64(serverBox.privateKey), + SLACK_BROKER_SERVER_PUBLIC_KEY: toBase64(serverBox.publicKey), + SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: toBase64(serverSignSeed), + SLACK_BROKER_PUBLIC_KEY: toBase64(brokerBox.publicKey), + SLACK_BROKER_SIGNING_PUBLIC_KEY: toBase64(brokerSign.publicKey), + SLACK_BROKER_ACCESS_TOKEN: "test-broker-token", + SLACK_ALLOWED_USERS: "U_ALLOWED", + SLACK_BROKER_POLL_INTERVAL_MS: "50", + BRIDGE_API_PORT: "0", + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + bridge.stdout.on("data", (chunk) => { bridgeStdout += chunk.toString(); }); + bridge.stderr.on("data", (chunk) => { bridgeStderr += chunk.toString(); }); + + const bridgeExited = new Promise((_, reject) => { + bridge.on("error", (err) => { + if (ackPayload !== null) return; + reject(new Error(`bridge spawn error: ${err.message}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + bridge.on("exit", (code, signal) => { + bridgeExit = { code, signal }; + if (ackPayload !== null) return; + reject(new Error(`bridge exited early: code=${code} signal=${signal}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + }); + + children.push(bridge); + + const completeWait = waitFor( + () => ackPayload !== null && receivedCommands.length > 0, + 12_000, + 50, + `timeout waiting for generic envelope forward+ack; pullCount=${pullCount}; commands=${JSON.stringify(receivedCommands)}; exit=${JSON.stringify(bridgeExit)}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`, + ); + + await Promise.race([completeWait, bridgeExited]); + + // Verify the message was acked + expect(ackPayload.message_ids).toContain("m-generic-slack-1"); + + // Verify the agent received the message (forwarded through Slack handler) + expect(receivedCommands.length).toBe(1); + expect(receivedCommands[0].type).toBe("send"); + expect(receivedCommands[0].mode).toBe("steer"); + expect(receivedCommands[0].message).toContain("generic envelope test"); + + // Verify generic envelope log line appeared + expect(bridgeStdout).toContain("generic envelope"); + expect(bridgeStdout).toContain("source: slack"); + }); + + it("dispatches generic envelope with source=dashboard and acks", async () => { + await sodium.ready; + + const testFileDir = path.dirname(fileURLToPath(import.meta.url)); + const repoRoot = path.dirname(testFileDir); + const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs"); + const bridgeCwd = path.join(repoRoot, "slack-bridge"); + + const serverBox = sodium.crypto_box_keypair(); + const brokerBox = sodium.crypto_box_keypair(); + const brokerSign = sodium.crypto_sign_keypair(); + const serverSignSeed = sodium.randombytes_buf(sodium.crypto_sign_SEEDBYTES); + + const testWorkspaceId = "T123BROKER"; + + const dashboardEnvelope = { + source: "dashboard", + type: "config.updated", + broker_timestamp: Math.floor(Date.now() / 1000), + payload: { + key: "SOME_CONFIG", + value: "new_value", + }, + }; + + const encrypted = sodium.crypto_box_seal( + Buffer.from(JSON.stringify(dashboardEnvelope)), + serverBox.publicKey, + ); + const brokerTimestamp = Math.floor(Date.now() / 1000); + const encryptedB64 = toBase64(encrypted); + const brokerSignature = toBase64( + sodium.crypto_sign_detached( + canonicalizeEnvelope(testWorkspaceId, brokerTimestamp, encryptedB64), + brokerSign.privateKey, + ), + ); + + let pullCount = 0; + let ackPayload = null; + + const broker = createServer(async (req, res) => { + if (req.method === "POST" && req.url === "/api/inbox/pull") { + pullCount += 1; + const messages = pullCount === 1 + ? [{ + message_id: "m-dashboard-1", + workspace_id: testWorkspaceId, + encrypted: encryptedB64, + broker_timestamp: brokerTimestamp, + broker_signature: brokerSignature, + }] + : []; + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, messages })); + return; + } + + if (req.method === "POST" && req.url === "/api/inbox/ack") { + let raw = ""; + for await (const chunk of req) raw += chunk; + ackPayload = JSON.parse(raw); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, acked: ackPayload.message_ids?.length ?? 0 })); + return; + } + + if (req.method === "POST" && req.url === "/api/send") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, ts: "1234.5678" })); + return; + } + + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: false, error: "not found" })); + }); + + await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve)); + servers.push(broker); + + const address = broker.address(); + if (!address || typeof address === "string") { + throw new Error("failed to get broker test server address"); + } + + let bridgeStdout = ""; + let bridgeStderr = ""; + let bridgeExit = null; + + const bridge = spawn("node", [bridgePath], { + cwd: bridgeCwd, + env: { + ...cleanEnv(), + SLACK_BROKER_URL: `http://127.0.0.1:${address.port}`, + SLACK_BROKER_WORKSPACE_ID: testWorkspaceId, + SLACK_BROKER_SERVER_PRIVATE_KEY: toBase64(serverBox.privateKey), + SLACK_BROKER_SERVER_PUBLIC_KEY: toBase64(serverBox.publicKey), + SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: toBase64(serverSignSeed), + SLACK_BROKER_PUBLIC_KEY: toBase64(brokerBox.publicKey), + SLACK_BROKER_SIGNING_PUBLIC_KEY: toBase64(brokerSign.publicKey), + SLACK_BROKER_ACCESS_TOKEN: "test-broker-token", + SLACK_ALLOWED_USERS: "U_ALLOWED", + SLACK_BROKER_POLL_INTERVAL_MS: "50", + BRIDGE_API_PORT: "0", + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + bridge.stdout.on("data", (chunk) => { bridgeStdout += chunk.toString(); }); + bridge.stderr.on("data", (chunk) => { bridgeStderr += chunk.toString(); }); + + const bridgeExited = new Promise((_, reject) => { + bridge.on("error", (err) => { + if (ackPayload !== null) return; + reject(new Error(`bridge spawn error: ${err.message}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + bridge.on("exit", (code, signal) => { + bridgeExit = { code, signal }; + if (ackPayload !== null) return; + reject(new Error(`bridge exited early: code=${code} signal=${signal}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + }); + + children.push(bridge); + + const ackWait = waitFor( + () => ackPayload !== null, + 12_000, + 50, + `timeout waiting for dashboard envelope ack; pullCount=${pullCount}; exit=${JSON.stringify(bridgeExit)}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`, + ); + + await Promise.race([ackWait, bridgeExited]); + + expect(ackPayload.message_ids).toContain("m-dashboard-1"); + expect(bridgeStdout).toContain("generic envelope"); + expect(bridgeStdout).toContain("source: dashboard"); + expect(bridgeStdout).toContain("dashboard event: config.updated"); + }); + + it("acks generic envelope with unknown source to avoid blocking queue", async () => { + await sodium.ready; + + const testFileDir = path.dirname(fileURLToPath(import.meta.url)); + const repoRoot = path.dirname(testFileDir); + const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs"); + const bridgeCwd = path.join(repoRoot, "slack-bridge"); + + const serverBox = sodium.crypto_box_keypair(); + const brokerBox = sodium.crypto_box_keypair(); + const brokerSign = sodium.crypto_sign_keypair(); + const serverSignSeed = sodium.randombytes_buf(sodium.crypto_sign_SEEDBYTES); + + const testWorkspaceId = "T123BROKER"; + + const unknownEnvelope = { + source: "future_service", + type: "something.happened", + broker_timestamp: Math.floor(Date.now() / 1000), + payload: { detail: "some data" }, + }; + + const encrypted = sodium.crypto_box_seal( + Buffer.from(JSON.stringify(unknownEnvelope)), + serverBox.publicKey, + ); + const brokerTimestamp = Math.floor(Date.now() / 1000); + const encryptedB64 = toBase64(encrypted); + const brokerSignature = toBase64( + sodium.crypto_sign_detached( + canonicalizeEnvelope(testWorkspaceId, brokerTimestamp, encryptedB64), + brokerSign.privateKey, + ), + ); + + let pullCount = 0; + let ackPayload = null; + + const broker = createServer(async (req, res) => { + if (req.method === "POST" && req.url === "/api/inbox/pull") { + pullCount += 1; + const messages = pullCount === 1 + ? [{ + message_id: "m-unknown-src-1", + workspace_id: testWorkspaceId, + encrypted: encryptedB64, + broker_timestamp: brokerTimestamp, + broker_signature: brokerSignature, + }] + : []; + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, messages })); + return; + } + + if (req.method === "POST" && req.url === "/api/inbox/ack") { + let raw = ""; + for await (const chunk of req) raw += chunk; + ackPayload = JSON.parse(raw); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, acked: ackPayload.message_ids?.length ?? 0 })); + return; + } + + if (req.method === "POST" && req.url === "/api/send") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, ts: "1234.5678" })); + return; + } + + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: false, error: "not found" })); + }); + + await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve)); + servers.push(broker); + + const address = broker.address(); + if (!address || typeof address === "string") { + throw new Error("failed to get broker test server address"); + } + + let bridgeStdout = ""; + let bridgeStderr = ""; + let bridgeExit = null; + + const bridge = spawn("node", [bridgePath], { + cwd: bridgeCwd, + env: { + ...cleanEnv(), + SLACK_BROKER_URL: `http://127.0.0.1:${address.port}`, + SLACK_BROKER_WORKSPACE_ID: testWorkspaceId, + SLACK_BROKER_SERVER_PRIVATE_KEY: toBase64(serverBox.privateKey), + SLACK_BROKER_SERVER_PUBLIC_KEY: toBase64(serverBox.publicKey), + SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: toBase64(serverSignSeed), + SLACK_BROKER_PUBLIC_KEY: toBase64(brokerBox.publicKey), + SLACK_BROKER_SIGNING_PUBLIC_KEY: toBase64(brokerSign.publicKey), + SLACK_BROKER_ACCESS_TOKEN: "test-broker-token", + SLACK_ALLOWED_USERS: "U_ALLOWED", + SLACK_BROKER_POLL_INTERVAL_MS: "50", + BRIDGE_API_PORT: "0", + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + bridge.stdout.on("data", (chunk) => { bridgeStdout += chunk.toString(); }); + bridge.stderr.on("data", (chunk) => { bridgeStderr += chunk.toString(); }); + + const bridgeExited = new Promise((_, reject) => { + bridge.on("error", (err) => { + if (ackPayload !== null) return; + reject(new Error(`bridge spawn error: ${err.message}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + bridge.on("exit", (code, signal) => { + bridgeExit = { code, signal }; + if (ackPayload !== null) return; + reject(new Error(`bridge exited early: code=${code} signal=${signal}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + }); + + children.push(bridge); + + const ackWait = waitFor( + () => ackPayload !== null, + 12_000, + 50, + `timeout waiting for unknown source ack; pullCount=${pullCount}; exit=${JSON.stringify(bridgeExit)}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`, + ); + + await Promise.race([ackWait, bridgeExited]); + + expect(ackPayload.message_ids).toContain("m-unknown-src-1"); + expect(bridgeStdout + bridgeStderr).toContain("unknown event source: future_service"); + }); });