Storage Overview
Storage in Helix Agents consists of two components: State Stores for persisting agent state, and Stream Managers for real-time event streaming. Both are defined as interfaces, enabling different implementations for different environments.
Why Separate Components?
State and streaming have different requirements:
| Concern | State Store | Stream Manager |
|---|---|---|
| Purpose | Persist agent state | Real-time event delivery |
| Access Pattern | Read-modify-write | Append-only, subscribe |
| Consistency | Strong (atomic updates) | Eventual (ordering matters) |
| Retention | Until explicit delete | Temporary (TTL-based) |
By separating them, you can:
- Use in-memory streams with Redis state (see the sketch after this list)
- Scale streaming infrastructure independently
- Apply different retention policies
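For example, a durable state store can be paired with in-process streaming. A minimal sketch using the Redis and in-memory packages listed below:

```typescript
import { RedisStateStore } from '@helix-agents/store-redis';
import { InMemoryStreamManager } from '@helix-agents/store-memory';
import Redis from 'ioredis';

// Durable state in Redis, ephemeral streaming in process memory.
const stateStore = new RedisStateStore(new Redis(process.env.REDIS_URL));
const streamManager = new InMemoryStreamManager();
```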
Available Implementations
| Package | State Store | Stream Manager | Use Case |
|---|---|---|---|
| @helix-agents/store-memory | InMemoryStateStore | InMemoryStreamManager | Development, testing |
| @helix-agents/store-redis | RedisStateStore | RedisStreamManager | Production |
| @helix-agents/store-cloudflare | D1StateStore | DOStreamManager | Cloudflare Workers |
State Store Interface
The StateStore interface defines how agent state is persisted:
```typescript
interface StateStore {
  // Core operations
  save(state: AgentState): Promise<void>;
  load(runId: string): Promise<AgentState | null>;
  delete(runId: string): Promise<void>;

  // Atomic operations (for parallel tool execution)
  appendMessages(runId: string, messages: Message[]): Promise<void>;
  mergeCustomState(runId: string, changes: MergeChanges): Promise<{ warnings: string[] }>;
  updateStatus(runId: string, status: AgentStatus): Promise<void>;
  incrementStepCount(runId: string): Promise<number>;

  // Sub-agent tracking
  addSubAgentRefs(runId: string, refs: SubAgentRef[]): Promise<void>;
  updateSubAgentRef(runId: string, update: SubAgentRefUpdate): Promise<void>;

  // Message queries (for UI/recovery)
  getMessages(runId: string, options?: GetMessagesOptions): Promise<PaginatedMessages>;
  getMessageCount(runId: string): Promise<number>;
}
```
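A quick sketch of the core operations; the run identifier and the status transition are illustrative:

```typescript
// Load the state for a run (null if the run does not exist).
const state = await stateStore.load('run-123');

if (state) {
  // Persist a modified copy of the state.
  await stateStore.save({ ...state, status: 'paused' });
}

// Remove the run's state once it is no longer needed.
await stateStore.delete('run-123');
```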
Atomic Operations
The atomic operations enable safe concurrent modifications during parallel tool execution:
```typescript
// Multiple tools can safely append messages
await Promise.all([
  stateStore.appendMessages(runId, [toolResult1]),
  stateStore.appendMessages(runId, [toolResult2]),
]);

// Multiple tools can safely update different state keys
await Promise.all([
  stateStore.mergeCustomState(runId, { values: { key1: 'value1' } }),
  stateStore.mergeCustomState(runId, { values: { key2: 'value2' } }),
]);
```
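The other atomic operations follow the same pattern. A brief sketch; the status value is illustrative:

```typescript
// Atomically record another LLM call; returns the new step count.
const steps = await stateStore.incrementStepCount(runId);

// Status transitions are also applied atomically.
await stateStore.updateStatus(runId, 'completed');
```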
State Merge Semantics
The mergeCustomState operation merges changes key by key rather than overwriting the whole custom state:
```typescript
// Array delta mode: new items are appended
// Tool 1: draft.items.push('a')
// Tool 2: draft.items.push('b')
// Result: items = [...original, 'a', 'b']

// Scalar values: last write wins
// Tool 1: draft.count = 5
// Tool 2: draft.count = 10
// Result: count = 5 or 10 (depends on timing)

// Null means delete
// draft.temp = null
// Result: the temp key is removed
```
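mergeCustomState also reports non-fatal merge issues through its return value. A minimal sketch of checking them; exactly what gets reported is implementation-specific:

```typescript
const { warnings } = await stateStore.mergeCustomState(runId, {
  values: { count: 10 },
});

// Surface any merge warnings (e.g. unexpected conflicts) for debugging.
for (const warning of warnings) {
  console.warn(`State merge warning for ${runId}: ${warning}`);
}
```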
Stream Manager Interface
The StreamManager interface defines how events are streamed:
```typescript
interface StreamManager {
  // Writer lifecycle
  createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter>;

  // Reader lifecycle
  createReader(streamId: string): Promise<StreamReader | null>;

  // Stream control
  endStream(streamId: string, finalOutput?: unknown): Promise<void>;
  failStream(streamId: string, error: string): Promise<void>;

  // Optional: Resumable streams
  createResumableReader?(
    streamId: string,
    options?: ReaderOptions
  ): Promise<ResumableStreamReader | null>;
  getStreamInfo?(streamId: string): Promise<StreamInfo | null>;
  pauseStream?(streamId: string): Promise<void>;
  resumeStream?(streamId: string): Promise<void>;
}
```
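The resumable-stream methods are optional, so callers should feature-detect them before use. A sketch using only the method names from the interface above:

```typescript
// Not every StreamManager implements the optional methods.
if (streamManager.getStreamInfo) {
  const info = await streamManager.getStreamInfo(streamId);
  console.log('Stream info:', info);
}

if (streamManager.pauseStream && streamManager.resumeStream) {
  await streamManager.pauseStream(streamId);
  // ...later
  await streamManager.resumeStream(streamId);
}
```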
Writer/Reader Pattern
Writers emit chunks, readers consume them:
```typescript
// Create writer (usually done by runtime)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write({ type: 'text_delta', delta: 'Hello', ... });
await writer.close(); // Close writer, NOT the stream

// Create reader (usually done by client/UI)
const reader = await streamManager.createReader(streamId);
if (reader) {
  try {
    for await (const chunk of reader) {
      console.log(chunk);
    }
  } finally {
    await reader.close(); // Always close reader
  }
}
```
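Closing a writer does not end the stream; the stream itself is finished with endStream or failStream. A minimal sketch (the output and error values are illustrative):

```typescript
// On success: end the stream, optionally attaching the final output.
await streamManager.endStream(streamId, { summary: 'done' });

// On failure: fail the stream so readers receive the error instead.
// (A real run calls one or the other, not both.)
await streamManager.failStream(streamId, 'LLM provider timed out');
```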
Stream Lifecycle
```
Stream Created (first writer)
│
├── Writers emit chunks
│   └── Readers receive in real-time
│
├── endStream() called
│   └── Readers complete normally
│   └── New readers get historical chunks
│
└── OR failStream() called
    └── Readers receive error
    └── New readers get null
```
Choosing Storage
Development
Use in-memory stores for fast iteration:
```typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();
```

Pros: Zero setup, fast, easy debugging
Cons: Data lost on restart, single process only
Production (Self-Hosted)
Use Redis for durability and multi-process:
```typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
```

Pros: Persistent, multi-process, production-ready
Cons: Requires Redis infrastructure
Production (Cloudflare)
Use D1 and Durable Objects for edge deployment:
```typescript
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB);
const streamManager = new DOStreamManager(env.STREAM_MANAGER);
```

Pros: Global edge, serverless, integrated with Cloudflare
Cons: Cloudflare-specific, different API from other stores
Swapping Stores
Because stores implement standard interfaces, swapping is straightforward:
```typescript
// Environment-based selection
const createStores = () => {
  if (process.env.NODE_ENV === 'production') {
    const redis = new Redis(process.env.REDIS_URL);
    return {
      stateStore: new RedisStateStore(redis),
      streamManager: new RedisStreamManager(redis),
    };
  }
  return {
    stateStore: new InMemoryStateStore(),
    streamManager: new InMemoryStreamManager(),
  };
};

const { stateStore, streamManager } = createStores();
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);
```

State Data Model
Agent state includes:
```typescript
interface AgentState<TState, TOutput> {
  // Identity
  runId: string;            // Unique run identifier
  agentType: string;        // Agent type name
  streamId: string;         // Stream identifier

  // Hierarchy
  parentAgentId?: string;   // Parent if this is a sub-agent

  // Execution state
  messages: Message[];      // Conversation history
  customState: TState;      // Your custom state schema
  stepCount: number;        // LLM call count
  status: AgentStatus;      // 'running' | 'completed' | 'failed' | 'paused'

  // Output
  output?: TOutput;         // Final structured output
  error?: string;           // Error message if failed

  // Sub-agents
  subAgentRefs: SubAgentRef[];

  // Abort handling
  aborted: boolean;
  abortReason?: string;
}
```
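To show how the type parameters line up, here is a hypothetical custom state and output schema plugged into AgentState; the field names are invented for the example:

```typescript
// Hypothetical schemas for a research agent.
interface ResearchState {
  findings: string[];
  sourcesVisited: number;
}

interface ResearchReport {
  summary: string;
}

type ResearchAgentState = AgentState<ResearchState, ResearchReport>;

// Narrow a loaded state to the typed form before reading custom fields.
const state = (await stateStore.load(runId)) as ResearchAgentState | null;
if (state?.status === 'completed') {
  console.log(state.output?.summary, state.customState.findings.length);
}
```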
Next Steps
- In-Memory Storage - Development and testing
- Redis Storage - Production deployment
- Cloudflare Storage - Edge deployment