AI SDK Package
The @helix-agents/ai-sdk package bridges Helix Agents with Vercel AI SDK frontend hooks. It transforms Helix's internal streaming protocol to the AI SDK UI Data Stream format.
Installation
npm install @helix-agents/ai-sdk
FrontendHandler
The FrontendHandler provides a unified API for streaming agent responses to AI SDK frontends. It works in two deployment modes depending on your architecture.
Deployment Modes
Direct Mode (In-Process)
Use direct mode when your API routes run in the same process as the agent executor:
┌─────────────────────────────────────────┐
│              Your Server                │
│                                         │
│   API Route → FrontendHandler           │
│                   ↓                     │
│   AgentExecutor (JS/Temporal)           │
│                   ↓                     │
│   StateStore + StreamManager            │
│     (Redis, Memory, etc.)               │
└─────────────────────────────────────────┘
Use with: JS Runtime, Temporal Runtime, Cloudflare Workflows (same worker)
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);
const handler = createFrontendHandler({
executor,
streamManager,
stateStore,
agent: MyAgent,
});
Cloudflare Durable Objects Mode
For Cloudflare deployments, pass the DO namespace directly:
┌─────────────────────────────────────────┐
│          Cloudflare Worker              │
│                                         │
│   API Route → FrontendHandler           │
│                   ↓                     │
│   AgentServer (DO)                      │
│                   ↓                     │
│   StateStore + StreamManager            │
└─────────────────────────────────────────┘
Use with: Cloudflare Workers with Durable Objects
import { createFrontendHandler } from '@helix-agents/ai-sdk';
const handler = createFrontendHandler({
namespace: env.AGENTS,
agentName: 'chat-agent',
});
The handler automatically creates the necessary DO clients internally.
Choosing a Mode
| Scenario | Mode | Why |
|---|---|---|
| Next.js + Redis | Direct | Same process, direct store access |
| Express + Temporal | Direct | Temporal client in same process |
| Cloudflare Workers + DO | Cloudflare | Pass namespace and agentName |
| Single Cloudflare Worker | Cloudflare | DO accessible via env.AGENTS binding |
Basic Setup
Once you've chosen a mode, the FrontendHandler API is identical:
import { createFrontendHandler } from '@helix-agents/ai-sdk';
const handler = createFrontendHandler({
streamManager,
executor,
agent: MyAgent,
stateStore, // Optional: for getMessages()
transformerOptions: { ... }, // Optional: customize transformation
logger: console, // Optional: debug logging
});
Request Modes
POST Mode - Execute new agent:
const response = await handler.handleRequest({
method: 'POST',
body: {
message: 'Hello, agent!',
state: { initialValue: 42 }, // Optional initial state
},
});
GET Mode - Stream existing execution:
const response = await handler.handleRequest({
method: 'GET',
streamId: 'run-123',
resumeAt: lastEventId, // Optional: resume from position
});
Response Handling
The handler returns a framework-agnostic response:
interface FrontendResponse {
status: number;
headers: Record<string, string>;
body: ReadableStream<Uint8Array> | string;
}
Convert to your framework's response:
// Hono / Web standards
return new Response(response.body, {
status: response.status,
headers: response.headers,
});
// Express (use pipeToExpress helper)
import { pipeToExpress } from '@helix-agents/ai-sdk/adapters/express';
await pipeToExpress(response, res);
Loading Message History
Load conversation history for useChat initialMessages:
const { messages, hasMore } = await handler.getMessages(sessionId, {
// Pagination
offset: 0,
limit: 50,
// Content options
includeReasoning: true, // Include thinking content
includeToolResults: true, // Merge tool results into messages
// Custom ID generation
generateId: (index, msg) => `msg-${index}`,
});
// Use with useChat
const { messages } = useChat({
initialMessages: messages,
});
Using Store Utilities
For more control, use the store utilities directly:
import { loadUIMessages, loadAllUIMessages } from '@helix-agents/ai-sdk';
// Paginated loading
const { messages, hasMore } = await loadUIMessages(stateStore, sessionId, {
offset: 0,
limit: 50,
includeReasoning: true,
includeToolResults: true,
});
// Load all messages (handles pagination internally)
const allMessages = await loadAllUIMessages(stateStore, sessionId);
UIMessageStore Wrapper
For repeated access, wrap your state store:
import { createUIMessageStore } from '@helix-agents/ai-sdk';
const uiStore = createUIMessageStore(stateStore);
const { messages, hasMore } = await uiStore.getUIMessages(sessionId);
const all = await uiStore.getAllUIMessages(sessionId);
StreamTransformer
Transforms individual Helix chunks to AI SDK events:
import { StreamTransformer } from '@helix-agents/ai-sdk';
const transformer = new StreamTransformer({
// Custom message ID generation
generateMessageId: (agentId) => `msg-${agentId}`,
// Include step boundary events
includeStepEvents: false,
// Filter chunks
chunkFilter: (chunk) => chunk.type !== 'state_patch',
// Debug logging
logger: console,
});
Transformation Flow
// Stream processing
for await (const chunk of helixStream) {
const { events, sequence } = transformer.transform(chunk);
for (const event of events) {
// Emit SSE with optional event ID for resumability
yield { event, sequence };
}
}
// Always finalize to close blocks and emit finish
const { events } = transformer.finalize();
for (const event of events) {
yield event;
}
Event Mapping
| Helix Chunk | AI SDK Events |
|---|---|
| text_delta | text-start (once), text-delta |
| thinking | reasoning-start (once), reasoning-delta, reasoning-end (if complete) |
| tool_start | text-end (if text open), tool-input-available |
| tool_end | tool-output-available |
| subagent_start | data-subagent-start |
| subagent_end | data-subagent-end |
| custom | data-{eventName} |
| state_patch | data-state-patch |
| error | error |
| output | data-output |
Tool Argument Streaming
| Helix Chunk | AI SDK Event |
|---|---|
| tool_arg_stream_start | tool-input-start |
| tool_arg_stream_delta | tool-input-delta |
| tool_arg_stream_end | tool-input-available |
| tool_input_error | tool-input-error |
| tool_output_error | tool-output-error |
Control Flow Events
| Helix Chunk | AI SDK Event |
|---|---|
| run_interrupted | data-run-interrupted |
| run_resumed | data-run-resumed |
| run_paused | data-run-paused |
| checkpoint_created | data-checkpoint-created |
| step_committed | data-step-committed |
| step_discarded | data-step-discarded |
| stream_resync | data-stream-resync |
| executor_superseded | data-executor-superseded |
Important: All tool events include dynamic: true because Helix tools are defined at runtime. This tells the AI SDK to use the DynamicToolUIPart format.
Block Management
The transformer manages text and reasoning blocks:
// First text_delta opens a text block
// { type: 'text-start', id: 'block-1' }
// { type: 'text-delta', id: 'block-1', delta: 'Hello' }
// Switching to tool_start closes the text block
// { type: 'text-end', id: 'block-1' }
// { type: 'tool-input-available', ... }
// New text_delta opens a new block
// { type: 'text-start', id: 'block-2' }
Message Converter
Converts Helix internal messages to AI SDK v6 UIMessage format:
import { convertToUIMessages } from '@helix-agents/ai-sdk';
const uiMessages = convertToUIMessages(helixMessages, {
generateId: (index, msg) => `msg-${index}`,
includeReasoning: true,
includeToolResults: true,
});
AI SDK v6 Format
The converter produces AI SDK v6 UIMessage format:
interface UIMessage {
id: string;
role: 'user' | 'assistant' | 'system';
parts: UIMessagePart[]; // v6: parts is the source of truth
}
type UIMessagePart =
| { type: 'text'; text: string }
| { type: 'reasoning'; text: string }
| {
type: `tool-${string}`;
toolCallId: string;
input: Record<string, unknown>;
state: ToolInvocationState;
output?: unknown;
};
Conversion Rules
- System messages → Single text part
- User messages → Single text part
- Assistant messages → Text, reasoning, and tool parts
- Tool result messages → Merged into assistant's tool parts (not separate messages)
// Helix messages
[
{ role: 'user', content: 'Hello' },
{ role: 'assistant', content: 'Let me search...', toolCalls: [...] },
{ role: 'tool', toolCallId: 'tc1', content: '{"result": "..."}' },
]
// Converted to UI messages (v6 format)
[
{ id: 'msg-0', role: 'user', parts: [{ type: 'text', text: 'Hello' }] },
{
id: 'msg-1',
role: 'assistant',
parts: [
{ type: 'text', text: 'Let me search...' },
{ type: 'tool-search', toolCallId: 'tc1', input: {...}, state: 'output-available', output: {...} }
]
},
]
Message Format Reference
AI SDK v6 Format
The AI SDK package produces messages in Vercel AI SDK v6 format:
- Tool parts use the type: 'tool-${toolName}' pattern (e.g., tool-search)
- Tool state uses ToolInvocationState: input-streaming, input-available, output-available, output-error
- Error field is errorText
Core Format
For framework-agnostic code, use the core format via loadCoreUIMessages():
- Tool parts use type: 'tool-invocation'
- Tool state uses UIToolState: pending, executing, completed, error
- Error field is error
Choosing a Format
| Use Case | Format | Function |
|---|---|---|
| React with useChat | AI SDK v6 | loadUIMessages() |
| Custom UI | Either | Choose based on needs |
| Framework-agnostic library | Core | loadCoreUIMessages() |
State Mapping
| Core State | AI SDK State | Description |
|---|---|---|
| pending | input-available | Awaiting execution |
| executing | input-available | Currently running |
| completed | output-available | Finished successfully |
| error | output-error | Execution failed |
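The table rows reduce to a small mapping function. This is an illustrative sketch; the package performs the equivalent conversion internally, and the type names here are assumptions:

```typescript
// Illustrative sketch of the core → AI SDK tool-state mapping above.
type CoreToolState = 'pending' | 'executing' | 'completed' | 'error';
type AISDKToolState = 'input-available' | 'output-available' | 'output-error';

function toAISDKToolState(state: CoreToolState): AISDKToolState {
  switch (state) {
    case 'pending':
    case 'executing':
      // Both map to input-available: input exists, but no output yet
      return 'input-available';
    case 'completed':
      return 'output-available';
    case 'error':
      return 'output-error';
  }
}
```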
For complete documentation, see UI Messages Guide.
SSE Response Builder
Build Server-Sent Events responses:
import { buildSSEResponse, createSSEStream, createSSEHeaders } from '@helix-agents/ai-sdk';
// Full response builder
const response = buildSSEResponse(eventsGenerator, {
headers: { 'X-Custom-Header': 'value' },
});
// Or build manually
const headers = createSSEHeaders({ 'X-Custom': 'value' });
const stream = createSSEStream(eventsGenerator);
SSE Format
Events are formatted as SSE:
id: 1
data: {"type":"text-delta","id":"block-1","delta":"Hello"}
id: 2
data: {"type":"text-delta","id":"block-1","delta":" world"}
data: {"type":"finish"}
The id: field enables stream resumability.
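Producing this wire format is a few lines of string building. A minimal sketch of what the stream emits per event (an assumed shape for illustration; createSSEStream handles this for you):

```typescript
// Minimal sketch: serialize one AI SDK event as an SSE frame.
// A sequence number, when present, becomes the `id:` field used for resumption.
function formatSSEFrame(data: unknown, sequence?: number): string {
  const idLine = sequence !== undefined ? `id: ${sequence}\n` : '';
  return `${idLine}data: ${JSON.stringify(data)}\n\n`;
}
```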
Header Utilities
Extract resume position from headers:
import { extractResumePosition, AI_SDK_UI_HEADER, AI_SDK_UI_HEADER_VALUE } from '@helix-agents/ai-sdk';
// From Last-Event-ID header (automatic reconnection)
const lastEventId = request.headers.get('Last-Event-ID');
const resumeAt = extractResumePosition(lastEventId);
// AI SDK UI header for detection
// 'X-AI-SDK-UI': 'vercel-ai-sdk-ui'
const isAISDK = request.headers.get(AI_SDK_UI_HEADER) === AI_SDK_UI_HEADER_VALUE;
Typed Errors
All errors extend FrontendHandlerError:
import {
FrontendHandlerError,
ValidationError,
StreamNotFoundError,
StreamFailedError,
ConfigurationError,
ExecutionError,
StreamCreationError,
} from '@helix-agents/ai-sdk';
Error Types
| Error | Code | Status | When |
|---|---|---|---|
| ValidationError | VALIDATION_ERROR | 400 | Missing/invalid request params |
| StreamNotFoundError | STREAM_NOT_FOUND | 404 | Stream doesn't exist |
| StreamFailedError | STREAM_FAILED | 410 | Stream has failed |
| ConfigurationError | CONFIGURATION_ERROR | 501 | Missing configuration |
| ExecutionError | EXECUTION_ERROR | 500 | Agent execution failed |
| StreamCreationError | STREAM_CREATION_ERROR | 500 | Stream creation failed |
Error Handling Pattern
try {
const response = await handler.handleRequest(req);
return new Response(response.body, {
status: response.status,
headers: response.headers,
});
} catch (error) {
if (error instanceof FrontendHandlerError) {
return Response.json({ error: error.message, code: error.code }, { status: error.statusCode });
}
// Re-throw unexpected errors
throw error;
}
Multi-Turn Conversations
The handler supports multi-turn conversations using the session-centric model. There are two approaches:
- sessionId - Continue a conversation within the same session (history is stored in the framework's state store)
- messages - Pass your own conversation history directly (for external storage)
Using sessionId
Pass sessionId in the request body to continue a conversation within the same session:
const response = await handler.handleRequest({
method: 'POST',
body: {
message: 'Tell me more about that',
sessionId: 'session-123', // Session ID for conversation continuity
},
});
Using Direct messages
When you manage your own conversation storage, pass the message history directly:
const response = await handler.handleRequest({
method: 'POST',
body: {
message: 'Tell me more about that', // New user message to append
messages: [
// Previous conversation history
{ role: 'user', content: 'Hello, my name is Alice' },
{ role: 'assistant', content: 'Hello Alice! How can I help you?' },
],
},
});
This is useful when:
- You store conversation history in your own database
- You want full control over what context the agent sees
- You're building chat features outside the framework's state store
Note: System messages in messages are filtered out and re-added dynamically by the agent.
Behavior
Both messages and state have override semantics - when provided, they replace (not merge with) values from the session.
| Input | Messages Source | State Source |
|---|---|---|
| message only (new session) | Empty (fresh) | Empty (fresh) |
| message + sessionId (existing) | From session | From session |
| message + messages | From messages | Empty (fresh) |
| message + state | Empty (fresh) | From state |
| message + sessionId + messages | From messages (override) | From session |
| message + sessionId + state | From session | From state (override) |
| All four | From messages (override) | From state (override) |
- Sessions contain all messages and state for a conversation
- Each execution creates a new run within the session (for debugging, billing, tracing)
- Non-existent sessions are automatically created on first message
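The resolution rules above boil down to: request-body values win, session values fill the gaps, and everything else starts fresh. An illustrative sketch of that logic (assumed names; the real resolution lives inside the handler):

```typescript
// Illustrative: resolve message/state sources per the override-semantics table.
interface SessionData {
  messages: unknown[];
  state: Record<string, unknown>;
}

function resolveSources(
  session: SessionData | undefined, // loaded via sessionId, if one was provided
  overrides: { messages?: unknown[]; state?: Record<string, unknown> },
): SessionData {
  return {
    // Override wins; otherwise fall back to the session; otherwise fresh
    messages: overrides.messages ?? session?.messages ?? [],
    state: overrides.state ?? session?.state ?? {},
  };
}
```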
Frontend Integration
Track the sessionId for conversation continuity:
const [sessionId, setSessionId] = useState<string>(() => crypto.randomUUID());
const { messages } = useChat({
api: '/api/chat',
body: { sessionId },
});
Response Header
The session ID is returned in the X-Session-Id header:
X-Session-Id: session-123
Stream Resumability
The handler supports SSE event IDs for stream resumability. The recommended approach for production use is Snapshot + Sequence-Based Resume using the HelixChatTransport.
Client Transport (Recommended)
The HelixChatTransport is a convenience transport for AI SDK v6 that handles Helix-specific stream resumption patterns:
import { useChat } from '@ai-sdk/react';
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
function ResumableChat({ sessionId, initialSnapshot }) {
const shouldResume = initialSnapshot.status === 'active';
const { messages, sendMessage, status } = useChat({
id: `chat-${sessionId}`,
transport: new HelixChatTransport({
api: `/api/chat/${sessionId}`,
resumeFromSequence: shouldResume ? initialSnapshot.streamSequence : undefined,
}),
initialMessages: initialSnapshot.messages,
resume: shouldResume,
});
return <MessageList messages={messages} />;
}
The transport handles:
- Same API path for POST and GET - no /stream suffix needed
- X-Resume-From-Sequence header - automatically added when resumeFromSequence is provided
- Custom options - headers, body, credentials, and fetch can be customized
Transport Options
interface HelixChatTransportOptions {
api: string; // API endpoint (e.g., '/api/chat/session-123')
resumeFromSequence?: number; // Stream sequence to resume from
headers?: Record<string, string>; // Custom headers
body?: Record<string, unknown>; // Additional body properties
credentials?: 'omit' | 'same-origin' | 'include';
fetch?: typeof fetch; // Custom fetch implementation
}
The Snapshot Approach
For deterministically correct resumption with no race conditions:
- Load snapshot first - Get messages and stream sequence in one call
- Initialize with snapshot data - Pass messages as initial state
- Resume from sequence - Only fetch new events
// Step 1: Get snapshot (implements "sequence last" pattern internally)
const snapshot = await handler.getSnapshot(sessionId);
// snapshot contains:
// - state: Agent state (typed)
// - messages: UIMessage[] for initialMessages
// - streamSequence: Resume position
// - timestamp: When snapshot was created
// - status: 'active' | 'paused' | 'ended' | 'failed'
// Step 2: Initialize useChat with snapshot and transport
const { messages } = useChat({
transport: new HelixChatTransport({
api: `/api/chat/${sessionId}`,
resumeFromSequence: snapshot.status === 'active' ? snapshot.streamSequence : undefined,
}),
initialMessages: snapshot.messages,
resume: snapshot.status === 'active',
});
Snapshot Endpoint
import { extractResumePosition } from '@helix-agents/ai-sdk';
import { pipeToExpress } from '@helix-agents/ai-sdk/adapters/express';
// GET /api/chat/:sessionId/snapshot
app.get('/api/chat/:sessionId/snapshot', async (req, res) => {
const snapshot = await handler.getSnapshot(req.params.sessionId);
if (!snapshot) {
return res.status(404).json({ error: 'Session not found' });
}
res.json(snapshot);
});
// GET /api/chat/:sessionId/stream
app.get('/api/chat/:sessionId/stream', async (req, res) => {
// extractResumePosition handles multiple header formats:
// - Last-Event-ID (browser auto-reconnect)
// - X-Resume-From-Sequence (snapshot-based resumption)
// - X-Resume-At (alternative header format)
const resumeAt = extractResumePosition(req.headers);
const response = await handler.handleRequest({
method: 'GET',
streamId: req.params.sessionId,
resumeAt,
});
  // Express route: pipe the framework-agnostic response with the
  // pipeToExpress helper (from '@helix-agents/ai-sdk/adapters/express')
  await pipeToExpress(response, res);
});
Mid-Stream Page Refresh
When the user refreshes the page during active streaming, getSnapshot() automatically preserves any partial content that was visible. This prevents the jarring experience of content disappearing and reappearing.
How it works:
- During streaming, content (text deltas, tool calls) is emitted to clients in real-time
- Assistant messages are only saved to the message store after each step completes
- If a refresh occurs mid-step, getSnapshot() reconstructs partial content from stream chunks
- The partial content is appended as an assistant message so initialMessages reflects what was visible
// Snapshot during active streaming includes partial content
const snapshot = await handler.getSnapshot(sessionId);
// snapshot.messages includes:
// - All completed messages from previous steps
// - Partial assistant message with content streamed so far (if streaming)
// The partial message contains:
// - Text accumulated from text_delta chunks
// - Tool calls with their current state (pending, executing, or completed)
Important: Partial content is only included when status === 'active'. Once streaming ends, the full message is saved normally and no reconstruction is needed.
For more details on the streaming architecture, see Mid-Stream Page Refresh.
Content Replay for Stream Resumption
When a user refreshes the page mid-stream, the AI SDK useChat hook reconnects and resumes receiving events. However, there's a subtle issue: the AI SDK creates new text/reasoning blocks on text-start and reasoning-start events, even when resuming mid-message. This causes duplicate content when initialMessages already contains partial text.
Content Replay solves this by replaying partial content as stream events instead of including it in initialMessages. This way, the client receives a complete, linear stream from the beginning of the current assistant turn.
How it works:
- When getSnapshot() is called with content replay enabled (default), partial content is excluded from messages
- The client receives a complete stream: replay events + live events, avoiding duplicate content
// Content replay is enabled by default
const handler = createFrontendHandler({
streamManager,
executor,
agent: MyAgent,
stateStore,
// contentReplay: { enabled: true } // default
});
// Snapshot excludes partial content (will come from replayed stream)
const snapshot = await handler.getSnapshot(sessionId);
// snapshot.messages only contains completed messages
// On stream resume, replay events are emitted first:
// 1. Replay: text-start, text-delta (partial content)
// 2. Live: text-delta (new content), text-end, finish
Disabling Content Replay:
If you prefer the old behavior (partial content in initialMessages), disable content replay:
const handler = createFrontendHandler({
streamManager,
executor,
agent: MyAgent,
stateStore,
contentReplay: { enabled: false }, // Disable replay
});
// Now snapshot includes partial content in messages
const snapshot = await handler.getSnapshot(sessionId);
Override for Specific Snapshots:
You can also override the behavior per-snapshot call:
// Force include partial content even with content replay enabled
const snapshot = await handler.getSnapshot(sessionId, {
includePartialContent: true,
});
// Force exclude partial content even with content replay disabled
const snapshot = await handler.getSnapshot(sessionId, {
includePartialContent: false,
});
Using Replay Utilities Directly:
For custom streaming implementations, use the replay utilities:
import { createReplayEvents, hasReplayContent } from '@helix-agents/ai-sdk';
import type { ReplayContent } from '@helix-agents/ai-sdk';
// Check if there's content to replay
if (hasReplayContent(replayContent)) {
// Generate replay events
const result = createReplayEvents(replayContent, {
generateBlockId: () => `block-${++blockCounter}`,
generateMessageId: () => 'msg-123',
agentId: 'run-123',
});
// result.events contains: start, text-start, text-delta, etc.
// result.messageId is the message ID used
// result.textBlockId / result.reasoningBlockId track open blocks
for (const event of result.events) {
yield event;
}
}
Stream Status Field
The status field tells the client whether to attempt stream resumption:
| Status | Description | Client Action |
|---|---|---|
| active | Stream is running | Set resume: true in useChat |
| paused | Stream is paused | May need to resume later |
| ended | Stream completed successfully | No SSE connection needed |
| failed | Stream failed | Handle error state |
// Client decides whether to connect based on status
const shouldResume = snapshot.status === 'active';
SSE Event IDs
Each chunk gets a sequence number that becomes an SSE id: field:
id: 42
data: {"type":"text-delta","delta":"Hello"}
id: 43
data: {"type":"text-delta","delta":" world"}
On disconnect, the browser reconnects with a Last-Event-ID: 43 header, and the handler resumes from that position.
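On the server, turning that reconnect header back into a resume position is a one-liner; extractResumePosition covers this case along with the custom headers. A sketch of just the Last-Event-ID piece, for illustration:

```typescript
// Sketch: derive a resume position from the browser's Last-Event-ID header.
function resumeFromLastEventId(header: string | null): number | undefined {
  if (header === null) return undefined;
  const n = Number.parseInt(header, 10);
  // Non-numeric IDs can't be mapped to a stream sequence; start fresh
  return Number.isNaN(n) ? undefined : n;
}
```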
Stream Status Handling (GET Mode)
// GET mode returns different status codes:
// 200 - Active stream with content
// 204 - No content (stream ended, not found, or empty)
// 410 - Stream failed (Gone)
Server-Side Rendering with Next.js
The snapshot approach works seamlessly with Next.js App Router for SSR.
Server Component
// app/chat/[sessionId]/page.tsx (Server Component)
import { handler } from '@/lib/agent-handler';
import { ChatClient } from './ChatClient';
import { notFound } from 'next/navigation';
export default async function ChatPage({ params }: { params: { sessionId: string } }) {
// Server-side: fetch snapshot directly (no API call needed)
const snapshot = await handler.getSnapshot(params.sessionId);
if (!snapshot) {
notFound();
}
// Server renders with messages, hydrates on client
return (
<div className="container mx-auto p-4">
<h1>Chat Session</h1>
<p className="text-gray-600">
Status: {snapshot.status} | Sequence: {snapshot.streamSequence}
</p>
<ChatClient sessionId={params.sessionId} initialSnapshot={snapshot} />
</div>
);
}
Client Component
// app/chat/[sessionId]/ChatClient.tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import type { FrontendSnapshot } from '@helix-agents/ai-sdk';
interface Props {
sessionId: string;
initialSnapshot: FrontendSnapshot<MyState>;
}
export function ChatClient({ sessionId, initialSnapshot }: Props) {
const shouldResume = initialSnapshot.status === 'active';
const { messages, input, handleInputChange, handleSubmit, status } = useChat({
id: `chat-${sessionId}`,
// Use HelixChatTransport for proper stream resumption
transport: new HelixChatTransport({
api: `/api/chat/${sessionId}`,
resumeFromSequence: shouldResume ? initialSnapshot.streamSequence : undefined,
}),
// Key: use initialMessages for SSR hydration
initialMessages: initialSnapshot.messages,
// Only attempt resume if stream is active
resume: shouldResume,
});
return (
<div className="flex flex-col gap-4">
<div className="flex-1 overflow-y-auto">
{messages.map(m => (
<div key={m.id} className={`p-2 ${m.role === 'user' ? 'bg-blue-100' : 'bg-gray-100'}`}>
<strong>{m.role}:</strong> {m.parts.map((p) => (p.type === 'text' ? p.text : '')).join('')}
</div>
))}
</div>
{initialSnapshot.status === 'active' && status === 'streaming' && (
<div className="text-gray-500">Agent is running...</div>
)}
<form onSubmit={handleSubmit} className="flex gap-2">
<input
value={input}
onChange={handleInputChange}
placeholder="Ask a question..."
className="flex-1 border rounded p-2"
disabled={status === 'streaming'}
/>
<button type="submit" disabled={status === 'streaming'}>
Send
</button>
</form>
</div>
);
}
API Routes for Next.js App Router
// app/api/chat/[sessionId]/route.ts
import { handler } from '@/lib/agent-handler';
import { extractResumePosition } from '@helix-agents/ai-sdk';
// POST - Continue conversation
export async function POST(req: Request, { params }: { params: { sessionId: string } }) {
const body = await req.json();
const response = await handler.handleRequest({
method: 'POST',
body: {
message: body.message,
sessionId: params.sessionId,
},
});
return new Response(response.body, {
status: response.status,
headers: response.headers,
});
}
// GET - Resume stream
export async function GET(req: Request, { params }: { params: { sessionId: string } }) {
const resumeAt = extractResumePosition(
Object.fromEntries(req.headers.entries())
);
const response = await handler.handleRequest({
method: 'GET',
streamId: params.sessionId,
resumeAt,
});
return new Response(response.body, {
status: response.status,
headers: response.headers,
});
}
// app/api/chat/[sessionId]/snapshot/route.ts
import { handler } from '@/lib/agent-handler';
export async function GET(req: Request, { params }: { params: { sessionId: string } }) {
const snapshot = await handler.getSnapshot(params.sessionId);
if (!snapshot) {
return Response.json({ error: 'Session not found' }, { status: 404 });
}
return Response.json(snapshot);
}
Why This Works
- No duplicate data transfer - Messages loaded once via snapshot
- No race conditions - Sequence number precisely coordinates state
- SSR-friendly - FrontendSnapshot is JSON-serializable
- Framework-agnostic - Works with any SSR solution, not just Next.js
For more details on the "sequence last" pattern and complete implementation, see the Resumable Streams Example.
Complete Example
import { createFrontendHandler, FrontendHandlerError } from '@helix-agents/ai-sdk';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { defineAgent } from '@helix-agents/core';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';
// Define agent
const ChatAgent = defineAgent({
name: 'chat',
systemPrompt: 'You are a helpful assistant.',
outputSchema: z.object({
response: z.string(),
}),
llmConfig: {
model: openai('gpt-4o'),
},
});
// Create executor
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Create handler
const handler = createFrontendHandler({
streamManager,
executor,
agent: ChatAgent,
stateStore,
});
// Use with Hono
import { Hono } from 'hono';
const app = new Hono();
app.post('/api/chat', async (c) => {
try {
const body = await c.req.json();
const response = await handler.handleRequest({
method: 'POST',
body: { message: body.message },
});
return new Response(response.body, {
status: response.status,
headers: response.headers,
});
} catch (error) {
if (error instanceof FrontendHandlerError) {
return c.json({ error: error.message, code: error.code }, error.statusCode);
}
throw error;
}
});
// Load messages for conversation restore
app.get('/api/messages/:sessionId', async (c) => {
const sessionId = c.req.param('sessionId');
const { messages, hasMore } = await handler.getMessages(sessionId);
return c.json({ messages, hasMore });
});Recovery Hooks
The @helix-agents/ai-sdk/react package provides hooks for handling stream recovery scenarios like crashes, rollbacks, and page refreshes.
useStreamResync
Handle stream_resync events manually:
import { useStreamResync } from '@helix-agents/ai-sdk/react';
function ChatUI({ sessionId }: { sessionId: string }) {
const { messages, setMessages, data } = useChat({ api: '/api/chat' });
useStreamResync(data, {
onResync: async (event) => {
console.log(`Resync: ${event.data.reason}`);
// Manually handle resync - fetch fresh messages
const response = await fetch(`/api/chat/${sessionId}/snapshot`);
const snapshot = await response.json();
setMessages(snapshot.messages);
},
});
return <Messages messages={messages} />;
}
useAutoResync
Automatic resync with snapshot fetching:
import { useAutoResync } from '@helix-agents/ai-sdk/react';
function ChatUI({ sessionId }: { sessionId: string }) {
const { messages, setMessages, data } = useChat({ api: '/api/chat' });
useAutoResync(data, {
snapshotUrl: `/api/chat/${sessionId}/snapshot`,
setMessages,
onResync: (event) => {
toast.info(`Recovered from ${event.data.reason}`);
},
onError: (error) => {
console.error('Resync failed:', error);
},
});
  return <Messages messages={messages} />;
}
useResyncState
Track resync state without automatic handling:
import { useResyncState } from '@helix-agents/ai-sdk/react';
function ChatUI() {
const { data } = useChat({ api: '/api/chat' });
const { isResyncing, lastResyncEvent } = useResyncState(data);
if (isResyncing) {
return <div>Recovering state...</div>;
}
return <Messages />;
}
useCheckpointSnapshot
Load UI state from a checkpoint:
import { useCheckpointSnapshot } from '@helix-agents/ai-sdk/react';
function ChatPage({ sessionId }: { sessionId: string }) {
const { snapshot, loading, error } = useCheckpointSnapshot({
snapshotUrl: `/api/chat/${sessionId}/snapshot`,
});
const { messages, setMessages } = useChat({
api: '/api/chat',
initialMessages: snapshot?.messages ?? [],
});
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return <Messages messages={messages} />;
}
useResumableChat
Turnkey hook combining snapshot loading and resync handling. This is the recommended hook for production chat interfaces as it handles all recovery scenarios automatically.
```tsx
import { useChat } from '@ai-sdk/react';
import { useResumableChat } from '@helix-agents/ai-sdk/react';

function ChatPage({ sessionId }: { sessionId: string }) {
  const { messages, setMessages, input, handleInputChange, handleSubmit } = useChat({
    api: `/api/chat/${sessionId}`,
  });

  const {
    // Snapshot state
    snapshot,     // Full snapshot data (state, messages, etc.)
    isLoading,    // True while loading the initial snapshot
    error,        // Error from the initial snapshot load

    // Resync state
    resyncError,  // Error from automatic resync (separate from initial load)
    hasResynced,  // True if any resync has occurred
    resyncCount,  // Number of resyncs that have occurred

    // Checkpoint info
    checkpoint,   // { id, stepCount, streamSequence }

    // Methods
    refetch,      // Manually refetch the snapshot
  } = useResumableChat({
    // Required
    snapshotUrl: `/api/chat/${sessionId}/snapshot`,
    setMessages,             // From useChat

    // Optional
    checkpointId: 'cp-123',  // Specific checkpoint to load
    enabled: true,           // Enable/disable the hook

    // Callbacks
    onResync: (event) => {
      // Called on each resync event
      console.log(`Recovered from ${event.data.reason}`);
    },
    onError: (error) => {
      // Called on resync errors
      console.error('Resync failed:', error);
    },
    onSnapshotLoaded: (snapshot) => {
      // Called when the initial snapshot loads
      console.log(`Loaded ${snapshot.messages.length} messages`);
    },
  });

  // Separate error handling for the initial load vs resync
  if (error) {
    return <div>Failed to load chat: {error.message}</div>;
  }
  if (resyncError) {
    return <div>Recovery failed: {resyncError.message}</div>;
  }
  if (isLoading) {
    return <div>Loading chat...</div>;
  }

  return (
    <div>
      {hasResynced && (
        <div className="text-sm text-gray-500">
          Recovered ({resyncCount} resyncs)
        </div>
      )}
      <Messages messages={messages} />
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
        <button type="submit">Send</button>
      </form>
    </div>
  );
}
```

Full Integration Example
```tsx
import { useChat } from '@ai-sdk/react';
import { useResumableChat } from '@helix-agents/ai-sdk/react';

function ResumableChat({ sessionId }: { sessionId: string }) {
  const { messages, setMessages, input, handleInputChange, handleSubmit, data } = useChat({
    api: `/api/chat/${sessionId}`,
  });

  const {
    snapshot,
    isLoading,
    error,
    resyncError,
    hasResynced,
  } = useResumableChat(data, {
    snapshotUrl: `/api/chat/${sessionId}/snapshot`,
    setMessages,
    onResync: () => toast.info('Connection restored'),
  });

  // Use the snapshot for additional state
  const agentState = snapshot?.state;
  const streamSequence = snapshot?.streamSequence ?? 0;

  return (
    <div>
      {hasResynced && <Banner>Reconnected to stream</Banner>}
      {/* ... rest of UI */}
    </div>
  );
}
```

Hook Comparison
| Hook | Use Case | Automatic Snapshot | Automatic Resync |
|---|---|---|---|
| useStreamResync | Manual resync handling | ❌ | ❌ |
| useAutoResync | Auto-handle resyncs | ❌ | ✅ |
| useCheckpointSnapshot | Load checkpoint state | ✅ | ❌ |
| useResyncState | Track resync events | ❌ | ❌ |
| useResumableChat | Full solution | ✅ | ✅ |
Use useResumableChat for most production applications. Use the individual hooks when you need fine-grained control over specific behaviors.
Message Merging
During streaming, the AI SDK shows all content (text, tool calls) as a single assistant message being built incrementally. However, in storage, each LLM call creates a separate assistant message. This mismatch can cause UI issues when loading from snapshots.
The Problem
```ts
// In storage after a 3-step execution:
[
  { role: 'user', content: 'Research AI' },
  { role: 'assistant', parts: [{ type: 'tool-search', ... }] },        // Step 1
  { role: 'assistant', parts: [{ type: 'text', text: 'Found...' }] },  // Step 2
  { role: 'assistant', parts: [{ type: 'text', text: 'Summary' }] },   // Step 3
]
// But during streaming, the user saw ONE assistant message
```

mergeAssistantMessages()
The mergeAssistantMessages() function combines consecutive assistant messages:
```ts
import { loadUIMessages, mergeAssistantMessages } from '@helix-agents/ai-sdk';

const { messages } = await loadUIMessages(stateStore, sessionId);
const merged = mergeAssistantMessages(messages);

// Before: [user, assistant, assistant, assistant]
// After:  [user, assistant] (all parts combined)
```

Options
```ts
mergeAssistantMessages(messages, {
  // Deduplicate tool parts with the same toolCallId (default: true)
  deduplicateTools: true,

  // Remove empty text parts (default: true)
  filterEmptyText: true,
});
```

How It Works
- Consecutive Detection: Finds runs of adjacent assistant messages
- Part Merging: Combines all parts from consecutive messages
- Tool Deduplication: Keeps only the first occurrence of each `toolCallId`
- Partial Suffix: Preserves the `-partial` ID suffix if any source message had it
- Metadata: Copies metadata from the first message in the run
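The steps above can be sketched as a small standalone function. This is an illustrative approximation only: the `Part` and `UIMessage` shapes and the `mergeAssistantRuns` name are simplified stand-ins, not the actual `@helix-agents/ai-sdk` implementation.

```typescript
// Sketch of the merging steps -- simplified stand-in types, not the
// real @helix-agents/ai-sdk implementation.
type Part = { type: string; text?: string; toolCallId?: string };
type UIMessage = { id: string; role: 'user' | 'assistant'; parts: Part[] };

function mergeAssistantRuns(messages: UIMessage[]): UIMessage[] {
  const out: UIMessage[] = [];
  for (const msg of messages) {
    const prev = out[out.length - 1];
    // Consecutive Detection: extend the run if the previous output
    // message is also from the assistant
    if (msg.role === 'assistant' && prev !== undefined && prev.role === 'assistant') {
      // Tool Deduplication: the first occurrence of a toolCallId wins
      const seen = new Set(prev.parts.map((p) => p.toolCallId).filter(Boolean));
      for (const part of msg.parts) {
        if (part.toolCallId) {
          if (seen.has(part.toolCallId)) continue;
          seen.add(part.toolCallId);
        }
        // filterEmptyText: drop empty text parts
        if (part.type === 'text' && !part.text) continue;
        prev.parts.push(part); // Part Merging
      }
      // Partial Suffix: keep the -partial marker if any source had it
      if (msg.id.endsWith('-partial') && !prev.id.endsWith('-partial')) {
        prev.id += '-partial';
      }
    } else {
      // Metadata and id come from the first message in the run
      out.push({ ...msg, parts: [...msg.parts] });
    }
  }
  return out;
}

const merged = mergeAssistantRuns([
  { id: 'm1', role: 'user', parts: [{ type: 'text', text: 'Research AI' }] },
  { id: 'm2', role: 'assistant', parts: [{ type: 'tool-search', toolCallId: 'tc1' }] },
  { id: 'm3', role: 'assistant', parts: [
    { type: 'tool-search', toolCallId: 'tc1' }, // duplicate, dropped
    { type: 'text', text: 'Summary' },
  ] },
]);
// merged is [user, assistant], with the run's parts combined
```

The real implementation also copies metadata from the first message in each run; the sketch gets that for free by reusing the first message object as the merge target.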
When It's Applied
The getSnapshot() method in FrontendHandler applies merging automatically:
```ts
const snapshot = await handler.getSnapshot(sessionId);
// snapshot.messages are already merged
```

For custom implementations:
```ts
import { loadUIMessages, mergeAssistantMessages } from '@helix-agents/ai-sdk';

const { messages } = await loadUIMessages(stateStore, sessionId);
const merged = mergeAssistantMessages(messages);
```

Edge Cases
Partial content with merging:
```ts
// A message with the -partial suffix indicates incomplete content
{ id: 'msg-1-partial', role: 'assistant', parts: [...] }

// After merging, the suffix is preserved if ANY source message had it
{ id: 'msg-1-partial', role: 'assistant', parts: [...] }
```

Tool deduplication:
```ts
// The same toolCallId appears in multiple messages
[
  { parts: [{ toolCallId: 'tc1', state: 'input-available' }] },
  { parts: [{ toolCallId: 'tc1', state: 'output-available' }] },
]
// Only the first occurrence is kept (you usually want the most complete state)
// For correct state, ensure messages are ordered with the latest state last
```

Common Pitfalls
1. Missing UI Stream Header
The AI SDK requires a specific header for UI Message Stream Protocol:
```ts
// FrontendHandler sets this automatically
headers: {
  'x-vercel-ai-ui-message-stream': 'v1',
}
```

Custom endpoints must include this header.
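For a hand-rolled endpoint, setting the header looks roughly like this. A minimal sketch using the web-standard Response and ReadableStream; the stream body here is a placeholder, not real protocol output:

```typescript
// Sketch: a custom streaming endpoint must carry the UI message stream
// header, or the AI SDK client will not treat the response as a
// UI message stream. The body below is placeholder data.
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('data: {}\n\n'));
    controller.close();
  },
});

const response = new Response(body, {
  headers: {
    'x-vercel-ai-ui-message-stream': 'v1',
  },
});
```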
2. Forgetting to Call finalize()
If using StreamTransformer directly:
```ts
const transformer = new StreamTransformer();

for await (const chunk of stream) {
  yield* transformer.transform(chunk).events;
}

// Don't forget this!
yield* transformer.finalize().events;
```

The static toDataStream() method handles this automatically.
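If you do drive a transformer by hand, a try/finally wrapper makes the finalize() call hard to forget. The `Transformer` interface and `withFinalize` helper below are hypothetical stand-ins to show the pattern, not the real StreamTransformer API:

```typescript
// Pattern sketch: always flush via finalize(), even if the source
// stream throws. "Transformer" is a hypothetical stand-in interface.
interface Transformer<I, O> {
  transform(chunk: I): { events: O[] };
  finalize(): { events: O[] };
}

async function* withFinalize<I, O>(
  source: AsyncIterable<I>,
  t: Transformer<I, O>,
): AsyncGenerator<O> {
  try {
    for await (const chunk of source) {
      yield* t.transform(chunk).events;
    }
  } finally {
    // Runs on normal completion AND on error/early exit
    yield* t.finalize().events;
  }
}

// Demo with a toy transformer
async function demo(): Promise<string[]> {
  const toy: Transformer<string, string> = {
    transform: (c) => ({ events: [c.toUpperCase()] }),
    finalize: () => ({ events: ['[done]'] }),
  };
  async function* source() {
    yield 'a';
    yield 'b';
  }
  const out: string[] = [];
  for await (const ev of withFinalize(source(), toy)) out.push(ev);
  return out;
}
```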
3. Using Typed Tool Event Names
Helix uses dynamic tools. Don't expect typed tool events:
```ts
// ❌ Wrong - these are for typed tools
onToolCall: ({ toolCall }) => { ... }

// ✅ Correct - use message parts
messages.map(m => m.parts?.filter(p => p.type === 'tool-invocation'))
```

4. Looking for Tool Results in content
Tool results are in message parts, not content:
```ts
// ❌ Wrong
const result = message.content;

// ✅ Correct
const toolParts = message.parts?.filter(p => p.type === 'tool-invocation');
const results = toolParts?.filter(p => p.toolInvocation.state === 'result');
```

Next Steps
- React Integration - Building React chat UIs
- Framework Examples - Express, Hono setup
- Streaming Guide - Helix streaming deep dive