Resumable Streams

Resumable streams allow clients to reconnect and continue receiving events from where they left off after network interruptions, page refreshes, or server restarts. This guide covers all approaches to stream resumability in Helix Agents.

Why Resumability Matters

Real-world applications face several challenges:

  • Network issues: Mobile networks drop, WiFi switches, connections timeout
  • Page refreshes: Users refresh the browser mid-stream
  • Server crashes: The server restarts while streaming
  • Long-running tasks: Agent executions that take minutes or hours

Without resumability, any of these events would lose all streaming progress, forcing users to restart from scratch.

Key Concepts

Sequence Numbers

Every chunk in a stream can have a sequence number - a monotonically increasing integer that identifies its position:

typescript
interface StoredChunk {
  chunk: StreamChunk;
  sequence: number; // 1, 2, 3, ...
}

Sequence numbers enable clients to say "give me everything after sequence 42" when reconnecting.

Stream Status

Streams have a lifecycle status:

| Status | Description | Client Action |
| --- | --- | --- |
| active | Stream is running, new chunks expected. | Connect and stream |
| paused | Stream is closed cleanly at a HITL boundary (v7). | Reattach via snapshot |
| ended | Stream completed successfully. | No connection needed |
| failed | Stream failed with error. | Handle error state |
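In client code this lifecycle typically becomes a simple dispatch. A minimal sketch, where the returned action strings are illustrative placeholders rather than a Helix API:

```typescript
// Map a stream's lifecycle status to the client action described above.
// The action strings are illustrative, not part of any Helix API.
type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';

function clientActionFor(status: StreamStatus): string {
  switch (status) {
    case 'active':
      return 'connect-and-stream';
    case 'paused':
      return 'reattach-via-snapshot';
    case 'ended':
      return 'no-connection-needed';
    case 'failed':
      return 'handle-error';
  }
}
```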

v7 'paused' semantics

In v7, the stream is marked 'paused' whenever a run suspends at a HITL boundary (client-executed tool, approval gate, or awaiting children). The SSR snapshot is fully populated at that point — partial content from the in-flight assistant turn is included so the page can render immediately on refresh — and the stream is closed cleanly. This replaces the v6 model where the stream stayed open and the in-memory runtime kept the connection alive across the human pause.

useResumableChat continues to work unchanged with this model: the hook detects 'paused' and reattaches when the run resumes via the chat handler's prepareHelixChatRequest builder.

Snapshots

A snapshot combines the current state with the stream position:

typescript
interface Snapshot {
  messages: UIMessage[]; // Conversation history
  state: AgentState; // Agent's custom state
  streamSequence: number; // Position to resume from
  status: 'active' | 'paused' | 'ended' | 'failed';
  timestamp: number;
}

Snapshots solve the "sequence-last" problem: by loading state first and getting the sequence last, you guarantee no events are missed.

Content Replay

When a user refreshes mid-stream, they've already seen some content that hasn't been saved to message history yet (because the step isn't complete). Content replay reconstructs this partial content and replays it as stream events, ensuring a seamless experience.
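The replay step can be pictured as turning the reconstructed text back into the events a live stream would have produced, so the UI renders it through the same path. A minimal sketch; the helper and event shapes follow this guide's chunk types but are illustrative, not a Helix API:

```typescript
// Turn reconstructed partial text back into stream events so the client
// renders it through the same code path as live chunks. Illustrative only.
type ReplayEvent = { type: 'text_start' | 'text_delta'; delta?: string };

function replayPartialText(partialText: string): ReplayEvent[] {
  if (partialText.length === 0) return []; // nothing seen yet, nothing to replay
  return [{ type: 'text_start' }, { type: 'text_delta', delta: partialText }];
}
```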

Approaches to Resumability

1. SSE Event IDs (Browser Native)

The simplest approach uses Server-Sent Events' built-in reconnection:

typescript
// Server: Include event ID in SSE stream
for await (const { chunk, sequence } of reader) {
  yield `id: ${sequence}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

When the connection drops, the browser automatically reconnects with the Last-Event-ID header, and the server can resume from that position.
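The id: field in each frame is what the browser later echoes back as Last-Event-ID. The wire format can be sketched with a tiny formatter plus a parser like the one EventSource applies internally; both helpers are illustrative, not part of Helix:

```typescript
// Build and parse SSE frames carrying a sequence number in the id: field.
// The browser's EventSource does this parsing for you and re-sends the
// last seen id as the Last-Event-ID request header on reconnect.
function formatSSEFrame(sequence: number, chunk: unknown): string {
  return `id: ${sequence}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

function parseSSEFrame(frame: string): { id: number; data: unknown } {
  const idMatch = frame.match(/^id: (\d+)$/m);
  const dataMatch = frame.match(/^data: (.*)$/m);
  if (!idMatch || !dataMatch) throw new Error('Malformed SSE frame');
  return { id: parseInt(idMatch[1], 10), data: JSON.parse(dataMatch[1]) };
}
```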

Pros:

  • Zero client code needed
  • Browser handles reconnection automatically

Cons:

  • Only works for connection drops, not page refreshes
  • No state synchronization
  • Can miss events if server restarts

2. Snapshot-Based Resumption

The snapshot approach loads complete state and stream position atomically:

typescript
// Step 1: Get snapshot (state + stream position)
const snapshot = await buildSnapshot(deps, { sessionId });

// Step 2: Initialize UI with snapshot data
const { messages } = useChat({
  initialMessages: snapshot.messages,
  resume: snapshot.status === 'active',
});

// Step 3: Resume stream from snapshot position
// (happens automatically if resume: true)

Pros:

  • Works across page refreshes
  • Handles server restarts
  • State always synchronized
  • Deterministically correct

Cons:

  • Requires snapshot endpoint
  • Slightly more complex setup

3. Content Replay (Mid-Stream Refresh)

Content replay handles the specific case where a user refreshes mid-message:

typescript
// Without content replay:
// - initialMessages has partial text from stream chunks
// - Stream resume sends text-start + text-delta
// - Result: DUPLICATE text in UI

// With content replay (opt-in):
// - initialMessages excludes partial content
// - Stream replays partial content as events first
// - Then continues with live events
// - Result: Seamless, no duplicates

When to use: Enable for production apps. It defaults to false for backward compatibility, but is recommended for any app with page refresh support.

Comparison

| Approach | Page Refresh | Server Restart | State Sync | Complexity |
| --- | --- | --- | --- | --- |
| SSE Event IDs | No | No | No | Low |
| Snapshot-based | Yes | Yes | Yes | Medium |
| + Content Replay | Yes | Yes | Yes | Medium |

Recommendation: Use snapshot-based resumption with content replay enabled.

Core SDK Usage

If you're not using the AI SDK integration, you can implement resumable streams directly with the core primitives.

Creating a Resumable Reader

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager(redis);

// Get stream info first
const info = await streamManager.getStreamInfo(streamId);
if (!info || info.status === 'failed') {
  throw new Error('Stream not available');
}

// Create resumable reader (from a specific position)
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: lastKnownSequence, // Resume from here (exclusive)
});

if (reader) {
  console.log(`Resuming from sequence ${reader.currentSequence}`);
  console.log(`Stream has ${reader.totalChunks} total chunks`);

  for await (const chunk of reader) {
    // Process chunk
    console.log(`Chunk at sequence ${reader.currentSequence}`);
  }
}

Building a Custom Resume Endpoint

typescript
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';

const app = new Hono();

// GET /stream/:sessionId - Stream with resumption support
app.get('/stream/:sessionId', async (c) => {
  const sessionId = c.req.param('sessionId');

  // Check for resume position from headers
  const lastEventId = c.req.header('Last-Event-ID');
  const resumeFrom = lastEventId ? parseInt(lastEventId, 10) : 0;

  // Get stream info
  const info = await streamManager.getStreamInfo(sessionId);
  if (!info) {
    return c.json({ error: 'Stream not found' }, 404);
  }

  if (info.status === 'failed') {
    return c.json({ error: 'Stream failed' }, 410);
  }

  // Create reader (resumable if position provided)
  const reader =
    resumeFrom > 0
      ? await streamManager.createResumableReader(sessionId, { fromSequence: resumeFrom })
      : await streamManager.createReader(sessionId);

  if (!reader) {
    return c.json({ error: 'Cannot read stream' }, 500);
  }

  // Stream SSE response
  return streamSSE(c, async (stream) => {
    let sequence = resumeFrom;

    for await (const chunk of reader) {
      sequence++;
      await stream.writeSSE({
        id: String(sequence),
        data: JSON.stringify(chunk),
      });
    }

    // Send end event
    await stream.writeSSE({
      data: JSON.stringify({ type: 'end' }),
    });
  });
});

// GET /stream/:sessionId/info - Get stream metadata
app.get('/stream/:sessionId/info', async (c) => {
  const sessionId = c.req.param('sessionId');
  const info = await streamManager.getStreamInfo(sessionId);

  if (!info) {
    return c.json({ error: 'Stream not found' }, 404);
  }

  return c.json({
    status: info.status,
    totalChunks: info.totalChunks,
    latestSequence: info.latestSequence,
  });
});

Reconstructing Partial Content

For custom UIs that need to show partial content on refresh:

typescript
// Get all chunks from the current step
const stepCount = checkpoint?.stepCount ?? 0;
const fromStep = stepCount + 1;

// Use optimized method if available
const chunks = streamManager.getChunksFromStep
  ? await streamManager.getChunksFromStep(sessionId, fromStep)
  : await streamManager.getAllChunks(sessionId);

// Reconstruct partial content from chunks
let partialText = '';
let partialReasoning = '';
const toolCalls = new Map();

for (const chunk of chunks) {
  switch (chunk.type) {
    case 'text_delta':
      partialText += chunk.delta;
      break;
    case 'thinking':
      partialReasoning += chunk.content;
      break;
    case 'tool_start':
      toolCalls.set(chunk.toolCallId, {
        name: chunk.toolName,
        input: chunk.arguments,
        state: 'executing',
      });
      break;
    case 'tool_end': {
      const tool = toolCalls.get(chunk.toolCallId);
      if (tool) {
        tool.state = chunk.success ? 'completed' : 'error';
        tool.output = chunk.result;
      }
      break;
    }
  }
}

// Use partialText, partialReasoning, toolCalls in your UI

AI SDK Usage

The @helix-agents/ai-sdk package provides higher-level abstractions that handle most resumability concerns automatically.

Quick Start (v7)

The v7+ surface uses two pieces:

typescript
import {
  handleChatStream,
  buildSnapshot,
  type HandleChatStreamParams,
} from '@helix-agents/ai-sdk';

// Shared deps for chat dispatch + snapshot.
const deps = {
  executor,
  stateStore,
  streamManager,
  agent: MyAgent,
  contentReplayEnabled: true, // Content replay opt-in
} as const;

// Drive every chat POST/GET through the seven-path orchestrator.
export const dispatchChat = (params: HandleChatStreamParams) => handleChatStream(deps, params);

// Get snapshot for initializing UI
const snapshot = await buildSnapshot(deps, { sessionId });

// snapshot contains:
// - messages: UIMessage[] (for useChat({ messages }))
// - state: AgentState (custom state)
// - streamSequence: number (resume position)
// - status: 'active' | 'paused' | 'ended' | 'failed'
// - checkpointId: string | null

Client Transport (v7)

HelixChatTransport was deleted in v7

v6's HelixChatTransport class is gone; calling new HelixChatTransport(...) now throws a TypeError. Migrate to AI SDK v6's DefaultChatTransport plus the two preparers from @helix-agents/ai-sdk/client:

  • prepareHelixChatRequest — packs AI SDK fields and the X-Resume-From-Sequence / X-Existing-Message-Id headers onto the POST body so the chat handler can reattach to the existing run.
  • prepareHelixReconnectRequest — same headers, but rewrites the AI SDK's reconnect URL (${api}/${chatId}/stream) to the Helix single-path layout. Required for HITL UX — without it, page refresh during a stream silently 404s and tools that hadn't completed stay in pending state forever.

tsx
'use client';

import { DefaultChatTransport } from 'ai';
import {
  prepareHelixChatRequest,
  prepareHelixReconnectRequest,
} from '@helix-agents/ai-sdk/client';

const helixOptions = {
  api: `/api/chat/${sessionId}`,
  resumeFromSequence: snapshot?.streamSequence,
  existingMessageId: snapshot?.existingMessageId,
};

const transport = new DefaultChatTransport({
  api: `/api/chat/${sessionId}`,
  prepareSendMessagesRequest: prepareHelixChatRequest(helixOptions),
  prepareReconnectToStreamRequest: prepareHelixReconnectRequest(helixOptions),
});

React Integration (v7)

tsx
'use client';

import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';
import {
  prepareHelixChatRequest,
  prepareHelixReconnectRequest,
} from '@helix-agents/ai-sdk/client';
import { useResumeClientTools } from '@helix-agents/ai-sdk/react';
import { useMemo } from 'react';

function Chat({ sessionId, initialSnapshot }) {
  const shouldResume =
    initialSnapshot.status === 'active' || initialSnapshot.status === 'paused';

  // Pull the most recent assistant message id from the snapshot so the
  // resume request continues that message instead of opening a new bubble.
  const existingMessageId = useMemo(() => {
    for (let i = initialSnapshot.messages.length - 1; i >= 0; i--) {
      const m = initialSnapshot.messages[i];
      if (m?.role === 'assistant') return m.id;
    }
    return undefined;
  }, [initialSnapshot.messages]);

  const transport = useMemo(() => {
    const api = `/api/chat/${sessionId}`;
    const helixOptions = {
      api,
      resumeFromSequence: shouldResume ? initialSnapshot.streamSequence : undefined,
      existingMessageId,
    };
    return new DefaultChatTransport({
      api,
      prepareSendMessagesRequest: prepareHelixChatRequest(helixOptions),
      prepareReconnectToStreamRequest: prepareHelixReconnectRequest(helixOptions),
    });
  }, [sessionId, shouldResume, initialSnapshot.streamSequence, existingMessageId]);

  const chat = useChat({
    id: sessionId,
    transport,
    messages: initialSnapshot.messages,
    resume: shouldResume,
  });

  // Auto-dispatch any pending client-executed tools.
  useResumeClientTools({ chat, toolHandlers: { /* ... */ } });

  return <MessageList messages={chat.messages} />;
}

Content Replay Configuration

Content replay is opt-in via contentReplayEnabled on the deps bundle:

typescript
// Disable content replay globally
const deps = {
  executor,
  stateStore,
  streamManager,
  agent: MyAgent,
  contentReplayEnabled: false,
} as const;

// Or override per-snapshot
const snapshot = await buildSnapshot(deps, {
  sessionId,
  includePartialContent: true, // Force include partial content in snapshot
});

For detailed AI SDK documentation, see AI SDK Integration.

StreamManager Implementations

Different StreamManager implementations have different resumability features:

| Feature | InMemory | Redis | Cloudflare DO |
| --- | --- | --- | --- |
| createReader | Yes | Yes | Yes |
| createResumableReader | No | Yes | Yes |
| getStreamInfo | Yes | Yes | Yes |
| getAllChunks | Yes | Yes | Yes |
| getChunksFromStep | Yes | Yes | Yes |
| pauseStream / resumeStream | No | Yes | Yes |
| Multi-process | No | Yes | Yes |
| Persistence | No | Yes | Yes |
| TTL/Expiration | No | Yes | Yes |

InMemoryStreamManager

Best for development and testing. No persistence or multi-process support.

typescript
import { InMemoryStreamManager } from '@helix-agents/store-memory';

const streamManager = new InMemoryStreamManager();

Limitations:

  • No createResumableReader - use getAllChunks + filtering instead
  • Data lost on process restart
  • Single process only
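The fallback mentioned above can be sketched as fetching everything and filtering by sequence. The interfaces here are simplified assumptions for illustration; the helper is not a Helix API:

```typescript
// Fallback for stream managers without createResumableReader: read all
// chunks and keep only those after the resume position (exclusive).
// Interfaces are simplified for illustration.
interface StoredChunk {
  chunk: unknown;
  sequence: number;
}

interface MinimalStreamManager {
  getAllChunks(streamId: string): Promise<StoredChunk[]>;
}

async function readAfterSequence(
  manager: MinimalStreamManager,
  streamId: string,
  fromSequence: number
): Promise<StoredChunk[]> {
  const all = await manager.getAllChunks(streamId);
  return all.filter((entry) => entry.sequence > fromSequence);
}
```

This trades memory for simplicity: every resume re-reads the whole chunk list, which is acceptable for development but is exactly why the Redis and Cloudflare managers offer a real resumable reader.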

RedisStreamManager

Production-ready with full resumability support.

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager(redis, {
  prefix: 'myapp:streams:',
  ttl: 86400, // 24 hours
});

Features:

  • Full createResumableReader with sequence tracking
  • Pub/Sub for real-time delivery
  • Atomic cleanup operations
  • Configurable TTL

DurableObjectStreamManager

For Cloudflare Workers deployments.

typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare/stream';

const streamManager = new DurableObjectStreamManager({
  streamNamespace: env.STREAMS,
});

Features:

  • HTTP-based resumable reader
  • SSE support with buffer overflow protection
  • Global edge deployment

Use Cases

Chat Applications

For chat apps, use snapshot-based resumption with content replay:

typescript
// Server: Next.js App Router
export async function GET(req, { params }) {
  const snapshot = await buildSnapshot(deps, { sessionId: params.sessionId });
  return Response.json(snapshot);
}

// Client: React
const { snapshot, isLoading } = useResumableChat({
  snapshotUrl: `/api/chat/${sessionId}/snapshot`,
  setMessages,
});

Long-Running Tasks

For tasks that take minutes or hours, add progress tracking:

typescript
const agent = defineAgent({
  name: 'researcher',
  tools: [
    defineTool({
      name: 'search',
      execute: async (input, context) => {
        // Emit progress events
        await context.emit('search_progress', {
          query: input.query,
          status: 'searching',
        });

        const results = await search(input.query);

        await context.emit('search_progress', {
          query: input.query,
          status: 'complete',
          resultCount: results.length,
        });

        return results;
      },
    }),
  ],
});

Clients can track custom events and show progress UI even after reconnection.
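Because stored events are replayed on reconnection, the client can rebuild its progress view by folding events into state. A minimal sketch; the event shape mirrors the emits above, while the reducer itself is illustrative, not a Helix API:

```typescript
// Fold 'search_progress' events into per-query status. Later events
// overwrite earlier ones, so replaying the same events after a
// reconnect converges to the same state.
interface ProgressEvent {
  type: string;
  data: { query: string; status: string; resultCount?: number };
}

function reduceProgress(events: ProgressEvent[]): Map<string, string> {
  const statusByQuery = new Map<string, string>();
  for (const event of events) {
    if (event.type === 'search_progress') {
      statusByQuery.set(event.data.query, event.data.status);
    }
  }
  return statusByQuery;
}
```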

Mobile Apps

For mobile apps with unreliable connectivity:

typescript
// Implement aggressive reconnection
let retryCount = 0;
const maxRetries = 10;

async function connectWithRetry(sessionId, fromSequence) {
  while (retryCount < maxRetries) {
    try {
      const response = await fetch(`/stream/${sessionId}`, {
        headers: fromSequence > 0 ? { 'Last-Event-ID': String(fromSequence) } : {},
      });

      if (response.ok) {
        retryCount = 0; // Reset on success
        return response;
      }
    } catch (error) {
      // Network error; fall through to backoff below
    }

    // Retry both network errors and non-OK responses with exponential backoff
    retryCount++;
    await sleep(Math.min(1000 * Math.pow(2, retryCount), 30000));
  }

  throw new Error('Max retries exceeded');
}

Server-Side Rendering

For SSR with Next.js:

typescript
// Server Component
export default async function ChatPage({ params }) {
  const snapshot = await buildSnapshot(deps, { sessionId: params.sessionId });

  if (!snapshot) {
    notFound();
  }

  // Server renders with messages immediately
  return (
    <div>
      <ChatClient
        sessionId={params.sessionId}
        initialSnapshot={snapshot}
      />
    </div>
  );
}

The page is server-rendered with all messages, then hydrates on the client and resumes streaming if active.

Multi-Turn Conversations

When users send follow-up messages in the same session, special handling is needed to prevent content duplication.

The Problem

In a multi-turn conversation:

  1. User sends message A → Agent responds with chunks 1-50
  2. User sends message B → Agent responds with chunks 51-100
  3. User refreshes during message B response

Without filtering, the client would replay chunks 1-100, showing both responses in the current message.
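The scenario above can be made concrete with a small filter over chunk sequences, assuming startSequence = 50 was captured before message B's run started. The helper is illustrative, not a Helix API:

```typescript
// Keep only the current turn's chunks. With chunks 1-50 belonging to
// message A's response and 51-100 to message B's, startSequence = 50
// selects exactly the current turn. Illustrative helper only.
function currentTurnSequences(sequences: number[], startSequence: number): number[] {
  return sequences.filter((seq) => seq > startSequence);
}

const allSequences = Array.from({ length: 100 }, (_, i) => i + 1); // 1..100
const turnB = currentTurnSequences(allSequences, 50); // 51..100 only
```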

Sequence Concepts

Two sequence numbers control multi-turn filtering:

| Sequence | Description | Use |
| --- | --- | --- |
| startSequence | Position when the current run started | Filter for the current turn |
| streamSequence | Latest chunk sequence globally | Resume position |

typescript
// Before starting follow-up execution
const snapshot = await buildSnapshot(deps, { sessionId });
const startSequence = snapshot.streamSequence; // e.g., 50

// Start new execution
const handle = await executor.execute(agent, 'Follow-up message', { sessionId });

// New chunks will be 51, 52, 53...
// To show only the current turn, filter: sequence > startSequence

Implementing Multi-Turn Filtering

Server-side (recommended):

The run metadata tracks startSequence:

typescript
const run = await stateStore.getCurrentRun(sessionId);
const startSequence = run?.startSequence ?? 0;

// Filter chunks to current run only
const chunks = await streamManager.getChunksFromSequence(sessionId, startSequence);

Client-side with prepareHelixChatRequest:

The preparer handles this automatically when resumeFromSequence is set correctly:

typescript
import { DefaultChatTransport } from 'ai';
import {
  prepareHelixChatRequest,
  prepareHelixReconnectRequest,
} from '@helix-agents/ai-sdk/client';

// On follow-up message, get current sequence BEFORE execution
const preExecSnapshot = await fetch(`/api/chat/${sessionId}/snapshot`).then((r) => r.json());
const startSequence = preExecSnapshot.streamSequence;

const helixOptions = {
  api: `/api/chat/${sessionId}`,
  resumeFromSequence: startSequence,
};

const transport = new DefaultChatTransport({
  api: `/api/chat/${sessionId}`,
  prepareSendMessagesRequest: prepareHelixChatRequest(helixOptions),
  prepareReconnectToStreamRequest: prepareHelixReconnectRequest(helixOptions),
});

Content Replay with Multi-Turn

Content replay works per-turn. When buildSnapshot() is called:

  1. It identifies the current run's startSequence
  2. Reconstructs partial content only from chunks with sequence > startSequence
  3. Returns messages excluding the current turn's partial content
  4. On resume, replays only the current turn's partial content

typescript
// Snapshot automatically handles multi-turn
const snapshot = await buildSnapshot(deps, { sessionId });

// snapshot.messages: All completed messages + previous turns
// snapshot.streamSequence: Resume position
// snapshot.startSequence: Current turn start (used internally for filtering)

Follow-Up Message Pattern

Complete pattern for follow-up messages:

typescript
async function sendFollowUp(sessionId: string, message: string) {
  // 1. Get current sequence before execution
  const preSnapshot = await buildSnapshot(deps, { sessionId });
  const startSequence = preSnapshot.streamSequence;

  // 2. Start execution (will create new run with startSequence set)
  const response = await fetch(`/api/chat/${sessionId}`, {
    method: 'POST',
    body: JSON.stringify({ message }),
  });

  // 3. If user refreshes, resume from startSequence
  // The new run's chunks will be >= startSequence
  // Content replay will only include current turn's partial content

  return response;
}

Common Multi-Turn Issues

Duplicate content on refresh:

  • Cause: Resuming from 0 instead of startSequence
  • Fix: Always pass startSequence to resumeFromSequence

Old messages appearing in stream:

  • Cause: Chunks from previous runs being replayed
  • Fix: Filter by startSequence before streaming

Empty response after follow-up:

  • Cause: Using wrong sequence (too high)
  • Fix: Capture sequence BEFORE starting execution

Best Practices

1. Use the Sequence-Last Pattern

Always get the stream sequence after loading state:

typescript
// CORRECT: State first, sequence last
const state = await stateStore.load(sessionId);
const messages = convertToUIMessages(state.messages);
const info = await streamManager.getStreamInfo(sessionId);
const sequence = info?.latestSequence ?? 0;

// WRONG: Sequence first, state second
const info = await streamManager.getStreamInfo(sessionId); // Gets sequence
const state = await stateStore.load(sessionId); // State may have changed!
// Events between these calls could be missed

This ensures any events that occur during loading are captured when resuming.

2. Handle Deduplication

When resuming, you may receive some duplicate events. Handle gracefully:

typescript
const seenSequences = new Set();

for await (const { chunk, sequence } of reader) {
  if (seenSequences.has(sequence)) {
    continue; // Skip duplicates
  }
  seenSequences.add(sequence);

  processChunk(chunk);
}

3. Configure Appropriate TTLs

Set TTLs based on your use case:

typescript
// Short-lived chats (1 hour)
const streamManager = new RedisStreamManager(redis, { ttl: 3600 });

// Long-running research tasks (7 days)
const streamManager = new RedisStreamManager(redis, { ttl: 86400 * 7 });

4. Clean Up Completed Streams

Don't leave completed streams indefinitely:

typescript
// After getting final result
const result = await handle.result();

// Optionally clean up if no longer needed
if (!needToKeepHistory) {
  await stateStore.delete(sessionId);
}

Troubleshooting

Events Lost on Page Refresh

Symptom: Some text disappears when refreshing mid-stream.

Cause: Content replay may be disabled or not working.

Solution:

  1. Ensure contentReplayEnabled: true is set on the deps bundle (it defaults to false)
  2. Check that buildSnapshot() is being called
  3. Verify the stream status is 'active'

Duplicate Content After Resume

Symptom: Same text appears twice after reconnection.

Cause: Content replay is disabled but partial content is in initialMessages.

Solution:

  1. Enable content replay, OR
  2. Call buildSnapshot() with includePartialContent: false

Stream Shows as Failed

Symptom: getStreamInfo() returns status: 'failed'.

Cause: The stream was explicitly failed or encountered an error.

Solution:

  1. Check info.error for the error message
  2. Check server logs for the failure reason
  3. The client should show an error state, not attempt to resume

createResumableReader Returns Null

Symptom: createResumableReader() returns null.

Cause: Stream doesn't exist or has failed status.

Solution:

  1. Check getStreamInfo() first
  2. Use createReader() as fallback for active streams
  3. Handle the null case gracefully in your code

High Memory Usage with Long Streams

Symptom: Server memory grows with long-running streams.

Cause: Chunks accumulating in memory.

Solution:

  1. Use Redis or Cloudflare instead of in-memory
  2. Set appropriate maxLength limits
  3. Implement periodic cleanup of old chunks
