Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Storage/Device.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ abstract public function abort(string $path, string $extra = ''): bool;
*/
abstract public function read(string $path, int $offset = 0, ?int $length = null): string;

/**
* Read file as a stream, yielding chunks.
*
* @param string $path File path to read
* @param int $offset Byte offset to start from (0 = beginning)
* @param int $length Total bytes to read (-1 = read to end)
* @return \Generator<string> Yields string chunks
*/
abstract public function readStream(string $path, int $offset = 0, int $length = -1): \Generator;
Comment on lines +123 to +131
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify Telemetry class exists and lacks readStream
cat src/Storage/Device/Telemetry.php | grep -E "(class Telemetry|function readStream)"

Repository: utopia-php/storage

Length of output: 92


🏁 Script executed:

#!/bin/bash
# Read full Telemetry.php to understand its structure
echo "=== TELEMETRY CLASS ==="
cat -n src/Storage/Device/Telemetry.php

echo -e "\n=== FIND ALL DEVICE SUBCLASSES ==="
rg "class \w+ extends Device" --type php -A 1

echo -e "\n=== VERIFY ABSTRACT readStream IN DEVICE ==="
grep -n "abstract.*readStream" src/Storage/Device.php

Repository: utopia-php/storage

Length of output: 5551


🏁 Script executed:

#!/bin/bash
# Check if S3 and Local implement readStream
echo "=== S3 readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/S3.php -A 5

echo -e "\n=== LOCAL readStream IMPLEMENTATION ==="
rg "readStream" src/Storage/Device/Local.php -A 5

echo -e "\n=== DEVICE MAGIC METHODS ==="
rg "__call|__get" src/Storage/Device.php

echo -e "\n=== TELEMETRY INSTANTIATION IN CODEBASE ==="
rg "new Telemetry" --type php

Repository: utopia-php/storage

Length of output: 803


Critical: Telemetry class will fail at runtime due to missing readStream implementation.

The Telemetry class (in src/Storage/Device/Telemetry.php) extends Device but does not implement the new abstract readStream method. Since Telemetry is a concrete class, PHP will throw a fatal error when the class is loaded or instantiated.

Both S3 and Local (the other concrete Device subclasses) implement readStream. Add the missing implementation to Telemetry:

public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
{
    // Note: Cannot use measure() directly with generators.
    // Telemetry must yield from the underlying device's generator.
    yield from $this->underlying->readStream($path, $offset, $length);
}

Note: Standard telemetry wrapping via measure() won't work for generators since the method returns immediately. Consider whether telemetry timing is needed for streaming operations, or if this delegation suffices.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device.php` around lines 123 - 131, Telemetry class is missing
implementation of the abstract readStream method from Device, causing a fatal
error; implement public function readStream(string $path, int $offset = 0, int
$length = -1): \Generator in the Telemetry class and delegate to the wrapped
device by yielding from $this->underlying->readStream($path, $offset, $length);
do not call measure() around the generator (it returns immediately), so simply
forward the generator from the underlying device to satisfy the abstract
signature and preserve streaming behavior.


/**
* Transfer
* Transfer a file from current device to destination device.
Expand Down
46 changes: 46 additions & 0 deletions src/Storage/Device/Local.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,52 @@ public function read(string $path, int $offset = 0, ?int $length = null): string
return \file_get_contents($path, use_include_path: false, context: null, offset: $offset, length: $length);
}

/**
* Read file as a stream, yielding chunks.
*/
public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
{
if (! $this->exists($path)) {
throw new NotFoundException('File not found');
}

$handle = \fopen($path, 'rb');
if ($handle === false) {
throw new Exception('Failed to open file: '.$path);
}

try {
if ($offset > 0) {
\fseek($handle, $offset);
}

$chunkSize = 2 * 1024 * 1024; // 2MB
$bytesRead = 0;

while (! \feof($handle)) {
if ($length >= 0) {
$remaining = $length - $bytesRead;
if ($remaining <= 0) {
break;
}
$readSize = \min($chunkSize, $remaining);
} else {
$readSize = $chunkSize;
}

$chunk = \fread($handle, $readSize);
if ($chunk === false || $chunk === '') {
break;
}

$bytesRead += \strlen($chunk);
yield $chunk;
}
} finally {
\fclose($handle);
}
}

/**
* Write file by given path.
*/
Expand Down
125 changes: 125 additions & 0 deletions src/Storage/Device/S3.php
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,131 @@ public function read(string $path, int $offset = 0, ?int $length = null): string
return $response->body;
}

