diff --git a/bazaarci/runner/asyncgraph.py b/bazaarci/runner/asyncgraph.py new file mode 100644 index 0000000..f3c35a9 --- /dev/null +++ b/bazaarci/runner/asyncgraph.py @@ -0,0 +1,43 @@ +from typing import Optional +import logging + +from bazaarci.runner.node import Node +import asyncio + +class asyncGraph(Node, set): + def __init__(self, name: str, graph: Optional["Graph"] = None): + super().__init__(name, graph) + + def produces(self): + for node in self: + for product in node.produces(): + yield product + + def consumes(self): + for node in self: + for product in node.consumes(): + yield product + +# def start(self): +# [step.start() for step in self] +# +# def wait(self): +# [step.wait() for step in self] + + async def runInLoop(self): + '''Run the graph if it's already being called in an event loop''' + asyncRuns = [step.run() for step in self] + groupRuns = await asyncio.gather(*asyncRuns) + results = [step.output for step in self] + return results + + def run(self): + asyncRuns = [step.run() for step in self] + groupRuns = asyncio.gather(*asyncRuns) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(groupRuns) + except: + logging.exception(f"Unexpected exception") + finally: + loop.close() diff --git a/bazaarci/runner/asyncproduct.py b/bazaarci/runner/asyncproduct.py new file mode 100644 index 0000000..1457449 --- /dev/null +++ b/bazaarci/runner/asyncproduct.py @@ -0,0 +1,9 @@ +from asyncio import Event + +class asyncProduct(Event): + def __init__(self, name): + super().__init__() + self.name = name + + def __str__(self): + return "asyncProduct({})".format(self.name) diff --git a/bazaarci/runner/asyncstep.py b/bazaarci/runner/asyncstep.py new file mode 100644 index 0000000..f5bf70f --- /dev/null +++ b/bazaarci/runner/asyncstep.py @@ -0,0 +1,96 @@ +""" asyncStep +Definition of the Step class and run behavior decorator functions +""" +from functools import reduce, wraps +import asyncio +from asyncio import Event +from typing import Callable, Optional + +from bazaarci.runner.node import Node +from bazaarci.runner.asyncproduct import asyncProduct + + +class asyncStep(Node): + def __init__(self, name, graph: Optional["Graph"] = None, target: Optional[Callable] = None): + super().__init__(name, graph) + self._consumes = set() + self._produces = set() + self.target = target + self.output = None + if self.graph is not None: + self.graph.add(self) + + # NOTE: Should produces and consumes be awaitable? + def produces(self, item: str = None): + if item is None: + return self._produces + # The ability to call item.set() is necessary for outputs. + elif hasattr(item, "set") and callable(item.set): + self._produces.add(item) + + def consumes(self, item: str = None): + if item is None: + return self._consumes + # The ability to call item.wait() is necessary for inputs. + elif hasattr(item, "wait") and callable(item.wait): + self._consumes.add(item) + +# async def start(self): +# self.thread = Thread(target=self.run) +# self.thread.start() + + async def run(self): + [await asyncproduct.wait() for asyncproduct in self.consumes()] + if self.target and asyncio.iscoroutinefunction(self.target): + self.output = await self.target() + [asyncproduct.set() for asyncproduct in self.produces()] + +# async def wait(self): +# if self.thread and self.thread.is_alive(): +# self.thread.join() + + def __str__(self): + return self.name + + def __repr__(self): + return "{}({})".format(self.__class__.__name__, self.name) + +# Was kind of a pain to set up with async. Ignoring this for now +#async def set_run_behavior(class_or_instance, *args): +# """ Build the run function from _run and the +# incoming list of behaviorals +# """ +# run_function = class_or_instance._run +# for wrapper in reversed(args): +# run_function = await wrapper(run_function) +# setattr(class_or_instance, "run", run_function) +# +# +#async def wait_for_producers(func): +# """ Waits on all `Product`s in `self.consumes` before +# calling the function. +# """ +# @wraps(func) +# def wrapped(self): +# [await asyncproduct.wait() for asyncproduct in self.consumes()] +# await func(self) +# return wrapped +# +# +#def skip_if_redundant(func): +# """ Calls the function only if any output `Product`s +# have not been set yet. +# """ +# @wraps(func) +# def wrapped(self): +# # If there are output products and they have all already been set, +# # then this step is not required to run. +# # TODO: await the wait? +# all_set = reduce(lambda x, y: x and y.wait(0), self.produces(), True) +# if len(self.produces()) == 0 or not all_set: +# func(self) +# return wrapped + + +# By default, Step should wait for producers +#set_run_behavior(Step, wait_for_producers) diff --git a/bazaarci/test.py b/bazaarci/test.py new file mode 100755 index 0000000..c64637a --- /dev/null +++ b/bazaarci/test.py @@ -0,0 +1,57 @@ +#!/opt/xsite/cte/tools/python/bin/python3 +import sys +import asyncio +sys.path.append(f"..") +from bazaarci.runner.asyncgraph import asyncGraph +from bazaarci.runner.asyncstep import asyncStep +from bazaarci.runner.asyncproduct import asyncProduct +import random +import logging + +class asyncStepTest(asyncStep): + def __init__(self, name, session): + self.name = name + self.session = session + super().__init__( + name=name, + graph=session, + target=self.asyncFunc) + + async def asyncFunc(self): + print(f'{self.name} starting') + await asyncio.sleep(random.random()) + print(f'{self.name} ending') + return self.name + +async def main(): + session = asyncGraph('asyncTest') + prod1 = asyncProduct('p1') + prod2 = asyncProduct('p2') + prod3 = asyncProduct('p3') + prod4 = asyncProduct('p4') + node1 = asyncStepTest('step1', session) + node2 = asyncStepTest('step2', session) + node3 = asyncStepTest('step3', session) + node4 = asyncStepTest('step4', session) + node1.produces(prod1) + node2.produces(prod2) + node4.produces(prod4) + node3.produces(prod3) + node3.consumes(prod1) + node3.consumes(prod2) + products = [prod.name for prod in session.produces()] + consumes = [prod.name for prod in session.consumes()] + print(f"PRODUCES : {products}") + print(f"CONSUMES : {consumes}") + results = await session.runInLoop() + print(results) + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + except: + logging.exception(f"Unexpected exception") + finally: + loop.close() + sys.exit(0)