Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions sflock/unpack/pgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# See the file 'docs/LICENSE.txt' for copying permission.

import os
import shutil
import subprocess
import tempfile

Expand All @@ -16,6 +17,23 @@ class PGP(Unpacker):
exts = b".pgp", b".gpg"
magic = "PGP "

TAG_SESSION_KEY = 1
TAG_SIGNATURE = 2
TAG_SECRET_KEY = 5
TAG_PUBLIC_KEY = 6
TAG_PUBLIC_SUBKEY = 14
TAG_ENCRYPTED_DATA = 18

META_PUBLIC_KEY = "public_key"
META_PRIVATE_KEY = "private_key"
META_ENCRYPTED_MESSAGE = "encrypted_message"
META_SIGNATURE = "signature"

def handles(self):
if not super(PGP, self).handles():
return False
return "encrypted_message" in self.get_metadata()

def unpack(self, password: str = None, duplicates=None):
dirpath = tempfile.mkdtemp()

Expand All @@ -38,7 +56,7 @@ def unpack(self, password: str = None, duplicates=None):
stderr=subprocess.PIPE,
)

stdout, stderr = p.communicate(timeout=30)
_, _ = p.communicate(timeout=30)
return_code = p.returncode

except subprocess.TimeoutExpired:
Expand All @@ -51,12 +69,47 @@ def unpack(self, password: str = None, duplicates=None):
p.kill()
p.wait()
return_code = 1
finally:
if temporary and os.path.exists(filepath):
os.unlink(filepath)

ret = not return_code
if not ret:
if os.path.exists(dirpath):
shutil.rmtree(dirpath)
return []

if temporary:
os.unlink(filepath)

return self.process_directory(dirpath, duplicates, password)

def get_metadata(self):
ret = []
content = self.f.contents
if not content:
return ret

if b"BEGIN PGP PUBLIC KEY BLOCK" in content:
ret.append("public_key")
elif b"BEGIN PGP PRIVATE KEY BLOCK" in content:
ret.append("private_key")
elif b"BEGIN PGP MESSAGE" in content:
ret.append("encrypted_message")
elif b"BEGIN PGP SIGNATURE" in content:
ret.append("signature")
elif content[0] & 0x80:
# Binary analysis
tag = content[0]
if tag & 0x40: # New format
tag_type = tag & 0x3F
else: # Old format
tag_type = (tag >> 2) & 0xF

if tag_type in (self.TAG_PUBLIC_KEY, self.TAG_PUBLIC_SUBKEY):
ret.append(self.META_PUBLIC_KEY)
elif tag_type == self.TAG_SECRET_KEY:
ret.append(self.META_PRIVATE_KEY)
elif tag_type == self.TAG_SIGNATURE:
ret.append(self.META_SIGNATURE)
elif tag_type in (self.TAG_SESSION_KEY, self.TAG_ENCRYPTED_DATA):
ret.append(self.META_ENCRYPTED_MESSAGE)

return ret
Binary file added tests/files/dummy_encrypted.gpg
Binary file not shown.
54 changes: 54 additions & 0 deletions tests/test_pgp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import unittest
import os
from sflock.unpack.pgp import PGP
from sflock.abstracts import File

class TestPGPMetadata(unittest.TestCase):
def test_get_metadata(self):
# (test_name, contents, expected_metadata)
test_cases = [
("public_key_ascii", b"-----BEGIN PGP PUBLIC KEY BLOCK-----...", ["public_key"]),
("private_key_ascii", b"-----BEGIN PGP PRIVATE KEY BLOCK-----...", ["private_key"]),
("encrypted_ascii", b"-----BEGIN PGP MESSAGE-----...", ["encrypted_message"]),
("signature_ascii", b"-----BEGIN PGP SIGNATURE-----...", ["signature"]),
# Old format: 0x80 | (tag << 2)
("binary_public_key_old", b"\x98\x01", ["public_key"]), # Tag 6
("binary_private_key_old", b"\x94\x01", ["private_key"]), # Tag 5
("binary_signature_old", b"\x88\x01", ["signature"]), # Tag 2
("binary_encrypted_old", b"\x84\x01", ["encrypted_message"]),# Tag 1
# New format: 0xC0 | tag
("binary_public_key_new", b"\xC6\x01", ["public_key"]), # Tag 6
("binary_private_key_new", b"\xC5\x01", ["private_key"]), # Tag 5
("binary_signature_new", b"\xC2\x05", ["signature"]), # Tag 2
("binary_encrypted_new", b"\xD2\x05", ["encrypted_message"]),# Tag 18
("no_pgp_data", b"this is not pgp data", []),
("empty_content", b"", []),
]

for name, contents, expected in test_cases:
with self.subTest(name=name):
f = File(contents=contents)
p = PGP(f)
self.assertEqual(p.get_metadata(), expected)

def test_dummy_encrypted_handles(self):
filepath = os.path.join("tests", "files", "dummy_encrypted.gpg")
if not os.path.exists(filepath):
# Create the dummy file if it doesn't exist (useful if running independently)
with open(filepath, 'wb') as f:
f.write(b'\xD2\x05\x00\x00\x00\x00\x00')

with open(filepath, "rb") as f:
content = f.read()

f = File(filepath=filepath, contents=content)
p = PGP(f)

# Verify metadata extraction
self.assertIn("encrypted_message", p.get_metadata())

# Verify handles() returns True because "encrypted_message" is present
self.assertTrue(p.handles())

if __name__ == '__main__':
unittest.main()