#!/usr/bin/env python3
"""Bisect the commit history of a user-installed Flatpak app or runtime.

The bisection state (known commits, their log messages and the current
good/bad marks) is kept as JSON in the user cache directory between
invocations, so each sub-command (`start`, `good`, `bad`, `log`,
`checkout`, `reset`) is a separate run of this script.
"""

import argparse
import json
import os
import re
import shlex
import subprocess
import sys

import gi

gi.require_version('Flatpak', '1.0')
from gi.repository import Flatpak
from gi.repository import GLib

# Matches the hash on a "commit <hash>" line of `ostree log` output.
COMMIT_RE = re.compile(r'(?<=^commit )\w+')


def get_bisection_data():
    """Return a fresh, empty bisection-state dictionary."""
    return {'ref': None, 'good': None, 'bad': None,
            'refs': None, 'log': None, 'messages': None}


class Bisector():
    """Implements the bisect sub-commands.

    An instance is used as the argparse namespace, so `name`, `branch`,
    `runtime`, `subparser_name` and (for `checkout`) `commit` are set as
    attributes by `parse_args` before `run()` is called.
    """

    def load_cache(self):
        """Load saved bisection state into `self.data` (None if absent)."""
        cache_dir = os.path.join(GLib.get_user_cache_dir(), 'flatpak')
        os.makedirs(cache_dir, exist_ok=True)
        self.cache_path = os.path.join(
            cache_dir, '%s-%s-bisect.status' % (self.name, self.branch))
        try:
            with open(self.cache_path, 'rb') as f:
                self.data = json.load(f)
        except FileNotFoundError:
            self.data = None

    def dump_data(self):
        """Write `self.data` to the cache file as JSON."""
        with open(self.cache_path, 'w') as f:
            json.dump(self.data, f)

    def setup_flatpak_app(self):
        """Look up the installed ref in the user installation.

        Stores the result in `self.cref`. Returns 0 on success, or -1
        (after printing a hint) if the lookup raises `GLib.Error`.
        """
        self.installation = Flatpak.Installation.new_user()
        kind = Flatpak.RefKind.RUNTIME if self.runtime else Flatpak.RefKind.APP
        try:
            self.cref = self.installation.get_installed_ref(
                kind, self.name, None, self.branch, None)
        except GLib.Error as e:
            print("%s\n\nMake sure %s is installed as a "
                  "user (flatpak install --user) and specify `--runtime`"
                  " if it is a runtime." % (e, self.name))
            return -1
        return 0

    def run(self):
        """Dispatch to the method named by the chosen sub-command.

        Saves the bisection state afterwards if there is any, and returns
        the sub-command's result.
        """
        # argparse stores `nargs=1` positionals as one-element lists.
        self.name = self.name[0]
        self.load_cache()
        res = self.setup_flatpak_app()
        if res:
            return res

        try:
            func = getattr(self, self.subparser_name)
        except AttributeError:
            print('No action called %s' % self.subparser_name)
            return -1

        res = func()
        if self.data:
            self.dump_data()
        return res

    def _refname(self):
        """Return the installed ref as `name/arch/branch`."""
        return '/'.join((self.cref.get_name(),
                         self.cref.get_arch(),
                         self.cref.get_branch()))

    def _report_first_bad(self):
        """Print the log entry of the commit marked bad; return 0."""
        print("\n==========================\n"
              "First bad commit is:\n\n%s"
              "==========================" %
              self.data['messages'][self.data['bad']])
        return 0

    def set_reference_commits(self, set_name, check_name):
        """Mark the current commit as `set_name` and advance the bisection.

        `set_name` and `check_name` are 'good'/'bad' (in either order).
        Once both marks are set, either checks out the midpoint of the
        remaining range or reports the first bad commit.

        Returns -1 if no bisection is running or the commit is unknown,
        1 if the commit already carries the opposite mark, 0 when only one
        mark is set so far or the first bad commit was reported, and
        otherwise the result of `checkout()`.
        """
        if not self.data:
            print("You need to first start the bisection")
            return -1

        ref = self.cref.get_latest_commit()
        if self.data[check_name] == ref:
            print('Commit %s is already set as %s...' % (ref, check_name))
            return 1

        if ref not in self.data['refs']:
            print("%s is not a known commit." % ref)
            return -1

        print("Setting %s as %s commit" % (ref, set_name))
        self.data[set_name] = ref

        if not self.data[check_name]:
            # Only one end of the range is known so far; nothing to check out.
            return 0

        # 'refs' is ordered oldest first, so the slice holds the good commit
        # and everything after it up to (not including) the bad commit.
        refs = self.data['refs']
        candidates = refs[refs.index(self.data['good']):
                          refs.index(self.data['bad'])]
        if not candidates:
            return self._report_first_bad()

        midpoint = candidates[len(candidates) // 2]
        if midpoint == self.data['good']:
            # Nothing left between good and bad.
            return self._report_first_bad()
        return self.checkout(midpoint)

    def load_refs(self):
        """Fetch the `ostree log` output and index it by commit hash.

        Fills `self.data['refs']` (hashes, oldest first),
        `self.data['log']` (raw log text) and `self.data['messages']`
        (hash -> that commit's log entry).
        """
        repodir, refname = self.download_history()
        history = subprocess.check_output(
            ['ostree', 'log', '--repo', repodir, refname]).decode()

        refs = []
        messages = {}
        current = ''
        entry = []
        for line in history.split('\n'):
            match = COMMIT_RE.search(line)
            if match:
                if entry:
                    messages[current] = ''.join(entry)
                current = match.group(0)
                refs.append(current)
                entry = []
            entry.append(line + '\n')
        if entry:
            messages[current] = ''.join(entry)

        # The log lists newest first; bisection wants oldest first.
        refs.reverse()
        self.data['refs'] = refs
        self.data['log'] = history
        self.data['messages'] = messages

    def good(self):
        """Mark the current commit as good (requires a bad commit first)."""
        if not self.data:
            print("You need to first start the bisection")
            return -1
        if not self.data['bad']:
            print("Set the bad commit first")
            sys.exit(-1)
        return self.set_reference_commits('good', 'bad')

    def bad(self):
        """Mark the current commit as bad."""
        return self.set_reference_commits('bad', 'good')

    def start(self):
        """Begin a bisection: update to the latest commit and load history.

        Returns -1 if a bisection is already in progress, 0 otherwise.
        """
        if self.data:
            print('Bisection already started')
            return -1

        print("Updating to %s latest commit" % self.name)
        # reset() is a no-op when no bisection is running, so run the
        # update directly.
        subprocess.call(['flatpak', 'update', '--user', self._refname()])
        self.data = get_bisection_data()
        self.load_refs()
        return 0

    def download_history(self):
        """Pull commit metadata for the ref; return `(repodir, refname)`."""
        print("Getting history")
        deploy_parent = os.path.abspath(
            os.path.join(self.cref.get_deploy_dir(), '..'))
        dirname = "runtime" if self.runtime else "app"
        # The repo directory sits next to the app/ or runtime/ directory.
        repodir = os.path.join(
            deploy_parent.split('/%s/' % dirname)[0], 'repo')
        refname = '%s:%s/%s' % (
            self.cref.get_origin(), dirname, self._refname())
        # FIXME Getting `error: Exceeded maximum recursion` in ostree if
        # using --depth=-1 (or > 250)
        subprocess.call(['ostree', 'pull', '--depth=250',
                         '--commit-metadata-only',
                         '--repo', repodir, refname])
        return repodir, refname

    def log(self):
        """Print the commit history, through `$PAGER` when it is set.

        Uses the cached log while a bisection is running; otherwise
        downloads the history and runs `ostree log`.
        """
        # Split so that values such as "less -R" work.
        pager_cmd = shlex.split(os.environ.get('PAGER') or '')

        if self.data:
            text = self.data['log']
            if pager_cmd:
                subprocess.run(pager_cmd, input=(text + '\n').encode(),
                               check=True)
            else:
                print(text)
            return

        repodir, refname = self.download_history()
        cmd = ['ostree', 'log', '--repo', repodir, refname]
        if not pager_cmd:
            subprocess.call(cmd)
            return

        producer = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        try:
            subprocess.check_call(pager_cmd, stdin=producer.stdout)
        finally:
            # Close our end so the producer is not left blocked on a full
            # pipe if the pager quits early.
            producer.stdout.close()
            producer.wait()

    def checkout(self, commit=None):
        """Update the ref to `commit` (default: the CLI `commit` argument).

        Returns the exit status of `flatpak update`.
        """
        if not commit:
            commit = self.commit[0]
        print("Checking out %s" % commit)
        return subprocess.call(['flatpak', 'update', '--user',
                                self._refname(), '--commit', commit])

    def reset(self, v=True):
        """Drop the bisection state and update back to the latest commit.

        Returns -1 when no bisection is running (printing a notice only
        if `v` is true), otherwise the exit status of `flatpak update`.
        """
        if not self.data:
            if v:
                print("Not bisecting, nothing to reset")
            return -1

        print("Removing %s" % self.cache_path)
        os.remove(self.cache_path)
        self.data = None
        return subprocess.call(['flatpak', 'update', '--user',
                                self._refname()])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('name', nargs=1,
                        help='Application/Runtime to bisect')
    parser.add_argument('-b', '--branch', default='master',
                        help='The branch to bisect')
    parser.add_argument('-r', '--runtime', action="store_true",
                        help='Bisecting a runtime not an app')
    subparsers = parser.add_subparsers(dest='subparser_name')
    subparsers.required = True
    start_parser = subparsers.add_parser('start', help="Start bisection")
    bad_parser = subparsers.add_parser(
        'bad', help="Set current version as bad")
    good_parser = subparsers.add_parser(
        'good', help="Set current version as good")
    log_parser = subparsers.add_parser(
        'log', help="Download and print application commit history")
    checkout_parser = subparsers.add_parser(
        'checkout', help="Checkout defined commit")
    checkout_parser.add_argument('commit', nargs=1,
                                 help='The commit hash to checkout')
    reset_parser = subparsers.add_parser(
        'reset',
        help="Reset all bisecting data and go back to latest commit")

    # The Bisector instance doubles as the argparse namespace.
    bisector = Bisector()
    options = parser.parse_args(namespace=bisector)
    bisector.run()