Skip to content

feat: add zstd message compression codec for SNS and SQS#442

Open
irfanh94 wants to merge 10 commits into
kibertoad:mainfrom
irfanh94:main
Open

feat: add zstd message compression codec for SNS and SQS#442
irfanh94 wants to merge 10 commits into
kibertoad:mainfrom
irfanh94:main

Conversation

@irfanh94
Copy link
Copy Markdown
Contributor

@irfanh94 irfanh94 commented May 18, 2026

Summary

  • Adds a codec option to publisher and consumer options (currently supports 'zstd') that compresses outgoing messages using zstd before sending to SNS/SQS
  • Compressed messages are wrapped in a self-describing envelope { "__codec": "zstd", "__data": "<base64>" } so consumers auto-detect and decompress without requiring explicit codec configuration
  • Compression is skipped for offloaded payload pointers (large messages already handled by S3/GCS offloading)
  • Adds integration tests for both SQS and SNS→SQS codec round-trips, including auto-detection and external compressor compatibility

Changes

  • packages/core — new packages/core/lib/codec/messageCodec.ts with compressMessageBody / decompressMessageBody / isCodecEnvelope utilities using @mongodb-js/zstd; codec?: MessageCodec added to CommonQueueOptions
  • packages/sqsAbstractSqsPublisher.sendMessage() compresses body when codec is set; AbstractSqsConsumer.resolveMaybeOffloadedPayloadMessage() decompresses codec envelopes before schema validation
  • packages/snsAbstractSnsPublisher.sendMessage() compresses body when codec is set (decompression flows through the shared SQS consumer path)
  • packages/sqs, packages/sns — devDependencies changed from "*" to "workspace:*" for workspace packages to ensure consistent type resolution during development (pnpm replaces workspace:* with the concrete version on publish, no impact on npm consumers)

Test plan

  • pnpm --filter @message-queue-toolkit/sqs test — runs SqsPermissionConsumer.codec.spec.ts (4 tests: round-trip, sequence, external compressor, auto-detection)
  • pnpm --filter @message-queue-toolkit/sns test — runs SnsSqsPermissionConsumer.codec.spec.ts (3 tests: round-trip, sequence, auto-detection through SNS envelope)
  • Existing payload offloading and consumer tests continue to pass unmodified

Summary by CodeRabbit

  • New Features

    • Added message compression support using zstd codec for SNS and SQS publishers and consumers
    • Consumers automatically decompress messages via codec-envelope auto-detection, even without explicit codec configuration
    • Compression is applied before payload offloading decisions and supports self-describing envelopes for inline transmission
    • Requires Node.js 22.15.0 or later
  • Documentation

    • Updated SNS and SQS README files with message compression configuration examples and behavior documentation
    • Added compression interaction details with payload offloading in core documentation

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Warning

Rate limit exceeded

@irfanh94 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 17 minutes and 51 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a3b6b2ed-f873-4a37-af8c-2b927d215925

📥 Commits

Reviewing files that changed from the base of the PR and between ff779b5 and 989165f.

📒 Files selected for processing (4)
  • packages/codec/README.md
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
  • packages/sqs/test/utils/s3Utils.ts
📝 Walkthrough

Walkthrough

Adds zstd message compression support to SNS and SQS queue systems through a new codec package, with optional codec configuration on queue publishers/consumers, codec-aware payload offloading, self-describing codec envelopes, comprehensive integration tests, and benchmarking suite.

Changes

Message Codec Compression Feature

