
Temporal Runtime

The Temporal runtime (@helix-agents/runtime-temporal) executes agents as durable Temporal workflows. This provides crash recovery, automatic retries, and production-grade reliability for long-running agent tasks.

When to Use

Good fit:

  • Production workloads requiring reliability
  • Long-running agents (hours or days)
  • Agents that must survive process restarts
  • Complex multi-agent orchestrations
  • Operations requiring audit trails and observability

Not ideal for:

  • Quick development iteration (infrastructure overhead)
  • Simple, short-lived agents
  • Cost-sensitive deployments without existing Temporal infrastructure

Prerequisites

You need a running Temporal server:

Option 1: Temporal Cloud (Recommended for production)

bash
# Sign up at https://temporal.io/cloud

Option 2: Local development

bash
# Using Docker (runs the Temporal CLI dev server; Web UI on port 8233)
docker run -d --name temporal \
  -p 7233:7233 -p 8233:8233 \
  temporalio/temporal:latest server start-dev --ip 0.0.0.0

# Or using Temporal CLI
temporal server start-dev

Installation

bash
npm install @helix-agents/runtime-temporal @helix-agents/store-redis @helix-agents/llm-vercel @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

Architecture

mermaid
graph TB
    subgraph App ["Your Application"]
        Executor["<b>TemporalAgentExecutor</b><br/>Starts workflows · Returns handles<br/>Reads from StateStore/StreamManager"]
    end

    subgraph Server ["Temporal Server"]
        ServerDesc["Persists workflow state<br/>Manages task queues<br/>Handles retries and timeouts"]
    end

    subgraph Worker ["Temporal Worker"]
        subgraph Workflow ["Agent Workflow"]
            WF["Orchestrates execution<br/>Calls activities for LLM/tools"]
        end
        subgraph Activities ["Activities"]
            Act["LLM calls · Tool execution<br/>State persistence"]
        end
    end

    App --> Server --> Worker

Setup Guide

1. Create the Workflow

Define a workflow that wraps the agent execution:

typescript
// src/workflows/agent-workflow.ts
import { proxyActivities, defineSignal, setHandler } from '@temporalio/workflow';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
import type * as activities from '../activities';

// Proxy activities with timeouts
const { executeAgentStep, saveState, loadState } = proxyActivities<typeof activities>({
  startToCloseTimeout: '5 minutes',
  retry: {
    maximumAttempts: 3,
    backoffCoefficient: 2,
  },
});

// Abort signal
export const abortSignal = defineSignal('abort');

export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
  let aborted = false;
  setHandler(abortSignal, () => {
    aborted = true;
  });

  try {
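    // Load or initialize state. initializeState() and processStepResult() are
    // placeholders for your own logic (not shown): make them activities or keep
    // them deterministic.
    let state = await loadState(input.sessionId);
    if (!state) {
      state = await initializeState(input);
    }

    // Main execution loop: one executeAgentStep activity call per agent step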
    while (state.status === 'running' && !aborted) {
      const stepResult = await executeAgentStep(input.agentType, state);
      state = await processStepResult(state, stepResult);
      await saveState(state);
    }

    if (aborted) {
      return { status: 'failed', error: 'Workflow aborted' };
    }

    return {
      status: state.status === 'completed' ? 'completed' : 'failed',
      output: state.output,
      error: state.error,
    };
  } catch (error) {
    return {
      status: 'failed',
      error: error instanceof Error ? error.message : String(error),
    };
  }
}

2. Create Activities

Activities perform the actual work (LLM calls, tool execution):

typescript
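// src/activities/agent-activities.ts
import { AgentRegistry, type AgentState } from '@helix-agents/runtime-temporal';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
// Assumed to be provided by your project (not shown): `redisClient`, the agent
// definitions (ResearchAgent, AnalyzerAgent), `executeStep`, and the `StepResult` type.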

const stateStore = new RedisStateStore(redisClient);
const streamManager = new RedisStreamManager(redisClient);
const llmAdapter = new VercelAIAdapter();
const registry = new AgentRegistry();

// Register your agents
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);

export async function loadState(sessionId: string): Promise<AgentState<unknown, unknown> | null> {
  return stateStore.load(sessionId);
}

export async function saveState(state: AgentState<unknown, unknown>): Promise<void> {
  await stateStore.save(state);
}

export async function executeAgentStep(
  agentType: string,
  state: AgentState<unknown, unknown>
): Promise<StepResult<unknown>> {
  const agent = registry.get(agentType);
  if (!agent) {
    throw new Error(`Unknown agent type: ${agentType}`);
  }

  // Execute one LLM step
  return executeStep(agent, state, llmAdapter, streamManager);
}

3. Create the Worker

The worker processes workflows and activities:

typescript
// src/worker.ts
import { Worker, NativeConnection } from '@temporalio/worker';
import * as activities from './activities';

async function runWorker() {
  const connection = await NativeConnection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });

  const worker = await Worker.create({
    connection,
    namespace: 'default',
    taskQueue: 'agent-tasks',
    workflowsPath: require.resolve('./workflows'),
    activities,
  });

  await worker.run();
}

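runWorker().catch((err) => {
  console.error(err);
  process.exit(1); // Exit non-zero so a process supervisor can restart the worker
});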

4. Create the Executor

The executor starts workflows and returns handles:

typescript
// src/executor.ts
import { Client, Connection } from '@temporalio/client';
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';

async function createExecutor() {
  const connection = await Connection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });

  const client = new Client({ connection });

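  // Adapt the Temporal client to the interface TemporalAgentExecutor expects.
  // `wrapHandle` (handle adapter) and `redis` (Redis client) are assumed to be
  // defined elsewhere in your project (not shown).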
  const temporalClientAdapter = {
    startWorkflow: async (name, options) => {
      const handle = await client.workflow.start(name, {
        workflowId: options.workflowId,
        taskQueue: options.taskQueue,
        args: options.args,
      });
      return wrapHandle(handle);
    },
    getHandle: (workflowId) => {
      return wrapHandle(client.workflow.getHandle(workflowId));
    },
  };

  return new TemporalAgentExecutor({
    client: temporalClientAdapter,
    stateStore: new RedisStateStore(redis),
    streamManager: new RedisStreamManager(redis),
    workflowName: 'agentWorkflow',
    taskQueue: 'agent-tasks',
  });
}

Using the Executor

Once set up, usage is identical to other runtimes:

typescript
const executor = await createExecutor();

// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research quantum computing');

// Stream events
const stream = await handle.stream();
for await (const chunk of stream) {
  console.log(chunk);
}

// Get result
const result = await handle.result();

Multi-Turn Conversations

The Temporal runtime supports the same session-centric multi-turn conversation API as the JS runtime:

Using sessionId

typescript
// First message - creates a new session
const handle1 = await executor.execute(agent, 'Hello, my name is Alice', {
  sessionId: 'session-123',
});
await handle1.result();

// Continue the conversation - same sessionId
const handle2 = await executor.execute(agent, 'What is my name?', {
  sessionId: 'session-123',
});

Using handle.send()

typescript
const handle1 = await executor.execute(agent, 'Hello', {
  sessionId: 'session-123',
});
await handle1.result();

const handle2 = await handle1.send('Tell me more');

Using Direct Messages

typescript
const handle = await executor.execute(agent, {
  message: 'Continue from here',
  messages: myExternalMessageHistory,
});

Behavior Table

| Input | Messages Source | State Source |
|---|---|---|
| message only (new session) | Empty (fresh) | Empty (fresh) |
| message + sessionId (existing) | From session | From session |
| message + messages | From messages | Empty (fresh) |
| message + state | Empty (fresh) | From state |
| message + sessionId + messages | From messages (override) | From session |
| message + sessionId + state | From session | From state (override) |
| All four | From messages (override) | From state (override) |

See JS Runtime - Multi-Turn Conversations for detailed documentation.
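The behavior table reduces to one precedence rule for each column: an explicit input wins, then an existing session, and otherwise the run starts fresh. The sketch below encodes that reading. `resolveSources` and its types are hypothetical illustrations, not part of @helix-agents/runtime-temporal, and the sketch assumes `sessionId` always refers to an existing session.

```typescript
// Hypothetical model of the behavior table; not the library's implementation.
type Source = 'fresh' | 'session' | 'messages' | 'state';

interface ExecuteInputSketch {
  message: string;
  sessionId?: string; // assumed to refer to an existing session
  messages?: unknown[]; // explicit message history
  state?: unknown; // explicit state
}

interface ResolvedSources {
  messagesFrom: Source;
  stateFrom: Source;
}

// Each column: explicit input > existing session > fresh.
function resolveSources(input: ExecuteInputSketch): ResolvedSources {
  const fromSession = input.sessionId !== undefined;
  return {
    messagesFrom: input.messages !== undefined ? 'messages' : fromSession ? 'session' : 'fresh',
    stateFrom: input.state !== undefined ? 'state' : fromSession ? 'session' : 'fresh',
  };
}

// "All four" row: both explicit inputs override the session
console.log(resolveSources({ message: 'hi', sessionId: 's1', messages: [], state: {} }));
// { messagesFrom: 'messages', stateFrom: 'state' }
```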

Execute vs Resume Semantics

The Temporal runtime provides two methods for running agents with different semantics:

| Method | Use Case | Stream Behavior | Session Behavior |
|---|---|---|---|
| execute() | Start a new conversation or add a new message | Creates a new stream segment | Creates or continues a session |
| resume() | Continue an interrupted session | Continues the existing stream | Must have interrupted state |

execute() is for normal agent interactions:

  • Starts a new Temporal workflow (or continues an existing session)
  • Creates a fresh stream segment for the response
  • Appends the new user message to the session
typescript
// New conversation
const handle = await executor.execute(agent, 'Hello', { sessionId: 'session-1' });

// Follow-up in same session (new run, new stream segment)
const handle2 = await executor.execute(agent, 'Tell me more', { sessionId: 'session-1' });

resume() is specifically for recovering from interrupts:

  • Continues the existing workflow from where it stopped
  • Maintains stream continuity for clients reconnecting
  • Session must be in interrupted or paused status
typescript
// After an interrupt...
const state = await stateStore.loadState('session-1');
if (state?.status === 'interrupted') {
  // Resume continues the exact same stream
  const handle = await executor.resume(agent, 'session-1');
}

Agent Registry

Register agents so that the worker's activities can look them up by type:

typescript
import { AgentRegistry } from '@helix-agents/runtime-temporal';

const registry = new AgentRegistry();

// Register each agent type
registry.register(ResearchAgent); // name: 'researcher'
registry.register(AnalyzerAgent); // name: 'analyzer'
registry.register(SummarizerAgent); // name: 'summarizer'

// In activities, look up by type
export async function executeAgentStep(agentType: string, state) {
  const agent = registry.get(agentType); // Returns the agent config
  // ...
}

AgentRegistry with Factories

The AgentRegistry supports both static registration and factory functions for dynamic agent creation. Factories are useful when agents need runtime dependencies like API keys, database connections, or per-request configuration.

Static vs Factory Registration

typescript
import { AgentRegistry, AgentNotFoundError } from '@helix-agents/runtime-temporal';

const registry = new AgentRegistry();

// Static registration - for agents with no runtime dependencies
registry.register(ResearchAgent);
registry.register(SummarizerAgent);

// Factory registration - for agents needing runtime dependencies
interface Env {
  TAVILY_API_KEY: string;
  DATABASE: Database;
}

registry.registerFactory<Env>('dynamic-researcher', (ctx) => {
  return createResearchAgent({
    apiKey: ctx.env.TAVILY_API_KEY,
    database: ctx.env.DATABASE,
    userId: ctx.userId,
    sessionId: ctx.sessionId,
  });
});

AgentFactoryContext

Factory functions receive context about the current execution:

typescript
interface AgentFactoryContext<TEnv = unknown> {
  /** Environment bindings (e.g., API keys, connections) */
  env: TEnv;
  /** Current session identifier */
  sessionId: string;
  /** Current run identifier */
  runId: string;
  /** Optional user identifier */
  userId?: string;
}

Using resolve() in Activities

The resolve() method handles both static and factory agents. Use it instead of get() when you have factories:

typescript
// src/activities.ts
export async function executeAgentStep(
  agentType: string,
  state: AgentState<unknown, unknown>,
  context: { sessionId: string; runId: string; userId?: string }
): Promise<StepResult<unknown>> {
  // resolve() handles both static and factory agents
  const agent = registry.resolve(agentType, {
    env,           // Your environment object
    sessionId: context.sessionId,
    runId: context.runId,
    userId: context.userId,
  });

  return executeStep(agent, state, llmAdapter, streamManager);
}

Error Handling with AgentNotFoundError

When an agent type is not found, AgentNotFoundError provides helpful information:

typescript
import { AgentNotFoundError } from '@helix-agents/runtime-temporal';

try {
  const agent = registry.resolve('unknown-agent', context);
} catch (error) {
  if (error instanceof AgentNotFoundError) {
    console.error(`Unknown agent type: ${error.agentType}`);
    console.error(`Available types: ${error.availableTypes.join(', ')}`);
  }
}

Registry Methods

| Method | Description |
|---|---|
| register(config) | Register a static agent configuration |
| registerFactory(type, factory) | Register a factory function |
| resolve(type, context) | Resolve an agent (factory first, then static) |
| get(type) | Get a static agent only (legacy) |
| has(type) | Check whether an agent type is registered |
| unregister(type) | Remove an agent registration |
| getRegisteredTypes() | Get all registered agent type names |
| clear() | Remove all registrations (for testing) |
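The methods above imply a simple order for resolve(): factories first, then static configs, and otherwise an AgentNotFoundError that lists what is available. The in-memory sketch below illustrates that order. `SketchRegistry` and the minimal `AgentConfig` shape are hypothetical stand-ins, not the package's classes.

```typescript
// Hypothetical sketch of factory-first resolution; not the library's implementation.
interface AgentConfig {
  name: string;
}

interface AgentFactoryContext<TEnv = unknown> {
  env: TEnv;
  sessionId: string;
  runId: string;
  userId?: string;
}

type AgentFactory<TEnv> = (ctx: AgentFactoryContext<TEnv>) => AgentConfig;

class AgentNotFoundError extends Error {
  readonly agentType: string;
  readonly availableTypes: string[];

  constructor(agentType: string, availableTypes: string[]) {
    super(`Unknown agent type: ${agentType}`);
    this.name = 'AgentNotFoundError';
    this.agentType = agentType;
    this.availableTypes = availableTypes;
  }
}

class SketchRegistry {
  private readonly statics = new Map<string, AgentConfig>();
  private readonly factories = new Map<string, AgentFactory<any>>();

  register(config: AgentConfig): void {
    this.statics.set(config.name, config);
  }

  registerFactory<TEnv>(type: string, factory: AgentFactory<TEnv>): void {
    this.factories.set(type, factory);
  }

  // Factory first, then static; otherwise report the registered types.
  resolve<TEnv>(type: string, ctx: AgentFactoryContext<TEnv>): AgentConfig {
    const factory = this.factories.get(type);
    if (factory) return factory(ctx); // builds a config per call from ctx
    const config = this.statics.get(type);
    if (config) return config;
    throw new AgentNotFoundError(type, this.getRegisteredTypes());
  }

  getRegisteredTypes(): string[] {
    const types = [...Array.from(this.factories.keys()), ...Array.from(this.statics.keys())];
    return Array.from(new Set(types));
  }
}

const registry = new SketchRegistry();
registry.register({ name: 'researcher' });
registry.registerFactory<{ apiKey: string }>('dynamic-researcher', (ctx) => ({
  name: `dynamic-researcher:${ctx.userId ?? 'anonymous'}`,
}));
```

Because factories are checked first in this sketch, a type registered both ways resolves to the factory. The factory path pays a construction cost on every call in exchange for per-request configuration.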

Sub-Agent Handling

Sub-agents execute as child workflows:

typescript
// In workflow
import { executeChild } from '@temporalio/workflow';

// When the parent needs to execute a sub-agent, derive the child's session ID once
const subSessionId = `${parentSessionId}-sub-${callId}`;
const subAgentResult = await executeChild('agentWorkflow', {
  args: [
    {
      agentType: 'analyzer',
      sessionId: subSessionId,
      streamId: parentStreamId, // Same stream for unified streaming
      message: inputMessage,
      parentSessionId,
    },
  ],
  workflowId: `agent__analyzer__${subSessionId}`,
  taskQueue: 'agent-tasks',
});

Benefits of child workflows:

  • Independent retry policies
  • Separate timeouts
  • Can be cancelled independently
  • Full workflow history preserved

Activity Configuration

Configure timeouts and retries per activity:

typescript
const { executeAgentStep } = proxyActivities<typeof activities>({
  // How long the activity can run
  startToCloseTimeout: '10 minutes',

  // How long to wait for worker to start processing
  scheduleToStartTimeout: '1 minute',

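  // Heartbeat timeout for long activities: the activity must call heartbeat()
  // at least this often, or Temporal treats the attempt as failed and retries it
  heartbeatTimeout: '30 seconds',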

  // Retry configuration
  retry: {
    initialInterval: '1 second',
    backoffCoefficient: 2,
    maximumInterval: '1 minute',
    maximumAttempts: 5,
    nonRetryableErrorTypes: ['InvalidAgentError'],
  },
});
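To see what a retry policy like the one above means in wall-clock terms, the sketch below computes the delay before each retry. It assumes each delay is `min(initialInterval * backoffCoefficient^(n - 1), maximumInterval)` for the n-th retry and that `maximumAttempts` counts the first attempt. `retryDelays` is a hypothetical helper, and server-side scheduling details are not modeled.

```typescript
// Hypothetical helper: delays between attempts for a Temporal-style retry policy.
interface RetryPolicySketch {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
  maximumAttempts: number; // includes the first attempt; assumed > 0 here
}

function retryDelays(policy: RetryPolicySketch): number[] {
  const delays: number[] = [];
  // maximumAttempts - 1 retries follow the first attempt
  for (let retry = 1; retry < policy.maximumAttempts; retry++) {
    const raw = policy.initialIntervalMs * policy.backoffCoefficient ** (retry - 1);
    delays.push(Math.min(raw, policy.maximumIntervalMs)); // capped at maximumInterval
  }
  return delays;
}

// The configuration above: 1s initial, x2 backoff, 1 minute cap, 5 attempts
const delays = retryDelays({
  initialIntervalMs: 1_000,
  backoffCoefficient: 2,
  maximumIntervalMs: 60_000,
  maximumAttempts: 5,
});
console.log(delays.join(', ')); // 1000, 2000, 4000, 8000
```

With a 10-minute startToCloseTimeout, the worst case for one step is five full attempts plus these waits. Keep that in mind when sizing the workflow-level timeouts.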

Crash Recovery

Temporal provides automatic crash recovery:

mermaid
graph TB
    W1["Worker 1 starts workflow"]
    W1 --> S1["Step 1 completes, state saved"]
    S1 --> S2["Step 2 completes, state saved"]
    S2 --> Crash["Worker 1 crashes"]

    Crash --> Detect["Temporal detects failure"]
    Detect --> Resched["Workflow task rescheduled"]

    Resched --> W2["Worker 2 picks up"]
    W2 --> Replay["Replays history (deterministic)"]
    Replay --> Continue["Continues from Step 3"]
    Continue --> Complete["Completes normally"]

Key points:

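  • Workflow code must be deterministic, so that replay reproduces the same sequence of workflow commands
  • Workflow state is reconstructed by replaying the event history
  • Completed activities are not re-executed: their recorded results are read back from the history. An activity that was still in flight when the worker crashed is retried according to its retry policy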
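The recovery sequence can be modeled in a few lines. Replay walks the workflow's steps and, wherever the history already holds a result, reuses it instead of calling the activity again. Only work past the end of the history actually executes. The sketch also shows why determinism matters: if replayed code asks for a different activity than the history recorded, there is no consistent way to continue. `runWithHistory` and `HistoryEvent` are hypothetical simplifications, not Temporal's internals.

```typescript
// Hypothetical model of replay; not Temporal's implementation.
interface HistoryEvent {
  activity: string;
  result: string;
}

function runWithHistory(
  steps: string[],
  history: HistoryEvent[],
  execute: (activity: string) => string,
): { results: string[]; executed: string[] } {
  const results: string[] = [];
  const executed: string[] = [];
  steps.forEach((activity, i) => {
    const recorded = history[i];
    if (recorded !== undefined) {
      // Replay: the code must request the same activity the history recorded
      if (recorded.activity !== activity) {
        throw new Error(`Non-deterministic replay at step ${i}`);
      }
      results.push(recorded.result); // reuse the recorded result
    } else {
      // Past the end of history: run the activity for real and record it
      const result = execute(activity);
      executed.push(activity);
      history.push({ activity, result });
      results.push(result);
    }
  });
  return { results, executed };
}

// Worker 1 recorded steps 1 and 2 before crashing; worker 2 replays and runs step 3
const history: HistoryEvent[] = [
  { activity: 'step1', result: 'r1' },
  { activity: 'step2', result: 'r2' },
];
const out = runWithHistory(['step1', 'step2', 'step3'], history, (a) => `fresh-${a}`);
console.log(out.executed.join(',')); // step3
```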

Interrupt and Resume

The Temporal runtime supports soft interruption with the ability to resume later:

typescript
// Interrupt the agent (soft stop)
await handle.interrupt('user_requested');

// Agent status becomes 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'

// Later, resume execution
const { canResume } = await handle.canResume();
if (canResume) {
  const newHandle = await handle.resume();
  const result = await newHandle.result();
}

Workflow Setup for Interrupts

To enable sub-second interrupt response during sub-agent execution, use the Trigger primitive:

typescript
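import {
  proxyActivities,
  defineSignal,
  setHandler,
  getExternalWorkflowHandle,
  Trigger,
} from '@temporalio/workflow';
import {
  runAgentWorkflow,
  INTERRUPT_SIGNAL_NAME,
  type AgentWorkflowInput,
  type AgentWorkflowResult,
} from '@helix-agents/runtime-temporal';
import type * as activityFns from '../activities';

// Proxy the activities so the workflow can schedule them (timeout as in earlier examples)
const activities = proxyActivities<typeof activityFns>({ startToCloseTimeout: '5 minutes' });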

export const interruptSignal = defineSignal<[string]>(INTERRUPT_SIGNAL_NAME);

export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
  // Create trigger for instant interrupt wake-up
  const interruptTrigger = new Trigger<string>();
  let interruptReason: string | undefined;

  setHandler(interruptSignal, (reason) => {
    interruptReason = reason;
    interruptTrigger.resolve(reason); // Wake up immediately
  });

  return runAgentWorkflow(input, activities, {
    interruptTrigger,
    getExternalWorkflowHandle: (workflowId) => getExternalWorkflowHandle(workflowId),
    // ... other options
  });
}

Sub-Agent Interrupt Propagation

When an agent has running child workflows, interrupts propagate through the entire hierarchy:

  1. Parent receives interrupt - Signal handler resolves the interruptTrigger
  2. Children are signaled - Parent uses getExternalWorkflowHandle to signal each child
  3. Children stop gracefully - Each child detects the interrupt and stops at its next safe point
  4. Parent completes - Returns with status interrupted

Target latency: < 500ms from interrupt request to stopped execution.
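The propagation steps can be pictured as a walk over the workflow tree. Each running node signals its children and then stops itself, so a parent reports interrupted only after its running descendants have stopped. This is a synchronous simplification: real signals are asynchronous and delivered by Temporal. `WorkflowNode` and `propagateInterrupt` are hypothetical names.

```typescript
// Hypothetical in-memory model of a workflow hierarchy; not Temporal or helix-agents code.
type NodeStatus = 'running' | 'interrupted' | 'completed';

interface WorkflowNode {
  id: string;
  status: NodeStatus;
  children: WorkflowNode[];
}

// Signal every child first (step 2), then stop this node (steps 3-4).
// Nodes that already completed are left untouched.
function propagateInterrupt(node: WorkflowNode, stopped: string[] = []): string[] {
  for (const child of node.children) {
    propagateInterrupt(child, stopped);
  }
  if (node.status === 'running') {
    node.status = 'interrupted';
    stopped.push(node.id);
  }
  return stopped;
}

const root: WorkflowNode = {
  id: 'parent',
  status: 'running',
  children: [
    { id: 'child-a', status: 'running', children: [{ id: 'grandchild', status: 'running', children: [] }] },
    { id: 'child-b', status: 'completed', children: [] },
  ],
};

console.log(propagateInterrupt(root).join(' -> ')); // grandchild -> child-a -> parent
```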

See Interrupt and Resume for complete documentation including resume modes and error handling.

Stream Resumption

The Temporal runtime supports stream resumption for handling client disconnections and page refreshes during agent execution. This uses the same FrontendHandler pattern as the JS runtime.

Setting Up Stream Resumption

typescript
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { RedisStreamManager, RedisStateStore } from '@helix-agents/store-redis';

const handler = createFrontendHandler({
  executor,        // TemporalAgentExecutor
  stateStore,      // RedisStateStore for snapshots
  streamManager,   // RedisStreamManager for stream chunks
  agent: myAgent,
});

Frontend Integration

Use HelixChatTransport with resumeFromSequence to handle reconnections:

typescript
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import { useChat } from '@ai-sdk/react';

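function Chat({ sessionId, snapshot }) {
  const { messages } = useChat({
    id: sessionId,
    // Seed the chat with the messages already persisted for this session
    messages: snapshot?.messages ?? [],
    // Reconnect to the in-progress stream while the agent is still running
    resume: snapshot?.status === 'active',
    transport: new HelixChatTransport({
      api: `/api/chat/${sessionId}`,
      resumeFromSequence: snapshot?.streamSequence,
    }),
  });

  // ... render `messages`
}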

Content Replay Flow

When a client reconnects mid-stream:

  1. Load snapshot - Get messages and streamSequence from state store
  2. Resume stream - Connect to stream manager with fromSequence
  3. Content replay - Partial content is replayed as stream events
  4. Live streaming - Continue receiving new chunks
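The reconnect flow depends on every chunk carrying a sequence number. A client that remembers the last sequence it received can then ask for everything after it. The in-memory sketch below shows that contract. `ChunkLog` and `parseFromSequence` are hypothetical stand-ins for what RedisStreamManager provides, and the sketch assumes fromSequence is exclusive (the last sequence the client already has).

```typescript
// Hypothetical in-memory stand-in for a sequence-numbered chunk store.
interface StreamChunk {
  sequence: number;
  data: string;
}

class ChunkLog {
  private readonly chunks: StreamChunk[] = [];
  private nextSequence = 1;

  // Each appended chunk gets the next sequence number.
  append(data: string): number {
    const sequence = this.nextSequence++;
    this.chunks.push({ sequence, data });
    return sequence;
  }

  // Everything after the last sequence the client saw (0 = replay from the start).
  readFrom(fromSequence: number): StreamChunk[] {
    return this.chunks.filter((chunk) => chunk.sequence > fromSequence);
  }
}

// Parse a Last-Event-ID style header; anything invalid falls back to 0.
function parseFromSequence(header: string | null): number {
  const n = Number.parseInt(header ?? '0', 10);
  return Number.isNaN(n) || n < 0 ? 0 : n;
}

const log = new ChunkLog();
['Hel', 'lo', ' world'].forEach((text) => log.append(text));

// A client that saw chunk 1 before disconnecting reconnects with Last-Event-ID: 1
const missed = log.readFrom(parseFromSequence('1'));
console.log(missed.map((c) => c.data).join('')); // lo world
```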

The FrontendHandler.getSnapshot() method automatically handles content replay when fromSequence > 0:

typescript
// In your API route (e.g., /api/chat/[sessionId]/route.ts)
export async function GET(
  request: Request,
  { params }: { params: { sessionId: string } }
) {
  const { sessionId } = params;
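  // Last-Event-ID carries the last sequence the client received; default to 0 (full replay)
  const fromSequence = Number.parseInt(request.headers.get('Last-Event-ID') ?? '0', 10) || 0;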

  // FrontendHandler builds replay content automatically
  return handler.handleStream(sessionId, { fromSequence });
}

Key Considerations

  • Redis required - Stream resumption relies on RedisStreamManager storing chunks with sequence numbers
  • Chunk retention - Configure appropriate TTL for stream chunks based on expected session duration
  • Multi-worker - Works across multiple workers since stream state is in Redis

Concurrency Protection

The Temporal runtime uses workflow ID uniqueness as its primary concurrency protection:

  • Each session maps to a unique workflow ID
  • Temporal rejects starting a workflow whose ID already belongs to a running workflow
  • Concurrent execute() calls for the same session therefore fail with a workflow-already-started error instead of running twice
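The rule above can be modeled as a set of running workflow IDs. A second start for an ID that is still running is rejected, and a follow-up turn can start once the previous run has closed. This assumes an ID reuse policy that allows restarting a closed ID, which is Temporal's default. `RunningWorkflows`, `WorkflowAlreadyRunningError`, and `workflowIdFor` are hypothetical. The `agent__<type>__<sessionId>` format is borrowed from the sub-agent example and is not confirmed for top-level sessions.

```typescript
// Hypothetical in-memory model of "one running execution per workflow ID".
class WorkflowAlreadyRunningError extends Error {
  readonly workflowId: string;

  constructor(workflowId: string) {
    super(`Workflow already running: ${workflowId}`);
    this.name = 'WorkflowAlreadyRunningError';
    this.workflowId = workflowId;
  }
}

class RunningWorkflows {
  private readonly running = new Set<string>();

  start(workflowId: string): void {
    if (this.running.has(workflowId)) {
      throw new WorkflowAlreadyRunningError(workflowId);
    }
    this.running.add(workflowId);
  }

  close(workflowId: string): void {
    this.running.delete(workflowId);
  }
}

// Hypothetical session-to-ID mapping (format assumed from the sub-agent example)
function workflowIdFor(agentType: string, sessionId: string): string {
  return `agent__${agentType}__${sessionId}`;
}

const running = new RunningWorkflows();
const id = workflowIdFor('researcher', 'session-1');
running.start(id);
try {
  running.start(id); // a concurrent execute() for the same session
} catch (err) {
  console.log((err as Error).message); // Workflow already running: agent__researcher__session-1
}
running.close(id);
running.start(id); // a follow-up turn after the first run closed is accepted
```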

retry() Method

Retry failed agents in Temporal:

typescript
const result = await handle.result();
if (result.status === 'failed') {
  const retryHandle = await executor.retry(agent, sessionId);
  // or
  const retryHandle = await handle.retry();
}

The retry creates a new workflow execution while preserving session state.

Method Comparison

| Method | Purpose | Stream Behavior | Valid From Status |
|---|---|---|---|
| execute() | New/continue conversation | Resets stream | Any except running |
| resume() | Continue after interrupt | Preserves stream | interrupted, paused |
| retry() | Recover from failure | Resets to checkpoint | failed |
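The "Valid From Status" column works as a guard you can check before calling a method. The sketch encodes it directly. `canInvoke` is a hypothetical helper, and it assumes the five statuses that appear on this page are the complete set.

```typescript
// Hypothetical guard encoding the "Valid From Status" column; not a library API.
type SessionStatus = 'running' | 'interrupted' | 'paused' | 'completed' | 'failed';
type RunMethod = 'execute' | 'resume' | 'retry';

function canInvoke(method: RunMethod, status: SessionStatus): boolean {
  if (method === 'execute') return status !== 'running'; // any status except running
  if (method === 'resume') return status === 'interrupted' || status === 'paused';
  return status === 'failed'; // retry
}

console.log(canInvoke('resume', 'failed')); // false: use retry() for failed sessions
```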

Determinism Requirements

Workflow code must be deterministic: replaying the same event history must produce the same sequence of workflow commands. The TypeScript SDK runs workflows in a sandbox that replaces Date and Math.random() with deterministic versions, but I/O and other side effects still break replay:

typescript
// BAD - side effects inside workflow code
export async function agentWorkflow(input) {
  const response = await fetch('https://api.example.com/data'); // Network I/O: not replay-safe
  const uuid = crypto.randomUUID(); // Not replay-safe: use uuid4() instead
}

// GOOD - Use Temporal APIs
import { sleep, uuid4, workflowInfo } from '@temporalio/workflow';

export async function agentWorkflow(input) {
  const info = workflowInfo();
  const timestamp = info.startTime; // Deterministic: recorded in history
  const id = uuid4(); // Deterministic (seeded)
  await sleep('5 seconds'); // Durable, deterministic timer
}

Move non-deterministic operations to activities:

  • LLM API calls
  • Database queries
  • External API calls
  • Random number generation
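Seeded APIs such as uuid4() are replay-safe because a generator started from the same seed always yields the same sequence. When the seed is fixed for a workflow run, a replay therefore sees the values the original run saw. The sketch demonstrates that property with mulberry32, a small public-domain PRNG chosen only for illustration. It is not what Temporal uses internally, and `simulatedRun` is hypothetical.

```typescript
// mulberry32: a tiny seeded PRNG, used here only to illustrate seeding.
function mulberry32(seed: number): () => number {
  let a = seed >>> 0;
  return () => {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296; // float in [0, 1)
  };
}

// Hypothetical "workflow run" that draws values from a generator seeded per run.
function simulatedRun(seed: number): number[] {
  const random = mulberry32(seed);
  return [random(), random(), random()];
}

const firstRun = simulatedRun(42);
const replay = simulatedRun(42); // replay reuses the run's seed
console.log(firstRun.every((v, i) => v === replay[i])); // true
```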

Observability

Temporal Web UI

Open http://localhost:8233 when running the local development server, or use the Temporal Cloud web UI.

View:

  • Workflow history and events
  • Activity execution details
  • Pending/failed workflows
  • Search by workflow ID or type

Workflow Queries

Query running workflows:

typescript
// In workflow
import { defineQuery, setHandler } from '@temporalio/workflow';

export const getProgressQuery = defineQuery<{ stepCount: number; status: string }>('getProgress');

export async function agentWorkflow(input) {
  let progress = { stepCount: 0, status: 'running' };

  setHandler(getProgressQuery, () => progress);

  // Update progress during execution
  progress.stepCount++;
  // ...
}

// From the client (getProgressQuery is imported from the workflow module)
const handle = client.workflow.getHandle(workflowId);
const progress = await handle.query(getProgressQuery);

Production Deployment

Worker Scaling

Run multiple workers for throughput:

bash
# Scale horizontally (assumes a `worker` service in your compose file)
docker compose up -d --scale worker=5

All workers poll the same task queue, and Temporal distributes workflow and activity tasks among them.

Temporal Cloud

For production, use Temporal Cloud:

typescript
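import { readFileSync } from 'node:fs';
import { Connection, Client } from '@temporalio/client';

// Temporal Cloud endpoints have the form <namespace>.<account-id>.tmprl.cloud:7233
const connection = await Connection.connect({
  address: 'your-namespace.your-account-id.tmprl.cloud:7233',
  tls: {
    clientCertPair: {
      crt: readFileSync('client.pem'),
      key: readFileSync('client.key'),
    },
  },
});

// Cloud namespaces must be passed explicitly
const client = new Client({ connection, namespace: 'your-namespace.your-account-id' });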

Monitoring

Set up metrics:

typescript
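import { Runtime } from '@temporalio/worker';

// Install the runtime once, before creating any NativeConnection or Worker
Runtime.install({
  telemetryOptions: {
    metrics: {
      // Exposes a Prometheus scrape endpoint on port 9464
      prometheus: { bindAddress: '0.0.0.0:9464' },
    },
  },
});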

Limitations

Higher Latency

Each activity invocation adds a round trip through the Temporal server. Batch operations when possible.

Determinism Constraints

Workflow code restrictions can be challenging. Move all I/O to activities.

Infrastructure Overhead

Requires running Temporal server and workers alongside your application.

Learning Curve

Teams need to learn Temporal concepts (workflows, activities, replay, determinism) before they can build on this runtime effectively.

Best Practices

1. Keep Workflows Thin

Workflow code should only orchestrate - move logic to activities:

typescript
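// Workflow just coordinates; loadState, executeAgentStep, and saveState are
// proxied activities (as in the workflow from step 1) that do the actual work
export async function agentWorkflow(input) {
  const state = await loadState(input.sessionId);
  if (!state) return;
  const stepResult = await executeAgentStep(input.agentType, state); // Activity does the work
  const nextState = await processStepResult(state, stepResult); // Your own logic (not shown)
  await saveState(nextState);
}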

2. Appropriate Timeouts

Set realistic timeouts:

typescript
const { executeAgentStep } = proxyActivities<typeof activities>({
  startToCloseTimeout: '5 minutes', // LLM calls can be slow
});

3. Heartbeat Long Activities

For activities that can run longer than the heartbeat timeout (30 seconds in the Activity Configuration example), heartbeat regularly:

typescript
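import { Context } from '@temporalio/activity';

// ToolInput, ToolResult, and processItem are placeholders for your own types and logic
export async function executeLongTool(input: ToolInput): Promise<ToolResult> {
  const outputs = [];
  for (const item of input.items) {
    outputs.push(await processItem(item));
    Context.current().heartbeat(); // Report liveness before the heartbeat timeout elapses
  }
  return { outputs };
}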
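To see why the heartbeat interval matters, the sketch below checks whether a given heartbeat pattern would exceed a heartbeat timeout. `wouldHeartbeatTimeOut` is a hypothetical helper, not a Temporal API. It assumes the clock starts when the activity starts and that heartbeat times are sorted, and it leaves server-side details unmodeled.

```typescript
// Hypothetical check, not a Temporal API. Times are ms since the activity started.
function wouldHeartbeatTimeOut(
  heartbeatTimesMs: number[], // assumed sorted ascending
  finishedAtMs: number,
  heartbeatTimeoutMs: number,
): boolean {
  let lastSignalMs = 0; // assume the clock starts when the activity starts
  for (const t of [...heartbeatTimesMs, finishedAtMs]) {
    if (t - lastSignalMs > heartbeatTimeoutMs) return true; // gap too long
    lastSignalMs = t;
  }
  return false;
}

// A 45-second step with a 30-second heartbeat timeout:
console.log(wouldHeartbeatTimeOut([], 45_000, 30_000)); // true (no heartbeats)
console.log(wouldHeartbeatTimeOut([15_000, 30_000], 45_000, 30_000)); // false
```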

4. Use Continue-As-New for Long Histories

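Long-running workflows accumulate large event histories. Use continueAsNew to start a fresh run (same workflow ID, empty history) once the history grows large: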

typescript
import { continueAsNew, workflowInfo } from '@temporalio/workflow';

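export async function agentWorkflow(input) {
  // ... run agent steps ...

  // Read workflowInfo() at the check point: historyLength reflects the current history
  if (workflowInfo().historyLength > 10000) {
    // Start a fresh run with the same workflow ID and an empty history;
    // session state lives in the StateStore, so the new run can reload it
    await continueAsNew<typeof agentWorkflow>(input);
  }
}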
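The effect of continue-as-new is that no single run's history grows without bound, while the logical agent keeps going. The simulation below makes this concrete. `simulateContinueAsNew` and its `eventsPerStep` parameter are hypothetical: real event counts per step depend on how many activities, timers, and workflow tasks each step produces.

```typescript
// Hypothetical simulation of history growth with continue-as-new.
interface RunSummary {
  run: number;
  steps: number;
  historyLength: number;
}

function simulateContinueAsNew(totalSteps: number, eventsPerStep: number, threshold: number): RunSummary[] {
  const runs: RunSummary[] = [];
  let current: RunSummary = { run: 1, steps: 0, historyLength: 0 };
  for (let step = 0; step < totalSteps; step++) {
    current.steps++;
    current.historyLength += eventsPerStep;
    const moreWork = step < totalSteps - 1;
    // Past the threshold with work left: close this run and start a fresh one
    if (current.historyLength > threshold && moreWork) {
      runs.push(current);
      current = { run: current.run + 1, steps: 0, historyLength: 0 };
    }
  }
  runs.push(current);
  return runs;
}

// 10 steps, 4 events each, threshold 15: three runs of 4, 4, and 2 steps
console.log(simulateContinueAsNew(10, 4, 15).map((r) => r.steps).join(',')); // 4,4,2
```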


Released under the MIT License.