Skip to content

Redis Storage

The Redis storage package (@helix-agents/store-redis) provides production-ready state and stream storage using Redis. It supports persistence, multi-process deployments, and atomic operations for parallel tool execution.

When to Use

Good fit:

  • Production deployments
  • Multi-process/multi-server architectures
  • Agents requiring crash recovery
  • Real-time streaming across services

Not ideal for:

  • Simple development (use in-memory)
  • Serverless with cold starts (connection overhead)
  • Cloudflare Workers (use D1/Durable Objects)

Installation

bash
npm install @helix-agents/store-redis ioredis

Setup

Basic Connection

typescript
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';

// Create Redis client
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  // password: 'your-password',
  // tls: {},  // For Redis Cloud/AWS ElastiCache
});

// Create stores
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

With Connection URL

typescript
const redis = new Redis(process.env.REDIS_URL);
// e.g., redis://user:password@host:6379

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

With Options

typescript
const stateStore = new RedisStateStore(redis, {
  prefix: 'myapp:agents:', // Key prefix (default: 'helix:')
  ttl: 86400 * 7, // State TTL in seconds (default: 7 days)
});

const streamManager = new RedisStreamManager(redis, {
  prefix: 'myapp:streams:',
  ttl: 86400, // Stream TTL (default: 24 hours)
  maxLength: 10000, // Max chunks per stream (default: 10000)
});

RedisStateStore

Key Structure

The store uses a separated key structure for efficient atomic operations:

{prefix}:state:{runId}                      -> HASH (main state)
{prefix}:messages:{runId}                   -> LIST (messages)
{prefix}:subagentref:ids:{runId}            -> SET (sub-agent IDs)
{prefix}:subagentref:{runId}:{subId}        -> HASH (sub-agent details)
{prefix}:customstate:{runId}                -> HASH (scalar custom state)
{prefix}:customstate:arraykeys:{runId}      -> SET (array field names)
{prefix}:customstate:list:{runId}:{field}   -> LIST (array fields)

Basic Operations

typescript
// Save state
await stateStore.save(state);

// Load state (reconstitutes from multiple keys)
const state = await stateStore.load('run-123');

// Delete state (removes all related keys)
await stateStore.delete('run-123');

Atomic Operations

These operations are safe for concurrent access:

typescript
// Append messages - uses RPUSH (atomic)
await stateStore.appendMessages('run-123', [{ role: 'assistant', content: 'Hello!' }]);

// Merge custom state - uses HSET + RPUSH
await stateStore.mergeCustomState('run-123', {
  values: {
    count: 5, // Scalar: HSET
    items: ['new'], // Array: RPUSH (append)
  },
  arrayReplacements: new Set(['replaced']), // Arrays to replace, not append
  warnings: [],
});

// Update status - uses HSET
await stateStore.updateStatus('run-123', 'completed');

// Increment step count - uses HINCRBY (atomic)
const newCount = await stateStore.incrementStepCount('run-123');

Parallel Tool Execution

The key structure enables safe parallel updates:

typescript
// Two tools running in parallel
await Promise.all([
  // Tool 1: Appends to items array
  stateStore.mergeCustomState(runId, {
    values: { items: ['from-tool-1'] },
    arrayReplacements: new Set(),
    warnings: [],
  }),
  // Tool 2: Appends to items array
  stateStore.mergeCustomState(runId, {
    values: { items: ['from-tool-2'] },
    arrayReplacements: new Set(),
    warnings: [],
  }),
]);

// Result: items = [...original, 'from-tool-1', 'from-tool-2']
// (order may vary, but both appends are preserved)

Message Queries

typescript
// Paginated messages (for UI)
const result = await stateStore.getMessages('run-123', {
  offset: 0,
  limit: 50,
  includeThinking: false,
});

// Message count
const count = await stateStore.getMessageCount('run-123');

RedisStreamManager

Key Structure

{prefix}:stream:{streamId}       -> STREAM (Redis Stream for chunks)
{prefix}:stream:meta:{streamId}  -> HASH (stream metadata)

Basic Operations

typescript
// Create writer
const writer = await streamManager.createWriter('stream-123', 'run-123', 'researcher');

// Write chunks (uses XADD)
await writer.write({
  type: 'text_delta',
  agentId: 'run-123',
  agentType: 'researcher',
  timestamp: Date.now(),
  delta: 'Hello world',
});

