Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
211 commits
Select commit Hold shift + click to select a range
d2175f1
use replaceable channel wrapper
daniel-sanche Jul 25, 2025
5e107fc
got unit tests working
daniel-sanche Jul 26, 2025
c4a97e1
put back in cache invalidation
daniel-sanche Jul 26, 2025
e71b1d5
added wrapped multicallables to avoid cache invalidation
daniel-sanche Jul 26, 2025
b81a9be
added crosssync, moved close logic back to client
daniel-sanche Jul 29, 2025
a1dffb5
generated sync code
daniel-sanche Jul 29, 2025
e3ec02b
got tests running
daniel-sanche Jul 29, 2025
4e13783
fixed tests
daniel-sanche Jul 29, 2025
7d90a04
remove extra wrapper; added invalidate_stubs helper
daniel-sanche Jul 29, 2025
26cd601
fixed lint
daniel-sanche Jul 29, 2025
375332f
fixed lint
daniel-sanche Jul 29, 2025
428d75a
renamed replaceablechannel to swappablechannel
daniel-sanche Jul 29, 2025
4b39bc5
added tests
daniel-sanche Jul 29, 2025
3f090c2
added docstrings
daniel-sanche Jul 29, 2025
883ceab
Merge branch 'main' into refactor_refresh
daniel-sanche Jul 29, 2025
04c762a
initial commit
daniel-sanche Jul 29, 2025
29dff4d
added back interceptor
daniel-sanche Jul 29, 2025
e4f8238
added metrics to client
daniel-sanche Jul 29, 2025
fcb062e
fixed lint
daniel-sanche Aug 1, 2025
ac8dbe4
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 1, 2025
d155f8a
set up channel interceptions
daniel-sanche Aug 2, 2025
9fece96
added TrackedBackoffGenerator
daniel-sanche Aug 2, 2025
aec2577
fixed lint
daniel-sanche Aug 2, 2025
ec4e847
fixed import
daniel-sanche Aug 2, 2025
0d93889
added stdout handler to test
daniel-sanche Aug 4, 2025
a580fa2
instrumented check_and_mutate
daniel-sanche Aug 4, 2025
bc13b46
added instrumentation to read_modify_write
daniel-sanche Aug 4, 2025
6c3be46
added instrumentation to sample_row_keys
daniel-sanche Aug 4, 2025
ca38615
instrumented mutate_row
daniel-sanche Aug 4, 2025
eb82ae9
added instrumentation to mutate_rows
daniel-sanche Aug 6, 2025
8f99e4e
added operation.cancel
daniel-sanche Aug 6, 2025
0ef3372
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 6, 2025
f8e6603
added operation cancelled to interceptor
daniel-sanche Aug 6, 2025
f5e057e
gave each operation a uuid
daniel-sanche Aug 6, 2025
0f8067c
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 6, 2025
996acf2
instrumented read_rows
daniel-sanche Aug 7, 2025
3eae4aa
fixed lint and mypy
daniel-sanche Aug 7, 2025
8c397bb
return attempt metric on new attempt
daniel-sanche Aug 7, 2025
2c34198
use standard context manager
daniel-sanche Aug 7, 2025
9bd1e07
use default backoff generator
daniel-sanche Aug 7, 2025
4684a72
use default backoff instance
daniel-sanche Aug 7, 2025
98e4006
remove async with for metrics
daniel-sanche Aug 7, 2025
fa865cb
followed same operation management pattern for read_rows as mutate_rows
daniel-sanche Aug 7, 2025
96d1355
require backoff; refactor check
daniel-sanche Aug 7, 2025
9fd4c56
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 7, 2025
ff7e681
fixed lint
daniel-sanche Aug 7, 2025
de5d07b
fixed context manager naming; lint
daniel-sanche Aug 7, 2025
8049be5
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 7, 2025
d73f379
moved first_response_latency to operation
daniel-sanche Aug 7, 2025
a2070f9
Merge branch 'main' into refactor_refresh
daniel-sanche Aug 8, 2025
40fcbe8
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 8, 2025
410cfb8
fixed mypy
daniel-sanche Aug 8, 2025
2adec5a
fixed errors
daniel-sanche Aug 8, 2025
4a4f80a
fixed import
daniel-sanche Aug 8, 2025
14d252b
fixed operation end for read_rows
daniel-sanche Aug 8, 2025
ab30b02
fixed lint
daniel-sanche Aug 8, 2025
bb55c46
made merge_rows into an instance method
daniel-sanche Aug 8, 2025
5ea9f0e
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 11, 2025
708a35a
fixed broken unit tests
daniel-sanche Aug 11, 2025
67c08fd
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 11, 2025
5ae7acc
added set_next to TrackedBackoffGenerator
daniel-sanche Aug 11, 2025
cb32296
added assertions to test_client
daniel-sanche Aug 12, 2025
f07e765
added new test metrics interceptor file
daniel-sanche Aug 26, 2025
a34c01e
first round of tests
daniel-sanche Aug 26, 2025
84f61ee
added metadata capture for failed rpcs
daniel-sanche Aug 26, 2025
d4ae637
added test for starting attempts
daniel-sanche Aug 26, 2025
1fbcadd
added sync tests
daniel-sanche Aug 26, 2025
edacd04
got tests passing
daniel-sanche Aug 26, 2025
c628d21
removed helper class
daniel-sanche Aug 26, 2025
6d585ec
refactoring
daniel-sanche Aug 26, 2025
01e6b36
refactored interceptor
daniel-sanche Aug 26, 2025
05fe577
added unit tests for interceptor
daniel-sanche Aug 26, 2025
4871abd
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 26, 2025
b6eac6c
fixed lint
daniel-sanche Aug 26, 2025
07b3295
feat: added metrics interceptor
daniel-sanche Sep 9, 2025
e3ac131
pulled out operation logic
daniel-sanche Sep 9, 2025
557b54a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 9, 2025
3306ad7
fixed missing convert
daniel-sanche Sep 9, 2025
65f15de
updated system test
daniel-sanche Sep 9, 2025
16c5b6a
fixed lint
daniel-sanche Sep 9, 2025
00cc52f
fixed annotation
daniel-sanche Sep 9, 2025
019a8c2
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 9, 2025
4ccfdab
removed duplicate import
daniel-sanche Sep 9, 2025
bd9ab70
added more tests
daniel-sanche Sep 9, 2025
50b3e48
remove operation metadata key
daniel-sanche Sep 9, 2025
2b35127
assign metadata directly
daniel-sanche Sep 9, 2025
9cbda99
added test
daniel-sanche Sep 9, 2025
73f4b3c
replace details mocks with real type
daniel-sanche Sep 9, 2025
d5e012d
strip operation id from metadata before request
daniel-sanche Sep 9, 2025
486068b
added try; generated sync
daniel-sanche Sep 9, 2025
bb00b8b
re-generated sync classes; removed test
daniel-sanche Sep 15, 2025
a4746f0
added new file for system tests
daniel-sanche Sep 15, 2025
417faf0
got tests to run
daniel-sanche Sep 15, 2025
4c8ffe5
added test for read_modify_write
daniel-sanche Sep 15, 2025
542dfb9
added test for check_and_mutate
daniel-sanche Sep 15, 2025
b815674
added test for sample_row_keys
daniel-sanche Sep 15, 2025
f84151f
pass down metadata in other rpcs
daniel-sanche Sep 15, 2025
b57ab24
added stubs for other rpcs
daniel-sanche Sep 15, 2025
a35a6b6
added start of system tests for metrics
daniel-sanche Sep 15, 2025
c89f6d4
loosen test
daniel-sanche Sep 15, 2025
f0b8983
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Sep 15, 2025
55ff4d1
fixed read_rows
daniel-sanche Sep 15, 2025
614c32a
refactored system tests
daniel-sanche Sep 15, 2025
d3f8dbf
moved event loop back into test files
daniel-sanche Sep 15, 2025
2a93a28
implemented full success tests for rpcs
daniel-sanche Sep 15, 2025
3fb4f13
added failure tests for checK_and_mutate
daniel-sanche Sep 16, 2025
66d3254
added unauthenticated error test
daniel-sanche Sep 16, 2025
eb8a229
added stubs
daniel-sanche Sep 16, 2025
761cf9c
added tests for sample_row_keys
daniel-sanche Sep 16, 2025
c33b813
sped up test
daniel-sanche Sep 16, 2025
661d6eb
added read_modify_write tests
daniel-sanche Sep 16, 2025
3935744
added operation logic to retry factory
daniel-sanche Sep 16, 2025
0a9af2d
added read_rows tests
daniel-sanche Sep 16, 2025
b52c871
added read_row and read_rows_sharded tests
daniel-sanche Sep 17, 2025
759d198
added bulk_mutate_row tests
daniel-sanche Sep 17, 2025
18fba8a
added batcher tests
daniel-sanche Sep 17, 2025
c107118
added mutate_row tests
daniel-sanche Sep 17, 2025
2a42b90
fixed read_rows_sharded test
daniel-sanche Sep 17, 2025
fb84b4b
refacotred tracked exception factory
daniel-sanche Sep 17, 2025
c560589
fixed lint
daniel-sanche Sep 17, 2025
0f4ee8d
use cancelled error to test non-retryable methods
daniel-sanche Sep 18, 2025
2683b50
added aclose test to read_rows_stream
daniel-sanche Sep 18, 2025
685b62a
changed test names
daniel-sanche Sep 18, 2025
2336e64
fixed warnings
daniel-sanche Sep 18, 2025
e239d56
adding sync tests
daniel-sanche Sep 19, 2025
1a54a69
capture status for unary failed attempts
daniel-sanche Sep 19, 2025
a23d920
added test for streaming retries
daniel-sanche Sep 19, 2025
b33458f
added test
daniel-sanche Sep 20, 2025
3b146f5
fixed tracked flow control
daniel-sanche Sep 20, 2025
b91da1c
updated mutate_rows test
daniel-sanche Sep 22, 2025
64ce0a4
improved retry instrumentation
daniel-sanche Sep 22, 2025
7eb50d8
added trackers to data model
daniel-sanche Sep 22, 2025
c1baf81
fixed bug in application blocking time
daniel-sanche Sep 23, 2025
9d63963
record last attempt in exception factory
daniel-sanche Sep 23, 2025
d3f9d05
simplified helper
daniel-sanche Sep 23, 2025
1d26452
added metric system tests; solved issues
daniel-sanche Sep 23, 2025
d1b74fc
swapped out custom metadata with contextvar
daniel-sanche Sep 25, 2025
8707d40
simplified interceptor
daniel-sanche Sep 25, 2025
e9877fd
removed cancel from spec
daniel-sanche Sep 25, 2025
f92d66b
fixed tests
daniel-sanche Sep 25, 2025
d9de44d
removed dead pointer
daniel-sanche Sep 25, 2025
253284f
replace custom metadata with contextvars
daniel-sanche Sep 25, 2025
2e0e402
updated sync files
daniel-sanche Sep 25, 2025
88644b2
fixed lint
daniel-sanche Sep 26, 2025
0340fce
fixed tests
daniel-sanche Sep 26, 2025
6798ea2
fixed more tests
daniel-sanche Sep 30, 2025
c96dd25
removed unneeded kwargs
daniel-sanche Sep 30, 2025
7eb83a8
fixed unit tests
daniel-sanche Sep 30, 2025
61f8b85
ran blacken
daniel-sanche Sep 30, 2025
ee72ae9
fixed tests
daniel-sanche Sep 30, 2025
dd7453b
removed unneeded kwargs
daniel-sanche Sep 30, 2025
911a299
fixed lint
daniel-sanche Sep 30, 2025
bebeb70
use contextvars
daniel-sanche Sep 30, 2025
5ba2bbe
pulled in improvements to data model
daniel-sanche Sep 30, 2025
4098fd9
removed cancel from spec
daniel-sanche Sep 30, 2025
144d75e
broke out streaming wrapper into static function
daniel-sanche Sep 30, 2025
e8785ac
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
c1cc24d
fixed tests
daniel-sanche Sep 30, 2025
231f38b
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Sep 30, 2025
c433f3c
fixed lint
daniel-sanche Sep 30, 2025
85d4cf0
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
ed9d3cf
fixed lint
daniel-sanche Oct 1, 2025
abaf5b2
fixed flakes
daniel-sanche Oct 2, 2025
bc6036e
added close to metric spec
daniel-sanche Oct 2, 2025
18ec330
fixed lint
daniel-sanche Oct 2, 2025
f1be54a
added test
daniel-sanche Oct 3, 2025
1832824
generated sync
daniel-sanche Oct 3, 2025
6bbac87
moved all fixtures into one place
daniel-sanche Oct 3, 2025
4fd97e1
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Oct 3, 2025
2bde30d
fixed event loop error
daniel-sanche Oct 3, 2025
08f5ce4
remvoed docstring
daniel-sanche Oct 6, 2025
9a33d86
Merge branch 'main' into csm_1_data_model
daniel-sanche Nov 22, 2025
e94e143
renamed methods
daniel-sanche Nov 22, 2025
596e54e
improved from_context
daniel-sanche Nov 22, 2025
16c8ed8
changed buffer time
daniel-sanche Nov 26, 2025
4887830
ran blacken
daniel-sanche Nov 26, 2025
5c80c79
use time mocks
daniel-sanche Nov 26, 2025
8de4875
fixed lint
daniel-sanche Nov 26, 2025
6a4d742
loosened test tolerances
daniel-sanche Nov 26, 2025
a474560
removed metrics superclass from interceptor
daniel-sanche Nov 26, 2025
5390813
fixed lint
daniel-sanche Nov 26, 2025
3e0d134
improved comments
daniel-sanche Nov 26, 2025
6b48242
moved interceptor into _metrics
daniel-sanche Nov 26, 2025
dcf3d0a
pulled out tracking into new file
daniel-sanche Nov 26, 2025
c94e4ff
simplified wrapper method
daniel-sanche Nov 26, 2025
3a87a35
Revert "moved interceptor into _metrics"
daniel-sanche Nov 26, 2025
b5c361b
moved tracked retry out of autogen folder
daniel-sanche Nov 26, 2025
1b0b857
fixed typing
daniel-sanche Nov 26, 2025
ac315d0
added tests
daniel-sanche Nov 27, 2025
87e78d1
removed unneeded imports
daniel-sanche Nov 27, 2025
54b7208
ran blacken
daniel-sanche Nov 27, 2025
819e1ae
Moved retry trackers into own file
daniel-sanche Nov 27, 2025
377044a
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Nov 27, 2025
9fe5d4e
use new tracked_retry function
daniel-sanche Nov 27, 2025
22eb2e1
added docstring
daniel-sanche Nov 27, 2025
0ec8d14
fixed type
daniel-sanche Nov 27, 2025
fa25c2b
import annotations
daniel-sanche Dec 3, 2025
f9ac548
Update google/cloud/bigtable/data/_metrics/data_model.py
daniel-sanche Dec 19, 2025
284c8a6
addressed PR comments
daniel-sanche Jan 6, 2026
16f7d57
improved state machine
daniel-sanche Jan 6, 2026
d167487
added negative check
daniel-sanche Jan 6, 2026
c408e14
removed uuid
daniel-sanche Jan 6, 2026
a70824b
fixed lint
daniel-sanche Jan 6, 2026
4ceab60
added cryptography to prerelease deps
daniel-sanche Jan 6, 2026
78c640b
updated state diagram
daniel-sanche Jan 15, 2026
5b8c22b
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Jan 15, 2026
d3a9013
removed read_rows and mutate_rows instrumentation
daniel-sanche Jan 15, 2026
83c806a
reverted unneeded files
daniel-sanche Jan 15, 2026
bacc505
fixed tests
daniel-sanche Jan 15, 2026
ced3ee3
fixed lint
daniel-sanche Jan 15, 2026
700ebe3
Merge branch 'main' into csm_2_instrumentation
daniel-sanche Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 81 additions & 70 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics import OperationType
from google.cloud.bigtable.data._metrics import tracked_retry

