From 460efcc8f76e08a136568f3e6186f24caf6e4bcb Mon Sep 17 00:00:00 2001 From: Andreas Raeder Date: Thu, 20 Feb 2025 14:28:52 +0000 Subject: [PATCH 1/4] feat: add inital deploy and notify teams for prefect including examples --- examples/prefect/deploy_flow.py | 63 ++++++++++++++ examples/prefect/notify_teams.py | 43 ++++++++++ src/osw/utils/prefect.py | 142 +++++++++++++++++++++++++++++-- 3 files changed, 241 insertions(+), 7 deletions(-) create mode 100644 examples/prefect/deploy_flow.py create mode 100644 examples/prefect/notify_teams.py diff --git a/examples/prefect/deploy_flow.py b/examples/prefect/deploy_flow.py new file mode 100644 index 00000000..0528a714 --- /dev/null +++ b/examples/prefect/deploy_flow.py @@ -0,0 +1,63 @@ +"""Example usage of deploy function""" + +from datetime import timedelta +from os import environ + +from prefect import flow +from prefect.artifacts import create_table_artifact + +from osw.utils.prefect import DeployConfig, DeployParam, deploy, tagsStrToList + +# Set environment variables +environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example" +environ["PREFECT_DEPLOYMENT_DESCRIPTION"] = "Deployment of notify_teams.py" +environ["PREFECT_DEPLOYMENT_VERSION"] = "0.0.1" +environ["PREFECT_DEPLOYMENT_TAGS"] = "osw-python,example-deploy-flow" +environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] = "1" +# environ["PREFECT_DEPLOYMENT_CRON"] = "* * * * *" + + +@flow(log_prints=True) +def example_flow_to_deploy(): + """Example flow to be deployed""" + print(f"Execution of example: {example_flow_to_deploy.__name__}!") + # set example table artifact + create_table_artifact( + key="example-table", + table=[ + {"name": "Alice", "age": 24}, + {"name": "Bob", "age": 25}, + ], + description="Example table artifact", + ) + + +if __name__ == "__main__": + """Deploy the example flow""" + # Example using environment variables + deploy( + DeployParam( + deployments=[ + DeployConfig( + flow=example_flow_to_deploy, + name=environ.get("PREFECT_DEPLOYMENT_NAME"), + description=environ.get("PREFECT_DEPLOYMENT_DESCRIPTION"), + version=environ.get("PREFECT_DEPLOYMENT_VERSION"), + tags=tagsStrToList(environ.get("PREFECT_DEPLOYMENT_TAGS")), + interval=timedelta( + minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN")) + ), # either interval or cron + # cron=environ.get("PREFECT_DEPLOYMENT_CRON"), + ) + ], + # remove_existing_deployments=True, + ) + ) + + # Clear secret environment variables + del environ["PREFECT_DEPLOYMENT_NAME"] + del environ["PREFECT_DEPLOYMENT_DESCRIPTION"] + del environ["PREFECT_DEPLOYMENT_VERSION"] + del environ["PREFECT_DEPLOYMENT_TAGS"] + del environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] + # del environ["PREFECT_DEPLOYMENT_CRON"] diff --git a/examples/prefect/notify_teams.py b/examples/prefect/notify_teams.py new file mode 100644 index 00000000..567cb0b9 --- /dev/null +++ b/examples/prefect/notify_teams.py @@ -0,0 +1,43 @@ +"""Example of sending notifications to MS Teams on prefect flow failures""" + +from os import environ + +from prefect import flow +from pydantic import SecretStr + +from osw.utils.prefect import NotifyTeams, NotifyTeamsParam + +# Prerequisite: Set environment variable TEAMS_WEBHOOK_URL +# in CLI: export TEAMS_WEBHOOK_URL="https://prod..." +# in python uncomment below, DO NOT PUSH SECRETS TO GIT + +# environ["TEAMS_WEBHOOK_URL"] = "https://prod..." + + +# Decorator must be configured with on_failure argument +@flow( + # Microsoft Teams notification on failure -> + # on_failure use `notify_teams` function without brackets as list element + on_failure=[ + NotifyTeams( + NotifyTeamsParam( + teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")), + # OPTIONAL, will be empty if no deploment is assigned + deployment_name="osw-python-notify-teams-example", + ) + ).notify_teams + ], + log_prints=True, +) +def example_error_flow(): + """Test flow that always fails""" + + raise ValueError( + "oops! LOREM IPSUM DOLOR SIT AMET CONSECTETUR ADIPISICING ELIT " * 1 + ) + + +if __name__ == "__main__": + example_error_flow() + # Clear secret environment variable + del environ["TEAMS_WEBHOOK_URL"] diff --git a/src/osw/utils/prefect.py b/src/osw/utils/prefect.py index 005c2c95..69379222 100644 --- a/src/osw/utils/prefect.py +++ b/src/osw/utils/prefect.py @@ -1,17 +1,31 @@ """Prefect utils as support for OpenSemanticWorld.""" +import asyncio +import re +import sys +from datetime import timedelta +from importlib.metadata import version +from typing import Iterable, List, Optional + +from packaging.specifiers import SpecifierSet +from prefect import Flow, get_client, serve from prefect.blocks.notifications import MicrosoftTeamsWebhook from prefect.client.schemas.objects import FlowRun from prefect.settings import PREFECT_API_URL from prefect.states import State from pydantic import BaseModel, SecretStr +# from prefect.settings import PREFECT_API_URL + +# ------------------------------ NOTIFICATIONS ------------------------------ class NotifyTeamsParam(BaseModel): """Parameter set for notifying Microsoft Teams using class NotifyTeams""" - deployment_name: str teams_webhook_url: SecretStr + """Microsoft Teams webhook URL containing a secret""" + deployment_name: Optional[str] = None + """Deployment name to be displayed in the notification""" class NotifyTeams(NotifyTeamsParam): @@ -29,17 +43,21 @@ def notify_teams( host_url = str(PREFECT_API_URL.value()).replace("/api", "") + _flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa + if flow_run.deployment_id is not None: + # Assigned deployment found deployment_url = ( f"{host_url}/deployments/deployment/{flow_run.deployment_id}" ) + if self.deployment_name == "" or self.deployment_name is None: + _deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa + else: + _deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa else: - deployment_url = "" - _flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa - if self.deployment_name == "" or self.deployment_name is None: - _deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa - else: - _deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa + # No deployment assigned + _deployment = "🚀 Deployment: _Just flow, no deployment_\n\n" + _ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa if flow_run.tags != []: _tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n" @@ -48,6 +66,116 @@ def notify_teams( _message = f"📜 Message:\n\n_`{state.message}`_" + # # DEBUG + # print(_flow_run + _deployment + _ts + _tags + _message) + # print(f"Teams webhook URL: {self.teams_webhook_url}") + # print(f"Deployment name: {self.deployment_name}") + # print(f"Flow name: {flow.name}") + # print(f"Flow run name: {flow_run.name}") + # print(f"Flow run ID: {flow_run.id}") + MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify( body=(_flow_run + _deployment + _ts + _tags + _message) ) + + +# ------------------------------- DEPLOYMENTS ------------------------------- +def tagsStrToList(tags: str) -> List[str]: + """Remove tags whitespaces, newlines, tabs, empty strings, split comma""" + return list(filter(None, re.sub(r"\s+", "", tags).split(","))) + + +class DeployConfig(BaseModel): + """Prefect deployment configuration""" + + flow: Flow + name: str | None = None + description: str | None = None + interval: Iterable[int | float | timedelta] | int | float | timedelta | None = None + cron: Iterable[str] | str | None = None + version: str | None = None + tags: List[str] | None = None + + # Parameters that could be added in future, see to_deployment function: + # rrule: Iterable[str] | str | None = None + # paused: bool | None = None + # schedules: List[FlexibleScheduleList] | None = None + # schedule: SCHEDULE_TYPES | None = None + # is_schedule_active: bool | None = None + # parameters: dict | None = None + # triggers: List[DeploymentTriggerTypes | TriggerTypes] | None = None + # enforce_parameter_schema: bool = False + # work_pool_name: str | None = None + # work_queue_name: str | None = None + # job_variables: Dict[str, Any] | None = None + # deployment_id: str | None = None + # prefect_api_url: str = PREFECT_API_URL + class Config: + arbitrary_types_allowed = True + + +class DeployParam(BaseModel): + """Parameter set for deploying flows as deployments""" + + deployments: List[DeployConfig] + """List of deployments to be served""" + # TODO: Implement remove_existing_deployments + remove_existing_deployments: Optional[bool] = False + """Will remove existing deployments of the specified flows/software""" + # TODO: Add parameter for OSW support in next version + + +async def _deploy(param: DeployParam): + """programmatic deployment supported in newer prefect versions + This should become part of osw-python + """ + + deployments = [] + + for deploy_config in param.deployments: + flow: Flow = deploy_config.flow + # Set deployment name if not provided + if deploy_config.name is None or deploy_config.name == "": + deploy_config.name = flow.name + "-deployment" + config = await flow.to_deployment( + name=deploy_config.name, + tags=deploy_config.tags, + cron=deploy_config.cron, + interval=deploy_config.interval, + description=deploy_config.description, + version=deploy_config.version, + ) + await config.apply() # returns the deployment_uuid + + deployments.append(config) + + # fetch flow uuid + async with get_client() as client: + response = await client.read_flow_by_name(flow.name) + print(response.json()) + flow_uuid = response.id + print("Flow UUID:", flow_uuid) + + # prefect_domain = ( + # environ.get("PREFECT_API_URL").split("//")[-1].split("/")[0] + # ) # noqa + # print("Prefect domain:", prefect_domain) + # start agent to serve deployment + # await deploy_config.flow.serve(name=deployment_name) + if version("prefect") in SpecifierSet(">=3.0"): + return deployments + else: + await serve(*deployments) + + +def deploy(param: DeployParam): + """Function to serve configured flows as deployments by python version.""" + if sys.version_info >= (3, 11): + # python >= 3.11 + with asyncio.Runner() as runner: + deployments = runner.run(_deploy(param)) + else: + # python < 3.11 + deployments = asyncio.run(_deploy(param)) + if version("prefect") in SpecifierSet(">=3.0"): + serve(*deployments) From 1ab7e0516283d580ebc930e35f60e734d9a773fc Mon Sep 17 00:00:00 2001 From: Andreas Raeder Date: Fri, 21 Feb 2025 10:11:43 +0000 Subject: [PATCH 2/4] fix: rename script file due to prevent import issues, add local test to be moved to tests --- examples/prefect/deploy_flow.py | 6 +- examples/prefect/notify_teams.py | 2 +- src/osw/utils/{prefect.py => workflow.py} | 74 ++++++++++++++--------- 3 files changed, 52 insertions(+), 30 deletions(-) rename src/osw/utils/{prefect.py => workflow.py} (77%) diff --git a/examples/prefect/deploy_flow.py b/examples/prefect/deploy_flow.py index 0528a714..6a890b7c 100644 --- a/examples/prefect/deploy_flow.py +++ b/examples/prefect/deploy_flow.py @@ -6,7 +6,7 @@ from prefect import flow from prefect.artifacts import create_table_artifact -from osw.utils.prefect import DeployConfig, DeployParam, deploy, tagsStrToList +from osw.utils.workflow import DeployConfig, DeployParam, deploy, tagsStrToList # Set environment variables environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example" @@ -45,7 +45,9 @@ def example_flow_to_deploy(): version=environ.get("PREFECT_DEPLOYMENT_VERSION"), tags=tagsStrToList(environ.get("PREFECT_DEPLOYMENT_TAGS")), interval=timedelta( - minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN")) + minutes=int( + environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN") + ) ), # either interval or cron # cron=environ.get("PREFECT_DEPLOYMENT_CRON"), ) diff --git a/examples/prefect/notify_teams.py b/examples/prefect/notify_teams.py index 567cb0b9..3f1fbf88 100644 --- a/examples/prefect/notify_teams.py +++ b/examples/prefect/notify_teams.py @@ -5,7 +5,7 @@ from prefect import flow from pydantic import SecretStr -from osw.utils.prefect import NotifyTeams, NotifyTeamsParam +from osw.utils.workflow import NotifyTeams, NotifyTeamsParam # Prerequisite: Set environment variable TEAMS_WEBHOOK_URL # in CLI: export TEAMS_WEBHOOK_URL="https://prod..." diff --git a/src/osw/utils/prefect.py b/src/osw/utils/workflow.py similarity index 77% rename from src/osw/utils/prefect.py rename to src/osw/utils/workflow.py index 69379222..d2db8240 100644 --- a/src/osw/utils/prefect.py +++ b/src/osw/utils/workflow.py @@ -6,19 +6,27 @@ from datetime import timedelta from importlib.metadata import version from typing import Iterable, List, Optional - from packaging.specifiers import SpecifierSet -from prefect import Flow, get_client, serve +from prefect import Flow, serve from prefect.blocks.notifications import MicrosoftTeamsWebhook from prefect.client.schemas.objects import FlowRun from prefect.settings import PREFECT_API_URL from prefect.states import State from pydantic import BaseModel, SecretStr -# from prefect.settings import PREFECT_API_URL +# ------------------------------ TEST ------------------------------ + + +from prefect import flow + + +@flow +def example_flow_to_deploy(): + """Example flow to be deployed""" + print(f"Execution of example: {example_flow_to_deploy.__name__}!") -# ------------------------------ NOTIFICATIONS ------------------------------ +# ------------------------------ NOTIFICATIONS --------------------- class NotifyTeamsParam(BaseModel): """Parameter set for notifying Microsoft Teams using class NotifyTeams""" @@ -74,9 +82,9 @@ def notify_teams( # print(f"Flow run name: {flow_run.name}") # print(f"Flow run ID: {flow_run.id}") - MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify( - body=(_flow_run + _deployment + _ts + _tags + _message) - ) + MicrosoftTeamsWebhook( + url=self.teams_webhook_url.get_secret_value() + ).notify(body=(_flow_run + _deployment + _ts + _tags + _message)) # ------------------------------- DEPLOYMENTS ------------------------------- @@ -88,10 +96,12 @@ def tagsStrToList(tags: str) -> List[str]: class DeployConfig(BaseModel): """Prefect deployment configuration""" - flow: Flow + flow: Flow # to be excluded in `flow.to_deployment()` function name: str | None = None description: str | None = None - interval: Iterable[int | float | timedelta] | int | float | timedelta | None = None + interval: ( + Iterable[int | float | timedelta] | int | float | timedelta | None + ) = None cron: Iterable[str] | str | None = None version: str | None = None tags: List[str] | None = None @@ -138,6 +148,7 @@ async def _deploy(param: DeployParam): if deploy_config.name is None or deploy_config.name == "": deploy_config.name = flow.name + "-deployment" config = await flow.to_deployment( + # Entpacken und ungleiche Parameter exkludieren (ggf. ext funktion schreiben mit inspect.signature -> fkt + dict input -> dict mit keys der args output) name=deploy_config.name, tags=deploy_config.tags, cron=deploy_config.cron, @@ -149,33 +160,42 @@ async def _deploy(param: DeployParam): deployments.append(config) - # fetch flow uuid - async with get_client() as client: - response = await client.read_flow_by_name(flow.name) - print(response.json()) - flow_uuid = response.id - print("Flow UUID:", flow_uuid) - - # prefect_domain = ( - # environ.get("PREFECT_API_URL").split("//")[-1].split("/")[0] - # ) # noqa - # print("Prefect domain:", prefect_domain) - # start agent to serve deployment - # await deploy_config.flow.serve(name=deployment_name) if version("prefect") in SpecifierSet(">=3.0"): - return deployments + print(f"prefect version IF: {version('prefect')}") + # return deployments + serve(*deployments) else: + print(f"prefect version ELSE: {version('prefect')}") await serve(*deployments) def deploy(param: DeployParam): """Function to serve configured flows as deployments by python version.""" if sys.version_info >= (3, 11): + print(f"python version IF: {sys.version_info}") # python >= 3.11 with asyncio.Runner() as runner: - deployments = runner.run(_deploy(param)) + runner.run(_deploy(param)) else: # python < 3.11 - deployments = asyncio.run(_deploy(param)) - if version("prefect") in SpecifierSet(">=3.0"): - serve(*deployments) + print(f"python version ELSE: {sys.version_info}") + asyncio.run(_deploy(param)) + + +if __name__ == "__main__": + + deploy( + DeployParam( + deployments=[ + DeployConfig( + flow=example_flow_to_deploy, + name="osw-python-deploy-example", + description="Deployment of notify_teams.py", + version="0.0.1", + tags=["osw-python", "example-deploy-flow"], + interval=timedelta(seconds=20), + ) + ], + # remove_existing_deployments=True, + ) + ) From 9381dc32bb47e184e76b26b751aac587d44478b8 Mon Sep 17 00:00:00 2001 From: Andreas Raeder Date: Thu, 27 Feb 2025 17:03:53 +0000 Subject: [PATCH 3/4] add add support for python 3.8 pydantic syntax and tests for prefect workflows --- examples/prefect/deploy_flow.py | 8 +- setup.cfg | 4 + src/osw/utils/workflow.py | 159 +++++++++++++++++--------------- tests/utils/workflow.py | 94 +++++++++++++++++++ 4 files changed, 185 insertions(+), 80 deletions(-) create mode 100644 tests/utils/workflow.py diff --git a/examples/prefect/deploy_flow.py b/examples/prefect/deploy_flow.py index 6a890b7c..ca57463f 100644 --- a/examples/prefect/deploy_flow.py +++ b/examples/prefect/deploy_flow.py @@ -6,7 +6,7 @@ from prefect import flow from prefect.artifacts import create_table_artifact -from osw.utils.workflow import DeployConfig, DeployParam, deploy, tagsStrToList +from osw.utils.workflow import DeployConfig, DeployParam, deploy, tags_str_to_list # Set environment variables environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example" @@ -43,11 +43,9 @@ def example_flow_to_deploy(): name=environ.get("PREFECT_DEPLOYMENT_NAME"), description=environ.get("PREFECT_DEPLOYMENT_DESCRIPTION"), version=environ.get("PREFECT_DEPLOYMENT_VERSION"), - tags=tagsStrToList(environ.get("PREFECT_DEPLOYMENT_TAGS")), + tags=tags_str_to_list(environ.get("PREFECT_DEPLOYMENT_TAGS")), interval=timedelta( - minutes=int( - environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN") - ) + minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN")) ), # either interval or cron # cron=environ.get("PREFECT_DEPLOYMENT_CRON"), ) diff --git a/setup.cfg b/setup.cfg index 5f8ce476..fa544bc9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -111,6 +111,8 @@ dev = # Add here test requirements (semicolon/line-separated) testing = + pytest-asyncio + prefect setuptools pytest pytest-cov @@ -147,6 +149,8 @@ norecursedirs = build .tox testpaths = tests +asyncio_default_fixture_loop_scope = function + # Use pytest markers to select/deselect specific tests # markers = # slow: mark tests as slow (deselect with '-m "not slow"') diff --git a/src/osw/utils/workflow.py b/src/osw/utils/workflow.py index d2db8240..a9be8dbb 100644 --- a/src/osw/utils/workflow.py +++ b/src/osw/utils/workflow.py @@ -5,25 +5,17 @@ import sys from datetime import timedelta from importlib.metadata import version -from typing import Iterable, List, Optional +from inspect import signature +from typing import Any, Dict, Iterable, List, Optional, Union + from packaging.specifiers import SpecifierSet from prefect import Flow, serve from prefect.blocks.notifications import MicrosoftTeamsWebhook from prefect.client.schemas.objects import FlowRun from prefect.settings import PREFECT_API_URL from prefect.states import State -from pydantic import BaseModel, SecretStr - -# ------------------------------ TEST ------------------------------ - - -from prefect import flow - - -@flow -def example_flow_to_deploy(): - """Example flow to be deployed""" - print(f"Execution of example: {example_flow_to_deploy.__name__}!") +from pydantic import SecretStr +from pydantic.v1 import BaseModel # ------------------------------ NOTIFICATIONS --------------------- @@ -35,12 +27,17 @@ class NotifyTeamsParam(BaseModel): deployment_name: Optional[str] = None """Deployment name to be displayed in the notification""" + # allow arbitrary types for compatibility with pydantic v1 + class Config: + arbitrary_types_allowed = True + class NotifyTeams(NotifyTeamsParam): """Notify Microsoft Teams channel using a webhook""" def __init__(self, notify_teams_param: NotifyTeamsParam): - super().__init__(**notify_teams_param.model_dump()) + # super().__init__(**notify_teams_param.model_dump()) # pydantic v2 + super().__init__(**notify_teams_param.dict()) # pydantic v1 def notify_teams( self, @@ -72,54 +69,62 @@ def notify_teams( else: _tags = "" - _message = f"📜 Message:\n\n_`{state.message}`_" - - # # DEBUG - # print(_flow_run + _deployment + _ts + _tags + _message) - # print(f"Teams webhook URL: {self.teams_webhook_url}") - # print(f"Deployment name: {self.deployment_name}") - # print(f"Flow name: {flow.name}") - # print(f"Flow run name: {flow_run.name}") - # print(f"Flow run ID: {flow_run.id}") + if state.message is None: + _message = "No message provided." + else: + _message = f"📜 Message:\n\n_`{state.message}`_" MicrosoftTeamsWebhook( - url=self.teams_webhook_url.get_secret_value() + url=str(self.teams_webhook_url.get_secret_value()) ).notify(body=(_flow_run + _deployment + _ts + _tags + _message)) # ------------------------------- DEPLOYMENTS ------------------------------- -def tagsStrToList(tags: str) -> List[str]: +def tags_str_to_list(tags: str) -> List[str]: """Remove tags whitespaces, newlines, tabs, empty strings, split comma""" return list(filter(None, re.sub(r"\s+", "", tags).split(","))) +# def filter_arguments(func, args_dict): +# """Filter arguments for a function based on its signature""" +# sig = signature(func) +# valid_params = sig.parameters +# filtered_args = {k: v for k, v in args_dict.items() if k in valid_params} +# return filtered_args + + +def match_func_model_args(func, model: BaseModel) -> dict: + """Match function arguments with model attributes""" + valid_params = set(signature(func).parameters) + # model_attrs = model.model_dump().items() # pydantic v2 + model_attrs = model.dict().items() # pydantic v1 + matched_args = {k: v for k, v in model_attrs if k in valid_params} + return matched_args + + class DeployConfig(BaseModel): """Prefect deployment configuration""" flow: Flow # to be excluded in `flow.to_deployment()` function - name: str | None = None - description: str | None = None - interval: ( - Iterable[int | float | timedelta] | int | float | timedelta | None - ) = None - cron: Iterable[str] | str | None = None - version: str | None = None - tags: List[str] | None = None - - # Parameters that could be added in future, see to_deployment function: - # rrule: Iterable[str] | str | None = None - # paused: bool | None = None - # schedules: List[FlexibleScheduleList] | None = None - # schedule: SCHEDULE_TYPES | None = None - # is_schedule_active: bool | None = None - # parameters: dict | None = None - # triggers: List[DeploymentTriggerTypes | TriggerTypes] | None = None - # enforce_parameter_schema: bool = False - # work_pool_name: str | None = None - # work_queue_name: str | None = None - # job_variables: Dict[str, Any] | None = None - # deployment_id: str | None = None - # prefect_api_url: str = PREFECT_API_URL + # Union instead of | for compatibility with pydantic v1, python < 3.10 + name: Union[str, None] = None + description: Union[str, None] = None + interval: Union[ + Iterable[Union[int, float, timedelta]], int, float, timedelta, None + ] = None + cron: Union[Iterable[str], str, None] = None + version: Union[str, None] = None + tags: Union[List[str], None] = None + rrule: Union[Iterable[str], str, None] = None + paused: Union[bool, None] = None + is_schedule_active: Union[bool, None] = None + parameters: Union[dict, None] = None + enforce_parameter_schema: bool = False + work_pool_name: Union[str, None] = None + work_queue_name: Union[str, None] = None + job_variables: Union[Dict[str, Any], None] = None + deployment_id: Union[str, None] = None + class Config: arbitrary_types_allowed = True @@ -147,15 +152,11 @@ async def _deploy(param: DeployParam): # Set deployment name if not provided if deploy_config.name is None or deploy_config.name == "": deploy_config.name = flow.name + "-deployment" - config = await flow.to_deployment( - # Entpacken und ungleiche Parameter exkludieren (ggf. ext funktion schreiben mit inspect.signature -> fkt + dict input -> dict mit keys der args output) - name=deploy_config.name, - tags=deploy_config.tags, - cron=deploy_config.cron, - interval=deploy_config.interval, - description=deploy_config.description, - version=deploy_config.version, - ) + + # Match valid args of flow.to_deployment and deploy_config + kwargs = match_func_model_args(func=flow.to_deployment, model=deploy_config) + # Set config via matching flow.to_deployment arguments + config = await flow.to_deployment(**kwargs) await config.apply() # returns the deployment_uuid deployments.append(config) @@ -163,7 +164,7 @@ async def _deploy(param: DeployParam): if version("prefect") in SpecifierSet(">=3.0"): print(f"prefect version IF: {version('prefect')}") # return deployments - serve(*deployments) + await serve(*deployments) else: print(f"prefect version ELSE: {version('prefect')}") await serve(*deployments) @@ -182,20 +183,28 @@ def deploy(param: DeployParam): asyncio.run(_deploy(param)) -if __name__ == "__main__": - - deploy( - DeployParam( - deployments=[ - DeployConfig( - flow=example_flow_to_deploy, - name="osw-python-deploy-example", - description="Deployment of notify_teams.py", - version="0.0.1", - tags=["osw-python", "example-deploy-flow"], - interval=timedelta(seconds=20), - ) - ], - # remove_existing_deployments=True, - ) - ) +# # ------------------------------- TEST ------------------------------- +# from prefect import flow + + +# @flow +# def osw_python_test_flow_to_deploy(): +# """Example flow to be deployed""" +# print(f"Execution of example: {osw_python_test_flow_to_deploy.__name__}!") + + +# if __name__ == "__main__": +# deploy( +# DeployParam( +# deployments=[ +# DeployConfig( +# flow=osw_python_test_flow_to_deploy, +# name="osw-python-deployment-test", +# description="Deployment of osw-python test flow", +# version="0.0.1", +# tags=["osw-python", "example-deploy-flow"], +# ) +# ], +# # remove_existing_deployments=True, +# ) +# ) diff --git a/tests/utils/workflow.py b/tests/utils/workflow.py new file mode 100644 index 00000000..c99d8e57 --- /dev/null +++ b/tests/utils/workflow.py @@ -0,0 +1,94 @@ +"""Test workflows for `osw-python` package""" + +import subprocess +from os import environ + +import pytest +from prefect import flow +from prefect.testing.utilities import prefect_test_harness +from pydantic import SecretStr + +from osw.utils.workflow import ( + DeployConfig, + DeployParam, + NotifyTeams, + NotifyTeamsParam, + _deploy, + deploy, + tags_str_to_list, +) + + +# ------------------------------ NOTIFICATIONS --------------------- +@flow( + # Microsoft Teams notification on completion for testing + # Notification only if env var + on_completion=[ + NotifyTeams( + NotifyTeamsParam( + teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")), + # OPTIONAL, will be empty if no deploment is assigned + deployment_name="osw-python-notify-teams-test", + ) + ).notify_teams + ], + log_prints=True, +) +@flow +def osw_python_teams_notify_test_flow(): + """Notify Microsoft Teams channel using a webhook""" + return 42 + + +def test_notify_teams(): + """Test of flow to notify Microsoft Teams channel using a webhook""" + with prefect_test_harness(): + test_flow_run = osw_python_teams_notify_test_flow() + assert test_flow_run == 42 + + +# ------------------------------- DEPLOYMENTS ------------------------------- +def test_tags_str_to_list(tags="osw-python,example-deploy-flow"): + """Test of conversion of tags string to list""" + assert tags_str_to_list("osw-python,example-deploy-flow") == [ + "osw-python", + "example-deploy-flow", + ] + + +@flow +def osw_python_test_flow_to_deploy(): + """Example flow to be deployed""" + print(f"Execution of example: {osw_python_test_flow_to_deploy.__name__}!") + + +deploy_param = DeployParam( + deployments=[ + DeployConfig( + flow=osw_python_test_flow_to_deploy, + name="osw-python-deployment-test", + description="Deployment of osw-python test flow", + version="0.0.1", + tags=["osw-python", "example-deploy-flow"], + ) + ], + # remove_existing_deployments=True, +) + + +@pytest.mark.asyncio +async def test_deploy_serve(): + """Test of deployment of example flow""" + with prefect_test_harness(): + _deploy(param=deploy_param) + + +@pytest.mark.skip(reason="Not deployable in test environment") +def test_deploy_runner(): + """Test of deployment of example flow""" + with prefect_test_harness(): + # test environment shell: + command = "prefect config set PREFECT_API_URL=http://127.0.0.1:8443/api" + subprocess.run(command, shell=True) + # Not deployable in local test environment + deploy(param=deploy_param) From cfa7ad7d700dd9c86da5c4cf654938ef75d4d05b Mon Sep 17 00:00:00 2001 From: Andreas Raeder Date: Fri, 28 Feb 2025 07:26:01 +0000 Subject: [PATCH 4/4] fix trigger tests by naming testfile including key test --- tests/utils/{workflow.py => workflow_test.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/utils/{workflow.py => workflow_test.py} (100%) diff --git a/tests/utils/workflow.py b/tests/utils/workflow_test.py similarity index 100% rename from tests/utils/workflow.py rename to tests/utils/workflow_test.py