diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1796fc95..9c6cc4bf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,6 +37,7 @@ jobs: run: | declare -A PATH_TO_NAME=( ["packages/amqp"]="@message-queue-toolkit/amqp" + ["packages/codec"]="@message-queue-toolkit/codec" ["packages/core"]="@message-queue-toolkit/core" ["packages/gcp-pubsub"]="@message-queue-toolkit/gcp-pubsub" ["packages/gcs-payload-store"]="@message-queue-toolkit/gcs-payload-store" diff --git a/biome.json b/biome.json index e55fa69d..b0390747 100644 --- a/biome.json +++ b/biome.json @@ -15,5 +15,17 @@ "noUnusedPrivateClassMembers": "off" } } - } + }, + "overrides": [ + { + "includes": ["**/bench/**"], + "linter": { + "rules": { + "suspicious": { + "noConsole": "off" + } + } + } + } + ] } diff --git a/packages/codec/README.md b/packages/codec/README.md new file mode 100644 index 00000000..d915544f --- /dev/null +++ b/packages/codec/README.md @@ -0,0 +1,84 @@ +# @message-queue-toolkit/codec + +Message compression codec implementations for [message-queue-toolkit](https://github.com/kibertoad/message-queue-toolkit). + +This package provides the concrete codec implementations (e.g. zstd) used by the SQS and SNS adapters. The codec interfaces and types (`MessageCodecEnum`, `MessageCodecHandler`, `CodecEnvelope`) live in `@message-queue-toolkit/core`. + +## Installation + +```sh +npm install @message-queue-toolkit/codec @message-queue-toolkit/core +``` + +> **Requirements:** Node.js >=22.15.0 (uses the built-in `zlib` zstd support). + +## Usage + +Codec options are typically set on the publisher/consumer constructor in the SQS or SNS adapter packages. You do not need to interact with this package directly unless you are building a custom adapter. + +### How compression works during publish + +When `codec` is set on a publisher, compression happens **exactly once** at the start of `publish()`, before any other processing: + +1. 
The message JSON is compressed to a raw `Buffer`. +2. If a payload store is configured **and** the compressed size exceeds `messageSizeThreshold`, the compressed bytes are stored in S3 and only a lightweight pointer is sent. The codec name is recorded in `payloadRef.codec` so the consumer can decompress after retrieval. +3. If the compressed size fits within the threshold (or no store is configured), the message is sent inline as a self-describing codec envelope. + +The payload is never compressed twice. The same compressed `Buffer` from step 1 is either uploaded to S3 or wrapped in the envelope — whichever path is taken. + +### Compress / decompress a message body + +```typescript +import { compressMessageBody, decompressMessageBody } from '@message-queue-toolkit/codec' +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +// Compress (returns a JSON string containing the codec envelope) +const compressed = await compressMessageBody(JSON.stringify(payload), MessageCodecEnum.ZSTD) + +// Decompress (takes the already-parsed envelope object and returns the original payload) +const original = await decompressMessageBody(JSON.parse(compressed)) +``` + +### Build a codec envelope from already-compressed bytes + +When you have pre-compressed bytes (e.g., from `resolveCodecHandler(codec).compress(...)`) and want to produce the envelope string without compressing again: + +```typescript +import { buildCodecEnvelope, resolveCodecHandler } from '@message-queue-toolkit/codec' +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +const handler = resolveCodecHandler(MessageCodecEnum.ZSTD) +const compressed: Buffer = await handler.compress(Buffer.from(JSON.stringify(payload), 'utf8')) + +// Build envelope without a second compression pass +const envelopeString = buildCodecEnvelope(compressed, MessageCodecEnum.ZSTD) +// → '{"__codec":"zstd","__data":"<base64-encoded compressed bytes>"}' +``` + +### Custom codec handler + +```typescript +import type { MessageCodecHandler } from '@message-queue-toolkit/core' + 
+class MyCodecHandler implements MessageCodecHandler { + compress(data: Buffer): Promise<Buffer> { /* ... */ } + decompress(data: Buffer): Promise<Buffer> { /* ... */ } +} +``` + +## Codec envelope format + +Compressed messages are wrapped in a self-describing JSON envelope: + +```json +{ + "__codec": "zstd", + "__data": "<base64-encoded compressed bytes>" +} +``` + +Consumers auto-detect this envelope and decompress transparently, even if the `codec` option is not set on the consumer. + +## License + +MIT diff --git a/packages/codec/lib/codec/codecHandler.ts b/packages/codec/lib/codec/codecHandler.ts new file mode 100644 index 00000000..e39eebc0 --- /dev/null +++ b/packages/codec/lib/codec/codecHandler.ts @@ -0,0 +1,56 @@ +import { promisify } from 'node:util' +import zlib from 'node:zlib' +import type { CodecEnvelope, MessageCodec, MessageCodecHandler } from '@message-queue-toolkit/core' +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +if (typeof zlib.zstdCompress !== 'function' || typeof zlib.zstdDecompress !== 'function') { + throw new Error( + 'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. 
' + + '@message-queue-toolkit/codec requires Node.js >=22.15.0 or >=23.8.0.', + ) +} + +const zstdCompress = promisify(zlib.zstdCompress) +const zstdDecompress = promisify(zlib.zstdDecompress) + +export class ZstdCodecHandler implements MessageCodecHandler { + compress(data: Buffer): Promise { + return zstdCompress(data) + } + + decompress(data: Buffer): Promise { + return zstdDecompress(data) + } +} + +const ZSTD_HANDLER = new ZstdCodecHandler() + +export function resolveCodecHandler(codec: MessageCodec): MessageCodecHandler { + if (codec === MessageCodecEnum.ZSTD) return ZSTD_HANDLER + throw new Error(`Unsupported codec: ${codec}`) +} + +export async function compressMessageBody(jsonBody: string, codec: MessageCodec): Promise { + const handler = resolveCodecHandler(codec) + const compressed = await handler.compress(Buffer.from(jsonBody, 'utf8')) + return buildCodecEnvelope(compressed, codec) +} + +/** + * Wraps an already-compressed buffer in a codec envelope string. + * Use this when you have pre-compressed bytes and want to avoid compressing twice. 
+ */ +export function buildCodecEnvelope(compressed: Buffer, codec: MessageCodec): string { + const envelope: CodecEnvelope = { + __codec: codec, + __data: compressed.toString('base64'), + } + return JSON.stringify(envelope) +} + +export async function decompressMessageBody(envelope: CodecEnvelope): Promise { + const handler = resolveCodecHandler(envelope.__codec) + const compressed = Buffer.from(envelope.__data, 'base64') + const decompressed = await handler.decompress(compressed) + return JSON.parse(decompressed.toString('utf8')) +} diff --git a/packages/codec/lib/index.ts b/packages/codec/lib/index.ts new file mode 100644 index 00000000..ed36e669 --- /dev/null +++ b/packages/codec/lib/index.ts @@ -0,0 +1,7 @@ +export { + buildCodecEnvelope, + compressMessageBody, + decompressMessageBody, + resolveCodecHandler, + ZstdCodecHandler, +} from './codec/codecHandler.ts' diff --git a/packages/codec/package.json b/packages/codec/package.json new file mode 100644 index 00000000..aca4b8cf --- /dev/null +++ b/packages/codec/package.json @@ -0,0 +1,58 @@ +{ + "name": "@message-queue-toolkit/codec", + "version": "1.0.0", + "private": false, + "license": "MIT", + "description": "Message compression codec implementations for message-queue-toolkit", + "maintainers": [ + { + "name": "Igor Savin", + "email": "kibertoad@gmail.com" + } + ], + "type": "module", + "main": "./dist/index.js", + "exports": { + ".": "./dist/index.js", + "./package.json": "./package.json" + }, + "scripts": { + "build": "pnpm run clean && tsc --project tsconfig.build.json", + "clean": "rimraf dist", + "lint": "biome check . 
&& tsc", + "lint:fix": "biome check --write .", + "prepublishOnly": "pnpm run lint && pnpm run build" + }, + "engines": { + "node": ">=22.15.0" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=25.0.0" + }, + "devDependencies": { + "@biomejs/biome": "^2.3.8", + "@lokalise/biome-config": "^3.1.0", + "@lokalise/tsconfig": "^3.0.0", + "@message-queue-toolkit/core": "workspace:*", + "@types/node": "^25.0.2", + "rimraf": "^6.0.1", + "typescript": "^5.9.3" + }, + "homepage": "https://github.com/kibertoad/message-queue-toolkit", + "repository": { + "type": "git", + "url": "git://github.com/kibertoad/message-queue-toolkit.git" + }, + "keywords": [ + "message", + "queue", + "codec", + "compression", + "zstd" + ], + "files": [ + "README.md", + "LICENSE", + "dist/*" + ] +} diff --git a/packages/codec/tsconfig.build.json b/packages/codec/tsconfig.build.json new file mode 100644 index 00000000..198dcfd5 --- /dev/null +++ b/packages/codec/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": ["./tsconfig.json", "@lokalise/tsconfig/build-public-lib"], + "include": ["lib/**/*"] +} diff --git a/packages/codec/tsconfig.json b/packages/codec/tsconfig.json new file mode 100644 index 00000000..8dca583f --- /dev/null +++ b/packages/codec/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "@lokalise/tsconfig/tsc", + "include": ["lib/**/*"] +} diff --git a/packages/core/README.md b/packages/core/README.md index 125a1285..ce1139be 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -641,6 +641,17 @@ class MyPayloadStore implements PayloadStore { } ``` +#### Interaction with codec (compression) + +When both `codec` and `payloadStoreConfig` are set on a publisher, compression and offloading work together with a single compression pass: + +1. The message is compressed **once** at publish time. +2. The **compressed** size is compared against `messageSizeThreshold`. +3. 
If the compressed size exceeds the threshold, the raw compressed bytes are stored in the payload store. The codec name is written to `payloadRef.codec` so the consumer knows how to decompress after retrieval. +4. If the compressed size fits within the threshold, the message is sent inline as a self-describing codec envelope — the payload store is never touched. + +This means compression can prevent offloading entirely for messages that are large before compression but small after. + ## API Reference ### Types diff --git a/packages/core/lib/codec/messageCodec.ts b/packages/core/lib/codec/messageCodec.ts new file mode 100644 index 00000000..6ecc46dd --- /dev/null +++ b/packages/core/lib/codec/messageCodec.ts @@ -0,0 +1,49 @@ +type ObjectValues = T[keyof T] + +/** + * Supported message compression codecs. + * + * Use the enum values instead of raw strings so that adding a new codec in + * the future is a single-place change and consumers benefit from IDE + * auto-complete. + * + * @example + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) + */ +export const MessageCodecEnum = { + /** zstd compression via Node.js built-in `zlib` (requires Node.js >=22.15.0). */ + ZSTD: 'zstd', +} as const +export type MessageCodec = ObjectValues + +const CODEC_FIELD = '__codec' +const DATA_FIELD = '__data' + +export type CodecEnvelope = { + [CODEC_FIELD]: MessageCodec + [DATA_FIELD]: string +} + +/** + * Low-level interface for a compression codec. + * + * Implement this interface to plug in a custom compression algorithm. + * The built-in implementation (`ZstdCodecHandler` in `@message-queue-toolkit/codec`) + * uses Node.js built-in `zlib` zstd support. 
+ */ +export interface MessageCodecHandler { + compress(data: Buffer): Promise + decompress(data: Buffer): Promise +} + +export function isCodecEnvelope(value: unknown): value is CodecEnvelope { + const record = value as Record + return ( + typeof value === 'object' && + value !== null && + CODEC_FIELD in value && + DATA_FIELD in value && + (Object.values(MessageCodecEnum) as string[]).includes(record[CODEC_FIELD] as string) && + typeof record[DATA_FIELD] === 'string' + ) +} diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index bb97ea60..6d23f0ca 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -1,3 +1,10 @@ +export { + type CodecEnvelope, + isCodecEnvelope, + type MessageCodec, + MessageCodecEnum, + type MessageCodecHandler, +} from './codec/messageCodec.ts' export { DoNotProcessMessageError } from './errors/DoNotProcessError.ts' export { isMessageError, diff --git a/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts b/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts index fa62f183..29bc770f 100644 --- a/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts +++ b/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts @@ -11,6 +11,12 @@ export const PAYLOAD_REF_SCHEMA = z.object({ store: z.string().min(1), /** Size of the payload in bytes */ size: z.number().int().positive(), + /** + * Codec used to compress the stored payload. + * When set, the stored bytes are raw compressed binary (not base64 JSON). + * The consumer must decompress using this codec before parsing. 
+ */ + codec: z.string().optional(), }) export type PayloadRef = z.output diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 40fa618b..c42c926a 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -1,3 +1,4 @@ +import { Readable } from 'node:stream' import { types } from 'node:util' import { type CommonLogger, @@ -14,6 +15,7 @@ import { } from '@message-queue-toolkit/schemas' import { getProperty, setProperty } from 'dot-prop' import type { ZodSchema, ZodType } from 'zod/v4' +import type { MessageCodec } from '../codec/messageCodec.ts' import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts' import { type AcquireLockTimeoutError, @@ -48,7 +50,7 @@ import type { QueueOptions, } from '../types/queueOptionsTypes.ts' import { isRetryDateExceeded } from '../utils/dateUtils.ts' -import { streamWithKnownSizeToString } from '../utils/streamUtils.ts' +import { streamWithKnownSizeToBuffer, streamWithKnownSizeToString } from '../utils/streamUtils.ts' import { toDatePreprocessor } from '../utils/toDateProcessor.ts' import type { BarrierCallback, @@ -135,6 +137,7 @@ export abstract class AbstractQueueService< protected readonly messageDeduplicationConfig?: MessageDeduplicationConfig protected readonly messageMetricsManager?: MessageMetricsManager protected readonly _handlerSpy?: HandlerSpy + protected readonly codec?: MessageCodec protected isInitted: boolean @@ -172,6 +175,7 @@ export abstract class AbstractQueueService< } : undefined this.messageDeduplicationConfig = options.messageDeduplicationConfig + this.codec = options.codec this.logMessages = options.logMessages ?? false this._handlerSpy = resolveHandlerSpy(options) @@ -657,49 +661,30 @@ export abstract class AbstractQueueService< } /** - * Offload message payload to an external store if it exceeds the threshold. 
- * Returns a special type that contains a pointer to the offloaded payload or the original payload if it was not offloaded. - * Requires message size as only the implementation knows how to calculate it. + * Builds an OffloadedPayloadPointerPayload from the given message and storage metadata. + * Copies identity fields and preserves the message type field through offloading. * - * For multi-store configuration, uses the configured outgoingStore. - * For single-store configuration, uses the single store. - * - * The returned payload includes both the new payloadRef format and legacy fields for backward compatibility. + * We default to the conventional top-level `type` path so that routing/identity fields are + * handled consistently with `messageIdField`/`messageTimestampField`/etc. Without this + * fallback, `messageTypeResolver` modes that don't specify a body path silently strip `type` + * from the offloaded body, breaking downstream SNS subscription FilterPolicy filters. */ - protected async offloadMessagePayloadIfNeeded( + private buildPointer( message: MessagePayloadSchemas, - messageSizeFn: () => number, - ): Promise { - if ( - !this.payloadStoreConfig || - messageSizeFn() <= this.payloadStoreConfig.messageSizeThreshold - ) { - return message - } - - const { store, storeName } = this.resolveOutgoingStore() - const serializedPayload = await this.payloadStoreConfig.serializer.serialize(message) - - let payloadId: string - try { - payloadId = await store.storePayload(serializedPayload) - } finally { - if (isDestroyable(serializedPayload)) { - await serializedPayload.destroy() - } - } - - // Return message with both new and legacy formats for backward compatibility + payloadId: string, + storeName: string, + size: number, + codec?: MessageCodec, + ): OffloadedPayloadPointerPayload { const result: OffloadedPayloadPointerPayload = { - // Extended payload reference format payloadRef: { id: payloadId, store: storeName, - size: serializedPayload.size, + size, + 
...(codec ? { codec } : {}), }, - // Legacy format for backward compatibility offloadedPayloadPointer: payloadId, - offloadedPayloadSize: serializedPayload.size, + offloadedPayloadSize: size, // @ts-expect-error [this.messageIdField]: message[this.messageIdField], // @ts-expect-error @@ -710,14 +695,6 @@ export abstract class AbstractQueueService< [this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField], } - // Preserve the message type field through offloading. We default to the conventional - // top-level `type` path so that routing/identity fields are handled consistently with - // `messageIdField`/`messageTimestampField`/etc., which have defaulted names ('id', - // 'timestamp', ...) and are always copied across when present. Without this fallback, - // `messageTypeResolver` modes that don't specify a body path (no resolver, `literal`, - // or `resolver`) silently strip `type` from the offloaded SNS body, which then breaks - // any downstream subscription whose FilterPolicy filters on `type` - // (FilterPolicyScope: 'MessageBody'). const typePath = this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver) ? this.messageTypeResolver.messageTypePath @@ -730,14 +707,82 @@ export abstract class AbstractQueueService< return result } + /** + * Offloads the message payload to the configured store if it exceeds the size threshold. + * Returns null if no offloading is needed (store not configured or message fits within threshold). + * + * For multi-store configuration, uses the configured outgoingStore. + * For single-store configuration, uses the single store. + * + * The returned pointer includes both the new payloadRef format and legacy fields for backward + * compatibility. The message type field is always preserved through offloading. 
+ */ + protected async offloadPayload( + message: MessagePayloadSchemas, + messageSizeFn: () => number, + ): Promise { + if (!this.payloadStoreConfig) { + return null + } + + if (messageSizeFn() <= this.payloadStoreConfig.messageSizeThreshold) { + return null + } + + const { store, storeName } = this.resolveOutgoingStore() + const serializedPayload = await this.payloadStoreConfig.serializer.serialize(message) + + let payloadId: string + try { + payloadId = await store.storePayload(serializedPayload) + } finally { + if (isDestroyable(serializedPayload)) { + await serializedPayload.destroy() + } + } + + return this.buildPointer(message, payloadId, storeName, serializedPayload.size) + } + + /** + * Stores an already-compressed payload in the configured store. + * The `codec` name is recorded in payloadRef so the consumer can decompress after retrieval. + * + * The threshold check is NOT performed here — callers must decide whether to offload. + * Use this when compression has already been done and the compressed size exceeds the threshold. + * + * @throws Error if payload store is not configured + */ + protected async offloadCompressedPayload( + message: MessagePayloadSchemas, + compressed: Buffer, + codec: MessageCodec, + ): Promise { + if (!this.payloadStoreConfig) { + throw new Error('Payload store is not configured') + } + + const { store, storeName } = this.resolveOutgoingStore() + const payloadId = await store.storePayload({ + value: Readable.from(compressed), + size: compressed.byteLength, + }) + + return this.buildPointer(message, payloadId, storeName, compressed.byteLength, codec) + } + /** * Retrieve previously offloaded message payload using provided pointer payload. * Returns the original payload or an error if the payload was not found or could not be parsed. * * Supports both new multi-store format (payloadRef) and legacy format (offloadedPayloadPointer). 
+ * + * When `decompress` is provided and the pointer's `payloadRef.codec` matches, the fetched bytes + * are treated as raw compressed binary and decompressed before JSON parsing. */ protected async retrieveOffloadedMessagePayload( maybeOffloadedPayloadPointerPayload: unknown, + decompress?: (codec: string, data: Buffer) => Promise, ): Promise> { if (!this.payloadStoreConfig) { return { @@ -787,6 +832,24 @@ export abstract class AbstractQueueService< } } + const codec = parsedPayload.payloadRef?.codec + if (codec && decompress) { + try { + const compressedBuffer = await streamWithKnownSizeToBuffer( + serializedOffloadedPayloadReadable, + payloadSize, + ) + const decompressed = await decompress(codec, compressedBuffer) + return { result: JSON.parse(decompressed.toString('utf8')) } + } catch (e) { + return { + error: new Error(`Failed to decompress offloaded payload with codec "${codec}"`, { + cause: e, + }), + } + } + } + const serializedOffloadedPayloadString = await streamWithKnownSizeToString( serializedOffloadedPayloadReadable, payloadSize, diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index d8f26445..204f37a8 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -1,5 +1,6 @@ import type { CommonLogger, ErrorReporter, ErrorResolver } from '@lokalise/node-core' import type { ZodSchema } from 'zod/v4' +import type { MessageCodec } from '../codec/messageCodec.ts' import type { MessageDeduplicationConfig } from '../message-deduplication/messageDeduplicationTypes.ts' import type { PayloadStoreConfig } from '../payload-store/payloadStoreTypes.ts' import type { MessageHandlerConfig } from '../queues/HandlerContainer.ts' @@ -139,6 +140,27 @@ export type CommonQueueOptions = { deletionConfig?: DeletionConfig payloadStoreConfig?: PayloadStoreConfig messageDeduplicationConfig?: MessageDeduplicationConfig + /** + * Compression codec applied to message 
bodies. + * + * - **Publisher**: every outgoing message body is compressed and wrapped in a + * self-describing envelope `{ __codec: 'zstd', __data: '<base64>' }`. + * - **Consumer**: when set, the consumer expects compressed messages. + * Even without this option, consumers auto-detect and decompress any message + * that carries a codec envelope, so mixed queues work transparently. + * + * Uses Node.js built-in `zlib` zstd support — **requires Node.js >=22.15.0** (or >=23.8.0). + * + * @example + * import { MessageCodecEnum } from '@message-queue-toolkit/core' + * + * // Publisher + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) + * + * // Consumer (optional — auto-detection handles it even without this) + * new MyConsumer(deps, { codec: MessageCodecEnum.ZSTD }) + */ + codec?: MessageCodec } export type CommonCreationConfigType = { diff --git a/packages/core/lib/utils/streamUtils.ts b/packages/core/lib/utils/streamUtils.ts index 7ea2155f..7e69617b 100644 --- a/packages/core/lib/utils/streamUtils.ts +++ b/packages/core/lib/utils/streamUtils.ts @@ -1,6 +1,6 @@ import type { Readable } from 'node:stream' -export async function streamWithKnownSizeToString(stream: Readable, size: number): Promise { +export async function streamWithKnownSizeToBuffer(stream: Readable, size: number): Promise { const buffer = Buffer.alloc(size) let offset = 0 @@ -14,5 +14,10 @@ export async function streamWithKnownSizeToString(stream: Readable, size: number offset += chunkBuffer.length } - return buffer.toString('utf8', 0, offset) + return buffer.subarray(0, offset) +} + +export async function streamWithKnownSizeToString(stream: Readable, size: number): Promise { + const buffer = await streamWithKnownSizeToBuffer(stream, size) + return buffer.toString('utf8') } diff --git a/packages/core/test/queues/AbstractQueueService.offload.spec.ts b/packages/core/test/queues/AbstractQueueService.offload.spec.ts index 2d6142ac..32167f30 100644 --- 
a/packages/core/test/queues/AbstractQueueService.offload.spec.ts +++ b/packages/core/test/queues/AbstractQueueService.offload.spec.ts @@ -1,5 +1,5 @@ /** - * Regression tests for `AbstractQueueService.offloadMessagePayloadIfNeeded`. + * Regression tests for `AbstractQueueService.offloadPayload`. * * Identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`, * `messageDeduplicationOptionsField`) all have defaulted names ('id', 'timestamp', ...) and @@ -58,7 +58,7 @@ class TestQueueService extends AbstractQueueService< // Expose protected method for direct testing. public callOffload(message: TestMessage, sizeFn: () => number) { - return this.offloadMessagePayloadIfNeeded(message, sizeFn) + return this.offloadPayload(message, sizeFn) } } @@ -102,7 +102,7 @@ const baseMessage: TestMessage = { payload: { large: 'data' }, } -describe('AbstractQueueService.offloadMessagePayloadIfNeeded — `type` preservation', () => { +describe('AbstractQueueService.offloadPayload — `type` preservation', () => { it('preserves `type` when no messageTypeResolver is configured', async () => { const svc = buildService(undefined) const result = (await svc.callOffload( diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index 5927afa0..067fdc58 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -69,11 +69,12 @@ export abstract class AbstractPubSubPublisher const parsedMessage = messageSchemaResult.result.parse(message) message = this.updateInternalProperties(message) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => { - // Calculate message size for PubSub - const messageData = Buffer.from(JSON.stringify(message)) - return messageData.length - }) + const maybeOffloadedPayloadMessage = + (await this.offloadPayload(message, () => { + // Calculate message 
size for PubSub + const messageData = Buffer.from(JSON.stringify(message)) + return messageData.length + })) ?? message if ( this.isDeduplicationEnabledForMessage(parsedMessage) && diff --git a/packages/gcp-pubsub/package.json b/packages/gcp-pubsub/package.json index e731daef..b43d284d 100644 --- a/packages/gcp-pubsub/package.json +++ b/packages/gcp-pubsub/package.json @@ -40,7 +40,7 @@ "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", + "@message-queue-toolkit/core": "workspace:*", "@message-queue-toolkit/gcs-payload-store": "*", "@message-queue-toolkit/redis-message-deduplication-store": "*", "@message-queue-toolkit/schemas": "*", diff --git a/packages/sns/README.md b/packages/sns/README.md index 3eb9cb2e..29f07ac1 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -43,6 +43,7 @@ npm install @message-queue-toolkit/sns @message-queue-toolkit/sqs @message-queue - `@aws-sdk/client-sqs` - AWS SDK for SQS (required for consumers) - `@aws-sdk/client-sts` - AWS SDK for STS (for ARN resolution) - `zod` - Schema validation +- `@message-queue-toolkit/codec` - Message compression codecs (always required: the SNS publisher imports it unconditionally) ## Features @@ -58,6 +59,7 @@ npm install @message-queue-toolkit/sns @message-queue-toolkit/sqs @message-queue - ✅ **Handler spies** for testing - ✅ **Pre-handlers and barriers** for complex message processing - ✅ **Cross-account and cross-region publishing** +- ✅ **Message compression** with zstd via Node.js built-in `zlib` (Node.js >=22.15.0 required) ## Core Concepts @@ -683,6 +685,9 @@ await consumer.start() // Optional - Payload Offloading (same as SQS) payloadStoreConfig: { /* ... */ }, + // Optional - Compression (Node.js >=22.15.0 required) + codec: MessageCodecEnum.ZSTD, // Compress every outgoing message with zstd + // Optional - Deletion deletionConfig: { /* ... */ }, } @@ -1005,6 +1010,7 @@ SNS consumers inherit all advanced features from SQS consumers. 
See the SQS READ - **[Message Retry Logic](../sqs/README.md#message-retry-logic)** - Exponential backoff and retry limits - **[Message Deduplication](../sqs/README.md#message-deduplication)** - Publisher and consumer-level deduplication - **[Payload Offloading](../sqs/README.md#payload-offloading)** - S3 storage for large messages +- **[Message Compression](../sqs/README.md#message-compression)** - zstd compression via Node.js built-in `zlib` - **[Message Handlers](../sqs/README.md#message-handlers)** - Type-safe handler configuration - **[Pre-handlers and Barriers](../sqs/README.md#pre-handlers-and-barriers)** - Middleware and message dependencies - **[Handler Spies](../sqs/README.md#handler-spies)** - Testing async message flows diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index f6c14ad4..090f27b1 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -2,6 +2,7 @@ import type { MessageAttributeValue } from '@aws-sdk/client-sns' import { PublishCommand } from '@aws-sdk/client-sns' import type { Either } from '@lokalise/node-core' import { InternalError } from '@lokalise/node-core' +import { buildCodecEnvelope, resolveCodecHandler } from '@message-queue-toolkit/codec' import { type AsyncPublisher, type BarrierResult, @@ -131,10 +132,7 @@ export abstract class AbstractSnsPublisher // (offloaded payload won't have user fields needed for messageGroupIdField) const resolvedOptions = this.resolveFifoOptions(updatedMessage, options) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded( - updatedMessage, - () => calculateOutgoingMessageSize(updatedMessage), - ) + const { payload, preBuiltBody } = await this.prepareOutgoingPayload(updatedMessage) if ( this.isDeduplicationEnabledForMessage(parsedMessage) && @@ -150,7 +148,7 @@ export abstract class AbstractSnsPublisher return } - await this.sendMessage(maybeOffloadedPayloadMessage, 
resolvedOptions) + await this.sendMessage(payload, resolvedOptions, preBuiltBody) this.handleMessageProcessed({ message: parsedMessage, @@ -206,13 +204,51 @@ export abstract class AbstractSnsPublisher return this.isDeduplicationEnabled && super.isDeduplicationEnabledForMessage(message) } + /** + * Compresses (when codec is set) or offloads (when store is configured) the message. + * Returns the payload to send and an optional pre-built body string. + * When preBuiltBody is set, it is a ready-to-send codec envelope — sendMessage must use it as-is. + */ + private async prepareOutgoingPayload(message: MessagePayloadType): Promise<{ + payload: MessagePayloadType | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + const codec = this.codec + + if (codec) { + // Compress once up-front, then decide: offload the compressed bytes or send inline. + const compressed = await resolveCodecHandler(codec).compress( + Buffer.from(JSON.stringify(message), 'utf8'), + ) + + if ( + this.payloadStoreConfig && + compressed.byteLength > this.payloadStoreConfig.messageSizeThreshold + ) { + return { payload: await this.offloadCompressedPayload(message, compressed, codec) } + } + + return { payload: message, preBuiltBody: buildCodecEnvelope(compressed, codec) } + } + + return { + payload: + (await this.offloadPayload(message, () => calculateOutgoingMessageSize(message))) ?? + message, + } + } + protected async sendMessage( payload: MessagePayloadType | OffloadedPayloadPointerPayload, options: SNSMessageOptions, + preBuiltBody?: string, ): Promise { const attributes = resolveOutgoingMessageAttributes(payload) + // preBuiltBody is set when codec is active and the payload was not offloaded — + // it contains the already-compressed codec envelope, so we skip re-serialization. + const body = preBuiltBody ?? 
JSON.stringify(payload) const command = new PublishCommand({ - Message: JSON.stringify(payload), + Message: body, MessageAttributes: attributes, TopicArn: this.topicArn, ...options, diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 9e1113e1..37f2d857 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -1,5 +1,11 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { STSClient } from '@aws-sdk/client-sts' +import type { Either } from '@lokalise/node-core' +import type { + MessageInvalidFormatError, + MessageValidationError, + ResolvedMessage, +} from '@message-queue-toolkit/core' import type { SQSConsumerDependencies, SQSConsumerOptions, @@ -201,7 +207,9 @@ export abstract class AbstractSnsSqsConsumer< await this.startConsumers() } - protected override resolveMessage(message: SQSMessage) { + protected override resolveMessage( + message: SQSMessage, + ): Either { const result = readSnsMessage(message, this.errorResolver) if (result.result) { return result diff --git a/packages/sns/package.json b/packages/sns/package.json index 305c0d09..39684782 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -38,6 +38,7 @@ "@aws-sdk/client-sns": "^3.632.0", "@aws-sdk/client-sqs": "^3.1034.0", "@aws-sdk/client-sts": "^3.632.0", + "@message-queue-toolkit/codec": ">=1.0.0", "@message-queue-toolkit/core": ">=24.0.0", "@message-queue-toolkit/schemas": ">=7.0.0", "@message-queue-toolkit/sqs": ">=23.0.0", @@ -50,9 +51,10 @@ "@biomejs/biome": "^2.3.6", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", - "@message-queue-toolkit/redis-message-deduplication-store": "*", - "@message-queue-toolkit/s3-payload-store": "*", + "@message-queue-toolkit/codec": "workspace:*", + "@message-queue-toolkit/core": "workspace:*", + 
"@message-queue-toolkit/redis-message-deduplication-store": "workspace:*", + "@message-queue-toolkit/s3-payload-store": "workspace:*", "@message-queue-toolkit/sqs": "workspace:*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts new file mode 100644 index 00000000..e859c317 --- /dev/null +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts @@ -0,0 +1,107 @@ +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' + +import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher.ts' +import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts' + +describe('SnsSqsPermissionConsumer - zstd codec', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + let publisher: SnsPermissionPublisher + let consumer: SnsSqsPermissionConsumer + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + beforeEach(async () => { + await testAdmin.deleteQueues(SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME) + await testAdmin.deleteTopics(SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME) + + consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + codec: 'zstd', + }) + publisher = new SnsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + }) + + await consumer.start() + await publisher.init() + }) + + 
afterEach(async () => { + await publisher.close() + await consumer.close() + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('publishes a compressed SNS message and consumer decompresses it correctly', async () => { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'sns-codec-test-1', + messageType: 'add', + metadata: { info: 'hello sns zstd' }, + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }, 15000) + + it('consumer correctly handles multiple compressed SNS messages in sequence', async () => { + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = [ + { id: 'sns-codec-seq-1', messageType: 'add' }, + { id: 'sns-codec-seq-2', messageType: 'add' }, + { id: 'sns-codec-seq-3', messageType: 'add' }, + ] + + for (const msg of messages) { + await publisher.publish(msg) + } + + for (const msg of messages) { + const result = await consumer.handlerSpy.waitForMessageWithId(msg.id, 'consumed') + expect(result.message).toMatchObject(msg) + } + }, 15000) + + it('consumer without codec option auto-detects and decompresses zstd messages from SNS', async () => { + // Stop the beforeEach consumer so it cannot steal messages from the shared queue + await consumer.close() + + // Consumer without explicit codec — decompression is auto-detected from envelope __codec field + const autoConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { + queueUrl: consumer.subscriptionProps.queueUrl, + topicArn: consumer.subscriptionProps.topicArn, + subscriptionArn: consumer.subscriptionProps.subscriptionArn, + }, + }) + await autoConsumer.start() + // Reassign so afterEach closes autoConsumer instead of the already-closed consumer + consumer = autoConsumer + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 
'sns-codec-auto-1', + messageType: 'add', + } + await publisher.publish(message) + + const result = await autoConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }, 15000) +}) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index 68abd72e..5d3548a5 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -41,6 +41,7 @@ type SnsSqsPermissionConsumerOptions = Pick< | 'maxRetryDuration' | 'payloadStoreConfig' | 'concurrentConsumersAmount' + | 'codec' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -148,6 +149,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< deleteIfExists: false, }, payloadStoreConfig: options.payloadStoreConfig, + codec: options.codec, consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately }, diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.ts b/packages/sns/test/publishers/SnsPermissionPublisher.ts index e5c2dda7..659f8d87 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.ts @@ -26,6 +26,7 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enablePublisherDeduplication' + | 'codec' >, ) { super(dependencies, { @@ -40,6 +41,7 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher deleteIfExists: false, }, payloadStoreConfig: options?.payloadStoreConfig, + codec: options?.codec, messageSchemas: [PERMISSIONS_ADD_MESSAGE_SCHEMA, PERMISSIONS_REMOVE_MESSAGE_SCHEMA], handlerSpy: true, messageTypeResolver: { messageTypePath: 'messageType' }, diff --git a/packages/sqs/README.md b/packages/sqs/README.md index 686470b6..9eebaa7b 100644 --- 
a/packages/sqs/README.md +++ b/packages/sqs/README.md @@ -24,6 +24,7 @@ for publishing and consuming messages from both standard and FIFO SQS queues. - [Message Retry Logic](#message-retry-logic) - [Message Deduplication](#message-deduplication) - [Payload Offloading](#payload-offloading) + - [Message Compression](#message-compression) - [Message Handlers](#message-handlers) - [Pre-handlers and Barriers](#pre-handlers-and-barriers) - [Handler Spies](#handler-spies) @@ -48,6 +49,7 @@ npm install @message-queue-toolkit/sqs @message-queue-toolkit/core **Peer Dependencies:** - `@aws-sdk/client-sqs` - AWS SDK for SQS - `zod` - Schema validation +- `@message-queue-toolkit/codec` - Required when using message compression ## Features @@ -62,6 +64,7 @@ npm install @message-queue-toolkit/sqs @message-queue-toolkit/core - ✅ **Handler spies** for testing - ✅ **Pre-handlers and barriers** for complex message processing - ✅ **Automatic queue creation** with validation +- ✅ **Message compression** with zstd via Node.js built-in `zlib` (Node.js >=22.15.0 required) ## Core Concepts @@ -460,6 +463,9 @@ When using `locatorConfig`, you connect to an existing queue without creating it maxPayloadSize: 1024 * 1024, // 1 MiB }, + // Optional - Compression (Node.js >=22.15.0 required) + codec: MessageCodecEnum.ZSTD, // Compress every outgoing message with zstd + // Optional - Deletion deletionConfig: { deleteIfExists: false, // Delete queue on init @@ -538,6 +544,11 @@ When using `locatorConfig`, you connect to an existing queue without creating it payloadStore: s3Store, }, + // Optional - Compression (Node.js >=22.15.0 required) + // Auto-detection is always active: consumers decompress codec envelopes + // even without this option set. + codec: MessageCodecEnum.ZSTD, + // Optional - Other logMessages: false, handlerSpy: true, @@ -788,16 +799,79 @@ await publisher.publish({ }) ``` -**How it works:** +**How it works (without codec):** 1. Publisher checks message size before sending -2. 
If size exceeds `maxPayloadSize`, stores payload in S3 -3. Replaces payload with pointer: `{ _offloadedPayload: { bucketName, key, size } }` -4. Sends pointer message to SQS -5. Consumer detects pointer, fetches payload from S3 -6. Processes message with full payload +2. If size exceeds `messageSizeThreshold`, serializes and stores payload in S3 +3. Sends a lightweight pointer message to SQS instead +4. Consumer detects the pointer, fetches payload from S3 +5. Processes message with full payload + +**How it works (with codec — compress + offload):** +1. Publisher compresses the serialized message with zstd **once**, up-front +2. If the **compressed** size exceeds `messageSizeThreshold`, stores the compressed bytes in S3 and sends a pointer +3. If the compressed size fits within the threshold, sends the message inline as a codec envelope +4. Consumer fetches the pointer payload as raw bytes, decompresses, then processes as normal + +The codec embedded in `payloadRef.codec` tells the consumer which algorithm to use — no `codec` option is needed on the consumer. **Note:** Payload cleanup is the responsibility of the store (e.g., S3 lifecycle policies). +### Message Compression + +Compress message bodies with zstd using the Node.js built-in `zlib` module. Requires **Node.js >=22.15.0**. + +The codec implementation lives in the separate [`@message-queue-toolkit/codec`](../codec/README.md) package, which must be installed alongside this package when using compression. + +```bash +npm install @message-queue-toolkit/codec +``` + +Compressed messages are **self-describing**: the codec is embedded in the message envelope (`{ __codec: 'zstd', __data: '' }`), so a consumer without `codec` set will still decompress automatically via envelope detection. This allows a gradual rollout — enable compression on the publisher first, consumers adapt without configuration changes. 
+ +#### Publisher + +```typescript +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +class MyPublisher extends AbstractSqsPublisher { + constructor(deps: SQSDependencies) { + super(deps, { + codec: MessageCodecEnum.ZSTD, // compress every outgoing message + creationConfig: { queue: { QueueName: 'my-queue' } }, + // ... + }) + } +} +``` + +#### Consumer + +```typescript +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +class MyConsumer extends AbstractSqsConsumer { + constructor(deps: SQSConsumerDependencies) { + super(deps, { + // Optional: explicitly declare that messages are compressed. + // Without this, consumers still auto-detect and decompress codec envelopes. + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: 'my-queue' } }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(MySchema, myHandler) + .build(), + }, executionContext) + } +} +``` + +#### Notes + +- Compression is applied **after** schema validation and **before** the SQS `SendMessage` call. +- The message is compressed **exactly once**, regardless of whether payload offloading is also configured. When both features are active: the payload is compressed first, and the decision to offload is made against the compressed size (not the raw size). This means smaller payloads after compression may stay inline and never touch S3. +- The compressed bytes are **never re-compressed** when sent inline — the codec envelope is built directly from the first (and only) compression pass. +- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration. +- Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). 
You can use the string literal or the enum — both satisfy the `MessageCodec` type. + ### Message Handlers Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeResolver`): diff --git a/packages/sqs/bench/codec.bench.ts b/packages/sqs/bench/codec.bench.ts new file mode 100644 index 00000000..600a7ca9 --- /dev/null +++ b/packages/sqs/bench/codec.bench.ts @@ -0,0 +1,198 @@ +/** + * Codec benchmarks — publish and consume throughput with vs without zstd compression. + * + * Run: pnpm --filter @message-queue-toolkit/sqs bench + * + * Each benchmark pre-fills queues (consume) or sends N messages (publish) and + * measures wall-clock time, reporting msg/s and the overhead percentage. + * All queues are deleted before and after each case. + */ +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import { afterAll, beforeAll, describe, it } from 'vitest' + +import { SqsPermissionConsumer } from '../test/consumers/SqsPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '../test/consumers/userConsumerSchemas.ts' +import { SqsPermissionPublisher } from '../test/publishers/SqsPermissionPublisher.ts' +import type { TestAwsResourceAdmin } from '../test/utils/testAdmin.ts' +import type { Dependencies } from '../test/utils/testContext.ts' +import { registerDependencies } from '../test/utils/testContext.ts' + +// ─── Configuration ──────────────────────────────────────────────────────────── + +const N = 50 + +/** Small message with minimal payload (~80 B serialised). */ +const SMALL_META: undefined = undefined + +/** + * Large message with repetitive text (~6 KB serialised). + * Repetitive content compresses very well, showing the realistic best case. + */ +const LARGE_META: Record = { + description: 'The quick brown fox jumps over the lazy dog. 
'.repeat(60), + items: Array.from({ length: 80 }, (_, i) => ({ + id: `item-${i}`, + value: `value-number-${i}`, + enabled: i % 2 === 0, + })), +} + +const CASES = [ + { label: 'small payload (~80 B) ', suffix: 'sm', meta: SMALL_META }, + { label: 'large payload (~6 KB) ', suffix: 'lg', meta: LARGE_META }, +] as const + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function makeMessages( + prefix: string, + count: number, + meta: Record | undefined, +): PERMISSIONS_ADD_MESSAGE_TYPE[] { + return Array.from({ length: count }, (_, i) => ({ + id: `${prefix}-${i}`, + messageType: 'add' as const, + ...(meta !== undefined ? { metadata: meta } : {}), + })) +} + +function printRow(label: string, count: number, plainMs: number, codecMs: number): void { + const tps = (ms: number) => ((count / ms) * 1000).toFixed(0).padStart(6) + const diff = codecMs - plainMs + const pct = ((diff / plainMs) * 100).toFixed(1) + const sign = diff >= 0 ? '+' : '' + console.log( + ` ${label}` + + ` plain: ${String(plainMs.toFixed(0)).padStart(5)} ms (${tps(plainMs)} msg/s)` + + ` zstd: ${String(codecMs.toFixed(0)).padStart(5)} ms (${tps(codecMs)} msg/s)` + + ` overhead: ${sign}${pct}%`, + ) +} + +// ─── Suite ──────────────────────────────────────────────────────────────────── + +describe('SQS codec benchmarks', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + // ── Publish ──────────────────────────────────────────────────────────────── + + it(`publish: with vs without zstd (${N} messages)`, async () => { + console.log(`\n${'─'.repeat(72)}`) + console.log(` 
PUBLISH BENCHMARK — ${N} messages per run`) + console.log('─'.repeat(72)) + + for (const { label, suffix, meta } of CASES) { + const plainQ = `bench-pub-plain-${suffix}` + const codecQ = `bench-pub-codec-${suffix}` + await testAdmin.deleteQueues(plainQ, codecQ) + + const plainMsgs = makeMessages('bpp', N, meta) + const codecMsgs = makeMessages('bcp', N, meta) + + // ── Plain publish ── + const plainPub = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + }) + await plainPub.init() + const t0 = performance.now() + for (const msg of plainMsgs) await plainPub.publish(msg) + const plainMs = performance.now() - t0 + await plainPub.close() + + // ── Codec publish ── + const codecPub = new SqsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + creationConfig: { queue: { QueueName: codecQ } }, + }) + await codecPub.init() + const t1 = performance.now() + for (const msg of codecMsgs) await codecPub.publish(msg) + const codecMs = performance.now() - t1 + await codecPub.close() + + await testAdmin.deleteQueues(plainQ, codecQ) + printRow(label, N, plainMs, codecMs) + } + }, 120_000) + + // ── Consume ──────────────────────────────────────────────────────────────── + + it(`consume: with vs without zstd (${N} messages)`, async () => { + console.log(`\n${'─'.repeat(72)}`) + console.log(` CONSUME BENCHMARK — ${N} messages per run`) + console.log('─'.repeat(72)) + + for (const { label, suffix, meta } of CASES) { + const plainQ = `bench-con-plain-${suffix}` + const codecQ = `bench-con-codec-${suffix}` + await testAdmin.deleteQueues(plainQ, codecQ) + + const plainMsgs = makeMessages('bpc', N, meta) + const codecMsgs = makeMessages('bcc', N, meta) + + // ── Pre-fill plain queue ── + const plainPub = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + }) + await plainPub.init() + for (const msg of plainMsgs) await plainPub.publish(msg) + await plainPub.close() + + // ── 
Pre-fill codec queue ── + const codecPub = new SqsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + creationConfig: { queue: { QueueName: codecQ } }, + }) + await codecPub.init() + for (const msg of codecMsgs) await codecPub.publish(msg) + await codecPub.close() + + // ── Measure plain consume ── + // deletionConfig: { deleteIfExists: false } preserves the pre-filled queue + const plainCon = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + deletionConfig: { deleteIfExists: false }, + }) + await plainCon.start() + const t2 = performance.now() + await Promise.all( + plainMsgs.map((m) => plainCon.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + const plainMs = performance.now() - t2 + await plainCon.close(true) + + // ── Measure codec consume ── + const codecCon = new SqsPermissionConsumer(diContainer.cradle, { + codec: 'zstd', + creationConfig: { queue: { QueueName: codecQ } }, + deletionConfig: { deleteIfExists: false }, + }) + await codecCon.start() + const t3 = performance.now() + await Promise.all( + codecMsgs.map((m) => codecCon.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + const codecMs = performance.now() - t3 + await codecCon.close(true) + + await testAdmin.deleteQueues(plainQ, codecQ) + printRow(label, N, plainMs, codecMs) + } + }, 120_000) +}) diff --git a/packages/sqs/lib/index.ts b/packages/sqs/lib/index.ts index 49d3803c..40828db2 100644 --- a/packages/sqs/lib/index.ts +++ b/packages/sqs/lib/index.ts @@ -1,3 +1,9 @@ +export { + compressMessageBody, + decompressMessageBody, + resolveCodecHandler, + ZstdCodecHandler, +} from '@message-queue-toolkit/codec' export { SqsConsumerErrorResolver } from './errors/SqsConsumerErrorResolver.ts' export { FakeConsumerErrorResolver } from './fakes/FakeConsumerErrorResolver.ts' export { TestSqsPublisher, type TestSqsPublishOptions } from './fakes/TestSqsPublisher.ts' diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts 
b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index d8a06b76..9122f464 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -5,12 +5,14 @@ import { SetQueueAttributesCommand, } from '@aws-sdk/client-sqs' import type { Either, ErrorResolver } from '@lokalise/node-core' +import { decompressMessageBody, resolveCodecHandler } from '@message-queue-toolkit/codec' import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' import { type BarrierResult, type DeadLetterQueueOptions, DeduplicationRequesterEnum, HandlerContainer, + isCodecEnvelope, isMessageError, type MessageSchemaContainer, noopReleasableLock, @@ -915,19 +917,32 @@ export abstract class AbstractSqsConsumer< } // Empty content for whatever reason - if (!resolveMessageResult.result || !resolveMessageResult.result.body) { + if (!resolveMessageResult.result?.body) { return ABORT_EARLY_EITHER } if (hasOffloadedPayload(resolveMessageResult.result)) { const retrieveOffloadedMessagePayloadResult = await this.retrieveOffloadedMessagePayload( resolveMessageResult.result.body, + (codec, data) => { + const handler = resolveCodecHandler(codec as Parameters[0]) + return handler.decompress(data) + }, ) if (retrieveOffloadedMessagePayloadResult.error) { this.handleError(retrieveOffloadedMessagePayloadResult.error) return ABORT_EARLY_EITHER } resolveMessageResult.result.body = retrieveOffloadedMessagePayloadResult.result + } else if (isCodecEnvelope(resolveMessageResult.result.body)) { + try { + resolveMessageResult.result.body = await decompressMessageBody( + resolveMessageResult.result.body, + ) + } catch (err) { + this.handleError(err as Error) + return ABORT_EARLY_EITHER + } } return resolveMessageResult @@ -942,7 +957,7 @@ export abstract class AbstractSqsConsumer< const resolvedMessage = resolveMessageResult.result // Empty content for whatever reason - if (!resolvedMessage || !resolvedMessage.body) return ABORT_EARLY_EITHER + if 
(!resolvedMessage?.body) return ABORT_EARLY_EITHER // @ts-expect-error if (this.messageIdField in resolvedMessage.body) { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index f4ed02d4..29d8b9c8 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -2,6 +2,7 @@ import type { MessageAttributeValue } from '@aws-sdk/client-sqs' import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { Either } from '@lokalise/node-core' import { InternalError } from '@lokalise/node-core' +import { buildCodecEnvelope, resolveCodecHandler } from '@message-queue-toolkit/codec' import { type AsyncPublisher, type BarrierResult, @@ -123,9 +124,7 @@ export abstract class AbstractSqsPublisher // (offloaded payload won't have user fields needed for messageGroupIdField) const resolvedOptions = this.resolveFifoOptions(message, options) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => - calculateOutgoingMessageSize(message), - ) + const { payload, preBuiltBody } = await this.prepareOutgoingPayload(message) if ( this.isDeduplicationEnabledForMessage(parsedMessage) && @@ -141,7 +140,7 @@ export abstract class AbstractSqsPublisher return } - await this.sendMessage(maybeOffloadedPayloadMessage, resolvedOptions) + await this.sendMessage(payload, resolvedOptions, preBuiltBody) this.handleMessageProcessed({ message: parsedMessage, processingResult: { status: 'published' }, @@ -199,16 +198,52 @@ export abstract class AbstractSqsPublisher return this.messageSchemaContainer.resolveSchema(message) } + /** + * Compresses (when codec is set) or offloads (when store is configured) the message. + * Returns the payload to send and an optional pre-built body string. + * When preBuiltBody is set, it is a ready-to-send codec envelope — sendMessage must use it as-is. 
+ */ + private async prepareOutgoingPayload(message: MessagePayloadType): Promise<{ + payload: MessagePayloadType | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + const codec = this.codec + + if (codec) { + // Compress once up-front, then decide: offload the compressed bytes or send inline. + const compressed = await resolveCodecHandler(codec).compress( + Buffer.from(JSON.stringify(message), 'utf8'), + ) + + if ( + this.payloadStoreConfig && + compressed.byteLength > this.payloadStoreConfig.messageSizeThreshold + ) { + return { payload: await this.offloadCompressedPayload(message, compressed, codec) } + } + + return { payload: message, preBuiltBody: buildCodecEnvelope(compressed, codec) } + } + + return { + payload: + (await this.offloadPayload(message, () => calculateOutgoingMessageSize(message))) ?? + message, + } + } + protected async sendMessage( payload: MessagePayloadType | OffloadedPayloadPointerPayload, options: SQSMessageOptions, + preBuiltBody?: string, ): Promise { const attributes = resolveOutgoingMessageAttributes(payload) - - // Options are already resolved in publish() before offloading + // preBuiltBody is set when codec is active and the payload was not offloaded — + // it contains the already-compressed codec envelope, so we skip re-serialization. + const body = preBuiltBody ?? 
JSON.stringify(payload) const command = new SendMessageCommand({ QueueUrl: this.queueUrl, - MessageBody: JSON.stringify(payload), + MessageBody: body, MessageAttributes: attributes, ...options, }) diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 31231a1b..9a50b306 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -22,6 +22,7 @@ "scripts": { "build": "pnpm run clean && tsc --project tsconfig.build.json", "clean": "rimraf dist", + "bench": "vitest run --config vitest.bench.config.ts --reporter=verbose", "test": "vitest", "test:coverage": "pnpm run test --coverage", "lint": "biome check && tsc", @@ -36,6 +37,7 @@ }, "peerDependencies": { "@aws-sdk/client-sqs": "^3.1034.0", + "@message-queue-toolkit/codec": ">=1.0.0", "@message-queue-toolkit/core": ">=25.0.0", "zod": ">=3.25.76 <5.0.0" }, @@ -45,10 +47,11 @@ "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", - "@message-queue-toolkit/redis-message-deduplication-store": "*", - "@message-queue-toolkit/s3-payload-store": "*", - "@message-queue-toolkit/schemas": "*", + "@message-queue-toolkit/codec": "workspace:*", + "@message-queue-toolkit/core": "workspace:*", + "@message-queue-toolkit/redis-message-deduplication-store": "workspace:*", + "@message-queue-toolkit/s3-payload-store": "workspace:*", + "@message-queue-toolkit/schemas": "workspace:*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", "awilix": "^13.0.3", diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts new file mode 100644 index 00000000..43f44493 --- /dev/null +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts @@ -0,0 +1,176 @@ +import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs' +import { compressMessageBody } from '@message-queue-toolkit/codec' +import { MessageCodecEnum } from 
'@message-queue-toolkit/core'
+import type { AwilixContainer } from 'awilix'
+import { asValue } from 'awilix'
+import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'
+
+import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher.ts'
+import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts'
+import type { Dependencies } from '../utils/testContext.ts'
+import { registerDependencies } from '../utils/testContext.ts'
+import { SqsPermissionConsumer } from './SqsPermissionConsumer.ts'
+import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts'
+
+describe('SqsPermissionConsumer - zstd codec', () => {
+  let diContainer: AwilixContainer<Dependencies>
+  let testAdmin: TestAwsResourceAdmin
+  let publisher: SqsPermissionPublisher
+  let consumer: SqsPermissionConsumer
+
+  beforeAll(async () => {
+    diContainer = await registerDependencies({
+      permissionPublisher: asValue(() => undefined),
+      permissionConsumer: asValue(() => undefined),
+    })
+    testAdmin = diContainer.cradle.testAdmin
+  })
+
+  beforeEach(async () => {
+    await testAdmin.deleteQueues(SqsPermissionConsumer.QUEUE_NAME)
+
+    consumer = new SqsPermissionConsumer(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      deletionConfig: { deleteIfExists: false },
+    })
+    publisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+    })
+
+    await consumer.start()
+    await publisher.init()
+  })
+
+  afterEach(async () => {
+    await publisher.close()
+    await consumer.close(true)
+  })
+
+  afterAll(async () => {
+    const { awilixManager } = diContainer.cradle
+    await awilixManager.executeDispose()
+    await diContainer.dispose()
+  })
+
+  it('publishes a compressed message and consumer decompresses it correctly', async () => {
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-test-1',
+      messageType: 'add',
+      metadata: { info: 'hello zstd' },
+    }
+
+    await publisher.publish(message)
+
+    const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed')
+    expect(result.message).toMatchObject(message)
+  })
+
+  it('published SQS message body is a codec envelope containing valid zstd bytes', async () => {
+    // Use an isolated queue with no consumer so we can read the raw message without a race
+    const wireQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-wire-check`
+    await testAdmin.deleteQueues(wireQueueName)
+
+    const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      creationConfig: { queue: { QueueName: wireQueueName } },
+    })
+    await wirePublisher.init()
+
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-wire-1',
+      messageType: 'add',
+    }
+    await wirePublisher.publish(message)
+
+    // Read the raw message directly from SQS — no consumer is running on this queue
+    const { Messages } = await diContainer.cradle.sqsClient.send(
+      new ReceiveMessageCommand({
+        QueueUrl: wirePublisher.queueProps.url,
+        MaxNumberOfMessages: 1,
+        WaitTimeSeconds: 5,
+      }),
+    )
+    expect(Messages, 'Expected a message to be in the queue').toBeDefined()
+    expect(Messages!.length).toBe(1)
+
+    // Body must be a self-describing codec envelope, not raw message JSON
+    const envelope = JSON.parse(Messages![0]!.Body!) as Record<string, unknown>
+    expect(envelope.__codec).toBe(MessageCodecEnum.ZSTD)
+    expect(typeof envelope.__data).toBe('string')
+
+    // __data must decode to a valid zstd frame: magic number 0xFD2FB528 (LE → 28 B5 2F FD)
+    const compressed = Buffer.from(envelope.__data as string, 'base64')
+    expect(compressed.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd]))
+
+    await wirePublisher.close()
+  })
+
+  it('consumer correctly handles multiple compressed messages in sequence', async () => {
+    const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = [
+      { id: 'codec-seq-1', messageType: 'add' },
+      { id: 'codec-seq-2', messageType: 'add' },
+      { id: 'codec-seq-3', messageType: 'add' },
+    ]
+
+    for (const msg of messages) {
+      await publisher.publish(msg)
+    }
+
+    for (const msg of messages) {
+      const result = await consumer.handlerSpy.waitForMessageWithId(msg.id, 'consumed')
+      expect(result.message).toMatchObject(msg)
+    }
+  })
+
+  it('consumer decompresses a message compressed externally with zstd', async () => {
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-external-1',
+      messageType: 'add',
+      metadata: { source: 'external-compressor' },
+    }
+
+    // Simulate a publisher that compressed the message itself
+    const compressedBody = await compressMessageBody(JSON.stringify(message), MessageCodecEnum.ZSTD)
+    await diContainer.cradle.sqsClient.send(
+      new SendMessageCommand({
+        QueueUrl: consumer.queueProps.url,
+        MessageBody: compressedBody,
+      }),
+    )
+
+    const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed')
+    expect(result.message).toMatchObject(message)
+  })
+
+  it('consumer without codec option still decompresses zstd messages (auto-detection)', async () => {
+    // Use a dedicated queue so only autoConsumer polls it — avoids both the race
+    // condition (shared queue) and localstack long-poll timing issues (abort + restart)
+    const autoQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-auto-detect`
+    await testAdmin.deleteQueues(autoQueueName)
+
+    const autoPublisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      creationConfig: { queue: { QueueName: autoQueueName } },
+    })
+    await autoPublisher.init()
+
+    // Consumer without codec — auto-detects from envelope __codec field
+    const autoConsumer = new SqsPermissionConsumer(diContainer.cradle, {
+      creationConfig: { queue: { QueueName: autoQueueName } },
+      deletionConfig: { deleteIfExists: false },
+    })
+    await autoConsumer.start()
+
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-auto-detect-1',
+      messageType: 'add',
+    }
+    await autoPublisher.publish(message)
+
+    const result = await autoConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed')
+    expect(result.message).toMatchObject(message)
+
+    await autoPublisher.close()
+    await autoConsumer.close(true)
+  }, 15000)
+})
diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
index c53ff481..d41bad0c 100644
--- a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
+++ b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
@@ -1,7 +1,7 @@
 import type { S3 } from '@aws-sdk/client-s3'
-import { SendMessageCommand } from '@aws-sdk/client-sqs'
+import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs'
 import type { SinglePayloadStoreConfig } from '@message-queue-toolkit/core'
-import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
+import { MessageCodecEnum, MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
 import { S3PayloadStore } from '@message-queue-toolkit/s3-payload-store'
 import { OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE } from '@message-queue-toolkit/sqs'
 import type { AwilixContainer } from 'awilix'
@@ -13,7 +13,7 @@ import { AbstractSqsConsumer } from '../../lib/sqs/AbstractSqsConsumer.ts'
 import { AbstractSqsPublisher } from
'../../lib/sqs/AbstractSqsPublisher.ts'
 import { SQS_MESSAGE_MAX_SIZE } from '../../lib/sqs/AbstractSqsService.ts'
 import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher.ts'
-import { putObjectContent, waitForS3Objects } from '../utils/s3Utils.ts'
+import { getObjectBuffer, putObjectContent, waitForS3Objects } from '../utils/s3Utils.ts'
 import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts'
 import type { Dependencies } from '../utils/testContext.ts'
 import { registerDependencies } from '../utils/testContext.ts'
@@ -401,3 +401,164 @@ describe('SqsPermissionConsumer - nested messageTypePath with payload offloading
     })
   })
 })
+
+// Exercises the zstd codec together with payload offloading: the tests below check that
+// the S3 object holds raw zstd bytes, that the SQS pointer records the codec in
+// `payloadRef.codec`, and that a consumer created without a `codec` option still decodes it.
+describe('SqsPermissionConsumer - codec + payload offloading', () => {
+  const s3BucketName = 'test-bucket-codec'
+  // Threshold low enough that even a small compressed payload triggers offloading
+  const smallThreshold = 10
+
+  let diContainer: AwilixContainer<Dependencies>
+  let s3: S3
+  let testAdmin: TestAwsResourceAdmin
+  let payloadStoreConfig: SinglePayloadStoreConfig
+
+  beforeAll(async () => {
+    diContainer = await registerDependencies({
+      permissionPublisher: asValue(() => undefined),
+      permissionConsumer: asValue(() => undefined),
+    })
+    s3 = diContainer.cradle.s3
+    testAdmin = diContainer.cradle.testAdmin
+
+    await testAdmin.createBucket(s3BucketName)
+    payloadStoreConfig = {
+      messageSizeThreshold: smallThreshold,
+      store: new S3PayloadStore(diContainer.cradle, { bucketName: s3BucketName }),
+      storeName: 's3',
+    }
+  })
+
+  afterAll(async () => {
+    await testAdmin.emptyBuckets(s3BucketName)
+    const { awilixManager } = diContainer.cradle
+    await awilixManager.executeDispose()
+    await diContainer.dispose()
+  })
+
+  it('S3 object is raw zstd binary and SQS message carries a plain pointer (not a codec envelope)', async () => {
+    // Use an isolated queue with no consumer so we can read the raw SQS message without a race
+    const wireQueueName = 'codec-offload-wire-check'
+    await testAdmin.deleteQueues(wireQueueName)
+    // Start from an empty bucket (same helper afterAll uses) so s3Keys[0] below is this test's object
+    await testAdmin.emptyBuckets(s3BucketName)
+
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-offload-wire-1',
+      messageType: 'add',
+      metadata: { info: 'wire format check' },
+    }
+
+    const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      payloadStoreConfig,
+      creationConfig: { queue: { QueueName: wireQueueName } },
+    })
+    await wirePublisher.init()
+    await wirePublisher.publish(message)
+
+    // Read the raw SQS message before any consumer touches it
+    const { Messages } = await diContainer.cradle.sqsClient.send(
+      new ReceiveMessageCommand({
+        QueueUrl: wirePublisher.queueProps.url,
+        MaxNumberOfMessages: 1,
+        WaitTimeSeconds: 5,
+      }),
+    )
+    expect(Messages, 'Expected a message to be in the queue').toBeDefined()
+    expect(Messages!.length).toBe(1)
+
+    // SQS body must be a plain JSON pointer — not a codec envelope.
+    // Compressed bytes live in S3; only the pointer is sent inline.
+    const sqsBody = JSON.parse(Messages![0]!.Body!) as Record<string, unknown>
+    expect(sqsBody.__codec, 'SQS body must not be a codec envelope when offloading').toBeUndefined()
+    expect(sqsBody.payloadRef, 'SQS body must contain a payloadRef pointer').toBeDefined()
+    const payloadRef = sqsBody.payloadRef as Record<string, unknown>
+    expect(payloadRef.codec).toBe(MessageCodecEnum.ZSTD)
+
+    // S3 object must be raw compressed binary, not a JSON codec envelope.
+    // zstd frames start with magic number 0xFD2FB528 (little-endian: 28 B5 2F FD).
+    const s3Keys = await waitForS3Objects(s3, s3BucketName, 1, 5000)
+    expect(s3Keys.length).toBeGreaterThan(0)
+    const s3Bytes = await getObjectBuffer(s3, s3BucketName, s3Keys[0]!)
+    expect(s3Bytes.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd]))
+
+    await wirePublisher.close()
+  }, 30_000)
+
+  it('compresses payload, offloads to S3 as raw binary, and consumer decompresses correctly', async () => {
+    const queueName = 'codec-offload-roundtrip'
+    await testAdmin.deleteQueues(queueName)
+    // Start from an empty bucket so an object left by an earlier test cannot satisfy the S3 wait below
+    await testAdmin.emptyBuckets(s3BucketName)
+
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-offload-1',
+      messageType: 'add',
+      metadata: { info: 'compressed and offloaded' },
+    }
+
+    const publisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      payloadStoreConfig,
+      creationConfig: { queue: { QueueName: queueName } },
+    })
+    // No codec on consumer — codec is read from payloadRef.codec in the pointer
+    const consumer = new SqsPermissionConsumer(diContainer.cradle, {
+      payloadStoreConfig,
+      creationConfig: { queue: { QueueName: queueName } },
+      deletionConfig: { deleteIfExists: false },
+    })
+
+    await publisher.init()
+    await consumer.start()
+    await publisher.publish(message)
+
+    // Verify payload was offloaded to S3
+    const s3Keys = await waitForS3Objects(s3, s3BucketName, 1, 5000)
+    expect(s3Keys.length).toBeGreaterThan(0)
+
+    // Verify consumer receives the correct decompressed payload
+    const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed')
+    expect(result.message).toMatchObject(message)
+
+    await publisher.close()
+    await consumer.close(true)
+  }, 30_000)
+
+  it('consumer without explicit codec still decompresses codec-offloaded payload', async () => {
+    const queueName = 'codec-offload-auto-detect'
+    await testAdmin.deleteQueues(queueName)
+
+    const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
+      id: 'codec-offload-auto-1',
+      messageType: 'add',
+      metadata: { info: 'auto-detect codec from pointer' },
+    }
+
+    const publisher = new SqsPermissionPublisher(diContainer.cradle, {
+      codec: MessageCodecEnum.ZSTD,
+      payloadStoreConfig,
+      creationConfig: { queue: { QueueName: queueName } },
+    })
+    // Consumer has no explicit codec — should still work because codec comes from payloadRef.codec
+    const consumer = new SqsPermissionConsumer(diContainer.cradle, {
+      payloadStoreConfig,
+      creationConfig: { queue: { QueueName: queueName } },
+      deletionConfig: { deleteIfExists: false },
+    })
+
+    await publisher.init()
+    await consumer.start()
+
+    await publisher.publish(message)
+
+    const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed')
+    expect(result.message).toMatchObject(message)
+
+    await publisher.close()
+    await consumer.close(true)
+  }, 30_000)
+})
diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 26618288..e10b38fd 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -32,6 +32,7 @@ type SqsPermissionConsumerOptions = Pick< | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enableConsumerDeduplication' + | 'codec' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -128,6 +129,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< payloadStoreConfig: options.payloadStoreConfig, messageDeduplicationConfig: options.messageDeduplicationConfig, enableConsumerDeduplication: options.enableConsumerDeduplication, + codec: options.codec, messageDeduplicationIdField: 'deduplicationId', messageDeduplicationOptionsField: 'deduplicationOptions', handlers: new MessageHandlerConfigBuilder< diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.ts index 33ab0c15..5aa73c20 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.ts @@ -31,6 +31,7 @@ export class SqsPermissionPublisher extends AbstractSqsPublisher, ) { super(dependencies, { @@ -53,6 +54,7 @@ export class SqsPermissionPublisher extends AbstractSqsPublisher { + const result = await
s3.getObject({ Bucket: bucket, Key: key }) + const bytes = await result.Body?.transformToByteArray() + if (!bytes) throw new Error(`No body for S3 object ${key}`) + return Buffer.from(bytes) +} + export async function putObjectContent(s3: S3, bucket: string, key: string, content: string) { await s3.putObject({ Bucket: bucket, Key: key, Body: content }) } diff --git a/packages/sqs/vitest.bench.config.ts b/packages/sqs/vitest.bench.config.ts new file mode 100644 index 00000000..fd4e580e --- /dev/null +++ b/packages/sqs/vitest.bench.config.ts @@ -0,0 +1,14 @@ +import { defineConfig } from 'vitest/config' + +// biome-ignore lint/style/noDefaultExport: vite expects default export +export default defineConfig({ + test: { + globals: true, + watch: false, + mockReset: true, + pool: 'threads', + maxWorkers: 1, + setupFiles: ['test/utils/vitest.setup.ts'], + include: ['bench/**/*.ts'], + }, +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4826d0c7..cf6ea3d5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -64,6 +64,30 @@ importers: specifier: ^4.1.13 version: 4.4.3 + packages/codec: + devDependencies: + '@biomejs/biome': + specifier: ^2.3.8 + version: 2.4.15 + '@lokalise/biome-config': + specifier: ^3.1.0 + version: 3.1.1 + '@lokalise/tsconfig': + specifier: ^3.0.0 + version: 3.1.0 + '@message-queue-toolkit/core': + specifier: workspace:* + version: link:../core + '@types/node': + specifier: ^25.0.2 + version: 25.8.0 + rimraf: + specifier: ^6.0.1 + version: 6.1.3 + typescript: + specifier: ^5.9.3 + version: 5.9.3 + packages/core: dependencies: '@lokalise/node-core': @@ -150,14 +174,14 @@ importers: specifier: ^3.0.0 version: 3.1.0 '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/gcs-payload-store': specifier: '*' - version: 1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + version: 
1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@packages+core) '@message-queue-toolkit/redis-message-deduplication-store': specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + version: 2.0.2(@message-queue-toolkit/core@packages+core)(ioredis@5.10.1)(zod@4.4.3) '@message-queue-toolkit/schemas': specifier: '*' version: 7.1.0(zod@4.4.3) @@ -497,15 +521,18 @@ importers: '@lokalise/tsconfig': specifier: ^3.0.0 version: 3.1.0 + '@message-queue-toolkit/codec': + specifier: workspace:* + version: link:../codec '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/redis-message-deduplication-store': - specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + specifier: workspace:* + version: link:../redis-message-deduplication-store '@message-queue-toolkit/s3-payload-store': - specifier: '*' - version: 3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + specifier: workspace:* + version: link:../s3-payload-store '@message-queue-toolkit/sqs': specifier: workspace:* version: link:../sqs @@ -564,18 +591,21 @@ importers: '@lokalise/tsconfig': specifier: ^3.0.0 version: 3.1.0 + '@message-queue-toolkit/codec': + specifier: workspace:* + version: link:../codec '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/redis-message-deduplication-store': - specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + specifier: workspace:* + version: link:../redis-message-deduplication-store '@message-queue-toolkit/s3-payload-store': - specifier: '*' - version: 3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + specifier: workspace:* + version: link:../s3-payload-store 
'@message-queue-toolkit/schemas': - specifier: '*' - version: 7.1.0(zod@4.4.3) + specifier: workspace:* + version: link:../schemas '@types/node': specifier: ^25.0.2 version: 25.8.0 @@ -991,12 +1021,6 @@ packages: '@message-queue-toolkit/core': '>=23.1.0' ioredis: ^5.3.2 - '@message-queue-toolkit/s3-payload-store@3.0.0': - resolution: {integrity: sha512-AX2PI74CN9CBQWHT/nJBhUPR8E6beGodTsuSSlZ/zQvy6ViDcI4gEKxFViqKR2xai7PeLsqw+HWdkXhawwEqYA==} - peerDependencies: - '@aws-sdk/client-s3': ^3.596.0 - '@message-queue-toolkit/core': '>=24.0.0' - '@message-queue-toolkit/schemas@7.1.0': resolution: {integrity: sha512-JAzSQAHouympK/cEDBxsfEuS2Ifu1pv0a/NRvhNWfFlgW0TmsWT7SkYNERA7x89OK7PGk9PyDN88cV9l0gZ22Q==} peerDependencies: @@ -3350,26 +3374,21 @@ snapshots: toad-cache: 3.7.0 zod: 4.4.3 - '@message-queue-toolkit/gcs-payload-store@1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3))': + '@message-queue-toolkit/gcs-payload-store@1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@packages+core)': dependencies: '@google-cloud/storage': 7.19.0 - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) + '@message-queue-toolkit/core': link:packages/core - '@message-queue-toolkit/redis-message-deduplication-store@2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3)': + '@message-queue-toolkit/redis-message-deduplication-store@2.0.2(@message-queue-toolkit/core@packages+core)(ioredis@5.10.1)(zod@4.4.3)': dependencies: '@lokalise/node-core': 14.8.1(zod@4.4.3) - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) + '@message-queue-toolkit/core': link:packages/core ioredis: 5.10.1 redis-semaphore: 5.7.0(ioredis@5.10.1) transitivePeerDependencies: - supports-color - zod - '@message-queue-toolkit/s3-payload-store@3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3))': - dependencies: - '@aws-sdk/client-s3': 3.1048.0 - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) - 
'@message-queue-toolkit/schemas@7.1.0(zod@4.4.3)': dependencies: zod: 4.4.3