Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 226 additions & 69 deletions native/spark-expr/src/conversion_funcs/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ use std::str::FromStr;
use std::sync::{Arc, LazyLock};

macro_rules! cast_utf8_to_timestamp {
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr) => {{
// $tz is a Timezone:Tz object and contains the session timezone.
// $to_tz_str is a string containing the to_type timezone
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr, $to_tz_str:expr) => {{
let len = $array.len();
let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone("UTC");
let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone($to_tz_str);
let mut cast_err: Option<SparkError> = None;
for i in 0..len {
if $array.is_null(i) {
Expand Down Expand Up @@ -675,16 +677,21 @@ pub(crate) fn cast_string_to_timestamp(
.downcast_ref::<GenericStringArray<i32>>()
.expect("Expected a string array");

let tz = &timezone::Tz::from_str(timezone_str).unwrap();
let tz = &timezone::Tz::from_str(timezone_str)
.map_err(|_| SparkError::Internal(format!("Invalid timezone string: {timezone_str}")))?;

let cast_array: ArrayRef = match to_type {
DataType::Timestamp(_, _) => cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz
)?,
DataType::Timestamp(_, tz_opt) => {
let to_tz = tz_opt.as_deref().unwrap_or("UTC");
cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz,
to_tz
)?
}
_ => unreachable!("Invalid data type {:?} in cast from string", to_type),
};
Ok(cast_array)
Expand Down Expand Up @@ -967,20 +974,31 @@ fn get_timestamp_values<T: TimeZone>(
timestamp_type: &str,
tz: &T,
) -> SparkResult<Option<i64>> {
let values: Vec<_> = value.split(['T', '-', ':', '.']).collect();
let year = values[0].parse::<i32>().unwrap_or_default();
// Handle negative year: strip leading '-' and remember the sign.
let (sign, date_part) = if let Some(stripped) = value.strip_prefix('-') {
(-1i32, stripped)
} else {
(1i32, value)
};
let mut parts = date_part.split(['T', ' ', '-', ':', '.']);
let year = sign
* parts
.next()
.unwrap_or("")
.parse::<i32>()
.unwrap_or_default();

// NaiveDate (used internally by chrono's with_ymd_and_hms) is bounded to ±262142.
if !(-262143..=262142).contains(&year) {
return Ok(None);
}

let month = values.get(1).map_or(1, |m| m.parse::<u32>().unwrap_or(1));
let day = values.get(2).map_or(1, |d| d.parse::<u32>().unwrap_or(1));
let hour = values.get(3).map_or(0, |h| h.parse::<u32>().unwrap_or(0));
let minute = values.get(4).map_or(0, |m| m.parse::<u32>().unwrap_or(0));
let second = values.get(5).map_or(0, |s| s.parse::<u32>().unwrap_or(0));
let microsecond = values.get(6).map_or(0, |ms| ms.parse::<u32>().unwrap_or(0));
let month = parts.next().map_or(1, |m| m.parse::<u32>().unwrap_or(1));
let day = parts.next().map_or(1, |d| d.parse::<u32>().unwrap_or(1));
let hour = parts.next().map_or(0, |h| h.parse::<u32>().unwrap_or(0));
let minute = parts.next().map_or(0, |m| m.parse::<u32>().unwrap_or(0));
let second = parts.next().map_or(0, |s| s.parse::<u32>().unwrap_or(0));
let microsecond = parts.next().map_or(0, |ms| ms.parse::<u32>().unwrap_or(0));

let mut timestamp_info = TimeStampInfo::default();

Expand Down Expand Up @@ -1041,28 +1059,19 @@ fn parse_timestamp_to_micros<T: TimeZone>(
timestamp_info.second,
);

// Check if datetime is not None
let tz_datetime = match datetime.single() {
// Spark uses the offset before daylight savings change so we need to use earliest()
// Return None for LocalResult::None which is the invalid time in a DST spring forward gap).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do existing tests cover any DST spring forward gaps?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tests in Sparks DateTimeUtilsSuite which do.

let tz_datetime = match datetime.earliest() {
Some(dt) => dt
.with_timezone(tz)
.with_nanosecond(timestamp_info.microsecond * 1000),
None => {
return Err(SparkError::Internal(
"Failed to parse timestamp".to_string(),
));
}
};

let result = match tz_datetime {
Some(dt) => dt.timestamp_micros(),
None => {
return Err(SparkError::Internal(
"Failed to parse timestamp".to_string(),
));
}
None => return Ok(None),
};

Ok(Some(result))
match tz_datetime {
Some(dt) => Ok(Some(dt.timestamp_micros())),
None => Ok(None),
}
}

fn parse_str_to_year_timestamp<T: TimeZone>(value: &str, tz: &T) -> SparkResult<Option<i64>> {
Expand Down Expand Up @@ -1096,21 +1105,6 @@ fn parse_str_to_microsecond_timestamp<T: TimeZone>(
get_timestamp_values(value, "microsecond", tz)
}

type TimestampPattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>);

static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}$").unwrap());
static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}$").unwrap());
static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}$").unwrap());
static RE_HOUR: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{1,2}$").unwrap());
static RE_MINUTE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}$").unwrap());
static RE_SECOND: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$").unwrap());
static RE_MICROSECOND: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap());
static RE_TIME_ONLY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^T\d{1,2}$").unwrap());

