@helix-agents/runtime-temporal
Temporal runtime for durable agent execution. Provides crash recovery, automatic retries, and distributed execution using Temporal workflows.
v7 Stateless Suspension
v7 (the A.2 Temporal HITL rewrite, Tasks 3.2/3.3) collapsed the v6 runAgentWorkflow + WorkflowActivities shim into a single agentWorkflow body. The workflow exits on every HITL boundary instead of waiting on signals — submissions are durable state, and resume is driven by executor.resume() starting a new workflow instance with a __resume-N suffix. The v6 signals (submitToolResult, interruptAgent, runResumed, childSuspended, childWoke) have been removed; only INTERRUPT_SIGNAL_NAME remains, kept for cross-process interrupt back-compat.
Installation
npm install @helix-agents/runtime-temporal
AgentRegistry
Maps agent type names to configurations. Supports both static registration and factory functions for dynamic agent creation.
import { AgentRegistry, AgentNotFoundError } from '@helix-agents/runtime-temporal';
const registry = new AgentRegistry();
// Static registration
registry.register(ResearchAgent);
registry.register(SummarizerAgent);
// Factory registration (for agents needing runtime deps)
registry.registerFactory<Env>('dynamic-agent', (ctx) =>
createAgent({ apiKey: ctx.env.API_KEY, userId: ctx.userId })
);
// Resolve (handles both static and factory)
const agent = registry.resolve('dynamic-agent', {
env,
sessionId: 'session-123',
runId: 'run-456',
userId: 'user-789',
});
// Get static agent only (legacy)
const agent = registry.get('research-assistant');
// Check if registered
const exists = registry.has('research-assistant');
// Remove registration
registry.unregister('research-assistant');
// Get all registered types
const types = registry.getRegisteredTypes();
// Clear all registrations (for testing)
registry.clear();
// Use default global registry
import { defaultAgentRegistry } from '@helix-agents/runtime-temporal';
AgentFactoryContext
Context passed to factory functions:
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings (e.g., API keys, database connections) */
env: TEnv;
/** Current session identifier */
sessionId: string;
/** Current run identifier */
runId: string;
/** Optional user identifier */
userId?: string;
}
AgentFactory
Factory function type:
type AgentFactory<TEnv = unknown> = (
context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;
AgentResolverInterface
Interface implemented by AgentRegistry:
interface AgentResolverInterface<TEnv = unknown> {
/**
* Resolve an agent configuration by type.
* @throws AgentNotFoundError if agent type is not found
*/
resolve(agentType: string, context: AgentFactoryContext<TEnv>): AgentConfig<z.ZodType, z.ZodType>;
/**
* Check if an agent type is registered.
*/
has(agentType: string): boolean;
}
AgentNotFoundError
Error thrown when an agent type is not found:
class AgentNotFoundError extends Error {
/** The agent type that was requested */
readonly agentType: string;
/** List of available agent types */
readonly availableTypes: string[];
constructor(agentType: string, availableTypes: string[]);
}
Example usage:
import { AgentNotFoundError } from '@helix-agents/runtime-temporal';
try {
const agent = registry.resolve('unknown', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.error(`Unknown agent type: ${error.agentType}`);
console.error(`Available types: ${error.availableTypes.join(', ')}`);
}
}
Registry Methods Summary
| Method | Description |
|---|---|
| register(config) | Register a static agent configuration |
| registerFactory(type, factory) | Register a factory function |
| resolve(type, context) | Resolve agent (factory-first, then static) |
| get(type) | Get static agent only (legacy) |
| has(type) | Check if agent type is registered |
| unregister(type) | Remove an agent registration |
| getRegisteredTypes() | Get all registered agent type names |
| clear() | Remove all registrations (for testing) |
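The resolution order in the table (factory first, then static) can be illustrated with a minimal in-memory sketch. This is not the package's implementation: every Sketch* name is hypothetical, and keying static registrations by a name field on the config is an assumption made for illustration.

```typescript
// Hypothetical stand-ins for the package's types (illustration only).
interface SketchAgentConfig {
  name: string;
}

interface SketchFactoryContext {
  sessionId: string;
  runId: string;
  userId?: string;
}

type SketchFactory = (ctx: SketchFactoryContext) => SketchAgentConfig;

class SketchAgentNotFoundError extends Error {
  readonly agentType: string;
  readonly availableTypes: string[];

  constructor(agentType: string, availableTypes: string[]) {
    super(`Unknown agent type: ${agentType}`);
    // Keeps instanceof checks working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, SketchAgentNotFoundError.prototype);
    this.agentType = agentType;
    this.availableTypes = availableTypes;
  }
}

class SketchRegistry {
  private readonly statics = new Map<string, SketchAgentConfig>();
  private readonly factories = new Map<string, SketchFactory>();

  // Assumption: static configs are keyed by their own name field.
  register(config: SketchAgentConfig): void {
    this.statics.set(config.name, config);
  }

  registerFactory(type: string, factory: SketchFactory): void {
    this.factories.set(type, factory);
  }

  // Factory-first, then static, mirroring the order in the summary table.
  resolve(type: string, ctx: SketchFactoryContext): SketchAgentConfig {
    const factory = this.factories.get(type);
    if (factory) return factory(ctx);
    const config = this.statics.get(type);
    if (config) return config;
    throw new SketchAgentNotFoundError(type, this.getRegisteredTypes());
  }

  has(type: string): boolean {
    return this.factories.has(type) || this.statics.has(type);
  }

  getRegisteredTypes(): string[] {
    const all = Array.from(this.statics.keys()).concat(Array.from(this.factories.keys()));
    return Array.from(new Set(all));
  }
}

const sketchRegistry = new SketchRegistry();
sketchRegistry.register({ name: 'research-assistant' });
sketchRegistry.registerFactory('dynamic-agent', (ctx) => ({
  name: `dynamic-agent:${ctx.userId ?? 'anonymous'}`,
}));

const resolvedAgent = sketchRegistry.resolve('dynamic-agent', {
  sessionId: 'session-123',
  runId: 'run-456',
  userId: 'user-789',
});
console.log(resolvedAgent.name);
```

Checking factories before static entries lets a factory registered under an existing type name shadow the static configuration for that type.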
GenericActivities
Activity implementations for Temporal workflows. No decorators — wrap these with platform-specific activity decorators.
import { GenericActivities } from '@helix-agents/runtime-temporal';
const activities = new GenericActivities({
stateStore, // SessionStateStore
streamManager, // StreamManager
llmAdapter, // LLMAdapter
agentResolver: {
// AgentResolverInterface (typically wraps AgentRegistry)
resolveAgent: (type) => registry.resolve(type, ctx),
},
logger, // Optional: Logger
usageStore, // Optional: UsageTrackingStore
});
v7 activity surface
agentWorkflow proxies these methods via wf.proxyActivities<GenericActivities>. When wiring a worker, bind every method below — the workflow looks them up by name.
| Activity | Purpose |
|---|---|
| initializeAgentState | Create or load session state at workflow start (mode: 'fresh'). |
| applyResultsAndReload | Resume entry point. Drains submitted client-tool / approval results and completed children, then reloads session state. |
| refreshState | Reload session state (cache-bust before a step). |
| loadStateForResume | Load session state without bumping cache invariants; used by the durable resume path. |
| runLLMStep | Execute one LLM step: build messages, invoke adapter, stage assistant message + tool calls. |
| commitStep | Promote staged step output (assistant message + tool results) to durable state. |
| commitSuspendedStep | HITL boundary. Persist a suspended step's durable state (pending client tool calls / approval gates / awaiting children). |
| runPhase2FinishWith | Execute deferred finishWith* tools after the main step. |
| evaluateApprovalGate | Evaluate an approval-gated tool call against its policy (always / hook / external). |
| executeCompanionToolCall | Run companion tools (companion__spawnAgent, companion__sendMessage, …). |
| executeRemoteSubAgentCall | Execute a createRemoteSubAgentTool call with crash recovery + stream proxying. |
| recordSubAgentResult | Persist a finished local sub-agent's output back to the parent. |
| recordSubSessionResult | Persist a finished sub-session into the parent's subSessionRefs. |
| addRemoteSubSessionRef | Append a remote sub-agent's SubSessionRef to the parent. |
| markPersistentChildCompleted | Patch a persistent child's SubSessionRef to a terminal status. |
| commitChildRespawnResults | Resume bookkeeping for sub-agents that were re-spawned after a parent resume. |
| loadChildState | Load a child session's state for the parent's wait-for-child loop. |
| appendMessages | Append messages to a session (used by resume drain). |
| consumeInterruptFlag | Read-and-clear the durable interrupt flag at every step boundary. |
| persistTerminalState | Write terminal state (completed / failed / cancelled) with output / error. |
| emitChunk | Emit a single StreamChunk to the stream. |
| emitSuspensionMarker | Emit a suspension-boundary chunk (UI signals the run is paused). |
The legacy v6 activities (executeAgentStep, executeToolCall, registerSubAgents, markAgentFailed, markAgentAborted, endAgentStream, failAgentStream, checkExistingState, updateAgentStatus) have been removed. See Quick start workflow file for the canonical wiring.
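Because the workflow looks activities up by name, a worker has to expose each method as a bound function under its own name. One way to avoid listing every binding by hand is a small helper like the sketch below. bindAllMethods and SketchActivities are hypothetical names, not part of the package, and the helper assumes the activity methods live on the class prototype (it does not walk inherited prototypes or pick up arrow-function properties).

```typescript
type BoundMethods = Record<string, (...args: unknown[]) => unknown>;

// Collects every method defined directly on the instance's prototype into a
// plain object of bound functions, keyed by method name.
function bindAllMethods(instance: object): BoundMethods {
  const bound: BoundMethods = {};
  const proto = Object.getPrototypeOf(instance);
  for (const methodName of Object.getOwnPropertyNames(proto)) {
    if (methodName === 'constructor') continue;
    // Read through the descriptor so accessors (getters) are never invoked.
    const descriptor = Object.getOwnPropertyDescriptor(proto, methodName);
    if (descriptor && typeof descriptor.value === 'function') {
      bound[methodName] = descriptor.value.bind(instance);
    }
  }
  return bound;
}

// Hypothetical stand-in for an activities class; the methods are synchronous
// here only to keep the sketch short.
class SketchActivities {
  private readonly prefix = 'state:';

  refreshState(sessionId: string): string {
    return this.prefix + sessionId;
  }

  emitChunk(chunk: string): number {
    return chunk.length;
  }
}

const boundActivities = bindAllMethods(new SketchActivities());
// The functions keep their `this` even when detached from the instance.
const detachedRefresh = boundActivities.refreshState;
console.log(Object.keys(boundActivities), detachedRefresh('session-123'));
```

An explicit list of bindings, as in the complete example, is more verbose but makes a missing activity visible in code review; a helper like this trades that visibility for brevity.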
agentWorkflow
The v7 stateless-suspension workflow body. Imported from the workflow-safe entry point and wrapped by your registered Temporal workflow.
// In your workflow file (no Node built-ins allowed)
import { agentWorkflow as runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
return runAgentWorkflow(input);
}
agentWorkflow proxies activities internally via wf.proxyActivities<GenericActivities>. There is no WorkflowActivities shim and no runAgentWorkflow(input, activities, options) form. The workflow exits on every HITL boundary, returning a suspended_* status that the executor observes; resume creates a new workflow instance with id ${prefix}__${agentType}__${sessionId}__resume-${resumeCount} (single-dash separator per spec §5).
TemporalAgentExecutor
Executor that starts and manages Temporal workflows. Same API as JSAgentExecutor.
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
const executor = new TemporalAgentExecutor({
client, // Temporal Client
taskQueue, // Task queue name
stateStore, // SessionStateStore
streamManager, // StreamManager
logger, // Optional
});
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
// Get existing handle
const handle = await executor.getHandle(MyAgent, 'session-123');
// Check if resumable
const result = await executor.canResume(MyAgent, 'session-123');
AgentExecutionHandle
Handle returned by execute() for interacting with a running agent.
Properties
handle.sessionId; // Session identifier (primary key for state storage)
handle.streamId; // Stream identifier
stream
Get a stream of events from the execution.
const stream = await handle.stream();
for await (const chunk of stream) {
switch (chunk.type) {
case 'text_delta':
process.stdout.write(chunk.delta);
break;
case 'tool_start':
console.log(`Tool: ${chunk.toolName}`);
break;
case 'run_interrupted':
console.log(`Interrupted: ${chunk.reason}`);
break;
}
}
result
Wait for the execution to complete and get the result.
const result = await handle.result();
if (result.status === 'completed') {
console.log('Output:', result.output);
} else if (result.status === 'interrupted') {
console.log('Interrupted:', result.interruptReason);
}
abort
Cancel the execution. This is a HARD stop: the run terminates as cancelled and cannot be resumed. v7 writes a durable abort flag that the workflow observes at the next step boundary.
await handle.abort('User requested cancellation');
interrupt
Interrupt execution for later resumption. This is a SOFT stop — the agent can be resumed.
await handle.interrupt('user_requested');
// Status is now 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'
The interrupt writes the durable interrupt flag, which is observed at the next step boundary by consumeInterruptFlag. The workflow checkpoints state and exits gracefully.
canResume
Check if execution can be resumed.
const { canResume, reason } = await handle.canResume();
if (canResume) {
const resumed = await handle.resume();
}
resume
Resume interrupted, paused, or HITL-suspended execution. Returns a new handle for the resumed execution. Resume starts a new workflow instance with workflow ID suffix __resume-N (incremented per resume) and mode: 'resume' so the workflow body enters via applyResultsAndReload.
// Continue from where it stopped
const newHandle = await handle.resume();
// Resume with a new message
const newHandle = await handle.resume({
mode: 'with_message',
message: 'Continue with this context',
});
// Time-travel to a specific checkpoint
const newHandle = await handle.resume({
mode: 'from_checkpoint',
checkpointId: 'cpv1-session-123-s5-...',
});
getState
Get current agent state.
const state = await handle.getState();
console.log('Status:', state.status);
console.log('Step count:', state.stepCount);
send
Continue the conversation after completion. Returns a new handle.
const handle1 = await executor.execute(agent, 'Hello');
await handle1.result();
const handle2 = await handle1.send('Tell me more');
const result = await handle2.result();
DTOs
Type-safe schemas for workflow and activity inputs/outputs.
Session vs Run Identifiers
- sessionId: Primary key for state storage. A session contains all messages, state, and checkpoints.
- runId: Identifies a specific workflow execution. Multiple runs can occur within one session.
In Temporal, the workflow ID is tied to sessionId for session continuity. The runId is execution metadata for tracing.
Workflow DTOs
import {
AgentWorkflowInputSchema,
AgentWorkflowResultSchema,
type AgentWorkflowInput,
type AgentWorkflowResult,
} from '@helix-agents/runtime-temporal';
interface AgentWorkflowInput {
agentType: string;
sessionId: string;
streamId?: string;
newMessages?: Array<{ role: 'user' | 'assistant' | 'system'; content?: string /* ... */ }>;
initialState?: Record<string, unknown>;
parentSessionId?: string;
parentStreamId?: string;
userId?: string;
tags?: string[];
metadata?: Record<string, string>;
branch?: { fromSessionId: string; checkpointId?: string; messageIndex?: number };
files?: Array<{ data: string; mediaType: string; filename?: string }>;
maxSteps?: number;
clientToolTimeoutMs?: number;
workflowIdPrefix?: string;
runId?: string;
/**
* v7 entry mode (A.2 rewrite). `'fresh'` runs `initializeAgentState`
* and starts the loop from scratch; `'resume'` runs
* `applyResultsAndReload` to drain submissions before re-entering
* the loop. Set by `TemporalAgentExecutor.resume()`.
*/
mode?: 'fresh' | 'resume';
/**
* Resume counter (v7). Atomically incremented on each
* `executor.resume()` call. The workflow ID for the resumed
* instance is `${prefix}__${agentType}__${sessionId}__resume-${resumeCount}`.
*/
resumeCount?: number;
}
interface AgentWorkflowResult {
sessionId: string;
status:
| 'completed'
| 'failed'
| 'cancelled'
| 'interrupted'
// v7 stateless-suspension statuses (A.2):
| 'suspended_client_tool'
| 'suspended_awaiting_children'
| 'suspended_step_partial';
output?: unknown;
error?: string;
checkpointId?: string;
suspended?: {
/* discriminated union per status */
};
}
Activity DTOs
import {
InitializeAgentInputSchema,
ExecuteAgentStepInputSchema,
ExecuteToolCallInputSchema,
AgentStepResultSchema,
ToolCallResultSchema,
CommitStepInputSchema,
// ... and more
} from '@helix-agents/runtime-temporal';
Key fields added for error classification:
- AgentStepResultSchema: includes errorCode: z.string().optional() for propagating error classification from activities to the workflow.
- FailAgentStreamInputSchema: includes errorCode: z.string().optional() for writing classified error chunks when failing a stream.
Signal DTOs
v7 deleted the v6/early-v7 signals (submitToolResult, interruptAgent, runResumed, childSuspended, childWoke). Submissions are durable state, resume creates a fresh workflow instance with a __resume-N suffix, and child completion is observed via state-store reads. The remaining signal-related exports are pause/resume/abort scaffolding for backward-compat:
import {
PauseAgentSignalSchema,
ResumeAgentSignalSchema,
AbortAgentSignalSchema,
// Cross-process interrupt signal name (the only signal the workflow
// body still listens to — kept for back-compat with adapters that
// call `handle.signal(INTERRUPT_SIGNAL_NAME, reason)`. The durable
// interrupt flag is canonical.)
INTERRUPT_SIGNAL_NAME,
} from '@helix-agents/runtime-temporal';
Cancellation & Heartbeats
The runtime-temporal package does not export bespoke cancellation or heartbeat helpers. Use Temporal's native primitives directly inside activities:
import { Context, heartbeat } from '@temporalio/activity';
export async function longRunningActivity(input: ActivityInput) {
const cancelSignal = Context.current().cancellationSignal;
// Periodic heartbeats so Temporal knows the activity is alive
const interval = setInterval(() => heartbeat(), 5_000);
try {
await doWork({ signal: cancelSignal });
} finally {
clearInterval(interval);
}
}
Temporal times out an activity that stops heartbeating once the heartbeatTimeout configured on proxyActivities elapses, and it delivers cancellation requests to activities through their heartbeats. No additional helpers are needed from @helix-agents/runtime-temporal.
Complete Example
activities.ts
import { GenericActivities, AgentRegistry } from '@helix-agents/runtime-temporal';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { MyAgent } from './agent.js';
export function createActivities() {
const registry = new AgentRegistry();
registry.register(MyAgent);
const activities = new GenericActivities({
stateStore: new InMemoryStateStore(),
streamManager: new InMemoryStreamManager(),
llmAdapter: new VercelAIAdapter(),
agentResolver: {
resolveAgent: (type) => registry.resolve(type, { env: undefined, sessionId: '', runId: '' }),
},
});
return {
initializeAgentState: activities.initializeAgentState.bind(activities),
applyResultsAndReload: activities.applyResultsAndReload.bind(activities),
refreshState: activities.refreshState.bind(activities),
loadStateForResume: activities.loadStateForResume.bind(activities),
runLLMStep: activities.runLLMStep.bind(activities),
commitStep: activities.commitStep.bind(activities),
commitSuspendedStep: activities.commitSuspendedStep.bind(activities),
runPhase2FinishWith: activities.runPhase2FinishWith.bind(activities),
evaluateApprovalGate: activities.evaluateApprovalGate.bind(activities),
executeCompanionToolCall: activities.executeCompanionToolCall.bind(activities),
executeRemoteSubAgentCall: activities.executeRemoteSubAgentCall.bind(activities),
recordSubAgentResult: activities.recordSubAgentResult.bind(activities),
recordSubSessionResult: activities.recordSubSessionResult.bind(activities),
addRemoteSubSessionRef: activities.addRemoteSubSessionRef.bind(activities),
markPersistentChildCompleted: activities.markPersistentChildCompleted.bind(activities),
commitChildRespawnResults: activities.commitChildRespawnResults.bind(activities),
loadChildState: activities.loadChildState.bind(activities),
appendMessages: activities.appendMessages.bind(activities),
consumeInterruptFlag: activities.consumeInterruptFlag.bind(activities),
persistTerminalState: activities.persistTerminalState.bind(activities),
emitChunk: activities.emitChunk.bind(activities),
emitSuspensionMarker: activities.emitSuspensionMarker.bind(activities),
};
}
Quick start workflow file
// workflows.ts — must be bundled separately (no Node built-ins).
import { agentWorkflow as runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
return runAgentWorkflow(input);
}
worker.ts
import { Worker, bundleWorkflowCode, NativeConnection } from '@temporalio/worker';
import { createActivities } from './activities.js';
async function main() {
const connection = await NativeConnection.connect({ address: 'localhost:7233' });
const workflowBundle = await bundleWorkflowCode({
workflowsPath: './workflows.ts',
});
const worker = await Worker.create({
connection,
taskQueue: 'my-agents',
workflowBundle,
activities: createActivities(),
});
await worker.run();
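}
// Start the worker and exit non-zero if startup or the run loop fails
main().catch((err) => {
console.error(err);
process.exit(1);
});
See examples/research-assistant-temporal/src/{worker,client,workflows,activities}.ts for the canonical v7 wiring.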
Interrupt and Resume
The Temporal runtime provides durable interrupt/resume via the v7 stateless-suspension model.
v7 Stateless Suspension Model
When the agent hits a HITL boundary (client-executed tool, approval gate, awaiting sub-agent children), the workflow exits with a suspended status:
| Status | Meaning |
|---|---|
| 'suspended_client_tool' | A client-executed tool is awaiting submitToolResult. |
| 'suspended_awaiting_children' | A sub-agent child is still running (or itself suspended). |
| 'suspended_step_partial' | The step ran partially before suspending (e.g. mid-batch tools). |
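A caller that awaits the workflow result typically branches on the returned status. The sketch below shows one way to do that with an exhaustive switch. The SketchStatus union copies the statuses documented for AgentWorkflowResult; the SketchNextAction values and the mapping itself are assumptions for illustration, not behaviour the package prescribes.

```typescript
// Mirrors the documented AgentWorkflowResult statuses (redeclared locally).
type SketchStatus =
  | 'completed'
  | 'failed'
  | 'cancelled'
  | 'interrupted'
  | 'suspended_client_tool'
  | 'suspended_awaiting_children'
  | 'suspended_step_partial';

// Hypothetical caller-side actions.
type SketchNextAction =
  | 'finished'
  | 'report_error'
  | 'resume'
  | 'submit_tool_results_then_resume'
  | 'resume_after_children_finish';

function nextActionFor(runStatus: SketchStatus): SketchNextAction {
  switch (runStatus) {
    case 'completed':
    case 'cancelled':
      // Terminal: an aborted run cannot be resumed.
      return 'finished';
    case 'failed':
      return 'report_error';
    case 'interrupted':
    case 'suspended_step_partial':
      return 'resume';
    case 'suspended_client_tool':
      // Results are submitted as durable state first, then the run is resumed.
      return 'submit_tool_results_then_resume';
    case 'suspended_awaiting_children':
      return 'resume_after_children_finish';
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a status is added.
      const unreachable: never = runStatus;
      throw new Error(`Unhandled status: ${String(unreachable)}`);
    }
  }
}

const sketchStatuses: SketchStatus[] = ['completed', 'suspended_client_tool', 'interrupted'];
const plannedActions = sketchStatuses.map(nextActionFor);
console.log(plannedActions);
```

The never-typed default branch means that adding a new suspended_* status to the union produces a compile error until the switch handles it.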
Submissions are durable state stored on the session. When the caller calls executor.resume():
- The executor atomically increments resumeCount on the session.
- A new workflow instance is started with id ${prefix}__${agentType}__${sessionId}__resume-${resumeCount} (single-dash separator per spec §5; executor.ts:1252).
- The workflow input is { ..., mode: 'resume', resumeCount } (per workflow.ts:497).
- The workflow body runs applyResultsAndReload first, which drains submitted client-tool results, completed children, and approval-gate decisions into the agent's message history.
- The main loop re-enters from the reloaded state.
There are no submitToolResult / runResumed / childSuspended / childWoke signals — those were deleted in A.2 Task 3.3. Resume composes via fresh workflow instances; HTTP callers go through POST /submit-tool-result (handled by the executor's durable submission state) and then POST /resume.
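The resume sequence can be modelled in memory as a rough sketch. Only the workflow ID format and the mode / resumeCount input fields come from this document; the Sketch* types, the pendingSubmissions field, and a counter that starts at 0 are assumptions. The real executor increments the counter atomically in the state store, which a plain in-process increment does not reproduce.

```typescript
interface SketchSession {
  sessionId: string;
  agentType: string;
  resumeCount: number;
  messages: string[];
  pendingSubmissions: string[]; // hypothetical: results submitted while suspended
}

interface SketchWorkflowStart {
  workflowId: string;
  input: {
    agentType: string;
    sessionId: string;
    mode: 'fresh' | 'resume';
    resumeCount: number;
  };
}

// Documented ID format: double underscores between parts, single dash in resume-N.
function buildResumeWorkflowId(
  prefix: string,
  agentType: string,
  sessionId: string,
  resumeCount: number,
): string {
  return `${prefix}__${agentType}__${sessionId}__resume-${resumeCount}`;
}

// Increment the counter, then describe the new workflow instance to start.
function resumeSession(session: SketchSession, prefix: string): SketchWorkflowStart {
  session.resumeCount += 1;
  return {
    workflowId: buildResumeWorkflowId(prefix, session.agentType, session.sessionId, session.resumeCount),
    input: {
      agentType: session.agentType,
      sessionId: session.sessionId,
      mode: 'resume',
      resumeCount: session.resumeCount,
    },
  };
}

// Stand-in for the drain step: move submitted results into the message history.
function drainSubmissions(session: SketchSession): void {
  session.messages.push(...session.pendingSubmissions);
  session.pendingSubmissions = [];
}

const sketchSession: SketchSession = {
  sessionId: 'session-123',
  agentType: 'research-assistant',
  resumeCount: 0,
  messages: ['user: Hello'],
  pendingSubmissions: ['tool-result: 42'],
};

const firstResume = resumeSession(sketchSession, 'helix');
drainSubmissions(sketchSession);
console.log(firstResume.workflowId, sketchSession.messages);
```

Because each resume gets its own workflow ID, every workflow instance runs from start to exit exactly once and never has to wait on a signal.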
Cross-process interrupt
handle.interrupt() writes a durable interrupt flag observed at every step boundary by consumeInterruptFlag (sub-second cadence). The legacy INTERRUPT_SIGNAL_NAME workflow signal is still wired for backward-compat with platform adapters that call handle.signal(INTERRUPT_SIGNAL_NAME, reason), but the durable flag is canonical. Cross-replica interrupts work without sending a Temporal signal at all.
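The read-and-clear behaviour at step boundaries can be sketched with an in-memory flag store. SketchInterruptFlagStore and runSteps are hypothetical names; the real flag lives in the durable state store and is read by the consumeInterruptFlag activity.

```typescript
class SketchInterruptFlagStore {
  private readonly flags = new Map<string, string>();

  // What an interrupt request would do: record the reason durably.
  set(sessionId: string, reason: string): void {
    this.flags.set(sessionId, reason);
  }

  // Read-and-clear: a flag is returned at most once after it has been set.
  consume(sessionId: string): string | undefined {
    const reason = this.flags.get(sessionId);
    this.flags.delete(sessionId);
    return reason;
  }
}

interface SketchRunOutcome {
  outcome: 'completed' | 'interrupted';
  stepsRun: number;
  reason?: string;
}

// Checks the flag before every step, so an interrupt raised during step N
// takes effect before step N + 1 starts.
function runSteps(
  store: SketchInterruptFlagStore,
  sessionId: string,
  maxSteps: number,
  onStep: (step: number) => void,
): SketchRunOutcome {
  for (let step = 0; step < maxSteps; step++) {
    const reason = store.consume(sessionId);
    if (reason !== undefined) {
      return { outcome: 'interrupted', stepsRun: step, reason };
    }
    onStep(step);
  }
  return { outcome: 'completed', stepsRun: maxSteps };
}

const flagStore = new SketchInterruptFlagStore();
const interruptedRun = runSteps(flagStore, 'session-123', 5, (step) => {
  // Simulate another process requesting an interrupt while step 1 is running.
  if (step === 1) flagStore.set('session-123', 'user_requested');
});
console.log(interruptedRun);
```

Clearing the flag as it is read keeps a resumed run from being interrupted again by the same request.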
Resuming after process restart
Temporal provides built-in durability. After worker restart, reconnect via the executor:
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
import { Client, Connection } from '@temporalio/client';
const connection = await Connection.connect({ address: 'localhost:7233' });
const client = new Client({ connection });
const executor = new TemporalAgentExecutor({
client,
taskQueue: 'my-agents',
stateStore,
streamManager,
});
// Reconnect to the existing session and resume
const handle = await executor.getHandle(MyAgent, savedSessionId);
if (handle) {
const { canResume } = await handle.canResume();
if (canResume) {
const resumed = await handle.resume();
const result = await resumed.result();
}
}
Workflow execution semantics
| Action | Workflow Behavior |
|---|---|
| interrupt() | Writes durable interrupt flag → workflow observes at next step → checkpoints + exits as interrupted. |
| resume() | Starts new workflow instance __resume-N with mode: 'resume'; applyResultsAndReload drains state. |
| HITL suspension | Workflow exits with suspended_* status; no signals required. |
| abort() | Writes durable abort flag → workflow observes at next step → terminates as cancelled. |