feat: implement readStream method for streaming file reads #161
feat: implement readStream method for streaming file reads #161lohanidamodar wants to merge 2 commits intomainfrom
Conversation
…nd S3 devices with tests
WalkthroughAdds streaming read support to the storage layer by introducing an abstract method Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (4)
tests/Storage/S3Base.php (1)
415-521: Good test coverage, but missing non-existent file test.The tests comprehensively cover offset/length combinations and verify consistency with
read(). Consider adding a test forNotFoundExceptionwhen reading a non-existent file, similar totestReadStreamNonExistentFileinLocalTest.php:🧪 Suggested additional test
public function testReadStreamNonExistentFile() { $this->expectException(NotFoundException::class); foreach ($this->object->readStream($this->object->getPath('non-existent-file.txt')) as $chunk) { // should not reach here } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/Storage/S3Base.php` around lines 415 - 521, Add a new test that asserts readStream() throws NotFoundException for missing files: in tests/Storage/S3Base.php create a test method (eg. testReadStreamNonExistentFile) that calls $this->object->readStream($this->object->getPath('non-existent-file.txt')) inside a foreach and uses $this->expectException(NotFoundException::class) before iterating. Reference the existing readStream tests (e.g., testReadStream, testReadStreamWithOffset) to place the test near them so behavior parity is verified across adapters.src/Storage/Device/Local.php (1)
288-291: Consider validatingfseekresult for robustness.
fseek()returns-1on failure (e.g., on non-seekable streams). While this is unlikely for regular files opened withfopen, defensive checking improves reliability.🔧 Optional improvement
try { if ($offset > 0) { - \fseek($handle, $offset); + if (\fseek($handle, $offset) === -1) { + throw new Exception('Failed to seek to offset '.$offset.' in file: '.$path); + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/Storage/Device/Local.php` around lines 288 - 291, The fseek call on $handle isn't checked for failure; validate its return value (fseek($handle, $offset) !== 0) and handle errors gracefully in the method that contains this code (e.g., the Local storage read/stream function) by throwing a meaningful exception or returning an error result when fseek fails; reference the $handle and fseek calls and ensure the failure path closes the handle if opened and provides a clear error message so non-seekable streams are handled robustly.tests/Storage/Device/LocalTest.php (1)
497-504: Static analysis flags unused$chunkvariable.The static analysis tool correctly identifies that
$chunkis unused. While the intent is clear (triggering the generator), consider usingiterator_to_array()for clarity:🔧 Alternative to silence warning
public function testReadStreamNonExistentFile() { $this->expectException(NotFoundException::class); - // Must iterate to trigger the generator - foreach ($this->object->readStream($this->object->getPath('non-existent-stream.txt')) as $chunk) { - // should not reach here - } + // Must iterate to trigger the generator; iterator_to_array forces consumption + iterator_to_array($this->object->readStream($this->object->getPath('non-existent-stream.txt'))); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/Storage/Device/LocalTest.php` around lines 497 - 504, The test method testReadStreamNonExistentFile triggers a generator but the foreach declares an unused $chunk which static analysis flags; replace the foreach with a call that consumes the generator without assigning (e.g., call iterator_to_array on $this->object->readStream($this->object->getPath('non-existent-stream.txt'))) or iterate using an unnamed/placeholder variable to silence the warning, ensuring you still call readStream and getPath so the generator is executed and the NotFoundException is thrown.src/Storage/Device/S3.php (1)
467-474: Telemetry lacks retry count tracking (inconsistent withcall()).The
call()method tracks and records actual retryattempts, butreadStreamhardcodes'attempts' => 0. SincereadStreamdoesn't implement retry logic for transient errors (unlikecall()), consider either:
- Adding retry support to
readStreamfor consistency- Documenting that streaming operations don't retry
This is acceptable for an initial implementation but worth noting for future improvements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/Storage/Device/S3.php` around lines 467 - 474, The telemetry call in readStream hardcodes 'attempts' => 0 while call() tracks retries; update the readStream implementation (method readStream in S3.php) to implement the same retry loop/attempt counting used by call() (increment an attempts counter on each retry, catch transient S3 exceptions, retry with backoff) and pass the attempts variable into storageOperationTelemetry->record instead of 0; alternatively, if you choose not to add retries now, update the storageOperationTelemetry->record call in readStream to include a documented flag like 'attempts' => null or 'retries_supported' => false and add a short comment in readStream explaining streaming operations do not retry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/Storage/Device.php`:
- Around line 123-131: Telemetry class is missing implementation of the abstract
readStream method from Device, causing a fatal error; implement public function
readStream(string $path, int $offset = 0, int $length = -1): \Generator in the
Telemetry class and delegate to the wrapped device by yielding from
$this->underlying->readStream($path, $offset, $length); do not call measure()
around the generator (it returns immediately), so simply forward the generator
from the underlying device to satisfy the abstract signature and preserve
streaming behavior.
In `@src/Storage/Device/S3.php`:
- Around line 452-460: The code may yield an error response body chunk before
parseAndThrowS3Error runs; update the streaming/flush logic in the S3 class so
the HTTP status is validated before yielding any buffered data: after retrieving
$code via curl_getinfo($curl, CURLINFO_HTTP_CODE) call
parseAndThrowS3Error($buffer, $code) and only yield $buffer when $code < 400 (or
throw/return early when an error is detected). Apply this change around the
yield/flush section that references $buffer and parseAndThrowS3Error so
consumers never receive error XML as file content.
---
Nitpick comments:
In `@src/Storage/Device/Local.php`:
- Around line 288-291: The fseek call on $handle isn't checked for failure;
validate its return value (fseek($handle, $offset) !== 0) and handle errors
gracefully in the method that contains this code (e.g., the Local storage
read/stream function) by throwing a meaningful exception or returning an error
result when fseek fails; reference the $handle and fseek calls and ensure the
failure path closes the handle if opened and provides a clear error message so
non-seekable streams are handled robustly.
In `@src/Storage/Device/S3.php`:
- Around line 467-474: The telemetry call in readStream hardcodes 'attempts' =>
0 while call() tracks retries; update the readStream implementation (method
readStream in S3.php) to implement the same retry loop/attempt counting used by
call() (increment an attempts counter on each retry, catch transient S3
exceptions, retry with backoff) and pass the attempts variable into
storageOperationTelemetry->record instead of 0; alternatively, if you choose not
to add retries now, update the storageOperationTelemetry->record call in
readStream to include a documented flag like 'attempts' => null or
'retries_supported' => false and add a short comment in readStream explaining
streaming operations do not retry.
In `@tests/Storage/Device/LocalTest.php`:
- Around line 497-504: The test method testReadStreamNonExistentFile triggers a
generator but the foreach declares an unused $chunk which static analysis flags;
replace the foreach with a call that consumes the generator without assigning
(e.g., call iterator_to_array on
$this->object->readStream($this->object->getPath('non-existent-stream.txt'))) or
iterate using an unnamed/placeholder variable to silence the warning, ensuring
you still call readStream and getPath so the generator is executed and the
NotFoundException is thrown.
In `@tests/Storage/S3Base.php`:
- Around line 415-521: Add a new test that asserts readStream() throws
NotFoundException for missing files: in tests/Storage/S3Base.php create a test
method (eg. testReadStreamNonExistentFile) that calls
$this->object->readStream($this->object->getPath('non-existent-file.txt'))
inside a foreach and uses $this->expectException(NotFoundException::class)
before iterating. Reference the existing readStream tests (e.g., testReadStream,
testReadStreamWithOffset) to place the test near them so behavior parity is
verified across adapters.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2752e79f-be9a-42af-ab83-21a017ebe328
📒 Files selected for processing (5)
src/Storage/Device.phpsrc/Storage/Device/Local.phpsrc/Storage/Device/S3.phptests/Storage/Device/LocalTest.phptests/Storage/S3Base.php
| /** | ||
| * Read file as a stream, yielding chunks. | ||
| * | ||
| * @param string $path File path to read | ||
| * @param int $offset Byte offset to start from (0 = beginning) | ||
| * @param int $length Total bytes to read (-1 = read to end) | ||
| * @return \Generator<string> Yields string chunks | ||
| */ | ||
| abstract public function readStream(string $path, int $offset = 0, int $length = -1): \Generator; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify Telemetry class exists and lacks readStream
cat src/Storage/Device/Telemetry.php | grep -E "(class Telemetry|function readStream)"Repository: utopia-php/storage
Length of output: 92
🏁 Script executed:
#!/bin/bash
# Read full Telemetry.php to understand its structure
echo "=== TELEMETRY CLASS ==="
cat -n src/Storage/Device/Telemetry.php
echo -e "\n=== FIND ALL DEVICE SUBCLASSES ==="
rg "class \w+ extends Device" --type php -A 1
echo -e "\n=== VERIFY ABSTRACT readStream IN DEVICE ==="
grep -n "abstract.*readStream" src/Storage/Device.phpRepository: utopia-php/storage
Length of output: 5551
🏁 Script executed:
#!/bin/bash
# Check if S3 and Local implement readStream
echo "=== S3 readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/S3.php -A 5
echo -e "\n=== LOCAL readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/Local.php -A 5
echo -e "\n=== DEVICE MAGIC METHODS ==="
rg "__call|__get" src/Storage/Device.php
echo -e "\n=== TELEMETRY INSTANTIATION IN CODEBASE ==="
rg "new Telemetry" --type phpRepository: utopia-php/storage
Length of output: 803
Critical: Telemetry class will fail at runtime due to missing readStream implementation.
The Telemetry class (in src/Storage/Device/Telemetry.php) extends Device but does not implement the new abstract readStream method. Since Telemetry is a concrete class, PHP will throw a fatal error when the class is loaded or instantiated.
Both S3 and Local (the other concrete Device subclasses) implement readStream. Add the missing implementation to Telemetry:
public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
{
// Note: Cannot use measure() directly with generators.
// Telemetry must yield from the underlying device's generator.
yield from $this->underlying->readStream($path, $offset, $length);
}Note: Standard telemetry wrapping via measure() won't work for generators since the method returns immediately. Consider whether telemetry timing is needed for streaming operations, or if this delegation suffices.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/Storage/Device.php` around lines 123 - 131, Telemetry class is missing
implementation of the abstract readStream method from Device, causing a fatal
error; implement public function readStream(string $path, int $offset = 0, int
$length = -1): \Generator in the Telemetry class and delegate to the wrapped
device by yielding from $this->underlying->readStream($path, $offset, $length);
do not call measure() around the generator (it returns immediately), so simply
forward the generator from the underlying device to satisfy the abstract
signature and preserve streaming behavior.
| $code = \curl_getinfo($curl, CURLINFO_HTTP_CODE); | ||
|
|
||
| if ($code >= 400) { | ||
| $this->parseAndThrowS3Error($buffer, $code); | ||
| } | ||
|
|
||
| if (\strlen($buffer) > 0) { | ||
| yield $buffer; | ||
| $buffer = ''; |
There was a problem hiding this comment.
Error response may be partially yielded before detection.
The HTTP status code check occurs after the streaming loop completes. If S3 returns an error response (e.g., 404), the error body may be buffered and yielded as a "chunk" before parseAndThrowS3Error is called. This could cause consumers to process error XML as file content.
Consider checking for error status earlier, or at minimum, ensure the final buffer isn't yielded when an error is detected:
🔧 Suggested fix
$code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);
if ($code >= 400) {
+ // Don't yield the error response body as content
$this->parseAndThrowS3Error($buffer, $code);
}
- if (\strlen($buffer) > 0) {
+ if ($code < 400 && \strlen($buffer) > 0) {
yield $buffer;
$buffer = '';
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/Storage/Device/S3.php` around lines 452 - 460, The code may yield an
error response body chunk before parseAndThrowS3Error runs; update the
streaming/flush logic in the S3 class so the HTTP status is validated before
yielding any buffered data: after retrieving $code via curl_getinfo($curl,
CURLINFO_HTTP_CODE) call parseAndThrowS3Error($buffer, $code) and only yield
$buffer when $code < 400 (or throw/return early when an error is detected).
Apply this change around the yield/flush section that references $buffer and
parseAndThrowS3Error so consumers never receive error XML as file content.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/Storage/S3Base.php`:
- Around line 381-394: testReadStreamLargeFile relies on an object uploaded by
sibling tests (e.g., testPartUpload/testTransferLarge) which those siblings may
delete (testTransferLarge removes the object), causing flaky failures; update
tests so each test uploads its own large fixture or centralize cleanup.
Specifically, modify testReadStreamLargeFile to upload the large_file.mp4
fixture before reading (call the existing upload helper or the part upload
routine used by testPartUpload) and then read/verify it, or alternatively remove
the deletion from testTransferLarge and perform deletions in tearDown/afterEach;
reference the test methods testReadStreamLargeFile, testTransferLarge, and
testPartUpload when making the change so the upload/cleanup behavior is
localized per-test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: dfe6575f-c2ca-4fcb-ae38-33712bd5c251
📒 Files selected for processing (1)
tests/Storage/S3Base.php
| public function testReadStreamLargeFile($path) | ||
| { | ||
| $source = __DIR__.'/../resources/disk-a/large_file.mp4'; | ||
| $expectedSize = \filesize($source); | ||
|
|
||
| $totalRead = 0; | ||
| foreach ($this->object->readStream($path) as $chunk) { | ||
| $totalRead += strlen($chunk); | ||
| // Each chunk should be <= 2MB | ||
| $this->assertLessThanOrEqual(2 * 1024 * 1024, strlen($chunk)); | ||
| } | ||
|
|
||
| $this->assertEquals($expectedSize, $totalRead); | ||
| } |
There was a problem hiding this comment.
Avoid sharing testPartUpload() output across sibling dependents.
This test now consumes the same uploaded object as testTransferLarge(), but testTransferLarge() deletes it at Line 412. If PHPUnit ever changes dependent-test ordering, this becomes flaky because $path may already be gone before Line 387 runs. Upload the large fixture per test, or move cleanup out of sibling dependents.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/Storage/S3Base.php` around lines 381 - 394, testReadStreamLargeFile
relies on an object uploaded by sibling tests (e.g.,
testPartUpload/testTransferLarge) which those siblings may delete
(testTransferLarge removes the object), causing flaky failures; update tests so
each test uploads its own large fixture or centralize cleanup. Specifically,
modify testReadStreamLargeFile to upload the large_file.mp4 fixture before
reading (call the existing upload helper or the part upload routine used by
testPartUpload) and then read/verify it, or alternatively remove the deletion
from testTransferLarge and perform deletions in tearDown/afterEach; reference
the test methods testReadStreamLargeFile, testTransferLarge, and testPartUpload
when making the change so the upload/cleanup behavior is localized per-test.
feat: implement readStream method for streaming file reads in Local and S3 devices with tests
Summary by CodeRabbit
New Features
Tests