Cloudflare Storage

The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage using Cloudflare's D1 database and Durable Objects. It is designed for edge deployment on the Cloudflare Workers runtime.

When to Use

Good fit:

  • Cloudflare Workers deployments
  • Global edge distribution
  • Serverless architecture
  • Existing Cloudflare infrastructure

Not ideal for:

  • Non-Cloudflare environments
  • High write throughput (D1 limitations)
  • Complex queries (limited SQL support)

Installation

bash
npm install @helix-agents/store-cloudflare

Prerequisites

You need:

  • Cloudflare account with Workers Paid plan
  • D1 database created
  • Durable Object namespace configured

D1StateStore

Setup

Configure D1 in wrangler.toml:

toml
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

Migrations

Create the required tables:

sql
-- migrations/0001_create_tables.sql
CREATE TABLE IF NOT EXISTS agent_state (
  run_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  parent_agent_id TEXT,
  status TEXT NOT NULL DEFAULT 'running',
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,  -- JSON
  output TEXT,        -- JSON
  error TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  run_id TEXT NOT NULL,
  role TEXT NOT NULL,
  content TEXT,
  tool_calls TEXT,     -- JSON
  tool_call_id TEXT,
  tool_name TEXT,
  thinking TEXT,       -- JSON
  sequence INTEGER NOT NULL,
  created_at INTEGER NOT NULL,
  FOREIGN KEY (run_id) REFERENCES agent_state(run_id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_messages_run_id ON messages(run_id);
CREATE INDEX IF NOT EXISTS idx_messages_sequence ON messages(run_id, sequence);

CREATE TABLE IF NOT EXISTS sub_agent_refs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  parent_run_id TEXT NOT NULL,
  sub_agent_run_id TEXT NOT NULL,
  agent_type TEXT NOT NULL,
  parent_tool_call_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'running',
  started_at INTEGER NOT NULL,
  completed_at INTEGER,
  FOREIGN KEY (parent_run_id) REFERENCES agent_state(run_id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_sub_agent_refs_parent ON sub_agent_refs(parent_run_id);

Apply migrations:

bash
wrangler d1 migrations apply agent-state
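
The JSON and boolean columns in agent_state need converting when rows are read back. The following is a minimal sketch of that mapping, not the package's actual implementation: `AgentStateRow`, `AgentStateRecord`, `parseJsonColumn`, and `rowToRecord` are hypothetical names chosen to mirror the columns above.

```typescript
// Hypothetical row shape mirroring a subset of the agent_state columns.
interface AgentStateRow {
  run_id: string;
  agent_type: string;
  stream_id: string;
  status: string;
  step_count: number;
  custom_state: string | null; // JSON text
  output: string | null; // JSON text
  aborted: number; // 0 or 1, since SQLite has no boolean type
}

// Hypothetical in-memory shape; the real store's state type may differ.
interface AgentStateRecord {
  runId: string;
  agentType: string;
  streamId: string;
  status: string;
  stepCount: number;
  customState: unknown;
  output: unknown;
  aborted: boolean;
}

// NULL columns become undefined; anything else is parsed as JSON.
function parseJsonColumn(value: string | null): unknown {
  return value === null ? undefined : JSON.parse(value);
}

function rowToRecord(row: AgentStateRow): AgentStateRecord {
  return {
    runId: row.run_id,
    agentType: row.agent_type,
    streamId: row.stream_id,
    status: row.status,
    stepCount: row.step_count,
    customState: parseJsonColumn(row.custom_state),
    output: parseJsonColumn(row.output),
    aborted: row.aborted === 1,
  };
}
```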

Usage

typescript
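import { D1StateStore } from '@helix-agents/store-cloudflare';

interface Env {
  DB: D1Database; // binding name from wrangler.toml
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const stateStore = new D1StateStore(env.DB);

    // Save state (`state` is the agent state object produced by your runtime)
    await stateStore.save(state);

    // Load state
    const loaded = await stateStore.load('run-123');

    // Delete state
    await stateStore.delete('run-123');

    // A fetch handler must always return a Response
    return Response.json({ found: Boolean(loaded) });
  },
};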

Atomic Operations

typescript
// Append messages
await stateStore.appendMessages('run-123', [{ role: 'assistant', content: 'Hello!' }]);

// Update status
await stateStore.updateStatus('run-123', 'completed');

// Increment step count
const newCount = await stateStore.incrementStepCount('run-123');

// Merge custom state
await stateStore.mergeCustomState('run-123', {
  values: { count: 5 },
  arrayReplacements: new Set(),
  warnings: [],
});
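
One way operations like these can be atomic is to express each as a single SQL statement, so concurrent callers never interleave a separate read and write. As an illustration only (this guide does not show the package's internal SQL), an increment against the schema above could look like the sketch below. `StatementLike` and `DatabaseLike` are hypothetical stand-ins for the subset of the D1 client used here, and `incrementStepCountSketch` is an illustrative name.

```typescript
// Hypothetical, narrowed view of a D1 prepared statement.
interface StatementLike {
  bind(...values: unknown[]): StatementLike;
  first<T>(): Promise<T | null>;
}

// Hypothetical, narrowed view of a D1 database binding.
interface DatabaseLike {
  prepare(sql: string): StatementLike;
}

// Increment and read back the counter in one statement via RETURNING.
async function incrementStepCountSketch(
  db: DatabaseLike,
  runId: string,
  now: number = Date.now()
): Promise<number> {
  const row = await db
    .prepare(
      'UPDATE agent_state SET step_count = step_count + 1, updated_at = ? ' +
        'WHERE run_id = ? RETURNING step_count'
    )
    .bind(now, runId)
    .first<{ step_count: number }>();

  if (row === null) {
    throw new Error(`Unknown run: ${runId}`);
  }
  return row.step_count;
}
```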

Programmatic Migrations

For dynamic environments, run the migrations from the Worker itself:

typescript
import { D1StateStore, runMigrations } from '@helix-agents/store-cloudflare';

// Run migrations on the first request each isolate handles (module state is per isolate)
let migrated = false;

export default {
  async fetch(request: Request, env: Env) {
    if (!migrated) {
      await runMigrations(env.DB);
      migrated = true;
    }

    const stateStore = new D1StateStore(env.DB);
    // ...
  },
};
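
The boolean flag above is set only after `runMigrations` resolves, so two requests arriving together in a fresh isolate can both start migrating. One possible way to avoid that, sketched here, is to cache the in-flight promise instead of a flag. `once` is a hypothetical helper, not part of the package.

```typescript
// Wrap an async task so concurrent callers share a single in-flight run.
function once<T>(task: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (pending === undefined) {
      pending = task().catch((error: unknown) => {
        pending = undefined; // allow a retry after a failed attempt
        throw error;
      });
    }
    return pending;
  };
}
```

In a Worker this could be used as `ensureMigrated ??= once(() => runMigrations(env.DB)); await ensureMigrated();`, with `ensureMigrated` declared at module scope.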

DOStreamManager (Durable Objects)

Setup

Configure Durable Objects in wrangler.toml:

toml
[durable_objects]
bindings = [
  { name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]

Durable Object Implementation

typescript
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';

interface StreamState {
  chunks: StreamChunk[];
  status: 'active' | 'ended' | 'failed';
  endedAt?: number;
  error?: string;
}

export class StreamManagerDO extends DurableObject {
  private state: StreamState = {
    chunks: [],
    status: 'active',
  };

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    // Load state from storage
    ctx.blockConcurrencyWhile(async () => {
      const stored = await ctx.storage.get<StreamState>('state');
      if (stored) {
        this.state = stored;
      }
    });
  }

  async write(chunk: StreamChunk): Promise<void> {
    if (this.state.status !== 'active') {
      throw new Error(`Cannot write to ${this.state.status} stream`);
    }

    this.state.chunks.push(chunk);
    await this.ctx.storage.put('state', this.state);

    // Notify WebSocket clients
    for (const ws of this.ctx.getWebSockets()) {
      ws.send(JSON.stringify(chunk));
    }
  }

  async read(fromOffset: number = 0): Promise<StreamChunk[]> {
    return this.state.chunks.slice(fromOffset);
  }

  async end(finalOutput?: unknown): Promise<void> {
    this.state.status = 'ended';
    this.state.endedAt = Date.now();
    await this.ctx.storage.put('state', this.state);

    // Close WebSocket connections
    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1000, 'Stream ended');
    }
  }

  async fail(error: string): Promise<void> {
    this.state.status = 'failed';
    this.state.error = error;
    await this.ctx.storage.put('state', this.state);

    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1011, error);
    }
  }

  async getInfo(): Promise<{ status: string; chunkCount: number }> {
    return {
      status: this.state.status,
      chunkCount: this.state.chunks.length,
    };
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);

      // Send historical chunks
      for (const chunk of this.state.chunks) {
        server.send(JSON.stringify(chunk));
      }

      return new Response(null, { status: 101, webSocket: client });
    }

    return new Response('Expected WebSocket', { status: 400 });
  }
}
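
The class above rewrites the whole `state` object on every `write`, so each write gets more expensive as the stream grows, and a long stream can eventually exceed the size Durable Object storage accepts for a single value. One alternative, sketched here under the assumption that chunks are only ever appended, stores each chunk under its own ordered key. `ChunkStorage` (a narrowed view of the `put` and `list` methods on `ctx.storage`), `chunkKey`, `appendChunk`, and `readChunksFrom` are hypothetical names.

```typescript
// Narrowed, hypothetical view of the Durable Object storage methods used below.
interface ChunkStorage {
  put<T>(key: string, value: T): Promise<void>;
  list<T>(options: { prefix: string; start?: string }): Promise<Map<string, T>>;
}

const CHUNK_PREFIX = 'chunk:';

// Zero-pad the index so lexicographic key order matches numeric order.
function chunkKey(index: number): string {
  return CHUNK_PREFIX + String(index).padStart(10, '0');
}

// Write one chunk under its own key instead of rewriting the whole stream.
async function appendChunk<T>(storage: ChunkStorage, index: number, chunk: T): Promise<void> {
  await storage.put(chunkKey(index), chunk);
}

// Read every chunk at or after fromOffset, in key order.
async function readChunksFrom<T>(storage: ChunkStorage, fromOffset: number): Promise<T[]> {
  const entries = await storage.list<T>({
    prefix: CHUNK_PREFIX,
    start: chunkKey(fromOffset),
  });
  return Array.from(entries.values());
}
```

With this layout the object only needs to keep the next index and the stream status in memory, at the cost of a storage read for each `read` call.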

Stream Manager Wrapper

typescript
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';

export class DOStreamManager implements StreamManager {
  constructor(private readonly binding: DurableObjectNamespace) {}

  async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
    const stub = this.getStub(streamId);

    return {
      write: async (chunk: StreamChunk) => {
        await stub.write(chunk);
      },
      close: async () => {
        // Writer close is a no-op - stream continues
      },
    };
  }

  async createReader(streamId: string): Promise<StreamReader | null> {
    const stub = this.getStub(streamId);
    const info = await stub.getInfo();

    if (info.status === 'failed') {
      return null;
    }

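    let offset = 0;
    let buffer: StreamChunk[] = [];
    let done = false;

    const next = async (): Promise<IteratorResult<StreamChunk>> => {
      while (!done) {
        // Hand out chunks already fetched before calling the Durable Object again
        if (buffer.length > 0) {
          return { done: false, value: buffer.shift()! };
        }

        // Read the status before the chunks: if the stream had already finished,
        // an empty read afterwards means nothing more can arrive
        const currentInfo = await stub.getInfo();
        buffer = await stub.read(offset);
        offset += buffer.length;

        if (buffer.length === 0) {
          if (currentInfo.status !== 'active') {
            // Stream ended (or failed after the reader was created)
            done = true;
          } else {
            // Poll for new chunks (in practice, use WebSocket)
            await new Promise((r) => setTimeout(r, 100));
          }
        }
      }
      return { done: true, value: undefined };
    };

    return {
      [Symbol.asyncIterator]: () => ({ next }),
      close: async () => {
        done = true;
      },
    };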
  }

  async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.end(finalOutput);
  }

  async failStream(streamId: string, error: string): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.fail(error);
  }

  private getStub(streamId: string) {
    const id = this.binding.idFromName(streamId);
    return this.binding.get(id);
  }
}

Export Durable Object

typescript
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';

Complete Example

typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';

export { StreamManagerDO };

export interface Env {
  DB: D1Database;
  STREAM_MANAGER: DurableObjectNamespace;
  AGENT_WORKFLOW: Workflow;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    const stateStore = new D1StateStore(env.DB);
    const streamManager = new DOStreamManager(env.STREAM_MANAGER);

    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore,
      streamManager,
    });

    // POST /execute
    if (url.pathname === '/execute' && request.method === 'POST') {
      const { agentType, message } = await request.json();
      const agent = registry.get(agentType);

      const handle = await executor.execute(agent, message);

      return Response.json({
        runId: handle.runId,
        streamUrl: `/stream/${handle.runId}`,
      });
    }

    // GET /stream/:runId (SSE)
    if (url.pathname.startsWith('/stream/')) {
      const runId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), runId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const stream = await handle.stream();
      if (!stream) {
        return new Response('Stream not available', { status: 404 });
      }

      return new Response(
        new ReadableStream({
          async start(controller) {
            const encoder = new TextEncoder();
            for await (const chunk of stream) {
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /result/:runId
    if (url.pathname.startsWith('/result/')) {
      const runId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), runId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};
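
The SSE handler above sends only `data:` fields, so a client that reconnects starts again from the first chunk. A possible refinement, sketched here, tags each event with its position so a reconnecting `EventSource` can report it through the `Last-Event-ID` header. `formatSseEvent` and `resumeOffset` are hypothetical helpers, and using the chunk index as the event id is an assumption.

```typescript
// Format one Server-Sent Event with an id so clients can resume after a disconnect.
function formatSseEvent(index: number, chunk: unknown): string {
  return `id: ${index}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

// Translate a Last-Event-ID header value into the offset of the next chunk to send.
function resumeOffset(lastEventId: string | null): number {
  if (lastEventId === null) {
    return 0;
  }
  const last = Number.parseInt(lastEventId, 10);
  return Number.isInteger(last) && last >= 0 ? last + 1 : 0;
}
```

The handler would then skip the first `resumeOffset(request.headers.get('Last-Event-ID'))` chunks before it starts enqueueing events.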

D1 Considerations

Write Limits

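D1 has write throughput limits. For high-volume writes, send related statements in a single batch() call, which D1 executes as one transaction:

typescript
// Batch message inserts (runId and baseSequence come from the surrounding run context)
const messages = [msg1, msg2, msg3];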
const stmt = env.DB.prepare(
  'INSERT INTO messages (run_id, role, content, sequence, created_at) VALUES (?, ?, ?, ?, ?)'
);

await env.DB.batch(
  messages.map((msg, i) => stmt.bind(runId, msg.role, msg.content, baseSequence + i, Date.now()))
);
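
If a run produces many messages at once, it may be worth capping how many statements go into one `batch()` call. A minimal sketch of the splitting step follows. `chunked` is a hypothetical helper, and the group size of 50 in the usage comment is an arbitrary illustration, not a documented D1 limit.

```typescript
// Split a list into consecutive groups of at most `size` items.
function chunked<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Usage with bound statements (each group is its own transaction):
// for (const group of chunked(boundStatements, 50)) {
//   await env.DB.batch(group);
// }
```

Note that splitting gives up atomicity across groups: if a later group fails, earlier groups stay committed.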

Read Replicas

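D1 supports read replicas for global reads. Replication has to be enabled on the database, and D1 only routes queries to replicas when they are issued through its Sessions API (env.DB.withSession()); otherwise every query is served by the primary.

typescript
// With read replication enabled and sessions in use, reads can be served by a nearby replica
const state = await stateStore.load(runId);

// Writes always go to the primary
await stateStore.save(state);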
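
For reference, this is roughly what a replica-aware read looks like when querying D1 directly through its Sessions API. It is a sketch, not a description of what `D1StateStore` does internally: `SessionStatement`, `DbSession`, and `SessionCapableDb` are hypothetical narrowed interfaces over the D1 client, and `sessionStart` and `loadStatus` are illustrative names.

```typescript
// Hypothetical, narrowed views of the D1 session objects used below.
interface SessionStatement {
  bind(...values: unknown[]): SessionStatement;
  first<T>(): Promise<T | null>;
}
interface DbSession {
  prepare(sql: string): SessionStatement;
  getBookmark(): string | null;
}
interface SessionCapableDb {
  withSession(constraintOrBookmark?: string): DbSession;
}

// Continue from an earlier bookmark when the caller has one;
// otherwise let any replica answer the first query.
function sessionStart(bookmark: string | null | undefined): string {
  return bookmark ? bookmark : 'first-unconstrained';
}

// Read a run's status and hand back a bookmark the caller can pass to its next request.
async function loadStatus(
  db: SessionCapableDb,
  runId: string,
  bookmark?: string | null
): Promise<{ status: string | null; bookmark: string | null }> {
  const session = db.withSession(sessionStart(bookmark));
  const row = await session
    .prepare('SELECT status FROM agent_state WHERE run_id = ?')
    .bind(runId)
    .first<{ status: string }>();
  return { status: row ? row.status : null, bookmark: session.getBookmark() };
}
```

Passing the returned bookmark back on the next request keeps a client from reading data older than what it has already seen.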

SQLite Limitations

D1 is SQLite-based:

  • No stored procedures
  • Limited concurrent writes
  • JSON stored as TEXT (use JSON1 functions)

Durable Object Considerations

Geographic Pinning

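A Durable Object is created close to where it is first requested and stays pinned to that location:

typescript
// The first get() for an id creates the DO near the calling Worker
const id = env.STREAM_MANAGER.idFromName(streamId);
const stub = env.STREAM_MANAGER.get(id);

// Subsequent access goes to that region
// For global apps, consider DO per-region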
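
When the first request is likely to come from a different place than most later traffic, a location hint can be passed to `get()`. The sketch below maps a continent code (as found on `request.cf.continent`) to a hint. The mapping itself and the `hintForContinent` name are assumptions for illustration, and a hint is a preference rather than a guarantee.

```typescript
// Location hints accepted by Durable Object namespaces.
type LocationHint = 'wnam' | 'enam' | 'sam' | 'weur' | 'eeur' | 'apac' | 'oc' | 'afr' | 'me';

// Illustrative mapping from continent code to hint; adjust to where your users are.
const HINT_BY_CONTINENT = new Map<string, LocationHint>([
  ['NA', 'enam'],
  ['SA', 'sam'],
  ['EU', 'weur'],
  ['AS', 'apac'],
  ['OC', 'oc'],
  ['AF', 'afr'],
]);

// Returns undefined when no hint applies, so the default placement is used.
function hintForContinent(continent: string | undefined): LocationHint | undefined {
  return continent === undefined ? undefined : HINT_BY_CONTINENT.get(continent);
}

// Usage inside a Worker:
// const hint = hintForContinent(request.cf?.continent as string | undefined);
// const stub = env.STREAM_MANAGER.get(id, hint ? { locationHint: hint } : undefined);
```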

Memory Limits

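Each Durable Object has a 128 MB memory limit, and everything kept in this.state counts against it. For large streams, persisting in batches reduces storage writes at the cost of durability:

typescript
// Trade-off: chunks pushed since the last put() are lost if the DO is evicted or restarts
async write(chunk: StreamChunk) {
  this.state.chunks.push(chunk);

  // Persist every N chunks
  if (this.state.chunks.length % 100 === 0) {
    await this.ctx.storage.put('state', this.state);
  }
}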

WebSocket Limits

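A single Durable Object can serve only a bounded number of concurrent WebSocket connections:

typescript
// The Hibernation API (ctx.acceptWebSocket) accepts up to 32,768 connections per DO;
// CPU and memory usually become the limit well before that
// For more, use pub/sub pattern or multiple DOs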
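
One way to spread viewers over multiple DOs is to derive the object name from the stream id plus a shard number. This is a sketch only: `fnv1a` and `shardedName` are hypothetical helpers, the hash is a swapped-in FNV-1a variant over UTF-16 code units, and the writer would then have to fan each chunk out to every shard.

```typescript
// 32-bit FNV-1a style hash over UTF-16 code units; deterministic, not cryptographic.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Pick a stable shard for a client and build the name to pass to idFromName().
function shardedName(streamId: string, clientId: string, shardCount: number): string {
  if (!Number.isInteger(shardCount) || shardCount < 1) {
    throw new RangeError('shardCount must be a positive integer');
  }
  return `${streamId}#${fnv1a(clientId) % shardCount}`;
}

// Usage inside a Worker:
// const id = env.STREAM_MANAGER.idFromName(shardedName(streamId, clientId, 4));
```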

Deployment

Create D1 Database

bash
wrangler d1 create agent-state

Run Migrations

bash
wrangler d1 migrations apply agent-state --remote

Deploy

bash
wrangler deploy

Secrets

bash
wrangler secret put OPENAI_API_KEY
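
A secret set this way shows up as a string property on `env`. A small guard, sketched here with a hypothetical `requireSecret` helper, fails fast when the secret was not configured for the environment being deployed.

```typescript
// Read a secret from the Worker env object, failing loudly when it is absent.
function requireSecret(env: object, name: string): string {
  const value = (env as Record<string, unknown>)[name];
  if (typeof value !== 'string' || value.length === 0) {
    throw new Error(`Missing secret: ${name}`);
  }
  return value;
}

// Usage inside a Worker:
// const apiKey = requireSecret(env, 'OPENAI_API_KEY');
```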

Released under the MIT License.