-
Notifications
You must be signed in to change notification settings - Fork 13
Minor fixes and polishing for TypeDB extension #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
|
|
||
| ProxyRequestMatcher = Callable[[Headers], bool] | ||
|
|
||
|
|
||
| class TcpForwarder: | ||
| """Simple helper class for bidirectional forwarding of TCP traffic.""" | ||
|
|
||
|
|
@@ -47,6 +48,7 @@ def close(self): | |
|
|
||
| patched_connection = False | ||
|
|
||
|
|
||
| def apply_http2_patches_for_grpc_support( | ||
| target_host: str, target_port: int, should_proxy_request: ProxyRequestMatcher | ||
| ): | ||
|
|
@@ -56,7 +58,9 @@ def apply_http2_patches_for_grpc_support( | |
| """ | ||
| LOG.debug(f"Enabling proxying to backend {target_host}:{target_port}") | ||
| global patched_connection | ||
| assert not patched_connection, "It is not safe to patch H2Connection twice with this function" | ||
| assert not patched_connection, ( | ||
| "It is not safe to patch H2Connection twice with this function" | ||
| ) | ||
| patched_connection = True | ||
|
|
||
| class ForwardingBuffer: | ||
|
|
@@ -65,35 +69,57 @@ class ForwardingBuffer: | |
| data until the ProxyRequestMatcher tells us whether to send it | ||
| to the backend, or leave it to the default handler. | ||
| """ | ||
|
|
||
| backend: TcpForwarder | ||
| buffer: list | ||
| proxying: bool | None | ||
|
|
||
| def __init__(self, http_response_stream): | ||
| self.http_response_stream = http_response_stream | ||
| LOG.debug(f"Starting TCP forwarder to port {target_port} for new HTTP2 connection") | ||
| LOG.debug( | ||
| f"Starting TCP forwarder to port {target_port} for new HTTP2 connection" | ||
| ) | ||
| self.backend = TcpForwarder(target_port, host=target_host) | ||
| self.buffer = [] | ||
| self.proxying = False | ||
| reactor.getThreadPool().callInThread(self.backend.receive_loop, self.received_from_backend) | ||
| self.proxying = None | ||
| reactor.getThreadPool().callInThread( | ||
| self.backend.receive_loop, self.received_from_backend | ||
| ) | ||
|
|
||
| def received_from_backend(self, data): | ||
| LOG.debug(f"Received {len(data)} bytes from backend") | ||
| self.http_response_stream.write(data) | ||
|
|
||
| def received_from_http2_client(self, data, default_handler): | ||
| def received_from_http2_client(self, data, default_handler: Callable): | ||
| if self.proxying is False: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: Had to slightly adjust the logic in here, to match the logic of what we had prior to the last refactoring round. We need to ensure that non-TypeDB requests sent via HTTP2 are not proxied and forwarded to the target properly. The challenge is that this function may be called multiple times (for multiple HTTP2 frames) until the headers are fully received, hence we distinguish between the value of This behavior can be tested by using the resource browser in the LocalStack Web application in Chrome browser, which sends connections via HTTP2 by default. (This logic is now also covered by a new integration test as part of this PR.)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, that was the challenge — it was hard to follow the rationale originally. Thanks for fixing it here and adding the failing test! When extracting the logic it seemed there was ternary logic involved, and I don't like |
||
| # Note: Return here only if `proxying` is `False` (a value of `None` indicates | ||
| # that the headers have not fully been received yet) | ||
| return default_handler(data) | ||
|
|
||
| if self.proxying: | ||
| assert not self.buffer | ||
| # Keep sending data to the backend for the lifetime of this connection | ||
| self.backend.send(data) | ||
| else: | ||
| self.buffer.append(data) | ||
| if headers := get_headers_from_data_stream(self.buffer): | ||
| self.proxying = should_proxy_request(headers) | ||
| # Now we know what to do with the buffer | ||
| buffered_data = b"".join(self.buffer) | ||
| self.buffer = [] | ||
| if self.proxying: | ||
| LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend") | ||
| self.backend.send(buffered_data) | ||
| else: | ||
| return default_handler(buffered_data) | ||
| return | ||
|
|
||
| self.buffer.append(data) | ||
|
|
||
| if not (headers := get_headers_from_data_stream(self.buffer)): | ||
| # If no headers received yet, then return (method will be called again for next chunk of data) | ||
| return | ||
|
|
||
| self.proxying = should_proxy_request(headers) | ||
|
|
||
| buffered_data = b"".join(self.buffer) | ||
| self.buffer = [] | ||
|
|
||
| if not self.proxying: | ||
| # if this is not a target request, then call the default handler | ||
| default_handler(buffered_data) | ||
| return | ||
|
|
||
| LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend") | ||
| self.backend.send(buffered_data) | ||
|
|
||
| def close(self): | ||
| self.backend.close() | ||
|
|
@@ -104,7 +130,9 @@ def _connectionMade(fn, self, *args, **kwargs): | |
|
|
||
| @patch(H2Connection.dataReceived) | ||
| def _dataReceived(fn, self, data, *args, **kwargs): | ||
| self._ls_forwarding_buffer.received_from_http2_client(data, lambda d: fn(d, *args, **kwargs)) | ||
| self._ls_forwarding_buffer.received_from_http2_client( | ||
| data, lambda d: fn(self, d, *args, **kwargs) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Argh! |
||
| ) | ||
|
|
||
| @patch(H2Connection.connectionLost) | ||
| def connectionLost(fn, self, *args, **kwargs): | ||
|
|
@@ -132,12 +160,11 @@ def get_headers_from_frames(frames: Iterable[Frame]) -> Headers: | |
|
|
||
|
|
||
| def get_frames_from_http2_stream(data: bytes) -> Iterable[Frame]: | ||
| """Parse the data from an HTTP2 stream into a list of frames""" | ||
| frames = [] | ||
| """Parse the data from an HTTP2 stream into an iterable of frames""" | ||
| buffer = FrameBuffer(server=True) | ||
| buffer.max_frame_size = 16384 | ||
| buffer.add_data(data) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious about this one — did it fail?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, great catch - it does actually fail for certain HTTP bytes that are not representing a proper HTTP/2 preamble: Don't have a reproducible test case for this one (will try to pull something together!), but seen this in the logs a couple of times, hence adding the additional guard here.. |
||
| try: | ||
| buffer.add_data(data) | ||
| for frame in buffer: | ||
| yield frame | ||
| except Exception: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,10 @@ | ||
| import requests | ||
| import httpx | ||
| from localstack.utils.strings import short_uid | ||
| from localstack_typedb.utils.h2_proxy import get_frames_from_http2_stream, get_headers_from_frames | ||
| from localstack_typedb.utils.h2_proxy import ( | ||
| get_frames_from_http2_stream, | ||
| get_headers_from_frames, | ||
| ) | ||
| from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType | ||
|
|
||
|
|
||
|
|
@@ -66,8 +70,34 @@ def test_connect_to_db_via_grpc_endpoint(): | |
| results = tx.query( | ||
| 'match $p isa person; fetch {"name": $p.name};' | ||
| ).resolve() | ||
| results = list(results) | ||
| for json in results: | ||
| print(json) | ||
| assert len(results) == 2 | ||
|
|
||
|
|
||
| def test_connect_to_h2_endpoint_non_typedb(): | ||
| url = "https://s3.localhost.localstack.cloud:4566/" | ||
|
|
||
| # make an HTTP/2 request to the LocalStack health endpoint | ||
| with httpx.Client(http2=True, verify=False, trust_env=False) as client: | ||
| health_url = f"{url}/_localstack/health" | ||
| response = client.get(health_url) | ||
|
|
||
| assert response.status_code == 200 | ||
| assert response.http_version == "HTTP/2" | ||
| assert '"services":' in response.text | ||
|
|
||
| # make an HTTP/2 request to a LocalStack endpoint outside the extension (S3 list buckets) | ||
| headers = { | ||
| "Authorization": "AWS4-HMAC-SHA256 Credential=000000000000/20250101/us-east-1/s3/aws4_request, ..." | ||
| } | ||
| with httpx.Client(http2=True, verify=False, trust_env=False) as client: | ||
| response = client.get(url, headers=headers) | ||
|
Comment on lines
+82
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes! If I had poked around a bit more with LS alongside the extension, the need for this would quickly have become more obvious. |
||
|
|
||
| assert response.status_code == 200 | ||
| assert response.http_version == "HTTP/2" | ||
| assert "<ListAllMyBucketsResult" in response.text | ||
|
|
||
|
|
||
| def test_get_frames_from_http2_stream(): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏 Thanks! I missed linting/formatting manually, doh!