fn timestamp_parser<T: TimeZone>(
value: &str,
eval_mode: EvalMode,
Expand All @@ -1120,15 +1114,103 @@ fn timestamp_parser<T: TimeZone>(
if value.is_empty() {
return Ok(None);
}
let patterns: &[TimestampPattern<T>] = &[
(&RE_YEAR, parse_str_to_year_timestamp),

// Handle Z or ±HH:MM offset suffix: strip it and parse with the explicit fixed offset.
if let Some((stripped, offset_secs)) = extract_offset_suffix(value) {
let fixed_tz = chrono::FixedOffset::east_opt(offset_secs)
.ok_or_else(|| SparkError::Internal("Invalid timezone offset".to_string()))?;
return timestamp_parser_with_tz(stripped, eval_mode, &fixed_tz);
}

timestamp_parser_with_tz(value, eval_mode, tz)
}

/// If `value` ends with a UTC offset suffix (`Z`, `+HH:MM`, or `-HH:MM`), returns the
/// stripped string and the offset in seconds. Returns `None` if no offset suffix is present.
Comment on lines +1128 to +1129
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the behavior for offsets other than the ones that are supported so far?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll return None and eventually fail. The corresponding code in Spark SparkDateTimeUtils.getZoneId indicates that these formats (H:mm and HH:m) are retained for backward compatibility with Spark before 3.0. Let me create an issue to handle those as well as named timezones (e.g. 'America/New_York') which are less commonly used.
There is a Spark test test("SPARK-35780: support full range of timestamp string") which currently passes. I'll double check in the next PR to make sure it is being exercised. The test does not include named timezones so perhaps that is not needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue to support more formats - #3775

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Spark's DateTimeUtilsSuite is never called in our ci. This may reveal a bunch of issues.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #3776 to address this.

fn extract_offset_suffix(value: &str) -> Option<(&str, i32)> {
if let Some(stripped) = value.strip_suffix('Z') {
return Some((stripped, 0));
}
// Check for ±HH:MM at the end (exactly 6 chars: sign + 2 digits + ':' + 2 digits)
if value.len() >= 6 {
let suffix_start = value.len() - 6;
let suffix = &value[suffix_start..];
let sign_byte = suffix.as_bytes()[0];
if (sign_byte == b'+' || sign_byte == b'-') && suffix.as_bytes()[3] == b':' {
if let (Ok(h), Ok(m)) = (suffix[1..3].parse::<i32>(), suffix[4..6].parse::<i32>()) {
let sign = if sign_byte == b'+' { 1i32 } else { -1i32 };
return Some((&value[..suffix_start], sign * (h * 3600 + m * 60)));
}
}
}
None
}

type TimestampParsePattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>);

static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap());
static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap());
static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap());
static RE_HOUR: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{1,2}$").unwrap());
static RE_MINUTE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}$").unwrap());
static RE_SECOND: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}$").unwrap());
static RE_MICROSECOND: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap());
static RE_TIME_ONLY_H: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^T\d{1,2}$").unwrap());
static RE_TIME_ONLY_HM: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}$").unwrap());
static RE_TIME_ONLY_HMS: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}:\d{2}$").unwrap());
static RE_TIME_ONLY_HMSU: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}:\d{2}\.\d{1,6}$").unwrap());
static RE_BARE_HM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}$").unwrap());
static RE_BARE_HMS: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}:\d{2}$").unwrap());
static RE_BARE_HMSU: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}:\d{2}\.\d{1,6}$").unwrap());

