diff --git a/src/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py index 3650b11..8929702 100644 --- a/src/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -1,19 +1,29 @@ +import typing from typing import Literal from uuid import UUID from httpx import Client, Response +from vaultwarden.models.bitwarden import CipherDetail, RegisterData +from vaultwarden.models.crypto import CryptoContext from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.sync import ConnectToken, SyncData -from vaultwarden.utils.crypto import make_master_key from vaultwarden.utils.logger import log_raise_for_status +if typing.TYPE_CHECKING: + from vaultwarden.models.bitwarden import ( + CipherDetails, + Kdf, + Organization, + OrganizationCollection, + ) + class BitwardenAPIClient: def __init__( self, url: str, - email: str, + email: str | None, password: str, client_id: str, client_secret: str, @@ -23,7 +33,7 @@ def __init__( # if one of the parameters is None, raise an exception if not all([url, password, client_id, client_secret, device_id]): raise BitwardenError("All parameters are required") - self.email = email + self.email: str | None = email self.password = password self.client_id = client_id self.client_secret = client_secret @@ -53,34 +63,15 @@ def _refresh_connect_token(self): or self.connect_token.refresh_token is None ): self._set_connect_token() - return - headers = { - "content-type": "application/x-www-form-urlencoded; charset=utf-8", - } - payload = { - "grant_type": "refresh_token", - "refresh_token": self.connect_token.refresh_token, - } - resp = self._http_client.post( - "identity/connect/token", headers=headers, data=payload - ) - self._connect_token = ConnectToken.model_validate_json(resp.text) - - import vaultwarden.models.bitwarden - - self._connect_token.master_key = make_master_key( - password=self.password, - salt=self.email, - kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( - self._connect_token - ), - ) + else: + payload = { + "grant_type": "refresh_token", + "refresh_token": self.connect_token.refresh_token, + } + self._set_connect_token(payload) - def _set_connect_token(self): - headers = { - "content-type": "application/x-www-form-urlencoded; charset=utf-8", - } - payload = { + def _set_connect_token(self, refresh: dict | None = None): + payload = refresh or { "grant_type": "client_credentials", "client_secret": f"{self.client_secret}", "client_id": f"{self.client_id}", @@ -90,31 +81,28 @@ def _set_connect_token(self): "deviceIdentifier": f"{self.device_id}", "deviceName": "python-vaultwarden", } + headers = { + "content-type": "application/x-www-form-urlencoded; charset=utf-8", + } resp = self._http_client.post( "identity/connect/token", headers=headers, data=payload ) - self._connect_token = ConnectToken.model_validate_json(resp.text) - if self.email is None: + access_token = resp.json()["access_token"] headers = { - "Authorization": f"Bearer {self._connect_token.access_token}", + "Authorization": f"Bearer {access_token}", "content-type": "application/json; charset=utf-8", "Accept": "*/*", } - resp = self._http_client.get( + mresp = self._http_client.get( "api/accounts/profile", headers=headers ) - self.email = resp.json()["email"] + self.email = mresp.json()["email"] - import vaultwarden.models.bitwarden - - self._connect_token.master_key = make_master_key( - password=self.password, - salt=self.email, - kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( - self._connect_token - ), + self._connect_token = ConnectToken.model_validate_json( + resp.text, context=CryptoContext(client=self) ) + return # login to api @@ -145,9 +133,12 @@ def _api_request( raise BitwardenError("Fail to connect") headers = { "Authorization": f"Bearer {self.connect_token.access_token}", - "content-type": "application/json; charset=utf-8", "Accept": "*/*", } + + if kwargs.get("json") is not None: + headers["content-type"] = "application/json; charset=utf-8" + return self._http_client.request( method, path, headers=headers, **kwargs ) @@ -155,5 +146,99 @@ def _api_request( def sync(self, force_refresh: bool = False) -> SyncData: if self._sync is None or force_refresh: resp = self._api_request("GET", "api/sync") - self._sync = SyncData.model_validate_json(resp.text) + return self._sync_step(resp.json()) + return self._sync + + def _sync_step(self, data: dict) -> SyncData: + v: dict[str, typing.Any] = { + "profile": data.get("profile") or data.get("Profile"), + "ciphers": [], + "collections": [], + "folders": [], + "policies": [], + "sends": [], + "domains": {}, + } + # populate self._sync.Profile + self._sync = SyncData.model_validate( + v, context=CryptoContext(client=self) + ) + # uses self._sync.Profile + self._sync = SyncData.model_validate( + data, + context=CryptoContext(client=self), + ) return self._sync + + # def create_organization(self, name, email=None) -> "Organization": + # pass + + # def get_organization(self, name) -> "Organization": + # pass + + def create_user( + self, + email: str, + password: str, + name, + kdf: "Kdf", + ): + assert email == email.lower(), "email is not lowercase" + assert len(password) >= 8, "password is too short (< 8 characters)" + + rd = RegisterData.model_construct( + email=email, + password=password, + name=name, + **kdf.model_dump(by_alias=True), + ) + data = rd.model_dump( + by_alias=True, + exclude_none=True, + exclude_unset=True, + context=CryptoContext(client=self), + ) + resp = self._api_request("POST", "api/accounts/register", json=data) + return resp.json() + + def search_item(self, name): + for i in self._sync.Ciphers: + if i.uri_match(name): + yield i + + def create_item( + self, + item: "CipherDetails", + organization: typing.Optional["Organization"], + collections: list["OrganizationCollection"] | None, + ) -> "CipherDetails": + if organization: + assert organization and ( + collections is not None and len(collections) + ), (organization, collections) + path = "api/ciphers/admin" + key = organization.key() + item.OrganizationId = organization.Id + data = { + "type": item.Type, + "cipher": item.model_dump( + by_alias=True, + mode="json", + context=CryptoContext(client=self, stack=[key]), + ), + "collectionIds": [str(i.Id) for i in collections], + } + else: + path = "api/ciphers" + assert self.connect_token is not None + key = self.connect_token.Key + data = item.model_dump( + by_alias=True, + mode="json", + context=CryptoContext(client=self, stack=[key]), + ) + + resp = self._api_request("POST", path, json=data) + return CipherDetail.validate_json( + resp.text, context=CryptoContext(client=self) + ) diff --git a/src/vaultwarden/models/bitwarden.py b/src/vaultwarden/models/bitwarden.py index 8e81b68..a3de716 100644 --- a/src/vaultwarden/models/bitwarden.py +++ b/src/vaultwarden/models/bitwarden.py @@ -1,6 +1,13 @@ -import dataclasses +import base64 import datetime +from enum import IntEnum +from functools import cached_property +import hashlib +import io +from pathlib import Path +from secrets import token_bytes import sys +import typing from typing import ( TYPE_CHECKING, Annotated, @@ -13,32 +20,46 @@ ) from uuid import UUID +from Crypto.PublicKey import RSA from pydantic import ( AliasChoices, Field, ModelWrapValidatorHandler, + PrivateAttr, TypeAdapter, - WrapValidator, + computed_field, field_validator, + model_serializer, model_validator, ) from pydantic_core.core_schema import ( FieldValidationInfo, + SerializationInfo, + SerializerFunctionWrapHandler, ValidationInfo, - ValidatorFunctionWrapHandler, ) -from typing_extensions import Self -from vaultwarden.clients.bitwarden import BitwardenAPIClient +from vaultwarden.models.crypto import ( + CryptoContext, + SecretBytes, + SecretKey, + SecretString, +) from vaultwarden.models.enum import CipherType, KdfType, OrganizationUserType from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.permissive_model import PermissiveBaseModel -from vaultwarden.utils.crypto import decrypt, encrypt +from vaultwarden.utils.crypto import ( + BinarySymmetricCipher, + SymmetricCipher, + make_master_key, + stretch_key, +) if TYPE_CHECKING: - import vaultwarden.clients.bitwarden + from vaultwarden.clients.bitwarden import BitwardenAPIClient + from vaultwarden.models.sync import ProfileOrganization -if sys.version_info < (3, 12): +if sys.version_info < (3, 11): from typing_extensions import Self else: from typing import Self @@ -49,117 +70,187 @@ T = TypeVar("T", bound="BitwardenBaseModel") +def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Any], + info: ValidationInfo, +) -> Any: + key: str + ctx: CryptoContext = cast(CryptoContext, info.context) + if (key := (data.get("key") or data.get("Key"))) is not None: + assert isinstance(ctx.stack[-1], bytes) + v = SymmetricCipher.decode(key, ctx.stack[-1]) + ctx.push(v) + + r = handler(data) + + if key is not None: + ctx.pop() + + return r + + +def ser_set_key( + slf: Any, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> Any: + key: bytes | None + if (key := slf.Key) is not None: + ctx: CryptoContext = cast(CryptoContext, info.context) + ctx.push(key) + + v = handler(slf) + + if key is not None: + ctx.pop() + + return v + + class ResplistBitwarden(PermissiveBaseModel, Generic[T]): Data: list[T] class BitwardenBaseModel(PermissiveBaseModel): - bitwarden_client: BitwardenAPIClient | None = Field( - default=None, validate_default=True, exclude=True - ) + _bitwarden_client: Any = PrivateAttr(default=None) - @field_validator("bitwarden_client") + @model_validator(mode="wrap") @classmethod - def set_client(cls, v, info: FieldValidationInfo): - if v is None and info.context is not None: - return info.context.get("client") + def val_set_client( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + v = handler(data) + v._bitwarden_client = ctx.client return v @property - def api_client(self) -> BitwardenAPIClient: - assert self.bitwarden_client is not None - return self.bitwarden_client - - -def decode_bytes( - value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo -) -> bytes: - context: dict = cast("dict", info.context) - keys: list[bytes] = cast("list[bytes]", context.get("cctx")) - for key in keys[::-1]: - try: - return decrypt(handler(value), key) - except Exception: - continue - raise ValueError("No key found") + def api_client(self) -> "BitwardenAPIClient": + assert self._bitwarden_client is not None + return self._bitwarden_client -def decode_string( - value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo -) -> str: - return decode_bytes(value, handler, info=info).decode("utf-8") +class UriMatchDetection(IntEnum): + BASEDOMAIN = 0 + HOST = 1 + STARTSWITH = 2 + EXACT = 3 + RE = 4 + NEVER = 5 class UriMatch(BitwardenBaseModel): class Config: extra = "forbid" - match: int | None = None - uri: Annotated[str, WrapValidator(decode_string)] | None = None - uriChecksum: Annotated[str, WrapValidator(decode_string)] | None = None + match: UriMatchDetection | None = None + uri: SecretString | None = None + uriChecksum: SecretString | None = None response: str | None = None + def uri_match(self, name: str) -> bool: + import re + import urllib.parse + + if self.uri is None: + return False + m = self.match if self.match is not None else UriMatchDetection.HOST + match m: + case UriMatchDetection.BASEDOMAIN: + url = urllib.parse.urlparse(name) + if url.hostname is None: + return False + basename = ".".join(url.hostname.split(".")[1:]) + hostname = urllib.parse.urlparse(self.uri).hostname + return hostname == basename + case UriMatchDetection.HOST: + url = urllib.parse.urlparse(self.uri) + hostname = urllib.parse.urlparse(name).hostname + return hostname == url.hostname + case UriMatchDetection.STARTSWITH: + return name.startswith(self.uri) + case UriMatchDetection.EXACT: + return name == self.uri + case UriMatchDetection.RE: + return re.match(self.uri, name) is not None + case UriMatchDetection.NEVER: + return False + class XField(BitwardenBaseModel): class Config: extra = "forbid" - name: Annotated[str, WrapValidator(decode_string)] | None = None - response: Annotated[str, WrapValidator(decode_string)] | None = None + name: SecretString | None = None + response: SecretString | None = None type: int - value: Annotated[str, WrapValidator(decode_string)] | None = None + value: SecretString | None = None linkedId: str | None = None +class PasswordChange(BitwardenBaseModel): + class Config: + extra = "forbid" + + lastUsedDate: datetime.datetime + password: SecretString + + class CipherLogin(BitwardenBaseModel): class Config: extra = "forbid" - name: Annotated[str, WrapValidator(decode_string)] | None = None + Name: SecretString | None = None autofillOnPageLoad: bool | None = None - password: Annotated[str, WrapValidator(decode_string)] | None = None + password: SecretString | None = None passwordRevisionDate: datetime.datetime | None = None totp: str | None = None - uri: Annotated[str, WrapValidator(decode_string)] | None = None - uris: list[UriMatch] | None = None - username: Annotated[str, WrapValidator(decode_string)] | None = None - notes: Annotated[str, WrapValidator(decode_string)] | None = None + Uri: SecretString | None = None + Uris: list[UriMatch] | None = None + username: SecretString | None = None + Notes: SecretString | None = None + Fields: list[XField] | None = None + PasswordHistory: list[PasswordChange] | None = None -class PasswordChange(BitwardenBaseModel): - class Config: - extra = "forbid" + def uri_match(self, name: str) -> bool: + if self.Uri and self.Uri == name: + return True - lastUsedDate: datetime.datetime - password: str + if self.Uris: + for um in self.Uris: + if um.uri_match(name): + return True + return False class Fido2Credential(BitwardenBaseModel): class Config: extra = "forbid" - counter: Annotated[str, WrapValidator(decode_string)] | None = None + counter: SecretString | None = None creationDate: datetime.datetime | None = None - credentialId: Annotated[str, WrapValidator(decode_string)] | None = None - discoverable: Annotated[str, WrapValidator(decode_string)] | None = None - keyAlgorithm: Annotated[str, WrapValidator(decode_string)] | None = None - keyCurve: Annotated[str, WrapValidator(decode_string)] | None = None - keyType: Annotated[str, WrapValidator(decode_string)] | None = None - keyValue: Annotated[str, WrapValidator(decode_string)] | None = None + credentialId: SecretString | None = None + discoverable: SecretString | None = None + keyAlgorithm: SecretString | None = None + keyCurve: SecretString | None = None + keyType: SecretString | None = None + keyValue: SecretString | None = None response: str | None = None - rpId: Annotated[str, WrapValidator(decode_string)] | None = None - rpName: Annotated[str, WrapValidator(decode_string)] | None = None - userDisplayName: Annotated[str, WrapValidator(decode_string)] | None = None - userHandle: Annotated[str, WrapValidator(decode_string)] | None = None - userName: Annotated[str, WrapValidator(decode_string)] | None = None + rpId: SecretString | None = None + rpName: SecretString | None = None + userDisplayName: SecretString | None = None + userHandle: SecretString | None = None + userName: SecretString | None = None class LoginData(CipherLogin): class Config: extra = "forbid" - fields: list[XField] | None = None - passwordHistory: list[PasswordChange] | None = None response: str | None = None fido2Credentials: list[Fido2Credential] | None = None @@ -168,8 +259,6 @@ class SecureNoteData(CipherLogin): class Config: extra = "forbid" - fields: list[XField] - passwordHistory: list[PasswordChange] response: str | None = None type: int | None = None @@ -178,28 +267,36 @@ class SecureNoteProperty(BitwardenBaseModel): class Config: extra = "forbid" - name: Annotated[str, WrapValidator(decode_string)] | None = None - notes: Annotated[str, WrapValidator(decode_string)] | None = None - fields: list[XField] | None = None - passwordHistory: list[PasswordChange] | None = None - response: Annotated[str, WrapValidator(decode_string)] | None = None + response: SecretString | None = None type: int +class AttachmentRequest(BitwardenBaseModel): + class Config: + extra = "forbid" + + Key: SecretBytes + fileName: SecretString + fileSize: int + adminRequest: bool | None = None + + class Attachment(BitwardenBaseModel): class Config: extra = "forbid" - fileName: Annotated[str, WrapValidator(decode_string)] | None = None + Key: SecretBytes + fileName: SecretString | None = None id: str - key: str | None = ( - None # Annotated[str, WrapValidator(decodeBytes)]|None = None - ) - object: str + Object: str size: int sizeName: str url: str + def download(self): + v = self._bitwarden_client._http_client.get(self.url) + return BinarySymmetricCipher.decode(v.content, self.key) + class _CipherBase(BitwardenBaseModel): class Config: @@ -208,49 +305,78 @@ class Config: Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) Type: CipherType - Name: Annotated[str, WrapValidator(decode_string)] + Name: SecretString CollectionIds: list[UUID] - key: str | None = None + Key: SecretKey | None = None - organizationUseTotp: bool | None = None - creationDate: datetime.datetime | None = None - deletedDate: datetime.datetime | None = None - fields: list[XField] | None = None + OrganizationUseTotp: bool | None = None + CreationDate: datetime.datetime | None = None + DeletedDate: datetime.datetime | None = None + Fields: list[XField] | None = None + + Notes: SecretString | None = None + Reprompt: int | None = None + RevisionDate: str | None = None + sshKey: str | None = None + PasswordHistory: list[PasswordChange] | None = None + Object: str | None = None + Attachments: list[Attachment] | None = None + + Edit: bool | None = None + Favorite: bool | None = None + FolderId: UUID | None = None + Permissions: Any | None = None + ViewPassword: bool | None = None - notes: Annotated[str, WrapValidator(decode_string)] | None = None - reprompt: int - revisionDate: str - sshKey: str | None - passwordHistory: list[PasswordChange] - object: str | None = None - attachments: list[Attachment] | None = None + Data: CipherLogin | None = None @model_validator(mode="wrap") @classmethod - def set_key( + def val_set_key( cls, data: Any, handler: ModelWrapValidatorHandler[Self], info: ValidationInfo, ) -> Self: - if (key := data.get("key")) is not None: - context = cast("dict", info.context) - cctx = cast("list[bytes]", context.get("cctx")) - - cctx.append(decrypt(key, cctx[0])) + assert isinstance(info.context, CryptoContext) + + ctx: CryptoContext = cast(CryptoContext, info.context) + + assert ctx.client._sync and ctx.client._sync.Profile + + if ( + o := data.get("organizationId") or data.get("OrganizationId") + ) is not None: + oid = UUID(o) + org: typing.Optional["ProfileOrganization"] = None + for org in ctx.client._sync.Profile.Organizations: + if oid == org.Id: + assert org.Key + ctx.push(org.Key) + break + else: + raise ValueError(f"No organization found {oid}") + else: + assert ctx.client._connect_token + ctx.push(ctx.client._connect_token.Key) + r = val_set_key(cls, data, handler, info) - v = handler(data) + ctx.pop() - if key is not None: - cctx.pop() + return r - return v + @model_serializer(mode="wrap") + def ser_set_key( + self, handler: SerializerFunctionWrapHandler, info: SerializationInfo + ) -> Any: + return ser_set_key(self, handler, info) @field_validator("OrganizationId") @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def add_collections(self, collections: list[UUID]): @@ -289,55 +415,98 @@ def update_collection(self, collections: list[UUID]): json={"collectionIds": dump}, ) + def attach(self, path: Path): + with path.open("rb") as f: + self._attach(path.name, f) + + def _attach(self, name: str, file: io.IOBase): + "/api/ciphers/fc246fe5-9177-455b-b318-c00fab407dc8/attachment/v2" + key = token_bytes(64) + ed = BinarySymmetricCipher.encode(file.read(), key) + ar = AttachmentRequest.model_construct( + Key=key, fileName=name, fileSize=len(ed), adminRequest=True + ) + if self.OrganizationId: + stack = [ + get_organization( + self._bitwarden_client, self.OrganizationId + ).key() + ] + else: + stack = [self._bitwarden_client._connect_token._masterKey] + ard = ar.model_dump( + context=CryptoContext(client=self._bitwarden_client, stack=stack) + ) + v = self._bitwarden_client._api_request( + "POST", f"api/ciphers/{self.Id}/attachment/v2", json=ard + ).json() + self._bitwarden_client._api_request( + "POST", + "api" + v["url"], + files={ + "data": ( + ard["fileName"], + io.BytesIO(ed), + "application/octet-stream", + ) + }, + ) + + def uri_match(self, name: str) -> bool: + assert self.Data + return self.Data.uri_match(name) + class Login(_CipherBase): - Type: Literal[CipherType.Login] + Type: Literal[CipherType.Login] = CipherType.Login - login: LoginData | None = None - secureNote: None = None - card: None = None - identity: None = None + Login: LoginData | None = None + SecureNote: None = None + Card: None = None + Identity: None = None - data: LoginData | None = None + Data: LoginData | None = None class SecureNote(_CipherBase): Type: Literal[CipherType.SecureNote] - login: None = None - secureNote: SecureNoteProperty | None = None - card: None = None - identity: None = None + Login: None = None + SecureNote: SecureNoteProperty | None = None + Card: None = None + Identity: None = None - data: SecureNoteData | None = None + Data: SecureNoteData | None = None class Card(_CipherBase): Type: Literal[CipherType.Card] - login: None = None - card: None = None - secureNote: None = None - identity: None = None + Login: None = None + Card: None = None + SecureNote: None = None + Identity: None = None - data: None = None + Data: None = None class Identity(_CipherBase): Type: Literal[CipherType.Identity] - login: None = None - secureNote: None = None - card: None = None - identity: None = None + Login: None = None + SecureNote: None = None + Card: None = None + Identity: None = None - data: None = None + Data: None = None CipherDetails = Annotated[ Union[Login, SecureNote, Card, Identity], Field(discriminator="Type") ] +CipherDetail: TypeAdapter[CipherDetails] = TypeAdapter(CipherDetails) + class CollectionAccess(BitwardenBaseModel): ReadOnly: bool = False @@ -357,7 +526,8 @@ class CollectionUser(CollectionAccess): @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v @@ -373,21 +543,23 @@ class UserCollection(CollectionAccess): @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v class OrganizationCollection(BitwardenBaseModel): Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) - Name: str + Name: SecretString ExternalId: str | None = None @field_validator("OrganizationId") @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def users(self) -> list[CollectionUser]: @@ -398,7 +570,7 @@ def users(self) -> list[CollectionUser]: ) return TypeAdapter(list[CollectionUser]).validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) def set_users( @@ -462,7 +634,8 @@ class OrganizationUserDetails(BitwardenBaseModel): @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def add_collections(self, collections: list[UUID]): @@ -470,14 +643,14 @@ def add_collections(self, collections: list[UUID]): for collection in collections: if collection in _current_collections: continue - user = UserCollection( + user = UserCollection.model_construct( CollectionId=collection, UserId=self.Id, ReadOnly=False, HidePasswords=False, Manage=False, ) - user.bitwarden_client = self.api_client + user._bitwarden_client = self.api_client self.Collections.append(user) pl = self.model_dump( include={ @@ -598,7 +771,8 @@ class Organization(BitwardenBaseModel): @classmethod def set_id(cls, v, info: FieldValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def rename(self, new_name: str): @@ -684,10 +858,9 @@ def _get_users(self) -> list[OrganizationUserDetails]: ResplistBitwarden[OrganizationUserDetails] .model_validate_json( resp.text, - context={ - "parent_id": self.Id, - "client": self.api_client, - }, + context=CryptoContext( + client=self.api_client, parent_id=self.Id + ), ) .Data ) @@ -720,7 +893,7 @@ def user(self, user_id: UUID) -> OrganizationUserDetails: ) return OrganizationUserDetails.model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) def user_search( @@ -740,12 +913,10 @@ def _get_collections(self) -> list[OrganizationCollection]: ) res = ResplistBitwarden[OrganizationCollection].model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext( + client=self.api_client, parent_id=self.Id, stack=[self.key()] + ), ) - org_key = self.key() - # map each collection name to the decrypted name - for collection in res.Data: - collection.Name = decrypt(collection.Name, org_key).decode("utf-8") return res.Data def collections( @@ -760,7 +931,7 @@ def collections( def create_collection(self, name: str) -> OrganizationCollection: org_key = self.key() data = { - "name": encrypt(2, name, self.key()), + "name": SymmetricCipher.encode(name.encode("utf-8"), org_key), "groups": [], "users": [], } @@ -769,9 +940,10 @@ def create_collection(self, name: str) -> OrganizationCollection: ) res = OrganizationCollection.model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext( + client=self.api_client, parent_id=self.Id, stack=[org_key] + ), ) - res.Name = decrypt(res.Name, org_key).decode("utf-8") if self._collections is not None: self._collections.append(res) else: @@ -801,14 +973,9 @@ def _get_ciphers(self) -> list[CipherDetails]: "api/ciphers/organization-details", params={"organizationId": self.Id}, ) - org_key = self.key() res = ResplistBitwarden[CipherDetails].model_validate_json( resp.text, - context={ - "parent_id": self.Id, - "client": self.api_client, - "cctx": [org_key], # crypto context - }, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) return res.Data @@ -831,42 +998,135 @@ def ciphers( ] return self._ciphers - def key(self): + def key(self) -> bytes: sync = self.api_client.sync() for org in sync.Profile.Organizations: if org.Id == self.Id: break else: raise BitwardenError(f"No Organizations `{self.Id}` found") - return decrypt(org.Key, self.api_client.connect_token.orgs_key) + assert org and org.Key + return org.Key def get_organization( - bitwarden_client, organisation_id: UUID | str + bitwarden_client: "BitwardenAPIClient", organisation_id: UUID | str ) -> Organization: + oid = ( + UUID(organisation_id) + if isinstance(organisation_id, str) + else organisation_id + ) + + if bitwarden_client._sync is not None: + for org in bitwarden_client._sync.Profile.Organizations: + if org.Id == oid: + r = Organization.model_construct( + Id=org.Id, Name=org.Name, BillingEmail="", Object="" + ) + r._bitwarden_client = bitwarden_client + return r + resp = bitwarden_client.api_request( "GET", f"api/organizations/{organisation_id}" ) return Organization.model_validate_json( resp.text, - context={"client": bitwarden_client, "parent_id": organisation_id}, + context=CryptoContext(client=bitwarden_client, parent_id=oid), ) -@dataclasses.dataclass -class Kdf: - Kdf: KdfType +class Kdf(PermissiveBaseModel): + Kdf: int KdfIterations: int | None = None KdfMemory: int | None = None KdfParallelism: int | None = None @classmethod - def from_connect_token( - cls, token: "vaultwarden.clients.bitwarden.ConnectToken" - ): - return cls( - token.Kdf, - token.KdfIterations, - token.KdfMemory, - token.KdfParallelism, + def argon2id(cls): + return cls.model_construct( + Kdf=KdfType.Argon2id, + KdfMemory=32, + KdfIterations=6, + KdfParallelism=4, ) + + +class KeysData(BitwardenBaseModel): + encryptedPrivateKey: str + publicKey: str + + +class RegisterData(BitwardenBaseModel): + """ + c.f. https://bitwarden.com/help/bitwarden-security-white-paper/ + """ + + class Config: + extra = "forbid" + arbitrary_types_allowed = True + + email: str + password: str = Field(exclude=True) + + name: str + Kdf: int + # key: str + + KdfIterations: int | None = None + KdfMemory: int | None = None + KdfParallelism: int | None = None + + # keys: KeysData | None = None + + masterPasswordHint: str | None = None + + @computed_field # type: ignore[prop-decorator] + @property + def masterPasswordHash(self) -> str: # noqa: N802 + v = hashlib.pbkdf2_hmac( + "sha256", self._masterKey, self.password.encode(), 1 + ) + return base64.b64encode(v).decode() + + @computed_field # type: ignore[prop-decorator] + @property + def key(self) -> str: + return SymmetricCipher.encode(self._rawKey, self._masterKey) + + @computed_field # type: ignore[prop-decorator] + @property + def keys(self) -> KeysData: + return KeysData.model_construct( + encryptedPrivateKey=SymmetricCipher.encode( + self._rawKeys.exportKey("DER", pkcs=8), self._rawKey + ), + publicKey=base64.b64encode( + self._rawKeys.publickey().exportKey("DER") + ).decode(), + ) + + @cached_property + def _masterKey(self) -> bytes: # noqa: N802 + return make_master_key( + self.password, + self.email, + Kdf.model_construct( + Kdf=self.Kdf, + KdfIterations=self.KdfIterations, + KdfMemory=self.KdfMemory, + KdfParallelism=self.KdfParallelism, + ), + ) + + @cached_property + def _stretchedKey(self) -> bytes: # noqa: N802 + return stretch_key(self._masterKey) + + @cached_property + def _rawKey(self) -> bytes: # noqa: N802 + return token_bytes(64) + + @cached_property + def _rawKeys(self) -> RSA.RsaKey: # noqa: N802 + return RSA.generate(2048) diff --git a/src/vaultwarden/models/crypto.py b/src/vaultwarden/models/crypto.py new file mode 100644 index 0000000..a4803b7 --- /dev/null +++ b/src/vaultwarden/models/crypto.py @@ -0,0 +1,157 @@ +import dataclasses +import typing +from typing import Any, TypeAlias, cast +from uuid import UUID + +from Crypto.PublicKey import RSA +from pydantic import ( + SerializationInfo, + SerializerFunctionWrapHandler, + ValidationInfo, + ValidatorFunctionWrapHandler, + WrapSerializer, + WrapValidator, +) +from typing_extensions import Annotated + +from vaultwarden.utils.crypto import AsymmetricCipher, SymmetricCipher + +if typing.TYPE_CHECKING: + from vaultwarden.clients.bitwarden import BitwardenAPIClient + + +def decode_string( + value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> str: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-1])) + + +def encode_string( + value: str, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> str: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.encode(value.encode(), ctx.stack[-1])) + + +SecretString = Annotated[ + str, WrapValidator(decode_string), WrapSerializer(encode_string) +] +""" +Symmetric encoded string value +""" + + +def decode_bytes( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-1])) + + +def encode_bytes( + value: Any, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.encode(value, ctx.stack[-1])) + + +SecretBytes = Annotated[ + bytes, WrapValidator(decode_bytes), WrapSerializer(encode_bytes) +] +""" +Symmetric encoded bytes value +""" + + +def decode_rsa( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> RSA.RsaKey: + ctx = cast(CryptoContext, info.context) + return handler(RSA.importKey(SymmetricCipher.decode(value, ctx.stack[-1]))) + + +def encode_rsa( + value: RSA.RsaKey, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler( + SymmetricCipher.encode(value.exportKey("DER", pkcs=8), ctx.stack[-1]) + ) + + +SecretRSA = Annotated[ + RSA.RsaKey, WrapValidator(decode_rsa), WrapSerializer(encode_rsa) +] +""" +Symmetric encoded RSA key +""" + + +def decode_org_key( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(AsymmetricCipher.decode(value, ctx.stack[-1])) + + +def encode_org_key( + value: bytes, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> str: + ctx = cast(CryptoContext, info.context) + return handler(AsymmetricCipher.encode(value, ctx.stack[-1])) + + +SecretOrganizationKey = Annotated[ + bytes, WrapValidator(decode_org_key), WrapSerializer(encode_org_key) +] +""" +Asymmetric encoded Key + +* key is not added to cctx +* encoding uses the seconds last key in cctx +""" + + +def decode_key( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-2])) + + +def encode_key( + value: Any, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.encode(value, ctx.stack[-2])) + + +SecretKey = Annotated[ + bytes, WrapValidator(decode_key), WrapSerializer(encode_key) +] +""" +Symmetric encoded Key + +* the Key is added to cctx by ser_set_key / val_set_key of the model +* en/decoding uses the [-2] key in cctx +""" + +CryptoKey: TypeAlias = RSA.RsaKey | bytes + + +@dataclasses.dataclass +class CryptoContext: + client: "BitwardenAPIClient" + parent_id: UUID | None = None + stack: list[CryptoKey] = dataclasses.field(default_factory=list) + + def push(self, v: CryptoKey) -> None: + return self.stack.append(v) + + def pop(self) -> CryptoKey: + return self.stack.pop() diff --git a/src/vaultwarden/models/sync.py b/src/vaultwarden/models/sync.py index 3044ce2..551782a 100644 --- a/src/vaultwarden/models/sync.py +++ b/src/vaultwarden/models/sync.py @@ -1,11 +1,32 @@ +import sys import time +from typing import Any, cast from uuid import UUID -from pydantic import AliasChoices, Field, field_validator - +from pydantic import ( + AliasChoices, + Field, + ModelWrapValidatorHandler, + PrivateAttr, + ValidationInfo, + field_validator, + model_validator, +) + +from vaultwarden.models.bitwarden import Login, val_set_key +from vaultwarden.models.crypto import ( + CryptoContext, + SecretKey, + SecretOrganizationKey, + SecretRSA, +) from vaultwarden.models.enum import KdfType, VaultwardenUserStatus from vaultwarden.models.permissive_model import PermissiveBaseModel -from vaultwarden.utils.crypto import decrypt + +if sys.version_info < (3, 11): + from typing_extensions import Self +else: + from typing import Self class ConnectToken(PermissiveBaseModel): @@ -13,8 +34,8 @@ class ConnectToken(PermissiveBaseModel): KdfIterations: int = 0 KdfMemory: int | None = None KdfParallelism: int | None = None - Key: str - PrivateKey: str + Key: SecretKey + PrivateKey: SecretRSA access_token: str refresh_token: str | None = None expires_in: int @@ -23,7 +44,7 @@ class ConnectToken(PermissiveBaseModel): unofficialServer: bool = False ResetMasterPassword: bool | None = None - master_key: bytes | None = None # pydantic.PrivateAttr(default=None) + _master_key: bytes | None = PrivateAttr(default=None) @field_validator("expires_in") @classmethod @@ -35,19 +56,39 @@ def is_expired(self, now=None): now = time.time() return (self.expires_in is not None) and (self.expires_in <= now) - @property - def user_key(self): - return decrypt(self.Key, self.master_key) - - @property - def orgs_key(self): - return decrypt(self.PrivateKey, self.user_key) + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + from vaultwarden.models.bitwarden import Kdf + from vaultwarden.models.crypto import CryptoContext + from vaultwarden.utils.crypto import make_master_key + + assert info and info.context + + ctx = cast(CryptoContext, info.context) + assert ctx.client.email is not None + + master_key = make_master_key( + password=ctx.client.password, + salt=ctx.client.email, + kdf=Kdf.model_validate(data), + ) + ctx.push(master_key) + v = val_set_key(cls, data, handler, info) + ctx.pop() # master_key + v._master_key = master_key + return v class ProfileOrganization(PermissiveBaseModel): Id: UUID Name: str - Key: str | None = None + Key: SecretOrganizationKey | None = None ProviderId: str | None = None ProviderName: str | None = None ResetPasswordEnrolled: bool @@ -67,20 +108,20 @@ class ProfileOrganization(PermissiveBaseModel): UseTotp: bool -class UserProfile(PermissiveBaseModel): +class _UserProfile(PermissiveBaseModel): AvatarColor: str | None Culture: str Email: str EmailVerified: bool ForcePasswordReset: bool Id: UUID - Key: str + Key: SecretKey MasterPasswordHint: str | None = None Name: str | None Object: str | None + PrivateKey: SecretRSA | None Organizations: list[ProfileOrganization] Premium: bool - PrivateKey: str | None ProviderOrganizations: list Providers: list SecurityStamp: str @@ -92,18 +133,75 @@ class UserProfile(PermissiveBaseModel): ) -class VaultwardenUser(UserProfile): +class UserProfile(_UserProfile): + @field_validator("Organizations", mode="wrap") + @classmethod + def val_field_Organizations( # noqa: N802 + cls, + v: str, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + if ( + key := info.data.get("PrivateKey") or info.data.get("privateKey") + ) is not None: + ctx.push(key) + r = handler(v) + if key: + ctx.pop() + return r + + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + return val_set_key(cls, data, handler, info) + + +class VaultwardenOrganization(ProfileOrganization): + # overwrite + Key: str # type: ignore + + +class VaultwardenUser(_UserProfile): UserEnabled: bool CreatedAt: str LastActive: str | None = None + # overwrite + Key: str # type: ignore + PrivateKey: str | None # type: ignore + Organizations: list[VaultwardenOrganization] # type: ignore + -# TODO: add definition of attribute's types class SyncData(PermissiveBaseModel): - Ciphers: list[dict] + Profile: UserProfile + Ciphers: list[Login] Collections: list[dict] Domains: dict | None Folders: list[dict] Policies: list[dict] - Profile: UserProfile Sends: list[dict] + + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + + assert ( + ctx.client._connect_token and ctx.client._connect_token._master_key + ) + ctx.push(ctx.client._connect_token._master_key) + r = handler(data) + ctx.pop() + return r diff --git a/src/vaultwarden/utils/crypto.py b/src/vaultwarden/utils/crypto.py index 7b47841..04b0fdc 100644 --- a/src/vaultwarden/utils/crypto.py +++ b/src/vaultwarden/utils/crypto.py @@ -2,13 +2,13 @@ # -*- coding: utf-8 -*- # Original source: # https://github.com/corpusops/bitwardentools/blob/main/src/bitwardentools/crypto.py -from __future__ import absolute_import, division, print_function import base64 import hashlib import re import secrets import string +import sys from base64 import b64decode, b64encode from enum import IntEnum from hashlib import pbkdf2_hmac, sha256 @@ -19,106 +19,218 @@ from Crypto.Cipher import AES, PKCS1_OAEP from Crypto.PublicKey import RSA from hkdf import hkdf_expand +from typing_extensions import override + +if sys.version_info < (3, 11): + from typing_extensions import Self +else: + from typing import Self + if typing.TYPE_CHECKING: import vaultwarden.models.bitwarden + + class CIPHERS(IntEnum): + null = 0 sym = 2 asym = 4 CACHE = {} # type: ignore -ITERATIONS = 2000000 -ENCODED_CIPHER = { - CIPHERS.sym: "{typ}.{b64_iv}|{b64_ct}|{b64_digest}", - CIPHERS.asym: "{typ}.{b64_ct}", -} ENCRYPTED_STRING_RE = re.compile("^[0-9][.].*=.*", flags=re.I | re.M) SYM_ENCRYPTED_STRING_RE = re.compile( "^2[.][^=]+=+[|][^=]+=+[|][^=]+=+", flags=re.I | re.M ) +class _Cipher: + TYPE: int + ENCODING: str + @classmethod + def encode(cls, plainbytes:bytes, key:bytes) -> str: + raise NotImplementedError() + + @classmethod + def decode(cls, data, key) -> bytes: + raise NotImplementedError() + + def _decrypt(self, data:bytes, key: bytes) -> bytes: + raise NotImplementedError() + +class AsymmetricCipher(_Cipher): + TYPE = CIPHERS.asym + ENCODING = "{typ}.{b64_ct}" + @classmethod + def _parse(cls, ct:str) -> tuple[Self, bytes]: + return cls(), b64decode(ct) + + def _decrypt(self, ct:bytes, key: RSA.RsaKey) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, RSA.RsaKey) + return PKCS1_OAEP.new(key).decrypt(ct) + + @classmethod + def encode(cls, plainbytes: bytes, key: RSA.RsaKey): + assert isinstance(plainbytes, bytes) + assert isinstance(key, RSA.RsaKey) + cipher = PKCS1_OAEP.new(key).encrypt(plainbytes) + b64_ct = b64encode(cipher).decode() + return cls.ENCODING.format(cipher=cipher, b64_ct=b64_ct) + + @classmethod + def decode(cls, data: str, key: RSA.RsaKey) -> bytes: + assert int(data[0]) == AsymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + +class SymmetricCipher(_Cipher): + TYPE = CIPHERS.sym + ENCODING = "{typ}.{b64_iv}|{b64_ct}|{b64_digest}" + def __init__(self, iv:bytes, mac:bytes): + self._iv = iv + self._mac = mac + + @classmethod + def _parse(cls, ct: str) -> tuple[Self, bytes]: + iv, ct, mac = ct.split("|", 3) + return cls(b64decode(iv), b64decode(mac)[0:32]), b64decode(ct) + + def _decrypt(self, ct: bytes, key: bytes) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, bytes) + enc, mac = SymmetricCipher._get_enc_mac(key) + hdmac = hmac_new(mac, self._iv + ct, sha256).digest() + if hdmac != self._mac: + raise DecryptError( + f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(self._mac).hex()}. Check your password." + ) + c = AES.new(enc, AES.MODE_CBC, self._iv) + plaintext = c.decrypt(ct) + pad_len = plaintext[-1] + padding = bytes([pad_len] * pad_len) + if plaintext[-pad_len:] == padding: + plaintext = plaintext[:-pad_len] + return plaintext + + + @classmethod + def encode(cls, plainbytes: bytes, key: bytes) -> str: + assert isinstance(plainbytes, bytes) + assert isinstance(key, bytes) + # inspired from bitwarden/jslib:src/services/crypto.service.ts + typ = int(CIPHERS.sym) + (iv, ct, mac) = aes_encrypt(plainbytes, key) + # jslib: encrypt() + b64_iv = b64encode(iv).decode() + b64_ct = b64encode(ct).decode() + b64_digest = "" + if mac: + b64_digest = b64encode(mac).decode() + return cls.ENCODING.format(typ=CIPHERS.sym, b64_iv=b64_iv, b64_ct=b64_ct, b64_digest=b64_digest) + + @classmethod + def decode(cls, data: str, key: bytes) -> bytes: + assert int(data[0]) == SymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + + + @staticmethod + def _get_enc_mac(key:bytes) -> tuple[bytes, bytes]: + assert isinstance(key, bytes) + # + match len(key): + case 32: + """symmetric master_key of the user""" + enc = hkdf_expand(key, b"enc", 32, sha256) + mac = hkdf_expand(key, b"mac", 32, sha256) + case 64: + """symmetric key of an organization""" + enc = key[:32] + mac = key[32:] + case _: + raise ValueError(f"Invalid key type {key!r}") + return enc, mac + + +class BinarySymmetricCipher: + ENCODING = b"%(typ)c%(iv)16b%(mac)32b%(ct)b" + + def __init__(self, iv:bytes, mac:bytes): + self._iv = iv + self._mac = mac + + @classmethod + def _parse(cls, cipher_bytes: bytes) -> tuple[Self, bytes]: + iv = cipher_bytes[0:16] + mac = cipher_bytes[16:48] + ct = cipher_bytes[48:] + return cls(iv, mac), ct + + def _decrypt(self, ct: bytes, key: bytes) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, bytes) + enc, mac = SymmetricCipher._get_enc_mac(key) + hdmac = hmac_new(mac, self._iv + ct, sha256).digest() + if hdmac != self._mac: + raise DecryptError( + f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(self._mac).hex()}. Check your password." + ) + c = AES.new(enc, AES.MODE_CBC, self._iv) + plaintext = c.decrypt(ct) + pad_len = plaintext[-1] + padding = bytes([pad_len] * pad_len) + if plaintext[-pad_len:] == padding: + plaintext = plaintext[:-pad_len] + return plaintext + + @classmethod + def decode(cls, data: bytes, key: bytes) -> bytes: + assert isinstance(data, bytes) + assert isinstance(key, bytes) + assert int(data[0]) == SymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + + + @classmethod + def encode(cls, plainbytes: bytes, key: bytes) -> bytes: + assert isinstance(plainbytes, bytes) + assert isinstance(key, bytes) + # inspired from bitwarden/jslib:src/services/crypto.service.ts + typ = int(CIPHERS.sym) + (iv, ct, mac) = aes_encrypt(plainbytes, key) + # jslib: encryptToBytes() + ret = chr(typ).encode() + ret += iv + if mac: + ret += mac + ret += ct -class UnimplementedError(Exception): - """.""" - - -class DecodeEncKeyError(ValueError): - """.""" - - -class WrongFormatError(DecodeEncKeyError): - """.""" - - -class WrongTypeDecryptError(DecodeEncKeyError): - """.""" - + assert cls.ENCODING % {b"typ": typ, b"iv": iv, b"mac": mac, b"ct": ct} == ret + return ret -class MissingPartsDecryptError(DecodeEncKeyError): - """.""" +class NullCipher(_Cipher): + TYPE = CIPHERS.null + def __init__(self, iv, ct): + self._iv = iv + self._ct = ct -class B64DecryptError(DecodeEncKeyError): - """.""" + @classmethod + def parse(cls, ct): + iv, ct, mac = ct.split("|", 2) + iv = b64decode(iv) + ct = b64decode(ct) + return cls(iv), ct class DecryptError(ValueError): """.""" -def decode_cipher_string(cipher_string): - """decode a cipher tring into it's parts""" - iv = None - mac = None - assert cipher_string is not None - if not ENCRYPTED_STRING_RE.match(cipher_string): - raise WrongFormatError(f"{cipher_string}") - try: - typ = cipher_string[0:1] - typ = int(typ) - assert typ < 9 - except (AssertionError, ValueError): - raise WrongTypeDecryptError(f"{typ} is not valid") - ct = cipher_string[2:] - if typ == CIPHERS.asym: - pass - else: - try: - if typ == 0: - iv, ct = ct.split("|", 2) - else: - iv, ct, mac = ct.split("|", 3) - except Exception: - raise MissingPartsDecryptError(f"{ct} is missing parts") - if iv: - try: - iv = b64decode(iv) - except Exception: - raise B64DecryptError(f"iv {iv} not valid") - if mac: - try: - mac = b64decode(mac)[0:32] - except Exception: - raise B64DecryptError(f"mac {mac} not valid") - try: - ct = b64decode(ct) - except Exception: - raise B64DecryptError(f"ct {ct} not valid") - return typ, iv, ct, mac - - -def is_encrypted(cipher_string): - try: - decode_cipher_string(cipher_string) - except DecodeEncKeyError: - return False - else: - return True - - -def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf"): +def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf") -> bytes: import vaultwarden.models.bitwarden assert isinstance(salt, str) @@ -149,31 +261,16 @@ def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden type=argon2.Type.ID, ) return v + case _: + raise ValueError(f"unsupported kdf {kdf}") + + +def aes_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes, bytes]: + assert isinstance(plaintext, bytes) + assert isinstance(key, bytes) + + enc, mac = SymmetricCipher._get_enc_mac(key) -def hash_password(password, salt, iterations=ITERATIONS): - """base64-encode a wrapped, stretched password+salt(email) for signup/login""" - if not hasattr(password, "decode"): - password = password.encode("utf-8") - master_key = make_master_key(password, salt, iterations) - hashpw = hashlib.pbkdf2_hmac("sha256", master_key, password, 1) - return base64.b64encode(hashpw), master_key - - -def load_rsa_key(key): - rsakeys = CACHE.setdefault("rsa", {}) - if not isinstance(key, RSA.RsaKey): - try: - key = rsakeys[key] - except KeyError: - rsakeys[key] = RSA.importKey(key) - key = rsakeys[key] - return key - - -def aes_encrypt(plaintext, key, charset="utf-8"): - enc, mac = get_sym_enc_mac(key) - if not hasattr(plaintext, "decode"): - plaintext = plaintext.encode(charset) pad_len = 16 - len(plaintext) % 16 padding = bytes([pad_len] * pad_len) content = plaintext + padding @@ -181,106 +278,10 @@ def aes_encrypt(plaintext, key, charset="utf-8"): c = AES.new(enc, AES.MODE_CBC, iv) ct = c.encrypt(content) cmac = hmac_new(mac, iv + ct, sha256) - return iv, ct, cmac - - -def encrypt_sym(plaintext, key, to_bytes=False, *a, **kw): - # inspired from bitwarden/jslib:src/services/crypto.service.ts - typ, (iv, ct, mac) = int(CIPHERS.sym), aes_encrypt(plaintext, key, *a, **kw) - if mac: - mac = mac.digest() - if to_bytes: - # jslib: encryptToBytes() - ret = chr(typ).encode() - ret += iv - if mac: - ret += mac - ret += ct - else: - # jslib: encrypt() - b64_iv = b64encode(iv).decode() - b64_ct = b64encode(ct).decode() - b64_digest = "" - if mac: - b64_digest = b64encode(mac).decode() - ret = ENCODED_CIPHER[typ].format(**locals()) - return ret - - -def encrypt_sym_to_bytes(plaintext, key, *a, **kw): - kw["to_bytes"] = True - return encrypt_sym(plaintext, key, *a, **kw) - + return iv, ct, cmac.digest() -def encrypt_asym(plaintext, key, *a, **kw): - cipher = PKCS1_OAEP.new(load_rsa_key(key)).encrypt(plaintext) - b64_ct = b64encode(cipher).decode() - typ = CIPHERS.asym - return ENCODED_CIPHER[typ].format(**locals()) - -def encrypt(typ, plaintext, key, *a, **kw): - try: - enc = ENCRYPT[typ] - except KeyError: - raise UnimplementedError(f"can not encrypt type:{typ}") - return enc(plaintext=plaintext, key=key, *a, **kw) - - -def get_sym_enc_mac(key): - # symmetric master_key of the user - if len(key) == 32: - enc = hkdf_expand(key, b"enc", 32, sha256) - mac = hkdf_expand(key, b"mac", 32, sha256) - # symmetric key of an organization - elif len(key) == 64: - enc = key[:32] - mac = key[32:] - return enc, mac - - -def decrypt_sym(dct, key, div, dmac, *a, **kw): - enc, mac = get_sym_enc_mac(key) - hdmac = hmac_new(mac, div + dct, sha256).digest() - if hdmac != dmac: - raise DecryptError( - f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(dmac).hex()}. Check your password." - ) - c = AES.new(enc, AES.MODE_CBC, div) - plaintext = c.decrypt(dct) - pad_len = plaintext[-1] - padding = bytes([pad_len] * pad_len) - if plaintext[-pad_len:] == padding: - plaintext = plaintext[:-pad_len] - return plaintext - - -def decrypt_asym(dct, key, *a, **kw): - return PKCS1_OAEP.new(load_rsa_key(key)).decrypt(dct) - - -def decrypt_bytes(cipher_bytes, key, *a, **kw): - ret, typ = None, cipher_bytes[0] - if typ in [2]: - iv = cipher_bytes[1:17] - mac = cipher_bytes[17:49] - ct = cipher_bytes[49:] - ret = decrypt_sym(ct, key, iv, mac) - else: - raise UnimplementedError(f"{typ} encType decryption is not implemented") - return ret - - -def decrypt(cipher_string, key, *a, **kw): - typ, iv, ct, mac = decode_cipher_string(cipher_string) - try: - dec = DECRYPT[typ] - except KeyError: - raise UnimplementedError(f"can not decrypt type:{typ}") - return dec(div=iv, dct=ct, dmac=mac, key=key, *a, **kw) - - -def strech_key(key): +def stretch_key(key: bytes) -> bytes: stretched_key = key if len(stretched_key) < 64: stretched_key = hkdf_expand(key, b"enc", 32, sha256) + hkdf_expand( @@ -289,23 +290,8 @@ def strech_key(key): return stretched_key -def make_sym_key(master_key): - stretched_key = strech_key(master_key) - plaintext = token_bytes(64) - return encrypt_sym(plaintext, stretched_key), plaintext - - -def make_asym_key(key, stretch=True): - if stretch: - key = strech_key(key) - asym_key = RSA.generate(2048) - public_key = asym_key.publickey().exportKey("DER") - private_key = asym_key.exportKey("DER", pkcs=8) - return encrypt_sym(private_key, key), public_key, private_key - - -def gen_password(length=32, alphabet=None): - alphabet = string.ascii_letters + string.digits +def gen_password(length=32, alphabet=None) -> str: # FIXME UNUSED + alphabet = alphabet or string.ascii_letters + string.digits while True: password = "".join(secrets.choice(alphabet) for i in range(length)) if ( @@ -317,6 +303,4 @@ def gen_password(length=32, alphabet=None): return password -DECRYPT = {CIPHERS.sym: decrypt_sym, CIPHERS.asym: decrypt_asym} -ENCRYPT = {CIPHERS.sym: encrypt_sym, CIPHERS.asym: encrypt_asym} # vim:set et sts=4 ts=4 tw=120: diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py index e69de29..6cd987f 100644 --- a/tests/e2e/__init__.py +++ b/tests/e2e/__init__.py @@ -0,0 +1,12 @@ +def env_from_ci(): + import os + from pathlib import Path + + if os.environ.get("BITWARDEN_URL", None) is not None: + return + + import yaml + + obj = yaml.safe_load(Path(".github/workflows/ci.yml").read_text()) + for k, v in obj["jobs"]["test"]["steps"][-1]["env"].items(): + os.environ[k] = v diff --git a/tests/e2e/test_bitwarden.py b/tests/e2e/test_bitwarden.py index 2a566a0..6151be9 100644 --- a/tests/e2e/test_bitwarden.py +++ b/tests/e2e/test_bitwarden.py @@ -1,8 +1,16 @@ import os +from pathlib import Path +import string import unittest from vaultwarden.clients.bitwarden import BitwardenAPIClient -from vaultwarden.models.bitwarden import get_organization +from vaultwarden.models.bitwarden import ( + get_organization, +) + +from . import env_from_ci + +env_from_ci() # Get Bitwarden credentials from environment variables url = os.environ.get("BITWARDEN_URL", None) @@ -12,6 +20,7 @@ client_secret = os.environ.get("BITWARDEN_CLIENT_SECRET", None) device_id = os.environ.get("BITWARDEN_DEVICE_ID", None) + # Get test organization id from environment variables test_organization = os.environ.get("BITWARDEN_TEST_ORGANIZATION", None) @@ -35,6 +44,8 @@ class BitwardenBaseTests: + bitwarden: BitwardenAPIClient + def setup_base(self): self.organization = get_organization(self.bitwarden, test_organization) self.test_colls_names = self.organization.collections(as_dict=True) @@ -120,7 +131,7 @@ def test_invite_user_than_remove(self): self.assertIsNotNone(user) user.delete() - def test_rename_organization(self): + def _test_rename_organization(self): old_name = self.organization.Name new_name = "new_test_organization" self.organization.rename(new_name) @@ -142,6 +153,82 @@ def test_deduplicate(self): # Todo build test fixtures and delete them at the end of the test return + def _test_create_user(self): + import random + + from vaultwarden.models.bitwarden import Kdf + from vaultwarden.utils.crypto import gen_password + + rnd = "".join( + random.choices(string.ascii_letters + string.digits, k=10) + ).lower() + self.bitwarden.create_user( + f"test+{rnd}@examle.org", + gen_password(), + "test user", + kdf=Kdf.argon2id(), + ) + + def _test_create_org_login(self): + from secrets import token_bytes + + from vaultwarden.models.bitwarden import Login, LoginData + + for name, key in [ + ("org - with key", token_bytes(64)), + ("org - no key", None), + ]: + data = LoginData.model_construct( + name=name, + password="test123", + username="test", + ) + item = Login.model_construct( + name=name, + login=data, + data=data, + key=key, + ) + self.bitwarden.create_item( + item, self.organization, collections=self.test_colls_ids + ) + + def _test_create_own_login(self): + from secrets import token_bytes + + from vaultwarden.models.bitwarden import Login, LoginData + + for name, key in [ + ("own - with key", token_bytes(64)), + ("own - no key", None), + ]: + data = LoginData.model_construct( + name=name, + password="test123", + username="test", + ) + item = Login.model_construct( + name=name, + login=data, + data=data, + key=key, + ) + self.bitwarden.create_item( + item, None, collections=self.test_colls_ids + ) + + def _test_create_attachment(self): + from vaultwarden.models.bitwarden import Login + + login: Login = next( + filter(lambda x: x.attachments, self.test_org_ciphers) + ) + login.attach(Path(__file__)) + + def test_sync(self): + for v in self.bitwarden._sync.Ciphers: + print(f"{v.Name} - {v.OrganizationId}") + class BitwardenWithEmailTests(unittest.TestCase, BitwardenBaseTests): def setUp(self): diff --git a/tests/e2e/test_vaultwarden.py b/tests/e2e/test_vaultwarden.py index ed6981d..c015bb2 100644 --- a/tests/e2e/test_vaultwarden.py +++ b/tests/e2e/test_vaultwarden.py @@ -4,14 +4,20 @@ from vaultwarden.clients.vaultwarden import VaultwardenAdminClient +from . import env_from_ci + +env_from_ci() + # Get Vaultwarden Admin credentials from environment variables url = os.environ.get("VAULTWARDEN_URL", None) admin_token = os.environ.get("VAULTWARDEN_ADMIN_TOKEN", None) -# TODO Add tests for VaultwardenAdminClient class VaultwardenAdminClientBasic(unittest.TestCase): def setUp(self) -> None: self.vaultwarden = VaultwardenAdminClient( - url=url, admin_secret_token=admin_token + url=url, admin_secret_token=admin_token, preload_users=False ) + + def test_users(self): + self.vaultwarden.users() diff --git a/tests/models/validation/__init__.py b/tests/models/validation/__init__.py index e69de29..251f548 100644 --- a/tests/models/validation/__init__.py +++ b/tests/models/validation/__init__.py @@ -0,0 +1,41 @@ +from typing import Any + +from vaultwarden.models.crypto import CryptoContext + + +def default_ctx(account: str = "test-account") -> Any: + import json + from pathlib import Path + + from vaultwarden.clients.bitwarden import BitwardenAPIClient + from vaultwarden.models.sync import ConnectToken + + client = BitwardenAPIClient( + url=".", + email=f"{account}@example.com", + password=account, + client_id=".", + device_id=".", + client_secret=".", + ) + ctx = CryptoContext(client) + + payload = json.loads( + Path(f"tests/fixtures/{account}/sync_camel.json").read_text() + ) + + ct = { + "Kdf": 0, + "KdfIterations": 600000, + "Key": payload["profile"]["key"], + "PrivateKey": payload["profile"]["privateKey"], + "access_token": "", + "expires_in": 3600, + "token_type": "", + "scope": "", + } + + client._connect_token = ConnectToken.model_validate(ct, context=ctx) + + client._sync_step(payload) + return ctx diff --git a/tests/models/validation/test_bitwarden_models.py b/tests/models/validation/test_bitwarden_models.py index 657296e..6239213 100644 --- a/tests/models/validation/test_bitwarden_models.py +++ b/tests/models/validation/test_bitwarden_models.py @@ -1,4 +1,5 @@ import unittest +from uuid import UUID from pydantic import TypeAdapter from vaultwarden.models.bitwarden import ( @@ -7,6 +8,9 @@ OrganizationUserDetails, ResplistBitwarden, ) +from vaultwarden.models.crypto import CryptoContext + +from . import default_ctx class TestBitwardenModels(unittest.TestCase): @@ -19,20 +23,24 @@ def test_organization(self): payload = self.read_json_payload( "tests/fixtures/test-organization/organization_camel.json" ) - data = Organization.model_validate_json(payload) + data = Organization.model_validate_json( + payload, context=CryptoContext(client=None) + ) assert data.Name == "Test Organization" def test_organization_users(self): payload = self.read_json_payload( "tests/fixtures/test-organization/users_camel.json" ) + ctx = default_ctx() + ctx.parent_id = UUID("cda840d2-1de0-4f31-bd49-b30dacd7e8b0") users = ( ResplistBitwarden[OrganizationUserDetails] .model_validate_json( payload, - context={"parent_id": "cda840d2-1de0-4f31-bd49-b30dacd7e8b0"}, + context=ctx, ) - .model_validate_json(payload) + .model_validate_json(payload, context=ctx) ) assert len(users.Data) == 2 assert users.Data[0].Email == "test-account@example.com" @@ -47,11 +55,17 @@ def test_organization_collections(self): ) collection1 = TypeAdapter(list[CollectionUser]).validate_json( payload1, - context={"parent_id": "9ed17918-31f6-4ac5-ac82-c11541cd8a7c"}, + context=CryptoContext( + client=None, + parent_id=UUID("9ed17918-31f6-4ac5-ac82-c11541cd8a7c"), + ), ) collection2 = TypeAdapter(list[CollectionUser]).validate_json( payload2, - context={"parent_id": "3c73f14f-5a01-4016-98bb-9605146a1a49"}, + context=CryptoContext( + client=None, + parent_id=UUID("3c73f14f-5a01-4016-98bb-9605146a1a49"), + ), ) assert len(collection1) == 0 diff --git a/tests/models/validation/test_pascal_camel_cases.py b/tests/models/validation/test_pascal_camel_cases.py index e82ebd1..c7a8620 100644 --- a/tests/models/validation/test_pascal_camel_cases.py +++ b/tests/models/validation/test_pascal_camel_cases.py @@ -8,6 +8,8 @@ ) from vaultwarden.models.sync import SyncData, VaultwardenUser +from . import default_ctx + class TestModelCases(unittest.TestCase): @staticmethod @@ -22,17 +24,25 @@ def test_organization(self): camel_case_payload = self.read_json_payload( "tests/fixtures/test-organization/organization_camel.json" ) - pascal = Organization.model_validate_json(pascal_case_payload) - camel = Organization.model_validate_json(camel_case_payload) + ctx = default_ctx() + pascal = Organization.model_validate_json( + pascal_case_payload, context=ctx + ) + camel = Organization.model_validate_json( + camel_case_payload, context=ctx + ) self.assertEqual(pascal.Name, camel.Name) def test_collections(self): + ctx = default_ctx() + ctx.push(ctx.client._sync.Profile.Organizations[0].Key) + pascal_case_payload = self.read_json_payload( "tests/fixtures/test-organization/collections/collections_pascal.json" ) pascal_collections = ( ResplistBitwarden[OrganizationCollection] - .model_validate_json(pascal_case_payload) + .model_validate_json(pascal_case_payload, context=ctx) .Data ) camel_case_payload = self.read_json_payload( @@ -40,7 +50,7 @@ def test_collections(self): ) camel_collections = ( ResplistBitwarden[OrganizationCollection] - .model_validate_json(camel_case_payload) + .model_validate_json(camel_case_payload, context=ctx) .Data ) self.assertEqual(len(pascal_collections), len(camel_collections)) @@ -48,14 +58,15 @@ def test_collections(self): self.assertEqual(pascal_collections[1].Name, camel_collections[1].Name) def test_sync_data(self): + ctx = default_ctx() pascal_case_payload = self.read_json_payload( "tests/fixtures/test-account/sync_pascal.json" ) camel_case_payload = self.read_json_payload( "tests/fixtures/test-account/sync_camel.json" ) - pascal = SyncData.model_validate_json(pascal_case_payload) - camel = SyncData.model_validate_json(camel_case_payload) + pascal = SyncData.model_validate_json(pascal_case_payload, context=ctx) + camel = SyncData.model_validate_json(camel_case_payload, context=ctx) self.assertEqual(len(pascal.Ciphers), len(camel.Ciphers)) self.assertEqual(len(pascal.Collections), len(camel.Collections)) self.assertEqual( diff --git a/tests/models/validation/test_sync_models.py b/tests/models/validation/test_sync_models.py index f438f54..063c849 100644 --- a/tests/models/validation/test_sync_models.py +++ b/tests/models/validation/test_sync_models.py @@ -2,6 +2,8 @@ from vaultwarden.models.sync import SyncData +from . import default_ctx + class TestSyncModels(unittest.TestCase): @staticmethod @@ -13,7 +15,9 @@ def test_syncdata(self): payload = self.read_json_payload( "tests/fixtures/test-account/sync_camel.json" ) - data = SyncData.model_validate_json(payload) + ctx = default_ctx() + + data = SyncData.model_validate_json(payload, context=ctx) assert len(data.Ciphers) == 2 assert len(data.Collections) == 3 assert len(data.Profile.Organizations) == 1