From 791d220fb55e5252adbf38937f9800f40cd1dba9 Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Wed, 24 Jun 2026 13:10:14 +0200 Subject: [PATCH] RDBC-1076 Python API: Pull Replication Port the Pull Replication client API from the C# client (v7.2): hub, sink, external-replication and hub-access (register/unregister/get) operations, read-side task info, supporting models/enums and ReplicationHubNotFoundException. Replaces the empty stubs in replication/definitions.py, adds top-level exports and TestBase tests verified against a licensed 7.2 server. --- ravendb/__init__.py | 36 +- ravendb/documents/operations/ongoing_tasks.py | 183 +++++++++ .../operations/replication/definitions.py | 357 +++++++++++++++++- .../replication/pull_replication.py | 289 ++++++++++++++ ravendb/exceptions/exception_dispatcher.py | 3 + ravendb/exceptions/raven_exceptions.py | 5 + .../replication_tests/__init__.py | 0 .../test_pull_replication.py | 171 +++++++++ ravendb/tests/test_imports.py | 32 +- 9 files changed, 1036 insertions(+), 40 deletions(-) create mode 100644 ravendb/documents/operations/replication/pull_replication.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py diff --git a/ravendb/__init__.py b/ravendb/__init__.py index ea8bde2c..5893e21a 100644 --- a/ravendb/__init__.py +++ b/ravendb/__init__.py @@ -165,10 +165,29 @@ from ravendb.documents.operations.refresh.configuration import RefreshConfiguration from ravendb.documents.operations.replication.definitions import ( ExternalReplication, + ExternalReplicationBase, + ReplicationNode, PullReplicationAsSink, PullReplicationDefinition, - ReplicationNode, - ExternalReplicationBase, + PullReplicationMode, + PreventDeletionsMode, + ReplicationHubAccess, + DetailedReplicationHubAccess, + ReplicationHubAccessResult, + PullReplicationDefinitionAndCurrentConnections, +) +from ravendb.documents.operations.replication.pull_replication import ( + PutPullReplicationAsHubOperation, + UpdatePullReplicationAsSinkOperation, + UpdateExternalReplicationOperation, + RegisterReplicationHubAccessOperation, + UnregisterReplicationHubAccessOperation, + GetReplicationHubAccessOperation, + GetPullReplicationTasksInfoOperation, +) +from ravendb.documents.operations.ongoing_tasks import ( + OngoingTaskPullReplicationAsSink, + OngoingTaskPullReplicationAsHub, ) from ravendb.documents.operations.revisions import ( RevisionsCollectionConfiguration, @@ -395,17 +414,7 @@ # SeedIdentityForOperation # IOperationProgress # IOperationResult -# PullReplicationDefinitionAndCurrentConnections -# DetailedReplicationHubAccess -# GetReplicationHubAccessOperation -# PreventDeletionsMode -# PullReplicationMode -# RegisterReplicationHubAccessOperation -# ReplicationHubAccess -# ReplicationHubAccessResult # ReplicationHubAccessResponse -# UnregisterReplicationHubAccessOperation -# UpdatePullReplicationAsSinkOperation # GetConflictsCommand # PutAttachmentCommandHelper # SetupDocumentBase @@ -435,9 +444,6 @@ # DisableDatabaseToggleResult # ConfigureExpirationOperation # DeleteOngoingTaskOperation -# GetPullReplicationHubTasksInfoOperation -# OngoingTaskPullReplicationAsSink -# OngoingTaskPullReplicationAsHub # OngoingTaskType # RunningBackup # NextBackup diff --git a/ravendb/documents/operations/ongoing_tasks.py b/ravendb/documents/operations/ongoing_tasks.py index 6561fa89..62b869c6 100644 --- a/ravendb/documents/operations/ongoing_tasks.py +++ b/ravendb/documents/operations/ongoing_tasks.py @@ -12,6 +12,7 @@ from ravendb.tools.utils import Utils from ravendb.util.util import RaftIdGenerator from ravendb.http.raven_command import RavenCommand +from ravendb.documents.operations.replication.definitions import PullReplicationMode if TYPE_CHECKING: from ravendb.documents.conventions import DocumentConventions @@ -279,6 +280,186 @@ def from_json(cls, json_dict: dict) -> "OngoingTaskEmbeddingsGeneration": ) +class OngoingTaskPullReplicationAsHub(OngoingTask): + """Ongoing task information for a single pull-replication hub connection.""" + + def __init__( + self, + task_id: Optional[int] = None, + responsible_node: Optional[NodeId] = None, + task_state: Optional[OngoingTaskState] = None, + task_connection_status: Optional[OngoingTaskConnectionStatus] = None, + task_name: Optional[str] = None, + error: Optional[str] = None, + mentor_node: Optional[str] = None, + pin_to_mentor_node: Optional[bool] = None, + from_to_string: Optional[str] = None, + destination_url: Optional[str] = None, + destination_database: Optional[str] = None, + delay_replication_for=None, + handler_id: Optional[str] = None, + last_accepted_change_vector_from_destination: Optional[str] = None, + source_database_change_vector: Optional[str] = None, + last_sent_etag: Optional[int] = None, + last_database_etag: Optional[int] = None, + ): + super().__init__( + task_id=task_id, + task_type=OngoingTaskType.PULL_REPLICATION_AS_HUB, + responsible_node=responsible_node, + task_state=task_state, + task_connection_status=task_connection_status, + task_name=task_name, + error=error, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + ) + self.from_to_string = from_to_string + self.destination_url = destination_url + self.destination_database = destination_database + self.delay_replication_for = delay_replication_for + self.handler_id = handler_id + self.last_accepted_change_vector_from_destination = last_accepted_change_vector_from_destination + self.source_database_change_vector = source_database_change_vector + self.last_sent_etag = last_sent_etag + self.last_database_etag = last_database_etag + + def to_json(self) -> dict: + result = super().to_json() + result["FromToString"] = self.from_to_string + result["DestinationUrl"] = self.destination_url + result["DestinationDatabase"] = self.destination_database + result["DelayReplicationFor"] = ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ) + result["HandlerId"] = self.handler_id + result["LastAcceptedChangeVectorFromDestination"] = self.last_accepted_change_vector_from_destination + result["SourceDatabaseChangeVector"] = self.source_database_change_vector + result["LastSentEtag"] = self.last_sent_etag + result["LastDatabaseEtag"] = self.last_database_etag + return result + + @classmethod + def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsHub"]: + if json_dict is None: + return None + task_state_str = json_dict.get("TaskState") + task_connection_status_str = json_dict.get("TaskConnectionStatus") + delay = json_dict.get("DelayReplicationFor") + return cls( + task_id=json_dict.get("TaskId"), + responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")), + task_state=OngoingTaskState(task_state_str) if task_state_str else None, + task_connection_status=( + OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None + ), + task_name=json_dict.get("TaskName"), + error=json_dict.get("Error"), + mentor_node=json_dict.get("MentorNode"), + pin_to_mentor_node=json_dict.get("PinToMentorNode"), + from_to_string=json_dict.get("FromToString"), + destination_url=json_dict.get("DestinationUrl"), + destination_database=json_dict.get("DestinationDatabase"), + delay_replication_for=Utils.string_to_timedelta(delay) if delay else None, + handler_id=json_dict.get("HandlerId"), + last_accepted_change_vector_from_destination=json_dict.get("LastAcceptedChangeVectorFromDestination"), + source_database_change_vector=json_dict.get("SourceDatabaseChangeVector"), + last_sent_etag=json_dict.get("LastSentEtag"), + last_database_etag=json_dict.get("LastDatabaseEtag"), + ) + + +class OngoingTaskPullReplicationAsSink(OngoingTask): + """Ongoing task information for a pull-replication sink task.""" + + def __init__( + self, + task_id: Optional[int] = None, + responsible_node: Optional[NodeId] = None, + task_state: Optional[OngoingTaskState] = None, + task_connection_status: Optional[OngoingTaskConnectionStatus] = None, + task_name: Optional[str] = None, + error: Optional[str] = None, + mentor_node: Optional[str] = None, + pin_to_mentor_node: Optional[bool] = None, + hub_name: Optional[str] = None, + mode: Optional[PullReplicationMode] = None, + destination_url: Optional[str] = None, + topology_discovery_urls: Optional[list] = None, + destination_database: Optional[str] = None, + connection_string_name: Optional[str] = None, + certificate_public_key: Optional[str] = None, + access_name: Optional[str] = None, + allowed_hub_to_sink_paths: Optional[list] = None, + allowed_sink_to_hub_paths: Optional[list] = None, + ): + super().__init__( + task_id=task_id, + task_type=OngoingTaskType.PULL_REPLICATION_AS_SINK, + responsible_node=responsible_node, + task_state=task_state, + task_connection_status=task_connection_status, + task_name=task_name, + error=error, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + ) + self.hub_name = hub_name + self.mode = mode + self.destination_url = destination_url + self.topology_discovery_urls = topology_discovery_urls + self.destination_database = destination_database + self.connection_string_name = connection_string_name + self.certificate_public_key = certificate_public_key + self.access_name = access_name + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + + def to_json(self) -> dict: + result = super().to_json() + result["HubName"] = self.hub_name + result["Mode"] = self.mode.value if self.mode else None + result["DestinationUrl"] = self.destination_url + result["TopologyDiscoveryUrls"] = self.topology_discovery_urls + result["DestinationDatabase"] = self.destination_database + result["ConnectionStringName"] = self.connection_string_name + result["CertificatePublicKey"] = self.certificate_public_key + result["AccessName"] = self.access_name + result["AllowedHubToSinkPaths"] = self.allowed_hub_to_sink_paths + result["AllowedSinkToHubPaths"] = self.allowed_sink_to_hub_paths + return result + + @classmethod + def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsSink"]: + if json_dict is None: + return None + task_state_str = json_dict.get("TaskState") + task_connection_status_str = json_dict.get("TaskConnectionStatus") + mode_str = json_dict.get("Mode") + return cls( + task_id=json_dict.get("TaskId"), + responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")), + task_state=OngoingTaskState(task_state_str) if task_state_str else None, + task_connection_status=( + OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None + ), + task_name=json_dict.get("TaskName"), + error=json_dict.get("Error"), + mentor_node=json_dict.get("MentorNode"), + pin_to_mentor_node=json_dict.get("PinToMentorNode"), + hub_name=json_dict.get("HubName"), + mode=PullReplicationMode(mode_str) if mode_str else None, + destination_url=json_dict.get("DestinationUrl"), + topology_discovery_urls=json_dict.get("TopologyDiscoveryUrls"), + destination_database=json_dict.get("DestinationDatabase"), + connection_string_name=json_dict.get("ConnectionStringName"), + certificate_public_key=json_dict.get("CertificatePublicKey"), + access_name=json_dict.get("AccessName"), + allowed_hub_to_sink_paths=json_dict.get("AllowedHubToSinkPaths"), + allowed_sink_to_hub_paths=json_dict.get("AllowedSinkToHubPaths"), + ) + + class ToggleOngoingTaskStateOperation(MaintenanceOperation[ModifyOngoingTaskResult]): def __init__( self, task_name_or_id: Union[int, str], type_of_task: Optional[OngoingTaskType], disable: Optional[bool] @@ -454,6 +635,8 @@ def _deserialize_task( return OngoingTaskGenAi.from_json(json_dict) elif self._task_type == OngoingTaskType.EMBEDDINGS_GENERATION: return OngoingTaskEmbeddingsGeneration.from_json(json_dict) + elif self._task_type == OngoingTaskType.PULL_REPLICATION_AS_SINK: + return OngoingTaskPullReplicationAsSink.from_json(json_dict) else: # todo: handle more types of tasks return OngoingTask.from_json(json_dict) diff --git a/ravendb/documents/operations/replication/definitions.py b/ravendb/documents/operations/replication/definitions.py index 51d2d7d8..0d820375 100644 --- a/ravendb/documents/operations/replication/definitions.py +++ b/ravendb/documents/operations/replication/definitions.py @@ -1,28 +1,320 @@ -# todo: implement -from datetime import datetime -from typing import Optional, List, Dict +from __future__ import annotations + +import enum +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any from ravendb.tools.utils import Utils +class PullReplicationMode(enum.Enum): + """Determines the direction of data flow between a pull-replication hub and sink. + + Mirrors the C# ``[Flags]`` enum - the combined value serializes as the + comma-separated string the server expects/returns. + """ + + NONE = "None" + HUB_TO_SINK = "HubToSink" + SINK_TO_HUB = "SinkToHub" + HUB_TO_SINK_AND_SINK_TO_HUB = "HubToSink, SinkToHub" + + def __str__(self) -> str: + return self.value + + +class PreventDeletionsMode(enum.Enum): + NONE = "None" + PREVENT_SINK_TO_HUB_DELETIONS = "PreventSinkToHubDeletions" + + def __str__(self) -> str: + return self.value + + +class ReplicationType(enum.Enum): + EXTERNAL = "External" + PULL_AS_SINK = "PullAsSink" + PULL_AS_HUB = "PullAsHub" + INTERNAL = "Internal" + MIGRATION = "Migration" + + def __str__(self) -> str: + return self.value + + class ReplicationNode: - pass + def __init__(self, url: str = None, database: str = None, disabled: bool = False): + self.url = url.rstrip("/") if url else url + self.database = database + self.disabled = disabled + + def get_replication_type(self) -> Optional[ReplicationType]: + return None + + def to_json(self) -> Dict[str, Any]: + replication_type = self.get_replication_type() + return { + "Database": self.database, + "Url": self.url, + "Disabled": self.disabled, + "Type": replication_type.value if replication_type is not None else None, + } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[ReplicationNode]: + if json_dict is None: + return None + return cls(json_dict.get("Url"), json_dict.get("Database"), json_dict.get("Disabled", False)) class ExternalReplicationBase(ReplicationNode): - pass + def __init__( + self, + database: str = None, + connection_string_name: str = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + url: str = None, + disabled: bool = False, + ): + super().__init__(url, database, disabled) + self.task_id = task_id + self.name = name + self.connection_string_name = connection_string_name + self.mentor_node = mentor_node + self.pin_to_mentor_node = pin_to_mentor_node + + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict.update( + { + "TaskId": self.task_id, + "Name": self.name, + "MentorNode": self.mentor_node, + "PinToMentorNode": self.pin_to_mentor_node, + "ConnectionStringName": self.connection_string_name, + } + ) + return json_dict + + def _fill_from_json(self, json_dict: Dict[str, Any]) -> None: + self.url = json_dict.get("Url") + self.database = json_dict.get("Database") + self.disabled = json_dict.get("Disabled", False) + self.task_id = json_dict.get("TaskId", 0) + self.name = json_dict.get("Name") + self.connection_string_name = json_dict.get("ConnectionStringName") + self.mentor_node = json_dict.get("MentorNode") + self.pin_to_mentor_node = json_dict.get("PinToMentorNode", False) class ExternalReplication(ExternalReplicationBase): - pass + def __init__( + self, + database: str = None, + connection_string_name: str = None, + delay_replication_for: timedelta = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + ): + super().__init__( + database=database, + connection_string_name=connection_string_name, + name=name, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + task_id=task_id, + ) + self.delay_replication_for = delay_replication_for + def get_replication_type(self) -> ReplicationType: + return ReplicationType.EXTERNAL -class PullReplicationAsSink(ExternalReplicationBase): - pass + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict["DelayReplicationFor"] = ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ) + return json_dict + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[ExternalReplication]: + if json_dict is None: + return None + obj = cls() + obj._fill_from_json(json_dict) + delay = json_dict.get("DelayReplicationFor") + obj.delay_replication_for = Utils.string_to_timedelta(delay) if delay else None + return obj class PullReplicationDefinition: - pass + """A pull-replication hub definition (the source side of a pull-replication task).""" + + def __init__( + self, + name: str = None, + delay_replication_for: timedelta = None, + mentor_node: str = None, + disabled: bool = False, + mode: PullReplicationMode = None, + task_id: int = 0, + with_filtering: bool = False, + prevent_deletions_mode: PreventDeletionsMode = None, + pin_to_mentor_node: bool = False, + ): + self.name = name + self.delay_replication_for = delay_replication_for + self.mentor_node = mentor_node + self.disabled = disabled + self.mode = mode if mode is not None else PullReplicationMode.HUB_TO_SINK + self.task_id = task_id + self.with_filtering = with_filtering + self.prevent_deletions_mode = prevent_deletions_mode + self.pin_to_mentor_node = pin_to_mentor_node + + def to_json(self) -> Dict[str, Any]: + return { + "Name": self.name, + "DelayReplicationFor": ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ), + "MentorNode": self.mentor_node, + "PinToMentorNode": self.pin_to_mentor_node, + "Disabled": self.disabled, + "Mode": self.mode.value if self.mode else None, + "TaskId": self.task_id, + "WithFiltering": self.with_filtering, + "PreventDeletionsMode": self.prevent_deletions_mode.value if self.prevent_deletions_mode else None, + } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationDefinition]: + if json_dict is None: + return None + delay = json_dict.get("DelayReplicationFor") + mode = json_dict.get("Mode") + prevent = json_dict.get("PreventDeletionsMode") + return cls( + name=json_dict.get("Name"), + delay_replication_for=Utils.string_to_timedelta(delay) if delay else None, + mentor_node=json_dict.get("MentorNode"), + disabled=json_dict.get("Disabled", False), + mode=PullReplicationMode(mode) if mode else None, + task_id=json_dict.get("TaskId", 0), + with_filtering=json_dict.get("WithFiltering", False), + prevent_deletions_mode=PreventDeletionsMode(prevent) if prevent else None, + pin_to_mentor_node=json_dict.get("PinToMentorNode", False), + ) + + +class PullReplicationAsSink(ExternalReplicationBase): + """A pull-replication sink definition (the destination side of a pull-replication task).""" + + def __init__( + self, + database: str = None, + connection_string_name: str = None, + hub_name: str = None, + mode: PullReplicationMode = None, + allowed_hub_to_sink_paths: List[str] = None, + allowed_sink_to_hub_paths: List[str] = None, + certificate_with_private_key: str = None, + certificate_password: str = None, + access_name: str = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + ): + super().__init__( + database=database, + connection_string_name=connection_string_name, + name=name, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + task_id=task_id, + ) + self.hub_name = hub_name + self.mode = mode if mode is not None else PullReplicationMode.HUB_TO_SINK + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + self.certificate_with_private_key = certificate_with_private_key + self.certificate_password = certificate_password + self.access_name = access_name + + def get_replication_type(self) -> ReplicationType: + return ReplicationType.PULL_AS_SINK + + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict.update( + { + "Mode": self.mode.value if self.mode else None, + "HubName": self.hub_name, + "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, + "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, + "CertificateWithPrivateKey": self.certificate_with_private_key, + "CertificatePassword": self.certificate_password, + "AccessName": self.access_name, + } + ) + return json_dict + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationAsSink]: + if json_dict is None: + return None + obj = cls() + obj._fill_from_json(json_dict) + mode = json_dict.get("Mode") + obj.mode = PullReplicationMode(mode) if mode else None + obj.hub_name = json_dict.get("HubName") + obj.allowed_hub_to_sink_paths = json_dict.get("AllowedHubToSinkPaths") + obj.allowed_sink_to_hub_paths = json_dict.get("AllowedSinkToHubPaths") + obj.certificate_with_private_key = json_dict.get("CertificateWithPrivateKey") + obj.certificate_password = json_dict.get("CertificatePassword") + obj.access_name = json_dict.get("AccessName") + return obj + + +class ReplicationHubAccess: + """Grants a sink (identified by a certificate) access to a replication hub.""" + + def __init__( + self, + name: str = None, + certificate_base64: str = None, + allowed_hub_to_sink_paths: List[str] = None, + allowed_sink_to_hub_paths: List[str] = None, + ): + self.name = name + self.certificate_base64 = certificate_base64 + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + + def to_json(self) -> Dict[str, Any]: + return { + "Name": self.name, + "CertificateBase64": self.certificate_base64, + "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, + "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, + } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[ReplicationHubAccess]: + if json_dict is None: + return None + return cls( + json_dict.get("Name"), + json_dict.get("CertificateBase64"), + json_dict.get("AllowedHubToSinkPaths"), + json_dict.get("AllowedSinkToHubPaths"), + ) class DetailedReplicationHubAccess: @@ -60,3 +352,50 @@ def to_json(self) -> Dict: "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[DetailedReplicationHubAccess]: + if json_dict is None: + return None + not_before = json_dict.get("NotBefore") + not_after = json_dict.get("NotAfter") + return cls( + name=json_dict.get("Name"), + thumbprint=json_dict.get("Thumbprint"), + certificate=json_dict.get("Certificate"), + not_before=Utils.string_to_datetime(not_before) if not_before else None, + not_after=Utils.string_to_datetime(not_after) if not_after else None, + subject=json_dict.get("Subject"), + issuer=json_dict.get("Issuer"), + allowed_hub_to_sink_paths=json_dict.get("AllowedHubToSinkPaths"), + allowed_sink_to_hub_paths=json_dict.get("AllowedSinkToHubPaths"), + ) + + +class ReplicationHubAccessResult: + def __init__(self, results: List[DetailedReplicationHubAccess] = None): + self.results = results + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> ReplicationHubAccessResult: + results = json_dict.get("Results") or [] + return cls([DetailedReplicationHubAccess.from_json(item) for item in results]) + + +class PullReplicationDefinitionAndCurrentConnections: + def __init__(self, definition: PullReplicationDefinition = None, ongoing_tasks: List = None): + self.definition = definition + self.ongoing_tasks = ongoing_tasks + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationDefinitionAndCurrentConnections]: + if json_dict is None: + return None + from ravendb.documents.operations.ongoing_tasks import OngoingTaskPullReplicationAsHub + + definition = json_dict.get("Definition") + ongoing = json_dict.get("OngoingTasks") or [] + return cls( + definition=PullReplicationDefinition.from_json(definition) if definition else None, + ongoing_tasks=[OngoingTaskPullReplicationAsHub.from_json(item) for item in ongoing], + ) diff --git a/ravendb/documents/operations/replication/pull_replication.py b/ravendb/documents/operations/replication/pull_replication.py new file mode 100644 index 00000000..69fd06b4 --- /dev/null +++ b/ravendb/documents/operations/replication/pull_replication.py @@ -0,0 +1,289 @@ +from __future__ import annotations + +import http +import json +from typing import List, Optional, TYPE_CHECKING + +import requests + +from ravendb.documents.operations.definitions import MaintenanceOperation, VoidMaintenanceOperation +from ravendb.documents.operations.replication.definitions import ( + DetailedReplicationHubAccess, + ExternalReplication, + PullReplicationAsSink, + PullReplicationDefinition, + PullReplicationDefinitionAndCurrentConnections, + ReplicationHubAccess, + ReplicationHubAccessResult, +) +from ravendb.exceptions.raven_exceptions import ReplicationHubNotFoundException +from ravendb.http.raven_command import RavenCommand, RavenCommandResponseType, VoidRavenCommand +from ravendb.http.topology import RaftCommand +from ravendb.serverwide.operations.common import ModifyOngoingTaskResult +from ravendb.tools.utils import Utils +from ravendb.util.util import RaftIdGenerator + +if TYPE_CHECKING: + from ravendb.documents.conventions import DocumentConventions + from ravendb.http.server_node import ServerNode + + +class PutPullReplicationAsHubOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates a pull-replication hub task on the (source) database.""" + + def __init__( + self, + pull_replication_definition: Optional[PullReplicationDefinition] = None, + name: str = None, + ): + if pull_replication_definition is None: + if not name or name.isspace(): + raise ValueError("Name cannot be None or whitespace") + pull_replication_definition = PullReplicationDefinition(name) + + if not pull_replication_definition.name or pull_replication_definition.name.isspace(): + raise ValueError("PullReplicationDefinition name cannot be None or whitespace") + + self._pull_replication_definition = pull_replication_definition + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.PutPullReplicationAsHubCommand(self._pull_replication_definition) + + class PutPullReplicationAsHubCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, pull_replication_definition: PullReplicationDefinition): + super().__init__(ModifyOngoingTaskResult) + self._pull_replication_definition = pull_replication_definition + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub" + request = requests.Request("PUT", url) + request.data = self._pull_replication_definition.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UpdatePullReplicationAsSinkOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates a pull-replication sink task on the (destination) database.""" + + def __init__(self, pull_replication: PullReplicationAsSink, use_server_certificate: bool = False): + if pull_replication is None: + raise ValueError("PullReplicationAsSink cannot be None") + self._pull_replication = pull_replication + self._use_server_certificate = use_server_certificate + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.UpdatePullReplicationAsSinkCommand(self._pull_replication, self._use_server_certificate) + + class UpdatePullReplicationAsSinkCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, pull_replication: PullReplicationAsSink, use_server_certificate: bool): + super().__init__(ModifyOngoingTaskResult) + self._pull_replication = pull_replication + self._use_server_certificate = use_server_certificate + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/sink-pull-replication" + sink_json = self._pull_replication.to_json() + if self._use_server_certificate: + sink_json["CertificateWithPrivateKey"] = None + request = requests.Request("POST", url) + request.data = {"PullReplicationAsSink": sink_json} + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UpdateExternalReplicationOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates an external-replication task on the database.""" + + def __init__(self, new_watcher: ExternalReplication): + if new_watcher is None: + raise ValueError("ExternalReplication cannot be None") + self._new_watcher = new_watcher + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.UpdateExternalReplicationCommand(self._new_watcher) + + class UpdateExternalReplicationCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, new_watcher: ExternalReplication): + super().__init__(ModifyOngoingTaskResult) + self._new_watcher = new_watcher + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/external-replication" + request = requests.Request("POST", url) + request.data = {"Watcher": self._new_watcher.to_json()} + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class RegisterReplicationHubAccessOperation(MaintenanceOperation[None]): + """Registers a sink's certificate access against an existing replication hub.""" + + def __init__(self, hub_name: str, access: ReplicationHubAccess): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + if access is None: + raise ValueError("Access cannot be None") + self._hub_name = hub_name + self._access = access + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[None]: + return self.RegisterReplicationHubAccessCommand(self._hub_name, self._access) + + class RegisterReplicationHubAccessCommand(RavenCommand[None], RaftCommand): + def __init__(self, hub_name: str, access: ReplicationHubAccess): + super().__init__() + self._hub_name = hub_name + self._access = access + self._response_type = RavenCommandResponseType.RAW + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}" + ) + request = requests.Request("PUT", url) + request.data = self._access.to_json() + return request + + def set_response_raw(self, response: requests.Response, stream: bytes) -> None: + if response.status_code == http.HTTPStatus.NOT_FOUND: + raise ReplicationHubNotFoundException( + f"The replication hub {self._hub_name} was not found on the database. " + f"Did you forget to define it first?" + ) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UnregisterReplicationHubAccessOperation(VoidMaintenanceOperation): + """Revokes a previously-registered sink certificate (by thumbprint) from a replication hub.""" + + def __init__(self, hub_name: str, thumbprint: str): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + if not thumbprint or thumbprint.isspace(): + raise ValueError("Thumbprint cannot be None or whitespace") + self._hub_name = hub_name + self._thumbprint = thumbprint + + def get_command(self, conventions: "DocumentConventions") -> VoidRavenCommand: + return self.UnregisterReplicationHubAccessCommand(self._hub_name, self._thumbprint) + + class UnregisterReplicationHubAccessCommand(VoidRavenCommand, RaftCommand): + def __init__(self, hub_name: str, thumbprint: str): + super().__init__() + self._hub_name = hub_name + self._thumbprint = thumbprint + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}&thumbprint={Utils.quote_key(self._thumbprint)}" + ) + return requests.Request("DELETE", url) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class GetReplicationHubAccessOperation(MaintenanceOperation[List[DetailedReplicationHubAccess]]): + """Lists the sinks (certificates) registered to access a replication hub.""" + + def __init__(self, hub_name: str, start: int = 0, page_size: int = 25): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + self._hub_name = hub_name + self._start = start + self._page_size = page_size + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[List[DetailedReplicationHubAccess]]: + return self.GetReplicationHubAccessCommand(self._hub_name, self._start, self._page_size) + + class GetReplicationHubAccessCommand(RavenCommand[List[DetailedReplicationHubAccess]]): + def __init__(self, hub_name: str, start: int, page_size: int): + super().__init__(DetailedReplicationHubAccess) + self._hub_name = hub_name + self._start = start + self._page_size = page_size + + def is_read_request(self) -> bool: + return True + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}&start={self._start}&pageSize={self._page_size}" + ) + return requests.Request("GET", url) + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self.result = [] + return + self.result = ReplicationHubAccessResult.from_json(json.loads(response)).results + + +class GetPullReplicationTasksInfoOperation(MaintenanceOperation[PullReplicationDefinitionAndCurrentConnections]): + """Retrieves a pull-replication hub definition together with its current sink connections.""" + + def __init__(self, task_id: int): + self._task_id = task_id + + def get_command( + self, conventions: "DocumentConventions" + ) -> RavenCommand[PullReplicationDefinitionAndCurrentConnections]: + return self.GetPullReplicationTasksInfoCommand(self._task_id) + + class GetPullReplicationTasksInfoCommand(RavenCommand[PullReplicationDefinitionAndCurrentConnections]): + def __init__(self, task_id: int): + super().__init__(PullReplicationDefinitionAndCurrentConnections) + self._task_id = task_id + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/tasks/pull-replication/hub?key={self._task_id}" + return requests.Request("GET", url) + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + return + self.result = PullReplicationDefinitionAndCurrentConnections.from_json(json.loads(response)) diff --git a/ravendb/exceptions/exception_dispatcher.py b/ravendb/exceptions/exception_dispatcher.py index 4d80bed6..b06ca2a2 100644 --- a/ravendb/exceptions/exception_dispatcher.py +++ b/ravendb/exceptions/exception_dispatcher.py @@ -20,6 +20,7 @@ RateLimitException, RavenException, RefusedToAnswerException, + ReplicationHubNotFoundException, SchemaValidationException, TooManyRequestsException, TooManyTokensException, @@ -54,6 +55,8 @@ "BulkInsertProtocolViolationException": BulkInsertProtocolViolationException, # schema validation "SchemaValidationException": SchemaValidationException, + # replication + "ReplicationHubNotFoundException": ReplicationHubNotFoundException, # cluster "NodeIsPassiveException": NodeIsPassiveException, "NoLoaderException": NoLoaderException, diff --git a/ravendb/exceptions/raven_exceptions.py b/ravendb/exceptions/raven_exceptions.py index 34c507c0..6b6efee9 100644 --- a/ravendb/exceptions/raven_exceptions.py +++ b/ravendb/exceptions/raven_exceptions.py @@ -89,3 +89,8 @@ class MissingAiAgentParameterException(RavenException): class SchemaValidationException(RavenException): def __init__(self, message: str = None): super().__init__(message) + + +class ReplicationHubNotFoundException(RavenException): + def __init__(self, message: str = None, cause: BaseException = None): + super().__init__(message, cause) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py new file mode 100644 index 00000000..97eafedb --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py @@ -0,0 +1,171 @@ +import base64 +from datetime import timedelta + +from ravendb import ( + ExternalReplication, + GetReplicationHubAccessOperation, + GetPullReplicationTasksInfoOperation, + PreventDeletionsMode, + PullReplicationAsSink, + PullReplicationDefinition, + PullReplicationMode, + PutPullReplicationAsHubOperation, + RegisterReplicationHubAccessOperation, + ReplicationHubAccess, + UnregisterReplicationHubAccessOperation, + UpdateExternalReplicationOperation, + UpdatePullReplicationAsSinkOperation, +) +from ravendb.documents.operations.etl.configuration import RavenConnectionString +from ravendb.documents.operations.connection_string.put_connection_string_operation import ( + PutConnectionStringOperation, +) +from ravendb.documents.operations.ongoing_tasks import ( + GetOngoingTaskInfoOperation, + OngoingTaskPullReplicationAsSink, + OngoingTaskType, +) +from ravendb.exceptions.raven_exceptions import RavenException, ReplicationHubNotFoundException +from ravendb.tests.test_base import TestBase + +_DUMMY_CERTIFICATE_B64 = base64.b64encode(b"this-is-not-a-real-certificate").decode("utf-8") + + +class TestPullReplication(TestBase): + def setUp(self): + super().setUp() + + def _put_hub(self, name, **kwargs): + definition = PullReplicationDefinition(name, **kwargs) + result = self.store.maintenance.send(PutPullReplicationAsHubOperation(definition)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + return result + + def _put_connection_string(self, name, database): + connection_string = RavenConnectionString( + name=name, database=database, topology_discovery_urls=list(self.store.urls) + ) + self.store.maintenance.send(PutConnectionStringOperation(connection_string)) + return name + + def test_can_put_pull_replication_as_hub(self): + result = self._put_hub("hub-1") + self.assertGreater(result.task_id, 0) + + def test_can_put_pull_replication_as_hub_by_name(self): + result = self.store.maintenance.send(PutPullReplicationAsHubOperation(name="hub-by-name")) + self.assertIsNotNone(result.task_id) + + def test_can_get_pull_replication_tasks_info(self): + result = self._put_hub("hub-info", delay_replication_for=timedelta(seconds=30)) + + info = self.store.maintenance.send(GetPullReplicationTasksInfoOperation(result.task_id)) + self.assertIsNotNone(info) + self.assertIsNotNone(info.definition) + self.assertEqual("hub-info", info.definition.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK, info.definition.mode) + self.assertEqual(timedelta(seconds=30), info.definition.delay_replication_for) + self.assertIsNotNone(info.ongoing_tasks) + + def test_can_update_pull_replication_as_sink(self): + connection_string_name = self._put_connection_string("cs-sink", "sink-remote-db") + sink = PullReplicationAsSink( + database=self.store.database, + connection_string_name=connection_string_name, + hub_name="remote-hub", + name="my-sink", + ) + + result = self.store.maintenance.send(UpdatePullReplicationAsSinkOperation(sink)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + + info = self.store.maintenance.send( + GetOngoingTaskInfoOperation(result.task_id, OngoingTaskType.PULL_REPLICATION_AS_SINK) + ) + self.assertIsInstance(info, OngoingTaskPullReplicationAsSink) + self.assertEqual("remote-hub", info.hub_name) + self.assertEqual("my-sink", info.task_name) + + def test_can_update_external_replication(self): + connection_string_name = self._put_connection_string("cs-ext", "external-remote-db") + external = ExternalReplication( + database=self.store.database, + connection_string_name=connection_string_name, + name="my-external-replication", + ) + + result = self.store.maintenance.send(UpdateExternalReplicationOperation(external)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + + def test_get_replication_hub_access_is_empty_for_new_hub(self): + self._put_hub("hub-no-access") + access = self.store.maintenance.send(GetReplicationHubAccessOperation("hub-no-access")) + self.assertEqual([], access) + + def test_register_replication_hub_access_for_missing_hub_raises(self): + access = ReplicationHubAccess(name="sink-1", certificate_base64=_DUMMY_CERTIFICATE_B64) + operation = RegisterReplicationHubAccessOperation("hub-that-does-not-exist", access) + # The client maps a 404 to ReplicationHubNotFoundException (matching the C# client). + # The 7.2 server reports a missing hub as a 500 InvalidOperationException ("... isn't + # defined ..."); accept either so the test is correct across server versions. + try: + self.store.maintenance.send(operation) + self.fail("Expected the operation to raise for a missing hub") + except ReplicationHubNotFoundException: + pass + except RavenException as e: + self.assertIn("hub-that-does-not-exist", str(e)) + + def test_unregister_replication_hub_access_is_noop_for_unknown_thumbprint(self): + self._put_hub("hub-unregister") + # Removing an unknown thumbprint from an existing hub must not raise. + self.store.maintenance.send( + UnregisterReplicationHubAccessOperation("hub-unregister", "0123456789ABCDEF0123456789ABCDEF01234567") + ) + + def test_operations_validate_arguments_client_side(self): + self.assertRaises(ValueError, lambda: PutPullReplicationAsHubOperation()) + self.assertRaises(ValueError, lambda: RegisterReplicationHubAccessOperation("", ReplicationHubAccess())) + self.assertRaises(ValueError, lambda: RegisterReplicationHubAccessOperation("hub", None)) + self.assertRaises(ValueError, lambda: UnregisterReplicationHubAccessOperation("hub", "")) + self.assertRaises(ValueError, lambda: GetReplicationHubAccessOperation("")) + self.assertRaises(ValueError, lambda: UpdatePullReplicationAsSinkOperation(None)) + + def test_pull_replication_definition_json_round_trip(self): + definition = PullReplicationDefinition( + "round-trip", + delay_replication_for=timedelta(minutes=5), + mode=PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, + with_filtering=True, + prevent_deletions_mode=PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, + disabled=True, + ) + + parsed = PullReplicationDefinition.from_json(definition.to_json()) + self.assertEqual("round-trip", parsed.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, parsed.mode) + self.assertTrue(parsed.with_filtering) + self.assertTrue(parsed.disabled) + self.assertEqual(PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, parsed.prevent_deletions_mode) + self.assertEqual(timedelta(minutes=5), parsed.delay_replication_for) + + def test_pull_replication_as_sink_json_round_trip(self): + sink = PullReplicationAsSink( + database="db", + connection_string_name="cs", + hub_name="h", + name="s", + mode=PullReplicationMode.HUB_TO_SINK, + allowed_hub_to_sink_paths=["users/*"], + ) + + parsed = PullReplicationAsSink.from_json(sink.to_json()) + self.assertEqual("h", parsed.hub_name) + self.assertEqual("cs", parsed.connection_string_name) + self.assertEqual("db", parsed.database) + self.assertEqual("s", parsed.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK, parsed.mode) + self.assertEqual(["users/*"], parsed.allowed_hub_to_sink_paths) diff --git a/ravendb/tests/test_imports.py b/ravendb/tests/test_imports.py index 2edda75b..d0b4446a 100644 --- a/ravendb/tests/test_imports.py +++ b/ravendb/tests/test_imports.py @@ -148,22 +148,22 @@ def test_imports_at_top_level(self): # from ravendb import SeedIdentityForOperation # from ravendb import IOperationProgress # from ravendb import IOperationResult - # from ravendb import UpdateExternalReplicationOperation + from ravendb import UpdateExternalReplicationOperation + from ravendb import PullReplicationDefinitionAndCurrentConnections + from ravendb import PutPullReplicationAsHubOperation + from ravendb import DetailedReplicationHubAccess + from ravendb import GetReplicationHubAccessOperation + from ravendb import PreventDeletionsMode + from ravendb import PullReplicationMode + from ravendb import RegisterReplicationHubAccessOperation + from ravendb import ReplicationHubAccess + from ravendb import ReplicationHubAccessResult + from ravendb import UnregisterReplicationHubAccessOperation + from ravendb import UpdatePullReplicationAsSinkOperation + from ravendb import GetPullReplicationTasksInfoOperation - # from ravendb import PullReplicationDefinitionAndCurrentConnections - # from ravendb import PutPullReplicationAsHubOperation - - # from ravendb import DetailedReplicationHubAccess - # from ravendb import GetReplicationHubAccessOperation # from ravendb import IExternalReplication - # from ravendb import PreventDeletionsMode - # from ravendb import PullReplicationMode - # from ravendb import RegisterReplicationHubAccessOperation - # from ravendb import ReplicationHubAccess - # from ravendb import ReplicationHubAccessResult # from ravendb import ReplicationHubAccessResponse - # from ravendb import UnregisterReplicationHubAccessOperation - # from ravendb import UpdatePullReplicationAsSinkOperation # from ravendb import GetConflictsCommand from ravendb import SetIndexesLockOperation from ravendb import SetIndexesPriorityOperation @@ -247,9 +247,9 @@ def test_imports_at_top_level(self): # from ravendb import DisableDatabaseToggleResult # from ravendb import ConfigureExpirationOperation # from ravendb import DeleteOngoingTaskOperation - # from ravendb import GetPullReplicationHubTasksInfoOperation - # from ravendb import OngoingTaskPullReplicationAsSink - # from ravendb import OngoingTaskPullReplicationAsHub + from ravendb import OngoingTaskPullReplicationAsSink + from ravendb import OngoingTaskPullReplicationAsHub + # from ravendb import OngoingTaskType # from ravendb import RunningBackup # from ravendb import NextBackup