summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Peres <martin.peres@linux.intel.com>2017-06-15 19:21:05 +0300
committerMartin Peres <martin.peres@linux.intel.com>2017-10-04 17:48:28 +0300
commitc9c2e0d947448ca1802af80a02fa17ea454e998b (patch)
tree3802e21f3173a53d93c5e8879426a7e44a45fd5e
parent04673983b1f0784980e66d230af06cbfa0c022cc (diff)
smartezbench: rework the locking to make it reentrant
This allows hooks to call functions that may want to reload the state.
-rw-r--r--python-modules/ezbench/smartezbench.py396
1 files changed, 252 insertions, 144 deletions
diff --git a/python-modules/ezbench/smartezbench.py b/python-modules/ezbench/smartezbench.py
index a719856..c6dc300 100644
--- a/python-modules/ezbench/smartezbench.py
+++ b/python-modules/ezbench/smartezbench.py
@@ -194,7 +194,67 @@ class SmartEzbenchAttributes(Enum):
report_deadline_soft = 401
report_deadline_hard = 402
+class StateLock:
+ def __init__(self, lock_path):
+ self.lock_path = lock_path
+ self.rlock = threading.RLock()
+ self.lock_fd = None
+
+ # Since rlock does not allow us to query the count
+ self._ref_cnt = 0
+ self._ref_cnt_lock = threading.Lock()
+
+ def __del__(self):
+ with self._ref_cnt_lock:
+ if self._ref_cnt > 0:
+ print("WARNING: StateLock instance is destroyed with ref_cnt > 0")
+
+ def ref_cnt(self):
+ with self._ref_cnt_lock:
+ return self._ref_cnt
+
+ def acquire(self):
+ # First, make sure to acquire the process lock, that prevents multiple
+ # threads from doing concurent accesses
+ self.rlock.acquire()
+
+ # Keep track of the count the lock has been acquired
+ with self._ref_cnt_lock:
+ self._ref_cnt += 1
+
+ if self._ref_cnt == 1:
+ try:
+ self.lock_fd = open(self.lock_path, 'w')
+ fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
+ return True
+ except IOError as e:
+ print("Could not lock the report: " + str(e))
+ self._ref_cnt = 0
+ self.rlock.release()
+ raise ValueError("Can't lock the report")
+
+ def release(self):
+ # Keep track of the count the lock has been acquired
+ with self._ref_cnt_lock:
+ self._ref_cnt -= 1
+
+ if self._ref_cnt == 0:
+ try:
+ fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
+ self.lock_fd.close()
+ self.lock_fd = None
+ except Exception as e:
+ print("Cannot release the lock: " + str(e))
+ pass
+
+ # Finally, release the process lock, that prevents multiple threads from
+ # doing concurent accesses
+ self.rlock.release()
+
class SmartEzbench:
+ state_locks = dict()
+ state_locks_lock = threading.Lock()
+
@classmethod
def list_reports(cls, ezbench_dir, updatedSince = 0):
log_dir = ezbench_dir + '/logs'
@@ -228,6 +288,14 @@ class SmartEzbench:
self._first_run = False
self._deleted = False
+ # Make sure we do not use two different instances of StateLock for the
+ # same report
+ with self.state_locks_lock:
+ self.state_lock = self.state_locks.get(report_name, None)
+ if self.state_lock is None:
+ self.state_locks[report_name] = StateLock(self.smart_ezbench_lock)
+ self.state_lock = self.state_locks[report_name]
+
self.state = dict()
self.state['commits'] = dict()
self.state['mode'] = RunningMode.INITIAL.value
@@ -264,15 +332,16 @@ class SmartEzbench:
def delete(self):
self.__grab_lock()
- if not shutil.rmtree.avoids_symlink_attacks:
- self.__log(Criticality.WW, "Deleting the report unsafely (symlink attack)")
- else:
- self.__log(Criticality.II, "Deleting the report({}) safely".format(self.log_folder))
- shutil.rmtree(self.log_folder)
-
- self._deleted = True
+ try:
+ if not shutil.rmtree.avoids_symlink_attacks:
+ self.__log(Criticality.WW, "Deleting the report unsafely (symlink attack)")
+ else:
+ self.__log(Criticality.II, "Deleting the report({}) safely".format(self.log_folder))
+ shutil.rmtree(self.log_folder)
- self.__release_lock()
+ self._deleted = True
+ finally:
+ self.__release_lock()
def __log(self, error, msg):
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -316,25 +385,14 @@ class SmartEzbench:
def __grab_lock(self):
if self.readonly:
return
-
- try:
- self.lock_fd = open(self.smart_ezbench_lock, 'w')
- fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
- return True
- except IOError as e:
- self.__log(Criticality.EE, "Could not lock the report: " + str(e))
- raise ValueError("Can't lock the report")
+ else:
+ self.state_lock.acquire()
def __release_lock(self):
if self.readonly:
return
-
- try:
- fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
- self.lock_fd.close()
- except Exception as e:
- self.__log(Criticality.EE, "Cannot release the lock: " + str(e))
- pass
+ else:
+ self.state_lock.release()
def __update_state_version__(self):
upgraded = False
@@ -393,9 +451,15 @@ class SmartEzbench:
pass
return False
- def __reload_state(self, keep_lock = False):
+ def __reload_state(self, keep_lock=False):
self.__grab_lock()
- ret = self.__reload_state_unlocked()
+
+ if self.state_lock.ref_cnt() == 1:
+ ret = self.__reload_state_unlocked()
+ else:
+ print("WARNING: Recursive call to __reload_state()")
+ ret = True
+
if not keep_lock:
self.__release_lock()
return ret
@@ -424,6 +488,8 @@ class SmartEzbench:
if profile is None:
profile = self.__read_attribute_unlocked__('profile')
+ if profile is None:
+ return None
runner.set_profile(profile)
for conf_script in self.__read_attribute_unlocked__('conf_scripts', []):
@@ -449,9 +515,12 @@ class SmartEzbench:
return False
def __write_attribute__(self, attr, value, allow_updates = False):
+ ret = False
self.__reload_state(keep_lock=True)
- ret = self.__write_attribute_unlocked__(attr, value, allow_updates)
- self.__release_lock()
+ try:
+ ret = self.__write_attribute_unlocked__(attr, value, allow_updates)
+ finally:
+ self.__release_lock()
return ret
def __running_mode_unlocked__(self, check_running = True):
@@ -463,9 +532,12 @@ class SmartEzbench:
return RunningMode(mode)
def running_mode(self, check_running = True):
+ ret = RunningMode.INITIAL
self.__reload_state(keep_lock=True)
- ret = self.__running_mode_unlocked__(check_running)
- self.__release_lock()
+ try:
+ ret = self.__running_mode_unlocked__(check_running)
+ finally:
+ self.__release_lock()
return ret
def __set_running_mode_unlocked__(self, mode):
@@ -490,8 +562,10 @@ class SmartEzbench:
return False
self.__reload_state(keep_lock=True)
- self.__set_running_mode_unlocked__(mode)
- self.__release_lock()
+ try:
+ self.__set_running_mode_unlocked__(mode)
+ finally:
+ self.__release_lock()
return True
@@ -505,28 +579,30 @@ class SmartEzbench:
def set_profile(self, profile):
ret = False
self.__reload_state(keep_lock=True)
- if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
- # Check that the profile exists!
- try:
- runner = self.__create_ezbench(profile=profile)
+ try:
+ if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
+ # Check that the profile exists!
+ try:
+ runner = self.__create_ezbench(profile=profile)
- self.state['profile'] = profile
- self.__log(Criticality.II, "Ezbench profile set to '{profile}'".format(profile=profile))
- self.__save_state()
+ self.state['profile'] = profile
+ self.__log(Criticality.II, "Ezbench profile set to '{profile}'".format(profile=profile))
+ self.__save_state()
+ ret = True
+ except RunnerError as e:
+ if e.args[0]['err_code'] == RunnerErrorCode.CMD_PROFILE_INVALID:
+ self.__log(Criticality.EE,
+ "Invalid profile name '{}'.".format(profile))
+ else:
+ self.__log(Criticality.EE,
+ "The following error arose '{}({})'.".format(e.args[0]['err_code'],
+ e.args[0]['err_str']))
+ elif profile == self.state['profile']:
ret = True
- except RunnerError as e:
- if e.args[0]['err_code'] == RunnerErrorCode.CMD_PROFILE_INVALID:
- self.__log(Criticality.EE,
- "Invalid profile name '{}'.".format(profile))
- else:
- self.__log(Criticality.EE,
- "The following error arose '{}({})'.".format(e.args[0]['err_code'],
- e.args[0]['err_str']))
- elif profile == self.state['profile']:
- ret = True
- else:
- self.__log(Criticality.EE, "You cannot change the profile of a report that already has results. Start a new one.")
- self.__release_lock()
+ else:
+ self.__log(Criticality.EE, "You cannot change the profile of a report that already has results. Start a new one.")
+ finally:
+ self.__release_lock()
return ret
@@ -535,31 +611,35 @@ class SmartEzbench:
def add_conf_script(self, conf_script):
self.__reload_state(keep_lock=True)
- if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
- if "conf_scripts" not in self.state:
- self.state['conf_scripts'] = list()
+ try:
+ if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
+ if "conf_scripts" not in self.state:
+ self.state['conf_scripts'] = list()
- if conf_script not in self.state['conf_scripts']:
- self.__log(Criticality.II, "Add configuration script '{0}'".format(conf_script))
- self.state['conf_scripts'].append(conf_script)
- self.__save_state()
- else:
- self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.")
- self.__release_lock()
+ if conf_script not in self.state['conf_scripts']:
+ self.__log(Criticality.II, "Add configuration script '{0}'".format(conf_script))
+ self.state['conf_scripts'].append(conf_script)
+ self.__save_state()
+ else:
+ self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.")
+ finally:
+ self.__release_lock()
def remove_conf_script(self, conf_script):
self.__reload_state(keep_lock=True)
- if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
- if "conf_scripts" in self.state:
- try:
- self.state['conf_scripts'].remove(conf_script)
- self.__log(Criticality.II, "Remove configuration script '{0}'".format(conf_script))
- self.__save_state()
- except:
- pass
- else:
- self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.")
- self.__release_lock()
+ try:
+ if 'beenRunBefore' not in self.state or self.state['beenRunBefore'] == False:
+ if "conf_scripts" in self.state:
+ try:
+ self.state['conf_scripts'].remove(conf_script)
+ self.__log(Criticality.II, "Remove configuration script '{0}'".format(conf_script))
+ self.__save_state()
+ except:
+ pass
+ else:
+ self.__log(Criticality.EE, "You cannot change the set of scripts of a report that already has results. Start a new one.")
+ finally:
+ self.__release_lock()
def commit_url(self):
return self.__read_attribute__('commit_url')
@@ -625,27 +705,35 @@ class SmartEzbench:
def add_test(self, commit, test, rounds = None, user_requested=True):
self.__reload_state(keep_lock=True)
- source = "user" if user_requested else "machine"
- self.__log(Criticality.II, "Add the {}-requested test {} ({} rounds)".format(source, test, rounds))
- total_rounds = self.__add_test_unlocked__(commit, test, rounds)
- self.__save_state()
- self.__release_lock()
+ total_rounds = 0
+ try:
+ source = "user" if user_requested else "machine"
+ self.__log(Criticality.II, "Add the {}-requested test {} ({} rounds)".format(source, test, rounds))
+ total_rounds = self.__add_test_unlocked__(commit, test, rounds)
+ self.__save_state()
+ finally:
+ self.__release_lock()
return total_rounds
def add_testset(self, commit, testset, rounds = 1, ensure=False, user_requested=True):
self.__reload_state(keep_lock=True)
- self.__log(Criticality.II, "Add the testset {} ({} tests)".format(testset.name,
- len(testset)))
+ try:
+ self.__log(Criticality.II, "Add the testset {} ({} tests)".format(testset.name,
+ len(testset)))
- for test in sorted(testset.keys()):
- if not ensure:
- self.__add_test_unlocked__(commit, test, testset[test] * rounds, user_requested)
- else:
- self.__force_test_rounds_unlocked__(commit, test, testset[test] * rounds, user_requested)
+ for test in sorted(testset.keys()):
+ if not ensure:
+ self.__add_test_unlocked__(commit, test, testset[test] * rounds,
+ user_requested)
+ else:
+ self.__force_test_rounds_unlocked__(commit, test,
+ testset[test] * rounds,
+ user_requested)
- self.__save_state()
- self.__release_lock()
+ self.__save_state()
+ finally:
+ self.__release_lock()
def __force_test_rounds_unlocked__(self, commit, test, at_least, user_requested=True):
scm = self.repo()
@@ -679,20 +767,26 @@ class SmartEzbench:
return 0
def force_test_rounds(self, commit, test, at_least, user_requested=True):
+ ret = 0
+
self.__reload_state(keep_lock=True)
- ret = self.__force_test_rounds_unlocked__(commit, test, at_least, user_requested)
- self.__save_state()
- self.__release_lock()
+ try:
+ ret = self.__force_test_rounds_unlocked__(commit, test, at_least, user_requested)
+ self.__save_state()
+ finally:
+ self.__release_lock()
return ret
def reset_work(self):
self.__reload_state(keep_lock=True)
- self.__log(Criticality.II, "Reset the queued work")
- self.state['tasks']['user']['commits'] = dict()
- self.state['tasks']['auto']['commits'] = dict()
- self.__save_state()
- self.__release_lock()
+ try:
+ self.__log(Criticality.II, "Reset the queued work")
+ self.state['tasks']['user']['commits'] = dict()
+ self.state['tasks']['auto']['commits'] = dict()
+ self.__save_state()
+ finally:
+ self.__release_lock()
def task_info(self):
self._task_lock.acquire()
@@ -794,20 +888,23 @@ class SmartEzbench:
return task_list
def __change_state_to_run__(self):
- self.__reload_state(keep_lock=True)
ret = False
- running_state=self.__running_mode_unlocked__()
- if running_state == RunningMode.INITIAL or running_state == RunningMode.RUNNING:
- self.__write_attribute_unlocked__('mode', RunningMode.RUN.value, allow_updates = True)
- self.__log(Criticality.II, "Ezbench running mode set to RUN")
- ret = True
- elif running_state != RunningMode.RUN:
- self.__log(Criticality.II,
- "We cannot run when the current running mode is {mode}.".format(mode=running_state.name))
- ret = False
- else:
- ret = True
- self.__release_lock()
+
+ self.__reload_state(keep_lock=True)
+ try:
+ running_state=self.__running_mode_unlocked__()
+ if running_state == RunningMode.INITIAL or running_state == RunningMode.RUNNING:
+ self.__write_attribute_unlocked__('mode', RunningMode.RUN.value, allow_updates = True)
+ self.__log(Criticality.II, "Ezbench running mode set to RUN")
+ ret = True
+ elif running_state != RunningMode.RUN:
+ self.__log(Criticality.II,
+ "We cannot run when the current running mode is {mode}.".format(mode=running_state.name))
+ ret = False
+ else:
+ ret = True
+ finally:
+ self.__release_lock()
return ret
def __done_running__(self, runner):
@@ -1118,12 +1215,14 @@ class SmartEzbench:
return commit_weight * test_weight * severity
def __attribute__(self, key, default):
- self.__reload_state(keep_lock = True)
- if "attributes" not in self.state:
- ret = default
- else:
- ret = self.state['attributes'].get(key, default)
- self.__release_lock()
+ ret = default
+
+ self.__reload_state(keep_lock=True)
+ try:
+ if "attributes" in self.state:
+ ret = self.state['attributes'].get(key, default)
+ finally:
+ self.__release_lock()
return ret
@classmethod
@@ -1155,26 +1254,33 @@ class SmartEzbench:
# verify that the attribute exists
p = SmartEzbenchAttributes[param]
- self.__reload_state(keep_lock = True)
- if "attributes" not in self.state:
- self.state['attributes'] = dict()
- self.state['attributes'][param] = float(value)
- self.__save_state()
- self.__release_lock()
+ self.__reload_state(keep_lock=True)
+ try:
+ if "attributes" not in self.state:
+ self.state['attributes'] = dict()
+ self.state['attributes'][param] = float(value)
+ self.__save_state()
+ finally:
+ self.__release_lock()
self.__log(Criticality.II, "Attribute '{}' set to {}".format(param, value))
def user_data(self, key, default=None):
- self.__reload_state(keep_lock = True)
- ret = self.state['user_data'].get(key, default)
- self.__release_lock()
+ ret = False
+ self.__reload_state(keep_lock=True)
+ try:
+ ret = self.state.get('user_data', dict()).get(key, default)
+ finally:
+ self.__release_lock()
return ret
def set_user_data(self, key, value):
- self.__reload_state(keep_lock = True)
- self.state['user_data'][key] = value
- self.__save_state()
- self.__release_lock()
+ self.__reload_state(keep_lock=True)
+ try:
+ self.state['user_data'][key] = value
+ self.__save_state()
+ finally:
+ self.__release_lock()
def schedule_enhancements(self):
# Read all the attributes
@@ -1304,25 +1410,27 @@ class SmartEzbench:
# biggest score to speed up bisecting of the most important issues
scheduled_commits = total_added = 0
self.__reload_state(keep_lock=True)
- while len(tasks_sorted) > 0 and scheduled_commits < commit_schedule_max:
- commit = tasks_sorted[-1][1]
- self.__log(Criticality.DD, "Add all the tasks using commit {}".format(commit))
- for t in tasks_sorted:
- if t[1] == commit:
- added = self.__force_test_rounds_unlocked__(t[1], t[2], t[3], user_requested=False)
- if added > 0:
- self.__log(Criticality.II,
- "Scheduled {} more runs for the test {} on commit {}".format(added, t[2], commit))
- total_added += added
+ try:
+ while len(tasks_sorted) > 0 and scheduled_commits < commit_schedule_max:
+ commit = tasks_sorted[-1][1]
+ self.__log(Criticality.DD, "Add all the tasks using commit {}".format(commit))
+ for t in tasks_sorted:
+ if t[1] == commit:
+ added = self.__force_test_rounds_unlocked__(t[1], t[2], t[3], user_requested=False)
+ if added > 0:
+ self.__log(Criticality.II,
+ "Scheduled {} more runs for the test {} on commit {}".format(added, t[2], commit))
+ total_added += added
+ if total_added > 0:
+ self.__log(Criticality.II, "{}".format(t[4]))
+ scheduled_commits += 1
+ else:
+ self.__log(Criticality.DD, "No work scheduled using commit {}, try another one".format(commit))
+ del tasks_sorted[-1]
if total_added > 0:
- self.__log(Criticality.II, "{}".format(t[4]))
- scheduled_commits += 1
- else:
- self.__log(Criticality.DD, "No work scheduled using commit {}, try another one".format(commit))
- del tasks_sorted[-1]
- if total_added > 0:
- self.__save_state()
- self.__release_lock()
+ self.__save_state()
+ finally:
+ self.__release_lock()
self.__log(Criticality.II, "Done enhancing the report")