Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/typedb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
pip install localstack

make install
make lint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏 Thanks! I missed linting/formatting manually, doh!

make dist
localstack extensions -v install file://$(ls ./dist/localstack_extension_typedb-*.tar.gz)

Expand Down
3 changes: 2 additions & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/http-bin/ @thrau @dominikschubert
/mailhog/ @lukqw @thrau
/miniflare/ @whummer @HarshCasper
/stripe/ @lukqw @thrau
/stripe/ @lukqw @thrau
/typedb/ @whummer @purcell
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ You can install the respective extension by calling `localstack extensions insta
| [Miniflare](https://github.com/localstack/localstack-extensions/tree/main/miniflare) | localstack-extension-miniflare | 0.1.0 | Experimental |
| [Stripe](https://github.com/localstack/localstack-extensions/tree/main/stripe) | localstack-extension-stripe | 0.2.0 | Stable |
| [Terraform Init](https://github.com/localstack/localstack-extensions/tree/main/terraform-init) | localstack-extension-terraform-init | 0.2.0 | Experimental |
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.0 | Experimental |
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.2 | Experimental |


## Developing Extensions
Expand Down
8 changes: 6 additions & 2 deletions typedb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ VENV_BIN = python3 -m venv
VENV_DIR ?= .venv
VENV_ACTIVATE = $(VENV_DIR)/bin/activate
VENV_RUN = . $(VENV_ACTIVATE)
TEST_PATH ?= tests

usage: ## Shows usage for this Makefile
@cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}'
Expand Down Expand Up @@ -33,10 +34,13 @@ entrypoints: venv ## Generate plugin entrypoints for Python package
$(VENV_RUN); python -m plux entrypoints

format: ## Run ruff to format the codebase
$(VENV_RUN); python -m ruff format .; python -m ruff check --output-format=full --fix .
$(VENV_RUN); python -m ruff format .; make lint

lint: ## Run ruff to lint the codebase
$(VENV_RUN); python -m ruff check --output-format=full .

test: ## Run integration tests (requires LocalStack running with the Extension installed)
$(VENV_RUN); pytest tests $(PYTEST_ARGS)
$(VENV_RUN); pytest $(PYTEST_ARGS) $(TEST_PATH)

clean-dist: clean
rm -rf dist/
Expand Down
5 changes: 5 additions & 0 deletions typedb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ localstack extensions install "git+https://github.com/localstack/localstack-exte

Please refer to the docs [here](https://github.com/localstack/localstack-extensions?tab=readme-ov-file#start-localstack-with-the-extension) for instructions on how to start the extension in developer mode.

## Change Log

* `0.1.1`: Minor fixes in CI setup, exception handling
* `0.1.0`: Initial version of the extension

## License

The code in this repo is available under the Apache 2.0 license.
1 change: 0 additions & 1 deletion typedb/localstack_typedb/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import shlex

from localstack.config import is_env_not_false
from localstack.utils.docker_utils import DOCKER_CLIENT
from localstack_typedb.utils.docker import ProxiedDockerContainerExtension
from rolo import Request
from werkzeug.datastructures import Headers
Expand Down
7 changes: 2 additions & 5 deletions typedb/localstack_typedb/utils/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from localstack.config import is_env_true
from localstack_typedb.utils.h2_proxy import (
apply_http2_patches_for_grpc_support,
ProxyRequestMatcher,
)
from localstack.utils.docker_utils import DOCKER_CLIENT
from localstack.extensions.api import Extension, http
Expand Down Expand Up @@ -71,13 +70,11 @@ def __init__(
):
self.image_name = image_name
if not container_ports:
raise ArgumentError("container_ports is required")
raise ValueError("container_ports is required")
self.container_ports = container_ports
self.host = host
self.path = path
self.container_name = re.sub(
r"\W", "-", f"ls-ext-{self.name}"
)
self.container_name = re.sub(r"\W", "-", f"ls-ext-{self.name}")
self.command = command
self.request_to_port_router = request_to_port_router
self.http2_ports = http2_ports
Expand Down
69 changes: 48 additions & 21 deletions typedb/localstack_typedb/utils/h2_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

ProxyRequestMatcher = Callable[[Headers], bool]


class TcpForwarder:
"""Simple helper class for bidirectional forwarding of TCP traffic."""

Expand Down Expand Up @@ -47,6 +48,7 @@ def close(self):

patched_connection = False


def apply_http2_patches_for_grpc_support(
target_host: str, target_port: int, should_proxy_request: ProxyRequestMatcher
):
Expand All @@ -56,7 +58,9 @@ def apply_http2_patches_for_grpc_support(
"""
LOG.debug(f"Enabling proxying to backend {target_host}:{target_port}")
global patched_connection
assert not patched_connection, "It is not safe to patch H2Connection twice with this function"
assert not patched_connection, (
"It is not safe to patch H2Connection twice with this function"
)
patched_connection = True

class ForwardingBuffer:
Expand All @@ -65,35 +69,57 @@ class ForwardingBuffer:
data until the ProxyRequestMatcher tells us whether to send it
to the backend, or leave it to the default handler.
"""

backend: TcpForwarder
buffer: list
proxying: bool | None

def __init__(self, http_response_stream):
self.http_response_stream = http_response_stream
LOG.debug(f"Starting TCP forwarder to port {target_port} for new HTTP2 connection")
LOG.debug(
f"Starting TCP forwarder to port {target_port} for new HTTP2 connection"
)
self.backend = TcpForwarder(target_port, host=target_host)
self.buffer = []
self.proxying = False
reactor.getThreadPool().callInThread(self.backend.receive_loop, self.received_from_backend)
self.proxying = None
reactor.getThreadPool().callInThread(
self.backend.receive_loop, self.received_from_backend
)

def received_from_backend(self, data):
LOG.debug(f"Received {len(data)} bytes from backend")
self.http_response_stream.write(data)

def received_from_http2_client(self, data, default_handler):
def received_from_http2_client(self, data, default_handler: Callable):
if self.proxying is False:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I had to slightly adjust the logic here to match what we had prior to the last refactoring round. We need to ensure that non-TypeDB requests sent via HTTP/2 are not proxied to the TypeDB backend, but are instead passed on to the default handler.

The challenge is that this function may be called multiple times (for multiple HTTP2 frames) until the headers are fully received, hence we distinguish between the value of proxying being None (connection not fully initialized yet, still waiting for headers), and being False (connection initialized, headers indicate that the request is not in scope for the proxy).

This behavior can be tested by using the resource browser of the LocalStack Web application in the Chrome browser, which uses HTTP/2 connections by default. (This logic is now also covered by a new integration test as part of this PR.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that was the challenge — it was hard to follow the rationale originally. Thanks for fixing it here and adding the failing test! When extracting the logic it seemed there was ternary logic involved, and I don't like bool | None for that, so I started to use an Enum initially with match, to make this state machine more explicit. Then at some point I wrongly concluded that there were only two possible states, and reverted to a bool.

# Note: Return here only if `proxying` is `False` (a value of `None` indicates
# that the headers have not fully been received yet)
return default_handler(data)

if self.proxying:
assert not self.buffer
# Keep sending data to the backend for the lifetime of this connection
self.backend.send(data)
else:
self.buffer.append(data)
if headers := get_headers_from_data_stream(self.buffer):
self.proxying = should_proxy_request(headers)
# Now we know what to do with the buffer
buffered_data = b"".join(self.buffer)
self.buffer = []
if self.proxying:
LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
self.backend.send(buffered_data)
else:
return default_handler(buffered_data)
return

self.buffer.append(data)

if not (headers := get_headers_from_data_stream(self.buffer)):
# If no headers received yet, then return (method will be called again for next chunk of data)
return

self.proxying = should_proxy_request(headers)

buffered_data = b"".join(self.buffer)
self.buffer = []

if not self.proxying:
# if this is not a target request, then call the default handler
default_handler(buffered_data)
return

LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
self.backend.send(buffered_data)

def close(self):
self.backend.close()
Expand All @@ -104,7 +130,9 @@ def _connectionMade(fn, self, *args, **kwargs):

@patch(H2Connection.dataReceived)
def _dataReceived(fn, self, data, *args, **kwargs):
self._ls_forwarding_buffer.received_from_http2_client(data, lambda d: fn(d, *args, **kwargs))
self._ls_forwarding_buffer.received_from_http2_client(
data, lambda d: fn(self, d, *args, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh!

)

@patch(H2Connection.connectionLost)
def connectionLost(fn, self, *args, **kwargs):
Expand Down Expand Up @@ -132,12 +160,11 @@ def get_headers_from_frames(frames: Iterable[Frame]) -> Headers:


def get_frames_from_http2_stream(data: bytes) -> Iterable[Frame]:
"""Parse the data from an HTTP2 stream into a list of frames"""
frames = []
"""Parse the data from an HTTP2 stream into an iterable of frames"""
buffer = FrameBuffer(server=True)
buffer.max_frame_size = 16384
buffer.add_data(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about this one — did it fail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, great catch - it does actually fail for certain byte sequences that do not represent a proper HTTP/2 preamble:

...
  File "/opt/code/extensions/typedb/localstack_typedb/utils/h2_proxy.py", line 128, in get_headers_from_data_stream
    return get_headers_from_frames(get_frames_from_http2_stream(stream))
  File "/opt/code/extensions/typedb/localstack_typedb/utils/h2_proxy.py", line 135, in get_headers_from_frames
    for frame in frames:
  File "/opt/code/extensions/typedb/localstack_typedb/utils/h2_proxy.py", line 149, in get_frames_from_http2_stream
    buffer.add_data(data)
  File "/opt/code/localstack/.venv/lib/python3.13/site-packages/h2/frame_buffer.py", line 51, in add_data
    raise ProtocolError(msg)
h2.exceptions.ProtocolError: Invalid HTTP/2 preamble.

I don't have a reproducible test case for this one (will try to pull something together!), but I've seen this in the logs a couple of times, hence the additional guard here.

try:
buffer.add_data(data)
for frame in buffer:
yield frame
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion typedb/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "localstack-extension-typedb"
version = "0.1.0"
version = "0.1.2"
description = "LocalStack Extension: TypeDB on LocalStack"
readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"}
requires-python = ">=3.9"
Expand Down
32 changes: 31 additions & 1 deletion typedb/tests/test_extension.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import requests
import httpx
from localstack.utils.strings import short_uid
from localstack_typedb.utils.h2_proxy import get_frames_from_http2_stream, get_headers_from_frames
from localstack_typedb.utils.h2_proxy import (
get_frames_from_http2_stream,
get_headers_from_frames,
)
from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType


Expand Down Expand Up @@ -66,8 +70,34 @@ def test_connect_to_db_via_grpc_endpoint():
results = tx.query(
'match $p isa person; fetch {"name": $p.name};'
).resolve()
results = list(results)
for json in results:
print(json)
assert len(results) == 2


def test_connect_to_h2_endpoint_non_typedb():
url = "https://s3.localhost.localstack.cloud:4566/"

# make an HTTP/2 request to the LocalStack health endpoint
with httpx.Client(http2=True, verify=False, trust_env=False) as client:
health_url = f"{url}/_localstack/health"
response = client.get(health_url)

assert response.status_code == 200
assert response.http_version == "HTTP/2"
assert '"services":' in response.text

# make an HTTP/2 request to a LocalStack endpoint outside the extension (S3 list buckets)
headers = {
"Authorization": "AWS4-HMAC-SHA256 Credential=000000000000/20250101/us-east-1/s3/aws4_request, ..."
}
with httpx.Client(http2=True, verify=False, trust_env=False) as client:
response = client.get(url, headers=headers)
Comment on lines +82 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! If I had poked around a bit more with LS alongside the extension, the need for this would quickly have become more obvious.


assert response.status_code == 200
assert response.http_version == "HTTP/2"
assert "<ListAllMyBucketsResult" in response.text


def test_get_frames_from_http2_stream():
Expand Down