@helix-agents/store-cloudflare

Cloudflare storage implementations using D1 and Durable Objects.

Installation

bash
npm install @helix-agents/store-cloudflare

createCloudflareStore

Factory function that creates a state store and a stream manager from your D1 and Durable Object bindings.

typescript
import { createCloudflareStore } from '@helix-agents/store-cloudflare';

const { stateStore, streamManager } = createCloudflareStore({
  db: env.AGENT_DB, // D1Database binding
  streams: env.STREAMS, // DurableObjectNamespace binding
});

Options

typescript
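interface CreateCloudflareStoreOptions {
  // Required bindings (as passed in the example above)
  db: D1Database;
  streams: DurableObjectNamespace;

  // State store options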
  stateOptions?: {
    logger?: Logger;
  };

  // Stream manager options
  streamOptions?: {
    bufferSize?: number;
    flushInterval?: number;
    logger?: Logger;
  };
}

D1StateStore

D1-backed state storage.

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore({
  database: env.AGENT_DB,
  logger: console,
});

Methods

typescript
// Save state (runId is inside state object)
await stateStore.save(state);

// Load state
const state = await stateStore.load(runId);

// Check existence
const exists = await stateStore.exists(runId);

// Update status
await stateStore.updateStatus(runId, 'completed');

// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages(runId, {
  offset: 0,
  limit: 50,
});
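
To read an entire message history, the offset, limit, and hasMore fields can drive a simple loop. The following is a minimal sketch, assuming getMessages has the shape shown above and that hasMore is false on the last page. MessageSource and collectMessages are illustrative names, not part of the package.

```typescript
// Structural stand-in for the part of the store used here (assumed shape,
// copied from the getMessages call above; not imported from the package).
interface MessageSource<M> {
  getMessages(
    runId: string,
    page: { offset: number; limit: number },
  ): Promise<{ messages: M[]; hasMore: boolean }>;
}

// Hypothetical helper: walk every page and return all messages in order.
async function collectMessages<M>(
  source: MessageSource<M>,
  runId: string,
  pageSize = 50,
): Promise<M[]> {
  const all: M[] = [];
  let offset = 0;
  for (;;) {
    const { messages, hasMore } = await source.getMessages(runId, {
      offset,
      limit: pageSize,
    });
    all.push(...messages);
    // Stop on the last page, or on an empty page to avoid looping forever.
    if (!hasMore || messages.length === 0) return all;
    offset += messages.length;
  }
}
```

Under these assumptions, a call such as collectMessages(stateStore, runId) would return the full history. For long runs, processing each page as it arrives may be preferable to holding everything in memory.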

Database Schema

sql
CREATE TABLE agent_state (
  run_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'running',
  step_count INTEGER NOT NULL DEFAULT 0,
  custom_state TEXT NOT NULL DEFAULT '{}',
  messages TEXT NOT NULL DEFAULT '[]',
  output TEXT,
  error TEXT,
  stream_id TEXT,
  parent_agent_id TEXT,
  sub_agents TEXT NOT NULL DEFAULT '[]',
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
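
The store reads and writes this table for you, but ad hoc queries against it return raw rows. The sketch below shows one way to decode such a row, assuming that custom_state, messages, and sub_agents hold JSON text (as their '{}' and '[]' defaults suggest) and that aborted is a 0/1 flag. AgentStateRow, DecodedAgentState, and decodeRow are illustrative names, and the library's own mapping may differ.

```typescript
// Raw row as returned by a query, limited to the columns decoded below.
interface AgentStateRow {
  run_id: string;
  status: string;
  step_count: number;
  custom_state: string; // JSON text (assumed)
  messages: string; // JSON text (assumed)
  sub_agents: string; // JSON text (assumed)
  aborted: number | null; // 0 or 1 (assumed)
  abort_reason: string | null;
}

// Hypothetical decoded shape; field names are illustrative only.
interface DecodedAgentState {
  runId: string;
  status: string;
  stepCount: number;
  customState: unknown;
  messages: unknown[];
  subAgents: unknown[];
  aborted: boolean;
  abortReason: string | null;
}

// Parse the JSON columns and turn the integer flag into a boolean.
function decodeRow(row: AgentStateRow): DecodedAgentState {
  return {
    runId: row.run_id,
    status: row.status,
    stepCount: row.step_count,
    customState: JSON.parse(row.custom_state) as unknown,
    messages: JSON.parse(row.messages) as unknown[],
    subAgents: JSON.parse(row.sub_agents) as unknown[],
    aborted: row.aborted === 1,
    abortReason: row.abort_reason,
  };
}
```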

DurableObjectStreamManager

Stream management using Durable Objects.

typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare';

const streamManager = new DurableObjectStreamManager({
  streamNamespace: env.STREAMS,
  bufferSize: 100, // Chunks to buffer before flush
  flushInterval: 100, // Flush interval (ms)
  logger: console,
});

Methods

typescript
// Create writer (implicitly creates stream)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk);
await writer.close();

// Create reader
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
  // Process chunk
}

// End stream
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// Fail stream
await streamManager.failStream(streamId, 'Error message');

// Get stream info
const info = await streamManager.getInfo(streamId);

// Resumable reader (resume from a given sequence number)
const resumableReader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});
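
On the producer side, the writer calls are usually paired with endStream on success and failStream on error. The following is a minimal sketch of that pattern, assuming the method shapes shown above. ChunkWriter, StreamProducerApi, and publish are illustrative names rather than package exports, and closing the writer before calling endStream is an assumption about the expected order.

```typescript
// Structural stand-ins for the calls shown above (assumed shapes, not imports).
interface ChunkWriter<C> {
  write(chunk: C): Promise<void>;
  close(): Promise<void>;
}

interface StreamProducerApi<C> {
  createWriter(streamId: string, agentId: string, agentType: string): Promise<ChunkWriter<C>>;
  endStream(streamId: string, output?: unknown): Promise<void>;
  failStream(streamId: string, message: string): Promise<void>;
}

// Hypothetical helper: publish chunks, then mark the stream ended or failed.
async function publish<C>(
  manager: StreamProducerApi<C>,
  ids: { streamId: string; agentId: string; agentType: string },
  chunks: readonly C[],
): Promise<void> {
  const writer = await manager.createWriter(ids.streamId, ids.agentId, ids.agentType);
  try {
    for (const chunk of chunks) {
      await writer.write(chunk);
    }
    await writer.close();
    await manager.endStream(ids.streamId);
  } catch (error) {
    // Record the failure on the stream, then let the caller see the error too.
    const message = error instanceof Error ? error.message : String(error);
    await manager.failStream(ids.streamId, message);
    throw error;
  }
}
```

Rethrowing after failStream keeps the failure visible to the caller while still letting readers of the stream observe that it ended in an error state.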

Events

typescript
interface BufferOverflowEvent {
  streamId: string;
  droppedCount: number;
  bufferSize: number;
}

streamManager.on('bufferOverflow', (event: BufferOverflowEvent) => {
  console.warn('Buffer overflow:', event);
});
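
Beyond logging, overflow events can be aggregated so that repeated drops on one stream stand out. The sketch below keeps a running total per stream, assuming droppedCount reports the chunks dropped by that single event rather than a cumulative figure. createOverflowTracker is an illustrative name, not a package export.

```typescript
// Same event shape as documented above, repeated so the sketch is self-contained.
interface BufferOverflowEvent {
  streamId: string;
  droppedCount: number;
  bufferSize: number;
}

// Hypothetical helper: keep a running total of dropped chunks per stream.
function createOverflowTracker() {
  const droppedByStream = new Map<string, number>();
  return {
    // Add this event's drops to the stream's total and return the new total.
    record(event: BufferOverflowEvent): number {
      const total = (droppedByStream.get(event.streamId) ?? 0) + event.droppedCount;
      droppedByStream.set(event.streamId, total);
      return total;
    },
    // Total drops seen so far for a stream (0 if none were recorded).
    totalFor(streamId: string): number {
      return droppedByStream.get(streamId) ?? 0;
    },
  };
}
```

A tracker created this way could be wired up with streamManager.on('bufferOverflow', (event) => tracker.record(event)).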

StreamDurableObject

Durable Object class for stream coordination. Must be exported from your worker.

typescript
import { StreamDurableObject } from '@helix-agents/store-cloudflare';

// In worker.ts - re-export the class
export { StreamDurableObject };

Custom Durable Object

Create a custom class with options:

typescript
import { createStreamDurableObjectClass } from '@helix-agents/store-cloudflare';

const CustomStreamDO = createStreamDurableObjectClass({
  maxChunks: 10000,
  chunkTTL: 3600, // 1 hour
});

export { CustomStreamDO as StreamDurableObject };

Migrations

D1 schema migrations.

typescript
import {
  runMigration,
  isMigrated,
  dropAllTables,
  getAgentsMigrationVersion,
  getAgentsTableNames,
  SCHEMA_MIGRATION_V1,
  CURRENT_SCHEMA_VERSION,
  TABLE_NAMES,
} from '@helix-agents/store-cloudflare';

// Run migration
await runMigration(env.AGENT_DB);

// Check if migrated
const migrated = await isMigrated(env.AGENT_DB);

// Get current version
const version = await getAgentsMigrationVersion(env.AGENT_DB);

// Drop all tables (destructive; intended for tests only)
await dropAllTables(env.AGENT_DB);
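
When migrations are triggered from request handling, it can help to check and migrate only once per Worker isolate instead of on every request. The following is a minimal sketch of such a guard. The two functions are injected so the example stays self-contained, their return types are assumptions, and createMigrationGuard is an illustrative name rather than a package export.

```typescript
// In a Worker these would be the isMigrated and runMigration functions
// imported above; the return types here are assumed.
interface MigrationOps<DB> {
  isMigrated(db: DB): Promise<boolean>;
  runMigration(db: DB): Promise<unknown>;
}

// Hypothetical helper: returns a function that migrates at most once
// for as long as the module-scope state survives.
function createMigrationGuard<DB>(ops: MigrationOps<DB>): (db: DB) => Promise<void> {
  let pending: Promise<void> | undefined;
  return (db: DB): Promise<void> => {
    // Reuse the in-flight or completed attempt, if there is one.
    if (pending) return pending;
    const attempt = (async () => {
      if (!(await ops.isMigrated(db))) {
        await ops.runMigration(db);
      }
    })().catch((error: unknown) => {
      pending = undefined; // allow a retry on the next call
      throw error;
    });
    pending = attempt;
    return attempt;
  };
}
```

A guard created at module scope with createMigrationGuard({ isMigrated, runMigration }) could then be awaited at the top of a fetch handler as ensureMigrated(env.AGENT_DB). Concurrent requests share the same promise, so the check runs once rather than per request.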

Migration SQL

typescript
// Get the SQL for manual migration
console.log(SCHEMA_MIGRATION_V1);

Errors

typescript
import {
  D1StateError, // Base D1 error
  StateNotFoundError, // State not found
  SubAgentRefNotFoundError, // Sub-agent reference not found
  StreamConnectionError, // DO connection error
  StreamNotFoundError, // Stream not found
  SequenceConflictError, // Sequence mismatch
} from '@helix-agents/store-cloudflare';

try {
  const state = await stateStore.load(runId);
} catch (error) {
  if (error instanceof StateNotFoundError) {
    // Handle not found
  }
}
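
In an HTTP handler, these error classes can be translated into response status codes with a small lookup. The sketch below is one way to do that. statusFor is an illustrative helper, and the status codes in the usage comment are suggestions rather than anything the package prescribes.

```typescript
// Any class whose instances are Errors.
type ErrorClass = new (...args: any[]) => Error;

// Hypothetical helper: pick an HTTP status for the first matching error class.
function statusFor(
  error: unknown,
  rules: ReadonlyArray<readonly [ErrorClass, number]>,
  fallback = 500,
): number {
  for (const [errorClass, status] of rules) {
    if (error instanceof errorClass) return status;
  }
  return fallback;
}

// Possible usage with the classes imported above (codes are illustrative):
//   const rules = [
//     [StateNotFoundError, 404],
//     [StreamNotFoundError, 404],
//     [SequenceConflictError, 409],
//   ] as const;
//   return new Response(String(error), { status: statusFor(error, rules) });
```

Rules are checked in order, so any base class such as D1StateError should come after the more specific classes it might also match.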

Complete Example

wrangler.toml

toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]

[[d1_databases]]
binding = "AGENT_DB"
database_name = "my-agents-db"
database_id = "xxx-xxx-xxx"

[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamDurableObject"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamDurableObject"]

worker.ts

typescript
import {
  createCloudflareStore,
  StreamDurableObject,
  runMigration,
} from '@helix-agents/store-cloudflare';

// Re-export Durable Object
export { StreamDurableObject };

interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
}

export default {
  async fetch(request: Request, env: Env) {
    // Ensure the schema exists before handling the request. As written this
    // runs on every request; consider a deploy-time or scheduled migration.
    await runMigration(env.AGENT_DB);

    // Create stores
    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    // Use stores...
    const state = await stateStore.load('run-123');

    return Response.json({ state });
  },
};

Migration Script

bash
# Create database
npx wrangler d1 create my-agents-db

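# Run migration (creates tables). Recent Wrangler versions execute against the
# local database by default; add --remote to target the deployed database.
npx wrangler d1 execute my-agents-db --file=./migrations/0001_init.sql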

Re-exported Types

For convenience, core types are re-exported:

typescript
import type {
  StateStore,
  StreamManager,
  StreamWriter,
  StreamReader,
  StreamChunk,
  ResumableStreamReader,
  ResumableReaderOptions,
  StreamInfo,
  ResumableStreamStatus,
  AgentState,
  Message,
  AgentStatus,
  MergeChanges,
} from '@helix-agents/store-cloudflare';

Released under the MIT License.