Stream Protocol

This document describes the internal streaming protocol used by Helix Agents for real-time event communication.

Overview

The stream protocol enables real-time communication of:

  • Text generation - Token-by-token LLM output
  • Reasoning/thinking - Internal reasoning traces
  • Tool execution - Start/end events for tools
  • Sub-agent activity - Delegation to child agents
  • State changes - RFC 6902 patches
  • Custom events - Application-specific data
  • Errors and output - Error events and the final structured result

Chunk Types

TextDeltaChunk

Token-by-token text from the LLM.

typescript
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string; // The text delta content
  agentId: string; // Run ID
  agentType?: string; // Agent type name
  timestamp: number; // Unix timestamp (ms)
}

Example:

json
{
  "type": "text_delta",
  "delta": "Hello, ",
  "agentId": "run-abc123",
  "agentType": "assistant",
  "timestamp": 1702329600000
}
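
Since each chunk carries only a fragment, a consumer usually concatenates deltas to rebuild the full message. A minimal sketch of that, assuming chunks arrive in order; `accumulateText` is a hypothetical helper, not part of @helix-agents/core, and the interface is a local copy of the one above:

```typescript
// Local copy of the interface above so the sketch is self-contained.
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

// Hypothetical helper: concatenates deltas per agentId, in arrival order.
function accumulateText(chunks: TextDeltaChunk[]): Map<string, string> {
  const textByAgent = new Map<string, string>();
  for (const chunk of chunks) {
    const previous = textByAgent.get(chunk.agentId) ?? '';
    textByAgent.set(chunk.agentId, previous + chunk.delta);
  }
  return textByAgent;
}
```

Keying by `agentId` keeps output from different runs separate when several agents write to the same stream.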

ThinkingChunk

Reasoning/thinking content from models that support it.

typescript
interface ThinkingChunk {
  type: 'thinking';
  content: string; // Thinking content
  isComplete: boolean; // True if thinking is complete
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "thinking",
  "content": "Let me analyze this step by step...",
  "isComplete": false,
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}

ToolStartChunk

Emitted when a tool invocation begins.

typescript
interface ToolStartChunk {
  type: 'tool_start';
  toolCallId: string; // Unique ID from LLM
  toolName: string; // Tool name
  arguments: unknown; // Tool input arguments
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "tool_start",
  "toolCallId": "call_xyz789",
  "toolName": "web_search",
  "arguments": { "query": "TypeScript tutorials" },
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}

ToolEndChunk

Emitted when a tool invocation completes.

typescript
interface ToolEndChunk {
  type: 'tool_end';
  toolCallId: string; // Matches tool_start
  result: unknown; // Tool return value
  success: boolean; // Whether execution succeeded
  error?: string; // Error message if failed
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "tool_end",
  "toolCallId": "call_xyz789",
  "result": { "results": ["Tutorial 1", "Tutorial 2"] },
  "success": true,
  "agentId": "run-abc123",
  "timestamp": 1702329601000
}
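
Because a `tool_end` chunk carries the same `toolCallId` as its `tool_start`, a consumer can pair the two, for example to measure how long each call took. The following is a sketch under that assumption; `ToolCallSummary` and `summarizeToolCalls` are hypothetical names, and the interfaces are local copies of the ones above:

```typescript
interface ToolStartChunk {
  type: 'tool_start';
  toolCallId: string;
  toolName: string;
  arguments: unknown;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

interface ToolEndChunk {
  type: 'tool_end';
  toolCallId: string;
  result: unknown;
  success: boolean;
  error?: string;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

// Hypothetical summary shape, used only in this sketch.
interface ToolCallSummary {
  toolCallId: string;
  toolName: string;
  success: boolean;
  durationMs: number;
}

function summarizeToolCalls(chunks: Array<ToolStartChunk | ToolEndChunk>): ToolCallSummary[] {
  const pending = new Map<string, ToolStartChunk>();
  const summaries: ToolCallSummary[] = [];
  for (const chunk of chunks) {
    if (chunk.type === 'tool_start') {
      pending.set(chunk.toolCallId, chunk);
      continue;
    }
    const start = pending.get(chunk.toolCallId);
    if (!start) continue; // tool_end without a seen tool_start, e.g. the reader joined late
    pending.delete(chunk.toolCallId);
    summaries.push({
      toolCallId: chunk.toolCallId,
      toolName: start.toolName,
      success: chunk.success,
      durationMs: chunk.timestamp - start.timestamp,
    });
  }
  return summaries;
}
```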

SubAgentStartChunk

Emitted when a sub-agent begins execution.

typescript
interface SubAgentStartChunk {
  type: 'subagent_start';
  agentId: string; // Parent agent's session ID
  agentType: string; // Parent agent's type
  timestamp: number;
  step: number;
  subAgentType: string; // Sub-agent type (e.g., 'researcher')
  subSessionId: string; // Sub-agent's session ID (use to correlate sub-agent chunks)
  callId: string; // Tool call ID that spawned this sub-agent
}

SubAgentEndChunk

Emitted when a sub-agent completes.

typescript
interface SubAgentEndChunk {
  type: 'subagent_end';
  agentId: string; // Parent agent's session ID
  agentType: string; // Parent agent's type
  timestamp: number;
  step: number;
  subAgentType: string; // Sub-agent type
  subSessionId: string; // Sub-agent's session ID
  callId: string; // Tool call ID
  result: unknown; // Sub-agent output
}
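
The `subSessionId` field is what lets a consumer attribute later chunks to a particular sub-agent. The sketch below groups chunks that way. It assumes that chunks emitted by a sub-agent carry an `agentId` equal to the `subSessionId` announced in `subagent_start`; that is an inference from the field comments, not something this document states outright. `BaseChunk`, `isSubAgentStart`, and `groupBySubAgent` are hypothetical names:

```typescript
// Minimal common shape shared by every chunk in this sketch.
interface BaseChunk {
  type: string;
  agentId: string;
  timestamp: number;
}

interface SubAgentStartChunk extends BaseChunk {
  type: 'subagent_start';
  agentType: string;
  step: number;
  subAgentType: string;
  subSessionId: string;
  callId: string;
}

function isSubAgentStart(chunk: BaseChunk): chunk is SubAgentStartChunk {
  return chunk.type === 'subagent_start';
}

// Groups chunks under the sub-agent session that produced them.
// ASSUMPTION: a sub-agent's own chunks use agentId === subSessionId.
function groupBySubAgent(chunks: BaseChunk[]): Map<string, BaseChunk[]> {
  const groups = new Map<string, BaseChunk[]>();
  for (const chunk of chunks) {
    if (isSubAgentStart(chunk)) {
      groups.set(chunk.subSessionId, []);
      continue;
    }
    const group = groups.get(chunk.agentId);
    if (group) group.push(chunk);
  }
  return groups;
}
```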

CustomEventChunk

Application-specific events from tools.

typescript
interface CustomEventChunk {
  type: 'custom';
  eventName: string; // Custom event name
  data: unknown; // Event payload
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "custom",
  "eventName": "search_progress",
  "data": { "processed": 50, "total": 100 },
  "agentId": "run-abc123",
  "timestamp": 1702329600500
}

StatePatchChunk

RFC 6902 JSON Patches for state updates.

typescript
interface StatePatchChunk {
  type: 'state_patch';
  patches: JSONPatchOperation[]; // RFC 6902 operations
  agentId: string;
  agentType?: string;
  timestamp: number;
}

interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace';
  path: string; // JSON Pointer path
  value?: unknown; // For add/replace
}

Example:

json
{
  "type": "state_patch",
  "patches": [
    { "op": "add", "path": "/notes/-", "value": { "content": "Note 1" } },
    { "op": "replace", "path": "/searchCount", "value": 5 }
  ],
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}
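
On the receiving side, a client has to apply these operations to its local copy of the state. The following is a minimal sketch of that, covering only the three operations listed above; `parsePointer` and `applyPatches` are hypothetical names, and the sketch skips the existence checks that RFC 6902 requires for `remove` and `replace`:

```typescript
interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace';
  path: string;
  value?: unknown;
}

// Split a JSON Pointer (RFC 6901) into tokens, decoding ~1 as "/" and ~0 as "~".
function parsePointer(path: string): string[] {
  if (!path.startsWith('/')) throw new Error(`Unsupported JSON Pointer: "${path}"`);
  return path
    .slice(1)
    .split('/')
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'));
}

// Returns a patched copy; the input state is left untouched.
function applyPatches<T>(state: T, patches: JSONPatchOperation[]): T {
  const root: any = JSON.parse(JSON.stringify(state)); // state is assumed to be plain JSON
  for (const { op, path, value } of patches) {
    const tokens = parsePointer(path);
    const key = tokens.pop() as string;
    // Walk down to the container that holds the target location.
    let parent: any = root;
    for (const token of tokens) {
      parent = parent[token];
      if (parent === null || typeof parent !== 'object') {
        throw new Error(`Path not found: ${path}`);
      }
    }
    if (Array.isArray(parent)) {
      // "-" addresses the position after the last element.
      const index = key === '-' ? parent.length : Number(key);
      if (op === 'add') parent.splice(index, 0, value);
      else if (op === 'remove') parent.splice(index, 1);
      else parent[index] = value;
    } else if (op === 'remove') {
      delete parent[key];
    } else {
      parent[key] = value;
    }
  }
  return root as T;
}
```

Working on a copy keeps the previous state available for comparison, which is convenient in UI frameworks that detect changes by reference.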

ErrorChunk

Error events during execution.

typescript
interface ErrorChunk {
  type: 'error';
  error: string; // Error message
  code?: string; // Error classification code (see ErrorCode in @helix-agents/core)
  recoverable: boolean; // Whether the error is recoverable/retryable
  agentId: string;
  agentType?: string;
  timestamp: number;
  step: number; // Step number when error occurred
}

When the error originates from a classified HelixError, the code field contains the error code (e.g., provider_overloaded, provider_rate_limited) and recoverable reflects whether the operation can be retried. For unclassified errors, code is omitted and recoverable defaults to false.

See Error Handling Guide for the complete error classification system.
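
A consumer can use `recoverable` and `code` to decide whether to retry. The sketch below shows one possible policy; `retryDelayMs`, the attempt limit, and the delay values are illustrative assumptions, not behavior defined by Helix Agents:

```typescript
// Local copy of the interface above so the sketch is self-contained.
interface ErrorChunk {
  type: 'error';
  error: string;
  code?: string;
  recoverable: boolean;
  agentId: string;
  agentType?: string;
  timestamp: number;
  step: number;
}

// Hypothetical policy: returns how long to wait before retrying,
// or null when the run should not be retried.
function retryDelayMs(chunk: ErrorChunk, attempt: number, maxAttempts = 3): number | null {
  if (!chunk.recoverable || attempt >= maxAttempts) return null;
  // Assumed base delays: back off harder when the provider is rate limiting.
  const baseMs = chunk.code === 'provider_rate_limited' ? 2000 : 500;
  return baseMs * 2 ** attempt; // exponential backoff, attempt is 0-based
}
```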

OutputChunk

Final structured output.

typescript
interface OutputChunk {
  type: 'output';
  output: unknown; // The structured output
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Stream Lifecycle

Creation

typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);

Writing

typescript
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: runId,
  timestamp: Date.now(),
});

await writer.close();

Reading

typescript
const reader = await streamManager.createReader(streamId);

for await (const chunk of reader) {
  // Process chunk
}

Ending

typescript
// Normal completion
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// With error
await streamManager.failStream(streamId, 'Something went wrong');

Stream End Sentinel

A special marker indicates stream completion:

typescript
const STREAM_END_SENTINEL = Symbol.for('helix.stream.end');

interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

Readers yield chunks until they encounter the sentinel.
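
To illustrate that rule, the sketch below drains a sequence of items and stops at the first sentinel. It detects the sentinel by its `__streamEnd` property, which is an assumption based on the interface above; `Chunk`, `isEndSentinel`, and `drain` are hypothetical names:

```typescript
interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

// Minimal chunk shape for this sketch.
interface Chunk {
  type: string;
  agentId: string;
  timestamp: number;
}

// ASSUMPTION: a sentinel is recognizable by __streamEnd === true.
function isEndSentinel(item: unknown): item is StreamEndSentinel {
  return (
    typeof item === 'object' &&
    item !== null &&
    (item as { __streamEnd?: unknown }).__streamEnd === true
  );
}

// Collects chunks up to the first sentinel; anything after it is ignored.
function drain(items: Array<Chunk | StreamEndSentinel>): { chunks: Chunk[]; end?: StreamEndSentinel } {
  const chunks: Chunk[] = [];
  for (const item of items) {
    if (isEndSentinel(item)) return { chunks, end: item };
    chunks.push(item);
  }
  return { chunks }; // no sentinel seen: the stream is still active
}
```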

Resumability

Sequence Numbers

Each chunk can have a sequence number for resumability:

typescript
interface StoredChunk {
  chunk: StreamChunk;
  sequence: number; // Monotonically increasing
}

Resumable Reader

typescript
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100, // Resume from sequence 100
});

for await (const { chunk, sequence } of reader) {
  // sequence can be used as Last-Event-ID
}
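
When a client reconnects, it may receive chunks it has already processed, depending on whether the resume point is treated as inclusive. A defensive client can drop anything at or below the last sequence it saw. The sketch below shows that idea; `dropAlreadySeen` is a hypothetical helper and `StreamChunk` is reduced to a minimal local shape:

```typescript
// Minimal stand-in for the StreamChunk union.
interface StreamChunk {
  type: string;
  agentId: string;
  timestamp: number;
}

interface StoredChunk {
  chunk: StreamChunk;
  sequence: number;
}

// Keeps only chunks newer than lastSeen and reports the new high-water mark.
function dropAlreadySeen(
  incoming: StoredChunk[],
  lastSeen?: number
): { fresh: StoredChunk[]; lastSeen?: number } {
  const fresh: StoredChunk[] = [];
  let highWater = lastSeen;
  for (const stored of incoming) {
    if (highWater !== undefined && stored.sequence <= highWater) continue; // duplicate after resume
    fresh.push(stored);
    highWater = stored.sequence;
  }
  return { fresh, lastSeen: highWater };
}
```

The returned `lastSeen` value is what a client would persist and send back when it resumes.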

Stream Status

typescript
type ResumableStreamStatus = 'active' | 'ended' | 'failed';

interface ResumableStreamReader {
  status: ResumableStreamStatus;
  [Symbol.asyncIterator](): AsyncIterator<{ chunk: StreamChunk; sequence: number }>;
}

Wire Format

For transport over HTTP (SSE), events are serialized:

typescript
// Discriminated union over the `type` field
type StreamEvent = StreamChunkEvent | StreamEndEvent | StreamFailEvent;

interface StreamChunkEvent {
  type: 'chunk';
  chunk: StreamChunk;
  sequence?: number;
}

interface StreamEndEvent {
  type: 'end';
}

interface StreamFailEvent {
  type: 'fail';
  error: string;
}

SSE Encoding

id: 1
data: {"type":"chunk","chunk":{"type":"text_delta","delta":"Hello"},"sequence":1}

id: 2
data: {"type":"chunk","chunk":{"type":"text_delta","delta":" world"},"sequence":2}

id: 3
data: {"type":"end"}
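
The framing above can be produced and consumed with a few lines of string handling. The following sketch assumes one JSON document per `data:` line and a blank line between events, as in the example; `encodeSSE` and `decodeSSE` are hypothetical names, and the chunk payload is typed loosely to keep the block self-contained:

```typescript
type StreamEvent =
  | { type: 'chunk'; chunk: Record<string, unknown>; sequence?: number }
  | { type: 'end' }
  | { type: 'fail'; error: string };

// Serialize one event as an SSE message. JSON.stringify never emits raw
// newlines, so the payload always fits on a single data: line.
function encodeSSE(event: StreamEvent, id: number): string {
  return `id: ${id}\ndata: ${JSON.stringify(event)}\n\n`;
}

// Parse a buffer of complete SSE messages back into events.
function decodeSSE(raw: string): Array<{ id?: string; event: StreamEvent }> {
  const decoded: Array<{ id?: string; event: StreamEvent }> = [];
  for (const block of raw.split('\n\n')) {
    let id: string | undefined;
    const dataLines: string[] = [];
    for (const line of block.split('\n')) {
      if (line.startsWith('id:')) id = line.slice(3).trim();
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).replace(/^ /, ''));
    }
    if (dataLines.length === 0) continue; // skip empty trailing blocks
    decoded.push({ id, event: JSON.parse(dataLines.join('\n')) as StreamEvent });
  }
  return decoded;
}
```

In a browser, `EventSource` performs this parsing itself and exposes the `id` value as `lastEventId`, so a hand-written decoder like this is mainly useful in tests or non-browser clients.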

Filtering Streams

Utility functions for stream manipulation:

typescript
import {
  filterByAgentId,
  filterByAgentType,
  filterByType,
  excludeTypes,
  filterWith,
  combineStreams,
  take,
  skip,
  collectText,
  collectAll,
} from '@helix-agents/core';

// Filter by agent
const agentChunks = filterByAgentId(stream, 'run-123');

// Only text chunks
const textChunks = filterByType(stream, ['text_delta']);

// Exclude thinking
const noThinking = excludeTypes(stream, ['thinking']);

// Custom filter
const important = filterWith(stream, (chunk) => chunk.type === 'error' || chunk.type === 'output');

// Combine multiple streams
const combined = combineStreams([stream1, stream2]);

// Collect all text
const fullText = await collectText(stream);
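
For readers curious how utilities of this kind can work, the sketch below builds two of them on async generators. It is an illustration only, not the actual @helix-agents/core implementation; `filterByTypeSketch`, `collectTextSketch`, and `fromArray` are hypothetical names, and `Chunk` is a reduced local shape:

```typescript
// Reduced chunk shape for this sketch.
interface Chunk {
  type: string;
  agentId: string;
  timestamp: number;
  delta?: string;
}

// Lazily passes through only the chunks whose type is in `types`.
async function* filterByTypeSketch(
  stream: AsyncIterable<Chunk>,
  types: string[]
): AsyncGenerator<Chunk> {
  const wanted = new Set(types);
  for await (const chunk of stream) {
    if (wanted.has(chunk.type)) yield chunk;
  }
}

// Consumes the whole stream and joins the text of every text_delta chunk.
async function collectTextSketch(stream: AsyncIterable<Chunk>): Promise<string> {
  let text = '';
  for await (const chunk of stream) {
    if (chunk.type === 'text_delta' && typeof chunk.delta === 'string') text += chunk.delta;
  }
  return text;
}

// Test helper: turns an array into an async stream.
async function* fromArray(chunks: Chunk[]): AsyncGenerator<Chunk> {
  for (const chunk of chunks) yield chunk;
}
```

Because the filter is a generator, it does no work until the consumer iterates, so filters can be chained without buffering the stream.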

Type Guards

Runtime type checking for chunks:

typescript
import {
  isTextDeltaChunk,
  isThinkingChunk,
  isToolStartChunk,
  isToolEndChunk,
  isSubAgentStartChunk,
  isSubAgentEndChunk,
  isCustomEventChunk,
  isStatePatchChunk,
  isErrorChunk,
  isOutputChunk,
  isStreamEnd,
} from '@helix-agents/core';

for await (const chunk of stream) {
  if (isTextDeltaChunk(chunk)) {
    process.stdout.write(chunk.delta);
  } else if (isToolStartChunk(chunk)) {
    console.log(`Tool: ${chunk.toolName}`);
  }
}

Validation

Zod schemas for runtime validation:

typescript
import { StreamChunkSchema, StreamMessageSchema } from '@helix-agents/core';

// Validate a chunk
const result = StreamChunkSchema.safeParse(data);
if (result.success) {
  const chunk: StreamChunk = result.data;
}

Implementation Notes

Memory Streams

In-memory streams use arrays and async iterators:

typescript
class InMemoryStreamManager {
  private streams = new Map<
    string,
    {
      chunks: StreamChunk[];
      readers: Set<() => void>;
      status: 'active' | 'ended' | 'failed';
    }
  >();
}
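
The `readers` set of callbacks suggests a wake-up pattern: a reader that has caught up parks a callback, and the writer invokes it when new data arrives. The sketch below shows that pattern for a single stream. It is an illustration under that assumption, not the real `InMemoryStreamManager`; `MiniStream` and the reduced `Chunk` shape are hypothetical:

```typescript
interface Chunk {
  type: string;
  agentId: string;
  timestamp: number;
}

class MiniStream {
  private chunks: Chunk[] = [];
  private status: 'active' | 'ended' = 'active';
  private wakeups = new Set<() => void>();

  write(chunk: Chunk): void {
    if (this.status !== 'active') throw new Error('stream is closed');
    this.chunks.push(chunk);
    this.notify();
  }

  end(): void {
    this.status = 'ended';
    this.notify();
  }

  // Wake every parked reader once; each re-checks the buffer itself.
  private notify(): void {
    const pending = Array.from(this.wakeups);
    this.wakeups.clear();
    for (const wake of pending) wake();
  }

  // Each reader replays from the start, then waits for new chunks.
  async *read(): AsyncGenerator<Chunk> {
    let index = 0;
    while (true) {
      while (index < this.chunks.length) yield this.chunks[index++];
      if (this.status !== 'active') return;
      await new Promise<void>((resolve) => this.wakeups.add(resolve));
    }
  }
}
```

Keeping the full chunk array is what allows late readers to replay from the beginning, at the cost of memory that grows with the stream.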

Redis Streams

The Redis implementation uses Redis Streams (XADD/XREAD):

typescript
// Writing
await redis.xadd(
  `stream:${streamId}`,
  '*',
  'chunk',
  JSON.stringify(chunk),
  'sequence',
  sequence.toString()
);

// Reading with blocking
const entries = await redis.xread('BLOCK', 5000, 'STREAMS', `stream:${streamId}`, lastId);

Durable Object Streams

Cloudflare Durable Objects provide coordination:

typescript
class StreamServer {
  private chunks: StreamChunk[] = [];
  private waiters: Set<(chunk: StreamChunk) => void> = new Set();

  async write(chunk: StreamChunk) {
    this.chunks.push(chunk);
    for (const waiter of this.waiters) {
      waiter(chunk);
    }
  }
}

Released under the MIT License.