Layer / File(s) Summary
Core codec types and validation
packages/core/lib/codec/messageCodec.ts, packages/core/lib/index.ts
Defines MessageCodecEnum with ZSTD, CodecEnvelope type with __codec and __data, MessageCodecHandler interface, and isCodecEnvelope runtime validator; re-exports from core package.
Codec package implementation
packages/codec/lib/codec/codecHandler.ts, packages/codec/lib/index.ts, packages/codec/package.json, packages/codec/tsconfig.json, packages/codec/tsconfig.build.json, packages/codec/README.md
Creates @message-queue-toolkit/codec with ZstdCodecHandler delegating to Node.js zlib.zstd, resolveCodecHandler, compressMessageBody (JSON→envelope), buildCodecEnvelope (bytes→envelope), and decompressMessageBody (envelope→JSON).
Queue options codec configuration
packages/core/lib/types/queueOptionsTypes.ts, packages/core/lib/queues/AbstractQueueService.ts
Adds optional codec?: MessageCodec field to CommonQueueOptions with documentation; wires codec into AbstractQueueService protected property.
AbstractQueueService offloading refactor
packages/core/lib/queues/AbstractQueueService.ts
Replaces offloadMessagePayloadIfNeeded with offloadPayload (returns null or pointer) and adds offloadCompressedPayload for pre-compressed bytes; refactors buildPointer to optionally embed codec in payloadRef; extends retrieveOffloadedMessagePayload with optional decompress callback.
Payload reference schema codec support
packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts
Extends PAYLOAD_REF_SCHEMA with optional codec?: string field; documents compressed-binary storage requiring codec-based decompression.
Stream utilities for buffer handling
packages/core/lib/utils/streamUtils.ts
Adds streamWithKnownSizeToBuffer helper; refactors streamWithKnownSizeToString to delegate to buffer helper and convert to UTF-8.
SQS publisher message compression
packages/sqs/lib/sqs/AbstractSqsPublisher.ts
Updates publish with prepareOutgoingPayload that compresses JSON via compressMessageBody when codec is configured, then either offloads compressed bytes or returns pre-built codec envelope; sendMessage accepts optional preBuiltBody.
SQS consumer message decompression
packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Extends resolveMaybeOffloadedPayloadMessage to detect codec-envelope bodies via isCodecEnvelope and decompress; passes codec-aware decompression callback to retrieveOffloadedMessagePayload; handles decompression errors.
SQS test harness codec wiring
packages/sqs/test/consumers/SqsPermissionConsumer.ts, packages/sqs/test/publishers/SqsPermissionPublisher.ts
Extends consumer/publisher constructor options to accept and forward codec parameter to base implementations.
SNS publisher message compression
packages/sns/lib/sns/AbstractSnsPublisher.ts
Updates publish with prepareOutgoingPayload for codec compression; sendMessage accepts optional preBuiltBody to avoid re-stringification.
SNS consumer type signature
packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Tightens resolveMessage override return type to explicit Either for validation errors and resolved messages.
SNS test harness codec wiring
packages/sns/test/consumers/SnsSqsPermissionConsumer.ts, packages/sns/test/publishers/SnsPermissionPublisher.ts
Extends consumer/publisher constructor options to accept and forward codec parameter.
GCP Pub/Sub offloading refactor
packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts, packages/gcp-pubsub/package.json
Updates publish to use new offloadPayload method; updates workspace dependency reference.
AbstractQueueService offload regression tests
packages/core/test/queues/AbstractQueueService.offload.spec.ts
Targets offloadPayload instead of deprecated offloadMessagePayloadIfNeeded; verifies type field preservation.
SQS codec integration and auto-detection tests
packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
Vitest suite validating zstd compression via publisher, external compression, sequential messages, and auto-detection without explicit codec.
SQS payload offloading with codec tests
packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
Tests codec-enabled payload offloading to S3 and consumer decompression; validates auto-detection for codecs in payload pointers.
SNS codec integration and auto-detection tests
packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
Vitest suite validating zstd via SNS→SQS pipeline for single, multiple, and auto-detected messages.
SQS codec performance benchmarks
packages/sqs/bench/codec.bench.ts, packages/sqs/vitest.bench.config.ts
Measures codec compression overhead on publish and consume paths for small and large payloads; includes benchmark configuration.
Documentation updates
packages/codec/README.md, packages/core/README.md, packages/sqs/README.md, packages/sns/README.md
Comprehensive docs for codec package, compression behavior, SQS/SNS configuration, envelope format, and size limit interactions.
Package dependencies and CI configuration
packages/sns/package.json, packages/sqs/package.json, packages/gcp-pubsub/package.json, .github/workflows/ci.yml, biome.json
Adds @message-queue-toolkit/codec peerDependency; updates workspace references; adds bench script; updates CI and linter configuration.

