Compare commits

...

1 Commits

Author SHA1 Message Date
Martino Russi bfa120f24b improve serial_reading 2026-02-26 15:07:10 +01:00
@@ -38,19 +38,23 @@ def parse_raw16(line: bytes) -> list[int] | None:
def read_raw_from_serial(ser) -> list[int] | None: def read_raw_from_serial(ser) -> list[int] | None:
"""Read latest sample from serial; if buffer is backed up, keep only the newest.""" """Read latest sample from serial; if buffer is backed up, keep only the newest."""
last = None try:
while ser.in_waiting > 0: last = None
b = ser.readline() while ser.in_waiting > 0:
if not b: b = ser.readline()
break if not b:
raw16 = parse_raw16(b) break
if raw16 is not None: raw16 = parse_raw16(b)
last = raw16 if raw16 is not None:
if last is None: last = raw16
b = ser.readline() if last is None:
if b: b = ser.readline()
last = parse_raw16(b) if b:
return last last = parse_raw16(b)
return last
except (OSError, serial.SerialException) as e:
logger.warning(f"Serial read error: {e}")
return None
@dataclass @dataclass
@@ -104,14 +108,20 @@ class ExoskeletonArm:
logger.warning(f"failed to load calibration: {e}") logger.warning(f"failed to load calibration: {e}")
def read_raw(self) -> list[int] | None: def read_raw(self) -> list[int] | None:
if not self._ser: if not self._ser or not self._ser.is_open:
return None return None
return read_raw_from_serial(self._ser) return read_raw_from_serial(self._ser)
def get_angles(self) -> dict[str, float]: def get_angles(self, raw: list[int] | None = None) -> dict[str, float]:
"""Convert raw ADC values to joint angles.
Args:
raw: Optional raw ADC values. If None, reads from serial.
"""
if not self.calibration: if not self.calibration:
raise RuntimeError("exoskeleton not calibrated") raise RuntimeError("exoskeleton not calibrated")
raw = self.read_raw() if raw is None:
raw = self.read_raw()
return {} if raw is None else exo_raw_to_angles(raw, self.calibration) return {} if raw is None else exo_raw_to_angles(raw, self.calibration)
def calibrate(self) -> None: def calibrate(self) -> None: