# Redis Storage

The Redis storage package (`@helix-agents/store-redis`) provides production-ready state and stream storage using Redis. It supports persistence, multi-process deployments, and atomic operations for parallel tool execution.
## When to Use
Good fit:
- Production deployments
- Multi-process/multi-server architectures
- Agents requiring crash recovery
- Real-time streaming across services
Not ideal for:
- Simple development (use in-memory)
- Serverless with cold starts (connection overhead)
- Cloudflare Workers (use D1/Durable Objects)
## Installation

```bash
npm install @helix-agents/store-redis ioredis
```

## Setup
### Basic Connection
```typescript
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';

// Create Redis client
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  // password: 'your-password',
  // tls: {}, // For Redis Cloud/AWS ElastiCache
});

// Create stores
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
```

### With Connection URL
```typescript
const redis = new Redis(process.env.REDIS_URL);
// e.g., redis://user:password@host:6379

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
```

### With Options
```typescript
const stateStore = new RedisStateStore(redis, {
  prefix: 'myapp:agents:', // Key prefix (default: 'helix:')
  ttl: 86400 * 7, // State TTL in seconds (default: 7 days)
});

const streamManager = new RedisStreamManager(redis, {
  prefix: 'myapp:streams:',
  ttl: 86400, // Stream TTL in seconds (default: 24 hours)
  maxLength: 10000, // Max chunks per stream (default: 10000)
});
```

## RedisStateStore
### Session vs Run Identifiers

- `sessionId`: Primary key for all state operations. All Redis keys are scoped by `sessionId`.
- `runId`: Execution metadata identifying a specific run within a session.

The key structure uses `sessionId` as the primary scoping mechanism. Multiple runs within the same session share state.
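As a minimal sketch of what sharing means in practice, two runs launched with the same `sessionId` read and write the same keys. This assumes the `executor` setup and `ResearchAgent` from the Complete Example below; the inputs are placeholders.

```typescript
// Two runs, same sessionId: the second run sees the messages and
// custom state written by the first, because both are scoped to
// the same session keys.
const first = await executor.execute(ResearchAgent, 'Gather sources', {
  sessionId: 'session-123',
});
await first.result();

const second = await executor.execute(ResearchAgent, 'Summarize the sources', {
  sessionId: 'session-123',
});
await second.result();
```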
### Key Structure
The store uses a separated key structure for efficient atomic operations:
```text
{prefix}:session:{sessionId}                          -> HASH (main state)
{prefix}:session:messages:{sessionId}                 -> LIST (messages)
{prefix}:session:subsession:ids:{sessionId}           -> SET (sub-session IDs)
{prefix}:session:subsession:{sessionId}:{subId}       -> HASH (sub-session details)
{prefix}:session:customstate:{sessionId}              -> HASH (scalar custom state)
{prefix}:session:customstate:arraykeys:{sessionId}    -> SET (array field names)
{prefix}:session:customstate:list:{sessionId}:{field} -> LIST (array fields)
```

### Basic Operations
```typescript
// Create a session
await stateStore.createSession('session-123', { agentType: 'my-agent' });

// Load state (reconstitutes from multiple keys)
const state = await stateStore.loadState('session-123');

// Save state (updates existing session)
await stateStore.saveState('session-123', state);

// Delete session (removes all related keys)
await stateStore.deleteSession('session-123');
```

### Atomic Operations
These operations are safe for concurrent access:
```typescript
// Append messages - uses RPUSH (atomic)
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);

// Merge custom state - uses HSET + RPUSH
await stateStore.mergeCustomState('session-123', {
  values: {
    count: 5, // Scalar: HSET
    items: ['new'], // Array: RPUSH (append)
  },
  arrayReplacements: new Set(['replaced']), // Arrays to replace, not append
  warnings: [],
});

// Update status - uses HSET
await stateStore.updateStatus('session-123', 'completed');

// Increment step count - uses HINCRBY (atomic)
const newCount = await stateStore.incrementStepCount('session-123');
```

### Parallel Tool Execution
The key structure enables safe parallel updates:
```typescript
// Two tools running in parallel
await Promise.all([
  // Tool 1: appends to the items array
  stateStore.mergeCustomState(sessionId, {
    values: { items: ['from-tool-1'] },
    arrayReplacements: new Set(),
    warnings: [],
  }),
  // Tool 2: appends to the items array
  stateStore.mergeCustomState(sessionId, {
    values: { items: ['from-tool-2'] },
    arrayReplacements: new Set(),
    warnings: [],
  }),
]);

// Result: items = [...original, 'from-tool-1', 'from-tool-2']
// (order may vary, but both appends are preserved)
```

### Message Queries
```typescript
// Paginated messages (for UI)
const result = await stateStore.getMessages('session-123', {
  offset: 0,
  limit: 50,
  includeThinking: false,
});

// Message count
const count = await stateStore.getMessageCount('session-123');
```

### Message Truncation
For crash recovery, messages can be truncated to match a checkpoint:
```typescript
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);
```

This is used internally during crash recovery when resuming from a checkpoint. If a crash occurred mid-step, messages added after the checkpoint are orphaned and need to be removed. The `messageCount` parameter comes from the checkpoint's `messageCount` field.
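A recovery routine might compare the checkpoint against the stored message list. A minimal sketch, assuming a `checkpoint` object carrying the `messageCount` field described above:

```typescript
// Hypothetical recovery step: drop messages written after the checkpoint.
async function recoverMessages(sessionId: string, checkpoint: { messageCount: number }) {
  const total = await stateStore.getMessageCount(sessionId);
  if (total > checkpoint.messageCount) {
    // Messages past the checkpoint were orphaned by the crash
    await stateStore.truncateMessages(sessionId, checkpoint.messageCount);
  }
}
```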
## RedisStreamManager

### Key Structure

```text
{prefix}:stream:{streamId}      -> STREAM (Redis Stream for chunks)
{prefix}:stream:meta:{streamId} -> HASH (stream metadata)
```

### Basic Operations
```typescript
// Create writer
const writer = await streamManager.createWriter('stream-123', 'run-123', 'researcher');

// Write chunks (uses XADD)
await writer.write({
  type: 'text_delta',
  agentId: 'run-123',
  agentType: 'researcher',
  timestamp: Date.now(),
  delta: 'Hello world',
});

// Close writer (releases resources, doesn't end stream)
await writer.close();

// Create reader (uses XREAD with blocking)
const reader = await streamManager.createReader('stream-123');
if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
  await reader.close();
}

// End stream
await streamManager.endStream('stream-123', { result: 'done' });

// Or fail stream
await streamManager.failStream('stream-123', 'Error message');
```

### Real-Time Streaming
Redis Streams provide real-time delivery:
```typescript
// Writer (agent execution)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
for (const token of tokens) {
  await writer.write({ type: 'text_delta', delta: token, ... });
  // Reader receives immediately
}

// Reader (UI/client) - receives chunks as they're written
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
  updateUI(chunk); // Real-time updates
}
```

### Resumable Streams
Support for client reconnection:
```typescript
// Get stream info
const info = await streamManager.getStreamInfo('stream-123');
console.log(info?.totalChunks);
console.log(info?.latestSequence);
console.log(info?.status);

// Create resumable reader
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: lastKnownSequence, // Resume from this point
});
if (reader) {
  console.log(`Starting from sequence ${reader.currentSequence}`);
  for await (const chunk of reader) {
    // Process chunks, track position
    savePosition(reader.currentSequence);
  }
}
```
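On top of `createResumableReader()`, a client can build a reconnect loop. A minimal sketch, assuming hypothetical `loadLastSequence`/`saveLastSequence` persistence helpers and the `updateUI` callback from the previous example:

```typescript
// Hypothetical reconnect loop; loadLastSequence/saveLastSequence are
// placeholder persistence helpers, not part of the package.
async function followStream(streamId: string): Promise<void> {
  for (;;) {
    const fromSequence = await loadLastSequence(streamId);
    const reader = await streamManager.createResumableReader(streamId, { fromSequence });
    if (!reader) return; // stream expired or never existed
    try {
      for await (const chunk of reader) {
        updateUI(chunk);
        await saveLastSequence(streamId, reader.currentSequence);
      }
      return; // stream ended normally
    } catch {
      // Connection dropped: loop and resume from the last saved sequence
    }
  }
}
```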
### Stream Cleanup

For crash recovery and stream reset scenarios:
```typescript
// Clean up chunks beyond a specific step (atomic Lua script)
await streamManager.cleanupToStep('stream-123', stepCount);

// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream('stream-123');
```

`cleanupToStep()` removes all chunks with `step > stepCount`. This is used during crash recovery to remove orphaned chunks that were written after the checkpoint being resumed from. The operation uses an atomic Lua script to ensure consistency.
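Combined with `truncateMessages()` on the state side, a recovery path might look like the sketch below; the `checkpoint` field names (`messageCount`, `stepCount`) are assumptions for illustration.

```typescript
// Hypothetical recovery pairing: trim state first, then the stream, so a
// resumed reader never sees chunks past the checkpointed step.
await stateStore.truncateMessages(sessionId, checkpoint.messageCount);
await streamManager.cleanupToStep(streamId, checkpoint.stepCount);
```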
`resetStream()` clears the entire stream. This is called by `execute()` when starting a fresh run within a session. It ensures clients don't see stale chunks from previous executions.
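If you manage stream IDs yourself rather than going through `execute()`, the same pattern applies. A sketch:

```typescript
// Reuse a stream ID for a fresh run: clear stale chunks first so
// newly attached readers don't replay the previous execution.
await streamManager.resetStream('stream-123');
const writer = await streamManager.createWriter('stream-123', 'run-456', 'researcher');
```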
### Stream TTL
Streams auto-expire to prevent unbounded growth:
```typescript
const streamManager = new RedisStreamManager(redis, {
  ttl: 3600, // 1 hour
});

// Stream keys expire after TTL
// Completed streams can be read until expiry
```
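After the TTL elapses, the stream keys are gone. The null checks in the earlier examples suggest readers should handle this case explicitly:

```typescript
const reader = await streamManager.createReader('stream-123');
if (!reader) {
  // Stream expired (TTL) or never existed
}
```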
## Complete Example

```typescript
import Redis from 'ioredis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Setup
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis, { prefix: 'myapp:' });
const streamManager = new RedisStreamManager(redis, { prefix: 'myapp:' });
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research AI agents', {
  sessionId: 'my-session-1',
});

// Stream in real-time
const stream = await handle.stream();
if (stream) {
  for await (const chunk of stream) {
    process.stdout.write(chunk.type === 'text_delta' ? chunk.delta : '');
  }
}

// Get result
const result = await handle.result();
console.log('\nResult:', result.output);

// Cleanup (optional)
await stateStore.deleteSession(handle.sessionId);

// Close connection when done
await redis.quit();
```

## Production Considerations
### Connection Pooling

Configure connection retries and timeouts for high-throughput workloads:
```typescript
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  connectTimeout: 10000,
});
```

### Cluster Support
For Redis Cluster:
```typescript
import Redis from 'ioredis';

const redis = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
  { host: 'node3', port: 6379 },
]);

const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
```

### Error Handling
Handle Redis connection errors:
```typescript
redis.on('error', (err) => {
  console.error('Redis error:', err);
});

redis.on('connect', () => {
  console.log('Redis connected');
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await redis.quit();
  process.exit(0);
});
```

### Monitoring
Monitor Redis memory and connections:
```bash
# Memory usage
redis-cli INFO memory

# Connected clients
redis-cli INFO clients

# Key count
redis-cli DBSIZE

# Specific key patterns (prefer SCAN over KEYS in production; KEYS blocks the server)
redis-cli KEYS "myapp:session:*" | wc -l
```
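The same figures are available programmatically through ioredis, which can be handy for health endpoints:

```typescript
// Programmatic equivalents of the redis-cli checks above
const memoryInfo = await redis.info('memory');   // INFO memory (raw string)
const clientInfo = await redis.info('clients');  // INFO clients (raw string)
const keyCount = await redis.dbsize();           // DBSIZE
console.log({ keyCount });
```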
### Backup and Recovery

Agent state can be recovered from Redis after crashes:
```typescript
// After process restart
const handle = await executor.getHandle(MyAgent, savedSessionId);
if (handle) {
  const { canResume, reason } = await handle.canResume();
  if (canResume) {
    // Resume from saved state
    const newHandle = await handle.resume();
    const result = await newHandle.result();
  }
}
```

## Limitations
### No Transactions Across Keys
Atomic operations work per-key, not across multiple keys:
```typescript
// This is NOT atomic across both updates
await stateStore.updateStatus(sessionId, 'completed');
await stateStore.mergeCustomState(sessionId, { values: { finalResult: result } });
// If process crashes between these, state may be inconsistent
```
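A common mitigation, sketched below, is to write dependent data first and flip the status last, so a crash in between leaves the run looking in-progress and resumable rather than half-completed:

```typescript
// Write the payload first...
await stateStore.mergeCustomState(sessionId, {
  values: { finalResult: result },
  arrayReplacements: new Set(),
  warnings: [],
});
// ...then update the status last, as the commit point.
await stateStore.updateStatus(sessionId, 'completed');
```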
### Memory Usage

Monitor Redis memory with many concurrent agents:
```typescript
// Set maxmemory and an eviction policy in redis.conf:
//   maxmemory 1gb
//   maxmemory-policy allkeys-lru

// Or use TTL aggressively
const stateStore = new RedisStateStore(redis, { ttl: 3600 });
```

### Network Latency
Each operation is a network round-trip. Batch when possible:
```typescript
// Multiple sequential calls - multiple round-trips
await stateStore.appendMessages(sessionId, [msg1]);
await stateStore.appendMessages(sessionId, [msg2]);

// Single call - one round-trip
await stateStore.appendMessages(sessionId, [msg1, msg2]);
```

## Next Steps
- In-Memory Storage - For development
- Cloudflare Storage - For edge deployment
- Temporal Runtime - Uses Redis storage