home-backup/home_backup/user_service/remotes.py

111 lines
2.7 KiB
Python

from .. import utils
class Remote():
rtype:str
name:str
target:str
def __init__(self, rtype:str, name:str, target:str=None):
# Check args
if rtype not in ("borgbackup",):
raise ValueError("rtype have to be borgbackup.")
utils.valid_name_check(name)
self.rtype = rtype
self.name = name
# Check borg config
if rtype == "borgbackup":
if not isinstance(target, str):
raise TypeError("target have to be a string.")
self.target = target
def dump_config(self):
if self.rtype == "borgbackup":
return {"type": "borgbackup", "target":self.target}
else:
raise NotImplementedError("Unknown backup type %s to dump." % self.rtype)
@staticmethod
def load_remote(name:str, config):
config = dict(config.items())
if config["type"] == "borgbackup":
# Borg backup
return Remote(rtype="borgbackup", name=name, target=config["target"])
else:
raise ValueError("Unknown backup type %s." % repr(config["type"]))
# RPC implementations
async def add_remote(data:dict):
# Load config
from . import config
# Load data
name = data["name"]
del data["name"]
rtype = data["type"]
del data["type"]
info = data["info"]
if not isinstance(info, dict):
raise TypeError("info isn't a object.")
info["type"] = rtype
del data["info"]
utils.check_empty_data_dict(data)
rem = Remote.load_remote(name, info)
# Set data
async with config.config_lock:
# Check
if name in config.remotes:
return {"status": "fail-already-exists"}
# Set remote
config.remotes[name] = rem
await config.save_config()
# Return success
return {"status": "success"}
async def remote_list(data:dict):
# Import config
from . import config
# Generate result
utils.check_empty_data_dict(data)
result = {}
async with config.config_lock:
for iID, i in config.remotes.items():
result[iID] = i.dump_config()
# Return result
return {"status": "success", "data": result}
async def remote_delete(data:dict):
# Import config
from . import config
# Check and delete
name = data["name"]
del data["name"]
utils.check_empty_data_dict(data)
async with config.config_lock:
# Check
if name not in config.remotes:
return {"status": "failed-not-existing"}
# TODO: Check if backup still used by backup
# Remove backup
del config.remotes[name]
await config.save_config()
# Return result
return {"status": "success"}