-
Notifications
You must be signed in to change notification settings - Fork 303
fix: add timezone and special formats support for cast string to timestamp #3730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,9 +34,11 @@ use std::str::FromStr; | |
| use std::sync::{Arc, LazyLock}; | ||
|
|
||
| macro_rules! cast_utf8_to_timestamp { | ||
| ($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr) => {{ | ||
| // $tz is a Timezone:Tz object and contains the session timezone. | ||
| // $to_tz_str is a string containing the to_type timezone | ||
| ($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr, $to_tz_str:expr) => {{ | ||
| let len = $array.len(); | ||
| let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone("UTC"); | ||
| let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone($to_tz_str); | ||
| let mut cast_err: Option<SparkError> = None; | ||
| for i in 0..len { | ||
| if $array.is_null(i) { | ||
|
|
@@ -675,16 +677,21 @@ pub(crate) fn cast_string_to_timestamp( | |
| .downcast_ref::<GenericStringArray<i32>>() | ||
| .expect("Expected a string array"); | ||
|
|
||
| let tz = &timezone::Tz::from_str(timezone_str).unwrap(); | ||
| let tz = &timezone::Tz::from_str(timezone_str) | ||
| .map_err(|_| SparkError::Internal(format!("Invalid timezone string: {timezone_str}")))?; | ||
|
|
||
| let cast_array: ArrayRef = match to_type { | ||
| DataType::Timestamp(_, _) => cast_utf8_to_timestamp!( | ||
| string_array, | ||
| eval_mode, | ||
| TimestampMicrosecondType, | ||
| timestamp_parser, | ||
| tz | ||
| )?, | ||
| DataType::Timestamp(_, tz_opt) => { | ||
| let to_tz = tz_opt.as_deref().unwrap_or("UTC"); | ||
| cast_utf8_to_timestamp!( | ||
| string_array, | ||
| eval_mode, | ||
| TimestampMicrosecondType, | ||
| timestamp_parser, | ||
| tz, | ||
| to_tz | ||
| )? | ||
| } | ||
| _ => unreachable!("Invalid data type {:?} in cast from string", to_type), | ||
| }; | ||
| Ok(cast_array) | ||
|
|
@@ -967,20 +974,31 @@ fn get_timestamp_values<T: TimeZone>( | |
| timestamp_type: &str, | ||
| tz: &T, | ||
| ) -> SparkResult<Option<i64>> { | ||
| let values: Vec<_> = value.split(['T', '-', ':', '.']).collect(); | ||
| let year = values[0].parse::<i32>().unwrap_or_default(); | ||
| // Handle negative year: strip leading '-' and remember the sign. | ||
| let (sign, date_part) = if let Some(stripped) = value.strip_prefix('-') { | ||
| (-1i32, stripped) | ||
| } else { | ||
| (1i32, value) | ||
| }; | ||
| let mut parts = date_part.split(['T', ' ', '-', ':', '.']); | ||
| let year = sign | ||
| * parts | ||
| .next() | ||
| .unwrap_or("") | ||
| .parse::<i32>() | ||
| .unwrap_or_default(); | ||
|
|
||
| // NaiveDate (used internally by chrono's with_ymd_and_hms) is bounded to ±262142. | ||
| if !(-262143..=262142).contains(&year) { | ||
| return Ok(None); | ||
| } | ||
|
|
||
| let month = values.get(1).map_or(1, |m| m.parse::<u32>().unwrap_or(1)); | ||
| let day = values.get(2).map_or(1, |d| d.parse::<u32>().unwrap_or(1)); | ||
| let hour = values.get(3).map_or(0, |h| h.parse::<u32>().unwrap_or(0)); | ||
| let minute = values.get(4).map_or(0, |m| m.parse::<u32>().unwrap_or(0)); | ||
| let second = values.get(5).map_or(0, |s| s.parse::<u32>().unwrap_or(0)); | ||
| let microsecond = values.get(6).map_or(0, |ms| ms.parse::<u32>().unwrap_or(0)); | ||
| let month = parts.next().map_or(1, |m| m.parse::<u32>().unwrap_or(1)); | ||
| let day = parts.next().map_or(1, |d| d.parse::<u32>().unwrap_or(1)); | ||
| let hour = parts.next().map_or(0, |h| h.parse::<u32>().unwrap_or(0)); | ||
| let minute = parts.next().map_or(0, |m| m.parse::<u32>().unwrap_or(0)); | ||
| let second = parts.next().map_or(0, |s| s.parse::<u32>().unwrap_or(0)); | ||
| let microsecond = parts.next().map_or(0, |ms| ms.parse::<u32>().unwrap_or(0)); | ||
|
|
||
| let mut timestamp_info = TimeStampInfo::default(); | ||
|
|
||
|
|
@@ -1041,28 +1059,19 @@ fn parse_timestamp_to_micros<T: TimeZone>( | |
| timestamp_info.second, | ||
| ); | ||
|
|
||
| // Check if datetime is not None | ||
| let tz_datetime = match datetime.single() { | ||
| // Spark uses the offset before daylight savings change so we need to use earliest() | ||
| // Return None for LocalResult::None which is the invalid time in a DST spring forward gap). | ||
| let tz_datetime = match datetime.earliest() { | ||
| Some(dt) => dt | ||
| .with_timezone(tz) | ||
| .with_nanosecond(timestamp_info.microsecond * 1000), | ||
| None => { | ||
| return Err(SparkError::Internal( | ||
| "Failed to parse timestamp".to_string(), | ||
| )); | ||
| } | ||
| }; | ||
|
|
||
| let result = match tz_datetime { | ||
| Some(dt) => dt.timestamp_micros(), | ||
| None => { | ||
| return Err(SparkError::Internal( | ||
| "Failed to parse timestamp".to_string(), | ||
| )); | ||
| } | ||
| None => return Ok(None), | ||
| }; | ||
|
|
||
| Ok(Some(result)) | ||
| match tz_datetime { | ||
| Some(dt) => Ok(Some(dt.timestamp_micros())), | ||
| None => Ok(None), | ||
| } | ||
| } | ||
|
|
||
| fn parse_str_to_year_timestamp<T: TimeZone>(value: &str, tz: &T) -> SparkResult<Option<i64>> { | ||
|
|
@@ -1096,21 +1105,6 @@ fn parse_str_to_microsecond_timestamp<T: TimeZone>( | |
| get_timestamp_values(value, "microsecond", tz) | ||
| } | ||
|
|
||
| type TimestampPattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>); | ||
|
|
||
| static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}$").unwrap()); | ||
| static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}$").unwrap()); | ||
| static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}$").unwrap()); | ||
| static RE_HOUR: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{1,2}$").unwrap()); | ||
| static RE_MINUTE: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}$").unwrap()); | ||
| static RE_SECOND: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$").unwrap()); | ||
| static RE_MICROSECOND: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{4,7}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap()); | ||
| static RE_TIME_ONLY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^T\d{1,2}$").unwrap()); | ||
|
|
||
| fn timestamp_parser<T: TimeZone>( | ||
| value: &str, | ||
| eval_mode: EvalMode, | ||
|
|
@@ -1120,15 +1114,103 @@ fn timestamp_parser<T: TimeZone>( | |
| if value.is_empty() { | ||
| return Ok(None); | ||
| } | ||
| let patterns: &[TimestampPattern<T>] = &[ | ||
| (&RE_YEAR, parse_str_to_year_timestamp), | ||
|
|
||
| // Handle Z or ±HH:MM offset suffix: strip it and parse with the explicit fixed offset. | ||
| if let Some((stripped, offset_secs)) = extract_offset_suffix(value) { | ||
| let fixed_tz = chrono::FixedOffset::east_opt(offset_secs) | ||
| .ok_or_else(|| SparkError::Internal("Invalid timezone offset".to_string()))?; | ||
| return timestamp_parser_with_tz(stripped, eval_mode, &fixed_tz); | ||
| } | ||
|
|
||
| timestamp_parser_with_tz(value, eval_mode, tz) | ||
| } | ||
|
|
||
| /// If `value` ends with a UTC offset suffix (`Z`, `+HH:MM`, or `-HH:MM`), returns the | ||
| /// stripped string and the offset in seconds. Returns `None` if no offset suffix is present. | ||
|
Comment on lines
+1128
to
+1129
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the behavior for offsets other than the ones that are supported so far?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'll return None and eventually fail. The corresponding code in Spark
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue to support more formats - #3775
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. Spark's
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created #3776 to address this. |
||
| fn extract_offset_suffix(value: &str) -> Option<(&str, i32)> { | ||
| if let Some(stripped) = value.strip_suffix('Z') { | ||
| return Some((stripped, 0)); | ||
| } | ||
| // Check for ±HH:MM at the end (exactly 6 chars: sign + 2 digits + ':' + 2 digits) | ||
| if value.len() >= 6 { | ||
| let suffix_start = value.len() - 6; | ||
| let suffix = &value[suffix_start..]; | ||
| let sign_byte = suffix.as_bytes()[0]; | ||
| if (sign_byte == b'+' || sign_byte == b'-') && suffix.as_bytes()[3] == b':' { | ||
| if let (Ok(h), Ok(m)) = (suffix[1..3].parse::<i32>(), suffix[4..6].parse::<i32>()) { | ||
| let sign = if sign_byte == b'+' { 1i32 } else { -1i32 }; | ||
| return Some((&value[..suffix_start], sign * (h * 3600 + m * 60))); | ||
| } | ||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| type TimestampParsePattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>); | ||
|
|
||
| static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap()); | ||
| static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap()); | ||
| static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap()); | ||
| static RE_HOUR: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{1,2}$").unwrap()); | ||
| static RE_MINUTE: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}$").unwrap()); | ||
| static RE_SECOND: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}$").unwrap()); | ||
| static RE_MICROSECOND: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap()); | ||
| static RE_TIME_ONLY_H: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^T\d{1,2}$").unwrap()); | ||
| static RE_TIME_ONLY_HM: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}$").unwrap()); | ||
| static RE_TIME_ONLY_HMS: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}:\d{2}$").unwrap()); | ||
| static RE_TIME_ONLY_HMSU: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^T\d{1,2}:\d{2}:\d{2}\.\d{1,6}$").unwrap()); | ||
| static RE_BARE_HM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}$").unwrap()); | ||
| static RE_BARE_HMS: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}:\d{2}$").unwrap()); | ||
| static RE_BARE_HMSU: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new(r"^\d{1,2}:\d{2}:\d{2}\.\d{1,6}$").unwrap()); | ||
|
|
||
| fn timestamp_parser_with_tz<T: TimeZone>( | ||
| value: &str, | ||
| eval_mode: EvalMode, | ||
| tz: &T, | ||
| ) -> SparkResult<Option<i64>> { | ||
| // Both T-separator and space-separator date-time forms are supported. | ||
| // Negative years are handled by get_timestamp_values detecting a leading '-'. | ||
| let patterns: &[TimestampParsePattern<T>] = &[ | ||
| // Year only: 4-7 digits, optionally negative | ||
| ( | ||
| &RE_YEAR, | ||
| parse_str_to_year_timestamp as fn(&str, &T) -> SparkResult<Option<i64>>, | ||
| ), | ||
| // Year-month | ||
| (&RE_MONTH, parse_str_to_month_timestamp), | ||
| // Year-month-day | ||
| (&RE_DAY, parse_str_to_day_timestamp), | ||
| // Date T-or-space hour (1 or 2 digits) | ||
| (&RE_HOUR, parse_str_to_hour_timestamp), | ||
| // Date T-or-space hour:minute | ||
| (&RE_MINUTE, parse_str_to_minute_timestamp), | ||
| // Date T-or-space hour:minute:second | ||
| (&RE_SECOND, parse_str_to_second_timestamp), | ||
| // Date T-or-space hour:minute:second.fraction | ||
| (&RE_MICROSECOND, parse_str_to_microsecond_timestamp), | ||
| (&RE_TIME_ONLY, parse_str_to_time_only_timestamp), | ||
| // Time-only: T hour (1 or 2 digits, no colon) | ||
| (&RE_TIME_ONLY_H, parse_str_to_time_only_timestamp), | ||
| // Time-only: T hour:minute | ||
| (&RE_TIME_ONLY_HM, parse_str_to_time_only_timestamp), | ||
| // Time-only: T hour:minute:second | ||
| (&RE_TIME_ONLY_HMS, parse_str_to_time_only_timestamp), | ||
| // Time-only: T hour:minute:second.fraction | ||
| (&RE_TIME_ONLY_HMSU, parse_str_to_time_only_timestamp), | ||
| // Bare time-only: hour:minute (without T prefix) | ||
| (&RE_BARE_HM, parse_str_to_time_only_timestamp), | ||
| // Bare time-only: hour:minute:second | ||
| (&RE_BARE_HMS, parse_str_to_time_only_timestamp), | ||
| // Bare time-only: hour:minute:second.fraction | ||
| (&RE_BARE_HMSU, parse_str_to_time_only_timestamp), | ||
| ]; | ||
|
|
||
| let mut timestamp = None; | ||
|
|
@@ -1157,23 +1239,43 @@ fn timestamp_parser<T: TimeZone>( | |
| } | ||
|
|
||
| fn parse_str_to_time_only_timestamp<T: TimeZone>(value: &str, tz: &T) -> SparkResult<Option<i64>> { | ||
| let values: Vec<&str> = value.split('T').collect(); | ||
| let time_values: Vec<u32> = values[1] | ||
| .split(':') | ||
| .map(|v| v.parse::<u32>().unwrap_or(0)) | ||
| .collect(); | ||
| // The 'T' is optional in the time format; strip it if specified. | ||
| let time_part = value.strip_prefix('T').unwrap_or(value); | ||
|
|
||
| // Parse time components: hour[:minute[:second[.fraction]]] | ||
| // Use splitn(3) so "12:34:56.789" splits into ["12", "34", "56.789"]. | ||
| let colon_parts: Vec<&str> = time_part.splitn(3, ':').collect(); | ||
| let hour: u32 = colon_parts[0].parse().unwrap_or(0); | ||
| let minute: u32 = colon_parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0); | ||
| let (second, nanosecond) = if let Some(sec_frac) = colon_parts.get(2) { | ||
| let dot_idx = sec_frac.find('.'); | ||
| let sec: u32 = sec_frac[..dot_idx.unwrap_or(sec_frac.len())] | ||
| .parse() | ||
| .unwrap_or(0); | ||
| let ns: u32 = if let Some(dot) = dot_idx { | ||
| let frac = &sec_frac[dot + 1..]; | ||
| // Interpret up to 6 digits as microseconds, padding with trailing zeros. | ||
| let trimmed = &frac[..frac.len().min(6)]; | ||
| let padded = format!("{:0<6}", trimmed); | ||
| padded.parse::<u32>().unwrap_or(0) * 1000 | ||
| } else { | ||
| 0 | ||
| }; | ||
| (sec, ns) | ||
| } else { | ||
| (0, 0) | ||
| }; | ||
|
|
||
| let datetime = tz.from_utc_datetime(&chrono::Utc::now().naive_utc()); | ||
| let timestamp = datetime | ||
| let result = datetime | ||
| .with_timezone(tz) | ||
| .with_hour(time_values.first().copied().unwrap_or_default()) | ||
| .and_then(|dt| dt.with_minute(*time_values.get(1).unwrap_or(&0))) | ||
| .and_then(|dt| dt.with_second(*time_values.get(2).unwrap_or(&0))) | ||
| .and_then(|dt| dt.with_nanosecond(*time_values.get(3).unwrap_or(&0) * 1_000)) | ||
| .map(|dt| dt.timestamp_micros()) | ||
| .unwrap_or_default(); | ||
|
|
||
| Ok(Some(timestamp)) | ||
| .with_hour(hour) | ||
| .and_then(|dt| dt.with_minute(minute)) | ||
| .and_then(|dt| dt.with_second(second)) | ||
| .and_then(|dt| dt.with_nanosecond(nanosecond)) | ||
| .map(|dt| dt.timestamp_micros()); | ||
|
|
||
| Ok(result) | ||
| } | ||
|
|
||
| //a string to date parser - port of spark's SparkDateTimeUtils#stringToDate. | ||
|
|
@@ -1343,7 +1445,8 @@ mod tests { | |
| eval_mode, | ||
| TimestampMicrosecondType, | ||
| timestamp_parser, | ||
| tz | ||
| tz, | ||
| "UTC" | ||
| ) | ||
| .unwrap(); | ||
|
|
||
|
|
@@ -1373,7 +1476,8 @@ mod tests { | |
| eval_mode, | ||
| TimestampMicrosecondType, | ||
| timestamp_parser, | ||
| tz | ||
| tz, | ||
| "UTC" | ||
| ); | ||
| assert!( | ||
| result.is_err(), | ||
|
|
@@ -1497,6 +1601,59 @@ mod tests { | |
| timestamp_parser("10000-01-01T12:34:56.123456", EvalMode::Legacy, tz).unwrap(), | ||
| Some(253402346096123456) | ||
| ); | ||
| // Space separator (same values as T separator) | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01 12", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577880000000000) | ||
| ); | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01 12:34", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577882040000000) | ||
| ); | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01 12:34:56", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577882096000000) | ||
| ); | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01 12:34:56.123456", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577882096123456) | ||
| ); | ||
| // Z suffix (UTC) | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01T12:34:56Z", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577882096000000) | ||
| ); | ||
| // Positive offset suffix | ||
| assert_eq!( | ||
| timestamp_parser("2020-01-01T12:34:56+05:30", EvalMode::Legacy, tz).unwrap(), | ||
| Some(1577862296000000) // 12:34:56 UTC+5:30 = 07:04:56 UTC | ||
| ); | ||
| // T-prefixed time-only with colon | ||
| assert!(timestamp_parser("T12:34", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| assert!(timestamp_parser("T12:34:56", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| assert!(timestamp_parser("T12:34:56.123456", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| // Bare time-only (hour:minute without T prefix) | ||
| assert!(timestamp_parser("12:34", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| assert!(timestamp_parser("12:34:56", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| // Negative year | ||
| assert!(timestamp_parser("-0001", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some()); | ||
| assert!( | ||
| timestamp_parser("-0001-01-01T12:34:56", EvalMode::Legacy, tz) | ||
| .unwrap() | ||
| .is_some() | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do existing tests cover any DST spring forward gaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests in Sparks DateTimeUtilsSuite which do.