Research Assistant (Cloudflare Runtime)
This example demonstrates a research assistant deployed on Cloudflare's edge infrastructure. It shows:
- Cloudflare Workflows for durable execution
- D1 database for state persistence
- Durable Objects for real-time streaming
- HTTP API endpoints
- Edge deployment
Source Code
The full example is in examples/research-assistant-cloudflare/.
Prerequisites
- Node.js 18+
- Cloudflare account (free tier works)
- Wrangler CLI (npm install -g wrangler)
- OpenAI API key
Project Structure
examples/research-assistant-cloudflare/
├── src/
│ ├── worker.ts # Worker entry point + workflow
│ ├── agent.ts # Agent definition
│ ├── types.ts # State, output, and Env types
│ ├── tools/
│ │ ├── web-search.ts
│ │ ├── take-notes.ts
│ │ └── summarize.ts
│ └── agents/
│ └── summarizer.ts # Sub-agent
├── wrangler.toml # Cloudflare configuration
├── migrations/ # D1 schema migrations
└── package.json
Running the Example
1. Setup Wrangler
cd examples/research-assistant-cloudflare
npm install
# Login to Cloudflare
npx wrangler login
2. Create D1 Database
# Create the database
npx wrangler d1 create helix-agents-db
# Note the database_id returned, update wrangler.toml
Update wrangler.toml with the database ID:
[[d1_databases]]
binding = "AGENT_DB"
database_name = "helix-agents-db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" # Your ID here3. Run Migrations
# Create the tables
npx wrangler d1 execute helix-agents-db --file=./migrations/0001_init.sql
4. Set API Key
# Create .dev.vars for local development
echo "OPENAI_API_KEY=sk-xxx" > .dev.vars
# For production, use secrets
npx wrangler secret put OPENAI_API_KEY
5. Start Development Server
npm run dev
# Server starts at http://localhost:8787
6. Test the API
# Start research
curl -X POST http://localhost:8787/agent/run \
-H "Content-Type: application/json" \
-d '{"message": "Research TypeScript benefits"}'
# Response: { "runId": "xxx", "message": "Research started" }
# Check status
curl http://localhost:8787/agent/{runId}/status
# Stream events (SSE)
curl http://localhost:8787/agent/{runId}/stream
# Get final result
curl http://localhost:8787/agent/{runId}/result
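From a browser or TypeScript client, the same endpoints can be driven without curl. The sketch below is illustrative only; it assumes the request and response shapes shown above, and the helper names are hypothetical:
// client.ts (hypothetical) - start a run, then consume the SSE stream
async function startResearch(baseUrl: string, message: string): Promise<string> {
  const res = await fetch(`${baseUrl}/agent/run`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  const { runId } = (await res.json()) as { runId: string };
  return runId;
}
function streamResearch(baseUrl: string, runId: string): void {
  // EventSource handles the SSE framing; each message carries one JSON chunk
  const source = new EventSource(`${baseUrl}/agent/${runId}/stream`);
  source.onmessage = (event) => {
    if (event.data === '[DONE]') {
      source.close();
      return;
    }
    console.log('chunk:', JSON.parse(event.data));
  };
}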
Key Components
Environment Bindings
Define Cloudflare bindings in types.ts:
// src/types.ts
import type { AgentWorkflowBinding } from '@helix-agents/runtime-cloudflare';
export interface Env {
/** D1 database for state persistence */
AGENT_DB: D1Database;
/** Durable Object namespace for streaming */
STREAMS: DurableObjectNamespace;
/** Workflow binding for agent execution */
AGENT_WORKFLOW: AgentWorkflowBinding;
/** OpenAI API key */
OPENAI_API_KEY: string;
}
Wrangler Configuration
Configure bindings in wrangler.toml:
name = "research-assistant-cloudflare"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]
# D1 Database for state persistence
[[d1_databases]]
binding = "AGENT_DB"
database_name = "helix-agents-db"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# Durable Object for real-time streaming
[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamDurableObject"
# Cloudflare Workflow for durable execution
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"
# Durable Object migrations
[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamDurableObject"]
Worker Entry Point
The worker handles HTTP requests and defines the workflow:
// src/worker.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import {
runAgentWorkflow,
AgentRegistry,
CloudflareAgentExecutor,
type AgentWorkflowInput, // workflow input payload type used below (assumed to be exported here)
} from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore, StreamDurableObject } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { ResearchAssistantAgent } from './agent.js';
import type { Env } from './types.js';
// Re-export Durable Object (required by Cloudflare)
export { StreamDurableObject };
// Agent registry
const registry = new AgentRegistry();
registry.register(ResearchAssistantAgent);
// Cloudflare Workflow class
export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
async run(
event: WorkflowEvent<AgentWorkflowInput>,
step: WorkflowStep
): Promise<Awaited<ReturnType<typeof runAgentWorkflow>>> {
const { stateStore, streamManager } = createCloudflareStore({
db: this.env.AGENT_DB,
streams: this.env.STREAMS,
});
const llmAdapter = new VercelAIAdapter();
return runAgentWorkflow(event, step, {
stateStore,
streamManager,
llmAdapter,
registry,
workflowBinding: this.env.AGENT_WORKFLOW,
});
}
}
// HTTP handler
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const url = new URL(request.url);
const { stateStore, streamManager } = createCloudflareStore({
db: env.AGENT_DB,
streams: env.STREAMS,
});
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore,
streamManager,
});
// POST /agent/run - Start new research
if (url.pathname === '/agent/run' && request.method === 'POST') {
const body = (await request.json()) as { message: string; state?: unknown };
const handle = await executor.execute(ResearchAssistantAgent, {
message: body.message,
state: body.state,
});
return Response.json({ runId: handle.runId });
}
// GET /agent/:runId/status
const statusMatch = url.pathname.match(/^\/agent\/([^/]+)\/status$/);
if (statusMatch && request.method === 'GET') {
const runId = statusMatch[1];
const state = await stateStore.load(runId);
return Response.json(state);
}
// GET /agent/:runId/stream - SSE streaming
const streamMatch = url.pathname.match(/^\/agent\/([^/]+)\/stream$/);
if (streamMatch && request.method === 'GET') {
const runId = streamMatch[1];
const state = await stateStore.load(runId);
const reader = await streamManager.createReader(state.streamId);
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const encoder = new TextEncoder();
ctx.waitUntil(
(async () => {
for await (const chunk of reader) {
await writer.write(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
}
await writer.write(encoder.encode('data: [DONE]\n\n'));
await writer.close();
})()
);
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
});
}
// GET /agent/:runId/result
const resultMatch = url.pathname.match(/^\/agent\/([^/]+)\/result$/);
if (resultMatch && request.method === 'GET') {
const runId = resultMatch[1];
const handle = await executor.getHandle(ResearchAssistantAgent, runId);
const result = await handle.result();
return Response.json(result);
}
return Response.json({ error: 'Not found' }, { status: 404 });
},
};
Key points:
- StreamDurableObject must be re-exported for Cloudflare
- createCloudflareStore creates the D1 state store and the Durable Object stream manager
- AgentWorkflow extends WorkflowEntrypoint for durable execution
- ctx.waitUntil ensures streaming completes even after the response starts
Database Schema
Create the D1 schema:
-- migrations/0001_init.sql
-- Agent state table
CREATE TABLE IF NOT EXISTS agent_state (
run_id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'running',
step_count INTEGER NOT NULL DEFAULT 0,
custom_state TEXT NOT NULL DEFAULT '{}',
messages TEXT NOT NULL DEFAULT '[]',
output TEXT,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_agent_state_status ON agent_state(status);
CREATE INDEX IF NOT EXISTS idx_agent_state_agent_type ON agent_state(agent_type);
CREATE INDEX IF NOT EXISTS idx_agent_state_created_at ON agent_state(created_at);
Deployment
Deploy to Cloudflare
# Deploy
npm run deploy
# View logs
npx wrangler tail
Environment-Specific Configuration
# Production configuration
[env.production]
vars = { LOG_LEVEL = "warn" }
[env.production.d1_databases]
binding = "AGENT_DB"
database_name = "helix-agents-db-prod"
database_id = "your-production-database-id"Deploy to production:
Deploy to production:
npx wrangler deploy --env production
CORS Configuration
For frontend integration:
const CORS_HEADERS = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
'Access-Control-Max-Age': '86400',
};
function withCors(response: Response): Response {
const newHeaders = new Headers(response.headers);
for (const [key, value] of Object.entries(CORS_HEADERS)) {
newHeaders.set(key, value);
}
return new Response(response.body, {
status: response.status,
headers: newHeaders,
});
}
// Handle preflight
if (request.method === 'OPTIONS') {
return new Response(null, { status: 204, headers: CORS_HEADERS });
}
Sub-Agents
Sub-agents are invoked as separate workflow instances:
// In agent.ts
import { createSubAgentTool, defineAgent } from '@helix-agents/core';
import { SummarizerAgent } from './agents/summarizer.js';
import { z } from 'zod';
// Create a sub-agent tool from an existing agent definition
// The SummarizerAgent must have an outputSchema defined
const summarizeTool = createSubAgentTool(
SummarizerAgent,
z.object({
texts: z.array(z.string()),
}),
{ description: 'Summarize collected information' }
);
export const ResearchAssistantWithSubAgent = defineAgent({
name: 'research-assistant-with-subagent',
tools: [webSearchTool, takeNotesTool, summarizeTool],
// ...
});
The runAgentWorkflow function automatically handles sub-agent execution using the workflow binding.
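The sub-agent itself is an ordinary agent definition whose outputSchema becomes the tool result. A rough sketch of what agents/summarizer.ts could look like (the instructions field and the schema shape are assumptions, not copied from the example):
// src/agents/summarizer.ts - illustrative sketch; the real example may differ
import { defineAgent } from '@helix-agents/core';
import { z } from 'zod';

export const SummarizerAgent = defineAgent({
  name: 'summarizer',
  // Assumed option: the prompt wording here is illustrative
  instructions: 'Condense the provided texts into a short, factual summary.',
  // outputSchema is required so the parent agent receives a typed result
  outputSchema: z.object({
    summary: z.string(),
    keyPoints: z.array(z.string()),
  }),
  tools: [],
});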
Production Considerations
Rate Limiting
Implement rate limiting using Cloudflare's built-in features or custom logic:
// Check rate limit (example)
const ip = request.headers.get('CF-Connecting-IP');
const rateLimitKey = `rate:${ip}`;
// Use KV or D1 to track request counts
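A rough sketch of a D1-backed counter, assuming a request_counts table you create yourself (the table, column, and helper names below are illustrative):
// Minimal per-IP rate limit sketch backed by D1.
// Assumes a table such as:
//   CREATE TABLE request_counts (key TEXT PRIMARY KEY, count INTEGER, window_start INTEGER);
async function isRateLimited(db: D1Database, ip: string, limit = 30): Promise<boolean> {
  const windowStart = Math.floor(Date.now() / 60_000); // 1-minute window
  const key = `rate:${ip}:${windowStart}`;
  const row = await db
    .prepare('SELECT count FROM request_counts WHERE key = ?')
    .bind(key)
    .first<{ count: number }>();
  if ((row?.count ?? 0) >= limit) return true;
  await db
    .prepare(
      'INSERT INTO request_counts (key, count, window_start) VALUES (?, 1, ?) ' +
        'ON CONFLICT(key) DO UPDATE SET count = count + 1'
    )
    .bind(key, windowStart)
    .run();
  return false;
}
Call it at the top of the fetch handler and return a 429 before invoking the executor when it reports a hit.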
Authentication
Add authentication before processing requests:
const authHeader = request.headers.get('Authorization');
if (!authHeader?.startsWith('Bearer ')) {
return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
// Validate token...
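One way to finish the check is to compare the token against a Worker secret. The API_TOKEN binding below is an assumption (it is not in the example's Env interface); set it with npx wrangler secret put API_TOKEN:
// Assumes an API_TOKEN secret added to Env; prefer a constant-time comparison in production
const token = authHeader.slice('Bearer '.length).trim();
if (token !== env.API_TOKEN) {
  return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
// Authenticated; continue to routing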
Monitoring
Use Cloudflare's analytics and logging:
console.log(
JSON.stringify({
level: 'info',
runId,
agentType: agent.name,
action: 'start',
timestamp: new Date().toISOString(),
})
);
Caching
Cache expensive operations using Cloudflare Cache API:
const cache = caches.default;
const cacheKey = new Request(`https://cache/${runId}/result`);
// Check cache
const cached = await cache.match(cacheKey);
if (cached) {
return cached;
}
// Compute and cache
const result = await computeResult();
const response = Response.json(result);
ctx.waitUntil(cache.put(cacheKey, response.clone()));
return response;
Limitations
- D1 row size: Maximum 1MB per row (state + messages); see the size-guard sketch after this list
- Worker CPU time: 30 seconds for free tier, 30 minutes for paid
- Workflow duration: Up to 24 hours per workflow
- Durable Object: 128 concurrent connections per DO
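Because of the D1 row limit, it can pay to guard the serialized state before it is persisted. A rough sketch (the threshold and the trimming strategy are assumptions, not part of the example):
// Rough guard against the 1 MB D1 row limit: drop the oldest messages
// until the serialized payload fits under a conservative threshold.
const MAX_STATE_BYTES = 900_000; // leave headroom below the 1 MB limit

function trimMessages<T>(messages: T[], customState: unknown): T[] {
  const size = (m: T[]) =>
    new TextEncoder().encode(JSON.stringify({ messages: m, customState })).length;
  let trimmed = messages;
  while (trimmed.length > 1 && size(trimmed) > MAX_STATE_BYTES) {
    trimmed = trimmed.slice(1); // drop the oldest message first
  }
  return trimmed;
}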
Next Steps
- JS Runtime Example - Simpler setup for development
- Temporal Example - Alternative durable runtime
- Cloudflare Runtime Reference - Full API documentation