Redis Storage
The Redis storage package (@helix-agents/store-redis) provides production-ready state and stream storage using Redis. It supports persistence, multi-process deployments, and atomic operations for parallel tool execution.
When to Use
Good fit:
- Production deployments
- Multi-process/multi-server architectures
- Agents requiring crash recovery
- Real-time streaming across services
Not ideal for:
- Simple development (use in-memory)
- Serverless with cold starts (connection overhead)
- Cloudflare Workers (use D1/Durable Objects)
Installation
npm install @helix-agents/store-redis ioredis
Setup
Basic Connection
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
// Create Redis client
const redis = new Redis({
host: 'localhost',
port: 6379,
// password: 'your-password',
// tls: {}, // For Redis Cloud/AWS ElastiCache
});
// Create stores
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
With Connection URL
const redis = new Redis(process.env.REDIS_URL);
// e.g., redis://user:password@host:6379
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
With Options
const stateStore = new RedisStateStore(redis, {
prefix: 'myapp:agents:', // Key prefix (default: 'helix:')
ttl: 86400 * 7, // State TTL in seconds (default: 7 days)
});
const streamManager = new RedisStreamManager(redis, {
prefix: 'myapp:streams:',
ttl: 86400, // Stream TTL (default: 24 hours)
maxLength: 10000, // Max chunks per stream (default: 10000)
});
RedisStateStore
Key Structure
The store uses a separated key structure for efficient atomic operations:
{prefix}:state:{runId} -> HASH (main state)
{prefix}:messages:{runId} -> LIST (messages)
{prefix}:subagentref:ids:{runId} -> SET (sub-agent IDs)
{prefix}:subagentref:{runId}:{subId} -> HASH (sub-agent details)
{prefix}:customstate:{runId} -> HASH (scalar custom state)
{prefix}:customstate:arraykeys:{runId} -> SET (array field names)
{prefix}:customstate:list:{runId}:{field} -> LIST (array fields)
Basic Operations
// Save state
await stateStore.save(state);
// Load state (reconstitutes from multiple keys)
const state = await stateStore.load('run-123');
// Delete state (removes all related keys)
await stateStore.delete('run-123');
Atomic Operations
These operations are safe for concurrent access:
// Append messages - uses RPUSH (atomic)
await stateStore.appendMessages('run-123', [{ role: 'assistant', content: 'Hello!' }]);
// Merge custom state - uses HSET + RPUSH
await stateStore.mergeCustomState('run-123', {
values: {
count: 5, // Scalar: HSET
items: ['new'], // Array: RPUSH (append)
},
arrayReplacements: new Set(['replaced']), // Arrays to replace, not append
warnings: [],
});
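Conceptually, the merge above maps each field to a Redis command: scalars overwrite (HSET), arrays append (RPUSH), and fields named in arrayReplacements are swapped wholesale. A plain-TypeScript model of that decision logic (illustrative only, not the package's code):

```typescript
// Illustrative model of merge semantics: scalars overwrite,
// arrays append, arrays listed in `replace` are swapped wholesale.
type CustomState = Record<string, unknown>;

function modelMerge(
  current: CustomState,
  values: CustomState,
  replace: Set<string>,
): CustomState {
  const next: CustomState = { ...current };
  for (const [key, value] of Object.entries(values)) {
    if (Array.isArray(value) && !replace.has(key)) {
      // Array field: append, like RPUSH on the per-field list
      const existing = Array.isArray(next[key]) ? (next[key] as unknown[]) : [];
      next[key] = [...existing, ...value];
    } else {
      // Scalar (or replaced array): overwrite, like HSET / DEL+RPUSH
      next[key] = value;
    }
  }
  return next;
}

const merged = modelMerge(
  { count: 1, items: ['a'] },
  { count: 5, items: ['new'] },
  new Set(),
);
// merged -> { count: 5, items: ['a', 'new'] }
```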
// Update status - uses HSET
await stateStore.updateStatus('run-123', 'completed');
// Increment step count - uses HINCRBY (atomic)
const newCount = await stateStore.incrementStepCount('run-123');
Parallel Tool Execution
The key structure enables safe parallel updates:
// Two tools running in parallel
await Promise.all([
// Tool 1: Appends to items array
stateStore.mergeCustomState(runId, {
values: { items: ['from-tool-1'] },
arrayReplacements: new Set(),
warnings: [],
}),
// Tool 2: Appends to items array
stateStore.mergeCustomState(runId, {
values: { items: ['from-tool-2'] },
arrayReplacements: new Set(),
warnings: [],
}),
]);
// Result: items = [...original, 'from-tool-1', 'from-tool-2']
// (order may vary, but both appends are preserved)
Message Queries
// Paginated messages (for UI)
const result = await stateStore.getMessages('run-123', {
offset: 0,
limit: 50,
includeThinking: false,
});
// Message count
const count = await stateStore.getMessageCount('run-123');
RedisStreamManager
Key Structure
{prefix}:stream:{streamId} -> STREAM (Redis Stream for chunks)
{prefix}:stream:meta:{streamId} -> HASH (stream metadata)
Basic Operations
// Create writer
const writer = await streamManager.createWriter('stream-123', 'run-123', 'researcher');
// Write chunks (uses XADD)
await writer.write({
type: 'text_delta',
agentId: 'run-123',
agentType: 'researcher',
timestamp: Date.now(),
delta: 'Hello world',
});
// Close writer (releases resources, doesn't end stream)
await writer.close();
// Create reader (uses XREAD with blocking)
const reader = await streamManager.createReader('stream-123');
if (reader) {
for await (const chunk of reader) {
console.log(chunk);
}
await reader.close();
}
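The reader is an async iterator: the for await loop delivers chunks as they arrive and exits once the stream ends. A minimal in-memory model of that contract (a sketch; the real reader blocks on XREAD rather than an in-process queue):

```typescript
// In-memory model of the reader contract: yields queued chunks,
// waits while the stream is open, terminates when it is ended.
class ModelStream<T> {
  private queue: T[] = [];
  private ended = false;
  private wake: (() => void) | null = null;

  write(chunk: T) {
    this.queue.push(chunk);
    this.wake?.();
  }

  end() {
    this.ended = true;
    this.wake?.();
  }

  async *read(): AsyncGenerator<T> {
    while (true) {
      if (this.queue.length > 0) {
        yield this.queue.shift()!;
      } else if (this.ended) {
        return; // stream ended: the for-await loop exits
      } else {
        // Nothing buffered yet: park until the next write() or end()
        await new Promise<void>((resolve) => (this.wake = resolve));
      }
    }
  }
}

(async () => {
  const stream = new ModelStream<string>();
  stream.write('hello');
  stream.write('world');
  stream.end();

  const chunks: string[] = [];
  for await (const chunk of stream.read()) chunks.push(chunk);
  // chunks -> ['hello', 'world']
})();
```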
// End stream
await streamManager.endStream('stream-123', { result: 'done' });
// Or fail stream
await streamManager.failStream('stream-123', 'Error message');
Real-Time Streaming
Redis Streams provide real-time delivery:
// Writer (agent execution)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
for (const token of tokens) {
await writer.write({ type: 'text_delta', agentId, agentType, timestamp: Date.now(), delta: token });
// Reader receives immediately
}
// Reader (UI/client) - receives chunks as they're written
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
updateUI(chunk); // Real-time updates
}
Resumable Streams
Support for client reconnection:
// Get stream info
const info = await streamManager.getStreamInfo('stream-123');
console.log(info?.totalChunks);
console.log(info?.latestSequence);
console.log(info?.status);
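On reconnect, the client compares its last processed sequence against the stream's latest and resumes from the first unseen chunk. A sketch of that bookkeeping (illustrative; latestSequence mirrors the getStreamInfo() field above):

```typescript
// Decide where to resume: replay only chunks the client hasn't seen.
// `latestSequence` mirrors the getStreamInfo() field above.
function resumeFrom(lastProcessed: number | null, latestSequence: number): number {
  if (lastProcessed === null) return 0; // fresh client: replay everything
  return Math.min(lastProcessed + 1, latestSequence + 1); // next unseen chunk
}

resumeFrom(null, 41); // 0  - replay everything
resumeFrom(37, 41);   // 38 - resume mid-stream
resumeFrom(41, 41);   // 42 - fully caught up; wait for new chunks
```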
// Create resumable reader
const reader = await streamManager.createResumableReader('stream-123', {
fromSequence: lastKnownSequence, // Resume from this point
});
if (reader) {
console.log(`Starting from sequence ${reader.currentSequence}`);
for await (const chunk of reader) {
// Process chunks, track position
savePosition(reader.currentSequence);
}
}
Stream TTL
Streams auto-expire to prevent unbounded growth:
const streamManager = new RedisStreamManager(redis, {
ttl: 3600, // 1 hour
});
// Stream keys expire after TTL
// Completed streams can be read until expiry
Complete Example
import Redis from 'ioredis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Setup
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis, { prefix: 'myapp:' });
const streamManager = new RedisStreamManager(redis, { prefix: 'myapp:' });
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research AI agents');
// Stream in real-time
const stream = await handle.stream();
if (stream) {
for await (const chunk of stream) {
process.stdout.write(chunk.type === 'text_delta' ? chunk.delta : '');
}
}
// Get result
const result = await handle.result();
console.log('\nResult:', result.output);
// Cleanup (optional)
await stateStore.delete(handle.runId);
// Close connection when done
await redis.quit();
Production Considerations
Connection Pooling
Tune the connection for high-throughput workloads:
const redis = new Redis({
host: 'localhost',
port: 6379,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
connectTimeout: 10000,
});
Cluster Support
For Redis Cluster:
import Redis from 'ioredis';
const redis = new Redis.Cluster([
{ host: 'node1', port: 6379 },
{ host: 'node2', port: 6379 },
{ host: 'node3', port: 6379 },
]);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
Error Handling
Handle Redis connection errors:
redis.on('error', (err) => {
console.error('Redis error:', err);
});
redis.on('connect', () => {
console.log('Redis connected');
});
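ioredis can also reconnect automatically; its retryStrategy option returns the delay in milliseconds before the next attempt (or null to stop retrying). A capped linear backoff, for example:

```typescript
// Backoff for ioredis reconnection: grows linearly, capped at 2s.
// Passed to the client as: new Redis({ host: 'localhost', retryStrategy })
const retryStrategy = (times: number): number => Math.min(times * 200, 2000);

retryStrategy(1);  // 200
retryStrategy(5);  // 1000
retryStrategy(50); // 2000 (capped)
```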
// Graceful shutdown
process.on('SIGTERM', async () => {
await redis.quit();
process.exit(0);
});
Monitoring
Monitor Redis memory and connections:
# Memory usage
redis-cli INFO memory
# Connected clients
redis-cli INFO clients
# Key count
redis-cli DBSIZE
# Specific key patterns (prefer SCAN; KEYS blocks the server)
redis-cli --scan --pattern "myapp:state:*" | wc -l
Backup and Recovery
Agent state can be recovered from Redis after crashes:
// After process restart
const handle = await executor.getHandle(MyAgent, savedRunId);
if (handle) {
const { canResume, reason } = await handle.canResume();
if (canResume) {
// Resume from saved state
const newHandle = await handle.resume();
const result = await newHandle.result();
}
}
Limitations
No Transactions Across Keys
Atomic operations work per-key, not across multiple keys:
// This is NOT atomic across both updates
await stateStore.updateStatus(runId, 'completed');
await stateStore.mergeCustomState(runId, { values: { finalResult: result } });
// If process crashes between these, state may be inconsistent
Memory Usage
Monitor Redis memory with many concurrent agents:
// Set maxmemory and policy
// redis.conf: maxmemory 1gb
// redis.conf: maxmemory-policy allkeys-lru
// Or use TTL aggressively
const stateStore = new RedisStateStore(redis, { ttl: 3600 });
Network Latency
Each operation is a network round-trip. Batch when possible:
// Multiple sequential calls - multiple round-trips
await stateStore.appendMessages(runId, [msg1]);
await stateStore.appendMessages(runId, [msg2]);
// Single call - one round-trip
await stateStore.appendMessages(runId, [msg1, msg2]);
Next Steps
- In-Memory Storage - For development
- Cloudflare Storage - For edge deployment
- Temporal Runtime - Uses Redis storage