Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ This library uses semantic versioning and follows Okta's [Library Version Policy

The latest release can always be found on the [releases page][github-releases].

**NOTE:** We have implemented DPoP and the support is now available in the latest version of the SDK.

## Need help?

If you run into problems using the SDK, you can:
Expand Down
28 changes: 22 additions & 6 deletions okta/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,28 @@ def __init__(
# use default configuration if none is provided
if configuration is None:
configuration = Configuration.get_default()
self.configuration = Configuration(
host=configuration["client"]["orgUrl"],
access_token=configuration["client"]["token"],
api_key=configuration["client"].get("privateKey", None),
authorization_mode=configuration["client"].get("authorizationMode", "SSWS"),
)

# Build Configuration with DPoP support if present
config_params = {
"host": configuration["client"]["orgUrl"],
"access_token": configuration["client"].get("token", None), # Use .get() to handle PrivateKey mode
"api_key": configuration["client"].get("privateKey", None),
"authorization_mode": configuration["client"].get("authorizationMode", "SSWS"),
}

# Store DPoP parameters in Configuration for completeness.
# NOTE: DPoP proof generation and header injection are handled
# entirely by RequestExecutor → OAuth → DPoPProofGenerator.
# The Configuration object stores these values but they are NOT
# consumed by the synchronous urllib3/RESTClientObject path.
if configuration["client"].get("dpopEnabled", False):
config_params.update({
"dpop_enabled": True,
"dpop_private_key": configuration["client"].get("privateKey"),
"dpop_key_rotation_interval": configuration["client"].get("dpopKeyRotationInterval", 86400),
})

self.configuration = Configuration(**config_params)

if self.configuration.event_listeners is not None:
if len(self.configuration.event_listeners["call_api_started"]) > 0:
Expand Down
10 changes: 9 additions & 1 deletion okta/cache/no_op_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ class NoOpCache(Cache):
This is a disabled Cache Class where no operations occur
in the cache.
Implementing the okta.cache.cache.Cache abstract class.

.. note::
**DPoP with NoOpCache**: When using NoOpCache, the ``OKTA_ACCESS_TOKEN``
cache key is not persisted between requests. However, the OAuth class
maintains its own internal token state, so tokens are still reused
across requests without hitting the authorization server each time.
For production use with DPoP, enabling OktaCache provides an additional
layer of caching that avoids redundant ``get_oauth_token()`` calls.
"""

def __init__(self):
super()
super().__init__()

def get(self, key):
"""
Expand Down
34 changes: 25 additions & 9 deletions okta/cache/okta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,31 @@
import time

from okta.cache.cache import Cache
from okta.constants import LOGGER_NAME

logger = logging.getLogger("okta-sdk-python")
logger = logging.getLogger(LOGGER_NAME)


class OktaCache(Cache):
"""
This is a base class implementing a Cache using TTL and TTI.
Implementing the okta.cache.cache.Cache abstract class.

THREAD SAFETY WARNING:
---------------------
This cache implementation is NOT thread-safe and should only be used in
single-threaded or single-coroutine contexts. In concurrent environments
(e.g., asyncio with multiple coroutines accessing the same cache instance),
race conditions may occur during cache operations.

For multi-threaded applications, consider:
1. Using threading.local() to create per-thread cache instances
2. Implementing a thread-safe cache wrapper with locks
3. Using an external cache (Redis, Memcached) for distributed scenarios

The default SDK usage pattern (one client instance per thread/coroutine)
is safe. Issues only arise when sharing a single client across multiple
concurrent execution contexts.
"""

def __init__(self, ttl, tti):
Expand All @@ -30,7 +47,7 @@ def __init__(self, ttl, tti):
ttl {float} -- Time to Live: for cache entries
tti {float} -- Time to Idle: for cache entries
"""
super() # Inherit from parent class
super().__init__() # Inherit from parent class
self._store = {} # key -> {value, TTI, TTL}
self._time_to_live = ttl
self._time_to_idle = tti
Expand All @@ -55,8 +72,7 @@ def get(self, key):
entry["tti"] = now + self._time_to_idle
# Return desired value and update cache
self._clean_cache()
logger.info(f'Got value from cache for key "{key}".')
logger.debug(f'Cached value for key {key}: {entry["value"]}')
logger.info('Got value from cache for key "%s".', key)
return entry["value"]

# Return None if key isn't in cache and update cache
Expand All @@ -75,13 +91,14 @@ def contains(self, key):
"""
return key in self._store and self._is_valid_entry(self._store[key])

def add(self, key: str, value: tuple):
def add(self, key: str, value):
"""
Adds a key-value pair to the cache.

Arguments:
key {str} -- Key in pair
value {tuple} -- Tuple of response and response body
value -- Value to cache (e.g., tuple of response and body,
or tuple of access_token and token_type)
"""
if isinstance(key, str) and (
not isinstance(value, list) or not isinstance(value[1], list)
Expand All @@ -95,8 +112,7 @@ def add(self, key: str, value: tuple):
"tti": now + self._time_to_idle,
"ttl": now + self._time_to_live,
}
logger.info(f'Added to cache value for key "{key}".')
logger.debug(f"Cached value for key {key}: {value}.")
logger.info('Added to cache value for key "%s".', key)
# Update cache
self._clean_cache()

Expand All @@ -111,7 +127,7 @@ def delete(self, key):
if key in self._store:
# Delete entry
del self._store[key]
logger.info(f'Removed value from cache for key "{key}".')
logger.info('Removed value from cache for key "%s".', key)

def clear(self):
"""
Expand Down
8 changes: 8 additions & 0 deletions okta/config/config_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ConfigSetter:
"proxy": {"port": "", "host": "", "username": "", "password": ""},
"rateLimit": {"maxRetries": ""},
"oauthTokenRenewalOffset": "",
"dpopEnabled": "",
"dpopKeyRotationInterval": "",
},
"testing": {"testingDisableHttpsCheck": ""},
}
Expand All @@ -63,6 +65,12 @@ def get_config(self):
Returns a deep copy to prevent external modification of internal state
and to avoid holding references to sensitive values.

NOTE: Deep copying creates duplicate copies of all config data in memory,
including private keys. While this prevents external mutation, it means
sensitive data may be duplicated. Callers should not store this config
unnecessarily and should allow Python's garbage collector to clean up
the copy when no longer needed.

Returns:
dict -- Deep copy of the client configuration dictionary
"""
Expand Down
111 changes: 109 additions & 2 deletions okta/config/config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
# See the License for the specific language governing permissions and limitations under the License.
# coding: utf-8

from okta.constants import FINDING_OKTA_DOMAIN, REPO_URL
import logging

from okta.constants import (
FINDING_OKTA_DOMAIN, LOGGER_NAME, REPO_URL,
MIN_DPOP_KEY_ROTATION_SECONDS, MAX_DPOP_KEY_ROTATION_SECONDS,
)
from okta.error_messages import (
ERROR_MESSAGE_ORG_URL_MISSING,
ERROR_MESSAGE_API_TOKEN_DEFAULT,
Expand All @@ -26,6 +31,8 @@
ERROR_MESSAGE_PROXY_INVALID_PORT,
)

logger = logging.getLogger(LOGGER_NAME)


class ConfigValidator:
"""
Expand All @@ -51,7 +58,6 @@ def validate_config(self):
errors = []
# Defensive: default to empty dict if sections are missing
client = self._config.get("client", {})
_ = self._config.get("testing", {})

# check org url
errors += self._validate_org_url(client.get("orgUrl", ""))
Expand All @@ -61,6 +67,14 @@ def validate_config(self):
# check API details based on authorization mode
if client.get("authorizationMode") in ("SSWS", "Bearer"):
errors += self._validate_token(client.get("token", ""))
# Warn if DPoP is enabled with a non-OAuth auth mode (it has no effect)
if client.get("dpopEnabled", False):
logger.warning(
"dpopEnabled is True but authorizationMode is '%s'. "
"DPoP only applies to 'PrivateKey' (OAuth) mode and will "
"be ignored. Set authorizationMode to 'PrivateKey' to use DPoP.",
client.get("authorizationMode"),
)
elif client.get("authorizationMode") == "PrivateKey":
client_fields = [
"clientId",
Expand All @@ -70,6 +84,8 @@ def validate_config(self):
]
client_fields_values = [client.get(field, "") for field in client_fields]
errors += self._validate_client_fields(*client_fields_values)
# Validate DPoP configuration if enabled
errors += self._validate_dpop_config(client)
else: # Not a valid authorization mode
errors += [
(
Expand Down Expand Up @@ -226,3 +242,94 @@ def _validate_proxy_settings(self, proxy):
proxy_errors.append(ERROR_MESSAGE_PROXY_INVALID_PORT)

return proxy_errors

def _validate_dpop_config(self, client):
"""
Validate DPoP-specific configuration.

Coerces string values from environment variables to their expected
types (bool for ``dpopEnabled``, int for ``dpopKeyRotationInterval``)
before validation. The coerced values are written back into *client*
so that downstream code receives the correct Python types.

Note: This method is only called when authorizationMode is 'PrivateKey',
so no need to re-check the auth mode here.

Args:
client (dict): Client configuration dict (mutated in-place for
type coercion of string values from environment variables).

Returns:
list: List of error messages (empty if valid)
"""

errors = []

dpop_enabled = client.get('dpopEnabled', False)

# Coerce string from env vars to bool (e.g. "true" → True)
if isinstance(dpop_enabled, str):
dpop_enabled = dpop_enabled.strip().lower() == 'true'
client['dpopEnabled'] = dpop_enabled

# Validate dpopEnabled is a boolean
if not isinstance(dpop_enabled, bool):
errors.append(
"dpopEnabled must be a boolean (True/False), "
f"but got {type(dpop_enabled).__name__}: {dpop_enabled!r}"
)
return errors # Cannot validate further if type is wrong

if not dpop_enabled:
return errors # DPoP not enabled, nothing to validate

# Validate key rotation interval
rotation_interval = client.get('dpopKeyRotationInterval', 86400)

# Coerce string from env vars to int (e.g. "86400" → 86400)
if isinstance(rotation_interval, str):
try:
rotation_interval = int(rotation_interval)
client['dpopKeyRotationInterval'] = rotation_interval
except ValueError:
errors.append(
"dpopKeyRotationInterval must be a valid integer "
f"(seconds), but got non-numeric string: "
f"{rotation_interval!r}"
)
return errors

if not isinstance(rotation_interval, int):
errors.append(
"dpopKeyRotationInterval must be an integer (seconds), "
f"but got {type(rotation_interval).__name__}"
)
elif rotation_interval < MIN_DPOP_KEY_ROTATION_SECONDS:
errors.append(
"dpopKeyRotationInterval must be at least "
f"{MIN_DPOP_KEY_ROTATION_SECONDS} seconds (1 hour), "
f"but got {rotation_interval} seconds. "
"Shorter intervals may cause performance issues."
)
elif rotation_interval > MAX_DPOP_KEY_ROTATION_SECONDS:
max_days = MAX_DPOP_KEY_ROTATION_SECONDS // 86400
given_days = rotation_interval // 86400
errors.append(
"dpopKeyRotationInterval must be at most "
f"{MAX_DPOP_KEY_ROTATION_SECONDS} seconds "
f"({max_days} days), but got {rotation_interval} "
f"seconds ({given_days} days). "
"Excessive rotation intervals defeat the security "
"purpose of DPoP. "
"Recommended: 24-48 hours for production use."
)
elif rotation_interval > 7 * 24 * 3600: # Warning for > 7 days
# This is a warning, not an error
logger.warning(
"dpopKeyRotationInterval is very long (%d seconds, "
"%d days). Consider shorter intervals (24-48 hours) "
"for better security.",
rotation_interval, rotation_interval // 86400,
)

return errors
19 changes: 19 additions & 0 deletions okta/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ class Configuration:
:param ssl_ca_cert: str - the path to a file of concatenated CA certificates
in PEM format.

warning::
**Thread Safety**: Configuration objects should be treated as immutable
after initialization. Modifying configuration attributes after passing
to Client/ApiClient may result in undefined behavior in multi-threaded
or async environments.

:Example:

API Key Authentication Example.
Expand Down Expand Up @@ -109,6 +115,9 @@ def __init__(
server_operation_variables=None,
ssl_ca_cert=None,
authorization_mode=None,
dpop_enabled=False,
dpop_private_key=None,
dpop_key_rotation_interval=86400,
) -> None:
"""Constructor"""
self._base_path = "https://subdomain.okta.com" if host is None else host
Expand Down Expand Up @@ -148,6 +157,16 @@ def __init__(
self.access_token = access_token
"""Access token
"""
# DPoP Settings
self.dpop_enabled = dpop_enabled
"""Enable DPoP (Demonstrating Proof-of-Possession) per RFC 9449
"""
self.dpop_private_key = dpop_private_key
"""Private key for DPoP proof generation
"""
self.dpop_key_rotation_interval = dpop_key_rotation_interval
"""Key rotation interval in seconds (default: 86400 = 24 hours)
"""
self.logger = {}
"""Logging Settings
"""
Expand Down
10 changes: 10 additions & 0 deletions okta/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@

SWA_APP_NAME = "template_swa"
SWA3_APP_NAME = "template_swa3field"

# DPoP (Demonstrating Proof-of-Possession) constants
MIN_DPOP_KEY_ROTATION_SECONDS = 3600 # 1 hour minimum
MAX_DPOP_KEY_ROTATION_SECONDS = 90 * 24 * 3600 # 90 days maximum
MAX_DPOP_NONCE_RETRIES = 2
MAX_DPOP_BACKOFF_DELAY = 1.0 # Maximum backoff delay in seconds for nonce retries
DPOP_USER_AGENT_EXTENSION = "isDPoP:true"

# SDK-wide logger name — single source of truth for all hand-written modules
LOGGER_NAME = "okta-sdk-python"
Loading
Loading