diff --git a/src/Storage/Device.php b/src/Storage/Device.php
index 961339c1..705bd3a6 100644
--- a/src/Storage/Device.php
+++ b/src/Storage/Device.php
@@ -120,6 +120,19 @@ abstract public function abort(string $path, string $extra = ''): bool;
      */
     abstract public function read(string $path, int $offset = 0, ?int $length = null): string;
 
+    /**
+     * Read file as a stream, yielding chunks.
+     *
+     * The Local and S3 implementations are generators: nothing runs, and no
+     * error is raised, until the caller starts iterating.
+     *
+     * @param string $path File path to read
+     * @param int $offset Byte offset to start from (0 = beginning)
+     * @param int $length Total bytes to read (-1 = read to end, 0 = yield nothing)
+     * @return \Generator<int, string> Yields string chunks
+     */
+    abstract public function readStream(string $path, int $offset = 0, int $length = -1): \Generator;
+
     /**
      * Transfer
      * Transfer a file from current device to destination device.
diff --git a/src/Storage/Device/Local.php b/src/Storage/Device/Local.php
index 8236129e..a08e86cc 100644
--- a/src/Storage/Device/Local.php
+++ b/src/Storage/Device/Local.php
@@ -271,6 +271,65 @@ public function read(string $path, int $offset = 0, ?int $length = null): string
         return \file_get_contents($path, use_include_path: false, context: null, offset: $offset, length: $length);
     }
 
+    /**
+     * Read file as a stream, yielding chunks.
+     *
+     * Chunks are at most 2MB. The file handle is closed in a finally block, so
+     * it is released even when the consumer stops iterating early.
+     *
+     * @param string $path File path to read
+     * @param int $offset Byte offset to start from (0 = beginning)
+     * @param int $length Total bytes to read (negative = read to end, 0 = yield nothing)
+     * @return \Generator<int, string> Yields string chunks
+     *
+     * @throws NotFoundException When the file does not exist
+     * @throws Exception When the file cannot be opened or the offset cannot be reached
+     */
+    public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
+    {
+        if (! $this->exists($path)) {
+            throw new NotFoundException('File not found');
+        }
+
+        $handle = \fopen($path, 'rb');
+        if ($handle === false) {
+            throw new Exception('Failed to open file: '.$path);
+        }
+
+        try {
+            // fseek() returns -1 on failure; without this check a failed seek
+            // would silently stream from the start of the file instead.
+            if ($offset > 0 && \fseek($handle, $offset) !== 0) {
+                throw new Exception('Failed to seek to offset '.$offset.' in file: '.$path);
+            }
+
+            $chunkSize = 2 * 1024 * 1024; // 2MB
+            $bytesRead = 0;
+
+            while (! \feof($handle)) {
+                if ($length >= 0) {
+                    $remaining = $length - $bytesRead;
+                    if ($remaining <= 0) {
+                        break;
+                    }
+                    $readSize = \min($chunkSize, $remaining);
+                } else {
+                    $readSize = $chunkSize;
+                }
+
+                $chunk = \fread($handle, $readSize);
+                if ($chunk === false || $chunk === '') {
+                    break;
+                }
+
+                $bytesRead += \strlen($chunk);
+                yield $chunk;
+            }
+        } finally {
+            \fclose($handle);
+        }
+    }
+
     /**
      * Write file by given path.
      */
diff --git a/src/Storage/Device/S3.php b/src/Storage/Device/S3.php
index 50558f60..c395bde9 100644
--- a/src/Storage/Device/S3.php
+++ b/src/Storage/Device/S3.php
@@ -350,6 +350,149 @@ public function read(string $path, int $offset = 0, ?int $length = null): string
         return $response->body;
     }
 
