From 48dc6e45b6f17e3649fc371cef59de008ff27d29 Mon Sep 17 00:00:00 2001 From: barry3406 Date: Thu, 9 Apr 2026 13:45:04 -0700 Subject: [PATCH 1/2] fix: Map::fetch stream continues past row-level errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Map::fetch() previously used fetch_many() + try_filter_map(), which stopped the stream at the first row-level error from the mapper. This meant query_as!().fetch() would silently stop when one row failed to deserialize, instead of yielding the error and continuing. Switch to executor.fetch().map() which processes each row independently — matching the behavior of query_as().fetch() (fixed in #1887). This also resolves the FIXME noting that fetch() should have used executor.fetch() directly. Added a regression test that verifies the stream yields all rows (including errors) instead of terminating early. Fixes #4126 --- sqlx-core/src/query.rs | 15 +++------- tests/sqlite/sqlite.rs | 64 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index d549e3ca99..83b8d85fb7 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -370,7 +370,7 @@ where } /// Execute the query and return the generated results as a stream. - pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + pub fn fetch<'e, 'c: 'e, E>(mut self, executor: E) -> BoxStream<'e, Result> where 'q: 'e, E: 'e + Executor<'c, Database = DB>, @@ -378,16 +378,9 @@ where F: 'e, O: 'e, { - // FIXME: this should have used `executor.fetch()` but that's a breaking change - // because this technically allows multiple statements in one query string. - #[allow(deprecated)] - self.fetch_many(executor) - .try_filter_map(|step| async move { - Ok(match step { - Either::Left(_) => None, - Either::Right(o) => Some(o), - }) - }) + executor + .fetch(self.inner) + .map(move |row| (self.mapper)(row?)) .boxed() } diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index d8f8ee492c..a151331e1c 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -1,17 +1,17 @@ use futures_util::TryStreamExt; use rand::{Rng, SeedableRng}; use rand_xoshiro::Xoshiro256PlusPlus; -use sqlx::sqlite::{SqliteConnectOptions, SqliteOperation, SqlitePoolOptions}; use sqlx::SqlSafeStr; +use sqlx::sqlite::{SqliteConnectOptions, SqliteOperation, SqlitePoolOptions}; use sqlx::{ - query, sqlite::Sqlite, sqlite::SqliteRow, Column, ConnectOptions, Connection, Executor, Row, - SqliteConnection, SqlitePool, Statement, TypeInfo, + Column, ConnectOptions, Connection, Executor, Row, SqliteConnection, SqlitePool, Statement, + TypeInfo, query, sqlite::Sqlite, sqlite::SqliteRow, }; use sqlx_sqlite::LockedSqliteHandle; use sqlx_test::new; use std::future::Future; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; #[sqlx_macros::test] async fn it_connects() -> anyhow::Result<()> { @@ -574,7 +574,7 @@ async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> { // https://github.com/launchbadge/sqlx/issues/1300 #[sqlx_macros::test] async fn concurrent_resets_dont_segfault() { - use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions}; + use sqlx::{ConnectOptions, sqlite::SqliteConnectOptions}; use std::{str::FromStr, time::Duration}; let mut conn = SqliteConnectOptions::from_str(":memory:") @@ -1398,7 +1398,7 @@ async fn it_can_recover_from_bad_transaction_begin() -> anyhow::Result<()> { } fn transaction_state(handle: &mut LockedSqliteHandle) -> SqliteTransactionState { - use libsqlite3_sys::{sqlite3_txn_state, SQLITE_TXN_NONE, SQLITE_TXN_READ, SQLITE_TXN_WRITE}; + use libsqlite3_sys::{SQLITE_TXN_NONE, SQLITE_TXN_READ, SQLITE_TXN_WRITE, sqlite3_txn_state}; let unchecked_state = unsafe { sqlite3_txn_state(handle.as_raw_handle().as_ptr(), std::ptr::null()) }; @@ -1438,3 +1438,55 @@ async fn issue_3982() -> anyhow::Result<()> { Ok(()) } + +// Regression test for https://github.com/launchbadge/sqlx/issues/4126 +// Map::fetch() stream must continue past row-level mapper errors instead +// of terminating the entire stream at the first error. +#[sqlx_macros::test] +async fn it_fetch_stream_continues_past_mapper_error() -> anyhow::Result<()> { + use futures_util::StreamExt; + + let mut conn = new::().await?; + + // Insert rows: values 1..=5 + sqlx::query("CREATE TEMPORARY TABLE test_stream (val INTEGER NOT NULL)") + .execute(&mut conn) + .await?; + for i in 1..=5i32 { + sqlx::query("INSERT INTO test_stream (val) VALUES (?)") + .bind(i) + .execute(&mut conn) + .await?; + } + + // Use try_map with a mapper that fails on val == 3 + let mut stream = sqlx::query("SELECT val FROM test_stream ORDER BY val") + .try_map(|row: SqliteRow| { + let val: i32 = row.try_get("val")?; + if val == 3 { + return Err(sqlx::Error::Protocol(format!("bad row: {val}"))); + } + Ok(val) + }) + .fetch(&mut conn); + + let mut successes = Vec::new(); + let mut errors = 0; + + while let Some(result) = stream.next().await { + match result { + Ok(val) => successes.push(val), + Err(_) => errors += 1, + } + } + + // The stream should yield all 5 items: 4 successes + 1 error + assert_eq!( + successes, + vec![1, 2, 4, 5], + "stream should continue past the error for val=3" + ); + assert_eq!(errors, 1, "there should be exactly one error"); + + Ok(()) +} From 79bf2646bc48186f13160f107eb9a18aa81a9b22 Mon Sep 17 00:00:00 2001 From: barry3406 Date: Thu, 9 Apr 2026 13:53:29 -0700 Subject: [PATCH 2/2] fix: apply rustfmt to test imports --- tests/sqlite/sqlite.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index a151331e1c..cba55203bc 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -1,17 +1,17 @@ use futures_util::TryStreamExt; use rand::{Rng, SeedableRng}; use rand_xoshiro::Xoshiro256PlusPlus; -use sqlx::SqlSafeStr; use sqlx::sqlite::{SqliteConnectOptions, SqliteOperation, SqlitePoolOptions}; +use sqlx::SqlSafeStr; use sqlx::{ - Column, ConnectOptions, Connection, Executor, Row, SqliteConnection, SqlitePool, Statement, - TypeInfo, query, sqlite::Sqlite, sqlite::SqliteRow, + query, sqlite::Sqlite, sqlite::SqliteRow, Column, ConnectOptions, Connection, Executor, Row, + SqliteConnection, SqlitePool, Statement, TypeInfo, }; use sqlx_sqlite::LockedSqliteHandle; use sqlx_test::new; use std::future::Future; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; #[sqlx_macros::test] async fn it_connects() -> anyhow::Result<()> { @@ -574,7 +574,7 @@ async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> { // https://github.com/launchbadge/sqlx/issues/1300 #[sqlx_macros::test] async fn concurrent_resets_dont_segfault() { - use sqlx::{ConnectOptions, sqlite::SqliteConnectOptions}; + use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions}; use std::{str::FromStr, time::Duration}; let mut conn = SqliteConnectOptions::from_str(":memory:") @@ -1398,7 +1398,7 @@ async fn it_can_recover_from_bad_transaction_begin() -> anyhow::Result<()> { } fn transaction_state(handle: &mut LockedSqliteHandle) -> SqliteTransactionState { - use libsqlite3_sys::{SQLITE_TXN_NONE, SQLITE_TXN_READ, SQLITE_TXN_WRITE, sqlite3_txn_state}; + use libsqlite3_sys::{sqlite3_txn_state, SQLITE_TXN_NONE, SQLITE_TXN_READ, SQLITE_TXN_WRITE}; let unchecked_state = unsafe { sqlite3_txn_state(handle.as_raw_handle().as_ptr(), std::ptr::null()) };