Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b546f17
nest tests under `jose` module
bwhmather Dec 3, 2014
a085612
make it possible to run tests without nose
bwhmather Dec 3, 2014
42303f4
register test suite
bwhmather Dec 3, 2014
9d7226e
run tox tests using setup.py
bwhmather Dec 3, 2014
fe73198
tox should run tests on py34
bwhmather Dec 3, 2014
9cb2d06
run tests directly in travis
bwhmather Dec 3, 2014
e5134c0
test under python 3.4 on travis
bwhmather Dec 3, 2014
7a8f1ae
don't load requirements from requirements.txt
bwhmather Dec 3, 2014
f30286f
print function
bwhmather Dec 3, 2014
4e26e31
Exception.message is removed in python3. just use str
bwhmather Dec 3, 2014
e08e5d1
build header using `update`
bwhmather Dec 3, 2014
7f1da41
encode json plaintext as utf-8
bwhmather Dec 3, 2014
fb3c513
pad_pkcs7 with bytes not unicode code points
bwhmather Dec 3, 2014
a812c2d
fix str literals in jwe and jws hash string functions
bwhmather Dec 3, 2014
17c8e16
more references to deprecated Exception.message
bwhmather Dec 3, 2014
656632f
encode adata if necessary
bwhmather Dec 3, 2014
18ac1ee
wrap map results in a list for python 3 compatibility
bwhmather Dec 3, 2014
073916d
build sign header using update
bwhmather Dec 3, 2014
1ccfa7e
docstrings for b64encode_url and b64decode_url
bwhmather Dec 3, 2014
4dddb92
input to b64decode should always be a unicode str in the ascii range
bwhmather Dec 3, 2014
3bd35c9
input to b64encode_url should always be an encoded byte string
bwhmather Dec 3, 2014
c002f58
delete unneeded encode_safe function
bwhmather Dec 3, 2014
632a1bf
catch unencoded header string
bwhmather Dec 3, 2014
4e3a37c
add debugging type checks to b64encode_url and b64decode_url
bwhmather Dec 3, 2014
4ac5f35
b64encode bytestring in test_decrypt_invalid_compression_error
bwhmather Dec 3, 2014
67ed6fd
b64encode_url should return a string
bwhmather Dec 5, 2014
8401f0f
adata should be a byte string
bwhmather Dec 5, 2014
1fed594
hacky python3 fallback for unpad_pkcs7
bwhmather Dec 5, 2014
6315bbb
parse unicode before decoding header
bwhmather Dec 5, 2014
daf26bc
decode bytestring before calling json.loads
bwhmather Dec 5, 2014
29870ba
decode bytes string before calling json_decode
bwhmather Dec 5, 2014
b7ebf9e
just use range
bwhmather Dec 5, 2014
57c0b5e
add else statement for calls that should raise exceptions
bwhmather Dec 5, 2014
21f4d66
compact serialization should be a string not a byte string
bwhmather Dec 5, 2014
219733b
break up map
bwhmather Dec 5, 2014
630e693
base64 params are unicode strings
bwhmather Dec 5, 2014
34ec1d1
fix implementation of const_compare for both python versions
bwhmather Dec 5, 2014
c9b9da1
more decoding charset before json
bwhmather Dec 5, 2014
bdcae11
fix problems with python2 getting confused by unicode objects
bwhmather Dec 9, 2014
0a826a0
in python 3 b64decode raises a binascii.Error if padding is incorrect
bwhmather Dec 9, 2014
1790d2d
b64encode expects byte string
bwhmather Dec 9, 2014
c0b27a3
assertEquals is deprecated
bwhmather Dec 9, 2014
88cd331
use "python 2" instead of "py2" to make compatibility changes searche…
bwhmather Dec 9, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
language: python
python:
- "2.7"
- "2.7"
- "3.4"
install:
- pip install tox
script: tox
- "pip install -e ."
- "pip install flake8"
script:
- "python setup.py test"
- "flake8"
121 changes: 87 additions & 34 deletions jose.py → jose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import datetime

