diff --git a/README.md b/README.md
index 2d21f49..1058e61 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ Use of this project with Salesforce is subject to the [TERMS OF USE](./TERMS_OF_
 ## Prerequisites
 
 - **Python 3.11 only** (currently supported version - if your system version is different, we recommend using [pyenv](https://github.com/pyenv/pyenv) to configure 3.11)
-- [Azul Zulu OpenJDK 17.x](https://www.azul.com/downloads/?version=java-17-lts&package=jdk#zulu)
+- JDK 17
 - Docker support like [Docker Desktop](https://docs.docker.com/desktop/)
 - A salesforce org with some DLOs or DMOs with data and this feature enabled (it is not GA)
 - An [External Client App](#creating-an-external-client-app)
diff --git a/src/datacustomcode/client.py b/src/datacustomcode/client.py
index 01aed31..80f20a8 100644
--- a/src/datacustomcode/client.py
+++ b/src/datacustomcode/client.py
@@ -185,29 +185,31 @@ def _new_function_client(cls) -> Client:
         )
         return cls._instance
 
-    def read_dlo(self, name: str) -> PySparkDataFrame:
+    def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.
 
         Args:
             name: The name of the DLO to read.
+            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
-        return self._reader.read_dlo(name)
+        return self._reader.read_dlo(name, row_limit=row_limit)
 
-    def read_dmo(self, name: str) -> PySparkDataFrame:
+    def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.
 
         Args:
             name: The name of the DMO to read.
+            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DMO data.
         """
         self._record_dmo_access(name)
-        return self._reader.read_dmo(name)
+        return self._reader.read_dmo(name, row_limit=row_limit)
 
     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
diff --git a/src/datacustomcode/io/reader/base.py b/src/datacustomcode/io/reader/base.py
index 32b0cff..f5e69f3 100644
--- a/src/datacustomcode/io/reader/base.py
+++ b/src/datacustomcode/io/reader/base.py
@@ -15,12 +15,13 @@
 from __future__ import annotations
 
 from abc import abstractmethod
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Union
 
 from datacustomcode.io.base import BaseDataAccessLayer
 
 if TYPE_CHECKING:
     from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
+    from pyspark.sql.types import AtomicType, StructType
 
 
 class BaseDataCloudReader(BaseDataAccessLayer):
@@ -28,7 +29,17 @@ def __init__(self, spark: SparkSession):
         self.spark = spark
 
     @abstractmethod
-    def read_dlo(self, name: str) -> PySparkDataFrame: ...
+    def read_dlo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame: ...
 
     @abstractmethod
-    def read_dmo(self, name: str) -> PySparkDataFrame: ...
+    def read_dmo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame: ...
diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py
index b0bde69..b649354 100644
--- a/tests/io/reader/test_query_api.py
+++ b/tests/io/reader/test_query_api.py
@@ -277,3 +277,27 @@ def test_read_dmo_with_schema(
         reader_without_init.spark.createDataFrame.assert_called_once()
         args, _ = reader_without_init.spark.createDataFrame.call_args
         assert args[1] is custom_schema
+
+    def test_read_dlo_with_custom_row_limit(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """Test read_dlo method with custom row_limit."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dlo("test_dlo", row_limit=50)
+
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE.format("test_dlo", 50)
+        )
+
+    def test_read_dmo_with_custom_row_limit(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """Test read_dmo method with custom row_limit."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dmo("test_dmo", row_limit=25)
+
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE.format("test_dmo", 25)
+        )
diff --git a/tests/test_client.py b/tests/test_client.py
index 4e7b99e..40a52bb 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -25,11 +25,11 @@
 class MockDataCloudReader(BaseDataCloudReader):
     CONFIG_NAME = "MockDataCloudReader"
 
-    def read_dlo(self, name: str) -> DataFrame:
+    def read_dlo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
 
-    def read_dmo(self, name: str) -> DataFrame:
+    def read_dmo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
 
@@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dlo("test_dlo")
 
-        reader.read_dlo.assert_called_once_with("test_dlo")
+        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=1000)
         assert result is mock_df
         assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
 
@@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dmo("test_dmo")
 
-        reader.read_dmo.assert_called_once_with("test_dmo")
+        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=1000)
         assert result is mock_df
         assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
 
@@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dlo("source_dlo")
         client.write_to_dlo("target_dlo", df, WriteMode.APPEND)
 
-        reader.read_dlo.assert_called_once_with("source_dlo")
+        reader.read_dlo.assert_called_once_with("source_dlo", row_limit=1000)
         writer.write_to_dlo.assert_called_once_with(
             "target_dlo", mock_df, WriteMode.APPEND
         )
@@ -253,13 +253,41 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dmo("source_dmo")
         client.write_to_dmo("target_dmo", df, WriteMode.MERGE)
 
-        reader.read_dmo.assert_called_once_with("source_dmo")
+        reader.read_dmo.assert_called_once_with("source_dmo", row_limit=1000)
         writer.write_to_dmo.assert_called_once_with(
             "target_dmo", mock_df, WriteMode.MERGE
         )
         assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
 
 
+    def test_read_dlo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
+        """Test that row_limit parameter is passed through to reader."""
+        reader = MagicMock(spec=BaseDataCloudReader)
+        writer = MagicMock(spec=BaseDataCloudWriter)
+        mock_df = MagicMock(spec=DataFrame)
+        reader.read_dlo.return_value = mock_df
+
+        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
+        result = client.read_dlo("test_dlo", row_limit=500)
+
+        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=500)
+        assert result is mock_df
+        assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
+
+    def test_read_dmo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
+        """Test that row_limit parameter is passed through to reader."""
+        reader = MagicMock(spec=BaseDataCloudReader)
+        writer = MagicMock(spec=BaseDataCloudWriter)
+        mock_df = MagicMock(spec=DataFrame)
+        reader.read_dmo.return_value = mock_df
+
+        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
+        result = client.read_dmo("test_dmo", row_limit=100)
+
+        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=100)
+        assert result is mock_df
+        assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
+
 
 # Add tests for DefaultSparkSessionProvider
 class TestDefaultSparkSessionProvider: