Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/lowkey-vault/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.. autoclass:: testcontainers.lowkeyvault.LowkeyVaultContainer
.. title:: testcontainers.lowkeyvault.LowkeyVaultContainer
38 changes: 38 additions & 0 deletions modules/lowkey-vault/example_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import urllib3
from azure.core.pipeline.transport._requests_basic import RequestsTransport
from azure.keyvault.secrets import SecretClient

from testcontainers.lowkeyvault import LowkeyVaultContainer


def basic_example():
with LowkeyVaultContainer() as lowkey_vault_container:
# get connection details
connection_url = lowkey_vault_container.get_connection_url()
print(f"Lowkey Vault is running: {connection_url}")
token = lowkey_vault_container.get_token()
print("Obtained token")
# prepare a transport ignoring self signed certificate issues
transport = RequestsTransport(connection_verify=False)
# make sure to turn off challenge resource verification
secret_client: SecretClient = SecretClient(
vault_url=connection_url, credential=token, verify_challenge_resource=False, transport=transport
)

# set a secret
secret_client.set_secret(name="test-secret", value="a secret message")
print("The secret has been set.")

# get the value of the secret
actual: str = secret_client.get_secret(name="test-secret").value
print(f"The secret has been retrieved with value: '{actual}'")

# close the secret client
secret_client.close()


if __name__ == "__main__":
# ignore cert errors
urllib3.disable_warnings()
# run the code
basic_example()
95 changes: 95 additions & 0 deletions modules/lowkey-vault/testcontainers/lowkeyvault/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from enum import Enum
from typing import Any, Optional

import requests
from azure.core.credentials import AccessToken, TokenCredential

from testcontainers.core.container import DockerContainer
from testcontainers.core.wait_strategies import LogMessageWaitStrategy

# This comment can be removed (Used for testing)


class StaticTokenCredential(TokenCredential):
def __init__(self, json_token: str):
self.access_token = AccessToken(token=json_token.get("access_token"), expires_on=json_token.get("expires_on"))

def get_token(
self,
*scopes: str,
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
enable_cae: bool = False,
**kwargs: Any,
) -> AccessToken:
return self.access_token


class NetworkType(Enum):
NETWORK = "network"
LOCAL = "local"


class LowkeyVaultContainer(DockerContainer):
"""
Container for a Lowkey Vault instance for emulating Azure Key Vault.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs could be updated


Example:

.. doctest::

>>> from azure.core.pipeline.transport._requests_basic import RequestsTransport
>>> from azure.keyvault.secrets import SecretClient
>>> from testcontainers.lowkeyvault import LowkeyVaultContainer

>>> with LowkeyVaultContainer() as lowkey_vault:
... connection_url = lowkey_vault.get_connection_url()
... token = lowkey_vault.get_token()
... transport = RequestsTransport(connection_verify=False)
... # make sure to turn off challenge resource verification
... secret_client = SecretClient(
... vault_url=connection_url,
... credential=token,
... verify_challenge_resource=False,
... transport=transport
... )
"""

def __init__(
self, image: str = "nagyesta/lowkey-vault:7.0.3-ubi10-minimal", container_alias: Optional[str] = None, **kwargs
) -> None:
super().__init__(image, **kwargs)
self.api_port = 8443
self.metadata_port = 8080
self.with_exposed_ports(self.api_port, self.metadata_port)
self.with_env("LOWKEY_VAULT_RELAXED_PORTS", "true")
if container_alias is not None:
self.with_network_aliases(container_alias)
self.with_env("LOWKEY_VAULT_ALIASES", f"localhost={container_alias}:<port>")
self.container_alias = container_alias
self.waiting_for(LogMessageWaitStrategy("Started LowkeyVaultApp."))

def _configure(self) -> None:
return

def get_connection_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
if network_type == NetworkType.LOCAL:
return f"https://{self.get_container_host_ip()}:{self.get_exposed_port(self.api_port)}"
else:
return f"https://{self.container_alias}:{self.api_port}"

