// batchReply is what the dispatcher hands back to a waiting handler: either one
// per-item JSON object (an element of the C-API's JSON array, kept as a string)
// or the error that prevented the batch from producing it.
type batchReply struct {
	json string
	err  error
}

// batchRequest is a single unary transcription queued for batching. Production
// callers fill pcm and decoder; tag is an opaque label used only by the tests.
// The outcome is delivered on reply.
type batchRequest struct {
	pcm     []float32
	decoder int32
	tag     string
	reply   chan batchReply
}

// batcher merges concurrently submitted batchRequests into runBatch calls.
// Only the goroutine executing run ever invokes runBatch, so runBatch is never
// entered concurrently.
type batcher struct {
	submit   chan *batchRequest
	maxSize  int
	maxWait  time.Duration
	runBatch func(reqs []*batchRequest) // must deliver a reply to every req
}

// newBatcher builds a batcher with an unbuffered submit channel. A maxSize
// below 1 is clamped to 1, which turns batching into a passthrough.
func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchRequest)) *batcher {
	b := &batcher{
		submit:   make(chan *batchRequest),
		maxSize:  1,
		maxWait:  maxWait,
		runBatch: runBatch,
	}
	if maxSize > 1 {
		b.maxSize = maxSize
	}
	return b
}

// run is the dispatcher loop. It blocks for a first request, gathers more until
// the batch is full or maxWait has elapsed since that first request, and then
// hands the batch to runBatch. Closing stop ends the loop; a batch that was
// already being gathered is still dispatched before run returns.
func (b *batcher) run(stop <-chan struct{}) {
	for {
		var head *batchRequest
		select {
		case <-stop:
			return
		case head = <-b.submit:
		}

		pending, halted := b.gather(head, stop)
		b.runBatch(pending)
		if halted {
			return
		}
	}
}

