summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorVignesh Raman <vignesh.raman@collabora.com>2023-08-25 15:10:07 +0530
committerMarge Bot <emma+marge@anholt.net>2023-12-19 10:09:35 +0000
commitaa0c4078de511730249c4dba2ceebba69b6c8a23 (patch)
tree12bb4242dc039161a7c7512a8a5f42c330146972 /bin
parentb5153693cc38e631b23c741a5eb3b160a01b10e2 (diff)
ci: Add CustomLogger class and CLI tool
This commit introduces the CustomLogger class, which provides methods for updating, creating, and managing dut jobs and phases in a structured log in below json format. { "_timestamp": "2023-10-05T06:16:42.603921", "dut_job_type": "rpi3", "farm": "igalia", "dut_jobs": [ { "status": "pass", "submitter_start_time": "2023-10-05T06:16:42.745862", "dut_start_time": "2023-10-05T06:16:42.819964", "dut_submit_time": "2023-10-05T06:16:45.866096", "dut_end_time": "2023-10-05T06:24:13.533394", "dut_name": "igalia-ci01-rpi3-1gb", "dut_state": "finished", "dut_job_phases": [ { "name": "boot", "start_time": "2023-10-05T06:16:45.865863", "end_time": "2023-10-05T06:17:14.801002" }, { "name": "test", "start_time": "2023-10-05T06:17:14.801009", "end_time": "2023-10-05T06:24:13.610296" } ], "submitter_end_time": "2023-10-05T06:24:13.680729" } ], "job_combined_status": "pass", "dut_attempt_counter": 1 } This class uses the existing StructuredLogger module, which provides a robust and flexible logging utility supporting multiple formats. It also includes a command-line tool for updating, creating, and modifying dut jobs and phases through command-line arguments. This can be used in ci job scripts to update information in structured logs. Unit tests also have been added for the new class. Currently, only LAVA jobs create structured log files, and this will be extended for other jobs using this tool. Signed-off-by: Vignesh Raman <vignesh.raman@collabora.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25179>
Diffstat (limited to 'bin')
-rw-r--r--bin/ci/custom_logger.py334
-rw-r--r--bin/ci/test/test_custom_logger.py669
2 files changed, 1003 insertions, 0 deletions
diff --git a/bin/ci/custom_logger.py b/bin/ci/custom_logger.py
new file mode 100644
index 00000000000..7721be2f66e
--- /dev/null
+++ b/bin/ci/custom_logger.py
@@ -0,0 +1,334 @@
+import argparse
+import logging
+from datetime import datetime
+from pathlib import Path
+
+from structured_logger import StructuredLogger
+
+
+class CustomLogger:
+ def __init__(self, log_file):
+ self.log_file = log_file
+ self.logger = StructuredLogger(file_name=self.log_file)
+
+ def get_last_dut_job(self):
+ """
+ Gets the details of the most recent DUT job.
+
+ Returns:
+ dict: Details of the most recent DUT job.
+
+ Raises:
+ ValueError: If no DUT jobs are found in the logger's data.
+ """
+ try:
+ job = self.logger.data["dut_jobs"][-1]
+ except KeyError:
+ raise ValueError(
+ "No DUT jobs found. Please create a job via create_dut_job call."
+ )
+
+ return job
+
+ def update(self, **kwargs):
+ """
+ Updates the log file with provided key-value pairs.
+
+ Args:
+ **kwargs: Key-value pairs to be updated.
+
+ """
+ with self.logger.edit_context():
+ for key, value in kwargs.items():
+ self.logger.data[key] = value
+
+ def create_dut_job(self, **kwargs):
+ """
+ Creates a new DUT job with provided key-value pairs.
+
+ Args:
+ **kwargs: Key-value pairs for the new DUT job.
+
+ """
+ with self.logger.edit_context():
+ if "dut_jobs" not in self.logger.data:
+ self.logger.data["dut_jobs"] = []
+ new_job = {
+ "status": "",
+ "submitter_start_time": datetime.now().isoformat(),
+ "dut_submit_time": "",
+ "dut_start_time": "",
+ "dut_end_time": "",
+ "dut_name": "",
+ "dut_state": "pending",
+ "dut_job_phases": [],
+ **kwargs,
+ }
+ self.logger.data["dut_jobs"].append(new_job)
+
+ def update_dut_job(self, key, value):
+ """
+ Updates the last DUT job with a key-value pair.
+
+ Args:
+ key : The key to be updated.
+ value: The value to be assigned.
+
+ """
+ with self.logger.edit_context():
+ job = self.get_last_dut_job()
+ job[key] = value
+
+ def update_status_fail(self, reason=""):
+ """
+ Sets the status of the last DUT job to 'fail' and logs the failure reason.
+
+ Args:
+ reason (str, optional): The reason for the failure. Defaults to "".
+
+ """
+ with self.logger.edit_context():
+ job = self.get_last_dut_job()
+ job["status"] = "fail"
+ job["dut_job_fail_reason"] = reason
+
+ def create_job_phase(self, phase_name):
+ """
+ Creates a new job phase for the last DUT job.
+
+ Args:
+ phase_name : The name of the new job phase.
+
+ """
+ with self.logger.edit_context():
+ job = self.get_last_dut_job()
+ if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "":
+ # If the last phase exists and its end time is empty, set the end time
+ job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat()
+
+ # Create a new phase
+ phase_data = {
+ "name": phase_name,
+ "start_time": datetime.now().isoformat(),
+ "end_time": "",
+ }
+ job["dut_job_phases"].append(phase_data)
+
+ def check_dut_timings(self, job):
+ """
+ Check the timing sequence of a job to ensure logical consistency.
+
+ The function verifies that the job's submission time is not earlier than its start time and that
+ the job's end time is not earlier than its start time. If either of these conditions is found to be true,
+ an error is logged for each instance of inconsistency.
+
+ Args:
+ job (dict): A dictionary containing timing information of a job. Expected keys are 'dut_start_time',
+ 'dut_submit_time', and 'dut_end_time'.
+
+ Returns:
+ None: This function does not return a value; it logs errors if timing inconsistencies are detected.
+
+ The function checks the following:
+ - If 'dut_start_time' and 'dut_submit_time' are both present and correctly sequenced.
+ - If 'dut_start_time' and 'dut_end_time' are both present and correctly sequenced.
+ """
+
+ # Check if the start time and submit time exist
+ if job.get("dut_start_time") and job.get("dut_submit_time"):
+ # If they exist, check if the submission time is before the start time
+ if job["dut_start_time"] < job["dut_submit_time"]:
+ logging.error("Job submission is happening before job start.")
+
+ # Check if the start time and end time exist
+ if job.get("dut_start_time") and job.get("dut_end_time"):
+ # If they exist, check if the end time is after the start time
+ if job["dut_end_time"] < job["dut_start_time"]:
+ logging.error("Job ended before it started.")
+
+ # Method to update DUT start, submit and end time
+ def update_dut_time(self, value, custom_time):
+ """
+ Updates DUT start, submit, and end times.
+
+ Args:
+ value : Specifies which DUT time to update. Options: 'start', 'submit', 'end'.
+ custom_time : Custom time to set. If None, use current time.
+
+ Raises:
+ ValueError: If an invalid argument is provided for value.
+
+ """
+ with self.logger.edit_context():
+ job = self.get_last_dut_job()
+ timestamp = custom_time if custom_time else datetime.now().isoformat()
+ if value == "start":
+ job["dut_start_time"] = timestamp
+ job["dut_state"] = "running"
+ elif value == "submit":
+ job["dut_submit_time"] = timestamp
+ job["dut_state"] = "submitted"
+ elif value == "end":
+ job["dut_end_time"] = timestamp
+ job["dut_state"] = "finished"
+ else:
+ raise ValueError(
+ "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
+ )
+ # check the sanity of the partial structured log
+ self.check_dut_timings(job)
+
+ def close_dut_job(self):
+ """
+ Closes the most recent DUT (Device Under Test) job in the logger's data.
+
+ The method performs the following operations:
+ 1. Validates if there are any DUT jobs in the logger's data.
+ 2. If the last phase of the most recent DUT job has an empty end time, it sets the end time to the current time.
+
+ Raises:
+ ValueError: If no DUT jobs are found in the logger's data.
+ """
+ with self.logger.edit_context():
+ job = self.get_last_dut_job()
+ # Check if the last phase exists and its end time is empty, then set the end time
+ if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "":
+ job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat()
+
+ def close(self):
+ """
+ Closes the most recent DUT (Device Under Test) job in the logger's data.
+
+ The method performs the following operations:
+ 1. Determines the combined status of all DUT jobs.
+ 2. Sets the submitter's end time to the current time.
+ 3. Updates the DUT attempt counter to reflect the total number of DUT jobs.
+
+ """
+ with self.logger.edit_context():
+ job_status = []
+ for job in self.logger.data["dut_jobs"]:
+ if "status" in job:
+ job_status.append(job["status"])
+
+ if not job_status:
+ job_combined_status = "null"
+ else:
+ # Get job_combined_status
+ if "pass" in job_status:
+ job_combined_status = "pass"
+ else:
+ job_combined_status = "fail"
+
+ self.logger.data["job_combined_status"] = job_combined_status
+ self.logger.data["dut_attempt_counter"] = len(self.logger.data["dut_jobs"])
+ job["submitter_end_time"] = datetime.now().isoformat()
+
+
+def process_args(args):
+ # Function to process key-value pairs and call corresponding logger methods
+ def process_key_value_pairs(args_list, action_func):
+ if not args_list:
+ raise ValueError(
+ f"No key-value pairs provided for {action_func.__name__.replace('_', '-')}"
+ )
+ if len(args_list) % 2 != 0:
+ raise ValueError(
+ f"Incomplete key-value pairs for {action_func.__name__.replace('_', '-')}"
+ )
+ kwargs = dict(zip(args_list[::2], args_list[1::2]))
+ action_func(**kwargs)
+
+ # Create a CustomLogger object with the specified log file path
+ custom_logger = CustomLogger(Path(args.log_file))
+
+ if args.update:
+ process_key_value_pairs(args.update, custom_logger.update)
+
+ if args.create_dut_job:
+ process_key_value_pairs(args.create_dut_job, custom_logger.create_dut_job)
+
+ if args.update_dut_job:
+ key, value = args.update_dut_job
+ custom_logger.update_dut_job(key, value)
+
+ if args.create_job_phase:
+ custom_logger.create_job_phase(args.create_job_phase)
+
+ if args.update_status_fail:
+ custom_logger.update_status_fail(args.update_status_fail)
+
+ if args.update_dut_time:
+ if len(args.update_dut_time) == 2:
+ action, custom_time = args.update_dut_time
+ elif len(args.update_dut_time) == 1:
+ action, custom_time = args.update_dut_time[0], None
+ else:
+ raise ValueError("Invalid number of values for --update-dut-time")
+
+ if action in ["start", "end", "submit"]:
+ custom_logger.update_dut_time(action, custom_time)
+ else:
+ raise ValueError(
+ "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
+ )
+
+ if args.close_dut_job:
+ custom_logger.close_dut_job()
+
+ if args.close:
+ custom_logger.close()
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Custom Logger Command Line Tool")
+ parser.add_argument("log_file", help="Path to the log file")
+ parser.add_argument(
+ "--update",
+ nargs=argparse.ZERO_OR_MORE,
+ metavar=("key", "value"),
+ help="Update a key-value pair e.g., --update key1 value1 key2 value2)",
+ )
+ parser.add_argument(
+ "--create-dut-job",
+ nargs=argparse.ZERO_OR_MORE,
+ metavar=("key", "value"),
+ help="Create a new DUT job with key-value pairs (e.g., --create-dut-job key1 value1 key2 value2)",
+ )
+ parser.add_argument(
+ "--update-dut-job",
+ nargs=argparse.ZERO_OR_MORE,
+ metavar=("key", "value"),
+ help="Update a key-value pair in DUT job",
+ )
+ parser.add_argument(
+ "--create-job-phase",
+ help="Create a new job phase (e.g., --create-job-phase name)",
+ )
+ parser.add_argument(
+ "--update-status-fail",
+ help="Update fail as the status and log the failure reason (e.g., --update-status-fail reason)",
+ )
+ parser.add_argument(
+ "--update-dut-time",
+ nargs=argparse.ZERO_OR_MORE,
+ metavar=("action", "custom_time"),
+ help="Update DUT start and end time. Provide action ('start', 'submit', 'end') and custom_time (e.g., '2023-01-01T12:00:00')",
+ )
+ parser.add_argument(
+ "--close-dut-job",
+ action="store_true",
+ help="Close the dut job by updating end time of last dut job)",
+ )
+ parser.add_argument(
+ "--close",
+ action="store_true",
+ help="Updates combined status, submitter's end time and DUT attempt counter",
+ )
+ args = parser.parse_args()
+
+ process_args(args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/bin/ci/test/test_custom_logger.py b/bin/ci/test/test_custom_logger.py
new file mode 100644
index 00000000000..98ad9c00494
--- /dev/null
+++ b/bin/ci/test/test_custom_logger.py
@@ -0,0 +1,669 @@
+import logging
+import subprocess
+from datetime import datetime
+
+import pytest
+from custom_logger import CustomLogger
+
+
+@pytest.fixture
+def tmp_log_file(tmp_path):
+ return tmp_path / "test_log.json"
+
+
+@pytest.fixture
+def custom_logger(tmp_log_file):
+ return CustomLogger(tmp_log_file)
+
+
+def run_script_with_args(args):
+ import custom_logger
+
+ script_path = custom_logger.__file__
+ return subprocess.run(
+ ["python3", str(script_path), *args], capture_output=True, text=True
+ )
+
+
+# Test case for missing log file
+@pytest.mark.parametrize(
+ "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
+)
+def test_missing_log_file_argument(key, value):
+ result = run_script_with_args(["--update", "key", "value"])
+ assert result.returncode != 0
+
+
+# Parametrize test case for valid update arguments
+@pytest.mark.parametrize(
+ "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
+)
+def test_update_argument_valid(custom_logger, tmp_log_file, key, value):
+ result = run_script_with_args([str(tmp_log_file), "--update", key, value])
+ assert result.returncode == 0
+
+
+# Test case for passing only the key without a value
+def test_update_argument_key_only(custom_logger, tmp_log_file):
+ key = "dut_attempt_counter"
+ result = run_script_with_args([str(tmp_log_file), "--update", key])
+ assert result.returncode != 0
+
+
+# Test case for not passing any key-value pair
+def test_update_argument_no_values(custom_logger, tmp_log_file):
+ result = run_script_with_args([str(tmp_log_file), "--update"])
+ assert result.returncode == 0
+
+
+# Parametrize test case for valid arguments
+@pytest.mark.parametrize(
+ "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
+)
+def test_create_argument_valid(custom_logger, tmp_log_file, key, value):
+ result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
+ assert result.returncode == 0
+
+
+# Test case for passing only the key without a value
+def test_create_argument_key_only(custom_logger, tmp_log_file):
+ key = "dut_attempt_counter"
+ result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key])
+ assert result.returncode != 0
+
+
+# Test case for not passing any key-value pair
+def test_create_argument_no_values(custom_logger, tmp_log_file):
+ result = run_script_with_args([str(tmp_log_file), "--create-dut-job"])
+ assert result.returncode == 0
+
+
+# Test case for updating a DUT job
+@pytest.mark.parametrize(
+ "key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")]
+)
+def test_update_dut_job(custom_logger, tmp_log_file, key, value):
+ result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
+ assert result.returncode != 0
+
+ result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
+ assert result.returncode == 0
+
+ result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
+ assert result.returncode == 0
+
+
+# Test case for updating last DUT job
+def test_update_dut_multiple_job(custom_logger, tmp_log_file):
+ # Create the first DUT job with the first key
+ result = run_script_with_args(
+ [str(tmp_log_file), "--create-dut-job", "status", "hung"]
+ )
+ assert result.returncode == 0
+
+ # Create the second DUT job with the second key
+ result = run_script_with_args(
+ [str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"]
+ )
+ assert result.returncode == 0
+
+ result = run_script_with_args(
+ [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
+ )
+ assert result.returncode == 0
+
+
+# Parametrize test case for valid phase arguments
+@pytest.mark.parametrize(
+ "phase_name",
+ [("Phase1"), ("Phase2"), ("Phase3")],
+)
+def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name):
+ custom_logger.create_dut_job(status="pass")
+
+ result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
+ assert result.returncode == 0
+
+
+# Test case for not passing any arguments for create-job-phase
+def test_create_job_phase_no_arguments(custom_logger, tmp_log_file):
+ custom_logger.create_dut_job(status="pass")
+
+ result = run_script_with_args([str(tmp_log_file), "--create-job-phase"])
+ assert result.returncode != 0
+
+
+# Test case for trying to create a phase job without an existing DUT job
+def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file):
+ phase_name = "Phase1"
+
+ result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
+ assert result.returncode != 0
+
+
+# Combined test cases for valid scenarios
+def test_valid_scenarios(custom_logger, tmp_log_file):
+ valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
+ for key, value in valid_update_args:
+ result = run_script_with_args([str(tmp_log_file), "--update", key, value])
+ assert result.returncode == 0
+
+ valid_create_args = [
+ ("status", "hung"),
+ ("dut_state", "Canceling"),
+ ("dut_name", "asus"),
+ ("phase_name", "Bootloader"),
+ ]
+ for key, value in valid_create_args:
+ result = run_script_with_args(
+ [str(tmp_log_file), "--create-dut-job", key, value]
+ )
+ assert result.returncode == 0
+
+ result = run_script_with_args(
+ [str(tmp_log_file), "--create-dut-job", "status", "hung"]
+ )
+ assert result.returncode == 0
+
+ result = run_script_with_args(
+ [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
+ )
+ assert result.returncode == 0
+
+ result = run_script_with_args(
+ [
+ str(tmp_log_file),
+ "--create-job-phase",
+ "phase_name",
+ ]
+ )
+ assert result.returncode == 0
+
+
+# Parametrize test case for valid update arguments
+@pytest.mark.parametrize(
+ "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
+)
+def test_update(custom_logger, key, value):
+ custom_logger.update(**{key: value})
+ logger_data = custom_logger.logger.data
+
+ assert key in logger_data
+ assert logger_data[key] == value
+
+
+# Test case for updating with a key that already exists
+def test_update_existing_key(custom_logger):
+ key = "status"
+ value = "new_value"
+ custom_logger.logger.data[key] = "old_value"
+ custom_logger.update(**{key: value})
+ logger_data = custom_logger.logger.data
+
+ assert key in logger_data
+ assert logger_data[key] == value
+
+
+# Test case for updating "dut_jobs"
+def test_update_dut_jobs(custom_logger):
+ key1 = "status"
+ value1 = "fail"
+ key2 = "state"
+ value2 = "hung"
+
+ custom_logger.create_dut_job(**{key1: value1})
+ logger_data = custom_logger.logger.data
+
+ job1 = logger_data["dut_jobs"][0]
+ assert key1 in job1
+ assert job1[key1] == value1
+
+ custom_logger.update_dut_job(key2, value2)
+ logger_data = custom_logger.logger.data
+
+ job2 = logger_data["dut_jobs"][0]
+ assert key2 in job2
+ assert job2[key2] == value2
+
+
+# Test case for creating and updating DUT job
+def test_create_dut_job(custom_logger):
+ key = "status"
+ value1 = "pass"
+ value2 = "fail"
+ value3 = "hung"
+
+ reason = "job_combined_status"
+ result = "Finished"
+
+ custom_logger.update(**{reason: result})
+ logger_data = custom_logger.logger.data
+
+ assert reason in logger_data
+ assert logger_data[reason] == result
+
+ # Create the first DUT job
+ custom_logger.create_dut_job(**{key: value1})
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 1
+ assert isinstance(logger_data["dut_jobs"][0], dict)
+
+ # Check the values of the keys in the created first DUT job
+ job1 = logger_data["dut_jobs"][0]
+ assert key in job1
+ assert job1[key] == value1
+
+ # Create the second DUT job
+ custom_logger.create_dut_job(**{key: value2})
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 2
+ assert isinstance(logger_data["dut_jobs"][1], dict)
+
+ # Check the values of the keys in the created second DUT job
+ job2 = logger_data["dut_jobs"][1]
+ assert key in job2
+ assert job2[key] == value2
+
+ # Update the second DUT job with value3
+ custom_logger.update_dut_job(key, value3)
+ logger_data = custom_logger.logger.data
+
+ # Check the updated value in the second DUT job
+ job2 = logger_data["dut_jobs"][1]
+ assert key in job2
+ assert job2[key] == value3
+
+ # Find the index of the last DUT job
+ last_job_index = len(logger_data["dut_jobs"]) - 1
+
+ # Update the last DUT job
+ custom_logger.update_dut_job("dut_name", "asus")
+ logger_data = custom_logger.logger.data
+
+ # Check the updated value in the last DUT job
+ job2 = logger_data["dut_jobs"][last_job_index]
+ assert "dut_name" in job2
+ assert job2["dut_name"] == "asus"
+
+ # Check that "dut_name" is not present in other DUT jobs
+ for idx, job in enumerate(logger_data["dut_jobs"]):
+ if idx != last_job_index:
+ assert job.get("dut_name") == ""
+
+
+# Test case for updating with missing "dut_jobs" key
+def test_update_dut_job_missing_dut_jobs(custom_logger):
+ key = "status"
+ value = "fail"
+
+ # Attempt to update a DUT job when "dut_jobs" is missing
+ with pytest.raises(ValueError, match="No DUT jobs found."):
+ custom_logger.update_dut_job(key, value)
+
+
+# Test case for creating a job phase
+def test_create_job_phase(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+ phase_name = "Phase1"
+
+ custom_logger.create_job_phase(phase_name)
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 1
+
+ job = logger_data["dut_jobs"][0]
+ assert "dut_job_phases" in job
+ assert isinstance(job["dut_job_phases"], list)
+ assert len(job["dut_job_phases"]) == 1
+
+ phase = job["dut_job_phases"][0]
+ assert phase["name"] == phase_name
+ try:
+ datetime.fromisoformat(phase["start_time"])
+ assert True
+ except ValueError:
+ assert False
+ assert phase["end_time"] == ""
+
+
+# Test case for creating multiple phase jobs
+def test_create_multiple_phase_jobs(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+
+ phase_data = [
+ {
+ "phase_name": "Phase1",
+ },
+ {
+ "phase_name": "Phase2",
+ },
+ {
+ "phase_name": "Phase3",
+ },
+ ]
+
+ for data in phase_data:
+ phase_name = data["phase_name"]
+
+ custom_logger.create_job_phase(phase_name)
+
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 1
+
+ job = logger_data["dut_jobs"][0]
+ assert "dut_job_phases" in job
+ assert isinstance(job["dut_job_phases"], list)
+ assert len(job["dut_job_phases"]) == len(phase_data)
+
+ for data in phase_data:
+ phase_name = data["phase_name"]
+
+ phase = job["dut_job_phases"][phase_data.index(data)]
+
+ assert phase["name"] == phase_name
+ try:
+ datetime.fromisoformat(phase["start_time"])
+ assert True
+ except ValueError:
+ assert False
+
+ if phase_data.index(data) != len(phase_data) - 1:
+ try:
+ datetime.fromisoformat(phase["end_time"])
+ assert True
+ except ValueError:
+ assert False
+
+ # Check if the end_time of the last phase is an empty string
+ last_phase = job["dut_job_phases"][-1]
+ assert last_phase["end_time"] == ""
+
+
+# Test case for creating multiple dut jobs and updating phase job for last dut job
+def test_create_two_dut_jobs_and_add_phase(custom_logger):
+ # Create the first DUT job
+ custom_logger.create_dut_job(status="pass")
+
+ # Create the second DUT job
+ custom_logger.create_dut_job(status="fail")
+
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 2
+
+ first_dut_job = logger_data["dut_jobs"][0]
+ second_dut_job = logger_data["dut_jobs"][1]
+
+ # Add a phase to the second DUT job
+ custom_logger.create_job_phase("Phase1")
+
+ logger_data = custom_logger.logger.data
+
+ assert "dut_jobs" in logger_data
+ assert isinstance(logger_data["dut_jobs"], list)
+ assert len(logger_data["dut_jobs"]) == 2
+
+ first_dut_job = logger_data["dut_jobs"][0]
+ second_dut_job = logger_data["dut_jobs"][1]
+
+ # Check first DUT job does not have a phase
+ assert not first_dut_job.get("dut_job_phases")
+
+ # Check second DUT job has a phase
+ assert second_dut_job.get("dut_job_phases")
+ assert isinstance(second_dut_job["dut_job_phases"], list)
+ assert len(second_dut_job["dut_job_phases"]) == 1
+
+
+# Test case for updating DUT start time
+def test_update_dut_start_time(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+ custom_logger.update_dut_time("start", None)
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "dut_start_time" in dut_job
+ assert dut_job["dut_start_time"] != ""
+
+ try:
+ datetime.fromisoformat(dut_job["dut_start_time"])
+ assert True
+ except ValueError:
+ assert False
+
+
+# Test case for updating DUT submit time
+def test_update_dut_submit_time(custom_logger):
+ custom_time = "2023-11-09T02:37:06Z"
+ custom_logger.create_dut_job(status="pass")
+ custom_logger.update_dut_time("submit", custom_time)
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "dut_submit_time" in dut_job
+
+ try:
+ datetime.fromisoformat(dut_job["dut_submit_time"])
+ assert True
+ except ValueError:
+ assert False
+
+
+# Test case for updating DUT end time
+def test_update_dut_end_time(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+ custom_logger.update_dut_time("end", None)
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "dut_end_time" in dut_job
+
+ try:
+ datetime.fromisoformat(dut_job["dut_end_time"])
+ assert True
+ except ValueError:
+ assert False
+
+
+# Test case for updating DUT time with invalid value
+def test_update_dut_time_invalid_value(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+ with pytest.raises(
+ ValueError,
+ match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.",
+ ):
+ custom_logger.update_dut_time("invalid_value", None)
+
+
+# Test case for close_dut_job
+def test_close_dut_job(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+
+ custom_logger.create_job_phase("Phase1")
+ custom_logger.create_job_phase("Phase2")
+
+ custom_logger.close_dut_job()
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "dut_job_phases" in dut_job
+ dut_job_phases = dut_job["dut_job_phases"]
+
+ phase1 = dut_job_phases[0]
+ assert phase1["name"] == "Phase1"
+
+ try:
+ datetime.fromisoformat(phase1["start_time"])
+ assert True
+ except ValueError:
+ assert False
+
+ try:
+ datetime.fromisoformat(phase1["end_time"])
+ assert True
+ except ValueError:
+ assert False
+
+ phase2 = dut_job_phases[1]
+ assert phase2["name"] == "Phase2"
+
+ try:
+ datetime.fromisoformat(phase2["start_time"])
+ assert True
+ except ValueError:
+ assert False
+
+ try:
+ datetime.fromisoformat(phase2["end_time"])
+ assert True
+ except ValueError:
+ assert False
+
+
+# Test case for close
+def test_close(custom_logger):
+ custom_logger.create_dut_job(status="pass")
+
+ custom_logger.close()
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+ assert "dut_attempt_counter" in logger_data
+ assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"])
+ assert "job_combined_status" in logger_data
+ assert logger_data["job_combined_status"] != ""
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "submitter_end_time" in dut_job
+ try:
+ datetime.fromisoformat(dut_job["submitter_end_time"])
+ assert True
+ except ValueError:
+ assert False
+
+
+# Test case for updating status to fail with a reason
+def test_update_status_fail_with_reason(custom_logger):
+ custom_logger.create_dut_job()
+
+ reason = "kernel panic"
+ custom_logger.update_status_fail(reason)
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "status" in dut_job
+ assert dut_job["status"] == "fail"
+ assert "dut_job_fail_reason" in dut_job
+ assert dut_job["dut_job_fail_reason"] == reason
+
+
+# Test case for updating status to fail without providing a reason
+def test_update_status_fail_without_reason(custom_logger):
+ custom_logger.create_dut_job()
+
+ custom_logger.update_status_fail()
+
+ # Check if the status is updated and fail reason is empty
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ dut_job = logger_data["dut_jobs"][0]
+ assert "status" in dut_job
+ assert dut_job["status"] == "fail"
+ assert "dut_job_fail_reason" in dut_job
+ assert dut_job["dut_job_fail_reason"] == ""
+
+
+# Test case for check_dut_timings with submission time earlier than start time
+def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog):
+ custom_logger.create_dut_job()
+
+ # Set submission time to be earlier than start time
+ custom_logger.update_dut_time("start", "2023-01-01T11:00:00")
+ custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ job = logger_data["dut_jobs"][0]
+
+ # Call check_dut_timings
+ custom_logger.check_dut_timings(job)
+
+ # Check if an error message is logged
+ assert "Job submission is happening before job start." in caplog.text
+
+
+# Test case for check_dut_timings with end time earlier than start time
+def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog):
+ custom_logger.create_dut_job()
+
+ # Set end time to be earlier than start time
+ custom_logger.update_dut_time("end", "2023-01-01T11:00:00")
+ custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ job = logger_data["dut_jobs"][0]
+
+ # Call check_dut_timings
+ custom_logger.check_dut_timings(job)
+
+ # Check if an error message is logged
+ assert "Job ended before it started." in caplog.text
+
+
+# Test case for check_dut_timings with valid timing sequence
+def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog):
+ custom_logger.create_dut_job()
+
+ # Set valid timing sequence
+ custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
+ custom_logger.update_dut_time("start", "2023-01-01T12:30:00")
+ custom_logger.update_dut_time("end", "2023-01-01T13:00:00")
+
+ logger_data = custom_logger.logger.data
+ assert "dut_jobs" in logger_data
+ assert len(logger_data["dut_jobs"]) == 1
+
+ job = logger_data["dut_jobs"][0]
+
+ # Call check_dut_timings
+ custom_logger.check_dut_timings(job)
+
+ # Check that no error messages are logged
+ assert "Job submission is happening before job start." not in caplog.text
+ assert "Job ended before it started." not in caplog.text