Research Assistant (Cloudflare Runtime)

This example demonstrates a research assistant deployed on Cloudflare's edge infrastructure. It shows:

  • Cloudflare Workflows for durable execution
  • D1 database for state persistence
  • Durable Objects for real-time streaming
  • HTTP API endpoints
  • Edge deployment

Source Code

The full example is in examples/research-assistant-cloudflare/.

Prerequisites

  • Node.js 18+
  • Cloudflare account (free tier works)
  • Wrangler CLI (npm install -g wrangler)
  • OpenAI API key

Project Structure

examples/research-assistant-cloudflare/
├── src/
│   ├── worker.ts         # Worker entry point + workflow
│   ├── agent.ts          # Agent definition
│   ├── types.ts          # State, output, and Env types
│   ├── tools/
│   │   ├── web-search.ts
│   │   ├── take-notes.ts
│   │   └── summarize.ts
│   └── agents/
│       └── summarizer.ts # Sub-agent
├── wrangler.toml         # Cloudflare configuration
├── migrations/           # D1 schema migrations
└── package.json

Running the Example

1. Set Up Wrangler

bash
cd examples/research-assistant-cloudflare
npm install

# Log in to Cloudflare
npx wrangler login

2. Create D1 Database

bash
# Create the database
npx wrangler d1 create helix-agents-db

# Note the database_id in the output; you'll add it to wrangler.toml next

Update wrangler.toml with the database ID:

toml
[[d1_databases]]
binding = "AGENT_DB"
database_name = "helix-agents-db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"  # Your ID here

3. Run Migrations

bash
# Create the tables
npx wrangler d1 execute helix-agents-db --file=./migrations/0001_init.sql

4. Set API Key

bash
# Create .dev.vars for local development
echo "OPENAI_API_KEY=sk-xxx" > .dev.vars

# For production, use secrets
npx wrangler secret put OPENAI_API_KEY

5. Start Development Server

bash
npm run dev
# Server starts at http://localhost:8787

6. Test the API

bash
# Start research
curl -X POST http://localhost:8787/agent/run \
  -H "Content-Type: application/json" \
  -d '{"message": "Research TypeScript benefits"}'

# Response: { "runId": "xxx", "message": "Research started" }

# Check status
curl http://localhost:8787/agent/{runId}/status

# Stream events (SSE)
curl http://localhost:8787/agent/{runId}/stream

# Get final result
curl http://localhost:8787/agent/{runId}/result
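
The SSE stream can also be consumed from a browser via EventSource. A minimal sketch (assumes a runId returned by POST /agent/run; the [DONE] sentinel matches the worker code shown below):

typescript
// Browser-side SSE consumer (sketch; runId comes from the POST /agent/run response)
const runId = 'xxx';
const source = new EventSource(`http://localhost:8787/agent/${runId}/stream`);

source.onmessage = (event) => {
  if (event.data === '[DONE]') {
    // The worker emits [DONE] once the run's stream is finished
    source.close();
    return;
  }
  const chunk = JSON.parse(event.data);
  console.log('chunk:', chunk);
};

source.onerror = () => source.close();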

Key Components

Environment Bindings

Define Cloudflare bindings in types.ts:

typescript
// src/types.ts
import type { AgentWorkflowBinding } from '@helix-agents/runtime-cloudflare';

export interface Env {
  /** D1 database for state persistence */
  AGENT_DB: D1Database;

  /** Durable Object namespace for streaming */
  STREAMS: DurableObjectNamespace;

  /** Workflow binding for agent execution */
  AGENT_WORKFLOW: AgentWorkflowBinding;

  /** OpenAI API key */
  OPENAI_API_KEY: string;
}

Wrangler Configuration

Configure bindings in wrangler.toml:

toml
name = "research-assistant-cloudflare"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]

# D1 Database for state persistence
[[d1_databases]]
binding = "AGENT_DB"
database_name = "helix-agents-db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Durable Object for real-time streaming
[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamDurableObject"

# Cloudflare Workflow for durable execution
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"

# Durable Object migrations
[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamDurableObject"]

Worker Entry Point

The worker handles HTTP requests and defines the workflow:

typescript
// src/worker.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import {
  runAgentWorkflow,
  AgentRegistry,
  CloudflareAgentExecutor,
  type AgentWorkflowInput,
} from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore, StreamDurableObject } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { ResearchAssistantAgent } from './agent.js';
import type { Env } from './types.js';

// Re-export Durable Object (required by Cloudflare)
export { StreamDurableObject };

// Agent registry
const registry = new AgentRegistry();
registry.register(ResearchAssistantAgent);

// Cloudflare Workflow class
export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(
    event: WorkflowEvent<AgentWorkflowInput>,
    step: WorkflowStep
  ): Promise<Awaited<ReturnType<typeof runAgentWorkflow>>> {
    const { stateStore, streamManager } = createCloudflareStore({
      db: this.env.AGENT_DB,
      streams: this.env.STREAMS,
    });
    const llmAdapter = new VercelAIAdapter();

    return runAgentWorkflow(event, step, {
      stateStore,
      streamManager,
      llmAdapter,
      registry,
      workflowBinding: this.env.AGENT_WORKFLOW,
    });
  }
}

// HTTP handler
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const url = new URL(request.url);

    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore,
      streamManager,
    });

    // POST /agent/run - Start new research
    if (url.pathname === '/agent/run' && request.method === 'POST') {
      const body = await request.json<{ message: string; state?: unknown }>();
      const handle = await executor.execute(ResearchAssistantAgent, {
        message: body.message,
        state: body.state,
      });
      return Response.json({ runId: handle.runId });
    }

    // GET /agent/:runId/status
    const statusMatch = url.pathname.match(/^\/agent\/([^/]+)\/status$/);
    if (statusMatch && request.method === 'GET') {
      const runId = statusMatch[1];
      const state = await stateStore.load(runId);
      if (!state) {
        return Response.json({ error: 'Run not found' }, { status: 404 });
      }
      return Response.json(state);
    }

    // GET /agent/:runId/stream - SSE streaming
    const streamMatch = url.pathname.match(/^\/agent\/([^/]+)\/stream$/);
    if (streamMatch && request.method === 'GET') {
      const runId = streamMatch[1];
      const state = await stateStore.load(runId);
      if (!state) {
        return Response.json({ error: 'Run not found' }, { status: 404 });
      }
      const reader = await streamManager.createReader(state.streamId);

      const { readable, writable } = new TransformStream();
      const writer = writable.getWriter();
      const encoder = new TextEncoder();

      ctx.waitUntil(
        (async () => {
          for await (const chunk of reader) {
            await writer.write(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
          }
          await writer.write(encoder.encode('data: [DONE]\n\n'));
          await writer.close();
        })()
      );

      return new Response(readable, {
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
        },
      });
    }

    // GET /agent/:runId/result
    const resultMatch = url.pathname.match(/^\/agent\/([^/]+)\/result$/);
    if (resultMatch && request.method === 'GET') {
      const runId = resultMatch[1];
      const handle = await executor.getHandle(ResearchAssistantAgent, runId);
      const result = await handle.result();
      return Response.json(result);
    }

    return Response.json({ error: 'Not found' }, { status: 404 });
  },
};

Key points:

  • StreamDurableObject must be re-exported for Cloudflare
  • createCloudflareStore creates D1 state store and DO stream manager
  • AgentWorkflow extends WorkflowEntrypoint for durable execution
  • ctx.waitUntil keeps the stream pump running after the response has been returned

Database Schema

Create the D1 schema:

sql
-- migrations/0001_init.sql

-- Agent state table
CREATE TABLE IF NOT EXISTS agent_state (
  run_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'running',
  step_count INTEGER NOT NULL DEFAULT 0,
  custom_state TEXT NOT NULL DEFAULT '{}',
  messages TEXT NOT NULL DEFAULT '[]',
  output TEXT,
  error TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_agent_state_status ON agent_state(status);
CREATE INDEX IF NOT EXISTS idx_agent_state_agent_type ON agent_state(agent_type);
CREATE INDEX IF NOT EXISTS idx_agent_state_created_at ON agent_state(created_at);
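
Run metadata can also be queried directly through the D1 binding, for example to list recent runs for an admin endpoint. A sketch against the schema above (the query itself is illustrative, not part of the example):

typescript
// List the ten most recent running agents via the D1 binding (sketch)
const { results } = await env.AGENT_DB.prepare(
  `SELECT run_id, status, step_count, updated_at
     FROM agent_state
    WHERE status = ?
    ORDER BY created_at DESC
    LIMIT 10`
)
  .bind('running')
  .all();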

Deployment

Deploy to Cloudflare

bash
# Deploy
npm run deploy

# View logs
npx wrangler tail

Environment-Specific Configuration

toml
# Production configuration
[env.production]
vars = { LOG_LEVEL = "warn" }

[[env.production.d1_databases]]
binding = "AGENT_DB"
database_name = "helix-agents-db-prod"
database_id = "your-production-database-id"

Deploy to production:

bash
npx wrangler deploy --env production

CORS Configuration

For frontend integration:

typescript
const CORS_HEADERS = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
  'Access-Control-Allow-Headers': 'Content-Type, Authorization',
  'Access-Control-Max-Age': '86400',
};

function withCors(response: Response): Response {
  const newHeaders = new Headers(response.headers);
  for (const [key, value] of Object.entries(CORS_HEADERS)) {
    newHeaders.set(key, value);
  }
  return new Response(response.body, {
    status: response.status,
    headers: newHeaders,
  });
}

// Handle preflight (inside fetch(), before routing)
if (request.method === 'OPTIONS') {
  return new Response(null, { status: 204, headers: CORS_HEADERS });
}
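
Each route's response can then be wrapped before returning, e.g. return withCors(Response.json({ runId: handle.runId })).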

Sub-Agents

Sub-agents are invoked as separate workflow instances:

typescript
// In agent.ts
import { createSubAgentTool, defineAgent } from '@helix-agents/core';
import { SummarizerAgent } from './agents/summarizer.js';
import { z } from 'zod';

// Create a sub-agent tool from an existing agent definition
// The SummarizerAgent must have an outputSchema defined
const summarizeTool = createSubAgentTool(
  SummarizerAgent,
  z.object({
    texts: z.array(z.string()),
  }),
  { description: 'Summarize collected information' }
);

export const ResearchAssistantWithSubAgent = defineAgent({
  name: 'research-assistant-with-subagent',
  tools: [webSearchTool, takeNotesTool, summarizeTool],
  // ...
});

The runAgentWorkflow function automatically handles sub-agent execution using the workflow binding.

Production Considerations

Rate Limiting

Implement rate limiting using Cloudflare's built-in features or custom logic:

typescript
// Check rate limit (example)
const ip = request.headers.get('CF-Connecting-IP');
const rateLimitKey = `rate:${ip}`;
// Use KV or D1 to track request counts
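
A minimal fixed-window limiter sketch using Workers KV (RATE_LIMIT_KV is a hypothetical KV namespace binding, not part of this example's Env; KV writes are eventually consistent, so this is best-effort only):

typescript
// Fixed-window rate limiter (sketch; RATE_LIMIT_KV is a hypothetical binding)
async function checkRateLimit(request: Request, kv: KVNamespace): Promise<boolean> {
  const ip = request.headers.get('CF-Connecting-IP') ?? 'unknown';
  // One counter per IP per minute
  const windowKey = `rate:${ip}:${Math.floor(Date.now() / 60_000)}`;

  const count = parseInt((await kv.get(windowKey)) ?? '0', 10);
  if (count >= 60) return false; // over the per-minute budget

  // Read-modify-write is not atomic; acceptable for best-effort limiting
  await kv.put(windowKey, String(count + 1), { expirationTtl: 120 });
  return true;
}

// In fetch(), before routing:
// if (!(await checkRateLimit(request, env.RATE_LIMIT_KV))) {
//   return Response.json({ error: 'Too many requests' }, { status: 429 });
// }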

Authentication

Add authentication before processing requests:

typescript
const authHeader = request.headers.get('Authorization');
if (!authHeader?.startsWith('Bearer ')) {
  return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
// Validate token...
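
One simple scheme is comparing the bearer token against a Worker secret. A sketch (API_TOKEN is a hypothetical secret set with npx wrangler secret put API_TOKEN; timingSafeEqual is a Workers-specific SubtleCrypto extension):

typescript
// Constant-time bearer-token check (sketch; API_TOKEN is hypothetical)
function isValidToken(authHeader: string, expected: string): boolean {
  const token = authHeader.slice('Bearer '.length);
  const enc = new TextEncoder();
  const a = enc.encode(token);
  const b = enc.encode(expected);
  // timingSafeEqual requires equal-length buffers
  if (a.byteLength !== b.byteLength) return false;
  return crypto.subtle.timingSafeEqual(a, b);
}

// if (!isValidToken(authHeader, env.API_TOKEN)) {
//   return Response.json({ error: 'Unauthorized' }, { status: 401 });
// }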

Monitoring

Use Cloudflare's analytics and logging:

typescript
console.log(
  JSON.stringify({
    level: 'info',
    runId,
    agentType: agent.name,
    action: 'start',
    timestamp: new Date().toISOString(),
  })
);

Caching

Cache expensive operations using Cloudflare Cache API:

typescript
const cache = caches.default;
const cacheKey = new Request(`https://cache/${runId}/result`);

// Check cache
const cached = await cache.match(cacheKey);
if (cached) {
  return cached;
}

// Compute and cache
const result = await computeResult();
const response = Response.json(result);
ctx.waitUntil(cache.put(cacheKey, response.clone()));
return response;

Limitations

  • D1 row size: Maximum 1MB per row (state + messages)
  • Worker CPU time: 10 ms per invocation on the free plan, 30 seconds (configurable up to 5 minutes) on paid plans
  • Workflow duration: Up to 24 hours per workflow
  • Durable Object: 128 concurrent connections per DO
