From 2a2d2b77c52235b89b72d9ade0f9befc03281b3e Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Thu, 19 Sep 2019 17:06:15 +1000 Subject: oven: build recipes in parallel --- cerbero/bootstrap/__init__.py | 4 +- cerbero/bootstrap/android.py | 2 +- cerbero/bootstrap/build_tools.py | 4 +- cerbero/bootstrap/ios.py | 2 +- cerbero/bootstrap/linux.py | 2 +- cerbero/bootstrap/osx.py | 2 +- cerbero/bootstrap/windows.py | 2 +- cerbero/build/oven.py | 235 +++++++++++++++++++++++++++++++++------ cerbero/build/recipe.py | 41 +++++-- cerbero/commands/bootstrap.py | 13 ++- cerbero/commands/build.py | 9 +- cerbero/commands/package.py | 10 +- 12 files changed, 265 insertions(+), 61 deletions(-) diff --git a/cerbero/bootstrap/__init__.py b/cerbero/bootstrap/__init__.py index ac77423b..3586b01d 100644 --- a/cerbero/bootstrap/__init__.py +++ b/cerbero/bootstrap/__init__.py @@ -61,10 +61,10 @@ class BootstrapperBase (object): 'Fetch build-tools recipes; only called by fetch-bootstrap' pass - def extract(self): + async def extract(self): for (url, unpack, unpack_dir) in self.extract_steps: if unpack: - self.sources[url].extract_tarball(unpack_dir) + await self.sources[url].extract_tarball(unpack_dir) else: # Just copy the file as-is fname = os.path.basename(url) diff --git a/cerbero/bootstrap/android.py b/cerbero/bootstrap/android.py index 6875b2ad..e003f8fc 100644 --- a/cerbero/bootstrap/android.py +++ b/cerbero/bootstrap/android.py @@ -42,7 +42,7 @@ class AndroidBootstrapper (BootstrapperBase): self.fetch_urls.append((url, NDK_CHECKSUMS[os.path.basename(url)])) self.extract_steps.append((url, True, self.prefix)) - def start(self): + def start(self, jobs=0): if not os.path.exists(self.prefix): os.makedirs(self.prefix) ndkdir = os.path.join(self.prefix, 'android-ndk-' + NDK_VERSION) diff --git a/cerbero/bootstrap/build_tools.py b/cerbero/bootstrap/build_tools.py index 5d105acf..219518d5 100644 --- a/cerbero/bootstrap/build_tools.py +++ b/cerbero/bootstrap/build_tools.py @@ -102,8 +102,8 @@ class BuildTools (BootstrapperBase, Fetch): self.recipes = self.BUILD_TOOLS self.recipes += self.PLAT_BUILD_TOOLS.get(self.config.platform, []) - def start(self): - oven = Oven(self.recipes, self.cookbook) + def start(self, jobs=0): + oven = Oven(self.recipes, self.cookbook, jobs=jobs) oven.start_cooking() def fetch_recipes(self, jobs): diff --git a/cerbero/bootstrap/ios.py b/cerbero/bootstrap/ios.py index 8db9c240..1870852a 100644 --- a/cerbero/bootstrap/ios.py +++ b/cerbero/bootstrap/ios.py @@ -33,7 +33,7 @@ class IOSBootstrapper (BootstrapperBase): def __init__(self, config, offline, assume_yes): super().__init__(config, offline) - def start(self): + def start(self, jobs=0): # FIXME: enable it when buildbots are properly configured return diff --git a/cerbero/bootstrap/linux.py b/cerbero/bootstrap/linux.py index e2149e11..74028fcc 100644 --- a/cerbero/bootstrap/linux.py +++ b/cerbero/bootstrap/linux.py @@ -37,7 +37,7 @@ class UnixBootstrapper (BootstrapperBase): BootstrapperBase.__init__(self, config, offline) self.assume_yes = assume_yes - def start(self): + def start(self, jobs=0): for c in self.checks: c() diff --git a/cerbero/bootstrap/osx.py b/cerbero/bootstrap/osx.py index 4bcf5cd6..f839a01c 100644 --- a/cerbero/bootstrap/osx.py +++ b/cerbero/bootstrap/osx.py @@ -35,7 +35,7 @@ class OSXBootstrapper (BootstrapperBase): url = CPANM_URL_TPL.format(CPANM_VERSION) self.fetch_urls.append((url, CPANM_CHECKSUM)) - def start(self): + def start(self, jobs=0): # skip system package install if not needed if not self.config.distro_packages_install: return diff --git a/cerbero/bootstrap/windows.py b/cerbero/bootstrap/windows.py index 2d372164..d432d22f 100644 --- a/cerbero/bootstrap/windows.py +++ b/cerbero/bootstrap/windows.py @@ -97,7 +97,7 @@ class WindowsBootstrapper(BootstrapperBase): self.fetch_urls.append((url, checksum)) self.extract_steps.append((url, True, self.prefix)) - def start(self): + def start(self, jobs=0): if not git.check_line_endings(self.config.platform): raise ConfigurationError("git is configured to use automatic line " "endings conversion. You can fix it running:\n" diff --git a/cerbero/build/oven.py b/cerbero/build/oven.py index a6e6c386..26e48969 100644 --- a/cerbero/build/oven.py +++ b/cerbero/build/oven.py @@ -26,7 +26,7 @@ from subprocess import CalledProcessError from cerbero.enums import Architecture, Platform, LibraryType from cerbero.errors import BuildStepError, FatalError, AbortedError from cerbero.build.recipe import Recipe, BuildSteps -from cerbero.utils import _, N_, shell, run_until_complete +from cerbero.utils import _, N_, shell, run_until_complete, determine_num_of_cpus from cerbero.utils import messages as m import inspect @@ -69,7 +69,7 @@ class Oven (object): ''' def __init__(self, recipes, cookbook, force=False, no_deps=False, - missing_files=False, dry_run=False, deps_only=False): + missing_files=False, dry_run=False, deps_only=False, jobs=None): if isinstance(recipes, Recipe): recipes = [recipes] self.recipes = recipes @@ -81,6 +81,11 @@ class Oven (object): self.interactive = self.config.interactive self.deps_only = deps_only shell.DRY_RUN = dry_run + self._build_lock = asyncio.Semaphore(2) + self._install_lock = asyncio.Lock() + self.jobs = jobs + if not self.jobs: + self.jobs = determine_num_of_cpus() def start_cooking(self): ''' @@ -104,40 +109,183 @@ class Oven (object): m.message(_("Building the following recipes: %s") % ' '.join([x.name for x in ordered_recipes])) - i = 1 self._static_libraries_built = [] - for recipe in ordered_recipes: - try: - self._cook_recipe(recipe, i, len(ordered_recipes)) - except BuildStepError as be: - if not self.interactive: - raise be - msg = be.msg - msg += _("Select an action to proceed:") - action = shell.prompt_multiple(msg, RecoveryActions()) - if action == RecoveryActions.SHELL: - shell.enter_build_environment(self.config.target_platform, - be.arch, recipe.get_for_arch (be.arch, 'build_dir'), env=recipe.config.env) - raise be - elif action == RecoveryActions.RETRY_ALL: - shutil.rmtree(recipe.get_for_arch (be.arch, 'build_dir')) - self.cookbook.reset_recipe_status(recipe.name) - self._cook_recipe(recipe, i, len(ordered_recipes)) - elif action == RecoveryActions.RETRY_STEP: - self._cook_recipe(recipe, i, len(ordered_recipes)) - elif action == RecoveryActions.SKIP: - i += 1 + run_until_complete(self._cook_recipes(ordered_recipes)) + + async def _cook_recipes(self, recipes): + recipes = set(recipes) + built_recipes = set() # recipes we have successfully built + building_recipes = set() # recipes that are queued or are in progress + + def all_deps_without_recipe(recipe_name): + return set((dep.name for dep in self.cookbook.list_recipe_deps(recipe_name) if recipe_name != dep.name)) + + all_deps = set() + for recipe in recipes: + [all_deps.add(dep) for dep in all_deps_without_recipe(recipe.name)] + + # handle the 'buildone' case by adding all the recipe deps to the built + # list if they are not in the recipe list + if self.no_deps: + [built_recipes.add(dep) for dep in (all_deps - set((r.name for r in recipes)))] + else: + [recipes.add(self.cookbook.get_recipe(dep)) for dep in all_deps] + + # final targets. The set of recipes with no reverse dependencies + recipe_targets = set((r.name for r in recipes)) - all_deps + + # precompute the deps for each recipe + recipe_deps = {} + for r in set(set((r.name for r in recipes)) | all_deps): + deps = all_deps_without_recipe(r) + recipe_deps[r] = deps + + def find_recipe_dep_path(from_name, to_name): + # returns a list of recipe names in reverse order that describes + # the path for building @from_name + # None if there is no path + if from_name == to_name: + return [to_name] + for dep in recipe_deps[from_name]: + val = find_recipe_dep_path(dep, to_name) + if val: + return [from_name] + val + + def find_longest_path(to_recipes): + # return the longest path from the targets to one of @to_recipes + def yield_path_lengths(): + for f in recipe_targets: + for t in to_recipes: + path = find_recipe_dep_path(f, t) + if path: + yield len(path) + return max((l for l in yield_path_lengths())) + + def find_buildable_recipes(): + # This is a dumb algorithm that only looks for all available + # recipes that can be built. We use a priority queue for + # the smarts. + for recipe in recipes: + if recipe.name in built_recipes: + continue + if recipe.name in building_recipes: continue - elif action == RecoveryActions.ABORT: - raise AbortedError() - i += 1 - def _cook_recipe(self, recipe, count, total): + if len(all_deps_without_recipe(recipe.name)) == 0: + yield recipe + continue + + built_deps = set((dep for dep in all_deps_without_recipe(recipe.name) if dep in built_recipes)) + if len(built_deps) > 0 and built_deps == set(all_deps_without_recipe(recipe.name)): + # we have a new dep buildable + yield recipe + + class MutableInt: + def __init__(self): + self.i = 0 + + counter = MutableInt() + tasks = [] + + loop = asyncio.get_event_loop() + queue = asyncio.PriorityQueue(loop=loop) + + class RecipePriority: + # can't use a tuple as Recipe doens't implement __lt__() as + # required by PriorityQueue + def __init__(self, recipe): + self.recipe = recipe + self.path_length = find_longest_path((recipe.name,)) + + def __lt__(self, other): + # return lower for larger path lengths + return self.path_length > other.path_length + + main_task = asyncio.current_task() + async def shutdown(loop): + # a little heavy handed but we need to do this otherwise some + # tasks will continue even after an exception is thrown + kill_tasks = [t for t in asyncio.all_tasks() if t not in (main_task, asyncio.current_task())] + [task.cancel() for task in kill_tasks] + await asyncio.gather(*kill_tasks, return_exceptions=True) + + async def cook_recipe_worker(queue): + # the main worker task + while True: + recipe_d = await queue.get() + recipe = recipe_d.recipe + counter.i += 1 + await self._cook_recipe_with_prompt(recipe, counter.i, len(recipes)) + built_recipes.add(recipe.name) + building_recipes.remove(recipe.name) + for buildable in find_buildable_recipes (): + building_recipes.add(buildable.name) + queue.put_nowait(RecipePriority(buildable)) + queue.task_done() + + # push the initial set of recipes that have no dependencies to start + # building + for recipe in find_buildable_recipes (): + building_recipes.add(recipe.name) + queue.put_nowait(RecipePriority(recipe)) + + for i in range(self.jobs): + task = asyncio.create_task(cook_recipe_worker(queue)) + tasks.append(task) + + class QueueDone(Exception): + pass + + async def queue_done(queue): + # This is how we exit the asyncio.wait once everything is done + # as otherwise asyncio.wait will wait for our tasks to complete + while built_recipes & recipe_targets != recipe_targets: + await queue.join() + raise QueueDone() + + task = asyncio.create_task (queue_done(queue)) + tasks.append(task) + try: + await asyncio.gather(*tasks, return_exceptions=False) + except asyncio.CancelledError: + pass + except QueueDone: + await shutdown(loop) + except Exception: + await shutdown(loop) + raise + + async def _cook_recipe_with_prompt(self, recipe, count, total): + try: + await self._cook_recipe(recipe, count, total) + except BuildStepError as be: + if not self.interactive: + raise be + msg = be.msg + msg += _("Select an action to proceed:") + action = shell.prompt_multiple(msg, RecoveryActions()) + if action == RecoveryActions.SHELL: + shell.enter_build_environment(self.config.target_platform, + be.arch, recipe.get_for_arch (be.arch, 'build_dir'), env=recipe.config.env) + raise be + elif action == RecoveryActions.RETRY_ALL: + shutil.rmtree(recipe.get_for_arch (be.arch, 'build_dir')) + self.cookbook.reset_recipe_status(recipe.name) + await self._cook_recipe(recipe, count, total) + elif action == RecoveryActions.RETRY_STEP: + await self._cook_recipe(recipe, count, total) + elif action == RecoveryActions.SKIP: + pass + elif action == RecoveryActions.ABORT: + raise AbortedError() + + async def _cook_recipe(self, recipe, count, total): # A Recipe depending on a static library that has been rebuilt # also needs to be rebuilt to pick up the latest build. if recipe.library_type != LibraryType.STATIC: if len(set(self._static_libraries_built) & set(recipe.deps)) != 0: self.cookbook.reset_recipe_status(recipe.name) + if not self.cookbook.recipe_needs_build(recipe.name) and \ not self.force: m.build_step(count, total, recipe.name, _("already built")) @@ -149,22 +297,45 @@ class Oven (object): recipe.force = self.force for desc, step in recipe.steps: - m.build_step(count, total, recipe.name, step) # check if the current step needs to be done if self.cookbook.step_done(recipe.name, step) and not self.force: + m.build_step(count, total, recipe.name, step) m.action(_("Step done")) continue try: # call step function stepfunc = getattr(recipe, step) if not stepfunc: + m.build_step(count, total, recipe.name, step) raise FatalError(_('Step %s not found') % step) - if asyncio.iscoroutinefunction(stepfunc): - run_until_complete(stepfunc()) + + lock = None + if step == BuildSteps.COMPILE[1] \ + and hasattr(recipe, "allow_parallel_build") \ + and recipe.allow_parallel_build: + # only allow a limited number of recipes that can fill all + # CPU cores to execute concurrently. Any recipe that does + # not support parallel builds will always be executed + lock = self._build_lock + if step in (BuildSteps.INSTALL[1], BuildSteps.POST_INSTALL[1]): + # only allow a single install to occur + lock = self._install_lock + + if lock: + async with lock: + m.build_step(count, total, recipe.name, step) + ret = stepfunc() + if asyncio.iscoroutine(ret): + await ret else: - stepfunc() + m.build_step(count, total, recipe.name, step) + ret = stepfunc() + if asyncio.iscoroutine(ret): + await ret # update status successfully self.cookbook.update_step_status(recipe.name, step) + except asyncio.CancelledError: + raise except FatalError as e: exc_traceback = sys.exc_info()[2] trace = '' diff --git a/cerbero/build/recipe.py b/cerbero/build/recipe.py index 79435229..4ca3e019 100644 --- a/cerbero/build/recipe.py +++ b/cerbero/build/recipe.py @@ -35,7 +35,7 @@ from cerbero.ide.vs.genlib import GenLib, GenGnuLib from cerbero.tools.osxuniversalgenerator import OSXUniversalGenerator from cerbero.tools.osxrelocator import OSXRelocator from cerbero.utils import N_, _ -from cerbero.utils import shell, add_system_libs, run_until_complete +from cerbero.utils import shell, add_system_libs from cerbero.utils import messages as m from cerbero.tools.libtool import LibtoolLibrary @@ -740,7 +740,11 @@ class MetaUniversalRecipe(type): def __init__(cls, name, bases, ns): step_func = ns.get('_do_step') for _, step in BuildSteps(): - setattr(cls, step, lambda self, name=step: step_func(self, name)) + async def doit(recipe, step_name=step): + ret = step_func(recipe, step_name) + if asyncio.iscoroutine(ret): + await ret + setattr(cls, step, doit) class BaseUniversalRecipe(object, metaclass=MetaUniversalRecipe): @@ -830,18 +834,35 @@ class UniversalRecipe(BaseUniversalRecipe, UniversalFilesProvider): return [] return self._proxy_recipe.steps[:] - def _do_step(self, step): - if step in BuildSteps.FETCH: + async def _do_step(self, step): + if step == BuildSteps.FETCH[1]: arch, recipe = list(self._recipes.items())[0] - run_until_complete(self._async_run_step(recipe, step, arch)) + await self._async_run_step(recipe, step, arch) return + tasks = [] for arch, recipe in self._recipes.items(): stepfunc = getattr(recipe, step) if asyncio.iscoroutinefunction(stepfunc): - run_until_complete(self._async_run_step(recipe, step, arch)) + if step in (BuildSteps.CONFIGURE[1], BuildSteps.EXTRACT[1]): + tasks.append(asyncio.create_task(self._async_run_step(recipe, step, arch))) + else: + await self._async_run_step(recipe, step, arch) else: self._run_step(recipe, step, arch) + if tasks: + try: + await asyncio.gather(*tasks, return_exceptions=False) + except Exception as e: + [task.cancel() for task in tasks] + ret = await asyncio.gather(*tasks, return_exceptions=True) + # we want to find the actuall exception rather than one + # that may be returned from task.cancel() + if not isinstance(e, asyncio.CancelledError): + raise e + for e in ret: + if not isinstance(e, asyncio.CancelledError): + raise e class UniversalFlatRecipe(BaseUniversalRecipe, UniversalFlatFilesProvider): @@ -888,11 +909,11 @@ class UniversalFlatRecipe(BaseUniversalRecipe, UniversalFlatFilesProvider): for f, archs in arch_files.items(): generator.merge_files([f], [os.path.join(self._config.prefix, arch) for arch in archs]) - def _do_step(self, step): + async def _do_step(self, step): if step in BuildSteps.FETCH: arch, recipe = list(self._recipes.items())[0] # No, really, let's not download a million times... - run_until_complete(self._async_run_step(recipe, step, arch)) + await self._async_run_step(recipe, step, arch) return # For the universal build we need to configure both architectures with @@ -905,7 +926,7 @@ class UniversalFlatRecipe(BaseUniversalRecipe, UniversalFlatFilesProvider): # Create a stamp file to list installed files based on the # modification time of this file if step in [BuildSteps.INSTALL[1], BuildSteps.POST_INSTALL[1]]: - time.sleep(2) #wait 2 seconds to make sure new files get the + await asyncio.sleep(2) #wait 2 seconds to make sure new files get the #proper time difference, this fixes an issue of #the next recipe to be built listing the previous #recipe files as their own @@ -918,7 +939,7 @@ class UniversalFlatRecipe(BaseUniversalRecipe, UniversalFlatFilesProvider): # Call the step function stepfunc = getattr(recipe, step) if asyncio.iscoroutinefunction(stepfunc): - run_until_complete(self._async_run_step(recipe, step, arch)) + await self._async_run_step(recipe, step, arch) else: self._run_step(recipe, step, arch) diff --git a/cerbero/commands/bootstrap.py b/cerbero/commands/bootstrap.py index 551b35a2..6e1b2f7d 100644 --- a/cerbero/commands/bootstrap.py +++ b/cerbero/commands/bootstrap.py @@ -39,20 +39,25 @@ class Bootstrap(Command): ArgparseArgument('--offline', action='store_true', default=False, help=_('Use only the source cache, no network')), ArgparseArgument('-y', '--assume-yes', action='store_true', - default=False, help=('Automatically say yes to prompts and run non-interactively'))] + default=False, help=('Automatically say yes to prompts and run non-interactively')), + ArgparseArgument('--jobs', '-j', action='store', type=int, + default=0, help=_('How many recipes to build concurrently. ' + '0 = number of CPUs.'))] Command.__init__(self, args) def run(self, config, args): bootstrappers = Bootstrapper(config, args.build_tools_only, args.offline, args.assume_yes, args.system_only) tasks = [] + async def bootstrap_fetch_extract(bs): + await bs.fetch() + await bs.extract() for bootstrapper in bootstrappers: - tasks.append(bootstrapper.fetch()) + tasks.append(bootstrap_fetch_extract(bootstrapper)) run_until_complete(tasks) for bootstrapper in bootstrappers: - bootstrapper.extract() - bootstrapper.start() + bootstrapper.start(jobs=args.jobs) class FetchBootstrap(Command): diff --git a/cerbero/commands/build.py b/cerbero/commands/build.py index 3ee16aec..c0d2a4c0 100644 --- a/cerbero/commands/build.py +++ b/cerbero/commands/build.py @@ -41,6 +41,9 @@ class Build(Command): help=_('only print commands instead of running them ')), ArgparseArgument('--offline', action='store_true', default=False, help=_('Use only the source cache, no network')), + ArgparseArgument('--jobs', '-j', action='store', type=int, + default=0, help=_('How many recipes to build concurrently. ' + '0 = number of CPUs.')), ] if force is None: args.append( @@ -66,17 +69,17 @@ class Build(Command): self.no_deps = args.no_deps self.runargs(config, args.recipe, args.missing_files, self.force, self.no_deps, dry_run=args.dry_run, offline=args.offline, - deps_only=self.deps_only) + deps_only=self.deps_only, jobs=args.jobs) def runargs(self, config, recipes, missing_files=False, force=False, no_deps=False, cookbook=None, dry_run=False, offline=False, - deps_only=False): + deps_only=False, jobs=None): if cookbook is None: cookbook = CookBook(config, offline=offline) oven = Oven(recipes, cookbook, force=self.force, no_deps=self.no_deps, missing_files=missing_files, - dry_run=dry_run, deps_only=deps_only) + dry_run=dry_run, deps_only=deps_only, jobs=jobs) oven.start_cooking() diff --git a/cerbero/commands/package.py b/cerbero/commands/package.py index 40f14483..0afd15f0 100644 --- a/cerbero/commands/package.py +++ b/cerbero/commands/package.py @@ -66,6 +66,9 @@ class Package(Command): ArgparseArgument('--xz', action='store_true', default=False, help=_('Use xz instead of bzip2 for compression if ' 'creating a tarball')), + ArgparseArgument('--jobs', '-j', action='store', type=int, + default=0, help=_('How many recipes to build concurrently. ' + '0 = number of CPUs.')), ]) def run(self, config, args): @@ -77,7 +80,7 @@ class Package(Command): "--only-build-deps")) if not args.skip_deps_build: - self._build_deps(config, p, args.no_devel, args.offline, args.dry_run) + self._build_deps(config, p, args.no_devel, args.offline, args.dry_run, args.jobs) if args.only_build_deps or args.dry_run: return @@ -111,10 +114,11 @@ class Package(Command): m.action(_("Package successfully created in %s") % ' '.join([os.path.abspath(x) for x in paths])) - def _build_deps(self, config, package, has_devel, offline, dry_run): + def _build_deps(self, config, package, has_devel, offline, dry_run, jobs): build_command = build.Build() build_command.runargs(config, package.recipes_dependencies(has_devel), - cookbook=self.store.cookbook, dry_run=dry_run, offline=offline) + cookbook=self.store.cookbook, dry_run=dry_run, offline=offline, + jobs=jobs) register_command(Package) -- cgit v1.2.3