diff --git a/test/collection/test_batch.py b/test/collection/test_batch.py index 0a2cda954..62b6c5da2 100644 --- a/test/collection/test_batch.py +++ b/test/collection/test_batch.py @@ -2,6 +2,12 @@ import pytest +from weaviate.collections.batch.base import ( + _DynamicBatching, + _FixedSizeBatching, + _RateLimitedBatching, + _async_indexing_batch_params, +) from weaviate.collections.batch.grpc_batch import _validate_props from weaviate.collections.classes.batch import MAX_STORED_RESULTS, BatchObjectReturn from weaviate.exceptions import WeaviateInsertInvalidPropertyError @@ -53,3 +59,41 @@ def test_validate_props_raises_for_top_level_vector() -> None: def test_validate_props_raises_for_nested_vector() -> None: with pytest.raises(WeaviateInsertInvalidPropertyError): _validate_props({"vector": [0.1, 0.2]}, nested=True) + + +def test_async_indexing_preserves_rate_limit() -> None: + # Async indexing has no server-side queue feedback, but a configured rate limit + # must still be honoured rather than replaced by large fixed-size batches (#1542). + requested = _RateLimitedBatching(requests_per_minute=100) + + mode, recommended_num_objects, concurrent_requests = _async_indexing_batch_params( + requested, max_batch_size=1000 + ) + + assert mode == requested + assert concurrent_requests == 1 + assert recommended_num_objects == 100 + + +def test_async_indexing_rate_limit_spans_multiple_batches() -> None: + # A rate limit larger than the maximum batch size is split across several batches. + requested = _RateLimitedBatching(requests_per_minute=3000) + + mode, recommended_num_objects, concurrent_requests = _async_indexing_batch_params( + requested, max_batch_size=1000 + ) + + assert mode == requested + assert concurrent_requests == 4 + assert recommended_num_objects == 750 + + +def test_async_indexing_dynamic_falls_back_to_fixed_size() -> None: + # Without a configured rate limit, dynamic batching keeps its large-batch fallback. 
# When async indexing is enabled the server does not report `batchStats`, so dynamic
# batching cannot tune itself against the indexing queue and falls back to sending large
# batches. These are the upper bounds used for that fallback; the batch size is further
# capped by the caller-supplied maximum batch size.
_ASYNC_INDEXING_FALLBACK_BATCH_SIZE = 1000
_ASYNC_INDEXING_FALLBACK_CONCURRENT_REQUESTS = 10


def _rate_limited_batch_params(mode: _RateLimitedBatching, max_batch_size: int) -> Tuple[int, int]:
    """Compute (concurrent_requests, recommended_num_objects) for rate-limited batching.

    The per-minute budget is split into the smallest number of equally sized batches such
    that each batch holds strictly fewer than `max_batch_size` objects, e.g. a limit of
    3000 with a maximum batch size of 1000 yields 4 concurrent requests of 750 objects.

    Args:
        mode (_RateLimitedBatching): The configured rate-limited batching mode. Only its
            `requests_per_minute` attribute is read.
        max_batch_size (int): The maximum number of objects allowed in a single batch.
            Must be positive.

    Returns:
        Tuple[int, int]: The number of concurrent requests and the recommended number of
            objects per batch that together honour the configured requests-per-minute limit.

    Raises:
        ValueError: If `max_batch_size` is not positive. Without this check the integer
            divisions below would fail with a bare `ZeroDivisionError` or silently
            produce negative parameters.
    """
    if max_batch_size <= 0:
        raise ValueError(f"max_batch_size must be a positive integer, got {max_batch_size}")
    # Equivalent to `requests_per_minute // max_batch_size + 1`: always at least one
    # request, and one more than strictly needed when the limit is an exact multiple.
    concurrent_requests = (mode.requests_per_minute + max_batch_size) // max_batch_size
    recommended_num_objects = mode.requests_per_minute // concurrent_requests
    return concurrent_requests, recommended_num_objects


def _async_indexing_fallback_params(max_batch_size: int) -> Tuple[int, int]:
    """Return (batch_size, concurrent_requests) for the fixed-size async-indexing fallback.

    Args:
        max_batch_size (int): The maximum number of objects allowed in a single batch.
            Must be positive.

    Returns:
        Tuple[int, int]: The fallback batch size, never larger than `max_batch_size`, and
            the fallback number of concurrent requests.

    Raises:
        ValueError: If `max_batch_size` is not positive.
    """
    if max_batch_size <= 0:
        raise ValueError(f"max_batch_size must be a positive integer, got {max_batch_size}")
    batch_size = min(_ASYNC_INDEXING_FALLBACK_BATCH_SIZE, max_batch_size)
    return batch_size, _ASYNC_INDEXING_FALLBACK_CONCURRENT_REQUESTS


def _async_indexing_batch_params(
    requested_mode: _BatchMode, max_batch_size: int
) -> Tuple[_BatchMode, int, int]:
    """Determine batching parameters to use when the server runs in async-indexing mode.

    Dynamic batching relies on the server-reported indexing queue to size its batches.
    Under async indexing that signal is unavailable, so dynamic batching falls back to
    sending large fixed-size batches. A configured rate limit must still be honoured in
    that case, otherwise the limiter is silently ignored (see issue #1542).

    Args:
        requested_mode (_BatchMode): The batching mode originally requested by the user.
        max_batch_size (int): The maximum number of objects allowed in a single batch.
            Must be positive.

    Returns:
        Tuple[_BatchMode, int, int]: The batching mode to apply, the recommended number
            of objects per batch, and the number of concurrent requests. Note that the
            last two are in the opposite order to `_rate_limited_batch_params`.

    Raises:
        ValueError: If `max_batch_size` is not positive.
    """
    if isinstance(requested_mode, _RateLimitedBatching):
        # Keep the user's mode untouched; only derive the batch size and concurrency.
        concurrent_requests, recommended_num_objects = _rate_limited_batch_params(
            requested_mode, max_batch_size
        )
        return requested_mode, recommended_num_objects, concurrent_requests

    # No rate limit configured: send large batches, but never larger than the maximum
    # the caller allows.
    batch_size, concurrent_requests = _async_indexing_fallback_params(max_batch_size)
    return _FixedSizeBatching(batch_size, concurrent_requests), batch_size, concurrent_requests
+ self.__requested_batch_mode: _BatchMode = batch_mode self.__max_batch_size: int = 1000 self.__executor = executor @@ -510,10 +569,13 @@ def batch_send_wrapper() -> None: def __dynamic_batching(self) -> None: status = self.__cluster.get_nodes_status() if "batchStats" not in status[0] or "queueLength" not in status[0]["batchStats"]: - # async indexing - just send a lot - self.__batching_mode = _FixedSizeBatching(1000, 10) - self.__recommended_num_objects = 1000 - self.__concurrent_requests = 10 + # async indexing - send a lot, unless the user configured a rate limit, in + # which case it must still be honoured (see issue #1542). + ( + self.__batching_mode, + self.__recommended_num_objects, + self.__concurrent_requests, + ) = _async_indexing_batch_params(self.__requested_batch_mode, self.__max_batch_size) return rate: int = status[0]["batchStats"]["ratePerSecond"]