Skip to content

Commit d5db024

Browse files
committed
feat(Queues): support working all queues with single command
1 parent a42b6b8 commit d5db024

File tree

3 files changed

+84
-40
lines changed

3 files changed

+84
-40
lines changed

admin/commands/queue/work.ts

Lines changed: 78 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,114 @@ import { BaseCommand, flags } from '@adonisjs/core/ace'
22
import type { CommandOptions } from '@adonisjs/core/types/ace'
33
import { Worker } from 'bullmq'
44
import queueConfig from '#config/queue'
5+
import { RunDownloadJob } from '#jobs/run_download_job'
6+
import { DownloadModelJob } from '#jobs/download_model_job'
7+
import { RunBenchmarkJob } from '#jobs/run_benchmark_job'
58

69
export default class QueueWork extends BaseCommand {
710
static commandName = 'queue:work'
811
static description = 'Start processing jobs from the queue'
912

10-
@flags.string({ description: 'Queue name to process', required: true })
13+
@flags.string({ description: 'Queue name to process' })
1114
declare queue: string
1215

16+
@flags.boolean({ description: 'Process all queues automatically' })
17+
declare all: boolean
18+
1319
static options: CommandOptions = {
1420
startApp: true,
1521
staysAlive: true,
1622
}
1723

1824
async run() {
19-
const queueName = this.queue || 'default'
25+
// Validate that either --queue or --all is provided
26+
if (!this.queue && !this.all) {
27+
this.logger.error('You must specify either --queue=<name> or --all')
28+
process.exit(1)
29+
}
30+
31+
if (this.queue && this.all) {
32+
this.logger.error('Cannot specify both --queue and --all flags')
33+
process.exit(1)
34+
}
35+
36+
const [jobHandlers, allQueues] = await this.loadJobHandlers()
37+
38+
// Determine which queues to process
39+
const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue]
40+
41+
this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`)
42+
43+
const workers: Worker[] = []
2044

21-
const jobHandlers = await this.loadJobHandlers()
45+
// Create a worker for each queue
46+
for (const queueName of queuesToProcess) {
47+
const worker = new Worker(
48+
queueName,
49+
async (job) => {
50+
this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`)
51+
const jobHandler = jobHandlers.get(job.name)
52+
if (!jobHandler) {
53+
throw new Error(`No handler found for job: ${job.name}`)
54+
}
2255

23-
const worker = new Worker(
24-
queueName,
25-
async (job) => {
26-
this.logger.info(`Processing job: ${job.id} of type: ${job.name}`)
27-
const jobHandler = jobHandlers.get(job.name)
28-
if (!jobHandler) {
29-
throw new Error(`No handler found for job: ${job.name}`)
56+
return await jobHandler.handle(job)
57+
},
58+
{
59+
connection: queueConfig.connection,
60+
concurrency: this.getConcurrencyForQueue(queueName),
61+
autorun: true,
3062
}
63+
)
3164

32-
return await jobHandler.handle(job)
33-
},
34-
{
35-
connection: queueConfig.connection,
36-
concurrency: 3,
37-
autorun: true,
38-
}
39-
)
40-
41-
worker.on('failed', (job, err) => {
42-
this.logger.error(`Job failed: ${job?.id}, Error: ${err.message}`)
43-
})
65+
worker.on('failed', (job, err) => {
66+
this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`)
67+
})
4468

45-
worker.on('completed', (job) => {
46-
this.logger.info(`Job completed: ${job.id}`)
47-
})
69+
worker.on('completed', (job) => {
70+
this.logger.info(`[${queueName}] Job completed: ${job.id}`)
71+
})
4872

49-
this.logger.info(`Worker started for queue: ${queueName}`)
73+
workers.push(worker)
74+
this.logger.info(`Worker started for queue: ${queueName}`)
75+
}
5076

77+
// Graceful shutdown for all workers
5178
process.on('SIGTERM', async () => {
52-
this.logger.info('SIGTERM received. Shutting down worker...')
53-
await worker.close()
54-
this.logger.info('Worker shut down gracefully.')
79+
this.logger.info('SIGTERM received. Shutting down workers...')
80+
await Promise.all(workers.map((worker) => worker.close()))
81+
this.logger.info('All workers shut down gracefully.')
5582
process.exit(0)
5683
})
5784
}
5885

59-
private async loadJobHandlers() {
86+
private async loadJobHandlers(): Promise<[Map<string, any>, Map<string, string>]> {
6087
const handlers = new Map<string, any>()
88+
const queues = new Map<string, string>()
6189

62-
const { RunDownloadJob } = await import('#jobs/run_download_job')
63-
const { DownloadModelJob } = await import('#jobs/download_model_job')
64-
const { RunBenchmarkJob } = await import('#jobs/run_benchmark_job')
6590
handlers.set(RunDownloadJob.key, new RunDownloadJob())
6691
handlers.set(DownloadModelJob.key, new DownloadModelJob())
6792
handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob())
6893

69-
return handlers
94+
queues.set(RunDownloadJob.key, RunDownloadJob.queue)
95+
queues.set(DownloadModelJob.key, DownloadModelJob.queue)
96+
queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue)
97+
98+
return [handlers, queues]
99+
}
100+
101+
/**
102+
* Get concurrency setting for a specific queue
103+
* Can be customized per queue based on workload characteristics
104+
*/
105+
private getConcurrencyForQueue(queueName: string): number {
106+
const concurrencyMap: Record<string, number> = {
107+
[RunDownloadJob.queue]: 3,
108+
[DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads
109+
[RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results
110+
default: 3,
111+
}
112+
113+
return concurrencyMap[queueName] || concurrencyMap.default
70114
}
71115
}

admin/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
"format": "prettier --write .",
1515
"typecheck": "tsc --noEmit",
1616
"work:downloads": "node ace queue:work --queue=downloads",
17-
"work:model-downloads": "node ace queue:work --queue=model-downloads"
17+
"work:model-downloads": "node ace queue:work --queue=model-downloads",
18+
"work:benchmarks": "node ace queue:work --queue=benchmarks",
19+
"work:all": "node ace queue:work --all"
1820
},
1921
"imports": {
2022
"#controllers/*": "./app/controllers/*.js",

install/entrypoint.sh

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ node ace migration:run --force
1717
echo "Seeding the database..."
1818
node ace db:seed
1919

20-
# Start background workers for queues
21-
echo "Starting background workers for queues..."
22-
node ace queue:work --queue=downloads &
23-
node ace queue:work --queue=model-downloads &
24-
node ace queue:work --queue
20+
# Start background workers for all queues
21+
echo "Starting background workers for all queues..."
22+
node ace queue:work --all &
2523

2624
# Start the AdonisJS application
2725
echo "Starting AdonisJS application..."

0 commit comments

Comments
 (0)