# @helix-agents/core
Core agent framework functionality - types, factories, state management, LLM interface, store interfaces, stream utilities, and orchestration functions.
## Installation

```bash
npm install @helix-agents/core
```

## Agent Definition

### defineAgent
Create an agent configuration.
```typescript
import { defineAgent } from '@helix-agents/core';
import { z } from 'zod';

const MyAgent = defineAgent({
  name: 'my-agent',
  description: 'Does something useful',
  systemPrompt: 'You are a helpful assistant.',
  stateSchema: z.object({ count: z.number().default(0) }),
  outputSchema: z.object({ result: z.string() }),
  tools: [myTool],
  llmConfig: { model: openai('gpt-4o') },
  maxSteps: 10,
});
```

Types:

- `AgentConfig<TStateSchema, TOutputSchema>` - Full agent configuration
- `Agent<TState, TOutput>` - Shorthand for a configured agent
- `LLMConfig` - Model configuration (model, temperature, maxOutputTokens, providerOptions)
## Tool Definition

### defineTool
Create a tool that agents can use.
```typescript
import { defineTool } from '@helix-agents/core';
import { z } from 'zod';

const myTool = defineTool({
  name: 'my_tool',
  description: 'Does something',
  inputSchema: z.object({ query: z.string() }),
  outputSchema: z.object({ result: z.string() }),
  execute: async (input, context) => {
    return { result: 'done' };
  },
});
```

Types:

- `Tool<TInput, TOutput>` - Tool definition
- `ToolConfig<TInput, TOutput>` - Tool configuration
- `ToolContext` - Context passed to the execute function
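The `context` argument gives a tool access to the running agent, including custom-state mutation via `updateState` (see Type Re-exports at the end of this document). A minimal sketch using a hypothetical note-taking state:

```typescript
import { defineTool } from '@helix-agents/core';
import type { Draft } from '@helix-agents/core';
import { z } from 'zod';

// Hypothetical custom state shape for this example
interface NotesState {
  notes: Array<{ content: string }>;
}

const addNoteTool = defineTool({
  name: 'add_note',
  description: 'Append a note to the agent state',
  inputSchema: z.object({ content: z.string() }),
  outputSchema: z.object({ added: z.boolean() }),
  execute: async (input, context) => {
    // updateState takes an Immer-style recipe over the agent's custom state
    context.updateState((draft: Draft<NotesState>) => {
      draft.notes.push({ content: input.content });
    });
    return { added: true };
  },
});
```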
### createSubAgentTool

Create a tool that invokes a sub-agent. The sub-agent must have an `outputSchema` defined.
```typescript
import { createSubAgentTool, defineAgent } from '@helix-agents/core';
import { z } from 'zod';

// First, define the sub-agent with an outputSchema
const WorkerAgent = defineAgent({
  name: 'worker-agent',
  systemPrompt: 'You process tasks.',
  outputSchema: z.object({ result: z.string() }),
  llmConfig: { model: openai('gpt-4o') },
});

// Then create a sub-agent tool from it
const delegateTool = createSubAgentTool(
  WorkerAgent, // The agent config (must have outputSchema)
  z.object({ task: z.string() }), // Input schema for the tool
  { description: 'Delegate to worker' } // Optional description override
);
```

Signature:

```typescript
function createSubAgentTool<TInput, TOutputSchema>(
  agent: AgentConfig<any, TOutputSchema>,
  inputSchema: TInput,
  options?: { description?: string }
): SubAgentTool<TInput, TOutputSchema>;
```

Types:

- `SubAgentTool` - Sub-agent tool definition
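Presumably the resulting tool is registered on the parent agent like any other tool via `tools`. A minimal sketch (the coordinator agent here is illustrative, not part of the package):

```typescript
// Hypothetical parent agent that can delegate work to WorkerAgent
const CoordinatorAgent = defineAgent({
  name: 'coordinator-agent',
  systemPrompt: 'Delegate tasks to the worker when appropriate.',
  tools: [delegateTool], // sub-agent tools sit alongside regular tools
  llmConfig: { model: openai('gpt-4o') },
});
```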
### Tool Utilities

```typescript
import {
  SUBAGENT_TOOL_PREFIX, // 'subagent__'
  FINISH_TOOL_NAME, // '__finish__'
  isSubAgentTool, // Check if tool name is sub-agent
  isFinishTool, // Check if tool name is __finish__
  createFinishTool, // Create finish tool from schema
} from '@helix-agents/core';
```
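These helpers are useful when dispatching tool calls by name, for example from a `ToolCallsStepResult` (see Runtime Types below). A small sketch with a hypothetical `toolCall` value:

```typescript
// toolCall is assumed to be a ParsedToolCall with a string `name`
if (isFinishTool(toolCall.name)) {
  // the agent produced structured output via __finish__; stop the loop
} else if (isSubAgentTool(toolCall.name)) {
  // names carrying SUBAGENT_TOOL_PREFIX route to sub-agent execution
} else {
  // execute a regular tool
}
```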
## State Types

### AgentState
The full state structure for a running agent.
```typescript
interface AgentState<TState, TOutput> {
  runId: string;
  agentType: string;
  streamId: string;
  status: AgentStatus;
  stepCount: number;
  customState: TState;
  messages: Message[];
  output?: TOutput;
  error?: string;
  parentAgentId?: string;
  subAgentRefs: SubAgentRef[]; // References to child agent runs
  aborted: boolean;
  abortReason?: string;
}
```

### Message Types
```typescript
type Message = SystemMessage | UserMessage | AssistantMessage | ToolResultMessage;

interface SystemMessage {
  role: 'system';
  content: string;
}

interface UserMessage {
  role: 'user';
  content: string;
}

interface AssistantMessage {
  role: 'assistant';
  content?: string;
  toolCalls?: ToolCallRequest[];
  thinking?: ThinkingContent;
}

interface ToolResultMessage {
  role: 'tool';
  toolCallId: string;
  toolName: string;
  content: string;
}
```

### State Helpers
```typescript
import {
  isAssistantMessage,
  isToolResultMessage,
  stripThinking, // Remove thinking from messages
  getSubAgentRefsByType, // Filter sub-agent refs
  getToolResultsFromMessages,
  createInitialAgentState,
} from '@helix-agents/core';
```
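For example, the message guards narrow the `Message` union when filtering a run's history. A small sketch (`state` is assumed to be an `AgentState`, and `stripThinking` is assumed to take and return a `Message` array, per the comment above):

```typescript
// Keep only assistant messages from the history
const assistantMessages = state.messages.filter(isAssistantMessage);

// Drop reasoning content before exporting the transcript (assumed signature)
const withoutThinking = stripThinking(state.messages);
```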
## Stream Types

### Stream Chunks
All chunk types emitted during agent execution:
```typescript
type StreamChunk =
  | TextDeltaChunk // Token-by-token text
  | ThinkingChunk // Reasoning content
  | ToolStartChunk // Tool invocation starting
  | ToolEndChunk // Tool execution complete
  | SubAgentStartChunk // Sub-agent starting
  | SubAgentEndChunk // Sub-agent complete
  | CustomEventChunk // Custom events from tools
  | StatePatchChunk // RFC 6902 state patches
  | ErrorChunk // Error events
  | OutputChunk; // Structured output
```

### Chunk Type Guards
```typescript
import {
  isTextDeltaChunk,
  isThinkingChunk,
  isToolStartChunk,
  isToolEndChunk,
  isSubAgentStartChunk,
  isSubAgentEndChunk,
  isCustomEventChunk,
  isStatePatchChunk,
  isErrorChunk,
  isOutputChunk,
  isStreamEnd,
} from '@helix-agents/core';
```
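In practice, a consumer iterates a chunk stream and narrows each chunk with these guards. A sketch assuming `reader` is a `StreamReader` (see Store Interfaces) and the handlers are placeholders:

```typescript
for await (const chunk of reader) {
  if (isTextDeltaChunk(chunk)) {
    renderText(chunk); // placeholder: append the text delta to the UI
  } else if (isStatePatchChunk(chunk)) {
    applyPatch(chunk); // placeholder: apply the RFC 6902 patch to a local state copy
  } else if (isErrorChunk(chunk)) {
    reportError(chunk); // placeholder: surface the error
  }
}
```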
### Schemas

```typescript
import {
  StreamChunkSchema, // Zod schema for chunks
  StreamMessageSchema, // Zod schema for messages
} from '@helix-agents/core';
```

## Runtime Types

### StepResult
Result from an LLM generation step:
```typescript
type StepResult<TOutput> =
  | TextStepResult
  | ToolCallsStepResult
  | StructuredOutputStepResult<TOutput>
  | ErrorStepResult;

interface TextStepResult {
  type: 'text';
  content: string;
  thinking?: ThinkingContent;
  shouldStop: boolean;
  stopReason?: StopReason;
}

interface ToolCallsStepResult {
  type: 'tool_calls';
  content?: string;
  toolCalls: ParsedToolCall[];
  subAgentCalls: ParsedSubAgentCall[];
  thinking?: ThinkingContent;
  stopReason?: StopReason;
}
```

### StopReason
Normalized stop reasons from LLM providers:
```typescript
type StopReason =
  | 'end_turn' // Normal completion
  | 'stop_sequence' // Hit stop sequence
  | 'tool_use' // Tool call requested
  | 'max_tokens' // Token limit (error)
  | 'content_filter' // Safety filter (error)
  | 'refusal' // Model refused (error)
  | 'error' // Generation error
  | 'unknown'; // Unrecognized
```

```typescript
import { isErrorStopReason } from '@helix-agents/core';
```

## Execution Types
```typescript
interface AgentExecutionHandle<TOutput> {
  readonly runId: string;
  stream(): Promise<AsyncIterable<StreamChunk> | null>;
  result(): Promise<AgentResult<TOutput>>;
  abort(reason?: string): Promise<void>;
  getState(): Promise<AgentState<unknown, TOutput>>;
  canResume(): Promise<CanResumeResult>;
  resume(options?: ResumeOptions): Promise<AgentExecutionHandle<TOutput>>;
}

interface AgentResult<TOutput> {
  status: AgentStatus;
  output?: TOutput;
  error?: string;
}

interface CanResumeResult {
  canResume: boolean;
  reason?: string;
}
```
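A typical caller consumes the live stream while the run is in flight, then awaits the final result. A sketch assuming `handle` is an `AgentExecutionHandle` obtained from whichever runtime starts the agent (not exported by this package):

```typescript
const stream = await handle.stream();
if (stream) {
  for await (const chunk of stream) {
    if (isTextDeltaChunk(chunk)) {
      // render streaming text
    }
  }
}

const result = await handle.result();
if (result.error) {
  console.error(`run ${handle.runId} failed: ${result.error}`);
} else {
  console.log('output:', result.output);
}
```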
## State Management

### ImmerStateTracker
Track state mutations with RFC 6902 patches.
```typescript
import { ImmerStateTracker, createImmerStateTracker } from '@helix-agents/core';

const tracker = new ImmerStateTracker(initialState, {
  arrayDeltaMode: 'append_only', // or 'full_replace'
});

// Make mutations
tracker.update((draft) => {
  draft.notes.push({ content: 'New note' });
});

// Get patches
const patches = tracker.getPatches();

// Get current state
const currentState = tracker.getState();

// Reset patch tracking
tracker.resetPatches();
```

Types:

- `ImmerStateTrackerOptions` - Configuration options
- `MergeChanges` - Function type for merging state
### convertImmerPatchToRFC6902

Convert Immer patches to RFC 6902 format.

```typescript
import { convertImmerPatchToRFC6902 } from '@helix-agents/core';

const rfc6902Patch = convertImmerPatchToRFC6902(immerPatch);
```

## LLM Module
### LLMAdapter Interface
Interface for LLM providers:
```typescript
interface LLMAdapter {
  generateStep(input: LLMGenerateInput): Promise<StepResult<unknown>>;
}

interface LLMGenerateInput {
  messages: Message[];
  tools: Tool[];
  config: LLMConfig;
  abortSignal?: AbortSignal;
  callbacks?: LLMStreamCallbacks;
  agentId: string;
  agentType: string;
}

interface LLMStreamCallbacks {
  onTextDelta?: (delta: string) => void;
  onThinking?: (content: string, isComplete: boolean) => void;
  onToolCall?: (toolCall: ParsedToolCall) => void;
  onError?: (error: Error) => void;
}
```
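Custom providers implement this interface. A minimal sketch of an adapter that returns a fixed text step (illustrative only, assuming these interface types are exported by the package; a real adapter would call the provider's API and stream deltas through the callbacks):

```typescript
import type { LLMAdapter, LLMGenerateInput, StepResult } from '@helix-agents/core';

class EchoAdapter implements LLMAdapter {
  async generateStep(input: LLMGenerateInput): Promise<StepResult<unknown>> {
    const reply = `Echo: ${input.messages.length} messages received`;
    // Report streaming progress through the optional callbacks
    input.callbacks?.onTextDelta?.(reply);
    return {
      type: 'text',
      content: reply,
      shouldStop: true,
      stopReason: 'end_turn',
    };
  }
}
```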
### MockLLMAdapter

Mock adapter for testing:

```typescript
import { MockLLMAdapter } from '@helix-agents/core';

const mock = new MockLLMAdapter([
  { type: 'text', content: 'Hello!' },
  { type: 'tool_calls', toolCalls: [{ id: 't1', name: 'search', arguments: {} }] },
  { type: 'structured_output', output: { result: 'done' } },
]);
```

### Stop Reason Mapping
```typescript
import {
  mapVercelFinishReason,
  mapOpenAIFinishReason,
  mapAnthropicStopReason,
  mapGeminiFinishReason,
} from '@helix-agents/core';
```

## Store Interfaces
### StateStore
Interface for state persistence. Supports atomic operations for safe concurrent modifications from parallel tool execution.
```typescript
interface StateStore {
  // Basic CRUD operations
  save(state: UntypedAgentState): Promise<void>;
  load(runId: string): Promise<UntypedAgentState | null>;
  delete(runId: string): Promise<void>;

  // Atomic operations (safe for parallel tool execution)
  appendMessages(runId: string, messages: Message[]): Promise<void>;
  mergeCustomState(runId: string, changes: MergeChanges): Promise<{ warnings: string[] }>;
  updateStatus(runId: string, status: AgentStatus): Promise<void>;
  incrementStepCount(runId: string): Promise<number>;

  // Sub-agent management
  addSubAgentRefs(
    runId: string,
    refs: Array<{
      subAgentRunId: string;
      agentType: string;
      parentToolCallId: string;
      startedAt: number;
    }>
  ): Promise<void>;
  updateSubAgentRef(
    runId: string,
    update: {
      subAgentRunId: string;
      status: 'running' | 'completed' | 'failed';
      completedAt?: number;
    }
  ): Promise<void>;

  // Message queries
  getMessages(runId: string, options?: GetMessagesOptions): Promise<PaginatedMessages>;
  getMessageCount(runId: string): Promise<number>;
}

interface GetMessagesOptions {
  offset?: number; // Starting position (default: 0)
  limit?: number; // Max messages (default: 50)
  includeThinking?: boolean; // Include thinking content (default: true)
}

interface PaginatedMessages {
  messages: Message[];
  total: number;
  offset: number;
  limit: number;
  hasMore: boolean;
}
```
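For example, a caller can page through a long conversation with `getMessages` (a sketch; `stateStore` is any `StateStore` implementation):

```typescript
// Walk the full message history for a run, 50 messages at a time
let offset = 0;
let hasMore = true;
while (hasMore) {
  const page = await stateStore.getMessages('run-123', {
    offset,
    limit: 50,
    includeThinking: false, // drop reasoning content
  });
  for (const message of page.messages) {
    // process each message
  }
  offset += page.messages.length;
  hasMore = page.hasMore;
}
```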
### StreamManager

Interface for real-time streaming:
```typescript
interface StreamManager {
  // Create a writer for emitting chunks (implicitly creates stream)
  createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter>;

  // Create a reader to consume chunks
  createReader(streamId: string): Promise<StreamReader | null>;

  // Create a resumable reader (optional, for crash recovery)
  createResumableReader?(
    streamId: string,
    options?: ResumableReaderOptions
  ): Promise<ResumableStreamReader | null>;

  // Mark stream as complete
  endStream(streamId: string, output?: unknown): Promise<void>;

  // Mark stream as failed
  failStream(streamId: string, error: string): Promise<void>;
}

interface StreamWriter {
  write(chunk: StreamChunk): Promise<void>;
  close(): Promise<void>; // Closes this writer, NOT the stream
}

interface StreamReader extends AsyncIterable<StreamChunk> {
  [Symbol.asyncIterator](): AsyncIterator<StreamChunk>;
  close(): Promise<void>;
}

interface ResumableStreamReader extends StreamReader {
  readonly currentSequence: number;
  readonly totalChunks: number;
  readonly latestSequence: number;
}
```
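Producer and consumer pair up through the stream ID. A sketch assuming `streamManager` is some `StreamManager` implementation:

```typescript
import type { StreamChunk } from '@helix-agents/core';

// A chunk produced by the agent loop; its concrete shape is defined by StreamChunk
declare const someChunk: StreamChunk;

// Producer side: emit chunks for a run, then mark the stream complete
const writer = await streamManager.createWriter('run-123', 'run-123', 'my-agent');
await writer.write(someChunk);
await writer.close(); // closes this writer only; the stream itself stays open
await streamManager.endStream('run-123', { result: 'done' });

// Consumer side: read chunks until the stream ends
const reader = await streamManager.createReader('run-123');
if (reader) {
  for await (const chunk of reader) {
    // handle chunk (see Chunk Type Guards above)
  }
  await reader.close();
}
```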
## Stream Utilities

### Stream Filters
```typescript
import {
  filterByAgentId,
  filterByAgentType,
  filterByType,
  excludeTypes,
  filterWith,
  combineStreams,
  take,
  skip,
  collectText,
  collectAll,
} from '@helix-agents/core';

// Filter by agent
const filtered = filterByAgentId(stream, 'agent-123');

// Filter by chunk type
const textOnly = filterByType(stream, ['text_delta']);

// Exclude types
const noThinking = excludeTypes(stream, ['thinking']);

// Collect all text
const fullText = await collectText(stream);
```

### State Streaming
```typescript
import { CustomStateStreamer, createStateStreamer } from '@helix-agents/core';

const streamer = createStateStreamer({
  streamManager,
  streamId: 'run-123',
});

// Emit state patches
await streamer.emitPatch(patches);
```

### State Projection
```typescript
import { createStateProjection, StreamProjector } from '@helix-agents/core';

// Project subset of state
const projection = createStateProjection<FullState, { count: number }>((state) => ({
  count: state.count,
}));
```

### Resumable Stream Handler
```typescript
import { createResumableStreamHandler, extractResumePosition } from '@helix-agents/core';

const handler = createResumableStreamHandler({
  streamManager,
});

// Handle request with resume support
const response = await handler.handle({
  streamId: 'run-123',
  resumeAt: extractResumePosition(lastEventId),
});
```

## Orchestration
### initializeAgentState
Create initial state from input:
```typescript
import { initializeAgentState } from '@helix-agents/core';

const state = initializeAgentState({
  agent,
  input: 'Hello', // or { message: 'Hello', state: { ... } }
  runId: 'run-123',
  streamId: 'run-123',
  parentAgentId: undefined,
});
```

### buildMessagesForLLM
Prepare messages with system prompt:
```typescript
import { buildMessagesForLLM } from '@helix-agents/core';

const messages = buildMessagesForLLM(state.messages, agent.systemPrompt, state.customState);
```

### buildEffectiveTools
Get tools including `__finish__`:

```typescript
import { buildEffectiveTools } from '@helix-agents/core';

const tools = buildEffectiveTools(agent);
```

### planStepProcessing
Analyze LLM result and plan actions:
```typescript
import { planStepProcessing } from '@helix-agents/core';

const plan = planStepProcessing(stepResult, {
  outputSchema: agent.outputSchema,
});

// plan.assistantMessagePlan - For creating assistant message
// plan.pendingToolCalls - Tools to execute
// plan.pendingSubAgentCalls - Sub-agents to invoke
// plan.statusUpdate - Status change to apply
// plan.isTerminal - Whether execution should stop
// plan.output - Parsed output (if __finish__ called)
```

### shouldStopExecution
Check if agent should stop:
```typescript
import { shouldStopExecution, determineFinalStatus } from '@helix-agents/core';

const shouldStop = shouldStopExecution(stepResult, stepCount, {
  maxSteps: 10,
  stopWhen: (result) => result.type === 'text' && result.content.includes('DONE'),
});

const finalStatus = determineFinalStatus(stepResult);
```

### Message Builders
```typescript
import {
  createAssistantMessage,
  createToolResultMessage,
  createSubAgentResultMessage,
} from '@helix-agents/core';

const assistantMsg = createAssistantMessage(plan.assistantMessagePlan);

const toolResult = createToolResultMessage({
  toolCallId: 'tc1',
  toolName: 'search',
  result: { data: 'found' },
  success: true,
});
```
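Taken together, these helpers cover one turn of an agent loop. A rough sketch of how they might compose (not the framework's actual run loop; tool execution, sub-agent dispatch, streaming, and persistence through a `StateStore` are all omitted):

```typescript
// Assumes: agent (AgentConfig), adapter (LLMAdapter), and a starting user input
const state = initializeAgentState({
  agent,
  input: 'Hello',
  runId: 'run-123',
  streamId: 'run-123',
  parentAgentId: undefined,
});

for (let step = 0; step < (agent.maxSteps ?? 10); step++) {
  const stepResult = await adapter.generateStep({
    messages: buildMessagesForLLM(state.messages, agent.systemPrompt, state.customState),
    tools: buildEffectiveTools(agent),
    config: agent.llmConfig,
    agentId: state.runId,
    agentType: state.agentType,
  });

  const plan = planStepProcessing(stepResult, { outputSchema: agent.outputSchema });
  state.messages.push(createAssistantMessage(plan.assistantMessagePlan));

  // Execute plan.pendingToolCalls / plan.pendingSubAgentCalls here (omitted)

  if (plan.isTerminal || shouldStopExecution(stepResult, step + 1, { maxSteps: agent.maxSteps ?? 10 })) {
    state.status = determineFinalStatus(stepResult);
    break;
  }
}
```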
## Recovery

### recoverConversation
Resume a conversation from stored state:
```typescript
import { recoverConversation, loadConversationMessages } from '@helix-agents/core';

const { messages, canResume } = await recoverConversation({
  stateStore,
  runId: 'run-123',
});

// Or just load messages
const conversationMessages = await loadConversationMessages(stateStore, 'run-123');
```

## Utilities
### createToolContext
Create a tool execution context:
```typescript
import { createToolContext } from '@helix-agents/core';

const context = createToolContext({
  agentId: 'run-123',
  agentType: 'my-agent',
  stateTracker,
  streamWriter,
});
```

### Logger Types
```typescript
import { noopLogger, consoleLogger, type Logger } from '@helix-agents/core';

const logger: Logger = {
  debug: (msg, data) => console.debug(msg, data),
  info: (msg, data) => console.info(msg, data),
  warn: (msg, data) => console.warn(msg, data),
  error: (msg, data) => console.error(msg, data),
};
```

### Type Re-exports
The package re-exports `Draft` from Immer for tool authors:

```typescript
import type { Draft } from '@helix-agents/core';

// Use in updateState callbacks
context.updateState((draft: Draft<MyState>) => {
  draft.items.push(newItem);
});
```