Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ea6bb3a
Back to duplex
CarlosGamero Jan 21, 2026
a633009
Proper backpreassure handling on batch stream
CarlosGamero Jan 21, 2026
1b967fa
Improving tests
CarlosGamero Jan 21, 2026
8fed538
lint
CarlosGamero Jan 21, 2026
6b516d4
Ajusting consumer
CarlosGamero Jan 21, 2026
13aaacb
Improving logging
CarlosGamero Feb 19, 2026
f54213e
Merge branch 'main' into feat/improve_kafka_backpreassure
CarlosGamero Feb 26, 2026
37b9574
erro handling
CarlosGamero Feb 26, 2026
f3b2848
Update kafka lib
CarlosGamero Feb 26, 2026
094ad19
Remove maxWait from test
CarlosGamero Feb 26, 2026
1814df8
Lint fix
CarlosGamero Feb 26, 2026
f2adcea
Revert "Remove maxWait from test"
CarlosGamero Feb 26, 2026
36dbdb5
Coverage fix
CarlosGamero Feb 26, 2026
719d19f
Handling backpreassure on timeout + tests
CarlosGamero Feb 26, 2026
3e19de7
Typo fix
CarlosGamero Feb 27, 2026
6d733b2
New readableHighWaterMark param
CarlosGamero Feb 27, 2026
9e81bc1
Adding test
CarlosGamero Feb 27, 2026
87e8a1c
Addressing final comment
CarlosGamero Feb 27, 2026
fb7e355
AI comments
CarlosGamero Feb 27, 2026
876f49a
Error handling
CarlosGamero Feb 27, 2026
983424f
Minor change
CarlosGamero Feb 27, 2026
041eefd
Merge branch 'main' of https://github.com/kibertoad/message-queue-too…
kibertoad Feb 27, 2026
8b76727
Merge branch 'main' into feat/improve_kafka_backpreassure
CarlosGamero Feb 27, 2026
b68c633
typo
CarlosGamero Feb 27, 2026
259ac31
Force fixed dep version
kibertoad Feb 28, 2026
75f6c40
Merge remote-tracking branch 'origin/feat/improve_kafka_backpreassure…
kibertoad Feb 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { randomUUID } from 'node:crypto'
import { pipeline } from 'node:stream/promises'
import { setTimeout } from 'node:timers/promises'
import {
InternalError,
Expand Down Expand Up @@ -190,19 +191,23 @@ export abstract class AbstractKafkaConsumer<
})

this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
this.consumerStream.on('error', (error) => this.handlerError(error))

if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
this.messageBatchStream = new KafkaMessageBatchStream<
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
>(
(batch) =>
this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
this.options.batchProcessingOptions,
>({
batchSize: this.options.batchProcessingOptions.batchSize,
timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
readableHighWaterMark: this.options.batchProcessingOptions.readableHighWaterMark,
})

// Use pipeline for better error handling and backpressure management.
// pipeline() internally listens for errors on all streams
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
this.handlerError(error),
)
this.consumerStream.pipe(this.messageBatchStream)
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
this.consumerStream.on('error', (error) => this.handlerError(error))
}
} catch (error) {
throw new InternalError({
Expand All @@ -211,6 +216,12 @@ export abstract class AbstractKafkaConsumer<
cause: error,
})
}

if (this.messageBatchStream) {
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
}
}

private async handleSyncStream(
Expand All @@ -223,6 +234,13 @@ export abstract class AbstractKafkaConsumer<
)
}
}
private async handleSyncStreamBatch(
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
): Promise<void> {
for await (const messageBatch of stream) {
await this.consume(messageBatch[0].topic, messageBatch)
}
}

async close(): Promise<void> {
if (!this.consumerStream && !this.messageBatchStream) {
Expand Down Expand Up @@ -371,6 +389,7 @@ export abstract class AbstractKafkaConsumer<
): Promise<MessageProcessingResult> {
try {
const isBatch = Array.isArray(messageOrBatch)
/* v8 ignore start */
if (this.options.batchProcessingEnabled && !isBatch) {
throw new Error(
'Batch processing is enabled, but a single message was passed to the handler',
Expand All @@ -381,6 +400,7 @@ export abstract class AbstractKafkaConsumer<
'Batch processing is disabled, but a batch of messages was passed to the handler',
)
}
/* v8 ignore stop */

await handler(
// We need casting to match message type with handler type - it is safe as we verify the type above
Expand All @@ -395,10 +415,7 @@ export abstract class AbstractKafkaConsumer<
const errorContext = Array.isArray(messageOrBatch)
? { batchSize: messageOrBatch.length }
: { message: stringValueSerializer(messageOrBatch.value) }
this.handlerError(error, {
topic,
...errorContext,
})
this.handlerError(error, { topic, ...errorContext })
}

return { status: 'error', errorReason: 'handlerError' }
Expand Down Expand Up @@ -443,7 +460,7 @@ export abstract class AbstractKafkaConsumer<
} catch (error) {
this.logger.debug(logDetails, 'Message commit failed')
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
throw error
this.handlerError(error)
}
}

Expand All @@ -455,7 +472,7 @@ export abstract class AbstractKafkaConsumer<
error.apiCode &&
commitErrorCodesToIgnore.has(error.apiCode)
) {
this.logger.error(
this.logger.warn(
{
apiCode: error.apiCode,
apiId: error.apiId,
Expand All @@ -466,8 +483,7 @@ export abstract class AbstractKafkaConsumer<
`Failed to commit message: ${error.message}`,
)
} else {
// If error is not recognized, rethrow it
throw responseError
this.handlerError(error)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion packages/kafka/lib/AbstractKafkaService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ export abstract class AbstractKafkaService<

protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context })
if (isError(error)) this.errorReporter.report({ error, context })
if (isError(error))
this.errorReporter.report({
error,
context: context,
})
}
}
Loading
Loading