diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java index 25f6603ec22e..1d6389fbd70e 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java @@ -20,7 +20,8 @@ import org.apache.paimon.arrow.reader.ArrowBatchReader; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.io.ProjectableBundleRecords; +import org.apache.paimon.io.ReplayableBundleRecords; import org.apache.paimon.types.RowType; import org.apache.arrow.vector.VectorSchemaRoot; @@ -28,7 +29,7 @@ import java.util.Iterator; /** Batch records for vector schema root. */ -public class ArrowBundleRecords implements BundleRecords { +public class ArrowBundleRecords implements ProjectableBundleRecords { private final VectorSchemaRoot vectorSchemaRoot; private final RowType rowType; @@ -55,4 +56,9 @@ public Iterator iterator() { ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive); return arrowBatchReader.readBatch(vectorSchemaRoot).iterator(); } + + @Override + public ReplayableBundleRecords project(int[] projection) { + return new ArrowBundleRecords(vectorSchemaRoot, rowType.project(projection), caseSensitive); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/io/ProjectableBundleRecords.java b/paimon-common/src/main/java/org/apache/paimon/io/ProjectableBundleRecords.java new file mode 100644 index 000000000000..35cfebed4ec7 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/ProjectableBundleRecords.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.annotation.Experimental; + +/** + * Opt-in extension for replayable bundles that can preserve their bundle type under projection. + * + *
<p>
This allows projection-aware writers to keep format-specific bundle fast-paths without + * coupling to concrete bundle implementations from other modules. + */ +@Experimental +public interface ProjectableBundleRecords extends ReplayableBundleRecords { + + /** Returns a replayable bundle that exposes the projected fields in projection order. */ + ReplayableBundleRecords project(int[] projection); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/ReplayableBundleRecords.java b/paimon-common/src/main/java/org/apache/paimon/io/ReplayableBundleRecords.java new file mode 100644 index 000000000000..958b381f2d99 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/ReplayableBundleRecords.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.annotation.Experimental; + +/** + * Opt-in marker for {@link BundleRecords} implementations that support repeated iteration. + * + *
<p>
This keeps replayability as an explicit capability instead of strengthening the base {@link + * BundleRecords} contract for all callers. + */ +@Experimental +public interface ReplayableBundleRecords extends BundleRecords {} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataFileWriter.java new file mode 100644 index 000000000000..c677f3079096 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataFileWriter.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.io.FileWriterContext; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.io.RowDataFileWriter; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LongCounter; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.function.Supplier; + +/** + * Dedicated-path {@link RowDataFileWriter} which preserves row-level side effects when writing + * replayable bundles. + */ +class BundleAwareRowDataFileWriter extends RowDataFileWriter implements BundlePassThroughWriter { + + private final boolean supportsBundlePassThrough; + + public BundleAwareRowDataFileWriter( + FileIO fileIO, + FileWriterContext context, + Path path, + RowType writeSchema, + long schemaId, + Supplier seqNumCounterSupplier, + FileIndexOptions fileIndexOptions, + FileSource fileSource, + boolean asyncFileWrite, + boolean statsDenseStore, + boolean isExternalPath, + @Nullable List writeCols) { + super( + fileIO, + context, + path, + writeSchema, + schemaId, + seqNumCounterSupplier, + fileIndexOptions, + fileSource, + asyncFileWrite, + statsDenseStore, + isExternalPath, + writeCols); + this.supportsBundlePassThrough = supportsBundleWrite(); + } + + @Override + public boolean supportsBundlePassThrough() { + return supportsBundlePassThrough; + } + + @Override + public void writeBundle(BundleRecords bundle) throws IOException { + if (!(bundle instanceof ReplayableBundleRecords)) { + for (InternalRow row : bundle) { + write(row); + } + return; + } + + writeReplayableBundle((ReplayableBundleRecords) bundle); + } + + @Override + public void writeReplayableBundle(ReplayableBundleRecords 
bundle) throws IOException { + if (!supportsBundlePassThrough) { + for (InternalRow row : bundle) { + write(row); + } + return; + } + + try { + super.writeBundle(bundle); + // Dedicated-format fan-out only forwards replayable bundles here, so row-level side + // effects can safely replay the same logical rows after the format writer consumes the + // bundle. + for (InternalRow row : bundle) { + recordRowWrite(row); + } + } catch (Throwable e) { + abort(); + throw e; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriter.java new file mode 100644 index 000000000000..e8e8ca9391a4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.io.RollingFileWriterImpl; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.RowType; + +import java.util.function.Supplier; + +/** Rolling writer used by dedicated-format bundle pass-through paths. */ +class BundleAwareRowDataRollingFileWriter extends RollingFileWriterImpl + implements BundlePassThroughWriter { + + private final boolean supportsBundlePassThrough; + + public BundleAwareRowDataRollingFileWriter( + Supplier writerFactory, + boolean supportsBundlePassThrough, + long targetFileSize) { + super(writerFactory, targetFileSize); + this.supportsBundlePassThrough = supportsBundlePassThrough; + } + + @Override + public boolean supportsBundlePassThrough() { + return supportsBundlePassThrough; + } + + @Override + public void writeReplayableBundle(ReplayableBundleRecords bundle) throws java.io.IOException { + writeBundle(bundle); + } + + @VisibleForTesting + public static boolean supportsBundlePassThrough( + FileFormat fileFormat, + RowType rowType, + SimpleColStatsCollector.Factory[] statsCollectors) { + return !RollingFileWriter.createStatsProducer(fileFormat, rowType, statsCollectors) + .requirePerRecord(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BundlePassThroughWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/BundlePassThroughWriter.java new file mode 100644 index 000000000000..1f2d27d3e22d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/BundlePassThroughWriter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.io.ReplayableBundleRecords; + +import java.io.IOException; + +/** + * Internal contract for writers that can safely accept replayable bundle pass-through from + * projection wrappers. + * + *
<p>
The support is instance-specific because it may depend on runtime configuration such as + * statistics collection mode. + */ +interface BundlePassThroughWriter { + + boolean supportsBundlePassThrough(); + + void writeReplayableBundle(ReplayableBundleRecords bundle) throws IOException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java index a6eef0eaeb3c..78068c54848e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java @@ -26,10 +26,8 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.FileWriterAbortExecutor; +import org.apache.paimon.io.ReplayableBundleRecords; import org.apache.paimon.io.RollingFileWriter; -import org.apache.paimon.io.RollingFileWriterImpl; -import org.apache.paimon.io.RowDataFileWriter; -import org.apache.paimon.io.SingleFileWriter; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.operation.BlobFileContext; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -103,13 +101,12 @@ public class DedicatedFormatRollingFileWriter private static final long CHECK_ROLLING_RECORD_CNT = 1000L; // Core components - private final Supplier< - ProjectedFileWriter, DataFileMeta>> + private final RowType writeSchema; + private final Supplier> writerFactory; private final @Nullable Supplier blobWriterFactory; private final @Nullable Supplier< - ProjectedFileWriter< - RollingFileWriterImpl, List>> + ProjectedFileWriter>> vectorStoreWriterFactory; private final long targetFileSize; private final @Nullable ExternalStorageBlobWriter externalStorageBlobWriter; @@ -118,11 +115,9 @@ public class DedicatedFormatRollingFileWriter private final List closedWriters; private final List 
results; - private ProjectedFileWriter, DataFileMeta> - currentWriter; + private ProjectedFileWriter currentWriter; private MultipleBlobFileWriter blobWriter; - private ProjectedFileWriter< - RollingFileWriterImpl, List> + private ProjectedFileWriter> vectorStoreWriter; private long recordCount = 0; private boolean closed = false; @@ -145,6 +140,7 @@ public DedicatedFormatRollingFileWriter( boolean statsDenseStore, @Nullable BlobFileContext context) { // Initialize basic fields + this.writeSchema = writeSchema; this.targetFileSize = targetFileSize; this.results = new ArrayList<>(); this.closedWriters = new ArrayList<>(); @@ -249,8 +245,7 @@ public DedicatedFormatRollingFileWriter( } /** Creates a factory for normal data writers. */ - private static Supplier< - ProjectedFileWriter, DataFileMeta>> + private static Supplier> createNormalWriterFactory( FileIO fileIO, long schemaId, @@ -270,8 +265,8 @@ public DedicatedFormatRollingFileWriter( int[] projectionNormalFields = writeSchema.projectIndexes(normalColumnNames); return () -> { - RowDataFileWriter rowDataFileWriter = - new RowDataFileWriter( + BundleAwareRowDataFileWriter rowDataFileWriter = + new BundleAwareRowDataFileWriter( fileIO, RollingFileWriter.createFileWriterContext( fileFormat, @@ -293,8 +288,7 @@ public DedicatedFormatRollingFileWriter( } /** Creates a vector-store writer for handling vector-store data. 
*/ - private static ProjectedFileWriter< - RollingFileWriterImpl, List> + private static ProjectedFileWriter> createVectorStoreWriter( FileIO fileIO, long schemaId, @@ -313,10 +307,13 @@ public DedicatedFormatRollingFileWriter( List vectorStoreColumnNames = vectorStoreRowType.getFieldNames(); int[] vectorStoreProjection = writeSchema.projectIndexes(vectorStoreColumnNames); String vectorFormat = vectorFileFormat.getFormatIdentifier(); + final boolean supportsBundlePassThrough = + BundleAwareRowDataRollingFileWriter.supportsBundlePassThrough( + vectorFileFormat, vectorStoreRowType, statsCollectors); return new ProjectedFileWriter<>( - new RollingFileWriterImpl<>( + new BundleAwareRowDataRollingFileWriter( () -> - new RowDataFileWriter( + new BundleAwareRowDataFileWriter( fileIO, RollingFileWriter.createFileWriterContext( vectorFileFormat, @@ -333,6 +330,7 @@ public DedicatedFormatRollingFileWriter( statsDenseStore, pathFactory.isExternalPath(), vectorStoreColumnNames), + supportsBundlePassThrough, targetFileSize), vectorStoreProjection); } @@ -353,15 +351,7 @@ public void write(InternalRow row) throws IOException { ? externalStorageBlobWriter.transformRow(row) : row; - if (currentWriter == null) { - currentWriter = writerFactory.get(); - } - if ((blobWriter == null) && (blobWriterFactory != null)) { - blobWriter = blobWriterFactory.get(); - } - if ((vectorStoreWriter == null) && (vectorStoreWriterFactory != null)) { - vectorStoreWriter = vectorStoreWriterFactory.get(); - } + openWritersIfNeeded(); if (blobWriter != null) { blobWriter.write(transformedRow); } @@ -371,7 +361,7 @@ public void write(InternalRow row) throws IOException { currentWriter.write(transformedRow); recordCount++; - if (rollingFile()) { + if (rollingFile(false)) { closeCurrentWriter(); } } catch (Throwable e) { @@ -388,16 +378,48 @@ private void handleWriteException(Throwable e) { } /** - * Writes a bundle of records by iterating through each row. 
+ * Writes a bundle of records while preserving bundle semantics for dedicated child writers. * * @param bundle The bundle of records to write * @throws IOException if writing fails */ @Override public void writeBundle(BundleRecords bundle) throws IOException { - // TODO: support bundle projection - for (InternalRow row : bundle) { - write(row); + if (bundle.rowCount() == 0) { + return; + } + + if (externalStorageBlobWriter != null) { + for (InternalRow row : bundle) { + write(row); + } + return; + } + + // Dedicated fan-out reuses the same logical bundle for normal/blob/vector writers, but + // only explicitly replayable bundles can be forwarded safely without copying. + ReplayableBundleRecords replayableBundle = + bundle instanceof ReplayableBundleRecords + ? (ReplayableBundleRecords) bundle + : MaterializedBundleRecords.from(bundle, writeSchema); + + try { + openWritersIfNeeded(); + if (blobWriter != null) { + blobWriter.writeBundle(replayableBundle); + } + if (vectorStoreWriter != null) { + vectorStoreWriter.writeBundle(replayableBundle); + } + currentWriter.writeBundle(replayableBundle); + recordCount += replayableBundle.rowCount(); + + if (rollingFile(true)) { + closeCurrentWriter(); + } + } catch (Throwable e) { + handleWriteException(e); + throw e; } } @@ -438,10 +460,24 @@ public void abort() { } - /** Checks if the current file should be rolled based on size and record count. */ - private boolean rollingFile() throws IOException { + /** Creates the normal, blob and vector-store writers if they have not been created yet; the blob and vector-store writers are only created when their factories are configured. */ + private void openWritersIfNeeded() { + if (currentWriter == null) { + currentWriter = writerFactory.get(); + } + if ((blobWriter == null) && (blobWriterFactory != null)) { + blobWriter = blobWriterFactory.get(); + } + if ((vectorStoreWriter == null) && (vectorStoreWriterFactory != null)) { + vectorStoreWriter = vectorStoreWriterFactory.get(); + } + } + + /** Checks if the current file should be rolled based on size and record count. 
*/ + private boolean rollingFile(boolean forceCheck) throws IOException { return currentWriter .writer() - .reachTargetSize(recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize); + .reachTargetSize( + forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MaterializedBundleRecords.java b/paimon-core/src/main/java/org/apache/paimon/append/MaterializedBundleRecords.java new file mode 100644 index 000000000000..1d6e817f08bb --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/MaterializedBundleRecords.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Replayable {@link BundleRecords} backed by copied rows. + * + *
<p>
This is used by dedicated-format fan-out so multiple child writers can consume the same + * logical bundle without strengthening the public {@link BundleRecords} contract. + */ +class MaterializedBundleRecords implements ReplayableBundleRecords { + + private final List rows; + + private MaterializedBundleRecords(List rows) { + this.rows = rows; + } + + static MaterializedBundleRecords from(BundleRecords bundle, RowType rowType) { + InternalRowSerializer serializer = InternalSerializers.create(rowType); + List rows = + new ArrayList<>((int) Math.min(bundle.rowCount(), Integer.MAX_VALUE)); + for (InternalRow row : bundle) { + rows.add(serializer.copy(row)); + } + return new MaterializedBundleRecords(rows); + } + + @Override + public Iterator iterator() { + return rows.iterator(); + } + + @Override + public long rowCount() { + return rows.size(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java index 93075b203515..83e512e0f331 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java @@ -25,10 +25,8 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.ReplayableBundleRecords; import org.apache.paimon.io.RollingFileWriter; -import org.apache.paimon.io.RollingFileWriterImpl; -import org.apache.paimon.io.RowDataFileWriter; -import org.apache.paimon.io.SingleFileWriter; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.NoneSimpleColStatsCollector; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -69,20 +67,21 @@ public MultipleBlobFileWriter( for (String blobFieldName : blobRowType.getFieldNames()) { BlobFileFormat blobFileFormat = new BlobFileFormat(); 
blobFileFormat.setWriteConsumer(blobConsumer); + RowType projectedType = writeSchema.project(blobFieldName); + SimpleColStatsCollector.Factory[] statsCollectors = + new SimpleColStatsCollector.Factory[] {NoneSimpleColStatsCollector::new}; blobWriters.add( new BlobProjectedFileWriter( () -> - new RowDataFileWriter( + new BundleAwareRowDataFileWriter( fileIO, RollingFileWriter.createFileWriterContext( blobFileFormat, - writeSchema.project(blobFieldName), - new SimpleColStatsCollector.Factory[] { - NoneSimpleColStatsCollector::new - }, + projectedType, + statsCollectors, "none"), pathFactory.newBlobPath(), - writeSchema.project(blobFieldName), + projectedType, schemaId, seqNumCounterSupplier, new FileIndexOptions(), @@ -91,6 +90,8 @@ public MultipleBlobFileWriter( statsDenseStore, pathFactory.isExternalPath(), singletonList(blobFieldName)), + BundleAwareRowDataRollingFileWriter.supportsBundlePassThrough( + blobFileFormat, projectedType, statsCollectors), targetFileSize, writeSchema.projectIndexes(singletonList(blobFieldName)))); } @@ -102,6 +103,12 @@ public void write(InternalRow row) throws IOException { } } + public void writeBundle(ReplayableBundleRecords bundle) throws IOException { + for (BlobProjectedFileWriter blobWriter : blobWriters) { + blobWriter.writeBundle(bundle); + } + } + public void abort() { for (BlobProjectedFileWriter blobWriter : blobWriters) { blobWriter.abort(); @@ -124,13 +131,16 @@ public List result() throws IOException { } private static class BlobProjectedFileWriter - extends ProjectedFileWriter< - RollingFileWriterImpl, List> { + extends ProjectedFileWriter> { public BlobProjectedFileWriter( - Supplier> writerFactory, + Supplier writerFactory, + boolean supportsBundlePassThrough, long targetFileSize, int[] projection) { - super(new RollingFileWriterImpl<>(writerFactory, targetFileSize), projection); + super( + new BundleAwareRowDataRollingFileWriter( + writerFactory, supportsBundlePassThrough, targetFileSize), + projection); } } } diff 
--git a/paimon-core/src/main/java/org/apache/paimon/append/ProjectedBundleRecords.java b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedBundleRecords.java new file mode 100644 index 000000000000..220b54d31bbe --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedBundleRecords.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.utils.ProjectedRow; + +import java.util.Iterator; + +/** Bundle wrapper which applies projection lazily during iteration. 
*/ +class ProjectedBundleRecords implements ReplayableBundleRecords { + + private final ReplayableBundleRecords bundle; + private final int[] projection; + + ProjectedBundleRecords(ReplayableBundleRecords bundle, int[] projection) { + this.bundle = bundle; + this.projection = projection; + } + + @Override + public Iterator iterator() { + final Iterator iterator = bundle.iterator(); + final ProjectedRow projectedRow = ProjectedRow.from(projection); + return new Iterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public InternalRow next() { + return projectedRow.replaceRow(iterator.next()); + } + }; + } + + @Override + public long rowCount() { + return bundle.rowCount(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java index 20679866220f..c07ce758d6a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java @@ -19,10 +19,14 @@ package org.apache.paimon.append; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.FileWriter; +import org.apache.paimon.io.ProjectableBundleRecords; +import org.apache.paimon.io.ReplayableBundleRecords; import org.apache.paimon.utils.ProjectedRow; import java.io.IOException; +import java.util.Arrays; /** * A delegating {@link FileWriter} which applies a field projection to each incoming {@link @@ -35,11 +39,13 @@ public class ProjectedFileWriter, R> implements FileWriter { private final T writer; + private final int[] projection; private final ProjectedRow projectedRow; public ProjectedFileWriter(T writer, int[] projection) { this.writer = writer; - this.projectedRow = ProjectedRow.from(projection); + this.projection = Arrays.copyOf(projection, projection.length); + this.projectedRow = 
ProjectedRow.from(this.projection); } @Override @@ -48,6 +54,24 @@ public void write(InternalRow record) throws IOException { writer.write(projectedRow); } + public void writeBundle(BundleRecords bundle) throws IOException { + if (writer instanceof BundlePassThroughWriter + && ((BundlePassThroughWriter) writer).supportsBundlePassThrough() + && bundle instanceof ReplayableBundleRecords) { + ReplayableBundleRecords projectedBundle = + bundle instanceof ProjectableBundleRecords + ? ((ProjectableBundleRecords) bundle).project(projection) + : new ProjectedBundleRecords( + (ReplayableBundleRecords) bundle, projection); + ((BundlePassThroughWriter) writer).writeReplayableBundle(projectedBundle); + return; + } + + for (InternalRow row : bundle) { + write(row); + } + } + @Override public long recordCount() { return writer.recordCount(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 7f8715ab0846..fc4e35fa3b22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -81,6 +81,10 @@ public RowDataFileWriter( @Override public void write(InternalRow row) throws IOException { super.write(row); + recordRowWrite(row); + } + + protected final void recordRowWrite(InternalRow row) throws IOException { // add row to index if needed if (dataFileIndexWriter != null) { dataFileIndexWriter.write(row); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java index 773ca08a815f..0195742340bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java @@ -66,13 +66,21 @@ public void write(T record) throws IOException { @Override 
public void writeBundle(BundleRecords bundle) throws IOException { - if (statsRequirePerRecord) { + validateBundleWrite(); + super.writeBundle(bundle); + } + + protected final boolean supportsBundleWrite() { + return !statsRequirePerRecord; + } + + protected final void validateBundleWrite() { + if (!supportsBundleWrite()) { throw new IllegalArgumentException( String.format( "Can't write bundle for %s, we may lose all the statistical information.", statsProducer.getClass().getName())); } - super.writeBundle(bundle); } public SimpleColStats[] fieldStats(long fileSize) throws IOException { diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriterTest.java new file mode 100644 index 000000000000..37b6077b4ff6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/BundleAwareRowDataRollingFileWriterTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.options.Options; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LongCounter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BundleAwareRowDataRollingFileWriter}. 
*/ +public class BundleAwareRowDataRollingFileWriterTest { + + private static final RowType SCHEMA = + RowType.of(new DataType[] {new IntType()}, new String[] {"id"}); + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testWriteBundlePreservesSequenceSideEffects() throws IOException { + FileFormat fileFormat = FileFormat.fromIdentifier("parquet", new Options()); + SimpleColStatsCollector.Factory[] statsCollectors = + SimpleColStatsCollector.createFullStatsFactories(SCHEMA.getFieldCount()); + LongCounter seqNumCounter = new LongCounter(); + DataFilePathFactory pathFactory = + new DataFilePathFactory( + new Path(tempDir + "/bucket-0"), + CoreOptions.FILE_FORMAT.defaultValue().toString(), + CoreOptions.DATA_FILE_PREFIX.defaultValue(), + CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), + CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); + + BundleAwareRowDataRollingFileWriter writer = + new BundleAwareRowDataRollingFileWriter( + () -> + new BundleAwareRowDataFileWriter( + LocalFileIO.create(), + RollingFileWriter.createFileWriterContext( + fileFormat, + SCHEMA, + statsCollectors, + CoreOptions.FILE_COMPRESSION.defaultValue()), + pathFactory.newPath(), + SCHEMA, + 0L, + () -> seqNumCounter, + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + false, + null), + BundleAwareRowDataRollingFileWriter.supportsBundlePassThrough( + fileFormat, SCHEMA, statsCollectors), + 1024L * 1024L); + + writer.writeBundle( + new ReplayableTestBundleRecords( + Arrays.asList( + GenericRow.of(1), GenericRow.of(2), GenericRow.of(3)))); + writer.close(); + + DataFileMeta file = writer.result().get(0); + assertThat(file.rowCount()).isEqualTo(3); + assertThat(file.minSequenceNumber()).isEqualTo(0); + assertThat(file.maxSequenceNumber()).isEqualTo(2); + assertThat(seqNumCounter.getValue()).isEqualTo(3); + } + + private static class ReplayableTestBundleRecords implements ReplayableBundleRecords { + + 
private final List rows; + + private ReplayableTestBundleRecords(List rows) { + this.rows = rows; + } + + @Override + public Iterator iterator() { + return rows.iterator(); + } + + @Override + public long rowCount() { + return rows.size(); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterTest.java index c0bd2814ce45..a8f1d4e81b09 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterTest.java @@ -43,9 +43,11 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.nio.file.Files; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -146,8 +148,80 @@ public void testMultipleWrites() throws IOException { } @Test - public void testBundleWriting() throws IOException { - // Create a bundle of records + public void testBundleWritingPreservesMainFileIndexSideEffects() throws IOException { + Options options = new Options(); + options.set("file-index.bloom-filter.columns", "f0"); + options.set("file-index.in-manifest-threshold", "1 MB"); + CoreOptions coreOptions = new CoreOptions(options); + writer = + new DedicatedFormatRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + null, + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(coreOptions), + new FileIndexOptions(coreOptions), + FileSource.APPEND, + false, + BlobFileContext.create(SCHEMA, coreOptions)); + + List rows = + Arrays.asList( + GenericRow.of( + 1, BinaryString.fromString("test1"), new BlobData(testBlobData)), + 
GenericRow.of( + 2, BinaryString.fromString("test2"), new BlobData(testBlobData)), + GenericRow.of( + 3, BinaryString.fromString("test3"), new BlobData(testBlobData))); + + writer.writeBundle(new SingleUseBundleRecords(rows)); + writer.close(); + + DataFileMeta mainFile = + writer.result().stream() + .filter(file -> "parquet".equals(file.fileFormat())) + .findFirst() + .get(); + + assertThat(mainFile.rowCount()).isEqualTo(rows.size()); + assertThat(mainFile.embeddedIndex()).isNotNull(); + assertThat(mainFile.embeddedIndex()).isNotEmpty(); + assertThat(mainFile.extraFiles()).isEmpty(); + } + + @Test + public void testBundleWritingWithExternalStorageFallback() throws IOException { + Options options = new Options(); + options.set(CoreOptions.BLOB_DESCRIPTOR_FIELD, "f2"); + options.set(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD, "f2"); + java.nio.file.Path externalStoragePath = tempDir.resolve("external-storage-blob-path"); + options.set(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH, externalStoragePath.toString()); + writer = + new DedicatedFormatRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + null, + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(options)), + new FileIndexOptions(), + FileSource.APPEND, + false, + BlobFileContext.create(SCHEMA, new CoreOptions(options))); + List rows = Arrays.asList( GenericRow.of( @@ -157,10 +231,17 @@ public void testBundleWriting() throws IOException { GenericRow.of( 3, BinaryString.fromString("test3"), new BlobData(testBlobData))); - // Write bundle - writer.writeBundle(new TestBundleRecords(rows)); + writer.writeBundle(new SingleUseBundleRecords(rows)); + writer.close(); + List metasResult = writer.result(); assertThat(writer.recordCount()).isEqualTo(3); + assertThat(metasResult).hasSize(1); + assertThat(metasResult.get(0).rowCount()).isEqualTo(3); + 
assertThat(Files.exists(externalStoragePath)).isTrue(); + try (Stream stream = Files.list(externalStoragePath)) { + assertThat(stream.anyMatch(p -> p.getFileName().toString().endsWith(".blob"))).isTrue(); + } } @Test @@ -494,18 +575,21 @@ void testSequenceNumberIncrementInBlobWritePath() throws IOException { @Test void testSequenceNumberIncrementInBlobWritePathBatch() throws IOException { - // Write multiple rows as a batch and verify sequence-number continuity in blob files int numRows = 10; + List rows = new java.util.ArrayList<>(); for (int i = 0; i < numRows; i++) { - InternalRow row = + rows.add( GenericRow.of( - i, BinaryString.fromString("test" + i), new BlobData(testBlobData)); - writer.write(row); + i, BinaryString.fromString("test" + i), new BlobData(testBlobData))); } + writer.writeBundle(new SingleUseBundleRecords(rows)); writer.close(); List metasResult = writer.result(); + assertThat(metasResult).hasSize(2); + assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(1).rowCount()); + // Extract blob files (skip the first normal file) List blobFiles = metasResult.stream() @@ -616,16 +700,21 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException { assertThat(writer.recordCount()).isEqualTo(3); } - /** Simple implementation of BundleRecords for testing. */ - private static class TestBundleRecords implements BundleRecords { + /** Bundle implementation that can only be iterated once. 
*/ + private static class SingleUseBundleRecords implements BundleRecords { private final List rows; + private boolean iterated; - public TestBundleRecords(List rows) { + private SingleUseBundleRecords(List rows) { this.rows = rows; } @Override public java.util.Iterator iterator() { + if (iterated) { + throw new IllegalStateException("Bundle should only be consumed once."); + } + iterated = true; return rows.iterator(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterVectorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterVectorTest.java index 826d2c0c79d7..b3f695aad95e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterVectorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/DedicatedFormatRollingFileWriterVectorTest.java @@ -29,6 +29,7 @@ import org.apache.paimon.format.blob.BlobFileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileSource; @@ -47,6 +48,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Random; @@ -172,6 +174,20 @@ public void testVectorTargetFileSize() throws Exception { assertThat(writer.recordCount()).isEqualTo(rowNum); } + @Test + public void testBundleWriting() throws Exception { + List rows = makeRows(8, 10); + writer.writeBundle(new SingleUseBundleRecords(rows)); + writer.close(); + List metasResult = writer.result(); + + assertThat(writer.recordCount()).isEqualTo(rows.size()); + assertThat(metasResult).hasSize(3); + assertThat(metasResult.get(0).rowCount()).isEqualTo(rows.size()); + assertThat(metasResult.get(1).rowCount()).isEqualTo(rows.size()); + 
assertThat(metasResult.get(2).rowCount()).isEqualTo(rows.size()); + } + @Test void testVectorStoreFileNameFormatWithSharedUuid() throws Exception { // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files @@ -310,4 +326,28 @@ private List makeRows(int rowNum, int blobDataSize) { } return rows; } + + private static class SingleUseBundleRecords implements BundleRecords { + + private final List rows; + private boolean iterated; + + private SingleUseBundleRecords(List rows) { + this.rows = rows; + } + + @Override + public Iterator iterator() { + if (iterated) { + throw new IllegalStateException("Bundle should only be consumed once."); + } + iterated = true; + return rows.iterator(); + } + + @Override + public long rowCount() { + return rows.size(); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/ProjectedFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/ProjectedFileWriterTest.java new file mode 100644 index 000000000000..cccbf77ac2cd --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/ProjectedFileWriterTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append; + +import org.apache.paimon.arrow.ArrowBundleRecords; +import org.apache.paimon.arrow.vector.ArrowFormatWriter; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.io.FileWriter; +import org.apache.paimon.io.ProjectableBundleRecords; +import org.apache.paimon.io.ReplayableBundleRecords; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ProjectedFileWriter}. */ +public class ProjectedFileWriterTest { + + @Test + public void testWriteBundlePassThroughForReplayableBundle() throws IOException { + RecordingWriter writer = new RecordingWriter(true, true); + ProjectedFileWriter> projectedWriter = + new ProjectedFileWriter<>(writer, new int[] {0, 2}); + + projectedWriter.writeBundle( + new ReplayableTestBundleRecords( + Arrays.asList( + GenericRow.of(1, 2, 3), GenericRow.of(4, 5, 6)))); + + assertThat(writer.bundleWriteCount).isEqualTo(1); + assertThat(writer.rowWriteCount).isEqualTo(0); + assertThat(writer.firstPassRows).containsExactly("1,3", "4,6"); + assertThat(writer.secondPassRows).containsExactly("1,3", "4,6"); + } + + @Test + public void testWriteBundlePassThroughForProjectableArrowBundle() throws IOException { + RecordingWriter writer = new RecordingWriter(true, true); + ProjectedFileWriter> projectedWriter = + new ProjectedFileWriter<>(writer, new int[] {0, 2}); + RowType rowType = + RowType.builder() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.INT()) + .field("f2", DataTypes.INT()) + .build(); + + try (ArrowFormatWriter arrowWriter = new ArrowFormatWriter(rowType, 16, true)) { + arrowWriter.write(GenericRow.of(1, 2, 
3)); + arrowWriter.write(GenericRow.of(4, 5, 6)); + arrowWriter.flush(); + + projectedWriter.writeBundle( + new ArrowBundleRecords(arrowWriter.getVectorSchemaRoot(), rowType, true)); + } + + assertThat(writer.bundleWriteCount).isEqualTo(1); + assertThat(writer.rowWriteCount).isEqualTo(0); + assertThat(writer.projectableBundleWriteCount).isEqualTo(1); + assertThat(writer.arrowBundleWriteCount).isEqualTo(1); + assertThat(writer.firstPassRows).containsExactly("1,3", "4,6"); + assertThat(writer.secondPassRows).containsExactly("1,3", "4,6"); + } + + @Test + public void testWriteBundleFallbackToRowWritesWhenWriterDoesNotSupportPassThrough() + throws IOException { + RecordingWriter writer = new RecordingWriter(false); + ProjectedFileWriter> projectedWriter = + new ProjectedFileWriter<>(writer, new int[] {0, 2}); + + projectedWriter.writeBundle( + new ReplayableTestBundleRecords( + Arrays.asList( + GenericRow.of(1, 2, 3), GenericRow.of(4, 5, 6)))); + + assertThat(writer.bundleWriteCount).isEqualTo(0); + assertThat(writer.rowWriteCount).isEqualTo(2); + assertThat(writer.firstPassRows).containsExactly("1,3", "4,6"); + } + + @Test + public void testWriteBundleFallbackToRowWritesWhenBundleIsNotReplayable() throws IOException { + RecordingWriter writer = new RecordingWriter(true, true); + ProjectedFileWriter> projectedWriter = + new ProjectedFileWriter<>(writer, new int[] {0, 2}); + + projectedWriter.writeBundle( + new SingleUseBundleRecords( + Arrays.asList( + GenericRow.of(1, 2, 3), GenericRow.of(4, 5, 6)))); + + assertThat(writer.bundleWriteCount).isEqualTo(0); + assertThat(writer.rowWriteCount).isEqualTo(2); + assertThat(writer.firstPassRows).containsExactly("1,3", "4,6"); + assertThat(writer.secondPassRows).isEmpty(); + } + + private static class RecordingWriter + implements FileWriter>, BundlePassThroughWriter { + + private final boolean supportsBundlePassThrough; + private final boolean iterateBundleTwice; + private final List firstPassRows = new ArrayList<>(); + private 
final List secondPassRows = new ArrayList<>(); + private int bundleWriteCount; + private int projectableBundleWriteCount; + private int arrowBundleWriteCount; + private int rowWriteCount; + + private RecordingWriter(boolean supportsBundlePassThrough) { + this(supportsBundlePassThrough, false); + } + + private RecordingWriter(boolean supportsBundlePassThrough, boolean iterateBundleTwice) { + this.supportsBundlePassThrough = supportsBundlePassThrough; + this.iterateBundleTwice = iterateBundleTwice; + } + + @Override + public boolean supportsBundlePassThrough() { + return supportsBundlePassThrough; + } + + @Override + public void writeReplayableBundle(ReplayableBundleRecords bundle) { + bundleWriteCount++; + if (bundle instanceof ProjectableBundleRecords) { + projectableBundleWriteCount++; + } + if (bundle instanceof ArrowBundleRecords) { + arrowBundleWriteCount++; + } + for (InternalRow row : bundle) { + firstPassRows.add(format(row)); + } + if (iterateBundleTwice) { + for (InternalRow row : bundle) { + secondPassRows.add(format(row)); + } + } + } + + @Override + public void write(InternalRow record) { + rowWriteCount++; + firstPassRows.add(format(record)); + } + + @Override + public long recordCount() { + return firstPassRows.size(); + } + + @Override + public void abort() {} + + @Override + public List result() { + return firstPassRows; + } + + @Override + public void close() {} + + private String format(InternalRow row) { + return row.getInt(0) + "," + row.getInt(1); + } + } + + private static class ReplayableTestBundleRecords implements ReplayableBundleRecords { + + private final List rows; + + private ReplayableTestBundleRecords(List rows) { + this.rows = rows; + } + + @Override + public Iterator iterator() { + return rows.iterator(); + } + + @Override + public long rowCount() { + return rows.size(); + } + } + + private static class SingleUseBundleRecords implements BundleRecords { + + private final List rows; + private boolean iterated; + + private 
SingleUseBundleRecords(List rows) { + this.rows = rows; + } + + @Override + public Iterator iterator() { + if (iterated) { + throw new IllegalStateException("Bundle should only be consumed once."); + } + iterated = true; + return rows.iterator(); + } + + @Override + public long rowCount() { + return rows.size(); + } + } +}