From d2b3830004d30145291bbef36c22382811500710 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Thu, 21 Nov 2024 17:25:00 +0800 Subject: [PATCH 1/7] Add pytest xml parser class This change adds a separate class for parsing pytest results xml Signed-off-by: Trisha De Vera --- telemetry/gparser/parser.py | 77 +++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/telemetry/gparser/parser.py b/telemetry/gparser/parser.py index 2e0308e..2d773b9 100644 --- a/telemetry/gparser/parser.py +++ b/telemetry/gparser/parser.py @@ -6,6 +6,7 @@ import junitparser import telemetry from junitparser import JUnitXml, Skipped, Error, Failure +import xml.etree.ElementTree as ET def get_parser(url,grabber=None): '''Factory method that provides appropriate parser object base on given url''' @@ -243,13 +244,83 @@ def get_payload_parsed(self): payload_param = param return (payload, payload_param) -class PytestFailure(xmlParser): +class pytestxml_parser(xmlParser): + def __init__(self, url, grabber): + super(pytestxml_parser, self).__init__(url, grabber) + + def get_payload_raw(self): + payload = [] + try: + file_path = self.grabber.download_file(self.url, self.file_name) + # Load and parse the XML using element tree + tree = ET.parse(file_path) + root = tree.getroot() + + # Iterate over all test cases + for testcase in root.findall(".//testcase"): + # Get the name of the test case + test_name = testcase.get("name") + # Find the properties tag + properties = testcase.find("properties") + failure = testcase.find("failure") + error = testcase.find("error") + skipped = testcase.find("skipped") + + if properties is not None: + if failure is not None: + # failure tag content + failure_text = failure.text + fail_content_lines = failure_text.splitlines() + fail_line_statement = fail_content_lines[-4] + # exception statement with parameter names + exc_param_value = (fail_line_statement[1:]).lstrip() + fail_content_list = failure_text.split("@", 1) + param_test = "" + if len(fail_content_list) > 0: + # test parameters and values + param_test = fail_content_list[0] + test_desc = "" + + # Iterate through each property in the properties tag + for prop in properties.findall("property"): + # Get the property name and value + prop_name = prop.get("name") + prop_value = prop.get("value") + if prop_name == "exception_type_and_message": + # update test name with exception type and value + test_name = test_name + ": " + prop_value + " (" + exc_param_value + ")" + if prop_name == "test_description": + # test description + test_desc = prop_value + test_details = [test_name, param_test, test_desc] + payload.append(test_details) + elif error is not None: + payload.append(test_name) + elif skipped is not None: + payload.append(test_name) + + except Exception as ex: + traceback.print_exc() + print("Error Parsing File!") + finally: + os.remove(file_path) + return payload + + def get_payload_parsed(self): + num_payload = len(self.payload_raw) + param = list(range(num_payload)) + + payload = self.payload_raw + payload_param = param + return (payload, payload_param) + +class PytestFailure(pytestxml_parser): def __init__(self, url, grabber): super(PytestFailure, self).__init__(url, grabber) -class PytestSkipped(xmlParser): +class PytestSkipped(pytestxml_parser): def __init__(self, url, grabber): super(PytestSkipped, self).__init__(url, grabber) -class PytestError(xmlParser,): +class PytestError(pytestxml_parser,): def __init__(self, url, grabber): super(PytestError, self).__init__(url, grabber) From 03aaeaeb25ceb96b00c4d422be0be9ce111c87d9 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Thu, 21 Nov 2024 17:31:43 +0800 Subject: [PATCH 2/7] Update structure of generated pytest results This change updates the stucture of pytest results to gistfor markdown template Signed-off-by: Trisha De Vera --- telemetry/report/markdown.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/telemetry/report/markdown.py b/telemetry/report/markdown.py index 20ab917..c6dec1e 100644 --- a/telemetry/report/markdown.py +++ b/telemetry/report/markdown.py @@ -73,9 +73,27 @@ def generate_param(self,data): else: iio_drivers_missing_details = "No missing drivers" if len(info["missing_devs"]) == 0 else ("
").join(info["missing_devs"]) iio_drivers_found_details = "No iio drivers found" if len(info["enumerated_devs"]) == 0 else ("
").join(info["enumerated_devs"]) - dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"]) - pytest_failures_details = "No failures" if len(info["pytest_failure"]) == 0 else ("
").join(info["pytest_failure"]) + dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"]) + pytest_failures_details = "No failures" pytest_failures_details = "Invalid" if pytest_tests_status == "⛔" else pytest_failures_details + pytest_details = [] # list of all pytest failure details + if len(info["pytest_failure"]) != 0: + if isinstance(info["pytest_failure"][0], list): + if len(info["pytest_failure"][0]) > 0: + for items in info["pytest_failure"][0]: + # add the details of first test case in the list as separate items (no changes) + pytest_details.append(items) + for data in info["pytest_failure"][1:]: + if isinstance(data, list): + if len(data) > 0: + # add the details of remaining test cases in the list and add "* " to data[0] (test case name) for markdown + pytest_details.append(f"* {data[0]}") + for details in data[1:]: + pytest_details.append(details) + # create structure of pytest failure details for markdown + pytest_failures_details = ("\n\n").join(pytest_details) + else: + pytest_failures_details = ("
").join(info["pytest_failure"]) last_failing_stage = str(info["last_failing_stage"]) last_failing_stage_failure = str(info["last_failing_stage_failure"]) From bb5293c494922ebf4063b0b607983c6f01d16571 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Thu, 21 Nov 2024 17:33:47 +0800 Subject: [PATCH 3/7] Add parameter for pytest_failure parser test --- tests/test_gparser.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_gparser.py b/tests/test_gparser.py index e564c45..de6d36f 100644 --- a/tests/test_gparser.py +++ b/tests/test_gparser.py @@ -50,6 +50,7 @@ def test_get_parser(artifact, parser_object, parser_type): ('zynq-zed-adv7511-adrv9002-rx2tx2-vcmos_missing_devs.log', \ telemetry.parser.MissingDevs, 'missing_devs'), ('info.txt', telemetry.parser.InfoTxt, 'info_txt'), + ('zynqmp_zcu102_rev10_fmcdaq3_reports.xml', telemetry.parser.PytestFailure, 'pytest_failure'), ] ) def test_parser(artifact, parser_object, parser_type): From 98863a6cfa54a52b2b4df68aad59906d1de7b8c8 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Thu, 21 Nov 2024 17:39:00 +0800 Subject: [PATCH 4/7] Add test artifact for pytest_failure parser test Signed-off-by: Trisha De Vera --- .../zynqmp_zcu102_rev10_fmcdaq3_reports.xml | 265 ++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml diff --git a/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml b/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml new file mode 100644 index 0000000..9344745 --- /dev/null +++ b/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml @@ -0,0 +1,265 @@ +test_dds_loopback = <function dds_loopback at 0x000002528402BF60>, iio_uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 0, frequency = 10000000, scale = 0.06, peak_min = -50 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0, 1]) + @pytest.mark.parametrize("param_set", [dict()]) + @pytest.mark.parametrize( + "frequency, scale", + [ + (5000000, 0.12), + (10000000, 0.06), + (10000000, 0.12), + (15000000, 0.12), + (15000000, 0.5), + (200000000, 0.5), + ], + ) + @pytest.mark.parametrize("peak_min", [-50]) + def test_daq3_dds_loopback( + test_dds_loopback, + iio_uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + ): +> test_dds_loopback( + iio_uri, classname, param_set, channel, frequency, scale, peak_min + ) + +test\test_daq3_p.py:50: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 0, frequency = 10000000, scale = 0.06, peak_min = -50, use_obs = False, use_rx2 = False + + def dds_loopback( + uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + use_obs=False, + use_rx2=False, + ): + """dds_loopback: Test DDS loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. TX FPGA DDSs are used to generate a sinusoid + which is then estimated on the RX side. The receive tone must be within + 1% of its expected frequency with a specified peak + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + frequency: type=integer + Frequency in Hz of transmitted tone + scale: type=float + Scale of DDS tone. Range [0,1] + peak_min: type=float + Minimum acceptable value of maximum peak in dBFS of received tone + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + # Set common buffer settings + sdr.tx_cyclic_buffer = True + N = 2 ** 14 + + if use_obs and use_rx2: + raise Exception("Both RX2 and OBS are selected. Select one at a time.") + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + elif use_obs: + sdr.obs.rx_enabled_channels = [0] + sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + else: + RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate) + + sdr.dds_single_tone(frequency, scale, channel) + + # Pass through SDR + try: + for _ in range(10): # Wait + data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - frequency) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + + assert (frequency * 0.01) > diff +> assert tone_peaks[indx] > peak_min +E assert -53.914503994625996 > -50 + +test\dma_tests.py:341: AssertionErrortest_dds_loopback = <function dds_loopback at 0x000002528402BF60>, iio_uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 1, frequency = 10000000, scale = 0.06, peak_min = -50 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0, 1]) + @pytest.mark.parametrize("param_set", [dict()]) + @pytest.mark.parametrize( + "frequency, scale", + [ + (5000000, 0.12), + (10000000, 0.06), + (10000000, 0.12), + (15000000, 0.12), + (15000000, 0.5), + (200000000, 0.5), + ], + ) + @pytest.mark.parametrize("peak_min", [-50]) + def test_daq3_dds_loopback( + test_dds_loopback, + iio_uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + ): +> test_dds_loopback( + iio_uri, classname, param_set, channel, frequency, scale, peak_min + ) + +test\test_daq3_p.py:50: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 1, frequency = 10000000, scale = 0.06, peak_min = -50, use_obs = False, use_rx2 = False + + def dds_loopback( + uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + use_obs=False, + use_rx2=False, + ): + """dds_loopback: Test DDS loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. TX FPGA DDSs are used to generate a sinusoid + which is then estimated on the RX side. The receive tone must be within + 1% of its expected frequency with a specified peak + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + frequency: type=integer + Frequency in Hz of transmitted tone + scale: type=float + Scale of DDS tone. Range [0,1] + peak_min: type=float + Minimum acceptable value of maximum peak in dBFS of received tone + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + # Set common buffer settings + sdr.tx_cyclic_buffer = True + N = 2 ** 14 + + if use_obs and use_rx2: + raise Exception("Both RX2 and OBS are selected. Select one at a time.") + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + elif use_obs: + sdr.obs.rx_enabled_channels = [0] + sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + else: + RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate) + + sdr.dds_single_tone(frequency, scale, channel) + + # Pass through SDR + try: + for _ in range(10): # Wait + data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - frequency) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + + assert (frequency * 0.01) > diff +> assert tone_peaks[indx] > peak_min +E assert -54.11450058246225 > -50 + +test\dma_tests.py:341: AssertionError \ No newline at end of file From 5e529e82deb0ded6225bc212978de3a077cc16e7 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Tue, 3 Dec 2024 20:25:01 +0800 Subject: [PATCH 5/7] Update pytest xml parser and pytest markdown results Signed-off-by: Trisha De Vera --- telemetry/gparser/parser.py | 128 ++++++++++++++++++++++++++++------- telemetry/report/markdown.py | 27 +++----- 2 files changed, 110 insertions(+), 45 deletions(-) diff --git a/telemetry/gparser/parser.py b/telemetry/gparser/parser.py index 2d773b9..6f47b3f 100644 --- a/telemetry/gparser/parser.py +++ b/telemetry/gparser/parser.py @@ -7,6 +7,7 @@ import telemetry from junitparser import JUnitXml, Skipped, Error, Failure import xml.etree.ElementTree as ET +import ast def get_parser(url,grabber=None): '''Factory method that provides appropriate parser object base on given url''' @@ -262,42 +263,117 @@ def get_payload_raw(self): test_name = testcase.get("name") # Find the properties tag properties = testcase.find("properties") - failure = testcase.find("failure") - error = testcase.find("error") - skipped = testcase.find("skipped") + # Find the failure tag + failure = testcase.find("failure") + + # Test description links from pyadi-iio + attr_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html" + dma_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_dma.html" + generic_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_generics.html" + # Set default description link for all tests + test_name_link = f"[{test_name}]({attr_link})" + + url_links = [attr_link, dma_link, generic_link] + + # Get parameters from test case name + param_parsed = test_name.split("[", 1) + test_name = param_parsed[0] + param_parsed_last = (param_parsed[-1])[:-1].strip() + # Separate the parameters + param_separate = re.split(r'-(?!\d)', param_parsed_last) + # Check parameters list for a dictionary named "param_set" + for index, param in enumerate(param_separate): + # Check if param_set parameter name exists + param_list = [] + # Remove param_set= in string + new_param_split = param.split("=", 1) + param_name = new_param_split[0] + new_param = new_param_split[1] + # Check if param is a dictionary and not empty + if new_param[0] == "{" and new_param != "\{\}": + # Convert remaining string to a dictionary + param_dict = ast.literal_eval(new_param) + if param_dict: + for key, value in param_dict.items(): + # Convert parameters of param_set back to string + new_updated = "'" + key + "'" + ": " + str(value) + param_list.append(new_updated) + insert_param_name = param_name + "=" + param_list.insert(0, insert_param_name) + param_list_string = " \n".join(param_list) + # Update param in param_separate list + param_separate[index] = param_list_string + # Compile final parameter details + param_separate.insert(0,"**Parameters:**") + param_display = "\n - ".join(param_separate) if properties is not None: if failure is not None: - # failure tag content + # Get failure tag content failure_text = failure.text fail_content_lines = failure_text.splitlines() - fail_line_statement = fail_content_lines[-4] - # exception statement with parameter names - exc_param_value = (fail_line_statement[1:]).lstrip() - fail_content_list = failure_text.split("@", 1) - param_test = "" - if len(fail_content_list) > 0: - # test parameters and values - param_test = fail_content_list[0] - test_desc = "" - + exc_param_value = "" + # Get exception statement with parameter names + for item in fail_content_lines[::-1]: + if len(item) > 0: + if item[0] == ">": + exc_param_value = item[1:].lstrip() + break + + test_name_function = "" + test_function_module = "" + exctype_message = "" # Iterate through each property in the properties tag for prop in properties.findall("property"): # Get the property name and value prop_name = prop.get("name") prop_value = prop.get("value") - if prop_name == "exception_type_and_message": - # update test name with exception type and value - test_name = test_name + ": " + prop_value + " (" + exc_param_value + ")" - if prop_name == "test_description": - # test description - test_desc = prop_value - test_details = [test_name, param_test, test_desc] - payload.append(test_details) - elif error is not None: - payload.append(test_name) - elif skipped is not None: - payload.append(test_name) + if prop_name == "exception_type_and_message": + prop_list = prop_value.splitlines() + new_props = prop_value + prop_list_updated = [] + # Check if exception and message has mutiple lines + if len(prop_list) > 1: + for props in prop_list: + if len(props) > 0: + # Remove leading spaces + prop_strip = props.lstrip() + prop_list_updated.append(prop_strip) + if len(prop_list_updated) > 0: + new_props = " ".join(prop_list_updated) + # Combine exception type, message, and parameters + exctype_message = "\n" + " " + new_props + " ( " + exc_param_value + " )" + # Get test name function + if prop_name == "test_name_function": + # Get test description + test_name_function = prop_value + if prop_name == "test_function_module": + test_function_module = prop_value + + # Create dictionary of pyadi-iio test module links + test_links = { + "test.attr_tests" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html", + "test.dma_tests" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_dma.html", + "test.generics" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_generics.html" + } + + # Set test description link + if test_function_module != "": + if test_name_function != "": + if test_function_module in test_links: + test_permalink = test_links[test_function_module] + "#" + test_function_module + "." + test_name_function + test_name_link = f"[{test_name}]({test_permalink})" + else: + if test_function_module in test_links: + test_permalink = test_links[test_function_module] + test_name_link = f"[{test_name}]({test_permalink})" + + # Compile the test details + test_details = [test_name_link, param_display] + test_details_final = "

