Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 44 additions & 46 deletions omnitils/api/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
* Copyright (c) Hexproof Systems <hexproofsystems@gmail.com>
* LICENSE: Mozilla Public License 2.0
"""

# Standard Library Imports
from contextlib import suppress
from dataclasses import dataclass
import os
from pathlib import Path
import re
import shutil
from typing import Optional, Callable, TypedDict, NotRequired, Union
from typing import MutableMapping, Optional, Callable, TypedDict, NotRequired, Union

# Third Party Imports
from loguru import logger
Expand All @@ -24,7 +25,8 @@
request_header_default,
get_new_session,
chunk_size_default,
download_file_from_response)
download_file_from_response,
)
from omnitils.files import mkdir_full_perms, get_temporary_file, dump_data_file
from omnitils.strings import decode_url

Expand All @@ -36,10 +38,11 @@
@dataclass
class GoogleReg:
    """Compiled regex patterns for picking URLs and errors out of Google responses.

    Every pattern has exactly one capture group, which holds the value of
    interest. The patterns are declared as dataclass fields with defaults, so
    they can be read directly off the class (``GoogleReg.EXPORT.search(line)``)
    without creating an instance.

    Each pattern is declared once, annotated as ``re.Pattern[str]`` because all
    of them are compiled from ``str`` sources.
    """

    # Value of a JSON-style `"downloadUrl":"..."` pair (everything up to the next quote).
    URL: re.Pattern[str] = re.compile(r'"downloadUrl":"([^"]+)')

    # `action` attribute that directly follows `id="download-form"` (non-greedy).
    FORM: re.Pattern[str] = re.compile(r'id="download-form" action="(.+?)"')

    # Relative `/uc?export=download...` link inside an `href` attribute.
    EXPORT: re.Pattern[str] = re.compile(r'href="(/uc\?export=download[^"]+)')

    # Text inside a `<p class="uc-error-subcaption">` element (greedy, single line).
    ERROR: re.Pattern[str] = re.compile(r'<p class="uc-error-subcaption">(.*)</p>')


"""
Expand All @@ -49,6 +52,7 @@ class GoogleReg:

class GoogleDriveMetadata(TypedDict):
"""Relevant metadata for a file hosted on Google Drive."""

description: NotRequired[str]
name: str
size: int
Expand All @@ -72,7 +76,7 @@ def gdrive_get_confirmation_url(contents: str) -> yarl.URL:
for line in contents.splitlines():
if m := GoogleReg.EXPORT.search(line):
# Google Docs URL
return decode_url(f'https://docs.google.com{m.groups()[0]}')
return decode_url(f"https://docs.google.com{m.groups()[0]}")
if m := GoogleReg.FORM.search(line):
# Download URL from Form
return decode_url(m.groups()[0])
Expand All @@ -83,14 +87,15 @@ def gdrive_get_confirmation_url(contents: str) -> yarl.URL:
# Error Returned
raise OSError(m.groups()[0])
raise OSError(
"Google Drive file has been made private or has reached its daily request limit.")
"Google Drive file has been made private or has reached its daily request limit."
)


def gdrive_process_url(
url: Union[str, yarl.URL],
sess: Optional[Session] = None,
headers: Optional[dict] = None,
path_cookies: Optional[Path] = None
headers: MutableMapping[str, str | bytes] | None = None,
path_cookies: Optional[Path] = None,
) -> Optional[tuple[Session, Response]]:
"""Tests a Gdrive file URL to ensure it is the absolute download URL. If it isn't,
attempt to redirect to the absolute URL based on Google Drive confirmation. Return a valid session and
Expand All @@ -108,9 +113,8 @@ def gdrive_process_url(
"""
# Ensure session object, then initialize request
if not sess:
sess = get_new_session(
headers=headers, stream=True, path_cookies=path_cookies)
res = sess.get(url)
sess = get_new_session(headers=headers, stream=True, path_cookies=path_cookies)
res = sess.get(str(url))

# Update cookies
if path_cookies:
Expand All @@ -126,10 +130,13 @@ def gdrive_process_url(
url=gdrive_get_confirmation_url(res.text),
sess=sess,
headers=headers,
path_cookies=path_cookies)
path_cookies=path_cookies,
)
except Exception as e:
logger.error(e), res.close(), sess.close()
return logger.error(f'Google Drive denied access to the file!')
logger.error(e)
res.close()
sess.close()
return logger.error("Google Drive denied access to the file!")


"""
Expand All @@ -146,10 +153,12 @@ def gdrive_update_cookies(sess: Session, path_cookies: Path) -> None:
"""
dump_data_file(
obj=[
(k, v) for k, v in sess.cookies.items()
(k, v)
for k, v in sess.cookies.items()
if not k.startswith("download_warning_")
],
path=path_cookies)
path=path_cookies,
)


