
Real-Time Streaming

Streaming provides real-time visibility into agent execution. The framework emits typed events for text generation, tool execution, sub-agent activity, custom events, and more.

Overview

When an agent executes, it produces a stream of chunks:

typescript
const handle = await executor.execute(agent, 'Research AI agents');
const stream = await handle.stream();

for await (const chunk of stream) {
  console.log(chunk.type, chunk);
}

Each chunk has:

  • type - Discriminator for the chunk kind
  • agentId - Which agent produced this chunk
  • agentType - The agent's type name
  • timestamp - When the chunk was created
  • Type-specific fields
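These shared fields can be modeled as a base shape that every chunk type extends. The sketch below is illustrative (the framework's actual exported type names may differ); it also shows how narrowing on `type` unlocks the type-specific fields:

```typescript
// Hypothetical base shape - each concrete chunk type documented on this
// page extends these common fields with its own discriminated `type`.
interface StreamChunkBase {
  type: string;      // Discriminator for the chunk kind
  agentId: string;   // Which agent produced this chunk
  agentType: string; // The agent's type name
  timestamp: number; // When the chunk was created
}

// Narrowing on `type` gives typed access to the type-specific fields:
function isTextDelta(
  chunk: StreamChunkBase
): chunk is StreamChunkBase & { delta: string } {
  return chunk.type === 'text_delta';
}
```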

Stream Chunk Types

text_delta

Incremental text from the LLM. Append deltas to build complete text.

typescript
interface TextDeltaChunk {
  type: 'text_delta';
  agentId: string;
  agentType: string;
  timestamp: number;
  delta: string; // Incremental text
}

Usage:

typescript
let fullText = '';

for await (const chunk of stream) {
  if (chunk.type === 'text_delta') {
    fullText += chunk.delta;
    process.stdout.write(chunk.delta); // Real-time display
  }
}

thinking

Reasoning/thinking content from LLMs that support it (Claude with extended thinking, OpenAI o-series).

typescript
interface ThinkingChunk {
  type: 'thinking';
  agentId: string;
  agentType: string;
  timestamp: number;
  content: string; // Thinking text
  isComplete: boolean; // false = streaming, true = complete block
}

Usage:

typescript
let thinkingBuffer = '';

for await (const chunk of stream) {
  if (chunk.type === 'thinking') {
    if (!chunk.isComplete) {
      // Streaming - accumulate
      thinkingBuffer += chunk.content;
    } else {
      // Complete block
      console.log('Thinking:', chunk.content);
      thinkingBuffer = '';
    }
  }
}

tool_start

Emitted when a tool begins execution.

typescript
interface ToolStartChunk {
  type: 'tool_start';
  agentId: string;
  agentType: string;
  timestamp: number;
  toolCallId: string; // Unique ID for this call
  toolName: string; // Tool being called
  arguments: Record<string, unknown>; // Arguments passed
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'tool_start') {
    console.log(`Starting: ${chunk.toolName}`);
    console.log(`Args: ${JSON.stringify(chunk.arguments)}`);
  }
}

tool_end

Emitted when a tool completes (success or failure).

typescript
interface ToolEndChunk {
  type: 'tool_end';
  agentId: string;
  agentType: string;
  timestamp: number;
  toolCallId: string; // Matches tool_start
  toolName: string;
  result: unknown; // Tool's return value
  error?: string; // Present if tool failed
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'tool_end') {
    if (chunk.error) {
      console.log(`${chunk.toolName} failed: ${chunk.error}`);
    } else {
      console.log(`${chunk.toolName} result:`, chunk.result);
    }
  }
}

subagent_start

Emitted when a sub-agent begins execution.

typescript
interface SubAgentStartChunk {
  type: 'subagent_start';
  agentId: string; // Parent agent ID
  agentType: string; // Parent agent type
  timestamp: number;
  subAgentType: string; // Sub-agent's type name
  subSessionId: string; // Sub-agent's unique session ID
  callId: string; // Tool call ID that spawned this
}

subagent_end

Emitted when a sub-agent completes.

typescript
interface SubAgentEndChunk {
  type: 'subagent_end';
  agentId: string; // Parent agent ID
  agentType: string; // Parent agent type
  timestamp: number;
  subAgentType: string;
  subSessionId: string;
  callId: string;
  result: unknown; // Sub-agent's output
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'subagent_start') {
    console.log(`Delegating to ${chunk.subAgentType}...`);
  }
  if (chunk.type === 'subagent_end') {
    console.log(`${chunk.subAgentType} completed:`, chunk.result);
  }
}

custom

Custom events emitted by tools via context.emit().

typescript
interface CustomEventChunk {
  type: 'custom';
  agentId: string;
  agentType: string;
  timestamp: number;
  eventName: string; // Event name from emit()
  data: unknown; // Event payload
}

Emitting custom events:

typescript
// In tool execute function
await context.emit('progress', { step: 1, total: 5 });
await context.emit('file_uploaded', { name: 'doc.pdf', size: 1024 });

Consuming custom events:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'custom') {
    switch (chunk.eventName) {
      case 'progress': {
        const { step, total } = chunk.data as { step: number; total: number };
        console.log(`Progress: ${step}/${total}`);
        break;
      }
      case 'file_uploaded': {
        const { name } = chunk.data as { name: string; size: number };
        console.log(`Uploaded: ${name}`);
        break;
      }
    }
  }
}

state_patch

RFC 6902 JSON Patch operations representing state changes.

typescript
interface StatePatchChunk {
  type: 'state_patch';
  agentId: string;
  agentType: string;
  timestamp: number;
  patches: JSONPatchOperation[];
}

interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace' | 'move' | 'copy' | 'test';
  path: string; // JSON Pointer path
  value?: unknown; // Value for add/replace
  from?: string; // Source for move/copy
}

Example patches:

typescript
// When tool does: draft.count = 5
{ op: 'replace', path: '/count', value: 5 }

// When tool does: draft.items.push('new') on an empty array
{ op: 'add', path: '/items/0', value: 'new' }

// When tool does: delete draft.temp
{ op: 'remove', path: '/temp' }

Applying patches (client-side):

typescript
import { applyPatch } from 'fast-json-patch';

let clientState = { count: 0, items: [] };

for await (const chunk of stream) {
  if (chunk.type === 'state_patch') {
    clientState = applyPatch(clientState, chunk.patches).newDocument;
    console.log('State updated:', clientState);
  }
}
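If you'd rather not add a dependency on fast-json-patch, the add/remove/replace operations shown above are simple enough to apply by hand. This is a sketch covering only those three ops (not the full RFC 6902 set, and without `test` validation):

```typescript
interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace' | 'move' | 'copy' | 'test';
  path: string;     // JSON Pointer (RFC 6901), e.g. '/items/0'
  value?: unknown;  // Value for add/replace
  from?: string;    // Source for move/copy
}

// Minimal applier for add/remove/replace. '~1' and '~0' in pointer
// segments unescape to '/' and '~' respectively.
function applyPatches(doc: any, patches: JSONPatchOperation[]): any {
  const result = structuredClone(doc);
  for (const patch of patches) {
    const keys = patch.path
      .split('/')
      .slice(1)
      .map((k) => k.replace(/~1/g, '/').replace(/~0/g, '~'));
    const last = keys.pop()!;
    // Walk to the parent of the target location
    let parent = result;
    for (const key of keys) parent = parent[key];
    if (patch.op === 'remove') {
      if (Array.isArray(parent)) parent.splice(Number(last), 1);
      else delete parent[last];
    } else if (patch.op === 'add' && Array.isArray(parent)) {
      const idx = last === '-' ? parent.length : Number(last);
      parent.splice(idx, 0, patch.value); // insert, '-' means append
    } else if (patch.op === 'add' || patch.op === 'replace') {
      parent[last] = patch.value;
    } else {
      throw new Error(`Unsupported op: ${patch.op}`);
    }
  }
  return result;
}
```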

error

Error events during execution.

typescript
interface ErrorChunk {
  type: 'error';
  agentId: string;
  agentType: string;
  timestamp: number;
  error: string; // Error message
  code?: string; // Error code for categorization
  recoverable: boolean; // Whether execution can continue
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'error') {
    if (chunk.recoverable) {
      console.warn(`Warning: ${chunk.error}`);
    } else {
      console.error(`Fatal: ${chunk.error}`);
      break;
    }
  }
}

output

Final structured output when the agent completes successfully.

typescript
interface OutputChunk {
  type: 'output';
  agentId: string;
  agentType: string;
  timestamp: number;
  output: unknown; // Validated against outputSchema
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'output') {
    // Agent completed with structured output
    const result = chunk.output as MyOutputType;
    console.log('Final result:', result);
  }
}

Interrupt/Resume Chunks

These chunks are emitted during agent interrupt and resume operations.

Session vs Run Identifiers

Stream chunks include both sessionId (via agentId) and runId fields:

  • sessionId (shown as agentId): Primary key for all state operations. A session contains all messages, custom state, and checkpoints for a conversation.
  • runId: Identifies a specific execution instance within a session. Multiple runs can occur within a single session (e.g., after interrupts or when continuing a conversation).

Use sessionId for state operations (loading messages, saving checkpoints). Use runId for tracking specific executions (logging, tracing, debugging).

run_interrupted

Emitted when an agent is interrupted by user request.

typescript
interface RunInterruptedChunk {
  type: 'run_interrupted';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string; // Run ID that was interrupted
  checkpointId: string | null; // Checkpoint created (null if failed)
  reason?: string; // Reason for interruption
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'run_interrupted') {
    console.log(`Agent interrupted: ${chunk.reason}`);
    console.log(`Resume from checkpoint: ${chunk.checkpointId}`);
  }
}

run_resumed

Emitted when an interrupted or paused agent resumes execution.

typescript
interface RunResumedChunk {
  type: 'run_resumed';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string; // Run ID that was resumed
  fromCheckpointId: string | null; // Checkpoint resumed from
  fromStepCount: number; // Step count at resume point
  mode: string; // Resume mode used
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'run_resumed') {
    console.log(`Resumed from step ${chunk.fromStepCount}`);
    console.log(`Mode: ${chunk.mode}`);
  }
}

run_paused

Emitted when an agent pauses, typically waiting for user confirmation.

typescript
interface RunPausedChunk {
  type: 'run_paused';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string;
  reason: string; // Why the agent paused
  pendingToolName?: string; // Tool waiting for confirmation
  pendingToolCallId?: string; // Tool call ID
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'run_paused') {
    console.log(`Agent paused: ${chunk.reason}`);
    if (chunk.pendingToolName) {
      console.log(`Waiting for approval: ${chunk.pendingToolName}`);
    }
  }
}

checkpoint_created

Emitted when a checkpoint is saved after a step completes.

typescript
interface CheckpointCreatedChunk {
  type: 'checkpoint_created';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string;
  checkpointId: string; // The new checkpoint ID
  stepCount: number; // Step when checkpoint was created
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'checkpoint_created') {
    console.log(`Checkpoint at step ${chunk.stepCount}: ${chunk.checkpointId}`);
  }
}

executor_superseded

Emitted when this executor is superseded by another (e.g., during concurrent resume).

typescript
interface ExecutorSupersededChunk {
  type: 'executor_superseded';
  agentId: string;
  agentType: string;
  timestamp: number;
  reason?: string;
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'executor_superseded') {
    console.log('Another executor took over - stopping');
    break;
  }
}

step_committed

Emitted when staged changes are atomically committed to state.

typescript
interface StepCommittedChunk {
  type: 'step_committed';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string;
  stepId: string; // Step that was committed
  checkpointId: string; // Checkpoint created by commit
}

step_discarded

Emitted when staged changes are discarded (on interrupt or failure).

typescript
interface StepDiscardedChunk {
  type: 'step_discarded';
  agentId: string;
  agentType: string;
  timestamp: number;
  runId: string;
  stepId: string;
  reason: string; // Why changes were discarded
}
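Neither chunk usually requires special handling in a UI, but together they are useful for auditing which steps actually landed. A small helper to tally them from a recorded chunk log (a sketch; chunk shapes follow the interfaces above):

```typescript
interface StepOutcome {
  committed: string[];                           // stepIds that were committed
  discarded: { stepId: string; reason: string }[]; // stepIds that were rolled back
}

// Tally step_committed / step_discarded chunks from a chunk log.
function tallySteps(chunks: Array<Record<string, any>>): StepOutcome {
  const outcome: StepOutcome = { committed: [], discarded: [] };
  for (const chunk of chunks) {
    if (chunk.type === 'step_committed') {
      outcome.committed.push(chunk.stepId);
    } else if (chunk.type === 'step_discarded') {
      outcome.discarded.push({ stepId: chunk.stepId, reason: chunk.reason });
    }
  }
  return outcome;
}
```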

stream_resync

Emitted when a stream is resynced after crash recovery, rollback, or branching. This notifies clients that the stream state has been reset to a previous checkpoint and they should refresh their UI.

typescript
interface StreamResyncChunk {
  type: 'stream_resync';
  agentId: string;
  agentType: string;
  timestamp: number;
  checkpointId: string; // Checkpoint that was restored
  stepCount: number; // Step count at the checkpoint
  messageCount: number; // Message count at the checkpoint
  fromSequence: number; // Stream sequence at the checkpoint
  reason: 'crash_recovery' | 'rollback' | 'branch';
}

When this is emitted:

  • crash_recovery: After a crash, the runtime recovers to the last checkpoint and emits this to notify clients
  • rollback: When explicitly rolling back to a previous checkpoint
  • branch: When creating a new execution branch from a checkpoint

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'stream_resync') {
    console.log(`Stream resynced: ${chunk.reason}`);
    console.log(`Restored to step ${chunk.stepCount}, message ${chunk.messageCount}`);

    // Refresh UI state from snapshot
    const snapshot = await fetch(`/api/chat/${sessionId}/snapshot`).then(r => r.json());
    setMessages(snapshot.messages);
  }
}

React Hook Integration:

The @helix-agents/ai-sdk/react package provides hooks for automatic handling:

typescript
import { useAutoResync } from '@helix-agents/ai-sdk/react';

// Automatic resync handling
useAutoResync(data, {
  snapshotUrl: `/api/chat/${sessionId}/snapshot`,
  setMessages,
  onResync: (event) => {
    toast.info(`Recovered from ${event.data.reason}`);
  },
});

See Frontend Integration for complete documentation of recovery hooks.

Complete Streaming Example

typescript
async function runWithStreaming() {
  const handle = await executor.execute(ResearchAgent, 'Research quantum computing');
  const stream = await handle.stream();

  if (!stream) {
    console.log('No stream available');
    return;
  }

  // Track state
  let currentTool = '';
  let toolDepth = 0;

  for await (const chunk of stream) {
    switch (chunk.type) {
      case 'text_delta':
        process.stdout.write(chunk.delta);
        break;

      case 'thinking':
        if (chunk.isComplete) {
          console.log('\n[Thinking]', chunk.content);
        }
        break;

      case 'tool_start':
        currentTool = chunk.toolName;
        console.log(`\n[→ ${chunk.toolName}]`, JSON.stringify(chunk.arguments));
        break;

      case 'tool_end':
        if (chunk.error) {
          console.log(`[✗ ${chunk.toolName}]`, chunk.error);
        } else {
          console.log(`[✓ ${chunk.toolName}]`);
        }
        break;

      case 'subagent_start':
        toolDepth++;
        console.log(`${'  '.repeat(toolDepth)}[Sub-agent: ${chunk.subAgentType}]`);
        break;

      case 'subagent_end':
        console.log(`${'  '.repeat(toolDepth)}[/${chunk.subAgentType}]`);
        toolDepth--;
        break;

      case 'custom':
        if (chunk.eventName === 'progress') {
          const { step, total } = chunk.data as { step: number; total: number };
          console.log(`[Progress: ${step}/${total}]`);
        }
        break;

      case 'state_patch':
        // Optionally apply patches to local state
        break;

      case 'error':
        console.error(`\n[Error]`, chunk.error);
        if (!chunk.recoverable) {
          console.error('Execution cannot continue');
        }
        break;

      case 'output':
        console.log('\n[Output]', JSON.stringify(chunk.output, null, 2));
        break;
    }
  }

  // Get final result
  const result = await handle.result();
  console.log('\nFinal status:', result.status);
}

Stream Filtering

The framework provides utilities for filtering streams.

By Chunk Type

typescript
import { filterStreamByType } from '@helix-agents/core';

const stream = await handle.stream();

// Only text deltas
const textStream = filterStreamByType(stream, 'text_delta');
for await (const chunk of textStream) {
  process.stdout.write(chunk.delta);
}

// Only tool events
const toolStream = filterStreamByType(stream, ['tool_start', 'tool_end']);
for await (const chunk of toolStream) {
  console.log(chunk.type, chunk.toolName);
}

By Agent

typescript
import { filterStreamByAgent } from '@helix-agents/core';

const stream = await handle.stream();

// Only chunks from specific agent
const agentStream = filterStreamByAgent(stream, 'run-123');
for await (const chunk of agentStream) {
  console.log(chunk);
}

// Exclude sub-agent chunks
const parentOnly = filterStreamByAgent(stream, handle.sessionId);

Custom Filters

typescript
async function* filterStream<T>(
  stream: AsyncIterable<T>,
  predicate: (chunk: T) => boolean
): AsyncIterable<T> {
  for await (const chunk of stream) {
    if (predicate(chunk)) {
      yield chunk;
    }
  }
}

// Example: only errors and outputs
const importantStream = filterStream(
  stream,
  (chunk) => chunk.type === 'error' || chunk.type === 'output'
);

Stream Control Messages

In addition to data chunks, streams can include control messages:

stream_end

Signals the stream is complete:

typescript
interface StreamEnd {
  type: 'stream_end';
  streamId: string;
  timestamp: number;
  finalOutput?: unknown;
}

This is a control message, not a data chunk. Most consumers don't need to handle it directly - the async iterator completes when the stream ends.

Frontend Integration

For web applications, streams are typically delivered via SSE:

typescript
// Backend
app.get('/agent/stream/:sessionId', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const stream = await getStream(req.params.sessionId);

  for await (const chunk of stream) {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }

  res.end();
});

// Frontend
const eventSource = new EventSource(`/agent/stream/${sessionId}`);

eventSource.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  handleChunk(chunk);
};

See Frontend Integration for complete patterns.

Resumable Streams

Streams support resumption from a specific sequence number, enabling:

  • Page refreshes - User refreshes browser, reconnects without losing progress
  • Network interruptions - Temporary disconnect, automatic reconnection
  • Mobile apps - App backgrounded and restored
  • Long-running agents - Hours-long research or analysis tasks

Basic Resumption

typescript
// Read from offset
const stream = await handle.stream({ fromOffset: 100 });

// Each chunk includes sequence info for resumability
for await (const chunk of stream) {
  // Store sequence for resumption (localStorage values must be strings)
  localStorage.setItem('lastSeq', String(chunk._sequence));
}

// Resume later
const lastSeq = localStorage.getItem('lastSeq');
const resumedStream = await handle.stream({
  fromOffset: parseInt(lastSeq ?? '0', 10),
});

For production use, the recommended approach combines snapshots with sequence-based resumption. This eliminates race conditions and minimizes data transfer.

typescript
import { createFrontendHandler } from '@helix-agents/ai-sdk';

// Create handler
const handler = createFrontendHandler({
  streamManager,
  executor,
  agent: myAgent,
  stateStore,
});

// Get snapshot (implements "sequence last" pattern)
const snapshot = await handler.getSnapshot(sessionId);

// snapshot contains:
// - state: Agent state
// - messages: UI messages for initialMessages
// - streamSequence: Resume position
// - timestamp: When snapshot was created
// - status: 'active' | 'paused' | 'ended' | 'failed'

// Initialize client with snapshot
const { messages } = useChat({
  initialMessages: snapshot.messages,
  resume: snapshot.status === 'active', // Only resume if stream is active
});

Mid-Stream Page Refresh

A key feature of the snapshot system is partial content preservation. When a user refreshes the page during active streaming:

  1. Messages are only saved to the database after each step completes
  2. During streaming, content (text, tool calls) exists only in stream chunks
  3. The server can reconstruct partial content from stream chunks
  4. Result: User sees all content that was visible before refresh
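Conceptually, reconstructing the partial assistant text is just re-accumulating the text_delta chunks recorded since the last committed step. This is a simplified sketch (the real handler also rebuilds tool-call and reasoning parts):

```typescript
// Rebuild partial assistant text from a log of stream chunks recorded
// after the last committed step. Chunk shapes follow the interfaces above.
function rebuildPartialText(chunks: Array<Record<string, any>>): string {
  let lastCommit = -1;
  chunks.forEach((chunk, i) => {
    if (chunk.type === 'step_committed') lastCommit = i;
  });
  return chunks
    .slice(lastCommit + 1)                         // only uncommitted chunks
    .filter((chunk) => chunk.type === 'text_delta')
    .map((chunk) => chunk.delta)
    .join('');
}
```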

Content Replay (Default):

By default, the @helix-agents/ai-sdk package uses content replay to handle mid-stream refresh. Instead of including partial content in initialMessages, the server replays it as stream events when the client reconnects. This prevents duplicate text that would otherwise occur due to how the AI SDK handles text-start events.

typescript
// With content replay (default), partial content is NOT in messages
const snapshot = await handler.getSnapshot(sessionId);
// snapshot.messages contains only completed messages

// When client reconnects, replay events are emitted first:
// [replay] text-start, text-delta("Hello ")  <- partial content
// [live]   text-delta("world"), text-end     <- new content

See Content Replay for configuration options.

Alternative Mode (Partial Content in Messages):

If you disable content replay or explicitly request partial content, it's included in the messages:

typescript
// Option 1: Disable content replay globally
const handler = createFrontendHandler({
  // ...
  contentReplay: { enabled: false },
});

// Option 2: Override per-snapshot
const snapshot = await handler.getSnapshot(sessionId, {
  includePartialContent: true,
});

// Result: partial messages have id ending in '-partial'
// [
//   { id: 'msg-0', role: 'user', parts: [...] },
//   { id: 'msg-1', role: 'assistant', parts: [...] }, // complete
//   { id: 'msg-2-partial', role: 'assistant', parts: [...] } // partial
// ]

The partial message includes:

  • Text: Accumulated from text_delta chunks
  • Tool calls: With current state (executing, completed, error)
  • Reasoning: Extended thinking content

Why Snapshot + Sequence Works

The key insight: State is saved BEFORE events are emitted. The snapshot endpoint loads state first and gets the sequence number last, so:

  1. State at T1 reflects events up to some sequence N
  2. Sequence at T2 (captured after) is >= N
  3. Client resumes from T2's sequence
  4. Events between N and T2 may appear in both state and stream (overlap)
  5. Deduplication removes duplicates - no data is lost

This is called the "sequence last" pattern and is deterministically correct.
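The deduplication step amounts to dropping any replayed chunk whose sequence is at or below the snapshot's streamSequence (assuming each chunk carries a numeric `_sequence` field, as shown under Resumable Streams):

```typescript
// Drop replayed chunks already reflected in the snapshot state.
// snapshotSequence is the `streamSequence` captured by getSnapshot().
function dedupeChunks<T extends { _sequence: number }>(
  chunks: T[],
  snapshotSequence: number
): T[] {
  return chunks.filter((chunk) => chunk._sequence > snapshotSequence);
}
```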

Stream Status

The status field tells clients whether to attempt stream resumption:

  • active - Stream is running; set resume: true
  • paused - Stream is paused; may resume later
  • ended - Completed successfully; no SSE needed
  • failed - Stream failed; handle error state
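A small client-side guard mapping each status to an action (a sketch using the status values above):

```typescript
type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';

// Decide what the client should do based on the snapshot's status field.
function resumeAction(status: StreamStatus): 'resume' | 'wait' | 'done' | 'error' {
  switch (status) {
    case 'active':
      return 'resume'; // open SSE and continue streaming
    case 'paused':
      return 'wait';   // may resume later (e.g. after approval)
    case 'ended':
      return 'done';   // render final state, no SSE needed
    case 'failed':
      return 'error';  // surface the failure to the user
  }
}
```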

SSE Event IDs

When using the AI SDK integration, streams automatically include SSE id: fields:

id: 42
data: {"type":"text-delta","delta":"Hello"}

id: 43
data: {"type":"text-delta","delta":" world"}

On disconnect, browsers reconnect with Last-Event-ID: 43, and the handler resumes from that position.
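The wire format above can be produced with a small formatter, and pairing it with the Last-Event-ID request header on reconnect gives resumption essentially for free. This is a sketch of the mechanics (the AI SDK integration handles this for you):

```typescript
// Format one SSE event with an id: field for Last-Event-ID resumption.
function formatSseEvent(id: number, data: unknown): string {
  return `id: ${id}\ndata: ${JSON.stringify(data)}\n\n`;
}

// On reconnect, the browser sends `Last-Event-ID: <n>`; the server
// should resume the stream from sequence n + 1 (or 0 if absent).
function resumeOffset(lastEventId: string | null): number {
  if (lastEventId === null || lastEventId === '') return 0;
  const parsed = Number(lastEventId);
  return Number.isFinite(parsed) ? parsed + 1 : 0;
}
```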

For complete patterns, including SSR with Next.js, see Frontend Integration.

Streaming Metadata

When using the AI SDK integration (@helix-agents/ai-sdk), you can attach custom metadata to stream start and finish events. This is useful for request tracing, timing metrics, model attribution, and passing context to clients.

Configuration

Configure metadata in the FrontendHandler:

typescript
import { createFrontendHandler } from '@helix-agents/ai-sdk';

const handler = createFrontendHandler({
  streamManager,
  executor,
  agent: myAgent,
  stateStore,
  transformerOptions: {
    // Static metadata - same for all streams
    startMetadata: { environment: 'production', version: '1.0' },
    finishMetadata: { service: 'chat-api' },

    // Or dynamic metadata - computed per stream:
    // startMetadata: (agentId) => ({
    //   agentId,
    //   requestId: generateRequestId(),
    //   startedAt: Date.now(),
    // }),
    // finishMetadata: (agentId) => ({
    //   agentId,
    //   finishedAt: Date.now(),
    // }),
  },
});

Stream Events

Metadata appears in the messageMetadata field of SSE start and finish events:

data: {"type":"start","messageId":"msg-123","messageMetadata":{"requestId":"req-001","startedAt":1234567890}}

data: {"type":"finish","messageId":"msg-123","finishReason":"stop","messageMetadata":{"model":"claude-3","totalTokens":150}}

Use Cases

  • Request tracing - start: { requestId, traceId }; finish: { requestId }
  • Timing metrics - start: { startedAt }; finish: { duration, finishedAt }
  • Model attribution - finish: { model, provider }
  • Token counting - finish: { inputTokens, outputTokens }
  • User context - start: { userId, sessionId }
  • Feature flags - start: { features: [...] }

User Message Metadata

When using the AI SDK, you can also attach metadata to user messages:

typescript
// Frontend: include metadata in request body
const { messages } = useChat({
  api: '/api/chat',
  body: {
    metadata: {
      source: 'web-ui',
      userId: currentUser.id,
      sessionId: sessionId,
    },
  },
});

This metadata is persisted with the user message and available when loading message history via handler.getMessages().

See the AI SDK documentation for complete frontend integration patterns.

Best Practices

1. Handle All Chunk Types

Defensive programming for unknown chunk types:

typescript
for await (const chunk of stream) {
  switch (chunk.type) {
    case 'text_delta':
    case 'tool_start':
    case 'tool_end':
      // Handle known types
      break;
    default:
      // Log unknown types for debugging
      console.log('Unknown chunk type:', chunk.type);
  }
}

2. Buffer Text Deltas

Don't update UI on every delta - batch them:

typescript
let buffer = '';
let timeout: NodeJS.Timeout | undefined;

for await (const chunk of stream) {
  if (chunk.type === 'text_delta') {
    buffer += chunk.delta;

    // Debounce UI updates
    clearTimeout(timeout);
    timeout = setTimeout(() => {
      updateUI(buffer);
      buffer = '';
    }, 50);
  }
}

// Flush remaining text once the stream ends
clearTimeout(timeout);
if (buffer) updateUI(buffer);

3. Track Tool Call IDs

Match tool_start with tool_end:

typescript
const pendingTools = new Map<string, ToolStartChunk>();

for await (const chunk of stream) {
  if (chunk.type === 'tool_start') {
    pendingTools.set(chunk.toolCallId, chunk);
  }
  if (chunk.type === 'tool_end') {
    const start = pendingTools.get(chunk.toolCallId);
    if (start) {
      const duration = chunk.timestamp - start.timestamp;
      console.log(`${chunk.toolName} took ${duration}ms`);
      pendingTools.delete(chunk.toolCallId);
    }
  }
}

4. Handle Errors Gracefully

Distinguish recoverable from fatal errors:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'error') {
    if (chunk.recoverable) {
      // Show warning, continue streaming
      showWarning(chunk.error);
    } else {
      // Show error, stop streaming
      showError(chunk.error);
      break;
    }
  }
}


Released under the MIT License.