// gather grows a batch starting from head. It returns once the batch holds
// maxSize requests, the wait window expires, or stop is closed (halted=true).
// With maxSize==1 it returns immediately without arming a timer.
func (b *batcher) gather(head *batchRequest, stop <-chan struct{}) (pending []*batchRequest, halted bool) {
	pending = []*batchRequest{head}
	if b.maxSize == 1 {
		return pending, false
	}

	window := time.NewTimer(b.maxWait)
	defer window.Stop() // released before the caller dispatches the batch
	for len(pending) < b.maxSize {
		select {
		case req := <-b.submit:
			pending = append(pending, req)
		case <-window.C:
			return pending, false
		case <-stop:
			return pending, true
		}
	}
	return pending, false
}
"github.com/onsi/gomega" +) + +var _ = Describe("batcher", func() { + echoReply := func(reqs []*batchRequest) { + for _, r := range reqs { + r.reply <- batchReply{json: r.tag} + } + } + + It("coalesces concurrent submits into batches", func() { + var mu sync.Mutex + var sizes []int + run := func(reqs []*batchRequest) { + mu.Lock() + sizes = append(sizes, len(reqs)) + mu.Unlock() + echoReply(reqs) + } + b := newBatcher(4, 50*time.Millisecond, run) + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + + const N = 4 + var wg sync.WaitGroup + got := make([]string, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: string(rune('a' + i)), reply: rep} + got[i] = (<-rep).json + }(i) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + total, maxBatch := 0, 0 + for _, s := range sizes { + total += s + if s > maxBatch { + maxBatch = s + } + } + Expect(total).To(Equal(N)) + Expect(maxBatch).To(BeNumerically(">=", 2), "expected at least one batch to coalesce >1 request") + }) + + It("dispatches when max size is reached", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) + } + b := newBatcher(2, time.Hour, run) // huge window: only size can trigger + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + for i := 0; i < 2; i++ { + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func(rep chan batchReply) { <-rep }(rep) + } + Eventually(dispatched, "2s").Should(Receive(Equal(2))) + }) + + It("dispatches when the wait window elapses", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) + } + b := newBatcher(8, 20*time.Millisecond, run) // size unreachable; window fires + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit 
<- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + Eventually(dispatched, "2s").Should(Receive(Equal(1))) + }) + + It("bypasses batching when max size is 1", func() { + dispatched := make(chan int, 8) + run := func(reqs []*batchRequest) { + dispatched <- len(reqs) + echoReply(reqs) + } + b := newBatcher(1, time.Hour, run) // size 1 => immediate dispatch + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: "x", reply: rep} + go func() { <-rep }() + Eventually(dispatched, "2s").Should(Receive(Equal(1))) + }) +}) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index f8d49f058e95..969962a76e22 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -7,13 +7,17 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" "unsafe" "github.com/go-audio/wav" "github.com/mudler/LocalAI/pkg/grpc/base" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/utils" + "github.com/mudler/xlog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -34,6 +38,15 @@ var ( CppFreeString func(s uintptr) CppLastError func(ctx uintptr) string + // Batched JSON transcription: takes a concatenated float buffer of clips + // plus their per-clip sample counts (sum(nSamples)==len(samplesConcat)) + // and returns a malloc'd char* JSON ARRAY of per-clip {"text","words", + // "tokens"} objects (uintptr, freed via CppFreeString). purego passes the + // Go slices as the base pointer of their backing array (kept alive for the + // call), matching the CppStreamFeed pcm []float32 binding pattern; the C + // side reads them as const float*/const int*. + CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr + // Cache-aware streaming (RNN-T) entry points. 
// ParakeetCpp owns a single loaded parakeet_ctx. The C engine is a
// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp). Rather than
// serialize every call through base.SingleThread, we route unary
// transcription through an in-process batcher (its sole dispatcher goroutine
// is the only caller of the engine on that path) and guard the shared engine
// with engineMu so a streaming session and a batched-unary dispatch never
// touch it concurrently.
type ParakeetCpp struct {
	base.Base
	// ctxPtr is the engine handle stored by Load and reset to 0 by Free;
	// 0 is treated as "model not loaded" by the transcription entry points.
	ctxPtr uintptr
	// engineMu is held by runBatch around the batched engine call and by
	// AudioTranscriptionStream for the lifetime of a streaming session.
	engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming)
	// bat is created by Load only when the batched C-API binding
	// (CppTranscribePcmBatchJSON) is registered; it stays nil otherwise and
	// AudioTranscription then transcribes directly from the file path.
	bat *batcher
	// batStop is closed by Free to end the dispatcher goroutine started in Load.
	batStop chan struct{}
}
+ maxSize := optInt(opts, "batch_max_size", 1) + maxWaitMs := optInt(opts, "batch_max_wait_ms", 15) + if maxWaitMs < 0 { + maxWaitMs = 0 + } + if CppTranscribePcmBatchJSON != nil { + p.batStop = make(chan struct{}) + p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch) + go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop + if maxSize > 1 { + xlog.Info("parakeet-cpp: dynamic batching enabled", + "batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs) + } else { + xlog.Info("parakeet-cpp: dynamic batching off (batch_max_size=1); " + + "set batch_max_size>1 to coalesce concurrent requests on GPU") + } + } else { + xlog.Info("parakeet-cpp: batched C-API not present in libparakeet.so; " + + "batching disabled, using per-request transcription") + } return nil } -// AudioTranscription runs parakeet_capi_transcribe_path_json on the wav at -// opts.Dst with the default decoder (decoder=0, which selects the right head -// per architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes -// the per-word timestamps into a LocalAI TranscriptResult. +// optInt reads an integer model option (key:value form) from ModelOptions, +// returning def when absent or unparseable. The options array carries the +// model YAML's options: entries (see core/config; siblings such as +// acestep-cpp parse the same key:value form via strings.Cut on ":"). +func optInt(opts *pb.ModelOptions, key string, def int) int { + for _, o := range opts.GetOptions() { + k, v, ok := strings.Cut(o, ":") + if ok && strings.TrimSpace(k) == key { + if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil { + return n + } + } + } + return def +} + +// runBatch is the dispatcher's batch handler and the ONLY caller of the C +// engine on the unary path. It concatenates the batch PCM, calls the batched +// JSON C-API under engineMu, splits the JSON array, and replies to each request. 
+func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { + // Observability: the actual coalesced batch size per engine call. Debug-level + // so it stays silent in normal operation but lets operators confirm/tune batching. + xlog.Debug("parakeet-cpp: dispatching batch", "size", len(reqs)) + nSamples := make([]int32, len(reqs)) + total := 0 + for i, r := range reqs { + nSamples[i] = int32(len(r.pcm)) + total += len(r.pcm) + } + concat := make([]float32, 0, total) + for _, r := range reqs { + concat = append(concat, r.pcm...) + } + var dec int32 + if len(reqs) > 0 { + dec = reqs[0].decoder + } + p.engineMu.Lock() + cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec) + p.engineMu.Unlock() + if cstr == 0 { + err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr)) + for _, r := range reqs { + r.reply <- batchReply{err: err} + } + return + } + raw := goStringFromCPtr(cstr) + CppFreeString(cstr) + var docs []json.RawMessage + if err := json.Unmarshal([]byte(raw), &docs); err != nil || len(docs) != len(reqs) { + e := fmt.Errorf("parakeet-cpp: batch json: got %d results for %d reqs (%v)", len(docs), len(reqs), err) + for _, r := range reqs { + r.reply <- batchReply{err: e} + } + return + } + for i, r := range reqs { + r.reply <- batchReply{json: string(docs[i])} + } +} + +// AudioTranscription decodes the wav at opts.Dst to 16 kHz mono PCM and +// submits it to the in-process batcher, which coalesces concurrent requests +// into a single batched engine call (parakeet_capi_transcribe_pcm_batch_json) +// with the default decoder (decoder=0, which selects the right head per +// architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes the +// per-word timestamps into a LocalAI TranscriptResult. 
// // Parakeet emits word- and token-level timestamps but no native segment // boundaries, so we synthesise a single whole-clip segment spanning the first @@ -118,7 +228,7 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error { // translate/diarize/prompt/temperature/language/threads are not applicable to // parakeet and are ignored; streaming is handled by AudioTranscriptionStream // (L2). -func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) { +func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) { if p.ctxPtr == 0 { return pb.TranscriptResult{}, errors.New("parakeet-cpp: model not loaded") } @@ -126,61 +236,74 @@ func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptR return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required") } - cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0) - if cstr == 0 { - msg := CppLastError(p.ctxPtr) - if msg == "" { - msg = "unknown error" + // Fallback when the batched C-API is unavailable: transcribe directly from + // the file path (original behavior, no batching). + if p.bat == nil { + cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0) + if cstr == 0 { + return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr)) + } + raw := goStringFromCPtr(cstr) + CppFreeString(cstr) + var doc transcriptJSON + if err := json.Unmarshal([]byte(raw), &doc); err != nil { + return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err) } - return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", msg) + return transcriptResultFromDoc(doc, opts), nil } - raw := goStringFromCPtr(cstr) - CppFreeString(cstr) - + // Batched path: decode to PCM, submit to the batcher, wait for this request's + // JSON element. 
The dispatcher is the sole engine caller on this path; both + // sends honour ctx cancellation. + pcm, _, err := decodeWavMono16k(opts.Dst) + if err != nil { + return pb.TranscriptResult{}, err + } + rep := make(chan batchReply, 1) + select { + case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}: + case <-ctx.Done(): + return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled") + } + var res batchReply + select { + case res = <-rep: + case <-ctx.Done(): + return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled") + } + if res.err != nil { + return pb.TranscriptResult{}, res.err + } var doc transcriptJSON - if err := json.Unmarshal([]byte(raw), &doc); err != nil { + if err := json.Unmarshal([]byte(res.json), &doc); err != nil { return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err) } + return transcriptResultFromDoc(doc, opts), nil +} +// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult, +// synthesising a single whole-clip segment and attaching word timings only when +// the caller requested word granularity. Shared by the batched and direct paths. +func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest) pb.TranscriptResult { text := strings.TrimSpace(doc.Text) - words := make([]*pb.TranscriptWord, 0, len(doc.Words)) for _, w := range doc.Words { - words = append(words, &pb.TranscriptWord{ - Start: secondsToNanos(w.Start), - End: secondsToNanos(w.End), - Text: w.W, - }) + words = append(words, &pb.TranscriptWord{Start: secondsToNanos(w.Start), End: secondsToNanos(w.End), Text: w.W}) } - tokens := make([]int32, 0, len(doc.Tokens)) for _, t := range doc.Tokens { tokens = append(tokens, t.ID) } - - // Single whole-clip segment, spanning the first word start to the last - // word end (0/0 when the clip produced no words). 
var segStart, segEnd int64 if len(words) > 0 { segStart = words[0].Start segEnd = words[len(words)-1].End } - seg := &pb.TranscriptSegment{ - Id: 0, - Start: segStart, - End: segEnd, - Text: text, - Tokens: tokens, - } + seg := &pb.TranscriptSegment{Id: 0, Start: segStart, End: segEnd, Text: text, Tokens: tokens} if wordsRequested(opts.TimestampGranularities) { seg.Words = words } - - return pb.TranscriptResult{ - Text: text, - Segments: []*pb.TranscriptSegment{seg}, - }, nil + return pb.TranscriptResult{Text: text, Segments: []*pb.TranscriptSegment{seg}} } // wordsRequested reports whether the caller asked for word-level timestamps. @@ -243,6 +366,14 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra return nil } defer CppStreamFree(stream) + // The C engine is a single shared context: a streaming session and a batched + // unary dispatch must never touch it at once, so hold engineMu for the whole + // stream. This lock is intentionally taken AFTER the non-streaming fallback + // above returns: that fallback goes through AudioTranscription -> the batcher + // -> runBatch, which itself acquires engineMu, so locking here first would + // deadlock. Do not hoist this lock above the fallback. + p.engineMu.Lock() + defer p.engineMu.Unlock() data, duration, err := decodeWavMono16k(opts.Dst) if err != nil { @@ -362,6 +493,12 @@ func decodeWavMono16k(path string) ([]float32, float32, error) { // Free releases the underlying parakeet_ctx. Called by LocalAI when the // model is unloaded. func (p *ParakeetCpp) Free() error { + // Stop the dispatcher before releasing the engine so no in-flight runBatch + // can touch a freed ctx (close leak / use-after-free on reload). 
+ if p.batStop != nil { + close(p.batStop) + p.batStop = nil + } if p.ctxPtr != 0 { CppFree(p.ctxPtr) p.ctxPtr = 0 diff --git a/backend/go/parakeet-cpp/goparakeetcpp_test.go b/backend/go/parakeet-cpp/goparakeetcpp_test.go index 9ce4251398aa..c059eb4bff11 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp_test.go +++ b/backend/go/parakeet-cpp/goparakeetcpp_test.go @@ -43,6 +43,9 @@ func ensureLibLoaded() { purego.RegisterLibFunc(&CppFree, lib, "parakeet_capi_free") purego.RegisterLibFunc(&CppTranscribePath, lib, "parakeet_capi_transcribe_path") purego.RegisterLibFunc(&CppTranscribePathJSON, lib, "parakeet_capi_transcribe_path_json") + if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") + } purego.RegisterLibFunc(&CppStreamBegin, lib, "parakeet_capi_stream_begin") purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed") purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize") diff --git a/backend/go/parakeet-cpp/main.go b/backend/go/parakeet-cpp/main.go index a8fd7fe3bbba..32d94b7b1aa8 100644 --- a/backend/go/parakeet-cpp/main.go +++ b/backend/go/parakeet-cpp/main.go @@ -58,6 +58,13 @@ func main() { purego.RegisterLibFunc(lf.FuncPtr, lib, lf.Name) } + // The batched-JSON entry point exists only in newer libparakeet.so (ABI >= 2). + // Probe with Dlsym and register only if present, so the backend still loads + // against an older library (it falls back to per-request transcription). 
+ if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") + } + fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion()) flag.Parse() diff --git a/docs/content/features/audio-to-text.md b/docs/content/features/audio-to-text.md index c786c9e7c09f..22e7d2529aae 100644 --- a/docs/content/features/audio-to-text.md +++ b/docs/content/features/audio-to-text.md @@ -187,6 +187,22 @@ curl http://localhost:8080/v1/audio/transcriptions \ For real-time use, load a cache-aware streaming model (e.g. `realtime_eou_120m-v1-*.gguf`) and pass `-F stream=true`. Deltas are emitted as the audio is decoded, with end-of-utterance events closing each segment. +### Dynamic batching + +The backend can coalesce concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Batching is **off by default** (`batch_max_size:1`, one request at a time); raise it to opt in. Two `options:` knobs control it: + +```yaml +name: parakeet-110m +backend: parakeet-cpp +parameters: + model: tdt_ctc-110m-f16.gguf +options: +- batch_max_size:8 # max requests coalesced into one batch (default 1 = off) +- batch_max_wait_ms:15 # how long to wait to fill a batch, in ms (default 15) +``` + +By default each request runs on its own. Raise `batch_max_size` (for example 4 to 16) to enable batching; it pays off on GPU under concurrent load, where coalescing the per-step decode GEMMs across requests is a large throughput win. Leave it at 1 on CPU and for low-concurrency setups, where batching only adds latency. Batching only affects concurrent unary requests; streaming sessions always run on their own. + ## See also - [Audio Transform]({{< relref "audio-transform.md" >}}) — clean up the audio (echo cancellation, noise suppression, dereverberation) before passing it to a transcription model.