diff --git a/examples/prefect/deploy_flow.py b/examples/prefect/deploy_flow.py new file mode 100644 index 00000000..ca57463f --- /dev/null +++ b/examples/prefect/deploy_flow.py @@ -0,0 +1,63 @@ +"""Example usage of deploy function""" + +from datetime import timedelta +from os import environ + +from prefect import flow +from prefect.artifacts import create_table_artifact + +from osw.utils.workflow import DeployConfig, DeployParam, deploy, tags_str_to_list + +# Set environment variables +environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example" +environ["PREFECT_DEPLOYMENT_DESCRIPTION"] = "Deployment of notify_teams.py" +environ["PREFECT_DEPLOYMENT_VERSION"] = "0.0.1" +environ["PREFECT_DEPLOYMENT_TAGS"] = "osw-python,example-deploy-flow" +environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] = "1" +# environ["PREFECT_DEPLOYMENT_CRON"] = "* * * * *" + + +@flow(log_prints=True) +def example_flow_to_deploy(): + """Example flow to be deployed""" + print(f"Execution of example: {example_flow_to_deploy.__name__}!") + # set example table artifact + create_table_artifact( + key="example-table", + table=[ + {"name": "Alice", "age": 24}, + {"name": "Bob", "age": 25}, + ], + description="Example table artifact", + ) + + +if __name__ == "__main__": + """Deploy the example flow""" + # Example using environment variables + deploy( + DeployParam( + deployments=[ + DeployConfig( + flow=example_flow_to_deploy, + name=environ.get("PREFECT_DEPLOYMENT_NAME"), + description=environ.get("PREFECT_DEPLOYMENT_DESCRIPTION"), + version=environ.get("PREFECT_DEPLOYMENT_VERSION"), + tags=tags_str_to_list(environ.get("PREFECT_DEPLOYMENT_TAGS")), + interval=timedelta( + minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN")) + ), # either interval or cron + # cron=environ.get("PREFECT_DEPLOYMENT_CRON"), + ) + ], + # remove_existing_deployments=True, + ) + ) + + # Clear secret environment variables + del environ["PREFECT_DEPLOYMENT_NAME"] + del environ["PREFECT_DEPLOYMENT_DESCRIPTION"] + del environ["PREFECT_DEPLOYMENT_VERSION"] + del environ["PREFECT_DEPLOYMENT_TAGS"] + del environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] + # del environ["PREFECT_DEPLOYMENT_CRON"] diff --git a/examples/prefect/notify_teams.py b/examples/prefect/notify_teams.py new file mode 100644 index 00000000..3f1fbf88 --- /dev/null +++ b/examples/prefect/notify_teams.py @@ -0,0 +1,43 @@ +"""Example of sending notifications to MS Teams on prefect flow failures""" + +from os import environ + +from prefect import flow +from pydantic import SecretStr + +from osw.utils.workflow import NotifyTeams, NotifyTeamsParam + +# Prerequisite: Set environment variable TEAMS_WEBHOOK_URL +# in CLI: export TEAMS_WEBHOOK_URL="https://prod..." +# in python uncomment below, DO NOT PUSH SECRETS TO GIT + +# environ["TEAMS_WEBHOOK_URL"] = "https://prod..." + + +# Decorator must be configured with on_failure argument +@flow( + # Microsoft Teams notification on failure -> + # on_failure use `notify_teams` function without brackets as list element + on_failure=[ + NotifyTeams( + NotifyTeamsParam( + teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")), + # OPTIONAL, will be empty if no deploment is assigned + deployment_name="osw-python-notify-teams-example", + ) + ).notify_teams + ], + log_prints=True, +) +def example_error_flow(): + """Test flow that always fails""" + + raise ValueError( + "oops! LOREM IPSUM DOLOR SIT AMET CONSECTETUR ADIPISICING ELIT " * 1 + ) + + +if __name__ == "__main__": + example_error_flow() + # Clear secret environment variable + del environ["TEAMS_WEBHOOK_URL"] diff --git a/setup.cfg b/setup.cfg index 5f8ce476..fa544bc9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -111,6 +111,8 @@ dev = # Add here test requirements (semicolon/line-separated) testing = + pytest-asyncio + prefect setuptools pytest pytest-cov @@ -147,6 +149,8 @@ norecursedirs = build .tox testpaths = tests +asyncio_default_fixture_loop_scope = function + # Use pytest markers to select/deselect specific tests # markers = # slow: mark tests as slow (deselect with '-m "not slow"') diff --git a/src/osw/utils/prefect.py b/src/osw/utils/prefect.py deleted file mode 100644 index 005c2c95..00000000 --- a/src/osw/utils/prefect.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Prefect utils as support for OpenSemanticWorld.""" - -from prefect.blocks.notifications import MicrosoftTeamsWebhook -from prefect.client.schemas.objects import FlowRun -from prefect.settings import PREFECT_API_URL -from prefect.states import State -from pydantic import BaseModel, SecretStr - - -class NotifyTeamsParam(BaseModel): - """Parameter set for notifying Microsoft Teams using class NotifyTeams""" - - deployment_name: str - teams_webhook_url: SecretStr - - -class NotifyTeams(NotifyTeamsParam): - """Notify Microsoft Teams channel using a webhook""" - - def __init__(self, notify_teams_param: NotifyTeamsParam): - super().__init__(**notify_teams_param.model_dump()) - - def notify_teams( - self, - flow, - flow_run: FlowRun, - state: State, - ): - - host_url = str(PREFECT_API_URL.value()).replace("/api", "") - - if flow_run.deployment_id is not None: - deployment_url = ( - f"{host_url}/deployments/deployment/{flow_run.deployment_id}" - ) - else: - deployment_url = "" - _flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa - if self.deployment_name == "" or self.deployment_name is None: - _deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa - else: - _deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa - _ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa - if flow_run.tags != []: - _tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n" - else: - _tags = "" - - _message = f"📜 Message:\n\n_`{state.message}`_" - - MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify( - body=(_flow_run + _deployment + _ts + _tags + _message) - ) diff --git a/src/osw/utils/workflow.py b/src/osw/utils/workflow.py new file mode 100644 index 00000000..a9be8dbb --- /dev/null +++ b/src/osw/utils/workflow.py @@ -0,0 +1,210 @@ +"""Prefect utils as support for OpenSemanticWorld.""" + +import asyncio +import re +import sys +from datetime import timedelta +from importlib.metadata import version +from inspect import signature +from typing import Any, Dict, Iterable, List, Optional, Union + +from packaging.specifiers import SpecifierSet +from prefect import Flow, serve +from prefect.blocks.notifications import MicrosoftTeamsWebhook +from prefect.client.schemas.objects import FlowRun +from prefect.settings import PREFECT_API_URL +from prefect.states import State +from pydantic import SecretStr +from pydantic.v1 import BaseModel + + +# ------------------------------ NOTIFICATIONS --------------------- +class NotifyTeamsParam(BaseModel): + """Parameter set for notifying Microsoft Teams using class NotifyTeams""" + + teams_webhook_url: SecretStr + """Microsoft Teams webhook URL containing a secret""" + deployment_name: Optional[str] = None + """Deployment name to be displayed in the notification""" + + # allow arbitrary types for compatibility with pydantic v1 + class Config: + arbitrary_types_allowed = True + + +class NotifyTeams(NotifyTeamsParam): + """Notify Microsoft Teams channel using a webhook""" + + def __init__(self, notify_teams_param: NotifyTeamsParam): + # super().__init__(**notify_teams_param.model_dump()) # pydantic v2 + super().__init__(**notify_teams_param.dict()) # pydantic v1 + + def notify_teams( + self, + flow, + flow_run: FlowRun, + state: State, + ): + + host_url = str(PREFECT_API_URL.value()).replace("/api", "") + + _flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa + + if flow_run.deployment_id is not None: + # Assigned deployment found + deployment_url = ( + f"{host_url}/deployments/deployment/{flow_run.deployment_id}" + ) + if self.deployment_name == "" or self.deployment_name is None: + _deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa + else: + _deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa + else: + # No deployment assigned + _deployment = "🚀 Deployment: _Just flow, no deployment_\n\n" + + _ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa + if flow_run.tags != []: + _tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n" + else: + _tags = "" + + if state.message is None: + _message = "No message provided." + else: + _message = f"📜 Message:\n\n_`{state.message}`_" + + MicrosoftTeamsWebhook( + url=str(self.teams_webhook_url.get_secret_value()) + ).notify(body=(_flow_run + _deployment + _ts + _tags + _message)) + + +# ------------------------------- DEPLOYMENTS ------------------------------- +def tags_str_to_list(tags: str) -> List[str]: + """Remove tags whitespaces, newlines, tabs, empty strings, split comma""" + return list(filter(None, re.sub(r"\s+", "", tags).split(","))) + + +# def filter_arguments(func, args_dict): +# """Filter arguments for a function based on its signature""" +# sig = signature(func) +# valid_params = sig.parameters +# filtered_args = {k: v for k, v in args_dict.items() if k in valid_params} +# return filtered_args + + +def match_func_model_args(func, model: BaseModel) -> dict: + """Match function arguments with model attributes""" + valid_params = set(signature(func).parameters) + # model_attrs = model.model_dump().items() # pydantic v2 + model_attrs = model.dict().items() # pydantic v1 + matched_args = {k: v for k, v in model_attrs if k in valid_params} + return matched_args + + +class DeployConfig(BaseModel): + """Prefect deployment configuration""" + + flow: Flow # to be excluded in `flow.to_deployment()` function + # Union instead of | for compatibility with pydantic v1, python < 3.10 + name: Union[str, None] = None + description: Union[str, None] = None + interval: Union[ + Iterable[Union[int, float, timedelta]], int, float, timedelta, None + ] = None + cron: Union[Iterable[str], str, None] = None + version: Union[str, None] = None + tags: Union[List[str], None] = None + rrule: Union[Iterable[str], str, None] = None + paused: Union[bool, None] = None + is_schedule_active: Union[bool, None] = None + parameters: Union[dict, None] = None + enforce_parameter_schema: bool = False + work_pool_name: Union[str, None] = None + work_queue_name: Union[str, None] = None + job_variables: Union[Dict[str, Any], None] = None + deployment_id: Union[str, None] = None + + class Config: + arbitrary_types_allowed = True + + +class DeployParam(BaseModel): + """Parameter set for deploying flows as deployments""" + + deployments: List[DeployConfig] + """List of deployments to be served""" + # TODO: Implement remove_existing_deployments + remove_existing_deployments: Optional[bool] = False + """Will remove existing deployments of the specified flows/software""" + # TODO: Add parameter for OSW support in next version + + +async def _deploy(param: DeployParam): + """programmatic deployment supported in newer prefect versions + This should become part of osw-python + """ + + deployments = [] + + for deploy_config in param.deployments: + flow: Flow = deploy_config.flow + # Set deployment name if not provided + if deploy_config.name is None or deploy_config.name == "": + deploy_config.name = flow.name + "-deployment" + + # Match valid args of flow.to_deployment and deploy_config + kwargs = match_func_model_args(func=flow.to_deployment, model=deploy_config) + # Set config via matching flow.to_deployment arguments + config = await flow.to_deployment(**kwargs) + await config.apply() # returns the deployment_uuid + + deployments.append(config) + + if version("prefect") in SpecifierSet(">=3.0"): + print(f"prefect version IF: {version('prefect')}") + # return deployments + await serve(*deployments) + else: + print(f"prefect version ELSE: {version('prefect')}") + await serve(*deployments) + + +def deploy(param: DeployParam): + """Function to serve configured flows as deployments by python version.""" + if sys.version_info >= (3, 11): + print(f"python version IF: {sys.version_info}") + # python >= 3.11 + with asyncio.Runner() as runner: + runner.run(_deploy(param)) + else: + # python < 3.11 + print(f"python version ELSE: {sys.version_info}") + asyncio.run(_deploy(param)) + + +# # ------------------------------- TEST ------------------------------- +# from prefect import flow + + +# @flow +# def osw_python_test_flow_to_deploy(): +# """Example flow to be deployed""" +# print(f"Execution of example: {osw_python_test_flow_to_deploy.__name__}!") + + +# if __name__ == "__main__": +# deploy( +# DeployParam( +# deployments=[ +# DeployConfig( +# flow=osw_python_test_flow_to_deploy, +# name="osw-python-deployment-test", +# description="Deployment of osw-python test flow", +# version="0.0.1", +# tags=["osw-python", "example-deploy-flow"], +# ) +# ], +# # remove_existing_deployments=True, +# ) +# ) diff --git a/tests/utils/workflow_test.py b/tests/utils/workflow_test.py new file mode 100644 index 00000000..c99d8e57 --- /dev/null +++ b/tests/utils/workflow_test.py @@ -0,0 +1,94 @@ +"""Test workflows for `osw-python` package""" + +import subprocess +from os import environ + +import pytest +from prefect import flow +from prefect.testing.utilities import prefect_test_harness +from pydantic import SecretStr + +from osw.utils.workflow import ( + DeployConfig, + DeployParam, + NotifyTeams, + NotifyTeamsParam, + _deploy, + deploy, + tags_str_to_list, +) + + +# ------------------------------ NOTIFICATIONS --------------------- +@flow( + # Microsoft Teams notification on completion for testing + # Notification only if env var + on_completion=[ + NotifyTeams( + NotifyTeamsParam( + teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")), + # OPTIONAL, will be empty if no deploment is assigned + deployment_name="osw-python-notify-teams-test", + ) + ).notify_teams + ], + log_prints=True, +) +@flow +def osw_python_teams_notify_test_flow(): + """Notify Microsoft Teams channel using a webhook""" + return 42 + + +def test_notify_teams(): + """Test of flow to notify Microsoft Teams channel using a webhook""" + with prefect_test_harness(): + test_flow_run = osw_python_teams_notify_test_flow() + assert test_flow_run == 42 + + +# ------------------------------- DEPLOYMENTS ------------------------------- +def test_tags_str_to_list(tags="osw-python,example-deploy-flow"): + """Test of conversion of tags string to list""" + assert tags_str_to_list("osw-python,example-deploy-flow") == [ + "osw-python", + "example-deploy-flow", + ] + + +@flow +def osw_python_test_flow_to_deploy(): + """Example flow to be deployed""" + print(f"Execution of example: {osw_python_test_flow_to_deploy.__name__}!") + + +deploy_param = DeployParam( + deployments=[ + DeployConfig( + flow=osw_python_test_flow_to_deploy, + name="osw-python-deployment-test", + description="Deployment of osw-python test flow", + version="0.0.1", + tags=["osw-python", "example-deploy-flow"], + ) + ], + # remove_existing_deployments=True, +) + + +@pytest.mark.asyncio +async def test_deploy_serve(): + """Test of deployment of example flow""" + with prefect_test_harness(): + _deploy(param=deploy_param) + + +@pytest.mark.skip(reason="Not deployable in test environment") +def test_deploy_runner(): + """Test of deployment of example flow""" + with prefect_test_harness(): + # test environment shell: + command = "prefect config set PREFECT_API_URL=http://127.0.0.1:8443/api" + subprocess.run(command, shell=True) + # Not deployable in local test environment + deploy(param=deploy_param)