diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index b89265648..b70ab1832 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -8,7 +8,7 @@ from datetime import datetime, timezone from os import listdir from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any, Callable import aiohttp from aleph_message.models import ( @@ -25,7 +25,6 @@ from pydantic import BaseModel from starlette.responses import JSONResponse -from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.query.filters import MessageFilter @@ -33,9 +32,177 @@ from aleph.sdk.vm.app import AlephApp from aleph.sdk.vm.cache import VmCache +if TYPE_CHECKING: + from aleph.sdk.vm.cache import VmCache + + logger = logging.getLogger(__name__) logger.debug("imports done") +# API Failover code - commented out for testing +ALEPH_API_HOSTS: list[str] = [ + "https://official.aleph.cloud", + "https://api.aleph.im", +] + +DEFAULT_TIMEOUT_SECONDS = 10 + + +class APIFailoverError(Exception): + """Raised when all API endpoints fail.""" + + pass + + +async def _safe_request_with_timeout( + host: str, + request_func: Callable[[str], Any], + timeout_seconds: float, +) -> dict[str, Any]: + """Safely execute a request function with timeout and error handling.""" + try: + result = await asyncio.wait_for( + request_func(host), + timeout=timeout_seconds, + ) + return {"host": host, "result": result} + except asyncio.TimeoutError as e: + logger.warning(f"Request to {host} timed out after {timeout_seconds}s") + return {"host": host, "error": e} + except aiohttp.ClientError as e: + logger.warning(f"Client error connecting to {host}: {e}") + return {"host": host, "error": e} + except Exception as e: + logger.error(f"Unexpected error connecting to {host}: {e}", exc_info=True) + return {"host": host, "error": e} + + +async def try_api_hosts( + api_hosts: list[str], + request_func: Callable[[str], Any], + timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS, + success_check: Callable[[Any], bool] | None = None, +) -> Any: + """Try multiple API hosts in parallel and return the first successful result.""" + if not api_hosts: + raise APIFailoverError("No API hosts provided") + + # Create tasks for all API hosts with individual timeouts + tasks: set[asyncio.Task] = set() + for host in api_hosts: + try: + task = asyncio.create_task( + _safe_request_with_timeout( + host=host, + request_func=request_func, + timeout_seconds=timeout_seconds, + ) + ) + tasks.add(task) + except Exception as e: + logger.warning(f"Failed to create task for host {host}: {e}") + continue + + if not tasks: + raise APIFailoverError("Failed to create any API request tasks") + + errors: list[tuple[str, Exception]] = [] + + try: + # Wait for tasks to complete one by one + while tasks: + done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + result = await task + host = result.get("host", "unknown") + + # Check if this is an error result + if "error" in result: + error = result["error"] + logger.warning(f"API request to {host} failed: {error}") + errors.append((host, error)) + continue + + # Extract the actual result + actual_result = result.get("result") + + # Validate the result if a success check is provided + if success_check and not success_check(actual_result): + logger.warning(f"API request to {host} returned invalid result") + errors.append((host, ValueError("Invalid result from API"))) + continue + + # Success! Cancel remaining tasks and return + logger.info(f"Successfully connected to API host: {host}") + for remaining_task in tasks: + remaining_task.cancel() + try: + await remaining_task + except asyncio.CancelledError: + pass + + return actual_result + + except asyncio.CancelledError: + continue + except Exception as e: + logger.error(f"Unexpected error processing task result: {e}", exc_info=True) + errors.append(("unknown", e)) + continue + + finally: + # Ensure all tasks are cancelled + for task in tasks: + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # If we get here, all hosts failed + error_summary = "; ".join([f"{host}: {str(error)}" for host, error in errors]) + raise APIFailoverError(f"All API hosts failed. Errors: {error_summary}") + + +async def try_url_check( + urls: list[str], + timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS, + socket_family: int | None = None, +) -> dict[str, Any]: + """Check multiple URLs in parallel and return the first successful connection.""" + + async def check_single_url(url: str) -> dict[str, Any]: + timeout = aiohttp.ClientTimeout(total=timeout_seconds) + connector_kwargs = {} + if socket_family is not None: + connector_kwargs["family"] = socket_family + + async with aiohttp.ClientSession( + connector=aiohttp.TCPConnector(**connector_kwargs), + timeout=timeout, + ) as session: + async with session.get(url) as resp: + return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} + + try: + result = await try_api_hosts( + api_hosts=urls, + request_func=check_single_url, + timeout_seconds=timeout_seconds, + success_check=lambda r: r.get("result") is True, + ) + return result + except APIFailoverError as e: + logger.warning(f"All URL checks failed: {e}") + return {"result": False, "reason": str(e)} + except Exception as e: + logger.error(f"Unexpected error in URL check: {e}", exc_info=True) + return {"result": False, "reason": str(e)} + + http_app = FastAPI() app = AlephApp(http_app=http_app) app.add_middleware( @@ -46,22 +213,30 @@ allow_headers=["*"], ) -# Initialize cache on startup event to avoid running loop errors +# Initialize cache as None - will be created during startup event cache: VmCache | None = None - startup_lifespan_executed: bool = False -ALEPH_API_HOSTS: list[str] = [ - "https://official.aleph.cloud", - "https://api.aleph.im", -] +cache = VmCache() @app.on_event("startup") async def startup_event() -> None: - global startup_lifespan_executed, cache + global startup_lifespan_executed startup_lifespan_executed = True - cache = VmCache() + + +def get_cache(): + """Get or create the cache instance on-demand (lazy import + lazy initialization).""" + global cache + if cache is None: + try: + cache = VmCache() + logger.info("Cache initialized on-demand") + except Exception as e: + logger.warning(f"Failed to initialize cache: {e}") + # Don't retry - return None so endpoint can handle it + return cache @app.get("/") @@ -124,40 +299,13 @@ async def environ() -> dict[str, str]: return dict(os.environ) -async def get_aleph_messages(api_host: str, message_filter: MessageFilter): - async with AlephHttpClient(api_server=api_host) as client: - data = await client.get_messages(message_filter=message_filter) - return data.dict() - - @app.get("/messages") async def read_aleph_messages() -> dict[str, MessagesResponse]: """Read data from Aleph using the Aleph Client library.""" - message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(get_aleph_messages(host, message_filter)) for host in ALEPH_API_HOSTS - } - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result.get("messages", None): - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return {"Messages": result} - else: - failures.append(result) - continue - - # No Aleph API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + async with AlephHttpClient() as client: + message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) + data = await client.get_messages(message_filter=message_filter) + return {"Messages": data} @app.get("/dns") @@ -199,107 +347,38 @@ async def ip_address(): @app.get("/ip/4") async def connect_ipv4(): - """Connect to some DNS services using their IPv4 address. - The webserver on that address can return a 404 error, and it is normal, so we accept that response code. - """ ipv4_hosts: list[str] = [ - "https://9.9.9.9", # Quad9 VPN service + "https://9.9.9.9", # Quad9 DNS service + "https://1.1.1.1", # Cloudflare DNS service "https://94.140.14.14", # AdGuard DNS service - "https://208.67.222.222", # OpenDNS service ] - timeout_seconds = 5 - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET, accept_404=True)) - for host in ipv4_hosts - } - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result["result"]: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - failures.append(result) - continue - - # No IPv6 URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return await try_url_check(urls=ipv4_hosts, timeout_seconds=5, socket_family=socket.AF_INET) @app.get("/ip/6") async def connect_ipv6(): - """Connect to some DNS services using their IPv6 address. - The webserver on that address can return a 404 error, and it is normal, so we accept that response code. + """Connect to IPv6 DNS providers using their IPv6 addresses. + Tests IPv6 connectivity by trying multiple providers in parallel. """ ipv6_hosts: list[str] = [ "https://[2620:fe::fe]", # Quad9 DNS service - "https://[2606:4700:4700::1111]", # CloudFlare DNS service - "https://[2620:0:ccc::2]", # OpenDNS service + "https://[2606:4700:4700::1111]", # Cloudflare DNS service ] - timeout_seconds = 5 - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET6, accept_404=True)) - for host in ipv6_hosts - } - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result["result"]: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - failures.append(result) - continue - - # No IPv6 URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return await try_url_check(urls=ipv6_hosts, timeout_seconds=5, socket_family=socket.AF_INET6) -async def check_url( - internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET, accept_404: bool = False -): +async def check_url(internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET): """Check the connectivity of a single URL.""" timeout = aiohttp.ClientTimeout(total=timeout_seconds) tcp_connector = aiohttp.TCPConnector(family=socket_family) async with aiohttp.ClientSession(timeout=timeout, connector=tcp_connector) as session: try: async with session.get(internet_host) as resp: - if 200 <= resp.status < 300: - return {"result": True, "headers": resp.headers, "url": internet_host} - - if resp.status == 404 and accept_404: - return {"result": True, "headers": resp.headers, "url": internet_host} - - reason = f"HTTP Status {resp.status}" - logger.warning(f"Session connection for host {internet_host} failed with status {resp.status}") - return {"result": False, "url": internet_host, "reason": reason} - except (aiohttp.ClientConnectionError, TimeoutError) as e: - reason = f"{type(e).__name__}" - logger.warning(f"Session connection for host {internet_host} failed ({reason})") - return {"result": False, "url": internet_host, "reason": reason} - except Exception as e: - # Catch other errors not related to timeouts like DNS errors, SSL certificates, etc. - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {internet_host}: {e}", exc_info=True) - return {"result": False, "url": internet_host, "reason": reason} + resp.raise_for_status() + return {"result": resp.status, "headers": resp.headers, "url": internet_host} + except (aiohttp.ClientConnectionError, TimeoutError): + logger.warning(f"Session connection for host {internet_host} failed") + return {"result": False, "url": internet_host} @app.get("/internet") @@ -315,8 +394,6 @@ async def read_internet(): # Create a list of tasks to check the URLs in parallel tasks: set[asyncio.Task] = {asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts} - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -328,86 +405,83 @@ async def read_internet(): task.cancel() return result else: - failures.append(result) continue - # No URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + # No URL was reachable + return {"result": False} @app.get("/get_a_message") async def get_a_message(): - """Get a message from the Aleph.im network""" + """Get a message from the Aleph.im network.""" item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_message(host, item_hash)) for host in ALEPH_API_HOSTS} - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result.get("item_hash", None): - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - failures.append(result) - continue - - # No Aleph API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) - -async def get_aleph_message(api_host: str, item_hash: str): - try: + async def fetch_message(api_host: str): + """Fetch a message from a specific API host.""" async with AlephHttpClient(api_server=api_host) as client: message = await client.get_message( item_hash=item_hash, message_type=ProgramMessage, ) return message.dict() - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host}: {e}", exc_info=True) - return {"result": False, "reason": reason} + + try: + result = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=fetch_message, + timeout_seconds=10, + ) + return result + except APIFailoverError as e: + logger.error(f"Failed to fetch message from all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_message") async def post_with_remote_account(): """Post a message on the Aleph.im network using the remote account of the host.""" - failures = [] try: account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") + except aiohttp.client_exceptions.UnixClientConnectorError: + return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"}) + except Exception as e: + logger.error(f"Failed to create remote account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS + async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: + """Post a message to a specific API host.""" + content = { + "date": datetime.now(tz=timezone.utc).isoformat(), + "test": True, + "answer": 42, + "something": "interesting", } + async with AuthenticatedAlephHttpClient(account=account, api_server=api_host) as client: + message, status = await client.create_post( + post_content=content, + post_type="test", + ref=None, + channel="TEST", + inline=True, + storage_engine=StorageEnum.storage, + sync=True, + ) - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() - - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - failures.append(message) - continue - except aiohttp.client_exceptions.UnixClientConnectorError: - failures.append({"error": "Could not connect to the remote account"}) + return message, status - # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=post_message, + timeout_seconds=15, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to post message to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_message_local_account") @@ -415,36 +489,14 @@ async def post_with_local_account(): """Post a message on the Aleph.im network using a local private key.""" from aleph.sdk.chains.ethereum import get_fallback_account - account = get_fallback_account() - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() - - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - failures.append(message) - continue - - # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) - - -async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAccount): - """Post a message on the Aleph.im network using a local or the remote account of the host.""" try: + account = get_fallback_account() + except Exception as e: + logger.error(f"Failed to get fallback account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) + + async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: + """Post a message to a specific API host.""" content = { "date": datetime.now(tz=timezone.utc).isoformat(), "test": True, @@ -452,12 +504,9 @@ async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAcc "something": "interesting", } async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, + account=account, api_server=api_host, allow_unix_sockets=False ) as client: - message: PostMessage - status: MessageStatus - return await client.create_post( + message, status = await client.create_post( post_content=content, post_type="test", ref=None, @@ -466,74 +515,79 @@ async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAcc storage_engine=StorageEnum.storage, sync=True, ) - except aiohttp.client_exceptions.UnixClientConnectorError as e: - reason = f"{type(e).__name__}" - logger.error(f"Connection error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED + return message, status + + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=post_message, + timeout_seconds=15, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to post message to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_file") async def post_a_file(): + """Store a file on the Aleph.im network using a local account.""" from aleph.sdk.chains.ethereum import get_fallback_account + """ OLD account = get_fallback_account() file_path = Path(__file__).absolute() - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(send_store_aleph_message(host, account, file_path)) for host in ALEPH_API_HOSTS - } - - failures = [] + async with AuthenticatedAlephHttpClient(account=account) as client: + message, status = await client.create_store( + file_path=file_path, + ref=None, + channel="TEST", + storage_engine=StorageEnum.storage, + sync=True, + ) - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - failures.append(message) - continue - - # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return {"message": message} + """ + try: + account = get_fallback_account() + except Exception as e: + logger.error(f"Failed to get fallback account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) + file_path = Path(__file__).absolute() -async def send_store_aleph_message(api_host: str, account: ETHAccount, file_path: Path): - """Store a file on the Aleph.im network using a local account.""" - try: - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: - message: StoreMessage - status: MessageStatus - return await client.create_store( + async def store_file(api_host: str) -> tuple[StoreMessage, MessageStatus]: + """Store a file on a specific API host.""" + async with AuthenticatedAlephHttpClient(account=account, api_server=api_host) as client: + message, status = await client.create_store( file_path=file_path, ref=None, channel="TEST", storage_engine=StorageEnum.storage, sync=True, ) - except aiohttp.client_exceptions.UnixClientConnectorError as e: - reason = f"{type(e).__name__}" - logger.error(f"Connection error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED + return message, status + + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=store_file, + timeout_seconds=20, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to store file to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.get("/sign_a_message") @@ -543,7 +597,7 @@ async def sign_a_message(): from aleph.sdk.chains.ethereum import get_fallback_account account = get_fallback_account() - message = {"hello": "world", "chain": "ETH", "type": "POST", "item_hash": "0x000"} + message = {"hello": "world", "chain": "ETH"} signed_message = await account.sign_message(message) return {"message": signed_message} @@ -551,34 +605,38 @@ async def sign_a_message(): @app.get("/cache/get/{key}") async def get_from_cache(key: str): """Get data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) - return await cache.get(key) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.get(key) @app.get("/cache/set/{key}/{value}") async def store_in_cache(key: str, value: str): """Store data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) - return await cache.set(key, value) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.set(key, value) @app.get("/cache/remove/{key}") async def remove_from_cache(key: str): """Store data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) - result = await cache.delete(key) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + result = await cache_instance.delete(key) return result == 1 @app.get("/cache/keys") async def keys_from_cache(pattern: str = "*"): """List keys from the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) - return await cache.keys(pattern) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.keys(pattern) @app.get("/state/increment") @@ -647,30 +705,29 @@ def platform_pip_freeze() -> list[str]: @app.event(filters=filters) async def aleph_event(event) -> dict[str, str]: - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_json(host)) for host in ALEPH_API_HOSTS} - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - status = done.pop().result() - - if status: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return {"result": "Good"} - else: - continue - - return {"result": "Bad"} + """Handle Aleph events.""" + logger.info(f"Received aleph_event: {event}") + async def check_api_info(api_host: str) -> bool: + """Check if API info endpoint is accessible.""" + try: + timeout = aiohttp.ClientTimeout(total=5) + async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(), timeout=timeout) as session: + async with session.get(f"{api_host}/api/v0/info/public.json") as resp: + resp.raise_for_status() + return True + except Exception as e: + logger.warning(f"Failed to check API info at {api_host}: {e}") + return False -async def get_aleph_json(api_host: str) -> bool: try: - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session: - async with session.get(f"{api_host}/api/v0/info/public.json") as resp: - resp.raise_for_status() - return True - except Exception: - return False + await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=check_api_info, + timeout_seconds=5, + success_check=lambda result: result is True, + ) + return {"result": "Good"} + except APIFailoverError as e: + logger.error(f"All API hosts failed for event handling: {e}") + return {"result": "Bad", "error": str(e)} diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index 52b85ee9d..4c8ede1f8 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.5' 'aleph-message~=1.0.5' 'fastapi~=0.120.1' 'protobuf==6.33.0' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") diff --git a/tests/supervisor/test_checkpayment.py b/tests/supervisor/test_checkpayment.py index bf23e1df9..29fc6a2be 100644 --- a/tests/supervisor/test_checkpayment.py +++ b/tests/supervisor/test_checkpayment.py @@ -51,18 +51,22 @@ async def test_enough_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", return_value=True) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", new=mocker.AsyncMock(return_value=True)) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=400, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=400)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) async def compute_required_flow(executions): return 500 * len(executions) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", compute_required_flow) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=compute_required_flow) message = InstanceContent.model_validate(fake_instance_content) hash = "decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca" @@ -105,18 +109,24 @@ async def test_enough_flow_not_community(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", return_value=False) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch( + "aleph.vm.orchestrator.tasks.is_after_community_wallet_start", new=mocker.AsyncMock(return_value=False) + ) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=500, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=500)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) async def compute_required_flow(executions): return 500 * len(executions) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", compute_required_flow) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=compute_required_flow) message = InstanceContent.model_validate(fake_instance_content) hash = "decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca" @@ -150,13 +160,17 @@ async def test_not_enough_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=2, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", return_value=5) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=2)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=mocker.AsyncMock(return_value=5)) message = InstanceContent.model_validate(fake_instance_content) mocker.patch.object(VmExecution, "is_running", new=True) @@ -187,7 +201,6 @@ async def test_not_enough_community_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") - loop = asyncio.get_event_loop() pool = VmPool() mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" @@ -198,9 +211,14 @@ async def get_stream(sender, receiver, chain): return 10 mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=get_stream) - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", return_value=5) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=mocker.AsyncMock(return_value=5)) message = InstanceContent.model_validate(fake_instance_content) mocker.patch.object(VmExecution, "is_running", new=True) diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py index 26f1ca9c3..b7be38833 100644 --- a/tests/supervisor/test_execution.py +++ b/tests/supervisor/test_execution.py @@ -65,8 +65,8 @@ async def test_create_execution(mocker): assert isinstance(vm, AlephFirecrackerProgram) assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=300) + await asyncio.wait_for(execution.stop(), timeout=300) # This test depends on having a vm-connector running on port 4021 @@ -109,8 +109,8 @@ async def test_create_execution_online(vm_hash: ItemHash = None): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) @pytest.fixture() @@ -245,8 +245,8 @@ async def test_create_execution_from_fake_message(fake_message): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) @pytest.mark.asyncio @@ -307,8 +307,8 @@ async def test_create_execution_volume_with_no_name(fake_message): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) # This test depends on having a vm-connector running on port 4021 diff --git a/tests/supervisor/test_instance.py b/tests/supervisor/test_instance.py index 9dd7c4db4..12560b634 100644 --- a/tests/supervisor/test_instance.py +++ b/tests/supervisor/test_instance.py @@ -123,7 +123,7 @@ async def test_create_firecracker_instance(mocker): assert vm.persistent assert vm.enable_networking - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) # firecracker_execution, process = await mock_systemd_manager.enable_and_start(execution.vm_hash) firecracker_execution = mock_systemd_manager.execution assert isinstance(firecracker_execution, MicroVM) @@ -136,5 +136,5 @@ async def test_create_firecracker_instance(mocker): # up and prevent disk corruption await asyncio.sleep(60) firecracker_execution, process = await mock_systemd_manager.stop_and_disable(execution.controller_service) - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) assert firecracker_execution is None diff --git a/tests/supervisor/test_payment.py b/tests/supervisor/test_payment.py index c5cd3369f..8f3c1c545 100644 --- a/tests/supervisor/test_payment.py +++ b/tests/supervisor/test_payment.py @@ -19,7 +19,7 @@ def mock_get_address_balance(mocker): "locked_amount": 4010.008710650127, "credit_balance": 10000, } - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value=fake) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value=fake)) return fake @@ -46,7 +46,7 @@ async def test_fetch_credit_balance_of_address_empty_response( ): """ """ mocker.patch.object(settings, "API_SERVER", "https://fake.aleph.cloud") - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value={}) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value={})) balance = await fetch_credit_balance_of_address("0x555559cd833c1dc1735bee4a7416caaE58Facca") assert balance == Decimal("0") @@ -58,7 +58,7 @@ async def test_fetch_balance_of_address_empty_response( ): """ """ mocker.patch.object(settings, "API_SERVER", "https://fake.aleph.cloud") - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value={}) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value={})) balance = await fetch_balance_of_address("0x555559cd833c1dc1735bee4a7416caaE58Facca") assert balance == Decimal("0") diff --git a/tests/supervisor/test_qemu_instance.py b/tests/supervisor/test_qemu_instance.py index 1b3c56076..1bf9840a1 100644 --- a/tests/supervisor/test_qemu_instance.py +++ b/tests/supervisor/test_qemu_instance.py @@ -114,7 +114,7 @@ async def test_create_qemu_instance(mocker): assert isinstance(vm, AlephQemuInstance) assert vm.vm_id == vm_id - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) qemu_execution, process = await mock_systemd_manager.enable_and_start(execution.controller_service) assert isinstance(qemu_execution, QemuVM) assert qemu_execution.qemu_process is not None @@ -122,7 +122,7 @@ async def test_create_qemu_instance(mocker): await mock_systemd_manager.stop_and_disable(execution.vm_hash) await qemu_execution.qemu_process.wait() assert qemu_execution.qemu_process.returncode is not None - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) settings.LINUX_PATH = original_linux_path @@ -208,14 +208,14 @@ async def test_create_qemu_instance_online(mocker): assert isinstance(vm, AlephQemuInstance) assert vm.vm_id == vm_id - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) qemu_execution = mock_systemd_manager.execution assert isinstance(qemu_execution, QemuVM) assert qemu_execution.qemu_process is not None await execution.init_task assert execution.init_task.result() is True, "VM failed to start" qemu_execution, process = await mock_systemd_manager.stop_and_disable(execution.vm_hash) - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) assert qemu_execution is None settings.LINUX_PATH = original_linux_path