@helix-agents/agent-server
HTTP server for hosting agents remotely. Provides AgentServer for agent lifecycle management, HTTP adapters for routing, and SSE streaming utilities.
Installation
npm install @helix-agents/agent-server
AgentServer
The main class for hosting agents over HTTP.
Constructor
import { AgentServer } from '@helix-agents/agent-server';
const server = new AgentServer(config: AgentServerConfig);
AgentServerConfig
interface AgentServerConfig {
/** Registry of agents this server can run. Keys are agent type identifiers. */
agents: Record<string, AgentConfig<any, any>>;
/** State store for persisting session state */
stateStore: SessionStateStore;
/** Stream manager for real-time event streaming */
streamManager: StreamManager;
/** Executor (any runtime) */
executor: AgentExecutor;
/**
* Optional authentication hook. Called before each HTTP operation; see
* {@link AuthenticateHook}. If omitted, the constructor throws unless
* `allowUnauthenticated: true` is also set.
*/
authenticate?: AuthenticateHook;
/**
* Explicit opt-in for running without an `authenticate` hook. The
* constructor emits a loud warning when set.
*/
allowUnauthenticated?: boolean;
/** Optional structured logger (defaults to noop). */
logger?: Logger;
/**
* Optional override for the `/submit-tool-result` payload schema.
* Defaults to the conservative `SubmitToolResultSchema` from core
* (256-char IDs, 8 KB error strings, 1 MB serialized result).
*/
submitToolResultSchema?: typeof SubmitToolResultSchema;
/** Whether to leak Zod issue details in 400 responses (defaults `false`). */
verboseValidationErrors?: boolean;
/** Optional human-readable service name surfaced in startup logs. */
serviceName?: string;
/**
* Workspace provider kinds wired into the underlying executor. When
* set, AgentServer validates at construction that every agent declaring
* a workspace provider kind has a matching entry here.
*/
workspaceProviderKinds?: ReadonlyArray<string>;
/**
* Phase 7 chat-handler orchestrator (Layer B). Optional — when set,
* the `/chat` family of routes is wired in. When omitted, those
* routes respond `501 NOT_IMPLEMENTED` and the legacy routes
* remain fully operational. Wire by binding `handleChatStream` from
* `@helix-agents/ai-sdk`:
*
* ```ts
* import { handleChatStream } from '@helix-agents/ai-sdk';
*
* new AgentServer({
* ...,
* chatHandler: (params) => handleChatStream(
* { executor, stateStore, streamManager, agent, logger },
* params,
* ),
* });
* ```
*/
chatHandler?: ChatStreamHandler;
/**
* Deadline (ms) the durable interrupt protocol waits for a status
* transition out of `'active'`/`'paused'` after writing the interrupt
* flag. Default 5000 ms. When the deadline elapses, `interruptAgent` /
* `abortAgent` throw an Error with `deadline exceeded`; the HTTP
* handler maps that to a 504 with code `INTERRUPT_DEADLINE_EXCEEDED`
* (replaces the v6 503 `INTERRUPT_NOT_LOCAL` code).
*/
interruptObservationDeadlineMs?: number;
}
Methods
startAgent
Start a new agent execution. Idempotent — returns the existing session if already running.
const response = await server.startAgent(request: RemoteStartRequest): Promise<RemoteStartResponse>;
Parameters:
interface RemoteStartRequest {
sessionId: string; // Unique session identifier
agentType: string; // Must match a key in the agents registry
message: string | UserInputMessage[]; // Initial user message (string or structured array)
state?: Record<string, unknown>; // Optional initial custom state
metadata?: Record<string, string>; // Optional metadata
}
Returns:
interface RemoteStartResponse {
sessionId: string;
streamId: string;
runId: string;
}
Errors:
- `NOT_FOUND`: Unknown `agentType`
- `ALREADY_COMPLETED`: Session already completed or failed
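The idempotency and error rules above can be modelled with a small in-memory sketch. This is a toy stand-in and not the AgentServer implementation: `FakeAgentServer`, `StartError`, the `finish` hook, and the ID formats are all hypothetical, and `message` is narrowed to `string` for brevity.

```ts
// Toy in-memory model of the documented startAgent rules. Not the real AgentServer.
type SessionStatus = 'running' | 'completed' | 'failed';

interface StartRequest {
  sessionId: string;
  agentType: string;
  message: string; // the documented request also accepts UserInputMessage[]
}

interface StartResponse {
  sessionId: string;
  streamId: string;
  runId: string;
}

// Hypothetical error type carrying two of the documented error codes.
class StartError extends Error {
  code: 'NOT_FOUND' | 'ALREADY_COMPLETED';

  constructor(code: 'NOT_FOUND' | 'ALREADY_COMPLETED', message: string) {
    super(message);
    this.code = code;
  }
}

class FakeAgentServer {
  private agentTypes: Set<string>;
  private sessions = new Map<string, { status: SessionStatus; response: StartResponse }>();
  private runCounter = 0;

  constructor(agentTypes: string[]) {
    this.agentTypes = new Set(agentTypes);
  }

  startAgent(req: StartRequest): StartResponse {
    if (!this.agentTypes.has(req.agentType)) {
      throw new StartError('NOT_FOUND', `Unknown agentType: ${req.agentType}`);
    }
    const existing = this.sessions.get(req.sessionId);
    if (existing !== undefined) {
      if (existing.status !== 'running') {
        throw new StartError('ALREADY_COMPLETED', 'Session already completed or failed');
      }
      return existing.response; // idempotent: a second start returns the same run
    }
    this.runCounter += 1;
    const response: StartResponse = {
      sessionId: req.sessionId,
      streamId: `stream-${req.sessionId}`, // made-up ID format
      runId: `run-${this.runCounter}`, // made-up ID format
    };
    this.sessions.set(req.sessionId, { status: 'running', response });
    return response;
  }

  // Test hook that simulates the run reaching a terminal status.
  finish(sessionId: string, status: 'completed' | 'failed'): void {
    const session = this.sessions.get(sessionId);
    if (session !== undefined) session.status = status;
  }
}
```

In this model a retried start for a running session hands back the original `runId`, which is the property that makes client-side retries of `/start` safe.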
resumeAgent
Resume an interrupted or paused agent execution.
const response = await server.resumeAgent(request: RemoteResumeRequest): Promise<RemoteStartResponse>;
Parameters:
interface RemoteResumeRequest {
sessionId: string;
message?: string | UserInputMessage[]; // Optional message to send on resume
}
Errors:
- `NOT_FOUND`: Session not found
- `ALREADY_COMPLETED`: Session already completed or failed
streamAgent
Subscribe to agent execution events. Returns an async iterable of transport events.
const events = server.streamAgent(
sessionId: string,
fromSequence?: number
): AsyncIterable<TransportEvent>;
Parameters:
- `sessionId`: Session to stream
- `fromSequence`: Skip events up to this sequence number (for reconnection)
Transport events:
type TransportEvent =
| { type: 'chunk'; chunk: StreamChunk; sequence: number }
| { type: 'end'; output?: unknown; state?: Record<string, unknown> }
| { type: 'error'; error: string; recoverable: boolean };
getStatus
Get the current execution status for a session.
const status = await server.getStatus(sessionId: string): Promise<RemoteStatusResponse>;
Returns:
interface RemoteStatusResponse {
sessionId: string;
runId?: string;
status: 'running' | 'completed' | 'failed' | 'interrupted' | 'paused';
stepCount: number;
output?: unknown;
state?: Record<string, unknown>;
error?: string;
isExecuting: boolean;
streamId: string;
latestSequence: number; // Current stream position from StreamManager.getStreamInfo()
}
`latestSequence` reflects the actual stream position from the stream manager. Clients can use this value as `fromSequence` when reconnecting to `/sse` to avoid replaying already-seen events.
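A client can keep track of its own position while consuming events, so that a reconnect has a `fromSequence` value ready. The sketch below is one way to do that; `SequenceCursor` is a hypothetical helper, and it assumes sequence numbers increase within a stream and that `fromSequence` is inclusive of the last event already seen.

```ts
// Local mirror of the documented TransportEvent union; `chunk` is left as unknown
// because StreamChunk's fields are not shown in this reference.
type TransportEvent =
  | { type: 'chunk'; chunk: unknown; sequence: number }
  | { type: 'end'; output?: unknown; state?: Record<string, unknown> }
  | { type: 'error'; error: string; recoverable: boolean };

// Hypothetical helper: remembers the highest sequence seen so far and filters
// out chunks that were already delivered before a reconnect.
class SequenceCursor {
  private last: number | undefined;

  constructor(initial?: number) {
    this.last = initial; // e.g. latestSequence from a status response
  }

  // Returns true when the event is new and should be processed.
  accept(event: TransportEvent): boolean {
    if (event.type !== 'chunk') return true; // end/error events carry no sequence
    if (this.last !== undefined && event.sequence <= this.last) return false;
    this.last = event.sequence;
    return true;
  }

  // Value to pass as fromSequence on the next connection attempt.
  get fromSequence(): number | undefined {
    return this.last;
  }
}
```

Filtering on the client as well as passing `fromSequence` is defensive: it keeps processing correct even if a reconnect replays an event or two.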
interruptAgent
Soft stop — the agent can be resumed later. v7 uses a durable interrupt flag observed at every step boundary, so this works across runtimes (no longer limited to the local-handle case).
await server.interruptAgent(sessionId: string, reason?: string): Promise<void>;
Errors:
- `NOT_FOUND`: Session not found
- Throws an `Error` containing `deadline exceeded` if the underlying runtime fails to observe the flag within `interruptObservationDeadlineMs` (default 5000 ms). The HTTP adapter maps that to 504 `INTERRUPT_DEADLINE_EXCEEDED`.
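Callers that invoke `interruptAgent` directly may want to tell a deadline miss apart from other failures. The following is a minimal sketch that relies only on the documented fact that the error message contains `deadline exceeded`; the helper names and the `InterruptOutcome` type are hypothetical.

```ts
// Hypothetical helper: the only documented contract is that the thrown Error's
// message contains the text "deadline exceeded".
function isInterruptDeadlineError(err: unknown): boolean {
  return err instanceof Error && err.message.includes('deadline exceeded');
}

type InterruptOutcome = 'interrupted' | 'deadline-exceeded';

// Wraps any interrupt call and turns a deadline miss into a value the caller
// can branch on. Every other error is rethrown unchanged.
async function interruptWithOutcome(
  interrupt: () => Promise<void>,
): Promise<InterruptOutcome> {
  try {
    await interrupt();
    return 'interrupted';
  } catch (err) {
    if (isInterruptDeadlineError(err)) return 'deadline-exceeded';
    throw err;
  }
}
```

A call site might look like `await interruptWithOutcome(() => server.interruptAgent(sessionId, 'user cancelled'))`. On `'deadline-exceeded'` the flag has still been written, so polling `getStatus` afterwards is a reasonable follow-up.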
abortAgent
Hard stop — the agent cannot be resumed. Same durable-flag mechanism as interruptAgent.
await server.abortAgent(sessionId: string, reason?: string): Promise<void>;
Errors:
- `NOT_FOUND`: Session not found
- 504 `INTERRUPT_DEADLINE_EXCEEDED` if the runtime does not observe the flag within `interruptObservationDeadlineMs`.
submitToolResult
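Submit a result for a paused client-executed tool. The payload is validated against the default `SubmitToolResultSchema`, or against the schema passed as `submitToolResultSchema` in the server config when one is set.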
const result = await server.submitToolResult(payload: SubmitToolResult): Promise<SubmitToolResultResponse>;
describeWorkspace
Operator-facing introspection of the agent's workspace registry for a session. Returns a discriminated union: { kind: 'ok', snapshot: EntrySnapshot | null }, { kind: 'no-runtime-support' }, or { kind: 'no-registry' }. Most consumers should use the HTTP GET /workspace?sessionId=… route instead (response { workspace: EntrySnapshot | null }, operation tag 'workspace').
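Because the result is a discriminated union, a `switch` on `kind` with an exhaustiveness check is a natural way to consume it. The sketch below mirrors the three documented variants; `explainWorkspace` is a hypothetical helper, `EntrySnapshot` is kept generic because its fields are not shown here, and the wording of each message is only a reading of the variant names.

```ts
// Local mirror of the documented result union.
type DescribeWorkspaceResult<TSnapshot> =
  | { kind: 'ok'; snapshot: TSnapshot | null }
  | { kind: 'no-runtime-support' }
  | { kind: 'no-registry' };

// Hypothetical helper that turns each variant into an operator-facing message.
function explainWorkspace<TSnapshot>(result: DescribeWorkspaceResult<TSnapshot>): string {
  switch (result.kind) {
    case 'ok':
      return result.snapshot === null
        ? 'workspace lookup succeeded, but there is no snapshot for this session'
        : 'workspace snapshot available';
    case 'no-runtime-support':
      return 'the runtime does not support workspace introspection';
    case 'no-registry':
      return 'no workspace registry is available for this session';
    default: {
      // Compile-time guard: fails to type-check if a new variant is added.
      const unreachable: never = result;
      return unreachable;
    }
  }
}
```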
Durable interrupt model
v7 replaced v6's in-memory handle tracking with a durable-flag protocol observed at every step boundary by every runtime. interruptAgent and abortAgent work whether the session is running on this replica, a different replica, or about to be picked up by a worker — no INTERRUPT_NOT_LOCAL 503 code remains. Cross-replica interrupts surface as 504 INTERRUPT_DEADLINE_EXCEEDED only when the underlying runtime fails to observe the flag within interruptObservationDeadlineMs.
AgentServerError
Error class thrown by AgentServer methods.
class AgentServerError extends Error {
code:
| 'NOT_FOUND'
| 'ALREADY_RUNNING'
| 'ALREADY_COMPLETED'
| 'INVALID_REQUEST'
| 'INTERNAL_ERROR'
| 'WORKSPACE_WIRING'
| 'NOT_IMPLEMENTED';
}
HTTP status mapping:
| Code | HTTP Status |
|---|---|
| NOT_FOUND | 404 |
| ALREADY_RUNNING | 409 |
| ALREADY_COMPLETED | 409 |
| INVALID_REQUEST | 400 |
| INTERNAL_ERROR | 500 |
| WORKSPACE_WIRING | 500 |
| NOT_IMPLEMENTED | 501 |
| INTERRUPT_DEADLINE_EXCEEDED* | 504 |
| PAYLOAD_TOO_LARGE* | 413 |
| LENGTH_REQUIRED* | 411 |
| UNAUTHORIZED* | 401 / 403 |
* Surfaced from the HTTP adapter directly rather than from AgentServerError.
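For custom adapters or client-side error handling, the table can be written down as a lookup. This is a sketch of the mapping as documented above, not code taken from the package; `httpStatusFor` and its fallback to 500 for unknown codes are assumptions.

```ts
// Codes from the mapping table. The last four are surfaced by the HTTP adapter
// rather than by AgentServerError.
type ErrorCode =
  | 'NOT_FOUND'
  | 'ALREADY_RUNNING'
  | 'ALREADY_COMPLETED'
  | 'INVALID_REQUEST'
  | 'INTERNAL_ERROR'
  | 'WORKSPACE_WIRING'
  | 'NOT_IMPLEMENTED'
  | 'INTERRUPT_DEADLINE_EXCEEDED'
  | 'PAYLOAD_TOO_LARGE'
  | 'LENGTH_REQUIRED'
  | 'UNAUTHORIZED';

const HTTP_STATUS: Record<ErrorCode, number> = {
  NOT_FOUND: 404,
  ALREADY_RUNNING: 409,
  ALREADY_COMPLETED: 409,
  INVALID_REQUEST: 400,
  INTERNAL_ERROR: 500,
  WORKSPACE_WIRING: 500,
  NOT_IMPLEMENTED: 501,
  INTERRUPT_DEADLINE_EXCEEDED: 504,
  PAYLOAD_TOO_LARGE: 413,
  LENGTH_REQUIRED: 411,
  UNAUTHORIZED: 401, // 403 when the authenticate hook asks for it
};

// Hypothetical helper. Falling back to 500 for unknown codes is an assumption,
// not documented behaviour.
function httpStatusFor(code: string): number {
  return Object.prototype.hasOwnProperty.call(HTTP_STATUS, code)
    ? HTTP_STATUS[code as ErrorCode]
    : 500;
}
```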
Authentication
AuthenticateHook
Authentication hook called before every HTTP operation. Set via AgentServerConfig.authenticate; the server is fail-closed without it (constructor throws unless allowUnauthenticated: true is set).
type AuthenticateHook = (
request: AgentHttpRequest,
operation: AgentServerOperation
) => Promise<AuthenticateResult> | AuthenticateResult;
type AuthenticateResult = boolean | { error: string; status?: number };
- `true` → request proceeds.
- `false` → generic 401.
- `{ error, status? }` → custom rejection (use `status: 403` for "authenticated but not authorized").
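As an illustration of the three result shapes, here is a sketch of a bearer-token hook. The types are local mirrors of the documented ones; `makeBearerAuth`, the two-token policy, and the assumption that header names arrive lower-cased are all hypothetical.

```ts
// Local mirrors of the documented request and result types.
interface AgentHttpRequest {
  method: string;
  url: string;
  headers: Record<string, string | string[] | undefined>;
  body?: unknown;
  query?: Record<string, string | string[] | undefined>;
}

type AuthenticateResult = boolean | { error: string; status?: number };

// Hypothetical policy: a user token may call everything except the operator-only
// 'workspace' operation; an operator token may call anything.
function makeBearerAuth(tokens: { user: string; operator: string }) {
  // `operation` is an AgentServerOperation in the real types; string keeps this short.
  return (request: AgentHttpRequest, operation: string): AuthenticateResult => {
    // Assumes lower-cased header names, as Node's HTTP server provides.
    const raw = request.headers['authorization'];
    const header = Array.isArray(raw) ? raw[0] : raw;
    if (header === undefined || !header.startsWith('Bearer ')) return false; // generic 401
    const token = header.slice('Bearer '.length);
    // A production hook should compare tokens in constant time.
    if (token === tokens.operator) return true;
    if (token !== tokens.user) return false;
    if (operation === 'workspace') {
      return { error: 'operator token required', status: 403 }; // authenticated, not authorized
    }
    return true;
  };
}
```

The returned function has the shape of an `AuthenticateHook` and could be passed as `authenticate` in the server config.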
AgentServerOperation
Stable operation tags passed to the hook. Names are NOT 1:1 with paths — they're preserved across releases so existing switches keep working.
type AgentServerOperation =
| 'start'
| 'resume'
| 'stream' // GET /sse
| 'status'
| 'interrupt'
| 'abort'
| 'submit-tool-result'
| 'workspace'
// Layer B chat routes (Phase 7 Task 7.8)
| 'chat'
| 'chat-stream'
| 'chat-submit-tool-result'
| 'chat-interrupt'
| 'chat-abort';
HTTP Adapters
createHttpAdapter
Creates a generic HTTP handler from an AgentServer instance. Framework-agnostic.
import { createHttpAdapter } from '@helix-agents/agent-server';
const handler: AgentHttpHandler = createHttpAdapter(server);
AgentHttpHandler signature:
interface AgentHttpHandler {
handleRequest(req: AgentHttpRequest): Promise<AgentHttpResponse>;
}
AgentHttpRequest:
interface AgentHttpRequest {
method: string;
url: string;
headers: Record<string, string | string[] | undefined>;
body?: unknown;
query?: Record<string, string | string[] | undefined>;
}
AgentHttpResponse:
interface AgentHttpResponse {
status: number;
headers: Record<string, string>;
body: string | ReadableStream<Uint8Array>;
}
Routes handled:
| Layer | Method | Path | Action |
|---|---|---|---|
| A | POST | /start | Start agent execution |
| A | POST | /resume | Resume agent execution |
| A | GET | /sse | SSE event stream |
| A | GET | /status | Get execution status |
| A | POST | /interrupt | Soft stop (durable-flag protocol) |
| A | POST | /abort | Hard stop (durable-flag protocol) |
| A | POST | /submit-tool-result | Submit a client-executed tool result |
| A | GET | /workspace | Operator introspection of the workspace registry for a session |
| B | POST | /chat | AI SDK v6-native chat stream (requires chatHandler) |
| B | GET | /chat/{sessionId}/stream | Reconnect to an active chat stream |
| B | POST | /chat/{sessionId}/submit-tool-result | Chat-route variant of /submit-tool-result (sessionId in path) |
| B | POST | /chat/{sessionId}/interrupt | Chat-route soft stop |
| B | POST | /chat/{sessionId}/abort | Chat-route hard stop |
Layer B routes respond 501 NOT_IMPLEMENTED when the server is constructed without a chatHandler.
createExpressAdapter
Wraps an AgentHttpHandler as Express middleware.
import { createExpressAdapter } from '@helix-agents/agent-server';
const middleware = createExpressAdapter(handler: AgentHttpHandler);
app.use('/agents', middleware);
Features:
- Uses `req.path` (relative to mount point) for routing
- Handles both string and `ReadableStream` responses
- Pipes SSE streams with proper disconnect cleanup
SSE Utilities
createSSEStream
Creates a pull-based ReadableStream that emits SSE-formatted data from an async source. The consumer controls the pace — no unbounded buffering. Heartbeats are emitted as SSE comments when the source has no events within the heartbeat interval.
import { createSSEStream, type SSEEvent } from '@helix-agents/agent-server';
async function* events(): AsyncGenerator<SSEEvent> {
yield { id: '1', event: 'chunk', data: '{"hello":"world"}' };
yield { event: 'end', data: '{"done":true}' };
}
const stream: ReadableStream<Uint8Array> = createSSEStream(events(), {
heartbeatIntervalMs: 15_000, // default
});
SSEEvent:
interface SSEEvent {
id?: string;
event: string;
data: string;
}
The stream automatically sends `: heartbeat` comments every 15 seconds (configurable). The stream closes when the source is exhausted; errors are logged and the stream closes gracefully.
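To make the wire format concrete, the sketch below serialises an `SSEEvent` using standard Server-Sent Events framing. It is written from the SSE format itself; whether `createSSEStream` emits exactly these bytes is an assumption, and `formatSSEEvent` and `HEARTBEAT_FRAME` are hypothetical names.

```ts
// Local mirror of the documented SSEEvent shape.
interface SSEEvent {
  id?: string;
  event: string;
  data: string;
}

// Serialises one event: optional id, event name, one `data:` line per line of
// payload, then a blank line that terminates the event.
function formatSSEEvent(e: SSEEvent): string {
  const lines: string[] = [];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  lines.push(`event: ${e.event}`);
  for (const part of e.data.split('\n')) lines.push(`data: ${part}`);
  return lines.join('\n') + '\n\n';
}

// A comment line starts with a colon, so SSE clients skip it.
const HEARTBEAT_FRAME = ': heartbeat\n\n';
```

Splitting the payload on newlines matters because a raw newline inside a single `data:` line would end the field early.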
expiredSessionCleanup
Operator-driven sweep that scans the state store for sessions whose expiresAt has elapsed, prunes their workspace snapshots (best-effort), and CAS-transitions the session status to 'failed' with reason 'session_expired'. Not run automatically by the framework — wire it into a cron / Cloudflare Alarm / k8s CronJob yourself.
import { expiredSessionCleanup } from '@helix-agents/agent-server';
const summary = await expiredSessionCleanup({
stateStore,
workspaceProviders, // optional Map<string, WorkspaceProvider>
logger, // optional structured logger (defaults to noop)
pageSize: 200, // optional override (default 200)
now: () => Date.now(), // optional clock override for tests
});
Returns:
interface ExpiredSessionCleanupResult {
detected: number; // expired sessions detected
marked: number; // successfully transitioned to 'failed'
alreadyTerminal: number; // CAS rejected (already terminal)
snapshotsDeleted: number; // snapshots successfully deleted
errors: number; // per-session errors that were logged
}
Per-session failures (load error, snapshot error, CAS conflict) are logged but do not abort the loop: one bad session must not block cleanup of the rest.
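The error-isolation behaviour described above follows a common pattern: wrap each session's work in its own `try`/`catch` and keep counting. The sketch below shows that pattern in isolation. It is not the package's implementation; `sweepExpired` and `MarkExpired` are hypothetical, snapshot pruning is left out, and treating a rejected CAS as `alreadyTerminal` is an assumption.

```ts
// Counters mirroring part of the documented ExpiredSessionCleanupResult.
interface SweepSummary {
  detected: number;
  marked: number;
  alreadyTerminal: number;
  errors: number;
}

// Hypothetical per-session step: resolves to true when the transition to
// 'failed' succeeded, false when the session was already terminal.
type MarkExpired = (sessionId: string) => Promise<boolean>;

async function sweepExpired(
  sessionIds: string[],
  markExpired: MarkExpired,
): Promise<SweepSummary> {
  const summary: SweepSummary = { detected: 0, marked: 0, alreadyTerminal: 0, errors: 0 };
  for (const sessionId of sessionIds) {
    summary.detected += 1;
    try {
      if (await markExpired(sessionId)) summary.marked += 1;
      else summary.alreadyTerminal += 1;
    } catch {
      // A real sweep would log the failure here; the loop carries on regardless.
      summary.errors += 1;
    }
  }
  return summary;
}
```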
HttpRemoteAgentTransport
Client-side transport for communicating with a remote AgentServer. Defined in @helix-agents/core.
import { HttpRemoteAgentTransport } from '@helix-agents/core';
Constructor
interface HttpTransportConfig {
/** Base URL of the remote agent server */
url: string;
/** Static headers or async function returning headers */
headers?:
| Record<string, string>
| (() => Record<string, string> | Promise<Record<string, string>>);
/** Maximum retry attempts for failed requests (default: 3) */
maxRetries?: number;
/** Base delay between retries in ms (default: 1000) */
retryBaseDelayMs?: number;
}
Methods
Implements the RemoteAgentTransport interface:
interface RemoteAgentTransport {
start(request: RemoteStartRequest): Promise<RemoteStartResponse>;
resume(request: RemoteResumeRequest): Promise<RemoteStartResponse>;
stream(
sessionId: string,
options?: {
fromSequence?: number;
signal?: AbortSignal;
}
): AsyncIterable<TransportEvent>;
getStatus(sessionId: string): Promise<RemoteStatusResponse>;
interrupt(sessionId: string, reason?: string): Promise<void>;
abort(sessionId: string, reason?: string): Promise<void>;
}
Retry Behavior
- 5xx errors: Retried with exponential backoff (`baseDelay * 2^attempt`)
- 4xx errors: Not retried
- Network errors: Retried with backoff
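The retry rules above reduce to two small functions. This sketch takes the formula `baseDelay * 2^attempt` and the documented default of 1000 ms at face value; the function names are hypothetical, and treating the first retry as attempt 0 is an assumption.

```ts
// Delay before a given retry attempt, following baseDelay * 2^attempt.
// Counting attempts from 0 is an assumption.
function retryDelayMs(attempt: number, baseDelayMs: number = 1000): number {
  return baseDelayMs * Math.pow(2, attempt);
}

// Retry decision from the list above: 5xx and network errors are retried,
// 4xx responses are not.
function shouldRetry(outcome: number | 'network-error'): boolean {
  if (outcome === 'network-error') return true;
  return outcome >= 500 && outcome <= 599;
}
```

With the defaults (`maxRetries: 3`, `retryBaseDelayMs: 1000`) and 0-based attempts, the waits would be 1000 ms, 2000 ms, and 4000 ms.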
SSE Parsing
The transport uses native ReadableStream parsing (not EventSource) for SSE consumption. It handles:
- Multi-line `data:` fields
- `id:` fields for sequence tracking
- `event:` fields for event type discrimination
- `: heartbeat` comments (ignored)
- `AbortSignal` for stream cancellation
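The field handling in the list above can be illustrated with a small parser. This is a simplified sketch and not the transport's code: it parses a complete text buffer rather than an incremental `ReadableStream`, resets `id` after each event, and leaves out `AbortSignal` handling. `parseSSE` and `ParsedSSEEvent` are hypothetical names.

```ts
interface ParsedSSEEvent {
  id?: string;
  event: string;
  data: string;
}

// Parses a complete SSE text buffer into events.
function parseSSE(text: string): ParsedSSEEvent[] {
  const events: ParsedSSEEvent[] = [];
  let id: string | undefined;
  let event = 'message'; // SSE default event type
  let data: string[] = [];

  for (const line of text.split(/\r?\n/)) {
    if (line === '') {
      // A blank line terminates the current event, if any data was collected.
      if (data.length > 0) {
        const parsed: ParsedSSEEvent = { event, data: data.join('\n') };
        if (id !== undefined) parsed.id = id;
        events.push(parsed);
      }
      id = undefined;
      event = 'message';
      data = [];
      continue;
    }
    if (line.startsWith(':')) continue; // comment, e.g. ": heartbeat"
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
    if (field === 'id') id = value;
    else if (field === 'event') event = value;
    else if (field === 'data') data.push(value); // multi-line data joins with \n
  }
  return events;
}
```

An event that is not yet terminated by a blank line is dropped here, which is why a streaming parser has to carry leftover text over to the next network chunk.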
createRemoteSubAgentTool
Factory function for creating a tool that delegates to a remote agent. Defined in @helix-agents/core.
import { createRemoteSubAgentTool } from '@helix-agents/core';
Signature
function createRemoteSubAgentTool<TInput, TOutput>(
name: string,
config: {
description: string;
inputSchema: z.ZodType<TInput>;
outputSchema: z.ZodType<TOutput>;
transport: RemoteAgentTransport;
remoteAgentType: string;
timeoutMs: number;
streamRetries?: number;
streamRetryBaseMs?: number;
}
): RemoteSubAgentTool<TInput, TOutput>;
Parameters
| Parameter | Type | Description |
|---|---|---|
| name | string | Tool name (automatically prefixed with subagent__) |
| description | string | Description shown to the LLM |
| inputSchema | z.ZodType | Zod schema for tool input |
| outputSchema | z.ZodType | Zod schema for expected output |
| transport | RemoteAgentTransport | Transport instance for HTTP communication |
| remoteAgentType | string | Agent type identifier (must match server registry key) |
| timeoutMs | number | Maximum execution time in milliseconds (must be a finite positive number) |
| streamRetries | number (optional) | Max stream reconnection attempts on SSE drop (default: 3, range: 0-50) |
| streamRetryBaseMs | number (optional) | Base delay in ms between stream retries, doubled each attempt (default: 100) |
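The numeric constraints and defaults in the table can be sketched as a small resolver. This is an illustration of the documented ranges, not the factory's own validation: `resolveStreamRetryOptions` and `streamRetryDelays` are hypothetical, requiring `streamRetries` to be an integer is an assumption, and so is the first retry waiting exactly `streamRetryBaseMs`.

```ts
interface StreamRetryOptions {
  timeoutMs: number;
  streamRetries?: number;
  streamRetryBaseMs?: number;
}

// Applies the documented defaults (3 retries, 100 ms base) and range checks.
function resolveStreamRetryOptions(opts: StreamRetryOptions): Required<StreamRetryOptions> {
  if (!Number.isFinite(opts.timeoutMs) || opts.timeoutMs <= 0) {
    throw new RangeError('timeoutMs must be a finite positive number');
  }
  const streamRetries = opts.streamRetries ?? 3;
  // The integer requirement is an assumption; the table only gives the range 0-50.
  if (!Number.isInteger(streamRetries) || streamRetries < 0 || streamRetries > 50) {
    throw new RangeError('streamRetries must be an integer between 0 and 50');
  }
  const streamRetryBaseMs = opts.streamRetryBaseMs ?? 100;
  return { timeoutMs: opts.timeoutMs, streamRetries, streamRetryBaseMs };
}

// Delay before each reconnection attempt, doubling from the base.
function streamRetryDelays(retries: number, baseMs: number): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retries; attempt += 1) {
    delays.push(baseMs * Math.pow(2, attempt));
  }
  return delays;
}
```

Under these assumptions the default settings give reconnection waits of 100 ms, 200 ms, and 400 ms.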
RemoteSubAgentTool
interface RemoteSubAgentTool<TInput, TOutput> extends Tool<TInput, TOutput> {
_isRemoteSubAgent: true;
_remoteConfig: {
transport: RemoteAgentTransport;
remoteAgentType: string;
timeoutMs: number;
streamRetries: number; // Default: 3
streamRetryBaseMs: number; // Default: 100
};
}
Input-to-Message Mapping
When the tool executes, the full tool input is JSON-serialized and sent as the message field in the RemoteStartRequest. The remote agent server is responsible for parsing the JSON and constructing the appropriate user message for its agent.
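Both halves of this mapping can be sketched in a few lines. The first function shows the documented behaviour (tool input serialised into `message`); the second is one hypothetical way a server might turn that JSON back into prompt text. `buildStartRequest`, `renderUserMessage`, and the `key: value` rendering are illustrative assumptions.

```ts
// Local mirror of the documented request, with message narrowed to string.
interface RemoteStartRequest {
  sessionId: string;
  agentType: string;
  message: string;
}

// Caller side: the full tool input becomes the JSON-serialized message.
function buildStartRequest(
  sessionId: string,
  remoteAgentType: string,
  input: Record<string, unknown>,
): RemoteStartRequest {
  return { sessionId, agentType: remoteAgentType, message: JSON.stringify(input) };
}

// Server side (hypothetical): render the JSON object as "key: value" lines.
// Falls back to the raw string when the message is not a JSON object.
function renderUserMessage(message: string): string {
  let parsed: unknown;
  try {
    parsed = JSON.parse(message);
  } catch {
    return message;
  }
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    return message;
  }
  const record = parsed as Record<string, unknown>;
  return Object.keys(record)
    .map((key) => {
      const value = record[key];
      return `${key}: ${typeof value === 'string' ? value : JSON.stringify(value)}`;
    })
    .join('\n');
}
```

Keeping the fallback for non-JSON text means the same agent can still be started by a human-facing client that sends plain strings.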
Runtime Behavior
All three runtimes provide first-class remote sub-agent support:
- JS Runtime: Intercepts via `isRemoteSubAgentTool()` for enhanced handling (stream proxying, `SubSessionRef` tracking with `remote` metadata, timeout management via `AbortSignal`, reconnection on resume)
- Temporal Runtime: Dedicated `executeRemoteSubAgentCall` activity with deterministic session IDs, crash recovery via `transport.getStatus()`, heartbeat-based reconnection, stream proxying, and interrupt propagation
- Cloudflare Runtime: Dedicated `executeRemoteSubAgentCall` step with deterministic session IDs, crash recovery, stream proxying, interrupt propagation via abort-check interval, and timeout enforcement
The tool's built-in execute() method exists as a fallback but is not used by any of the pre-built runtimes.
Types
All types are exported from @helix-agents/agent-server:
import type {
AgentServerConfig,
AgentHttpHandler,
AgentHttpRequest,
AgentHttpResponse,
AgentServerOperation,
AuthenticateHook,
AuthenticateResult,
ChatStreamHandler,
ExpiredSessionCleanupDeps,
ExpiredSessionCleanupResult,
} from '@helix-agents/agent-server';
Transport protocol types are exported from @helix-agents/core:
import type {
RemoteStartRequest,
RemoteResumeRequest,
RemoteStartResponse,
RemoteStatusResponse,
RemoteAgentErrorResponse,
TransportEvent,
RemoteAgentTransport,
} from '@helix-agents/core';
See Also
- Remote Agents Guide — Concepts, patterns, and production setup
- Remote Agents Example — Full working example
- Sub-Agents Guide — Local sub-agent orchestration