diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d71af206c78d5..fc1eb91db8f44 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -669,6 +669,14 @@ config_namespace! { /// # Default /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + + /// Prefix to use when generating file name in multi file output. + /// + /// When prefix is non-empty string, this prefix will be used to generate file name as + /// `{partitioned_file_prefix_name}{datafusion generated suffix}`. + /// + /// Defaults to empty string. + pub partitioned_file_prefix_name: String, default = String::new() } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2292f5855bfde..a89bd821d7b12 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -72,6 +72,7 @@ use datafusion_functions_aggregate::expr_fn::{ use async_trait::async_trait; use datafusion_catalog::Session; +#[derive(Clone)] /// Contains options that control how data is /// written out from a DataFrame pub struct DataFrameWriteOptions { diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c94ab10a9e72f..baa197661852a 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6856,3 +6856,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> { Ok(()) } + +struct FixtureDataGen { + _tmp_dir: TempDir, + out_dir: String, + ctx: SessionContext, +} + +impl FixtureDataGen { + fn register_local_table( + out_dir: impl AsRef<Path>, + ctx: &SessionContext, + ) -> Result<()> { + // Create an in memory table with schema C1 and C2, both strings + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, false), + Field::new("c2", DataType::Utf8, false), + ])); + + let record_batch = RecordBatch::try_new( + schema.clone(), + vec![ + 
Arc::new(StringArray::from(vec!["abc", "def"])), + Arc::new(StringArray::from(vec!["123", "456"])), + ], + )?; + + let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?); + + // Register the table in the context + ctx.register_table("test", mem_table)?; + + let local = Arc::new(LocalFileSystem::new_with_prefix(&out_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.register_object_store(&local_url, local); + + Ok(()) + } + + // initializes basic data and writes it via executing the physical plan + // + // Available columns: c1, c2 + async fn prepare_execution_plan_writes(config: SessionConfig) -> Result<Self> { + let tmp_dir = TempDir::new()?; + + let ctx = SessionContext::new_with_config(config); + + Self::register_local_table(&tmp_dir, &ctx)?; + + let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; + let out_dir_url = format!("file://{out_dir}"); + + let df = ctx.sql("SELECT c1, c2 FROM test").await?; + let plan = df.create_physical_plan().await?; + + ctx.write_parquet(plan.clone(), &out_dir_url, None).await?; + ctx.write_csv(plan.clone(), &out_dir_url).await?; + ctx.write_json(plan.clone(), &out_dir_url).await?; + + Ok(Self { + _tmp_dir: tmp_dir, + out_dir, + ctx, + }) + } + + // initializes basic data and writes it using `write_opts` + // + // Available columns: c1, c2 + async fn prepare_direct_df_writes( + config: SessionConfig, + write_opts: DataFrameWriteOptions, + ) -> Result<Self> { + let tmp_dir = TempDir::new()?; + + let ctx = SessionContext::new_with_config(config); + + Self::register_local_table(&tmp_dir, &ctx)?; + + let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; + let out_dir_url = format!("file://{out_dir}"); + + let df = ctx.sql("SELECT c1, c2 FROM test").await?; + + df.clone() + .write_parquet(&out_dir_url, write_opts.clone(), None) + .await?; + df.clone() + .write_csv(&out_dir_url, write_opts.clone(), None) + .await?; + df.write_json(&out_dir_url, write_opts.clone(), 
None) + .await?; + + Ok(Self { + _tmp_dir: tmp_dir, + out_dir, + ctx, + }) + } +} + +#[tokio::test] +async fn write_partitioned_results_with_prefix() -> Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned(); + + let df_write_options = + DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]); + let FixtureDataGen { + _tmp_dir, + out_dir, + ctx, + } = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?; + + let partitioned_file = format!("{out_dir}/c2=123/prefix-*"); + let filter_df = ctx + .read_parquet(&partitioned_file, ParquetReadOptions::default()) + .await?; + + // Check that the c2 column is gone and that c1 is abc. + let results_parquet = filter_df.collect().await?; + let results_parquet_display = batches_to_string(&results_parquet); + assert_snapshot!( + results_parquet_display.as_str(), + @r###" + +-----+ + | c1 | + +-----+ + | abc | + +-----+ + "### + ); + + let results_csv = ctx + .read_csv(&partitioned_file, Default::default()) + .await? + .collect() + .await?; + assert_eq!( + results_parquet_display.as_str(), + batches_to_string(&results_csv) + ); + + let results_json = ctx + .read_json(&partitioned_file, Default::default()) + .await? 
+ .collect() + .await?; + assert_eq!(results_parquet_display, batches_to_string(&results_json)); + + Ok(()) +} + +#[tokio::test] +async fn write_physical_plan_results_with_prefix() -> Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned(); + + let FixtureDataGen { + _tmp_dir, + out_dir, + ctx, + } = FixtureDataGen::prepare_execution_plan_writes(config).await?; + + let partitioned_file = format!("{out_dir}/prefix-*"); + + let df = ctx + .read_parquet(&partitioned_file, Default::default()) + .await?; + let results_parquet = df.collect().await?; + let results_parquet_display = batches_to_string(&results_parquet); + assert_snapshot!( + results_parquet_display.as_str(), + @r###" + +-----+-----+ + | c1 | c2 | + +-----+-----+ + | abc | 123 | + | def | 456 | + +-----+-----+ + "### + ); + + let results_csv = ctx + .read_csv(&partitioned_file, Default::default()) + .await? + .collect() + .await?; + assert_eq!( + results_parquet_display.as_str(), + batches_to_string(&results_csv) + ); + + let results_json = ctx + .read_json(&partitioned_file, Default::default()) + .await? 
+ .collect() + .await?; + assert_eq!(results_parquet_display, batches_to_string(&results_json)); + + Ok(()) +} + +#[tokio::test] +async fn write_parts_parquet_results_with_prefix() -> Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned(); + + let df_write_options = DataFrameWriteOptions::new(); + let FixtureDataGen { + _tmp_dir, + out_dir, + ctx, + } = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?; + + let partitioned_file = format!("{out_dir}/prefix-*"); + + let df = ctx + .read_parquet(&partitioned_file, Default::default()) + .await?; + let results_parquet = df.collect().await?; + let results_parquet_display = batches_to_string(&results_parquet); + assert_snapshot!( + results_parquet_display.as_str(), + @r###" + +-----+-----+ + | c1 | c2 | + +-----+-----+ + | abc | 123 | + | def | 456 | + +-----+-----+ + "### + ); + + let results_csv = ctx + .read_csv(&partitioned_file, Default::default()) + .await? + .collect() + .await?; + assert_eq!( + results_parquet_display.as_str(), + batches_to_string(&results_csv) + ); + + let results_json = ctx + .read_json(&partitioned_file, Default::default()) + .await? 
+ .collect() + .await?; + assert_eq!(results_parquet_display, batches_to_string(&results_json)); + + Ok(()) +} diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 77a0dc9cf7995..b21208ef4703c 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -463,16 +463,15 @@ pub async fn plan_to_csv( let parsed = ListingTableUrl::parse(path)?; let object_store_url = parsed.object_store(); let store = task_ctx.runtime_env().object_store(&object_store_url)?; - let writer_buffer_size = task_ctx - .session_config() - .options() - .execution - .objectstore_writer_buffer_size; + let exec_options = &task_ctx.session_config().options().execution; + let writer_buffer_size = exec_options.objectstore_writer_buffer_size; + let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str(); + let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { let storeref = Arc::clone(&store); let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan); - let filename = format!("{}/part-{i}.csv", parsed.prefix()); + let filename = format!("{}/{file_name_prefix}part-{i}.csv", parsed.prefix()); let file = object_store::path::Path::parse(filename)?; let mut stream = plan.execute(i, Arc::clone(&task_ctx))?; diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 2f1d5abbee599..0442136e327c6 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -471,16 +471,15 @@ pub async fn plan_to_json( let parsed = ListingTableUrl::parse(path)?; let object_store_url = parsed.object_store(); let store = task_ctx.runtime_env().object_store(&object_store_url)?; - let writer_buffer_size = task_ctx - .session_config() - .options() - .execution - .objectstore_writer_buffer_size; + let exec_options = &task_ctx.session_config().options().execution; + let writer_buffer_size = 
exec_options.objectstore_writer_buffer_size; + let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str(); + let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { let storeref = Arc::clone(&store); let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan); - let filename = format!("{}/part-{i}.json", parsed.prefix()); + let filename = format!("{}/{file_name_prefix}part-{i}.json", parsed.prefix()); let file = object_store::path::Path::parse(filename)?; let mut stream = plan.execute(i, Arc::clone(&task_ctx))?; diff --git a/datafusion/datasource-parquet/src/writer.rs b/datafusion/datasource-parquet/src/writer.rs index d37b6e26a7536..626323e4cc6a2 100644 --- a/datafusion/datasource-parquet/src/writer.rs +++ b/datafusion/datasource-parquet/src/writer.rs @@ -39,9 +39,12 @@ pub async fn plan_to_parquet( let object_store_url = parsed.object_store(); let store = task_ctx.runtime_env().object_store(&object_store_url)?; let mut join_set = JoinSet::new(); + let exec_options = &task_ctx.session_config().options().execution; + let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str(); + for i in 0..plan.output_partitioning().partition_count() { let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan); - let filename = format!("{}/part-{i}.parquet", parsed.prefix()); + let filename = format!("{}/{file_name_prefix}part-{i}.parquet", parsed.prefix()); let file = Path::parse(filename)?; let propclone = writer_properties.clone(); @@ -49,11 +52,7 @@ pub async fn plan_to_parquet( let buf_writer = BufWriter::with_capacity( storeref, file.clone(), - task_ctx - .session_config() - .options() - .execution - .objectstore_writer_buffer_size, + exec_options.objectstore_writer_buffer_size, ); let mut stream = plan.execute(i, Arc::clone(&task_ctx))?; join_set.spawn(async move { diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 1648624747af2..4c84dd42425a0 100644 --- a/datafusion/datasource/src/write/demux.rs +++ 
b/datafusion/datasource/src/write/demux.rs @@ -157,7 +157,8 @@ async fn row_count_demuxer( let max_buffered_batches = exec_options.max_buffered_batches_per_output_file; let minimum_parallel_files = exec_options.minimum_parallel_output_files; let mut part_idx = 0; - let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16); + let mut write_id = exec_options.partitioned_file_prefix_name.clone(); + rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16); let mut open_file_streams = Vec::with_capacity(minimum_parallel_files); @@ -301,9 +302,10 @@ async fn hive_style_partitions_demuxer( file_extension: String, keep_partition_by_columns: bool, ) -> Result<()> { - let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16); - let exec_options = &context.session_config().options().execution; + let mut write_id = exec_options.partitioned_file_prefix_name.clone(); + rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16); + let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file; // To support non string partition col types, cast the type to &str first diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b61ceecb24fc0..ba1e525c1062a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -260,6 +260,7 @@ datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 +datafusion.execution.partitioned_file_prefix_name (empty) datafusion.execution.perfect_hash_join_min_key_density 0.15 datafusion.execution.perfect_hash_join_small_build_threshold 1024 datafusion.execution.planning_concurrency 13 @@ -398,6 +399,7 @@ datafusion.execution.parquet.statistics_enabled page 
(writing) Sets if statistic datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" +datafusion.execution.partitioned_file_prefix_name (empty) Prefix to use when generating file name in multi file output. When prefix is non-empty string, this prefix will be used to generate file name as `{partitioned_file_prefix_name}{datafusion generated suffix}`. Defaults to empty string. datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. 
Defaults to the number of CPU cores on the system diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f33e6314d3619..8d108c4abe251 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -133,6 +133,7 @@ The following configuration settings are available: | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). 
- Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | +| datafusion.execution.partitioned_file_prefix_name | | Prefix to use when generating file name in multi file output. When prefix is non-empty string, this prefix will be used to generate file name as `{partitioned_file_prefix_name}{datafusion generated suffix}`. Defaults to empty string. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |