Skip to content

@helix-agents/store-postgres

PostgreSQL implementations of store interfaces for production use. Provides durable session-state, lock-manager, and usage-tracking implementations that work across all runtimes (JS, Temporal, DBOS, Cloudflare via Neon/Hyperdrive).

Streaming: This package does NOT provide a stream manager. Pair PostgresStateStore with RedisStreamManager from @helix-agents/store-redis (recommended) or InMemoryStreamManager for live event streaming.

Installation

bash
# Node.js / Temporal / DBOS — use pg
npm install @helix-agents/store-postgres pg

# Cloudflare Workers / edge — use Neon serverless
npm install @helix-agents/store-postgres @neondatabase/serverless

# Cloudflare Workers via Hyperdrive — no extra dependency required
npm install @helix-agents/store-postgres

Quick Start

typescript
import { Pool } from 'pg';
import {
  createPgAdapter,
  PostgresMigrations,
  PostgresStateStore,
  PostgresLockManager,
  PostgresUsageStore,
} from '@helix-agents/store-postgres';

// 1. Wrap your underlying client in an adapter
const pool = new Pool({ connectionString: process.env.POSTGRES_URL });
const adapter = createPgAdapter(pool);

// 2. Run migrations (idempotent, advisory-locked, transactional)
const { current } = await new PostgresMigrations(adapter).run();
console.log(`Schema is at V${current}`);

// 3. Construct stores
const stateStore = new PostgresStateStore(adapter);
const lockManager = new PostgresLockManager(adapter);
const usageStore = new PostgresUsageStore(adapter);

Session vs Run Identifiers

  • sessionId: Primary key for all state operations. A session contains all messages, state, and checkpoints.
  • runId: Execution metadata identifying a specific run within a session.

All store methods use sessionId as the primary key for lookups and storage.

Adapter Factories

Each store accepts a PostgresAdapter so the same store implementations work across pg, Neon, and Hyperdrive without changes.

createPgAdapter(pool: Pool)

Wraps a pg.Pool. Each transaction() checks out a PoolClient, issues BEGIN/COMMIT/ROLLBACK, and releases it on completion.

typescript
import { Pool } from 'pg';
import { createPgAdapter } from '@helix-agents/store-postgres';

const pool = new Pool({ connectionString: process.env.POSTGRES_URL });
const adapter = createPgAdapter(pool);

Use for: Node.js, Temporal workers, DBOS workers, long-lived servers.

createNeonAdapter(pool)

Wraps a @neondatabase/serverless Pool. The Neon transport supports the same query/transaction surface as pg, so the same store implementations run unchanged against Neon's HTTP/WebSocket transport.

typescript
import { Pool } from '@neondatabase/serverless';
import { createNeonAdapter } from '@helix-agents/store-postgres';

const pool = new Pool({ connectionString: process.env.NEON_URL });
const adapter = createNeonAdapter(pool);

Use for: Neon serverless, Cloudflare Workers (with Neon as origin), edge runtimes.

createHyperdriveAdapter(hyperdrive)

Wraps a Cloudflare Hyperdrive binding. Allows running PostgresStateStore from inside a Cloudflare Worker, with Hyperdrive handling connection pooling and edge-routing to your origin Postgres.

typescript
import { createHyperdriveAdapter } from '@helix-agents/store-postgres';

// In a Cloudflare Worker fetch handler:
const adapter = createHyperdriveAdapter(env.HYPERDRIVE);

Custom adapters

PostgresAdapter is a small interface (query, transaction, close). Any Postgres client can be wrapped to plug into the same stores.

Migrations

PostgresMigrations.run() executes all pending migrations in a single transaction guarded by pg_advisory_xact_lock, so concurrent runners can't double-apply. The current schema is V7.

VersionDescription
V1Session-centric schema with all base tables and indexes.
V2Add remote_json column to __agents_sub_session_refs for remote agents.
V3Add pending_client_tool_calls JSONB to __agents_states for client-executed tools.
V4Add root_session_id TEXT and client_tool_call_ownership JSONB to __agents_states.
V5Add completed_client_tool_calls JSONB for the durable already_completed marker.
V6Add mode column on states + completion_reason on runs (consumed by runtime-dbos).
V7Add suspension_context JSONB + partial index on expiresAt for stateless HITL/client-tool flows.
typescript
const migrations = new PostgresMigrations(adapter, {
  tablePrefix: '__agents_', // default; must match `[a-zA-Z_][a-zA-Z0-9_]*`
});

const { applied, current } = await migrations.run();
// applied: [3, 4, 5, 6, 7]  (versions just applied)
// current: 7                (final schema version)

Migrations record applied versions in __agents_migrations. Re-running on an up-to-date database is a no-op. Multiple tenants can share a database by passing distinct tablePrefix values.

PostgresStateStore

Full implementation of SessionStateStore from @helix-agents/core.

typescript
const stateStore = new PostgresStateStore(adapter, {
  tablePrefix: '__agents_', // default
  logger: customLogger,     // optional
});

await stateStore.createSession('session-123', { agentType: 'my-agent' });
const state = await stateStore.loadState('session-123');
await stateStore.saveState('session-123', state);

Atomic save + promote (v7)

