diff --git a/batchbeagle/aws/batch.py b/batchbeagle/aws/batch.py index 9892785..34a0f82 100644 --- a/batchbeagle/aws/batch.py +++ b/batchbeagle/aws/batch.py @@ -1,9 +1,19 @@ from __future__ import print_function import boto3 +import csv +import sys import time import yaml +BATCH_CLIENT = None + +def get_batch_client(): + global BATCH_CLIENT + if not BATCH_CLIENT: + BATCH_CLIENT = boto3.client('batch') + return BATCH_CLIENT + class AWSRenderable(object): def __init__(self, limited_update=False): @@ -226,7 +236,7 @@ class JobDefinition(AWSRenderable): def __init__(self, yml={}): super(JobDefinition, self).__init__() - self.batch = boto3.client('batch') + self.batch = get_batch_client() self.from_yaml(yml) self.__aws_j = None self.arn = None @@ -328,7 +338,7 @@ class ComputeResources(AWSLimitedUpdateRenderable): def __init__(self, yml={}): super(ComputeResources, self).__init__() - self.batch = boto3.client('batch') + self.batch = get_batch_client() if yml: self.from_yaml(yml) @@ -407,7 +417,7 @@ class ComputeEnvironment(AWSLimitedUpdateRenderable): def __init__(self, yml={}): super(ComputeEnvironment, self).__init__() - self.batch = boto3.client('batch') + self.batch = get_batch_client() self.from_yaml(yml) self.order = 0 @@ -639,7 +649,7 @@ def describe(self): class BatchManager(object): def __init__(self, filename='batchbeagle.yml'): - self.batch = boto3.client('batch') + self.batch = get_batch_client() self.queues = {} self.compute_environments = {} self.job_definitions = {} @@ -726,6 +736,38 @@ def submit_job(self, name, job_description, queue, parameters={}, depends_on=[], kwargs['parameters'] = parameters response = self.batch.submit_job(**kwargs) + def submit_jobs(self, name, job_description, queue, parameters_csv): + ''' + Submit Batch Jobs given a CSV file containing rows of parameters + ''' + + jd = self.job_definitions[job_description] + if not jd.arn: + jd.register() + kwargs = { + 'jobDefinition': jd.arn, + 'jobName': name, + 'jobQueue': queue + } + + with open(parameters_csv, 'rt') as csvfile: + # first line is parameter names + parameter_sets = csv.DictReader(csvfile) + jobs_submitted = 0 + print('Submitting Jobs ', end='') + + for _parameters in parameter_sets: + kwargs['parameters'] = _parameters + self.batch.submit_job(**kwargs) + jobs_submitted += 1 + if jobs_submitted % 100 == 0: + print('.', end='') + sys.stdout.flush() + + print('.') + print('Jobs submitted: {}'.format(jobs_submitted)) + sys.stdout.flush() + def get_jobs(self, queue): jobs = [] statuses = { @@ -856,9 +898,10 @@ def disable_compute_environment(self, compute_environment): if compute_environment in self.compute_environments: c = self.compute_environments[compute_environment] if c.exists(): - kwargs = c.render(True) - kwargs['state'] = 'DISABLED' - response = self.batch.update_compute_environment(**kwargs) + response = self.batch.update_compute_environment( + computeEnvironment=compute_environment, + state='DISABLED' + ) else: print("Compute Environment must be created first.") @@ -901,6 +944,7 @@ def describe(self): def assemble(self): # compute environments + print('Creating Compute Environments') compute_environments = set(self.compute_environments.keys()) for compute_environment in compute_environments: c = self.compute_environments[compute_environment] @@ -910,7 +954,7 @@ def assemble(self): self.update_compute_environment(compute_environment) while True: - time.sleep(1) + time.sleep(3) if compute_environments.issubset([ env_dict['computeEnvironmentName'] for env_dict in self.__get_compute_environments() @@ -921,6 +965,7 @@ def assemble(self): self.from_aws() # queues + print('Creating Job Queues') queues = set(self.queues.keys()) for queue in queues: q = self.queues[queue] @@ -932,16 +977,21 @@ def assemble(self): self.from_aws() # job definitions + print('Creating Job Definitions') for job_definition in self.job_definitions.keys(): self.create_job_definition(job_definition) + print('Assembly complete') + def teardown(self): # job definitions + print('Deleting Job Definitions') for job_definition in self.job_definitions.keys(): self.deregister_job_definition(job_definition) # job queues + print('Deleting Job Queues') queues = list(self.queues.keys()) for queue in queues: self.terminate_all_jobs(queue) @@ -952,7 +1002,7 @@ def teardown(self): enabled = [True] while any(enabled): - time.sleep(1) + time.sleep(3) enabled = [] for queue_dict in self.__get_queues(): enabled.append(queue_dict['state'] != 'DISABLED' or queue_dict['status'] == 'UPDATING') @@ -965,7 +1015,7 @@ def teardown(self): exists = [True] while any(exists): - time.sleep(1) + time.sleep(3) exists = [] for queue_dict in self.__get_queues(): exists.append(queue_dict['status'] != 'DELETED') @@ -973,13 +1023,14 @@ def teardown(self): self.from_aws() # compute environments + print('Deleting Compute Environments') for env_dict in self.__get_compute_environments(): if env_dict['state'] != 'DISABLED': self.disable_compute_environment(env_dict['computeEnvironmentName']) enabled = [True] while any(enabled): - time.sleep(1) + time.sleep(3) enabled = [] for env_dict in self.__get_compute_environments(): enabled.append(env_dict['state'] != 'DISABLED' or env_dict['status'] == 'UPDATING') @@ -991,7 +1042,9 @@ def teardown(self): exists = [True] while any(exists): - time.sleep(1) + time.sleep(3) exists = [] for env_dict in self.__get_compute_environments(): exists.append(env_dict['status'] != 'DELETED') + + print('Teardown complete') diff --git a/batchbeagle/dplycli.py b/batchbeagle/dplycli.py index 47c6f7f..5b6571b 100755 --- a/batchbeagle/dplycli.py +++ b/batchbeagle/dplycli.py @@ -1,7 +1,6 @@ #!/usr/bin/env python import copy -import csv import time import click @@ -145,11 +144,7 @@ def submit(ctx, name, job_definition, queue, parameters, nowait): """ mgr = BatchManager(filename=ctx.obj['CONFIG_FILE']) if parameters: - with open(parameters) as csvfile: - # first line is parameter names - reader = csv.DictReader(csvfile) - for row in reader: - mgr.submit_job(name, job_definition, queue, parameters=row) + mgr.submit_jobs(name, job_definition, queue, parameters_csv=parameters) else: mgr.submit_job(name, job_definition, queue) while True: