diff --git a/sflock/unpack/pgp.py b/sflock/unpack/pgp.py index eeede9a..2139b28 100644 --- a/sflock/unpack/pgp.py +++ b/sflock/unpack/pgp.py @@ -4,6 +4,7 @@ # See the file 'docs/LICENSE.txt' for copying permission. import os +import shutil import subprocess import tempfile @@ -16,6 +17,23 @@ class PGP(Unpacker): exts = b".pgp", b".gpg" magic = "PGP " + TAG_SESSION_KEY = 1 + TAG_SIGNATURE = 2 + TAG_SECRET_KEY = 5 + TAG_PUBLIC_KEY = 6 + TAG_PUBLIC_SUBKEY = 14 + TAG_ENCRYPTED_DATA = 18 + + META_PUBLIC_KEY = "public_key" + META_PRIVATE_KEY = "private_key" + META_ENCRYPTED_MESSAGE = "encrypted_message" + META_SIGNATURE = "signature" + + def handles(self): + if not super(PGP, self).handles(): + return False + return "encrypted_message" in self.get_metadata() + def unpack(self, password: str = None, duplicates=None): dirpath = tempfile.mkdtemp() @@ -38,7 +56,7 @@ def unpack(self, password: str = None, duplicates=None): stderr=subprocess.PIPE, ) - stdout, stderr = p.communicate(timeout=30) + _, _ = p.communicate(timeout=30) return_code = p.returncode except subprocess.TimeoutExpired: @@ -51,12 +69,47 @@ def unpack(self, password: str = None, duplicates=None): p.kill() p.wait() return_code = 1 + finally: + if temporary and os.path.exists(filepath): + os.unlink(filepath) ret = not return_code if not ret: + if os.path.exists(dirpath): + shutil.rmtree(dirpath) return [] - if temporary: - os.unlink(filepath) - return self.process_directory(dirpath, duplicates, password) + + def get_metadata(self): + ret = [] + content = self.f.contents + if not content: + return ret + + if b"BEGIN PGP PUBLIC KEY BLOCK" in content: + ret.append("public_key") + elif b"BEGIN PGP PRIVATE KEY BLOCK" in content: + ret.append("private_key") + elif b"BEGIN PGP MESSAGE" in content: + ret.append("encrypted_message") + elif b"BEGIN PGP SIGNATURE" in content: + ret.append("signature") + elif content[0] & 0x80: + # Binary analysis + tag = content[0] + if tag & 0x40: # New format + tag_type = tag & 0x3F + else: # Old format + tag_type = (tag >> 2) & 0xF + + if tag_type in (self.TAG_PUBLIC_KEY, self.TAG_PUBLIC_SUBKEY): + ret.append(self.META_PUBLIC_KEY) + elif tag_type == self.TAG_SECRET_KEY: + ret.append(self.META_PRIVATE_KEY) + elif tag_type == self.TAG_SIGNATURE: + ret.append(self.META_SIGNATURE) + elif tag_type in (self.TAG_SESSION_KEY, self.TAG_ENCRYPTED_DATA): + ret.append(self.META_ENCRYPTED_MESSAGE) + + return ret diff --git a/tests/files/dummy_encrypted.gpg b/tests/files/dummy_encrypted.gpg new file mode 100644 index 0000000..729249f Binary files /dev/null and b/tests/files/dummy_encrypted.gpg differ diff --git a/tests/test_pgp.py b/tests/test_pgp.py new file mode 100644 index 0000000..431f387 --- /dev/null +++ b/tests/test_pgp.py @@ -0,0 +1,54 @@ +import unittest +import os +from sflock.unpack.pgp import PGP +from sflock.abstracts import File + +class TestPGPMetadata(unittest.TestCase): + def test_get_metadata(self): + # (test_name, contents, expected_metadata) + test_cases = [ + ("public_key_ascii", b"-----BEGIN PGP PUBLIC KEY BLOCK-----...", ["public_key"]), + ("private_key_ascii", b"-----BEGIN PGP PRIVATE KEY BLOCK-----...", ["private_key"]), + ("encrypted_ascii", b"-----BEGIN PGP MESSAGE-----...", ["encrypted_message"]), + ("signature_ascii", b"-----BEGIN PGP SIGNATURE-----...", ["signature"]), + # Old format: 0x80 | (tag << 2) + ("binary_public_key_old", b"\x98\x01", ["public_key"]), # Tag 6 + ("binary_private_key_old", b"\x94\x01", ["private_key"]), # Tag 5 + ("binary_signature_old", b"\x88\x01", ["signature"]), # Tag 2 + ("binary_encrypted_old", b"\x84\x01", ["encrypted_message"]),# Tag 1 + # New format: 0xC0 | tag + ("binary_public_key_new", b"\xC6\x01", ["public_key"]), # Tag 6 + ("binary_private_key_new", b"\xC5\x01", ["private_key"]), # Tag 5 + ("binary_signature_new", b"\xC2\x05", ["signature"]), # Tag 2 + ("binary_encrypted_new", b"\xD2\x05", ["encrypted_message"]),# Tag 18 + ("no_pgp_data", b"this is not pgp data", []), + ("empty_content", b"", []), + ] + + for name, contents, expected in test_cases: + with self.subTest(name=name): + f = File(contents=contents) + p = PGP(f) + self.assertEqual(p.get_metadata(), expected) + + def test_dummy_encrypted_handles(self): + filepath = os.path.join("tests", "files", "dummy_encrypted.gpg") + if not os.path.exists(filepath): + # Create the dummy file if it doesn't exist (useful if running independently) + with open(filepath, 'wb') as f: + f.write(b'\xD2\x05\x00\x00\x00\x00\x00') + + with open(filepath, "rb") as f: + content = f.read() + + f = File(filepath=filepath, contents=content) + p = PGP(f) + + # Verify metadata extraction + self.assertIn("encrypted_message", p.get_metadata()) + + # Verify handles() returns True because "encrypted_message" is present + self.assertTrue(p.handles()) + +if __name__ == '__main__': + unittest.main()