Skip to main content

Runtime

The runtime ties together queue definitions, an adapter, and worker bindings into a single object that manages the full lifecycle.

createQueueRuntime

import { createQueueRuntime } from "@repo/queue";
import { createPgBossAdapter } from "@repo/queue/adapters/pg-boss";
import { sendEmailsJob } from "./jobs/sendEmails.job";

export const queueRuntime = createQueueRuntime({
queues: [
{ name: "low", priority: 0 },
{ name: "high", priority: 10 },
],
adapter: createPgBossAdapter({
connectionString: process.env.DATABASE_URL,
}),
workers: [
{
job: sendEmailsJob,
concurrency: 10,
queues: ["low"],
},
],
});

Config

FieldTypeDescription
queuesQueueDef[]Named queues with optional priority
adapterQueueAdapterBackend adapter (e.g. pg-boss)
workersWorkerBinding[]Maps jobs to queues with concurrency
hooksJobHooksOptional global hooks (run in addition to per-job hooks)

QueueDef

interface QueueDef {
name: string;
priority?: number;
}

WorkerBinding

interface WorkerBinding {
job: JobDef;
concurrency?: number; // defaults to 4
queues: string | string[];
}

A single job can be bound to multiple queues. Concurrency is summed per queue across all bindings.

Runtime API

The returned QueueRuntime object exposes:

startWorkers()

await queueRuntime.startWorkers();

Starts the adapter, initializes queues, and subscribes workers. Call once at server startup. In SvelteKit:

// hooks.server.ts
queueRuntime.startWorkers();

Automatically skipped during SvelteKit build (building === true).

stopWorkers()

await queueRuntime.stopWorkers();

Unsubscribes all workers and stops the adapter. Use for graceful shutdown.

createClient()

Returns a QueueClient for dispatching jobs. See Dispatching.

const client = queueRuntime.createClient();

Global Hooks

Hooks defined on the runtime config run in addition to per-job hooks, in order: job hooks first, then global hooks.

const queueRuntime = createQueueRuntime({
queues: [{ name: "default" }],
adapter: createPgBossAdapter({ connectionString: "..." }),
workers: [{ job: myJob, queues: ["default"] }],
hooks: {
onError: (err, payload, ctx) => {
// runs on every failure across all jobs
},
onSuccess: (result, payload, ctx) => {
// runs on every success across all jobs
},
},
});