From be04005c89ed679d71f9798d9f2e9e00e2946d1e Mon Sep 17 00:00:00 2001 From: JVickery-TBS Date: Mon, 24 Apr 2023 15:56:24 +0000 Subject: [PATCH 1/4] feat(commands): added commands to help manage remote uploads; - Added `list-linked-uploads` command. - Added `list-unlinked-uploads` command. - Added `remove-unlinked-uploads` command. - Added `list-missing-uploads` command. --- ckanext/cloudstorage/cli.py | 39 ++++++ ckanext/cloudstorage/commands.py | 47 ++++++- ckanext/cloudstorage/utils.py | 221 +++++++++++++++++++++++++++++++ 3 files changed, 304 insertions(+), 3 deletions(-) diff --git a/ckanext/cloudstorage/cli.py b/ckanext/cloudstorage/cli.py index 8300d26..c3b01e3 100644 --- a/ckanext/cloudstorage/cli.py +++ b/ckanext/cloudstorage/cli.py @@ -1,4 +1,5 @@ import click +from typing import Union from ckanext.cloudstorage import utils @@ -18,3 +19,41 @@ def cloudstorage(): @cloudstorage.command() def initdb(): utils.initdb() + + +@cloudstorage.command() +@click.option( + "-o", + "--output", + default=None, + help="The output file path.", +) +def list_unlinked_uploads(output: Union[str, None]): + utils.list_linked_uploads(output) + + +@cloudstorage.command() +def remove_unlinked_uploads(): + utils.remove_unlinked_uploads() + + +@cloudstorage.command() +@click.option( + "-o", + "--output", + default=None, + help="The output file path.", +) +def list_missing_uploads(output: Union[str, None]): + utils.list_missing_uploads(output) + + +@cloudstorage.command() +@click.option( + "-o", + "--output", + default=None, + help="The output file path.", +) +def list_linked_uploads(output: Union[str, None]): + utils.list_linked_uploads(output) diff --git a/ckanext/cloudstorage/commands.py b/ckanext/cloudstorage/commands.py index f418c52..b17cfb8 100644 --- a/ckanext/cloudstorage/commands.py +++ b/ckanext/cloudstorage/commands.py @@ -21,17 +21,26 @@ USAGE = """ckanext-cloudstorage Commands: - - fix-cors Update CORS rules where possible. 
- - migrate Upload local storage to the remote. - - initdb Reinitalize database tables. + - fix-cors Update CORS rules where possible. + - migrate Upload local storage to the remote. + - initdb Reinitalize database tables. + - list-unlinked-uploads Lists uploads in the storage container that do not match to any resources. + - remove-unlinked-uploads Permanently deletes uploads from the storage container that do not match to any resources. + - list-missing-uploads Lists resources IDs that are missing uploads in the storage container. + - list-linked-uploads Lists uploads in the storage container that do match to a resource. Usage: cloudstorage fix-cors ... [--c=] cloudstorage migrate [] [--c=] cloudstorage initdb [--c=] + cloudstorage list-unlinked-uploads [--o=] [--c=] + cloudstorage remove-unlinked-uploads [--c=] + cloudstorage list-missing-uploads [--o=] [--c=] + cloudstorage list-linked-uploads [--o=] [--c=] Options: -c= The CKAN configuration file. + -o= The output file path. """ @@ -45,6 +54,11 @@ class PasterCommand(CkanCommand): summary = 'ckanext-cloudstorage maintence utilities.' 
usage = USAGE + def __init__(self, name): + super(PasterCommand, self).__init__(name) + self.parser.add_option('-o', '--output', dest='output', action='store', + default=None, help='The output file path.') + def command(self): self._load_config() args = docopt(USAGE, argv=self.args) @@ -55,6 +69,14 @@ def command(self): _migrate(args) elif args['initdb']: _initdb() + elif args['list-unlinked-uploads']: + _list_unlinked_uploads(self.options.output) + elif args['remove-unlinked-uploads']: + _remove_unlinked_uploads() + elif args['list-missing-uploads']: + _list_missing_uploads(self.options.output) + elif args['list-linked-uploads']: + _list_linked_uploads(self.options.output) def _migrate(args): @@ -166,3 +188,22 @@ def _fix_cors(args): def _initdb(): utils.initdb() + + +def _list_unlinked_uploads(output_path): + # type: (str|None) -> None + utils.list_unlinked_uploads(output_path) + + +def _remove_unlinked_uploads(): + utils.remove_unlinked_uploads() + + +def _list_missing_uploads(output_path): + # type: (str|None) -> None + utils.list_missing_uploads(output_path) + + +def _list_linked_uploads(output_path): + # type: (str|None) -> None + utils.list_linked_uploads(output_path) diff --git a/ckanext/cloudstorage/utils.py b/ckanext/cloudstorage/utils.py index a891464..eac6bd3 100644 --- a/ckanext/cloudstorage/utils.py +++ b/ckanext/cloudstorage/utils.py @@ -1,10 +1,231 @@ +import os +import click +import unicodecsv as csv +from sqlalchemy import and_ as _and_ + +from ckan.lib.munge import munge_filename +from ckan import model + +from ckanext.cloudstorage.storage import CloudStorage from ckanext.cloudstorage.model import ( create_tables, drop_tables ) +from ckan.plugins.toolkit import h + def initdb(): drop_tables() create_tables() print("DB tables are reinitialized") + + +def _get_uploads(get_linked = True, return_upload_objects_only = False): + # type: (bool, bool) -> tuple[float, list] + cs = CloudStorage() + + resource_urls = set(os.path.join( + u'resources', + id, 
+ munge_filename(filename)) + for id, filename in + model.Session.query( + model.Resource.id, + model.Resource.url) \ + .join(model.Package, + model.Resource.package_id == model.Package.id) \ + .filter(_and_(model.Resource.url_type == u'upload', + model.Resource.state == model.core.State.ACTIVE, + model.Package.state == model.core.State.ACTIVE)) \ + .all()) + + uploads = cs.container.list_objects() + + parsed_uploads = [] + total_space_used = 0 + for upload in uploads: + if (upload.name in resource_urls + if get_linked else + upload.name not in resource_urls): + + total_space_used += upload.size / 1000.0 + if return_upload_objects_only: + parsed_uploads.append(upload) + continue + + resource_id = upload.name.split('/')[1] if '/' in upload.name else None + resource_fields = None + if resource_id: + resource_fields = model.Session.query( + model.Resource.id, + model.Resource.url, + model.Resource.package_id, + model.Resource.created, + model.Resource.last_modified, + model.Package.owner_org, + model.Package.state, + model.Resource.state) \ + .join(model.Package, model.Resource.package_id == model.Package.id) \ + .filter(_and_(model.Resource.url_type == u'upload', + model.Resource.id == resource_id)) \ + .first() + parsed_uploads.append({ + u'resource_id': resource_fields[0] if resource_fields else None, + u'resource_filename': resource_fields[1] if resource_fields else None, + u'package_id': resource_fields[2] if resource_fields else None, + u'created': h.render_datetime(resource_fields[3]) if resource_fields else None, + u'last_modified': h.render_datetime(resource_fields[4]) if resource_fields else None, + u'organization_id': resource_fields[5] if resource_fields else None, + u'upload_url': upload.name, + u'upload_size': upload.size / 1000.0, + u'package_state': resource_fields[6] if resource_fields else None, + u'resource_state': resource_fields[7] if resource_fields else None}) + + return total_space_used, parsed_uploads + + +def _humanize_space(space): + # type: (float) -> tuple[float, str] + 
parsed_space = space + for unit in ['KB', 'MB', 'GB', 'TB']: + if parsed_space < 1000.0: + return parsed_space, unit + parsed_space /= 1000.0 + return space, 'KB' + + +def _write_uploads_to_csv(output_path, uploads): + #type: (str, list) -> None + if not uploads: + click.echo(u"Nothing to write to {}".format(output_path)) + return + with open(output_path, u'w') as f: + w = csv.writer(f, encoding='utf-8') + w.writerow((u'resource_id', + u'package_id', + u'organization_id', + u'resource_filename', + u'upload_url', + u'upload_file_size_in_kb', + u'resource_created', + u'resource_last_modified', + u'package_state', + u'resource_state')) + for upload in uploads: + w.writerow(( + upload[u'resource_id'], + upload[u'package_id'], + upload[u'organization_id'], + upload[u'resource_filename'], + upload[u'upload_url'], + upload[u'upload_size'], + upload[u'created'], + upload[u'last_modified'], + upload[u'package_state'], + upload[u'resource_state'])) + click.echo(u"Wrote {} row(s) to {}" + .format(len(uploads), output_path)) + + +def list_linked_uploads(output_path): + # type: (str|None) -> None + used_space, good_uploads = _get_uploads() + + if output_path: + _write_uploads_to_csv(output_path, good_uploads) + else: + used_space, unit = _humanize_space(used_space) + click.echo(u"Found {} uploads(s) with linked resources. Total space: {} {}." + .format(len(good_uploads), used_space, unit)) + + +def list_unlinked_uploads(output_path): + # type: (str|None) -> None + used_space, uploads_missing_resources = _get_uploads(get_linked = False) + + if output_path: + _write_uploads_to_csv(output_path, uploads_missing_resources) + else: + used_space, unit = _humanize_space(used_space) + click.echo(u"Found {} upload(s) with missing or deleted resources. Total space: {} {}." 
+ .format(len(uploads_missing_resources), used_space, unit)) + + +def remove_unlinked_uploads(): + cs = CloudStorage() + + used_space, uploads_missing_resources = _get_uploads(get_linked = False, return_upload_objects_only = True) + + num_success = 0 + num_failures = 0 + saved_space = 0 + for upload in uploads_missing_resources: + if cs.container.delete_object(upload): + click.echo(u"Deleted {}".format(upload.name)) + num_success += 1 + saved_space += upload.size / 1000.0 + used_space -= upload.size / 1000.0 + else: + click.echo(u"Failed to delete {}".format(upload.name)) + num_failures += 1 + + if num_success: + saved_space, unit = _humanize_space(saved_space) + click.echo(u"Deleted {} upload(s). Saved {} {}." + .format(num_success, saved_space, unit)) + + if num_failures: + click.echo(u"Failed to delete {} upload(s).".format(num_failures)) + + if used_space: + used_space, unit = _humanize_space(used_space) + click.echo(u"Remaining space used by unlinked uploads: {} {}." + .format(used_space, unit)) + + +def list_missing_uploads(output_path): + # type: (str|None) -> None + cs = CloudStorage() + + upload_urls = set(u.name for u in cs.container.list_objects()) + + resource_fields = model.Session.query( + model.Resource.id, + model.Resource.url, + model.Resource.package_id, + model.Resource.created, + model.Resource.last_modified, + model.Package.owner_org) \ + .join(model.Package, + model.Resource.package_id == model.Package.id) \ + .filter(_and_(model.Resource.url_type == u'upload', + model.Resource.state == model.core.State.ACTIVE, + model.Package.state == model.core.State.ACTIVE)) \ + .all() + + resources_missing_uploads = [] + for id, filename, package_id, created, last_modified, organization_id in resource_fields: + url = os.path.join( + u'resources', + id, + munge_filename(filename)) + + if url not in upload_urls: + resources_missing_uploads.append({ + u'resource_id': id, + u'resource_filename': filename, + u'package_id': package_id, + u'created': 
h.render_datetime(created), + u'last_modified': h.render_datetime(last_modified), + u'organization_id': organization_id, + u'upload_url': None, + u'upload_size': None, + u'package_state': model.core.State.ACTIVE, + u'resource_state': model.core.State.ACTIVE}) + + if output_path: + _write_uploads_to_csv(output_path, resources_missing_uploads) + else: + click.echo(u"Found {} resource(s) with missing uploads." + .format(len(resources_missing_uploads))) From 2fbb4d5f8fdfff9f971074d92426d57da552f1aa Mon Sep 17 00:00:00 2001 From: JVickery-TBS Date: Mon, 24 Apr 2023 19:15:39 +0000 Subject: [PATCH 2/4] feat(commands): added method help text for click; - Added method help text for click usage. --- ckanext/cloudstorage/cli.py | 5 +++++ ckanext/cloudstorage/commands.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ckanext/cloudstorage/cli.py b/ckanext/cloudstorage/cli.py index c3b01e3..3d9b68c 100644 --- a/ckanext/cloudstorage/cli.py +++ b/ckanext/cloudstorage/cli.py @@ -18,6 +18,7 @@ def cloudstorage(): @cloudstorage.command() def initdb(): + """Reinitalize database tables.""" utils.initdb() @@ -29,11 +30,13 @@ def initdb(): help="The output file path.", ) def list_unlinked_uploads(output: Union[str, None]): + """Lists uploads in the storage container that do not match to any resources.""" utils.list_linked_uploads(output) @cloudstorage.command() def remove_unlinked_uploads(): + """Permanently deletes uploads from the storage container that do not match to any resources.""" utils.remove_unlinked_uploads() @@ -45,6 +48,7 @@ def remove_unlinked_uploads(): help="The output file path.", ) def list_missing_uploads(output: Union[str, None]): + """Lists resources that are missing uploads in the storage container.""" utils.list_missing_uploads(output) @@ -56,4 +60,5 @@ def list_missing_uploads(output: Union[str, None]): help="The output file path.", ) def list_linked_uploads(output: Union[str, None]): + """Lists uploads in the storage container that do match 
to a resource.""" utils.list_linked_uploads(output) diff --git a/ckanext/cloudstorage/commands.py b/ckanext/cloudstorage/commands.py index b17cfb8..3205394 100644 --- a/ckanext/cloudstorage/commands.py +++ b/ckanext/cloudstorage/commands.py @@ -26,7 +26,7 @@ - initdb Reinitalize database tables. - list-unlinked-uploads Lists uploads in the storage container that do not match to any resources. - remove-unlinked-uploads Permanently deletes uploads from the storage container that do not match to any resources. - - list-missing-uploads Lists resources IDs that are missing uploads in the storage container. + - list-missing-uploads Lists resources that are missing uploads in the storage container. - list-linked-uploads Lists uploads in the storage container that do match to a resource. Usage: From 1a98885fe1a069c522985c3d582b2a60743c7ba1 Mon Sep 17 00:00:00 2001 From: JVickery-TBS Date: Wed, 24 May 2023 15:45:16 +0000 Subject: [PATCH 3/4] feat(commands): moved cors and migrate commands to click; - Moved `fix-cors` and `migrate` sub-command callbacks to util script. - Implemented `fix-cors` and `migrate` sub-commands with click. 
--- ckanext/cloudstorage/cli.py | 15 ++++ ckanext/cloudstorage/commands.py | 123 +------------------------------ ckanext/cloudstorage/utils.py | 122 +++++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 120 deletions(-) diff --git a/ckanext/cloudstorage/cli.py b/ckanext/cloudstorage/cli.py index 3d9b68c..c2eb1bb 100644 --- a/ckanext/cloudstorage/cli.py +++ b/ckanext/cloudstorage/cli.py @@ -22,6 +22,21 @@ def initdb(): utils.initdb() +@cloudstorage.command() +@click.argument(u'domains', nargs=-1, required=True) +def fix_cors(domains): + """Update CORS rules where possible.""" + utils.fix_cors(domains) + + +@cloudstorage.command() +@click.argument(u'path_to_storage') +@click.argument(u'resource_id', required=False, default=None) +def migrate(path_to_storage, resource_id): + """Upload local storage to the remote.""" + utils.migrate(path_to_storage, resource_id) + + @cloudstorage.command() @click.option( "-o", diff --git a/ckanext/cloudstorage/commands.py b/ckanext/cloudstorage/commands.py index 3205394..f5e951d 100644 --- a/ckanext/cloudstorage/commands.py +++ b/ckanext/cloudstorage/commands.py @@ -1,22 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os -import os.path -import cgi -import tempfile - from docopt import docopt from ckan.lib.cli import CkanCommand -from ckanapi import LocalCKAN -from ckanext.cloudstorage.storage import ( - CloudStorage, - ResourceCloudStorage -) - from ckanext.cloudstorage import utils -from ckan.logic import NotFound USAGE = """ckanext-cloudstorage @@ -44,12 +32,6 @@ """ -class FakeFileStorage(cgi.FieldStorage): - def __init__(self, fp, filename): - self.file = fp - self.filename = filename - - class PasterCommand(CkanCommand): summary = 'ckanext-cloudstorage maintence utilities.' 
usage = USAGE @@ -80,110 +62,13 @@ def command(self): def _migrate(args): - path = args[''] - single_id = args[''] - if not os.path.isdir(path): - print('The storage directory cannot be found.') - return - - lc = LocalCKAN() - resources = {} - failed = [] - - # The resource folder is stuctured like so on disk: - # - storage/ - # - ... - # - resources/ - # - <3 letter prefix> - # - <3 letter prefix> - # - - # ... - # ... - # ... - for root, dirs, files in os.walk(path): - # Only the bottom level of the tree actually contains any files. We - # don't care at all about the overall structure. - if not files: - continue - - split_root = root.split('/') - resource_id = split_root[-2] + split_root[-1] - - for file_ in files: - ckan_res_id = resource_id + file_ - if single_id and ckan_res_id != single_id: - continue - - resources[ckan_res_id] = os.path.join( - root, - file_ - ) - - for i, resource in enumerate(resources.iteritems(), 1): - resource_id, file_path = resource - print('[{i}/{count}] Working on {id}'.format( - i=i, - count=len(resources), - id=resource_id - )) - - try: - resource = lc.action.resource_show(id=resource_id) - except NotFound: - print(u'\tResource not found') - continue - - if resource['url_type'] != 'upload': - print(u'\t`url_type` is not `upload`. 
Skip') - continue - - with open(file_path, 'rb') as fin: - resource['upload'] = FakeFileStorage( - fin, - resource['url'].split('/')[-1] - ) - try: - uploader = ResourceCloudStorage(resource) - uploader.upload(resource['id']) - except Exception as e: - failed.append(resource_id) - print(u'\tError of type {0} during upload: {1}'.format(type(e), e)) - - if failed: - log_file = tempfile.NamedTemporaryFile(delete=False) - log_file.file.writelines(failed) - print(u'ID of all failed uploads are saved to `{0}`'.format(log_file.name)) + # type: (list|None) -> None + utils.migrate(args[''], args['']) def _fix_cors(args): - cs = CloudStorage() - - if cs.can_use_advanced_azure: - from azure.storage import blob as azure_blob - from azure.storage import CorsRule - - blob_service = azure_blob.BlockBlobService( - cs.driver_options['key'], - cs.driver_options['secret'] - ) - - blob_service.set_blob_service_properties( - cors=[ - CorsRule( - allowed_origins=args[''], - allowed_methods=['GET'] - ) - ] - ) - print('Done!') - else: - print( - 'The driver {driver_name} being used does not currently' - ' support updating CORS rules through' - ' cloudstorage.'.format( - driver_name=cs.driver_name - ) - ) + # type: (list|None) -> None + utils.fix_cors(args['']) def _initdb(): diff --git a/ckanext/cloudstorage/utils.py b/ckanext/cloudstorage/utils.py index eac6bd3..bcef843 100644 --- a/ckanext/cloudstorage/utils.py +++ b/ckanext/cloudstorage/utils.py @@ -1,4 +1,7 @@ import os +import os.path +import cgi +import tempfile import click import unicodecsv as csv from sqlalchemy import and_ as _and_ @@ -6,13 +9,25 @@ from ckan.lib.munge import munge_filename from ckan import model -from ckanext.cloudstorage.storage import CloudStorage +from ckanapi import LocalCKAN + +from ckanext.cloudstorage.storage import ( + CloudStorage, + ResourceCloudStorage +) from ckanext.cloudstorage.model import ( create_tables, drop_tables ) from ckan.plugins.toolkit import h +from ckan.logic import NotFound + + 
+class FakeFileStorage(cgi.FieldStorage): + def __init__(self, fp, filename): + self.file = fp + self.filename = filename def initdb(): @@ -21,6 +36,111 @@ def initdb(): print("DB tables are reinitialized") +def migrate(path, single_id=None): + if not os.path.isdir(path): + print('The storage directory cannot be found.') + return + + lc = LocalCKAN() + resources = {} + failed = [] + + # The resource folder is stuctured like so on disk: + # - storage/ + # - ... + # - resources/ + # - <3 letter prefix> + # - <3 letter prefix> + # - + # ... + # ... + # ... + for root, dirs, files in os.walk(path): + # Only the bottom level of the tree actually contains any files. We + # don't care at all about the overall structure. + if not files: + continue + + split_root = root.split('/') + resource_id = split_root[-2] + split_root[-1] + + for file_ in files: + ckan_res_id = resource_id + file_ + if single_id and ckan_res_id != single_id: + continue + + resources[ckan_res_id] = os.path.join( + root, + file_ + ) + + for i, resource in enumerate(resources.iteritems(), 1): + resource_id, file_path = resource + print('[{i}/{count}] Working on {id}'.format( + i=i, + count=len(resources), + id=resource_id + )) + + try: + resource = lc.action.resource_show(id=resource_id) + except NotFound: + print(u'\tResource not found') + continue + + if resource['url_type'] != 'upload': + print(u'\t`url_type` is not `upload`. 
Skip') + continue + + with open(file_path, 'rb') as fin: + resource['upload'] = FakeFileStorage( + fin, + resource['url'].split('/')[-1] + ) + try: + uploader = ResourceCloudStorage(resource) + uploader.upload(resource['id']) + except Exception as e: + failed.append(resource_id) + print(u'\tError of type {0} during upload: {1}'.format(type(e), e)) + + if failed: + log_file = tempfile.NamedTemporaryFile(delete=False) + log_file.file.writelines(failed) + print(u'ID of all failed uploads are saved to `{0}`'.format(log_file.name)) + + +def fix_cors(domains): + cs = CloudStorage() + + if cs.can_use_advanced_azure: + from azure.storage import blob as azure_blob + from azure.storage import CorsRule + + blob_service = azure_blob.BlockBlobService( + cs.driver_options['key'], + cs.driver_options['secret'] + ) + + blob_service.set_blob_service_properties( + cors=[ + CorsRule( + allowed_origins=domains, + allowed_methods=['GET'] + ) + ] + ) + print('Done!') + else: + print( + 'The driver {driver_name} being used does not currently' + ' support updating CORS rules through' + ' cloudstorage.'.format( + driver_name=cs.driver_name + ) + ) + + def _get_uploads(get_linked = True, return_upload_objects_only = False): # type: (bool, bool) -> tuple[float, list] cs = CloudStorage() From aceebd28f5cd3c401f6ecd36246d55f07ce18709 Mon Sep 17 00:00:00 2001 From: JVickery-TBS Date: Wed, 24 May 2023 15:48:16 +0000 Subject: [PATCH 4/4] feat(commands): added new sub-command; - Added new sub-command `migrate-file`. 
--- ckanext/cloudstorage/cli.py | 8 +++++++ ckanext/cloudstorage/commands.py | 9 ++++++++ ckanext/cloudstorage/utils.py | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/ckanext/cloudstorage/cli.py b/ckanext/cloudstorage/cli.py index c2eb1bb..4d29f07 100644 --- a/ckanext/cloudstorage/cli.py +++ b/ckanext/cloudstorage/cli.py @@ -37,6 +37,14 @@ def migrate(path_to_storage, resource_id): utils.migrate(path_to_storage, resource_id) +@cloudstorage.command() +@click.argument(u'path_to_file') +@click.argument(u'resource_id') +def migrate_file(path_to_file, resource_id): + """Upload local file to the remote for a given resource.""" + utils.migrate_file(path_to_file, resource_id) + + @cloudstorage.command() @click.option( "-o", diff --git a/ckanext/cloudstorage/commands.py b/ckanext/cloudstorage/commands.py index f5e951d..ae6d97e 100644 --- a/ckanext/cloudstorage/commands.py +++ b/ckanext/cloudstorage/commands.py @@ -11,6 +11,7 @@ Commands: - fix-cors Update CORS rules where possible. - migrate Upload local storage to the remote. + - migrate-file Upload local file to the remote for a given resource. - initdb Reinitalize database tables. - list-unlinked-uploads Lists uploads in the storage container that do not match to any resources. - remove-unlinked-uploads Permanently deletes uploads from the storage container that do not match to any resources. @@ -20,6 +21,7 @@ Usage: cloudstorage fix-cors ... 
[--c=] cloudstorage migrate [] [--c=] + cloudstorage migrate-file [--c=] cloudstorage initdb [--c=] cloudstorage list-unlinked-uploads [--o=] [--c=] cloudstorage remove-unlinked-uploads [--c=] @@ -49,6 +51,8 @@ def command(self): _fix_cors(args) elif args['migrate']: _migrate(args) + elif args['migrate-file']: + _migrate_file(args) elif args['initdb']: _initdb() elif args['list-unlinked-uploads']: @@ -66,6 +70,11 @@ def _migrate(args): utils.migrate(args[''], args['']) +def _migrate_file(args): + # type: (list|None) -> None + utils.migrate_file(args[''], args['']) + + def _fix_cors(args): # type: (list|None) -> None utils.fix_cors(args['']) diff --git a/ckanext/cloudstorage/utils.py b/ckanext/cloudstorage/utils.py index bcef843..09c8780 100644 --- a/ckanext/cloudstorage/utils.py +++ b/ckanext/cloudstorage/utils.py @@ -110,6 +110,45 @@ def migrate(path, single_id=None): print(u'ID of all failed uploads are saved to `{0}`'.format(log_file.name)) +def migrate_file(file_path, resource_id): + if not os.path.isfile(file_path): + print('The file path is not a file.') + return + + lc = LocalCKAN() + failed = [] + + try: + resource = lc.action.resource_show(id=resource_id) + except NotFound: + print(u'Resource not found') + return + + if resource['url_type'] != 'upload': + print(u'`url_type` is not `upload`.') + return + + with open(file_path, 'rb') as fin: + resource['upload'] = FakeFileStorage( + fin, + resource['url'].split('/')[-1] + ) + try: + uploader = ResourceCloudStorage(resource) + uploader.upload(resource['id']) + head, tail = os.path.split(file_path) + print(u'Uploaded file {0} successfully for resource {1}.'.format( + tail, resource_id)) + except Exception as e: + failed.append(resource_id) + print(u'Error of type {0} during upload: {1}'.format(type(e), e)) + + if failed: + log_file = tempfile.NamedTemporaryFile(delete=False) + log_file.file.writelines(failed) + print(u'ID of all failed uploads are saved to `{0}`'.format(log_file.name)) + + def 
fix_cors(domains): cs = CloudStorage()