adonis5-queue
Adonis5 Queue
Adonis5 Queue is a worker-based queue library for AdonisJS, backed by Kue and Kue-Scheduler.
Features
- Ace commands for generating jobs and starting workers
- Use your existing adonis modules in your queue processor
- Near-complete kue/kue-scheduler API support, including future and repeated job scheduling
- Producer/Consumer model for structuring your jobs
- Simple and elegant API for scheduling and processing your jobs
Consumer/Producer model
Instead of defining a job as a single entity, this library splits the job's responsibilities between a producer and a consumer. Here are the definitions:
Producer: Defines the static properties of the job. In Kue's context, supported properties include priority, attempts, backOff, delay, ttl and unique. Documentation for each property can be found in the Kue and Kue-Scheduler GitHub repositories.
Consumer: Defines the runtime properties of the job. In Kue's context, supported properties include concurrency and the process handler.
An example of a basic producer/consumer pair can be found by generating a sample job with the node ace queue:job command.
Notices
This version only supports AdonisJS v5+.
Installation
Install it:
npm i --save adonis5-queue
Compile your code:
node ace serve --watch
Connect all dependencies:
node ace invoke adonis5-queue
Create a job
Make your first job:
node ace queue:job ExampleJob
or:
node ace queue:job ExampleJob --jobId='custom-job-id'
The jobId option is optional; if it is not provided, the Kue type for the job will be the kebab-case form of the argument, e.g. SendEmail -> send-email.
This command creates the job producer and consumer in designated directories, which are configurable in config/queue.js with consumerPath and producerPath; these default to app/Jobs/{Consumers | Producers}.
The job consumers and producers both run in the Adonis framework's context, so you can use any supported libraries within the job file.
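The default job type described above is the kebab-case form of the job name. A minimal sketch of such a conversion follows; toKebabCase is a hypothetical helper written for this example, and the library's own conversion may treat edge cases (acronyms, digits) differently.

```javascript
// Hypothetical helper; the library's own conversion may differ in edge cases.
function toKebabCase(name) {
  return name
    // Insert a hyphen between a lowercase letter or digit and an uppercase letter.
    .replace(/([a-z0-9])([A-Z])/g, '$1-$2')
    .toLowerCase();
}

console.log(toKebabCase('SendEmail'));  // send-email
console.log(toKebabCase('ExampleJob')); // example-job
```

Passing --jobId bypasses this derivation entirely, which helps when the class name and the desired Kue type should not be coupled.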
Run worker
node ace queue:work
Notice: For multi-worker usage, you can use tools such as Supervisor or PM2, or scalable Docker containers with an orchestrator such as Docker Swarm or Kubernetes (k8s). In each case the command to run is node ace queue:work in your app's root directory.
Job API
The producer job file supports Kue job properties, each defined as an ES6 get property in the class; see an example by running node ace queue:job.
Refer to supported job properties above in the Consumer/Producer Model section.
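To illustrate the getter style, here is a minimal sketch of a producer written as a plain class. The class name, the property values, and the choice to keep the payload on this.data are assumptions made for this example; the file generated by node ace queue:job is the authoritative template and may look different.

```javascript
// Hypothetical producer sketch: a plain class, not the generated template.
class SendEmailProducer {
  // Assumption: the payload passed to the constructor is kept on this.data.
  constructor(data) {
    this.data = data;
  }

  // Kue priority level (assumed value).
  get priority() {
    return 'normal';
  }

  // Maximum number of attempts before the job counts as failed (assumed value).
  get attempts() {
    return 3;
  }

  // Delay before the job becomes active, in milliseconds (assumed value).
  get delay() {
    return 1000;
  }
}

const producer = new SendEmailProducer({ to: 'user@example.com' });
console.log(producer.priority, producer.attempts, producer.delay);
```

Because the properties are getters, they are read like plain fields (producer.priority), yet they can still be computed from this.data when a job needs payload-dependent settings.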
The consumer job file supports Kue's job concurrency, defined as an ES6 static get property in the class; see an example by running node ace queue:job.
The processing function is defined as async handle() or handle(), depending on whether your task is asynchronous. Within the task class, you can access the constructor-injected payload with this.data.
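A consumer following this description might look roughly like the sketch below. It is a plain class for illustration only: the class name, the concurrency value, and the returned string are assumptions, and the generated consumer file remains the reference.

```javascript
// Hypothetical consumer sketch: a plain class, not the generated template.
class SendEmailConsumer {
  // Number of jobs of this type processed in parallel (assumed value).
  static get concurrency() {
    return 2;
  }

  // The payload is injected through the constructor and exposed as this.data.
  constructor(data) {
    this.data = data;
  }

  // Processing function; the resolved value stands in for the job result.
  async handle() {
    return `sent to ${this.data.to}`;
  }
}

const consumer = new SendEmailConsumer({ to: 'user@example.com' });
consumer.handle().then((result) => console.log(result));
```

Note that concurrency is static because it describes the job type as a whole, while handle() runs once per job instance with that instance's payload.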
The producer job class also supports job events, listed below:
// within the producer class
// job has been created and scheduled
// useful for retrieving redis id for the job
onInit(Kue/Job job)
// See kue documentation for below
onEnqueue(String jobType)
onStart(String jobType)
onPromotion(String jobType)
onProgress(Float progress)
// data returned from handle() method
onComplete(Object data)
onRemove(String jobType)
// error caught in the handle() method
onFailed(Error error)
onFailedAttempts(Error error)
The producer job class is itself an event listener, so you can pass the data received from job events to the outside world.
A useful scenario is removing a job after it has been initialized:
// within job producer class
onInit(job) {
this.emit('init');
}
// outside of the consumer
// for queue.remove() see Queue API below
job.on('init', async () => await Queue.remove(job));
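The re-emitting pattern above can be tried in isolation. The sketch below assumes the producer behaves like Node's EventEmitter (suggested by the this.emit and job.on calls, but not confirmed here); ExampleJobProducer and the fake job object are stand-ins written for this example, and no real queue is involved.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a producer class; only the event wiring is shown.
class ExampleJobProducer extends EventEmitter {
  // Hook called once the job has been created and scheduled.
  onInit(job) {
    // Re-emit the hook as an event and forward the Kue job id to listeners.
    this.emit('init', job.id);
  }
}

const producer = new ExampleJobProducer();
const seenIds = [];
producer.on('init', (id) => seenIds.push(id));

// Simulate the library invoking the hook with a Kue-like job object.
producer.onInit({ id: 42 });
console.log(seenIds); // [ 42 ]
```

Forwarding only the id keeps listeners decoupled from the Kue job object; forward the whole job instead if listeners need more than that.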
Queue API
Access the queue
const Queue = use('@ioc:Adonis5/Qeo');
Push job onto the queue
// optionally inject data into the job class using constructor
// and access it in the consumer handler using this.data
const ExampleJob = use('App/Jobs/Producers/ExampleJob');
Queue.dispatch(new ExampleJob({'data': 'whatever'}));
Queue.dispatch() takes an optional second String argument that defaults to 'now' and mirrors the Kue-Scheduler API:
// schedule a job immediately
Queue.dispatch(new ExampleJob, 'now');
// schedule a repeated job every 2 seconds
// basically embedding the 'every' method into the string itself
Queue.dispatch(new ExampleJob, 'every 2 seconds');
// schedule a single job in the future
Queue.dispatch(new ExampleJob, '2 seconds from now');
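One way to picture how such a string could map onto Kue-Scheduler's now, every, and schedule methods is sketched below. parseSchedule is a hypothetical function written for this example; it is an assumption about the general idea, not the library's actual parsing code.

```javascript
// Hypothetical parser: maps a schedule string to a Kue-Scheduler method name.
function parseSchedule(when = 'now') {
  const text = when.trim();
  if (text === 'now') {
    return { method: 'now', argument: null };
  }
  // 'every 2 seconds' -> method 'every' with interval '2 seconds'
  const every = /^every\s+(.+)$/i.exec(text);
  if (every) {
    return { method: 'every', argument: every[1] };
  }
  // Anything else is treated as a one-off future date expression.
  return { method: 'schedule', argument: text };
}

console.log(parseSchedule('every 2 seconds'));
console.log(parseSchedule('2 seconds from now'));
```

Under this reading, 'every 2 seconds' selects the repeating method with '2 seconds' as its interval, while '2 seconds from now' is passed through unchanged as a one-off schedule.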
Remove jobs
Remove a single job; the argument must be the job instance you created:
// asynchronous removal...
Queue.remove(job).then(response => {}, error => {});
Clear all jobs:
// also returns a promise
Queue.clear().then(response => {}, error => {});
Note: currently clear() will not trigger the remove event on the job.
Run tests
Please clone this repo, install the dependencies, and run npm run build && npm run test to run the spec tests. (Make sure Redis is installed and configured properly, as required by Kue.)
You can also contribute to the test repo by submitting issues and PRs.
Development
Contributions are welcome! This is a community project, so please send a pull request whenever you feel like it!
Thanks
Many thanks to:
- Harminder Virk
- Romain Lanz
- Jackie Zhang, for the older version used with Adonis 4 (Adonis queue pro)