If you know me, you know that I have a thing for background jobs, queues, schedulers, data pipelines, and the intricacies of distributed systems that keep modern web applications running. There is something almost magical about setting up a system that efficiently manages tasks and jobs, and it is a very satisfying feeling when you see it handling real-world traffic and data in production.
Well, if that sounds like you, I have a treat for you today. I recently came across a Node.js library called pg-boss, and I couldn't stop myself from diving deep into it and playing around with it. So, here we are, and I'm going to share some fun stuff with you today.
What is pg-boss?
pg-boss is a job queue library for Node.js that uses PostgreSQL for persistence. It is simple to use, and its feature set makes it a great choice when you need background jobs, especially jobs that run on a schedule or at a specific time.
Why use PostgreSQL for a job queue?
The better question is, why not? PostgreSQL is not just a workhorse in the world of relational databases; it's also surprisingly adept at handling queue-like workloads thanks to its robust row-level locking (pg-boss relies on SKIP LOCKED), transaction support, and the LISTEN/NOTIFY system. Chances are, you're already using PostgreSQL in your application, so it makes sense to use it for your job queue until you have a good reason not to, which doesn't happen often.
Many companies and startups, such as hey.com, run a DB-based job queue system like this in production, handling millions of jobs per day. If it's good enough for them, it's probably good enough for you too.
So, with that out of the way, let's get started. First, I'm going to show you how to do some basic stuff with pg-boss, and then we'll dive into implementing a wrapper around it that will make it easier to use in TypeScript applications.
Getting Started with pg-boss
Before we dive into the code, make sure you have PostgreSQL installed and running. You'll also need to create a database for pg-boss to use. With that set up, install pg-boss from npm by running npm install pg-boss.
Now, initialize pg-boss in your TypeScript application:
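A minimal sketch of this step, assuming the pg-boss v9 API (a constructor that takes a connection string, an error event, and a start method). StartableQueue, initQueue, and the connection string are hypothetical names; the constructor is passed in as a factory so the sketch compiles without the package installed.

```typescript
// Hypothetical structural type: the slice of the pg-boss client this step uses.
interface StartableQueue {
  on(event: "error", handler: (error: Error) => void): unknown;
  start(): Promise<unknown>;
}

// Creates the client, attaches an error listener, and starts it.
// `create` is injected; with the real library it would be `(cs) => new PgBoss(cs)`.
async function initQueue<Q extends StartableQueue>(
  create: (connectionString: string) => Q,
  connectionString: string
): Promise<Q> {
  const boss = create(connectionString);
  // Surface background errors instead of letting them go unnoticed.
  boss.on("error", (error) => console.error("queue error:", error));
  // On start, pg-boss creates the schema and tables it needs.
  await boss.start();
  return boss;
}

// Usage with the real library (placeholder credentials):
//   import PgBoss from "pg-boss";
//   const boss = await initQueue(
//     (cs) => new PgBoss(cs),
//     "postgres://user:password@localhost:5432/app"
//   );
```

Attaching the error listener before calling start is a defensive choice, so connection problems during startup are logged.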
Here, we're creating a new instance of pg-boss and passing our PostgreSQL connection string. The start method initializes pg-boss, creating the necessary tables and database objects it needs to manage your jobs.
Sending Your First Job into the Queue
With pg-boss up and running, let's send a job into the queue. We'll use the send method to do this:
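A minimal sketch of sending a job, assuming send takes a queue name, a data object, and optional options, and resolves to the job id. JobSender and queueWelcomeEmail are hypothetical names used only for illustration.

```typescript
// Hypothetical structural type modeled on pg-boss's send(name, data, options?).
interface JobSender {
  send(name: string, data: object, options?: object): Promise<string | null>;
}

interface WelcomeEmailData {
  email: string;
}

// Queues a welcome-email job and resolves to the job id.
// pg-boss can resolve null when no job is created (for example, when throttled).
function queueWelcomeEmail(
  boss: JobSender,
  data: WelcomeEmailData
): Promise<string | null> {
  return boss.send("welcome-email", data);
}

// Usage, given a started pg-boss instance `boss`:
//   const jobId = await queueWelcomeEmail(boss, { email: "user@example.com" });
```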
In this example, we're queuing up a job called welcome-email and passing it an object with an email address. This is the data that will be passed to the job handler when it's time to process the job.
Processing Jobs with Handlers
Queuing jobs is only half the battle; we also need to process them. In pg-boss, this is done by creating workers that listen for specific job names:
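A minimal sketch of a worker, assuming the pg-boss v9 form of work, where the handler receives one job at a time (newer major versions may pass an array instead). QueuedJob, JobWorker, and the two function names are hypothetical.

```typescript
// Hypothetical structural types modeled on pg-boss v9's work(name, handler).
interface QueuedJob<T> {
  id: string;
  data: T;
}

interface JobWorker {
  work<T>(
    name: string,
    handler: (job: QueuedJob<T>) => Promise<void>
  ): Promise<string>;
}

interface WelcomeEmailData {
  email: string;
}

// The handler. If it throws (or its promise rejects), the job is marked as failed.
async function handleWelcomeEmail(job: QueuedJob<WelcomeEmailData>): Promise<void> {
  console.log(`Sending welcome email to ${job.data.email}`);
}

// Registers the handler for the welcome-email queue.
function registerWelcomeEmailWorker(boss: JobWorker): Promise<string> {
  return boss.work<WelcomeEmailData>("welcome-email", handleWelcomeEmail);
}

// Usage, given a started pg-boss instance `boss`:
//   await registerWelcomeEmailWorker(boss);
```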
This worker listens for jobs with the name welcome-email. When a job with that name is queued, the worker will process it by calling the handler function and passing in the job data. In this example, we're logging the email address to the console.
Scheduling Jobs with Cron
Let's say we want to send a daily summary email to all our users at 8 AM every day. pg-boss makes this easy with cron scheduling:
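A minimal sketch of cron scheduling, assuming schedule takes a queue name, a five-field cron expression, and optional data. JobScheduler and scheduleDailySummary are hypothetical names, and the "daily" payload value is an assumption.

```typescript
// Hypothetical structural type modeled on pg-boss's schedule(name, cron, data?, options?).
interface JobScheduler {
  schedule(name: string, cron: string, data?: object, options?: object): Promise<void>;
}

// Five-field cron: minute hour day-of-month month day-of-week.
// "0 8 * * *" means minute 0 of hour 8, every day.
const DAILY_AT_8AM = "0 8 * * *";

// Registers the recurring job; a worker for the same queue name processes each run.
function scheduleDailySummary(boss: JobScheduler): Promise<void> {
  return boss.schedule("send-daily-summary", DAILY_AT_8AM, { summary: "daily" });
}

// Usage, given a started pg-boss instance `boss`:
//   await scheduleDailySummary(boss);
```

Which time zone "8 AM" refers to depends on the scheduling options in use, so it is worth checking the tz option in the pg-boss documentation for your version.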
Here, we're scheduling a job called send-daily-summary to run every day at 8 AM. We're also passing it an object with a summary property that contains the type of summary we want to send. This data will be passed to the job handler when the job is processed.
Delaying Jobs with a Timestamp
Sometimes, you want to schedule a job to run at a specific time in the future. For example, you might want to send a reminder email to a user 24 hours after they sign up. pg-boss makes this easy with timestamps:
Here, we're scheduling a job called send-reminder to run tomorrow. We're also passing it an object with an email address. This data will be passed to the job handler when the job is processed.
Alternatively, we can pass the number of seconds to delay the job as the last argument:
In this example, we're scheduling a job called send-reminder to run 24 hours from now.
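Both forms can be sketched side by side, assuming pg-boss's sendAfter(name, data, options, value), where the last argument is either a Date or a number of seconds. DeferredSender, queueReminderAt, and queueReminderIn are hypothetical names.

```typescript
// Hypothetical structural type modeled on pg-boss's sendAfter overloads.
interface DeferredSender {
  sendAfter(name: string, data: object, options: object, date: Date): Promise<string | null>;
  sendAfter(name: string, data: object, options: object, seconds: number): Promise<string | null>;
}

const DAY_IN_SECONDS = 24 * 60 * 60;

// Form 1: an absolute timestamp, 24 hours after `now`.
function queueReminderAt(
  boss: DeferredSender,
  email: string,
  now: Date = new Date()
): Promise<string | null> {
  const runAt = new Date(now.getTime() + DAY_IN_SECONDS * 1000);
  return boss.sendAfter("send-reminder", { email }, {}, runAt);
}

// Form 2: a relative delay in seconds as the last argument.
function queueReminderIn(
  boss: DeferredSender,
  email: string,
  delaySeconds: number = DAY_IN_SECONDS
): Promise<string | null> {
  return boss.sendAfter("send-reminder", { email }, {}, delaySeconds);
}
```

The relative form avoids clock arithmetic in application code, while the absolute form is handy when the target time comes from elsewhere, such as a user-selected date.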
Configuring Job Options
When creating a job using the send() function or related methods, there are various options you can set to customize the behavior of the job. Here's a breakdown of a few of the most common options:
General Options
- priority: An integer value that specifies the priority of the job. Higher numbers indicate higher priority.
Retry Options
- retryLimit: The maximum number of times a failed job will be retried.
- retryDelay: The delay, in seconds, between retries of failed jobs.
- retryBackoff: A boolean value that, when set to true, enables exponential backoff retries based on the retryDelay.
Expiration Options
- expireInSeconds: The number of seconds a job can remain in the active state before it is considered expired and failed.
- expireInMinutes: The number of minutes a job can remain in the active state before it is considered expired and failed.
- expireInHours: The number of hours a job can remain in the active state before it is considered expired and failed.
Retention Options
- retentionSeconds: The number of seconds a job will be retained in the created or retry state before it's archived.
- retentionMinutes: The number of minutes a job will be retained in the created or retry state before it's archived.
- retentionHours: The number of hours a job will be retained in the created or retry state before it's archived.
- retentionDays: The number of days a job will be retained in the created or retry state before it's archived.
Deferred Jobs
- startAfter: Specifies when the job should start. It can be an integer (seconds to delay), a string (UTC Date time in ISO 8601 format), or a Date object.
Unique Jobs
- singletonKey: A unique key that ensures only one job with the same name and key can be queued or active.
- useSingletonQueue: When used with singletonKey, ensures only one job can be queued.
Throttled Jobs
- singletonSeconds: Specifies the interval in seconds to throttle job creation.
- singletonMinutes: Specifies the interval in minutes to throttle job creation.
- singletonHours: Specifies the interval in hours to throttle job creation.
- singletonNextSlot: When true, schedules the job to run after the current time slot if throttled.
These options allow you to control how jobs are processed, retried, and managed within pg-boss. You can set defaults for these options in the constructor or override them per job when calling send(). We will circle back to these options later when we implement our wrapper around pg-boss.
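As an illustration, a few of the options listed above can be combined into one options object. This sketch merges application-level defaults with per-job overrides in plain TypeScript; SendOptionsSketch and withDefaults are hypothetical, and the values are arbitrary examples rather than recommendations.

```typescript
// Hypothetical subset of the send options described in the list above.
interface SendOptionsSketch {
  priority?: number;
  retryLimit?: number;
  retryDelay?: number; // seconds between retries
  retryBackoff?: boolean;
  expireInMinutes?: number;
  retentionDays?: number;
  startAfter?: number | string | Date;
  singletonKey?: string;
}

// Application-level defaults (example values).
const defaultOptions: SendOptionsSketch = {
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInMinutes: 15,
};

// Per-job values win over the defaults.
function withDefaults(overrides: SendOptionsSketch = {}): SendOptionsSketch {
  return { ...defaultOptions, ...overrides };
}

const reminderOptions = withDefaults({
  startAfter: 60, // delay by 60 seconds
  singletonKey: "reminder:user-42", // hypothetical key
  retryLimit: 5,
});

// The result would be passed as the third argument to send:
//   await boss.send("send-reminder", { email: "user@example.com" }, reminderOptions);
```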
TypeScript and pg-boss: A Perfect Pair
Now, with the basics out of the way, let's get to the fun part. I'm going to build a tiny wrapper around pg-boss that will make it easier for us to handle jobs in our TypeScript applications.
The plan is to define a Job interface that will encapsulate each job so we can have a unified way of defining and handling jobs. In addition, we are going to create a JobManager class that will handle all of our jobs through a single and type-safe interface.
Defining the Job Interface
Let's start by defining our Job interface:
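One way the interface could look, as a sketch. SendOptions and QueuedJob are simplified local stand-ins (in a real project they would likely come from pg-boss's own types), and the exact method signatures are assumptions.

```typescript
// Simplified stand-ins for pg-boss types (assumptions, not the library's definitions).
type SendOptions = {
  retryLimit?: number;
  retryDelay?: number;
  startAfter?: number | string | Date;
};

interface QueuedJob<T> {
  id: string;
  data: T;
}

// Every job name the application knows about.
type JobType = "welcome_email";

// T describes the payload passed to the handler.
interface Job<T extends object> {
  type: JobType;
  options: SendOptions;
  // Registers the worker for this job type.
  start(): Promise<void>;
  // Handles one queued job.
  work(job: QueuedJob<T>): Promise<void>;
  // Sends a job with this payload into the queue.
  emit(data: T): Promise<void>;
}
```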
Here, we're defining a generic Job interface that takes a type parameter T that extends the object type. This allows us to describe the data that will be passed to the job handler when the job is processed. We're also defining a JobType type that enumerates the valid job names, and an options property that holds the job options. (This is where we'll use the options we discussed earlier.)
The Job interface also defines a few methods that we'll use to interact with the job. The start() method will be used to start the job, the work() method will be used to define the job handler, and the emit() method will be used to emit the job data to the handler.
Implementing the Job Interface
Now that we have our Job interface defined, I'm going to implement an abstract base class called BaseJob that implements the Job interface:
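A sketch of what such a base class could look like. BossLike is a hypothetical structural type covering only the send and work calls used here (modeled on pg-boss v9), and the default option values are assumptions.

```typescript
// Simplified stand-ins for pg-boss types (assumptions).
type SendOptions = { retryLimit?: number; retryDelay?: number; startAfter?: number | string | Date };
interface QueuedJob<T> { id: string; data: T; }
interface BossLike {
  send(name: string, data: object, options?: SendOptions): Promise<string | null>;
  work<T>(name: string, handler: (job: QueuedJob<T>) => Promise<void>): Promise<string>;
}

type JobType = "welcome_email";

interface Job<T extends object> {
  type: JobType;
  options: SendOptions;
  start(): Promise<void>;
  work(job: QueuedJob<T>): Promise<void>;
  emit(data: T): Promise<void>;
}

abstract class BaseJob<T extends object> implements Job<T> {
  // Each concrete job supplies its own name.
  abstract readonly type: JobType;

  // Shared defaults (example values); subclasses may override.
  readonly options: SendOptions = { retryLimit: 3, retryDelay: 30 };

  constructor(protected readonly boss: BossLike) {}

  // Registers this job's handler as a worker for its queue.
  async start(): Promise<void> {
    await this.boss.work<T>(this.type, (job) => this.work(job));
  }

  // Each concrete job supplies its own handler.
  abstract work(job: QueuedJob<T>): Promise<void>;

  // Sends a job with the given payload and this job's options.
  async emit(data: T): Promise<void> {
    await this.boss.send(this.type, data, this.options);
  }
}
```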
Here, the BaseJob class implements the base options property and the start() and emit() methods. The type property is declared abstract, so each concrete job class must supply it. This makes it easy to define new jobs by extending the BaseJob class and implementing the abstract properties and methods. Most of our jobs will reuse the same start() and emit() methods, so we don't have to implement them over and over again.
Defining the WelcomeEmailJob Class
Now that we have our BaseJob class defined, let's define a concrete job class called WelcomeEmailJob that will be used to send welcome emails to new users. For the purpose of this example, I'll just console.log the email address, but in a real application, you would send an email to that address:
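A self-contained sketch of the concrete class. It repeats a compact BaseJob so it can stand on its own; BossLike and the other supporting types are hypothetical stand-ins for the pg-boss v9 client, not the library's own definitions.

```typescript
// Simplified stand-ins for pg-boss types (assumptions).
type SendOptions = { retryLimit?: number; retryDelay?: number; startAfter?: number | string | Date };
interface QueuedJob<T> { id: string; data: T; }
interface BossLike {
  send(name: string, data: object, options?: SendOptions): Promise<string | null>;
  work<T>(name: string, handler: (job: QueuedJob<T>) => Promise<void>): Promise<string>;
}
type JobType = "welcome_email";

// Compact base class: registers the worker and sends jobs.
abstract class BaseJob<T extends object> {
  abstract readonly type: JobType;
  readonly options: SendOptions = { retryLimit: 3, retryDelay: 30 };
  constructor(protected readonly boss: BossLike) {}
  async start(): Promise<void> {
    await this.boss.work<T>(this.type, (job) => this.work(job));
  }
  abstract work(job: QueuedJob<T>): Promise<void>;
  async emit(data: T): Promise<void> {
    await this.boss.send(this.type, data, this.options);
  }
}

interface WelcomeEmailData {
  email: string;
}

class WelcomeEmailJob extends BaseJob<WelcomeEmailData> {
  readonly type: JobType = "welcome_email";

  // A real implementation would send the email here.
  async work(job: QueuedJob<WelcomeEmailData>): Promise<void> {
    console.log(`[WelcomeEmailJob] Sending email to ${job.data.email}`);
  }
}

// Usage, given a started pg-boss instance `boss`:
//   const job = new WelcomeEmailJob(boss);
//   await job.start();
//   await job.emit({ email: "user@example.com" });
```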
With the abstract BaseJob class defined, our WelcomeEmailJob class is very simple. We're defining the type property as welcome_email, and we're implementing the work() method to log the email address to the console.
Ideally, you would want to do some validation on the email address and proper error handling in case the email fails to send. But for the purpose of this example, we'll keep it simple.
Testing the WelcomeEmailJob Class
Now that we have our WelcomeEmailJob class defined, let's test it out by sending a job into the queue:
Here, we're creating a new instance of WelcomeEmailJob and passing it the boss instance we created earlier. We're then calling the start() method to start the job, and finally, we're calling the emit() method to send the job into the queue.
If you run this code, you should see the email address logged to the console.
Easy, right? Well, it gets better. Let's now define an onboarding_email job that will send an onboarding email to the user 24 hours after they sign up. The difference here is that we are going to override the default SendOptions and set the startAfter option to delay the job by 24 hours:
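A self-contained sketch of the delayed job, with a compact BaseJob repeated so it runs on its own. The supporting types are hypothetical stand-ins for the pg-boss v9 client, the retry values are assumptions, and the JobType union here already includes the new onboarding_email name.

```typescript
// Simplified stand-ins for pg-boss types (assumptions).
type SendOptions = { retryLimit?: number; retryDelay?: number; startAfter?: number | string | Date };
interface QueuedJob<T> { id: string; data: T; }
interface BossLike {
  send(name: string, data: object, options?: SendOptions): Promise<string | null>;
  work<T>(name: string, handler: (job: QueuedJob<T>) => Promise<void>): Promise<string>;
}

// The union gains a member for every new job.
type JobType = "welcome_email" | "onboarding_email";

abstract class BaseJob<T extends object> {
  abstract readonly type: JobType;
  readonly options: SendOptions = { retryLimit: 3, retryDelay: 30 };
  constructor(protected readonly boss: BossLike) {}
  async start(): Promise<void> {
    await this.boss.work<T>(this.type, (job) => this.work(job));
  }
  abstract work(job: QueuedJob<T>): Promise<void>;
  async emit(data: T): Promise<void> {
    await this.boss.send(this.type, data, this.options);
  }
}

const ONE_DAY_IN_SECONDS = 24 * 60 * 60;

class OnboardingEmailJob extends BaseJob<{ email: string }> {
  readonly type: JobType = "onboarding_email";

  // Overrides the base options: same retry policy, plus a 24-hour delay.
  readonly options: SendOptions = {
    retryLimit: 3,
    retryDelay: 30,
    startAfter: ONE_DAY_IN_SECONDS,
  };

  async work(job: QueuedJob<{ email: string }>): Promise<void> {
    console.log(`[OnboardingEmailJob] Sending email to ${job.data.email}`);
  }
}

// Usage, given a started pg-boss instance `boss`:
//   const job = new OnboardingEmailJob(boss);
//   await job.start();
//   await job.emit({ email: "user@example.com" });
```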
Of course, we have to go back to our JobType type and add the onboarding_email type:
Now, let's test it out:
Here, we've created a new instance of OnboardingEmailJob and passed it the boss instance we created earlier. We're then calling the start() method to start the job, and finally, we're calling the emit() method to send the job into the queue.
If you run this code, and wait for 24 hours (literally), you should see the email address logged to the console.
P.S. Okay, maybe you don't have to wait 24 hours. You can just change the startAfter option to a smaller value, like 10 seconds, and it should work just fine.
Small, Self-Contained, and Isolated Units of Work
One of the things I love about this approach is that each job is a small and self-contained unit of work. It has its own data, its own handler, and its own options. These units can be easily understood, tested, and managed. Thanks to PostgreSQL and pg-boss, we can easily scale these units of work to handle millions of jobs per day and even run them in a distributed environment without any issues or race conditions.
However, there is one thing that I don't like about this approach, and that is the fact that we have to create a new instance of WelcomeEmailJob and OnboardingEmailJob every time we want to send a job into the queue. This is not a big deal when you have a few jobs, but it can get tedious when you have a lot of jobs.
So, let's see if we can improve this by creating a JobManager class that will handle all of our jobs through a single and type-safe interface.
Defining the JobManager Class
The JobManager class will be responsible for managing all of our jobs. It needs to know about all of our jobs and provide a way to start them, and it should be type-safe, so we don't have to worry about passing the wrong data to the wrong job.
Let's get our hands dirty and start implementing the JobManager class. First, let's define a type map that correlates each job type to its corresponding job class:
This will come in handy when we implement the emit() method later. Next, let's define the JobManager class:
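A self-contained sketch of the type map and the manager together. The supporting types are hypothetical stand-ins for the pg-boss v9 client, and passing job classes (rather than instances) to register() is an assumption about the design.

```typescript
// Simplified stand-ins for pg-boss types (assumptions).
type SendOptions = { retryLimit?: number; startAfter?: number | string | Date };
interface QueuedJob<T> { id: string; data: T; }
interface BossLike {
  start(): Promise<unknown>;
  send(name: string, data: object, options?: SendOptions): Promise<string | null>;
  work<T>(name: string, handler: (job: QueuedJob<T>) => Promise<void>): Promise<string>;
}
type JobType = "welcome_email" | "onboarding_email";

abstract class BaseJob<T extends object> {
  abstract readonly type: JobType;
  readonly options: SendOptions = { retryLimit: 3 };
  constructor(protected readonly boss: BossLike) {}
  async start(): Promise<void> {
    await this.boss.work<T>(this.type, (job) => this.work(job));
  }
  abstract work(job: QueuedJob<T>): Promise<void>;
  async emit(data: T): Promise<void> {
    await this.boss.send(this.type, data, this.options);
  }
}

class WelcomeEmailJob extends BaseJob<{ email: string }> {
  readonly type: JobType = "welcome_email";
  async work(job: QueuedJob<{ email: string }>): Promise<void> {
    console.log(`welcome email for ${job.data.email}`);
  }
}

class OnboardingEmailJob extends BaseJob<{ email: string }> {
  readonly type: JobType = "onboarding_email";
  readonly options: SendOptions = { retryLimit: 3, startAfter: 24 * 60 * 60 };
  async work(job: QueuedJob<{ email: string }>): Promise<void> {
    console.log(`onboarding email for ${job.data.email}`);
  }
}

// Maps each job name to its class; emit() derives the payload type from it.
type JobTypeMapping = {
  welcome_email: WelcomeEmailJob;
  onboarding_email: OnboardingEmailJob;
};
type JobData<K extends keyof JobTypeMapping> = Parameters<JobTypeMapping[K]["emit"]>[0];

class JobManager {
  private readonly jobs = new Map<JobType, BaseJob<any>>();

  constructor(private readonly boss: BossLike) {}

  // Instantiates the job with the shared boss and returns `this` for chaining.
  register(JobClass: new (boss: BossLike) => BaseJob<any>): this {
    const job = new JobClass(this.boss);
    this.jobs.set(job.type, job);
    return this;
  }

  // Starts boss first, then registers every job's worker.
  async start(): Promise<void> {
    await this.boss.start();
    await Promise.all(Array.from(this.jobs.values(), (job) => job.start()));
  }

  // The payload type is tied to the job name at compile time.
  async emit<K extends keyof JobTypeMapping>(type: K, data: JobData<K>): Promise<void> {
    const job = this.jobs.get(type);
    if (!job) throw new Error(`No job registered for type "${type}"`);
    await job.emit(data);
  }
}

// Usage, given a pg-boss instance `boss`:
//   const jobs = new JobManager(boss).register(WelcomeEmailJob).register(OnboardingEmailJob);
//   await jobs.start();
//   await jobs.emit("welcome_email", { email: "user@example.com" });
//   await jobs.emit("welcome_email", { name: "x" }); // rejected by the compiler
```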
Here, we have defined the JobManager class, which takes a boss instance in its constructor. It keeps a jobs map that correlates each job type to its corresponding job instance. The register() method registers a job with the JobManager instance, the start() method starts all of our jobs, and the emit() method sends a job into the queue.
Note that the emit() method is using the JobTypeMapping type to ensure that we're passing the correct data to the correct job. This is a very powerful feature that will save us a lot of headaches down the road.
Testing the JobManager Class
Now that we have our JobManager class defined, let's test it out by registering our jobs and sending some jobs into the queue:
Using our JobManager class is very simple. We're creating a new instance of JobManager and passing it the boss instance we created earlier. We're then registering our jobs with the JobManager instance. Note that I returned the JobManager instance from the register() method so we can chain the calls. Finally, we're calling the start() method to start boss and all of our jobs.
Once we have our JobManager instance up and running, we can use the emit() method to send jobs into the queue. The best part is that we get type safety for free. If we try to pass the wrong data to the wrong job, or a job that doesn't exist, we'll get a compile-time error.
Isn't that beautiful? :)
Conclusion
And that's it. We have successfully implemented a tiny wrapper around pg-boss that makes it easier to use in TypeScript applications. We have also implemented a JobManager class that handles all of our jobs through a single and type-safe interface.
This is the level of abstraction I like to work with when it comes to most things in software development. It's focused on creating clear, manageable layers of abstraction that make it easy to understand and work with the underlying system. It's also focused on making it easy to test and maintain the codebase.
Happy scheduling jobs, and may your queues always be managed with precision and efficiency!