278 lines
11 KiB
Python
278 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import configparser
|
|
import hashlib
|
|
import io
|
|
import itertools
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import yaml
|
|
|
|
|
|
def _load_stdout(args):
    """Run a command and return everything it wrote to stdout.

    Args:
        args: Argument vector handed to ``subprocess.Popen`` (no shell).

    Returns:
        The raw ``bytes`` the process wrote to stdout.

    Raises:
        RuntimeError: If the process exits with a non-zero return code.
    """
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains stdout and stderr while waiting for the exit.
    # Calling wait() before reading blocks forever as soon as the child
    # fills one of the pipe buffers.
    result, _ = p.communicate()
    if p.returncode != 0:
        raise RuntimeError("Process crashed with " + repr(p.returncode) + ". Args: " + repr(args))
    return result
|
|
|
|
|
|
def list_remotes(verbose=False):
    """Return the names printed by ``flatpak remote-list --user``.

    Args:
        verbose: When true, also print the names to stderr.

    Returns:
        A list with one ``str`` per output line of the command.

    Raises:
        RuntimeError: If the ``flatpak`` call exits with a non-zero code.
    """
    command = ["flatpak", "remote-list", "--user", "--columns=name"]
    raw_output = _load_stdout(command)
    remotes = raw_output.decode().splitlines()
    if verbose:
        message = "Remotes: " + ", ".join(remotes)
        print(message, file=sys.stderr)
    return remotes
|
|
|
|
|
|
def commit_hashes(name, use_local=False, verbose=False):
    """Collect the commit hashes that flatpak reports for a ref.

    Args:
        name: Ref to query; passed unchanged to ``flatpak``.
        use_local: Query the user installation (``flatpak info``) instead of
            every remote returned by ``list_remotes`` (``flatpak remote-info``).
        verbose: Print the collected hashes to stderr.

    Returns:
        A list of lower-cased ``bytes`` values. Remote mode yields one entry
        per remote whose query succeeded; local mode yields exactly one.

    Raises:
        RuntimeError: In local mode, if ``flatpak info`` fails.
        ValueError: If the first output line of a remote contains a byte
            outside ``a-z`` / ``0-9``.
    """
    result = []
    if use_local:
        # NOTE(review): the local output is stored as-is, so it keeps any
        # trailing newline, while remote hashes are cut to their first line.
        # Left unchanged because it influences the hashes derived from it.
        result = [_load_stdout(["flatpak", "info", "--user", "-c", name]).lower()]
    else:
        for remote in list_remotes(verbose=verbose):
            try:
                out = _load_stdout(["flatpak", "remote-info", "--user", "-c", remote, name]).lower()
            except RuntimeError:
                # The query exited non-zero (presumably the remote does not
                # carry this ref); move on to the next remote.
                continue
            lines = out.splitlines()
            if not lines:
                # Nothing was printed, so there is no hash to record.
                continue
            commit = lines[0]
            for char in commit:
                if not (b"a"[0] <= char <= b"z"[0] or b"0"[0] <= char <= b"9"[0]):
                    raise ValueError("Wrong char: " + repr(bytes([char])))
            result.append(commit)
    if verbose:
        # The hashes are bytes; decode them for display only.
        shown = ", ".join(item.decode(errors="replace").strip() for item in result)
        if use_local:
            print("Commit local hashes for " + name + ": " + shown, file=sys.stderr)
        else:
            print("Commit remote hashes for " + name + ": " + shown, file=sys.stderr)
    return result
|
|
|
|
|
|
def commit_metas(name, use_local=False, verbose=False):
    """Collect the metadata text that flatpak reports for a ref.

    Args:
        name: Ref to query; passed unchanged to ``flatpak``.
        use_local: Query the user installation (``flatpak info -m``) instead
            of every remote returned by ``list_remotes``
            (``flatpak remote-info -m``).
        verbose: Forwarded to ``list_remotes``.

    Returns:
        A list of ``str``, each holding the complete decoded command output.
        Remote mode yields one entry per remote whose query succeeded; local
        mode yields exactly one.

    Raises:
        RuntimeError: In local mode, if ``flatpak info`` fails.
    """
    if use_local:
        # Decode here as well so both modes hand out the same type.
        return [_load_stdout(["flatpak", "info", "--user", "-m", name]).decode()]

    result = []
    for remote in list_remotes(verbose=verbose):
        try:
            out = _load_stdout(["flatpak", "remote-info", "--user", "-m", remote, name])
        except RuntimeError:
            # The query exited non-zero (presumably the remote does not
            # carry this ref); move on to the next remote.
            continue
        # Keep the whole multi-line output. The metadata is parsed as an INI
        # document by the caller, so it must not be cut to its first line or
        # checked against the commit-hash alphabet.
        result.append(out.decode())
    return result
