diff --git a/4 beacons ver2/4 beacons data.csv b/4 beacons ver2/4 beacons data.csv new file mode 100644 index 0000000..a66d008 --- /dev/null +++ b/4 beacons ver2/4 beacons data.csv @@ -0,0 +1,5 @@ +Point_Name,X_m,Y_m,Z_m,X_Ri,Y_Ri,Z_Ri +beacon1,0,0,0,-0.0857,0.19,0.4667 +beacon2,0,0,0,0,0.2159,0.0381 +beacon3,0,0,0,0,-0.2159,0.0381 +beacon4,0,0,0,-0.0857,-0.19,0.4667 diff --git a/4 beacons ver2/__init__.py b/4 beacons ver2/__init__.py new file mode 100644 index 0000000..d705e69 --- /dev/null +++ b/4 beacons ver2/__init__.py @@ -0,0 +1 @@ +"""Four-beacon beacon solver copy.""" diff --git a/4 beacons ver2/__pycache__/io_utils.cpython-38.pyc b/4 beacons ver2/__pycache__/io_utils.cpython-38.pyc new file mode 100644 index 0000000..46cd461 Binary files /dev/null and b/4 beacons ver2/__pycache__/io_utils.cpython-38.pyc differ diff --git a/4 beacons ver2/__pycache__/math_utils.cpython-38.pyc b/4 beacons ver2/__pycache__/math_utils.cpython-38.pyc new file mode 100644 index 0000000..626889b Binary files /dev/null and b/4 beacons ver2/__pycache__/math_utils.cpython-38.pyc differ diff --git a/4 beacons ver2/__pycache__/model.cpython-38.pyc b/4 beacons ver2/__pycache__/model.cpython-38.pyc new file mode 100644 index 0000000..809c4d3 Binary files /dev/null and b/4 beacons ver2/__pycache__/model.cpython-38.pyc differ diff --git a/4 beacons ver2/__pycache__/position_data.cpython-38.pyc b/4 beacons ver2/__pycache__/position_data.cpython-38.pyc new file mode 100644 index 0000000..de9ffc2 Binary files /dev/null and b/4 beacons ver2/__pycache__/position_data.cpython-38.pyc differ diff --git a/4 beacons ver2/__pycache__/solver.cpython-38.pyc b/4 beacons ver2/__pycache__/solver.cpython-38.pyc new file mode 100644 index 0000000..6142cf5 Binary files /dev/null and b/4 beacons ver2/__pycache__/solver.cpython-38.pyc differ diff --git a/4 beacons ver2/io_utils.py b/4 beacons ver2/io_utils.py new file mode 100644 index 0000000..62683aa --- /dev/null +++ b/4 beacons ver2/io_utils.py @@ -0,0 +1,649 @@ +"""I/O helpers for the four-beacon solver. + +CSV data and serial data have different meanings: +- CSV columns 2-4 are optional measurement values from a file. +- CSV last three columns are real beacon positions Ri = [depth, Y, Z] in meters. +- Indexed serial lines are OpenMV measurements: beacon_number,y,z. +- Unlabeled serial lines are OpenMV measurements: y,z. In this mode the code + filters stable point clusters and infers beacon rows from this pivot sequence: + 1,2,3,4,1,4,3,2. +""" + +from __future__ import annotations + +from pathlib import Path +import time + +import numpy as np +import pandas as pd + +try: + from position_data import add_point +except ImportError: + add_point = None + + +class SerialNoDeviceError(RuntimeError): + """Raised when no serial device is available.""" + + +class ReplayInput: + """Serial-like reader for replaying local text files.""" + + def __init__(self, replay_file: str | Path) -> None: + self.replay_file = Path(replay_file) + self.file = None + + def __enter__(self) -> "ReplayInput": + self.file = self.replay_file.open("rb") + return self + + def __exit__(self, exc_type, exc, traceback) -> None: + if self.file is not None: + self.file.close() + + def readline(self) -> bytes: + if self.file is None: + return b"" + + line = self.file.readline() + if line: + return line + + self.file.seek(0) + return self.file.readline() + + +def get_config(csv_path: str | Path | None = None) -> tuple[np.ndarray, np.ndarray]: + """Load a four-beacon configuration CSV.""" + if csv_path is None: + try: + from tkinter import Tk, filedialog + except Exception as exc: + raise ValueError("csv_path is required when tkinter is unavailable") from exc + + root = Tk() + root.withdraw() + selected = filedialog.askopenfilename( + title="Select the 4-beacon configuration CSV file", + filetypes=[("CSV files", "*.csv"), ("All files", "*.*")], + ) + root.destroy() + + if not selected: + raise RuntimeError("Program Terminated: No configuration file was selected.") + + csv_path = selected + + csv_path = Path(csv_path) + config_data = pd.read_csv(csv_path) + + if config_data.shape[1] < 7: + raise ValueError("Configuration CSV must have at least 7 columns") + + if config_data.shape[0] != 4: + raise ValueError( + f"Four-beacon configuration must have exactly 4 rows, got {config_data.shape[0]}" + ) + + bmeasure = config_data.iloc[:, 1:4].to_numpy(dtype=float) + ri = config_data.iloc[:, -3:].to_numpy(dtype=float) + + print(f"Successfully loaded configuration from: {csv_path.name}") + print("Loaded 4 beacon positions from last three CSV columns.") + + return bmeasure, ri + + +def list_serial_ports() -> list[str]: + """Return available serial port names.""" + try: + from serial.tools import list_ports + except ImportError as exc: + raise ImportError( + "pyserial is required for serial input. Install it with: pip install pyserial" + ) from exc + + return [p.device for p in list_ports.comports()] + + +def parse_sensor_line(raw_data: str, rows: int) -> tuple[int, float, float] | None: + """Parse one indexed sensor line: beacon_number,y,z.""" + raw_data = raw_data.strip() + + if not raw_data: + return None + + parts = raw_data.split(",") + + if len(parts) < 3: + print(f"Serial warning: ignored invalid line: {raw_data!r}") + return None + + try: + beacon_number = int(round(float(parts[0]))) + y_val = float(parts[1]) + z_val = float(parts[2]) + except ValueError: + print(f"Serial warning: ignored non-numeric line: {raw_data!r}") + return None + + if not 1 <= beacon_number <= rows: + print(f"Serial warning: invalid beacon number {beacon_number}. Line ignored.") + return None + + return beacon_number - 1, y_val, z_val + + +def parse_unlabeled_sensor_line(raw_data: str) -> tuple[float, float] | None: + """Parse one unlabeled sensor line: y,z.""" + raw_data = raw_data.strip() + + if not raw_data: + return None + + parts = raw_data.split(",") + + if len(parts) != 2: + print(f"Serial warning: ignored invalid unlabeled line: {raw_data!r}") + return None + + try: + y_val = float(parts[0]) + z_val = float(parts[1]) + except ValueError: + print(f"Serial warning: ignored non-numeric unlabeled line: {raw_data!r}") + return None + + return y_val, z_val + + +def parse_quadrant_order(quadrant_order: str) -> list[str]: + """Return quadrant labels for beacon rows 1..4.""" + labels = [ + part.strip().upper() + for part in quadrant_order.replace(";", ",").split(",") + if part.strip() + ] + + valid = {"TR", "BR", "BL", "TL"} + + if len(labels) != 4 or set(labels) != valid: + raise ValueError( + 'quadrant_order must contain TR,BR,BL,TL exactly once, for example "TR,BR,BL,TL"' + ) + + return labels + + +def assign_quadrant_beacons( + points: list[tuple[float, float]], + corner_fraction: float = 0.25, + quadrant_order: str = "TR,BR,BL,TL", + min_points_per_quadrant: int = 5, +) -> list[tuple[int, float, float]]: + """Estimate four beacon corners from a batch of unlabeled PSD points.""" + if not 0.0 < corner_fraction <= 1.0: + raise ValueError("corner_fraction must be greater than 0 and at most 1") + + arr = np.asarray(points, dtype=float) + + if arr.ndim != 2 or arr.shape[1] != 2: + raise ValueError("points must be a list of y,z pairs") + + if arr.shape[0] < 4 * min_points_per_quadrant: + raise ValueError("Not enough points for quadrant assignment") + + x_values = arr[:, 0] + y_values = arr[:, 1] + + low_x, high_x = np.percentile(x_values, [5, 95]) + low_y, high_y = np.percentile(y_values, [5, 95]) + + mid_x = (low_x + high_x) / 2.0 + mid_y = (low_y + high_y) / 2.0 + + masks = { + "TR": (x_values >= mid_x) & (y_values >= mid_y), + "BR": (x_values >= mid_x) & (y_values < mid_y), + "BL": (x_values < mid_x) & (y_values < mid_y), + "TL": (x_values < mid_x) & (y_values >= mid_y), + } + + ideal_corners = { + "TR": np.array([high_x, high_y], dtype=float), + "BR": np.array([high_x, low_y], dtype=float), + "BL": np.array([low_x, low_y], dtype=float), + "TL": np.array([low_x, high_y], dtype=float), + } + + centers: dict[str, np.ndarray] = {} + + for label, mask in masks.items(): + quadrant_points = arr[mask] + + if quadrant_points.shape[0] < min_points_per_quadrant: + raise ValueError( + f"Not enough points in {label} quadrant. " + f"Got {quadrant_points.shape[0]}, need at least {min_points_per_quadrant}." + ) + + distances = np.linalg.norm(quadrant_points - ideal_corners[label], axis=1) + + keep_count = max( + min_points_per_quadrant, + int(np.ceil(quadrant_points.shape[0] * corner_fraction)), + ) + keep_count = min(keep_count, quadrant_points.shape[0]) + + closest = quadrant_points[np.argsort(distances)[:keep_count]] + centers[label] = np.median(closest, axis=0) + + ordered_labels = parse_quadrant_order(quadrant_order) + + print( + "Quadrant centers: " + + ", ".join( + f"{label}=({centers[label][0]:.4f}, {centers[label][1]:.4f})" + for label in ["TR", "BR", "BL", "TL"] + ) + ) + + print( + "Quadrant assignment: " + + ", ".join( + f"Beacon {i + 1}={label}" + for i, label in enumerate(ordered_labels) + ) + ) + + return [ + (row_index, centers[label][0], centers[label][1]) + for row_index, label in enumerate(ordered_labels) + ] + + +class UnlabeledBeaconTracker: + """Convert a fast unlabeled y,z stream into four labeled beacon rows.""" + + sequence = [0, 1, 2, 3, 0, 3, 2, 1] + + def __init__( + self, + rows: int, + stable_samples: int = 3, + stable_radius: float = 0.03, + pivot_repeat_radius: float = 0.25, + pivot_min_distance: float = 0.5, + pivot_neighbor: str = "auto-x", + ) -> None: + if rows != 4: + raise ValueError("Four-beacon unlabeled tracking requires exactly 4 beacons") + + if stable_samples < 2: + raise ValueError("stable_samples must be at least 2") + + if pivot_neighbor not in {"auto-x", "auto-x-inverted", "beacon2", "beacon4"}: + raise ValueError( + 'pivot_neighbor must be "auto-x", "auto-x-inverted", "beacon2", or "beacon4"' + ) + + self.stable_samples = stable_samples + self.stable_radius = stable_radius + self.pivot_repeat_radius = pivot_repeat_radius + self.pivot_min_distance = pivot_min_distance + self.pivot_neighbor = pivot_neighbor + + self.window: list[np.ndarray] = [] + self.recent_stable: list[np.ndarray] = [] + + self.synced = False + self.sequence_index = 0 + + self.release_radius = max(stable_radius * 3.0, 0.1) + self.last_stable_point: np.ndarray | None = None + self.waiting_for_movement = False + + def add_point(self, y_val: float, z_val: float) -> list[tuple[int, float, float]]: + stable_point = self._stable_point(y_val, z_val) + + if stable_point is None: + return [] + + if self.synced: + row_index = self.sequence[self.sequence_index] + self.sequence_index = (self.sequence_index + 1) % len(self.sequence) + + return [(row_index, stable_point[0], stable_point[1])] + + return self._sync_from_pivot(stable_point) + + def _stable_point(self, y_val: float, z_val: float) -> np.ndarray | None: + point = np.array([y_val, z_val], dtype=float) + + if self.waiting_for_movement and self.last_stable_point is not None: + if np.linalg.norm(point - self.last_stable_point) <= self.release_radius: + self.window.clear() + return None + + self.waiting_for_movement = False + self.window.clear() + + self.window.append(point) + + if len(self.window) > self.stable_samples: + self.window.pop(0) + + if len(self.window) < self.stable_samples: + return None + + points = np.vstack(self.window) + center = np.mean(points, axis=0) + distances = np.linalg.norm(points - center, axis=1) + + if float(np.max(distances)) > self.stable_radius: + return None + + self.window.clear() + self.last_stable_point = center + self.waiting_for_movement = True + + return center + + def _sync_from_pivot(self, stable_point: np.ndarray) -> list[tuple[int, float, float]]: + self.recent_stable.append(stable_point) + + if len(self.recent_stable) > 3: + self.recent_stable.pop(0) + + if len(self.recent_stable) < 3: + print( + f"Stable point found while syncing: " + f"({stable_point[0]:.4f}, {stable_point[1]:.4f})" + ) + return [] + + repeated_before, pivot, repeated_after = self.recent_stable + + repeat_distance = np.linalg.norm(repeated_before - repeated_after) + + pivot_distance = min( + np.linalg.norm(repeated_before - pivot), + np.linalg.norm(repeated_after - pivot), + ) + + print( + "Stable point found while syncing: " + f"({stable_point[0]:.4f}, {stable_point[1]:.4f}); " + f"pivot check repeat={repeat_distance:.4f}, pivot_dist={pivot_distance:.4f}" + ) + + if repeat_distance > self.pivot_repeat_radius or pivot_distance < self.pivot_min_distance: + return [] + + neighbor_index = self._infer_repeated_neighbor(pivot, repeated_after) + + if neighbor_index == 1: + self.sequence_index = 2 + else: + self.sequence_index = 6 + + self.synced = True + self.recent_stable.clear() + + print(f"Pivot sync found: beacon 1 plus repeated beacon {neighbor_index + 1}.") + + return [ + (0, pivot[0], pivot[1]), + (neighbor_index, repeated_after[0], repeated_after[1]), + ] + + def _infer_repeated_neighbor(self, pivot: np.ndarray, repeated: np.ndarray) -> int: + if self.pivot_neighbor == "beacon2": + return 1 + + if self.pivot_neighbor == "beacon4": + return 3 + + if self.pivot_neighbor == "auto-x-inverted": + return 3 if repeated[0] >= pivot[0] else 1 + + return 1 if repeated[0] >= pivot[0] else 3 + + +def get_serial( + port: str | None = None, + baud_rate: int = 115200, + replay_file: str | Path | None = None, + rows: int = 4, + min_required: int | None = None, + timeout_seconds: float | None = None, + serial_input: str = "unlabeled", + serial_format: str = "raw", + unlabeled_method: str = "quadrant", + cluster_samples: int = 300, + corner_fraction: float = 0.25, + quadrant_order: str = "TR,BR,BL,TL", + image_width: float = 320.0, + image_height: float = 240.0, + flip_y: bool = True, + stable_samples: int = 3, + stable_radius: float = 0.03, + pivot_repeat_radius: float = 0.25, + pivot_min_distance: float = 0.5, + pivot_neighbor: str = "auto-x", +) -> np.ndarray: + """Read serial measurements until enough unique beacon rows are received.""" + if rows != 4: + raise ValueError("Four-beacon serial input requires rows=4") + + if min_required is None: + min_required = rows + + if min_required < 1 or min_required > rows: + raise ValueError("min_required must be between 1 and rows") + + if serial_format not in {"raw", "pixel"}: + raise ValueError('serial_format must be either "raw" or "pixel"') + + if serial_input not in {"indexed", "unlabeled"}: + raise ValueError('serial_input must be either "indexed" or "unlabeled"') + + if unlabeled_method not in {"quadrant", "pivot"}: + raise ValueError('unlabeled_method must be either "quadrant" or "pivot"') + + if cluster_samples < 4: + raise ValueError("cluster_samples must be at least 4") + + parse_quadrant_order(quadrant_order) + + if replay_file is None: + try: + import serial + except ImportError as exc: + raise ImportError( + "pyserial is required for serial input. Install it with: pip install pyserial" + ) from exc + else: + serial = None + replay_path = Path(replay_file) + + if not replay_path.exists(): + raise FileNotFoundError(f"Replay file not found: {replay_path}") + + if replay_file is None and port is None: + available_ports = list_serial_ports() + + if not available_ports: + raise SerialNoDeviceError("No serial connections detected. Looking for file instead.") + + port = available_ports[0] + print(f"Available serial ports: {available_ports}") + + if replay_file is None: + print(f"Connecting to device on {port} at {baud_rate} baud...") + else: + print(f"Replaying serial data from: {Path(replay_file)}") + + print(f"Waiting for {min_required} unique beacon measurement(s)...") + + if serial_input == "indexed": + print('Expected format: "beacon_number,y,z", for example: "1,36,32"') + else: + print('Expected format: "y,z", for example: "0.0000,-6.5000"') + print(f"Unlabeled method: {unlabeled_method}") + + if unlabeled_method == "quadrant": + print(f"Quadrant mode: collecting {cluster_samples} samples before assignment.") + else: + print("Pivot sequence: 1,2,3,4,1,4,3,2") + print( + "Unlabeled filter: " + f"{stable_samples} samples within {stable_radius} radius; " + f"pivot repeat radius {pivot_repeat_radius}" + ) + + print(f"Serial format: {serial_format}") + + bmeasure = np.zeros((rows, 3), dtype=float) + received = np.zeros(rows, dtype=bool) + + start_time = time.time() + + tracker = ( + UnlabeledBeaconTracker( + rows=rows, + stable_samples=stable_samples, + stable_radius=stable_radius, + pivot_repeat_radius=pivot_repeat_radius, + pivot_min_distance=pivot_min_distance, + pivot_neighbor=pivot_neighbor, + ) + if serial_input == "unlabeled" and unlabeled_method == "pivot" + else None + ) + + input_source = ( + ReplayInput(replay_file) + if replay_file is not None + else serial.Serial(port, baud_rate, timeout=1) + ) + + with input_source as open_mv: + if serial_input == "unlabeled" and unlabeled_method == "quadrant": + raw_points: list[tuple[float, float]] = [] + + while len(raw_points) < cluster_samples: + if timeout_seconds is not None and time.time() - start_time > timeout_seconds: + raise TimeoutError( + f"Timed out waiting for serial data. " + f"Collected {len(raw_points)} of {cluster_samples} samples." + ) + + raw_data = open_mv.readline().decode("utf-8", errors="replace").strip() + + parsed_unlabeled = parse_unlabeled_sensor_line(raw_data) + + if parsed_unlabeled is None: + continue + + y_val, z_val = parsed_unlabeled + + if add_point is not None: + add_point(y_val, z_val) + + raw_points.append(parsed_unlabeled) + + accepted = assign_quadrant_beacons( + raw_points, + corner_fraction=corner_fraction, + quadrant_order=quadrant_order, + ) + + for row_index, y_val, z_val in accepted: + if serial_format == "pixel": + pixel_x = y_val + pixel_y = z_val + y_measure = pixel_x - image_width / 2.0 + + if flip_y: + z_measure = image_height / 2.0 - pixel_y + else: + z_measure = pixel_y - image_height / 2.0 + else: + y_measure = y_val + z_measure = z_val + + bmeasure[row_index, 1] = y_measure + bmeasure[row_index, 2] = z_measure + received[row_index] = True + + print( + f"Beacon {row_index + 1}: raw=({y_val}, {z_val}), " + f"measurement=({y_measure}, {z_measure}) " + f"[{np.count_nonzero(received)}/{min_required}]" + ) + + print("Matrix bmeasure successfully populated.") + + return bmeasure + + while np.count_nonzero(received) < min_required: + if timeout_seconds is not None and time.time() - start_time > timeout_seconds: + raise TimeoutError( + f"Timed out waiting for serial data. " + f"Received {np.count_nonzero(received)} of {min_required} required beacons." + ) + + raw_data = open_mv.readline().decode("utf-8", errors="replace").strip() + + if serial_input == "indexed": + parsed = parse_sensor_line(raw_data, rows) + + if parsed is None: + continue + + row_index, y_val, z_val = parsed + accepted = [(row_index, y_val, z_val)] + + else: + parsed_unlabeled = parse_unlabeled_sensor_line(raw_data) + + if parsed_unlabeled is None: + continue + + y_val, z_val = parsed_unlabeled + + if add_point is not None: + add_point(y_val, z_val) + + assert tracker is not None + accepted = tracker.add_point(y_val, z_val) + + for row_index, y_val, z_val in accepted: + if serial_format == "pixel": + pixel_x = y_val + pixel_y = z_val + y_measure = pixel_x - image_width / 2.0 + + if flip_y: + z_measure = image_height / 2.0 - pixel_y + else: + z_measure = pixel_y - image_height / 2.0 + else: + y_measure = y_val + z_measure = z_val + + bmeasure[row_index, 1] = y_measure + bmeasure[row_index, 2] = z_measure + received[row_index] = True + + print( + f"Beacon {row_index + 1}: raw=({y_val}, {z_val}), " + f"measurement=({y_measure}, {z_measure}) " + f"[{np.count_nonzero(received)}/{min_required}]" + ) + + print("Matrix bmeasure successfully populated.") + + return bmeasure \ No newline at end of file diff --git a/4 beacons ver2/main.py b/4 beacons ver2/main.py new file mode 100644 index 0000000..8526a81 --- /dev/null +++ b/4 beacons ver2/main.py @@ -0,0 +1,122 @@ +"""Command-line entry point for the four-beacon solver.""" + +from __future__ import annotations + +import argparse +import threading + +from solver import run_loop + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Four-beacon solver") + + parser.add_argument("--config", help="Path to 4-beacon configuration CSV.") + parser.add_argument("--no-serial", action="store_true") + parser.add_argument("--port", help="Serial port, for example COM4.") + parser.add_argument("--baud-rate", type=int, default=115200) + parser.add_argument("--replay-file") + parser.add_argument("--min-beacons", type=int) + parser.add_argument("--serial-timeout", type=float) + + parser.add_argument( + "--serial-input", + choices=["indexed", "unlabeled"], + default="unlabeled", + ) + + parser.add_argument( + "--serial-format", + choices=["raw", "pixel"], + default="raw", + ) + + parser.add_argument( + "--unlabeled-method", + choices=["quadrant", "pivot"], + default="quadrant", + ) + + parser.add_argument("--cluster-samples", type=int, default=60) + parser.add_argument("--corner-fraction", type=float, default=0.25) + parser.add_argument("--quadrant-order", default="TR,BR,BL,TL") + + parser.add_argument("--image-width", type=float, default=320.0) + parser.add_argument("--image-height", type=float, default=240.0) + parser.add_argument("--no-flip-y", action="store_true") + + parser.add_argument("--stable-samples", type=int, default=5) + parser.add_argument("--stable-radius", type=float, default=0.03) + parser.add_argument("--pivot-repeat-radius", type=float, default=0.25) + parser.add_argument("--pivot-min-distance", type=float, default=0.5) + + parser.add_argument( + "--pivot-neighbor", + choices=["auto-x", "auto-x-inverted", "beacon2", "beacon4"], + default="auto-x", + ) + + parser.add_argument("--focal-length", type=float, default=26.0) + parser.add_argument("--tolerance", type=float, default=1e-3) + parser.add_argument("--max-iters", type=int, default=60) + parser.add_argument("--once", action="store_true") + + parser.add_argument( + "--live-plot", + action="store_true", + help="Show live Y,Z plot while solver is running.", + ) + + return parser.parse_args() + + +def run_solver(args): + run_loop( + csv_path=args.config, + use_serial=not args.no_serial, + focal_length=args.focal_length, + tolerance=args.tolerance, + max_iters=args.max_iters, + once=args.once, + port=args.port, + baud_rate=args.baud_rate, + replay_file=args.replay_file, + min_beacons=args.min_beacons, + serial_timeout=args.serial_timeout, + serial_input=args.serial_input, + serial_format=args.serial_format, + unlabeled_method=args.unlabeled_method, + cluster_samples=args.cluster_samples, + corner_fraction=args.corner_fraction, + quadrant_order=args.quadrant_order, + image_width=args.image_width, + image_height=args.image_height, + flip_y=not args.no_flip_y, + stable_samples=args.stable_samples, + stable_radius=args.stable_radius, + pivot_repeat_radius=args.pivot_repeat_radius, + pivot_min_distance=args.pivot_min_distance, + pivot_neighbor=args.pivot_neighbor, + ) + + +def main() -> None: + args = parse_args() + + if args.live_plot: + from position_data import start_plot + + solver_thread = threading.Thread( + target=run_solver, + args=(args,), + daemon=True + ) + solver_thread.start() + + start_plot() + else: + run_solver(args) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/4 beacons ver2/math_utils.py b/4 beacons ver2/math_utils.py new file mode 100644 index 0000000..2b46b74 --- /dev/null +++ b/4 beacons ver2/math_utils.py @@ -0,0 +1,51 @@ +"""Math utilities converted from MATLAB files: skewp.m, DCM.m, unit.m.""" + +from __future__ import annotations + +import numpy as np + + +def as_col3(vector: np.ndarray | list | tuple) -> np.ndarray: + """Return input as a 3-element float vector with shape (3,).""" + arr = np.asarray(vector, dtype=float).reshape(-1) #convert to decimal umber + if arr.size != 3: + raise ValueError(f"Expected a 3-element vector, got shape {np.asarray(vector).shape}") + return arr + + +def skewp(p: np.ndarray | list | tuple) -> np.ndarray: + """Return the 3x3 skew-symmetric matrix for vector p.""" + p = as_col3(p) + return np.array( + [ + [0.0, -p[2], p[1]], + [p[2], 0.0, -p[0]], + [-p[1], p[0], 0.0], + ], + dtype=float, + ) + + +def dcm(p: np.ndarray | list | tuple) -> np.ndarray: + """Direction cosine matrix converted from DCM.m.""" + p = as_col3(p) + identity = np.eye(3) + pcross = skewp(p) + p_dot = float(p @ p) # dot product, matrix multiply + numerator = 8.0 * (pcross @ pcross) - 4.0 * (1.0 - p_dot) * pcross + denominator = (1.0 + p_dot) ** 2 + return identity + numerator / denominator + + +def unit(bmeasure: np.ndarray | list | tuple) -> np.ndarray: + """Normalize each row of a measurement matrix to unit length.""" + bmeasure = np.asarray(bmeasure, dtype=float) + if bmeasure.ndim != 2: + raise ValueError("bmeasure must be a 2D array") + + # Computes the length of each row. + norms = np.linalg.norm(bmeasure, axis=1, keepdims=True) + + if np.any(norms == 0): + raise ValueError("Cannot normalize a row with zero norm") + return bmeasure / norms diff --git a/4 beacons ver2/model.py b/4 beacons ver2/model.py new file mode 100644 index 0000000..ef4a375 --- /dev/null +++ b/4 beacons ver2/model.py @@ -0,0 +1,82 @@ +"""Core beacon solver math converted from Bibguess.m, pos_partial.m, orient_partial.m, MSM.m.""" + +from __future__ import annotations + +import numpy as np + +from math_utils import as_col3, dcm, skewp + + +def bibguess(p: np.ndarray | list | tuple, rs: np.ndarray | list | tuple, ri: np.ndarray | list | tuple) -> tuple[np.ndarray, np.ndarray]: + """Compute Bi and bguess for one beacon.""" + rs = as_col3(rs) # target/camera position guess + ri = as_col3(ri) # known beacon position + + diff = ri - rs + denom = float(diff @ diff) + if denom == 0: + raise ValueError("Ri and Rs are identical; cannot compute line-of-sight unit vector") + + bi = diff / np.sqrt(denom) + bguess = dcm(p) @ bi + return bi, bguess + + +def pos_partial(bi: np.ndarray | list | tuple, c: np.ndarray, rs: np.ndarray | list | tuple, ri: np.ndarray | list | tuple) -> np.ndarray: + """Position partial derivative matrix converted from pos_partial.m.""" + bi = as_col3(bi) + rs = as_col3(rs) + ri = as_col3(ri) + + denom = np.linalg.norm(ri - rs) + if denom == 0: + raise ValueError("Ri and Rs are identical; position partial is undefined") + + identity = np.eye(3) + return (-(c @ (identity - np.outer(bi, bi)))) / denom + + +def orient_partial(bguess: np.ndarray | list | tuple, p: np.ndarray | list | tuple) -> np.ndarray: + """Orientation partial derivative matrix converted from orient_partial.m.""" + bguess = as_col3(bguess) + p = as_col3(p) + + identity = np.eye(3) + bcross = skewp(bguess) + pcross = skewp(p) + p_dot = float(p @ p) + + leftside = (4.0 / (1.0 + p_dot) ** 2) * bcross + rightside = (1.0 - p_dot) * identity - 2.0 * pcross + 2.0 * np.outer(p, p) + return leftside @ rightside + + +def msm(p: np.ndarray | list | tuple, rs: np.ndarray | list | tuple, bmeasure: np.ndarray | list | tuple, ri: np.ndarray | list | tuple, n_beacons: int = 4) -> tuple[np.ndarray, np.ndarray]: + """Build measurement sensitivity matrix H and residual vector deltab.""" + p = as_col3(p) + rs = as_col3(rs) + ri = np.asarray(ri, dtype=float) + bmeasure = np.asarray(bmeasure, dtype=float).reshape(-1) + + if ri.shape[0] < n_beacons or ri.shape[1] != 3: + raise ValueError(f"ri must have at least {n_beacons} rows and exactly 3 columns") + expected_measurements = 3 * n_beacons + if bmeasure.size != expected_measurements: + raise ValueError(f"bmeasure must contain {expected_measurements} values, got {bmeasure.size}") + + c = dcm(p) + h_blocks: list[np.ndarray] = [] + bguess_all: list[np.ndarray] = [] + + for i in range(n_beacons): + beacon_ri = ri[i, :] + bi, bguess = bibguess(p, rs, beacon_ri) + hi_rs = pos_partial(bi, c, rs, beacon_ri) + hi_p = orient_partial(bguess, p) + h_blocks.append(np.hstack((hi_rs, hi_p))) + bguess_all.append(bguess) + + h = np.vstack(h_blocks) + bguess_vector = np.concatenate(bguess_all) # need to check + deltab = bmeasure - bguess_vector + return h, deltab diff --git a/4 beacons ver2/position_data.py b/4 beacons ver2/position_data.py new file mode 100644 index 0000000..90b84c1 --- /dev/null +++ b/4 beacons ver2/position_data.py @@ -0,0 +1,71 @@ +from collections import deque +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.animation import FuncAnimation + +TRAIL_LENGTH = 100 +points = deque(maxlen=TRAIL_LENGTH) + +fig = None +ax = None +scatter = None +line_plot = None +colors = None + + +def add_point(x, y): + points.append((x, y)) + + +def start_plot(): + global fig, ax, scatter, line_plot, colors + + fig, ax = plt.subplots(figsize=(8, 8)) + + ax.set_xlim(-5, 5) + ax.set_ylim(-5, 5) + ax.set_aspect("equal") + + ax.set_xticks(np.arange(-5, 6, 1)) + ax.set_yticks(np.arange(-5, 6, 1)) + ax.set_xticks(np.arange(-5, 5.1, 0.1), minor=True) + ax.set_yticks(np.arange(-5, 5.1, 0.1), minor=True) + + ax.grid(which="major", linestyle="-", linewidth=0.6, alpha=0.6) + ax.grid(which="minor", linestyle="-", linewidth=0.5, alpha=0.6) + + ax.axhline(0, color="black", lw=1.0) + ax.axvline(0, color="black", lw=1.0) + + ax.set_title("Live Serial Position Tracker") + ax.set_xlabel("X") + ax.set_ylabel("Y") + + colors = np.zeros((TRAIL_LENGTH, 4)) + for i in range(TRAIL_LENGTH): + t = i / (TRAIL_LENGTH - 1) + colors[i] = (1 - t, 0, t, 1) + + scatter = ax.scatter([], [], s=3) + line_plot, = ax.plot([], [], lw=0.8, color="cyan", alpha=0.6) + + ani = FuncAnimation( + fig, + update_plot, + interval=20, + blit=True, + cache_frame_data=False + ) + + plt.show() + + +def update_plot(_): + if len(points) > 0: + data = np.array(points) + + scatter.set_offsets(data) + scatter.set_color(colors[-len(data):]) + line_plot.set_data(data[:, 0], data[:, 1]) + + return scatter, line_plot \ No newline at end of file diff --git a/4 beacons ver2/solver.py b/4 beacons ver2/solver.py new file mode 100644 index 0000000..60be93e --- /dev/null +++ b/4 beacons ver2/solver.py @@ -0,0 +1,202 @@ +"""Runnable four-beacon solver.""" + +from __future__ import annotations + +from dataclasses import dataclass +import time + +import numpy as np + +from io_utils import SerialNoDeviceError, get_config, get_serial +from math_utils import dcm, unit +from model import msm + + +@dataclass +class SolveResult: + rs: np.ndarray + p: np.ndarray + iterations: int + converged: bool + + +def prepare_bmeasure_vector(bmeasure: np.ndarray, focal_length: float = 320.0) -> np.ndarray: + """Apply focal length and flatten measurements beacon by beacon.""" + bmeasure = np.asarray(bmeasure, dtype=float).copy() + if bmeasure.ndim != 2 or bmeasure.shape[1] != 3: + raise ValueError("bmeasure must be a matrix with 3 columns") + + if np.all(bmeasure[:, 0] == 0): + bmeasure[:, 0] = focal_length + + b_unit = unit(bmeasure) + return b_unit.reshape(-1) + + +def has_real_measurements(bmeasure: np.ndarray) -> bool: + """Return True when bmeasure has non-zero camera/image-plane data.""" + bmeasure = np.asarray(bmeasure, dtype=float) + return bmeasure.ndim == 2 and bmeasure.shape[1] == 3 and np.any(bmeasure[:, 1:] != 0) + + +def print_pose_details(result: SolveResult) -> None: + """Print pose plus derived values that are easier to compare to a tape measure.""" + distance_m = float(np.linalg.norm(result.rs)) + object_center_camera_frame = dcm(result.p) @ (-result.rs) + + print("object center:", object_center_camera_frame) + print("Rs:", result.rs) + print("p:", result.p) + print(f"distance: {distance_m:.4f} m ({distance_m * 3.28084:.2f} ft)") + # print("object center in camera frame:", object_center_camera_frame) + + +def solve_pose( + bmeasure: np.ndarray, + ri: np.ndarray, + guess_rs: np.ndarray | None = None, + guess_p: np.ndarray | None = None, + weight: np.ndarray | None = None, + tolerance: float = 1e-5, + max_iters: int = 200, + focal_length: float = 320.0, +) -> SolveResult: + """Run the iterative least-squares pose solve.""" + guess_rs = np.zeros(3, dtype=float) if guess_rs is None else np.asarray(guess_rs, dtype=float).reshape(3) + guess_p = np.zeros(3, dtype=float) if guess_p is None else np.asarray(guess_p, dtype=float).reshape(3) + ri = np.asarray(ri, dtype=float) + n_beacons = ri.shape[0] + + b_unit_vector = prepare_bmeasure_vector(bmeasure, focal_length=focal_length) + weight = np.eye(b_unit_vector.size) if weight is None else np.asarray(weight, dtype=float) + + for iteration in range(1, max_iters + 1): + if not np.all(np.isfinite(guess_rs)) or not np.all(np.isfinite(guess_p)): + return SolveResult(rs=guess_rs, p=guess_p, iterations=iteration, converged=False) + + if np.linalg.norm(guess_rs) > 1e6 or np.linalg.norm(guess_p) > 1e6: + return SolveResult(rs=guess_rs, p=guess_p, iterations=iteration, converged=False) + + try: + h, deltab = msm(guess_p, guess_rs, b_unit_vector, ri, n_beacons=n_beacons) + except OverflowError: + return SolveResult(rs=guess_rs, p=guess_p, iterations=iteration, converged=False) + + normal_matrix = h.T @ weight @ h + rhs = h.T @ weight @ deltab + + try: + deltax = np.linalg.solve(normal_matrix, rhs) + except np.linalg.LinAlgError: + deltax = np.linalg.lstsq(normal_matrix, rhs, rcond=None)[0] + + if not np.all(np.isfinite(deltax)): + return SolveResult(rs=guess_rs, p=guess_p, iterations=iteration, converged=False) + + guess_rs = guess_rs + deltax[:3] + guess_p = guess_p + deltax[3:6] + + if np.linalg.norm(deltax) < tolerance: + return SolveResult(rs=guess_rs, p=guess_p, iterations=iteration, converged=True) + + return SolveResult(rs=guess_rs, p=guess_p, iterations=max_iters, converged=False) + + +def run_loop( + csv_path: str | None = None, + use_serial: bool = True, + focal_length: float = 320.0, + tolerance: float = 1e-5, + max_iters: int = 200, + once: bool = False, + port: str | None = None, + baud_rate: int = 115200, + replay_file: str | None = None, + min_beacons: int | None = None, + serial_timeout: float | None = None, + serial_input: str = "unlabeled", + serial_format: str = "raw", + unlabeled_method: str = "quadrant", + cluster_samples: int = 300, + corner_fraction: float = 0.25, + quadrant_order: str = "TR,BR,BL,TL", + image_width: float = 320.0, + image_height: float = 240.0, + flip_y: bool = True, + stable_samples: int = 3, + stable_radius: float = 0.03, + pivot_repeat_radius: float = 0.25, + pivot_min_distance: float = 0.5, + pivot_neighbor: str = "auto-x", +) -> None: + """Run the MASTERLOOP-style workflow for four beacons.""" + bmeasure, ri = get_config(csv_path) + + if ri.shape[0] != 4: + raise ValueError(f"Four-beacon solver requires exactly 4 CSV rows, got {ri.shape[0]}") + + if min_beacons is None: + min_beacons = ri.shape[0] + + while True: + if use_serial: + try: + bmeasure = get_serial( + port=port, + baud_rate=baud_rate, + replay_file=replay_file, + rows=ri.shape[0], + min_required=min_beacons, + timeout_seconds=serial_timeout, + serial_input=serial_input, + serial_format=serial_format, + unlabeled_method=unlabeled_method, + cluster_samples=cluster_samples, + corner_fraction=corner_fraction, + quadrant_order=quadrant_order, + image_width=image_width, + image_height=image_height, + flip_y=flip_y, + stable_samples=stable_samples, + stable_radius=stable_radius, + pivot_repeat_radius=pivot_repeat_radius, + pivot_min_distance=pivot_min_distance, + pivot_neighbor=pivot_neighbor, + ) + except SerialNoDeviceError as exc: + print(exc) + if once: + break + print("Waiting for a serial device...") + time.sleep(1) + continue + except Exception as exc: + print(f"Serial read failed with error: {exc}") + if once: + break + print("Waiting for better serial data...") + time.sleep(1) + continue + else: + if not has_real_measurements(bmeasure): + print("No serial input is being used, and the CSV does not contain measurement data.") + print("This is only a config file, so the solver will not run.") + break + + result = solve_pose( + bmeasure=bmeasure, + ri=ri, + tolerance=tolerance, + max_iters=max_iters, + focal_length=focal_length, + ) + + if result.converged: + print_pose_details(result) + else: + print("Solver did not converge with the current measurements.") + print("Last Rs:", result.rs) + print("Last p:", result.p) + + if once: + break