fn timestamp_parser_with_tz<T: TimeZone>(
value: &str,
eval_mode: EvalMode,
tz: &T,
) -> SparkResult<Option<i64>> {
// Both T-separator and space-separator date-time forms are supported.
// Negative years are handled by get_timestamp_values detecting a leading '-'.
let patterns: &[TimestampParsePattern<T>] = &[
// Year only: 4-7 digits, optionally negative
(
&RE_YEAR,
parse_str_to_year_timestamp as fn(&str, &T) -> SparkResult<Option<i64>>,
),
// Year-month
(&RE_MONTH, parse_str_to_month_timestamp),
// Year-month-day
(&RE_DAY, parse_str_to_day_timestamp),
// Date T-or-space hour (1 or 2 digits)
(&RE_HOUR, parse_str_to_hour_timestamp),
// Date T-or-space hour:minute
(&RE_MINUTE, parse_str_to_minute_timestamp),
// Date T-or-space hour:minute:second
(&RE_SECOND, parse_str_to_second_timestamp),
// Date T-or-space hour:minute:second.fraction
(&RE_MICROSECOND, parse_str_to_microsecond_timestamp),
(&RE_TIME_ONLY, parse_str_to_time_only_timestamp),
// Time-only: T hour (1 or 2 digits, no colon)
(&RE_TIME_ONLY_H, parse_str_to_time_only_timestamp),
// Time-only: T hour:minute
(&RE_TIME_ONLY_HM, parse_str_to_time_only_timestamp),
// Time-only: T hour:minute:second
(&RE_TIME_ONLY_HMS, parse_str_to_time_only_timestamp),
// Time-only: T hour:minute:second.fraction
(&RE_TIME_ONLY_HMSU, parse_str_to_time_only_timestamp),
// Bare time-only: hour:minute (without T prefix)
(&RE_BARE_HM, parse_str_to_time_only_timestamp),
// Bare time-only: hour:minute:second
(&RE_BARE_HMS, parse_str_to_time_only_timestamp),
// Bare time-only: hour:minute:second.fraction
(&RE_BARE_HMSU, parse_str_to_time_only_timestamp),
];

let mut timestamp = None;
Expand Down Expand Up @@ -1157,23 +1239,43 @@ fn timestamp_parser<T: TimeZone>(
}

fn parse_str_to_time_only_timestamp<T: TimeZone>(value: &str, tz: &T) -> SparkResult<Option<i64>> {
let values: Vec<&str> = value.split('T').collect();
let time_values: Vec<u32> = values[1]
.split(':')
.map(|v| v.parse::<u32>().unwrap_or(0))
.collect();
// The 'T' is optional in the time format; strip it if specified.
let time_part = value.strip_prefix('T').unwrap_or(value);

// Parse time components: hour[:minute[:second[.fraction]]]
// Use splitn(3) so "12:34:56.789" splits into ["12", "34", "56.789"].
let colon_parts: Vec<&str> = time_part.splitn(3, ':').collect();
let hour: u32 = colon_parts[0].parse().unwrap_or(0);
let minute: u32 = colon_parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);
let (second, nanosecond) = if let Some(sec_frac) = colon_parts.get(2) {
let dot_idx = sec_frac.find('.');
let sec: u32 = sec_frac[..dot_idx.unwrap_or(sec_frac.len())]
.parse()
.unwrap_or(0);
let ns: u32 = if let Some(dot) = dot_idx {
let frac = &sec_frac[dot + 1..];
// Interpret up to 6 digits as microseconds, padding with trailing zeros.
let trimmed = &frac[..frac.len().min(6)];
let padded = format!("{:0<6}", trimmed);
padded.parse::<u32>().unwrap_or(0) * 1000
} else {
0
};
(sec, ns)
} else {
(0, 0)
};

let datetime = tz.from_utc_datetime(&chrono::Utc::now().naive_utc());
let timestamp = datetime
let result = datetime
.with_timezone(tz)
.with_hour(time_values.first().copied().unwrap_or_default())
.and_then(|dt| dt.with_minute(*time_values.get(1).unwrap_or(&0)))
.and_then(|dt| dt.with_second(*time_values.get(2).unwrap_or(&0)))
.and_then(|dt| dt.with_nanosecond(*time_values.get(3).unwrap_or(&0) * 1_000))
.map(|dt| dt.timestamp_micros())
.unwrap_or_default();

Ok(Some(timestamp))
.with_hour(hour)
.and_then(|dt| dt.with_minute(minute))
.and_then(|dt| dt.with_second(second))
.and_then(|dt| dt.with_nanosecond(nanosecond))
.map(|dt| dt.timestamp_micros());

Ok(result)
}

