diff --git a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java index 2fa15eb959c..9130c572f03 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java @@ -190,8 +190,7 @@ public BigDecimal fromBytes(final ByteBuffer value, final Schema schema, final L try { BigInteger bg = null; - ByteBuffer buffer = decoder.readBytes(null); - byte[] array = buffer.array(); + byte[] array = decoder.readBytes(); if (array.length > 0) { bg = new BigInteger(array); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader12.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader12.java index 0c0a670ea40..489ea6da865 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader12.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader12.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.io.Closeable; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -76,9 +75,7 @@ public DataFileReader12(SeekableInput sin, DatumReader reader) throws IOExcep do { for (long i = 0; i < l; i++) { String key = vin.readString(null).toString(); - ByteBuffer value = vin.readBytes(null); - byte[] bb = new byte[value.remaining()]; - value.get(bb); + byte[] bb = vin.readBytes(); meta.put(key, bb); } } while ((l = vin.mapNext()) != 0); diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index f93ad8e5fdd..6b4e719d17c 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -129,9 +129,7 @@ void initialize(InputStream in, byte[] magic) throws IOException { do { for (long i = 0; i < l; i++) { 
String key = vin.readString(null).toString(); - ByteBuffer value = vin.readBytes(null); - byte[] bb = new byte[value.remaining()]; - value.get(bb); + byte[] bb = vin.readBytes(); header.meta.put(key, bb); header.metaKeyList.add(key); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java index 77fc8490764..2b63068396b 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java @@ -334,6 +334,18 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { return result; } + @Override + public byte[] readBytes() throws IOException { + int length = SystemLimitException.checkMaxBytesLength(readLong()); + ensureAvailableBytes(length); + if (length == 0) { + return EMPTY_BYTES; + } + final byte[] result = new byte[length]; + doReadBytes(result, 0, result.length); + return result; + } + @Override public void skipBytes() throws IOException { doSkipBytes(readLong()); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java index 80640a61aa0..8316124d5a1 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.avro.AvroTypeException; import org.apache.avro.util.Utf8; /** @@ -41,6 +42,8 @@ public abstract class Decoder { + protected static final byte[] EMPTY_BYTES = new byte[0]; + /** * "Reads" a null value. (Doesn't actually read anything, but advances the state * of the parser if the implementation is stateful.) @@ -126,6 +129,22 @@ public abstract class Decoder { */ public abstract ByteBuffer readBytes(ByteBuffer old) throws IOException; + /** + * Reads a byte-string written by {@link Encoder#writeBytes}. + *

+ * This is useful when you want to avoid the creation of a ByteBuffer, and only + * want the byte[], e.g.: + * + *

+   * // instead of: decoder.readBytes(null).array()
+   * byte[] array = decoder.readBytes();
+   * 
+ * + * @throws AvroTypeException If this is a stateful reader and byte-string is not + * the type of the next value to be read + */ + public abstract byte[] readBytes() throws IOException; + /** * Discards a byte-string written by {@link Encoder#writeBytes}. * diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java index ac251550da2..eafec328c82 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java @@ -51,6 +51,15 @@ public ByteBuffer read(ByteBuffer old, int length) throws IOException { result.limit(length); return result; } + + public byte[] read(int length) throws IOException { + if (length == 0) { + return EMPTY_BYTES; + } + final byte[] result = new byte[length]; + doReadBytes(result, 0, length); + return result; + } } private class ReuseByteReader extends ByteReader { @@ -159,6 +168,12 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { return byteReader.read(old, SystemLimitException.checkMaxBytesLength(length)); } + @Override + public byte[] readBytes() throws IOException { + long length = readLong(); + return byteReader.read(SystemLimitException.checkMaxBytesLength(length)); + } + @Override protected void doSkipBytes(long length) throws IOException { while (length > 0) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java index 512c9ebf34f..fcb11a33a0d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java @@ -394,18 +394,14 @@ private FieldReader createSimpleStringReader(Schema readerSchema) { private FieldReader createBytesPromotingToStringReader(Schema readerSchema) { String stringProperty = 
readerSchema.getProp(GenericData.STRING_PROP); if (GenericData.StringType.String.name().equals(stringProperty)) { - return (old, decoder) -> getStringFromByteBuffer(decoder.readBytes(null)); + return (old, decoder) -> new String(decoder.readBytes(), StandardCharsets.UTF_8); } else { - return (old, decoder) -> getUtf8FromByteBuffer(old, decoder.readBytes(null)); + return (old, decoder) -> getUtf8FromByteArray(old, decoder.readBytes()); } } - private String getStringFromByteBuffer(ByteBuffer buffer) { - return new String(buffer.array(), buffer.position(), buffer.remaining(), StandardCharsets.UTF_8); - } - - private Utf8 getUtf8FromByteBuffer(Object old, ByteBuffer buffer) { - return (old instanceof Utf8) ? ((Utf8) old).set(new Utf8(buffer.array())) : new Utf8(buffer.array()); + private Utf8 getUtf8FromByteArray(Object old, byte[] bytes) { + return (old instanceof Utf8) ? ((Utf8) old).set(bytes) : new Utf8(bytes); } private FieldReader createUnionReader(WriterUnion action) throws IOException { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java index 1876f87aaac..84ac344f30e 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java @@ -311,6 +311,18 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { } } + @Override + public byte[] readBytes() throws IOException { + advance(Symbol.BYTES); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + return result; + } else { + throw error("bytes"); + } + } + private byte[] readByteArray() throws IOException { return in.getText().getBytes(StandardCharsets.ISO_8859_1); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java index 6bdb16a332c..43a943c19b4 100644 --- 
a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java @@ -202,7 +202,7 @@ public double readDouble() throws IOException { public Utf8 readString(Utf8 old) throws IOException { Symbol actual = parser.advance(Symbol.STRING); if (actual == Symbol.BYTES) { - return new Utf8(in.readBytes(null).array()); + return old == null ? new Utf8(in.readBytes()) : old.set(in.readBytes()); } else { assert actual == Symbol.STRING; return in.readString(old); @@ -213,7 +213,7 @@ public Utf8 readString(Utf8 old) throws IOException { public String readString() throws IOException { Symbol actual = parser.advance(Symbol.STRING); if (actual == Symbol.BYTES) { - return new String(in.readBytes(null).array(), StandardCharsets.UTF_8); + return new String(in.readBytes(), StandardCharsets.UTF_8); } else { assert actual == Symbol.STRING; return in.readString(); @@ -243,6 +243,18 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { } } + @Override + public byte[] readBytes() throws IOException { + Symbol actual = parser.advance(Symbol.BYTES); + if (actual == Symbol.STRING) { + Utf8 s = in.readString(null); + return s.getBytes(); // readString(null) allocated an exactly fitting byte[] + } else { + assert actual == Symbol.BYTES; + return in.readBytes(); + } + } + @Override public void skipBytes() throws IOException { Symbol actual = parser.advance(Symbol.BYTES); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java index 26f79a16ff2..18e72f7e09a 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java @@ -36,7 +36,7 @@ * and configure. *

* ValidatingDecoder is not thread-safe. - * + * * @see Decoder * @see DecoderFactory */ @@ -124,6 +124,12 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { return in.readBytes(old); } + @Override + public byte[] readBytes() throws IOException { + parser.advance(Symbol.BYTES); + return in.readBytes(); + } + @Override public void skipBytes() throws IOException { parser.advance(Symbol.BYTES); diff --git a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java index 7ba8e4827c6..898f3f4a131 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.lang.reflect.Array; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.HashMap; @@ -132,11 +131,11 @@ protected void addToArray(Object array, long pos, Object e) { throw new AvroRuntimeException("reflectDatumReader does not use addToArray"); } - @Override /** * Called to read an array instance. May be overridden for alternate array * representations. 
*/ + @Override protected Object readArray(Object old, Schema expected, ResolvingDecoder in) throws IOException { Schema expectedType = expected.getElementType(); long l = in.readArrayStart(); @@ -244,14 +243,11 @@ protected Object createString(String value) { @Override protected Object readBytes(Object old, Schema s, Decoder in) throws IOException { - ByteBuffer bytes = in.readBytes(null); Class c = ReflectData.getClassProp(s, SpecificData.CLASS_PROP); if (c != null && c.isArray()) { - byte[] result = new byte[bytes.remaining()]; - bytes.get(result); - return result; + return in.readBytes(); } else { - return bytes; + return in.readBytes(null); } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java index a5c4ece29d7..8400fa26c97 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java +++ b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java @@ -132,6 +132,14 @@ public Utf8 set(String string) { return this; } + public Utf8 set(byte[] bytes) { + this.bytes = bytes; + this.length = SystemLimitException.checkMaxStringLength(bytes.length); + this.string = null; + this.hash = 0; + return this; + } + public Utf8 set(Utf8 other) { if (this.bytes.length < other.length) { this.bytes = new byte[other.length]; diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java index 33e6f098926..edb5b6749ef 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java @@ -148,6 +148,12 @@ void eofBytes(boolean useDirect) { Assertions.assertThrows(EOFException.class, () -> newDecoderWithNoData(useDirect).readBytes(null)); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void eofBytesRaw(boolean useDirect) { + Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readBytes()); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) void eofString(boolean useDirect) { @@ -471,6 +477,14 @@ public void testBytesNegativeLength(boolean useDirect) throws IOException { Assertions.assertEquals(ERROR_NEGATIVE, ex.getMessage()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBytesNegativeLengthRaw(boolean useDirect) throws IOException { + Exception ex = Assertions.assertThrows(AvroRuntimeException.class, + () -> this.newDecoder(useDirect, -1).readBytes()); + Assertions.assertEquals(ERROR_NEGATIVE, ex.getMessage()); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testBytesVmMaxSize(boolean useDirect) throws IOException { @@ -479,6 +493,14 @@ public void testBytesVmMaxSize(boolean useDirect) throws IOException { Assertions.assertEquals(ERROR_VM_LIMIT_BYTES, ex.getMessage()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBytesVmMaxSizeRaw(boolean useDirect) throws IOException { + Exception ex = Assertions.assertThrows(UnsupportedOperationException.class, + () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 1).readBytes()); + Assertions.assertEquals(ERROR_VM_LIMIT_BYTES, ex.getMessage()); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testBytesMaxCustom(boolean useDirect) throws IOException { diff --git a/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java b/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java index 3e36d9a0214..f77042609cd 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java +++ b/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java @@ -27,6 +27,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import org.apache.avro.SystemLimitException; import org.apache.avro.TestSystemLimitException; @@ -97,6 
+98,20 @@ void hashCodeReused() { assertEquals(127791473, u.hashCode()); u.setByteLength(4); assertEquals(4122302, u.hashCode()); + + u.set(getTrimmedBytes(new Utf8("zz"))); + assertEquals(4865, u.hashCode()); + u.setByteLength(1); + assertEquals(153, u.hashCode()); + + u.set(getTrimmedBytes(new Utf8("hello"))); + assertEquals(127791473, u.hashCode()); + u.setByteLength(4); + assertEquals(4122302, u.hashCode()); + } + + private byte[] getTrimmedBytes(Utf8 utf8) { + return Arrays.copyOf(utf8.getBytes(), utf8.getByteLength()); } /**