Cloudflare Storage

The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage for edge deployment. There are two sets of stores depending on which runtime approach you use:

| Approach | State Store | Stream Manager | Package |
| --- | --- | --- | --- |
| Workflows | D1StateStore | DurableObjectStreamManager | @helix-agents/store-cloudflare |
| Durable Objects | DOStateStore | DOStreamManager | @helix-agents/runtime-cloudflare |

Choose Based on Runtime

  • Workflows runtime: Use D1StateStore + DurableObjectStreamManager from @helix-agents/store-cloudflare
  • DO runtime: Use DOStateStore + DOStreamManager from @helix-agents/runtime-cloudflare (built into AgentServer)

When to Use

Good fit:

  • Cloudflare Workers deployments
  • Global edge distribution
  • Serverless architecture
  • Existing Cloudflare infrastructure

Not ideal for:

  • Non-Cloudflare environments
  • High write throughput (D1 write limits apply; Workflows runtime only)
  • Complex queries (limited SQL support)

Installation

bash
npm install @helix-agents/store-cloudflare

Entry Points and Import Patterns

The package provides three entry points to support different deployment contexts:

| Entry Point | Use Case | Contains |
| --- | --- | --- |
| @helix-agents/store-cloudflare | Worker entry points (full package) | Everything |
| @helix-agents/store-cloudflare/d1 | Next.js routes, any Node.js context | D1StateStore, D1UsageStore, migrations |
| @helix-agents/store-cloudflare/stream | Worker entry points only | StreamServer, DurableObjectStreamManager |

Why Split Entry Points?

The /stream entry point imports partyserver, which depends on cloudflare:workers, a Cloudflare-specific module that cannot be imported in:

  • Next.js API routes (even with OpenNext for Cloudflare)
  • Node.js environments
  • Any code that runs during Next.js build

This is because:

  1. Build-time evaluation: Next.js evaluates all imports during build, even for server routes
  2. cloudflare:workers is runtime-only: The cloudflare: module specifier only resolves inside the Cloudflare Workers runtime
  3. Durable Object class definitions vs bindings: You CAN access Durable Objects via env bindings, but you CANNOT import the class definition outside a Worker

Correct Import Patterns

In Next.js API routes (OpenNext):

typescript
// ✅ CORRECT - Use /d1 entry point
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare/d1';

export async function GET(request: Request) {
  const { env } = getCloudflareContext();
  const stateStore = new D1StateStore({ database: env.DATABASE });
  // Access Durable Objects via binding (NOT import)
  const streamStub = env.STREAMS.get(env.STREAMS.idFromName('my-stream'));
}

// ❌ WRONG - Will fail during Next.js build
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { StreamServer } from '@helix-agents/store-cloudflare/stream';

In Cloudflare Worker entry points:

typescript
// ✅ CORRECT - Worker entry points can import everything
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
import { D1StateStore } from '@helix-agents/store-cloudflare/d1';

// Export Durable Object class for Cloudflare to instantiate
export { StreamServer };

export default {
  async fetch(request: Request, env: Env) {
    // Now you can use both D1 and stream functionality
    const stateStore = new D1StateStore({ database: env.DB });
    // Access StreamServer via binding
    const stub = env.STREAMS.get(env.STREAMS.idFromName('stream-id'));
  },
};

Architecture Pattern for Next.js + Cloudflare

When using OpenNext with Cloudflare:

mermaid
graph TB
    subgraph NextJS ["Next.js App (OpenNext)"]
        subgraph API ["API Routes"]
            A1["Import from @helix-agents/store-cloudflare/d1"]
            A2["Query D1 directly for state/messages"]
            A3["Access workers via env.WORKER_NAME service binding"]
        end
    end

    NextJS -->|"Service Binding"| AgentWorker

    subgraph AgentWorker ["Agent Worker (separate worker)"]
        subgraph Entry ["Entry Point (src/index.ts)"]
            E1["Export StreamServer (Durable Object class)"]
            E2["Import from any entry point"]
            E3["Handle agent execution"]
        end
    end
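
To make the service-binding arrow concrete, here is a minimal sketch of a Next.js route that reads D1 directly and forwards execution to the agent worker. The AGENT_WORKER and DATABASE binding names and the worker's /execute route are illustrative; adapt them to your wrangler configuration.

typescript
// app/api/agent/route.ts (OpenNext) - binding names are illustrative
import { getCloudflareContext } from '@opennextjs/cloudflare';
import { D1StateStore } from '@helix-agents/store-cloudflare/d1';

export async function GET(request: Request) {
  const { env } = getCloudflareContext();
  const sessionId = new URL(request.url).searchParams.get('sessionId')!;

  // Read session state directly from D1 - no round trip through the worker
  const stateStore = new D1StateStore({ database: env.DATABASE });
  const state = await stateStore.loadState(sessionId);

  return Response.json({ state });
}

export async function POST(request: Request) {
  const { env } = getCloudflareContext();

  // Forward execution to the agent worker over the service binding
  // (AGENT_WORKER is a hypothetical [[services]] binding in wrangler.toml)
  return env.AGENT_WORKER.fetch(
    new Request('https://agent-worker/execute', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(await request.json()),
    })
  );
}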

Prerequisites

You need:

  • Cloudflare account with Workers Paid plan
  • D1 database created
  • Durable Object namespace configured

D1StateStore

Setup

Configure D1 in wrangler.toml:

toml
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

Migrations

The D1StateStore uses programmatic migrations. Call runMigration() on startup to apply them; the call is idempotent, so it is safe to run on every request:

typescript
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env) {
    // Run migrations on startup (safe to call every request - no-op if already migrated)
    await runMigration(env.DB);

    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  }
};

The framework creates these tables automatically (all prefixed with __agents_):

sql
-- Core state table (session-centric model)
CREATE TABLE IF NOT EXISTS __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  parent_session_id TEXT,
  stream_id TEXT NOT NULL,
  step_count INTEGER NOT NULL DEFAULT 0,
  status TEXT NOT NULL DEFAULT 'active',
  output TEXT,
  error TEXT,
  aborted INTEGER NOT NULL DEFAULT 0,
  abort_reason TEXT,
  custom_state TEXT NOT NULL DEFAULT '{}',
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

-- Messages table (separated for O(1) append)
CREATE TABLE IF NOT EXISTS __agents_messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  session_id TEXT NOT NULL,
  sequence INTEGER NOT NULL,
  message TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  UNIQUE(session_id, sequence)
);

-- Sub-session refs table (session-centric naming)
CREATE TABLE IF NOT EXISTS __agents_sub_session_refs (
  session_id TEXT NOT NULL,
  sub_session_id TEXT NOT NULL,
  agent_type TEXT NOT NULL,
  parent_tool_call_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  started_at INTEGER NOT NULL,
  completed_at INTEGER,
  PRIMARY KEY (session_id, sub_session_id)
);

Additional tables are created for usage tracking, checkpoints, and interrupt flags. See d1-migrations.ts for the full schema.

Usage

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env) {
    const stateStore = new D1StateStore({ database: env.DB });

    // Save state
    await stateStore.saveState(sessionId, sessionState);

    // Load state
    const loaded = await stateStore.loadState('session-123');

    // Delete state
    await stateStore.deleteState('session-123');
  },
};

Atomic Operations

typescript
// Append messages
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);

// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages('session-123', { limit: 100 });

// Update state atomically
await stateStore.saveState('session-123', updatedState);

Message Truncation

For crash recovery scenarios:

typescript
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);

This removes messages beyond the specified count. It is used during crash recovery to bring stored messages back in sync with a checkpoint's messageCount field, as sketched below.
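
A minimal resume sketch, assuming a helper that reads the persisted checkpoint (loadCheckpoint is hypothetical, not part of this package; messageCount is the checkpoint field described above):

typescript
// Hypothetical crash-recovery sketch - loadCheckpoint is illustrative
const checkpoint = await loadCheckpoint('session-123');
if (checkpoint) {
  // Drop any messages appended after the checkpoint was written
  await stateStore.truncateMessages('session-123', checkpoint.messageCount);
}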

Interrupt Flag Operations

D1StateStore implements the InterruptableStateStore interface, enabling soft interruption of running agents. This is required for the Cloudflare Workflows runtime's interrupt functionality.

typescript
import { isInterruptableStateStore } from '@helix-agents/core';

// Check if store supports interrupts
if (isInterruptableStateStore(stateStore)) {
  // Set interrupt flag (called by executor.interrupt())
  await stateStore.setInterruptFlag('session-123', 'user_requested');

  // Check for interrupt (called in workflow loop)
  const flag = await stateStore.checkInterruptFlag('session-123');
  if (flag) {
    console.log(`Interrupted: ${flag.reason}`);
  }

  // Clear flag (called on resume or after handling)
  await stateStore.clearInterruptFlag('session-123');
}

InterruptableStateStore Support

Currently, only D1StateStore implements InterruptableStateStore. The in-memory and Redis state stores do not support interrupt flags. If using the Cloudflare Workflows runtime, you must use D1StateStore for interrupt functionality to work.

The interrupt flags are automatically created by the programmatic migrations in a dedicated table:

sql
CREATE TABLE IF NOT EXISTS __agents_interrupt_flags (
  session_id TEXT PRIMARY KEY,
  interrupted INTEGER NOT NULL DEFAULT 0,
  reason TEXT,
  timestamp INTEGER
);

See Interrupt and Resume for complete documentation on interrupt behavior, including sub-agent interrupt propagation.

Programmatic Migrations

For dynamic environments:

typescript
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';

// Run migrations on first request
let migrated = false;

export default {
  async fetch(request: Request, env: Env) {
    if (!migrated) {
      await runMigration(env.DB);
      migrated = true;
    }

    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  },
};

DOStreamManager (Durable Objects)

Setup

Configure Durable Objects in wrangler.toml:

toml
[durable_objects]
bindings = [
  { name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]

Durable Object Implementation

typescript
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';

interface StreamState {
  chunks: StreamChunk[];
  status: 'active' | 'ended' | 'failed';
  endedAt?: number;
  error?: string;
}

export class StreamManagerDO extends DurableObject {
  private state: StreamState = {
    chunks: [],
    status: 'active',
  };

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    // Load state from storage
    ctx.blockConcurrencyWhile(async () => {
      const stored = await ctx.storage.get<StreamState>('state');
      if (stored) {
        this.state = stored;
      }
    });
  }

  async write(chunk: StreamChunk): Promise<void> {
    if (this.state.status !== 'active') {
      throw new Error(`Cannot write to ${this.state.status} stream`);
    }

    this.state.chunks.push(chunk);
    await this.ctx.storage.put('state', this.state);

    // Notify WebSocket clients
    for (const ws of this.ctx.getWebSockets()) {
      ws.send(JSON.stringify(chunk));
    }
  }

  async read(fromOffset: number = 0): Promise<StreamChunk[]> {
    return this.state.chunks.slice(fromOffset);
  }

  async end(finalOutput?: unknown): Promise<void> {
    this.state.status = 'ended';
    this.state.endedAt = Date.now();
    await this.ctx.storage.put('state', this.state);

    // Close WebSocket connections
    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1000, 'Stream ended');
    }
  }

  async fail(error: string): Promise<void> {
    this.state.status = 'failed';
    this.state.error = error;
    await this.ctx.storage.put('state', this.state);

    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1011, error);
    }
  }

  async getInfo(): Promise<{ status: string; chunkCount: number }> {
    return {
      status: this.state.status,
      chunkCount: this.state.chunks.length,
    };
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);

      // Send historical chunks
      for (const chunk of this.state.chunks) {
        server.send(JSON.stringify(chunk));
      }

      return new Response(null, { status: 101, webSocket: client });
    }

    return new Response('Expected WebSocket', { status: 400 });
  }
}

Stream Manager Wrapper

typescript
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';

export class DOStreamManager implements StreamManager {
  constructor(private readonly binding: DurableObjectNamespace) {}

  async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
    const stub = this.getStub(streamId);

    return {
      write: async (chunk: StreamChunk) => {
        await stub.write(chunk);
      },
      close: async () => {
        // Writer close is a no-op - stream continues
      },
    };
  }

  async createReader(streamId: string): Promise<StreamReader | null> {
    const stub = this.getStub(streamId);
    const info = await stub.getInfo();

    if (info.status === 'failed') {
      return null;
    }

    let offset = 0;
    let done = false;

    // Named function so the polling branch can recurse; `this` inside the
    // iterator object below would refer to DOStreamManager, not the iterator.
    const next = async (): Promise<IteratorResult<StreamChunk>> => {
      if (done) {
        return { done: true, value: undefined };
      }

      const chunks = await stub.read(offset);

      if (chunks.length > 0) {
        // Consume one chunk per call; the rest are returned on later calls
        offset += 1;
        return { done: false, value: chunks[0] };
      }

      // Check if stream ended
      const currentInfo = await stub.getInfo();
      if (currentInfo.status === 'ended') {
        done = true;
        return { done: true, value: undefined };
      }

      // Poll for new chunks (in practice, use WebSocket)
      await new Promise((r) => setTimeout(r, 100));
      return next();
    };

    return {
      [Symbol.asyncIterator]: () => ({ next }),
      close: async () => {},
    };
  }

  async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.end(finalOutput);
  }

  async failStream(streamId: string, error: string): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.fail(error);
  }

  private getStub(streamId: string) {
    const id = this.binding.idFromName(streamId);
    return this.binding.get(id);
  }
}

Export Durable Object

typescript
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';

Complete Example

typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';

export { StreamManagerDO };

export interface Env {
  DB: D1Database;
  STREAM_MANAGER: DurableObjectNamespace;
  AGENT_WORKFLOW: Workflow;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Run migrations on startup
    await runMigration(env.DB);

    const stateStore = new D1StateStore({ database: env.DB });
    const streamManager = new DOStreamManager(env.STREAM_MANAGER);

    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore,
      streamManager,
    });

    // POST /execute
    if (url.pathname === '/execute' && request.method === 'POST') {
      const { agentType, message, sessionId } = (await request.json()) as {
        agentType: string;
        message: string;
        sessionId?: string;
      };
      const agent = registry.get(agentType);

      const handle = await executor.execute(agent, { message }, { sessionId });

      return Response.json({
        sessionId: handle.sessionId,
        streamUrl: `/stream/${handle.sessionId}`,
      });
    }

    // GET /stream/:sessionId (SSE)
    if (url.pathname.startsWith('/stream/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const stream = await handle.stream();
      if (!stream) {
        return new Response('Stream not available', { status: 404 });
      }

      return new Response(
        new ReadableStream({
          async start(controller) {
            const encoder = new TextEncoder();
            for await (const chunk of stream) {
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /result/:sessionId
    if (url.pathname.startsWith('/result/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};

D1 Considerations

Write Limits

D1 has write throughput limits. For high-volume writes:

typescript
// The framework batches message inserts internally via appendMessages().
// For custom writes, batch inserts with D1's batch() API:
const messages = [msg1, msg2, msg3];
const stmt = env.DB.prepare(
  'INSERT INTO __agents_messages (session_id, sequence, message, created_at) VALUES (?, ?, ?, ?)'
);

// baseSequence is the next unused sequence number for this session
await env.DB.batch(
  messages.map((msg, i) => stmt.bind(sessionId, baseSequence + i, JSON.stringify(msg), Date.now()))
);

Read Replicas

D1 supports read replicas for global reads:

typescript
// Reads are automatically routed to nearest replica
const state = await stateStore.loadState(sessionId);

// Writes go to primary
await stateStore.saveState(sessionId, state);

SQLite Limitations

D1 is SQLite-based:

  • No stored procedures
  • Limited concurrent writes
  • JSON stored as TEXT (use JSON1 functions; see the sketch below)
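
Because custom_state is stored as TEXT, filtering on a field inside it goes through SQLite's JSON1 functions. A minimal sketch (the $.phase path into custom_state is illustrative):

typescript
// Query a JSON field stored as TEXT using json_extract()
const row = await env.DB.prepare(
  "SELECT session_id, json_extract(custom_state, '$.phase') AS phase " +
    'FROM __agents_states WHERE session_id = ?'
)
  .bind('session-123')
  .first<{ session_id: string; phase: string | null }>();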

Durable Object Considerations

Geographic Pinning

Durable Objects are pinned to a region:

typescript
// First access pins the DO to nearest region
const stub = env.STREAM_MANAGER.get(id);

// Subsequent access goes to that region
// For global apps, consider DO per-region

Memory Limits

Durable Objects have memory limits:

typescript
// For large streams, avoid rewriting the full state on every chunk
async write(chunk: StreamChunk) {
  this.state.chunks.push(chunk);

  // Persist every N chunks (trade-off: up to N-1 chunks may be lost on eviction)
  if (this.state.chunks.length % 100 === 0) {
    await this.ctx.storage.put('state', this.state);
  }
}

WebSocket Limits

Limited concurrent WebSocket connections:

typescript
// DOs cap concurrent WebSocket connections (~32k with the hibernation API)
// For more, use pub/sub pattern or multiple DOs
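If a single stream outgrows one DO, one option is to shard readers across several stream DOs, as sketched below. The shard count, hash, and STREAM_MANAGER binding are illustrative; the writer side would fan each chunk out to every shard.

typescript
// Sketch: spread WebSocket readers across multiple stream DOs.
const SHARD_COUNT = 4; // illustrative

function shardStubFor(env: Env, streamId: string, clientId: string) {
  // Cheap string hash; a production setup might prefer consistent hashing
  let hash = 0;
  for (const ch of clientId) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  const shard = hash % SHARD_COUNT;
  return env.STREAM_MANAGER.get(env.STREAM_MANAGER.idFromName(`${streamId}:shard-${shard}`));
}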

Deployment

Create D1 Database

bash
wrangler d1 create agent-state

Run Migrations

This applies any wrangler-managed migration files you maintain. The framework's own tables are created programmatically by runMigration() (see Migrations above), so this step may be unnecessary:

bash
wrangler d1 migrations apply agent-state

Deploy

bash
wrangler deploy

Secrets

bash
wrangler secret put OPENAI_API_KEY

DO Runtime Stores

When using the Durable Objects runtime, stores are built into the AgentServer DO:

DOStateStore

Uses the DO's built-in SQLite for state persistence:

typescript
import { DOStateStore } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const stateStore = new DOStateStore({
  sql: this.sql, // PartyServer's sql tagged template
  logger: this.logger,
});

// Same SessionStateStore interface
await stateStore.saveState(sessionId, state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);

Key differences from D1StateStore:

  • Uses synchronous this.sql template literals, not async D1 queries (see the sketch after this list)
  • Single-session-per-DO architecture (no session_id filtering needed)
  • No race conditions (single-threaded DO execution)
  • Tables created automatically on first use
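
A sketch of the tagged-template style inside the AgentServer DO, assuming this.sql returns result rows directly (as the constructor snippet above implies):

typescript
// Inside an AgentServer method - this.sql is the synchronous tagged template
const rows = this.sql`SELECT custom_state FROM state WHERE id = 1`;
const customState = rows[0]?.custom_state ? JSON.parse(rows[0].custom_state) : {};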

DOStateStore Schema v3 (v0.10+)

Breaking Change

DOStateStore schema v3 introduces new columns for crash recovery. Existing Durable Objects with schema v2 will throw SchemaVersionMismatchError on first access after upgrading.

What Changed

Schema v3 adds two columns to the checkpoints table:

| Column | Type | Purpose |
| --- | --- | --- |
| message_count | INTEGER | Exact message count at checkpoint time |
| stream_sequence | INTEGER | Stream position for resumption |

These fields enable reliable crash recovery:

  1. Message truncation: On resume, truncateMessages() uses message_count to remove orphaned messages written after the checkpoint
  2. Stream cleanup: Stream managers use stream_sequence to remove orphaned chunks
  3. Client resumption: Clients can resume streams from the stored sequence position

Why Store These Values?

Previously, these values were derived from state at load time. This caused issues:

  • Race conditions: Derived values could be stale if state changed concurrently
  • Data loss: If messages were appended after checkpoint but before crash, derived count wouldn't match actual stored messages

Now the exact values at checkpoint time are persisted, ensuring crash recovery works correctly.

Migration Steps

For new deployments: No action needed. New DOs automatically use schema v3.

For existing deployments with v2 schema:

  1. Create new DO instances: The safest approach is to use new DO instances for new sessions

    typescript
    // Use a new naming convention or namespace for v3
    const doId = env.AGENTS.idFromName(`v3:session:${sessionId}`);
  2. Or migrate existing DOs: If you need to preserve existing data, you can manually run the migration SQL on each DO (advanced):

    sql
    ALTER TABLE checkpoints ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0;
    ALTER TABLE checkpoints ADD COLUMN stream_sequence INTEGER NOT NULL DEFAULT 0;
    UPDATE schema_version SET version = 3;
  3. Handle the error gracefully: If migration isn't possible, catch SchemaVersionMismatchError:

    typescript
    try {
      const response = await stub.fetch('/start', { ... });
    } catch (error) {
      if (error instanceof Error && error.message.includes('SchemaVersionMismatch')) {
        // Redirect to new DO instance
        const newDoId = env.AGENTS.idFromName(`v3:${originalName}`);
        // ...
      }
    }

DOStreamManager

Streams directly to connected WebSocket/SSE clients:

typescript
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const streamManager = new DOStreamManager({
  sql: this.sql,
  getConnections: () => this.getConnections(),
  broadcast: (msg) => this.broadcast(msg),
  sseConnections: this.sseConnections,
  logger: this.logger,
});

// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!

Key differences from DurableObjectStreamManager:

  • Writes directly to WebSocket/SSE connections (no subrequest cost)
  • Also persists to SQLite for resumability
  • Built into AgentServer (not separate DO)
  • Supports hibernation (connections survive DO sleep/wake)

Stream cleanup methods:

typescript
// Clean up chunks beyond a specific step (for crash recovery)
await streamManager.cleanupToStep(streamId, stepCount);

// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream(streamId);

cleanupToStep() removes chunks with step > stepCount and is used during crash recovery. resetStream() clears all chunks and is called by execute() for fresh starts.

DOUsageStore

Tracks token/tool/sub-agent usage using DO's SQLite:

typescript
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const usageStore = new DOUsageStore({
  sql: this.sql,
  sessionId: 'session-123',
  logger: this.logger,
});

// Same UsageStore interface
await usageStore.recordEntry(entry);
const rollup = await usageStore.getRollup(sessionId);
const entries = await usageStore.getEntries(sessionId, { kinds: ['tokens'] });

Key differences from D1UsageStore:

  • Uses synchronous this.sql template literals (not async D1 queries)
  • Single-session-per-DO architecture (sessionId passed at construction)
  • Created automatically by AgentServer
  • Usage accessible via /usage endpoint on the DO

Automatic usage tracking:

AgentServer automatically creates DOUsageStore and passes it to the JSAgentExecutor. After execution, retrieve usage via the handle:

typescript
const handle = await executor.execute(agent, 'Do the task');
await handle.result();

// Get usage rollup
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);

Automatic Schema

DOStateStore creates these tables automatically:

sql
CREATE TABLE state (
  id INTEGER PRIMARY KEY DEFAULT 1,
  session_id TEXT NOT NULL,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,
  output TEXT,
  error TEXT,
  checkpoint_id TEXT,
  ...
);

CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  role TEXT NOT NULL,
  content TEXT,
  tool_calls TEXT,
  ...
);

CREATE TABLE stream_chunks (
  sequence INTEGER PRIMARY KEY,
  chunk TEXT NOT NULL,
  created_at INTEGER NOT NULL
);
