Real-Time Streaming
Streaming provides real-time visibility into agent execution. The framework emits typed events for text generation, tool execution, sub-agent activity, custom events, and more.
Overview
When an agent executes, it produces a stream of chunks:
const handle = await executor.execute(agent, 'Research AI agents');
const stream = await handle.stream();
for await (const chunk of stream) {
console.log(chunk.type, chunk);
}Each chunk has:
type- Discriminator for the chunk kindagentId- Which agent produced this chunkagentType- The agent's type nametimestamp- When the chunk was created- Type-specific fields
Stream Chunk Types
text_delta
Incremental text from the LLM. Append deltas to build complete text.
interface TextDeltaChunk {
type: 'text_delta';
agentId: string;
agentType: string;
timestamp: number;
delta: string; // Incremental text
}Usage:
let fullText = '';
for await (const chunk of stream) {
if (chunk.type === 'text_delta') {
fullText += chunk.delta;
process.stdout.write(chunk.delta); // Real-time display
}
}thinking
Reasoning/thinking content from LLMs that support it (Claude with extended thinking, OpenAI o-series).
interface ThinkingChunk {
type: 'thinking';
agentId: string;
agentType: string;
timestamp: number;
content: string; // Thinking text
isComplete: boolean; // false = streaming, true = complete block
}Usage:
let thinkingBuffer = '';
for await (const chunk of stream) {
if (chunk.type === 'thinking') {
if (!chunk.isComplete) {
// Streaming - accumulate
thinkingBuffer += chunk.content;
} else {
// Complete block
console.log('Thinking:', chunk.content);
thinkingBuffer = '';
}
}
}tool_start
Emitted when a tool begins execution.
interface ToolStartChunk {
type: 'tool_start';
agentId: string;
agentType: string;
timestamp: number;
toolCallId: string; // Unique ID for this call
toolName: string; // Tool being called
arguments: Record<string, unknown>; // Arguments passed
}Usage:
for await (const chunk of stream) {
if (chunk.type === 'tool_start') {
console.log(`Starting: ${chunk.toolName}`);
console.log(`Args: ${JSON.stringify(chunk.arguments)}`);
}
}tool_end
Emitted when a tool completes (success or failure).
interface ToolEndChunk {
type: 'tool_end';
agentId: string;
agentType: string;
timestamp: number;
toolCallId: string; // Matches tool_start
toolName: string;
result: unknown; // Tool's return value
error?: string; // Present if tool failed
}Usage:
for await (const chunk of stream) {
if (chunk.type === 'tool_end') {
if (chunk.error) {
console.log(`${chunk.toolName} failed: ${chunk.error}`);
} else {
console.log(`${chunk.toolName} result:`, chunk.result);
}
}
}subagent_start
Emitted when a sub-agent begins execution.
interface SubAgentStartChunk {
type: 'subagent_start';
agentId: string; // Parent agent ID
agentType: string; // Parent agent type
timestamp: number;
subAgentType: string; // Sub-agent's type name
subAgentRunId: string; // Sub-agent's unique run ID
callId: string; // Tool call ID that spawned this
}subagent_end
Emitted when a sub-agent completes.
interface SubAgentEndChunk {
type: 'subagent_end';
agentId: string; // Parent agent ID
agentType: string; // Parent agent type
timestamp: number;
subAgentType: string;
subAgentRunId: string;
callId: string;
result: unknown; // Sub-agent's output
}Usage:
for await (const chunk of stream) {
if (chunk.type === 'subagent_start') {
console.log(`Delegating to ${chunk.subAgentType}...`);
}
if (chunk.type === 'subagent_end') {
console.log(`${chunk.subAgentType} completed:`, chunk.result);
}
}custom
Custom events emitted by tools via context.emit().
interface CustomEventChunk {
type: 'custom';
agentId: string;
agentType: string;
timestamp: number;
eventName: string; // Event name from emit()
data: unknown; // Event payload
}Emitting custom events:
// In tool execute function
await context.emit('progress', { step: 1, total: 5 });
await context.emit('file_uploaded', { name: 'doc.pdf', size: 1024 });Consuming custom events:
for await (const chunk of stream) {
if (chunk.type === 'custom') {
switch (chunk.eventName) {
case 'progress':
console.log(`Progress: ${chunk.data.step}/${chunk.data.total}`);
break;
case 'file_uploaded':
console.log(`Uploaded: ${chunk.data.name}`);
break;
}
}
}state_patch
RFC 6902 JSON Patch operations representing state changes.
interface StatePatchChunk {
type: 'state_patch';
agentId: string;
agentType: string;
timestamp: number;
patches: JSONPatchOperation[];
}
interface JSONPatchOperation {
op: 'add' | 'remove' | 'replace' | 'move' | 'copy' | 'test';
path: string; // JSON Pointer path
value?: unknown; // Value for add/replace
from?: string; // Source for move/copy
}Example patches:
// When tool does: draft.count = 5
{ op: 'replace', path: '/count', value: 5 }
// When tool does: draft.items.push('new')
{ op: 'add', path: '/items/0', value: 'new' }
// When tool does: delete draft.temp
{ op: 'remove', path: '/temp' }Applying patches (client-side):
import { applyPatch } from 'fast-json-patch';
let clientState = { count: 0, items: [] };
for await (const chunk of stream) {
if (chunk.type === 'state_patch') {
clientState = applyPatch(clientState, chunk.patches).newDocument;
console.log('State updated:', clientState);
}
}error
Error events during execution.
interface ErrorChunk {
type: 'error';
agentId: string;
agentType: string;
timestamp: number;
error: string; // Error message
code?: string; // Error code for categorization
recoverable: boolean; // Whether execution can continue
}Usage:
for await (const chunk of stream) {
if (chunk.type === 'error') {
if (chunk.recoverable) {
console.warn(`Warning: ${chunk.error}`);
} else {
console.error(`Fatal: ${chunk.error}`);
break;
}
}
}output
Final structured output when the agent completes successfully.
interface OutputChunk {
type: 'output';
agentId: string;
agentType: string;
timestamp: number;
output: unknown; // Validated against outputSchema
}Usage:
for await (const chunk of stream) {
if (chunk.type === 'output') {
// Agent completed with structured output
const result = chunk.output as MyOutputType;
console.log('Final result:', result);
}
}Complete Streaming Example
async function runWithStreaming() {
const handle = await executor.execute(ResearchAgent, 'Research quantum computing');
const stream = await handle.stream();
if (!stream) {
console.log('No stream available');
return;
}
// Track state
let currentTool = '';
let toolDepth = 0;
for await (const chunk of stream) {
switch (chunk.type) {
case 'text_delta':
process.stdout.write(chunk.delta);
break;
case 'thinking':
if (chunk.isComplete) {
console.log('\n[Thinking]', chunk.content);
}
break;
case 'tool_start':
currentTool = chunk.toolName;
console.log(`\n[→ ${chunk.toolName}]`, JSON.stringify(chunk.arguments));
break;
case 'tool_end':
if (chunk.error) {
console.log(`[✗ ${chunk.toolName}]`, chunk.error);
} else {
console.log(`[✓ ${chunk.toolName}]`);
}
break;
case 'subagent_start':
toolDepth++;
console.log(`${' '.repeat(toolDepth)}[Sub-agent: ${chunk.subAgentType}]`);
break;
case 'subagent_end':
console.log(`${' '.repeat(toolDepth)}[/${chunk.subAgentType}]`);
toolDepth--;
break;
case 'custom':
if (chunk.eventName === 'progress') {
const { step, total } = chunk.data as { step: number; total: number };
console.log(`[Progress: ${step}/${total}]`);
}
break;
case 'state_patch':
// Optionally apply patches to local state
break;
case 'error':
console.error(`\n[Error]`, chunk.error);
if (!chunk.recoverable) {
console.error('Execution cannot continue');
}
break;
case 'output':
console.log('\n[Output]', JSON.stringify(chunk.output, null, 2));
break;
}
}
// Get final result
const result = await handle.result();
console.log('\nFinal status:', result.status);
}Stream Filtering
The framework provides utilities for filtering streams.
By Chunk Type
import { filterStreamByType } from '@helix-agents/core';
const stream = await handle.stream();
// Only text deltas
const textStream = filterStreamByType(stream, 'text_delta');
for await (const chunk of textStream) {
process.stdout.write(chunk.delta);
}
// Only tool events
const toolStream = filterStreamByType(stream, ['tool_start', 'tool_end']);
for await (const chunk of toolStream) {
console.log(chunk.type, chunk.toolName);
}By Agent
import { filterStreamByAgent } from '@helix-agents/core';
const stream = await handle.stream();
// Only chunks from specific agent
const agentStream = filterStreamByAgent(stream, 'run-123');
for await (const chunk of agentStream) {
console.log(chunk);
}
// Exclude sub-agent chunks
const parentOnly = filterStreamByAgent(stream, handle.runId);Custom Filters
async function* filterStream<T>(
stream: AsyncIterable<T>,
predicate: (chunk: T) => boolean
): AsyncIterable<T> {
for await (const chunk of stream) {
if (predicate(chunk)) {
yield chunk;
}
}
}
// Example: only errors and outputs
const importantStream = filterStream(
stream,
(chunk) => chunk.type === 'error' || chunk.type === 'output'
);Stream Control Messages
In addition to data chunks, streams can include control messages:
stream_end
Signals the stream is complete:
interface StreamEnd {
type: 'stream_end';
streamId: string;
timestamp: number;
finalOutput?: unknown;
}This is a control message, not a data chunk. Most consumers don't need to handle it directly - the async iterator completes when the stream ends.
Frontend Integration
For web applications, streams are typically delivered via SSE:
// Backend
app.get('/agent/stream/:runId', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
const stream = await getStream(req.params.runId);
for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.end();
});
// Frontend
const eventSource = new EventSource(`/agent/stream/${runId}`);
eventSource.onmessage = (event) => {
const chunk = JSON.parse(event.data);
handleChunk(chunk);
};See Frontend Integration for complete patterns.
Resumable Streams
Streams support resumption from a specific point:
// Read from offset
const stream = await handle.stream({ fromOffset: 100 });
// Each chunk includes sequence info for resumability
for await (const chunk of stream) {
// Store sequence for resumption
localStorage.setItem('lastSeq', chunk._sequence);
}
// Resume later
const lastSeq = localStorage.getItem('lastSeq');
const resumedStream = await handle.stream({ fromOffset: parseInt(lastSeq) });Best Practices
1. Handle All Chunk Types
Defensive programming for unknown chunk types:
for await (const chunk of stream) {
switch (chunk.type) {
case 'text_delta':
case 'tool_start':
case 'tool_end':
// Handle known types
break;
default:
// Log unknown types for debugging
console.log('Unknown chunk type:', chunk.type);
}
}2. Buffer Text Deltas
Don't update UI on every delta - batch them:
let buffer = '';
let timeout: NodeJS.Timeout;
for await (const chunk of stream) {
if (chunk.type === 'text_delta') {
buffer += chunk.delta;
// Debounce UI updates
clearTimeout(timeout);
timeout = setTimeout(() => {
updateUI(buffer);
buffer = '';
}, 50);
}
}
// Flush remaining
if (buffer) updateUI(buffer);3. Track Tool Call IDs
Match tool_start with tool_end:
const pendingTools = new Map<string, ToolStartChunk>();
for await (const chunk of stream) {
if (chunk.type === 'tool_start') {
pendingTools.set(chunk.toolCallId, chunk);
}
if (chunk.type === 'tool_end') {
const start = pendingTools.get(chunk.toolCallId);
if (start) {
const duration = chunk.timestamp - start.timestamp;
console.log(`${chunk.toolName} took ${duration}ms`);
pendingTools.delete(chunk.toolCallId);
}
}
}4. Handle Errors Gracefully
Distinguish recoverable from fatal errors:
for await (const chunk of stream) {
if (chunk.type === 'error') {
if (chunk.recoverable) {
// Show warning, continue streaming
showWarning(chunk.error);
} else {
// Show error, stop streaming
showError(chunk.error);
break;
}
}
}Next Steps
- Frontend Integration - Display streams in web UIs
- Sub-Agents - Understanding nested streams
- State Management - Using state_patch chunks