mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-26 22:20:06 +00:00
chore(utils): move encoding utils and process to their respective modules (#2029)
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
This commit is contained in:
@@ -1,67 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
def encode_sign_magnitude(value: int, sign_bit_index: int):
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude
|
||||
"""
|
||||
max_magnitude = (1 << sign_bit_index) - 1
|
||||
magnitude = abs(value)
|
||||
if magnitude > max_magnitude:
|
||||
raise ValueError(f"Magnitude {magnitude} exceeds {max_magnitude} (max for {sign_bit_index=})")
|
||||
|
||||
direction_bit = 1 if value < 0 else 0
|
||||
return (direction_bit << sign_bit_index) | magnitude
|
||||
|
||||
|
||||
def decode_sign_magnitude(encoded_value: int, sign_bit_index: int):
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude
|
||||
"""
|
||||
direction_bit = (encoded_value >> sign_bit_index) & 1
|
||||
magnitude_mask = (1 << sign_bit_index) - 1
|
||||
magnitude = encoded_value & magnitude_mask
|
||||
return -magnitude if direction_bit else magnitude
|
||||
|
||||
|
||||
def encode_twos_complement(value: int, n_bytes: int):
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/Signed_number_representations#Two%27s_complement
|
||||
"""
|
||||
|
||||
bit_width = n_bytes * 8
|
||||
min_val = -(1 << (bit_width - 1))
|
||||
max_val = (1 << (bit_width - 1)) - 1
|
||||
|
||||
if not (min_val <= value <= max_val):
|
||||
raise ValueError(
|
||||
f"Value {value} out of range for {n_bytes}-byte two's complement: [{min_val}, {max_val}]"
|
||||
)
|
||||
|
||||
if value >= 0:
|
||||
return value
|
||||
|
||||
return (1 << bit_width) + value
|
||||
|
||||
|
||||
def decode_twos_complement(value: int, n_bytes: int) -> int:
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/Signed_number_representations#Two%27s_complement
|
||||
"""
|
||||
bits = n_bytes * 8
|
||||
sign_bit = 1 << (bits - 1)
|
||||
if value & sign_bit:
|
||||
value -= 1 << bits
|
||||
return value
|
||||
@@ -1,83 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
|
||||
class ProcessSignalHandler:
|
||||
"""Utility class to attach graceful shutdown signal handlers.
|
||||
|
||||
The class exposes a shutdown_event attribute that is set when a shutdown
|
||||
signal is received. A counter tracks how many shutdown signals have been
|
||||
caught. On the second signal the process exits with status 1.
|
||||
"""
|
||||
|
||||
_SUPPORTED_SIGNALS = ("SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT")
|
||||
|
||||
def __init__(self, use_threads: bool, display_pid: bool = False):
|
||||
# TODO: Check if we can use Event from threading since Event from
|
||||
# multiprocessing is the a clone of threading.Event.
|
||||
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Event
|
||||
if use_threads:
|
||||
from threading import Event
|
||||
else:
|
||||
from multiprocessing import Event
|
||||
|
||||
self.shutdown_event = Event()
|
||||
self._counter: int = 0
|
||||
self._display_pid = display_pid
|
||||
|
||||
self._register_handlers()
|
||||
|
||||
@property
|
||||
def counter(self) -> int: # pragma: no cover – simple accessor
|
||||
"""Number of shutdown signals that have been intercepted."""
|
||||
return self._counter
|
||||
|
||||
def _register_handlers(self):
|
||||
"""Attach the internal _signal_handler to a subset of POSIX signals."""
|
||||
|
||||
def _signal_handler(signum, frame):
|
||||
pid_str = ""
|
||||
if self._display_pid:
|
||||
pid_str = f"[PID: {os.getpid()}]"
|
||||
logging.info(f"{pid_str} Shutdown signal {signum} received. Cleaning up…")
|
||||
self.shutdown_event.set()
|
||||
self._counter += 1
|
||||
|
||||
# On a second Ctrl-C (or any supported signal) force the exit to
|
||||
# mimic the previous behaviour while giving the caller one chance to
|
||||
# shutdown gracefully.
|
||||
# TODO: Investigate if we need it later
|
||||
if self._counter > 1:
|
||||
logging.info("Force shutdown")
|
||||
sys.exit(1)
|
||||
|
||||
for sig_name in self._SUPPORTED_SIGNALS:
|
||||
sig = getattr(signal, sig_name, None)
|
||||
if sig is None:
|
||||
# The signal is not available on this platform (Windows for
|
||||
# instance does not provide SIGHUP, SIGQUIT…). Skip it.
|
||||
continue
|
||||
try:
|
||||
signal.signal(sig, _signal_handler)
|
||||
except (ValueError, OSError): # pragma: no cover – unlikely but safe
|
||||
# Signal not supported or we are in a non-main thread.
|
||||
continue
|
||||
Reference in New Issue
Block a user