diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index a2466310..238ef662 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -241,6 +241,18 @@ def cat(self, src): click.echo(f.read(), nl=False) + + + def async_delete_start(self, dbfs_path, recursive, cluster_id, headers=None): + return self.client.async_delete_start(dbfs_path.absolute_path, recursive, cluster_id, headers=headers) + + def async_delete_status(self, async_delete_id=None, limit=None, headers=None): + return self.client.async_delete_status(async_delete_id, limit, headers=headers) + + def async_delete_cancel(self, async_delete_id, headers=None): + return self.client.async_delete_cancel(async_delete_id, headers=headers) + + class TempDir(object): def __init__(self, remove_on_exit=True): self._dir = None diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index dfa22753..7138522e 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -21,7 +21,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function + import click +from click import UsageError +import json +import time +from datetime import datetime, timedelta from tabulate import tabulate from databricks_cli.utils import eat_exceptions, error_and_quit, CONTEXT_SETTINGS @@ -74,19 +80,195 @@ def mkdirs_cli(api_client, dbfs_path): @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--recursive', '-r', is_flag=True, default=False) @click.argument('dbfs_path', type=DbfsPathClickType()) +@click.option('--recursive', '-r', is_flag=True, default=False) +@click.option('--as-job', is_flag=True, default=False) +@click.option('--async', is_flag=True, default=False) +@click.option('--cluster-id', required=False) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_cli(api_client, recursive, dbfs_path): +def rm_cli(api_client, dbfs_path, recursive, as_job, async, cluster_id): """ Remove files from dbfs. To remove a directory you must provide the --recursive flag. """ - DbfsApi(api_client).delete(dbfs_path, recursive) + if async or (cluster_id is not None): + as_job = True + + if as_job: + id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id) + if async: + click.echo(async_rm_status_impl(api_client, id)) + else: + async_rm_wait_impl(api_client, id) + else: + DbfsApi(api_client).delete(dbfs_path, recursive) + + + + + + + +def async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id): + delete_job_id = DbfsApi(api_client).async_delete_start(dbfs_path, recursive, cluster_id) + return delete_job_id['delete_job_id'] + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.argument('dbfs_path', type=DbfsPathClickType()) +@click.option('--recursive', '-r', is_flag=True, default=False) +@click.option('--cluster-id', '-c', required=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def async_rm_start_cli(api_client, dbfs_path, recursive, cluster_id): + """ + Start a rm-async request. + + To remove a directory you must provide the --recursive flag. + """ + id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id) + click.echo(async_rm_status_impl(api_client, id)) + + +def truncate_string(s, length=100): + if len(s) <= length: + return s + return s[:length] + '...' + +def parse_timestamp(ts): + t = int(ts) / 1000 + return datetime.utcfromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S') + + +def async_rm_status_to_row(run_json): + r = json.loads(run_json) + params = r['task']['notebook_task']['base_parameters'] + state = r['state'] + if 'result_state' in state: + result = state['result_state'] + duration_ms = r['setup_duration'] + r['execution_duration'] + r['cleanup_duration'] + duration = timedelta(milliseconds=duration_ms) + else: + result = "" + duration = "" + + cluster_type = 'new' if 'new_cluster' in r['cluster_spec'] else "existing" + cluster_id = r['cluster_instance']['cluster_id'] if 'cluster_instance' in r else "" + return ( + r['run_id'], params['path'], params['recursive'], + r['run_page_url'], cluster_type, cluster_id, + parse_timestamp(r['start_time']), state['life_cycle_state'], result, duration + ) + + +def async_rm_status_to_table(id, runs_json): + ret = [] + if id is not None: + r = runs_json['delete_job_run'] + ret.append(async_rm_status_to_row(r)) + else: + for r in runs_json['delete_job_runs']: + ret.append(async_rm_status_to_row(r)) + return ret + + +def async_rm_status_impl(api_client, id, limit=None): + status = DbfsApi(api_client).async_delete_status(id, limit) + if id is not None: + return tabulate(async_rm_status_to_table(id, status), tablefmt="plain") + else: + headers = ( + "id", "path", "recursive", + "run_page_url", "cluster_type", "cluster_id", + "start_time", "state", "result", "duration") + return tabulate(async_rm_status_to_table(id, status), headers, tablefmt='simple') + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--id', required=False) +@click.option('--limit', required=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def async_rm_status_cli(api_client, id, limit): + """ + Check the status of your rm-async request(s). + """ + if id is not None and limit is not None: + raise UsageError("You cannot specify both --id and --limit.") + + click.echo(async_rm_status_impl(api_client, id, limit)) + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--id', required=True) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def async_rm_cancel_cli(api_client, id): + """ + Cancel your rm-async request. + """ + DbfsApi(api_client).async_delete_cancel(id) + + +def async_rm_wait_impl(api_client, id): + i = 0 + progress_chars = ['/', '-', '\\', '|'] + while True: + status = DbfsApi(api_client).async_delete_status(id) + click.echo("\r" + async_rm_status_impl(api_client, id), nl=False) + r = json.loads(status['delete_job_run']) + if r['state'].get('result_state') is not None: + click.echo(" ") + break + i = (i + 1) % len(progress_chars) + click.echo(" " + progress_chars[i], nl=False) + time.sleep(0.5) + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--id', required=True) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def async_rm_wait_cli(api_client, id): + """ + Wait until your rm-async request is complete. + """ + async_rm_wait_impl(api_client, id) + + +@click.group(context_settings=CONTEXT_SETTINGS, short_help='Remove files from DBFS asynchronously.') +@debug_option +@profile_option +@eat_exceptions +def async_rm_group(): + """ + Remove files from dbfs asynchronously. + """ + pass + + +async_rm_group.add_command(async_rm_start_cli, name="start") +async_rm_group.add_command(async_rm_status_cli, name="status") +async_rm_group.add_command(async_rm_cancel_cli, name="cancel") +async_rm_group.add_command(async_rm_wait_cli, name="wait") + + + + + + + @click.command(context_settings=CONTEXT_SETTINGS) @@ -158,10 +340,26 @@ def cat_cli(api_client, src): DbfsApi(api_client).cat(src) + +@click.group(context_settings=CONTEXT_SETTINGS, short_help='Perform asynchronous DBFS operations.') +@debug_option +@profile_option +@eat_exceptions +def async_group(): + """ + Remove files from dbfs asynchronously. + """ + pass + + +async_group.add_command(async_rm_group, name='rm') + + dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') dbfs_group.add_command(mkdirs_cli, name='mkdirs') dbfs_group.add_command(rm_cli, name='rm') +dbfs_group.add_command(async_group, name='async') dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index de769a34..86c65c0a 100644 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -514,6 +514,39 @@ def close(self, handle, headers=None): return self.client.perform_query('POST', '/dbfs/close', data=_data, headers=headers) + + + + + + def async_delete_start(self, dbfs_path, recursive=None, cluster_id=None, headers=None): + _data = {} + _data['path'] = dbfs_path + if recursive is not None: + _data['recursive'] = recursive + if cluster_id is not None: + _data['cluster_id'] = cluster_id + return self.client.perform_query('POST', '/dbfs-async/delete/submit', data=_data, headers=headers) + + def async_delete_status(self, async_delete_id=None, limit=None, headers=None): + _data = {} + if async_delete_id is not None: + _data['delete_job_id'] = async_delete_id + return self.client.perform_query('GET', '/dbfs-async/delete/get', data=_data, headers=headers) + else: + if limit is not None: + _data['limit'] = limit + return self.client.perform_query('GET', '/dbfs-async/delete/list', data=_data, headers=headers) + + def async_delete_cancel(self, async_delete_id, headers=None): + _data = {} + _data['delete_job_id'] = async_delete_id + return self.client.perform_query('POST', '/dbfs-async/delete/cancel', data=_data, headers=headers) + + + + + class WorkspaceService(object): def __init__(self, client): self.client = client