From d0f49808e713499b69b45c0a1410d6e6ddb23e08 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 15 May 2026 09:01:14 +0000 Subject: [PATCH 1/5] feat(storage): implement App-centric Observability (ACO) OpenTelemetry tracing and lockless bucket metadata caching --- .../cloud/storage/_bucket_metadata_cache.py | 115 +++++++++++++ .../google/cloud/storage/_helpers.py | 49 ++++++ .../google/cloud/storage/_lru_cache.py | 89 ++++++++++ .../google/cloud/storage/blob.py | 49 +++--- .../google/cloud/storage/bucket.py | 45 +++-- .../google/cloud/storage/client.py | 15 ++ .../tests/unit/test__bucket_metadata_cache.py | 159 ++++++++++++++++++ .../tests/unit/test__lru_cache.py | 101 +++++++++++ 8 files changed, 574 insertions(+), 48 deletions(-) create mode 100644 packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py create mode 100644 packages/google-cloud-storage/google/cloud/storage/_lru_cache.py create mode 100644 packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py create mode 100644 packages/google-cloud-storage/tests/unit/test__lru_cache.py diff --git a/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py new file mode 100644 index 000000000000..72069445ee9b --- /dev/null +++ b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py @@ -0,0 +1,115 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO).""" + +import logging +import threading + +from google.api_core import exceptions as api_exceptions +from google.cloud.exceptions import NotFound +from google.cloud.storage._lru_cache import LRUCache + +logger = logging.getLogger(__name__) + + +class BucketMetadataCache: + """Thread-safe LRU cache for storing GCS bucket metadata (project number and location). + + Supports Singleflight asynchronous background fetching to prevent stampedes on cache misses. + """ + + def __init__(self, client, max_size=10000): + self._client = client + self._cache = LRUCache(max_size) + self._lock = threading.Lock() + self._inflight_fetches = set() + + def get_or_queue_fetch(self, bucket_name): + """Retrieve bucket metadata or queue a background fetch on cache miss. + + Returns None immediately on cache miss so caller does not block. + """ + with self._lock: + if bucket_name in self._cache: + return self._cache.get(bucket_name) + elif bucket_name in self._inflight_fetches: + # this would be the case of thundering herd, where 'n' threads + # all of them faced "cache miss" and 1 is in progress to fetch metadata. + # hence we don't want rest `n - 1` threads to make the same req + return None + else: + # fire a background thread and get bucket metadata. + self._inflight_fetches.add(bucket_name) + threading.Thread( + target=self._fetch_background, args=(bucket_name,), daemon=True + ).start() + return None + + def _fetch_background(self, bucket_name): + """Asynchronously fetch bucket metadata and update the cache.""" + try: + bucket = self._client.get_bucket(bucket_name, timeout=10.0) + self.update_from_bucket(bucket) + except (NotFound, api_exceptions.NotFound): + self.evict(bucket_name) + except api_exceptions.Forbidden: + # On 403 (Forbidden), cache fallback values permanently to avoid retry storms + self.update_cache( + bucket_name, f"projects/_/buckets/{bucket_name}", "global" + ) + except Exception as e: + logger.debug( + f"Background fetch for bucket metadata failed for {bucket_name}: {e}" + ) + finally: + with self._lock: + self._inflight_fetches.discard(bucket_name) + + def update_from_bucket(self, bucket): + """Update cache from a Bucket instance.""" + if not bucket or not bucket.name: + return + + project_number = getattr(bucket, "project_number", None) + location = getattr(bucket, "location", None) or "global" + location = location.lower() + location_type = getattr(bucket, "location_type", None) or "region" + location_type = location_type.lower() + + if location_type in ("multi-region", "dual-region"): + location = "global" + + if project_number: + destination_id = f"projects/{project_number}/buckets/{bucket.name}" + else: + destination_id = f"projects/_/buckets/{bucket.name}" + + self.update_cache(bucket.name, destination_id, location) + + def update_cache(self, bucket_name, destination_id, location): + """Thread-safely update or insert a cache entry with bounded size.""" + with self._lock: + self._cache.put(bucket_name, (destination_id, location)) + + def evict(self, bucket_name): + """Remove a bucket from the cache (e.g., on 404).""" + with self._lock: + self._cache.delete(bucket_name) + + def clear(self): + """Clear all cached metadata.""" + with self._lock: + self._cache.clear() + self._inflight_fetches.clear() diff --git a/packages/google-cloud-storage/google/cloud/storage/_helpers.py b/packages/google-cloud-storage/google/cloud/storage/_helpers.py index 3ba8caff6611..257a7358e71c 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_helpers.py +++ b/packages/google-cloud-storage/google/cloud/storage/_helpers.py @@ -33,6 +33,9 @@ DEFAULT_RETRY, DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ) +from google.cloud.storage._opentelemetry_tracing import ( + create_trace_span as _base_create_trace_span, +) STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST" # Despite name, includes scheme. """Environment variable defining host for Storage emulator.""" @@ -185,6 +188,52 @@ def _require_client(self, client): client = self.client return client + def _get_aco_attributes(self): + from google.cloud.storage.blob import Blob + from google.cloud.storage.bucket import Bucket + + if isinstance(self, Bucket): + cache = getattr(self.client, "_bucket_metadata_cache", None) + bucket_name = self.name + elif isinstance(self, Blob): + bucket = getattr(self, "bucket", None) + cache = ( + getattr(bucket.client, "_bucket_metadata_cache", None) + if bucket and hasattr(bucket, "client") + else None + ) + bucket_name = getattr(bucket, "name", None) if bucket else None + else: + raise TypeError( + f"Unexpected type for ACO attribute retrieval: {type(self)}" + ) + + if callable(bucket_name): + try: + bucket_name = bucket_name() + except Exception: + pass + + if cache and bucket_name and isinstance(bucket_name, str): + try: + cached = cache.get_or_queue_fetch(bucket_name) + if cached and isinstance(cached, tuple) and len(cached) == 2: + dest_id, loc = cached + return { + "gcp.resource.destination.id": dest_id, + "gcp.resource.destination.location": loc, + } + except Exception: + pass + return {} + + def _create_trace_span(self, name, attributes=None, **kwargs): + aco_attrs = self._get_aco_attributes() + if attributes is None: + attributes = {} + attributes.update(aco_attrs) + return _base_create_trace_span(name, attributes=attributes, **kwargs) + def _encryption_headers(self): """Return any encryption headers needed to fetch the object. diff --git a/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py b/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py new file mode 100644 index 000000000000..8a208e0ea6c8 --- /dev/null +++ b/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py @@ -0,0 +1,89 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A Least Recently Used (LRU) cache implementation.""" + +from collections import OrderedDict +from typing import Any, Generic, Optional, TypeVar + +K = TypeVar("K") +V = TypeVar("V") + + +class LRUCache(Generic[K, V]): + """A Least Recently Used (LRU) cache implementation using OrderedDict. + + :type capacity: int + :param capacity: The maximum number of items the cache can hold. + """ + + def __init__(self, capacity: int) -> None: + if capacity <= 0: + raise ValueError("Capacity must be greater than 0") + self._capacity = capacity + self._cache: OrderedDict[K, V] = OrderedDict() + + @property + def capacity(self) -> int: + """Return the capacity of the cache.""" + return self._capacity + + def get(self, key: K, default: Optional[V] = None) -> Optional[V]: + """Retrieve an item from the cache. + + If the key exists, it is moved to the end (marked as most recently used). + + :type key: Any + :param key: The key to look up in the cache. + + :type default: Any + :param default: Default value to return if key is not found. + """ + if key not in self._cache: + return default + self._cache.move_to_end(key) + return self._cache[key] + + def put(self, key: K, value: V) -> None: + """Add or update an item in the cache. + + If the key already exists, it is updated and moved to the end. + If adding the item exceeds capacity, the least recently used item (at the beginning) + is evicted. + + :type key: Any + :param key: The key to store. + + :type value: Any + :param value: The value to store. + """ + if key in self._cache: + self._cache.move_to_end(key) + self._cache[key] = value + if len(self._cache) > self._capacity: + self._cache.popitem(last=False) + + def __len__(self) -> int: + return len(self._cache) + + def __contains__(self, key: K) -> bool: + return key in self._cache + + def clear(self) -> None: + """Clear all items from the cache.""" + self._cache.clear() + + def delete(self, key: K) -> None: + """Remove an item from the cache if it exists.""" + self._cache.pop(key, None) diff --git a/packages/google-cloud-storage/google/cloud/storage/blob.py b/packages/google-cloud-storage/google/cloud/storage/blob.py index c6fbcf4c12b7..7b4c6b0a0e79 100644 --- a/packages/google-cloud-storage/google/cloud/storage/blob.py +++ b/packages/google-cloud-storage/google/cloud/storage/blob.py @@ -60,7 +60,6 @@ ) from google.cloud.storage._opentelemetry_tracing import ( _get_opentelemetry_attributes_from_url, - create_trace_span, ) from google.cloud.storage._signing import generate_signed_url_v2, generate_signed_url_v4 from google.cloud.storage.acl import ACL, ObjectACL @@ -743,7 +742,7 @@ def exists( :rtype: bool :returns: True if the blob exists in Cloud Storage. """ - with create_trace_span(name="Storage.Blob.exists"): + with self._create_trace_span(name="Storage.Blob.exists"): client = self._require_client(client) # We only need the status code (200 or not) so we seek to # minimize the returned payload. @@ -847,7 +846,7 @@ def delete( (propagated from :meth:`google.cloud.storage.bucket.Bucket.delete_blob`). """ - with create_trace_span(name="Storage.Blob.delete"): + with self._create_trace_span(name="Storage.Blob.delete"): self.bucket.delete_blob( self.name, client=client, @@ -1086,7 +1085,7 @@ def _do_download( # not supported for chunked downloads. single_shot_download=single_shot_download, ) - with create_trace_span( + with self._create_trace_span( name=f"Storage.{download_class}/consume", attributes=extra_attributes, api_request=args, @@ -1115,7 +1114,7 @@ def _do_download( retry=retry, ) - with create_trace_span( + with self._create_trace_span( name=f"Storage.{download_class}/consumeNextChunk", attributes=extra_attributes, api_request=args, @@ -1243,7 +1242,7 @@ def download_to_file( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadToFile"): + with self._create_trace_span(name="Storage.Blob.downloadToFile"): self._prep_and_do_download( file_obj, client=client, @@ -1399,7 +1398,7 @@ def download_to_filename( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadToFilename"): + with self._create_trace_span(name="Storage.Blob.downloadToFilename"): self._handle_filename_and_download( filename, client=client, @@ -1524,7 +1523,7 @@ def download_as_bytes( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadAsBytes"): + with self._create_trace_span(name="Storage.Blob.downloadAsBytes"): string_buffer = BytesIO() self._prep_and_do_download( @@ -1647,7 +1646,7 @@ def download_as_string( PendingDeprecationWarning, stacklevel=2, ) - with create_trace_span(name="Storage.Blob.downloadAsString"): + with self._create_trace_span(name="Storage.Blob.downloadAsString"): return self.download_as_bytes( client=client, start=start, @@ -1761,7 +1760,7 @@ def download_as_text( :rtype: text :returns: The data stored in this blob, decoded to text. """ - with create_trace_span(name="Storage.Blob.downloadAsText"): + with self._create_trace_span(name="Storage.Blob.downloadAsText"): data = self.download_as_bytes( client=client, start=start, @@ -2052,7 +2051,7 @@ def _do_multipart_upload( extra_attributes = _get_opentelemetry_attributes_from_url(upload_url) extra_attributes["upload.checksum"] = f"{checksum}" args = {"timeout": timeout} - with create_trace_span( + with self._create_trace_span( name="Storage.MultipartUpload/transmit", attributes=extra_attributes, client=client, @@ -2452,7 +2451,7 @@ def _do_resumable_upload( extra_attributes["upload.checksum"] = f"{checksum}" args = {"timeout": timeout} - with create_trace_span( + with self._create_trace_span( name="Storage.ResumableUpload/transmitNextChunk", attributes=extra_attributes, client=client, @@ -3011,7 +3010,7 @@ def upload_from_file( :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. """ - with create_trace_span(name="Storage.Blob.uploadFromFile"): + with self._create_trace_span(name="Storage.Blob.uploadFromFile"): self._prep_and_do_upload( file_obj, rewind=rewind, @@ -3194,7 +3193,7 @@ def upload_from_filename( https://datatracker.ietf.org/doc/html/rfc4960#appendix-B and base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4 """ - with create_trace_span(name="Storage.Blob.uploadFromFilename"): + with self._create_trace_span(name="Storage.Blob.uploadFromFilename"): self._handle_filename_and_upload( filename, content_type=content_type, @@ -3343,7 +3342,7 @@ def upload_from_string( https://datatracker.ietf.org/doc/html/rfc4960#appendix-B and base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4 """ - with create_trace_span(name="Storage.Blob.uploadFromString"): + with self._create_trace_span(name="Storage.Blob.uploadFromString"): data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) self.upload_from_file( @@ -3495,7 +3494,7 @@ def create_resumable_upload_session( :raises: :class:`google.cloud.exceptions.GoogleCloudError` if the session creation response returns an error status. """ - with create_trace_span(name="Storage.Blob.createResumableUploadSession"): + with self._create_trace_span(name="Storage.Blob.createResumableUploadSession"): # Handle ConditionalRetryPolicy. if isinstance(retry, ConditionalRetryPolicy): # Conditional retries are designed for non-media calls, which change @@ -3591,7 +3590,7 @@ def get_iam_policy( :returns: the policy instance, based on the resource returned from the ``getIamPolicy`` API request. """ - with create_trace_span(name="Storage.Blob.getIamPolicy"): + with self._create_trace_span(name="Storage.Blob.getIamPolicy"): client = self._require_client(client) query_params = {} @@ -3652,7 +3651,7 @@ def set_iam_policy( :returns: the policy instance, based on the resource returned from the ``setIamPolicy`` API request. """ - with create_trace_span(name="Storage.Blob.setIamPolicy"): + with self._create_trace_span(name="Storage.Blob.setIamPolicy"): client = self._require_client(client) query_params = {} @@ -3714,7 +3713,7 @@ def test_iam_permissions( :returns: the permissions returned by the ``testIamPermissions`` API request. """ - with create_trace_span(name="Storage.Blob.testIamPermissions"): + with self._create_trace_span(name="Storage.Blob.testIamPermissions"): client = self._require_client(client) query_params = {"permissions": permissions} @@ -3774,7 +3773,7 @@ def make_public( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Blob.makePublic"): + with self._create_trace_span(name="Storage.Blob.makePublic"): self.acl.all().grant_read() self.acl.save( client=client, @@ -3828,7 +3827,7 @@ def make_private( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Blob.makePrivate"): + with self._create_trace_span(name="Storage.Blob.makePrivate"): self.acl.all().revoke_read() self.acl.save( client=client, @@ -3909,7 +3908,7 @@ def compose( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). """ - with create_trace_span(name="Storage.Blob.compose"): + with self._create_trace_span(name="Storage.Blob.compose"): sources_len = len(sources) client = self._require_client(client) query_params = {} @@ -4088,7 +4087,7 @@ def rewrite( and ``total_bytes`` is the total number of bytes to be rewritten. """ - with create_trace_span(name="Storage.Blob.rewrite"): + with self._create_trace_span(name="Storage.Blob.rewrite"): client = self._require_client(client) headers = _get_encryption_headers(self._encryption_key) headers.update(_get_encryption_headers(source._encryption_key, source=True)) @@ -4248,7 +4247,7 @@ def update_storage_class( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). """ - with create_trace_span(name="Storage.Blob.updateStorageClass"): + with self._create_trace_span(name="Storage.Blob.updateStorageClass"): # Update current blob's storage class prior to rewrite self._patch_property("storageClass", new_class) @@ -4392,7 +4391,7 @@ def open( 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one of those classes, depending on the 'mode' argument. """ - with create_trace_span(name="Storage.Blob.open"): + with self._create_trace_span(name="Storage.Blob.open"): if mode == "rb": if encoding or errors or newline: raise ValueError( diff --git a/packages/google-cloud-storage/google/cloud/storage/bucket.py b/packages/google-cloud-storage/google/cloud/storage/bucket.py index 6fd690cf38b2..4abcbf09444a 100644 --- a/packages/google-cloud-storage/google/cloud/storage/bucket.py +++ b/packages/google-cloud-storage/google/cloud/storage/bucket.py @@ -38,7 +38,6 @@ _validate_name, _virtual_hosted_style_base_url, ) -from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage._signing import generate_signed_url_v2, generate_signed_url_v4 from google.cloud.storage.acl import BucketACL, DefaultObjectACL from google.cloud.storage.blob import Blob, _quote @@ -972,7 +971,7 @@ def exists( :rtype: bool :returns: True if the bucket exists in Cloud Storage. """ - with create_trace_span(name="Storage.Bucket.exists"): + with self._create_trace_span(name="Storage.Bucket.exists"): client = self._require_client(client) # We only need the status code (200 or not) so we seek to # minimize the returned payload. @@ -1073,7 +1072,7 @@ def create( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.create"): + with self._create_trace_span(name="Storage.Bucket.create"): client = self._require_client(client) client.create_bucket( bucket_or_name=self, @@ -1123,7 +1122,7 @@ def update( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.update"): + with self._create_trace_span(name="Storage.Bucket.update"): super(Bucket, self).update( client=client, timeout=timeout, @@ -1190,7 +1189,7 @@ def reload( set if ``soft_deleted`` is set to True. See: https://cloud.google.com/storage/docs/soft-delete """ - with create_trace_span(name="Storage.Bucket.reload"): + with self._create_trace_span(name="Storage.Bucket.reload"): super(Bucket, self).reload( client=client, projection=projection, @@ -1239,7 +1238,7 @@ def patch( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.patch"): + with self._create_trace_span(name="Storage.Bucket.patch"): # Special case: For buckets, it is possible that labels are being # removed; this requires special handling. if self._label_removals: @@ -1375,7 +1374,7 @@ def get_blob( :rtype: :class:`google.cloud.storage.blob.Blob` or None :returns: The blob object if it exists, otherwise None. """ - with create_trace_span(name="Storage.Bucket.getBlob"): + with self._create_trace_span(name="Storage.Bucket.getBlob"): blob = Blob( bucket=self, name=blob_name, @@ -1525,7 +1524,7 @@ def list_blobs( :returns: Iterator of all :class:`~google.cloud.storage.blob.Blob` in this bucket matching the arguments. """ - with create_trace_span(name="Storage.Bucket.listBlobs"): + with self._create_trace_span(name="Storage.Bucket.listBlobs"): client = self._require_client(client) return client.list_blobs( self, @@ -1573,7 +1572,7 @@ def list_notifications( :rtype: list of :class:`.BucketNotification` :returns: notification instances """ - with create_trace_span(name="Storage.Bucket.listNotifications"): + with self._create_trace_span(name="Storage.Bucket.listNotifications"): client = self._require_client(client) path = self.path + "/notificationConfigs" iterator = client._list_resource( @@ -1618,7 +1617,7 @@ def get_notification( :rtype: :class:`.BucketNotification` :returns: notification instance. """ - with create_trace_span(name="Storage.Bucket.getNotification"): + with self._create_trace_span(name="Storage.Bucket.getNotification"): notification = self.notification(notification_id=notification_id) notification.reload(client=client, timeout=timeout, retry=retry) return notification @@ -1678,7 +1677,7 @@ def delete( :raises: :class:`ValueError` if ``force`` is ``True`` and the bucket contains more than 256 objects / blobs. """ - with create_trace_span(name="Storage.Bucket.delete"): + with self._create_trace_span(name="Storage.Bucket.delete"): client = self._require_client(client) query_params = {} @@ -1801,7 +1800,7 @@ def delete_blob( the exception, use :meth:`delete_blobs` by passing a no-op ``on_error`` callback. """ - with create_trace_span(name="Storage.Bucket.deleteBlob"): + with self._create_trace_span(name="Storage.Bucket.deleteBlob"): client = self._require_client(client) blob = Blob(blob_name, bucket=self, generation=generation) @@ -1914,7 +1913,7 @@ def delete_blobs( :raises: :class:`~google.cloud.exceptions.NotFound` (if `on_error` is not passed). """ - with create_trace_span(name="Storage.Bucket.deleteBlobs"): + with self._create_trace_span(name="Storage.Bucket.deleteBlobs"): _raise_if_len_differs( len(blobs), if_generation_match=if_generation_match, @@ -2068,7 +2067,7 @@ def copy_blob( :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The new Blob. """ - with create_trace_span(name="Storage.Bucket.copyBlob"): + with self._create_trace_span(name="Storage.Bucket.copyBlob"): client = self._require_client(client) query_params = {} @@ -2222,7 +2221,7 @@ def rename_blob( :rtype: :class:`Blob` :returns: The newly-renamed blob. """ - with create_trace_span(name="Storage.Bucket.renameBlob"): + with self._create_trace_span(name="Storage.Bucket.renameBlob"): same_name = blob.name == new_name new_blob = self.copy_blob( @@ -2342,7 +2341,7 @@ def move_blob( :rtype: :class:`Blob` :returns: The newly-moved blob. """ - with create_trace_span(name="Storage.Bucket.moveBlob"): + with self._create_trace_span(name="Storage.Bucket.moveBlob"): client = self._require_client(client) query_params = {} @@ -2451,7 +2450,7 @@ def restore_blob( :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The restored Blob. """ - with create_trace_span(name="Storage.Bucket.restore_blob"): + with self._create_trace_span(name="Storage.Bucket.restore_blob"): client = self._require_client(client) query_params = {} @@ -3346,7 +3345,7 @@ def get_iam_policy( :returns: the policy instance, based on the resource returned from the ``getIamPolicy`` API request. """ - with create_trace_span(name="Storage.Bucket.getIamPolicy"): + with self._create_trace_span(name="Storage.Bucket.getIamPolicy"): client = self._require_client(client) query_params = {} @@ -3400,7 +3399,7 @@ def set_iam_policy( :returns: the policy instance, based on the resource returned from the ``setIamPolicy`` API request. """ - with create_trace_span(name="Storage.Bucket.setIamPolicy"): + with self._create_trace_span(name="Storage.Bucket.setIamPolicy"): client = self._require_client(client) query_params = {} @@ -3457,7 +3456,7 @@ def test_iam_permissions( :returns: the permissions returned by the ``testIamPermissions`` API request. """ - with create_trace_span(name="Storage.Bucket.testIamPermissions"): + with self._create_trace_span(name="Storage.Bucket.testIamPermissions"): client = self._require_client(client) query_params = {"permissions": permissions} @@ -3523,7 +3522,7 @@ def make_public( :meth:`~google.cloud.storage.blob.Blob.make_public` for each blob. """ - with create_trace_span(name="Storage.Bucket.makePublic"): + with self._create_trace_span(name="Storage.Bucket.makePublic"): self.acl.all().grant_read() self.acl.save( client=client, @@ -3620,7 +3619,7 @@ def make_private( :meth:`~google.cloud.storage.blob.Blob.make_private` for each blob. """ - with create_trace_span(name="Storage.Bucket.makePrivate"): + with self._create_trace_span(name="Storage.Bucket.makePrivate"): self.acl.all().revoke_read() self.acl.save( client=client, @@ -3744,7 +3743,7 @@ def lock_retention_policy( if the bucket has no retention policy assigned; if the bucket's retention policy is already locked. """ - with create_trace_span(name="Storage.Bucket.lockRetentionPolicy"): + with self._create_trace_span(name="Storage.Bucket.lockRetentionPolicy"): if "metageneration" not in self._properties: raise ValueError( "Bucket has no retention policy assigned: try 'reload'?" diff --git a/packages/google-cloud-storage/google/cloud/storage/client.py b/packages/google-cloud-storage/google/cloud/storage/client.py index 528b2255f451..afd97fe33286 100644 --- a/packages/google-cloud-storage/google/cloud/storage/client.py +++ b/packages/google-cloud-storage/google/cloud/storage/client.py @@ -44,6 +44,7 @@ _get_storage_emulator_override, _virtual_hosted_style_base_url, ) +from google.cloud.storage._bucket_metadata_cache import BucketMetadataCache from google.cloud.storage._http import Connection from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage._signing import ( @@ -289,6 +290,20 @@ def __init__( connection.extra_headers = extra_headers self._connection = connection self._batch_stack = _LocalStack() + self._bucket_metadata_cache = BucketMetadataCache(self) + + def close(self): + """Close the client and clear any cached metadata or active connections.""" + if hasattr(self, "_bucket_metadata_cache") and self._bucket_metadata_cache: + self._bucket_metadata_cache.clear() + if hasattr(self._http, "close"): + self._http.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() @classmethod def create_anonymous_client(cls): diff --git a/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py new file mode 100644 index 000000000000..22a94cb23fa6 --- /dev/null +++ b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py @@ -0,0 +1,159 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest +from unittest import mock + +from google.api_core import exceptions as api_exceptions +from google.cloud.exceptions import NotFound +from google.cloud.storage._bucket_metadata_cache import BucketMetadataCache + + +class TestBucketMetadataCache(unittest.TestCase): + @mock.patch("threading.Thread") + def test_lru_eviction(self, mock_thread): + client = mock.Mock() + cache = BucketMetadataCache(client, max_size=3) + + cache.update_cache("b1", "dest1", "loc1") + cache.update_cache("b2", "dest2", "loc2") + cache.update_cache("b3", "dest3", "loc3") + cache.update_cache("b4", "dest4", "loc4") # Evicts b1 (oldest) + + self.assertIsNone(cache.get_or_queue_fetch("b1")) + self.assertEqual(cache.get_or_queue_fetch("b2"), ("dest2", "loc2")) + self.assertEqual(cache.get_or_queue_fetch("b3"), ("dest3", "loc3")) + self.assertEqual(cache.get_or_queue_fetch("b4"), ("dest4", "loc4")) + + def test_update_from_bucket(self): + client = mock.Mock() + cache = BucketMetadataCache(client) + + # Multi-region -> global + b1 = mock.Mock() + b1.name = "b1" + b1.location = "US" + b1.location_type = "multi-region" + b1.project_number = 123 + cache.update_from_bucket(b1) + self.assertEqual( + cache.get_or_queue_fetch("b1"), ("projects/123/buckets/b1", "global") + ) + + # Dual-region -> global + b2 = mock.Mock() + b2.name = "b2" + b2.location = "NAM4" + b2.location_type = "dual-region" + b2.project_number = 456 + cache.update_from_bucket(b2) + self.assertEqual( + cache.get_or_queue_fetch("b2"), ("projects/456/buckets/b2", "global") + ) + + # Region -> us-east1 + b3 = mock.Mock() + b3.name = "b3" + b3.location = "US-EAST1" + b3.location_type = "region" + b3.project_number = 789 + cache.update_from_bucket(b3) + self.assertEqual( + cache.get_or_queue_fetch("b3"), ("projects/789/buckets/b3", "us-east1") + ) + + # Missing project number -> _ + b4 = mock.Mock() + b4.name = "b4" + b4.location = "eu-west1" + b4.location_type = "region" + b4.project_number = None + cache.update_from_bucket(b4) + self.assertEqual( + cache.get_or_queue_fetch("b4"), ("projects/_/buckets/b4", "eu-west1") + ) + + @mock.patch("threading.Thread") + def test_get_or_queue_fetch(self, mock_thread): + client = mock.Mock() + cache = BucketMetadataCache(client) + + # Cache miss -> returns None immediately and spawns thread + result = cache.get_or_queue_fetch("my-bucket") + self.assertIsNone(result) + mock_thread.assert_called_once() + + # Second immediate lookup -> returns None, does not spawn another thread (singleflight) + mock_thread.reset_mock() + result2 = cache.get_or_queue_fetch("my-bucket") + self.assertIsNone(result2) + mock_thread.assert_not_called() + + def test_fetch_background_success(self): + client = mock.Mock() + b1 = mock.Mock() + b1.name = "b1" + b1.location = "US-WEST1" + b1.location_type = "region" + b1.project_number = 999 + client.get_bucket.return_value = b1 + + cache = BucketMetadataCache(client) + cache._inflight_fetches.add("b1") + + cache._fetch_background("b1") + + self.assertEqual( + cache.get_or_queue_fetch("b1"), ("projects/999/buckets/b1", "us-west1") + ) + self.assertNotIn("b1", cache._inflight_fetches) + + def test_fetch_background_not_found(self): + client = mock.Mock() + client.get_bucket.side_effect = NotFound("Bucket not found") + cache = BucketMetadataCache(client) + cache.update_cache("b1", "projects/_/buckets/b1", "global") + cache._inflight_fetches.add("b1") + + cache._fetch_background("b1") + + self.assertNotIn("b1", cache._cache) + self.assertNotIn("b1", cache._inflight_fetches) + + def test_fetch_background_forbidden(self): + client = mock.Mock() + client.get_bucket.side_effect = api_exceptions.Forbidden("403") + cache = BucketMetadataCache(client) + cache._inflight_fetches.add("b1") + + cache._fetch_background("b1") + + self.assertEqual( + cache.get_or_queue_fetch("b1"), ("projects/_/buckets/b1", "global") + ) + self.assertNotIn("b1", cache._inflight_fetches) + + @mock.patch("threading.Thread") + def test_clear_and_evict(self, mock_thread): + client = mock.Mock() + cache = BucketMetadataCache(client) + + cache.update_cache("b1", "dest1", "loc1") + cache.evict("b1") + self.assertNotIn("b1", cache._cache) + + cache.update_cache("b2", "dest2", "loc2") + cache.clear() + self.assertNotIn("b2", cache._cache) diff --git a/packages/google-cloud-storage/tests/unit/test__lru_cache.py b/packages/google-cloud-storage/tests/unit/test__lru_cache.py new file mode 100644 index 000000000000..2f3fb59527a3 --- /dev/null +++ b/packages/google-cloud-storage/tests/unit/test__lru_cache.py @@ -0,0 +1,101 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.cloud.storage._lru_cache import LRUCache + + +def test_lru_cache_capacity(): + cache = LRUCache(capacity=3) + assert cache.capacity == 3 + + with pytest.raises(ValueError): + LRUCache(capacity=0) + + with pytest.raises(ValueError): + LRUCache(capacity=-1) + + +def test_lru_cache_put_and_get(): + cache = LRUCache(capacity=2) + assert cache.get("a") is None + assert cache.get("a", default="default") == "default" + + cache.put("a", 1) + assert cache.get("a") == 1 + assert len(cache) == 1 + assert "a" in cache + + cache.put("b", 2) + assert cache.get("b") == 2 + assert len(cache) == 2 + assert "b" in cache + + +def test_lru_cache_eviction(): + cache = LRUCache(capacity=2) + cache.put("a", 1) + cache.put("b", 2) + + # Access "a" so "b" becomes least recently used + assert cache.get("a") == 1 + + # Put "c" should evict "b" + cache.put("c", 3) + + assert "b" not in cache + assert cache.get("b") is None + assert cache.get("a") == 1 + assert cache.get("c") == 3 + assert len(cache) == 2 + + +def test_lru_cache_update(): + cache = LRUCache(capacity=2) + cache.put("a", 1) + cache.put("b", 2) + + # Update "a", so it becomes most recently used + cache.put("a", 10) + + # Put "c" should evict "b" + cache.put("c", 3) + + assert "b" not in cache + assert cache.get("a") == 10 + assert cache.get("c") == 3 + + +def test_lru_cache_clear(): + cache = LRUCache(capacity=2) + cache.put("a", 1) + cache.put("b", 2) + + cache.clear() + assert len(cache) == 0 + assert "a" not in cache + assert "b" not in cache + + +def test_lru_cache_delete(): + cache = LRUCache(capacity=2) + cache.put("a", 1) + cache.put("b", 2) + + cache.delete("a") + assert len(cache) == 1 + assert "a" not in cache + assert cache.get("a") is None + assert cache.get("b") == 2 From ec3efa7697ee42997c36e30cea86c6101fc48f8f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 16 May 2026 13:48:17 +0000 Subject: [PATCH 2/5] feat(storage): add check_and_evict background existence verification and debug logging on exceptions --- .../cloud/storage/_bucket_metadata_cache.py | 29 +++++++++ .../google/cloud/storage/_helpers.py | 61 +++++++++++++++++-- .../google/cloud/storage/_http.py | 44 ++++++++++++- .../tests/unit/test__bucket_metadata_cache.py | 45 ++++++++++++++ 4 files changed, 172 insertions(+), 7 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py index 72069445ee9b..eb6d3e88082b 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py +++ b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py @@ -35,6 +35,7 @@ def __init__(self, client, max_size=10000): self._cache = LRUCache(max_size) self._lock = threading.Lock() self._inflight_fetches = set() + self._inflight_checks = set() def get_or_queue_fetch(self, bucket_name): """Retrieve bucket metadata or queue a background fetch on cache miss. @@ -57,6 +58,33 @@ def get_or_queue_fetch(self, bucket_name): ).start() return None + def check_and_evict(self, bucket_name): + """Asynchronously verify if a bucket exists on 404 and evict if deleted.""" + with self._lock: + if bucket_name not in self._cache: + return + if bucket_name in self._inflight_checks: + return + self._inflight_checks.add(bucket_name) + threading.Thread( + target=self._verify_existence_background, + args=(bucket_name,), + daemon=True, + ).start() + + def _verify_existence_background(self, bucket_name): + try: + bucket = self._client.bucket(bucket_name) + if not bucket.exists(): + self.evict(bucket_name) + except Exception as e: + logger.debug( + f"Background verification for bucket existence failed for {bucket_name}: {e}" + ) + finally: + with self._lock: + self._inflight_checks.discard(bucket_name) + def _fetch_background(self, bucket_name): """Asynchronously fetch bucket metadata and update the cache.""" try: @@ -113,3 +141,4 @@ def clear(self): with self._lock: self._cache.clear() self._inflight_fetches.clear() + self._inflight_checks.clear() diff --git a/packages/google-cloud-storage/google/cloud/storage/_helpers.py b/packages/google-cloud-storage/google/cloud/storage/_helpers.py index 257a7358e71c..83e444917d8d 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_helpers.py +++ b/packages/google-cloud-storage/google/cloud/storage/_helpers.py @@ -19,13 +19,18 @@ import base64 import datetime +import logging import os import secrets import sys +from contextlib import contextmanager from hashlib import md5 from urllib.parse import urlsplit, urlunsplit from uuid import uuid4 +from google.api_core import exceptions as api_exceptions +from google.cloud.exceptions import NotFound + from google.auth import environment_vars from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -37,6 +42,8 @@ create_trace_span as _base_create_trace_span, ) +_logger = logging.getLogger(__name__) + STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST" # Despite name, includes scheme. """Environment variable defining host for Storage emulator.""" @@ -211,8 +218,10 @@ def _get_aco_attributes(self): if callable(bucket_name): try: bucket_name = bucket_name() - except Exception: - pass + except Exception as e: + _logger.debug( + f"Failed callable bucket_name resolution in _get_aco_attributes: {e}" + ) if cache and bucket_name and isinstance(bucket_name, str): try: @@ -223,16 +232,58 @@ def _get_aco_attributes(self): "gcp.resource.destination.id": dest_id, "gcp.resource.destination.location": loc, } - except Exception: - pass + except Exception as e: + _logger.debug( + f"Failed cache.get_or_queue_fetch in _get_aco_attributes: {e}" + ) return {} + @contextmanager def _create_trace_span(self, name, attributes=None, **kwargs): + from google.cloud.storage.blob import Blob + from google.cloud.storage.bucket import Bucket + aco_attrs = self._get_aco_attributes() if attributes is None: attributes = {} attributes.update(aco_attrs) - return _base_create_trace_span(name, attributes=attributes, **kwargs) + with _base_create_trace_span(name, attributes=attributes, **kwargs) as span: + try: + yield span + except (NotFound, api_exceptions.NotFound): + if isinstance(self, Bucket): + cache = getattr(self.client, "_bucket_metadata_cache", None) + bucket_name = self.name + elif isinstance(self, Blob): + bucket = getattr(self, "bucket", None) + cache = ( + getattr(bucket.client, "_bucket_metadata_cache", None) + if bucket and hasattr(bucket, "client") + else None + ) + bucket_name = ( + getattr(bucket, "name", None) if bucket else None + ) + else: + cache = None + bucket_name = None + + if callable(bucket_name): + try: + bucket_name = bucket_name() + except Exception as e: + _logger.debug( + f"Failed callable bucket_name resolution on 404 in _create_trace_span: {e}" + ) + + if cache and bucket_name and isinstance(bucket_name, str): + try: + cache.check_and_evict(bucket_name) + except Exception as e: + _logger.debug( + f"Failed cache.check_and_evict on 404 in _create_trace_span: {e}" + ) + raise def _encryption_headers(self): """Return any encryption headers needed to fetch the object. diff --git a/packages/google-cloud-storage/google/cloud/storage/_http.py b/packages/google-cloud-storage/google/cloud/storage/_http.py index 64d7454529ee..e01609f05d91 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_http.py +++ b/packages/google-cloud-storage/google/cloud/storage/_http.py @@ -15,11 +15,17 @@ """Create / interact with Google Cloud Storage connections.""" import functools +import logging +import re +from google.api_core import exceptions as api_exceptions from google.cloud import _http +from google.cloud.exceptions import NotFound from google.cloud.storage import __version__, _helpers from google.cloud.storage._opentelemetry_tracing import create_trace_span +logger = logging.getLogger(__name__) + class Connection(_http.JSONConnection): """A connection to Google Cloud Storage via the JSON REST API. @@ -71,11 +77,28 @@ def api_request(self, *args, **kwargs): span_attributes = { "gccl-invocation-id": invocation_id, } + client = self._client + if hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache: + match = re.search(r"/b/([^/?#]+)", kwargs.get("path", "")) + if match: + try: + cached = client._bucket_metadata_cache.get_or_queue_fetch( + match.group(1) + ) + if cached and isinstance(cached, tuple) and len(cached) == 2: + dest_id, loc = cached + span_attributes["gcp.resource.destination.id"] = dest_id + span_attributes["gcp.resource.destination.location"] = loc + except Exception as e: + logger.debug( + f"Failed cache.get_or_queue_fetch in api_request: {e}" + ) + call = functools.partial(super(Connection, self).api_request, *args, **kwargs) with create_trace_span( name="Storage.Connection.api_request", attributes=span_attributes, - client=self._client, + client=client, api_request=kwargs, retry=retry, ): @@ -87,4 +110,21 @@ def api_request(self, *args, **kwargs): pass if retry: call = retry(call) - return call() + try: + return call() + except (NotFound, api_exceptions.NotFound): + if ( + hasattr(client, "_bucket_metadata_cache") + and client._bucket_metadata_cache + ): + match = re.search(r"/b/([^/?#]+)", kwargs.get("path", "")) + if match: + try: + client._bucket_metadata_cache.check_and_evict( + match.group(1) + ) + except Exception as e: + logger.debug( + f"Failed cache.check_and_evict on 404 in api_request: {e}" + ) + raise diff --git a/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py index 22a94cb23fa6..02d0487ca4e3 100644 --- a/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py +++ b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py @@ -157,3 +157,48 @@ def test_clear_and_evict(self, mock_thread): cache.update_cache("b2", "dest2", "loc2") cache.clear() self.assertNotIn("b2", cache._cache) + + @mock.patch("threading.Thread") + def test_check_and_evict_queue(self, mock_thread): + client = mock.Mock() + cache = BucketMetadataCache(client) + cache.update_cache("b1", "dest1", "loc1") + + cache.check_and_evict("b1") + mock_thread.assert_called_once() + self.assertIn("b1", cache._inflight_checks) + + # Second immediate check -> singleflight + mock_thread.reset_mock() + cache.check_and_evict("b1") + mock_thread.assert_not_called() + + def test_verify_existence_background_exists(self): + client = mock.Mock() + b1 = mock.Mock() + b1.exists.return_value = True + client.bucket.return_value = b1 + + cache = BucketMetadataCache(client) + cache.update_cache("b1", "dest1", "loc1") + cache._inflight_checks.add("b1") + + cache._verify_existence_background("b1") + + self.assertIn("b1", cache._cache) + self.assertNotIn("b1", cache._inflight_checks) + + def test_verify_existence_background_deleted(self): + client = mock.Mock() + b1 = mock.Mock() + b1.exists.return_value = False + client.bucket.return_value = b1 + + cache = BucketMetadataCache(client) + cache.update_cache("b1", "dest1", "loc1") + cache._inflight_checks.add("b1") + + cache._verify_existence_background("b1") + + self.assertNotIn("b1", cache._cache) + self.assertNotIn("b1", cache._inflight_checks) From 9c5981bb95ce4f91989702512638bc68b6e16749 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 17 May 2026 14:22:58 +0000 Subject: [PATCH 3/5] fix(storage): cleanup linter warnings and refine trace attribute dict copying --- .../google/cloud/storage/_helpers.py | 12 ++++-------- .../google/cloud/storage/_http.py | 4 +--- .../google/cloud/storage/_lru_cache.py | 2 +- .../tests/unit/test__bucket_metadata_cache.py | 1 - .../tests/unit/test__lru_cache.py | 16 ++++++++-------- 5 files changed, 14 insertions(+), 21 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/_helpers.py b/packages/google-cloud-storage/google/cloud/storage/_helpers.py index 83e444917d8d..a1c31cb69afc 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_helpers.py +++ b/packages/google-cloud-storage/google/cloud/storage/_helpers.py @@ -243,11 +243,9 @@ def _create_trace_span(self, name, attributes=None, **kwargs): from google.cloud.storage.blob import Blob from google.cloud.storage.bucket import Bucket - aco_attrs = self._get_aco_attributes() - if attributes is None: - attributes = {} - attributes.update(aco_attrs) - with _base_create_trace_span(name, attributes=attributes, **kwargs) as span: + span_attrs = dict(attributes) if attributes else {} + span_attrs.update(self._get_aco_attributes()) + with _base_create_trace_span(name, attributes=span_attrs, **kwargs) as span: try: yield span except (NotFound, api_exceptions.NotFound): @@ -261,9 +259,7 @@ def _create_trace_span(self, name, attributes=None, **kwargs): if bucket and hasattr(bucket, "client") else None ) - bucket_name = ( - getattr(bucket, "name", None) if bucket else None - ) + bucket_name = getattr(bucket, "name", None) if bucket else None else: cache = None bucket_name = None diff --git a/packages/google-cloud-storage/google/cloud/storage/_http.py b/packages/google-cloud-storage/google/cloud/storage/_http.py index e01609f05d91..77c006a19cf8 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_http.py +++ b/packages/google-cloud-storage/google/cloud/storage/_http.py @@ -90,9 +90,7 @@ def api_request(self, *args, **kwargs): span_attributes["gcp.resource.destination.id"] = dest_id span_attributes["gcp.resource.destination.location"] = loc except Exception as e: - logger.debug( - f"Failed cache.get_or_queue_fetch in api_request: {e}" - ) + logger.debug(f"Failed cache.get_or_queue_fetch in api_request: {e}") call = functools.partial(super(Connection, self).api_request, *args, **kwargs) with create_trace_span( diff --git a/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py b/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py index 8a208e0ea6c8..23a37cf846d3 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py +++ b/packages/google-cloud-storage/google/cloud/storage/_lru_cache.py @@ -15,7 +15,7 @@ """A Least Recently Used (LRU) cache implementation.""" from collections import OrderedDict -from typing import Any, Generic, Optional, TypeVar +from typing import Generic, Optional, TypeVar K = TypeVar("K") V = TypeVar("V") diff --git a/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py index 02d0487ca4e3..39f6975ded4c 100644 --- a/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py +++ b/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time import unittest from unittest import mock diff --git a/packages/google-cloud-storage/tests/unit/test__lru_cache.py b/packages/google-cloud-storage/tests/unit/test__lru_cache.py index 2f3fb59527a3..62b612be5513 100644 --- a/packages/google-cloud-storage/tests/unit/test__lru_cache.py +++ b/packages/google-cloud-storage/tests/unit/test__lru_cache.py @@ -48,13 +48,13 @@ def test_lru_cache_eviction(): cache = LRUCache(capacity=2) cache.put("a", 1) cache.put("b", 2) - + # Access "a" so "b" becomes least recently used assert cache.get("a") == 1 - + # Put "c" should evict "b" cache.put("c", 3) - + assert "b" not in cache assert cache.get("b") is None assert cache.get("a") == 1 @@ -66,13 +66,13 @@ def test_lru_cache_update(): cache = LRUCache(capacity=2) cache.put("a", 1) cache.put("b", 2) - + # Update "a", so it becomes most recently used cache.put("a", 10) - + # Put "c" should evict "b" cache.put("c", 3) - + assert "b" not in cache assert cache.get("a") == 10 assert cache.get("c") == 3 @@ -82,7 +82,7 @@ def test_lru_cache_clear(): cache = LRUCache(capacity=2) cache.put("a", 1) cache.put("b", 2) - + cache.clear() assert len(cache) == 0 assert "a" not in cache @@ -93,7 +93,7 @@ def test_lru_cache_delete(): cache = LRUCache(capacity=2) cache.put("a", 1) cache.put("b", 2) - + cache.delete("a") assert len(cache) == 1 assert "a" not in cache From 14e0ec96749ee690cd72de23f23d9315d515874a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 17 May 2026 15:43:19 +0000 Subject: [PATCH 4/5] fix(storage): fix CI failures by gating ACO tracing and cache eviction behind OpenTelemetry enablement Ensure that BucketMetadataCache lookups and asynchronous cache eviction threads only execute when OpenTelemetry tracing is installed and enabled. Also safely access _extra_headers on Client objects. TAG=agy CONV=c671fa00-7189-45b9-a5af-12f4c7a7c486 --- .../google/cloud/storage/_helpers.py | 6 ++++++ .../google/cloud/storage/_http.py | 12 +++++++++--- .../google/cloud/storage/blob.py | 4 ++-- .../google/cloud/storage/transfer_manager.py | 2 +- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/_helpers.py b/packages/google-cloud-storage/google/cloud/storage/_helpers.py index a1c31cb69afc..34ccbba91667 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_helpers.py +++ b/packages/google-cloud-storage/google/cloud/storage/_helpers.py @@ -40,6 +40,8 @@ ) from google.cloud.storage._opentelemetry_tracing import ( create_trace_span as _base_create_trace_span, + enable_otel_traces, + HAS_OPENTELEMETRY, ) _logger = logging.getLogger(__name__) @@ -196,6 +198,8 @@ def _require_client(self, client): return client def _get_aco_attributes(self): + if not HAS_OPENTELEMETRY or not enable_otel_traces: + return {} from google.cloud.storage.blob import Blob from google.cloud.storage.bucket import Bucket @@ -249,6 +253,8 @@ def _create_trace_span(self, name, attributes=None, **kwargs): try: yield span except (NotFound, api_exceptions.NotFound): + if not HAS_OPENTELEMETRY or not enable_otel_traces: + raise if isinstance(self, Bucket): cache = getattr(self.client, "_bucket_metadata_cache", None) bucket_name = self.name diff --git a/packages/google-cloud-storage/google/cloud/storage/_http.py b/packages/google-cloud-storage/google/cloud/storage/_http.py index 77c006a19cf8..d11a01a12a7b 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_http.py +++ b/packages/google-cloud-storage/google/cloud/storage/_http.py @@ -22,7 +22,11 @@ from google.cloud import _http from google.cloud.exceptions import NotFound from google.cloud.storage import __version__, _helpers -from google.cloud.storage._opentelemetry_tracing import create_trace_span +from google.cloud.storage._opentelemetry_tracing import ( + create_trace_span, + enable_otel_traces, + HAS_OPENTELEMETRY, +) logger = logging.getLogger(__name__) @@ -78,7 +82,7 @@ def api_request(self, *args, **kwargs): "gccl-invocation-id": invocation_id, } client = self._client - if hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache: + if HAS_OPENTELEMETRY and enable_otel_traces and hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache: match = re.search(r"/b/([^/?#]+)", kwargs.get("path", "")) if match: try: @@ -112,7 +116,9 @@ def api_request(self, *args, **kwargs): return call() except (NotFound, api_exceptions.NotFound): if ( - hasattr(client, "_bucket_metadata_cache") + HAS_OPENTELEMETRY + and enable_otel_traces + and hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache ): match = re.search(r"/b/([^/?#]+)", kwargs.get("path", "")) diff --git a/packages/google-cloud-storage/google/cloud/storage/blob.py b/packages/google-cloud-storage/google/cloud/storage/blob.py index 7b4c6b0a0e79..de3192e08abe 100644 --- a/packages/google-cloud-storage/google/cloud/storage/blob.py +++ b/packages/google-cloud-storage/google/cloud/storage/blob.py @@ -1883,7 +1883,7 @@ def _get_upload_arguments(self, client, content_type, filename=None, command=Non client._connection.user_agent, content_type, command=command ), **_get_encryption_headers(self._encryption_key), - **client._extra_headers, + **getattr(client, "_extra_headers", {}), } object_metadata = self._get_writable_metadata() return headers, object_metadata, content_type @@ -4649,7 +4649,7 @@ def _prep_and_do_download( headers = { **_get_default_headers(client._connection.user_agent, command=command), **headers, - **client._extra_headers, + **getattr(client, "_extra_headers", {}), } transport = client._http diff --git a/packages/google-cloud-storage/google/cloud/storage/transfer_manager.py b/packages/google-cloud-storage/google/cloud/storage/transfer_manager.py index 46709da8d284..6486696021fc 100644 --- a/packages/google-cloud-storage/google/cloud/storage/transfer_manager.py +++ b/packages/google-cloud-storage/google/cloud/storage/transfer_manager.py @@ -1382,7 +1382,7 @@ def _reduce_client(cl): _http = None # Can't carry this over client_info = cl._initial_client_info client_options = cl._initial_client_options - extra_headers = cl._extra_headers + extra_headers = getattr(cl, "_extra_headers", {}) return _LazyClient, ( client_object_id, From b99fd2ecad3cfde32b48b79796179ffc8d3fa7e3 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 17 May 2026 16:04:36 +0000 Subject: [PATCH 5/5] style(storage): format _http.py to satisfy linter line length limits TAG=agy CONV=c671fa00-7189-45b9-a5af-12f4c7a7c486 --- .../google-cloud-storage/google/cloud/storage/_http.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/_http.py b/packages/google-cloud-storage/google/cloud/storage/_http.py index d11a01a12a7b..c92d6e064fd5 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_http.py +++ b/packages/google-cloud-storage/google/cloud/storage/_http.py @@ -82,7 +82,12 @@ def api_request(self, *args, **kwargs): "gccl-invocation-id": invocation_id, } client = self._client - if HAS_OPENTELEMETRY and enable_otel_traces and hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache: + if ( + HAS_OPENTELEMETRY + and enable_otel_traces + and hasattr(client, "_bucket_metadata_cache") + and client._bucket_metadata_cache + ): match = re.search(r"/b/([^/?#]+)", kwargs.get("path", "")) if match: try: