1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -37,6 +37,7 @@ jobs:
run: |
declare -A PATH_TO_NAME=(
["packages/amqp"]="@message-queue-toolkit/amqp"
["packages/codec"]="@message-queue-toolkit/codec"
["packages/core"]="@message-queue-toolkit/core"
["packages/gcp-pubsub"]="@message-queue-toolkit/gcp-pubsub"
["packages/gcs-payload-store"]="@message-queue-toolkit/gcs-payload-store"
14 changes: 13 additions & 1 deletion biome.json
@@ -15,5 +15,17 @@
"noUnusedPrivateClassMembers": "off"
}
}
}
},
"overrides": [
{
"includes": ["**/bench/**"],
"linter": {
"rules": {
"suspicious": {
"noConsole": "off"
}
}
}
}
]
}
84 changes: 84 additions & 0 deletions packages/codec/README.md
@@ -0,0 +1,84 @@
# @message-queue-toolkit/codec

Message compression codec implementations for [message-queue-toolkit](https://github.com/kibertoad/message-queue-toolkit).

This package provides the concrete codec implementations (e.g. zstd) used by the SQS and SNS adapters. The codec interfaces and types (`MessageCodecEnum`, `MessageCodecHandler`, `CodecEnvelope`) live in `@message-queue-toolkit/core`.

## Installation

```sh
npm install @message-queue-toolkit/codec @message-queue-toolkit/core
```

> **Requirements:** Node.js >=22.15.0 (uses the built-in `zlib` zstd support).

## Usage

Codec options are typically set on the publisher/consumer constructor in the SQS or SNS adapter packages. You do not need to interact with this package directly unless you are building a custom adapter.

### How compression works during publish

When `codec` is set on a publisher, compression happens **exactly once** at the start of `publish()`, before any other processing:

1. The message JSON is compressed to a raw `Buffer`.
2. If a payload store is configured **and** the compressed size exceeds `messageSizeThreshold`, the compressed bytes are stored in S3 and only a lightweight pointer is sent. The codec name is recorded in `payloadRef.codec` so the consumer can decompress after retrieval.
3. If the compressed size fits within the threshold (or no store is configured), the message is sent inline as a self-describing codec envelope.

The payload is never compressed twice. The same compressed `Buffer` from step 1 is either uploaded to S3 or wrapped in the envelope — whichever path is taken.
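
The branch between steps 2 and 3 can be sketched as a small pure function. This is an illustrative sketch, not the adapter's actual code: `choosePublishPath` and `PublishPath` are hypothetical names, and an `undefined` threshold is assumed to model "no payload store configured".

```typescript
import { Buffer } from 'node:buffer'

// Hypothetical result type: either an inline envelope or bytes to offload.
type PublishPath =
  | { kind: 'inline'; body: string }
  | { kind: 'offload'; bytes: Buffer; codec: string }

// Hypothetical helper: decides what to do with the buffer produced in step 1.
function choosePublishPath(
  compressed: Buffer,
  codec: string,
  messageSizeThreshold?: number, // undefined: no payload store configured (assumption)
): PublishPath {
  if (messageSizeThreshold !== undefined && compressed.byteLength > messageSizeThreshold) {
    // Step 2: the raw compressed bytes go to the store; the codec name
    // travels separately so it can be recorded in payloadRef.codec.
    return { kind: 'offload', bytes: compressed, codec }
  }
  // Step 3: the same buffer is base64-encoded into the envelope.
  const body = JSON.stringify({ __codec: codec, __data: compressed.toString('base64') })
  return { kind: 'inline', body }
}
```

Both branches receive the buffer from step 1 unchanged, which is why no second compression pass is needed on either path.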

### Compress / decompress a message body

```typescript
import { compressMessageBody, decompressMessageBody } from '@message-queue-toolkit/codec'
import { MessageCodecEnum } from '@message-queue-toolkit/core'

// Compress (returns a JSON string containing the codec envelope)
const compressed = await compressMessageBody(JSON.stringify(payload), MessageCodecEnum.ZSTD)

// Decompress (parses the envelope and returns the original object)
const original = await decompressMessageBody(JSON.parse(compressed))
```

### Build a codec envelope from already-compressed bytes

When you have pre-compressed bytes (e.g., from `resolveCodecHandler(codec).compress(...)`) and want to produce the envelope string without compressing again:

```typescript
import { buildCodecEnvelope, resolveCodecHandler } from '@message-queue-toolkit/codec'
import { MessageCodecEnum } from '@message-queue-toolkit/core'

const handler = resolveCodecHandler(MessageCodecEnum.ZSTD)
const compressed: Buffer = await handler.compress(Buffer.from(JSON.stringify(payload), 'utf8'))

// Build envelope without a second compression pass
const envelopeString = buildCodecEnvelope(compressed, MessageCodecEnum.ZSTD)
// → '{"__codec":"zstd","__data":"<base64>"}'
```

### Custom codec handler

```typescript
import type { MessageCodecHandler } from '@message-queue-toolkit/core'

class MyCodecHandler implements MessageCodecHandler {
  compress(data: Buffer): Promise<Buffer> { /* ... */ }
  decompress(data: Buffer): Promise<Buffer> { /* ... */ }
}
```
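
As a fuller illustration, here is a minimal sketch of a handler with real method bodies. It uses gzip purely as a stand-in algorithm; `GzipCodecHandler` is a hypothetical name, and the interface is redeclared locally only so the snippet is self-contained (in real code it would be imported from `@message-queue-toolkit/core`).

```typescript
import { Buffer } from 'node:buffer'
import { promisify } from 'node:util'
import * as zlib from 'node:zlib'

// Local copy of the interface shape, for self-containment only.
interface MessageCodecHandler {
  compress(data: Buffer): Promise<Buffer>
  decompress(data: Buffer): Promise<Buffer>
}

const gzip = promisify(zlib.gzip)
const gunzip = promisify(zlib.gunzip)

// Hypothetical handler: gzip stands in for whatever algorithm you plug in.
class GzipCodecHandler implements MessageCodecHandler {
  compress(data: Buffer): Promise<Buffer> {
    return gzip(data)
  }

  decompress(data: Buffer): Promise<Buffer> {
    return gunzip(data)
  }
}
```

Note that `resolveCodecHandler` in this package only resolves `MessageCodecEnum.ZSTD` and throws for any other value, so a handler like this would have to be wired in by the custom adapter itself.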

## Codec envelope format

Compressed messages are wrapped in a self-describing JSON envelope:

```json
{
  "__codec": "zstd",
  "__data": "<base64-encoded compressed bytes>"
}
```

Consumers auto-detect this envelope and decompress transparently, even if the `codec` option is not set on the consumer.
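
A rough sketch of what that auto-detection could look like on the consumer side follows. The names `looksLikeEnvelope`, `unwrapBody`, and `DecompressFn` are hypothetical, and the decompression step is injected as a parameter so the snippet does not depend on a particular codec; the real adapters may structure this differently.

```typescript
import { Buffer } from 'node:buffer'

type EnvelopeLike = { __codec: string; __data: string }
// Hypothetical injection point for the actual decompression.
type DecompressFn = (codec: string, data: Buffer) => Promise<Buffer>

const KNOWN_CODECS = ['zstd']

// Shape check comparable to `isCodecEnvelope` in core.
function looksLikeEnvelope(value: unknown): value is EnvelopeLike {
  if (typeof value !== 'object' || value === null) return false
  const record = value as Record<string, unknown>
  return (
    typeof record.__codec === 'string' &&
    KNOWN_CODECS.includes(record.__codec) &&
    typeof record.__data === 'string'
  )
}

async function unwrapBody(rawBody: string, decompress: DecompressFn): Promise<unknown> {
  const parsed: unknown = JSON.parse(rawBody)
  // Plain (uncompressed) messages pass through untouched.
  if (!looksLikeEnvelope(parsed)) return parsed
  const bytes = await decompress(parsed.__codec, Buffer.from(parsed.__data, 'base64'))
  return JSON.parse(bytes.toString('utf8'))
}
```

Because the decision depends only on the shape of the parsed body, it works the same whether or not the consumer was configured with a `codec` option.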

## License

MIT
56 changes: 56 additions & 0 deletions packages/codec/lib/codec/codecHandler.ts
@@ -0,0 +1,56 @@
import { promisify } from 'node:util'
import zlib from 'node:zlib'
import type { CodecEnvelope, MessageCodec, MessageCodecHandler } from '@message-queue-toolkit/core'
import { MessageCodecEnum } from '@message-queue-toolkit/core'

if (typeof zlib.zstdCompress !== 'function' || typeof zlib.zstdDecompress !== 'function') {
throw new Error(
'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. ' +
'@message-queue-toolkit/codec requires Node.js >=22.15.0 or >=23.8.0.',
)
}

const zstdCompress = promisify(zlib.zstdCompress)
const zstdDecompress = promisify(zlib.zstdDecompress)

export class ZstdCodecHandler implements MessageCodecHandler {
@CarlosGamero (Collaborator), May 19, 2026:

Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amqp or gcp-pubsub without having to reimplement it each time.

@kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓

Reply (Collaborator):

Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support.

One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way.

compress(data: Buffer): Promise<Buffer> {
return zstdCompress(data)
}

decompress(data: Buffer): Promise<Buffer> {
return zstdDecompress(data)
}
}

const ZSTD_HANDLER = new ZstdCodecHandler()

export function resolveCodecHandler(codec: MessageCodec): MessageCodecHandler {
if (codec === MessageCodecEnum.ZSTD) return ZSTD_HANDLER
throw new Error(`Unsupported codec: ${codec}`)
}

export async function compressMessageBody(jsonBody: string, codec: MessageCodec): Promise<string> {
const handler = resolveCodecHandler(codec)
const compressed = await handler.compress(Buffer.from(jsonBody, 'utf8'))
return buildCodecEnvelope(compressed, codec)
}

/**
* Wraps an already-compressed buffer in a codec envelope string.
* Use this when you have pre-compressed bytes and want to avoid compressing twice.
*/
export function buildCodecEnvelope(compressed: Buffer, codec: MessageCodec): string {
const envelope: CodecEnvelope = {
__codec: codec,
__data: compressed.toString('base64'),
}
return JSON.stringify(envelope)
}

export async function decompressMessageBody(envelope: CodecEnvelope): Promise<unknown> {
const handler = resolveCodecHandler(envelope.__codec)
const compressed = Buffer.from(envelope.__data, 'base64')
const decompressed = await handler.decompress(compressed)
return JSON.parse(decompressed.toString('utf8'))
}
7 changes: 7 additions & 0 deletions packages/codec/lib/index.ts
@@ -0,0 +1,7 @@
export {
buildCodecEnvelope,
compressMessageBody,
decompressMessageBody,
resolveCodecHandler,
ZstdCodecHandler,
} from './codec/codecHandler.ts'
58 changes: 58 additions & 0 deletions packages/codec/package.json
@@ -0,0 +1,58 @@
{
"name": "@message-queue-toolkit/codec",
"version": "1.0.0",
"private": false,
"license": "MIT",
"description": "Message compression codec implementations for message-queue-toolkit",
"maintainers": [
{
"name": "Igor Savin",
"email": "kibertoad@gmail.com"
}
],
"type": "module",
"main": "./dist/index.js",
"exports": {
".": "./dist/index.js",
"./package.json": "./package.json"
},
"scripts": {
"build": "pnpm run clean && tsc --project tsconfig.build.json",
"clean": "rimraf dist",
"lint": "biome check . && tsc",
"lint:fix": "biome check --write .",
"prepublishOnly": "pnpm run lint && pnpm run build"
},
"engines": {
"node": ">=22.15.0"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=25.0.0"
},
"devDependencies": {
"@biomejs/biome": "^2.3.8",
"@lokalise/biome-config": "^3.1.0",
"@lokalise/tsconfig": "^3.0.0",
"@message-queue-toolkit/core": "workspace:*",
"@types/node": "^25.0.2",
"rimraf": "^6.0.1",
"typescript": "^5.9.3"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
"type": "git",
"url": "git://github.com/kibertoad/message-queue-toolkit.git"
},
"keywords": [
"message",
"queue",
"codec",
"compression",
"zstd"
],
"files": [
"README.md",
"LICENSE",
"dist/*"
]
}
4 changes: 4 additions & 0 deletions packages/codec/tsconfig.build.json
@@ -0,0 +1,4 @@
{
"extends": ["./tsconfig.json", "@lokalise/tsconfig/build-public-lib"],
"include": ["lib/**/*"]
}
4 changes: 4 additions & 0 deletions packages/codec/tsconfig.json
@@ -0,0 +1,4 @@
{
"extends": "@lokalise/tsconfig/tsc",
"include": ["lib/**/*"]
}
11 changes: 11 additions & 0 deletions packages/core/README.md
@@ -641,6 +641,17 @@ class MyPayloadStore implements PayloadStore {
}
```

#### Interaction with codec (compression)

When both `codec` and `payloadStoreConfig` are set on a publisher, compression and offloading work together with a single compression pass:

1. The message is compressed **once** at publish time.
2. The **compressed** size is compared against `messageSizeThreshold`.
3. If the compressed size exceeds the threshold, the raw compressed bytes are stored in the payload store. The codec name is written to `payloadRef.codec` so the consumer knows how to decompress after retrieval.
4. If the compressed size fits within the threshold, the message is sent inline as a self-describing codec envelope, and the payload store is never touched.

This means compression can prevent offloading entirely for messages that are large before compression but small after.
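
On the consuming side, step 3 implies that retrieved bytes must be decompressed before parsing whenever `payloadRef.codec` is present. A minimal sketch of that step, under stated assumptions: `parseRetrievedPayload`, `PayloadRefLike`, and `StoredDecompressFn` are hypothetical names, the reference type lists only the fields relevant here, and decompression is injected rather than tied to a specific codec.

```typescript
import { Buffer } from 'node:buffer'

// Only the fields relevant to this sketch; the real payload reference has more.
type PayloadRefLike = { store: string; size: number; codec?: string }
// Hypothetical injection point for the actual decompression.
type StoredDecompressFn = (codec: string, data: Buffer) => Promise<Buffer>

async function parseRetrievedPayload(
  ref: PayloadRefLike,
  storedBytes: Buffer,
  decompress: StoredDecompressFn,
): Promise<unknown> {
  // With a codec recorded, the stored bytes are raw compressed binary;
  // without one, they are assumed to be the JSON payload itself.
  const jsonBytes =
    ref.codec === undefined ? storedBytes : await decompress(ref.codec, storedBytes)
  return JSON.parse(jsonBytes.toString('utf8'))
}
```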

## API Reference

### Types
49 changes: 49 additions & 0 deletions packages/core/lib/codec/messageCodec.ts
@@ -0,0 +1,49 @@
type ObjectValues<T> = T[keyof T]

/**
* Supported message compression codecs.
*
* Use the enum values instead of raw strings so that adding a new codec in
* the future is a single-place change and consumers benefit from IDE
* auto-complete.
*
* @example
* new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD })
*/
export const MessageCodecEnum = {
/** zstd compression via Node.js built-in `zlib` (requires Node.js >=22.15.0). */
ZSTD: 'zstd',
} as const
export type MessageCodec = ObjectValues<typeof MessageCodecEnum>

const CODEC_FIELD = '__codec'
const DATA_FIELD = '__data'

export type CodecEnvelope = {
[CODEC_FIELD]: MessageCodec
[DATA_FIELD]: string
}

/**
* Low-level interface for a compression codec.
*
* Implement this interface to plug in a custom compression algorithm.
* The built-in implementation (`ZstdCodecHandler` in `@message-queue-toolkit/codec`)
* uses Node.js built-in `zlib` zstd support.
*/
export interface MessageCodecHandler {
compress(data: Buffer): Promise<Buffer>
decompress(data: Buffer): Promise<Buffer>
}

export function isCodecEnvelope(value: unknown): value is CodecEnvelope {
const record = value as Record<string, unknown>
return (
typeof value === 'object' &&
value !== null &&
CODEC_FIELD in value &&
DATA_FIELD in value &&
(Object.values(MessageCodecEnum) as string[]).includes(record[CODEC_FIELD] as string) &&
typeof record[DATA_FIELD] === 'string'
)
}
7 changes: 7 additions & 0 deletions packages/core/lib/index.ts
@@ -1,3 +1,10 @@
export {
type CodecEnvelope,
isCodecEnvelope,
type MessageCodec,
MessageCodecEnum,
type MessageCodecHandler,
} from './codec/messageCodec.ts'
export { DoNotProcessMessageError } from './errors/DoNotProcessError.ts'
export {
isMessageError,
@@ -11,6 +11,12 @@ export const PAYLOAD_REF_SCHEMA = z.object({
store: z.string().min(1),
/** Size of the payload in bytes */
size: z.number().int().positive(),
/**
* Codec used to compress the stored payload.
* When set, the stored bytes are raw compressed binary (not base64 JSON).
* The consumer must decompress using this codec before parsing.
*/
codec: z.string().optional(),
})

export type PayloadRef = z.output<typeof PAYLOAD_REF_SCHEMA>