// Close writer (releases resources, doesn't end stream)
await writer.close();

// Create reader (uses XREAD with blocking)
const reader = await streamManager.createReader('stream-123');
if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
  await reader.close();
}

// End stream
await streamManager.endStream('stream-123', { result: 'done' });

// Or fail stream
await streamManager.failStream('stream-123', 'Error message');

Real-Time Streaming

Redis Streams provide real-time delivery:

typescript
// Writer (agent execution)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
for (const token of tokens) {
  await writer.write({ type: 'text_delta', delta: token, ... });
  // Reader receives immediately
}

// Reader (UI/client) - receives chunks as they're written
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
  updateUI(chunk);  // Real-time updates
}

Resumable Streams

Support for client reconnection:

typescript
// Get stream info
const info = await streamManager.getStreamInfo('stream-123');
console.log(info?.totalChunks);
console.log(info?.latestSequence);
console.log(info?.status);

// Create resumable reader
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: lastKnownSequence, // Resume from this point
});

if (reader) {
  console.log(`Starting from sequence ${reader.currentSequence}`);
  for await (const chunk of reader) {
    // Process chunks, track position
    savePosition(reader.currentSequence);
  }
}

Stream TTL

Streams auto-expire to prevent unbounded growth:

typescript
const streamManager = new RedisStreamManager(redis, {
  ttl: 3600, // 1 hour
});

// Stream keys expire after TTL
// Completed streams can be read until expiry

Complete Example

typescript
import Redis from 'ioredis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Setup
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis, { prefix: 'myapp:' });
const streamManager = new RedisStreamManager(redis, { prefix: 'myapp:' });

const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research AI agents');

// Stream in real-time
const stream = await handle.stream();
if (stream) {
  for await (const chunk of stream) {
    process.stdout.write(chunk.type === 'text_delta' ? chunk.delta : '');
  }
}

// Get result
const result = await handle.result();
console.log('\\nResult:', result.output);

// Cleanup (optional)
await stateStore.delete(handle.runId);

// Close connection when done
await redis.quit();

Production Considerations

Connection Pooling

Use connection pooling for high-throughput:

typescript
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  connectTimeout: 10000,
});

Cluster Support

For Redis Cluster:

typescript
import Redis from 'ioredis';

const redis = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
  { host: 'node3', port: 6379 },
]);

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

Error Handling

Handle Redis connection errors:

typescript
redis.on('error', (err) => {
  console.error('Redis error:', err);
});

redis.on('connect', () => {
  console.log('Redis connected');
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await redis.quit();
  process.exit(0);
});

Monitoring

Monitor Redis memory and connections:

bash
# Memory usage
redis-cli INFO memory

# Connected clients
redis-cli INFO clients

# Key count
redis-cli DBSIZE

# Specific key patterns
redis-cli KEYS "myapp:state:*" | wc -l

Backup and Recovery

Agent state can be recovered from Redis after crashes:

typescript
// After process restart
const handle = await executor.getHandle(MyAgent, savedRunId);

if (handle) {
  const { canResume, reason } = await handle.canResume();
  if (canResume) {
    // Resume from saved state
    const newHandle = await handle.resume();
    const result = await newHandle.result();
  }
}

Limitations

No Transactions Across Keys

Atomic operations work per-key, not across multiple keys:

typescript
// This is NOT atomic across both updates
await stateStore.updateStatus(runId, 'completed');
await stateStore.mergeCustomState(runId, { values: { finalResult: result } });

// If process crashes between these, state may be inconsistent

Memory Usage

Monitor Redis memory with many concurrent agents:

typescript
// Set maxmemory and policy
// redis.conf: maxmemory 1gb
// redis.conf: maxmemory-policy allkeys-lru

// Or use TTL aggressively
const stateStore = new RedisStateStore(redis, { ttl: 3600 });

Network Latency

Each operation is a network round-trip. Batch when possible:

typescript
// Multiple sequential calls - multiple round-trips
await stateStore.appendMessages(runId, [msg1]);
await stateStore.appendMessages(runId, [msg2]);

// Single call - one round-trip
await stateStore.appendMessages(runId, [msg1, msg2]);

Next Steps

Released under the MIT License.