feat: add zstd message compression codec for SNS and SQS#442
Conversation
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
📝 WalkthroughWalkthroughAdds zstd message compression support to SNS and SQS queue systems through a new codec package, with optional codec configuration on queue publishers/consumers, codec-aware payload offloading, self-describing codec envelopes, comprehensive integration tests, and benchmarking suite. ChangesMessage Codec Compression Feature
Sequence DiagramsequenceDiagram
participant Publisher
participant Compress
participant Store as Payload Store
participant Network as Network/Queue
participant Consumer
participant Decompress
Publisher->>Compress: compressMessageBody(jsonString, codec)
Compress-->>Publisher: envelope {__codec: 'zstd', __data: base64(...)}
Publisher->>Network: send envelope string or offloaded pointer
alt Offloaded (large compressed)
Publisher->>Store: offloadCompressedPayload(msg, compressed, codec)
Store-->>Publisher: pointer {codec: 'zstd', ...}
Publisher->>Network: send pointer
end
Network->>Consumer: receive envelope or pointer
alt Is CodecEnvelope
Consumer->>Decompress: decompressMessageBody(envelope)
Decompress-->>Consumer: parsed JSON
else Is Offloaded Pointer
Consumer->>Store: retrieveOffloadedMessagePayload(pointer, decompress)
Store->>Decompress: decompress(codec, bytes)
Decompress-->>Store: Buffer
Store-->>Consumer: parsed JSON
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/core/lib/codec/messageCodec.ts`:
- Around line 14-21: isCodecEnvelope currently only checks presence of
CODEC_FIELD and DATA_FIELD and that CODEC_FIELD is a supported codec, but it
doesn't assert the type of the payload so a non-string __data will later blow up
in Buffer.from; update isCodecEnvelope to also verify that (value as
Record<string, unknown>)[DATA_FIELD] is a string (and optionally that (value as
Record<string, unknown>)[CODEC_FIELD] is a string) before returning true so
downstream code (e.g., Buffer.from usage) is guaranteed a string __data.
In `@packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts`:
- Around line 82-103: The test has a race because the original consumer started
in beforeEach remains running and can steal messages from the SQS queue; modify
the spec to stop the original consumer before creating/starting the
SnsSqsPermissionConsumer instance named autoConsumer (call the appropriate
close/shutdown on the existing consumer from beforeEach), then start
autoConsumer, publish and assert via
autoConsumer.handlerSpy.waitForMessageWithId, and finally restore/close
autoConsumer; reference the existing variables/instances consumer, autoConsumer,
SnsSqsPermissionConsumer, and handlerSpy.waitForMessageWithId to locate where to
add the consumer.close() call.
In `@packages/sqs/lib/sqs/AbstractSqsConsumer.ts`:
- Around line 896-904: The current logic replaces
resolveMessageResult.result.body with the decompressed payload (using
isCodecEnvelope and decompressMessageBody), which loses the original codec
envelope needed when republishing retries; instead, preserve the original
compressed envelope by not overwriting resolveMessageResult.result.body—store
the decompressed payload on a new field (e.g., decompressedBody) or attach it to
originalMessage so downstream processing can read the decompressed content while
any retry/republish logic still serializes the original codec envelope; keep the
existing try/catch and error handling (handleError and ABORT_EARLY_EITHER) but
ensure decompressMessageBody is only used to populate the new field and never
replaces the envelope used for republish.
In `@packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts`:
- Around line 104-122: Test is flaky because the original consumer started in
beforeEach (consumer) is still polling the same queueUrl when you create
autoConsumer, so the published message may be consumed by the original consumer
instead of autoConsumer; to fix, stop the original consumer before
creating/starting autoConsumer (call and await consumer.close(true) or
consumer.stop() as appropriate) so only autoConsumer is polling this queue for
the duration of this test, then publish and await
handlerSpy.waitForMessageWithId on autoConsumer and finally restart or clean up
the original consumer if needed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5cc1c3d3-5e4b-48db-97b1-d2d1d9701551
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (18)
packages/core/lib/codec/messageCodec.tspackages/core/lib/index.tspackages/core/lib/queues/AbstractQueueService.tspackages/core/lib/types/queueOptionsTypes.tspackages/core/package.jsonpackages/sns/lib/sns/AbstractSnsPublisher.tspackages/sns/lib/sns/AbstractSnsSqsConsumer.tspackages/sns/package.jsonpackages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.tspackages/sns/test/consumers/SnsSqsPermissionConsumer.tspackages/sns/test/publishers/SnsPermissionPublisher.tspackages/sqs/lib/sqs/AbstractSqsConsumer.tspackages/sqs/lib/sqs/AbstractSqsPublisher.tspackages/sqs/package.jsonpackages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.tspackages/sqs/test/consumers/SqsPermissionConsumer.tspackages/sqs/test/publishers/SqsPermissionPublisher.tspnpm-workspace.yaml
…ls, fix test races - Move @mongodb-js/zstd out of core into sqs; core now only defines MessageCodecHandler interface + pure envelope types (CodecEnvelope, isCodecEnvelope, MessageCodec) with no native dependencies - Add packages/sqs/lib/codec/sqsCodecHandler.ts: ZstdCodecHandler, resolveCodecHandler, and the concrete compressMessageBody / decompressMessageBody helpers - AbstractSqsPublisher and AbstractSqsConsumer import from local codec; AbstractSnsPublisher imports compressMessageBody from @message-queue-toolkit/sqs - Strengthen isCodecEnvelope to assert typeof __data === 'string' so Buffer.from downstream is guaranteed a string - Fix race condition in SQS codec auto-detection test: use a dedicated queue (user_permissions_multi-auto-detect) instead of sharing the beforeEach consumer's queue, eliminating both the steal-race and the localstack long-poll timing issue - Fix race condition in SNS codec auto-detection test: stop the original consumer before starting autoConsumer, reassign consumer = autoConsumer so afterEach handles cleanup without a double-close Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tall The native addon requires node-gyp compilation. pnpm install runs with --ignore-scripts in CI, so the binary is never built. pnpm rebuild explicitly compiles it regardless of that flag. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Measures wall-clock time and msg/s for 50 messages with and without zstd compression across small (~80 B) and large (~6 KB) payloads. Each run deletes its queues before and after so no resources are left behind. Run with: pnpm --filter @message-queue-toolkit/sqs bench Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nd add codec documentation Switch from @mongodb-js/zstd (native node-gyp addon requiring Python and a C++ toolchain) to zlib.zstdCompress/zstdDecompress built into Node.js 22+. This removes 24 transitive packages, drops the pnpm rebuild CI step, and eliminates native build requirements for end users of the package. Refactor MessageCodec to use MessageCodecEnum object pattern, enabling MessageCodecEnum.ZSTD usage alongside the plain string literal. Add JSDoc to MessageCodecEnum, MessageCodecHandler, and the codec option in queueOptionsTypes. Add a Message Compression section to the SQS README with publisher/consumer examples and auto-detection behaviour, and reference it from the SNS README. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/sqs/lib/codec/sqsCodecHandler.ts`:
- Around line 6-7: The import eagerly promisifies
zlib.zstdCompress/zstdDecompress (zstdCompress, zstdDecompress) which will throw
on Node versions before 23.8.0; add a runtime guard that checks for
zlib.zstdCompress and zlib.zstdDecompress existence before calling promisify and
throw a clear error message if missing, and update packages/sqs/package.json to
include "engines": { "node": ">=23.8.0" } (or your chosen minimum) and align the
CI matrix to only run supported Node versions; reference the symbols
zstdCompress and zstdDecompress in your guard and update related tests/CI
configs accordingly.
In `@packages/sqs/README.md`:
- Around line 848-850: Update the compression notes that currently state "256
KB" to the correct AWS SQS maximum "1 MiB" so the text about compression being
applied after schema validation and before the SQS SendMessage call, and the
note about compressed payload size limits, match the rest of the README; ensure
the wording near MessageCodecEnum.ZSTD (and any mention of SQS 256 KB) is
replaced with "1 MiB" and is consistent with other occurrences in the document.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 18893cc0-2a22-4b04-9337-32b52eb05035
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (16)
biome.jsonpackages/core/lib/codec/messageCodec.tspackages/core/lib/index.tspackages/core/lib/types/queueOptionsTypes.tspackages/sns/README.mdpackages/sns/lib/sns/AbstractSnsPublisher.tspackages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.tspackages/sqs/README.mdpackages/sqs/bench/codec.bench.tspackages/sqs/lib/codec/sqsCodecHandler.tspackages/sqs/lib/index.tspackages/sqs/lib/sqs/AbstractSqsConsumer.tspackages/sqs/lib/sqs/AbstractSqsPublisher.tspackages/sqs/package.jsonpackages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.tspackages/sqs/vitest.bench.config.ts
✅ Files skipped from review due to trivial changes (1)
- packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (5)
- packages/core/lib/types/queueOptionsTypes.ts
- packages/sns/lib/sns/AbstractSnsPublisher.ts
- packages/sqs/lib/sqs/AbstractSqsPublisher.ts
- packages/sqs/package.json
- packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
| - Compression is applied **after** schema validation and **before** the SQS `SendMessage` call. | ||
| - Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading). | ||
| - Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type. |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
What is the current maximum Amazon SQS message size according to AWS official documentation (SQS Developer Guide / API Reference)?
💡 Result:
According to official AWS documentation, the current maximum message size for Amazon SQS is 1 MiB (1,048,576 bytes) [1][2]. This limit applies to the total size of the message, which includes both the message body and any message attributes [3][4]. For scenarios requiring larger payloads, you can use the Amazon SQS Extended Client Library for Java or Python, which allows you to send messages up to 2 GB by storing the payload in Amazon S3 and sending a reference to that object via SQS [5][2]. Note that this 1 MiB limit was increased from the previous 256 KiB limit in August 2025 [6].
Citations:
- 1: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
- 2: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
- 3: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html
- 4: https://docs.aws.amazon.com/boto3/latest/reference/services/sqs/queue/send_message.html
- 5: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
- 6: https://aws.amazon.com/about-aws/whats-new/2025/08/amazon-sqs-max-payload-size-1mib/
🏁 Script executed:
# Check the file exists and read the specific lines
head -850 packages/sqs/README.md | tail -20Repository: kibertoad/message-queue-toolkit
Length of output: 1088
🏁 Script executed:
# Search for references to message size, KB, MiB, limits in the README
rg -i "(message size|256|1\s*mib|256\s*k)" packages/sqs/README.md -nRepository: kibertoad/message-queue-toolkit
Length of output: 749
🏁 Script executed:
# Get more context around lines 848-850 to understand the compression section
sed -n '840,860p' packages/sqs/README.mdRepository: kibertoad/message-queue-toolkit
Length of output: 915
Fix the incorrect SQS message size limit in the compression notes.
Line 849 states 256 KB, but this contradicts the 1 MiB limit documented throughout the rest of the README (lines 76, 385, 462, 768, 782, 790). The current AWS SQS maximum message size is 1 MiB.
Suggested doc fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| - Compression is applied **after** schema validation and **before** the SQS `SendMessage` call. | |
| - Compressed payloads are still subject to the SQS 256 KB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading). | |
| - Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type. | |
| - Compression is applied **after** schema validation and **before** the SQS `SendMessage` call. | |
| - Compressed payloads are still subject to the SQS 1 MiB message size limit. For large messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). | |
| - Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type. |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/sqs/README.md` around lines 848 - 850, Update the compression notes
that currently state "256 KB" to the correct AWS SQS maximum "1 MiB" so the text
about compression being applied after schema validation and before the SQS
SendMessage call, and the note about compressed payload size limits, match the
rest of the README; ensure the wording near MessageCodecEnum.ZSTD (and any
mention of SQS 256 KB) is replaced with "1 MiB" and is consistent with other
occurrences in the document.
There was a problem hiding this comment.
@irfanh94, yes, this comment is still valid. The README's "Message Compression" notes section still reads "SQS 256 KB message size limit", while:
- AWS increased the maximum message payload size from 256 KiB to 1 MiB on August 4, 2025, making the 256 KB reference outdated.
- The rest of the README already consistently uses 1 MiB (e.g., in the Payload Offloading section and the Features list).
The fix is a one-line update in the Notes bullet:
- - Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
+ - Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).| const zstdCompress = promisify(zlib.zstdCompress) | ||
| const zstdDecompress = promisify(zlib.zstdDecompress) | ||
|
|
||
| export class ZstdCodecHandler implements MessageCodecHandler { |
There was a problem hiding this comment.
Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amp or gcp-pubsup without having to reimplement it each time.
@kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓
There was a problem hiding this comment.
Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support.
One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way.
Move the zstd codec implementation (ZstdCodecHandler, compressMessageBody, decompressMessageBody, resolveCodecHandler) from packages/sqs into a new dedicated packages/codec package so any adapter can use compression without depending on @message-queue-toolkit/sqs. - Create packages/codec with package.json, tsconfigs, and lib/codec/codecHandler.ts - Delete packages/sqs/lib/codec/sqsCodecHandler.ts - Update sqs and sns to import from @message-queue-toolkit/codec - Re-export codec functions from @message-queue-toolkit/sqs for backwards compatibility - Add @message-queue-toolkit/codec as peer dependency in sqs and sns packages - Remove @mongodb-js/zstd from pnpm-workspace.yaml allowBuilds (no longer used) - Register packages/codec in CI PATH_TO_NAME map - Update SQS and SNS READMEs to document codec as a separate peer dependency Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| options: SNSMessageOptions, | ||
| ): Promise<void> { | ||
| const attributes = resolveOutgoingMessageAttributes<MessageAttributeValue>(payload) | ||
| const jsonBody = JSON.stringify(payload) |
There was a problem hiding this comment.
json-stream-stringify is a better option here, and compression method ideally should operate on streams, not on full content
There was a problem hiding this comment.
@kibertoad Is it safe to use this package: https://www.npmjs.com/package/json-stream-stringify since its been 2 years since last update.
zlib.zstdCompress/zstdDecompress were added in Node.js v22.15.0 and v23.8.0,
not v22.0.0. The previous "Node.js 22+" claim was incorrect and would cause a
cryptic TypeError at import time on v22.0.0-v22.14.x.
- Add runtime guard in codecHandler.ts that throws a clear error if zstd
functions are missing, before promisify() is called
- Add engines: { node: ">=22.15.0" } to packages/codec/package.json
- Update all JSDoc and README references from "Node.js 22+" to ">=22.15.0"
CI matrix (22.x, 24.x) resolves to latest patches which are >=22.15.0 — no change needed.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes double compression: previously publish() delegated to offloadMessagePayloadIfNeeded which compressed the payload to check the size threshold, then returned the original message, and sendMessage() compressed again. Now when a codec is set, the message is compressed exactly once at publish() entry point, regardless of whether offloading is also configured. The same compressed Buffer is then either: - stored in S3 and replaced with a pointer (if compressed size exceeds messageSizeThreshold), or - wrapped in a codec envelope and sent inline (if it fits). The payload is never compressed twice. Key changes: - codec: add buildCodecEnvelope(compressed, codec) to wrap pre-compressed bytes without re-compressing - core: replace offloadMessagePayloadIfNeeded with three focused methods: - private buildPointer() — shared pointer construction logic - protected offloadPayload() — no-codec path, returns null if fits - protected offloadCompressedPayload() — codec path, always stores - sqs/sns: restructure publish() via private prepareOutgoingPayload() that compresses once and branches; sendMessage() accepts preBuiltBody to skip re-serialization - gcp-pubsub: migrate to offloadPayload(), pin core to workspace:* - docs: update SQS, SNS, core, and codec READMEs to explain the single compression pass and how codec interacts with payload offloading Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/core/lib/utils/streamUtils.ts (1)
13-14:⚠️ Potential issue | 🟠 Major | ⚡ Quick winValidate that stream data fits within the expected size.
Buffer.copy()silently truncates if the source doesn't fit in the target buffer. If the stream contains more data than the providedsize, line 13 will copy only what fits and line 14 will incrementoffsetby the full chunk length, causing offset tracking to become inaccurate and subsequent chunks to be written at incorrect positions or dropped entirely. In the codec compression context, truncated compressed payloads will fail decompression.🛡️ Proposed fix to detect and reject oversized streams
- chunkBuffer.copy(buffer, offset) - offset += chunkBuffer.length + const bytesCopied = chunkBuffer.copy(buffer, offset) + if (bytesCopied !== chunkBuffer.length) { + throw new Error( + `Stream size exceeds expected size of ${size} bytes. Data truncation detected at offset ${offset}.` + ) + } + offset += bytesCopied🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/lib/utils/streamUtils.ts` around lines 13 - 14, The code currently copies chunkBuffer into buffer without checking bounds, then increments offset by chunkBuffer.length which breaks offset tracking if the stream exceeds the provided size; in the stream processing function in streamUtils.ts (the block using variables buffer, offset, chunkBuffer and size) add a pre-copy check: if offset + chunkBuffer.length > size, reject/throw an error (or return a failed Promise) indicating the stream is oversized; otherwise perform Buffer.copy and increment offset by the actual number of bytes copied (or chunkBuffer.length if safe). Ensure the error/path prevents silent truncation and surfaces the oversized-stream condition to callers.packages/core/lib/queues/AbstractQueueService.ts (1)
835-860:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winReturn an explicit error when codec metadata is present but no decompressor is provided.
At Line 835, if
codecexists butdecompressis undefined, execution falls through and tries JSON parsing compressed bytes (Line 858), which hides the real failure mode.🛠️ Suggested fix
const codec = parsedPayload.payloadRef?.codec + if (codec && !decompress) { + return { + error: new Error( + `Offloaded payload is compressed with codec "${codec}" but no decompressor callback was provided`, + ), + } + } + if (codec && decompress) { try { const compressedBuffer = await streamWithKnownSizeToBuffer(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/lib/queues/AbstractQueueService.ts` around lines 835 - 860, When parsedPayload.payloadRef?.codec is present but the decompress function is undefined, the code must return an explicit error instead of falling through and treating compressed bytes as JSON; update the branch around codec handling in AbstractQueueService (the block using parsedPayload.payloadRef?.codec, decompress, and streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to detect codec && !decompress and immediately return an Error (with a clear message and cause if appropriate) indicating "no decompressor provided for codec <codec>" so the later JSON.parse path (using streamWithKnownSizeToString) is not attempted on compressed data.
♻️ Duplicate comments (1)
packages/sqs/README.md (1)
872-872:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix incorrect size limit (duplicate) and markdown link syntax.
Two issues on this line:
Incorrect SQS limit (duplicate of past review): States "256 KB" but should be "1 MiB" to match the rest of the README (lines 76, 385, 462, 768, 782, 790) and AWS documentation.
Broken markdown link syntax: The link has backticks around the anchor:
[Payload Offloading](`#payload-offloading`)which renders incorrectly. Should be[Payload Offloading](#payload-offloading).📝 Proposed fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration. +- Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/sqs/README.md` at line 872, Update the sentence to use the correct SQS size limit and fix the markdown link: replace "256 KB" with "1 MiB" to match other README references and AWS docs, and change the link syntax from [Payload Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`) so the anchor renders correctly; ensure the note still mentions combining compression with the Payload Offloading flow and that payloadRef.codec records the algorithm.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/codec/README.md`:
- Around line 47-51: The example in README.md uses resolveCodecHandler but does
not import it; update the import list at the top (where buildCodecEnvelope is
imported) to also import resolveCodecHandler (and any other required symbols
like MessageCodecEnum if not present) so the snippet runs as-is; locate the
import statement that currently reads "import { buildCodecEnvelope } from
'`@message-queue-toolkit/codec`'" and add resolveCodecHandler to that named
import.
---
Outside diff comments:
In `@packages/core/lib/queues/AbstractQueueService.ts`:
- Around line 835-860: When parsedPayload.payloadRef?.codec is present but the
decompress function is undefined, the code must return an explicit error instead
of falling through and treating compressed bytes as JSON; update the branch
around codec handling in AbstractQueueService (the block using
parsedPayload.payloadRef?.codec, decompress, and
streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to detect codec &&
!decompress and immediately return an Error (with a clear message and cause if
appropriate) indicating "no decompressor provided for codec <codec>" so the
later JSON.parse path (using streamWithKnownSizeToString) is not attempted on
compressed data.
In `@packages/core/lib/utils/streamUtils.ts`:
- Around line 13-14: The code currently copies chunkBuffer into buffer without
checking bounds, then increments offset by chunkBuffer.length which breaks
offset tracking if the stream exceeds the provided size; in the stream
processing function in streamUtils.ts (the block using variables buffer, offset,
chunkBuffer and size) add a pre-copy check: if offset + chunkBuffer.length >
size, reject/throw an error (or return a failed Promise) indicating the stream
is oversized; otherwise perform Buffer.copy and increment offset by the actual
number of bytes copied (or chunkBuffer.length if safe). Ensure the error/path
prevents silent truncation and surfaces the oversized-stream condition to
callers.
---
Duplicate comments:
In `@packages/sqs/README.md`:
- Line 872: Update the sentence to use the correct SQS size limit and fix the
markdown link: replace "256 KB" with "1 MiB" to match other README references
and AWS docs, and change the link syntax from [Payload
Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`)
so the anchor renders correctly; ensure the note still mentions combining
compression with the Payload Offloading flow and that payloadRef.codec records
the algorithm.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e7a677e7-4dda-4dd8-8852-fcad4c20b50f
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (28)
.github/workflows/ci.ymlpackages/codec/README.mdpackages/codec/lib/codec/codecHandler.tspackages/codec/lib/index.tspackages/codec/package.jsonpackages/codec/tsconfig.build.jsonpackages/codec/tsconfig.jsonpackages/core/README.mdpackages/core/lib/codec/messageCodec.tspackages/core/lib/payload-store/offloadedPayloadMessageSchemas.tspackages/core/lib/queues/AbstractQueueService.tspackages/core/lib/types/queueOptionsTypes.tspackages/core/lib/utils/streamUtils.tspackages/core/test/queues/AbstractQueueService.offload.spec.tspackages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.tspackages/gcp-pubsub/package.jsonpackages/sns/README.mdpackages/sns/lib/sns/AbstractSnsPublisher.tspackages/sns/package.jsonpackages/sns/test/consumers/SnsSqsPermissionConsumer.tspackages/sqs/README.mdpackages/sqs/lib/index.tspackages/sqs/lib/sqs/AbstractSqsConsumer.tspackages/sqs/lib/sqs/AbstractSqsPublisher.tspackages/sqs/package.jsonpackages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.tspackages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.tspackages/sqs/test/consumers/SqsPermissionConsumer.ts
✅ Files skipped from review due to trivial changes (6)
- packages/gcp-pubsub/package.json
- packages/codec/tsconfig.json
- packages/codec/lib/index.ts
- packages/core/README.md
- packages/codec/package.json
- packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (8)
- packages/sqs/lib/index.ts
- packages/sqs/package.json
- packages/sqs/test/consumers/SqsPermissionConsumer.ts
- packages/sns/package.json
- packages/core/lib/codec/messageCodec.ts
- packages/core/lib/types/queueOptionsTypes.ts
- packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
- packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
For the inline codec path, read the raw SQS message body from an isolated queue (no consumer) using ReceiveMessageCommand and assert: - body is a JSON codec envelope with __codec === 'zstd' - __data decodes from base64 to a valid zstd frame (magic bytes 28 B5 2F FD) For the codec + payload offloading path, assert: - SQS message body is a plain JSON pointer (no __codec field), with payloadRef.codec === 'zstd' confirming which algorithm was used - S3 object contains raw compressed binary, not a JSON envelope (first 4 bytes match the zstd magic number 0xFD2FB528) Also add getObjectBuffer() to s3Utils for reading S3 objects as raw Buffer without UTF-8 decoding, and fix missing resolveCodecHandler import in codec README example snippet. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
codecoption to publisher and consumer options (currently supports'zstd') that compresses outgoing messages using zstd before sending to SNS/SQS{ "__codec": "zstd", "__data": "<base64>" }so consumers auto-detect and decompress without requiring explicit codec configurationChanges
packages/core— newpackages/core/lib/codec/messageCodec.tswithcompressMessageBody/decompressMessageBody/isCodecEnvelopeutilities using@mongodb-js/zstd;codec?: MessageCodecadded toCommonQueueOptionspackages/sqs—AbstractSqsPublisher.sendMessage()compresses body when codec is set;AbstractSqsConsumer.resolveMaybeOffloadedPayloadMessage()decompresses codec envelopes before schema validationpackages/sns—AbstractSnsPublisher.sendMessage()compresses body when codec is set (decompression flows through the shared SQS consumer path)packages/sqs,packages/sns— devDependencies changed from"*"to"workspace:*"for workspace packages to ensure consistent type resolution during development (pnpm replacesworkspace:*with the concrete version on publish, no impact on npm consumers)Test plan
pnpm --filter @message-queue-toolkit/sqs test— runsSqsPermissionConsumer.codec.spec.ts(4 tests: round-trip, sequence, external compressor, auto-detection)pnpm --filter @message-queue-toolkit/sns test— runsSnsSqsPermissionConsumer.codec.spec.ts(3 tests: round-trip, sequence, auto-detection through SNS envelope)Summary by CodeRabbit
New Features
Documentation