Runtime
The runtime ties together queue definitions, an adapter, and worker bindings into a single object that manages the full lifecycle.
createQueueRuntime
import { createQueueRuntime } from "@repo/queue";
import { createPgBossAdapter } from "@repo/queue/adapters/pg-boss";
import { sendEmailsJob } from "./jobs/sendEmails.job";
export const queueRuntime = createQueueRuntime({
queues: [
{ name: "low", priority: 0 },
{ name: "high", priority: 10 },
],
adapter: createPgBossAdapter({
connectionString: process.env.DATABASE_URL,
}),
workers: [
{
job: sendEmailsJob,
concurrency: 10,
queues: ["low"],
},
],
});
Config
| Field | Type | Description |
|---|---|---|
queues | QueueDef[] | Named queues with optional priority |
adapter | QueueAdapter | Backend adapter (e.g. pg-boss) |
workers | WorkerBinding[] | Maps jobs to queues with concurrency |
hooks | JobHooks | Optional global hooks (run in addition to per-job hooks) |
QueueDef
interface QueueDef {
name: string;
priority?: number;
}
WorkerBinding
interface WorkerBinding {
job: JobDef;
concurrency?: number; // defaults to 4
queues: string | string[];
}
A single job can be bound to multiple queues. Concurrency is summed per queue across all bindings.
Runtime API
The returned QueueRuntime object exposes:
startWorkers()
await queueRuntime.startWorkers();
Starts the adapter, initializes queues, and subscribes workers. Call once at server startup. In SvelteKit:
// hooks.server.ts
queueRuntime.startWorkers();
Automatically skipped during SvelteKit build (building === true).
stopWorkers()
await queueRuntime.stopWorkers();
Unsubscribes all workers and stops the adapter. Use for graceful shutdown.
createClient()
Returns a QueueClient for dispatching jobs. See Dispatching.
const client = queueRuntime.createClient();
Global Hooks
Hooks defined on the runtime config run in addition to per-job hooks, in order: job hooks first, then global hooks.
const queueRuntime = createQueueRuntime({
queues: [{ name: "default" }],
adapter: createPgBossAdapter({ connectionString: "..." }),
workers: [{ job: myJob, queues: ["default"] }],
hooks: {
onError: (err, payload, ctx) => {
// runs on every failure across all jobs
},
onSuccess: (result, payload, ctx) => {
// runs on every success across all jobs
},
},
});