diff --git a/apps/docs/content/3.adapters/1.overview.md b/apps/docs/content/3.adapters/1.overview.md index 1253186..928b657 100644 --- a/apps/docs/content/3.adapters/1.overview.md +++ b/apps/docs/content/3.adapters/1.overview.md @@ -92,6 +92,15 @@ export default defineNitroPlugin((nitroApp) => { --- Build your own adapter for any destination. ::: + + :::card + --- + icon: i-lucide-workflow + title: Pipeline + to: /adapters/pipeline + --- + Batch events, retry on failure, and handle buffer overflow. + ::: :: ## Multiple Destinations diff --git a/apps/docs/content/3.adapters/6.custom.md b/apps/docs/content/3.adapters/6.custom.md index 9569811..d12b1b7 100644 --- a/apps/docs/content/3.adapters/6.custom.md +++ b/apps/docs/content/3.adapters/6.custom.md @@ -220,41 +220,31 @@ export default defineNitroPlugin((nitroApp) => { ## Batching -For high-throughput scenarios, batch events before sending: +For high-throughput scenarios, use the [Drain Pipeline](/adapters/pipeline) to batch events, retry on failure, and handle buffer overflow automatically: ```typescript [server/plugins/evlog-drain.ts] -import type { WideEvent } from 'evlog' - -const batch: WideEvent[] = [] -const BATCH_SIZE = 100 -const FLUSH_INTERVAL = 5000 // 5 seconds - -async function flush() { - if (batch.length === 0) return - - const events = batch.splice(0, batch.length) - await fetch('https://api.example.com/logs/batch', { - method: 'POST', - body: JSON.stringify(events), - }) -} - -// Flush periodically -setInterval(flush, FLUSH_INTERVAL) +import type { DrainContext } from 'evlog' +import { createDrainPipeline } from 'evlog/pipeline' export default defineNitroPlugin((nitroApp) => { - nitroApp.hooks.hook('evlog:drain', async (ctx) => { - batch.push(ctx.event) + const pipeline = createDrainPipeline({ + batch: { size: 100, intervalMs: 5000 }, + }) - if (batch.length >= BATCH_SIZE) { - await flush() - } + const drain = pipeline(async (batch) => { + await fetch('https://api.example.com/logs/batch', { + 
method: 'POST', + body: JSON.stringify(batch.map(ctx => ctx.event)), + }) }) + + nitroApp.hooks.hook('evlog:drain', drain) + nitroApp.hooks.hook('close', () => drain.flush()) }) ``` -::callout{icon="i-lucide-alert-triangle" color="warning"} -**Note:** Batching in serverless environments (Vercel, Cloudflare Workers) requires careful handling since the runtime may terminate before the batch flushes. Consider using the platform's native batching or a queue service. +::callout{icon="i-lucide-arrow-right" color="info"} +See the [Pipeline documentation](/adapters/pipeline) for the full options reference, retry strategies, and buffer overflow handling. :: ## Error Handling Best Practices diff --git a/apps/docs/content/3.adapters/7.pipeline.md b/apps/docs/content/3.adapters/7.pipeline.md new file mode 100644 index 0000000..4e19205 --- /dev/null +++ b/apps/docs/content/3.adapters/7.pipeline.md @@ -0,0 +1,167 @@ +--- +title: Drain Pipeline +description: Batch events, retry on failure, and protect against buffer overflow with the shared drain pipeline. +navigation: + title: Pipeline + icon: i-lucide-workflow +links: + - label: Adapters Overview + icon: i-custom-plug + to: /adapters/overview + color: neutral + variant: subtle + - label: Custom Adapters + icon: i-lucide-code + to: /adapters/custom + color: neutral + variant: subtle +--- + +In production, sending one HTTP request per log event is wasteful. The drain pipeline buffers events and sends them in batches, retries on transient failures, and drops the oldest events when the buffer overflows. 
+ +## Quick Start + +```typescript [server/plugins/evlog-drain.ts] +import type { DrainContext } from 'evlog' +import { createDrainPipeline } from 'evlog/pipeline' +import { createAxiomDrain } from 'evlog/axiom' + +export default defineNitroPlugin((nitroApp) => { + const pipeline = createDrainPipeline() + const drain = pipeline(createAxiomDrain()) + + nitroApp.hooks.hook('evlog:drain', drain) + nitroApp.hooks.hook('close', () => drain.flush()) +}) +``` + +::callout{icon="i-lucide-alert-triangle" color="warning"} +Always call `drain.flush()` on server shutdown to ensure buffered events are sent before the process exits. +:: + +## How It Works + +1. Events are buffered in memory as they arrive via the `evlog:drain` hook +2. A batch is flushed when either the **batch size** is reached or the **interval** expires (whichever comes first) +3. If the drain function fails, the batch is retried with the configured **backoff strategy** +4. If all retries are exhausted, `onDropped` is called with the lost events +5. 
If the buffer exceeds `maxBufferSize`, the oldest events are dropped to prevent memory leaks + +## Configuration + +```typescript [server/plugins/evlog-drain.ts] +import type { DrainContext } from 'evlog' +import { createDrainPipeline } from 'evlog/pipeline' +import { createAxiomDrain } from 'evlog/axiom' + +export default defineNitroPlugin((nitroApp) => { + const pipeline = createDrainPipeline({ + batch: { + size: 50, // Flush every 50 events + intervalMs: 5000, // Or every 5 seconds, whichever comes first + }, + retry: { + maxAttempts: 3, + backoff: 'exponential', + initialDelayMs: 1000, + maxDelayMs: 30000, + }, + maxBufferSize: 1000, + onDropped: (events, error) => { + console.error(`[evlog] Dropped ${events.length} events:`, error?.message) + }, + }) + + const drain = pipeline(createAxiomDrain()) + + nitroApp.hooks.hook('evlog:drain', drain) + nitroApp.hooks.hook('close', () => drain.flush()) +}) +``` + +### Options Reference + +| Option | Default | Description | +|--------|---------|-------------| +| `batch.size` | `50` | Maximum events per batch | +| `batch.intervalMs` | `5000` | Max time (ms) before flushing a partial batch | +| `retry.maxAttempts` | `3` | Total attempts including the initial one | +| `retry.backoff` | `'exponential'` | `'exponential'` \| `'linear'` \| `'fixed'` | +| `retry.initialDelayMs` | `1000` | Base delay for the first retry | +| `retry.maxDelayMs` | `30000` | Upper bound for any retry delay | +| `maxBufferSize` | `1000` | Max buffered events before dropping oldest | +| `onDropped` | — | Callback when events are dropped (overflow or retry exhaustion) | + +## Backoff Strategies + +| Strategy | Delay Pattern | Use Case | +|----------|--------------|----------| +| `exponential` | 1s, 2s, 4s, 8s... | Default. Best for transient failures that may need time to recover | +| `linear` | 1s, 2s, 3s, 4s... | Predictable delay growth | +| `fixed` | 1s, 1s, 1s, 1s... | Same delay every time. 
Useful for rate-limited APIs |
+
+## Returned Drain Function
+
+The function returned by `pipeline(drain)` is hook-compatible and exposes:
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `drain(ctx)` | `(ctx: T) => void` | Push a single event into the buffer |
+| `drain.flush()` | `() => Promise<void>` | Force-flush all buffered events |
+| `drain.pending` | `number` | Number of events currently buffered |
+
+## Multiple Destinations
+
+Wrap multiple adapters with a single pipeline:
+
+```typescript [server/plugins/evlog-drain.ts]
+import type { DrainContext } from 'evlog'
+import { createDrainPipeline } from 'evlog/pipeline'
+import { createAxiomDrain } from 'evlog/axiom'
+import { createOTLPDrain } from 'evlog/otlp'
+
+export default defineNitroPlugin((nitroApp) => {
+  const axiom = createAxiomDrain()
+  const otlp = createOTLPDrain()
+
+  const pipeline = createDrainPipeline<DrainContext>()
+  const drain = pipeline(async (batch) => {
+    await Promise.allSettled([axiom(batch), otlp(batch)])
+  })
+
+  nitroApp.hooks.hook('evlog:drain', drain)
+  nitroApp.hooks.hook('close', () => drain.flush())
+})
+```
+
+## Custom Drain Function
+
+You don't need an adapter — pass any async function that accepts a batch:
+
+```typescript [server/plugins/evlog-drain.ts]
+import type { DrainContext } from 'evlog'
+import { createDrainPipeline } from 'evlog/pipeline'
+
+export default defineNitroPlugin((nitroApp) => {
+  const pipeline = createDrainPipeline<DrainContext>({
+    batch: { size: 100 },
+  })
+
+  const drain = pipeline(async (batch) => {
+    await fetch('https://your-service.com/logs', {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json' },
+      body: JSON.stringify(batch.map(ctx => ctx.event)),
+    })
+  })
+
+  nitroApp.hooks.hook('evlog:drain', drain)
+  nitroApp.hooks.hook('close', () => drain.flush())
+})
+```
+
+## Next Steps
+
+- [Adapters Overview](/adapters/overview) - Available built-in adapters
+- [Custom Adapters](/adapters/custom) - Build your own drain function
+- 
[Best Practices](/core-concepts/best-practices) - Security and production tips diff --git a/apps/playground/app/config/tests.config.ts b/apps/playground/app/config/tests.config.ts index e28a451..28103af 100644 --- a/apps/playground/app/config/tests.config.ts +++ b/apps/playground/app/config/tests.config.ts @@ -265,6 +265,44 @@ export const testConfig = { }, ], } as TestSection, + { + id: 'pipeline', + label: 'Pipeline', + icon: 'i-lucide-layers', + title: 'Drain Pipeline (Batching + Retry)', + description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for batched drain output.', + layout: 'cards', + tests: [ + { + id: 'pipeline-single', + label: '1 Request', + description: 'Single event - buffered until batch size (5) or interval (2s) is reached', + endpoint: '/api/test/success', + method: 'GET', + badge: { + label: 'Buffered', + color: 'blue', + }, + toastOnSuccess: { + title: 'Event buffered', + description: 'Check terminal - will flush after 2s or when 5 events accumulate', + }, + }, + { + id: 'pipeline-batch', + label: 'Fire 10 Requests', + description: 'Fires 10 requests in parallel - should produce 2 batches of 5 events', + badge: { + label: '2 batches', + color: 'green', + }, + toastOnSuccess: { + title: '10 requests sent', + description: 'Check terminal - should see 2 batches of 5 events', + }, + }, + ], + } as TestSection, { id: 'drains', label: 'Drains', diff --git a/apps/playground/app/pages/index.vue b/apps/playground/app/pages/index.vue index d62c6c9..21f02a3 100644 --- a/apps/playground/app/pages/index.vue +++ b/apps/playground/app/pages/index.vue @@ -39,6 +39,12 @@ async function handleBatchRequest() { ) } +async function handlePipelineBatch() { + await Promise.all( + Array.from({ length: 10 }, () => $fetch('/api/test/success')), + ) +} + // Get custom onClick for specific tests function getOnClick(testId: string) { if (testId === 'structured-error-toast') { @@ -47,6 +53,9 @@ function getOnClick(testId: string) 
{ if (testId === 'tail-fast-batch') { return handleBatchRequest } + if (testId === 'pipeline-batch') { + return handlePipelineBatch + } return undefined } diff --git a/packages/evlog/README.md b/packages/evlog/README.md index c1ebcb2..ab34b4f 100644 --- a/packages/evlog/README.md +++ b/packages/evlog/README.md @@ -524,6 +524,63 @@ export default defineNitroPlugin((nitroApp) => { > See the [full documentation](https://evlog.hrcd.fr/adapters/overview) for adapter configuration options, troubleshooting, and advanced patterns. +## Drain Pipeline + +For production use, wrap your drain adapter with `createDrainPipeline` to get **batching**, **retry with backoff**, and **buffer overflow protection**. + +Without a pipeline, each event triggers a separate network call. The pipeline buffers events and sends them in batches, reducing overhead and handling transient failures automatically. + +```typescript +// server/plugins/evlog-drain.ts +import type { DrainContext } from 'evlog' +import { createDrainPipeline } from 'evlog/pipeline' +import { createAxiomDrain } from 'evlog/axiom' + +export default defineNitroPlugin((nitroApp) => { + const pipeline = createDrainPipeline({ + batch: { size: 50, intervalMs: 5000 }, + retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 1000 }, + onDropped: (events, error) => { + console.error(`[evlog] Dropped ${events.length} events:`, error?.message) + }, + }) + + const drain = pipeline(createAxiomDrain()) + + nitroApp.hooks.hook('evlog:drain', drain) + nitroApp.hooks.hook('close', () => drain.flush()) +}) +``` + +### How it works + +1. Events are buffered in memory as they arrive +2. A batch is flushed when either the **batch size** is reached or the **interval** expires (whichever comes first) +3. If the drain function fails, the batch is retried with the configured **backoff strategy** +4. If all retries are exhausted, `onDropped` is called with the lost events +5. 
If the buffer exceeds `maxBufferSize`, the oldest events are dropped to prevent memory leaks
+
+### Options
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `batch.size` | `50` | Maximum events per batch |
+| `batch.intervalMs` | `5000` | Max time (ms) before flushing a partial batch |
+| `retry.maxAttempts` | `3` | Total attempts (including first) |
+| `retry.backoff` | `'exponential'` | `'exponential'` \| `'linear'` \| `'fixed'` |
+| `retry.initialDelayMs` | `1000` | Base delay for first retry |
+| `retry.maxDelayMs` | `30000` | Upper bound for any retry delay |
+| `maxBufferSize` | `1000` | Max buffered events before dropping oldest |
+| `onDropped` | — | Callback when events are dropped |
+
+### Returned drain function
+
+The function returned by `pipeline(drain)` is hook-compatible and exposes:
+
+- **`drain(ctx)`** — Push a single event into the buffer
+- **`drain.flush()`** — Force-flush all buffered events (call on server shutdown)
+- **`drain.pending`** — Number of events currently buffered
+
 ## API Reference
 
 ### `initLogger(config)`
diff --git a/packages/evlog/package.json b/packages/evlog/package.json
index c5dcfef..5cb2ccb 100644
--- a/packages/evlog/package.json
+++ b/packages/evlog/package.json
@@ -58,6 +58,10 @@
     "./enrichers": {
       "types": "./dist/enrichers.d.mts",
       "import": "./dist/enrichers.mjs"
+    },
+    "./pipeline": {
+      "types": "./dist/pipeline.d.mts",
+      "import": "./dist/pipeline.mjs"
     }
   },
   "main": "./dist/index.mjs",
@@ -90,6 +94,9 @@
       ],
       "enrichers": [
         "./dist/enrichers.d.mts"
+      ],
+      "pipeline": [
+        "./dist/pipeline.d.mts"
       ]
     }
   },
diff --git a/packages/evlog/src/adapters/axiom.ts b/packages/evlog/src/adapters/axiom.ts
index 495e041..950548d 100644
--- a/packages/evlog/src/adapters/axiom.ts
+++ b/packages/evlog/src/adapters/axiom.ts
@@ -34,8 +34,11 @@ export interface AxiomConfig {
  * }))
  * ```
  */
-export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext) => Promise<void> {
-  return async (ctx: DrainContext) => {
+export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
+  return async (ctx: DrainContext | DrainContext[]) => {
+    const contexts = Array.isArray(ctx) ? ctx : [ctx]
+    if (contexts.length === 0) return
+
     const runtimeConfig = getRuntimeConfig()
     // Support both runtimeConfig.evlog.axiom and runtimeConfig.axiom
     const evlogAxiom = runtimeConfig?.evlog?.axiom
@@ -56,9 +59,9 @@ export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainC
     }
 
     try {
-      await sendToAxiom(ctx.event, config as AxiomConfig)
+      await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
     } catch (error) {
-      console.error('[evlog/axiom] Failed to send event:', error)
+      console.error('[evlog/axiom] Failed to send events to Axiom:', error)
     }
   }
 }
diff --git a/packages/evlog/src/adapters/otlp.ts b/packages/evlog/src/adapters/otlp.ts
index b9849b2..0781e7d 100644
--- a/packages/evlog/src/adapters/otlp.ts
+++ b/packages/evlog/src/adapters/otlp.ts
@@ -215,8 +215,11 @@ function buildResourceAttributes(
  * }))
  * ```
  */
-export function createOTLPDrain(overrides?: Partial<OTLPConfig>): (ctx: DrainContext) => Promise<void> {
-  return async (ctx: DrainContext) => {
+export function createOTLPDrain(overrides?: Partial<OTLPConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
+  return async (ctx: DrainContext | DrainContext[]) => {
+    const contexts = Array.isArray(ctx) ? ctx : [ctx]
+    if (contexts.length === 0) return
+
     const runtimeConfig = getRuntimeConfig()
     // Support both runtimeConfig.evlog.otlp and runtimeConfig.otlp
     const evlogOtlp = runtimeConfig?.evlog?.otlp
@@ -269,9 +272,9 @@ export function createOTLPDrain(overrides?: Partial<OTLPConfig>): (ctx: DrainCon
     }
 
     try {
-      await sendToOTLP(ctx.event, config as OTLPConfig)
+      await sendBatchToOTLP(contexts.map(c => c.event), config as OTLPConfig)
     } catch (error) {
-      console.error('[evlog/otlp] Failed to send event:', error)
+      console.error('[evlog/otlp] Failed to send events to OTLP:', error)
     }
   }
 }
diff --git a/packages/evlog/src/adapters/posthog.ts b/packages/evlog/src/adapters/posthog.ts
index 2966232..32a87d6 100644
--- a/packages/evlog/src/adapters/posthog.ts
+++ b/packages/evlog/src/adapters/posthog.ts
@@ -61,8 +61,11 @@ export function toPostHogEvent(event: WideEvent, config: PostHogConfig): PostHog
  * }))
  * ```
  */
-export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext) => Promise<void> {
-  return async (ctx: DrainContext) => {
+export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
+  return async (ctx: DrainContext | DrainContext[]) => {
+    const contexts = Array.isArray(ctx) ? ctx : [ctx]
+    if (contexts.length === 0) return
+
     const runtimeConfig = getRuntimeConfig()
     // Support both runtimeConfig.evlog.posthog and runtimeConfig.posthog
     const evlogPostHog = runtimeConfig?.evlog?.posthog
@@ -83,9 +86,9 @@ export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: Dr
     }
 
     try {
-      await sendToPostHog(ctx.event, config as PostHogConfig)
+      await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
     } catch (error) {
-      console.error('[evlog/posthog] Failed to send event:', error)
+      console.error('[evlog/posthog] Failed to send events to PostHog:', error)
     }
   }
 }
diff --git a/packages/evlog/src/adapters/sentry.ts b/packages/evlog/src/adapters/sentry.ts
index 45f765b..dd99c3f 100644
--- a/packages/evlog/src/adapters/sentry.ts
+++ b/packages/evlog/src/adapters/sentry.ts
@@ -214,8 +214,11 @@ function buildEnvelopeBody(logs: SentryLog[], dsn: string): string {
  * }))
  * ```
  */
-export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext) => Promise<void> {
-  return async (ctx: DrainContext) => {
+export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
+  return async (ctx: DrainContext | DrainContext[]) => {
+    const contexts = Array.isArray(ctx) ? ctx : [ctx]
+    if (contexts.length === 0) return
+
     const runtimeConfig = getRuntimeConfig()
     const evlogSentry = runtimeConfig?.evlog?.sentry
     const rootSentry = runtimeConfig?.sentry
@@ -234,9 +237,9 @@ export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: Drai
     }
 
     try {
-      await sendToSentry(ctx.event, config as SentryConfig)
+      await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
     } catch (error) {
-      console.error('[evlog/sentry] Failed to send log:', error)
+      console.error('[evlog/sentry] Failed to send events to Sentry:', error)
     }
   }
 }
diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts
new file mode 100644
index 0000000..b0c839c
--- /dev/null
+++ b/packages/evlog/src/pipeline.ts
@@ -0,0 +1,191 @@
+export interface DrainPipelineOptions<T> {
+  batch?: {
+    /** Maximum number of events per batch sent to the drain function. @default 50 */
+    size?: number
+    /** Maximum time (ms) an event can stay buffered before a flush is triggered, even if the batch is not full. @default 5000 */
+    intervalMs?: number
+  }
+  retry?: {
+    /** Total number of attempts (including the initial one) before dropping the batch. @default 3 */
+    maxAttempts?: number
+    /** Delay strategy between retry attempts. @default 'exponential' */
+    backoff?: 'exponential' | 'linear' | 'fixed'
+    /** Base delay (ms) for the first retry. Scaled by the backoff strategy on subsequent retries. @default 1000 */
+    initialDelayMs?: number
+    /** Upper bound (ms) for any single retry delay. @default 30000 */
+    maxDelayMs?: number
+  }
+  /** Maximum number of events held in the buffer. When exceeded, the oldest event is dropped. @default 1000 */
+  maxBufferSize?: number
+  /** Called when a batch is dropped after all retry attempts are exhausted, or when the buffer overflows. */
+  onDropped?: (events: T[], error?: Error) => void
+}
+
+export interface PipelineDrainFn<T> {
+  (ctx: T): void
+  /** Flush all buffered events. Call on server shutdown. */
+  flush: () => Promise<void>
+  readonly pending: number
+}
+
+/**
+ * Create a drain pipeline that batches events, retries on failure, and manages buffer overflow.
+ *
+ * Returns a higher-order function: pass your drain adapter to get a hook-compatible function.
+ *
+ * @example
+ * ```ts
+ * const pipeline = createDrainPipeline({ batch: { size: 50 } })
+ * const drain = pipeline(async (batch) => {
+ *   await sendToBackend(batch)
+ * })
+ *
+ * // Use as a hook
+ * nitroApp.hooks.hook('evlog:drain', drain)
+ *
+ * // Flush on shutdown
+ * nitroApp.hooks.hook('close', () => drain.flush())
+ * ```
+ */
+export function createDrainPipeline<T>(options?: DrainPipelineOptions<T>): (drain: (batch: T[]) => void | Promise<void>) => PipelineDrainFn<T> {
+  const batchSize = options?.batch?.size ?? 50
+  const intervalMs = options?.batch?.intervalMs ?? 5000
+  const maxBufferSize = options?.maxBufferSize ?? 1000
+  const maxAttempts = options?.retry?.maxAttempts ?? 3
+  const backoffStrategy = options?.retry?.backoff ?? 'exponential'
+  const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
+  const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
+  const onDropped = options?.onDropped
+
+  if (batchSize <= 0 || !Number.isFinite(batchSize)) {
+    throw new Error(`[evlog/pipeline] batch.size must be a positive finite number, got: ${batchSize}`)
+  }
+  if (intervalMs <= 0 || !Number.isFinite(intervalMs)) {
+    throw new Error(`[evlog/pipeline] batch.intervalMs must be a positive finite number, got: ${intervalMs}`)
+  }
+  if (maxBufferSize <= 0 || !Number.isFinite(maxBufferSize)) {
+    throw new Error(`[evlog/pipeline] maxBufferSize must be a positive finite number, got: ${maxBufferSize}`)
+  }
+  if (maxAttempts <= 0 || !Number.isFinite(maxAttempts)) {
+    throw new Error(`[evlog/pipeline] retry.maxAttempts must be a positive finite number, got: ${maxAttempts}`)
+  }
+  if (initialDelayMs < 0 || !Number.isFinite(initialDelayMs)) {
+    throw new Error(`[evlog/pipeline] retry.initialDelayMs must be a non-negative finite number, got: ${initialDelayMs}`)
+  }
+  if (maxDelayMs < 0 || !Number.isFinite(maxDelayMs)) {
+    throw new Error(`[evlog/pipeline] retry.maxDelayMs must be a non-negative finite number, got: ${maxDelayMs}`)
+  }
+
+  return (drain: (batch: T[]) => void | Promise<void>): PipelineDrainFn<T> => {
+    const buffer: T[] = []
+    let timer: ReturnType<typeof setTimeout> | null = null
+    let activeFlush: Promise<void> | null = null
+
+    function clearTimer(): void {
+      if (timer !== null) {
+        clearTimeout(timer)
+        timer = null
+      }
+    }
+
+    function scheduleFlush(): void {
+      if (timer !== null || activeFlush) return
+      timer = setTimeout(() => {
+        timer = null
+        if (!activeFlush) startFlush()
+      }, intervalMs)
+    }
+
+    function getRetryDelay(attempt: number): number {
+      let delay: number
+      switch (backoffStrategy) {
+        case 'linear':
+          delay = initialDelayMs * attempt
+          break
+        case 'fixed':
+          delay = initialDelayMs
+          break
+        case 'exponential':
+        default:
+          delay = initialDelayMs * 2 ** (attempt - 1)
+          break
+      }
+      return Math.min(delay, maxDelayMs)
+    }
+
+    async function sendWithRetry(batch: T[]): Promise<void> {
+      let lastError: Error | undefined
+      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
+        try {
+          await drain(batch)
+          return
+        } catch (error) {
+          lastError = error instanceof Error ? error : new Error(String(error))
+          if (attempt < maxAttempts) {
+            await new Promise(r => setTimeout(r, getRetryDelay(attempt)))
+          }
+        }
+      }
+      onDropped?.(batch, lastError)
+    }
+
+    async function drainBuffer(): Promise<void> {
+      while (buffer.length > 0) {
+        const batch = buffer.splice(0, batchSize)
+        await sendWithRetry(batch)
+      }
+    }
+
+    function startFlush(): void {
+      if (activeFlush) return
+      activeFlush = drainBuffer().finally(() => {
+        activeFlush = null
+        if (buffer.length >= batchSize) {
+          startFlush()
+        } else if (buffer.length > 0) {
+          scheduleFlush()
+        }
+      })
+    }
+
+    function push(ctx: T): void {
+      if (buffer.length >= maxBufferSize) {
+        const dropped = buffer.splice(0, 1)
+        onDropped?.(dropped)
+      }
+      buffer.push(ctx)
+
+      if (buffer.length >= batchSize) {
+        clearTimer()
+        startFlush()
+      } else if (!activeFlush) {
+        scheduleFlush()
+      }
+    }
+
+    async function flush(): Promise<void> {
+      clearTimer()
+      if (activeFlush) {
+        await activeFlush
+      }
+      // Snapshot the buffer length to avoid infinite loop if push() is called during flush
+      const snapshot = buffer.length
+      if (snapshot > 0) {
+        const toFlush = buffer.splice(0, snapshot)
+        while (toFlush.length > 0) {
+          const batch = toFlush.splice(0, batchSize)
+          await sendWithRetry(batch)
+        }
+      }
+    }
+
+    const hookFn = push as PipelineDrainFn<T>
+    hookFn.flush = flush
+    Object.defineProperty(hookFn, 'pending', {
+      get: () => buffer.length,
+      enumerable: true,
+    })
+
+    return hookFn
+  }
+}
diff --git a/packages/evlog/test/pipeline.test.ts b/packages/evlog/test/pipeline.test.ts
new file mode 100644
index 0000000..892f8e9
--- /dev/null
+++ b/packages/evlog/test/pipeline.test.ts
@@ -0,0 +1,386 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
+import type { DrainContext } from '../src/types'
+import { createDrainPipeline } from '../src/pipeline'
+
+function 
createTestContext(id: number): DrainContext { + return { + event: { + timestamp: '2024-01-01T00:00:00.000Z', + level: 'info', + service: 'test', + environment: 'test', + id, + }, + } +} + +describe('createDrainPipeline', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + describe('batching by size', () => { + it('does not flush before batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).not.toHaveBeenCalled() + + await hook.flush() + }) + + it('flushes when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + + await vi.runAllTimersAsync() + + expect(drain).toHaveBeenCalledTimes(1) + const batch = drain.mock.calls[0]![0] as DrainContext[] + expect(batch).toHaveLength(3) + expect(batch[0]!.event.id).toBe(1) + expect(batch[1]!.event.id).toBe(2) + expect(batch[2]!.event.id).toBe(3) + }) + + it('splits into multiple batches', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3 } })(drain) + + for (let i = 1; i <= 7; i++) hook(createTestContext(i)) + + await hook.flush() + + expect(drain).toHaveBeenCalledTimes(3) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[1]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[2]![0] as DrainContext[])).toHaveLength(1) + }) + }) + + describe('batching by interval', () => { + it('flushes after interval expires', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = 
createDrainPipeline({ batch: { size: 100, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(4999) + expect(drain).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(1) + expect(drain).toHaveBeenCalledTimes(1) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(2) + }) + + it('resets interval when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + // Batch size reached, should flush immediately + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Push one more - should start interval timer, not flush immediately + hook(createTestContext(3)) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for interval + await vi.advanceTimersByTimeAsync(5000) + expect(drain).toHaveBeenCalledTimes(2) + }) + }) + + describe('retry', () => { + it('retries on failure with exponential backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail 1')) + .mockRejectedValueOnce(new Error('fail 2')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 100, maxDelayMs: 10000 }, + })(drain) + + hook(createTestContext(1)) + + // First attempt (immediate) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for first retry delay (100ms * 2^0 = 100ms) + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + + // Wait for second retry delay (100ms * 2^1 = 200ms) + await vi.advanceTimersByTimeAsync(200) + expect(drain).toHaveBeenCalledTimes(3) + }) + + it('retries with linear backoff', async () => { + const drain = vi.fn() + 
+      .mockRejectedValueOnce(new Error('fail'))
+      .mockRejectedValueOnce(new Error('fail'))
+      .mockResolvedValueOnce(undefined)
+
+    const hook = createDrainPipeline({
+      batch: { size: 1 },
+      retry: { maxAttempts: 3, backoff: 'linear', initialDelayMs: 100 },
+    })(drain)
+
+    hook(createTestContext(1))
+
+    await vi.advanceTimersByTimeAsync(0)
+    expect(drain).toHaveBeenCalledTimes(1)
+
+    // Linear: 100 * 1 = 100ms
+    await vi.advanceTimersByTimeAsync(100)
+    expect(drain).toHaveBeenCalledTimes(2)
+
+    // Linear: 100 * 2 = 200ms
+    await vi.advanceTimersByTimeAsync(200)
+    expect(drain).toHaveBeenCalledTimes(3)
+  })
+
+  it('retries with fixed backoff', async () => {
+    const drain = vi.fn()
+      .mockRejectedValueOnce(new Error('fail'))
+      .mockResolvedValueOnce(undefined)
+
+    const hook = createDrainPipeline({
+      batch: { size: 1 },
+      retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 100 },
+    })(drain)
+
+    hook(createTestContext(1))
+
+    await vi.advanceTimersByTimeAsync(0)
+    expect(drain).toHaveBeenCalledTimes(1)
+
+    // Fixed: always 100ms
+    await vi.advanceTimersByTimeAsync(100)
+    expect(drain).toHaveBeenCalledTimes(2)
+  })
+
+  it('caps retry delay at maxDelayMs', async () => {
+    const drain = vi.fn()
+      .mockRejectedValueOnce(new Error('fail'))
+      .mockResolvedValueOnce(undefined)
+
+    const hook = createDrainPipeline({
+      batch: { size: 1 },
+      retry: { maxAttempts: 2, backoff: 'exponential', initialDelayMs: 1000, maxDelayMs: 500 },
+    })(drain)
+
+    hook(createTestContext(1))
+
+    await vi.advanceTimersByTimeAsync(0)
+    expect(drain).toHaveBeenCalledTimes(1)
+
+    // Exponential: 1000 * 2^0 = 1000ms, capped to 500ms
+    await vi.advanceTimersByTimeAsync(500)
+    expect(drain).toHaveBeenCalledTimes(2)
+  })
+
+  it('calls onDropped after all retries exhausted', async () => {
+    const drain = vi.fn().mockRejectedValue(new Error('permanent failure'))
+    const onDropped = vi.fn()
+
+    const hook = createDrainPipeline({
+      batch: { size: 1 },
+      retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 50 },
+      onDropped,
+    })(drain)
+
+    hook(createTestContext(1))
+
+    await vi.runAllTimersAsync()
+
+    expect(drain).toHaveBeenCalledTimes(2)
+    expect(onDropped).toHaveBeenCalledTimes(1)
+    expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1)
+    expect(onDropped.mock.calls[0]![1]).toBeInstanceOf(Error)
+    expect((onDropped.mock.calls[0]![1] as Error).message).toBe('permanent failure')
+  })
+})
+
+describe('buffer overflow', () => {
+  it('drops oldest events and calls onDropped when buffer is full', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const onDropped = vi.fn()
+
+    const hook = createDrainPipeline({
+      batch: { size: 100, intervalMs: 60000 },
+      maxBufferSize: 3,
+      onDropped,
+    })(drain)
+
+    hook(createTestContext(1))
+    hook(createTestContext(2))
+    hook(createTestContext(3))
+    expect(onDropped).not.toHaveBeenCalled()
+
+    // Buffer full - should drop oldest
+    hook(createTestContext(4))
+    expect(onDropped).toHaveBeenCalledTimes(1)
+    expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1)
+    expect((onDropped.mock.calls[0]![0] as DrainContext[])[0]!.event.id).toBe(1)
+
+    expect(hook.pending).toBe(3)
+
+    // Flush and verify the remaining events are 2, 3, 4
+    await hook.flush()
+    const batch = drain.mock.calls[0]![0] as DrainContext[]
+    expect(batch.map(c => c.event.id)).toEqual([2, 3, 4])
+  })
+})
+
+describe('flush()', () => {
+  it('drains all buffered events', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline({ batch: { size: 100 } })(drain)
+
+    hook(createTestContext(1))
+    hook(createTestContext(2))
+    hook(createTestContext(3))
+
+    await hook.flush()
+
+    expect(drain).toHaveBeenCalledTimes(1)
+    expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3)
+    expect(hook.pending).toBe(0)
+  })
+
+  it('is safe to call when buffer is empty', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline()(drain)
+
+    await hook.flush()
+
+    expect(drain).not.toHaveBeenCalled()
+  })
+
+  it('handles concurrent flush() calls safely', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 60000 } })(drain)
+
+    hook(createTestContext(1))
+    hook(createTestContext(2))
+
+    const [r1, r2] = await Promise.all([hook.flush(), hook.flush()])
+
+    expect(r1).toBeUndefined()
+    expect(r2).toBeUndefined()
+    expect(drain).toHaveBeenCalledTimes(1)
+    expect(hook.pending).toBe(0)
+  })
+
+  it('flush drains events that arrived during active flush', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 60000 } })(drain)
+
+    // Push 2 events to trigger auto-flush
+    hook(createTestContext(1))
+    hook(createTestContext(2))
+
+    // Push more while flush may be in progress
+    hook(createTestContext(3))
+
+    // Explicit flush should drain everything
+    await hook.flush()
+
+    expect(hook.pending).toBe(0)
+    const allEvents = drain.mock.calls.flatMap(call => call[0] as DrainContext[])
+    expect(allEvents).toHaveLength(3)
+  })
+})
+
+describe('pending', () => {
+  it('returns current buffer size', () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 60000 } })(drain)
+
+    expect(hook.pending).toBe(0)
+    hook(createTestContext(1))
+    expect(hook.pending).toBe(1)
+    hook(createTestContext(2))
+    expect(hook.pending).toBe(2)
+  })
+
+  it('returns 0 after flush', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline({ batch: { size: 100 } })(drain)
+
+    hook(createTestContext(1))
+    hook(createTestContext(2))
+    expect(hook.pending).toBe(2)
+
+    await hook.flush()
+    expect(hook.pending).toBe(0)
+  })
+})
+
+describe('defaults', () => {
+  it('uses default options when none provided', async () => {
+    const drain = vi.fn().mockResolvedValue(undefined)
+    const hook = createDrainPipeline()(drain)
+
+    hook(createTestContext(1))
+
+    // Default interval is 5000ms
+    await vi.advanceTimersByTimeAsync(4999)
+    expect(drain).not.toHaveBeenCalled()
+
+    await vi.advanceTimersByTimeAsync(1)
+    expect(drain).toHaveBeenCalledTimes(1)
+  })
+})
+
+describe('input validation', () => {
+  it('throws on batch.size <= 0', () => {
+    expect(() => createDrainPipeline({ batch: { size: 0 } })).toThrow('batch.size must be a positive finite number')
+  })
+
+  it('throws on batch.size = -1', () => {
+    expect(() => createDrainPipeline({ batch: { size: -1 } })).toThrow('batch.size must be a positive finite number')
+  })
+
+  it('throws on batch.intervalMs <= 0', () => {
+    expect(() => createDrainPipeline({ batch: { intervalMs: 0 } })).toThrow('batch.intervalMs must be a positive finite number')
+  })
+
+  it('throws on maxBufferSize <= 0', () => {
+    expect(() => createDrainPipeline({ maxBufferSize: 0 })).toThrow('maxBufferSize must be a positive finite number')
+  })
+
+  it('throws on retry.maxAttempts <= 0', () => {
+    expect(() => createDrainPipeline({ retry: { maxAttempts: 0 } })).toThrow('retry.maxAttempts must be a positive finite number')
+  })
+
+  it('throws on non-finite batch.size', () => {
+    expect(() => createDrainPipeline({ batch: { size: Infinity } })).toThrow('batch.size must be a positive finite number')
+  })
+
+  it('throws on NaN batch.size', () => {
+    expect(() => createDrainPipeline({ batch: { size: NaN } })).toThrow('batch.size must be a positive finite number')
+  })
+})
diff --git a/packages/evlog/tsdown.config.ts b/packages/evlog/tsdown.config.ts
index 1198b3f..d57bc47 100644
--- a/packages/evlog/tsdown.config.ts
+++ b/packages/evlog/tsdown.config.ts
@@ -21,6 +21,7 @@ export default defineConfig({
     'adapters/posthog': 'src/adapters/posthog.ts',
     'adapters/sentry': 'src/adapters/sentry.ts',
     'enrichers': 'src/enrichers/index.ts',
+    'pipeline': 'src/pipeline.ts',
   },
   format: 'esm',
   dts: true,
diff --git a/skills/evlog/SKILL.md b/skills/evlog/SKILL.md
index c5a737a..c0a899e 100644
--- a/skills/evlog/SKILL.md
+++ b/skills/evlog/SKILL.md
@@ -36,6 +36,7 @@ Review and improve logging patterns in TypeScript/JavaScript codebases. Transfor
 | Error handling | [references/structured-errors.md](references/structured-errors.md) |
 | Code review checklist | [references/code-review.md](references/code-review.md) |
 | Log draining & adapters | See "Log Draining & Adapters" section below |
+| Drain pipeline | [references/drain-pipeline.md](references/drain-pipeline.md) |
 
 ## Important: Auto-imports in Nuxt
 
@@ -410,6 +411,35 @@ export default defineNitroPlugin((nitroApp) => {
 })
 ```
 
+### Drain Pipeline (Production)
+
+For production use, wrap any adapter with `createDrainPipeline` to get batching, retry with backoff, and buffer overflow protection. Without a pipeline, each event triggers a separate network call.
+
+```typescript
+import type { DrainContext } from 'evlog'
+import { createDrainPipeline } from 'evlog/pipeline'
+import { createAxiomDrain } from 'evlog/axiom'
+
+export default defineNitroPlugin((nitroApp) => {
+  const pipeline = createDrainPipeline({
+    batch: { size: 50, intervalMs: 5000 },
+    retry: { maxAttempts: 3, backoff: 'exponential' },
+    onDropped: (events, error) => {
+      console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
+    },
+  })
+
+  const drain = pipeline(createAxiomDrain())
+
+  nitroApp.hooks.hook('evlog:drain', drain)
+  nitroApp.hooks.hook('close', () => drain.flush())
+})
+```
+
+Key options: `batch.size` (default 50), `batch.intervalMs` (default 5000), `retry.maxAttempts` (default 3), `retry.backoff` (`'exponential'` | `'linear'` | `'fixed'`), `maxBufferSize` (default 1000).
+
+See [references/drain-pipeline.md](references/drain-pipeline.md) for full patterns and options.
+
 ## Security: Preventing Sensitive Data Leakage
 
 Wide events capture comprehensive context, making it easy to accidentally log sensitive data.
@@ -468,6 +498,7 @@ When reviewing code, check for:
 8. **Client-side logging** → Use `log` API for debugging in Vue components
 9. **Client log centralization** → Enable `transport.enabled: true` to send client logs to server
 10. **Missing log draining** → Set up adapters (`evlog/axiom`, `evlog/otlp`) for production log export
+11. **No drain pipeline** → Wrap adapters with `createDrainPipeline()` for batching, retry, and buffer overflow protection
 
 ## Loading Reference Files
 
@@ -476,5 +507,6 @@ Load reference files based on what you're working on:
 - Designing wide events → [references/wide-events.md](references/wide-events.md)
 - Improving errors → [references/structured-errors.md](references/structured-errors.md)
 - Full code review → [references/code-review.md](references/code-review.md)
+- Drain pipeline setup → [references/drain-pipeline.md](references/drain-pipeline.md)
 
 **DO NOT load all files at once** - load only what's needed for the current task.
diff --git a/skills/evlog/references/drain-pipeline.md b/skills/evlog/references/drain-pipeline.md
new file mode 100644
index 0000000..d605665
--- /dev/null
+++ b/skills/evlog/references/drain-pipeline.md
@@ -0,0 +1,158 @@
+# Drain Pipeline Reference
+
+The drain pipeline wraps any adapter to add batching, retry with backoff, and buffer overflow protection. Use it in production to reduce network overhead and handle transient failures.
+
+## When to Recommend
+
+- The user has a drain adapter (Axiom, OTLP, custom) and is deploying to production
+- High-throughput scenarios where one HTTP request per event is wasteful
+- The user needs retry logic for unreliable backends
+- The user is implementing batching manually with `setInterval` and arrays
+
+## Basic Setup
+
+```typescript
+// server/plugins/evlog-drain.ts
+import type { DrainContext } from 'evlog'
+import { createDrainPipeline } from 'evlog/pipeline'
+import { createAxiomDrain } from 'evlog/axiom'
+
+export default defineNitroPlugin((nitroApp) => {
+  const pipeline = createDrainPipeline()
+  const drain = pipeline(createAxiomDrain())
+
+  nitroApp.hooks.hook('evlog:drain', drain)
+  nitroApp.hooks.hook('close', () => drain.flush())
+})
+```
+
+**Important:** Always call `drain.flush()` on the server `close` hook. Without it, buffered events are lost when the process exits.
+
+## Full Configuration
+
+```typescript
+const pipeline = createDrainPipeline({
+  batch: {
+    size: 50,               // Max events per batch (default: 50)
+    intervalMs: 5000,       // Max wait before flushing partial batch (default: 5000)
+  },
+  retry: {
+    maxAttempts: 3,         // Total attempts including first (default: 3)
+    backoff: 'exponential', // 'exponential' | 'linear' | 'fixed' (default: 'exponential')
+    initialDelayMs: 1000,   // Base delay for first retry (default: 1000)
+    maxDelayMs: 30000,      // Upper bound for any retry delay (default: 30000)
+  },
+  maxBufferSize: 1000,      // Max buffered events; oldest dropped on overflow (default: 1000)
+  onDropped: (events, error) => {
+    // Called when events are dropped (overflow or retry exhaustion)
+    console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
+  },
+})
+```
+
+## How It Works
+
+1. `drain(ctx)` pushes a single event into the buffer
+2. When `buffer.length >= batch.size`, the batch is flushed immediately
+3. If the batch isn't full, a timer starts; after `intervalMs`, whatever is buffered gets flushed
+4. On flush, the drain function receives `DrainContext[]` (always an array)
+5. If the drain throws, the batch is retried with the configured backoff
+6. After `maxAttempts` failures, `onDropped` is called and the batch is discarded
+7. If the buffer exceeds `maxBufferSize`, the oldest event is dropped and `onDropped` is called
+
+## Backoff Strategies
+
+| Strategy | Delay Pattern | Best For |
+|----------|--------------|----------|
+| `exponential` | 1s, 2s, 4s, 8s... | Default. Transient failures needing recovery time |
+| `linear` | 1s, 2s, 3s, 4s... | Predictable delay growth |
+| `fixed` | 1s, 1s, 1s, 1s... | Rate-limited APIs with known cooldown |
+
+## Returned Drain Function API
+
+```typescript
+const drain = pipeline(myDrainFn)
+
+drain(ctx)          // Push a single event (synchronous, non-blocking)
+await drain.flush() // Force-flush all buffered events
+drain.pending       // Number of events currently buffered (readonly)
+```
+
+## Common Patterns
+
+### With multiple adapters
+
+```typescript
+const axiom = createAxiomDrain()
+const otlp = createOTLPDrain()
+
+const pipeline = createDrainPipeline()
+const drain = pipeline(async (batch) => {
+  await Promise.allSettled([axiom(batch), otlp(batch)])
+})
+```
+
+### Custom drain function
+
+```typescript
+const pipeline = createDrainPipeline({ batch: { size: 100 } })
+const drain = pipeline(async (batch) => {
+  await fetch('https://your-service.com/logs', {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify(batch.map(ctx => ctx.event)),
+  })
+})
+```
+
+### Low-traffic with longer interval
+
+```typescript
+const pipeline = createDrainPipeline({
+  batch: { size: 10, intervalMs: 30000 }, // Flush every 30s or 10 events
+})
+```
+
+## Anti-Patterns
+
+### Manual batching with setInterval
+
+```typescript
+// ❌ No retry, no overflow protection, no flush on shutdown
+const batch: WideEvent[] = []
+setInterval(() => {
+  if (batch.length > 0) fetch(...)
+}, 5000)
+```
+
+**Transform to:**
+
+```typescript
+// ✅ Use the pipeline
+const pipeline = createDrainPipeline()
+const drain = pipeline(async (batch) => { await fetch(...) })
+nitroApp.hooks.hook('close', () => drain.flush())
+```
+
+### Missing flush on shutdown
+
+```typescript
+// ❌ Buffered events lost on process exit
+nitroApp.hooks.hook('evlog:drain', drain)
+```
+
+**Fix:**
+
+```typescript
+// ✅ Always flush on close
+nitroApp.hooks.hook('evlog:drain', drain)
+nitroApp.hooks.hook('close', () => drain.flush())
+```
+
+## Review Checklist
+
+- [ ] Pipeline wraps the adapter for production use
+- [ ] `drain.flush()` called on server `close` hook
+- [ ] `onDropped` callback logs or reports dropped events
+- [ ] Batch size and interval are appropriate for the traffic volume
+- [ ] `maxBufferSize` is set to prevent memory leaks under load
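The delay schedule behind the Backoff Strategies table — including the `maxDelayMs` cap exercised in the tests — reduces to one formula per strategy. A sketch, using a hypothetical `retryDelayMs` helper rather than evlog's actual internals:

```typescript
// Hypothetical helper mirroring the documented delay schedule;
// evlog's real implementation may compute this differently.
type Backoff = 'exponential' | 'linear' | 'fixed'

function retryDelayMs(
  backoff: Backoff,
  attempt: number, // 1-based retry attempt
  initialDelayMs = 1000,
  maxDelayMs = 30000,
): number {
  let delay: number
  switch (backoff) {
    case 'exponential': delay = initialDelayMs * 2 ** (attempt - 1); break // 1s, 2s, 4s, 8s...
    case 'linear':      delay = initialDelayMs * attempt; break            // 1s, 2s, 3s, 4s...
    case 'fixed':       delay = initialDelayMs; break                      // 1s, 1s, 1s...
  }
  return Math.min(delay, maxDelayMs) // every strategy is capped at maxDelayMs
}
```

Under these assumptions, `initialDelayMs: 100` with `linear` yields 100ms then 200ms between attempts, and an exponential first delay of 1000ms with `maxDelayMs: 500` is clamped to 500ms — matching the behavior the test suite asserts.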