#!/usr/bin/env python3 import configparser import hashlib import io import itertools import json import os import re import subprocess import sys import yaml def _load_stdout(args): p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.wait() result = p.stdout.read() if p.returncode != 0: raise RuntimeError("Process crashed with " + repr(p.returncode) + ". Args: " + repr(args)) return result def list_remotes(verbose=False): result = _load_stdout(["flatpak", "remote-list", "--user", "--columns=name"]).decode().splitlines() if verbose: print("Remotes: " + ", ".join(result), file=sys.stderr) return result def commit_hashes(name, use_local=False, verbose=False): result = [] if use_local: result = [_load_stdout(["flatpak", "info", "--user", "-c", name]).lower()] else: for i in list_remotes(verbose=verbose): out = None try: out = _load_stdout(["flatpak", "remote-info", "--user", "-c", i, name]).lower() except RuntimeError: pass if out is not None: out = out.splitlines()[0] for i in filter(lambda x: not(b"a"[0] <= x <= b"z"[0] or b"0"[0] <= x <= b"9"[0]), out): raise ValueError("Wrong char: " + repr(bytes([i]))) result.append(out) if verbose: if use_local: print("Commit local hashes for " + name + ": " + ", ".join(result), file=sys.stderr) else: print("Commit remote hashes for " + name + ": " + ", ".join(result), file=sys.stderr) return result def commit_metas(name, use_local=False, verbose=False): result = [] if use_local: result = [_load_stdout(["flatpak", "info", "--user", "-m", name])] else: for i in list_remotes(verbose=verbose): out = None try: out = _load_stdout(["flatpak", "remote-info", "--user", "-m", i, name]) except RuntimeError: pass if out is not None: out = out.splitlines()[0] for i in filter(lambda x: not(b"a"[0] <= x <= b"z"[0] or b"0"[0] <= x <= b"9"[0]), out): raise ValueError("Wrong char: " + repr(bytes([i]))) result.append(out.decode()) return result __hash_find = re.compile(".*WB_HASH\\=\\'(?P[a-zA-Z0-9]*)\\'.*") def wb_hashes(name, verbose=False): result = [] for i in list_remotes(verbose=verbose): out = None try: out = _load_stdout(["flatpak", "remote-info", "--user", i, name]) except RuntimeError: pass if out is not None: if b"WB_HASH=" in out: out = out[out.find(b"WB_HASH="):] else: continue out = __hash_find.match(out.decode()) if out is not None: result.append(out.groupdict()["hash"].lower()) else: continue if verbose: print("Commit wb hashes for " + name + ": " + ", ".join(result), file=sys.stderr) return result class ModuleLoader(): path = str content = dict __moduleFiles = {} def __module(self, modules): for iID in range(len(modules)): i = modules[iID] if isinstance(i, str): i = self.relative(i) if i in ModuleLoader.__moduleFiles: tmp = ModuleLoader.__moduleFiles[i] else: tmp = ModuleLoader(i) modules[iID] = tmp.content else: if "modules" in i: self.__module(i["modules"]) def __init__(self, source): if not isinstance(source, str): raise ValueError("source have to be string. It was a " + repr(source) + ".") self.path = os.path.abspath(source) ModuleLoader.__moduleFiles[self.path] = self self.content = yaml.load(open(self.path, "r").read(), Loader=yaml.SafeLoader) if "modules" in self.content: self.__module(self.content["modules"]) def hash_content(self, function): tmp = json.dumps(self.content, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, indent=None, separators=("", ""), default=None, sort_keys=True) tmp = tmp.encode("utf-8") return function(tmp) def get_depends(self, arch, use_local=False, verbose=False): version_src = "/" + arch + "/" + self.content["runtime-version"] result = [[self.content["sdk"] + version_src], [self.content["runtime"] + version_src]] # Sdk extensions if "sdk-extensions" in self.content: # Parse meta config meta = [] for i in commit_metas(self.content["sdk"] + version_src, use_local=use_local, verbose=verbose): config = configparser.ConfigParser() config.readfp(io.BytesIO(i)) meta.append(config) # Parse extensions for i in self.content["sdk-extensions"]: iSplit = i.split(".") ext_result = set() for config in meta: for jSize in range(1, len(iSplit) + 1): j = ".".join(iSplit[:jSize]) # Try version try: tmp = config.get("Extension " + j, "version") ext_result.add(i + "/" + arch + "/" + tmp) except configparser.NoSectionError: pass # Try versions try: tmp = config.get("Extension " + j, "versions").split(";") ext_result = ext_result.union(set(map(lambda x: i + "/" + arch + "/" + x, tmp))) except configparser.NoSectionError: pass result.append(list(ext_result)) # Debug output if verbose: print("Flatpak depends: " + "\n".join(map(repr, result)), file=sys.stderr) return result def hashes_from_base_platforms(self, arch, function, use_local=False, verbose=False): # Get source def helper_func(content): return function(content) sourceHash = self.hash_content(helper_func) # Build hashes base_hashes = [] for i in self.get_depends(arch, use_local=use_local, verbose=verbose): tmp = set() for j in i: tmp = tmp.union(set(map(lambda x: x.lower(), commit_hashes(j, use_local=use_local, verbose=verbose)))) base_hashes.append(sorted(tmp)) result = list(map(lambda x: function(b"\x00".join(x) + b"\x00" + sourceHash), itertools.product(*base_hashes))) if verbose: if use_local: print("Local hashes for: " + ", ".join(map(bytes.decode, result)), file=sys.stderr) else: print("Remote hashes for: " + ", ".join(map(bytes.decode, result)), file=sys.stderr) return result def old_wb_hashes(self, arch, verbose=False): version_target = "/" + arch + "/" + self.content["branch"] result = [] result += wb_hashes(self.content["id"] + version_target, verbose=verbose) if "id-platform" in self.content: result += wb_hashes(self.content["id-platform"] + version_target, verbose=verbose) return result def version_hashes(self, arch, use_local=False, verbose=False): def func(data): tmp = hashlib.sha3_256() tmp.update(data) return tmp.hexdigest().encode() return list(map(bytes.decode, self.hashes_from_base_platforms(arch, func, use_local=use_local, verbose=verbose))) def required_update(self, arch, use_local=False, verbose=False): own_hash = set(self.version_hashes(arch, use_local=use_local, verbose=verbose)) old_hashes = set(self.old_wb_hashes(arch, verbose=verbose)) return len(own_hash.intersection(old_hashes)) == 0 def get_id(self, arch): id = self.content["id"] branch = str(self.content["branch"]) assert "/" not in id assert "/" not in arch assert "/" not in branch if bool(self.content.get("build-runtime", False)) or bool(): return "runtime/%s/%s/%s" % (id, arch, branch) else: return "app/%s/%s/%s" % (id, arch, branch) def relative(self, file): return os.path.abspath(os.path.join(os.path.split(self.path)[0], file)) def get_need_update(source, arch, use_local=False, verbose=False): mod = ModuleLoader(source) return mod.required_update(arch, use_local=use_local, verbose=verbose) def get_own_hash(source, arch, use_local=False, verbose=False): mod = ModuleLoader(source) tmp = mod.version_hashes(arch, use_local=use_local, verbose=verbose) if len(tmp) != 1: raise ValueError("No unique version number possible.") if not isinstance(tmp[0], str): raise ValueError("Result have to be an string.") return tmp[0] def get_id(source, arch): mod = ModuleLoader(source) return mod.get_id(arch) if __name__ == '__main__': import argparse # Parse config parser = argparse.ArgumentParser(description="Generate hash of a module.") parser.add_argument("file", metavar="file", type=str, nargs=1, help="File configuration to generate hash.") parser.add_argument("arch", metavar="arch", type=str, nargs=1, help="The arch to build.") parser.add_argument("--require-build", dest="rebuild", action="store_const", const=True, default=False, help="Returns 0 when outdated.") parser.add_argument("--installed", dest="installed", action="store_const", const=True, default=False, help="Use installed then remote.") parser.add_argument("-v", "--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose information to stderr.") parser.add_argument("--get-id", dest="get_id", action="store_const", const=True, default=False, help="Get the id.") args = parser.parse_args() if args.get_id: print(get_id(args.file[0], args.arch[0])) elif args.rebuild: if get_need_update(args.file[0], args.arch[0], use_local=args.installed, verbose=args.verbose): exit(0) else: exit(1) else: print(get_own_hash(args.file[0], args.arch[0], use_local=args.installed, verbose=args.verbose))