def _detect_timezone_offset(
    parsed_lines: list[tuple[float, pynmea2.NMEASentence]],
) -> float:
    """
    Estimate the offset (in seconds) between the camera clock and GPS time.

    Sentences are examined in order. Every valid RMC sentence is handed to
    _compute_timezone_offset_from_rmc until one yields an offset, which is
    returned right away (RMC carries both date and time). If no RMC sentence
    yields one, the first valid GGA or GLL sentence seen is handed to
    _compute_timezone_offset_from_time_only instead (time only, no date).

    Returns 0.0 when neither approach yields an offset.
    """
    fallback_candidate: tuple[float, pynmea2.NMEASentence] | None = None

    for camera_sec, sentence in parsed_lines:
        kind = sentence.sentence_type
        is_rmc = kind == "RMC"
        wants_fallback = fallback_candidate is None and kind in ["GGA", "GLL"]
        if not (is_rmc or wants_fallback):
            continue

        # A sentence without an is_valid attribute counts as invalid
        if not getattr(sentence, "is_valid", False):
            continue

        if not is_rmc:
            # Remember only the first valid GGA/GLL; keep scanning for RMC
            fallback_candidate = (camera_sec, sentence)
            continue

        rmc_offset = _compute_timezone_offset_from_rmc(camera_sec, sentence)
        if rmc_offset is None:
            continue
        LOG.debug(
            "Computed timezone offset %.1fs from RMC (%s %s)",
            rmc_offset,
            sentence.datestamp,
            sentence.timestamp,
        )
        return rmc_offset

    # No RMC produced an offset: fall back to the time-only sentence, if any
    if fallback_candidate is None:
        return 0.0

    camera_sec, sentence = fallback_candidate
    fallback_offset = _compute_timezone_offset_from_time_only(camera_sec, sentence)
    if fallback_offset is None:
        return 0.0

    LOG.debug(
        "Computed timezone offset %.1fs from %s (fallback, no date info)",
        fallback_offset,
        sentence.sentence_type,
    )
    return fallback_offset
