-
Notifications
You must be signed in to change notification settings - Fork 5.2k
RabbitMQ reliability with unlimited retries and publisher confirms #1901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideThis PR overhauls the RabbitMQ integration by removing retry limits in publishing and reconnection, adding publisher confirms with persistent messages, enhancing the reconnection logic with exponential backoff and robust error handlers, and introducing a graceful shutdown mechanism. File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider adding an abort signal or exit condition to the infinite retry loops in
emitso that a permanently unavailable channel doesn’t block the event loop or consume resources indefinitely. - Calling
waitForConfirmsafter everypublishcan serialize message flow and hurt throughput – you might batch confirms or use a windowed confirm strategy for better performance. - Double-check that
isReconnectingandreconnectTimerare always cleared/reset on successful reconnection to avoid overlapping timers or stale reconnect state in corner cases.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider adding an abort signal or exit condition to the infinite retry loops in `emit` so that a permanently unavailable channel doesn’t block the event loop or consume resources indefinitely.
- Calling `waitForConfirms` after every `publish` can serialize message flow and hurt throughput – you might batch confirms or use a windowed confirm strategy for better performance.
- Double-check that `isReconnecting` and `reconnectTimer` are always cleared/reset on successful reconnection to avoid overlapping timers or stale reconnect state in corner cases.
## Individual Comments
### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:55` </location>
<code_context>
vhost: url.pathname.slice(1) || '/',
frameMax: frameMax,
- heartbeat: 30, // Add heartbeat of 30 seconds
+ reconnect: true,
+ reconnectBackoffStrategy: 'linear',
+ reconnectExponentialLimit: 120000,
+ reconnectTimeInSeconds: 5,
};
+
</code_context>
<issue_to_address>
Unclear if amqp.connect supports custom reconnect options.
Please verify whether the amqp library actually supports these reconnect options. If not, ensure reconnection is managed by the controller instead.
</issue_to_address>
### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:111` </location>
<code_context>
- autoDelete: false,
- });
+ // Enable publisher confirms
+ channel.confirmSelect();
- this.amqpConnection = connection;
</code_context>
<issue_to_address>
confirmSelect may not be available in all amqp libraries.
Ensure that channel.confirmSelect is supported by your AMQP library to avoid runtime errors.
</issue_to_address>
### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:62` </location>
<code_context>
- local: 'RabbitmqController.connect',
- message: 'Failed to connect to RabbitMQ',
- error: error.message || error,
+ await new Promise<void>((resolve, reject) => {
+ amqp.connect(connectionOptions, (error, connection) => {
+ if (error) {
</code_context>
<issue_to_address>
waitForConfirms may block if no confirmation is received.
Add a timeout to waitForConfirms to prevent the emit method from hanging if confirmations are not received.
Suggested implementation:
```typescript
import { setTimeout as setNodeTimeout } from 'timers';
// ... other imports
```
```typescript
const WAIT_FOR_CONFIRMS_TIMEOUT_MS = 10000; // 10 seconds, adjust as needed
// ... inside RabbitmqController class
async emit(message: any): Promise<void> {
// ... existing code before waitForConfirms
await new Promise<void>((resolve, reject) => {
let settled = false;
const timeout = setNodeTimeout(() => {
if (!settled) {
settled = true;
this.logger.error('waitForConfirms timed out');
reject(new Error('waitForConfirms timed out'));
}
}, WAIT_FOR_CONFIRMS_TIMEOUT_MS);
channel.waitForConfirms()
.then(() => {
if (!settled) {
settled = true;
clearTimeout(timeout);
resolve();
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timeout);
this.logger.error('waitForConfirms failed', err);
reject(err);
}
});
});
// ... existing code after waitForConfirms
}
```
</issue_to_address>
### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:192` </location>
<code_context>
}
- private async ensureConnection(): Promise<boolean> {
- if (!this.amqpChannel) {
- this.logger.warn('AMQP channel is not available, attempting to reconnect...');
- if (!this.isReconnecting) {
</code_context>
<issue_to_address>
Global queue initialization silently returns if channel unavailable.
Callers are not notified of initialization failure. Recommend propagating an error or using a callback to indicate failure.
Suggested implementation:
```typescript
private set channel(channel: amqp.Channel) {
this.amqpChannel = channel;
}
/**
* Initializes the queue. Throws an error if the AMQP channel is not available.
* Callers must handle the error to be notified of initialization failure.
*/
public async initializeQueue(queueName: string): Promise<void> {
if (!this.amqpChannel) {
this.logger.warn('AMQP channel is not available, queue initialization failed.');
throw new Error('AMQP channel is not available');
}
try {
await this.amqpChannel.assertQueue(queueName, { durable: true });
this.logger.info(`Queue "${queueName}" initialized successfully.`);
} catch (err) {
this.logger.error(`Failed to initialize queue "${queueName}": ${err.message}`);
throw err;
}
}
```
- You will need to update all callers of `initializeQueue` to handle the thrown error, either with a try/catch block or by handling the rejected promise.
- If you have a callback-based pattern elsewhere, you can adapt the error propagation to use callbacks instead of throwing.
- Remove any old queue initialization logic that silently returns if the channel is unavailable.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| reconnect: true, | ||
| reconnectBackoffStrategy: 'linear', | ||
| reconnectExponentialLimit: 120000, | ||
| reconnectTimeInSeconds: 5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Unclear if amqp.connect supports custom reconnect options.
Please verify whether the amqp library actually supports these reconnect options. If not, ensure reconnection is managed by the controller instead.
| autoDelete: false, | ||
| }); | ||
| // Enable publisher confirms | ||
| channel.confirmSelect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): confirmSelect may not be available in all amqp libraries.
Ensure that channel.confirmSelect is supported by your AMQP library to avoid runtime errors.
|
|
||
| await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); | ||
|
|
||
| await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); | ||
| // Publish with confirmation | ||
| const published = this.amqpChannel.publish( | ||
| exchangeName, | ||
| event, | ||
| Buffer.from(JSON.stringify(message)), | ||
| { persistent: true } | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): waitForConfirms may block if no confirmation is received.
Add a timeout to waitForConfirms to prevent the emit method from hanging if confirmations are not received.
Suggested implementation:
import { setTimeout as setNodeTimeout } from 'timers';
// ... other importsconst WAIT_FOR_CONFIRMS_TIMEOUT_MS = 10000; // 10 seconds, adjust as needed
// ... inside RabbitmqController class
async emit(message: any): Promise<void> {
// ... existing code before waitForConfirms
await new Promise<void>((resolve, reject) => {
let settled = false;
const timeout = setNodeTimeout(() => {
if (!settled) {
settled = true;
this.logger.error('waitForConfirms timed out');
reject(new Error('waitForConfirms timed out'));
}
}, WAIT_FOR_CONFIRMS_TIMEOUT_MS);
channel.waitForConfirms()
.then(() => {
if (!settled) {
settled = true;
clearTimeout(timeout);
resolve();
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timeout);
this.logger.error('waitForConfirms failed', err);
reject(err);
}
});
});
// ... existing code after waitForConfirms
}| // Verificar novamente se não está em shutdown | ||
| if (this.isShuttingDown) { | ||
| this.logger.info('🛑 Sistema em shutdown durante reconnect timer'); | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Global queue initialization silently returns if channel unavailable.
Callers are not notified of initialization failure. Recommend propagating an error or using a callback to indicate failure.
Suggested implementation:
private set channel(channel: amqp.Channel) {
this.amqpChannel = channel;
}
/**
* Initializes the queue. Throws an error if the AMQP channel is not available.
* Callers must handle the error to be notified of initialization failure.
*/
public async initializeQueue(queueName: string): Promise<void> {
if (!this.amqpChannel) {
this.logger.warn('AMQP channel is not available, queue initialization failed.');
throw new Error('AMQP channel is not available');
}
try {
await this.amqpChannel.assertQueue(queueName, { durable: true });
this.logger.info(`Queue "${queueName}" initialized successfully.`);
} catch (err) {
this.logger.error(`Failed to initialize queue "${queueName}": ${err.message}`);
throw err;
}
}- You will need to update all callers of
initializeQueueto handle the thrown error, either with a try/catch block or by handling the rejected promise. - If you have a callback-based pattern elsewhere, you can adapt the error propagation to use callbacks instead of throwing.
- Remove any old queue initialization logic that silently returns if the channel is unavailable.
Summary
• Remove retry limits for RabbitMQ message publishing to prevent event loss
• Add publisher confirms for guaranteed message delivery
• Enhance reconnection logic with exponential backoff and graceful shutdown
Changes
delivery
confirmSelect()andwaitForConfirms()for deliveryguarantees
persistent: trueoption for message durabilityBenefits
Summary by Sourcery
Enhance RabbitMQ integration to ensure zero event loss and reliable delivery by removing fixed retry limits, enabling publisher confirms, making messages persistent, implementing robust exponential backoff reconnections, and adding graceful shutdown.
New Features:
Enhancements: