Stream API
createStreamDrain() exposes the events flowing through evlog as an in-process pub/sub. It's the primitive any local consumer can subscribe to without re-implementing a drain.
import { createStreamDrain } from 'evlog/stream'
const stream = createStreamDrain({ buffer: 200 })
// Register as a normal evlog drain (Nitro hook or plugin runner):
nitroApp.hooks.hook('evlog:drain', stream.drain)
Subscribing
Two consumption styles are supported.
Sync listener
const unsubscribe = stream.subscribe((event) => {
if (event.level === 'error') notifyOps(event)
})
// Later:
unsubscribe()
Listener errors are caught and logged — they never affect other subscribers or the drain.
Async iterator
for await (const event of stream.events()) {
console.log(event.timestamp, event.action ?? event.message)
if (shouldStop(event)) break // breaking cleanly unsubscribes
}
Each call to events() returns a fresh independent iterator. Past buffered events are not replayed; pair with stream.recent() to seed history.
Replay buffer
stream.recent() returns a defensive copy of the most recent events (oldest first). The default buffer holds 500 events; pass buffer: 0 to disable, or set it explicitly:
const stream = createStreamDrain({ buffer: 1000 })
const initial = stream.recent()
for (const past of initial) seedDashboard(past)
stream.subscribe(liveEvent => updateDashboard(liveEvent))
Backpressure
A slow async-iterator consumer never blocks the drain. Each iterator has a per-subscriber queue (default 1000); when it overflows, the oldest queued events are dropped and stream.droppedCount increments.
Filter
Events that fail the optional filter predicate are not buffered nor delivered:
const errors = createStreamDrain({
filter: event => event.level === 'error' || event.status >= 500,
})
Default singleton
When several pieces of code in the same process need to share a single stream — typically a framework integration that wires the drain on one side and the stream server on the other — use the singleton accessors:
import { getDefaultStream, setDefaultStream } from 'evlog/stream'
// Lazily creates a singleton on first call
const stream = getDefaultStream({ buffer: 500 })
// Reset (mostly useful in tests)
setDefaultStream(null)
The mini stream server uses this singleton internally, so anything draining into getDefaultStream() automatically reaches all SSE clients.
Going further
- Network bridge — expose this stream over HTTP for browser tabs / CLIs / external devtools. See the Stream server.
- Recipes — concrete consumer patterns (devtool, replay-then-live, aggregation). See Recipes.
Overview
Two ways to build on top of evlog — observe the events flowing through the pipeline, or extend the pipeline itself with custom plugins, drains, enrichers, and shareable catalogs.
Stream server
A local HTTP mini-server on its own port that exposes the in-process stream over Server-Sent Events. Strict opt-in, framework-agnostic, no app route to wire.