from google.cloud.bigtable.data._cross_sync import CrossSync

Expand Down Expand Up @@ -1420,26 +1422,28 @@ async def sample_row_keys(
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)

sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

@CrossSync.convert
async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
with self._metrics.create_operation(
OperationType.SAMPLE_ROW_KEYS
) as operation_metric:

@CrossSync.convert
async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
)
return [(s.row_key, s.offset_bytes) async for s in results]

return await tracked_retry(
retry_fn=CrossSync.retry_target,
operation=operation_metric,
target=execute_rpc,
predicate=predicate,
timeout=operation_timeout,
)
return [(s.row_key, s.offset_bytes) async for s in results]

return await CrossSync.retry_target(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

@CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"})
def mutations_batcher(
Expand Down Expand Up @@ -1550,28 +1554,29 @@ async def mutate_row(
# mutations should not be retried
predicate = retries.if_exception_type()

sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return await CrossSync.retry_target(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
with self._metrics.create_operation(
OperationType.MUTATE_ROW
) as operation_metric:
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return await tracked_retry(
retry_fn=CrossSync.retry_target,
operation=operation_metric,
target=target,
predicate=predicate,
timeout=operation_timeout,
)

@CrossSync.convert
async def bulk_mutate_rows(
Expand Down Expand Up @@ -1682,21 +1687,25 @@ async def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
result = await self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
result = await self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb()
if predicate is not None
else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

@CrossSync.convert
async def read_modify_write_row(
Expand Down Expand Up @@ -1736,20 +1745,22 @@ async def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
result = await self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
# construct Row from result
return Row._from_pb(result.row)

with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
result = await self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
# construct Row from result
return Row._from_pb(result.row)

@CrossSync.convert
async def close(self):
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def _retry_exception_factory(
tuple[Exception, Exception|None]:
tuple of the exception to raise, and a cause exception if applicable
"""
exc_list = exc_list.copy()
if reason == RetryFailureReason.TIMEOUT:
timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
# if failed due to timeout, raise deadline exceeded as primary exception
Expand Down
144 changes: 77 additions & 67 deletions google/cloud/bigtable/data/_sync_autogen/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics import OperationType
from google.cloud.bigtable.data._metrics import tracked_retry
from google.cloud.bigtable.data._cross_sync import CrossSync
from typing import Iterable
from grpc import insecure_channel
Expand Down Expand Up @@ -1173,25 +1175,27 @@ def sample_row_keys(
)
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

def execute_rpc():
results = self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
with self._metrics.create_operation(
OperationType.SAMPLE_ROW_KEYS
) as operation_metric:

def execute_rpc():
results = self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
)
return [(s.row_key, s.offset_bytes) for s in results]

return tracked_retry(
retry_fn=CrossSync._Sync_Impl.retry_target,
operation=operation_metric,
target=execute_rpc,
predicate=predicate,
timeout=operation_timeout,
)
return [(s.row_key, s.offset_bytes) for s in results]

return CrossSync._Sync_Impl.retry_target(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

def mutations_batcher(
self,
Expand Down Expand Up @@ -1292,27 +1296,29 @@ def mutate_row(
)
else:
predicate = retries.if_exception_type()
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return CrossSync._Sync_Impl.retry_target(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
with self._metrics.create_operation(
OperationType.MUTATE_ROW
) as operation_metric:
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return tracked_retry(
retry_fn=CrossSync._Sync_Impl.retry_target,
operation=operation_metric,
target=target,
predicate=predicate,
timeout=operation_timeout,
)

def bulk_mutate_rows(
self,
Expand Down Expand Up @@ -1416,21 +1422,24 @@ def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
result = self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched
with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
result = self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb()
if predicate is not None
else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

def read_modify_write_row(
self,
Expand Down Expand Up @@ -1467,19 +1476,20 @@ def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
result = self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return Row._from_pb(result.row)
with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
result = self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return Row._from_pb(result.row)

def close(self):
"""Called to close the Table instance and release any resources held by it."""
Expand Down
4 changes: 0 additions & 4 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
script_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(script_path)

pytest_plugins = [
"data.setup_fixtures",
]


@pytest.fixture(scope="session")
def event_loop():
Expand Down
Loading
Loading