Credit: M. Salman Agustian
Queue is an important technique in Node.js used for effectively handling asynchronous operations. These operations exist in different forms, including HTTP requests, read or write file operations, streams, and more.
Bull is a Node library that implements a fast and robust queue system based on redis. If you are new to queues you may wonder why they are needed after all.
"Queues can solve many different problems in an elegant way, from smoothing out processing peaks to creating robust communication channels between microservices or offloading heavy work from one server to many smaller workers, etc."
In its implementation, Bull has three (3) main cores concept including Producers, Consumers and Listeners.
- Simple Producers is a code node js program where its function is to add a job to the Queue. One Producers can have one or more Producers.
- Consumers is a process function where the main function is to run jobs that have been previously added to the Queue. The relationship between Consumers and Queues is almost the same as that of Producers in that the Queue can have one or more Consumers.
- While Listeners is a process function that functions to provide actions/events on changes in Queue status (Progress, Completed, Failed, etc).
"The three main cores concept above make a job lifecycle that we can use to find out all the potential uses of Bull."
A Job Lifecycle
For full explanation, you can see directly here.
In the next session to understand more about using Queue, we will implement creating a Queue Send User Purchase Invoice using Bull in Nest Js.
. . .
Installation Bull in Nest Js
- NestJS provides @nestjs/bull package as a wrapper on top of the Bull.
- You need to install the Redis server before you get into the Bull. Because Bull is based on Redis.
- Then run the command below to install Bull dependency in Nest Js.
npm install — save @nestjs/bull bull
npm install — save-dev @types/bull
Next… Is How To Config Bull in Nest Js ⚙️
Assume we already have a fresh Nest JS installation. Then create a config/queue folder inside the src folder. Next we create 3 files, namely: config.module.ts, config.provider.ts, and config.ts. The folder structure we have now looks like this:
config.module.ts
- We will configure bull asynchronously instead of statically, in this case use the forRootAsync() method.
- For the Queue Options configuration, we use the Factory Class, using the useClass option, where the value is the Queue Options provider configuration.
- Registering the service Queue we can use the registerQueue() method. with the Queue name argument that we will use.
config.provider.ts
- The QueueConfigProvider class must implement the SharedBullConfigurationFactory interface.
- We import QueueOptions from a separate config file to make the Queue Config look more modular.
config.ts
- These are all Queue Options configurations consisting of redis options, prefixes and DefaultJobOptions, which we apply to the application’s Queue system.
- We can use environment variables to store all global variables that we use.
- DefaultJobOptions serves as the default way of working that we apply to the Queue. (later we can override this setting).
What Next…? We Create Our Route EndPoint API 🚀
First we create a purchase folder in the src folder. Then we need some files like purchase.controller.ts, purchase.service.ts and purchase.module.ts.
purchase.controller.ts
- For Route path API, we give /api/purchase prefix.
- We perform Dependecy Injection (DI) purchaseService into our purchase controller.
- Inside the send() method of type POST method, we have a dummy array of User Invoice objects with interface type IUserPurchase[]. Sent into the sendInvoiceCustomer method that comes from our Purchase Service.
purchase.service.ts
- In this service, we create a Producers to add a job to the queue. We do Queue injection on the purchase service.
- In this Producers example we create a single job. To create a named job we need to add 1 additional argument. Something like this the writing await this.purchaseQueue.add(‘our named job’, {…}).
- You can add Queue Options. An example of Queue Options can be seen here.
purchase.module.ts
- Since we need Queue injection in the previous purchaseService, we need to import PurchaseQueueModule against PurchaseModule API.
Last but not least.. We Need To Create Our Job Processor ⚡
We create a job/purchase folder in the src folder with the aim of separating the config queue folder, route API folder, and job folder. Here we need two files for the execution of the job that we added to the previous Queue, including: purchase-queue.processor.ts and purchase-queue.module.ts.
purchase-queue.processor.ts
- The processor will handle the operations of the data added to the queue. A queue can have one Processor and many processes. @Processor and @Process decorators can be used to indicate the processor and processes.
- The registered queue name will be used as the Processor name (send-invoice)
- The business logic goes inside the Process.
In this example, we only display invoice data on the job using Nest Js’ built-in logger. - If needed we create a listener to catch the Queue state changes. In our example we created a Listener to catch state changes when the Queue is finished with @OnQueueCompleted decorators.