2 changes: 1 addition & 1 deletion README.md
@@ -9,7 +9,7 @@ Use of this project with Salesforce is subject to the [TERMS OF USE](./TERMS_OF_
## Prerequisites

- **Python 3.11 only** (currently supported version - if your system version is different, we recommend using [pyenv](https://github.com/pyenv/pyenv) to configure 3.11)
- [Azul Zulu OpenJDK 17.x](https://www.azul.com/downloads/?version=java-17-lts&package=jdk#zulu)
- JDK 17
- Docker support like [Docker Desktop](https://docs.docker.com/desktop/)
- A salesforce org with some DLOs or DMOs with data and this feature enabled (it is not GA)
- An [External Client App](#creating-an-external-client-app)
10 changes: 6 additions & 4 deletions src/datacustomcode/client.py
@@ -185,29 +185,31 @@ def _new_function_client(cls) -> Client:
)
return cls._instance

def read_dlo(self, name: str) -> PySparkDataFrame:
def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
"""Read a DLO from Data Cloud.

Args:
name: The name of the DLO to read.
row_limit: Maximum number of rows to fetch (default: 1000).

Returns:
A PySpark DataFrame containing the DLO data.
"""
self._record_dlo_access(name)
return self._reader.read_dlo(name)
return self._reader.read_dlo(name, row_limit=row_limit)

def read_dmo(self, name: str) -> PySparkDataFrame:
def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
"""Read a DMO from Data Cloud.

Args:
name: The name of the DMO to read.
row_limit: Maximum number of rows to fetch (default: 1000).

Returns:
A PySpark DataFrame containing the DMO data.
"""
self._record_dmo_access(name)
return self._reader.read_dmo(name)
return self._reader.read_dmo(name, row_limit=row_limit)

def write_to_dlo(
self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
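For context on the call-site effect of this change, here is a minimal usage sketch. The `Client` construction (reader, writer, and proxy wiring) is assumed to happen elsewhere and is not part of this diff; only `read_dlo`, its new `row_limit` keyword, and the 1000-row default are taken from the changes above.

```python
# Minimal sketch, assuming a configured datacustomcode Client is already
# available; only the read calls reflect the API change in this PR.
from datacustomcode.client import Client


def preview_dlo(client: Client, name: str) -> None:
    # Default behavior: at most 1000 rows are fetched.
    full_preview = client.read_dlo(name)

    # Explicit cap: fetch at most 50 rows for a quick look.
    small_preview = client.read_dlo(name, row_limit=50)

    full_preview.printSchema()
    small_preview.show()
```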
17 changes: 14 additions & 3 deletions src/datacustomcode/io/reader/base.py
@@ -15,20 +15,31 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union

from datacustomcode.io.base import BaseDataAccessLayer

if TYPE_CHECKING:
from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
from pyspark.sql.types import AtomicType, StructType


class BaseDataCloudReader(BaseDataAccessLayer):
def __init__(self, spark: SparkSession):
self.spark = spark

@abstractmethod
def read_dlo(self, name: str) -> PySparkDataFrame: ...
def read_dlo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame: ...

@abstractmethod
def read_dmo(self, name: str) -> PySparkDataFrame: ...
def read_dmo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame: ...

Contributor review comment (on the schema parameter): Is accepting schema in these methods required? It doesn't look like we're using it, are we?
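To make the updated contract concrete, the sketch below shows one hypothetical subclass. The class name, the CONFIG_NAME value, and the SELECT ... LIMIT query built through spark.sql are illustrative assumptions (the project's real readers live elsewhere); only the two method signatures mirror the abstract base above. As the review comment asks, the schema argument is accepted here purely to satisfy the interface and is not applied.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Union

from datacustomcode.io.reader.base import BaseDataCloudReader

if TYPE_CHECKING:
    from pyspark.sql import DataFrame as PySparkDataFrame
    from pyspark.sql.types import AtomicType, StructType


class TempViewReader(BaseDataCloudReader):
    """Hypothetical reader that resolves DLO/DMO names as Spark temp views."""

    CONFIG_NAME = "TempViewReader"  # mirrors the pattern used by the test mock

    def read_dlo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
        row_limit: int = 1000,
    ) -> PySparkDataFrame:
        # `schema` is accepted only to satisfy the interface in this sketch.
        return self.spark.sql(f"SELECT * FROM {name} LIMIT {row_limit}")

    def read_dmo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
        row_limit: int = 1000,
    ) -> PySparkDataFrame:
        return self.spark.sql(f"SELECT * FROM {name} LIMIT {row_limit}")
```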
24 changes: 24 additions & 0 deletions tests/io/reader/test_query_api.py
@@ -277,3 +277,27 @@ def test_read_dmo_with_schema(
reader_without_init.spark.createDataFrame.assert_called_once()
args, _ = reader_without_init.spark.createDataFrame.call_args
assert args[1] is custom_schema

def test_read_dlo_with_custom_row_limit(
self, reader_without_init, mock_connection, mock_pandas_dataframe
):
"""Test read_dlo method with custom row_limit."""
reader_without_init._conn = mock_connection

reader_without_init.read_dlo("test_dlo", row_limit=50)

mock_connection.get_pandas_dataframe.assert_called_once_with(
SQL_QUERY_TEMPLATE.format("test_dlo", 50)
)

def test_read_dmo_with_custom_row_limit(
self, reader_without_init, mock_connection, mock_pandas_dataframe
):
"""Test read_dmo method with custom row_limit."""
reader_without_init._conn = mock_connection

reader_without_init.read_dmo("test_dmo", row_limit=25)

mock_connection.get_pandas_dataframe.assert_called_once_with(
SQL_QUERY_TEMPLATE.format("test_dmo", 25)
)
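These assertions imply that SQL_QUERY_TEMPLATE takes the object name and the row limit as positional format arguments. The real constant is defined in the test module and is not reproduced here; the snippet below only restates the expectation under an assumed template string.

```python
# Assumed shape of the test module's template; the actual constant in
# tests/io/reader/test_query_api.py may differ in wording.
SQL_QUERY_TEMPLATE = "SELECT * FROM {} LIMIT {}"

# Under that assumption, read_dlo("test_dlo", row_limit=50) is expected to
# issue: SELECT * FROM test_dlo LIMIT 50
print(SQL_QUERY_TEMPLATE.format("test_dlo", 50))
```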
40 changes: 34 additions & 6 deletions tests/test_client.py
@@ -25,11 +25,11 @@ class MockDataCloudReader(BaseDataCloudReader):

CONFIG_NAME = "MockDataCloudReader"

def read_dlo(self, name: str) -> DataFrame:
def read_dlo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
df = MagicMock(spec=DataFrame)
return df

def read_dmo(self, name: str) -> DataFrame:
def read_dmo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
df = MagicMock(spec=DataFrame)
return df

@@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy):
client = Client(reader=reader, writer=writer, proxy=mock_proxy)
result = client.read_dlo("test_dlo")

reader.read_dlo.assert_called_once_with("test_dlo")
reader.read_dlo.assert_called_once_with("test_dlo", row_limit=1000)
assert result is mock_df
assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]

@@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy):
client = Client(reader=reader, writer=writer, proxy=mock_proxy)
result = client.read_dmo("test_dmo")

reader.read_dmo.assert_called_once_with("test_dmo")
reader.read_dmo.assert_called_once_with("test_dmo", row_limit=1000)
assert result is mock_df
assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]

@@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
df = client.read_dlo("source_dlo")
client.write_to_dlo("target_dlo", df, WriteMode.APPEND)

reader.read_dlo.assert_called_once_with("source_dlo")
reader.read_dlo.assert_called_once_with("source_dlo", row_limit=1000)
writer.write_to_dlo.assert_called_once_with(
"target_dlo", mock_df, WriteMode.APPEND
)
@@ -253,13 +253,41 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
df = client.read_dmo("source_dmo")
client.write_to_dmo("target_dmo", df, WriteMode.MERGE)

reader.read_dmo.assert_called_once_with("source_dmo")
reader.read_dmo.assert_called_once_with("source_dmo", row_limit=1000)
writer.write_to_dmo.assert_called_once_with(
"target_dmo", mock_df, WriteMode.MERGE
)

assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO]

def test_read_dlo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
"""Test that row_limit parameter is passed through to reader."""
reader = MagicMock(spec=BaseDataCloudReader)
writer = MagicMock(spec=BaseDataCloudWriter)
mock_df = MagicMock(spec=DataFrame)
reader.read_dlo.return_value = mock_df

client = Client(reader=reader, writer=writer, proxy=mock_proxy)
result = client.read_dlo("test_dlo", row_limit=500)

reader.read_dlo.assert_called_once_with("test_dlo", row_limit=500)
assert result is mock_df
assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]

def test_read_dmo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
"""Test that row_limit parameter is passed through to reader."""
reader = MagicMock(spec=BaseDataCloudReader)
writer = MagicMock(spec=BaseDataCloudWriter)
mock_df = MagicMock(spec=DataFrame)
reader.read_dmo.return_value = mock_df

client = Client(reader=reader, writer=writer, proxy=mock_proxy)
result = client.read_dmo("test_dmo", row_limit=100)

reader.read_dmo.assert_called_once_with("test_dmo", row_limit=100)
assert result is mock_df
assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]


# Add tests for DefaultSparkSessionProvider
class TestDefaultSparkSessionProvider: