import asyncio
import json
from typing import List, Optional


class Mountpoint():
    """A single mounted filesystem as reported by ``findmnt``.

    Attributes:
        fstype: Filesystem type of the mount (e.g. ``"btrfs"``).
        target: Path where the filesystem is mounted.
        subvolume: Value of the ``subvol=`` mount option, or ``None`` when
            no subvolume was determined for this mount.
    """

    fstype: str
    target: str
    subvolume: Optional[str]

    def __init__(self, fstype: str, target: str, subvolume: Optional[str]):
        """Validate and store the mount description.

        Args:
            fstype: Filesystem type; must be a ``str``.
            target: Mount target path; must be a ``str``.
            subvolume: Subvolume name; must be a ``str`` or ``None``.

        Raises:
            ValueError: If any argument has the wrong type.
        """
        # Check args
        if not isinstance(fstype, str):
            raise ValueError("fstype isn't a strng.")
        if not isinstance(target, str):
            raise ValueError("target isn't a strng.")
        if subvolume is not None and not isinstance(subvolume, str):
            raise ValueError("subvolume isn't a strng.")

        # Set values
        self.fstype = fstype
        self.target = target
        self.subvolume = subvolume


async def list_mounts() -> List[Mountpoint]:
    """Return all current mounts by parsing the output of ``findmnt -lJ``.

    For btrfs mounts the subvolume is taken from the ``subvol=`` entry of
    the mount options; every other filesystem type gets ``subvolume=None``.

    Returns:
        One ``Mountpoint`` per entry reported by ``findmnt``, in the order
        ``findmnt`` printed them.

    Raises:
        RuntimeError: If ``findmnt`` exits with a non-zero status.
        ValueError: If a btrfs mount does not carry exactly one ``subvol=``
            option.
    """
    # List mounts
    proc = await asyncio.create_subprocess_exec(
        "findmnt", "-lJ", stdout=asyncio.subprocess.PIPE
    )
    # communicate() always yields a (stdout, stderr) pair; stderr is not
    # piped here, so only stdout carries data.
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError("Can't find mounts.")
    data = json.loads(stdout)

    # Parse mounts
    search_term = "subvol="
    result = []
    # findmnt's JSON output stores its rows under the "filesystems" key.
    for mount in data["filesystems"]:
        subvolume = None
        if mount["fstype"] == "btrfs":
            matches = [
                option
                for option in mount["options"].split(",")
                if option.startswith(search_term)
            ]
            if len(matches) != 1:
                raise ValueError(
                    "Can't find subvolume of mount %s." % repr(mount["target"])
                )
            subvolume = matches[0][len(search_term):]
        # Keyword arguments keep fstype/target from being swapped.
        result.append(
            Mountpoint(
                fstype=mount["fstype"],
                target=mount["target"],
                subvolume=subvolume,
            )
        )
    return result