diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..fde6ed3 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,13 @@ +repos: +- repo: https://github.com/psf/black + rev: 23.11.0 + hooks: + - id: black +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.7.0 + hooks: + - id: mypy +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.1.6 + hooks: + - id: ruff \ No newline at end of file diff --git a/main.py b/manual_tests/box_example.py similarity index 100% rename from main.py rename to manual_tests/box_example.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..35e8ad7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[tool.ruff] +fix = true +preview = true +extend-select = ["UP", "FURB", "B"] + +[tool.mypy] +strict=true \ No newline at end of file diff --git a/rclone_python/rclone.py b/rclone_python/rclone.py index e2d964a..f9483d7 100644 --- a/rclone_python/rclone.py +++ b/rclone_python/rclone.py @@ -1,9 +1,27 @@ +from __future__ import annotations + import json import re import logging +import subprocess from functools import wraps from shutil import which -from typing import Optional, Union, List, Dict, Callable +from typing import ( + Optional, + Dict, + Callable, + TypeVar, + ParamSpec, + Sequence, + Any, + cast, + List, + Union, +) + +import rich.progress + +from rich.progress import Progress from rclone_python import utils from rclone_python.hash_types import HashTypes @@ -12,10 +30,17 @@ # debug flag enables/disables raw output of rclone progresses in the terminal DEBUG = False +_LISTENER = Optional[Callable[[Dict[str, Any]], None]] +_PROGRESS = Optional[Progress] + + +_T = TypeVar("_T") +_PARAMS = ParamSpec("_PARAMS") + -def __check_installed(func): +def __check_installed(func: Callable[_PARAMS, _T]) -> Callable[_PARAMS, _T]: @wraps(func) - def wrapper(*args, **kwargs): + def wrapper(*args: _PARAMS.args, **kwargs: _PARAMS.kwargs) -> _T: if not is_installed(): raise Exception( "rclone is not installed on this system. Please install it here: https://rclone.org/" @@ -34,7 +59,7 @@ def is_installed() -> bool: @__check_installed -def about(remote_name: str): +def about(remote_name: str) -> object: """ Executes the rclone about command and returns the retrieved json as a dictionary. :param remote_name: The name of the remote to examine. @@ -74,11 +99,11 @@ def check_remote_existing(remote_name: str) -> bool: @__check_installed def create_remote( remote_name: str, - remote_type: Union[str, RemoteTypes], - client_id: Union[str, None] = None, - client_secret: Union[str, None] = None, - **kwargs, -): + remote_type: str | RemoteTypes, + client_id: str | None = None, + client_secret: str | None = None, + **kwargs: object, +) -> None: """Creates a new remote with name, type and options. Args: @@ -123,12 +148,12 @@ def create_remote( def copy( in_path: str, out_path: str, - ignore_existing=False, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + ignore_existing: bool = False, + show_progress: bool = True, + listener: _LISTENER = None, + args: list[str] | None = None, + pbar: _PROGRESS = None, +) -> None: """ Copies a file or a directory from a src path to a destination path. :param in_path: The source path to use. Specify the remote with 'remote_name:path_on_remote' @@ -158,12 +183,12 @@ def copy( def copyto( in_path: str, out_path: str, - ignore_existing=False, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + ignore_existing: bool = False, + show_progress: bool = True, + listener: _LISTENER = None, + args: list[str] | None = None, + pbar: _PROGRESS = None, +) -> None: """ Copies a file or a directory from a src path to a destination path and is typically used when renaming a file is necessary. :param in_path: The source path to use. Specify the remote with 'remote_name:path_on_remote' @@ -193,12 +218,12 @@ def copyto( def move( in_path: str, out_path: str, - ignore_existing=False, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + ignore_existing: bool = False, + show_progress: bool = True, + listener: _LISTENER = None, + args: list[str] | None = None, + pbar: _PROGRESS = None, +) -> None: """ Moves a file or a directory from a src path to a destination path. :param in_path: The source path to use. Specify the remote with 'remote_name:path_on_remote' @@ -228,12 +253,12 @@ def move( def moveto( in_path: str, out_path: str, - ignore_existing=False, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + ignore_existing: bool = False, + show_progress: bool = True, + listener: _LISTENER = None, + args: list[str] | None = None, + pbar: _PROGRESS = None, +) -> None: """ Moves a file or a directory from a src path to a destination path and is typically used when renaming is necessary. :param in_path: The source path to use. Specify the remote with 'remote_name:path_on_remote' @@ -263,11 +288,11 @@ def moveto( def sync( src_path: str, dest_path: str, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + show_progress: bool = True, + listener: _LISTENER = None, + args: list[str] | None = None, + pbar: _PROGRESS = None, +) -> None: """ Sync the source to the destination, changing the destination only. Doesn't transfer files that are identical on source and destination, testing by size and modification time or MD5SUM. :param in_path: The source path to use. Specify the remote with 'remote_name:path_on_remote' @@ -293,7 +318,7 @@ def sync( @__check_installed -def get_remotes() -> List[str]: +def get_remotes() -> list[str]: """ :return: A list of all available remotes. """ @@ -306,7 +331,7 @@ def get_remotes() -> List[str]: @__check_installed -def purge(path: str, args=None): +def purge(path: str, args: list[str] | None = None) -> None: """ Purges the specified folder. This means that unlike with delete, also all the folders are removed. :param args: List of additional arguments/ flags. @@ -326,7 +351,7 @@ def purge(path: str, args=None): @__check_installed -def delete(path: str, args=None): +def delete(path: str, args: list[str] | None = None) -> None: """ Deletes a file or a folder. When deleting a folder, all the files in it and it's subdirectories are removed, but not the folder structure itself. @@ -350,9 +375,9 @@ def delete(path: str, args=None): @__check_installed def link( path: str, - expire: Union[str, None] = None, - unlink=False, - args=None, + expire: str | None = None, + unlink: bool = False, + args: list[str] | None = None, ) -> str: """ Generates a public link to a file/directory. @@ -371,7 +396,7 @@ def link( if expire is not None: args.append(f"--expire {expire}") if unlink: - args.append(f"--unlink") + args.append("--unlink") process = utils.run_cmd(command, args) @@ -384,11 +409,11 @@ def link( @__check_installed def ls( path: str, - max_depth: Union[int, None] = None, - dirs_only=False, - files_only=False, - args=None, -) -> List[Dict[str, Union[int, str]]]: + max_depth: int | None = None, + dirs_only: bool = False, + files_only: bool = False, + args: list[str] | None = None, +) -> list[dict[str, int | str]]: """ Lists the files in a directory. :param path: The path to the folder that should be examined. @@ -408,21 +433,21 @@ def ls( if max_depth is not None: args.append(f"--max-depth {max_depth}") if dirs_only: - args.append(f"--dirs-only") + args.append("--dirs-only") if files_only: args.append("--files-only") process = utils.run_cmd(command, args) if process.returncode == 0: - return json.loads(process.stdout) + return cast(List[Dict[str, Union[int, str]]], json.loads(process.stdout)) else: raise Exception(f"ls operation on {path} failed with:\n{process.stderr}") def tree( path: str, - args: List[str] = None, + args: list[str] | None = None, ) -> str: """Returns the contents of the remote path in a tree like fashion. @@ -446,13 +471,13 @@ def tree( @__check_installed def hash( - hash: Union[str, HashTypes], + hash: str | HashTypes, path: str, - download=False, - checkfile: Optional[str] = None, - output_file: Optional[str] = None, - args: List[str] = None, -) -> Union[None, str, bool, Dict[str, str], Dict[str, bool]]: + download: bool = False, + checkfile: str | None = None, + output_file: str | None = None, + args: list[str] | None = None, +) -> None | str | bool | dict[str, str] | dict[str, bool]: """Produces a hashsum file for all the objects in the path. Args: @@ -492,9 +517,11 @@ def hash( if output_file is not None: args.append(f'--output-file "{output_file}"') - process: str = utils.run_cmd(f'rclone hashsum "{hash}" "{path}"', args) + process: subprocess.CompletedProcess[str] = utils.run_cmd( + f'rclone hashsum "{hash}" "{path}"', args + ) - lines = process.stdout.splitlines() + lines: list[str] = process.stdout.splitlines() exception = False @@ -503,42 +530,41 @@ def hash( exception = True else: # validate that the checkfile command succeeded, by checking if the output has the expected form - for l in lines: - if not (l.startswith("= ") or l.startswith("* ")): - exception = True - break + exception = not all(line.startswith(("= ", "* ")) for line in lines) if exception: - raise Exception( + raise RuntimeError( f"hashsum operation on {path} with hash='{hash}' failed with:\n{process.stderr}" ) if output_file is None: # each line contains the hashsum first, followed by the name of the file - hashsums = {} + hashsums: dict[str, str] | dict[str, bool] = {} - for l in lines: - if len(l) > 0: - value, key = l.split() + for line in lines: + if line: + value, key = line.split() if checkfile is None: - hashsums[key] = value + hashsums[key] = value # type: ignore[assignment] else: # in checkfile mode, value is '=' for valid and '*' for invalid files - hashsums[key] = value == "=" + hashsums[key] = value == "=" # type: ignore[assignment] # for only a single file return the value instead of the dict if len(hashsums) == 1: - return next(iter(hashsums.values())) + return list(hashsums.values())[0] # type: ignore[return-value] return hashsums + else: + return None @__check_installed def version( - check=False, - args: List[str] = None, -) -> Union[str, List[str]]: + check: bool = False, + args: list[str] | None = None, +) -> str | tuple[str, str, str]: """Get the rclone version number. Args: @@ -572,7 +598,7 @@ def version( class RcloneException(ChildProcessError): - def __init__(self, description, error_msg): + def __init__(self, description: str, error_msg: str): self.description = description self.error_msg = error_msg super().__init__(f"{description}. Error message: \n{error_msg}") @@ -584,12 +610,12 @@ def _rclone_transfer_operation( out_path: str, command: str, command_descr: str, - ignore_existing=False, - show_progress=True, - listener: Callable[[Dict], None] = None, - args=None, - pbar=None, -): + ignore_existing: bool = False, + show_progress: bool = True, + listener: _LISTENER = None, + args: Sequence[str] | None = None, + pbar: rich.progress.Progress | None = None, +) -> None: """Executes the rclone transfer operation (e.g. copyto, move, ...) and displays the progress of every individual file. Args: diff --git a/rclone_python/scripts/get_version.py b/rclone_python/scripts/get_version.py index b1d30fd..7a16d90 100644 --- a/rclone_python/scripts/get_version.py +++ b/rclone_python/scripts/get_version.py @@ -1,7 +1,7 @@ from subprocess import check_output -def get_version(): +def get_version() -> str: stdout = check_output("rclone version", shell=True, encoding="utf8") return stdout.split("\n")[0].replace("rclone ", "") diff --git a/rclone_python/scripts/update_hash_types.py b/rclone_python/scripts/update_hash_types.py index 92fbd0a..84fe7ff 100644 --- a/rclone_python/scripts/update_hash_types.py +++ b/rclone_python/scripts/update_hash_types.py @@ -6,7 +6,7 @@ from get_version import get_version -def update_hashes(output_path: str): +def update_hashes(output_path: str) -> None: """Updates the hash_types.py file to include all supported hash algorithms. Args: @@ -17,10 +17,7 @@ def update_hashes(output_path: str): rclone_output = sp.check_output("rclone hashsum", shell=True, encoding="utf8") lines = rclone_output.splitlines() - hashes = [] - - for l in lines[1:]: - hashes.append(l.replace("*", "").strip()) + hashes = [line.replace("*", "").strip() for line in lines[1:]] with open(output_path, "w") as o: o.write("from enum import Enum") diff --git a/rclone_python/scripts/update_remote_types.py b/rclone_python/scripts/update_remote_types.py index b95b6d4..06b7654 100755 --- a/rclone_python/scripts/update_remote_types.py +++ b/rclone_python/scripts/update_remote_types.py @@ -6,7 +6,7 @@ from get_version import get_version -def extract_remote_names(output_path: str = None) -> str: +def extract_remote_names(output_path: str) -> None: """Updates the remote_types.py file to the newest supported backends. Args: diff --git a/rclone_python/utils.py b/rclone_python/utils.py index 3f07696..fc59f30 100644 --- a/rclone_python/utils.py +++ b/rclone_python/utils.py @@ -1,11 +1,11 @@ +from __future__ import annotations import re import subprocess -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, Optional, Sequence from rich.progress import Progress, TaskID, Task from pathlib import Path from rich.progress import ( - Progress, TextColumn, BarColumn, TaskProgressColumn, @@ -18,23 +18,24 @@ # General Functions # # ---------------------------------------------------------------------------- # +_LISTENER = Optional[Callable[[Dict[str, Any]], None]] -def args2string(args: List[str]) -> str: + +def args2string(args: Sequence[str]) -> str: # separate flags/ named arguments by a space return " ".join(args) def run_cmd( - command: str, args: List[str] = (), shell=True, encoding="utf-8" -) -> subprocess.CompletedProcess: + command: str, args: Sequence[str] = (), shell: bool = True, encoding: str = "utf-8" +) -> subprocess.CompletedProcess[str]: # add optional arguments and flags to the command args_str = args2string(args) command = f"{command} {args_str}" return subprocess.run( command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + capture_output=True, shell=shell, encoding=encoding, ) @@ -63,7 +64,7 @@ def convert2bits(value: float, unit: str) -> float: Returns: float: The corresponding bit value. """ - exp = { + exp: dict[str, int] = { "B": 0, "KiB": 1, "MiB": 2, @@ -75,7 +76,7 @@ def convert2bits(value: float, unit: str) -> float: "YiB": 8, } - return value * 1024 ** exp[unit] + return float(value * (1024 ** exp[unit])) # ---------------------------------------------------------------------------- # @@ -86,31 +87,33 @@ def convert2bits(value: float, unit: str) -> float: def rclone_progress( command: str, pbar_title: str, - stderr=subprocess.PIPE, - show_progress=True, - listener: Callable[[Dict], None] = None, - debug=False, - pbar: Optional[Progress] = None, -) -> subprocess.Popen: + stderr: int = subprocess.PIPE, + show_progress: bool = True, + listener: _LISTENER = None, + debug: bool = False, + pbar: Progress | None = None, +) -> subprocess.Popen[bytes]: buffer = "" total_progress_id = None - subprocesses = {} + subprocesses: dict[str, TaskID] = {} if show_progress: if pbar is None: pbar = create_progress_bar() pbar.start() total_progress_id = pbar.add_task(pbar_title, total=None) - + assert pbar is not None process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=stderr, shell=True ) + assert process.stdout is not None for line in iter(process.stdout.readline, b""): var = line.decode() valid, update_dict = extract_rclone_progress(buffer) if valid: + assert update_dict is not None if show_progress: update_tasks(pbar, total_progress_id, update_dict, subprocesses) @@ -129,7 +132,7 @@ def rclone_progress( if show_progress: complete_task(total_progress_id, pbar) - for _, task_id in subprocesses.items(): + for task_id in subprocesses.values(): # hide all subprocesses pbar.update(task_id=task_id, visible=False) pbar.stop() @@ -137,7 +140,7 @@ def rclone_progress( return process -def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], None]]: +def extract_rclone_progress(buffer: str) -> tuple[bool, dict[str, Any] | None]: # matcher that checks if the progress update block is completely buffered yet (defines start and stop) # it gets the sent bits, total bits, progress, transfer-speed and eta reg_transferred = re.findall( @@ -164,7 +167,7 @@ def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], No ) ) - out = {"prog_transferring": prog_transferring} + out: dict[str, Any] = {"prog_transferring": prog_transferring} sent_bits, total_bits, progress, transfer_speed_str, eta = reg_transferred[0] out["progress"] = float(progress.strip()) out["total_bits"] = float(re.findall(r"\d+.\d+", total_bits)[0]) @@ -213,7 +216,7 @@ def get_task(id: TaskID, progress: Progress) -> Task: return None -def complete_task(id: TaskID, progress: Progress): +def complete_task(id: TaskID, progress: Progress) -> None: """Manually sets the progress of the task with the specified TaskID to 100%. Args: @@ -234,9 +237,9 @@ def complete_task(id: TaskID, progress: Progress): def update_tasks( pbar: Progress, total_progress: TaskID, - update_dict: Dict[str, Any], - subprocesses: Dict[str, TaskID], -): + update_dict: dict[str, Any], + subprocesses: dict[str, TaskID], +) -> None: """Updates the total progress as well as all subprocesses (the individual files that are currently uploading). Args: diff --git a/setup.py b/setup.py index 6df71ad..ce3ee10 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,8 @@ from setuptools import setup - +from pathlib import Path from rclone_python import VERSION -with open("README.md", "r") as f: - long_description = f.read() +long_description = Path("README.md").read_text() setup( name="rclone-python",