diff --git a/Cargo.lock b/Cargo.lock
index c552835a2cb6f..294e4ec1a74cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2594,6 +2594,7 @@ dependencies = [
  "log",
  "percent-encoding",
  "rand 0.9.2",
+ "serde_json",
  "sha1",
  "sha2",
  "url",
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 8a5c68a5d4e4b..162b6d814e804 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -59,6 +59,7 @@ datafusion-functions-nested = { workspace = true }
 log = { workspace = true }
 percent-encoding = "2.3.2"
 rand = { workspace = true }
+serde_json = { workspace = true }
 sha1 = "0.10"
 sha2 = { workspace = true }
 url = { workspace = true }
diff --git a/datafusion/spark/src/function/json/json_tuple.rs b/datafusion/spark/src/function/json/json_tuple.rs
new file mode 100644
index 0000000000000..f3ba7e91ac3da
--- /dev/null
+++ b/datafusion/spark/src/function/json/json_tuple.rs
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, NullBufferBuilder, StringBuilder, StructArray};
+use arrow::datatypes::{DataType, Field, FieldRef, Fields};
+use datafusion_common::cast::as_string_array;
+use datafusion_common::{Result, exec_err, internal_err};
+use datafusion_expr::{
+    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    Volatility,
+};
+
+/// Spark-compatible `json_tuple` expression
+///
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#json_tuple>
+///
+/// Extracts top-level fields from a JSON string and returns them as a struct.
+///
+/// `json_tuple(json_string, field1, field2, ...) -> Struct<c0, c1, ...>`
+///
+/// Note: In Spark, `json_tuple` is a Generator that produces multiple columns directly.
+/// In DataFusion, a ScalarUDF can only return one value per row, so the result is wrapped
+/// in a Struct. The caller (e.g. Comet) is expected to destructure the struct fields.
+///
+/// - Returns a NULL struct if the input is NULL, is not valid JSON, or is not a JSON object
+/// - Returns NULL for each field whose key is NULL or missing from the JSON object
+/// - Non-string JSON values are converted to their JSON string representation
+/// - JSON `null` values are returned as NULL (not the string "null")
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct JsonTuple {
+    signature: Signature,
+}
+
+impl Default for JsonTuple {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl JsonTuple {
+    /// Creates the function with a variadic `Utf8` signature and immutable volatility.
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic(vec![DataType::Utf8], Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for JsonTuple {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "json_tuple"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    /// Builds the nullable `Struct` return field with one nullable `Utf8` child
+    /// (`c0`, `c1`, ...) per requested field name.
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        if args.arg_fields.len() < 2 {
+            return exec_err!(
+                "json_tuple requires at least 2 arguments (json_string, field1), got {}",
+                args.arg_fields.len()
+            );
+        }
+
+        let num_fields = args.arg_fields.len() - 1;
+        let fields: Fields = (0..num_fields)
+            .map(|i| Field::new(format!("c{i}"), DataType::Utf8, true))
+            .collect::<Vec<_>>()
+            .into();
+
+        Ok(Arc::new(Field::new(
+            self.name(),
+            DataType::Struct(fields),
+            true,
+        )))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let ScalarFunctionArgs {
+            args: arg_values,
+            return_field,
+            ..
+        } = args;
+        let arrays = ColumnarValue::values_to_arrays(&arg_values)?;
+        let result = json_tuple_inner(&arrays, return_field.data_type())?;
+
+        Ok(ColumnarValue::Array(result))
+    }
+}
+
+/// Evaluates `json_tuple` row by row over already-materialized argument arrays.
+///
+/// `args[0]` holds the JSON strings and `args[1..]` the field names to extract;
+/// `return_type` must be the `Struct` computed by `return_field_from_args`.
+///
+/// Non-string values are rendered with `serde_json`'s `Display`, so the text is
+/// re-serialized rather than copied verbatim from the input.
+///
+/// # Errors
+///
+/// Returns an internal error if `args` is empty or `return_type` is not a
+/// `Struct`, and propagates errors from downcasting the arguments to string
+/// arrays or from assembling the `StructArray`.
+fn json_tuple_inner(args: &[ArrayRef], return_type: &DataType) -> Result<ArrayRef> {
+    // Guard against an empty slice: `args[0]` would panic and `args.len() - 1`
+    // would underflow.
+    let Some((json_arg, field_args)) = args.split_first() else {
+        return internal_err!("json_tuple requires at least 1 argument, got 0");
+    };
+
+    // Validate the return type before doing any per-row work.
+    let DataType::Struct(struct_fields) = return_type else {
+        return internal_err!(
+            "json_tuple requires a Struct return type, got {:?}",
+            return_type
+        );
+    };
+
+    let num_rows = json_arg.len();
+    let json_array = as_string_array(json_arg)?;
+
+    let field_arrays = field_args
+        .iter()
+        .map(|arg| as_string_array(arg))
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut builders: Vec<StringBuilder> =
+        field_arrays.iter().map(|_| StringBuilder::new()).collect();
+
+    let mut null_buffer = NullBufferBuilder::new(num_rows);
+
+    for row_idx in 0..num_rows {
+        if json_array.is_null(row_idx) {
+            for builder in &mut builders {
+                builder.append_null();
+            }
+            null_buffer.append_null();
+            continue;
+        }
+
+        let json_str = json_array.value(row_idx);
+        match serde_json::from_str::<serde_json::Value>(json_str) {
+            Ok(serde_json::Value::Object(map)) => {
+                null_buffer.append_non_null();
+                for (field_array, builder) in field_arrays.iter().zip(&mut builders) {
+                    if field_array.is_null(row_idx) {
+                        builder.append_null();
+                        continue;
+                    }
+                    match map.get(field_array.value(row_idx)) {
+                        Some(serde_json::Value::String(s)) => builder.append_value(s),
+                        // JSON `null` and missing keys both yield SQL NULL.
+                        Some(serde_json::Value::Null) | None => builder.append_null(),
+                        Some(other) => builder.append_value(other.to_string()),
+                    }
+                }
+            }
+            // Malformed or non-object JSON (arrays, scalars) nulls out the whole row.
+            _ => {
+                for builder in &mut builders {
+                    builder.append_null();
+                }
+                null_buffer.append_null();
+            }
+        }
+    }
+
+    let arrays: Vec<ArrayRef> = builders
+        .into_iter()
+        .map(|mut builder| Arc::new(builder.finish()) as ArrayRef)
+        .collect();
+
+    let struct_array =
+        StructArray::try_new(struct_fields.clone(), arrays, null_buffer.finish())?;
+
+    Ok(Arc::new(struct_array))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_expr::ReturnFieldArgs;
+
+    #[test]
+    fn test_return_field_shape() {
+        let func = JsonTuple::new();
+        let fields = vec![
+            Arc::new(Field::new("json", DataType::Utf8, false)),
+            Arc::new(Field::new("f1", DataType::Utf8, false)),
+            Arc::new(Field::new("f2", DataType::Utf8, false)),
+        ];
+        let result = func
+            .return_field_from_args(ReturnFieldArgs {
+                arg_fields: &fields,
+                scalar_arguments: &[None, None, None],
+            })
+            .unwrap();
+
+        match result.data_type() {
+            DataType::Struct(inner) => {
+                assert_eq!(inner.len(), 2);
+                assert_eq!(inner[0].name(), "c0");
+                assert_eq!(inner[1].name(), "c1");
+                assert_eq!(inner[0].data_type(), &DataType::Utf8);
+                assert!(inner[0].is_nullable());
+            }
+            other => panic!("Expected Struct, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_too_few_args() {
+        let func = JsonTuple::new();
+        let fields = vec![Arc::new(Field::new("json", DataType::Utf8, false))];
+        let result = func.return_field_from_args(ReturnFieldArgs {
+            arg_fields: &fields,
+            scalar_arguments: &[None],
+        });
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("at least 2 arguments")
+        );
+    }
+
+    #[test]
+    fn test_inner_rejects_invalid_inputs() {
+        // No arguments at all: must be an error, not a panic.
+        assert!(json_tuple_inner(&[], &DataType::Utf8).is_err());
+
+        // A non-Struct return type is rejected up front.
+        let json: ArrayRef = Arc::new(arrow::array::StringArray::from(vec!["{}"]));
+        assert!(json_tuple_inner(&[json], &DataType::Utf8).is_err());
+    }
+}
diff --git a/datafusion/spark/src/function/json/mod.rs b/datafusion/spark/src/function/json/mod.rs
index a87df9a2c87a0..01378235d7c64 100644
--- a/datafusion/spark/src/function/json/mod.rs
+++ b/datafusion/spark/src/function/json/mod.rs
@@ -15,11 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod json_tuple;
+
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+make_udf_function!(json_tuple::JsonTuple, json_tuple);
+
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    export_functions!((
+        json_tuple,
+        "Extracts top-level fields from a JSON string and returns them as a struct.",
+        args,
+    ));
+}
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    vec![json_tuple()]
 }
diff --git a/datafusion/sqllogictest/test_files/spark/json/json_tuple.slt b/datafusion/sqllogictest/test_files/spark/json/json_tuple.slt
new file mode 100644
index 0000000000000..c0c424946709f
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/json/json_tuple.slt
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for Spark-compatible json_tuple function
+# https://spark.apache.org/docs/latest/api/sql/index.html#json_tuple
+#
+# Test cases derived from Spark JsonExpressionsSuite:
+# https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+
+# Scalar: hive key 1
+query ?
+SELECT json_tuple('{"f1":"value1","f2":"value2","f3":3,"f5":5.23}'::STRING, 'f1'::STRING, 'f2'::STRING, 'f3'::STRING, 'f4'::STRING, 'f5'::STRING);
+----
+{c0: value1, c1: value2, c2: 3, c3: NULL, c4: 5.23}
+
+# Scalar: hive key 2
+query ?
+SELECT json_tuple('{"f1":"value12","f3":"value3","f2":2,"f4":4.01}'::STRING, 'f1'::STRING, 'f2'::STRING, 'f3'::STRING, 'f4'::STRING, 'f5'::STRING);
+----
+{c0: value12, c1: 2, c2: value3, c3: 4.01, c4: NULL}
+
+# Scalar: hive key 3
+query ?
+SELECT json_tuple('{"f1":"value13","f4":"value44","f3":"value33","f2":2,"f5":5.01}'::STRING, 'f1'::STRING, 'f2'::STRING, 'f3'::STRING, 'f4'::STRING, 'f5'::STRING);
+----
+{c0: value13, c1: 2, c2: value33, c3: value44, c4: 5.01}
+
+# Scalar: null JSON input
+query ?
+SELECT json_tuple(NULL::STRING, 'f1'::STRING, 'f2'::STRING, 'f3'::STRING, 'f4'::STRING, 'f5'::STRING);
+----
+NULL
+
+# Scalar: null and empty values
+query ?
+SELECT json_tuple('{"f1":"","f5":null}'::STRING, 'f1'::STRING, 'f2'::STRING, 'f3'::STRING, 'f4'::STRING, 'f5'::STRING);
+----
+{c0: , c1: NULL, c2: NULL, c3: NULL, c4: NULL}
+
+# Scalar: invalid JSON (array)
+query ?
+SELECT json_tuple('[invalid JSON string]'::STRING, 'f1'::STRING);
+----
+NULL
+
+# Scalar: invalid JSON (start only)
+query ?
+SELECT json_tuple('{'::STRING, 'f1'::STRING);
+----
+NULL
+
+# Scalar: invalid JSON (no closing brace)
+query ?
+SELECT json_tuple('{"foo":"bar"'::STRING, 'f1'::STRING);
+----
+NULL
+
+# Scalar: invalid JSON (backslash)
+query ?
+SELECT json_tuple('\'::STRING, 'f1'::STRING);
+----
+NULL
+
+# Scalar: invalid JSON (quoted string, not an object)
+query ?
+SELECT json_tuple('"quote'::STRING, '"quote'::STRING);
+----
+NULL
+
+# Scalar: empty JSON object
+query ?
+SELECT json_tuple('{}'::STRING, 'a'::STRING);
+----
+{c0: NULL}
+
+# Array: multi-row test
+query ?
+SELECT json_tuple(col, 'f1'::STRING, 'f2'::STRING) FROM (VALUES
+  ('{"f1":"a","f2":"b"}'::STRING),
+  (NULL::STRING),
+  ('{"f1":"c"}'::STRING),
+  ('invalid'::STRING)
+) AS t(col);
+----
+{c0: a, c1: b}
+NULL
+{c0: c, c1: NULL}
+NULL
+
+# Array: SPARK-21677 null field key
+query ?
+SELECT json_tuple(col1, col2, col3, col4) FROM (VALUES
+  ('{"f1":1,"f2":2}'::STRING, 'f1'::STRING, NULL::STRING, 'f2'::STRING)
+) AS t(col1, col2, col3, col4);
+----
+{c0: 1, c1: NULL, c2: 2}
+
+# Array: SPARK-21804 repeated field
+query ?
+SELECT json_tuple(col1, col2, col3, col4) FROM (VALUES
+  ('{"f1":1,"f2":2}'::STRING, 'f1'::STRING, NULL::STRING, 'f1'::STRING)
+) AS t(col1, col2, col3, col4);
+----
+{c0: 1, c1: NULL, c2: 1}
+
+# Edge case: both json and field key are null
+query ?
+SELECT json_tuple(NULL::STRING, NULL::STRING);
+----
+NULL
+
+# Edge case: empty string json and empty string key
+query ?
+SELECT json_tuple(''::STRING, ''::STRING);
+----
+NULL
+
+# Edge case: mixed upper/lower case keys
+query ?
+SELECT json_tuple('{"Name":"Alice","name":"bob","NAME":"Charlie"}'::STRING, 'Name'::STRING, 'name'::STRING, 'NAME'::STRING);
+----
+{c0: Alice, c1: bob, c2: Charlie}
+
+# Edge case: UTF-8 Chinese characters
+query ?
+SELECT json_tuple('{"姓名":"小明","城市":"台北"}'::STRING, '姓名'::STRING, '城市'::STRING);
+----
+{c0: 小明, c1: 台北}
+
+# Edge case: UTF-8 Cyrillic characters
+query ?
+SELECT json_tuple('{"имя":"Иван","город":"Москва"}'::STRING, 'имя'::STRING, 'город'::STRING);
+----
+{c0: Иван, c1: Москва}
+
+# Verify return type with arrow_typeof
+query T
+SELECT arrow_typeof(json_tuple('{"a":1}'::STRING, 'a'::STRING));
+----
+Struct("c0": Utf8)