Sequence Diagram

sequenceDiagram
  participant Publisher
  participant Compress
  participant Store as Payload Store
  participant Network as Network/Queue
  participant Consumer
  participant Decompress
  
  Publisher->>Compress: compressMessageBody(jsonString, codec)
  Compress-->>Publisher: envelope {__codec: 'zstd', __data: base64(...)}
  Publisher->>Network: send envelope string or offloaded pointer
  alt Offloaded (large compressed)
    Publisher->>Store: offloadCompressedPayload(msg, compressed, codec)
    Store-->>Publisher: pointer {codec: 'zstd', ...}
    Publisher->>Network: send pointer
  end
  Network->>Consumer: receive envelope or pointer
  alt Is CodecEnvelope
    Consumer->>Decompress: decompressMessageBody(envelope)
    Decompress-->>Consumer: parsed JSON
  else Is Offloaded Pointer
    Consumer->>Store: retrieveOffloadedMessagePayload(pointer, decompress)
    Store->>Decompress: decompress(codec, bytes)
    Decompress-->>Store: Buffer
    Store-->>Consumer: parsed JSON
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • kibertoad/message-queue-toolkit#429: Extends AbstractQueueService offloading logic to preserve message type field during offloading, which is also foundational to codec-aware offloading in this PR.
  • kibertoad/message-queue-toolkit#373: Modifies AbstractQueueService offloaded-payload flow to preserve type via messageTypePath/messageTypeResolver, sharing the same offloading code path as the main PR's codec metadata annotations.
  • kibertoad/message-queue-toolkit#385: Touches AbstractQueueService offloading behavior and payload-pointer metadata, overlapping with the main PR's refactored offloading/payloadRef construction paths.

Suggested labels

minor

Suggested reviewers

  • CarlosGamero
  • kjamrog

🐰 Hoppin' through the queues with zstd so fine,
Compressed messages travel end to end in line,
Envelopes self-describe their codec state,
Auto-detect decompression sealed by fate!
This codec patch bounces swift and light,
Message queue compression done just right! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add zstd message compression codec for SNS and SQS' clearly and specifically describes the main change: adding a zstd compression codec feature to SNS and SQS packages.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/core/lib/codec/messageCodec.ts`:
- Around line 14-21: isCodecEnvelope currently only checks presence of
CODEC_FIELD and DATA_FIELD and that CODEC_FIELD is a supported codec, but it
doesn't assert the type of the payload so a non-string __data will later blow up
in Buffer.from; update isCodecEnvelope to also verify that (value as
Record<string, unknown>)[DATA_FIELD] is a string (and optionally that (value as
Record<string, unknown>)[CODEC_FIELD] is a string) before returning true so
downstream code (e.g., Buffer.from usage) is guaranteed a string __data.

In `@packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts`:
- Around line 82-103: The test has a race because the original consumer started
in beforeEach remains running and can steal messages from the SQS queue; modify
the spec to stop the original consumer before creating/starting the
SnsSqsPermissionConsumer instance named autoConsumer (call the appropriate
close/shutdown on the existing consumer from beforeEach), then start
autoConsumer, publish and assert via
autoConsumer.handlerSpy.waitForMessageWithId, and finally restore/close
autoConsumer; reference the existing variables/instances consumer, autoConsumer,
SnsSqsPermissionConsumer, and handlerSpy.waitForMessageWithId to locate where to
add the consumer.close() call.

In `@packages/sqs/lib/sqs/AbstractSqsConsumer.ts`:
- Around line 896-904: The current logic replaces
resolveMessageResult.result.body with the decompressed payload (using
isCodecEnvelope and decompressMessageBody), which loses the original codec
envelope needed when republishing retries; instead, preserve the original
compressed envelope by not overwriting resolveMessageResult.result.body—store
the decompressed payload on a new field (e.g., decompressedBody) or attach it to
originalMessage so downstream processing can read the decompressed content while
any retry/republish logic still serializes the original codec envelope; keep the
existing try/catch and error handling (handleError and ABORT_EARLY_EITHER) but
ensure decompressMessageBody is only used to populate the new field and never
replaces the envelope used for republish.

