@helix-agents/store-redis

Redis implementations of the store interfaces for production use. They provide durable state storage and stream management that work across processes.

Installation

bash
npm install @helix-agents/store-redis ioredis

RedisStateStore

Redis-backed state storage.

typescript
import { RedisStateStore } from '@helix-agents/store-redis';

const stateStore = new RedisStateStore({
  host: 'localhost',
  port: 6379,
  password: 'optional',
  db: 0,
  keyPrefix: 'helix:', // Optional prefix for keys
  ttl: 86400 * 7, // Optional TTL in seconds (default: 7 days)
});

Configuration Options

typescript
interface RedisStateStoreOptions {
  // Connection options (ioredis compatible)
  host?: string;
  port?: number;
  password?: string;
  db?: number;

  // Or provide existing client
  client?: Redis;

  // Key prefix for namespacing
  keyPrefix?: string;

  // TTL for state entries (seconds)
  ttl?: number;

  // Logger
  logger?: Logger;
}
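
In deployments these connection settings usually come from the environment. The following is a minimal sketch, not part of the package: `stateStoreOptionsFromEnv` is a hypothetical helper, and the variable names `REDIS_HOST`, `REDIS_PORT`, `REDIS_PASSWORD`, `REDIS_DB`, and `REDIS_KEY_PREFIX` are assumptions. It takes the environment as a parameter so it can be exercised without touching the real process environment.

```typescript
// Hypothetical helper: maps environment variables onto the connection
// fields of RedisStateStoreOptions. Variable names are assumptions.
interface ConnectionOptions {
  host: string;
  port: number;
  password: string | undefined;
  db: number;
  keyPrefix: string;
  ttl: number;
}

type Env = Record<string, string | undefined>;

// Parse a non-negative integer, falling back when the variable is unset.
function parseIntOr(value: string | undefined, fallback: number, name: string): number {
  if (value === undefined || value.trim() === '') return fallback;
  const parsed = Number(value);
  if (!Number.isInteger(parsed) || parsed < 0) {
    throw new Error(`${name} must be a non-negative integer, got "${value}"`);
  }
  return parsed;
}

function stateStoreOptionsFromEnv(env: Env): ConnectionOptions {
  return {
    host: env.REDIS_HOST ?? 'localhost',
    port: parseIntOr(env.REDIS_PORT, 6379, 'REDIS_PORT'),
    password: env.REDIS_PASSWORD || undefined, // treat empty string as "no password"
    db: parseIntOr(env.REDIS_DB, 0, 'REDIS_DB'),
    keyPrefix: env.REDIS_KEY_PREFIX ?? 'helix:',
    ttl: 86400 * 7, // 7 days, matching the documented default
  };
}
```

The result can be passed to `new RedisStateStore(...)`. Failing fast on a malformed port is a design choice here; silently falling back to 6379 would hide configuration mistakes.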

Methods

Same interface as InMemoryStateStore:

typescript
await stateStore.save(state); // runId is inside state object
const state = await stateStore.load(runId);
await stateStore.delete(runId);
await stateStore.updateStatus(runId, status);
await stateStore.appendMessages(runId, messages);
const { messages, hasMore } = await stateStore.getMessages(runId, options);

Data Structure

State is stored as Redis hashes:

helix:state:{runId} -> {
  runId: string,
  agentType: string,
  status: string,
  stepCount: number,
  customState: JSON string,
  messages: JSON string,
  output: JSON string,
  error: string,
  ...
}
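
Redis hash values are flat strings, which is why the nested fields above appear as JSON strings. The following is a minimal sketch of that encoding under a simplified state shape. `stateKey`, `toHashFields`, and `fromHashFields` are hypothetical helpers, not the package's serializer, and the real field set may differ.

```typescript
// Simplified stand-in for the stored state; the real type has more fields.
interface StoredState {
  runId: string;
  agentType: string;
  status: string;
  stepCount: number;
  customState: unknown;
  messages: unknown[];
}

// Key layout from the section above: {keyPrefix}state:{runId}
function stateKey(keyPrefix: string, runId: string): string {
  return `${keyPrefix}state:${runId}`;
}

// Flatten the state into string fields suitable for a Redis hash.
function toHashFields(state: StoredState): Record<string, string> {
  return {
    runId: state.runId,
    agentType: state.agentType,
    status: state.status,
    stepCount: String(state.stepCount),
    customState: JSON.stringify(state.customState ?? null),
    messages: JSON.stringify(state.messages),
  };
}

function requireField(fields: Record<string, string | undefined>, name: string): string {
  const value = fields[name];
  if (value === undefined) throw new Error(`Missing hash field: ${name}`);
  return value;
}

// Rebuild the state from the string fields a hash read returns.
function fromHashFields(fields: Record<string, string | undefined>): StoredState {
  return {
    runId: requireField(fields, 'runId'),
    agentType: requireField(fields, 'agentType'),
    status: requireField(fields, 'status'),
    stepCount: Number(requireField(fields, 'stepCount')),
    customState: JSON.parse(requireField(fields, 'customState')) as unknown,
    messages: JSON.parse(requireField(fields, 'messages')) as unknown[],
  };
}
```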

RedisStreamManager

Redis-backed stream management using Redis Streams.

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager({
  host: 'localhost',
  port: 6379,
  keyPrefix: 'helix:',
  maxStreamLength: 10000, // Max chunks per stream
  blockTimeout: 5000, // Read timeout (ms)
});

Configuration Options

typescript
interface RedisStreamManagerOptions {
  // Connection options
  host?: string;
  port?: number;
  password?: string;
  db?: number;
  client?: Redis;

  // Stream options
  keyPrefix?: string;
  maxStreamLength?: number; // Trim streams to this length
  blockTimeout?: number; // XREAD BLOCK timeout
  logger?: Logger;
}
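
Trimming has a consequence for resumable readers: chunks older than the retained window can no longer be replayed. The in-memory model below illustrates this without Redis. `CappedStream` is hypothetical, sequences starting at 1 is an assumption, and it trims exactly, whereas Redis can be asked to trim streams approximately.

```typescript
// In-memory model of a length-capped stream (illustration only, no Redis).
interface Entry<T> {
  sequence: number;
  chunk: T;
}

class CappedStream<T> {
  private readonly maxLength: number;
  private entries: Entry<T>[] = [];
  private nextSequence = 1;

  constructor(maxLength: number) {
    this.maxLength = maxLength;
  }

  // Append a chunk, then drop the oldest entry once the cap is exceeded.
  append(chunk: T): number {
    const sequence = this.nextSequence++;
    this.entries.push({ sequence, chunk });
    if (this.entries.length > this.maxLength) this.entries.shift();
    return sequence;
  }

  // Return the retained entries with sequence >= fromSequence.
  readFrom(fromSequence: number): Entry<T>[] {
    return this.entries.filter((entry) => entry.sequence >= fromSequence);
  }
}
```

With a cap of 3 and five appended chunks, a reader asking for sequence 1 only receives sequences 3 through 5. Size `maxStreamLength` to cover the longest gap a client may need to replay.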

Methods

Same interface as InMemoryStreamManager:

typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);
const reader = await streamManager.createReader(streamId);
await streamManager.endStream(streamId);
await streamManager.failStream(streamId, error);
const info = await streamManager.getInfo(streamId);

Resumable Reader

typescript
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});

if (reader) {
  for await (const { chunk, sequence } of reader) {
    // Process chunk
    // sequence can be used for Last-Event-ID
  }
}
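
The sequence maps onto Server-Sent Events, where a reconnecting browser sends the last `id:` it received back in the `Last-Event-ID` header. The following is a minimal sketch of both directions. `formatSseEvent` and `parseLastEventId` are hypothetical helpers, and whether `fromSequence` is inclusive or exclusive is not stated above, so the caller has to decide whether to add 1 to the parsed value.

```typescript
// Format one chunk as an SSE event whose id is the stream sequence.
// JSON.stringify escapes newlines, so the payload fits on one data line.
function formatSseEvent(sequence: number, chunk: unknown): string {
  return `id: ${sequence}\ndata: ${JSON.stringify(chunk)}\n\n`;
}

// Parse the Last-Event-ID header sent on reconnect.
// Returns undefined when the header is absent or not a non-negative integer.
function parseLastEventId(header: string | null | undefined): number | undefined {
  if (header == null) return undefined;
  const trimmed = header.trim();
  if (!/^\d+$/.test(trimmed)) return undefined;
  const sequence = Number(trimmed);
  return Number.isSafeInteger(sequence) ? sequence : undefined;
}
```

A handler could pass the parsed value (or undefined for a fresh connection) as `fromSequence` when calling `createResumableReader`, then write `formatSseEvent(sequence, chunk)` for each item the reader yields.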

Data Structure

Uses Redis Streams:

helix:stream:{streamId} -> XADD entries with:
  - chunk: JSON-encoded StreamChunk
  - sequence: Auto-incrementing ID

helix:stream:{streamId}:meta -> {
  status: 'active' | 'ended' | 'failed',
  error?: string,
  createdAt: ISO timestamp,
}
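
The key layout and metadata shape above can be written down as types. This is a sketch under assumptions: `streamKeys`, `parseStreamMeta`, and `isTerminal` are hypothetical helpers, and it assumes the metadata is read back as plain string fields.

```typescript
type StreamStatus = 'active' | 'ended' | 'failed';

interface StreamMeta {
  status: StreamStatus;
  error?: string;
  createdAt: string; // ISO timestamp
}

// Build the stream key and its metadata key from the documented layout.
function streamKeys(keyPrefix: string, streamId: string): { stream: string; meta: string } {
  const stream = `${keyPrefix}stream:${streamId}`;
  return { stream, meta: `${stream}:meta` };
}

// Validate raw string fields before trusting them as StreamMeta.
function parseStreamMeta(fields: Record<string, string | undefined>): StreamMeta {
  const { status, error, createdAt } = fields;
  if (status !== 'active' && status !== 'ended' && status !== 'failed') {
    throw new Error(`Unexpected stream status: ${String(status)}`);
  }
  if (createdAt === undefined) throw new Error('Missing createdAt');
  return error === undefined ? { status, createdAt } : { status, error, createdAt };
}

// A reader can stop waiting for new chunks once the stream is no longer active.
function isTerminal(meta: StreamMeta): boolean {
  return meta.status !== 'active';
}
```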

RedisResumableStorage

Lower-level resumable stream storage.

typescript
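import Redis from 'ioredis';
import { RedisResumableStorage } from '@helix-agents/store-redis';

// Existing ioredis client (connects to localhost:6379 by default)
const redisClient = new Redis();

const storage = new RedisResumableStorage({
  client: redisClient,
  keyPrefix: 'helix:',
});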

// Create stream
await storage.create(streamId);

// Append chunk with sequence
await storage.append(streamId, chunk, sequence);

// Read from sequence
const chunks = await storage.readFrom(streamId, fromSequence, limit);

// Subscribe to new chunks
const unsubscribe = storage.subscribe(streamId, (chunk, sequence) => {
  console.log(`New chunk [${sequence}]:`, chunk);
});

// Get metadata
const metadata = await storage.getMetadata(streamId);

// End stream
await storage.end(streamId);

Error Classes

typescript
import {
  SequenceConflictError, // Sequence number conflict
  MalformedChunkError, // Invalid chunk data
} from '@helix-agents/store-redis';

try {
  await storage.append(streamId, chunk, sequence);
} catch (error) {
  if (error instanceof SequenceConflictError) {
    // Handle sequence conflict
  } else {
    throw error; // Do not swallow unrelated errors
  }
}
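
One possible use of `SequenceConflictError` is to make retried appends idempotent. The sketch below assumes a conflict means the sequence was already written, which this page does not confirm. It is written against a structural `AppendOnlyStorage` interface with a local stand-in error class so it runs without Redis; in real code you would import `SequenceConflictError` from the package instead. `appendOnce` is a hypothetical helper.

```typescript
// Local stand-in for the package's SequenceConflictError (illustration only).
class SequenceConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'SequenceConflictError';
    // Keeps instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Structural subset of the storage API used here.
interface AppendOnlyStorage {
  append(streamId: string, chunk: unknown, sequence: number): Promise<void>;
}

// Returns true if the chunk was written, false if the sequence already existed.
async function appendOnce(
  storage: AppendOnlyStorage,
  streamId: string,
  chunk: unknown,
  sequence: number,
): Promise<boolean> {
  try {
    await storage.append(streamId, chunk, sequence);
    return true;
  } catch (error) {
    if (error instanceof SequenceConflictError) return false; // assumed duplicate
    throw error; // anything else is a real failure
  }
}
```

Treating the conflict as success is only safe if the retried chunk is identical to the one already stored; otherwise the conflict should be surfaced to the caller.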

Usage Example

typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Create Redis-backed stores
const stateStore = new RedisStateStore({
  host: process.env.REDIS_HOST,
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});

const streamManager = new RedisStreamManager({
  host: process.env.REDIS_HOST,
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});

// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();

Production Considerations

Connection Pooling

Pass one shared ioredis client to both stores instead of separate connection options:

typescript
import Redis from 'ioredis';

const redisClient = new Redis({
  host: process.env.REDIS_HOST,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
});

const stateStore = new RedisStateStore({ client: redisClient });
const streamManager = new RedisStreamManager({ client: redisClient });

Cluster Support

typescript
import Redis from 'ioredis';

const cluster = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
]);

const stateStore = new RedisStateStore({ client: cluster });

Error Handling

typescript
stateStore.on('error', (error) => {
  console.error('Redis state store error:', error);
});

streamManager.on('error', (error) => {
  console.error('Redis stream manager error:', error);
});

TTL Management

Set appropriate TTLs based on your retention needs:

typescript
const stateStore = new RedisStateStore({
  ttl: 86400 * 30, // 30 days for state
});
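
Because `ttl` is expressed in seconds, a small conversion helper keeps retention settings readable. `ttlFromDays` is a hypothetical helper, not part of the package:

```typescript
const SECONDS_PER_DAY = 86400;

// Convert a retention period in days to the seconds value expected by `ttl`.
function ttlFromDays(days: number): number {
  if (!Number.isFinite(days) || days <= 0) {
    throw new RangeError(`Retention must be a positive number of days, got ${days}`);
  }
  return Math.round(days * SECONDS_PER_DAY);
}
```

For example, `ttl: ttlFromDays(30)` is equivalent to the `86400 * 30` shown above.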

Schemas

Zod schema for stored state:

typescript
import { AgentStateSchema } from '@helix-agents/store-redis';

// Validate state
const result = AgentStateSchema.safeParse(data);
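
Zod's `safeParse` returns a result object instead of throwing, so callers branch on `success`. The sketch below is written against a structural `SafeParser` type so it runs without the package; in real code you would pass `AgentStateSchema`. `parseOrNull` is a hypothetical helper.

```typescript
// Structural subset of a Zod schema: safeParse reports failure in its result.
type SafeParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: { message: string } };

interface SafeParser<T> {
  safeParse(data: unknown): SafeParseResult<T>;
}

// Returns the validated value, or null after reporting why validation failed.
function parseOrNull<T>(
  schema: SafeParser<T>,
  data: unknown,
  onError: (message: string) => void,
): T | null {
  const result = schema.safeParse(data);
  if (result.success) return result.data;
  onError(result.error.message);
  return null;
}
```

Returning null rather than throwing suits data read back from Redis, where a stale or partially written entry is better skipped and logged than allowed to crash the caller.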

Released under the MIT License.