From 3b2d1014810f38ec4f0080e3eab0dfd75440e110 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 9 Apr 2026 17:26:34 +0200
Subject: [PATCH] GH-49697: [C++][CI] Check IPC file body bounds are in sync
 with decoder outcome

---
 cpp/src/arrow/ipc/message.cc |  10 +-
 cpp/src/arrow/ipc/reader.cc  | 198 +++++++++++++++++------------------
 testing                      |   2 +-
 3 files changed, 106 insertions(+), 104 deletions(-)

diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index c21eb913c389..84ee62fe9e8d 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -423,10 +423,12 @@ static Result<std::unique_ptr<Message>> ReadMessageInternal(
         body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
   }
 
-  if (body->size() < decoder.next_required_size()) {
-    return Status::IOError("Expected to be able to read ",
-                           decoder.next_required_size(),
-                           " bytes for message body, got ", body->size());
+  if (body->size() != decoder.next_required_size()) {
+    // The streaming decoder got out of sync with the actual advertised
+    // metadata and body size, which signals an invalid IPC file.
+    return Status::IOError("Invalid IPC file: advertised body size is ", body->size(),
+                           ", but message decoder expects to read ",
+                           decoder.next_required_size(), " bytes instead");
   }
   RETURN_NOT_OK(decoder.Consume(body));
   return result;
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index b79fbf6dd712..7afe7dc55fb7 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -122,14 +122,14 @@ Status InvalidMessageType(MessageType expected, MessageType actual) {
 
 /// \brief Structure to keep common arguments to be passed
 struct IpcReadContext {
-  IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, bool swap,
+  IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, bool swap_endian,
                  MetadataVersion version = MetadataVersion::V5,
                  Compression::type kind = Compression::UNCOMPRESSED)
       : dictionary_memo(memo),
         options(option),
         metadata_version(version),
         compression(kind),
-        swap_endian(swap) {}
+        swap_endian(swap_endian) {}
 
   DictionaryMemo* dictionary_memo;
 
@@ -589,6 +589,7 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op
       }
       AppendFrom(field->child_data);
     }
+    // Dictionary buffers are decompressed separately (see ReadDictionary).
   }
 
   BufferPtrVector Get(const ArrayDataVector& fields) && {
@@ -613,16 +614,90 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op
   });
 }
 
+// Helper class to run post-ArrayLoader steps:
+// buffer decompression, dictionary resolution, buffer re-alignment.
+struct RecordBatchLoader {
+  Result<std::shared_ptr<RecordBatch>> CreateRecordBatch(ArrayDataVector columns) {
+    ARROW_ASSIGN_OR_RAISE(auto filtered_columns, CreateColumns(std::move(columns)));
+
+    std::shared_ptr<Schema> filtered_schema;
+    if (!inclusion_mask_.empty()) {
+      FieldVector filtered_fields;
+      for (int i = 0; i < schema_->num_fields(); ++i) {
+        if (inclusion_mask_[i]) {
+          filtered_fields.push_back(schema_->field(i));
+        }
+      }
+      filtered_schema = schema(std::move(filtered_fields), schema_->metadata());
+    } else {
+      filtered_schema = schema_;
+    }
+
+    return RecordBatch::Make(std::move(filtered_schema), batch_length_,
+                             std::move(filtered_columns));
+  }
+
+  Result<ArrayDataVector> CreateColumns(ArrayDataVector columns,
+                                        bool resolve_dictionaries = true) {
+    if (resolve_dictionaries) {
+      // Dictionary resolution needs to happen on the unfiltered columns,
+      // because fields are mapped structurally (by path in the original schema).
+      RETURN_NOT_OK(ResolveDictionaries(columns, *context_.dictionary_memo,
+                                        context_.options.memory_pool));
+    }
+
+    ArrayDataVector filtered_columns;
+    if (!inclusion_mask_.empty()) {
+      for (int i = 0; i < schema_->num_fields(); ++i) {
+        if (inclusion_mask_[i]) {
+          filtered_columns.push_back(std::move(columns[i]));
+        }
+      }
+      columns.clear();
+    } else {
+      filtered_columns = std::move(columns);
+    }
+
+    if (context_.compression != Compression::UNCOMPRESSED) {
+      RETURN_NOT_OK(
+          DecompressBuffers(context_.compression, context_.options, &filtered_columns));
+    }
+
+    // Swap endian if necessary
+    if (context_.swap_endian) {
+      for (auto& column : filtered_columns) {
+        ARROW_ASSIGN_OR_RAISE(
+            column, arrow::internal::SwapEndianArrayData(std::move(column),
+                                                         context_.options.memory_pool));
+      }
+    }
+    if (context_.options.ensure_alignment != Alignment::kAnyAlignment) {
+      for (auto& column : filtered_columns) {
+        ARROW_ASSIGN_OR_RAISE(
+            column,
+            util::EnsureAlignment(
+                std::move(column),
+                // The numerical value of the enum is taken literally as byte alignment
+                static_cast<int64_t>(context_.options.ensure_alignment),
+                context_.options.memory_pool));
+      }
+    }
+    return filtered_columns;
+  }
+
+  IpcReadContext context_;
+  std::shared_ptr<Schema> schema_;
+  int64_t batch_length_;
+  std::vector<bool> inclusion_mask_;
+};
+
 Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
     const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
     io::RandomAccessFile* file) {
   ArrayLoader loader(metadata, context.metadata_version, context.options, file);
 
   ArrayDataVector columns(schema->num_fields());
-  ArrayDataVector filtered_columns;
-  FieldVector filtered_fields;
-  std::shared_ptr<Schema> filtered_schema;
 
   for (int i = 0; i < schema->num_fields(); ++i) {
     const Field& field = *schema->field(i);
@@ -634,10 +709,6 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
       return Status::IOError("Array length did not match record batch length");
     }
     columns[i] = std::move(column);
-    if (inclusion_mask) {
-      filtered_columns.push_back(columns[i]);
-      filtered_fields.push_back(schema->field(i));
-    }
   } else {
     // Skip field. This logic must be executed to advance the state of the
     // loader to the next field
@@ -645,41 +716,9 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     }
   }
 
-  // Dictionary resolution needs to happen on the unfiltered columns,
-  // because fields are mapped structurally (by path in the original schema).
-  RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo,
-                                    context.options.memory_pool));
-
-  if (inclusion_mask) {
-    filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata());
-    columns.clear();
-  } else {
-    filtered_schema = schema;
-    filtered_columns = std::move(columns);
-  }
-  if (context.compression != Compression::UNCOMPRESSED) {
-    RETURN_NOT_OK(
-        DecompressBuffers(context.compression, context.options, &filtered_columns));
-  }
-
-  // swap endian in a set of ArrayData if necessary (swap_endian == true)
-  if (context.swap_endian) {
-    for (auto& filtered_column : filtered_columns) {
-      ARROW_ASSIGN_OR_RAISE(filtered_column,
-                            arrow::internal::SwapEndianArrayData(filtered_column));
-    }
-  }
-  auto batch = RecordBatch::Make(std::move(filtered_schema), metadata->length(),
-                                 std::move(filtered_columns));
-
-  if (ARROW_PREDICT_FALSE(context.options.ensure_alignment != Alignment::kAnyAlignment)) {
-    return util::EnsureAlignment(batch,
-                                 // the numerical value of ensure_alignment enum is taken
-                                 // literally as byte alignment
-                                 static_cast<int64_t>(context.options.ensure_alignment),
-                                 context.options.memory_pool);
-  }
-  return batch;
+  RecordBatchLoader batch_loader{context, schema, metadata->length(),
+                                 inclusion_mask ? *inclusion_mask : std::vector<bool>{}};
+  return batch_loader.CreateRecordBatch(std::move(columns));
 }
 
 Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
@@ -845,7 +884,7 @@ Status UnpackSchemaMessage(const Message& message, const IpcReadOptions& options
                                out_schema, field_inclusion_mask, swap_endian);
 }
 
-Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
+Status ReadDictionary(const Buffer& metadata, IpcReadContext context,
                       DictionaryKind* kind, io::RandomAccessFile* file) {
   const flatbuf::Message* message = nullptr;
   RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message));
@@ -860,13 +899,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
 
   CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");
 
-  Compression::type compression;
-  RETURN_NOT_OK(GetCompression(batch_meta, &compression));
-  if (compression == Compression::UNCOMPRESSED &&
+  RETURN_NOT_OK(GetCompression(batch_meta, &context.compression));
+  if (context.compression == Compression::UNCOMPRESSED &&
       message->version() == flatbuf::MetadataVersion::MetadataVersion_V4) {
     // Possibly obtain codec information from experimental serialization format
     // in 0.17.x
-    RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+    RETURN_NOT_OK(GetCompressionExperimental(message, &context.compression));
  }
 
  const int64_t id = dictionary_batch->id();
@@ -882,16 +920,14 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
   const Field dummy_field("", value_type);
   RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
 
-  if (compression != Compression::UNCOMPRESSED) {
-    ArrayDataVector dict_fields{dict_data};
-    RETURN_NOT_OK(DecompressBuffers(compression, context.options, &dict_fields));
-  }
-
-  // swap endian in dict_data if necessary (swap_endian == true)
-  if (context.swap_endian) {
-    ARROW_ASSIGN_OR_RAISE(dict_data, ::arrow::internal::SwapEndianArrayData(
-                                         dict_data, context.options.memory_pool));
-  }
+  // Run post-load steps: buffer decompression, etc.
+  RecordBatchLoader batch_loader{context, /*schema=*/nullptr, batch_meta->length(),
+                                 /*inclusion_mask=*/std::vector<bool>{}};
+  ARROW_ASSIGN_OR_RAISE(
+      auto dict_columns,
+      batch_loader.CreateColumns({dict_data}, /*resolve_dictionaries=*/false));
+  DCHECK_EQ(dict_columns.size(), 1);
+  dict_data = dict_columns[0];
 
   if (dictionary_batch->isDelta()) {
     if (kind != nullptr) {
@@ -1756,10 +1792,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     std::shared_ptr<Schema> out_schema;
     RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
                                                &inclusion_mask, &out_schema));
-
     for (int i = 0; i < schema->num_fields(); ++i) {
       const Field& field = *schema->field(i);
-      if (inclusion_mask.size() == 0 || inclusion_mask[i]) {
+      if (inclusion_mask.empty() || inclusion_mask[i]) {
         // Read field
         auto column = std::make_shared<ArrayData>();
         RETURN_NOT_OK(loader.Load(&field, column.get()));
@@ -1767,21 +1802,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
          return Status::IOError("Array length did not match record batch length");
        }
        columns[i] = std::move(column);
-        if (inclusion_mask.size() > 0) {
-          filtered_columns.push_back(columns[i]);
-          filtered_fields.push_back(schema->field(i));
-        }
      } else {
        // Skip field. This logic must be executed to advance the state of the
        // loader to the next field
        RETURN_NOT_OK(loader.SkipField(&field));
      }
    }
 
-    if (inclusion_mask.size() > 0) {
-      filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata());
-    } else {
-      filtered_schema = schema;
-    }
    return Status::OK();
  }
@@ -1798,31 +1824,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     }
     loader.read_request().FulfillRequest(buffers);
 
-    // Dictionary resolution needs to happen on the unfiltered columns,
-    // because fields are mapped structurally (by path in the original schema).
-    RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo,
-                                      context.options.memory_pool));
-    if (inclusion_mask.size() > 0) {
-      columns.clear();
-    } else {
-      filtered_columns = std::move(columns);
-    }
-
-    if (context.compression != Compression::UNCOMPRESSED) {
-      RETURN_NOT_OK(
-          DecompressBuffers(context.compression, context.options, &filtered_columns));
-    }
-
-    // swap endian in a set of ArrayData if necessary (swap_endian == true)
-    if (context.swap_endian) {
-      for (int i = 0; i < static_cast<int>(filtered_columns.size()); ++i) {
-        ARROW_ASSIGN_OR_RAISE(filtered_columns[i],
-                              arrow::internal::SwapEndianArrayData(
-                                  filtered_columns[i], context.options.memory_pool));
-      }
-    }
-    return RecordBatch::Make(std::move(filtered_schema), length,
-                             std::move(filtered_columns));
+    RecordBatchLoader batch_loader{context, schema, length, std::move(inclusion_mask)};
+    return batch_loader.CreateRecordBatch(std::move(columns));
   }
 
   std::shared_ptr<Schema> schema;
@@ -1834,9 +1837,6 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
   ArrayDataVector columns;
   io::internal::ReadRangeCache cache;
   int64_t length;
-  ArrayDataVector filtered_columns;
-  FieldVector filtered_fields;
-  std::shared_ptr<Schema> filtered_schema;
   std::vector<bool> inclusion_mask;
 };
 
diff --git a/testing b/testing
index a871ddc17a4d..249079a810ca 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit a871ddc17a4dd936b7aa43898d59f86a11c3a2b5
+Subproject commit 249079a810caedda6898464003c7ef8a47efeeae
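
Reviewer note: the behavioral core of this patch is the change in
ReadMessageInternal() from rejecting only short bodies ("<") to rejecting any
mismatch ("!=") between the advertised body size and what the streaming
decoder expects. The standalone sketch below models that tightened check; the
Status type and CheckBodyBounds helper are illustrative stand-ins written for
this note only, not Arrow's actual classes.

    // bounds_check_sketch.cc -- standalone model of the tightened check.
    #include <cstdint>
    #include <iostream>
    #include <string>

    // Minimal stand-in for arrow::Status, just enough for this sketch.
    struct Status {
      bool ok;
      std::string message;
      static Status OK() { return {true, {}}; }
      static Status IOError(std::string msg) { return {false, std::move(msg)}; }
    };

    // Old behavior: only advertised < required was an error, so a body larger
    // than the decoder's expectation was silently accepted.
    // New behavior (this patch): any mismatch is an invalid IPC file.
    Status CheckBodyBounds(int64_t advertised_body_size, int64_t required_size) {
      if (advertised_body_size != required_size) {
        return Status::IOError("Invalid IPC file: advertised body size is " +
                               std::to_string(advertised_body_size) +
                               ", but message decoder expects to read " +
                               std::to_string(required_size) + " bytes instead");
      }
      return Status::OK();
    }

    int main() {
      // Oversized body, as a malformed footer can advertise:
      // previously accepted silently, now rejected.
      Status oversized = CheckBodyBounds(/*advertised_body_size=*/4096,
                                         /*required_size=*/256);
      std::cout << (oversized.ok ? "OK" : oversized.message) << "\n";

      // Matching sizes: still accepted.
      Status exact = CheckBodyBounds(/*advertised_body_size=*/256,
                                     /*required_size=*/256);
      std::cout << (exact.ok ? "OK" : exact.message) << "\n";
      return 0;
    }

With the old "<" comparison the first call above would have passed; it now
fails with the same kind of IOError message this patch introduces.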