Real-Time Streaming

Streaming provides real-time visibility into agent execution. The framework emits typed events for text generation, tool execution, sub-agent activity, custom events, and more.

Overview

When an agent executes, it produces a stream of chunks:

typescript
const handle = await executor.execute(agent, 'Research AI agents');
const stream = await handle.stream();

for await (const chunk of stream) {
  console.log(chunk.type, chunk);
}

Each chunk has:

  • type - Discriminator for the chunk kind
  • agentId - Which agent produced this chunk
  • agentType - The agent's type name
  • timestamp - When the chunk was created
  • Type-specific fields
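The shared fields can be modeled as a base interface that each chunk type extends. The sketch below is an assumption about how the types fit together, not the framework's actual declarations: `BaseChunk`, `StreamChunk`, and `describeChunk` are hypothetical names, and only two variants are shown.

```typescript
// Hypothetical base shape; field names follow the list above.
interface BaseChunk {
  agentId: string;
  agentType: string;
  timestamp: number;
}

interface TextDeltaChunk extends BaseChunk {
  type: 'text_delta';
  delta: string;
}

interface ToolStartChunk extends BaseChunk {
  type: 'tool_start';
  toolCallId: string;
  toolName: string;
  arguments: Record<string, unknown>;
}

// Union discriminated on `type` (only two variants shown here).
type StreamChunk = TextDeltaChunk | ToolStartChunk;

// Checking `type` narrows the chunk, so type-specific fields are safe to read.
function describeChunk(chunk: StreamChunk): string {
  switch (chunk.type) {
    case 'text_delta':
      return `[${chunk.agentType}] text: ${chunk.delta}`;
    case 'tool_start':
      return `[${chunk.agentType}] tool: ${chunk.toolName}`;
  }
}
```

Because the union is discriminated on `type`, a `switch` or `if` on that field is enough for the compiler to know which extra fields exist.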

Stream Chunk Types

text_delta

Incremental text from the LLM. Append each delta in order to build the complete text.

typescript
interface TextDeltaChunk {
  type: 'text_delta';
  agentId: string;
  agentType: string;
  timestamp: number;
  delta: string; // Incremental text
}

Usage:

typescript
let fullText = '';

for await (const chunk of stream) {
  if (chunk.type === 'text_delta') {
    fullText += chunk.delta;
    process.stdout.write(chunk.delta); // Real-time display
  }
}
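When sub-agents write text into the same stream, a single buffer interleaves their output. The following is a minimal sketch of keeping one buffer per agent, assuming deltas from different agents can be told apart by `agentId`. `TextDelta` and `appendDelta` are illustrative names, not framework exports.

```typescript
// Only the fields this sketch needs from a text_delta chunk.
interface TextDelta {
  type: 'text_delta';
  agentId: string;
  delta: string;
}

// Appends a delta to the buffer of the agent that produced it.
function appendDelta(buffers: Map<string, string>, chunk: TextDelta): void {
  const previous = buffers.get(chunk.agentId) ?? '';
  buffers.set(chunk.agentId, previous + chunk.delta);
}
```

Inside the streaming loop, call `appendDelta(buffers, chunk)` for each `text_delta` chunk and read `buffers.get(agentId)` to get the text accumulated so far for one agent.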

thinking

Reasoning/thinking content from LLMs that support it (Claude with extended thinking, OpenAI o-series).

typescript
interface ThinkingChunk {
  type: 'thinking';
  agentId: string;
  agentType: string;
  timestamp: number;
  content: string; // Thinking text
  isComplete: boolean; // false = streaming, true = complete block
}

Usage:

typescript
let thinkingBuffer = '';

for await (const chunk of stream) {
  if (chunk.type === 'thinking') {
    if (!chunk.isComplete) {
      // Streaming - accumulate
      thinkingBuffer += chunk.content;
    } else {
      // Complete block
      console.log('Thinking:', chunk.content);
      thinkingBuffer = '';
    }
  }
}

tool_start

Emitted when a tool begins execution.

typescript
interface ToolStartChunk {
  type: 'tool_start';
  agentId: string;
  agentType: string;
  timestamp: number;
  toolCallId: string; // Unique ID for this call
  toolName: string; // Tool being called
  arguments: Record<string, unknown>; // Arguments passed
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'tool_start') {
    console.log(`Starting: ${chunk.toolName}`);
    console.log(`Args: ${JSON.stringify(chunk.arguments)}`);
  }
}

tool_end

Emitted when a tool completes (success or failure).

typescript
interface ToolEndChunk {
  type: 'tool_end';
  agentId: string;
  agentType: string;
  timestamp: number;
  toolCallId: string; // Matches tool_start
  toolName: string;
  result: unknown; // Tool's return value
  error?: string; // Present if tool failed
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'tool_end') {
    if (chunk.error) {
      console.log(`${chunk.toolName} failed: ${chunk.error}`);
    } else {
      console.log(`${chunk.toolName} result:`, chunk.result);
    }
  }
}

subagent_start

Emitted when a sub-agent begins execution.

typescript
interface SubAgentStartChunk {
  type: 'subagent_start';
  agentId: string; // Parent agent ID
  agentType: string; // Parent agent type
  timestamp: number;
  subAgentType: string; // Sub-agent's type name
  subAgentRunId: string; // Sub-agent's unique run ID
  callId: string; // Tool call ID that spawned this
}

subagent_end

Emitted when a sub-agent completes.

typescript
interface SubAgentEndChunk {
  type: 'subagent_end';
  agentId: string; // Parent agent ID
  agentType: string; // Parent agent type
  timestamp: number;
  subAgentType: string;
  subAgentRunId: string;
  callId: string;
  result: unknown; // Sub-agent's output
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'subagent_start') {
    console.log(`Delegating to ${chunk.subAgentType}...`);
  }
  if (chunk.type === 'subagent_end') {
    console.log(`${chunk.subAgentType} completed:`, chunk.result);
  }
}

custom

Custom events emitted by tools via context.emit().

typescript
interface CustomEventChunk {
  type: 'custom';
  agentId: string;
  agentType: string;
  timestamp: number;
  eventName: string; // Event name from emit()
  data: unknown; // Event payload
}

Emitting custom events:

typescript
// In tool execute function
await context.emit('progress', { step: 1, total: 5 });
await context.emit('file_uploaded', { name: 'doc.pdf', size: 1024 });

Consuming custom events:

typescript
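for await (const chunk of stream) {
  if (chunk.type === 'custom') {
    switch (chunk.eventName) {
      case 'progress': {
        // `data` is typed as unknown, so assert its shape before reading fields
        const { step, total } = chunk.data as { step: number; total: number };
        console.log(`Progress: ${step}/${total}`);
        break;
      }
      case 'file_uploaded': {
        const { name } = chunk.data as { name: string };
        console.log(`Uploaded: ${name}`);
        break;
      }
    }
  }
}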
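Because `data` is typed as `unknown`, a runtime check is safer than a cast when the payload comes from tools you do not control. This is a minimal sketch of a type guard for the `progress` payload used above. `ProgressData`, `isProgressData`, and `formatProgress` are hypothetical helpers, not part of the framework.

```typescript
// Payload shape of the 'progress' event from the emit() example.
interface ProgressData {
  step: number;
  total: number;
}

// Returns true only when `data` has numeric `step` and `total` fields.
function isProgressData(data: unknown): data is ProgressData {
  if (typeof data !== 'object' || data === null) return false;
  const record = data as Record<string, unknown>;
  return typeof record.step === 'number' && typeof record.total === 'number';
}

// Formats a progress payload, or returns undefined for malformed data.
function formatProgress(data: unknown): string | undefined {
  return isProgressData(data) ? `Progress: ${data.step}/${data.total}` : undefined;
}
```

A schema library would serve the same purpose for larger payloads. The hand-written guard keeps the example dependency-free.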

state_patch

RFC 6902 JSON Patch operations representing state changes.

typescript
interface StatePatchChunk {
  type: 'state_patch';
  agentId: string;
  agentType: string;
  timestamp: number;
  patches: JSONPatchOperation[];
}

interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace' | 'move' | 'copy' | 'test';
  path: string; // JSON Pointer path
  value?: unknown; // Value for add/replace
  from?: string; // Source for move/copy
}

Example patches:

typescript
// When tool does: draft.count = 5
{ op: 'replace', path: '/count', value: 5 }

// When tool does: draft.items.push('new') on an empty array
// (the path index is the position of the new element)
{ op: 'add', path: '/items/0', value: 'new' }

// When tool does: delete draft.temp
{ op: 'remove', path: '/temp' }

Applying patches (client-side):

typescript
import { applyPatch } from 'fast-json-patch';

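let clientState: { count: number; items: string[] } = { count: 0, items: [] };

for await (const chunk of stream) {
  if (chunk.type === 'state_patch') {
    // applyPatch mutates its input by default and returns the result as newDocument
    clientState = applyPatch(clientState, chunk.patches).newDocument;
    console.log('State updated:', clientState);
  }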
}
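To show what a patch application involves, here is a minimal, dependency-free sketch that handles only `add`, `replace`, and `remove`. It is not a substitute for a library such as fast-json-patch: `move`, `copy`, `test`, root-level operations, and error handling are omitted, and `Op`, `parsePointer`, and `applyOps` are names invented for this example.

```typescript
type Op =
  | { op: 'add' | 'replace'; path: string; value: unknown }
  | { op: 'remove'; path: string };

// Splits a JSON Pointer (RFC 6901) into unescaped reference tokens.
function parsePointer(path: string): string[] {
  if (path === '') return [];
  return path
    .slice(1)
    .split('/')
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'));
}

// Applies the operations to a deep copy of `doc` and returns the copy.
function applyOps<T>(doc: T, ops: Op[]): T {
  const root = JSON.parse(JSON.stringify(doc)); // the input is left untouched
  for (const op of ops) {
    const tokens = parsePointer(op.path);
    const key = tokens.pop();
    if (key === undefined) throw new Error('root-level operations are not supported');

    // Walk down to the container that holds the target.
    let parent: any = root;
    for (const token of tokens) parent = parent[token];

    if (Array.isArray(parent)) {
      // '-' refers to the position after the last element (append).
      const index = key === '-' ? parent.length : Number(key);
      if (op.op === 'add') parent.splice(index, 0, op.value);
      else if (op.op === 'replace') parent[index] = op.value;
      else parent.splice(index, 1);
    } else if (op.op === 'remove') {
      delete parent[key];
    } else {
      parent[key] = op.value;
    }
  }
  return root;
}
```

Note that `add` on an array inserts at the given index and shifts later elements, while `replace` overwrites the element in place.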

error

Emitted when an error occurs during execution. The recoverable flag indicates whether execution can continue.

typescript
interface ErrorChunk {
  type: 'error';
  agentId: string;
  agentType: string;
  timestamp: number;
  error: string; // Error message
  code?: string; // Error code for categorization
  recoverable: boolean; // Whether execution can continue
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'error') {
    if (chunk.recoverable) {
      console.warn(`Warning: ${chunk.error}`);
    } else {
      console.error(`Fatal: ${chunk.error}`);
      break;
    }
  }
}

output

Final structured output when the agent completes successfully.

typescript
interface OutputChunk {
  type: 'output';
  agentId: string;
  agentType: string;
  timestamp: number;
  output: unknown; // Validated against outputSchema
}

Usage:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'output') {
    // Agent completed with structured output
    const result = chunk.output as MyOutputType;
    console.log('Final result:', result);
  }
}

Complete Streaming Example

typescript
async function runWithStreaming() {
  const handle = await executor.execute(ResearchAgent, 'Research quantum computing');
  const stream = await handle.stream();

  if (!stream) {
    console.log('No stream available');
    return;
  }

  // Track state
  let currentTool = '';
  let toolDepth = 0;

  for await (const chunk of stream) {
    switch (chunk.type) {
      case 'text_delta':
        process.stdout.write(chunk.delta);
        break;

      case 'thinking':
        if (chunk.isComplete) {
          console.log('\n[Thinking]', chunk.content);
        }
        break;

      case 'tool_start':
        currentTool = chunk.toolName;
        console.log(`\n[→ ${chunk.toolName}]`, JSON.stringify(chunk.arguments));
        break;

      case 'tool_end':
        if (chunk.error) {
          console.log(`[✗ ${chunk.toolName}]`, chunk.error);
        } else {
          console.log(`[✓ ${chunk.toolName}]`);
        }
        break;

      case 'subagent_start':
        toolDepth++;
        console.log(`${'  '.repeat(toolDepth)}[Sub-agent: ${chunk.subAgentType}]`);
        break;

      case 'subagent_end':
        console.log(`${'  '.repeat(toolDepth)}[/${chunk.subAgentType}]`);
        toolDepth--;
        break;

      case 'custom':
        if (chunk.eventName === 'progress') {
          const { step, total } = chunk.data as { step: number; total: number };
          console.log(`[Progress: ${step}/${total}]`);
        }
        break;

      case 'state_patch':
        // Optionally apply patches to local state
        break;

      case 'error':
        console.error(`\n[Error]`, chunk.error);
        if (!chunk.recoverable) {
          console.error('Execution cannot continue');
        }
        break;

      case 'output':
        console.log('\n[Output]', JSON.stringify(chunk.output, null, 2));
        break;
    }
  }

  // Get final result
  const result = await handle.result();
  console.log('\nFinal status:', result.status);
}

Stream Filtering

The framework provides utilities for filtering streams.

By Chunk Type

typescript
import { filterStreamByType } from '@helix-agents/core';

const stream = await handle.stream();

// Only text deltas
const textStream = filterStreamByType(stream, 'text_delta');
for await (const chunk of textStream) {
  process.stdout.write(chunk.delta);
}

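// Only tool events. If `stream` is single-use, the loop above has already
// drained it, so read from a fresh stream here.
const toolStream = filterStreamByType(await handle.stream(), ['tool_start', 'tool_end']);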
for await (const chunk of toolStream) {
  console.log(chunk.type, chunk.toolName);
}

By Agent

typescript
import { filterStreamByAgent } from '@helix-agents/core';

const stream = await handle.stream();

// Only chunks from specific agent
const agentStream = filterStreamByAgent(stream, 'run-123');
for await (const chunk of agentStream) {
  console.log(chunk);
}

// Exclude sub-agent chunks
const parentOnly = filterStreamByAgent(stream, handle.runId);

Custom Filters

typescript
async function* filterStream<T>(
  stream: AsyncIterable<T>,
  predicate: (chunk: T) => boolean
): AsyncIterable<T> {
  for await (const chunk of stream) {
    if (predicate(chunk)) {
      yield chunk;
    }
  }
}

// Example: only errors and outputs
const importantStream = filterStream(
  stream,
  (chunk) => chunk.type === 'error' || chunk.type === 'output'
);
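Predicates for a custom filter compose well when they are built from small functions. The sketch below assumes only that chunks carry `type` and `agentId`. `ChunkLike`, `ofType`, `fromAgent`, and `allOf` are hypothetical helpers, and `'run-123'` is a placeholder ID.

```typescript
// Minimal chunk shape needed by the predicates below.
interface ChunkLike {
  type: string;
  agentId: string;
}

type Predicate = (chunk: ChunkLike) => boolean;

// Matches chunks whose type is one of the given values.
const ofType = (...types: string[]): Predicate => (chunk) => types.includes(chunk.type);

// Matches chunks produced by the given agent.
const fromAgent = (agentId: string): Predicate => (chunk) => chunk.agentId === agentId;

// Matches only when every predicate matches.
const allOf = (...predicates: Predicate[]): Predicate => (chunk) =>
  predicates.every((predicate) => predicate(chunk));

// Errors and outputs from one agent only.
const isImportant = allOf(ofType('error', 'output'), fromAgent('run-123'));
```

A predicate built this way can be passed as the second argument to a generic filter such as `filterStream`.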

Stream Control Messages

In addition to data chunks, streams can include control messages:

stream_end

Signals the stream is complete:

typescript
interface StreamEnd {
  type: 'stream_end';
  streamId: string;
  timestamp: number;
  finalOutput?: unknown;
}

This is a control message, not a data chunk. Most consumers don't need to handle it directly - the async iterator completes when the stream ends.

Frontend Integration

For web applications, streams are typically delivered via Server-Sent Events (SSE):

typescript
// Backend
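app.get('/agent/stream/:runId', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await getStream(req.params.runId);

  for await (const chunk of stream) {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }

  // Tell the client the run is finished ('done' is an arbitrary event name)
  res.write('event: done\ndata: {}\n\n');
  res.end();
});

// Frontend
const eventSource = new EventSource(`/agent/stream/${runId}`);

eventSource.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  handleChunk(chunk);
};

// EventSource reconnects automatically when the connection closes,
// so close it explicitly once the server signals completion
eventSource.addEventListener('done', () => eventSource.close());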

See Frontend Integration for complete patterns.
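SSE frames can also carry an `id:` field. When an `EventSource` reconnects, the browser sends the last ID it received in the `Last-Event-ID` request header, which gives the server a natural place to resume from. The helper below is a minimal sketch of formatting such frames. `formatSseEvent` is a hypothetical name, and using a chunk sequence number as the ID is an assumption, not documented framework behavior.

```typescript
interface SseOptions {
  id?: string | number; // echoed back by the browser as Last-Event-ID on reconnect
  event?: string; // named event; omit for the default "message" event
}

// Serializes one payload as a single SSE frame terminated by a blank line.
function formatSseEvent(payload: unknown, options: SseOptions = {}): string {
  const lines: string[] = [];
  if (options.event !== undefined) lines.push(`event: ${options.event}`);
  if (options.id !== undefined) lines.push(`id: ${options.id}`);
  // JSON.stringify without indentation emits no raw newlines,
  // so the payload always fits on one data line.
  lines.push(`data: ${JSON.stringify(payload)}`);
  return lines.join('\n') + '\n\n';
}
```

On the server, the frame would be written with `res.write(formatSseEvent(chunk, { id: sequence }))`, where `sequence` is whatever position marker the stream exposes.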

Resumable Streams

Streams support resumption from a specific point:

typescript
// Read from offset
const stream = await handle.stream({ fromOffset: 100 });

// Each chunk includes sequence info for resumability
for await (const chunk of stream) {
  // Store sequence for resumption (localStorage only stores strings)
  localStorage.setItem('lastSeq', String(chunk._sequence));
}

// Resume later; getItem returns null when nothing was stored
const lastSeq = localStorage.getItem('lastSeq');
const resumedStream = await handle.stream(
  lastSeq !== null ? { fromOffset: parseInt(lastSeq, 10) } : undefined
);

Best Practices

1. Handle All Chunk Types

Include a default branch so that unknown chunk types are logged rather than silently dropped:

typescript
for await (const chunk of stream) {
  switch (chunk.type) {
    case 'text_delta':
    case 'tool_start':
    case 'tool_end':
      // Handle known types
      break;
    default:
      // Log unknown types for debugging
      console.log('Unknown chunk type:', chunk.type);
  }
}
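A `default` branch can also double as a compile-time exhaustiveness check. The sketch below uses a reduced, hypothetical `KnownChunk` union. Assigning the chunk to a `never` variable makes the compiler complain if a variant of the union has no `case`, while the runtime fallback still covers chunk types sent by a newer server.

```typescript
// Reduced union for illustration; the real chunk union has more variants.
type KnownChunk =
  | { type: 'text_delta'; delta: string }
  | { type: 'tool_start'; toolName: string }
  | { type: 'tool_end'; toolName: string };

function summarizeChunk(chunk: KnownChunk): string {
  switch (chunk.type) {
    case 'text_delta':
      return `text:${chunk.delta}`;
    case 'tool_start':
      return `start:${chunk.toolName}`;
    case 'tool_end':
      return `end:${chunk.toolName}`;
    default: {
      // Compile time: this assignment fails if a KnownChunk variant lacks a case.
      const unhandled: never = chunk;
      // Run time: data from the wire may still have a type this client does not know.
      return `unknown:${(unhandled as { type: string }).type}`;
    }
  }
}
```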

2. Buffer Text Deltas

Don't update UI on every delta - batch them:

typescript
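let buffer = '';
let timer: ReturnType<typeof setTimeout> | undefined;

// Sends buffered text to the UI and resets the buffer
const flush = () => {
  timer = undefined;
  if (buffer) {
    updateUI(buffer);
    buffer = '';
  }
};

for await (const chunk of stream) {
  if (chunk.type === 'text_delta') {
    buffer += chunk.delta;

    // Schedule at most one UI update per 50 ms window. Resetting the timer
    // on every delta would delay updates until the stream pauses.
    if (timer === undefined) {
      timer = setTimeout(flush, 50);
    }
  }
}

// Cancel any pending timer and flush the remaining text once
clearTimeout(timer);
flush();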

3. Track Tool Call IDs

Match tool_start with tool_end:

typescript
const pendingTools = new Map<string, ToolStartChunk>();

for await (const chunk of stream) {
  if (chunk.type === 'tool_start') {
    pendingTools.set(chunk.toolCallId, chunk);
  }
  if (chunk.type === 'tool_end') {
    const start = pendingTools.get(chunk.toolCallId);
    if (start) {
      const duration = chunk.timestamp - start.timestamp;
      console.log(`${chunk.toolName} took ${duration}ms`);
      pendingTools.delete(chunk.toolCallId);
    }
  }
}

4. Handle Errors Gracefully

Distinguish recoverable from fatal errors:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'error') {
    if (chunk.recoverable) {
      // Show warning, continue streaming
      showWarning(chunk.error);
    } else {
      // Show error, stop streaming
      showError(chunk.error);
      break;
    }
  }
}

Released under the MIT License.