def get_imds_endpoint(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add a method to set the environment variables automatically for the managed identity simulation (although we cannot create the /var/opt/azcmagent/tokens/assumed-identity.key key file automatically).

if network_type == NetworkType.LOCAL:
return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.metadata_port)}"
else:
return f"http://{self.container_alias}:{self.metadata_port}"

def get_token_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
base_url = self.get_imds_endpoint(network_type=network_type)
return f"{base_url}/metadata/identity/oauth2/token"

def get_token(self, network_type: NetworkType = NetworkType.LOCAL) -> StaticTokenCredential:
resource = self.get_connection_url(network_type=network_type)
token_url = self.get_token_url(network_type=network_type)
json_response = requests.get(f"{token_url}?resource={resource}").json()
return StaticTokenCredential(json_token=json_response)
20 changes: 20 additions & 0 deletions modules/lowkey-vault/tests/samples/network_container/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use an official Python runtime as a parent image
FROM python:3.10-slim

# Set the working directory in the container
WORKDIR /app

# Install dependencies and create a dummy key file for the authentication to work
RUN \
pip install azure-keyvault-certificates==4.10.0 && \
pip install azure-keyvault-secrets==4.10.0 && \
pip install azure-keyvault-keys==4.11.0 && \
pip install cryptography==46.0.3 && \
pip install azure-identity==1.25.1 && \
mkdir -p /var/opt/azcmagent/tokens/ && \
touch /var/opt/azcmagent/tokens/assumed-identity.key