PostgresStateStore implements the v7 saveStateAndPromoteStaging primitive via a single BEGIN; UPDATE/INSERT/DELETE; COMMIT transaction. State, appended messages, staged changes, and the new checkpoint are all persisted atomically — the cross-runtime invariant C-1 contract that lets runtimes suspend without leaking partial writes.

typescript
const { checkpointId, newVersion } = await stateStore.saveStateAndPromoteStaging(
  'session-123',
  nextState,
  appendMessages,
  { stepId, stepCount, streamSequence },
  { expectedVersion: previousVersion } // throws StaleStateError on mismatch
);

Compare-and-set status

typescript
const result = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'],
  'interrupted'
);

if (result.ok) {
  // Status changed; result.newVersion is the post-increment version
} else {
  // result.currentStatus / result.currentVersion show actual stored values
}

The version field is incremented on every successful CAS. Stores reject saveState calls whose expectedVersion doesn't match, which prevents stale processes from clobbering newer state.

Checkpoints

typescript
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-...');
const latest = await stateStore.getLatestCheckpoint('session-123');
const result = await stateStore.listCheckpoints('session-123', { limit: 10 });

v7 SessionState fields

PostgresStateStore persists the full v7 stateless-suspension surface:

  • suspendedAwaitingChildren / suspendedStepId / tracingContext / expiresAt — packed into suspension_context JSONB (V7 migration). The partial index on suspension_context->>'expiresAt' supports operator-side cleanup of abandoned suspended sessions.
  • pendingClientToolCalls / completedClientToolCalls / clientToolCallOwnership — dedicated JSONB columns (V3/V4/V5).
  • failureReason — discriminator on status === 'failed' (e.g. 'parent_suspended' for the γ-cascade).
  • mode'standard' | 'persistent', write-once on first execute() (V6).
  • version / resumeCount — optimistic concurrency + resume tracking.

See @helix-agents/core for field semantics.

PostgresLockManager

Distributed lock manager backed by Postgres advisory locks. Returns a fencing token so split-brain scenarios can be rejected by downstream operations.

typescript
import { PostgresLockManager } from '@helix-agents/store-postgres';

const lockManager = new PostgresLockManager(adapter, {
  tablePrefix: '__agents_',
  defaultTTLMs: 30_000,
});

const lock = await lockManager.acquire('session-123', { ttlMs: 30_000 });
if (lock) {
  console.log('Fencing token:', lock.fencingToken);

  // Refresh during long operations
  await lock.refresh();

  // Release when done
  await lock.release();
}

Use with executor

typescript
import { JSAgentExecutor } from '@helix-agents/runtime-js';

const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
  lockManager,
});

PostgresUsageStore

Records token / tool / sub-agent / custom usage entries; supports filter, range, and rollup queries.

typescript
import { PostgresUsageStore } from '@helix-agents/store-postgres';

const usageStore = new PostgresUsageStore(adapter);

await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});

// All entries
const entries = await usageStore.getEntries('session-123');

// Filtered
const tokenOnly = await usageStore.getEntries('session-123', { kinds: ['tokens'] });

// Rollup
const rollup = await usageStore.getRollup('session-123', { includeSubAgents: true });

Querying

List Sessions

typescript
import type { ListSessionsOptions } from '@helix-agents/core';

const result = await stateStore.listSessions({
  agentType: 'chat-agent',
  status: 'active',
  orderBy: { field: 'updatedAt', direction: 'desc' },
  limit: 25,
});

Query Usage

typescript
import type { QueryUsageOptions } from '@helix-agents/core';

const result = await usageStore.queryUsage({
  agentType: 'chat-agent',
  kinds: ['tokens'],
  dateRange: { from: Date.now() - 7 * 24 * 60 * 60 * 1000 },
  limit: 100,
});

See the Querying Guide for complete documentation.

Streaming

This package does not include a stream manager. Pair the state store with one of:

  • RedisStreamManager from @helix-agents/store-redis (recommended for production)
  • InMemoryStreamManager from @helix-agents/store-memory (dev/tests)
  • DurableObjectStreamManager from @helix-agents/store-cloudflare (Cloudflare deploys)

Integration Tests

Integration tests run against a real Postgres instance. The repo's docker-compose.test.yml exposes a Postgres 16 service on port 5433:

bash
# Start Postgres
docker compose -f docker-compose.test.yml up -d postgres-store

# Run integration tests
POSTGRES_URL=postgresql://test:test@localhost:5433/helix_agents_test \
  npm run -w @helix-agents/store-postgres test:integration

The harness creates a unique tablePrefix per test run, applies all migrations via PostgresMigrations.run(), and drops the prefixed tables on teardown. The full integration suite covers state store contract, lock manager, usage store, atomic suspension writes, migration sequencing, and concurrent CAS scenarios.

When to Use

Choose Postgres stores when:

  • Running in production with an existing Postgres operational footprint
  • You need transactional guarantees for state + messages + checkpoints
  • You want a single durable backend across runtimes (JS / Temporal / DBOS / Cloudflare)
  • You need Hyperdrive or Neon for edge deploys

Consider other stores when:

  • Developing locally (@helix-agents/store-memory)
  • Postgres infrastructure isn't available (@helix-agents/store-redis)
  • Deploying primarily to Cloudflare without origin Postgres (@helix-agents/store-cloudflare)

See Also

Released under the MIT License.