/**
* Read file as a stream, yielding chunks.
*
* Makes a single HTTP request and yields data as it arrives from the network.
*/
public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
{
$startTime = microtime(true);

unset($this->amzHeaders['x-amz-acl']);
unset($this->amzHeaders['x-amz-content-sha256']);
unset($this->headers['content-type']);
$this->headers['content-md5'] = \base64_encode(md5('', true));

$uri = ($path !== '') ? '/'.\str_replace('%2F', '/', \rawurlencode($path)) : '/';

if ($length > 0) {
$end = $offset + $length - 1;
$this->headers['range'] = "bytes=$offset-$end";
} elseif ($offset > 0) {
$this->headers['range'] = "bytes=$offset-";
} else {
unset($this->headers['range']);
}

$uri = $this->getAbsolutePath($uri);
$url = $this->fqdn.$uri.'?'.\http_build_query([], '', '&', PHP_QUERY_RFC3986);

$buffer = '';
$chunkSize = 2 * 1024 * 1024; // 2MB

$curl = \curl_init();
\curl_setopt($curl, CURLOPT_USERAGENT, 'utopia-php/storage');
\curl_setopt($curl, CURLOPT_URL, $url);

$httpHeaders = [];
$this->amzHeaders['x-amz-date'] = \gmdate('Ymd\THis\Z');
$this->amzHeaders['x-amz-content-sha256'] = \hash('sha256', '');

foreach ($this->amzHeaders as $header => $value) {
if (\strlen($value) > 0) {
$httpHeaders[] = $header.': '.$value;
}
}

$this->headers['date'] = \gmdate('D, d M Y H:i:s T');

foreach ($this->headers as $header => $value) {
if (\strlen($value) > 0) {
$httpHeaders[] = $header.': '.$value;
}
}

$httpHeaders[] = 'Authorization: '.$this->getSignatureV4(self::METHOD_GET, $uri);

\curl_setopt($curl, CURLOPT_HTTPHEADER, $httpHeaders);
\curl_setopt($curl, CURLOPT_HEADER, false);
\curl_setopt($curl, CURLOPT_RETURNTRANSFER, false);
\curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true);
\curl_setopt($curl, CURLOPT_CUSTOMREQUEST, self::METHOD_GET);

if ($this->curlHttpVersion != null) {
\curl_setopt($curl, CURLOPT_HTTP_VERSION, $this->curlHttpVersion);
}

\curl_setopt($curl, CURLOPT_WRITEFUNCTION, function ($curl, string $data) use (&$buffer) {
$buffer .= $data;

return \strlen($data);
});

$responseHeaders = [];
\curl_setopt($curl, CURLOPT_HEADERFUNCTION, function ($curl, string $header) use (&$responseHeaders) {
$len = \strlen($header);
$parts = \explode(':', $header, 2);
if (\count($parts) >= 2) {
$responseHeaders[\strtolower(\trim($parts[0]))] = \trim($parts[1]);
}

return $len;
});

$mh = \curl_multi_init();
\curl_multi_add_handle($mh, $curl);

try {
do {
$status = \curl_multi_exec($mh, $active);

while (\strlen($buffer) >= $chunkSize) {
yield \substr($buffer, 0, $chunkSize);
$buffer = \substr($buffer, $chunkSize);
}

if ($active) {
\curl_multi_select($mh, 1.0);
}
} while ($active && $status === CURLM_OK);

$code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);

if ($code >= 400) {
$this->parseAndThrowS3Error($buffer, $code);
}

if (\strlen($buffer) > 0) {
yield $buffer;
$buffer = '';
Comment on lines +452 to +460
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Error response may be partially yielded before detection.

The HTTP status code check occurs after the streaming loop completes. If S3 returns an error response (e.g., 404), the error body may be buffered and yielded as a "chunk" before parseAndThrowS3Error is called. This could cause consumers to process error XML as file content.

Consider checking for error status earlier, or at minimum, ensure the final buffer isn't yielded when an error is detected:

🔧 Suggested fix
             $code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);

             if ($code >= 400) {
+                // Don't yield the error response body as content
                 $this->parseAndThrowS3Error($buffer, $code);
             }

