From f0a3acccd371443348e1019ff0e82e2f59b049ea Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 15:21:23 +0100 Subject: [PATCH 1/6] Add PGP metadata extraction and tests Implemented a get_metadata() method in the PGP unpacker to identify PGP file types (public key, private key, encrypted message, signature) from both ASCII-armored and binary formats. Added unit tests to verify metadata extraction for various PGP data types. Improved resource cleanup in the unpack method. --- sflock/unpack/pgp.py | 42 ++++++++++++++++++++++++++++++++++---- tests/test_pgp_metadata.py | 40 ++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 4 deletions(-) create mode 100644 tests/test_pgp_metadata.py diff --git a/sflock/unpack/pgp.py b/sflock/unpack/pgp.py index eeede9a..52d19c9 100644 --- a/sflock/unpack/pgp.py +++ b/sflock/unpack/pgp.py @@ -4,6 +4,7 @@ # See the file 'docs/LICENSE.txt' for copying permission. import os +import shutil import subprocess import tempfile @@ -38,7 +39,7 @@ def unpack(self, password: str = None, duplicates=None): stderr=subprocess.PIPE, ) - stdout, stderr = p.communicate(timeout=30) + _, _ = p.communicate(timeout=30) return_code = p.returncode except subprocess.TimeoutExpired: @@ -51,12 +52,45 @@ def unpack(self, password: str = None, duplicates=None): p.kill() p.wait() return_code = 1 + finally: + if temporary and os.path.exists(filepath): + os.unlink(filepath) + + if os.path.exists(dirpath): + shutil.rmtree(dirpath) + ret = not return_code if not ret: return [] - if temporary: - os.unlink(filepath) + def get_metadata(self): + ret = [] + content = self.f.contents + if not content: + return ret + + if b"BEGIN PGP PUBLIC KEY BLOCK" in content: + ret.append("public_key") + elif b"BEGIN PGP PRIVATE KEY BLOCK" in content: + ret.append("private_key") + elif b"BEGIN PGP MESSAGE" in content: + ret.append("encrypted_message") + elif b"BEGIN PGP SIGNATURE" in content: + ret.append("signature") + elif content[0] & 0x80: + # Binary analysis + tag = content[0] + if tag & 0x40: # New format + tag_type = tag & 0x3F + else: # Old format + tag_type = (tag >> 2) & 0xF + + if tag_type in (6, 14): + ret.append("public_key") + elif tag_type == 5: + ret.append("private_key") + elif tag_type in (1, 18): + ret.append("encrypted_message") - return self.process_directory(dirpath, duplicates, password) + return ret \ No newline at end of file diff --git a/tests/test_pgp_metadata.py b/tests/test_pgp_metadata.py new file mode 100644 index 0000000..5865533 --- /dev/null +++ b/tests/test_pgp_metadata.py @@ -0,0 +1,40 @@ +import unittest +from sflock.unpack.pgp import PGP +from sflock.abstracts import File + +class TestPGPMetadata(unittest.TestCase): + def test_public_key_ascii(self): + f = File(contents=b"-----BEGIN PGP PUBLIC KEY BLOCK-----...") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["public_key"]) + + def test_private_key_ascii(self): + f = File(contents=b"-----BEGIN PGP PRIVATE KEY BLOCK-----...") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["private_key"]) + + def test_encrypted_ascii(self): + f = File(contents=b"-----BEGIN PGP MESSAGE-----...") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["encrypted_message"]) + + def test_binary_public_key_old(self): + # Tag 6 (Public Key Packet) -> 000110 + # Old format: 10xxxxxx + # Tag is bits 5-2. + # 0x80 | (6 << 2) = 0x80 | 0x18 = 0x98. + # Let's use 0x98 (length type 0 - 1 byte length) + f = File(contents=b"\x98\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["public_key"]) + + def test_binary_encrypted_new(self): + # Tag 18 (Sym Encrypted) -> 010010 + # New format: 11xxxxxx + # 0xC0 | 18 = 0xC0 | 0x12 = 0xD2 + f = File(contents=b"\xD2\x05") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["encrypted_message"]) + +if __name__ == '__main__': + unittest.main() From 952db2c22adf54e68feb77b3db2002d8650ba012 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 21:55:27 +0100 Subject: [PATCH 2/6] fix --- sflock/unpack/pgp.py | 23 ++++++++++----- tests/test_pgp_metadata.py | 58 +++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/sflock/unpack/pgp.py b/sflock/unpack/pgp.py index 52d19c9..04c0c1f 100644 --- a/sflock/unpack/pgp.py +++ b/sflock/unpack/pgp.py @@ -17,6 +17,13 @@ class PGP(Unpacker): exts = b".pgp", b".gpg" magic = "PGP " + TAG_SESSION_KEY = 1 + TAG_SIGNATURE = 2 + TAG_SECRET_KEY = 5 + TAG_PUBLIC_KEY = 6 + TAG_PUBLIC_SUBKEY = 14 + TAG_ENCRYPTED_DATA = 18 + def unpack(self, password: str = None, duplicates=None): dirpath = tempfile.mkdtemp() @@ -56,14 +63,14 @@ def unpack(self, password: str = None, duplicates=None): if temporary and os.path.exists(filepath): os.unlink(filepath) - if os.path.exists(dirpath): - shutil.rmtree(dirpath) - - ret = not return_code if not ret: + if os.path.exists(dirpath): + shutil.rmtree(dirpath) return [] + return self.process_directory(dirpath, duplicates, password) + def get_metadata(self): ret = [] content = self.f.contents @@ -86,11 +93,13 @@ def get_metadata(self): else: # Old format tag_type = (tag >> 2) & 0xF - if tag_type in (6, 14): + if tag_type in (self.TAG_PUBLIC_KEY, self.TAG_PUBLIC_SUBKEY): ret.append("public_key") - elif tag_type == 5: + elif tag_type == self.TAG_SECRET_KEY: ret.append("private_key") - elif tag_type in (1, 18): + elif tag_type == self.TAG_SIGNATURE: + ret.append("signature") + elif tag_type in (self.TAG_SESSION_KEY, self.TAG_ENCRYPTED_DATA): ret.append("encrypted_message") return ret \ No newline at end of file diff --git a/tests/test_pgp_metadata.py b/tests/test_pgp_metadata.py index 5865533..8c0bb1e 100644 --- a/tests/test_pgp_metadata.py +++ b/tests/test_pgp_metadata.py @@ -18,16 +18,62 @@ def test_encrypted_ascii(self): p = PGP(f) self.assertEqual(p.get_metadata(), ["encrypted_message"]) + def test_signature_ascii(self): + f = File(contents=b"-----BEGIN PGP SIGNATURE-----...") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["signature"]) + def test_binary_public_key_old(self): # Tag 6 (Public Key Packet) -> 000110 # Old format: 10xxxxxx # Tag is bits 5-2. # 0x80 | (6 << 2) = 0x80 | 0x18 = 0x98. - # Let's use 0x98 (length type 0 - 1 byte length) f = File(contents=b"\x98\x01") p = PGP(f) self.assertEqual(p.get_metadata(), ["public_key"]) + def test_binary_public_key_new(self): + # Tag 6 -> 000110 + # New format: 11xxxxxx -> 0xC0 | 6 = 0xC6 + f = File(contents=b"\xC6\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["public_key"]) + + def test_binary_private_key_old(self): + # Tag 5 (Secret Key Packet) -> 000101 + # Old format: 0x80 | (5 << 2) = 0x80 | 0x14 = 0x94 + f = File(contents=b"\x94\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["private_key"]) + + def test_binary_private_key_new(self): + # Tag 5 -> 000101 + # New format: 11xxxxxx -> 0xC0 | 5 = 0xC5 + f = File(contents=b"\xC5\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["private_key"]) + + def test_binary_signature_old(self): + # Tag 2 (Signature Packet) -> 000010 + # Old format: 0x80 | (2 << 2) = 0x80 | 0x08 = 0x88 + f = File(contents=b"\x88\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["signature"]) + + def test_binary_signature_new(self): + # Tag 2 -> 000010 + # New format: 11xxxxxx -> 0xC0 | 2 = 0xC2 + f = File(contents=b"\xC2\x05") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["signature"]) + + def test_binary_encrypted_old(self): + # Tag 1 (Public-Key Encrypted Session Key Packet) -> 000001 + # Old format: 0x80 | (1 << 2) = 0x84 + f = File(contents=b"\x84\x01") + p = PGP(f) + self.assertEqual(p.get_metadata(), ["encrypted_message"]) + def test_binary_encrypted_new(self): # Tag 18 (Sym Encrypted) -> 010010 # New format: 11xxxxxx @@ -36,5 +82,15 @@ def test_binary_encrypted_new(self): p = PGP(f) self.assertEqual(p.get_metadata(), ["encrypted_message"]) + def test_no_pgp_data(self): + f = File(contents=b"this is not pgp data") + p = PGP(f) + self.assertEqual(p.get_metadata(), []) + + def test_empty_content(self): + f = File(contents=b"") + p = PGP(f) + self.assertEqual(p.get_metadata(), []) + if __name__ == '__main__': unittest.main() From b3e546baed66625dd43452c6cb20eb2f68be3a62 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 21:56:31 +0100 Subject: [PATCH 3/6] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- tests/test_pgp_metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_pgp_metadata.py b/tests/test_pgp_metadata.py index 8c0bb1e..ded56b2 100644 --- a/tests/test_pgp_metadata.py +++ b/tests/test_pgp_metadata.py @@ -28,7 +28,7 @@ def test_binary_public_key_old(self): # Old format: 10xxxxxx # Tag is bits 5-2. # 0x80 | (6 << 2) = 0x80 | 0x18 = 0x98. - f = File(contents=b"\x98\x01") + f = File(contents=b"\x98\x01") p = PGP(f) self.assertEqual(p.get_metadata(), ["public_key"]) From 4b51a82b10fc4b07c8eff4bf7ddbdaec89022208 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 22:09:00 +0100 Subject: [PATCH 4/6] update --- sflock/unpack/pgp.py | 15 +++-- tests/test_pgp_metadata.py | 114 +++++++++---------------------------- 2 files changed, 36 insertions(+), 93 deletions(-) diff --git a/sflock/unpack/pgp.py b/sflock/unpack/pgp.py index 04c0c1f..81c79bc 100644 --- a/sflock/unpack/pgp.py +++ b/sflock/unpack/pgp.py @@ -24,6 +24,11 @@ class PGP(Unpacker): TAG_PUBLIC_SUBKEY = 14 TAG_ENCRYPTED_DATA = 18 + META_PUBLIC_KEY = "public_key" + META_PRIVATE_KEY = "private_key" + META_ENCRYPTED_MESSAGE = "encrypted_message" + META_SIGNATURE = "signature" + def unpack(self, password: str = None, duplicates=None): dirpath = tempfile.mkdtemp() @@ -94,12 +99,12 @@ def get_metadata(self): tag_type = (tag >> 2) & 0xF if tag_type in (self.TAG_PUBLIC_KEY, self.TAG_PUBLIC_SUBKEY): - ret.append("public_key") + ret.append(self.META_PUBLIC_KEY) elif tag_type == self.TAG_SECRET_KEY: - ret.append("private_key") + ret.append(self.META_PRIVATE_KEY) elif tag_type == self.TAG_SIGNATURE: - ret.append("signature") + ret.append(self.META_SIGNATURE) elif tag_type in (self.TAG_SESSION_KEY, self.TAG_ENCRYPTED_DATA): - ret.append("encrypted_message") + ret.append(self.META_ENCRYPTED_MESSAGE) - return ret \ No newline at end of file + return ret diff --git a/tests/test_pgp_metadata.py b/tests/test_pgp_metadata.py index 8c0bb1e..21b08fc 100644 --- a/tests/test_pgp_metadata.py +++ b/tests/test_pgp_metadata.py @@ -3,94 +3,32 @@ from sflock.abstracts import File class TestPGPMetadata(unittest.TestCase): - def test_public_key_ascii(self): - f = File(contents=b"-----BEGIN PGP PUBLIC KEY BLOCK-----...") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["public_key"]) - - def test_private_key_ascii(self): - f = File(contents=b"-----BEGIN PGP PRIVATE KEY BLOCK-----...") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["private_key"]) - - def test_encrypted_ascii(self): - f = File(contents=b"-----BEGIN PGP MESSAGE-----...") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["encrypted_message"]) - - def test_signature_ascii(self): - f = File(contents=b"-----BEGIN PGP SIGNATURE-----...") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["signature"]) - - def test_binary_public_key_old(self): - # Tag 6 (Public Key Packet) -> 000110 - # Old format: 10xxxxxx - # Tag is bits 5-2. - # 0x80 | (6 << 2) = 0x80 | 0x18 = 0x98. - f = File(contents=b"\x98\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["public_key"]) - - def test_binary_public_key_new(self): - # Tag 6 -> 000110 - # New format: 11xxxxxx -> 0xC0 | 6 = 0xC6 - f = File(contents=b"\xC6\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["public_key"]) - - def test_binary_private_key_old(self): - # Tag 5 (Secret Key Packet) -> 000101 - # Old format: 0x80 | (5 << 2) = 0x80 | 0x14 = 0x94 - f = File(contents=b"\x94\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["private_key"]) - - def test_binary_private_key_new(self): - # Tag 5 -> 000101 - # New format: 11xxxxxx -> 0xC0 | 5 = 0xC5 - f = File(contents=b"\xC5\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["private_key"]) - - def test_binary_signature_old(self): - # Tag 2 (Signature Packet) -> 000010 - # Old format: 0x80 | (2 << 2) = 0x80 | 0x08 = 0x88 - f = File(contents=b"\x88\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["signature"]) - - def test_binary_signature_new(self): - # Tag 2 -> 000010 - # New format: 11xxxxxx -> 0xC0 | 2 = 0xC2 - f = File(contents=b"\xC2\x05") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["signature"]) - - def test_binary_encrypted_old(self): - # Tag 1 (Public-Key Encrypted Session Key Packet) -> 000001 - # Old format: 0x80 | (1 << 2) = 0x84 - f = File(contents=b"\x84\x01") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["encrypted_message"]) - - def test_binary_encrypted_new(self): - # Tag 18 (Sym Encrypted) -> 010010 - # New format: 11xxxxxx - # 0xC0 | 18 = 0xC0 | 0x12 = 0xD2 - f = File(contents=b"\xD2\x05") - p = PGP(f) - self.assertEqual(p.get_metadata(), ["encrypted_message"]) - - def test_no_pgp_data(self): - f = File(contents=b"this is not pgp data") - p = PGP(f) - self.assertEqual(p.get_metadata(), []) - - def test_empty_content(self): - f = File(contents=b"") - p = PGP(f) - self.assertEqual(p.get_metadata(), []) + def test_get_metadata(self): + # (test_name, contents, expected_metadata) + test_cases = [ + ("public_key_ascii", b"-----BEGIN PGP PUBLIC KEY BLOCK-----...", ["public_key"]), + ("private_key_ascii", b"-----BEGIN PGP PRIVATE KEY BLOCK-----...", ["private_key"]), + ("encrypted_ascii", b"-----BEGIN PGP MESSAGE-----...", ["encrypted_message"]), + ("signature_ascii", b"-----BEGIN PGP SIGNATURE-----...", ["signature"]), + # Old format: 0x80 | (tag << 2) + ("binary_public_key_old", b"\x98\x01", ["public_key"]), # Tag 6 + ("binary_private_key_old", b"\x94\x01", ["private_key"]), # Tag 5 + ("binary_signature_old", b"\x88\x01", ["signature"]), # Tag 2 + ("binary_encrypted_old", b"\x84\x01", ["encrypted_message"]),# Tag 1 + # New format: 0xC0 | tag + ("binary_public_key_new", b"\xC6\x01", ["public_key"]), # Tag 6 + ("binary_private_key_new", b"\xC5\x01", ["private_key"]), # Tag 5 + ("binary_signature_new", b"\xC2\x05", ["signature"]), # Tag 2 + ("binary_encrypted_new", b"\xD2\x05", ["encrypted_message"]),# Tag 18 + ("no_pgp_data", b"this is not pgp data", []), + ("empty_content", b"", []), + ] + + for name, contents, expected in test_cases: + with self.subTest(name=name): + f = File(contents=contents) + p = PGP(f) + self.assertEqual(p.get_metadata(), expected) if __name__ == '__main__': unittest.main() From 2b56a9c9eba9296d5408f5ccaaa3f50a3ba5d5d8 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 22:11:45 +0100 Subject: [PATCH 5/6] Update pgp.py --- sflock/unpack/pgp.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sflock/unpack/pgp.py b/sflock/unpack/pgp.py index 81c79bc..2139b28 100644 --- a/sflock/unpack/pgp.py +++ b/sflock/unpack/pgp.py @@ -29,6 +29,11 @@ class PGP(Unpacker): META_ENCRYPTED_MESSAGE = "encrypted_message" META_SIGNATURE = "signature" + def handles(self): + if not super(PGP, self).handles(): + return False + return "encrypted_message" in self.get_metadata() + def unpack(self, password: str = None, duplicates=None): dirpath = tempfile.mkdtemp() From 52f5f728d3f011ad4ad4a289fc548756b3e25fb1 Mon Sep 17 00:00:00 2001 From: doomedraven Date: Thu, 8 Jan 2026 22:14:53 +0100 Subject: [PATCH 6/6] sync --- tests/files/dummy_encrypted.gpg | Bin 0 -> 7 bytes tests/{test_pgp_metadata.py => test_pgp.py} | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 tests/files/dummy_encrypted.gpg rename tests/{test_pgp_metadata.py => test_pgp.py} (69%) diff --git a/tests/files/dummy_encrypted.gpg b/tests/files/dummy_encrypted.gpg new file mode 100644 index 0000000000000000000000000000000000000000..729249facc7569107c436e0a858a80e764478db1 GIT binary patch literal 7 Lcmcb_$^ZfY1>*qN literal 0 HcmV?d00001 diff --git a/tests/test_pgp_metadata.py b/tests/test_pgp.py similarity index 69% rename from tests/test_pgp_metadata.py rename to tests/test_pgp.py index 21b08fc..431f387 100644 --- a/tests/test_pgp_metadata.py +++ b/tests/test_pgp.py @@ -1,4 +1,5 @@ import unittest +import os from sflock.unpack.pgp import PGP from sflock.abstracts import File @@ -30,5 +31,24 @@ def test_get_metadata(self): p = PGP(f) self.assertEqual(p.get_metadata(), expected) + def test_dummy_encrypted_handles(self): + filepath = os.path.join("tests", "files", "dummy_encrypted.gpg") + if not os.path.exists(filepath): + # Create the dummy file if it doesn't exist (useful if running independently) + with open(filepath, 'wb') as f: + f.write(b'\xD2\x05\x00\x00\x00\x00\x00') + + with open(filepath, "rb") as f: + content = f.read() + + f = File(filepath=filepath, contents=content) + p = PGP(f) + + # Verify metadata extraction + self.assertIn("encrypted_message", p.get_metadata()) + + # Verify handles() returns True because "encrypted_message" is present + self.assertTrue(p.handles()) + if __name__ == '__main__': unittest.main()