From c9c2e0d947448ca1802af80a02fa17ea454e998b Mon Sep 17 00:00:00 2001 From: Martin Peres Date: Thu, 15 Jun 2017 19:21:05 +0300 Subject: smartezbench: rework the locking to make it reentrant This allows hooks to call functions that may want to reload the state. --- python-modules/ezbench/smartezbench.py | 396 +++++++++++++++++++++------------ 1 file changed, 252 insertions(+), 144 deletions(-) diff --git a/python-modules/ezbench/smartezbench.py b/python-modules/ezbench/smartezbench.py index a719856..c6dc300 100644 --- a/python-modules/ezbench/smartezbench.py +++ b/python-modules/ezbench/smartezbench.py @@ -194,7 +194,67 @@ class SmartEzbenchAttributes(Enum): report_deadline_soft = 401 report_deadline_hard = 402 +class StateLock: + def __init__(self, lock_path): + self.lock_path = lock_path + self.rlock = threading.RLock() + self.lock_fd = None + + # Track the count ourselves, since rlock does not allow us to query it + self._ref_cnt = 0 + self._ref_cnt_lock = threading.Lock() + + def __del__(self): + with self._ref_cnt_lock: + if self._ref_cnt > 0: + print("WARNING: StateLock instance is destroyed with ref_cnt > 0") + + def ref_cnt(self): + with self._ref_cnt_lock: + return self._ref_cnt + + def acquire(self): + # First, make sure to acquire the process lock, which prevents multiple + # threads from doing concurrent accesses + self.rlock.acquire() + + # Keep track of the number of times the lock has been acquired + with self._ref_cnt_lock: + self._ref_cnt += 1 + + if self._ref_cnt == 1: + try: + self.lock_fd = open(self.lock_path, 'w') + fcntl.flock(self.lock_fd, fcntl.LOCK_EX) + return True + except IOError as e: + print("Could not lock the report: " + str(e)) + self._ref_cnt = 0 + self.rlock.release() + raise ValueError("Can't lock the report") + + def release(self): + # Keep track of the number of times the lock has been acquired + with self._ref_cnt_lock: + self._ref_cnt -= 1 + + if self._ref_cnt == 0: + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + self.lock_fd.close() + self.lock_fd = None +
except Exception as e: + print("Cannot release the lock: " + str(e)) + pass + + # Finally, release the process lock, which prevents multiple threads from + # doing concurrent accesses + self.rlock.release() + class SmartEzbench: + state_locks = dict() + state_locks_lock = threading.Lock() + @classmethod def list_reports(cls, ezbench_dir, updatedSince = 0): log_dir = ezbench_dir + '/logs' @@ -228,6 +288,14 @@ class SmartEzbench: self._first_run = False self._deleted = False + # Make sure we do not use two different instances of StateLock for the + # same report + with self.state_locks_lock: + self.state_lock = self.state_locks.get(report_name, None) + if self.state_lock is None: + self.state_locks[report_name] = StateLock(self.smart_ezbench_lock) + self.state_lock = self.state_locks[report_name] + self.state = dict() self.state['commits'] = dict() self.state['mode'] = RunningMode.INITIAL.value @@ -264,15 +332,16 @@ class SmartEzbench: def delete(self): self.__grab_lock() - if not shutil.rmtree.avoids_symlink_attacks: - self.__log(Criticality.WW, "Deleting the report unsafely (symlink attack)") - else: - self.__log(Criticality.II, "Deleting the report({}) safely".format(self.log_folder)) - shutil.rmtree(self.log_folder) - - self._deleted = True + try: + if not shutil.rmtree.avoids_symlink_attacks: + self.__log(Criticality.WW, "Deleting the report unsafely (symlink attack)") + else: + self.__log(Criticality.II, "Deleting the report({}) safely".format(self.log_folder)) + shutil.rmtree(self.log_folder) - self._deleted = True + finally: + self.__release_lock() def __log(self, error, msg): time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -316,25 +385,14 @@ class SmartEzbench: def __grab_lock(self): if self.readonly: return - - try: - self.lock_fd = open(self.smart_ezbench_lock, 'w') - fcntl.flock(self.lock_fd, fcntl.LOCK_EX) - return True - except IOError as e: - self.__log(Criticality.EE, "Could not lock the report: " + str(e)) - raise
ValueError("Can't lock the report") + else: + self.state_lock.acquire() def __release_lock(self): if self.readonly: return - - try: - fcntl.flock(self.lock_fd, fcntl.LOCK_UN) - self.lock_fd.close() - except Exception as e: - self.__log(Criticality.EE, "Cannot release the lock: " + str(e)) - pass + else: + self.state_lock.release() def __update_state_version__(self): upgraded = False @@ -393,9 +451,15 @@ class SmartEzbench: pass return False - def __reload_state(self, keep_lock = False): + def __reload_state(self, keep_lock=False): self.__grab_lock() - ret = self.__reload_state_unlocked() + + if self.state_lock.ref_cnt() == 1: + ret = self.__reload_state_unlocked() + else: + print("WARNING: Recursive call to __reload_state()") + ret = True + if not keep_lock: self.__release_lock() return ret @@ -424,6 +488,8 @@ class SmartEzbench: if profile is None: profile = self.__read_attribute_unlocked__('profile') + if profile is None: + return None runner.set_profile(profile) for conf_script in self.__read_attribute_unlocked__('conf_scripts', []): @@ -449,9 +515,12 @@ class SmartEzbench: return False def __write_attribute__(self, attr, value, allow_updates = False): + ret = False self.__reload_state(keep_lock=True) - ret = self.__write_attribute_unlocked__(attr, value, allow_updates) - self.__release_lock() + try: + ret = self.__write_attribute_unlocked__(attr, value, allow_updates) + finally: + self.__release_lock() return ret def __running_mode_unlocked__(self, check_running = True): @@ -463,9 +532,12 @@ class SmartEzbench: return RunningMode(mode) def running_mode(self, check_running = True): + ret = RunningMode.INITIAL self.__reload_state(keep_lock=True) - ret = self.__running_mode_unlocked__(check_running) - self.__release_lock() + try: + ret = self.__running_mode_unlocked__(check_running) + finally: + self.__release_lock() return ret def __set_running_mode_unlocked__(self, mode): @@ -490,8 +562,10 @@ class SmartEzbench: return False self.__reload_state(keep_lock=True) - 
self.__set_running_mode_unlocked__(mode) - self.__release_lock() + try: + self.__set_running_mode_unlocked__(mode) + finally: + self.__release_lock() return True @@ -505,28 +579,30 @@ class SmartEzbench: def set_profile(self, profile): ret = False self.__reload_state(keep_lock=True) - if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: - # Check that the profile exists! - try: - runner = self.__create_ezbench(profile=profile) + try: + if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: + # Check that the profile exists! + try: + runner = self.__create_ezbench(profile=profile) - self.state['profile'] = profile - self.__log(Criticality.II, "Ezbench profile set to '{profile}'".format(profile=profile)) - self.__save_state() + self.state['profile'] = profile + self.__log(Criticality.II, "Ezbench profile set to '{profile}'".format(profile=profile)) + self.__save_state() + ret = True + except RunnerError as e: + if e.args[0]['err_code'] == RunnerErrorCode.CMD_PROFILE_INVALID: + self.__log(Criticality.EE, + "Invalid profile name '{}'.".format(profile)) + else: + self.__log(Criticality.EE, + "The following error arose '{}({})'.".format(e.args[0]['err_code'], + e.args[0]['err_str'])) + elif profile == self.state['profile']: ret = True - except RunnerError as e: - if e.args[0]['err_code'] == RunnerErrorCode.CMD_PROFILE_INVALID: - self.__log(Criticality.EE, - "Invalid profile name '{}'.".format(profile)) - else: - self.__log(Criticality.EE, - "The following error arose '{}({})'.".format(e.args[0]['err_code'], - e.args[0]['err_str'])) - elif profile == self.state['profile']: - ret = True - else: - self.__log(Criticality.EE, "You cannot change the profile of a report that already has results. Start a new one.") - self.__release_lock() + else: + self.__log(Criticality.EE, "You cannot change the profile of a report that already has results. 
Start a new one.") + finally: + self.__release_lock() return ret @@ -535,31 +611,35 @@ class SmartEzbench: def add_conf_script(self, conf_script): self.__reload_state(keep_lock=True) - if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: - if "conf_scripts" not in self.state: - self.state['conf_scripts'] = list() + try: + if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: + if "conf_scripts" not in self.state: + self.state['conf_scripts'] = list() - if conf_script not in self.state['conf_scripts']: - self.__log(Criticality.II, "Add configuration script '{0}'".format(conf_script)) - self.state['conf_scripts'].append(conf_script) - self.__save_state() - else: - self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.") - self.__release_lock() + if conf_script not in self.state['conf_scripts']: + self.__log(Criticality.II, "Add configuration script '{0}'".format(conf_script)) + self.state['conf_scripts'].append(conf_script) + self.__save_state() + else: + self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.") + finally: + self.__release_lock() def remove_conf_script(self, conf_script): self.__reload_state(keep_lock=True) - if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: - if "conf_scripts" in self.state: - try: - self.state['conf_scripts'].remove(conf_script) - self.__log(Criticality.II, "Remove configuration script '{0}'".format(conf_script)) - self.__save_state() - except: - pass - else: - self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. 
Start a new one.") - self.__release_lock() + try: + if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False: + if "conf_scripts" in self.state: + try: + self.state['conf_scripts'].remove(conf_script) + self.__log(Criticality.II, "Remove configuration script '{0}'".format(conf_script)) + self.__save_state() + except: + pass + else: + self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.") + finally: + self.__release_lock() def commit_url(self): return self.__read_attribute__('commit_url') @@ -625,27 +705,35 @@ class SmartEzbench: def add_test(self, commit, test, rounds = None, user_requested=True): self.__reload_state(keep_lock=True) - source = "user" if user_requested else "machine" - self.__log(Criticality.II, "Add the {}-requested test {} ({} rounds)".format(source, test, rounds)) - total_rounds = self.__add_test_unlocked__(commit, test, rounds) - self.__save_state() - self.__release_lock() + total_rounds = 0 + try: + source = "user" if user_requested else "machine" + self.__log(Criticality.II, "Add the {}-requested test {} ({} rounds)".format(source, test, rounds)) + total_rounds = self.__add_test_unlocked__(commit, test, rounds) + self.__save_state() + finally: + self.__release_lock() return total_rounds def add_testset(self, commit, testset, rounds = 1, ensure=False, user_requested=True): self.__reload_state(keep_lock=True) - self.__log(Criticality.II, "Add the testset {} ({} tests)".format(testset.name, - len(testset))) + try: + self.__log(Criticality.II, "Add the testset {} ({} tests)".format(testset.name, + len(testset))) - for test in sorted(testset.keys()): - if not ensure: - self.__add_test_unlocked__(commit, test, testset[test] * rounds, user_requested) - else: - self.__force_test_rounds_unlocked__(commit, test, testset[test] * rounds, user_requested) + for test in sorted(testset.keys()): + if not ensure: + self.__add_test_unlocked__(commit, test, testset[test] * 
rounds, + user_requested) + else: + self.__force_test_rounds_unlocked__(commit, test, + testset[test] * rounds, + user_requested) - self.__save_state() - self.__release_lock() + self.__save_state() + finally: + self.__release_lock() def __force_test_rounds_unlocked__(self, commit, test, at_least, user_requested=True): scm = self.repo() @@ -679,20 +767,26 @@ class SmartEzbench: return 0 def force_test_rounds(self, commit, test, at_least, user_requested=True): + ret = 0 + self.__reload_state(keep_lock=True) - ret = self.__force_test_rounds_unlocked__(commit, test, at_least, user_requested) - self.__save_state() - self.__release_lock() + try: + ret = self.__force_test_rounds_unlocked__(commit, test, at_least, user_requested) + self.__save_state() + finally: + self.__release_lock() return ret def reset_work(self): self.__reload_state(keep_lock=True) - self.__log(Criticality.II, "Reset the queued work") - self.state['tasks']['user']['commits'] = dict() - self.state['tasks']['auto']['commits'] = dict() - self.__save_state() - self.__release_lock() + try: + self.__log(Criticality.II, "Reset the queued work") + self.state['tasks']['user']['commits'] = dict() + self.state['tasks']['auto']['commits'] = dict() + self.__save_state() + finally: + self.__release_lock() def task_info(self): self._task_lock.acquire() @@ -794,20 +888,23 @@ class SmartEzbench: return task_list def __change_state_to_run__(self): - self.__reload_state(keep_lock=True) ret = False - running_state=self.__running_mode_unlocked__() - if running_state == RunningMode.INITIAL or running_state == RunningMode.RUNNING: - self.__write_attribute_unlocked__('mode', RunningMode.RUN.value, allow_updates = True) - self.__log(Criticality.II, "Ezbench running mode set to RUN") - ret = True - elif running_state != RunningMode.RUN: - self.__log(Criticality.II, - "We cannot run when the current running mode is {mode}.".format(mode=running_state.name)) - ret = False - else: - ret = True - self.__release_lock() + + 
self.__reload_state(keep_lock=True) + try: + running_state=self.__running_mode_unlocked__() + if running_state == RunningMode.INITIAL or running_state == RunningMode.RUNNING: + self.__write_attribute_unlocked__('mode', RunningMode.RUN.value, allow_updates = True) + self.__log(Criticality.II, "Ezbench running mode set to RUN") + ret = True + elif running_state != RunningMode.RUN: + self.__log(Criticality.II, + "We cannot run when the current running mode is {mode}.".format(mode=running_state.name)) + ret = False + else: + ret = True + finally: + self.__release_lock() return ret def __done_running__(self, runner): @@ -1118,12 +1215,14 @@ class SmartEzbench: return commit_weight * test_weight * severity def __attribute__(self, key, default): - self.__reload_state(keep_lock = True) - if "attributes" not in self.state: - ret = default - else: - ret = self.state['attributes'].get(key, default) - self.__release_lock() + ret = default + + self.__reload_state(keep_lock=True) + try: + if "attributes" in self.state: + ret = self.state['attributes'].get(key, default) + finally: + self.__release_lock() return ret @classmethod @@ -1155,26 +1254,33 @@ class SmartEzbench: # verify that the attribute exists p = SmartEzbenchAttributes[param] - self.__reload_state(keep_lock = True) - if "attributes" not in self.state: - self.state['attributes'] = dict() - self.state['attributes'][param] = float(value) - self.__save_state() - self.__release_lock() + self.__reload_state(keep_lock=True) + try: + if "attributes" not in self.state: + self.state['attributes'] = dict() + self.state['attributes'][param] = float(value) + self.__save_state() + finally: + self.__release_lock() self.__log(Criticality.II, "Attribute '{}' set to {}".format(param, value)) def user_data(self, key, default=None): - self.__reload_state(keep_lock = True) - ret = self.state['user_data'].get(key, default) - self.__release_lock() + ret = False + self.__reload_state(keep_lock=True) + try: + ret = 
self.state.get('user_data', dict()).get(key, default) + finally: + self.__release_lock() return ret def set_user_data(self, key, value): - self.__reload_state(keep_lock = True) - self.state['user_data'][key] = value - self.__save_state() - self.__release_lock() + self.__reload_state(keep_lock=True) + try: + self.state['user_data'][key] = value + self.__save_state() + finally: + self.__release_lock() def schedule_enhancements(self): # Read all the attributes @@ -1304,25 +1410,27 @@ class SmartEzbench: # biggest score to speed up bisecting of the most important issues scheduled_commits = total_added = 0 self.__reload_state(keep_lock=True) - while len(tasks_sorted) > 0 and scheduled_commits < commit_schedule_max: - commit = tasks_sorted[-1][1] - self.__log(Criticality.DD, "Add all the tasks using commit {}".format(commit)) - for t in tasks_sorted: - if t[1] == commit: - added = self.__force_test_rounds_unlocked__(t[1], t[2], t[3], user_requested=False) - if added > 0: - self.__log(Criticality.II, - "Scheduled {} more runs for the test {} on commit {}".format(added, t[2], commit)) - total_added += added + try: + while len(tasks_sorted) > 0 and scheduled_commits < commit_schedule_max: + commit = tasks_sorted[-1][1] + self.__log(Criticality.DD, "Add all the tasks using commit {}".format(commit)) + for t in tasks_sorted: + if t[1] == commit: + added = self.__force_test_rounds_unlocked__(t[1], t[2], t[3], user_requested=False) + if added > 0: + self.__log(Criticality.II, + "Scheduled {} more runs for the test {} on commit {}".format(added, t[2], commit)) + total_added += added + if total_added > 0: + self.__log(Criticality.II, "{}".format(t[4])) + scheduled_commits += 1 + else: + self.__log(Criticality.DD, "No work scheduled using commit {}, try another one".format(commit)) + del tasks_sorted[-1] if total_added > 0: - self.__log(Criticality.II, "{}".format(t[4])) - scheduled_commits += 1 - else: - self.__log(Criticality.DD, "No work scheduled using commit {}, try another 
one".format(commit)) - del tasks_sorted[-1] - if total_added > 0: - self.__save_state() - self.__release_lock() + self.__save_state() + finally: + self.__release_lock() self.__log(Criticality.II, "Done enhancing the report") -- cgit v1.2.3