Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,14 @@ config_namespace! {
/// # Default
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false

/// Prefix to use when generating file name in multi file output.
///
/// When prefix is non-empty string, this prefix will be used to generate file name as
/// `{partitioned_file_prefix_name}{datafusion generated suffix}`.
///
/// Defaults to empty string.
pub partitioned_file_prefix_name: String, default = String::new()
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use datafusion_functions_aggregate::expr_fn::{
use async_trait::async_trait;
use datafusion_catalog::Session;

#[derive(Clone)]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convenient for test code, and generally there seems to be nothing wrong with making it cloneable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, nothing wrong with it. It's a plain data struct with no resources or
invariants that cloning would violate. Makes the builder pattern nicer to
work with too.

/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
Expand Down
252 changes: 252 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6856,3 +6856,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {

Ok(())
}

/// Shared test fixture: a temporary directory, the output directory the
/// writers targeted, and the `SessionContext` that produced the data
/// (reused by the tests to read the results back).
struct FixtureDataGen {
    // Held only to keep the temporary directory alive for the test's duration.
    _tmp_dir: TempDir,
    // The "<tmp>/out/" directory path the writers wrote into.
    out_dir: String,
    // Context the data was written with; tests reuse it to read files back.
    ctx: SessionContext,
}

impl FixtureDataGen {
fn register_local_table(
out_dir: impl AsRef<Path>,
ctx: &SessionContext,
) -> Result<()> {
// Create an in memory table with schema C1 and C2, both strings
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::Utf8, false),
]));

let record_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["abc", "def"])),
Arc::new(StringArray::from(vec!["123", "456"])),
],
)?;

let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);

// Register the table in the context
ctx.register_table("test", mem_table)?;

let local = Arc::new(LocalFileSystem::new_with_prefix(&out_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.register_object_store(&local_url, local);

Ok(())
}

// initializes basic data and writes it using via executing physical plan
//
// Available columns: c1, c2
async fn prepare_execution_plan_writes(config: SessionConfig) -> Result<Self> {
let tmp_dir = TempDir::new()?;

let ctx = SessionContext::new_with_config(config);

Self::register_local_table(&tmp_dir, &ctx)?;

let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
let out_dir_url = format!("file://{out_dir}");

let df = ctx.sql("SELECT c1, c2 FROM test").await?;
let plan = df.create_physical_plan().await?;

ctx.write_parquet(plan.clone(), &out_dir_url, None).await?;
ctx.write_csv(plan.clone(), &out_dir_url).await?;
ctx.write_json(plan.clone(), &out_dir_url).await?;

Ok(Self {
_tmp_dir: tmp_dir,
out_dir,
ctx,
})
}

// initializes basic data and writes it using `write_opts`
//
// Available columns: c1, c2
async fn prepare_direct_df_writes(
config: SessionConfig,
write_opts: DataFrameWriteOptions,
) -> Result<Self> {
let tmp_dir = TempDir::new()?;

let ctx = SessionContext::new_with_config(config);

Self::register_local_table(&tmp_dir, &ctx)?;

let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
let out_dir_url = format!("file://{out_dir}");

let df = ctx.sql("SELECT c1, c2 FROM test").await?;

df.clone()
.write_parquet(&out_dir_url, write_opts.clone(), None)
.await?;
df.clone()
.write_csv(&out_dir_url, write_opts.clone(), None)
.await?;
df.write_json(&out_dir_url, write_opts.clone(), None)
.await?;

Ok(Self {
_tmp_dir: tmp_dir,
out_dir,
ctx,
})
}
}

#[tokio::test]
async fn write_partitioned_results_with_prefix() -> Result<()> {
    // Ask the writers to prepend "prefix-" to every generated file name.
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    let write_options =
        DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
    let fixture =
        FixtureDataGen::prepare_direct_df_writes(config, write_options).await?;

    // Glob that only matches prefixed files inside the `c2=123` hive partition.
    let partitioned_file = format!("{}/c2=123/prefix-*", fixture.out_dir);

    let filter_df = fixture
        .ctx
        .read_parquet(&partitioned_file, ParquetReadOptions::default())
        .await?;

    // Check that the c2 column is gone and that c1 is abc.
    let results_parquet = filter_df.collect().await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+
    | c1  |
    +-----+
    | abc |
    +-----+
    "###
    );

    // The CSV and JSON writers must have produced identical prefixed output.
    let results_csv = fixture
        .ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = fixture
        .ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}

