From d8b0cc06ab87e4417527ddd73cfb76fd1e8621a5 Mon Sep 17 00:00:00 2001 From: evangelisilva Date: Sun, 15 Feb 2026 18:39:27 -0500 Subject: [PATCH 1/2] feat: add coerce_arguments flag to UDTFs to allow skipping automatic coercion This allows UDTFs to handle complex arguments (like identifiers) that would otherwise fail planning when coerced against an empty schema. Fixes #20293 --- datafusion/catalog/src/table.rs | 11 ++ .../core/src/execution/session_state.rs | 161 +++++++++++++++++- 2 files changed, 164 insertions(+), 8 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index f31d4d52ce88b..24d41ead8af0f 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -489,6 +489,12 @@ pub trait TableProviderFactory: Debug + Sync + Send { pub trait TableFunctionImpl: Debug + Sync + Send { /// Create a table provider fn call(&self, args: &[Expr]) -> Result>; + + /// Returns true if the arguments should be coerced and simplified. + /// Defaults to true for backward compatibility. + fn coerce_arguments(&self) -> bool { + true + } } /// A table that uses a function to generate data @@ -520,4 +526,9 @@ impl TableFunction { pub fn create_table_provider(&self, args: &[Expr]) -> Result> { self.fun.call(args) } + + /// Returns true if the arguments should be coerced and simplified + pub fn coerce_arguments(&self) -> bool { + self.fun.coerce_arguments() + } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9560616c1b6da..331814edc587f 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1842,14 +1842,17 @@ impl ContextProvider for SessionContextProvider<'_> { ); let simplifier = ExprSimplifier::new(simplify_context); let schema = DFSchema::empty(); - let args = args - .into_iter() - .map(|arg| { - simplifier - .coerce(arg, &schema) - .and_then(|e| simplifier.simplify(e)) - }) - .collect::>>()?; + let args = if tbl_func.coerce_arguments() { + args.into_iter() + .map(|arg| { + simplifier + .coerce(arg, &schema) + .and_then(|e| simplifier.simplify(e)) + }) + .collect::>>()? + } else { + args + }; let provider = tbl_func.create_table_provider(&args)?; Ok(provider_as_source(provider)) @@ -2509,3 +2512,145 @@ mod tests { } } } + +#[cfg(test)] +mod udtf_tests { + use super::*; + use datafusion_catalog::{TableFunctionImpl, TableProvider, TableFunction}; + use datafusion_expr::{Expr, TableType}; + use std::sync::Arc; + use std::any::Any; + use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; + use datafusion_physical_plan::ExecutionPlan; + use async_trait::async_trait; + use datafusion_catalog::Session; + use datafusion_common::{Result, plan_err}; + + #[derive(Debug)] + struct MockTableProvider { + schema: SchemaRef, + } + + #[async_trait] + impl TableProvider for MockTableProvider { + fn as_any(&self) -> &dyn Any { self } + fn schema(&self) -> SchemaRef { self.schema.clone() } + fn table_type(&self) -> TableType { TableType::Base } + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let schema = self.schema.clone(); + Ok(Arc::new(datafusion_physical_plan::empty::EmptyExec::new(schema))) + } + } + + #[derive(Debug)] + struct NoCoerceUDTF {} + + impl TableFunctionImpl for NoCoerceUDTF { + fn call(&self, args: &[Expr]) -> Result> { + // Verify that the argument 'index' (which is technically a column reference in SQL) + // survives as an identifier instead of failing coercion because it's missing from the empty schema. + match &args[0] { + Expr::BinaryExpr(be) => { + match be.left.as_ref() { + Expr::Column(c) if c.name == "index" => { + // Success! + } + _ => return plan_err!("Expected Column('index') on left side, got {:?}", be.left), + } + } + _ => return plan_err!("Expected BinaryExpr, got {:?}", args[0]), + } + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + Ok(Arc::new(MockTableProvider { schema })) + } + + fn coerce_arguments(&self) -> bool { + false + } + } + + #[test] + fn test_udtf_no_coercion() -> Result<()> { + let udtf = Arc::new(TableFunction::new( + "scan_with".to_string(), + Arc::new(NoCoerceUDTF {}), + )); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_table_function_list(vec![udtf]) + .build(); + + let provider = SessionContextProvider { + state: &state, + tables: HashMap::new(), + }; + + // SQL: SELECT * FROM scan_with(index=1) + let args = vec![ + Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion_common::Column::from_name("index"))), + op: datafusion_expr::Operator::Eq, + right: Box::new(Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None)), + }) + ]; + + let source = provider.get_table_function_source("scan_with", args)?; + assert_eq!(source.schema().fields().len(), 1); + assert_eq!(source.schema().field(0).name(), "a"); + + Ok(()) + } + + #[test] + fn test_udtf_default_coercion() -> Result<()> { + #[derive(Debug)] + struct CoerceUDTF {} + impl TableFunctionImpl for CoerceUDTF { + fn call(&self, _args: &[Expr]) -> Result> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + Ok(Arc::new(MockTableProvider { schema })) + } + } + + let udtf = Arc::new(TableFunction::new( + "scan_with".to_string(), + Arc::new(CoerceUDTF {}), + )); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_table_function_list(vec![udtf]) + .build(); + + let provider = SessionContextProvider { + state: &state, + tables: HashMap::new(), + }; + + // In SQL: SELECT * FROM scan_with(unknown_col=1) + let args = vec![ + Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion_common::Column::from_name("unknown_col"))), + op: datafusion_expr::Operator::Eq, + right: Box::new(Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None)), + }) + ]; + + // Should fail because coercion is ON and "unknown_col" is not in the empty schema. + let res = provider.get_table_function_source("scan_with", args); + match res { + Ok(_) => panic!("Expected error, but got success"), + Err(e) => assert!(e.to_string().contains("Schema error: No field named unknown_col")), + } + + Ok(()) + } +} From 28b52c9c2e5728392b70457c7261e78388313299 Mon Sep 17 00:00:00 2001 From: evangelisilva Date: Sun, 15 Feb 2026 18:49:45 -0500 Subject: [PATCH 2/2] chore: format session_state.rs with cargo fmt --- .../core/src/execution/session_state.rs | 90 ++++++++++++------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 331814edc587f..4b27514022967 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -2516,15 +2516,15 @@ mod tests { #[cfg(test)] mod udtf_tests { use super::*; - use datafusion_catalog::{TableFunctionImpl, TableProvider, TableFunction}; - use datafusion_expr::{Expr, TableType}; - use std::sync::Arc; - use std::any::Any; - use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - use datafusion_physical_plan::ExecutionPlan; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::Session; + use datafusion_catalog::{TableFunction, TableFunctionImpl, TableProvider}; use datafusion_common::{Result, plan_err}; + use datafusion_expr::{Expr, TableType}; + use datafusion_physical_plan::ExecutionPlan; + use std::any::Any; + use std::sync::Arc; #[derive(Debug)] struct MockTableProvider { @@ -2533,9 +2533,15 @@ mod udtf_tests { #[async_trait] impl TableProvider for MockTableProvider { - fn as_any(&self) -> &dyn Any { self } - fn schema(&self) -> SchemaRef { self.schema.clone() } - fn table_type(&self) -> TableType { TableType::Base } + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + fn table_type(&self) -> TableType { + TableType::Base + } async fn scan( &self, _state: &dyn Session, @@ -2544,7 +2550,9 @@ mod udtf_tests { _limit: Option, ) -> Result> { let schema = self.schema.clone(); - Ok(Arc::new(datafusion_physical_plan::empty::EmptyExec::new(schema))) + Ok(Arc::new(datafusion_physical_plan::empty::EmptyExec::new( + schema, + ))) } } @@ -2561,13 +2569,19 @@ mod udtf_tests { Expr::Column(c) if c.name == "index" => { // Success! } - _ => return plan_err!("Expected Column('index') on left side, got {:?}", be.left), + _ => { + return plan_err!( + "Expected Column('index') on left side, got {:?}", + be.left + ); + } } } _ => return plan_err!("Expected BinaryExpr, got {:?}", args[0]), } - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); Ok(Arc::new(MockTableProvider { schema })) } @@ -2582,30 +2596,31 @@ mod udtf_tests { "scan_with".to_string(), Arc::new(NoCoerceUDTF {}), )); - + let state = SessionStateBuilder::new() .with_default_features() .with_table_function_list(vec![udtf]) .build(); - + let provider = SessionContextProvider { state: &state, tables: HashMap::new(), }; // SQL: SELECT * FROM scan_with(index=1) - let args = vec![ - Expr::BinaryExpr(datafusion_expr::BinaryExpr { - left: Box::new(Expr::Column(datafusion_common::Column::from_name("index"))), - op: datafusion_expr::Operator::Eq, - right: Box::new(Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None)), - }) - ]; + let args = vec![Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion_common::Column::from_name("index"))), + op: datafusion_expr::Operator::Eq, + right: Box::new(Expr::Literal( + datafusion_common::ScalarValue::Int32(Some(1)), + None, + )), + })]; let source = provider.get_table_function_source("scan_with", args)?; assert_eq!(source.schema().fields().len(), 1); assert_eq!(source.schema().field(0).name(), "a"); - + Ok(()) } @@ -2615,7 +2630,8 @@ mod udtf_tests { struct CoerceUDTF {} impl TableFunctionImpl for CoerceUDTF { fn call(&self, _args: &[Expr]) -> Result> { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); Ok(Arc::new(MockTableProvider { schema })) } } @@ -2624,33 +2640,39 @@ mod udtf_tests { "scan_with".to_string(), Arc::new(CoerceUDTF {}), )); - + let state = SessionStateBuilder::new() .with_default_features() .with_table_function_list(vec![udtf]) .build(); - + let provider = SessionContextProvider { state: &state, tables: HashMap::new(), }; // In SQL: SELECT * FROM scan_with(unknown_col=1) - let args = vec![ - Expr::BinaryExpr(datafusion_expr::BinaryExpr { - left: Box::new(Expr::Column(datafusion_common::Column::from_name("unknown_col"))), - op: datafusion_expr::Operator::Eq, - right: Box::new(Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None)), - }) - ]; + let args = vec![Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion_common::Column::from_name( + "unknown_col", + ))), + op: datafusion_expr::Operator::Eq, + right: Box::new(Expr::Literal( + datafusion_common::ScalarValue::Int32(Some(1)), + None, + )), + })]; // Should fail because coercion is ON and "unknown_col" is not in the empty schema. let res = provider.get_table_function_source("scan_with", args); match res { Ok(_) => panic!("Expected error, but got success"), - Err(e) => assert!(e.to_string().contains("Schema error: No field named unknown_col")), + Err(e) => assert!( + e.to_string() + .contains("Schema error: No field named unknown_col") + ), } - + Ok(()) } }