diff --git a/.github/workflows/typedb.yml b/.github/workflows/typedb.yml index b4fa7e0..feec698 100644 --- a/.github/workflows/typedb.yml +++ b/.github/workflows/typedb.yml @@ -26,6 +26,7 @@ jobs: pip install localstack make install + make lint make dist localstack extensions -v install file://$(ls ./dist/localstack_extension_typedb-*.tar.gz) diff --git a/CODEOWNERS b/CODEOWNERS index 5b72205..fca8505 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -18,4 +18,5 @@ /http-bin/ @thrau @dominikschubert /mailhog/ @lukqw @thrau /miniflare/ @whummer @HarshCasper -/stripe/ @lukqw @thrau \ No newline at end of file +/stripe/ @lukqw @thrau +/typedb/ @whummer @purcell diff --git a/README.md b/README.md index 1ffe6fc..93d9e61 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ You can install the respective extension by calling `localstack extensions insta | [Miniflare](https://github.com/localstack/localstack-extensions/tree/main/miniflare) | localstack-extension-miniflare | 0.1.0 | Experimental | | [Stripe](https://github.com/localstack/localstack-extensions/tree/main/stripe) | localstack-extension-stripe | 0.2.0 | Stable | | [Terraform Init](https://github.com/localstack/localstack-extensions/tree/main/terraform-init) | localstack-extension-terraform-init | 0.2.0 | Experimental | -| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.0 | Experimental | +| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.2 | Experimental | ## Developing Extensions diff --git a/typedb/Makefile b/typedb/Makefile index d274608..dea9383 100644 --- a/typedb/Makefile +++ b/typedb/Makefile @@ -2,6 +2,7 @@ VENV_BIN = python3 -m venv VENV_DIR ?= .venv VENV_ACTIVATE = $(VENV_DIR)/bin/activate VENV_RUN = . $(VENV_ACTIVATE) +TEST_PATH ?= tests usage: ## Shows usage for this Makefile @cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' @@ -33,10 +34,13 @@ entrypoints: venv ## Generate plugin entrypoints for Python package $(VENV_RUN); python -m plux entrypoints format: ## Run ruff to format the codebase - $(VENV_RUN); python -m ruff format .; python -m ruff check --output-format=full --fix . + $(VENV_RUN); python -m ruff format .; make lint + +lint: ## Run ruff to lint the codebase + $(VENV_RUN); python -m ruff check --output-format=full . test: ## Run integration tests (requires LocalStack running with the Extension installed) - $(VENV_RUN); pytest tests $(PYTEST_ARGS) + $(VENV_RUN); pytest $(PYTEST_ARGS) $(TEST_PATH) clean-dist: clean rm -rf dist/ diff --git a/typedb/README.md b/typedb/README.md index 430496c..8eeb372 100644 --- a/typedb/README.md +++ b/typedb/README.md @@ -33,6 +33,11 @@ localstack extensions install "git+https://github.com/localstack/localstack-exte Please refer to the docs [here](https://github.com/localstack/localstack-extensions?tab=readme-ov-file#start-localstack-with-the-extension) for instructions on how to start the extension in developer mode. +## Change Log + +* `0.1.1`: Minor fixes in CI setup, exception handling +* `0.1.0`: Initial version of the extension + ## License The code in this repo is available under the Apache 2.0 license. diff --git a/typedb/localstack_typedb/extension.py b/typedb/localstack_typedb/extension.py index 226778c..21d8c81 100644 --- a/typedb/localstack_typedb/extension.py +++ b/typedb/localstack_typedb/extension.py @@ -2,7 +2,6 @@ import shlex from localstack.config import is_env_not_false -from localstack.utils.docker_utils import DOCKER_CLIENT from localstack_typedb.utils.docker import ProxiedDockerContainerExtension from rolo import Request from werkzeug.datastructures import Headers diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index 2c8b03c..7a78c60 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -9,7 +9,6 @@ from localstack.config import is_env_true from localstack_typedb.utils.h2_proxy import ( apply_http2_patches_for_grpc_support, - ProxyRequestMatcher, ) from localstack.utils.docker_utils import DOCKER_CLIENT from localstack.extensions.api import Extension, http @@ -71,13 +70,11 @@ def __init__( ): self.image_name = image_name if not container_ports: - raise ArgumentError("container_ports is required") + raise ValueError("container_ports is required") self.container_ports = container_ports self.host = host self.path = path - self.container_name = re.sub( - r"\W", "-", f"ls-ext-{self.name}" - ) + self.container_name = re.sub(r"\W", "-", f"ls-ext-{self.name}") self.command = command self.request_to_port_router = request_to_port_router self.http2_ports = http2_ports diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index ee533c1..92dc123 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -16,6 +16,7 @@ ProxyRequestMatcher = Callable[[Headers], bool] + class TcpForwarder: """Simple helper class for bidirectional forwarding of TCP traffic.""" @@ -47,6 +48,7 @@ def close(self): patched_connection = False + def apply_http2_patches_for_grpc_support( target_host: str, target_port: int, should_proxy_request: ProxyRequestMatcher ): @@ -56,7 +58,9 @@ def apply_http2_patches_for_grpc_support( """ LOG.debug(f"Enabling proxying to backend {target_host}:{target_port}") global patched_connection - assert not patched_connection, "It is not safe to patch H2Connection twice with this function" + assert not patched_connection, ( + "It is not safe to patch H2Connection twice with this function" + ) patched_connection = True class ForwardingBuffer: @@ -65,35 +69,57 @@ class ForwardingBuffer: data until the ProxyRequestMatcher tells us whether to send it to the backend, or leave it to the default handler. """ + + backend: TcpForwarder + buffer: list + proxying: bool | None + def __init__(self, http_response_stream): self.http_response_stream = http_response_stream - LOG.debug(f"Starting TCP forwarder to port {target_port} for new HTTP2 connection") + LOG.debug( + f"Starting TCP forwarder to port {target_port} for new HTTP2 connection" + ) self.backend = TcpForwarder(target_port, host=target_host) self.buffer = [] - self.proxying = False - reactor.getThreadPool().callInThread(self.backend.receive_loop, self.received_from_backend) + self.proxying = None + reactor.getThreadPool().callInThread( + self.backend.receive_loop, self.received_from_backend + ) def received_from_backend(self, data): LOG.debug(f"Received {len(data)} bytes from backend") self.http_response_stream.write(data) - def received_from_http2_client(self, data, default_handler): + def received_from_http2_client(self, data, default_handler: Callable): + if self.proxying is False: + # Note: Return here only if `proxying` is `False` (a value of `None` indicates + # that the headers have not fully been received yet) + return default_handler(data) + if self.proxying: assert not self.buffer # Keep sending data to the backend for the lifetime of this connection self.backend.send(data) - else: - self.buffer.append(data) - if headers := get_headers_from_data_stream(self.buffer): - self.proxying = should_proxy_request(headers) - # Now we know what to do with the buffer - buffered_data = b"".join(self.buffer) - self.buffer = [] - if self.proxying: - LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend") - self.backend.send(buffered_data) - else: - return default_handler(buffered_data) + return + + self.buffer.append(data) + + if not (headers := get_headers_from_data_stream(self.buffer)): + # If no headers received yet, then return (method will be called again for next chunk of data) + return + + self.proxying = should_proxy_request(headers) + + buffered_data = b"".join(self.buffer) + self.buffer = [] + + if not self.proxying: + # if this is not a target request, then call the default handler + default_handler(buffered_data) + return + + LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend") + self.backend.send(buffered_data) def close(self): self.backend.close() @@ -104,7 +130,9 @@ def _connectionMade(fn, self, *args, **kwargs): @patch(H2Connection.dataReceived) def _dataReceived(fn, self, data, *args, **kwargs): - self._ls_forwarding_buffer.received_from_http2_client(data, lambda d: fn(d, *args, **kwargs)) + self._ls_forwarding_buffer.received_from_http2_client( + data, lambda d: fn(self, d, *args, **kwargs) + ) @patch(H2Connection.connectionLost) def connectionLost(fn, self, *args, **kwargs): @@ -132,12 +160,11 @@ def get_headers_from_frames(frames: Iterable[Frame]) -> Headers: def get_frames_from_http2_stream(data: bytes) -> Iterable[Frame]: - """Parse the data from an HTTP2 stream into a list of frames""" - frames = [] + """Parse the data from an HTTP2 stream into an iterable of frames""" buffer = FrameBuffer(server=True) buffer.max_frame_size = 16384 - buffer.add_data(data) try: + buffer.add_data(data) for frame in buffer: yield frame except Exception: diff --git a/typedb/pyproject.toml b/typedb/pyproject.toml index 0141906..8ff9ba7 100644 --- a/typedb/pyproject.toml +++ b/typedb/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "localstack-extension-typedb" -version = "0.1.0" +version = "0.1.2" description = "LocalStack Extension: TypeDB on LocalStack" readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"} requires-python = ">=3.9" diff --git a/typedb/tests/test_extension.py b/typedb/tests/test_extension.py index 4bdd47f..efe3fc4 100644 --- a/typedb/tests/test_extension.py +++ b/typedb/tests/test_extension.py @@ -1,6 +1,10 @@ import requests +import httpx from localstack.utils.strings import short_uid -from localstack_typedb.utils.h2_proxy import get_frames_from_http2_stream, get_headers_from_frames +from localstack_typedb.utils.h2_proxy import ( + get_frames_from_http2_stream, + get_headers_from_frames, +) from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType @@ -66,8 +70,34 @@ def test_connect_to_db_via_grpc_endpoint(): results = tx.query( 'match $p isa person; fetch {"name": $p.name};' ).resolve() + results = list(results) for json in results: print(json) + assert len(results) == 2 + + +def test_connect_to_h2_endpoint_non_typedb(): + url = "https://s3.localhost.localstack.cloud:4566/" + + # make an HTTP/2 request to the LocalStack health endpoint + with httpx.Client(http2=True, verify=False, trust_env=False) as client: + health_url = f"{url}/_localstack/health" + response = client.get(health_url) + + assert response.status_code == 200 + assert response.http_version == "HTTP/2" + assert '"services":' in response.text + + # make an HTTP/2 request to a LocalStack endpoint outside the extension (S3 list buckets) + headers = { + "Authorization": "AWS4-HMAC-SHA256 Credential=000000000000/20250101/us-east-1/s3/aws4_request, ..." + } + with httpx.Client(http2=True, verify=False, trust_env=False) as client: + response = client.get(url, headers=headers) + + assert response.status_code == 200 + assert response.http_version == "HTTP/2" + assert "