diff options
author | Edward Hervey <bilboed@bilboed.com> | 2015-03-03 17:40:53 +0100 |
---|---|---|
committer | Edward Hervey <bilboed@bilboed.com> | 2018-02-21 11:31:07 +0100 |
commit | 3b5ca4a3794171750ae33410b34832177d75b410 (patch) | |
tree | fc8b8bd91256859a75175248db0acc67c8ab8544 | |
parent | dbbd47b842df2438f7ba7ca872b60766b60abc6d (diff) |
WIP cerbero parallell recipe buildparallel
Instead of building one recipe after another, have more than one slot
building recipes at the same (currently hardcoded to 4)
TODO: Fix bootstrap
TODO: Fix issue when a build fails (deadlock somewhere ?)
TODO: Cleanup
-rw-r--r-- | cerbero/build/build.py | 17 | ||||
-rw-r--r-- | cerbero/build/cookbook.py | 15 | ||||
-rw-r--r-- | cerbero/build/oven.py | 134 | ||||
-rw-r--r-- | cerbero/commands/build.py | 12 | ||||
-rw-r--r-- | cerbero/utils/shell.py | 81 |
5 files changed, 220 insertions, 39 deletions
diff --git a/cerbero/build/build.py b/cerbero/build/build.py index d2834a92..c3bbfa7a 100644 --- a/cerbero/build/build.py +++ b/cerbero/build/build.py @@ -24,7 +24,6 @@ from cerbero.utils import messages as m import shutil import re - class Build (object): ''' Base class for build handlers @@ -178,18 +177,19 @@ class MakefilesBase (Build): for var in append_env.keys() + new_env.keys(): self._old_env[var] = os.environ.get(var, None) + the_env = os.environ.copy() + for var, val in append_env.iteritems(): - if not os.environ.has_key(var): - os.environ[var] = val - else: - os.environ[var] = '%s %s' % (os.environ[var], val) + the_env[var] = '%s %s' % (os.environ.get(var, ''), val) for var, val in new_env.iteritems(): if val is None: - if var in os.environ: - del os.environ[var] + if var in the_env: + del the_env[var] else: - os.environ[var] = val + the_env[var] = val + + shell.set_thread_environ(the_env) return self._old_env def _restore_env(self, old_env): @@ -203,6 +203,7 @@ class MakefilesBase (Build): del os.environ[var] else: os.environ[var] = val + shell.set_thread_environ(None) self._old_env = None def _add_system_libs(self, new_env): diff --git a/cerbero/build/cookbook.py b/cerbero/build/cookbook.py index ba45b5b2..889b5d0b 100644 --- a/cerbero/build/cookbook.py +++ b/cerbero/build/cookbook.py @@ -234,6 +234,21 @@ class CookBook (object): del self.status[recipe_name] self.save() + def recipe_has_deps_to_build(self, recipe_name): + ''' + Check if recipe has dependencies that need to be built + ''' + recipe = self.get_recipe(recipe_name) + d = self._find_deps(recipe, {}, []) + #print recipe, d + for r in d: + if r == recipe: + continue + if self.recipe_needs_build(r.name): + #m.message(" %r needs_build of %r" % (recipe, r)); + return True + return False + def recipe_needs_build(self, recipe_name): ''' Whether a recipe needs to be build or not diff --git a/cerbero/build/oven.py b/cerbero/build/oven.py index 077dd561..9b4dd174 100644 --- a/cerbero/build/oven.py +++ b/cerbero/build/oven.py @@ -16,9 +16,11 @@ # Free Software Foundation, Inc., 59 Temple Place - Suite 330, # Boston, MA 02111-1307, USA. +from threading import Thread, Condition, Lock, ThreadError import tempfile import shutil import traceback +import sys from cerbero.errors import BuildStepError, FatalError, AbortedError from cerbero.build.recipe import Recipe, BuildSteps @@ -42,7 +44,35 @@ class RecoveryActions(object): RecoveryActions.RETRY_STEP, RecoveryActions.SKIP, RecoveryActions.ABORT] +class OvenTray(Thread): + def __init__(self, oven, recipe, count, total, *args, **kwargs): + Thread.__init__(self, name=recipe.name, *args, **kwargs) + self.oven = oven + self.recipe = recipe + self.count = count + self.total = total + def run(self): + print "Calling cook_recipe for", self.recipe + try: + self.oven._cook_recipe(self.recipe, self.count, self.total) + except Exception, e: + traceback.print_exc() + self.oven.slotcond.acquire() + if not self.oven.stop_building: + self.oven.exception = sys.exc_info() + print self.oven.exception + self.oven.stop_building = True + self.oven.slots.remove(self) + self.oven.slotcond.notify_all() + self.oven.slotcond.release() + print "==> EXITING THREAD %r" % self.recipe + return + self.oven.slotcond.acquire() + self.oven.slots.remove(self) + self.oven.slotcond.notify_all() + self.oven.slotcond.release() + class Oven (object): ''' This oven cooks recipes with all their ingredients @@ -73,7 +103,27 @@ class Oven (object): self.config = cookbook.get_config() self.interactive = self.config.interactive shell.DRY_RUN = dry_run + self.exception = None + self.stop_building = False + # Lock to protect above variables in the executor threads + self.lock = Lock() + self.slots = [] + # condition to use over slots + self.slotcond = Condition() + def stop_cooking(self): + self.stop_building = True + # The main thread might have been interrupted while + # the slotcond lock was acquired, make sure that wasn't the + # case + try: + self.slotcond.release() + except: + pass + for t in self.slots: + while t.is_alive(): + t.join(1) + def start_cooking(self): ''' Cooks the recipe and all its dependencies @@ -88,13 +138,66 @@ class Oven (object): # remove recipes already scheduled to be built recipes = [x for x in recipes if x not in ordered_recipes] ordered_recipes.extend(recipes) + # Filter already built recipes + if not self.force: + ordered_recipes = [x for x in ordered_recipes if self.cookbook.recipe_needs_build(x.name)] + m.message(_("Building the following recipes: %s") % ' '.join([x.name for x in ordered_recipes])) + for r in ordered_recipes: + m.message('%s has dependencies that need to be built : %r' % (r, self.cookbook.recipe_has_deps_to_build(r.name))) + + # Recipes built in order, we need a graph i = 1 - for recipe in ordered_recipes: + total = len(ordered_recipes) + # slots we will use for threads + self.slots = [] + maxslots = 4 + + # iterate while we have recipes to build or builds still running + while len(ordered_recipes) or len(self.slots): + print "recipes", len(ordered_recipes) + print "slots", len(self.slots) + self.slotcond.acquire() + + # are we done yet ? + if ordered_recipes == []: + print "!!!!! Already building %r " % [x.recipe for x in self.slots] + print "!!!! Done with recipes, waiting for the end" + while not len(self.slots) == 0: + self.slotcond.wait() + self.slotcond.release() + break + + # if the slots are full, wait for a free slot + if len(self.slots) >= maxslots: + print "!!!!! Already building %r " % [x.recipe for x in self.slots] + print "!!!!! waiting for more slots", len(self.slots), maxslots + self.slotcond.wait() + + # FIXME : How do we catch exceptions ?? + + # find the next recipe we can build without dependencies + recipe = None + while True: + print "Trying to find next recipe without dependencies" + for x in ordered_recipes: + if not self.cookbook.recipe_has_deps_to_build(x.name): + recipe = x + ordered_recipes.remove(x) + break + if recipe is not None: + break + + print "Waiting for dependencies to finish building : %r" % ([x.recipe.name for x in self.slots]) + self.slotcond.wait() + try: - self._cook_recipe(recipe, i, len(ordered_recipes)) + t = OvenTray(self, recipe, i, total) + self.slots.append(t) + t.start() + self.slotcond.release() except BuildStepError, be: if not self.interactive: raise be @@ -108,19 +211,25 @@ class Oven (object): elif action == RecoveryActions.RETRY_ALL: shutil.rmtree(recipe.get_for_arch (be.arch, 'build_dir')) self.cookbook.reset_recipe_status(recipe.name) - self._cook_recipe(recipe, i, len(ordered_recipes)) + self._cook_recipe(recipe, i, total) elif action == RecoveryActions.RETRY_STEP: - self._cook_recipe(recipe, i, len(ordered_recipes)) + self._cook_recipe(recipe, i, total) elif action == RecoveryActions.SKIP: continue elif action == RecoveryActions.ABORT: raise AbortedError() + except Exception, e: + print "!!!!!!! other exception %r" % e i += 1 + # Handle exceptions if any + def _cook_recipe(self, recipe, count, total): + self.lock.acquire() if not self.cookbook.recipe_needs_build(recipe.name) and \ not self.force: m.build_step(count, total, recipe.name, _("already built")) + self.lock.release() return if self.missing_files: @@ -134,23 +243,29 @@ class Oven (object): if self.cookbook.step_done(recipe.name, step) and not self.force: m.action(_("Step done")) continue + if self.stop_building: + m.action("Aborting build") + self.lock.release() + return try: + self.lock.release() # call step function stepfunc = getattr(recipe, step) if not stepfunc: raise FatalError(_('Step %s not found') % step) shell.set_logfile_output("%s/%s-%s.log" % (recipe.config.logs, recipe, step)) stepfunc() - # update status successfully - self.cookbook.update_step_status(recipe.name, step) shell.close_logfile_output() + self.lock.acquire() + self.cookbook.update_step_status(recipe.name, step) except FatalError, e: - shell.close_logfile_output(dump=True) self._handle_build_step_error(recipe, step, e.arch) except Exception: + self.lock.release() shell.close_logfile_output(dump=True) raise BuildStepError(recipe, step, traceback.format_exc()) self.cookbook.update_build_status(recipe.name, recipe.built_version()) + self.lock.release() if self.missing_files: self._print_missing_files(recipe, tmp) @@ -162,6 +277,11 @@ class Oven (object): # the recipe status to start from scratch next time shutil.rmtree(recipe.build_dir) self.cookbook.reset_recipe_status(recipe.name) + try: + self.lock.release() + except ThreadError: + pass + shell.close_logfile_output(dump=True) raise BuildStepError(recipe, step, arch=arch) def _print_missing_files(self, recipe, tmp): diff --git a/cerbero/commands/build.py b/cerbero/commands/build.py index a0c62f93..2a93ccb8 100644 --- a/cerbero/commands/build.py +++ b/cerbero/commands/build.py @@ -22,7 +22,8 @@ from cerbero.commands import Command, register_command from cerbero.build.cookbook import CookBook from cerbero.build.oven import Oven from cerbero.utils import _, N_, ArgparseArgument - +from cerbero.utils import messages as m +import traceback class Build(Command): doc = N_('Build a recipe') @@ -71,7 +72,14 @@ class Build(Command): oven = Oven(recipes, cookbook, force=self.force, no_deps=self.no_deps, missing_files=missing_files, dry_run=dry_run) - oven.start_cooking() + try: + oven.start_cooking() + except KeyboardInterrupt: + m.error(_('Interrupted')) + oven.stop_cooking() + except Exception, e: + oven.stop_cooking() + raise e class BuildOne(Build): diff --git a/cerbero/utils/shell.py b/cerbero/utils/shell.py index 804f33ba..e263a497 100644 --- a/cerbero/utils/shell.py +++ b/cerbero/utils/shell.py @@ -28,6 +28,7 @@ import time import glob import shutil import hashlib +import threading from cerbero.enums import Platform from cerbero.utils import _, system_info, to_unixpath @@ -40,9 +41,20 @@ TAR = 'tar' PLATFORM = system_info()[0] -LOGFILE = None # open('/tmp/cerbero.log', 'w+') +# Thread local logfile +LOGFILE = threading.local() +LOGFILE.value = None +# open('/tmp/cerbero.log', 'w+') DRY_RUN = False +_thread_env = threading.local() +_thread_env.value = None + +def set_thread_environ(environ): + if environ is not None: + _thread_env.value = environ.copy() + else: + _thread_env.value = None def set_logfile_output(location): ''' @@ -53,10 +65,10 @@ def set_logfile_output(location): ''' global LOGFILE - if not LOGFILE is None: + if _valid_logfile(): raise Exception("Logfile was already open. Forgot to call " "close_logfile_output() ?") - LOGFILE = open(location, "w+") + LOGFILE.value = open(location, "w+") def close_logfile_output(dump=False): @@ -67,22 +79,22 @@ def close_logfile_output(dump=False): @type dump: bool ''' global LOGFILE - if LOGFILE is None: + if not _valid_logfile(): raise Exception("No logfile was open") if dump: - LOGFILE.seek(0) + LOGFILE.value.seek(0) while True: - data = LOGFILE.read() + data = LOGFILE.value.read() if data: print data else: break # if logfile is empty, remove it - pos = LOGFILE.tell() - LOGFILE.close() + pos = LOGFILE.value.tell() + LOGFILE.value.close() if pos == 0: - os.remove(LOGFILE.name) - LOGFILE = None + os.remove(LOGFILE.value.name) + LOGFILE.value = None class StdOut: @@ -107,7 +119,23 @@ def _fix_mingw_cmd(path): l_path[i] = '/' return ''.join(l_path) +def _valid_logfile(): + global LOGFILE + try: + x = LOGFILE.value + return x is not None + except: + return False +def _get_environ(): + try: + env = _thread_env.value + if env is None: + env = os.environ.copy() + except: + env = os.environ.copy() + return env + def call(cmd, cmd_dir='.', fail=True, verbose=False): ''' Run a shell command @@ -119,13 +147,20 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False): @param fail: whether or not to raise an exception if the command fails @type fail: bool ''' + env = _get_environ() + global LOGFILE + if _valid_logfile(): + stream = LOGFILE.value + else: + stream = sys.stdout + try: if LOGFILE is None: if verbose: m.message("Running command '%s'" % cmd) - else: - LOGFILE.write("Running command '%s'\n" % cmd) - LOGFILE.flush() + elif _valid_logfile(): + LOGFILE.value.write("Running command '%s'\n" % cmd) + LOGFILE.value.flush() shell = True if PLATFORM == Platform.WINDOWS: # windows do not understand ./ @@ -137,7 +172,6 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False): cmd = _fix_mingw_cmd(cmd) # Disable shell which uses cmd.exe shell = False - stream = LOGFILE or sys.stdout if DRY_RUN: # write to sdterr so it's filtered more easilly m.error("cd %s && %s && cd %s" % (cmd_dir, cmd, os.getcwd())) @@ -146,7 +180,7 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False): ret = subprocess.check_call(cmd, cwd=cmd_dir, stderr=subprocess.STDOUT, stdout=StdOut(stream), - env=os.environ.copy(), shell=shell) + env=env, shell=shell) except subprocess.CalledProcessError: if fail: raise FatalError(_("Error running command: %s") % cmd) @@ -155,13 +189,16 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False): return ret -def check_call(cmd, cmd_dir=None, shell=False, split=True, fail=False): - if split and isinstance(cmd, str): - cmd = shlex.split(cmd) +def check_call(cmd, cmd_dir=None, shell=False, split=True, fail=False, env=None): try: + if env is None: + env = _get_environ() + if split and isinstance(cmd, str): + cmd = shlex.split(cmd) process = subprocess.Popen(cmd, cwd=cmd_dir, stdout=subprocess.PIPE, - stderr=open(os.devnull), shell=shell) + stderr=open(os.devnull), shell=shell, + env=env) output, unused_err = process.communicate() if process.poll() and fail: raise Exception() @@ -233,7 +270,7 @@ def download(url, destination=None, recursive=False, check_cert=True, overwrite= cmd += " --no-check-certificate" if not recursive and not overwrite and os.path.exists(destination): - if LOGFILE is None: + if LOGFILE.value is None: logging.info("File %s already downloaded." % destination) else: if not recursive and not os.path.exists(os.path.dirname(destination)): @@ -241,8 +278,8 @@ def download(url, destination=None, recursive=False, check_cert=True, overwrite= elif recursive and not os.path.exists(destination): os.makedirs(destination) - if LOGFILE: - LOGFILE.write("Downloading %s\n" % url) + if LOGFILE.value: + LOGFILE.value.write("Downloading %s\n" % url) else: logging.info("Downloading %s", url) try: |