list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]")) [] """ - timezone_offset: float | None = None parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = [] - first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None - # First pass: collect parsed_lines and compute timezone offset from the first valid RMC message + # First pass: collect parsed_lines for epoch_ms, message in _parse_nmea_lines(gps_data): # Rounding needed to avoid floating point precision issues epoch_sec = round(epoch_ms / 1000, 3) parsed_lines.append((epoch_sec, message)) - if timezone_offset is None and message.sentence_type == "RMC": - if hasattr(message, "is_valid") and message.is_valid: - timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message) - if timezone_offset is not None: - LOG.debug( - "Computed timezone offset %.1fs from RMC (%s %s)", - timezone_offset, - message.datestamp, - message.timestamp, - ) - # Track first valid GGA/GLL for fallback - if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]: - if hasattr(message, "is_valid") and message.is_valid: - first_valid_gga_gll = (epoch_sec, message) - - # Fallback: if no RMC found, try GGA/GLL (less reliable - no date info) - if timezone_offset is None and first_valid_gga_gll is not None: - epoch_sec, message = first_valid_gga_gll - timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message) - if timezone_offset is not None: - LOG.debug( - "Computed timezone offset %.1fs from %s (fallback, no date info)", - timezone_offset, - message.sentence_type, - ) - # If no offset could be determined, use 0 (camera clock assumed correct) - if timezone_offset is None: - timezone_offset = 0.0 + timezone_offset = _detect_timezone_offset(parsed_lines) points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {} # Second pass: apply offset to all GPS points for epoch_sec, message in parsed_lines: + if message.sentence_type not in ("GGA", "RMC", 
"GLL"): + continue + if not message.is_valid: + continue + corrected_epoch = round(epoch_sec + timezone_offset, 3) - # https://tavotech.com/gps-nmea-sentence-structure/ - if message.sentence_type in ["GGA"]: - if not message.is_valid: - continue - point = telemetry.GPSPoint( - time=corrected_epoch, - lat=message.latitude, - lon=message.longitude, - alt=message.altitude, - angle=None, - epoch_time=corrected_epoch, - fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None, - precision=None, - ground_speed=None, - ) - points_by_sentence_type.setdefault(message.sentence_type, []).append(point) - - elif message.sentence_type in ["RMC", "GLL"]: - if not message.is_valid: - continue - point = telemetry.GPSPoint( - time=corrected_epoch, - lat=message.latitude, - lon=message.longitude, - alt=None, - angle=None, - epoch_time=corrected_epoch, - fix=None, - precision=None, - ground_speed=None, - ) - points_by_sentence_type.setdefault(message.sentence_type, []).append(point) + # GGA has altitude and fix; RMC and GLL do not + if message.sentence_type == "GGA": + alt = message.altitude + fix = telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None + else: + alt = None + fix = None + + point = telemetry.GPSPoint( + time=corrected_epoch, + lat=message.latitude, + lon=message.longitude, + alt=alt, + angle=None, + epoch_time=corrected_epoch, + fix=fix, + precision=None, + ground_speed=None, + ) + points_by_sentence_type.setdefault(message.sentence_type, []).append(point) # This is the extraction order in exiftool if "RMC" in points_by_sentence_type: diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index a5c2fcc0..66d0d0ad 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -120,6 +120,86 @@ def _deduplicate_gps_points( return deduplicated_track +def _aggregate_float_values_same_length( + texts_by_tag: dict[str, list[str]], + tag: str | None, + expected_length: int, +) -> list[float 
| None]: + if tag is not None: + vals = [ + _maybe_float(val) + for val in _extract_alternative_fields(texts_by_tag, [tag], list) or [] + ] + else: + vals = [] + while len(vals) < expected_length: + vals.append(None) + return vals + + +def _aggregate_epoch_times( + texts_by_tag: dict[str, list[str]], + gps_time_tag: str | None, + time_tag: str | None, + timestamps: list[float | None], + expected_length: int, +) -> list[float | None]: + """Aggregate GPS epoch times from tags, with fallback to per-point timestamps.""" + if gps_time_tag is not None: + gps_epoch_times: list[float | None] = [ + geo.as_unix_time(dt) if dt is not None else None + for dt in ( + exif_read.parse_gps_datetime(text) + for text in _extract_alternative_fields( + texts_by_tag, [gps_time_tag], list + ) + or [] + ) + ] + if len(gps_epoch_times) != expected_length: + LOG.warning( + "Found different number of GPS epoch times %d and coordinates %d", + len(gps_epoch_times), + expected_length, + ) + gps_epoch_times = [None] * expected_length + return gps_epoch_times + elif time_tag is not None: + # Use per-point GPS timestamps as epoch times + return [t for t in timestamps] + else: + return [None] * expected_length + + +def _aggregate_timestamps( + texts_by_tag: dict[str, list[str]], + time_tag: str | None, + expected_length: int, +) -> list[float | None] | None: + """Aggregate timestamps from the time tag. + + Returns the timestamp list, or None if the lengths don't match + (caller should return [] in that case). 
+ """ + if time_tag is not None: + dts = [ + exif_read.parse_gps_datetime(text) + for text in _extract_alternative_fields(texts_by_tag, [time_tag], list) + or [] + ] + timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts] + if expected_length != len(timestamps): + LOG.warning( + "Found different number of timestamps %d and coordinates %d", + len(timestamps), + expected_length, + ) + return None + else: + timestamps = [0.0] * expected_length + return timestamps + + def _aggregate_gps_track( texts_by_tag: dict[str, list[str]], time_tag: str | None, @@ -159,73 +239,29 @@ def _aggregate_gps_track( expected_length = len(lats) # aggregate timestamps (optional) - if time_tag is not None: - dts = [ - exif_read.parse_gps_datetime(text) - for text in _extract_alternative_fields(texts_by_tag, [time_tag], list) - or [] - ] - timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts] - if expected_length != len(timestamps): - # no idea what to do if we have different number of timestamps and coordinates - LOG.warning( - "Found different number of timestamps %d and coordinates %d", - len(timestamps), - expected_length, - ) - return [] - else: - timestamps = [0.0] * expected_length + timestamps = _aggregate_timestamps(texts_by_tag, time_tag, expected_length) + if timestamps is None: + return [] assert len(timestamps) == expected_length - def _aggregate_float_values_same_length( - tag: str | None, - ) -> list[float | None]: - if tag is not None: - vals = [ - _maybe_float(val) - for val in _extract_alternative_fields(texts_by_tag, [tag], list) or [] - ] - else: - vals = [] - while len(vals) < expected_length: - vals.append(None) - return vals - # aggregate altitudes (optional) - alts = _aggregate_float_values_same_length(alt_tag) + alts = _aggregate_float_values_same_length(texts_by_tag, alt_tag, expected_length) # aggregate directions (optional) - directions = _aggregate_float_values_same_length(direction_tag) + directions = 
_XYZDataT = T.TypeVar(
    "_XYZDataT",
    telemetry.AccelerationData,
    telemetry.GyroscopeData,
    telemetry.MagnetometerData,
)


def _accumulate_xyz_telemetry(
    device_data: T.Sequence[KLVDict],
    sample: Sample,
    stream_key: bytes,
    data_class: T.Type[_XYZDataT],
    output_dict: dict[int, list[_XYZDataT]],
    device_id: int,
) -> None:
    """
    Append the XYZ readings (ACCL/GYRO/MAGN) of one device to output_dict[device_id].

    The readings come from _find_first_telemetry_stream(device_data, stream_key).
    When it finds nothing, output_dict is left untouched. Otherwise one
    data_class instance is appended per reading, with times spread evenly
    across the sample: sample.exact_time + (sample.exact_timedelta / count) * index.
    """
    readings = _find_first_telemetry_stream(device_data, stream_key)
    if not readings:
        return

    # Interpolate timestamps in between: equal spacing over the sample duration
    step = sample.exact_timedelta / len(readings)
    bucket = output_dict.setdefault(device_id, [])
    # Each reading is unpacked as (z, x, y, ...); any extra components are ignored
    for position, (z, x, y, *_) in enumerate(readings):
        bucket.append(
            data_class(
                time=sample.exact_time + step * position,
                x=x,
                y=y,
                z=z,
            )
        )
sample, + b"MAGN", + telemetry.MagnetometerData, + magns_by_dvid, + device_id, + ) return device_found