In `@packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts`:
- Around line 104-122: Test is flaky because the original consumer started in
beforeEach (consumer) is still polling the same queueUrl when you create
autoConsumer, so the published message may be consumed by the original consumer
instead of autoConsumer; to fix, stop the original consumer before
creating/starting autoConsumer (call and await consumer.close(true) or
consumer.stop() as appropriate) so only autoConsumer is polling this queue for
the duration of this test, then publish and await
handlerSpy.waitForMessageWithId on autoConsumer and finally restart or clean up
the original consumer if needed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5cc1c3d3-5e4b-48db-97b1-d2d1d9701551

📥 Commits

Reviewing files that changed from the base of the PR and between e551bb5 and 0794af6.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/core/package.json
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
  • packages/sns/package.json
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sns/test/publishers/SnsPermissionPublisher.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
  • packages/sqs/test/publishers/SqsPermissionPublisher.ts
  • pnpm-workspace.yaml

Comment thread packages/core/lib/codec/messageCodec.ts
Comment thread packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
Comment thread packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Comment thread packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts Outdated
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
…ls, fix test races

- Move @mongodb-js/zstd out of core into sqs; core now only defines
  MessageCodecHandler interface + pure envelope types (CodecEnvelope,
  isCodecEnvelope, MessageCodec) with no native dependencies
- Add packages/sqs/lib/codec/sqsCodecHandler.ts: ZstdCodecHandler,
  resolveCodecHandler, and the concrete compressMessageBody /
  decompressMessageBody helpers
- AbstractSqsPublisher and AbstractSqsConsumer import from local codec;
  AbstractSnsPublisher imports compressMessageBody from @message-queue-toolkit/sqs
- Strengthen isCodecEnvelope to assert typeof __data === 'string' so
  Buffer.from downstream is guaranteed a string
- Fix race condition in SQS codec auto-detection test: use a dedicated
  queue (user_permissions_multi-auto-detect) instead of sharing the
  beforeEach consumer's queue, eliminating both the steal-race and the
  localstack long-poll timing issue
- Fix race condition in SNS codec auto-detection test: stop the original
  consumer before starting autoConsumer, reassign consumer = autoConsumer
  so afterEach handles cleanup without a double-close

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/codec/lib/codec/codecHandler.ts
Comment thread packages/core/lib/queues/AbstractQueueService.ts
Irfan Hodzic and others added 2 commits May 19, 2026 09:16
…tall

The native addon requires node-gyp compilation. pnpm install runs with
--ignore-scripts in CI, so the binary is never built. pnpm rebuild
explicitly compiles it regardless of that flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Measures wall-clock time and msg/s for 50 messages with and without
zstd compression across small (~80 B) and large (~6 KB) payloads.
Each run deletes its queues before and after so no resources are left
behind. Run with: pnpm --filter @message-queue-toolkit/sqs bench

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
…nd add codec documentation

Switch from @mongodb-js/zstd (native node-gyp addon requiring Python and
a C++ toolchain) to zlib.zstdCompress/zstdDecompress built into Node.js 22+.
This removes 24 transitive packages, drops the pnpm rebuild CI step, and
eliminates native build requirements for end users of the package.

Refactor MessageCodec to use MessageCodecEnum object pattern, enabling
MessageCodecEnum.ZSTD usage alongside the plain string literal.

