From bf93bdad1ab12265039be107c39edf301e4d5e7f Mon Sep 17 00:00:00 2001 From: Adam Dangoor Date: Wed, 25 Feb 2026 08:58:51 +0000 Subject: [PATCH 1/3] Extract shared helpers to reduce sync/async code duplication Move duplicated exception-raising, response-parsing, and request-building logic into shared modules (_vws_common.py, _query_common.py) so the sync and async client implementations can share the same code. Co-Authored-By: Claude Sonnet 4.6 --- src/vws/_async_vws_request.py | 26 ++--- src/vws/_query_common.py | 58 ++++++++++ src/vws/_vws_common.py | 195 ++++++++++++++++++++++++++++++++ src/vws/_vws_request.py | 63 ++++++++--- src/vws/async_query.py | 25 ++-- src/vws/async_vumark_service.py | 38 +------ src/vws/async_vws.py | 115 +++++-------------- src/vws/query.py | 23 ++-- src/vws/vumark_service.py | 38 +------ src/vws/vws.py | 115 +++++-------------- 10 files changed, 389 insertions(+), 307 deletions(-) create mode 100644 src/vws/_query_common.py create mode 100644 src/vws/_vws_common.py diff --git a/src/vws/_async_vws_request.py b/src/vws/_async_vws_request.py index 4ebd50c8..169c3591 100644 --- a/src/vws/_async_vws_request.py +++ b/src/vws/_async_vws_request.py @@ -3,8 +3,8 @@ """ from beartype import BeartypeConf, beartype -from vws_auth_tools import authorization_header, rfc_1123_date +from vws._vws_request import build_vws_request_args from vws.response import Response from vws.transports import AsyncTransport @@ -47,27 +47,17 @@ async def async_target_api_request( Returns: The response to the request. """ - date_string = rfc_1123_date() - - signature_string = authorization_header( - access_key=server_access_key, - secret_key=server_secret_key, - method=method, - content=data, + url, headers = build_vws_request_args( content_type=content_type, - date=date_string, + server_access_key=server_access_key, + server_secret_key=server_secret_key, + method=method, + data=data, request_path=request_path, + base_vws_url=base_vws_url, + extra_headers=extra_headers, ) - headers = { - "Authorization": signature_string, - "Date": date_string, - "Content-Type": content_type, - **extra_headers, - } - - url = base_vws_url.rstrip("/") + request_path - return await transport( method=method, url=url, diff --git a/src/vws/_query_common.py b/src/vws/_query_common.py new file mode 100644 index 00000000..0dd8abf7 --- /dev/null +++ b/src/vws/_query_common.py @@ -0,0 +1,58 @@ +"""Shared helpers for CloudReco query implementations.""" + +import datetime +import json +from typing import NoReturn + +from vws.exceptions.cloud_reco_exceptions import ( + AuthenticationFailureError, + BadImageError, + InactiveProjectError, + RequestTimeTooSkewedError, +) +from vws.reports import QueryResult, TargetData +from vws.response import Response + + +def raise_for_cloud_reco_result_code( + result_code: str, response: Response +) -> NoReturn: + """Raise the appropriate cloud reco exception for the given result + code. + """ + exception = { + "AuthenticationFailure": AuthenticationFailureError, + "BadImage": BadImageError, + "InactiveProject": InactiveProjectError, + "RequestTimeTooSkewed": RequestTimeTooSkewedError, + }[result_code] + raise exception(response=response) + + +def parse_query_results(text: str) -> list[QueryResult]: + """Parse the results list from a cloud reco query response body.""" + result: list[QueryResult] = [] + result_list = list(json.loads(s=text)["results"]) + for item in result_list: + target_data: TargetData | None = None + if "target_data" in item: + target_data_dict = item["target_data"] + metadata = target_data_dict["application_metadata"] + timestamp_string = target_data_dict["target_timestamp"] + target_timestamp = datetime.datetime.fromtimestamp( + timestamp=timestamp_string, + tz=datetime.UTC, + ) + target_data = TargetData( + name=target_data_dict["name"], + application_metadata=metadata, + target_timestamp=target_timestamp, + ) + + query_result = QueryResult( + target_id=item["target_id"], + target_data=target_data, + ) + + result.append(query_result) + return result diff --git a/src/vws/_vws_common.py b/src/vws/_vws_common.py new file mode 100644 index 00000000..6b06cda9 --- /dev/null +++ b/src/vws/_vws_common.py @@ -0,0 +1,195 @@ +"""Shared helpers for VWS client implementations.""" + +import base64 +import json +from datetime import date +from typing import NoReturn + +from vws._image_utils import ImageType, get_image_data +from vws.exceptions.vws_exceptions import ( + AuthenticationFailureError, + BadImageError, + BadRequestError, + DateRangeError, + FailError, + ImageTooLargeError, + InvalidAcceptHeaderError, + InvalidInstanceIdError, + InvalidTargetTypeError, + MetadataTooLargeError, + ProjectHasNoAPIAccessError, + ProjectInactiveError, + ProjectSuspendedError, + RequestQuotaReachedError, + RequestTimeTooSkewedError, + TargetNameExistError, + TargetQuotaReachedError, + TargetStatusNotSuccessError, + TargetStatusProcessingError, + UnknownTargetError, +) +from vws.reports import ( + DatabaseSummaryReport, + TargetRecord, + TargetStatusAndRecord, + TargetStatuses, + TargetSummaryReport, +) +from vws.response import Response + + +def raise_for_vws_result_code( + result_code: str, response: Response +) -> NoReturn: + """Raise the appropriate VWS exception for the given result code.""" + exception = { + "AuthenticationFailure": AuthenticationFailureError, + "BadImage": BadImageError, + "BadRequest": BadRequestError, + "DateRangeError": DateRangeError, + "Fail": FailError, + "ImageTooLarge": ImageTooLargeError, + "MetadataTooLarge": MetadataTooLargeError, + "ProjectHasNoAPIAccess": ProjectHasNoAPIAccessError, + "ProjectInactive": ProjectInactiveError, + "ProjectSuspended": ProjectSuspendedError, + "RequestQuotaReached": RequestQuotaReachedError, + "RequestTimeTooSkewed": RequestTimeTooSkewedError, + "TargetNameExist": TargetNameExistError, + "TargetQuotaReached": TargetQuotaReachedError, + "TargetStatusNotSuccess": TargetStatusNotSuccessError, + "TargetStatusProcessing": TargetStatusProcessingError, + "UnknownTarget": UnknownTargetError, + }[result_code] + raise exception(response=response) + + +def raise_for_vumark_result_code( + result_code: str, response: Response +) -> NoReturn: + """Raise the appropriate VuMark exception for the given result + code. + """ + exception = { + "AuthenticationFailure": AuthenticationFailureError, + "BadRequest": BadRequestError, + "DateRangeError": DateRangeError, + "Fail": FailError, + "InvalidAcceptHeader": InvalidAcceptHeaderError, + "InvalidInstanceId": InvalidInstanceIdError, + "InvalidTargetType": InvalidTargetTypeError, + "RequestTimeTooSkewed": RequestTimeTooSkewedError, + "TargetStatusNotSuccess": TargetStatusNotSuccessError, + "UnknownTarget": UnknownTargetError, + }[result_code] + raise exception(response=response) + + +def parse_target_record_response(text: str) -> TargetStatusAndRecord: + """Parse a get_target_record response body.""" + result_data = json.loads(s=text) + status = TargetStatuses(value=result_data["status"]) + target_record_dict = dict(result_data["target_record"]) + target_record = TargetRecord( + target_id=target_record_dict["target_id"], + active_flag=bool(target_record_dict["active_flag"]), + name=target_record_dict["name"], + width=float(target_record_dict["width"]), + tracking_rating=int(target_record_dict["tracking_rating"]), + reco_rating=target_record_dict["reco_rating"], + ) + return TargetStatusAndRecord( + status=status, + target_record=target_record, + ) + + +def parse_target_summary_response(text: str) -> TargetSummaryReport: + """Parse a get_target_summary_report response body.""" + result_data = dict(json.loads(s=text)) + return TargetSummaryReport( + status=TargetStatuses(value=result_data["status"]), + database_name=result_data["database_name"], + target_name=result_data["target_name"], + upload_date=date.fromisoformat(result_data["upload_date"]), + active_flag=bool(result_data["active_flag"]), + tracking_rating=int(result_data["tracking_rating"]), + total_recos=int(result_data["total_recos"]), + current_month_recos=int(result_data["current_month_recos"]), + previous_month_recos=int(result_data["previous_month_recos"]), + ) + + +def parse_database_summary_response(text: str) -> DatabaseSummaryReport: + """Parse a get_database_summary_report response body.""" + response_data = dict(json.loads(s=text)) + return DatabaseSummaryReport( + active_images=int(response_data["active_images"]), + current_month_recos=int(response_data["current_month_recos"]), + failed_images=int(response_data["failed_images"]), + inactive_images=int(response_data["inactive_images"]), + name=str(object=response_data["name"]), + previous_month_recos=int(response_data["previous_month_recos"]), + processing_images=int(response_data["processing_images"]), + reco_threshold=int(response_data["reco_threshold"]), + request_quota=int(response_data["request_quota"]), + request_usage=int(response_data["request_usage"]), + target_quota=int(response_data["target_quota"]), + total_recos=int(response_data["total_recos"]), + ) + + +def build_add_target_content( + *, + name: str, + width: float, + image: ImageType, + active_flag: bool, + application_metadata: str | None, +) -> bytes: + """Build the request body for an add_target request.""" + image_data = get_image_data(image=image) + image_data_encoded = base64.b64encode(s=image_data).decode( + encoding="ascii", + ) + data = { + "name": name, + "width": width, + "image": image_data_encoded, + "active_flag": active_flag, + "application_metadata": application_metadata, + } + return json.dumps(obj=data).encode(encoding="utf-8") + + +def build_update_target_content( + *, + name: str | None, + width: float | None, + image: ImageType | None, + active_flag: bool | None, + application_metadata: str | None, +) -> bytes: + """Build the request body for an update_target request.""" + data: dict[str, str | bool | float | int] = {} + + if name is not None: + data["name"] = name + + if width is not None: + data["width"] = width + + if image is not None: + image_data = get_image_data(image=image) + image_data_encoded = base64.b64encode(s=image_data).decode( + encoding="ascii", + ) + data["image"] = image_data_encoded + + if active_flag is not None: + data["active_flag"] = active_flag + + if application_metadata is not None: + data["application_metadata"] = application_metadata + + return json.dumps(obj=data).encode(encoding="utf-8") diff --git a/src/vws/_vws_request.py b/src/vws/_vws_request.py index 8e08cf86..6d0f0486 100644 --- a/src/vws/_vws_request.py +++ b/src/vws/_vws_request.py @@ -9,6 +9,45 @@ from vws.transports import Transport +def build_vws_request_args( + *, + content_type: str, + server_access_key: str, + server_secret_key: str, + method: str, + data: bytes, + request_path: str, + base_vws_url: str, + extra_headers: dict[str, str], +) -> tuple[str, dict[str, str]]: + """Build the URL and headers for a Vuforia Target API request. + + Returns: + A tuple of (url, headers). + """ + date_string = rfc_1123_date() + + signature_string = authorization_header( + access_key=server_access_key, + secret_key=server_secret_key, + method=method, + content=data, + content_type=content_type, + date=date_string, + request_path=request_path, + ) + + headers = { + "Authorization": signature_string, + "Date": date_string, + "Content-Type": content_type, + **extra_headers, + } + + url = base_vws_url.rstrip("/") + request_path + return url, headers + + @beartype(conf=BeartypeConf(is_pep484_tower=True)) def target_api_request( *, @@ -46,27 +85,17 @@ def target_api_request( Returns: The response to the request. """ - date_string = rfc_1123_date() - - signature_string = authorization_header( - access_key=server_access_key, - secret_key=server_secret_key, - method=method, - content=data, + url, headers = build_vws_request_args( content_type=content_type, - date=date_string, + server_access_key=server_access_key, + server_secret_key=server_secret_key, + method=method, + data=data, request_path=request_path, + base_vws_url=base_vws_url, + extra_headers=extra_headers, ) - headers = { - "Authorization": signature_string, - "Date": date_string, - "Content-Type": content_type, - **extra_headers, - } - - url = base_vws_url.rstrip("/") + request_path - return transport( method=method, url=url, diff --git a/src/vws/async_query.py b/src/vws/async_query.py index f1df2ac7..36ea58a5 100644 --- a/src/vws/async_query.py +++ b/src/vws/async_query.py @@ -12,13 +12,11 @@ from vws._image_utils import ImageType as _ImageType from vws._image_utils import get_image_data as _get_image_data -from vws.exceptions.cloud_reco_exceptions import ( - AuthenticationFailureError, - BadImageError, - InactiveProjectError, - MaxNumResultsOutOfRangeError, - RequestTimeTooSkewedError, +from vws._query_common import ( + parse_query_results, + raise_for_cloud_reco_result_code, ) +from vws.exceptions.cloud_reco_exceptions import MaxNumResultsOutOfRangeError from vws.exceptions.custom_exceptions import ( RequestEntityTooLargeError, ServerError, @@ -188,15 +186,6 @@ async def query( result_code = json.loads(s=response.text)["result_code"] if result_code != "Success": - exception = { - "AuthenticationFailure": (AuthenticationFailureError), - "BadImage": BadImageError, - "InactiveProject": InactiveProjectError, - "RequestTimeTooSkewed": (RequestTimeTooSkewedError), - }[result_code] - raise exception(response=response) - - result_list = list( - json.loads(s=response.text)["results"], - ) - return [QueryResult.from_response_dict(item) for item in result_list] # type: ignore[misc] + raise_for_cloud_reco_result_code(result_code, response) + + return parse_query_results(response.text) diff --git a/src/vws/async_vumark_service.py b/src/vws/async_vumark_service.py index c1e189ee..3c23ee83 100644 --- a/src/vws/async_vumark_service.py +++ b/src/vws/async_vumark_service.py @@ -7,20 +7,9 @@ from beartype import BeartypeConf, beartype from vws._async_vws_request import async_target_api_request +from vws._vws_common import raise_for_vumark_result_code from vws.exceptions.custom_exceptions import ServerError -from vws.exceptions.vws_exceptions import ( - AuthenticationFailureError, - BadRequestError, - DateRangeError, - FailError, - InvalidAcceptHeaderError, - InvalidInstanceIdError, - InvalidTargetTypeError, - RequestTimeTooSkewedError, - TargetStatusNotSuccessError, - TooManyRequestsError, - UnknownTargetError, -) +from vws.exceptions.vws_exceptions import TooManyRequestsError from vws.transports import AsyncHTTPXTransport, AsyncTransport from vws.vumark_accept import VuMarkAccept @@ -149,22 +138,7 @@ async def generate_vumark_instance( ): # pragma: no cover raise ServerError(response=response) - if response.status_code == HTTPStatus.OK: - return response.content - - result_code = json.loads(s=response.text)["result_code"] - - exception = { - "AuthenticationFailure": (AuthenticationFailureError), - "BadRequest": BadRequestError, - "DateRangeError": DateRangeError, - "Fail": FailError, - "InvalidAcceptHeader": InvalidAcceptHeaderError, - "InvalidInstanceId": InvalidInstanceIdError, - "InvalidTargetType": InvalidTargetTypeError, - "RequestTimeTooSkewed": RequestTimeTooSkewedError, - "TargetStatusNotSuccess": (TargetStatusNotSuccessError), - "UnknownTarget": UnknownTargetError, - }[result_code] - - raise exception(response=response) + if response.status_code != HTTPStatus.OK: + result_code = json.loads(s=response.text)["result_code"] + raise_for_vumark_result_code(result_code, response) + return response.content diff --git a/src/vws/async_vws.py b/src/vws/async_vws.py index b81c5e15..cea72357 100644 --- a/src/vws/async_vws.py +++ b/src/vws/async_vws.py @@ -1,7 +1,6 @@ """Async tools for interacting with Vuforia APIs.""" import asyncio -import base64 import json from http import HTTPMethod, HTTPStatus from typing import Self @@ -10,31 +9,19 @@ from vws._async_vws_request import async_target_api_request from vws._image_utils import ImageType as _ImageType -from vws._image_utils import get_image_data as _get_image_data +from vws._vws_common import ( + build_add_target_content, + build_update_target_content, + parse_database_summary_response, + parse_target_record_response, + parse_target_summary_response, + raise_for_vws_result_code, +) from vws.exceptions.custom_exceptions import ( ServerError, TargetProcessingTimeoutError, ) -from vws.exceptions.vws_exceptions import ( - AuthenticationFailureError, - BadImageError, - BadRequestError, - DateRangeError, - FailError, - ImageTooLargeError, - MetadataTooLargeError, - ProjectHasNoAPIAccessError, - ProjectInactiveError, - ProjectSuspendedError, - RequestQuotaReachedError, - RequestTimeTooSkewedError, - TargetNameExistError, - TargetQuotaReachedError, - TargetStatusNotSuccessError, - TargetStatusProcessingError, - TooManyRequestsError, - UnknownTargetError, -) +from vws.exceptions.vws_exceptions import TooManyRequestsError from vws.reports import ( DatabaseSummaryReport, TargetStatusAndRecord, @@ -155,30 +142,9 @@ async def make_request( result_code = json.loads(s=response.text)["result_code"] - if result_code == expected_result_code: - return response - - exception = { - "AuthenticationFailure": AuthenticationFailureError, - "BadImage": BadImageError, - "BadRequest": BadRequestError, - "DateRangeError": DateRangeError, - "Fail": FailError, - "ImageTooLarge": ImageTooLargeError, - "MetadataTooLarge": MetadataTooLargeError, - "ProjectHasNoAPIAccess": ProjectHasNoAPIAccessError, - "ProjectInactive": ProjectInactiveError, - "ProjectSuspended": ProjectSuspendedError, - "RequestQuotaReached": RequestQuotaReachedError, - "RequestTimeTooSkewed": RequestTimeTooSkewedError, - "TargetNameExist": TargetNameExistError, - "TargetQuotaReached": TargetQuotaReachedError, - "TargetStatusNotSuccess": TargetStatusNotSuccessError, - "TargetStatusProcessing": TargetStatusProcessingError, - "UnknownTarget": UnknownTargetError, - }[result_code] - - raise exception(response=response) + if result_code != expected_result_code: + raise_for_vws_result_code(result_code, response) + return response async def add_target( self, @@ -234,21 +200,14 @@ async def add_target( ~vws.exceptions.vws_exceptions.TooManyRequestsError: Vuforia is rate limiting access. """ - image_data = _get_image_data(image=image) - image_data_encoded = base64.b64encode(s=image_data).decode( - encoding="ascii", + content = build_add_target_content( + name=name, + width=width, + image=image, + active_flag=active_flag, + application_metadata=application_metadata, ) - data = { - "name": name, - "width": width, - "image": image_data_encoded, - "active_flag": active_flag, - "application_metadata": application_metadata, - } - - content = json.dumps(obj=data).encode(encoding="utf-8") - response = await self.make_request( method=HTTPMethod.POST, data=content, @@ -295,8 +254,7 @@ async def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - result_data = json.loads(s=response.text) - return TargetStatusAndRecord.from_response_dict(result_data) # type: ignore[misc] + return parse_target_record_response(response.text) async def wait_for_target_processed( self, @@ -430,8 +388,7 @@ async def get_target_summary_report( content_type="application/json", ) - result_data = dict(json.loads(s=response.text)) - return TargetSummaryReport.from_response_dict(result_data) # type: ignore[misc] + return parse_target_summary_response(response.text) async def get_database_summary_report( self, @@ -465,8 +422,7 @@ async def get_database_summary_report( content_type="application/json", ) - response_data = dict(json.loads(s=response.text)) - return DatabaseSummaryReport.from_response_dict(response_data) # type: ignore[misc] + return parse_database_summary_response(response.text) async def delete_target(self, target_id: str) -> None: """Delete a given target. @@ -602,28 +558,13 @@ async def update_target( ~vws.exceptions.vws_exceptions.TooManyRequestsError: Vuforia is rate limiting access. """ - data: dict[str, str | bool | float | int] = {} - - if name is not None: - data["name"] = name - - if width is not None: - data["width"] = width - - if image is not None: - image_data = _get_image_data(image=image) - image_data_encoded = base64.b64encode( - s=image_data, - ).decode(encoding="ascii") - data["image"] = image_data_encoded - - if active_flag is not None: - data["active_flag"] = active_flag - - if application_metadata is not None: - data["application_metadata"] = application_metadata - - content = json.dumps(obj=data).encode(encoding="utf-8") + content = build_update_target_content( + name=name, + width=width, + image=image, + active_flag=active_flag, + application_metadata=application_metadata, + ) await self.make_request( method=HTTPMethod.PUT, diff --git a/src/vws/query.py b/src/vws/query.py index a4810b73..168d8852 100644 --- a/src/vws/query.py +++ b/src/vws/query.py @@ -10,13 +10,11 @@ from vws._image_utils import ImageType as _ImageType from vws._image_utils import get_image_data as _get_image_data -from vws.exceptions.cloud_reco_exceptions import ( - AuthenticationFailureError, - BadImageError, - InactiveProjectError, - MaxNumResultsOutOfRangeError, - RequestTimeTooSkewedError, +from vws._query_common import ( + parse_query_results, + raise_for_cloud_reco_result_code, ) +from vws.exceptions.cloud_reco_exceptions import MaxNumResultsOutOfRangeError from vws.exceptions.custom_exceptions import ( RequestEntityTooLargeError, ServerError, @@ -156,13 +154,6 @@ def query( result_code = json.loads(s=response.text)["result_code"] if result_code != "Success": - exception = { - "AuthenticationFailure": AuthenticationFailureError, - "BadImage": BadImageError, - "InactiveProject": InactiveProjectError, - "RequestTimeTooSkewed": RequestTimeTooSkewedError, - }[result_code] - raise exception(response=response) - - result_list = list(json.loads(s=response.text)["results"]) - return [QueryResult.from_response_dict(item) for item in result_list] # type: ignore[misc] + raise_for_cloud_reco_result_code(result_code, response) + + return parse_query_results(response.text) diff --git a/src/vws/vumark_service.py b/src/vws/vumark_service.py index 47d68d94..8bebe1e0 100644 --- a/src/vws/vumark_service.py +++ b/src/vws/vumark_service.py @@ -5,21 +5,10 @@ from beartype import BeartypeConf, beartype +from vws._vws_common import raise_for_vumark_result_code from vws._vws_request import target_api_request from vws.exceptions.custom_exceptions import ServerError -from vws.exceptions.vws_exceptions import ( - AuthenticationFailureError, - BadRequestError, - DateRangeError, - FailError, - InvalidAcceptHeaderError, - InvalidInstanceIdError, - InvalidTargetTypeError, - RequestTimeTooSkewedError, - TargetStatusNotSuccessError, - TooManyRequestsError, - UnknownTargetError, -) +from vws.exceptions.vws_exceptions import TooManyRequestsError from vws.transports import RequestsTransport, Transport from vws.vumark_accept import VuMarkAccept @@ -130,22 +119,7 @@ def generate_vumark_instance( ): # pragma: no cover raise ServerError(response=response) - if response.status_code == HTTPStatus.OK: - return response.content - - result_code = json.loads(s=response.text)["result_code"] - - exception = { - "AuthenticationFailure": AuthenticationFailureError, - "BadRequest": BadRequestError, - "DateRangeError": DateRangeError, - "Fail": FailError, - "InvalidAcceptHeader": InvalidAcceptHeaderError, - "InvalidInstanceId": InvalidInstanceIdError, - "InvalidTargetType": InvalidTargetTypeError, - "RequestTimeTooSkewed": RequestTimeTooSkewedError, - "TargetStatusNotSuccess": TargetStatusNotSuccessError, - "UnknownTarget": UnknownTargetError, - }[result_code] - - raise exception(response=response) + if response.status_code != HTTPStatus.OK: + result_code = json.loads(s=response.text)["result_code"] + raise_for_vumark_result_code(result_code, response) + return response.content diff --git a/src/vws/vws.py b/src/vws/vws.py index 346b7d62..8a0f4b76 100644 --- a/src/vws/vws.py +++ b/src/vws/vws.py @@ -1,6 +1,5 @@ """Tools for interacting with Vuforia APIs.""" -import base64 import json import time from http import HTTPMethod, HTTPStatus @@ -8,32 +7,20 @@ from beartype import BeartypeConf, beartype from vws._image_utils import ImageType as _ImageType -from vws._image_utils import get_image_data as _get_image_data +from vws._vws_common import ( + build_add_target_content, + build_update_target_content, + parse_database_summary_response, + parse_target_record_response, + parse_target_summary_response, + raise_for_vws_result_code, +) from vws._vws_request import target_api_request from vws.exceptions.custom_exceptions import ( ServerError, TargetProcessingTimeoutError, ) -from vws.exceptions.vws_exceptions import ( - AuthenticationFailureError, - BadImageError, - BadRequestError, - DateRangeError, - FailError, - ImageTooLargeError, - MetadataTooLargeError, - ProjectHasNoAPIAccessError, - ProjectInactiveError, - ProjectSuspendedError, - RequestQuotaReachedError, - RequestTimeTooSkewedError, - TargetNameExistError, - TargetQuotaReachedError, - TargetStatusNotSuccessError, - TargetStatusProcessingError, - TooManyRequestsError, - UnknownTargetError, -) +from vws.exceptions.vws_exceptions import TooManyRequestsError from vws.reports import ( DatabaseSummaryReport, TargetStatusAndRecord, @@ -140,30 +127,9 @@ def make_request( result_code = json.loads(s=response.text)["result_code"] - if result_code == expected_result_code: - return response - - exception = { - "AuthenticationFailure": AuthenticationFailureError, - "BadImage": BadImageError, - "BadRequest": BadRequestError, - "DateRangeError": DateRangeError, - "Fail": FailError, - "ImageTooLarge": ImageTooLargeError, - "MetadataTooLarge": MetadataTooLargeError, - "ProjectHasNoAPIAccess": ProjectHasNoAPIAccessError, - "ProjectInactive": ProjectInactiveError, - "ProjectSuspended": ProjectSuspendedError, - "RequestQuotaReached": RequestQuotaReachedError, - "RequestTimeTooSkewed": RequestTimeTooSkewedError, - "TargetNameExist": TargetNameExistError, - "TargetQuotaReached": TargetQuotaReachedError, - "TargetStatusNotSuccess": TargetStatusNotSuccessError, - "TargetStatusProcessing": TargetStatusProcessingError, - "UnknownTarget": UnknownTargetError, - }[result_code] - - raise exception(response=response) + if result_code != expected_result_code: + raise_for_vws_result_code(result_code, response) + return response def add_target( self, @@ -219,21 +185,14 @@ def add_target( ~vws.exceptions.vws_exceptions.TooManyRequestsError: Vuforia is rate limiting access. """ - image_data = _get_image_data(image=image) - image_data_encoded = base64.b64encode(s=image_data).decode( - encoding="ascii", + content = build_add_target_content( + name=name, + width=width, + image=image, + active_flag=active_flag, + application_metadata=application_metadata, ) - data = { - "name": name, - "width": width, - "image": image_data_encoded, - "active_flag": active_flag, - "application_metadata": application_metadata, - } - - content = json.dumps(obj=data).encode(encoding="utf-8") - response = self.make_request( method=HTTPMethod.POST, data=content, @@ -280,8 +239,7 @@ def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - result_data = json.loads(s=response.text) - return TargetStatusAndRecord.from_response_dict(result_data) # type: ignore[misc] + return parse_target_record_response(response.text) def wait_for_target_processed( self, @@ -404,8 +362,7 @@ def get_target_summary_report(self, target_id: str) -> TargetSummaryReport: content_type="application/json", ) - result_data = dict(json.loads(s=response.text)) - return TargetSummaryReport.from_response_dict(result_data) # type: ignore[misc] + return parse_target_summary_response(response.text) def get_database_summary_report(self) -> DatabaseSummaryReport: """Get a summary report for the database. @@ -437,8 +394,7 @@ def get_database_summary_report(self) -> DatabaseSummaryReport: content_type="application/json", ) - response_data = dict(json.loads(s=response.text)) - return DatabaseSummaryReport.from_response_dict(response_data) # type: ignore[misc] + return parse_database_summary_response(response.text) def delete_target(self, target_id: str) -> None: """Delete a given target. @@ -568,28 +524,13 @@ def update_target( ~vws.exceptions.vws_exceptions.TooManyRequestsError: Vuforia is rate limiting access. """ - data: dict[str, str | bool | float | int] = {} - - if name is not None: - data["name"] = name - - if width is not None: - data["width"] = width - - if image is not None: - image_data = _get_image_data(image=image) - image_data_encoded = base64.b64encode(s=image_data).decode( - encoding="ascii", - ) - data["image"] = image_data_encoded - - if active_flag is not None: - data["active_flag"] = active_flag - - if application_metadata is not None: - data["application_metadata"] = application_metadata - - content = json.dumps(obj=data).encode(encoding="utf-8") + content = build_update_target_content( + name=name, + width=width, + image=image, + active_flag=active_flag, + application_metadata=application_metadata, + ) self.make_request( method=HTTPMethod.PUT, From f2f6e9e5b267998406d2cfb04f1ef6e6684c7a0c Mon Sep 17 00:00:00 2001 From: Adam Dangoor Date: Wed, 25 Feb 2026 09:28:15 +0000 Subject: [PATCH 2/3] Use keyword args for shared helper calls --- src/vws/async_query.py | 7 +++++-- src/vws/async_vumark_service.py | 5 ++++- src/vws/async_vws.py | 11 +++++++---- src/vws/query.py | 7 +++++-- src/vws/vumark_service.py | 5 ++++- src/vws/vws.py | 11 +++++++---- 6 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/vws/async_query.py b/src/vws/async_query.py index 36ea58a5..268245db 100644 --- a/src/vws/async_query.py +++ b/src/vws/async_query.py @@ -186,6 +186,9 @@ async def query( result_code = json.loads(s=response.text)["result_code"] if result_code != "Success": - raise_for_cloud_reco_result_code(result_code, response) + raise_for_cloud_reco_result_code( + result_code=result_code, + response=response, + ) - return parse_query_results(response.text) + return parse_query_results(text=response.text) diff --git a/src/vws/async_vumark_service.py b/src/vws/async_vumark_service.py index 3c23ee83..2ddb6a91 100644 --- a/src/vws/async_vumark_service.py +++ b/src/vws/async_vumark_service.py @@ -140,5 +140,8 @@ async def generate_vumark_instance( if response.status_code != HTTPStatus.OK: result_code = json.loads(s=response.text)["result_code"] - raise_for_vumark_result_code(result_code, response) + raise_for_vumark_result_code( + result_code=result_code, + response=response, + ) return response.content diff --git a/src/vws/async_vws.py b/src/vws/async_vws.py index cea72357..fbaae135 100644 --- a/src/vws/async_vws.py +++ b/src/vws/async_vws.py @@ -143,7 +143,10 @@ async def make_request( result_code = json.loads(s=response.text)["result_code"] if result_code != expected_result_code: - raise_for_vws_result_code(result_code, response) + raise_for_vws_result_code( + result_code=result_code, + response=response, + ) return response async def add_target( @@ -254,7 +257,7 @@ async def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - return parse_target_record_response(response.text) + return parse_target_record_response(text=response.text) async def wait_for_target_processed( self, @@ -388,7 +391,7 @@ async def get_target_summary_report( content_type="application/json", ) - return parse_target_summary_response(response.text) + return parse_target_summary_response(text=response.text) async def get_database_summary_report( self, @@ -422,7 +425,7 @@ async def get_database_summary_report( content_type="application/json", ) - return parse_database_summary_response(response.text) + return parse_database_summary_response(text=response.text) async def delete_target(self, target_id: str) -> None: """Delete a given target. diff --git a/src/vws/query.py b/src/vws/query.py index 168d8852..d7a81dea 100644 --- a/src/vws/query.py +++ b/src/vws/query.py @@ -154,6 +154,9 @@ def query( result_code = json.loads(s=response.text)["result_code"] if result_code != "Success": - raise_for_cloud_reco_result_code(result_code, response) + raise_for_cloud_reco_result_code( + result_code=result_code, + response=response, + ) - return parse_query_results(response.text) + return parse_query_results(text=response.text) diff --git a/src/vws/vumark_service.py b/src/vws/vumark_service.py index 8bebe1e0..33e4f993 100644 --- a/src/vws/vumark_service.py +++ b/src/vws/vumark_service.py @@ -121,5 +121,8 @@ def generate_vumark_instance( if response.status_code != HTTPStatus.OK: result_code = json.loads(s=response.text)["result_code"] - raise_for_vumark_result_code(result_code, response) + raise_for_vumark_result_code( + result_code=result_code, + response=response, + ) return response.content diff --git a/src/vws/vws.py b/src/vws/vws.py index 8a0f4b76..6106087c 100644 --- a/src/vws/vws.py +++ b/src/vws/vws.py @@ -128,7 +128,10 @@ def make_request( result_code = json.loads(s=response.text)["result_code"] if result_code != expected_result_code: - raise_for_vws_result_code(result_code, response) + raise_for_vws_result_code( + result_code=result_code, + response=response, + ) return response def add_target( @@ -239,7 +242,7 @@ def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - return parse_target_record_response(response.text) + return parse_target_record_response(text=response.text) def wait_for_target_processed( self, @@ -362,7 +365,7 @@ def get_target_summary_report(self, target_id: str) -> TargetSummaryReport: content_type="application/json", ) - return parse_target_summary_response(response.text) + return parse_target_summary_response(text=response.text) def get_database_summary_report(self) -> DatabaseSummaryReport: """Get a summary report for the database. @@ -394,7 +397,7 @@ def get_database_summary_report(self) -> DatabaseSummaryReport: content_type="application/json", ) - return parse_database_summary_response(response.text) + return parse_database_summary_response(text=response.text) def delete_target(self, target_id: str) -> None: """Delete a given target. From 1d7eabbd374adca59e4f9b4874bc2689a3da7433 Mon Sep 17 00:00:00 2001 From: Adam Dangoor Date: Wed, 25 Feb 2026 10:10:23 +0000 Subject: [PATCH 3/3] Remove duplicated response parsing helpers --- src/vws/_query_common.py | 32 --------------------- src/vws/_vws_common.py | 62 ---------------------------------------- src/vws/async_query.py | 11 +++---- src/vws/async_vws.py | 15 ++++++---- src/vws/query.py | 11 +++---- src/vws/vws.py | 15 ++++++---- 6 files changed, 30 insertions(+), 116 deletions(-) diff --git a/src/vws/_query_common.py b/src/vws/_query_common.py index 0dd8abf7..4a045aad 100644 --- a/src/vws/_query_common.py +++ b/src/vws/_query_common.py @@ -1,7 +1,5 @@ """Shared helpers for CloudReco query implementations.""" -import datetime -import json from typing import NoReturn from vws.exceptions.cloud_reco_exceptions import ( @@ -10,7 +8,6 @@ InactiveProjectError, RequestTimeTooSkewedError, ) -from vws.reports import QueryResult, TargetData from vws.response import Response @@ -27,32 +24,3 @@ def raise_for_cloud_reco_result_code( "RequestTimeTooSkewed": RequestTimeTooSkewedError, }[result_code] raise exception(response=response) - - -def parse_query_results(text: str) -> list[QueryResult]: - """Parse the results list from a cloud reco query response body.""" - result: list[QueryResult] = [] - result_list = list(json.loads(s=text)["results"]) - for item in result_list: - target_data: TargetData | None = None - if "target_data" in item: - target_data_dict = item["target_data"] - metadata = target_data_dict["application_metadata"] - timestamp_string = target_data_dict["target_timestamp"] - target_timestamp = datetime.datetime.fromtimestamp( - timestamp=timestamp_string, - tz=datetime.UTC, - ) - target_data = TargetData( - name=target_data_dict["name"], - application_metadata=metadata, - target_timestamp=target_timestamp, - ) - - query_result = QueryResult( - target_id=item["target_id"], - target_data=target_data, - ) - - result.append(query_result) - return result diff --git a/src/vws/_vws_common.py b/src/vws/_vws_common.py index 6b06cda9..2f3553b8 100644 --- a/src/vws/_vws_common.py +++ b/src/vws/_vws_common.py @@ -2,7 +2,6 @@ import base64 import json -from datetime import date from typing import NoReturn from vws._image_utils import ImageType, get_image_data @@ -28,13 +27,6 @@ TargetStatusProcessingError, UnknownTargetError, ) -from vws.reports import ( - DatabaseSummaryReport, - TargetRecord, - TargetStatusAndRecord, - TargetStatuses, - TargetSummaryReport, -) from vws.response import Response @@ -85,60 +77,6 @@ def raise_for_vumark_result_code( raise exception(response=response) -def parse_target_record_response(text: str) -> TargetStatusAndRecord: - """Parse a get_target_record response body.""" - result_data = json.loads(s=text) - status = TargetStatuses(value=result_data["status"]) - target_record_dict = dict(result_data["target_record"]) - target_record = TargetRecord( - target_id=target_record_dict["target_id"], - active_flag=bool(target_record_dict["active_flag"]), - name=target_record_dict["name"], - width=float(target_record_dict["width"]), - tracking_rating=int(target_record_dict["tracking_rating"]), - reco_rating=target_record_dict["reco_rating"], - ) - return TargetStatusAndRecord( - status=status, - target_record=target_record, - ) - - -def parse_target_summary_response(text: str) -> TargetSummaryReport: - """Parse a get_target_summary_report response body.""" - result_data = dict(json.loads(s=text)) - return TargetSummaryReport( - status=TargetStatuses(value=result_data["status"]), - database_name=result_data["database_name"], - target_name=result_data["target_name"], - upload_date=date.fromisoformat(result_data["upload_date"]), - active_flag=bool(result_data["active_flag"]), - tracking_rating=int(result_data["tracking_rating"]), - total_recos=int(result_data["total_recos"]), - current_month_recos=int(result_data["current_month_recos"]), - previous_month_recos=int(result_data["previous_month_recos"]), - ) - - -def parse_database_summary_response(text: str) -> DatabaseSummaryReport: - """Parse a get_database_summary_report response body.""" - response_data = dict(json.loads(s=text)) - return DatabaseSummaryReport( - active_images=int(response_data["active_images"]), - current_month_recos=int(response_data["current_month_recos"]), - failed_images=int(response_data["failed_images"]), - inactive_images=int(response_data["inactive_images"]), - name=str(object=response_data["name"]), - previous_month_recos=int(response_data["previous_month_recos"]), - processing_images=int(response_data["processing_images"]), - reco_threshold=int(response_data["reco_threshold"]), - request_quota=int(response_data["request_quota"]), - request_usage=int(response_data["request_usage"]), - target_quota=int(response_data["target_quota"]), - total_recos=int(response_data["total_recos"]), - ) - - def build_add_target_content( *, name: str, diff --git a/src/vws/async_query.py b/src/vws/async_query.py index 268245db..a10db9c5 100644 --- a/src/vws/async_query.py +++ b/src/vws/async_query.py @@ -12,10 +12,7 @@ from vws._image_utils import ImageType as _ImageType from vws._image_utils import get_image_data as _get_image_data -from vws._query_common import ( - parse_query_results, - raise_for_cloud_reco_result_code, -) +from vws._query_common import raise_for_cloud_reco_result_code from vws.exceptions.cloud_reco_exceptions import MaxNumResultsOutOfRangeError from vws.exceptions.custom_exceptions import ( RequestEntityTooLargeError, @@ -191,4 +188,8 @@ async def query( response=response, ) - return parse_query_results(text=response.text) + result_items = list(json.loads(s=response.text)["results"]) + return [ + QueryResult.from_response_dict(response_dict=result_item) + for result_item in result_items + ] diff --git a/src/vws/async_vws.py b/src/vws/async_vws.py index fbaae135..21ccdec3 100644 --- a/src/vws/async_vws.py +++ b/src/vws/async_vws.py @@ -12,9 +12,6 @@ from vws._vws_common import ( build_add_target_content, build_update_target_content, - parse_database_summary_response, - parse_target_record_response, - parse_target_summary_response, raise_for_vws_result_code, ) from vws.exceptions.custom_exceptions import ( @@ -257,7 +254,9 @@ async def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - return parse_target_record_response(text=response.text) + return TargetStatusAndRecord.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) async def wait_for_target_processed( self, @@ -391,7 +390,9 @@ async def get_target_summary_report( content_type="application/json", ) - return parse_target_summary_response(text=response.text) + return TargetSummaryReport.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) async def get_database_summary_report( self, @@ -425,7 +426,9 @@ async def get_database_summary_report( content_type="application/json", ) - return parse_database_summary_response(text=response.text) + return DatabaseSummaryReport.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) async def delete_target(self, target_id: str) -> None: """Delete a given target. diff --git a/src/vws/query.py b/src/vws/query.py index d7a81dea..afd5e89d 100644 --- a/src/vws/query.py +++ b/src/vws/query.py @@ -10,10 +10,7 @@ from vws._image_utils import ImageType as _ImageType from vws._image_utils import get_image_data as _get_image_data -from vws._query_common import ( - parse_query_results, - raise_for_cloud_reco_result_code, -) +from vws._query_common import raise_for_cloud_reco_result_code from vws.exceptions.cloud_reco_exceptions import MaxNumResultsOutOfRangeError from vws.exceptions.custom_exceptions import ( RequestEntityTooLargeError, @@ -159,4 +156,8 @@ def query( response=response, ) - return parse_query_results(text=response.text) + result_items = list(json.loads(s=response.text)["results"]) + return [ + QueryResult.from_response_dict(response_dict=result_item) + for result_item in result_items + ] diff --git a/src/vws/vws.py b/src/vws/vws.py index 6106087c..04194f1f 100644 --- a/src/vws/vws.py +++ b/src/vws/vws.py @@ -10,9 +10,6 @@ from vws._vws_common import ( build_add_target_content, build_update_target_content, - parse_database_summary_response, - parse_target_record_response, - parse_target_summary_response, raise_for_vws_result_code, ) from vws._vws_request import target_api_request @@ -242,7 +239,9 @@ def get_target_record(self, target_id: str) -> TargetStatusAndRecord: content_type="application/json", ) - return parse_target_record_response(text=response.text) + return TargetStatusAndRecord.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) def wait_for_target_processed( self, @@ -365,7 +364,9 @@ def get_target_summary_report(self, target_id: str) -> TargetSummaryReport: content_type="application/json", ) - return parse_target_summary_response(text=response.text) + return TargetSummaryReport.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) def get_database_summary_report(self) -> DatabaseSummaryReport: """Get a summary report for the database. @@ -397,7 +398,9 @@ def get_database_summary_report(self) -> DatabaseSummaryReport: content_type="application/json", ) - return parse_database_summary_response(text=response.text) + return DatabaseSummaryReport.from_response_dict( + response_dict=dict(json.loads(s=response.text)), + ) def delete_target(self, target_id: str) -> None: """Delete a given target.