Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -474,6 +476,44 @@ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
}
}

@Override
public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean recursive,
Comment thread
KKcorps marked this conversation as resolved.
final Predicate<String> pathFilter, final int maxResults)
throws IOException {
if (maxResults <= 0) {
LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning empty list", maxResults);
return new ArrayList<>();
}
LOGGER.debug("listFilesWithMetadata (paginated) is called with fileUri='{}', recursive='{}', maxResults={}",
Comment thread
KKcorps marked this conversation as resolved.
fileUri, recursive, maxResults);
final List<FileMetadata> result = new ArrayList<>();
try {
// PagedIterable fetches pages lazily; breaking out stops further API calls
for (final PathItem item : listPathItems(fileUri, recursive)) {
if (item.isDirectory()) {
continue;
}
final String filePath = AzurePinotFSUtil.convertAzureStylePathToUriStylePath(item.getName());
if (pathFilter.test(filePath)) {
result.add(new FileMetadata.Builder()
.setFilePath(filePath)
.setLastModifiedTime(item.getLastModified().toInstant().toEpochMilli())
.setLength(item.getContentLength())
.setIsDirectory(false)
.build());
if (result.size() >= maxResults) {
break;
}
}
}
} catch (DataLakeStorageException e) {
throw new IOException(e);
}
LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
result.size(), maxResults, fileUri, recursive);
return result;
}

