From 366788151aeb9a7e0a8b4de597053afd26880b32 Mon Sep 17 00:00:00 2001 From: Adam Benhassen Date: Tue, 27 Jan 2026 03:59:19 +0100 Subject: [PATCH 1/4] feat(app-deployments): improve app:create performance with v2 format --- docker/docker-compose.dev.yml | 4 +- .../tests/api/app-deployments.spec.ts | 795 ++++++++++++++++++ load-tests/app-deployments/README.md | 40 + load-tests/app-deployments/run-benchmark.ts | 392 +++++++++ .../libraries/cli/src/commands/app/create.ts | 155 +++- .../modules/app-deployments/module.graphql.ts | 77 ++ .../providers/app-deployments-manager.ts | 33 + .../providers/app-deployments.ts | 61 ++ .../providers/persisted-document-ingester.ts | 212 ++++- .../Mutation/addDocumentsToAppDeployment.ts | 2 + .../app-deployments/resolvers/Target.ts | 30 +- .../cdn-worker/src/artifact-storage-reader.ts | 88 +- 12 files changed, 1830 insertions(+), 59 deletions(-) create mode 100644 load-tests/app-deployments/README.md create mode 100644 load-tests/app-deployments/run-benchmark.ts diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 44f3852c133..422b9e8d0bc 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -42,7 +42,8 @@ services: s3: image: quay.io/minio/minio:RELEASE.2025-09-07T16-13-09Z - mem_limit: 200m + mem_limit: 400m + cpus: 1 command: server /data --console-address ":9001" ports: - '9000:9000' @@ -75,6 +76,7 @@ services: clickhouse: image: clickhouse/clickhouse-server:24.8-alpine mem_limit: 4096m + cpus: 1 environment: CLICKHOUSE_USER: test CLICKHOUSE_PASSWORD: test diff --git a/integration-tests/tests/api/app-deployments.spec.ts b/integration-tests/tests/api/app-deployments.spec.ts index 5a5aab27b59..6b7fdd3e6d7 100644 --- a/integration-tests/tests/api/app-deployments.spec.ts +++ b/integration-tests/tests/api/app-deployments.spec.ts @@ -5,6 +5,7 @@ import { initSeed } from 'testkit/seed'; import { getServiceHost } from 'testkit/utils'; import { createHive } from '@graphql-hive/core'; import { graphql } from '../../testkit/gql'; +import { AppDeploymentFormatType } from '../../testkit/gql/graphql'; import { execute } from '../../testkit/graphql'; const CreateAppDeployment = graphql(` @@ -92,6 +93,11 @@ const AddDocumentsToAppDeployment = graphql(` version status } + timing { + totalMs + s3Ms + clickhouseMs + } } } } @@ -300,6 +306,7 @@ test('create app deployment, add operations, publish, access via CDN (happy path version: '1.0.0', status: 'pending', }, + timing: expect.any(Object), }, }); @@ -5211,3 +5218,791 @@ test('retire app deployment with --force bypasses protection', async () => { }, }); }); + +const GetExistingDocumentHashes = graphql(` + query GetExistingDocumentHashes($targetSelector: TargetSelectorInput!, $appName: String!) { + target(reference: { bySelector: $targetSelector }) { + appDeploymentDocumentHashes(appName: $appName) { + ok { + hashes + } + error { + message + } + } + } + } +`); + +const AddDocumentsToAppDeploymentWithFormat = graphql(` + mutation AddDocumentsToAppDeploymentWithFormat($input: AddDocumentsToAppDeploymentInput!) 
{ + addDocumentsToAppDeployment(input: $input) { + error { + message + details { + index + message + } + } + ok { + appDeployment { + id + name + version + status + } + } + } + } +`); + +test('v2 format rejects non-sha256 hashes', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Try to add document with non-sha256 hash (should fail) + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: 'not-a-sha256-hash', + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeTruthy(); + expect(addDocumentsToAppDeployment.error?.message).toContain('Invalid input'); + expect(addDocumentsToAppDeployment.error?.details?.message).toContain('sha256'); +}); + +test('v2 format rejects sha256 hash that does not match document content', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Valid sha256 format but does NOT match the document content + const wrongSha256Hash = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; + + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: wrongSha256Hash, + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeTruthy(); + expect(addDocumentsToAppDeployment.error?.message).toContain('Hash does not match'); +}); + +test('v2 format accepts sha256 hash with sha256: prefix', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: 
token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Valid sha256 hash that matches the content: sha256('query { hello }'), with prefix + const sha256Hash = 'ec2e01311ab3b02f3d8c8c712f9e579356d332cd007ac4c1ea5df727f482f05f'; + const sha256HashWithPrefix = `sha256:${sha256Hash}`; + + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: sha256HashWithPrefix, + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeNull(); + expect(addDocumentsToAppDeployment.ok?.appDeployment.name).toBe('my-app'); + + // Activate and verify CDN access + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // CDN access uses the hash with prefix - stored and looked up with the prefix + const response = await fetch( + `${cdnAccess.cdnUrl}/apps/my-app/1.0.0/${sha256HashWithPrefix}`, + { + method: 'GET', + headers: { + 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, + }, + }, + ); + expect(response.status).toBe(200); + expect(await response.text()).toBe('query { hello }'); +}); + +test('v1 format (legacy) accepts any hash format', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Add document with non-sha256 hash using V1 format (should succeed) + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: 'my-custom-hash', + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V1, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeNull(); + expect(addDocumentsToAppDeployment.ok?.appDeployment.name).toBe('my-app'); + + // Activate and verify CDN access + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // v1 format: CDN path includes version + const response = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/my-custom-hash`, { + method: 'GET', + headers: { + 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, + }, + }); + expect(response.status).toBe(200); + expect(await response.text()).toBe('query { hello }'); +}); + +test('v2 format accepts sha256 hashes and CDN access works', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await 
setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Valid sha256 hash that matches the content: sha256('query { hello }') + const sha256Hash = 'ec2e01311ab3b02f3d8c8c712f9e579356d332cd007ac4c1ea5df727f482f05f'; + + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: sha256Hash, + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeNull(); + + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // v2 format: CDN path still includes version in URL, but internally uses v2 key + const response = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/${sha256Hash}`, { + method: 'GET', + headers: { + 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, + }, + }); + expect(response.status).toBe(200); + expect(await response.text()).toBe('query { hello }'); +}); + +test('appDeploymentDocumentHashes returns existing hashes for delta upload', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag, organization } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, project, target } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + // Create first version with some documents + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // sha256('query { hello }') + const hash1 = 'ec2e01311ab3b02f3d8c8c712f9e579356d332cd007ac4c1ea5df727f482f05f'; + // sha256('query hello { hello }') + const hash2 = '56a61ffe6bb6ed7e5163569143f0c73fc8e663c1843a8cc0776a818f1cb71faa'; + + await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { hash: hash1, body: 'query { hello }' }, + { hash: hash2, body: 'query hello { hello }' }, + ], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Query existing hashes + const { target: targetResult } = await execute({ + document: GetExistingDocumentHashes, + variables: { + targetSelector: { + organizationSlug: organization.slug, + projectSlug: project.slug, + targetSlug: target.slug, + }, + appName: 'my-app', 
+ }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(targetResult?.appDeploymentDocumentHashes.ok?.hashes).toContain(hash1); + expect(targetResult?.appDeploymentDocumentHashes.ok?.hashes).toContain(hash2); + expect(targetResult?.appDeploymentDocumentHashes.ok?.hashes?.length).toBe(2); +}); + +test('v2 format prevents hash collision by rejecting non-sha256 hashes', async () => { + // This test verifies the fix for: if someone uses operation name as hash (e.g., "GetUser"), + // different document bodies with the same "hash" across versions would overwrite each other. + // v2 format prevents this by requiring sha256 (content-based) hashes. + + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + world: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + + // Create v1.0.0 with operation name as hash (V1 format - allowed) + await execute({ + document: CreateAppDeployment, + variables: { input: { appName: 'my-app', appVersion: '1.0.0' } }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [{ hash: 'GetUser', body: 'query { hello }' }], // body A + format: AppDeploymentFormatType.V1, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: ActivateAppDeployment, + variables: { input: { appName: 'my-app', appVersion: '1.0.0' } }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Create v1.1.0 with SAME hash but DIFFERENT body (V1 format - allowed) + await execute({ + document: CreateAppDeployment, + variables: { input: { appName: 'my-app', appVersion: '1.1.0' } }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.1.0', + documents: [{ hash: 'GetUser', body: 'query { world }' }], // body B - DIFFERENT! + format: AppDeploymentFormatType.V1, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: ActivateAppDeployment, + variables: { input: { appName: 'my-app', appVersion: '1.1.0' } }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // v1 format: each version has its own S3 key, so no collision + // v1.0.0 should still return body A (query { hello }) + const responseV1 = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/GetUser`, { + headers: { 'X-Hive-CDN-Key': cdnAccess.secretAccessToken }, + }); + expect(responseV1.status).toBe(200); + expect(await responseV1.text()).toBe('query { hello }'); // body A preserved! 
+ + // v1.1.0 should return body B (query { world }) + const responseV1_1 = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.1.0/GetUser`, { + headers: { 'X-Hive-CDN-Key': cdnAccess.secretAccessToken }, + }); + expect(responseV1_1.status).toBe(200); + expect(await responseV1_1.text()).toBe('query { world }'); // body B + + // Now try the same with v2 format - should be REJECTED because "GetUser" is not sha256 + await execute({ + document: CreateAppDeployment, + variables: { input: { appName: 'my-app', appVersion: '2.0.0' } }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + const { addDocumentsToAppDeployment } = await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '2.0.0', + documents: [{ hash: 'GetUser', body: 'query { hello }' }], + format: AppDeploymentFormatType.V2, // v2 format + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // v2 format rejects non-sha256 hashes to prevent collision + expect(addDocumentsToAppDeployment.error).toBeTruthy(); + expect(addDocumentsToAppDeployment.error?.details?.message).toContain('sha256'); +}); + +test('v2 format enables cross-version document sharing', async () => { + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag, organization } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess, project, target } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + // sha256('query { hello }') + const sharedHash = 'ec2e01311ab3b02f3d8c8c712f9e579356d332cd007ac4c1ea5df727f482f05f'; + + // Create v1.0.0 with a document + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [{ hash: sharedHash, body: 'query { hello }' }], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Create v2.0.0 - the same hash should be available via delta query + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '2.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Query should show the hash from v1.0.0 + const { target: targetResult } = await execute({ + document: GetExistingDocumentHashes, + variables: { + targetSelector: { + organizationSlug: organization.slug, + projectSlug: project.slug, + targetSlug: target.slug, + }, + appName: 'my-app', + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(targetResult?.appDeploymentDocumentHashes.ok?.hashes).toContain(sharedHash); + + // Add the same document to v2.0.0 (should succeed, uses shared storage) + const { addDocumentsToAppDeployment } = await execute({ + document: 
AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '2.0.0', + documents: [{ hash: sharedHash, body: 'query { hello }' }], + format: AppDeploymentFormatType.V2, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + expect(addDocumentsToAppDeployment.error).toBeNull(); + + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '2.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Both versions should be able to access the document via CDN + const responseV1 = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/${sharedHash}`, { + headers: { 'X-Hive-CDN-Key': cdnAccess.secretAccessToken }, + }); + expect(responseV1.status).toBe(200); + expect(await responseV1.text()).toBe('query { hello }'); + + const responseV2 = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/2.0.0/${sharedHash}`, { + headers: { 'X-Hive-CDN-Key': cdnAccess.secretAccessToken }, + }); + expect(responseV2.status).toBe(200); + expect(await responseV2.text()).toBe('query { hello }'); +}); + +test('v2 CDN lookup falls back to v1 key when v2 key not found', async () => { + // This test verifies that documents deployed with v1 format are still accessible + // after the CDN worker added v2 key lookup (which tries v2 key first, then falls back to v1) + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag } = await createOrg(); + await setFeatureFlag('appDeployments', true); + const { createTargetAccessToken, createCdnAccess } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + const cdnAccess = await createCdnAccess(); + + // Deploy v1.0.0 using V1 format (legacy) with a custom hash + await execute({ + document: CreateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: AddDocumentsToAppDeploymentWithFormat, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + documents: [ + { + hash: 'my-legacy-hash', + body: 'query { hello }', + }, + ], + format: AppDeploymentFormatType.V1, + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + await execute({ + document: ActivateAppDeployment, + variables: { + input: { + appName: 'my-app', + appVersion: '1.0.0', + }, + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // The CDN worker will try v2 key first (which won't exist), then fall back to v1 key + // This should still succeed because fallback logic preserves v1 document access + const response = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/my-legacy-hash`, { + method: 'GET', + headers: { + 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, + }, + }); + + expect(response.status).toBe(200); + expect(await response.text()).toBe('query { hello }'); +}); + +test('appDeploymentDocumentHashes returns empty array for app with no previous documents', async () => { + // This test verifies that the delta upload query returns an empty array (not null/error) + // when an app exists but has no documents yet + const { createOrg } = await initSeed().createOwner(); + const { createProject, setFeatureFlag, organization } = await createOrg(); + await setFeatureFlag('appDeployments', 
true); + const { createTargetAccessToken, project, target } = await createProject(); + const token = await createTargetAccessToken({}); + + await token.publishSchema({ + sdl: /* GraphQL */ ` + type Query { + hello: String + } + `, + }); + + // Query for an app name that has no deployments + const { target: targetResult } = await execute({ + document: GetExistingDocumentHashes, + variables: { + targetSelector: { + organizationSlug: organization.slug, + projectSlug: project.slug, + targetSlug: target.slug, + }, + appName: 'non-existent-app', + }, + authToken: token.secret, + }).then(res => res.expectNoGraphQLErrors()); + + // Should return empty array, not error + expect(targetResult?.appDeploymentDocumentHashes.ok?.hashes).toEqual([]); + expect(targetResult?.appDeploymentDocumentHashes.error).toBeNull(); +}); diff --git a/load-tests/app-deployments/README.md b/load-tests/app-deployments/README.md new file mode 100644 index 00000000000..b9653c3fa22 --- /dev/null +++ b/load-tests/app-deployments/README.md @@ -0,0 +1,40 @@ +# App Deployment Benchmark: V1 vs V2 Format + +Benchmark comparing v1 (legacy) and v2 (SHA256) storage formats for app deployments. + +## Quick Start + +```bash +# Run benchmark +npx tsx load-tests/app-deployments/run-benchmark.ts + +# Run with multiple document counts +DOC_COUNT=1000 npx tsx load-tests/app-deployments/run-benchmark.ts +``` + +## Benchmark Results + +### 1000 Documents + +| Scenario | Docs Uploaded | Parse | Validate | Coords | ClickHouse | S3 | **Total** | +|----------|---------------|-------|----------|--------|------------|-----|-----------| +| V1 Initial | 1000 | 17ms | 401ms | 45ms | 1790ms | 8274ms | **9.1s** | +| V2 Initial | 1000 | 10ms | 276ms | 48ms | 936ms | 7040ms | **7.6s** | +| V2 Delta | 50 (950 skipped) | 1ms | 25ms | 5ms | 51ms | 323ms | **376ms** | + +## Format Comparison + +| Feature | V1 (Legacy) | V2 (SHA256) | +|---------|-------------|-------------| +| Hash format | Any alphanumeric | SHA256 only | +| Hash validation | Format only | Format + content match | +| S3 key | `app/{target}/{name}/{version}/{hash}` | `app-v2/{target}/{name}/{hash}` | +| Cross-version dedup | No | Yes | +| Delta uploads | No | Yes | + +## Environment Variables + +- `DOC_COUNT` - Number of documents in fixture (default: 1000) +- `REGISTRY_ENDPOINT` - GraphQL endpoint (default: http://localhost:3001/graphql) +- `REGISTRY_TOKEN` - Access token (default: local dev token) +- `TARGET` - Target slug (default: the-guild/hive/demo) diff --git a/load-tests/app-deployments/run-benchmark.ts b/load-tests/app-deployments/run-benchmark.ts new file mode 100644 index 00000000000..b48713b6eda --- /dev/null +++ b/load-tests/app-deployments/run-benchmark.ts @@ -0,0 +1,392 @@ +import { spawn } from 'node:child_process'; +import { createHash, randomBytes } from 'node:crypto'; +import { existsSync, mkdirSync, writeFileSync } from 'node:fs'; +import { dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +const ENDPOINT = process.env.REGISTRY_ENDPOINT ?? 'http://localhost:3001/graphql'; +const TOKEN = process.env.REGISTRY_TOKEN ?? 'd43544cd1400e177c280afdce6876e7f'; +const TARGET = process.env.TARGET ?? 'the-guild/hive/demo'; +const TIMESTAMP = Math.floor(Date.now() / 1000); +const DOC_COUNTS = process.env.DOC_COUNT + ? 
[parseInt(process.env.DOC_COUNT, 10)]
+  : [1000];
+
+const operations = [
+  'query GetUsers { users { id name } }',
+  'query GetWorld { world }',
+  'query GetUsersWithAlias { allUsers: users { id name } }',
+  '{ users { id } }',
+  'query UsersAndWorld { users { id } world }',
+];
+
+function generateRandomHash(): string {
+  return randomBytes(16).toString('hex');
+}
+
+function generateSha256Hash(content: string): string {
+  return createHash('sha256').update(content).digest('hex');
+}
+
+function generateManifest(count: number, useSha256: boolean): Record<string, string> {
+  const manifest: Record<string, string> = {};
+  for (let i = 0; i < count; i++) {
+    const baseOp = operations[i % operations.length];
+    const op = `# op-${i}\n${baseOp}`;
+    const hash = useSha256 ? generateSha256Hash(op) : generateRandomHash();
+    manifest[hash] = op;
+  }
+  return manifest;
+}
+
+interface GeneratedFixtures {
+  v1: { path: string; manifest: Record<string, string> };
+  v2: { path: string; manifest: Record<string, string> };
+  v2Delta: { path: string; manifest: Record<string, string> };
+}
+
+function generateFixtures(docCount: number): GeneratedFixtures {
+  const outputDir = join(__dirname, 'fixtures');
+
+  if (!existsSync(outputDir)) {
+    mkdirSync(outputDir, { recursive: true });
+  }
+
+  console.log(`Generating fixtures for ${docCount} documents...`);
+
+  const v1Manifest = generateManifest(docCount, false);
+  const v1Path = join(outputDir, `${docCount}-docs-v1.json`);
+  writeFileSync(v1Path, JSON.stringify(v1Manifest, null, 2));
+  console.log(`  v1 (random hashes): ${v1Path}`);
+
+  const v2Manifest = generateManifest(docCount, true);
+  const v2Path = join(outputDir, `${docCount}-docs-v2.json`);
+  writeFileSync(v2Path, JSON.stringify(v2Manifest, null, 2));
+  console.log(`  v2 (sha256 hashes): ${v2Path}`);
+
+  const NEW_DOCS = 50;
+  const existingCount = docCount - NEW_DOCS;
+  const v2DeltaManifest: Record<string, string> = {};
+
+  const v2Entries = Object.entries(v2Manifest);
+  for (let i = 0; i < existingCount; i++) {
+    const [hash, body] = v2Entries[i];
+    v2DeltaManifest[hash] = body;
+  }
+
+  for (let i = 0; i < NEW_DOCS; i++) {
+    const baseOp = operations[i % operations.length];
+    const op = `# op-${docCount + i}\n${baseOp}`;
+    const hash = generateSha256Hash(op);
+    v2DeltaManifest[hash] = op;
+  }
+
+  const v2DeltaPath = join(outputDir, `${docCount}-docs-v2-delta.json`);
+  writeFileSync(v2DeltaPath, JSON.stringify(v2DeltaManifest, null, 2));
+  console.log(`  v2-delta (${existingCount} existing + ${NEW_DOCS} new): ${v2DeltaPath}`);
+  console.log('');
+
+  return {
+    v1: { path: v1Path, manifest: v1Manifest },
+    v2: { path: v2Path, manifest: v2Manifest },
+    v2Delta: { path: v2DeltaPath, manifest: v2DeltaManifest },
+  };
+}
+
+interface TimingResult {
+  docsUploaded: number;
+  docsSkipped: number;
+  totalMs: number;
+  parseMs: number;
+  validateMs: number;
+  coordsMs: number;
+  clickhouseMs: number;
+  s3Ms: number;
+}
+
+const CLI_PATH = join(__dirname, '../../packages/libraries/cli/bin/dev');
+
+function runCommand(args: string[]): Promise<TimingResult | null> {
+  return new Promise((resolve, reject) => {
+    let output = '';
+    const child = spawn(CLI_PATH, args, {
+      stdio: ['inherit', 'pipe', 'inherit'],
+      shell: true,
+    });
+
+    child.stdout?.on('data', (data: Buffer) => {
+      const text = data.toString();
+      output += text;
+      process.stdout.write(text);
+    });
+
+    child.on('close', code => {
+      if (code === 0) {
+        const timingRegex =
+          /Batch timing: (\d+)ms total \((\d+) docs, parse: (\d+)ms, validate: (\d+)ms, coords: (\d+)ms, ch: (\d+)ms, s3: (\d+)ms\)/g;
+        const timingMatches = [...output.matchAll(timingRegex)];
+
+        const 
docsMatch = output.match(/\((\d+) new, (\d+) skipped\)/);
+
+        if (timingMatches.length > 0 && docsMatch) {
+          const totals = timingMatches.reduce(
+            (acc, match) => ({
+              totalMs: acc.totalMs + parseInt(match[1], 10),
+              parseMs: acc.parseMs + parseInt(match[3], 10),
+              validateMs: acc.validateMs + parseInt(match[4], 10),
+              coordsMs: acc.coordsMs + parseInt(match[5], 10),
+              clickhouseMs: acc.clickhouseMs + parseInt(match[6], 10),
+              s3Ms: acc.s3Ms + parseInt(match[7], 10),
+            }),
+            { totalMs: 0, parseMs: 0, validateMs: 0, coordsMs: 0, clickhouseMs: 0, s3Ms: 0 },
+          );
+
+          resolve({
+            docsUploaded: parseInt(docsMatch[1], 10),
+            docsSkipped: parseInt(docsMatch[2], 10),
+            ...totals,
+          });
+        } else {
+          resolve(null);
+        }
+      } else {
+        reject(new Error(`Command exited with code ${code}`));
+      }
+    });
+
+    child.on('error', reject);
+  });
+}
+
+function runPublishCommand(args: string[]): Promise<void> {
+  return new Promise((resolve, reject) => {
+    const child = spawn(CLI_PATH, args, {
+      stdio: 'inherit',
+      shell: true,
+    });
+
+    child.on('close', code => {
+      if (code === 0) {
+        resolve();
+      } else {
+        reject(new Error(`Command exited with code ${code}`));
+      }
+    });
+
+    child.on('error', reject);
+  });
+}
+
+function formatMs(ms: number): string {
+  if (ms >= 1000) {
+    return `**${(ms / 1000).toFixed(1)}s**`;
+  }
+  return `${ms}ms`;
+}
+
+function printResultsTable(
+  docCount: number,
+  results: { v1: TimingResult; v2: TimingResult; v2Delta: TimingResult },
+) {
+  console.log('');
+  console.log(`### ${docCount} Documents`);
+  console.log('');
+  console.log(
+    '| Scenario | Docs Uploaded | Parse | Validate | Coords | ClickHouse | S3 | **Total** |',
+  );
+  console.log(
+    '|----------|---------------|-------|----------|--------|------------|-----|-----------|',
+  );
+
+  const { v1, v2, v2Delta } = results;
+
+  console.log(
+    `| V1 Initial | ${v1.docsUploaded} | ${v1.parseMs}ms | ${v1.validateMs}ms | ${v1.coordsMs}ms | ${v1.clickhouseMs}ms | ${v1.s3Ms}ms | ${formatMs(v1.totalMs)} |`,
+  );
+  console.log(
+    `| V2 Initial | ${v2.docsUploaded} | ${v2.parseMs}ms | ${v2.validateMs}ms | ${v2.coordsMs}ms | ${v2.clickhouseMs}ms | ${v2.s3Ms}ms | ${formatMs(v2.totalMs)} |`,
+  );
+  console.log(
+    `| V2 Delta | ${v2Delta.docsUploaded} (${v2Delta.docsSkipped} skipped) | ${v2Delta.parseMs}ms | ${v2Delta.validateMs}ms | ${v2Delta.coordsMs}ms | ${v2Delta.clickhouseMs}ms | ${v2Delta.s3Ms}ms | ${formatMs(v2Delta.totalMs)} |`,
+  );
+}
+
+async function runBenchmarkForDocCount(
+  docCount: number,
+): Promise<{ v1: TimingResult; v2: TimingResult; v2Delta: TimingResult } | null> {
+  console.log('');
+  console.log(`##########################################`);
+  console.log(`# Benchmark for ${docCount} Documents`);
+  console.log(`##########################################`);
+  console.log('');
+
+  const fixtures = generateFixtures(docCount);
+
+  const APP_NAME_V1 = `bench-v1-${docCount}-${TIMESTAMP}`;
+  const APP_NAME_V2 = `bench-v2-${docCount}-${TIMESTAMP}`;
+
+  console.log('==========================================');
+  console.log(' V1 Format (Legacy, random hashes)');
+  console.log('==========================================');
+  console.log(`App: ${APP_NAME_V1}`);
+  console.log(`Documents: ${Object.keys(fixtures.v1.manifest).length}`);
+  console.log('');
+
+  console.log('--- Initial deployment (v1) ---');
+  const v1Result = await runCommand([
+    'app:create',
+    fixtures.v1.path,
+    '--name',
+    APP_NAME_V1,
+    '--version',
+    'v1',
+    '--target',
+    TARGET,
+    '--registry.endpoint',
+    ENDPOINT,
+    '--registry.accessToken',
+    TOKEN,
+    
'--format',
+    'v1',
+    '--showTiming',
+  ]);
+
+  console.log('');
+  console.log('--- Publishing (v1) ---');
+  await runPublishCommand([
+    'app:publish',
+    '--name',
+    APP_NAME_V1,
+    '--version',
+    'v1',
+    '--target',
+    TARGET,
+    '--registry.endpoint',
+    ENDPOINT,
+    '--registry.accessToken',
+    TOKEN,
+  ]);
+
+  console.log('');
+
+  console.log('==========================================');
+  console.log(' V2 Format (SHA256, cross-version dedup)');
+  console.log('==========================================');
+  console.log(`App: ${APP_NAME_V2}`);
+  console.log(`Documents: ${Object.keys(fixtures.v2.manifest).length}`);
+  console.log('');
+
+  console.log('--- Initial deployment (v2) ---');
+  const v2Result = await runCommand([
+    'app:create',
+    fixtures.v2.path,
+    '--name',
+    APP_NAME_V2,
+    '--version',
+    'v1',
+    '--target',
+    TARGET,
+    '--registry.endpoint',
+    ENDPOINT,
+    '--registry.accessToken',
+    TOKEN,
+    '--format',
+    'v2',
+    '--showTiming',
+  ]);
+
+  console.log('');
+  console.log('--- Publishing (v2) ---');
+  await runPublishCommand([
+    'app:publish',
+    '--name',
+    APP_NAME_V2,
+    '--version',
+    'v1',
+    '--target',
+    TARGET,
+    '--registry.endpoint',
+    ENDPOINT,
+    '--registry.accessToken',
+    TOKEN,
+  ]);
+
+  console.log('');
+  console.log('Waiting for ClickHouse sync...');
+  await new Promise(resolve => setTimeout(resolve, 1000));
+
+  console.log('==========================================');
+  console.log(' V2 Delta Upload (50 new, rest skipped)');
+  console.log('==========================================');
+  console.log(`App: ${APP_NAME_V2} (version v2)`);
+  console.log(`Documents: ${Object.keys(fixtures.v2Delta.manifest).length}`);
+  console.log('');
+
+  console.log('--- Delta deployment (v2, 50 new + rest skipped) ---');
+  const v2DeltaResult = await runCommand([
+    'app:create',
+    fixtures.v2Delta.path,
+    '--name',
+    APP_NAME_V2,
+    '--version',
+    'v2',
+    '--target',
+    TARGET,
+    '--registry.endpoint',
+    ENDPOINT,
+    '--registry.accessToken',
+    TOKEN,
+    '--format',
+    'v2',
+    '--showTiming',
+  ]);
+
+  if (v1Result && v2Result && v2DeltaResult) {
+    return { v1: v1Result, v2: v2Result, v2Delta: v2DeltaResult };
+  }
+  return null;
+}
+
+async function main(): Promise<void> {
+  console.log('==========================================');
+  console.log(' App Deployment Benchmark: v1 vs v2');
+  console.log('==========================================');
+  console.log(`Endpoint: ${ENDPOINT}`);
+  console.log(`Target: ${TARGET}`);
+  console.log(`Document counts: ${DOC_COUNTS.join(', ')}`);
+
+  const allResults: Array<{
+    docCount: number;
+    results: { v1: TimingResult; v2: TimingResult; v2Delta: TimingResult };
+  }> = [];
+
+  for (const docCount of DOC_COUNTS) {
+    const results = await runBenchmarkForDocCount(docCount);
+    if (results) {
+      allResults.push({ docCount, results });
+    }
+  }
+
+  console.log('');
+  console.log('==========================================');
+  console.log(' Benchmark Complete');
+  console.log('==========================================');
+
+  if (allResults.length > 0) {
+    console.log('');
+    console.log('## Benchmark Results');
+    for (const { docCount, results } of allResults) {
+      printResultsTable(docCount, results);
+    }
+  } else {
+    console.log('');
+    console.log('Warning: Could not parse timing results from commands.');
+  }
+}
+
+main().catch(err => {
+  console.error(err);
+  process.exit(1);
+});
diff --git a/packages/libraries/cli/src/commands/app/create.ts b/packages/libraries/cli/src/commands/app/create.ts
index e326a36a5a0..4c0c18b679b 100644
--- 
a/packages/libraries/cli/src/commands/app/create.ts +++ b/packages/libraries/cli/src/commands/app/create.ts @@ -1,8 +1,9 @@ +import { createHash } from 'crypto'; import { z } from 'zod'; import { Args, Flags } from '@oclif/core'; import Command from '../../base-command'; import { graphql } from '../../gql'; -import { AppDeploymentStatus } from '../../gql/graphql'; +import { AppDeploymentFormatType, AppDeploymentStatus } from '../../gql/graphql'; import * as GraphQLSchema from '../../gql/graphql'; import { graphqlEndpoint } from '../../helpers/config'; import { @@ -37,6 +38,16 @@ export default class AppCreate extends Command { ' This can either be a slug following the format "$organizationSlug/$projectSlug/$targetSlug" (e.g "the-guild/graphql-hive/staging")' + ' or an UUID (e.g. "a0f4c605-6541-4350-8cfe-b31f21a4bf80").', }), + showTiming: Flags.boolean({ + description: 'Show timing breakdown for each batch', + default: false, + }), + format: Flags.string({ + description: + 'Storage format version. "v1" (default) uses per-version storage and allows any hash format. "v2" enables cross-version deduplication and requires sha256 hashes.', + default: 'v1', + options: ['v1', 'v2'], + }), }; static args = { @@ -50,6 +61,7 @@ export default class AppCreate extends Command { async run() { const { flags, args } = await this.parse(AppCreate); + const startTime = Date.now(); let endpoint: string, accessToken: string; try { @@ -93,6 +105,50 @@ export default class AppCreate extends Command { throw new PersistedOperationsMalformedError(file); } + // Validate hashes are sha256 and match content (unless v1 format) + if (flags.format !== 'v1') { + const sha256Regex = /^(sha256:)?[a-f0-9]{64}$/i; + const invalidFormatHashes: string[] = []; + const mismatchedHashes: Array<{ hash: string; expected: string }> = []; + + for (const [hash, body] of Object.entries(validationResult.data)) { + if (!sha256Regex.test(hash)) { + invalidFormatHashes.push(hash); + } else { + // Verify hash matches content + const computedHash = createHash('sha256').update(body).digest('hex'); + const providedHash = hash.replace(/^sha256:/i, '').toLowerCase(); + if (computedHash !== providedHash) { + mismatchedHashes.push({ hash: providedHash, expected: computedHash }); + } + } + } + + if (invalidFormatHashes.length > 0) { + const examples = invalidFormatHashes.slice(0, 3).join(', '); + const more = + invalidFormatHashes.length > 3 ? ` (and ${invalidFormatHashes.length - 3} more)` : ''; + throw new APIError( + `Invalid hash format detected: ${examples}${more}\n` + + `Hashes must be sha256 (64 hexadecimal characters, optionally prefixed with "sha256:").\n` + + `This is required for safe cross-version document deduplication.\n` + + `Use --format=v1 to bypass this check (disables cross-version deduplication).`, + ); + } + + if (mismatchedHashes.length > 0) { + const example = mismatchedHashes[0]; + const more = + mismatchedHashes.length > 1 ? 
` (and ${mismatchedHashes.length - 1} more)` : '';
+        throw new APIError(
+          `Hash does not match document content${more}.\n` +
+            `Provided: ${example.hash}\n` +
+            `Expected: ${example.expected}\n` +
+            `Ensure your manifest uses sha256 hash of the raw document body.`,
+        );
+      }
+    }
+
     const result = await this.registryApi(endpoint, accessToken).request({
       operation: CreateAppDeploymentMutation,
       variables: {
@@ -119,10 +175,57 @@
       return;
     }
 
-    const totalDocuments = Object.keys(validationResult.data).length;
+    const allDocuments = Object.entries(validationResult.data);
+    const totalDocuments = allDocuments.length;
+
+    // Fetch existing hashes for delta upload
+    let existingHashes = new Set<string>();
+    if (flags.format !== 'v1') {
+      if (!target) {
+        throw new APIError(
+          'The --target flag is required when using --format=v2 for delta optimization.',
+        );
+      }
+      const hashesResult = await this.registryApi(endpoint, accessToken).request({
+        operation: GetExistingDocumentHashesQuery,
+        variables: {
+          target,
+          appName: flags['name'],
+        },
+      });
+
+      if (hashesResult.target?.appDeploymentDocumentHashes.error) {
+        this.logWarning(
+          `Could not fetch existing document hashes: ${hashesResult.target.appDeploymentDocumentHashes.error.message}. Delta optimization disabled.`,
+        );
+      } else if (hashesResult.target?.appDeploymentDocumentHashes.ok?.hashes) {
+        existingHashes = new Set(hashesResult.target.appDeploymentDocumentHashes.ok.hashes);
+        if (flags.showTiming) {
+          this.log(`Found ${existingHashes.size} existing documents (will skip)`);
+        }
+      } else if (!hashesResult.target) {
+        this.logWarning(
+          `Target not found when fetching existing hashes. Delta optimization disabled.`,
+        );
+      }
+    }
+
+    // Filter out already-existing documents
+    const newDocuments = allDocuments.filter(([hash]) => !existingHashes.has(hash));
+    const skippedCount = totalDocuments - newDocuments.length;
+
+    if (newDocuments.length === 0) {
+      this.log(
+        `App deployment "${flags['name']}@${flags['version']}" - all ${totalDocuments} documents already exist. Nothing to upload.`,
+      );
+      this.log(
+        `Note: The deployment is still in "pending" status. Run "hive app:publish --name=${flags['name']} --version=${flags['version']}" to activate it.`,
+      );
+      return;
+    }
 
     this.log(
-      `App deployment "${flags['name']}@${flags['version']}" is created pending document upload. Uploading documents...`,
+      `App deployment "${flags['name']}@${flags['version']}" is created pending document upload. Uploading ${newDocuments.length} new documents (${skippedCount} already exist)...`,
     );
 
     let buffer: Array<{ hash: string; body: string }> = [];
@@ -139,6 +242,7 @@
             appName: flags['name'],
             appVersion: flags['version'],
             documents: buffer,
+            format: flags.format === 'v1' ? AppDeploymentFormatType.V1 : AppDeploymentFormatType.V2,
           },
         },
       });
@@ -166,18 +270,26 @@
         }
         throw new APIError(result.addDocumentsToAppDeployment.error.message);
       }
+
+      if (flags.showTiming && result.addDocumentsToAppDeployment.ok?.timing) {
+        const t = result.addDocumentsToAppDeployment.ok.timing;
+        this.log(
+          `  Batch timing: ${t.totalMs}ms total (${t.documentsProcessed} docs, parse: ${t.parsingMs}ms, validate: ${t.validationMs}ms, coords: ${t.coordinateExtractionMs}ms, ch: ${t.clickhouseMs}ms, s3: ${t.s3Ms}ms)`,
+        );
+      }
+
       buffer = [];
 
       // don't bother showing 100% since there's another log line when it's done. 
And for deployments with just a few docs, showing this progress is unnecessary. - if (counter !== totalDocuments) { + if (counter !== newDocuments.length) { this.log( - `${counter} / ${totalDocuments} (${Math.round((100.0 * counter) / totalDocuments)}%) documents uploaded...`, + `${counter} / ${newDocuments.length} (${Math.round((100.0 * counter) / newDocuments.length)}%) documents uploaded...`, ); } } }; - for (const [hash, body] of Object.entries(validationResult.data)) { + for (const [hash, body] of newDocuments) { buffer.push({ hash, body }); counter++; await flush(); @@ -185,8 +297,13 @@ export default class AppCreate extends Command { await flush(true); + if (flags.showTiming) { + const totalTime = Date.now() - startTime; + this.log(`Total time: ${totalTime}ms`); + } + this.log( - `\nApp deployment "${flags['name']}@${flags['version']}" (${counter} operations) created.\nActive it with the "hive app:publish" command.`, + `\nApp deployment "${flags['name']}@${flags['version']}" (${counter} new, ${skippedCount} skipped) created.\nActivate it with the "hive app:publish" command.`, ); } } @@ -221,6 +338,15 @@ const AddDocumentsToAppDeploymentMutation = graphql(/* GraphQL */ ` version status } + timing { + totalMs + parsingMs + validationMs + coordinateExtractionMs + clickhouseMs + s3Ms + documentsProcessed + } } error { message @@ -233,3 +359,18 @@ const AddDocumentsToAppDeploymentMutation = graphql(/* GraphQL */ ` } } `); + +const GetExistingDocumentHashesQuery = graphql(/* GraphQL */ ` + query GetExistingDocumentHashes($target: TargetReferenceInput!, $appName: String!) { + target(reference: $target) { + appDeploymentDocumentHashes(appName: $appName) { + ok { + hashes + } + error { + message + } + } + } + } +`); diff --git a/packages/services/api/src/modules/app-deployments/module.graphql.ts b/packages/services/api/src/modules/app-deployments/module.graphql.ts index 0efbf6e2ccb..0aca7108325 100644 --- a/packages/services/api/src/modules/app-deployments/module.graphql.ts +++ b/packages/services/api/src/modules/app-deployments/module.graphql.ts @@ -32,6 +32,22 @@ export default gql` retired } + """ + Storage format for app deployment documents. + """ + enum AppDeploymentFormatType { + """ + V1 format: version-scoped storage. Each version stores documents separately. + Allows any hash format. No cross-version deduplication. + """ + V1 + """ + V2 format: content-addressed storage with SHA256 hashes. + Enables cross-version deduplication and delta uploads. + """ + V2 + } + type GraphQLDocumentConnection { pageInfo: PageInfo! edges: [GraphQLDocumentEdge!]! @@ -118,6 +134,27 @@ export default gql` after: String @tag(name: "public") filter: ActiveAppDeploymentsFilter! @tag(name: "public") ): AppDeploymentConnection! @tag(name: "public") + """ + Get list of document hashes that already exist for this app (across all versions). + Used for delta uploads - CLI can skip uploading documents that already exist. + """ + appDeploymentDocumentHashes(appName: String!): AppDeploymentDocumentHashesResult! + } + + type AppDeploymentDocumentHashesResult { + ok: AppDeploymentDocumentHashesOk + error: AppDeploymentDocumentHashesError + } + + type AppDeploymentDocumentHashesOk { + """ + List of document hashes that already exist for this app. + """ + hashes: [String!]! + } + + type AppDeploymentDocumentHashesError implements Error { + message: String! } extend type Mutation { @@ -250,6 +287,11 @@ export default gql` A list of operations to add to the app deployment. 
(max 100 per single batch) """ documents: [DocumentInput!]! + """ + Storage format for documents. Defaults to V1 for backwards compatibility. + V2 enables cross-version deduplication and delta uploads (requires SHA256 hashes). + """ + format: AppDeploymentFormatType } type AddDocumentsToAppDeploymentErrorDetails { @@ -271,8 +313,43 @@ export default gql` details: AddDocumentsToAppDeploymentErrorDetails } + """ + Timing breakdown for document upload operations. All time values are in milliseconds. + """ + type DocumentUploadTiming { + """ + Total elapsed time for the entire batch processing. + """ + totalMs: Int! + """ + Time spent parsing GraphQL documents. + """ + parsingMs: Int! + """ + Time spent validating documents against the schema. + """ + validationMs: Int! + """ + Time spent extracting schema coordinates from documents. + """ + coordinateExtractionMs: Int! + """ + Time spent inserting records into ClickHouse. + """ + clickhouseMs: Int! + """ + Time spent uploading documents to S3. + """ + s3Ms: Int! + """ + Number of documents successfully processed. + """ + documentsProcessed: Int! + } + type AddDocumentsToAppDeploymentOk { appDeployment: AppDeployment! + timing: DocumentUploadTiming } type AddDocumentsToAppDeploymentResult { diff --git a/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts b/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts index 3403d4a38f1..d92f76a64af 100644 --- a/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts +++ b/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts @@ -104,6 +104,7 @@ export class AppDeploymentsManager { hash: string; body: string; }>; + isV1Format: boolean; }) { const selector = await this.idTranslator.resolveTargetReference({ reference: args.reference, @@ -130,6 +131,7 @@ export class AppDeploymentsManager { targetId: selector.targetId, appDeployment: args.appDeployment, operations: args.documents, + isV1Format: args.isV1Format, }); } @@ -276,4 +278,35 @@ export class AppDeploymentsManager { return appDeploymentIds.map(id => Promise.resolve(dateMap.get(id) ?? 
null));
    });
+
+  async getExistingDocumentHashes(args: {
+    organizationId: string;
+    projectId: string;
+    targetId: string;
+    appName: string;
+  }): Promise<
+    | { type: 'success'; hashes: string[] }
+    | { type: 'error'; error: { message: string } }
+  > {
+    await this.session.assertPerformAction({
+      action: 'appDeployment:create',
+      organizationId: args.organizationId,
+      params: {
+        organizationId: args.organizationId,
+        projectId: args.projectId,
+        targetId: args.targetId,
+        appDeploymentName: args.appName,
+      },
+    });
+
+    const hashes = await this.appDeployments.getExistingDocumentHashes({
+      targetId: args.targetId,
+      appName: args.appName,
+    });
+
+    return {
+      type: 'success',
+      hashes,
+    };
+  }
 }
diff --git a/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts b/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts
index f8258cc7113..64eaf28576f 100644
--- a/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts
+++ b/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts
@@ -254,6 +254,7 @@
       hash: string;
       body: string;
     }>;
+    isV1Format: boolean;
   }) {
     if (this.appDeploymentsEnabled === false) {
       const organization = await this.storage.getOrganization({
@@ -344,6 +345,7 @@
           version: args.appDeployment.version,
         },
         documents: args.operations,
+        isV1Format: args.isV1Format,
       });
 
       if (result.type === 'error') {
@@ -352,11 +354,18 @@
           error: result.error,
         };
       }
+
+      return {
+        type: 'success' as const,
+        appDeployment,
+        timing: result.timing,
+      };
     }
 
     return {
       type: 'success' as const,
       appDeployment,
+      timing: null,
     };
   }
 
@@ -1543,6 +1552,58 @@
       appDeploymentOperations: appDeploymentOps,
     };
   }
+
+  async getExistingDocumentHashes(args: {
+    targetId: string;
+    appName: string;
+  }): Promise<string[]> {
+    this.logger.debug(
+      'get existing document hashes (targetId=%s, appName=%s)',
+      args.targetId,
+      args.appName,
+    );
+
+    let result;
+    try {
+      result = await this.clickhouse.query({
+        query: cSql`
+          SELECT document_hash AS hash
+          FROM app_deployment_documents
+          PREWHERE app_deployment_id IN (
+            SELECT app_deployment_id
+            FROM app_deployments
+            PREWHERE target_id = ${args.targetId}
+            AND app_name = ${args.appName}
+            GROUP BY app_deployment_id
+          )
+          GROUP BY document_hash
+        `,
+        queryId: 'get-existing-document-hashes',
+        timeout: 30_000,
+      });
+    } catch (error) {
+      this.logger.error(
+        'Failed to query existing document hashes from ClickHouse (targetId=%s, appName=%s): %s',
+        args.targetId,
+        args.appName,
+        error instanceof Error ? 
error.message : String(error),
+      );
+      throw error;
+    }
+
+    const model = z.array(z.object({ hash: z.string() }));
+    const parsed = model.parse(result.data);
+    const hashes = parsed.map(row => row.hash);
+
+    this.logger.debug(
+      'found %d existing document hashes (targetId=%s, appName=%s)',
+      hashes.length,
+      args.targetId,
+      args.appName,
+    );
+
+    return hashes;
+  }
 }
 
 const appDeploymentFields = sql`
diff --git a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts
index a47aa777737..38303be9d57 100644
--- a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts
+++ b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts
@@ -1,12 +1,31 @@
+import { createHash } from 'crypto';
 import { buildSchema, DocumentNode, GraphQLError, Kind, parse, TypeInfo, validate } from 'graphql';
 import PromiseQueue from 'p-queue';
 import { z } from 'zod';
 import { collectSchemaCoordinates, preprocessOperation } from '@graphql-hive/core';
-import { buildOperationS3BucketKey } from '@hive/cdn-script/artifact-storage-reader';
+import {
+  buildOperationS3BucketKey,
+  buildOperationS3BucketKeyV2,
+} from '@hive/cdn-script/artifact-storage-reader';
 import { ServiceLogger } from '@hive/service-common';
 import { sql as c_sql, ClickHouse } from '../../operations/providers/clickhouse-client';
 import { S3Config } from '../../shared/providers/s3-config';
 
+const parseS3Concurrency = (): number => {
+  const value = process.env.S3_UPLOAD_CONCURRENCY;
+  if (!value) return 100;
+
+  const parsed = parseInt(value, 10);
+  if (isNaN(parsed) || parsed < 1) {
+    throw new Error(
+      `Invalid S3_UPLOAD_CONCURRENCY: "${value}". Must be a positive integer.`,
+    );
+  }
+  return parsed;
+};
+
+const S3_UPLOAD_CONCURRENCY = parseS3Concurrency();
+
 type DocumentRecord = {
   appDeploymentId: string;
   /** hash as provided by the user */
@@ -18,7 +37,28 @@
   schemaCoordinates: Array<string>;
 };
 
+export type ProcessingTiming = {
+  totalMs: number;
+  parsingMs: number;
+  validationMs: number;
+  coordinateExtractionMs: number;
+  clickhouseMs: number;
+  s3Ms: number;
+  documentsProcessed: number;
+};
+
+const AppDeploymentOperationHashModel = z
+  .string()
+  .trim()
+  .regex(
+    /^(sha256:)?[a-f0-9]{64}$/i,
+    'Hash must be a sha256 hash (64 hexadecimal characters, optionally prefixed with "sha256:"). 
' +
+      'This is required for safe cross-version document deduplication.',
+  );
+
+const AppDeploymentOperationHashModelLegacy = z
   .string()
   .trim()
   .min(1, 'Hash must be at least 1 characters long')
@@ -42,6 +82,7 @@
       version: string;
     };
     documents: ReadonlyArray<{ hash: string; body: string }>;
+    isV1Format?: boolean;
   };
 };
 
@@ -63,6 +104,7 @@
       }
     | {
        type: 'success';
+        timing: ProcessingTiming;
       };
 };
 
@@ -87,7 +129,7 @@ export type OnDocumentsPersistedCallback = (
 ) => Promise<void>;
 
 export class PersistedDocumentIngester {
-  private promiseQueue = new PromiseQueue({ concurrency: 30 });
+  private promiseQueue = new PromiseQueue({ concurrency: S3_UPLOAD_CONCURRENCY });
   private logger: ServiceLogger;
 
   constructor(
@@ -107,13 +149,23 @@
       data.documents.length,
     );
 
+    const timingStart = performance.now();
+    let parsingMs = 0;
+    let validationMs = 0;
+    let coordinateExtractionMs = 0;
+
     const schema = buildSchema(data.schemaSdl);
     const typeInfo = new TypeInfo(schema);
     const documents: Array<DocumentRecord> = [];
 
+    // Use different hash validation based on format (V1 allows any hash, V2 requires sha256)
+    const hashModel = data.isV1Format
+      ? AppDeploymentOperationHashModelLegacy
+      : AppDeploymentOperationHashModel;
+
     let index = 0;
     for (const operation of data.documents) {
-      const hashValidation = AppDeploymentOperationHashModel.safeParse(operation.hash);
+      const hashValidation = hashModel.safeParse(operation.hash);
       const bodyValidation = AppDeploymentOperationBodyModel.safeParse(operation.body);
 
       if (hashValidation.success === false || bodyValidation.success === false) {
@@ -127,7 +179,6 @@
         return {
           type: 'error' as const,
           error: {
-            // TODO: we should add more details (what hash is affected etc.)
             message: 'Invalid input, please check the operations.',
             details: {
               index,
@@ -139,13 +190,41 @@
           },
         };
       }
+
+      // For v2 format, verify hash matches content (sha256 of body)
+      if (!data.isV1Format) {
+        const computedHash = createHash('sha256').update(operation.body).digest('hex');
+        const providedHash = operation.hash.replace(/^sha256:/i, '').toLowerCase();
+
+        if (computedHash !== providedHash) {
+          this.logger.debug(
+            'Hash does not match document content. (targetId=%s, appDeploymentId=%s, operationIndex=%d)',
+            data.targetId,
+            data.appDeployment.id,
+            index,
+          );
+
+          return {
+            type: 'error' as const,
+            error: {
+              message: 'Hash does not match document content.',
+              details: {
+                index,
+                message: `Expected sha256: ${computedHash}, got: ${providedHash}`,
+              },
+            },
+          };
+        }
+      }
+
       let documentNode: DocumentNode;
+      const parseStart = performance.now();
       try {
         documentNode = parse(operation.body);
       } catch (err) {
         if (err instanceof GraphQLError) {
-          console.error(err);
           this.logger.debug(
+            { err },
             'Failed parsing GraphQL operation. (targetId=%s, appDeploymentId=%s, operationIndex=%d)',
             data.targetId,
             data.appDeployment.id,
@@ -165,9 +244,12 @@
         }
         throw err;
       }
+      parsingMs += performance.now() - parseStart;
 
+      const validateStart = performance.now();
       const errors = validate(schema, documentNode, undefined, {
         maxErrors: 1,
       });
+      validationMs += performance.now() - validateStart;
 
       if (errors.length > 0) {
         this.logger.debug(
@@ -206,6 +288,7 @@
       const operationName = operationNames[0] ?? 
null; + const coordsStart = performance.now(); const schemaCoordinates = collectSchemaCoordinates({ documentNode, processVariables: false, @@ -213,6 +296,7 @@ export class PersistedDocumentIngester { schema, typeInfo, }); + coordinateExtractionMs += performance.now() - coordsStart; const normalizedOperation = preprocessOperation({ document: documentNode, @@ -232,6 +316,9 @@ export class PersistedDocumentIngester { index++; } + let clickhouseMs = 0; + let s3Ms = 0; + if (documents.length) { this.logger.debug( 'inserting documents into clickhouse and s3. (targetId=%s, appDeployment=%s, documentCount=%d)', @@ -240,15 +327,29 @@ export class PersistedDocumentIngester { documents.length, ); - await this.insertDocuments({ + const timing = await this.insertDocuments({ targetId: data.targetId, appDeployment: data.appDeployment, documents: documents, + isV1Format: data.isV1Format, }); + clickhouseMs = timing.clickhouseMs; + s3Ms = timing.s3Ms; } + const totalMs = performance.now() - timingStart; + return { type: 'success' as const, + timing: { + totalMs: Math.round(totalMs), + parsingMs: Math.round(parsingMs), + validationMs: Math.round(validationMs), + coordinateExtractionMs: Math.round(coordinateExtractionMs), + clickhouseMs: Math.round(clickhouseMs), + s3Ms: Math.round(s3Ms), + documentsProcessed: documents.length, + }, }; } @@ -306,44 +407,70 @@ export class PersistedDocumentIngester { version: string; }; documents: Array; + isV1Format?: boolean; }) { this.logger.debug( - 'Inserting documents into S3. (targetId=%s, appDeployment=%s, documentCount=%d)', + 'Inserting documents into S3. (targetId=%s, appDeployment=%s, documentCount=%d, isV1Format=%s)', args.targetId, args.appDeployment.id, args.documents.length, + args.isV1Format ?? false, ); /** We parallelize and queue the requests. */ const tasks: Array> = []; for (const document of args.documents) { - const s3Key = buildOperationS3BucketKey( - args.targetId, - args.appDeployment.name, - args.appDeployment.version, - document.hash, - ); + const s3Key = args.isV1Format + ? buildOperationS3BucketKey( + args.targetId, + args.appDeployment.name, + args.appDeployment.version, + document.hash, + ) + : buildOperationS3BucketKeyV2(args.targetId, args.appDeployment.name, document.hash); tasks.push( this.promiseQueue.add(async () => { for (const s3 of this.s3) { - const response = await s3.client.fetch([s3.endpoint, s3.bucket, s3Key].join('/'), { - method: 'PUT', - headers: { - 'content-type': 'text/plain', - }, - body: document.body, - aws: { - // This boolean makes Google Cloud Storage & AWS happy. - signQuery: true, - }, - }); - - if (response.statusCode !== 200) { - throw new Error( - `Failed to upload operation to S3: [${response.statusCode}] ${response.statusMessage}`, + try { + const response = await s3.client.fetch([s3.endpoint, s3.bucket, s3Key].join('/'), { + method: 'PUT', + headers: { + 'content-type': 'text/plain', + }, + body: document.body, + aws: { + // This boolean makes Google Cloud Storage & AWS happy. + signQuery: true, + }, + }); + + if (response.statusCode !== 200) { + this.logger.error( + { statusCode: response.statusCode, statusMessage: response.statusMessage, s3Key }, + 'Failed to upload document to S3. 
(targetId=%s, appDeploymentId=%s, documentHash=%s, endpoint=%s)', + args.targetId, + args.appDeployment.id, + document.hash, + s3.endpoint, + ); + throw new Error( + `Failed to upload operation to S3: [${response.statusCode}] ${response.statusMessage} (key: ${s3Key})`, + ); + } + } catch (err) { + if (err instanceof Error && err.message.includes('Failed to upload operation')) { + throw err; + } + this.logger.error( + { err, s3Key }, + 'S3 upload failed unexpectedly. (targetId=%s, appDeploymentId=%s, documentHash=%s)', + args.targetId, + args.appDeployment.id, + document.hash, ); + throw err; } } }), @@ -375,13 +502,14 @@ export class PersistedDocumentIngester { args.appDeployment.id, docsForCache.length, ); - } catch (error) { + } catch (err) { // Don't fail the deployment for cache prefill failures - this.logger.warn( - { error }, - 'Cache prefill callback failed. (targetId=%s, appDeployment=%s)', + this.logger.error( + { err }, + 'Cache prefill callback failed. (targetId=%s, appDeployment=%s, documentCount=%d)', args.targetId, args.appDeployment.id, + docsForCache.length, ); } } @@ -396,12 +524,22 @@ export class PersistedDocumentIngester { version: string; }; documents: Array; - }) { - await Promise.all([ - // prettier-ignore - this.insertClickHouseDocuments(args), - this.insertS3Documents(args), + isV1Format?: boolean; + }): Promise<{ clickhouseMs: number; s3Ms: number }> { + const [clickhouseMs, s3Ms] = await Promise.all([ + (async () => { + const start = performance.now(); + await this.insertClickHouseDocuments(args); + return performance.now() - start; + })(), + (async () => { + const start = performance.now(); + await this.insertS3Documents({ ...args, isV1Format: args.isV1Format }); + return performance.now() - start; + })(), ]); + + return { clickhouseMs, s3Ms }; } } diff --git a/packages/services/api/src/modules/app-deployments/resolvers/Mutation/addDocumentsToAppDeployment.ts b/packages/services/api/src/modules/app-deployments/resolvers/Mutation/addDocumentsToAppDeployment.ts index e5d091d51d7..8423f1e18b9 100644 --- a/packages/services/api/src/modules/app-deployments/resolvers/Mutation/addDocumentsToAppDeployment.ts +++ b/packages/services/api/src/modules/app-deployments/resolvers/Mutation/addDocumentsToAppDeployment.ts @@ -11,6 +11,7 @@ export const addDocumentsToAppDeployment: NonNullable< version: input.appVersion, }, documents: input.documents, + isV1Format: input.format !== 'V2', }); if (result.type === 'error') { @@ -27,6 +28,7 @@ export const addDocumentsToAppDeployment: NonNullable< error: null, ok: { appDeployment: result.appDeployment, + timing: result.timing, }, }; }; diff --git a/packages/services/api/src/modules/app-deployments/resolvers/Target.ts b/packages/services/api/src/modules/app-deployments/resolvers/Target.ts index b8a5830738d..9af2f53dfe0 100644 --- a/packages/services/api/src/modules/app-deployments/resolvers/Target.ts +++ b/packages/services/api/src/modules/app-deployments/resolvers/Target.ts @@ -14,7 +14,11 @@ import type { TargetResolvers } from './../../../__generated__/types'; */ export const Target: Pick< TargetResolvers, - 'activeAppDeployments' | 'appDeployment' | 'appDeployments' | 'viewerCanViewAppDeployments' + | 'activeAppDeployments' + | 'appDeployment' + | 'appDeploymentDocumentHashes' + | 'appDeployments' + | 'viewerCanViewAppDeployments' > = { /* Implement Target resolver logic here */ appDeployment: async (target, args, { injector }) => { @@ -53,4 +57,28 @@ export const Target: Pick< }, }); }, + appDeploymentDocumentHashes: async 
(target, args, { injector }) => { + const result = await injector.get(AppDeploymentsManager).getExistingDocumentHashes({ + organizationId: target.orgId, + projectId: target.projectId, + targetId: target.id, + appName: args.appName, + }); + + if (result.type === 'error') { + return { + ok: null, + error: { + message: result.error.message, + }, + }; + } + + return { + ok: { + hashes: result.hashes, + }, + error: null, + }; + }, }; diff --git a/packages/services/cdn-worker/src/artifact-storage-reader.ts b/packages/services/cdn-worker/src/artifact-storage-reader.ts index 51bfcb64df6..83943771717 100644 --- a/packages/services/cdn-worker/src/artifact-storage-reader.ts +++ b/packages/services/cdn-worker/src/artifact-storage-reader.ts @@ -41,6 +41,18 @@ export function buildOperationS3BucketKey( return ['app', ...OperationS3BucketKeyModel.parse(args)].join('/'); } +const OperationS3BucketKeyV2Model = zod.tuple([ + zod.string().uuid(), + zod.string().min(1), + zod.string().min(1), +]); + +export function buildOperationS3BucketKeyV2( + ...args: [targetId: string, appName: string, hash: string] +) { + return ['app-v2', ...OperationS3BucketKeyV2Model.parse(args)].join('/'); +} + const AppDeploymentIsEnabledKeyModel = zod.tuple([ zod.string().uuid(), zod.string().min(1), @@ -360,25 +372,26 @@ export class ArtifactStorageReader { hash: string, etagValue: string | null, ) { - const key = buildOperationS3BucketKey(targetId, appName, appVersion, hash); - const headers: Record = {}; if (etagValue) { headers['if-none-match'] = etagValue; } - const response = await this.request({ - key, + // Try v2 key first + const keyV2 = buildOperationS3BucketKeyV2(targetId, appName, hash); + + const responseV2 = await this.request({ + key: keyV2, method: 'GET', headers, onAttempt: args => { if (args.result.type === 'error') { this.breadcrumb( - `Fetch attempt failed (source=${args.isMirror ? 'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${key}, message=${args.result.error.message})`, + `Fetch attempt failed (source=${args.isMirror ? 'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${keyV2}, message=${args.result.error.message})`, ); } else { this.breadcrumb( - `Fetch attempt succeeded (source=${args.isMirror ? 'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${key})`, + `Fetch attempt succeeded (source=${args.isMirror ? 'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${keyV2})`, ); } this.analytics?.track( @@ -396,24 +409,73 @@ export class ArtifactStorageReader { }, }); - if (etagValue && response.status === 304) { + if (etagValue && responseV2.status === 304) { return { type: 'notModified' } as const; } - if (response.status === 200) { - const body = await response.text(); + if (responseV2.status === 200) { + const body = await responseV2.text(); return { type: 'body', body, } as const; } - if (response.status === 404) { - return { type: 'notFound' } as const; + // Fallback to v1 key + if (responseV2.status === 404) { + const keyV1 = buildOperationS3BucketKey(targetId, appName, appVersion, hash); + + const responseV1 = await this.request({ + key: keyV1, + method: 'GET', + headers, + onAttempt: args => { + if (args.result.type === 'error') { + this.breadcrumb( + `Fetch attempt failed (source=${args.isMirror ? 
'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${keyV1}, message=${args.result.error.message})`, + ); + } else { + this.breadcrumb( + `Fetch attempt succeeded (source=${args.isMirror ? 'mirror' : 'primary'}, attempt=${args.attempt} duration=${args.duration}, result=${args.result.type}, key=${keyV1})`, + ); + } + this.analytics?.track( + { + type: args.isMirror ? 's3' : 'r2', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'GET persistedOperation', + duration: args.duration, + }, + targetId, + ); + }, + }); + + if (etagValue && responseV1.status === 304) { + return { type: 'notModified' } as const; + } + + if (responseV1.status === 200) { + const body = await responseV1.text(); + return { + type: 'body', + body, + } as const; + } + + if (responseV1.status === 404) { + return { type: 'notFound' } as const; + } + + const body = await responseV1.text(); + throw new Error(`GET request failed with status ${responseV1.status}: ${body}`); } - const body = await response.text(); - throw new Error(`HEAD request failed with status ${response.status}: ${body}`); + const body = await responseV2.text(); + throw new Error(`GET request failed with status ${responseV2.status}: ${body}`); } async readLegacyAccessKey(targetId: string) { From 4b475b9ad0a842d584f0f8cf79f7140783ef7752 Mon Sep 17 00:00:00 2001 From: Adam Benhassen Date: Tue, 27 Jan 2026 04:24:15 +0100 Subject: [PATCH 2/4] inject S3_UPLOAD_CONCURRENCY via env config --- .../providers/persisted-document-ingester.ts | 19 ++++--------------- .../worker/persisted-documents-worker.ts | 3 +++ packages/services/server/src/environment.ts | 2 ++ 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts index 38303be9d57..97a75abe331 100644 --- a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts +++ b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts @@ -11,20 +11,7 @@ import { ServiceLogger } from '@hive/service-common'; import { sql as c_sql, ClickHouse } from '../../operations/providers/clickhouse-client'; import { S3Config } from '../../shared/providers/s3-config'; -const parseS3Concurrency = (): number => { - const value = process.env.S3_UPLOAD_CONCURRENCY; - if (!value) return 100; - - const parsed = parseInt(value, 10); - if (isNaN(parsed) || parsed < 1) { - throw new Error( - `Invalid S3_UPLOAD_CONCURRENCY: "${value}". 
Must be a positive integer.`, - ); - } - return parsed; -}; - -const S3_UPLOAD_CONCURRENCY = parseS3Concurrency(); +const DEFAULT_S3_UPLOAD_CONCURRENCY = 100; type DocumentRecord = { appDeploymentId: string; @@ -129,7 +116,7 @@ export type OnDocumentsPersistedCallback = ( ) => Promise; export class PersistedDocumentIngester { - private promiseQueue = new PromiseQueue({ concurrency: S3_UPLOAD_CONCURRENCY }); + private promiseQueue: PromiseQueue; private logger: ServiceLogger; constructor( @@ -137,8 +124,10 @@ export class PersistedDocumentIngester { private s3: S3Config, logger: ServiceLogger, private onDocumentsPersisted?: OnDocumentsPersistedCallback, + s3UploadConcurrency: number = DEFAULT_S3_UPLOAD_CONCURRENCY, ) { this.logger = logger.child({ source: 'PersistedDocumentIngester' }); + this.promiseQueue = new PromiseQueue({ concurrency: s3UploadConcurrency }); } async processBatch(data: BatchProcessEvent['data']) { diff --git a/packages/services/api/src/modules/app-deployments/worker/persisted-documents-worker.ts b/packages/services/api/src/modules/app-deployments/worker/persisted-documents-worker.ts index 5ddd0f5ceeb..124c62495dd 100644 --- a/packages/services/api/src/modules/app-deployments/worker/persisted-documents-worker.ts +++ b/packages/services/api/src/modules/app-deployments/worker/persisted-documents-worker.ts @@ -26,6 +26,7 @@ export function createWorker( readonly secretAccessKey: string; readonly sessionToken: string | undefined; }; + readonly uploadConcurrency: number; }; s3Mirror: { readonly bucketName: string; @@ -81,6 +82,8 @@ export function createWorker( clickhouse, s3Config, logger as any, + undefined, + env.s3.uploadConcurrency, ); process.on('unhandledRejection', function (err) { diff --git a/packages/services/server/src/environment.ts b/packages/services/server/src/environment.ts index f3612b0f92f..e0fe748f012 100644 --- a/packages/services/server/src/environment.ts +++ b/packages/services/server/src/environment.ts @@ -174,6 +174,7 @@ const S3Model = zod.object({ S3_SECRET_ACCESS_KEY: zod.string(), S3_SESSION_TOKEN: emptyString(zod.string().optional()), S3_BUCKET_NAME: zod.string(), + S3_UPLOAD_CONCURRENCY: emptyString(NumberFromString.optional()), }); const S3MirrorModel = zod.union([ @@ -495,6 +496,7 @@ export const env = { secretAccessKey: s3.S3_SECRET_ACCESS_KEY, sessionToken: s3.S3_SESSION_TOKEN, }, + uploadConcurrency: s3.S3_UPLOAD_CONCURRENCY ?? 
100, }, s3Mirror: s3Mirror.S3_MIRROR === '1' From 8b9565282827cbed3ab99a4973b1e3ac6af3a49c Mon Sep 17 00:00:00 2001 From: Adam Benhassen Date: Tue, 27 Jan 2026 04:26:25 +0100 Subject: [PATCH 3/4] prettier changes --- .../tests/api/app-deployments.spec.ts | 13 ++++------ load-tests/app-deployments/README.md | 24 +++++++++---------- load-tests/app-deployments/run-benchmark.ts | 4 +--- .../libraries/cli/src/commands/app/create.ts | 3 ++- .../providers/app-deployments-manager.ts | 3 +-- .../providers/app-deployments.ts | 5 +--- .../providers/persisted-document-ingester.ts | 2 -- 7 files changed, 22 insertions(+), 32 deletions(-) diff --git a/integration-tests/tests/api/app-deployments.spec.ts b/integration-tests/tests/api/app-deployments.spec.ts index 6b7fdd3e6d7..3c1c480f795 100644 --- a/integration-tests/tests/api/app-deployments.spec.ts +++ b/integration-tests/tests/api/app-deployments.spec.ts @@ -5423,15 +5423,12 @@ test('v2 format accepts sha256 hash with sha256: prefix', async () => { }).then(res => res.expectNoGraphQLErrors()); // CDN access uses the hash with prefix - stored and looked up with the prefix - const response = await fetch( - `${cdnAccess.cdnUrl}/apps/my-app/1.0.0/${sha256HashWithPrefix}`, - { - method: 'GET', - headers: { - 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, - }, + const response = await fetch(`${cdnAccess.cdnUrl}/apps/my-app/1.0.0/${sha256HashWithPrefix}`, { + method: 'GET', + headers: { + 'X-Hive-CDN-Key': cdnAccess.secretAccessToken, }, - ); + }); expect(response.status).toBe(200); expect(await response.text()).toBe('query { hello }'); }); diff --git a/load-tests/app-deployments/README.md b/load-tests/app-deployments/README.md index b9653c3fa22..f9a05a209d9 100644 --- a/load-tests/app-deployments/README.md +++ b/load-tests/app-deployments/README.md @@ -16,21 +16,21 @@ DOC_COUNT=1000 npx tsx load-tests/app-deployments/run-benchmark.ts ### 1000 Documents -| Scenario | Docs Uploaded | Parse | Validate | Coords | ClickHouse | S3 | **Total** | -|----------|---------------|-------|----------|--------|------------|-----|-----------| -| V1 Initial | 1000 | 17ms | 401ms | 45ms | 1790ms | 8274ms | **9.1s** | -| V2 Initial | 1000 | 10ms | 276ms | 48ms | 936ms | 7040ms | **7.6s** | -| V2 Delta | 50 (950 skipped) | 1ms | 25ms | 5ms | 51ms | 323ms | **376ms** | +| Scenario | Docs Uploaded | Parse | Validate | Coords | ClickHouse | S3 | **Total** | +| ---------- | ---------------- | ----- | -------- | ------ | ---------- | ------ | --------- | +| V1 Initial | 1000 | 17ms | 401ms | 45ms | 1790ms | 8274ms | **9.1s** | +| V2 Initial | 1000 | 10ms | 276ms | 48ms | 936ms | 7040ms | **7.6s** | +| V2 Delta | 50 (950 skipped) | 1ms | 25ms | 5ms | 51ms | 323ms | **376ms** | ## Format Comparison -| Feature | V1 (Legacy) | V2 (SHA256) | -|---------|-------------|-------------| -| Hash format | Any alphanumeric | SHA256 only | -| Hash validation | Format only | Format + content match | -| S3 key | `app/{target}/{name}/{version}/{hash}` | `app-v2/{target}/{name}/{hash}` | -| Cross-version dedup | No | Yes | -| Delta uploads | No | Yes | +| Feature | V1 (Legacy) | V2 (SHA256) | +| ------------------- | -------------------------------------- | ------------------------------- | +| Hash format | Any alphanumeric | SHA256 only | +| Hash validation | Format only | Format + content match | +| S3 key | `app/{target}/{name}/{version}/{hash}` | `app-v2/{target}/{name}/{hash}` | +| Cross-version dedup | No | Yes | +| Delta uploads | No | Yes | ## Environment Variables diff --git 
a/load-tests/app-deployments/run-benchmark.ts b/load-tests/app-deployments/run-benchmark.ts index b48713b6eda..d9e5340a561 100644 --- a/load-tests/app-deployments/run-benchmark.ts +++ b/load-tests/app-deployments/run-benchmark.ts @@ -10,9 +10,7 @@ const ENDPOINT = process.env.REGISTRY_ENDPOINT ?? 'http://localhost:3001/graphql const TOKEN = process.env.REGISTRY_TOKEN ?? 'd43544cd1400e177c280afdce6876e7f'; const TARGET = process.env.TARGET ?? 'the-guild/hive/demo'; const TIMESTAMP = Math.floor(Date.now() / 1000); -const DOC_COUNTS = process.env.DOC_COUNT - ? [parseInt(process.env.DOC_COUNT, 10)] - : [1000]; +const DOC_COUNTS = process.env.DOC_COUNT ? [parseInt(process.env.DOC_COUNT, 10)] : [1000]; const operations = [ 'query GetUsers { users { id name } }', diff --git a/packages/libraries/cli/src/commands/app/create.ts b/packages/libraries/cli/src/commands/app/create.ts index 4c0c18b679b..6b543dba6f9 100644 --- a/packages/libraries/cli/src/commands/app/create.ts +++ b/packages/libraries/cli/src/commands/app/create.ts @@ -242,7 +242,8 @@ export default class AppCreate extends Command { appName: flags['name'], appVersion: flags['version'], documents: buffer, - format: flags.format === 'v1' ? AppDeploymentFormatType.V1 : AppDeploymentFormatType.V2, + format: + flags.format === 'v1' ? AppDeploymentFormatType.V1 : AppDeploymentFormatType.V2, }, }, }); diff --git a/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts b/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts index d92f76a64af..35c90aeddb9 100644 --- a/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts +++ b/packages/services/api/src/modules/app-deployments/providers/app-deployments-manager.ts @@ -285,8 +285,7 @@ export class AppDeploymentsManager { targetId: string; appName: string; }): Promise< - | { type: 'success'; hashes: string[] } - | { type: 'error'; error: { message: string } } + { type: 'success'; hashes: string[] } | { type: 'error'; error: { message: string } } > { await this.session.assertPerformAction({ action: 'appDeployment:create', diff --git a/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts b/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts index 64eaf28576f..31b155ed66b 100644 --- a/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts +++ b/packages/services/api/src/modules/app-deployments/providers/app-deployments.ts @@ -1553,10 +1553,7 @@ export class AppDeployments { }; } - async getExistingDocumentHashes(args: { - targetId: string; - appName: string; - }): Promise { + async getExistingDocumentHashes(args: { targetId: string; appName: string }): Promise { this.logger.debug( 'get existing document hashes (targetId=%s, appName=%s)', args.targetId, diff --git a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts index 97a75abe331..65332e4c7de 100644 --- a/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts +++ b/packages/services/api/src/modules/app-deployments/providers/persisted-document-ingester.ts @@ -34,7 +34,6 @@ export type ProcessingTiming = { documentsProcessed: number; }; - const AppDeploymentOperationHashModel = z .string() .trim() @@ -44,7 +43,6 @@ const AppDeploymentOperationHashModel = z 'This is required for safe cross-version document 
deduplication.',
  );
 
-
 const AppDeploymentOperationHashModelLegacy = z
   .string()
   .trim()

From 69c3012b7a3f4155c12d74c957774ed07b375362 Mon Sep 17 00:00:00 2001
From: Adam Benhassen
Date: Tue, 27 Jan 2026 04:28:53 +0100
Subject: [PATCH 4/4] add changeset

---
 .changeset/fast-dolphins-fly.md | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 .changeset/fast-dolphins-fly.md

diff --git a/.changeset/fast-dolphins-fly.md b/.changeset/fast-dolphins-fly.md
new file mode 100644
index 00000000000..24293958808
--- /dev/null
+++ b/.changeset/fast-dolphins-fly.md
@@ -0,0 +1,6 @@
+---
+'@graphql-hive/cli': minor
+'hive': minor
+---
+
+Add v2 storage format for app deployments with delta upload support. The v2 format uses SHA256 content-addressed hashes, enabling cross-version document deduplication: only new documents are uploaded and existing ones are skipped. This can reduce upload times by up to 24x for deployments whose documents are mostly unchanged.
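
For reviewers, the client-side delta flow this patch series enables looks roughly like the sketch below. It is illustrative only: the `targetSelector` shape, the auth header, and the inlined query text are assumptions that mirror the integration tests, and the patched `app:create` command remains the authoritative client.

```ts
// Sketch of a v2 delta upload (Node 18+, global fetch). Assumed, not part of
// this patch: the ENDPOINT/TOKEN wiring, the auth header shape, and the
// selector fields inside `targetSelector`.
import { createHash } from 'node:crypto';

const ENDPOINT = process.env.REGISTRY_ENDPOINT ?? 'http://localhost:3001/graphql';
const TOKEN = process.env.REGISTRY_TOKEN ?? '';

async function gql(query: string, variables: unknown): Promise<any> {
  const response = await fetch(ENDPOINT, {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: `Bearer ${TOKEN}`, // assumed header shape
    },
    body: JSON.stringify({ query, variables }),
  });
  const { data, errors } = (await response.json()) as {
    data?: any;
    errors?: Array<{ message: string }>;
  };
  if (errors?.length) throw new Error(errors[0].message);
  return data;
}

async function uploadDelta(args: {
  targetSelector: unknown; // org/project/target selector (assumed shape)
  appName: string;
  appVersion: string;
  bodies: string[];
}) {
  // A v2 hash is the sha256 hex digest of the exact document body; the server
  // recomputes it and rejects mismatches ("Hash does not match document content.").
  const documents = args.bodies.map(body => ({
    hash: createHash('sha256').update(body).digest('hex'),
    body,
  }));

  // 1. Ask the registry which content hashes it already stores for this app
  //    (across versions, thanks to the content-addressed app-v2/ S3 keys).
  const data = await gql(
    /* GraphQL */ `
      query ($targetSelector: TargetSelectorInput!, $appName: String!) {
        target(reference: { bySelector: $targetSelector }) {
          appDeploymentDocumentHashes(appName: $appName) {
            ok { hashes }
            error { message }
          }
        }
      }
    `,
    { targetSelector: args.targetSelector, appName: args.appName },
  );
  const known = new Set<string>(
    data?.target?.appDeploymentDocumentHashes?.ok?.hashes ?? [],
  );

  // 2. Upload only documents the registry has never seen.
  const delta = documents.filter(doc => !known.has(doc.hash));
  if (delta.length > 0) {
    await gql(
      /* GraphQL */ `
        mutation ($input: AddDocumentsToAppDeploymentInput!) {
          addDocumentsToAppDeployment(input: $input) {
            error { message details { index message } }
            ok { appDeployment { id status } }
          }
        }
      `,
      {
        input: {
          appName: args.appName,
          appVersion: args.appVersion,
          documents: delta,
          format: 'V2',
        },
      },
    );
  }
  return { uploaded: delta.length, skipped: documents.length - delta.length };
}
```

With 1000 documents of which 50 changed, this is the "V2 Delta" row in the benchmark table: 50 uploads instead of 1000, and the per-document work (parse, validate, coordinate extraction, ClickHouse, S3) runs only for the delta.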