Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/cloudinary-cli-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
4 changes: 3 additions & 1 deletion cloudinary_cli/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from .sync import sync
from .upload_dir import upload_dir
from .regen_derived import regen_derived
from .clone import clone

commands = [
upload_dir,
make,
migrate,
sync,
regen_derived
regen_derived,
clone
]
112 changes: 112 additions & 0 deletions cloudinary_cli/modules/clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from click import command, option, style, argument
from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit
import cloudinary
from cloudinary_cli.utils.utils import run_tasks_concurrently
from cloudinary_cli.utils.api_utils import upload_file
from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict
from cloudinary_cli.defaults import logger
from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination

DEFAULT_MAX_RESULTS = 500


@command("clone",
short_help="""Clone assets from one product environment to another.""",
help="""
\b
Clone assets from one product environment to another with/without tags and/or context (structured metadata is not currently supported).
Source will be your `CLOUDINARY_URL` environment variable but you also can specify a different source using the `-c/-C` option.
Cloning restricted assets is also not supported currently.
Format: cld clone <target_environment> <command options>
`<target_environment>` can be a CLOUDINARY_URL or a saved config (see `config` command)
Example 1 (Copy all assets including tags and context using CLOUDINARY URL):
cld clone cloudinary://<api_key>:<api_secret>@<cloudname> -fi tags,context
Example 2 (Copy all assets with a specific tag via a search expression using a saved config):
cld clone <config_name> -se "tags:<tag_name>"
""")
@argument("target")
@option("-F", "--force", is_flag=True,
help="Skip confirmation.")
@option("-ow", "--overwrite", is_flag=True, default=False,
help="Specify whether to overwrite existing assets.")
@option("-w", "--concurrent_workers", type=int, default=30,
help="Specify the number of concurrent network threads.")
@option("-fi", "--fields", multiple=True,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to show an example, or state the valid options.

help="Specify whether to copy tags and/or context. Valid options: `tags,context`.")
@option("-se", "--search_exp", default="",
help="Define a search expression to filter the assets to clone.")
@option("--async", "async_", is_flag=True, default=False,
help="Clone the assets asynchronously.")
@option("-nu", "--notification_url",
help="Webhook notification URL.")
def clone(target, force, overwrite, concurrent_workers, fields, search_exp, async_, notification_url):
if not target:
print_help_and_exit()

target_config = get_cloudinary_config(target)
if not target_config:
logger.error("The specified config does not exist or the CLOUDINARY_URL scheme provided is invalid"
" (expecting to start with 'cloudinary://').")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this allow for a saved config?

return False

if cloudinary.config().cloud_name == target_config.cloud_name:
logger.error("Target environment cannot be the same as source environment.")
return False

source_assets = search_assets(force, search_exp)

upload_list = []
for r in source_assets.get('resources'):
updated_options, asset_url = process_metadata(r, overwrite, async_, notification_url,
normalize_list_params(fields))
updated_options.update(config_to_dict(target_config))
upload_list.append((asset_url, {**updated_options}))

if not upload_list:
logger.error(style(f'No assets found in {cloudinary.config().cloud_name}', fg="red"))
return False

logger.info(style(f'Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}', fg="blue"))

run_tasks_concurrently(upload_file, upload_list, concurrent_workers)

return True


def search_assets(force, search_exp):
search = cloudinary.search.Search().expression(search_exp)
search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name'])
search.max_results(DEFAULT_MAX_RESULTS)

res = execute_single_request(search, fields_to_keep="")
res = handle_auto_pagination(res, search, force, fields_to_keep="")

return res


def process_metadata(res, overwrite, async_, notification_url, copy_fields=""):
cloned_options = {}
asset_url = res.get('secure_url')
cloned_options['public_id'] = res.get('public_id')
cloned_options['type'] = res.get('type')
cloned_options['resource_type'] = res.get('resource_type')
cloned_options['overwrite'] = overwrite
cloned_options['async'] = async_
if "tags" in copy_fields:
cloned_options['tags'] = res.get('tags')
if "context" in copy_fields:
cloned_options['context'] = res.get('context')
if res.get('folder'):
# This is required to put the asset in the correct asset_folder
# when copying from a fixed to DF (dynamic folder) cloud as if
# you just pass a `folder` param to a DF cloud, it will append
# this to the `public_id` and we don't want this.
cloned_options['asset_folder'] = res.get('folder')
elif res.get('asset_folder'):
cloned_options['asset_folder'] = res.get('asset_folder')
if res.get('display_name'):
cloned_options['display_name'] = res.get('display_name')
if notification_url:
cloned_options['notification_url'] = notification_url

