home-backup/home_backup/user_service/rpc.py

58 lines
1.6 KiB
Python

import asyncio
import os
from . import config
from .. import defaults, utils
# UID of the user running this process, kept as a string because the RPC
# callback compares it directly against the uid it is handed.
# NOTE(review): this assumes the caller's uid arrives as a str - confirm in utils.
_own_uid = str(os.getuid())
class BackupManager:
    """Keeps hold of the connection that backup operations will use.

    The class only stores the connection for now; no operations are
    implemented on it yet. In ``run_deamon`` the connection passed in is
    the one opened towards the system service.
    """

    # Private (name-mangled) connection set by the constructor.
    __con: utils.Connection

    def __init__(self, con: utils.Connection) -> None:
        """Remember *con* for later use by this manager."""
        self.__con = con
def gen_create_backup_func(master: BackupManager):
    """Build the coroutine function that is handed to the timer.

    Args:
        master: Backup manager the generated function belongs to. The
            current placeholder implementation does not use it.

    Returns:
        An async function accepting a single ``backups`` list.
    """

    async def create_backup(backups: list):
        """Placeholder: creating backups is not implemented yet."""
        raise NotImplementedError

    return create_backup
def gen_callback_func(master: BackupManager):
    """Build the RPC callback that is served on the access socket.

    Args:
        master: Backup manager the callback is generated for. The current
            placeholder implementation does not use it.

    Returns:
        The inner coroutine function after it has been wrapped by
        ``utils.rpc_callback``.
    """

    @utils.rpc_callback
    async def callback_func(data, uid):
        """Validate an incoming request; running the operation is a stub.

        Raises:
            PermissionError: *uid* differs from the uid of this process.
            ValueError: *data* is not a dict or lacks an "operation" key.
            NotImplementedError: Always, once validation has passed.
        """
        # Reject every caller that is not the user running this process.
        if _own_uid != uid:
            denial = "%s user try to connect." % uid
            raise PermissionError(denial)

        # The payload must be a dict that names the requested operation.
        payload_is_dict = isinstance(data, dict)
        if not payload_is_dict:
            raise ValueError("data have to be a object.")
        if "operation" not in data:
            raise ValueError("'operation' isn't set.")
        operation = data["operation"]

        # Dispatching on the operation is still to be written.
        raise NotImplementedError

    return callback_func
async def run_deamon(user_path: str = None, fork: bool = False, sys_path: str = defaults.DEFAULT_PATH):
    """Connect to the system service, start the timer and serve the socket.

    Args:
        user_path: Path handed to ``utils.run_access_socket``. When it is
            ``None``, ``defaults.USER_PATH`` is used instead.
        fork: Forwarded unchanged as the ``fork`` keyword of
            ``utils.run_access_socket``.
        sys_path: Path handed to ``utils.Connection.init``.

    Raises:
        RuntimeError: Neither *user_path* nor ``defaults.USER_PATH`` is set.
    """
    # Resolve the socket path, falling back to the package default.
    socket_path = defaults.USER_PATH if user_path is None else user_path
    if socket_path is None:
        raise RuntimeError("Can't find environemt variable 'XDG_RUNTIME_DIR'.")

    # Open the connection to the system service.
    service_con = utils.Connection()
    await service_con.init(sys_path)

    # Start the timer as a background task. The local reference keeps the
    # task alive while the socket is being served.
    # NOTE(review): the task is never awaited or cancelled here - confirm
    # that this lifetime is intended.
    manager = BackupManager(service_con)
    timer = config.Timer()
    timer_coro = timer.run(gen_create_backup_func(manager))
    timer_task = asyncio.create_task(timer_coro)

    # Serve RPC requests on the user's socket.
    rpc_handler = gen_callback_func(manager)
    await utils.run_access_socket(socket_path, rpc_handler, fork=fork)