Table of Contents

Stream Queries

Stream queries return a sequence of values over time via IAsyncEnumerable<T>. They are separate from regular queries — dispatched through IQueryDispatcher.StreamAsync() and processed by dedicated IStreamQueryBehavior<TQuery, TResult> pipeline behaviors.

Defining a stream query

public record TailOrderEventsQuery(Guid OrderId) : IStreamQuery<OrderEventDto>;

Handler

[StreamQueryHandler]
public sealed class TailOrderEventsHandler
    : IStreamQueryHandler<TailOrderEventsQuery, OrderEventDto>
{
    private readonly IOrderEventLog _log;
    public TailOrderEventsHandler(IOrderEventLog log) => _log = log;

    public async IAsyncEnumerable<OrderEventDto> HandleAsync(
        TailOrderEventsQuery query,
        [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var evt in _log.TailAsync(query.OrderId, ct))
        {
            yield return evt.ToDto();
        }
    }
}

Dispatching

app.MapGet("/orders/{id:guid}/events", (
    Guid id,
    IQueryDispatcher dispatcher,
    CancellationToken ct) =>
{
    var stream = dispatcher.StreamAsync(new TailOrderEventsQuery(id), ct);
    return TypedResults.Stream(stream);
});

Notice that stream dispatch uses StreamAsync, not DispatchAsync — the distinction is intentional. Stream semantics (backpressure, cancellation, partial results) are fundamentally different from a single ValueTask<T> return, so Nexum uses a separate method name to prevent accidental misuse.

Stream pipeline behaviors

IStreamQueryBehavior<TQuery, TResult> lets you wrap a stream handler with cross-cutting concerns that understand async enumerables:

public sealed class LoggingStreamBehavior<TQuery, TResult>
    : IStreamQueryBehavior<TQuery, TResult>
    where TQuery : IStreamQuery<TResult>
{
    private readonly ILogger<LoggingStreamBehavior<TQuery, TResult>> _log;
    public LoggingStreamBehavior(ILogger<LoggingStreamBehavior<TQuery, TResult>> log) => _log = log;

    public async IAsyncEnumerable<TResult> HandleAsync(
        TQuery query,
        StreamQueryHandlerDelegate<TResult> next,
        [EnumeratorCancellation] CancellationToken ct)
    {
        _log.LogInformation("Streaming {Query}", typeof(TQuery).Name);
        var count = 0;
        await foreach (var item in next().WithCancellation(ct))
        {
            count++;
            yield return item;
        }
        _log.LogInformation("Streamed {Count} items from {Query}", count, typeof(TQuery).Name);
    }
}

Backpressure

Because the return is an IAsyncEnumerable<T>, the consumer drives iteration speed. If the consumer pauses, the handler pauses — there is no internal buffering beyond what the handler itself introduces. This makes stream queries naturally backpressure-friendly for HTTP streaming and SignalR.