@helix-agents/store-memory
In-memory implementations of the state store, stream manager, usage store, and lock manager. Useful for development, testing, and single-process deployments.
Installation
npm install @helix-agents/store-memory
InMemoryStateStore
In-memory state storage implementation.
import { InMemoryStateStore } from '@helix-agents/store-memory';
const stateStore = new InMemoryStateStore();
Session vs Run Identifiers
- sessionId: Primary key for all state operations. A session contains all messages, state, and checkpoints.
- runId: Execution metadata identifying a specific run within a session.
All store methods use sessionId as the primary key for lookups and storage.
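As a rough illustration of that keying, the sketch below stores one record per sessionId and treats runId as a field that changes from run to run. `SessionRecord` and `saveRecord` are hypothetical names, and the record shape is an assumption made for this example, not the library's state type.

```typescript
// Hypothetical record shape: one entry per session, with the current run as metadata.
interface SessionRecord {
  sessionId: string;
  runId: string;
  messages: string[];
}

const records = new Map<string, SessionRecord>();

// Storage is keyed by sessionId only; runId never participates in the lookup.
function saveRecord(record: SessionRecord): void {
  records.set(record.sessionId, record);
}

// First run in the session.
saveRecord({ sessionId: 'session-123', runId: 'run-1', messages: ['Hello'] });

// A later run in the same session updates the same record.
const previous = records.get('session-123');
if (previous) {
  saveRecord({
    ...previous,
    runId: 'run-2',
    messages: [...previous.messages, 'Follow-up'],
  });
}

const current = records.get('session-123');
```

Both runs end up in a single record under 'session-123', which is why lookups only ever need the sessionId.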
Methods
save
Save agent state. The sessionId from the state object is used as the storage key.
await stateStore.save(state);
load
Load agent state.
const state = await stateStore.load('session-123');
if (state) {
  console.log('Status:', state.status);
  console.log('Output:', state.output);
}
exists
Check if state exists.
const exists = await stateStore.exists('session-123');
updateStatus
Update agent status.
await stateStore.updateStatus('session-123', 'completed');
compareAndSetStatus
Atomically compare and set status (CAS operation).
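The compare-and-set contract can be illustrated without the store. The sketch below is a minimal, self-contained model, assuming the write happens only when the current status is one of the expected values. `TinyStatusStore` and the `Status` union are hypothetical names for illustration and are not part of @helix-agents/store-memory.

```typescript
// Hypothetical status values, for illustration only.
type Status = 'running' | 'interrupted' | 'completed';

// Minimal model of a CAS-style status update (not the library's implementation).
class TinyStatusStore {
  private statuses = new Map<string, Status>();

  set(sessionId: string, status: Status): void {
    this.statuses.set(sessionId, status);
  }

  get(sessionId: string): Status | undefined {
    return this.statuses.get(sessionId);
  }

  // Writes `next` only if the current status is one of `expected`.
  // Returns true when the write happened, false otherwise.
  compareAndSetStatus(sessionId: string, expected: Status[], next: Status): boolean {
    const current = this.statuses.get(sessionId);
    if (current === undefined || expected.indexOf(current) === -1) {
      return false;
    }
    this.statuses.set(sessionId, next);
    return true;
  }
}

const tiny = new TinyStatusStore();
tiny.set('session-123', 'running');

// First caller wins: the status moves from 'running' to 'interrupted'.
const first = tiny.compareAndSetStatus('session-123', ['running'], 'interrupted');
// Second caller loses: the status is no longer 'running', so nothing is written.
const second = tiny.compareAndSetStatus('session-123', ['running'], 'completed');
```

Because the check and the write happen in one step, two callers racing on the same session cannot both succeed. The call against the store itself: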
const success = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'], // Expected statuses (array)
  'interrupted' // New status
);
if (!success) {
  // Status was not in ['running'], update failed
}
getCheckpoint
Get a specific checkpoint.
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-session-123-s5-...');
if (checkpoint) {
  console.log('State at step:', checkpoint.state.stepCount);
}
getLatestCheckpoint
Get the most recent checkpoint for a session.
const checkpoint = await stateStore.getLatestCheckpoint('session-123');
listCheckpoints
List checkpoints with pagination.
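To walk every page, the cursor returned by one response is fed into the next request until no nextCursor comes back. The helper below is a sketch under the assumption that a page has the `{ items, nextCursor }` shape used in this section. `collectAll`, `fetchPage`, and the fake page source are hypothetical names; the fake stands in for a call such as `stateStore.listCheckpoints(sessionId, { limit, cursor })`.

```typescript
// Assumed page shape, mirroring the fields used in this section.
interface Page<T> {
  items: T[];
  nextCursor?: string;
}

// Drains a cursor-paginated source by following nextCursor until it is absent.
async function collectAll<T>(
  fetchPage: (cursor?: string) => Promise<Page<T>>
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined = undefined;
  do {
    const page: Page<T> = await fetchPage(cursor);
    all.push(...page.items);
    cursor = page.nextCursor;
  } while (cursor);
  return all;
}

// Fake source for illustration: three ids served two per page.
const fakeIds = ['cp-1', 'cp-2', 'cp-3'];
async function fakeFetch(cursor?: string): Promise<Page<string>> {
  const start = cursor === undefined ? 0 : Number(cursor);
  const end = start + 2;
  return {
    items: fakeIds.slice(start, end),
    nextCursor: end < fakeIds.length ? String(end) : undefined,
  };
}
```

Here `await collectAll(fakeFetch)` gathers all three ids across two page requests. A single page request against the store looks like this: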
const result = await stateStore.listCheckpoints('session-123', {
  limit: 10,
  cursor: 'next-page-cursor',
});
for (const checkpoint of result.items) {
  console.log(`${checkpoint.id}: step ${checkpoint.state.stepCount}`);
}
if (result.nextCursor) {
  // More pages available
}
stageChanges
Stage changes for a tool call within a step (before commit).
await stateStore.stageChanges('session-123', 'step-1', {
  toolCallId: 'call-456',
  patches: [{ op: 'add', path: '/notes/-', value: 'New finding' }],
  mergeChanges: { notes: ['New finding'] },
  timestamp: Date.now(),
});
promoteStaging
Commit staged changes to the main state.
await stateStore.promoteStaging('session-123', 'step-1');
discardStaging
Discard staged changes (rollback).
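The stage, promote, and discard lifecycle can be modelled in a few lines. The sketch below assumes, for illustration only, that staged changes are buffered per session and step and are shallow-merged into the committed state on promotion. `TinyStagingStore` and its merge rule are hypothetical and may differ from how the library applies `patches` and `mergeChanges`.

```typescript
type State = Record<string, unknown>;

// Minimal model of a staging buffer (not the library's implementation).
class TinyStagingStore {
  private committed = new Map<string, State>();
  private staged = new Map<string, State[]>();

  private key(sessionId: string, stepId: string): string {
    return sessionId + ':' + stepId;
  }

  getState(sessionId: string): State {
    return this.committed.get(sessionId) ?? {};
  }

  // Buffer a change; the committed state is untouched.
  stageChanges(sessionId: string, stepId: string, changes: State): void {
    const k = this.key(sessionId, stepId);
    const list = this.staged.get(k) ?? [];
    list.push(changes);
    this.staged.set(k, list);
  }

  // Apply buffered changes in order, then drop the buffer.
  promoteStaging(sessionId: string, stepId: string): void {
    const k = this.key(sessionId, stepId);
    let next = this.getState(sessionId);
    for (const changes of this.staged.get(k) ?? []) {
      next = { ...next, ...changes };
    }
    this.committed.set(sessionId, next);
    this.staged.delete(k);
  }

  // Drop buffered changes without applying them.
  discardStaging(sessionId: string, stepId: string): void {
    this.staged.delete(this.key(sessionId, stepId));
  }
}

const staging = new TinyStagingStore();

// Discarded changes never reach the committed state.
staging.stageChanges('session-123', 'step-1', { notes: ['New finding'] });
staging.discardStaging('session-123', 'step-1');
const afterDiscard = staging.getState('session-123');

// Promoted changes do.
staging.stageChanges('session-123', 'step-2', { notes: ['Kept finding'] });
staging.promoteStaging('session-123', 'step-2');
const afterPromote = staging.getState('session-123');
```

The point of the buffer is that a failed or abandoned step leaves the committed state exactly as it was. The store's own discard call: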
await stateStore.discardStaging('session-123', 'step-1');
getMessages
Get paginated messages.
const { messages, hasMore } = await stateStore.getMessages('session-123', {
  offset: 0,
  limit: 50,
});
Testing Utilities
The in-memory store provides additional methods for testing:
// Clear all stored state
stateStore.clear();
// Get all session IDs
const sessionIds = stateStore.getAllSessionIds();
// Get state synchronously (no await needed)
const state = stateStore.getSync('session-123');
InMemoryStreamManager
In-memory stream management implementation.
import { InMemoryStreamManager } from '@helix-agents/store-memory';
const streamManager = new InMemoryStreamManager();
Methods
createWriter
Create a stream writer (implicitly creates the stream if it does not exist yet).
const writer = await streamManager.createWriter('stream-123', 'run-123', 'agent-name');
// Write chunks
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: 'run-123',
  timestamp: Date.now(),
});
// Close writer when done
await writer.close();
createReader
Create a stream reader.
const reader = await streamManager.createReader('stream-123');
if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
}
endStream
End a stream normally with optional output.
await streamManager.endStream('stream-123');
await streamManager.endStream('stream-123', output); // With final output
failStream
End a stream with an error.
await streamManager.failStream('stream-123', 'Something went wrong');
getInfo
Get stream information.
const info = await streamManager.getInfo('stream-123');
if (info) {
  console.log('Status:', info.status); // 'active' | 'ended' | 'failed'
  console.log('Chunk count:', info.chunkCount);
}
createResumableReader
Create a reader that can resume from a position.
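Resuming relies on the consumer remembering the last sequence number it processed. The sketch below models that bookkeeping with a plain array in place of a real stream. `readFrom` is a hypothetical stand-in for the reader, and treating the start position as inclusive is an assumption made here, so check the behaviour of the actual reader before relying on it.

```typescript
interface SequencedChunk {
  sequence: number;
  chunk: string;
}

// Stand-in for a stream's stored chunks, numbered from 0.
const stored: SequencedChunk[] = ['a', 'b', 'c', 'd'].map((chunk, sequence) => ({
  sequence,
  chunk,
}));

// Hypothetical reader: returns chunks whose sequence is >= fromSequence.
function readFrom(fromSequence: number): SequencedChunk[] {
  return stored.filter((entry) => entry.sequence >= fromSequence);
}

// First connection: process two chunks, then "disconnect".
let lastSeen = -1;
const received: string[] = [];
for (const entry of readFrom(0).slice(0, 2)) {
  received.push(entry.chunk);
  lastSeen = entry.sequence;
}

// Reconnect: ask for everything after the last processed sequence.
for (const entry of readFrom(lastSeen + 1)) {
  received.push(entry.chunk);
  lastSeen = entry.sequence;
}
```

Tracking `lastSeen` on the consumer side means a reconnect neither repeats nor skips chunks. With the stream manager, the resume position is passed as fromSequence: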
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: 10, // Resume from sequence 10
});
if (reader) {
  console.log('Status:', reader.status); // 'active' | 'ended' | 'failed'
  for await (const { chunk, sequence } of reader) {
    console.log(`[${sequence}]`, chunk);
  }
}
Testing Utilities
// Clear all streams
streamManager.clear();
// Get all stream IDs
const streamIds = streamManager.getAllStreamIds();
// Get stream status synchronously
const status = streamManager.getStatusSync('stream-123');
// Get all chunks for a stream
const chunks = streamManager.getChunksSync('stream-123');
Usage Example
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Create stores
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();
// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();
// Inspect stored state (for debugging)
const state = stateStore.getSync(handle.sessionId);
const chunks = streamManager.getChunksSync(handle.streamId);
Testing Patterns
Reset Between Tests
import { beforeEach, describe, expect, it } from 'vitest';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
describe('MyAgent', () => {
  let stateStore: InMemoryStateStore;
  let streamManager: InMemoryStreamManager;
  // Fresh stores for every test so no state leaks between cases
  beforeEach(() => {
    stateStore = new InMemoryStateStore();
    streamManager = new InMemoryStreamManager();
  });
  it('should complete successfully', async () => {
    const executor = new JSAgentExecutor(stateStore, streamManager, mockAdapter);
    const handle = await executor.execute(MyAgent, 'test');
    const result = await handle.result();
    expect(result.status).toBe('completed');
  });
});
Inspect State After Execution
it('should update custom state', async () => {
  const handle = await executor.execute(CounterAgent, 'increment 3 times');
  await handle.result();
  const state = stateStore.getSync(handle.sessionId);
  expect(state?.customState.count).toBe(3);
});
Inspect Stream Chunks
it('should emit tool events', async () => {
  const handle = await executor.execute(ToolAgent, 'use the search tool');
  await handle.result();
  const chunks = streamManager.getChunksSync(handle.streamId);
  const toolStarts = chunks.filter((c) => c.type === 'tool_start');
  expect(toolStarts).toHaveLength(1);
  expect(toolStarts[0].toolName).toBe('search');
});
InMemoryUsageStore
In-memory usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics.
import { InMemoryUsageStore } from '@helix-agents/store-memory';
const usageStore = new InMemoryUsageStore();
Basic Usage
Pass the usage store in the execute options to enable usage tracking:
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();
// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
console.log(`Tool calls: ${rollup?.toolStats.totalCalls}`);
Methods
recordEntry
Record a usage entry (called internally by the framework).
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});
getEntries
Get usage entries for a session with optional filtering.
// All entries
const entries = await usageStore.getEntries('session-123');
// Filter by kind
const tokenEntries = await usageStore.getEntries('session-123', {
  kinds: ['tokens'],
});
// Filter by step range
const midRunEntries = await usageStore.getEntries('session-123', {
  stepRange: { min: 5, max: 10 },
});
// Pagination
const page = await usageStore.getEntries('session-123', {
  limit: 10,
  offset: 20,
});
getRollup
Get aggregated usage rollup.
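A rollup is an aggregate over the recorded entries. As a rough sketch of what that aggregation involves for token entries, the function below sums the `tokens` field of entries shaped like the recordEntry example. `TokenEntry` and `sumTokens` are hypothetical names, and the real rollup also covers tool statistics and custom metrics that are not modelled here.

```typescript
interface TokenCounts {
  prompt: number;
  completion: number;
  total: number;
}

// Assumed subset of a 'tokens' usage entry, for illustration only.
interface TokenEntry {
  kind: 'tokens';
  sessionId: string;
  tokens: TokenCounts;
}

// Sums token counts over all entries that belong to the given session.
function sumTokens(entries: TokenEntry[], sessionId: string): TokenCounts {
  const sum: TokenCounts = { prompt: 0, completion: 0, total: 0 };
  for (const entry of entries) {
    if (entry.sessionId !== sessionId) continue;
    sum.prompt += entry.tokens.prompt;
    sum.completion += entry.tokens.completion;
    sum.total += entry.tokens.total;
  }
  return sum;
}

const sampleEntries: TokenEntry[] = [
  { kind: 'tokens', sessionId: 'session-123', tokens: { prompt: 100, completion: 50, total: 150 } },
  { kind: 'tokens', sessionId: 'session-123', tokens: { prompt: 40, completion: 10, total: 50 } },
  { kind: 'tokens', sessionId: 'session-999', tokens: { prompt: 7, completion: 3, total: 10 } },
];

// Only the two 'session-123' entries contribute to the sum.
const tokenSum = sumTokens(sampleEntries, 'session-123');
```

The store's own rollup calls: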
// This agent's usage only
const rollup = await usageStore.getRollup('session-123');
// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('session-123', {
  includeSubAgents: true,
});
exists
Check if usage data exists for a session.
const hasUsage = usageStore.exists('session-123');
delete
Delete usage data for a session.
usageStore.delete('session-123');
Testing Utilities
// Clear all usage data
usageStore.clear();
// Get all tracked session IDs
const sessionIds = usageStore.getAllSessionIds();
// Get counts
console.log('Tracked runs:', usageStore.size);
console.log('Total entries:', usageStore.totalEntryCount);
Testing Patterns
import { beforeEach, describe, it, expect } from 'vitest';
describe('MyAgent usage', () => {
  let usageStore: InMemoryUsageStore;
  beforeEach(() => {
    usageStore = new InMemoryUsageStore();
  });
  it('should track tool usage', async () => {
    const handle = await executor.execute(agent, 'Use the search tool', {
      usageStore,
    });
    await handle.result();
    const rollup = await handle.getUsageRollup();
    expect(rollup?.toolStats.totalCalls).toBeGreaterThan(0);
    expect(rollup?.toolStats.byTool['search']).toBeDefined();
  });
  it('should track custom metrics', async () => {
    const handle = await executor.execute(agent, 'Search for something', {
      usageStore,
    });
    await handle.result();
    const rollup = await handle.getUsageRollup();
    expect(rollup?.custom['api_calls']['tavily']).toBe(1);
  });
});
InMemoryLockManager
In-memory lock manager for single-process coordination. Prevents concurrent execution of the same agent run.
import { InMemoryLockManager } from '@helix-agents/store-memory';
const lockManager = new InMemoryLockManager();
Methods
acquire
Acquire a lock with a fencing token.
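A fencing token lets a downstream resource reject writes from a holder whose lock has expired and been re-acquired by someone else. The sketch below shows the general technique, assuming tokens are numbers that increase with each acquisition. `FencedResource` is a hypothetical name and nothing here is taken from the InMemoryLockManager implementation.

```typescript
// A resource that remembers the newest fencing token it has seen
// and refuses writes that carry an older one.
class FencedResource {
  private highestToken = 0;
  private value = '';

  // Accepts the write only if the token is at least as new as any seen so far.
  write(fencingToken: number, value: string): boolean {
    if (fencingToken < this.highestToken) {
      return false;
    }
    this.highestToken = fencingToken;
    this.value = value;
    return true;
  }

  read(): string {
    return this.value;
  }
}

const resource = new FencedResource();

// The current holder (token 2) writes first and is accepted.
const newHolderOk = resource.write(2, 'from new holder');
// A stale holder (token 1) wakes up late and is rejected.
const staleHolderOk = resource.write(1, 'from stale holder');
```

The TTL alone cannot stop a paused holder from writing after its lock has expired; checking the token at the point of the write closes that gap. Acquiring a lock from the manager: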
const lock = await lockManager.acquire('session-123', {
  ttlMs: 30000, // Lock expires after 30s
  owner: 'executor-1',
});
if (lock) {
  console.log('Lock acquired with token:', lock.fencingToken);
  // Do work...
  // Optionally refresh to extend TTL
  await lock.refresh();
  // Release when done
  await lock.release();
}
isLocked
Check if a lock is currently held.
const locked = await lockManager.isLocked('session-123');
Usage with Executor
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { InMemoryLockManager } from '@helix-agents/store-memory';
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
  lockManager: new InMemoryLockManager(),
});
Testing Utilities
// Clear all locks
lockManager.clear();
// Get all active locks
const locks = lockManager.getActiveLocks();
Limitations
- No persistence - Data is lost when the process exits
- No sharing - Cannot share state between processes
- Memory bound - Large states consume memory
- Single process only - Lock manager doesn't coordinate across processes
For production use with persistence and distributed coordination, see:
- @helix-agents/store-redis - Redis storage with RedisLockManager
- @helix-agents/store-cloudflare - D1/Durable Objects with DurableObjectLockManager