---
title: 'Queues'
slug: queues
description: 'Run background jobs in-process with Mochi.queue(), backed by bunqueue embedded mode.'
---

## Queues

Offload work that shouldn't block a response (sending email, encoding media, calling slow third-party APIs) to a background **queue**. A queue bundles a job channel with the `process` function that consumes it; both run in your process, backed by [bunqueue](https://bunqueue.dev/)'s embedded mode.

`Mochi.queue()`, like `Mochi.page` / `api` / `ws` / `sse`, returns an **inert config** that you mount in `Mochi.serve({ queues })`, keyed by name, so every background queue the server runs is declared in one place. Produce jobs from anywhere with `Mochi.getQueue(name).add(...)`.

```ts
import { Mochi } from 'mochi-framework';

await Mochi.serve({
  routes: { /* … */ },
  queues: {
    // the map key is the queue name
    emails: Mochi.queue<{ to: string }>({
      concurrency: 10,
      process: async (job) => {
        await sendEmail(job.data.to);
        return { sent: true };
      },
    }),
  },
});

// from a page action, an API route, anywhere:
await Mochi.getQueue<{ to: string }>('emails').add('send', { to: 'alice@example.com' });
```

### `Mochi.queue()`

```ts
const queueConfig = Mochi.queue({ process, ...options });
```

`Mochi.queue()` returns an inert config; mount it under the queue name in `Mochi.serve({ queues })`.
The required `process` function receives a read-only `MochiJob` and returns the job result. In the table below, `T` is the queue's payload type:

| Field        | Type     | Notes                                   |
| ------------ | -------- | --------------------------------------- |
| `id`         | `string` | job id                                  |
| `name`       | `string` | job name passed to `add()`              |
| `data`       | `T`      | the enqueued payload                    |
| `queue`      | `string` | queue name                              |
| `attempt`    | `number` | 1-based attempt number (1 on first run) |
| `enqueuedAt` | `number` | epoch ms when enqueued                  |

Options: `concurrency` (jobs processed at once), `dataPath` (see [Persistence](#persistence)), and `on` for lifecycle listeners:

```ts
Mochi.queue({
  concurrency: 10,
  process,
  on: {
    completed: (job, result) => log.info(`${job.name} done`),
    failed: (job, error) => log.warn(`${job.name} failed: ${error.message}`),
  },
});
```

Or subscribe globally on the [`mochiEvents` bus](#observability) (filter by `queue` name), which is handy when the listener lives far from the queue declaration.

### `Mochi.getQueue()`

`Mochi.getQueue(name)` resolves the producer handle for a mounted queue. Pass the payload type explicitly as a type argument, as in `Mochi.getQueue<{ to: string }>('emails')`. It throws if the name was never declared in `Mochi.serve({ queues })`, or if it is called before `Mochi.serve()` has mounted its queues.

| Method                   | Returns                  | Notes                    |
| ------------------------ | ------------------------ | ------------------------ |
| `add(name, data, opts?)` | `Promise<MochiJobRef>`   | enqueue one job          |
| `addBulk(jobs)`          | `Promise<MochiJobRef[]>` | enqueue many in one call |

`MochiJobRef` is `{ id, name }`. Per-job options: `priority`, `delay` (ms), `attempts`, `jobId`.

```ts
const emails = Mochi.getQueue<{ to: string }>('emails');

await emails.add('send', { to: 'bob@example.com' }, { priority: 10, delay: 5000 });

await emails.addBulk([
  { name: 'send', data: { to: 'a@x.com' } },
  { name: 'send', data: { to: 'b@x.com' }, opts: { priority: 10 } },
]);
```

### A shared queue module

The common pattern is a shared module that exports the queue config (so your entry can mount it) while route code produces jobs by name.

```ts
// jobs.server.ts
import { Mochi } from 'mochi-framework';

export const emailQueue = Mochi.queue<{ to: string }>({
  process: async (job) => {
    await sendEmail(job.data.to);
    return { sent: true };
  },
});
```

```ts
// routes.ts
import { Mochi, success } from 'mochi-framework';

export const routes = {
  '/signup': Mochi.page('./Signup.svelte', {
    actions: {
      register: async ({ request }) => {
        const data = await request.formData();
        await Mochi.getQueue<{ to: string }>('emails').add('send', { to: String(data.get('email')) });
        return success({ ok: true });
      },
    },
  }),
};
```

```ts
// index.ts
import { Mochi } from 'mochi-framework';
import { routes } from './routes';
import { emailQueue } from './jobs.server';

await Mochi.serve({
  routes,
  queues: { emails: emailQueue },
});
```

### Persistence

By default the queue is **in-memory**, so jobs do not survive a restart. Pass `dataPath` to persist to SQLite:

```ts
Mochi.queue({ process, dataPath: '.mochi/queue.sqlite' });
```

bunqueue locks the embedded store to the **first** `dataPath` used in the process. Use one `dataPath` across all your queues; conflicting paths are ignored (Mochi logs a warning).

### Advanced options

Mochi wraps a small, stable core. For bunqueue features Mochi doesn't surface first-class (retry backoff, rate limiting, cron/repeat, dead-letter queue, deduplication), pass a `bunqueue` object that is forwarded verbatim to the underlying queue and worker:

```ts
const apiCalls = Mochi.queue({
  process,
  defaultJobOptions: { attempts: 5 },
  bunqueue: { limiter: { max: 100, duration: 1000 } },
});

await Mochi.getQueue('api-calls').add('call', data, {
  attempts: 5,
  bunqueue: { backoff: { type: 'jitter', delay: 1000 } },
});
```

See the [bunqueue docs](https://bunqueue.dev/guide/simple-mode/) for the full option set.
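
To get a feel for what a setting like `attempts: 5` with a 1000 ms backoff delay can mean in wall-clock time, here is a minimal sketch of one common retry policy, exponential backoff with full jitter. It is a generic illustration, not bunqueue's implementation: the function names (`retryDelayMs`, `worstCaseTotalDelayMs`), the doubling rule, and the `maxDelayMs` cap are all assumptions made for this example, and bunqueue's `jitter` backoff may compute delays differently, so check its docs for the real formula.

```typescript
// Hypothetical helpers for estimating retry timing.
// NOT bunqueue's algorithm: a generic "exponential backoff with full jitter" sketch.

type RandomFn = () => number; // must return a value in [0, 1)

/** Delay to wait after the given failed attempt (1-based). */
function retryDelayMs(
  attempt: number,
  baseDelayMs: number,
  maxDelayMs: number,
  random: RandomFn = Math.random,
): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be an integer >= 1');
  }
  // Assumed rule: the ceiling doubles after each failure (base, 2x, 4x, ...), capped.
  const ceiling = Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1));
  // Full jitter: pick uniformly below the ceiling so retries don't all fire together.
  return Math.floor(random() * ceiling);
}

/** Upper bound on the total time spent waiting between `attempts` tries. */
function worstCaseTotalDelayMs(
  attempts: number,
  baseDelayMs: number,
  maxDelayMs: number,
): number {
  let total = 0;
  // `attempts` tries means at most `attempts - 1` waits between them.
  for (let failed = 1; failed < attempts; failed++) {
    total += Math.min(maxDelayMs, baseDelayMs * 2 ** (failed - 1));
  }
  return total;
}
```

Under these assumptions, five attempts with a 1000 ms base delay wait at most 1000 + 2000 + 4000 + 8000 = 15000 ms in total between tries, which is a rough way to size timeouts for anything that depends on the job finishing.
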
### Observability

Queues emit [events](/docs/events/) on the `mochiEvents` bus (`queue:added`, `queue:active`, `queue:completed`, `queue:failed`, `queue:error`), and the built-in [console logger](/docs/logging/) prints a `QUEUE` line per job. Wire your own metrics directly:

```ts
import { mochiEvents } from 'mochi-framework';

mochiEvents.on('queue:completed', ({ queue, jobName, duration }) => {
  metrics.timing('queue.job', duration, { queue, job: jobName });
});
```

### Dev mode & hot reload

A queue is instantiated **once**, by `Mochi.serve({ queues })`. `Mochi.queue()` itself is just inert config, so the dev route hot-reload watcher re-running your modules can't spawn a duplicate consumer. The trade-off: **changes to a queue's `process` function or options don't hot-reload**; restart the dev server to apply them.

Because the queue module is imported once, you can keep ordinary in-memory state (a results buffer, a counter) in module scope without it being duplicated. A long-running resource the `process` function _opens_ (a DB pool, a client connection) is your own singleton. If it must survive a dev module re-run, pin it to `globalThis` the way the framework pins its own internals.

### Shutdown

Queues close gracefully when `Mochi.serve()` receives `SIGTERM`/`SIGINT`: in-flight jobs drain before the process exits.

A **queue-only process** (no page/API routes) is just `Mochi.serve({ queues })` with no `routes`:

```ts
// worker.ts: run with `bun worker.ts`
import { Mochi } from 'mochi-framework';

await Mochi.serve({
  queues: {
    // type the payload so `job.data.to` is known to be a string
    emails: Mochi.queue<{ to: string }>({
      process: async (job) => {
        await sendEmail(job.data.to);
      },
    }),
  },
});
```

**Embedded mode only.** Producer and consumer share one process. bunqueue also supports a TCP server mode for distributed workers; Mochi doesn't expose that yet.

### Dependencies

Mochi uses bunqueue as the underlying implementation for queues.
bunqueue pulls in `msgpackr`, whose optional native accelerator (`msgpackr-extract`) would otherwise drag platform-specific prebuilt binaries into your install. Mochi therefore swaps it out via a `package.json` `overrides` entry pointing at [`@mochi-framework/msgpackr-extract-stub`](https://www.npmjs.com/package/@mochi-framework/msgpackr-extract-stub), an empty stub that keeps `msgpackr` on its pure-JS codec so no native binaries are installed. New projects scaffolded with `create-mochi` ship this override by default. To opt out and use the native bindings, delete the `overrides` entry from your `package.json`.