|
|
|
|
|
|
# Matches a line that starts with (or contains) WB_HASH='<alphanumeric>'.
__hash_find = re.compile(".*WB_HASH\\=\\'(?P<hash>[a-zA-Z0-9]*)\\'.*")


def wb_hashes(name, verbose=False):
    """Extract the ``WB_HASH`` value each remote reports for a ref.

    Args:
        name: Ref to query; passed unchanged to ``flatpak remote-info``.
        verbose: Print the hashes that were found to stderr.

    Returns:
        A list of lower-cased ``str`` hashes, one per remote whose output
        contains a matching ``WB_HASH='...'`` entry.
    """
    marker = b"WB_HASH="
    found = []
    for remote in list_remotes(verbose=verbose):
        try:
            info = _load_stdout(["flatpak", "remote-info", "--user", remote, name])
        except RuntimeError:
            # The query failed for this remote; nothing to extract.
            continue
        start = info.find(marker)
        if start < 0:
            continue
        # Match from the marker onwards so the pattern anchors on WB_HASH.
        matched = __hash_find.match(info[start:].decode())
        if matched is None:
            continue
        found.append(matched.group("hash").lower())
    if verbose:
        print("Commit wb hashes for " + name + ": " + ", ".join(found), file=sys.stderr)
    return found
