Skip to content
NestJS ns tasks 4 min read

Queue Processors & Concurrency

Adding a job to a BullMQ queue is only half the story — something has to pull jobs off the queue and run them. In NestJS that something is a processor: a provider decorated with @Processor whose @Process methods consume jobs. Processors are where the real work happens (sending email, transcoding video, generating PDFs), and they are also where you control how many jobs run at once, how fast they drain, and how progress flows back to clients. Getting these knobs right is the difference between a snappy background system and one that hammers your database into the ground.

Defining a processor

A processor is a class registered against a named queue. Decorate it with @Processor('queue-name') and mark consumer methods with @Process(). Each handler receives the BullMQ Job object, which carries the payload, metadata, and helpers for reporting progress.

// src/email/email.processor.ts
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import { MailerService } from './mailer.service';

interface SendEmailJob {
  to: string;
  template: string;
  context: Record<string, unknown>;
}

@Processor('email')
export class EmailProcessor {
  private readonly logger = new Logger(EmailProcessor.name);

  constructor(private readonly mailer: MailerService) {}

  @Process('send')
  async handleSend(job: Job<SendEmailJob>): Promise<{ messageId: string }> {
    const { to, template, context } = job.data;
    await job.progress(10);

    const html = await this.mailer.render(template, context);
    await job.progress(60);

    const messageId = await this.mailer.deliver(to, html);
    await job.progress(100);

    return { messageId };
  }

  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(`Processing job ${job.id} of type ${job.name}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job, result: unknown) {
    this.logger.log(`Job ${job.id} completed: ${JSON.stringify(result)}`);
  }

  @OnQueueFailed()
  onFailed(job: Job, err: Error) {
    this.logger.error(`Job ${job.id} failed: ${err.message}`);
  }
}

Register the processor like any provider in the module that owns the queue.

// src/email/email.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { EmailProcessor } from './email.processor';
import { MailerService } from './mailer.service';

@Module({
  imports: [BullModule.registerQueue({ name: 'email' })],
  providers: [EmailProcessor, MailerService],
})
export class EmailModule {}

Output:

[Nest] LOG [EmailProcessor] Processing job 42 of type send
[Nest] LOG [EmailProcessor] Job 42 completed: {"messageId":"<a1b2@smtp>"}

The return value of a @Process handler is stored as the job’s returnvalue and forwarded to @OnQueueCompleted. Throwing inside the handler marks the job failed and triggers any configured retry/backoff. Never swallow errors you want BullMQ to retry.

Tuning concurrency

By default a processor runs one job at a time per worker. Pass a concurrency number to @Process to let a single worker process run several jobs in parallel. Because Node.js is single-threaded, concurrency only helps when your handler is I/O-bound (network, DB, disk) — it overlaps waits rather than running CPU work truly in parallel.

@Process({ name: 'send', concurrency: 5 })
async handleSend(job: Job<SendEmailJob>) {
  // up to 5 'send' jobs run concurrently in this process
}
ApproachHowBest for
Handler concurrency@Process({ concurrency: 5 })I/O-bound work in one process
Multiple workersRun more app instances / replicasCPU-bound work, horizontal scale
Separate processor appDedicated worker deploymentIsolating heavy jobs from the API

For CPU-heavy work, prefer scaling out: run several instances of your worker, or use BullMQ sandboxed processors (a separate file run in a child process) so a blocking job never stalls your HTTP event loop.

Rate limiting

Concurrency caps parallelism; rate limiting caps throughput over time — essential when a downstream API allows, say, 100 calls per minute. Configure it on the queue registration via limiter.

BullModule.registerQueue({
  name: 'email',
  limiter: {
    max: 100,      // at most 100 jobs
    duration: 60_000, // per 60 seconds
  },
});

When the limit is hit, BullMQ pauses pulling new jobs until the window resets — jobs already in flight finish normally. Combine a sensible concurrency with limiter to stay both responsive and within quota.

Reporting progress

Long-running jobs should report progress so UIs and dashboards stay informative. Call job.progress(value) with a number or an object; listeners receive updates via @OnQueueProgress.

@Process('transcode')
async transcode(job: Job<{ fileId: string }>) {
  for (let pct = 0; pct <= 100; pct += 25) {
    await this.doChunk(job.data.fileId, pct);
    await job.progress({ percent: pct, stage: 'encoding' });
  }
}

@OnQueueProgress()
onProgress(job: Job, progress: number | object) {
  this.logger.debug(`Job ${job.id} progress: ${JSON.stringify(progress)}`);
}

You can push these events to clients over a WebSocket gateway or Server-Sent Events for live progress bars.

Best Practices

  • Keep payloads small — store an ID and re-fetch large data inside the handler rather than serializing blobs into Redis.
  • Set concurrency only for I/O-bound work; for CPU-bound jobs scale workers or use sandboxed processors instead.
  • Always pair limiter with downstream API quotas so you never trip rate-limit bans under load.
  • Make handlers idempotent — a job may run more than once after a crash or retry.
  • Report progress on anything that runs longer than a couple of seconds so operators and users have visibility.
  • Use the @OnQueueFailed / @OnQueueCompleted listeners for observability, but keep business retries driven by BullMQ’s attempts and backoff options.
  • Run dedicated worker instances in production so a slow job never degrades API latency.
Last updated June 14, 2026
Was this helpful?