from base64 import urlsafe_b64encode, urlsafe_b64decode
import binascii
from collections import namedtuple
from time import time

Expand All @@ -21,6 +22,13 @@
from Crypto.Signature import PKCS1_v1_5 as PKCS1_v1_5_SIG


try:
# python 2 compatibility
unicode
except NameError:
unicode = str


__all__ = ['encrypt', 'decrypt', 'sign', 'verify']


Expand Down Expand Up @@ -125,10 +133,17 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP',
:raises: :class:`~jose.Error` if there is an error producing the JWE
"""

header = dict((add_header or {}).items() + [
('enc', enc), ('alg', alg)])
header = {}

plaintext = json_encode(claims)
if add_header:
header.update(add_header)

header.update({
'enc': enc,
'alg': alg,
})

plaintext = json_encode(claims).encode('utf-8')

# compress (if required)
if compression is not None:
Expand All @@ -145,6 +160,10 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP',
iv = rng(AES.block_size)
encryption_key = rng((key_size // 8) + hash_mod.digest_size)

if not isinstance(adata, bytes):
# TODO this should probably just be an error
adata = adata.encode('utf-8')

ciphertext = cipher(plaintext, encryption_key[:-hash_mod.digest_size], iv)
hash = hash_fn(_jwe_hash_str(plaintext, iv, adata),
encryption_key[-hash_mod.digest_size:], hash_mod)
Expand All @@ -153,15 +172,16 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP',
(cipher, _), _ = JWA[alg]
encryption_key_ciphertext = cipher(encryption_key, jwk)

return JWE(*map(b64encode_url,
(json_encode(header),
return JWE(*list(map(b64encode_url,
(json_encode(header).encode('utf-8'),
encryption_key_ciphertext,
iv,
ciphertext,
auth_tag(hash))))
auth_tag(hash)))))


def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None):

def decrypt(jwe, jwk, adata=b'', validate_claims=True, expiry_seconds=None):
""" Decrypts a deserialized :class:`~jose.JWE`

:param jwe: An instance of :class:`~jose.JWE`
Expand All @@ -182,7 +202,8 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None):
"""
header, encryption_key_ciphertext, iv, ciphertext, tag = map(
b64decode_url, jwe)
header = json_decode(header)
header = json_decode(header.decode('utf-8'))


# decrypt cek
(_, decipher), _ = JWA[header['alg']]
Expand All @@ -191,6 +212,10 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None):
# decrypt body
((_, decipher), _), ((hash_fn, _), mod) = JWA[header['enc']]

if not isinstance(adata, bytes):
# TODO this should probably just be an error
adata = adata.encode('utf-8')

plaintext = decipher(ciphertext, encryption_key[:-mod.digest_size], iv)
hash = hash_fn(_jwe_hash_str(plaintext, iv, adata),
encryption_key[-mod.digest_size:], mod=mod)
Expand All @@ -207,7 +232,7 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None):

plaintext = decompress(plaintext)

claims = json_decode(plaintext)
claims = json_decode(plaintext.decode('utf-8'))
_validate(claims, validate_claims, expiry_seconds)

return JWT(header, claims)
Expand All @@ -227,8 +252,17 @@ def sign(claims, jwk, add_header=None, alg='HS256'):
"""
(hash_fn, _), mod = JWA[alg]

header = dict((add_header or {}).items() + [('alg', alg)])
header, payload = map(b64encode_url, map(json_encode, (header, claims)))
header = {}

if add_header:
header.update(add_header)

header.update({
'alg': alg,
})

header = b64encode_url(json_encode(header).encode('utf-8'))
payload = b64encode_url(json_encode(claims).encode('utf-8'))

