def get_stream(self, path: str) -> requests.Response:
    """Issue a GET request with ``stream=True`` and hand back the raw response.

    The response body is not downloaded eagerly; the caller receives the
    ``requests.Response`` object and decides how to consume it.

    Parameters
    ----------
    path:
        Route to query; it is appended to the configured URL after a ``/``.

    Returns
    -------
    response:
        The raw response object, returned when the status is not an error.

    Raises
    ------
    MeilisearchTimeoutError
        If the request times out.
    MeilisearchCommunicationError
        If a connection error occurs.
    MeilisearchApiError
        If the server answers with an HTTP error status.
    """
    try:
        url = self.config.url + "/" + path
        response = requests.get(
            url,
            timeout=self.config.timeout,
            headers=self.headers,
            stream=True,
        )

        if not response.ok:
            # Turns an error status into an HTTPError handled below.
            response.raise_for_status()
    # Timeout must be handled before ConnectionError: a connect timeout
    # derives from both, and it should surface as a timeout error.
    except requests.exceptions.Timeout as err:
        raise MeilisearchTimeoutError(str(err)) from err
    except requests.exceptions.ConnectionError as err:
        raise MeilisearchCommunicationError(str(err)) from err
    except requests.exceptions.HTTPError as err:
        raise MeilisearchApiError(str(err), response) from err
    else:
        return response
def parse_task_documents(raw_documents: str) -> List[Dict[str, Any]]:
    """Parse the payload returned by ``GET /tasks/{uid}/documents``.

    The endpoint may return a JSON array, a single JSON object, NDJSON, or
    several JSON values concatenated with or without whitespace between them
    (including pretty-printed, multi-line objects). All of those formats are
    normalized into a list of documents.

    Parameters
    ----------
    raw_documents:
        Raw text body of the response.

    Returns
    -------
    documents:
        An empty list for an empty or whitespace-only payload; the array
        itself when the payload is a single JSON array; otherwise a list
        holding every top-level JSON value in order of appearance.

    Raises
    ------
    json.JSONDecodeError
        If the payload is not a sequence of valid JSON values.
    """
    payload = raw_documents.strip()
    if not payload:
        return []

    # Decode one JSON value after another with a real JSON parser instead of
    # splitting the text on "}{" boundaries: a textual split breaks documents
    # whose string values contain "}{" and objects spanning several lines.
    decoder = json.JSONDecoder()
    values: List[Any] = []
    position = 0
    end = len(payload)
    while position < end:
        value, position = decoder.raw_decode(payload, position)
        values.append(value)
        # raw_decode does not accept leading whitespace, so skip the
        # separator (newline, spaces, or nothing) before the next value.
        while position < end and payload[position].isspace():
            position += 1

    if len(values) == 1:
        # A lone top-level array already is the list of documents; any other
        # lone value is wrapped so the caller always receives a list.
        only = values[0]
        return only if isinstance(only, list) else [only]
    return values
def get_task_documents(self, uid: int) -> List[Dict[str, Any]]:
    """Fetch the documents that a task added or updated.

    This relies on an experimental route: the ``getTaskDocumentsRoute``
    experimental feature must be enabled on the Meilisearch instance.

    Parameters
    ----------
    uid:
        Identifier of the task.

    Returns
    -------
    documents:
        List of the documents associated with the task.

    Raises
    ------
    MeilisearchApiError
        An error containing details about why Meilisearch can't process your request.
        Meilisearch error codes are described here: https://www.meilisearch.com/docs/reference/errors/error_codes#meilisearch-errors
    """
    route = f"{self.config.paths.task}/{uid}/documents"
    # The response text is handed to the shared parser, which turns it
    # into a list of documents.
    raw_payload = self.http.get_stream(route).text
    return parse_task_documents(raw_payload)
1}\n{"id": 2}', [{"id": 1}, {"id": 2}]), # NDJSON + ('{"id": 1}{"id": 2}', [{"id": 1}, {"id": 2}]), # concatenated, no separator + ("", []), # empty + (" \n ", []), # whitespace only + ], +) +def test_parse_task_documents(raw, expected): + assert parse_task_documents(raw) == expected + + # Refactor to use the unified API to toggle experimental features def reset_network_config(client): client.add_or_update_networks(body={"remotes": {}, "leader": None})