From dfdaf264a096c0e1e8338a629d00130109829437 Mon Sep 17 00:00:00 2001 From: cloether <20920516+cloether@users.noreply.github.com> Date: Thu, 29 May 2025 00:59:08 -0400 Subject: [PATCH] fixed tests --- requirements.txt | 6 +- scripts/depupdate.py | 390 ++++++++++++++++++++++++++++++++++++++++ scripts/runtests.py | 134 ++++++++++---- setup.py | 29 ++- skeleton/__version__.py | 7 +- skeleton/config.py | 43 ++++- skeleton/log.py | 75 ++++++++ skeleton/utils.py | 201 ++++++++++++++------- tests/conftest.py | 12 ++ tests/test_log.py | 78 ++++++-- tests/test_utils.py | 114 +++++++++++- tests/test_version.py | 67 +++++++ 12 files changed, 1014 insertions(+), 142 deletions(-) create mode 100755 scripts/depupdate.py create mode 100644 tests/test_version.py diff --git a/requirements.txt b/requirements.txt index b1e24bf..cadcc5a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ -six -appdirs +# Updated by depupdate.py on 2025-05-29T00:19:36 by user +# Do not edit this section manually. +six==1.17.0 +appdirs==1.4.4 diff --git a/scripts/depupdate.py b/scripts/depupdate.py new file mode 100755 index 0000000..e720f9b --- /dev/null +++ b/scripts/depupdate.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +# coding=utf8 +"""depupdate.py + +Update or check Python project dependencies. 
+""" +from __future__ import absolute_import, print_function, unicode_literals + +import getpass +import json +import logging +import os +import re +import sys +from datetime import datetime +from subprocess import CalledProcessError, PIPE, check_call, run + +# script version +__version__ = "0.0.1" + +_SCRIPT_NAME = os.path.splitext(os.path.basename(__file__))[0] + +LOGGER = logging.getLogger(_SCRIPT_NAME) + + +def _serialize_args(args): + def _serialize_value(v): + # If it's a basic type, return as is + if isinstance(v, (str, int, float, bool, type(None))): + return v + # If it's a file-like object, show its class and name + if hasattr(v, 'name') and hasattr(v, 'mode'): + return { + "class": v.__class__.__name__, + "name": v.name, + "mode": v.mode + } + # If it's a type, return its name + if isinstance(v, type): + return v.__name__ + # If it's an object with __dict__, show class and attributes + if hasattr(v, '__dict__'): + return { + "class": v.__class__.__name__, + "attributes": { + k: _serialize_value(val) + for k, val in vars(v).items() + } + } + # Fallback to string + return str(v) + + return { + k: _serialize_value(v) + for k, v in vars(args).items() + if v is not None + } + + +def argument_parser(**kwargs): + """Construct Argument Parser.""" + from argparse import ( + ArgumentParser, + ArgumentDefaultsHelpFormatter, + FileType, + SUPPRESS + ) + + _file_mode_suffix = "b" if sys.version_info[0] == 2 else "" + _filetype_read = FileType("r+{0}".format(_file_mode_suffix)) + _filetype_write = FileType("w+{0}".format(_file_mode_suffix)) + + parser = ArgumentParser(**kwargs) + parser.set_defaults( + argument_default=SUPPRESS, + conflict_handler="resolve", + formatter_class=ArgumentDefaultsHelpFormatter, + add_help=False + ) + parser.add_argument( + "-V", "--version", + action="version", + version=__version__ + ) + parser.add_argument( + "-d", "--debug", + "-v", "--verbose", + dest="verbose", + action="store_true", + help="enable debug/verbose logging" + ) + 
parser.add_argument( + "--logfile", + action="store", + dest="logfile", + required=False, + help="path to log file" + ) + parser.add_argument( + "input", + nargs="?", + default="-", + type=_filetype_read, + help="program input" + ) + parser.add_argument( + "-o", "--output", + action="store", + nargs="?", + default="-", + required=False, + type=_filetype_write, + help="program output" + ) + parser.add_argument( + "requirements_file", + nargs="?", + default="requirements.txt", + type=str, + help="path to requirements.txt file (default: requirements.txt)" + ) + parser.add_argument( + "--update", + action="store_true", + dest="update", + default=False, + help="update dependencies to latest versions" + ) + parser.add_argument( + "--check", + action="store_true", + dest="check", + default=False, + help="check if dependencies are up to date" + ) + parser.add_argument( + "--write", + action="store_true", + dest="write", + default=False, + help="write latest versions to requirements file" + ) + return parser + + +def parse_requirements(file_path): + """Parse the requirements.txt file and return a list of dependencies.""" + # noinspection PyArgumentEqualDefault + with open(file_path, 'r') as f: + lines = f.readlines() + dependencies = [ + line.strip() + for line in lines + if line.strip() and not line.startswith('#') + ] + return dependencies + + +def get_latest_version(package): + """Get the latest version of a package using pip. 
+ """ + try: + result = run( + ['pip', 'index', 'versions', package], + stdout=PIPE, + stderr=PIPE, + text=True + ) + if result.returncode == 0 and "Available versions:" in result.stdout: + match = re.search(r'Available versions: (.+)', result.stdout) + if match: + versions = match.group(1).split(', ') + return versions[0] # Return the latest version + else: + LOGGER.error( + "failed to fetch versions for package: %s - %s", + package, result.stderr + ) + except Exception as e: + LOGGER.error( + "error fetching version for package: %s - %s", + package, e + ) + return None + + +def update_requirements(file_path): + """Update the requirements.txt file with the latest versions. + """ + dependencies = parse_requirements(file_path) + updated_dependencies = [] + for dep in dependencies: + package, _, current_version = dep.partition('==') + latest_version = get_latest_version(package) + if latest_version and latest_version != current_version: + print(f"updating {package} from {current_version} to {latest_version}") + updated_dependencies.append(f"{package}=={latest_version}") + else: + updated_dependencies.append(dep) + with open(file_path, 'w') as f: + f.write('\n'.join(updated_dependencies) + '\n') + + +def update_dependencies(requirements_file="requirements.txt"): + """Update all dependencies listed in requirements.txt. + """ + if not os.path.isfile(requirements_file): + LOGGER.error("Requirements file not found: %s", requirements_file) + return 1 + try: + check_call([ + "python3", "-m", "pip", "install", "--upgrade", "-r", + requirements_file + ]) + LOGGER.info( + "successfully updated dependencies from requirements file: %s", + requirements_file + ) + return 0 + except CalledProcessError as e: + LOGGER.error( + "failed to update dependencies from requirements file: %s - %s", + requirements_file, e + ) + return e.returncode + + +def write_latest_versions(requirements_file="requirements.txt"): + """Write the latest versions of dependencies to the requirements file. 
+ + Args: + requirements_file: Path to the requirements.txt file. + """ + dependencies = parse_requirements(requirements_file) + + updated = [] + for dep in dependencies: + LOGGER.info("checking-dependency: %s", dep) + + package, _, current_version = dep.partition('==') + + latest_version = get_latest_version(package) + + if latest_version: + LOGGER.info( + "updating-package: %s from %s to %s", + package, current_version or "not specified", latest_version + ) + + updated.append("{package}=={latest_version}".format( + package=package, latest_version=latest_version + )) + else: + LOGGER.warning( + "no-latest-version-found: %s (current version: %s)", + package, current_version or "not specified" + ) + updated.append(dep) + + now = datetime.now().isoformat(timespec="seconds") + user = getpass.getuser() + comment = ( + "# Updated by depupdate.py on {now} by {user}\n" + "# Do not edit this section manually.\n" + ).format(now=now, user=user) + + LOGGER.info("writing-updated-dependencies: %s", requirements_file) + + with open(requirements_file, 'w') as f: + f.write(comment) + for n, dep in enumerate(updated, start=1): + f.write(dep + '\n') + LOGGER.debug("updated-dependency-%d: %s", n, dep) + + LOGGER.info( + "requirements file updated with latest versions: %s", + requirements_file + ) + return 0 + + +def check_dependencies(requirements_file="requirements.txt"): + """Check if dependencies are up to date. 
+ """ + if not os.path.isfile(requirements_file): + LOGGER.error("requirements-file-not-found: %s", requirements_file) + return 1 + + dependencies = parse_requirements(requirements_file) + + up_to_date = 0 + outdated = 0 + missing_version = 0 + + for dep in dependencies: + LOGGER.info("checking-dependency: %s", dep) + + package, _, current_version = dep.partition('==') + latest_version = get_latest_version(package) + + if not current_version: + LOGGER.warning( + "no-package-version-specified: '%s' (latest: '%s')", + package, latest_version or "unknown" + ) + missing_version += 1 + elif latest_version and latest_version != current_version: + LOGGER.warning( + "package-is-outdated: name='%s', current-version='%s', " + "latest-version='%s'", + package, current_version, latest_version + ) + outdated += 1 + else: + LOGGER.info( + "package-is-up-to-date: name='%s', version='%s'", + package, current_version + ) + up_to_date += 1 + + LOGGER.info( + "dependency-check-summary: up-to-date=%d outdated=%d missing-version=%d", + up_to_date, outdated, missing_version + ) + + if outdated > 0 or missing_version > 0: + LOGGER.error( + "some dependencies are outdated or missing versions - " + "please update your requirements file" + ) + return 1 + + LOGGER.info("all dependencies are up-to-date") + return 0 + + +def main(*args): + """CLI Entry Point.""" + path = os.path.abspath(__file__) + prog = os.path.splitext(os.path.basename(path))[0] + + parser = argument_parser(prog=prog, description=__doc__) + args = parser.parse_args(args if args else None) + + # setup logging + level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=level, filename=args.logfile) + LOGGER.setLevel(level) + + LOGGER.info("starting: %s (v%s)", prog, __version__) + LOGGER.debug( + "parsed arguments: %s", + json.dumps(_serialize_args(args), indent=2, sort_keys=True) + ) + + if args.update: + LOGGER.info("updating-dependencies: %s", args.requirements_file) + return_code = 
update_dependencies(args.requirements_file) + + elif args.check: + if args.write: + LOGGER.info( + "checking and updating requirements file: %s", + args.requirements_file + ) + return_code = write_latest_versions(args.requirements_file) + else: + LOGGER.info("checking-dependencies: %s", args.requirements_file) + return_code = check_dependencies(args.requirements_file) + + else: + LOGGER.error("no-action-specified: use --update or --check") + parser.print_help() + return_code = 1 + + LOGGER.info( + "finished: %s (v%s) exiting with return code: %s", + prog, __version__, return_code + ) + + if args.input and not args.input.closed: + args.input.close() + if args.output and not args.output.closed: + args.output.close() + + return return_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/runtests.py b/scripts/runtests.py index 84c20c4..0639730 100755 --- a/scripts/runtests.py +++ b/scripts/runtests.py @@ -2,35 +2,99 @@ # coding=utf8 """run_tests.py -Run tests for python module. +Run tests for the Python module from outside the source directory. -Warnings: - Do not run tests from the root repo dir. - - We want to ensure we're importing from the installed binary package not - from the CWD. +Warning: + Do not run this script from the root repo directory. This ensures + we import the installed module, not the local source. """ from __future__ import absolute_import, print_function, unicode_literals import logging import os +import threading from contextlib import contextmanager from errno import EEXIST -from subprocess import CalledProcessError, check_call +from io import TextIOWrapper +from subprocess import PIPE, Popen +from typing import NamedTuple from setuptools import find_packages from six import next -def run(command): - """Run Command. 
+class RunResult(NamedTuple): + """Structured result of a command execution.""" + returncode: int + stdout: str + stderr: str + + +def _stream_output( + stream: TextIOWrapper, + target: TextIOWrapper, + buffer: list, + flush: bool = True +): + for line in stream: + print(line, end="", file=target) + buffer.append(line) + if flush and hasattr(target, "flush"): + target.flush() + stream.close() + + +def run_command(command: str, env: dict = None) -> RunResult: + """Run a shell command with concurrent stdout/stderr streaming and capture. + + Args: + command (str): Shell command to execute. + env (dict, optional): Environment variables to set for the command. + If None, it uses the current environment. + + Returns: + RunResult: Captured return code, stdout, and stderr. """ - try: - return_code = check_call(command, shell=True) - except CalledProcessError as e: - sys.stderr.write("{0!r}\n".format(e)) - return_code = e.returncode - return return_code + if env is None: + env = os.environ.copy() + + proc = Popen( + command, + shell=True, + env=env, + stdout=PIPE, + stderr=PIPE, + text=True, + bufsize=1 + ) + + assert proc.stdout is not None and proc.stderr is not None + + stdout_lines = [] + stderr_lines = [] + + # Stream both stdout and stderr concurrently + t_out = threading.Thread( + target=_stream_output, + args=(proc.stdout, sys.stdout, stdout_lines) + ) + t_err = threading.Thread( + target=_stream_output, + args=(proc.stderr, sys.stderr, stderr_lines) + ) + + t_out.start() + t_err.start() + + proc.wait() + t_out.join() + t_err.join() + + return RunResult( + returncode=proc.returncode, + stdout="".join(stdout_lines).strip(), + stderr="".join(stderr_lines).strip() + ) @contextmanager @@ -53,7 +117,7 @@ def cwd(dirname): def module_name(exclude=("doc*", "example*", "script*", "test*"), where=".", include=('*',), default=None): - """Get current module name. + """Get the current module name. 
Args: exclude (tuple or list): sequence of package names to exclude; @@ -78,7 +142,7 @@ def module_name(exclude=("doc*", "example*", "script*", "test*"), where=".", def mkdir_p(path): - """Create entire filepath. + """Create a directory and any necessary parent directories. Notes: Unix "mkdir -p" equivalent. @@ -123,6 +187,7 @@ def touch(filepath): if not os.path.exists(filepath): fh = open(filepath, "a+") try: + # noinspection PyArgumentEqualDefault os.utime(filepath, None) finally: fh.close() @@ -136,32 +201,39 @@ def main(): Returns: int: Command return code. """ - logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s") + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)s %(message)s" + ) repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) module = module_name(where=repo_root) + + if not module: + logging.error("module name not found in repo root: %s", repo_root) + return -1 + logging.debug("running tests for module: %s", module) - return_code = -1 # noqa + run_result = -1 # noqa - with cwd(repo_root): - # noinspection PyUnusedLocal - env_name = os.getenv("ENVNAME", "test") + # noinspection PyUnusedLocal + env_name = os.getenv("ENVNAME", "test") - tests_dir = os.path.join(repo_root, "tests") - logs_dir = os.path.join(tests_dir, "logs") - reports_dir = os.path.join(tests_dir, "reports") - tests_log_file = os.path.join(logs_dir, "pytest.log") + tests_dir = os.path.join(repo_root, "tests") + logs_dir = os.path.join(tests_dir, "logs") + reports_dir = os.path.join(tests_dir, "reports") + tests_log_file = os.path.join(logs_dir, "pytest.log") - mkdirs_p(logs_dir, reports_dir) - touch(tests_log_file) # prevent pytest error due to missing log file + mkdirs_p(logs_dir, reports_dir) + touch(tests_log_file) # prevent pytest error due to missing log file - return_code = run("pytest {posargs} --cov={module}".format( + with cwd(repo_root): + run_result = run_command("pytest --color=yes {posargs} 
--cov={module}".format( posargs=tests_dir, module=module )) - - return return_code + return run_result.returncode if __name__ == "__main__": diff --git a/setup.py b/setup.py index 655bceb..bef0274 100644 --- a/setup.py +++ b/setup.py @@ -12,11 +12,11 @@ author_email="my.self@example.com", url="https://github.com/me/my_other_package", description="Descriptive", - license="Open-Source-Baybeeeeee-1.0", + license="Open-Source-1.0", install_requires=["setuptools"], packages=["my_other_package"], package_data={"my_other_package": ["py.typed", "bar.pyi"]}, - data_files=[], + data_files=[] ) References: @@ -30,7 +30,7 @@ import sys # io.open is needed for projects that support Python 2.7. It ensures open() # defaults to text mode with universal newlines, and accepts an argument to -# specify the text encoding. Python 3 only projects can skip this import. +# specify the text encoding. Python 3 only projects can skip this import. # # References: # https://raw.githubusercontent.com/pypa/sampleproject/master/setup.py @@ -41,19 +41,28 @@ ROOT = path.abspath(path.dirname(__file__)) +DEFAULT_LICENSE = "MIT" + MODULE_META_RE = re.compile( - r"^__(?P.*)__ = ['\"](?P[^'\"]*)['\"]", re.M + r'^__(?P[a-zA-Z0-9_]+)__\s*=\s*([\'"])(?P.*?)(? Any: + """Retrieve the value of the configuration attribute. + + Args: + instance (Any): The instance of the class. + owner (Type, optional): The owner class. + + Returns: + Any: The value of the configuration attribute. + """ if instance is None: return self value = instance.config[self.__name__] @@ -141,7 +159,14 @@ def __get__(self, instance, owner=None): value = self.get_converter(value) return value - def __set__(self, instance, value): + def __set__(self, instance: Any, value: Any) -> None: + """ + Set the value of the configuration attribute. + + Args: + instance (Any): The instance of the class. + value (Any): The value to set. 
+ """ instance.set(self.__name__, value) @@ -190,7 +215,7 @@ class Configuration(dict): # noinspection PyMissingConstructor # pylint: disable=super-init-not-called - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any): # record user provided options self._user_options = self._make_options(*args, **kwargs) # merge the user_provided options onto the default options @@ -199,7 +224,7 @@ def __init__(self, *args, **kwargs): # set the attributes based on the config_vars dict.update(self, config_vars) - def _make_options(self, *args, **kwargs): + def _make_options(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: """Set configuration values. Returns: @@ -411,7 +436,7 @@ def set(self, key, value): # get - def get(self, key, default=None, strict=False): + def get(self, key, default=None, strict=False) -> Any: """Get configuration value. """ if key not in self.OPTIONS: @@ -420,7 +445,7 @@ def get(self, key, default=None, strict=False): return default return dict.__getitem__(self, key) - def getall(self, keys, default=None, strict=False): + def getall(self, keys, default=None, strict=False) -> List[Any]: """Get configuration value. Returns: @@ -430,7 +455,7 @@ def getall(self, keys, default=None, strict=False): # merge - def merge(self, other): + def merge(self, other: "Configuration") -> "Configuration": """Merge current config with another config. This will merge in all non-default values from the diff --git a/skeleton/log.py b/skeleton/log.py index 741f831..b1e1635 100644 --- a/skeleton/log.py +++ b/skeleton/log.py @@ -521,3 +521,78 @@ def format(self, record): """ result = super(SyslogBOMFormatter, self).format(record) return "ufeff{0}".format(result) + + +class JSONFormatter(logging.Formatter): + """JSON Log Formatter. + + This formatter outputs log records in JSON format, which is useful + for structured logging and can be easily parsed by log management systems. 
+ + Example usage: + logger = logging.getLogger("json_logger") + handler = logging.StreamHandler() + handler.setFormatter(JSONFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + """ + + def format(self, record): + """Format Log Record as JSON. + + Args: + record (logging.LogRecord): Log Record to format. + + Returns: + str: JSON formatted log record. + """ + log_record = { + "time": self.formatTime(record, self.datefmt), + "level": record.levelname, + "message": record.getMessage(), + "module": record.module, + } + return json.dumps(log_record) + +# Log filters and handlers can be added here as needed. +# +# class ContextFilter(logging.Filter): +# """Context Filter to add user_id to log records. +# +# This filter adds a user_id to each log record, which can be useful +# for tracking logs related to a specific user in applications. +# +# Example usage: +# logger = logging.getLogger("contextual_logger") +# logger.addFilter(ContextFilter(user_id="12345")) +# logger.info("Log with user context") +# """ +# +# def __init__(self, user_id): +# super().__init__() +# self.user_id = user_id +# +# def filter(self, record): +# record.user_id = self.user_id +# return True +# +# +# class DatabaseHandler(logging.Handler): +# """Custom logging handler to save logs to a database. +# +# This is a placeholder for an actual database handler. +# You would implement the logic to connect to your database +# and save the log records. 
+# +# Example usage: +# db_handler = DatabaseHandler() +# logger = logging.getLogger("custom_handler_logger") +# logger.addHandler(db_handler) +# logger.setLevel(logging.INFO) +# logger.info("Custom handler log example") +# """ +# +# def emit(self, record): +# # Example: Save log to a database +# log_entry = self.format(record) +# print(f"Saving to database: {log_entry}") diff --git a/skeleton/utils.py b/skeleton/utils.py index c66b593..9975b95 100644 --- a/skeleton/utils.py +++ b/skeleton/utils.py @@ -1,10 +1,20 @@ # coding=utf8 -"""utils.py +"""types.py -Generic Utilities +Shared type aliases and interfaces. + +This file defines shared types used across this project to keep annotations +consistent, reduce duplication, and simplify updates. Useful for paths, data +structures, and function signatures. + +References: +- PEP 484: https://peps.python.org/pep-0484/ +- PEP 586: https://peps.python.org/pep-0586/ +- typing documentation: https://docs.python.org/3/library/typing.html """ from __future__ import absolute_import, print_function, unicode_literals +import itertools import os import pickle import re @@ -12,13 +22,18 @@ import zlib from base64 import b64decode from contextlib import contextmanager -from datetime import timedelta +from datetime import date, datetime, timedelta from errno import EEXIST from io import UnsupportedOperation from itertools import groupby, islice from math import ceil from operator import itemgetter from string import Formatter +from typing import ( + Iterable, Iterator, List, + Optional, Sequence, TypeVar, + Union, overload +) from six import integer_types, iteritems, string_types, text_type from six.moves import filter, map @@ -162,12 +177,12 @@ def as_number(value): if isinstance(value, integer_types): return value if isinstance(value, string_types): - if value.isnumeric(): - value = int(value) - elif value.isdecimal(): - value = float(value) - elif value.isdigit(): - value = int(value) + if value.isdigit(): + return int(value) + 
try: + return float(value) + except ValueError: + return value return value @@ -254,35 +269,35 @@ def run_in_separate_process(func, *args, **kwargs): """Run function in a separate_process. Args: - func (Callable): Function to invoked. + func (Callable): Function to invoke. Returns: return value of the invoked function """ - # create a pipe. + # create a pipe for inter-process communication. read_fd, write_fd = os.pipe() - # fork a child process. + # fork the current process pid = os.fork() - if pid > 0: # in child process - - # close write file descriptor. + if pid > 0: # in parent process + # close the write end of the pipe in the parent os.close(write_fd) - # open read file descriptor. + # read the result from the child process. with os.fdopen(read_fd, "rb") as fd: status, result = pickle.load(fd) - # wait for completion of child process. + # wait for the child process to finish os.waitpid(pid, 0) if status == 0: return result - raise result - os.close(read_fd) # close read file descriptor + # child process: close the read end and write end of the pipe. + os.close(read_fd) + os.close(write_fd) try: # call the function. status: success=0 fail=1 @@ -291,19 +306,23 @@ def run_in_separate_process(func, *args, **kwargs): result, status = e, 1 with os.fdopen(write_fd, "wb") as fd: + fd_typed = cast(SupportsWrite[bytes], fd) + try: # dump results. pickle.dump( (status, result), fd, pickle.HIGHEST_PROTOCOL ) - except pickle.PicklingError as e: + except pickle.PicklingError: + # if the result cannot be pickled, write a generic error. pickle.dump( - (2, e), + (2, "PicklingError occurred"), fd, pickle.HIGHEST_PROTOCOL ) + # exit the child process os._exit(0) # noqa @@ -367,7 +386,7 @@ def mkdir_p(path): raise -def touch(filepath): +def touch(filepath: Union[str, bytes]): """Equivalent of Unix `touch` command. 
Args: @@ -498,7 +517,32 @@ def _format(value): return dict(zip(keys, func(values))) -def chunk(iterable, size): +T = TypeVar("T") + + +def pad_iterable(iterable, size, fillvalue=None): + """Pads an iterable to a fixed size with a fill value. + + Args: + iterable (Iterable[T]): Iterable to pad. + size (int): Desired length. + fillvalue (Optional[T]): Value to pad with. + + Returns: + List[Optional[T]]: Padded list. + """ + return list( + itertools.islice( + itertools.chain( + iterable, + itertools.repeat(fillvalue) + ), + size + ) + ) + + +def chunk(iterable: Sequence[T], size: int) -> Iterator[List[T]]: """Splits an iterable into batches of the specified size. Notes: @@ -506,17 +550,17 @@ def chunk(iterable, size): through it. Args: - iterable (Iterable): Iterable of data. + iterable (Sequence): Iterable of data. size (int): Chunk size. Yields: list: Chunk of data from iterable. """ for i in range(0, len(iterable), size): - yield iterable[i:i + size] + yield list(iterable[i:i + size]) -def iterchunk(iterable, size): +def iterchunk(iterable: Iterable[T], size: int) -> Iterator[List[T]]: """Splits an iterable into chunks of size chunksize. Notes: @@ -529,42 +573,47 @@ def iterchunk(iterable, size): Yields: list: Chunk of data from iterable. """ - result = [] + result: List[T] = [] for i in iterable: result.append(i) if len(result) == size: yield result result = [] - # TODO: if the length of the last chunk does not - # equal the provided size, then fill (append to) - # chunk until its length is equal to size. - if result: - # handle last chunk + if result: # handle last chunk + if len(result) < size: + # pad the last chunk to a fixed size + result.extend([None] * (size - len(result))) yield result -# TODO: add option to fill the last chunk to the specified size. -def chunkify(iterable, size): +def chunkify( + iterable: Iterable[T], + size: int, + fill_value: Optional[T] = None +) -> Iterator[List[Optional[T]]]: """Splits an iterable into chunks of size chunksize. 
Notes: - The last chunk may be smaller than the provided chunksize. + The last chunk will be filled to the specified size with `fill_value`. Args: iterable (Iterable): Iterable of data. size (int): Chunk size. + fill_value (Any): Value to fill the last chunk if it is smaller than `size`. Yields: list: Chunk of data from iterable. """ if size <= 0: raise ValueError("non-positive chunk size: {0}".format(size)) - return ( - chunk - if hasattr(iterable, '__getitem__') - # generator, set, map, etc... - else iterchunk - )(iterable, size) + iterator = iter(iterable) + while True: + chunk = list(islice(iterator, size)) + if not chunk: + break + if len(chunk) < size: + chunk.extend([fill_value] * (size - len(chunk))) + yield chunk def group_continuous(iterable, key=None, start=0): @@ -596,15 +645,18 @@ def grouper(i, value): # datetime -def timestamp_from_datetime(dt, epoch=EPOCH): +def timestamp_from_datetime( + dt: Union[datetime, date], + epoch: Union[datetime, date] = EPOCH +) -> int: """Convert a datetime to a timestamp. References: https://stackoverflow.com/a/8778548/141395 Args: - dt (datetime.datetime or datetime.date): Datetime. - epoch (datetime.datetime or datetime.date): Epoch Datetime. + dt (datetime or date): Datetime. + epoch (datetime or date): Epoch Datetime. Returns: int: Timestamp @@ -613,7 +665,7 @@ def timestamp_from_datetime(dt, epoch=EPOCH): return delta.seconds + delta.days * 86400 -def timedelta_isoformat(td): +def timedelta_isoformat(td: timedelta) -> str: """ISO-8601 encoding for timedelta. Args: @@ -639,7 +691,7 @@ def timedelta_isoformat(td): # noinspection PyArgumentEqualDefault -TIMEDELTA_ZERO = timedelta(0) +TIMEDELTA_ZERO: timedelta = timedelta(0) class DateRange(object): # pylint: disable=useless-object-inheritance @@ -653,9 +705,9 @@ class DateRange(object): # pylint: disable=useless-object-inheritance provided. Args: - start (datetime.datetime or datetime.date): Range Start Time. 
- stop (datetime.datetime or datetime.date): Range End Time. - step (datetime.timedelta): Range Step Time. + start (datetime or date): Range Start Time. + stop (datetime or date): Range End Time. + step (timedelta): Range Step Time. Examples: > now = datetime.now().date() @@ -665,7 +717,12 @@ class DateRange(object): # pylint: disable=useless-object-inheritance >> print(d) """ - def __init__(self, start=None, stop=None, step=None): + def __init__( + self, + start: Union[date, datetime], + stop: Optional[Union[date, datetime]], + step: timedelta + ): if start is None: raise TypeError("must provide starting point for DateRange") if step is None: @@ -673,12 +730,12 @@ def __init__(self, start=None, stop=None, step=None): # noinspection PyArgumentEqualDefault if step == timedelta(0): raise TypeError("must provide non-zero step for DateRange") - self.start = start - self.stop = stop - self.step = step + self.start: Union[date, datetime] = start + self.stop: Optional[Union[date, datetime]] = stop + self.step: timedelta = step self._has_neg_step = self.step < TIMEDELTA_ZERO - def __repr__(self): + def __repr__(self) -> str: return "{!s}(start={!r}, stop={!r}, step={!r}".format( self.__class__.__name__, self.start, @@ -686,13 +743,13 @@ def __repr__(self): self.step ) - def __reversed__(self): + def __reversed__(self) -> "DateRange": if self.stop: # pylint: disable=invalid-unary-operand-type return DateRange(self.stop, self.start, -self.step) raise ValueError("cannot reverse infinite range") - def __len__(self): + def __len__(self) -> int: if self.stop is None: # it would be nice if float("inf") could be returned raise TypeError("infinite range") @@ -703,7 +760,7 @@ def __len__(self): ) return int(ceil(abs(calc.total_seconds() / self.step.total_seconds()))) - def __contains__(self, value): + def __contains__(self, value: Union[date, datetime]) -> bool: if self.stop is not None: check = ( self.start >= value > self.stop @@ -717,20 +774,21 @@ def __contains__(self, value): 
difference = value - self.start return difference.total_seconds() % self.step.total_seconds() == 0 - def _check_stop(self, current): + def _check_stop(self, current: Union[date, datetime]) -> bool: if self._has_neg_step: return current <= self.stop return current >= self.stop - def __iter__(self): - current, stopping = self.start, self.stop is not None + def __iter__(self) -> Iterator[Union[date, datetime]]: + current = self.start + stopping = self.stop is not None while True: if stopping and self._check_stop(current): break yield current current += self.step - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, DateRange): return ( self.start == other.start @@ -739,12 +797,23 @@ def __eq__(self, other): ) return NotImplemented - def __ne__(self, other): + def __ne__(self, other: object) -> bool: if isinstance(other, DateRange): return not self == other return NotImplemented - def __getitem__(self, idx_or_slice): + @overload + def __getitem__(self, idx: int) -> Union[date, datetime]: + ... + + @overload + def __getitem__(self, idx: slice) -> "DateRange": + ... 
+ + def __getitem__( + self, + idx_or_slice: Union[int, slice] + ) -> Union[date, datetime, "DateRange"]: if isinstance(idx_or_slice, int): return self._getidx(idx_or_slice) if isinstance(idx_or_slice, slice): @@ -755,7 +824,7 @@ def __getitem__(self, idx_or_slice): ) ) - def _getidx(self, idx): + def _getidx(self, idx: int) -> Union[date, datetime]: if not self.stop and idx < 0: raise IndexError("Cannot negative index infinite range") if self.stop and abs(idx) > len(self) - 1: @@ -766,13 +835,13 @@ def _getidx(self, idx): idx += len(self) return self.start + (self.step * idx) - def _getslice(self, slice_): + def _getslice(self, slice_: slice) -> "DateRange": sss = slice_.start, slice_.stop, slice_.step # if s == (None, None, None) or s == (None, None, 1): if sss in [(None, None, None), (None, None, 1)]: return DateRange(start=self.start, stop=self.stop, step=self.step) start, stop, step = sss - # seems redundant but we are converting None -> 0 + # seems redundant, but we are converting None -> 0 start = start or 0 stop = stop or 0 step = step or 1 # use 1 here because of multiplication diff --git a/tests/conftest.py b/tests/conftest.py index 5f38c9f..c7d1f60 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,6 +52,18 @@ def module_name(): return _modname(where=repo_root) +@pytest.fixture(name="version_file") +def version_file(): + """Version file fixture. + + Returns: + str: Path to the version file. 
+ """ + return os.path.join( + os.path.dirname(__file__), "..", "skeleton", "__version__.py" + ) + + @pytest.fixture(name="prepared_request") def prepared_request(): """PreparedRequest Fixture for Testing `skeleton.log_request` diff --git a/tests/test_log.py b/tests/test_log.py index b8b275a..0485886 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -3,10 +3,12 @@ """ from __future__ import absolute_import, print_function, unicode_literals +import logging from unittest.mock import patch +import pytest + from skeleton.log import ( - apply_session_hook, log_request, log_request_response, log_response @@ -30,9 +32,10 @@ def test_log_request(prepared_request): Args: prepared_request (PreparedRequest): PreparedRequest Instance """ - with patch("skeleton.log.logger") as mock_logger: + logging.basicConfig(level=logging.DEBUG) + with patch("skeleton.log.LOGGER") as mock_logger: log_request(prepared_request) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() def test_log_request_content(prepared_request, log_content=True): @@ -41,9 +44,10 @@ def test_log_request_content(prepared_request, log_content=True): Args: prepared_request (PreparedRequest): PreparedRequest Instance """ - with patch("skeleton.log.logger") as mock_logger: + logging.basicConfig(level=logging.DEBUG) + with patch("skeleton.log.LOGGER") as mock_logger: log_request(prepared_request, log_content=log_content) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() def test_log_response(response): @@ -52,9 +56,10 @@ def test_log_response(response): Args: response (Response): Response Instance """ - with patch("skeleton.log.logger") as mock_logger: + logging.basicConfig(level=logging.DEBUG) + with patch("skeleton.log.LOGGER") as mock_logger: log_response(response) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() def test_log_response_content(response, log_content=True): @@ -63,9 +68,10 @@ def test_log_response_content(response, log_content=True): 
Args: response (Response): Response Instance """ - with patch("skeleton.log.logger") as mock_logger: + logging.basicConfig(level=logging.DEBUG) + with patch("skeleton.log.LOGGER") as mock_logger: log_response(response, log_content=log_content) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() def test_log_request_response(response, log_content=False): @@ -74,20 +80,44 @@ def test_log_request_response(response, log_content=False): Args: response (Response): Response Instance """ - with patch("skeleton.log.logger") as mock_logger: + logging.basicConfig(level=logging.DEBUG) + with patch("skeleton.log.LOGGER") as mock_logger: log_request_response(response, log_content=log_content) - mock_logger.info.assert_called() + mock_logger.debug.assert_called() -def test_log_request_response_content(response, log_content=True): +def test_log_request_response_content( + response, + caplog: pytest.LogCaptureFixture, + log_content: bool = True, +): """Test `skeleton.log_request_response` with log_content set to True. 
Args: response (Response): Response Instance """ - with patch("skeleton.log.logger") as mock_logger: + import logging + + logging.basicConfig(level=logging.DEBUG) + + from skeleton.log import LOGGER + + LOGGER.setLevel(logging.DEBUG) + # Ensure the logger propagates to the root logger + LOGGER.propagate = True + + with caplog.at_level(logging.DEBUG, logger=LOGGER.name): log_request_response(response, log_content=log_content) - mock_logger.info.assert_called() + log_text = caplog.text + assert "REQUEST:" in log_text + assert "- METHOD:" in log_text + assert "- URL:" in log_text + assert "- HEADERS:" in log_text + assert "- BODY:" in log_text + assert "RESPONSE:" in log_text + assert "- HEADERS:" in log_text + assert "- STATUS CODE:" in log_text + assert "- CONTENT:" in log_text def test_apply_session_hook(session, log_content=False): @@ -96,19 +126,29 @@ def test_apply_session_hook(session, log_content=False): Args: session (requests.Session): Session Instance """ + from functools import partial + from skeleton.log import log_request_response, apply_session_hook + apply_session_hook(session, log_content=log_content) - assert session.hooks["response"] == [log_response], ( - "log_response was not set as a session hook" + hook = session.hooks["response"][0] + assert isinstance(hook, partial), "Session hook is not a functools.partial" + assert hook.func is log_request_response, ( + "Session hook does not use log_request_response" ) def test_apply_session_hook_content(session, log_content=True): - """Test `skeleton.apply_session_hook` with log_content set to True. + """Test `skeleton.apply_session_hook` with `log_content` set to True. 
Args: session (requests.Session): Session Instance """ + from functools import partial + from skeleton.log import log_request_response, apply_session_hook + apply_session_hook(session, log_content=log_content) - assert session.hooks["response"] == [log_response], ( - "log_response was not set as a session hook" + hook = session.hooks["response"][0] + assert isinstance(hook, partial), "Session hook is not a functools.partial" + assert hook.func is log_request_response, ( + "Session hook does not use log_request_response" ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 8e30cea..f48e145 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,13 +4,14 @@ from __future__ import absolute_import, print_function, unicode_literals import logging +import os from six import binary_type, integer_types from skeleton.utils import ( as_bool, as_number, - safe_b64decode, + is_file_newer_than_file, safe_b64decode, to_valid_filename, to_valid_module_name ) @@ -65,3 +66,114 @@ def test_to_valid_module_name(modname): valid_module_name = to_valid_module_name(modname) LOGGER.debug("input: %s output: %s", modname, valid_module_name) assert modname != valid_module_name + + +def test_as_number_converts_numeric_string_to_int(): + """Test that as_number converts numeric strings to int.""" + assert as_number("42") == 42 + + +def test_as_number_converts_decimal_string_to_float(): + """Test that as_number converts decimal strings to float.""" + assert as_number("3.14") == 3.14 + + +def test_as_number_returns_int_for_int_input(): + """Test that as_number returns the original integer value.""" + assert as_number(7) == 7 + + +def test_as_number_returns_value_for_non_numeric_string(): + """Test that as_number returns the original value for non-numeric strings.""" + assert as_number("foo") == "foo" + + +def test_as_bool_converts_true_strings_to_true(): + """Test that as_bool converts various true strings to True.""" + for val in ["y", "yes", "t", "true", "on", "1"]: + assert 
as_bool(val) is True
+
+
+def test_as_bool_converts_false_strings_to_false():
+    """Test that as_bool converts various false strings to False."""
+    for val in ["n", "no", "f", "false", "off", "0"]:
+        assert as_bool(val) is False
+
+
+def test_as_bool_returns_bool_for_bool_input():
+    """Test that as_bool returns the original boolean value."""
+    assert as_bool(True) is True
+    assert as_bool(False) is False
+
+
+def test_as_bool_returns_value_for_invalid_string():
+    """Test that as_bool returns the original value for non-boolean strings."""
+    assert as_bool("maybe") == "maybe"
+
+
+def test_safe_b64decode_decodes_padded_base64():
+    """Test that safe_b64decode can decode padded base64."""
+    assert safe_b64decode("Q2hhZAo=") == b"Chad\n"
+
+
+def test_safe_b64decode_decodes_unpadded_base64():
+    """Test that safe_b64decode can decode unpadded base64."""
+    assert safe_b64decode("Q2hhZAo") == b"Chad\n"
+
+
+def test_to_valid_filename_replaces_invalid_characters():
+    """Test that invalid characters are replaced in filenames."""
+    assert to_valid_filename("My File@2024!.txt") == "my-file-2024-.txt"
+
+
+def test_to_valid_filename_allows_valid_characters():
+    """Test that valid characters are preserved in filenames."""
+    assert to_valid_filename("abc-123.txt") == "abc-123.txt"
+
+
+def test_to_valid_module_name_replaces_hyphens_with_underscores():
+    """Test that hyphens are replaced with underscores in module names."""
+    assert to_valid_module_name("my-module") == "my_module"
+
+
+def test_to_valid_module_name_handles_file_extensions():
+    """Test that file extensions are preserved in module names."""
+    assert to_valid_module_name("my-module.py") == "my_module.py"
+
+
+def test_is_file_newer_than_file_returns_true_for_newer_file(tmp_path):
+    """Test that is_file_newer_than_file returns True for a newer file."""
+    file_a = tmp_path / "a.txt"
+    file_b = tmp_path / "b.txt"
+    file_a.write_text("a")
+    file_b.write_text("b")
+    os.utime(
+        file_a,
+        (os.path.getmtime(file_b) + 10, os.path.getmtime(file_b) + 10)
+    
) + assert is_file_newer_than_file(str(file_a), str(file_b)) is True + + +def test_is_file_newer_than_file_returns_false_for_older_file(tmp_path): + """Test that is_file_newer_than_file returns False for an older file.""" + file_a = tmp_path / "a.txt" + file_b = tmp_path / "b.txt" + file_a.write_text("a") + file_b.write_text("b") + os.utime( + file_b, + (os.path.getmtime(file_a) + 10, os.path.getmtime(file_a) + 10) + ) + assert is_file_newer_than_file(str(file_a), str(file_b)) is False + + +def test_is_file_newer_than_file_handles_missing_file(tmp_path): + """Test that is_file_newer_than_file returns True for a + file compared to a missing file. + """ + file_a = tmp_path / "a.txt" + file_a.write_text("a") + assert is_file_newer_than_file( + str(file_a), + str(tmp_path / "missing.txt") + ) is True diff --git a/tests/test_version.py b/tests/test_version.py new file mode 100644 index 0000000..e75eab4 --- /dev/null +++ b/tests/test_version.py @@ -0,0 +1,67 @@ +# coding=utf8 +"""test_version.py +""" +from __future__ import absolute_import, print_function, unicode_literals + +import re + +try: + import importlib.metadata as importlib_metadata +except ImportError: + import importlib_metadata # noqa + +from skeleton import __version__ as code_version + + +def __find_meta(filepath, **kwargs): + """Extract metadata from a Python file. + + Args: + filepath (str): Path to the file. + **kwargs: Additional arguments for `open()`. + + Returns: + dict: A dictionary containing the metadata. + """ + if "b" not in kwargs.get("mode", ""): + kwargs.setdefault("encoding", "utf8") + + with open(filepath, **kwargs) as fd: + content = fd.read() + + pattern = re.compile( + r'^__(?P[a-zA-Z0-9_]+)__\s*=\s*([\'"])(?P.*?)(?