diff --git a/README.md b/README.md index 5cffefce..15ed5939 100644 --- a/README.md +++ b/README.md @@ -62,13 +62,14 @@ Some examples require extra dependencies. See each sample's directory for specif * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [gevent_async](gevent_async) - Combine gevent and Temporal. * [langchain](langchain) - Orchestrate workflows for LangChain. -* [message-passing introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. +* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. +* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals. +* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. -* [safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. diff --git a/message_passing/update_with_start/lazy_initialization/README.md b/message_passing/update_with_start/lazy_initialization/README.md new file mode 100644 index 00000000..b9f6679a --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/README.md @@ -0,0 +1,18 @@ +# Update With Start: Lazy init + +This sample illustrates the use of update-with-start to send Updates to a Workflow, starting the Workflow if +it is not running yet ("lazy init"). The Workflow represents a Shopping Cart in an e-commerce application, and +update-with-start is used to add items to the cart, receiving back the updated cart subtotal. + +To run, first see the main [README.md](../../../README.md) for prerequisites. + +Then run the following from this directory: + + poetry run python worker.py + +Then, in another terminal: + + poetry run python starter.py + +This will start a worker to run your workflow and activities, then simulate a backend application receiving +requests to add items to a shopping cart, before finalizing the order. diff --git a/message_passing/update_with_start/lazy_initialization/__init__.py b/message_passing/update_with_start/lazy_initialization/__init__.py new file mode 100644 index 00000000..ac3632ee --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "update-with-start-lazy-initialization" diff --git a/message_passing/update_with_start/lazy_initialization/activities.py b/message_passing/update_with_start/lazy_initialization/activities.py new file mode 100644 index 00000000..fa730e47 --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/activities.py @@ -0,0 +1,20 @@ +import asyncio +from dataclasses import dataclass +from typing import Optional + +from temporalio import activity + + +@dataclass +class ShoppingCartItem: + sku: str + quantity: int + + +@activity.defn +async def get_price(item: ShoppingCartItem) -> Optional[int]: + await asyncio.sleep(0.1) + price = None if item.sku == "sku-456" else 599 + if price is None: + return None + return price * item.quantity diff --git a/message_passing/update_with_start/lazy_initialization/starter.py b/message_passing/update_with_start/lazy_initialization/starter.py new file mode 100644 index 00000000..b3274f9d --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/starter.py @@ -0,0 +1,78 @@ +import asyncio +import uuid +from typing import Optional, Tuple + +from temporalio import common +from temporalio.client import ( + Client, + WithStartWorkflowOperation, + WorkflowHandle, + WorkflowUpdateFailedError, +) +from temporalio.exceptions import ApplicationError + +from message_passing.update_with_start.lazy_initialization import TASK_QUEUE +from message_passing.update_with_start.lazy_initialization.workflows import ( + ShoppingCartItem, + ShoppingCartWorkflow, +) + + +async def handle_add_item_request( + session_id: str, item_id: str, quantity: int, temporal_client: Client +) -> Tuple[Optional[int], WorkflowHandle]: + """ + Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is + available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start + time and is shared by all request handlers. + + A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to + add an item to the shopping cart, creating the cart if it doesn't already exist. + + Note that the workflow handle is available, even if the Update fails. + """ + cart_id = f"cart-{session_id}" + start_op = WithStartWorkflowOperation( + ShoppingCartWorkflow.run, + id=cart_id, + id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING, + task_queue=TASK_QUEUE, + ) + try: + price = await temporal_client.execute_update_with_start_workflow( + ShoppingCartWorkflow.add_item, + ShoppingCartItem(sku=item_id, quantity=quantity), + start_workflow_operation=start_op, + ) + except WorkflowUpdateFailedError as err: + if ( + isinstance(err.cause, ApplicationError) + and err.cause.type == "ItemUnavailableError" + ): + price = None + else: + raise err + + workflow_handle = await start_op.workflow_handle() + + return price, workflow_handle + + +async def main(): + print("🛒") + session_id = f"session-{uuid.uuid4()}" + temporal_client = await Client.connect("localhost:7233") + subtotal_1, _ = await handle_add_item_request( + session_id, "sku-123", 1, temporal_client + ) + subtotal_2, wf_handle = await handle_add_item_request( + session_id, "sku-456", 1, temporal_client + ) + print(f"subtotals were, {[subtotal_1, subtotal_2]}") + await wf_handle.signal(ShoppingCartWorkflow.checkout) + final_order = await wf_handle.result() + print(f"final order: {final_order}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/message_passing/update_with_start/lazy_initialization/worker.py b/message_passing/update_with_start/lazy_initialization/worker.py new file mode 100644 index 00000000..1964b43e --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/worker.py @@ -0,0 +1,35 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.update_with_start.lazy_initialization import TASK_QUEUE, workflows +from message_passing.update_with_start.lazy_initialization.activities import get_price + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[workflows.ShoppingCartWorkflow], + activities=[get_price], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/message_passing/update_with_start/lazy_initialization/workflows.py b/message_passing/update_with_start/lazy_initialization/workflows.py new file mode 100644 index 00000000..49964ff2 --- /dev/null +++ b/message_passing/update_with_start/lazy_initialization/workflows.py @@ -0,0 +1,60 @@ +from dataclasses import dataclass +from datetime import timedelta +from typing import List, Tuple + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +with workflow.unsafe.imports_passed_through(): + from message_passing.update_with_start.lazy_initialization.activities import ( + ShoppingCartItem, + get_price, + ) + + +@dataclass +class FinalizedOrder: + id: str + items: List[Tuple[ShoppingCartItem, int]] + total: int + + +@workflow.defn +class ShoppingCartWorkflow: + def __init__(self): + self.items: List[Tuple[ShoppingCartItem, int]] = [] + self.order_submitted = False + + @workflow.run + async def run(self) -> FinalizedOrder: + await workflow.wait_condition( + lambda: workflow.all_handlers_finished() and self.order_submitted + ) + return FinalizedOrder( + id=workflow.info().workflow_id, + items=self.items, + total=sum(price for _, price in self.items), + ) + + @workflow.update + async def add_item(self, item: ShoppingCartItem) -> int: + price = await workflow.execute_activity( + get_price, item, start_to_close_timeout=timedelta(seconds=10) + ) + if price is None: + raise ApplicationError( + f"Item unavailable: {item}", + type="ItemUnavailableError", + ) + self.items.append((item, price)) + + return sum(price for _, price in self.items) + + @add_item.validator + def validate_add_item(self, item: ShoppingCartItem) -> None: + if self.order_submitted: + raise ApplicationError("Order already submitted") + + @workflow.signal + def checkout(self): + self.order_submitted = True diff --git a/tests/conftest.py b/tests/conftest.py index 95294fb4..e63a059b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,7 +41,12 @@ def event_loop(): async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]: env_type = request.config.getoption("--workflow-environment") if env_type == "local": - env = await WorkflowEnvironment.start_local() + env = await WorkflowEnvironment.start_local( + dev_server_extra_args=[ + "--dynamic-config-value", + "frontend.enableExecuteMultiOperation=true", + ] + ) elif env_type == "time-skipping": env = await WorkflowEnvironment.start_time_skipping() else: diff --git a/tests/message_passing/lazy_initialization/test_lazy_initialization.py b/tests/message_passing/lazy_initialization/test_lazy_initialization.py new file mode 100644 index 00000000..dcf94582 --- /dev/null +++ b/tests/message_passing/lazy_initialization/test_lazy_initialization.py @@ -0,0 +1,60 @@ +import pytest +from temporalio import common +from temporalio.client import Client, WithStartWorkflowOperation +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from message_passing.update_with_start.lazy_initialization.workflows import ( + ShoppingCartItem, + ShoppingCartWorkflow, + get_price, +) + + +async def test_shopping_cart_workflow(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + async with Worker( + client, + task_queue="lazy-initialization-test", + workflows=[ShoppingCartWorkflow], + activities=[get_price], + ): + cart_id = "cart--session-1234" + make_start_op = lambda: WithStartWorkflowOperation( + ShoppingCartWorkflow.run, + id=cart_id, + id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING, + task_queue="lazy-initialization-test", + ) + start_op_1 = make_start_op() + price = await client.execute_update_with_start_workflow( + ShoppingCartWorkflow.add_item, + ShoppingCartItem(sku="item-1", quantity=2), + start_workflow_operation=start_op_1, + ) + + assert price == 1198 + + workflow_handle = await start_op_1.workflow_handle() + + start_op_2 = make_start_op() + price = await client.execute_update_with_start_workflow( + ShoppingCartWorkflow.add_item, + ShoppingCartItem(sku="item-2", quantity=1), + start_workflow_operation=start_op_2, + ) + assert price == 1797 + + workflow_handle = await start_op_2.workflow_handle() + + await workflow_handle.signal(ShoppingCartWorkflow.checkout) + + finalized_order = await workflow_handle.result() + assert finalized_order.items == [ + (ShoppingCartItem(sku="item-1", quantity=2), 1198), + (ShoppingCartItem(sku="item-2", quantity=1), 599), + ] + assert finalized_order.total == 1797