Source code for payloadcomputerdroneprojekt.communications.helper

from mavsdk.telemetry import PositionVelocityNed, PositionNed, VelocityNed
import math
from scipy.spatial.transform import Rotation as R
from payloadcomputerdroneprojekt.helper import smart_print as sp
from typing import Callable, AsyncGenerator, List, TypeVar, Optional
import numpy as np
import asyncio

T = TypeVar('T')


[docs] def save_execute(msg: str): """ Decorator to wrap a function and catch exceptions, printing a message if an error occurs. Works for both sync and async functions. """ def wrapper(f): if asyncio.iscoroutinefunction(f): async def wrap(*args, **kwargs): try: return await f(*args, **kwargs) except Exception as e: sp(f"{msg}, Error: {e}") return wrap else: def wrap(*args, **kwargs): try: return f(*args, **kwargs) except Exception as e: sp(f"{msg}, Error: {e}") return wrap return wrapper
[docs] def reached_pos( target: List[float], error: float = 0.5, error_vel: float = 0.1 ) -> Callable[[PositionVelocityNed], bool]: """ Returns a function that checks if a drone has reached a target position and is below a velocity threshold. :param target: Target position as [north, east, down] in meters. :type target: list :param error: Allowed position error in meters. :type error: float, optional :param error_vel: Allowed velocity error in m/s. :type error_vel: float, optional :return: Function that takes PositionVelocityNed and returns True if target is reached. :rtype: function """ def func(state: PositionVelocityNed) -> bool: return (pythagoras(get_pos_vec(state), target) < error ) and (abs_vel(get_vel_vec(state)) < error_vel) return func
[docs] def get_pos_vec(state: PositionVelocityNed) -> List[float]: """ Extracts the position vector from a PositionVelocityNed object. :param state: MAVSDK PositionVelocityNed object. :type state: PositionVelocityNed :return: Position as [north, east, down] in meters. :rtype: list """ pos: PositionNed = state.position return [pos.north_m, pos.east_m, pos.down_m]
[docs] def get_vel_vec(state: PositionVelocityNed) -> List[float]: """ Extracts the velocity vector from a PositionVelocityNed object. :param state: MAVSDK PositionVelocityNed object. :type state: PositionVelocityNed :return: Velocity as [north, east, down] in m/s. :rtype: list """ vel: VelocityNed = state.velocity return [vel.north_m_s, vel.east_m_s, vel.down_m_s]
[docs] def abs_vel(vec: List[float]) -> float: """ Calculates the magnitude of a velocity vector. :param vec: Velocity vector [vx, vy, vz]. :type vec: list :return: Magnitude of velocity. :rtype: float """ return math.sqrt(sum([v**2 for v in vec]))
[docs] def pythagoras(pos_a: List[float], pos_b: List[float]) -> float: """ Calculates the Euclidean distance between two position vectors. :param pos_a: First position vector [x, y, z]. :type pos_a: list :param pos_b: Second position vector [x, y, z]. :type pos_b: list :return: Euclidean distance. :rtype: float """ return math.sqrt( sum([(pos_a[i] - pos_b[i])**2 for i in range(len(pos_a))]))
[docs] async def get_data(func: AsyncGenerator[T, None]) -> Optional[T]: """ Asynchronously retrieves the first result from an async generator. :param func: Asynchronous generator function. :type func: async generator :return: First result from the generator. """ async for res in func: return res
[docs] async def wait_for(func: AsyncGenerator[T, None], b: Callable[[T], bool]) -> Optional[T]: """ Asynchronously waits for a condition to be met in an async generator. :param func: Asynchronous generator function. :type func: async generator :param b: Condition function that takes a result and returns True if condition is met. :type b: function :return: First result for which the condition is True. """ async for res in func: if b(res): return res
[docs] def rotation_matrix_yaw(rot: float) -> np.ndarray: """ Creates a 3x3 rotation matrix for a yaw (Z-axis) rotation. :param rot: Yaw rotation in degrees. :type rot: float :return: 3x3 rotation matrix. :rtype: numpy.ndarray """ # Using scipy's Rotation to create a rotation matrix for yaw (Z-axis) return R.from_euler('z', [rot], degrees=True).as_matrix()