77 changes: 65 additions & 12 deletions batchbeagle/aws/batch.py
@@ -1,9 +1,19 @@
from __future__ import print_function

import boto3
import csv
import sys
import time
import yaml

BATCH_CLIENT = None

def get_batch_client():
global BATCH_CLIENT
if not BATCH_CLIENT:
BATCH_CLIENT = boto3.client('batch')
return BATCH_CLIENT

class AWSRenderable(object):

def __init__(self, limited_update=False):
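
For context on the get_batch_client() helper added above: it caches a single boto3 Batch client at module level so every class shares one connection and tests can swap it out. A minimal sketch of how a test might stub it, assuming the package import path batchbeagle.aws.batch (the MagicMock approach is illustrative, not part of this change):

from unittest import mock

import batchbeagle.aws.batch as batch_module

def test_get_batch_client_returns_stub():
    # Pre-seed the module-level cache so get_batch_client() skips boto3.client('batch').
    batch_module.BATCH_CLIENT = mock.MagicMock()
    assert batch_module.get_batch_client() is batch_module.BATCH_CLIENT
    # Reset the cache so later callers get a real client again.
    batch_module.BATCH_CLIENT = None
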
@@ -226,7 +236,7 @@ class JobDefinition(AWSRenderable):

def __init__(self, yml={}):
super(JobDefinition, self).__init__()
self.batch = boto3.client('batch')
self.batch = get_batch_client()
self.from_yaml(yml)
self.__aws_j = None
self.arn = None
@@ -328,7 +338,7 @@ class ComputeResources(AWSLimitedUpdateRenderable):

def __init__(self, yml={}):
super(ComputeResources, self).__init__()
self.batch = boto3.client('batch')
self.batch = get_batch_client()
if yml:
self.from_yaml(yml)

@@ -407,7 +417,7 @@ class ComputeEnvironment(AWSLimitedUpdateRenderable):

def __init__(self, yml={}):
super(ComputeEnvironment, self).__init__()
self.batch = boto3.client('batch')
self.batch = get_batch_client()
self.from_yaml(yml)
self.order = 0

@@ -639,7 +649,7 @@ def describe(self):
class BatchManager(object):

def __init__(self, filename='batchbeagle.yml'):
self.batch = boto3.client('batch')
self.batch = get_batch_client()
self.queues = {}
self.compute_environments = {}
self.job_definitions = {}
@@ -726,6 +736,38 @@ def submit_job(self, name, job_description, queue, parameters={}, depends_on=[],
kwargs['parameters'] = parameters
response = self.batch.submit_job(**kwargs)

def submit_jobs(self, name, job_description, queue, parameters_csv):
'''
Submit Batch Jobs given a CSV file containing rows of parameters
'''

jd = self.job_definitions[job_description]
if not jd.arn:
jd.register()
kwargs = {
'jobDefinition': jd.arn,
'jobName': name,
'jobQueue': queue
}

with open(parameters_csv, 'rt') as csvfile:
# first line is parameter names
parameter_sets = csv.DictReader(csvfile)
jobs_submitted = 0
print('Submitting Jobs ', end='')

for _parameters in parameter_sets:
kwargs['parameters'] = _parameters
self.batch.submit_job(**kwargs)
jobs_submitted += 1
if jobs_submitted % 100 == 0:
print('.', end='')
sys.stdout.flush()

print('.')
print('Jobs submitted: {}'.format(jobs_submitted))
sys.stdout.flush()

def get_jobs(self, queue):
jobs = []
statuses = {
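
For reference, a minimal sketch of the CSV layout the new submit_jobs() consumes: the header row supplies the Batch parameter names, and each later row is passed straight through as the parameters of one submit_job call. The column names below are illustrative, not taken from this change.

import csv
import io

# Stand-in for the parameters_csv file handed to submit_jobs().
sample = io.StringIO(
    "input_uri,output_uri\n"
    "s3://my-bucket/a.txt,s3://my-bucket/a.out\n"
    "s3://my-bucket/b.txt,s3://my-bucket/b.out\n"
)

for row in csv.DictReader(sample):
    # Each row becomes kwargs['parameters'] for one batch.submit_job(**kwargs) call.
    print(row)  # e.g. {'input_uri': 's3://my-bucket/a.txt', 'output_uri': 's3://my-bucket/a.out'}
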
@@ -856,9 +898,10 @@ def disable_compute_environment(self, compute_environment):
if compute_environment in self.compute_environments:
c = self.compute_environments[compute_environment]
if c.exists():
kwargs = c.render(True)
kwargs['state'] = 'DISABLED'
response = self.batch.update_compute_environment(**kwargs)
response = self.batch.update_compute_environment(
computeEnvironment=compute_environment,
state='DISABLED'
)
else:
print("Compute Environment must be created first.")

@@ -901,6 +944,7 @@ def describe(self):
def assemble(self):

# compute environments
print('Creating Compute Environments')
compute_environments = set(self.compute_environments.keys())
for compute_environment in compute_environments:
c = self.compute_environments[compute_environment]
@@ -910,7 +954,7 @@ def assemble(self):
self.update_compute_environment(compute_environment)

while True:
time.sleep(1)
time.sleep(3)
if compute_environments.issubset([
env_dict['computeEnvironmentName']
for env_dict in self.__get_compute_environments()
@@ -921,6 +965,7 @@
self.from_aws()

# queues
print('Creating Job Queues')
queues = set(self.queues.keys())
for queue in queues:
q = self.queues[queue]
@@ -932,16 +977,21 @@ def assemble(self):
self.from_aws()

# job definitions
print('Creating Job Definitions')
for job_definition in self.job_definitions.keys():
self.create_job_definition(job_definition)

print('Assembly complete')

def teardown(self):

# job definitions
print('Deleting Job Definitions')
for job_definition in self.job_definitions.keys():
self.deregister_job_definition(job_definition)

# job queues
print('Deleting Job Queues')
queues = list(self.queues.keys())
for queue in queues:
self.terminate_all_jobs(queue)
@@ -952,7 +1002,7 @@ def teardown(self):

enabled = [True]
while any(enabled):
time.sleep(1)
time.sleep(3)
enabled = []
for queue_dict in self.__get_queues():
enabled.append(queue_dict['state'] != 'DISABLED' or queue_dict['status'] == 'UPDATING')
@@ -965,21 +1015,22 @@ def teardown(self):

exists = [True]
while any(exists):
time.sleep(1)
time.sleep(3)
exists = []
for queue_dict in self.__get_queues():
exists.append(queue_dict['status'] != 'DELETED')

self.from_aws()

# compute environments
print('Deleting Compute Environments')
for env_dict in self.__get_compute_environments():
if env_dict['state'] != 'DISABLED':
self.disable_compute_environment(env_dict['computeEnvironmentName'])

enabled = [True]
while any(enabled):
time.sleep(1)
time.sleep(3)
enabled = []
for env_dict in self.__get_compute_environments():
enabled.append(env_dict['state'] != 'DISABLED' or env_dict['status'] == 'UPDATING')
@@ -991,7 +1042,9 @@ def teardown(self):

exists = [True]
while any(exists):
time.sleep(1)
time.sleep(3)
exists = []
for env_dict in self.__get_compute_environments():
exists.append(env_dict['status'] != 'DELETED')

print('Teardown complete')
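
As an aside, the wait loops in assemble() and teardown() above all follow the same poll-until-stable shape, now on a 3-second interval. A minimal generic sketch of that pattern, with an illustrative helper name and predicate that are not part of this change:

import time

def wait_until(fetch, done, interval=3):
    # Poll fetch() every `interval` seconds until done() holds for every item it returns.
    while True:
        items = fetch()
        if all(done(item) for item in items):
            return items
        time.sleep(interval)

# e.g. keep polling a describe call until every environment reports status 'DELETED':
# wait_until(lambda: manager.batch.describe_compute_environments()['computeEnvironments'],
#            lambda env: env['status'] == 'DELETED')
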
7 changes: 1 addition & 6 deletions batchbeagle/dplycli.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python

import copy
import csv
import time

import click
@@ -145,11 +144,7 @@ def submit(ctx, name, job_definition, queue, parameters, nowait):
"""
mgr = BatchManager(filename=ctx.obj['CONFIG_FILE'])
if parameters:
with open(parameters) as csvfile:
# first line is parameter names
reader = csv.DictReader(csvfile)
for row in reader:
mgr.submit_job(name, job_definition, queue, parameters=row)
mgr.submit_jobs(name, job_definition, queue, parameters_csv=parameters)
else:
mgr.submit_job(name, job_definition, queue)
while True: