diff --git a/CHANGELOG.md b/CHANGELOG.md index 45420a8..5d8450e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [1.1.1] - 2023-06-25 +### Changed +- Uses `pycryptodome` for keccak digests instead of `pysha3`, which is incompatible with Python 3.11+. + +## [1.1.0] - 2022-01-07 +### Added +- LTC segwit address support + +### Fixed +- Fix wheel build by replacing crc16 with fastcrc + ## [1.0.1] - 2018-04-16 ### Added - Start using zope.interfaces for all objects. diff --git a/Makefile b/Makefile index 488d09d..cd42c6c 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ all: help help: @echo 'NAME' - @echo ' Makefile for coinaddrng' + @echo ' Makefile for coinaddrvalidor' @echo '' @echo 'SYNOPSIS' @echo ' make [options]' @@ -12,16 +12,16 @@ help: @echo '' @echo ' dist builds both binary and source distribution' @echo '' - @echo ' install installs blockapi library' + @echo ' install installs coinaddrvalidor library' @echo '' - @echo ' uninstall uninstalls blockapi library' + @echo ' uninstall uninstalls coinaddrvalidor library' install: pip3 install --upgrade . uninstall: - pip3 uninstall -y coinaddrng + pip3 uninstall -y coinaddrvalidor dist: diff --git a/README.md b/README.md index cc368f6..681c44f 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ -# CoinAddrNG -[![Github Repo](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/crypkit/coinaddrng) [![Pypi Version](https://img.shields.io/pypi/v/coinaddrng.svg)](https://pypi.python.org/pypi/coinaddrng) [![Pypi License](https://img.shields.io/pypi/l/coinaddrng.svg)](https://pypi.python.org/pypi/coinaddrng) [![Pypi Wheel](https://img.shields.io/pypi/wheel/coinaddrng.svg)](https://pypi.python.org/pypi/coinaddrng) [![Pypi Versions](https://img.shields.io/pypi/pyversions/coinaddrng.svg)](https://pypi.python.org/pypi/coinaddrng) - +# CoinAddrValidator +[![Github Repo](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/nobitex/coinaddrvalidator) [![Pypi Version](https://img.shields.io/pypi/v/coinaddrvalidator.svg)](https://pypi.python.org/pypi/coinaddrvalidator) [![Pypi License](https://img.shields.io/pypi/l/coinaddrvalidator.svg)](https://pypi.python.org/pypi/coinaddrvalidator) [![Pypi Wheel](https://img.shields.io/pypi/wheel/coinaddrvalidator.svg)](https://pypi.python.org/pypi/coinaddrvalidator) [![Pypi Versions](https://img.shields.io/pypi/pyversions/coinaddrvalidator.svg)](https://pypi.python.org/pypi/coinaddrvalidator) ## Maintainer +Mohammad Aghamir - *Maintainer of this repository* - [coinaddrvalidator](https://github.com/nobitex/coinaddrvalidator) + +## Fork Maintainer Devmons s.r.o. - *Maintainer of this fork* - [coinaddrng](https://github.com/crypkit/coinaddrng) See also the list of [contributors](https://github.com/crypkit/coinaddrng/contributors) who participated in this project. @@ -46,13 +48,13 @@ A cryptocurrency address inspection/validation library for python. ## Installation ```shell -pip3 install coinaddrng +pip3 install coinaddrvalidator ``` ## Usage ```python ->>> import coinaddrng ->>> coinaddrng.validate('btc', b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT') +>>> import coinaddrvalidator +>>> coinaddrvalidator.validate('btc', b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT') ValidationResult(name='bitcoin', ticker='btc', address=b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT', valid=True, network='main', is_extended=False, address_type='address') ``` @@ -64,7 +66,7 @@ format, which is returned as address_type. If there's none, 'address' is being r #### Currencies To add a new currency, simply instantiate a new `coinaddr.currency.Currency` class. It will be automatically registered. ```python -from coinaddrng import Currency +from coinaddrvalidator import Currency Currency('decred', ticker='dcr', validator='DecredCheck', networks=dict( main=(0x073f,0x071a,0x02fda926), test=(0x0f21,0x0efc,0x043587d1)), diff --git a/coinaddrng/interfaces.py b/coinaddrng/interfaces.py deleted file mode 100644 index 9872bcd..0000000 --- a/coinaddrng/interfaces.py +++ /dev/null @@ -1,105 +0,0 @@ -# pylint: disable=inherit-non-class,no-self-argument,no-method-argument -# pylint: disable=unexpected-special-method-signature,arguments-differ - -""" -:mod:`coinaddr.interfaces` -~~~~~~~~~~~~~~~~~~~~~~~~ - -Various zope compatible interfaces for the coinaddr package. -""" - -from zope.interface import Interface, Attribute - - -class INamedInstanceContainer(Interface): - """Contains all currencies instantiated.""" - - instances = Attribute('Mapping of instance.name -> instance') - - def __getitem__(name): - """Return the named instance""" - - def __setitem__(name, obj): - """Add the named instance to the mapping of instances""" - - def __delitem__(name, obj): - """Add the named instance to the mapping of instances""" - - def __contains__(name): - """Return true if we contain the named instance""" - - def __iter__(): - """Return an iterable, iterating all instances""" - - def get(name, default=None): - """Return the named instance if we contain it, else default""" - - -class INamedSubclassContainer(Interface): - """Contains a weakvaluedict of subclasses.""" - - subclasses = Attribute('Mapping of subclass.name -> subclass') - - def __getitem__(name): - """Return the named subclass""" - - def __setitem__(name, obj): - """Add the named subclass to the mapping of subclasses""" - - def __delitem__(name, obj): - """Add the named subclass to the mapping of subclasses""" - - def __contains__(name): - """Return true if we contain the named subclass""" - - def __iter__(): - """Return an iterable, iterating all subclasses""" - - def get(name, default=None): - """Return the named subclass if we contain it, else default""" - - -class ICurrency(Interface): - """A cryptocurrency address specification.""" - - name = Attribute('Name of currency') - ticker = Attribute('Ticker symbol for currency') - validator = Attribute('Validator name for validation') - networks = Attribute('The networks and version bytes for those networks') - charset = Attribute('For base58Check based currencies, custom charset.') - - -class IValidator(Interface): - """A cryptocurrency address validator.""" - - name = Attribute('Name of validator') - network = Attribute('Network name of address being validated') - - def validate(): - """Validate the address type, True if valid, else False.""" - - -class IValidationRequest(Interface): - """Contains the data and helpers for a given validation request.""" - - currency = Attribute('The currency name or ticker being validated') - address = Attribute('The address to be validated') - extras = Attribute('Any extra attributes to be passed to decoder, etc') - networks = Attribute( - 'Concatenated list of all network versions for currency') - - def execute(): - """Executes the request and returns a ValidationResult object""" - - -class IValidationResult(Interface): - """Represents all data for a validation result.""" - - name = Attribute('Name of currency for address validated') - ticker = Attribute('Ticker of currency for address validated') - address = Attribute('The address that was validated') - valid = Attribute('Boolean representing whether the address is valid') - network = Attribute( - 'Name of network the address belongs to if applicable') - is_extended = Attribute('boolean representing whether the address is extended key or not') - diff --git a/coinaddrng/validation.py b/coinaddrng/validation.py deleted file mode 100644 index 01c18c8..0000000 --- a/coinaddrng/validation.py +++ /dev/null @@ -1,830 +0,0 @@ -# pylint: disable=no-member - -""" -:mod:`coinaddr.validation` -~~~~~~~~~~~~~~~~~~~~~~~~ - -Various validation machinery for validating cryptocurrency addresses. -""" - -import re -from hashlib import sha256, blake2b -import functools -import operator -from typing import Optional - -from zope.interface import implementer, provider -import attr -import sha3 -import base58check -import math -from binascii import unhexlify, crc32 -import base64 -import crc16 -from blake256 import blake256 -import cbor -import bech32 -import groestlcoin_hash2 - -from .interfaces import ( - INamedSubclassContainer, IValidator, IValidationRequest, - IValidationResult, ICurrency - ) -from .base import NamedSubclassContainerBase -from . import currency - - -@provider(INamedSubclassContainer) -class Validators(metaclass=NamedSubclassContainerBase): - """Container for all validators.""" - - -class ValidatorMeta(type): - """Register validator classes on Validators.validators.""" - - def __new__(mcs, cls, bases, attrs): - new = type.__new__(mcs, cls, bases, attrs) - if new.name: - Validators[new.name] = new - return new - - -@attr.s(cmp=False, slots=True) -class ValidatorBase(metaclass=ValidatorMeta): - """Validator Interface.""" - - name = None - - request = attr.ib( - type='ValidationRequest', - validator=[ - lambda i, a, v: type(v).__name__ == 'ValidationRequest', - attr.validators.provides(IValidationRequest) - ] - ) - - def validate(self): - """Validate the address type, return True if valid, else False.""" - - def validate_extended(self): - """Validate the extended keys, return True if valid, else False.""" - - @property - def network(self): - """Return the network derived from the network version bytes.""" - - @property - def address_type(self): - """Return the address type derived from the network version bytes.""" - return 'address' - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class GRSValidator(ValidatorBase): - - name = 'GRSCheck' - - def validate(self): - # groestlcoin address is 34 bytes long - if len(self.request.address) != 34: - return False - try: - decoded = base58check.b58decode(self.request.address) - except ValueError: - return False - - hash_str = decoded[0:21] - checksum = groestlcoin_hash2.groestl_hash(hash_str)[:4] - expected_checksum = decoded[21:] - - if checksum != expected_checksum: - return False - - return True - - def validate_extended(self): - return False - - @property - def network(self): - for name, networks in self.request.currency.networks.items(): - for netw in networks: - if self.request.address.startswith(netw.encode('utf-8')): - return name - - return "" - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class Bech32CheckValidator(ValidatorBase): - - name = 'Bech32Check' - - def validate(self): - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - data = decoded_address[1] - - if self.network == "": - return False - - if data is None: - return False - - return True - - def validate_extended(self): - return False - - @property - def network(self): - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - hrp = decoded_address[0] - - for name, networks in self.request.currency.networks.items(): - for netw in networks: - if hrp == netw: - return name - - return "" - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class CosmosValidator(ValidatorBase): - - name = 'CosmosCheck' - hrp_table = ("cosmos","cosmospub","cosmosvalcons","cosmosvalconspub","cosmosvaloper","cosmosvaloperpub") - - def validate(self): - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - hrp = decoded_address[0] - data = decoded_address[1] - - if hrp not in self.hrp_table: - return False - - if data is None: - return False - - """ - test = [] - for i in data: - test.append(hex(i)) - - print(test) - - test = [] - converted = bech32.convertbits(decoded_address[1], 5, 8, False) - for i in converted: - test.append(hex(i)) - - print(test) - """ - - return True - - - def validate_extended(self): - return False - - @property - def network(self): - return "" - - @property - def address_type(self): - if len(self.request.address) == 0: - return "" - - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - hrp = decoded_address[0] - - if hrp not in self.hrp_table: - return "" - - return hrp - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class Base58CheckValidator(ValidatorBase): - """Validates Base58Check based cryptocurrency addresses.""" - - name = 'Base58Check' - # base58 alphabet representation - dec_digit_to_base58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" - base58_digit_to_dec = { b58:dec for dec,b58 in enumerate(dec_digit_to_base58) } - - - def validate(self): - """extended keys have their own validation""" - if len(self.request.address) == 111: - return self.validate_extended() - - """Validate the address.""" - if 25 > len(self.request.address) > 35: - return False - - try: - abytes = base58check.b58decode( - self.request.address, **self.request.extras) - except ValueError: - return False - - if self.network == '': - return False - - checksum = sha256(sha256(abytes[:-4]).digest()).digest()[:4] - if abytes[-4:] != checksum: - return False - - return self.request.address == base58check.b58encode( - abytes, **self.request.extras) - - def validate_extended(self,checksum_algo='sha256'): - if len(self.request.address) != 111: - return False - - if self.network == '': - return False - - # strip leading "zeros" (the "1" digit with base58) - base58_stripped = self.request.address.decode('utf-8').lstrip("1") - # convert base58 to decimal - int_rep = 0 - for base58_digit in base58_stripped: - int_rep *= 58 - try: - int_rep += self.base58_digit_to_dec[base58_digit] - except KeyError: - # not a valid base58 digit -> invalid address - return False - - # encode it to base64 - hex_rep = "{:X}".format(int_rep) - # if the length is odd, add leading zero (needed for b16decode) - if len(hex_rep) % 2 == 1: - hex_rep = "0" + hex_rep - # decode it into a binary string, padded with zeros - # 72 bytes (extended key size) + 4 bytes (prefix version bytes) - all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") - - # count leading zeros - zero_count = next(zeros for zeros,byte in enumerate(all_bytes) if byte != 0) - # compare it with the number of leading zeros lstripped at the beginning - if len(self.request.address.decode('utf-8')) - len(base58_stripped) != zero_count: - return False - - if checksum_algo == 'blake256': - checksum = blake256.blake_hash(blake256.blake_hash(all_bytes[:-4]))[:4] - elif checksum_algo == 'sha256': - checksum = sha256(sha256(all_bytes[:-4]).digest()).digest()[:4] - else: - return False - - - # checking if the checksum is valid - if checksum != all_bytes[-4:]: - return False - - return True - - @property - def network(self): - """Return network derived from network version bytes.""" - abytes = base58check.b58decode( - self.request.address, **self.request.extras) - - nbyte = abytes[0] - for name, networks in self.request.currency.networks.items(): - if nbyte in networks: - return name - return '' - - @property - def address_type(self): - """Return address type derived from network version bytes.""" - if len(self.request.address) == 0: - return '' - try: - abytes = base58check.b58decode( - self.request.address, **self.request.extras) - except ValueError: - return '' - - for name, networks in self.request.currency.address_types.items(): - for netw in networks: - if netw != 0: - # count the prefix length in bytes - prefixlen = math.ceil(math.floor((math.log(netw) / math.log(2)) + 1) / 8) - else: - prefixlen = 1 - address_prefix = [x for x in bytearray(abytes[:prefixlen])] - if prefixtodec(address_prefix) == netw: - return name - - if len(self.request.currency.address_types.items()) == 0: - return 'address' - else: - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class DecredValidator(Base58CheckValidator): - """Validates Decred cryptocurrency addresses.""" - - name = 'DecredCheck' - - - def validate(self): - if len(self.request.address) == 111: - return self.validate_extended(checksum_algo='blake256') - - try: - decoded_address = base58check.b58decode(self.request.address) - except ValueError: - return False - - # decoded address has to be 26 bytes long - if len(decoded_address) != 26: - return False - - # original one has to start with D,T,S or R - if not self.request.address.startswith((b'D', b'T', b'S', b'R')): - return False - - expected_checksum = decoded_address[-4:] - - version_bytes = int.from_bytes(decoded_address[:2],byteorder='big') - - if self.network == '': - return False - - checksum = blake256.blake_hash(blake256.blake_hash(decoded_address[:-4]))[:4] - - # double blake256 checksum needs to be equal with the expected checksum - if checksum != expected_checksum: - return False - - return True - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class CardanoValidator(Base58CheckValidator): - """Validates Cardano cryptocurrency addresses.""" - - name = 'CardanoCheck' - - - def validate(self): - try: - decoded_address = base58check.b58decode(self.request.address) - except ValueError: - return False - - - if self.network == '': - return False - - decoded_address = cbor.loads(decoded_address) - tagged_address = decoded_address[0] - expected_checksum = decoded_address[1] - checksum = crc32(tagged_address.value) - - if checksum != expected_checksum: - return False - - return True - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class EosValidator(ValidatorBase): - """Validates EOS cryptocurrency addresses.""" - - name = 'EOS' - - def validate(self): - if len(self.request.address) != 12: - return False - eos_pattern = re.compile('^[a-z]{1}[a-z1-5.]{10}[a-z1-5]{1}$') - if eos_pattern.match(self.request.address.decode('utf-8')) == None: - return False - return True - - def validate_extended(self): - return False - - @property - def network(self): - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class StellarValidator(ValidatorBase): - """Validates Stellar cryptocurrency addresses.""" - - name = 'Stellar' - - def validate(self): - try: - decoded_address = base64.b32decode(self.request.address) - except: - return False - - version_byte = decoded_address[0] - payload = decoded_address[0:-2] - expected_checksum = int.from_bytes(decoded_address[-2:], byteorder='little') - - if version_byte != 6 << 3: # ed25519PublicKey - return False - - checksum = crc16.crc16xmodem(payload) - - if checksum != expected_checksum: - return False - - return True - - def validate_extended(self): - return False - - @property - def network(self): - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class TerraMoneyValidator(ValidatorBase): - """Validates Terra Money cryptocurrency addresses.""" - - name = 'TerraMoney' - - def validate(self): - - # Each address has to have 44 characters, first 5 are "terra" - if len(self.request.address) != 44: - return False - - if self.request.address[:5] != b'terra': - return False - - if not self.request.address.decode('utf-8').isalnum(): - return False - - return True - - def validate_extended(self): - return False - - @property - def network(self): - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class EthereumValidator(ValidatorBase): - """Validates ethereum based crytocurrency addresses.""" - - name = 'Ethereum' - non_checksummed_patterns = ( - re.compile("^(0x)?[0-9a-f]{40}$"), re.compile("^(0x)?[0-9A-F]{40}$") - ) - - def validate(self): - """Validate the address.""" - address = self.request.address.decode() - - if any(bool(pat.match(address)) - for pat in self.non_checksummed_patterns): - return True - - addr = address[2:] if address.startswith('0x') else address - - # Ethereum address has to contain exactly 40 chars (20-bytes) - if len(addr.encode('utf-8')) != 40: - return False - - # Ethereum address is generated by keccak algorithm and has to - # hexadecimal - addr_hash = sha3.keccak_256(addr.lower().encode('ascii')).hexdigest() - for i, letter in enumerate(addr): - if any([ - int(addr_hash[i], 16) >= 8 and letter.upper() != letter, - int(addr_hash[i], 16) < 8 and letter.lower() != letter - ]): - return False - return True - - def validate_extended(self): - return False - - #def validate(self): - # """Validate the address.""" - # address = self.request.address.decode() - # if any(bool(pat.match(address)) - # for pat in self.non_checksummed_patterns): - # return True - # addr = address.lstrip('0x') - # addr_hash = sha3.keccak_256(addr.lower().encode('ascii')).hexdigest() - # for i in range(0, len(addr)): - # if any([ - # int(addr_hash[i], 16) > 7 and addr[i].upper() != addr[i], - # int(addr_hash[i], 16) <= 7 and addr[i].lower() != addr[i] - # ]): - # return False - # return True - - @property - def network(self): - """Return network derived from network version bytes.""" - return 'both' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class BitcoinBasedValidator(ValidatorBase): - """Validates bitcoin based crytocurrency addresses.""" - - name = 'BitcoinBasedCheck' - - @property - def base58_validator(self): - return Base58CheckValidator(self.request) - - @property - def bech32_validator(self): - return Bech32CheckValidator(self.request) - - def validate(self): - base58_res = self.base58_validator.validate() - if base58_res: - return True - - bech32_res = self.bech32_validator.validate() - if bech32_res: - return True - - return False - - def validate_extended(self): - base58_res = self.base58_validator.validate_extended() - if base58_res: - return True - - bech32_res = self.bech32_validator.validate_extended() - if bech32_res: - return True - - return False - - @property - def network(self): - base58_res = self.base58_validator.network - if base58_res: - return base58_res - - bech32_res = self.bech32_validator.network - return bech32_res - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidationRequest) -class ValidationRequest: - """Contain the data and helpers as an immutable request object.""" - - currency = attr.ib( - type=currency.Currency, - converter=currency.Currencies.get, - validator=[ - attr.validators.instance_of(currency.Currency), - attr.validators.provides(ICurrency) - ]) - address = attr.ib( - type=bytes, - converter=lambda a: a if isinstance(a, bytes) else a.encode('ascii'), - validator=attr.validators.instance_of(bytes)) - - @property - def extras(self): - """Extra arguments for passing to decoder, etc.""" - extras = dict() - if self.currency.charset: - extras.setdefault('charset', self.currency.charset) - return extras - - @property - def networks(self): - """Concatenated list of all version bytes for currency.""" - networks = tuple(self.currency.networks.values()) - return functools.reduce(operator.concat, networks) - - @property - def address_types(self): - address_types = tuple(self.currency.address_types.values()) - return functools.reduce(operator.concat, address_types) - - def execute(self): - """Execute this request and return the result.""" - validator = Validators.get(self.currency.validator)(self) - - valid = False - network = '' - is_extended = False - try: - valid = validator.validate() - network = validator.network - is_extended = validator.validate_extended() - except: - pass - - return ValidationResult( - name=self.currency.name, - ticker=self.currency.ticker, - address=self.address, - valid=valid, - network=network, - address_type=validator.address_type, - is_extended=is_extended - ) - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidationResult) -class ValidationResult: - """Contains an immutable representation of the validation result.""" - - name = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - ticker = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - address = attr.ib( - type=bytes, - validator=attr.validators.instance_of(bytes)) - valid = attr.ib( - type=bool, - validator=attr.validators.instance_of(bool)) - network = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - is_extended = attr.ib( - type=bool, - validator=attr.validators.instance_of(bool)) - address_type = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - - def __bool__(self): - return self.valid - - -def validate(currency_name, address): - """Validate the given address according to currency type. - - This is the main entrypoint for using this library. - - :param currency_name str: The name or ticker code of the cryptocurrency. - :param address (bytes, str): The crytocurrency address to validate. - :return: a populated ValidationResult object - :rtype: :inst:`ValidationResult` - - Usage:: - - >>> import coinaddr - >>> coinaddr.validate('btc', b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT') - ValidationResult(name='bitcoin', ticker='btc', - ... address=b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT', - ... valid=True, network='main') - - """ - - tickers = [currency.Currencies.instances[curr].ticker for curr in currency.Currencies.instances] - currencies = [currency.Currencies.instances[curr].name for curr in currency.Currencies.instances] - - if currency_name in tickers or currency_name in currencies: - request = ValidationRequest(currency_name, address) - return request.execute() - else: - return ValidationResult( - name='', - ticker=currency_name, - address=bytes(address, 'utf-8'), - valid=True, - network='', - address_type='address', - is_extended=False - ) - -def prefixtodec(prefix): - total = 0 - multiplier = 256 - for i in range(2,len(prefix)+1): - total += prefix[-i]*multiplier - multiplier *= 256 - return total+prefix[-1] - - -@attr.s(frozen=True, slots=True, auto_attribs=True) -class SS58Address: - format: int - length: int - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class SS58Validator(ValidatorBase): - - name = 'SS58Check' - valid_ss58_format = None - - def validate(self): - try: - self._ss58_decode(self.request.address, valid_ss58_format=self.valid_ss58_format) - except ValueError: - return False - else: - return True - - @staticmethod - def _decode_ss58_address_format(address: bytes, valid_ss58_format: Optional[int]) -> SS58Address: - if address[0] & 0b0100_0000: - format_length = 2 - ss58_format = ((address[0] & 0b0011_1111) << 2) | (address[1] >> 6) | \ - ((address[1] & 0b0011_1111) << 8) - else: - format_length = 1 - ss58_format = address[0] - - if ss58_format in [46, 47]: - raise ValueError(f"{ss58_format} is a reserved SS58 format") - - if valid_ss58_format is not None and ss58_format != valid_ss58_format: - raise ValueError("Invalid SS58 format") - - return SS58Address(format=ss58_format, length=format_length) - - @staticmethod - def _get_checksum_length(decoded_base58_len: int, ss58_address: SS58Address) -> int: - if decoded_base58_len in (3, 4, 6, 10): - return 1 - elif decoded_base58_len in (5, 7, 11, 34 + ss58_address.length, 35 + ss58_address.length): - return 2 - elif decoded_base58_len in (8, 12): - return 3 - elif decoded_base58_len in (9, 13): - return 4 - elif decoded_base58_len == 14: - return 5 - elif decoded_base58_len == 15: - return 6 - elif decoded_base58_len == 16: - return 7 - elif decoded_base58_len == 17: - return 8 - else: - raise ValueError("Invalid address length") - - # https://github.com/paritytech/substrate/wiki/External-Address-Format-(SS58) - def _ss58_decode(self, address: bytes, valid_ss58_format: Optional[int] = None) -> str: - decoded_base58 = base58check.b58decode(address) - - ss58_address = self._decode_ss58_address_format(decoded_base58, valid_ss58_format) - - # Determine checksum length according to length of address string - checksum_length = self._get_checksum_length(len(decoded_base58), ss58_address) - - checksum = blake2b(b'SS58PRE' + decoded_base58[:-checksum_length]).digest() - - if checksum[0:checksum_length] != decoded_base58[-checksum_length:]: - raise ValueError("Invalid checksum") - - return decoded_base58[ss58_address.length:len(decoded_base58) - checksum_length].hex() - - def validate_extended(self): - return True - - @property - def network(self): - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class PolkadotValidator(SS58Validator): - - name = 'PolkadotCheck' - valid_ss58_format = 0 - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class KusamaValidator(SS58Validator): - - name = 'KusamaCheck' - valid_ss58_format = 2 - diff --git a/coinaddrng/__init__.py b/coinaddrvalidator/__init__.py similarity index 96% rename from coinaddrng/__init__.py rename to coinaddrvalidator/__init__.py index 1c86817..9ba56fb 100644 --- a/coinaddrng/__init__.py +++ b/coinaddrvalidator/__init__.py @@ -16,7 +16,7 @@ :license: MIT, see LICENSE for more details. """ -__version__ = '1.0.30' +__version__ = '1.2.3' from . import interfaces, currency, validation from .validation import validate diff --git a/coinaddrvalidator/attrs_zope.py b/coinaddrvalidator/attrs_zope.py new file mode 100644 index 0000000..eb46129 --- /dev/null +++ b/coinaddrvalidator/attrs_zope.py @@ -0,0 +1,36 @@ +import attrs + +@attrs.define(repr=False) +class _ProvidesValidator: + interface = attrs.field() + + def __call__(self, inst, attr, value): + """ + We use a callable class to be able to change the ``__repr__``. + """ + if not self.interface.providedBy(value): + msg = f"'{attr.name}' must provide {self.interface!r} which {value!r} doesn't." + raise TypeError( + msg, + attr, + self.interface, + value, + ) + + def __repr__(self): + return f"" + + +def provides(interface): + """ + A validator that raises a `TypeError` if the initializer is called + with an object that does not provide the requested *interface* (checks are + performed using ``interface.providedBy(value)`` (see `zope.interface + `_). + :param interface: The interface to check for. + :type interface: ``zope.interface.Interface`` + :raises TypeError: With a human readable error message, the attribute + (of type `attrs.Attribute`), the expected interface, and the + value it got. + """ + return _ProvidesValidator(interface) \ No newline at end of file diff --git a/coinaddrng/base.py b/coinaddrvalidator/base.py similarity index 100% rename from coinaddrng/base.py rename to coinaddrvalidator/base.py diff --git a/coinaddrng/currency.py b/coinaddrvalidator/currency.py similarity index 88% rename from coinaddrng/currency.py rename to coinaddrvalidator/currency.py index 72de7d2..f7851e2 100644 --- a/coinaddrng/currency.py +++ b/coinaddrvalidator/currency.py @@ -6,18 +6,17 @@ """ import attr -from zope.interface import implementer, provider +from typing import Dict, Any, Optional from .interfaces import ICurrency, INamedInstanceContainer from .base import NamedInstanceContainerBase -@provider(INamedInstanceContainer) class Currencies(metaclass=NamedInstanceContainerBase): """Container for all currencies.""" @classmethod - def get(cls, name, default=None): + def get(cls, name: str, default: Any = None) -> Any: """Return currency object with matching name or ticker.""" for inst in cls.instances.values(): if name in (inst.name, inst.ticker): @@ -29,36 +28,26 @@ def get(cls, name, default=None): class CurrencyMeta(type): """Register currency classes on Currencies.currencies.""" - def __call__(cls, *args, **kwargs): + def __call__(cls, *args: Any, **kwargs: Any) -> Any: inst = super(CurrencyMeta, cls).__call__(*args, **kwargs) Currencies[inst.name] = inst return inst -@implementer(ICurrency) @attr.s(frozen=True, slots=True, cmp=False) class Currency(metaclass=CurrencyMeta): """An immutable representation of a cryptocurrency specification.""" - name = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - ticker = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - validator = attr.ib( - type='str', - validator=attr.validators.instance_of(str)) + name = attr.ib(validator=attr.validators.instance_of(str)) + ticker = attr.ib(validator=attr.validators.instance_of(str)) + validator = attr.ib(validator=attr.validators.instance_of(str)) networks = attr.ib( - type=dict, validator=attr.validators.optional(attr.validators.instance_of(dict)), default=attr.Factory(dict)) address_types = attr.ib( - type=dict, validator=attr.validators.optional(attr.validators.instance_of(dict)), default=attr.Factory(dict)) charset = attr.ib( - type=bytes, validator=attr.validators.optional(attr.validators.instance_of(bytes)), default=None) @@ -78,11 +67,12 @@ class Currency(metaclass=CurrencyMeta): Currency('bitcoin-cash', ticker='bch', validator='Base58Check', networks=dict( main=(0x00, 0x05), test=(0x6f, 0xc4))) -Currency('litecoin', ticker='ltc', validator='Base58Check', +Currency('litecoin', ticker='ltc', validator='BitcoinBasedCheck', networks=dict( main=(0x30, 0x05, 0x32, 0x019da462, 0x01b26ef6, - 0x488B21E, 0x49D7CB2, 0x4B24746, 0x295B43F, 0x2AA7ED3), - test=(0x6f, 0xc4, 0x0436f6e1))) + 0x488B21E, 0x49D7CB2, 0x4B24746, 0x295B43F, + 0x2AA7ED3, 'ltc'), + test=(0x6f, 0xc4, 0x0436f6e1, 'tltc'))) Currency('dogecoin', ticker='doge', validator='Base58Check', networks=dict( main=(0x1e, 0x16), test=(0x71, 0xc4))) diff --git a/coinaddrvalidator/encoding/__init__.py b/coinaddrvalidator/encoding/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/coinaddrvalidator/encoding/crc16.py b/coinaddrvalidator/encoding/crc16.py new file mode 100644 index 0000000..6545659 --- /dev/null +++ b/coinaddrvalidator/encoding/crc16.py @@ -0,0 +1,318 @@ +"""Pure python library for calculating CRC16""" + +############################################################################## +# +# Copyright (C) Gennady Trafimenkov, 2011 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +# +############################################################################## + + +# table for calculating CRC +# this particular table was generated using pycrc v0.7.6, http://www.tty1.net/pycrc/ +# using the configuration: +# * Width = 16 +# * Poly = 0x1021 +# * XorIn = 0x0000 +# * ReflectIn = False +# * XorOut = 0x0000 +# * ReflectOut = False +# * Algorithm = table-driven +# by following command: +# python pycrc.py --model xmodem --algorithm table-driven --generate c +from typing import List + +__all__ = ["crc16xmodem"] + + +_CRC16_XMODEM_TABLE = [ + 0x0000, + 0x1021, + 0x2042, + 0x3063, + 0x4084, + 0x50A5, + 0x60C6, + 0x70E7, + 0x8108, + 0x9129, + 0xA14A, + 0xB16B, + 0xC18C, + 0xD1AD, + 0xE1CE, + 0xF1EF, + 0x1231, + 0x0210, + 0x3273, + 0x2252, + 0x52B5, + 0x4294, + 0x72F7, + 0x62D6, + 0x9339, + 0x8318, + 0xB37B, + 0xA35A, + 0xD3BD, + 0xC39C, + 0xF3FF, + 0xE3DE, + 0x2462, + 0x3443, + 0x0420, + 0x1401, + 0x64E6, + 0x74C7, + 0x44A4, + 0x5485, + 0xA56A, + 0xB54B, + 0x8528, + 0x9509, + 0xE5EE, + 0xF5CF, + 0xC5AC, + 0xD58D, + 0x3653, + 0x2672, + 0x1611, + 0x0630, + 0x76D7, + 0x66F6, + 0x5695, + 0x46B4, + 0xB75B, + 0xA77A, + 0x9719, + 0x8738, + 0xF7DF, + 0xE7FE, + 0xD79D, + 0xC7BC, + 0x48C4, + 0x58E5, + 0x6886, + 0x78A7, + 0x0840, + 0x1861, + 0x2802, + 0x3823, + 0xC9CC, + 0xD9ED, + 0xE98E, + 0xF9AF, + 0x8948, + 0x9969, + 0xA90A, + 0xB92B, + 0x5AF5, + 0x4AD4, + 0x7AB7, + 0x6A96, + 0x1A71, + 0x0A50, + 0x3A33, + 0x2A12, + 0xDBFD, + 0xCBDC, + 0xFBBF, + 0xEB9E, + 0x9B79, + 0x8B58, + 0xBB3B, + 0xAB1A, + 0x6CA6, + 0x7C87, + 0x4CE4, + 0x5CC5, + 0x2C22, + 0x3C03, + 0x0C60, + 0x1C41, + 0xEDAE, + 0xFD8F, + 0xCDEC, + 0xDDCD, + 0xAD2A, + 0xBD0B, + 0x8D68, + 0x9D49, + 0x7E97, + 0x6EB6, + 0x5ED5, + 0x4EF4, + 0x3E13, + 0x2E32, + 0x1E51, + 0x0E70, + 0xFF9F, + 0xEFBE, + 0xDFDD, + 0xCFFC, + 0xBF1B, + 0xAF3A, + 0x9F59, + 0x8F78, + 0x9188, + 0x81A9, + 0xB1CA, + 0xA1EB, + 0xD10C, + 0xC12D, + 0xF14E, + 0xE16F, + 0x1080, + 0x00A1, + 0x30C2, + 0x20E3, + 0x5004, + 0x4025, + 0x7046, + 0x6067, + 0x83B9, + 0x9398, + 0xA3FB, + 0xB3DA, + 0xC33D, + 0xD31C, + 0xE37F, + 0xF35E, + 0x02B1, + 0x1290, + 0x22F3, + 0x32D2, + 0x4235, + 0x5214, + 0x6277, + 0x7256, + 0xB5EA, + 0xA5CB, + 0x95A8, + 0x8589, + 0xF56E, + 0xE54F, + 0xD52C, + 0xC50D, + 0x34E2, + 0x24C3, + 0x14A0, + 0x0481, + 0x7466, + 0x6447, + 0x5424, + 0x4405, + 0xA7DB, + 0xB7FA, + 0x8799, + 0x97B8, + 0xE75F, + 0xF77E, + 0xC71D, + 0xD73C, + 0x26D3, + 0x36F2, + 0x0691, + 0x16B0, + 0x6657, + 0x7676, + 0x4615, + 0x5634, + 0xD94C, + 0xC96D, + 0xF90E, + 0xE92F, + 0x99C8, + 0x89E9, + 0xB98A, + 0xA9AB, + 0x5844, + 0x4865, + 0x7806, + 0x6827, + 0x18C0, + 0x08E1, + 0x3882, + 0x28A3, + 0xCB7D, + 0xDB5C, + 0xEB3F, + 0xFB1E, + 0x8BF9, + 0x9BD8, + 0xABBB, + 0xBB9A, + 0x4A75, + 0x5A54, + 0x6A37, + 0x7A16, + 0x0AF1, + 0x1AD0, + 0x2AB3, + 0x3A92, + 0xFD2E, + 0xED0F, + 0xDD6C, + 0xCD4D, + 0xBDAA, + 0xAD8B, + 0x9DE8, + 0x8DC9, + 0x7C26, + 0x6C07, + 0x5C64, + 0x4C45, + 0x3CA2, + 0x2C83, + 0x1CE0, + 0x0CC1, + 0xEF1F, + 0xFF3E, + 0xCF5D, + 0xDF7C, + 0xAF9B, + 0xBFBA, + 0x8FD9, + 0x9FF8, + 0x6E17, + 0x7E36, + 0x4E55, + 0x5E74, + 0x2E93, + 0x3EB2, + 0x0ED1, + 0x1EF0, +] + + +def _crc16(data: bytes, crc: int, table: List[int]): + """Calculate CRC16 using the given table. + `data` - data for calculating CRC, must be bytes + `crc` - initial value + `table` - table for caclulating CRC (list of 256 integers) + Return calculated value of CRC + """ + for byte in data: + crc = ((crc << 8) & 0xFF00) ^ table[((crc >> 8) & 0xFF) ^ byte] + return crc & 0xFFFF + + +def crc16xmodem(data: bytes, crc: int = 0) -> int: + """Calculate CRC-CCITT (XModem) variant of CRC16. + `data` - data for calculating CRC, must be bytes + `crc` - initial value + Return calculated value of CRC + """ + return _crc16(data, crc, _CRC16_XMODEM_TABLE) \ No newline at end of file diff --git a/coinaddrvalidator/interfaces.py b/coinaddrvalidator/interfaces.py new file mode 100644 index 0000000..99c1201 --- /dev/null +++ b/coinaddrvalidator/interfaces.py @@ -0,0 +1,221 @@ +# pylint: disable=inherit-non-class,no-self-argument,no-method-argument +# pylint: disable=unexpected-special-method-signature,arguments-differ + +""" +:mod:`coinaddr.interfaces` +~~~~~~~~~~~~~~~~~~~~~~~~ + +Various interfaces for the coinaddr package. +""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Iterator, Optional + + +class INamedInstanceContainer(ABC): + """Contains all currencies instantiated.""" + + @property + @abstractmethod + def instances(self) -> Dict[str, Any]: + """Mapping of instance.name -> instance""" + pass + + @abstractmethod + def __getitem__(self, name: str) -> Any: + """Return the named instance""" + pass + + @abstractmethod + def __setitem__(self, name: str, obj: Any) -> None: + """Add the named instance to the mapping of instances""" + pass + + @abstractmethod + def __delitem__(self, name: str) -> None: + """Remove the named instance from the mapping of instances""" + pass + + @abstractmethod + def __contains__(self, name: str) -> bool: + """Return true if we contain the named instance""" + pass + + @abstractmethod + def __iter__(self) -> Iterator[Any]: + """Return an iterable, iterating all instances""" + pass + + @abstractmethod + def get(self, name: str, default: Any = None) -> Any: + """Return the named instance if we contain it, else default""" + pass + + +class INamedSubclassContainer(ABC): + """Contains a weakvaluedict of subclasses.""" + + @property + @abstractmethod + def subclasses(self) -> Dict[str, Any]: + """Mapping of subclass.name -> subclass""" + pass + + @abstractmethod + def __getitem__(self, name: str) -> Any: + """Return the named subclass""" + pass + + @abstractmethod + def __setitem__(self, name: str, obj: Any) -> None: + """Add the named subclass to the mapping of subclasses""" + pass + + @abstractmethod + def __delitem__(self, name: str) -> None: + """Remove the named subclass from the mapping of subclasses""" + pass + + @abstractmethod + def __contains__(self, name: str) -> bool: + """Return true if we contain the named subclass""" + pass + + @abstractmethod + def __iter__(self) -> Iterator[Any]: + """Return an iterable, iterating all subclasses""" + pass + + @abstractmethod + def get(self, name: str, default: Any = None) -> Any: + """Return the named subclass if we contain it, else default""" + pass + + +class ICurrency(ABC): + """A cryptocurrency address specification.""" + + @property + @abstractmethod + def name(self) -> str: + """Name of currency""" + pass + + @property + @abstractmethod + def ticker(self) -> str: + """Ticker symbol for currency""" + pass + + @property + @abstractmethod + def validator(self) -> str: + """Validator name for validation""" + pass + + @property + @abstractmethod + def networks(self) -> Dict[str, Any]: + """The networks and version bytes for those networks""" + pass + + @property + @abstractmethod + def charset(self) -> Optional[bytes]: + """For base58Check based currencies, custom charset.""" + pass + + +class IValidator(ABC): + """A cryptocurrency address validator.""" + + @property + @abstractmethod + def name(self) -> str: + """Name of validator""" + pass + + @property + @abstractmethod + def network(self) -> str: + """Network name of address being validated""" + pass + + @abstractmethod + def validate(self) -> bool: + """Validate the address type, True if valid, else False.""" + pass + + +class IValidationRequest(ABC): + """Contains the data and helpers for a given validation request.""" + + @property + @abstractmethod + def currency(self) -> str: + """The currency name or ticker being validated""" + pass + + @property + @abstractmethod + def address(self) -> str: + """The address to be validated""" + pass + + @property + @abstractmethod + def extras(self) -> Dict[str, Any]: + """Any extra attributes to be passed to decoder, etc""" + pass + + @property + @abstractmethod + def networks(self) -> str: + """Concatenated list of all network versions for currency""" + pass + + @abstractmethod + def execute(self) -> 'IValidationResult': + """Executes the request and returns a ValidationResult object""" + pass + + +class IValidationResult(ABC): + """Represents all data for a validation result.""" + + @property + @abstractmethod + def name(self) -> str: + """Name of currency for address validated""" + pass + + @property + @abstractmethod + def ticker(self) -> str: + """Ticker of currency for address validated""" + pass + + @property + @abstractmethod + def address(self) -> str: + """The address that was validated""" + pass + + @property + @abstractmethod + def valid(self) -> bool: + """Boolean representing whether the address is valid""" + pass + + @property + @abstractmethod + def network(self) -> Optional[str]: + """Name of network the address belongs to if applicable""" + pass + + @property + @abstractmethod + def is_extended(self) -> bool: + """boolean representing whether the address is extended key or not""" + pass + diff --git a/coinaddrvalidator/validation.py b/coinaddrvalidator/validation.py new file mode 100644 index 0000000..598bb0e --- /dev/null +++ b/coinaddrvalidator/validation.py @@ -0,0 +1,693 @@ +""" +:mod:`coinaddr.validation` +~~~~~~~~~~~~~~~~~~~~~~~~ + +Validation of cryptocurrency addresses. +""" + +import re +from hashlib import sha256, blake2b +import functools +import operator +from typing import Any, Dict, Optional, Type, ClassVar, Union + +import attr +from Crypto.Hash import keccak +import base58check +import math +from binascii import unhexlify, crc32 +import base64 +from blake256 import blake256 +import cbor +import bech32 +import groestlcoin_hash2 + +from .encoding import crc16 +from .interfaces import ( + ICurrency, IValidator, IValidationRequest, IValidationResult) +from .currency import Currencies, Currency +from .base import NamedSubclassContainerBase + + +class Validators(metaclass=NamedSubclassContainerBase): + """Container for all validators.""" + + +class ValidatorMeta(type): + """Register validator classes on Validators.validators.""" + + def __new__(mcs, cls, bases, attrs): + new = type.__new__(mcs, cls, bases, attrs) + if new.name: + Validators[new.name] = new + return new + + +@attr.s(frozen=True, slots=True) +class ValidationResult: + """Represents all data for a validation result.""" + + name = attr.ib(validator=attr.validators.instance_of(str)) + ticker = attr.ib(validator=attr.validators.instance_of(str)) + address = attr.ib(validator=attr.validators.instance_of((str, bytes))) + valid = attr.ib(validator=attr.validators.instance_of(bool)) + network = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(str)), default=None) + is_extended = attr.ib(validator=attr.validators.instance_of(bool), default=False) + address_type = attr.ib(validator=attr.validators.instance_of(str), default='address') + + def __bool__(self): + return self.valid + + +@attr.s(frozen=True, slots=True) +class ValidationRequest: + """Contains the data and helpers for a given validation request.""" + + currency = attr.ib(validator=attr.validators.instance_of((str, Currency))) + address = attr.ib(validator=attr.validators.instance_of(str)) + extras = attr.ib(validator=attr.validators.instance_of(dict), default=attr.Factory(dict)) + networks = attr.ib(validator=attr.validators.instance_of(str), default='') + + def execute(self) -> ValidationResult: + """Executes the request and returns a ValidationResult object""" + if isinstance(self.currency, str): + currency = Currencies.get(self.currency) + if not currency: + return ValidationResult( + name=self.currency, + ticker=self.currency, + address=self.address.encode('utf-8'), + valid=False) + else: + currency = self.currency + + validator_cls = self._get_validator_cls(currency.validator) + if not validator_cls: + return ValidationResult( + name=currency.name, + ticker=currency.ticker, + address=self.address.encode('utf-8'), + valid=False) + + # Create a new request with the original string address + request = ValidationRequest( + currency=currency, + address=self.address, + extras=self.extras, + networks=self.networks + ) + + validator = validator_cls(request=request) + + valid = False + network = '' + is_extended = False + address_type = 'address' + try: + valid = validator.validate() + network = validator.network + is_extended = validator.validate_extended() + address_type = validator.address_type + except: + pass + + return ValidationResult( + name=currency.name, + ticker=currency.ticker, + address=self.address.encode('utf-8'), + valid=valid, + network=network, + is_extended=is_extended, + address_type=address_type) + + def _get_validator_cls(self, validator_name: str) -> Optional[Type[IValidator]]: + """Get the validator class for the given validator name.""" + return Validators.get(validator_name) + + +@attr.s(cmp=False, slots=True) +class ValidatorBase(metaclass=ValidatorMeta): + """Validator Interface.""" + + name: ClassVar[str] = None + + request = attr.ib( + validator=attr.validators.instance_of(ValidationRequest) + ) + + def validate(self) -> bool: + """Validate the address type, return True if valid, else False.""" + raise NotImplementedError + + def validate_extended(self) -> bool: + """Validate the extended keys, return True if valid, else False.""" + raise NotImplementedError + + @property + def network(self) -> str: + """Return the network derived from the network version bytes.""" + raise NotImplementedError + + @property + def address_type(self) -> str: + """Return the address type derived from the network version bytes.""" + return 'address' + + +@attr.s(frozen=True, slots=True, cmp=False) +class GRSValidator(ValidatorBase): + """Validates Groestlcoin addresses.""" + + name = 'GRSCheck' + + def validate(self) -> bool: + # groestlcoin address is 34 bytes long + if len(self.request.address) != 34: + return False + try: + decoded = base58check.b58decode(self.request.address) + except ValueError: + return False + + hash_str = decoded[0:21] + checksum = groestlcoin_hash2.groestl_hash(hash_str)[:4] + expected_checksum = decoded[21:] + + if checksum != expected_checksum: + return False + + return True + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + for name, networks in self.request.currency.networks.items(): + for netw in networks: + if self.request.address.startswith(netw.encode('utf-8')): + return name + return "" + + +@attr.s(frozen=True, slots=True, cmp=False) +class Bech32CheckValidator(ValidatorBase): + """Validates Bech32 addresses.""" + + name = 'Bech32Check' + + def validate(self) -> bool: + decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) + data = decoded_address[1] + + if self.network == "": + return False + + if data is None: + return False + + return True + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) + hrp = decoded_address[0] + + for name, networks in self.request.currency.networks.items(): + for netw in networks: + if hrp == netw: + return name + return "" + + +@attr.s(frozen=True, slots=True, cmp=False) +class Base58CheckValidator(ValidatorBase): + """Validates Base58Check based cryptocurrency addresses.""" + + name = 'Base58Check' + # base58 alphabet representation + dec_digit_to_base58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" + base58_digit_to_dec = {b58: dec for dec, b58 in enumerate(dec_digit_to_base58)} + + def validate(self) -> bool: + """extended keys have their own validation""" + if len(self.request.address) == 111: + return self.validate_extended() + + """Validate the address.""" + if 25 > len(self.request.address) > 35: + return False + + try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset + abytes = base58check.b58decode( + self.request.address, **extras) + except ValueError: + return False + + # For XRP, we need to check the network first + network = self.network + if network == '': + return False + + # Calculate checksum + checksum = sha256(sha256(abytes[:-4]).digest()).digest()[:4] + if abytes[-4:] != checksum: + return False + + # Verify the address can be re-encoded correctly + try: + reencoded = base58check.b58encode(abytes, **extras) + return self.request.address == reencoded.decode('utf-8') + except Exception: + return False + + def validate_extended(self, checksum_algo='sha256') -> bool: + if len(self.request.address) != 111: + return False + + if self.network == '': + return False + + # strip leading "zeros" (the "1" digit with base58) + base58_stripped = self.request.address.lstrip("1") + # convert base58 to decimal + int_rep = 0 + for base58_digit in base58_stripped: + int_rep *= 58 + try: + int_rep += self.base58_digit_to_dec[base58_digit] + except KeyError: + # not a valid base58 digit -> invalid address + return False + + # encode it to base64 + hex_rep = "{:X}".format(int_rep) + # if the length is odd, add leading zero (needed for b16decode) + if len(hex_rep) % 2 == 1: + hex_rep = "0" + hex_rep + # decode it into a binary string, padded with zeros + # 72 bytes (extended key size) + 4 bytes (prefix version bytes) + all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") + + # count leading zeros + zero_count = next(zeros for zeros, byte in enumerate(all_bytes) if byte != 0) + # compare it with the number of leading zeros lstripped at the beginning + if len(self.request.address) - len(base58_stripped) != zero_count: + return False + + if checksum_algo == 'blake256': + checksum = blake256.blake_hash(blake256.blake_hash(all_bytes[:-4]))[:4] + elif checksum_algo == 'sha256': + checksum = sha256(sha256(all_bytes[:-4]).digest()).digest()[:4] + else: + return False + + # checking if the checksum is valid + if checksum != all_bytes[-4:]: + return False + + return True + + @property + def network(self) -> str: + """Return network derived from network version bytes.""" + try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset + abytes = base58check.b58decode( + self.request.address, **extras) + except ValueError: + return '' + + nbyte = abytes[0] + for name, networks in self.request.currency.networks.items(): + if isinstance(networks, tuple): + if nbyte in networks: + return name + elif isinstance(networks, str): + if self.request.address.startswith(networks): + return name + return '' + + @property + def address_type(self) -> str: + """Return address type derived from network version bytes.""" + if len(self.request.address) == 0: + return '' + try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset + abytes = base58check.b58decode( + self.request.address, **extras) + except ValueError: + return '' + + for name, networks in self.request.currency.address_types.items(): + for netw in networks: + if netw != 0: + # count the prefix length in bytes + prefixlen = math.ceil(math.floor((math.log(netw) / math.log(2)) + 1) / 8) + else: + prefixlen = 1 + address_prefix = [x for x in bytearray(abytes[:prefixlen])] + if prefixtodec(address_prefix) == netw: + return name + + if len(self.request.currency.address_types.items()) == 0: + return 'address' + else: + return '' + + +@attr.s(frozen=True, slots=True, cmp=False) +class EthereumValidator(ValidatorBase): + """Validates ethereum based cryptocurrency addresses.""" + + name = 'Ethereum' + non_checksummed_patterns = ( + re.compile("^(0x)?[0-9a-f]{40}$"), re.compile("^(0x)?[0-9A-F]{40}$") + ) + + def validate(self) -> bool: + """Validate the address.""" + address = self.request.address + + # Remove '0x' prefix if present + if address.startswith('0x'): + address = address[2:] + + # Check if it's a non-checksummed address + if any(bool(pat.match(address)) for pat in self.non_checksummed_patterns): + return True + + # Ethereum address has to contain exactly 40 chars (20-bytes) + if len(address) != 40: + return False + + # Ethereum address is generated by keccak algorithm and has to be hexadecimal + k = keccak.new(digest_bits=256) + addr_hash = k.update(address.lower().encode('ascii')).hexdigest() + + # Check each character against the hash + for i, letter in enumerate(address): + if any([ + int(addr_hash[i], 16) >= 8 and letter.upper() != letter, + int(addr_hash[i], 16) < 8 and letter.lower() != letter + ]): + return False + return True + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + """Return network derived from network version bytes.""" + return 'both' + + +@attr.s(frozen=True, slots=True, cmp=False) +class EosValidator(ValidatorBase): + """Validates EOS cryptocurrency addresses.""" + + name = 'EOS' + + def validate(self) -> bool: + """Validate the address.""" + address = self.request.address + + # EOS addresses must be 12 characters long + if len(address) != 12: + return False + + # EOS addresses must start with a letter and contain only a-z, 1-5, and . + eos_pattern = re.compile('^[a-z][a-z1-5.]{10}[a-z1-5]$') + return bool(eos_pattern.match(address)) + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + return '' + + @property + def address_type(self) -> str: + return 'address' + + +@attr.s(frozen=True, slots=True, cmp=False) +class StellarValidator(ValidatorBase): + """Validates Stellar cryptocurrency addresses.""" + + name = 'Stellar' + + def validate(self) -> bool: + try: + decoded_address = base64.b32decode(self.request.address) + except: + return False + + version_byte = decoded_address[0] + payload = decoded_address[0:-2] + expected_checksum = int.from_bytes(decoded_address[-2:], byteorder='little') + + if version_byte != 6 << 3: # ed25519PublicKey + return False + + checksum = crc16.crc16xmodem(payload) + + if checksum != expected_checksum: + return False + + return True + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + return '' + + +@attr.s(frozen=True, slots=True, cmp=False) +class CosmosValidator(ValidatorBase): + """Validates Cosmos cryptocurrency addresses.""" + + name = 'CosmosCheck' + hrp_table = ("cosmos", "cosmospub", "cosmosvalcons", "cosmosvalconspub", "cosmosvaloper", "cosmosvaloperpub") + + def validate(self) -> bool: + try: + address = self.request.address + decoded_address = bech32.bech32_decode(address) + if not decoded_address: + return False + + hrp, data = decoded_address + if not hrp or not data: + return False + + if hrp not in self.hrp_table: + return False + + # For Cosmos addresses, we only need to verify the HRP and that the data exists + # The bech32_decode function already verifies the checksum + return True + except Exception: + return False + + def validate_extended(self) -> bool: + return False + + @property + def network(self) -> str: + return "" + + @property + def address_type(self) -> str: + try: + address = self.request.address + decoded_address = bech32.bech32_decode(address) + if not decoded_address: + return "" + + hrp, _ = decoded_address + if not hrp: + return "" + + if hrp not in self.hrp_table: + return "" + + return hrp + except Exception: + return "" + + +@attr.s(frozen=True, slots=True, cmp=False) +class BitcoinBasedCheck(ValidatorBase): + """Validates Bitcoin-based cryptocurrency addresses.""" + + name = 'BitcoinBasedCheck' + + def validate(self) -> bool: + """Validate the address.""" + if len(self.request.address) == 111: + return self.validate_extended() + + if 25 > len(self.request.address) > 35: + return False + + try: + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return False + + # Check network first + network = self.network + if network == '': + return False + + # Calculate checksum + checksum = sha256(sha256(abytes[:-4]).digest()).digest()[:4] + if abytes[-4:] != checksum: + return False + + # Verify the address can be re-encoded correctly + try: + reencoded = base58check.b58encode(abytes, **self.request.extras) + return self.request.address == reencoded.decode('utf-8') + except Exception: + return False + + def validate_extended(self) -> bool: + if len(self.request.address) != 111: + return False + + if self.network == '': + return False + + # strip leading "zeros" (the "1" digit with base58) + base58_stripped = self.request.address.lstrip("1") + # convert base58 to decimal + int_rep = 0 + for base58_digit in base58_stripped: + int_rep *= 58 + try: + int_rep += self.base58_digit_to_dec[base58_digit] + except KeyError: + # not a valid base58 digit -> invalid address + return False + + # encode it to base64 + hex_rep = "{:X}".format(int_rep) + # if the length is odd, add leading zero (needed for b16decode) + if len(hex_rep) % 2 == 1: + hex_rep = "0" + hex_rep + # decode it into a binary string, padded with zeros + # 72 bytes (extended key size) + 4 bytes (prefix version bytes) + all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") + + # count leading zeros + zero_count = next(zeros for zeros, byte in enumerate(all_bytes) if byte != 0) + # compare it with the number of leading zeros lstripped at the beginning + if len(self.request.address) - len(base58_stripped) != zero_count: + return False + + checksum = sha256(sha256(all_bytes[:-4]).digest()).digest()[:4] + if checksum != all_bytes[-4:]: + return False + + return True + + @property + def network(self) -> str: + """Return network derived from network version bytes.""" + try: + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return '' + + nbyte = abytes[0] + for name, networks in self.request.currency.networks.items(): + if isinstance(networks, tuple): + # Check if the first byte matches any of the network values + if nbyte in networks: + return name + # For Litecoin, also check if the address starts with 'ltc' or 'tltc' + if name == 'main' and self.request.address.startswith('ltc'): + return name + if name == 'test' and self.request.address.startswith('tltc'): + return name + elif isinstance(networks, str): + if self.request.address.startswith(networks): + return name + return '' + + @property + def address_type(self) -> str: + """Return address type derived from network version bytes.""" + if len(self.request.address) == 0: + return '' + try: + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return '' + + for name, networks in self.request.currency.address_types.items(): + for netw in networks: + if netw != 0: + # count the prefix length in bytes + prefixlen = math.ceil(math.floor((math.log(netw) / math.log(2)) + 1) / 8) + else: + prefixlen = 1 + address_prefix = [x for x in bytearray(abytes[:prefixlen])] + if prefixtodec(address_prefix) == netw: + return name + + if len(self.request.currency.address_types.items()) == 0: + return 'address' + else: + return '' + + +def validate(currency: str, address: str, **extras: Any) -> ValidationResult: + """Validate a cryptocurrency address. + + Args: + currency: The currency name or ticker to validate against + address: The address to validate + **extras: Any extra attributes to be passed to decoder, etc + + Returns: + A ValidationResult object containing the validation results + """ + request = ValidationRequest( + currency=currency, + address=address, + extras=extras) + return request.execute() + + +def prefixtodec(prefix): + total = 0 + multiplier = 256 + for i in range(2, len(prefix) + 1): + total += prefix[-i] * multiplier + multiplier *= 256 + return total + prefix[-1] + diff --git a/requirements.txt b/requirements.txt index 8a00f0b..bd9cdc2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,12 @@ -zope -attrs>=17.4.0 -pysha3>=1.0.2 +attrs>=24.0.0 base58check>=1.0.1 -zope.interface>=4.4.3 -bech32 -cbor -blake256 -groestlcoin_hash2 -crc16 \ No newline at end of file +bech32>=1.1.0 +blake256>=0.1.1 +cbor>=1.0.0 +cryptography>=2.1.4 +groestlcoin-hash2>=0.1.0 +pycryptodome>=3.6.1 + + + + diff --git a/setup.py b/setup.py index 81bc4d2..8cdd922 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages -with open('coinaddrng/__init__.py', 'rt') as fd: +with open('coinaddrvalidator/__init__.py', 'rt') as fd: version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1) @@ -18,10 +18,11 @@ setup( - name='coinaddrng', + name='coinaddrvalidator', version=version, description='A crypto-currency address inspection/validation library.', - #long_description=long_description, + long_description=long_description, + long_description_content_type='text/markdown', keywords=[ 'bitcoin', 'litecoin', @@ -31,24 +32,25 @@ 'validation', 'inspection', ], - author='Joe Black', - author_email='me@joeblack.nyc', - maintainer='Joe Black', - maintainer_email='me@joeblack.nyc', - url='https://github.com/joeblackwaslike/coinaddr', + author='Mohammad Aghamir', + author_email='maghamir@nobitex.net', + maintainer='Mohammad Aghamir', + maintainer_email='maghamir@nobitex.net', + url='https://github.com/nobitex/coinaddrvalid', download_url=( - 'https://github.com/joeblackwaslike/coinaddr/tarball/v%s' % version), + 'https://github.com/nobitex/coinaddrvalid/tarball/v%s' % version), license='MIT', install_requires=[ - 'attrs>=17.4.0', - 'pysha3>=1.0.2', + 'attrs>=24.0.0', + 'pycryptodome>=3.6.1', 'base58check>=1.0.1', 'zope.interface>=4.4.3', - 'crc16>=0.1.1', 'blake256>=0.1.1', 'cbor>=1.0.0', 'bech32>=1.1.0', - 'groestlcoin-hash2>=1.1.1' + 'groestlcoin-hash2>=1.1.1', + 'zope', + 'zope.interface>=4.4.3', ], zip_safe=False, packages=find_packages(), @@ -63,6 +65,12 @@ 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Programming Language :: Python :: 3 :: Only', 'Topic :: Internet :: WWW/HTTP', 'Topic :: Software Development', diff --git a/tests/test_coinaddr.py b/tests/test_coinaddr.py index a846d3d..b054d05 100644 --- a/tests/test_coinaddr.py +++ b/tests/test_coinaddr.py @@ -1,13 +1,13 @@ import unittest -import coinaddrng +import coinaddrvalidator -from coinaddrng.interfaces import ( +from coinaddrvalidator.interfaces import ( INamedSubclassContainer, INamedInstanceContainer, ICurrency, IValidator, IValidationRequest, IValidationResult ) -from coinaddrng.currency import Currencies, Currency -from coinaddrng.validation import ( +from coinaddrvalidator.currency import Currencies, Currency +from coinaddrvalidator.validation import ( Validators, ValidatorBase, ValidationRequest, ValidationResult, Base58CheckValidator, EthereumValidator ) @@ -23,6 +23,7 @@ ('bitcoin-cash', 'bch', b'3QJmV3qfvL9SuYo34YihAf3sRCW3qSinyC', 'main'), ('litecoin', 'ltc', b'LeF6vC9k1qfFDEj6UGjM5e4fwHtiKsakTd', 'main'), ('litecoin', 'ltc', b'mkwV3DZkgYwKaXkphBtcXAjsYQEqZ8aB3x', 'test'), + ('litecoin', 'ltc', b'ltc1qs54v679auflz9y88nleyy6qknalwwmfx6kcf8z', 'main'), ('neocoin', 'neo', b'AL9fzczwjV6ynoFAJVz4fBDu4NYLG6MBwm', 'both'), ('dogecoin', 'doge', b'DAnBU2rLkUgQb1ZLBJd6Bm5pZ45RN4TQC4', 'main'), ('dogecoin', 'doge', b'njscgXBB3HUUTXH7njim1Uw82PF9da4R8k', 'test'), @@ -37,6 +38,7 @@ ('terramoney', 'luna', b'terra1v5hrqlv8dqgzvy0pwzqzg0gxy899rm4kdn0jp4', ''), ('polkadot', 'dot', b'12gX42C4Fj1wgtfgoP624zeHrcPBqzhb4yAENyvFdGX6EUnN', ''), ('kusama', 'ksm', b'GLdQ4D4wkeEJUX8DBT9HkpycFVYQZ3fmJyQ5ZgBRxZ4LD3S', ''), + ('stellar', 'xlm', b'GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH', ''), ] WRONG_DATA = [ @@ -52,7 +54,7 @@ class TestCoinaddr(unittest.TestCase): def test_validation_by_name(self): for name, ticker, addr, net in TEST_DATA: with self.subTest(name=name, address=addr, net=net): - res = coinaddrng.validate(name, addr) + res = coinaddrvalidator.validate(name, addr) self.assertEqual(name, res.name) self.assertEqual(ticker, res.ticker) self.assertEqual(addr, res.address) @@ -61,13 +63,13 @@ def test_validation_by_name(self): for name, ticker, addr, net in WRONG_DATA: with self.subTest(name=name, address=addr, net=net): - res = coinaddrng.validate(name, addr) + res = coinaddrvalidator.validate(name, addr) self.assertNotEqual(True, res.valid) def test_validation_by_ticker(self): for name, ticker, addr, net in TEST_DATA: with self.subTest(name=name, ticker=ticker, address=addr, net=net): - res = coinaddrng.validate(ticker, addr) + res = coinaddrvalidator.validate(ticker, addr) self.assertEqual(name, res.name) self.assertEqual(ticker, res.ticker) self.assertEqual(addr, res.address) @@ -78,7 +80,7 @@ def test_validation_by_ticker(self): def test_validation_from_text(self): for name, ticker, addr, net in TEST_DATA: with self.subTest(name=name, address=addr, net=net): - res = coinaddrng.validate(name, addr.decode()) + res = coinaddrvalidator.validate(name, addr.decode()) self.assertEqual(name, res.name) self.assertEqual(ticker, res.ticker) self.assertEqual(addr, res.address) @@ -89,7 +91,7 @@ def test_validation_wrong_data(self): for currency in Currencies.instances.values(): for addr in WRONG_ADDRESSES: with self.subTest(name=currency.name, address=addr): - res = coinaddrng.validate(currency.name, addr) + res = coinaddrvalidator.validate(currency.name, addr) self.assertEqual(res.valid, False) @@ -109,7 +111,7 @@ def test_extending_currency(self): ] for name, ticker, addr, net in test_data: with self.subTest(name=name, ticker=ticker, address=addr, net=net): - res = coinaddrng.validate(name, addr) + res = coinaddrvalidator.validate(name, addr) self.assertEqual(name, res.name) self.assertEqual(ticker, res.ticker) self.assertEqual(addr, res.address) @@ -117,7 +119,7 @@ def test_extending_currency(self): self.assertEqual(net, res.network) with self.subTest(name=name, ticker=ticker, address=addr, net=net): - res = coinaddrng.validate(ticker, addr) + res = coinaddrvalidator.validate(ticker, addr) self.assertEqual(name, res.name) self.assertEqual(ticker, res.ticker) self.assertEqual(addr, res.address) diff --git a/tests/test_currency.py b/tests/test_currency.py index c9ee151..cbe6a27 100644 --- a/tests/test_currency.py +++ b/tests/test_currency.py @@ -1,7 +1,7 @@ import unittest -from coinaddrng.interfaces import INamedInstanceContainer, ICurrency -from coinaddrng.currency import Currencies, Currency +from coinaddrvalidator.interfaces import INamedInstanceContainer, ICurrency +from coinaddrvalidator.currency import Currencies, Currency class TestCurrency(unittest.TestCase): diff --git a/tests/test_validation.py b/tests/test_validation.py index 3ac7db5..d2f7c13 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -1,11 +1,11 @@ import unittest -from coinaddrng.interfaces import ( +from coinaddrvalidator.interfaces import ( INamedSubclassContainer, IValidator, IValidationRequest, IValidationResult ) -from coinaddrng.validation import ( +from coinaddrvalidator.validation import ( Validators, ValidatorBase, ValidationRequest, ValidationResult, - Base58CheckValidator, EthereumValidator + Base58CheckValidator, EthereumValidator, validate ) @@ -22,7 +22,30 @@ def test_interfaces(self): IValidationRequest.implementedBy(ValidationRequest)) self.assertTrue( IValidationResult.implementedBy(ValidationResult)) + + def test_wrong_network_bytes_input(self): + try: + validate("bt", b"12nMGd6bzC8UpyWjd9HeZESZheZ8arttAb") + except TypeError: + assert False + def test_wrong_network_str_input(self): + try: + validate("bt", "12nMGd6bzC8UpyWjd9HeZESZheZ8arttAb") + except TypeError: + assert False + + def test_invalid_as_default(self): + result = validate("BTC", b"not_an_address", default_valid=False) + self.assertFalse(result.valid) + + def test_valid_as_default(self): + result = validate("FTM", "0x12341") + self.assertTrue(result.valid) + + def test_uppercase_symbol(self): + result = validate("BTC", "12nMGd6bzC8UpyWjd9HeZESZheZ8arttAb", default_valid=False) + self.assertTrue(result.valid) if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini index b7afaac..cfc4e6a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py34,py35,py36 +envlist = py34,py35,py36,py37,py38,py39,py310 [testenv] commands = pytest