@helix-agents/runtime-temporal
Temporal runtime for durable agent execution. Provides crash recovery, automatic retries, and distributed execution using Temporal workflows.
Installation
npm install @helix-agents/runtime-temporalAgentRegistry
Maps agent type names to configurations. Supports both static registration and factory functions for dynamic agent creation.
import { AgentRegistry, AgentNotFoundError } from '@helix-agents/runtime-temporal';
const registry = new AgentRegistry();
// Static registration
registry.register(ResearchAgent);
registry.register(SummarizerAgent);
// Factory registration (for agents needing runtime deps)
registry.registerFactory<Env>('dynamic-agent', (ctx) =>
createAgent({ apiKey: ctx.env.API_KEY, userId: ctx.userId })
);
// Resolve (handles both static and factory)
const agent = registry.resolve('dynamic-agent', {
env,
sessionId: 'session-123',
runId: 'run-456',
userId: 'user-789',
});
// Get static agent only (legacy)
const agent = registry.get('research-assistant');
// Check if registered
const exists = registry.has('research-assistant');
// Remove registration
registry.unregister('research-assistant');
// Get all registered types
const types = registry.getRegisteredTypes();
// Clear all registrations (for testing)
registry.clear();
// Use default global registry
import { defaultAgentRegistry } from '@helix-agents/runtime-temporal';AgentFactoryContext
Context passed to factory functions:
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings (e.g., API keys, database connections) */
env: TEnv;
/** Current session identifier */
sessionId: string;
/** Current run identifier */
runId: string;
/** Optional user identifier */
userId?: string;
}AgentFactory
Factory function type:
type AgentFactory<TEnv = unknown> = (
context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;AgentResolverInterface
Interface implemented by AgentRegistry:
interface AgentResolverInterface<TEnv = unknown> {
/**
* Resolve an agent configuration by type.
* @throws AgentNotFoundError if agent type is not found
*/
resolve(
agentType: string,
context: AgentFactoryContext<TEnv>
): AgentConfig<z.ZodType, z.ZodType>;
/**
* Check if an agent type is registered.
*/
has(agentType: string): boolean;
}AgentNotFoundError
Error thrown when an agent type is not found:
class AgentNotFoundError extends Error {
/** The agent type that was requested */
readonly agentType: string;
/** List of available agent types */
readonly availableTypes: string[];
constructor(agentType: string, availableTypes: string[]);
}Example usage:
import { AgentNotFoundError } from '@helix-agents/runtime-temporal';
try {
const agent = registry.resolve('unknown', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.error(`Unknown agent type: ${error.agentType}`);
console.error(`Available types: ${error.availableTypes.join(', ')}`);
}
}Registry Methods Summary
| Method | Description |
|---|---|
register(config) | Register a static agent configuration |
registerFactory(type, factory) | Register a factory function |
resolve(type, context) | Resolve agent (factory-first, then static) |
get(type) | Get static agent only (legacy) |
has(type) | Check if agent type is registered |
unregister(type) | Remove an agent registration |
getRegisteredTypes() | Get all registered agent type names |
clear() | Remove all registrations (for testing) |
GenericAgentActivities
Activity implementations for Temporal workflows. No decorators—wrap these with platform-specific activity decorators.
import { GenericAgentActivities } from '@helix-agents/runtime-temporal';
const activities = new GenericAgentActivities({
registry, // AgentRegistry
stateStore, // StateStore
streamManager, // StreamManager
llmAdapter, // LLMAdapter
logger, // Optional: Logger
});
// Available activity methods:
activities.initializeAgentState(input);
activities.executeAgentStep(input);
activities.executeToolCall(input);
activities.registerSubAgents(input);
activities.recordSubAgentResult(input);
activities.markAgentFailed(input);
activities.markAgentAborted(input);
activities.endAgentStream(input);
activities.failAgentStream(input);
activities.checkExistingState(input);
activities.updateAgentStatus(input);runAgentWorkflow
Core workflow logic function. Wrap with Temporal workflow decorators.
import { runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow-exports';
import type { AgentWorkflowActivities } from '@helix-agents/runtime-temporal/workflow-exports';
// In your workflow file
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
const activities = proxyActivities<AgentWorkflowActivities>({
startToCloseTimeout: '5m',
});
return runAgentWorkflow(input, activities, {
startChildWorkflow: async (childInput) => {
return executeChild(agentWorkflow, {
workflowId: childInput.runId,
args: [childInput],
});
},
isCancellation: (error) => isCancellation(error),
log: {
info: (msg, data) => log.info(msg, data),
warn: (msg, data) => log.warn(msg, data),
error: (msg, data) => log.error(msg, data),
},
});
}Options:
startChildWorkflow- Function to start sub-agent workflowsisCancellation- Check if error is Temporal cancellationlog- Workflow-safe logger
TemporalAgentExecutor
Executor that starts and manages Temporal workflows. Same API as JSAgentExecutor.
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
const executor = new TemporalAgentExecutor({
client, // Temporal Client
taskQueue, // Task queue name
stateStore, // StateStore
streamManager, // StreamManager
logger, // Optional
});
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
// Get existing handle
const handle = await executor.getHandle(MyAgent, 'session-123');
// Check if resumable
const result = await executor.canResume(MyAgent, 'session-123');AgentExecutionHandle
Handle returned by execute() for interacting with a running agent.
Properties
handle.sessionId; // Unique run identifier
handle.streamId; // Stream identifierstream
Get a stream of events from the execution.
const stream = await handle.stream();
for await (const chunk of stream) {
switch (chunk.type) {
case 'text_delta':
process.stdout.write(chunk.delta);
break;
case 'tool_start':
console.log(`Tool: ${chunk.toolName}`);
break;
case 'run_interrupted':
console.log(`Interrupted: ${chunk.reason}`);
break;
}
}result
Wait for the execution to complete and get the result.
const result = await handle.result();
if (result.status === 'completed') {
console.log('Output:', result.output);
} else if (result.status === 'interrupted') {
console.log('Interrupted:', result.interruptReason);
}abort
Cancel the execution. This is a HARD stop - the agent fails and cannot be resumed. Sends a cancellation signal to the Temporal workflow.
await handle.abort('User requested cancellation');interrupt
Interrupt execution for later resumption. This is a SOFT stop - the agent can be resumed.
await handle.interrupt('user_requested');
// Status is now 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'The interrupt sends a signal to the Temporal workflow, which checkpoints state and exits gracefully.
canResume
Check if execution can be resumed.
const { canResume, reason } = await handle.canResume();
if (canResume) {
const resumed = await handle.resume();
}resume
Resume interrupted or paused execution. Returns a new handle for the resumed execution.
// Continue from where it stopped
const newHandle = await handle.resume();
// Resume with a new message
const newHandle = await handle.resume({
mode: 'with_message',
message: 'Continue with this context',
});
// Time-travel to a specific checkpoint
const newHandle = await handle.resume({
mode: 'from_checkpoint',
checkpointId: 'cpv1-session-123-s5-...',
});getState
Get current agent state.
const state = await handle.getState();
console.log('Status:', state.status);
console.log('Step count:', state.stepCount);send
Continue the conversation after completion. Returns a new handle.
const handle1 = await executor.execute(agent, 'Hello');
await handle1.result();
const handle2 = await handle1.send('Tell me more');
const result = await handle2.result();DTOs
Type-safe schemas for workflow and activity inputs/outputs.
Session vs Run Identifiers
sessionId: Primary key for state storage. A session contains all messages, state, and checkpoints.runId: Identifies a specific workflow execution. Multiple runs can occur within one session.
In Temporal, the workflow ID is tied to sessionId for session continuity. The runId is execution metadata for tracing.
Workflow DTOs
import {
AgentWorkflowInputSchema,
AgentWorkflowResultSchema,
type AgentWorkflowInput,
type AgentWorkflowResult,
} from '@helix-agents/runtime-temporal';
interface AgentWorkflowInput {
agentType: string;
runId: string; // Execution metadata (for tracing)
streamId: string;
message: string;
initialState?: Record<string, unknown>;
parentSessionId?: string; // Parent's sessionId for sub-agents
}
interface AgentWorkflowResult {
status: 'completed' | 'failed' | 'cancelled';
output?: unknown;
error?: string;
}Activity DTOs
import {
InitializeAgentInputSchema,
ExecuteAgentStepInputSchema,
ExecuteToolCallInputSchema,
AgentStepResultSchema,
ToolCallResultSchema,
// ... and more
} from '@helix-agents/runtime-temporal';Signal DTOs
import {
PauseAgentSignalSchema,
ResumeAgentSignalSchema,
AbortAgentSignalSchema,
InterruptAgentSignalSchema,
} from '@helix-agents/runtime-temporal';
// Interrupt signal payload
interface InterruptAgentSignal {
reason?: string;
}Cancellation Helpers
import {
type CancellationSignalProvider,
defaultCancellationSignalProvider,
createLinkedAbortController,
isActivityCancelled,
} from '@helix-agents/runtime-temporal';
// Check if activity is cancelled
if (isActivityCancelled()) {
throw new Error('Activity cancelled');
}
// Create linked abort controller
const controller = createLinkedAbortController();Heartbeat Helpers
import {
type HeartbeatProvider,
defaultHeartbeatProvider,
HeartbeatManager,
} from '@helix-agents/runtime-temporal';
// Manage heartbeats for long activities
const heartbeat = new HeartbeatManager({
intervalMs: 5000,
onHeartbeat: () => console.log('Heartbeat'),
});
heartbeat.start();
// ... do work ...
heartbeat.stop();Complete Example
activities.ts
import { GenericAgentActivities, AgentRegistry } from '@helix-agents/runtime-temporal';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { MyAgent } from './agent.js';
const registry = new AgentRegistry();
registry.register(MyAgent);
export function createActivities() {
const activities = new GenericAgentActivities({
registry,
stateStore: new InMemoryStateStore(),
streamManager: new InMemoryStreamManager(),
llmAdapter: new VercelAIAdapter(),
});
return {
initializeAgentState: activities.initializeAgentState.bind(activities),
executeAgentStep: activities.executeAgentStep.bind(activities),
executeToolCall: activities.executeToolCall.bind(activities),
registerSubAgents: activities.registerSubAgents.bind(activities),
recordSubAgentResult: activities.recordSubAgentResult.bind(activities),
markAgentFailed: activities.markAgentFailed.bind(activities),
markAgentAborted: activities.markAgentAborted.bind(activities),
endAgentStream: activities.endAgentStream.bind(activities),
failAgentStream: activities.failAgentStream.bind(activities),
checkExistingState: activities.checkExistingState.bind(activities),
updateAgentStatus: activities.updateAgentStatus.bind(activities),
};
}workflows.ts
import { proxyActivities, executeChild, isCancellation, log } from '@temporalio/workflow';
import type { AgentWorkflowActivities } from '@helix-agents/runtime-temporal/workflow-exports';
import { runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow-exports';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
const activities = proxyActivities<AgentWorkflowActivities>({
startToCloseTimeout: '5m',
retry: {
initialInterval: '1s',
maximumInterval: '30s',
backoffCoefficient: 2,
maximumAttempts: 3,
},
});
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
return runAgentWorkflow(input, activities, {
startChildWorkflow: async (childInput) => {
return executeChild(agentWorkflow, {
workflowId: childInput.runId,
args: [childInput],
parentClosePolicy: 'ABANDON',
});
},
isCancellation: (error) => isCancellation(error),
log: { info: log.info, warn: log.warn, error: log.error },
});
}worker.ts
import { Worker, bundleWorkflowCode, NativeConnection } from '@temporalio/worker';
import { createActivities } from './activities.js';
async function main() {
const connection = await NativeConnection.connect({ address: 'localhost:7233' });
const workflowBundle = await bundleWorkflowCode({
workflowsPath: './workflows.ts',
});
const worker = await Worker.create({
connection,
taskQueue: 'my-agents',
workflowBundle,
activities: createActivities(),
});
await worker.run();
}Interrupt and Resume
The Temporal runtime provides durable interrupt/resume via workflow signals.
Interrupt Signal Handling
Workflows listen for the interruptAgent signal:
import { setHandler, defineSignal } from '@temporalio/workflow';
import { INTERRUPT_SIGNAL_NAME } from '@helix-agents/runtime-temporal';
const interruptSignal = defineSignal<[{ reason?: string }]>(INTERRUPT_SIGNAL_NAME);
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
let interrupted = false;
let interruptReason: string | undefined;
setHandler(interruptSignal, (payload) => {
interrupted = true;
interruptReason = payload.reason;
});
// In your step loop, check for interruption
while (!interrupted && status === 'running') {
// Execute step...
if (interrupted) {
await activities.checkpointAndExit({ runId, reason: interruptReason });
return { status: 'interrupted' };
}
}
}Resuming After Process Restart
Temporal provides built-in durability. After worker restart, reconnect to workflows:
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
import { Client, Connection } from '@temporalio/client';
// Reconnect to Temporal
const connection = await Connection.connect({ address: 'localhost:7233' });
const client = new Client({ connection });
const executor = new TemporalAgentExecutor({
client,
taskQueue: 'my-agents',
stateStore,
streamManager,
});
// Resume interrupted workflow
const handle = await executor.getHandle(MyAgent, savedSessionId);
if (handle) {
const { canResume } = await handle.canResume();
if (canResume) {
const resumed = await handle.resume();
const result = await resumed.result();
}
}Workflow Execution Semantics
| Action | Workflow Behavior |
|---|---|
interrupt() | Sends signal, workflow checkpoints and exits |
resume() | Starts new workflow from checkpoint |
abort() | Sends cancellation, workflow terminates |