".join(test_details) + test_details_final = test_details_final + "\n\n" + exctype_message + + payload.append(test_details_final) except Exception as ex: traceback.print_exc() diff --git a/telemetry/report/markdown.py b/telemetry/report/markdown.py index c6dec1e..3a25df0 100644 --- a/telemetry/report/markdown.py +++ b/telemetry/report/markdown.py @@ -73,27 +73,16 @@ def generate_param(self,data): else: iio_drivers_missing_details = "No missing drivers" if len(info["missing_devs"]) == 0 else ("
").join(info["missing_devs"]) iio_drivers_found_details = "No iio drivers found" if len(info["enumerated_devs"]) == 0 else ("
").join(info["enumerated_devs"]) - dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"]) - pytest_failures_details = "No failures" + dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"]) + pytest_failures_details = "No failures" pytest_failures_details = "Invalid" if pytest_tests_status == "⛔" else pytest_failures_details - pytest_details = [] # list of all pytest failure details + pytest_details = [] if len(info["pytest_failure"]) != 0: - if isinstance(info["pytest_failure"][0], list): - if len(info["pytest_failure"][0]) > 0: - for items in info["pytest_failure"][0]: - # add the details of first test case in the list as separate items (no changes) - pytest_details.append(items) - for data in info["pytest_failure"][1:]: - if isinstance(data, list): - if len(data) > 0: - # add the details of remaining test cases in the list and add "* " to data[0] (test case name) for markdown - pytest_details.append(f"* {data[0]}") - for details in data[1:]: - pytest_details.append(details) - # create structure of pytest failure details for markdown - pytest_failures_details = ("\n\n").join(pytest_details) - else: - pytest_failures_details = ("
").join(info["pytest_failure"]) + pytest_details.append(info["pytest_failure"][0]) + for item in info["pytest_failure"][1:]: + item_update = "- " + item + pytest_details.append(item_update) + pytest_failures_details = ("\n\n").join(pytest_details) last_failing_stage = str(info["last_failing_stage"]) last_failing_stage_failure = str(info["last_failing_stage_failure"]) From b3c5ccc8f1f11bcbc6c5160b51773a1298a8d495 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Fri, 20 Dec 2024 18:29:32 +0800 Subject: [PATCH 6/7] Update test artifact and parameter for pytest failure Signed-off-by: Trisha De Vera --- ...qmp_zcu102_rev10_ad9081_vm4_l8_reports.xml | 1102 +++++++++++++++++ .../zynqmp_zcu102_rev10_fmcdaq3_reports.xml | 265 ---- tests/test_gparser.py | 2 +- 3 files changed, 1103 insertions(+), 266 deletions(-) create mode 100644 tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml delete mode 100644 tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml diff --git a/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml b/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml new file mode 100644 index 0000000..a213a70 --- /dev/null +++ b/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml @@ -0,0 +1,1102 @@ +test_attribute_multiple_values = <function attribute_multiple_values at 0x7bb6970d8af0> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', attr = 'rx_test_mode' +val = ['midscale_short', 'pos_fullscale', 'neg_fullscale', 'checkerboard', 'pn23', 'pn9', ...] + + @pytest.mark.iio_hardware(hardware) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize( + "attr, val", + [ + ("rx_nyquist_zone", ["even", "odd"]), + ("loopback_mode", [0]), + ( + "rx_test_mode", + [ + "midscale_short", + "pos_fullscale", + "neg_fullscale", + "checkerboard", + "pn23", + "pn9", + "one_zero_toggle", + "user", + "pn7", + "pn15", + "pn31", + "ramp", + "off", + ], + ), + ( + "tx_main_ffh_mode", + ["phase_continuous", "phase_incontinuous", "phase_coherent"], + ), + ], + ) + def test_ad9081_str_attr(test_attribute_multiple_values, iio_uri, classname, attr, val): +> test_attribute_multiple_values(iio_uri, classname, attr, val, 0) + +test/test_ad9081.py:65: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ +test/attr_tests.py:221: in attribute_multiple_values + assert dev_interface(uri, classname, val, attr, tol, sleep=sleep) +test/common.py:114: in dev_interface + setattr(sdr, attr, val) +adi/ad9081.py:268: in rx_test_mode + self._set_iio_attr_single( +adi/ad9081.py:170: in _set_iio_attr_single + return self._set_iio_attr(channel_name, attr, output, value, _ctrl) +adi/attribute.py:71: in _set_iio_attr + raise ex +adi/attribute.py:69: in _set_iio_attr + channel.attrs[attr_name].value = str(value) +/usr/local/lib/python3.8/dist-packages/iio.py:704: in <lambda> + lambda self, x: self._write(x), +/usr/local/lib/python3.8/dist-packages/iio.py:736: in _write + _c_write_attr(self._channel, self._name_ascii, value.encode("ascii")) +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +result = -22, func = <_FuncPtr object at 0x7bb6c56de880> +arguments = (<iio.LP__Channel object at 0x7bb69f77a5c0>, b'test_mode', b'ramp') + + def _check_negative(result, func, arguments): + if result >= 0: + return result +> raise OSError(-result, _strerror(-result)) +E OSError: [Errno 22] Invalid argument + +/usr/local/lib/python3.8/dist-packages/iio.py:62: OSError/var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:121: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:121: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:131: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:131: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:163: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:163: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:194: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:194: Skipping test: Channel 3not available.test_sfdr = <function t_sfdr at 0x7bb6a9ca9820>, iio_uri = 'ip:192.168.10.114' +classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} +sfdr_min = 70 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_test_tone_en=[0, 0, 0, 0], + tx_main_nco_test_tone_en=[0, 0, 0, 0], + ) + ], + ) + @pytest.mark.parametrize("sfdr_min", [70]) + def test_ad9081_sfdr(test_sfdr, iio_uri, classname, channel, param_set, sfdr_min): + param_set = scale_field(param_set, iio_uri) +> test_sfdr(iio_uri, classname, channel, param_set, sfdr_min, full_scale=0.5) + +test/test_ad9081.py:232: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} +sfdr_min = 70, use_obs = False, full_scale = 0.5 + + def t_sfdr(uri, classname, channel, param_set, sfdr_min, use_obs=False, full_scale=0.9): + """t_sfdr: Test SFDR loopback of tone with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. Sinuoidal data is passed to DMAs which is then + estimated on the RX side. The peak and second peak are determined in + the received signal to determine the sfdr. + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + sfdr_min: type=float + Minimum acceptable value of SFDR in dB + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + time.sleep(5) # Wait for QEC to kick in + # Set common buffer settings + N = 2 ** 14 + sdr.tx_cyclic_buffer = True + sdr.tx_enabled_channels = [channel] + sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels) + + if use_obs: + sdr.obs.rx_enabled_channels = [0] + sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + else: + RXFS = int(sdr.rx_sample_rate) + + if hasattr(sdr, "tx_sample_rate"): + FS = int(sdr.tx_sample_rate) + else: + FS = RXFS + + fc = FS * 0.1 + fc = int(fc / (FS / N)) * (FS / N) + ts = 1 / float(FS) + t = np.arange(0, N * ts, ts) + i = np.cos(2 * np.pi * t * fc) * 2 ** 15 * full_scale + q = np.sin(2 * np.pi * t * fc) * 2 ** 15 * full_scale + iq = i + 1j * q + # Pass through SDR + try: + sdr.tx(iq) + time.sleep(3) + for _ in range(10): # Wait for IQ correction to stabilize + data = sdr.obs.rx() if use_obs else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + val, amp, freqs = spec.sfdr(data, fs=RXFS, plot=False) + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + freqs, + amp, + "Frequency (Hz)", + "Amplitude (dBFS)", + "SDFR {} dBc ({})".format(val, classname), + ) + } + print("SFDR:", val, "dB") +> assert val > sfdr_min +E assert 0.5749985164147731 > 70 + +test/dma_tests.py:733: AssertionErrortest_dds_loopback = <function dds_loopback at 0x7bb6a9ca95e0> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} +channel = 0, frequency = 10000000, scale = 0.5, peak_min = -30 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize("frequency, scale", [(10000000, 0.5)]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ) + ], + ) + @pytest.mark.parametrize("peak_min", [-30]) + def test_ad9081_dds_loopback( + test_dds_loopback, + iio_uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + ): + param_set = scale_field(param_set, iio_uri) +> test_dds_loopback( + iio_uri, classname, param_set, channel, frequency, scale, peak_min + ) + +test/test_ad9081.py:268: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} +channel = 0, frequency = 10000000, scale = 0.5, peak_min = -30, use_obs = False +use_rx2 = False + + def dds_loopback( + uri, + classname, + param_set, + channel, + frequency, + scale, + peak_min, + use_obs=False, + use_rx2=False, + ): + """dds_loopback: Test DDS loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. TX FPGA DDSs are used to generate a sinusoid + which is then estimated on the RX side. The receive tone must be within + 1% of its expected frequency with a specified peak + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + frequency: type=integer + Frequency in Hz of transmitted tone + scale: type=float + Scale of DDS tone. Range [0,1] + peak_min: type=float + Minimum acceptable value of maximum peak in dBFS of received tone + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + # Set common buffer settings + sdr.tx_cyclic_buffer = True + N = 2 ** 14 + + if use_obs and use_rx2: + raise Exception("Both RX2 and OBS are selected. Select one at a time.") + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + elif use_obs: + sdr.obs.rx_enabled_channels = [0] + sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + else: + RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate) + + sdr.dds_single_tone(frequency, scale, channel) + + # Pass through SDR + try: + for _ in range(10): # Wait + data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - frequency) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (frequency * 0.01) > diff +E assert (10000000 * 0.01) > 607431640.625 + +test/dma_tests.py:340: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [1234567, 1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [500000000, 500000000], ...} + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + ], + ) + def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set): + param_set = scale_field(param_set, iio_uri) +> test_iq_loopback(iio_uri, classname, channel, param_set) + +test/test_ad9081.py:317: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [1234567, 1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [500000000, 500000000], ...} +use_tx2 = False, use_rx2 = False + + def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False): + """cw_loopback: Test CW loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. Sinuoidal data is passed to DMAs which is then + estimated on the RX side. The receive tone must be within + 1% of its expected frequency at the max peak found + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + use_tx2: type=bool + Boolean if set will use tx2() as tx method + use_rx2: type=bool + Boolean if set will use rx2() as rx method + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + time.sleep(1) + # Verify still set + for p in param_set.keys(): + if isinstance(param_set[p], str): + assert getattr(sdr, p) == param_set[p] + else: + assert ( + np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p]))) + < 4 + ) + # Set common buffer settings + N = 2 ** 14 + if use_tx2: + sdr.tx2_cyclic_buffer = True + sdr.tx2_enabled_channels = [channel] + sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels) + else: + sdr.tx_cyclic_buffer = True + sdr.tx_enabled_channels = [channel] + sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels) + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + elif hasattr(sdr, "rx_sample_rate"): + RXFS = int(sdr.rx_sample_rate) + else: + """no sample_rate nor rx_sample_rate. Let's try something like + rx($channel)_sample_rate""" + attr = "rx" + str(channel) + "_sample_rate" + RXFS = int(getattr(sdr, attr)) + + A = 2 ** 15 + if hasattr(sdr, "tx_sample_rate"): + FS = int(sdr.tx_sample_rate) + else: + FS = RXFS + fc = FS * 0.1 + fc = int(fc / (FS / N)) * (FS / N) + + ts = 1 / float(FS) + t = np.arange(0, N * ts, ts) + if sdr._complex_data: + i = np.cos(2 * np.pi * t * fc) * A * 0.5 + q = np.sin(2 * np.pi * t * fc) * A * 0.5 + cw = i + 1j * q + else: + cw = np.cos(2 * np.pi * t * fc) * A * 1 + + # Pass through SDR + try: + if use_tx2: + sdr.tx2(cw) + else: + sdr.tx(cw) + for _ in range(30): # Wait to stabilize + data = sdr.rx2() if use_rx2 else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + # tone_freq = freq_est(data, RXFS) + # diff = np.abs(tone_freq - fc) + # print("Peak: @"+str(tone_freq) ) + # assert (fc * 0.01) > diff + + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - fc) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (fc * 0.01) > diff +E assert (149963378.90625 * 0.01) > 802505493.1640625 + +test/dma_tests.py:648: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [-1234567, -1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [750000000, 750000000], ...} + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + ], + ) + def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set): + param_set = scale_field(param_set, iio_uri) +> test_iq_loopback(iio_uri, classname, channel, param_set) + +test/test_ad9081.py:317: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [-1234567, -1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [750000000, 750000000], ...} +use_tx2 = False, use_rx2 = False + + def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False): + """cw_loopback: Test CW loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. Sinuoidal data is passed to DMAs which is then + estimated on the RX side. The receive tone must be within + 1% of its expected frequency at the max peak found + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + use_tx2: type=bool + Boolean if set will use tx2() as tx method + use_rx2: type=bool + Boolean if set will use rx2() as rx method + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + time.sleep(1) + # Verify still set + for p in param_set.keys(): + if isinstance(param_set[p], str): + assert getattr(sdr, p) == param_set[p] + else: + assert ( + np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p]))) + < 4 + ) + # Set common buffer settings + N = 2 ** 14 + if use_tx2: + sdr.tx2_cyclic_buffer = True + sdr.tx2_enabled_channels = [channel] + sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels) + else: + sdr.tx_cyclic_buffer = True + sdr.tx_enabled_channels = [channel] + sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels) + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + elif hasattr(sdr, "rx_sample_rate"): + RXFS = int(sdr.rx_sample_rate) + else: + """no sample_rate nor rx_sample_rate. Let's try something like + rx($channel)_sample_rate""" + attr = "rx" + str(channel) + "_sample_rate" + RXFS = int(getattr(sdr, attr)) + + A = 2 ** 15 + if hasattr(sdr, "tx_sample_rate"): + FS = int(sdr.tx_sample_rate) + else: + FS = RXFS + fc = FS * 0.1 + fc = int(fc / (FS / N)) * (FS / N) + + ts = 1 / float(FS) + t = np.arange(0, N * ts, ts) + if sdr._complex_data: + i = np.cos(2 * np.pi * t * fc) * A * 0.5 + q = np.sin(2 * np.pi * t * fc) * A * 0.5 + cw = i + 1j * q + else: + cw = np.cos(2 * np.pi * t * fc) * A * 1 + + # Pass through SDR + try: + if use_tx2: + sdr.tx2(cw) + else: + sdr.tx(cw) + for _ in range(30): # Wait to stabilize + data = sdr.rx2() if use_rx2 else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + # tone_freq = freq_est(data, RXFS) + # diff = np.abs(tone_freq - fc) + # print("Peak: @"+str(tone_freq) ) + # assert (fc * 0.01) > diff + + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - fc) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (fc * 0.01) > diff +E assert (149963378.90625 * 0.01) > 606536865.234375 + +test/dma_tests.py:648: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000], + rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000], + rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + rx_main_nco_phases=[0, 0, 0, 0], + tx_main_nco_phases=[0, 0, 0, 0], + rx_channel_nco_phases=[0, 0, 0, 0], + tx_channel_nco_phases=[0, 0, 0, 0], + ), + ], + ) + def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set): + param_set = scale_field(param_set, iio_uri) +> test_iq_loopback(iio_uri, classname, channel, param_set) + +test/test_ad9081.py:317: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0 +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...} +use_tx2 = False, use_rx2 = False + + def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False): + """cw_loopback: Test CW loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. Sinuoidal data is passed to DMAs which is then + estimated on the RX side. The receive tone must be within + 1% of its expected frequency at the max peak found + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + use_tx2: type=bool + Boolean if set will use tx2() as tx method + use_rx2: type=bool + Boolean if set will use rx2() as rx method + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + time.sleep(1) + # Verify still set + for p in param_set.keys(): + if isinstance(param_set[p], str): + assert getattr(sdr, p) == param_set[p] + else: + assert ( + np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p]))) + < 4 + ) + # Set common buffer settings + N = 2 ** 14 + if use_tx2: + sdr.tx2_cyclic_buffer = True + sdr.tx2_enabled_channels = [channel] + sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels) + else: + sdr.tx_cyclic_buffer = True + sdr.tx_enabled_channels = [channel] + sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels) + + if use_rx2: + sdr.rx2_enabled_channels = [channel] + sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) + else: + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + elif hasattr(sdr, "rx_sample_rate"): + RXFS = int(sdr.rx_sample_rate) + else: + """no sample_rate nor rx_sample_rate. Let's try something like + rx($channel)_sample_rate""" + attr = "rx" + str(channel) + "_sample_rate" + RXFS = int(getattr(sdr, attr)) + + A = 2 ** 15 + if hasattr(sdr, "tx_sample_rate"): + FS = int(sdr.tx_sample_rate) + else: + FS = RXFS + fc = FS * 0.1 + fc = int(fc / (FS / N)) * (FS / N) + + ts = 1 / float(FS) + t = np.arange(0, N * ts, ts) + if sdr._complex_data: + i = np.cos(2 * np.pi * t * fc) * A * 0.5 + q = np.sin(2 * np.pi * t * fc) * A * 0.5 + cw = i + 1j * q + else: + cw = np.cos(2 * np.pi * t * fc) * A * 1 + + # Pass through SDR + try: + if use_tx2: + sdr.tx2(cw) + else: + sdr.tx(cw) + for _ in range(30): # Wait to stabilize + data = sdr.rx2() if use_rx2 else sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + # tone_freq = freq_est(data, RXFS) + # diff = np.abs(tone_freq - fc) + # print("Peak: @"+str(tone_freq) ) + # assert (fc * 0.01) > diff + + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - fc) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (fc * 0.01) > diff +E assert (149963378.90625 * 0.01) > 590194702.1484375 + +test/dma_tests.py:648: AssertionErrortest_tone_loopback = <function nco_loopback at 0x7bb6a9ca9700> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [0, 0], ...} +channel = 0, frequency = 10000000, peak_min = -30 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize("frequency", [10000000]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1010000000, 1010000000, 1010000000, 1010000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + tx_main_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5], + tx_main_nco_test_tone_en=[1, 1, 1, 1], + tx_channel_nco_test_tone_en=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[10000000, 10000000, 10000000, 10000000], + tx_channel_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5], + tx_main_nco_test_tone_en=[0, 0, 0, 0], + tx_channel_nco_test_tone_en=[1, 1, 1, 1], + ), + ], + ) + @pytest.mark.parametrize("peak_min", [-30]) + def test_ad9081_nco_loopback( + test_tone_loopback, iio_uri, classname, param_set, channel, frequency, peak_min, + ): + param_set = scale_field(param_set, iio_uri) +> test_tone_loopback(iio_uri, classname, param_set, channel, frequency, peak_min) + +test/test_ad9081.py:355: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [0, 0], ...} +channel = 0, frequency = 10000000, peak_min = -30 + + def nco_loopback(uri, classname, param_set, channel, frequency, peak_min): + """nco_loopback: TX/DAC Test tone loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. TX/DAC internal NCOs are used to generate a sinusoid + which is then estimated on the RX side. The receive tone must be within + 1% of its expected frequency with a specified peak + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + frequency: type=integer + Frequency in Hz of transmitted tone + peak_min: type=float + Minimum acceptable value of maximum peak in dBFS of received tone + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + + N = 2 ** 14 + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + elif hasattr(sdr, "rx_sample_rate"): + RXFS = int(sdr.rx_sample_rate) + else: + """no sample_rate nor rx_sample_rate. Let's try something like + rx($channel)_sample_rate""" + attr = "rx" + str(channel) + "_sample_rate" + RXFS = int(getattr(sdr, attr)) + + # Pass through SDR + try: + for _ in range(10): # Wait + data = sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - frequency) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (frequency * 0.01) > diff +E assert (10000000 * 0.01) > 538350830.078125 + +test/dma_tests.py:526: AssertionErrortest_tone_loopback = <function nco_loopback at 0x7bb6a9ca9700> +iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [10000000, 10000000], ...} +channel = 0, frequency = 10000000, peak_min = -30 + + @pytest.mark.iio_hardware(hardware, True) + @pytest.mark.parametrize("classname", [(classname)]) + @pytest.mark.parametrize("channel", [0]) + @pytest.mark.parametrize("frequency", [10000000]) + @pytest.mark.parametrize( + "param_set", + [ + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1010000000, 1010000000, 1010000000, 1010000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[0, 0, 0, 0], + tx_main_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5], + tx_main_nco_test_tone_en=[1, 1, 1, 1], + tx_channel_nco_test_tone_en=[0, 0, 0, 0], + ), + dict( + loopback_mode=0, + rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000], + rx_channel_nco_frequencies=[0, 0, 0, 0], + tx_channel_nco_frequencies=[10000000, 10000000, 10000000, 10000000], + tx_channel_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5], + tx_main_nco_test_tone_en=[0, 0, 0, 0], + tx_channel_nco_test_tone_en=[1, 1, 1, 1], + ), + ], + ) + @pytest.mark.parametrize("peak_min", [-30]) + def test_ad9081_nco_loopback( + test_tone_loopback, iio_uri, classname, param_set, channel, frequency, peak_min, + ): + param_set = scale_field(param_set, iio_uri) +> test_tone_loopback(iio_uri, classname, param_set, channel, frequency, peak_min) + +test/test_ad9081.py:355: +_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + +uri = 'ip:192.168.10.114', classname = 'adi.ad9081' +param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [10000000, 10000000], ...} +channel = 0, frequency = 10000000, peak_min = -30 + + def nco_loopback(uri, classname, param_set, channel, frequency, peak_min): + """nco_loopback: TX/DAC Test tone loopback with connected loopback cables. + This test requires a devices with TX and RX onboard where the transmit + signal can be recovered. TX/DAC internal NCOs are used to generate a sinusoid + which is then estimated on the RX side. The receive tone must be within + 1% of its expected frequency with a specified peak + + parameters: + uri: type=string + URI of IIO context of target board/system + classname: type=string + Name of pyadi interface class which contain attribute + param_set: type=dict + Dictionary of attribute and values to be set before tone is + generated and received + channel: type=list + List of integers or list of list of integers of channels to + enable through tx_enabled_channels + frequency: type=integer + Frequency in Hz of transmitted tone + peak_min: type=float + Minimum acceptable value of maximum peak in dBFS of received tone + + """ + # See if we can tone using DMAs + sdr = eval(classname + "(uri='" + uri + "')") + # Set custom device parameters + for p in param_set.keys(): + setattr(sdr, p, param_set[p]) + + N = 2 ** 14 + sdr.rx_enabled_channels = [channel] + sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) + # Create a sinewave waveform + if hasattr(sdr, "sample_rate"): + RXFS = int(sdr.sample_rate) + elif hasattr(sdr, "rx_sample_rate"): + RXFS = int(sdr.rx_sample_rate) + else: + """no sample_rate nor rx_sample_rate. Let's try something like + rx($channel)_sample_rate""" + attr = "rx" + str(channel) + "_sample_rate" + RXFS = int(getattr(sdr, attr)) + + # Pass through SDR + try: + for _ in range(10): # Wait + data = sdr.rx() + except Exception as e: + del sdr + raise Exception(e) + del sdr + tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15) + indx = np.argmax(tone_peaks) + diff = np.abs(tone_freqs[indx] - frequency) + s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) + print(s) + if do_html_log: + pytest.data_log = { + "html": gen_line_plot_html( + tone_freqs, + tone_peaks, + "Frequency (Hz)", + "Amplitude (dBFS)", + "{} ({})".format(s, classname), + ) + } + +> assert (frequency * 0.01) > diff +E assert (10000000 * 0.01) > 484472045.8984375 + +test/dma_tests.py:526: AssertionError/var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_all_inits.py:47: No required hardware found \ No newline at end of file diff --git a/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml b/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml deleted file mode 100644 index 9344745..0000000 --- a/tests/test_artifacts/zynqmp_zcu102_rev10_fmcdaq3_reports.xml +++ /dev/null @@ -1,265 +0,0 @@ -test_dds_loopback = <function dds_loopback at 0x000002528402BF60>, iio_uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 0, frequency = 10000000, scale = 0.06, peak_min = -50 - - @pytest.mark.iio_hardware(hardware, True) - @pytest.mark.parametrize("classname", [(classname)]) - @pytest.mark.parametrize("channel", [0, 1]) - @pytest.mark.parametrize("param_set", [dict()]) - @pytest.mark.parametrize( - "frequency, scale", - [ - (5000000, 0.12), - (10000000, 0.06), - (10000000, 0.12), - (15000000, 0.12), - (15000000, 0.5), - (200000000, 0.5), - ], - ) - @pytest.mark.parametrize("peak_min", [-50]) - def test_daq3_dds_loopback( - test_dds_loopback, - iio_uri, - classname, - param_set, - channel, - frequency, - scale, - peak_min, - ): -> test_dds_loopback( - iio_uri, classname, param_set, channel, frequency, scale, peak_min - ) - -test\test_daq3_p.py:50: -_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ - -uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 0, frequency = 10000000, scale = 0.06, peak_min = -50, use_obs = False, use_rx2 = False - - def dds_loopback( - uri, - classname, - param_set, - channel, - frequency, - scale, - peak_min, - use_obs=False, - use_rx2=False, - ): - """dds_loopback: Test DDS loopback with connected loopback cables. - This test requires a devices with TX and RX onboard where the transmit - signal can be recovered. TX FPGA DDSs are used to generate a sinusoid - which is then estimated on the RX side. The receive tone must be within - 1% of its expected frequency with a specified peak - - parameters: - uri: type=string - URI of IIO context of target board/system - classname: type=string - Name of pyadi interface class which contain attribute - param_set: type=dict - Dictionary of attribute and values to be set before tone is - generated and received - channel: type=list - List of integers or list of list of integers of channels to - enable through tx_enabled_channels - frequency: type=integer - Frequency in Hz of transmitted tone - scale: type=float - Scale of DDS tone. Range [0,1] - peak_min: type=float - Minimum acceptable value of maximum peak in dBFS of received tone - - """ - # See if we can tone using DMAs - sdr = eval(classname + "(uri='" + uri + "')") - # Set custom device parameters - for p in param_set.keys(): - setattr(sdr, p, param_set[p]) - # Set common buffer settings - sdr.tx_cyclic_buffer = True - N = 2 ** 14 - - if use_obs and use_rx2: - raise Exception("Both RX2 and OBS are selected. Select one at a time.") - - if use_rx2: - sdr.rx2_enabled_channels = [channel] - sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) - elif use_obs: - sdr.obs.rx_enabled_channels = [0] - sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) - else: - sdr.rx_enabled_channels = [channel] - sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) - - # Create a sinewave waveform - if hasattr(sdr, "sample_rate"): - RXFS = int(sdr.sample_rate) - else: - RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate) - - sdr.dds_single_tone(frequency, scale, channel) - - # Pass through SDR - try: - for _ in range(10): # Wait - data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx() - except Exception as e: - del sdr - raise Exception(e) - del sdr - tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False) - indx = np.argmax(tone_peaks) - diff = np.abs(tone_freqs[indx] - frequency) - s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) - print(s) - - if do_html_log: - pytest.data_log = { - "html": gen_line_plot_html( - tone_freqs, - tone_peaks, - "Frequency (Hz)", - "Amplitude (dBFS)", - "{} ({})".format(s, classname), - ) - } - - assert (frequency * 0.01) > diff -> assert tone_peaks[indx] > peak_min -E assert -53.914503994625996 > -50 - -test\dma_tests.py:341: AssertionErrortest_dds_loopback = <function dds_loopback at 0x000002528402BF60>, iio_uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 1, frequency = 10000000, scale = 0.06, peak_min = -50 - - @pytest.mark.iio_hardware(hardware, True) - @pytest.mark.parametrize("classname", [(classname)]) - @pytest.mark.parametrize("channel", [0, 1]) - @pytest.mark.parametrize("param_set", [dict()]) - @pytest.mark.parametrize( - "frequency, scale", - [ - (5000000, 0.12), - (10000000, 0.06), - (10000000, 0.12), - (15000000, 0.12), - (15000000, 0.5), - (200000000, 0.5), - ], - ) - @pytest.mark.parametrize("peak_min", [-50]) - def test_daq3_dds_loopback( - test_dds_loopback, - iio_uri, - classname, - param_set, - channel, - frequency, - scale, - peak_min, - ): -> test_dds_loopback( - iio_uri, classname, param_set, channel, frequency, scale, peak_min - ) - -test\test_daq3_p.py:50: -_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ - -uri = 'ip:localhost', classname = 'adi.DAQ3', param_set = {}, channel = 1, frequency = 10000000, scale = 0.06, peak_min = -50, use_obs = False, use_rx2 = False - - def dds_loopback( - uri, - classname, - param_set, - channel, - frequency, - scale, - peak_min, - use_obs=False, - use_rx2=False, - ): - """dds_loopback: Test DDS loopback with connected loopback cables. - This test requires a devices with TX and RX onboard where the transmit - signal can be recovered. TX FPGA DDSs are used to generate a sinusoid - which is then estimated on the RX side. The receive tone must be within - 1% of its expected frequency with a specified peak - - parameters: - uri: type=string - URI of IIO context of target board/system - classname: type=string - Name of pyadi interface class which contain attribute - param_set: type=dict - Dictionary of attribute and values to be set before tone is - generated and received - channel: type=list - List of integers or list of list of integers of channels to - enable through tx_enabled_channels - frequency: type=integer - Frequency in Hz of transmitted tone - scale: type=float - Scale of DDS tone. Range [0,1] - peak_min: type=float - Minimum acceptable value of maximum peak in dBFS of received tone - - """ - # See if we can tone using DMAs - sdr = eval(classname + "(uri='" + uri + "')") - # Set custom device parameters - for p in param_set.keys(): - setattr(sdr, p, param_set[p]) - # Set common buffer settings - sdr.tx_cyclic_buffer = True - N = 2 ** 14 - - if use_obs and use_rx2: - raise Exception("Both RX2 and OBS are selected. Select one at a time.") - - if use_rx2: - sdr.rx2_enabled_channels = [channel] - sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels) - elif use_obs: - sdr.obs.rx_enabled_channels = [0] - sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels) - else: - sdr.rx_enabled_channels = [channel] - sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels) - - # Create a sinewave waveform - if hasattr(sdr, "sample_rate"): - RXFS = int(sdr.sample_rate) - else: - RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate) - - sdr.dds_single_tone(frequency, scale, channel) - - # Pass through SDR - try: - for _ in range(10): # Wait - data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx() - except Exception as e: - del sdr - raise Exception(e) - del sdr - tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False) - indx = np.argmax(tone_peaks) - diff = np.abs(tone_freqs[indx] - frequency) - s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx]) - print(s) - - if do_html_log: - pytest.data_log = { - "html": gen_line_plot_html( - tone_freqs, - tone_peaks, - "Frequency (Hz)", - "Amplitude (dBFS)", - "{} ({})".format(s, classname), - ) - } - - assert (frequency * 0.01) > diff -> assert tone_peaks[indx] > peak_min -E assert -54.11450058246225 > -50 - -test\dma_tests.py:341: AssertionError \ No newline at end of file diff --git a/tests/test_gparser.py b/tests/test_gparser.py index de6d36f..8be5eea 100644 --- a/tests/test_gparser.py +++ b/tests/test_gparser.py @@ -50,7 +50,7 @@ def test_get_parser(artifact, parser_object, parser_type): ('zynq-zed-adv7511-adrv9002-rx2tx2-vcmos_missing_devs.log', \ telemetry.parser.MissingDevs, 'missing_devs'), ('info.txt', telemetry.parser.InfoTxt, 'info_txt'), - ('zynqmp_zcu102_rev10_fmcdaq3_reports.xml', telemetry.parser.PytestFailure, 'pytest_failure'), + ('zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml', telemetry.parser.PytestFailure, 'pytest_failure'), ] ) def test_parser(artifact, parser_object, parser_type): From 76b6669ee0fffc5c723780378c602940b647a488 Mon Sep 17 00:00:00 2001 From: Trisha De Vera Date: Fri, 20 Dec 2024 22:20:07 +0800 Subject: [PATCH 7/7] Fix parsing errors on log artifacts Signed-off-by: Trisha De Vera --- telemetry/gparser/parser.py | 73 +++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/telemetry/gparser/parser.py b/telemetry/gparser/parser.py index 6f47b3f..87c2ed1 100644 --- a/telemetry/gparser/parser.py +++ b/telemetry/gparser/parser.py @@ -266,49 +266,44 @@ def get_payload_raw(self): # Find the failure tag failure = testcase.find("failure") - # Test description links from pyadi-iio - attr_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html" - dma_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_dma.html" - generic_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_generics.html" + attr_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html" # Set default description link for all tests - test_name_link = f"[{test_name}]({attr_link})" - - url_links = [attr_link, dma_link, generic_link] - - # Get parameters from test case name - param_parsed = test_name.split("[", 1) - test_name = param_parsed[0] - param_parsed_last = (param_parsed[-1])[:-1].strip() - # Separate the parameters - param_separate = re.split(r'-(?!\d)', param_parsed_last) - # Check parameters list for a dictionary named "param_set" - for index, param in enumerate(param_separate): - # Check if param_set parameter name exists - param_list = [] - # Remove param_set= in string - new_param_split = param.split("=", 1) - param_name = new_param_split[0] - new_param = new_param_split[1] - # Check if param is a dictionary and not empty - if new_param[0] == "{" and new_param != "\{\}": - # Convert remaining string to a dictionary - param_dict = ast.literal_eval(new_param) - if param_dict: - for key, value in param_dict.items(): - # Convert parameters of param_set back to string - new_updated = "'" + key + "'" + ": " + str(value) - param_list.append(new_updated) - insert_param_name = param_name + "=" - param_list.insert(0, insert_param_name) - param_list_string = " \n".join(param_list) - # Update param in param_separate list - param_separate[index] = param_list_string - # Compile final parameter details - param_separate.insert(0,"**Parameters:**") - param_display = "\n - ".join(param_separate) + test_name_link = f"[{test_name}]({attr_link})" if properties is not None: if failure is not None: + # Get parameters from test case name + param_parsed = test_name.split("[", 1) + test_name = param_parsed[0] + param_parsed_last = (param_parsed[-1])[:-1].strip() + # Separate the parameters + param_separate = re.split(r'-(?!\d)', param_parsed_last) + # Check parameters list for a dictionary" + for index, param in enumerate(param_separate): + param_list = [] + # Separate values from parameter name + new_param_split = param.split("=", 1) + if len(new_param_split) > 1: + param_name = new_param_split[0] + new_param = new_param_split[1].strip() + # Check if param is a dictionary and not empty + if new_param[0] == "{" and new_param != "\{\}": + # Convert remaining string to a dictionary + param_dict = ast.literal_eval(new_param) + if param_dict: + for key, value in param_dict.items(): + # Convert parameters back to string + new_updated = "'" + key + "'" + ": " + str(value) + param_list.append(new_updated) + insert_param_name = param_name + "=" + param_list.insert(0, insert_param_name) + param_list_string = " \n".join(param_list) + # Update param in param_separate list + param_separate[index] = param_list_string + # Compile final parameter details + param_separate.insert(0,"**Parameters:**") + param_display = "\n - ".join(param_separate) + # Get failure tag content failure_text = failure.text fail_content_lines = failure_text.splitlines()