Cloudflare Workflows Runtime

The Cloudflare Workflows runtime executes agents on Cloudflare Workflows for durable execution, with D1 for state storage and a separate Durable Object for stream management.

When to Use Workflows

Choose the Workflows runtime when you need step-level durability or automatic retries, or when you want to share D1 state with other services. If your agents stream heavily (more than about 100 chunks), consider the Durable Objects runtime instead.

Architecture

mermaid
graph TB
    subgraph Edge ["Edge Location (Global)"]
        subgraph Worker ["Cloudflare Worker"]
            W1["HTTP endpoints<br/>CloudflareAgentExecutor<br/>Starts Workflows"]
        end

        Worker --> Workflow

        subgraph Workflow ["Cloudflare Workflow"]
            WF1["Agent execution steps<br/>LLM calls<br/>Tool execution"]
        end

        Workflow --> D1
        Workflow --> DO

        D1["<b>D1 Database</b><br/>Agent state<br/>Messages"]
        DO["<b>Durable Object</b><br/>Stream events<br/>Real-time streaming"]
    end

Prerequisites

  • Cloudflare account with Workers Paid plan
  • Wrangler CLI: npm install -g wrangler
  • D1 database for state storage
  • Durable Objects for streaming

Installation

bash
npm install @helix-agents/runtime-cloudflare @helix-agents/store-cloudflare

Setup Guide

1. Configure wrangler.toml

toml
name = "agent-worker"
main = "src/index.ts"
compatibility_date = "2024-01-01"

# D1 Database for state
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

# Durable Object for streaming
[durable_objects]
bindings = [
  { name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]

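# Workflow binding
# "name" is the workflow's own name (placeholder here); "binding" is what the Worker sees as env.AGENT_WORKFLOW
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"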

2. Use Programmatic Migrations

The D1StateStore uses programmatic migrations. Call runMigration() on startup to apply them:

typescript
import { runMigration } from '@helix-agents/store-cloudflare';

// Run migrations on startup (safe to call every request - no-op if already migrated)
await runMigration(env.DB);

The framework creates tables automatically with session-centric naming (all prefixed with __agents_):

sql
-- Core state table
CREATE TABLE __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,
  output TEXT,
  error TEXT,
  parent_session_id TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

-- Messages table (separated for O(1) append)
CREATE TABLE __agents_messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  session_id TEXT NOT NULL,
  sequence INTEGER NOT NULL,
  message TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  UNIQUE(session_id, sequence)
);
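
Because messages live in their own table, a session's history can be read back in append order through the sequence column. The following is a minimal sketch, not the framework's own loader: `loadMessages` is a hypothetical helper, `D1Like` is a local stand-in for the subset of the D1 API it uses, and it assumes the message column holds one JSON-encoded message per row.

```typescript
// Minimal structural types mirroring the subset of the D1 API used here.
interface D1StatementLike {
  bind(...values: unknown[]): D1StatementLike;
  all<T>(): Promise<{ results: T[] }>;
}
interface D1Like {
  prepare(sql: string): D1StatementLike;
}

// Hypothetical helper: load a session's messages in append order.
// Assumes the `message` column stores one JSON-encoded message per row.
async function loadMessages(db: D1Like, sessionId: string): Promise<unknown[]> {
  const { results } = await db
    .prepare('SELECT message FROM __agents_messages WHERE session_id = ? ORDER BY sequence ASC')
    .bind(sessionId)
    .all<{ message: string }>();
  return results.map((row) => JSON.parse(row.message));
}
```

The UNIQUE(session_id, sequence) constraint already gives SQLite an index that serves this query, so no extra index should be needed for it.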

3. Create the Durable Object

typescript
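// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
// StreamChunk is the stream chunk type; import it from wherever your project defines it.

export class StreamManagerDO extends DurableObject {
  // In-memory only: these chunks are lost if the object is evicted from memory.
  private chunks: Map<string, StreamChunk[]> = new Map();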

  async write(streamId: string, chunk: StreamChunk): Promise<void> {
    const chunks = this.chunks.get(streamId) ?? [];
    chunks.push(chunk);
    this.chunks.set(streamId, chunks);

    // Notify connected clients via WebSocket
    this.ctx.getWebSockets().forEach((ws) => {
      ws.send(JSON.stringify(chunk));
    });
  }

  async read(streamId: string, fromOffset: number): Promise<StreamChunk[]> {
    const chunks = this.chunks.get(streamId) ?? [];
    return chunks.slice(fromOffset);
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);
      return new Response(null, { status: 101, webSocket: client });
    }
    return new Response('Expected WebSocket', { status: 400 });
  }
}

4. Create the Workflow

typescript
// src/workflows/agent-workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { executeWorkflowStep } from '@helix-agents/runtime-cloudflare';
import { registry } from '../registry';
// initializeState and saveState are application-level helpers not shown here.

export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
    const { sessionId, agentType, message, streamId } = event.payload;

    // Look up the agent in the shared registry (see Agent Registry);
    // a freshly constructed AgentRegistry would have no agents registered
    const agent = registry.get(agentType);

    // Initialize or load state
    let state = await step.do('load-state', async () => {
      return this.env.DB.prepare('SELECT * FROM __agents_states WHERE session_id = ?').bind(sessionId).first();
    });

    if (!state) {
      state = await step.do('init-state', async () => {
        // Initialize new agent state
        return initializeState(agent, sessionId, streamId, message);
      });
    }

    // Main execution loop
    while (state.status === 'running') {
      // Check for abort
      const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
        const row = await this.env.DB.prepare('SELECT aborted FROM __agents_states WHERE session_id = ?')
          .bind(sessionId)
          .first();
        return row?.aborted === 1;
      });

      if (aborted) {
        state.status = 'failed';
        state.error = 'Aborted by user';
        break;
      }

      // Execute one step
      state = await step.do(`step-${state.step_count}`, async () => {
        return executeWorkflowStep(agent, state, this.env.DB, this.env.STREAM_MANAGER, this.env);
      });
    }

    // Save final state
    await step.do('save-final', async () => {
      await saveState(this.env.DB, state);
    });

    return {
      status: state.status,
      output: state.output,
      error: state.error,
    };
  }
}

5. Create the Worker Entry

typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';
import { AgentWorkflow } from './workflows/agent-workflow';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './registry'; // shared registry, see Agent Registry

export { AgentWorkflow, StreamManagerDO };

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Create executor
    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore: new D1StateStore(env.DB),
      streamManager: new DOStreamManager(env.STREAM_MANAGER),
    });

    // POST /agent/execute - Start new execution
    if (url.pathname === '/agent/execute' && request.method === 'POST') {
      const { agentType, message, sessionId } = await request.json();
      const agent = registry.get(agentType);

      const handle = await executor.execute(agent, { message }, { sessionId });

      return Response.json({
        sessionId: handle.sessionId,
        streamUrl: `/agent/stream/${handle.sessionId}`,
      });
    }

    // GET /agent/stream/:sessionId - SSE stream
    if (url.pathname.startsWith('/agent/stream/')) {
      const sessionId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const stream = await handle.stream();
      const encoder = new TextEncoder();
      return new Response(
        new ReadableStream({
          async start(controller) {
            for await (const chunk of stream) {
              // Response bodies must be bytes, so encode each SSE frame
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /agent/result/:sessionId - Get result
    if (url.pathname.startsWith('/agent/result/')) {
      const sessionId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};

Agent Registry

Register agents so the workflow can instantiate them:

typescript
// src/registry.ts
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { ResearchAgent, AnalyzerAgent } from './agents';

export const registry = new AgentRegistry();
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);

Executor API

Creating the Executor

typescript
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';

const executor = new CloudflareAgentExecutor({
  workflowBinding: env.AGENT_WORKFLOW, // From wrangler.toml
  stateStore: d1StateStore,
  streamManager: doStreamManager,
});

Executing Agents

typescript
const handle = await executor.execute(MyAgent, { message: 'Research quantum computing' }, {
  sessionId: 'custom-session-id', // Optional
});

Getting Handles

typescript
const handle = await executor.getHandle(MyAgent, sessionId);

if (handle) {
  const result = await handle.result();
  console.log(result);
}

Multi-Turn Conversations

The Workflows runtime supports the same multi-turn conversation API as the JS runtime using the session-centric model:

Using sessionId

typescript
// First message - creates a new session
const handle1 = await executor.execute(agent, 'Hello, my name is Alice', {
  sessionId: 'session-123',
});
await handle1.result();

// Continue the conversation - same sessionId
const handle2 = await executor.execute(agent, 'What is my name?', {
  sessionId: 'session-123',
});

Using handle.send()

typescript
const handle1 = await executor.execute(agent, 'Hello', {
  sessionId: 'session-123',
});
await handle1.result();

const handle2 = await handle1.send('Tell me more');

Using Direct Messages

typescript
const handle = await executor.execute(agent, {
  message: 'Continue from here',
  messages: myExternalMessageHistory,
});

Behavior Table

Input                           | Messages Source          | State Source
--------------------------------|--------------------------|----------------------
message only (new session)      | Empty (fresh)            | Empty (fresh)
message + sessionId (existing)  | From session             | From session
message + messages              | From messages            | Empty (fresh)
message + state                 | Empty (fresh)            | From state
message + sessionId + messages  | From messages (override) | From session
message + sessionId + state     | From session             | From state (override)
All four                        | From messages (override) | From state (override)

See JS Runtime - Multi-Turn Conversations for detailed documentation.

Streaming

Server-Sent Events

typescript
// Worker endpoint
if (url.pathname.startsWith('/stream/')) {
  const stream = await handle.stream();

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
        }
        controller.close();
      },
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    }
  );
}

WebSocket (via Durable Objects)

typescript
// Client connects to DO WebSocket
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${streamId}`);

ws.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  handleChunk(chunk);
};
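
For the client above to connect, the Worker has to forward the upgrade request to the Durable Object. The sketch below shows one way to do that. It assumes one Durable Object instance per stream, addressed by name, which may differ from how DOStreamManager addresses objects. `routeWebSocket`, `StubLike`, and `NamespaceLike` are hypothetical names standing in for the real binding types.

```typescript
// Structural stand-ins for the Durable Object namespace binding.
interface StubLike {
  fetch(request: Request): Promise<Response>;
}
interface NamespaceLike<Id> {
  idFromName(name: string): Id;
  get(id: Id): StubLike;
}

// Hypothetical route handler: forwards /ws/:streamId upgrade requests to the
// Durable Object named after the stream. Returns null for any other path.
async function routeWebSocket<Id>(
  request: Request,
  streamManager: NamespaceLike<Id>
): Promise<Response | null> {
  const match = new URL(request.url).pathname.match(/^\/ws\/([^/]+)$/);
  const streamId = match?.[1];
  if (!streamId) return null;

  if (request.headers.get('Upgrade') !== 'websocket') {
    return new Response('Expected WebSocket', { status: 426 });
  }

  // Assumption: one Durable Object per stream, keyed by the stream ID.
  const id = streamManager.idFromName(decodeURIComponent(streamId));
  return streamManager.get(id).fetch(request);
}
```

In the Worker's fetch handler this would be called as `routeWebSocket(request, env.STREAM_MANAGER)`, falling through to the other routes when it returns null.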

Sub-Agent Handling

Sub-agents execute as nested workflow calls:

typescript
// In workflow step
const subAgentResult = await step.do('sub-agent-call', async () => {
  // Start child workflow
  const instance = await this.env.AGENT_WORKFLOW.create({
    id: `agent__${subAgentType}__${subSessionId}`,
    params: {
      agentType: subAgentType,
      sessionId: subSessionId,
      streamId: parentStreamId, // Same stream
      message: inputMessage,
      parentSessionId: parentSessionId,
    },
  });

  // Wait for completion
  return pollUntilComplete(instance);
});
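
The snippet above relies on a pollUntilComplete helper that is not shown. One possible shape is sketched here, assuming the child instance exposes a `status()` method that reports a status string plus optional output and error. `InstanceLike`, the option names, and the default values are illustrative assumptions.

```typescript
// Subset of the workflow instance status shape used by this sketch.
type InstanceStatusLike = {
  status: string; // e.g. 'queued' | 'running' | 'complete' | 'errored' | 'terminated'
  output?: unknown;
  error?: unknown;
};
interface InstanceLike {
  status(): Promise<InstanceStatusLike>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical pollUntilComplete: polls instance.status() at a fixed interval
// until the child reaches a terminal state or the attempt budget runs out.
async function pollUntilComplete(
  instance: InstanceLike,
  { intervalMs = 1000, maxAttempts = 300 } = {}
): Promise<unknown> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await instance.status();
    if (current.status === 'complete') return current.output;
    if (current.status === 'errored' || current.status === 'terminated') {
      throw new Error(`Sub-agent workflow ${current.status}: ${JSON.stringify(current.error ?? null)}`);
    }
    await sleep(intervalMs);
  }
  throw new Error('Sub-agent workflow did not complete in time');
}
```

A fixed interval keeps the sketch simple. A real implementation would likely also bound the total wait to fit within the enclosing step's time limit.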

Workflow Steps

Cloudflare Workflows use steps for durability:

typescript
// Each step is durable - if worker restarts, execution continues
state = await step.do('step-1', async () => {
  // LLM call
  return await callLLM(messages, tools);
});

state = await step.do('step-2', async () => {
  // Tool execution
  return await executeTools(toolCalls);
});

Key points:

  • Steps are atomic and retried on failure
  • State between steps is persisted
  • Worker can restart between steps without data loss
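
To make the key points concrete, here is a toy model of step durability. It is not Cloudflare's implementation: `ToyStepRunner` is a hypothetical class that records each step's result under its name, so re-running the same workflow function after a simulated restart reuses the recorded result instead of calling the step body again.

```typescript
// Toy model of durable steps (NOT Cloudflare's implementation): results are
// recorded by step name, so a replay after a restart skips completed steps.
class ToyStepRunner {
  private readonly completed: Map<string, unknown>;

  constructor(completed: Map<string, unknown> = new Map()) {
    this.completed = completed;
  }

  async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.completed.has(name)) {
      return this.completed.get(name) as T; // replay: reuse the recorded result
    }
    const result = await fn();
    this.completed.set(name, result); // record the result before moving on
    return result;
  }
}

// Runs the same workflow twice against shared "persisted" results and
// reports how many times the step body actually executed.
async function runTwice(): Promise<number> {
  const persisted = new Map<string, unknown>();
  let llmCalls = 0;
  const workflow = (step: ToyStepRunner) =>
    step.do('step-1', async () => {
      llmCalls++;
      return 'draft';
    });

  await workflow(new ToyStepRunner(persisted)); // first run executes the step
  await workflow(new ToyStepRunner(persisted)); // simulated restart replays it
  return llmCalls;
}
```

In this model a step name is the key for its recorded result, which is one reason the main workflow loop derives names such as `step-${state.step_count}` rather than reusing a single name.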

Abort Handling

typescript
// Set abort flag in D1
await handle.abort('User cancelled');

// In the workflow, check the abort flag before each step.
// Use a distinct step name per iteration, as in the full workflow above.
const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
  const row = await this.env.DB.prepare('SELECT aborted FROM __agents_states WHERE session_id = ?')
    .bind(sessionId)
    .first();
  return row?.aborted === 1;
});

if (aborted) {
  return { status: 'failed', error: 'Aborted' };
}
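
For reference, the write side of the abort flag could look like the sketch below. This is an assumption about what an abort request needs to persist, based on the __agents_states schema, and not the actual body of handle.abort(). `markAborted` and the `D1RunLike` types are hypothetical, and timestamps are assumed to be epoch milliseconds.

```typescript
// Minimal structural types for the D1 calls used in this sketch.
interface D1RunStatementLike {
  bind(...values: unknown[]): D1RunStatementLike;
  run(): Promise<unknown>;
}
interface D1RunLike {
  prepare(sql: string): D1RunStatementLike;
}

// Hypothetical helper: persist the abort flag and reason for a session.
// Assumes updated_at is stored as epoch milliseconds.
async function markAborted(
  db: D1RunLike,
  sessionId: string,
  reason: string,
  now: number = Date.now()
): Promise<void> {
  await db
    .prepare('UPDATE __agents_states SET aborted = 1, abort_reason = ?, updated_at = ? WHERE session_id = ?')
    .bind(reason, now, sessionId)
    .run();
}
```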

Interrupt Handling

Unlike abort (which is a hard stop), interrupt is a soft stop that saves state for later resumption:

typescript
// Interrupt the agent (soft stop)
await handle.interrupt('user_requested');

// Agent status becomes 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'

// Later, resume execution
const { canResume } = await handle.canResume();
if (canResume) {
  const newHandle = await handle.resume();
  const result = await newHandle.result();
}

How It Works

The Cloudflare runtime uses a dual approach for responsive interrupts:

  1. Interrupt flag - Set in D1 via stateStore.setInterruptFlag() for persistence
  2. Interrupt event - Sent via instance.sendEvent() for immediate wake-up

The workflow checks for interrupts at two points:

  • At each step boundary - Before starting a new LLM call
  • During sub-agent waits - Using Promise.race against interrupt events
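
The second check point can be sketched as a race between the sub-agent's result and an interrupt signal. This is an illustrative pattern only. `waitWithInterrupt` and `WaitOutcome` are hypothetical names, and the interrupt promise stands in for however the runtime surfaces the interrupt event.

```typescript
type WaitOutcome<T> =
  | { kind: 'completed'; value: T }
  | { kind: 'interrupted'; reason: string };

// Races sub-agent work against an interrupt signal; whichever settles first wins.
async function waitWithInterrupt<T>(
  work: Promise<T>,
  interrupt: Promise<string>
): Promise<WaitOutcome<T>> {
  return Promise.race([
    work.then((value): WaitOutcome<T> => ({ kind: 'completed', value })),
    interrupt.then((reason): WaitOutcome<T> => ({ kind: 'interrupted', reason })),
  ]);
}
```

Promise.race does not cancel the losing promise, so when the interrupt wins the sub-agent keeps running until it is told to stop. That is why the persisted interrupt flag is still needed alongside the event.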

Sub-Agent Interrupt Propagation

When an agent has running sub-agents, interrupts propagate through the entire hierarchy:

User calls handle.interrupt()


Parent receives interrupt (immediate via event)

    ├──► Child 1: interrupt flag set
    ├──► Child 2: interrupt flag set
    └──► Child 3: interrupt flag set


Each child stops at next safe point


Parent returns { status: 'interrupted' }

Target latency: < 200ms from interrupt request to stopped execution.
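
The propagation shown in the diagram can be sketched as a recursive walk over the session hierarchy. This is a simplified model. `SessionNode` and `propagateInterrupt` are hypothetical, and the `setFlag` callback stands in for a store call such as stateStore.setInterruptFlag(), whose exact signature is not shown in this guide.

```typescript
interface SessionNode {
  sessionId: string;
  children: SessionNode[];
}

// Hypothetical propagation: flag the session itself, then every descendant.
// Returns the IDs that were flagged, parent first.
async function propagateInterrupt(
  node: SessionNode,
  setFlag: (sessionId: string) => Promise<void>
): Promise<string[]> {
  await setFlag(node.sessionId);
  const nested = await Promise.all(
    node.children.map((child) => propagateInterrupt(child, setFlag))
  );
  return [node.sessionId, ...nested.flat()];
}
```

Siblings are flagged concurrently here, which keeps the total time close to the depth of the hierarchy rather than the number of sub-agents.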

See Interrupt and Resume for complete documentation including resume modes and error handling.

Stream Resumption

The Workflows runtime supports stream resumption for handling client disconnections during agent execution. This uses the same FrontendHandler pattern as other runtimes.

Setting Up Stream Resumption

typescript
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    const handler = createFrontendHandler({
      executor,          // CloudflareAgentExecutor
      stateStore: new D1StateStore(env.DB),
      streamManager: new DOStreamManager(env.STREAM_MANAGER),
      agent: myAgent,
    });

    // Handle stream resumption
    if (url.pathname.startsWith('/api/chat/')) {
      const sessionId = url.pathname.split('/').pop()!;
      // Fall back to 0 when the header is missing or not a number
      const fromSequence = Number.parseInt(request.headers.get('Last-Event-ID') ?? '0', 10) || 0;

      return handler.handleStream(sessionId, { fromSequence });
    }

    return new Response('Not found', { status: 404 });
  }
};
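
The Last-Event-ID header only carries a useful value if the server tags each SSE event with an id. The helpers below sketch both halves of that contract. They are hypothetical and assume the event id is the chunk's sequence number. Whether the frontend handler already emits ids this way is not stated here.

```typescript
// Formats one SSE frame. The `id:` field is what a browser EventSource echoes
// back in the Last-Event-ID header when it reconnects.
function formatSseEvent(sequence: number, chunk: unknown): string {
  return `id: ${sequence}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

// Parses Last-Event-ID defensively: missing or malformed values fall back to 0.
function parseLastEventId(header: string | null): number {
  const parsed = Number.parseInt(header ?? '', 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : 0;
}
```

JSON.stringify escapes newlines inside strings, so each chunk stays on a single data line.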

Frontend Integration

Use HelixChatTransport for automatic resume handling:

typescript
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import { useChat } from '@ai-sdk/react';

function Chat({ sessionId, snapshot }) {
  const { messages } = useChat({
    id: sessionId,
    initialMessages: snapshot?.messages ?? [],
    resume: snapshot?.status === 'active',
    transport: new HelixChatTransport({
      api: `/api/chat/${sessionId}`,
      resumeFromSequence: snapshot?.streamSequence,
    }),
  });
}

Limitations

Subrequest Limit Impact

Stream resumption in the Workflows runtime is subject to the Workers limit of 1,000 subrequests per invocation. Each stream chunk write to the Durable Object counts as a subrequest. For streaming-heavy agents that require frequent reconnections, consider the Durable Objects runtime, which is not bound by this limit.

Key considerations:

  • Chunk storage - Stream chunks are stored in the Durable Object via DOStreamManager
  • Sequence tracking - Each chunk has a sequence number for resumption
  • TTL management - Configure appropriate chunk retention based on session duration

Deployment

Development

bash
wrangler dev

Production

bash
wrangler deploy

Secrets

bash
wrangler secret put OPENAI_API_KEY

Access in code:

typescript
const apiKey = env.OPENAI_API_KEY;

D1 State Store

The Workflows runtime uses D1 for state:

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB); // same constructor form as in the Worker entry

// Save state
await stateStore.saveState(sessionId, state);

// Load state
const state = await stateStore.loadState(sessionId);

// Append messages
await stateStore.appendMessages(sessionId, messages);

Limitations

Subrequest Limit

Important: Cloudflare Workers have a limit of 1,000 subrequests per invocation. Each stream chunk write to the Durable Object counts as a subrequest. For streaming-heavy agents, consider the Durable Objects runtime, which bypasses this limit.
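
A rough budget check helps decide whether an agent fits under the limit. The sketch below assumes each write to the Durable Object costs one subrequest, and that a batched write also counts as one, which is an assumption worth verifying for your setup. The function names are hypothetical.

```typescript
// Rough estimate of Durable Object writes needed for a stream, assuming one
// subrequest per write and up to `batchSize` chunks per write.
function estimateStreamWrites(chunkCount: number, batchSize = 1): number {
  if (batchSize < 1) throw new RangeError('batchSize must be >= 1');
  return Math.ceil(chunkCount / batchSize);
}

// Checks the estimate, plus any other subrequests (D1 queries, LLM calls),
// against the per-invocation limit.
function fitsSubrequestBudget(
  chunkCount: number,
  batchSize: number,
  otherSubrequests: number,
  limit = 1000
): boolean {
  return estimateStreamWrites(chunkCount, batchSize) + otherSubrequests <= limit;
}
```

For example, 2,500 chunks written one at a time would need 2,500 writes, while batches of 10 bring that down to 250.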

Workflow Duration

Cloudflare Workflows have time limits:

  • Individual steps: 15 minutes
  • Total workflow: varies by plan

For longer agents, implement checkpointing.

Cold Starts

Edge workers may have cold starts. Minimize initialization code.

D1 Limitations

  • Single primary region for writes (read replicas are populated by replication)
  • Write throughput limits
  • No full-text search

Durable Object Limits

  • Single instance per ID
  • Memory limits per instance
  • Geographic pinning

Best Practices

1. Efficient Steps

Group related operations in single steps:

typescript
// Good: One step for LLM + response processing
await step.do('llm-step', async () => {
  const response = await callLLM(...);
  const parsed = processResponse(response);
  await saveToD1(parsed);
  return parsed;
});

// Avoid: Separate steps for each operation (more overhead)

2. Handle Rate Limits

Cloudflare has request limits. Implement backoff:

typescript
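const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(fn: () => Promise<T>): Promise<T> {
  for (let attempt = 0; attempt < 3; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // `error` is `unknown` in strict TypeScript, so narrow before reading .message
      const isRateLimit = error instanceof Error && error.message.includes('rate limit');
      if (!isRateLimit) throw error;
      // Exponential backoff: 1s, 2s, 4s
      await sleep(2 ** attempt * 1000);
    }
  }
  throw new Error('Max retries exceeded');
}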

3. Optimize D1 Queries

Use indexes and prepared statements:

typescript
// Good: Prepared statement with index
const state = await env.DB.prepare('SELECT * FROM __agents_states WHERE session_id = ?')
  .bind(sessionId)
  .first();

// Avoid: String interpolation, full scans

4. Stream Efficiently

Buffer small chunks before sending:

typescript
const buffer: StreamChunk[] = [];
const BATCH_SIZE = 10;

async function flushBuffer() {
  if (buffer.length > 0) {
    await streamManager.writeBatch(streamId, buffer);
    buffer.length = 0;
  }
}

for await (const chunk of stream) {
  buffer.push(chunk);
  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }
}
await flushBuffer();

Released under the MIT License.