Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,10 @@ GITHUB_TOKEN=#Your GitHub access token for GitHubCard functionality

# Optional: Provider-specific Configuration
# Add any provider-specific configuration variables here
# Example: OPENAI_ORG_ID=#Your OpenAI Organization ID if needed
# Example: OPENAI_ORG_ID=#Your OpenAI Organization ID if needed

# ADSP configuration
ADSP_CLIENT_SECRET=#ADSP Client secret
ADSP_TENANT_REALM=#ADSP Tenant realm
ADSP_ACCESS_SERVICE_URL=#ADSP Access service URL
ADSP_DIRECTORY_URL=#ADSP Service directory URL
51 changes: 34 additions & 17 deletions config/realTime.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const { handlePrompt } = require("./handleAiInteractions"); // Import the handle

const realTimeClients = {};
const { authenticateAndDecode } = require("../middleware/verify");
const { connectFormSubmissions, disconnectFormSubmissions } = require("../controllers/adsp");

async function verifyTokenAndAccount(token) {
try {
Expand Down Expand Up @@ -157,24 +158,40 @@ async function handleMessage(message, client) {
sendError(client, "Missing Uuid");
return;
}

if (data.type === "ping") {
sendToClient(data.uuid, data.session, "pong");
return;
}

if (data.type !== "prompt") {
sendToClient(
data.uuid,
data.session,
"ERROR",
"Unrecognized message type"
);
return;
switch (data.type) {
case "ping": {
sendToClient(data.uuid, data.session, "pong");
return;
}
case "prompt": {
const promptConfig = buildPromptConfig(data, null);
await handlePrompt(promptConfig, sendToClient);
break;
}
case "connect-form-submissions": {
await connectFormSubmissions(
{ uuid: data.uuid, session: data.session },
sendToClient
);
break;
}
case "disconnect-form-submissions": {
await disconnectFormSubmissions(
{ uuid: data.uuid, session: data.session },
sendToClient
);
break;
}
default: {
sendToClient(
data.uuid,
data.session,
"ERROR",
"Unrecognized message type"
);
return;
}
}

const promptConfig = buildPromptConfig(data, null);
await handlePrompt(promptConfig, sendToClient);
} catch (error) {
console.error("Failed to handle message:", error);
sendError(client, "Error processing message");
Expand Down
149 changes: 149 additions & 0 deletions controllers/adsp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
const { initializeService, adspId } = require("@abgov/adsp-service-sdk");
const { default: axios } = require("axios");
const { io } = require("socket.io-client");

const LOG_CONTEXT = {
context: "ADSP controller",
};

const FORM_SUBMITTED_STREAM_ID = "form-submitted";

let adsp;
async function initializeAdsp() {
if (!adsp) {
// Initialize the ADSP SDK.
adsp = await initializeService(
{
serviceId: adspId`urn:ads:demo:logicstudio.ai`,
clientSecret: process.env.ADSP_CLIENT_SECRET,
realm: process.env.ADSP_TENANT_REALM,
accessServiceUrl: process.env.ADSP_ACCESS_SERVICE_URL,
directoryUrl: process.env.ADSP_DIRECTORY_URL,
// Configuration defined event stream for socket.io connection.
eventStreams: [
{
id: FORM_SUBMITTED_STREAM_ID,
name: "Form submitted updates",
description: "Provides updates on form submissions",
subscriberRoles: [
`urn:ads:platform:tenant-service:platform-service`,
],
publicSubscribe: false,
events: [
{
namespace: "form-service",
name: "form-submitted",
},
],
},
],
},
{}
);
}
return adsp;
}

async function getFormData(directory, tokenProvider, formId) {
const formServiceUrl = await directory.getServiceUrl(
adspId`urn:ads:platform:form-service`
);
const token = await tokenProvider.getAccessToken();
const { data } = await axios.get(
new URL(`/form/v1/forms/${formId}/data`, formServiceUrl).href,
{
headers: {
Authorization: `Bearer ${token}`,
},
}
);

return data;
}

let socket;
const clientContexts = {};

// Connect to event stream via socket.io
async function connectFormSubmissions(config, sendToClient) {
const { uuid, session } = config;

const { directory, tokenProvider, logger } = await initializeAdsp();

const pushServiceUrl = await directory.getServiceUrl(
adspId`urn:ads:platform:push-service`
);

clientContexts[`${uuid}${session}`] = { uuid, session };
if (!socket) {
socket = io(pushServiceUrl.href, {
autoConnect: true,
reconnection: true,
query: {
stream: FORM_SUBMITTED_STREAM_ID,
},
transports: ["websocket"],
withCredentials: true,
auth: async (cb) => cb({ token: await tokenProvider.getAccessToken() }),
});

socket.on("connect", function () {
logger.info("Connected for form submissions...", LOG_CONTEXT);
sendToClient(uuid, config.session, "message", {
type: "form-submitted-updates-status",
connected: true,
});
});

socket.on("connect_error", function (err) {
logger.error(
`Connect to form submission updates failed with error: ${err}`,
LOG_CONTEXT
);
});

socket.on("disconnect", function (reason) {
logger.info(
`Disconnected from form submission updates due to reason: ${reason}`
);
});

socket.on("form-service:form-submitted", async ({ payload }) => {
if (payload?.form?.id) {
// Read the form data, which isn't included as part of the event payload.
const data = await getFormData(directory, tokenProvider, payload.form.id);

for (const { uuid, session } of Object.values(clientContexts)) {
sendToClient(uuid, session, "message", {
type: "form-submitted",
form: payload.form,
...data,
});
}
}
});
} else {
sendToClient(uuid, session, "message", {
type: "form-submitted-updates-status",
connected: true,
});
}
}

// Disconnect from event stream
async function disconnectFormSubmissions(config, sendToClient) {
const { uuid, session } = config;
if (clientContexts[`${uuid}${session}`]) {
delete clientContexts[`${uuid}${session}`];
}
if (!Object.keys(clientContexts).length && socket?.connected) {
socket.disconnect();
socket = undefined;
}
sendToClient(uuid, session, "message", {
type: "form-submitted-updates-status",
connected: false,
});
}

module.exports = { connectFormSubmissions, disconnectFormSubmissions };
Loading