private PagedIterable<PathItem> listPathItems(URI fileUri, boolean recursive)
throws IOException {
// Unlike other Azure SDK APIs that takes url encoded path, ListPathsOptions takes decoded url
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.filesystem.test;

import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathItem;
import java.io.IOException;
import java.net.URI;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
import org.apache.pinot.spi.filesystem.FileMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


/**
 * Unit tests for ADLSGen2PinotFS.listFilesWithMetadata(URI, boolean, Predicate, int)
 * — the paginated listing with lazy PagedIterable iteration and early termination.
 *
 * <p>The file system client and its {@code PagedIterable} are Mockito mocks; each test feeds the listing from
 * an in-memory iterator. Early-termination tests additionally inspect that iterator afterwards to confirm the
 * items past the limit were never pulled.
 */
public class ADLSGen2PinotFSPaginatedListTest {

  private static final Predicate<String> ACCEPT_ALL = path -> true;
  private static final OffsetDateTime MTIME = OffsetDateTime.of(2026, 1, 15, 10, 30, 0, 0, ZoneOffset.UTC);
  // Directory URI shared by every test case
  private static final URI DATA_URI = URI.create("abfss://container@account.dfs.core.windows.net/data/");

  @Mock
  private DataLakeFileSystemClient _mockFileSystemClient;

  @SuppressWarnings("rawtypes")
  @Mock
  private PagedIterable _mockPagedIterable;

  private ADLSGen2PinotFS _adlsPinotFS;
  private AutoCloseable _mocks;

  @BeforeMethod
  public void setUp() {
    _mocks = MockitoAnnotations.openMocks(this);
    _adlsPinotFS = new ADLSGen2PinotFS(_mockFileSystemClient);
  }

  @AfterMethod
  public void tearDown() throws Exception {
    _mocks.close();
  }

  /** Creates a mocked {@code PathItem} with the given name, directory flag and length, last modified at MTIME. */
  private static PathItem mockPathItem(final String name, final boolean isDirectory,
      final long contentLength) {
    final PathItem item = mock(PathItem.class);
    when(item.getName()).thenReturn(name);
    when(item.isDirectory()).thenReturn(isDirectory);
    when(item.getContentLength()).thenReturn(contentLength);
    when(item.getLastModified()).thenReturn(MTIME);
    return item;
  }

  /**
   * Makes the mocked client return a listing backed by {@code items}.
   *
   * @return the iterator handed to the code under test, so callers can check which items were left unconsumed
   */
  @SuppressWarnings("unchecked")
  private Iterator<PathItem> setupIterator(final PathItem... items) {
    when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
    final Iterator<PathItem> iterator = Arrays.asList(items).iterator();
    when(_mockPagedIterable.iterator()).thenReturn(iterator);
    return iterator;
  }

  @Test
  public void testAllMatch() throws IOException {
    setupIterator(
        mockPathItem("data/file1.parquet", false, 100),
        mockPathItem("data/file2.parquet", false, 200),
        mockPathItem("data/file3.parquet", false, 300)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 10);

    assertEquals(result.size(), 3);
    assertEquals(result.get(0).getFilePath(), "/data/file1.parquet");
    assertEquals(result.get(1).getFilePath(), "/data/file2.parquet");
    assertEquals(result.get(2).getFilePath(), "/data/file3.parquet");
  }

  @Test
  public void testEarlyTermination() throws IOException {
    // 5 items, maxResults=3
    final Iterator<PathItem> remaining = setupIterator(
        mockPathItem("data/f1.parquet", false, 100),
        mockPathItem("data/f2.parquet", false, 200),
        mockPathItem("data/f3.parquet", false, 300),
        mockPathItem("data/f4.parquet", false, 400),
        mockPathItem("data/f5.parquet", false, 500)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 3);

    assertEquals(result.size(), 3);
    assertEquals(result.get(2).getFilePath(), "/data/f3.parquet");
    // Iteration must have stopped at the limit: f4 is the next item still waiting in the listing
    assertTrue(remaining.hasNext());
    assertEquals(remaining.next().getName(), "data/f4.parquet");
  }

  @Test
  public void testMaxResultsOne() throws IOException {
    final Iterator<PathItem> remaining = setupIterator(
        mockPathItem("data/f1.parquet", false, 100),
        mockPathItem("data/f2.parquet", false, 200)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 1);

    assertEquals(result.size(), 1);
    // Only the first item may have been consumed
    assertTrue(remaining.hasNext());
    assertEquals(remaining.next().getName(), "data/f2.parquet");
  }

  @Test
  public void testFilterPredicate() throws IOException {
    setupIterator(
        mockPathItem("data/file1.parquet", false, 100),
        mockPathItem("data/file2.csv", false, 200),
        mockPathItem("data/file3.parquet", false, 300),
        mockPathItem("data/file4.json", false, 400)
    );

    final List<FileMetadata> result =
        _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, path -> path.endsWith(".parquet"), 10);

    assertEquals(result.size(), 2);
    assertTrue(result.stream().allMatch(f -> f.getFilePath().endsWith(".parquet")));
  }

  @Test
  public void testDirectoriesSkipped() throws IOException {
    setupIterator(
        mockPathItem("data/subdir", true, 0),
        mockPathItem("data/file1.parquet", false, 100),
        mockPathItem("data/anotherdir", true, 0),
        mockPathItem("data/file2.parquet", false, 200)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 10);

    assertEquals(result.size(), 2);
    assertTrue(result.stream().noneMatch(FileMetadata::isDirectory));
  }

  @Test
  public void testFilterRejectsAll() throws IOException {
    setupIterator(
        mockPathItem("data/file1.parquet", false, 100),
        mockPathItem("data/file2.parquet", false, 200)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, path -> false, 10);

    assertEquals(result.size(), 0);
  }

  @Test
  public void testEmptyListing() throws IOException {
    setupIterator();

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 10);

    assertEquals(result.size(), 0);
  }

  @Test
  public void testFileMetadataAttributes() throws IOException {
    setupIterator(
        mockPathItem("data/file.parquet", false, 4096)
    );

    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 10);

    assertEquals(result.size(), 1);
    final FileMetadata fm = result.get(0);
    assertEquals(fm.getFilePath(), "/data/file.parquet");
    assertEquals(fm.getLength(), 4096L);
    assertEquals(fm.getLastModifiedTime(), MTIME.toInstant().toEpochMilli());
    assertFalse(fm.isDirectory());
  }

  @Test(expectedExceptions = IOException.class)
  public void testDataLakeExceptionWrappedAsIOException() throws IOException {
    when(_mockFileSystemClient.listPaths(any(), any()))
        .thenThrow(mock(DataLakeStorageException.class));

    _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 10);
  }

  @Test
  public void testMaxResultsZeroReturnsEmpty() throws IOException {
    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, 0);

    assertEquals(result.size(), 0);
  }

  @Test
  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, ACCEPT_ALL, -3);

    assertEquals(result.size(), 0);
  }

  @Test
  public void testEarlyTerminationWithFilterAndDirectories() throws IOException {
    // Mix of dirs, matching files, and non-matching files; maxResults=2
    final Iterator<PathItem> remaining = setupIterator(
        mockPathItem("data/dir1", true, 0),
        mockPathItem("data/file1.csv", false, 100),
        mockPathItem("data/file2.parquet", false, 200),
        mockPathItem("data/dir2", true, 0),
        mockPathItem("data/file3.parquet", false, 300),
        mockPathItem("data/file4.parquet", false, 400)
    );

    final List<FileMetadata> result =
        _adlsPinotFS.listFilesWithMetadata(DATA_URI, true, path -> path.endsWith(".parquet"), 2);

    assertEquals(result.size(), 2);
    assertEquals(result.get(0).getFilePath(), "/data/file2.parquet");
    assertEquals(result.get(1).getFilePath(), "/data/file3.parquet");
    // The second match (file3) hits the limit, so file4 must still be unconsumed
    assertTrue(remaining.hasNext());
    assertEquals(remaining.next().getName(), "data/file4.parquet");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -258,6 +259,60 @@ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
return listedFiles;
}

