Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions test/collection/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import pytest

from weaviate.collections.batch.base import (
_DynamicBatching,
_FixedSizeBatching,
_RateLimitedBatching,
_async_indexing_batch_params,
)
from weaviate.collections.batch.grpc_batch import _validate_props
from weaviate.collections.classes.batch import MAX_STORED_RESULTS, BatchObjectReturn
from weaviate.exceptions import WeaviateInsertInvalidPropertyError
Expand Down Expand Up @@ -53,3 +59,41 @@ def test_validate_props_raises_for_top_level_vector() -> None:
def test_validate_props_raises_for_nested_vector() -> None:
with pytest.raises(WeaviateInsertInvalidPropertyError):
_validate_props({"vector": [0.1, 0.2]}, nested=True)


def test_async_indexing_preserves_rate_limit() -> None:
# Async indexing has no server-side queue feedback, but a configured rate limit
# must still be honoured rather than replaced by large fixed-size batches (#1542).
requested = _RateLimitedBatching(requests_per_minute=100)

mode, recommended_num_objects, concurrent_requests = _async_indexing_batch_params(
requested, max_batch_size=1000
)

assert mode == requested
assert concurrent_requests == 1
assert recommended_num_objects == 100


def test_async_indexing_rate_limit_spans_multiple_batches() -> None:
# A rate limit larger than the maximum batch size is split across several batches.
requested = _RateLimitedBatching(requests_per_minute=3000)

mode, recommended_num_objects, concurrent_requests = _async_indexing_batch_params(
requested, max_batch_size=1000
)

assert mode == requested
assert concurrent_requests == 4
assert recommended_num_objects == 750


def test_async_indexing_dynamic_falls_back_to_fixed_size() -> None:
# Without a configured rate limit, dynamic batching keeps its large-batch fallback.
mode, recommended_num_objects, concurrent_requests = _async_indexing_batch_params(
_DynamicBatching(), max_batch_size=1000
)

assert mode == _FixedSizeBatching(1000, 10)
assert recommended_num_objects == 1000
assert concurrent_requests == 10
72 changes: 67 additions & 5 deletions weaviate/collections/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from dataclasses import dataclass, field
from typing import Any, Dict, Generic, List, Optional, Set, TypeVar, Union, cast
from typing import Any, Dict, Generic, List, Optional, Set, Tuple, TypeVar, Union, cast

from pydantic import ValidationError
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -267,6 +267,61 @@ class _ServerSideBatching:
_DynamicBatching, _FixedSizeBatching, _RateLimitedBatching, _ServerSideBatching
]

# When async indexing is enabled the server does not report `batchStats`, so dynamic
# batching cannot tune itself against the indexing queue and falls back to sending large
# batches. These are the parameters used for that fallback.
_ASYNC_INDEXING_FALLBACK_BATCH_SIZE = 1000
_ASYNC_INDEXING_FALLBACK_CONCURRENT_REQUESTS = 10


def _rate_limited_batch_params(mode: _RateLimitedBatching, max_batch_size: int) -> Tuple[int, int]:
"""Compute (concurrent_requests, recommended_num_objects) for rate-limited batching.

Args:
mode (_RateLimitedBatching): The configured rate-limited batching mode.
max_batch_size (int): The maximum number of objects allowed in a single batch.

Returns:
Tuple[int, int]: The number of concurrent requests and the recommended number of
objects per batch that together honour the configured requests-per-minute limit.
"""
concurrent_requests = (mode.requests_per_minute + max_batch_size) // max_batch_size
recommended_num_objects = mode.requests_per_minute // concurrent_requests
return concurrent_requests, recommended_num_objects


def _async_indexing_batch_params(
requested_mode: _BatchMode, max_batch_size: int
) -> Tuple[_BatchMode, int, int]:
"""Determine batching parameters to use when the server runs in async-indexing mode.

Dynamic batching relies on the server-reported indexing queue to size its batches.
Under async indexing that signal is unavailable, so dynamic batching falls back to
sending large batches. A configured rate limit must still be honoured in that case,
otherwise the limiter is silently ignored (see issue #1542).

Args:
requested_mode (_BatchMode): The batching mode originally requested by the user.
max_batch_size (int): The maximum number of objects allowed in a single batch.

Returns:
Tuple[_BatchMode, int, int]: The batching mode to apply, the recommended number
of objects per batch, and the number of concurrent requests.
"""
if isinstance(requested_mode, _RateLimitedBatching):
concurrent_requests, recommended_num_objects = _rate_limited_batch_params(
requested_mode, max_batch_size
)
return requested_mode, recommended_num_objects, concurrent_requests
return (
_FixedSizeBatching(
_ASYNC_INDEXING_FALLBACK_BATCH_SIZE,
_ASYNC_INDEXING_FALLBACK_CONCURRENT_REQUESTS,
),
_ASYNC_INDEXING_FALLBACK_BATCH_SIZE,
_ASYNC_INDEXING_FALLBACK_CONCURRENT_REQUESTS,
)


class _BatchBase:
def __init__(
Expand Down Expand Up @@ -302,6 +357,10 @@ def __init__(
self.__cluster = _ClusterBatch(self.__connection)

self.__batching_mode: _BatchMode = batch_mode
# The mode the user originally requested. `__batching_mode` may be reassigned at
# runtime (e.g. the async-indexing fallback in `__dynamic_batching`), so we keep
# the request separately to know whether a rate limit was configured.
self.__requested_batch_mode: _BatchMode = batch_mode
self.__max_batch_size: int = 1000

self.__executor = executor
Expand Down Expand Up @@ -510,10 +569,13 @@ def batch_send_wrapper() -> None:
def __dynamic_batching(self) -> None:
status = self.__cluster.get_nodes_status()
if "batchStats" not in status[0] or "queueLength" not in status[0]["batchStats"]:
# async indexing - just send a lot
self.__batching_mode = _FixedSizeBatching(1000, 10)
self.__recommended_num_objects = 1000
self.__concurrent_requests = 10
# async indexing - send a lot, unless the user configured a rate limit, in
# which case it must still be honoured (see issue #1542).
(
self.__batching_mode,
self.__recommended_num_objects,
self.__concurrent_requests,
) = _async_indexing_batch_params(self.__requested_batch_mode, self.__max_batch_size)
return

rate: int = status[0]["batchStats"]["ratePerSecond"]
Expand Down