From 07d534d00782a6b098815f9530f0685fa27497b1 Mon Sep 17 00:00:00 2001 From: Casey Marshall Date: Tue, 8 Apr 2025 19:02:48 -0500 Subject: [PATCH] Adding test coverage. Adding some basic tests and a Github action that runs them. --- .github/workflows/python-test.yml | 32 +++++++ pystdf/IO.py | 3 + tests/__init__.py | 1 + tests/test_BinSummarizer.py | 94 ++++++++++++++++++++ tests/test_IO.py | 140 ++++++++++++++++++++++++++++++ 5 files changed, 270 insertions(+) create mode 100644 .github/workflows/python-test.yml create mode 100644 tests/__init__.py create mode 100644 tests/test_BinSummarizer.py create mode 100644 tests/test_IO.py diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml new file mode 100644 index 0000000..f29b0a6 --- /dev/null +++ b/.github/workflows/python-test.yml @@ -0,0 +1,32 @@ +name: Python Tests + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: |- + python -m pip install --upgrade pip + python -m pip install pytest + python -m pip install . + - name: Run tests + run: |- + python -m pytest tests/ -v diff --git a/pystdf/IO.py b/pystdf/IO.py index 484b514..42c4071 100644 --- a/pystdf/IO.py +++ b/pystdf/IO.py @@ -49,6 +49,9 @@ def readAndUnpack(self, header, fmt): if len(buf) == 0: self.eof = 1 raise EofException() + if len(buf) < size: + header.len = 0 + raise EndOfRecordException() header.len -= len(buf) val,=struct.unpack(self.endian + fmt, buf) if isinstance(val,bytes): diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/test_BinSummarizer.py b/tests/test_BinSummarizer.py new file mode 100644 index 0000000..ae806f9 --- /dev/null +++ b/tests/test_BinSummarizer.py @@ -0,0 +1,94 @@ +import unittest +from pystdf.BinSummarizer import BinSummarizer +from pystdf.V4 import prr, hbr, sbr + +class MockDataSource: + def __init__(self): + self.name = "MockDataSource" + +class TestBinSummarizer(unittest.TestCase): + def setUp(self): + self.summarizer = BinSummarizer() + self.dataSource = MockDataSource() + self.summarizer.before_begin(self.dataSource) + + def test_flags(self): + # Test hard bin flags + hbr_row = [0] * len(hbr.fieldNames) + hbr_row[hbr.HBIN_PF] = 'F' + self.assertEqual(self.summarizer.getHPfFlags(hbr_row), BinSummarizer.FLAG_FAIL) + + hbr_row[hbr.HBIN_PF] = 'P' + self.assertEqual(self.summarizer.getHPfFlags(hbr_row), 0) + + hbr_row[hbr.HBIN_PF] = 'X' + self.assertEqual(self.summarizer.getHPfFlags(hbr_row), BinSummarizer.FLAG_UNKNOWN) + + # Test soft bin flags + sbr_row = [0] * len(sbr.fieldNames) + sbr_row[sbr.SBIN_PF] = 'F' + self.assertEqual(self.summarizer.getSPfFlags(sbr_row), BinSummarizer.FLAG_FAIL) + + sbr_row[sbr.SBIN_PF] = 'P' + self.assertEqual(self.summarizer.getSPfFlags(sbr_row), 0) + + sbr_row[sbr.SBIN_PF] = 'X' + self.assertEqual(self.summarizer.getSPfFlags(sbr_row), BinSummarizer.FLAG_UNKNOWN) + + def test_bin_storage(self): + # Test HBR storage + hbr_row = [0] * len(hbr.fieldNames) + hbr_row[hbr.HEAD_NUM] = 255 # Overall bin + hbr_row[hbr.HBIN_NUM] = 1 + self.summarizer.onHbr(hbr_row) + self.assertEqual(len(self.summarizer.getOverallHbins()), 1) + + hbr_row[hbr.HEAD_NUM] = 1 # Site-specific bin + hbr_row[hbr.SITE_NUM] = 1 + self.summarizer.onHbr(hbr_row) + self.assertEqual(len(self.summarizer.getSiteHbins()), 1) + + # Test SBR storage + sbr_row = [0] * len(sbr.fieldNames) + sbr_row[sbr.HEAD_NUM] = 255 # Overall bin + sbr_row[sbr.SBIN_NUM] = 1 + self.summarizer.onSbr(sbr_row) + self.assertEqual(len(self.summarizer.getOverallSbins()), 1) + + sbr_row[sbr.HEAD_NUM] = 1 # Site-specific bin + sbr_row[sbr.SITE_NUM] = 1 + self.summarizer.onSbr(sbr_row) + self.assertEqual(len(self.summarizer.getSiteSbins()), 1) + + def test_part_tracking(self): + prr_row = [0] * len(prr.fieldNames) + prr_row[prr.SITE_NUM] = 1 + prr_row[prr.HARD_BIN] = 1 + prr_row[prr.SOFT_BIN] = 1 + prr_row[prr.PART_FLG] = 0 # Pass + + # Test part counting and pass/fail tracking + self.summarizer.onPrr(prr_row) + + # Check hard bin tracking + count, status = self.summarizer.hbinParts[(1, 1)] + self.assertEqual(count[0], 1) + self.assertEqual(status[0], 'P') + + # Check soft bin tracking + count, status = self.summarizer.sbinParts[(1, 1)] + self.assertEqual(count[0], 1) + # Soft bins initialize with False, so they'll get ' ' status + self.assertEqual(status[0], ' ') + + # Test fail case + prr_row[prr.PART_FLG] = 0x08 # Fail + self.summarizer.onPrr(prr_row) + + # Check status becomes mixed (' ') when both pass and fail seen + count, status = self.summarizer.hbinParts[(1, 1)] + self.assertEqual(count[0], 2) + self.assertEqual(status[0], ' ') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_IO.py b/tests/test_IO.py new file mode 100644 index 0000000..a522540 --- /dev/null +++ b/tests/test_IO.py @@ -0,0 +1,140 @@ +import unittest +import io +import sys +from pystdf.IO import Parser, appendFieldParser +from pystdf.Types import RecordHeader, EofException, EndOfRecordException +from pystdf import V4 + +class TestIO(unittest.TestCase): + def setUp(self): + sys.stderr.write('Setting up test...\n') + self.test_stream = io.BytesIO() + self.parser = Parser(recTypes=V4.records, inp=self.test_stream, endian='<') + self.parser.eof = 0 # Reset EOF flag + sys.stderr.write('Setup complete.\n') + + def write_to_stream(self, data): + """Helper method to write bytes to the test stream and reset position""" + self.test_stream.write(data) + self.test_stream.seek(0) + + def test_read_field_types(self): + sys.stderr.write('Starting field types test...\n') + # Set up header + header = RecordHeader() + + # Test U1 (unsigned 1-byte integer) + test_data = bytes([42]) + self.write_to_stream(test_data) + header.len = 1 + value = self.parser.readField(header, "U1") + sys.stderr.write(f'U1 value: {value}\n') + self.assertEqual(value, 42) + + def test_read_header(self): + header_data = bytes([ + 0x0A, 0x00, # Length (10) + 0x15, # Type (21) + 0x20 # Sub-type (32) + ]) + self.write_to_stream(header_data) + header = self.parser.readHeader() + self.assertEqual(header.len, 10) + self.assertEqual(header.typ, 21) + self.assertEqual(header.sub, 32) + + def test_read_field_types(self): + print('Starting field types test...') + # Set up header + header = RecordHeader() + + # Test U1 (unsigned 1-byte integer) + test_data = bytes([42]) + self.write_to_stream(test_data) + header.len = 1 + value = self.parser.readField(header, "U1") + self.assertEqual(value, 42) + + # Test U2 (unsigned 2-byte integer) + self.test_stream.seek(0) + self.test_stream.truncate() + test_data = bytes([0x2A, 0x00]) + self.write_to_stream(test_data) + header.len = 2 + value = self.parser.readField(header, "U2") + self.assertEqual(value, 42) + + # Test I1 (signed 1-byte integer) + self.test_stream.seek(0) + self.test_stream.truncate() + test_data = bytes([0xFF]) # -1 in two's complement + self.write_to_stream(test_data) + header.len = 1 + value = self.parser.readField(header, "I1") + self.assertEqual(value, -1) + + def test_read_string(self): + print('Starting string test...') + # Set up header + header = RecordHeader() + header.len = 6 # 1 byte length + 5 bytes string + + # Test Cn (variable-length string) + test_str = b"Hello" + test_data = bytes([len(test_str)]) + test_str # String length + string data + self.write_to_stream(test_data) + value = self.parser.readCn(header) + self.assertEqual(value, "Hello") + + # Test empty string + self.test_stream.seek(0) + self.test_stream.truncate() + test_data = bytes([0]) # Length 0 + self.write_to_stream(test_data) + header.len = 1 + value = self.parser.readCn(header) + self.assertEqual(value, "") + + def test_read_array(self): + print('Starting array test...') + # Test array of U1 + test_data = bytes([10, 20, 30]) + self.write_to_stream(test_data) + header = RecordHeader() + header.len = 3 + values = self.parser.readArray(header, 3, "U1") + self.assertEqual(values, [10, 20, 30]) + + def test_append_field_parser(self): + print('Starting append field parser test...') + def base_parser(*args): + return [1, 2] + + def field_action(*args): + return 3 + + new_parser = appendFieldParser(base_parser, field_action) + result = new_parser() + self.assertEqual(result, [1, 2, 3]) + + def test_end_of_record(self): + print('Starting end of record test...') + # Test handling of premature end of record + test_data = bytes([0x02]) # Only 1 byte when 2 are expected + self.write_to_stream(test_data) + header = RecordHeader() + header.len = 2 + with self.assertRaises(EndOfRecordException): + self.parser.readField(header, "U2") + + def test_eof(self): + print('Starting EOF test...') + # Test handling of EOF + self.write_to_stream(bytes([])) # Empty stream + header = RecordHeader() + header.len = 1 + with self.assertRaises(EofException): + self.parser.readField(header, "U1") + +if __name__ == '__main__': + unittest.main()