
@helix-agents/store-memory

In-memory implementations of the state store and stream manager. Useful for development, testing, and single-process deployments.

Installation

bash
npm install @helix-agents/store-memory

InMemoryStateStore

In-memory state storage implementation.

typescript
import { InMemoryStateStore } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();

Methods

save

Save agent state. The runId is taken from inside the state object.

typescript
await stateStore.save(state);

load

Load agent state.

typescript
const state = await stateStore.load('run-123');

if (state) {
  console.log('Status:', state.status);
  console.log('Output:', state.output);
}

exists

Check if state exists.

typescript
const exists = await stateStore.exists('run-123');

updateStatus

Update agent status.

typescript
await stateStore.updateStatus('run-123', 'completed');

getMessages

Get paginated messages.

typescript
const { messages, hasMore } = await stateStore.getMessages('run-123', {
  offset: 0,
  limit: 50,
});
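
To read an entire conversation, the call above can be repeated with an advancing offset until hasMore is false. The following is a minimal sketch under stated assumptions: `MessageSource`, `loadAllMessages`, and `createFakeSource` are hypothetical names introduced here, and the stand-in store only mimics the `{ messages, hasMore }` shape shown above so the helper can run without the package.

```typescript
// Hypothetical structural types: only the fields used in the example above.
interface MessagePage<M> {
  messages: M[];
  hasMore: boolean;
}

interface MessageSource<M> {
  getMessages(
    runId: string,
    options: { offset: number; limit: number },
  ): Promise<MessagePage<M>>;
}

// Fetch every message for a run by advancing the offset page by page.
async function loadAllMessages<M>(
  store: MessageSource<M>,
  runId: string,
  pageSize = 50,
): Promise<M[]> {
  const all: M[] = [];
  let offset = 0;

  while (true) {
    const { messages, hasMore } = await store.getMessages(runId, {
      offset,
      limit: pageSize,
    });
    all.push(...messages);
    offset += messages.length;

    // Stop when no more pages are reported, or when a page comes back empty
    // (guards against looping forever if hasMore is inconsistent).
    if (!hasMore || messages.length === 0) break;
  }

  return all;
}

// Stand-in store backed by an array, used only to exercise the helper.
function createFakeSource(total: number): MessageSource<string> {
  const data = Array.from({ length: total }, (_, i) => `message ${i}`);
  return {
    async getMessages(
      _runId: string,
      { offset, limit }: { offset: number; limit: number },
    ): Promise<MessagePage<string>> {
      const messages = data.slice(offset, offset + limit);
      return { messages, hasMore: offset + limit < data.length };
    },
  };
}
```

Because the helper depends only on the method shape, the stateStore instance from the examples above should be usable in place of the stand-in, assuming its getMessages signature matches what is shown.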

Testing Utilities

The in-memory store provides additional methods for testing:

typescript
// Clear all stored state
stateStore.clear();

// Get all run IDs
const runIds = stateStore.getAllRunIds();

// Get state synchronously (no await needed)
const state = stateStore.getSync('run-123');

InMemoryStreamManager

In-memory stream management implementation.

typescript
import { InMemoryStreamManager } from '@helix-agents/store-memory';

const streamManager = new InMemoryStreamManager();

Methods

createWriter

Create a stream writer. The stream is created implicitly if it does not already exist.

typescript
const writer = await streamManager.createWriter('stream-123', 'run-123', 'agent-name');

// Write chunks
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: 'run-123',
  timestamp: Date.now(),
});

// Close writer when done
await writer.close();

createReader

Create a stream reader.

typescript
const reader = await streamManager.createReader('stream-123');

if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
}
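
A common use of the reader loop above is to assemble the streamed text. The sketch below assumes only the text_delta chunk shape from the createWriter example; `StreamChunk`, `collectText`, and `fakeReader` are hypothetical names, and the stand-in generator takes the place of a real reader so the example is self-contained.

```typescript
// Assumed chunk shape, based only on the text_delta example in this page.
// Other chunk types are treated as opaque.
interface StreamChunk {
  type: string;
  delta?: string;
}

// Concatenate the delta of every text_delta chunk from any async iterable.
async function collectText(reader: AsyncIterable<StreamChunk>): Promise<string> {
  let text = '';
  for await (const chunk of reader) {
    if (chunk.type === 'text_delta' && typeof chunk.delta === 'string') {
      text += chunk.delta;
    }
  }
  return text;
}

// Stand-in reader for demonstration: an async generator yielding fixed chunks.
async function* fakeReader(): AsyncGenerator<StreamChunk> {
  yield { type: 'text_delta', delta: 'Hello' };
  yield { type: 'tool_start' };
  yield { type: 'text_delta', delta: ', world' };
}
```

Since the helper accepts any AsyncIterable, a reader returned by createReader could be passed in directly, provided its chunks carry the fields assumed here.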

endStream

End a stream normally with optional output.

typescript
await streamManager.endStream('stream-123');
await streamManager.endStream('stream-123', output); // With final output

failStream

End a stream with an error.

typescript
await streamManager.failStream('stream-123', 'Something went wrong');

getInfo

Get stream information.

typescript
const info = await streamManager.getInfo('stream-123');

if (info) {
  console.log('Status:', info.status); // 'active' | 'ended' | 'failed'
  console.log('Chunk count:', info.chunkCount);
}

createResumableReader

Create a reader that can resume from a position.

typescript
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: 10, // Resume from sequence 10
});

if (reader) {
  console.log('Status:', reader.status); // 'active' | 'ended' | 'failed'

  for await (const { chunk, sequence } of reader) {
    console.log(`[${sequence}]`, chunk);
  }
}
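
Resuming is mostly useful when the consumer remembers where it stopped. The sketch below tracks the last sequence seen so a later call can pass it as fromSequence. It rests on two assumptions that the text above does not confirm: that fromSequence is inclusive, and that sequences increase by one per chunk. `consumeWithCheckpoint` and `fakeResumableReader` are hypothetical names, and the stand-in reader is zero-based purely for illustration.

```typescript
interface SequencedChunk<C> {
  chunk: C;
  sequence: number;
}

// Consume a resumable reader, remembering which sequence to resume from.
async function consumeWithCheckpoint<C>(
  reader: AsyncIterable<SequencedChunk<C>>,
  startFrom: number,
): Promise<{ chunks: C[]; nextSequence: number; interrupted: boolean }> {
  const chunks: C[] = [];
  let nextSequence = startFrom;
  let interrupted = false;

  try {
    for await (const { chunk, sequence } of reader) {
      chunks.push(chunk);
      // ASSUMPTION: fromSequence is inclusive, so resume one past this chunk.
      nextSequence = sequence + 1;
    }
  } catch {
    // The reader failed mid-stream; the checkpoint still marks our position.
    interrupted = true;
  }

  return { chunks, nextSequence, interrupted };
}

// Stand-in reader: replays a log from fromSequence and can simulate a drop.
async function* fakeResumableReader(
  log: string[],
  fromSequence: number,
  failAt?: number,
): AsyncGenerator<SequencedChunk<string>> {
  for (let sequence = fromSequence; sequence < log.length; sequence++) {
    if (sequence === failAt) throw new Error('connection dropped');
    yield { chunk: log[sequence] as string, sequence };
  }
}
```

After an interrupted read, a caller would create a new resumable reader with `fromSequence: nextSequence` and continue; if the library treats fromSequence as exclusive, the `+ 1` should be dropped.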

Testing Utilities

typescript
// Clear all streams
streamManager.clear();

// Get all stream IDs
const streamIds = streamManager.getAllStreamIds();

// Get stream status synchronously
const status = streamManager.getStatusSync('stream-123');

// Get all chunks for a stream
const chunks = streamManager.getChunksSync('stream-123');

Usage Example

typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Create stores
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();

// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();

// Inspect stored state (for debugging)
const state = stateStore.getSync(handle.runId);
const chunks = streamManager.getChunksSync(handle.streamId);

Testing Patterns

Reset Between Tests

typescript
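import { beforeEach, describe, expect, it } from 'vitest';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';

// MyAgent and mockAdapter are assumed to be defined in your test setup.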

describe('MyAgent', () => {
  let stateStore: InMemoryStateStore;
  let streamManager: InMemoryStreamManager;

  beforeEach(() => {
    stateStore = new InMemoryStateStore();
    streamManager = new InMemoryStreamManager();
  });

  it('should complete successfully', async () => {
    const executor = new JSAgentExecutor(stateStore, streamManager, mockAdapter);

    const handle = await executor.execute(MyAgent, 'test');
    const result = await handle.result();

    expect(result.status).toBe('completed');
  });
});

Inspect State After Execution

typescript
it('should update custom state', async () => {
  const handle = await executor.execute(CounterAgent, 'increment 3 times');
  await handle.result();

  const state = stateStore.getSync(handle.runId);
  expect(state?.customState.count).toBe(3);
});

Inspect Stream Chunks

typescript
it('should emit tool events', async () => {
  const handle = await executor.execute(ToolAgent, 'use the search tool');
  await handle.result();

  const chunks = streamManager.getChunksSync(handle.streamId);
  const toolStarts = chunks.filter((c) => c.type === 'tool_start');

  expect(toolStarts).toHaveLength(1);
  expect(toolStarts[0].toolName).toBe('search');
});
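
When a test asserts on several chunk types at once, tallying chunks by type can keep the assertions short. The helper below is a hypothetical convenience and not part of the package; it assumes only that each chunk has a string `type` field, as in the examples above.

```typescript
// Count how many chunks of each type appear in a list of stream chunks.
function countByType(
  chunks: ReadonlyArray<{ type: string }>,
): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const { type } of chunks) {
    counts[type] = (counts[type] ?? 0) + 1;
  }
  return counts;
}

// Example input shaped like the chunks used elsewhere on this page.
const sampleChunks = [
  { type: 'text_delta' },
  { type: 'tool_start' },
  { type: 'text_delta' },
];
const sampleCounts = countByType(sampleChunks);
```

In a test, the result of getChunksSync could be passed to the helper and checked with something like `expect(countByType(chunks).tool_start).toBe(1)`.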

Limitations

  • No persistence - Data is lost when the process exits
  • No sharing - State cannot be shared between processes
  • Memory bound - All state and stream chunks are held in process memory, so large states increase memory use
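
These limitations follow from keeping everything in a data structure that lives inside one process. The toy store below illustrates the idea; it is not the library's InMemoryStateStore, and `ToyState` and `ToyStateStore` are hypothetical names with a deliberately reduced state shape.

```typescript
// Toy store for illustration only; NOT the library's implementation.
interface ToyState {
  runId: string;
  status: string;
}

class ToyStateStore {
  // All data lives in this Map, on the heap of the current process.
  // Nothing is written to disk, and no other process can see it.
  private readonly states = new Map<string, ToyState>();

  async save(state: ToyState): Promise<void> {
    // Store a shallow copy so later mutation by the caller does not leak in.
    this.states.set(state.runId, { ...state });
  }

  async load(runId: string): Promise<ToyState | null> {
    const state = this.states.get(runId);
    return state ? { ...state } : null;
  }

  clear(): void {
    this.states.clear();
  }
}
```

Constructing a second `ToyStateStore` yields an empty store, which is effectively what a process restart or a second worker process would observe.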

For production use with persistence, see:


Released under the MIT License.