#[tokio::test]
async fn write_physical_plan_results_with_prefix() -> Result<()> {
    // Ask the writers to prepend "prefix-" to every generated file name.
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    let fixture = FixtureDataGen::prepare_execution_plan_writes(config).await?;

    // Glob that only matches files carrying the configured prefix.
    let partitioned_file = format!("{}/prefix-*", fixture.out_dir);

    let results_parquet = fixture
        .ctx
        .read_parquet(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+-----+
    | c1  | c2  |
    +-----+-----+
    | abc | 123 |
    | def | 456 |
    +-----+-----+
    "###
    );

    // The CSV and JSON writers must have produced identical prefixed output.
    let results_csv = fixture
        .ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = fixture
        .ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}

#[tokio::test]
async fn write_parts_parquet_results_with_prefix() -> Result<()> {
    // Ask the writers to prepend "prefix-" to every generated file name.
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    // No partitioning columns here — plain multi-file output.
    let write_options = DataFrameWriteOptions::new();
    let fixture =
        FixtureDataGen::prepare_direct_df_writes(config, write_options).await?;

    // Glob that only matches files carrying the configured prefix.
    let partitioned_file = format!("{}/prefix-*", fixture.out_dir);

    let results_parquet = fixture
        .ctx
        .read_parquet(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+-----+
    | c1  | c2  |
    +-----+-----+
    | abc | 123 |
    | def | 456 |
    +-----+-----+
    "###
    );

    // The CSV and JSON writers must have produced identical prefixed output.
    let results_csv = fixture
        .ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = fixture
        .ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}
11 changes: 5 additions & 6 deletions datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,15 @@ pub async fn plan_to_csv(
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let writer_buffer_size = task_ctx
.session_config()
.options()
.execution
.objectstore_writer_buffer_size;
let exec_options = &task_ctx.session_config().options().execution;
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();

let mut join_set = JoinSet::new();
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
let filename = format!("{}/part-{i}.csv", parsed.prefix());
let filename = format!("{}/{file_name_prefix}part-{i}.csv", parsed.prefix(),);
let file = object_store::path::Path::parse(filename)?;

let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
Expand Down
11 changes: 5 additions & 6 deletions datafusion/datasource-json/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,16 +471,15 @@ pub async fn plan_to_json(
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let writer_buffer_size = task_ctx
.session_config()
.options()
.execution
.objectstore_writer_buffer_size;
let exec_options = &task_ctx.session_config().options().execution;
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();

let mut join_set = JoinSet::new();
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
let filename = format!("{}/part-{i}.json", parsed.prefix());
let filename = format!("{}/{file_name_prefix}part-{i}.json", parsed.prefix());
let file = object_store::path::Path::parse(filename)?;

let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
Expand Down
11 changes: 5 additions & 6 deletions datafusion/datasource-parquet/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ pub async fn plan_to_parquet(
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let mut join_set = JoinSet::new();
let exec_options = &task_ctx.session_config().options().execution;
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();

for i in 0..plan.output_partitioning().partition_count() {
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
let filename = format!("{}/part-{i}.parquet", parsed.prefix());
let filename = format!("{}/{file_name_prefix}part-{i}.parquet", parsed.prefix());
let file = Path::parse(filename)?;
let propclone = writer_properties.clone();

let storeref = Arc::clone(&store);
let buf_writer = BufWriter::with_capacity(
storeref,
file.clone(),
task_ctx
.session_config()
.options()
.execution
.objectstore_writer_buffer_size,
exec_options.objectstore_writer_buffer_size,
);
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
join_set.spawn(async move {
Expand Down
8 changes: 5 additions & 3 deletions datafusion/datasource/src/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ async fn row_count_demuxer(
let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
let minimum_parallel_files = exec_options.minimum_parallel_output_files;
let mut part_idx = 0;
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);

let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);

Expand Down Expand Up @@ -301,9 +302,10 @@ async fn hive_style_partitions_demuxer(
file_extension: String,
keep_partition_by_columns: bool,
) -> Result<()> {
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);

let exec_options = &context.session_config().options().execution;
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);

let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;

// To support non string partition col types, cast the type to &str first
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ datafusion.execution.parquet.statistics_enabled page
datafusion.execution.parquet.statistics_truncate_length 64
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.partitioned_file_prefix_name (empty)
datafusion.execution.perfect_hash_join_min_key_density 0.15
datafusion.execution.perfect_hash_join_small_build_threshold 1024
datafusion.execution.planning_concurrency 13
Expand Down Expand Up @@ -398,6 +399,7 @@ datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistic
datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.partitioned_file_prefix_name (empty) Prefix to use when generating file name in multi file output. When prefix is non-empty string, this prefix will be used to generate file name as `{partitioned_file_prefix_name}{datafusion generated suffix}`. Defaults to empty string.
datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
Expand Down
Loading