Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 59 additions & 14 deletions slack-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -655,21 +655,19 @@ function isPoisonMessageError(err) {
return message.includes("invalid broker envelope signature") || message.includes("failed to decrypt broker envelope");
}

async function processPulledMessage(message) {
if (!verifyBrokerEnvelope(message)) {
throw new Error("invalid broker envelope signature");
}

let slackEventEnvelopePayload;
try {
slackEventEnvelopePayload = decryptEnvelope(message);
markHealth("inbound_decrypt", true);
} catch (err) {
markHealth("inbound_decrypt", false, err);
throw err;
}
function isGenericEnvelope(payload) {
return (
payload != null &&
typeof payload === "object" &&
typeof payload.source === "string" &&
typeof payload.type === "string" &&
"payload" in payload &&
typeof payload.broker_timestamp === "number"
);
}

logInfo(`📦 decrypted envelope — type: ${slackEventEnvelopePayload?.type || "unknown"}`);
async function handleSlackPayload(slackEventEnvelopePayload) {
logInfo(`📦 slack payload — type: ${slackEventEnvelopePayload?.type || "unknown"}`);

if (slackEventEnvelopePayload?.type !== "event_callback") {
logInfo(` ↳ ignoring non-event_callback type: ${slackEventEnvelopePayload?.type}`);
Expand Down Expand Up @@ -712,6 +710,53 @@ async function processPulledMessage(message) {
return true;
}

async function handleDashboardEvent(type, payload) {
logInfo(`📊 dashboard event: ${type}`, JSON.stringify(payload).slice(0, 200));
// TODO: implement dashboard event handling (env updates, config changes)
return true;
}

async function handleSystemEvent(type, payload) {
logInfo(`⚙️ system event: ${type}`, JSON.stringify(payload).slice(0, 200));
// TODO: implement system event handling
return true;
}

async function processPulledMessage(message) {
if (!verifyBrokerEnvelope(message)) {
throw new Error("invalid broker envelope signature");
}

let payload;
try {
payload = decryptEnvelope(message);
markHealth("inbound_decrypt", true);
} catch (err) {
markHealth("inbound_decrypt", false, err);
throw err;
}

// Generic envelope dispatch
if (isGenericEnvelope(payload)) {
logInfo(`📦 generic envelope — source: ${payload.source}, type: ${payload.type}`);
switch (payload.source) {
case "slack":
return handleSlackPayload(payload.payload);
case "dashboard":
return handleDashboardEvent(payload.type, payload.payload);
case "system":
return handleSystemEvent(payload.type, payload.payload);
default:
logWarn(`⚠️ unknown event source: ${payload.source} — acking to avoid blocking queue`);
return true;
}
}

// Legacy: raw Slack event_callback (backwards compat during rollout)
logInfo(`📦 legacy envelope — type: ${payload?.type || "unknown"}`);
return handleSlackPayload(payload);
}

function getLogLinesForResponse(url) {
const nParam = url.searchParams.get("n");
const filterParam = url.searchParams.get("filter");
Expand Down
Loading