@helix-agents/store-cloudflare

Cloudflare storage implementations using D1 and Durable Objects.

Installation

bash
npm install @helix-agents/store-cloudflare

createCloudflareStore

Factory function for creating stores from bindings.

typescript
import { createCloudflareStore } from '@helix-agents/store-cloudflare';

const { stateStore, streamManager } = createCloudflareStore({
  db: env.AGENT_DB, // D1Database binding
  streams: env.STREAMS, // DurableObjectNamespace binding
});

Options

typescript
interface CreateCloudflareStoreOptions {
  // State store options
  stateOptions?: {
    logger?: Logger;
  };

  // Stream manager options
  streamOptions?: {
    bufferSize?: number;
    flushInterval?: number;
    logger?: Logger;
  };
}

D1StateStore

D1-backed state storage.

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore({
  database: env.AGENT_DB,
  logger: console,
});

Methods

typescript
// Save state
await stateStore.saveState(sessionId, state);

// Load state
const state = await stateStore.loadState(sessionId);

// Check existence
const exists = await stateStore.exists(sessionId);

// Update status
await stateStore.updateStatus(sessionId, 'completed');

// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages(sessionId, {
  offset: 0,
  limit: 50,
});
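
To read a whole conversation rather than a single page, the offset/limit/hasMore contract above can be wrapped in a loop. The following is a minimal sketch: `MessageSource`, `getAllMessages`, and `fakeMessageSource` are hypothetical names introduced here, and the in-memory fake only stands in for a real store so the sketch runs on its own.

```typescript
// Result shape and options mirror the getMessages snippet above;
// the interface name itself is an assumption for illustration.
interface MessagePage<M> {
  messages: M[];
  hasMore: boolean;
}

interface MessageSource<M> {
  getMessages(
    sessionId: string,
    options: { offset: number; limit: number },
  ): Promise<MessagePage<M>>;
}

// Hypothetical helper: walks every page and returns all messages in order.
async function getAllMessages<M>(
  store: MessageSource<M>,
  sessionId: string,
  pageSize = 50,
): Promise<M[]> {
  const all: M[] = [];
  let offset = 0;
  for (;;) {
    const { messages, hasMore } = await store.getMessages(sessionId, {
      offset,
      limit: pageSize,
    });
    all.push(...messages);
    // Stop on the last page, or on an empty page to avoid looping forever.
    if (!hasMore || messages.length === 0) break;
    offset += messages.length;
  }
  return all;
}

// In-memory fake used only to exercise the helper.
function fakeMessageSource(data: string[]): MessageSource<string> {
  return {
    async getMessages(
      _sessionId: string,
      options: { offset: number; limit: number },
    ): Promise<MessagePage<string>> {
      const { offset, limit } = options;
      const messages = data.slice(offset, offset + limit);
      return { messages, hasMore: offset + limit < data.length };
    },
  };
}
```

Because the helper depends only on the method shape, a `D1StateStore` instance should be usable in place of the fake, provided its `getMessages` signature matches the snippet above.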

Checkpoint Methods

typescript
// Get a specific checkpoint
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-session-123-s5-...');

// Get most recent checkpoint
const latest = await stateStore.getLatestCheckpoint('session-123');

// List checkpoints with pagination
const result = await stateStore.listCheckpoints('session-123', {
  limit: 10,
  cursor: 'next-page-cursor',
});

Staging Methods

typescript
// Stage changes for a tool call within a step
await stateStore.stageChanges('session-123', 'step-1', {
  toolCallId: 'call-456',
  patches: [{ op: 'add', path: '/notes/-', value: 'New finding' }],
  mergeChanges: { notes: ['New finding'] },
  timestamp: Date.now(),
});

// Commit staged changes
await stateStore.promoteStaging('session-123', 'step-1');

// Rollback staged changes
await stateStore.discardStaging('session-123', 'step-1');
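
The promote and discard calls are typically paired around a unit of work: commit when the step succeeds, roll back when it throws. A minimal sketch of that pattern follows. `StagingStore`, `runStepWithStaging`, and `recordingStagingStore` are hypothetical names, and the return types of the two store methods are assumed.

```typescript
// Structural stand-in for the two staging calls used here.
interface StagingStore {
  promoteStaging(sessionId: string, stepId: string): Promise<unknown>;
  discardStaging(sessionId: string, stepId: string): Promise<unknown>;
}

// Hypothetical helper: runs a step, then commits its staged changes on
// success, or discards them and rethrows if the step fails.
async function runStepWithStaging<T>(
  store: StagingStore,
  sessionId: string,
  stepId: string,
  step: () => Promise<T>,
): Promise<T> {
  let result: T;
  try {
    result = await step();
  } catch (error) {
    await store.discardStaging(sessionId, stepId);
    throw error;
  }
  await store.promoteStaging(sessionId, stepId);
  return result;
}

// Recording fake used only to show which call is made in each case.
function recordingStagingStore(): StagingStore & { calls: string[] } {
  const calls: string[] = [];
  return {
    calls,
    async promoteStaging(_sessionId: string, stepId: string): Promise<void> {
      calls.push(`promote:${stepId}`);
    },
    async discardStaging(_sessionId: string, stepId: string): Promise<void> {
      calls.push(`discard:${stepId}`);
    },
  };
}
```

The step callback is where `stageChanges` calls would happen; keeping promotion outside the `try` block means a failed promotion is not followed by a discard of the same step.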

Distributed Coordination

typescript
// Atomic compare-and-set for status
const success = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'],    // Expected statuses (array)
  'interrupted'   // New status
);

// Increment resume count
const count = await stateStore.incrementResumeCount('session-123');
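
The semantics of a compare-and-set can be modelled in a few lines. The class below is an in-memory illustration only, not the library's implementation: `InMemoryStatusStore` and its `set`/`get` helpers are hypothetical, and the real store is assumed to perform the check and the write atomically in D1.

```typescript
// In-memory model of compare-and-set on a session status.
class InMemoryStatusStore {
  private statuses = new Map<string, string>();

  set(sessionId: string, status: string): void {
    this.statuses.set(sessionId, status);
  }

  get(sessionId: string): string | undefined {
    return this.statuses.get(sessionId);
  }

  // Writes `next` only if the current status is one of `expected`.
  // Returns true when the write happened, false otherwise.
  async compareAndSetStatus(
    sessionId: string,
    expected: string[],
    next: string,
  ): Promise<boolean> {
    const current = this.statuses.get(sessionId);
    if (current === undefined || !expected.includes(current)) {
      return false;
    }
    this.statuses.set(sessionId, next);
    return true;
  }
}
```

If two workers both try to move a session from `running` to `interrupted`, only the first call returns `true`; the second sees `interrupted` and returns `false`, so the caller knows another worker already made the transition.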

Database Schema

The D1StateStore uses programmatic migrations. See d1-migrations.ts for the full schema. Core tables use session-centric naming (all prefixed with __agents_):

sql
CREATE TABLE __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER NOT NULL DEFAULT 0,
  custom_state TEXT NOT NULL DEFAULT '{}',
  output TEXT,
  error TEXT,
  stream_id TEXT,
  parent_session_id TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

DurableObjectStreamManager

Stream management using Durable Objects.

typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare';

const streamManager = new DurableObjectStreamManager({
  streamNamespace: env.STREAMS,
  bufferSize: 100, // Chunks to buffer before flush
  flushInterval: 100, // Flush interval (ms)
  logger: console,
});

Methods

typescript
// Create writer (implicitly creates stream)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk);
await writer.close();

// Create reader
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
  // Process chunk
}

// End stream
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// Fail stream
await streamManager.failStream(streamId, 'Error message');

// Get stream info
const info = await streamManager.getInfo(streamId);

// Resumable reader (named differently to avoid redeclaring `reader` above)
const resumableReader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});

Events

typescript
interface BufferOverflowEvent {
  streamId: string;
  droppedCount: number;
  bufferSize: number;
}

streamManager.on('bufferOverflow', (event: BufferOverflowEvent) => {
  console.warn('Buffer overflow:', event);
});

StreamServer

PartyServer-based Durable Object class for stream coordination. Must be exported from your worker.

typescript
import { StreamServer } from '@helix-agents/store-cloudflare';

// In worker.ts - re-export the class
export { StreamServer };

Custom Logger

Create a StreamServer class that uses a custom logger:

typescript
import { StreamServer } from '@helix-agents/store-cloudflare';

const CustomStreamServer = StreamServer.withLogger(myLogger);

export { CustomStreamServer as StreamServer };

Migrations

D1 schema migrations.

typescript
import {
  runMigration,
  isMigrated,
  dropAllTables,
  getAgentsMigrationVersion,
  getAgentsTableNames,
  SCHEMA_MIGRATION_V1,
  CURRENT_SCHEMA_VERSION,
  TABLE_NAMES,
} from '@helix-agents/store-cloudflare';

// Run migration
await runMigration(env.AGENT_DB);

// Check if migrated
const migrated = await isMigrated(env.AGENT_DB);

// Get current version
const version = await getAgentsMigrationVersion(env.AGENT_DB);

// Drop all tables (for testing)
await dropAllTables(env.AGENT_DB);

Migration SQL

typescript
// Get the SQL for manual migration
console.log(SCHEMA_MIGRATION_V1);

Errors

typescript
import {
  D1StateError, // Base D1 error
  StateNotFoundError, // State not found
  SubSessionRefNotFoundError,
  StreamConnectionError, // DO connection error
  StreamNotFoundError, // Stream not found
  SequenceConflictError, // Sequence mismatch
} from '@helix-agents/store-cloudflare';

try {
  const state = await stateStore.loadState(sessionId);
} catch (error) {
  if (error instanceof StateNotFoundError) {
    // Handle not found
  }
}
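
When a missing session is an expected case rather than a failure, the `instanceof` check above can be folded into a helper that returns `null`. This is a sketch under stated assumptions: `loadStateOrNull` and `StateLoader` are hypothetical names, and the local `StateNotFoundError` class is a stand-in so the block runs on its own. In a worker, the class would be imported from the package instead.

```typescript
// Local stand-in for the package's StateNotFoundError (assumption for the sketch).
class StateNotFoundError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'StateNotFoundError';
    // Keeps instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

interface StateLoader<S> {
  loadState(sessionId: string): Promise<S>;
}

// Hypothetical helper: maps "not found" to null and rethrows everything else.
async function loadStateOrNull<S>(
  store: StateLoader<S>,
  sessionId: string,
): Promise<S | null> {
  try {
    return await store.loadState(sessionId);
  } catch (error) {
    if (error instanceof StateNotFoundError) {
      return null;
    }
    throw error;
  }
}
```

Rethrowing unknown errors matters here: swallowing every exception would hide connection or schema problems behind a `null` result.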

Complete Example

wrangler.toml

toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]

[[d1_databases]]
binding = "AGENT_DB"
database_name = "my-agents-db"
database_id = "xxx-xxx-xxx"

[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamServer"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamServer"]

worker.ts

typescript
import { createCloudflareStore, StreamServer, runMigration } from '@helix-agents/store-cloudflare';

// Re-export Durable Object
export { StreamServer };

interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
}

export default {
  async fetch(request: Request, env: Env) {
    // Run the migration before using the stores (here on every request; a scheduled task also works)
    await runMigration(env.AGENT_DB);

    // Create stores
    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    // Use stores...
    const state = await stateStore.loadState('session-123');

    return Response.json({ state });
  },
};
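
The worker above calls `runMigration` inside `fetch`, so it runs on each request. One way to limit that to roughly once per isolate is to cache the in-flight promise at module scope. The sketch below is an assumption-laden illustration: `oncePerIsolate` is a hypothetical helper, and it relies on module-level state being reused across requests within an isolate, which Workers does not guarantee.

```typescript
// Hypothetical helper: wraps an async setup function so that concurrent and
// later callers share one run. A failed run clears the cache so the next
// call retries.
function oncePerIsolate<A>(
  run: (arg: A) => Promise<unknown>,
): (arg: A) => Promise<void> {
  let pending: Promise<void> | null = null;
  return (arg: A): Promise<void> => {
    if (pending === null) {
      pending = run(arg).then(
        () => undefined,
        (error: unknown) => {
          pending = null; // allow a retry after a failure
          throw error;
        },
      );
    }
    return pending;
  };
}

// Usage sketch (not executed here):
//   const ensureMigrated = oncePerIsolate(runMigration);
//   ...inside fetch: await ensureMigrated(env.AGENT_DB);
```

The argument passed on later calls is ignored once a run has succeeded, which is acceptable when every request passes the same binding.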

Migration Script

bash
# Create database
npx wrangler d1 create my-agents-db

# Run migration (creates tables)
npx wrangler d1 execute my-agents-db --file=./migrations/0001_init.sql

D1UsageStore

D1-backed usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics in Cloudflare Workers.

typescript
import { D1UsageStore } from '@helix-agents/store-cloudflare';

const usageStore = new D1UsageStore({
  database: env.AGENT_DB,
  tableName: 'usage_entries', // Optional, default: 'usage_entries'
});

Basic Usage

Pass the store to the executor to enable usage tracking:

typescript
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();

// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);

Methods

recordEntry

Record a usage entry (called internally by the framework).

typescript
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});

getEntries

Get usage entries for a session, with optional filtering.

typescript
// All entries
const entries = await usageStore.getEntries('session-123');

// Filter by kind
const tokenEntries = await usageStore.getEntries('session-123', {
  kinds: ['tokens'],
});

// Filter by step range
const midRunEntries = await usageStore.getEntries('session-123', {
  stepRange: { min: 5, max: 10 },
});

// Pagination
const page = await usageStore.getEntries('session-123', {
  limit: 10,
  offset: 20,
});
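
The limit/offset options can be combined into an iterator that fetches pages on demand. This sketch assumes `getEntries` resolves to a plain array, as the snippet above suggests; `EntrySource` and `iterateEntries` are hypothetical names.

```typescript
// Structural stand-in for the paginated call used here.
interface EntrySource<E> {
  getEntries(
    sessionId: string,
    options: { limit: number; offset: number },
  ): Promise<E[]>;
}

// Hypothetical helper: yields entries one by one, fetching a new page only
// when the previous one is exhausted. A short page marks the end.
async function* iterateEntries<E>(
  store: EntrySource<E>,
  sessionId: string,
  pageSize = 100,
): AsyncGenerator<E> {
  if (pageSize < 1) {
    throw new RangeError('pageSize must be at least 1');
  }
  let offset = 0;
  for (;;) {
    const page = await store.getEntries(sessionId, { limit: pageSize, offset });
    for (const entry of page) {
      yield entry;
    }
    if (page.length < pageSize) {
      return;
    }
    offset += page.length;
  }
}
```

A caller would consume it with `for await (const entry of iterateEntries(usageStore, 'session-123')) { ... }` and can stop early without fetching the remaining pages.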

getRollup

Get aggregated usage rollup.

typescript
// This agent's usage only
const rollup = await usageStore.getRollup('session-123');

// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('session-123', {
  includeSubAgents: true,
});
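
To make the idea of a rollup concrete, the token part of the aggregation can be sketched as a plain sum over entries. This is an illustration, not the library's implementation: `sumTokens` and `UsageEntryLike` are hypothetical, the entry shape mirrors the `recordEntry` example, and a real rollup also covers tool and sub-agent statistics.

```typescript
interface TokenCounts {
  prompt: number;
  completion: number;
  total: number;
}

// Only the fields needed for this sketch.
interface UsageEntryLike {
  kind: string;
  tokens?: TokenCounts;
}

// Sums token counts across all entries of kind 'tokens'; other kinds are skipped.
function sumTokens(entries: UsageEntryLike[]): TokenCounts {
  const totals: TokenCounts = { prompt: 0, completion: 0, total: 0 };
  for (const entry of entries) {
    if (entry.kind !== 'tokens' || !entry.tokens) {
      continue;
    }
    totals.prompt += entry.tokens.prompt;
    totals.completion += entry.tokens.completion;
    totals.total += entry.tokens.total;
  }
  return totals;
}
```

For two token entries of 100/50/150 and 10/5/15 plus one non-token entry, the result is 110 prompt, 55 completion, and 165 total tokens.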

exists

Check if usage data exists for a session.

typescript
const hasUsage = await usageStore.exists('session-123');

delete

Delete usage data for a session.

typescript
await usageStore.delete('session-123');

getEntryCount

Get entry count without fetching entries.

typescript
const count = await usageStore.getEntryCount('session-123');

findSessionIds

Find all tracked session IDs with optional filters.

typescript
// All session IDs
const sessionIds = await usageStore.findSessionIds();

// With filters
const filteredSessionIds = await usageStore.findSessionIds({
  agentType: 'researcher',
  limit: 100,
});

deleteOldEntries

Delete entries older than a specified age.

typescript
// Delete entries older than 7 days
await usageStore.deleteOldEntries(7 * 24 * 60 * 60 * 1000);
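
The argument is an age in milliseconds, so a small helper keeps retention periods readable. In this sketch `days` and `isExpired` are hypothetical names, and the cutoff rule (an entry expires when its timestamp is strictly older than now minus the maximum age) is an assumption about the store's behaviour, not something the source confirms.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Converts a number of days to milliseconds.
function days(n: number): number {
  return n * DAY_MS;
}

// Assumed rule: an entry is expired when it is older than `maxAgeMs`
// relative to `now`. The boundary handling (< vs <=) is a guess.
function isExpired(entryTimestamp: number, maxAgeMs: number, now: number): boolean {
  return entryTimestamp < now - maxAgeMs;
}
```

With this helper, the call above could be written as `await usageStore.deleteOldEntries(days(7))`.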

Database Schema

The D1 migration creates this table:

sql
CREATE TABLE __agents_usage (
  id TEXT PRIMARY KEY,
  session_id TEXT NOT NULL,
  kind TEXT NOT NULL,
  step_count INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  source_type TEXT NOT NULL,
  source_name TEXT NOT NULL,
  data TEXT NOT NULL,
  created_at INTEGER NOT NULL
);

CREATE INDEX idx___agents_usage_session_id ON __agents_usage(session_id);
CREATE INDEX idx___agents_usage_kind ON __agents_usage(kind);
CREATE INDEX idx___agents_usage_timestamp ON __agents_usage(timestamp);

Migration

Run the migration to create the usage table:

typescript
import { runMigration } from '@helix-agents/store-cloudflare';

// Run on worker startup or via scheduled task
await runMigration(env.AGENT_DB);

Complete Example

typescript
import { createCloudflareStore, D1UsageStore, runMigration } from '@helix-agents/store-cloudflare';

interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
}

export default {
  async fetch(request: Request, env: Env) {
    await runMigration(env.AGENT_DB);

    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    const usageStore = new D1UsageStore({ database: env.AGENT_DB });

    // Execute agent with usage tracking
    // (`executor` and `agent` are assumed to be defined elsewhere)
    const handle = await executor.execute(agent, 'Task', { usageStore });
    await handle.result();

    // Get usage data
    const rollup = await handle.getUsageRollup();

    return Response.json({
      tokens: rollup?.tokens.total,
      toolCalls: rollup?.toolStats.totalCalls,
    });
  },

  // Scheduled cleanup
  async scheduled(event: ScheduledEvent, env: Env) {
    const usageStore = new D1UsageStore({ database: env.AGENT_DB });
    // Delete entries older than 30 days
    await usageStore.deleteOldEntries(30 * 24 * 60 * 60 * 1000);
  },
};

DurableObjectLockManager

Distributed lock manager using Durable Objects. Prevents concurrent execution of the same agent across Workers.

typescript
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';

const lockManager = new DurableObjectLockManager(env.LOCKS, {
  defaultTTLMs: 30000, // Default lock TTL (30s)
});

wrangler.toml Configuration

toml
[[durable_objects.bindings]]
name = "LOCKS"
class_name = "LockServer"

[[migrations]]
tag = "v2"
new_sqlite_classes = ["LockServer"]

Export the Durable Object

typescript
import { LockServer } from '@helix-agents/store-cloudflare';

export { LockServer };

Methods

acquire

Acquire a lock with a fencing token.

typescript
const lock = await lockManager.acquire('session-123', {
  ttlMs: 30000,
  owner: 'worker-instance-1',
});

if (lock) {
  console.log('Lock acquired');
  console.log('Fencing token:', lock.fencingToken);

  // Do work...

  // Refresh to extend TTL
  await lock.refresh();

  // Release when done
  await lock.release();
}
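
The acquire, work, release sequence above is easy to get wrong when the work throws, so it is often wrapped in a helper that releases in a `finally` block. A minimal sketch follows. `withLock`, `LockLike`, `LockManagerLike`, and `fakeLockManager` are hypothetical names, and the interfaces cover only the calls shown in this section.

```typescript
// Structural stand-ins for the lock and manager calls used here.
interface LockLike {
  release(): Promise<unknown>;
}

interface LockManagerLike {
  acquire(
    key: string,
    options?: { ttlMs?: number; owner?: string },
  ): Promise<LockLike | null>;
}

type LockResult<T> = { acquired: true; value: T } | { acquired: false };

// Hypothetical helper: runs `work` only if the lock is acquired and always
// releases the lock afterwards, even when `work` throws.
async function withLock<T>(
  manager: LockManagerLike,
  key: string,
  work: (lock: LockLike) => Promise<T>,
  options?: { ttlMs?: number; owner?: string },
): Promise<LockResult<T>> {
  const lock = await manager.acquire(key, options);
  if (!lock) {
    return { acquired: false };
  }
  try {
    return { acquired: true, value: await work(lock) };
  } finally {
    await lock.release();
  }
}

// In-memory fake used only to exercise the helper (no TTL handling).
function fakeLockManager(): LockManagerLike {
  const held = new Set<string>();
  return {
    async acquire(key: string): Promise<LockLike | null> {
      if (held.has(key)) return null;
      held.add(key);
      return {
        async release(): Promise<void> {
          held.delete(key);
        },
      };
    },
  };
}
```

Returning a tagged result instead of throwing lets the caller decide whether a held lock means "skip", "retry later", or "fail". For work that may outlive the TTL, the callback would also need to call `lock.refresh()` periodically.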

isLocked

Check if a lock is currently held.

typescript
const locked = await lockManager.isLocked('session-123');

Usage with Executor

typescript
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';

const lockManager = new DurableObjectLockManager(env.LOCKS);

const executor = new CloudflareAgentExecutor({
  workflowBinding: env.AGENT_WORKFLOW,
  stateStore,
  streamManager,
  lockManager, // Prevents concurrent execution
});

Fencing Tokens

Fencing tokens guard against split-brain writes, where a worker whose lock has expired keeps writing after another worker has acquired the lock:

typescript
const lock = await lockManager.acquire('session-123');

if (lock) {
  // Operations validate the fencing token
  await stateStore.save(state, { fencingToken: lock.fencingToken });
}
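
The general technique behind a fencing check can be shown with a toy store that remembers the highest token it has accepted. This is an illustration of the concept, not the library's implementation: `FencedStore` is hypothetical, and tokens are assumed to be numbers that increase with each lock acquisition.

```typescript
// Toy in-memory store that rejects writes carrying a stale fencing token.
class FencedStore<S> {
  private state: S | undefined;
  private highestToken = 0;

  // Accepts the write only if the token is at least as new as the newest
  // token seen so far. Returns false for a stale lock holder.
  save(state: S, fencingToken: number): boolean {
    if (fencingToken < this.highestToken) {
      return false;
    }
    this.highestToken = fencingToken;
    this.state = state;
    return true;
  }

  load(): S | undefined {
    return this.state;
  }
}
```

If worker A holds token 1, pauses past its TTL, and worker B then acquires token 2 and saves, a late write from A with token 1 is refused, so B's state is not overwritten.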

Re-exported Types

For convenience, core types are re-exported:

typescript
import type {
  StateStore,
  StreamManager,
  StreamWriter,
  StreamReader,
  StreamChunk,
  ResumableStreamReader,
  ResumableReaderOptions,
  StreamInfo,
  ResumableStreamStatus,
  AgentState,
  Message,
  AgentStatus,
  MergeChanges,
  LockManager,
  Lock,
  LockAcquireOptions,
} from '@helix-agents/store-cloudflare';

Released under the MIT License.