-
Notifications
You must be signed in to change notification settings - Fork 169
feat(experimental): integrate writes strategy and appendable object writer #1695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
90a60b8
0d9f834
6b7b4f3
7987c57
cf745fc
3dee18c
af9fc7a
fe3d210
ac07610
1303256
97e068e
1945a9b
eaac82c
99b20b7
359a23c
78088db
c59db11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,8 +21,7 @@ | |
| if you want to use these Rapid Storage APIs. | ||
|
|
||
| """ | ||
| from typing import Optional | ||
| from . import _utils | ||
| from typing import List, Optional, Tuple | ||
| from google.cloud import _storage_v2 | ||
| from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient | ||
| from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( | ||
|
|
@@ -60,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): | |
| same name already exists, it will be overwritten the moment | ||
| `writer.open()` is called. | ||
|
|
||
| :type write_handle: _storage_v2.BidiWriteHandle | ||
| :type write_handle: bytes | ||
| :param write_handle: (Optional) An existing handle for writing the object. | ||
| If provided, opening the bidi-gRPC connection will be faster. | ||
| """ | ||
|
|
@@ -71,7 +70,8 @@ def __init__( | |
| bucket_name: str, | ||
| object_name: str, | ||
| generation_number: Optional[int] = None, # None means new object | ||
| write_handle: Optional[_storage_v2.BidiWriteHandle] = None, | ||
| write_handle: Optional[bytes] = None, | ||
| routing_token: Optional[str] = None, | ||
| ) -> None: | ||
| if client is None: | ||
| raise ValueError("client must be provided") | ||
|
|
@@ -86,7 +86,8 @@ def __init__( | |
| generation_number=generation_number, | ||
| ) | ||
| self.client: AsyncGrpcClient.grpc_client = client | ||
| self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle | ||
| self.write_handle: Optional[bytes] = write_handle | ||
| self.routing_token: Optional[str] = routing_token | ||
|
|
||
| self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" | ||
|
|
||
|
|
@@ -101,7 +102,7 @@ def __init__( | |
| self.persisted_size = 0 | ||
| self.object_resource: Optional[_storage_v2.Object] = None | ||
|
|
||
| async def open(self) -> None: | ||
| async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: | ||
| """ | ||
| Opens the bidi-gRPC connection to write to the object. | ||
|
|
||
|
|
@@ -110,20 +111,19 @@ async def open(self) -> None: | |
|
|
||
| :rtype: None | ||
| :raises ValueError: If the stream is already open. | ||
| :raises google.api_core.exceptions.FailedPrecondition: | ||
| :raises google.api_core.exceptions.FailedPrecondition: | ||
| if `generation_number` is 0 and object already exists. | ||
| """ | ||
| if self._is_stream_open: | ||
| raise ValueError("Stream is already open") | ||
|
|
||
| write_handle = self.write_handle if self.write_handle else None | ||
|
|
||
| # Create a new object or overwrite existing one if generation_number | ||
| # is None. This makes it consistent with GCS JSON API behavior. | ||
| # Created object type would be Appendable Object. | ||
| # if `generation_number` == 0 new object will be created only if there | ||
| # isn't any existing object. | ||
| is_open_via_write_handle = ( | ||
| self.write_handle is not None and self.generation_number | ||
| ) | ||
| if self.generation_number is None or self.generation_number == 0: | ||
| self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( | ||
| write_object_spec=_storage_v2.WriteObjectSpec( | ||
|
|
@@ -140,44 +140,47 @@ async def open(self) -> None: | |
| bucket=self._full_bucket_name, | ||
| object=self.object_name, | ||
| generation=self.generation_number, | ||
| write_handle=self.write_handle, | ||
| write_handle=write_handle, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not this - |
||
| routing_token=self.routing_token if self.routing_token else None, | ||
| ), | ||
| ) | ||
|
|
||
| request_params = [f"bucket={self._full_bucket_name}"] | ||
| other_metadata = [] | ||
| if metadata: | ||
| for key, value in metadata: | ||
| if key == "x-goog-request-params": | ||
| request_params.append(value) | ||
| else: | ||
| other_metadata.append((key, value)) | ||
|
|
||
| current_metadata = other_metadata | ||
| current_metadata.append(("x-goog-request-params", ",".join(request_params))) | ||
|
Comment on lines
+157
to
+158
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not append Why create 2 more lists ? |
||
|
|
||
| self.socket_like_rpc = AsyncBidiRpc( | ||
| self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata | ||
| self.rpc, | ||
| initial_request=self.first_bidi_write_req, | ||
| metadata=current_metadata, | ||
| ) | ||
|
|
||
| await self.socket_like_rpc.open() # this is actually 1 send | ||
| response = await self.socket_like_rpc.recv() | ||
| self._is_stream_open = True | ||
| if is_open_via_write_handle: | ||
| # Don't use if not response.persisted_size because this will be true | ||
| # if persisted_size==0 (0 is considered "Falsy" in Python) | ||
| if response.persisted_size is None: | ||
| raise ValueError( | ||
| "Failed to obtain persisted_size after opening the stream via write_handle" | ||
| ) | ||
|
Comment on lines
-153
to
-159
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why this check is being removed ? |
||
|
|
||
| if response.persisted_size: | ||
| self.persisted_size = response.persisted_size | ||
| else: | ||
| if not response.resource: | ||
| raise ValueError( | ||
| "Failed to obtain object resource after opening the stream" | ||
| ) | ||
| if not response.resource.generation: | ||
| raise ValueError( | ||
| "Failed to obtain object generation after opening the stream" | ||
| ) | ||
|
|
||
| if response.resource: | ||
| if not response.resource.size: | ||
| # Appending to a 0 byte appendable object. | ||
| self.persisted_size = 0 | ||
| else: | ||
| self.persisted_size = response.resource.size | ||
|
|
||
| if not response.write_handle: | ||
| raise ValueError("Failed to obtain write_handle after opening the stream") | ||
| self.generation_number = response.resource.generation | ||
|
|
||
| self.generation_number = response.resource.generation | ||
| self.write_handle = response.write_handle | ||
| if response.write_handle: | ||
| self.write_handle = response.write_handle | ||
|
|
||
| async def close(self) -> None: | ||
| """Closes the bidi-gRPC connection.""" | ||
|
|
@@ -191,7 +194,7 @@ async def requests_done(self): | |
| """Signals that all requests have been sent.""" | ||
|
|
||
| await self.socket_like_rpc.send(None) | ||
| _utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv()) | ||
| await self.socket_like_rpc.recv() | ||
|
Comment on lines
-194
to
+197
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why this is removed ? |
||
|
|
||
| async def send( | ||
| self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest | ||
|
|
@@ -220,9 +223,17 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: | |
| if not self._is_stream_open: | ||
| raise ValueError("Stream is not open") | ||
| response = await self.socket_like_rpc.recv() | ||
| _utils.update_write_handle_if_exists(self, response) | ||
| # Update write_handle if present in response | ||
| if response: | ||
| if response.write_handle: | ||
| self.write_handle = response.write_handle | ||
| if response.persisted_size is not None: | ||
| self.persisted_size = response.persisted_size | ||
| if response.resource and response.resource.size: | ||
| self.persisted_size = response.resource.size | ||
| return response | ||
|
|
||
| @property | ||
| def is_stream_open(self) -> bool: | ||
| return self._is_stream_open | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need this change. write_handle shouldn't be bytes. it's of type
BidiWriteHandle.