/**
 * Lists files under {@code fileUri} whose GCS URI string is accepted by {@code pathFilter}, returning at most
 * {@code maxResults} entries.
 *
 * <p>The blob whose name equals the listing prefix and blobs whose name ends with {@code GcsUri.DELIMITER}
 * (directory markers) are skipped and do not count toward the limit. Once the limit is reached no further page
 * is requested from the storage client.
 *
 * @param fileUri location to list
 * @param recursive when {@code false}, the listing is restricted with {@code BlobListOption.currentDirectory()}
 * @param pathFilter predicate applied to each blob's GCS URI string; only accepted paths are returned
 * @param maxResults maximum number of entries to return; a value {@code <= 0} yields an empty list
 * @return metadata of the accepted files in listing order, never more than {@code maxResults} entries
 * @throws IOException wrapping any exception raised while listing pages or evaluating the filter
 */
@Override
public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean recursive,
    final Predicate<String> pathFilter, final int maxResults)
    throws IOException {
  if (maxResults <= 0) {
    LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning empty list", maxResults);
    return new ArrayList<>();
  }
  final List<FileMetadata> result = new ArrayList<>();
  final GcsUri gcsFileUri = new GcsUri(fileUri);
  final String prefix = gcsFileUri.getPrefix();
  final String bucketName = gcsFileUri.getBucketName();
  try {
    Page<Blob> page;
    if (recursive) {
      page = _storage.list(bucketName, Storage.BlobListOption.prefix(prefix));
    } else {
      page = _storage.list(bucketName, Storage.BlobListOption.prefix(prefix),
          Storage.BlobListOption.currentDirectory());
    }
    while (page != null) {
      for (final Blob blob : page.getValues()) {
        final String blobName = blob.getName();
        // Skip the listed prefix itself and directory markers
        if (blobName.equals(prefix) || blobName.endsWith(GcsUri.DELIMITER)) {
          continue;
        }
        final String filePath = GcsUri.createGcsUri(bucketName, blobName).toString();
        if (pathFilter.test(filePath)) {
          final FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
              .setFilePath(filePath)
              .setLength(blob.getSize())
              .setIsDirectory(false);
          if (blob.getUpdateTime() != null) {
            fileBuilder.setLastModifiedTime(blob.getUpdateTime());
          }
          result.add(fileBuilder.build());
          if (result.size() >= maxResults) {
            break;
          }
        }
      }
      // Check the limit BEFORE asking for the next page: getNextPage() issues another list request,
      // which would be wasted once enough results have been collected.
      if (result.size() >= maxResults || !page.hasNextPage()) {
        break;
      }
      page = page.getNextPage();
    }
  } catch (Exception t) {
    throw new IOException(t);
  }
  LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
      result.size(), maxResults, gcsFileUri, recursive);
  return result;
}

@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
Expand Down
Loading
Loading