From 983889aa0520fada00bca6e342b32318908a2ec7 Mon Sep 17 00:00:00 2001
From: Margubur Rahman
Date: Wed, 12 Nov 2025 21:00:16 +0000
Subject: [PATCH] Trying async downloads

---
 .../_experimental/asyncio/json/__init__.py    |   0
 .../_experimental/asyncio/json/_helpers.py    |  48 ++++++
 .../asyncio/json/async_client.py              | 145 ++++++++++++++++++
 .../_experimental/asyncio/json/download.py    |  84 ++++++++++
 4 files changed, 277 insertions(+)
 create mode 100644 google/cloud/storage/_experimental/asyncio/json/__init__.py
 create mode 100644 google/cloud/storage/_experimental/asyncio/json/_helpers.py
 create mode 100644 google/cloud/storage/_experimental/asyncio/json/async_client.py
 create mode 100644 google/cloud/storage/_experimental/asyncio/json/download.py

diff --git a/google/cloud/storage/_experimental/asyncio/json/__init__.py b/google/cloud/storage/_experimental/asyncio/json/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/google/cloud/storage/_experimental/asyncio/json/_helpers.py b/google/cloud/storage/_experimental/asyncio/json/_helpers.py
new file mode 100644
index 000000000..5a945b185
--- /dev/null
+++ b/google/cloud/storage/_experimental/asyncio/json/_helpers.py
@@ -0,0 +1,48 @@
+"""Async classes holding the credentials and the HTTP connection."""
+
+
+import google.auth._credentials_async
+from google.cloud.client import _ClientProjectMixin
+from google.cloud.client import _CREDENTIALS_REFRESH_TIMEOUT
+from google.auth.transport import _aiohttp_requests as async_requests
+from google.cloud.storage import retry as storage_retry
+from google.auth import _default_async
+from google.api_core import retry_async
+
+
+DEFAULT_ASYNC_RETRY = retry_async.AsyncRetry(predicate=storage_retry._should_retry)
+
+class Client:
+    SCOPE = None
+    # Overridden by child classes.
+
+    def __init__(self):
+        async_creds, _ = _default_async.default_async(scopes=self.SCOPE)
+        self._async_credentials = google.auth._credentials_async.with_scopes_if_required(
+            async_creds, scopes=self.SCOPE
+        )
+        self._async_http_internal = None
+
+    @property
+    def _async_http(self):
+        if self._async_http_internal is None:
+            self._async_http_internal = async_requests.AuthorizedSession(
+                self._async_credentials,
+                refresh_timeout=_CREDENTIALS_REFRESH_TIMEOUT,
+            )
+        return self._async_http_internal
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
+        if self._async_http_internal is not None:
+            await self._async_http_internal.close()
+
+
+class ClientWithProjectAsync(Client, _ClientProjectMixin):
+    _SET_PROJECT = True
+
+    def __init__(self, project=None):
+        _ClientProjectMixin.__init__(self, project=project)
+        Client.__init__(self)
\ No newline at end of file
diff --git a/google/cloud/storage/_experimental/asyncio/json/async_client.py b/google/cloud/storage/_experimental/asyncio/json/async_client.py
new file mode 100644
index 000000000..d137f0bb7
--- /dev/null
+++ b/google/cloud/storage/_experimental/asyncio/json/async_client.py
@@ -0,0 +1,145 @@
+"""Async client for SDK downloads"""
+
+import os
+import asyncio
+import aiofiles.os  # also binds aiofiles; the os submodule is needed for aiofiles.os.remove
+
+from google.cloud.storage._experimental.asyncio.json import _helpers
+from google.cloud.storage._experimental.asyncio.json import download
+from google.cloud.storage._helpers import _DEFAULT_SCHEME
+from google.cloud.storage._helpers import _STORAGE_HOST_TEMPLATE
+from google.cloud.storage._helpers import _DEFAULT_UNIVERSE_DOMAIN
+from google.cloud.storage import blob
+
+
+_SLICED_DOWNLOAD_THRESHOLD = 1024 * 1024 * 1024  # 1 GiB
+_SLICED_DOWNLOAD_PARTS = 5
+_USERAGENT = 'test-prototype'
+
+
+class AsyncClient(_helpers.ClientWithProjectAsync):
+
+    SCOPE = (
+        "https://www.googleapis.com/auth/devstorage.full_control",
+        "https://www.googleapis.com/auth/devstorage.read_only",
+        "https://www.googleapis.com/auth/devstorage.read_write",
+    )
+
+    @property
+    def api_endpoint(self):
+        return _DEFAULT_SCHEME + _STORAGE_HOST_TEMPLATE.format(
+            universe_domain=_DEFAULT_UNIVERSE_DOMAIN
+        )
+
+    def _get_download_url(self, blob_obj):
+        return f'{self.api_endpoint}/download/storage/v1/b/{blob_obj.bucket.name}/o/{blob_obj.name}?alt=media'
+
+    async def _perform_download(
+        self,
+        transport,
+        file_obj,
+        download_url,
+        headers,
+        start=None,
+        end=None,
+        timeout=None,
+        checksum="md5",
+        retry=_helpers.DEFAULT_ASYNC_RETRY,
+        sequential_read=False,
+    ):
+        download_obj = download.DownloadAsync(
+            download_url,
+            stream=file_obj,
+            headers=headers,
+            start=start,
+            end=end,
+            checksum=checksum,
+            retry=retry,
+            sequential_read=sequential_read,
+        )
+        await download_obj.consume(transport, timeout=timeout)
+
+    def _check_if_sliced_download_is_eligible(self, obj_size, checksum):
+        if obj_size < _SLICED_DOWNLOAD_THRESHOLD:
+            return False
+        # TODO: support checksum validation for sliced (parallel) downloads.
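+        # Ranged part responses carry no usable whole-object hash, so a
+        # sliced download is only eligible when validation is disabled.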
+        return checksum is None
+
+    async def download_to_file(
+        self,
+        blob_obj,
+        filename,
+        start=None,
+        end=None,
+        timeout=None,
+        checksum="md5",
+        retry=_helpers.DEFAULT_ASYNC_RETRY,
+        sequential_read=False,
+    ):
+        download_url = self._get_download_url(blob_obj)
+        headers = blob._get_encryption_headers(blob_obj._encryption_key)
+        headers["accept-encoding"] = "gzip"
+        headers = {
+            **blob._get_default_headers(_USERAGENT),
+            **headers,
+        }
+
+        transport = self._async_http
+        if not blob_obj.size:
+            blob_obj.reload()  # Synchronous metadata fetch; blocks the event loop.
+        obj_size = blob_obj.size
+        try:
+            if not sequential_read and self._check_if_sliced_download_is_eligible(obj_size, checksum):
+                print("Starting sliced download...")
+                # Part boundaries [0, s, 2s, ..., obj_size] with s = obj_size // parts.
+                part_size = obj_size // _SLICED_DOWNLOAD_PARTS
+                chunks_offset = [i * part_size for i in range(_SLICED_DOWNLOAD_PARTS)] + [obj_size]
+
+                with open(filename, 'wb') as _: pass  # Truncates the file to zero but keeps it on disk.
+
+                tasks, file_handles = [], []
+                try:
+                    for idx in range(_SLICED_DOWNLOAD_PARTS):
+                        file_handle = await aiofiles.open(filename, 'r+b')
+                        await file_handle.seek(chunks_offset[idx])
+                        tasks.append(
+                            self._perform_download(
+                                transport,
+                                file_handle,
+                                download_url,
+                                headers,
+                                chunks_offset[idx],
+                                chunks_offset[idx + 1] - 1,
+                                timeout=timeout,
+                                checksum=checksum,
+                                retry=retry,
+                                sequential_read=sequential_read,
+                            )
+                        )
+                        file_handles.append(file_handle)
+                    await asyncio.gather(*tasks)
+                finally:
+                    for file_handle in file_handles:
+                        await file_handle.close()
+            else:
+                print("Starting sequential download...")
+                async with aiofiles.open(filename, "wb") as file_obj:
+                    await self._perform_download(
+                        transport,
+                        file_obj,
+                        download_url,
+                        headers,
+                        start,
+                        end,
+                        timeout=timeout,
+                        checksum=checksum,
+                        retry=retry,
+                        sequential_read=sequential_read,
+                    )
+        except (blob.DataCorruption, blob.NotFound):
+            await aiofiles.os.remove(filename)
+            raise
+        except blob.InvalidResponse as exc:
+            blob._raise_from_invalid_response(exc)
diff --git a/google/cloud/storage/_experimental/asyncio/json/download.py b/google/cloud/storage/_experimental/asyncio/json/download.py
new file mode 100644
index 000000000..bc56bfca8
--- /dev/null
+++ b/google/cloud/storage/_experimental/asyncio/json/download.py
@@ -0,0 +1,84 @@
+"""Async download implementation."""
+
+import http.client
+import aiohttp
+
+from google.cloud.storage._experimental.asyncio.json._helpers import DEFAULT_ASYNC_RETRY
+from google.cloud.storage._media.requests import _request_helpers
+from google.cloud.storage._media import _download
+from google.cloud.storage._media import _helpers
+from google.cloud.storage._media.requests import download as storage_download
+
+
+class DownloadAsync(_request_helpers.RequestsMixin, _download.Download):
+
+    def __init__(
+        self,
+        media_url,
+        stream=None,
+        start=None,
+        end=None,
+        headers=None,
+        checksum="md5",
+        retry=DEFAULT_ASYNC_RETRY,
+        sequential_read=False,
+    ):
+        super().__init__(
+            media_url, stream=stream, start=start, end=end, headers=headers, checksum=checksum, retry=retry
+        )
+        self.sequential_read = sequential_read
+
+    async def _write_to_stream(self, response):
+        if not self.sequential_read:
+            # For a sliced (non-sequential) download the API does not return a
+            # usable hash per ranged chunk, so checksum validation is skipped.
+            # We could compute a CRC32C per chunk and combine the values
+            # afterwards, but that is not implemented in this prototype.
+            expected_checksum = None
+            checksum_object = _helpers._DoNothingHash()
+            self._expected_checksum = expected_checksum
+            self._checksum_object = checksum_object
+        else:
+            # Sequential read: fetch the expected hash from the response headers.
+            expected_checksum, checksum_object = _helpers._get_expected_checksum(
+                response, self._get_headers, self.media_url, checksum_type=self.checksum
+            )
+            self._expected_checksum = expected_checksum
+            self._checksum_object = checksum_object
+
+        async with response:
+            chunk_size = 4096 * 32
+            async for chunk in response.content.iter_chunked(chunk_size):
+                await self._stream.write(chunk)
+                self._bytes_downloaded += len(chunk)
+                checksum_object.update(chunk)
+
+            if (
+                expected_checksum is not None
+                and response.status != http.client.PARTIAL_CONTENT
+            ):
+                actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
+
+                if actual_checksum != expected_checksum:
+                    raise storage_download.DataCorruption(response, 'Corrupted download!')
+
+    async def consume(
+        self,
+        transport,
+        timeout=aiohttp.ClientTimeout(total=None, sock_read=300),
+    ):
+        method, _, payload, headers = self._prepare_request()
+        request_kwargs = {
+            "data": payload,
+            "headers": headers,
+            "timeout": timeout,
+        }
+        async def retriable_request():
+            result = await transport.request(method, self.media_url, **request_kwargs)
+            # Fail on a bad status before the body is written into the stream.
+            if result.status not in (http.client.OK, http.client.PARTIAL_CONTENT):
+                result.raise_for_status()
+            await self._write_to_stream(result)
+            return result
+
+        return await _request_helpers.wait_and_retry(retriable_request, self._retry_strategy)
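
Usage note (not part of the patch): a minimal sketch of how the new client
might be driven end to end, assuming default application credentials and a
hypothetical bucket "my-bucket" with a large object "big-object". The
synchronous storage.Client is only used to build the blob handle whose size
download_to_file reloads before choosing a strategy.

    import asyncio

    from google.cloud import storage
    from google.cloud.storage._experimental.asyncio.json.async_client import AsyncClient

    async def main():
        # Blob handle from the sync client; only the metadata reload uses it.
        blob_obj = storage.Client().bucket("my-bucket").blob("big-object")
        async with AsyncClient() as client:
            # checksum=None makes objects of at least 1 GiB eligible for a
            # sliced download; the default "md5" forces a validated
            # sequential download instead.
            await client.download_to_file(blob_obj, "/tmp/big-object", checksum=None)

    asyncio.run(main())

Sliced downloads open one file handle per part on the same destination file
and seek each handle to its part offset, so the parts can be written
concurrently without sharing a file position.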