From db87dcc29cfac37a43aedf545c4ad0fb553ea57b Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 31 Mar 2025 15:35:20 -0500 Subject: [PATCH 01/14] converting async activities to sync: first chunk --- hello/hello_activity.py | 7 +- hello/hello_activity_async.py | 71 +++++++++++++++++++ hello/hello_activity_choice.py | 10 +-- ...hreaded.py => hello_activity_heartbeat.py} | 11 ++- hello/hello_activity_retry.py | 4 +- hello/hello_cron.py | 4 +- hello/hello_exception.py | 4 +- hello/hello_mtls.py | 4 +- 8 files changed, 100 insertions(+), 15 deletions(-) create mode 100644 hello/hello_activity_async.py rename hello/{hello_activity_threaded.py => hello_activity_heartbeat.py} (85%) diff --git a/hello/hello_activity.py b/hello/hello_activity.py index 9801ee1b..b6d7f7f6 100644 --- a/hello/hello_activity.py +++ b/hello/hello_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -18,7 +19,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: activity.logger.info("Running activity with parameter %s" % input) return f"{input.greeting}, {input.name}!" @@ -50,6 +51,10 @@ async def main(): task_queue="hello-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + # Synchronous activities require some kind of executor; + # a thread pool executor is recommended. + # This same thread pool could be passed to multiple workers if desired. + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_activity_async.py b/hello/hello_activity_async.py new file mode 100644 index 00000000..d7d9b0f0 --- /dev/null +++ b/hello/hello_activity_async.py @@ -0,0 +1,71 @@ +import asyncio +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +# While we could use multiple parameters in the activity, Temporal strongly +# encourages using a single dataclass instead which can have fields added to it +# in a backwards-compatible way. +@dataclass +class ComposeGreetingInput: + greeting: str + name: str + + +# Basic activity that logs and does string concatenation +@activity.defn +async def compose_greeting(input: ComposeGreetingInput) -> str: + activity.logger.info("Running activity with parameter %s" % input) + return f"{input.greeting}, {input.name}!" + + +# Basic workflow that logs and invokes an activity +@workflow.defn +class GreetingWorkflow: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running workflow with parameter %s" % name) + return await workflow.execute_activity( + compose_greeting, + ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) + + +async def main(): + # Uncomment the lines below to see logging output + # import logging + # logging.basicConfig(level=logging.INFO) + + # Start client + client = await Client.connect("localhost:7233") + + # Run a worker for the workflow + async with Worker( + client, + task_queue="hello-activity-task-queue", + workflows=[GreetingWorkflow], + activities=[compose_greeting], + # If the worker is running async activities, you don't need + # to supply an activity executor because they run in + # the event loop of the worker. + ): + + # While the worker is running, use the client to run the workflow and + # print out its result. Note, in many production setups, the client + # would be in a completely separate process from the worker. + result = await client.execute_workflow( + GreetingWorkflow.run, + "World", + id="hello-activity-workflow-id", + task_queue="hello-activity-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hello/hello_activity_choice.py b/hello/hello_activity_choice.py index 6d15af53..7d01b019 100644 --- a/hello/hello_activity_choice.py +++ b/hello/hello_activity_choice.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from enum import IntEnum @@ -12,22 +13,22 @@ @activity.defn -async def order_apples(amount: int) -> str: +def order_apples(amount: int) -> str: return f"Ordered {amount} Apples..." @activity.defn -async def order_bananas(amount: int) -> str: +def order_bananas(amount: int) -> str: return f"Ordered {amount} Bananas..." @activity.defn -async def order_cherries(amount: int) -> str: +def order_cherries(amount: int) -> str: return f"Ordered {amount} Cherries..." @activity.defn -async def order_oranges(amount: int) -> str: +def order_oranges(amount: int) -> str: return f"Ordered {amount} Oranges..." @@ -88,6 +89,7 @@ async def main(): task_queue="hello-activity-choice-task-queue", workflows=[PurchaseFruitsWorkflow], activities=[order_apples, order_bananas, order_cherries, order_oranges], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_activity_threaded.py b/hello/hello_activity_heartbeat.py similarity index 85% rename from hello/hello_activity_threaded.py rename to hello/hello_activity_heartbeat.py index e7bf8344..cb61f31f 100644 --- a/hello/hello_activity_threaded.py +++ b/hello/hello_activity_heartbeat.py @@ -1,5 +1,4 @@ import asyncio -import threading import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass @@ -21,7 +20,7 @@ def compose_greeting(input: ComposeGreetingInput) -> str: # We'll wait for 3 seconds, heartbeating in between (like all long-running # activities should do), then return the greeting for _ in range(0, 3): - print(f"Heartbeating activity on thread {threading.get_ident()}") + print(f"Heartbeating activity") activity.heartbeat() time.sleep(1) return f"{input.greeting}, {input.name}!" @@ -47,7 +46,7 @@ async def main(): # Run a worker for the workflow async with Worker( client, - task_queue="hello-activity-threaded-task-queue", + task_queue="hello-activity-heartbeating-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], # Synchronous activities are not allowed unless we provide some kind of @@ -62,10 +61,10 @@ async def main(): result = await client.execute_workflow( GreetingWorkflow.run, "World", - id="hello-activity-threaded-workflow-id", - task_queue="hello-activity-threaded-task-queue", + id="hello-activity-heartbeating-workflow-id", + task_queue="hello-activity-heartbeating-task-queue", ) - print(f"Result on thread {threading.get_ident()}: {result}") + print(f"Result: {result}") if __name__ == "__main__": diff --git a/hello/hello_activity_retry.py b/hello/hello_activity_retry.py index 233f9613..f1acd529 100644 --- a/hello/hello_activity_retry.py +++ b/hello/hello_activity_retry.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -15,7 +16,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: print(f"Invoking activity, attempt number {activity.info().attempt}") # Fail the first 3 attempts, succeed the 4th if activity.info().attempt < 4: @@ -52,6 +53,7 @@ async def main(): task_queue="hello-activity-retry-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_cron.py b/hello/hello_cron.py index 68b26099..dbb5cba6 100644 --- a/hello/hello_cron.py +++ b/hello/hello_cron.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -14,7 +15,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -40,6 +41,7 @@ async def main(): task_queue="hello-cron-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): print("Running workflow once a minute") diff --git a/hello/hello_exception.py b/hello/hello_exception.py index bfb198d5..9c35987c 100644 --- a/hello/hello_exception.py +++ b/hello/hello_exception.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor import logging from dataclasses import dataclass from datetime import timedelta @@ -18,7 +19,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> NoReturn: +def compose_greeting(input: ComposeGreetingInput) -> NoReturn: # Always raise exception raise RuntimeError(f"Greeting exception: {input.greeting}, {input.name}!") @@ -46,6 +47,7 @@ async def main(): task_queue="hello-exception-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_mtls.py b/hello/hello_mtls.py index 1ac6f9d8..3ed15354 100644 --- a/hello/hello_mtls.py +++ b/hello/hello_mtls.py @@ -1,5 +1,6 @@ import argparse import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from typing import Optional @@ -18,7 +19,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -79,6 +80,7 @@ async def main(): task_queue="hello-mtls-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and From 31c7d41050551fe825bd17ebb20b687d82910ec0 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 31 Mar 2025 15:45:46 -0500 Subject: [PATCH 02/14] converting async activities to sync: second chunk --- hello/hello_activity.py | 2 +- hello/hello_activity_heartbeat.py | 3 --- hello/hello_parallel_activity.py | 4 +++- hello/hello_patch.py | 4 +++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/hello/hello_activity.py b/hello/hello_activity.py index b6d7f7f6..5e045854 100644 --- a/hello/hello_activity.py +++ b/hello/hello_activity.py @@ -51,7 +51,7 @@ async def main(): task_queue="hello-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], - # Synchronous activities require some kind of executor; + # Non-async activities require some kind of executor; # a thread pool executor is recommended. # This same thread pool could be passed to multiple workers if desired. activity_executor=ThreadPoolExecutor(5), diff --git a/hello/hello_activity_heartbeat.py b/hello/hello_activity_heartbeat.py index cb61f31f..230621d3 100644 --- a/hello/hello_activity_heartbeat.py +++ b/hello/hello_activity_heartbeat.py @@ -49,9 +49,6 @@ async def main(): task_queue="hello-activity-heartbeating-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], - # Synchronous activities are not allowed unless we provide some kind of - # executor. This same thread pool could be passed to multiple workers if - # desired. activity_executor=ThreadPoolExecutor(5), ): diff --git a/hello/hello_parallel_activity.py b/hello/hello_parallel_activity.py index 9de09846..b32b02bb 100644 --- a/hello/hello_parallel_activity.py +++ b/hello/hello_parallel_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from datetime import timedelta from typing import List @@ -8,7 +9,7 @@ @activity.defn -async def say_hello_activity(name: str) -> str: +def say_hello_activity(name: str) -> str: return f"Hello, {name}!" @@ -48,6 +49,7 @@ async def main(): task_queue="hello-parallel-activity-task-queue", workflows=[SayHelloWorkflow], activities=[say_hello_activity], + activity_executor=ThreadPoolExecutor(10), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_patch.py b/hello/hello_patch.py index 0ed19b2f..711d9f7d 100644 --- a/hello/hello_patch.py +++ b/hello/hello_patch.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor import sys from dataclasses import dataclass from datetime import timedelta @@ -19,7 +20,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: activity.logger.info("Running activity with parameter %s" % input) return f"{input.greeting}, {input.name}!" @@ -123,6 +124,7 @@ async def main(): task_queue="hello-patch-task-queue", workflows=[workflow_class], # type: ignore activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): try: result = await client.execute_workflow( From c5b95657675d5eb64dbfa92563ac94928abb65e6 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 31 Mar 2025 16:52:13 -0500 Subject: [PATCH 03/14] converting async activities to sync: third chunk --- hello/hello_cancellation.py | 10 +++++++--- hello/hello_local_activity.py | 4 +++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/hello/hello_cancellation.py b/hello/hello_cancellation.py index 3467893c..09a35daa 100644 --- a/hello/hello_cancellation.py +++ b/hello/hello_cancellation.py @@ -1,23 +1,26 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor +import time import traceback from datetime import timedelta from typing import NoReturn from temporalio import activity, workflow +from temporalio.exceptions import CancelledError from temporalio.client import Client, WorkflowFailureError from temporalio.worker import Worker @activity.defn -async def never_complete_activity() -> NoReturn: +def never_complete_activity() -> NoReturn: # All long-running activities should heartbeat. Heartbeat is how # cancellation is delivered from the server. try: while True: print("Heartbeating activity") activity.heartbeat() - await asyncio.sleep(1) - except asyncio.CancelledError: + time.sleep(1) + except CancelledError: print("Activity cancelled") raise @@ -56,6 +59,7 @@ async def main(): task_queue="hello-cancellation-task-queue", workflows=[CancellationWorkflow], activities=[never_complete_activity, cleanup_activity], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to start the workflow. diff --git a/hello/hello_local_activity.py b/hello/hello_local_activity.py index 08c1d9f2..374c29c5 100644 --- a/hello/hello_local_activity.py +++ b/hello/hello_local_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -14,7 +15,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -39,6 +40,7 @@ async def main(): task_queue="hello-local-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and From e3741e0da9df4ab820c5c2f341a6a08afa3f6d72 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 1 Apr 2025 16:33:36 -0500 Subject: [PATCH 04/14] fixed small async straggler --- hello/hello_cancellation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hello/hello_cancellation.py b/hello/hello_cancellation.py index 09a35daa..50b45e21 100644 --- a/hello/hello_cancellation.py +++ b/hello/hello_cancellation.py @@ -26,7 +26,7 @@ def never_complete_activity() -> NoReturn: @activity.defn -async def cleanup_activity() -> None: +def cleanup_activity() -> None: print("Executing cleanup activity") From b36747893d384619c46f4632782d0a06657accf3 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 2 Apr 2025 11:00:44 -0500 Subject: [PATCH 05/14] converting async activities to sync: async completion --- hello/hello_async_activity_completion.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index 10aa89df..966790ba 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -1,6 +1,9 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta +from threading import Thread +import time from temporalio import activity, workflow from temporalio.client import Client @@ -18,7 +21,7 @@ def __init__(self, client: Client) -> None: self.client = client @activity.defn - async def compose_greeting(self, input: ComposeGreetingInput) -> str: + def compose_greeting(self, input: ComposeGreetingInput) -> str: # Schedule a task to complete this asynchronously. This could be done in # a completely different process or system. print("Completing activity asynchronously") @@ -27,15 +30,16 @@ async def compose_greeting(self, input: ComposeGreetingInput) -> str: # So we store the tasks ourselves. # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks, # https://bugs.python.org/issue21163 and others. - _ = asyncio.create_task( - self.complete_greeting(activity.info().task_token, input) - ) + Thread( + target=self.complete_greeting, + args=(activity.info().task_token, input), + ).start() # Raise the complete-async error which will complete this function but # does not consider the activity complete from the workflow perspective activity.raise_complete_async() - async def complete_greeting( + def complete_greeting( self, task_token: bytes, input: ComposeGreetingInput ) -> None: # Let's wait three seconds, heartbeating each second. Note, heartbeating @@ -45,11 +49,11 @@ async def complete_greeting( handle = self.client.get_async_activity_handle(task_token=task_token) for _ in range(0, 3): print("Waiting one second...") - await handle.heartbeat() - await asyncio.sleep(1) + asyncio.run(handle.heartbeat()) + time.sleep(1) # Complete using the handle - await handle.complete(f"{input.greeting}, {input.name}!") + asyncio.run(handle.complete(f"{input.greeting}, {input.name}!")) @workflow.defn @@ -77,6 +81,7 @@ async def main(): task_queue="hello-async-activity-completion-task-queue", workflows=[GreetingWorkflow], activities=[composer.compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and From 45d78a0446808c430e4dd7050ab8f1cccd70f69d Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 2 Apr 2025 11:04:28 -0500 Subject: [PATCH 06/14] reviewed my own code --- hello/hello_activity.py | 2 +- hello/hello_activity_async.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hello/hello_activity.py b/hello/hello_activity.py index 5e045854..13b5fcbb 100644 --- a/hello/hello_activity.py +++ b/hello/hello_activity.py @@ -51,7 +51,7 @@ async def main(): task_queue="hello-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], - # Non-async activities require some kind of executor; + # Non-async activities require an executor; # a thread pool executor is recommended. # This same thread pool could be passed to multiple workers if desired. activity_executor=ThreadPoolExecutor(5), diff --git a/hello/hello_activity_async.py b/hello/hello_activity_async.py index d7d9b0f0..fd14a2cf 100644 --- a/hello/hello_activity_async.py +++ b/hello/hello_activity_async.py @@ -50,9 +50,9 @@ async def main(): task_queue="hello-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], - # If the worker is running async activities, you don't need + # If the worker is only running async activities, you don't need # to supply an activity executor because they run in - # the event loop of the worker. + # the worker's event loop. ): # While the worker is running, use the client to run the workflow and From b19154620f375f3af824a56d8357f1056e105911 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 2 Apr 2025 11:18:10 -0500 Subject: [PATCH 07/14] fix lints --- hello/hello_async_activity_completion.py | 4 +--- hello/hello_cancellation.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index 966790ba..cd42f3ff 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -39,9 +39,7 @@ def compose_greeting(self, input: ComposeGreetingInput) -> str: # does not consider the activity complete from the workflow perspective activity.raise_complete_async() - def complete_greeting( - self, task_token: bytes, input: ComposeGreetingInput - ) -> None: + def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> None: # Let's wait three seconds, heartbeating each second. Note, heartbeating # during async activity completion is done via the client directly. It # is often important to heartbeat so the server can know when an diff --git a/hello/hello_cancellation.py b/hello/hello_cancellation.py index 50b45e21..b846e074 100644 --- a/hello/hello_cancellation.py +++ b/hello/hello_cancellation.py @@ -6,8 +6,8 @@ from typing import NoReturn from temporalio import activity, workflow -from temporalio.exceptions import CancelledError from temporalio.client import Client, WorkflowFailureError +from temporalio.exceptions import CancelledError from temporalio.worker import Worker From 2299e65b514979557661e80d985dc8f868fc179f Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 2 Apr 2025 11:25:04 -0500 Subject: [PATCH 08/14] fixed more lints --- hello/hello_async_activity_completion.py | 2 +- hello/hello_cancellation.py | 2 +- hello/hello_exception.py | 2 +- hello/hello_patch.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index cd42f3ff..b193ef10 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -1,9 +1,9 @@ import asyncio +import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from threading import Thread -import time from temporalio import activity, workflow from temporalio.client import Client diff --git a/hello/hello_cancellation.py b/hello/hello_cancellation.py index b846e074..5bf38a66 100644 --- a/hello/hello_cancellation.py +++ b/hello/hello_cancellation.py @@ -1,7 +1,7 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor import time import traceback +from concurrent.futures import ThreadPoolExecutor from datetime import timedelta from typing import NoReturn diff --git a/hello/hello_exception.py b/hello/hello_exception.py index 9c35987c..628c10c5 100644 --- a/hello/hello_exception.py +++ b/hello/hello_exception.py @@ -1,6 +1,6 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor import logging +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from typing import NoReturn, Optional diff --git a/hello/hello_patch.py b/hello/hello_patch.py index 711d9f7d..e511ad5b 100644 --- a/hello/hello_patch.py +++ b/hello/hello_patch.py @@ -1,6 +1,6 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor import sys +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta From eb0f5b7b5df69aae3598267a960bf6b1dcb6c7ed Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 2 Apr 2025 15:59:30 -0500 Subject: [PATCH 09/14] responded to review comments --- hello/hello_async_activity_completion.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index b193ef10..cf4c70a2 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -17,19 +17,15 @@ class ComposeGreetingInput: class GreetingComposer: - def __init__(self, client: Client) -> None: + def __init__(self, client: Client, loop: asyncio.AbstractEventLoop) -> None: self.client = client + self.loop = loop @activity.defn def compose_greeting(self, input: ComposeGreetingInput) -> str: - # Schedule a task to complete this asynchronously. This could be done in + # Make a thread to complete this externally. This could be done in # a completely different process or system. print("Completing activity asynchronously") - # Tasks stored by asyncio are weak references and therefore can get GC'd - # which can cause warnings like "Task was destroyed but it is pending!". - # So we store the tasks ourselves. - # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks, - # https://bugs.python.org/issue21163 and others. Thread( target=self.complete_greeting, args=(activity.info().task_token, input), @@ -47,11 +43,13 @@ def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> N handle = self.client.get_async_activity_handle(task_token=task_token) for _ in range(0, 3): print("Waiting one second...") - asyncio.run(handle.heartbeat()) + asyncio.run_coroutine_threadsafe(handle.heartbeat(), self.loop) time.sleep(1) # Complete using the handle - asyncio.run(handle.complete(f"{input.greeting}, {input.name}!")) + asyncio.run_coroutine_threadsafe( + handle.complete(f"{input.greeting}, {input.name}!"), self.loop + ) @workflow.defn @@ -72,8 +70,10 @@ async def main(): # Start client client = await Client.connect("localhost:7233") + loop = asyncio.get_event_loop() + # Run a worker for the workflow - composer = GreetingComposer(client) + composer = GreetingComposer(client, loop) async with Worker( client, task_queue="hello-async-activity-completion-task-queue", From 015a1828f94ced66d8494f1df9b8613b58cc73bc Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Thu, 3 Apr 2025 08:14:52 -0500 Subject: [PATCH 10/14] fixed mistake on not awaiting a future --- hello/hello_async_activity_completion.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index cf4c70a2..22fc10f5 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -43,13 +43,13 @@ def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> N handle = self.client.get_async_activity_handle(task_token=task_token) for _ in range(0, 3): print("Waiting one second...") - asyncio.run_coroutine_threadsafe(handle.heartbeat(), self.loop) + asyncio.run_coroutine_threadsafe(handle.heartbeat(), self.loop).result() time.sleep(1) # Complete using the handle asyncio.run_coroutine_threadsafe( handle.complete(f"{input.greeting}, {input.name}!"), self.loop - ) + ).result() @workflow.defn From 394daffc90000bb016d44c3fd1e16029fa3b47cf Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 22 Apr 2025 11:26:56 -0500 Subject: [PATCH 11/14] fixed tests --- tests/custom_metric/workflow_test.py | 2 ++ tests/hello/hello_activity_choice_test.py | 5 ++--- tests/hello/hello_activity_test.py | 2 ++ tests/hello/hello_cancellation_test.py | 2 ++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py index 4e107b79..bac5c07f 100644 --- a/tests/custom_metric/workflow_test.py +++ b/tests/custom_metric/workflow_test.py @@ -1,4 +1,5 @@ import uuid +from concurrent.futures import ThreadPoolExecutor from temporalio import activity from temporalio.client import Client @@ -22,6 +23,7 @@ async def print_message_mock(): task_queue=_TASK_QUEUE, workflows=[StartTwoActivitiesWorkflow], activities=[print_message_mock], + activity_executor=ThreadPoolExecutor(5), ): result = await client.execute_workflow( StartTwoActivitiesWorkflow.run, diff --git a/tests/hello/hello_activity_choice_test.py b/tests/hello/hello_activity_choice_test.py index 236634de..1dadbb88 100644 --- a/tests/hello/hello_activity_choice_test.py +++ b/tests/hello/hello_activity_choice_test.py @@ -20,13 +20,12 @@ ] -@pytest.mark.asyncio @pytest.mark.parametrize( "activity_func, order_amount, expected_result", activity_test_data ) -async def test_order_fruit(activity_func, order_amount, expected_result): +def test_order_fruit(activity_func, order_amount, expected_result): activity_environment = ActivityEnvironment() - result = await activity_environment.run(activity_func, order_amount) + result = activity_environment.run(activity_func, order_amount) assert result == expected_result diff --git a/tests/hello/hello_activity_test.py b/tests/hello/hello_activity_test.py index 4bd8e40e..5bf9c1b8 100644 --- a/tests/hello/hello_activity_test.py +++ b/tests/hello/hello_activity_test.py @@ -1,4 +1,5 @@ import uuid +from concurrent.futures import ThreadPoolExecutor from temporalio import activity from temporalio.client import Client @@ -18,6 +19,7 @@ async def test_execute_workflow(client: Client): client, task_queue=task_queue_name, workflows=[GreetingWorkflow], + activity_executor=ThreadPoolExecutor(5), activities=[compose_greeting], ): assert "Hello, World!" == await client.execute_workflow( diff --git a/tests/hello/hello_cancellation_test.py b/tests/hello/hello_cancellation_test.py index 2d4b946e..b511b50b 100644 --- a/tests/hello/hello_cancellation_test.py +++ b/tests/hello/hello_cancellation_test.py @@ -1,5 +1,6 @@ import asyncio import uuid +from concurrent.futures import ThreadPoolExecutor import pytest from temporalio.client import Client, WorkflowExecutionStatus, WorkflowFailureError @@ -21,6 +22,7 @@ async def test_cancel_workflow(client: Client): task_queue=task_queue_name, workflows=[CancellationWorkflow], activities=[cleanup_activity, never_complete_activity], + activity_executor=ThreadPoolExecutor(5), ): handle = await client.start_workflow( CancellationWorkflow.run, From ce1c741d7f149e846920ad82ce30f5f3cbd385d7 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 6 Jun 2025 10:51:35 -0500 Subject: [PATCH 12/14] revert async activity completion changes --- hello/hello_async_activity_completion.py | 39 +++++++++++------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index 22fc10f5..10aa89df 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -1,9 +1,6 @@ import asyncio -import time -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta -from threading import Thread from temporalio import activity, workflow from temporalio.client import Client @@ -17,25 +14,30 @@ class ComposeGreetingInput: class GreetingComposer: - def __init__(self, client: Client, loop: asyncio.AbstractEventLoop) -> None: + def __init__(self, client: Client) -> None: self.client = client - self.loop = loop @activity.defn - def compose_greeting(self, input: ComposeGreetingInput) -> str: - # Make a thread to complete this externally. This could be done in + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # Schedule a task to complete this asynchronously. This could be done in # a completely different process or system. print("Completing activity asynchronously") - Thread( - target=self.complete_greeting, - args=(activity.info().task_token, input), - ).start() + # Tasks stored by asyncio are weak references and therefore can get GC'd + # which can cause warnings like "Task was destroyed but it is pending!". + # So we store the tasks ourselves. + # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks, + # https://bugs.python.org/issue21163 and others. + _ = asyncio.create_task( + self.complete_greeting(activity.info().task_token, input) + ) # Raise the complete-async error which will complete this function but # does not consider the activity complete from the workflow perspective activity.raise_complete_async() - def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> None: + async def complete_greeting( + self, task_token: bytes, input: ComposeGreetingInput + ) -> None: # Let's wait three seconds, heartbeating each second. Note, heartbeating # during async activity completion is done via the client directly. It # is often important to heartbeat so the server can know when an @@ -43,13 +45,11 @@ def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> N handle = self.client.get_async_activity_handle(task_token=task_token) for _ in range(0, 3): print("Waiting one second...") - asyncio.run_coroutine_threadsafe(handle.heartbeat(), self.loop).result() - time.sleep(1) + await handle.heartbeat() + await asyncio.sleep(1) # Complete using the handle - asyncio.run_coroutine_threadsafe( - handle.complete(f"{input.greeting}, {input.name}!"), self.loop - ).result() + await handle.complete(f"{input.greeting}, {input.name}!") @workflow.defn @@ -70,16 +70,13 @@ async def main(): # Start client client = await Client.connect("localhost:7233") - loop = asyncio.get_event_loop() - # Run a worker for the workflow - composer = GreetingComposer(client, loop) + composer = GreetingComposer(client) async with Worker( client, task_queue="hello-async-activity-completion-task-queue", workflows=[GreetingWorkflow], activities=[composer.compose_greeting], - activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and From 46b8a62f3eb76e3dd7f889c5a270514daedc74dc Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 6 Jun 2025 10:57:35 -0500 Subject: [PATCH 13/14] added code owners --- .github/CODEOWNERS | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..7bdb4eca --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,5 @@ +# These owners will be the default owners for everything in +# the repo. Unless a later match takes precedence, +# @temporalio/sdk will be requested for review when +# someone opens a pull request. +* @temporalio/sdk From 67dd2ccf45e388fa183b3e2f5f2af7874c29ec31 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 6 Jun 2025 10:59:58 -0500 Subject: [PATCH 14/14] deleted code owners --- .github/CODEOWNERS | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS deleted file mode 100644 index 7bdb4eca..00000000 --- a/.github/CODEOWNERS +++ /dev/null @@ -1,5 +0,0 @@ -# These owners will be the default owners for everything in -# the repo. Unless a later match takes precedence, -# @temporalio/sdk will be requested for review when -# someone opens a pull request. -* @temporalio/sdk