
Redis Storage

The Redis storage package (@helix-agents/store-redis) provides production-ready state and stream storage using Redis. It supports persistence, multi-process deployments, and atomic operations for parallel tool execution.

When to Use

Good fit:

  • Production deployments
  • Multi-process/multi-server architectures
  • Agents requiring crash recovery
  • Real-time streaming across services

Not ideal for:

  • Simple development (use in-memory)
  • Serverless with cold starts (connection overhead)
  • Cloudflare Workers (use D1/Durable Objects)

Installation

bash
npm install @helix-agents/store-redis ioredis

Setup

Basic Connection

typescript
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';

// Create Redis client
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  // password: 'your-password',
  // tls: {},  // For Redis Cloud/AWS ElastiCache
});

// Create stores
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

With Connection URL

typescript
const redis = new Redis(process.env.REDIS_URL);
// e.g., redis://user:password@host:6379

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

With Options

typescript
const stateStore = new RedisStateStore(redis, {
  prefix: 'myapp:agents:', // Key prefix (default: 'agent')
  ttl: 86400 * 7, // State TTL in seconds (default: 7 days)
});

const streamManager = new RedisStreamManager(redis, {
  prefix: 'myapp:streams:',
  ttl: 86400, // Stream TTL (default: 24 hours)
  maxChunks: 10000, // Max chunks retained per stream (default: 10000, 0 = unlimited)
});

RedisStateStore

Session vs Run Identifiers

  • sessionId: Primary key for all state operations. All Redis keys are scoped by sessionId.
  • runId: Execution metadata identifying a specific run within a session.

The key structure uses sessionId as the primary scoping mechanism. Multiple runs within the same session share state.

Key Structure

The store uses a separated key structure for efficient atomic operations:

{prefix}:session:{sessionId}                      -> HASH (main state)
{prefix}:session:messages:{sessionId}             -> LIST (messages)
{prefix}:session:subsession:ids:{sessionId}       -> SET (sub-session IDs)
{prefix}:session:subsession:{sessionId}:{subId}   -> HASH (sub-session details)
{prefix}:session:customstate:{sessionId}          -> HASH (scalar custom state)
{prefix}:session:customstate:arraykeys:{sessionId} -> SET (array field names)
{prefix}:session:customstate:list:{sessionId}:{field} -> LIST (array fields)
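The layout above can be captured in a small key-builder. It also shows why deleting a session touches several keys, including one list per array field. This is a minimal sketch that assumes keys are joined with ':' exactly as the template shows. sessionKeys is a hypothetical helper, not an export of @helix-agents/store-redis.

```typescript
// Hypothetical helper mirroring the documented RedisStateStore key layout.
// Assumption: segments are joined with ':' exactly as in the template.
interface SessionKeys {
  state: string; // HASH: main state
  messages: string; // LIST: messages
  subSessionIds: string; // SET: sub-session IDs
  customState: string; // HASH: scalar custom state
  customStateArrayKeys: string; // SET: names of array fields
  subSession(subId: string): string; // HASH: one sub-session
  customStateList(field: string): string; // LIST: one array field
}

function sessionKeys(prefix: string, sessionId: string): SessionKeys {
  const base = `${prefix}:session`;
  return {
    state: `${base}:${sessionId}`,
    messages: `${base}:messages:${sessionId}`,
    subSessionIds: `${base}:subsession:ids:${sessionId}`,
    customState: `${base}:customstate:${sessionId}`,
    customStateArrayKeys: `${base}:customstate:arraykeys:${sessionId}`,
    subSession: (subId) => `${base}:subsession:${sessionId}:${subId}`,
    customStateList: (field) => `${base}:customstate:list:${sessionId}:${field}`,
  };
}

const keys = sessionKeys('agent', 'session-123');
console.log(keys.messages); // agent:session:messages:session-123
```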

Basic Operations

typescript
// Create a session
await stateStore.createSession('session-123', { agentType: 'my-agent' });

// Load state (reconstitutes from multiple keys)
const state = await stateStore.loadState('session-123');

// Save state (updates existing session)
await stateStore.saveState('session-123', state);

// Delete session (removes all related keys)
await stateStore.deleteSession('session-123');

Atomic Operations

These operations are safe for concurrent access:

typescript
// Append messages - uses RPUSH (atomic)
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);

// Merge custom state - applies StepWrites ops atomically
await stateStore.mergeCustomState('session-123', {
  ops: [
    { kind: 'replace', key: 'count', value: 5 }, // Scalar: HSET
    { kind: 'append', key: 'items', items: ['new'] }, // Array: RPUSH (append)
    { kind: 'replace', key: 'replaced', value: [1, 2] }, // Array: full replacement
  ],
  warnings: [],
});

// Update status - uses HSET
await stateStore.updateStatus('session-123', 'completed');

// Increment step count - uses HINCRBY (atomic)
const newCount = await stateStore.incrementStepCount('session-123');
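The ops semantics described in the comments above can be modelled in memory. The sketch below assumes the op shapes shown in the example: replace carries a value, and append carries items. StepWriteOp and applyOps are hypothetical names. The real store performs these writes with HSET and RPUSH on the server rather than on a local object.

```typescript
// Assumed shape of the ops shown above; not the package's exported type.
type StepWriteOp =
  | { kind: 'replace'; key: string; value: unknown }
  | { kind: 'append'; key: string; items: unknown[] };

type CustomState = Record<string, unknown>;

// Applies ops in order to a copy of the state, mimicking HSET (replace)
// and RPUSH (append) semantics in memory.
function applyOps(state: CustomState, ops: StepWriteOp[]): CustomState {
  const next: CustomState = { ...state };
  for (const op of ops) {
    if (op.kind === 'replace') {
      next[op.key] = op.value; // scalar or array: overwrite the whole value
    } else {
      const current = next[op.key];
      const base = Array.isArray(current) ? current : []; // missing or non-array treated as empty
      next[op.key] = [...base, ...op.items]; // add to the end, like RPUSH
    }
  }
  return next;
}

const before: CustomState = { count: 1, items: ['a'], replaced: ['x', 'y', 'z'] };
const after = applyOps(before, [
  { kind: 'replace', key: 'count', value: 5 },
  { kind: 'append', key: 'items', items: ['new'] },
  { kind: 'replace', key: 'replaced', value: [1, 2] },
]);
// after = { count: 5, items: ['a', 'new'], replaced: [1, 2] }; before is unchanged
```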

Atomic Write (saveStateAndPromoteStaging)

The v7 atomic write primitive is implemented as a server-side Lua script. The script is sent with EVAL on first use and then invoked by its SHA1 hash with EVALSHA. It performs the message append, state hash update, staging promotion, and checkpoint creation as a single atomic Redis operation, so no client-side compensating write is ever needed.

typescript
// Single round-trip, fully atomic on the server
await stateStore.saveStateAndPromoteStaging(
  sessionId,
  newState,
  pendingMessages,
  { stepId: 'step-7', stepCount: 7, streamSequence: 142 },
  { expectedVersion: state.version }
);

This satisfies cross-runtime invariant C-1: pendingClientToolCalls, clientToolCallOwnership, completed phase-1 messages, the checkpoint, and suspendedStepId all become visible together. Redis 6+ is required (already the documented minimum).
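The "send once, then call by hash" pattern can be sketched against a minimal client interface. Redis identifies a cached script by the SHA1 of its source. It answers EVALSHA with a NOSCRIPT error when the script is not cached, for example after a restart or SCRIPT FLUSH, and the client then falls back to EVAL, which also caches the script. ScriptClient and FakeRedis are hypothetical, synchronous stand-ins for a real client. ioredis's defineCommand helper applies the same fallback for you.

```typescript
import { createHash } from 'node:crypto';

// Minimal scripting surface (hypothetical; real clients are async).
interface ScriptClient {
  evalsha(sha: string, keys: string[], args: string[]): unknown;
  eval(script: string, keys: string[], args: string[]): unknown;
}

// Calls a script by hash first; on NOSCRIPT, sends the full source once.
function runScript(client: ScriptClient, script: string, keys: string[], args: string[]): unknown {
  const sha = createHash('sha1').update(script).digest('hex');
  try {
    return client.evalsha(sha, keys, args);
  } catch (err) {
    if (err instanceof Error && err.message.startsWith('NOSCRIPT')) {
      return client.eval(script, keys, args); // EVAL also caches the script
    }
    throw err;
  }
}

// In-memory stand-in that caches scripts by SHA1 the way Redis does.
class FakeRedis implements ScriptClient {
  private cache = new Map<string, string>();
  evalCalls = 0;
  evalshaCalls = 0;

  evalsha(sha: string, keys: string[], _args: string[]): unknown {
    this.evalshaCalls++;
    if (!this.cache.has(sha)) throw new Error('NOSCRIPT No matching script. Please use EVAL.');
    return `ran ${sha.slice(0, 8)} on ${keys.length} key(s)`;
  }

  eval(script: string, keys: string[], _args: string[]): unknown {
    this.evalCalls++;
    const sha = createHash('sha1').update(script).digest('hex');
    this.cache.set(sha, script);
    return `ran ${sha.slice(0, 8)} on ${keys.length} key(s)`;
  }
}

const fake = new FakeRedis();
const script = "return redis.call('HINCRBY', KEYS[1], 'version', 1)";
runScript(fake, script, ['agent:session:s1'], []); // EVALSHA fails with NOSCRIPT, then EVAL
runScript(fake, script, ['agent:session:s1'], []); // served from the cache by EVALSHA
console.log(fake.evalshaCalls, fake.evalCalls); // 2 1
```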

Sequential fallback escape hatch (allowSequentialFallback)

If your Redis deployment forbids EVAL/EVALSHA (some hardened managed offerings, certain proxy layers), pass allowSequentialFallback: true to the constructor:

typescript
const stateStore = new RedisStateStore(redisClient, {
  keyPrefix: 'agent',
  allowSequentialFallback: true, // ⚠️ NON-ATOMIC — see warning below
});

When allowSequentialFallback is enabled, saveStateAndPromoteStaging falls back to a sequential pipeline of individual commands. This loses the atomicity guarantee — a process crash mid-pipeline can leave the session in a partially-updated state that violates invariant C-1.

WARNING: treat allowSequentialFallback: true only as an escape hatch for debugging or for deployments where EVAL is unavailable. Production deployments SHOULD run a Redis instance that supports EVAL and leave this flag at its default (false).

Known atomicity gap: customState pipeline

RedisStateStore.saveState writes customState in a separate non-atomic pipeline AFTER the version-check Lua script returns. A process crash between the two leaves the session hash with the new version but customState partially updated. The next saveState's "clean up orphaned arrays" loop recovers from this within ~1 saveState window in active sessions (longer for paused sessions).

The proper fix (a Lua script that performs all customState mutations atomically) is tracked as FU-A2-41 in docs/dev/follow-ups.md.

v7 SessionState Fields

The following fields are persisted as JSON-encoded entries on the session's existing main state hash ({prefix}:session:{sessionId}). No new keys are introduced, and existing keys remain read-compatible: records that pre-date this version simply return undefined for these fields.

  • suspendedAwaitingChildren — map of parentToolCallId → SuspendedChildWait
  • suspendedStepId — mid-step suspension marker
  • tracingContext: { traceId, rootSpanId } for trace continuity across resumes
  • expiresAt — epoch-ms hint for operator GC via expiredSessionCleanup
  • failureReason — discriminator for failed children (e.g. 'parent_suspended')
  • completedClientToolCalls — root-session tombstone map for HITL idempotency
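Because the new fields are optional JSON entries on an existing hash, a reader can decode them defensively so that older records yield undefined. The sketch below assumes that each field is stored under its own name as a JSON string, as this section describes. SuspendedChildWait is left as unknown because its shape is not documented here, and decodeV7Fields is a hypothetical name.

```typescript
// Hash entries as returned by HGETALL: field name -> string value.
type RawHash = Record<string, string | undefined>;

// Assumed TypeScript view of the v7 fields.
interface V7Fields {
  suspendedAwaitingChildren?: Record<string, unknown>; // parentToolCallId -> SuspendedChildWait
  suspendedStepId?: string;
  tracingContext?: { traceId: string; rootSpanId: string };
  expiresAt?: number; // epoch milliseconds
  failureReason?: string;
  completedClientToolCalls?: Record<string, unknown>;
}

// Parses one JSON-encoded entry; absent entries (older records) yield undefined.
function parseField<T>(raw: RawHash, field: string): T | undefined {
  const value = raw[field];
  return value === undefined ? undefined : (JSON.parse(value) as T);
}

function decodeV7Fields(raw: RawHash): V7Fields {
  return {
    suspendedAwaitingChildren: parseField<Record<string, unknown>>(raw, 'suspendedAwaitingChildren'),
    suspendedStepId: parseField<string>(raw, 'suspendedStepId'),
    tracingContext: parseField<{ traceId: string; rootSpanId: string }>(raw, 'tracingContext'),
    expiresAt: parseField<number>(raw, 'expiresAt'),
    failureReason: parseField<string>(raw, 'failureReason'),
    completedClientToolCalls: parseField<Record<string, unknown>>(raw, 'completedClientToolCalls'),
  };
}

const legacy = decodeV7Fields({ status: '"active"' }); // record written before v7
console.log(legacy.suspendedStepId); // undefined

const current = decodeV7Fields({
  suspendedStepId: JSON.stringify('step-7'),
  expiresAt: JSON.stringify(1700000000000),
});
console.log(current.suspendedStepId, current.expiresAt); // step-7 1700000000000
```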

compareAndSetStatus Return Shape (v7)

The Lua-backed CAS now returns a discriminated union including the prior status and version on conflict, so callers can make richer retry decisions without an extra round-trip:

typescript
const result = await stateStore.compareAndSetStatus(sessionId, ['active'], 'paused', {
  error: 'user paused',
  expectedVersion: 7,
});

if (result.ok) {
  // result.newVersion = post-bump version
} else {
  // result.currentStatus, result.currentVersion = stored values at conflict
}
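The discriminated union lets a caller decide whether to retry without re-reading the session. The sketch below models that decision in memory. It retries with the returned currentVersion only while the stored status is still an allowed source state, and gives up immediately otherwise. CasResult, InMemoryCas, and pauseWithRetry are hypothetical names, and the retry policy is an illustrative assumption rather than the package's behaviour.

```typescript
type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: string; currentVersion: number };

// In-memory stand-in for the Lua-backed CAS (hypothetical).
class InMemoryCas {
  sessionStatus: string;
  version: number;

  constructor(sessionStatus: string, version: number) {
    this.sessionStatus = sessionStatus;
    this.version = version;
  }

  compareAndSetStatus(from: string[], to: string, expectedVersion: number): CasResult {
    if (!from.includes(this.sessionStatus) || this.version !== expectedVersion) {
      return { ok: false, currentStatus: this.sessionStatus, currentVersion: this.version };
    }
    this.sessionStatus = to;
    this.version += 1; // version bump on success
    return { ok: true, newVersion: this.version };
  }
}

// Retries only on a version conflict while the status is still 'active'.
function pauseWithRetry(store: InMemoryCas, expectedVersion: number, maxAttempts = 3): CasResult {
  let result = store.compareAndSetStatus(['active'], 'paused', expectedVersion);
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    if (result.ok || result.currentStatus !== 'active') break; // done, or retrying cannot help
    result = store.compareAndSetStatus(['active'], 'paused', result.currentVersion);
  }
  return result;
}

const casStore = new InMemoryCas('active', 8); // another writer already bumped the version
console.log(pauseWithRetry(casStore, 7)); // { ok: true, newVersion: 9 }
```

Adopting the returned version blindly is only safe when the caller's intent does not depend on what the other writer changed. The no-extra-round-trip shape makes that check cheap to add.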

Parallel Tool Execution

The key structure enables safe parallel updates:

typescript
// Two tools running in parallel
await Promise.all([
  // Tool 1: Appends to items array
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'append', key: 'items', items: ['from-tool-1'] }],
    warnings: [],
  }),
  // Tool 2: Appends to items array
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'append', key: 'items', items: ['from-tool-2'] }],
    warnings: [],
  }),
]);

// Result: items = [...original, 'from-tool-1', 'from-tool-2']
// (order may vary, but both appends are preserved)
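Separate keys matter because they avoid the classic lost update. If each tool read the whole array, appended locally, and wrote the result back, the second write would overwrite the first. RPUSH appends on the server, so neither append is lost. The sketch below replays both approaches deterministically in memory. It illustrates the race and is not the store's code.

```typescript
// Shared "server" state for the demonstration.
const server: { itemsJson: string; itemsList: string[] } = {
  itemsJson: JSON.stringify(['original']),
  itemsList: ['original'],
};

// Read-modify-write on one JSON blob: both tools read before either writes.
const readByTool1 = JSON.parse(server.itemsJson) as string[];
const readByTool2 = JSON.parse(server.itemsJson) as string[];
server.itemsJson = JSON.stringify([...readByTool1, 'from-tool-1']);
server.itemsJson = JSON.stringify([...readByTool2, 'from-tool-2']); // overwrites tool 1's write

// Server-side append (what RPUSH does): each push extends the current list.
server.itemsList.push('from-tool-1');
server.itemsList.push('from-tool-2');

console.log(JSON.parse(server.itemsJson)); // [ 'original', 'from-tool-2' ]
console.log(server.itemsList); // [ 'original', 'from-tool-1', 'from-tool-2' ]
```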

Message Queries

typescript
// Paginated messages (for UI)
const result = await stateStore.getMessages('session-123', {
  offset: 0,
  limit: 50,
  includeThinking: false,
});

// Message count
const count = await stateStore.getMessageCount('session-123');
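With messages in a Redis LIST, an offset/limit page maps naturally onto LRANGE, whose stop index is inclusive. The sketch below shows that arithmetic and an in-memory equivalent. How the store actually applies includeThinking is not documented here. The sketch assumes a hypothetical thinking flag on each message and filters after fetching, which means a filtered page can hold fewer than limit messages.

```typescript
interface StoredMessage {
  role: 'user' | 'assistant' | 'system' | 'tool';
  content: string;
  thinking?: boolean; // hypothetical marker for reasoning-only messages
}

// LRANGE key start stop uses an inclusive stop index.
function lrangeBounds(offset: number, limit: number): [number, number] {
  return [offset, offset + limit - 1];
}

// In-memory equivalent of LRANGE followed by an optional filter.
function pageMessages(all: StoredMessage[], offset: number, limit: number, includeThinking: boolean): StoredMessage[] {
  const [start, stop] = lrangeBounds(offset, limit);
  const page = all.slice(start, stop + 1); // slice's end is exclusive
  return includeThinking ? page : page.filter((m) => !m.thinking);
}

const allMessages: StoredMessage[] = Array.from({ length: 120 }, (_, i): StoredMessage => ({
  role: i % 2 === 0 ? 'user' : 'assistant',
  content: `message ${i}`,
}));

console.log(lrangeBounds(100, 50)); // [ 100, 149 ]
console.log(pageMessages(allMessages, 100, 50, true).length); // 20
```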

Message Truncation

For crash recovery, messages can be truncated to match a checkpoint:

typescript
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);

This is used internally during crash recovery when resuming from a checkpoint. If a crash occurred mid-step, messages added after the checkpoint are orphaned and need to be removed. The messageCount parameter comes from the checkpoint's messageCount field.
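Keeping the first messageCount entries of a list maps onto LTRIM with a stop index of messageCount - 1. One edge case needs explicit handling. LTRIM key 0 -1 keeps the whole list, because -1 means "last element", so truncating to zero messages needs DEL instead. The command planner below sketches that logic. It is an assumption about how such a truncation could be issued, not the store's actual code, and planTruncate and applyTruncate are hypothetical names.

```typescript
type TruncateCommand =
  | { cmd: 'DEL'; key: string }
  | { cmd: 'LTRIM'; key: string; start: number; stop: number };

// Plans the Redis command that keeps only the first `messageCount` messages.
function planTruncate(key: string, messageCount: number): TruncateCommand {
  if (!Number.isInteger(messageCount) || messageCount < 0) {
    throw new RangeError(`messageCount must be a non-negative integer, got ${messageCount}`);
  }
  // LTRIM key 0 -1 would keep everything, so an empty result needs DEL.
  if (messageCount === 0) return { cmd: 'DEL', key };
  return { cmd: 'LTRIM', key, start: 0, stop: messageCount - 1 };
}

// In-memory check of the same semantics.
function applyTruncate(list: string[], command: TruncateCommand): string[] {
  return command.cmd === 'DEL' ? [] : list.slice(command.start, command.stop + 1);
}

const msgs = ['m0', 'm1', 'm2', 'm3', 'm4'];
console.log(applyTruncate(msgs, planTruncate('agent:session:messages:s1', 3))); // [ 'm0', 'm1', 'm2' ]
console.log(planTruncate('agent:session:messages:s1', 0).cmd); // DEL
```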

RedisStreamManager

Key Structure

{prefix}:stream:{streamId}       -> STREAM (Redis Stream for chunks)
{prefix}:stream:meta:{streamId}  -> HASH (stream metadata)

Basic Operations

typescript
// Create writer
const writer = await streamManager.createWriter('stream-123', 'run-123', 'researcher');

// Write chunks (uses XADD)
await writer.write({
  type: 'text_delta',
  agentId: 'run-123',
  agentType: 'researcher',
  timestamp: Date.now(),
  delta: 'Hello world',
});

// Close writer (releases resources, doesn't end stream)
await writer.close();

// Create reader (uses XREAD with blocking)
const reader = await streamManager.createReader('stream-123');
if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
  await reader.close();
}

// End stream
await streamManager.endStream('stream-123', { result: 'done' });

// Or fail stream
await streamManager.failStream('stream-123', 'Error message');

Real-Time Streaming

Redis Streams provide real-time delivery:

typescript
// Writer (agent execution)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
for (const token of tokens) {
  await writer.write({
    type: 'text_delta',
    agentId,
    agentType,
    timestamp: Date.now(),
    delta: token,
  });
  // Readers blocked on XREAD receive the chunk as soon as it is added
}
await writer.close();

// Reader (UI/client) - receives chunks as they're written
const reader = await streamManager.createReader(streamId);
if (reader) {
  for await (const chunk of reader) {
    updateUI(chunk); // Real-time updates
  }
  await reader.close();
}

Resumable Streams

Support for client reconnection:

typescript
// Get stream info
const info = await streamManager.getStreamInfo('stream-123');
console.log(info?.totalChunks);
console.log(info?.latestSequence);
console.log(info?.status);

// Create resumable reader
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: lastKnownSequence, // Resume from this point
});

if (reader) {
  console.log(`Starting from sequence ${reader.currentSequence}`);
  for await (const chunk of reader) {
    // Process chunks, track position
    savePosition(reader.currentSequence);
  }
}
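A client that survives disconnects typically persists the last sequence it processed and passes it back as fromSequence on reconnect. The in-memory sketch below assumes that sequences increase by one per chunk and that resuming yields only chunks after the saved sequence. Check the reader's actual inclusive or exclusive behaviour before relying on this. readFrom is a hypothetical stand-in for createResumableReader.

```typescript
interface Chunk {
  sequence: number;
  delta: string;
}

// Hypothetical stand-in for a resumable reader over a stored stream.
function* readFrom(chunks: Chunk[], fromSequence: number): Generator<Chunk> {
  for (const chunk of chunks) {
    if (chunk.sequence > fromSequence) yield chunk; // assumption: exclusive resume point
  }
}

const stored: Chunk[] = ['Hel', 'lo', ' wor', 'ld'].map((delta, i) => ({ sequence: i + 1, delta }));

let lastSequence = 0; // what a client would persist between connections
let received = '';

// First connection drops after two chunks.
for (const chunk of readFrom(stored, lastSequence)) {
  received += chunk.delta;
  lastSequence = chunk.sequence;
  if (lastSequence === 2) break; // simulated disconnect
}

// Reconnect and resume from the saved position.
for (const chunk of readFrom(stored, lastSequence)) {
  received += chunk.delta;
  lastSequence = chunk.sequence;
}

console.log(received, lastSequence); // Hello world 4
```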

Stream Cleanup

For crash recovery and stream reset scenarios:

typescript
// Clean up chunks beyond a specific step (atomic Lua script)
await streamManager.cleanupToStep('stream-123', stepCount);

// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream('stream-123');

cleanupToStep() removes all chunks with step > stepCount. This is used during crash recovery to remove orphaned chunks that were written after the checkpoint being resumed from. The operation uses an atomic Lua script to ensure consistency.

resetStream() clears the entire stream. This is called by execute() when starting a fresh run within a session. It ensures clients don't see stale chunks from previous executions.
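The effect of cleanupToStep() can be modelled as a filter on each chunk's step number. This in-memory sketch assumes every chunk records the step that produced it in a step field; that field name is an assumption. In Redis, the equivalent filter runs inside a Lua script, so no reader sees a partially cleaned stream.

```typescript
interface StepChunk {
  step: number; // assumed: step that produced the chunk
  type: string;
  delta?: string;
}

// Keeps chunks up to and including `stepCount`; drops later (orphaned) ones.
function cleanupToStep(chunks: StepChunk[], stepCount: number): StepChunk[] {
  return chunks.filter((chunk) => chunk.step <= stepCount);
}

const streamChunks: StepChunk[] = [
  { step: 1, type: 'text_delta', delta: 'a' },
  { step: 2, type: 'text_delta', delta: 'b' },
  { step: 3, type: 'text_delta', delta: 'c' }, // written after the step-2 checkpoint
];

console.log(cleanupToStep(streamChunks, 2).map((c) => c.delta)); // [ 'a', 'b' ]
```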

Stream TTL

Streams auto-expire to prevent unbounded growth:

typescript
const streamManager = new RedisStreamManager(redis, {
  ttl: 3600, // 1 hour
});

// Stream keys expire after TTL
// Completed streams can be read until expiry

Complete Example

typescript
import Redis from 'ioredis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Setup
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis, { prefix: 'myapp:' });
const streamManager = new RedisStreamManager(redis, { prefix: 'myapp:' });

const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent (ResearchAgent is your own agent definition, not shown here)
const handle = await executor.execute(ResearchAgent, 'Research AI agents', {
  sessionId: 'my-session-1',
});

// Stream in real-time
const stream = await handle.stream();
if (stream) {
  for await (const chunk of stream) {
    process.stdout.write(chunk.type === 'text_delta' ? chunk.delta : '');
  }
}

// Get result
const result = await handle.result();
console.log('\nResult:', result.output);

// Cleanup (optional)
await stateStore.deleteSession(handle.sessionId);

// Close connection when done
await redis.quit();

Production Considerations

Connection Pooling

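ioredis does not maintain a connection pool: a single client sends commands over one connection without waiting for earlier replies. For high throughput, tune the client's retry and timeout settings:

typescript
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3, // fail a pending command after 3 reconnection attempts
  enableReadyCheck: true, // wait until the server reports it is ready before sending commands
  connectTimeout: 10000, // ms to wait for the initial connection
});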
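ioredis also accepts a retryStrategy(times) option. It returns the delay in milliseconds before the next reconnection attempt, or a non-number to stop reconnecting. A capped exponential backoff with jitter is a common choice. The function name backoff and its constants below are illustrative assumptions, not recommendations from this package.

```typescript
// Capped exponential backoff suitable for ioredis's `retryStrategy` option.
// `times` is the 1-based reconnection attempt count that ioredis passes in.
function backoff(times: number, baseMs = 100, capMs = 5000, maxAttempts = 20): number | null {
  if (times > maxAttempts) return null; // a non-number tells ioredis to stop reconnecting
  const exponential = Math.min(capMs, baseMs * 2 ** (times - 1));
  // Equal jitter: a delay in [exponential / 2, exponential] spreads out reconnect storms.
  return Math.round(exponential / 2 + Math.random() * (exponential / 2));
}

// Usage with ioredis (not executed here):
// const redis = new Redis({ host: 'localhost', port: 6379, retryStrategy: (times) => backoff(times) });
```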

Cluster Support

For Redis Cluster:

typescript
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';

const redis = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
  { host: 'node3', port: 6379 },
]);

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

Error Handling

Handle Redis connection errors:

typescript
redis.on('error', (err) => {
  console.error('Redis error:', err);
});

redis.on('connect', () => {
  console.log('Redis connected');
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await redis.quit();
  process.exit(0);
});

Monitoring

Monitor Redis memory and connections:

bash
# Memory usage
redis-cli INFO memory

# Connected clients
redis-cli INFO clients

# Key count
redis-cli DBSIZE

# Count session-related keys (SCAN iterates incrementally; KEYS blocks the server)
redis-cli --scan --pattern "myapp:*session:*" | wc -l

Backup and Recovery

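Agent state can be recovered from Redis after a process crash, provided the Redis data itself survives. Enable RDB snapshots or AOF persistence if a Redis restart must not lose sessions: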

typescript
// After process restart
const handle = await executor.getHandle(MyAgent, savedSessionId);

if (handle) {
  const { canResume, reason } = await handle.canResume();
  if (canResume) {
    // Resume from saved state
    const newHandle = await handle.resume();
    const result = await newHandle.result();
  }
}

Limitations

No Transactions Across Keys

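Atomicity never extends beyond a single store call, and many calls are atomic only per key. Only saveStateAndPromoteStaging bundles multi-key writes into one Lua script. Consecutive calls like the two below are not applied as a transaction: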

typescript
// This is NOT atomic across both updates
await stateStore.updateStatus(sessionId, 'completed');
await stateStore.mergeCustomState(sessionId, {
  ops: [{ kind: 'replace', key: 'finalResult', value: result }],
  warnings: [],
});

// If process crashes between these, state may be inconsistent
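Where two writes cannot be made atomic, ordering them so that a crash leaves a retryable state often helps. The in-memory sketch below is a suggestion, not behaviour of the store. It writes finalResult before setting the status, so a crash in between leaves the session not yet completed, and a retry simply repeats both idempotent writes. FakeSession and finish are hypothetical names.

```typescript
interface FakeSession {
  sessionStatus: 'running' | 'completed';
  customState: Record<string, unknown>;
}

// Writes the result first, then the status; optionally simulates a crash in between.
function finish(session: FakeSession, result: unknown, crashAfterFirstWrite = false): void {
  session.customState.finalResult = result; // idempotent: same value on every retry
  if (crashAfterFirstWrite) throw new Error('simulated crash');
  session.sessionStatus = 'completed'; // only reached once the result is stored
}

const s: FakeSession = { sessionStatus: 'running', customState: {} };
try {
  finish(s, { answer: 42 }, true);
} catch {
  // A recovering process sees status 'running' and retries.
}
finish(s, { answer: 42 });
console.log(s.sessionStatus, s.customState.finalResult); // completed { answer: 42 }
```

With the opposite order, a crash could leave a session marked completed with no result, and nothing would prompt a retry.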

Memory Usage

Monitor Redis memory with many concurrent agents:

typescript
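// Set maxmemory and an eviction policy in redis.conf:
// maxmemory 1gb
// maxmemory-policy allkeys-lru
// Caution: eviction removes individual keys, so it can drop part of a session
// (e.g. its message list) while the state hash survives; noeviction rejects writes instead.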

// Or use TTL aggressively
const stateStore = new RedisStateStore(redis, { ttl: 3600 });

Network Latency

Each operation is a network round-trip. Batch when possible:

typescript
// Multiple sequential calls - multiple round-trips
await stateStore.appendMessages(sessionId, [msg1]);
await stateStore.appendMessages(sessionId, [msg2]);

// Single call - one round-trip
await stateStore.appendMessages(sessionId, [msg1, msg2]);
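When messages arrive in quick succession, a small buffer can coalesce them into one appendMessages call per session. The sketch below assumes a hypothetical append(sessionId, messages) function with the same shape as stateStore.appendMessages. Flushing is explicit here. A real buffer would also need a timer and a flush on shutdown so that buffered messages are not lost in a crash.

```typescript
interface ChatMessage {
  role: string;
  content: string;
}

type AppendFn = (sessionId: string, messages: ChatMessage[]) => Promise<void>;

// Collects messages per session and sends each session's batch in one call.
class MessageBatcher {
  private pending = new Map<string, ChatMessage[]>();
  private append: AppendFn;

  constructor(append: AppendFn) {
    this.append = append;
  }

  add(sessionId: string, message: ChatMessage): void {
    const batch = this.pending.get(sessionId) ?? [];
    batch.push(message);
    this.pending.set(sessionId, batch);
  }

  // One append call (one round-trip) per session. The buffer is cleared before
  // sending; a production version would re-queue a batch whose append fails.
  async flush(): Promise<void> {
    const batches = Array.from(this.pending.entries());
    this.pending.clear();
    await Promise.all(batches.map(([sessionId, messages]) => this.append(sessionId, messages)));
  }
}

const calls: Array<{ sessionId: string; count: number }> = [];
const batcher = new MessageBatcher(async (sessionId, messages) => {
  calls.push({ sessionId, count: messages.length }); // stand-in for stateStore.appendMessages
});
batcher.add('session-123', { role: 'assistant', content: 'msg1' });
batcher.add('session-123', { role: 'assistant', content: 'msg2' });
void batcher.flush().then(() => console.log(calls)); // [ { sessionId: 'session-123', count: 2 } ]
```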


Released under the MIT License.