feat(go): add go deserialization support via io streams #3374
ayush00git wants to merge 23 commits into apache:main
Conversation
|
Hey @chaokunyang |
|
hey @ayush00git, looked through this and the main issue i see is in

    func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
        defer f.resetReadState()
        f.readCtx.buffer.ResetWithReader(r, 0) // this wipes the prefetch window every time

so if fill() reads ahead past the first object boundary (which it will), those bytes

    for {
        var msg Msg
        f.DeserializeFromReader(conn, &msg) // bytes after first object get thrown away
    }

if you look at how he handles this for c++/python, the Buffer is constructed

the go version probably needs something similar, a stream reader type that owns the

Happy to discuss if I'm misreading the flow here
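The read-ahead problem raised in this comment can be reproduced with the standard library alone. The following is a minimal sketch, not Fory code: `bufio.Reader` stands in for the prefetching buffer, and the one-byte length prefix, `readFrame`, `readAllFresh` and `readAllShared` are hypothetical names and framing chosen only for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readFrame reads one frame: a single length byte followed by that many
// payload bytes. This framing is an assumption, not Fory's wire format.
func readFrame(br *bufio.Reader) (string, error) {
	n, err := br.ReadByte()
	if err != nil {
		return "", err
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(br, payload); err != nil {
		return "", err
	}
	return string(payload), nil
}

// readAllFresh creates a new buffered reader for every frame, which mirrors
// resetting the prefetch window on each call. Bytes that the previous reader
// prefetched beyond its frame are lost.
func readAllFresh(stream string) []string {
	src := strings.NewReader(stream)
	var out []string
	for {
		msg, err := readFrame(bufio.NewReader(src))
		if err != nil {
			return out
		}
		out = append(out, msg)
	}
}

// readAllShared keeps one buffered reader alive across frames, so prefetched
// bytes stay available for the next frame.
func readAllShared(stream string) []string {
	br := bufio.NewReader(strings.NewReader(stream))
	var out []string
	for {
		msg, err := readFrame(br)
		if err != nil {
			return out
		}
		out = append(out, msg)
	}
}

func main() {
	const stream = "\x02hi\x03hey\x02yo" // three frames: "hi", "hey", "yo"
	fmt.Println(len(readAllFresh(stream)), len(readAllShared(stream)))
}
```

With the per-call reader only the first frame is recovered, because the first reader's read-ahead consumed the whole source. The shared reader returns all three frames, which is the argument for a long-lived stream reader that owns the buffer.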
|
Hiii @Zakir032002 |
|
hey @ayush00git, one more thing: ReadBinary and ReadBytes return a direct slice into

the problem is fill() compacts the buffer in-place:

so if someone reads a []byte field and holds onto that slice, then the next

in stream mode you probably want to copy before returning instead of aliasing:

in-memory path stays as is.
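The aliasing hazard described here can be shown with a stripped-down buffer. This is a sketch under stated assumptions: `streamBuf`, `fill`, `readAliased`, `readCopied` and `firstAfterSecondRead` are hypothetical stand-ins, not the actual ByteBuffer implementation from the PR.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// streamBuf is a hypothetical stand-in for a stream-backed buffer.
type streamBuf struct {
	r    io.Reader
	data []byte // window of buffered bytes
	rIdx int    // next unread position in data
}

// fill makes at least n unread bytes available. It first compacts in place,
// sliding unread bytes to the front and overwriting consumed ones.
func (b *streamBuf) fill(n int) error {
	unread := copy(b.data, b.data[b.rIdx:])
	b.data = b.data[:unread]
	b.rIdx = 0
	if n > cap(b.data) {
		grown := make([]byte, len(b.data), n)
		copy(grown, b.data)
		b.data = grown
	}
	for len(b.data) < n {
		m, err := b.r.Read(b.data[len(b.data):cap(b.data)])
		b.data = b.data[:len(b.data)+m]
		if err != nil && len(b.data) < n {
			return err
		}
	}
	return nil
}

// readAliased returns a slice that shares memory with the window.
func (b *streamBuf) readAliased(n int) ([]byte, error) {
	if len(b.data)-b.rIdx < n {
		if err := b.fill(n); err != nil {
			return nil, err
		}
	}
	out := b.data[b.rIdx : b.rIdx+n]
	b.rIdx += n
	return out, nil
}

// readCopied returns a private copy that later fills cannot overwrite.
func (b *streamBuf) readCopied(n int) ([]byte, error) {
	v, err := b.readAliased(n)
	if err != nil {
		return nil, err
	}
	return append([]byte(nil), v...), nil
}

// firstAfterSecondRead reads two 4-byte fields and reports what the first
// field looks like after the second read has refilled the window.
func firstAfterSecondRead(copyOut bool) string {
	b := &streamBuf{r: strings.NewReader("AAAABBBB"), data: make([]byte, 0, 4)}
	read := b.readAliased
	if copyOut {
		read = b.readCopied
	}
	first, _ := read(4)
	_, _ = read(4)
	return string(first)
}

func main() {
	fmt.Println(firstAfterSecondRead(false), firstAfterSecondRead(true))
}
```

In this sketch the aliased slice silently changes from `AAAA` to `BBBB` once the window is refilled, while the copied slice keeps its value. The copy costs one allocation per field, which is why keeping the in-memory path alias-based, as the comment suggests, remains reasonable.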
|
also noticed

easiest fix is probably just routing the multi-byte case through

Happy to discuss if I'm misreading the flow here
|
Hey @Zakir032002 |
|
Hii @Zakir032002
But i think you misunderstood the

    if len(b.data)-readIdx >= 5 {
    }

If we are near a chunk boundary (less than 5 bytes remaining in the buffer), the execution completely skips
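The fast-path guard quoted above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `varintBuf`, `readByte`, `readVaruint32` and `decode` are hypothetical, and the encoding is a plain little-endian base-128 varint, which may differ from Fory's exact format.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"testing/iotest"
)

var errTooLong = errors.New("varuint32 is longer than 5 bytes")

// varintBuf is a hypothetical stream-backed buffer.
type varintBuf struct {
	r    io.Reader
	data []byte
	rIdx int
}

// readByte returns the next byte, pulling a new chunk from r when the
// window is exhausted.
func (b *varintBuf) readByte() (byte, error) {
	if b.rIdx == len(b.data) {
		chunk := make([]byte, 4)
		n, err := b.r.Read(chunk)
		if n == 0 {
			if err == nil {
				err = io.ErrNoProgress
			}
			return 0, err
		}
		b.data, b.rIdx = chunk[:n], 0
	}
	c := b.data[b.rIdx]
	b.rIdx++
	return c, nil
}

// readVaruint32 decodes directly from the window when 5 bytes are buffered
// and otherwise falls back to a byte-by-byte path that can cross chunks.
func (b *varintBuf) readVaruint32() (uint32, error) {
	var v uint32
	if len(b.data)-b.rIdx >= 5 {
		// Fast path: the longest possible encoding is already buffered.
		for shift := uint(0); shift < 35; shift += 7 {
			c := b.data[b.rIdx]
			b.rIdx++
			v |= uint32(c&0x7f) << shift
			if c&0x80 == 0 {
				return v, nil
			}
		}
		return 0, errTooLong
	}
	// Slow path: fewer than 5 bytes buffered, so fetch each byte on demand.
	for shift := uint(0); shift < 35; shift += 7 {
		c, err := b.readByte()
		if err != nil {
			return 0, err
		}
		v |= uint32(c&0x7f) << shift
		if c&0x80 == 0 {
			return v, nil
		}
	}
	return 0, errTooLong
}

// decode runs the decoder over an in-memory window or a one-byte-at-a-time
// stream.
func decode(encoded []byte, chunked bool) (uint32, error) {
	if chunked {
		b := &varintBuf{r: iotest.OneByteReader(bytes.NewReader(encoded))}
		return b.readVaruint32()
	}
	b := &varintBuf{r: bytes.NewReader(nil), data: encoded}
	return b.readVaruint32()
}

func main() {
	fast, _ := decode([]byte{0xAC, 0x02, 0, 0, 0}, false)
	slow, _ := decode([]byte{0xAC, 0x02}, true)
	fmt.Println(fast, slow)
}
```

Both calls decode the value 300. The second one never satisfies the 5-byte guard and is served entirely by the slow path, which is the case the comment describes for reads near a chunk boundary.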
…le stateful deserialization
|
I've added the |
    b.data = b.data[:len(b.data)+readBytes]
    b.writerIndex += readBytes
    }
    if err != nil {
fill currently folds reader errors into false, and callers then emit BufferOutOfBoundError. This masks non-EOF transport failures (for example connection reset) as bounds issues. Please preserve/propagate non-EOF read errors so stream deserialization reports the real I/O failure.
now it logs the exact error
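One way to keep transport failures distinct from short input is to return the error from the fill step instead of a bool. The sketch below assumes hypothetical names (`fillFrom`, `failingReader`, `outcomes`) and is not the code in the PR. It maps an early `io.EOF` to `io.ErrUnexpectedEOF` and wraps every other reader error with `%w` so callers can still inspect it.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errConnReset = errors.New("connection reset by peer")

// failingReader simulates a transport that fails on the first read.
type failingReader struct{ err error }

func (f failingReader) Read([]byte) (int, error) { return 0, f.err }

// fillFrom appends data from r to buf until buf holds at least n bytes.
// An early EOF becomes io.ErrUnexpectedEOF; any other error is wrapped and
// returned rather than being folded into a bounds failure.
func fillFrom(r io.Reader, buf []byte, n int) ([]byte, error) {
	tmp := make([]byte, 256)
	for len(buf) < n {
		m, err := r.Read(tmp)
		buf = append(buf, tmp[:m]...)
		if len(buf) >= n {
			break
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return buf, io.ErrUnexpectedEOF
			}
			return buf, fmt.Errorf("stream read failed: %w", err)
		}
	}
	return buf, nil
}

// describe classifies the result of asking for n bytes from r.
func describe(r io.Reader, n int) string {
	_, err := fillFrom(r, nil, n)
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, io.ErrUnexpectedEOF):
		return "truncated"
	default:
		return "io failure: " + err.Error()
	}
}

// outcomes covers a complete payload, a short payload and a broken transport.
func outcomes() [3]string {
	return [3]string{
		describe(strings.NewReader("12345678"), 8),
		describe(strings.NewReader("1234"), 8),
		describe(failingReader{errConnReset}, 8),
	}
}

func main() {
	for _, o := range outcomes() {
		fmt.Println(o)
	}
}
```

Because the transport error is wrapped rather than replaced, `errors.Is(err, errConnReset)` still succeeds for the caller, so a connection reset is no longer reported as an out-of-bounds read.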
    return &ByteBuffer{data: data}
    }

    func NewByteBufferFromReader(r io.Reader, minCap int) *ByteBuffer {
This introduces stream mode, but ByteBuffer.Read(p) still only copies from in-memory b.data and never calls fill. Any decode path that uses Read can therefore observe partial/zero bytes with short-chunk readers. Please make stream-backed Read fetch until len(p) bytes are available (or return the underlying read error) to avoid silent metadata corruption.
now it calls fill() up to the reserved bytes and copies as well, to prevent metadata corruption.
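The short-read concern in this review thread can be illustrated without Fory. The sketch below uses `iotest.OneByteReader` to simulate a short-chunk reader and compares a single `Read` call with a loop that keeps reading until `len(p)` bytes have arrived. `readFully`, `singleRead` and `fullRead` are hypothetical names, and the loop has the same contract as the standard `io.ReadFull`.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"testing/iotest"
)

// readFully keeps reading until p is full or the reader fails.
// A reader that stops early yields io.ErrUnexpectedEOF instead of a
// silently short result.
func readFully(r io.Reader, p []byte) (int, error) {
	total := 0
	for total < len(p) {
		n, err := r.Read(p[total:])
		total += n
		if total == len(p) {
			return total, nil
		}
		if err != nil {
			if err == io.EOF && total > 0 {
				err = io.ErrUnexpectedEOF
			}
			return total, err
		}
	}
	return total, nil
}

// singleRead issues one Read against a reader that delivers one byte per call.
func singleRead(payload string, want int) int {
	r := iotest.OneByteReader(strings.NewReader(payload))
	n, _ := r.Read(make([]byte, want))
	return n
}

// fullRead uses readFully against the same kind of short-chunk reader.
func fullRead(payload string, want int) (int, error) {
	r := iotest.OneByteReader(strings.NewReader(payload))
	return readFully(r, make([]byte, want))
}

func main() {
	fmt.Println(singleRead("metadata", 8)) // one call returns a single byte
	n, err := fullRead("metadata", 8)
	fmt.Println(n, err)
	n, err = fullRead("meta", 8) // the source ends before 8 bytes arrive
	fmt.Println(n, err)
}
```

A single `Read` returns one byte here, which is how a caller that assumes a full buffer ends up decoding partial metadata. The looping version either delivers all requested bytes or reports the shortfall as an error.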
|
Hii @chaokunyang |
|
Please take #3307 as a reference to finish the remaining work. Also create Deserialize helper methods in the tests, then use them instead of

Then run benchmarks/go and compare with asf/main to ensure your code change doesn't introduce any performance regression.
Why?
To enable stream-based deserialization in Fory's Go library, allowing for direct reading from io.Reader without pre-buffering the entire payload. This improves efficiency for network and file-based transport.
What does this PR do?
1. go/fory/buffer.go
Enhanced ByteBuffer to support io.Reader with an internal sliding window and automatic filling.
- reader io.Reader and minCap int fields.
- fill(n int) bool for on-demand data fetching and compaction.
- Read* methods (fixed-size, varint, tagged) to fetch data from the reader if not cached.
2. go/fory/fory.go
Added the DeserializeFromReader method as the primary public API for stream deserialization.
- io package.
- DeserializeFromReader to reset the buffer state and initiate deserialization from a stream.
3. go/fory/reader.go
Ensured ReadContext correctly manages the buffer state when switching between memory-only and stream-backed modes.
- SetData to reset the reader field.
Related issues
Closes #3302
Does this PR introduce any user-facing change?
Benchmark
N/A