diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index d549e3ca99..83b8d85fb7 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -370,7 +370,7 @@ where } /// Execute the query and return the generated results as a stream. - pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + pub fn fetch<'e, 'c: 'e, E>(mut self, executor: E) -> BoxStream<'e, Result> where 'q: 'e, E: 'e + Executor<'c, Database = DB>, @@ -378,16 +378,9 @@ where F: 'e, O: 'e, { - // FIXME: this should have used `executor.fetch()` but that's a breaking change - // because this technically allows multiple statements in one query string. - #[allow(deprecated)] - self.fetch_many(executor) - .try_filter_map(|step| async move { - Ok(match step { - Either::Left(_) => None, - Either::Right(o) => Some(o), - }) - }) + executor + .fetch(self.inner) + .map(move |row| (self.mapper)(row?)) .boxed() } diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index d8f8ee492c..cba55203bc 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -1438,3 +1438,55 @@ async fn issue_3982() -> anyhow::Result<()> { Ok(()) } + +// Regression test for https://github.com/launchbadge/sqlx/issues/4126 +// Map::fetch() stream must continue past row-level mapper errors instead +// of terminating the entire stream at the first error. +#[sqlx_macros::test] +async fn it_fetch_stream_continues_past_mapper_error() -> anyhow::Result<()> { + use futures_util::StreamExt; + + let mut conn = new::().await?; + + // Insert rows: values 1..=5 + sqlx::query("CREATE TEMPORARY TABLE test_stream (val INTEGER NOT NULL)") + .execute(&mut conn) + .await?; + for i in 1..=5i32 { + sqlx::query("INSERT INTO test_stream (val) VALUES (?)") + .bind(i) + .execute(&mut conn) + .await?; + } + + // Use try_map with a mapper that fails on val == 3 + let mut stream = sqlx::query("SELECT val FROM test_stream ORDER BY val") + .try_map(|row: SqliteRow| { + let val: i32 = row.try_get("val")?; + if val == 3 { + return Err(sqlx::Error::Protocol(format!("bad row: {val}"))); + } + Ok(val) + }) + .fetch(&mut conn); + + let mut successes = Vec::new(); + let mut errors = 0; + + while let Some(result) = stream.next().await { + match result { + Ok(val) => successes.push(val), + Err(_) => errors += 1, + } + } + + // The stream should yield all 5 items: 4 successes + 1 error + assert_eq!( + successes, + vec![1, 2, 4, 5], + "stream should continue past the error for val=3" + ); + assert_eq!(errors, 1, "there should be exactly one error"); + + Ok(()) +}