
feat: implement readStream method for streaming file reads #161

Open
lohanidamodar wants to merge 2 commits into main from feat-streaming

Conversation

@lohanidamodar
Contributor

@lohanidamodar lohanidamodar commented Mar 12, 2026

feat: implement readStream method for streaming file reads in Local and S3 devices with tests

Summary by CodeRabbit

  • New Features

    • Streaming read support added to storage drivers for efficient large-file transfers.
    • Supports offset and length parameters for partial reads.
    • Implemented for both local and cloud storage backends.
  • Tests

    • Comprehensive tests covering streaming reads, offsets/lengths, non-existent files, and large-file chunking/consistency.

@coderabbitai

coderabbitai bot commented Mar 12, 2026

Walkthrough

Adds streaming read support to the storage layer by introducing an abstract method readStream(string $path, int $offset = 0, int $length = -1): \Generator on the Device base class. Implements readStream in the Local driver (binary file open, seek, yield ~2MB chunks, handle offset/length, close handle, throw NotFoundException for missing files). Implements readStream in the S3 driver (single HTTP request with Range support, AWS Signature V4 headers, stream via curl multi, yield ~2MB chunks, record telemetry). Adds comprehensive tests for Local and S3 covering offsets, lengths, large files, errors, and parity with read().
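The Local driver's flow described above (open, seek, yield ~2MB chunks, honor offset/length, close the handle) can be sketched as a PHP generator. This is a hypothetical simplification based on the walkthrough, not the actual utopia-php/storage code; in particular, the real implementation throws NotFoundException rather than RuntimeException.

```php
<?php

// Hypothetical sketch of the Local driver's readStream() described in the
// walkthrough. The exception class and exact chunking details are
// assumptions, not the actual utopia-php/storage implementation.
function readStreamSketch(string $path, int $offset = 0, int $length = -1): \Generator
{
    if (!\is_file($path)) {
        throw new \RuntimeException('File not found: '.$path);
    }

    $handle = \fopen($path, 'rb');
    $chunkSize = 2 * 1024 * 1024; // ~2MB per chunk

    try {
        if ($offset > 0) {
            \fseek($handle, $offset);
        }
        $remaining = $length; // -1 means read to end of file
        while (!\feof($handle)) {
            $toRead = ($remaining === -1) ? $chunkSize : \min($chunkSize, $remaining);
            if ($toRead <= 0) {
                break;
            }
            $chunk = \fread($handle, $toRead);
            if ($chunk === false || $chunk === '') {
                break;
            }
            if ($remaining !== -1) {
                $remaining -= \strlen($chunk);
            }
            yield $chunk;
        }
    } finally {
        \fclose($handle); // handle is closed even if the consumer stops early
    }
}

// Usage: partial read of 5 bytes starting at offset 6.
$tmp = \tempnam(\sys_get_temp_dir(), 'rs');
\file_put_contents($tmp, 'hello world');
$out = '';
foreach (readStreamSketch($tmp, 6, 5) as $chunk) {
    $out .= $chunk;
}
\unlink($tmp);
// $out === 'world'
```

The try/finally around the loop is what keeps the handle from leaking when a consumer breaks out of the foreach before the file is exhausted.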

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 36.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The pull request title 'feat: implement readStream method for streaming file reads' clearly and concisely describes the main change: adding a readStream method for streaming file reads across storage devices.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
tests/Storage/S3Base.php (1)

415-521: Good test coverage, but missing non-existent file test.

The tests comprehensively cover offset/length combinations and verify consistency with read(). Consider adding a test for NotFoundException when reading a non-existent file, similar to testReadStreamNonExistentFile in LocalTest.php:

🧪 Suggested additional test
public function testReadStreamNonExistentFile()
{
    $this->expectException(NotFoundException::class);
    foreach ($this->object->readStream($this->object->getPath('non-existent-file.txt')) as $chunk) {
        // should not reach here
    }
}
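The foreach in the suggested test (rather than a bare call) is required because PHP generators are lazy: the body of readStream, including the existence check that throws NotFoundException, does not run until the generator is first iterated. A minimal self-contained illustration of this behavior:

```php
<?php

// Demonstrates generator laziness: the body (and its throw) only runs
// once iteration starts, not when the generator function is called.
function failingStream(): \Generator
{
    throw new \RuntimeException('not found'); // executes on first iteration
    yield 'never reached';
}

$gen = failingStream(); // no exception yet: the body has not started

$threw = false;
try {
    foreach ($gen as $chunk) {
        // never reached
    }
} catch (\RuntimeException $e) {
    $threw = true;
}

var_dump($threw); // bool(true)
```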
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/Storage/S3Base.php` around lines 415 - 521, Add a new test that asserts
readStream() throws NotFoundException for missing files: in
tests/Storage/S3Base.php create a test method (eg.
testReadStreamNonExistentFile) that calls
$this->object->readStream($this->object->getPath('non-existent-file.txt'))
inside a foreach and uses $this->expectException(NotFoundException::class)
before iterating. Reference the existing readStream tests (e.g., testReadStream,
testReadStreamWithOffset) to place the test near them so behavior parity is
verified across adapters.
src/Storage/Device/Local.php (1)

288-291: Consider validating fseek result for robustness.

fseek() returns -1 on failure (e.g., on non-seekable streams). While this is unlikely for regular files opened with fopen, defensive checking improves reliability.

🔧 Optional improvement
         try {
             if ($offset > 0) {
-                \fseek($handle, $offset);
+                if (\fseek($handle, $offset) === -1) {
+                    throw new Exception('Failed to seek to offset '.$offset.' in file: '.$path);
+                }
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device/Local.php` around lines 288 - 291, The fseek call on
$handle isn't checked for failure; validate its return value (fseek($handle,
$offset) !== 0) and handle errors gracefully in the method that contains this
code (e.g., the Local storage read/stream function) by throwing a meaningful
exception or returning an error result when fseek fails; reference the $handle
and fseek calls and ensure the failure path closes the handle if opened and
provides a clear error message so non-seekable streams are handled robustly.
tests/Storage/Device/LocalTest.php (1)

497-504: Static analysis flags unused $chunk variable.

The static analysis tool correctly identifies that $chunk is unused. While the intent is clear (triggering the generator), consider using iterator_to_array() for clarity:

🔧 Alternative to silence warning
     public function testReadStreamNonExistentFile()
     {
         $this->expectException(NotFoundException::class);
-        // Must iterate to trigger the generator
-        foreach ($this->object->readStream($this->object->getPath('non-existent-stream.txt')) as $chunk) {
-            // should not reach here
-        }
+        // Must iterate to trigger the generator; iterator_to_array forces consumption
+        iterator_to_array($this->object->readStream($this->object->getPath('non-existent-stream.txt')));
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/Storage/Device/LocalTest.php` around lines 497 - 504, The test method
testReadStreamNonExistentFile triggers a generator but the foreach declares an
unused $chunk which static analysis flags; replace the foreach with a call that
consumes the generator without assigning (e.g., call iterator_to_array on
$this->object->readStream($this->object->getPath('non-existent-stream.txt'))) or
iterate using an unnamed/placeholder variable to silence the warning, ensuring
you still call readStream and getPath so the generator is executed and the
NotFoundException is thrown.
src/Storage/Device/S3.php (1)

467-474: Telemetry lacks retry count tracking (inconsistent with call()).

The call() method tracks and records actual retry attempts, but readStream hardcodes 'attempts' => 0. Since readStream doesn't implement retry logic for transient errors (unlike call()), consider either:

  1. Adding retry support to readStream for consistency
  2. Documenting that streaming operations don't retry

This is acceptable for an initial implementation but worth noting for future improvements.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device/S3.php` around lines 467 - 474, The telemetry call in
readStream hardcodes 'attempts' => 0 while call() tracks retries; update the
readStream implementation (method readStream in S3.php) to implement the same
retry loop/attempt counting used by call() (increment an attempts counter on
each retry, catch transient S3 exceptions, retry with backoff) and pass the
attempts variable into storageOperationTelemetry->record instead of 0;
alternatively, if you choose not to add retries now, update the
storageOperationTelemetry->record call in readStream to include a documented
flag like 'attempts' => null or 'retries_supported' => false and add a short
comment in readStream explaining streaming operations do not retry.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2752e79f-be9a-42af-ab83-21a017ebe328

📥 Commits

Reviewing files that changed from the base of the PR and between b167d58 and 0ace6b9.

📒 Files selected for processing (5)
  • src/Storage/Device.php
  • src/Storage/Device/Local.php
  • src/Storage/Device/S3.php
  • tests/Storage/Device/LocalTest.php
  • tests/Storage/S3Base.php

Comment on lines +123 to +131
/**
 * Read file as a stream, yielding chunks.
 *
 * @param string $path File path to read
 * @param int $offset Byte offset to start from (0 = beginning)
 * @param int $length Total bytes to read (-1 = read to end)
 *
 * @return \Generator<string> Yields string chunks
 */
abstract public function readStream(string $path, int $offset = 0, int $length = -1): \Generator;

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify Telemetry class exists and lacks readStream
cat src/Storage/Device/Telemetry.php | grep -E "(class Telemetry|function readStream)"

Repository: utopia-php/storage

Length of output: 92


🏁 Script executed:

#!/bin/bash
# Read full Telemetry.php to understand its structure
echo "=== TELEMETRY CLASS ==="
cat -n src/Storage/Device/Telemetry.php

echo -e "\n=== FIND ALL DEVICE SUBCLASSES ==="
rg "class \w+ extends Device" --type php -A 1

echo -e "\n=== VERIFY ABSTRACT readStream IN DEVICE ==="
grep -n "abstract.*readStream" src/Storage/Device.php

Repository: utopia-php/storage

Length of output: 5551


🏁 Script executed:

#!/bin/bash
# Check if S3 and Local implement readStream
echo "=== S3 readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/S3.php -A 5

echo -e "\n=== LOCAL readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/Local.php -A 5

echo -e "\n=== DEVICE MAGIC METHODS ==="
rg "__call|__get" src/Storage/Device.php

echo -e "\n=== TELEMETRY INSTANTIATION IN CODEBASE ==="
rg "new Telemetry" --type php

Repository: utopia-php/storage

Length of output: 803


Critical: Telemetry class will fail at runtime due to missing readStream implementation.

The Telemetry class (in src/Storage/Device/Telemetry.php) extends Device but does not implement the new abstract readStream method. Since Telemetry is a concrete class, PHP will throw a fatal error when the class is loaded or instantiated.

Both S3 and Local (the other concrete Device subclasses) implement readStream. Add the missing implementation to Telemetry:

public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
{
    // Note: Cannot use measure() directly with generators.
    // Telemetry must yield from the underlying device's generator.
    yield from $this->underlying->readStream($path, $offset, $length);
}

Note: Standard telemetry wrapping via measure() won't work for generators since the method returns immediately. Consider whether telemetry timing is needed for streaming operations, or if this delegation suffices.
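One way to recover per-stream timing despite this limitation is to wrap the delegated generator and record the duration in a finally block, which runs once the consumer finishes (or abandons) the stream. The sketch below is hypothetical: the StreamTimer class and its callables are illustrative, not the library's Telemetry API.

```php
<?php

// Hypothetical sketch: timing a streamed read by wrapping the delegated
// generator. A measure()-style wrapper around the method call would return
// immediately, before any bytes flow, so timing must live inside the
// generator itself.
class StreamTimer
{
    /**
     * @param callable(): \Generator $makeStream produces the underlying stream
     * @param callable(float): void  $record     receives the elapsed seconds
     */
    public function timedStream(callable $makeStream, callable $record): \Generator
    {
        $start = \microtime(true);
        try {
            yield from $makeStream();
        } finally {
            // Runs when the stream is fully consumed or the consumer stops early.
            $record(\microtime(true) - $start);
        }
    }
}

// Usage sketch with a trivial two-chunk stream:
$timer = new StreamTimer();
$elapsed = null;
$makeStream = function (): \Generator {
    yield 'abc';
    yield 'def';
};
$bytes = 0;
foreach ($timer->timedStream($makeStream, function (float $d) use (&$elapsed) {
    $elapsed = $d;
}) as $chunk) {
    $bytes += \strlen($chunk);
}
// $elapsed now holds the wall-clock time the consumer spent on the stream.
```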

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device.php` around lines 123 - 131, Telemetry class is missing
implementation of the abstract readStream method from Device, causing a fatal
error; implement public function readStream(string $path, int $offset = 0, int
$length = -1): \Generator in the Telemetry class and delegate to the wrapped
device by yielding from $this->underlying->readStream($path, $offset, $length);
do not call measure() around the generator (it returns immediately), so simply
forward the generator from the underlying device to satisfy the abstract
signature and preserve streaming behavior.

Comment on lines +452 to +460
$code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);

if ($code >= 400) {
    $this->parseAndThrowS3Error($buffer, $code);
}

if (\strlen($buffer) > 0) {
    yield $buffer;
    $buffer = '';

⚠️ Potential issue | 🟠 Major

Error response may be partially yielded before detection.

The HTTP status code check occurs after the streaming loop completes. If S3 returns an error response (e.g., 404), the error body may be buffered and yielded as a "chunk" before parseAndThrowS3Error is called. This could cause consumers to process error XML as file content.

Consider checking for error status earlier, or at minimum, ensure the final buffer isn't yielded when an error is detected:

🔧 Suggested fix
             $code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);

             if ($code >= 400) {
+                // Don't yield the error response body as content
                 $this->parseAndThrowS3Error($buffer, $code);
             }

-            if (\strlen($buffer) > 0) {
+            if ($code < 400 && \strlen($buffer) > 0) {
                 yield $buffer;
                 $buffer = '';
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device/S3.php` around lines 452 - 460, The code may yield an
error response body chunk before parseAndThrowS3Error runs; update the
streaming/flush logic in the S3 class so the HTTP status is validated before
yielding any buffered data: after retrieving $code via curl_getinfo($curl,
CURLINFO_HTTP_CODE) call parseAndThrowS3Error($buffer, $code) and only yield
$buffer when $code < 400 (or throw/return early when an error is detected).
Apply this change around the yield/flush section that references $buffer and
parseAndThrowS3Error so consumers never receive error XML as file content.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/Storage/S3Base.php`:
- Around line 381-394: testReadStreamLargeFile relies on an object uploaded by
sibling tests (e.g., testPartUpload/testTransferLarge) which those siblings may
delete (testTransferLarge removes the object), causing flaky failures; update
tests so each test uploads its own large fixture or centralize cleanup.
Specifically, modify testReadStreamLargeFile to upload the large_file.mp4
fixture before reading (call the existing upload helper or the part upload
routine used by testPartUpload) and then read/verify it, or alternatively remove
the deletion from testTransferLarge and perform deletions in tearDown/afterEach;
reference the test methods testReadStreamLargeFile, testTransferLarge, and
testPartUpload when making the change so the upload/cleanup behavior is
localized per-test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dfe6575f-c2ca-4fcb-ae38-33712bd5c251

📥 Commits

Reviewing files that changed from the base of the PR and between 0ace6b9 and db65d32.

📒 Files selected for processing (1)
  • tests/Storage/S3Base.php

Comment on lines +381 to +394
public function testReadStreamLargeFile($path)
{
    $source = __DIR__.'/../resources/disk-a/large_file.mp4';
    $expectedSize = \filesize($source);

    $totalRead = 0;
    foreach ($this->object->readStream($path) as $chunk) {
        $totalRead += strlen($chunk);
        // Each chunk should be <= 2MB
        $this->assertLessThanOrEqual(2 * 1024 * 1024, strlen($chunk));
    }

    $this->assertEquals($expectedSize, $totalRead);
}

⚠️ Potential issue | 🟠 Major

Avoid sharing testPartUpload() output across sibling dependents.

This test now consumes the same uploaded object as testTransferLarge(), but testTransferLarge() deletes it at Line 412. If PHPUnit ever changes dependent-test ordering, this becomes flaky because $path may already be gone before Line 387 runs. Upload the large fixture per test, or move cleanup out of sibling dependents.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/Storage/S3Base.php` around lines 381 - 394, testReadStreamLargeFile
relies on an object uploaded by sibling tests (e.g.,
testPartUpload/testTransferLarge) which those siblings may delete
(testTransferLarge removes the object), causing flaky failures; update tests so
each test uploads its own large fixture or centralize cleanup. Specifically,
modify testReadStreamLargeFile to upload the large_file.mp4 fixture before
reading (call the existing upload helper or the part upload routine used by
testPartUpload) and then read/verify it, or alternatively remove the deletion
from testTransferLarge and perform deletions in tearDown/afterEach; reference
the test methods testReadStreamLargeFile, testTransferLarge, and testPartUpload
when making the change so the upload/cleanup behavior is localized per-test.

@lohanidamodar lohanidamodar requested a review from loks0n March 12, 2026 03:35
