#!/usr/bin/env python3
"""Compute and compare content hashes of flatpak build manifests.

A manifest (YAML) is loaded together with every module file it references.
Its hash is derived from the manifest content combined with the commit
hashes of its SDK and runtime, as reported by the user's flatpak remotes.
That hash is compared with the ``WB_HASH='...'`` markers found in the
``flatpak remote-info`` output of already published refs to decide whether
a rebuild is required.
"""
import hashlib
import itertools
import json
import os
import re
import subprocess
import sys

import yaml


# Byte values accepted in a commit hash (compared after lower-casing).
_COMMIT_BYTES = frozenset(b"abcdefghijklmnopqrstuvwxyz0123456789")


def _load_stdout(args):
    """Run *args* as a subprocess and return its captured stdout as bytes.

    Raises:
        RuntimeError: if the process exits with a non-zero return code.
    """
    # run() drains stdout and stderr while the child is running. Waiting for
    # the exit first and reading afterwards can deadlock as soon as the
    # child fills one of the pipe buffers.
    proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError("Process crashed with " + repr(proc.returncode)
                           + ". Args: " + repr(args))
    return proc.stdout


def list_remotes(verbose=False):
    """Return the names of the flatpak remotes configured for the user.

    With *verbose* set, the list is also printed to stderr.
    """
    output = _load_stdout(["flatpak", "remote-list", "--user", "--columns=name"])
    remotes = output.decode().splitlines()
    if verbose:
        print("Remotes: " + ", ".join(remotes), file=sys.stderr)
    return remotes


def commit_hashes(name, verbose=False):
    """Return the commit hash of ref *name* from every remote that has it.

    The first output line of ``flatpak remote-info --user -c`` is taken as
    the commit, lower-cased. Remotes for which the command fails, or prints
    nothing, are skipped.

    Raises:
        ValueError: if a reported commit contains a byte outside
            ``a-z`` / ``0-9``.
    """
    result = []
    for remote in list_remotes(verbose=verbose):
        try:
            out = _load_stdout(
                ["flatpak", "remote-info", "--user", "-c", remote, name]
            ).lower()
        except RuntimeError:
            # Best effort: a failing query just means no hash from this remote.
            continue
        lines = out.splitlines()
        if not lines:
            # Successful call without output: nothing to record.
            continue
        commit = lines[0]
        for byte in commit:
            if byte not in _COMMIT_BYTES:
                raise ValueError("Wrong char: " + repr(bytes([byte])))
        result.append(commit.decode())
    if verbose:
        print("Commit hashes for " + name + ": " + ", ".join(result),
              file=sys.stderr)
    return result


# Extracts the value of a WB_HASH='...' marker. The named group "hash" is
# what wb_hashes() reads from the match.
__hash_find = re.compile(r".*WB_HASH\=\'(?P<hash>[a-zA-Z0-9]*)\'.*")


def wb_hashes(name, verbose=False):
    """Return the ``WB_HASH`` values published for ref *name*.

    Every user remote is queried with ``flatpak remote-info``. Remotes where
    the command fails, or whose output carries no well-formed
    ``WB_HASH='...'`` marker, contribute nothing. Values are lower-cased.
    """
    result = []
    for remote in list_remotes(verbose=verbose):
        try:
            out = _load_stdout(["flatpak", "remote-info", "--user", remote, name])
        except RuntimeError:
            # Best effort: a failing query just means no hash from this remote.
            continue
        start = out.find(b"WB_HASH=")
        if start < 0:
            continue
        # Only the part from the marker onwards is decoded and matched.
        match = __hash_find.match(out[start:].decode())
        if match is not None:
            result.append(match.group("hash").lower())
    if verbose:
        print("Commit wb hashes for " + name + ": " + ", ".join(result),
              file=sys.stderr)
    return result


