Stream Protocol
This document describes the internal streaming protocol used by Helix Agents for real-time event communication.
The canonical schema lives at packages/core/src/types/stream.ts. This document is the authoritative internals reference for the on-the-wire shape — when other internals docs (e.g. subagent-execution.md) reference chunk fields, they should match the schemas defined here.
Overview
The stream protocol enables real-time communication of:
- Text generation — Token-by-token LLM output
- Reasoning/thinking — Internal reasoning traces
- Tool execution — Start / end / approval / argument-streaming events
- Sub-agent activity — Delegation to child agents
- State changes — RFC 6902 patches
- Custom events — Application-specific data
- Errors and output — Final results
- Run lifecycle — Interrupt, resume, pause, supersede, checkpoint, step commit/discard, stream resync, suspension marker
All chunk schemas extend BaseChunkSchema, which carries the agentId, agentType, timestamp, and step fields. The discriminator field is type. Validation is via Zod (StreamChunkSchema, StreamMessageSchema).
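As a rough illustration of how the shared base fields and the type discriminator work together, the sketch below models two variants and narrows on type. The names BaseChunk, TextDeltaSketch, ErrorSketch, SketchChunk, and describeChunk are hypothetical and exist only for this example; the canonical definitions are the Zod schemas.

```typescript
// Base fields named in the text above; every chunk carries them.
interface BaseChunk {
  agentId: string;
  agentType: string;
  timestamp: number;
  step: number;
}

// Only two variants, to keep the sketch short; the real union is much larger.
interface TextDeltaSketch extends BaseChunk {
  type: 'text_delta';
  delta: string;
}

interface ErrorSketch extends BaseChunk {
  type: 'error';
  error: string;
  recoverable: boolean;
}

type SketchChunk = TextDeltaSketch | ErrorSketch;

// Hypothetical helper: switching on `type` narrows the chunk to one variant,
// so variant-specific fields (`delta`, `error`) are type-checked.
function describeChunk(chunk: SketchChunk): string {
  switch (chunk.type) {
    case 'text_delta':
      return `[step ${chunk.step}] text: ${chunk.delta}`;
    case 'error':
      return `[step ${chunk.step}] error: ${chunk.error}`;
    default: {
      // Compile-time exhaustiveness check: fails if a variant is unhandled.
      const unhandled: never = chunk;
      return unhandled;
    }
  }
}
```

The never assignment in the default branch is a common way to make the compiler flag a newly added variant that a consumer forgot to handle.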
Chunk Types
The full discriminated union has 30+ variants. They group into the following categories:
LLM-emitted chunks
TextDeltaChunk
Token-by-token text from the LLM.
interface TextDeltaChunk {
type: 'text_delta';
delta: string; // Incremental text
agentId: string;
agentType: string;
timestamp: number;
step: number;
}
ThinkingChunk
Reasoning/thinking content from models that support it.
interface ThinkingChunk {
type: 'thinking';
content: string;
isComplete: boolean; // false = streaming; true = complete block
agentId: string;
agentType: string;
timestamp: number;
step: number;
}
Tool argument streaming
For models that emit tool arguments incrementally.
interface ToolArgStreamStartChunk {
type: 'tool_arg_stream_start';
toolCallId: string;
toolName: string;
// + Base fields
}
interface ToolArgStreamDeltaChunk {
type: 'tool_arg_stream_delta';
toolCallId: string;
delta: string; // Incremental JSON text
// + Base fields
}
interface ToolArgStreamEndChunk {
type: 'tool_arg_stream_end';
toolCallId: string;
// + Base fields
}
Tool execution chunks
ToolStartChunk
Emitted when a tool invocation begins.
interface ToolStartChunk {
type: 'tool_start';
toolCallId: string;
toolName: string;
arguments: Record<string, unknown>;
// + Base fields
}
ToolEndChunk
Emitted when a tool invocation completes.
interface ToolEndChunk {
type: 'tool_end';
toolCallId: string;
toolName: string;
result: unknown;
/** Human-readable error message if the tool failed. */
error?: string;
/**
* Machine-readable error code when the tool failed. For client-executed
* tools the runtime emits one of `CLIENT_TOOL_ERROR_CODES` (e.g.
* `client_tool_timeout`, `aborted`); operator tooling can grep on this
* rather than the humanized `error` text.
*/
errorCode?: string;
/**
* True when the tool was executed by the framework (regular function
* tools, sub-agents, remote sub-agents, finishWith). False when the
* tool was executed client-side (`execute: 'client'`).
*
* Optional for backward compatibility — chunks emitted by older
* runtimes don't carry this flag and consumers should treat absence
* as "client-executed" (the conservative default that matches the
* Vercel AI SDK's `!providerExecuted` semantics). Used by
* `extractToolOutputs` in `HelixChatTransport` to filter out
* provider-executed tool parts from the auto-flush to
* `/submit-tool-result` — these are already complete on the server.
*/
providerExecuted?: boolean;
// + Base fields
}
ToolApprovalRequestChunk (v7)
Emitted when an approval-gated tool requires human (or automatic) approval before execution. The UI should render the proposed action for the user to approve or deny.
interface ToolApprovalRequestChunk {
type: 'tool_approval_request';
toolCallId: string;
toolName: string;
/** Server-generated `${runId}::${toolCallId}` (matches Mastra's APPROVAL_ID_SEPARATOR). */
approvalId: string;
/** The validated tool input the LLM produced; sent so the UI can render the proposed action. */
input: unknown;
/** True when approval was decided automatically (e.g., a deterministic `requireApproval` evaluator). */
isAutomatic?: boolean;
// + Base fields
}
ToolApprovalResponseChunk (v7)
Mirrors the request chunk so consumers can correlate request/response pairs via approvalId.
interface ToolApprovalResponseChunk {
type: 'tool_approval_response';
toolCallId: string;
toolName: string;
approvalId: string;
approved: boolean;
reason?: string;
isAutomatic?: boolean;
// + Base fields
}
Both approval-gated and client-executed tool flows share the same RunOutcome.kind = 'suspended_client_tool' / AgentResult.status (see ./concepts.md). Routing happens off the kind field of the submitToolResult payload ('client-tool-result' vs 'approval-response'), not off the stream-chunk type. UIs typically dispatch off the chunk type for rendering and off the kind for submission.
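The split between rendering and submission can be sketched as two separate dispatches. Only the chunk type names and the two kind values come from this document; the payload fields, PendingToolChunk, SubmitPayload, renderLabel, and describeSubmission are hypothetical and chosen for illustration.

```typescript
// Chunks a UI might be waiting on (fields reduced for the sketch).
type PendingToolChunk =
  | { type: 'tool_start'; toolCallId: string; toolName: string }
  | { type: 'tool_approval_request'; toolCallId: string; toolName: string; approvalId: string };

// Hypothetical payload shapes; only the `kind` values come from the text.
type SubmitPayload =
  | { kind: 'client-tool-result'; toolCallId: string; result: unknown }
  | { kind: 'approval-response'; approvalId: string; approved: boolean };

// Rendering dispatches on the chunk type.
function renderLabel(chunk: PendingToolChunk): string {
  return chunk.type === 'tool_approval_request'
    ? `Approve ${chunk.toolName}?`
    : `Running ${chunk.toolName}...`;
}

// Submission dispatches on the payload kind, not on the chunk type.
function describeSubmission(payload: SubmitPayload): string {
  switch (payload.kind) {
    case 'client-tool-result':
      return `result for ${payload.toolCallId}`;
    case 'approval-response':
      return `${payload.approved ? 'approved' : 'denied'} ${payload.approvalId}`;
  }
}
```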
Tool input/output errors
Distinct from the regular error chunk — these carry toolCallId/toolName and are emitted at specific failure boundaries.
interface ToolInputErrorChunk {
type: 'tool_input_error';
toolCallId: string;
toolName: string;
error: string;
partialInput?: unknown;
// + Base fields
}
interface ToolOutputErrorChunk {
type: 'tool_output_error';
toolCallId: string;
toolName: string;
error: string;
// + Base fields
}
Sub-agent chunks
SubAgentStartChunk
Emitted when a sub-agent begins execution.
interface SubAgentStartChunk {
type: 'subagent_start';
subAgentType: string; // Sub-agent type (e.g. 'researcher')
subSessionId: string; // Sub-agent's session ID; correlate sub-agent chunks via this
callId: string; // Tool call ID that spawned this sub-agent
// + Base fields (agentId is the PARENT's session ID)
}
SubAgentEndChunk
Emitted when a sub-agent completes.
interface SubAgentEndChunk {
type: 'subagent_end';
subAgentType: string;
subSessionId: string;
callId: string;
result: unknown; // Sub-agent output (failures surface via the result payload itself or via emitted error chunks)
// + Base fields
}
This is the authoritative schema. Other internals docs that reference these chunks should match.
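One way a consumer might use the correlation fields is to pair each subagent_start with its subagent_end by callId. This is a minimal sketch, not part of the library: pairSubAgentRuns and the reduced interfaces are hypothetical, and the duration is simply derived from the base timestamp field.

```typescript
// Reduced shapes: only the fields the pairing logic needs.
interface SubAgentStartSketch {
  type: 'subagent_start';
  subAgentType: string;
  subSessionId: string;
  callId: string;
  timestamp: number;
}

interface SubAgentEndSketch {
  type: 'subagent_end';
  subAgentType: string;
  subSessionId: string;
  callId: string;
  result: unknown;
  timestamp: number;
}

interface SubAgentRun {
  subSessionId: string;
  durationMs: number;
}

// Hypothetical helper: match start and end chunks by the spawning callId.
function pairSubAgentRuns(chunks: ReadonlyArray<SubAgentStartSketch | SubAgentEndSketch>): SubAgentRun[] {
  const started = new Map<string, SubAgentStartSketch>();
  const runs: SubAgentRun[] = [];
  for (const chunk of chunks) {
    if (chunk.type === 'subagent_start') {
      started.set(chunk.callId, chunk);
      continue;
    }
    const start = started.get(chunk.callId);
    if (start !== undefined) {
      runs.push({ subSessionId: chunk.subSessionId, durationMs: chunk.timestamp - start.timestamp });
      started.delete(chunk.callId);
    }
  }
  // End chunks without a matching start are ignored in this sketch.
  return runs;
}
```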
Custom + state chunks
CustomEventChunk
Application-specific events from tools (created via ToolContext.emit()).
interface CustomEventChunk {
type: 'custom';
eventName: string;
data: unknown;
// + Base fields
}
StatePatchChunk
RFC 6902 JSON Patches for state updates.
interface StatePatchChunk {
type: 'state_patch';
patches: JSONPatchOperation[]; // RFC 6902 'add'|'remove'|'replace'|'move'|'copy'|'test'
// + Base fields
}
Error + output chunks
ErrorChunk
interface ErrorChunk {
type: 'error';
error: string;
code?: string;
errorDetail?: {
message: string;
code?: string;
category?: string;
cause?: string;
retryable?: boolean;
};
recoverable: boolean;
// + Base fields
}
When the error originates from a classified HelixError, the code field contains the error code (e.g., provider_overloaded, provider_rate_limited) and recoverable reflects whether the operation can be retried. See Error Handling Guide.
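A consumer deciding whether to retry might read these fields roughly as follows. The precedence shown here (errorDetail.retryable over recoverable) is an assumption made for the sketch, not documented behavior, and shouldRetry, formatErrorLabel, and the 'unclassified' placeholder are hypothetical.

```typescript
interface ErrorChunkSketch {
  type: 'error';
  error: string;
  code?: string;
  errorDetail?: {
    message: string;
    code?: string;
    category?: string;
    cause?: string;
    retryable?: boolean;
  };
  recoverable: boolean;
}

// Assumed policy: prefer the more specific errorDetail.retryable when it is
// present, otherwise fall back to the chunk-level recoverable flag.
function shouldRetry(chunk: ErrorChunkSketch): boolean {
  return chunk.errorDetail?.retryable ?? chunk.recoverable;
}

// Machine-readable code first, human-readable text second.
// 'unclassified' is a placeholder invented for this sketch.
function formatErrorLabel(chunk: ErrorChunkSketch): string {
  const code = chunk.code ?? chunk.errorDetail?.code ?? 'unclassified';
  return `[${code}] ${chunk.error}`;
}
```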
OutputChunk
Final structured output (when the agent has an outputSchema).
interface OutputChunk {
type: 'output';
output: unknown;
// + Base fields
}
Source / citation chunks (RAG)
interface SourceUrlChunk {
type: 'source_url';
sourceId: string;
url: string;
title?: string;
// + Base fields
}
interface SourceDocumentChunk {
type: 'source_document';
sourceId: string;
mediaType: string;
title: string;
filename?: string;
// + Base fields
}
Step chunks (multi-step workflow tracking)
interface StepStartChunk {
type: 'step_start';
stepId?: string;
// + Base fields
}
interface StepEndChunk {
type: 'step_end';
stepId?: string;
usage?: {
inputTokens: number;
outputTokens: number;
cachedTokens?: number;
cacheWriteTokens?: number;
};
finishReason?: 'stop' | 'length' | 'tool-calls' | 'content-filter' | 'error' | 'other';
// + Base fields
}
File chunk (generated files / attachments)
interface FileChunk {
type: 'file';
url: string; // Data URL or hosted URL
mediaType: string;
filename?: string;
// + Base fields
}
Run lifecycle chunks (v7)
These are emitted by the runtime (not the LLM) and signal run-level state transitions.
RunInterruptedChunk
Emitted when the agent is interrupted. The agent can be resumed via executor.resume().
interface RunInterruptedChunk {
type: 'run_interrupted';
runId: string;
checkpointId: string | null; // null if checkpointing failed
reason?: string;
// + Base fields
}
RunResumedChunk
Emitted when an interrupted/paused agent is resumed.
interface RunResumedChunk {
type: 'run_resumed';
runId: string;
fromCheckpointId: string | null;
fromStepCount: number;
mode: string; // 'continue' | 'with_message' | 'with_confirmation' | 'from_checkpoint'
// + Base fields
}
RunPausedChunk
Emitted when the agent has paused (typically waiting for user confirmation).
interface RunPausedChunk {
type: 'run_paused';
runId: string;
reason: string;
pendingToolName?: string;
pendingToolCallId?: string;
// + Base fields
}
ExecutorSupersededChunk
Emitted when this executor has been superseded by another (StaleStateError detected during save).
interface ExecutorSupersededChunk {
type: 'executor_superseded';
reason?: string;
// + Base fields
}
CheckpointCreatedChunk
interface CheckpointCreatedChunk {
type: 'checkpoint_created';
runId: string;
checkpointId: string;
stepCount: number;
// + Base fields
}
StepCommittedChunk / StepDiscardedChunk
Signal whether staged changes were committed or discarded at a step boundary.
interface StepCommittedChunk {
type: 'step_committed';
runId: string;
stepId: string;
checkpointId: string;
// + Base fields
}
interface StepDiscardedChunk {
type: 'step_discarded';
runId: string;
stepId: string;
reason: string;
// + Base fields
}
StreamResyncChunk
Emitted on resume only when orphaned messages or stream chunks were actually cleaned up — i.e. crash recovery, interrupt cleanup, retry, or checkpoint rollback/branch. The runtime gates emission on a real truncation (cleanedUpOrphans && checkpoint in runtime-js); a normal client-tool resume or follow-up turn truncates nothing and emits no resync. Clients receiving it should reload state from the server (e.g. a /snapshot endpoint).
This chunk is not emitted on the happy path. In particular, it is not part of the client-executed-tool continuation flow: on the Cloudflare DO and DBOS runtimes the run auto-continues after a submit and streams the continuation live (the AI SDK re-attaches to the running stream without a resync). See Client-Executed Tools for the per-runtime continuation model.
Run-scoped orphan cleanup (cleanupToStep minSequence). Orphan stream-chunk cleanup on resume is bounded to the resumed run. Step numbers reset per turn (each turn restarts step counting at 1), but stream sequence numbers are monotonic across the whole per-session stream. A naive cleanupToStep(checkpoint.stepCount) would therefore delete prior turns' committed chunks — they carry per-turn step numbers greater than the current turn's low stepCount — spuriously tripping cleanedUpOrphans and emitting a bogus crash_recovery resync on every multi-turn client-tool resume. To prevent this, runtimes pass the resumed run's startSequence as the cleanupToStep minSequence floor; only chunks with sequence > minSequence are eligible for deletion, so earlier turns are protected while genuine within-run orphans (a crashed step's chunks) are still removed. All four stream managers (in-memory, Cloudflare DO SQLite, Cloudflare StreamDurableObject, Redis) honor the floor; the cross-store contract test pins the behavior. The retry / from_checkpoint rollback path deliberately does not pass the floor — it intends to clean later turns' chunks.
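The effect of the floor can be shown with a small in-memory sketch. It assumes a chunk is deleted when its step exceeds the stepCount boundary and, if a floor is given, its sequence exceeds minSequence. cleanupToStepSketch and StoredChunkSketch are hypothetical stand-ins, and the sample data assumes the resumed run's startSequence equals the last sequence written before the run began.

```typescript
interface StoredChunkSketch {
  step: number;     // per-turn step number (restarts at 1 each turn)
  sequence: number; // monotonic across the whole session stream
}

// Sketch of the deletion rule: drop chunks past the step boundary, but only
// those above the minSequence floor when a floor is supplied.
function cleanupToStepSketch(
  chunks: readonly StoredChunkSketch[],
  stepCount: number,
  minSequence?: number,
): { kept: StoredChunkSketch[]; removed: number } {
  const kept = chunks.filter((chunk) => {
    const pastBoundary = chunk.step > stepCount;
    const aboveFloor = minSequence === undefined || chunk.sequence > minSequence;
    return !(pastBoundary && aboveFloor);
  });
  return { kept, removed: chunks.length - kept.length };
}

// Turn 1 committed steps 1..3 (sequences 1..3). Turn 2 committed step 1
// (sequence 4) and crashed during step 2 (sequence 5, an orphan).
const sessionChunks: StoredChunkSketch[] = [
  { step: 1, sequence: 1 },
  { step: 2, sequence: 2 },
  { step: 3, sequence: 3 },
  { step: 1, sequence: 4 },
  { step: 2, sequence: 5 },
];

// Naive cleanup to stepCount 1 also deletes turn 1's steps 2 and 3.
const naiveCleanup = cleanupToStepSketch(sessionChunks, 1); // removed: 3

// With the floor at the resumed run's startSequence (3), only the orphan goes.
const boundedCleanup = cleanupToStepSketch(sessionChunks, 1, 3); // removed: 1
```

Omitting the third argument corresponds to the retry / from_checkpoint path described above, where deleting later turns' chunks is the intended outcome.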
fromSequence accuracy. fromSequence is the checkpoint's streamSequence. Checkpoints record the stream's live latestSequence at write time (resolved by resolveCheckpointStreamSequence from @helix-agents/core, invoked at each runtime's persistence point — a JS hook, or a Temporal/DBOS/CF-Workflow activity/step that is allowed to perform I/O, replay-safe). Earlier releases hardcoded 0 here; that was never load-bearing (the /snapshot endpoint computes its own sequence and no consumer replays from the resync's fromSequence), but the field is now accurate for observability and any future consumer. DBOS does not emit stream_resync, so it leaves streamSequence unset.
interface StreamResyncChunk {
type: 'stream_resync';
checkpointId: string;
stepCount: number;
messageCount: number;
fromSequence: number;
reason: 'crash_recovery' | 'rollback' | 'branch' | 'retry';
// + Base fields
}
SuspensionMarkerChunk (Helix-internal)
Emitted by runtimes whose workflow body stays alive across suspensions (Temporal, Cloudflare Workflows). Lets the executor's handle observe via stream-watching that the run is suspended and project the right 'suspended_*' status from handle.result() without re-deriving from persisted state.
interface SuspensionMarkerChunk {
type: 'suspension_marker';
/** Suspension kind, matching `AgentResult.status`. */
kind: 'suspended_client_tool' | 'suspended_awaiting_children' | 'suspended_step_partial';
/**
* Suspension payload, matching `AgentResult.suspended` shape.
* - `suspended_client_tool`: { toolCallIds }
* - `suspended_step_partial`: { toolCallIds, stepId }
* - `suspended_awaiting_children`: { children }
*/
payload:
| { toolCallIds: string[]; stepId: string }
| { toolCallIds: string[] }
| { children: SuspendedChildWaitPayload[] };
// + Base fields
}
@internal Helix coordination chunk: NOT part of the public stream surface. The binding-layer transformer filters this out of v6 streams; AI SDK consumers see suspension via existing tool_start, tool_approval_request, and subagent_start chunks.
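The two roles of the marker, hidden from public consumers but visible to the handle, could look roughly like this. Both functions are hypothetical, and treating the last observed marker as the deciding one is an assumption of the sketch rather than documented behavior.

```typescript
type SuspensionKind =
  | 'suspended_client_tool'
  | 'suspended_awaiting_children'
  | 'suspended_step_partial';

// Minimal chunk shape for the sketch; real chunks carry more fields.
interface MinimalChunk {
  type: string;
  kind?: SuspensionKind;
}

// Hypothetical transformer step: drop the internal marker before chunks
// reach public consumers.
function stripInternalChunks(chunks: readonly MinimalChunk[]): MinimalChunk[] {
  return chunks.filter((chunk) => chunk.type !== 'suspension_marker');
}

// Hypothetical handle-side projection: the last marker seen decides the
// suspended status; null means no marker was observed.
function projectSuspension(chunks: readonly MinimalChunk[]): SuspensionKind | null {
  let latest: SuspensionKind | null = null;
  for (const chunk of chunks) {
    if (chunk.type === 'suspension_marker' && chunk.kind !== undefined) {
      latest = chunk.kind;
    }
  }
  return latest;
}
```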
Control message: stream_end
The only non-data variant. Lives on StreamMessageSchema (the union of data chunks + control messages).
interface StreamEnd {
type: 'stream_end';
streamId: string;
timestamp: number;
finalOutput?: unknown;
}
Per-runtime chunk-emission parity
| Chunk type | runtime-js | runtime-cloudflare (DO) | runtime-cloudflare (Workflow) | runtime-temporal | runtime-dbos |
|---|---|---|---|---|---|
| text_delta / thinking / tool_arg_stream_* | yes | yes | yes | yes | yes |
| tool_start / tool_end | yes | yes | yes | yes | yes |
| tool_end.providerExecuted field | yes (v7) | yes (v7) | yes (v7) | yes (v7) | yes (v7) |
| tool_approval_request / tool_approval_response | yes (v7) | yes (v7) | yes (v7) | yes (v7) | yes (DBOS-native) |
| subagent_start / subagent_end | yes | yes | yes | yes | partial |
| state_patch / custom / error / output | yes | yes | yes | yes | yes |
| run_interrupted / run_resumed / run_paused | yes | yes | yes | yes | yes |
| checkpoint_created / step_committed / step_discarded | yes | yes | yes | yes | yes |
| stream_resync | yes | yes | yes | yes | no |
| executor_superseded | yes | yes | yes | yes | yes |
| suspension_marker | n/a | n/a | yes | yes | n/a |
suspension_marker is only emitted by runtimes whose workflow body stays alive across suspensions (Temporal, Cloudflare Workflows). JS, CF DO, and DBOS surface suspension via the AgentResult.status projection instead.
Stream Lifecycle
Creation
const writer = await streamManager.createWriter(streamId, agentId, agentType);
Writing
await writer.write({
type: 'text_delta',
delta: 'Hello',
agentId: runId,
agentType: 'assistant',
timestamp: Date.now(),
step: 0,
});
await writer.close();
Reading
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
// Process chunk
}
Ending
// Normal completion
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output
// With error
await streamManager.failStream(streamId, 'Something went wrong');
Stream End Sentinel
A special marker indicates stream completion:
const STREAM_END_SENTINEL = Symbol.for('helix.stream.end');
interface StreamEndSentinel {
__streamEnd: true;
status: 'ended' | 'failed';
error?: string;
}
Readers yield chunks until they encounter the sentinel.
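The stop-at-sentinel behavior can be sketched over a plain array of stored items. The StreamEndSentinel shape is copied from above; TextChunk, isStreamEndSentinel, and readUntilSentinel are hypothetical, and a real reader would be an async iterator rather than a function over an array.

```typescript
interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

// Minimal data chunk for the sketch.
interface TextChunk {
  type: 'text_delta';
  delta: string;
}

// Type guard: only the sentinel carries the __streamEnd marker.
function isStreamEndSentinel(item: TextChunk | StreamEndSentinel): item is StreamEndSentinel {
  return '__streamEnd' in item && item.__streamEnd === true;
}

// Hypothetical reader core: collect chunks until the sentinel appears and
// report how the stream finished.
function readUntilSentinel(
  items: ReadonlyArray<TextChunk | StreamEndSentinel>,
): { chunks: TextChunk[]; end: StreamEndSentinel | null } {
  const chunks: TextChunk[] = [];
  for (const item of items) {
    if (isStreamEndSentinel(item)) {
      return { chunks, end: item };
    }
    chunks.push(item);
  }
  // No sentinel yet: the stream is still open.
  return { chunks, end: null };
}
```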
Resumability
Sequence Numbers
Each chunk can have a sequence number for resumability:
interface StoredChunk {
chunk: StreamChunk;
sequence: number; // Monotonically increasing
}
Resumable Reader
const reader = await streamManager.createResumableReader(streamId, {
fromSequence: 100, // Resume from sequence 100
});
for await (const { chunk, sequence } of reader) {
// sequence can be used as Last-Event-ID
}
Stream Status
type ResumableStreamStatus = 'active' | 'ended' | 'failed';
interface ResumableStreamReader {
status: ResumableStreamStatus;
[Symbol.asyncIterator](): AsyncIterator<{ chunk: StreamChunk; sequence: number }>;
}
Wire Format
For transport over HTTP (SSE), events are serialized:
interface StreamEvent {
type: 'chunk' | 'end' | 'fail' | 'status' | 'truncated';
// ... payload based on type
}
interface StreamChunkEvent {
type: 'chunk';
chunk: StreamChunk;
sequence?: number;
}
interface StreamEndEvent {
type: 'end';
finalOutput?: unknown;
}
interface StreamFailEvent {
type: 'fail';
error: string;
}
interface StreamStatusEvent {
type: 'status';
// Lifecycle transition. Used as a wakeup ping for blocked readers when
// the stream pauses/resumes — readers re-check metadata and do NOT emit a
// chunk to the consumer.
status: 'paused' | 'active';
}
interface StreamTruncatedEvent {
type: 'truncated';
// The `stepCount` boundary passed to `cleanupToStep`; chunks with
// `step > truncatedAtStep` were removed.
truncatedAtStep: number;
// The stream's sequence high-water mark at the moment of cleanup. Readers
// gate the throw on `atSequence > lastYieldedSequence`. Optional for
// wire compatibility.
atSequence?: number;
}
G4: truncation surfacing
The 'truncated' event implements StreamManager concurrency guarantee #4 (G4): when a cleanupToStep(N) removes chunks past an attached reader's cursor, PUSH transports (the Cloudflare Durable Object SSE/WS path) broadcast a truncated control event so the attached reader surfaces a StreamTruncatedError on its next iteration. This is best-effort — a reader that attaches after cleanup (a fresh replay) won't see the event and instead resumes from the surviving chunks.
MARKER/POLL transports do not need the wire event: they detect the same condition via a stored marker consulted on each next() — the Redis hash field __truncated_at_step, the in-process Durable Object SQLite truncated_at_step meta, and an equivalent O(1) in-process field in the in-memory manager.
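On the reader side, the gating rule from the StreamTruncatedEvent comments (atSequence > lastYieldedSequence) might be applied as in this sketch. The error class is a local stand-in for the real StreamTruncatedError, both functions are hypothetical, and surfacing the truncation when atSequence is missing is an assumption of the sketch.

```typescript
interface StreamTruncatedEvent {
  type: 'truncated';
  truncatedAtStep: number;
  atSequence?: number;
}

// Local stand-in for the library's error type.
class StreamTruncatedError extends Error {
  readonly truncatedAtStep: number;

  constructor(truncatedAtStep: number) {
    super(`stream truncated at step ${truncatedAtStep}`);
    this.name = 'StreamTruncatedError';
    this.truncatedAtStep = truncatedAtStep;
  }
}

// Assumption: when `atSequence` is missing (it is optional on the wire), the
// sketch surfaces the truncation rather than silently ignoring it.
function shouldSurfaceTruncation(event: StreamTruncatedEvent, lastYieldedSequence: number): boolean {
  return event.atSequence === undefined || event.atSequence > lastYieldedSequence;
}

// Called by a hypothetical reader before it yields its next chunk.
function assertNotTruncated(event: StreamTruncatedEvent, lastYieldedSequence: number): void {
  if (shouldSurfaceTruncation(event, lastYieldedSequence)) {
    throw new StreamTruncatedError(event.truncatedAtStep);
  }
}
```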
SSE Encoding
id: 1
data: {"type":"chunk","chunk":{"type":"text_delta","delta":"Hello"},"sequence":1}
id: 2
data: {"type":"chunk","chunk":{"type":"text_delta","delta":" world"},"sequence":2}
id: 3
data: {"type":"end"}
Resume Headers (v7)
Reconnecting clients negotiate resume position via these headers (see prepare-helix-chat-request.ts):
- X-Resume-From-Sequence: Client-supplied resume position; the server honors it via extractResumePosition(headers) and synthesizes a stream_resync chunk with the matching fromSequence.
- Last-Event-ID: Standard SSE header, mapped to the same resume semantics as X-Resume-From-Sequence. The server falls back to Last-Event-ID when X-Resume-From-Sequence is absent.
- X-Existing-Message-Id: Client-supplied existing message ID so the reconnected stream's start event reuses it (avoids creating a duplicate assistant message client-side on reconnect).
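The fallback order between the two position headers can be sketched as below. resolveResumeSequence is a hypothetical stand-in for extractResumePosition, whose real signature is not shown in this document; the sketch also assumes header names have already been lower-cased and that positions are non-negative decimal integers.

```typescript
// Hypothetical stand-in for extractResumePosition(headers).
// Assumes header names are already lower-cased.
function resolveResumeSequence(headers: Record<string, string | undefined>): number | null {
  // The explicit header wins; Last-Event-ID is the fallback.
  const raw = headers['x-resume-from-sequence'] ?? headers['last-event-id'];
  // Accept only non-negative decimal integers; anything else means "no position".
  if (raw === undefined || !/^\d+$/.test(raw)) {
    return null;
  }
  return Number(raw);
}
```

Returning null rather than 0 for a missing or malformed header lets the caller distinguish "no resume requested" from "resume from the start".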
Filtering Streams
Utility functions for stream manipulation:
import {
filterByAgentId,
filterByAgentType,
filterByType,
excludeTypes,
filterWith,
combineStreams,
take,
skip,
collectText,
collectAll,
} from '@helix-agents/core';
// Filter by agent
const agentChunks = filterByAgentId(stream, 'run-123');
// Only text chunks
const textChunks = filterByType(stream, ['text_delta']);
// Exclude thinking
const noThinking = excludeTypes(stream, ['thinking']);
// Custom filter
const important = filterWith(stream, (chunk) => chunk.type === 'error' || chunk.type === 'output');
// Combine multiple streams
const combined = combineStreams([stream1, stream2]);
// Collect all text
const fullText = await collectText(stream);
Type Guards
Runtime type checking for chunks (full set — one per chunk variant):
import {
isTextDeltaChunk,
isThinkingChunk,
isToolStartChunk,
isToolEndChunk,
isSubAgentStartChunk,
isSubAgentEndChunk,
isCustomEventChunk,
isStatePatchChunk,
isErrorChunk,
isOutputChunk,
isToolArgStreamStartChunk,
isToolArgStreamDeltaChunk,
isToolArgStreamEndChunk,
isToolInputErrorChunk,
isToolOutputErrorChunk,
isSourceUrlChunk,
isSourceDocumentChunk,
isStepStartChunk,
isStepEndChunk,
isFileChunk,
isRunInterruptedChunk,
isRunResumedChunk,
isExecutorSupersededChunk,
isRunPausedChunk,
isCheckpointCreatedChunk,
isStepCommittedChunk,
isStepDiscardedChunk,
isStreamResyncChunk,
isSuspensionMarkerChunk,
isStreamEnd,
} from '@helix-agents/core';
for await (const chunk of stream) {
if (isTextDeltaChunk(chunk)) {
process.stdout.write(chunk.delta);
} else if (isToolStartChunk(chunk)) {
console.log(`Tool: ${chunk.toolName}`);
}
}
Validation
Zod schemas for runtime validation:
import { StreamChunkSchema, StreamMessageSchema } from '@helix-agents/core';
// Validate a chunk
const result = StreamChunkSchema.safeParse(data);
if (result.success) {
const chunk: StreamChunk = result.data;
}
Implementation Notes
Memory Streams
In-memory streams use arrays and async iterators:
class InMemoryStreamManager {
private streams = new Map<
string,
{
chunks: StreamChunk[];
readers: Set<() => void>;
status: 'active' | 'ended' | 'failed';
}
>();
}
Redis Streams
Redis implementation uses Redis Streams (XADD/XREAD):
// Writing
await redis.xadd(
`stream:${streamId}`,
'*',
'chunk',
JSON.stringify(chunk),
'sequence',
sequence.toString()
);
// Reading with blocking
const entries = await redis.xread('BLOCK', 5000, 'STREAMS', `stream:${streamId}`, lastId);
Durable Object Streams
Cloudflare Durable Objects provide coordination:
class StreamServer {
private chunks: StreamChunk[] = [];
private waiters: Set<(chunk: StreamChunk) => void> = new Set();
async write(chunk: StreamChunk) {
this.chunks.push(chunk);
for (const waiter of this.waiters) {
waiter(chunk);
}
}
}
See Also
- Streaming Guide
- State Tracking
- Frontend Integration
- Sub-Agent Execution: the subagent_start / subagent_end field schema must match this file.