-
Notifications
You must be signed in to change notification settings - Fork 7
feat: add zstd message compression codec for SNS and SQS #442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0794af6
b3b0fcf
9950db2
2dfdb6e
c1d83e7
12897ca
094dc5d
69bf6cc
ff779b5
989165f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| # @message-queue-toolkit/codec | ||
|
|
||
| Message compression codec implementations for [message-queue-toolkit](https://github.com/kibertoad/message-queue-toolkit). | ||
|
|
||
| This package provides the concrete codec implementations (e.g. zstd) used by the SQS and SNS adapters. The codec interfaces and types (`MessageCodecEnum`, `MessageCodecHandler`, `CodecEnvelope`) live in `@message-queue-toolkit/core`. | ||
|
|
||
| ## Installation | ||
|
|
||
| ```sh | ||
| npm install @message-queue-toolkit/codec @message-queue-toolkit/core | ||
| ``` | ||
|
|
||
| > **Requirements:** Node.js >=22.15.0 (uses the built-in `zlib` zstd support). | ||
|
|
||
| ## Usage | ||
|
|
||
| Codec options are typically set on the publisher/consumer constructor in the SQS or SNS adapter packages. You do not need to interact with this package directly unless you are building a custom adapter. | ||
|
|
||
| ### How compression works during publish | ||
|
|
||
| When `codec` is set on a publisher, compression happens **exactly once** at the start of `publish()`, before any other processing: | ||
|
|
||
| 1. The message JSON is compressed to a raw `Buffer`. | ||
| 2. If a payload store is configured **and** the compressed size exceeds `messageSizeThreshold`, the compressed bytes are stored in S3 and only a lightweight pointer is sent. The codec name is recorded in `payloadRef.codec` so the consumer can decompress after retrieval. | ||
| 3. If the compressed size fits within the threshold (or no store is configured), the message is sent inline as a self-describing codec envelope. | ||
|
|
||
| The payload is never compressed twice. The same compressed `Buffer` from step 1 is either uploaded to S3 or wrapped in the envelope — whichever path is taken. | ||
|
|
||
| ### Compress / decompress a message body | ||
|
|
||
| ```typescript | ||
| import { compressMessageBody, decompressMessageBody } from '@message-queue-toolkit/codec' | ||
| import { MessageCodecEnum } from '@message-queue-toolkit/core' | ||
|
|
||
| // Compress (returns a JSON string containing the codec envelope) | ||
| const compressed = await compressMessageBody(JSON.stringify(payload), MessageCodecEnum.ZSTD) | ||
|
|
||
| // Decompress (parses the envelope and returns the original object) | ||
| const original = await decompressMessageBody(JSON.parse(compressed)) | ||
| ``` | ||
|
|
||
| ### Build a codec envelope from already-compressed bytes | ||
|
|
||
| When you have pre-compressed bytes (e.g., from `resolveCodecHandler(codec).compress(...)`) and want to produce the envelope string without compressing again: | ||
|
|
||
| ```typescript | ||
| import { buildCodecEnvelope, resolveCodecHandler } from '@message-queue-toolkit/codec' | ||
| import { MessageCodecEnum } from '@message-queue-toolkit/core' | ||
|
|
||
| const handler = resolveCodecHandler(MessageCodecEnum.ZSTD) | ||
| const compressed: Buffer = await handler.compress(Buffer.from(JSON.stringify(payload), 'utf8')) | ||
|
|
||
| // Build envelope without a second compression pass | ||
| const envelopeString = buildCodecEnvelope(compressed, MessageCodecEnum.ZSTD) | ||
| // → '{"__codec":"zstd","__data":"<base64>"}' | ||
| ``` | ||
|
|
||
| ### Custom codec handler | ||
|
|
||
| ```typescript | ||
| import type { MessageCodecHandler } from '@message-queue-toolkit/core' | ||
|
|
||
| class MyCodecHandler implements MessageCodecHandler { | ||
| compress(data: Buffer): Promise<Buffer> { /* ... */ } | ||
| decompress(data: Buffer): Promise<Buffer> { /* ... */ } | ||
| } | ||
| ``` | ||
|
|
||
| ## Codec envelope format | ||
|
|
||
| Compressed messages are wrapped in a self-describing JSON envelope: | ||
|
|
||
| ```json | ||
| { | ||
| "__codec": "zstd", | ||
| "__data": "<base64-encoded compressed bytes>" | ||
| } | ||
| ``` | ||
|
|
||
| Consumers auto-detect this envelope and decompress transparently, even if the `codec` option is not set on the consumer. | ||
|
|
||
| ## License | ||
|
|
||
| MIT |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| import { promisify } from 'node:util' | ||
| import zlib from 'node:zlib' | ||
| import type { CodecEnvelope, MessageCodec, MessageCodecHandler } from '@message-queue-toolkit/core' | ||
| import { MessageCodecEnum } from '@message-queue-toolkit/core' | ||
|
|
||
| if (typeof zlib.zstdCompress !== 'function' || typeof zlib.zstdDecompress !== 'function') { | ||
| throw new Error( | ||
| 'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. ' + | ||
| '@message-queue-toolkit/codec requires Node.js >=22.15.0 or >=23.8.0.', | ||
| ) | ||
| } | ||
|
|
||
| const zstdCompress = promisify(zlib.zstdCompress) | ||
| const zstdDecompress = promisify(zlib.zstdDecompress) | ||
|
|
||
| export class ZstdCodecHandler implements MessageCodecHandler { | ||
|
irfanh94 marked this conversation as resolved.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amp or gcp-pubsup without having to reimplement it each time. @kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support. One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way. |
||
| compress(data: Buffer): Promise<Buffer> { | ||
| return zstdCompress(data) | ||
| } | ||
|
|
||
| decompress(data: Buffer): Promise<Buffer> { | ||
| return zstdDecompress(data) | ||
| } | ||
| } | ||
|
|
||
| const ZSTD_HANDLER = new ZstdCodecHandler() | ||
|
|
||
| export function resolveCodecHandler(codec: MessageCodec): MessageCodecHandler { | ||
| if (codec === MessageCodecEnum.ZSTD) return ZSTD_HANDLER | ||
| throw new Error(`Unsupported codec: ${codec}`) | ||
| } | ||
|
|
||
| export async function compressMessageBody(jsonBody: string, codec: MessageCodec): Promise<string> { | ||
| const handler = resolveCodecHandler(codec) | ||
| const compressed = await handler.compress(Buffer.from(jsonBody, 'utf8')) | ||
| return buildCodecEnvelope(compressed, codec) | ||
| } | ||
|
|
||
| /** | ||
| * Wraps an already-compressed buffer in a codec envelope string. | ||
| * Use this when you have pre-compressed bytes and want to avoid compressing twice. | ||
| */ | ||
| export function buildCodecEnvelope(compressed: Buffer, codec: MessageCodec): string { | ||
| const envelope: CodecEnvelope = { | ||
| __codec: codec, | ||
| __data: compressed.toString('base64'), | ||
| } | ||
| return JSON.stringify(envelope) | ||
| } | ||
|
|
||
| export async function decompressMessageBody(envelope: CodecEnvelope): Promise<unknown> { | ||
| const handler = resolveCodecHandler(envelope.__codec) | ||
| const compressed = Buffer.from(envelope.__data, 'base64') | ||
| const decompressed = await handler.decompress(compressed) | ||
| return JSON.parse(decompressed.toString('utf8')) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| export { | ||
| buildCodecEnvelope, | ||
| compressMessageBody, | ||
| decompressMessageBody, | ||
| resolveCodecHandler, | ||
| ZstdCodecHandler, | ||
| } from './codec/codecHandler.ts' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| { | ||
| "name": "@message-queue-toolkit/codec", | ||
| "version": "1.0.0", | ||
| "private": false, | ||
| "license": "MIT", | ||
| "description": "Message compression codec implementations for message-queue-toolkit", | ||
| "maintainers": [ | ||
| { | ||
| "name": "Igor Savin", | ||
| "email": "kibertoad@gmail.com" | ||
| } | ||
| ], | ||
| "type": "module", | ||
| "main": "./dist/index.js", | ||
| "exports": { | ||
| ".": "./dist/index.js", | ||
| "./package.json": "./package.json" | ||
| }, | ||
| "scripts": { | ||
| "build": "pnpm run clean && tsc --project tsconfig.build.json", | ||
| "clean": "rimraf dist", | ||
| "lint": "biome check . && tsc", | ||
| "lint:fix": "biome check --write .", | ||
| "prepublishOnly": "pnpm run lint && pnpm run build" | ||
| }, | ||
| "engines": { | ||
| "node": ">=22.15.0" | ||
| }, | ||
| "peerDependencies": { | ||
| "@message-queue-toolkit/core": ">=25.0.0" | ||
| }, | ||
| "devDependencies": { | ||
| "@biomejs/biome": "^2.3.8", | ||
| "@lokalise/biome-config": "^3.1.0", | ||
| "@lokalise/tsconfig": "^3.0.0", | ||
| "@message-queue-toolkit/core": "workspace:*", | ||
| "@types/node": "^25.0.2", | ||
| "rimraf": "^6.0.1", | ||
| "typescript": "^5.9.3" | ||
| }, | ||
| "homepage": "https://github.com/kibertoad/message-queue-toolkit", | ||
| "repository": { | ||
| "type": "git", | ||
| "url": "git://github.com/kibertoad/message-queue-toolkit.git" | ||
| }, | ||
| "keywords": [ | ||
| "message", | ||
| "queue", | ||
| "codec", | ||
| "compression", | ||
| "zstd" | ||
| ], | ||
| "files": [ | ||
| "README.md", | ||
| "LICENSE", | ||
| "dist/*" | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "extends": ["./tsconfig.json", "@lokalise/tsconfig/build-public-lib"], | ||
| "include": ["lib/**/*"] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "extends": "@lokalise/tsconfig/tsc", | ||
| "include": ["lib/**/*"] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| type ObjectValues<T> = T[keyof T] | ||
|
|
||
| /** | ||
| * Supported message compression codecs. | ||
| * | ||
| * Use the enum values instead of raw strings so that adding a new codec in | ||
| * the future is a single-place change and consumers benefit from IDE | ||
| * auto-complete. | ||
| * | ||
| * @example | ||
| * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) | ||
| */ | ||
| export const MessageCodecEnum = { | ||
| /** zstd compression via Node.js built-in `zlib` (requires Node.js >=22.15.0). */ | ||
| ZSTD: 'zstd', | ||
| } as const | ||
| export type MessageCodec = ObjectValues<typeof MessageCodecEnum> | ||
|
|
||
| const CODEC_FIELD = '__codec' | ||
| const DATA_FIELD = '__data' | ||
|
|
||
| export type CodecEnvelope = { | ||
| [CODEC_FIELD]: MessageCodec | ||
| [DATA_FIELD]: string | ||
| } | ||
|
|
||
| /** | ||
| * Low-level interface for a compression codec. | ||
| * | ||
| * Implement this interface to plug in a custom compression algorithm. | ||
| * The built-in implementation (`ZstdCodecHandler` in `@message-queue-toolkit/sqs`) | ||
| * uses Node.js built-in `zlib` zstd support. | ||
| */ | ||
| export interface MessageCodecHandler { | ||
| compress(data: Buffer): Promise<Buffer> | ||
| decompress(data: Buffer): Promise<Buffer> | ||
| } | ||
|
|
||
| export function isCodecEnvelope(value: unknown): value is CodecEnvelope { | ||
| const record = value as Record<string, unknown> | ||
| return ( | ||
| typeof value === 'object' && | ||
| value !== null && | ||
| CODEC_FIELD in value && | ||
| DATA_FIELD in value && | ||
| (Object.values(MessageCodecEnum) as string[]).includes(record[CODEC_FIELD] as string) && | ||
| typeof record[DATA_FIELD] === 'string' | ||
| ) | ||
|
irfanh94 marked this conversation as resolved.
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.