return cloned_options, asset_url
23 changes: 16 additions & 7 deletions cloudinary_cli/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from cloudinary_cli.utils.json_utils import print_json, write_json_to_file
from cloudinary_cli.utils.utils import log_exception, confirm_action, get_command_params, merge_responses, \
normalize_list_params, ConfigurationError, print_api_help, duplicate_values
import re
from cloudinary.utils import is_remote_url

PAGINATION_MAX_RESULTS = 500

Expand Down Expand Up @@ -118,15 +120,20 @@ def upload_file(file_path, options, uploaded=None, failed=None):
verbose = logger.getEffectiveLevel() < logging.INFO

try:
size = path.getsize(file_path)
size = 0 if is_remote_url(file_path) else path.getsize(file_path)
upload_func = uploader.upload
if size > 20000000:
upload_func = uploader.upload_large
result = upload_func(file_path, **options)
disp_path = _display_path(result)
disp_str = f"as {result['public_id']}" if not disp_path \
else f"as {disp_path} with public_id: {result['public_id']}"
logger.info(style(f"Successfully uploaded {file_path} {disp_str}", fg="green"))
if "batch_id" in result:
starting_msg = "Uploading"
disp_str = f"asynchnously with batch_id: {result['batch_id']}"
else:
starting_msg = "Successfully uploaded"
disp_str = f"as {result['public_id']}" if not disp_path \
else f"as {disp_path} with public_id: {result['public_id']}"
logger.info(style(f"{starting_msg} {file_path} {disp_str}", fg="green"))
if verbose:
print_json(result)
uploaded[file_path] = {"path": asset_source(result), "display_path": disp_path}
Expand Down Expand Up @@ -212,12 +219,15 @@ def asset_source(asset_details):

:return:
"""
base_name = asset_details['public_id']
base_name = asset_details.get('public_id', '')

if not base_name:
return base_name

if asset_details['resource_type'] == 'raw' or asset_details['type'] == 'fetch':
return base_name

return base_name + '.' + asset_details['format']
return base_name + '.' + asset_details.get('format', '')


def get_folder_mode():
Expand Down Expand Up @@ -278,7 +288,6 @@ def handle_api_command(
"""
Used by Admin and Upload API commands
"""

if doc:
return launch(doc_url)

Expand Down
35 changes: 29 additions & 6 deletions cloudinary_cli/utils/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,31 @@ def refresh_cloudinary_config(cloudinary_url):

def verify_cloudinary_url(cloudinary_url):
refresh_cloudinary_config(cloudinary_url)
try:
api.ping()
except Exception as e:
log_exception(e, f"Invalid Cloudinary URL: {cloudinary_url}")
return ping_cloudinary()


def get_cloudinary_config(target):
target_config = cloudinary.Config()
if target.startswith("cloudinary://"):
parsed_url = target_config._parse_cloudinary_url(target)
elif target in load_config():
parsed_url = target_config._parse_cloudinary_url(load_config().get(target))
else:
return False
return True

target_config._setup_from_parsed_url(parsed_url)

if not ping_cloudinary(**config_to_dict(target_config)):
logger.error(f"Invalid Cloudinary config: {target}")
return False

return target_config

def config_to_dict(config):
return {k: v for k, v in config.__dict__.items() if not k.startswith("_")}

def show_cloudinary_config(cloudinary_config):
obfuscated_config = {k: v for k, v in cloudinary_config.__dict__.items() if not k.startswith("_")}
obfuscated_config = config_to_dict(cloudinary_config)

if "api_secret" in obfuscated_config:
api_secret = obfuscated_config["api_secret"]
Expand Down Expand Up @@ -96,6 +111,14 @@ def is_valid_cloudinary_config():
def initialize():
migrate_old_config()

def ping_cloudinary(**options):
try:
api.ping(**options)
except Exception as e:
logger.error(f"Failed to ping Cloudinary: {e}")
return False

return True

def _verify_file_path(file):
os.makedirs(os.path.dirname(file), exist_ok=True)
Loading