-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcredentials.py
More file actions
268 lines (214 loc) · 9.16 KB
/
credentials.py
File metadata and controls
268 lines (214 loc) · 9.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Copyright (c) 2025, Salesforce, Inc.
# SPDX-License-Identifier: Apache-2
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import configparser
from dataclasses import dataclass, field
from enum import Enum
import os
from typing import Optional
from loguru import logger
INI_FILE = os.path.expanduser("~/.datacustomcode/credentials.ini")
class AuthType(str, Enum):
"""Supported authentication methods for Salesforce Data Cloud."""
OAUTH_TOKENS = "oauth_tokens"
CLIENT_CREDENTIALS = "client_credentials"
# Environment variable mappings for each auth type
ENV_CREDENTIALS_COMMON = {
"login_url": "SFDC_LOGIN_URL",
"client_id": "SFDC_CLIENT_ID",
}
ENV_CREDENTIALS_OAUTH_TOKENS = {
"client_secret": "SFDC_CLIENT_SECRET",
"refresh_token": "SFDC_REFRESH_TOKEN",
"core_token": "SFDC_CORE_TOKEN",
}
ENV_CREDENTIALS_CLIENT_CREDENTIALS = {
"client_secret": "SFDC_CLIENT_SECRET",
}
@dataclass
class Credentials:
"""Flexible credentials supporting multiple authentication methods.
Supports two authentication methods:
- OAUTH_TOKENS: OAuth tokens (refresh_token) authentication (default)
- CLIENT_CREDENTIALS: Server-to-server integration using client_id/secret only
"""
# Required for all auth types
login_url: str
client_id: str
auth_type: AuthType = field(default=AuthType.OAUTH_TOKENS)
# Common field
client_secret: Optional[str] = None
# OAuth Tokens flow fields
core_token: Optional[str] = None
refresh_token: Optional[str] = None
def __post_init__(self):
"""Validate credentials based on auth_type."""
self._validate()
def _validate(self) -> None:
"""Validate that required fields are present for the auth type."""
if self.auth_type == AuthType.OAUTH_TOKENS:
missing = []
if not self.refresh_token:
missing.append("refresh_token")
if not self.client_secret:
missing.append("client_secret")
if missing:
raise ValueError(f"OAuth Tokens auth requires: {', '.join(missing)}")
elif self.auth_type == AuthType.CLIENT_CREDENTIALS:
if not self.client_secret:
raise ValueError("Client Credentials auth requires: client_secret")
@classmethod
def from_ini(
cls,
profile: str = "default",
ini_file: str = INI_FILE,
) -> Credentials:
"""Load credentials from INI file.
Args:
profile: Profile section name in the INI file (default: "default")
ini_file: Path to the credentials INI file
Returns:
Credentials instance loaded from the INI file
Raises:
KeyError: If the profile or required fields are missing
"""
config = configparser.ConfigParser()
expanded_ini_file = os.path.expanduser(ini_file)
logger.debug(f"Reading {expanded_ini_file} for profile {profile}")
if not os.path.exists(expanded_ini_file):
raise FileNotFoundError(f"Credentials file not found: {expanded_ini_file}")
config.read(expanded_ini_file)
if profile not in config:
raise KeyError(f"Profile '{profile}' not found in {expanded_ini_file}")
section = config[profile]
# Determine auth type (default to oauth_tokens)
auth_type_str = section.get("auth_type", AuthType.OAUTH_TOKENS.value)
try:
auth_type = AuthType(auth_type_str)
except ValueError as exc:
raise ValueError(
f"Invalid auth_type '{auth_type_str}' in profile '{profile}'. "
f"Valid options: {[t.value for t in AuthType]}"
) from exc
return cls(
login_url=section["login_url"],
client_id=section["client_id"],
auth_type=auth_type,
client_secret=section.get("client_secret"),
# OAuth Tokens fields
core_token=section.get("core_token"),
refresh_token=section.get("refresh_token"),
)
@classmethod
def from_env(cls) -> Credentials:
"""Load credentials from environment variables.
Environment variables:
Common (required):
SFDC_LOGIN_URL: Salesforce login URL
SFDC_CLIENT_ID: External Client App client ID
SFDC_AUTH_TYPE: Authentication type (optional, defaults to oauth_tokens)
For oauth_tokens (default):
SFDC_CLIENT_SECRET: External Client App client secret
SFDC_REFRESH_TOKEN: OAuth refresh token
SFDC_CORE_TOKEN: OAuth core/access token (optional)
Returns:
Credentials instance loaded from environment variables
Raises:
ValueError: If required environment variables are missing
"""
# Check for common required variables
login_url = os.environ.get("SFDC_LOGIN_URL")
client_id = os.environ.get("SFDC_CLIENT_ID")
if not login_url or not client_id:
raise ValueError(
"Environment variables SFDC_LOGIN_URL and SFDC_CLIENT_ID are required."
)
# Determine auth type
auth_type_str = os.environ.get("SFDC_AUTH_TYPE", AuthType.OAUTH_TOKENS.value)
try:
auth_type = AuthType(auth_type_str)
except ValueError as exc:
raise ValueError(
f"Invalid SFDC_AUTH_TYPE '{auth_type_str}'. "
f"Valid options: {[t.value for t in AuthType]}"
) from exc
return cls(
login_url=login_url,
client_id=client_id,
auth_type=auth_type,
client_secret=os.environ.get("SFDC_CLIENT_SECRET"),
# OAuth Tokens fields
core_token=os.environ.get("SFDC_CORE_TOKEN"),
refresh_token=os.environ.get("SFDC_REFRESH_TOKEN"),
)
@classmethod
def from_available(cls, profile: str = "default") -> Credentials:
"""Load credentials from the first available source.
Checks sources in order:
1. Environment variables (if SFDC_LOGIN_URL is set)
2. INI file (~/.datacustomcode/credentials.ini)
Args:
profile: Profile name to use when loading from INI file
Returns:
Credentials instance from the first available source
Raises:
ValueError: If no credentials are found in any source
"""
# Check environment variables first
if os.environ.get("SFDC_LOGIN_URL"):
logger.debug("Loading credentials from environment variables")
return cls.from_env()
# Check INI file
if os.path.exists(os.path.expanduser(INI_FILE)):
logger.debug(f"Loading credentials from INI file: {INI_FILE}")
return cls.from_ini(profile=profile)
raise ValueError(
"Credentials not found in environment or INI file. "
"Run `datacustomcode configure` to create a credentials file."
)
def update_ini(self, profile: str = "default", ini_file: str = INI_FILE) -> None:
"""Save credentials to INI file.
Args:
profile: Profile section name in the INI file
ini_file: Path to the credentials INI file
"""
config = configparser.ConfigParser()
expanded_ini_file = os.path.expanduser(ini_file)
os.makedirs(os.path.dirname(expanded_ini_file), exist_ok=True)
if os.path.exists(expanded_ini_file):
config.read(expanded_ini_file)
if profile not in config:
config[profile] = {}
# Always save common fields
config[profile]["auth_type"] = self.auth_type.value
config[profile]["login_url"] = self.login_url
config[profile]["client_id"] = self.client_id
# Save fields based on auth type
if self.auth_type == AuthType.OAUTH_TOKENS:
config[profile]["client_secret"] = self.client_secret or ""
config[profile]["refresh_token"] = self.refresh_token or ""
if self.core_token:
config[profile]["core_token"] = self.core_token
# Remove fields from other auth types
for key in ["username", "password"]:
config[profile].pop(key, None)
elif self.auth_type == AuthType.CLIENT_CREDENTIALS:
config[profile]["client_secret"] = self.client_secret or ""
# Remove fields from other auth types
for key in ["username", "password", "refresh_token", "core_token"]:
config[profile].pop(key, None)
with open(expanded_ini_file, "w") as f:
config.write(f)
logger.debug(f"Saved credentials to {expanded_ini_file} [{profile}]")