Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/environments/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from util.drf_writable_nested.serializers import (
DeleteBeforeUpdateWritableNestedModelSerializer,
)
from webhooks.fields import NoSSRFURLField


class EnvironmentSerializerFull(serializers.ModelSerializer): # type: ignore[type-arg]
Expand Down Expand Up @@ -185,6 +186,8 @@ def create(self, validated_data): # type: ignore[no-untyped-def]


class WebhookSerializer(serializers.ModelSerializer): # type: ignore[type-arg]
url = NoSSRFURLField()

class Meta:
model = Webhook
fields = ("id", "url", "enabled", "created_at", "updated_at", "secret")
Expand Down
87 changes: 50 additions & 37 deletions api/tests/unit/webhooks/test_unit_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,50 +358,55 @@ def test_call_integration_webhook__backoff_give_up__does_not_raise_error(
assert result is None


def test_send_test_webhook__200_response_from_webhook__returns_correct_response(
mocker: MockerFixture,
admin_client: APIClient,
organisation: Organisation,
) -> None:
# Given
webhook_url = "http://test.webhook.com"
mock_post = mocker.patch("requests.post")
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = "success"
mock_post.return_value = mock_response

url = reverse("api-v1:webhooks:webhooks-test")

data = {
"webhook_url": webhook_url,
"secret": "some-secret",
"scope": {"type": "organisation", "id": organisation.id},
}

# When
response = admin_client.post(
url, data=json.dumps(data), content_type="application/json"
)

# Then
assert response.status_code == 200
mock_post.assert_called_once()
response_json = response.json()
assert response_json["status"] == 200
assert response_json["detail"] == "Webhook test successful"


@pytest.mark.parametrize(
"external_api_response_status, external_api_error_text, expected_final_status, expected_response_body",
"external_api_response_status, external_api_error_text, expected_final_status",
[
(200, "", 200, {"detail": "Webhook test successful", "status": 200}),
(
400,
"wrong-payload",
400,
{
"detail": "Webhook returned invalid status",
"status": 400,
"body": "wrong-payload",
},
),
(
401,
"invalid-signature",
400,
{
"detail": "Webhook returned invalid status",
"status": 401,
"body": "invalid-signature",
},
),
(
500,
"internal-server-error",
400,
{
"detail": "Webhook returned invalid status",
"status": 500,
"body": "internal-server-error",
},
),
(400, "wrong-payload", 400),
(401, "invalid-signature", 400),
(500, "internal-server-error", 400),
],
)
def test_send_test_webhook__various_status_codes__returns_correct_response(
def test_send_test_webhook__various_error_status_codes__returns_correct_response(
mocker: MockerFixture,
admin_client: APIClient,
external_api_response_status: int,
expected_final_status: int,
external_api_error_text: str,
organisation: Organisation,
expected_response_body: dict[str, str | int],
) -> None:
# Given
webhook_url = "http://test.webhook.com"
Expand All @@ -427,7 +432,13 @@ def test_send_test_webhook__various_status_codes__returns_correct_response(
# Then
assert response.status_code == expected_final_status
mock_post.assert_called_once()
assert response.json() == expected_response_body
response_json = response.json()
assert response_json["status"] == external_api_response_status
assert response_json["detail"] == "Webhook returned invalid status"
assert (
response_json["body"]
== "Please check the webhook endpoint to validate it returns a 200 OK."
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -504,7 +515,9 @@ def test_send_test_webhook__request_exception__returns_error_response(
# Given
webhook_url = "http://test.webhook.com"
mock_post = mocker.patch("requests.post")
mock_post.side_effect = requests.exceptions.RequestException("Connection refused")
mock_post.side_effect = requests.exceptions.RequestException(
"Some internal exception details that should not be exposed!"
)

url = reverse("api-v1:webhooks:webhooks-test")

Expand All @@ -523,7 +536,7 @@ def test_send_test_webhook__request_exception__returns_error_response(
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {
"detail": "Could not connect to webhook URL",
"body": "Connection refused",
"body": "Please check the URL, and ensure it is valid and accessible from the server.",
}


Expand Down
103 changes: 103 additions & 0 deletions api/tests/unit/webhooks/test_webhooks_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import socket
from unittest import mock

import pytest
from rest_framework.exceptions import ValidationError

from webhooks.fields import NoSSRFURLField


@pytest.fixture()
def field() -> NoSSRFURLField:
return NoSSRFURLField()


@pytest.mark.parametrize(
"url,label",
[
("http://127.0.0.1/hook", "loopback IPv4"),
("http://127.0.0.2/hook", "loopback IPv4 (non-first)"),
("http://[::1]/hook", "loopback IPv6"),
("http://10.0.0.1/hook", "RFC1918 10.0.0.0/8"),
("http://10.255.255.255/hook", "RFC1918 10.0.0.0/8 boundary"),
("http://172.16.0.1/hook", "RFC1918 172.16.0.0/12"),
("http://172.31.255.255/hook", "RFC1918 172.16.0.0/12 boundary"),
("http://192.168.0.1/hook", "RFC1918 192.168.0.0/16"),
("http://192.168.255.255/hook", "RFC1918 192.168.0.0/16 boundary"),
("http://169.254.1.1/hook", "link-local IPv4"),
("http://[fe80::1]/hook", "link-local IPv6"),
("http://224.0.0.1/hook", "multicast"),
("http://240.0.0.1/hook", "reserved"),
],
)
def test_no_ssrf_url_field__internal_ip__raises_validation_error( # noqa: FT004
field: NoSSRFURLField,
url: str,
label: str,
) -> None:
# Given / When / Then
with pytest.raises(ValidationError) as exc_info:
field.run_validation(url)

assert "internal_address" in str(exc_info.value.detail)


def test_no_ssrf_url_field__localhost_hostname__raises_validation_error(
field: NoSSRFURLField,
) -> None:
# Given — localhost resolves to 127.0.0.1
# When / Then
with pytest.raises(ValidationError) as exc_info:
field.run_validation("http://localhost/hook")

assert "internal_address" in str(exc_info.value.detail)


def test_no_ssrf_url_field__hostname_resolving_to_private_ip__raises_validation_error(
field: NoSSRFURLField,
) -> None:
# Given — a hostname that resolves to an RFC1918 address
with mock.patch(
"webhooks.fields.socket.gethostbyname",
return_value="192.168.1.100",
):
# When / Then
with pytest.raises(ValidationError) as exc_info:
field.run_validation("http://internal.example.com/hook")

assert "internal_address" in str(exc_info.value.detail)


@pytest.mark.parametrize(
"url,label",
[
("https://example.com/hook", "public hostname"),
("https://hooks.example.org/path?foo=bar", "public hostname with path"),
("http://8.8.8.8/hook", "public IPv4"),
],
)
def test_no_ssrf_url_field__public_address__returns_value(
field: NoSSRFURLField,
url: str,
label: str,
) -> None:
# Given / When
result = field.run_validation(url)

# Then
assert result == url


def test_no_ssrf_url_field__unresolvable_hostname__returns_value(
field: NoSSRFURLField,
) -> None:
# Given — the hostname cannot be resolved; URL format is still valid
with mock.patch(
"webhooks.fields.socket.gethostbyname",
side_effect=socket.gaierror,
):
# When
result = field.run_validation("https://unresolvable.example.com/hook")

# Then
assert result == "https://unresolvable.example.com/hook"
48 changes: 48 additions & 0 deletions api/webhooks/fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import ipaddress
import socket
from urllib.parse import urlparse

from rest_framework import serializers


class NoSSRFURLField(serializers.URLField):
"""
A URL field that rejects URLs resolving to internal network addresses,
preventing Server-Side Request Forgery (SSRF) attacks.

Blocks loopback (127.0.0.0/8, ::1), RFC 1918 private ranges
(10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16), link-local
(169.254.0.0/16, fe80::/10), and other reserved/multicast ranges.
Hostnames are resolved to their IP address before checking.
"""

default_error_messages = {
**serializers.URLField.default_error_messages,
"internal_address": (
"Webhook URLs must not target internal or private network addresses."
),
}

def run_validators(self, value: str) -> None:
super().run_validators(value)

hostname = urlparse(value).hostname or ""

try:
ip = ipaddress.ip_address(hostname)
except ValueError:
# hostname is a name rather than a literal IP — resolve it.
try:
ip = ipaddress.ip_address(socket.gethostbyname(hostname))
except socket.gaierror:
# Unresolvable hostname; leave it to the URL validator.
return

if (
ip.is_loopback
or ip.is_private
or ip.is_link_local
or ip.is_reserved
or ip.is_multicast
):
self.fail("internal_address")
6 changes: 4 additions & 2 deletions api/webhooks/serializers.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from rest_framework import serializers

from webhooks.fields import NoSSRFURLField


class WebhookSerializer(serializers.Serializer[None]):
event_type = serializers.ChoiceField(choices=["FLAG_UPDATED", "AUDIT_LOG_CREATED"])
data = serializers.DictField() # type: ignore[assignment]


class WebhookURLSerializer(serializers.Serializer[None]):
url = serializers.URLField()
url = NoSSRFURLField()


class ScopeSerializer(serializers.Serializer[None]):
type = serializers.ChoiceField(choices=["organisation", "environment"])


class TestWebhookSerializer(serializers.Serializer[None]):
webhook_url = serializers.URLField(required=True)
webhook_url = NoSSRFURLField(required=True)
scope = ScopeSerializer(required=True)
secret = serializers.CharField(required=False, allow_blank=True, allow_null=True)

Expand Down
6 changes: 3 additions & 3 deletions api/webhooks/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test(self, request: Request) -> Response:
return Response(
{
"detail": "Webhook returned invalid status",
"body": response.text,
"body": "Please check the webhook endpoint to validate it returns a 200 OK.",
"status": response.status_code,
},
status=status.HTTP_400_BAD_REQUEST,
Expand All @@ -63,11 +63,11 @@ def test(self, request: Request) -> Response:
},
status=status.HTTP_200_OK,
)
except requests.exceptions.RequestException as e:
except requests.exceptions.RequestException:
return Response(
{
"detail": "Could not connect to webhook URL",
"body": str(e),
"body": "Please check the URL, and ensure it is valid and accessible from the server.",
},
status=status.HTTP_400_BAD_REQUEST,
)
Loading