"""
Expand All @@ -158,9 +167,7 @@ def gdrive_update_cookies(sess: Session, path_cookies: Path) -> None:


def gdrive_get_metadata(
file_id: str,
api_key: str,
header: Optional[dict] = None
file_id: str, api_key: str, header: MutableMapping[str, str | bytes] | None = None
) -> Optional[GoogleDriveMetadata]:
"""Get the metadata of a given template file.

Expand All @@ -178,15 +185,12 @@ def gdrive_get_metadata(
with requests.get(
f"https://www.googleapis.com/drive/v3/files/{file_id}",
headers=header,
params={
'alt': 'json',
'fields': 'description,name,size',
'key': api_key}
params={"alt": "json", "fields": "description,name,size", "key": api_key},
) as req:
if not req.status_code == 200:
return
result = req.json()
if 'name' in result and 'size' in result:
if "name" in result and "size" in result:
return result

# Request was unsuccessful
Expand All @@ -201,11 +205,11 @@ def gdrive_get_metadata(
def gdrive_download_file(
url: Union[yarl.URL, str],
path: Path,
callback: Optional[Callable] = None,
headers: Optional[dict] = None,
callback: Optional[Callable[[int, int], None]] = None,
headers: MutableMapping[str, str | bytes] | None = None,
path_cookies: Optional[Path] = None,
allow_resume: bool = True,
chunk_size: int = chunk_size_default
chunk_size: int = chunk_size_default,
) -> Optional[Path]:
"""Download a file from Google Drive using its file ID.

Expand All @@ -226,8 +230,7 @@ def gdrive_download_file(
"""
# Ensure path and load a temporary file
mkdir_full_perms(path.parent)
file = get_temporary_file(
path=path, ext='.drive', allow_existing=allow_resume)
file = get_temporary_file(path=path, ext=".drive", allow_existing=allow_resume)
size = file.stat().st_size

# Add range header if file is partially downloaded
Expand All @@ -236,36 +239,31 @@ def gdrive_download_file(
headers["Range"] = f"bytes={str(size)}-"

# Attempt to create a session and request from URL
check = gdrive_process_url(
url=url, headers=headers, path_cookies=path_cookies)
check = gdrive_process_url(url=url, headers=headers, path_cookies=path_cookies)
if not check:
return logger.error(
f'Google Drive download failed!\n'
f'{path.name} | {url}')
return logger.error(f"Google Drive download failed!\n{path.name} | {url}")
sess, res = check

# Attempt to download the file
try:
result = download_file_from_response(
response=res,
path=file,
callback=callback,
chunk_size=chunk_size)
response=res, path=file, callback=callback, chunk_size=chunk_size
)
except Exception as e:
# Exception occurred
logger.error(e)
result = None
if result is None:
# Download failed
res.close(), sess.close()
return logger.error(
f'Google Drive download failed!\n'
f'{path.name} | {url}')
res.close()
sess.close()
return logger.error(f"Google Drive download failed!\n{path.name} | {url}")

# Rename temporary file
if not os.path.samefile(file, path):
if not (path.is_file() and os.path.samefile(file, path)):
shutil.move(file, path)

# Close session and return
res.close(), sess.close()
res.close()
sess.close()
return path
27 changes: 14 additions & 13 deletions omnitils/api/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,29 @@
* Copyright (c) Hexproof Systems <hexproofsystems@gmail.com>
* LICENSE: Mozilla Public License 2.0
"""
# Standard Library Imports
import os
import zipfile
from logging import getLogger
from pathlib import Path
import requests
from typing import Optional, Union, Callable
from typing import Callable, Optional, Union

# Third Party Imports
import requests
import yarl
from backoff import on_exception, expo
from ratelimit import RateLimitDecorator, sleep_and_retry
from backoff import expo, on_exception
from limits import RateLimitItemPerHour
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter

# Local Imports
from omnitils.fetch import download_file
from omnitils.fetch._core import chunk_size_default, request_header_default
from omnitils.files.folders import mkdir_full_perms
from omnitils.fetch._core import request_header_default, chunk_size_default
from omnitils.rate_limit import rate_limit

# Rate limiter to safely limit GitHub requests
github_rate_limit = RateLimitDecorator(calls=60, period=3600)
github_rate_limit_authenticated = RateLimitDecorator(calls=7, period=5)
_rate_limit_storage = MemoryStorage()
_rate_limiter = MovingWindowRateLimiter(_rate_limit_storage)
_github_rate_limit = RateLimitItemPerHour(60)
_github_rate_limit_authenticated = RateLimitItemPerHour(5000)

"""
* Handlers
Expand All @@ -34,8 +36,7 @@
def gh_request_handler(func) -> Callable:
"""Wrapper for a GitHub request function to handle retries and rate limits on
unauthenticated requests (60 per hour)."""
# Decorators apply bottom-up: the call is wrapped by the backoff retry first
# (exponential wait on `requests` exceptions, at most 2 tries / 1 second), and
# the retrying call is then wrapped by `rate_limit` using the unauthenticated
# limit item. Only this one limiter is applied; the former
# `sleep_and_retry` / `github_rate_limit` pair belonged to the `ratelimit`
# package and must not be stacked on top of it.
@rate_limit(limiter=_rate_limiter, limit=_github_rate_limit)
@on_exception(expo, requests.exceptions.RequestException, max_tries=2, max_time=1)
def decorator(*args, **kwargs):
    """Forward all arguments unchanged to the wrapped request function."""
    return func(*args, **kwargs)
Expand All @@ -46,7 +47,7 @@ def gh_request_handler_authenticated(func) -> Callable:
"""Wrapper for a GitHub request function to handle retries and rate limits on
authenticated requests (5000 per hour)."""
# Decorators apply bottom-up: the call is wrapped by the backoff retry first
# (exponential wait on `requests` exceptions, at most 2 tries / 1 second), and
# the retrying call is then wrapped by `rate_limit` using the authenticated
# limit item. `sleep_and_retry` is intentionally not applied here: it came from
# the `ratelimit` package, and referencing it without that import raises
# NameError as soon as this handler is used.
@rate_limit(limiter=_rate_limiter, limit=_github_rate_limit_authenticated)
@on_exception(expo, requests.exceptions.RequestException, max_tries=2, max_time=1)
def decorator(*args, **kwargs):
    """Forward all arguments unchanged to the wrapped request function."""
    return func(*args, **kwargs)
Expand Down
50 changes: 41 additions & 9 deletions omnitils/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,28 @@
* Copyright (c) Hexproof Systems <hexproofsystems@gmail.com>
* LICENSE: Mozilla Public License 2.0
"""

# Standard Library Imports
from contextlib import suppress
from logging import getLogger
from typing import Callable, Any, Optional
from typing import Callable, Any, ParamSpec, Protocol, TypeVar, overload

T = TypeVar("T")
V = TypeVar("V")
P = ParamSpec("P")


class ExceptionLogger(Protocol):
    """Structural type for objects that can log an exception.

    Any object with a compatible ``log_exception`` method satisfies this
    protocol; no inheritance is required. ``log_on_exception`` calls the method
    with the caught exception as its only argument.
    """

    def log_exception(self, error: Exception, *args: Any, **kwargs: Any) -> Any:
        """Log ``error``; extra positional and keyword arguments are left to the implementation."""
        ...


"""
* Utility Decorators
"""


def log_on_exception(logr: Any = None) -> Callable:
def log_on_exception(
logr: ExceptionLogger,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Decorator to log any exception that occurs.

Args:
Expand All @@ -23,21 +34,39 @@ def log_on_exception(logr: Any = None) -> Callable:
Returns:
Wrapped function.
"""
logr = logr or getLogger()
# Normal logger does not have log_exception function
# so we can't default to it.
# logr = logr or getLogger()

def decorator(func):
def wrapper(*args, **kwargs):
def decorator(func: Callable[P, T]):
def wrapper(*args: P.args, **kwargs: P.kwargs):
# Final exception catch
try:
return func(*args, **kwargs)
except Exception as e:
logr.log_exception(e)
raise e

return wrapper

return decorator


def return_on_exception(response: Optional[Any] = None) -> Callable:
@overload
def return_on_exception(
response: T,
) -> Callable[[Callable[P, V]], Callable[P, V | T]]: ...


@overload
def return_on_exception(
response: T | None = None,
) -> Callable[[Callable[P, V]], Callable[P, V | T | None]]: ...


def return_on_exception(
response: T | None = None,
) -> Callable[[Callable[P, V]], Callable[P, V | T | None]]:
"""Decorator to handle any exception and return appropriate failure value.

Args:
Expand All @@ -46,11 +75,14 @@ def return_on_exception(response: Optional[Any] = None) -> Callable:
Returns:
Wrapped function.
"""
def decorator(func):
def wrapper(*args, **kwargs):

def decorator(func: Callable[P, V]):
def wrapper(*args: P.args, **kwargs: P.kwargs):
# Final exception catch
with suppress(Exception):
return func(*args, **kwargs)
return response

return wrapper

return decorator
Loading