Cloudflare Storage
The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage for edge deployment. There are two sets of stores depending on which runtime approach you use:
| Approach | State Store | Stream Manager | Package |
|---|---|---|---|
| Workflows | D1StateStore | DurableObjectStreamManager | @helix-agents/store-cloudflare |
| Durable Objects | DOStateStore | DOStreamManager | @helix-agents/runtime-cloudflare |
Choose Based on Runtime
- Workflows runtime: Use D1StateStore + DurableObjectStreamManager from @helix-agents/store-cloudflare
- DO runtime: Use DOStateStore + DOStreamManager from @helix-agents/runtime-cloudflare (built into AgentServer)
When to Use
Good fit:
- Cloudflare Workers deployments
- Global edge distribution
- Serverless architecture
- Existing Cloudflare infrastructure
Not ideal for:
- Non-Cloudflare environments
- High write throughput (D1 limitations - Workflows only)
- Complex queries (limited SQL support)
Installation
npm install @helix-agents/store-cloudflare
Entry Points and Import Patterns
The package provides three entry points to support different deployment contexts:
| Entry Point | Use Case | Contains |
|---|---|---|
| @helix-agents/store-cloudflare | Worker entry points (full package) | Everything |
| @helix-agents/store-cloudflare/d1 | Next.js routes, any Node.js context | D1StateStore, D1UsageStore, migrations |
| @helix-agents/store-cloudflare/stream | Worker entry points only | StreamServer, DurableObjectStreamManager |
Why Split Entry Points?
The /stream entry point imports partyserver which depends on cloudflare:workers - a Cloudflare-specific module that cannot be imported in:
- Next.js API routes (even with OpenNext for Cloudflare)
- Node.js environments
- Any code that runs during Next.js build
This is because:
- Build-time evaluation: Next.js evaluates all imports during build, even for server routes
- cloudflare:workers is runtime-only: The cloudflare:workers URL scheme only works inside the Cloudflare Workers runtime
- Durable Object class definitions vs bindings: You CAN access Durable Objects via env bindings, but you CANNOT import the class definition outside a Worker
Correct Import Patterns
In Next.js API routes (OpenNext):
// ✅ CORRECT - Use /d1 entry point
import { getCloudflareContext } from '@opennextjs/cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare/d1';
export async function GET(request: Request) {
const { env } = getCloudflareContext();
const stateStore = new D1StateStore({ database: env.DATABASE });
// Access Durable Objects via binding (NOT import)
const streamStub = env.STREAMS.get(env.STREAMS.idFromName('my-stream'));
}
// ❌ WRONG - Will fail during Next.js build
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
In Cloudflare Worker entry points:
// ✅ CORRECT - Worker entry points can import everything
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
import { D1StateStore } from '@helix-agents/store-cloudflare/d1';
// Export Durable Object class for Cloudflare to instantiate
export { StreamServer };
export default {
async fetch(request: Request, env: Env) {
// Now you can use both D1 and stream functionality
const stateStore = new D1StateStore({ database: env.DB });
// Access StreamServer via binding
const stub = env.STREAMS.get(env.STREAMS.idFromName('stream-id'));
},
};
Architecture Pattern for Next.js + Cloudflare
When using OpenNext with Cloudflare:
graph TB
subgraph NextJS ["Next.js App (OpenNext)"]
subgraph API ["API Routes"]
A1["Import from @helix-agents/store-cloudflare/d1"]
A2["Query D1 directly for state/messages"]
A3["Access workers via env.WORKER_NAME service binding"]
end
end
NextJS -->|"Service Binding"| AgentWorker
subgraph AgentWorker ["Agent Worker (separate worker)"]
subgraph Entry ["Entry Point (src/index.ts)"]
E1["Export StreamServer (Durable Object class)"]
E2["Import from any entry point"]
E3["Handle agent execution"]
end
end
Prerequisites
You need:
- Cloudflare account with Workers Paid plan
- D1 database created
- Durable Object namespace configured
D1StateStore
Setup
Configure D1 in wrangler.toml:
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"
Migrations
D1StateStore uses programmatic migrations. Call runMigration() on startup to apply them:
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
export default {
async fetch(request: Request, env: Env) {
// Run migrations on startup (safe to call every request - no-op if already migrated)
await runMigration(env.DB);
const stateStore = new D1StateStore({ database: env.DB });
// ...
},
};
The framework creates these tables automatically (all prefixed with __agents_). The current schema is V9; migrations apply progressively from V1.
-- Core state table (V1 baseline + V3 user context + V7 HITL columns
-- + V8 completed_client_tool_calls + V9 suspension_context)
CREATE TABLE IF NOT EXISTS __agents_states (
session_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
parent_session_id TEXT,
stream_id TEXT NOT NULL,
step_count INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'active',
output TEXT,
error TEXT,
failure_reason TEXT, -- V7: cascade discriminator
aborted INTEGER NOT NULL DEFAULT 0,
abort_reason TEXT,
custom_state TEXT NOT NULL DEFAULT '{}',
user_id TEXT, -- V3
tags TEXT, -- V3 (JSON array)
metadata TEXT, -- V3 (JSON object)
pending_client_tool_calls TEXT, -- V7: HITL pending map
completed_client_tool_calls TEXT, -- V8: idempotency tombstones
expires_at INTEGER, -- V7 (also packed in suspension_context)
suspension_context TEXT DEFAULT NULL, -- V9: stateless HITL/client-tools
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
-- Partial index for operator cleanup of abandoned suspended sessions
CREATE INDEX IF NOT EXISTS idx___agents_states_suspension_expires_at
ON __agents_states(json_extract(suspension_context, '$.expiresAt'))
WHERE suspension_context IS NOT NULL;
-- Messages table (separated for O(1) append)
CREATE TABLE IF NOT EXISTS __agents_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
message TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(session_id, sequence)
);
-- Sub-session refs (V2 adds remote_json; V4 adds mode + name for persistent children)
CREATE TABLE IF NOT EXISTS __agents_sub_session_refs (
session_id TEXT NOT NULL,
sub_session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
parent_tool_call_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
remote_json TEXT, -- V2
mode TEXT, -- V4: 'persistent' | 'ephemeral'
name TEXT, -- V4: persistent child name
started_at INTEGER NOT NULL,
completed_at INTEGER,
PRIMARY KEY (session_id, sub_session_id)
);
-- Run tracking (__agents_runs is part of the V1 baseline)
CREATE TABLE IF NOT EXISTS __agents_runs (
run_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
turn INTEGER NOT NULL,
status TEXT NOT NULL,
step_count INTEGER NOT NULL DEFAULT 0,
started_at INTEGER NOT NULL,
completed_at INTEGER
);
suspension_context packs suspendedAwaitingChildren, suspendedStepId, tracingContext, and expiresAt as a single JSON-encoded blob, written and read atomically as part of saveStateAndPromoteStaging.
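As a rough illustration of that packing, the sketch below encodes and decodes a suspension context for a nullable TEXT column. The four field names come from the description above; their types, and the helper names `encodeSuspensionContext` and `decodeSuspensionContext`, are assumptions made for this example and not the package's API.

```typescript
// Hypothetical shape: field names are from the text above, types are assumed.
interface SuspensionContext {
  suspendedAwaitingChildren?: string[];
  suspendedStepId?: string;
  tracingContext?: Record<string, string>;
  expiresAt?: number; // epoch milliseconds (assumed unit)
}

// Encode for the TEXT column; null means "not suspended".
function encodeSuspensionContext(ctx: SuspensionContext | null): string | null {
  return ctx === null ? null : JSON.stringify(ctx);
}

// Decode a column value, treating NULL and malformed JSON as "no context".
function decodeSuspensionContext(column: string | null): SuspensionContext | null {
  if (column === null) return null;
  try {
    const parsed: unknown = JSON.parse(column);
    return typeof parsed === 'object' && parsed !== null
      ? (parsed as SuspensionContext)
      : null;
  } catch {
    return null;
  }
}

const encoded = encodeSuspensionContext({
  suspendedStepId: 'step-4',
  expiresAt: 1_700_000_000_000,
});
const decoded = decodeSuspensionContext(encoded);
```

Keeping expiresAt at the top level of the JSON object is what lets the partial index shown above reach it with json_extract(suspension_context, '$.expiresAt').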
Additional tables are created for usage tracking, checkpoints, interrupt flags, and message-merge staging. See d1-migrations.ts for the full schema progression V1 → V9.
Atomic Writes (saveStateAndPromoteStaging)
D1StateStore implements the v7 saveStateAndPromoteStaging primitive using db.batch([...]), which Cloudflare D1 executes as a single auto-commit transaction. The atomic batch:
- INSERTs new messages (one row per message)
- UPDATEs __agents_states with the new state, version (post-bump), and suspension context
- UPDATEs the staging row to mark it promoted with the new checkpoint id
- Optional: INSERTs the __agents_checkpoints row when one is needed
This guarantees the cross-runtime invariant C-1: pendingClientToolCalls, clientToolCallOwnership, completed phase-1 messages, the checkpoint, and suspendedStepId all become visible together.
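To make the shape of such a batch concrete, here is a minimal sketch that builds the first two groups of statements (message INSERTs, then the state UPDATE) as plain data. It is not the store's actual implementation: `Statement`, `PromotionInput`, and `buildPromotionBatch` are hypothetical names, the UPDATE touches only columns shown in the schema above, and the staging and checkpoint statements are left out because their columns are not documented here.

```typescript
// A statement description: SQL text plus positional parameters.
interface Statement {
  sql: string;
  params: unknown[];
}

// Hypothetical input for this sketch.
interface PromotionInput {
  sessionId: string;
  baseSequence: number; // sequence number of the first new message
  messages: object[]; // new messages to append
  customState: object; // serialized into custom_state
  suspensionContext: object | null;
  now: number; // epoch milliseconds
}

// Build the statements in the order described above: one INSERT per
// message, followed by a single UPDATE of the state row.
function buildPromotionBatch(input: PromotionInput): Statement[] {
  const inserts: Statement[] = input.messages.map((message, i) => ({
    sql:
      'INSERT INTO __agents_messages (session_id, sequence, message, created_at) ' +
      'VALUES (?, ?, ?, ?)',
    params: [input.sessionId, input.baseSequence + i, JSON.stringify(message), input.now],
  }));
  const update: Statement = {
    sql:
      'UPDATE __agents_states SET custom_state = ?, suspension_context = ?, updated_at = ? ' +
      'WHERE session_id = ?',
    params: [
      JSON.stringify(input.customState),
      input.suspensionContext === null ? null : JSON.stringify(input.suspensionContext),
      input.now,
      input.sessionId,
    ],
  };
  return [...inserts, update];
}

const batch = buildPromotionBatch({
  sessionId: 'session-123',
  baseSequence: 4,
  messages: [{ role: 'assistant', content: 'Hello!' }, { role: 'user', content: 'Hi' }],
  customState: { counter: 1 },
  suspensionContext: null,
  now: 1_700_000_000_000,
});
```

In a Worker, each entry would become `env.DB.prepare(sql).bind(...params)` and the whole array would go to a single `env.DB.batch(...)` call, so either every statement commits or none does.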
Usage
import { D1StateStore } from '@helix-agents/store-cloudflare';
export default {
async fetch(request: Request, env: Env) {
const stateStore = new D1StateStore({ database: env.DB });
// Save state
await stateStore.saveState(sessionId, sessionState);
// Load state
const loaded = await stateStore.loadState('session-123');
// Delete state
await stateStore.deleteState('session-123');
},
};
Atomic Operations
// Append messages
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);
// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages('session-123', { limit: 100 });
// Update state atomically
await stateStore.saveState('session-123', updatedState);
Message Truncation
For crash recovery scenarios:
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);
This removes messages beyond the specified count, used during crash recovery to sync messages with a checkpoint's messageCount field.
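The effect of truncation can be pictured with an in-memory list. This is only a sketch of the semantics, assuming zero-based, contiguous sequence numbers; `StoredMessage` and `truncateToCount` are illustrative names, not part of the store.

```typescript
interface StoredMessage {
  sequence: number;
  message: string;
}

// Keep the first `messageCount` messages by sequence and drop the rest.
// Assumes sequences start at 0 and have no gaps (an assumption of this sketch).
function truncateToCount(rows: StoredMessage[], messageCount: number): StoredMessage[] {
  if (!Number.isInteger(messageCount) || messageCount < 0) {
    throw new RangeError(`messageCount must be a non-negative integer, got ${messageCount}`);
  }
  return rows.filter((row) => row.sequence < messageCount);
}

// Five messages were written, but the last checkpoint recorded only three.
const written: StoredMessage[] = [0, 1, 2, 3, 4].map((sequence) => ({
  sequence,
  message: `message ${sequence}`,
}));
const recovered = truncateToCount(written, 3);
```

After recovery, the message list matches the checkpoint again, so resuming from that checkpoint does not replay against messages it never saw.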
Interrupt Flag Operations
D1StateStore, like all other v7 stores (memory, Redis, Postgres, DO), implements durable interrupt flags directly on the SessionStateStore interface, enabling soft interruption of running agents.
// Set interrupt flag (called by executor.interrupt() / POST /chat/{id}/interrupt)
await stateStore.setInterruptFlag('session-123', 'user_requested');
// Check for interrupt (called inside the workflow / DO loop)
const flag = await stateStore.checkInterruptFlag('session-123');
if (flag) {
console.log(`Interrupted: ${flag.reason}`);
}
// Clear flag (called on resume or after handling)
await stateStore.clearInterruptFlag('session-123');
Durable interrupt parity (v7)
All v7 stores write durable interrupt flags. The previous restriction (only D1 supported InterruptableStateStore) is gone: setInterruptFlag, checkInterruptFlag, and clearInterruptFlag are now part of the base SessionStateStore contract and work uniformly across memory, Redis, Postgres, D1, and DO.
The programmatic migrations create a dedicated table for the interrupt flags:
CREATE TABLE IF NOT EXISTS __agents_interrupt_flags (
session_id TEXT PRIMARY KEY,
interrupted INTEGER NOT NULL DEFAULT 0,
reason TEXT,
timestamp INTEGER
);
See Interrupt and Resume for complete documentation on interrupt behavior, including sub-agent interrupt propagation.
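A soft interrupt amounts to checking the flag between steps. The sketch below shows that loop against an in-memory stand-in: the three method names match the ones used above, while `MemoryInterruptFlags`, `runSteps`, and the flag's `timestamp` field are assumptions for illustration and not the framework's executor.

```typescript
interface InterruptFlag {
  reason: string;
  timestamp: number;
}

// In-memory stand-in exposing the three methods named above.
class MemoryInterruptFlags {
  private flags = new Map<string, InterruptFlag>();

  async setInterruptFlag(sessionId: string, reason: string): Promise<void> {
    this.flags.set(sessionId, { reason, timestamp: Date.now() });
  }

  async checkInterruptFlag(sessionId: string): Promise<InterruptFlag | null> {
    return this.flags.get(sessionId) ?? null;
  }

  async clearInterruptFlag(sessionId: string): Promise<void> {
    this.flags.delete(sessionId);
  }
}

type LoopResult =
  | { status: 'completed'; steps: number }
  | { status: 'interrupted'; steps: number; reason: string };

// Run up to maxSteps, checking the flag before each step so that an
// interrupt takes effect at the next step boundary.
async function runSteps(
  store: MemoryInterruptFlags,
  sessionId: string,
  maxSteps: number,
  step: (n: number) => Promise<void>
): Promise<LoopResult> {
  for (let n = 0; n < maxSteps; n++) {
    const flag = await store.checkInterruptFlag(sessionId);
    if (flag) {
      return { status: 'interrupted', steps: n, reason: flag.reason };
    }
    await step(n);
  }
  return { status: 'completed', steps: maxSteps };
}
```

Because the check happens only at step boundaries, a step that is already running finishes before the loop stops, which is what makes the interruption "soft".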
Programmatic Migrations
For dynamic environments:
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
// Run migrations on first request
let migrated = false;
export default {
async fetch(request: Request, env: Env) {
if (!migrated) {
await runMigration(env.DB);
migrated = true;
}
const stateStore = new D1StateStore({ database: env.DB });
// ...
},
};
DOStreamManager (Durable Objects)
Setup
Configure Durable Objects in wrangler.toml:
[durable_objects]
bindings = [
{ name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
Durable Object Implementation
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';
interface StreamState {
chunks: StreamChunk[];
status: 'active' | 'ended' | 'failed';
endedAt?: number;
error?: string;
}
export class StreamManagerDO extends DurableObject {
private state: StreamState = {
chunks: [],
status: 'active',
};
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
// Load state from storage
ctx.blockConcurrencyWhile(async () => {
const stored = await ctx.storage.get<StreamState>('state');
if (stored) {
this.state = stored;
}
});
}
async write(chunk: StreamChunk): Promise<void> {
if (this.state.status !== 'active') {
throw new Error(`Cannot write to ${this.state.status} stream`);
}
this.state.chunks.push(chunk);
await this.ctx.storage.put('state', this.state);
// Notify WebSocket clients
for (const ws of this.ctx.getWebSockets()) {
ws.send(JSON.stringify(chunk));
}
}
async read(fromOffset: number = 0): Promise<StreamChunk[]> {
return this.state.chunks.slice(fromOffset);
}
async end(finalOutput?: unknown): Promise<void> {
this.state.status = 'ended';
this.state.endedAt = Date.now();
await this.ctx.storage.put('state', this.state);
// Close WebSocket connections
for (const ws of this.ctx.getWebSockets()) {
ws.close(1000, 'Stream ended');
}
}
async fail(error: string): Promise<void> {
this.state.status = 'failed';
this.state.error = error;
await this.ctx.storage.put('state', this.state);
for (const ws of this.ctx.getWebSockets()) {
ws.close(1011, error);
}
}
async getInfo(): Promise<{ status: string; chunkCount: number }> {
return {
status: this.state.status,
chunkCount: this.state.chunks.length,
};
}
async fetch(request: Request): Promise<Response> {
// WebSocket upgrade for real-time streaming
if (request.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair());
this.ctx.acceptWebSocket(server);
// Send historical chunks
for (const chunk of this.state.chunks) {
server.send(JSON.stringify(chunk));
}
return new Response(null, { status: 101, webSocket: client });
}
return new Response('Expected WebSocket', { status: 400 });
}
}
Stream Manager Wrapper
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';
export class DOStreamManager implements StreamManager {
constructor(private readonly binding: DurableObjectNamespace) {}
async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
const stub = this.getStub(streamId);
return {
write: async (chunk: StreamChunk) => {
await stub.write(chunk);
},
close: async () => {
// Writer close is a no-op - stream continues
},
};
}
async createReader(streamId: string): Promise<StreamReader | null> {
const stub = this.getStub(streamId);
const info = await stub.getInfo();
if (info.status === 'failed') {
return null;
}
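let offset = 0;
let done = false;
// Chunks fetched from the DO but not yet handed to the consumer
const buffer: StreamChunk[] = [];
return {
[Symbol.asyncIterator]: () => ({
next: async (): Promise<IteratorResult<StreamChunk>> => {
// Loop rather than recurse: `this` in an arrow function is not the iterator
while (true) {
const buffered = buffer.shift();
if (buffered !== undefined) {
return { done: false, value: buffered };
}
if (done) {
return { done: true, value: undefined };
}
// Read the status before the chunks so a finished stream is fully drained
const currentInfo = await stub.getInfo();
const chunks = await stub.read(offset);
offset += chunks.length;
buffer.push(...chunks);
if (chunks.length === 0) {
if (currentInfo.status !== 'active') {
done = true;
} else {
// Poll for new chunks (in practice, use WebSocket)
await new Promise((r) => setTimeout(r, 100));
}
}
}
},
}),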
close: async () => {},
};
}
async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
const stub = this.getStub(streamId);
await stub.end(finalOutput);
}
async failStream(streamId: string, error: string): Promise<void> {
const stub = this.getStub(streamId);
await stub.fail(error);
}
private getStub(streamId: string) {
const id = this.binding.idFromName(streamId);
return this.binding.get(id);
}
}
Export Durable Object
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';
Complete Example
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';
export { StreamManagerDO };
export interface Env {
DB: D1Database;
STREAM_MANAGER: DurableObjectNamespace;
AGENT_WORKFLOW: Workflow;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Run migrations on startup
await runMigration(env.DB);
const stateStore = new D1StateStore({ database: env.DB });
const streamManager = new DOStreamManager(env.STREAM_MANAGER);
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore,
streamManager,
});
// POST /execute
if (url.pathname === '/execute' && request.method === 'POST') {
const { agentType, message, sessionId } = await request.json();
const agent = registry.get(agentType);
const handle = await executor.execute(agent, { message }, { sessionId });
return Response.json({
sessionId: handle.sessionId,
streamUrl: `/stream/${handle.sessionId}`,
});
}
// GET /stream/:sessionId (SSE)
if (url.pathname.startsWith('/stream/')) {
const sessionId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), sessionId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const stream = await handle.stream();
if (!stream) {
return new Response('Stream not available', { status: 404 });
}
return new Response(
new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
for await (const chunk of stream) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
controller.close();
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
}
);
}
// GET /result/:sessionId
if (url.pathname.startsWith('/result/')) {
const sessionId = url.pathname.split('/').pop()!;
const handle = await executor.getHandle(registry.get('default'), sessionId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const result = await handle.result();
return Response.json(result);
}
return new Response('Not found', { status: 404 });
},
};
D1 Considerations
Write Limits
D1 has write throughput limits. For high-volume writes:
// The framework handles message batching internally via appendMessages()
// For custom implementations, batch message inserts like this:
const messages = [msg1, msg2, msg3];
const stmt = env.DB.prepare(
'INSERT INTO __agents_messages (session_id, sequence, message, created_at) VALUES (?, ?, ?, ?)'
);
await env.DB.batch(
messages.map((msg, i) => stmt.bind(sessionId, baseSequence + i, JSON.stringify(msg), Date.now()))
);
Read Replicas
D1 supports read replicas for global reads:
// Reads are automatically routed to nearest replica
const state = await stateStore.loadState(sessionId);
// Writes go to primary
await stateStore.saveState(sessionId, state);
SQLite Limitations
D1 is SQLite-based:
- No stored procedures
- Limited concurrent writes
- JSON stored as TEXT (use JSON1 functions)
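As an example of the JSON1 point, the sketch below builds a query that filters sessions by a key inside the metadata column (a JSON object stored as TEXT, per the schema above) using json_extract. The helper name `sessionsByMetadata` and the `tenant` key are hypothetical, and the key check is a conservative assumption of this sketch, not a rule of the store.

```typescript
interface Query {
  sql: string;
  params: unknown[];
}

// Build a query matching sessions whose metadata JSON has `key` equal to `value`.
// The key is restricted to simple identifiers so the JSON path stays predictable.
function sessionsByMetadata(key: string, value: string): Query {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(key)) {
    throw new Error(`Unsupported metadata key: ${key}`);
  }
  return {
    sql: 'SELECT session_id FROM __agents_states WHERE json_extract(metadata, ?) = ?',
    params: [`$.${key}`, value],
  };
}

const query = sessionsByMetadata('tenant', 'acme');
```

In a Worker this would run as `env.DB.prepare(query.sql).bind(...query.params).all()`. Without an expression index on the extracted path, such a filter scans the table, so it suits occasional operator queries more than hot paths.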
Durable Object Considerations
Geographic Pinning
Durable Objects are pinned to a region:
// First access pins the DO to nearest region
const stub = env.STREAM_MANAGER.get(id);
// Subsequent access goes to that region
// For global apps, consider DO per-region
Memory Limits
Durable Objects have memory limits:
// For large streams, persist to storage
async write(chunk: StreamChunk) {
this.state.chunks.push(chunk);
// Persist every N chunks
if (this.state.chunks.length % 100 === 0) {
await this.ctx.storage.put('state', this.state);
}
}
WebSocket Limits
Limited concurrent WebSocket connections:
// Max ~32 WebSocket connections per DO
// For more, use pub/sub pattern or multiple DOs
Deployment
Create D1 Database
wrangler d1 create agent-state
Run Migrations
wrangler d1 migrations apply agent-state
Deploy
wrangler deploy
Secrets
wrangler secret put OPENAI_API_KEY
DO Runtime Stores
When using the Durable Objects runtime, stores are built into the AgentServer DO:
DOStateStore
Uses the DO's built-in SQLite for state persistence:
import { DOStateStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const stateStore = new DOStateStore({
sql: this.sql, // PartyServer's sql tagged template
logger: this.logger,
});
// Same SessionStateStore interface
await stateStore.saveState(sessionId, state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);
Key differences from D1StateStore:
- Uses synchronous this.sql template literals (not async D1 queries)
- Single-session-per-DO architecture (no session_id filtering needed)
- No race conditions (single-threaded DO execution)
- Tables created automatically on first use
compareAndSetStatus return shape (v7)
The v7 contract returns a discriminated union (replacing the boolean of v6) so callers can branch on conflict without an extra round-trip:
const result = await stateStore.compareAndSetStatus(
sessionId,
['active'], // expected statuses
'paused', // new status
{ error: 'user paused', expectedVersion: 7 }
);
if (result.ok) {
console.log(`Promoted; new version=${result.newVersion}`);
} else {
console.log(`CAS conflict: stored=${result.currentStatus} v${result.currentVersion}`);
}
Both D1StateStore and DOStateStore implement this signature uniformly with the rest of the v7 stores.
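The value of the discriminated union is that TypeScript narrows the result on `ok`, so the conflict fields are only reachable in the failure branch. The following in-memory sketch illustrates this; the result fields match the example above, while `Row`, `compareAndSet`, and the bump-by-one versioning are assumptions for illustration.

```typescript
type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: string; currentVersion: number };

interface Row {
  status: string;
  version: number;
}

// In-memory compare-and-set: succeed only if the stored status is one of the
// expected statuses and, when given, the stored version matches.
function compareAndSet(
  row: Row,
  expectedStatuses: string[],
  newStatus: string,
  expectedVersion?: number
): CasResult {
  const statusMatches = expectedStatuses.includes(row.status);
  const versionMatches = expectedVersion === undefined || expectedVersion === row.version;
  if (!statusMatches || !versionMatches) {
    return { ok: false, currentStatus: row.status, currentVersion: row.version };
  }
  row.status = newStatus;
  row.version += 1; // assumed: each successful write bumps the version by one
  return { ok: true, newVersion: row.version };
}

const row: Row = { status: 'active', version: 7 };
const first = compareAndSet(row, ['active'], 'paused', 7);
const second = compareAndSet(row, ['active'], 'paused', 7);
```

Here the first call succeeds and the second reports the conflict together with the stored status and version, which is the information a caller would otherwise need a second read to obtain.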
DOStateStore Schema V5
The DO-local SQLite schema is at V5 as of the v7 stateless-suspension release (DO_SCHEMA_VERSION = 5). Migrations apply lazily on first DO access — no operator action required, even for upgrades from earlier versions.
| Version | Adds |
|---|---|
| V1 | Baseline state, messages, checkpoints, sub-session refs (incl. message_count/stream_sequence on checkpoints) |
| V2 | mode column on sub_session_refs (persistent sub-agent disambiguation) |
| V3 | pending_client_tool_calls, root_session_id, client_tool_call_ownership on state (HITL pending map + cross-session ownership) |
| V4 | completed_client_tool_calls TEXT on state (HITL idempotency tombstones) |
| V5 | suspension_context TEXT DEFAULT NULL on state (stateless HITL: packs suspendedAwaitingChildren, suspendedStepId, tracingContext, expiresAt, failureReason) |
-- V3
ALTER TABLE state ADD COLUMN pending_client_tool_calls TEXT;
ALTER TABLE state ADD COLUMN root_session_id TEXT;
ALTER TABLE state ADD COLUMN client_tool_call_ownership TEXT;
-- V4
ALTER TABLE state ADD COLUMN completed_client_tool_calls TEXT;
-- V5
ALTER TABLE state ADD COLUMN suspension_context TEXT DEFAULT NULL;
suspension_context carries the same JSON-encoded payload as the D1 column: suspendedAwaitingChildren, suspendedStepId, tracingContext, expiresAt.
Atomic Writes via transactionSync
DOStateStore.saveStateAndPromoteStaging uses transactionSync(() => { ... }) (the DO equivalent of db.batch) so that messages, state row updates, and staging promotion become visible together. This satisfies the cross-runtime atomicity invariant C-1 documented in the core SessionStateStore interface.
Lazy Migration
When a DO with a schema older than V5 wakes up after the v7 upgrade, the next saveState / loadState call detects the older schema_version row and runs the outstanding migrations up to V5 inline before serving the request. Idle DOs migrate on their next access, so there is no SchemaVersionMismatchError and no namespace-rename workaround is needed.
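The lazy upgrade can be sketched as "select the migrations above the stored version and run them in order". The code below is a simplified illustration, not DOStateStore's implementation: `Migration`, `pendingMigrations`, and `migrateLazily` are hypothetical names, only the V3 to V5 statements listed above are included, and a real store would also run this inside a transaction and persist the new version.

```typescript
interface Migration {
  version: number;
  statements: string[];
}

// The V3..V5 statements from the schema table above.
const MIGRATIONS: Migration[] = [
  {
    version: 3,
    statements: [
      'ALTER TABLE state ADD COLUMN pending_client_tool_calls TEXT',
      'ALTER TABLE state ADD COLUMN root_session_id TEXT',
      'ALTER TABLE state ADD COLUMN client_tool_call_ownership TEXT',
    ],
  },
  { version: 4, statements: ['ALTER TABLE state ADD COLUMN completed_client_tool_calls TEXT'] },
  { version: 5, statements: ['ALTER TABLE state ADD COLUMN suspension_context TEXT DEFAULT NULL'] },
];

// Migrations strictly above `current` and up to `target`, in ascending order.
function pendingMigrations(all: Migration[], current: number, target: number): Migration[] {
  return all
    .filter((m) => m.version > current && m.version <= target)
    .sort((a, b) => a.version - b.version);
}

// Run pending migrations through `exec` and return the resulting version.
function migrateLazily(
  all: Migration[],
  current: number,
  target: number,
  exec: (sql: string) => void
): number {
  let version = current;
  for (const migration of pendingMigrations(all, current, target)) {
    for (const sql of migration.statements) exec(sql);
    version = migration.version;
  }
  return version;
}

const executed: string[] = [];
const upgraded = migrateLazily(MIGRATIONS, 3, 5, (sql) => executed.push(sql));
```

A DO that is already at the target version selects nothing, so the check adds only a version comparison to each wake-up.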
DOStreamManager
Streams directly to connected WebSocket/SSE clients:
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const streamManager = new DOStreamManager({
sql: this.sql,
getConnections: () => this.getConnections(),
broadcast: (msg) => this.broadcast(msg),
sseConnectionManager: this.sseConnectionManager,
logger: this.logger,
});
// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!
Key differences from DurableObjectStreamManager:
- Writes directly to WebSocket/SSE connections (no subrequest cost)
- Also persists to SQLite for resumability
- Built into AgentServer (not separate DO)
- Supports hibernation (connections survive DO sleep/wake)
Stream cleanup methods:
// Clean up chunks beyond a specific step (for crash recovery)
await streamManager.cleanupToStep(streamId, stepCount);
// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream(streamId);
cleanupToStep() removes chunks with step > stepCount, used during crash recovery. resetStream() clears all chunks, called by execute() for fresh starts.
DOUsageStore
Tracks token/tool/sub-agent usage using DO's SQLite:
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const usageStore = new DOUsageStore({
sql: this.sql,
sessionId: 'session-123',
logger: this.logger,
});
// Same UsageStore interface
await usageStore.recordEntry(entry);
const rollup = await usageStore.getRollup(sessionId);
const entries = await usageStore.getEntries(sessionId, { kinds: ['tokens'] });
Key differences from D1UsageStore:
- Uses synchronous this.sql template literals (not async D1 queries)
- Single-session-per-DO architecture (sessionId passed at construction)
- Created automatically by AgentServer
- Usage accessible via /usage endpoint on the DO
Automatic usage tracking:
AgentServer automatically creates DOUsageStore and passes it to the JSAgentExecutor. After execution, retrieve usage via the handle:
const handle = await executor.execute(agent, 'Do the task');
await handle.result();
// Get usage rollup
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
Automatic Schema
DOStateStore creates these tables automatically:
CREATE TABLE state (
id INTEGER PRIMARY KEY DEFAULT 1,
session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
step_count INTEGER DEFAULT 0,
custom_state TEXT,
output TEXT,
error TEXT,
checkpoint_id TEXT,
...
);
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT,
...
);
CREATE TABLE stream_chunks (
sequence INTEGER PRIMARY KEY,
chunk TEXT NOT NULL,
created_at INTEGER NOT NULL
);
Next Steps
- In-Memory Storage - For development
- Redis Storage - For self-hosted production
- Cloudflare Runtime - Overview of both approaches
- DO Runtime - Uses DOStateStore and DOStreamManager
- Workflows Runtime - Uses D1StateStore