Published on December 22, 2023

Scheduled and Background Jobs with pg-boss in TypeScript: A Deep Dive

If you know me, you know that I have a thing for background jobs, queues, schedulers, data pipelines, and the intricacies of distributed systems that keep modern web applications running. There is something almost magical about setting up a system that efficiently manages tasks and jobs, and it is a very satisfying feeling when you see it handling real-world traffic and data in production.

Well, if that sounds like you, I have a treat for you today. I recently came across a Node.js library called pg-boss, and I couldn't stop myself from diving deep into it and playing around with it. So, here we are, and I'm going to share some fun stuff with you today.

What is pg-boss?

pg-boss is a job queue library for Node.js applications that uses PostgreSQL as its storage engine for persistence. It is simple and straightforward to use, and it has a lot of features that make it a great choice when you need background jobs, especially jobs that run on a schedule or at a specific time.

Why use PostgreSQL for a job queue?

The better question is, why not? PostgreSQL is not just a workhorse in the world of relational databases; it's also surprisingly adept at handling queue-like workloads thanks to its robust locking mechanisms (pg-boss relies on SKIP LOCKED), transaction support, and the LISTEN/NOTIFY system. Chances are, you're already using PostgreSQL in your application, so it makes sense to use it for your job queue until you have a good reason not to, which doesn't happen often.

Many companies and startups, such as hey.com, run a database-backed job queue like this in production, handling millions of jobs per day. If it's good enough for them, it's probably good enough for you too.

So, with that out of the way, let's get started. First, I'm going to show you how to do some basic stuff with pg-boss, and then we'll dive into implementing a wrapper around it that will make it easier to use in TypeScript applications.

Getting Started with pg-boss

Before we dive into the code, make sure you have PostgreSQL installed and running. You'll also need to create a database for pg-boss to use. With that set up, let's get pg-boss installed:

npm install pg-boss

Now, initialize pg-boss in your TypeScript application:

import PgBoss from "pg-boss";

const boss = new PgBoss("postgres://username:password@localhost:5432/pgboss");

await boss.start();

Here, we're creating a new instance of pg-boss and passing our PostgreSQL connection string. The start method initializes pg-boss, creating the necessary tables and database objects it needs to manage your jobs.

Sending Your First Job into the Queue

With pg-boss up and running, let's send a job into the queue. We'll use the send method to do this:

await boss.send("welcome-email", { email: "john@doe.com" });

In this example, we're queuing up a job called welcome-email and passing it an object with an email address. This is the data that will be passed to the job handler when it's time to process the job.

Processing Jobs with Handlers

Queuing jobs is only half the battle; we also need to process them. In pg-boss, this is done by creating workers that listen for specific job names:

await boss.work("welcome-email", async (job) => {
  console.log(`[welcome-email] Sending email to ${job.data.email}`);
});

This worker listens for jobs with the name welcome-email. When a job with that name is queued, the worker will process it by calling the handler function and passing in the job data. In this example, we're logging the email address to the console.

Scheduling Jobs with Cron

Let's say we want to send a daily summary email to all our users at 8 AM every day. pg-boss makes this easy with cron scheduling:

await boss.schedule("send-daily-summary", "0 8 * * *", { summary: "daily" });

Here, we're scheduling a job called send-daily-summary to run every day at 8 AM. We're also passing it an object with a summary property that contains the type of summary we want to send. This data will be passed to the job handler when the job is processed.
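
If cron syntax is new to you, the five fields in "0 8 * * *" read, from left to right, as minute, hour, day of month, month, and day of week. The helper below is a purely illustrative sketch that labels each field. Note that describeCron and CronFields are hypothetical names of my own, not part of pg-boss, and the helper does no validation beyond counting fields.

```typescript
// Hypothetical helper, not part of pg-boss: labels the five fields of a
// standard cron expression so the schedule is easier to read.
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

function describeCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// "0 8 * * *" means minute 0 of hour 8, on every day of every month.
console.log(describeCron("0 8 * * *"));
```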

Delaying Jobs with a Timestamp

Sometimes, you want to schedule a job to run at a specific time in the future. For example, you might want to send a reminder email to a user 24 hours after they sign up. pg-boss makes this easy with timestamps:

const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);

await boss.sendAfter("send-reminder", { email: "john@doe.com" }, {}, tomorrow);

Here, we're using sendAfter() rather than schedule(), since schedule() expects a cron expression and is meant for recurring jobs. The job called send-reminder is queued now but won't be picked up until tomorrow. We're also passing it an object with an email address. This data will be passed to the job handler when the job is processed.

Alternatively, we can pass the number of seconds to delay the job as the last argument:

await boss.sendAfter("send-reminder", { email: "john@doe.com" }, {}, 86400);

In this example, the send-reminder job becomes available for processing 24 hours from now.
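
Since the delay can be given either as a point in time or as a number of seconds, it can help to keep the conversions in one place. Here is a small sketch of how I might do that. The helper names (hoursToSeconds, dateAfterHours) are my own and not part of pg-boss, and the sketch assumes a numeric delay means seconds, as in the 86400 example above.

```typescript
// Hypothetical helpers (not part of pg-boss) for building delay values.
// Assumption: a numeric delay is expressed in seconds, and a Date is an
// absolute point in time.
const SECONDS_PER_HOUR = 60 * 60;

function hoursToSeconds(hours: number): number {
  return Math.round(hours * SECONDS_PER_HOUR);
}

function dateAfterHours(hours: number, from: Date = new Date()): Date {
  // Adding milliseconds gives an exact elapsed duration, whereas setDate()
  // moves by calendar day in local time.
  return new Date(from.getTime() + hoursToSeconds(hours) * 1000);
}

const signupTime = new Date("2023-12-22T08:00:00.000Z");
console.log(hoursToSeconds(24)); // 86400
console.log(dateAfterHours(24, signupTime).toISOString()); // 2023-12-23T08:00:00.000Z
```

The difference between the two approaches only matters around daylight saving changes, where "tomorrow at the same local time" and "exactly 24 hours from now" are not the same moment.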

Configuring Job Options

When creating a job using the send() function or related methods, there are various options you can set to customize the behavior of the job. Here's a breakdown of a few of the most common options:

General Options

  • priority: An integer value that specifies the priority of the job. Higher numbers indicate higher priority.

Retry Options

  • retryLimit: The maximum number of times a failed job will be retried.
  • retryDelay: The delay, in seconds, between retries of failed jobs.
  • retryBackoff: A boolean value that, when set to true, enables exponential backoff retries based on the retryDelay.

Expiration Options

  • expireInSeconds: The number of seconds a job can remain in the active state before it is considered expired and failed.
  • expireInMinutes: The number of minutes a job can remain in the active state before it is considered expired and failed.
  • expireInHours: The number of hours a job can remain in the active state before it is considered expired and failed.

Retention Options

  • retentionSeconds: The number of seconds a job will be retained in the created or retry state before it's archived.
  • retentionMinutes: The number of minutes a job will be retained in the created or retry state before it's archived.
  • retentionHours: The number of hours a job will be retained in the created or retry state before it's archived.
  • retentionDays: The number of days a job will be retained in the created or retry state before it's archived.

Deferred Jobs

  • startAfter: Specifies when the job should start. It can be an integer (seconds to delay), a string (UTC Date time in ISO 8601 format), or a Date object.

Unique Jobs

  • singletonKey: A unique key that ensures only one job with the same name and key can be queued or active.
  • useSingletonQueue: When used with singletonKey, ensures only one job can be queued.

Throttled Jobs

  • singletonSeconds: Specifies the interval in seconds to throttle job creation.
  • singletonMinutes: Specifies the interval in minutes to throttle job creation.
  • singletonHours: Specifies the interval in hours to throttle job creation.
  • singletonNextSlot: When true, schedules the job to run after the current time slot if throttled.

These options allow you to control how jobs are processed, retried, and managed within pg-boss. You can set defaults for these options in the constructor or override them per job when calling send(). We will circle back to these options later when we implement our wrapper around pg-boss.
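
To make the "defaults plus per-job overrides" idea concrete, here is a minimal sketch. SendOptionsSketch is a local, partial stand-in I wrote for illustration using only option names from the list above; it is not the library's own type, and the default values are arbitrary assumptions.

```typescript
// Local, partial stand-in for pg-boss's send options, limited to option
// names listed in this post. It is an illustrative type, not the library's.
type SendOptionsSketch = {
  priority?: number;
  retryLimit?: number;
  retryDelay?: number; // seconds
  retryBackoff?: boolean;
  expireInMinutes?: number;
  singletonKey?: string;
};

// Hypothetical application-level defaults.
const defaultSendOptions: SendOptionsSketch = {
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInMinutes: 15,
};

// Per-job overrides win over defaults; untouched defaults are kept.
function withOverrides(overrides: SendOptionsSketch): SendOptionsSketch {
  return { ...defaultSendOptions, ...overrides };
}

const urgentOptions = withOverrides({ priority: 10, retryLimit: 5 });
console.log(urgentOptions);
```

An object like urgentOptions could then be passed as the third argument to send(), in the same position as the options object in the earlier examples.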

TypeScript and pg-boss: A Perfect Pair

Now, with the basics out of the way, let's get to the fun part. I'm going to build a tiny wrapper around pg-boss that will make it easier for us to handle jobs in our TypeScript applications.

The plan is to define a Job interface that will encapsulate each job so we can have a unified way of defining and handling jobs. In addition, we are going to create a JobManager class that will handle all of our jobs through a single and type-safe interface.

Defining the Job Interface

Let's start by defining our Job interface:

import pgBoss from "pg-boss";

type JobType = "welcome_email";

interface Job<T extends object> {
  type: JobType;
  options: pgBoss.SendOptions;
  start: () => Promise<void>;
  work: (job: pgBoss.Job<T>) => Promise<void>;
  emit: (data: T) => Promise<void>;
}

Here, we're defining a generic Job interface that takes a type parameter T that extends the object type. This will allow us to define the data that will be passed to the job handler when the job is processed. We are defining a JobType type that will be used to define the job name, and we're also defining an options property that will be used to set the job options. (This is where we'll use the options we discussed earlier.)

The Job interface also defines a few methods that we'll use to interact with the job. The start() method registers a worker for the job's queue, the work() method is the job handler itself, and the emit() method sends a new job, along with its data, into the queue.

Implementing the Job Interface

Now that we have our Job interface defined, I'm going to implement an abstract base class called BaseJob that implements the Job interface:

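abstract class BaseJob<T extends object> implements Job<T> {
  protected boss: pgBoss;
  abstract readonly type: JobType;
  // Typed as SendOptions so subclasses can override it with other options.
  // Note: retryDelay is expressed in seconds.
  readonly options: pgBoss.SendOptions = { retryLimit: 3, retryDelay: 1000 };

  constructor(boss: pgBoss) {
    this.boss = boss;
  }

  async start(): Promise<void> {
    // The arrow function keeps `this` bound when pg-boss invokes the handler.
    await this.boss.work(this.type, (job: pgBoss.Job<T>) => this.work(job));
  }

  abstract work(job: pgBoss.Job<T>): Promise<void>;

  async emit(data: T): Promise<void> {
    await this.boss.send(this.type, data, this.options);
  }
}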

Here, we implemented the base class BaseJob that implements the base options property and start() and emit() methods. We also defined the type property as an abstract property that will be implemented by the concrete job classes. This makes it easy to define new jobs by extending the BaseJob class and implementing the abstract properties and methods. Most of our jobs will reuse the same start() and emit() methods, so we don't have to implement them over and over again.
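
One subtlety deserves a quick sketch: when a class method is handed to another API as a callback, which is what start() does with the work() handler, the method can lose its `this` binding unless it is wrapped or bound. The handlers in this post don't read anything from `this`, so they are unaffected, but a handler that touches this.boss or this.options would be. The example below is self-contained and uses made-up names (runHandler, PrefixedWorker); it does not touch pg-boss.

```typescript
// Self-contained illustration of losing `this` when a method is passed as a
// bare callback. `runHandler` is a hypothetical stand-in for any API that
// stores a handler and calls it later.
function runHandler(handler: (payload: string) => string): string {
  return handler("job-1");
}

class PrefixedWorker {
  private readonly prefix = "[worker]";

  work(payload: string): string {
    return `${this.prefix} ${payload}`;
  }
}

const worker = new PrefixedWorker();

// Wrapping the call in an arrow function keeps `this` pointing at `worker`.
const wrappedResult = runHandler((payload) => worker.work(payload));
console.log(wrappedResult); // [worker] job-1

// Passing the bare method drops `this`, so reading this.prefix throws.
let bareCallFailed = false;
try {
  runHandler(worker.work);
} catch {
  bareCallFailed = true;
}
console.log(bareCallFailed); // true
```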

Defining the WelcomeEmailJob Class

Now that we have our BaseJob class defined, let's define a concrete job class called WelcomeEmailJob that will be used to send welcome emails to new users. For the purpose of this example, I'll just console.log the email address, but in a real application, you would send an email to that address:

type WelcomeEmail = { email: string };
export class WelcomeEmailJob extends BaseJob<WelcomeEmail> {
  readonly type = "welcome_email";

  async work(job: pgBoss.Job<WelcomeEmail>): Promise<void> {
    console.log(`[WelcomeEmailJob] Sent welcome email to ${job.data.email}!`);
  }
}

With the abstract BaseJob class defined, our WelcomeEmailJob class is very simple. We're defining the type property as welcome_email, and we're implementing the work() method to log the email address to the console.

Ideally, you would want to do some validation on the email address and proper error handling in case the email fails to send. But for the purpose of this example, we'll keep it simple.
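
As a rough idea of what that validation could look like, here is a minimal sketch. The names (parseWelcomeEmail, handleWelcomeEmail, EMAIL_PATTERN) are hypothetical, and the pattern is deliberately loose; a real application would more likely lean on a validation library. As I understand it, a handler that throws causes pg-boss to mark the job as failed, which is what lets the retry options kick in.

```typescript
type WelcomeEmailPayload = { email: string };

// Deliberately loose pattern: something@something.tld. An assumption for
// this sketch, not a complete email validator.
const EMAIL_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

function parseWelcomeEmail(data: unknown): WelcomeEmailPayload {
  if (typeof data !== "object" || data === null) {
    throw new Error("Job data must be an object");
  }
  const email = (data as { email?: unknown }).email;
  if (typeof email !== "string" || !EMAIL_PATTERN.test(email)) {
    throw new Error("Job data is missing a valid email address");
  }
  return { email };
}

// Hypothetical handler body: validate first, then do the real work.
async function handleWelcomeEmail(data: unknown): Promise<string> {
  const { email } = parseWelcomeEmail(data);
  // A real implementation would call an email provider here and let any
  // error propagate so the failure is visible to the queue.
  return `Sent welcome email to ${email}`;
}
```

Inside a job class, the work() method could call a function like handleWelcomeEmail(job.data) and simply let any error bubble up.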

Testing the WelcomeEmailJob Class

Now that we have our WelcomeEmailJob class defined, let's test it out by sending a job into the queue:

const boss = new pgBoss("postgres://username:password@localhost:5432/pgboss");
await boss.start();

const job = new WelcomeEmailJob(boss);
await job.start();

await job.emit({ email: "john@doe.com" });

Here, we're creating a new instance of WelcomeEmailJob and passing it the boss instance we created earlier. We're then calling the start() method to start the job, and finally, we're calling the emit() method to send the job into the queue.

If you run this code, you should see the email address logged to the console.

[WelcomeEmailJob] Sent welcome email to john@doe.com!

Easy, right? Well, it gets better. Let's now define an onboarding-email job that will send an onboarding email to the user 24 hours after they sign up. The difference here is that we are going to change the default SendOptions and set a startAfter option to delay the job by 24 hours:

type OnboardingEmail = { email: string };
export class OnboardingEmailJob extends BaseJob<OnboardingEmail> {
  readonly type = "onboarding_email";
  readonly options: pgBoss.SendOptions = {
    retryLimit: 3,
    retryDelay: 1000,
    // A numeric startAfter is in seconds, not milliseconds.
    startAfter: 24 * 60 * 60,
  };

  async work(job: pgBoss.Job<OnboardingEmail>): Promise<void> {
    console.log(
      `[OnboardingEmailJob] Sent onboarding email to ${job.data.email}!`
    );
  }
}

Of course, we have to go back to our JobType type and add the onboarding_email type:

type JobType = "welcome_email" | "onboarding_email";

Now, let's test it out:

await boss.start();

const welcomeJob = new WelcomeEmailJob(boss);
await welcomeJob.start();

const onboardingJob = new OnboardingEmailJob(boss);
await onboardingJob.start();

await welcomeJob.emit({ email: "john@doe.com" });
await onboardingJob.emit({ email: "john@doe.com" });

Here, we've created a new instance of OnboardingEmailJob and passed it the boss instance we created earlier. We're then calling the start() method to start the job, and finally, we're calling the emit() method to send the job into the queue.

If you run this code, and wait for 24 hours (literally), you should see the email address logged to the console.

[WelcomeEmailJob] Sent welcome email to john@doe.com!
// 24 hours later
[OnboardingEmailJob] Sent onboarding email to john@doe.com!

P.S. Okay, maybe you don't have to wait 24 hours. You can just change the startAfter option to a smaller value, like 10 seconds, and it should work just fine.

Small, Self-Contained, and Isolated Units of Work

One of the things I love about this approach is that each job is a small and self-contained unit of work. It has its own data, its own handler, and its own options. These units can be easily understood, tested, and managed. Thanks to PostgreSQL and pg-boss, we can also run them across multiple workers in a distributed environment, relying on the database's locking to keep two workers from picking up the same job at once.

However, there is one thing that I don't like about this approach, and that is the fact that we have to create and keep track of a separate instance of WelcomeEmailJob and OnboardingEmailJob wherever we want to send a job into the queue. This is not a big deal when you have a few jobs, but it can get tedious when you have a lot of jobs.

So, let's see if we can improve this by creating a JobManager class that will handle all of our jobs through a single and type-safe interface.

Defining the JobManager Class

The JobManager class will be responsible for managing all of our jobs. It needs to know about all of our jobs, provide a way to start them, and it should also be type-safe, so we don't have to worry about passing the wrong data to the wrong job.

Let's get our hands dirty and start implementing the JobManager class. First, let's define a type map that correlates each job type to its corresponding job class:

type JobTypeMapping = {
  welcome_email: WelcomeEmailJob;
  onboarding_email: OnboardingEmailJob;
};

This will become handy when we implement the emit() method later. Next, let's define the JobManager class:

export class JobManager {
  private readonly boss: pgBoss;
  private jobs = new Map<string, Job<any>>();

  constructor(boss: pgBoss) {
    this.boss = boss;
  }

  register(job: new (boss: pgBoss) => Job<any>): JobManager {
    const jobInstance = new job(this.boss);
    this.jobs.set(jobInstance.type, jobInstance);
    return this;
  }

  async start(): Promise<void> {
    await this.boss.start();
    for (const job of this.jobs.values()) {
      await job.start();
    }
  }

  async emit<K extends keyof JobTypeMapping>(
    jobName: K,
    data: Parameters<JobTypeMapping[K]['emit']>[0]
  ): Promise<void> {
    const job = this.jobs.get(jobName);
    if (job === undefined) {
      throw new Error(`No job registered with the name ${jobName}`);
    }
    await job.emit(data);
  }
}

Here, we have defined the JobManager class that takes a boss instance in the constructor. We're also defining a jobs map that correlates each job type to its corresponding job instance. We are using the register() method to register each job with the JobManager instance, and we're using the start() method to start all of our jobs. Finally, we're using the emit() method to emit a job into the queue.

Note that the emit() method is using the JobTypeMapping type to ensure that we're passing the correct data to the correct job. This is a very powerful feature that will save us a lot of headaches down the road.
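
If the Parameters<...>[0] trick looks a bit dense, here it is in isolation. This is a minimal in-memory sketch with no database involved, and every name in it (Emitter, EmitterRegistry, dispatch, weekly_report) is hypothetical, standing in for the job classes and the JobManager from this post.

```typescript
// Minimal, in-memory illustration of the typing technique. No pg-boss and
// no database are involved; all names are hypothetical stand-ins.
interface Emitter<T> {
  emit(data: T): string;
}

const welcomeEmitter: Emitter<{ email: string }> = {
  emit: (data) => `welcome:${data.email}`,
};
const reportEmitter: Emitter<{ reportId: number }> = {
  emit: (data) => `report:${data.reportId}`,
};

// Maps each job name to the type of the emitter that handles it.
type EmitterRegistry = {
  welcome_email: typeof welcomeEmitter;
  weekly_report: typeof reportEmitter;
};

const emitters = new Map<string, Emitter<any>>([
  ["welcome_email", welcomeEmitter],
  ["weekly_report", reportEmitter],
]);

// Parameters<...>[0] extracts the payload type of the emitter selected by K,
// so the second argument is checked against the first.
function dispatch<K extends keyof EmitterRegistry>(
  jobName: K,
  data: Parameters<EmitterRegistry[K]["emit"]>[0]
): string {
  const emitter = emitters.get(jobName);
  if (emitter === undefined) {
    throw new Error(`No emitter registered with the name ${jobName}`);
  }
  return emitter.emit(data);
}

console.log(dispatch("welcome_email", { email: "john@doe.com" })); // welcome:john@doe.com
console.log(dispatch("weekly_report", { reportId: 7 })); // report:7
// dispatch("welcome_email", { reportId: 7 }) is rejected by the compiler.
```

The runtime lookup still goes through a loosely typed Map, just like in the JobManager, so the safety comes entirely from the function signature.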

Testing the JobManager Class

Now that we have our JobManager class defined, let's test it out by registering our jobs and sending some jobs into the queue:

const jobs = new JobManager(boss)
  .register(WelcomeEmailJob)
  .register(OnboardingEmailJob);

await jobs.start();
await jobs.emit("welcome_email", { email: "john@doe.com" });
await jobs.emit("onboarding_email", { email: "shayan@doe.com" });

Our JobManager class is very simple. We're creating a new instance of JobManager and passing it the boss instance we created earlier. We're then registering our jobs with the JobManager instance. Note that I returned the JobManager instance from the register() method so we can chain the calls. Finally, we're calling the start() method to start boss and all of our jobs.

Once we have our JobManager instance up and running, we can use the emit() method to send jobs into the queue. The best part is that we get type safety for free. If we try to pass the wrong data to the wrong job, or a job that doesn't exist, we'll get a compile-time error.

Isn't that beautiful? :)

Conclusion

And that's it. We have successfully implemented a tiny wrapper around pg-boss that makes it easier to use in TypeScript applications. We have also implemented a JobManager class that handles all of our jobs through a single and type-safe interface.

This is the level of abstraction I like to work with when it comes to most things in software development. It's focused on creating clear, manageable layers of abstraction that make it easy to understand and work with the underlying system. It's also focused on making it easy to test and maintain the codebase.

Happy job scheduling, and may your queues always be managed with precision and efficiency!