Add JSDoc to MessageCodecEnum, MessageCodecHandler, and the codec option
in queueOptionsTypes. Add a Message Compression section to the SQS README
with publisher/consumer examples and auto-detection behaviour, and reference
it from the SNS README.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/sqs/lib/codec/sqsCodecHandler.ts`:
- Around line 6-7: The import eagerly promisifies
zlib.zstdCompress/zstdDecompress (zstdCompress, zstdDecompress) which will throw
on Node versions before 23.8.0; add a runtime guard that checks for
zlib.zstdCompress and zlib.zstdDecompress existence before calling promisify and
throw a clear error message if missing, and update packages/sqs/package.json to
include "engines": { "node": ">=23.8.0" } (or your chosen minimum) and align the
CI matrix to only run supported Node versions; reference the symbols
zstdCompress and zstdDecompress in your guard and update related tests/CI
configs accordingly.

In `@packages/sqs/README.md`:
- Around line 848-850: Update the compression notes that currently state "256
KB" to the correct AWS SQS maximum "1 MiB" so the text about compression being
applied after schema validation and before the SQS SendMessage call, and the
note about compressed payload size limits, match the rest of the README; ensure
the wording near MessageCodecEnum.ZSTD (and any mention of SQS 256 KB) is
replaced with "1 MiB" and is consistent with other occurrences in the document.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 18893cc0-2a22-4b04-9337-32b52eb05035

📥 Commits

Reviewing files that changed from the base of the PR and between 0794af6 and c1d83e7.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (16)
  • biome.json
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/README.md
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sqs/README.md
  • packages/sqs/bench/codec.bench.ts
  • packages/sqs/lib/codec/sqsCodecHandler.ts
  • packages/sqs/lib/index.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/vitest.bench.config.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (5)
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts

Comment thread packages/codec/lib/codec/codecHandler.ts
Comment thread packages/sqs/README.md
Comment on lines +848 to +850
- Compression is applied **after** schema validation and **before** the SQS `SendMessage` call.
- Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading).
- Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

What is the current maximum Amazon SQS message size according to AWS official documentation (SQS Developer Guide / API Reference)?

💡 Result:

According to official AWS documentation, the current maximum message size for Amazon SQS is 1 MiB (1,048,576 bytes) [1][2]. This limit applies to the total size of the message, which includes both the message body and any message attributes [3][4]. For scenarios requiring larger payloads, you can use the Amazon SQS Extended Client Library for Java or Python, which allows you to send messages up to 2 GB by storing the payload in Amazon S3 and sending a reference to that object via SQS [5][2]. Note that this 1 MiB limit was increased from the previous 256 KiB limit in August 2025 [6].

Citations:


🏁 Script executed:

# Check the file exists and read the specific lines
head -850 packages/sqs/README.md | tail -20

Repository: kibertoad/message-queue-toolkit

Length of output: 1088


🏁 Script executed:

# Search for references to message size, KB, MiB, limits in the README
rg -i "(message size|256|1\s*mib|256\s*k)" packages/sqs/README.md -n

Repository: kibertoad/message-queue-toolkit

Length of output: 749


🏁 Script executed:

# Get more context around lines 848-850 to understand the compression section
sed -n '840,860p' packages/sqs/README.md

Repository: kibertoad/message-queue-toolkit

Length of output: 915


Fix the incorrect SQS message size limit in the compression notes.

Line 849 states 256 KB, but this contradicts the 1 MiB limit documented throughout the rest of the README (lines 76, 385, 462, 768, 782, 790). The current AWS SQS maximum message size is 1 MiB.

Suggested doc fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- Compression is applied **after** schema validation and **before** the SQS `SendMessage` call.
- Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading).
- Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type.
- Compression is applied **after** schema validation and **before** the SQS `SendMessage` call.
- Compressed payloads are still subject to the SQS 1 MiB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
- Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/README.md` around lines 848 - 850, Update the compression notes
that currently state "256 KB" to the correct AWS SQS maximum "1 MiB" so the text
about compression being applied after schema validation and before the SQS
SendMessage call, and the note about compressed payload size limits, match the
rest of the README; ensure the wording near MessageCodecEnum.ZSTD (and any
mention of SQS 256 KB) is replaced with "1 MiB" and is consistent with other
occurrences in the document.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai is this still valid?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@irfanh94, yes, this comment is still valid. The README's "Message Compression" notes section still reads "SQS 256 KB message size limit", while:

  1. AWS increased the maximum message payload size from 256 KiB to 1 MiB on August 4, 2025, making the 256 KB reference outdated.
  2. The rest of the README already consistently uses 1 MiB (e.g., in the Payload Offloading section and the Features list).

