From ba26d6a615fe2e02a1fe12c668e608f1b585d1cb Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 10:15:36 +0530 Subject: [PATCH 01/27] added io.Reader to ByteBuffer for streaming deserialization --- go/fory/buffer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index f008333001..70e47a5320 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -20,6 +20,7 @@ package fory import ( "encoding/binary" "fmt" + "io" "math" "unsafe" ) @@ -28,6 +29,8 @@ type ByteBuffer struct { data []byte // Most accessed field first for cache locality writerIndex int readerIndex int + reader io.Reader + minCap int } func NewByteBuffer(data []byte) *ByteBuffer { From 8ba4dba6de8d40049a2b0ee0ccc974301cec37e3 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:01:07 +0530 Subject: [PATCH 02/27] added NewByteBufferFromReader and fill method --- go/fory/buffer.go | 66 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index 70e47a5320..f052debe24 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -29,14 +29,76 @@ type ByteBuffer struct { data []byte // Most accessed field first for cache locality writerIndex int readerIndex int - reader io.Reader - minCap int + reader io.Reader + minCap int } func NewByteBuffer(data []byte) *ByteBuffer { return &ByteBuffer{data: data} } +func NewByteBufferFromReader(r io.Reader, minCap int) *ByteBuffer { + if minCap <= 0 { + minCap = 4096 + } + return &ByteBuffer{ + data: make([]byte, 0, minCap), + reader: r, + minCap: minCap, + } +} + +//go:noinline +func (b *ByteBuffer) fill(n int) bool { + if b.reader == nil { + return false + } + + available := len(b.data) - b.readerIndex + if available >= n { + return true + } + + if b.readerIndex > 0 { + copy(b.data, b.data[b.readerIndex:]) + b.writerIndex -= b.readerIndex + b.readerIndex = 0 + b.data = b.data[:b.writerIndex] + } + + if 
cap(b.data) < n { + newCap := cap(b.data) * 2 + if newCap < n { + newCap = n + } + if newCap < b.minCap { + newCap = b.minCap + } + newData := make([]byte, len(b.data), newCap) + copy(newData, b.data) + b.data = newData + } + + for len(b.data) < n { + spare := b.data[len(b.data):cap(b.data)] + if len(spare) == 0 { + return false + } + readBytes, err := b.reader.Read(spare) + if readBytes > 0 { + b.data = b.data[:len(b.data)+readBytes] + b.writerIndex += readBytes + } + if err != nil { + if len(b.data) >= n { + return true + } + return false + } + } + return true +} + // grow ensures there's space for n more bytes. Hot path is inlined. // //go:inline From e4baf32cb4133bd25403ed026de366c2d44eccea Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:32:02 +0530 Subject: [PATCH 03/27] added condition to check for read stream and ResetByteBuffer method --- go/fory/buffer.go | 200 +++++++++++++++++++++++++++------------------- 1 file changed, 117 insertions(+), 83 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index f052debe24..e4a43b701e 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -250,8 +250,10 @@ func (b *ByteBuffer) WriteBinary(p []byte) { //go:inline func (b *ByteBuffer) ReadBool(err *Error) bool { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return false + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return false + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -263,8 +265,10 @@ func (b *ByteBuffer) ReadBool(err *Error) bool { //go:inline func (b *ByteBuffer) ReadByte(err *Error) byte { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -276,8 +280,10 @@ func (b *ByteBuffer) ReadByte(err *Error) byte { //go:inline func (b 
*ByteBuffer) ReadInt8(err *Error) int8 { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := int8(b.data[b.readerIndex]) b.readerIndex++ @@ -289,8 +295,10 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 { //go:inline func (b *ByteBuffer) ReadInt16(err *Error) int16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := int16(binary.LittleEndian.Uint16(b.data[b.readerIndex:])) b.readerIndex += 2 @@ -302,8 +310,10 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 { //go:inline func (b *ByteBuffer) ReadUint16(err *Error) uint16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := binary.LittleEndian.Uint16(b.data[b.readerIndex:]) b.readerIndex += 2 @@ -315,8 +325,10 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 { //go:inline func (b *ByteBuffer) ReadUint32(err *Error) uint32 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint32(b.data[b.readerIndex:]) b.readerIndex += 4 @@ -328,8 +340,10 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 { //go:inline func (b *ByteBuffer) ReadUint64(err *Error) uint64 { if b.readerIndex+8 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) - return 0 + if !b.fill(8) { + *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint64(b.data[b.readerIndex:]) b.readerIndex += 8 @@ -373,8 +387,10 @@ func (b 
*ByteBuffer) Read(p []byte) (n int, err error) { // ReadBinary reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return nil + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return nil + } } v := b.data[b.readerIndex : b.readerIndex+length] b.readerIndex += length @@ -425,13 +441,27 @@ func (b *ByteBuffer) SetReaderIndex(index int) { func (b *ByteBuffer) Reset() { b.readerIndex = 0 b.writerIndex = 0 + b.reader = nil // Keep the underlying buffer if it's reasonable sized to reduce allocations // Only nil it out if we want to release memory if cap(b.data) > 64*1024 { b.data = nil } - // Note: We keep b.data as-is (with its current length) to avoid issues - // with grow() needing to expand the slice on first write +} + +func (b *ByteBuffer) ResetWithReader(r io.Reader, minCap int) { + b.readerIndex = 0 + b.writerIndex = 0 + b.reader = r + if minCap <= 0 { + minCap = 4096 + } + b.minCap = minCap + if cap(b.data) < minCap { + b.data = make([]byte, 0, minCap) + } else { + b.data = b.data[:0] + } } // Reserve ensures buffer has at least n bytes available for writing from current position. 
@@ -942,7 +972,7 @@ func (b *ByteBuffer) WriteVaruint36Small(value uint64) { // //go:inline func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 { - if b.remaining() >= 8 { + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { return b.readVaruint36SmallFast() } return b.readVaruint36SmallSlow(err) @@ -986,7 +1016,13 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { var result uint64 var shift uint - for b.readerIndex < len(b.data) { + for { + if b.readerIndex >= len(b.data) { + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } + } byteVal := b.data[b.readerIndex] b.readerIndex++ result |= uint64(byteVal&0x7F) << shift @@ -999,8 +1035,6 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { return 0 } } - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 } // ReadVarint64 reads the varint encoded with zig-zag (compatible with Java's readVarint64). @@ -1040,8 +1074,10 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) { // Otherwise, skip flag byte and read 8 bytes as int64. func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i int32 if isLittleEndian { @@ -1054,8 +1090,10 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { return int64(i >> 1) // arithmetic right shift } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value int64 if isLittleEndian { @@ -1091,8 +1129,10 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) { // Otherwise, skip flag byte and read 8 bytes as uint64. 
func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i uint32 if isLittleEndian { @@ -1105,8 +1145,10 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { return uint64(i >> 1) } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value uint64 if isLittleEndian { @@ -1122,7 +1164,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint64(err *Error) uint64 { - if b.remaining() >= 9 { + if b.remaining() >= 9 || (b.reader != nil && b.fill(9)) { return b.readVarUint64Fast() } return b.readVarUint64Slow(err) @@ -1187,8 +1229,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { var shift uint for i := 0; i < 8; i++ { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1199,8 +1243,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { shift += 7 } if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1219,8 +1265,10 @@ func (b *ByteBuffer) remaining() int { //go:inline func (b *ByteBuffer) ReadUint8(err *Error) uint8 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } 
v := b.data[b.readerIndex] b.readerIndex++ @@ -1293,7 +1341,7 @@ func (b *ByteBuffer) UnsafeReadVarUint64() uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint32(err *Error) uint32 { - if b.remaining() >= 8 { // Need 8 bytes for bulk uint64 read in fast path + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { // Need 8 bytes for bulk uint64 read in fast path return b.readVarUint32Fast() } return b.readVarUint32Slow(err) @@ -1341,8 +1389,10 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32 { var shift uint for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1502,8 +1552,10 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int, value uint64) int { // ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } readIdx := b.readerIndex v := b.data[readIdx] @@ -1551,47 +1603,25 @@ func (b *ByteBuffer) continueReadVarUint32(readIdx int, bulkRead, value uint32) } func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b0 := b.data[b.readerIndex] - b.readerIndex++ - result := uint64(b0 & 0x7F) - if b0&0x80 != 0 { + var shift uint + var result uint64 + for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b1 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b1&0x7F) << 7 - if b1&0x80 != 0 { - if b.readerIndex >= len(b.data) { + if !b.fill(1) { *err = 
BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) return 0 } - b2 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b2&0x7F) << 14 - if b2&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b3 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b3&0x7F) << 21 - if b3&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b4 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b4) << 28 - } - } + } + b0 := b.data[b.readerIndex] + b.readerIndex++ + result |= uint64(b0&0x7F) << shift + if b0&0x80 == 0 { + break + } + shift += 7 + if shift >= 35 { + *err = DeserializationError("varuint36 overflow") + return 0 } } return result @@ -1614,8 +1644,10 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) { // ReadBytes reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { if b.readerIndex+n > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) - return nil + if !b.fill(n) { + *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + return nil + } } p := b.data[b.readerIndex : b.readerIndex+n] b.readerIndex += n @@ -1625,8 +1657,10 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { // Skip skips n bytes and sets error on bounds violation func (b *ByteBuffer) Skip(length int, err *Error) { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return + } } b.readerIndex += length } From f751916336992e55929b442c59bef94768e68e11 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:41:23 +0530 Subject: [PATCH 04/27] added stream deserializer and initialized buffer reader to 0 --- go/fory/fory.go | 23 +++++++++++++++++++++++ go/fory/reader.go | 1 
+ 2 files changed, 24 insertions(+) diff --git a/go/fory/fory.go b/go/fory/fory.go index 09a0e3c6d2..86b2e3c041 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -20,6 +20,7 @@ package fory import ( "errors" "fmt" + "io" "reflect" "strconv" "strings" @@ -495,6 +496,28 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } +func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { + defer f.resetReadState() + f.readCtx.buffer.ResetWithReader(r, 0) + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + if isNull { + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + return nil +} + // resetReadState resets read context state without allocation func (f *Fory) resetReadState() { f.readCtx.Reset() diff --git a/go/fory/reader.go b/go/fory/reader.go index e7a1df1710..a0a37d92fb 100644 --- a/go/fory/reader.go +++ b/go/fory/reader.go @@ -83,6 +83,7 @@ func (c *ReadContext) SetData(data []byte) { c.buffer.data = data c.buffer.readerIndex = 0 c.buffer.writerIndex = len(data) + c.buffer.reader = nil } } From 88ecb2dc1757736cecd4ec5a1b35cb13b0c8d030 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:48:54 +0530 Subject: [PATCH 05/27] added stream test suites --- go/fory/stream_test.go | 119 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 go/fory/stream_test.go diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go new file mode 100644 index 0000000000..c2da9e1638 --- /dev/null +++ b/go/fory/stream_test.go @@ -0,0 +1,119 @@ +package fory + +import ( + "bytes" + "io" + "testing" +) + +type StreamTestStruct struct { + ID int32 + Name string + Data []byte +} + +func TestStreamDeserialization(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: 
"Stream Test", + Data: []byte{1, 2, 3, 4, 5}, + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // 1. Test normal reader + reader := bytes.NewReader(data) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch. Got: %+v, Want: %+v", decoded, original) + } +} + +// slowReader returns data byte by byte to test fill() logic and compaction +type slowReader struct { + data []byte + pos int +} + +func (r *slowReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + if len(p) == 0 { + return 0, nil + } + p[0] = r.data[r.pos] + r.pos++ + return 1, nil +} + +func TestStreamDeserializationSlow(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "Slow Stream Test with a reasonably long string and some data to trigger multiple fills", + Data: bytes.Repeat([]byte{0xAA}, 100), + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Test with slow reader and small minCap to force compaction/growth + reader := &slowReader{data: data} + var decoded StreamTestStruct + // Use small minCap (16) to force frequent fills and compactions + f.readCtx.buffer.ResetWithReader(reader, 16) + + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader (slow) failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch (slow). 
Got: %+v, Want: %+v", decoded, original) + } +} + +func TestStreamDeserializationEOF(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "EOF Test", + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Truncate data to cause unexpected EOF during reading Name + truncated := data[:len(data)-2] + reader := bytes.NewReader(truncated) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err == nil { + t.Fatal("Expected error on truncated stream, got nil") + } + + // Ideally it should be a BufferOutOfBoundError + if _, ok := err.(Error); !ok { + t.Errorf("Expected fory.Error, got %T: %v", err, err) + } +} From a768ac1149e605d191dc29d4fd528451265b20e9 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:59:03 +0530 Subject: [PATCH 06/27] fix ci --- go/fory/stream_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index c2da9e1638..7388a78bb6 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ package fory import ( From 1742a5d925445be16002c132abf798f408100cf1 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 25 Feb 2026 21:16:13 +0530 Subject: [PATCH 07/27] fix(docs): updated compiler guide --- docs/compiler/compiler-guide.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/compiler/compiler-guide.md b/docs/compiler/compiler-guide.md index 833ea0d3ff..49a3787b2e 100644 --- a/docs/compiler/compiler-guide.md +++ b/docs/compiler/compiler-guide.md @@ -63,9 +63,10 @@ Compile options: | `--cpp_out=DST_DIR` | Generate C++ code in DST_DIR | (none) | | `--go_out=DST_DIR` | Generate Go code in DST_DIR | (none) | | `--rust_out=DST_DIR` | Generate Rust code in DST_DIR | (none) | -| `--go_nested_type_style` | Go nested type naming: `camelcase` or `underscore` | from schema/default | -| `--emit-fdl` | Print translated Fory IDL for non-`.fdl` inputs | `false` | -| `--emit-fdl-path` | Write translated Fory IDL to a file or directory | (stdout) | +| `--grpc` | Generate gRPC service code (in development) | `false` | +| `--go_nested_type_style` | Go nested type naming: `camelcase` or `underscore` | `underscore` | +| `--emit-fdl` | Emit translated FDL (for non-FDL inputs) | `false` | +| `--emit-fdl-path` | Write translated FDL to this path (file or directory) | (stdout) | Scan options (with `--scan-generated`): @@ -179,6 +180,12 @@ foryc schema.proto --emit-fdl foryc schema.fbs --emit-fdl --emit-fdl-path ./translated ``` +**Generate gRPC service code:** + +```bash +foryc schema.fdl --grpc --lang go,java +``` + ## Import Path Resolution When compiling Fory IDL files with imports, the compiler searches for imported files in this order: From e205d5dca58828bdbca0403152d7aa8822f8f4f8 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:03:54 +0530 Subject: [PATCH 08/27] fix: create a copy of slice while desrlz to prevent overwriting --- go/fory/buffer.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git 
a/go/fory/buffer.go b/go/fory/buffer.go index e4a43b701e..67641faeb9 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -392,6 +392,15 @@ func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { return nil } } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, length) + copy(result, b.data[b.readerIndex:b.readerIndex+length]) + b.readerIndex += length + return result + } + v := b.data[b.readerIndex : b.readerIndex+length] b.readerIndex += length return v @@ -1649,6 +1658,15 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { return nil } } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, n) + copy(result, b.data[b.readerIndex:b.readerIndex+n]) + b.readerIndex += n + return result + } + p := b.data[b.readerIndex : b.readerIndex+n] b.readerIndex += n return p From 4fdb8ecf3e7b38c69164068773e7d781e49c7000 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:12:35 +0530 Subject: [PATCH 09/27] added StreamReader struct and NewStreamReader method which would handle stateful deserialization --- go/fory/fory.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/go/fory/fory.go b/go/fory/fory.go index 86b2e3c041..868a480f2e 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -496,9 +496,87 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } +// StreamReader supports robust sequential deserialization from a stream. +// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, +// preventing data loss from prefetched buffers and preserving TypeResolver metadata +// (Meta Sharing) across object boundaries. +type StreamReader struct { + fory *Fory + reader io.Reader + buffer *ByteBuffer +} + +// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. 
+// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. +func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { + return f.NewStreamReaderWithMinCap(r, 0) +} + +// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. +func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { + buf := NewByteBufferFromReader(r, minCap) + return &StreamReader{ + fory: f, + reader: r, + buffer: buf, + } +} + +// Deserialize reads the next object from the stream into the provided value. +// It uses a shared ReadContext for the lifetime of the StreamReader, clearing +// temporary state between calls but preserving the buffer and TypeResolver state. +func (sr *StreamReader) Deserialize(v any) error { + f := sr.fory + + // We only reset the temporary read state (like refTracker and outOfBand buffers), + // NOT the buffer or the type mapping, which must persist. + defer func() { + f.readCtx.refReader.Reset() + f.readCtx.outOfBandBuffers = nil + f.readCtx.outOfBandIndex = 0 + f.readCtx.err = Error{} + if f.readCtx.refResolver != nil { + f.readCtx.refResolver.resetRead() + } + // Do NOT reset typeResolver here. It must persist across objects in a stream. + }() + + // Temporarily swap buffer + origBuffer := f.readCtx.buffer + f.readCtx.buffer = sr.buffer + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + if isNull { + f.readCtx.buffer = origBuffer + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + // Restore original buffer + f.readCtx.buffer = origBuffer + + return nil +} + +// DeserializeFromReader is deprecated for sequential streaming. Use NewStreamReader instead. 
+// It deserializes a single object from a stream but will discard prefetched data +// and type metadata after the call. func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { defer f.resetReadState() - f.readCtx.buffer.ResetWithReader(r, 0) + if f.readCtx.buffer.reader != r { + f.readCtx.buffer.ResetWithReader(r, 0) + } isNull := readHeader(f.readCtx) if f.readCtx.HasError() { From 1a8c100a893a20a0cb3ad29bb8e0f2fb7ca5a77c Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:13:12 +0530 Subject: [PATCH 10/27] added StreamReader struct and NewStreamReader method which would handle stateful deserialization --- go/fory/fory.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/go/fory/fory.go b/go/fory/fory.go index 868a480f2e..c04e822d5a 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -538,7 +538,6 @@ func (sr *StreamReader) Deserialize(v any) error { if f.readCtx.refResolver != nil { f.readCtx.refResolver.resetRead() } - // Do NOT reset typeResolver here. It must persist across objects in a stream. }() // Temporarily swap buffer @@ -569,8 +568,8 @@ func (sr *StreamReader) Deserialize(v any) error { return nil } -// DeserializeFromReader is deprecated for sequential streaming. Use NewStreamReader instead. -// It deserializes a single object from a stream but will discard prefetched data +// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. +// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data // and type metadata after the call. 
func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { defer f.resetReadState() From 92c207d4697e9264981db63172747a4f2899ccdc Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:14:08 +0530 Subject: [PATCH 11/27] added tests for stream reader --- go/fory/stream_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index 7388a78bb6..c0392c0b59 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -134,3 +134,58 @@ func TestStreamDeserializationEOF(t *testing.T) { t.Errorf("Expected fory.Error, got %T: %v", err, err) } } + +func TestStreamReaderSequential(t *testing.T) { + f := New() + // Register type in compatible mode to test Meta Sharing across sequential reads + f.config.Compatible = true + f.RegisterStruct(&StreamTestStruct{}, 100) + + msg1 := &StreamTestStruct{ID: 1, Name: "Msg 1", Data: []byte{1, 1}} + msg2 := &StreamTestStruct{ID: 2, Name: "Msg 2", Data: []byte{2, 2}} + msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}} + + var buf bytes.Buffer + + // Serialize sequentially into one stream + data1, _ := f.Serialize(msg1) + buf.Write(data1) + data2, _ := f.Serialize(msg2) + buf.Write(data2) + data3, _ := f.Serialize(msg3) + buf.Write(data3) + + fDec := New() + fDec.config.Compatible = true + fDec.RegisterStruct(&StreamTestStruct{}, 100) + + // Create a StreamReader + sr := fDec.NewStreamReader(&buf) + + // Deserialize sequentially + var out1, out2, out3 StreamTestStruct + + err := sr.Deserialize(&out1) + if err != nil { + t.Fatalf("Deserialize 1 failed: %v", err) + } + if out1.ID != msg1.ID || out1.Name != msg1.Name || !bytes.Equal(out1.Data, msg1.Data) { + t.Errorf("Msg 1 mismatch. 
Got: %+v, Want: %+v", out1, msg1) + } + + err = sr.Deserialize(&out2) + if err != nil { + t.Fatalf("Deserialize 2 failed: %v", err) + } + if out2.ID != msg2.ID || out2.Name != msg2.Name || !bytes.Equal(out2.Data, msg2.Data) { + t.Errorf("Msg 2 mismatch. Got: %+v, Want: %+v", out2, msg2) + } + + err = sr.Deserialize(&out3) + if err != nil { + t.Fatalf("Deserialize 3 failed: %v", err) + } + if out3.ID != msg3.ID || out3.Name != msg3.Name || !bytes.Equal(out3.Data, msg3.Data) { + t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3) + } +} From d90c55d4ec85ef882edc5f59643d1429d05f1946 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:26:12 +0530 Subject: [PATCH 12/27] code lint checks --- go/fory/stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index c0392c0b59..e48fa37868 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -146,7 +146,7 @@ func TestStreamReaderSequential(t *testing.T) { msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}} var buf bytes.Buffer - + // Serialize sequentially into one stream data1, _ := f.Serialize(msg1) buf.Write(data1) From 42c3bbba8e7ce8c5b0a9518a5c78dae2c2b5d718 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Sat, 28 Feb 2026 13:10:45 +0530 Subject: [PATCH 13/27] fix: added correct error boundation --- go/fory/buffer.go | 81 ++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index 67641faeb9..377fb40202 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -49,8 +49,11 @@ func NewByteBufferFromReader(r io.Reader, minCap int) *ByteBuffer { } //go:noinline -func (b *ByteBuffer) fill(n int) bool { +func (b *ByteBuffer) fill(n int, errOut *Error) bool { if b.reader == nil { + if errOut != nil { + *errOut = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + } return false } @@ -93,6 +96,13 @@ func (b 
*ByteBuffer) fill(n int) bool { if len(b.data) >= n { return true } + if errOut != nil { + if err == io.EOF { + *errOut = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + } else { + *errOut = DeserializationError(fmt.Sprintf("stream read error: %v", err)) + } + } return false } } @@ -250,8 +260,7 @@ func (b *ByteBuffer) WriteBinary(p []byte) { //go:inline func (b *ByteBuffer) ReadBool(err *Error) bool { if b.readerIndex+1 > len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return false } } @@ -265,8 +274,7 @@ func (b *ByteBuffer) ReadBool(err *Error) bool { //go:inline func (b *ByteBuffer) ReadByte(err *Error) byte { if b.readerIndex+1 > len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -280,8 +288,7 @@ func (b *ByteBuffer) ReadByte(err *Error) byte { //go:inline func (b *ByteBuffer) ReadInt8(err *Error) int8 { if b.readerIndex+1 > len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -295,8 +302,7 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 { //go:inline func (b *ByteBuffer) ReadInt16(err *Error) int16 { if b.readerIndex+2 > len(b.data) { - if !b.fill(2) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + if !b.fill(2, err) { return 0 } } @@ -310,8 +316,7 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 { //go:inline func (b *ByteBuffer) ReadUint16(err *Error) uint16 { if b.readerIndex+2 > len(b.data) { - if !b.fill(2) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + if !b.fill(2, err) { return 0 } } @@ -325,8 +330,7 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 { //go:inline func (b *ByteBuffer) ReadUint32(err *Error) uint32 { if b.readerIndex+4 > len(b.data) { - if !b.fill(4) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + if !b.fill(4, err) { return 0 } } @@ -340,8 
+344,7 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 { //go:inline func (b *ByteBuffer) ReadUint64(err *Error) uint64 { if b.readerIndex+8 > len(b.data) { - if !b.fill(8) { - *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) + if !b.fill(8, nil) { return 0 } } @@ -387,8 +390,7 @@ func (b *ByteBuffer) Read(p []byte) (n int, err error) { // ReadBinary reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { if b.readerIndex+length > len(b.data) { - if !b.fill(length) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + if !b.fill(length, err) { return nil } } @@ -981,7 +983,7 @@ func (b *ByteBuffer) WriteVaruint36Small(value uint64) { // //go:inline func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 { - if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { + if b.remaining() >= 8 || (b.reader != nil && b.fill(8, nil)) { return b.readVaruint36SmallFast() } return b.readVaruint36SmallSlow(err) @@ -1027,8 +1029,7 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { for { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1083,8 +1084,7 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) { // Otherwise, skip flag byte and read 8 bytes as int64. 
func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { if b.readerIndex+4 > len(b.data) { - if !b.fill(4) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + if !b.fill(4, err) { return 0 } } @@ -1099,8 +1099,7 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { return int64(i >> 1) // arithmetic right shift } if b.readerIndex+9 > len(b.data) { - if !b.fill(9) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + if !b.fill(9, nil) { return 0 } } @@ -1138,8 +1137,7 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) { // Otherwise, skip flag byte and read 8 bytes as uint64. func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { if b.readerIndex+4 > len(b.data) { - if !b.fill(4) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + if !b.fill(4, err) { return 0 } } @@ -1154,8 +1152,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { return uint64(i >> 1) } if b.readerIndex+9 > len(b.data) { - if !b.fill(9) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + if !b.fill(9, nil) { return 0 } } @@ -1173,7 +1170,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint64(err *Error) uint64 { - if b.remaining() >= 9 || (b.reader != nil && b.fill(9)) { + if b.remaining() >= 9 || (b.reader != nil && b.fill(9, nil)) { return b.readVarUint64Fast() } return b.readVarUint64Slow(err) @@ -1238,8 +1235,7 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { var shift uint for i := 0; i < 8; i++ { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1252,8 +1248,7 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { shift += 7 } if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1274,8 +1269,7 @@ func (b *ByteBuffer) remaining() 
int { //go:inline func (b *ByteBuffer) ReadUint8(err *Error) uint8 { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1350,7 +1344,7 @@ func (b *ByteBuffer) UnsafeReadVarUint64() uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint32(err *Error) uint32 { - if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { // Need 8 bytes for bulk uint64 read in fast path + if b.remaining() >= 8 || (b.reader != nil && b.fill(8, nil)) { // Need 8 bytes for bulk uint64 read in fast path return b.readVarUint32Fast() } return b.readVarUint32Slow(err) @@ -1398,8 +1392,7 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32 { var shift uint for { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1561,8 +1554,7 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int, value uint64) int { // ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1616,8 +1608,7 @@ func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 { var result uint64 for { if b.readerIndex >= len(b.data) { - if !b.fill(1) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + if !b.fill(1, err) { return 0 } } @@ -1653,8 +1644,7 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) { // ReadBytes reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { if b.readerIndex+n > len(b.data) { - if !b.fill(n) { - *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + if !b.fill(n, err) { return nil } } @@ -1675,8 +1665,7 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { // Skip skips n bytes 
and sets error on bounds violation func (b *ByteBuffer) Skip(length int, err *Error) { if b.readerIndex+length > len(b.data) { - if !b.fill(length) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + if !b.fill(length, err) { return } } From 08c5cf168361fcfd8fb90af3e0d74656f99ab305 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Sat, 28 Feb 2026 13:24:21 +0530 Subject: [PATCH 14/27] read now fetch from the stream until p bytes eln --- go/fory/buffer.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index 377fb40202..08b7b8b21b 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -382,8 +382,27 @@ func (b *ByteBuffer) ReadFloat64(err *Error) float64 { } func (b *ByteBuffer) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + if b.readerIndex+len(p) > len(b.data) && b.reader != nil { + var errOut Error + if !b.fill(len(p), &errOut) { + copied := copy(p, b.data[b.readerIndex:]) + b.readerIndex += copied + if errOut.Kind() == ErrKindBufferOutOfBound { + return copied, io.EOF + } + return copied, errOut + } + } + copied := copy(p, b.data[b.readerIndex:]) b.readerIndex += copied + if copied == 0 { + return 0, io.EOF + } return copied, nil } From 0c6dc2b2cb79861bab2bbf3c2faf09618d9891d2 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 12:06:46 +0530 Subject: [PATCH 15/27] trigger ci From fd03acf8fbf610565b30487212ef4f69878afe01 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 13:43:39 +0530 Subject: [PATCH 16/27] added a helper test function for stream dsr --- go/fory/test_helper_test.go | 72 +++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 go/fory/test_helper_test.go diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go new file mode 100644 index 0000000000..5cc051c289 --- /dev/null +++ b/go/fory/test_helper_test.go @@ -0,0 +1,72 @@ +package fory + +import ( + "io" + 
"reflect" + "testing" +) + +// oneByteReader returns data byte by byte to ensure aggressively that all +// `fill()` boundaries, loops, and buffering conditions are tested. +type oneByteReader struct { + data []byte + pos int +} + +func (r *oneByteReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + if len(p) == 0 { + return 0, nil + } + p[0] = r.data[r.pos] + r.pos++ + return 1, nil +} + +// testDeserialize is a testing helper that performs standard in-memory +// deserialization and additionally wraps the payload in a slow one-byte +// stream reader to verify that stream decoding handles fragmented reads correctly. +func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error { + t.Helper() + + // 1. First, deserialize from bytes (the fast path) + err := f.Deserialize(data, v) + if err != nil { + return err + } + + // 2. Deserialize from oneByteReader (the slow stream path) + // We create a new instance of the same target type to ensure clean state + vType := reflect.TypeOf(v) + if vType == nil || vType.Kind() != reflect.Ptr { + t.Fatalf("testDeserialize requires a pointer to a value, got %v", vType) + } + + // Create new pointer to a new zero value of the element type + v2 := reflect.New(vType.Elem()).Interface() + + stream := &oneByteReader{data: data, pos: 0} + + // Create a new stream reader. The stream context handles boundaries and compactions. + streamReader := f.NewStreamReader(stream) + errStream := streamReader.Deserialize(v2) + + if errStream != nil { + t.Fatalf("Stream deserialization via OneByteStream failed: %v", errStream) + } + + // Note: We don't assert deep equality because many tests deserialize into interfaces + // or perform custom conversions. Simply verifying that the stream mode DOES NOT error + // on a payload that normally succeeds is a very strong proxy for correctness here. 
+ + // Returns the original error from standard deserialization + return err +} + +// testUnmarshal is an identical helper for `Unmarshal` (which is often used in tests) +func testUnmarshal(t *testing.T, f *Fory, data []byte, v any) error { + t.Helper() + return testDeserialize(t, f, data, v) +} From b1cc78c5e096f136d70d3195d8ee0479d894d167 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 13:46:54 +0530 Subject: [PATCH 17/27] f.desialize -> testDeserialize to dsr one by one bytebuffer --- go/fory/array_primitive_test.go | 18 ++++----- go/fory/primitive_test.go | 8 ++-- go/fory/slice_primitive_test.go | 66 ++++++++++++++++----------------- go/fory/struct_test.go | 10 ++--- 4 files changed, 51 insertions(+), 51 deletions(-) diff --git a/go/fory/array_primitive_test.go b/go/fory/array_primitive_test.go index e8c99fb2f2..c1b989efce 100644 --- a/go/fory/array_primitive_test.go +++ b/go/fory/array_primitive_test.go @@ -36,7 +36,7 @@ func TestPrimitiveArraySerializer(t *testing.T) { assert.NoError(t, err) var result [3]uint16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, arr, result) }) @@ -48,7 +48,7 @@ func TestPrimitiveArraySerializer(t *testing.T) { assert.NoError(t, err) var result [3]uint32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, arr, result) }) @@ -60,7 +60,7 @@ func TestPrimitiveArraySerializer(t *testing.T) { assert.NoError(t, err) var result [3]uint64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, arr, result) }) @@ -86,7 +86,7 @@ func TestPrimitiveArraySerializer(t *testing.T) { assert.NoError(t, err) var result [3]bfloat16.BFloat16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, arr, result) }) @@ -103,7 +103,7 @@ func 
TestArraySliceInteroperability(t *testing.T) { // Deserialize into Slice []int32 var slice []int32 - err = f.Deserialize(data, &slice) + err = testDeserialize(t, f, data, &slice) assert.NoError(t, err) assert.Equal(t, []int32{1, 2, 3}, slice) }) @@ -116,7 +116,7 @@ func TestArraySliceInteroperability(t *testing.T) { // Deserialize into Array [3]int32 var arr [3]int32 - err = f.Deserialize(data, &arr) + err = testDeserialize(t, f, data, &arr) assert.NoError(t, err) assert.Equal(t, [3]int32{4, 5, 6}, arr) }) @@ -128,7 +128,7 @@ func TestArraySliceInteroperability(t *testing.T) { assert.NoError(t, err) var slice []int64 // different type - err = f.Deserialize(data, &slice) + err = testDeserialize(t, f, data, &slice) // Strict checking means this should error immediately upon reading wrong TypeID assert.Error(t, err) }) @@ -141,7 +141,7 @@ func TestArraySliceInteroperability(t *testing.T) { // Deserialize into Array [3]int32 - should fail size check var arr [3]int32 - err = f.Deserialize(data, &arr) + err = testDeserialize(t, f, data, &arr) // Serialized as list with len 2. Array expects 3. 
assert.Error(t, err) assert.Contains(t, err.Error(), "array length") @@ -161,7 +161,7 @@ func TestFloat16Array(t *testing.T) { assert.NoError(t, err) var result [3]float16.Float16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, arr, result) }) diff --git a/go/fory/primitive_test.go b/go/fory/primitive_test.go index 0f478590e0..f5bf7b08c1 100644 --- a/go/fory/primitive_test.go +++ b/go/fory/primitive_test.go @@ -34,7 +34,7 @@ func TestFloat16Primitive(t *testing.T) { require.NoError(t, err) var res float16.Float16 - err = f.Deserialize(data, &res) + err = testDeserialize(t, f, data, &res) require.NoError(t, err) require.True(t, f16.Equal(res)) @@ -53,7 +53,7 @@ func TestFloat16PrimitiveSliceDirect(t *testing.T) { require.NoError(t, err) var resSlice []float16.Float16 - err = f.Deserialize(data, &resSlice) + err = testDeserialize(t, f, data, &resSlice) require.NoError(t, err) require.Equal(t, slice, resSlice) } @@ -67,7 +67,7 @@ func TestBFloat16Primitive(t *testing.T) { require.NoError(t, err) var res bfloat16.BFloat16 - err = f.Deserialize(data, &res) + err = testDeserialize(t, f, data, &res) require.NoError(t, err) require.Equal(t, bf16.Bits(), res.Bits()) @@ -86,7 +86,7 @@ func TestBFloat16PrimitiveSliceDirect(t *testing.T) { require.NoError(t, err) var resSlice []bfloat16.BFloat16 - err = f.Deserialize(data, &resSlice) + err = testDeserialize(t, f, data, &resSlice) require.NoError(t, err) require.Equal(t, slice, resSlice) } diff --git a/go/fory/slice_primitive_test.go b/go/fory/slice_primitive_test.go index e03a4cb00c..b072078e3b 100644 --- a/go/fory/slice_primitive_test.go +++ b/go/fory/slice_primitive_test.go @@ -39,7 +39,7 @@ func TestFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []float16.Float16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -50,7 +50,7 @@ func 
TestFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []float16.Float16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -62,7 +62,7 @@ func TestFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []float16.Float16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -81,7 +81,7 @@ func TestBFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []bfloat16.BFloat16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -92,7 +92,7 @@ func TestBFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []bfloat16.BFloat16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -104,7 +104,7 @@ func TestBFloat16Slice(t *testing.T) { assert.NoError(t, err) var result []bfloat16.BFloat16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -119,7 +119,7 @@ func TestIntSlice(t *testing.T) { assert.NoError(t, err) var result []int - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -130,7 +130,7 @@ func TestIntSlice(t *testing.T) { assert.NoError(t, err) var result []int - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -142,7 +142,7 @@ func TestIntSlice(t *testing.T) { assert.NoError(t, err) var result []int - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -157,7 +157,7 @@ func TestUintSlice(t *testing.T) { 
assert.NoError(t, err) var result []uint - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -168,7 +168,7 @@ func TestUintSlice(t *testing.T) { assert.NoError(t, err) var result []uint - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -180,7 +180,7 @@ func TestUintSlice(t *testing.T) { assert.NoError(t, err) var result []uint - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -195,7 +195,7 @@ func TestInt8Slice(t *testing.T) { assert.NoError(t, err) var result []int8 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -206,7 +206,7 @@ func TestInt8Slice(t *testing.T) { assert.NoError(t, err) var result []int8 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -218,7 +218,7 @@ func TestInt8Slice(t *testing.T) { assert.NoError(t, err) var result []int8 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -233,7 +233,7 @@ func TestInt16Slice(t *testing.T) { assert.NoError(t, err) var result []int16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -244,7 +244,7 @@ func TestInt16Slice(t *testing.T) { assert.NoError(t, err) var result []int16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -256,7 +256,7 @@ func TestInt16Slice(t *testing.T) { assert.NoError(t, err) var result []int16 - err = f.Deserialize(data, &result) + err = 
testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -271,7 +271,7 @@ func TestInt32Slice(t *testing.T) { assert.NoError(t, err) var result []int32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -282,7 +282,7 @@ func TestInt32Slice(t *testing.T) { assert.NoError(t, err) var result []int32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -294,7 +294,7 @@ func TestInt32Slice(t *testing.T) { assert.NoError(t, err) var result []int32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -309,7 +309,7 @@ func TestInt64Slice(t *testing.T) { assert.NoError(t, err) var result []int64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -320,7 +320,7 @@ func TestInt64Slice(t *testing.T) { assert.NoError(t, err) var result []int64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -332,7 +332,7 @@ func TestInt64Slice(t *testing.T) { assert.NoError(t, err) var result []int64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -347,7 +347,7 @@ func TestUint16Slice(t *testing.T) { assert.NoError(t, err) var result []uint16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -358,7 +358,7 @@ func TestUint16Slice(t *testing.T) { assert.NoError(t, err) var result []uint16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) 
assert.Empty(t, result) @@ -370,7 +370,7 @@ func TestUint16Slice(t *testing.T) { assert.NoError(t, err) var result []uint16 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -385,7 +385,7 @@ func TestUint32Slice(t *testing.T) { assert.NoError(t, err) var result []uint32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -396,7 +396,7 @@ func TestUint32Slice(t *testing.T) { assert.NoError(t, err) var result []uint32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -408,7 +408,7 @@ func TestUint32Slice(t *testing.T) { assert.NoError(t, err) var result []uint32 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) @@ -423,7 +423,7 @@ func TestUint64Slice(t *testing.T) { assert.NoError(t, err) var result []uint64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Equal(t, slice, result) }) @@ -434,7 +434,7 @@ func TestUint64Slice(t *testing.T) { assert.NoError(t, err) var result []uint64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -446,7 +446,7 @@ func TestUint64Slice(t *testing.T) { assert.NoError(t, err) var result []uint64 - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) assert.NoError(t, err) assert.Nil(t, result) }) diff --git a/go/fory/struct_test.go b/go/fory/struct_test.go index d97bd1f3bf..430babdcfb 100644 --- a/go/fory/struct_test.go +++ b/go/fory/struct_test.go @@ -56,7 +56,7 @@ func TestUnsignedTypeSerialization(t *testing.T) { } var result any - err = f.Deserialize(data, &result) + err = 
testDeserialize(t, f, data, &result) if err != nil { t.Fatalf("Deserialize failed: %v", err) } @@ -104,7 +104,7 @@ func TestOptionFieldSerialization(t *testing.T) { require.NoError(t, err) var result any - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) require.NoError(t, err) out := result.(*OptionStruct) @@ -357,7 +357,7 @@ func TestSetFieldSerializationSchemaConsistent(t *testing.T) { // Deserialize var result any - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) require.NoError(t, err, "Deserialize failed") resultObj := result.(*SetFieldsStruct) @@ -404,7 +404,7 @@ func TestSetFieldSerializationCompatible(t *testing.T) { // Deserialize var result any - err = f.Deserialize(data, &result) + err = testDeserialize(t, f, data, &result) require.NoError(t, err, "Deserialize failed") resultObj := result.(*SetFieldsStruct) @@ -525,7 +525,7 @@ func TestFloat16StructField(t *testing.T) { // Create new instance res := &StructWithFloat16{} - err = f.Deserialize(data, res) + err = testDeserialize(t, f, data, res) require.NoError(t, err) // Verify From cc2077e5a1d0cb205b9d15efce3afbdbb8e01fcd Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 13:48:38 +0530 Subject: [PATCH 18/27] fixed the fast-path check bug --- go/fory/buffer.go | 11 +++++++++++ go/fory/struct.go | 5 +++++ go/fory/type_def.go | 2 +- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index 08b7b8b21b..c896aac83b 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -1690,3 +1690,14 @@ func (b *ByteBuffer) Skip(length int, err *Error) { } b.readerIndex += length } + +// CheckReadable ensures that at least n bytes are available to read. +// In stream mode, it will attempt to fill the buffer if necessary. 
+// +//go:inline +func (b *ByteBuffer) CheckReadable(n int, err *Error) bool { + if b.readerIndex+n > len(b.data) { + return b.fill(n, err) + } + return true +} diff --git a/go/fory/struct.go b/go/fory/struct.go index 2cd21991bd..52e53ad997 100644 --- a/go/fory/struct.go +++ b/go/fory/struct.go @@ -1424,6 +1424,11 @@ func (s *structSerializer) ReadData(ctx *ReadContext, value reflect.Value) { // Phase 1: Fixed-size primitives (inline unsafe reads with endian handling) if s.fieldGroup.FixedSize > 0 { + var errOut Error + if !buf.CheckReadable(int(s.fieldGroup.FixedSize), &errOut) { + ctx.SetError(errOut) + return + } baseOffset := buf.ReaderIndex() data := buf.GetData() diff --git a/go/fory/type_def.go b/go/fory/type_def.go index f2bca01a6f..7376787643 100644 --- a/go/fory/type_def.go +++ b/go/fory/type_def.go @@ -291,7 +291,7 @@ func skipTypeDef(buffer *ByteBuffer, header int64, err *Error) { if sz == META_SIZE_MASK { sz += int(buffer.ReadVarUint32(err)) } - buffer.IncreaseReaderIndex(sz) + buffer.Skip(sz, err) } const BIG_NAME_THRESHOLD = 0b111111 // 6 bits for size when using 2 bits for encoding From 4e5a6d67d8bfb48d4e844665375687e7a19a8e20 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 14:26:34 +0530 Subject: [PATCH 19/27] removed unused testUnmarshal --- go/fory/test_helper_test.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go index 5cc051c289..bad5dff67c 100644 --- a/go/fory/test_helper_test.go +++ b/go/fory/test_helper_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package fory import ( @@ -64,9 +81,3 @@ func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error { // Returns the original error from standard deserialization return err } - -// testUnmarshal is an identical helper for `Unmarshal` (which is often used in tests) -func testUnmarshal(t *testing.T, f *Fory, data []byte, v any) error { - t.Helper() - return testDeserialize(t, f, data, v) -} From a78197ffd5329bfe39b6b0a7811c5fe5e51ee6c9 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 14:57:51 +0530 Subject: [PATCH 20/27] moved StreamReader to a new file --- go/fory/stream.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 go/fory/stream.go diff --git a/go/fory/stream.go b/go/fory/stream.go new file mode 100644 index 0000000000..43094525f3 --- /dev/null +++ b/go/fory/stream.go @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fory + +import ( + "io" + "reflect" +) + +// StreamReader supports robust sequential deserialization from a stream. +// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, +// preventing data loss from prefetched buffers and preserving TypeResolver metadata +// (Meta Sharing) across object boundaries. +type StreamReader struct { + fory *Fory + reader io.Reader + buffer *ByteBuffer +} + +// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. +// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. +func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { + return f.NewStreamReaderWithMinCap(r, 0) +} + +// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. +func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { + buf := NewByteBufferFromReader(r, minCap) + return &StreamReader{ + fory: f, + reader: r, + buffer: buf, + } +} + +// Deserialize reads the next object from the stream into the provided value. +// It uses a shared ReadContext for the lifetime of the StreamReader, clearing +// temporary state between calls but preserving the buffer and TypeResolver state. +func (sr *StreamReader) Deserialize(v any) error { + f := sr.fory + + // We only reset the temporary read state (like refTracker and outOfBand buffers), + // NOT the buffer or the type mapping, which must persist. 
+ defer func() { + f.readCtx.refReader.Reset() + f.readCtx.outOfBandBuffers = nil + f.readCtx.outOfBandIndex = 0 + f.readCtx.err = Error{} + if f.readCtx.refResolver != nil { + f.readCtx.refResolver.resetRead() + } + }() + + // Temporarily swap buffer + origBuffer := f.readCtx.buffer + f.readCtx.buffer = sr.buffer + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + if isNull { + f.readCtx.buffer = origBuffer + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + // Restore original buffer + f.readCtx.buffer = origBuffer + + return nil +} + +// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. +// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data +// and type metadata after the call. 
+func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { + defer f.resetReadState() + if f.readCtx.buffer.reader != r { + f.readCtx.buffer.ResetWithReader(r, 0) + } + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + if isNull { + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + return nil +} From 91d04f46aac0492e45222603369f41a8b154e176 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 15:12:03 +0530 Subject: [PATCH 21/27] refined apis --- go/fory/fory.go | 100 ---------------------------------------------- go/fory/stream.go | 13 +++--- 2 files changed, 5 insertions(+), 108 deletions(-) diff --git a/go/fory/fory.go b/go/fory/fory.go index c04e822d5a..09a0e3c6d2 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -20,7 +20,6 @@ package fory import ( "errors" "fmt" - "io" "reflect" "strconv" "strings" @@ -496,105 +495,6 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } -// StreamReader supports robust sequential deserialization from a stream. -// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, -// preventing data loss from prefetched buffers and preserving TypeResolver metadata -// (Meta Sharing) across object boundaries. -type StreamReader struct { - fory *Fory - reader io.Reader - buffer *ByteBuffer -} - -// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. -// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. -func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { - return f.NewStreamReaderWithMinCap(r, 0) -} - -// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. 
-func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { - buf := NewByteBufferFromReader(r, minCap) - return &StreamReader{ - fory: f, - reader: r, - buffer: buf, - } -} - -// Deserialize reads the next object from the stream into the provided value. -// It uses a shared ReadContext for the lifetime of the StreamReader, clearing -// temporary state between calls but preserving the buffer and TypeResolver state. -func (sr *StreamReader) Deserialize(v any) error { - f := sr.fory - - // We only reset the temporary read state (like refTracker and outOfBand buffers), - // NOT the buffer or the type mapping, which must persist. - defer func() { - f.readCtx.refReader.Reset() - f.readCtx.outOfBandBuffers = nil - f.readCtx.outOfBandIndex = 0 - f.readCtx.err = Error{} - if f.readCtx.refResolver != nil { - f.readCtx.refResolver.resetRead() - } - }() - - // Temporarily swap buffer - origBuffer := f.readCtx.buffer - f.readCtx.buffer = sr.buffer - - isNull := readHeader(f.readCtx) - if f.readCtx.HasError() { - f.readCtx.buffer = origBuffer - return f.readCtx.TakeError() - } - - if isNull { - f.readCtx.buffer = origBuffer - return nil - } - - target := reflect.ValueOf(v).Elem() - f.readCtx.ReadValue(target, RefModeTracking, true) - if f.readCtx.HasError() { - f.readCtx.buffer = origBuffer - return f.readCtx.TakeError() - } - - // Restore original buffer - f.readCtx.buffer = origBuffer - - return nil -} - -// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. -// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data -// and type metadata after the call. 
-func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { - defer f.resetReadState() - if f.readCtx.buffer.reader != r { - f.readCtx.buffer.ResetWithReader(r, 0) - } - - isNull := readHeader(f.readCtx) - if f.readCtx.HasError() { - return f.readCtx.TakeError() - } - - if isNull { - return nil - } - - target := reflect.ValueOf(v).Elem() - f.readCtx.ReadValue(target, RefModeTracking, true) - if f.readCtx.HasError() { - return f.readCtx.TakeError() - } - - return nil -} - // resetReadState resets read context state without allocation func (f *Fory) resetReadState() { f.readCtx.Reset() diff --git a/go/fory/stream.go b/go/fory/stream.go index 43094525f3..8318eb2ccb 100644 --- a/go/fory/stream.go +++ b/go/fory/stream.go @@ -27,32 +27,29 @@ import ( // preventing data loss from prefetched buffers and preserving TypeResolver metadata // (Meta Sharing) across object boundaries. type StreamReader struct { - fory *Fory reader io.Reader buffer *ByteBuffer } // NewStreamReader creates a new StreamReader that reads from the provided io.Reader. // The StreamReader owns the buffer and maintains state across sequential Deserialize calls. -func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { - return f.NewStreamReaderWithMinCap(r, 0) +func NewStreamReader(r io.Reader) *StreamReader { + return NewStreamReaderWithMinCap(r, 0) } // NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. -func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { +func NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { buf := NewByteBufferFromReader(r, minCap) return &StreamReader{ - fory: f, reader: r, buffer: buf, } } -// Deserialize reads the next object from the stream into the provided value. +// DeserializeFromStream reads the next object from the stream into the provided value. 
// It uses a shared ReadContext for the lifetime of the StreamReader, clearing // temporary state between calls but preserving the buffer and TypeResolver state. -func (sr *StreamReader) Deserialize(v any) error { - f := sr.fory +func (f *Fory) DeserializeFromStream(sr *StreamReader, v any) error { // We only reset the temporary read state (like refTracker and outOfBand buffers), // NOT the buffer or the type mapping, which must persist. From 8935868469e6081b8a28a27a6d82c615ce9da97d Mon Sep 17 00:00:00 2001 From: ayush00git Date: Tue, 3 Mar 2026 15:14:47 +0530 Subject: [PATCH 22/27] updated test suites --- go/fory/stream_test.go | 8 ++++---- go/fory/test_helper_test.go | 12 ++++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index e48fa37868..701909d381 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -160,12 +160,12 @@ func TestStreamReaderSequential(t *testing.T) { fDec.RegisterStruct(&StreamTestStruct{}, 100) // Create a StreamReader - sr := fDec.NewStreamReader(&buf) + sr := NewStreamReader(&buf) // Deserialize sequentially var out1, out2, out3 StreamTestStruct - err := sr.Deserialize(&out1) + err := fDec.DeserializeFromStream(sr, &out1) if err != nil { t.Fatalf("Deserialize 1 failed: %v", err) } @@ -173,7 +173,7 @@ func TestStreamReaderSequential(t *testing.T) { t.Errorf("Msg 1 mismatch. Got: %+v, Want: %+v", out1, msg1) } - err = sr.Deserialize(&out2) + err = fDec.DeserializeFromStream(sr, &out2) if err != nil { t.Fatalf("Deserialize 2 failed: %v", err) } @@ -181,7 +181,7 @@ func TestStreamReaderSequential(t *testing.T) { t.Errorf("Msg 2 mismatch. 
Got: %+v, Want: %+v", out2, msg2) } - err = sr.Deserialize(&out3) + err = fDec.DeserializeFromStream(sr, &out3) if err != nil { t.Fatalf("Deserialize 3 failed: %v", err) } diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go index bad5dff67c..a455330ff5 100644 --- a/go/fory/test_helper_test.go +++ b/go/fory/test_helper_test.go @@ -61,17 +61,13 @@ func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error { t.Fatalf("testDeserialize requires a pointer to a value, got %v", vType) } - // Create new pointer to a new zero value of the element type - v2 := reflect.New(vType.Elem()).Interface() - stream := &oneByteReader{data: data, pos: 0} // Create a new stream reader. The stream context handles boundaries and compactions. - streamReader := f.NewStreamReader(stream) - errStream := streamReader.Deserialize(v2) - - if errStream != nil { - t.Fatalf("Stream deserialization via OneByteStream failed: %v", errStream) + streamReader := NewStreamReader(stream) + err = f.DeserializeFromStream(streamReader, v) + if err != nil { + t.Fatalf("Stream deserialization via OneByteStream failed: %v", err) } // Note: We don't assert deep equality because many tests deserialize into interfaces From d5040990b1e88538131f470852b9a44f6f098dc1 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 4 Mar 2026 16:10:34 +0530 Subject: [PATCH 23/27] renamed StreamReader api to InputStream and implemented a buffer shrink api --- go/fory/stream.go | 73 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/go/fory/stream.go b/go/fory/stream.go index 8318eb2ccb..aca232975d 100644 --- a/go/fory/stream.go +++ b/go/fory/stream.go @@ -22,34 +22,81 @@ import ( "reflect" ) -// StreamReader supports robust sequential deserialization from a stream. +// InputStream supports robust sequential deserialization from a stream. 
// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, // preventing data loss from prefetched buffers and preserving TypeResolver metadata // (Meta Sharing) across object boundaries. -type StreamReader struct { +type InputStream struct { reader io.Reader buffer *ByteBuffer } -// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. -// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. -func NewStreamReader(r io.Reader) *StreamReader { - return NewStreamReaderWithMinCap(r, 0) +// NewInputStream creates a new InputStream that reads from the provided io.Reader. +// The InputStream owns the buffer and maintains state across sequential Deserialize calls. +func NewInputStream(r io.Reader) *InputStream { + return NewInputStreamWithMinCap(r, 0) } -// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. -func NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { +// NewInputStreamWithMinCap creates a new InputStream with a specified minimum buffer capacity. +func NewInputStreamWithMinCap(r io.Reader, minCap int) *InputStream { buf := NewByteBufferFromReader(r, minCap) - return &StreamReader{ + return &InputStream{ reader: r, buffer: buf, } } +// Shrink compacts the internal buffer, dropping already-read bytes to reclaim memory. +// It applies a heuristic to avoid tiny frequent compactions and reallocates the backing +// slice if the capacity becomes excessively large compared to the remaining data. 
+func (is *InputStream) Shrink() { + b := is.buffer + if b == nil { + return + } + + readPos := b.readerIndex + // Best-effort policy: keep a 4096-byte floor to avoid tiny frequent compactions + if readPos <= 4096 || readPos < b.minCap { + return + } + + remaining := b.writerIndex - readPos + currentCapacity := cap(b.data) + targetCapacity := currentCapacity + + if currentCapacity > b.minCap { + if remaining == 0 { + targetCapacity = b.minCap + } else if remaining <= currentCapacity/4 { + doubled := remaining * 2 + targetCapacity = doubled + if targetCapacity < b.minCap { + targetCapacity = b.minCap + } + } + } + + if targetCapacity < currentCapacity { + // Actually reclaim memory by copying to a new, smaller slice + newData := make([]byte, remaining, targetCapacity) + copy(newData, b.data[readPos:b.writerIndex]) + b.data = newData + b.writerIndex = remaining + b.readerIndex = 0 + } else if readPos > 0 { + // Just compact without reallocating + copy(b.data, b.data[readPos:b.writerIndex]) + b.writerIndex = remaining + b.readerIndex = 0 + b.data = b.data[:remaining] + } +} + // DeserializeFromStream reads the next object from the stream into the provided value. -// It uses a shared ReadContext for the lifetime of the StreamReader, clearing +// It uses a shared ReadContext for the lifetime of the InputStream, clearing // temporary state between calls but preserving the buffer and TypeResolver state. -func (f *Fory) DeserializeFromStream(sr *StreamReader, v any) error { +func (f *Fory) DeserializeFromStream(is *InputStream, v any) error { // We only reset the temporary read state (like refTracker and outOfBand buffers), // NOT the buffer or the type mapping, which must persist. 
@@ -65,7 +112,7 @@ func (f *Fory) DeserializeFromStream(sr *StreamReader, v any) error { // Temporarily swap buffer origBuffer := f.readCtx.buffer - f.readCtx.buffer = sr.buffer + f.readCtx.buffer = is.buffer isNull := readHeader(f.readCtx) if f.readCtx.HasError() { @@ -91,7 +138,7 @@ func (f *Fory) DeserializeFromStream(sr *StreamReader, v any) error { return nil } -// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. +// For Sequential Streaming use NewInputStream instead of DeserializeFromReader. // DeserializeFromReader deserializes a single object from a stream but will discard prefetched data // and type metadata after the call. func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { From 53e4f3d2396ccd5dbf1cf81859dd0aa617fde43a Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 4 Mar 2026 16:12:04 +0530 Subject: [PATCH 24/27] replaced the occurrence with new api methods --- go/fory/stream_test.go | 6 +++--- go/fory/test_helper_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index 701909d381..037e811f66 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -135,7 +135,7 @@ func TestStreamDeserializationEOF(t *testing.T) { } } -func TestStreamReaderSequential(t *testing.T) { +func TestInputStreamSequential(t *testing.T) { f := New() // Register type in compatible mode to test Meta Sharing across sequential reads f.config.Compatible = true @@ -159,8 +159,8 @@ fDec.config.Compatible = true fDec.RegisterStruct(&StreamTestStruct{}, 100) - // Create a StreamReader - sr := NewStreamReader(&buf) + // Create an InputStream + sr := NewInputStream(&buf) diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go index a455330ff5..ddbcf4fd9c 100644 --- a/go/fory/test_helper_test.go +++ b/go/fory/test_helper_test.go @@
-64,7 +64,7 @@ func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error { stream := &oneByteReader{data: data, pos: 0} // Create a new stream reader. The stream context handles boundaries and compactions. - streamReader := NewStreamReader(stream) + streamReader := NewInputStream(stream) err = f.DeserializeFromStream(streamReader, v) if err != nil { t.Fatalf("Stream deserialization via OneByteStream failed: %v", err) From 0300946984a1b639f64a1f4592ec54c79c164b10 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 4 Mar 2026 16:18:24 +0530 Subject: [PATCH 25/27] added unit test for shrink buffer api --- go/fory/stream_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index 037e811f66..d92724a376 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -189,3 +189,50 @@ func TestInputStreamSequential(t *testing.T) { t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3) } } + +func TestInputStreamShrink(t *testing.T) { + // Create a large payload that easily escapes the minCap (4096) + data := make([]byte, 10000) + for i := range data { + data[i] = byte(i % 256) + } + + // Create a stream reader with a tiny minCap so we can trigger Shrink reliably + buf := bytes.NewReader(data) + sr := NewInputStreamWithMinCap(buf, 100) + + // Force a read/fill to pull a chunk into memory + err := sr.buffer.fill(5000, nil) + if !err { + t.Fatalf("Failed to fill buffer") + } + + // Fake an artificial read that consumed a massive portion of the buffer + originalCapacity := cap(sr.buffer.data) + sr.buffer.readerIndex = 4500 + + // Trigger Shrink + sr.Shrink() + + // 1. Validate reader index was successfully reset + if sr.buffer.readerIndex != 0 { + t.Errorf("Expected readerIndex to reset to 0, got %d", sr.buffer.readerIndex) + } + + // 2. 
Validate the capacity actually shrank (reclaimed memory) + newCapacity := cap(sr.buffer.data) + if newCapacity >= originalCapacity { + t.Errorf("Expected capacity to shrink (was %d, now %d)", originalCapacity, newCapacity) + } + + // 3. Validate the remaining unread data remained intact + if sr.buffer.writerIndex != 500 { + t.Errorf("Expected writerIndex to be 500 remaining bytes, got %d", sr.buffer.writerIndex) + } + for i := 0; i < 500; i++ { + if sr.buffer.data[i] != byte((4500+i)%256) { + t.Errorf("Data corruption post-shrink at index %d", i) + break + } + } +} From 9f929686fe47702bb77d847636c4a712cebe2013 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 4 Mar 2026 16:24:48 +0530 Subject: [PATCH 26/27] fixed indentation --- go/fory/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/fory/stream.go b/go/fory/stream.go index aca232975d..95e98754db 100644 --- a/go/fory/stream.go +++ b/go/fory/stream.go @@ -47,7 +47,7 @@ func NewInputStreamWithMinCap(r io.Reader, minCap int) *InputStream { } // Shrink compacts the internal buffer, dropping already-read bytes to reclaim memory. -// It applies a heuristic to avoid tiny frequent compactions and reallocates the backing +// It applies a heuristic to avoid tiny frequent compactions and reallocates the backing // slice if the capacity becomes excessively large compared to the remaining data. 
func (is *InputStream) Shrink() { b := is.buffer From 722e25208e4f8cd730bd2b3a67bfb61ca9351bfc Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 5 Mar 2026 21:41:41 +0530 Subject: [PATCH 27/27] correct err checking and removed reader from InputStream --- go/fory/buffer.go | 12 ++++++------ go/fory/stream.go | 14 ++++++-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index c896aac83b..714b0ba3c8 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -344,7 +344,7 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 { //go:inline func (b *ByteBuffer) ReadUint64(err *Error) uint64 { if b.readerIndex+8 > len(b.data) { - if !b.fill(8, nil) { + if !b.fill(8, err) { return 0 } } @@ -1002,7 +1002,7 @@ func (b *ByteBuffer) WriteVaruint36Small(value uint64) { // //go:inline func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 { - if b.remaining() >= 8 || (b.reader != nil && b.fill(8, nil)) { + if b.remaining() >= 8 { return b.readVaruint36SmallFast() } return b.readVaruint36SmallSlow(err) @@ -1118,7 +1118,7 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { return int64(i >> 1) // arithmetic right shift } if b.readerIndex+9 > len(b.data) { - if !b.fill(9, nil) { + if !b.fill(9, err) { return 0 } } @@ -1171,7 +1171,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { return uint64(i >> 1) } if b.readerIndex+9 > len(b.data) { - if !b.fill(9, nil) { + if !b.fill(9, err) { return 0 } } @@ -1189,7 +1189,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint64(err *Error) uint64 { - if b.remaining() >= 9 || (b.reader != nil && b.fill(9, nil)) { + if b.remaining() >= 9 { return b.readVarUint64Fast() } return b.readVarUint64Slow(err) @@ -1363,7 +1363,7 @@ func (b *ByteBuffer) UnsafeReadVarUint64() uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint32(err *Error) uint32 { - if b.remaining() >= 8 || (b.reader != nil && b.fill(8, 
nil)) { // Need 8 bytes for bulk uint64 read in fast path + if b.remaining() >= 8 { // Need 8 bytes for bulk uint64 read in fast path return b.readVarUint32Fast() } return b.readVarUint32Slow(err) diff --git a/go/fory/stream.go b/go/fory/stream.go index 95e98754db..558fffad78 100644 --- a/go/fory/stream.go +++ b/go/fory/stream.go @@ -27,7 +27,6 @@ import ( // preventing data loss from prefetched buffers and preserving TypeResolver metadata // (Meta Sharing) across object boundaries. type InputStream struct { - reader io.Reader buffer *ByteBuffer } @@ -41,7 +40,6 @@ func NewInputStream(r io.Reader) *InputStream { func NewInputStreamWithMinCap(r io.Reader, minCap int) *InputStream { buf := NewByteBufferFromReader(r, minCap) return &InputStream{ - reader: r, buffer: buf, } } @@ -138,14 +136,14 @@ func (f *Fory) DeserializeFromStream(is *InputStream, v any) error { return nil } -// For Sequential Streaming use NewInputStream instead of DeserializeFromReader. -// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data -// and type metadata after the call. +// DeserializeFromReader deserializes a single object from a stream. +// It is strictly stateless: the buffer and all read state are always reset before +// each call, discarding any prefetched data and type metadata. +// For sequential multi-object reads on the same stream, use NewInputStream instead. func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { defer f.resetReadState() - if f.readCtx.buffer.reader != r { - f.readCtx.buffer.ResetWithReader(r, 0) - } + // Always reset to enforce stateless semantics. + f.readCtx.buffer.ResetWithReader(r, 0) isNull := readHeader(f.readCtx) if f.readCtx.HasError() {