diff --git a/.github/wiki/Configuration.md b/.github/wiki/Configuration.md index 6a52acf..4909543 100644 --- a/.github/wiki/Configuration.md +++ b/.github/wiki/Configuration.md @@ -16,6 +16,10 @@ The following environment variables can be set to configure the `pop-queue` libr - `RATE_LIMIT`: The rate limit for job processing. - `CONCURRENCY`: The maximum number of concurrent jobs being processed. - `BACKOFF_STRATEGY`: The backoff strategy for job retries (e.g., `{"type":"exponential","delay":1000}`). +- `BATCH_SIZE`: The batch size for job processing. +- `PARALLEL_EXECUTION`: Whether to enable parallel execution of jobs. +- `REDIS_PIPELINING`: Whether to enable Redis pipelining. +- `NOTIFICATION_CONFIG`: The configuration for notifications. ## Configuration File @@ -39,7 +43,11 @@ Here is an example `config.json` file: "backoffStrategy": { "type": "exponential", "delay": 1000 - } + }, + "batchSize": 1000, + "parallelExecution": true, + "redisPipelining": true, + "notificationConfig": {} } ``` @@ -61,6 +69,10 @@ The following configuration values are required: - `rateLimit` - `concurrency` - `backoffStrategy` +- `batchSize` +- `parallelExecution` +- `redisPipelining` +- `notificationConfig` ## Example @@ -79,6 +91,10 @@ WORKER_TIMEOUT=30000 RATE_LIMIT=100 CONCURRENCY=5 BACKOFF_STRATEGY={"type":"exponential","delay":1000} +BATCH_SIZE=1000 +PARALLEL_EXECUTION=true +REDIS_PIPELINING=true +NOTIFICATION_CONFIG={} ``` 2. Create a `config.json` file in the root directory of the project and add the following configuration values: @@ -99,7 +115,11 @@ BACKOFF_STRATEGY={"type":"exponential","delay":1000} "backoffStrategy": { "type": "exponential", "delay": 1000 - } + }, + "batchSize": 1000, + "parallelExecution": true, + "redisPipelining": true, + "notificationConfig": {} } ``` diff --git a/.github/wiki/Error-Handling.md b/.github/wiki/Error-Handling.md index d3eadd1..d635e0a 100644 --- a/.github/wiki/Error-Handling.md +++ b/.github/wiki/Error-Handling.md @@ -51,6 +51,66 @@ app.post('/api/requeue-job', async (req, res) => { }); ``` +### Example: Registering a Worker + +**Endpoint:** `POST /api/register-worker` + +**Error Handling:** +- If an error occurs while registering the worker, a `500 Internal Server Error` status code is returned along with an error message. + +**Code:** +```javascript +app.post('/api/register-worker', async (req, res) => { + try { + await queue.registerWorker(); + res.status(200).json({ message: 'Worker registered successfully' }); + } catch (error) { + logger.error('Error registering worker:', error); + res.status(500).json({ error: 'Failed to register worker' }); + } +}); +``` + +### Example: Deregistering a Worker + +**Endpoint:** `POST /api/deregister-worker` + +**Error Handling:** +- If an error occurs while deregistering the worker, a `500 Internal Server Error` status code is returned along with an error message. + +**Code:** +```javascript +app.post('/api/deregister-worker', async (req, res) => { + try { + await queue.deregisterWorker(); + res.status(200).json({ message: 'Worker deregistered successfully' }); + } catch (error) { + logger.error('Error deregistering worker:', error); + res.status(500).json({ error: 'Failed to deregister worker' }); + } +}); +``` + +### Example: Redistributing Jobs + +**Endpoint:** `POST /api/redistribute-jobs` + +**Error Handling:** +- If an error occurs while redistributing jobs, a `500 Internal Server Error` status code is returned along with an error message. + +**Code:** +```javascript +app.post('/api/redistribute-jobs', async (req, res) => { + try { + await queue.redistributeJobs(); + res.status(200).json({ message: 'Jobs redistributed successfully' }); + } catch (error) { + logger.error('Error redistributing jobs:', error); + res.status(500).json({ error: 'Failed to redistribute jobs' }); + } +}); +``` + ## Error Handling in Queue Operations ### General Error Handling @@ -106,31 +166,42 @@ async now(job, name, identifier, score, priority = 0, delay = 0) { **Code:** ```javascript async run(name) { - let job = await this.pop(name); - if (!job) { + let jobs = await this.popBatch(name, this.batchSize); + if (!jobs.length) { let error = new Error(`No job for ${name}`); error.code = 404; throw error; } try { if (this.runners[name] && this.runners[name].fn) { - try { - let fnTimeout = setTimeout(() => { - throw new Error("Timeout"); - }, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000) - const isSuccess = await this.runners[name].fn(job); - if(isSuccess) { - await this.finish(job, name); + const promises = jobs.map(async (job) => { + try { + let fnTimeout = setTimeout(() => { + throw new Error("Timeout"); + }, (this.runners[name].options && this.runners[name].options.timeout) || 10 * 60 * 1000) + const isSuccess = await this.runners[name].fn(job); + if(isSuccess) { + await this.finish(job, name); + } + else{ + await this.fail(job, "Failed"); + } + clearTimeout(fnTimeout); + } catch(err) { + await this.fail(job, err.toString()); } - else{ - await this.fail(job, "Failed"); + }); + if (this.parallelExecution) { + await Promise.all(promises); + } else { + for (const promise of promises) { + await promise; } - clearTimeout(fnTimeout); - } catch(err) { - await this.fail(job, err.toString()); } } else { - await this.fail(job, `Runner ${name} not defined`); + for (const job of jobs) { + await this.fail(job, `Runner ${name} not defined`); + } throw new Error('Runner not defined'); } } catch(e) { @@ -139,3 +210,131 @@ async run(name) { } } ``` + +### Example: Failing a Job + +**Operation:** `queue.fail` + +**Error Handling:** +- If an error occurs while failing a job, the error is logged and the job is not moved to the dead letter queue. + +**Code:** +```javascript +async fail(document, reason, force) { + try { + if (document.attempts >= this.retries && !force) { + let finishTime = new Date(); + if (this.dbUrl.startsWith('postgres://')) { + const updateQuery = ` + UPDATE ${this.cName} + SET finishedAt = $1, status = 'failed', requeuedAt = $2, failedReason = COALESCE(failedReason, '[]'::jsonb) || $3::jsonb + WHERE identifier = $4; + `; + await this.db.query(updateQuery, [finishTime, new Date(), JSON.stringify({ reason, time: new Date() }), document.identifier]); + } else { + await this.db.collection(this.getDbCollectionName(document.name)).findOneAndUpdate({ + identifier: document.identifier + }, { + $push: { + failedReason: { + reason, + time: new Date() + }, + }, + $set: { + finishedAt: finishTime, + status: 'failed', + requeuedAt: new Date() + } + }); + } + await this.moveToDeadLetterQueue(document); + await this.notifySystems('jobFailed', document); + this.metrics.jobsFailed++; + this.emit('jobFailed', document); + } else { + if (this.dbUrl.startsWith('postgres://')) { + const updateQuery = ` + UPDATE ${this.cName} + SET pickedAt = NULL, finishedAt = NULL, status = NULL, duration = NULL, requeuedAt = $1, + failedReason = COALESCE(failedReason, '[]'::jsonb) || $2::jsonb, + runHistory = COALESCE(runHistory, '[]'::jsonb) || $3::jsonb + WHERE identifier = $4 + RETURNING *; + `; + const result = await this.db.query(updateQuery, [new Date(), JSON.stringify({ reason, time: new Date() }), JSON.stringify({ + pickedAt: document.pickedAt, + finishedAt: document.finishedAt, + status: document.status, + duration: document.duration + }), document.identifier]); + const newDocument = result.rows[0]; + if (newDocument) { + await sleep(2000); + await this.pushToQueue(newDocument, newDocument.name, newDocument.identifier); + } + } else { + let newDocument = await this.db.collection(this.getDbCollectionName(document.name)).findOneAndUpdate({ + identifier: document.identifier + }, { + $unset: { + pickedAt: 1, + finishedAt: 1, + status: 1, + duration: 1 + }, + $push: { + failedReason: { + reason, + time: new Date() + }, + runHistory: { + pickedAt: document.pickedAt, + finishedAt: document.finishedAt, + status: document.status, + duration: document.duration + } + }, + $set: { + requeuedAt: new Date() + } + }, {new: true}); + if(newDocument.value && newDocument.value.name) { + await sleep(2000); + await this.pushToQueue(newDocument.value, newDocument.value.name, newDocument.value.identifier); + } + } + } + } catch (e) { + console.log(e); + this.logger.error('Error failing job:', e); + } +} +``` + +### Example: Moving a Job to Dead Letter Queue + +**Operation:** `queue.moveToDeadLetterQueue` + +**Error Handling:** +- If an error occurs while moving a job to the dead letter queue, the error is logged and the job is not moved. + +**Code:** +```javascript +async moveToDeadLetterQueue(document) { + try { + if (this.dbUrl.startsWith('postgres://')) { + const insertQuery = ` + INSERT INTO dead_letter_queue (data, createdAt, name, identifier, priority, delay, pickedAt, finishedAt, attempts, status, duration, requeuedAt, failedReason, runHistory) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14); + `; + await this.db.query(insertQuery, [document.data, document.createdAt, document.name, document.identifier, document.priority, document.delay, document.pickedAt, document.finishedAt, document.attempts, document.status, document.duration, document.requeuedAt, document.failedReason, document.runHistory]); + } else { + await this.db.collection('dead_letter_queue').insertOne(document); + } + } catch (e) { + console.log(e); + this.logger.error('Error moving job to dead letter queue:', e); + } +} +``` diff --git a/config/config.js b/config/config.js index 2bd2401..c79ba5e 100644 --- a/config/config.js +++ b/config/config.js @@ -18,7 +18,8 @@ let config = { backoffStrategy: process.env.BACKOFF_STRATEGY || { type: 'exponential', delay: 1000 }, batchSize: process.env.BATCH_SIZE || 1000, parallelExecution: process.env.PARALLEL_EXECUTION || true, - redisPipelining: process.env.REDIS_PIPELINING || true + redisPipelining: process.env.REDIS_PIPELINING || true, + notificationConfig: process.env.NOTIFICATION_CONFIG || {} }; if (fs.existsSync(configPath)) { @@ -27,7 +28,7 @@ if (fs.existsSync(configPath)) { } // Validate configuration values -const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy', 'batchSize', 'parallelExecution', 'redisPipelining']; +const requiredConfigKeys = ['dbUrl', 'redisUrl', 'memcachedUrl', 'postgresUrl', 'dbName', 'collectionName', 'retries', 'workerId', 'workerTimeout', 'rateLimit', 'concurrency', 'backoffStrategy', 'batchSize', 'parallelExecution', 'redisPipelining', 'notificationConfig']; requiredConfigKeys.forEach(key => { if (!config[key]) { throw new Error(`Missing required configuration value: ${key}`); @@ -56,6 +57,9 @@ requiredConfigKeys.forEach(key => { if (key === 'redisPipelining' && typeof config[key] !== 'boolean') { throw new Error(`Invalid configuration value for ${key}: must be a boolean`); } + if (key === 'notificationConfig' && typeof config[key] !== 'object') { + throw new Error(`Invalid configuration value for ${key}: must be an object`); + } }); module.exports = config; diff --git a/pop-queue/jobExecution.js b/pop-queue/jobExecution.js index 0dd28a9..02eb313 100644 --- a/pop-queue/jobExecution.js +++ b/pop-queue/jobExecution.js @@ -42,7 +42,6 @@ async function pushToBatchQueue(redisClient, documents, name) { async function popBatch(redisClient, redlock, name, batchSize) { try { - // const lock = await redlock.acquire([`locks:pop:queue:${name}`], 1000); const pipeline = redisClient.pipeline(); for (let i = 0; i < batchSize; i++) { pipeline.zpopmin(`pop:queue:${name}`, 1); @@ -82,7 +81,6 @@ async function popBatch(redisClient, redlock, name, batchSize) { } jobs.push(document); } - // await lock.unlock(); return jobs; } catch(err) { console.log("error parsing doc from redis", err); @@ -92,12 +90,10 @@ async function popBatch(redisClient, redlock, name, batchSize) { async function pop(redisClient, redlock, name) { try { - // const lock = await redlock.acquire([`locks:pop:queue:${name}`], 1000); let stringDocument = await redisClient.zpopmin(`pop:queue:${name}`, 1); let valueDocument = await redisClient.get(`pop:queue:${name}:${stringDocument[0]}`); if (!valueDocument || stringDocument.length == 0) { console.log("no document in redis"); - // await lock.unlock(); return null; } let document = parseDocFromRedis(valueDocument); @@ -122,7 +118,6 @@ async function pop(redisClient, redlock, name) { } }); } - // await lock.unlock(); return document; } catch(err) { console.log("error parsing doc from redis", err); @@ -280,4 +275,4 @@ module.exports = { finish, fail, moveToDeadLetterQueue -}; \ No newline at end of file +}; diff --git a/pop-queue/jobManagement.js b/pop-queue/jobManagement.js index cd1c0f3..32dcb65 100644 --- a/pop-queue/jobManagement.js +++ b/pop-queue/jobManagement.js @@ -129,73 +129,7 @@ class PopQueue extends EventEmitter { } async connect() { - await Promise.all([this.connectDb(), this.connectRedis()]); - } - - async connectDb() { - try { - if (this.dbUrl.startsWith('postgres://')) { - this.db = await connectDb(this.dbUrl, this.dbName); - console.log('PostgreSQL connected'); - await this.setupPostgresSchema(); - } else { - this.db = await connectDb(this.dbUrl, this.dbName); - console.log('MongoDB connected'); - if (this.mongoShardConfig) { - await this.db.admin().command({ enableSharding: this.dbName }); - await this.db.admin().command({ shardCollection: `${this.dbName}.${this.cName}`, key: this.mongoShardConfig }); - console.log('MongoDB sharding enabled'); - } - } - } catch (e) { - console.log(e); - this.logger.error('Error connecting to database:', e); - } - } - - async connectRedis() { - try { - if (this.redisClusterConfig) { - this.redisClient = new Redis.Cluster(this.redisClusterConfig); - console.log('Redis cluster connected'); - } else if (this.redis.startsWith('memcached://')) { - this.redisClient = await memcachedClient(this.redis); - console.log('Memcached connected'); - } else { - this.redisClient = new Redis(this.redis); - console.log('Redis connected'); - } - // this.redlock = new Redlock([this.redisClient], { - // retryCount: 10, - // retryDelay: 200 - // }); - } catch (e) { - console.log(e); - this.logger.error('Error connecting to Redis:', e); - } - } - - async setupPostgresSchema() { - const createTableQuery = ` - CREATE TABLE IF NOT EXISTS ${this.cName} ( - id SERIAL PRIMARY KEY, - data JSONB, - createdAt TIMESTAMP, - name VARCHAR(255), - identifier VARCHAR(255) UNIQUE, - priority INT, - delay INT, - pickedAt TIMESTAMP, - finishedAt TIMESTAMP, - attempts INT DEFAULT 0, - status VARCHAR(50), - duration INT, - requeuedAt TIMESTAMP, - failedReason JSONB, - runHistory JSONB - ); - `; - await this.db.query(createTableQuery); + await Promise.all([connectDb(this.dbUrl, this.dbName, this.mongoShardConfig), connectRedis(this.redis, this.redisClusterConfig)]); } async now(job, name, identifier, score, priority = 0, delay = 0) { @@ -429,4 +363,4 @@ class PopQueue extends EventEmitter { module.exports = { PopQueue -}; \ No newline at end of file +}; diff --git a/services/api.js b/services/api.js index d4d0d17..be17dc8 100644 --- a/services/api.js +++ b/services/api.js @@ -259,7 +259,10 @@ const popqueueProto = grpc.loadPackageDefinition(packageDefinition).popqueue; function getJobDetails(call, callback) { queue.getCurrentQueue('myJob') .then(jobDetails => callback(null, { jobDetails })) - .catch(error => callback(error)); + .catch(error => { + logger.error('Error fetching job details:', error); + callback(error); + }); } function requeueJob(call, callback) { @@ -272,25 +275,37 @@ function requeueJob(call, callback) { } queue.requeueJob('myJob', jobId) .then(() => callback(null, { message: 'Job requeued successfully' })) - .catch(error => callback(error)); + .catch(error => { + logger.error('Error requeuing job:', error); + callback(error); + }); } function registerWorker(call, callback) { queue.registerWorker() .then(() => callback(null, { message: 'Worker registered successfully' })) - .catch(error => callback(error)); + .catch(error => { + logger.error('Error registering worker:', error); + callback(error); + }); } function deregisterWorker(call, callback) { queue.deregisterWorker() .then(() => callback(null, { message: 'Worker deregistered successfully' })) - .catch(error => callback(error)); + .catch(error => { + logger.error('Error deregistering worker:', error); + callback(error); + }); } function redistributeJobs(call, callback) { queue.redistributeJobs() .then(() => callback(null, { message: 'Jobs redistributed successfully' })) - .catch(error => callback(error)); + .catch(error => { + logger.error('Error redistributing jobs:', error); + callback(error); + }); } const server = new grpc.Server(); diff --git a/services/job-failure.js b/services/job-failure.js index 953e0f9..4769a88 100644 --- a/services/job-failure.js +++ b/services/job-failure.js @@ -1,6 +1,17 @@ const { sleep } = require('../utils/helpers'); +const winston = require('winston'); -async function failJob(document, reason, force, db, dbUrl, retries, redisClient, redlock, logger) { +const logger = winston.createLogger({ + level: 'info', + format: winston.format.json(), + transports: [ + new winston.transports.Console(), + new winston.transports.File({ filename: 'error.log', level: 'error' }), + new winston.transports.File({ filename: 'combined.log' }) + ] +}); + +async function failJob(document, reason, force, db, dbUrl, retries, redisClient, redlock) { try { if (document.attempts >= retries && !force) { let finishTime = new Date(); @@ -28,7 +39,7 @@ async function failJob(document, reason, force, db, dbUrl, retries, redisClient, } }); } - await moveToDeadLetterQueue(document, db, dbUrl, logger); + await moveToDeadLetterQueue(document, db, dbUrl); await notifySystems('jobFailed', document); this.metrics.jobsFailed++; this.emit('jobFailed', document); @@ -51,7 +62,7 @@ async function failJob(document, reason, force, db, dbUrl, retries, redisClient, const newDocument = result.rows[0]; if (newDocument) { await sleep(2000); - await pushToQueue(newDocument, newDocument.name, newDocument.identifier, redisClient, redlock, logger); + await pushToQueue(newDocument, newDocument.name, newDocument.identifier, redisClient, redlock); } } else { let newDocument = await db.collection(document.cName).findOneAndUpdate({ @@ -81,7 +92,7 @@ async function failJob(document, reason, force, db, dbUrl, retries, redisClient, }, {new: true}); if(newDocument.value && newDocument.value.name) { await sleep(2000); - await pushToQueue(newDocument.value, newDocument.value.name, newDocument.value.identifier, redisClient, redlock, logger); + await pushToQueue(newDocument.value, newDocument.value.name, newDocument.value.identifier, redisClient, redlock); } } } @@ -91,7 +102,7 @@ async function failJob(document, reason, force, db, dbUrl, retries, redisClient, } } -async function moveToDeadLetterQueue(document, db, dbUrl, logger) { +async function moveToDeadLetterQueue(document, db, dbUrl) { try { if (dbUrl.startsWith('postgres://')) { const insertQuery = ` @@ -108,7 +119,7 @@ async function moveToDeadLetterQueue(document, db, dbUrl, logger) { } } -async function pushToQueue(document, name, identifier, redisClient, redlock, logger) { +async function pushToQueue(document, name, identifier, redisClient, redlock) { try { const lock = await redlock.lock(`locks:pop:queue:${name}`, 1000); const score = new Date().getTime() + document.delay - document.priority;