COPY ./netowrk_container.py netowrk_container.py
EXPOSE 80
# Define the command to run the application
CMD ["python", "netowrk_container.py"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import base64
import os

import urllib3
from azure.core.pipeline.transport._requests_basic import RequestsTransport
from azure.identity import DefaultAzureCredential
from azure.keyvault.certificates import CertificateClient, CertificatePolicy
from azure.keyvault.keys import KeyClient, KeyOperation
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm, EncryptResult, DecryptResult
from azure.keyvault.secrets import SecretClient
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates

API_VERSION = "7.6"


def hello_secrets_from_an_external_container():
"""
Entry point function for a custom Docker container to test connectivity
and "secrets" functionality with Lowkey Vault.

This function is designed to run inside a separate container within the
same Docker network as a Lowkey Vault instance.
"""
vault_url = os.environ["CONNECTION_URL"]
secret_message = os.environ["SECRET_VALUE"]
secret_name = os.environ["SECRET_NAME"]
# test secrets API to see that the container works
secret_client = None
try:
# ignore SSL errors because we are using a self-signed certificate
transport_secrets = RequestsTransport(connection_verify=False)
# create the client we need
secret_client = SecretClient(
vault_url=vault_url,
credential=DefaultAzureCredential(),
verify_challenge_resource=False,
transport=transport_secrets,
api_version=API_VERSION,
)
# set the result as a secret
secret_client.set_secret(name=secret_name, value=secret_message)
# get back the value
actual = secret_client.get_secret(name=secret_name).value

# verify the result
assert actual == secret_message
print("Lowkey Vault Container created.")
except Exception as e:
print(f"Something went wrong : {e}")
finally:
# close client
if secret_client is not None:
secret_client.close()


def hello_keys_from_an_external_container():
"""
Entry point function for a custom Docker container to test connectivity
and "keys" functionality with Lowkey Vault.

This function is designed to run inside a separate container within the
same Docker network as a Lowkey Vault instance.
"""
vault_url = os.environ["CONNECTION_URL"]
secret_message = os.environ["SECRET_VALUE"]
key_name = os.environ["KEY_NAME"]
# test key API to see that the container works
key_client = None
crypto_client = None
try:
# ignore SSL errors because we are using a self-signed certificate
transport_keys = RequestsTransport(connection_verify=False)
transport_crypto = RequestsTransport(connection_verify=False)
# create the clients we need
key_client = KeyClient(
vault_url=vault_url,
credential=DefaultAzureCredential(),
verify_challenge_resource=False,
transport=transport_keys,
api_version=API_VERSION,
)
# create a new key
key_client.create_rsa_key(
name=key_name,
size=2048,
key_operations=[KeyOperation.encrypt, KeyOperation.decrypt, KeyOperation.wrap_key, KeyOperation.unwrap_key],
)

crypto_client = CryptographyClient(
key=key_client.get_key(name=key_name).id,
credential=DefaultAzureCredential(),
verify_challenge_resource=False,
transport=transport_crypto,
api_version=API_VERSION,
)

# encode the text
text_as_bytes: bytes = bytes(secret_message.encode("utf-8"))
encrypted: EncryptResult = crypto_client.encrypt(
algorithm=EncryptionAlgorithm.rsa_oaep_256, plaintext=text_as_bytes
)
cipher_text: bytes = encrypted.ciphertext

# decode the cipher text
decrypted: DecryptResult = crypto_client.decrypt(
algorithm=EncryptionAlgorithm.rsa_oaep_256, ciphertext=cipher_text
)
decrypted_text: str = decrypted.plaintext.decode("utf-8")

# verify the result
assert decrypted_text == secret_message
print("Lowkey Vault Container created.")
except Exception as e:
print(f"Something went wrong : {e}")
finally:
# close clients
if key_client is not None:
key_client.close()
if crypto_client is not None:
crypto_client.close()


def hello_certificates_from_an_external_container():
"""
Entry point function for a custom Docker container to test connectivity
and "certificates" functionality with Lowkey Vault.

This function is designed to run inside a separate container within the
same Docker network as a Lowkey Vault instance.
"""
vault_url = os.environ["CONNECTION_URL"]
cert_name = os.environ["CERT_NAME"]
# test certificates API to see that the container works
certificate_client = None
secret_client = None
try:
# ignore SSL errors because we are using a self-signed certificate
transport_certs = RequestsTransport(connection_verify=False)
transport_secrets = RequestsTransport(connection_verify=False)
# create the clients we need
certificate_client = CertificateClient(
vault_url=vault_url,
credential=DefaultAzureCredential(),
verify_challenge_resource=False,
transport=transport_certs,
api_version=API_VERSION,
)
secret_client = SecretClient(
vault_url=vault_url,
credential=DefaultAzureCredential(),
verify_challenge_resource=False,
transport=transport_secrets,
api_version=API_VERSION,
)

subject_name: str = "CN=example.com"
policy: CertificatePolicy = CertificatePolicy(
issuer_name="Self",
subject=subject_name,
key_curve_name="P-256",
key_type="EC",
validity_in_months=12,
content_type="application/x-pkcs12",
)
certificate_client.begin_create_certificate(certificate_name=cert_name, policy=policy).wait()

cert_value = secret_client.get_secret(name=cert_name).value

# decode base64 secret
decoded = base64.b64decode(cert_value)
# open decoded secret as PKCS12 file
pkcs12 = load_key_and_certificates(decoded, b"")

# get the components
ec_key: EllipticCurvePrivateKey = pkcs12[0]
x509_cert: x509.Certificate = pkcs12[1]

# verify the result
assert subject_name == x509_cert.subject.rdns[0].rfc4514_string()
assert "secp256r1" == ec_key.curve.name

print("Lowkey Vault Container created.")
except Exception as e:
print(f"Something went wrong : {e}")
finally:
# close clients
if certificate_client is not None:
certificate_client.close()
if secret_client is not None:
secret_client.close()


if __name__ == "__main__":
mode = os.getenv("TEST")
# ignore cert errors
urllib3.disable_warnings()
if mode == "secrets":
hello_secrets_from_an_external_container()
elif mode == "keys":
hello_keys_from_an_external_container()
elif mode == "certificates":
hello_certificates_from_an_external_container()
else:
print("The TEST env variable must be 'secrets', 'keys', or 'certificates'.")
Loading