Cloudflare Runtime

The Cloudflare runtime (@helix-agents/runtime-cloudflare) executes agents on Cloudflare's edge network using Workers and Workflows. This provides global low-latency execution with serverless scaling.

When to Use

Good fit:

  • Global edge deployment with low latency
  • Serverless architecture (no servers to manage)
  • Existing Cloudflare infrastructure (Workers, D1, DO)
  • Cost optimization with pay-per-request pricing
  • Applications already using Cloudflare

Not ideal for:

  • Long-running agents (> 15 minutes)
  • Workflows requiring complex orchestration
  • Heavy computation workloads
  • Development without a Cloudflare account

Prerequisites

  • Cloudflare account with Workers Paid plan
  • Wrangler CLI: npm install -g wrangler
  • D1 database for state storage
  • Durable Objects for streaming

Installation

bash
npm install @helix-agents/runtime-cloudflare @helix-agents/store-cloudflare

Architecture

┌──────────────────────────────────────────────────────────────┐
│                     Edge Location (Global)                    │
│  ┌────────────────────────────────────────────────────────┐  │
│  │ Cloudflare Worker                                       │  │
│  │   - HTTP endpoints                                      │  │
│  │   - CloudflareAgentExecutor                            │  │
│  │   - Starts Workflows                                    │  │
│  └────────────────────────────────────────────────────────┘  │
│                            │                                  │
│                            ▼                                  │
│  ┌────────────────────────────────────────────────────────┐  │
│  │ Cloudflare Workflow                                     │  │
│  │   - Agent execution steps                               │  │
│  │   - LLM calls                                           │  │
│  │   - Tool execution                                      │  │
│  └────────────────────────────────────────────────────────┘  │
│                            │                                  │
│              ┌─────────────┴─────────────┐                   │
│              ▼                           ▼                   │
│  ┌─────────────────────┐   ┌─────────────────────────────┐  │
│  │ D1 Database         │   │ Durable Object               │  │
│  │   - Agent state     │   │   - Stream events            │  │
│  │   - Messages        │   │   - Real-time streaming      │  │
│  └─────────────────────┘   └─────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘

Setup Guide

1. Configure wrangler.toml

toml
name = "agent-worker"
main = "src/index.ts"
compatibility_date = "2024-01-01"

# D1 Database for state
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

# Durable Object for streaming
[durable_objects]
bindings = [
  { name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]

# Workflow binding (exposed to the Worker as env.AGENT_WORKFLOW)
[[workflows]]
name = "AGENT_WORKFLOW"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"

2. Create D1 Migrations

sql
-- migrations/0001_init.sql
CREATE TABLE IF NOT EXISTS agent_state (
  run_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  status TEXT NOT NULL,
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,
  output TEXT,
  error TEXT,
  parent_agent_id TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  run_id TEXT NOT NULL,
  role TEXT NOT NULL,
  content TEXT,
  tool_calls TEXT,
  tool_call_id TEXT,
  tool_name TEXT,
  thinking TEXT,
  sequence INTEGER NOT NULL,
  created_at INTEGER NOT NULL,
  FOREIGN KEY (run_id) REFERENCES agent_state(run_id)
);

CREATE INDEX IF NOT EXISTS idx_messages_run_id ON messages(run_id);

Run migrations:

bash
wrangler d1 migrations apply agent-state
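
The schema stores structured values such as custom_state and output in TEXT columns and the abort flag as an INTEGER. The sketch below shows one way a row could be decoded into a typed object. It assumes the TEXT columns hold JSON, which the schema does not confirm, and the names AgentStateRow, DecodedAgentState, parseJsonColumn, and decodeAgentStateRow are hypothetical, not part of @helix-agents/store-cloudflare.

```typescript
// Hypothetical shape of one agent_state row as returned by D1.
interface AgentStateRow {
  run_id: string;
  agent_type: string;
  stream_id: string;
  status: string;
  step_count: number;
  custom_state: string | null; // assumed to hold JSON text
  output: string | null; // assumed to hold JSON text
  error: string | null;
  aborted: number; // SQLite has no boolean type: 0 or 1
}

interface DecodedAgentState {
  runId: string;
  agentType: string;
  streamId: string;
  status: string;
  stepCount: number;
  customState: unknown;
  output: unknown;
  error: string | null;
  aborted: boolean;
}

// Parse a nullable JSON text column; null stays null.
function parseJsonColumn(value: string | null): unknown {
  return value === null ? null : JSON.parse(value);
}

// Convert snake_case columns to a camelCase object and decode JSON and flags.
function decodeAgentStateRow(row: AgentStateRow): DecodedAgentState {
  return {
    runId: row.run_id,
    agentType: row.agent_type,
    streamId: row.stream_id,
    status: row.status,
    stepCount: row.step_count,
    customState: parseJsonColumn(row.custom_state),
    output: parseJsonColumn(row.output),
    error: row.error,
    aborted: row.aborted === 1,
  };
}
```

In practice D1StateStore presumably performs this mapping itself; the sketch only illustrates how the column types relate to the in-memory state.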

3. Create the Durable Object

typescript
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';

export class StreamManagerDO extends DurableObject {
  // In-memory buffer: chunks are lost if the Durable Object is evicted or restarted.
  private chunks: Map<string, StreamChunk[]> = new Map();

  async write(streamId: string, chunk: StreamChunk): Promise<void> {
    const chunks = this.chunks.get(streamId) ?? [];
    chunks.push(chunk);
    this.chunks.set(streamId, chunks);

    // Notify connected clients via WebSocket
    this.ctx.getWebSockets().forEach((ws) => {
      ws.send(JSON.stringify(chunk));
    });
  }

  async read(streamId: string, fromOffset: number): Promise<StreamChunk[]> {
    const chunks = this.chunks.get(streamId) ?? [];
    return chunks.slice(fromOffset);
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);
      return new Response(null, { status: 101, webSocket: client });
    }
    return new Response('Expected WebSocket', { status: 400 });
  }
}

4. Create the Workflow

typescript
// src/workflows/agent-workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { executeWorkflowStep } from '@helix-agents/runtime-cloudflare';
import { registry } from '../registry';
// initializeState and saveState are helpers that are not shown here.

export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
    const { runId, agentType, message, streamId } = event.payload;

    // Look up the agent in the shared registry (src/registry.ts);
    // a freshly constructed AgentRegistry would have no agents registered.
    const agent = registry.get(agentType);

    // Initialize or load state
    let state = await step.do('load-state', async () => {
      return this.env.DB.prepare('SELECT * FROM agent_state WHERE run_id = ?').bind(runId).first();
    });

    if (!state) {
      state = await step.do('init-state', async () => {
        // Initialize new agent state
        return initializeState(agent, runId, streamId, message);
      });
    }

    // Main execution loop
    while (state.status === 'running') {
      // Check for abort
      const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
        const row = await this.env.DB.prepare('SELECT aborted FROM agent_state WHERE run_id = ?')
          .bind(runId)
          .first();
        return row?.aborted === 1;
      });

      if (aborted) {
        state.status = 'failed';
        state.error = 'Aborted by user';
        break;
      }

      // Execute one step
      state = await step.do(`step-${state.step_count}`, async () => {
        return executeWorkflowStep(agent, state, this.env.DB, this.env.STREAM_MANAGER, this.env);
      });
    }

    // Save final state
    await step.do('save-final', async () => {
      await saveState(this.env.DB, state);
    });

    return {
      status: state.status,
      output: state.output,
      error: state.error,
    };
  }
}

5. Create the Worker Entry

typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';
import { AgentWorkflow } from './workflows/agent-workflow';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './registry';

export { AgentWorkflow, StreamManagerDO };

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Create executor
    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore: new D1StateStore(env.DB),
      streamManager: new DOStreamManager(env.STREAM_MANAGER),
    });

    // POST /agent/execute - Start new execution
    if (url.pathname === '/agent/execute' && request.method === 'POST') {
      const { agentType, message } = (await request.json()) as {
        agentType: string;
        message: string;
      };
      const agent = registry.get(agentType);

      const handle = await executor.execute(agent, message);

      return Response.json({
        runId: handle.runId,
        streamUrl: `/agent/stream/${handle.runId}`,
      });
    }

    // GET /agent/stream/:runId - SSE stream
    if (url.pathname.startsWith('/agent/stream/')) {
      const runId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), runId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const stream = await handle.stream();
      const encoder = new TextEncoder();
      return new Response(
        new ReadableStream({
          async start(controller) {
            for await (const chunk of stream) {
              // Response bodies must carry bytes, so encode each SSE frame
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /agent/result/:runId - Get result
    if (url.pathname.startsWith('/agent/result/')) {
      const runId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), runId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};

Agent Registry

Register agents so the workflow can instantiate them:

typescript
// src/registry.ts
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { ResearchAgent, AnalyzerAgent } from './agents';

export const registry = new AgentRegistry();
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);

Executor API

Creating the Executor

typescript
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';

const executor = new CloudflareAgentExecutor({
  workflowBinding: env.AGENT_WORKFLOW, // From wrangler.toml
  stateStore: d1StateStore,
  streamManager: doStreamManager,
});

Executing Agents

typescript
const handle = await executor.execute(MyAgent, 'Research quantum computing', {
  runId: 'custom-id', // Optional
});

Getting Handles

typescript
const handle = await executor.getHandle(MyAgent, runId);

if (handle) {
  const result = await handle.result();
  console.log(result);
}

Streaming

Server-Sent Events

typescript
// Worker endpoint
if (url.pathname.startsWith('/stream/')) {
  const stream = await handle.stream();

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
        }
        controller.close();
      },
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    }
  );
}
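
On the client side, a browser can consume this endpoint with EventSource. For clients that read the response body manually, the following is a minimal sketch of decoding the frames the endpoint emits. It assumes every frame has the exact shape `data: <json>` followed by a blank line, as in the code above; the function name parseSseBuffer is hypothetical.

```typescript
// Split an SSE text buffer into complete frames and decode their JSON payloads.
// Returns the decoded events plus any trailing partial frame, which the caller
// should prepend to the next piece of text read from the response body.
function parseSseBuffer(buffer: string): { events: unknown[]; rest: string } {
  const frames = buffer.split('\n\n');
  // The last element is '' when the buffer ends on a frame boundary,
  // otherwise it is an incomplete frame.
  const rest = frames.pop() ?? '';
  const events: unknown[] = [];
  for (const frame of frames) {
    for (const line of frame.split('\n')) {
      if (line.startsWith('data: ')) {
        events.push(JSON.parse(line.slice('data: '.length)));
      }
    }
  }
  return { events, rest };
}
```

Keeping the unparsed remainder matters because network reads can split a frame at any byte, so a frame is only decoded once its terminating blank line has arrived.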

WebSocket (via Durable Objects)

typescript
// Client connects to DO WebSocket
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${streamId}`);

ws.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  handleChunk(chunk);
};
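
The Worker entry shown earlier does not define a /ws/ route, so the client above needs one. The following is a rough sketch of how the Worker could forward the upgrade request to the Durable Object. It assumes one Durable Object instance per stream, addressed by stream ID; the names StreamStub, StreamNamespace, streamIdFromPath, and handleWebSocketRoute are hypothetical, and the two interfaces only mirror the parts of the Durable Object namespace API used here.

```typescript
// Minimal structural types for the parts of a Durable Object namespace used here.
interface StreamStub {
  fetch(request: Request): Promise<Response>;
}
interface StreamNamespace {
  idFromName(name: string): unknown;
  get(id: unknown): StreamStub;
}

// Extract the stream ID from a path like /ws/<streamId>; null if it does not match.
function streamIdFromPath(pathname: string): string | null {
  const match = /^\/ws\/([^/]+)$/.exec(pathname);
  return match ? match[1] : null;
}

// Forward a WebSocket upgrade request to the Durable Object for that stream.
async function handleWebSocketRoute(request: Request, ns: StreamNamespace): Promise<Response> {
  const streamId = streamIdFromPath(new URL(request.url).pathname);
  if (streamId === null) {
    return new Response('Not found', { status: 404 });
  }
  if (request.headers.get('Upgrade') !== 'websocket') {
    return new Response('Expected WebSocket', { status: 426 });
  }
  // Assumption: each stream gets its own Durable Object, looked up by name.
  const stub = ns.get(ns.idFromName(streamId));
  return stub.fetch(request);
}
```

Inside the Worker's fetch handler this would be called as `handleWebSocketRoute(request, env.STREAM_MANAGER)` for paths starting with /ws/.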

Sub-Agent Handling

Sub-agents execute as nested workflow calls:

typescript
// In workflow step
const subAgentResult = await step.do('sub-agent-call', async () => {
  // Start child workflow
  const instance = await this.env.AGENT_WORKFLOW.create({
    id: `agent__${subAgentType}__${subRunId}`,
    params: {
      agentType: subAgentType,
      runId: subRunId,
      streamId: parentStreamId, // Same stream
      message: inputMessage,
      parentAgentId: parentRunId,
    },
  });

  // Wait for completion
  return pollUntilComplete(instance);
});
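
The pollUntilComplete helper is not defined in the snippet above. One possible shape is sketched below, assuming the workflow instance exposes a status() method that reports a status string together with an output or an error. The PollableInstance interface, the status values checked, and the default interval and poll limit are assumptions for illustration, not the library's implementation.

```typescript
// Minimal structural type for the part of a workflow instance used here.
// The status strings are assumptions modeled on workflow instance states.
interface PollableInstance {
  status(): Promise<{ status: string; output?: unknown; error?: unknown }>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Poll the child workflow until it finishes, fails, or the poll budget runs out.
async function pollUntilComplete(
  instance: PollableInstance,
  intervalMs = 1000,
  maxPolls = 600,
): Promise<unknown> {
  for (let poll = 0; poll < maxPolls; poll++) {
    const current = await instance.status();
    if (current.status === 'complete') {
      return current.output;
    }
    if (current.status === 'errored' || current.status === 'terminated') {
      const detail = JSON.stringify(current.error ?? null);
      throw new Error(`Sub-agent workflow ${current.status}: ${detail}`);
    }
    await sleep(intervalMs);
  }
  throw new Error('Timed out waiting for sub-agent workflow');
}
```

Bounding the number of polls keeps a stuck child workflow from holding the parent step open indefinitely.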

Workflow Steps

Cloudflare Workflows use steps for durability:

typescript
// Each step is durable - if worker restarts, execution continues
state = await step.do('step-1', async () => {
  // LLM call
  return await callLLM(messages, tools);
});

state = await step.do('step-2', async () => {
  // Tool execution
  return await executeTools(toolCalls);
});

Key points:

  • Each step runs as a unit and is retried on failure, so step callbacks should be safe to run more than once
  • Values returned from completed steps are persisted
  • The Worker can restart between steps without data loss
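
To make the key points concrete, here is a toy model of step durability. It is not how Cloudflare implements Workflows; it only illustrates the idea that a completed step's result is stored under its name and reused when the workflow function runs again after a restart. ToyStepRunner and demoReplay are hypothetical names.

```typescript
// Toy model only: completed step results are cached by name, so a replay
// after a restart skips the callback and returns the stored value.
class ToyStepRunner {
  private readonly completed: Map<string, unknown>;

  constructor(completed: Map<string, unknown>) {
    this.completed = completed;
  }

  async do<T>(name: string, callback: () => Promise<T>): Promise<T> {
    if (this.completed.has(name)) {
      return this.completed.get(name) as T;
    }
    const result = await callback();
    this.completed.set(name, result);
    return result;
  }
}

// Run the same workflow function twice against the same persisted results
// and report how many times the step callback actually executed.
async function demoReplay(): Promise<number> {
  const persisted = new Map<string, unknown>();
  let llmCalls = 0;
  const workflow = (step: ToyStepRunner) =>
    step.do('step-1', async () => {
      llmCalls++;
      return 'response';
    });

  await workflow(new ToyStepRunner(persisted)); // first run executes the callback
  await workflow(new ToyStepRunner(persisted)); // simulated restart: result is replayed
  return llmCalls;
}
```

This is also why step names in the workflow loop include the step count: in this model, two different steps that shared a name would collide on the same stored result.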

Abort Handling

typescript
// Set abort flag in D1
await handle.abort('User cancelled');

// In the workflow, check the abort flag before each step.
// The step name includes the step count so each check is a distinct step.
const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
  const row = await this.env.DB.prepare('SELECT aborted FROM agent_state WHERE run_id = ?')
    .bind(runId)
    .first();
  return row?.aborted === 1;
});

if (aborted) {
  return { status: 'failed', error: 'Aborted' };
}
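
For orientation, the write side of the abort flag could look roughly like the sketch below. It assumes the agent_state schema from the setup guide; markAborted and the two interfaces are hypothetical and only mirror the prepare, bind, and run calls of the D1 API. The actual handle.abort() implementation may differ.

```typescript
// Minimal structural types for the D1 calls used here (prepare, bind, run).
interface PreparedStatementLike {
  bind(...values: unknown[]): PreparedStatementLike;
  run(): Promise<unknown>;
}
interface DatabaseLike {
  prepare(sql: string): PreparedStatementLike;
}

// Hypothetical helper: set the abort flag that the workflow checks before each step.
async function markAborted(
  db: DatabaseLike,
  runId: string,
  reason: string,
  now: number = Date.now(),
): Promise<void> {
  await db
    .prepare('UPDATE agent_state SET aborted = 1, abort_reason = ?, updated_at = ? WHERE run_id = ?')
    .bind(reason, now, runId)
    .run();
}
```

Because the workflow only reads the flag between steps, an abort takes effect after the step that is currently running has finished.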

Deployment

Development

bash
wrangler dev

Production

bash
wrangler deploy

Secrets

bash
wrangler secret put OPENAI_API_KEY

Access in code:

typescript
const apiKey = env.OPENAI_API_KEY;

D1 State Store

The Cloudflare runtime uses D1 for state:

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB);

// Save state
await stateStore.save(state);

// Load state
const state = await stateStore.load(runId);

// Append messages
await stateStore.appendMessages(runId, messages);

Limitations

Workflow Duration

Cloudflare Workflows have time limits:

  • Individual steps: 15 minutes
  • Total workflow: varies by plan

For longer agents, implement checkpointing: persist progress to D1 after each step so that a later run can resume from the last saved state.

Cold Starts

Workers can incur cold starts. Keep module-scope initialization minimal and defer expensive setup until first use.
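
One common way to keep startup work small is lazy initialization. The sketch below is a generic pattern, not something provided by @helix-agents; the lazy helper and the getAgentTable example are hypothetical.

```typescript
// Wrap an expensive factory so it runs once, on first use, not at module load.
function lazy<T>(factory: () => T): () => T {
  let initialized = false;
  let value: T | undefined;
  return () => {
    if (!initialized) {
      value = factory();
      initialized = true;
    }
    return value as T;
  };
}

// Hypothetical example: build a lookup table only when a request first needs it.
const getAgentTable = lazy(
  () => new Map<string, string>([['research', 'ResearchAgent']]),
);
```

Requests that never touch the table pay nothing for it, and later requests handled by the same isolate reuse the cached value.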

D1 Limitations

  • Writes go to a single primary region (read replicas serve reads only)
  • Write throughput limits
  • No full-text search

Durable Object Limits

  • Single instance per ID
  • Memory limits per instance
  • Geographic pinning

Best Practices

1. Efficient Steps

Group related operations in single steps:

typescript
// Good: One step for LLM + response processing
await step.do('llm-step', async () => {
  const response = await callLLM(...);
  const parsed = processResponse(response);
  await saveToD1(parsed);
  return parsed;
});

// Avoid: Separate steps for each operation (more overhead)

2. Handle Rate Limits

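Cloudflare enforces request limits. Retry rate-limited calls with exponential backoff:

typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Only retry rate-limit errors; rethrow anything else immediately
      const isRateLimit = error instanceof Error && error.message.includes('rate limit');
      // Do not sleep after the final attempt; surface the last error instead
      if (!isRateLimit || attempt === maxAttempts - 1) throw error;
      // Exponential backoff: 1s, 2s, 4s, ...
      await sleep(Math.pow(2, attempt) * 1000);
    }
  }
  throw new Error('Max retries exceeded');
}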

3. Optimize D1 Queries

Use indexes and prepared statements:

typescript
// Good: Prepared statement with index
const state = await env.DB.prepare('SELECT * FROM agent_state WHERE run_id = ?')
  .bind(runId)
  .first();

// Avoid: String interpolation, full scans

4. Stream Efficiently

Buffer small chunks before sending:

typescript
const buffer: StreamChunk[] = [];
const BATCH_SIZE = 10;

async function flushBuffer() {
  if (buffer.length > 0) {
    await streamManager.writeBatch(streamId, buffer);
    buffer.length = 0;
  }
}

for await (const chunk of stream) {
  buffer.push(chunk);
  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }
}
await flushBuffer();

Released under the MIT License.