DroneManager
DroneManager module
- class dronemanager.core.DMConfig(drone_configs: DroneConfigs, mav_system_id: int = 246, mav_component_id: int = 190, default_plugins: list[str] | None = None, testing: bool = False, plugin_settings: dict[str, dict[str, Any]] | None = None, **kwargs)
Bases:
objectConfiguration class for DroneManager
- CORE_ENTRIES = ['drones', 'mav_system_id', 'mav_component_id', 'plugin_settings', 'default_plugins', 'testing']
- classmethod from_file(filepath: str)
- to_file(filepath: str)
- class dronemanager.core.DroneManager(drone_class, logger=None, log_to_console=True, console_log_level=logging.INFO)
Bases:
objectCore class of the library.
- plugins: set[str]
- async connect_to_drone(name: str, mavsdk_server_address: str | None = None, mavsdk_server_port: int | None = None, drone_address: str | None = None, timeout: float = 30, telemetry_frequency: float | None = None, log_telemetry=True)
- async disconnect(names: str | Collection[str], force=False)
- async arm(names: str | Collection[str], schedule=False)
- async disarm(names: str | Collection[str], schedule=False)
- async takeoff(names: str | Collection[str], altitude=2.0, schedule=False)
- async change_flightmode(names: str | Collection[str], flightmode: str, schedule=False)
- async land(names: str | Collection[str], schedule=False)
- pause(names: str | Collection[str])
- resume(names: str | Collection[str])
- async yaw_to(names: str | Collection[str], yaw: Collection[float] | float, yaw_rate: Collection[float] | float | None = None, local: Collection[float] | None = None, tol: float | Collection[float] = 2, schedule: bool = True)
- async fly_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
- async move(names: str | Collection[str], offset: Collection[float], yaw: Collection[float] | float | None = None, use_gps: bool | Collection[bool] = True, tol: float | Collection[float] = 0.25, schedule: bool = True)
Move the drones by offsets meters from their current positions. Which coordinate system is used depends on no_gps.
- Parameters:
names
offset – An array with the offsets for the drones. Should contain the number of meters to move in NED.
yaw
use_gps – If False, use the local coordinate system, otherwise use GPS.
tol
schedule
- Returns:
- async wait(names: str | Collection[str], delay: float | Collection[float], schedule=True)
- async orbit(name, radius, velocity, center_lat, center_long, amsl)
- async go_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
Note that this uses the GO TO Mavlink command instead of offboard mode.
- async action_stop(names)
- async kill(names)
- add_remove_func(func)
- add_connect_func(func)
- async close()
- plugin_options()
- currently_loaded_plugins()
- add_plugin_load_func(func)
- add_plugin_unload_func(func)
- async load_plugin(plugin_module: str, plugin_name: str | None = None, options: list[str] | None = None, class_getter: callable = None)
- async unload_plugin(plugin_name)
Drone module
- class dronemanager.drone.Battery
Bases:
object
- class dronemanager.drone.DroneConfig(drone_name: str, address: str | None, position_rate: float = 5.0, log_telemetry: bool = False, max_h_vel: float = 10.0, max_down_vel: float = 1.0, max_up_vel: float = 3.0, max_h_acc: float = 1.5, max_v_acc: float = 0.5, max_h_jerk: float = 0.5, max_v_jerk: float = 0.5, max_yaw_vel: float = 60, max_yaw_acc: float = 30, max_yaw_jerk: float = 30, size: float = 1.0, rtsp: str | None = None, **kwargs)
Bases:
objectConvenience class for drone configurations.
These exist for convenience purposes, to allow people to define drone objects with fixed parameters for easy reuse. They can be saved to and loaded from files. A given configuration is used when dm.connect_to_drone is called with the drone name matching the configuration.
- class dronemanager.drone.DroneConfigs(configs: list[DroneConfig])
Bases:
object
- class dronemanager.drone.DroneParams(raw=None)
Bases:
object
- class dronemanager.drone.Drone(name, *args, log_to_file=True, config: DroneConfig | None = None, **kwargs)
Bases:
ABC,Thread- VALID_FLIGHTMODES = {}
- VALID_SETPOINT_TYPES = {}
- run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- schedule_task(coro) Future
- execute_task(coro) Future
- add_handler(handler)
- abstractmethod async stop_execution()
Stops the thread. This function should be called at the end of any implementing function.
- Returns:
- pause()
Pause task execution by setting self.is_paused to True.
Note that it is not possible to “pause” what the drone is doing in a general way. What “pausing” a task does or if a task can even be paused depends on the specific task and implementation. Subclasses must define and implement this behaviour themselves. However, pausing is always possible between tasks, and this is the default behaviour for subclasses that do not implement any of their own: When paused, drones will finish their current task and then wait until unpaused before beginning the next task.
- resume()
Resume executing tasks.
- abstract property is_connected: bool
- abstract property is_armed: bool
- abstract property flightmode: FlightMode
- abstract property in_air: bool
- property autopilot: str
- abstract property fix_type: FixType
- abstract property position_global: ndarray
Array with the GPS coordinates [latitude, longitude, AMSL]
- Type:
return
- abstract property position_ned: ndarray
- abstract property velocity: ndarray
- abstract property speed: float
- abstract property attitude: ndarray
RPY in degrees
- property parameters_loaded: bool
- abstractmethod async connect(drone_addr, *args, **kwargs)
- abstractmethod async load_parameters()
- abstractmethod async disconnect(force=False) bool
- abstractmethod async arm() bool
- abstractmethod async disarm() bool
- abstractmethod async takeoff(altitude=2.0) bool
Takes off to the specified altitude above current position.
Note that altitude is positive.
- Parameters:
altitude – Takeoff altitude above.
- Returns:
- abstractmethod async change_flight_mode(flightmode) bool
- is_at_waypoint(waypoint: Waypoint, pos_tolerance=0.25, vel_tolerance=0.1, yaw_tolerance=1) bool
Definition of “is at” depends on the waypoint type. At most checks position, yaw and velocity.
- Parameters:
waypoint
pos_tolerance – In meters
vel_tolerance – In m/s
yaw_tolerance – In degrees
- Returns:
- is_at_pos(target_pos, tolerance=0.25) bool
- Parameters:
target_pos – Array with target position. If a yaw is also passed (i.e. array length 4), it is ignored.
tolerance – How close we have to be to the target position to be considered “at” it.
- Returns:
- is_at_heading(target_heading, tolerance=1) bool
- is_at_gps(target_gps, tolerance=0.25) bool
- is_at_vel(target_vel, tolerance=0.1)
- abstractmethod async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
- abstractmethod async spin_at_rate(yaw_rate, duration, direction='cw') bool
- check_waypoint(waypoint: Waypoint)
Check if a waypoint is valid and within any geofence (if such a fence is set)
- async wait(delay: float)
Wait delay seconds.
This function is useful with scheduling to schedule short waits between moves.
- abstractmethod async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
Fly to the specified position.
- Parameters:
local
gps
yaw
waypoint
tolerance
- Returns:
- abstractmethod async move(offset: ndarray, yaw: float | None = None, use_gps=True, tolerance=0.25)
Move from the current position by the specified distances.
- Parameters:
offset – A numpy array with the information how much to move along each axis in meters.
yaw
use_gps
tolerance
- Returns:
- abstractmethod async orbit(radius, velocity, latitude, longitude, amsl) bool
- abstractmethod async land() bool
- abstractmethod async stop() bool
- abstractmethod async kill() bool
- clear_queue() None
Clears the action queue.
Does not cancel the current action.
- Returns:
- cancel_action() None
Cancels the current action task
- Returns:
- class dronemanager.drone.DroneMAVSDK(name, mavsdk_server_address: str | None = None, mavsdk_server_port: int = 50051, config: DroneConfig | None = None)
Bases:
Drone- VALID_FLIGHTMODES = {'altitude', 'hold', 'land', 'offboard', 'position', 'return', 'takeoff'}
- VALID_SETPOINT_TYPES = {WayPointType.POS_GLOBAL, WayPointType.POS_NED, WayPointType.POS_VEL_ACC_NED, WayPointType.POS_VEL_NED, WayPointType.VEL_NED}
- system: System | None
- mav_conn: MAVPassthrough
- property is_connected: bool
- property is_armed: bool
- property flightmode: FlightMode
- property in_air: bool
- property fix_type: FixType
- property position_global: ndarray
Array with the GPS coordinates [latitude, longitude, AMSL]
- Type:
return
- property altitude_above_takeoff: float
- property position_ned: ndarray
- property velocity: ndarray
- property speed: float
- property attitude: ndarray
RPY in degrees
- property heading: float
- async connect(drone_address, system_id=0, component_id=0, log_telemetry=None) bool
- async load_parameters()
- async disconnect(force=False)
- async arm()
- async disarm()
- async takeoff(altitude=2.0) bool
- Parameters:
altitude
- Returns:
- async change_flight_mode(flightmode: str, timeout: float = 5)
- async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
Yawing to the target heading as you do so at the specified rate, maintaining current position.
Uses the local coordinate system for to determine and maintain position. Pausable.
- Parameters:
target_yaw – Heading as a degree fom -180 to 180, right positive, 0 forward.
yaw_rate
local – Position setpoint during yaw.
tolerance – How close we have to get to the heading before this function returns.
- Returns:
- async spin_at_rate(yaw_rate, duration, direction='cw')
Spin in place at the given rate for the given duration.
Pausable.
- Parameters:
yaw_rate
duration
direction
- Returns:
- async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25, put_into_offboard=True, log=True)
Fly to a specified point in offboard mode. Uses path generators and followers to get there.
If multiple target are provided (for example GPS and local coordinates), we prefer coordinates in this fashion: Waypoint > GPS > local, i.e. in the example, the local coordinates would be ignored.
- Parameters:
local
gps
yaw
waypoint
tolerance
put_into_offboard
log
- Returns:
- async move(offset, yaw: float | None = None, use_gps=True, tolerance=0.25)
Move from the current position by the specified distances.
- Parameters:
offset – A numpy array with the information how much to move along each axis in meters.
yaw
use_gps
tolerance
- Returns:
- async go_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
- async orbit(radius, velocity, center_lat, center_long, amsl)
- async land()
- async manual_control_position()
- async manual_control_altitude()
- async set_manual_control_input(x, y, z, r)
- async stop_execution()
Stops all coroutines, closes all connections, etc.
- Returns:
- async stop()
- async kill()
Terminal interface
App
- class dronemanager.app.StatusScreen(*args, **kwargs)
Bases:
ScreenA screen showing detailed information for a single drone.
Initialize the screen.
- Parameters:
name – The name of the screen.
id – The ID of the screen in the DOM.
classes – The CSS classes for the screen.
- CSS = ' \nProgressBar {\n width: 25;\n height: 1;\n layout: horizontal;\n}\n\nBar {\n width: 20;\n height: 1;\n}\n'
CSS for the status screen.
- compose()
Creates the screen object
- on_radio_set_changed(event: Changed) None
- can_focus = False
Widget may receive focus.
- can_focus_children = True
Widget’s children may receive focus.
- class dronemanager.app.CommandScreen(*args, **kwargs)
Bases:
ScreenInitialize the screen.
- Parameters:
name – The name of the screen.
id – The ID of the screen in the DOM.
classes – The CSS classes for the screen.
- STATUS_REFRESH_RATE = 20
- CSS = '\n.text {\n text-style: bold;\n}\n\n#status {\n height: 10fr;\n}\n\n#usage {\n height: 1fr;\n}\n\n#sidebar {\n width: 97;\n}\n'
Inline CSS, useful for quick scripts. Rules here take priority over CSS_PATH.
Note
This CSS applies to the whole app.
- dm: DroneManager
- drone_widgets: dict[str, Widget]
- running_tasks: set[asyncio.Task]
- async cli(message)
- async action_stop(names)
- async exit()
Checks if any drones are armed and exits the app if not.
- compose()
Creates the screen object
- can_focus = False
Widget may receive focus.
- can_focus_children = True
Widget’s children may receive focus.
- class dronemanager.app.DroneApp(dm: DroneManager, logger=None)
Bases:
AppCreate an instance of an app.
- Parameters:
driver_class – Driver class or None to auto-detect. This will be used by some Textual tools.
css_path – Path to CSS or None to use the CSS_PATH class variable. To load multiple CSS files, pass a list of strings or paths which will be loaded in order.
watch_css – Reload CSS if the files changed. This is set automatically if you are using textual run with the dev switch.
ansi_color – Allow ANSI colors if True, or convert ANSI colors to to RGB if False.
- Raises:
CssPathError – When the supplied CSS path(s) are an unexpected type.
- BINDINGS = {Binding(key='s', action='cycle_control', description='Swap Status/Control', show=True, key_display=None, priority=False, tooltip='', id=None, system=False)}
Key Bindings
- TITLE = 'DroneManager'
Window Title
- MODES = {'control': <class 'dronemanager.app.CommandScreen'>, 'status': <class 'dronemanager.app.StatusScreen'>}
A dictionary of the available app “modes”.
An app mode is essentially a different view, we can cycle between them. See the textual documentation for more information.
- command_screen: CommandScreen | None
- status_screen: StatusScreen | None
- on_mount()
- action_cycle_control()
- dronemanager.app.main()
Custom widgets
- exception dronemanager.widgets.ArgumentParserError
Bases:
Exception
- exception dronemanager.widgets.PrintHelpInsteadOfParsingError
Bases:
Exception
- class dronemanager.widgets.ArgParser(*args, logger=None, **kwargs)
Bases:
ArgumentParser- error(message: string)
Prints a usage message incorporating the message to stderr and exits.
If you override this in a subclass, it should not return – it should either exit or raise an exception.
- print_help(file=None)
- exit(status=0, message=None)
- class dronemanager.widgets.InputWithHistory(*args, **kwargs)
Bases:
InputInitialise the Input widget.
- Parameters:
value – An optional default value for the input.
placeholder – Optional placeholder text for the input.
highlighter – An optional highlighter for the input.
password – Flag to say if the field should obfuscate its content.
restrict – A regex to restrict character inputs.
type – The type of the input.
max_length – The maximum length of the input, or 0 for no maximum length.
suggester – [Suggester][textual.suggester.Suggester] associated with this input instance.
validators – An iterable of validators that the Input value will be checked against.
validate_on – Zero or more of the values “blur”, “changed”, and “submitted”, which determine when to do input validation. The default is to do validation for all messages.
valid_empty – Empty values are valid.
select_on_focus – Whether to select all text on focus.
name – Optional name for the input widget.
id – Optional ID for the widget.
classes – Optional initial classes for the widget.
disabled – Whether the input is disabled or not.
tooltip – Optional tooltip.
- BINDINGS = [Binding(key='left', action='cursor_left', description='Move cursor left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='shift+left', action='cursor_left(True)', description='Move cursor left and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+left', action='cursor_left_word', description='Move cursor left a word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+shift+left', action='cursor_left_word(True)', description='Move cursor left a word and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='right', action='cursor_right', description='Move cursor right or accept the completion suggestion', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='shift+right', action='cursor_right(True)', description='Move cursor right and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+right', action='cursor_right_word', description='Move cursor right a word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+shift+right', action='cursor_right_word(True)', description='Move cursor right a word and select', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='backspace', action='delete_left', description='Delete character left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='home,ctrl+a', action='home', description='Go to start', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='end,ctrl+e', action='end', description='Go to end', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='shift+home', action='home(True)', description='Select line start', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='shift+end', action='end(True)', description='Select line end', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='delete,ctrl+d', action='delete_right', description='Delete character right', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='enter', action='submit', description='Submit', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+w', action='delete_left_word', description='Delete left to start of word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+u', action='delete_left_all', description='Delete all to the left', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+f', action='delete_right_word', description='Delete right to start of word', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+k', action='delete_right_all', description='Delete all to the right', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+x', action='cut', description='Cut selected text', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+c', action='copy', description='Copy selected text', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='ctrl+v', action='paste', description='Paste text from the clipboard', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='up', action='history_prev', description='Previous item from history', show=False, key_display=None, priority=False, tooltip='', id=None, system=False), Binding(key='down', action='history_rec', description='Next item in history', show=False, key_display=None, priority=False, tooltip='', id=None, system=False)]
- Key(s) | Description |:- | :- |left | Move the cursor left. |shift+left | Move cursor left and select. |ctrl+left | Move the cursor one word to the left. |right | Move the cursor right or accept the completion suggestion. |ctrl+shift+left | Move cursor left a word and select. |shift+right | Move cursor right and select. |ctrl+right | Move the cursor one word to the right. |backspace | Delete the character to the left of the cursor. |ctrl+shift+right | Move cursor right a word and select. |home,ctrl+a | Go to the beginning of the input. |end,ctrl+e | Go to the end of the input. |shift+home | Select up to the input start. |shift+end | Select up to the input end. |delete,ctrl+d | Delete the character to the right of the cursor. |enter | Submit the current value of the input. |ctrl+w | Delete the word to the left of the cursor. |ctrl+u | Delete everything to the left of the cursor. |ctrl+f | Delete the word to the right of the cursor. |ctrl+k | Delete everything to the right of the cursor. |ctrl+x | Cut selected text. |ctrl+c | Copy selected text. |ctrl+v | Paste text from the clipboard. |
- action_history_prev() None
- action_history_rec() None
- add_to_history(item) None
- async action_submit() None
Handle a submit action.
Normally triggered by the user pressing Enter. This may also run any validators.
- can_focus = True
Widget may receive focus.
- can_focus_children = True
Widget’s children may receive focus.
- class dronemanager.widgets.DroneOverview(drone, update_frequency, logger, *args, **kwargs)
Bases:
StaticInitialize a Widget.
- Parameters:
*children – Child widgets.
name – The name of the widget.
id – The ID of the widget in the DOM.
classes – The CSS classes for the widget.
disabled – Whether the widget is disabled or not.
- COLUMN_NAMES = ['Name', 'Status', 'Modes', 'GPS', 'Local', 'Vel', 'Yaw/Bat']
- COLUMN_WIDTHS = [10, 11, 11, 16, 9, 9, 8]
- COLUMN_ALIGN = ['<', '>', '>', '>', '>', '>', '>']
- COLUMN_SPACING = 3
- classmethod header_string()
- classmethod gadget_width()
- on_mount() None
- async update_display()
- can_focus = False
Widget may receive focus.
- can_focus_children = True
Widget’s children may receive focus.
- class dronemanager.widgets.TextualLogHandler(log_textual, *args, **kwargs)
Bases:
HandlerInitializes the instance - basically setting the formatter to None and the filter list to empty.
- log_textual: Log
- emit(record)
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
MAVLink connection
- class dronemanager.mavpassthrough.MAVPassthrough(dialect=None, loggername='passthrough', log_messages=True)
Bases:
object- connect_gcs(address)
- connect_drone(loc, appendix, scheme='udp')
- connected_to_drone()
- connected_to_gcs()
- send_as_gcs(msg)
- listen_ack(command_id, target_component) Future
- listen_message(message_id, target_component) Future
- send_cmd_long(target_component, cmd, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, param6=math.nan, param7=math.nan) Future
- send_request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1)
- async request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1, timeout=5)
- send_param_ext_request_list(target_component)
- send_param_ext_set(target_component, param_id, param_value: int | float, param_type: int)
- send_param_ext_request_read(target_component, param_id: str, param_index: int = None)
- add_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
- remove_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
- async stop()
- async dronemanager.mavpassthrough.main()