class ModuleLoader():
    """A YAML manifest with all referenced module files inlined.

    Attributes:
        path: absolute path of the manifest file.
        content: parsed YAML document; entries of ``modules`` lists that
            were file names are replaced by the content of those files.
    """

    path: str
    content: dict

    # Cache of all loaders created so far, keyed by absolute file path.
    __moduleFiles = {}

    def __module(self, modules):
        """Replace file references in *modules* (in place) by their content.

        String entries are treated as paths relative to this manifest and
        are loaded through the shared cache. Other entries are searched
        recursively for a nested ``modules`` list.
        """
        for index, entry in enumerate(modules):
            if isinstance(entry, str):
                module_path = self.relative(entry)
                loader = ModuleLoader.__moduleFiles.get(module_path)
                if loader is None:
                    loader = ModuleLoader(module_path)
                modules[index] = loader.content
            elif "modules" in entry:
                self.__module(entry["modules"])

    def __init__(self, source):
        """Load the manifest at path *source* and inline its modules.

        Raises:
            ValueError: if *source* is not a string.
        """
        if not isinstance(source, str):
            raise ValueError("source have to be string. It was a "
                             + repr(source) + ".")
        self.path = os.path.abspath(source)
        # Registered before loading, so the cache already knows this file
        # while its own module references are being resolved.
        ModuleLoader.__moduleFiles[self.path] = self
        with open(self.path, "r") as handle:
            self.content = yaml.load(handle, Loader=yaml.SafeLoader)
        if "modules" in self.content:
            self.__module(self.content["modules"])

    def hash_content(self, function):
        """Serialize the content canonically and return ``function(bytes)``.

        The serialization (sorted keys, empty separators, ASCII-only) is the
        hash input compared against published hashes, so it must stay
        byte-for-byte stable.
        """
        serialized = json.dumps(self.content, ensure_ascii=True,
                                separators=("", ""), sort_keys=True)
        return function(serialized.encode("utf-8"))

    def get_depends(self, arch):
        """Return the SDK and runtime refs (in that order) for *arch*."""
        suffix = "/" + arch + "/" + self.content["runtime-version"]
        return [self.content["sdk"] + suffix, self.content["runtime"] + suffix]

    def hashes_from_base_platforms(self, arch, function, verbose=False):
        """Return one hash per combination of SDK and runtime commit.

        For every combination, the commit hashes and the serialized manifest
        are joined with NUL bytes and passed to *function*. The list is
        empty when either dependency has no commit in any remote.
        """
        # The identity function hands back the raw serialized manifest.
        source = self.hash_content(lambda data: data)
        base_hashes = [
            [commit.encode("utf-8")
             for commit in commit_hashes(ref, verbose=verbose)]
            for ref in self.get_depends(arch)
        ]
        result = [
            function(b"\x00".join(combination) + b"\x00" + source)
            for combination in itertools.product(*base_hashes)
        ]
        if verbose:
            print("Local hashes for: " + ", ".join(result), file=sys.stderr)
        return result

    def old_wb_hashes(self, arch, verbose=False):
        """Return the published ``WB_HASH`` values of this manifest's refs.

        Covers the ``id`` ref and, when present, the ``id-platform`` ref,
        both on the manifest's ``branch``.
        """
        suffix = "/" + arch + "/" + self.content["branch"]
        result = list(wb_hashes(self.content["id"] + suffix, verbose=verbose))
        if "id-platform" in self.content:
            result += wb_hashes(self.content["id-platform"] + suffix,
                                verbose=verbose)
        return result

    def version_hashes(self, arch, verbose=False):
        """Return the SHA3-256 hex digests of all dependency combinations."""
        def digest(data):
            return hashlib.sha3_256(data).hexdigest()

        return self.hashes_from_base_platforms(arch, digest, verbose=verbose)

    def required_update(self, arch, verbose=False):
        """Return True when no current hash matches a published one."""
        own_hashes = set(self.version_hashes(arch, verbose=verbose))
        old_hashes = set(self.old_wb_hashes(arch, verbose=verbose))
        return own_hashes.isdisjoint(old_hashes)

    def relative(self, file):
        """Resolve *file* against the directory of this manifest."""
        return os.path.abspath(os.path.join(os.path.dirname(self.path), file))


def get_need_update(source, arch, verbose=False):
    """Return True when the manifest at *source* has to be rebuilt for *arch*."""
    return ModuleLoader(source).required_update(arch, verbose=verbose)


def get_own_hash(source, arch, verbose=False):
    """Return the single version hash of the manifest at *source*.

    Raises:
        ValueError: if the dependencies yield zero or several hashes.
    """
    hashes = ModuleLoader(source).version_hashes(arch, verbose=verbose)
    if len(hashes) != 1:
        raise ValueError("No unique version number possible.")
    return hashes[0]


def _main():
    """Command line entry point: print the hash or report rebuild need."""
    import argparse

    parser = argparse.ArgumentParser(description="Generate hash of a module.")
    parser.add_argument("file", metavar="file", type=str,
                        help="File configuration to generate hash.")
    parser.add_argument("arch", metavar="arch", type=str,
                        help="The arch to build.")
    parser.add_argument("--require-build", dest="rebuild", action="store_true",
                        help="Returns 0 when outdated.")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="Verbose information to stderr.")
    args = parser.parse_args()

    if args.rebuild:
        # Exit status 0 means "outdated, rebuild"; 1 means "up to date".
        outdated = get_need_update(args.file, args.arch, verbose=args.verbose)
        sys.exit(0 if outdated else 1)
    print(get_own_hash(args.file, args.arch, verbose=args.verbose))


if __name__ == '__main__':
    _main()