-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathPython-Sample-OAuth.py
More file actions
203 lines (176 loc) · 8.48 KB
/
Python-Sample-OAuth.py
File metadata and controls
203 lines (176 loc) · 8.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
from __future__ import annotations
import base64
import hashlib
import json
import secrets
import string
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from urllib.parse import quote, urlencode
# --------------------------------------------------------------------------- #
# PayPing OAuth2 / Client Registration Helper
# --------------------------------------------------------------------------- #
@dataclass
class PayPing:
    """Helper for the PayPing OAuth2 (PKCE) flow and client-registration API.

    Credentials can be passed to the constructor. Any of ``CLIENT_ID``,
    ``CLIENT_SECRET`` or ``TOKEN`` left empty is read from Django settings
    (``PAYPING_CLIENT_ID``, ``PAYPING_CLIENT_SECRET``,
    ``PAYPING_SERVICE_TOKEN``). ``PREFIX`` falls back to the optional
    ``PAYPING_REDIRECT_PREFIX`` setting. ``SITE_URL`` is read from settings
    whenever ``REDIRECT_URI`` is built.
    """

    # Un-annotated on purpose: ``@dataclass`` only turns annotated names into
    # fields, so these stay plain class-level constants.
    _DEFAULT_PREFIX = "payping/callback"
    _BASE_URL = "https://oauth.payping.ir"

    # --------------------------------------------------------------------- #
    # Configuration – read from Django settings (lazy)
    # --------------------------------------------------------------------- #
    CLIENT_ID: str = ""
    CLIENT_SECRET: str = ""
    TOKEN: str = ""  # Service-to-service bearer token
    PREFIX: str = _DEFAULT_PREFIX
    SCOPES: str = "openid pay:write profile"

    def __post_init__(self) -> None:
        """Fill in values from Django settings that were not passed explicitly.

        Raises:
            ImproperlyConfigured: if the client id, client secret or service
                token is still empty after consulting settings.
        """
        self.CLIENT_ID = self.CLIENT_ID or getattr(settings, "PAYPING_CLIENT_ID", "")
        self.CLIENT_SECRET = self.CLIENT_SECRET or getattr(
            settings, "PAYPING_CLIENT_SECRET", ""
        )
        self.TOKEN = self.TOKEN or getattr(settings, "PAYPING_SERVICE_TOKEN", "")
        # The field default is a non-empty string, so a plain ``or`` fallback
        # would never consult settings. A PREFIX equal to the default is
        # therefore treated as "not provided".
        if not self.PREFIX or self.PREFIX == self._DEFAULT_PREFIX:
            self.PREFIX = getattr(
                settings, "PAYPING_REDIRECT_PREFIX", self._DEFAULT_PREFIX
            )
        if not all([self.CLIENT_ID, self.CLIENT_SECRET, self.TOKEN]):
            raise ImproperlyConfigured(
                "PAYPING_CLIENT_ID, PAYPING_CLIENT_SECRET and PAYPING_SERVICE_TOKEN must be defined."
            )

    def __repr__(self) -> str:
        """Return a representation with ``CLIENT_SECRET`` and ``TOKEN`` masked.

        Defined explicitly so the auto-generated dataclass ``__repr__`` does
        not print credentials into logs or tracebacks.
        """
        return (
            f"{type(self).__name__}("
            f"CLIENT_ID={self.CLIENT_ID!r}, "
            "CLIENT_SECRET='***', "
            "TOKEN='***', "
            f"PREFIX={self.PREFIX!r}, "
            f"SCOPES={self.SCOPES!r})"
        )

    @property
    def REDIRECT_URI(self) -> str:
        """Return ``SITE_URL`` and ``PREFIX`` joined by exactly one slash.

        Raises:
            ImproperlyConfigured: if ``SITE_URL`` is missing or empty in
                Django settings.
        """
        site_url = getattr(settings, "SITE_URL", None)
        if not site_url:
            raise ImproperlyConfigured("SITE_URL must be defined in Django settings.")
        return f"{site_url.rstrip('/')}/{self.PREFIX.lstrip('/')}"

    # --------------------------------------------------------------------- #
    # PKCE helpers
    # --------------------------------------------------------------------- #
    @staticmethod
    def generate_code_verifier() -> str:
        """Return a random PKCE code verifier.

        64 random bytes are base64url-encoded with the ``=`` padding removed,
        which yields an 86-character string.
        """
        token = secrets.token_bytes(64)
        return base64.urlsafe_b64encode(token).rstrip(b"=").decode("utf-8")

    @staticmethod
    def generate_code_challenge(verifier: str) -> str:
        """Return the S256 challenge for *verifier*.

        The challenge is the unpadded base64url encoding of the SHA-256
        digest of the UTF-8 encoded verifier.
        """
        digest = hashlib.sha256(verifier.encode("utf-8")).digest()
        return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("utf-8")

    # --------------------------------------------------------------------- #
    # Secure username generation
    # --------------------------------------------------------------------- #
    @staticmethod
    def generate_username(prefix: str = "hmyn", length: int = 20) -> str:
        """Return *prefix* followed by *length* random ASCII letters/digits.

        Characters are drawn with the ``secrets`` module rather than
        ``random``.
        """
        alphabet = string.ascii_letters + string.digits
        return prefix + "".join(secrets.choice(alphabet) for _ in range(length))

    # --------------------------------------------------------------------- #
    # Centralised request handler
    # --------------------------------------------------------------------- #
    def _request(self, method: str, url: str, **kwargs: Any) -> "requests.Response":
        """Send an HTTP request and raise a descriptive error on failure.

        Args:
            method: HTTP verb passed to ``requests.request``.
            url: Target URL.
            **kwargs: Forwarded to ``requests.request``. ``timeout`` defaults
                to 15 seconds when not supplied.

        Returns:
            The response, once ``raise_for_status()`` has passed.

        Raises:
            requests.HTTPError: on a 4xx/5xx status. The message carries the
                status code and the response body (parsed JSON when possible),
                and the response is attached as ``exc.response``.
        """
        timeout = kwargs.pop("timeout", 15)
        response = requests.request(method, url, timeout=timeout, **kwargs)
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            # Try to extract a meaningful error message from PayPing
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text or "No details"
            # Attach the response so callers can still inspect
            # ``exc.response`` (status code, headers, body).
            raise requests.HTTPError(
                f"PayPing API error {response.status_code}: {error_detail}",
                response=response,
            ) from exc
        return response

    # --------------------------------------------------------------------- #
    # Check if e-mail already exists
    # --------------------------------------------------------------------- #
    def check_user_exists(self, email: str) -> bool:
        """Return ``True`` when the EmailExist endpoint answers ``true``.

        The response body is compared case-insensitively after stripping
        surrounding whitespace; any other body yields ``False``.

        Raises:
            requests.HTTPError: propagated from ``_request`` on a 4xx/5xx.
        """
        url = f"{self._BASE_URL}/v1/client/EmailExist?Email={quote(email)}"
        headers = {"Authorization": f"Bearer {self.TOKEN}"}
        resp = self._request("GET", url, headers=headers)
        return resp.text.strip().lower() == "true"

    # --------------------------------------------------------------------- #
    # Initiate client registration (returns URL for user)
    # --------------------------------------------------------------------- #
    def initiate_client_registration(
        self,
        return_url: str,
        email: str,
        sheba: str,
        username: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        phone_number: Optional[str] = None,
        national_code: Optional[str] = None,
        birth_day: Optional[str] = None,  # YYYY-MM-DD
    ) -> str:
        """Start a client registration and return the URL to send the user to.

        Args:
            return_url: Sent as ``ReturnUrl``.
            email: Sent as ``Email``.
            sheba: Sent as ``Sheba``.
            username: Sent as ``UserName``; a random one is generated with
                ``generate_username`` when empty or ``None``.
            first_name, last_name, phone_number, national_code, birth_day:
                Optional profile fields; omitted from the payload when
                ``None``.

        Returns:
            The ClientRegister page URL carrying the register id taken from
            the response body.

        Raises:
            requests.HTTPError: propagated from ``_request`` on a 4xx/5xx.
            ValueError: if the response body contains no register id.
        """
        payload = {
            "UserName": username or self.generate_username(),
            "Email": email,
            "FirstName": first_name,
            "LastName": last_name,
            "PhoneNumber": phone_number,
            "NationalCode": national_code,
            "BirthDay": birth_day,
            "ReturnUrl": return_url,
            "ClientId": self.CLIENT_ID,
            "Sheba": sheba,
        }
        # Remove None values – PayPing rejects them
        payload = {key: value for key, value in payload.items() if value is not None}
        resp = self._request(
            "POST",
            f"{self._BASE_URL}/v1/client/ClientRegisterInit",
            json=payload,
            headers={"Authorization": f"Bearer {self.TOKEN}"},
        )
        # The id is read from the raw body; surrounding whitespace and
        # double quotes are stripped.
        register_id = resp.text.strip().strip('"')
        if not register_id:
            raise ValueError("Register ID missing in PayPing response")
        # Percent-encode so an unexpected character cannot alter the URL.
        return (
            f"{self._BASE_URL}/Client/ClientRegister"
            f"?registerId={quote(register_id, safe='')}"
        )

    # --------------------------------------------------------------------- #
    # Build authorization URL (PKCE)
    # --------------------------------------------------------------------- #
    def get_authorization_url(self, code_verifier: str, state: str) -> str:
        """Return the ``/connect/authorize`` URL for the authorization-code flow.

        Args:
            code_verifier: PKCE verifier; only its S256 challenge is placed
                in the URL.
            state: Value passed through as the ``state`` query parameter.

        Raises:
            ImproperlyConfigured: if ``SITE_URL`` is not configured (raised
                while building ``REDIRECT_URI``).
        """
        params = {
            "scope": self.SCOPES,
            "response_type": "code",
            "client_id": self.CLIENT_ID,
            "code_challenge": self.generate_code_challenge(code_verifier),
            "code_challenge_method": "S256",
            "redirect_uri": self.REDIRECT_URI,
            "state": state,
        }
        # quote_via=quote encodes spaces as %20 instead of "+".
        return f"{self._BASE_URL}/connect/authorize?" + urlencode(params, quote_via=quote)

    # --------------------------------------------------------------------- #
    # Exchange authorization code for access token
    # --------------------------------------------------------------------- #
    def exchange_code_for_token(self, code_verifier: str, code: str) -> Tuple[str, int]:
        """Exchange an authorization code for an access token.

        Args:
            code_verifier: The PKCE verifier whose challenge was sent in the
                authorization request.
            code: The authorization code to exchange.

        Returns:
            ``(access_token, expires_in)``; ``expires_in`` is ``0`` when the
            response omits it or sends ``null``.

        Raises:
            requests.HTTPError: propagated from ``_request`` on a 4xx/5xx.
            ValueError: if the body is not valid JSON, is not a JSON object,
                lacks an access token, or has a non-numeric ``expires_in``.
            ImproperlyConfigured: if ``SITE_URL`` is not configured.
        """
        url = f"{self._BASE_URL}/connect/token"
        data = {
            "grant_type": "authorization_code",
            "client_id": self.CLIENT_ID,
            "client_secret": self.CLIENT_SECRET,
            "code_verifier": code_verifier,
            "code": code,
            "redirect_uri": self.REDIRECT_URI,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        resp = self._request("POST", url, data=data, headers=headers)
        try:
            token_data = resp.json()
        except ValueError as exc:
            # ValueError is the common base of the JSON decode errors, whether
            # they come from the stdlib ``json`` module or from simplejson.
            raise ValueError("PayPing token endpoint returned invalid JSON") from exc
        if not isinstance(token_data, dict):
            raise ValueError("PayPing token endpoint returned an unexpected payload")
        access_token = token_data.get("access_token")
        if not access_token:
            raise ValueError("Access token missing in PayPing response")
        # ``or 0`` also covers an explicit JSON null, which int() would reject.
        expires_in = int(token_data.get("expires_in") or 0)
        return access_token, expires_in

    # --------------------------------------------------------------------- #
    # Fetch user information
    # --------------------------------------------------------------------- #
    def get_userinfo(self, access_token: str) -> Dict[str, Any]:
        """Return the decoded JSON body of the ``/connect/userinfo`` endpoint.

        Args:
            access_token: Bearer token sent in the ``Authorization`` header.

        Raises:
            requests.HTTPError: propagated from ``_request`` on a 4xx/5xx.
        """
        url = f"{self._BASE_URL}/connect/userinfo"
        headers = {"Authorization": f"Bearer {access_token}"}
        resp = self._request("GET", url, headers=headers)
        return resp.json()