Adapters
The queue core is backend-agnostic. All storage and transport is handled by adapters that implement the QueueAdapter interface.
QueueAdapter Interface
interface QueueAdapter {
start: () => Promise<void>;
stop: () => Promise<void>;
initialize?: (ctx: { queues: readonly QueueDef[] }) => Promise<void>;
publish: (req: PublishRequest) => Promise<PublishedJob>;
subscribe: (
req: SubscribeRequest,
handler: (msg: SubscribeMessage) => Awaitable<void>,
) => Promise<{ subscriptionId: string }>;
unsubscribe: (subscriptionId: string) => Promise<void>;
}
| Method | Description |
|---|---|
start | Connect to the backend |
stop | Disconnect and clean up |
initialize | Optional. Create queues/topics on the backend |
publish | Enqueue a job |
subscribe | Register a worker handler for a queue |
unsubscribe | Remove a worker subscription |
pg-boss Adapter
pg-boss is a PostgreSQL-backed job queue. It manages its own schema and requires no external message broker.
Setup
Install the peer dependency:
pnpm add pg-boss
Usage
import { createPgBossAdapter } from "@repo/queue/adapters/pg-boss";
const adapter = createPgBossAdapter({
connectionString: process.env.DATABASE_URL,
});
Pass the adapter to createQueueRuntime:
import { createQueueRuntime } from "@repo/queue";
const runtime = createQueueRuntime({
queues: [
{ name: "low", priority: 0 },
{ name: "high", priority: 10 },
],
adapter,
workers: [
{ job: myJob, concurrency: 10, queues: ["low"] },
],
});
Options
createPgBossAdapter accepts the same options as the PgBoss constructor -- most commonly connectionString, but any valid pg-boss config is supported.
Behavior Details
- Queues are created automatically via
initialize()(callsboss.createQueue()) maxAttemptsmaps to pg-bossretryLimit(retryLimit = maxAttempts - 1)dedupeKeymaps to pg-bosssingletonKeydelaymaps to pg-bossstartAfterpriorityfromQueueDefis forwarded to pg-boss job options
Writing a Custom Adapter
Implement the QueueAdapter interface and pass it to createQueueRuntime. The core never calls backend-specific APIs directly -- all communication goes through the adapter contract.