9 changes: 9 additions & 0 deletions apps/docs/content/3.adapters/1.overview.md
@@ -92,6 +92,15 @@ export default defineNitroPlugin((nitroApp) => {
---
Build your own adapter for any destination.
:::

:::card
---
icon: i-lucide-workflow
title: Pipeline
to: /adapters/pipeline
---
Batch events, retry on failure, and handle buffer overflow.
:::
::

## Multiple Destinations
42 changes: 16 additions & 26 deletions apps/docs/content/3.adapters/6.custom.md
@@ -220,41 +220,31 @@ export default defineNitroPlugin((nitroApp) => {

## Batching

**Removed** (manual batching):

For high-throughput scenarios, batch events before sending:

```typescript [server/plugins/evlog-drain.ts]
import type { WideEvent } from 'evlog'

const batch: WideEvent[] = []
const BATCH_SIZE = 100
const FLUSH_INTERVAL = 5000 // 5 seconds

async function flush() {
  if (batch.length === 0) return

  const events = batch.splice(0, batch.length)
  await fetch('https://api.example.com/logs/batch', {
    method: 'POST',
    body: JSON.stringify(events),
  })
}

// Flush periodically
setInterval(flush, FLUSH_INTERVAL)

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('evlog:drain', async (ctx) => {
    batch.push(ctx.event)

    if (batch.length >= BATCH_SIZE) {
      await flush()
    }
  })
})
```

::callout{icon="i-lucide-alert-triangle" color="warning"}
**Note:** Batching in serverless environments (Vercel, Cloudflare Workers) requires careful handling since the runtime may terminate before the batch flushes. Consider using the platform's native batching or a queue service.
::

**Added** (pipeline-based batching):

For high-throughput scenarios, use the [Drain Pipeline](/adapters/pipeline) to batch events, retry on failure, and handle buffer overflow automatically:

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>({
    batch: { size: 100, intervalMs: 5000 },
  })

  const drain = pipeline(async (batch) => {
    await fetch('https://api.example.com/logs/batch', {
      method: 'POST',
      body: JSON.stringify(batch.map(ctx => ctx.event)),
    })
  })

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
```

::callout{icon="i-lucide-arrow-right" color="info"}
See the [Pipeline documentation](/adapters/pipeline) for the full options reference, retry strategies, and buffer overflow handling.
::

## Error Handling Best Practices
167 changes: 167 additions & 0 deletions apps/docs/content/3.adapters/7.pipeline.md
@@ -0,0 +1,167 @@
---
title: Drain Pipeline
description: Batch events, retry on failure, and protect against buffer overflow with the shared drain pipeline.
navigation:
title: Pipeline
icon: i-lucide-workflow
links:
- label: Adapters Overview
icon: i-custom-plug
to: /adapters/overview
color: neutral
variant: subtle
- label: Custom Adapters
icon: i-lucide-code
to: /adapters/custom
color: neutral
variant: subtle
---

In production, sending one HTTP request per log event is wasteful. The drain pipeline buffers events and sends them in batches, retries on transient failures, and drops the oldest events when the buffer overflows.

## Quick Start

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
```

::callout{icon="i-lucide-alert-triangle" color="warning"}
Always call `drain.flush()` on server shutdown to ensure buffered events are sent before the process exits.
::

## How It Works

1. Events are buffered in memory as they arrive via the `evlog:drain` hook
2. A batch is flushed when either the **batch size** is reached or the **interval** expires (whichever comes first)
3. If the drain function fails, the batch is retried with the configured **backoff strategy**
4. If all retries are exhausted, `onDropped` is called with the lost events
5. If the buffer exceeds `maxBufferSize`, the oldest events are dropped to keep memory usage bounded (see the sketch below)
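
The flush logic behind steps 2 and 5 can be sketched roughly as follows. This is a simplified illustration of the idea only (retries, backoff, and error handling are omitted), not evlog's actual implementation:

```typescript
// Simplified sketch: size-or-interval batching with a bounded buffer.
function createBatcher<T>(
  send: (batch: T[]) => Promise<void>,
  { size = 50, intervalMs = 5000, maxBufferSize = 1000 } = {},
) {
  let buffer: T[] = []
  let timer: ReturnType<typeof setTimeout> | undefined

  async function flush() {
    if (timer) {
      clearTimeout(timer)
      timer = undefined
    }
    if (buffer.length === 0) return
    const batch = buffer
    buffer = []
    await send(batch)
  }

  function push(item: T) {
    buffer.push(item)
    if (buffer.length > maxBufferSize) buffer.shift() // drop the oldest event on overflow
    if (buffer.length >= size) void flush() // size trigger
    else if (!timer) timer = setTimeout(() => void flush(), intervalMs) // interval trigger
  }

  return { push, flush }
}
```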

## Configuration

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: {
size: 50, // Flush every 50 events
intervalMs: 5000, // Or every 5 seconds, whichever comes first
},
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelayMs: 1000,
maxDelayMs: 30000,
},
maxBufferSize: 1000,
onDropped: (events, error) => {
console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
},
})

const drain = pipeline(createAxiomDrain())

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
```

### Options Reference

| Option | Default | Description |
|--------|---------|-------------|
| `batch.size` | `50` | Maximum events per batch |
| `batch.intervalMs` | `5000` | Max time (ms) before flushing a partial batch |
| `retry.maxAttempts` | `3` | Total attempts including the initial one |
| `retry.backoff` | `'exponential'` | `'exponential'` \| `'linear'` \| `'fixed'` |
| `retry.initialDelayMs` | `1000` | Base delay for the first retry |
| `retry.maxDelayMs` | `30000` | Upper bound for any retry delay |
| `maxBufferSize` | `1000` | Max buffered events before dropping oldest |
| `onDropped` | — | Callback when events are dropped (overflow or retry exhaustion) |

## Backoff Strategies

| Strategy | Delay Pattern | Use Case |
|----------|--------------|----------|
| `exponential` | 1s, 2s, 4s, 8s... | Default. Best for transient failures that may need time to recover |
| `linear` | 1s, 2s, 3s, 4s... | Predictable delay growth |
| `fixed` | 1s, 1s, 1s, 1s... | Same delay every time. Useful for rate-limited APIs |
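
The delays in the table follow the standard formulas for these strategies. As a rough sketch (the exact computation is internal to evlog and may differ), the delay before retry attempt `n` could be derived from `initialDelayMs` and `maxDelayMs` like this:

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'

// n = 1 for the first retry, 2 for the second, and so on.
function retryDelay(backoff: Backoff, n: number, initialDelayMs = 1000, maxDelayMs = 30000): number {
  switch (backoff) {
    case 'exponential':
      return Math.min(initialDelayMs * 2 ** (n - 1), maxDelayMs) // 1s, 2s, 4s, 8s...
    case 'linear':
      return Math.min(initialDelayMs * n, maxDelayMs) // 1s, 2s, 3s, 4s...
    case 'fixed':
      return initialDelayMs // 1s, 1s, 1s...
  }
}
```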

## Returned Drain Function

The function returned by `pipeline(drain)` is hook-compatible and exposes the following (see the usage sketch after the table):

| Property | Type | Description |
|----------|------|-------------|
| `drain(ctx)` | `(ctx: T) => void` | Push a single event into the buffer |
| `drain.flush()` | `() => Promise<void>` | Force-flush all buffered events |
| `drain.pending` | `number` | Number of events currently buffered |
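
For illustration, the three pieces fit together like this. This is a usage sketch in the style of the Quick Start; the `drain.pending` check only demonstrates the property, and calling `flush()` unconditionally (as in the Quick Start) works just as well:

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()

  const drain = pipeline(async (batch) => {
    // Called with the buffered DrainContext objects once a batch is flushed
    console.log(`[evlog] sending ${batch.length} events`)
  })

  // drain is hook-compatible: each `evlog:drain` call buffers one event
  nitroApp.hooks.hook('evlog:drain', drain)

  nitroApp.hooks.hook('close', async () => {
    // drain.pending = number of events still buffered; flush() force-sends them
    if (drain.pending > 0) await drain.flush()
  })
})
```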

## Multiple Destinations

Wrap multiple adapters with a single pipeline:

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'

export default defineNitroPlugin((nitroApp) => {
const axiom = createAxiomDrain()
const otlp = createOTLPDrain()

const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(async (batch) => {
await Promise.allSettled([axiom(batch), otlp(batch)])
})

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
```

## Custom Drain Function

You don't need an adapter — pass any async function that accepts a batch:

```typescript [server/plugins/evlog-drain.ts]
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 100 },
})

const drain = pipeline(async (batch) => {
await fetch('https://your-service.com/logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(batch.map(ctx => ctx.event)),
})
})

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
```

## Next Steps

- [Adapters Overview](/adapters/overview) - Available built-in adapters
- [Custom Adapters](/adapters/custom) - Build your own drain function
- [Best Practices](/core-concepts/best-practices) - Security and production tips
38 changes: 38 additions & 0 deletions apps/playground/app/config/tests.config.ts
@@ -265,6 +265,44 @@ export const testConfig = {
},
],
} as TestSection,
{
id: 'pipeline',
label: 'Pipeline',
icon: 'i-lucide-layers',
title: 'Drain Pipeline (Batching + Retry)',
description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for batched drain output.',
layout: 'cards',
tests: [
{
id: 'pipeline-single',
label: '1 Request',
description: 'Single event - buffered until batch size (5) or interval (2s) is reached',
endpoint: '/api/test/success',
method: 'GET',
badge: {
label: 'Buffered',
color: 'blue',
},
toastOnSuccess: {
title: 'Event buffered',
description: 'Check terminal - will flush after 2s or when 5 events accumulate',
},
},
{
id: 'pipeline-batch',
label: 'Fire 10 Requests',
description: 'Fires 10 requests in parallel - should produce 2 batches of 5 events',
badge: {
label: '2 batches',
color: 'green',
},
toastOnSuccess: {
title: '10 requests sent',
description: 'Check terminal - should see 2 batches of 5 events',
},
},
],
} as TestSection,
{
id: 'drains',
label: 'Drains',
9 changes: 9 additions & 0 deletions apps/playground/app/pages/index.vue
@@ -39,6 +39,12 @@ async function handleBatchRequest() {
)
}

async function handlePipelineBatch() {
await Promise.all(
Array.from({ length: 10 }, () => $fetch('/api/test/success')),
)
}

// Get custom onClick for specific tests
function getOnClick(testId: string) {
if (testId === 'structured-error-toast') {
@@ -47,6 +53,9 @@ function getOnClick(testId: string) {
if (testId === 'tail-fast-batch') {
return handleBatchRequest
}
if (testId === 'pipeline-batch') {
return handlePipelineBatch
}
return undefined
}
</script>
57 changes: 57 additions & 0 deletions packages/evlog/README.md
@@ -524,6 +524,63 @@ export default defineNitroPlugin((nitroApp) => {

> See the [full documentation](https://evlog.hrcd.fr/adapters/overview) for adapter configuration options, troubleshooting, and advanced patterns.

## Drain Pipeline

For production use, wrap your drain adapter with `createDrainPipeline` to get **batching**, **retry with backoff**, and **buffer overflow protection**.

Without a pipeline, each event triggers a separate network call. The pipeline buffers events and sends them in batches, reducing overhead and handling transient failures automatically.

```typescript
// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 50, intervalMs: 5000 },
retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 1000 },
onDropped: (events, error) => {
console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
},
})

const drain = pipeline(createAxiomDrain())

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
```

### How it works

1. Events are buffered in memory as they arrive
2. A batch is flushed when either the **batch size** is reached or the **interval** expires (whichever comes first)
3. If the drain function fails, the batch is retried with the configured **backoff strategy**
4. If all retries are exhausted, `onDropped` is called with the lost events
5. If the buffer exceeds `maxBufferSize`, the oldest events are dropped to keep memory usage bounded

### Options

| Option | Default | Description |
|--------|---------|-------------|
| `batch.size` | `50` | Maximum events per batch |
| `batch.intervalMs` | `5000` | Max time (ms) before flushing a partial batch |
| `retry.maxAttempts` | `3` | Total attempts (including first) |
| `retry.backoff` | `'exponential'` | `'exponential'` \| `'linear'` \| `'fixed'` |
| `retry.initialDelayMs` | `1000` | Base delay for first retry |
| `retry.maxDelayMs` | `30000` | Upper bound for any retry delay |
| `maxBufferSize` | `1000` | Max buffered events before dropping oldest |
| `onDropped` | — | Callback when events are dropped |

### Returned drain function

The function returned by `pipeline(drain)` is hook-compatible and exposes:

- **`drain(ctx)`** — Push a single event into the buffer
- **`drain.flush()`** — Force-flush all buffered events (call on server shutdown)
- **`drain.pending`** — Number of events currently buffered

## API Reference

### `initLogger(config)`
7 changes: 7 additions & 0 deletions packages/evlog/package.json
@@ -58,6 +58,10 @@
"./enrichers": {
"types": "./dist/enrichers.d.mts",
"import": "./dist/enrichers.mjs"
},
"./pipeline": {
"types": "./dist/pipeline.d.mts",
"import": "./dist/pipeline.mjs"
}
},
"main": "./dist/index.mjs",
@@ -90,6 +94,9 @@
],
"enrichers": [
"./dist/enrichers.d.mts"
],
"pipeline": [
"./dist/pipeline.d.mts"
]
}
},