sig = b64encode_url(hash_fn(_jws_hash_str(header, payload), jwk['k'],
mod=mod))
Expand All @@ -254,14 +288,14 @@ def verify(jws, jwk, validate_claims=True, expiry_seconds=None):
:raises: :class:`~jose.Error` if there is an error decrypting the JWE
"""
header, payload, sig = map(b64decode_url, jws)
header = json_decode(header)
header = json_decode(header.decode('utf-8'))
(_, verify_fn), mod = JWA[header['alg']]

if not verify_fn(_jws_hash_str(jws.header, jws.payload),
jwk['k'], sig, mod=mod):
raise Error('Mismatched signatures')

claims = json_decode(b64decode_url(jws.payload))
claims = json_decode(b64decode_url(jws.payload).decode('utf-8'))
_validate(claims, validate_claims, expiry_seconds)

return JWT(header, claims)
Expand All @@ -270,28 +304,35 @@ def verify(jws, jwk, validate_claims=True, expiry_seconds=None):
def b64decode_url(istr):
""" JWT Tokens may be truncated without the usual trailing padding '='
symbols. Compensate by padding to the nearest 4 bytes.

:param istr: A unicode string to decode
:returns: The byte string represented by `istr`
"""
istr = encode_safe(istr)
# unicode check for python 2 compatibility
if not isinstance(istr, (str, unicode)):
raise ValueError("expected string, got %r" % type(istr))

# required for python 2 as urlsafe_b64decode does not like unicode objects
# safe as b64 encoded string should be only ascii anyway
istr = str(istr)

try:
return urlsafe_b64decode(istr + '=' * (4 - (len(istr) % 4)))
except TypeError as e:
except (TypeError, binascii.Error) as e:
raise Error('Unable to decode base64: %s' % (e))


def b64encode_url(istr):
""" JWT Tokens may be truncated without the usual trailing padding '='
symbols. Compensate by padding to the nearest 4 bytes.
"""
return urlsafe_b64encode(encode_safe(istr)).rstrip('=')


def encode_safe(istr, encoding='utf8'):
try:
return istr.encode(encoding)
except UnicodeDecodeError:
# this will fail if istr is already encoded
pass
return istr
:param istr: a byte string to encode
:returns: The base64 representation of the input byte string as a regular
`str` object
"""
if not isinstance(istr, bytes):
raise Exception("expected bytestring")
return urlsafe_b64encode(istr).rstrip(b'=').decode('ascii')


def auth_tag(hmac):
Expand All @@ -302,11 +343,17 @@ def auth_tag(hmac):

def pad_pkcs7(s):
sz = AES.block_size - (len(s) % AES.block_size)
return s + (chr(sz) * sz)
# TODO would be cleaner to do `bytes(sz) * sz` but python 2 behaves
# strangely
return s + (chr(sz) * sz).encode('ascii')


def unpad_pkcs7(s):
return s[:-ord(s[-1])]
try:
return s[:-ord(s[-1])]
# Python 3 compatibility
except TypeError:
return s[:-s[-1]]


def encrypt_oaep(plaintext, jwk):
Expand Down Expand Up @@ -361,9 +408,15 @@ def const_compare(stra, strb):
if len(stra) != len(strb):
return False

try:
# python 2 compatibility
orda, ordb = list(map(ord, stra)), list(map(ord, strb))
except TypeError:
orda, ordb = stra, strb

res = 0
for a, b in zip(stra, strb):
res |= ord(a) ^ ord(b)
for a, b in zip(orda, ordb):
res |= a ^ b
return res == 0


Expand Down Expand Up @@ -491,19 +544,19 @@ def _validate(claims, validate_claims, expiry_seconds):
_check_not_before(now, not_before)


def _jwe_hash_str(plaintext, iv, adata=''):
def _jwe_hash_str(plaintext, iv, adata=b''):
# http://tools.ietf.org/html/
# draft-ietf-jose-json-web-algorithms-24#section-5.2.2.1
return '.'.join((adata, iv, plaintext, str(len(adata))))
return b'.'.join((adata, iv, plaintext, bytes(len(adata))))


def _jws_hash_str(header, claims):
return '.'.join((header, claims))
return b'.'.join((header.encode('ascii'), claims.encode('ascii')))


def cli_decrypt(jwt, key):
print decrypt(deserialize_compact(jwt), {'k':key},
validate_claims=False)
print(decrypt(deserialize_compact(jwt), {'k':key},
validate_claims=False))


def _cli():
Expand Down
Loading