@LuisSantosJS LuisSantosJS commented Sep 2, 2025

Summary

• Remove retry limits for RabbitMQ message publishing to prevent event loss
• Add publisher confirms for guaranteed message delivery
• Enhance reconnection logic with exponential backoff and graceful shutdown

Changes

  • Unlimited retries: Replace 3-attempt limit with infinite retry loop until successful
    delivery
  • Publisher confirms: Enable confirmSelect() and waitForConfirms() for delivery
    guarantees
  • Persistent messages: Add persistent: true option for message durability
  • Better reconnection: Improved backoff strategy and connection state management
  • Graceful shutdown: Proper cleanup with connection/channel closing

Benefits

  • Zero event loss: Events are never discarded due to temporary connectivity issues
  • Delivery guarantee: Messages are confirmed by RabbitMQ before considering them sent
  • Improved resilience: Better handling of network interruptions and broker restarts
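
As a rough illustration of the delivery-guarantee point, the sketch below wraps a confirm-style publish in a promise that settles only once the broker has acknowledged or rejected the message. `ConfirmPublisher` and `publishConfirmed` are hypothetical names introduced for this example, loosely modeled on the per-message callback style of a confirm channel. This is not the code from the PR.

```typescript
// Hypothetical minimal shape of a confirm-capable channel (an assumption for
// this sketch). The broker's ack/nack arrives through a per-message callback.
// A real client may require a Node Buffer rather than a plain Uint8Array.
interface ConfirmPublisher {
  publish(
    exchange: string,
    routingKey: string,
    content: Uint8Array,
    options: { persistent: boolean },
    onConfirm: (err: Error | null) => void,
  ): boolean;
}

// Resolves only after the broker acknowledges the message; rejects on a nack.
function publishConfirmed(
  channel: ConfirmPublisher,
  exchange: string,
  routingKey: string,
  message: unknown,
): Promise<void> {
  const content = new TextEncoder().encode(JSON.stringify(message));
  return new Promise<void>((resolve, reject) => {
    // persistent: true asks the broker to write the message to disk.
    channel.publish(exchange, routingKey, content, { persistent: true }, (err) =>
      err ? reject(err) : resolve(),
    );
  });
}
```

Because each message gets its own callback, a caller can await one message at a time or keep several publishes in flight and await them together.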

Summary by Sourcery

Enhance RabbitMQ integration to ensure zero event loss and reliable delivery by removing fixed retry limits, enabling publisher confirms, making messages persistent, implementing robust exponential backoff reconnections, and adding graceful shutdown.

New Features:

  • Enable publisher confirms (confirmSelect and waitForConfirms) to guarantee message delivery
  • Mark messages as persistent to survive broker restarts
  • Add a shutdown method to gracefully close RabbitMQ channel and connection

Enhancements:

  • Replace fixed 3-attempt retry limit with infinite retry loops for publishing until success
  • Implement exponential backoff reconnection strategy with capped max delay
  • Automatically handle connection and channel errors by scheduling unlimited reconnect attempts
  • Cancel reconnection timers during shutdown to prevent further retries
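
The "exponential backoff with capped max delay" item can be sketched as a small pure function. The name `reconnectDelayMs` and the default values (5 seconds base, 120 seconds cap) are assumptions chosen for illustration; the PR's actual constants may differ.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt,
// never exceeding maxMs. Defaults are illustrative assumptions.
function reconnectDelayMs(attempt: number, baseMs = 5_000, maxMs = 120_000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * 2 ** exponent);
}
```

With these defaults the delays run 5 s, 10 s, 20 s, 40 s, 80 s and then stay at the 120 s cap.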

sourcery-ai bot commented Sep 2, 2025

Reviewer's Guide

This PR overhauls the RabbitMQ integration by removing retry limits in publishing and reconnection, adding publisher confirms with persistent messages, enhancing the reconnection logic with exponential backoff and robust error handlers, and introducing a graceful shutdown mechanism.

File-Level Changes

Change: Unlimited retry mechanism for publishing and reconnection
Details:
  • Replaced fixed 3-attempt loops with infinite while loops for publish retries
  • Removed maxReconnectAttempts limit and old retry counters
  • Implemented infinite reconnection attempts with exponential backoff delays
Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Change: Publisher confirms and persistent message delivery
Details:
  • Enabled channel.confirmSelect() and waitForConfirms() for guaranteed delivery
  • Added persistent: true option to publish calls
  • Wrapped publish calls with buffer checks and confirmation error handling
Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Change: Enhanced reconnection and error handling logic
Details:
  • Introduced reconnectAttempt, base/maxReconnectDelay, isReconnecting, isShuttingDown flags
  • Refactored event handlers with handleConnectionError and handleChannelError methods
  • Upgraded scheduleReconnect() to use exponential backoff and clear timers on shutdown
Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Change: Graceful shutdown support
Details:
  • Added shutdown() method to cancel reconnect timers and close channel/connection
  • Guarded reconnection logic with isShuttingDown to halt new attempts
  • Logged orderly shutdown steps for channel and connection cleanup
Files: src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
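
To make the interplay between the reconnect timer and the shutdown flag concrete, here is a minimal sketch of a scheduler that refuses overlapping attempts and stops once shutdown begins. `ReconnectScheduler` and its members are hypothetical names for this example. The connect function and delay policy are injected, so nothing here depends on a specific AMQP client, and the PR's controller may be structured differently.

```typescript
class ReconnectScheduler {
  private readonly connect: () => Promise<void>;
  private readonly delayFor: (attempt: number) => number;
  private attempt = 0;
  private timer: ReturnType<typeof setTimeout> | null = null;
  private connecting = false;
  private shuttingDown = false;

  constructor(connect: () => Promise<void>, delayFor: (attempt: number) => number) {
    this.connect = connect;
    this.delayFor = delayFor;
  }

  // Schedule one attempt unless one is pending, in flight, or shutdown started.
  schedule(): void {
    if (this.shuttingDown || this.connecting || this.timer !== null) return;
    const delay = this.delayFor(this.attempt++);
    this.timer = setTimeout(async () => {
      this.timer = null;
      if (this.shuttingDown) return; // re-check: shutdown may have begun while waiting
      this.connecting = true;
      let ok = false;
      try {
        await this.connect();
        ok = true;
      } catch {
        // swallow the error; the retry below handles it
      } finally {
        this.connecting = false;
      }
      if (ok) this.attempt = 0; // success resets the backoff
      else this.schedule(); // failure: try again with the next delay
    }, delay);
  }

  // Cancel any pending attempt and refuse new ones.
  shutdown(): void {
    this.shuttingDown = true;
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }

  get pending(): boolean {
    return this.timer !== null;
  }
}
```

Keeping the timer handle, the in-flight flag, and the shutdown flag in one place makes it easier to check that every exit path resets them.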

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • Consider adding an abort signal or exit condition to the infinite retry loops in `emit` so that a permanently unavailable channel doesn’t block the event loop or consume resources indefinitely.
  • Calling `waitForConfirms` after every `publish` can serialize message flow and hurt throughput – you might batch confirms or use a windowed confirm strategy for better performance.
  • Double-check that `isReconnecting` and `reconnectTimer` are always cleared/reset on successful reconnection to avoid overlapping timers or stale reconnect state in corner cases.
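
One possible shape for the exit condition mentioned in the first point is an unbounded retry loop that the caller can cancel through an `AbortSignal`. The helper names `sleep` and `publishUntilAborted`, and the fixed delay between attempts, are assumptions made for this sketch; `publishOnce` stands in for whatever single publish attempt the controller performs.

```typescript
// Sleep that resolves early (with false) when the signal aborts.
function sleep(ms: number, signal: AbortSignal): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    if (signal.aborted) return resolve(false);
    const onAbort = () => {
      clearTimeout(timer);
      resolve(false);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve(true);
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Retries `publishOnce` without an attempt limit, but stops when aborted.
// Returns true if the publish succeeded, false if the caller aborted first.
async function publishUntilAborted(
  publishOnce: () => Promise<void>,
  signal: AbortSignal,
  delayMs = 1_000,
): Promise<boolean> {
  while (!signal.aborted) {
    try {
      await publishOnce();
      return true;
    } catch {
      // Wait before the next attempt; leave the loop if aborted while waiting.
      if (!(await sleep(delayMs, signal))) break;
    }
  }
  return false;
}
```

A shutdown handler could call `abort()` on the matching `AbortController`, which ends every pending retry loop without dropping the "retry until delivered" behaviour during normal operation.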

## Individual Comments

### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:55` </location>
<code_context>
         vhost: url.pathname.slice(1) || '/',
         frameMax: frameMax,
-        heartbeat: 30, // Add heartbeat of 30 seconds
+        reconnect: true,
+        reconnectBackoffStrategy: 'linear',
+        reconnectExponentialLimit: 120000,
+        reconnectTimeInSeconds: 5,
       };
+      
</code_context>

<issue_to_address>
Unclear if amqp.connect supports custom reconnect options.

Please verify whether the amqp library actually supports these reconnect options. If not, ensure reconnection is managed by the controller instead.
</issue_to_address>

### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:111` </location>
<code_context>
-            autoDelete: false,
-          });
+            // Enable publisher confirms
+            channel.confirmSelect();

-          this.amqpConnection = connection;
</code_context>

<issue_to_address>
confirmSelect may not be available in all amqp libraries.

Ensure that channel.confirmSelect is supported by your AMQP library to avoid runtime errors.
</issue_to_address>

### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:62` </location>
<code_context>
-            local: 'RabbitmqController.connect',
-            message: 'Failed to connect to RabbitMQ',
-            error: error.message || error,
+      await new Promise<void>((resolve, reject) => {
+        amqp.connect(connectionOptions, (error, connection) => {
+          if (error) {
</code_context>

<issue_to_address>
waitForConfirms may block if no confirmation is received.

Add a timeout to waitForConfirms to prevent the emit method from hanging if confirmations are not received.

Suggested implementation:

```typescript
import { setTimeout as setNodeTimeout } from 'timers';
// ... other imports

```

```typescript
const WAIT_FOR_CONFIRMS_TIMEOUT_MS = 10000; // 10 seconds, adjust as needed

// ... inside RabbitmqController class

  async emit(message: any): Promise<void> {
    // ... existing code before waitForConfirms

    await new Promise<void>((resolve, reject) => {
      let settled = false;
      const timeout = setNodeTimeout(() => {
        if (!settled) {
          settled = true;
          this.logger.error('waitForConfirms timed out');
          reject(new Error('waitForConfirms timed out'));
        }
      }, WAIT_FOR_CONFIRMS_TIMEOUT_MS);

      channel.waitForConfirms()
        .then(() => {
          if (!settled) {
            settled = true;
            clearTimeout(timeout);
            resolve();
          }
        })
        .catch((err) => {
          if (!settled) {
            settled = true;
            clearTimeout(timeout);
            this.logger.error('waitForConfirms failed', err);
            reject(err);
          }
        });
    });

    // ... existing code after waitForConfirms
  }

```
</issue_to_address>
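
A more compact alternative to the hand-rolled `settled` flag is a generic timeout wrapper built on `Promise.race`. `withTimeout` is a hypothetical helper introduced for this sketch, not part of any AMQP library.

```typescript
// Rejects with a timeout error if `work` has not settled within `ms`.
function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
  });
  // Whichever promise settles first wins; the timer is cleared either way.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}
```

Assuming the channel exposes a promise-returning `waitForConfirms()`, usage would look like `await withTimeout(channel.waitForConfirms(), 10_000, 'waitForConfirms')`. A timeout only means no confirmation arrived in time, so the broker may still have accepted the message and a retry can produce a duplicate.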

### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:192` </location>
<code_context>
   }

-  private async ensureConnection(): Promise<boolean> {
-    if (!this.amqpChannel) {
-      this.logger.warn('AMQP channel is not available, attempting to reconnect...');
-      if (!this.isReconnecting) {
</code_context>

<issue_to_address>
Global queue initialization silently returns if channel unavailable.

Callers are not notified of initialization failure. Recommend propagating an error or using a callback to indicate failure.

Suggested implementation:

```typescript
  private set channel(channel: amqp.Channel) {
    this.amqpChannel = channel;
  }

  /**
   * Initializes the queue. Throws an error if the AMQP channel is not available.
   * Callers must handle the error to be notified of initialization failure.
   */
  public async initializeQueue(queueName: string): Promise<void> {
    if (!this.amqpChannel) {
      this.logger.warn('AMQP channel is not available, queue initialization failed.');
      throw new Error('AMQP channel is not available');
    }
    try {
      await this.amqpChannel.assertQueue(queueName, { durable: true });
      this.logger.info(`Queue "${queueName}" initialized successfully.`);
    } catch (err) {
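      // err is typed unknown under strict TypeScript, so narrow it before reading .message
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.error(`Failed to initialize queue "${queueName}": ${reason}`);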
      throw err;
    }
  }

```

- You will need to update all callers of `initializeQueue` to handle the thrown error, either with a try/catch block or by handling the rejected promise.
- If you have a callback-based pattern elsewhere, you can adapt the error propagation to use callbacks instead of throwing.
- Remove any old queue initialization logic that silently returns if the channel is unavailable.
</issue_to_address>

Comment on lines +55 to +58
reconnect: true,
reconnectBackoffStrategy: 'linear',
reconnectExponentialLimit: 120000,
reconnectTimeInSeconds: 5,

question: Unclear if amqp.connect supports custom reconnect options.

Please verify whether the amqp library actually supports these reconnect options. If not, ensure reconnection is managed by the controller instead.

autoDelete: false,
});
// Enable publisher confirms
channel.confirmSelect();

issue (bug_risk): confirmSelect may not be available in all amqp libraries.

Ensure that channel.confirmSelect is supported by your AMQP library to avoid runtime errors.

Comment on lines 331 to +340

await this.amqpChannel.bindQueue(queueName, exchangeName, eventName);

await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
// Publish with confirmation
const published = this.amqpChannel.publish(
exchangeName,
event,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);

suggestion (bug_risk): waitForConfirms may block if no confirmation is received.

Add a timeout to waitForConfirms to prevent the emit method from hanging if confirmations are not received.

// Check again that we are not shutting down
if (this.isShuttingDown) {
this.logger.info('🛑 System is shutting down during reconnect timer');
return;

suggestion (bug_risk): Global queue initialization silently returns if channel unavailable.

Callers are not notified of initialization failure. Recommend propagating an error or using a callback to indicate failure.