The fix is a one-line update in the Notes bullet:

- - Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
+ - Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).

const zstdCompress = promisify(zlib.zstdCompress)
const zstdDecompress = promisify(zlib.zstdDecompress)

export class ZstdCodecHandler implements MessageCodecHandler {
Copy link
Copy Markdown
Collaborator

@CarlosGamero CarlosGamero May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amp or gcp-pubsup without having to reimplement it each time.

@kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support.

One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way.

Comment thread packages/sns/README.md
Comment thread pnpm-workspace.yaml Outdated
Move the zstd codec implementation (ZstdCodecHandler, compressMessageBody,
decompressMessageBody, resolveCodecHandler) from packages/sqs into a new
dedicated packages/codec package so any adapter can use compression without
depending on @message-queue-toolkit/sqs.

- Create packages/codec with package.json, tsconfigs, and lib/codec/codecHandler.ts
- Delete packages/sqs/lib/codec/sqsCodecHandler.ts
- Update sqs and sns to import from @message-queue-toolkit/codec
- Re-export codec functions from @message-queue-toolkit/sqs for backwards compatibility
- Add @message-queue-toolkit/codec as peer dependency in sqs and sns packages
- Remove @mongodb-js/zstd from pnpm-workspace.yaml allowBuilds (no longer used)
- Register packages/codec in CI PATH_TO_NAME map
- Update SQS and SNS READMEs to document codec as a separate peer dependency

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/core/lib/types/queueOptionsTypes.ts Outdated
Comment thread packages/core/lib/types/queueOptionsTypes.ts
options: SNSMessageOptions,
): Promise<void> {
const attributes = resolveOutgoingMessageAttributes<MessageAttributeValue>(payload)
const jsonBody = JSON.stringify(payload)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json-stream-stringify is a better option here, and compression method ideally should operate on streams, not on full content

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kibertoad Is it safe to use this package: https://www.npmjs.com/package/json-stream-stringify since its been 2 years since last update.

Irfan Hodzic and others added 3 commits May 19, 2026 10:57
zlib.zstdCompress/zstdDecompress were added in Node.js v22.15.0 and v23.8.0,
not v22.0.0. The previous "Node.js 22+" claim was incorrect and would cause a
cryptic TypeError at import time on v22.0.0-v22.14.x.

- Add runtime guard in codecHandler.ts that throws a clear error if zstd
  functions are missing, before promisify() is called
- Add engines: { node: ">=22.15.0" } to packages/codec/package.json
- Update all JSDoc and README references from "Node.js 22+" to ">=22.15.0"

CI matrix (22.x, 24.x) resolves to latest patches which are >=22.15.0 — no change needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes double compression: previously publish() delegated to
offloadMessagePayloadIfNeeded which compressed the payload to check the
size threshold, then returned the original message, and sendMessage()
compressed again.

Now when a codec is set, the message is compressed exactly once at
publish() entry point, regardless of whether offloading is also
configured. The same compressed Buffer is then either:
- stored in S3 and replaced with a pointer (if compressed size exceeds
  messageSizeThreshold), or
- wrapped in a codec envelope and sent inline (if it fits).

The payload is never compressed twice.

Key changes:
- codec: add buildCodecEnvelope(compressed, codec) to wrap pre-compressed
  bytes without re-compressing
- core: replace offloadMessagePayloadIfNeeded with three focused methods:
  - private buildPointer() — shared pointer construction logic
  - protected offloadPayload() — no-codec path, returns null if fits
  - protected offloadCompressedPayload() — codec path, always stores
- sqs/sns: restructure publish() via private prepareOutgoingPayload()
  that compresses once and branches; sendMessage() accepts preBuiltBody
  to skip re-serialization
- gcp-pubsub: migrate to offloadPayload(), pin core to workspace:*
- docs: update SQS, SNS, core, and codec READMEs to explain the single
  compression pass and how codec interacts with payload offloading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/core/lib/utils/streamUtils.ts (1)

13-14: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate that stream data fits within the expected size.

Buffer.copy() silently truncates if the source doesn't fit in the target buffer. If the stream contains more data than the provided size, line 13 will copy only what fits and line 14 will increment offset by the full chunk length, causing offset tracking to become inaccurate and subsequent chunks to be written at incorrect positions or dropped entirely. In the codec compression context, truncated compressed payloads will fail decompression.

🛡️ Proposed fix to detect and reject oversized streams
-    chunkBuffer.copy(buffer, offset)
-    offset += chunkBuffer.length
+    const bytesCopied = chunkBuffer.copy(buffer, offset)
+    if (bytesCopied !== chunkBuffer.length) {
+      throw new Error(
+        `Stream size exceeds expected size of ${size} bytes. Data truncation detected at offset ${offset}.`
+      )
+    }
+    offset += bytesCopied
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/lib/utils/streamUtils.ts` around lines 13 - 14, The code
currently copies chunkBuffer into buffer without checking bounds, then
increments offset by chunkBuffer.length which breaks offset tracking if the
stream exceeds the provided size; in the stream processing function in
streamUtils.ts (the block using variables buffer, offset, chunkBuffer and size)
add a pre-copy check: if offset + chunkBuffer.length > size, reject/throw an
error (or return a failed Promise) indicating the stream is oversized; otherwise
perform Buffer.copy and increment offset by the actual number of bytes copied
(or chunkBuffer.length if safe). Ensure the error/path prevents silent
truncation and surfaces the oversized-stream condition to callers.
packages/core/lib/queues/AbstractQueueService.ts (1)

835-860: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return an explicit error when codec metadata is present but no decompressor is provided.

At Line 835, if codec exists but decompress is undefined, execution falls through and tries JSON parsing compressed bytes (Line 858), which hides the real failure mode.

🛠️ Suggested fix
     const codec = parsedPayload.payloadRef?.codec
+    if (codec && !decompress) {
+      return {
+        error: new Error(
+          `Offloaded payload is compressed with codec "${codec}" but no decompressor callback was provided`,
+        ),
+      }
+    }
+
     if (codec && decompress) {
       try {
         const compressedBuffer = await streamWithKnownSizeToBuffer(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/lib/queues/AbstractQueueService.ts` around lines 835 - 860,
When parsedPayload.payloadRef?.codec is present but the decompress function is
undefined, the code must return an explicit error instead of falling through and
treating compressed bytes as JSON; update the branch around codec handling in
AbstractQueueService (the block using parsedPayload.payloadRef?.codec,
decompress, and streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to
detect codec && !decompress and immediately return an Error (with a clear
message and cause if appropriate) indicating "no decompressor provided for codec
<codec>" so the later JSON.parse path (using streamWithKnownSizeToString) is not
attempted on compressed data.
♻️ Duplicate comments (1)
packages/sqs/README.md (1)

872-872: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix incorrect size limit (duplicate) and markdown link syntax.

Two issues on this line:

  1. Incorrect SQS limit (duplicate of past review): States "256 KB" but should be "1 MiB" to match the rest of the README (lines 76, 385, 462, 768, 782, 790) and AWS documentation.

  2. Broken markdown link syntax: The link has backticks around the anchor: [Payload Offloading](`#payload-offloading`) which renders incorrectly. Should be [Payload Offloading](#payload-offloading).

📝 Proposed fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/README.md` at line 872, Update the sentence to use the correct
SQS size limit and fix the markdown link: replace "256 KB" with "1 MiB" to match
other README references and AWS docs, and change the link syntax from [Payload
Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`)
so the anchor renders correctly; ensure the note still mentions combining
compression with the Payload Offloading flow and that payloadRef.codec records
the algorithm.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/codec/README.md`:
- Around line 47-51: The example in README.md uses resolveCodecHandler but does
not import it; update the import list at the top (where buildCodecEnvelope is
imported) to also import resolveCodecHandler (and any other required symbols
like MessageCodecEnum if not present) so the snippet runs as-is; locate the
import statement that currently reads "import { buildCodecEnvelope } from
'`@message-queue-toolkit/codec`'" and add resolveCodecHandler to that named
import.

---

Outside diff comments:
In `@packages/core/lib/queues/AbstractQueueService.ts`:
- Around line 835-860: When parsedPayload.payloadRef?.codec is present but the
decompress function is undefined, the code must return an explicit error instead
of falling through and treating compressed bytes as JSON; update the branch
around codec handling in AbstractQueueService (the block using
parsedPayload.payloadRef?.codec, decompress, and
streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to detect codec &&
!decompress and immediately return an Error (with a clear message and cause if
appropriate) indicating "no decompressor provided for codec <codec>" so the
later JSON.parse path (using streamWithKnownSizeToString) is not attempted on
compressed data.

In `@packages/core/lib/utils/streamUtils.ts`:
- Around line 13-14: The code currently copies chunkBuffer into buffer without
checking bounds, then increments offset by chunkBuffer.length which breaks
offset tracking if the stream exceeds the provided size; in the stream
processing function in streamUtils.ts (the block using variables buffer, offset,
chunkBuffer and size) add a pre-copy check: if offset + chunkBuffer.length >
size, reject/throw an error (or return a failed Promise) indicating the stream
is oversized; otherwise perform Buffer.copy and increment offset by the actual
number of bytes copied (or chunkBuffer.length if safe). Ensure the error/path
prevents silent truncation and surfaces the oversized-stream condition to
callers.

---

Duplicate comments:
In `@packages/sqs/README.md`:
- Line 872: Update the sentence to use the correct SQS size limit and fix the
markdown link: replace "256 KB" with "1 MiB" to match other README references
and AWS docs, and change the link syntax from [Payload
Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`)
so the anchor renders correctly; ensure the note still mentions combining
compression with the Payload Offloading flow and that payloadRef.codec records
the algorithm.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e7a677e7-4dda-4dd8-8852-fcad4c20b50f

📥 Commits

Reviewing files that changed from the base of the PR and between c1d83e7 and ff779b5.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (28)
  • .github/workflows/ci.yml
  • packages/codec/README.md
  • packages/codec/lib/codec/codecHandler.ts
  • packages/codec/lib/index.ts
  • packages/codec/package.json
  • packages/codec/tsconfig.build.json
  • packages/codec/tsconfig.json
  • packages/core/README.md
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/core/lib/utils/streamUtils.ts
  • packages/core/test/queues/AbstractQueueService.offload.spec.ts
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts
  • packages/gcp-pubsub/package.json
  • packages/sns/README.md
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/package.json
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sqs/README.md
  • packages/sqs/lib/index.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
✅ Files skipped from review due to trivial changes (6)
  • packages/gcp-pubsub/package.json
  • packages/codec/tsconfig.json
  • packages/codec/lib/index.ts
  • packages/core/README.md
  • packages/codec/package.json
  • packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (8)
  • packages/sqs/lib/index.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
  • packages/sns/package.json
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts

Comment thread packages/codec/README.md Outdated
For the inline codec path, read the raw SQS message body from an isolated
queue (no consumer) using ReceiveMessageCommand and assert:
- body is a JSON codec envelope with __codec === 'zstd'
- __data decodes from base64 to a valid zstd frame (magic bytes 28 B5 2F FD)

For the codec + payload offloading path, assert:
- SQS message body is a plain JSON pointer (no __codec field), with
  payloadRef.codec === 'zstd' confirming which algorithm was used
- S3 object contains raw compressed binary, not a JSON envelope
  (first 4 bytes match the zstd magic number 0xFD2FB528)

Also add getObjectBuffer() to s3Utils for reading S3 objects as raw
Buffer without UTF-8 decoding, and fix missing resolveCodecHandler import
in codec README example snippet.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants