Cloudflare Storage
The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage for edge deployment. There are two sets of stores depending on which runtime approach you use:
| Approach | State Store | Stream Manager | Package |
|---|---|---|---|
| Workflows | D1StateStore | DurableObjectStreamManager | @helix-agents/store-cloudflare |
| Durable Objects | DOStateStore | DOStreamManager | @helix-agents/runtime-cloudflare |
Choose Based on Runtime
- Workflows runtime: Use D1StateStore + DurableObjectStreamManager from @helix-agents/store-cloudflare
- DO runtime: Use DOStateStore + DOStreamManager from @helix-agents/runtime-cloudflare (built into AgentServer)
When to Use
Good fit:
- Cloudflare Workers deployments
- Global edge distribution
- Serverless architecture
- Existing Cloudflare infrastructure
Not ideal for:
- Non-Cloudflare environments
- High write throughput (D1 limitations - Workflows only)
- Complex queries (limited SQL support)
Installation
npm install @helix-agents/store-cloudflare
Entry Points and Import Patterns
The package provides three entry points to support different deployment contexts:
| Entry Point | Use Case | Contains |
|---|---|---|
| @helix-agents/store-cloudflare | Worker entry points (full package) | Everything |
| @helix-agents/store-cloudflare/d1 | Next.js routes, any Node.js context | D1StateStore, D1UsageStore, migrations |
| @helix-agents/store-cloudflare/stream | Worker entry points only | StreamServer, DurableObjectStreamManager |
Why Split Entry Points?
The /stream entry point imports partyserver which depends on cloudflare:workers - a Cloudflare-specific module that cannot be imported in:
- Next.js API routes (even with OpenNext for Cloudflare)
- Node.js environments
- Any code that runs during Next.js build
This is because:
- Build-time evaluation: Next.js evaluates all imports during build, even for server routes
- cloudflare:workers is runtime-only: The cloudflare:workers URL scheme only works inside the Cloudflare Workers runtime
- Durable Object class definitions vs bindings: You CAN access Durable Objects via env bindings, but you CANNOT import the class definition outside a Worker
Correct Import Patterns
In Next.js API routes (OpenNext):
// ✅ CORRECT - Use /d1 entry point
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare/d1';
import { getCloudflareContext } from '@opennextjs/cloudflare';
export async function GET(request: Request) {
  const { env } = getCloudflareContext();
  const stateStore = new D1StateStore({ database: env.DATABASE });
  // Access Durable Objects via binding (NOT import)
  const streamStub = env.STREAMS.get(env.STREAMS.idFromName('my-stream'));
  // ...
}
// ❌ WRONG - Will fail during Next.js build
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
In Cloudflare Worker entry points:
// ✅ CORRECT - Worker entry points can import everything
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
import { D1StateStore } from '@helix-agents/store-cloudflare/d1';
// Export Durable Object class for Cloudflare to instantiate
export { StreamServer };
export default {
  async fetch(request: Request, env: Env) {
    // Now you can use both D1 and stream functionality
    const stateStore = new D1StateStore({ database: env.DB });
    // Access StreamServer via binding
    const stub = env.STREAMS.get(env.STREAMS.idFromName('stream-id'));
    // ...
    return new Response('OK');
  },
};
Architecture Pattern for Next.js + Cloudflare
When using OpenNext with Cloudflare:
graph TB
subgraph NextJS ["Next.js App (OpenNext)"]
subgraph API ["API Routes"]
A1["Import from @helix-agents/store-cloudflare/d1"]
A2["Query D1 directly for state/messages"]
A3["Access workers via env.WORKER_NAME service binding"]
end
end
NextJS -->|"Service Binding"| AgentWorker
subgraph AgentWorker ["Agent Worker (separate worker)"]
subgraph Entry ["Entry Point (src/index.ts)"]
E1["Export StreamServer (Durable Object class)"]
E2["Import from any entry point"]
E3["Handle agent execution"]
end
end
Prerequisites
You need:
- Cloudflare account with Workers Paid plan
- D1 database created
- Durable Object namespace configured
D1StateStore
Setup
Configure D1 in wrangler.toml:
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"
Migrations
The D1StateStore uses programmatic migrations. Call runMigration() on startup to apply them; the call is idempotent, so it is safe to run on every request:
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
export default {
  async fetch(request: Request, env: Env) {
    // Run migrations on startup (safe to call every request - no-op if already migrated)
    await runMigration(env.DB);
    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  },
};
The framework creates these tables automatically (all prefixed with __agents_):
-- Core state table (session-centric model)
CREATE TABLE IF NOT EXISTS __agents_states (
session_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
parent_session_id TEXT,
stream_id TEXT NOT NULL,
step_count INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'active',
output TEXT,
error TEXT,
aborted INTEGER NOT NULL DEFAULT 0,
abort_reason TEXT,
custom_state TEXT NOT NULL DEFAULT '{}',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
-- Messages table (separated for O(1) append)
CREATE TABLE IF NOT EXISTS __agents_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
message TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(session_id, sequence)
);
-- Sub-session refs table (session-centric naming)
CREATE TABLE IF NOT EXISTS __agents_sub_session_refs (
session_id TEXT NOT NULL,
sub_session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
parent_tool_call_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
started_at INTEGER NOT NULL,
completed_at INTEGER,
PRIMARY KEY (session_id, sub_session_id)
);
Additional tables are created for usage tracking, checkpoints, and interrupt flags. See d1-migrations.ts for the full schema.
Usage
import { D1StateStore } from '@helix-agents/store-cloudflare';
export default {
  async fetch(request: Request, env: Env) {
    const stateStore = new D1StateStore({ database: env.DB });
    // Save state
    await stateStore.saveState(sessionId, sessionState);
    // Load state
    const loaded = await stateStore.loadState('session-123');
    // Delete state
    await stateStore.deleteState('session-123');
  },
};
Atomic Operations
// Append messages
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);
// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages('session-123', { limit: 100 });
// Update state atomically
await stateStore.saveState('session-123', updatedState);
Message Truncation
For crash recovery scenarios:
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);
This removes messages beyond the specified count, used during crash recovery to sync messages with a checkpoint's messageCount field.
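To make the recovery contract concrete, here is a small sketch: an in-memory model of the truncation semantics, plus a resume routine calling the store. The `loadCheckpoint` helper and the checkpoint shape are assumptions for illustration; only `truncateMessages()` comes from the store API above.

```typescript
// In-memory model of truncateMessages() semantics (illustration only -
// D1StateStore does this in SQL): keep the first `messageCount` messages
// and drop anything appended after the checkpoint was taken.
function truncateToCheckpoint<T>(messages: T[], messageCount: number): T[] {
  return messages.slice(0, messageCount);
}

// Hypothetical resume routine: `loadCheckpoint` is an assumed helper; the
// checkpoint's messageCount field is the one referenced in the text above.
async function recoverMessages(
  stateStore: { truncateMessages(sessionId: string, count: number): Promise<void> },
  loadCheckpoint: (sessionId: string) => Promise<{ messageCount: number }>,
  sessionId: string
): Promise<void> {
  const checkpoint = await loadCheckpoint(sessionId);
  // Drop any messages persisted after the checkpoint was written
  await stateStore.truncateMessages(sessionId, checkpoint.messageCount);
}
```

For example, if five messages were stored but the checkpoint recorded three, `truncateToCheckpoint(messages, 3)` keeps only the first three and discards the two orphans.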
Interrupt Flag Operations
D1StateStore implements the InterruptableStateStore interface, enabling soft interruption of running agents. This is required for the Cloudflare Workflows runtime's interrupt functionality.
import { isInterruptableStateStore } from '@helix-agents/core';
// Check if store supports interrupts
if (isInterruptableStateStore(stateStore)) {
  // Set interrupt flag (called by executor.interrupt())
  await stateStore.setInterruptFlag('session-123', 'user_requested');
  // Check for interrupt (called in workflow loop)
  const flag = await stateStore.checkInterruptFlag('session-123');
  if (flag) {
    console.log(`Interrupted: ${flag.reason}`);
  }
  // Clear flag (called on resume or after handling)
  await stateStore.clearInterruptFlag('session-123');
}
InterruptableStateStore Support
Currently, only D1StateStore implements InterruptableStateStore. The in-memory and Redis state stores do not support interrupt flags. If using the Cloudflare Workflows runtime, you must use D1StateStore for interrupt functionality to work.
The interrupt flags are automatically created by the programmatic migrations in a dedicated table:
CREATE TABLE IF NOT EXISTS __agents_interrupt_flags (
session_id TEXT PRIMARY KEY,
interrupted INTEGER NOT NULL DEFAULT 0,
reason TEXT,
timestamp INTEGER
);
See Interrupt and Resume for complete documentation on interrupt behavior, including sub-agent interrupt propagation.
Programmatic Migrations
For dynamic environments:
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
// Run migrations on first request
let migrated = false;
export default {
  async fetch(request: Request, env: Env) {
    if (!migrated) {
      await runMigration(env.DB);
      migrated = true;
    }
    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  },
};
DOStreamManager (Durable Objects)
Setup
Configure Durable Objects in wrangler.toml:
[durable_objects]
bindings = [
{ name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
Durable Object Implementation
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';
interface StreamState {
chunks: StreamChunk[];
status: 'active' | 'ended' | 'failed';
endedAt?: number;
error?: string;
}
export class StreamManagerDO extends DurableObject {
private state: StreamState = {
chunks: [],
status: 'active',
};
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
// Load state from storage
ctx.blockConcurrencyWhile(async () => {
const stored = await ctx.storage.get<StreamState>('state');
if (stored) {
this.state = stored;
}
});
}
async write(chunk: StreamChunk): Promise<void> {
if (this.state.status !== 'active') {
throw new Error(`Cannot write to ${this.state.status} stream`);
}
this.state.chunks.push(chunk);
await this.ctx.storage.put('state', this.state);
// Notify WebSocket clients
for (const ws of this.ctx.getWebSockets()) {
ws.send(JSON.stringify(chunk));
}
}
async read(fromOffset: number = 0): Promise<StreamChunk[]> {
return this.state.chunks.slice(fromOffset);
}
async end(finalOutput?: unknown): Promise<void> {
this.state.status = 'ended';
this.state.endedAt = Date.now();
await this.ctx.storage.put('state', this.state);
// Close WebSocket connections
for (const ws of this.ctx.getWebSockets()) {
ws.close(1000, 'Stream ended');
}
}
async fail(error: string): Promise<void> {
this.state.status = 'failed';
this.state.error = error;
await this.ctx.storage.put('state', this.state);
for (const ws of this.ctx.getWebSockets()) {
ws.close(1011, error);
}
}
async getInfo(): Promise<{ status: string; chunkCount: number }> {
return {
status: this.state.status,
chunkCount: this.state.chunks.length,
};
}
async fetch(request: Request): Promise<Response> {
// WebSocket upgrade for real-time streaming
if (request.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair());
this.ctx.acceptWebSocket(server);
// Send historical chunks
for (const chunk of this.state.chunks) {
server.send(JSON.stringify(chunk));
}
return new Response(null, { status: 101, webSocket: client });
}
return new Response('Expected WebSocket', { status: 400 });
}
}
Stream Manager Wrapper
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';
export class DOStreamManager implements StreamManager {
constructor(private readonly binding: DurableObjectNamespace) {}
async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
const stub = this.getStub(streamId);
return {
write: async (chunk: StreamChunk) => {
await stub.write(chunk);
},
close: async () => {
// Writer close is a no-op - stream continues
},
};
}
async createReader(streamId: string): Promise<StreamReader | null> {
  const stub = this.getStub(streamId);
  const info = await stub.getInfo();
  if (info.status === 'failed') {
    return null;
  }
  let offset = 0;
  let buffer: StreamChunk[] = [];
  let done = false;
  // Named closure so the iterator can loop without relying on `this`
  const next = async (): Promise<IteratorResult<StreamChunk>> => {
    while (!done) {
      if (buffer.length > 0) {
        // Drain buffered chunks one at a time so none are dropped
        return { done: false, value: buffer.shift()! };
      }
      const chunks = await stub.read(offset);
      if (chunks.length > 0) {
        offset += chunks.length;
        buffer = chunks;
        continue;
      }
      // Check if stream ended
      const currentInfo = await stub.getInfo();
      if (currentInfo.status === 'ended') {
        done = true;
        break;
      }
      // Poll for new chunks (in practice, use WebSocket)
      await new Promise((r) => setTimeout(r, 100));
    }
    return { done: true, value: undefined };
  };
  return {
    [Symbol.asyncIterator]: () => ({ next }),
    close: async () => {},
  };
}
async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
const stub = this.getStub(streamId);
await stub.end(finalOutput);
}
async failStream(streamId: string, error: string): Promise<void> {
const stub = this.getStub(streamId);
await stub.fail(error);
}
private getStub(streamId: string) {
const id = this.binding.idFromName(streamId);
return this.binding.get(id);
}
}
Export Durable Object
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';
Complete Example
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';
export { StreamManagerDO };
export interface Env {
DB: D1Database;
STREAM_MANAGER: DurableObjectNamespace;
AGENT_WORKFLOW: Workflow;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Run migrations on startup
await runMigration(env.DB);
const stateStore = new D1StateStore({ database: env.DB });
const streamManager = new DOStreamManager(env.STREAM_MANAGER);
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore,
streamManager,
});
// POST /execute
if (url.pathname === '/execute' && request.method === 'POST') {
const { agentType, message, sessionId } = await request.json();
const agent = registry.get(agentType);
const handle = await executor.execute(agent, { message }, { sessionId });
return Response.json({
sessionId: handle.sessionId,
streamUrl: `/stream/${handle.sessionId}`,
});
}
// GET /stream/:sessionId (SSE)
if (url.pathname.startsWith('/stream/')) {
const sessionId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), sessionId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const stream = await handle.stream();
if (!stream) {
return new Response('Stream not available', { status: 404 });
}
return new Response(
new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
for await (const chunk of stream) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
controller.close();
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
}
);
}
// GET /result/:sessionId
if (url.pathname.startsWith('/result/')) {
const sessionId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), sessionId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const result = await handle.result();
return Response.json(result);
}
return new Response('Not found', { status: 404 });
},
};
D1 Considerations
Write Limits
D1 has write throughput limits. For high-volume writes:
// The framework handles message batching internally via appendMessages()
// For custom implementations, batch message inserts like this:
const messages = [msg1, msg2, msg3];
const stmt = env.DB.prepare(
'INSERT INTO __agents_messages (session_id, sequence, message, created_at) VALUES (?, ?, ?, ?)'
);
await env.DB.batch(
messages.map((msg, i) => stmt.bind(sessionId, baseSequence + i, JSON.stringify(msg), Date.now()))
);
Read Replicas
D1 supports read replicas for global reads:
// Reads are automatically routed to nearest replica
const state = await stateStore.loadState(sessionId);
// Writes go to primary
await stateStore.saveState(sessionId, state);
SQLite Limitations
D1 is SQLite-based:
- No stored procedures
- Limited concurrent writes
- JSON stored as TEXT (use JSON1 functions)
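Because custom_state is stored as TEXT, SQLite's JSON1 functions (json_extract and friends) are how you filter on it through D1. A sketch against the __agents_states table above; the D1Like interface just mirrors D1's prepare/bind/all shape (env.DB satisfies it in a Worker), and the `$.priority` / `$.topic` fields are hypothetical custom-state keys:

```typescript
// Minimal shape of D1's prepare/bind/all API (env.DB satisfies this in a Worker)
interface D1Like {
  prepare(query: string): {
    bind(...values: unknown[]): { all(): Promise<{ results: unknown[] }> };
  };
}

// Filter sessions on a field inside the JSON custom_state TEXT column.
// json_extract is SQLite's JSON1 function; the JSON paths are illustrative.
async function findSessionsByPriority(db: D1Like, priority: string): Promise<unknown[]> {
  const stmt = db.prepare(
    `SELECT session_id, json_extract(custom_state, '$.topic') AS topic
     FROM __agents_states
     WHERE json_extract(custom_state, '$.priority') = ?`
  );
  const { results } = await stmt.bind(priority).all();
  return results;
}
```

Keep in mind that json_extract in a WHERE clause cannot use a plain index on the TEXT column, so treat this as an occasional query, not a hot path.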
Durable Object Considerations
Geographic Pinning
Durable Objects are pinned to a region:
// First access pins the DO to nearest region
const stub = env.STREAM_MANAGER.get(id);
// Subsequent access goes to that region
// For global apps, consider a DO per region
Memory Limits
Durable Objects have memory limits:
// For large streams, persist to storage
async write(chunk: StreamChunk) {
this.state.chunks.push(chunk);
// Persist every N chunks
if (this.state.chunks.length % 100 === 0) {
await this.ctx.storage.put('state', this.state);
}
}
WebSocket Limits
Limited concurrent WebSocket connections:
// Max ~32 WebSocket connections per DO
// For more, use a pub/sub pattern or multiple DOs
Deployment
Create D1 Database
wrangler d1 create agent-state
Run Migrations
wrangler d1 migrations apply agent-state
Deploy
wrangler deploy
Secrets
wrangler secret put OPENAI_API_KEY
DO Runtime Stores
When using the Durable Objects runtime, stores are built into the AgentServer DO:
DOStateStore
Uses the DO's built-in SQLite for state persistence:
import { DOStateStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const stateStore = new DOStateStore({
sql: this.sql, // PartyServer's sql tagged template
logger: this.logger,
});
// Same SessionStateStore interface
await stateStore.saveState(sessionId, state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);
Key differences from D1StateStore:
- Uses synchronous this.sql template literals (not async D1 queries)
- Single-session-per-DO architecture (no session_id filtering needed)
- No race conditions (single-threaded DO execution)
- Tables created automatically on first use
DOStateStore Schema v3 (v0.10+)
Breaking Change
DOStateStore schema v3 introduces new columns for crash recovery. Existing Durable Objects with schema v2 will throw SchemaVersionMismatchError on first access after upgrading.
What Changed
Schema v3 adds two columns to the checkpoints table:
| Column | Type | Purpose |
|---|---|---|
| message_count | INTEGER | Exact message count at checkpoint time |
| stream_sequence | INTEGER | Stream position for resumption |
These fields enable reliable crash recovery:
- Message truncation: On resume, truncateMessages() uses message_count to remove orphaned messages written after the checkpoint
- Stream cleanup: Stream managers use stream_sequence to remove orphaned chunks
- Client resumption: Clients can resume streams from the stored sequence position
Why Store These Values?
Previously, these values were derived from state at load time. This caused issues:
- Race conditions: Derived values could be stale if state changed concurrently
- Data loss: If messages were appended after checkpoint but before crash, derived count wouldn't match actual stored messages
Now the exact values at checkpoint time are persisted, ensuring crash recovery works correctly.
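On the client side, resumption built on stream_sequence might look like this sketch. The `/stream` route shape and the `from` query parameter are hypothetical; only the stored-sequence idea comes from the schema above:

```typescript
// Build a resume URL that asks only for chunks after the last seen sequence.
// The route shape and `from` parameter are hypothetical.
function resumeUrl(base: string, streamId: string, lastSequence: number): string {
  return `${base}/stream/${streamId}?from=${lastSequence + 1}`;
}

// Defensive client-side filter in case the server replays overlapping chunks.
function dropSeen<T extends { sequence: number }>(chunks: T[], lastSequence: number): T[] {
  return chunks.filter((c) => c.sequence > lastSequence);
}
```

A client that crashed after processing sequence 41 would reconnect with `resumeUrl(origin, streamId, 41)` and filter any replayed chunks through `dropSeen`.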
Migration Steps
For new deployments: No action needed. New DOs automatically use schema v3.
For existing deployments with v2 schema:
1. Create new DO instances: The safest approach is to use new DO instances for new sessions:
// Use a new naming convention or namespace for v3
const doId = env.AGENTS.idFromName(`v3:session:${sessionId}`);
2. Or migrate existing DOs: If you need to preserve existing data, you can manually run the migration SQL on each DO (advanced):
ALTER TABLE checkpoints ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE checkpoints ADD COLUMN stream_sequence INTEGER NOT NULL DEFAULT 0;
UPDATE schema_version SET version = 3;
3. Handle the error gracefully: If migration isn't possible, catch SchemaVersionMismatchError:
try {
  const response = await stub.fetch('/start', { ... });
} catch (error) {
  if (error.message?.includes('SchemaVersionMismatch')) {
    // Redirect to new DO instance
    const newDoId = env.AGENTS.idFromName(`v3:${originalName}`);
    // ...
  }
}
DOStreamManager
Streams directly to connected WebSocket/SSE clients:
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const streamManager = new DOStreamManager({
sql: this.sql,
getConnections: () => this.getConnections(),
broadcast: (msg) => this.broadcast(msg),
sseConnections: this.sseConnections,
logger: this.logger,
});
// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!
Key differences from DurableObjectStreamManager:
- Writes directly to WebSocket/SSE connections (no subrequest cost)
- Also persists to SQLite for resumability
- Built into AgentServer (not separate DO)
- Supports hibernation (connections survive DO sleep/wake)
Stream cleanup methods:
// Clean up chunks beyond a specific step (for crash recovery)
await streamManager.cleanupToStep(streamId, stepCount);
// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream(streamId);
cleanupToStep() removes chunks with step > stepCount, used during crash recovery. resetStream() clears all chunks, called by execute() for fresh starts.
DOUsageStore
Tracks token/tool/sub-agent usage using DO's SQLite:
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const usageStore = new DOUsageStore({
sql: this.sql,
sessionId: 'session-123',
logger: this.logger,
});
// Same UsageStore interface
await usageStore.recordEntry(entry);
const rollup = await usageStore.getRollup(sessionId);
const entries = await usageStore.getEntries(sessionId, { kinds: ['tokens'] });
Key differences from D1UsageStore:
- Uses synchronous this.sql template literals (not async D1 queries)
- Single-session-per-DO architecture (sessionId passed at construction)
- Created automatically by AgentServer
- Usage accessible via /usage endpoint on the DO
Automatic usage tracking:
AgentServer automatically creates DOUsageStore and passes it to the JSAgentExecutor. After execution, retrieve usage via the handle:
const handle = await executor.execute(agent, 'Do the task');
await handle.result();
// Get usage rollup
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
Automatic Schema
DOStateStore creates these tables automatically:
CREATE TABLE state (
id INTEGER PRIMARY KEY DEFAULT 1,
session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
step_count INTEGER DEFAULT 0,
custom_state TEXT,
output TEXT,
error TEXT,
checkpoint_id TEXT,
...
);
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT,
...
);
CREATE TABLE stream_chunks (
sequence INTEGER PRIMARY KEY,
chunk TEXT NOT NULL,
created_at INTEGER NOT NULL
);
Next Steps
- In-Memory Storage - For development
- Redis Storage - For self-hosted production
- Cloudflare Runtime - Overview of both approaches
- DO Runtime - Uses DOStateStore and DOStreamManager
- Workflows Runtime - Uses D1StateStore