Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions bazaarci/runner/asyncgraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Optional
import logging

from bazaarci.runner.node import Node
import asyncio

class asyncGraph(Node, set):
def __init__(self, name: str, graph: Optional["Graph"] = None):
super().__init__(name, graph)

def produces(self):
for node in self:
for product in node.produces():
yield product

def consumes(self):
for node in self:
for product in node.consumes():
yield product

# def start(self):
# [step.start() for step in self]
#
# def wait(self):
# [step.wait() for step in self]

async def runInLoop(self):
'''Run the graph if it's already being called in an event loop'''
asyncRuns = [step.run() for step in self]
groupRuns = await asyncio.gather(*asyncRuns)
results = [step.output for step in self]
return results

def run(self):
asyncRuns = [step.run() for step in self]
groupRuns = asyncio.gather(*asyncRuns)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(groupRuns)
except:
logging.exception(f"Unexpected exception")
finally:
loop.close()
9 changes: 9 additions & 0 deletions bazaarci/runner/asyncproduct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from asyncio import Event

class asyncProduct(Event):
def __init__(self, name):
super().__init__()
self.name = name

def __str__(self):
return "asyncProduct({})".format(self.name)
96 changes: 96 additions & 0 deletions bazaarci/runner/asyncstep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
""" asyncStep
Definition of the Step class and run behavior decorator functions
"""
from functools import reduce, wraps
import asyncio
from asyncio import Event
from typing import Callable, Optional

from bazaarci.runner.node import Node
from bazaarci.runner.asyncproduct import asyncProduct


class asyncStep(Node):
def __init__(self, name, graph: Optional["Graph"] = None, target: Optional[Callable] = None):
super().__init__(name, graph)
self._consumes = set()
self._produces = set()
self.target = target
self.output = None
if self.graph is not None:
self.graph.add(self)

# NOTE: Should produces and consumes be awaitable?
def produces(self, item: str = None):
if item is None:
return self._produces
# The ability to call item.set() is necessary for outputs.
elif hasattr(item, "set") and callable(item.set):
self._produces.add(item)

def consumes(self, item: str = None):
if item is None:
return self._consumes
# The ability to call item.wait() is necessary for inputs.
elif hasattr(item, "wait") and callable(item.wait):
self._consumes.add(item)

# async def start(self):
# self.thread = Thread(target=self.run)
# self.thread.start()

async def run(self):
[await asyncproduct.wait() for asyncproduct in self.consumes()]
if self.target and asyncio.iscoroutinefunction(self.target):
self.output = await self.target()
[asyncproduct.set() for asyncproduct in self.produces()]

# async def wait(self):
# if self.thread and self.thread.is_alive():
# self.thread.join()

def __str__(self):
return self.name

def __repr__(self):
return "{}({})".format(self.__class__.__name__, self.name)

# Was kind of a pain to set up with async. Ignoring this for now
#async def set_run_behavior(class_or_instance, *args):
# """ Build the run function from _run and the
# incoming list of behaviorals
# """
# run_function = class_or_instance._run
# for wrapper in reversed(args):
# run_function = await wrapper(run_function)
# setattr(class_or_instance, "run", run_function)
#
#
#async def wait_for_producers(func):
# """ Waits on all `Product`s in `self.consumes` before
# calling the function.
# """
# @wraps(func)
# def wrapped(self):
# [await asyncproduct.wait() for asyncproduct in self.consumes()]
# await func(self)
# return wrapped
#
#
#def skip_if_redundant(func):
# """ Calls the function only if any output `Product`s
# have not been set yet.
# """
# @wraps(func)
# def wrapped(self):
# # If there are output products and they have all already been set,
# # then this step is not required to run.
# # TODO: await the wait?
# all_set = reduce(lambda x, y: x and y.wait(0), self.produces(), True)
# if len(self.produces()) == 0 or not all_set:
# func(self)
# return wrapped


# By default, Step should wait for producers
#set_run_behavior(Step, wait_for_producers)
57 changes: 57 additions & 0 deletions bazaarci/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/opt/xsite/cte/tools/python/bin/python3
import sys
import asyncio
sys.path.append(f"..")
from bazaarci.runner.asyncgraph import asyncGraph
from bazaarci.runner.asyncstep import asyncStep
from bazaarci.runner.asyncproduct import asyncProduct
import random
import logging

class asyncStepTest(asyncStep):
def __init__(self, name, session):
self.name = name
self.session = session
super().__init__(
name=name,
graph=session,
target=self.asyncFunc)

async def asyncFunc(self):
print(f'{self.name} starting')
await asyncio.sleep(random.random())
print(f'{self.name} ending')
return self.name

async def main():
session = asyncGraph('asyncTest')
prod1 = asyncProduct('p1')
prod2 = asyncProduct('p2')
prod3 = asyncProduct('p3')
prod4 = asyncProduct('p4')
node1 = asyncStepTest('step1', session)
node2 = asyncStepTest('step2', session)
node3 = asyncStepTest('step3', session)
node4 = asyncStepTest('step4', session)
node1.produces(prod1)
node2.produces(prod2)
node4.produces(prod4)
node3.produces(prod3)
node3.consumes(prod1)
node3.consumes(prod2)
products = [prod.name for prod in session.produces()]
consumes = [prod.name for prod in session.consumes()]
print(f"PRODUCES : {products}")
print(f"CONSUMES : {consumes}")
results = await session.runInLoop()
print(results)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except:
logging.exception(f"Unexpected exception")
finally:
loop.close()
sys.exit(0)