Skip to main content

Adapters

The queue core is backend-agnostic. All storage and transport is handled by adapters that implement the QueueAdapter interface.

QueueAdapter Interface

interface QueueAdapter {
start: () => Promise<void>;
stop: () => Promise<void>;
initialize?: (ctx: { queues: readonly QueueDef[] }) => Promise<void>;
publish: (req: PublishRequest) => Promise<PublishedJob>;
subscribe: (
req: SubscribeRequest,
handler: (msg: SubscribeMessage) => Awaitable<void>,
) => Promise<{ subscriptionId: string }>;
unsubscribe: (subscriptionId: string) => Promise<void>;
}
MethodDescription
startConnect to the backend
stopDisconnect and clean up
initializeOptional. Create queues/topics on the backend
publishEnqueue a job
subscribeRegister a worker handler for a queue
unsubscribeRemove a worker subscription

pg-boss Adapter

pg-boss is a PostgreSQL-backed job queue. It manages its own schema and requires no external message broker.

Setup

Install the peer dependency:

pnpm add pg-boss

Usage

import { createPgBossAdapter } from "@repo/queue/adapters/pg-boss";

const adapter = createPgBossAdapter({
connectionString: process.env.DATABASE_URL,
});

Pass the adapter to createQueueRuntime:

import { createQueueRuntime } from "@repo/queue";

const runtime = createQueueRuntime({
queues: [
{ name: "low", priority: 0 },
{ name: "high", priority: 10 },
],
adapter,
workers: [
{ job: myJob, concurrency: 10, queues: ["low"] },
],
});

Options

createPgBossAdapter accepts the same options as the PgBoss constructor -- most commonly connectionString, but any valid pg-boss config is supported.

Behavior Details

  • Queues are created automatically via initialize() (calls boss.createQueue())
  • maxAttempts maps to pg-boss retryLimit (retryLimit = maxAttempts - 1)
  • dedupeKey maps to pg-boss singletonKey
  • delay maps to pg-boss startAfter
  • priority from QueueDef is forwarded to pg-boss job options

Writing a Custom Adapter

Implement the QueueAdapter interface and pass it to createQueueRuntime. The core never calls backend-specific APIs directly -- all communication goes through the adapter contract.