|
|
|
|
|
|
class ModuleLoader():
    """Load a YAML build manifest and derive identifiers and hashes from it.

    Entries of a ``modules`` list that are plain strings are treated as paths
    to further manifest files (relative to the referring file) and are
    replaced by the parsed content of those files.
    """

    # Absolute path of the manifest file this loader was created from.
    path: str
    # Parsed manifest; string entries in "modules" are replaced by content.
    content: dict
    # Loaders created so far, keyed by absolute path (shared by all instances).
    __moduleFiles = {}

    def __module(self, modules):
        """Resolve file references inside a ``modules`` list, in place.

        Args:
            modules: List whose items are either paths (``str``) or module
                mappings that may carry a nested ``modules`` list.
        """
        for index, entry in enumerate(modules):
            if isinstance(entry, str):
                entry_path = self.relative(entry)
                if entry_path in ModuleLoader.__moduleFiles:
                    loader = ModuleLoader.__moduleFiles[entry_path]
                else:
                    loader = ModuleLoader(entry_path)
                modules[index] = loader.content
            elif "modules" in entry:
                self.__module(entry["modules"])

    def __init__(self, source):
        """Parse the manifest at ``source`` and resolve its nested modules.

        Args:
            source: Path of the manifest file.

        Raises:
            ValueError: If ``source`` is not a ``str``.
        """
        if not isinstance(source, str):
            raise ValueError("source have to be string. It was a "
                             + repr(source) + ".")
        self.path = os.path.abspath(source)
        # Registered before parsing: a nested file that refers back to this
        # path then finds this loader in the cache instead of loading again.
        ModuleLoader.__moduleFiles[self.path] = self
        # The context manager closes the file even if parsing fails.
        with open(self.path, "r") as handle:
            self.content = yaml.load(handle.read(), Loader=yaml.SafeLoader)
        if "modules" in self.content:
            self.__module(self.content["modules"])

    def hash_content(self, function):
        """Serialise the manifest deterministically and pass it to ``function``.

        Args:
            function: Callable that receives the UTF-8 encoded JSON bytes.

        Returns:
            Whatever ``function`` returns.
        """
        # Keys are sorted for a stable result. The empty separators are part
        # of the hashed byte stream and must stay as they are, otherwise
        # every derived hash changes.
        serialised = json.dumps(self.content, separators=("", ""), sort_keys=True)
        return function(serialised.encode("utf-8"))

    @staticmethod
    def _extension_refs(extension, arch, configs):
        """Build the candidate refs of one SDK extension.

        Every dotted prefix of ``extension`` is looked up as an
        ``Extension <prefix>`` section in each config; its ``version`` and
        ``versions`` (``;`` separated) options contribute one ref each.

        Args:
            extension: Full extension name from ``sdk-extensions``.
            arch: Architecture component of the refs.
            configs: Parsed metadata (``configparser.ConfigParser`` objects).

        Returns:
            Sorted list of ``<extension>/<arch>/<version>`` strings.
        """
        parts = extension.split(".")
        refs = set()
        for config in configs:
            for size in range(1, len(parts) + 1):
                section = "Extension " + ".".join(parts[:size])
                # fallback= covers a missing section as well as a missing
                # option; a section usually carries only one of the two keys.
                single = config.get(section, "version", fallback=None)
                if single:
                    refs.add(extension + "/" + arch + "/" + single)
                multiple = config.get(section, "versions", fallback=None)
                if multiple:
                    for version in multiple.split(";"):
                        version = version.strip()
                        # A trailing ';' yields an empty item; skip it.
                        if version:
                            refs.add(extension + "/" + arch + "/" + version)
        return sorted(refs)

    def get_depends(self, arch, use_local=False, verbose=False):
        """List the refs this manifest depends on.

        Args:
            arch: Architecture component of the refs.
            use_local: Forwarded to ``commit_metas``.
            verbose: Print the result to stderr (also forwarded).

        Returns:
            A list of lists: the SDK ref, the runtime ref, then one list of
            candidate refs per entry of ``sdk-extensions``.
        """
        # str(): an unquoted YAML version such as 3.36 is parsed as a float.
        version_src = "/" + arch + "/" + str(self.content["runtime-version"])
        sdk_ref = self.content["sdk"] + version_src
        result = [[sdk_ref], [self.content["runtime"] + version_src]]

        # Sdk extensions
        if "sdk-extensions" in self.content:
            # Parse meta config
            configs = []
            for meta in commit_metas(sdk_ref, use_local=use_local, verbose=verbose):
                if isinstance(meta, bytes):
                    meta = meta.decode()
                config = configparser.ConfigParser()
                config.read_string(meta)
                configs.append(config)

            # Parse extensions
            for extension in self.content["sdk-extensions"]:
                result.append(self._extension_refs(extension, arch, configs))

        # Debug output
        if verbose:
            print("Flatpak depends: " + "\n".join(map(repr, result)), file=sys.stderr)
        return result

    def hashes_from_base_platforms(self, arch, function, use_local=False, verbose=False):
        """Combine the manifest hash with the commit hashes of all depends.

        Args:
            arch: Architecture component of the refs.
            function: Hash callable taking and returning ``bytes``.
            use_local: Forwarded to ``get_depends`` / ``commit_hashes``.
            verbose: Print the result to stderr (also forwarded).

        Returns:
            One hash per combination of commit hashes (one commit picked per
            dependency); empty if any dependency has no commit hash.
        """
        # Get source
        source_hash = self.hash_content(function)

        # Build hashes
        base_hashes = []
        for refs in self.get_depends(arch, use_local=use_local, verbose=verbose):
            commits = set()
            for ref in refs:
                commits.update(
                    commit.lower()
                    for commit in commit_hashes(ref, use_local=use_local, verbose=verbose)
                )
            base_hashes.append(sorted(commits))
        result = [
            function(b"\x00".join(combination) + b"\x00" + source_hash)
            for combination in itertools.product(*base_hashes)
        ]
        if verbose:
            if use_local:
                print("Local hashes for: " + ", ".join(map(bytes.decode, result)), file=sys.stderr)
            else:
                print("Remote hashes for: " + ", ".join(map(bytes.decode, result)), file=sys.stderr)
        return result

    def old_wb_hashes(self, arch, verbose=False):
        """Return the ``WB_HASH`` values published for this manifest's refs.

        Looks up ``id`` and, when present, ``id-platform`` on the manifest's
        ``branch`` via ``wb_hashes``.
        """
        # str(): an unquoted YAML branch such as 1.0 is parsed as a float.
        version_target = "/" + arch + "/" + str(self.content["branch"])
        result = []
        result += wb_hashes(self.content["id"] + version_target, verbose=verbose)
        if "id-platform" in self.content:
            result += wb_hashes(self.content["id-platform"] + version_target, verbose=verbose)
        return result

    def version_hashes(self, arch, use_local=False, verbose=False):
        """Return the SHA3-256 based version hashes as hex strings."""
        def func(data):
            return hashlib.sha3_256(data).hexdigest().encode()
        hashes = self.hashes_from_base_platforms(arch, func, use_local=use_local, verbose=verbose)
        return [item.decode() for item in hashes]

    def required_update(self, arch, use_local=False, verbose=False):
        """Return True when none of the own hashes is among the published ones."""
        own_hash = set(self.version_hashes(arch, use_local=use_local, verbose=verbose))
        old_hashes = set(self.old_wb_hashes(arch, verbose=verbose))
        return own_hash.isdisjoint(old_hashes)

    def get_id(self, arch):
        """Return ``runtime/<id>/<arch>/<branch>`` or ``app/<id>/<arch>/<branch>``.

        The ``runtime`` prefix is used when the manifest sets a truthy
        ``build-runtime``; otherwise ``app``.
        """
        ref_id = self.content["id"]
        branch = str(self.content["branch"])
        # A slash would add an extra component to the ref.
        assert "/" not in ref_id
        assert "/" not in arch
        assert "/" not in branch
        if self.content.get("build-runtime", False):
            return "runtime/%s/%s/%s" % (ref_id, arch, branch)
        return "app/%s/%s/%s" % (ref_id, arch, branch)

    def relative(self, file):
        """Resolve ``file`` against the directory of this manifest."""
        return os.path.abspath(os.path.join(os.path.dirname(self.path), file))