-            if (\strlen($buffer) > 0) {
+            if ($code < 400 && \strlen($buffer) > 0) {
                 yield $buffer;
                 $buffer = '';
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Storage/Device/S3.php` around lines 452 - 460, The code may yield an
error response body chunk before parseAndThrowS3Error runs; update the
streaming/flush logic in the S3 class so the HTTP status is validated before
yielding any buffered data: after retrieving $code via curl_getinfo($curl,
CURLINFO_HTTP_CODE) call parseAndThrowS3Error($buffer, $code) and only yield
$buffer when $code < 400 (or throw/return early when an error is detected).
Apply this change around the yield/flush section that references $buffer and
parseAndThrowS3Error so consumers never receive error XML as file content.

}
} finally {
\curl_multi_remove_handle($mh, $curl);
\curl_close($curl);
\curl_multi_close($mh);

$this->storageOperationTelemetry->record(
microtime(true) - $startTime,
[
'storage' => $this->getType(),
'operation' => 's3:readStream',
'attempts' => 0,
]
);
}
}

/**
* Write file by given path.
*
Expand Down
136 changes: 136 additions & 0 deletions tests/Storage/Device/LocalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,142 @@ public function testNestedDeletePath()
$this->assertFalse($this->object->exists($dir));
}

// -------------------------------------------------------------------------
// readStream tests
// -------------------------------------------------------------------------

public function testReadStream()
{
$path = $this->object->getPath('text-for-stream.txt');
$this->object->write($path, 'Hello World');

$chunks = [];
foreach ($this->object->readStream($path) as $chunk) {
$chunks[] = $chunk;
}

$this->assertCount(1, $chunks);
$this->assertEquals('Hello World', $chunks[0]);

$this->object->delete($path);
}

public function testReadStreamWithOffset()
{
$path = $this->object->getPath('text-for-stream-offset.txt');
$this->object->write($path, 'Hello World');

$chunks = [];
foreach ($this->object->readStream($path, offset: 6) as $chunk) {
$chunks[] = $chunk;
}

$this->assertEquals('World', implode('', $chunks));

$this->object->delete($path);
}

public function testReadStreamWithLength()
{
$path = $this->object->getPath('text-for-stream-length.txt');
$this->object->write($path, 'Hello World');

$chunks = [];
foreach ($this->object->readStream($path, offset: 0, length: 5) as $chunk) {
$chunks[] = $chunk;
}

$this->assertEquals('Hello', implode('', $chunks));

$this->object->delete($path);
}

public function testReadStreamWithOffsetAndLength()
{
$path = $this->object->getPath('text-for-stream-off-len.txt');
$this->object->write($path, 'Hello World');

$chunks = [];
foreach ($this->object->readStream($path, offset: 3, length: 5) as $chunk) {
$chunks[] = $chunk;
}

$this->assertEquals('lo Wo', implode('', $chunks));

$this->object->delete($path);
}

public function testReadStreamNonExistentFile()
{
$this->expectException(NotFoundException::class);
// Must iterate to trigger the generator
foreach ($this->object->readStream($this->object->getPath('non-existent-stream.txt')) as $chunk) {
// should not reach here
}
}

public function testReadStreamLargeFile()
{
$path = $this->object->getPath('large-stream-test.bin');
// Create a 5MB file
$size = 5 * 1024 * 1024;
$data = str_repeat('A', $size);
$this->object->write($path, $data);

$totalRead = 0;
$chunkCount = 0;
foreach ($this->object->readStream($path) as $chunk) {
$totalRead += strlen($chunk);
$chunkCount++;
// Each chunk should be <= 2MB
$this->assertLessThanOrEqual(2 * 1024 * 1024, strlen($chunk));
}

$this->assertEquals($size, $totalRead);
// 5MB file with 2MB chunks = 3 chunks
$this->assertEquals(3, $chunkCount);

$this->object->delete($path);
}

public function testReadStreamMatchesRead()
{
$source = __DIR__.'/../../resources/disk-a/kitten-1.jpg';
$path = $this->object->getPath('kitten-stream-test.jpg');
copy($source, $path);

$readContent = $this->object->read($path);

$streamContent = '';
foreach ($this->object->readStream($path) as $chunk) {
$streamContent .= $chunk;
}

$this->assertEquals($readContent, $streamContent);

$this->object->delete($path);
}

public function testReadStreamPartialMatchesRead()
{
$source = __DIR__.'/../../resources/disk-a/kitten-1.jpg';
$path = $this->object->getPath('kitten-stream-partial.jpg');
copy($source, $path);

$offset = 1000;
$length = 5000;
$readContent = $this->object->read($path, $offset, $length);

$streamContent = '';
foreach ($this->object->readStream($path, offset: $offset, length: $length) as $chunk) {
$streamContent .= $chunk;
}

$this->assertEquals($readContent, $streamContent);

$this->object->delete($path);
}

// -------------------------------------------------------------------------
// joinChunks tests
// -------------------------------------------------------------------------
Expand Down
Loading
Loading