diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f3a63bff..78b6b3eb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,6 +43,8 @@ repos: types-pyYAML, types-jsonschema, "sentry-kafka-schemas>=1.2.0", + "types-protobuf", + "sentry-protos>=0.1.74" ] files: ^sentry_streams/.+ - repo: https://github.com/pycqa/isort diff --git a/sentry_streams/pyproject.toml b/sentry_streams/pyproject.toml index f79447a3..852c43fa 100644 --- a/sentry_streams/pyproject.toml +++ b/sentry_streams/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "pyyaml>=6.0.2", "jsonschema>=4.23.0", "sentry-kafka-schemas>=1.2.0", + "types-protobuf>=6.30.2.20250516", ] [dependency-groups] diff --git a/sentry_streams/sentry_streams/examples/batching.py b/sentry_streams/sentry_streams/examples/batching.py index 4bc55cdb..ad07fefc 100644 --- a/sentry_streams/sentry_streams/examples/batching.py +++ b/sentry_streams/sentry_streams/examples/batching.py @@ -3,6 +3,7 @@ from sentry_streams.pipeline import Batch, FlatMap, streaming_source from sentry_streams.pipeline.batch import unbatch from sentry_streams.pipeline.chain import Parser, Serializer +from sentry_streams.pipeline.message import MessageSchema pipeline = streaming_source( name="myinput", @@ -19,6 +20,6 @@ FlatMap(function=unbatch), ) -chain3 = chain2.apply("serializer", Serializer()).sink( +chain3 = chain2.apply("serializer", Serializer(schema_type=MessageSchema.JSON)).sink( "kafkasink2", stream_name="transformed-events" ) # flush the batches to the Sink diff --git a/sentry_streams/sentry_streams/examples/blq.py b/sentry_streams/sentry_streams/examples/blq.py index a13b8531..8655e1fb 100644 --- a/sentry_streams/sentry_streams/examples/blq.py +++ b/sentry_streams/sentry_streams/examples/blq.py @@ -6,10 +6,11 @@ ) from sentry_streams.pipeline import segment, streaming_source from sentry_streams.pipeline.chain import Parser, Serializer +from sentry_streams.pipeline.message import MessageSchema storage_branch = ( segment(name="recent", msg_type=IngestMetric) - .apply("serializer1", Serializer()) + .apply("serializer1", Serializer(schema_type=MessageSchema.JSON)) .broadcast( "send_message_to_DBs", routes=[ @@ -25,7 +26,7 @@ save_delayed_message = ( segment(name="delayed", msg_type=IngestMetric) - .apply("serializer2", Serializer()) + .apply("serializer2", Serializer(schema_type=MessageSchema.JSON)) .sink( "kafkasink3", stream_name="transformed-events-3", diff --git a/sentry_streams/sentry_streams/examples/multi_chain.py b/sentry_streams/sentry_streams/examples/multi_chain.py index 22ae3060..739ab91f 100644 --- a/sentry_streams/sentry_streams/examples/multi_chain.py +++ b/sentry_streams/sentry_streams/examples/multi_chain.py @@ -2,7 +2,7 @@ from sentry_streams.pipeline import Map, multi_chain, streaming_source from sentry_streams.pipeline.chain import Parser, Serializer -from sentry_streams.pipeline.message import Message +from sentry_streams.pipeline.message import Message, MessageSchema def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]: @@ -16,12 +16,12 @@ def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]: streaming_source("ingest", stream_name="ingest-metrics") .apply("parse_msg", Parser(msg_type=IngestMetric)) .apply("process", Map(do_something)) - .apply("serialize", Serializer()) + .apply("serialize", Serializer(schema_type=MessageSchema.JSON)) .sink("eventstream", stream_name="events"), # Snuba chain to Clickhouse streaming_source("snuba", stream_name="ingest-metrics") .apply("snuba_parse_msg", Parser(msg_type=IngestMetric)) - .apply("snuba_serialize", Serializer()) + .apply("snuba_serialize", Serializer(schema_type=MessageSchema.JSON)) .sink( "clickhouse", stream_name="someewhere", @@ -29,7 +29,7 @@ def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]: # Super Big Consumer chain streaming_source("sbc", stream_name="ingest-metrics") .apply("sbc_parse_msg", Parser(msg_type=IngestMetric)) - .apply("sbc_serialize", Serializer()) + .apply("sbc_serialize", Serializer(schema_type=MessageSchema.JSON)) .sink( "sbc_sink", stream_name="someewhere", @@ -38,7 +38,7 @@ def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]: streaming_source("post_process", stream_name="ingest-metrics") .apply("post_parse_msg", Parser(msg_type=IngestMetric)) .apply("postprocess", Map(do_something)) - .apply("postprocess_serialize", Serializer()) + .apply("postprocess_serialize", Serializer(schema_type=MessageSchema.JSON)) .sink( "devnull", stream_name="someewhereelse", diff --git a/sentry_streams/sentry_streams/examples/transformer.py b/sentry_streams/sentry_streams/examples/transformer.py index 7ed6883b..f33e67c3 100644 --- a/sentry_streams/sentry_streams/examples/transformer.py +++ b/sentry_streams/sentry_streams/examples/transformer.py @@ -10,7 +10,7 @@ Serializer, ) from sentry_streams.pipeline.function_template import Accumulator -from sentry_streams.pipeline.message import Message +from sentry_streams.pipeline.message import Message, MessageSchema from sentry_streams.pipeline.window import SlidingWindow # The simplest possible pipeline. @@ -59,7 +59,7 @@ def merge(self, other: Self) -> Self: chain3 = chain2.apply( "serializer", - Serializer(), # pass in the standard message serializer function + Serializer(schema_type=MessageSchema.JSON), # pass in the standard message serializer function ) # ExtensibleChain[bytes] chain4 = chain3.sink( diff --git a/sentry_streams/sentry_streams/pipeline/chain.py b/sentry_streams/sentry_streams/pipeline/chain.py index 3cbad9ce..10682fc1 100644 --- a/sentry_streams/sentry_streams/pipeline/chain.py +++ b/sentry_streams/sentry_streams/pipeline/chain.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass +from functools import partial from typing import ( Callable, Generic, @@ -23,7 +24,7 @@ InputType, OutputType, ) -from sentry_streams.pipeline.message import Message +from sentry_streams.pipeline.message import Message, MessageSchema from sentry_streams.pipeline.msg_parser import msg_parser, msg_serializer from sentry_streams.pipeline.pipeline import ( Aggregate, @@ -151,12 +152,14 @@ class Serializer(Applier[Message[TIn], bytes], Generic[TIn]): sink step which writes to Kafka. """ + schema_type: MessageSchema + def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step: return MapStep( name=name, ctx=ctx, inputs=[previous], - function=msg_serializer, + function=partial(msg_serializer, schema_type=self.schema_type), ) diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index 9b652015..eb990eb4 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass +from enum import Enum from typing import ( Any, Generic, @@ -15,6 +16,11 @@ TIn = TypeVar("TIn") # TODO: Consider naming this TPayload +class MessageSchema(Enum): + PROTOBUF = "protobuf" + JSON = "json" + + # A message with a generic payload @dataclass(frozen=True) class Message(Generic[TIn]): diff --git a/sentry_streams/sentry_streams/pipeline/msg_parser.py b/sentry_streams/sentry_streams/pipeline/msg_parser.py index 9e068818..937b3717 100644 --- a/sentry_streams/sentry_streams/pipeline/msg_parser.py +++ b/sentry_streams/sentry_streams/pipeline/msg_parser.py @@ -1,7 +1,9 @@ import json from typing import Any -from sentry_streams.pipeline.message import Message +from google.protobuf.message import Message as ProtoMessage + +from sentry_streams.pipeline.message import Message, MessageSchema # TODO: Push the following to docs # Standard message decoders and encoders live here @@ -21,7 +23,13 @@ def msg_parser(msg: Message[bytes]) -> Any: return decoded -def msg_serializer(msg: Message[Any]) -> bytes: +def msg_serializer(msg: Message[Any], schema_type: MessageSchema) -> bytes: payload = msg.payload - return json.dumps(payload).encode("utf-8") + if schema_type is MessageSchema.PROTOBUF: + assert isinstance(payload, ProtoMessage) + return payload.SerializeToString() + elif schema_type is MessageSchema.JSON: + return json.dumps(payload).encode("utf-8") + else: + raise Exception(f"Unknown codec / message schema type {schema_type}") diff --git a/sentry_streams/tests/adapters/arroyo/conftest.py b/sentry_streams/tests/adapters/arroyo/conftest.py index 65f594af..1e7ac6ff 100644 --- a/sentry_streams/tests/adapters/arroyo/conftest.py +++ b/sentry_streams/tests/adapters/arroyo/conftest.py @@ -8,6 +8,7 @@ from arroyo.types import Topic from arroyo.utils.clock import MockedClock from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric +from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation from sentry_streams.pipeline.chain import ( Filter, @@ -19,13 +20,10 @@ streaming_source, ) from sentry_streams.pipeline.function_template import Accumulator -from sentry_streams.pipeline.message import Message +from sentry_streams.pipeline.message import Message, MessageSchema from sentry_streams.pipeline.pipeline import Pipeline from sentry_streams.pipeline.window import SlidingWindow -# def decode(msg: bytes) -> str: -# return msg.decode("utf-8") - def basic_map(msg: Message[IngestMetric]) -> IngestMetric: payload = msg.payload @@ -42,6 +40,7 @@ def broker() -> LocalBroker[KafkaPayload]: broker.create_topic(Topic("transformed-events"), 1) broker.create_topic(Topic("transformed-events-2"), 1) broker.create_topic(Topic("ingest-metrics"), 1) + broker.create_topic(Topic("taskworker-output"), 1) return broker @@ -112,7 +111,7 @@ def pipeline() -> Pipeline: .apply("decoder", Parser(msg_type=IngestMetric)) .apply("myfilter", Filter(lambda msg: msg.payload["type"] == "s")) .apply("mymap", Map(basic_map)) - .apply("serializer", Serializer()) + .apply("serializer", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink", stream_name="transformed-events") ) @@ -130,7 +129,7 @@ def reduce_pipeline(transformer: Callable[[], TestTransformerBatch]) -> Pipeline .apply("decoder", Parser(msg_type=IngestMetric)) .apply("mymap", Map(basic_map)) .apply("myreduce", Reducer(reduce_window, transformer)) - .apply("serializer", Serializer()) + .apply("serializer", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink", stream_name="transformed-events") ) @@ -141,12 +140,12 @@ def reduce_pipeline(transformer: Callable[[], TestTransformerBatch]) -> Pipeline def router_pipeline() -> Pipeline: branch_1 = ( segment("set_branch", IngestMetric) - .apply("serializer", Serializer()) + .apply("serializer", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink1", stream_name="transformed-events") ) branch_2 = ( segment("not_set_branch", IngestMetric) - .apply("serializer2", Serializer()) + .apply("serializer2", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink2", stream_name="transformed-events-2") ) @@ -174,13 +173,13 @@ def broadcast_pipeline() -> Pipeline: branch_1 = ( segment("even_branch", IngestMetric) .apply("mymap1", Map(basic_map)) - .apply("serializer", Serializer()) + .apply("serializer", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink1", stream_name="transformed-events") ) branch_2 = ( segment("odd_branch", IngestMetric) .apply("mymap2", Map(basic_map)) - .apply("serializer2", Serializer()) + .apply("serializer2", Serializer(schema_type=MessageSchema.JSON)) .sink("kafkasink2", stream_name="transformed-events-2") ) @@ -200,3 +199,32 @@ def broadcast_pipeline() -> Pipeline: ) return pipeline + + +@pytest.fixture +def basic_proto_pipeline() -> Pipeline: + + pipeline = streaming_source( + name="myinput", stream_name="taskworker-ingest" + ) # ExtensibleChain[Message[bytes]] + + chain1 = pipeline.apply( + "parser", + Parser( + msg_type=TaskActivation, + ), # pass in the standard message parser function + ) # ExtensibleChain[Message[TaskActivation]] + + chain2 = chain1.apply( + "serializer", + Serializer( + schema_type=MessageSchema.PROTOBUF + ), # pass in the standard message serializer function + ) # ExtensibleChain[bytes] + + chain2.sink( + "kafkasink2", + stream_name="taskworker-output", + ) # Chain + + return pipeline diff --git a/sentry_streams/tests/adapters/arroyo/message_helpers.py b/sentry_streams/tests/adapters/arroyo/message_helpers.py index 67f5018a..3b843a96 100644 --- a/sentry_streams/tests/adapters/arroyo/message_helpers.py +++ b/sentry_streams/tests/adapters/arroyo/message_helpers.py @@ -67,13 +67,13 @@ def make_value_msg( def make_kafka_msg( - payload: str, + payload: bytes, topic: str, offset: int, ) -> Message[Any]: return Message( BrokerValue( - payload=KafkaPayload(None, payload.encode("utf-8"), []), + payload=KafkaPayload(None, payload, []), partition=Partition(Topic(topic), 0), offset=offset, timestamp=datetime.now(), diff --git a/sentry_streams/tests/adapters/arroyo/test_consumer.py b/sentry_streams/tests/adapters/arroyo/test_consumer.py index e7db0dce..c3490fe1 100644 --- a/sentry_streams/tests/adapters/arroyo/test_consumer.py +++ b/sentry_streams/tests/adapters/arroyo/test_consumer.py @@ -90,11 +90,11 @@ def test_single_route( counter_metric = deepcopy(metric) counter_metric["type"] = "c" - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(counter_metric), "ingest-metrics", 2)) + strategy.submit(make_kafka_msg(json.dumps(counter_metric).encode("utf-8"), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 3)) strategy.poll() topic = Topic("transformed-events") @@ -186,11 +186,11 @@ def test_broadcast( commit = mock.Mock(spec=Commit) strategy = factory.create_with_partitions(commit, {Partition(Topic("ingest-metrics"), 0): 0}) - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 2)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 3)) strategy.poll() topics = [Topic("transformed-events"), Topic("transformed-events-2")] @@ -265,11 +265,11 @@ def test_multiple_routes( counter_metric = deepcopy(metric) counter_metric["type"] = "c" - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 0)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 0)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(counter_metric), "ingest-metrics", 2)) + strategy.submit(make_kafka_msg(json.dumps(counter_metric).encode("utf-8"), "ingest-metrics", 2)) strategy.poll() - strategy.submit(make_kafka_msg(json.dumps(metric), "ingest-metrics", 3)) + strategy.submit(make_kafka_msg(json.dumps(metric).encode("utf-8"), "ingest-metrics", 3)) strategy.poll() topic = Topic("transformed-events") # for set messages @@ -360,7 +360,9 @@ def test_standard_reduce( # Accumulators: [0,1] [2,3] [4,5] [6,7] [8,9] for i in range(6): with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(json.dumps(messages[i]), "ingest-metrics", i)) + strategy.submit( + make_kafka_msg(json.dumps(messages[i]).encode("utf-8"), "ingest-metrics", i) + ) # Last submit was at T+10, which means we've only flushed the first 3 windows @@ -418,7 +420,9 @@ def test_standard_reduce( # Submit data at T+24, T+26 (data comes in at a gap) for i in range(12, 14): with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(json.dumps(messages[i - 12]), "ingest-metrics", i)) + strategy.submit( + make_kafka_msg(json.dumps(messages[i - 12]).encode("utf-8"), "ingest-metrics", i) + ) transformed_msgs = [] for i in range(12, 14): @@ -563,7 +567,9 @@ def test_reduce_with_gap( # Accumulators: [0,1] [2,3] [4,5] [6,7] [8,9] for i in range(6): with mock.patch("time.time", return_value=cur_time + 2 * i): - strategy.submit(make_kafka_msg(json.dumps(messages[i]), "ingest-metrics", i)) + strategy.submit( + make_kafka_msg(json.dumps(messages[i]).encode("utf-8"), "ingest-metrics", i) + ) # Last submit was at T+10, which means we've only flushed the first 3 windows diff --git a/sentry_streams/tests/adapters/arroyo/test_proto_pipelines.py b/sentry_streams/tests/adapters/arroyo/test_proto_pipelines.py new file mode 100644 index 00000000..59569109 --- /dev/null +++ b/sentry_streams/tests/adapters/arroyo/test_proto_pipelines.py @@ -0,0 +1,92 @@ +from typing import cast +from unittest import mock +from unittest.mock import call + +from arroyo.backends.kafka.consumer import KafkaPayload +from arroyo.backends.local.backend import LocalBroker +from arroyo.types import Commit, Partition, Topic +from sentry_kafka_schemas import get_codec +from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation + +from sentry_streams.adapters.arroyo.consumer import ( + ArroyoConsumer, + ArroyoStreamingFactory, +) +from sentry_streams.adapters.arroyo.routes import Route +from sentry_streams.adapters.arroyo.steps import MapStep, StreamSinkStep +from sentry_streams.pipeline.message import Message, MessageSchema +from sentry_streams.pipeline.msg_parser import msg_serializer +from sentry_streams.pipeline.pipeline import Map, Pipeline +from tests.adapters.arroyo.message_helpers import make_kafka_msg + +PROTO_SCHEMA = get_codec("taskworker-ingest") +ACTIVATION_MSG = TaskActivation() + + +def test_msg_serializer() -> None: + activation_msg = ACTIVATION_MSG + msg = Message(payload=activation_msg, headers=[], timestamp=123.0, schema=PROTO_SCHEMA) + serialized = msg_serializer(msg, MessageSchema.PROTOBUF) + + assert serialized == activation_msg.SerializeToString() + + +def test_simple_pipeline( + broker: LocalBroker[KafkaPayload], + basic_proto_pipeline: Pipeline, +) -> None: + """ + Test the creation of an Arroyo Consumer from a number of + pipeline steps. + """ + route = Route(source="source1", waypoints=[]) + + consumer = ArroyoConsumer( + source="source1", stream_name="taskworker-ingest", schema=PROTO_SCHEMA + ) + consumer.add_step( + MapStep( + route=route, + pipeline_step=cast(Map, basic_proto_pipeline.steps["parser"]), + ) + ) + consumer.add_step( + MapStep( + route=route, + pipeline_step=cast(Map, basic_proto_pipeline.steps["serializer"]), + ) + ) + consumer.add_step( + StreamSinkStep( + route=route, + producer=broker.get_producer(), + topic_name="taskworker-output", + ) + ) + + factory = ArroyoStreamingFactory(consumer) + commit = mock.Mock(spec=Commit) + strategy = factory.create_with_partitions(commit, {Partition(Topic("taskworker-ingest"), 0): 0}) + + strategy.submit(make_kafka_msg(ACTIVATION_MSG.SerializeToString(), "taskworker-ingest", 0)) + strategy.poll() + strategy.submit(make_kafka_msg(ACTIVATION_MSG.SerializeToString(), "taskworker-ingest", 1)) + strategy.poll() + + topic = Topic("taskworker-output") + msg1 = broker.consume(Partition(topic, 0), 0) + assert msg1 is not None and msg1.payload.value == ACTIVATION_MSG.SerializeToString() + msg2 = broker.consume(Partition(topic, 0), 1) + assert msg2 is not None and msg2.payload.value == ACTIVATION_MSG.SerializeToString() + assert broker.consume(Partition(topic, 0), 2) is None + + commit.assert_has_calls( + [ + call({}), + call({Partition(Topic("taskworker-ingest"), 0): 1}), + call({}), + call({}), + call({Partition(Topic("taskworker-ingest"), 0): 2}), + call({}), + ], + ) diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index d101d00d..c385ba67 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -743,6 +743,7 @@ dependencies = [ { name = "requests" }, { name = "sentry-arroyo" }, { name = "sentry-kafka-schemas" }, + { name = "types-protobuf" }, ] [package.dev-dependencies] @@ -769,6 +770,7 @@ requires-dist = [ { name = "requests", specifier = ">=2.32.3" }, { name = "sentry-arroyo", specifier = ">=2.18.2" }, { name = "sentry-kafka-schemas", specifier = ">=1.2.0" }, + { name = "types-protobuf", specifier = ">=6.30.2.20250516" }, ] [package.metadata.requires-dev] @@ -930,6 +932,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/91/64/4b2fba8b7cb0104ba013f2a1bf6f39a98e927e14befe1ef947d373b25218/types_jsonschema-4.23.0.20241208-py3-none-any.whl", hash = "sha256:87934bd9231c99d8eff94cacfc06ba668f7973577a9bd9e1f9de957c5737313e", size = 15021 }, ] +[[package]] +name = "types-protobuf" +version = "6.30.2.20250516" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ac/6c/5cf088aaa3927d1cc39910f60f220f5ff573ab1a6485b2836e8b26beb58c/types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41", size = 62254 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/66/06a9c161f5dd5deb4f5c016ba29106a8f1903eb9a1ba77d407dd6588fecb/types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722", size = 76480 }, +] + [[package]] name = "types-pyyaml" version = "6.0.12.20250402"