|
|
|
|
|
|
def get_need_update(source, arch, use_local=False, verbose=False):
    """Load the manifest at ``source`` and report whether it needs a rebuild.

    Returns the result of ``ModuleLoader.required_update`` for ``arch``.
    """
    loader = ModuleLoader(source)
    needs_update = loader.required_update(arch, use_local=use_local, verbose=verbose)
    return needs_update
|
|
|
|
def get_own_hash(source, arch, use_local=False, verbose=False):
    """Return the single version hash of the manifest at ``source``.

    Raises:
        ValueError: If the manifest yields zero or several hashes, or the
            hash is not a ``str``.
    """
    hashes = ModuleLoader(source).version_hashes(arch, use_local=use_local, verbose=verbose)
    if len(hashes) != 1:
        raise ValueError("No unique version number possible.")
    only_hash = hashes[0]
    if not isinstance(only_hash, str):
        raise ValueError("Result have to be an string.")
    return only_hash
|
|
|
|
def get_id(source, arch):
    """Return the ref id (``ModuleLoader.get_id``) of the manifest at ``source``."""
    return ModuleLoader(source).get_id(arch)
|
|
|
|
|
|
if __name__ == '__main__':
    # Command line entry point: print the id or the version hash of a
    # manifest, or signal through the exit code whether a rebuild is needed.
    import argparse

    # Parse config
    parser = argparse.ArgumentParser(description="Generate hash of a module.")
    parser.add_argument("file", metavar="file", type=str, nargs=1,
                        help="File configuration to generate hash.")
    parser.add_argument("arch", metavar="arch", type=str, nargs=1,
                        help="The arch to build.")
    parser.add_argument("--require-build", dest="rebuild", action="store_true",
                        help="Returns 0 when outdated.")
    parser.add_argument("--installed", dest="installed", action="store_true",
                        help="Use installed then remote.")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="Verbose information to stderr.")
    parser.add_argument("--get-id", dest="get_id", action="store_true",
                        help="Get the id.")
    args = parser.parse_args()

    # nargs=1 stores each positional as a one-element list.
    manifest = args.file[0]
    arch = args.arch[0]

    if args.get_id:
        print(get_id(manifest, arch))
    elif args.rebuild:
        outdated = get_need_update(manifest, arch, use_local=args.installed, verbose=args.verbose)
        # sys.exit instead of the site-provided exit(), which is meant for
        # interactive sessions and is missing when site is not loaded.
        sys.exit(0 if outdated else 1)
    else:
        print(get_own_hash(manifest, arch, use_local=args.installed, verbose=args.verbose))
|