85 changes: 74 additions & 11 deletions README.md
@@ -12,7 +12,10 @@ Use of this project with Salesforce is subject to the [TERMS OF USE](./TERMS_OF_
- JDK 17
- Docker support like [Docker Desktop](https://docs.docker.com/desktop/)
- A Salesforce org with some DLOs or DMOs containing data, and with this feature enabled (it is not GA)
- An [External Client App](#creating-an-external-client-app)
- **One of the following** for authentication:
- A Salesforce org already authenticated via the [Salesforce CLI](https://developer.salesforce.com/tools/salesforcecli)
(simplest — no External Client App needed)
- An [External Client App](#creating-an-external-client-app) configured with OAuth settings

## Installation
The SDK can be downloaded directly from PyPI with `pip`:
@@ -65,6 +68,13 @@ datacustomcode configure
datacustomcode run ./payload/entrypoint.py
```

> [!TIP]
> **Already using the Salesforce CLI?** If you have authenticated an org with `sf org login web
> --alias myorg`, you can skip `datacustomcode configure` entirely:
> ```zsh
> datacustomcode run ./payload/entrypoint.py --sf-cli-org myorg
> ```
**Contributor:** Have you seen if/how this will work with Jupyter notebooks?

**Contributor Author:** I have not. What I can say is this `--sf-cli-org` flag is in addition to what we already have; everything else continues to work.

> [!IMPORTANT]
> The example entrypoint.py requires an `Account_std__dll` DLO to be present. To deploy the script (next step), the output DLO (`Account_std_copy__dll` in the example entrypoint.py) must also exist and be in the same dataspace as `Account_std__dll`.

@@ -183,17 +193,19 @@ Options:
- `--auth-type TEXT`: Authentication method (default: `oauth_tokens`)
- `oauth_tokens` - OAuth tokens with refresh_token
- `client_credentials` - Server-to-server using client_id/secret only
- `--login-url TEXT`: Salesforce login URL

For OAuth Tokens authentication:
- `--client-id TEXT`: External Client App Client ID
- `--client-secret TEXT`: External Client App Client Secret
- `--refresh-token TEXT`: OAuth refresh token (see [Obtaining Refresh Token](#obtaining-refresh-token-and-core-token))
- `--core-token TEXT`: (Optional) OAuth core/access token - if not provided, it will be obtained using the refresh token
You will be prompted for the following depending on auth type:

*Common to all auth types:*
- **Login URL**: Salesforce login URL
- **Client ID**: External Client App Client ID

*For OAuth Tokens authentication:*
- **Client Secret**: External Client App Client Secret
- **Redirect URI**: OAuth redirect URI

For Client Credentials authentication (server-to-server):
- `--client-id TEXT`: External Client App Client ID
- `--client-secret TEXT`: External Client App Client Secret
*For Client Credentials authentication:*
- **Client Secret**: External Client App Client Secret

##### Using Environment Variables (Alternative)

@@ -255,6 +267,9 @@ Options:
- `--config-file TEXT`: Path to configuration file
- `--dependencies TEXT`: Additional dependencies (can be specified multiple times)
- `--profile TEXT`: Credential profile name (default: "default")
- `--sf-cli-org TEXT`: Salesforce CLI org alias or username (e.g. `dev1`). Fetches
credentials via `sf org display` — no `datacustomcode configure` step needed.
Takes precedence over `--profile` if both are supplied.
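
Under the hood, credentials for a CLI-authenticated org can be resolved from `sf org display --json`. A minimal sketch of that lookup (the helper name `sf_cli_credentials` is illustrative, not part of the SDK; the key names mirror the JSON emitted by the CLI's `--json` flag):

```python
import json
import subprocess

def sf_cli_credentials(org: str) -> dict:
    """Resolve an access token and instance URL for a CLI-authenticated org.

    Assumes the `sf` CLI is installed and `org` is an authenticated alias or
    username; key names follow the output of `sf org display --json`.
    """
    completed = subprocess.run(
        ["sf", "org", "display", "--target-org", org, "--json"],
        capture_output=True,
        text=True,
        check=True,
    )
    result = json.loads(completed.stdout)["result"]
    return {
        "access_token": result["accessToken"],
        "instance_url": result["instanceUrl"],
    }
```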


#### `datacustomcode zip`
@@ -277,7 +292,7 @@ Options:
- `--version TEXT`: Version of the transformation job (default: "0.0.1")
- `--description TEXT`: Description of the transformation job (default: "")
- `--network TEXT`: docker network (default: "default")
- `--cpu-size TEXT`: CPU size for the deployment (default: "CPU_XL"). Available options: CPU_L(Large), CPU_XL(Extra Large), CPU_2XL(2X Large), CPU_4XL(4X Large)
- `--cpu-size TEXT`: CPU size for the deployment (default: `CPU_2XL`). Available options: CPU_L(Large), CPU_XL(Extra Large), CPU_2XL(2X Large), CPU_4XL(4X Large)


## Docker usage
@@ -365,6 +380,54 @@ You can read more about Jupyter Notebooks here: https://jupyter.org/

You now have all fields necessary for the `datacustomcode configure` command.

### Using the Salesforce CLI for authentication

The [Salesforce CLI](https://developer.salesforce.com/tools/salesforcecli) (`sf`) lets you authenticate an org once and then reference it by alias across tools — including this SDK via `--sf-cli-org`.
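
A tool that delegates auth to the CLI will typically check that `sf` is actually on PATH before shelling out. A tiny preflight sketch (the function name is illustrative, not part of the SDK):

```python
import shutil

def sf_cli_available() -> bool:
    """Return True when the Salesforce CLI executable (`sf`) is on PATH."""
    return shutil.which("sf") is not None
```

When this returns False, the tool can fail fast with an install hint instead of surfacing a cryptic `FileNotFoundError` from `subprocess`.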

#### Installing the Salesforce CLI

Follow the [official install guide](https://developer.salesforce.com/docs/atlas.en-us.sfdx_setup.meta/sfdx_setup/sfdx_setup_install_cli.htm), or use a package manager:

```zsh
# macOS (Homebrew)
brew install sf

# npm (all platforms)
npm install --global @salesforce/cli
```

**Contributor:** Want to make sure I'm following: are both of these needed? Or, as a macOS user, is only the brew install needed?

**Contributor Author:** I personally installed via npm and didn't even know brew was a thing until later. You are correct, however: you only need one or the other.

Verify the install:
```zsh
sf --version
```

#### Authenticating an org

**Browser-based (recommended for developer orgs and sandboxes):**
```zsh
# Production / Developer Edition
sf org login web --alias myorg

# Sandbox
sf org login web --alias mysandbox --instance-url https://test.salesforce.com

# Custom domain
sf org login web --alias myorg --instance-url https://mycompany.my.salesforce.com
```

Each command opens a browser tab. After you log in and approve access, the CLI stores the session locally.

**Verify the stored org and confirm the alias:**
```zsh
sf org list
sf org display --target-org myorg
```

Once authenticated, pass the alias directly to `datacustomcode run`:
```zsh
datacustomcode run ./payload/entrypoint.py --sf-cli-org myorg
```

**Contributor:** Are we thinking about this in phases? I could see us wanting to use the SF CLI as the main/default option, if not eventually the only option.

Did you consider saving the SF CLI org alias in one of this CLI's config locations, so that `--sf-cli-org` wouldn't need to be remembered on each run? And/or an env var that would do the same?

**Contributor Author:** Interesting idea, and I did not consider this approach, as `--profile` doesn't work this way either. I will create a ticket to implement something like this in a future update.

### Obtaining Refresh Token and Core Token

If you're using OAuth Tokens authentication, the initial configure will retrieve and store tokens. Run `datacustomcode auth` to refresh these when they expire.
16 changes: 14 additions & 2 deletions src/datacustomcode/cli.py
@@ -16,7 +16,11 @@
import json
import os
import sys
from typing import List, Union
from typing import (
    List,
    Optional,
    Union,
)

import click
from loguru import logger
@@ -294,12 +298,20 @@ def scan(filename: str, config: str, dry_run: bool, no_requirements: bool):
@click.option("--config-file", default=None)
@click.option("--dependencies", default=[], multiple=True)
@click.option("--profile", default="default")
@click.option(
    "--sf-cli-org",
    default=None,
    help="SF CLI org alias or username. Fetches credentials via `sf org display`.",
)
def run(
    entrypoint: str,
    config_file: Union[str, None],
    dependencies: List[str],
    profile: str,
    sf_cli_org: Optional[str],
):
    from datacustomcode.run import run_entrypoint

    run_entrypoint(entrypoint, config_file, dependencies, profile)
    run_entrypoint(
        entrypoint, config_file, dependencies, profile, sf_cli_org=sf_cli_org
    )
79 changes: 41 additions & 38 deletions src/datacustomcode/io/reader/query_api.py
@@ -22,50 +22,21 @@
    Union,
)

import pandas.api.types as pd_types
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
from salesforcecdpconnector.connection import SalesforceCDPConnection

from datacustomcode.credentials import AuthType, Credentials
from datacustomcode.io.reader.base import BaseDataCloudReader
from datacustomcode.io.reader.sf_cli import SFCLIDataCloudReader
from datacustomcode.io.reader.utils import _pandas_to_spark_schema

if TYPE_CHECKING:
    import pandas
    from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
    from pyspark.sql.types import AtomicType
    from pyspark.sql.types import AtomicType, StructType

logger = logging.getLogger(__name__)


SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
PANDAS_TYPE_MAPPING = {
    "object": StringType(),
    "int64": LongType(),
    "float64": DoubleType(),
    "bool": BooleanType(),
}


def _pandas_to_spark_schema(
    pandas_df: pandas.DataFrame, nullable: bool = True
) -> StructType:
    fields = []
    for column, dtype in pandas_df.dtypes.items():
        spark_type: AtomicType
        if pd_types.is_datetime64_any_dtype(dtype):
            spark_type = TimestampType()
        else:
            spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
        fields.append(StructField(column, spark_type, nullable))
    return StructType(fields)
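
The dispatch rule of the helper above (moved into `io/reader/utils.py` by this PR) can be sketched without pyspark installed, using type names as stand-ins for the instantiated Spark type objects the real helper returns (the name-based mapping below is illustrative only):

```python
# Stand-in names for the Spark types the real helper instantiates.
PANDAS_TO_SPARK_NAME = {
    "object": "StringType",
    "int64": "LongType",
    "float64": "DoubleType",
    "bool": "BooleanType",
}

def spark_type_name(dtype: str) -> str:
    """Mirror the helper's dispatch: any datetime64 dtype maps to
    TimestampType, known dtypes go through the table, and anything
    else falls back to StringType."""
    if dtype.startswith("datetime64"):
        return "TimestampType"
    return PANDAS_TO_SPARK_NAME.get(dtype, "StringType")
```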


def create_cdp_connection(
@@ -136,6 +107,7 @@ class QueryAPIDataCloudReader(BaseDataCloudReader):
    Supports multiple authentication methods:
    - OAuth Tokens (default, needs client_id/secret with refresh_token)
    - Client Credentials (server-to-server, needs client_id/secret only)
    - SF CLI (uses ``sf org display`` access token via the REST API directly)

    Supports dataspace configuration for querying data within specific dataspaces.
    When a dataspace is provided (and not "default"), queries are executed within
@@ -149,6 +121,7 @@ def __init__(
        spark: SparkSession,
        credentials_profile: str = "default",
        dataspace: Optional[str] = None,
        sf_cli_org: Optional[str] = None,
    ) -> None:
        """Initialize QueryAPIDataCloudReader.

@@ -160,14 +133,30 @@ def __init__(
        dataspace: Optional dataspace identifier. If provided and not "default",
            the connection will be configured for the specified dataspace.
            When None or "default", uses the default dataspace.
        sf_cli_org: Optional SF CLI org alias or username. When set, the
            reader delegates to :class:`SFCLIDataCloudReader` which calls
            the Data Cloud REST API directly using the token obtained from
            ``sf org display``, bypassing the CDP token-exchange flow.
        """
        self.spark = spark
        credentials = Credentials.from_available(profile=credentials_profile)
        logger.debug(
            "Initializing QueryAPIDataCloudReader with "
            f"auth_type={credentials.auth_type.value}"
        )
        self._conn = create_cdp_connection(credentials, dataspace)
        if sf_cli_org:
            logger.debug(
                f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
            )
            self._sf_cli_reader: Optional[SFCLIDataCloudReader] = SFCLIDataCloudReader(
                spark=spark,
                sf_cli_org=sf_cli_org,
                dataspace=dataspace,
            )
            self._conn = None
        else:
            self._sf_cli_reader = None
            credentials = Credentials.from_available(profile=credentials_profile)
            logger.debug(
                "Initializing QueryAPIDataCloudReader with "
                f"auth_type={credentials.auth_type.value}"
            )
            self._conn = create_cdp_connection(credentials, dataspace)

    def read_dlo(
        self,
@@ -186,8 +175,15 @@ Returns:
        Returns:
            PySparkDataFrame: The PySpark DataFrame.
        """
        sf_cli_reader: Optional[SFCLIDataCloudReader] = getattr(
            self, "_sf_cli_reader", None
        )
        if sf_cli_reader is not None:
            return sf_cli_reader.read_dlo(name, schema, row_limit)

        query = SQL_QUERY_TEMPLATE.format(name, row_limit)

        assert self._conn is not None
        pandas_df = self._conn.get_pandas_dataframe(query)

        # Convert pandas DataFrame to Spark DataFrame
@@ -214,8 +210,15 @@ def read_dmo(
        Returns:
            PySparkDataFrame: The PySpark DataFrame.
        """
        sf_cli_reader: Optional[SFCLIDataCloudReader] = getattr(
            self, "_sf_cli_reader", None
        )
        if sf_cli_reader is not None:
            return sf_cli_reader.read_dmo(name, schema, row_limit)

        query = SQL_QUERY_TEMPLATE.format(name, row_limit)

        assert self._conn is not None
        pandas_df = self._conn.get_pandas_dataframe(query)

        # Convert pandas DataFrame to Spark DataFrame