DroneManager

DroneManager module

class dronemanager.core.DMConfig(drone_configs: DroneConfigs, mav_system_id: int = 246, mav_component_id: int = 190, default_plugins: list[str] | None = None, testing: bool = False, plugin_settings: dict[str, dict[str, Any]] | None = None, **kwargs)

Bases: object

Configuration class for DroneManager

CORE_ENTRIES = ['drones', 'mav_system_id', 'mav_component_id', 'plugin_settings', 'default_plugins', 'testing']
classmethod from_file(filepath: str)
to_file(filepath: str)
class dronemanager.core.DroneManager(drone_class, logger=None, log_to_console=True, console_log_level=logging.INFO)

Bases: object

Core class of the library.

drones: dict[str, Drone]
plugins: set[str]
async connect_to_drone(name: str, mavsdk_server_address: str | None = None, mavsdk_server_port: int | None = None, drone_address: str | None = None, timeout: float = 30, telemetry_frequency: float | None = None, log_telemetry=True)
async disconnect(names: str | Collection[str], force=False)
async arm(names: str | Collection[str], schedule=False)
async disarm(names: str | Collection[str], schedule=False)
async takeoff(names: str | Collection[str], altitude=2.0, schedule=False)
async change_flightmode(names: str | Collection[str], flightmode: str, schedule=False)
async land(names: str | Collection[str], schedule=False)
set_fence(names: str | Collection[str], fence: Fence)

Set a fence on drones

pause(names: str | Collection[str])
resume(names: str | Collection[str])
async yaw_to(names: str | Collection[str], yaw: Collection[float] | float, yaw_rate: Collection[float] | float | None = None, local: Collection[float] | None = None, tol: float | Collection[float] = 2, schedule: bool = True)
async fly_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
async move(names: str | Collection[str], offset: Collection[float], yaw: Collection[float] | float | None = None, use_gps: bool | Collection[bool] = True, tol: float | Collection[float] = 0.25, schedule: bool = True)

Move the drones by the given offsets, in meters, from their current positions. Which coordinate system is used depends on use_gps.

Parameters:
  • names

  • offset – An array with the offsets for the drones. Should contain the number of meters to move in NED.

  • yaw

  • use_gps – If False, use the local coordinate system, otherwise use GPS.

  • tol

  • schedule

Returns:

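As a rough illustration of the offset semantics described above, the following sketch applies a NED offset (north, east, down, in meters) to a local position. The helper name `apply_ned_offset` is hypothetical and not part of DroneManager, and the library's own handling, particularly when use_gps is True, may differ.

```python
def apply_ned_offset(position, offset):
    """Return the NED position reached by moving `offset` meters from `position`.

    Both arguments are (north, east, down) triples.
    Hypothetical helper for illustration only, not a DroneManager function.
    """
    if len(position) != 3 or len(offset) != 3:
        raise ValueError("position and offset must each have three components")
    return tuple(p + o for p, o in zip(position, offset))


# Down is positive in NED, so climbing by 1 m is an offset of -1 on the third axis.
start = (0.0, 0.0, -2.0)  # 2 m above the local origin
target = apply_ned_offset(start, (3.0, 0.0, -1.0))
print(target)  # (3.0, 0.0, -3.0)
```
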
async wait(names: str | Collection[str], delay: float | Collection[float], schedule=True)
async orbit(name, radius, velocity, center_lat, center_long, amsl)
async go_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)

Note that this uses the MAVLink GO TO command instead of offboard mode.

async action_stop(names)
async kill(names)
add_remove_func(func)
add_connect_func(func)
async close()
plugin_options()
currently_loaded_plugins()
add_plugin_load_func(func)
add_plugin_unload_func(func)
async load_plugin(plugin_module: str, plugin_name: str | None = None, options: list[str] | None = None, class_getter: callable = None)
async unload_plugin(plugin_name)

Drone module

class dronemanager.drone.Battery

Bases: object

class dronemanager.drone.DroneConfig(drone_name: str, address: str | None, position_rate: float = 5.0, log_telemetry: bool = False, max_h_vel: float = 10.0, max_down_vel: float = 1.0, max_up_vel: float = 3.0, max_h_acc: float = 1.5, max_v_acc: float = 0.5, max_h_jerk: float = 0.5, max_v_jerk: float = 0.5, max_yaw_vel: float = 60, max_yaw_acc: float = 30, max_yaw_jerk: float = 30, size: float = 1.0, rtsp: str | None = None, **kwargs)

Bases: object

Convenience class for drone configurations.

Drone configurations let users define drone objects with fixed parameters for easy reuse. They can be saved to and loaded from files. A configuration is applied when dm.connect_to_drone is called with a drone name that matches it.
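
The save, load and look-up-by-name pattern described above can be sketched roughly as follows. Everything here is an assumption made for illustration: `ExampleDroneConfig`, `save_configs` and `load_configs` are hypothetical stand-ins, only a few of the documented fields are reproduced, the address string is made up, and JSON is used only because this reference does not state the actual file format.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ExampleDroneConfig:
    """Hypothetical stand-in carrying a few of the fields listed for DroneConfig."""
    drone_name: str
    address: Optional[str]
    position_rate: float = 5.0
    max_h_vel: float = 10.0


def save_configs(configs, filepath):
    # JSON is an assumption; the real on-disk format is not documented here.
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump([asdict(c) for c in configs], f, indent=2)


def load_configs(filepath):
    # Index the configurations by drone name so they can be matched on connect.
    with open(filepath, encoding="utf-8") as f:
        return {e["drone_name"]: ExampleDroneConfig(**e) for e in json.load(f)}


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "drones.json")
    save_configs([ExampleDroneConfig("luke", "udp://:14540", max_h_vel=5.0)], path)
    configs = load_configs(path)

# A connect call for the name "luke" would pick up this entry.
print(configs["luke"])
```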

class dronemanager.drone.DroneConfigs(configs: list[DroneConfig])

Bases: object

class dronemanager.drone.DroneParams(raw=None)

Bases: object

class dronemanager.drone.Drone(name, *args, log_to_file=True, config: DroneConfig | None = None, **kwargs)

Bases: ABC, Thread

VALID_FLIGHTMODES = {}
VALID_SETPOINT_TYPES = {}
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

schedule_task(coro) Future
execute_task(coro) Future
add_handler(handler)
abstractmethod async stop_execution()

Stops the thread. This function should be called at the end of any implementing function.

Returns:

pause()

Pause task execution by setting self.is_paused to True.

Note that it is not possible to “pause” what the drone is doing in a general way. What “pausing” a task does or if a task can even be paused depends on the specific task and implementation. Subclasses must define and implement this behaviour themselves. However, pausing is always possible between tasks, and this is the default behaviour for subclasses that do not implement any of their own: When paused, drones will finish their current task and then wait until unpaused before beginning the next task.
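
The default behaviour described above (finish the current task, then hold until resumed) can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the library's implementation: `TaskRunner`, its queue and the `demo` coroutine are hypothetical names.

```python
import asyncio


class TaskRunner:
    """Hypothetical stand-in for a drone's task queue."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.is_paused = False
        self._unpaused = asyncio.Event()
        self._unpaused.set()
        self.log = []

    def pause(self):
        self.is_paused = True
        self._unpaused.clear()

    def resume(self):
        self.is_paused = False
        self._unpaused.set()

    async def run(self):
        while True:
            coro = await self.queue.get()
            if coro is None:  # sentinel: no more tasks
                return
            await self._unpaused.wait()  # the pause only takes effect between tasks
            await coro

    async def task(self, name):
        self.log.append(f"start {name}")
        await asyncio.sleep(0.05)
        self.log.append(f"end {name}")


async def demo():
    runner = TaskRunner()
    for name in ("a", "b"):
        runner.queue.put_nowait(runner.task(name))
    runner.queue.put_nowait(None)
    worker = asyncio.create_task(runner.run())
    await asyncio.sleep(0.01)  # task "a" is in progress
    runner.pause()
    await asyncio.sleep(0.1)   # "a" finishes, "b" is held back
    snapshot = list(runner.log)
    runner.resume()
    await worker
    return snapshot, runner.log


snapshot, final = asyncio.run(demo())
print(snapshot)  # ['start a', 'end a']
print(final)     # ['start a', 'end a', 'start b', 'end b']
```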

resume()

Resume executing tasks.

abstract property is_connected: bool
abstract property is_armed: bool
abstract property flightmode: FlightMode
abstract property in_air: bool
property autopilot: str
abstract property fix_type: FixType
abstract property position_global: ndarray

Array with the GPS coordinates [latitude, longitude, AMSL].

abstract property position_ned: ndarray
abstract property velocity: ndarray
abstract property speed: float
abstract property attitude: ndarray

RPY in degrees

abstract property batteries: dict[int, Battery]
property parameters_loaded: bool
abstractmethod async connect(drone_addr, *args, **kwargs)
abstractmethod async load_parameters()
abstractmethod async disconnect(force=False) bool
abstractmethod async arm() bool
abstractmethod async disarm() bool
abstractmethod async takeoff(altitude=2.0) bool

Takes off to the specified altitude above current position.

Note that altitude is positive.

Parameters:

altitude – Takeoff altitude above the current position.

Returns:

abstractmethod async change_flight_mode(flightmode) bool
is_at_waypoint(waypoint: Waypoint, pos_tolerance=0.25, vel_tolerance=0.1, yaw_tolerance=1) bool

What counts as “is at” depends on the waypoint type. At most, position, yaw and velocity are checked.

Parameters:
  • waypoint

  • pos_tolerance – In meters

  • vel_tolerance – In m/s

  • yaw_tolerance – In degrees

Returns:

is_at_pos(target_pos, tolerance=0.25) bool
Parameters:
  • target_pos – Array with target position. If a yaw is also passed (i.e. array length 4), it is ignored.

  • tolerance – How close we have to be to the target position to be considered “at” it.

Returns:

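A tolerance check of this kind could look roughly like the sketch below. It assumes a Euclidean distance over the three position components and a strict comparison, neither of which is confirmed by this reference, and `within_position_tolerance` is a hypothetical name.

```python
import math


def within_position_tolerance(current_pos, target_pos, tolerance=0.25):
    """Assumed check: Euclidean distance over the first three components.

    A fourth component (yaw) in either argument is ignored.
    """
    current_xyz = list(current_pos)[:3]
    target_xyz = list(target_pos)[:3]
    return math.dist(current_xyz, target_xyz) < tolerance


# The trailing 90.0 is a yaw value and does not affect the result.
print(within_position_tolerance([1.0, 2.0, -3.0], [1.1, 2.0, -3.0, 90.0]))  # True
print(within_position_tolerance([1.0, 2.0, -3.0], [1.5, 2.0, -3.0]))        # False
```
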
is_at_heading(target_heading, tolerance=1) bool
is_at_gps(target_gps, tolerance=0.25) bool
is_at_vel(target_vel, tolerance=0.1)
abstractmethod async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
abstractmethod async spin_at_rate(yaw_rate, duration, direction='cw') bool
set_fence(fence_type: type[Fence], *args, **kwargs)
check_waypoint(waypoint: Waypoint)

Check if a waypoint is valid and within any geofence (if such a fence is set)

abstractmethod async set_setpoint(setpoint: Waypoint) bool
async wait(delay: float)

Wait delay seconds.

This function is useful with scheduling to schedule short waits between moves.

abstractmethod async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)

Fly to the specified position.

Parameters:
  • local

  • gps

  • yaw

  • waypoint

  • tolerance

Returns:

abstractmethod async move(offset: ndarray, yaw: float | None = None, use_gps=True, tolerance=0.25)

Move from the current position by the specified distances.

Parameters:
  • offset – A numpy array specifying how far to move along each axis, in meters.

  • yaw

  • use_gps

  • tolerance

Returns:

abstractmethod async orbit(radius, velocity, latitude, longitude, amsl) bool
abstractmethod async land() bool
abstractmethod async stop() bool
abstractmethod async kill() bool
clear_queue() None

Clears the action queue.

Does not cancel the current action.

Returns:

cancel_action() None

Cancels the current action task

Returns:

class dronemanager.drone.DroneMAVSDK(name, mavsdk_server_address: str | None = None, mavsdk_server_port: int = 50051, config: DroneConfig | None = None)

Bases: Drone

VALID_FLIGHTMODES = {'altitude', 'hold', 'land', 'offboard', 'position', 'return', 'takeoff'}
VALID_SETPOINT_TYPES = {WayPointType.POS_GLOBAL, WayPointType.POS_NED, WayPointType.POS_VEL_ACC_NED, WayPointType.POS_VEL_NED, WayPointType.VEL_NED}
system: System | None
mav_conn: MAVPassthrough
property is_connected: bool
property is_armed: bool
property flightmode: FlightMode
property in_air: bool
property fix_type: FixType
property position_global: ndarray

Array with the GPS coordinates [latitude, longitude, AMSL].

property altitude_above_takeoff: float
property position_ned: ndarray
property velocity: ndarray
property speed: float
property attitude: ndarray

RPY in degrees

property heading: float
property batteries: dict[int, Battery]
async connect(drone_address, system_id=0, component_id=0, log_telemetry=None) bool
async load_parameters()
async disconnect(force=False)
async arm()
async disarm()
async takeoff(altitude=2.0) bool
Parameters:

altitude

Returns:

async change_flight_mode(flightmode: str, timeout: float = 5)
async set_setpoint(setpoint: Waypoint)
async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)

Yaw to the target heading at the specified rate, maintaining the current position.

Uses the local coordinate system to determine and maintain position. Pausable.

Parameters:
  • target_yaw – Heading in degrees from -180 to 180, right positive, 0 forward.

  • yaw_rate

  • local – Position setpoint during yaw.

  • tolerance – How close we have to get to the heading before this function returns.

Returns:

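Because headings run from -180 to 180 degrees, a tolerance check has to handle the wrap-around at ±180. The sketch below shows one common way to do this. The function names are hypothetical, and whether DroneManager computes the error this way is an assumption.

```python
def heading_error(current, target):
    """Smallest signed difference target - current in degrees, wrapped to [-180, 180)."""
    return (target - current + 180.0) % 360.0 - 180.0


def heading_within_tolerance(current, target, tolerance=2.0):
    """Assumed check: absolute wrapped error below the tolerance."""
    return abs(heading_error(current, target)) < tolerance


# Turning from 170 to -175 degrees is a 15 degree turn to the right, not 345 to the left.
print(heading_error(170.0, -175.0))              # 15.0
print(heading_within_tolerance(179.5, -179.5))   # True (only 1 degree apart)
print(heading_within_tolerance(0.0, 10.0))       # False
```
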
async spin_at_rate(yaw_rate, duration, direction='cw')

Spin in place at the given rate for the given duration.

Pausable.

Parameters:
  • yaw_rate

  • duration

  • direction

Returns:

async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25, put_into_offboard=True, log=True)

Fly to a specified point in offboard mode. Uses path generators and followers to get there.

If multiple targets are provided (for example GPS and local coordinates), they are preferred in this order: waypoint > GPS > local. In the example, the local coordinates would be ignored.

Parameters:
  • local

  • gps

  • yaw

  • waypoint

  • tolerance

  • put_into_offboard

  • log

Returns:

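The target precedence described above (waypoint, then GPS, then local) can be sketched as a small selection function. `select_target` is a hypothetical name, and raising an error when no target is given is an assumption rather than documented behaviour.

```python
def select_target(local=None, gps=None, waypoint=None):
    """Return (kind, value) for the highest-priority target that was supplied."""
    for kind, value in (("waypoint", waypoint), ("gps", gps), ("local", local)):
        if value is not None:
            return kind, value
    # Assumption: the real method may handle a missing target differently.
    raise ValueError("no target given")


# Both GPS and local coordinates are given, so the local ones are ignored.
choice = select_target(local=[5.0, 0.0, -2.0], gps=[47.397, 8.545, 490.0])
print(choice)  # ('gps', [47.397, 8.545, 490.0])
```
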
async move(offset, yaw: float | None = None, use_gps=True, tolerance=0.25)

Move from the current position by the specified distances.

Parameters:
  • offset – A numpy array specifying how far to move along each axis, in meters.

  • yaw

  • use_gps

  • tolerance

Returns:

async go_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
async orbit(radius, velocity, center_lat, center_long, amsl)
async land()
async manual_control_position()
async manual_control_altitude()
async set_manual_control_input(x, y, z, r)
async stop_execution()

Stops all coroutines, closes all connections, etc.

Returns:

async stop()
async kill()

Terminal interface

App

class dronemanager.app.StatusScreen(*args, **kwargs)

Bases: Screen

A screen showing detailed information for a single drone.

Initialize the screen.

Parameters:
  • name – The name of the screen.

  • id – The ID of the screen in the DOM.

  • classes – The CSS classes for the screen.

CSS = ' \nProgressBar {\n    width: 25;\n    height: 1;\n    layout: horizontal;\n}\n\nBar {\n    width: 20;\n    height: 1;\n}\n'

CSS for the status screen.

compose()

Creates the screen object

on_radio_set_changed(event: Changed) None
can_focus = False

Widget may receive focus.

can_focus_children = True

Widget’s children may receive focus.

class dronemanager.app.CommandScreen(*args, **kwargs)

Bases: Screen

Initialize the screen.

Parameters:
  • name – The name of the screen.

  • id – The ID of the screen in the DOM.

  • classes – The CSS classes for the screen.

STATUS_REFRESH_RATE = 20
CSS = '\n.text {\n    text-style: bold;\n}\n\n#status {\n    height: 10fr;\n}\n\n#usage {\n    height: 1fr;\n}\n\n#sidebar {\n    width: 97;\n}\n'

Inline CSS, useful for quick scripts. Rules here take priority over CSS_PATH.

Note

This CSS applies to the whole app.

dm: DroneManager
drone_widgets: dict[str, Widget]
running_tasks: set[asyncio.Task]
async cli(message)
async action_stop(names)
async exit()

Checks if any drones are armed and exits the app if not.

compose()

Creates the screen object

can_focus = False

Widget may receive focus.

can_focus_children = True

Widget’s children may receive focus.

class dronemanager.app.DroneApp(dm: DroneManager, logger=None)

Bases: App

Create an instance of an app.

Parameters:
  • driver_class – Driver class or None to auto-detect. This will be used by some Textual tools.

  • css_path – Path to CSS or None to use the CSS_PATH class variable. To load multiple CSS files, pass a list of strings or paths which will be loaded in order.

  • watch_css – Reload CSS if the files changed. This is set automatically if you are using textual run with the dev switch.

  • ansi_color – Allow ANSI colors if True, or convert ANSI colors to RGB if False.

Raises:

CssPathError – When the supplied CSS path(s) are an unexpected type.

BINDINGS = {Binding(key='s', action='cycle_control', description='Swap Status/Control', show=True, key_display=None, priority=False, tooltip='', id=None, system=False)}

Key Bindings

TITLE = 'DroneManager'

Window Title

MODES = {'control': <class 'dronemanager.app.CommandScreen'>, 'status': <class 'dronemanager.app.StatusScreen'>}

A dictionary of the available app “modes”.

An app mode is essentially a different view, and we can cycle between them. See the Textual documentation for more information.

command_screen: CommandScreen | None
status_screen: StatusScreen | None
on_mount()
action_cycle_control()
dronemanager.app.main()

Custom widgets

exception dronemanager.widgets.ArgumentParserError

Bases: Exception

exception dronemanager.widgets.PrintHelpInsteadOfParsingError

Bases: Exception

class dronemanager.widgets.ArgParser(*args, logger=None, **kwargs)

Bases: ArgumentParser

error(message: string)

Prints a usage message incorporating the message to stderr and exits.

If you override this in a subclass, it should not return – it should either exit or raise an exception.

print_help(file=None)
exit(status=0, message=None)
class dronemanager.widgets.InputWithHistory(*args, **kwargs)

Bases: Input

Initialise the Input widget.

Parameters:
  • value – An optional default value for the input.

  • placeholder – Optional placeholder text for the input.

  • highlighter – An optional highlighter for the input.

  • password – Flag to say if the field should obfuscate its content.

  • restrict – A regex to restrict character inputs.

  • type – The type of the input.

  • max_length – The maximum length of the input, or 0 for no maximum length.

  • suggester – Suggester (textual.suggester.Suggester) associated with this input instance.

  • validators – An iterable of validators that the Input value will be checked against.

  • validate_on – Zero or more of the values “blur”, “changed”, and “submitted”, which determine when to do input validation. The default is to do validation for all messages.

  • valid_empty – Empty values are valid.

  • select_on_focus – Whether to select all text on focus.

  • name – Optional name for the input widget.

  • id – Optional ID for the widget.

  • classes – Optional initial classes for the widget.

  • disabled – Whether the input is disabled or not.

  • tooltip – Optional tooltip.

BINDINGS = [
    Binding(key='left', action='cursor_left', description='Move cursor left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='shift+left', action='cursor_left(True)', description='Move cursor left and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+left', action='cursor_left_word', description='Move cursor left a word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+shift+left', action='cursor_left_word(True)', description='Move cursor left a word and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='right', action='cursor_right', description='Move cursor right or accept the completion suggestion', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='shift+right', action='cursor_right(True)', description='Move cursor right and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+right', action='cursor_right_word', description='Move cursor right a word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+shift+right', action='cursor_right_word(True)', description='Move cursor right a word and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='backspace', action='delete_left', description='Delete character left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='home,ctrl+a', action='home', description='Go to start', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='end,ctrl+e', action='end', description='Go to end', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='shift+home', action='home(True)', description='Select line start', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='shift+end', action='end(True)', description='Select line end', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='delete,ctrl+d', action='delete_right', description='Delete character right', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='enter', action='submit', description='Submit', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+w', action='delete_left_word', description='Delete left to start of word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+u', action='delete_left_all', description='Delete all to the left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+f', action='delete_right_word', description='Delete right to start of word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+k', action='delete_right_all', description='Delete all to the right', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+x', action='cut', description='Cut selected text', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+c', action='copy', description='Copy selected text', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='ctrl+v', action='paste', description='Paste text from the clipboard', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='up', action='history_prev', description='Previous item from history', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
    Binding(key='down', action='history_rec', description='Next item in history', show=False, key_display=None, priority=False, tooltip='', id=None, system=False),
]
Key(s) | Description |
:- | :- |
left | Move the cursor left. |
shift+left | Move cursor left and select. |
ctrl+left | Move the cursor one word to the left. |
right | Move the cursor right or accept the completion suggestion. |
ctrl+shift+left | Move cursor left a word and select. |
shift+right | Move cursor right and select. |
ctrl+right | Move the cursor one word to the right. |
backspace | Delete the character to the left of the cursor. |
ctrl+shift+right | Move cursor right a word and select. |
home,ctrl+a | Go to the beginning of the input. |
end,ctrl+e | Go to the end of the input. |
shift+home | Select up to the input start. |
shift+end | Select up to the input end. |
delete,ctrl+d | Delete the character to the right of the cursor. |
enter | Submit the current value of the input. |
ctrl+w | Delete the word to the left of the cursor. |
ctrl+u | Delete everything to the left of the cursor. |
ctrl+f | Delete the word to the right of the cursor. |
ctrl+k | Delete everything to the right of the cursor. |
ctrl+x | Cut selected text. |
ctrl+c | Copy selected text. |
ctrl+v | Paste text from the clipboard. |
action_history_prev() None
action_history_rec() None
add_to_history(item) None
async action_submit() None

Handle a submit action.

Normally triggered by the user pressing Enter. This may also run any validators.

can_focus = True

Widget may receive focus.

can_focus_children = True

Widget’s children may receive focus.

class dronemanager.widgets.DroneOverview(drone, update_frequency, logger, *args, **kwargs)

Bases: Static

Initialize a Widget.

Parameters:
  • *children – Child widgets.

  • name – The name of the widget.

  • id – The ID of the widget in the DOM.

  • classes – The CSS classes for the widget.

  • disabled – Whether the widget is disabled or not.

COLUMN_NAMES = ['Name', 'Status', 'Modes', 'GPS', 'Local', 'Vel', 'Yaw/Bat']
COLUMN_WIDTHS = [10, 11, 11, 16, 9, 9, 8]
COLUMN_ALIGN = ['<', '>', '>', '>', '>', '>', '>']
COLUMN_SPACING = 3
classmethod header_string()
classmethod gadget_width()
on_mount() None
async update_display()
can_focus = False

Widget may receive focus.

can_focus_children = True

Widget’s children may receive focus.

class dronemanager.widgets.TextualLogHandler(log_textual, *args, **kwargs)

Bases: Handler

Initializes the instance - basically setting the formatter to None and the filter list to empty.

log_textual: Log
emit(record)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

MAVLink connection

class dronemanager.mavpassthrough.MAVPassthrough(dialect=None, loggername='passthrough', log_messages=True)

Bases: object

connect_gcs(address)
connect_drone(loc, appendix, scheme='udp')
connected_to_drone()
connected_to_gcs()
send_as_gcs(msg)
listen_ack(command_id, target_component) Future
listen_message(message_id, target_component) Future
send_cmd_long(target_component, cmd, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, param6=math.nan, param7=math.nan) Future
send_request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1)
async request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1, timeout=5)
send_param_ext_request_list(target_component)
send_param_ext_set(target_component, param_id, param_value: int | float, param_type: int)
send_param_ext_request_read(target_component, param_id: str, param_index: int = None)
add_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
remove_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
async stop()
async dronemanager.mavpassthrough.main()