from . import paths import asyncio import hashlib import json import os class ConfigAndBackup(): __instance = None def _hash_instance(self): hash = hashlib.sha3_256() hash.update(self.__instance.get_instance().encode()) return hash.hexdigest() async def _list_installed_hashes(self): # List archives archives_proc = await asyncio.create_subprocess_exec(b"borg", b"list", b"--json", paths.BACKUP_DIR.encode(), stdout=asyncio.subprocess.PIPE) stdout, stderr = await archives_proc.communicate() if archives_proc.returncode != 0: raise RuntimeError("Borg crashed.") archives = [] for i in json.loads(stdout)["archives"]: archives.append(i["name"]) # Filter own instance_hash = self._hash_instance() return list(map(lambda x: x[len(instance_hash) + 1:], filter(lambda x: x.startswith(instance_hash), archives))) async def _run_backup(self, name: bytes, *args, path:bytes=None): # Check if exists (when true delete it) archive_name = b"%s::%s" % (paths.BACKUP_DIR.encode(), name) if name in await self._list_installed_hashes(): process = await asyncio.create_subprocess_exec(b"borg", b"delete", archive_name) await process.wait() if process.returncode != 0: raise RuntimeError("Backup can't be created.") # Create backup if path is None: path = self.__instance.get_instance_path().encode() process = await asyncio.create_subprocess_exec(b"borg", b"create", b"-C", b"zstd,5", archive_name, *args, cwd=path) await process.wait() if process.returncode != 0: raise RuntimeError("Backup can't be created.") def __init__(self, instance): # Set values self.__instance = instance async def init(self): # Check borg process = await asyncio.create_subprocess_exec(b"borg", b"--version", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) await process.wait() if process.returncode != 0: raise RuntimeError("Can't find borg backup.") # Create archive process = await asyncio.create_subprocess_exec(b"borg", b"info", paths.BACKUP_DIR.encode(), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) await process.wait() if process.returncode != 0: process = await asyncio.create_subprocess_exec(b"borg", b"init", b"-e", b"none", paths.BACKUP_DIR.encode(), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) await process.wait() if process.returncode != 0: raise RuntimeError("Borg repo is broken.") async def gen_install_backup(self, name:str): await self._run_backup(b"install_%s" + (name.encode(),), b".")