@helix-agents/store-postgres
PostgreSQL implementations of store interfaces for production use. Provides durable session-state, lock-manager, and usage-tracking implementations that work across all runtimes (JS, Temporal, DBOS, Cloudflare via Neon/Hyperdrive).
Streaming: This package does NOT provide a stream manager. Pair PostgresStateStore with RedisStreamManager from @helix-agents/store-redis (recommended) or InMemoryStreamManager for live event streaming.
Installation
# Node.js / Temporal / DBOS — use pg
npm install @helix-agents/store-postgres pg
# Cloudflare Workers / edge — use Neon serverless
npm install @helix-agents/store-postgres @neondatabase/serverless
# Cloudflare Workers via Hyperdrive — no extra dependency required
npm install @helix-agents/store-postgres
Quick Start
import { Pool } from 'pg';
import {
  createPgAdapter,
  PostgresMigrations,
  PostgresStateStore,
  PostgresLockManager,
  PostgresUsageStore,
} from '@helix-agents/store-postgres';
// 1. Wrap your underlying client in an adapter
const pool = new Pool({ connectionString: process.env.POSTGRES_URL });
const adapter = createPgAdapter(pool);
// 2. Run migrations (idempotent, advisory-locked, transactional)
const { current } = await new PostgresMigrations(adapter).run();
console.log(`Schema is at V${current}`);
// 3. Construct stores
const stateStore = new PostgresStateStore(adapter);
const lockManager = new PostgresLockManager(adapter);
const usageStore = new PostgresUsageStore(adapter);
Session vs Run Identifiers
- sessionId: Primary key for all state operations. A session contains all messages, state, and checkpoints.
- runId: Execution metadata identifying a specific run within a session.
All store methods use sessionId as the primary key for lookups and storage.
Adapter Factories
Each store accepts a PostgresAdapter so the same store implementations work across pg, Neon, and Hyperdrive without changes.
createPgAdapter(pool: Pool)
Wraps a pg.Pool. Each transaction() checks out a PoolClient, issues BEGIN/COMMIT/ROLLBACK, and releases it on completion.
import { Pool } from 'pg';
import { createPgAdapter } from '@helix-agents/store-postgres';
const pool = new Pool({ connectionString: process.env.POSTGRES_URL });
const adapter = createPgAdapter(pool);
Use for: Node.js, Temporal workers, DBOS workers, long-lived servers.
createNeonAdapter(pool)
Wraps a @neondatabase/serverless Pool. The Neon transport supports the same query/transaction surface as pg, so the same store implementations run unchanged against Neon's HTTP/WebSocket transport.
import { Pool } from '@neondatabase/serverless';
import { createNeonAdapter } from '@helix-agents/store-postgres';
const pool = new Pool({ connectionString: process.env.NEON_URL });
const adapter = createNeonAdapter(pool);
Use for: Neon serverless, Cloudflare Workers (with Neon as origin), edge runtimes.
createHyperdriveAdapter(hyperdrive)
Wraps a Cloudflare Hyperdrive binding. Allows running PostgresStateStore from inside a Cloudflare Worker, with Hyperdrive handling connection pooling and edge-routing to your origin Postgres.
import { createHyperdriveAdapter } from '@helix-agents/store-postgres';
// In a Cloudflare Worker fetch handler:
const adapter = createHyperdriveAdapter(env.HYPERDRIVE);
Custom adapters
PostgresAdapter is a small interface (query, transaction, close). Any Postgres client can be wrapped to plug into the same stores.
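As a rough illustration of what wrapping a client could look like, here is a minimal sketch. The `SketchAdapter`, `SketchQueryable`, and `MinimalClient` shapes and the `wrapClient` helper are assumptions made for this example; the package's exported PostgresAdapter type is the authority on the real signatures.

```typescript
// Assumed shapes for illustration only; the real PostgresAdapter may differ.
type Row = Record<string, unknown>;

interface SketchQueryable {
  query(sql: string, params?: unknown[]): Promise<{ rows: Row[] }>;
}

interface SketchAdapter extends SketchQueryable {
  transaction<T>(fn: (tx: SketchQueryable) => Promise<T>): Promise<T>;
  close(): Promise<void>;
}

// Hypothetical single-connection client: anything exposing query() and end().
// A pooled client would need to check out one connection per transaction
// so that BEGIN, the queries, and COMMIT all run on the same connection.
interface MinimalClient extends SketchQueryable {
  end(): Promise<void>;
}

function wrapClient(client: MinimalClient): SketchAdapter {
  return {
    query: (sql, params) => client.query(sql, params),

    async transaction<T>(fn: (tx: SketchQueryable) => Promise<T>): Promise<T> {
      await client.query('BEGIN');
      try {
        const result = await fn(client);
        await client.query('COMMIT');
        return result;
      } catch (err) {
        // Undo partial work, then surface the original error to the caller.
        await client.query('ROLLBACK');
        throw err;
      }
    },

    close: () => client.end(),
  };
}
```

The sketch mirrors the BEGIN/COMMIT/ROLLBACK sequence described for createPgAdapter; whether the stores require anything beyond these three methods is not stated here.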
Migrations
PostgresMigrations.run() executes all pending migrations in a single transaction guarded by pg_advisory_xact_lock, so concurrent runners can't double-apply. The current schema is V7.
| Version | Description |
|---|---|
| V1 | Session-centric schema with all base tables and indexes. |
| V2 | Add remote_json column to __agents_sub_session_refs for remote agents. |
| V3 | Add pending_client_tool_calls JSONB to __agents_states for client-executed tools. |
| V4 | Add root_session_id TEXT and client_tool_call_ownership JSONB to __agents_states. |
| V5 | Add completed_client_tool_calls JSONB for the durable already_completed marker. |
| V6 | Add mode column on states + completion_reason on runs (consumed by runtime-dbos). |
| V7 | Add suspension_context JSONB + partial index on expiresAt for stateless HITL/client-tool flows. |
const migrations = new PostgresMigrations(adapter, {
  tablePrefix: '__agents_', // default; must match `[a-zA-Z_][a-zA-Z0-9_]*`
});
const { applied, current } = await migrations.run();
// applied: [3, 4, 5, 6, 7] (versions just applied)
// current: 7 (final schema version)
Migrations record applied versions in __agents_migrations. Re-running on an up-to-date database is a no-op. Multiple tenants can share a database by passing distinct tablePrefix values.
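To make the no-op behaviour and the prefix rule concrete, here is a small standalone sketch. `pendingVersions` and `isValidTablePrefix` are hypothetical helpers written for this example, not part of the package. The sketch also assumes the documented pattern must match the whole prefix.

```typescript
// Assumption: the documented pattern applies to the entire prefix string.
const TABLE_PREFIX_PATTERN = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

function isValidTablePrefix(prefix: string): boolean {
  return TABLE_PREFIX_PATTERN.test(prefix);
}

// Hypothetical helper: given the versions already recorded as applied,
// list the versions a run would still need to apply, in ascending order.
function pendingVersions(applied: readonly number[], latest: number): number[] {
  const done = new Set(applied);
  const pending: number[] = [];
  for (let version = 1; version <= latest; version++) {
    if (!done.has(version)) pending.push(version);
  }
  return pending;
}

const freshUpgrade = pendingVersions([1, 2], 7); // [3, 4, 5, 6, 7]
const upToDate = pendingVersions([1, 2, 3, 4, 5, 6, 7], 7); // [] (nothing to do)
const okPrefix = isValidTablePrefix('tenant_a_'); // true
const badPrefix = isValidTablePrefix('tenant-a_'); // false: hyphen not allowed
```

An empty pending list corresponds to the no-op case: a database already at V7 has nothing left to apply.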
PostgresStateStore
Full implementation of SessionStateStore from @helix-agents/core.
const stateStore = new PostgresStateStore(adapter, {
  tablePrefix: '__agents_', // default
  logger: customLogger, // optional
});
await stateStore.createSession('session-123', { agentType: 'my-agent' });
const state = await stateStore.loadState('session-123');
await stateStore.saveState('session-123', state);
Atomic save + promote (v7)
PostgresStateStore implements the v7 saveStateAndPromoteStaging primitive via a single BEGIN; UPDATE/INSERT/DELETE; COMMIT transaction. State, appended messages, staged changes, and the new checkpoint are all persisted atomically — the cross-runtime invariant C-1 contract that lets runtimes suspend without leaking partial writes.
const { checkpointId, newVersion } = await stateStore.saveStateAndPromoteStaging(
  'session-123',
  nextState,
  appendMessages,
  { stepId, stepCount, streamSequence },
  { expectedVersion: previousVersion } // throws StaleStateError on mismatch
);
Compare-and-set status
const result = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'],
  'interrupted'
);
if (result.ok) {
  // Status changed; result.newVersion is the post-increment version
} else {
  // result.currentStatus / result.currentVersion show actual stored values
}
The version field is incremented on every successful CAS. Stores reject saveState calls whose expectedVersion doesn't match, which prevents stale processes from clobbering newer state.
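The version semantics can be modelled in memory as follows. This is a sketch only: `VersionedRow`, `StaleStateErrorSketch`, and both functions are stand-ins invented for this example, and the assumption that a successful save also bumps the version is inferred from the `newVersion` return value rather than confirmed.

```typescript
// Hypothetical in-memory stand-in for one stored session row.
interface VersionedRow {
  status: string;
  version: number;
}

type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: string; currentVersion: number };

// Change status only if the stored status is one of the expected values.
function compareAndSetStatus(row: VersionedRow, expected: readonly string[], next: string): CasResult {
  if (!expected.includes(row.status)) {
    return { ok: false, currentStatus: row.status, currentVersion: row.version };
  }
  row.status = next;
  row.version += 1; // every successful CAS increments the version
  return { ok: true, newVersion: row.version };
}

// Stand-in for the real StaleStateError, which this sketch does not import.
class StaleStateErrorSketch extends Error {
  constructor(expected: number, actual: number) {
    super(`expected version ${expected}, found ${actual}`);
    this.name = 'StaleStateErrorSketch';
  }
}

// Reject writers holding an outdated version (assumed: a save bumps the version).
function saveWithExpectedVersion(row: VersionedRow, expectedVersion: number): number {
  if (row.version !== expectedVersion) {
    throw new StaleStateErrorSketch(expectedVersion, row.version);
  }
  row.version += 1;
  return row.version;
}
```

A process that loaded version 3 and then lost a race would call `saveWithExpectedVersion(row, 3)` against a row already at version 4 and get the stale error instead of overwriting the newer state.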
Checkpoints
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-...');
const latest = await stateStore.getLatestCheckpoint('session-123');
const result = await stateStore.listCheckpoints('session-123', { limit: 10 });
v7 SessionState fields
PostgresStateStore persists the full v7 stateless-suspension surface:
- suspendedAwaitingChildren / suspendedStepId / tracingContext / expiresAt: packed into suspension_context JSONB (V7 migration). The partial index on suspension_context->>'expiresAt' supports operator-side cleanup of abandoned suspended sessions.
- pendingClientToolCalls / completedClientToolCalls / clientToolCallOwnership: dedicated JSONB columns (V3/V4/V5).
- failureReason: discriminator on status === 'failed' (e.g. 'parent_suspended' for the γ-cascade).
- mode: 'standard' | 'persistent', write-once on first execute() (V6).
- version / resumeCount: optimistic concurrency + resume tracking.
See @helix-agents/core for field semantics.
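The packing of the four suspension fields into one JSONB value might look roughly like the following. `packSuspensionContext` is a hypothetical helper; the value types used in the usage lines and the choice to store `null` when no field is set are assumptions for illustration, not documented behaviour.

```typescript
// The four field names come from the list above; everything else is assumed.
const SUSPENSION_KEYS = [
  'suspendedAwaitingChildren',
  'suspendedStepId',
  'tracingContext',
  'expiresAt',
] as const;

type SuspensionKey = (typeof SUSPENSION_KEYS)[number];

// Hypothetical helper: collect the suspension fields that are set on a state
// object into one JSON document suitable for a JSONB column.
function packSuspensionContext(state: Record<string, unknown>): string | null {
  const packed: Partial<Record<SuspensionKey, unknown>> = {};
  let hasValue = false;
  for (const key of SUSPENSION_KEYS) {
    if (state[key] !== undefined) {
      packed[key] = state[key];
      hasValue = true;
    }
  }
  // Assumption: sessions that were never suspended store no context at all.
  return hasValue ? JSON.stringify(packed) : null;
}

const suspended = packSuspensionContext({
  status: 'suspended', // not a suspension field, so it is left out
  suspendedStepId: 'step-3',
  expiresAt: 1700000000000, // assumed to be epoch milliseconds
});
// suspended === '{"suspendedStepId":"step-3","expiresAt":1700000000000}'
```

Keeping the fields in a single document is what lets an expression index on `suspension_context->>'expiresAt'` serve cleanup queries.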
PostgresLockManager
Distributed lock manager backed by Postgres advisory locks. Returns a fencing token so split-brain scenarios can be rejected by downstream operations.
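To show why a fencing token helps, here is a minimal sketch of a downstream resource that rejects writes from a stale lock holder. `FencedResource` is hypothetical and assumes tokens are numbers that increase with each acquisition; the actual token type returned by PostgresLockManager is not specified here.

```typescript
// Hypothetical downstream resource that remembers the highest token it has seen.
class FencedResource {
  private highestToken = 0;
  private value: string | null = null;

  // Accept the write only if the caller's token is at least as new as any seen so far.
  write(fencingToken: number, value: string): boolean {
    if (fencingToken < this.highestToken) {
      return false; // a newer holder has already written; this caller is stale
    }
    this.highestToken = fencingToken;
    this.value = value;
    return true;
  }

  read(): string | null {
    return this.value;
  }
}

const resource = new FencedResource();
resource.write(1, 'from first holder'); // accepted
resource.write(2, 'from second holder'); // accepted: lock was re-acquired
resource.write(1, 'late write'); // rejected: token 1 is older than 2
```

The first holder may still believe it owns the lock after a pause or partition, but its write is refused because the resource has already seen a newer token.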
import { PostgresLockManager } from '@helix-agents/store-postgres';
const lockManager = new PostgresLockManager(adapter, {
  tablePrefix: '__agents_',
  defaultTTLMs: 30_000,
});
const lock = await lockManager.acquire('session-123', { ttlMs: 30_000 });
if (lock) {
  console.log('Fencing token:', lock.fencingToken);
  // Refresh during long operations
  await lock.refresh();
  // Release when done
  await lock.release();
}
Use with executor
import { JSAgentExecutor } from '@helix-agents/runtime-js';
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
  lockManager,
});
PostgresUsageStore
Records token / tool / sub-agent / custom usage entries; supports filter, range, and rollup queries.
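As a rough picture of what a rollup computes, the sketch below sums token entries per model in memory. `UsageEntrySketch` keeps only the fields needed for this example, and `rollupTokensByModel` is a hypothetical helper; the shape actually returned by getRollup is not documented here and may differ.

```typescript
interface TokenTotals {
  prompt: number;
  completion: number;
  total: number;
}

// Reduced entry shape for illustration; real entries carry more fields.
interface UsageEntrySketch {
  kind: string;
  model?: string;
  tokens?: TokenTotals;
}

// Hypothetical helper: sum token usage per model, ignoring non-token entries.
function rollupTokensByModel(entries: readonly UsageEntrySketch[]): Record<string, TokenTotals> {
  const totals: Record<string, TokenTotals> = {};
  for (const entry of entries) {
    if (entry.kind !== 'tokens' || !entry.model || !entry.tokens) continue;
    const bucket = totals[entry.model] ?? { prompt: 0, completion: 0, total: 0 };
    bucket.prompt += entry.tokens.prompt;
    bucket.completion += entry.tokens.completion;
    bucket.total += entry.tokens.total;
    totals[entry.model] = bucket;
  }
  return totals;
}
```

Two `gpt-4o` entries of 100/50/150 and 20/10/30 tokens would roll up to 120 prompt, 60 completion, and 180 total for that model.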
import { PostgresUsageStore } from '@helix-agents/store-postgres';
const usageStore = new PostgresUsageStore(adapter);
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});
// All entries
const entries = await usageStore.getEntries('session-123');
// Filtered
const tokenOnly = await usageStore.getEntries('session-123', { kinds: ['tokens'] });
// Rollup
const rollup = await usageStore.getRollup('session-123', { includeSubAgents: true });
Querying
List Sessions
import type { ListSessionsOptions } from '@helix-agents/core';
const result = await stateStore.listSessions({
  agentType: 'chat-agent',
  status: 'active',
  orderBy: { field: 'updatedAt', direction: 'desc' },
  limit: 25,
});
Query Usage
import type { QueryUsageOptions } from '@helix-agents/core';
const result = await usageStore.queryUsage({
  agentType: 'chat-agent',
  kinds: ['tokens'],
  dateRange: { from: Date.now() - 7 * 24 * 60 * 60 * 1000 },
  limit: 100,
});
See the Querying Guide for complete documentation.
Streaming
This package does not include a stream manager. Pair the state store with one of:
- RedisStreamManager from @helix-agents/store-redis (recommended for production)
- InMemoryStreamManager from @helix-agents/store-memory (dev/tests)
- DurableObjectStreamManager from @helix-agents/store-cloudflare (Cloudflare deploys)
Integration Tests
Integration tests run against a real Postgres instance. The repo's docker-compose.test.yml exposes a Postgres 16 service on port 5433:
# Start Postgres
docker compose -f docker-compose.test.yml up -d postgres-store
# Run integration tests
POSTGRES_URL=postgresql://test:test@localhost:5433/helix_agents_test \
  npm run -w @helix-agents/store-postgres test:integration
The harness creates a unique tablePrefix per test run, applies all migrations via PostgresMigrations.run(), and drops the prefixed tables on teardown. The full integration suite covers the state store contract, lock manager, usage store, atomic suspension writes, migration sequencing, and concurrent CAS scenarios.
When to Use
Choose Postgres stores when:
- Running in production with an existing Postgres operational footprint
- You need transactional guarantees for state + messages + checkpoints
- You want a single durable backend across runtimes (JS / Temporal / DBOS / Cloudflare)
- You need Hyperdrive or Neon for edge deploys
Consider other stores when:
- Developing locally (@helix-agents/store-memory)
- Postgres infrastructure isn't available (@helix-agents/store-redis)
- Deploying primarily to Cloudflare without origin Postgres (@helix-agents/store-cloudflare)
See Also
- Helix concepts: session/run model, stateless suspension, hooks
- @helix-agents/core: SessionStateStore interface
- v6 → v7 upgrade guide
- Distributed Coordination: CAS operations, version management
- Client-Executed Tools