From edfbc9a465f069c96ddae54ed90622d30c660d64 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 19:10:37 +0000 Subject: [PATCH 1/7] feat(parakeet-cpp): dynamic-batching scheduler (queue + dispatcher) Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- backend/go/parakeet-cpp/batcher.go | 79 ++++++++++++++ backend/go/parakeet-cpp/batcher_test.go | 131 ++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 backend/go/parakeet-cpp/batcher.go create mode 100644 backend/go/parakeet-cpp/batcher_test.go diff --git a/backend/go/parakeet-cpp/batcher.go b/backend/go/parakeet-cpp/batcher.go new file mode 100644 index 000000000000..4a7c169e710d --- /dev/null +++ b/backend/go/parakeet-cpp/batcher.go @@ -0,0 +1,79 @@ +package main + +import "time" + +// batchRequest is one in-flight unary transcription waiting to be batched. +// In production pcm/decoder are set; tag is an opaque marker used by tests. +type batchRequest struct { + pcm []float32 + decoder int32 + tag string + reply chan batchReply +} + +// batchReply carries one per-item JSON object string (an element of the C-API's +// JSON array) or an error back to the waiting handler goroutine. +type batchReply struct { + json string + err error +} + +// batcher coalesces concurrent batchRequests into batched runBatch calls. A +// single run() goroutine is the sole caller of runBatch, so runBatch (which in +// production calls the thread-unsafe C engine) is never entered concurrently. +type batcher struct { + submit chan *batchRequest + maxSize int + maxWait time.Duration + runBatch func(reqs []*batchRequest) // must deliver a reply to every req +} + +func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchRequest)) *batcher { + if maxSize < 1 { + maxSize = 1 + } + return &batcher{ + submit: make(chan *batchRequest), + maxSize: maxSize, + maxWait: maxWait, + runBatch: runBatch, + } +} + +// run is the dispatcher loop: accumulate submitted requests until either maxSize +// is reached or maxWait elapses since the first queued request, then dispatch. +// Exits when stop is closed (draining any partially-filled batch first). +func (b *batcher) run(stop <-chan struct{}) { + for { + var first *batchRequest + select { + case first = <-b.submit: + case <-stop: + return + } + batch := []*batchRequest{first} + + // maxSize==1 disables batching: dispatch immediately (passthrough). + if b.maxSize == 1 { + b.runBatch(batch) + continue + } + + timer := time.NewTimer(b.maxWait) + fill: + for len(batch) < b.maxSize { + select { + case r := <-b.submit: + batch = append(batch, r) + case <-timer.C: + break fill + case <-stop: + timer.Stop() + b.runBatch(batch) + return + } + } + timer.Stop() + b.runBatch(batch) + } +} diff --git a/backend/go/parakeet-cpp/batcher_test.go b/backend/go/parakeet-cpp/batcher_test.go new file mode 100644 index 000000000000..59eecf61111c --- /dev/null +++ b/backend/go/parakeet-cpp/batcher_test.go @@ -0,0 +1,131 @@ +package main + +import ( + "sync" + "testing" + "time" +) + +func TestBatcherCoalescesConcurrentSubmits(t *testing.T) { + var mu sync.Mutex + var sizes []int + run := func(reqs []*batchRequest) { + mu.Lock() + sizes = append(sizes, len(reqs)) + mu.Unlock() + for _, r := range reqs { + r.reply <- batchReply{json: r.tag} + } + } + b := newBatcher(4, 50*time.Millisecond, run) + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + + const N = 4 + var wg sync.WaitGroup + got := make([]string, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: string(rune('a' + i)), reply: rep} + got[i] = (<-rep).json + }(i) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + total, maxBatch := 0, 0 + for _, s := range sizes { + total += s + if s > maxBatch { + maxBatch = s + } + } + if total != N { + t.Fatalf("handled %d, want %d", total, N) + } + if maxBatch < 2 { + t.Fatalf("no coalescing: max batch %d", maxBatch) + } +} + +func TestBatcherSizeTrigger(t *testing.T) { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + for _, r := range reqs { + r.reply <- batchReply{json: r.tag} + } + } + b := newBatcher(2, time.Hour, run) // huge window: only size triggers + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + for i := 0; i < 2; i++ { + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func(rep chan batchReply) { <-rep }(rep) + } + select { + case n := <-dispatched: + if n != 2 { + t.Fatalf("size trigger batch = %d, want 2", n) + } + case <-time.After(2 * time.Second): + t.Fatal("size trigger did not fire") + } +} + +func TestBatcherWindowTrigger(t *testing.T) { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + for _, r := range reqs { + r.reply <- batchReply{json: r.tag} + } + } + b := newBatcher(8, 20*time.Millisecond, run) // size never reached; window fires + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + select { + case n := <-dispatched: + if n != 1 { + t.Fatalf("window batch = %d, want 1", n) + } + case <-time.After(2 * time.Second): + t.Fatal("window trigger did not fire") + } +} + +func TestBatcherSizeOneBypass(t *testing.T) { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + for _, r := range reqs { + r.reply <- batchReply{json: r.tag} + } + } + b := newBatcher(1, time.Hour, run) // size 1 => immediate per-request dispatch + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + select { + case n := <-dispatched: + if n != 1 { + t.Fatalf("size-1 batch = %d, want 1", n) + } + case <-time.After(2 * time.Second): + t.Fatal("size-1 dispatch did not fire") + } +} From 0bc3e2b9bd64927aa7b4019364172e17affecc8d Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 19:17:22 +0000 Subject: [PATCH 2/7] feat(parakeet-cpp): dynamic batching for AudioTranscription via batched JSON C-API Drop SingleThread; route unary transcription through the in-process batcher which coalesces concurrent requests into one batched engine call. Streaming stays mutually exclusive via engineMu. Adds batch_max_size / batch_max_wait_ms options (size=1 disables; recommended on CPU). Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- backend/go/parakeet-cpp/goparakeetcpp.go | 139 +++++++++++++++--- backend/go/parakeet-cpp/goparakeetcpp_test.go | 1 + backend/go/parakeet-cpp/main.go | 1 + docs/content/features/audio-to-text.md | 16 ++ 4 files changed, 138 insertions(+), 19 deletions(-) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index f8d49f058e95..fdc408ac8b4a 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -7,7 +7,10 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" "unsafe" "github.com/go-audio/wav" @@ -34,6 +37,15 @@ var ( CppFreeString func(s uintptr) CppLastError func(ctx uintptr) string + // Batched JSON transcription: takes a concatenated float buffer of clips + // plus their per-clip sample counts (sum(nSamples)==len(samplesConcat)) + // and returns a malloc'd char* JSON ARRAY of per-clip {"text","words", + // "tokens"} objects (uintptr, freed via CppFreeString). purego passes the + // Go slices as the base pointer of their backing array (kept alive for the + // call), matching the CppStreamFeed pcm []float32 binding pattern; the C + // side reads them as const float*/const int*. + CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr + // Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for // non-streaming models. feed/finalize return a malloc'd char* (uintptr, // freed via CppFreeString); feed writes 1 to *eouOut on an /. @@ -77,11 +89,17 @@ type transcriptToken struct { } // ParakeetCpp owns a single loaded parakeet_ctx. The C engine is a -// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp), so we -// serialize calls through base.SingleThread. +// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp). Rather than +// serialize every call through base.SingleThread, we route unary +// transcription through an in-process batcher (its sole dispatcher goroutine +// is the only caller of the engine on that path) and guard the shared engine +// with engineMu so a streaming session and a batched-unary dispatch never +// touch it concurrently. type ParakeetCpp struct { - base.SingleThread - ctxPtr uintptr + base.Base + ctxPtr uintptr + engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming) + bat *batcher } // Load is the LocalAI gRPC entry point for LoadModel: it calls @@ -100,13 +118,81 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { return fmt.Errorf("parakeet-cpp: parakeet_capi_load failed for %q", opts.ModelFile) } p.ctxPtr = ctx + + // Dynamic batching knobs (model YAML options:, key:value form). On GPU, + // coalescing concurrent requests into one batched engine call improves + // throughput; set batch_max_size:1 to disable (recommended on CPU). + maxSize := optInt(opts, "batch_max_size", 8) + maxWaitMs := optInt(opts, "batch_max_wait_ms", 15) + if maxWaitMs < 0 { + maxWaitMs = 0 + } + p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) + go p.bat.run(make(chan struct{})) // dispatcher lives for the process lifetime return nil } -// AudioTranscription runs parakeet_capi_transcribe_path_json on the wav at -// opts.Dst with the default decoder (decoder=0, which selects the right head -// per architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes -// the per-word timestamps into a LocalAI TranscriptResult. +// optInt reads an integer model option (key:value form) from ModelOptions, +// returning def when absent or unparseable. The options array carries the +// model YAML's options: entries (see core/config; siblings such as +// acestep-cpp parse the same key:value form via strings.Cut on ":"). +func optInt(opts *pb.ModelOptions, key string, def int) int { + for _, o := range opts.GetOptions() { + k, v, ok := strings.Cut(o, ":") + if ok && strings.TrimSpace(k) == key { + if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil { + return n + } + } + } + return def +} + +// runBatch is the dispatcher's batch handler and the ONLY caller of the C +// engine on the unary path. It concatenates the batch PCM, calls the batched +// JSON C-API under engineMu, splits the JSON array, and replies to each request. +func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { + var concat []float32 + nSamples := make([]int32, len(reqs)) + for i, r := range reqs { + nSamples[i] = int32(len(r.pcm)) + concat = append(concat, r.pcm...) + } + var dec int32 + if len(reqs) > 0 { + dec = reqs[0].decoder + } + p.engineMu.Lock() + cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec) + p.engineMu.Unlock() + if cstr == 0 { + err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr)) + for _, r := range reqs { + r.reply <- batchReply{err: err} + } + return + } + raw := goStringFromCPtr(cstr) + CppFreeString(cstr) + var docs []json.RawMessage + if err := json.Unmarshal([]byte(raw), &docs); err != nil || len(docs) != len(reqs) { + e := fmt.Errorf("parakeet-cpp: batch json: got %d results for %d reqs (%v)", len(docs), len(reqs), err) + for _, r := range reqs { + r.reply <- batchReply{err: e} + } + return + } + for i, r := range reqs { + r.reply <- batchReply{json: string(docs[i])} + } +} + +// AudioTranscription decodes the wav at opts.Dst to 16 kHz mono PCM and +// submits it to the in-process batcher, which coalesces concurrent requests +// into a single batched engine call (parakeet_capi_transcribe_pcm_batch_json) +// with the default decoder (decoder=0, which selects the right head per +// architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes the +// per-word timestamps into a LocalAI TranscriptResult. // // Parakeet emits word- and token-level timestamps but no native segment // boundaries, so we synthesise a single whole-clip segment spanning the first @@ -118,7 +204,7 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { // translate/diarize/prompt/temperature/language/threads are not applicable to // parakeet and are ignored; streaming is handled by AudioTranscriptionStream // (L2). -func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) { +func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) { if p.ctxPtr == 0 { return pb.TranscriptResult{}, errors.New("parakeet-cpp: model not loaded") } @@ -126,20 +212,31 @@ func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptR return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required") } - cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0) - if cstr == 0 { - msg := CppLastError(p.ctxPtr) - if msg == "" { - msg = "unknown error" - } - return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", msg) + pcm, _, err := decodeWavMono16k(opts.Dst) + if err != nil { + return pb.TranscriptResult{}, err } - raw := goStringFromCPtr(cstr) - CppFreeString(cstr) + // Submit to the batcher and wait for our per-clip JSON; the dispatcher is + // the sole engine caller on this path. Both sends honour ctx cancellation. + rep := make(chan batchReply, 1) + select { + case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}: + case <-ctx.Done(): + return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled") + } + var res batchReply + select { + case res = <-rep: + case <-ctx.Done(): + return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled") + } + if res.err != nil { + return pb.TranscriptResult{}, res.err + } var doc transcriptJSON - if err := json.Unmarshal([]byte(raw), &doc); err != nil { + if err := json.Unmarshal([]byte(res.json), &doc); err != nil { return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err) } @@ -243,6 +340,10 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra return nil } defer CppStreamFree(stream) + // The engine is a single shared context; streaming is mutually exclusive + // with batched unary dispatch, so hold engineMu for the whole session. + p.engineMu.Lock() + defer p.engineMu.Unlock() data, duration, err := decodeWavMono16k(opts.Dst) if err != nil { diff --git a/backend/go/parakeet-cpp/goparakeetcpp_test.go b/backend/go/parakeet-cpp/goparakeetcpp_test.go index 9ce4251398aa..8893bd50e9f4 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp_test.go +++ b/backend/go/parakeet-cpp/goparakeetcpp_test.go @@ -43,6 +43,7 @@ func ensureLibLoaded() { purego.RegisterLibFunc(&CppFree, lib, "parakeet_capi_free") purego.RegisterLibFunc(&CppTranscribePath, lib, "parakeet_capi_transcribe_path") purego.RegisterLibFunc(&CppTranscribePathJSON, lib, "parakeet_capi_transcribe_path_json") + purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") purego.RegisterLibFunc(&CppStreamBegin, lib, "parakeet_capi_stream_begin") purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed") purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize") diff --git a/backend/go/parakeet-cpp/main.go b/backend/go/parakeet-cpp/main.go index a8fd7fe3bbba..9788f0d2f1e8 100644 --- a/backend/go/parakeet-cpp/main.go +++ b/backend/go/parakeet-cpp/main.go @@ -47,6 +47,7 @@ func main() { {&CppFree, "parakeet_capi_free"}, {&CppTranscribePath, "parakeet_capi_transcribe_path"}, {&CppTranscribePathJSON, "parakeet_capi_transcribe_path_json"}, + {&CppTranscribePcmBatchJSON, "parakeet_capi_transcribe_pcm_batch_json"}, {&CppStreamBegin, "parakeet_capi_stream_begin"}, {&CppStreamFeed, "parakeet_capi_stream_feed"}, {&CppStreamFinalize, "parakeet_capi_stream_finalize"}, diff --git a/docs/content/features/audio-to-text.md b/docs/content/features/audio-to-text.md index c786c9e7c09f..e72a315fcea1 100644 --- a/docs/content/features/audio-to-text.md +++ b/docs/content/features/audio-to-text.md @@ -187,6 +187,22 @@ curl http://localhost:8080/v1/audio/transcriptions \ For real-time use, load a cache-aware streaming model (e.g. `realtime_eou_120m-v1-*.gguf`) and pass `-F stream=true`. Deltas are emitted as the audio is decoded, with end-of-utterance events closing each segment. +### Dynamic batching + +The backend coalesces concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Two `options:` knobs control it: + +```yaml +name: parakeet-110m +backend: parakeet-cpp +parameters: + model: tdt_ctc-110m-f16.gguf +options: +- batch_max_size:8 # max requests coalesced into one batch (default 8) +- batch_max_wait_ms:15 # how long to wait to fill a batch, in ms (default 15) +``` + +Set `batch_max_size:1` to disable batching (requests run one at a time). This is recommended on CPU, where batching does not help and only adds latency. Batching only affects concurrent unary requests; streaming sessions always run on their own. + ## See also - [Audio Transform]({{< relref "audio-transform.md" >}}) — clean up the audio (echo cancellation, noise suppression, dereverberation) before passing it to a transcription model. From fbad6a9495ea851f7d191dad04538c5da5bc2128 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 19:25:52 +0000 Subject: [PATCH 3/7] fix(parakeet-cpp): tear down dispatcher in Free; log batch config; preallocate; clarify stream lock Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- backend/go/parakeet-cpp/goparakeetcpp.go | 27 ++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index fdc408ac8b4a..621a5e49a47a 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -17,6 +17,7 @@ import ( "github.com/mudler/LocalAI/pkg/grpc/base" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/utils" + "github.com/mudler/xlog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -100,6 +101,7 @@ type ParakeetCpp struct { ctxPtr uintptr engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming) bat *batcher + batStop chan struct{} } // Load is the LocalAI gRPC entry point for LoadModel: it calls @@ -127,8 +129,11 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { if maxWaitMs < 0 { maxWaitMs = 0 } + xlog.Info("parakeet-cpp: dynamic batching configured", + "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) + p.batStop = make(chan struct{}) p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) - go p.bat.run(make(chan struct{})) // dispatcher lives for the process lifetime + go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop return nil } @@ -152,10 +157,14 @@ func optInt(opts *pb.ModelOptions, key string, def int) int { // engine on the unary path. It concatenates the batch PCM, calls the batched // JSON C-API under engineMu, splits the JSON array, and replies to each request. func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { - var concat []float32 nSamples := make([]int32, len(reqs)) + total := 0 for i, r := range reqs { nSamples[i] = int32(len(r.pcm)) + total += len(r.pcm) + } + concat := make([]float32, 0, total) + for _, r := range reqs { concat = append(concat, r.pcm...) } var dec int32 @@ -340,8 +349,12 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra return nil } defer CppStreamFree(stream) - // The engine is a single shared context; streaming is mutually exclusive - // with batched unary dispatch, so hold engineMu for the whole session. + // The C engine is a single shared context: a streaming session and a batched + // unary dispatch must never touch it at once, so hold engineMu for the whole + // stream. This lock is intentionally taken AFTER the non-streaming fallback + // above returns: that fallback goes through AudioTranscription -> the batcher + // -> runBatch, which itself acquires engineMu, so locking here first would + // deadlock. Do not hoist this lock above the fallback. p.engineMu.Lock() defer p.engineMu.Unlock() @@ -463,6 +476,12 @@ func decodeWavMono16k(path string) ([]float32, float32, error) { // Free releases the underlying parakeet_ctx. Called by LocalAI when the // model is unloaded. func (p *ParakeetCpp) Free() error { + // Stop the dispatcher before releasing the engine so no in-flight runBatch + // can touch a freed ctx (close leak / use-after-free on reload). + if p.batStop != nil { + close(p.batStop) + p.batStop = nil + } if p.ctxPtr != 0 { CppFree(p.ctxPtr) p.ctxPtr = 0 From c23f5514b6d8e359d934640a5354d008db1da314 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 20:09:06 +0000 Subject: [PATCH 4/7] fix(parakeet-cpp): Ginkgo batcher tests; optional batch C-API binding with per-request fallback The batched JSON C-API symbol exists only in newer libparakeet.so (ABI >= 2); probe it with Dlsym and register optionally so the backend still loads against an older library, falling back to per-request transcription. Rewrites the batcher unit tests as Ginkgo/Gomega specs (forbidigo bans t.Fatal in tests). Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- backend/go/parakeet-cpp/batcher_test.go | 195 ++++++++---------- backend/go/parakeet-cpp/goparakeetcpp.go | 69 ++++--- backend/go/parakeet-cpp/goparakeetcpp_test.go | 4 +- backend/go/parakeet-cpp/main.go | 8 +- 4 files changed, 134 insertions(+), 142 deletions(-) diff --git a/backend/go/parakeet-cpp/batcher_test.go b/backend/go/parakeet-cpp/batcher_test.go index 59eecf61111c..e51122ee5e17 100644 --- a/backend/go/parakeet-cpp/batcher_test.go +++ b/backend/go/parakeet-cpp/batcher_test.go @@ -2,130 +2,107 @@ package main import ( "sync" - "testing" "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) -func TestBatcherCoalescesConcurrentSubmits(t *testing.T) { - var mu sync.Mutex - var sizes []int - run := func(reqs []*batchRequest) { - mu.Lock() - sizes = append(sizes, len(reqs)) - mu.Unlock() +var _ = Describe("batcher", func() { + echoReply := func(reqs []*batchRequest) { for _, r := range reqs { r.reply <- batchReply{json: r.tag} } } - b := newBatcher(4, 50*time.Millisecond, run) - stop := make(chan struct{}) - go b.run(stop) - defer close(stop) - - const N = 4 - var wg sync.WaitGroup - got := make([]string, N) - for i := 0; i < N; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - rep := make(chan batchReply, 1) - b.submit <- &batchRequest{tag: string(rune('a' + i)), reply: rep} - got[i] = (<-rep).json - }(i) - } - wg.Wait() - mu.Lock() - defer mu.Unlock() - total, maxBatch := 0, 0 - for _, s := range sizes { - total += s - if s > maxBatch { - maxBatch = s + It("coalesces concurrent submits into batches", func() { + var mu sync.Mutex + var sizes []int + run := func(reqs []*batchRequest) { + mu.Lock() + sizes = append(sizes, len(reqs)) + mu.Unlock() + echoReply(reqs) } - } - if total != N { - t.Fatalf("handled %d, want %d", total, N) - } - if maxBatch < 2 { - t.Fatalf("no coalescing: max batch %d", maxBatch) - } -} + b := newBatcher(4, 50*time.Millisecond, run) + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) -func TestBatcherSizeTrigger(t *testing.T) { - dispatched := make(chan int, 8) - run := func(reqs []*batchRequest) { - dispatched <- len(reqs) - for _, r := range reqs { - r.reply <- batchReply{json: r.tag} + const N = 4 + var wg sync.WaitGroup + got := make([]string, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: string(rune('a' + i)), reply: rep} + got[i] = (<-rep).json + }(i) } - } - b := newBatcher(2, time.Hour, run) // huge window: only size triggers - stop := make(chan struct{}) - go b.run(stop) - defer close(stop) - for i := 0; i < 2; i++ { - rep := make(chan batchReply, 1) - b.submit <- &batchRequest{tag: "x", reply: rep} - go func(rep chan batchReply) { <-rep }(rep) - } - select { - case n := <-dispatched: - if n != 2 { - t.Fatalf("size trigger batch = %d, want 2", n) + wg.Wait() + + mu.Lock() + defer mu.Unlock() + total, maxBatch := 0, 0 + for _, s := range sizes { + total += s + if s > maxBatch { + maxBatch = s + } } - case <-time.After(2 * time.Second): - t.Fatal("size trigger did not fire") - } -} + Expect(total).To(Equal(N)) + Expect(maxBatch).To(BeNumerically(">=", 2), "expected at least one batch to coalesce >1 request") + }) -func TestBatcherWindowTrigger(t *testing.T) { - dispatched := make(chan int, 8) - run := func(reqs []*batchRequest) { - dispatched <- len(reqs) - for _, r := range reqs { - r.reply <- batchReply{json: r.tag} + It("dispatches when max size is reached", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) } - } - b := newBatcher(8, 20*time.Millisecond, run) // size never reached; window fires - stop := make(chan struct{}) - go b.run(stop) - defer close(stop) - rep := make(chan batchReply, 1) - b.submit <- &batchRequest{tag: "x", reply: rep} - go func() { <-rep }() - select { - case n := <-dispatched: - if n != 1 { - t.Fatalf("window batch = %d, want 1", n) + b := newBatcher(2, time.Hour, run) // huge window: only size can trigger + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + for i := 0; i < 2; i++ { + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func(rep chan batchReply) { <-rep }(rep) } - case <-time.After(2 * time.Second): - t.Fatal("window trigger did not fire") - } -} + Eventually(dispatched, "2s").Should(Receive(Equal(2))) + }) -func TestBatcherSizeOneBypass(t *testing.T) { - dispatched := make(chan int, 8) - run := func(reqs []*batchRequest) { - dispatched <- len(reqs) - for _, r := range reqs { - r.reply <- batchReply{json: r.tag} + It("dispatches when the wait window elapses", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) } - } - b := newBatcher(1, time.Hour, run) // size 1 => immediate per-request dispatch - stop := make(chan struct{}) - go b.run(stop) - defer close(stop) - rep := make(chan batchReply, 1) - b.submit <- &batchRequest{tag: "x", reply: rep} - go func() { <-rep }() - select { - case n := <-dispatched: - if n != 1 { - t.Fatalf("size-1 batch = %d, want 1", n) + b := newBatcher(8, 20*time.Millisecond, run) // size unreachable; window fires + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + Eventually(dispatched, "2s").Should(Receive(Equal(1))) + }) + + It("bypasses batching when max size is 1", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) } - case <-time.After(2 * time.Second): - t.Fatal("size-1 dispatch did not fire") - } -} + b := newBatcher(1, time.Hour, run) // size 1 => immediate dispatch + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + Eventually(dispatched, "2s").Should(Receive(Equal(1))) + }) +}) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index 621a5e49a47a..96160590ccf3 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -129,11 +129,16 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { if maxWaitMs < 0 { maxWaitMs = 0 } - xlog.Info("parakeet-cpp: dynamic batching configured", - "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) - p.batStop = make(chan struct{}) - p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) - go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop + if CppTranscribePcmBatchJSON != nil { + p.batStop = make(chan struct{}) + p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) + go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop + xlog.Info("parakeet-cpp: dynamic batching enabled", + "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) + } else { + xlog.Info("parakeet-cpp: batched C-API not present in libparakeet.so; " + + "batching disabled, using per-request transcription") + } return nil } @@ -221,13 +226,29 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required") } + // Fallback when the batched C-API is unavailable: transcribe directly from + // the file path (original behavior, no batching). + if p.bat == nil { + cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0) + if cstr == 0 { + return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr)) + } + raw := goStringFromCPtr(cstr) + CppFreeString(cstr) + var doc transcriptJSON + if err := json.Unmarshal([]byte(raw), &doc); err != nil { + return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err) + } + return transcriptResultFromDoc(doc, opts), nil + } + + // Batched path: decode to PCM, submit to the batcher, wait for this request's + // JSON element. The dispatcher is the sole engine caller on this path; both + // sends honour ctx cancellation. pcm, _, err := decodeWavMono16k(opts.Dst) if err != nil { return pb.TranscriptResult{}, err } - - // Submit to the batcher and wait for our per-clip JSON; the dispatcher is - // the sole engine caller on this path. Both sends honour ctx cancellation. rep := make(chan batchReply, 1) select { case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}: @@ -243,50 +264,36 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip if res.err != nil { return pb.TranscriptResult{}, res.err } - var doc transcriptJSON if err := json.Unmarshal([]byte(res.json), &doc); err != nil { return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err) } + return transcriptResultFromDoc(doc, opts), nil +} +// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult, +// synthesising a single whole-clip segment and attaching word timings only when +// the caller requested word granularity. Shared by the batched and direct paths. +func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest) pb.TranscriptResult { text := strings.TrimSpace(doc.Text) - words := make([]*pb.TranscriptWord, 0, len(doc.Words)) for _, w := range doc.Words { - words = append(words, &pb.TranscriptWord{ - Start: secondsToNanos(w.Start), - End: secondsToNanos(w.End), - Text: w.W, - }) + words = append(words, &pb.TranscriptWord{Start: secondsToNanos(w.Start), End: secondsToNanos(w.End), Text: w.W}) } - tokens := make([]int32, 0, len(doc.Tokens)) for _, t := range doc.Tokens { tokens = append(tokens, t.ID) } - - // Single whole-clip segment, spanning the first word start to the last - // word end (0/0 when the clip produced no words). var segStart, segEnd int64 if len(words) > 0 { segStart = words[0].Start segEnd = words[len(words)-1].End } - seg := &pb.TranscriptSegment{ - Id: 0, - Start: segStart, - End: segEnd, - Text: text, - Tokens: tokens, - } + seg := &pb.TranscriptSegment{Id: 0, Start: segStart, End: segEnd, Text: text, Tokens: tokens} if wordsRequested(opts.TimestampGranularities) { seg.Words = words } - - return pb.TranscriptResult{ - Text: text, - Segments: []*pb.TranscriptSegment{seg}, - }, nil + return pb.TranscriptResult{Text: text, Segments: []*pb.TranscriptSegment{seg}} } // wordsRequested reports whether the caller asked for word-level timestamps. diff --git a/backend/go/parakeet-cpp/goparakeetcpp_test.go b/backend/go/parakeet-cpp/goparakeetcpp_test.go index 8893bd50e9f4..c059eb4bff11 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp_test.go +++ b/backend/go/parakeet-cpp/goparakeetcpp_test.go @@ -43,7 +43,9 @@ func ensureLibLoaded() { purego.RegisterLibFunc(&CppFree, lib, "parakeet_capi_free") purego.RegisterLibFunc(&CppTranscribePath, lib, "parakeet_capi_transcribe_path") purego.RegisterLibFunc(&CppTranscribePathJSON, lib, "parakeet_capi_transcribe_path_json") - purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") + if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") + } purego.RegisterLibFunc(&CppStreamBegin, lib, "parakeet_capi_stream_begin") purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed") purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize") diff --git a/backend/go/parakeet-cpp/main.go b/backend/go/parakeet-cpp/main.go index 9788f0d2f1e8..32d94b7b1aa8 100644 --- a/backend/go/parakeet-cpp/main.go +++ b/backend/go/parakeet-cpp/main.go @@ -47,7 +47,6 @@ func main() { {&CppFree, "parakeet_capi_free"}, {&CppTranscribePath, "parakeet_capi_transcribe_path"}, {&CppTranscribePathJSON, "parakeet_capi_transcribe_path_json"}, - {&CppTranscribePcmBatchJSON, "parakeet_capi_transcribe_pcm_batch_json"}, {&CppStreamBegin, "parakeet_capi_stream_begin"}, {&CppStreamFeed, "parakeet_capi_stream_feed"}, {&CppStreamFinalize, "parakeet_capi_stream_finalize"}, @@ -59,6 +58,13 @@ func main() { purego.RegisterLibFunc(lf.FuncPtr, lib, lf.Name) } + // The batched-JSON entry point exists only in newer libparakeet.so (ABI >= 2). + // Probe with Dlsym and register only if present, so the backend still loads + // against an older library (it falls back to per-request transcription). + if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") + } + fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion()) flag.Parse() From 8ab61507800a67d73eeb3d2774e88eb43209fcf0 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 20:52:00 +0000 Subject: [PATCH 5/7] feat(parakeet-cpp): debug-log coalesced batch size in runBatch Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- backend/go/parakeet-cpp/goparakeetcpp.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index 96160590ccf3..c29b7d05326c 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -162,6 +162,9 @@ func optInt(opts *pb.ModelOptions, key string, def int) int { // engine on the unary path. It concatenates the batch PCM, calls the batched // JSON C-API under engineMu, splits the JSON array, and replies to each request. func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { + // Observability: the actual coalesced batch size per engine call. Debug-level + // so it stays silent in normal operation but lets operators confirm/tune batching. + xlog.Debug("parakeet-cpp: dispatching batch", "size", len(reqs)) nSamples := make([]int32, len(reqs)) total := 0 for i, r := range reqs { From a9f6c8cb220c2579d2761c27ca3f7175e1577120 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 1 Jun 2026 12:53:40 +0000 Subject: [PATCH 6/7] fix(parakeet-cpp): default batch_max_size to 1 (batching opt-in) Dynamic batching now defaults off (batch_max_size:1, one request at a time). Raise batch_max_size to opt in: it is a large throughput win on GPU under concurrent load, but on CPU and low-concurrency setups it only adds latency, so off is the safer default. The startup log now states whether batching is on or off, and the audio-to-text docs are updated to match. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --- backend/go/parakeet-cpp/goparakeetcpp.go | 19 +++++++++++++------ docs/content/features/audio-to-text.md | 6 +++--- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index c29b7d05326c..969962a76e22 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -121,10 +121,12 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { } p.ctxPtr = ctx - // Dynamic batching knobs (model YAML options:, key:value form). On GPU, - // coalescing concurrent requests into one batched engine call improves - // throughput; set batch_max_size:1 to disable (recommended on CPU). - maxSize := optInt(opts, "batch_max_size", 8) + // Dynamic batching knobs (model YAML options:, key:value form). Batching is + // OFF by default (batch_max_size:1): each request runs on its own. On GPU, + // raising batch_max_size coalesces concurrent requests into one batched + // engine call and improves throughput under load; leave it at 1 on CPU and + // for low-concurrency setups, where batching only adds latency. + maxSize := optInt(opts, "batch_max_size", 1) maxWaitMs := optInt(opts, "batch_max_wait_ms", 15) if maxWaitMs < 0 { maxWaitMs = 0 @@ -133,8 +135,13 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { p.batStop = make(chan struct{}) p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop - xlog.Info("parakeet-cpp: dynamic batching enabled", - "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) + if maxSize > 1 { + xlog.Info("parakeet-cpp: dynamic batching enabled", + "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) + } else { + xlog.Info("parakeet-cpp: dynamic batching off (batch_max_size=1); " + + "set batch_max_size>1 to coalesce concurrent requests on GPU") + } } else { xlog.Info("parakeet-cpp: batched C-API not present in libparakeet.so; " + "batching disabled, using per-request transcription") diff --git a/docs/content/features/audio-to-text.md b/docs/content/features/audio-to-text.md index e72a315fcea1..22e7d2529aae 100644 --- a/docs/content/features/audio-to-text.md +++ b/docs/content/features/audio-to-text.md @@ -189,7 +189,7 @@ For real-time use, load a cache-aware streaming model (e.g. `realtime_eou_120m-v ### Dynamic batching -The backend coalesces concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Two `options:` knobs control it: +The backend can coalesce concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Batching is **off by default** (`batch_max_size:1`, one request at a time); raise it to opt in. Two `options:` knobs control it: ```yaml name: parakeet-110m @@ -197,11 +197,11 @@ backend: parakeet-cpp parameters: model: tdt_ctc-110m-f16.gguf options: -- batch_max_size:8 # max requests coalesced into one batch (default 8) +- batch_max_size:8 # max requests coalesced into one batch (default 1 = off) - batch_max_wait_ms:15 # how long to wait to fill a batch, in ms (default 15) ``` -Set `batch_max_size:1` to disable batching (requests run one at a time). This is recommended on CPU, where batching does not help and only adds latency. Batching only affects concurrent unary requests; streaming sessions always run on their own. +By default each request runs on its own. Raise `batch_max_size` (for example 4 to 16) to enable batching; it pays off on GPU under concurrent load, where coalescing the per-step decode GEMMs across requests is a large throughput win. Leave it at 1 on CPU and for low-concurrency setups, where batching only adds latency. Batching only affects concurrent unary requests; streaming sessions always run on their own. ## See also From 14cd9b2fa7a56a9276d7ea1157eb285a759ec400 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 1 Jun 2026 21:02:14 +0000 Subject: [PATCH 7/7] chore(parakeet-cpp): bump parakeet.cpp to 8a7c482 (batched decode + B=1 fast-path) parakeet.cpp PR #1 merged the batched encoder/decode and the B=1 encoder fast-path to master. Point PARAKEET_VERSION at that commit so the backend builds the batched C-API (parakeet_capi_transcribe_pcm_batch_json) that the dynamic batcher calls; the prior pin (30a3075) predated it, so only the per-request fallback path was exercised. Verified the shared lib builds with the backend's CMake flags and exports the batch symbol. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --- backend/go/parakeet-cpp/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/go/parakeet-cpp/Makefile b/backend/go/parakeet-cpp/Makefile index de0989640356..fa6a94d35d39 100644 --- a/backend/go/parakeet-cpp/Makefile +++ b/backend/go/parakeet-cpp/Makefile @@ -1,6 +1,6 @@ # parakeet-cpp backend Makefile. # -# Upstream pin lives below as PARAKEET_VERSION?=cb45f68068081af01e7092e91b038ee353eb56be +# Upstream pin lives below as PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543 # (.github/bump_deps.sh) can find and update it - matches the # whisper.cpp / ds4 / vibevoice-cpp convention. # @@ -15,7 +15,7 @@ # That's what the L0 smoke test uses. The default target below does the # proper clone-at-pin + cmake build so CI doesn't need a side-checkout. -PARAKEET_VERSION?=cb45f68068081af01e7092e91b038ee353eb56be +PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543 PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp GOCMD?=go