+    /**
+     * Read file as a stream, yielding chunks.
+     *
+     * Makes a single HTTP request and yields data as it arrives from the network,
+     * in chunks of at most 2MB.
+     *
+     * @param string $path File path to read
+     * @param int $offset Byte offset to start from (0 = beginning)
+     * @param int $length Total bytes to read (negative = read to end, 0 = yield nothing)
+     * @return \Generator<int, string> Yields string chunks
+     *
+     * @throws \Exception When the transfer itself fails; HTTP error responses are handed to parseAndThrowS3Error()
+     */
+    public function readStream(string $path, int $offset = 0, int $length = -1): \Generator
+    {
+        // A zero-length read yields nothing, like the Local device. Without this
+        // guard no Range header would be sent and the whole object would stream.
+        if ($length === 0) {
+            return;
+        }
+
+        $startTime = microtime(true);
+
+        unset($this->amzHeaders['x-amz-acl']);
+        unset($this->amzHeaders['x-amz-content-sha256']);
+        unset($this->headers['content-type']);
+        $this->headers['content-md5'] = \base64_encode(md5('', true));
+
+        $uri = ($path !== '') ? '/'.\str_replace('%2F', '/', \rawurlencode($path)) : '/';
+
+        if ($length > 0) {
+            $end = $offset + $length - 1;
+            $this->headers['range'] = "bytes=$offset-$end";
+        } elseif ($offset > 0) {
+            $this->headers['range'] = "bytes=$offset-";
+        } else {
+            unset($this->headers['range']);
+        }
+
+        $uri = $this->getAbsolutePath($uri);
+        $url = $this->fqdn.$uri.'?'.\http_build_query([], '', '&', PHP_QUERY_RFC3986);
+
+        $buffer = '';
+        $chunkSize = 2 * 1024 * 1024; // 2MB
+
+        $curl = \curl_init();
+        \curl_setopt($curl, CURLOPT_USERAGENT, 'utopia-php/storage');
+        \curl_setopt($curl, CURLOPT_URL, $url);
+
+        $httpHeaders = [];
+        $this->amzHeaders['x-amz-date'] = \gmdate('Ymd\THis\Z');
+        $this->amzHeaders['x-amz-content-sha256'] = \hash('sha256', '');
+
+        foreach ($this->amzHeaders as $header => $value) {
+            if (\strlen($value) > 0) {
+                $httpHeaders[] = $header.': '.$value;
+            }
+        }
+
+        $this->headers['date'] = \gmdate('D, d M Y H:i:s T');
+
+        foreach ($this->headers as $header => $value) {
+            if (\strlen($value) > 0) {
+                $httpHeaders[] = $header.': '.$value;
+            }
+        }
+
+        $httpHeaders[] = 'Authorization: '.$this->getSignatureV4(self::METHOD_GET, $uri);
+
+        \curl_setopt($curl, CURLOPT_HTTPHEADER, $httpHeaders);
+        \curl_setopt($curl, CURLOPT_HEADER, false);
+        \curl_setopt($curl, CURLOPT_RETURNTRANSFER, false);
+        \curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true);
+        \curl_setopt($curl, CURLOPT_CUSTOMREQUEST, self::METHOD_GET);
+
+        if ($this->curlHttpVersion != null) {
+            \curl_setopt($curl, CURLOPT_HTTP_VERSION, $this->curlHttpVersion);
+        }
+
+        \curl_setopt($curl, CURLOPT_WRITEFUNCTION, static function ($curl, string $data) use (&$buffer): int {
+            $buffer .= $data;
+
+            return \strlen($data);
+        });
+
+        $mh = \curl_multi_init();
+        \curl_multi_add_handle($mh, $curl);
+
+        try {
+            do {
+                $status = \curl_multi_exec($mh, $active);
+
+                // Only hand out chunks while the response is not an HTTP error, so
+                // an error body stays buffered for parseAndThrowS3Error() below.
+                while (\strlen($buffer) >= $chunkSize && \curl_getinfo($curl, CURLINFO_HTTP_CODE) < 400) {
+                    yield \substr($buffer, 0, $chunkSize);
+                    $buffer = \substr($buffer, $chunkSize);
+                }
+
+                if ($active) {
+                    \curl_multi_select($mh, 1.0);
+                }
+            } while ($active && $status === CURLM_OK);
+
+            if ($status !== CURLM_OK) {
+                throw new \Exception('Failed to read stream: '.\curl_multi_strerror($status));
+            }
+
+            // With the multi interface the per-transfer result is only reported by
+            // curl_multi_info_read(); without this check a connection failure or a
+            // truncated download would look like a (shorter) successful read.
+            while (($info = \curl_multi_info_read($mh)) !== false) {
+                if ($info['result'] !== CURLE_OK) {
+                    throw new \Exception('Failed to read stream: '.\curl_strerror($info['result']));
+                }
+            }
+
+            $code = \curl_getinfo($curl, CURLINFO_HTTP_CODE);
+
+            if ($code >= 400) {
+                $this->parseAndThrowS3Error($buffer, $code);
+            }
+
+            if (\strlen($buffer) > 0) {
+                yield $buffer;
+                $buffer = '';
+            }
+        } finally {
+            \curl_multi_remove_handle($mh, $curl);
+            \curl_close($curl);
+            \curl_multi_close($mh);
+
+            $this->storageOperationTelemetry->record(
+                microtime(true) - $startTime,
+                [
+                    'storage' => $this->getType(),
+                    'operation' => 's3:readStream',
+                    'attempts' => 0,
+                ]
+            );
+        }
+    }
+
     /**
      * Write file by given path.
      *
diff --git a/tests/Storage/Device/LocalTest.php b/tests/Storage/Device/LocalTest.php
index 5d1f0f9a..b90a074d 100644
--- a/tests/Storage/Device/LocalTest.php
+++ b/tests/Storage/Device/LocalTest.php
@@ -429,6 +429,157 @@ public function testNestedDeletePath()
         $this->assertFalse($this->object->exists($dir));
     }
 
+    // -------------------------------------------------------------------------
+    // readStream tests
+    // -------------------------------------------------------------------------
+
+    public function testReadStream()
+    {
+        $path = $this->object->getPath('text-for-stream.txt');
+        $this->object->write($path, 'Hello World');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertCount(1, $chunks);
+        $this->assertEquals('Hello World', $chunks[0]);
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithOffset()
+    {
+        $path = $this->object->getPath('text-for-stream-offset.txt');
+        $this->object->write($path, 'Hello World');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 6) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('World', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithLength()
+    {
+        $path = $this->object->getPath('text-for-stream-length.txt');
+        $this->object->write($path, 'Hello World');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 0, length: 5) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('Hello', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithOffsetAndLength()
+    {
+        $path = $this->object->getPath('text-for-stream-off-len.txt');
+        $this->object->write($path, 'Hello World');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 3, length: 5) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('lo Wo', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithZeroLength()
+    {
+        $path = $this->object->getPath('text-for-stream-zero.txt');
+        $this->object->write($path, 'Hello World');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 0, length: 0) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertCount(0, $chunks);
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamNonExistentFile()
+    {
+        $this->expectException(NotFoundException::class);
+        // Must iterate to trigger the generator
+        foreach ($this->object->readStream($this->object->getPath('non-existent-stream.txt')) as $chunk) {
+            // should not reach here
+        }
+    }
+
+    public function testReadStreamLargeFile()
+    {
+        $path = $this->object->getPath('large-stream-test.bin');
+        // Create a 5MB file
+        $size = 5 * 1024 * 1024;
+        $data = str_repeat('A', $size);
+        $this->object->write($path, $data);
+
+        $totalRead = 0;
+        $chunkCount = 0;
+        foreach ($this->object->readStream($path) as $chunk) {
+            $totalRead += strlen($chunk);
+            $chunkCount++;
+            // Each chunk should be <= 2MB
+            $this->assertLessThanOrEqual(2 * 1024 * 1024, strlen($chunk));
+        }
+
+        $this->assertEquals($size, $totalRead);
+        // 5MB file with 2MB chunks = 3 chunks
+        $this->assertEquals(3, $chunkCount);
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamMatchesRead()
+    {
+        $source = __DIR__.'/../../resources/disk-a/kitten-1.jpg';
+        $path = $this->object->getPath('kitten-stream-test.jpg');
+        copy($source, $path);
+
+        $readContent = $this->object->read($path);
+
+        $streamContent = '';
+        foreach ($this->object->readStream($path) as $chunk) {
+            $streamContent .= $chunk;
+        }
+
+        $this->assertEquals($readContent, $streamContent);
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamPartialMatchesRead()
+    {
+        $source = __DIR__.'/../../resources/disk-a/kitten-1.jpg';
+        $path = $this->object->getPath('kitten-stream-partial.jpg');
+        copy($source, $path);
+
+        $offset = 1000;
+        $length = 5000;
+        $readContent = $this->object->read($path, $offset, $length);
+
+        $streamContent = '';
+        foreach ($this->object->readStream($path, offset: $offset, length: $length) as $chunk) {
+            $streamContent .= $chunk;
+        }
+
+        $this->assertEquals($readContent, $streamContent);
+
+        $this->object->delete($path);
+    }
+
     // -------------------------------------------------------------------------
     // joinChunks tests
     // -------------------------------------------------------------------------
diff --git a/tests/Storage/S3Base.php b/tests/Storage/S3Base.php
index 5b428606..b0efed44 100644
--- a/tests/Storage/S3Base.php
+++ b/tests/Storage/S3Base.php
@@ -375,6 +375,24 @@ public function testPartRead($path)
         $this->assertEquals($chunk, $readChunk);
     }
 
+    /**
+     * @depends testPartUpload
+     */
+    public function testReadStreamLargeFile($path)
+    {
+        $source = __DIR__.'/../resources/disk-a/large_file.mp4';
+        $expectedSize = \filesize($source);
+
+        $totalRead = 0;
+        foreach ($this->object->readStream($path) as $chunk) {
+            $totalRead += strlen($chunk);
+            // Each chunk should be <= 2MB
+            $this->assertLessThanOrEqual(2 * 1024 * 1024, strlen($chunk));
+        }
+
+        $this->assertEquals($expectedSize, $totalRead);
+    }
+
     /**
      * @depends testPartUpload
      */
