From 298b5193bc7eed5c9eb449333a2c097e3e4bb3f0 Mon Sep 17 00:00:00 2001 From: Justin Wang Date: Thu, 12 Sep 2019 12:50:06 -0400 Subject: [PATCH 1/5] First commit --- bazaarci/runner/asyncgraph.py | 36 +++++++++++++ bazaarci/runner/asyncproduct.py | 9 ++++ bazaarci/runner/asyncstep.py | 96 +++++++++++++++++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 bazaarci/runner/asyncgraph.py create mode 100644 bazaarci/runner/asyncproduct.py create mode 100644 bazaarci/runner/asyncstep.py diff --git a/bazaarci/runner/asyncgraph.py b/bazaarci/runner/asyncgraph.py new file mode 100644 index 0000000..e0e80c6 --- /dev/null +++ b/bazaarci/runner/asyncgraph.py @@ -0,0 +1,36 @@ +from typing import Optional +import logging + +from bazaarci.runner.node import Node +import asyncio + +class asyncGraph(Node, set): + def __init__(self, name: str, graph: Optional["Graph"] = None): + super().__init__(name, graph) + + def produces(self): + for node in self: + for product in node.produces(): + yield product + + def consumes(self): + for node in self: + for product in node.consumes(): + yield product + +# def start(self): +# [step.start() for step in self] +# +# def wait(self): +# [step.wait() for step in self] + + def run(self): + asyncRuns = [step.run() for step in self] + groupRuns = asyncio.gather(*asyncRuns) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(groupRuns) + except: + logging.exception(f"Unexpected exception") + finally: + loop.close() diff --git a/bazaarci/runner/asyncproduct.py b/bazaarci/runner/asyncproduct.py new file mode 100644 index 0000000..1457449 --- /dev/null +++ b/bazaarci/runner/asyncproduct.py @@ -0,0 +1,9 @@ +from asyncio import Event + +class asyncProduct(Event): + def __init__(self, name): + super().__init__() + self.name = name + + def __str__(self): + return "asyncProduct({})".format(self.name) diff --git a/bazaarci/runner/asyncstep.py b/bazaarci/runner/asyncstep.py new file mode 100644 index 0000000..f5bf70f --- /dev/null +++ b/bazaarci/runner/asyncstep.py @@ -0,0 +1,96 @@ +""" asyncStep +Definition of the Step class and run behavior decorator functions +""" +from functools import reduce, wraps +import asyncio +from asyncio import Event +from typing import Callable, Optional + +from bazaarci.runner.node import Node +from bazaarci.runner.asyncproduct import asyncProduct + + +class asyncStep(Node): + def __init__(self, name, graph: Optional["Graph"] = None, target: Optional[Callable] = None): + super().__init__(name, graph) + self._consumes = set() + self._produces = set() + self.target = target + self.output = None + if self.graph is not None: + self.graph.add(self) + + # NOTE: Should produces and consumes be awaitable? + def produces(self, item: str = None): + if item is None: + return self._produces + # The ability to call item.set() is necessary for outputs. + elif hasattr(item, "set") and callable(item.set): + self._produces.add(item) + + def consumes(self, item: str = None): + if item is None: + return self._consumes + # The ability to call item.wait() is necessary for inputs. + elif hasattr(item, "wait") and callable(item.wait): + self._consumes.add(item) + +# async def start(self): +# self.thread = Thread(target=self.run) +# self.thread.start() + + async def run(self): + [await asyncproduct.wait() for asyncproduct in self.consumes()] + if self.target and asyncio.iscoroutinefunction(self.target): + self.output = await self.target() + [asyncproduct.set() for asyncproduct in self.produces()] + +# async def wait(self): +# if self.thread and self.thread.is_alive(): +# self.thread.join() + + def __str__(self): + return self.name + + def __repr__(self): + return "{}({})".format(self.__class__.__name__, self.name) + +# Was kind of a pain to set up with async. Ignoring this for now +#async def set_run_behavior(class_or_instance, *args): +# """ Build the run function from _run and the +# incoming list of behaviorals +# """ +# run_function = class_or_instance._run +# for wrapper in reversed(args): +# run_function = await wrapper(run_function) +# setattr(class_or_instance, "run", run_function) +# +# +#async def wait_for_producers(func): +# """ Waits on all `Product`s in `self.consumes` before +# calling the function. +# """ +# @wraps(func) +# def wrapped(self): +# [await asyncproduct.wait() for asyncproduct in self.consumes()] +# await func(self) +# return wrapped +# +# +#def skip_if_redundant(func): +# """ Calls the function only if any output `Product`s +# have not been set yet. +# """ +# @wraps(func) +# def wrapped(self): +# # If there are output products and they have all already been set, +# # then this step is not required to run. +# # TODO: await the wait? +# all_set = reduce(lambda x, y: x and y.wait(0), self.produces(), True) +# if len(self.produces()) == 0 or not all_set: +# func(self) +# return wrapped + + +# By default, Step should wait for producers +#set_run_behavior(Step, wait_for_producers) From a407358add84eb7a4530131dd8a7879f198cdce4 Mon Sep 17 00:00:00 2001 From: Justin Wang Date: Thu, 12 Sep 2019 12:50:16 -0400 Subject: [PATCH 2/5] add test --- bazaarci/test.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100755 bazaarci/test.py diff --git a/bazaarci/test.py b/bazaarci/test.py new file mode 100755 index 0000000..ddc7488 --- /dev/null +++ b/bazaarci/test.py @@ -0,0 +1,42 @@ +#!/opt/xsite/cte/tools/python/bin/python3 +import sys +import asyncio +sys.path.append(f"/afs/apd.pok.ibm.com/u/jpwang2/bazaarci/BazaarCI/") +from bazaarci.runner.asyncgraph import asyncGraph +from bazaarci.runner.asyncstep import asyncStep +from bazaarci.runner.asyncproduct import asyncProduct +import random + +class asyncStepTest(asyncStep): + def __init__(self, name, session): + self.name = name + self.session = session + super().__init__( + name=name, + graph=session, + target=self.asyncFunc) + + async def asyncFunc(self): + print(f'{self.name} starting') + await asyncio.sleep(random.random()) + print(f'{self.name} ending') + return self.name + +def main(): + session = asyncGraph('asyncTest') + prod1 = asyncProduct('p1') + prod2 = asyncProduct('p2') + node1 = asyncStepTest('step1', session) + node2 = asyncStepTest('step2', session) + node3 = asyncStepTest('step3', session) + node1.produces(prod1) + node2.produces(prod2) + node3.consumes(prod1) + node3.consumes(prod2) + session.run() + results = [step.output for step in session] + print(results) + +if __name__ == "__main__": + rc = main() + sys.exit(rc) From 2a10ea96f81e488d5161055229f7749f626d323b Mon Sep 17 00:00:00 2001 From: Justin Wang Date: Mon, 16 Sep 2019 13:25:45 -0400 Subject: [PATCH 3/5] remove the afs referencde to my local clone --- bazaarci/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bazaarci/test.py b/bazaarci/test.py index ddc7488..d8fecc8 100755 --- a/bazaarci/test.py +++ b/bazaarci/test.py @@ -1,7 +1,7 @@ #!/opt/xsite/cte/tools/python/bin/python3 import sys import asyncio -sys.path.append(f"/afs/apd.pok.ibm.com/u/jpwang2/bazaarci/BazaarCI/") +sys.path.append(f"..") from bazaarci.runner.asyncgraph import asyncGraph from bazaarci.runner.asyncstep import asyncStep from bazaarci.runner.asyncproduct import asyncProduct From e103fb7b86bd4ce4156c3e48a3f02cfd850558b2 Mon Sep 17 00:00:00 2001 From: Justin Wang Date: Fri, 11 Oct 2019 16:12:54 -0400 Subject: [PATCH 4/5] add runInLoop func to call from an existing event loop --- bazaarci/runner/asyncgraph.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bazaarci/runner/asyncgraph.py b/bazaarci/runner/asyncgraph.py index e0e80c6..174841b 100644 --- a/bazaarci/runner/asyncgraph.py +++ b/bazaarci/runner/asyncgraph.py @@ -24,6 +24,12 @@ def consumes(self): # def wait(self): # [step.wait() for step in self] + async def runInLoop(self): + '''Run the graph if it's already being called in an event loop''' + asyncRuns = [step.run() for step in self] + groupRuns = asyncio.gather(*asyncRuns) + + def run(self): asyncRuns = [step.run() for step in self] groupRuns = asyncio.gather(*asyncRuns) From 1e2200da706ddfab6c0e1860f18a114641555948 Mon Sep 17 00:00:00 2001 From: Justin Wang Date: Mon, 14 Oct 2019 19:57:12 -0400 Subject: [PATCH 5/5] Update test to use runInLoop and update asyncio gather to do the correct thing --- bazaarci/runner/asyncgraph.py | 5 +++-- bazaarci/test.py | 25 ++++++++++++++++++++----- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/bazaarci/runner/asyncgraph.py b/bazaarci/runner/asyncgraph.py index 174841b..f3c35a9 100644 --- a/bazaarci/runner/asyncgraph.py +++ b/bazaarci/runner/asyncgraph.py @@ -27,8 +27,9 @@ def consumes(self): async def runInLoop(self): '''Run the graph if it's already being called in an event loop''' asyncRuns = [step.run() for step in self] - groupRuns = asyncio.gather(*asyncRuns) - + groupRuns = await asyncio.gather(*asyncRuns) + results = [step.output for step in self] + return results def run(self): asyncRuns = [step.run() for step in self] diff --git a/bazaarci/test.py b/bazaarci/test.py index d8fecc8..c64637a 100755 --- a/bazaarci/test.py +++ b/bazaarci/test.py @@ -6,6 +6,7 @@ from bazaarci.runner.asyncstep import asyncStep from bazaarci.runner.asyncproduct import asyncProduct import random +import logging class asyncStepTest(asyncStep): def __init__(self, name, session): @@ -22,21 +23,35 @@ async def asyncFunc(self): print(f'{self.name} ending') return self.name -def main(): +async def main(): session = asyncGraph('asyncTest') prod1 = asyncProduct('p1') prod2 = asyncProduct('p2') + prod3 = asyncProduct('p3') + prod4 = asyncProduct('p4') node1 = asyncStepTest('step1', session) node2 = asyncStepTest('step2', session) node3 = asyncStepTest('step3', session) + node4 = asyncStepTest('step4', session) node1.produces(prod1) node2.produces(prod2) + node4.produces(prod4) + node3.produces(prod3) node3.consumes(prod1) node3.consumes(prod2) - session.run() - results = [step.output for step in session] + products = [prod.name for prod in session.produces()] + consumes = [prod.name for prod in session.consumes()] + print(f"PRODUCES : {products}") + print(f"CONSUMES : {consumes}") + results = await session.runInLoop() print(results) if __name__ == "__main__": - rc = main() - sys.exit(rc) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + except: + logging.exception(f"Unexpected exception") + finally: + loop.close() + sys.exit(0)