@helix-agents/store-redis
Redis implementations of store interfaces for production use. Provides durable state storage and stream management across processes.
Installation
bash
npm install @helix-agents/store-redis ioredisRedisStateStore
Redis-backed state storage.
typescript
import { RedisStateStore } from '@helix-agents/store-redis';
const stateStore = new RedisStateStore({
host: 'localhost',
port: 6379,
password: 'optional',
db: 0,
keyPrefix: 'helix:', // Optional prefix for keys
ttl: 86400 * 7, // Optional TTL in seconds (default: 7 days)
});Configuration Options
typescript
interface RedisStateStoreOptions {
// Connection options (ioredis compatible)
host?: string;
port?: number;
password?: string;
db?: number;
// Or provide existing client
client?: Redis;
// Key prefix for namespacing
keyPrefix?: string;
// TTL for state entries (seconds)
ttl?: number;
// Logger
logger?: Logger;
}Methods
Same interface as InMemoryStateStore:
typescript
await stateStore.save(state); // runId is inside state object
const state = await stateStore.load(runId);
await stateStore.delete(runId);
await stateStore.updateStatus(runId, status);
await stateStore.appendMessages(runId, messages);
const { messages, hasMore } = await stateStore.getMessages(runId, options);Data Structure
State is stored as Redis hashes:
helix:state:{runId} -> {
runId: string,
agentType: string,
status: string,
stepCount: number,
customState: JSON string,
messages: JSON string,
output: JSON string,
error: string,
...
}RedisStreamManager
Redis-backed stream management using Redis Streams.
typescript
import { RedisStreamManager } from '@helix-agents/store-redis';
const streamManager = new RedisStreamManager({
host: 'localhost',
port: 6379,
keyPrefix: 'helix:',
maxStreamLength: 10000, // Max chunks per stream
blockTimeout: 5000, // Read timeout (ms)
});Configuration Options
typescript
interface RedisStreamManagerOptions {
// Connection options
host?: string;
port?: number;
password?: string;
db?: number;
client?: Redis;
// Stream options
keyPrefix?: string;
maxStreamLength?: number; // Trim streams to this length
blockTimeout?: number; // XREAD BLOCK timeout
logger?: Logger;
}Methods
Same interface as InMemoryStreamManager:
typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);
const reader = await streamManager.createReader(streamId);
await streamManager.endStream(streamId);
await streamManager.failStream(streamId, error);
const info = await streamManager.getInfo(streamId);Resumable Reader
typescript
const reader = await streamManager.createResumableReader(streamId, {
fromSequence: 100,
});
if (reader) {
for await (const { chunk, sequence } of reader) {
// Process chunk
// sequence can be used for Last-Event-ID
}
}Data Structure
Uses Redis Streams:
helix:stream:{streamId} -> XADD entries with:
- chunk: JSON-encoded StreamChunk
- sequence: Auto-incrementing ID
helix:stream:{streamId}:meta -> {
status: 'active' | 'ended' | 'failed',
error?: string,
createdAt: ISO timestamp,
}RedisResumableStorage
Lower-level resumable stream storage.
typescript
import { RedisResumableStorage } from '@helix-agents/store-redis';
const storage = new RedisResumableStorage({
client: redisClient,
keyPrefix: 'helix:',
});
// Create stream
await storage.create(streamId);
// Append chunk with sequence
await storage.append(streamId, chunk, sequence);
// Read from sequence
const chunks = await storage.readFrom(streamId, fromSequence, limit);
// Subscribe to new chunks
const unsubscribe = storage.subscribe(streamId, (chunk, sequence) => {
console.log(`New chunk [${sequence}]:`, chunk);
});
// Get metadata
const metadata = await storage.getMetadata(streamId);
// End stream
await storage.end(streamId);Error Classes
typescript
import {
SequenceConflictError, // Sequence number conflict
MalformedChunkError, // Invalid chunk data
} from '@helix-agents/store-redis';
try {
await storage.append(streamId, chunk, sequence);
} catch (error) {
if (error instanceof SequenceConflictError) {
// Handle sequence conflict
}
}Usage Example
typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Create Redis-backed stores
const stateStore = new RedisStateStore({
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
keyPrefix: 'myapp:agents:',
});
const streamManager = new RedisStreamManager({
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
keyPrefix: 'myapp:agents:',
});
// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();Production Considerations
Connection Pooling
Use a shared Redis client:
typescript
import Redis from 'ioredis';
const redisClient = new Redis({
host: process.env.REDIS_HOST,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
});
const stateStore = new RedisStateStore({ client: redisClient });
const streamManager = new RedisStreamManager({ client: redisClient });Cluster Support
typescript
import Redis from 'ioredis';
const cluster = new Redis.Cluster([
{ host: 'node1', port: 6379 },
{ host: 'node2', port: 6379 },
]);
const stateStore = new RedisStateStore({ client: cluster });Error Handling
typescript
stateStore.on('error', (error) => {
console.error('Redis state store error:', error);
});
streamManager.on('error', (error) => {
console.error('Redis stream manager error:', error);
});TTL Management
Set appropriate TTLs based on your retention needs:
typescript
const stateStore = new RedisStateStore({
ttl: 86400 * 30, // 30 days for state
});Schemas
Zod schema for stored state:
typescript
import { AgentStateSchema } from '@helix-agents/store-redis';
// Validate state
const result = AgentStateSchema.safeParse(data);