Distributed Coordination

When running agents across multiple processes or servers, you need coordination to prevent conflicts. This guide covers the tools and patterns for safe distributed execution.

The Problem

In distributed deployments, multiple processes might try to:

  • Resume the same agent simultaneously
  • Write state at the same time
  • Process the same step concurrently

Without coordination, this leads to:

  • Split-brain - Two processes think they own the agent
  • Lost updates - One process overwrites another's changes
  • Corrupted state - Partial writes from concurrent modifications

Solutions

Helix Agents provides two complementary mechanisms:

  1. Lock Managers - Prevent concurrent execution
  2. Optimistic Concurrency - Detect stale writes via version checking

Lock Managers

Lock managers provide distributed mutex semantics. A lock ensures only one process executes an agent at a time.
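
The concrete interface is defined in @helix-agents/core; the sketch below is inferred from the usage examples later in this guide and is illustrative rather than authoritative.

typescript
// Rough shape of a lock manager, inferred from the examples in this guide.
// The real type definitions in @helix-agents/core may differ in detail.
interface LockHandle {
  fencingToken: number; // monotonically increasing (see "Fencing Tokens" below)
}

type AcquireResult =
  | { acquired: true; lock: LockHandle }
  | { acquired: false; heldBy?: string; expiresAt?: Date };

interface LockManager {
  acquire(resource: string, opts: { ttlMs: number }): Promise<AcquireResult>;
  release(lock: LockHandle): Promise<void>;
  withLock<T>(
    resource: string,
    opts: { ttlMs: number; heartbeatMs?: number },
    fn: (lock: LockHandle) => Promise<T>
  ): Promise<T>;
}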

Available Implementations

Lock Manager             | Use Case       | Backend
NoOpLockManager          | Single process | None
InMemoryLockManager      | Development    | Memory
RedisLockManager         | Production     | Redis
DurableObjectLockManager | Cloudflare     | Durable Objects

NoOpLockManager

For single-process deployments where coordination isn't needed:

typescript
import { NoOpLockManager } from '@helix-agents/core';

const lockManager = new NoOpLockManager();
// All lock operations succeed immediately

InMemoryLockManager

For development and testing:

typescript
import { InMemoryLockManager } from '@helix-agents/store-memory';

const lockManager = new InMemoryLockManager();

RedisLockManager

For production distributed deployments:

typescript
import { RedisLockManager } from '@helix-agents/store-redis';

const lockManager = new RedisLockManager({
  host: 'localhost',
  port: 6379,
  keyPrefix: 'myapp:locks:',
});

DurableObjectLockManager

For Cloudflare Workers:

typescript
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';

const lockManager = new DurableObjectLockManager(env.LOCK_DO);

Using Lock Managers

Acquire and Release

Manual lock management:

typescript
const resource = `session:${sessionId}`;

const result = await lockManager.acquire(resource, {
  ttlMs: 30000, // 30 second lease
});

if (result.acquired) {
  try {
    // Do work with the lock held
    await executeAgent();
  } finally {
    await lockManager.release(result.lock);
  }
} else {
  console.log(`Lock held by ${result.heldBy}`);
}

withLock Helper

Automatic acquisition, heartbeat, and release:

typescript
await lockManager.withLock(`session:${sessionId}`, { ttlMs: 30000, heartbeatMs: 10000 }, async (lock) => {
  // Lock is automatically refreshed every 10 seconds
  await executeAgent();
  // Lock is released when function completes
});

Fencing Tokens

Locks include monotonically increasing fencing tokens to prevent split-brain writes:

typescript
const result = await lockManager.acquire(resource, { ttlMs: 30000 });

if (result.acquired) {
  const { fencingToken } = result.lock;
  // Pass token to state store for validation
  await stateStore.save(state, { fencingToken });
}
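
What makes the token useful is the check on the storage side: a write carrying a token lower than the highest one already seen is rejected, so a process whose lock expired cannot clobber a newer holder's writes. A minimal sketch of that validation (illustrative only, not the actual state store implementation):

typescript
// Minimal sketch of fencing-token validation on the storage side.
// Illustrative only; the Helix Agents state stores handle this internally.
class FencedWriter {
  private highestToken = 0;

  write(data: unknown, fencingToken: number): void {
    if (fencingToken < this.highestToken) {
      // A newer lock holder has already written; this writer's lease is stale.
      throw new Error(`Stale fencing token ${fencingToken} < ${this.highestToken}`);
    }
    this.highestToken = fencingToken;
    // ... persist data ...
  }
}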

Optimistic Concurrency Control

Even with locks, an expired lease or a network partition can leave two processes believing they own the same run. Version checking provides a second layer of protection.

How It Works

  1. State includes a version field
  2. On save, the store checks if version matches
  3. If mismatched, save fails with StaleStateError

typescript
// Process A loads state at version 5
const state = await stateStore.loadState(sessionId);
console.log(state.version); // 5

// Process B loads same state, modifies, saves -> version 6

// Process A tries to save its changes
try {
  await stateStore.save(state); // Expected version 5, but store has 6
} catch (error) {
  if (error instanceof StaleStateError) {
    // Our changes are stale - another process modified state
    console.log('State was modified by another process');
  }
}

Handling StaleStateError

When you detect a stale state, stop gracefully:

typescript
import { StaleStateError, ExecutorSupersededError } from '@helix-agents/core';

try {
  await executeStep();
} catch (error) {
  if (error instanceof StaleStateError) {
    // Another executor has this run - stop gracefully
    throw new ExecutorSupersededError(sessionId);
  }
  throw error;
}

Error Types

StaleStateError

Thrown when a save fails due to version mismatch:

typescript
interface StaleStateError {
  runId: string;
  expectedVersion: number;
  actualVersion: number;
}

ExecutorSupersededError

Thrown when an executor should stop because another took over:

typescript
interface ExecutorSupersededError {
  runId: string;
}

This is a graceful shutdown signal, not a failure. The other executor will continue the work.

FencingTokenMismatchError

Thrown when a fencing token doesn't match:

typescript
interface FencingTokenMismatchError {
  runId: string;
  expectedToken: number;
}

Stream Events

executor_superseded

Emitted when an executor is superseded:

typescript
for await (const chunk of stream) {
  if (chunk.type === 'executor_superseded') {
    console.log('Another executor took over');
    break;
  }
}

Compare-And-Set Status

For atomic status transitions, use compareAndSetStatus:

typescript
// Only update status if currently 'interrupted'
const success = await stateStore.compareAndSetStatus(
  sessionId,
  ['interrupted'], // expected statuses (array)
  'running'        // new status
);

if (!success) {
  // Status was changed by another process
  console.log('Status transition failed - concurrent modification');
}

This prevents race conditions when multiple processes try to resume.
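
For example, a resume path can use the CAS as its admission check before doing any work. A sketch, where resumeAgent is a placeholder for your own resume logic:

typescript
// Sketch: claim the run via CAS before resuming it.
// resumeAgent() is a placeholder for your own resume logic.
async function tryResume(sessionId: string): Promise<void> {
  const claimed = await stateStore.compareAndSetStatus(
    sessionId,
    ['interrupted'], // only resume runs that are actually interrupted
    'running'
  );

  if (!claimed) {
    // Another process already claimed this run; stop here.
    return;
  }

  await resumeAgent(sessionId);
}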

Version Increment on CAS

When compareAndSetStatus() succeeds, it atomically increments the state version along with the status change. This is critical for closing a subtle race condition in optimistic concurrency control.

The Race Condition Problem

Without version increment on CAS, a race condition exists:

typescript
// Timeline without version increment (BUG):
// 1. Process A loads state: version=5, status='interrupted'
// 2. Process B loads state: version=5, status='interrupted'
// 3. Process B: CAS succeeds, status='running' (version stays 5!)
// 4. Process A: CAS fails (status mismatch - correct)
// 5. Process B crashes without saving
// 6. Process A retries, loads state: version=5, status='running'
// 7. Process A does CAS: 'running' -> 'running' (succeeds, version stays 5)
// 8. Process A saves with version=5 -> SUCCEEDS (but shouldn't!)

The problem: If CAS doesn't increment version, a process that loaded state before another's CAS can still save successfully because versions match.

The Solution

CAS atomically increments version on success:

typescript
// Timeline with version increment (CORRECT):
// 1. Process A loads state: version=5, status='interrupted'
// 2. Process B loads state: version=5, status='interrupted'
// 3. Process B: CAS succeeds, status='running', version=6
// 4. Process A: CAS fails (status mismatch - correct)
// 5. Later, if Process A somehow tries to save with version=5:
//    -> StaleStateError (store has version=6)

Example

typescript
// Initial state: version = 5, status = 'interrupted'

// Process A: CAS succeeds
const success = await stateStore.compareAndSetStatus(sessionId, ['interrupted'], 'running');
// success = true, version is now 6

// Process B: CAS fails (status is now 'running')
const success2 = await stateStore.compareAndSetStatus(sessionId, ['interrupted'], 'running');
// success2 = false (status mismatch)

// After successful CAS, sync local state version to match store:
if (success) {
  state.status = 'running';
  state.version = (state.version ?? 0) + 1; // Now 6, matches store
}

// Later saves will use the correct version
await stateStore.save(state); // Works because version=6 matches store

Implementation Across Stores

All state stores implement atomic CAS with version increment:

Store              | Implementation
InMemoryStateStore | Direct increment: state.version = (state.version ?? 0) + 1
RedisStateStore    | Lua script with HINCRBY key 'version' 1 for atomicity
D1StateStore       | SQL UPDATE ... SET version = version + 1
DOStateStore       | SQL transaction with version increment

The Lua script in Redis ensures the check-and-increment is atomic, preventing any race window.
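
As an illustration of the technique (not the library's actual script), an atomic status check-and-set with version increment might look like the following, assuming an ioredis client, state stored as a Redis hash, and a single expected status for brevity:

typescript
import Redis from 'ioredis';

const redis = new Redis();

// Illustrative Lua script: check the status, then update it and bump the
// version in one atomic step. Not the library's actual script.
const casScript = `
local current = redis.call('HGET', KEYS[1], 'status')
if current == ARGV[1] then
  redis.call('HSET', KEYS[1], 'status', ARGV[2])
  return redis.call('HINCRBY', KEYS[1], 'version', 1)
end
return -1
`;

async function casStatus(key: string, expected: string, next: string): Promise<boolean> {
  const newVersion = (await redis.eval(casScript, 1, key, expected, next)) as number;
  return newVersion >= 0; // -1 means the status check failed
}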

Runtime-Specific Coordination

JavaScript Runtime

The JS runtime relies on explicit lock managers:

typescript
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
  lockManager: new RedisLockManager(redis),
});

Temporal Runtime

Temporal provides coordination via workflow ID uniqueness:

  • Each run has a unique workflow ID
  • Temporal ensures only one workflow with that ID runs at a time
  • Resume creates a new workflow that continues from checkpoint

typescript
// Temporal handles coordination automatically
const executor = new TemporalAgentExecutor(connection, {
  taskQueue: 'agents',
});

Cloudflare Runtime

Cloudflare Workflows use instance ID uniqueness:

  • Each run has a unique instance ID
  • Cloudflare ensures instance uniqueness within a Workflow
  • Durable Objects provide additional locking if needed

typescript
const executor = new CloudflareAgentExecutor(ctx, env, {
  lockDO: env.LOCK_DO, // Optional Durable Object for extra coordination
});

Best Practices

1. Use Appropriate Lock Manager

Deployment                 | Recommended Lock Manager
Single process             | NoOpLockManager
Multiple Node.js processes | RedisLockManager
Cloudflare Workers         | DurableObjectLockManager
Temporal                   | Built-in workflow uniqueness

2. Handle Supersession Gracefully

When superseded, stop cleanly without failing:

typescript
try {
  await runAgentLoop();
} catch (error) {
  if (error instanceof ExecutorSupersededError) {
    // Not a failure - another executor continues
    console.log('Superseded, stopping gracefully');
    return;
  }
  throw error;
}

3. Set Appropriate Lock TTLs

  • Too short - The lock expires during long operations
  • Too long - A long wait if the holding process crashes

typescript
// Rule of thumb: 2-3x expected step duration
const lockManager = new RedisLockManager(redis);
await lockManager.withLock(
  resource,
  {
    ttlMs: 60000, // 1 minute lease
    heartbeatMs: 20000, // Refresh every 20 seconds
  },
  fn
);

4. Monitor Lock Contention

Track lock acquisition failures:

typescript
const result = await lockManager.acquire(resource, options);
if (!result.acquired) {
  metrics.increment('lock_contention');
  console.log(`Lock held by ${result.heldBy}, expires at ${result.expiresAt}`);
}
