Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 125 additions & 45 deletions src/vaultwarden/clients/bitwarden.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
import typing
from typing import Literal
from uuid import UUID

from httpx import Client, Response

from vaultwarden.models.bitwarden import CipherDetail, RegisterData
from vaultwarden.models.crypto import CryptoContext
from vaultwarden.models.exception_models import BitwardenError
from vaultwarden.models.sync import ConnectToken, SyncData
from vaultwarden.utils.crypto import make_master_key
from vaultwarden.utils.logger import log_raise_for_status

if typing.TYPE_CHECKING:
from vaultwarden.models.bitwarden import (
CipherDetails,
Kdf,
Organization,
OrganizationCollection,
)


class BitwardenAPIClient:
def __init__(
self,
url: str,
email: str,
email: str | None,
password: str,
client_id: str,
client_secret: str,
Expand All @@ -23,7 +33,7 @@ def __init__(
# if one of the parameters is None, raise an exception
if not all([url, password, client_id, client_secret, device_id]):
raise BitwardenError("All parameters are required")
self.email = email
self.email: str | None = email
self.password = password
self.client_id = client_id
self.client_secret = client_secret
Expand Down Expand Up @@ -53,34 +63,15 @@ def _refresh_connect_token(self):
or self.connect_token.refresh_token is None
):
self._set_connect_token()
return
headers = {
"content-type": "application/x-www-form-urlencoded; charset=utf-8",
}
payload = {
"grant_type": "refresh_token",
"refresh_token": self.connect_token.refresh_token,
}
resp = self._http_client.post(
"identity/connect/token", headers=headers, data=payload
)
self._connect_token = ConnectToken.model_validate_json(resp.text)

import vaultwarden.models.bitwarden

self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token(
self._connect_token
),
)
else:
payload = {
"grant_type": "refresh_token",
"refresh_token": self.connect_token.refresh_token,
}
self._set_connect_token(payload)

def _set_connect_token(self):
headers = {
"content-type": "application/x-www-form-urlencoded; charset=utf-8",
}
payload = {
def _set_connect_token(self, refresh: dict | None = None):
payload = refresh or {
"grant_type": "client_credentials",
"client_secret": f"{self.client_secret}",
"client_id": f"{self.client_id}",
Expand All @@ -90,31 +81,28 @@ def _set_connect_token(self):
"deviceIdentifier": f"{self.device_id}",
"deviceName": "python-vaultwarden",
}
headers = {
"content-type": "application/x-www-form-urlencoded; charset=utf-8",
}
resp = self._http_client.post(
"identity/connect/token", headers=headers, data=payload
)
self._connect_token = ConnectToken.model_validate_json(resp.text)

if self.email is None:
access_token = resp.json()["access_token"]
headers = {
"Authorization": f"Bearer {self._connect_token.access_token}",
"Authorization": f"Bearer {access_token}",
"content-type": "application/json; charset=utf-8",
"Accept": "*/*",
}
resp = self._http_client.get(
mresp = self._http_client.get(
"api/accounts/profile", headers=headers
)
self.email = resp.json()["email"]
self.email = mresp.json()["email"]

import vaultwarden.models.bitwarden

self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token(
self._connect_token
),
self._connect_token = ConnectToken.model_validate_json(
resp.text, context=CryptoContext(client=self)
)

return

# login to api
Expand Down Expand Up @@ -145,15 +133,107 @@ def _api_request(
raise BitwardenError("Fail to connect")
headers = {
"Authorization": f"Bearer {self.connect_token.access_token}",
"content-type": "application/json; charset=utf-8",
"Accept": "*/*",
}

if kwargs.get("json") is not None:
headers["content-type"] = "application/json; charset=utf-8"

return self._http_client.request(
method, path, headers=headers, **kwargs
)

def sync(self, force_refresh: bool = False) -> SyncData:
if self._sync is None or force_refresh:
resp = self._api_request("GET", "api/sync")
self._sync = SyncData.model_validate_json(resp.text)
return self._sync_step(resp.json())
return self._sync

def _sync_step(self, data: dict) -> SyncData:
v: dict[str, typing.Any] = {
"profile": data.get("profile") or data.get("Profile"),
"ciphers": [],
"collections": [],
"folders": [],
"policies": [],
"sends": [],
"domains": {},
}
# populate self._sync.Profile
self._sync = SyncData.model_validate(
v, context=CryptoContext(client=self)
)
# uses self._sync.Profile
self._sync = SyncData.model_validate(
data,
context=CryptoContext(client=self),
)
return self._sync

# def create_organization(self, name, email=None) -> "Organization":
# pass

# def get_organization(self, name) -> "Organization":
# pass

def create_user(
self,
email: str,
password: str,
name,
kdf: "Kdf",
):
assert email == email.lower(), "email is not lowercase"
assert len(password) >= 8, "password is too short (< 8 characters)"

rd = RegisterData.model_construct(
email=email,
password=password,
name=name,
**kdf.model_dump(by_alias=True),
)
data = rd.model_dump(
by_alias=True,
exclude_none=True,
exclude_unset=True,
context=CryptoContext(client=self),
)
resp = self._api_request("POST", "api/accounts/register", json=data)
return resp.json()

def create_item(
self,
item: "CipherDetails",
organization: typing.Optional["Organization"],
collections: list["OrganizationCollection"] | None,
) -> "CipherDetails":
if organization:
assert organization and (
collections is not None and len(collections)
), (organization, collections)
path = "api/ciphers/admin"
key = organization.key()
item.OrganizationId = organization.Id
data = {
"type": item.Type,
"cipher": item.model_dump(
by_alias=True,
mode="json",
context=CryptoContext(client=self, stack=[key]),
),
"collectionIds": [str(i.Id) for i in collections],
}
else:
path = "api/ciphers"
assert self.connect_token is not None
key = self.connect_token.Key
data = item.model_dump(
by_alias=True,
mode="json",
context=CryptoContext(client=self, stack=[key]),
)

resp = self._api_request("POST", path, json=data)
return CipherDetail.validate_json(
resp.text, context=CryptoContext(client=self)
)
Loading
Loading