mirror of
https://git.disroot.org/pranav/pybatmesh.git
synced 2024-11-10 07:21:59 +05:30
Pranav Jerry
61a96ea3b3
Made messages printed in Makefile more understandable. Removed full path of naxalnet from the systemd service. Now you can start naxalnet even if it is installed in /usr/local/bin, if systemd allows (I have not tested it). Many comments were made to respect the 80 chars per line rule. And, of course, added some political commentary to insult the global superpower (superpower in terms of money, military and something else I forgot). And removed MANIFEST.in, which probably haven't changed anything.
326 lines
10 KiB
Python
326 lines
10 KiB
Python
# This file is part of naxalnet.
|
|
# Copyright (C) 2021 The naxalnet Authors
|
|
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU General Public License as published by the
|
|
# Free Software Foundation, either version 3 of the License, or (at your
|
|
# option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful, but
|
|
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
|
|
# Public License for more details.
|
|
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
iwd.py
|
|
------
|
|
|
|
This file contains methods to communicate with iwd via its D-Bus API and
|
|
control WiFi adapters.
|
|
|
|
Some terms used here, such as device and adapter might confuse you if you
|
|
haven't used iwctl before. Just as a quick reference, here is a list of
|
|
terms and what they mean:
|
|
|
|
- ad-hoc: a mode supported by some WiFi adapters to start a decentralised
|
|
network, where there is no central point of failure.
|
|
|
|
- ap: a mode used to start a central access point so that other machines
|
|
without naxalnet can connect to the mesh. AP is also known as WiFi
|
|
hotspot.
|
|
|
|
- station: this is the mode most WiFi adapters use by default. This mode
|
|
is used to connect to an ap. naxalnet DOES NOT use this mode.
|
|
|
|
- adapter: a physical WiFi chip or something similar that is present
|
|
inside most laptops and phones or can be connected via USB to a
|
|
machine.
|
|
|
|
- device: an interface provided by the kernel to control an adapter. Some
|
|
adapters can have multiple devices so that you can start an ap
|
|
on one device and an ad-hoc on the other. By default, iwd starts
|
|
only one device each for one adapter.
|
|
|
|
- machine: Since iwd uses the term device for a WiFi interface, we use the
|
|
word machine to refer to a computer, or a laptop, or a phone.
|
|
|
|
- node: a machine that runs naxalnet and is therefore connected to the
|
|
mesh.
|
|
"""
|
|
|
|
from dasbus.connection import SystemMessageBus
|
|
from naxalnet.log import logger
|
|
|
|
IWD_BUS = "net.connman.iwd"
|
|
IWD_ROOT_PATH = "/"
|
|
IWD_DEVICE_INTERFACE = "net.connman.iwd.Device"
|
|
IWD_ADAPTER_INTERFACE = "net.connman.iwd.Adapter"
|
|
|
|
# If you are new to D-Bus, you might want to use a program
|
|
# such as D-Feet (https://wiki.gnome.org/Apps/DFeet) for reference.
|
|
# And try out iwctl to understand iwd's bus objects.
|
|
|
|
|
|
class IWD:
|
|
"""Manage iwd via dbus"""
|
|
|
|
def __init__(self, bus=SystemMessageBus()):
|
|
# self._bus and self._proxy are meant for use only in this submodule
|
|
self._bus = bus
|
|
self.reload()
|
|
|
|
def reload(self):
|
|
"""reload the proxy"""
|
|
self._proxy = self._bus.get_proxy(IWD_BUS, IWD_ROOT_PATH)
|
|
|
|
def get_name_from_path(self, path: str) -> str:
|
|
"""
|
|
returns device or adapter name when d-bus path is given as arg
|
|
"""
|
|
proxy = self._bus.get_proxy(IWD_BUS, path)
|
|
return proxy.Name
|
|
|
|
def get_device_path_from_name(self, name: str) -> str:
|
|
"""returns path of device as str"""
|
|
device_paths = self.get_all_device_paths()
|
|
for i in device_paths:
|
|
proxy = self._bus.get_proxy(IWD_BUS, i)
|
|
if proxy.Name == name:
|
|
# See comment in the function below
|
|
path = i
|
|
return path
|
|
|
|
def get_adapter_path_from_name(self, name: str) -> str:
|
|
"""returns path of adapter as str"""
|
|
adapter_paths = self.get_all_adapter_paths()
|
|
for i in adapter_paths:
|
|
proxy = self._bus.get_proxy(IWD_BUS, i)
|
|
if proxy.Name == name:
|
|
# We could have just used return here, but shutting up
|
|
# pylint has a greter priority at the moment
|
|
path = i
|
|
return path
|
|
|
|
def get_all_device_paths(self) -> list:
|
|
"""returns list of paths of all devices"""
|
|
objects = self._proxy.GetManagedObjects()
|
|
paths = []
|
|
for key, value in objects.items():
|
|
# if value is a device, add its path to paths
|
|
if IWD_DEVICE_INTERFACE in value:
|
|
paths.append(key)
|
|
return paths
|
|
|
|
def get_all_adapter_paths(self) -> list:
|
|
"""returns list of paths of all adapters"""
|
|
objects = self._proxy.GetManagedObjects()
|
|
paths = []
|
|
|
|
for key, value in objects.items():
|
|
# if value is an adapter, add its path to paths
|
|
if IWD_ADAPTER_INTERFACE in value:
|
|
paths.append(key)
|
|
return paths
|
|
|
|
def get_devices(self) -> list:
|
|
"""
|
|
returns list of all device names as str
|
|
|
|
example: ["wlan0", "wlan1"]
|
|
"""
|
|
devices = []
|
|
device_paths = self.get_all_device_paths()
|
|
|
|
for i in device_paths:
|
|
name = self.get_name_from_path(i)
|
|
devices.append(name)
|
|
return devices
|
|
|
|
def get_adapters(self) -> list:
|
|
"""
|
|
returns list of adapters
|
|
|
|
example: ["phy0","phy1"]
|
|
"""
|
|
adapters = []
|
|
adapter_paths = self.get_all_adapter_paths()
|
|
|
|
for i in adapter_paths:
|
|
name = self.get_name_from_path(i)
|
|
adapters.append(name)
|
|
return adapters
|
|
|
|
|
|
class Device:
|
|
"""
|
|
control devices with iwd
|
|
|
|
name: name of device (str)
|
|
adapter: name of adapter (str)
|
|
"""
|
|
|
|
def __init__(self, name: str):
|
|
self._iwd = IWD()
|
|
self._bus = self._iwd._bus
|
|
self._path = self._iwd.get_device_path_from_name(name)
|
|
self.reload()
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def is_powered_on(self) -> bool:
|
|
"""returns True if devie is powered on"""
|
|
return self._proxy.Powered
|
|
|
|
def power_on(self):
|
|
"""Turn on the device and reload the proxy"""
|
|
self._proxy.Powered = True
|
|
logger.debug("Powered on %s", self.name)
|
|
self.reload()
|
|
|
|
def power_off(self):
|
|
"""Turn off the device and reload the proxy"""
|
|
self._proxy.Powered = False
|
|
logger.debug("Powered off %s", self.name)
|
|
self.reload()
|
|
|
|
def reload(self):
|
|
"""
|
|
Reload the proxy. Used liberally by other
|
|
members to work around errors
|
|
"""
|
|
self._proxy = self._bus.get_proxy(IWD_BUS, self._path)
|
|
self.name = self._proxy.Name
|
|
adapter_path = self._proxy.Adapter
|
|
# name of adapter ('phy0' for example)
|
|
self.adapter = self._iwd.get_name_from_path(adapter_path)
|
|
|
|
def is_adhoc_started(self) -> bool:
|
|
"""
|
|
Returns True if an adhoc network is started on this device.
|
|
Returns False if the network is in staring stage, device
|
|
is not powered on or not in ad-hoc mode.
|
|
"""
|
|
if self.is_powered_on() and self.get_mode() == "ad-hoc":
|
|
return self._proxy.Started
|
|
# If above condition is not true, return False
|
|
return False
|
|
|
|
def is_ap_started(self) -> bool:
|
|
"""
|
|
Same as is_adhoc_started(), but for ap
|
|
"""
|
|
if self.is_powered_on() and self.get_mode() == "ap":
|
|
return self._proxy.Started
|
|
return False
|
|
|
|
def get_mode(self) -> str:
|
|
"""
|
|
returns the mode in which the device is in
|
|
example: "ap"
|
|
"""
|
|
return self._proxy.Mode
|
|
|
|
def set_mode(self, mode: str):
|
|
"""change the device mode to mode"""
|
|
self._proxy.Mode = mode
|
|
logger.debug("Set mode on %s to %s", self.name, mode)
|
|
self.reload()
|
|
|
|
def start_adhoc_open(self, name: str):
|
|
"""
|
|
Create ad-hoc network with name, changing mode to ad-hoc
|
|
if it isn't already on ad-hoc and power onn the device
|
|
if it is off
|
|
"""
|
|
# Stop adhoc if already started
|
|
self.stop_adhoc()
|
|
|
|
if self.get_mode() != "ad-hoc":
|
|
self.set_mode("ad-hoc")
|
|
|
|
if not self.is_powered_on():
|
|
self.power_on()
|
|
|
|
logger.debug("Starting ad-hoc on %s", self.name)
|
|
self._proxy.StartOpen(name)
|
|
|
|
def stop_adhoc(self):
|
|
"""stop adhoc if adhoc is started"""
|
|
if self.is_adhoc_started():
|
|
logger.debug("Stopping ad-hoc on %s", self.name)
|
|
self._proxy.Stop()
|
|
self.reload()
|
|
|
|
def start_ap(self, ssid, passwd):
|
|
"""
|
|
Create ap network, changing mode to ap
|
|
if it isn't already on ap and turning
|
|
on the device if it is off
|
|
"""
|
|
|
|
# Stop ap if already started
|
|
self.stop_ap()
|
|
|
|
if self.get_mode() != "ap":
|
|
self.set_mode("ap")
|
|
|
|
if not self.is_powered_on():
|
|
self.power_on()
|
|
|
|
logger.debug("Starting ap on %s with ssid %s", self.name, ssid)
|
|
self._proxy.Start(ssid, passwd)
|
|
|
|
def stop_ap(self):
|
|
"""stop ap if an ap is started"""
|
|
if self.is_ap_started():
|
|
logger.debug("Stopping ap on %s", self.name)
|
|
self._proxy.Stop()
|
|
self.reload()
|
|
|
|
|
|
class Adapter:
|
|
"""represents an adapter as a python object"""
|
|
|
|
def __init__(self, name: str):
|
|
self._iwd = IWD()
|
|
self._bus = self._iwd._bus
|
|
self._path = self._iwd.get_adapter_path_from_name(name)
|
|
# Initialise self._proxy
|
|
self.reload()
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def reload(self):
|
|
"""reload the proxy after changing mode"""
|
|
self._proxy = self._bus.get_proxy(IWD_BUS, self._path)
|
|
self.name = self._proxy.Name
|
|
self.supported_modes = self._proxy.SupportedModes
|
|
|
|
def is_powered_on(self) -> bool:
|
|
"""returns True if adapter is powered on, False otherwise"""
|
|
return self._proxy.Powered
|
|
|
|
def power_on(self):
|
|
"""power on the adapter"""
|
|
self._proxy.Powered = True
|
|
logger.debug("Powered on adapter %s", self.name)
|
|
self.reload()
|
|
|
|
def power_off(self):
|
|
"""power off the adapter"""
|
|
self._proxy.Powered = False
|
|
logger.debug("Powered off adapter %s", self.name)
|
|
self.reload()
|
|
|
|
def supports_mode(self, mode: str) -> bool:
|
|
"""
|
|
Returns True if the adapter supports the mode.
|
|
mode can be "ad-hoc", "ap" or "station"
|
|
"""
|
|
return mode in self.supported_modes
|