11import { randomUUID } from 'node:crypto'
2+ import { pipeline } from 'node:stream/promises'
23import { setTimeout } from 'node:timers/promises'
34import {
45 InternalError ,
@@ -190,19 +191,23 @@ export abstract class AbstractKafkaConsumer<
190191 } )
191192
192193 this . consumerStream = await this . consumer . consume ( { ...consumeOptions , topics } )
193- this . consumerStream . on ( 'error' , ( error ) => this . handlerError ( error ) )
194194
195195 if ( this . options . batchProcessingEnabled && this . options . batchProcessingOptions ) {
196196 this . messageBatchStream = new KafkaMessageBatchStream <
197197 DeserializedMessage < SupportedMessageValues < TopicsConfig > >
198- > (
199- ( batch ) =>
200- this . consume ( batch . topic , batch . messages ) . catch ( ( error ) => this . handlerError ( error ) ) ,
201- this . options . batchProcessingOptions ,
198+ > ( {
199+ batchSize : this . options . batchProcessingOptions . batchSize ,
200+ timeoutMilliseconds : this . options . batchProcessingOptions . timeoutMilliseconds ,
201+ readableHighWaterMark : this . options . batchProcessingOptions . readableHighWaterMark ,
202+ } )
203+
204+ // Use pipeline for better error handling and backpressure management.
205+ // pipeline() internally listens for errors on all streams
206+ pipeline ( this . consumerStream , this . messageBatchStream ) . catch ( ( error ) =>
207+ this . handlerError ( error ) ,
202208 )
203- this . consumerStream . pipe ( this . messageBatchStream )
204209 } else {
205- this . handleSyncStream ( this . consumerStream ) . catch ( ( error ) => this . handlerError ( error ) )
210+ this . consumerStream . on ( 'error' , ( error ) => this . handlerError ( error ) )
206211 }
207212 } catch ( error ) {
208213 throw new InternalError ( {
@@ -211,6 +216,12 @@ export abstract class AbstractKafkaConsumer<
211216 cause : error ,
212217 } )
213218 }
219+
220+ if ( this . messageBatchStream ) {
221+ this . handleSyncStreamBatch ( this . messageBatchStream ) . catch ( ( error ) => this . handlerError ( error ) )
222+ } else {
223+ this . handleSyncStream ( this . consumerStream ) . catch ( ( error ) => this . handlerError ( error ) )
224+ }
214225 }
215226
216227 private async handleSyncStream (
@@ -223,6 +234,13 @@ export abstract class AbstractKafkaConsumer<
223234 )
224235 }
225236 }
237+ private async handleSyncStreamBatch (
238+ stream : KafkaMessageBatchStream < DeserializedMessage < SupportedMessageValues < TopicsConfig > > > ,
239+ ) : Promise < void > {
240+ for await ( const messageBatch of stream ) {
241+ await this . consume ( messageBatch [ 0 ] . topic , messageBatch )
242+ }
243+ }
226244
227245 async close ( ) : Promise < void > {
228246 if ( ! this . consumerStream && ! this . messageBatchStream ) {
@@ -371,6 +389,7 @@ export abstract class AbstractKafkaConsumer<
371389 ) : Promise < MessageProcessingResult > {
372390 try {
373391 const isBatch = Array . isArray ( messageOrBatch )
392+ /* v8 ignore start */
374393 if ( this . options . batchProcessingEnabled && ! isBatch ) {
375394 throw new Error (
376395 'Batch processing is enabled, but a single message was passed to the handler' ,
@@ -381,6 +400,7 @@ export abstract class AbstractKafkaConsumer<
381400 'Batch processing is disabled, but a batch of messages was passed to the handler' ,
382401 )
383402 }
403+ /* v8 ignore stop */
384404
385405 await handler (
386406 // We need casting to match message type with handler type - it is safe as we verify the type above
@@ -395,10 +415,7 @@ export abstract class AbstractKafkaConsumer<
395415 const errorContext = Array . isArray ( messageOrBatch )
396416 ? { batchSize : messageOrBatch . length }
397417 : { message : stringValueSerializer ( messageOrBatch . value ) }
398- this . handlerError ( error , {
399- topic,
400- ...errorContext ,
401- } )
418+ this . handlerError ( error , { topic, ...errorContext } )
402419 }
403420
404421 return { status : 'error' , errorReason : 'handlerError' }
@@ -443,7 +460,7 @@ export abstract class AbstractKafkaConsumer<
443460 } catch ( error ) {
444461 this . logger . debug ( logDetails , 'Message commit failed' )
445462 if ( error instanceof ResponseError ) return this . handleResponseErrorOnCommit ( error )
446- throw error
463+ this . handlerError ( error )
447464 }
448465 }
449466
@@ -455,7 +472,7 @@ export abstract class AbstractKafkaConsumer<
455472 error . apiCode &&
456473 commitErrorCodesToIgnore . has ( error . apiCode )
457474 ) {
458- this . logger . error (
475+ this . logger . warn (
459476 {
460477 apiCode : error . apiCode ,
461478 apiId : error . apiId ,
@@ -466,8 +483,7 @@ export abstract class AbstractKafkaConsumer<
466483 `Failed to commit message: ${ error . message } ` ,
467484 )
468485 } else {
469- // If error is not recognized, rethrow it
470- throw responseError
486+ this . handlerError ( error )
471487 }
472488 }
473489 }
0 commit comments