Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ on:
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: write
id-token: write
steps:
- name: Checkout
uses: actions/checkout@v2
Expand All @@ -32,8 +35,7 @@ jobs:
id: release

- name: Publish to NPM Registry
run: cd build && npm publish --access public
run: cd build && npm publish --provenance --access public
if: steps.release.outputs.released == 'true'
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
name: Deploy
3,179 changes: 1,736 additions & 1,443 deletions package-lock.json

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@athenna/queue",
"version": "5.18.0",
"version": "5.19.0",
"description": "The Athenna queue handler.",
"license": "MIT",
"author": "João Lenon <lenon@athenna.io>",
Expand Down Expand Up @@ -55,20 +55,20 @@
"#tests": "./tests/index.js"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.859.0"
"@aws-sdk/client-sqs": "^3.1009.0"
},
"devDependencies": {
"@athenna/artisan": "^5.7.0",
"@athenna/common": "^5.14.0",
"@athenna/config": "^5.4.0",
"@athenna/database": "^5.27.0",
"@athenna/artisan": "^5.11.0",
"@athenna/common": "^5.30.0",
"@athenna/config": "^5.6.0",
"@athenna/database": "^5.39.0",
"@athenna/ioc": "^5.2.0",
"@athenna/logger": "^5.7.0",
"@athenna/test": "^5.5.0",
"@athenna/logger": "^5.14.0",
"@athenna/test": "^5.6.0",
"@athenna/tsconfig": "^5.0.0",
"@athenna/view": "^5.4.0",
"@typescript-eslint/eslint-plugin": "^8.38.0",
"@typescript-eslint/parser": "^8.38.0",
"@typescript-eslint/eslint-plugin": "^8.57.0",
"@typescript-eslint/parser": "^8.57.0",
"better-sqlite3": "^10.1.0",
"commitizen": "^4.3.1",
"cz-conventional-changelog": "^3.3.0",
Expand Down
60 changes: 12 additions & 48 deletions src/drivers/AwsSqsDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import {
ChangeMessageVisibilityCommand
} from '@aws-sdk/client-sqs'

import { Log } from '@athenna/logger'
import { createHash } from 'node:crypto'
import { Driver } from '#src/drivers/Driver'
import { Is, Options, Uuid } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { AwsSqsDriverExceptionHandler } from '#src/handlers/AwsSqsDriverExceptionHandler'
import { NotFifoSqsQueueTypeException } from '#src/exceptions/NotFifoSqsQueueTypeException'

export class AwsSqsDriver extends Driver<SQSClient> {
Expand All @@ -39,7 +39,7 @@ export class AwsSqsDriver extends Driver<SQSClient> {
/**
* Convert milliseconds to seconds.
*/
private msToS(v: number) {
public msToS(v: number) {
const s = Math.ceil(v / 1000)
return Math.max(0, Math.min(43200, s))
}
Expand Down Expand Up @@ -430,57 +430,21 @@ export class AwsSqsDriver extends Driver<SQSClient> {
this.msToS(this.noAckDelayMs + requeueJitterMs)
)
}
} catch (err) {
stopHeartbeat()

const receiveCount = Number(
job.metadata.Attributes?.ApproximateReceiveCount ?? '1'
)
const attempts = Math.max(this.attempts - receiveCount, 0)
const shouldRetry = attempts > 0

if (Config.is('worker.logger.prettifyException')) {
Log.channelOrVanilla('exception').error(
await err.toAthennaException().prettify()
)
} else {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
queue: this.queueName,
deadletter: this.deadletter,
name: err.name,
code: err.code,
help: err.help,
details: err.details,
metadata: err.metadata,
stack: err.stack,
job
})
}

if (shouldRetry) {
const delay = this.calculateBackoffDelay(job.attempts)

await this.changeJobVisibility(
job.id,
this.msToS(delay + requeueJitterMs)
)

return
}

if (this.deadletter) {
await this.sendJobToDLQ(job)
}

await this.ack(job.id)
} catch (error) {
await new AwsSqsDriverExceptionHandler().handle({
job,
error,
driver: this,
stopHeartbeat,
requeueJitterMs
})
}
}

