@helix-agents/store-cloudflare
Cloudflare storage implementations using D1 and Durable Objects.
Installation
bash
npm install @helix-agents/store-cloudflarecreateCloudflareStore
Factory function for creating stores from bindings.
typescript
import { createCloudflareStore } from '@helix-agents/store-cloudflare';
const { stateStore, streamManager } = createCloudflareStore({
db: env.AGENT_DB, // D1Database binding
streams: env.STREAMS, // DurableObjectNamespace binding
});Options
typescript
interface CreateCloudflareStoreOptions {
// State store options
stateOptions?: {
logger?: Logger;
};
// Stream manager options
streamOptions?: {
bufferSize?: number;
flushInterval?: number;
logger?: Logger;
};
}D1StateStore
D1-backed state storage.
typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';
const stateStore = new D1StateStore({
database: env.AGENT_DB,
logger: console,
});Methods
typescript
// Save state (runId is inside state object)
await stateStore.save(state);
// Load state
const state = await stateStore.load(runId);
// Check existence
const exists = await stateStore.exists(runId);
// Update status
await stateStore.updateStatus(runId, 'completed');
// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages(runId, {
offset: 0,
limit: 50,
});Database Schema
sql
CREATE TABLE agent_state (
run_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'running',
step_count INTEGER NOT NULL DEFAULT 0,
custom_state TEXT NOT NULL DEFAULT '{}',
messages TEXT NOT NULL DEFAULT '[]',
output TEXT,
error TEXT,
stream_id TEXT,
parent_agent_id TEXT,
sub_agents TEXT NOT NULL DEFAULT '[]',
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);DurableObjectStreamManager
Stream management using Durable Objects.
typescript
import { DurableObjectStreamManager } from '@helix-agents/store-cloudflare';
const streamManager = new DurableObjectStreamManager({
streamNamespace: env.STREAMS,
bufferSize: 100, // Chunks to buffer before flush
flushInterval: 100, // Flush interval (ms)
logger: console,
});Methods
typescript
// Create writer (implicitly creates stream)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk);
await writer.close();
// Create reader
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
// Process chunk
}
// End stream
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output
// Fail stream
await streamManager.failStream(streamId, 'Error message');
// Get stream info
const info = await streamManager.getInfo(streamId);
// Resumable reader
const reader = await streamManager.createResumableReader(streamId, {
fromSequence: 100,
});Events
typescript
interface BufferOverflowEvent {
streamId: string;
droppedCount: number;
bufferSize: number;
}
streamManager.on('bufferOverflow', (event: BufferOverflowEvent) => {
console.warn('Buffer overflow:', event);
});StreamDurableObject
Durable Object class for stream coordination. Must be exported from your worker.
typescript
import { StreamDurableObject } from '@helix-agents/store-cloudflare';
// In worker.ts - re-export the class
export { StreamDurableObject };Custom Durable Object
Create a custom class with options:
typescript
import { createStreamDurableObjectClass } from '@helix-agents/store-cloudflare';
const CustomStreamDO = createStreamDurableObjectClass({
maxChunks: 10000,
chunkTTL: 3600, // 1 hour
});
export { CustomStreamDO as StreamDurableObject };Migrations
D1 schema migrations.
typescript
import {
runMigration,
isMigrated,
dropAllTables,
getAgentsMigrationVersion,
getAgentsTableNames,
SCHEMA_MIGRATION_V1,
CURRENT_SCHEMA_VERSION,
TABLE_NAMES,
} from '@helix-agents/store-cloudflare';
// Run migration
await runMigration(env.AGENT_DB);
// Check if migrated
const migrated = await isMigrated(env.AGENT_DB);
// Get current version
const version = await getAgentsMigrationVersion(env.AGENT_DB);
// Drop all tables (for testing)
await dropAllTables(env.AGENT_DB);Migration SQL
typescript
// Get the SQL for manual migration
console.log(SCHEMA_MIGRATION_V1);Errors
typescript
import {
D1StateError, // Base D1 error
StateNotFoundError, // State not found
SubAgentRefNotFoundError,
StreamConnectionError, // DO connection error
StreamNotFoundError, // Stream not found
SequenceConflictError, // Sequence mismatch
} from '@helix-agents/store-cloudflare';
try {
const state = await stateStore.load(runId);
} catch (error) {
if (error instanceof StateNotFoundError) {
// Handle not found
}
}Complete Example
wrangler.toml
toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]
[[d1_databases]]
binding = "AGENT_DB"
database_name = "my-agents-db"
database_id = "xxx-xxx-xxx"
[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamDurableObject"
[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamDurableObject"]worker.ts
typescript
import {
createCloudflareStore,
StreamDurableObject,
runMigration,
} from '@helix-agents/store-cloudflare';
// Re-export Durable Object
export { StreamDurableObject };
interface Env {
AGENT_DB: D1Database;
STREAMS: DurableObjectNamespace;
}
export default {
async fetch(request: Request, env: Env) {
// Run migration on first request (or use scheduled task)
await runMigration(env.AGENT_DB);
// Create stores
const { stateStore, streamManager } = createCloudflareStore({
db: env.AGENT_DB,
streams: env.STREAMS,
});
// Use stores...
const state = await stateStore.load('run-123');
return Response.json({ state });
},
};Migration Script
bash
# Create database
npx wrangler d1 create my-agents-db
# Run migration (creates tables)
npx wrangler d1 execute my-agents-db --file=./migrations/0001_init.sqlRe-exported Types
For convenience, core types are re-exported:
typescript
import type {
StateStore,
StreamManager,
StreamWriter,
StreamReader,
StreamChunk,
ResumableStreamReader,
ResumableReaderOptions,
StreamInfo,
ResumableStreamStatus,
AgentState,
Message,
AgentStatus,
MergeChanges,
} from '@helix-agents/store-cloudflare';