From d13a512ffbaedf1a35001268d214da0d2515b24a Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Thu, 14 May 2026 12:49:41 +0530 Subject: [PATCH 1/3] UN-3393 [FEAT] Add SanitizedSerializerMixin foundation for input validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation PR for the input-validation hardening plan (CWE-20). No behaviour change yet — nothing imports the new pre-mixed classes; the sweep that rewrites serializer imports lands in a follow-up PR. - New `utils.serializer.sanitization` exposes `SanitizedSerializerMixin` plus pre-mixed `ModelSerializer`, `Serializer`, and `HyperlinkedModelSerializer`. The mixin walks `self.fields` in `__init__` and attaches `validate_no_html_tags` to every writable `CharField`. Opt out per-serializer via `Meta.html_safe_fields = (...)` for fields that legitimately accept HTML-like content (e.g. prompt text, regex literals). Read-only fields are naturally exempt — DRF skips validators on `read_only=True`. - `utils.serializer.__init__` re-exports the pre-mixed classes alongside the existing `IntegrityErrorMixin`, so consumers can do `from utils.serializer import ModelSerializer`. - `input_sanitizer.validate_no_html_tags` now emits a structured `logger.warning("input_validation_rejected", extra={"field", "reason"})` in each rejection branch (`html_tag`, `js_protocol`, `event_handler`). Searchable in logs; no metric / alert wired. - New `cleanup_html_payloads` management command (under `backend/commands/...`) replaces stored pentest payloads in the columns flagged by the May 2026 prod scan: `custom_tool.tool_name`, `custom_tool.description`, `workflow.workflow_name`, `workflow.description`, `api_deployment.display_name`, `api_deployment.description`, `adapter_instance.adapter_name`. Default mode is dry-run; `--apply` performs the replacement inside a transaction. - 14 new mixin unit tests; existing 36 `test_input_sanitizer` tests unchanged and still pass (50 total). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../commands/cleanup_html_payloads.py | 118 ++++++++++++++++++ backend/utils/input_sanitizer.py | 29 ++++- backend/utils/serializer/__init__.py | 15 +++ backend/utils/serializer/sanitization.py | 55 ++++++++ .../tests/test_sanitized_serializer_mixin.py | 113 +++++++++++++++++ 5 files changed, 327 insertions(+), 3 deletions(-) create mode 100644 backend/commands/management/commands/cleanup_html_payloads.py create mode 100644 backend/utils/serializer/__init__.py create mode 100644 backend/utils/serializer/sanitization.py create mode 100644 backend/utils/tests/test_sanitized_serializer_mixin.py diff --git a/backend/commands/management/commands/cleanup_html_payloads.py b/backend/commands/management/commands/cleanup_html_payloads.py new file mode 100644 index 0000000000..6f23270544 --- /dev/null +++ b/backend/commands/management/commands/cleanup_html_payloads.py @@ -0,0 +1,118 @@ +"""Replace stored HTML/JS payloads in name/description columns. + +Targets the columns flagged by the May 2026 prod scan against US prod, where +all matches were pentest leftovers. See KB +`Obsidian Vault/zipstuff/UN-3393-input-validation/05-prod-baseline.md` for the +scan methodology. + +Default mode is `--dry-run`: print id + table + column + truncated value for +every row that would be replaced. Pass `--apply` to perform the replacement +inside a single transaction. The replacement value is `REPLACEMENT`. + +Usage: + python manage.py cleanup_html_payloads # dry-run + python manage.py cleanup_html_payloads --apply # apply +""" + +from django.core.management.base import BaseCommand +from django.db import transaction +from utils.input_sanitizer import ( + EVENT_HANDLER_PATTERN, + HTML_TAG_PATTERN, + JS_PROTOCOL_PATTERN, +) + +REPLACEMENT = "[removed-invalid-content]" +PREVIEW_CHARS = 160 + +# (app_label, model_name, column_name) — resolved dynamically to keep this +# command independent of import-order between apps and pluggable_apps. +TARGETS: tuple[tuple[str, str, str], ...] = ( + ("prompt_studio_core_v2", "CustomTool", "tool_name"), + ("prompt_studio_core_v2", "CustomTool", "description"), + ("workflow_v2", "Workflow", "workflow_name"), + ("workflow_v2", "Workflow", "description"), + ("api_v2", "APIDeployment", "display_name"), + ("api_v2", "APIDeployment", "description"), + ("adapter_processor_v2", "AdapterInstance", "adapter_name"), +) + + +def _has_payload(value: str | None) -> bool: + if not value: + return False + return bool( + HTML_TAG_PATTERN.search(value) + or JS_PROTOCOL_PATTERN.search(value) + or EVENT_HANDLER_PATTERN.search(value) + ) + + +class Command(BaseCommand): + help = ( + "Replace stored HTML/JS payloads in known name/description columns " + "with a redaction marker. Default mode is --dry-run." + ) + + def add_arguments(self, parser): + parser.add_argument( + "--apply", + action="store_true", + help="Perform the replacement. Without this flag, the command only prints candidates.", + ) + + def handle(self, *args, **opts): + from django.apps import apps + + apply = opts["apply"] + total = 0 + replaced = 0 + + for app_label, model_name, column in TARGETS: + try: + model = apps.get_model(app_label, model_name) + except LookupError: + self.stderr.write(f"[skip] {app_label}.{model_name} not installed") + continue + + qs = model.objects.exclude(**{f"{column}__isnull": True}).exclude( + **{column: ""} + ) + matches: list[tuple[object, str]] = [] + for obj in qs.only("id", column).iterator(): + value = getattr(obj, column) + if _has_payload(value): + matches.append((obj.id, value)) + + if not matches: + continue + + label = f"{app_label}.{model_name}.{column}" + self.stdout.write(f"\n=== {label}: {len(matches)} match(es) ===") + for obj_id, value in matches: + preview = value[:PREVIEW_CHARS].replace("\n", " ") + self.stdout.write(f" id={obj_id} value={preview!r}") + total += len(matches) + + if apply: + with transaction.atomic(): + updated = model.objects.filter( + id__in=[obj_id for obj_id, _ in matches] + ).update(**{column: REPLACEMENT}) + replaced += updated + self.stdout.write( + self.style.SUCCESS(f" applied: replaced {updated} row(s)") + ) + + self.stdout.write("") + if apply: + self.stdout.write( + self.style.SUCCESS( + f"Done. Replaced {replaced} row(s) across {len(TARGETS)} target column(s)." + ) + ) + else: + self.stdout.write( + f"Dry run. {total} candidate row(s) across {len(TARGETS)} target column(s). " + "Re-run with --apply to perform the replacement." + ) diff --git a/backend/utils/input_sanitizer.py b/backend/utils/input_sanitizer.py index 36772727e6..01e484d7c8 100644 --- a/backend/utils/input_sanitizer.py +++ b/backend/utils/input_sanitizer.py @@ -1,7 +1,10 @@ +import logging import re from rest_framework.serializers import ValidationError +logger = logging.getLogger(__name__) + # Pattern to detect HTML/script tags (closed tags and unclosed tags starting with a letter) # The second alternative catches unclosed tags like " None: + logger.warning( + "input_validation_rejected", + extra={"field": field_name, "reason": reason}, + ) + raise ValidationError(message) + + def validate_no_html_tags(value: str, field_name: str = "This field") -> str: """Reject values containing HTML/script tags.""" if HTML_TAG_PATTERN.search(value): - raise ValidationError(f"{field_name} must not contain HTML or script tags.") + _reject( + field_name, + "html_tag", + f"{field_name} must not contain HTML or script tags.", + ) if JS_PROTOCOL_PATTERN.search(value): - raise ValidationError(f"{field_name} must not contain dangerous URI protocols.") + _reject( + field_name, + "js_protocol", + f"{field_name} must not contain dangerous URI protocols.", + ) if EVENT_HANDLER_PATTERN.search(value): - raise ValidationError(f"{field_name} must not contain event handler attributes.") + _reject( + field_name, + "event_handler", + f"{field_name} must not contain event handler attributes.", + ) return value diff --git a/backend/utils/serializer/__init__.py b/backend/utils/serializer/__init__.py new file mode 100644 index 0000000000..0f2b61acb5 --- /dev/null +++ b/backend/utils/serializer/__init__.py @@ -0,0 +1,15 @@ +from utils.serializer.integrity_error_mixin import IntegrityErrorMixin +from utils.serializer.sanitization import ( + HyperlinkedModelSerializer, + ModelSerializer, + SanitizedSerializerMixin, + Serializer, +) + +__all__ = [ + "HyperlinkedModelSerializer", + "IntegrityErrorMixin", + "ModelSerializer", + "SanitizedSerializerMixin", + "Serializer", +] diff --git a/backend/utils/serializer/sanitization.py b/backend/utils/serializer/sanitization.py new file mode 100644 index 0000000000..3c53ed73d4 --- /dev/null +++ b/backend/utils/serializer/sanitization.py @@ -0,0 +1,55 @@ +"""Sanitized serializer base classes. + +`SanitizedSerializerMixin` attaches `validate_no_html_tags` to every writable +`CharField` on the serializer, so new serializers are protected from stored +XSS without per-field wiring. Opt out per-serializer via +`Meta.html_safe_fields = (...)` for fields that legitimately accept HTML-like +content (e.g. prompt text, regex literals). + +Use the pre-mixed `ModelSerializer` / `Serializer` / `HyperlinkedModelSerializer` +classes instead of importing from `rest_framework` directly: + + from utils.serializer import ModelSerializer + + class FooSerializer(ModelSerializer): + class Meta: + model = Foo + fields = ["name", "description", "prompt"] + html_safe_fields = ("prompt",) # opt-out +""" + +from functools import partial + +from rest_framework import serializers as drf + +from utils.input_sanitizer import validate_no_html_tags + + +class SanitizedSerializerMixin: + """Attach `validate_no_html_tags` to every writable CharField.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + meta = getattr(self, "Meta", None) + exempt = set(getattr(meta, "html_safe_fields", ()) or ()) + for name, field in self.fields.items(): + if name in exempt or field.read_only: + continue + if isinstance(field, drf.CharField): + # partial binds the field name at iteration time, avoiding the + # late-binding closure trap of a bare lambda. + field.validators.append(partial(validate_no_html_tags, field_name=name)) + + +class ModelSerializer(SanitizedSerializerMixin, drf.ModelSerializer): + pass + + +class Serializer(SanitizedSerializerMixin, drf.Serializer): + pass + + +class HyperlinkedModelSerializer( + SanitizedSerializerMixin, drf.HyperlinkedModelSerializer +): + pass diff --git a/backend/utils/tests/test_sanitized_serializer_mixin.py b/backend/utils/tests/test_sanitized_serializer_mixin.py new file mode 100644 index 0000000000..20f4833a2a --- /dev/null +++ b/backend/utils/tests/test_sanitized_serializer_mixin.py @@ -0,0 +1,113 @@ +import pytest +from rest_framework import serializers as drf +from rest_framework.exceptions import ValidationError + +from utils.serializer import ( + HyperlinkedModelSerializer, + ModelSerializer, + SanitizedSerializerMixin, + Serializer, +) + + +class PlainSerializer(Serializer): + name = drf.CharField(max_length=100) + description = drf.CharField(max_length=500, allow_blank=True) + count = drf.IntegerField(required=False) + + +class WithOptOutSerializer(Serializer): + name = drf.CharField(max_length=100) + prompt = drf.CharField() + + class Meta: + html_safe_fields = ("prompt",) + + +class WithReadOnlySerializer(Serializer): + name = drf.CharField(max_length=100) + computed = drf.CharField(read_only=True) + + +class TestSanitizedSerializerMixin: + def test_rejects_html_in_writable_charfield(self): + s = PlainSerializer(data={"name": "", "description": ""}) + assert not s.is_valid() + assert "name" in s.errors + + def test_rejects_html_in_description(self): + s = PlainSerializer(data={"name": "ok", "description": ""}) + assert not s.is_valid() + assert "description" in s.errors + + def test_clean_input_passes(self): + s = PlainSerializer(data={"name": "My Workflow", "description": "Plain text."}) + assert s.is_valid(), s.errors + + def test_does_not_touch_non_charfield(self): + s = PlainSerializer(data={"name": "ok", "description": "", "count": 42}) + assert s.is_valid(), s.errors + + def test_each_field_gets_its_own_validator(self): + """Default-arg closure capture: the field_name in the error must match the offender.""" + s = PlainSerializer(data={"name": "ok", "description": ""}) + assert not s.is_valid() + assert "description" in s.errors + msg = str(s.errors["description"][0]) + assert "description" in msg.lower() + + def test_html_safe_fields_opts_out(self): + s = WithOptOutSerializer( + data={"name": "ok", "prompt": "step 1"} + ) + assert s.is_valid(), s.errors + + def test_html_safe_fields_does_not_leak_to_other_fields(self): + s = WithOptOutSerializer( + data={"name": "