From d1f45f959a746c3f34e711ee7afcd1bbbf0abb97 Mon Sep 17 00:00:00 2001 From: Behzad-rabiei Date: Mon, 30 Jun 2025 18:03:30 +0200 Subject: [PATCH] feat: exec mediawiki workflow on module changes --- src/services/module.service.ts | 26 +++++++++++++- src/services/temporal/mediaWiki.service.ts | 42 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 src/services/temporal/mediaWiki.service.ts diff --git a/src/services/module.service.ts b/src/services/module.service.ts index 151c706..15d7376 100644 --- a/src/services/module.service.ts +++ b/src/services/module.service.ts @@ -4,7 +4,7 @@ import { IModule, IModuleUpdateBody, Module, PlatformNames, ModuleNames } from ' import platformService from './platform.service'; import websiteService from './website'; - +import temporalMediaWiki from './temporal/mediaWiki.service'; /** * Create a module * @param {IModule} ModuleBody @@ -94,6 +94,9 @@ const updateModule = async ( // if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.Website) { // await handleHivemindWebsiteCase(newPlatform); // } + if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.MediaWiki) { + await handleHivemindMediaWikiCase(newPlatform); + } existingPlatform.metadata = newPlatform.metadata; } else { module.options.platforms.push(newPlatform); @@ -132,6 +135,27 @@ const handleHivemindWebsiteCase = async (platform: any) => { } }; +/** + * Handle special case for Hivemind module with MediaWiki platform + * @param {Object} platform - Platform object + */ +const handleHivemindMediaWikiCase = async (platform: any) => { + const platformDoc = await platformService.getPlatformById(platform.platform); + + if (!platformDoc) return; + + const isActivated = platform.metadata?.activated; + const existingWorkflowId = platformDoc.get('metadata.workflowId'); + + if (isActivated === true) { + if (!existingWorkflowId) { + const workflowId = await temporalMediaWiki.executeWorkflow(platform.platform); + platformDoc.set('metadata.workflowId', workflowId); + await platformDoc.save(); + } + } +}; + /** * Delete module * @param {HydratedDocument} module - module doc diff --git a/src/services/temporal/mediaWiki.service.ts b/src/services/temporal/mediaWiki.service.ts new file mode 100644 index 0000000..0e4a623 --- /dev/null +++ b/src/services/temporal/mediaWiki.service.ts @@ -0,0 +1,42 @@ +import { Types } from 'mongoose'; +import { v4 as uuidv4 } from 'uuid'; + +import { Client } from '@temporalio/client'; + +import parentLogger from '../../config/logger'; +import { queues } from './configs/temporal.config'; +import { TemporalCoreService } from './core.service'; + +const logger = parentLogger.child({ module: 'MediaWikiTemporalService' }); + +class TemporalMediaWikiService extends TemporalCoreService { + public async executeWorkflow(platformId: Types.ObjectId) { + const client: Client = await this.getClient(); + const payload = { + platform_id: platformId, + }; + try { + const workflowHandle = await client.workflow.execute('MediaWikiETLWorkflow', { + taskQueue: queues.TEMPORAL_QUEUE_PYTHON_HEAVY, + args: [payload], + workflowId: `mediawiki/${platformId}/${uuidv4()}`, + }); + logger.info(`Started MediaWiki workflow with ID: ${workflowHandle}`); + return workflowHandle; + } catch (error) { + logger.error(`Failed to trigger MediaWiki workflow: ${(error as Error).message}`); + throw new Error(`Failed to trigger MediaWiki workflow: ${(error as Error).message}`); + } + } + + public async terminateWorkflow(workflowId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.workflow.getHandle(workflowId); + const description = await handle.describe(); + if (description.status.name !== 'TERMINATED' && description.status.name !== 'COMPLETED') { + await handle.terminate('Terminated due to schedule deletion'); + } + } +} + +export default new TemporalMediaWikiService();