summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEdward Hervey <bilboed@bilboed.com>2015-03-03 17:40:53 +0100
committerEdward Hervey <bilboed@bilboed.com>2018-02-21 11:31:07 +0100
commit3b5ca4a3794171750ae33410b34832177d75b410 (patch)
treefc8b8bd91256859a75175248db0acc67c8ab8544
parentdbbd47b842df2438f7ba7ca872b60766b60abc6d (diff)
WIP cerbero parallell recipe buildparallel
Instead of building one recipe after another, have more than one slot building recipes at the same (currently hardcoded to 4) TODO: Fix bootstrap TODO: Fix issue when a build fails (deadlock somewhere ?) TODO: Cleanup
-rw-r--r--cerbero/build/build.py17
-rw-r--r--cerbero/build/cookbook.py15
-rw-r--r--cerbero/build/oven.py134
-rw-r--r--cerbero/commands/build.py12
-rw-r--r--cerbero/utils/shell.py81
5 files changed, 220 insertions, 39 deletions
diff --git a/cerbero/build/build.py b/cerbero/build/build.py
index d2834a92..c3bbfa7a 100644
--- a/cerbero/build/build.py
+++ b/cerbero/build/build.py
@@ -24,7 +24,6 @@ from cerbero.utils import messages as m
import shutil
import re
-
class Build (object):
'''
Base class for build handlers
@@ -178,18 +177,19 @@ class MakefilesBase (Build):
for var in append_env.keys() + new_env.keys():
self._old_env[var] = os.environ.get(var, None)
+ the_env = os.environ.copy()
+
for var, val in append_env.iteritems():
- if not os.environ.has_key(var):
- os.environ[var] = val
- else:
- os.environ[var] = '%s %s' % (os.environ[var], val)
+ the_env[var] = '%s %s' % (os.environ.get(var, ''), val)
for var, val in new_env.iteritems():
if val is None:
- if var in os.environ:
- del os.environ[var]
+ if var in the_env:
+ del the_env[var]
else:
- os.environ[var] = val
+ the_env[var] = val
+
+ shell.set_thread_environ(the_env)
return self._old_env
def _restore_env(self, old_env):
@@ -203,6 +203,7 @@ class MakefilesBase (Build):
del os.environ[var]
else:
os.environ[var] = val
+ shell.set_thread_environ(None)
self._old_env = None
def _add_system_libs(self, new_env):
diff --git a/cerbero/build/cookbook.py b/cerbero/build/cookbook.py
index ba45b5b2..889b5d0b 100644
--- a/cerbero/build/cookbook.py
+++ b/cerbero/build/cookbook.py
@@ -234,6 +234,21 @@ class CookBook (object):
del self.status[recipe_name]
self.save()
+ def recipe_has_deps_to_build(self, recipe_name):
+ '''
+ Check if recipe has dependencies that need to be built
+ '''
+ recipe = self.get_recipe(recipe_name)
+ d = self._find_deps(recipe, {}, [])
+ #print recipe, d
+ for r in d:
+ if r == recipe:
+ continue
+ if self.recipe_needs_build(r.name):
+ #m.message(" %r needs_build of %r" % (recipe, r));
+ return True
+ return False
+
def recipe_needs_build(self, recipe_name):
'''
Whether a recipe needs to be build or not
diff --git a/cerbero/build/oven.py b/cerbero/build/oven.py
index 077dd561..9b4dd174 100644
--- a/cerbero/build/oven.py
+++ b/cerbero/build/oven.py
@@ -16,9 +16,11 @@
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
+from threading import Thread, Condition, Lock, ThreadError
import tempfile
import shutil
import traceback
+import sys
from cerbero.errors import BuildStepError, FatalError, AbortedError
from cerbero.build.recipe import Recipe, BuildSteps
@@ -42,7 +44,35 @@ class RecoveryActions(object):
RecoveryActions.RETRY_STEP, RecoveryActions.SKIP,
RecoveryActions.ABORT]
+class OvenTray(Thread):
+ def __init__(self, oven, recipe, count, total, *args, **kwargs):
+ Thread.__init__(self, name=recipe.name, *args, **kwargs)
+ self.oven = oven
+ self.recipe = recipe
+ self.count = count
+ self.total = total
+ def run(self):
+ print "Calling cook_recipe for", self.recipe
+ try:
+ self.oven._cook_recipe(self.recipe, self.count, self.total)
+ except Exception, e:
+ traceback.print_exc()
+ self.oven.slotcond.acquire()
+ if not self.oven.stop_building:
+ self.oven.exception = sys.exc_info()
+ print self.oven.exception
+ self.oven.stop_building = True
+ self.oven.slots.remove(self)
+ self.oven.slotcond.notify_all()
+ self.oven.slotcond.release()
+ print "==> EXITING THREAD %r" % self.recipe
+ return
+ self.oven.slotcond.acquire()
+ self.oven.slots.remove(self)
+ self.oven.slotcond.notify_all()
+ self.oven.slotcond.release()
+
class Oven (object):
'''
This oven cooks recipes with all their ingredients
@@ -73,7 +103,27 @@ class Oven (object):
self.config = cookbook.get_config()
self.interactive = self.config.interactive
shell.DRY_RUN = dry_run
+ self.exception = None
+ self.stop_building = False
+ # Lock to protect above variables in the executor threads
+ self.lock = Lock()
+ self.slots = []
+ # condition to use over slots
+ self.slotcond = Condition()
+ def stop_cooking(self):
+ self.stop_building = True
+ # The main thread might have been interrupted while
+ # the slotcond lock was acquired, make sure that wasn't the
+ # case
+ try:
+ self.slotcond.release()
+ except:
+ pass
+ for t in self.slots:
+ while t.is_alive():
+ t.join(1)
+
def start_cooking(self):
'''
Cooks the recipe and all its dependencies
@@ -88,13 +138,66 @@ class Oven (object):
# remove recipes already scheduled to be built
recipes = [x for x in recipes if x not in ordered_recipes]
ordered_recipes.extend(recipes)
+ # Filter already built recipes
+ if not self.force:
+ ordered_recipes = [x for x in ordered_recipes if self.cookbook.recipe_needs_build(x.name)]
+
m.message(_("Building the following recipes: %s") %
' '.join([x.name for x in ordered_recipes]))
+ for r in ordered_recipes:
+ m.message('%s has dependencies that need to be built : %r' % (r, self.cookbook.recipe_has_deps_to_build(r.name)))
+
+ # Recipes built in order, we need a graph
i = 1
- for recipe in ordered_recipes:
+ total = len(ordered_recipes)
+ # slots we will use for threads
+ self.slots = []
+ maxslots = 4
+
+ # iterate while we have recipes to build or builds still running
+ while len(ordered_recipes) or len(self.slots):
+ print "recipes", len(ordered_recipes)
+ print "slots", len(self.slots)
+ self.slotcond.acquire()
+
+ # are we done yet ?
+ if ordered_recipes == []:
+ print "!!!!! Already building %r " % [x.recipe for x in self.slots]
+ print "!!!! Done with recipes, waiting for the end"
+ while not len(self.slots) == 0:
+ self.slotcond.wait()
+ self.slotcond.release()
+ break
+
+ # if the slots are full, wait for a free slot
+ if len(self.slots) >= maxslots:
+ print "!!!!! Already building %r " % [x.recipe for x in self.slots]
+ print "!!!!! waiting for more slots", len(self.slots), maxslots
+ self.slotcond.wait()
+
+ # FIXME : How do we catch exceptions ??
+
+ # find the next recipe we can build without dependencies
+ recipe = None
+ while True:
+ print "Trying to find next recipe without dependencies"
+ for x in ordered_recipes:
+ if not self.cookbook.recipe_has_deps_to_build(x.name):
+ recipe = x
+ ordered_recipes.remove(x)
+ break
+ if recipe is not None:
+ break
+
+ print "Waiting for dependencies to finish building : %r" % ([x.recipe.name for x in self.slots])
+ self.slotcond.wait()
+
try:
- self._cook_recipe(recipe, i, len(ordered_recipes))
+ t = OvenTray(self, recipe, i, total)
+ self.slots.append(t)
+ t.start()
+ self.slotcond.release()
except BuildStepError, be:
if not self.interactive:
raise be
@@ -108,19 +211,25 @@ class Oven (object):
elif action == RecoveryActions.RETRY_ALL:
shutil.rmtree(recipe.get_for_arch (be.arch, 'build_dir'))
self.cookbook.reset_recipe_status(recipe.name)
- self._cook_recipe(recipe, i, len(ordered_recipes))
+ self._cook_recipe(recipe, i, total)
elif action == RecoveryActions.RETRY_STEP:
- self._cook_recipe(recipe, i, len(ordered_recipes))
+ self._cook_recipe(recipe, i, total)
elif action == RecoveryActions.SKIP:
continue
elif action == RecoveryActions.ABORT:
raise AbortedError()
+ except Exception, e:
+ print "!!!!!!! other exception %r" % e
i += 1
+ # Handle exceptions if any
+
def _cook_recipe(self, recipe, count, total):
+ self.lock.acquire()
if not self.cookbook.recipe_needs_build(recipe.name) and \
not self.force:
m.build_step(count, total, recipe.name, _("already built"))
+ self.lock.release()
return
if self.missing_files:
@@ -134,23 +243,29 @@ class Oven (object):
if self.cookbook.step_done(recipe.name, step) and not self.force:
m.action(_("Step done"))
continue
+ if self.stop_building:
+ m.action("Aborting build")
+ self.lock.release()
+ return
try:
+ self.lock.release()
# call step function
stepfunc = getattr(recipe, step)
if not stepfunc:
raise FatalError(_('Step %s not found') % step)
shell.set_logfile_output("%s/%s-%s.log" % (recipe.config.logs, recipe, step))
stepfunc()
- # update status successfully
- self.cookbook.update_step_status(recipe.name, step)
shell.close_logfile_output()
+ self.lock.acquire()
+ self.cookbook.update_step_status(recipe.name, step)
except FatalError, e:
- shell.close_logfile_output(dump=True)
self._handle_build_step_error(recipe, step, e.arch)
except Exception:
+ self.lock.release()
shell.close_logfile_output(dump=True)
raise BuildStepError(recipe, step, traceback.format_exc())
self.cookbook.update_build_status(recipe.name, recipe.built_version())
+ self.lock.release()
if self.missing_files:
self._print_missing_files(recipe, tmp)
@@ -162,6 +277,11 @@ class Oven (object):
# the recipe status to start from scratch next time
shutil.rmtree(recipe.build_dir)
self.cookbook.reset_recipe_status(recipe.name)
+ try:
+ self.lock.release()
+ except ThreadError:
+ pass
+ shell.close_logfile_output(dump=True)
raise BuildStepError(recipe, step, arch=arch)
def _print_missing_files(self, recipe, tmp):
diff --git a/cerbero/commands/build.py b/cerbero/commands/build.py
index a0c62f93..2a93ccb8 100644
--- a/cerbero/commands/build.py
+++ b/cerbero/commands/build.py
@@ -22,7 +22,8 @@ from cerbero.commands import Command, register_command
from cerbero.build.cookbook import CookBook
from cerbero.build.oven import Oven
from cerbero.utils import _, N_, ArgparseArgument
-
+from cerbero.utils import messages as m
+import traceback
class Build(Command):
doc = N_('Build a recipe')
@@ -71,7 +72,14 @@ class Build(Command):
oven = Oven(recipes, cookbook, force=self.force,
no_deps=self.no_deps, missing_files=missing_files,
dry_run=dry_run)
- oven.start_cooking()
+ try:
+ oven.start_cooking()
+ except KeyboardInterrupt:
+ m.error(_('Interrupted'))
+ oven.stop_cooking()
+ except Exception, e:
+ oven.stop_cooking()
+ raise e
class BuildOne(Build):
diff --git a/cerbero/utils/shell.py b/cerbero/utils/shell.py
index 804f33ba..e263a497 100644
--- a/cerbero/utils/shell.py
+++ b/cerbero/utils/shell.py
@@ -28,6 +28,7 @@ import time
import glob
import shutil
import hashlib
+import threading
from cerbero.enums import Platform
from cerbero.utils import _, system_info, to_unixpath
@@ -40,9 +41,20 @@ TAR = 'tar'
PLATFORM = system_info()[0]
-LOGFILE = None # open('/tmp/cerbero.log', 'w+')
+# Thread local logfile
+LOGFILE = threading.local()
+LOGFILE.value = None
+# open('/tmp/cerbero.log', 'w+')
DRY_RUN = False
+_thread_env = threading.local()
+_thread_env.value = None
+
+def set_thread_environ(environ):
+ if environ is not None:
+ _thread_env.value = environ.copy()
+ else:
+ _thread_env.value = None
def set_logfile_output(location):
'''
@@ -53,10 +65,10 @@ def set_logfile_output(location):
'''
global LOGFILE
- if not LOGFILE is None:
+ if _valid_logfile():
raise Exception("Logfile was already open. Forgot to call "
"close_logfile_output() ?")
- LOGFILE = open(location, "w+")
+ LOGFILE.value = open(location, "w+")
def close_logfile_output(dump=False):
@@ -67,22 +79,22 @@ def close_logfile_output(dump=False):
@type dump: bool
'''
global LOGFILE
- if LOGFILE is None:
+ if not _valid_logfile():
raise Exception("No logfile was open")
if dump:
- LOGFILE.seek(0)
+ LOGFILE.value.seek(0)
while True:
- data = LOGFILE.read()
+ data = LOGFILE.value.read()
if data:
print data
else:
break
# if logfile is empty, remove it
- pos = LOGFILE.tell()
- LOGFILE.close()
+ pos = LOGFILE.value.tell()
+ LOGFILE.value.close()
if pos == 0:
- os.remove(LOGFILE.name)
- LOGFILE = None
+ os.remove(LOGFILE.value.name)
+ LOGFILE.value = None
class StdOut:
@@ -107,7 +119,23 @@ def _fix_mingw_cmd(path):
l_path[i] = '/'
return ''.join(l_path)
+def _valid_logfile():
+ global LOGFILE
+ try:
+ x = LOGFILE.value
+ return x is not None
+ except:
+ return False
+def _get_environ():
+ try:
+ env = _thread_env.value
+ if env is None:
+ env = os.environ.copy()
+ except:
+ env = os.environ.copy()
+ return env
+
def call(cmd, cmd_dir='.', fail=True, verbose=False):
'''
Run a shell command
@@ -119,13 +147,20 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False):
@param fail: whether or not to raise an exception if the command fails
@type fail: bool
'''
+ env = _get_environ()
+ global LOGFILE
+ if _valid_logfile():
+ stream = LOGFILE.value
+ else:
+ stream = sys.stdout
+
try:
if LOGFILE is None:
if verbose:
m.message("Running command '%s'" % cmd)
- else:
- LOGFILE.write("Running command '%s'\n" % cmd)
- LOGFILE.flush()
+ elif _valid_logfile():
+ LOGFILE.value.write("Running command '%s'\n" % cmd)
+ LOGFILE.value.flush()
shell = True
if PLATFORM == Platform.WINDOWS:
# windows do not understand ./
@@ -137,7 +172,6 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False):
cmd = _fix_mingw_cmd(cmd)
# Disable shell which uses cmd.exe
shell = False
- stream = LOGFILE or sys.stdout
if DRY_RUN:
# write to sdterr so it's filtered more easilly
m.error("cd %s && %s && cd %s" % (cmd_dir, cmd, os.getcwd()))
@@ -146,7 +180,7 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False):
ret = subprocess.check_call(cmd, cwd=cmd_dir,
stderr=subprocess.STDOUT,
stdout=StdOut(stream),
- env=os.environ.copy(), shell=shell)
+ env=env, shell=shell)
except subprocess.CalledProcessError:
if fail:
raise FatalError(_("Error running command: %s") % cmd)
@@ -155,13 +189,16 @@ def call(cmd, cmd_dir='.', fail=True, verbose=False):
return ret
-def check_call(cmd, cmd_dir=None, shell=False, split=True, fail=False):
- if split and isinstance(cmd, str):
- cmd = shlex.split(cmd)
+def check_call(cmd, cmd_dir=None, shell=False, split=True, fail=False, env=None):
try:
+ if env is None:
+ env = _get_environ()
+ if split and isinstance(cmd, str):
+ cmd = shlex.split(cmd)
process = subprocess.Popen(cmd, cwd=cmd_dir,
stdout=subprocess.PIPE,
- stderr=open(os.devnull), shell=shell)
+ stderr=open(os.devnull), shell=shell,
+ env=env)
output, unused_err = process.communicate()
if process.poll() and fail:
raise Exception()
@@ -233,7 +270,7 @@ def download(url, destination=None, recursive=False, check_cert=True, overwrite=
cmd += " --no-check-certificate"
if not recursive and not overwrite and os.path.exists(destination):
- if LOGFILE is None:
+ if LOGFILE.value is None:
logging.info("File %s already downloaded." % destination)
else:
if not recursive and not os.path.exists(os.path.dirname(destination)):
@@ -241,8 +278,8 @@ def download(url, destination=None, recursive=False, check_cert=True, overwrite=
elif recursive and not os.path.exists(destination):
os.makedirs(destination)
- if LOGFILE:
- LOGFILE.write("Downloading %s\n" % url)
+ if LOGFILE.value:
+ LOGFILE.value.write("Downloading %s\n" % url)
else:
logging.info("Downloading %s", url)
try: