Cloudflare Storage
The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage using Cloudflare's D1 database and Durable Objects. It is designed for edge deployment with the Cloudflare runtime.
When to Use
Good fit:
- Cloudflare Workers deployments
- Global edge distribution
- Serverless architecture
- Existing Cloudflare infrastructure
Not ideal for:
- Non-Cloudflare environments
- High write throughput (D1 limitations)
- Complex queries (limited SQL support)
Installation
bash
npm install @helix-agents/store-cloudflare
Prerequisites
You need:
- Cloudflare account with Workers Paid plan
- D1 database created
- Durable Object namespace configured
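The Setup sections below configure each binding separately; for orientation, a consolidated wrangler.toml covering everything this guide uses might look like the following sketch (the worker name, database ID, and compatibility date are placeholders for your project):

```toml
name = "agent-worker"
main = "src/index.ts"
compatibility_date = "2024-01-01"

# D1 database for agent state
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

# Durable Object namespace for streams
[[durable_objects.bindings]]
name = "STREAM_MANAGER"
class_name = "StreamManagerDO"

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
```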
D1StateStore
Setup
Configure D1 in wrangler.toml:
toml
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"
Migrations
Create the required tables:
sql
-- migrations/0001_create_tables.sql
CREATE TABLE IF NOT EXISTS agent_state (
run_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
parent_agent_id TEXT,
status TEXT NOT NULL DEFAULT 'running',
step_count INTEGER DEFAULT 0,
custom_state TEXT, -- JSON
output TEXT, -- JSON
error TEXT,
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT, -- JSON
tool_call_id TEXT,
tool_name TEXT,
thinking TEXT, -- JSON
sequence INTEGER NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (run_id) REFERENCES agent_state(run_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_messages_run_id ON messages(run_id);
CREATE INDEX IF NOT EXISTS idx_messages_sequence ON messages(run_id, sequence);
CREATE TABLE IF NOT EXISTS sub_agent_refs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
parent_run_id TEXT NOT NULL,
sub_agent_run_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
parent_tool_call_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'running',
started_at INTEGER NOT NULL,
completed_at INTEGER,
FOREIGN KEY (parent_run_id) REFERENCES agent_state(run_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_sub_agent_refs_parent ON sub_agent_refs(parent_run_id);
Apply migrations:
bash
wrangler d1 migrations apply agent-state
Usage
typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';
export default {
async fetch(request: Request, env: Env) {
const stateStore = new D1StateStore(env.DB);
// Save state
await stateStore.save(state);
// Load state
const loaded = await stateStore.load('run-123');
// Delete state
await stateStore.delete('run-123');
},
};
Atomic Operations
typescript
// Append messages
await stateStore.appendMessages('run-123', [{ role: 'assistant', content: 'Hello!' }]);
// Update status
await stateStore.updateStatus('run-123', 'completed');
// Increment step count
const newCount = await stateStore.incrementStepCount('run-123');
// Merge custom state
await stateStore.mergeCustomState('run-123', {
values: { count: 5 },
arrayReplacements: new Set(),
warnings: [],
});
Programmatic Migrations
For environments where migrations cannot be applied ahead of time, run them lazily on first request. Note that module scope in Workers is per-isolate, so the flag does not guarantee a single run; the migrations must be safe to apply more than once (the IF NOT EXISTS statements above are):
typescript
import { D1StateStore, runMigrations } from '@helix-agents/store-cloudflare';
// Run migrations on first request
let migrated = false;
export default {
async fetch(request: Request, env: Env) {
if (!migrated) {
await runMigrations(env.DB);
migrated = true;
}
const stateStore = new D1StateStore(env.DB);
// ...
},
};
DOStreamManager (Durable Objects)
Setup
Configure Durable Objects in wrangler.toml:
toml
[[durable_objects.bindings]]
name = "STREAM_MANAGER"
class_name = "StreamManagerDO"
[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
Durable Object Implementation
typescript
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';
interface StreamState {
chunks: StreamChunk[];
status: 'active' | 'ended' | 'failed';
endedAt?: number;
error?: string;
}
export class StreamManagerDO extends DurableObject {
private state: StreamState = {
chunks: [],
status: 'active',
};
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
// Load state from storage
ctx.blockConcurrencyWhile(async () => {
const stored = await ctx.storage.get<StreamState>('state');
if (stored) {
this.state = stored;
}
});
}
async write(chunk: StreamChunk): Promise<void> {
if (this.state.status !== 'active') {
throw new Error(`Cannot write to ${this.state.status} stream`);
}
this.state.chunks.push(chunk);
await this.ctx.storage.put('state', this.state);
// Notify WebSocket clients
for (const ws of this.ctx.getWebSockets()) {
ws.send(JSON.stringify(chunk));
}
}
async read(fromOffset: number = 0): Promise<StreamChunk[]> {
return this.state.chunks.slice(fromOffset);
}
async end(finalOutput?: unknown): Promise<void> {
this.state.status = 'ended';
this.state.endedAt = Date.now();
await this.ctx.storage.put('state', this.state);
// Close WebSocket connections
for (const ws of this.ctx.getWebSockets()) {
ws.close(1000, 'Stream ended');
}
}
async fail(error: string): Promise<void> {
this.state.status = 'failed';
this.state.error = error;
await this.ctx.storage.put('state', this.state);
for (const ws of this.ctx.getWebSockets()) {
ws.close(1011, error);
}
}
async getInfo(): Promise<{ status: string; chunkCount: number }> {
return {
status: this.state.status,
chunkCount: this.state.chunks.length,
};
}
async fetch(request: Request): Promise<Response> {
// WebSocket upgrade for real-time streaming
if (request.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair());
this.ctx.acceptWebSocket(server);
// Send historical chunks
for (const chunk of this.state.chunks) {
server.send(JSON.stringify(chunk));
}
return new Response(null, { status: 101, webSocket: client });
}
return new Response('Expected WebSocket', { status: 400 });
}
}
Stream Manager Wrapper
typescript
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';
export class DOStreamManager implements StreamManager {
constructor(private readonly binding: DurableObjectNamespace) {}
async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
const stub = this.getStub(streamId);
return {
write: async (chunk: StreamChunk) => {
await stub.write(chunk);
},
close: async () => {
// Writer close is a no-op - stream continues
},
};
}
async createReader(streamId: string): Promise<StreamReader | null> {
const stub = this.getStub(streamId);
const info = await stub.getInfo();
if (info.status === 'failed') {
return null;
}
let offset = 0;
let done = false;
return {
[Symbol.asyncIterator]: () => ({
next: async (): Promise<IteratorResult<StreamChunk>> => {
while (!done) {
const chunks = await stub.read(offset);
if (chunks.length > 0) {
// Advance one chunk per call so none are skipped
offset += 1;
return { done: false, value: chunks[0] };
}
// No new chunks - check whether the stream has ended
const currentInfo = await stub.getInfo();
if (currentInfo.status === 'ended') {
done = true;
break;
}
// Poll for new chunks (in practice, use WebSocket)
await new Promise((r) => setTimeout(r, 100));
}
return { done: true, value: undefined };
},
}),
close: async () => {},
};
}
async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
const stub = this.getStub(streamId);
await stub.end(finalOutput);
}
async failStream(streamId: string, error: string): Promise<void> {
const stub = this.getStub(streamId);
await stub.fail(error);
}
private getStub(streamId: string) {
const id = this.binding.idFromName(streamId);
return this.binding.get(id);
}
}
Export Durable Object
typescript
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';
Complete Example
typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';
export { StreamManagerDO };
export interface Env {
DB: D1Database;
STREAM_MANAGER: DurableObjectNamespace;
AGENT_WORKFLOW: Workflow;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
const stateStore = new D1StateStore(env.DB);
const streamManager = new DOStreamManager(env.STREAM_MANAGER);
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore,
streamManager,
});
// POST /execute
if (url.pathname === '/execute' && request.method === 'POST') {
const { agentType, message } = (await request.json()) as { agentType: string; message: string };
const agent = registry.get(agentType);
const handle = await executor.execute(agent, message);
return Response.json({
runId: handle.runId,
streamUrl: `/stream/${handle.runId}`,
});
}
// GET /stream/:runId (SSE)
if (url.pathname.startsWith('/stream/')) {
const runId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), runId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const stream = await handle.stream();
if (!stream) {
return new Response('Stream not available', { status: 404 });
}
return new Response(
new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
for await (const chunk of stream) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
controller.close();
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
}
);
}
// GET /result/:runId
if (url.pathname.startsWith('/result/')) {
const runId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), runId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const result = await handle.result();
return Response.json(result);
}
return new Response('Not found', { status: 404 });
},
};
D1 Considerations
Write Limits
D1 has write throughput limits. For high-volume writes:
typescript
// Batch message inserts
const messages = [msg1, msg2, msg3];
const stmt = env.DB.prepare(
'INSERT INTO messages (run_id, role, content, sequence, created_at) VALUES (?, ?, ?, ?, ?)'
);
await env.DB.batch(
messages.map((msg, i) => stmt.bind(runId, msg.role, msg.content, baseSequence + i, Date.now()))
);
Read Replicas
D1 supports read replicas for global reads:
typescript
// Reads are automatically routed to nearest replica
const state = await stateStore.load(runId);
// Writes go to primary
await stateStore.save(state);
SQLite Limitations
D1 is SQLite-based:
- No stored procedures
- Limited concurrent writes
- JSON stored as TEXT (use JSON1 functions)
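Because custom_state and output are stored as TEXT, the JSON1 functions let you filter on fields inside them without a schema change. A sketch against the agent_state table above (the $.count key is illustrative, assuming it was written via mergeCustomState):

```sql
-- Find running agents whose custom_state has a count above a threshold
SELECT run_id, json_extract(custom_state, '$.count') AS count
FROM agent_state
WHERE status = 'running'
  AND json_extract(custom_state, '$.count') > 3;
```

Note that json_extract cannot use the table's ordinary indexes, so keep such queries off hot paths or add a generated column if you filter on one key frequently.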
Durable Object Considerations
Geographic Pinning
Durable Objects are pinned to a region:
typescript
// First access pins the DO to nearest region
const stub = env.STREAM_MANAGER.get(id);
// Subsequent access goes to that region
// For global apps, consider DO per-region
Memory Limits
Durable Objects have memory limits:
typescript
// For large streams, persist to storage
async write(chunk: StreamChunk) {
this.state.chunks.push(chunk);
// Persist every N chunks
if (this.state.chunks.length % 100 === 0) {
await this.ctx.storage.put('state', this.state);
}
}
WebSocket Limits
Limited concurrent WebSocket connections:
typescript
// Concurrent WebSockets per DO are bounded: active sockets consume
// the DO's memory; the WebSocket hibernation API raises the practical cap
// For more fan-out, use a pub/sub pattern or multiple DOs
Deployment
Create D1 Database
bash
wrangler d1 create agent-state
Run Migrations
bash
wrangler d1 migrations apply agent-state
Deploy
bash
wrangler deploy
Secrets
bash
wrangler secret put OPENAI_API_KEY
Next Steps
- In-Memory Storage - For development
- Redis Storage - For self-hosted production
- Cloudflare Runtime - Uses these stores