Temporal Runtime
The Temporal runtime (@helix-agents/runtime-temporal) executes agents as durable Temporal workflows. This provides crash recovery, automatic retries, and production-grade reliability for long-running agent tasks.
When to Use
Good fit:
- Production workloads requiring reliability
- Long-running agents (hours or days)
- Agents that must survive process restarts
- Complex multi-agent orchestrations
- Operations requiring audit trails and observability
Not ideal for:
- Quick development iteration (infrastructure overhead)
- Simple, short-lived agents
- Cost-sensitive deployments without existing Temporal infrastructure
Prerequisites
You need a running Temporal server:
Option 1: Temporal Cloud (Recommended for production)
# Sign up at https://temporal.io/cloud
Option 2: Local development
# Using Docker
docker run -d --name temporal \
-p 7233:7233 -p 8233:8233 \
temporalio/auto-setup:latest
# Or using Temporal CLI
temporal server start-dev
Installation
npm install @helix-agents/runtime-temporal @helix-agents/store-redis @temporalio/client @temporalio/worker
Architecture
graph TB
subgraph App ["Your Application"]
Executor["<b>TemporalAgentExecutor</b><br/>Starts workflows · Returns handles<br/>Reads from StateStore/StreamManager"]
end
subgraph Server ["Temporal Server"]
ServerDesc["Persists workflow state<br/>Manages task queues<br/>Handles retries and timeouts"]
end
subgraph Worker ["Temporal Worker"]
subgraph Workflow ["Agent Workflow"]
WF["Orchestrates execution<br/>Calls activities for LLM/tools"]
end
subgraph Activities ["Activities"]
Act["LLM calls · Tool execution<br/>State persistence"]
end
end
App --> Server --> Worker
Setup Guide
1. Create the Workflow
Define a workflow that wraps the agent execution:
// src/workflows/agent-workflow.ts
import { proxyActivities, defineSignal, setHandler } from '@temporalio/workflow';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
import type * as activities from '../activities';
// Proxy activities with timeouts
const { executeAgentStep, saveState, loadState } = proxyActivities<typeof activities>({
startToCloseTimeout: '5 minutes',
retry: {
maximumAttempts: 3,
backoffCoefficient: 2,
},
});
// Abort signal
export const abortSignal = defineSignal('abort');
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
let aborted = false;
setHandler(abortSignal, () => {
aborted = true;
});
try {
// Load or initialize state
let state = await loadState(input.sessionId);
if (!state) {
state = await initializeState(input);
}
// Main execution loop
while (state.status === 'running' && !aborted) {
const stepResult = await executeAgentStep(input.agentType, state);
state = await processStepResult(state, stepResult);
await saveState(state);
}
if (aborted) {
return { status: 'failed', error: 'Workflow aborted' };
}
return {
status: state.status === 'completed' ? 'completed' : 'failed',
output: state.output,
error: state.error,
};
} catch (error) {
return {
status: 'failed',
error: error instanceof Error ? error.message : String(error),
};
}
}
2. Create Activities
Activities perform the actual work (LLM calls, tool execution):
// src/activities/agent-activities.ts
import { AgentRegistry, type AgentState } from '@helix-agents/runtime-temporal';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
const stateStore = new RedisStateStore(redisClient);
const streamManager = new RedisStreamManager(redisClient);
const llmAdapter = new VercelAIAdapter();
const registry = new AgentRegistry();
// Register your agents
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);
export async function loadState(sessionId: string): Promise<AgentState<unknown, unknown> | null> {
return stateStore.load(sessionId);
}
export async function saveState(state: AgentState<unknown, unknown>): Promise<void> {
await stateStore.save(state);
}
export async function executeAgentStep(
agentType: string,
state: AgentState<unknown, unknown>
): Promise<StepResult<unknown>> {
const agent = registry.get(agentType);
if (!agent) {
throw new Error(`Unknown agent type: ${agentType}`);
}
// Execute one LLM step
return executeStep(agent, state, llmAdapter, streamManager);
}
3. Create the Worker
The worker processes workflows and activities:
// src/worker.ts
import { Worker, NativeConnection } from '@temporalio/worker';
import * as activities from './activities';
async function runWorker() {
const connection = await NativeConnection.connect({
address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
});
const worker = await Worker.create({
connection,
namespace: 'default',
taskQueue: 'agent-tasks',
workflowsPath: require.resolve('./workflows'),
activities,
});
await worker.run();
}
runWorker().catch(console.error);
4. Create the Executor
The executor starts workflows and returns handles:
// src/executor.ts
import { Client, Connection } from '@temporalio/client';
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
async function createExecutor() {
const connection = await Connection.connect({
address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
});
const client = new Client({ connection });
// Wrap Temporal client to match interface
const temporalClientAdapter = {
startWorkflow: async (name, options) => {
const handle = await client.workflow.start(name, {
workflowId: options.workflowId,
taskQueue: options.taskQueue,
args: options.args,
});
return wrapHandle(handle);
},
getHandle: (workflowId) => {
return wrapHandle(client.workflow.getHandle(workflowId));
},
};
return new TemporalAgentExecutor({
client: temporalClientAdapter,
stateStore: new RedisStateStore(redis),
streamManager: new RedisStreamManager(redis),
workflowName: 'agentWorkflow',
taskQueue: 'agent-tasks',
});
}
Using the Executor
Once set up, usage is identical to other runtimes:
const executor = await createExecutor();
// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research quantum computing');
// Stream events
const stream = await handle.stream();
for await (const chunk of stream) {
console.log(chunk);
}
// Get result
const result = await handle.result();
Multi-Turn Conversations
The Temporal runtime supports the same multi-turn conversation API as the JS runtime using the session-centric model:
Using sessionId
// First message - creates a new session
const handle1 = await executor.execute(agent, 'Hello, my name is Alice', {
sessionId: 'session-123',
});
await handle1.result();
// Continue the conversation - same sessionId
const handle2 = await executor.execute(agent, 'What is my name?', {
sessionId: 'session-123',
});
Using handle.send()
const handle1 = await executor.execute(agent, 'Hello', {
sessionId: 'session-123',
});
await handle1.result();
const handle2 = await handle1.send('Tell me more');
Using Direct Messages
const handle = await executor.execute(agent, {
message: 'Continue from here',
messages: myExternalMessageHistory,
});
Behavior Table
| Input | Messages Source | State Source |
|---|---|---|
| message only (new session) | Empty (fresh) | Empty (fresh) |
| message + sessionId (existing) | From session | From session |
| message + messages | From messages | Empty (fresh) |
| message + state | Empty (fresh) | From state |
| message + sessionId + messages | From messages (override) | From session |
| message + sessionId + state | From session | From state (override) |
| All four | From messages (override) | From state (override) |
See JS Runtime - Multi-Turn Conversations for detailed documentation.
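The precedence rules in the table can be read as a small resolution function. The sketch below is illustrative only: `resolveSources`, `ExecuteInput`, and `StoredSession` are hypothetical names, not part of the Helix API, and it assumes that explicitly passed messages or state always win over what the session holds.

```typescript
// Hypothetical types for illustration; not the library's actual API.
interface StoredSession<S> {
  messages: string[];
  state: S;
}

interface ExecuteInput<S> {
  message: string;
  messages?: string[]; // explicit history, overrides the session's messages
  state?: S;           // explicit state, overrides the session's state
}

type Source = 'fresh' | 'session' | 'override';

// Decides where messages and state come from, following the behavior table.
function resolveSources<S>(
  input: ExecuteInput<S>,
  session: StoredSession<S> | undefined,
  freshState: S
): { messages: string[]; state: S; messagesFrom: Source; stateFrom: Source } {
  const messagesFrom: Source =
    input.messages !== undefined ? 'override' : session ? 'session' : 'fresh';
  const stateFrom: Source =
    input.state !== undefined ? 'override' : session ? 'session' : 'fresh';
  return {
    messages: input.messages ?? session?.messages ?? [],
    state: input.state ?? session?.state ?? freshState,
    messagesFrom,
    stateFrom,
  };
}
```

Messages and state are resolved independently, which is why passing messages alongside a sessionId replaces the history but still loads state from the session.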
Execute vs Resume Semantics
The Temporal runtime provides two methods for running agents with different semantics:
| Method | Use Case | Stream Behavior | Session Behavior |
|---|---|---|---|
| execute() | Start new conversation or add new message | Creates new stream segment | Creates or continues session |
| resume() | Continue interrupted session | Continues existing stream | Must have interrupted state |
execute() is for normal agent interactions:
- Starts a new Temporal workflow (or continues an existing session)
- Creates a fresh stream segment for the response
- Appends the new user message to the session
// New conversation
const handle = await executor.execute(agent, 'Hello', { sessionId: 'session-1' });
// Follow-up in same session (new run, new stream segment)
const handle2 = await executor.execute(agent, 'Tell me more', { sessionId: 'session-1' });
resume() is specifically for recovering from interrupts:
- Continues the existing workflow from where it stopped
- Maintains stream continuity for clients reconnecting
- Session must be in interrupted or paused status
// After an interrupt...
const state = await stateStore.loadState('session-1');
if (state?.status === 'interrupted') {
// Resume continues the exact same stream
const handle = await executor.resume(agent, 'session-1');
}
Agent Registry
Register agents so the worker can instantiate them:
import { AgentRegistry } from '@helix-agents/runtime-temporal';
const registry = new AgentRegistry();
// Register each agent type
registry.register(ResearchAgent); // name: 'researcher'
registry.register(AnalyzerAgent); // name: 'analyzer'
registry.register(SummarizerAgent); // name: 'summarizer'
// In activities, look up by type
export async function executeAgentStep(agentType: string, state) {
const agent = registry.get(agentType); // Returns the agent config
// ...
}
AgentRegistry with Factories
The AgentRegistry supports both static registration and factory functions for dynamic agent creation. Factories are useful when agents need runtime dependencies like API keys, database connections, or per-request configuration.
Static vs Factory Registration
import { AgentRegistry, AgentNotFoundError } from '@helix-agents/runtime-temporal';
const registry = new AgentRegistry();
// Static registration - for agents with no runtime dependencies
registry.register(ResearchAgent);
registry.register(SummarizerAgent);
// Factory registration - for agents needing runtime dependencies
interface Env {
TAVILY_API_KEY: string;
DATABASE: Database;
}
registry.registerFactory<Env>('dynamic-researcher', (ctx) => {
return createResearchAgent({
apiKey: ctx.env.TAVILY_API_KEY,
database: ctx.env.DATABASE,
userId: ctx.userId,
sessionId: ctx.sessionId,
});
});
AgentFactoryContext
Factory functions receive context about the current execution:
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings (e.g., API keys, connections) */
env: TEnv;
/** Current session identifier */
sessionId: string;
/** Current run identifier */
runId: string;
/** Optional user identifier */
userId?: string;
}
Using resolve() in Activities
The resolve() method handles both static and factory agents. Use it instead of get() when you have factories:
// src/activities.ts
export async function executeAgentStep(
agentType: string,
state: AgentState<unknown, unknown>,
context: { sessionId: string; runId: string; userId?: string }
): Promise<StepResult<unknown>> {
// resolve() handles both static and factory agents
const agent = registry.resolve(agentType, {
env, // Your environment object
sessionId: context.sessionId,
runId: context.runId,
userId: context.userId,
});
return executeStep(agent, state, llmAdapter, streamManager);
}
Error Handling with AgentNotFoundError
When an agent type is not found, AgentNotFoundError provides helpful information:
import { AgentNotFoundError } from '@helix-agents/runtime-temporal';
try {
const agent = registry.resolve('unknown-agent', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.error(`Unknown agent type: ${error.agentType}`);
console.error(`Available types: ${error.availableTypes.join(', ')}`);
}
}
Registry Methods
| Method | Description |
|---|---|
| register(config) | Register a static agent configuration |
| registerFactory(type, factory) | Register a factory function |
| resolve(type, context) | Resolve agent (factory-first, then static) |
| get(type) | Get static agent only (legacy) |
| has(type) | Check if agent type is registered |
| unregister(type) | Remove an agent registration |
| getRegisteredTypes() | Get all registered agent type names |
| clear() | Remove all registrations (for testing) |
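To make the "factory-first, then static" lookup order concrete, here is a minimal self-contained sketch. `MiniRegistry`, `NotFoundError`, and the simplified `AgentConfig` shape are hypothetical stand-ins written for this example; the real `AgentRegistry` may be implemented differently.

```typescript
// Hypothetical minimal registry for illustration; not the library's implementation.
interface AgentConfig {
  name: string;
}

interface FactoryContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  userId?: string;
}

type Factory<TEnv> = (ctx: FactoryContext<TEnv>) => AgentConfig;

class NotFoundError extends Error {
  agentType: string;
  availableTypes: string[];

  constructor(agentType: string, availableTypes: string[]) {
    super(`Unknown agent type: ${agentType}`);
    this.agentType = agentType;
    this.availableTypes = availableTypes;
  }
}

class MiniRegistry<TEnv> {
  private statics = new Map<string, AgentConfig>();
  private factories = new Map<string, Factory<TEnv>>();

  register(config: AgentConfig): void {
    this.statics.set(config.name, config);
  }

  registerFactory(type: string, factory: Factory<TEnv>): void {
    this.factories.set(type, factory);
  }

  // A factory registered under the same type takes precedence over a static config.
  resolve(type: string, ctx: FactoryContext<TEnv>): AgentConfig {
    const factory = this.factories.get(type);
    if (factory) return factory(ctx);
    const config = this.statics.get(type);
    if (config) return config;
    throw new NotFoundError(type, this.getRegisteredTypes());
  }

  getRegisteredTypes(): string[] {
    const types = new Set<string>();
    this.statics.forEach((_config, key) => types.add(key));
    this.factories.forEach((_factory, key) => types.add(key));
    return Array.from(types);
  }
}
```

Checking factories first lets a per-request factory shadow a static default registered under the same name.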
Sub-Agent Handling
Sub-agents execute as child workflows:
// In workflow
import { executeChild } from '@temporalio/workflow';
// When parent needs to execute sub-agent
const subAgentResult = await executeChild('agentWorkflow', {
args: [
{
agentType: 'analyzer',
sessionId: `${parentSessionId}-sub-${callId}`,
streamId: parentStreamId, // Same stream for unified streaming
message: inputMessage,
parentSessionId,
},
],
workflowId: `agent__analyzer__${subSessionId}`,
taskQueue: 'agent-tasks',
});
Benefits of child workflows:
- Independent retry policies
- Separate timeouts
- Can be cancelled independently
- Full workflow history preserved
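The child workflow snippet above derives a sub-session ID and a workflow ID from the parent session. A small helper can keep the two consistent. This is a sketch only: `childWorkflowIds` is a hypothetical name, and the ID formats simply mirror the strings in the snippet rather than a documented contract.

```typescript
// Hypothetical helper; ID formats copy the example above and are assumptions.
function childWorkflowIds(
  parentSessionId: string,
  agentType: string,
  callId: string
): { subSessionId: string; workflowId: string } {
  // One sub-session per tool call keeps child state separate from the parent's.
  const subSessionId = `${parentSessionId}-sub-${callId}`;
  return {
    subSessionId,
    workflowId: `agent__${agentType}__${subSessionId}`,
  };
}
```

Deriving both values in one place avoids passing a sessionId in args that does not match the one embedded in workflowId.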
Activity Configuration
Configure timeouts and retries per activity:
const { executeAgentStep } = proxyActivities<typeof activities>({
// How long the activity can run
startToCloseTimeout: '10 minutes',
// How long to wait for worker to start processing
scheduleToStartTimeout: '1 minute',
// Heartbeat timeout for long activities
heartbeatTimeout: '30 seconds',
// Retry configuration
retry: {
initialInterval: '1 second',
backoffCoefficient: 2,
maximumInterval: '1 minute',
maximumAttempts: 5,
nonRetryableErrorTypes: ['InvalidAgentError'],
},
});
Crash Recovery
Temporal provides automatic crash recovery:
graph TB
W1["Worker 1 starts workflow"]
W1 --> S1["Step 1 completes, state saved"]
S1 --> S2["Step 2 completes, state saved"]
S2 --> Crash["Worker 1 crashes"]
Crash --> Detect["Temporal detects failure"]
Detect --> Resched["Workflow task rescheduled"]
Resched --> W2["Worker 2 picks up"]
W2 --> Replay["Replays history (deterministic)"]
Replay --> Continue["Continues from Step 3"]
Continue --> Complete["Completes normally"]
Key points:
- Workflow code must be deterministic
- State is reconstructed from event history
- Activities are not re-executed (results cached)
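The key points above can be illustrated with a toy replay model. This is a conceptual sketch, not how Temporal is implemented: `ReplayingRunner` and `threeStepWorkflow` are hypothetical names, and the "history" is just an array of recorded activity results.

```typescript
// Conceptual model only; real Temporal event histories are far richer.
class ReplayingRunner {
  executed = 0;       // how many activities actually ran in this process
  private cursor = 0; // position in the recorded history
  private history: string[];

  constructor(history: string[]) {
    this.history = history;
  }

  // Returns the recorded result when one exists; otherwise runs and records the activity.
  run(activity: (input: string) => string, input: string): string {
    const recorded = this.history[this.cursor];
    if (recorded !== undefined) {
      this.cursor++;
      return recorded;
    }
    const result = activity(input);
    this.executed++;
    this.history.push(result);
    this.cursor++;
    return result;
  }
}

// Deterministic orchestration: the same calls in the same order on every run.
function threeStepWorkflow(runner: ReplayingRunner): string {
  const a = runner.run((s) => s + ':step1', 'start');
  const b = runner.run((s) => s + ':step2', a);
  return runner.run((s) => s + ':step3', b);
}
```

If a second worker starts with a history that already holds the first two results, only the third activity executes. The model works only because the workflow function issues identical calls on replay, which is the reason for the determinism requirement.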
Interrupt and Resume
The Temporal runtime supports soft interruption with the ability to resume later:
// Interrupt the agent (soft stop)
await handle.interrupt('user_requested');
// Agent status becomes 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'
// Later, resume execution
const { canResume } = await handle.canResume();
if (canResume) {
const newHandle = await handle.resume();
const result = await newHandle.result();
}
Workflow Setup for Interrupts
To enable sub-second interrupt response during sub-agent execution, use the Trigger primitive:
import {
proxyActivities,
defineSignal,
setHandler,
getExternalWorkflowHandle,
Trigger,
} from '@temporalio/workflow';
import { runAgentWorkflow, INTERRUPT_SIGNAL_NAME } from '@helix-agents/runtime-temporal';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
export const interruptSignal = defineSignal<[string]>(INTERRUPT_SIGNAL_NAME);
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
// Create trigger for instant interrupt wake-up
const interruptTrigger = new Trigger<string>();
let interruptReason: string | undefined;
setHandler(interruptSignal, (reason) => {
interruptReason = reason;
interruptTrigger.resolve(reason); // Wake up immediately
});
return runAgentWorkflow(input, activities, {
interruptTrigger,
getExternalWorkflowHandle: (workflowId) => getExternalWorkflowHandle(workflowId),
// ... other options
});
}
Sub-Agent Interrupt Propagation
When an agent has running child workflows, interrupts propagate through the entire hierarchy:
- Parent receives interrupt - Signal handler resolves the interruptTrigger
- Children are signaled - Parent uses getExternalWorkflowHandle to signal each child
- Children stop gracefully - Each child detects the interrupt and stops at its next safe point
- Parent completes - Returns with status interrupted
Target latency: < 500ms from interrupt request to stopped execution.
See Interrupt and Resume for complete documentation including resume modes and error handling.
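The propagation order described above can be modeled without any Temporal APIs. In this sketch, `WorkflowNode` and `propagateInterrupt` are hypothetical names, and the workflow hierarchy is a plain in-memory tree standing in for parent and child workflows.

```typescript
// Hypothetical in-memory model of a workflow hierarchy; no Temporal calls are made.
interface WorkflowNode {
  id: string;
  status: 'running' | 'interrupted' | 'completed';
  children: WorkflowNode[];
}

// Signals the node, then each running descendant; returns IDs in the order they were signaled.
function propagateInterrupt(node: WorkflowNode): string[] {
  if (node.status !== 'running') return []; // finished workflows are left alone
  const signaled = [node.id];
  for (const child of node.children) {
    signaled.push(...propagateInterrupt(child));
  }
  // The parent is marked last, after its children have stopped.
  node.status = 'interrupted';
  return signaled;
}
```

Children that have already completed are skipped, so an interrupt never rewrites the status of finished work.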
Stream Resumption
The Temporal runtime supports stream resumption for handling client disconnections and page refreshes during agent execution. This uses the same FrontendHandler pattern as the JS runtime.
Setting Up Stream Resumption
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { RedisStreamManager, RedisStateStore } from '@helix-agents/store-redis';
const handler = createFrontendHandler({
executor, // TemporalAgentExecutor
stateStore, // RedisStateStore for snapshots
streamManager, // RedisStreamManager for stream chunks
agent: myAgent,
});
Frontend Integration
Use HelixChatTransport with resumeFromSequence to handle reconnections:
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import { useChat } from '@ai-sdk/react';
function Chat({ sessionId, snapshot }) {
const { messages } = useChat({
id: sessionId,
initialMessages: snapshot?.messages ?? [],
resume: snapshot?.status === 'active',
transport: new HelixChatTransport({
api: `/api/chat/${sessionId}`,
resumeFromSequence: snapshot?.streamSequence,
}),
});
}
Content Replay Flow
When a client reconnects mid-stream:
- Load snapshot - Get messages and streamSequence from state store
- Resume stream - Connect to stream manager with fromSequence
- Content replay - Partial content is replayed as stream events
- Live streaming - Continue receiving new chunks
The FrontendHandler.getSnapshot() method automatically handles content replay when fromSequence > 0:
// In your API route (e.g., /api/chat/[sessionId]/route.ts)
export async function GET(
request: Request,
{ params }: { params: { sessionId: string } }
) {
const { sessionId } = params;
const fromSequence = parseInt(request.headers.get('Last-Event-ID') ?? '0', 10);
// FrontendHandler builds replay content automatically
return handler.handleStream(sessionId, { fromSequence });
}
Key Considerations
- Redis required - Stream resumption relies on RedisStreamManager storing chunks with sequence numbers
- Chunk retention - Configure appropriate TTL for stream chunks based on expected session duration
- Multi-worker - Works across multiple workers since stream state is in Redis
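Resumption depends on every chunk carrying a sequence number so a reconnecting client can ask only for what it missed. The sketch below shows that idea with an in-memory log. `ChunkLog` is a hypothetical stand-in, not the `RedisStreamManager` API, and it assumes `fromSequence` means "the last sequence the client has already seen".

```typescript
// Hypothetical in-memory stand-in for a sequence-numbered stream store.
interface Chunk {
  sequence: number;
  data: string;
}

class ChunkLog {
  private chunks: Chunk[] = [];
  private next = 1; // sequence numbers start at 1 so that 0 means "nothing seen yet"

  append(data: string): number {
    const sequence = this.next++;
    this.chunks.push({ sequence, data });
    return sequence;
  }

  // Returns the chunks the client has not yet received.
  readAfter(fromSequence: number): Chunk[] {
    return this.chunks.filter((chunk) => chunk.sequence > fromSequence);
  }
}
```

A client that disconnected after sequence 1 would call `readAfter(1)` and then keep reading live chunks. Because chunks must still exist when the client returns, retention (TTL) has to cover the longest expected disconnect.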
Concurrency Protection
The Temporal runtime uses workflow ID uniqueness as its primary concurrency protection:
- Each session maps to a unique workflow ID
- Temporal server prevents duplicate workflow IDs
- Concurrent execute() calls result in workflow collision errors
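The effect of workflow ID uniqueness can be sketched as a simple guard. This is a model of the behavior, not Temporal's mechanism (which is enforced server-side); `WorkflowIdGuard` and `workflowIdFor` are hypothetical names, and the ID format is an assumption based on the `agent__analyzer__...` pattern used for child workflows.

```typescript
// Hypothetical model of ID-based exclusion; Temporal enforces this on the server.
class WorkflowIdGuard {
  private running = new Set<string>();

  // Throws if a workflow with the same ID is still running.
  start(workflowId: string): void {
    if (this.running.has(workflowId)) {
      throw new Error(`Workflow already running: ${workflowId}`);
    }
    this.running.add(workflowId);
  }

  finish(workflowId: string): void {
    this.running.delete(workflowId);
  }
}

// Assumed mapping from session to workflow ID: one ID per agent type and session.
function workflowIdFor(agentType: string, sessionId: string): string {
  return `agent__${agentType}__${sessionId}`;
}
```

Because the ID is derived from the session, two concurrent execute() calls for the same session map to the same ID, and the second one is rejected rather than racing the first.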
retry() Method
Retry failed agents in Temporal:
const result = await handle.result();
if (result.status === 'failed') {
// Retry through the executor...
const retryHandle = await executor.retry(agent, sessionId);
// ...or through the handle:
// const retryHandle = await handle.retry();
}
The retry creates a new workflow execution while preserving session state.
Method Comparison
| Method | Purpose | Stream Behavior | Valid From Status |
|---|---|---|---|
| execute() | New/continue conversation | Resets stream | Any except running |
| resume() | Continue after interrupt | Preserves stream | interrupted, paused |
| retry() | Recover from failure | Resets to checkpoint | failed |
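The "Valid From Status" column can be expressed as a lookup that an application might use to decide which call to make. This is a sketch: `validMethods` is a hypothetical helper, and the status names are limited to those that appear in this guide.

```typescript
// Status and method names taken from the table above; the helper itself is hypothetical.
type Status = 'running' | 'interrupted' | 'paused' | 'failed' | 'completed';
type Method = 'execute' | 'resume' | 'retry';

// Returns the methods the table allows from a given session status.
function validMethods(status: Status): Method[] {
  switch (status) {
    case 'running':
      return []; // execute() is valid from any status except running
    case 'interrupted':
    case 'paused':
      return ['execute', 'resume'];
    case 'failed':
      return ['execute', 'retry'];
    case 'completed':
      return ['execute'];
  }
}
```

For example, a failed session can either be retried from its checkpoint or given a new message with execute(), while a running session accepts neither until it stops.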
Determinism Requirements
Workflow code must be deterministic:
// BAD - Non-deterministic
export async function agentWorkflow(input) {
const timestamp = Date.now(); // Different on replay!
const random = Math.random(); // Different on replay!
const uuid = crypto.randomUUID(); // Different on replay!
}
// GOOD - Use Temporal APIs
import { sleep, uuid4, workflowInfo } from '@temporalio/workflow';
export async function agentWorkflow(input) {
const info = workflowInfo();
const timestamp = info.startTime; // Deterministic
const id = uuid4(); // Deterministic (seeded)
await sleep('5 seconds'); // Deterministic timer
}
Move non-deterministic operations to activities:
- LLM API calls
- Database queries
- External API calls
- Random number generation
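Seeded generation is replay-safe because the same seed always yields the same sequence. The sketch below demonstrates that property with a small mulberry32-style generator. It is an illustration only: `seededRandom` is a hypothetical helper, and nothing here claims to be the algorithm Temporal uses internally.

```typescript
// Illustrative seeded generator; not Temporal's implementation.
function seededRandom(seed: number): () => number {
  let state = seed >>> 0;
  return () => {
    state = (state + 0x6d2b79f5) >>> 0;
    let t = state;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    // Scale the 32-bit result into [0, 1).
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Two "runs" with the same seed observe identical values, so replay takes the same branches.
function firstValues(seed: number, count: number): number[] {
  const next = seededRandom(seed);
  const values: number[] = [];
  for (let i = 0; i < count; i++) values.push(next());
  return values;
}
```

An unseeded source would give a replaying worker different values than the original run, and any branch that depends on them would diverge from the recorded history.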
Observability
Temporal Web UI
Access at http://localhost:8233 (local) or via Temporal Cloud.
View:
- Workflow history and events
- Activity execution details
- Pending/failed workflows
- Search by workflow ID or type
Workflow Queries
Query running workflows:
// In workflow
import { defineQuery, setHandler } from '@temporalio/workflow';
export const getProgressQuery = defineQuery<{ stepCount: number; status: string }>('getProgress');
export async function agentWorkflow(input) {
let progress = { stepCount: 0, status: 'running' };
setHandler(getProgressQuery, () => progress);
// Update progress during execution
progress.stepCount++;
// ...
}
// From client
const handle = client.workflow.getHandle(workflowId);
const progress = await handle.query(getProgressQuery);
Production Deployment
Worker Scaling
Run multiple workers for throughput:
# Scale horizontally
docker-compose scale worker=5
Workers pull from the same task queue - Temporal handles distribution.
Temporal Cloud
For production, use Temporal Cloud:
import * as fs from 'node:fs';
import { Connection, Client } from '@temporalio/client';
const connection = await Connection.connect({
address: 'your-namespace.tmprl.cloud:7233',
tls: {
clientCertPair: {
crt: fs.readFileSync('client.pem'),
key: fs.readFileSync('client.key'),
},
},
});
Monitoring
Set up metrics:
import { Runtime } from '@temporalio/worker';
Runtime.install({
telemetryOptions: {
metrics: {
prometheus: { bindAddress: '0.0.0.0:9464' },
},
},
});
Limitations
Higher Latency
Each activity invocation adds network overhead. Batch operations when possible.
Determinism Constraints
Workflow code restrictions can be challenging. Move all I/O to activities.
Infrastructure Overhead
Requires running Temporal server and workers alongside your application.
Learning Curve
Temporal concepts (workflows, activities, replay, determinism) require understanding.
Best Practices
1. Keep Workflows Thin
Workflow code should only orchestrate - move logic to activities:
// Workflow just coordinates
export async function agentWorkflow(input) {
const state = await loadState(input.sessionId);
const result = await executeStep(state); // Activity does the work
await saveState(state);
}
2. Appropriate Timeouts
Set realistic timeouts:
const { executeAgentStep } = proxyActivities<typeof activities>({
startToCloseTimeout: '5 minutes', // LLM calls can be slow
});
3. Heartbeat Long Activities
For activities > 30 seconds, implement heartbeating:
import { Context } from '@temporalio/activity';
export async function executeLongTool(input: ToolInput): Promise<ToolResult> {
for (const item of items) {
await processItem(item);
Context.current().heartbeat(); // Report progress
}
return result;
}
4. Use Continue-As-New for Long Histories
Workflows with many events should reset:
import { continueAsNew, workflowInfo } from '@temporalio/workflow';
export async function agentWorkflow(input) {
const info = workflowInfo();
// After many steps, continue as new to reset history
if (info.historyLength > 10000) {
await continueAsNew<typeof agentWorkflow>(input);
}
// ...
}
Next Steps
- Interrupt and Resume - Complete interrupt/resume documentation
- JavaScript Runtime - Simpler option for development
- Cloudflare Runtime - Edge deployment alternative
- Storage: Redis - Recommended store for Temporal