//a string to date parser - port of spark's SparkDateTimeUtils#stringToDate.
Expand Down Expand Up @@ -1343,7 +1445,8 @@ mod tests {
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz
tz,
"UTC"
)
.unwrap();

Expand Down Expand Up @@ -1373,7 +1476,8 @@ mod tests {
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz
tz,
"UTC"
);
assert!(
result.is_err(),
Expand Down Expand Up @@ -1497,6 +1601,59 @@ mod tests {
timestamp_parser("10000-01-01T12:34:56.123456", EvalMode::Legacy, tz).unwrap(),
Some(253402346096123456)
);
// Space separator (same values as T separator)
assert_eq!(
timestamp_parser("2020-01-01 12", EvalMode::Legacy, tz).unwrap(),
Some(1577880000000000)
);
assert_eq!(
timestamp_parser("2020-01-01 12:34", EvalMode::Legacy, tz).unwrap(),
Some(1577882040000000)
);
assert_eq!(
timestamp_parser("2020-01-01 12:34:56", EvalMode::Legacy, tz).unwrap(),
Some(1577882096000000)
);
assert_eq!(
timestamp_parser("2020-01-01 12:34:56.123456", EvalMode::Legacy, tz).unwrap(),
Some(1577882096123456)
);
// Z suffix (UTC)
assert_eq!(
timestamp_parser("2020-01-01T12:34:56Z", EvalMode::Legacy, tz).unwrap(),
Some(1577882096000000)
);
// Positive offset suffix
assert_eq!(
timestamp_parser("2020-01-01T12:34:56+05:30", EvalMode::Legacy, tz).unwrap(),
Some(1577862296000000) // 12:34:56 UTC+5:30 = 07:04:56 UTC
);
// T-prefixed time-only with colon
assert!(timestamp_parser("T12:34", EvalMode::Legacy, tz)
.unwrap()
.is_some());
assert!(timestamp_parser("T12:34:56", EvalMode::Legacy, tz)
.unwrap()
.is_some());
assert!(timestamp_parser("T12:34:56.123456", EvalMode::Legacy, tz)
.unwrap()
.is_some());
// Bare time-only (hour:minute without T prefix)
assert!(timestamp_parser("12:34", EvalMode::Legacy, tz)
.unwrap()
.is_some());
assert!(timestamp_parser("12:34:56", EvalMode::Legacy, tz)
.unwrap()
.is_some());
// Negative year
assert!(timestamp_parser("-0001", EvalMode::Legacy, tz)
.unwrap()
.is_some());
assert!(
timestamp_parser("-0001-01-01T12:34:56", EvalMode::Legacy, tz)
.unwrap()
.is_some()
);
}

#[test]
Expand Down
Loading
Loading