Storage Overview

Storage in Helix Agents consists of two components: State Stores for persisting agent state, and Stream Managers for real-time event streaming. Both are defined as interfaces, enabling different implementations for different environments.

Why Separate Components?

State and streaming have different requirements:

| Concern        | State Store             | Stream Manager              |
| -------------- | ----------------------- | --------------------------- |
| Purpose        | Persist agent state     | Real-time event delivery    |
| Access Pattern | Read-modify-write       | Append-only, subscribe      |
| Consistency    | Strong (atomic updates) | Eventual (ordering matters) |
| Retention      | Until explicit delete   | Temporary (TTL-based)       |

By separating them, you can:

  • Use in-memory streams with Redis state (see the sketch below)
  • Scale streaming infrastructure independently
  • Apply different retention policies
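
For example, the first combination above is just a matter of composition. A minimal sketch, pairing the Redis state store with the in-memory stream manager (both appear in the table below; the constructor signatures match the examples later on this page):

typescript
import Redis from 'ioredis';
import { RedisStateStore } from '@helix-agents/store-redis';
import { InMemoryStreamManager } from '@helix-agents/store-memory';

// Agent state survives restarts in Redis, while streamed chunks stay in-process.
// Reasonable when live subscribers are the only consumers of the stream.
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis);
const streamManager = new InMemoryStreamManager();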

Available Implementations

| Package                        | State Store        | Stream Manager        | Use Case             |
| ------------------------------ | ------------------ | --------------------- | -------------------- |
| @helix-agents/store-memory     | InMemoryStateStore | InMemoryStreamManager | Development, testing |
| @helix-agents/store-redis      | RedisStateStore    | RedisStreamManager    | Production           |
| @helix-agents/store-cloudflare | D1StateStore       | DOStreamManager       | Cloudflare Workers   |

State Store Interface

The StateStore interface defines how agent state is persisted:

typescript
interface StateStore {
  // Core operations
  save(state: AgentState): Promise<void>;
  load(runId: string): Promise<AgentState | null>;
  delete(runId: string): Promise<void>;

  // Atomic operations (for parallel tool execution)
  appendMessages(runId: string, messages: Message[]): Promise<void>;
  mergeCustomState(runId: string, changes: MergeChanges): Promise<{ warnings: string[] }>;
  updateStatus(runId: string, status: AgentStatus): Promise<void>;
  incrementStepCount(runId: string): Promise<number>;

  // Sub-agent tracking
  addSubAgentRefs(runId: string, refs: SubAgentRef[]): Promise<void>;
  updateSubAgentRef(runId: string, update: SubAgentRefUpdate): Promise<void>;

  // Message queries (for UI/recovery)
  getMessages(runId: string, options?: GetMessagesOptions): Promise<PaginatedMessages>;
  getMessageCount(runId: string): Promise<number>;
}
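
Before the specialised operations, here is a minimal round trip through the core ones. The state object follows the AgentState data model at the end of this page; the concrete values are placeholders.

typescript
// Persist a freshly started run, reload it later, then clean up.
const state: AgentState = {
  runId: 'run-123',
  agentType: 'researcher',
  streamId: 'stream-123',
  messages: [],
  customState: {},
  stepCount: 0,
  status: 'running',
  subAgentRefs: [],
  aborted: false,
};

await stateStore.save(state);

const loaded = await stateStore.load('run-123');
console.log(loaded?.status); // 'running'

await stateStore.updateStatus('run-123', 'completed');
await stateStore.delete('run-123');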

Atomic Operations

The atomic operations let tools running in parallel modify state safely:

typescript
// Multiple tools can safely append messages
await Promise.all([
  stateStore.appendMessages(runId, [toolResult1]),
  stateStore.appendMessages(runId, [toolResult2]),
]);

// Multiple tools can safely update different state keys
await Promise.all([
  stateStore.mergeCustomState(runId, { values: { key1: 'value1' } }),
  stateStore.mergeCustomState(runId, { values: { key2: 'value2' } }),
]);
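
To see why these exist, consider the naive alternative: a concurrent load-modify-save cycle can silently drop one tool's update, which is exactly what appendMessages and mergeCustomState prevent. A sketch of the anti-pattern, reusing the variables from the example above:

typescript
// Anti-pattern: read-modify-write races when tools run in parallel.
async function unsafeAppend(runId: string, message: Message): Promise<void> {
  const state = await stateStore.load(runId);
  if (!state) return;
  state.messages.push(message);
  await stateStore.save(state); // whichever save lands last overwrites the other
}

// Both calls may load the same snapshot, so only one tool result survives.
await Promise.all([unsafeAppend(runId, toolResult1), unsafeAppend(runId, toolResult2)]);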

State Merge Semantics

The mergeCustomState operation merges changes into the existing custom state rather than replacing it wholesale:

typescript
// Array delta mode: new items are appended
// Tool 1: draft.items.push('a')
// Tool 2: draft.items.push('b')
// Result: items = [...original, 'a', 'b']

// Scalar values: last write wins
// Tool 1: draft.count = 5
// Tool 2: draft.count = 10
// Result: count = 5 or 10 (depends on timing)

// Null means delete
// draft.temp = null
// Result: temp key is removed
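
Concretely, these semantics apply to the MergeChanges payload passed to mergeCustomState. A sketch using only the { values: ... } shape shown earlier; the return value surfaces non-fatal merge warnings, which are logged here rather than ignored:

typescript
// Concurrent merges touching different keys: both survive.
await Promise.all([
  stateStore.mergeCustomState(runId, { values: { progress: 0.5 } }),
  stateStore.mergeCustomState(runId, { values: { temp: null } }), // null removes the 'temp' key
]);

// Scalar overwrite: last write wins under concurrency.
const { warnings } = await stateStore.mergeCustomState(runId, { values: { count: 10 } });
if (warnings.length > 0) {
  console.warn('merge warnings:', warnings);
}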

Stream Manager Interface

The StreamManager interface defines how events are streamed:

typescript
interface StreamManager {
  // Writer lifecycle
  createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter>;

  // Reader lifecycle
  createReader(streamId: string): Promise<StreamReader | null>;

  // Stream control
  endStream(streamId: string, finalOutput?: unknown): Promise<void>;
  failStream(streamId: string, error: string): Promise<void>;

  // Optional: Resumable streams
  createResumableReader?(
    streamId: string,
    options?: ReaderOptions
  ): Promise<ResumableStreamReader | null>;
  getStreamInfo?(streamId: string): Promise<StreamInfo | null>;
  pauseStream?(streamId: string): Promise<void>;
  resumeStream?(streamId: string): Promise<void>;
}

Writer/Reader Pattern

Writers emit chunks; readers consume them:

typescript
// Create writer (usually done by runtime)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write({ type: 'text_delta', delta: 'Hello', ... });
await writer.close();  // Close writer, NOT the stream

// Create reader (usually done by client/UI)
const reader = await streamManager.createReader(streamId);
if (reader) {
  try {
    for await (const chunk of reader) {
      console.log(chunk);
    }
  } finally {
    await reader.close();  // Always close reader
  }
}
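
The resumable-stream methods are optional, so callers should feature-detect them before use. A sketch, assuming no ReaderOptions are required and that a ResumableStreamReader can be consumed like a plain reader:

typescript
// Resumable reads are an optional capability; fall back to a plain reader if absent.
const resumable = streamManager.createResumableReader
  ? await streamManager.createResumableReader(streamId)
  : await streamManager.createReader(streamId);
// `resumable` can then be consumed with the same for-await loop as above.

// Stream metadata, when the implementation exposes it.
const info = await streamManager.getStreamInfo?.(streamId);
if (info) {
  console.log('stream info:', info);
}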

Stream Lifecycle

Stream Created (first writer)

    ├── Writers emit chunks
    │   └── Readers receive in real-time

    ├── endStream() called
    │   └── Readers complete normally
    │       └── New readers get historical chunks

    └── OR failStream() called
        └── Readers receive error
            └── New readers get null
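
Sketched with the StreamManager methods above (the stream IDs are placeholders):

typescript
// Happy path: the runtime ends the stream once the agent produces its final output.
await streamManager.endStream(streamId, { answer: 'done' });

// Late readers still receive the historical chunks, then complete normally.
const lateReader = await streamManager.createReader(streamId);
if (lateReader) {
  try {
    for await (const chunk of lateReader) {
      console.log('replayed:', chunk);
    }
  } finally {
    await lateReader.close();
  }
}

// Failure path: active readers receive the error, and later lookups return null.
await streamManager.failStream(failedStreamId, 'LLM request timed out');
console.log(await streamManager.createReader(failedStreamId)); // null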

Choosing Storage

Development

Use in-memory stores for fast iteration:

typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();

Pros: Zero setup, fast, easy debugging.
Cons: Data lost on restart, single process only.

Production (Self-Hosted)

Use Redis for durability and multi-process:

typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

Pros: Persistent, multi-process, production-ready.
Cons: Requires Redis infrastructure.

Production (Cloudflare)

Use D1 and Durable Objects for edge deployment:

typescript
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB);
const streamManager = new DOStreamManager(env.STREAM_MANAGER);

Pros: Global edge, serverless, integrated with Cloudflare.
Cons: Cloudflare-specific, different API from other stores.

Swapping Stores

Because stores implement standard interfaces, swapping is straightforward:

typescript
// Environment-based selection
const createStores = () => {
  if (process.env.NODE_ENV === 'production') {
    const redis = new Redis(process.env.REDIS_URL);
    return {
      stateStore: new RedisStateStore(redis),
      streamManager: new RedisStreamManager(redis),
    };
  }

  return {
    stateStore: new InMemoryStateStore(),
    streamManager: new InMemoryStreamManager(),
  };
};

const { stateStore, streamManager } = createStores();
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);

State Data Model

Agent state includes:

typescript
interface AgentState<TState, TOutput> {
  // Identity
  runId: string; // Unique run identifier
  agentType: string; // Agent type name
  streamId: string; // Stream identifier

  // Hierarchy
  parentAgentId?: string; // Parent if this is a sub-agent

  // Execution state
  messages: Message[]; // Conversation history
  customState: TState; // Your custom state schema
  stepCount: number; // LLM call count
  status: AgentStatus; // 'running' | 'completed' | 'failed' | 'paused'

  // Output
  output?: TOutput; // Final structured output
  error?: string; // Error message if failed

  // Sub-agents
  subAgentRefs: SubAgentRef[];

  // Abort handling
  aborted: boolean;
  abortReason?: string;
}
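
As an illustration of how this model is typically consumed, here is a hypothetical helper that summarises a persisted run using only the fields above:

typescript
// Hypothetical helper: render a one-line summary of a run for a UI or log.
function summariseRun(state: AgentState): string {
  if (state.aborted) {
    return `${state.runId}: aborted (${state.abortReason ?? 'no reason given'})`;
  }
  switch (state.status) {
    case 'completed':
      return `${state.runId}: completed in ${state.stepCount} steps`;
    case 'failed':
      return `${state.runId}: failed (${state.error ?? 'unknown error'})`;
    case 'paused':
      return `${state.runId}: paused after ${state.stepCount} steps`;
    default:
      return `${state.runId}: running, ${state.subAgentRefs.length} sub-agent(s) tracked`;
  }
}

const current = await stateStore.load('run-123');
if (current) {
  console.log(summariseRun(current));
}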
