Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/contracts/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,14 @@ export interface Adapter {
* @param jobId - The job ID to complete
* @param queue - The queue the job belongs to
* @param removeOnComplete - Optional retention policy for completed jobs
* @param output - Optional output returned by the job
*/
completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void>
completeJob(
jobId: string,
queue: string,
removeOnComplete?: JobRetention,
output?: any
): Promise<void>

/**
* Mark a job as failed permanently and remove it from the queue.
Expand Down
95 changes: 41 additions & 54 deletions src/drivers/knex_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ export class KnexAdapter implements Adapter {

// Update job to active status
// For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim
const updateQuery = trx(this.#jobsTable)
.where('id', job.id)
.where('queue', queue)
const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue)

if (!this.#supportsSkipLocked()) {
updateQuery.where('status', 'pending')
Expand Down Expand Up @@ -178,19 +176,21 @@ export class KnexAdapter implements Adapter {
const priority = jobData.priority ?? DEFAULT_PRIORITY
const score = calculateScore(priority, now)

await trx(this.#jobsTable)
.where('id', job.id)
.where('queue', queue)
.update({
status: 'pending',
score,
execute_at: null,
})
await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({
status: 'pending',
score,
execute_at: null,
})
}
})
}

async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void> {
async completeJob(
jobId: string,
queue: string,
removeOnComplete?: JobRetention,
output?: any
): Promise<void> {
const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete)

if (!keep) {
Expand All @@ -213,6 +213,7 @@ export class KnexAdapter implements Adapter {
worker_id: null,
acquired_at: null,
finished_at: now,
output: output ? JSON.stringify(output) : null,
})

if (!updated) {
Expand Down Expand Up @@ -276,6 +277,7 @@ export class KnexAdapter implements Adapter {
status: row.status as JobStatus,
data: jobData,
finishedAt: row.finished_at ? Number(row.finished_at) : undefined,
output: row.output ? JSON.parse(row.output) : undefined,
error: row.error || undefined,
}
}
Expand Down Expand Up @@ -331,33 +333,27 @@ export class KnexAdapter implements Adapter {

if (retryAt && retryAt.getTime() > now) {
// Move to delayed
await this.#connection(this.#jobsTable)
.where('id', jobId)
.where('queue', queue)
.update({
status: 'delayed',
data: updatedData,
worker_id: null,
acquired_at: null,
score: null,
execute_at: retryAt.getTime(),
})
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
status: 'delayed',
data: updatedData,
worker_id: null,
acquired_at: null,
score: null,
execute_at: retryAt.getTime(),
})
} else {
// Move back to pending
const priority = jobData.priority ?? DEFAULT_PRIORITY
const score = calculateScore(priority, now)

await this.#connection(this.#jobsTable)
.where('id', jobId)
.where('queue', queue)
.update({
status: 'pending',
data: updatedData,
worker_id: null,
acquired_at: null,
score,
execute_at: null,
})
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
status: 'pending',
data: updatedData,
worker_id: null,
acquired_at: null,
score,
execute_at: null,
})
}
}

Expand Down Expand Up @@ -458,10 +454,7 @@ export class KnexAdapter implements Adapter {

if (currentStalledCount >= maxStalledCount) {
// Fail permanently - remove the job
await trx(this.#jobsTable)
.where('id', row.id)
.where('queue', queue)
.delete()
await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete()
} else {
// Recover: increment stalledCount and put back in pending
jobData.stalledCount = currentStalledCount + 1
Expand Down Expand Up @@ -534,9 +527,9 @@ export class KnexAdapter implements Adapter {
}

async getSchedule(id: string): Promise<ScheduleData | null> {
const row = (await this.#connection(this.#schedulesTable)
.where('id', id)
.first()) as ScheduleRow | undefined
const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as
| ScheduleRow
| undefined
if (!row) return null

return this.#rowToScheduleData(row)
Expand Down Expand Up @@ -565,16 +558,12 @@ export class KnexAdapter implements Adapter {
if (updates.runCount !== undefined) data.run_count = updates.runCount

if (Object.keys(data).length > 0) {
await this.#connection(this.#schedulesTable)
.where('id', id)
.update(data)
await this.#connection(this.#schedulesTable).where('id', id).update(data)
}
}

async deleteSchedule(id: string): Promise<void> {
await this.#connection(this.#schedulesTable)
.where('id', id)
.delete()
await this.#connection(this.#schedulesTable).where('id', id).delete()
}

async claimDueSchedule(): Promise<ScheduleData | null> {
Expand Down Expand Up @@ -629,13 +618,11 @@ export class KnexAdapter implements Adapter {
}

// Update atomically
await trx(this.#schedulesTable)
.where('id', row.id)
.update({
next_run_at: nextRunAt,
last_run_at: now,
run_count: newRunCount,
})
await trx(this.#schedulesTable).where('id', row.id).update({
next_run_at: nextRunAt,
last_run_at: now,
run_count: newRunCount,
})

// Return schedule data (before update state for payload)
return this.#rowToScheduleData(row)
Expand Down
10 changes: 10 additions & 0 deletions src/exceptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ export const E_JOB_NOT_FOUND = createError<[jobName: string]>(
'E_JOB_NOT_FOUND'
)

export const E_JOB_EXECUTION_NOT_FOUND = createError<[jobId: string]>(
'The job execution "%s" could not be found',
'E_JOB_EXECUTION_NOT_FOUND'
)

export const E_JOB_EXECUTION_FAILED = createError<[jobId: string]>(
'The job execution "%s" failed',
'E_JOB_EXECUTION_FAILED'
)

export const E_JOB_MAX_ATTEMPTS_REACHED = createError<[jobName: string]>(
'The job "%s" has reached the maximum number of retry attempts',
'E_JOB_MAX_ATTEMPTS_REACHED'
Expand Down
11 changes: 7 additions & 4 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import type { JobContext, JobOptions } from './types/main.js'
* }
* ```
*/
export abstract class Job<Payload = any> {
export abstract class Job<Payload = any, Output = any> {
#payload!: Payload
#context!: JobContext
#signal?: AbortSignal
Expand Down Expand Up @@ -173,12 +173,15 @@ export abstract class Job<Payload = any> {
static dispatch<T extends Job>(
this: abstract new (...args: any[]) => T,
payload: T extends Job<infer P> ? P : never
): JobDispatcher<T extends Job<infer P> ? P : never> {
): JobDispatcher<T extends Job<infer P> ? P : never, T extends Job<any, infer O> ? O : never> {
const jobClass = this as unknown as { options?: JobOptions; name: string }
const options = jobClass.options || {}
const jobName = options.name || this.name

const dispatcher = new JobDispatcher<T extends Job<infer P> ? P : never>(jobName, payload)
const dispatcher = new JobDispatcher<
T extends Job<infer P> ? P : never,
T extends Job<any, infer O> ? O : never
>(jobName, payload)

if (options.queue) {
dispatcher.toQueue(options.queue)
Expand Down Expand Up @@ -305,7 +308,7 @@ export abstract class Job<Payload = any> {
* }
* ```
*/
abstract execute(): Promise<void>
abstract execute(): Promise<Output>

/**
* Called when the job has permanently failed (after all retries exhausted).
Expand Down
41 changes: 38 additions & 3 deletions src/job_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { dispatchChannel } from './tracing_channels.js'
import type { Adapter } from './contracts/adapter.js'
import type { DispatchResult, Duration } from './types/main.js'
import type { JobDispatchMessage } from './types/tracing_channels.js'
import { setTimeout } from 'node:timers/promises'
import { parse } from './utils.js'
import { E_JOB_EXECUTION_FAILED, E_JOB_EXECUTION_NOT_FOUND } from './exceptions.ts'

/**
* Fluent builder for dispatching jobs to the queue.
Expand Down Expand Up @@ -39,9 +41,9 @@ import { parse } from './utils.js'
* await ReminderJob.dispatch({ userId: 123 }).in('24h')
* ```
*/
export class JobDispatcher<T> {
export class JobDispatcher<TPayload, TOutput> {
readonly #name: string
readonly #payload: T
readonly #payload: TPayload
#queue: string = 'default'
#adapter?: string | (() => Adapter)
#delay?: Duration
Expand All @@ -54,7 +56,7 @@ export class JobDispatcher<T> {
* @param name - The job class name (used to locate the class at runtime)
* @param payload - The data to pass to the job
*/
constructor(name: string, payload: T) {
constructor(name: string, payload: TPayload) {
this.#name = name
this.#payload = payload
}
Expand Down Expand Up @@ -211,6 +213,39 @@ export class JobDispatcher<T> {
return { jobId: id }
}

/**
* Dispatch the job to the queue and
* await for job to complete or fail.
*
* @param pollingInterval - Interval between each check
* @param signal - Optional signal to abort waiting
* @returns The job output
*/
async wait(pollingInterval: Duration = 2000, signal?: AbortSignal): Promise<TOutput> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to add a default timeout based on the job timeout option.
Could make DX simpler with an explicit timeout option (less flexible than signal)

const adapter = this.#getAdapterInstance()
const dispatchResult = await this.run()

while (true) {
signal?.throwIfAborted()

await setTimeout(parse(pollingInterval))

const job = await adapter.getJob(dispatchResult.jobId, this.#queue)

if (!job) {
throw new E_JOB_EXECUTION_NOT_FOUND([dispatchResult.jobId])
}

if (job.status === 'completed') {
return job.output
}

if (job.status === 'failed') {
throw new E_JOB_EXECUTION_FAILED([dispatchResult.jobId], { cause: job.error })
}
}
}

/**
* Thenable implementation for auto-dispatch when awaited.
*
Expand Down
4 changes: 2 additions & 2 deletions src/job_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class JobExecutionRuntime {
/**
* Execute a hydrated job instance and enforce the configured timeout.
*/
async execute(instance: Job, payload: unknown, context: JobContext): Promise<void> {
async execute(instance: Job, payload: unknown, context: JobContext): Promise<unknown> {
if (this.#timeout === undefined) {
instance.$hydrate(payload, context)
return instance.execute()
Expand All @@ -90,7 +90,7 @@ export class JobExecutionRuntime {
)

try {
await Promise.race([instance.execute(), abortPromise])
return Promise.race([instance.execute(), abortPromise])
} finally {
cleanupAbortListener()
}
Expand Down
6 changes: 2 additions & 4 deletions src/services/queue_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class QueueSchemaService {
table.string('queue', 255).notNullable()
table.enu('status', ['pending', 'active', 'delayed', 'completed', 'failed']).notNullable()
table.text('data').notNullable()
table.text('output').nullable()
table.bigint('score').unsigned().nullable()
table.string('worker_id', 255).nullable()
table.bigint('acquired_at').unsigned().nullable()
Expand Down Expand Up @@ -57,10 +58,7 @@ export class QueueSchemaService {
table.integer('run_count').unsigned().notNullable().defaultTo(0)
table.timestamp('next_run_at').nullable()
table.timestamp('last_run_at').nullable()
table
.timestamp('created_at')
.notNullable()
.defaultTo(this.#connection.fn.now())
table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now())
table.index(['status', 'next_run_at'])

extend?.(table)
Expand Down
2 changes: 2 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ export interface JobRecord {
finishedAt?: number
/** Error message (for failed jobs) */
error?: string
/** Serialized job output */
output?: any
}

/**
Expand Down
Loading
Loading