diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index a561d990b9..4e4e1d0f3f 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -1,4 +1,5 @@
-import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
+import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
+import { getTaskRunField, getPayloadField } from "@internal/clickhouse";
 import { type RedisOptions } from "@internal/redis";
 import {
   LogicalReplicationClient,
@@ -81,7 +82,7 @@ type TaskRunInsert = {
 export type RunsReplicationServiceEvents = {
   message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
   batchFlushed: [
-    { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
+    { flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] }
   ];
 };
@@ -171,12 +172,9 @@ export class RunsReplicationService {
       description: "Insert retry attempts",
     });
 
-    this._eventsProcessedCounter = this._meter.createCounter(
-      "runs_replication.events_processed",
-      {
-        description: "Replication events processed (inserts, updates, deletes)",
-      }
-    );
+    this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
+      description: "Replication events processed (inserts, updates, deletes)",
+    });
 
     this._flushDurationHistogram = this._meter.createHistogram(
       "runs_replication.flush_duration_ms",
@@ -216,34 +214,18 @@ export class RunsReplicationService {
       flushInterval: options.flushIntervalMs ?? 100,
       maxConcurrency: options.maxFlushConcurrency ?? 100,
       callback: this.#flushBatch.bind(this),
-      // we can do some pre-merging to reduce the amount of data we need to send to clickhouse
-      mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => {
-        const merged = new Map();
-
-        for (const item of existingBatch) {
-          const key = `${item.event}_${item.run.id}`;
-          merged.set(key, item);
-        }
-
-        for (const item of newBatch) {
-          if (!item?.run?.id) {
-            this.logger.warn("Skipping replication event with null run", { event: item });
-            continue;
-          }
-
-          const key = `${item.event}_${item.run.id}`;
-          const existingItem = merged.get(key);
-
-          // Keep the run with the higher version (latest)
-          // and take the last occurrence for that version.
-          // Items originating from the same DB transaction have the same version.
-          if (!existingItem || item._version >= existingItem._version) {
-            merged.set(key, item);
-          }
-        }
-
-        return Array.from(merged.values());
+      // Key-based deduplication to reduce duplicates sent to ClickHouse
+      getKey: (item) => {
+        if (!item?.run?.id) {
+          this.logger.warn("Skipping replication event with null run", { event: item });
+          return null;
+        }
+        return `${item.event}_${item.run.id}`;
       },
+      // Keep the run with the higher version (latest)
+      // and take the last occurrence for that version.
+      // Items originating from the same DB transaction have the same version.
+      shouldReplace: (existing, incoming) => incoming._version >= existing._version,
       logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
       tracer: options.tracer,
     });
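The old `mergeBatch` hook collapses into two smaller primitives: `getKey` decides identity (and can veto an item by returning `null`), and `shouldReplace` decides which duplicate survives. A minimal sketch of the resulting Map-upsert semantics — illustrative item shapes, not the real `TaskRunInsert` type:

```ts
// Illustrative sketch of the getKey/shouldReplace contract; not the production class.
type Item = { key: string | null; version: number };

const batch = new Map<string, Item>();
const shouldReplace = (existing: Item, incoming: Item) => incoming.version >= existing.version;

for (const item of [
  { key: "insert_run_1", version: 1 },
  { key: "insert_run_1", version: 1 }, // equal version: ">=" lets the later occurrence win
  { key: "insert_run_1", version: 0 }, // lower version: dropped
  { key: null, version: 2 },           // null key: skipped entirely
]) {
  if (item.key === null) continue;
  const existing = batch.get(item.key);
  if (!existing || shouldReplace(existing, item)) {
    batch.set(item.key, item);
  }
}
// batch now holds a single entry for "insert_run_1": the second version-1 item.
```

The `>=` (rather than `>`) is what preserves the documented "take the last occurrence for that version" behavior for events originating from the same DB transaction.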
"info"), tracer: options.tracer, }); @@ -506,7 +488,7 @@ export class RunsReplicationService { this._eventsProcessedCounter.add(1, { event_type: event.tag }); } - this.logger.info("handle_transaction", { + this.logger.debug("handle_transaction", { transaction: { xid: transaction.xid, commitLsn: transaction.commitLsn, @@ -578,32 +560,46 @@ export class RunsReplicationService { const taskRunInserts = preparedInserts .map(({ taskRunInsert }) => taskRunInsert) - .filter(Boolean) + .filter((x): x is TaskRunInsertArray => Boolean(x)) // batch inserts in clickhouse are more performant if the items // are pre-sorted by the primary key .sort((a, b) => { - if (a.organization_id !== b.organization_id) { - return a.organization_id < b.organization_id ? -1 : 1; + const aOrgId = getTaskRunField(a, "organization_id"); + const bOrgId = getTaskRunField(b, "organization_id"); + if (aOrgId !== bOrgId) { + return aOrgId < bOrgId ? -1 : 1; } - if (a.project_id !== b.project_id) { - return a.project_id < b.project_id ? -1 : 1; + const aProjId = getTaskRunField(a, "project_id"); + const bProjId = getTaskRunField(b, "project_id"); + if (aProjId !== bProjId) { + return aProjId < bProjId ? -1 : 1; } - if (a.environment_id !== b.environment_id) { - return a.environment_id < b.environment_id ? -1 : 1; + const aEnvId = getTaskRunField(a, "environment_id"); + const bEnvId = getTaskRunField(b, "environment_id"); + if (aEnvId !== bEnvId) { + return aEnvId < bEnvId ? -1 : 1; } - if (a.created_at !== b.created_at) { - return a.created_at - b.created_at; + const aCreatedAt = getTaskRunField(a, "created_at"); + const bCreatedAt = getTaskRunField(b, "created_at"); + if (aCreatedAt !== bCreatedAt) { + return aCreatedAt - bCreatedAt; } - return a.run_id < b.run_id ? -1 : 1; + const aRunId = getTaskRunField(a, "run_id"); + const bRunId = getTaskRunField(b, "run_id"); + if (aRunId === bRunId) return 0; + return aRunId < bRunId ? -1 : 1; }); const payloadInserts = preparedInserts .map(({ payloadInsert }) => payloadInsert) - .filter(Boolean) + .filter((x): x is PayloadInsertArray => Boolean(x)) // batch inserts in clickhouse are more performant if the items // are pre-sorted by the primary key .sort((a, b) => { - return a.run_id < b.run_id ? -1 : 1; + const aRunId = getPayloadField(a, "run_id"); + const bRunId = getPayloadField(b, "run_id"); + if (aRunId === bRunId) return 0; + return aRunId < bRunId ? -1 : 1; }); span.setAttribute("task_run_inserts", taskRunInserts.length); @@ -633,7 +629,6 @@ export class RunsReplicationService { this.logger.error("Error inserting task run inserts", { error: taskRunError, flushId, - runIds: taskRunInserts.map((r) => r.run_id), }); recordSpanError(span, taskRunError); } @@ -642,7 +637,6 @@ export class RunsReplicationService { this.logger.error("Error inserting payload inserts", { error: payloadError, flushId, - runIds: payloadInserts.map((r) => r.run_id), }); recordSpanError(span, payloadError); } @@ -760,26 +754,24 @@ export class RunsReplicationService { #getClickhouseInsertSettings() { if (this._insertStrategy === "insert") { return {}; - } else if (this._insertStrategy === "insert_async") { - return { - async_insert: 1 as const, - async_insert_max_data_size: "1000000", - async_insert_busy_timeout_ms: 1000, - wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const), - }; } + + return { + async_insert: 1 as const, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + wait_for_async_insert: this.options.waitForAsyncInsert ? 
@@ -760,26 +754,24 @@ export class RunsReplicationService {
   #getClickhouseInsertSettings() {
     if (this._insertStrategy === "insert") {
       return {};
-    } else if (this._insertStrategy === "insert_async") {
-      return {
-        async_insert: 1 as const,
-        async_insert_max_data_size: "1000000",
-        async_insert_busy_timeout_ms: 1000,
-        wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
-      };
     }
+
+    return {
+      async_insert: 1 as const,
+      async_insert_max_data_size: "1000000",
+      async_insert_busy_timeout_ms: 1000,
+      wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
+    };
   }
 
-  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
+  async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-        taskRunInserts,
-        {
+      const [insertError, insertResult] =
+        await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
           params: {
             clickhouse_settings: this.#getClickhouseInsertSettings(),
           },
-        }
-      );
+        });
 
       if (insertError) {
         this.logger.error("Error inserting task run inserts attempt", {
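For anyone auditing the flattened branch above: with `async_insert: 1` the server buffers rows and acknowledges the insert before the data reaches the table unless `wait_for_async_insert: 1` is also set, so `waitForAsyncInsert` trades latency for delivery confirmation. Spelled out, the two shapes the method can now return (values taken from this diff):

```ts
// The two possible results of #getClickhouseInsertSettings after this change.
const syncSettings = {}; // insertStrategy === "insert": plain synchronous INSERT

const asyncSettings = {
  async_insert: 1,                       // server-side buffering
  async_insert_max_data_size: "1000000", // flush the buffer at ~1 MB…
  async_insert_busy_timeout_ms: 1000,    // …or after 1s, whichever comes first
  wait_for_async_insert: 0,              // 1 when options.waitForAsyncInsert is set
};
```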
"", - completed_at: run.completedAt?.getTime(), - started_at: run.startedAt?.getTime(), - executed_at: run.executedAt?.getTime(), - delay_until: run.delayUntil?.getTime(), - queued_at: run.queuedAt?.getTime(), - expired_at: run.expiredAt?.getTime(), - usage_duration_ms: run.usageDurationMs, - cost_in_cents: run.costInCents, - base_cost_in_cents: run.baseCostInCents, - tags: run.runTags ?? [], - task_version: run.taskVersion ?? "", - sdk_version: run.sdkVersion ?? "", - cli_version: run.cliVersion ?? "", - machine_preset: run.machinePreset ?? "", - root_run_id: run.rootTaskRunId ?? "", - parent_run_id: run.parentTaskRunId ?? "", - depth: run.depth, - is_test: run.isTest, - idempotency_key: run.idempotencyKey ?? "", - expiration_ttl: run.ttl ?? "", - output, - concurrency_key: run.concurrencyKey ?? "", - bulk_action_group_ids: run.bulkActionGroupIds ?? [], - worker_queue: run.masterQueue, - max_duration_in_seconds: run.maxDurationInSeconds ?? undefined, - _version: _version.toString(), - _is_deleted: event === "delete" ? 1 : 0, - }; + // Return array matching TASK_RUN_COLUMNS order + return [ + run.runtimeEnvironmentId, // environment_id + organizationId, // organization_id + run.projectId, // project_id + run.id, // run_id + run.updatedAt.getTime(), // updated_at + run.createdAt.getTime(), // created_at + run.status, // status + environmentType, // environment_type + run.friendlyId, // friendly_id + run.attemptNumber ?? 1, // attempt + run.engine, // engine + run.taskIdentifier, // task_identifier + run.queue, // queue + run.scheduleId ?? "", // schedule_id + run.batchId ?? "", // batch_id + run.completedAt?.getTime() ?? null, // completed_at + run.startedAt?.getTime() ?? null, // started_at + run.executedAt?.getTime() ?? null, // executed_at + run.delayUntil?.getTime() ?? null, // delay_until + run.queuedAt?.getTime() ?? null, // queued_at + run.expiredAt?.getTime() ?? null, // expired_at + run.usageDurationMs ?? 0, // usage_duration_ms + run.costInCents ?? 0, // cost_in_cents + run.baseCostInCents ?? 0, // base_cost_in_cents + output, // output + { data: run.error }, // error + run.runTags ?? [], // tags + run.taskVersion ?? "", // task_version + run.sdkVersion ?? "", // sdk_version + run.cliVersion ?? "", // cli_version + run.machinePreset ?? "", // machine_preset + run.rootTaskRunId ?? "", // root_run_id + run.parentTaskRunId ?? "", // parent_run_id + run.depth ?? 0, // depth + run.spanId, // span_id + run.traceId, // trace_id + run.idempotencyKey ?? "", // idempotency_key + run.ttl ?? "", // expiration_ttl + run.isTest ?? false, // is_test + _version.toString(), // _version + event === "delete" ? 1 : 0, // _is_deleted + run.concurrencyKey ?? "", // concurrency_key + run.bulkActionGroupIds ?? [], // bulk_action_group_ids + run.masterQueue ?? "", // worker_queue + run.maxDurationInSeconds ?? 
@@ -982,13 +958,16 @@ export type ConcurrentFlushSchedulerConfig<T> = {
   flushInterval: number;
   maxConcurrency?: number;
   callback: (flushId: string, batch: T[]) => Promise<void>;
-  mergeBatch?: (existingBatch: T[], newBatch: T[]) => T[];
+  /** Key-based deduplication. Return null to skip the item. */
+  getKey: (item: T) => string | null;
+  /** Determine if incoming item should replace existing. */
+  shouldReplace: (existing: T, incoming: T) => boolean;
   tracer?: Tracer;
   logger?: Logger;
 };
 
 export class ConcurrentFlushScheduler<T> {
-  private currentBatch: T[];
+  private batch = new Map<string, T>();
   private readonly BATCH_SIZE: number;
   private readonly flushInterval: number;
   private readonly MAX_CONCURRENCY: number;
@@ -1003,7 +982,6 @@ export class ConcurrentFlushScheduler<T> {
     this.logger = config.logger ?? new Logger("ConcurrentFlushScheduler", "info");
     this._tracer = config.tracer ?? trace.getTracer("concurrent-flush-scheduler");
 
-    this.currentBatch = [];
     this.BATCH_SIZE = config.batchSize;
     this.flushInterval = config.flushInterval;
     this.MAX_CONCURRENCY = config.maxConcurrency || 1;
@@ -1013,9 +991,17 @@ export class ConcurrentFlushScheduler<T> {
   }
 
   addToBatch(items: T[]): void {
-    this.currentBatch = this.config.mergeBatch
-      ? this.config.mergeBatch(this.currentBatch, items)
-      : this.currentBatch.concat(items);
+    for (const item of items) {
+      const key = this.config.getKey(item);
+      if (key === null) {
+        continue;
+      }
+
+      const existing = this.batch.get(key);
+      if (!existing || this.config.shouldReplace(existing, item)) {
+        this.batch.set(key, item);
+      }
+    }
 
     this.#flushNextBatchIfNeeded();
   }
@@ -1039,11 +1025,16 @@ export class ConcurrentFlushScheduler<T> {
     this.#flushNextBatchIfNeeded();
   }
 
+  #getBatchSize(): number {
+    return this.batch.size;
+  }
+
   #flushNextBatchIfNeeded(): void {
-    if (this.currentBatch.length >= this.BATCH_SIZE || this._isShutDown) {
+    const currentSize = this.#getBatchSize();
+    if (currentSize >= this.BATCH_SIZE || this._isShutDown) {
       this.logger.debug("Batch size threshold reached, initiating flush", {
         batchSize: this.BATCH_SIZE,
-        currentSize: this.currentBatch.length,
+        currentSize,
         isShutDown: this._isShutDown,
       });
 
@@ -1068,19 +1059,20 @@ export class ConcurrentFlushScheduler<T> {
   }
 
   async #checkAndFlush(): Promise<void> {
-    if (this.currentBatch.length > 0) {
+    const currentSize = this.#getBatchSize();
+    if (currentSize > 0) {
       this.logger.debug("Periodic flush check triggered", {
-        currentBatchSize: this.currentBatch.length,
+        currentBatchSize: currentSize,
       });
       await this.#flushNextBatch();
     }
   }
 
   async #flushNextBatch(): Promise<void> {
-    if (this.currentBatch.length === 0) return;
+    if (this.batch.size === 0) return;
 
-    const batch = this.currentBatch;
-    this.currentBatch = [];
+    const batch = Array.from(this.batch.values());
+    this.batch.clear();
 
     const callback = this.config.callback;
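End to end, the reworked scheduler is configured like this — a usage sketch with illustrative values; the item type and callback body are placeholders:

```ts
// Usage sketch of the new config shape (batchSize/flushInterval values are illustrative).
type Row = { id: string; version: number };

const scheduler = new ConcurrentFlushScheduler<Row>({
  batchSize: 50,
  flushInterval: 100, // ms
  maxConcurrency: 10,
  callback: async (flushId, batch) => {
    // batch arrives in Map insertion order, already deduplicated by key
  },
  getKey: (item) => (item.id ? item.id : null), // null drops the item
  shouldReplace: (existing, incoming) => incoming.version >= existing.version,
});

scheduler.start();
scheduler.addToBatch([{ id: "run_1", version: 1 }]);
```

Note that `Array.from(this.batch.values())` hands the callback items in insertion order, so the primary-key pre-sort in `#flushBatch` is still required.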
diff --git a/apps/webapp/test/concurrentFlushScheduler.test.ts b/apps/webapp/test/concurrentFlushScheduler.test.ts
new file mode 100644
index 0000000000..9ab7954033
--- /dev/null
+++ b/apps/webapp/test/concurrentFlushScheduler.test.ts
@@ -0,0 +1,161 @@
+import { ConcurrentFlushScheduler } from "~/services/runsReplicationService.server";
+
+vi.setConfig({ testTimeout: 10_000 });
+
+type TestItem = {
+  id: string;
+  event: "insert" | "update";
+  version: number;
+};
+
+describe("ConcurrentFlushScheduler", () => {
+  it("should deduplicate items by key, keeping the latest version", async () => {
+    const flushedBatches: TestItem[][] = [];
+
+    const scheduler = new ConcurrentFlushScheduler<TestItem>({
+      batchSize: 100,
+      flushInterval: 50,
+      maxConcurrency: 1,
+      callback: async (_flushId, batch) => {
+        flushedBatches.push([...batch]);
+      },
+      getKey: (item) => `${item.event}_${item.id}`,
+      shouldReplace: (existing, incoming) => incoming.version >= existing.version,
+    });
+
+    scheduler.start();
+
+    // Add items with duplicate keys but different versions
+    scheduler.addToBatch([
+      { id: "run_1", event: "insert", version: 1 },
+      { id: "run_1", event: "update", version: 2 },
+      { id: "run_2", event: "insert", version: 1 },
+    ]);
+
+    // Add more items - should merge with existing
+    scheduler.addToBatch([
+      { id: "run_1", event: "insert", version: 3 }, // Higher version, should replace
+      { id: "run_1", event: "update", version: 1 }, // Lower version, should NOT replace
+      { id: "run_2", event: "update", version: 4 },
+    ]);
+
+    // Wait for flush
+    await new Promise((resolve) => setTimeout(resolve, 100));
+
+    scheduler.shutdown();
+
+    // Should have flushed once with deduplicated items
+    expect(flushedBatches.length).toBeGreaterThanOrEqual(1);
+
+    const allFlushed = flushedBatches.flat();
+
+    // Find items by their key
+    const insertRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "insert");
+    const updateRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "update");
+    const insertRun2 = allFlushed.find((i) => i.id === "run_2" && i.event === "insert");
+    const updateRun2 = allFlushed.find((i) => i.id === "run_2" && i.event === "update");
+
+    // Verify correct versions were kept
+    expect(insertRun1?.version).toBe(3); // Latest version for insert_run_1
+    expect(updateRun1?.version).toBe(2); // Original update_run_1 (v1 didn't replace v2)
+    expect(insertRun2?.version).toBe(1); // Only version for insert_run_2
+    expect(updateRun2?.version).toBe(4); // Only version for update_run_2
+  });
+
+  it("should skip items where getKey returns null", async () => {
+    const flushedBatches: TestItem[][] = [];
+
+    const scheduler = new ConcurrentFlushScheduler<TestItem>({
+      batchSize: 100,
+      flushInterval: 50,
+      maxConcurrency: 1,
+      callback: async (_flushId, batch) => {
+        flushedBatches.push([...batch]);
+      },
+      getKey: (item) => {
+        if (!item.id) {
+          return null;
+        }
+        return `${item.event}_${item.id}`;
+      },
+      shouldReplace: (existing, incoming) => incoming.version >= existing.version,
+    });
+
+    scheduler.start();
+
+    scheduler.addToBatch([
+      { id: "run_1", event: "insert", version: 1 },
+      { id: "", event: "insert", version: 2 }, // Should be skipped (null key)
+      { id: "run_2", event: "insert", version: 1 },
+    ]);
+
+    await new Promise((resolve) => setTimeout(resolve, 100));
+
+    scheduler.shutdown();
+
+    const allFlushed = flushedBatches.flat();
+    expect(allFlushed).toHaveLength(2);
+    expect(allFlushed.map((i) => i.id).sort()).toEqual(["run_1", "run_2"]);
+  });
+
+  it("should flush when batch size threshold is reached", async () => {
+    const flushedBatches: TestItem[][] = [];
+
+    const scheduler = new ConcurrentFlushScheduler<TestItem>({
+      batchSize: 3,
+      flushInterval: 10000, // Long interval so timer doesn't trigger
+      maxConcurrency: 1,
+      callback: async (_flushId, batch) => {
+        flushedBatches.push([...batch]);
+      },
+      getKey: (item) => `${item.event}_${item.id}`,
+      shouldReplace: (existing, incoming) => incoming.version >= existing.version,
+    });
+
+    scheduler.start();
+
+    // Add 3 unique items - should trigger flush
+    scheduler.addToBatch([
+      { id: "run_1", event: "insert", version: 1 },
+      { id: "run_2", event: "insert", version: 1 },
+      { id: "run_3", event: "insert", version: 1 },
+    ]);
+
+    await new Promise((resolve) => setTimeout(resolve, 50));
+
+    expect(flushedBatches.length).toBe(1);
+    expect(flushedBatches[0]).toHaveLength(3);
+
+    scheduler.shutdown();
+  });
+
+  it("should respect shouldReplace returning false", async () => {
+    const flushedBatches: TestItem[][] = [];
+
+    const scheduler = new ConcurrentFlushScheduler<TestItem>({
+      batchSize: 100,
+      flushInterval: 50,
+      maxConcurrency: 1,
+      callback: async (_flushId, batch) => {
+        flushedBatches.push([...batch]);
+      },
+      getKey: (item) => `${item.event}_${item.id}`,
+      // Never replace - first item wins
+      shouldReplace: () => false,
+    });
+
+    scheduler.start();
+
+    scheduler.addToBatch([{ id: "run_1", event: "insert", version: 10 }]);
+
+    scheduler.addToBatch([{ id: "run_1", event: "insert", version: 999 }]);
+
+    await new Promise((resolve) => setTimeout(resolve, 100));
+
+    scheduler.shutdown();
+
+    const allFlushed = flushedBatches.flat();
+    const insertRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "insert");
+    expect(insertRun1?.version).toBe(10); // First one wins
+  });
+});
diff --git a/apps/webapp/test/runsReplicationService.part1.test.ts b/apps/webapp/test/runsReplicationService.part1.test.ts
index a8726d8221..87ebd0cde2 100644
--- a/apps/webapp/test/runsReplicationService.part1.test.ts
+++ b/apps/webapp/test/runsReplicationService.part1.test.ts
@@ -1,6 +1,5 @@
 import { ClickHouse } from "@internal/clickhouse";
 import { containerTest } from "@internal/testcontainers";
-import { Logger } from "@trigger.dev/core/logger";
 import { setTimeout } from "node:timers/promises";
 import { z } from "zod";
 import { TaskRunStatus } from "~/database-types";
@@ -22,6 +21,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       compression: {
         request: true,
       },
+      logLevel: "warn",
     });
 
     const { tracer, exporter } = createInMemoryTracing();
@@ -40,6 +40,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
       tracer,
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -135,6 +136,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       compression: {
         request: true,
       },
+      logLevel: "warn",
     });
 
     const { tracer, exporter } = createInMemoryTracing();
@@ -153,6 +155,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
       tracer,
+      logLevel: "warn",
    });
 
     await runsReplicationService.start();
@@ -271,6 +274,7 @@ describe("RunsReplicationService (part 1/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication",
+      logLevel: "warn",
     });
 
     const { tracer, exporter } = createInMemoryTracing();
@@ -289,6 +293,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
       tracer,
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -341,6 +346,7 @@ describe("RunsReplicationService (part 1/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-batching",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
"runs-replication-batching", + logLevel: "warn", }); const runsReplicationService = new RunsReplicationService({ @@ -356,6 +362,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationService.start(); @@ -443,6 +450,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-payload", + logLevel: "warn", }); const runsReplicationService = new RunsReplicationService({ @@ -458,6 +466,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationService.start(); @@ -542,6 +551,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-payload", + logLevel: "warn", }); const runsReplicationService = new RunsReplicationService({ @@ -557,6 +567,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationService.start(); @@ -646,6 +657,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-update", + logLevel: "warn", }); const runsReplicationService = new RunsReplicationService({ @@ -661,6 +673,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationService.start(); @@ -751,6 +764,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-delete", + logLevel: "warn", }); const runsReplicationService = new RunsReplicationService({ @@ -766,6 +780,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationService.start(); @@ -849,6 +864,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-shutdown-handover", + logLevel: "warn", }); // Service A @@ -865,6 +881,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationServiceA.start(); @@ -968,6 +985,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationServiceB.start(); @@ -997,6 +1015,7 @@ describe("RunsReplicationService (part 1/2)", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication-shutdown-after-processed", + logLevel: "warn", }); // Service A @@ -1013,6 +1032,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, leaderLockExtendIntervalMs: 1000, ackIntervalSeconds: 5, + logLevel: "warn", }); await runsReplicationServiceA.start(); @@ -1114,6 +1134,7 @@ describe("RunsReplicationService (part 1/2)", () => { leaderLockTimeoutMs: 5000, 
@@ -1137,6 +1158,7 @@ describe("RunsReplicationService (part 1/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-metrics",
+      logLevel: "warn",
     });
 
     const { tracer } = createInMemoryTracing();
@@ -1157,6 +1179,7 @@ describe("RunsReplicationService (part 1/2)", () => {
       ackIntervalSeconds: 5,
       tracer,
       meter: metricsHelper.meter,
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts
index e08b579738..9eece84791 100644
--- a/apps/webapp/test/runsReplicationService.part2.test.ts
+++ b/apps/webapp/test/runsReplicationService.part2.test.ts
@@ -1,4 +1,4 @@
-import { ClickHouse } from "@internal/clickhouse";
+import { ClickHouse, getTaskRunField, getPayloadField } from "@internal/clickhouse";
 import { containerTest } from "@internal/testcontainers";
 import { Logger } from "@trigger.dev/core/logger";
 import { readFile } from "node:fs/promises";
@@ -18,6 +18,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-shutdown-handover",
+      logLevel: "warn",
     });
 
     // Service A
@@ -35,7 +36,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockExtendIntervalMs: 1000,
       leaderLockAcquireAdditionalTimeMs: 10_000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
+      logger: new Logger("runs-replication-shutdown-handover-a", "warn"),
     });
 
     await runsReplicationServiceA.start();
@@ -55,7 +56,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockExtendIntervalMs: 1000,
       leaderLockAcquireAdditionalTimeMs: 10_000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
+      logger: new Logger("runs-replication-shutdown-handover-b", "warn"),
     });
 
     // Now we need to initiate starting the second service, and after 6 seconds, we need to shutdown the first service
@@ -147,6 +148,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-stress-bulk-insert",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -162,7 +164,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-stress-bulk-insert", "info"),
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -261,6 +263,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-stress-bulk-insert",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -276,7 +279,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-stress-bulk-insert", "info"),
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -381,6 +384,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-multi-event-tx",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -396,6 +400,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -513,6 +518,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-long-tx",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -528,7 +534,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-long-tx", "info"),
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -619,6 +625,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-stress-bulk-insert",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -634,7 +641,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-stress-bulk-insert", "info"),
+      logLevel: "warn",
     });
 
     await runsReplicationService.start();
@@ -787,6 +794,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-merge-batch",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -802,7 +810,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-merge-batch", "info"),
+      logLevel: "warn",
     });
 
     // Listen to batchFlushed events to verify merging
@@ -889,17 +897,14 @@ describe("RunsReplicationService (part 2/2)", () => {
     await setTimeout(1000);
 
     expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
-    expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual(
-      expect.objectContaining({
-        run_id: run.id,
-        status: "PENDING_VERSION",
-      })
+    // Use getTaskRunField for type-safe array access
+    expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[0], "run_id")).toEqual(run.id);
+    expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[0], "status")).toEqual(
+      "PENDING_VERSION"
     );
-    expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual(
-      expect.objectContaining({
-        run_id: run.id,
-        status: "COMPLETED_SUCCESSFULLY",
-      })
+    expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[1], "run_id")).toEqual(run.id);
+    expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[1], "status")).toEqual(
+      "COMPLETED_SUCCESSFULLY"
     );
 
     await runsReplicationService.stop();
@@ -914,6 +919,7 @@ describe("RunsReplicationService (part 2/2)", () => {
     const clickhouse = new ClickHouse({
       url: clickhouseContainer.getConnectionUrl(),
       name: "runs-replication-sorting",
+      logLevel: "warn",
     });
 
     const runsReplicationService = new RunsReplicationService({
@@ -929,7 +935,7 @@ describe("RunsReplicationService (part 2/2)", () => {
       leaderLockTimeoutMs: 5000,
       leaderLockExtendIntervalMs: 1000,
       ackIntervalSeconds: 5,
-      logger: new Logger("runs-replication-sorting", "info"),
+      logLevel: "warn",
     });
 
     // Listen to batchFlushed events to verify sorting
describe("RunsReplicationService (part 2/2)", () => { // Verify sorting order: organization_id, project_id, environment_id, created_at, run_id for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) { - const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1]; - const curr = batchFlushedEvents[0]?.taskRunInserts[i]; + const prev = batchFlushedEvents[0]!.taskRunInserts[i - 1]; + const curr = batchFlushedEvents[0]!.taskRunInserts[i]; const prevKey = [ - prev.organization_id, - prev.project_id, - prev.environment_id, - prev.created_at, - prev.run_id, + getTaskRunField(prev, "organization_id"), + getTaskRunField(prev, "project_id"), + getTaskRunField(prev, "environment_id"), + getTaskRunField(prev, "created_at"), + getTaskRunField(prev, "run_id"), ]; const currKey = [ - curr.organization_id, - curr.project_id, - curr.environment_id, - curr.created_at, - curr.run_id, + getTaskRunField(curr, "organization_id"), + getTaskRunField(curr, "project_id"), + getTaskRunField(curr, "environment_id"), + getTaskRunField(curr, "created_at"), + getTaskRunField(curr, "run_id"), ]; const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]); @@ -1109,11 +1115,315 @@ describe("RunsReplicationService (part 2/2)", () => { // Verify payloadInserts are also sorted by run_id for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) { - const prev = batchFlushedEvents[0]?.payloadInserts[i - 1]; - const curr = batchFlushedEvents[0]?.payloadInserts[i]; - expect(prev.run_id <= curr.run_id).toBeTruthy(); + const prev = batchFlushedEvents[0]!.payloadInserts[i - 1]; + const curr = batchFlushedEvents[0]!.payloadInserts[i]; + expect(getPayloadField(prev, "run_id") <= getPayloadField(curr, "run_id")).toBeTruthy(); + } + + await runsReplicationService.stop(); + } + ); + + containerTest( + "should exhaustively replicate all TaskRun columns to ClickHouse", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-exhaustive", + logLevel: "warn", + }); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication-exhaustive", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { + title: "test-exhaustive", + slug: "test-exhaustive", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test-exhaustive", + slug: "test-exhaustive", + organizationId: organization.id, + externalRef: "test-exhaustive", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-exhaustive", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "test-exhaustive", + pkApiKey: "test-exhaustive", + shortcode: "test-exhaustive", + }, + }); + + // Create a batch for the batchId field + const batch = await prisma.batchTaskRun.create({ + data: { + friendlyId: "batch_exhaustive", + runtimeEnvironmentId: runtimeEnvironment.id, + 
status: "PENDING", + }, + }); + + // Create a root run for the rootTaskRunId field + const rootRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_root_exhaustive", + taskIdentifier: "root-task", + payload: JSON.stringify({ root: true }), + traceId: "root-trace-id", + spanId: "root-span-id", + queue: "root-queue", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "PRODUCTION", + engine: "V2", + }, + }); + + // Create a parent run for the parentTaskRunId field + const parentRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_parent_exhaustive", + taskIdentifier: "parent-task", + payload: JSON.stringify({ parent: true }), + traceId: "parent-trace-id", + spanId: "parent-span-id", + queue: "parent-queue", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "PRODUCTION", + engine: "V2", + rootTaskRunId: rootRun.id, + depth: 1, + }, + }); + + // Set up all the dates we'll use + const now = new Date(); + const createdAt = new Date(now.getTime() - 10000); + const updatedAt = new Date(now.getTime() - 5000); + const startedAt = new Date(now.getTime() - 8000); + const executedAt = new Date(now.getTime() - 7500); + const completedAt = new Date(now.getTime() - 6000); + const delayUntil = new Date(now.getTime() - 9000); + const queuedAt = new Date(now.getTime() - 9500); + const expiredAt = null; // Not expired + + // Create the main task run with ALL fields populated + const taskRun = await prisma.taskRun.create({ + data: { + // Core identifiers + friendlyId: "run_exhaustive_test", + taskIdentifier: "exhaustive-task", + + // Environment/project/org + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "PRODUCTION", + + // Engine and execution + engine: "V2", + status: "COMPLETED_SUCCESSFULLY", + attemptNumber: 3, + queue: "exhaustive-queue", + workerQueue: "exhaustive-worker-queue", + + // Relationships + // Note: scheduleId is not set to test empty string handling + batchId: batch.id, + rootTaskRunId: rootRun.id, + parentTaskRunId: parentRun.id, + depth: 2, + + // Timestamps + createdAt, + updatedAt, + startedAt, + executedAt, + completedAt, + delayUntil, + queuedAt, + expiredAt, + + // Payload and output + payload: JSON.stringify({ input: "test-payload" }), + payloadType: "application/json", + output: JSON.stringify({ result: "test-output" }), + outputType: "application/json", + error: { message: "test error", name: "TestError" }, + + // Tracing + traceId: "exhaustive-trace-id-12345", + spanId: "exhaustive-span-id-67890", + + // Versioning + taskVersion: "1.2.3", + sdkVersion: "3.0.0", + cliVersion: "2.5.1", + + // Execution settings + machinePreset: "large-1x", + idempotencyKey: "exhaustive-idempotency-key", + ttl: "1h", + isTest: true, + concurrencyKey: "exhaustive-concurrency-key", + maxDurationInSeconds: 3600, + + // Tags and bulk actions + runTags: ["tag1", "tag2", "exhaustive-tag"], + bulkActionGroupIds: ["bulk-group-1", "bulk-group-2"], + + // Usage metrics + usageDurationMs: 12345, + costInCents: 50, + baseCostInCents: 25, + }, + }); + + // Wait for replication + await setTimeout(1500); + + // Query ClickHouse directly to get all columns + const queryRuns = clickhouse.reader.query({ + name: "exhaustive-replication-test", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", + schema: z.any(), + params: z.object({ 
+
+      const [queryError, result] = await queryRuns({ run_id: taskRun.id });
+
+      expect(queryError).toBeNull();
+      expect(result).toHaveLength(1);
+
+      const clickhouseRun = result![0];
+
+      // Exhaustively verify each column
+      // Core identifiers
+      expect(clickhouseRun.run_id).toBe(taskRun.id);
+      expect(clickhouseRun.friendly_id).toBe("run_exhaustive_test");
+      expect(clickhouseRun.task_identifier).toBe("exhaustive-task");
+
+      // Environment/project/org
+      expect(clickhouseRun.environment_id).toBe(runtimeEnvironment.id);
+      expect(clickhouseRun.project_id).toBe(project.id);
+      expect(clickhouseRun.organization_id).toBe(organization.id);
+      expect(clickhouseRun.environment_type).toBe("PRODUCTION");
+
+      // Engine and execution
+      expect(clickhouseRun.engine).toBe("V2");
+      expect(clickhouseRun.status).toBe("COMPLETED_SUCCESSFULLY");
+      expect(clickhouseRun.attempt).toBe(3);
+      expect(clickhouseRun.queue).toBe("exhaustive-queue");
+      expect(clickhouseRun.worker_queue).toBe("exhaustive-worker-queue");
+
+      // Relationships
+      expect(clickhouseRun.schedule_id).toBe(""); // Empty when not set
+      expect(clickhouseRun.batch_id).toBe(batch.id);
+      expect(clickhouseRun.root_run_id).toBe(rootRun.id);
+      expect(clickhouseRun.parent_run_id).toBe(parentRun.id);
+      expect(clickhouseRun.depth).toBe(2);
+
+      // Timestamps (ClickHouse returns DateTime64 as strings in UTC without 'Z' suffix)
+      // Helper to parse ClickHouse timestamp strings to milliseconds
+      function parseClickhouseTimestamp(ts: string | null): number | null {
+        if (ts === null || ts === "1970-01-01 00:00:00.000") return null;
+        return new Date(ts + "Z").getTime();
+      }
+
+      expect(parseClickhouseTimestamp(clickhouseRun.created_at)).toBe(createdAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.updated_at)).toBe(updatedAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.started_at)).toBe(startedAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.executed_at)).toBe(executedAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.completed_at)).toBe(completedAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.delay_until)).toBe(delayUntil.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.queued_at)).toBe(queuedAt.getTime());
+      expect(parseClickhouseTimestamp(clickhouseRun.expired_at)).toBeNull();
+
+      // Output (parsed JSON)
+      expect(clickhouseRun.output).toEqual({ data: { result: "test-output" } });
+
+      // Error
+      expect(clickhouseRun.error).toEqual({
+        data: { message: "test error", name: "TestError" },
+      });
+
+      // Tracing
+      expect(clickhouseRun.trace_id).toBe("exhaustive-trace-id-12345");
+      expect(clickhouseRun.span_id).toBe("exhaustive-span-id-67890");
+
+      // Versioning
+      expect(clickhouseRun.task_version).toBe("1.2.3");
+      expect(clickhouseRun.sdk_version).toBe("3.0.0");
+      expect(clickhouseRun.cli_version).toBe("2.5.1");
+
+      // Execution settings
+      expect(clickhouseRun.machine_preset).toBe("large-1x");
+      expect(clickhouseRun.idempotency_key).toBe("exhaustive-idempotency-key");
+      expect(clickhouseRun.expiration_ttl).toBe("1h");
+      expect(clickhouseRun.is_test).toBe(1); // ClickHouse returns booleans as integers
+      expect(clickhouseRun.concurrency_key).toBe("exhaustive-concurrency-key");
+      expect(clickhouseRun.max_duration_in_seconds).toBe(3600);
+
+      // Tags and bulk actions
+      expect(clickhouseRun.tags).toEqual(["tag1", "tag2", "exhaustive-tag"]);
+      expect(clickhouseRun.bulk_action_group_ids).toEqual(["bulk-group-1", "bulk-group-2"]);
+
+      // Usage metrics
+      expect(clickhouseRun.usage_duration_ms).toBe(12345);
+      expect(clickhouseRun.cost_in_cents).toBe(50);
+      expect(clickhouseRun.base_cost_in_cents).toBe(25);
+
+      // Internal ClickHouse columns
+      expect(clickhouseRun._is_deleted).toBe(0);
+      expect(clickhouseRun._version).toBeDefined();
+      expect(typeof clickhouseRun._version).toBe("number"); // ClickHouse returns UInt64 as number
+
+      // Also verify the payload was inserted into the payloads table
+      const queryPayloads = clickhouse.reader.query({
+        name: "exhaustive-payload-test",
+        query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}",
+        schema: z.any(),
+        params: z.object({ run_id: z.string() }),
+      });
+
+      const [payloadError, payloadResult] = await queryPayloads({ run_id: taskRun.id });
+
+      expect(payloadError).toBeNull();
+      expect(payloadResult).toHaveLength(1);
+      expect(payloadResult![0].run_id).toBe(taskRun.id);
+      expect(parseClickhouseTimestamp(payloadResult![0].created_at)).toBe(createdAt.getTime());
+      expect(payloadResult![0].payload).toEqual({ data: { input: "test-payload" } });
+
       await runsReplicationService.stop();
     }
   );
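A note on the `FINAL` in the verification query above: `task_runs_v2` receives one row per replicated event for the same `run_id`, distinguished by `_version`, and the table (presumably a ReplacingMergeTree; the schema is not part of this diff) collapses those rows at merge time. `FINAL` forces that collapse at read time so the assertions see exactly one row. For intuition, the client-side equivalent of the collapse:

```ts
// Illustrative only: what FINAL effectively does for these assertions.
function latestVersionPerRun<T extends { run_id: string; _version: number }>(rows: T[]): T[] {
  const newest = new Map<string, T>();
  for (const row of rows) {
    const existing = newest.get(row.run_id);
    if (!existing || row._version >= existing._version) {
      newest.set(row.run_id, row);
    }
  }
  return [...newest.values()];
}
```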
diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts
index 0842665c14..5a4118934e 100644
--- a/internal-packages/clickhouse/src/client/client.ts
+++ b/internal-packages/clickhouse/src/client/client.ts
@@ -6,9 +6,11 @@ import {
   createClient,
   type ResultSet,
   type Row,
+  type BaseQueryParams,
+  type InsertResult,
 } from "@clickhouse/client";
 import { recordSpanError, Span, startSpan, trace, Tracer } from "@internal/tracing";
-import { flattenAttributes, tryCatch } from "@trigger.dev/core/v3";
+import { flattenAttributes, tryCatch, type Result } from "@trigger.dev/core/v3";
 import { z } from "zod";
 import { InsertError, QueryError } from "./errors.js";
 import type {
@@ -645,6 +647,76 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
     };
   }
 
+  public insertCompact<TRecord extends Record<string, unknown>>(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    toArray: (record: TRecord) => any[];
+    settings?: ClickHouseSettings;
+  }): ClickhouseInsertFunction<TRecord> {
+    return async (events, options) => {
+      const queryId = randomUUID();
+
+      return await startSpan(this.tracer, "insert", async (span) => {
+        const eventsArray = Array.isArray(events) ? events : [events];
+
+        this.logger.debug("Inserting into clickhouse (compact)", {
+          clientName: this.name,
+          name: req.name,
+          table: req.table,
+          events: eventsArray.length,
+          settings: req.settings,
+          attributes: options?.attributes,
+          options,
+          queryId,
+        });
+
+        span.setAttributes({
+          "clickhouse.clientName": this.name,
+          "clickhouse.tableName": req.table,
+          "clickhouse.operationName": req.name,
+          "clickhouse.queryId": queryId,
+          "clickhouse.format": "JSONCompactEachRowWithNames",
+          ...flattenAttributes(req.settings, "clickhouse.settings"),
+          ...flattenAttributes(options?.attributes),
+        });
+
+        // Build compact format: [columns, ...rows]
+        const compactData: any[] = [Array.from(req.columns)];
+        for (let i = 0; i < eventsArray.length; i++) {
+          compactData.push(req.toArray(eventsArray[i]));
+        }
+
+        const [clickhouseError, result] = await tryCatch(
+          this.client.insert({
+            table: req.table,
+            format: "JSONCompactEachRowWithNames",
+            values: compactData,
+            query_id: queryId,
+            ...options?.params,
+            clickhouse_settings: {
+              ...req.settings,
+              ...options?.params?.clickhouse_settings,
+            },
+          })
+        );
+
+        if (clickhouseError) {
+          this.logger.error("Error inserting into clickhouse", {
+            name: req.name,
+            error: clickhouseError,
+            table: req.table,
+          });
+
+          recordClickhouseError(span, clickhouseError);
+          return [new InsertError(clickhouseError.message), null];
+        }
+
+        return [null, result];
+      });
+    };
+  }
+
   public insertUnsafe<TRecord extends Record<string, unknown>>(req: {
     name: string;
     table: string;
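What `insertCompact` actually puts on the wire: `JSONCompactEachRowWithNames` sends one header row of column names followed by positional value rows, so names are transmitted once per batch rather than once per row as with `JSONEachRow`. For a three-column payload batch, the `values` array looks like this (illustrative data):

```ts
// Shape handed to client.insert(...) for a three-column table like raw_task_runs_payload_v1.
const values = [
  ["run_id", "created_at", "payload"],              // header row: column names
  ["run_1", 1714485244312, { data: { key: "a" } }], // row 1: positional values
  ["run_2", 1714485244313, { data: { key: "b" } }], // row 2
];
```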
@@ -654,11 +726,13 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
     const queryId = randomUUID();
 
     return await startSpan(this.tracer, "insert", async (span) => {
+      const eventsArray = Array.isArray(events) ? events : [events];
+
       this.logger.debug("Inserting into clickhouse", {
         clientName: this.name,
         name: req.name,
         table: req.table,
-        events: Array.isArray(events) ? events.length : 1,
+        events: eventsArray.length,
         settings: req.settings,
         attributes: options?.attributes,
         options,
         queryId,
       });
@@ -678,7 +752,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
         this.client.insert({
           table: req.table,
           format: "JSONEachRow",
-          values: Array.isArray(events) ? events : [events],
+          values: eventsArray,
           query_id: queryId,
           ...options?.params,
           clickhouse_settings: {
@@ -725,26 +799,119 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
       });
     };
   }
+
+  public insertCompactRaw(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    settings?: ClickHouseSettings;
+  }): (
+    events: readonly any[][] | any[],
+    options?: {
+      attributes?: Record<string, unknown>;
+      params?: BaseQueryParams;
+    }
+  ) => Promise<Result<InsertError, InsertResult>> {
+    return async (events, options) => {
+      const queryId = randomUUID();
+
+      return await startSpan(this.tracer, "insert", async (span) => {
+        // Check if events is a single row (array) or multiple rows (array of arrays)
+        // If first element is not an array, treat as single row
+        const isSingleRow = events.length > 0 && !Array.isArray(events[0]);
+        const eventsArray: readonly any[][] = isSingleRow
+          ? [events as any[]]
+          : (events as readonly any[][]);
+
+        this.logger.debug("Inserting into clickhouse (compact raw)", {
+          clientName: this.name,
+          name: req.name,
+          table: req.table,
+          events: eventsArray.length,
+          settings: req.settings,
+          attributes: options?.attributes,
+          options,
+          queryId,
+        });
+
+        span.setAttributes({
+          "clickhouse.clientName": this.name,
+          "clickhouse.tableName": req.table,
+          "clickhouse.operationName": req.name,
+          "clickhouse.queryId": queryId,
+          "clickhouse.format": "JSONCompactEachRowWithNames",
+          ...flattenAttributes(req.settings, "clickhouse.settings"),
+          ...flattenAttributes(options?.attributes),
+        });
+
+        // Build compact format: [columns, ...rows]
+        // Data is already in array format, no conversion needed
+        const compactData: any[] = [Array.from(req.columns), ...eventsArray];
+
+        const [clickhouseError, result] = await tryCatch(
+          this.client.insert({
+            table: req.table,
+            format: "JSONCompactEachRowWithNames",
+            values: compactData,
+            query_id: queryId,
+            ...options?.params,
+            clickhouse_settings: {
+              ...req.settings,
+              ...options?.params?.clickhouse_settings,
+            },
+          })
+        );
+
+        if (clickhouseError) {
+          this.logger.error("Error inserting into clickhouse", {
+            name: req.name,
+            error: clickhouseError,
+            table: req.table,
+          });
+
+          recordClickhouseError(span, clickhouseError);
+          return [new InsertError(clickhouseError.message), null];
+        }
+
+        this.logger.debug("Inserted into clickhouse", {
+          clientName: this.name,
+          name: req.name,
+          table: req.table,
+          result,
+          queryId,
+        });
+
+        span.setAttributes({
+          "clickhouse.query_id": result.query_id,
+          "clickhouse.executed": result.executed,
+          "clickhouse.summary.read_rows": result.summary?.read_rows,
+          "clickhouse.summary.read_bytes": result.summary?.read_bytes,
+          "clickhouse.summary.written_rows": result.summary?.written_rows,
+          "clickhouse.summary.written_bytes": result.summary?.written_bytes,
+          "clickhouse.summary.total_rows_to_read": result.summary?.total_rows_to_read,
+          "clickhouse.summary.result_rows": result.summary?.result_rows,
+          "clickhouse.summary.result_bytes": result.summary?.result_bytes,
+          "clickhouse.summary.elapsed_ns": result.summary?.elapsed_ns,
+        });
+
+        return [null, result];
+      });
+    };
+  }
 }
 
-function recordClickhouseError(span: Span, error: Error) {
+function recordClickhouseError(span: Span, error: Error): void {
   if (error instanceof ClickHouseError) {
     span.setAttributes({
       "clickhouse.error.code": error.code,
       "clickhouse.error.message": error.message,
       "clickhouse.error.type": error.type,
     });
-    recordSpanError(span, error);
-  } else {
-    recordSpanError(span, error);
   }
+  recordSpanError(span, error);
 }
 
-function convertLogLevelToClickhouseLogLevel(logLevel?: LogLevel) {
-  if (!logLevel) {
-    return ClickHouseLogLevel.INFO;
-  }
-
+function convertLogLevelToClickhouseLogLevel(logLevel?: LogLevel): ClickHouseLogLevel {
   switch (logLevel) {
     case "debug":
       return ClickHouseLogLevel.DEBUG;
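One sharp edge in the `isSingleRow` heuristic in `insertCompactRaw` above: a single row whose first element is itself an array (for example a leading `Array(String)` column) would be misread as a list of rows. The task-run and payload row shapes in this PR both begin with a `String` column, so current callers never hit it, but it is worth spelling out:

```ts
// Edge case of `!Array.isArray(events[0])`; insertRows stands in for the
// function returned by insertCompactRaw (hypothetical name for illustration).
declare function insertRows(rows: readonly unknown[][]): Promise<void>;

const ambiguous = [["tag1", "tag2"], "run_1", 123]; // one row whose first column is an array
// events[0] is an array here, so the heuristic treats this as three rows, not one.
// Pre-wrapping rows keeps the call unambiguous:
insertRows([ambiguous]); // explicitly a batch of exactly one row
```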
diff --git a/internal-packages/clickhouse/src/client/noop.ts b/internal-packages/clickhouse/src/client/noop.ts
index 3509297f9f..e0872cada6 100644
--- a/internal-packages/clickhouse/src/client/noop.ts
+++ b/internal-packages/clickhouse/src/client/noop.ts
@@ -159,4 +159,61 @@ export class NoopClient implements ClickhouseReader, ClickhouseWriter {
       ];
     };
   }
+
+  public insertCompact<TRecord extends Record<string, unknown>>(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    toArray: (record: TRecord) => any[];
+    settings?: ClickHouseSettings;
+  }): (events: TRecord | TRecord[]) => Promise<Result<InsertError, InsertResult>> {
+    return async (events: TRecord | TRecord[]) => {
+      return [
+        null,
+        {
+          executed: true,
+          query_id: "noop",
+          summary: {
+            read_rows: "0",
+            read_bytes: "0",
+            written_rows: "0",
+            written_bytes: "0",
+            total_rows_to_read: "0",
+            result_rows: "0",
+            result_bytes: "0",
+            elapsed_ns: "0",
+          },
+          response_headers: {},
+        },
+      ];
+    };
+  }
+
+  public insertCompactRaw(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    settings?: ClickHouseSettings;
+  }): (events: readonly any[][] | any[]) => Promise<Result<InsertError, InsertResult>> {
+    return async (events: readonly any[][] | any[]) => {
+      return [
+        null,
+        {
+          executed: true,
+          query_id: "noop",
+          summary: {
+            read_rows: "0",
+            read_bytes: "0",
+            written_rows: "0",
+            written_bytes: "0",
+            total_rows_to_read: "0",
+            result_rows: "0",
+            result_bytes: "0",
+            elapsed_ns: "0",
+          },
+          response_headers: {},
+        },
+      ];
+    };
+  }
 }
diff --git a/internal-packages/clickhouse/src/client/types.ts b/internal-packages/clickhouse/src/client/types.ts
index 25cd2efde0..7120422508 100644
--- a/internal-packages/clickhouse/src/client/types.ts
+++ b/internal-packages/clickhouse/src/client/types.ts
@@ -220,5 +220,26 @@ export interface ClickhouseWriter {
     settings?: ClickHouseSettings;
   }): ClickhouseInsertFunction<TRecord>;
 
+  insertCompact<TRecord extends Record<string, unknown>>(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    toArray: (record: TRecord) => any[];
+    settings?: ClickHouseSettings;
+  }): ClickhouseInsertFunction<TRecord>;
+
+  insertCompactRaw(req: {
+    name: string;
+    table: string;
+    columns: readonly string[];
+    settings?: ClickHouseSettings;
+  }): (
+    events: readonly any[][] | any[],
+    options?: {
+      attributes?: Record<string, unknown>;
+      params?: BaseQueryParams;
+    }
+  ) => Promise<Result<InsertError, InsertResult>>;
+
   close(): Promise<void>;
 }
diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts
index 03b8b81e13..a11fc2edc1 100644
--- a/internal-packages/clickhouse/src/index.ts
+++ b/internal-packages/clickhouse/src/index.ts
@@ -3,8 +3,8 @@ import { ClickhouseClient } from "./client/client.js";
 import { ClickhouseReader, ClickhouseWriter } from "./client/types.js";
 import { NoopClient } from "./client/noop.js";
 import {
-  insertTaskRuns,
-  insertRawTaskRunPayloads,
+  insertTaskRunsCompactArrays,
+  insertRawTaskRunPayloadsCompactArrays,
   getTaskRunsQueryBuilder,
   getTaskActivityQueryBuilder,
   getCurrentRunningStats,
@@ -31,6 +31,16 @@ export type * from "./taskRuns.js";
 export type * from "./taskEvents.js";
 export type * from "./client/queryBuilder.js";
 
+// Re-export column constants, indices, and type-safe accessors
+export {
+  TASK_RUN_COLUMNS,
+  TASK_RUN_INDEX,
+  PAYLOAD_COLUMNS,
+  PAYLOAD_INDEX,
+  getTaskRunField,
+  getPayloadField,
+} from "./taskRuns.js";
+
 // TSQL query execution
 export {
   executeTSQL,
@@ -168,8 +178,8 @@ export class ClickHouse {
   get taskRuns() {
     return {
-      insert: insertTaskRuns(this.writer),
-      insertPayloads: insertRawTaskRunPayloads(this.writer),
+      insertCompactArrays: insertTaskRunsCompactArrays(this.writer),
+      insertPayloadsCompactArrays: insertRawTaskRunPayloadsCompactArrays(this.writer),
       queryBuilder: getTaskRunsQueryBuilder(this.reader),
       countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader),
       tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader),
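With `index.ts` rewired, callers go through `taskRuns.insertCompactArrays` / `insertPayloadsCompactArrays` and read positional rows back with the exported accessors. A usage sketch — connection values are placeholders, and the tuple literal assumes the three-column payload shape shown in this diff:

```ts
import { ClickHouse, getPayloadField, type PayloadInsertArray } from "@internal/clickhouse";

async function insertExamplePayload() {
  const clickhouse = new ClickHouse({
    url: "http://localhost:8123", // placeholder connection string
    name: "compact-arrays-example",
    logLevel: "warn",
  });

  // Positional row in PAYLOAD_COLUMNS order: run_id, created_at, payload
  const row: PayloadInsertArray = ["run_1", Date.now(), { data: { hello: "world" } }];

  const [error, result] = await clickhouse.taskRuns.insertPayloadsCompactArrays([row]);
  if (error) throw error;

  console.log("written rows:", result.summary?.written_rows);
  console.log("for run:", getPayloadField(row, "run_id"));
}
```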
ClickhouseClient } from "./client/client.js"; -import { getTaskRunsQueryBuilder, insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js"; +import { + getTaskRunsQueryBuilder, + insertRawTaskRunPayloadsCompactArrays, + insertTaskRunsCompactArrays, + type TaskRunInsertArray, + type PayloadInsertArray, +} from "./taskRuns.js"; describe("Task Runs V2", () => { clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => { @@ -11,61 +17,64 @@ describe("Task Runs V2", () => { logLevel: "debug", }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const insertPayloads = insertRawTaskRunPayloads(client, { + const insertPayloads = insertRawTaskRunPayloadsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "env_1234", - environment_type: "DEVELOPMENT", - organization_id: "org_1234", - project_id: "project_1234", - run_id: "run_1234", - friendly_id: "friendly_1234", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "my-task", - queue: "my-queue", - schedule_id: "schedule_1234", - batch_id: "batch_1234", - created_at: Date.now(), - updated_at: Date.now(), - completed_at: undefined, - tags: ["tag1", "tag2"], - output: { - key: "value", - }, - error: { - type: "BUILT_IN_ERROR", - name: "Error", - message: "error", - stackTrace: "stack trace", - }, - usage_duration_ms: 1000, - cost_in_cents: 100, - task_version: "1.0.0", - sdk_version: "1.0.0", - cli_version: "1.0.0", - machine_preset: "small-1x", - is_test: true, - span_id: "span_1234", - trace_id: "trace_1234", - idempotency_key: "idempotency_key_1234", - expiration_ttl: "1h", - root_run_id: "root_run_1234", - parent_run_id: "parent_run_1234", - depth: 1, - concurrency_key: "concurrency_key_1234", - bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], - _version: "1", - }, - ]); + const now = Date.now(); + const taskRunData: TaskRunInsertArray = [ + "env_1234", // environment_id + "org_1234", // organization_id + "project_1234", // project_id + "run_1234", // run_id + now, // updated_at + now, // created_at + "PENDING", // status + "DEVELOPMENT", // environment_type + "friendly_1234", // friendly_id + 1, // attempt + "V2", // engine + "my-task", // task_identifier + "my-queue", // queue + "schedule_1234", // schedule_id + "batch_1234", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + null, // queued_at + null, // expired_at + 1000, // usage_duration_ms + 100, // cost_in_cents + 0, // base_cost_in_cents + { data: { key: "value" } }, // output + { data: { type: "BUILT_IN_ERROR", name: "Error", message: "error", stackTrace: "stack trace" } }, // error + ["tag1", "tag2"], // tags + "1.0.0", // task_version + "1.0.0", // sdk_version + "1.0.0", // cli_version + "small-1x", // machine_preset + "root_run_1234", // root_run_id + "parent_run_1234", // parent_run_id + 1, // depth + "span_1234", // span_id + "trace_1234", // trace_id + "idempotency_key_1234", // idempotency_key + "1h", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "concurrency_key_1234", // concurrency_key + ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const [insertError, insertResult] = await 
insert([taskRunData]); expect(insertError).toBeNull(); expect(insertResult).toEqual(expect.objectContaining({ executed: true })); @@ -99,15 +108,13 @@ describe("Task Runs V2", () => { ]) ); - const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([ - { - run_id: "run_1234", - created_at: Date.now(), - payload: { - key: "value", - }, - }, - ]); + const payloadData: PayloadInsertArray = [ + "run_1234", // run_id + Date.now(), // created_at + { data: { key: "value" } }, // payload + ]; + + const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([payloadData]); expect(insertPayloadsError).toBeNull(); expect(insertPayloadsResult).toEqual(expect.objectContaining({ executed: true })); @@ -137,96 +144,110 @@ describe("Task Runs V2", () => { url: clickhouseContainer.getConnectionUrl(), }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 16:34:04.312").getTime(), - started_at: null, - executed_at: null, - completed_at: null, - delay_until: null, - queued_at: new Date("2025-04-30 16:34:04.311").getTime(), - expired_at: null, - expiration_ttl: "", - usage_duration_ms: 0, - cost_in_cents: 0, - base_cost_in_cents: 0, - output: null, - error: null, - tags: [], - task_version: "", - sdk_version: "", - cli_version: "", - machine_preset: "", - is_test: true, - _version: "1", - }, - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "COMPLETED_SUCCESSFULLY", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 16:34:04.312").getTime(), - started_at: null, - executed_at: null, - completed_at: null, - delay_until: null, - queued_at: new Date("2025-04-30 16:34:04.311").getTime(), - expired_at: null, - expiration_ttl: "", - usage_duration_ms: 0, - cost_in_cents: 0, - base_cost_in_cents: 0, - output: null, - error: null, - tags: [], - task_version: "", - sdk_version: "", - cli_version: "", - machine_preset: "", - is_test: true, - _version: "2", - }, - ]); + const createdAt = new Date("2025-04-30 16:34:04.312").getTime(); + const queuedAt = new Date("2025-04-30 16:34:04.311").getTime(); + + const run1: TaskRunInsertArray = [ + "cm9kddfcs01zqdy88ld9mmrli", // environment_id + "cm8zs78wb0002dy616dg75tv3", // organization_id + 
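+      // Positional tuple: every index below must line up with TASK_RUN_COLUMNS
+      // (see taskRuns.ts), which is why each element carries a column comment.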
"cm9kddfbz01zpdy88t9dstecu", // project_id + "cma45oli70002qrdy47w0j4n7", // run_id + createdAt, // updated_at + createdAt, // created_at + "PENDING", // status + "PRODUCTION", // environment_type + "run_cma45oli70002qrdy47w0j4n7", // friendly_id + 1, // attempt + "V2", // engine + "retry-task", // task_identifier + "task/retry-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + queuedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "538677637f937f54", // span_id + "20a28486b0b9f50c647b35e8863e36a5", // trace_id + "", // idempotency_key + "", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const run2: TaskRunInsertArray = [ + "cm9kddfcs01zqdy88ld9mmrli", // environment_id + "cm8zs78wb0002dy616dg75tv3", // organization_id + "cm9kddfbz01zpdy88t9dstecu", // project_id + "cma45oli70002qrdy47w0j4n7", // run_id + createdAt, // updated_at + createdAt, // created_at + "COMPLETED_SUCCESSFULLY", // status + "PRODUCTION", // environment_type + "run_cma45oli70002qrdy47w0j4n7", // friendly_id + 1, // attempt + "V2", // engine + "retry-task", // task_identifier + "task/retry-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + queuedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "538677637f937f54", // span_id + "20a28486b0b9f50c647b35e8863e36a5", // trace_id + "", // idempotency_key + "", // expiration_ttl + true, // is_test + "2", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const [insertError, insertResult] = await insert([run1, run2]); expect(insertError).toBeNull(); expect(insertResult).toEqual(expect.objectContaining({ executed: true })); @@ -266,54 +287,62 @@ describe("Task Runs V2", () => { url: clickhouseContainer.getConnectionUrl(), }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 
16:34:04.312").getTime(),
-        started_at: null,
-        executed_at: null,
-        completed_at: null,
-        delay_until: null,
-        queued_at: new Date("2025-04-30 16:34:04.311").getTime(),
-        expired_at: null,
-        expiration_ttl: "",
-        usage_duration_ms: 0,
-        cost_in_cents: 0,
-        base_cost_in_cents: 0,
-        output: null,
-        error: null,
-        tags: [],
-        task_version: "",
-        sdk_version: "",
-        cli_version: "",
-        machine_preset: "",
-        is_test: true,
-        _version: "1",
-      },
-    ]);
+    const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
+    const queuedAt = new Date("2025-04-30 16:34:04.311").getTime();
+
+    const taskRun: TaskRunInsertArray = [
+      "cm9kddfcs01zqdy88ld9mmrli", // environment_id
+      "cm8zs78wb0002dy616dg75tv3", // organization_id
+      "cm9kddfbz01zpdy88t9dstecu", // project_id
+      "cma45oli70002qrdy47w0j4n7", // run_id
+      createdAt, // updated_at
+      createdAt, // created_at
+      "PENDING", // status
+      "PRODUCTION", // environment_type
+      "run_cma45oli70002qrdy47w0j4n7", // friendly_id
+      1, // attempt
+      "V2", // engine
+      "retry-task", // task_identifier
+      "task/retry-task", // queue
+      "", // schedule_id
+      "", // batch_id
+      null, // completed_at
+      null, // started_at
+      null, // executed_at
+      null, // delay_until
+      queuedAt, // queued_at
+      null, // expired_at
+      0, // usage_duration_ms
+      0, // cost_in_cents
+      0, // base_cost_in_cents
+      { data: null }, // output
+      { data: null }, // error
+      [], // tags
+      "", // task_version
+      "", // sdk_version
+      "", // cli_version
+      "", // machine_preset
+      "", // root_run_id
+      "", // parent_run_id
+      0, // depth
+      "538677637f937f54", // span_id
+      "20a28486b0b9f50c647b35e8863e36a5", // trace_id
+      "", // idempotency_key
+      "", // expiration_ttl
+      true, // is_test
+      "1", // _version
+      0, // _is_deleted
+      "", // concurrency_key
+      [], // bulk_action_group_ids
+      "", // worker_queue
+      null, // max_duration_in_seconds
+    ];
+
+    const [insertError, insertResult] = await insert([taskRun]);
 
     const queryBuilder = getTaskRunsQueryBuilder(client)();
 
     queryBuilder.where("environment_id = {environmentId: String}", {
       environmentId:
@@ -360,15 +389,15 @@ describe("Task Runs V2", () => {
       url: clickhouseContainer.getConnectionUrl(),
     });
 
-    const insertPayloads = insertRawTaskRunPayloads(client, {
+    const insertPayloads = insertRawTaskRunPayloadsCompactArrays(client, {
       async_insert: 0, // turn off async insert for this test
     });
 
-    const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([
+    const payloadData: PayloadInsertArray = [
+      "run_1234", // run_id
+      Date.now(), // created_at
       {
-        run_id: "run_1234",
-        created_at: Date.now(),
-        payload: {
+        data: {
           data: {
             title: {
               id: "123",
@@ -376,8 +405,10 @@ describe("Task Runs V2", () => {
             "title.id": 123,
           },
         },
-      },
-    ]);
+      }, // payload
+    ];
+
+    const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([payloadData]);
 
     expect(insertPayloadsError).toBeNull();
     expect(insertPayloadsResult).toEqual(expect.objectContaining({ executed: true }));
diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts
index d57a8b2a3e..1e8df3b28d 100644
--- a/internal-packages/clickhouse/src/taskRuns.ts
+++ b/internal-packages/clickhouse/src/taskRuns.ts
@@ -52,6 +52,144 @@ export const TaskRunV2 = z.object({
 
 export type TaskRunV2 = z.input<typeof TaskRunV2>;
 
+// Column order for compact format - must match ClickHouse table schema
+export const TASK_RUN_COLUMNS = [
+  "environment_id",
+  "organization_id",
+  "project_id",
+  "run_id",
+  "updated_at",
+  "created_at",
+  "status",
+  "environment_type",
+  "friendly_id",
+  "attempt",
+  "engine",
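+  // NOTE: this order is load-bearing; TASK_RUN_INDEX is generated from it and
+  // the TaskRunInsertArray tuple must match it element for element.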
"task_identifier", + "queue", + "schedule_id", + "batch_id", + "completed_at", + "started_at", + "executed_at", + "delay_until", + "queued_at", + "expired_at", + "usage_duration_ms", + "cost_in_cents", + "base_cost_in_cents", + "output", + "error", + "tags", + "task_version", + "sdk_version", + "cli_version", + "machine_preset", + "root_run_id", + "parent_run_id", + "depth", + "span_id", + "trace_id", + "idempotency_key", + "expiration_ttl", + "is_test", + "_version", + "_is_deleted", + "concurrency_key", + "bulk_action_group_ids", + "worker_queue", + "max_duration_in_seconds", +] as const; + +export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number]; + +// Type-safe column indices generated from TASK_RUN_COLUMNS +// This ensures indices stay in sync with column order automatically +export const TASK_RUN_INDEX = Object.fromEntries( + TASK_RUN_COLUMNS.map((col, idx) => [col, idx]) +) as { readonly [K in TaskRunColumnName]: number }; + +/** + * Type mapping from column name to its type in TaskRunInsertArray. + * This enables type-safe field access without manual casting. + */ +export type TaskRunFieldTypes = { + environment_id: string; + organization_id: string; + project_id: string; + run_id: string; + updated_at: number; + created_at: number; + status: string; + environment_type: string; + friendly_id: string; + attempt: number; + engine: string; + task_identifier: string; + queue: string; + schedule_id: string; + batch_id: string; + completed_at: number | null; + started_at: number | null; + executed_at: number | null; + delay_until: number | null; + queued_at: number | null; + expired_at: number | null; + usage_duration_ms: number; + cost_in_cents: number; + base_cost_in_cents: number; + output: { data: unknown }; + error: { data: unknown }; + tags: string[]; + task_version: string; + sdk_version: string; + cli_version: string; + machine_preset: string; + root_run_id: string; + parent_run_id: string; + depth: number; + span_id: string; + trace_id: string; + idempotency_key: string; + expiration_ttl: string; + is_test: boolean; + _version: string; + _is_deleted: number; + concurrency_key: string; + bulk_action_group_ids: string[]; + worker_queue: string; + max_duration_in_seconds: number | null; +}; + +/** + * Type-safe accessor for TaskRunInsertArray fields. + * Returns the correct type for each field without manual casting. 
+ *
+ * @example
+ * const orgId = getTaskRunField(run, "organization_id"); // type: string
+ * const createdAt = getTaskRunField(run, "created_at"); // type: number
+ */
+export function getTaskRunField<K extends TaskRunColumnName>(
+  run: TaskRunInsertArray,
+  field: K
+): TaskRunFieldTypes[K] {
+  return run[TASK_RUN_INDEX[field]] as TaskRunFieldTypes[K];
+}
+
+export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
+  return ch.insertCompactRaw({
+    name: "insertTaskRunsCompactArrays",
+    table: "trigger_dev.task_runs_v2",
+    columns: TASK_RUN_COLUMNS,
+    settings: {
+      enable_json_type: 1,
+      type_json_skip_duplicated_paths: 1,
+      ...settings,
+    },
+  });
+}
+
+// Object-based insert function for tests and non-performance-critical code
 export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
   return ch.insert({
     name: "insertTaskRuns",
@@ -73,6 +211,114 @@ export const RawTaskRunPayloadV1 = z.object({
 
 export type RawTaskRunPayloadV1 = z.infer<typeof RawTaskRunPayloadV1>;
 
+export const PAYLOAD_COLUMNS = ["run_id", "created_at", "payload"] as const;
+
+export type PayloadColumnName = (typeof PAYLOAD_COLUMNS)[number];
+
+// Type-safe column indices generated from PAYLOAD_COLUMNS
+export const PAYLOAD_INDEX = Object.fromEntries(PAYLOAD_COLUMNS.map((col, idx) => [col, idx])) as {
+  readonly [K in PayloadColumnName]: number;
+};
+
+/**
+ * Type mapping from column name to its type in PayloadInsertArray.
+ */
+export type PayloadFieldTypes = {
+  run_id: string;
+  created_at: number;
+  payload: { data: unknown };
+};
+
+/**
+ * Type-safe accessor for PayloadInsertArray fields.
+ * Returns the correct type for each field without manual casting.
+ */
+export function getPayloadField<K extends PayloadColumnName>(
+  payload: PayloadInsertArray,
+  field: K
+): PayloadFieldTypes[K] {
+  return payload[PAYLOAD_INDEX[field]] as PayloadFieldTypes[K];
+}
+
+/**
+ * Type-safe tuple representing a task run insert array.
+ * Order matches TASK_RUN_COLUMNS exactly.
+ */
+export type TaskRunInsertArray = [
+  environment_id: string,
+  organization_id: string,
+  project_id: string,
+  run_id: string,
+  updated_at: number,
+  created_at: number,
+  status: string,
+  environment_type: string,
+  friendly_id: string,
+  attempt: number,
+  engine: string,
+  task_identifier: string,
+  queue: string,
+  schedule_id: string,
+  batch_id: string,
+  completed_at: number | null,
+  started_at: number | null,
+  executed_at: number | null,
+  delay_until: number | null,
+  queued_at: number | null,
+  expired_at: number | null,
+  usage_duration_ms: number,
+  cost_in_cents: number,
+  base_cost_in_cents: number,
+  output: { data: unknown },
+  error: { data: unknown },
+  tags: string[],
+  task_version: string,
+  sdk_version: string,
+  cli_version: string,
+  machine_preset: string,
+  root_run_id: string,
+  parent_run_id: string,
+  depth: number,
+  span_id: string,
+  trace_id: string,
+  idempotency_key: string,
+  expiration_ttl: string,
+  is_test: boolean,
+  _version: string,
+  _is_deleted: number,
+  concurrency_key: string,
+  bulk_action_group_ids: string[],
+  worker_queue: string,
+  max_duration_in_seconds: number | null,
+];
+
+/**
+ * Type-safe tuple representing a payload insert array.
+ * Order matches PAYLOAD_COLUMNS exactly.
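+ *
+ * @example
+ * // illustrative values only
+ * const row: PayloadInsertArray = ["run_abc", Date.now(), { data: { foo: "bar" } }];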
+ */
+export type PayloadInsertArray = [run_id: string, created_at: number, payload: { data: unknown }];
+
+export function insertRawTaskRunPayloadsCompactArrays(
+  ch: ClickhouseWriter,
+  settings?: ClickHouseSettings
+) {
+  return ch.insertCompactRaw({
+    name: "insertRawTaskRunPayloadsCompactArrays",
+    table: "trigger_dev.raw_task_runs_payload_v1",
+    columns: PAYLOAD_COLUMNS,
+    settings: {
+      async_insert: 1,
+      wait_for_async_insert: 0,
+      async_insert_max_data_size: "1000000",
+      async_insert_busy_timeout_ms: 1000,
+      enable_json_type: 1,
+      type_json_skip_duplicated_paths: 1,
+      ...settings,
+    },
+  });
+}
+
+// Object-based insert function for tests and non-performance-critical code
 export function insertRawTaskRunPayloads(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
   return ch.insert({
     name: "insertRawTaskRunPayloads",
diff --git a/internal-packages/replication/src/pgoutput.ts b/internal-packages/replication/src/pgoutput.ts
index 0e75a697f4..809ad87758 100644
--- a/internal-packages/replication/src/pgoutput.ts
+++ b/internal-packages/replication/src/pgoutput.ts
@@ -20,6 +20,18 @@ export type PgoutputMessage =
   | MessageType
   | MessageUpdate;
 
+export type PgoutputMessageArray =
+  | MessageBegin
+  | MessageCommit
+  | MessageDeleteArray
+  | MessageInsertArray
+  | MessageMessage
+  | MessageOrigin
+  | MessageRelation
+  | MessageTruncate
+  | MessageType
+  | MessageUpdateArray;
+
 export interface MessageBegin {
   tag: "begin";
   commitLsn: string | null;
@@ -95,6 +107,26 @@ export interface MessageUpdate {
   new: Record<string, any>;
 }
 
+// Array variants for zero-copy performance
+export interface MessageInsertArray {
+  tag: "insert";
+  relation: MessageRelation;
+  new: any[];
+}
+export interface MessageUpdateArray {
+  tag: "update";
+  relation: MessageRelation;
+  key: any[] | null;
+  old: any[] | null;
+  new: any[];
+}
+export interface MessageDeleteArray {
+  tag: "delete";
+  relation: MessageRelation;
+  key: any[] | null;
+  old: any[] | null;
+}
+
 class BinaryReader {
   private offset = 0;
   constructor(private buf: Buffer) {}
@@ -193,6 +225,35 @@ export class PgoutputParser {
     }
   }
 
+  public parseArray(buf: Buffer): PgoutputMessageArray {
+    const reader = new BinaryReader(buf);
+    const tag = reader.readUint8();
+    switch (tag) {
+      case 0x42:
+        return this.msgBegin(reader);
+      case 0x4f:
+        return this.msgOrigin(reader);
+      case 0x59:
+        return this.msgType(reader);
+      case 0x52:
+        return this.msgRelation(reader);
+      case 0x49:
+        return this.msgInsertArray(reader);
+      case 0x55:
+        return this.msgUpdateArray(reader);
+      case 0x44:
+        return this.msgDeleteArray(reader);
+      case 0x54:
+        return this.msgTruncate(reader);
+      case 0x4d:
+        return this.msgMessage(reader);
+      case 0x43:
+        return this.msgCommit(reader);
+      default:
+        throw Error("unknown pgoutput message");
+    }
+  }
+
   private msgBegin(reader: BinaryReader): MessageBegin {
     return {
       tag: "begin",
@@ -312,6 +373,55 @@ export class PgoutputParser {
     }
     return { tag: "delete", relation, key, old };
   }
+
+  // Array variants - skip object creation for performance
+  private msgInsertArray(reader: BinaryReader): MessageInsertArray {
+    const relation = this._relationCache.get(reader.readInt32());
+    if (!relation) throw Error("missing relation");
+    reader.readUint8(); // consume the 'N' key
+    return {
+      tag: "insert",
+      relation,
+      new: this.readTupleAsArray(reader, relation),
+    };
+  }
+  private msgUpdateArray(reader: BinaryReader): MessageUpdateArray {
+    const relation = this._relationCache.get(reader.readInt32());
+    if (!relation) throw Error("missing relation");
+    let key: any[] | null = null;
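+    // pgoutput updates optionally carry a key tuple ('K') or the full old row
+    // ('O') ahead of the new row ('N'); the readUint8 below dispatches on that.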
+    let old: any[] | null = null;
+    let new_: any[] | null = null;
+    const subMsgKey = reader.readUint8();
+    if (subMsgKey === 0x4b) {
+      key = this.readTupleAsArray(reader, relation);
+      reader.readUint8();
+      new_ = this.readTupleAsArray(reader, relation);
+    } else if (subMsgKey === 0x4f) {
+      old = this.readTupleAsArray(reader, relation);
+      reader.readUint8();
+      new_ = this.readTupleAsArray(reader, relation, old);
+    } else if (subMsgKey === 0x4e) {
+      new_ = this.readTupleAsArray(reader, relation);
+    } else {
+      throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`);
+    }
+    return { tag: "update", relation, key, old, new: new_ };
+  }
+  private msgDeleteArray(reader: BinaryReader): MessageDeleteArray {
+    const relation = this._relationCache.get(reader.readInt32());
+    if (!relation) throw Error("missing relation");
+    let key: any[] | null = null;
+    let old: any[] | null = null;
+    const subMsgKey = reader.readUint8();
+    if (subMsgKey === 0x4b) {
+      key = this.readTupleAsArray(reader, relation);
+    } else if (subMsgKey === 0x4f) {
+      old = this.readTupleAsArray(reader, relation);
+    } else {
+      throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`);
+    }
+    return { tag: "delete", relation, key, old };
+  }
   private readKeyTuple(reader: BinaryReader, relation: MessageRelation): Record<string, any> {
     const tuple = this.readTuple(reader, relation);
     const key = Object.create(null);
@@ -354,6 +464,40 @@ export class PgoutputParser {
     }
     return tuple;
   }
+
+  private readTupleAsArray(
+    reader: BinaryReader,
+    { columns }: MessageRelation,
+    unchangedToastFallback?: any[] | null
+  ): any[] {
+    const nfields = reader.readInt16();
+    const tuple = new Array(nfields);
+    for (let i = 0; i < nfields; i++) {
+      const { parser } = columns[i];
+      const kind = reader.readUint8();
+      switch (kind) {
+        case 0x62: // 'b' binary
+          const bsize = reader.readInt32();
+          tuple[i] = reader.read(bsize);
+          break;
+        case 0x74: // 't' text
+          const valsize = reader.readInt32();
+          const valbuf = reader.read(valsize);
+          const valtext = reader.decodeText(valbuf);
+          tuple[i] = parser(valtext);
+          break;
+        case 0x6e: // 'n' null
+          tuple[i] = null;
+          break;
+        case 0x75: // 'u' unchanged toast datum
+          tuple[i] = unchangedToastFallback?.[i];
+          break;
+        default:
+          throw Error(`unknown attribute kind ${String.fromCharCode(kind)}`);
+      }
+    }
+    return tuple;
+  }
   private msgTruncate(reader: BinaryReader): MessageTruncate {
     const nrels = reader.readInt32();
     const flags = reader.readUint8();
diff --git a/packages/core/src/v3/imports/superjson-cjs.cts b/packages/core/src/v3/imports/superjson-cjs.cts
new file mode 100644
index 0000000000..a7f1466e7c
--- /dev/null
+++ b/packages/core/src/v3/imports/superjson-cjs.cts
@@ -0,0 +1,15 @@
+// @ts-ignore
+const { default: superjson } = require("superjson");
+
+// @ts-ignore
+superjson.registerCustom(
+  {
+    isApplicable: (v: unknown): v is Buffer => typeof Buffer === "function" && Buffer.isBuffer(v),
+    serialize: (v: Buffer) => [...v],
+    deserialize: (v: number[]) => Buffer.from(v),
+  },
+  "buffer"
+);
+
+// @ts-ignore
+module.exports.default = superjson;
diff --git a/packages/core/src/v3/imports/superjson.ts b/packages/core/src/v3/imports/superjson.ts
new file mode 100644
index 0000000000..aa29250523
--- /dev/null
+++ b/packages/core/src/v3/imports/superjson.ts
@@ -0,0 +1,14 @@
+// @ts-ignore
+import superjson from "superjson";
+
+superjson.registerCustom(
+  {
+    isApplicable: (v): v is Buffer => typeof Buffer === "function" && Buffer.isBuffer(v),
+    serialize: (v) => [...v],
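+    // Buffers round-trip through plain number arrays, e.g. (illustrative)
+    // Buffer.from("hi") -> [104, 105] -> Buffer.from([104, 105]).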
+    deserialize: (v) => Buffer.from(v),
+  },
+  "buffer"
+);
+
+// @ts-ignore
+export default superjson;
diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts
index 9bacc41422..ed4d2a0895 100644
--- a/packages/core/src/v3/utils/ioSerialization.ts
+++ b/packages/core/src/v3/utils/ioSerialization.ts
@@ -13,6 +13,7 @@ import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
 import { TriggerTracer } from "../tracer.js";
 import { zodfetch } from "../zodfetch.js";
 import { flattenAttributes } from "./flattenAttributes.js";
+import superjson from "../imports/superjson.js";
 
 export type IOPacket = {
   data?: string | undefined;
@@ -32,9 +33,7 @@ export async function parsePacket(value: IOPacket, options?: ParsePacketOptions)
     case "application/json":
       return JSON.parse(value.data, makeSafeReviver(options));
     case "application/super+json":
-      const { parse } = await loadSuperJSON();
-
-      return parse(value.data);
+      return superjson.parse(value.data);
     case "text/plain":
       return value.data;
     case "application/store":
@@ -58,11 +57,9 @@ export async function parsePacketAsJson(
     case "application/json":
       return JSON.parse(value.data, makeSafeReviver(options));
     case "application/super+json":
-      const { parse, serialize } = await loadSuperJSON();
-
-      const superJsonResult = parse(value.data);
+      const superJsonResult = superjson.parse(value.data);
 
-      const { json } = serialize(superJsonResult);
+      const { json } = superjson.serialize(superJsonResult);
 
       return json;
     case "text/plain":
@@ -95,8 +92,7 @@ export async function stringifyIO(value: any): Promise<IOPacket> {
   }
 
   try {
-    const { stringify } = await loadSuperJSON();
-    const data = stringify(value);
+    const data = superjson.stringify(value);
 
     return { data, dataType: "application/super+json" };
   } catch {
@@ -302,14 +298,12 @@ export async function createPacketAttributes(
         [dataTypeKey]: packet.dataType,
       };
     case "application/super+json":
-      const { parse } = await loadSuperJSON();
-
       if (typeof packet.data === "undefined" || packet.data === null) {
         return;
       }
 
       try {
-        const parsed = parse(packet.data) as any;
+        const parsed = superjson.parse(packet.data) as any;
 
         const jsonified = JSON.parse(JSON.stringify(parsed, makeSafeReplacer()));
 
         const result = {
@@ -358,9 +352,7 @@ export async function createPacketAttributesAsJson(
       );
     }
     case "application/super+json": {
-      const { deserialize } = await loadSuperJSON();
-
-      const deserialized = deserialize(data) as any;
+      const deserialized = superjson.deserialize(data) as any;
 
       const jsonify = safeJsonParse(JSON.stringify(deserialized, makeSafeReplacer()));
 
       return imposeAttributeLimits(
@@ -390,18 +382,16 @@ export async function prettyPrintPacket(
     rawData = safeJsonParse(rawData);
   }
 
-  const { deserialize } = await loadSuperJSON();
-
   const hasCircularReferences = rawData && rawData.meta && hasCircularReference(rawData.meta);
 
   if (hasCircularReferences) {
-    return await prettyPrintPacket(deserialize(rawData), "application/json", {
+    return await prettyPrintPacket(superjson.deserialize(rawData), "application/json", {
       ...options,
       cloneReferences: false,
     });
   }
 
-  return await prettyPrintPacket(deserialize(rawData), "application/json", {
+  return await prettyPrintPacket(superjson.deserialize(rawData), "application/json", {
     ...options,
     cloneReferences: true,
   });
@@ -512,21 +502,6 @@ function getPacketExtension(outputType: string): string {
   }
 }
 
-async function loadSuperJSON() {
-  const superjson = await import("superjson");
-
-  superjson.registerCustom(
-    {
-      isApplicable: (v): v is
Buffer => typeof Buffer === "function" && Buffer.isBuffer(v), - serialize: (v) => [...v], - deserialize: (v) => Buffer.from(v), - }, - "buffer" - ); - - return superjson; -} - function safeJsonParse(value: string): any { try { return JSON.parse(value); @@ -554,7 +529,6 @@ function safeJsonParse(value: string): any { * @throws {Error} If the newPayload is not valid JSON */ export async function replaceSuperJsonPayload(original: string, newPayload: string) { - const superjson = await loadSuperJSON(); const originalObject = superjson.parse(original); const newPayloadObject = JSON.parse(newPayload); const { meta } = superjson.serialize(originalObject); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 185ae798c3..57989c3f7f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5594,9 +5594,6 @@ packages: resolution: {integrity: sha512-R8gLRTZeyp03ymzP/6Lil/28tGeGEzhx1q2k703KGWRAI1VdvPIXdG70VJc2pAMw3NA6JKL5hhFu1sJX0Mnn/A==} engines: {node: '>=6.0.0'} - '@jridgewell/source-map@0.3.11': - resolution: {integrity: sha512-ZMp1V8ZFcPG5dIWnQLr3NSI1MiCU7UETdS/A0G8V/XWHvJv3ZsFqutJn1Y5RPmAPX6F3BiE397OqveU/9NCuIA==} - '@jridgewell/source-map@0.3.3': resolution: {integrity: sha512-b+fsZXeLYi9fEULmfBrhxn4IrPlINf8fiNarzTof004v3lFdntdwa9PF7vFJqm3mg7s+ScJMxXaE3Acp1irZcg==} @@ -17273,9 +17270,6 @@ packages: pump@2.0.1: resolution: {integrity: sha512-ruPMNRkN3MHP1cWJc9OWr+T/xDP0jhXYCLfJcBuX54hhfIBnaQmAUMfDcG4DM5UMWByBbJY69QSphm3jtDKIkA==} - pump@3.0.0: - resolution: {integrity: sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==} - pump@3.0.2: resolution: {integrity: sha512-tUPXtzlGM8FE3P0ZL6DVs/3P58k9nk8/jZeQCurTJylQA8qFYzHFfhBJkuqyE0FifOsQ0uKWekiZ5g8wtr28cw==} @@ -18770,11 +18764,6 @@ packages: uglify-js: optional: true - terser@5.17.1: - resolution: {integrity: sha512-hVl35zClmpisy6oaoKALOpS0rDYLxRFLHhRuDlEGTKey9qHjS1w9GMORjuwIMt70Wan4lwsLYyWDVnWgF+KUEw==} - engines: {node: '>=10'} - hasBin: true - terser@5.44.1: resolution: {integrity: sha512-t/R3R/n0MSwnnazuPpPNVO60LX0SKL45pyl9YlvxIdkH0Of7D5qM2EVe+yASRIlY5pZ73nclYJfNANGWPwFDZw==} engines: {node: '>=10'} @@ -23906,11 +23895,6 @@ snapshots: '@jridgewell/set-array@1.2.1': {} - '@jridgewell/source-map@0.3.11': - dependencies: - '@jridgewell/gen-mapping': 0.3.13 - '@jridgewell/trace-mapping': 0.3.31 - '@jridgewell/source-map@0.3.3': dependencies: '@jridgewell/gen-mapping': 0.3.8 @@ -24304,7 +24288,7 @@ snapshots: json-parse-even-better-errors: 3.0.0 normalize-package-data: 5.0.0 proc-log: 3.0.0 - semver: 7.7.2 + semver: 7.7.3 transitivePeerDependencies: - bluebird @@ -24902,7 +24886,7 @@ snapshots: '@types/shimmer': 1.2.0 import-in-the-middle: 1.11.0 require-in-the-middle: 7.1.1(supports-color@10.0.0) - semver: 7.7.2 + semver: 7.7.3 shimmer: 1.2.1 transitivePeerDependencies: - supports-color @@ -24914,7 +24898,7 @@ snapshots: '@types/shimmer': 1.2.0 import-in-the-middle: 1.11.0 require-in-the-middle: 7.1.1(supports-color@10.0.0) - semver: 7.7.2 + semver: 7.7.3 shimmer: 1.2.1 transitivePeerDependencies: - supports-color @@ -31047,7 +31031,7 @@ snapshots: debug: 4.4.1(supports-color@10.0.0) globby: 11.1.0 is-glob: 4.0.3 - semver: 7.7.2 + semver: 7.7.3 tsutils: 3.21.0(typescript@5.9.3) optionalDependencies: typescript: 5.9.3 @@ -31064,7 +31048,7 @@ snapshots: '@typescript-eslint/typescript-estree': 5.59.6(typescript@5.9.3) eslint: 8.31.0 eslint-scope: 5.1.1 - semver: 7.7.2 + semver: 7.7.3 transitivePeerDependencies: - supports-color - typescript @@ -31553,17 +31537,17 @@ snapshots: mime-types: 3.0.0 negotiator: 1.0.0 - 
acorn-import-assertions@1.9.0(acorn@8.14.1): + acorn-import-assertions@1.9.0(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-import-attributes@1.9.5(acorn@8.12.1): dependencies: acorn: 8.12.1 - acorn-import-attributes@1.9.5(acorn@8.14.1): + acorn-import-attributes@1.9.5(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-import-phases@1.0.4(acorn@8.15.0): dependencies: @@ -31573,9 +31557,9 @@ snapshots: dependencies: acorn: 8.12.1 - acorn-jsx@5.3.2(acorn@8.14.1): + acorn-jsx@5.3.2(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-node@1.8.2: dependencies: @@ -32094,7 +32078,7 @@ snapshots: dependencies: buffer: 5.7.1 inherits: 2.0.4 - readable-stream: 3.6.0 + readable-stream: 3.6.2 body-parser@1.20.3: dependencies: @@ -34166,8 +34150,8 @@ snapshots: espree@9.6.0: dependencies: - acorn: 8.14.1 - acorn-jsx: 5.3.2(acorn@8.14.1) + acorn: 8.15.0 + acorn-jsx: 5.3.2(acorn@8.15.0) eslint-visitor-keys: 3.4.2 esprima@4.0.1: {} @@ -34498,7 +34482,7 @@ snapshots: process-warning: 5.0.0 rfdc: 1.4.1 secure-json-parse: 4.0.0 - semver: 7.7.2 + semver: 7.7.3 toad-cache: 3.7.0 fastq@1.15.0: @@ -34991,7 +34975,7 @@ snapshots: chalk: 4.1.2 debug: 4.4.1(supports-color@10.0.0) interpret: 3.1.1 - semver: 7.7.2 + semver: 7.7.3 tslib: 2.8.1 yargs: 17.7.2 transitivePeerDependencies: @@ -35337,8 +35321,8 @@ snapshots: import-in-the-middle@1.14.2: dependencies: - acorn: 8.14.1 - acorn-import-attributes: 1.9.5(acorn@8.14.1) + acorn: 8.15.0 + acorn-import-attributes: 1.9.5(acorn@8.15.0) cjs-module-lexer: 1.2.3 module-details-from-path: 1.0.3 @@ -36714,8 +36698,8 @@ snapshots: micromark-extension-mdxjs@1.0.0: dependencies: - acorn: 8.14.1 - acorn-jsx: 5.3.2(acorn@8.14.1) + acorn: 8.15.0 + acorn-jsx: 5.3.2(acorn@8.15.0) micromark-extension-mdx-expression: 1.0.3 micromark-extension-mdx-jsx: 1.0.3 micromark-extension-mdx-md: 1.0.0 @@ -38405,7 +38389,7 @@ snapshots: mkdirp-classic: 0.5.3 napi-build-utils: 2.0.0 node-abi: 3.75.0 - pump: 3.0.0 + pump: 3.0.2 rc: 1.2.8 simple-get: 4.0.1 tar-fs: 2.1.3 @@ -38601,11 +38585,6 @@ snapshots: end-of-stream: 1.4.4 once: 1.4.0 - pump@3.0.0: - dependencies: - end-of-stream: 1.4.4 - once: 1.4.0 - pump@3.0.2: dependencies: end-of-stream: 1.4.4 @@ -40724,7 +40703,7 @@ snapshots: end-of-stream: 1.4.4 fs-constants: 1.0.0 inherits: 2.0.4 - readable-stream: 3.6.0 + readable-stream: 3.6.2 tar-stream@3.1.7: dependencies: @@ -40785,22 +40764,15 @@ snapshots: jest-worker: 27.5.1 schema-utils: 3.3.0 serialize-javascript: 6.0.1 - terser: 5.17.1 + terser: 5.44.1 webpack: 5.88.2(@swc/core@1.3.101(@swc/helpers@0.5.15))(esbuild@0.19.11) optionalDependencies: '@swc/core': 1.3.101(@swc/helpers@0.5.15) esbuild: 0.19.11 - terser@5.17.1: - dependencies: - '@jridgewell/source-map': 0.3.3 - acorn: 8.14.1 - commander: 2.20.3 - source-map-support: 0.5.21 - terser@5.44.1: dependencies: - '@jridgewell/source-map': 0.3.11 + '@jridgewell/source-map': 0.3.3 acorn: 8.15.0 commander: 2.20.3 source-map-support: 0.5.21 @@ -41794,7 +41766,7 @@ snapshots: webpack-bundle-analyzer@4.10.1(bufferutil@4.0.9): dependencies: '@discoveryjs/json-ext': 0.5.7 - acorn: 8.14.1 + acorn: 8.15.0 acorn-walk: 8.3.2 commander: 7.2.0 debounce: 1.2.1 @@ -41853,8 +41825,8 @@ snapshots: '@webassemblyjs/ast': 1.11.5 '@webassemblyjs/wasm-edit': 1.11.5 '@webassemblyjs/wasm-parser': 1.11.5 - acorn: 8.14.1 - acorn-import-assertions: 1.9.0(acorn@8.14.1) + acorn: 8.15.0 + acorn-import-assertions: 1.9.0(acorn@8.15.0) browserslist: 4.24.4 chrome-trace-event: 1.0.3 enhanced-resolve: 5.18.3