@helix-agents/agent-server

HTTP server for hosting agents remotely. Provides AgentServer for agent lifecycle management, HTTP adapters for routing, and SSE streaming utilities.

Installation

bash
npm install @helix-agents/agent-server

AgentServer

The main class for hosting agents over HTTP.

Constructor

typescript
import { AgentServer } from '@helix-agents/agent-server';

// config: AgentServerConfig (described below)
const server = new AgentServer(config);

AgentServerConfig

typescript
interface AgentServerConfig {
  /** Registry of agents this server can run. Keys are agent type identifiers. */
  agents: Record<string, AgentConfig<any, any>>;

  /** State store for persisting session state */
  stateStore: SessionStateStore;

  /** Stream manager for real-time event streaming */
  streamManager: StreamManager;

  /** Executor (any runtime) */
  executor: AgentExecutor;

  /**
   * Optional authentication hook. Called before each HTTP operation; see
   * {@link AuthenticateHook}. If omitted, the constructor throws unless
   * `allowUnauthenticated: true` is also set.
   */
  authenticate?: AuthenticateHook;

  /**
   * Explicit opt-in for running without an `authenticate` hook. The
   * constructor emits a loud warning when set.
   */
  allowUnauthenticated?: boolean;

  /** Optional structured logger (defaults to noop). */
  logger?: Logger;

  /**
   * Optional override for the `/submit-tool-result` payload schema.
   * Defaults to the conservative `SubmitToolResultSchema` from core
   * (256-char IDs, 8 KB error strings, 1 MB serialized result).
   */
  submitToolResultSchema?: typeof SubmitToolResultSchema;

  /** Whether to leak Zod issue details in 400 responses (defaults `false`). */
  verboseValidationErrors?: boolean;

  /** Optional human-readable service name surfaced in startup logs. */
  serviceName?: string;

  /**
   * Workspace provider kinds wired into the underlying executor. When
   * set, AgentServer validates at construction that every agent declaring
   * a workspace provider kind has a matching entry here.
   */
  workspaceProviderKinds?: ReadonlyArray<string>;

  /**
   * Phase 7 chat-handler orchestrator (Layer B). Optional — when set,
   * the `/chat` family of routes is wired in. When omitted, those
   * routes respond `501 NOT_IMPLEMENTED` and the legacy routes
   * remain fully operational. Wire by binding `handleChatStream` from
   * `@helix-agents/ai-sdk`:
   *
   * ```ts
   * import { handleChatStream } from '@helix-agents/ai-sdk';
   *
   * new AgentServer({
   *   ...,
   *   chatHandler: (params) => handleChatStream(
   *     { executor, stateStore, streamManager, agent, logger },
   *     params,
   *   ),
   * });
   * ```
   */
  chatHandler?: ChatStreamHandler;

  /**
   * Deadline (ms) the durable interrupt protocol waits for a status
   * transition out of `'active'`/`'paused'` after writing the interrupt
   * flag. Default 5000 ms. When the deadline elapses, `interruptAgent` /
   * `abortAgent` throw an Error with `deadline exceeded`; the HTTP
   * handler maps that to a 504 with code `INTERRUPT_DEADLINE_EXCEEDED`
   * (replaces the v6 503 `INTERRUPT_NOT_LOCAL` code).
   */
  interruptObservationDeadlineMs?: number;
}

Methods

startAgent

Start a new agent execution. Idempotent — returns the existing session if already running.

typescript
// request: RemoteStartRequest
const response: RemoteStartResponse = await server.startAgent(request);

Parameters:

typescript
interface RemoteStartRequest {
  sessionId: string; // Unique session identifier
  agentType: string; // Must match a key in the agents registry
  message: string | UserInputMessage[]; // Initial user message (string or structured array)
  state?: Record<string, unknown>; // Optional initial custom state
  metadata?: Record<string, string>; // Optional metadata
}

Returns:

typescript
interface RemoteStartResponse {
  sessionId: string;
  streamId: string;
  runId: string;
}

Errors:

  • NOT_FOUND — Unknown agentType
  • ALREADY_COMPLETED — Session already completed or failed

resumeAgent

Resume an interrupted or paused agent execution.

typescript
// request: RemoteResumeRequest
const response: RemoteStartResponse = await server.resumeAgent(request);

Parameters:

typescript
interface RemoteResumeRequest {
  sessionId: string;
  message?: string | UserInputMessage[]; // Optional message to send on resume
}

Errors:

  • NOT_FOUND — Session not found
  • ALREADY_COMPLETED — Session already completed or failed

streamAgent

Subscribe to agent execution events. Returns an async iterable of transport events.

typescript
// sessionId: string, fromSequence?: number
const events: AsyncIterable<TransportEvent> = server.streamAgent(sessionId, fromSequence);

Parameters:

  • sessionId — Session to stream
  • fromSequence — Skip events up to this sequence number (for reconnection)

Transport events:

typescript
type TransportEvent =
  | { type: 'chunk'; chunk: StreamChunk; sequence: number }
  | { type: 'end'; output?: unknown; state?: Record<string, unknown> }
  | { type: 'error'; error: string; recoverable: boolean };
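
As a rough illustration of how a consumer might fold these events into client-side state, the sketch below tracks the last chunk sequence seen so that it can be passed back as fromSequence after a reconnect. StreamProgress, initialProgress, and applyEvent are hypothetical names introduced here, and StreamChunk is left opaque because its shape is not given in this reference.

```typescript
// Local stand-ins for the documented types; StreamChunk is left opaque.
type StreamChunk = unknown;

type TransportEvent =
  | { type: 'chunk'; chunk: StreamChunk; sequence: number }
  | { type: 'end'; output?: unknown; state?: Record<string, unknown> }
  | { type: 'error'; error: string; recoverable: boolean };

// Hypothetical client-side bookkeeping, not part of the package.
interface StreamProgress {
  lastSequence: number | undefined; // sequence of the most recent chunk
  chunks: StreamChunk[];
  outcome: 'open' | 'ended' | 'retry' | 'failed';
  output: unknown;
  error: string | undefined;
}

const initialProgress: StreamProgress = {
  lastSequence: undefined,
  chunks: [],
  outcome: 'open',
  output: undefined,
  error: undefined,
};

// Pure reducer: returns the next progress value for one transport event.
function applyEvent(progress: StreamProgress, event: TransportEvent): StreamProgress {
  switch (event.type) {
    case 'chunk':
      return {
        ...progress,
        chunks: [...progress.chunks, event.chunk],
        lastSequence: event.sequence,
      };
    case 'end':
      return { ...progress, outcome: 'ended', output: event.output };
    case 'error':
      // Recoverable errors are candidates for a reconnect from lastSequence.
      return {
        ...progress,
        outcome: event.recoverable ? 'retry' : 'failed',
        error: event.error,
      };
  }
}
```

When outcome is 'retry', a caller could invoke streamAgent again with lastSequence as fromSequence; whether that value is treated as inclusive or exclusive should be confirmed against the server's behavior.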

getStatus

Get the current execution status for a session.

typescript
// sessionId: string
const status: RemoteStatusResponse = await server.getStatus(sessionId);

Returns:

typescript
interface RemoteStatusResponse {
  sessionId: string;
  runId?: string;
  status: 'running' | 'completed' | 'failed' | 'interrupted' | 'paused';
  stepCount: number;
  output?: unknown;
  state?: Record<string, unknown>;
  error?: string;
  isExecuting: boolean;
  streamId: string;
  latestSequence: number; // Current stream position from StreamManager.getStreamInfo()
}

latestSequence reflects the actual stream position from the stream manager. Clients can use this value as fromSequence when reconnecting to /sse to avoid replaying already-seen events.

interruptAgent

Soft stop — the agent can be resumed later. v7 uses a durable interrupt flag observed at every step boundary, so this works across runtimes (no longer limited to the local-handle case).

typescript
// sessionId: string, reason?: string; resolves to void
await server.interruptAgent(sessionId, reason);

Errors:

  • NOT_FOUND — Session not found
  • Throws an Error whose message contains "deadline exceeded" if the underlying runtime fails to observe the flag within interruptObservationDeadlineMs (default 5000 ms). The HTTP adapter maps that to 504 INTERRUPT_DEADLINE_EXCEEDED.

abortAgent

Hard stop — the agent cannot be resumed. Same durable-flag mechanism as interruptAgent.

typescript
// sessionId: string, reason?: string; resolves to void
await server.abortAgent(sessionId, reason);

Errors:

  • NOT_FOUND — Session not found
  • Throws an Error whose message contains "deadline exceeded" if the runtime does not observe the flag within interruptObservationDeadlineMs. The HTTP adapter maps that to 504 INTERRUPT_DEADLINE_EXCEEDED.

submitToolResult

Submit a result for a paused client-executed tool. The payload is validated against the default SubmitToolResultSchema, or against the override supplied via AgentServerConfig.submitToolResultSchema.

typescript
// payload: SubmitToolResult
const result: SubmitToolResultResponse = await server.submitToolResult(payload);

describeWorkspace

Operator-facing introspection of the agent's workspace registry for a session. Returns a discriminated union:

  • { kind: 'ok', snapshot: EntrySnapshot | null }
  • { kind: 'no-runtime-support' }
  • { kind: 'no-registry' }

Most consumers should use the HTTP GET /workspace?sessionId=… route instead, which responds with { workspace: EntrySnapshot | null } and uses the operation tag 'workspace'.

Durable interrupt model

v7 replaced v6's in-memory handle tracking with a durable-flag protocol observed at every step boundary by every runtime. interruptAgent and abortAgent work whether the session is running on this replica, a different replica, or about to be picked up by a worker — no INTERRUPT_NOT_LOCAL 503 code remains. Cross-replica interrupts surface as 504 INTERRUPT_DEADLINE_EXCEEDED only when the underlying runtime fails to observe the flag within interruptObservationDeadlineMs.
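
To make the deadline behavior concrete, here is a minimal sketch of what a write-flag-then-observe loop could look like. It is not the package's implementation: InterruptDeps, interruptWithDeadline, and the 100 ms polling interval are assumptions made for illustration, with storage and clock injected so the logic can be exercised in isolation.

```typescript
// Hypothetical dependencies, injected for illustration only.
interface InterruptDeps {
  writeInterruptFlag(sessionId: string): Promise<void>;
  readStatus(sessionId: string): Promise<string>;
  sleep(ms: number): Promise<void>;
  now(): number;
}

// Writes the flag, then polls until the status leaves 'active'/'paused'
// or the observation deadline elapses.
async function interruptWithDeadline(
  deps: InterruptDeps,
  sessionId: string,
  deadlineMs = 5000, // documented default of interruptObservationDeadlineMs
  pollMs = 100 // assumed polling interval
): Promise<string> {
  await deps.writeInterruptFlag(sessionId);
  const deadline = deps.now() + deadlineMs;
  for (;;) {
    const status = await deps.readStatus(sessionId);
    if (status !== 'active' && status !== 'paused') return status;
    if (deps.now() >= deadline) {
      // An HTTP layer would map this to 504 INTERRUPT_DEADLINE_EXCEEDED.
      throw new Error(`interrupt deadline exceeded after ${deadlineMs} ms`);
    }
    await deps.sleep(pollMs);
  }
}
```

Because the flag is durable and the status is read from shared storage, a loop of this shape does not need to know which replica is running the session.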

AgentServerError

Error class thrown by AgentServer methods.

typescript
class AgentServerError extends Error {
  code:
    | 'NOT_FOUND'
    | 'ALREADY_RUNNING'
    | 'ALREADY_COMPLETED'
    | 'INVALID_REQUEST'
    | 'INTERNAL_ERROR'
    | 'WORKSPACE_WIRING'
    | 'NOT_IMPLEMENTED';
}

HTTP status mapping:

| Code | HTTP Status |
| --- | --- |
| NOT_FOUND | 404 |
| ALREADY_RUNNING | 409 |
| ALREADY_COMPLETED | 409 |
| INVALID_REQUEST | 400 |
| INTERNAL_ERROR | 500 |
| WORKSPACE_WIRING | 500 |
| NOT_IMPLEMENTED | 501 |
| INTERRUPT_DEADLINE_EXCEEDED* | 504 |
| PAYLOAD_TOO_LARGE* | 413 |
| LENGTH_REQUIRED* | 411 |
| UNAUTHORIZED* | 401 / 403 |

* Surfaced from the HTTP adapter directly rather than from AgentServerError.
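
For client code or tests that need this mapping, it can be restated as a lookup table. The sketch below is a transcription of the table, not code exported by the package. STATUS_BY_CODE and statusForCode are hypothetical names, and UNAUTHORIZED is left out because its status (401 or 403) depends on the authentication hook.

```typescript
// Transcribed from the status table; not an export of the package.
const STATUS_BY_CODE = {
  NOT_FOUND: 404,
  ALREADY_RUNNING: 409,
  ALREADY_COMPLETED: 409,
  INVALID_REQUEST: 400,
  INTERNAL_ERROR: 500,
  WORKSPACE_WIRING: 500,
  NOT_IMPLEMENTED: 501,
  // Adapter-level codes (not AgentServerError codes):
  INTERRUPT_DEADLINE_EXCEEDED: 504,
  PAYLOAD_TOO_LARGE: 413,
  LENGTH_REQUIRED: 411,
} as const;

type KnownCode = keyof typeof STATUS_BY_CODE;

function isKnownCode(code: string): code is KnownCode {
  // hasOwnProperty avoids matching inherited keys such as 'toString'.
  return Object.prototype.hasOwnProperty.call(STATUS_BY_CODE, code);
}

// Returns the documented status, or undefined for codes not in the table.
function statusForCode(code: string): number | undefined {
  return isKnownCode(code) ? STATUS_BY_CODE[code] : undefined;
}
```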

Authentication

AuthenticateHook

Authentication hook called before every HTTP operation. Set via AgentServerConfig.authenticate; the server is fail-closed without it (constructor throws unless allowUnauthenticated: true is set).

typescript
type AuthenticateHook = (
  request: AgentHttpRequest,
  operation: AgentServerOperation
) => Promise<AuthenticateResult> | AuthenticateResult;

type AuthenticateResult = boolean | { error: string; status?: number };

Result handling:

  • true → the request proceeds.
  • false → generic 401.
  • { error, status? } → custom rejection (use status: 403 for "authenticated but not authorized").
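
A bearer-token hook might look roughly like the following. This is a sketch under assumptions: the TOKENS table and its entries are invented for illustration, header names are assumed to arrive lower-cased, and the types are local copies of the documented shapes so the example stands alone.

```typescript
// Local copies of the documented shapes so the sketch is self-contained.
interface AgentHttpRequest {
  method: string;
  url: string;
  headers: Record<string, string | string[] | undefined>;
  body?: unknown;
  query?: Record<string, string | string[] | undefined>;
}
type AuthenticateResult = boolean | { error: string; status?: number };

// Hypothetical token table: token -> whether the caller may use this server.
const TOKENS = new Map<string, { allowed: boolean }>([
  ['token-alice', { allowed: true }],
  ['token-bob', { allowed: false }],
]);

// Synchronous hook; the documented signature also permits returning a Promise.
function authenticate(request: AgentHttpRequest, _operation: string): AuthenticateResult {
  const raw = request.headers['authorization']; // assumes lower-cased header names
  const header = Array.isArray(raw) ? raw[0] : raw;
  const match = header === undefined ? null : /^Bearer (.+)$/.exec(header);
  const caller = match === null ? undefined : TOKENS.get(match[1] ?? '');
  if (caller === undefined) return false; // missing or unknown token: generic 401
  if (!caller.allowed) {
    // Known caller without permission: custom 403 rejection.
    return { error: 'Not authorized for this server', status: 403 };
  }
  return true;
}
```

A production hook would typically verify signed tokens and compare secrets in constant time; the table lookup here only illustrates the three result shapes.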

AgentServerOperation

Stable operation tags passed to the hook. Names are NOT 1:1 with paths — they're preserved across releases so existing switches keep working.

typescript
type AgentServerOperation =
  | 'start'
  | 'resume'
  | 'stream' // GET /sse
  | 'status'
  | 'interrupt'
  | 'abort'
  | 'submit-tool-result'
  | 'workspace'
  // Layer B chat routes (Phase 7 Task 7.8)
  | 'chat'
  | 'chat-stream'
  | 'chat-submit-tool-result'
  | 'chat-interrupt'
  | 'chat-abort';
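
Because the tags are stable, a hook can switch over them exhaustively and let the compiler flag any tag added in a later release. The sketch below classifies operations as read-only or mutating. That split is an assumption made for illustration, not something the package defines, and isReadOnly is a hypothetical helper.

```typescript
// Local copy of the documented union.
type AgentServerOperation =
  | 'start'
  | 'resume'
  | 'stream'
  | 'status'
  | 'interrupt'
  | 'abort'
  | 'submit-tool-result'
  | 'workspace'
  | 'chat'
  | 'chat-stream'
  | 'chat-submit-tool-result'
  | 'chat-interrupt'
  | 'chat-abort';

// Assumed classification: operations that only observe a session are read-only.
function isReadOnly(operation: AgentServerOperation): boolean {
  switch (operation) {
    case 'stream':
    case 'status':
    case 'workspace':
    case 'chat-stream':
      return true;
    case 'start':
    case 'resume':
    case 'interrupt':
    case 'abort':
    case 'submit-tool-result':
    case 'chat':
    case 'chat-submit-tool-result':
    case 'chat-interrupt':
    case 'chat-abort':
      return false;
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a tag is missed.
      const unhandled: never = operation;
      throw new Error(`Unhandled operation: ${String(unhandled)}`);
    }
  }
}
```

An authenticate hook could then return { error, status: 403 } for callers that hold a read-only credential but request a mutating operation.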

HTTP Adapters

createHttpAdapter

Creates a generic HTTP handler from an AgentServer instance. Framework-agnostic.

typescript
import { createHttpAdapter } from '@helix-agents/agent-server';

const handler: AgentHttpHandler = createHttpAdapter(server);

AgentHttpHandler signature:

typescript
interface AgentHttpHandler {
  handleRequest(req: AgentHttpRequest): Promise<AgentHttpResponse>;
}

AgentHttpRequest:

typescript
interface AgentHttpRequest {
  method: string;
  url: string;
  headers: Record<string, string | string[] | undefined>;
  body?: unknown;
  query?: Record<string, string | string[] | undefined>;
}

AgentHttpResponse:

typescript
interface AgentHttpResponse {
  status: number;
  headers: Record<string, string>;
  body: string | ReadableStream<Uint8Array>;
}

Routes handled:

| Layer | Method | Path | Action |
| --- | --- | --- | --- |
| A | POST | /start | Start agent execution |
| A | POST | /resume | Resume agent execution |
| A | GET | /sse | SSE event stream |
| A | GET | /status | Get execution status |
| A | POST | /interrupt | Soft stop (durable-flag protocol) |
| A | POST | /abort | Hard stop (durable-flag protocol) |
| A | POST | /submit-tool-result | Submit a client-executed tool result |
| A | GET | /workspace | Operator introspection of the workspace registry for a session |
| B | POST | /chat | AI SDK v6-native chat stream (requires chatHandler) |
| B | GET | /chat/{sessionId}/stream | Reconnect to an active chat stream |
| B | POST | /chat/{sessionId}/submit-tool-result | Chat-route variant of /submit-tool-result (sessionId in path) |
| B | POST | /chat/{sessionId}/interrupt | Chat-route soft stop |
| B | POST | /chat/{sessionId}/abort | Chat-route hard stop |

Layer B routes respond 501 NOT_IMPLEMENTED when the server is constructed without a chatHandler.
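
The route table can be read as a small matching function from method and path to an operation tag. The sketch below is one way to express it and is not the adapter's actual routing code. matchRoute and MatchedRoute are hypothetical, and the pairing of each chat route with a chat-* tag is inferred from the names rather than stated by the source.

```typescript
interface MatchedRoute {
  operation: string;
  sessionId: string | undefined; // only set for /chat/{sessionId}/... routes
}

// Layer A: fixed method + path pairs.
const LAYER_A = new Map<string, string>([
  ['POST /start', 'start'],
  ['POST /resume', 'resume'],
  ['GET /sse', 'stream'],
  ['GET /status', 'status'],
  ['POST /interrupt', 'interrupt'],
  ['POST /abort', 'abort'],
  ['POST /submit-tool-result', 'submit-tool-result'],
  ['GET /workspace', 'workspace'],
]);

// `path` is expected without a query string (as with Express's req.path).
function matchRoute(method: string, path: string): MatchedRoute | undefined {
  const verb = method.toUpperCase();
  const layerA = LAYER_A.get(`${verb} ${path}`);
  if (layerA !== undefined) return { operation: layerA, sessionId: undefined };
  if (verb === 'POST' && path === '/chat') return { operation: 'chat', sessionId: undefined };

  // Layer B: session-scoped chat routes.
  const m = /^\/chat\/([^/]+)\/(stream|submit-tool-result|interrupt|abort)$/.exec(path);
  if (m === null) return undefined;
  const sessionId = m[1] ?? '';
  const action = m[2] ?? '';
  const expectedVerb = action === 'stream' ? 'GET' : 'POST';
  if (verb !== expectedVerb) return undefined;
  return { operation: `chat-${action}`, sessionId };
}
```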

createExpressAdapter

Wraps an AgentHttpHandler as Express middleware.

typescript
import { createExpressAdapter } from '@helix-agents/agent-server';

// handler: AgentHttpHandler, e.g. the result of createHttpAdapter(server)
const middleware = createExpressAdapter(handler);

app.use('/agents', middleware);

Features:

  • Uses req.path (relative to mount point) for routing
  • Handles both string and ReadableStream responses
  • Pipes SSE streams with proper disconnect cleanup

SSE Utilities

createSSEStream

Creates a pull-based ReadableStream that emits SSE-formatted data from an async source. The consumer controls the pace — no unbounded buffering. Heartbeats are emitted as SSE comments when the source has no events within the heartbeat interval.

typescript
import { createSSEStream, type SSEEvent } from '@helix-agents/agent-server';

async function* events(): AsyncGenerator<SSEEvent> {
  yield { id: '1', event: 'chunk', data: '{"hello":"world"}' };
  yield { event: 'end', data: '{"done":true}' };
}

const stream: ReadableStream<Uint8Array> = createSSEStream(events(), {
  heartbeatIntervalMs: 15_000, // default
});

SSEEvent:

typescript
interface SSEEvent {
  id?: string;
  event: string;
  data: string;
}

While the source is idle, the stream sends a : heartbeat comment every 15 seconds by default; set heartbeatIntervalMs to change the interval. The stream closes when the source is exhausted. If the source throws, the error is logged and the stream closes gracefully.
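
For reference, the standard SSE wire format for a single event can be sketched as below. This follows the general server-sent events format and is not the package's serializer: formatSSEEvent and HEARTBEAT_FRAME are hypothetical names, and the exact bytes createSSEStream emits are not specified in this reference.

```typescript
interface SSEEvent {
  id?: string;
  event: string;
  data: string;
}

// Serializes one event: optional id, event name, one data line per line of
// payload, then a blank line that terminates the event.
function formatSSEEvent(e: SSEEvent): string {
  const lines: string[] = [];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  lines.push(`event: ${e.event}`);
  for (const line of e.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  return lines.join('\n') + '\n\n';
}

// SSE comments start with ':' and are ignored by clients; used as keep-alives.
const HEARTBEAT_FRAME = ': heartbeat\n\n';
```

Splitting the payload across several data: lines matters because a raw newline inside a single data: field would otherwise end the field early.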

expiredSessionCleanup

Operator-driven sweep that scans the state store for sessions whose expiresAt has elapsed, prunes their workspace snapshots (best-effort), and CAS-transitions the session status to 'failed' with reason 'session_expired'. Not run automatically by the framework — wire it into a cron / Cloudflare Alarm / k8s CronJob yourself.

typescript
import { expiredSessionCleanup } from '@helix-agents/agent-server';

const summary = await expiredSessionCleanup({
  stateStore,
  workspaceProviders, // optional Map<string, WorkspaceProvider>
  logger, // optional structured logger (defaults to noop)
  pageSize: 200, // optional override (default 200)
  now: () => Date.now(), // optional clock override for tests
});

Returns:

typescript
interface ExpiredSessionCleanupResult {
  detected: number; // expired sessions detected
  marked: number; // successfully transitioned to 'failed'
  alreadyTerminal: number; // CAS rejected (already terminal)
  snapshotsDeleted: number; // snapshots successfully deleted
  errors: number; // per-session errors that were logged
}

Per-session failures (load error, snapshot error, CAS conflict) are logged but do not abort the loop — one bad session must not block cleanup of the rest.

HttpRemoteAgentTransport

Client-side transport for communicating with a remote AgentServer. Defined in @helix-agents/core.

typescript
import { HttpRemoteAgentTransport } from '@helix-agents/core';

Constructor

typescript
interface HttpTransportConfig {
  /** Base URL of the remote agent server */
  url: string;

  /** Static headers or async function returning headers */
  headers?:
    | Record<string, string>
    | (() => Record<string, string> | Promise<Record<string, string>>);

  /** Maximum retry attempts for failed requests (default: 3) */
  maxRetries?: number;

  /** Base delay between retries in ms (default: 1000) */
  retryBaseDelayMs?: number;
}

Methods

Implements the RemoteAgentTransport interface:

typescript
interface RemoteAgentTransport {
  start(request: RemoteStartRequest): Promise<RemoteStartResponse>;
  resume(request: RemoteResumeRequest): Promise<RemoteStartResponse>;
  stream(
    sessionId: string,
    options?: {
      fromSequence?: number;
      signal?: AbortSignal;
    }
  ): AsyncIterable<TransportEvent>;
  getStatus(sessionId: string): Promise<RemoteStatusResponse>;
  interrupt(sessionId: string, reason?: string): Promise<void>;
  abort(sessionId: string, reason?: string): Promise<void>;
}

Retry Behavior

  • 5xx errors — Retried with exponential backoff (baseDelay * 2^attempt)
  • 4xx errors — Not retried
  • Network errors — Retried with backoff
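
The retry rules above can be sketched as a pair of small helpers. This is an illustration rather than the transport's code: it assumes that baseDelay corresponds to retryBaseDelayMs and that attempt is counted from zero, neither of which the source states explicitly. The helper names are hypothetical.

```typescript
// 5xx responses are retried; 4xx responses are not.
function isRetryableStatus(status: number): boolean {
  return status >= 500 && status <= 599;
}

// Exponential backoff: baseDelayMs * 2^attempt, with attempt assumed 0-based.
function retryDelayMs(attempt: number, baseDelayMs = 1000): number {
  return baseDelayMs * 2 ** attempt;
}

// Full delay schedule for the documented defaults (maxRetries 3, base 1000 ms).
function retrySchedule(maxRetries = 3, baseDelayMs = 1000): number[] {
  return Array.from({ length: maxRetries }, (_, attempt) =>
    retryDelayMs(attempt, baseDelayMs)
  );
}
```

Under these assumptions the default configuration waits 1000 ms, 2000 ms, and 4000 ms before the three retries.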

SSE Parsing

The transport uses native ReadableStream parsing (not EventSource) for SSE consumption. It handles:

  • Multi-line data: fields
  • id: fields for sequence tracking
  • event: fields for event type discrimination
  • : heartbeat comments (ignored)
  • AbortSignal for stream cancellation
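
The parsing rules listed above can be illustrated with a simplified parser over a complete text buffer. It is a sketch, not the transport's implementation. parseSSE and ParsedSSEEvent are hypothetical names, and unlike a full SSE client it does not handle partial chunks arriving from a stream or carry the last event id across events.

```typescript
interface ParsedSSEEvent {
  id: string | undefined;
  event: string | undefined;
  data: string;
}

function parseSSE(text: string): ParsedSSEEvent[] {
  const events: ParsedSSEEvent[] = [];
  let id: string | undefined;
  let event: string | undefined;
  let data: string[] = [];

  for (const line of text.split(/\r\n|\r|\n/)) {
    if (line === '') {
      // A blank line dispatches the pending event, if it has any data.
      if (data.length > 0) events.push({ id, event, data: data.join('\n') });
      id = undefined;
      event = undefined;
      data = [];
      continue;
    }
    if (line.startsWith(':')) continue; // comment, e.g. ": heartbeat"

    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space

    if (field === 'id') id = value;
    else if (field === 'event') event = value;
    else if (field === 'data') data.push(value); // multi-line data accumulates
  }
  return events;
}
```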

createRemoteSubAgentTool

Factory function for creating a tool that delegates to a remote agent. Defined in @helix-agents/core.

typescript
import { createRemoteSubAgentTool } from '@helix-agents/core';

Signature

typescript
function createRemoteSubAgentTool<TInput, TOutput>(
  name: string,
  config: {
    description: string;
    inputSchema: z.ZodType<TInput>;
    outputSchema: z.ZodType<TOutput>;
    transport: RemoteAgentTransport;
    remoteAgentType: string;
    timeoutMs: number;
    streamRetries?: number;
    streamRetryBaseMs?: number;
  }
): RemoteSubAgentTool<TInput, TOutput>;

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Tool name (automatically prefixed with subagent__) |
| description | string | Description shown to the LLM |
| inputSchema | z.ZodType | Zod schema for tool input |
| outputSchema | z.ZodType | Zod schema for expected output |
| transport | RemoteAgentTransport | Transport instance for HTTP communication |
| remoteAgentType | string | Agent type identifier (must match server registry key) |
| timeoutMs | number | Maximum execution time in milliseconds (must be a finite positive number) |
| streamRetries | number (optional) | Max stream reconnection attempts on SSE drop (default: 3, range: 0-50) |
| streamRetryBaseMs | number (optional) | Base delay in ms between stream retries, doubled each attempt (default: 100) |
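
The numeric constraints in the table can be checked before calling the factory. The sketch below applies the documented defaults and ranges. resolveTiming and its types are hypothetical, the integer requirement on streamRetries is an assumption, and whether the real factory throws, clamps, or validates differently is not stated in this reference.

```typescript
interface RemoteToolTiming {
  timeoutMs: number;
  streamRetries?: number;
  streamRetryBaseMs?: number;
}

interface ResolvedRemoteToolTiming {
  timeoutMs: number;
  streamRetries: number;
  streamRetryBaseMs: number;
}

// Applies documented defaults (3 retries, 100 ms base) and range checks.
function resolveTiming(options: RemoteToolTiming): ResolvedRemoteToolTiming {
  const { timeoutMs, streamRetries = 3, streamRetryBaseMs = 100 } = options;
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    throw new RangeError('timeoutMs must be a finite positive number');
  }
  // Assumption: retries are whole numbers within the documented 0-50 range.
  if (!Number.isInteger(streamRetries) || streamRetries < 0 || streamRetries > 50) {
    throw new RangeError('streamRetries must be an integer between 0 and 50');
  }
  return { timeoutMs, streamRetries, streamRetryBaseMs };
}
```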

RemoteSubAgentTool

typescript
interface RemoteSubAgentTool<TInput, TOutput> extends Tool<TInput, TOutput> {
  _isRemoteSubAgent: true;
  _remoteConfig: {
    transport: RemoteAgentTransport;
    remoteAgentType: string;
    timeoutMs: number;
    streamRetries: number; // Default: 3
    streamRetryBaseMs: number; // Default: 100
  };
}

Input-to-Message Mapping

When the tool executes, the full tool input is JSON-serialized and sent as the message field in the RemoteStartRequest. The remote agent server is responsible for parsing the JSON and constructing the appropriate user message for its agent.
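
The mapping can be sketched from both ends: the calling side serializes the tool input into message, and the remote side attempts to parse it back. buildStartRequest and decodeMessage are hypothetical helpers, not package exports, and the local RemoteStartRequest narrows message to string because that is the only form this mapping produces.

```typescript
// Narrowed local copy: message is always a JSON string in this mapping.
interface RemoteStartRequest {
  sessionId: string;
  agentType: string;
  message: string;
}

// Calling side: the full tool input becomes the message field.
function buildStartRequest(
  sessionId: string,
  remoteAgentType: string,
  input: unknown
): RemoteStartRequest {
  return { sessionId, agentType: remoteAgentType, message: JSON.stringify(input) };
}

type DecodedMessage =
  | { kind: 'json'; value: unknown }
  | { kind: 'text'; text: string };

// Remote side: parse JSON when possible, otherwise treat it as plain text.
function decodeMessage(message: string): DecodedMessage {
  try {
    return { kind: 'json', value: JSON.parse(message) };
  } catch {
    return { kind: 'text', text: message };
  }
}
```

Falling back to plain text lets the same remote agent serve direct callers that send ordinary strings, at the cost of treating a string that happens to be valid JSON as structured input.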

Runtime Behavior

All three runtimes provide first-class remote sub-agent support:

  • JS Runtime — Intercepts via isRemoteSubAgentTool() for enhanced handling: stream proxying, SubSessionRef tracking with remote metadata, timeout management via AbortSignal, reconnection on resume
  • Temporal Runtime — Dedicated executeRemoteSubAgentCall activity with deterministic session IDs, crash recovery via transport.getStatus(), heartbeat-based reconnection, stream proxying, and interrupt propagation
  • Cloudflare Runtime — Dedicated executeRemoteSubAgentCall step with deterministic session IDs, crash recovery, stream proxying, interrupt propagation via abort-check interval, and timeout enforcement

The tool's built-in execute() method exists as a fallback but is not used by any of the pre-built runtimes.

Types

All types are exported from @helix-agents/agent-server:

typescript
import type {
  AgentServerConfig,
  AgentHttpHandler,
  AgentHttpRequest,
  AgentHttpResponse,
  AgentServerOperation,
  AuthenticateHook,
  AuthenticateResult,
  ChatStreamHandler,
  ExpiredSessionCleanupDeps,
  ExpiredSessionCleanupResult,
} from '@helix-agents/agent-server';

Transport protocol types are exported from @helix-agents/core:

typescript
import type {
  RemoteStartRequest,
  RemoteResumeRequest,
  RemoteStartResponse,
  RemoteStatusResponse,
  RemoteAgentErrorResponse,
  TransportEvent,
  RemoteAgentTransport,
} from '@helix-agents/core';

Released under the MIT License.