Cloudflare Runtime
The Cloudflare runtime (@helix-agents/runtime-cloudflare) executes agents on Cloudflare's edge network using Workers and Workflows. This provides global low-latency execution with serverless scaling.
When to Use
Good fit:
- Global edge deployment with low latency
- Serverless architecture (no servers to manage)
- Existing Cloudflare infrastructure (Workers, D1, DO)
- Cost optimization with pay-per-request
- Applications already using Cloudflare
Not ideal for:
- Long-running agents (> 15 minutes)
- Workflows requiring complex orchestration
- Heavy computation workloads
- Development without Cloudflare account
Prerequisites
- Cloudflare account with Workers Paid plan
- Wrangler CLI: npm install -g wrangler
- D1 database for state storage
- Durable Objects for streaming
Installation
npm install @helix-agents/runtime-cloudflare @helix-agents/store-cloudflare
Architecture
┌──────────────────────────────────────────────────────────────┐
│                    Edge Location (Global)                    │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  Cloudflare Worker                                     │  │
│  │  - HTTP endpoints                                      │  │
│  │  - CloudflareAgentExecutor                             │  │
│  │  - Starts Workflows                                    │  │
│  └────────────────────────────────────────────────────────┘  │
│                              │                               │
│                              ▼                               │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  Cloudflare Workflow                                   │  │
│  │  - Agent execution steps                               │  │
│  │  - LLM calls                                           │  │
│  │  - Tool execution                                      │  │
│  └────────────────────────────────────────────────────────┘  │
│                              │                               │
│                ┌─────────────┴─────────────┐                 │
│                ▼                           ▼                 │
│  ┌─────────────────────┐    ┌─────────────────────────────┐  │
│  │ D1 Database         │    │ Durable Object              │  │
│  │ - Agent state       │    │ - Stream events             │  │
│  │ - Messages          │    │ - Real-time streaming       │  │
│  └─────────────────────┘    └─────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
Setup Guide
1. Configure wrangler.toml
name = "agent-worker"
main = "src/index.ts"
compatibility_date = "2024-01-01"
# D1 Database for state
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"
# Durable Object for streaming
[durable_objects]
bindings = [
{ name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
# Workflow binding (exposed to the Worker as env.AGENT_WORKFLOW)
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"
2. Create D1 Migrations
-- migrations/0001_init.sql
CREATE TABLE IF NOT EXISTS agent_state (
run_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
status TEXT NOT NULL,
step_count INTEGER DEFAULT 0,
custom_state TEXT,
output TEXT,
error TEXT,
parent_agent_id TEXT,
aborted INTEGER DEFAULT 0,
abort_reason TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT,
tool_call_id TEXT,
tool_name TEXT,
thinking TEXT,
sequence INTEGER NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (run_id) REFERENCES agent_state(run_id)
);
CREATE INDEX idx_messages_run_id ON messages(run_id);
Run migrations:
wrangler d1 migrations apply agent-state
3. Create the Durable Object
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
export class StreamManagerDO extends DurableObject {
// In-memory only: these chunks are lost if the Durable Object is evicted or hibernates
private chunks: Map<string, StreamChunk[]> = new Map();
async write(streamId: string, chunk: StreamChunk): Promise<void> {
const chunks = this.chunks.get(streamId) ?? [];
chunks.push(chunk);
this.chunks.set(streamId, chunks);
// Notify connected clients via WebSocket
this.ctx.getWebSockets().forEach((ws) => {
ws.send(JSON.stringify(chunk));
});
}
async read(streamId: string, fromOffset: number): Promise<StreamChunk[]> {
const chunks = this.chunks.get(streamId) ?? [];
return chunks.slice(fromOffset);
}
async fetch(request: Request): Promise<Response> {
// WebSocket upgrade for real-time streaming
if (request.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair());
this.ctx.acceptWebSocket(server);
return new Response(null, { status: 101, webSocket: client });
}
return new Response('Expected WebSocket', { status: 400 });
}
}
4. Create the Workflow
// src/workflows/agent-workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { executeWorkflowStep } from '@helix-agents/runtime-cloudflare';
// Use the shared registry from src/registry.ts (see "Agent Registry");
// a freshly constructed AgentRegistry would have no agents registered.
import { registry } from '../registry';
export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
const { runId, agentType, message, streamId } = event.payload;
// Get agent from registry
const agent = registry.get(agentType);
// Initialize or load state
let state = await step.do('load-state', async () => {
return this.env.DB.prepare('SELECT * FROM agent_state WHERE run_id = ?').bind(runId).first();
});
if (!state) {
state = await step.do('init-state', async () => {
// Initialize new agent state (initializeState and saveState are application helpers not shown in this guide)
return initializeState(agent, runId, streamId, message);
});
}
// Main execution loop
while (state.status === 'running') {
// Check for abort
const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
const row = await this.env.DB.prepare('SELECT aborted FROM agent_state WHERE run_id = ?')
.bind(runId)
.first();
return row?.aborted === 1;
});
if (aborted) {
state.status = 'failed';
state.error = 'Aborted by user';
break;
}
// Execute one step
state = await step.do(`step-${state.step_count}`, async () => {
return executeWorkflowStep(agent, state, this.env.DB, this.env.STREAM_MANAGER, this.env);
});
}
// Save final state
await step.do('save-final', async () => {
await saveState(this.env.DB, state);
});
return {
status: state.status,
output: state.output,
error: state.error,
};
}
}
5. Create the Worker Entry
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';
import { AgentWorkflow } from './workflows/agent-workflow';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './registry';
export { AgentWorkflow, StreamManagerDO };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Create executor
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore: new D1StateStore(env.DB),
streamManager: new DOStreamManager(env.STREAM_MANAGER),
});
// POST /agent/execute - Start new execution
if (url.pathname === '/agent/execute' && request.method === 'POST') {
const { agentType, message } = (await request.json()) as { agentType: string; message: string };
const agent = registry.get(agentType);
const handle = await executor.execute(agent, message);
return Response.json({
runId: handle.runId,
streamUrl: `/agent/stream/${handle.runId}`,
});
}
// GET /agent/stream/:runId - SSE stream
if (url.pathname.startsWith('/agent/stream/')) {
const runId = url.pathname.split('/').pop();
const handle = await executor.getHandle(registry.get('default'), runId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const stream = await handle.stream();
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
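// Response bodies carry bytes, so encode each SSE frame before enqueueing
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));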
}
controller.close();
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
}
);
}
// GET /agent/result/:runId - Get result
if (url.pathname.startsWith('/agent/result/')) {
const runId = url.pathname.split('/').pop();
const handle = await executor.getHandle(registry.get('default'), runId);
if (!handle) {
return new Response('Not found', { status: 404 });
}
const result = await handle.result();
return Response.json(result);
}
return new Response('Not found', { status: 404 });
},
};
Agent Registry
Register agents so the workflow can instantiate them:
// src/registry.ts
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { ResearchAgent, AnalyzerAgent } from './agents';
export const registry = new AgentRegistry();
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);
Executor API
Creating the Executor
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW, // From wrangler.toml
stateStore: d1StateStore,
streamManager: doStreamManager,
});
Executing Agents
const handle = await executor.execute(MyAgent, 'Research quantum computing', {
runId: 'custom-id', // Optional
});
Getting Handles
const handle = await executor.getHandle(MyAgent, runId);
if (handle) {
const result = await handle.result();
console.log(result);
}
Streaming
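Both transports in this section deliver the same JSON-encoded chunks. For Server-Sent Events, each chunk is written as a `data:` line followed by a blank line. As a rough sketch of that framing, and of how a client could split frames back apart, consider the following. `encodeSseFrame` and `parseSseFrames` are hypothetical helpers for illustration, not part of the Helix packages.

```typescript
// Encode one chunk as a single SSE frame: "data: <json>\n\n".
function encodeSseFrame(chunk: unknown): Uint8Array {
  return new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`);
}

// Split received text into frames and decode each JSON payload.
// JSON.stringify escapes newlines, so a blank line can only be a frame boundary.
function parseSseFrames(text: string): unknown[] {
  return text
    .split('\n\n')
    .filter((frame) => frame.startsWith('data: '))
    .map((frame) => JSON.parse(frame.slice('data: '.length)));
}
```

A browser `EventSource` performs the parsing side automatically; the parser above is only relevant when reading the stream manually, for example with `fetch`.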
Server-Sent Events
// Worker endpoint
if (url.pathname.startsWith('/stream/')) {
const stream = await handle.stream();
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
controller.close();
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
}
);
}
WebSocket (via Durable Objects)
// Client connects to DO WebSocket
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${streamId}`);
ws.onmessage = (event) => {
const chunk = JSON.parse(event.data);
handleChunk(chunk);
};
Sub-Agent Handling
Sub-agents execute as nested workflow calls:
// In workflow step
const subAgentResult = await step.do('sub-agent-call', async () => {
// Start child workflow
const instance = await this.env.AGENT_WORKFLOW.create({
id: `agent__${subAgentType}__${subRunId}`,
params: {
agentType: subAgentType,
runId: subRunId,
streamId: parentStreamId, // Same stream
message: inputMessage,
parentAgentId: parentRunId,
},
});
// Wait for completion
return pollUntilComplete(instance);
});
Workflow Steps
Cloudflare Workflows use steps for durability:
// Each step is durable - if worker restarts, execution continues
state = await step.do('step-1', async () => {
// LLM call
return await callLLM(messages, tools);
});
state = await step.do('step-2', async () => {
// Tool execution
return await executeTools(toolCalls);
});
Key points:
- Steps are atomic and retried on failure
- State between steps is persisted
- Worker can restart between steps without data loss
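To make the replay behaviour in these points concrete, here is a minimal in-memory sketch of how a durable step can be modelled: results are cached by step name, so a restarted run skips steps that already completed. `DurableRun` and its `store` map are hypothetical stand-ins for illustration. This is not how Cloudflare Workflows is implemented; the real engine persists results outside the Worker and also handles retries.

```typescript
// Hypothetical model of durable steps: results are memoized by step name.
class DurableRun {
  // Stands in for the persistent storage a real workflow engine would use.
  private store: Map<string, unknown>;

  constructor(store: Map<string, unknown> = new Map()) {
    this.store = store;
  }

  async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.store.has(name)) {
      // Step completed in an earlier attempt: reuse the saved result.
      return this.store.get(name) as T;
    }
    const result = await fn();
    this.store.set(name, result);
    return result;
  }
}

// Two-step "agent": an LLM call followed by post-processing.
async function runAgent(step: DurableRun, callLLM: () => Promise<string>): Promise<string> {
  const draft = await step.do('step-1', callLLM);
  return step.do('step-2', async () => `${draft} (reviewed)`);
}

// Simulate a restart: the second run shares the persisted store with the first.
async function demoReplay(): Promise<{ output: string; llmCalls: number }> {
  const persisted = new Map<string, unknown>();
  let llmCalls = 0;
  const callLLM = async () => {
    llmCalls++;
    return 'draft answer';
  };
  await runAgent(new DurableRun(persisted), callLLM);
  const output = await runAgent(new DurableRun(persisted), callLLM);
  return { output, llmCalls };
}
```

In this model the second run returns the same output while the LLM stub runs only once, which is why step names need to be stable and unique within a run.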
Abort Handling
// Set abort flag in D1
await handle.abort('User cancelled');
// In workflow, check abort flag each step
const aborted = await step.do('check-abort', async () => {
const row = await env.DB.prepare('SELECT aborted FROM agent_state WHERE run_id = ?')
.bind(runId)
.first();
return row?.aborted === 1;
});
if (aborted) {
return { status: 'failed', error: 'Aborted' };
}
Deployment
Development
wrangler dev
Production
wrangler deploy
Secrets
wrangler secret put OPENAI_API_KEY
Access in code:
const apiKey = env.OPENAI_API_KEY;
D1 State Store
The Cloudflare runtime uses D1 for state:
import { D1StateStore } from '@helix-agents/store-cloudflare';
const stateStore = new D1StateStore(env.DB);
// Save state
await stateStore.save(state);
// Load state (a separate name avoids redeclaring the state saved above)
const loaded = await stateStore.load(runId);
// Append messages
await stateStore.appendMessages(runId, messages);
Limitations
Workflow Duration
Cloudflare Workflows have time limits:
- Individual steps: 15 minutes
- Total workflow: varies by plan
For longer agents, implement checkpointing.
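One way to approach checkpointing is to persist a cursor after each unit of work and have every run process only a bounded slice before handing off. The sketch below is a minimal in-memory illustration. `Checkpoint`, `CheckpointStore`, and `runSlice` are hypothetical names; a real implementation would presumably keep the checkpoint in D1 and start a follow-up workflow whenever `done` is false.

```typescript
interface Checkpoint {
  nextIndex: number; // first task that has not run yet
  results: string[]; // outputs collected so far
}

// Hypothetical checkpoint store; in-memory here, D1 in a real deployment.
class CheckpointStore {
  private data = new Map<string, Checkpoint>();

  load(runId: string): Checkpoint {
    const saved = this.data.get(runId);
    // Return a copy so callers cannot mutate the saved checkpoint by accident.
    return saved
      ? { nextIndex: saved.nextIndex, results: [...saved.results] }
      : { nextIndex: 0, results: [] };
  }

  save(runId: string, cp: Checkpoint): void {
    this.data.set(runId, { nextIndex: cp.nextIndex, results: [...cp.results] });
  }
}

// Process at most `budget` tasks, saving progress after each one.
function runSlice(
  store: CheckpointStore,
  runId: string,
  tasks: string[],
  budget: number
): { done: boolean; results: string[] } {
  const cp = store.load(runId);
  let processed = 0;
  while (cp.nextIndex < tasks.length && processed < budget) {
    cp.results.push(`done:${tasks[cp.nextIndex]}`);
    cp.nextIndex++;
    processed++;
    store.save(runId, cp); // a crash after this line loses no completed work
  }
  return { done: cp.nextIndex >= tasks.length, results: cp.results };
}
```

Calling `runSlice` repeatedly with the same `runId` resumes from the saved cursor, so each invocation stays within its own time budget.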
Cold Starts
Edge workers may have cold starts. Minimize initialization code.
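A common way to keep startup work small is to defer expensive setup until the first request that needs it and reuse the result afterwards. The sketch below shows the pattern with a hypothetical `lazy` helper and a stand-in `buildRegistry` function; neither is part of the Helix packages.

```typescript
// Wrap an initializer so it runs at most once, on first use.
function lazy<T>(init: () => T): () => T {
  let cached: { value: T } | undefined;
  return () => {
    if (!cached) cached = { value: init() };
    return cached.value;
  };
}

// Counts how often the expensive setup actually runs.
let builds = 0;

// Stand-in for expensive setup such as building an agent registry.
function buildRegistry(): Map<string, string> {
  builds++;
  return new Map([['research', 'ResearchAgent']]);
}

// Nothing is built at module load time...
const getRegistry = lazy(buildRegistry);

// ...only when a request handler first calls getRegistry().
function handleRequest(agentType: string): string {
  return getRegistry().get(agentType) ?? 'unknown';
}
```

The cached value lives as long as the isolate does, so later requests served by a warm isolate skip the setup entirely.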
D1 Limitations
- Single primary region for writes (reads can be served from read replicas)
- Write throughput limits
- No full-text search
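One way to reduce the number of write round trips when appending many messages is to group the inserts into batches. The D1 binding offers a `batch()` method that accepts an array of prepared statements; the sketch below shows only the grouping logic. The `BatchWriter` interface is a hypothetical stand-in for the database, and the batch size of 50 is an arbitrary choice.

```typescript
interface MessageRow {
  runId: string;
  role: string;
  content: string;
  sequence: number;
}

// Hypothetical stand-in for a database that accepts a group of writes at once.
interface BatchWriter {
  writeBatch(rows: MessageRow[]): Promise<void>;
}

// Split a list into consecutive groups of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new Error('size must be at least 1');
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Write rows group by group; sequential writes keep message order intact.
async function appendInBatches(
  db: BatchWriter,
  rows: MessageRow[],
  batchSize = 50
): Promise<number> {
  const groups = chunk(rows, batchSize);
  for (const group of groups) {
    await db.writeBatch(group);
  }
  return groups.length; // number of round trips made
}
```

With a real D1 binding, `writeBatch` could map each row to a prepared `INSERT` and pass the array to `env.DB.batch(...)`.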
Durable Object Limits
- Single instance per ID
- Memory limits per instance
- Geographic pinning
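Because each instance has a memory limit, an unbounded in-memory chunk list like the one in the StreamManagerDO example can grow too large on long streams. One option is to keep only the most recent chunks while preserving absolute offsets, so readers can still resume from a known position. `BoundedChunkLog` below is a hypothetical sketch, and the capacity is an arbitrary choice.

```typescript
// Keeps the newest `capacity` chunks and remembers how many were dropped.
class BoundedChunkLog<T> {
  private chunks: T[] = [];
  private baseOffset = 0; // absolute offset of chunks[0]
  private capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
    this.capacity = capacity;
  }

  // Append a chunk and return the offset the next chunk will receive.
  append(chunk: T): number {
    this.chunks.push(chunk);
    if (this.chunks.length > this.capacity) {
      this.chunks.shift(); // drop the oldest chunk
      this.baseOffset++;
    }
    return this.baseOffset + this.chunks.length;
  }

  // Read from an absolute offset. `firstOffset` reports where the returned
  // chunks really start, which is later than requested if data was dropped.
  read(fromOffset: number): { firstOffset: number; chunks: T[] } {
    const start = Math.max(fromOffset, this.baseOffset);
    return {
      firstOffset: start,
      chunks: this.chunks.slice(start - this.baseOffset),
    };
  }
}
```

A reader that sees `firstOffset` greater than the offset it asked for knows it has missed chunks and can fall back to loading the full history from D1.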
Best Practices
1. Efficient Steps
Group related operations in single steps:
// Good: One step for LLM + response processing
await step.do('llm-step', async () => {
const response = await callLLM(...);
const parsed = processResponse(response);
await saveToD1(parsed);
return parsed;
});
// Avoid: Separate steps for each operation (more overhead)
2. Handle Rate Limits
Cloudflare has request limits. Implement backoff:
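// Resolve after the given number of milliseconds
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
async function withRetry<T>(fn: () => Promise<T>): Promise<T> {
for (let attempt = 0; attempt < 3; attempt++) {
try {
return await fn();
} catch (error) {
// Retry only rate-limit errors; rethrow everything else
if (error instanceof Error && error.message.includes('rate limit')) {
await sleep(Math.pow(2, attempt) * 1000); // waits 1s, 2s, 4s
continue;
}
throw error;
}
}
throw new Error('Max retries exceeded');
}
3. Optimize D1 Queries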
Use indexes and prepared statements:
// Good: Prepared statement with index
const state = await env.DB.prepare('SELECT * FROM agent_state WHERE run_id = ?')
.bind(runId)
.first();
// Avoid: String interpolation, full scans
4. Stream Efficiently
Buffer small chunks before sending:
const buffer: StreamChunk[] = [];
const BATCH_SIZE = 10;
async function flushBuffer() {
if (buffer.length > 0) {
await streamManager.writeBatch(streamId, buffer);
buffer.length = 0;
}
}
for await (const chunk of stream) {
buffer.push(chunk);
if (buffer.length >= BATCH_SIZE) {
await flushBuffer();
}
}
await flushBuffer();
Next Steps
- JavaScript Runtime - Simpler option for development
- Temporal Runtime - Alternative for long-running agents
- Storage: Cloudflare - D1 and Durable Objects details