From 811b716aa73d5e4df12ed7b928d2efe1095e53af Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 27 Aug 2025 13:02:56 -0400 Subject: [PATCH 1/5] fix(python_adapter): retrieve messages as Sequence --- .../adapters/arroyo/rust_step.py | 19 ++++++++-- sentry_streams/src/python_operator.rs | 38 +++++++++++++++++++ sentry_streams/uv.lock | 2 +- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_step.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_step.py index 07533827..a312651f 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_step.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_step.py @@ -1,5 +1,13 @@ from abc import ABC, abstractmethod -from typing import Callable, Generic, Iterable, MutableSequence, Set, Tuple, TypeVar +from typing import ( + Callable, + Generic, + Iterable, + MutableSequence, + Set, + Tuple, + TypeVar, +) from arroyo.dlq import InvalidMessage from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy @@ -299,7 +307,7 @@ def __should_send_watermark(self, watermark: PyWatermark) -> bool: return False return True - def __yield_messages(self) -> Iterable[Tuple[PipelineMessage, Committable]]: + def __yield_messages(self) -> MutableSequence[Tuple[PipelineMessage, Committable]]: """ Yields messages polled from the OutputRetriever, as well as any stored watermarks that can be sent after each message. @@ -310,14 +318,17 @@ def __yield_messages(self) -> Iterable[Tuple[PipelineMessage, Committable]]: Currently, if no new messages are received, watermarks will not be sent further down the pipeline from a delegate. """ # TODO: ensure watermarks leave the delegate in the same order they entered it + ret: MutableSequence[Tuple[PipelineMessage, Committable]] = [] + for message, committable in self.__retriever.fetch(): - yield (message, committable) + ret.append((message, committable)) self.__globbed_committable.update(committable) watermarks = self.__watermarks.copy() for wm in watermarks: if self.__should_send_watermark(wm): - yield (wm, wm.committable) + ret.append((wm, wm.committable)) self.__watermarks.remove(wm) + return ret def submit(self, message: PipelineMessage, committable: Committable) -> None: if isinstance(message, PyWatermark): diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 05d08ea8..430c2e71 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -545,4 +545,42 @@ class RustOperatorDelegateFactory: } // Unlock the MutexGuard around `actual_messages` }) } + + #[test] + fn test_arroyo_python_delegate() { + crate::testutils::initialize_python(); + traced_with_gil!(|py| { + let delegate = c_str!( + r#" +from sentry_streams.adapters.arroyo.rust_step import ArroyoStrategyDelegate, OutputRetriever +from arroyo.processing.strategies.run_task import RunTask + +class TestDelegateFactory: + def build(self): + in_noop = lambda msg, committable: msg + out_noop = lambda msg: (msg, {}) + retriever = OutputRetriever(out_transformer=out_noop) + inner = RunTask(lambda x: x, retriever) + return ArroyoStrategyDelegate(inner,in_noop,retriever) + "# + ); + let scope = PyModule::new(py, "test_scope").unwrap(); + py.run(delegate, Some(&scope.dict()), None).unwrap(); + let operator = scope.getattr("TestDelegateFactory").unwrap(); + let instance = operator.call0().unwrap(); + + let mut operator = PythonAdapter::new( + Route::new("source1".to_string(), vec!["waypoint1".to_string()]), + instance.unbind(), + Box::new(Noop {}), + ); + + let message = make_msg(py, "ok"); + let res = operator.submit(message); + assert!(res.is_ok()); + + let commit_request = operator.poll(); + assert!(commit_request.is_ok()); + }) + } } diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 019e1c29..aba7b713 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.26" +version = "0.0.27" source = { editable = "." } dependencies = [ { name = "click" }, From 3a6f3cdf1c3652069991264f1058d23ff035a417 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:37:50 -0400 Subject: [PATCH 2/5] better in/out transformers --- sentry_streams/src/python_operator.rs | 35 +++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 430c2e71..c22449d5 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -552,16 +552,41 @@ class RustOperatorDelegateFactory: traced_with_gil!(|py| { let delegate = c_str!( r#" -from sentry_streams.adapters.arroyo.rust_step import ArroyoStrategyDelegate, OutputRetriever from arroyo.processing.strategies.run_task import RunTask +from arroyo.types import Message as ArroyoMessage +from arroyo.types import Partition, Topic, Value +from sentry_streams.adapters.arroyo.rust_step import ArroyoStrategyDelegate, OutputRetriever +from sentry_streams.adapters.arroyo.multi_process_delegate import mapped_msg_to_rust +from sentry_streams.pipeline.message import PyMessage class TestDelegateFactory: + def rust_to_arroyo_msg(self, message, committable): + arroyo_committable = { + Partition(Topic(partition[0]), partition[1]): offset + for partition, offset in committable.items() + } + arroyo_msg = PyMessage( + message.payload, message.headers, message.timestamp, message.schema + ) + return ArroyoMessage( + Value( + arroyo_msg, + arroyo_committable, + datetime.fromtimestamp(message.timestamp) if message.timestamp else None, + ) + ) + + def arroyo_msg_to_rust(self, message): + committable = { + (partition.topic.name, partition.index): offset + for partition, offset in message.committable.items() + } + return (message.payload.to_inner(), committable) + def build(self): - in_noop = lambda msg, committable: msg - out_noop = lambda msg: (msg, {}) - retriever = OutputRetriever(out_transformer=out_noop) + retriever = OutputRetriever(out_transformer=self.arroyo_msg_to_rust) inner = RunTask(lambda x: x, retriever) - return ArroyoStrategyDelegate(inner,in_noop,retriever) + return ArroyoStrategyDelegate(inner,self.rust_to_arroyo_msg,retriever) "# ); let scope = PyModule::new(py, "test_scope").unwrap(); From 6af3a3268c1a048fd51b945f5583457f46e92db4 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Fri, 29 Aug 2025 15:52:13 -0400 Subject: [PATCH 3/5] port python code over to python --- .../adapters/arroyo/test_delegate.py | 65 +++++++++++++++++++ sentry_streams/src/python_operator.rs | 38 +---------- 2 files changed, 68 insertions(+), 35 deletions(-) create mode 100644 sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py diff --git a/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py new file mode 100644 index 00000000..2b913c07 --- /dev/null +++ b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py @@ -0,0 +1,65 @@ +from datetime import datetime +from typing import Tuple, Union, cast + +from arroyo.processing.strategies.run_task import RunTask +from arroyo.types import FilteredPayload +from arroyo.types import Message as ArroyoMessage +from arroyo.types import Partition, Topic, Value + +from sentry_streams.adapters.arroyo.rust_step import ( + ArroyoStrategyDelegate, + Committable, + OutputRetriever, + RustOperatorFactory, +) +from sentry_streams.pipeline.message import Message, PyMessage, RustMessage + + +def rust_to_arroyo_msg( + message: RustMessage, committable: Committable +) -> ArroyoMessage[Message[str]]: + arroyo_committable = { + Partition(Topic(partition[0]), partition[1]): offset + for partition, offset in committable.items() + } + arroyo_msg = cast( + Message[str], PyMessage(message.payload, message.headers, message.timestamp, message.schema) + ) + return ArroyoMessage( + Value( + arroyo_msg, + arroyo_committable, + datetime.fromtimestamp(message.timestamp) if message.timestamp else None, + ) + ) + + +def arroyo_msg_to_rust( + message: ArroyoMessage[Union[FilteredPayload, Message[str]]] +) -> Tuple[RustMessage, Committable] | None: + if isinstance(message.payload, FilteredPayload): + return None + committable = { + (partition.topic.name, partition.index): offset + for partition, offset in message.committable.items() + } + return (message.payload.to_inner(), committable) + + +def noop( + msg: ArroyoMessage[Union[FilteredPayload, Message[str]]] +) -> Union[FilteredPayload, Message[str]]: + return msg.payload + + +class TestDelegateFactory(RustOperatorFactory): + """ + A delegate used in rust_streams tests for the PythonAdapter step. + """ + + def build( + self, + ) -> ArroyoStrategyDelegate[FilteredPayload | Message[str], FilteredPayload | Message[str]]: + retriever = OutputRetriever(out_transformer=arroyo_msg_to_rust) + inner = RunTask(noop, retriever) + return ArroyoStrategyDelegate(inner, rust_to_arroyo_msg, retriever) diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index c22449d5..25110d64 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -272,6 +272,7 @@ mod tests { use crate::testutils::build_routed_value; use crate::testutils::make_committable; use pyo3::ffi::c_str; + use pyo3::BoundObject; use pyo3::IntoPyObjectExt; use sentry_arroyo::processing::strategies::noop::Noop; use std::collections::{BTreeMap, HashMap}; @@ -552,41 +553,8 @@ class RustOperatorDelegateFactory: traced_with_gil!(|py| { let delegate = c_str!( r#" -from arroyo.processing.strategies.run_task import RunTask -from arroyo.types import Message as ArroyoMessage -from arroyo.types import Partition, Topic, Value -from sentry_streams.adapters.arroyo.rust_step import ArroyoStrategyDelegate, OutputRetriever -from sentry_streams.adapters.arroyo.multi_process_delegate import mapped_msg_to_rust -from sentry_streams.pipeline.message import PyMessage - -class TestDelegateFactory: - def rust_to_arroyo_msg(self, message, committable): - arroyo_committable = { - Partition(Topic(partition[0]), partition[1]): offset - for partition, offset in committable.items() - } - arroyo_msg = PyMessage( - message.payload, message.headers, message.timestamp, message.schema - ) - return ArroyoMessage( - Value( - arroyo_msg, - arroyo_committable, - datetime.fromtimestamp(message.timestamp) if message.timestamp else None, - ) - ) - - def arroyo_msg_to_rust(self, message): - committable = { - (partition.topic.name, partition.index): offset - for partition, offset in message.committable.items() - } - return (message.payload.to_inner(), committable) - - def build(self): - retriever = OutputRetriever(out_transformer=self.arroyo_msg_to_rust) - inner = RunTask(lambda x: x, retriever) - return ArroyoStrategyDelegate(inner,self.rust_to_arroyo_msg,retriever) +from sentry_streams.rust_streams import PyAnyMessage +from sentry_streams.adapters.arroyo.test_delegate import TestDelegateFactory "# ); let scope = PyModule::new(py, "test_scope").unwrap(); From d2e113c44ecb3c4aeb55c6eaef52ae6ef61c7517 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Fri, 29 Aug 2025 16:29:40 -0400 Subject: [PATCH 4/5] AI fixes everything --- .../adapters/arroyo/test_delegate.py | 21 ++++++++----------- sentry_streams/src/messages.rs | 7 +++---- sentry_streams/src/python_operator.rs | 1 - 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py index 2b913c07..fab79d6d 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Tuple, Union, cast +from typing import Tuple, Union from arroyo.processing.strategies.run_task import RunTask from arroyo.types import FilteredPayload @@ -12,22 +12,19 @@ OutputRetriever, RustOperatorFactory, ) -from sentry_streams.pipeline.message import Message, PyMessage, RustMessage +from sentry_streams.pipeline.message import RustMessage def rust_to_arroyo_msg( message: RustMessage, committable: Committable -) -> ArroyoMessage[Message[str]]: +) -> ArroyoMessage[RustMessage]: arroyo_committable = { Partition(Topic(partition[0]), partition[1]): offset for partition, offset in committable.items() } - arroyo_msg = cast( - Message[str], PyMessage(message.payload, message.headers, message.timestamp, message.schema) - ) return ArroyoMessage( Value( - arroyo_msg, + message, arroyo_committable, datetime.fromtimestamp(message.timestamp) if message.timestamp else None, ) @@ -35,7 +32,7 @@ def rust_to_arroyo_msg( def arroyo_msg_to_rust( - message: ArroyoMessage[Union[FilteredPayload, Message[str]]] + message: ArroyoMessage[Union[FilteredPayload, RustMessage]], ) -> Tuple[RustMessage, Committable] | None: if isinstance(message.payload, FilteredPayload): return None @@ -43,12 +40,12 @@ def arroyo_msg_to_rust( (partition.topic.name, partition.index): offset for partition, offset in message.committable.items() } - return (message.payload.to_inner(), committable) + return (message.payload, committable) def noop( - msg: ArroyoMessage[Union[FilteredPayload, Message[str]]] -) -> Union[FilteredPayload, Message[str]]: + msg: ArroyoMessage[Union[FilteredPayload, RustMessage]], +) -> Union[FilteredPayload, RustMessage]: return msg.payload @@ -59,7 +56,7 @@ class TestDelegateFactory(RustOperatorFactory): def build( self, - ) -> ArroyoStrategyDelegate[FilteredPayload | Message[str], FilteredPayload | Message[str]]: + ) -> ArroyoStrategyDelegate[FilteredPayload | RustMessage, FilteredPayload | RustMessage]: retriever = OutputRetriever(out_transformer=arroyo_msg_to_rust) inner = RunTask(noop, retriever) return ArroyoStrategyDelegate(inner, rust_to_arroyo_msg, retriever) diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 320e6137..0e117c89 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -452,13 +452,12 @@ impl From> for PyStreamingMessage { fn from(value: Py) -> Self { traced_with_gil!(|py| { let bound = value.clone_ref(py).into_bound(py); - if bound.is_instance_of::() { - let content = bound.downcast::()?; + + if let Ok(content) = bound.downcast::() { Ok(PyStreamingMessage::PyAnyMessage { content: content.clone().unbind(), }) - } else if bound.is_instance_of::() { - let content = bound.downcast::()?; + } else if let Ok(content) = bound.downcast::() { Ok(PyStreamingMessage::RawMessage { content: content.clone().unbind(), }) diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 25110d64..78a75454 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -553,7 +553,6 @@ class RustOperatorDelegateFactory: traced_with_gil!(|py| { let delegate = c_str!( r#" -from sentry_streams.rust_streams import PyAnyMessage from sentry_streams.adapters.arroyo.test_delegate import TestDelegateFactory "# ); From 516f9e1351635f813e827f7b0d86411a00afd1d9 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 4 Sep 2025 15:20:23 -0400 Subject: [PATCH 5/5] clean up cursor --- sentry_streams/src/messages.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 22156a15..ff3484c0 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -453,12 +453,13 @@ impl From> for PyStreamingMessage { fn from(value: Py) -> Self { traced_with_gil!(|py| { let bound = value.clone_ref(py).into_bound(py); - - if let Ok(content) = bound.downcast::() { + if bound.is_instance_of::() { + let content = bound.downcast::()?; Ok(PyStreamingMessage::PyAnyMessage { content: content.clone().unbind(), }) - } else if let Ok(content) = bound.downcast::() { + } else if bound.is_instance_of::() { + let content = bound.downcast::()?; Ok(PyStreamingMessage::RawMessage { content: content.clone().unbind(), })