/**
* Send a job to the deadletter quue.
*/
private async sendJobToDLQ(job: any) {
public async sendJobToDLQ(job: any) {
if (Is.Object(job.data)) {
job.data = JSON.stringify(job.data)
}
Expand All @@ -503,7 +467,7 @@ export class AwsSqsDriver extends Driver<SQSClient> {
/**
* Change the job visibility values in SQS.
*/
private async changeJobVisibility(id: string, seconds: number) {
public async changeJobVisibility(id: string, seconds: number) {
const cmd = new ChangeMessageVisibilityCommand({
QueueUrl: this.queueName,
ReceiptHandle: id,
Expand Down
63 changes: 9 additions & 54 deletions src/drivers/DatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
* file that was distributed with this source code.
*/

import { Log } from '@athenna/logger'
import { Config } from '@athenna/config'
import { Driver } from '#src/drivers/Driver'
import { Is, Options } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import type { DatabaseImpl } from '@athenna/database'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { DatabaseDriverExceptionHandler } from '#src/handlers/DatabaseDriverExceptionHandler'

export class DatabaseDriver extends Driver<DatabaseImpl> {
/**
Expand Down Expand Up @@ -229,12 +229,7 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
* ```
*/
public async length() {
const count = await this.client
.table(this.table)
.where('queue', this.queueName)
.count()

return parseInt(count)
return this.client.table(this.table).where('queue', this.queueName).count()
}

/**
Expand Down Expand Up @@ -356,53 +351,13 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
reservedUntil: job.reservedUntil
})
}
} catch (err) {
const shouldRetry = job.attempts > 0

if (Config.is('worker.logger.prettifyException')) {
Log.channelOrVanilla('exception').error(
await err.toAthennaException().prettify()
)
} else {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
queue: this.queueName,
deadletter: this.deadletter,
name: err.name,
code: err.code,
help: err.help,
details: err.details,
metadata: err.metadata,
stack: err.stack,
job
})
}

if (!shouldRetry) {
await this.ack(job.id)

if (this.deadletter) {
await this.client.table(this.table).create({
...job,
queue: this.deadletter,
reservedUntil: null,
attempts: 0
})
}

return
}

await this.client
.table(this.table)
.where('id', job.id)
.update({
reservedUntil: null,
availableAt:
Date.now() +
this.calculateBackoffDelay(job.attempts) +
requeueJitterMs
})
} catch (error) {
await new DatabaseDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
}
}
}
2 changes: 1 addition & 1 deletion src/drivers/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export abstract class Driver<Client = any> {

let { type, delay, jitter } = this.backoff

if (jitter < 0) {
if (!jitter || jitter < 0) {
jitter = 0
}

Expand Down
8 changes: 8 additions & 0 deletions src/drivers/FakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ export class FakeDriver {
*/
public static async ack() {}

/**
* Calculate the heartbeat delay. Used to define if job is still
* running.
*/
public static calculateHeartbeatDelay() {
return 0
}

/**
* Process the next job of the queue with a handler.
*
Expand Down
50 changes: 8 additions & 42 deletions src/drivers/MemoryDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
* file that was distributed with this source code.
*/

import { Log } from '@athenna/logger'
import { Driver } from '#src/drivers/Driver'
import { Options, Uuid } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { MemoryDriverExceptionHandler } from '#src/handlers/MemoryDriverExceptionHandler'

export class MemoryDriver extends Driver {
/**
Expand Down Expand Up @@ -280,47 +280,13 @@ export class MemoryDriver extends Driver {
job.reservedUntil = null
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs
}
} catch (err) {
const shouldRetry = job.attempts > 0

job.reservedUntil = null

if (Config.is('worker.logger.prettifyException')) {
Log.channelOrVanilla('exception').error(
await err.toAthennaException().prettify()
)
} else {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
queue: this.queueName,
deadletter: this.deadletter,
name: err.name,
code: err.code,
help: err.help,
details: err.details,
metadata: err.metadata,
stack: err.stack,
job
})
}

if (shouldRetry) {
job.availableAt =
Date.now() +
this.calculateBackoffDelay(job.attempts) +
requeueJitterMs

return
}

await this.ack(job.id)

if (this.deadletter) {
this.client.queues[this.deadletter].push({
...job,
attempts: 0
})
}
} catch (error) {
await new MemoryDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
}
}
}
Loading
Loading