diff --git a/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py new file mode 100644 index 00000000..fab79d6d --- /dev/null +++ b/sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py @@ -0,0 +1,62 @@ +from datetime import datetime +from typing import Tuple, Union + +from arroyo.processing.strategies.run_task import RunTask +from arroyo.types import FilteredPayload +from arroyo.types import Message as ArroyoMessage +from arroyo.types import Partition, Topic, Value + +from sentry_streams.adapters.arroyo.rust_step import ( + ArroyoStrategyDelegate, + Committable, + OutputRetriever, + RustOperatorFactory, +) +from sentry_streams.pipeline.message import RustMessage + + +def rust_to_arroyo_msg( + message: RustMessage, committable: Committable +) -> ArroyoMessage[RustMessage]: + arroyo_committable = { + Partition(Topic(partition[0]), partition[1]): offset + for partition, offset in committable.items() + } + return ArroyoMessage( + Value( + message, + arroyo_committable, + datetime.fromtimestamp(message.timestamp) if message.timestamp else None, + ) + ) + + +def arroyo_msg_to_rust( + message: ArroyoMessage[Union[FilteredPayload, RustMessage]], +) -> Tuple[RustMessage, Committable] | None: + if isinstance(message.payload, FilteredPayload): + return None + committable = { + (partition.topic.name, partition.index): offset + for partition, offset in message.committable.items() + } + return (message.payload, committable) + + +def noop( + msg: ArroyoMessage[Union[FilteredPayload, RustMessage]], +) -> Union[FilteredPayload, RustMessage]: + return msg.payload + + +class TestDelegateFactory(RustOperatorFactory): + """ + A delegate used in rust_streams tests for the PythonAdapter step. + """ + + def build( + self, + ) -> ArroyoStrategyDelegate[FilteredPayload | RustMessage, FilteredPayload | RustMessage]: + retriever = OutputRetriever(out_transformer=arroyo_msg_to_rust) + inner = RunTask(noop, retriever) + return ArroyoStrategyDelegate(inner, rust_to_arroyo_msg, retriever) diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 0dc6ff54..27ac1e3b 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -283,6 +283,7 @@ mod tests { use crate::testutils::build_routed_value; use crate::testutils::make_committable; use pyo3::ffi::c_str; + use pyo3::BoundObject; use pyo3::IntoPyObjectExt; use sentry_arroyo::processing::strategies::noop::Noop; use std::collections::{BTreeMap, HashMap}; @@ -521,4 +522,33 @@ class RustOperatorDelegateFactory: } // Unlock the MutexGuard around `actual_messages` }) } + + #[test] + fn test_arroyo_python_delegate() { + crate::testutils::initialize_python(); + traced_with_gil!(|py| { + let delegate = c_str!( + r#" +from sentry_streams.adapters.arroyo.test_delegate import TestDelegateFactory + "# + ); + let scope = PyModule::new(py, "test_scope").unwrap(); + py.run(delegate, Some(&scope.dict()), None).unwrap(); + let operator = scope.getattr("TestDelegateFactory").unwrap(); + let instance = operator.call0().unwrap(); + + let mut operator = PythonAdapter::new( + Route::new("source1".to_string(), vec!["waypoint1".to_string()]), + instance.unbind(), + Box::new(Noop {}), + ); + + let message = make_msg(py, "ok"); + let res = operator.submit(message); + assert!(res.is_ok()); + + let commit_request = operator.poll(); + assert!(commit_request.is_ok()); + }) + } }