@@ -412,6 +430,111 @@ public function testTransferSmall()
         $device->delete($destination);
     }
 
+    public function testReadStream()
+    {
+        $path = $this->object->getPath('text-for-stream.txt');
+        $this->object->write($path, 'Hello World', 'text/plain');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('Hello World', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithOffset()
+    {
+        $path = $this->object->getPath('text-for-stream-offset.txt');
+        $this->object->write($path, 'Hello World', 'text/plain');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 6) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('World', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithLength()
+    {
+        $path = $this->object->getPath('text-for-stream-length.txt');
+        $this->object->write($path, 'Hello World', 'text/plain');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 0, length: 5) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('Hello', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithOffsetAndLength()
+    {
+        $path = $this->object->getPath('text-for-stream-off-len.txt');
+        $this->object->write($path, 'Hello World', 'text/plain');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 3, length: 5) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertEquals('lo Wo', implode('', $chunks));
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamWithZeroLength()
+    {
+        $path = $this->object->getPath('text-for-stream-zero.txt');
+        $this->object->write($path, 'Hello World', 'text/plain');
+
+        $chunks = [];
+        foreach ($this->object->readStream($path, offset: 0, length: 0) as $chunk) {
+            $chunks[] = $chunk;
+        }
+
+        $this->assertCount(0, $chunks);
+
+        $this->object->delete($path);
+    }
+
+    public function testReadStreamMatchesRead()
+    {
+        $path = $this->object->getPath('testing/kitten-1.jpg');
+
+        $readContent = $this->object->read($path);
+
+        $streamContent = '';
+        foreach ($this->object->readStream($path) as $chunk) {
+            $streamContent .= $chunk;
+        }
+
+        $this->assertEquals($readContent, $streamContent);
+    }
+
+    public function testReadStreamPartialMatchesRead()
+    {
+        $path = $this->object->getPath('testing/kitten-1.jpg');
+
+        $offset = 1000;
+        $length = 5000;
+        $readContent = $this->object->read($path, $offset, $length);
+
+        $streamContent = '';
+        foreach ($this->object->readStream($path, offset: $offset, length: $length) as $chunk) {
+            $streamContent .= $chunk;
+        }
+
+        $this->assertEquals($readContent, $streamContent);
+    }
+
     public function testTransferNonExistentFile()
     {
         $device = new Local(__DIR__.'/../resources/disk-a');