Resumable Streams

Resumable streams allow clients to reconnect and continue receiving events from where they left off after network interruptions, page refreshes, or server restarts. This guide covers all approaches to stream resumability in Helix Agents.

Why Resumability Matters

Real-world applications face several challenges:

  • Network issues: Mobile networks drop, WiFi switches, connections timeout
  • Page refreshes: Users refresh the browser mid-stream
  • Server crashes: The server restarts while streaming
  • Long-running tasks: Agent executions that take minutes or hours

Without resumability, any of these events would lose all streaming progress, forcing users to restart from scratch.

Key Concepts

Sequence Numbers

Every chunk in a stream can have a sequence number - a monotonically increasing integer that identifies its position:

typescript
interface StoredChunk {
  chunk: StreamChunk;
  sequence: number; // 1, 2, 3, ...
}

Sequence numbers enable clients to say "give me everything after sequence 42" when reconnecting.
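For instance, a client that remembers the last sequence it processed can trim a replayed chunk list down to just the missing tail. A minimal sketch (the `StoredChunk` shape mirrors the interface above):

```typescript
interface StreamChunk {
  type: string;
  delta?: string;
}

interface StoredChunk {
  chunk: StreamChunk;
  sequence: number;
}

// Return only the chunks strictly after the last sequence the client saw.
function chunksAfter(stored: StoredChunk[], lastSeen: number): StoredChunk[] {
  return stored.filter((s) => s.sequence > lastSeen);
}

const stored: StoredChunk[] = [
  { chunk: { type: 'text_delta', delta: 'Hel' }, sequence: 41 },
  { chunk: { type: 'text_delta', delta: 'lo' }, sequence: 42 },
  { chunk: { type: 'text_delta', delta: '!' }, sequence: 43 },
];

// "Give me everything after sequence 42"
const missed = chunksAfter(stored, 42);
```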

Stream Status

Streams have a lifecycle status:

| Status   | Description                            | Client Action        |
| -------- | -------------------------------------- | -------------------- |
| `active` | Stream is running, new chunks expected | Connect and stream   |
| `paused` | Stream paused, may resume later        | Wait or poll         |
| `ended`  | Stream completed successfully          | No connection needed |
| `failed` | Stream failed with error               | Handle error state   |
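These statuses map directly to client behavior; a small dispatch helper makes that explicit (a sketch; the action names are illustrative):

```typescript
type Status = 'active' | 'paused' | 'ended' | 'failed';

// Decide what the client should do for each lifecycle status.
function clientAction(status: Status): 'connect' | 'poll' | 'idle' | 'show-error' {
  switch (status) {
    case 'active':
      return 'connect'; // connect and stream
    case 'paused':
      return 'poll'; // wait or poll for resumption
    case 'ended':
      return 'idle'; // no connection needed
    case 'failed':
      return 'show-error'; // surface the error state
  }
}
```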

Snapshots

A snapshot combines the current state with the stream position:

typescript
interface Snapshot {
  messages: UIMessage[];      // Conversation history
  state: AgentState;          // Agent's custom state
  streamSequence: number;     // Position to resume from
  status: 'active' | 'paused' | 'ended' | 'failed';
  timestamp: number;
}

Snapshots implement the "sequence-last" pattern: by loading state first and reading the stream sequence last, you guarantee that any events arriving during the load are still covered by the resume position, so nothing is missed.

Content Replay

When a user refreshes mid-stream, they've already seen some content that hasn't been saved to message history yet (because the step isn't complete). Content replay reconstructs this partial content and replays it as stream events, ensuring a seamless experience.

Approaches to Resumability

1. SSE Event IDs (Browser Native)

The simplest approach uses Server-Sent Events' built-in reconnection:

typescript
// Server: Include event ID in SSE stream
for await (const { chunk, sequence } of reader) {
  yield `id: ${sequence}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

When the connection drops, the browser automatically reconnects with the Last-Event-ID header, and the server can resume from that position.
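On the client side this is just the browser's built-in `EventSource`: it resends the last received `id` as the `Last-Event-ID` header whenever it reconnects, with no extra code. A browser-only sketch (the `/stream/...` path is hypothetical):

```typescript
// Browser-only sketch: EventSource reconnects automatically and replays
// the last received id via the Last-Event-ID request header.
function subscribe(sessionId: string, onChunk: (chunk: unknown) => void) {
  const source = new EventSource(`/stream/${sessionId}`);
  source.onmessage = (event) => {
    // event.lastEventId holds the sequence the server set via `id:`
    onChunk(JSON.parse(event.data));
  };
  return source;
}
```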

Pros:

  • Zero client code needed
  • Browser handles reconnection automatically

Cons:

  • Only works for connection drops, not page refreshes
  • No state synchronization
  • Can miss events if server restarts

2. Snapshot-Based Resumption (Recommended)

The snapshot approach loads complete state and stream position atomically:

typescript
// Step 1: Get snapshot (state + stream position)
const snapshot = await handler.getSnapshot(sessionId);

// Step 2: Initialize UI with snapshot data
const { messages } = useChat({
  initialMessages: snapshot.messages,
  resume: snapshot.status === 'active',
});

// Step 3: Resume stream from snapshot position
// (happens automatically if resume: true)

Pros:

  • Works across page refreshes
  • Handles server restarts
  • State always synchronized
  • Deterministically correct

Cons:

  • Requires snapshot endpoint
  • Slightly more complex setup
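The snapshot endpoint itself is mostly packaging: load state, then read the stream position last. A sketch of the core lookup with in-memory stand-ins for the real state store and stream manager:

```typescript
interface Snapshot {
  messages: string[];
  status: string;
  streamSequence: number;
}

// In-memory stand-ins for the real stateStore / streamManager.
const stateStore = new Map<string, { messages: string[]; status: string }>();
const streamInfo = new Map<string, { latestSequence: number }>();

// Sequence-last: load state first, read the stream position last.
async function getSnapshot(sessionId: string): Promise<Snapshot | null> {
  const state = stateStore.get(sessionId);
  if (!state) return null;
  const info = streamInfo.get(sessionId);
  return {
    messages: state.messages,
    status: state.status,
    streamSequence: info?.latestSequence ?? 0,
  };
}
```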

3. Content Replay (Mid-Stream Refresh)

Content replay handles the specific case where a user refreshes mid-message:

typescript
// Without content replay:
// - initialMessages has partial text from stream chunks
// - Stream resume sends text-start + text-delta
// - Result: DUPLICATE text in UI

// With content replay (default):
// - initialMessages excludes partial content
// - Stream replays partial content as events first
// - Then continues with live events
// - Result: Seamless, no duplicates

When to use: Always enable (it's the default) unless you have a specific reason not to.

Comparison

| Approach         | Page Refresh | Server Restart | State Sync | Complexity |
| ---------------- | ------------ | -------------- | ---------- | ---------- |
| SSE Event IDs    | No           | No             | No         | Low        |
| Snapshot-based   | Yes          | Yes            | Yes        | Medium     |
| + Content Replay | Yes          | Yes            | Yes        | Medium     |

Recommendation: Use snapshot-based resumption with content replay enabled (the default).

Core SDK Usage

If you're not using the AI SDK integration, you can implement resumable streams directly with the core primitives.

Creating a Resumable Reader

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager(redis);

// Get stream info first
const info = await streamManager.getStreamInfo(streamId);
if (!info || info.status === 'failed') {
  throw new Error('Stream not available');
}

// Create resumable reader (from a specific position)
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: lastKnownSequence, // Resume from here (exclusive)
});

if (reader) {
  console.log(`Resuming from sequence ${reader.currentSequence}`);
  console.log(`Stream has ${reader.totalChunks} total chunks`);

  for await (const chunk of reader) {
    // Process chunk
    console.log(`Chunk at sequence ${reader.currentSequence}`);
  }
}

Building a Custom Resume Endpoint

typescript
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';

const app = new Hono();

// GET /stream/:sessionId - Stream with resumption support
app.get('/stream/:sessionId', async (c) => {
  const sessionId = c.req.param('sessionId');

  // Check for resume position from headers
  const lastEventId = c.req.header('Last-Event-ID');
  const resumeFrom = lastEventId ? parseInt(lastEventId, 10) : 0;

  // Get stream info
  const info = await streamManager.getStreamInfo(sessionId);
  if (!info) {
    return c.json({ error: 'Stream not found' }, 404);
  }

  if (info.status === 'failed') {
    return c.json({ error: 'Stream failed' }, 410);
  }

  // Create reader (resumable if position provided)
  const reader = resumeFrom > 0
    ? await streamManager.createResumableReader(sessionId, { fromSequence: resumeFrom })
    : await streamManager.createReader(sessionId);

  if (!reader) {
    return c.json({ error: 'Cannot read stream' }, 500);
  }

  // Stream SSE response
  return streamSSE(c, async (stream) => {
    let sequence = resumeFrom;

    for await (const chunk of reader) {
      sequence++;
      await stream.writeSSE({
        id: String(sequence),
        data: JSON.stringify(chunk),
      });
    }

    // Send end event
    await stream.writeSSE({
      data: JSON.stringify({ type: 'end' }),
    });
  });
});

// GET /stream/:sessionId/info - Get stream metadata
app.get('/stream/:sessionId/info', async (c) => {
  const sessionId = c.req.param('sessionId');
  const info = await streamManager.getStreamInfo(sessionId);

  if (!info) {
    return c.json({ error: 'Stream not found' }, 404);
  }

  return c.json({
    status: info.status,
    totalChunks: info.totalChunks,
    latestSequence: info.latestSequence,
  });
});

Reconstructing Partial Content

For custom UIs that need to show partial content on refresh:

typescript
// Get all chunks from the current step
const stepCount = checkpoint?.stepCount ?? 0;
const fromStep = stepCount + 1;

// Use optimized method if available
const chunks = streamManager.getChunksFromStep
  ? await streamManager.getChunksFromStep(sessionId, fromStep)
  : await streamManager.getAllChunks(sessionId);

// Reconstruct partial content from chunks
let partialText = '';
let partialReasoning = '';
const toolCalls = new Map();

for (const chunk of chunks) {
  switch (chunk.type) {
    case 'text_delta':
      partialText += chunk.delta;
      break;
    case 'thinking':
      partialReasoning += chunk.content;
      break;
    case 'tool_start':
      toolCalls.set(chunk.toolCallId, {
        name: chunk.toolName,
        input: chunk.arguments,
        state: 'executing',
      });
      break;
    case 'tool_end': {
      const tool = toolCalls.get(chunk.toolCallId);
      if (tool) {
        tool.state = chunk.success ? 'completed' : 'error';
        tool.output = chunk.result;
      }
      break;
    }
  }
}

// Use partialText, partialReasoning, toolCalls in your UI

AI SDK Usage

The @helix-agents/ai-sdk package provides higher-level abstractions that handle most resumability concerns automatically.

Quick Start with FrontendHandler

typescript
import { createFrontendHandler } from '@helix-agents/ai-sdk';

const handler = createFrontendHandler({
  streamManager,
  executor,
  agent: MyAgent,
  stateStore,
  // Content replay is enabled by default
});

// Get snapshot for initializing UI
const snapshot = await handler.getSnapshot(sessionId);

// snapshot contains:
// - messages: UIMessage[] (for initialMessages)
// - state: AgentState (custom state)
// - streamSequence: number (resume position)
// - status: 'active' | 'paused' | 'ended' | 'failed'

Client Transport

Use HelixChatTransport to integrate with AI SDK v6's useChat hook. This transport handles Helix-specific patterns for stream resumption:

typescript
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';

const transport = new HelixChatTransport({
  api: `/api/chat/${sessionId}`,
  resumeFromSequence: snapshot.streamSequence,  // From snapshot
  headers: { 'Authorization': `Bearer ${token}` },
  credentials: 'include',
});

The transport automatically:

  • Uses the same API path for POST (send) and GET (resume) requests
  • Adds the X-Resume-From-Sequence header when resuming
  • Passes through custom headers, body, and credentials
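Server-side, the resume position can then come from either header. A sketch of a helper that normalizes both (header names as above; lower-cased keys are assumed, as most frameworks provide):

```typescript
// Resolve the resume position from either the transport's custom header
// or the browser's native Last-Event-ID, defaulting to 0 (from the start).
function resumePosition(headers: Record<string, string | undefined>): number {
  const raw = headers['x-resume-from-sequence'] ?? headers['last-event-id'];
  const parsed = raw ? parseInt(raw, 10) : 0;
  return Number.isFinite(parsed) && parsed > 0 ? parsed : 0;
}
```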

React Integration

typescript
'use client';

import { useChat } from '@ai-sdk/react';
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import { useAutoResync } from '@helix-agents/ai-sdk/react';

function Chat({ sessionId, initialSnapshot }) {
  const shouldResume = initialSnapshot.status === 'active';

  const { messages, setMessages, data } = useChat({
    id: `chat-${sessionId}`,
    transport: new HelixChatTransport({
      api: `/api/chat/${sessionId}`,
      resumeFromSequence: shouldResume ? initialSnapshot.streamSequence : undefined,
    }),
    initialMessages: initialSnapshot.messages,
    resume: shouldResume,
  });

  // Automatic crash recovery
  useAutoResync(data, {
    snapshotUrl: `/api/chat/${sessionId}/snapshot`,
    setMessages,
    onResync: (event) => {
      console.log('Recovered from:', event.data.reason);
    },
  });

  return <MessageList messages={messages} />;
}

Content Replay Configuration

Content replay is enabled by default. To customize:

typescript
// Disable content replay globally
const handler = createFrontendHandler({
  // ...
  contentReplay: { enabled: false },
});

// Or override per-snapshot
const snapshot = await handler.getSnapshot(sessionId, {
  includePartialContent: true, // Force include partial content
});

For detailed AI SDK documentation, see AI SDK Integration.

StreamManager Implementations

Different StreamManager implementations have different resumability features:

| Feature                    | InMemory | Redis | Cloudflare DO |
| -------------------------- | -------- | ----- | ------------- |
| createReader               | Yes      | Yes   | Yes           |
| createResumableReader      | No       | Yes   | Yes           |
| getStreamInfo              | Yes      | Yes   | Yes           |
| getAllChunks               | Yes      | Yes   | Yes           |
| getChunksFromStep          | Yes      | Yes   | Yes           |
| pauseStream / resumeStream | No       | Yes   | Yes           |
| Multi-process              | No       | Yes   | Yes           |
| Persistence                | No       | Yes   | Yes           |
| TTL/Expiration             | No       | Yes   | Yes           |

InMemoryStreamManager

Best for development and testing. No persistence or multi-process support.

typescript
import { InMemoryStreamManager } from '@helix-agents/store-memory';

const streamManager = new InMemoryStreamManager();

Limitations:

  • No createResumableReader - use getAllChunks + filtering instead
  • Data lost on process restart
  • Single process only
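The missing createResumableReader can be approximated over getAllChunks with a filtering wrapper. A sketch (the `StoredChunk` shape mirrors the interface earlier in this guide):

```typescript
interface StoredChunk {
  chunk: { type: string };
  sequence: number;
}

// Emulate createResumableReader for managers that only expose getAllChunks:
// fetch everything, then yield only chunks after the resume position.
async function* resumableReader(
  getAllChunks: () => Promise<StoredChunk[]>,
  fromSequence: number,
): AsyncGenerator<StoredChunk> {
  for (const stored of await getAllChunks()) {
    if (stored.sequence > fromSequence) yield stored;
  }
}
```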

RedisStreamManager

Production-ready with full resumability support.

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager(redis, {
  prefix: 'myapp:streams:',
  ttl: 86400, // 24 hours
});

Features:

  • Full createResumableReader with sequence tracking
  • Pub/Sub for real-time delivery
  • Atomic cleanup operations
  • Configurable TTL

DurableObjectStreamManager

For Cloudflare Workers deployments.

typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare/stream';

const streamManager = new DurableObjectStreamManager({
  streamNamespace: env.STREAMS,
});

Features:

  • HTTP-based resumable reader
  • SSE support with buffer overflow protection
  • Global edge deployment

Use Cases

Chat Applications

For chat apps, use snapshot-based resumption with content replay:

typescript
// Server: Next.js App Router
export async function GET(req, { params }) {
  const snapshot = await handler.getSnapshot(params.sessionId);
  return Response.json(snapshot);
}

// Client: React
const { snapshot, isLoading } = useResumableChat({
  snapshotUrl: `/api/chat/${sessionId}/snapshot`,
  setMessages,
});

Long-Running Tasks

For tasks that take minutes or hours, add progress tracking:

typescript
const agent = defineAgent({
  name: 'researcher',
  tools: [
    defineTool({
      name: 'search',
      execute: async (input, context) => {
        // Emit progress events
        await context.emit('search_progress', {
          query: input.query,
          status: 'searching',
        });

        const results = await search(input.query);

        await context.emit('search_progress', {
          query: input.query,
          status: 'complete',
          resultCount: results.length,
        });

        return results;
      },
    }),
  ],
});

Clients can track custom events and show progress UI even after reconnection.
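On the client, folding these custom events into a keyed progress map keeps the UI consistent across reconnects, because replaying the same events rebuilds the same state. A sketch (the custom-event chunk shape used here is an assumption, not the exact wire format):

```typescript
// Assumed shape for a custom event chunk; the real format may differ.
interface ProgressEvent {
  type: string;
  name?: string;
  data?: { query: string; status: string; resultCount?: number };
}

// Fold a custom event into a per-query status map. Replaying events after
// a reconnect is idempotent: the map converges to the latest status.
function applyProgress(progress: Map<string, string>, event: ProgressEvent): Map<string, string> {
  if (event.type === 'custom' && event.name === 'search_progress' && event.data) {
    progress.set(event.data.query, event.data.status);
  }
  return progress;
}
```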

Mobile Apps

For mobile apps with unreliable connectivity:

typescript
// Implement aggressive reconnection
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

let retryCount = 0;
const maxRetries = 10;

async function connectWithRetry(sessionId, fromSequence) {
  while (retryCount < maxRetries) {
    try {
      const response = await fetch(`/stream/${sessionId}`, {
        headers: fromSequence > 0
          ? { 'Last-Event-ID': String(fromSequence) }
          : {},
      });

      if (response.ok) {
        retryCount = 0; // Reset on success
        return response;
      }
    } catch {
      // Network error; fall through to the backoff below
    }

    retryCount++;
    // Exponential backoff, capped at 30 seconds
    await sleep(Math.min(1000 * Math.pow(2, retryCount), 30000));
  }

  throw new Error('Max retries exceeded');
}

Server-Side Rendering

For SSR with Next.js:

typescript
// Server Component
export default async function ChatPage({ params }) {
  const snapshot = await handler.getSnapshot(params.sessionId);

  if (!snapshot) {
    notFound();
  }

  // Server renders with messages immediately
  return (
    <div>
      <ChatClient
        sessionId={params.sessionId}
        initialSnapshot={snapshot}
      />
    </div>
  );
}

The page is server-rendered with all messages, then hydrates on the client and resumes streaming if active.

Multi-Turn Conversations

When users send follow-up messages in the same session, special handling is needed to prevent content duplication.

The Problem

In a multi-turn conversation:

  1. User sends message A → Agent responds with chunks 1-50
  2. User sends message B → Agent responds with chunks 51-100
  3. User refreshes during message B response

Without filtering, the client would replay chunks 1-100, showing both responses in the current message.

Sequence Concepts

Two sequence numbers control multi-turn filtering:

| Sequence         | Description                       | Use                     |
| ---------------- | --------------------------------- | ----------------------- |
| `startSequence`  | Position when current run started | Filter for current turn |
| `streamSequence` | Latest chunk sequence globally    | Resume position         |

typescript
// Before starting follow-up execution
const snapshot = await handler.getSnapshot(sessionId);
const startSequence = snapshot.streamSequence; // e.g., 50

// Start new execution
const handle = await executor.execute(agent, 'Follow-up message', { sessionId });

// New chunks will be 51, 52, 53...
// To show only the current turn, filter: sequence > startSequence

Implementing Multi-Turn Filtering

Server-side (recommended):

The run metadata tracks startSequence:

typescript
const run = await stateStore.getCurrentRun(sessionId);
const startSequence = run?.startSequence ?? 0;

// Filter chunks to current run only
const chunks = await streamManager.getChunksFromSequence(sessionId, startSequence);

Client-side with HelixChatTransport:

The transport handles this automatically when resumeFromSequence is set correctly:

typescript
// On follow-up message, get current sequence BEFORE execution
const preExecSnapshot = await fetch(`/api/chat/${sessionId}/snapshot`).then(r => r.json());
const startSequence = preExecSnapshot.streamSequence;

// After execution starts, use startSequence for resumption
const transport = new HelixChatTransport({
  api: `/api/chat/${sessionId}`,
  resumeFromSequence: startSequence,
});

Content Replay with Multi-Turn

Content replay works per-turn. When getSnapshot() is called:

  1. It identifies the current run's startSequence
  2. Reconstructs partial content only from chunks after startSequence
  3. Returns messages excluding the current turn's partial content
  4. On resume, replays only the current turn's partial content

typescript
// Snapshot automatically handles multi-turn
const snapshot = await handler.getSnapshot(sessionId);

// snapshot.messages: All completed messages + previous turns
// snapshot.streamSequence: Resume position
// snapshot.startSequence: Current turn start (used internally for filtering)

Follow-Up Message Pattern

Complete pattern for follow-up messages:

typescript
async function sendFollowUp(sessionId: string, message: string) {
  // 1. Get current sequence before execution
  const preSnapshot = await handler.getSnapshot(sessionId);
  const startSequence = preSnapshot.streamSequence;

  // 2. Start execution (will create new run with startSequence set)
  const response = await fetch(`/api/chat/${sessionId}`, {
    method: 'POST',
    body: JSON.stringify({ message }),
  });

  // 3. If the user refreshes, resume from startSequence
  // The new run's chunks will all be > startSequence
  // Content replay will only include the current turn's partial content

  return response;
}

Common Multi-Turn Issues

Duplicate content on refresh:

  • Cause: Resuming from 0 instead of startSequence
  • Fix: Always pass startSequence to resumeFromSequence

Old messages appearing in stream:

  • Cause: Chunks from previous runs being replayed
  • Fix: Filter by startSequence before streaming

Empty response after follow-up:

  • Cause: Using wrong sequence (too high)
  • Fix: Capture sequence BEFORE starting execution

Best Practices

1. Use the Sequence-Last Pattern

Always get the stream sequence after loading state:

typescript
// CORRECT: State first, sequence last
const state = await stateStore.load(sessionId);
const messages = convertToUIMessages(state.messages);
const info = await streamManager.getStreamInfo(sessionId);
const sequence = info?.latestSequence ?? 0;

// WRONG: Sequence first, state second
const info = await streamManager.getStreamInfo(sessionId); // Gets sequence
const state = await stateStore.load(sessionId); // State may have changed!
// Events between these calls could be missed

This ensures any events that occur during loading are captured when resuming.

2. Handle Deduplication

When resuming, you may receive some duplicate events. Handle gracefully:

typescript
const seenSequences = new Set();

for await (const { chunk, sequence } of reader) {
  if (seenSequences.has(sequence)) {
    continue; // Skip duplicates
  }
  seenSequences.add(sequence);

  processChunk(chunk);
}

3. Configure Appropriate TTLs

Set TTLs based on your use case:

typescript
// Short-lived chats (1 hour)
const streamManager = new RedisStreamManager(redis, { ttl: 3600 });

// Long-running research tasks (7 days)
const streamManager = new RedisStreamManager(redis, { ttl: 86400 * 7 });

4. Clean Up Completed Streams

Don't leave completed streams indefinitely:

typescript
// After getting final result
const result = await handle.result();

// Optionally clean up if no longer needed
if (!needToKeepHistory) {
  await stateStore.delete(sessionId);
}

Troubleshooting

Events Lost on Page Refresh

Symptom: Some text disappears when refreshing mid-stream.

Cause: Content replay may be disabled or not working.

Solution:

  1. Ensure contentReplay: { enabled: true } (default)
  2. Check that getSnapshot() is being called
  3. Verify the stream status is 'active'

Duplicate Content After Resume

Symptom: Same text appears twice after reconnection.

Cause: Content replay is disabled but partial content is in initialMessages.

Solution:

  1. Enable content replay, OR
  2. Call getSnapshot() with includePartialContent: false

Stream Shows as Failed

Symptom: getStreamInfo() returns status: 'failed'.

Cause: The stream was explicitly failed or encountered an error.

Solution:

  1. Check info.error for the error message
  2. Check server logs for the failure reason
  3. The client should show an error state, not attempt to resume

createResumableReader Returns Null

Symptom: createResumableReader() returns null.

Cause: Stream doesn't exist or has failed status.

Solution:

  1. Check getStreamInfo() first
  2. Use createReader() as fallback for active streams
  3. Handle the null case gracefully in your code
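Those three steps combined, with the stream manager stubbed behind a minimal interface (a sketch; the method shapes follow the examples earlier in this guide):

```typescript
type Reader = AsyncIterable<unknown> | null;

// Minimal stand-in for the stream manager's reader-related surface.
interface Manager {
  getStreamInfo(id: string): Promise<{ status: string } | null>;
  createResumableReader(id: string, opts: { fromSequence: number }): Promise<Reader>;
  createReader(id: string): Promise<Reader>;
}

// Check stream info first, prefer a resumable reader, fall back to a plain
// reader, and return null (instead of throwing) when nothing is available.
async function openReader(mgr: Manager, id: string, fromSequence: number): Promise<Reader> {
  const info = await mgr.getStreamInfo(id);
  if (!info || info.status === 'failed') return null;
  const resumable = await mgr.createResumableReader(id, { fromSequence });
  if (resumable) return resumable;
  return mgr.createReader(id); // fallback for managers without resumable support
}
```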

High Memory Usage with Long Streams

Symptom: Server memory grows with long-running streams.

Cause: Chunks accumulating in memory.

Solution:

  1. Use Redis or Cloudflare instead of in-memory
  2. Set appropriate maxLength limits
  3. Implement periodic cleanup of old chunks
