home-backup/home_backup/user_service/rpc.py

68 lines
2.0 KiB
Python

import asyncio
import os
from . import backups, config, remotes
from .. import defaults, utils
_own_uid = str(os.getuid())
class BackupManager():
__con:utils.Connection
def __init__(self, con:utils.Connection):
self.__con = con
def gen_create_backup_func(master:BackupManager):
async def callback_func(backups:list):
raise NotImplementedError()
return callback_func
def gen_callback_func(master:BackupManager):
@utils.rpc_callback
async def callback_func(data, uid):
# Check uid
if _own_uid != uid:
raise PermissionError("%s user try to connect." % uid)
# Get operation
if not isinstance(data, dict):
raise ValueError("data have to be a object.")
if "operation" not in data:
raise ValueError("'operation' isn't set.")
operation = data["operation"]
del data["operation"]
# Run operation
if operation == "backup-add":
return await backups.add_backup(data)
elif operation == "remote-add":
return await remotes.add_remote(data)
elif operation == "remote-list":
return await remotes.remote_list(data)
elif operation == "remote-delete":
return await remotes.remote_delete(data)
else:
raise NotImplementedError("%s isn't a supported operation." % repr(operation))
return callback_func
async def run_deamon(user_path:str=None, fork:bool=False, sys_path:str=defaults.DEFAULT_PATH):
# Find path
if user_path is None:
user_path = defaults.USER_PATH
if user_path is None:
raise RuntimeError("Can't find environemt variable 'XDG_RUNTIME_DIR'.")
# Connect to sys service
con = utils.Connection()
await con.init(sys_path)
# Timer
backup_manager = BackupManager(con)
timer_task = asyncio.create_task(config.Timer().run(gen_create_backup_func(backup_manager)))
# Start serving
await utils.run_access_socket(user_path, gen_callback_func(backup_manager), fork=fork)