# @helix-agents/store-cloudflare

Cloudflare storage implementations using D1 and Durable Objects.

## Installation

```bash
npm install @helix-agents/store-cloudflare
```

## createCloudflareStore

Factory function for creating stores from bindings.

```typescript
import { createCloudflareStore } from '@helix-agents/store-cloudflare';

const { stateStore, streamManager } = createCloudflareStore({
  db: env.AGENT_DB,     // D1Database binding
  streams: env.STREAMS, // DurableObjectNamespace binding
});
```

### Options
```typescript
interface CreateCloudflareStoreOptions {
  // State store options
  stateOptions?: {
    logger?: Logger;
  };

  // Stream manager options
  streamOptions?: {
    bufferSize?: number;
    flushInterval?: number;
    logger?: Logger;
  };
}
```

## D1StateStore
D1-backed state storage.
```typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore({
  database: env.AGENT_DB,
  logger: console,
});
```

### Methods
```typescript
// Save state
await stateStore.saveState(sessionId, state);

// Load state
const loaded = await stateStore.loadState(sessionId);

// Check existence
const exists = await stateStore.exists(sessionId);

// Update status
await stateStore.updateStatus(sessionId, 'completed');

// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages(sessionId, {
  offset: 0,
  limit: 50,
});
```

### Checkpoint Methods
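Checkpoint listing is cursor-paginated; the sketch below illustrates how to walk every page by following the cursor until it is absent. It uses a hypothetical, synchronous in-memory stand-in (the real store methods are async and the real cursor format is an implementation detail):

```typescript
// Hypothetical stand-in types for the checkpoint listing API.
interface CheckpointMeta {
  checkpointId: string;
  stepCount: number;
}

interface CheckpointPage {
  checkpoints: CheckpointMeta[];
  nextCursor?: string;
}

// In-memory stand-in for listCheckpoints(sessionId, { limit, cursor }).
// The cursor here is just an opaque stringified offset.
class InMemoryCheckpointList {
  constructor(private all: CheckpointMeta[]) {}

  list(limit: number, cursor?: string): CheckpointPage {
    const start = cursor ? Number(cursor) : 0;
    const checkpoints = this.all.slice(start, start + limit);
    const next = start + limit;
    return {
      checkpoints,
      nextCursor: next < this.all.length ? String(next) : undefined,
    };
  }
}

// Drain all pages by following nextCursor until it disappears.
function drainCheckpoints(store: InMemoryCheckpointList, limit: number): CheckpointMeta[] {
  const out: CheckpointMeta[] = [];
  let cursor: string | undefined;
  do {
    const page = store.list(limit, cursor);
    out.push(...page.checkpoints);
    cursor = page.nextCursor;
  } while (cursor !== undefined);
  return out;
}
```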
```typescript
// Get a specific checkpoint
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-session-123-s5-...');

// Get most recent checkpoint
const latest = await stateStore.getLatestCheckpoint('session-123');

// List checkpoints with pagination
const result = await stateStore.listCheckpoints('session-123', {
  limit: 10,
  cursor: 'next-page-cursor',
});
```

### Staging Methods
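To make the stage → promote/discard lifecycle concrete, here is an illustrative in-memory sketch. The real D1StateStore persists staged changes in D1 and also applies JSON patches; this stand-in models only the shallow `mergeChanges` path, and the class and method bodies are assumptions, not the library's implementation:

```typescript
type Json = Record<string, unknown>;

// In-memory sketch of staging: changes accumulate per step and only
// touch live state when promoted.
class InMemoryStaging {
  private state: Json;
  private staged = new Map<string, Json[]>(); // stepId -> staged merges

  constructor(initial: Json) {
    this.state = { ...initial };
  }

  stageChanges(stepId: string, mergeChanges: Json): void {
    const list = this.staged.get(stepId) ?? [];
    list.push(mergeChanges);
    this.staged.set(stepId, list);
  }

  // Commit: fold every staged merge for the step into live state.
  promoteStaging(stepId: string): void {
    for (const merge of this.staged.get(stepId) ?? []) {
      this.state = { ...this.state, ...merge };
    }
    this.staged.delete(stepId);
  }

  // Rollback: drop staged merges without touching live state.
  discardStaging(stepId: string): void {
    this.staged.delete(stepId);
  }

  snapshot(): Json {
    return { ...this.state };
  }
}
```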
```typescript
// Stage changes for a tool call within a step
await stateStore.stageChanges('session-123', 'step-1', {
  toolCallId: 'call-456',
  patches: [{ op: 'add', path: '/notes/-', value: 'New finding' }],
  mergeChanges: { notes: ['New finding'] },
  timestamp: Date.now(),
});

// Commit staged changes
await stateStore.promoteStaging('session-123', 'step-1');

// Rollback staged changes
await stateStore.discardStaging('session-123', 'step-1');
```

### Distributed Coordination
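The compare-and-set semantics can be sketched with a synchronous in-memory stand-in (the real method is async and runs as a single atomic D1 UPDATE; the class and status names below are illustrative):

```typescript
type Status = 'active' | 'running' | 'completed' | 'interrupted' | 'failed';

// In-memory sketch of compareAndSetStatus: the transition only happens
// if the current status is one of the expected values.
class InMemoryStatus {
  private statuses = new Map<string, Status>();

  setStatus(sessionId: string, status: Status): void {
    this.statuses.set(sessionId, status);
  }

  compareAndSetStatus(sessionId: string, expected: Status[], next: Status): boolean {
    const current = this.statuses.get(sessionId);
    if (current === undefined || !expected.includes(current)) return false;
    this.statuses.set(sessionId, next);
    return true;
  }
}
```

The array of expected statuses lets a caller interrupt a session only while it is actually running; a second concurrent attempt then fails cleanly instead of clobbering state.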
```typescript
// Atomic compare-and-set for status
const success = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'],   // Expected statuses (array)
  'interrupted'  // New status
);

// Increment resume count
const count = await stateStore.incrementResumeCount('session-123');
```

### Database Schema
The D1StateStore uses programmatic migrations. See `d1-migrations.ts` for the full schema. Core tables use session-centric naming (all prefixed with `__agents_`):
```sql
CREATE TABLE __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER NOT NULL DEFAULT 0,
  custom_state TEXT NOT NULL DEFAULT '{}',
  output TEXT,
  error TEXT,
  stream_id TEXT,
  parent_session_id TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);
```

## DurableObjectStreamManager
Stream management using Durable Objects.
```typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare';

const streamManager = new DurableObjectStreamManager({
  streamNamespace: env.STREAMS,
  bufferSize: 100,    // Chunks to buffer before flush
  flushInterval: 100, // Flush interval (ms)
  logger: console,
});
```

### Methods
```typescript
// Create writer (implicitly creates stream)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk);
await writer.close();

// Create reader
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
  // Process chunk
}

// End stream
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// Fail stream
await streamManager.failStream(streamId, 'Error message');

// Get stream info
const info = await streamManager.getInfo(streamId);

// Resumable reader, picking up from a known sequence number
const resumableReader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});
```

### Events
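Handling overflow events can be sketched with Node's `EventEmitter` standing in for the stream manager's emitter interface (the emitter, the counter, and the wiring below are illustrative assumptions, not the library's internals):

```typescript
import { EventEmitter } from 'node:events';

// Shape of the event, mirroring the interface documented below.
interface BufferOverflowEvent {
  streamId: string;
  droppedCount: number;
  bufferSize: number;
}

// Stand-in emitter for the stream manager.
const emitter = new EventEmitter();
let totalDropped = 0;

emitter.on('bufferOverflow', (event: BufferOverflowEvent) => {
  // In production this is where you would surface metrics or alerts.
  totalDropped += event.droppedCount;
});

// Simulate an overflow notification.
emitter.emit('bufferOverflow', { streamId: 's-1', droppedCount: 3, bufferSize: 100 });
```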
```typescript
interface BufferOverflowEvent {
  streamId: string;
  droppedCount: number;
  bufferSize: number;
}

streamManager.on('bufferOverflow', (event: BufferOverflowEvent) => {
  console.warn('Buffer overflow:', event);
});
```

## StreamServer
PartyServer-based Durable Object class for stream coordination. Must be exported from your worker.
```typescript
import { StreamServer } from '@helix-agents/store-cloudflare';

// In worker.ts - re-export the class
export { StreamServer };
```

### Custom Logger
To attach a custom logger, derive a class with `withLogger` and export it under the expected name:
```typescript
import { StreamServer } from '@helix-agents/store-cloudflare';

const CustomStreamServer = StreamServer.withLogger(myLogger);
export { CustomStreamServer as StreamServer };
```

## Migrations
D1 schema migrations.
```typescript
import {
  runMigration,
  isMigrated,
  dropAllTables,
  getAgentsMigrationVersion,
  getAgentsTableNames,
  SCHEMA_MIGRATION_V1,
  CURRENT_SCHEMA_VERSION,
  TABLE_NAMES,
} from '@helix-agents/store-cloudflare';

// Run migration
await runMigration(env.AGENT_DB);

// Check if migrated
const migrated = await isMigrated(env.AGENT_DB);

// Get current version
const version = await getAgentsMigrationVersion(env.AGENT_DB);

// Drop all tables (for testing)
await dropAllTables(env.AGENT_DB);
```

### Migration SQL
```typescript
// Get the SQL for manual migration
console.log(SCHEMA_MIGRATION_V1);
```

## Errors
```typescript
import {
  D1StateError,               // Base D1 error
  StateNotFoundError,         // State not found
  SubSessionRefNotFoundError,
  StreamConnectionError,      // DO connection error
  StreamNotFoundError,        // Stream not found
  SequenceConflictError,      // Sequence mismatch
} from '@helix-agents/store-cloudflare';

try {
  const state = await stateStore.loadState(sessionId);
} catch (error) {
  if (error instanceof StateNotFoundError) {
    // Handle not found
  }
}
```

## Complete Example
### wrangler.toml
```toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]

[[d1_databases]]
binding = "AGENT_DB"
database_name = "my-agents-db"
database_id = "xxx-xxx-xxx"

[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamServer"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamServer"]
```

### worker.ts
```typescript
import { createCloudflareStore, StreamServer, runMigration } from '@helix-agents/store-cloudflare';

// Re-export Durable Object
export { StreamServer };

interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
}

export default {
  async fetch(request: Request, env: Env) {
    // Run migration on first request (or use a scheduled task)
    await runMigration(env.AGENT_DB);

    // Create stores
    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    // Use stores...
    const state = await stateStore.loadState('session-123');
    return Response.json({ state });
  },
};
```

### Migration Script
```bash
# Create database
npx wrangler d1 create my-agents-db

# Run migration (creates tables)
npx wrangler d1 execute my-agents-db --file=./migrations/0001_init.sql
```

## D1UsageStore
D1-backed usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics in Cloudflare Workers.
```typescript
import { D1UsageStore } from '@helix-agents/store-cloudflare';

const usageStore = new D1UsageStore({
  database: env.AGENT_DB,
  tableName: 'usage_entries', // Optional, default: 'usage_entries'
});
```

### Basic Usage
Pass it to the executor to enable usage tracking:

```typescript
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();

// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
```

### Methods
#### recordEntry
Record a usage entry (called internally by the framework).
```typescript
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});
```

#### getEntries

Get usage entries for a run with optional filtering.
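The filtering semantics can be illustrated with a plain in-memory function (the real store does this filtering in SQL; the types and function below are a sketch, not the library's API):

```typescript
// Minimal slice of a usage entry, enough to demonstrate the filters.
interface UsageEntry {
  kind: string;
  stepCount: number;
  timestamp: number;
}

interface EntryFilter {
  kinds?: string[];
  stepRange?: { min?: number; max?: number };
  limit?: number;
  offset?: number;
}

// Apply kind, step-range, and pagination filters in order.
function filterEntries(entries: UsageEntry[], filter: EntryFilter = {}): UsageEntry[] {
  let out = entries;
  if (filter.kinds) out = out.filter((e) => filter.kinds!.includes(e.kind));
  if (filter.stepRange) {
    const { min = -Infinity, max = Infinity } = filter.stepRange;
    out = out.filter((e) => e.stepCount >= min && e.stepCount <= max);
  }
  const offset = filter.offset ?? 0;
  const limit = filter.limit ?? out.length;
  return out.slice(offset, offset + limit);
}
```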
```typescript
// All entries
const entries = await usageStore.getEntries('session-123');

// Filter by kind
const tokenEntries = await usageStore.getEntries('session-123', {
  kinds: ['tokens'],
});

// Filter by step range
const midRunEntries = await usageStore.getEntries('session-123', {
  stepRange: { min: 5, max: 10 },
});

// Pagination
const page = await usageStore.getEntries('session-123', {
  limit: 10,
  offset: 20,
});
```

#### getRollup

Get aggregated usage rollup.
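What a rollup computes for the token portion can be sketched as a simple reduction over entries (the field names `prompt`, `completion`, and `total` match the `recordEntry` example; everything else here is illustrative rather than the store's actual aggregation code):

```typescript
// Token slice of a usage entry, as recorded by recordEntry.
interface TokenEntry {
  tokens: { prompt: number; completion: number; total: number };
}

// Sum token counts across all entries into one rollup total.
function rollupTokens(entries: TokenEntry[]) {
  return entries.reduce(
    (acc, e) => ({
      prompt: acc.prompt + e.tokens.prompt,
      completion: acc.completion + e.tokens.completion,
      total: acc.total + e.tokens.total,
    }),
    { prompt: 0, completion: 0, total: 0 },
  );
}
```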
```typescript
// This agent's usage only
const rollup = await usageStore.getRollup('session-123');

// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('session-123', {
  includeSubAgents: true,
});
```

#### exists
Check if usage data exists for a run.
```typescript
const hasUsage = await usageStore.exists('session-123');
```

#### delete
Delete usage data for a run.
```typescript
await usageStore.delete('session-123');
```

#### getEntryCount
Get entry count without fetching entries.
```typescript
const count = await usageStore.getEntryCount('session-123');
```

#### findSessionIds
Find all tracked session IDs with optional filters.
```typescript
// All session IDs
const sessionIds = await usageStore.findSessionIds();

// With filters
const filteredSessionIds = await usageStore.findSessionIds({
  agentType: 'researcher',
  limit: 100,
});
```

#### deleteOldEntries

Delete entries older than a specified age.
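The retention rule behind this is simple: anything with a timestamp older than `now - maxAgeMs` is dropped. A sketch of that rule as a pure function over in-memory entries (the real store deletes rows in SQL; the helper name is an assumption):

```typescript
interface TimestampedEntry {
  timestamp: number;
}

// Keep only entries at or newer than the cutoff (now - maxAgeMs).
function pruneOldEntries<T extends TimestampedEntry>(
  entries: T[],
  maxAgeMs: number,
  now: number = Date.now(),
): T[] {
  const cutoff = now - maxAgeMs;
  return entries.filter((e) => e.timestamp >= cutoff);
}
```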
```typescript
// Delete entries older than 7 days
await usageStore.deleteOldEntries(7 * 24 * 60 * 60 * 1000);
```

### Database Schema
The D1 migration creates this table:
```sql
CREATE TABLE __agents_usage (
  id TEXT PRIMARY KEY,
  session_id TEXT NOT NULL,
  kind TEXT NOT NULL,
  step_count INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  source_type TEXT NOT NULL,
  source_name TEXT NOT NULL,
  data TEXT NOT NULL,
  created_at INTEGER NOT NULL
);

CREATE INDEX idx___agents_usage_session_id ON __agents_usage(session_id);
CREATE INDEX idx___agents_usage_kind ON __agents_usage(kind);
CREATE INDEX idx___agents_usage_timestamp ON __agents_usage(timestamp);
```

### Migration
Run migration to create the usage table:
```typescript
import { runMigration } from '@helix-agents/store-cloudflare';

// Run on worker startup or via scheduled task
await runMigration(env.AGENT_DB);
```

### Complete Example
```typescript
import { createCloudflareStore, D1UsageStore, runMigration } from '@helix-agents/store-cloudflare';

interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
}

export default {
  async fetch(request: Request, env: Env) {
    await runMigration(env.AGENT_DB);

    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });
    const usageStore = new D1UsageStore({ database: env.AGENT_DB });

    // Execute agent with usage tracking
    const handle = await executor.execute(agent, 'Task', { usageStore });
    await handle.result();

    // Get usage data
    const rollup = await handle.getUsageRollup();
    return Response.json({
      tokens: rollup?.tokens.total,
      toolCalls: rollup?.toolStats.totalCalls,
    });
  },

  // Scheduled cleanup
  async scheduled(event: ScheduledEvent, env: Env) {
    const usageStore = new D1UsageStore({ database: env.AGENT_DB });

    // Delete entries older than 30 days
    await usageStore.deleteOldEntries(30 * 24 * 60 * 60 * 1000);
  },
};
```

## DurableObjectLockManager
Distributed lock manager using Durable Objects. Prevents concurrent execution of the same agent across Workers.
```typescript
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';

const lockManager = new DurableObjectLockManager(env.LOCKS, {
  defaultTTLMs: 30000, // Default lock TTL (30s)
});
```

### wrangler.toml Configuration
```toml
[[durable_objects.bindings]]
name = "LOCKS"
class_name = "LockServer"

[[migrations]]
tag = "v2"
new_sqlite_classes = ["LockServer"]
```

### Export the Durable Object
```typescript
import { LockServer } from '@helix-agents/store-cloudflare';

export { LockServer };
```

### Methods
#### acquire

Acquire a lock with a fencing token.
```typescript
const lock = await lockManager.acquire('session-123', {
  ttlMs: 30000,
  owner: 'worker-instance-1',
});

if (lock) {
  console.log('Lock acquired');
  console.log('Fencing token:', lock.fencingToken);

  // Do work...

  // Refresh to extend TTL
  await lock.refresh();

  // Release when done
  await lock.release();
}
```

#### isLocked
Check if a lock is currently held.
```typescript
const locked = await lockManager.isLocked('session-123');
```

### Usage with Executor
```typescript
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';

const lockManager = new DurableObjectLockManager(env.LOCKS);

const executor = new CloudflareAgentExecutor({
  workflowBinding: env.AGENT_WORKFLOW,
  stateStore,
  streamManager,
  lockManager, // Prevents concurrent execution
});
```

### Fencing Tokens

Fencing tokens prevent split-brain writes in distributed scenarios: each successful acquisition hands out a strictly increasing token, and state operations can validate it before accepting a write.
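The mechanism can be sketched with an in-memory stand-in: a holder of a stale token (for example, a worker that paused past its lock TTL while another worker took over) has its writes rejected. The class below is illustrative only; real tokens come from the LockServer Durable Object:

```typescript
// In-memory sketch of fencing-token semantics.
class FencedResource {
  private nextToken = 1;
  private highestSeen = 0;

  // Each acquisition yields a strictly increasing token.
  acquire(): number {
    return this.nextToken++;
  }

  // Accept a write only if its token is at least as new as any token
  // already observed; stale holders are fenced off.
  write(fencingToken: number): boolean {
    if (fencingToken < this.highestSeen) return false;
    this.highestSeen = fencingToken;
    return true;
  }
}
```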
```typescript
const lock = await lockManager.acquire('session-123');

// Operations validate the fencing token
await stateStore.save(state, { fencingToken: lock.fencingToken });
```

## Re-exported Types
For convenience, core types are re-exported:
```typescript
import type {
  StateStore,
  StreamManager,
  StreamWriter,
  StreamReader,
  StreamChunk,
  ResumableStreamReader,
  ResumableReaderOptions,
  StreamInfo,
  ResumableStreamStatus,
  AgentState,
  Message,
  AgentStatus,
  MergeChanges,
  LockManager,
  Lock,
  LockAcquireOptions,
} from '@helix-agents/store-cloudflare';
```