Source code for payloadcomputerdroneprojekt.mission_computer.mc_class

from payloadcomputerdroneprojekt.communications import Communications
from payloadcomputerdroneprojekt.image_analysis import ImageAnalysis
from payloadcomputerdroneprojekt.camera import AbstractCamera
from payloadcomputerdroneprojekt.mission_computer.scan_planer import \
    plan_scan, export_geojson
from payloadcomputerdroneprojekt.mission_computer.helper \
    import rec_serialize, count_actions, action_with_count, diag, \
    find_shortest_path, PIDController
import os
import logging
import time
import json
import shutil
from payloadcomputerdroneprojekt.helper import smart_print as sp
import asyncio
from typing import Any, Callable, Dict, List, Optional
import numpy as np

MISSION_PATH = "mission_file.json"
MISSION_PROGRESS = "__mission__.json"


[docs] def test_rem(path: str) -> None: if os.path.exists(path): os.remove(path)
[docs] class MissionComputer: """ MissionComputer class for managing drone missions, communication, and image analysis. This class handles mission initialization, execution, progress tracking, and communication with the drone and its subsystems. :param config: Configuration dictionary for the mission computer and subsystems. :type config: dict :param port: Communication port for the drone. :type port: str :param camera: Camera class to be used for image analysis. :type camera: type[AbstractCamera] :param communications: Communications class for drone communication. :type communications: type[Communications], optional :param image_analysis: ImageAnalysis class for image processing. :type image_analysis: type[ImageAnalysis], optional """ def __init__( self, config: dict, port: str, camera: type[AbstractCamera], communications: type[Communications] = Communications, image_analysis: type[ImageAnalysis] = ImageAnalysis ) -> None: """ Initialize the MissionComputer instance. Sets up communication, image analysis, logging, and working directory. :param config: Configuration dictionary. :type config: dict :param port: Communication port. :type port: str :param camera: Camera class. :type camera: type[AbstractCamera] :param communications: Communications class. :type communications: type[Communications], optional :param image_analysis: ImageAnalysis class. :type image_analysis: type[ImageAnalysis], optional """ self.set_work_dir(config) logging.basicConfig(filename="flight.log", format='%(asctime)s %(message)s', level=logging.INFO) self._comms: Communications = communications( port, config.get("communications", {})) self._image: ImageAnalysis = image_analysis( config=config.get("image", {}), camera=camera( config.get("camera", None)), comms=self._comms) self._image._camera.start_camera() self.config: dict = config.get("mission_computer", {}) self._setup()
[docs] def set_work_dir(self, config: dict) -> None: error: Optional[Exception] = None try: path: str = config.get("mission_storage", "mission_storage") print(path) os.makedirs(path, exist_ok=True) except Exception as e: error = e print( "Working directory not accesable, " "switching to 'mission_storage'") path = "mission_storage" os.makedirs(path, exist_ok=True) os.chdir(path) if error: logging.info(str(error))
def _setup(self) -> None: """ Internal setup for mission plan, progress, and available actions. """ self.task = None self._old_task = None self.current_mission_plan: dict = {} self.current_mission_plan.setdefault("parameter", {}) self.progress: int = 0 self.max_progress: int = -1 self.running: bool = False self.main_programm: Optional[asyncio.Task] = None self.actions: Dict[str, Callable] = { "start_camera": self.start_camera, "stop_camera": self.stop_camera, "takeoff": self.takeoff, "land_at": self.land, "delay": self.delay, "list": self.execute_list, "mov_multiple": self.mov_multiple, "forever": self.forever, "mov": self.mov, "mov_to_objects_cap_pic": self.mov_to_objects_cap_pic, "mov_local": self.mov_local, "scan_area": self.scan_area, } self.none_counting_tasks: List[str] = [ "list", "mov_multiple" ] self.cancel_list: List[Callable] = [ self._image.stop_cam ] # TODO: add on off state filter camera is # not being reactivated on restart
[docs] def initiate(self, missionfile: str = "") -> None: """ Initialize or reset the mission plan from a file. :param missionfile: Path to the mission file. :type missionfile: str, optional """ if os.path.exists(MISSION_PROGRESS): try: with open(MISSION_PROGRESS, "r") as f: progress = json.load(f) # Check if the mission can be recovered based on time if abs(progress["time"] - time.time() ) > self.config.get("recouver_time", 10): test_rem(MISSION_PROGRESS) test_rem(MISSION_PATH) except Exception: sp("Error reading progress file, resetting progress") test_rem(MISSION_PROGRESS) test_rem(MISSION_PATH) return else: test_rem(MISSION_PROGRESS) test_rem(MISSION_PATH) mission: Optional[dict] = None if os.path.exists(missionfile): shutil.copyfile(missionfile, MISSION_PATH) test_rem(MISSION_PROGRESS) if os.path.exists(MISSION_PATH): with open(MISSION_PATH, "r") as f: mission = json.load(f) rec_serialize(mission) self.current_mission_plan = mission self.current_mission_plan.setdefault("parameter", {}) if mission is None: self.progress = 0 self.max_progress = -1 test_rem(MISSION_PROGRESS) return try: if os.path.exists(MISSION_PROGRESS): with open(MISSION_PROGRESS, "r") as f: progress = json.load(f) if count_actions(mission) == progress["max_progress"]: self.progress = progress["progress"] self.max_progress = progress["max_progress"] return except Exception: sp("Error reading progress file, resetting progress") test_rem(MISSION_PROGRESS) self.progress = 0 self.max_progress = -1 return self.progress = mission.get("progress", 0) self.max_progress = count_actions(mission)
[docs] async def save_progress(self) -> None: """ Periodically save the current mission progress to a file. """ while True: self._save_progress() await asyncio.sleep(0.1)
def _save_progress(self) -> None: """ Save the current progress to the progress file if the mission is running. """ if self.running: obj = { "progress": self.progress, "max_progress": self.max_progress, "time": time.time() } with open(MISSION_PROGRESS, "w") as f: json.dump(obj, f)
[docs] async def execute(self, action: dict) -> None: """ Execute a single mission action. :param action: Action dictionary containing the action type and commands. :type action: dict """ self.running = True a: str = action["action"] if a not in self.actions.keys(): sp(f"Action not found {a} at exectuion" f" {self.progress} / {self.max_progress}") return try: await self.actions[a](action.get("commands", {})) except Exception as e: sp(f"Error in {a} ({self.progress} / {self.max_progress}): {e}") if a not in self.none_counting_tasks: self.progress += 1 self._save_progress() self.running = False if self.progress >= self.max_progress: await self.status("Mission Completed") self.running = False if os.path.exists(MISSION_PROGRESS): os.remove(MISSION_PROGRESS) if os.path.exists(MISSION_PATH): os.remove(MISSION_PATH)
[docs] def start(self) -> None: """ Start the mission computer's main event loop. """ self.task = self._start asyncio.run(self._task())
async def _task(self) -> None: asyncio.create_task(self.save_progress()) while True: if self.task is not None: if self._old_task is not None: self._old_task.cancel() self._old_task = asyncio.create_task(self.task()) self.task = None await asyncio.sleep(0.1) async def _start(self) -> None: """ Asynchronous main loop for mission execution and communication. """ await self._comms.connect() await self.status("Mission Computer Started") await self.status(f"Starting with Progress: {self.progress}") if "action" in self.current_mission_plan.keys(): self.running = True plan: dict = self.current_mission_plan if self.progress > 0: plan = action_with_count( self.current_mission_plan, self.progress) if isinstance(plan, int): sp(f"Progress {self.progress} exceeds plan actions, " f"resetting to 0") self.progress = 0 plan = self.current_mission_plan if self.main_programm is not None: self.main_programm.cancel() self.main_programm = asyncio.create_task( self.execute(plan)) else: await self.status("No Valid Mision") sp("Waiting for Networking connection")
[docs] async def new_mission(self, plan: str) -> None: """ Callback for receiving a new mission plan. :param plan: Path to the new mission plan file. :type plan: str """ if self.main_programm: try: self.main_programm.cancel() except asyncio.CancelledError: sp("Main programm already canceled") for task in self.cancel_list: try: task() except Exception as e: sp(f"Error in canceling: {e}") self.running = False self.initiate(plan) self.task = self._start
[docs] async def start_camera(self, options: dict) -> None: """ Start the camera subsystem. :param options: Options for starting the camera (e.g., images per second). :type options: dict """ await self.status("Starting Camera") self._image.start_cam(options.get("ips", 1))
[docs] async def stop_camera(self, options: dict) -> None: """ Stop the camera subsystem and process filtered objects. :param options: Options for stopping the camera. :type options: dict """ await self.status("Stopping Camera") self._image.stop_cam() self._image.get_filtered_objs()
[docs] async def takeoff(self, options: dict) -> None: """ Command the drone to take off to a specified height. :param options: Options containing the target height. :type options: dict """ h: float = options.get( "height", self.current_mission_plan["parameter"].get( "flight_height", 5)) await self.status(f"Taking Off to height {h}") await self._comms.start(h)
[docs] async def land(self, objective: dict) -> None: """ Land the drone at a specified location, optionally using color/shape detection. :param objective: Dictionary with landing coordinates and optional color/shape. :type objective: dict """ if not await self._comms.is_flying(): return if "lat" in objective and "lon" in objective: sp(f"Landing at {objective['lat']:.6f} {objective['lon']:.6f}") await self.mov(options=objective) else: await self.status("No lat/lon given – skipping GPS movement") try: await self.smart_land(objective) except Exception as e: sp(f"Error during smart land: {e}") await self.status("Smart land failed, landing at current position") await self.status("Landeposition erreicht. Drohne landet.") await self._comms.mov_by_vel( [0, 0, self.config.get("land_speed", 2)]) await self._comms.landed()
[docs] async def smart_land(self, objective: dict) -> None: yaw_pid = PIDController( Kp=0.25, Ki=0.0, Kd=0.1, output_limits=(-15, 15)) if "color" not in objective.keys(): sp("No color given") return sp(f"Suche Objekt vom Typ '{objective.get('shape', None)}' " f"mit Farbe '{objective['color']}'") min_alt: float = self.current_mission_plan.get( "parameter", {}).get("decision_height", 1) if self.config.get("indoor", False): detected_alt: float = -1*(await self._comms.get_position_xyz())[2] else: detected_alt: float = await self._comms.get_relative_height() if detected_alt <= 0: sp(f"Warning: detected_alt below 0 ({detected_alt:.2f})," " clamping to 0") detected_alt = 0.001 tries = 5 old_time = time.time() old_d = 0.0 while detected_alt > min_alt: old_alt = detected_alt offset, detected_alt, yaw = \ await self._image.get_current_offset_closest( objective["color"], objective.get('shape', None), indoor=self.config.get("indoor", False)) if offset is None: await self.status("Objekt nicht gefunden.") tries -= 1 detected_alt = old_alt if tries <= 0: sp("Max tries reached, aborting landing") return sp("skip to next") continue tries = 5 sp(f"Offset: {offset}, Detected Altitude: {detected_alt}, " f"Yaw: {yaw}") d = diag(offset[0], offset[1]) vel_ver: float = 0.002 / d if vel_ver*2 > detected_alt: vel_ver = detected_alt / 2 if abs(d-old_d) > 0.25: vel_ver = 0 sp(f"Vertical Velocity: {vel_ver:.2f}") def smart_xy(x): return np.tanh(x) / 2 now = time.time() dt = now - old_time old_time = now # PID control for x, y, z, and yaw vx = smart_xy(offset[0]) vy = smart_xy(offset[1]) vyaw = yaw_pid(np.tanh(-yaw/15)*15, dt) sp(f"PID vx: {vx:.2f}, vy: {vy:.2f}, vyaw: {vyaw:.2f}") await self._comms.mov_by_vel([vx, vy, vel_ver], vyaw) old_d = d
[docs] async def delay(self, options: dict) -> None: """ Delay execution for a specified amount of time. :param options: Dictionary with the delay time in seconds. :type options: dict """ sp(f"Delay: {options.get('time', 1)}") await asyncio.sleep(options.get("time", 1))
[docs] async def execute_list(self, options: List[dict]) -> None: """ Execute a list of actions sequentially. :param options: List of action dictionaries. :type options: List[dict] """ for item in options: await self.execute(item)
[docs] async def mov_multiple(self, options: List[dict]) -> None: """ Move to multiple locations sequentially. :param options: List of movement command dictionaries. :type options: List[dict] """ await self.status(f"Moving Multiple {len(options)}") for item in options: await self.mov(item) self.progress += 1
[docs] async def mov(self, options: dict) -> None: """ Move the drone to a specified latitude, longitude, and height. :param options: Dictionary with 'lat', 'lon', and optional 'height' and 'yaw'. :type options: dict """ yaw: Optional[float] = options.get("yaw") if "height" in options.keys(): h: float = options["height"] else: h: float = self.current_mission_plan.get( "parameter", {}).get("flight_height", 5) await self.status( f"Moving to {options['lat']:.6f} {options['lon']:.6f} " f"{h:.2f} {yaw}") pos: List[float] = [options['lat'], options['lon'], h] if not await self._comms.is_flying(): await self._comms.start(h) await self._comms.mov_to_lat_lon_alt(pos, yaw)
[docs] async def forever(self, options: dict) -> None: """ Run an infinite loop (used for testing or waiting). :param options: Options dictionary (unused). :type options: dict """ sp("Running Until Forever") while True: await asyncio.sleep(2)
[docs] async def mov_to_objects_cap_pic(self, options: dict) -> None: """ Move to detected objects and capture images at each location. :param options: Dictionary with movement and delay options. :type options: dict """ sp("Moving to objects and taking picture") obj: List[dict] = self._image.get_filtered_objs() path: List[Any] = find_shortest_path( obj, await self._comms.get_position_lat_lon_alt()) if "height" in options.keys(): h: float = options["height"] else: h: float = self.current_mission_plan.get( "parameter", {}).get("height", 5) for i, item in enumerate(path): sp(f"Moving to {i+1}/{len(path)}: {item}") await self.mov({"lat": item[0], "lon": item[1], "height": h}) await asyncio.sleep(options.get("delay", 0.5)) await self._image.take_image()
[docs] async def status(self, msg: str) -> None: """ Send a status message to the communication subsystem. :param msg: Status message to send. :type msg: str """ sp(msg) await self._comms.send_status(msg)
[docs] async def mov_local(self, options: dict) -> None: """ Move the drone to a specified local position (x, y, z) in meters. :param options: Dictionary with 'x', 'y', and optional 'z' and 'yaw'. x: forward (North), y: right (East), z: down (positive) :type options: dict """ if "x" in options and "y" in options: await self._move_local(options) elif "dx" in options and "dy" in options: await self._move_local_delta(options) else: await self.status("No valid input fields in local move")
async def _move_local(self, options: dict) -> None: """ Move the drone to a specified local position (x, y, z) in meters. :param options: Dictionary with 'x', 'y', and optional 'z' and 'yaw'. x: forward (North), y: right (East), z: down (positive) :type options: dict """ # Extract coordinates x, y = options['x'], options['y'] if "z" in options.keys(): z: float = options["z"] else: z: float = -1 * self.current_mission_plan.get( "parameter", {}).get("height", 1) yaw: Optional[float] = options.get("yaw") await self.status( f"Moving to local x={x:.2f}, y={y:.2f}, z={z:.2f}") pos_local: List[float] = [x, y, z] if not await self._comms.is_flying(): # Use negative z for takeoff height since NED z is positive # downward await self._comms.start(abs(z)) await self._comms.mov_to_xyz(pos_local, yaw) async def _move_local_delta(self, options: dict) -> None: """ Move the drone by a specified delta in local coordinates (dx, dy, dz). :param options: Dictionary with 'dx', 'dy', and optional 'dz' and 'yaw'. :type options: dict """ dx, dy = options['dx'], options['dy'] dz: float = options.get("dz", 0) yaw: Optional[float] = options.get("yaw", 0) await self.status( f"Moving local delta dx={dx:.2f}, dy={dy:.2f}, dz={dz:.2f}") pos_local: List[float] = [dx, dy, dz] if not await self._comms.is_flying(): # Use negative dz for takeoff height since NED z is positive # downward z: float = self.current_mission_plan.get( "parameter", {}).get("height", 1) takeoff_height = abs(z - dz) await self._comms.start(takeoff_height) await self._comms.mov_by_xyz(pos_local, yaw)
[docs] async def scan_area(self, options: dict) -> None: """ Scan a specified area using the drone's camera. :param options: Dictionary with 'lat', 'lon', 'height', and optional 'yaw'. :type options: dict """ start = (await self._comms.get_position_lat_lon_alt())[:2] polygon: List[tuple] = options.get("polygon", []) end = options.get("end_point", start) # polygon = [(48.767642, 11.337281), # (48.767535, 11.337174), # (48.767722, 11.336517), # (48.768063, 11.336072), # (48.768167, 11.336196) # ] # start = (48.767642, 11.337281) # end = (48.767722, 11.336799) if "height" in options.keys(): h: float = options["height"] else: h: float = self.current_mission_plan.get( "parameter", {}).get("flight_height", 5) mission = plan_scan( polygon_latlon=polygon, start_latlon=start, end_latlon=end, altitude=h, fov_deg=self._image.config.get("fov", [66, 41])[0], overlap_ratio=options.get("overlap_ratio", 0.2) ) sp("Scan Mission Plan:") sp(mission) if not mission: sp("No valid scan mission plan generated") return export_geojson(mission, filename="scan_mission.geojson") for point in mission["route"]: sp(f"Scan Line: {point}") await self.mov({"lat": point[0], "lon": point[1], "height": h, "yaw": point[2]}) await asyncio.sleep(options.get("delay", 0.5))