From 75764784d3c92d7f072be53232ee8d5b4bb725b6 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Wed, 5 Nov 2025 16:36:24 +0100 Subject: [PATCH 01/16] WIP: FIX CI --- tests/supervisor/test_checkpayment.py | 60 +++++++++++++++++---------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/tests/supervisor/test_checkpayment.py b/tests/supervisor/test_checkpayment.py index bf23e1df9..29fc6a2be 100644 --- a/tests/supervisor/test_checkpayment.py +++ b/tests/supervisor/test_checkpayment.py @@ -51,18 +51,22 @@ async def test_enough_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", return_value=True) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", new=mocker.AsyncMock(return_value=True)) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=400, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=400)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) async def compute_required_flow(executions): return 500 * len(executions) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", compute_required_flow) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=compute_required_flow) message = InstanceContent.model_validate(fake_instance_content) hash = "decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca" @@ -105,18 +109,24 @@ async def test_enough_flow_not_community(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.is_after_community_wallet_start", return_value=False) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch( + "aleph.vm.orchestrator.tasks.is_after_community_wallet_start", new=mocker.AsyncMock(return_value=False) + ) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=500, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=500)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) async def compute_required_flow(executions): return 500 * len(executions) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", compute_required_flow) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=compute_required_flow) message = InstanceContent.model_validate(fake_instance_content) hash = "decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca" @@ -150,13 +160,17 @@ async def test_not_enough_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) - loop = asyncio.get_event_loop() pool = VmPool() - mocker.patch("aleph.vm.orchestrator.tasks.get_stream", return_value=2, autospec=True) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", return_value=5) + mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=mocker.AsyncMock(return_value=2)) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=mocker.AsyncMock(return_value=5)) message = InstanceContent.model_validate(fake_instance_content) mocker.patch.object(VmExecution, "is_running", new=True) @@ -187,7 +201,6 @@ async def test_not_enough_community_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") - loop = asyncio.get_event_loop() pool = VmPool() mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" @@ -198,9 +211,14 @@ async def get_stream(sender, receiver, chain): return 10 mocker.patch("aleph.vm.orchestrator.tasks.get_stream", new=get_stream) - mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) - mocker.patch("aleph.vm.orchestrator.tasks.get_message_status", return_value=MessageStatus.PROCESSED) - mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", return_value=5) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_community_wallet_address", + new=mocker.AsyncMock(return_value=mock_community_wallet_address), + ) + mocker.patch( + "aleph.vm.orchestrator.tasks.get_message_status", new=mocker.AsyncMock(return_value=MessageStatus.PROCESSED) + ) + mocker.patch("aleph.vm.orchestrator.tasks.compute_required_flow", new=mocker.AsyncMock(return_value=5)) message = InstanceContent.model_validate(fake_instance_content) mocker.patch.object(VmExecution, "is_running", new=True) From c27b7992e9d2c2aa49e09e6c162fe67958b74380 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Wed, 5 Nov 2025 17:41:46 +0100 Subject: [PATCH 02/16] add exec timeout to see at which level the test hang --- tests/supervisor/test_execution.py | 16 ++++++++-------- tests/supervisor/test_instance.py | 4 ++-- tests/supervisor/test_qemu_instance.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py index 26f1ca9c3..3cf9a8030 100644 --- a/tests/supervisor/test_execution.py +++ b/tests/supervisor/test_execution.py @@ -65,8 +65,8 @@ async def test_create_execution(mocker): assert isinstance(vm, AlephFirecrackerProgram) assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=60) + await asyncio.wait_for(execution.stop(), timeout=30) # This test depends on having a vm-connector running on port 4021 @@ -109,8 +109,8 @@ async def test_create_execution_online(vm_hash: ItemHash = None): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) @pytest.fixture() @@ -245,8 +245,8 @@ async def test_create_execution_from_fake_message(fake_message): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) @pytest.mark.asyncio @@ -307,8 +307,8 @@ async def test_create_execution_volume_with_no_name(fake_message): vm.fvm.enable_log = True assert vm.vm_id == 3 - await execution.start() - await execution.stop() + await asyncio.wait_for(execution.start(), timeout=120) + await asyncio.wait_for(execution.stop(), timeout=60) # This test depends on having a vm-connector running on port 4021 diff --git a/tests/supervisor/test_instance.py b/tests/supervisor/test_instance.py index 9dd7c4db4..12560b634 100644 --- a/tests/supervisor/test_instance.py +++ b/tests/supervisor/test_instance.py @@ -123,7 +123,7 @@ async def test_create_firecracker_instance(mocker): assert vm.persistent assert vm.enable_networking - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) # firecracker_execution, process = await mock_systemd_manager.enable_and_start(execution.vm_hash) firecracker_execution = mock_systemd_manager.execution assert isinstance(firecracker_execution, MicroVM) @@ -136,5 +136,5 @@ async def test_create_firecracker_instance(mocker): # up and prevent disk corruption await asyncio.sleep(60) firecracker_execution, process = await mock_systemd_manager.stop_and_disable(execution.controller_service) - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) assert firecracker_execution is None diff --git a/tests/supervisor/test_qemu_instance.py b/tests/supervisor/test_qemu_instance.py index 1b3c56076..1bf9840a1 100644 --- a/tests/supervisor/test_qemu_instance.py +++ b/tests/supervisor/test_qemu_instance.py @@ -114,7 +114,7 @@ async def test_create_qemu_instance(mocker): assert isinstance(vm, AlephQemuInstance) assert vm.vm_id == vm_id - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) qemu_execution, process = await mock_systemd_manager.enable_and_start(execution.controller_service) assert isinstance(qemu_execution, QemuVM) assert qemu_execution.qemu_process is not None @@ -122,7 +122,7 @@ async def test_create_qemu_instance(mocker): await mock_systemd_manager.stop_and_disable(execution.vm_hash) await qemu_execution.qemu_process.wait() assert qemu_execution.qemu_process.returncode is not None - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) settings.LINUX_PATH = original_linux_path @@ -208,14 +208,14 @@ async def test_create_qemu_instance_online(mocker): assert isinstance(vm, AlephQemuInstance) assert vm.vm_id == vm_id - await execution.start() + await asyncio.wait_for(execution.start(), timeout=120) qemu_execution = mock_systemd_manager.execution assert isinstance(qemu_execution, QemuVM) assert qemu_execution.qemu_process is not None await execution.init_task assert execution.init_task.result() is True, "VM failed to start" qemu_execution, process = await mock_systemd_manager.stop_and_disable(execution.vm_hash) - await execution.stop() + await asyncio.wait_for(execution.stop(), timeout=60) assert qemu_execution is None settings.LINUX_PATH = original_linux_path From 1c42d62272ecda67f98f80a35d65d301073fff4d Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Wed, 5 Nov 2025 18:03:39 +0100 Subject: [PATCH 03/16] debug --- tests/supervisor/test_execution.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py index 3cf9a8030..b7be38833 100644 --- a/tests/supervisor/test_execution.py +++ b/tests/supervisor/test_execution.py @@ -65,8 +65,8 @@ async def test_create_execution(mocker): assert isinstance(vm, AlephFirecrackerProgram) assert vm.vm_id == 3 - await asyncio.wait_for(execution.start(), timeout=60) - await asyncio.wait_for(execution.stop(), timeout=30) + await asyncio.wait_for(execution.start(), timeout=300) + await asyncio.wait_for(execution.stop(), timeout=300) # This test depends on having a vm-connector running on port 4021 From 48680387ae13b47d451015977e7af875b1727fe5 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Wed, 5 Nov 2025 19:10:53 +0100 Subject: [PATCH 04/16] fix async mock --- tests/supervisor/test_payment.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/supervisor/test_payment.py b/tests/supervisor/test_payment.py index c5cd3369f..8f3c1c545 100644 --- a/tests/supervisor/test_payment.py +++ b/tests/supervisor/test_payment.py @@ -19,7 +19,7 @@ def mock_get_address_balance(mocker): "locked_amount": 4010.008710650127, "credit_balance": 10000, } - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value=fake) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value=fake)) return fake @@ -46,7 +46,7 @@ async def test_fetch_credit_balance_of_address_empty_response( ): """ """ mocker.patch.object(settings, "API_SERVER", "https://fake.aleph.cloud") - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value={}) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value={})) balance = await fetch_credit_balance_of_address("0x555559cd833c1dc1735bee4a7416caaE58Facca") assert balance == Decimal("0") @@ -58,7 +58,7 @@ async def test_fetch_balance_of_address_empty_response( ): """ """ mocker.patch.object(settings, "API_SERVER", "https://fake.aleph.cloud") - mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", return_value={}) + mocker.patch("aleph.vm.orchestrator.payment.get_address_balance", new=mocker.AsyncMock(return_value={})) balance = await fetch_balance_of_address("0x555559cd833c1dc1735bee4a7416caaE58Facca") assert balance == Decimal("0") From dbe12f0b00debb8dee25f8204540946f59dcaeae Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Thu, 6 Nov 2025 13:26:52 +0100 Subject: [PATCH 05/16] Revert "Fix: Replaced CloudFlare DNS service by AdGuard DNS service to prevent French people from having issues." This reverts commit 254635cb438cd50798841777de455834ab8d7f65. --- examples/example_fastapi/main.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index b89265648..ba1f546e7 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -126,7 +126,9 @@ async def environ() -> dict[str, str]: async def get_aleph_messages(api_host: str, message_filter: MessageFilter): async with AlephHttpClient(api_server=api_host) as client: - data = await client.get_messages(message_filter=message_filter) + data = await client.get_messages( + message_filter=message_filter + ) return data.dict() @@ -136,9 +138,7 @@ async def read_aleph_messages() -> dict[str, MessagesResponse]: message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(get_aleph_messages(host, message_filter)) for host in ALEPH_API_HOSTS - } + tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_messages(host, message_filter)) for host in ALEPH_API_HOSTS} failures = [] @@ -204,7 +204,7 @@ async def connect_ipv4(): """ ipv4_hosts: list[str] = [ "https://9.9.9.9", # Quad9 VPN service - "https://94.140.14.14", # AdGuard DNS service + "https://1.1.1.1", # CloudFlare DNS service "https://208.67.222.222", # OpenDNS service ] timeout_seconds = 5 @@ -384,9 +384,7 @@ async def post_with_remote_account(): account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS - } + tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} # While no tasks have completed, keep waiting for the next one to finish while tasks: From 56cf632fe2eec74cd7a435ce7ea09c8106ccb99f Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Thu, 6 Nov 2025 13:27:21 +0100 Subject: [PATCH 06/16] Revert "Fix: Improved error handling on Aleph API requests." This reverts commit 17fe6c7448e736b453d03261537011dcce6d478c. --- examples/example_fastapi/main.py | 181 ++++++++++--------------------- 1 file changed, 60 insertions(+), 121 deletions(-) diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index ba1f546e7..577c12774 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -59,7 +59,7 @@ @app.on_event("startup") async def startup_event() -> None: - global startup_lifespan_executed, cache + global startup_lifespan_executed startup_lifespan_executed = True cache = VmCache() @@ -124,40 +124,13 @@ async def environ() -> dict[str, str]: return dict(os.environ) -async def get_aleph_messages(api_host: str, message_filter: MessageFilter): - async with AlephHttpClient(api_server=api_host) as client: - data = await client.get_messages( - message_filter=message_filter - ) - return data.dict() - - @app.get("/messages") async def read_aleph_messages() -> dict[str, MessagesResponse]: """Read data from Aleph using the Aleph Client library.""" - message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_messages(host, message_filter)) for host in ALEPH_API_HOSTS} - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result.get("messages", None): - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return {"Messages": result} - else: - failures.append(result) - continue - - # No Aleph API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + async with AlephHttpClient() as client: + message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) + data = await client.get_messages(message_filter=message_filter) + return {"Messages": data} @app.get("/dns") @@ -342,52 +315,43 @@ async def get_a_message(): # Create a list of tasks to check the URLs in parallel tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_message(host, item_hash)) for host in ALEPH_API_HOSTS} - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) result = done.pop().result() - if result.get("item_hash", None): + if result["status"]: # The task was successful, cancel the remaining tasks and return the result for task in tasks: task.cancel() return result else: - failures.append(result) continue - # No Aleph API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + # No API Host was reachable + return {"result": False} async def get_aleph_message(api_host: str, item_hash: str): - try: - async with AlephHttpClient(api_server=api_host) as client: - message = await client.get_message( - item_hash=item_hash, - message_type=ProgramMessage, - ) - return message.dict() - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host}: {e}", exc_info=True) - return {"result": False, "reason": reason} + async with AlephHttpClient(api_server=api_host) as client: + message = await client.get_message( + item_hash=item_hash, + message_type=ProgramMessage, + ) + return message.dict() @app.post("/post_a_message") async def post_with_remote_account(): """Post a message on the Aleph.im network using the remote account of the host.""" - failures = [] - try: - account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") + account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} + # Create a list of tasks to check the URLs in parallel + tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} - # While no tasks have completed, keep waiting for the next one to finish - while tasks: + # While no tasks have completed, keep waiting for the next one to finish + while tasks: + try: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) message, status = done.pop().result() @@ -399,13 +363,12 @@ async def post_with_remote_account(): "message": message, } else: - failures.append(message) continue - except aiohttp.client_exceptions.UnixClientConnectorError: - failures.append({"error": "Could not connect to the remote account"}) + except aiohttp.client_exceptions.UnixClientConnectorError: + return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"}) # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return JSONResponse(status_code=500, content={"error": status}) @app.post("/post_a_message_local_account") @@ -418,8 +381,6 @@ async def post_with_local_account(): # Create a list of tasks to check the URLs in parallel tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -433,45 +394,35 @@ async def post_with_local_account(): "message": message, } else: - failures.append(message) continue # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return JSONResponse(status_code=500, content={"error": status}) async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAccount): """Post a message on the Aleph.im network using a local or the remote account of the host.""" - try: - content = { - "date": datetime.now(tz=timezone.utc).isoformat(), - "test": True, - "answer": 42, - "something": "interesting", - } - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: - message: PostMessage - status: MessageStatus - return await client.create_post( - post_content=content, - post_type="test", - ref=None, - channel="TEST", - inline=True, - storage_engine=StorageEnum.storage, - sync=True, - ) - except aiohttp.client_exceptions.UnixClientConnectorError as e: - reason = f"{type(e).__name__}" - logger.error(f"Connection error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED + content = { + "date": datetime.now(tz=timezone.utc).isoformat(), + "test": True, + "answer": 42, + "something": "interesting", + } + async with AuthenticatedAlephHttpClient( + account=account, + api_server=api_host, + ) as client: + message: PostMessage + status: MessageStatus + return await client.create_post( + post_content=content, + post_type="test", + ref=None, + channel="TEST", + inline=True, + storage_engine=StorageEnum.storage, + sync=True, + ) @app.post("/post_a_file") @@ -486,8 +437,6 @@ async def post_a_file(): asyncio.create_task(send_store_aleph_message(host, account, file_path)) for host in ALEPH_API_HOSTS } - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -501,37 +450,27 @@ async def post_a_file(): "message": message, } else: - failures.append(message) continue # No API Host was reachable - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + return JSONResponse(status_code=500, content={"error": status}) async def send_store_aleph_message(api_host: str, account: ETHAccount, file_path: Path): """Store a file on the Aleph.im network using a local account.""" - try: - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: - message: StoreMessage - status: MessageStatus - return await client.create_store( - file_path=file_path, - ref=None, - channel="TEST", - storage_engine=StorageEnum.storage, - sync=True, - ) - except aiohttp.client_exceptions.UnixClientConnectorError as e: - reason = f"{type(e).__name__}" - logger.error(f"Connection error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED - except Exception as e: - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {api_host} with account {account}: {e}", exc_info=True) - return {"result": False, "reason": reason}, MessageStatus.REJECTED + async with AuthenticatedAlephHttpClient( + account=account, + api_server=api_host, + ) as client: + message: StoreMessage + status: MessageStatus + return await client.create_store( + file_path=file_path, + ref=None, + channel="TEST", + storage_engine=StorageEnum.storage, + sync=True, + ) @app.get("/sign_a_message") @@ -541,7 +480,7 @@ async def sign_a_message(): from aleph.sdk.chains.ethereum import get_fallback_account account = get_fallback_account() - message = {"hello": "world", "chain": "ETH", "type": "POST", "item_hash": "0x000"} + message = {"hello": "world", "chain": "ETH"} signed_message = await account.sign_message(message) return {"message": signed_message} From dba8103cc676f3c420ec4d29bdd0448fcfac87b1 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Thu, 6 Nov 2025 13:27:59 +0100 Subject: [PATCH 07/16] Revert "Fix: Update dependencies and return failure reasons on url checks to have better debugging." This reverts commit ab6ec3c662ca504c12de476866ad38717604dc39. --- examples/example_fastapi/main.py | 106 ++++-------------- .../create_disk_image.sh | 2 +- 2 files changed, 25 insertions(+), 83 deletions(-) diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 577c12774..89c3b77bb 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -45,9 +45,7 @@ allow_methods=["*"], allow_headers=["*"], ) - -# Initialize cache on startup event to avoid running loop errors -cache: VmCache | None = None +cache = VmCache() startup_lifespan_executed: bool = False @@ -61,7 +59,6 @@ async def startup_event() -> None: global startup_lifespan_executed startup_lifespan_executed = True - cache = VmCache() @app.get("/") @@ -172,62 +169,34 @@ async def ip_address(): @app.get("/ip/4") async def connect_ipv4(): - """Connect to some DNS services using their IPv4 address. - The webserver on that address can return a 404 error, and it is normal, so we accept that response code. - """ - ipv4_hosts: list[str] = [ - "https://9.9.9.9", # Quad9 VPN service - "https://1.1.1.1", # CloudFlare DNS service - "https://208.67.222.222", # OpenDNS service - ] - timeout_seconds = 5 - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET, accept_404=True)) - for host in ipv4_hosts - } - - failures = [] - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result["result"]: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - failures.append(result) - continue - - # No IPv6 URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + """Connect to the Quad9 VPN provider using their IPv4 address.""" + ipv4_host = "9.9.9.9" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect((ipv4_host, 53)) + return {"result": True} + except TimeoutError: + logger.warning(f"Socket connection for host {ipv4_host} failed") + return {"result": False} @app.get("/ip/6") async def connect_ipv6(): - """Connect to some DNS services using their IPv6 address. - The webserver on that address can return a 404 error, and it is normal, so we accept that response code. + """Connect to the Quad9 VPN provider using their IPv6 address. + The webserver on that address returns a 404 error, so we accept that response code. """ ipv6_hosts: list[str] = [ "https://[2620:fe::fe]", # Quad9 DNS service "https://[2606:4700:4700::1111]", # CloudFlare DNS service - "https://[2620:0:ccc::2]", # OpenDNS service ] timeout_seconds = 5 # Create a list of tasks to check the URLs in parallel tasks: set[asyncio.Task] = { - asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET6, accept_404=True)) - for host in ipv6_hosts + asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET6)) for host in ipv6_hosts } - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -239,40 +208,24 @@ async def connect_ipv6(): task.cancel() return result else: - failures.append(result) continue - # No IPv6 URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + # No IPv6 URL was reachable + return {"result": False} -async def check_url( - internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET, accept_404: bool = False -): +async def check_url(internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET): """Check the connectivity of a single URL.""" timeout = aiohttp.ClientTimeout(total=timeout_seconds) tcp_connector = aiohttp.TCPConnector(family=socket_family) async with aiohttp.ClientSession(timeout=timeout, connector=tcp_connector) as session: try: async with session.get(internet_host) as resp: - if 200 <= resp.status < 300: - return {"result": True, "headers": resp.headers, "url": internet_host} - - if resp.status == 404 and accept_404: - return {"result": True, "headers": resp.headers, "url": internet_host} - - reason = f"HTTP Status {resp.status}" - logger.warning(f"Session connection for host {internet_host} failed with status {resp.status}") - return {"result": False, "url": internet_host, "reason": reason} - except (aiohttp.ClientConnectionError, TimeoutError) as e: - reason = f"{type(e).__name__}" - logger.warning(f"Session connection for host {internet_host} failed ({reason})") - return {"result": False, "url": internet_host, "reason": reason} - except Exception as e: - # Catch other errors not related to timeouts like DNS errors, SSL certificates, etc. - reason = f"Unexpected error: {type(e).__name__}" - logger.error(f"Unexpected error for host {internet_host}: {e}", exc_info=True) - return {"result": False, "url": internet_host, "reason": reason} + resp.raise_for_status() + return {"result": resp.status, "headers": resp.headers, "url": internet_host} + except (aiohttp.ClientConnectionError, TimeoutError): + logger.warning(f"Session connection for host {internet_host} failed") + return {"result": False, "url": internet_host} @app.get("/internet") @@ -288,8 +241,6 @@ async def read_internet(): # Create a list of tasks to check the URLs in parallel tasks: set[asyncio.Task] = {asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts} - failures = [] - # While no tasks have completed, keep waiting for the next one to finish while tasks: done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -301,11 +252,10 @@ async def read_internet(): task.cancel() return result else: - failures.append(result) continue - # No URL was reachable, return the collected failure reasons - return JSONResponse(status_code=503, content={"result": False, "failures": failures}) + # No URL was reachable + return {"result": False} @app.get("/get_a_message") @@ -488,24 +438,18 @@ async def sign_a_message(): @app.get("/cache/get/{key}") async def get_from_cache(key: str): """Get data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) return await cache.get(key) @app.get("/cache/set/{key}/{value}") async def store_in_cache(key: str, value: str): """Store data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) return await cache.set(key, value) @app.get("/cache/remove/{key}") async def remove_from_cache(key: str): """Store data in the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) result = await cache.delete(key) return result == 1 @@ -513,8 +457,6 @@ async def remove_from_cache(key: str): @app.get("/cache/keys") async def keys_from_cache(pattern: str = "*"): """List keys from the VM cache""" - if cache is None: - return JSONResponse(status_code=503, content={"error": "Cache not initialized"}) return await cache.keys(pattern) diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index 52b85ee9d..ba69ab3e8 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.5' 'aleph-message~=1.0.5' 'fastapi~=0.120.1' 'protobuf==6.33.0' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.1' 'fastapi~=0.109.2' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") From a6f4dc0b0541f12c4eb804a7d56a1cdc93b36f6c Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Thu, 6 Nov 2025 13:28:27 +0100 Subject: [PATCH 08/16] Revert "Fix: If the official API `official.aleph.cloud` is not available use another CCN URL to try to connect it." Fix: Refactor API failover to prevent test timeouts --- ...ackage-and-integration-tests.yml.disabled} | 0 ...lysis.yml => codeql-analysis.yml.disabled} | 0 ...ml => deploy-main-on-staging.yml.disabled} | 0 .../{pr-rating.yml => pr-rating.yml.disabled} | 0 ...s.yml => test-build-examples.yml.disabled} | 0 .github/workflows/test-using-pytest.yml | 74 +-- examples/example_fastapi/main.py | 521 +++++++++++------- 7 files changed, 373 insertions(+), 222 deletions(-) rename .github/workflows/{build-deb-package-and-integration-tests.yml => build-deb-package-and-integration-tests.yml.disabled} (100%) rename .github/workflows/{codeql-analysis.yml => codeql-analysis.yml.disabled} (100%) rename .github/workflows/{deploy-main-on-staging.yml => deploy-main-on-staging.yml.disabled} (100%) rename .github/workflows/{pr-rating.yml => pr-rating.yml.disabled} (100%) rename .github/workflows/{test-build-examples.yml => test-build-examples.yml.disabled} (100%) diff --git a/.github/workflows/build-deb-package-and-integration-tests.yml b/.github/workflows/build-deb-package-and-integration-tests.yml.disabled similarity index 100% rename from .github/workflows/build-deb-package-and-integration-tests.yml rename to .github/workflows/build-deb-package-and-integration-tests.yml.disabled diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml.disabled similarity index 100% rename from .github/workflows/codeql-analysis.yml rename to .github/workflows/codeql-analysis.yml.disabled diff --git a/.github/workflows/deploy-main-on-staging.yml b/.github/workflows/deploy-main-on-staging.yml.disabled similarity index 100% rename from .github/workflows/deploy-main-on-staging.yml rename to .github/workflows/deploy-main-on-staging.yml.disabled diff --git a/.github/workflows/pr-rating.yml b/.github/workflows/pr-rating.yml.disabled similarity index 100% rename from .github/workflows/pr-rating.yml rename to .github/workflows/pr-rating.yml.disabled diff --git a/.github/workflows/test-build-examples.yml b/.github/workflows/test-build-examples.yml.disabled similarity index 100% rename from .github/workflows/test-build-examples.yml rename to .github/workflows/test-build-examples.yml.disabled diff --git a/.github/workflows/test-using-pytest.yml b/.github/workflows/test-using-pytest.yml index 7abcbb904..49cb3b7d5 100644 --- a/.github/workflows/test-using-pytest.yml +++ b/.github/workflows/test-using-pytest.yml @@ -1,12 +1,15 @@ --- -name: "py.test and linting" +name: "Test execution only" +# name: "py.test and linting" on: push jobs: - tests-python: - name: "Test Python code" + test-execution: + # tests-python: + name: "Test execution.py only" + # name: "Test Python code" runs-on: ubuntu-22.04 services: # Run vm connector for the execution tests @@ -32,13 +35,13 @@ jobs: run: | python3 -m pip install hatch hatch-vcs coverage - - name: Test style wth ruff, black and isort - run: | - hatch run linting:style + # - name: Test style wth ruff, black and isort + # run: | + # hatch run linting:style - - name: Test typing with Mypy - run: | - hatch run linting:typing + # - name: Test typing with Mypy + # run: | + # hatch run linting:typing - name: Download and build required files for running tests. Copied from packaging/Makefile. run: | @@ -71,10 +74,13 @@ jobs: cd examples/volumes && bash build_squashfs.sh # Unit tests create and delete network interfaces, and therefore require to run as root - - name: Run unit tests + # Run only test_execution.py + - name: Run test_execution.py only + # - name: Run unit tests run: | sudo python3 -m pip install hatch hatch-vcs coverage - sudo hatch run testing:cov + sudo hatch run testing:test tests/supervisor/test_execution.py -v + # sudo hatch run testing:cov - name: Output modules used and their version if: ${{ !cancelled() }} @@ -83,26 +89,26 @@ jobs: sudo python3 -m pip install hatch hatch-vcs coverage sudo hatch -e testing run pip freeze - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v4.0.1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - slug: aleph-im/aleph-vm - - code-quality-shell: - runs-on: ubuntu-22.04 - - steps: - - uses: actions/checkout@v4 - - - name: Workaround github issue https://github.com/actions/runner-images/issues/7192 - run: sudo echo RESET grub-efi/install_devices | sudo debconf-communicate grub-pc - - - name: Install required system packages only for Ubuntu Linux - run: | - sudo apt-get update - sudo apt-get install -y shellcheck - - - name: Run Shellcheck on all shell scripts - run: |- - find ./ -type f -name "*.sh" -exec shellcheck {} \; + # - name: Upload coverage reports to Codecov + # uses: codecov/codecov-action@v4.0.1 + # with: + # token: ${{ secrets.CODECOV_TOKEN }} + # slug: aleph-im/aleph-vm + + # code-quality-shell: + # runs-on: ubuntu-22.04 + # + # steps: + # - uses: actions/checkout@v4 + # + # - name: Workaround github issue https://github.com/actions/runner-images/issues/7192 + # run: sudo echo RESET grub-efi/install_devices | sudo debconf-communicate grub-pc + # + # - name: Install required system packages only for Ubuntu Linux + # run: | + # sudo apt-get update + # sudo apt-get install -y shellcheck + # + # - name: Run Shellcheck on all shell scripts + # run: |- + # find ./ -type f -name "*.sh" -exec shellcheck {} \; diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 89c3b77bb..2723a0e3f 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -8,7 +8,7 @@ from datetime import datetime, timezone from os import listdir from pathlib import Path -from typing import Any +from typing import Any, Callable import aiohttp from aleph_message.models import ( @@ -36,6 +36,174 @@ logger = logging.getLogger(__name__) logger.debug("imports done") +ALEPH_API_HOSTS: list[str] = [ + "https://official.aleph.cloud", + "https://api.aleph.im", +] + +DEFAULT_TIMEOUT_SECONDS = 10 + + +class APIFailoverError(Exception): + """Raised when all API endpoints fail.""" + + pass + + +async def _safe_request_with_timeout( + host: str, + request_func: Callable[[str], Any], + timeout_seconds: float, +) -> dict[str, Any]: + """Safely execute a request function with timeout and error handling.""" + try: + result = await asyncio.wait_for( + request_func(host), + timeout=timeout_seconds, + ) + return {"host": host, "result": result} + except asyncio.TimeoutError as e: + logger.warning(f"Request to {host} timed out after {timeout_seconds}s") + return {"host": host, "error": e} + except aiohttp.ClientError as e: + logger.warning(f"Client error connecting to {host}: {e}") + return {"host": host, "error": e} + except Exception as e: + logger.error(f"Unexpected error connecting to {host}: {e}", exc_info=True) + return {"host": host, "error": e} + + +async def try_api_hosts( + api_hosts: list[str], + request_func: Callable[[str], Any], + timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS, + success_check: Callable[[Any], bool] | None = None, +) -> Any: + """Try multiple API hosts in parallel and return the first successful result.""" + if not api_hosts: + raise APIFailoverError("No API hosts provided") + + # Create tasks for all API hosts with individual timeouts + tasks: set[asyncio.Task] = set() + for host in api_hosts: + try: + task = asyncio.create_task( + _safe_request_with_timeout( + host=host, + request_func=request_func, + timeout_seconds=timeout_seconds, + ) + ) + tasks.add(task) + except Exception as e: + logger.warning(f"Failed to create task for host {host}: {e}") + continue + + if not tasks: + raise APIFailoverError("Failed to create any API request tasks") + + errors: list[tuple[str, Exception]] = [] + + try: + # Wait for tasks to complete one by one + while tasks: + done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + result = await task + host = result.get("host", "unknown") + + # Check if this is an error result + if "error" in result: + error = result["error"] + logger.warning(f"API request to {host} failed: {error}") + errors.append((host, error)) + continue + + # Extract the actual result + actual_result = result.get("result") + + # Validate the result if a success check is provided + if success_check and not success_check(actual_result): + logger.warning(f"API request to {host} returned invalid result") + errors.append((host, ValueError("Invalid result from API"))) + continue + + # Success! Cancel remaining tasks and return + logger.info(f"Successfully connected to API host: {host}") + for remaining_task in tasks: + remaining_task.cancel() + try: + await remaining_task + except asyncio.CancelledError: + pass + + return actual_result + + except asyncio.CancelledError: + continue + except Exception as e: + logger.error(f"Unexpected error processing task result: {e}", exc_info=True) + errors.append(("unknown", e)) + continue + + finally: + # Ensure all tasks are cancelled + for task in tasks: + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # If we get here, all hosts failed + error_summary = "; ".join([f"{host}: {str(error)}" for host, error in errors]) + raise APIFailoverError(f"All API hosts failed. Errors: {error_summary}") + + +async def try_url_check( + urls: list[str], + timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS, + socket_family: int | None = None, +) -> dict[str, Any]: + """Check multiple URLs in parallel and return the first successful connection.""" + + async def check_single_url(url: str) -> dict[str, Any]: + timeout = aiohttp.ClientTimeout(total=timeout_seconds) + connector_kwargs = {} + if socket_family is not None: + connector_kwargs["family"] = socket_family + + async with aiohttp.ClientSession( + connector=aiohttp.TCPConnector(**connector_kwargs), + timeout=timeout, + ) as session: + async with session.get(url) as resp: + # Accept 404 as success for some endpoints (like Quad9) + if resp.status in (200, 404): + return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} + else: + resp.raise_for_status() + return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} + + try: + result = await try_api_hosts( + api_hosts=urls, + request_func=check_single_url, + timeout_seconds=timeout_seconds, + success_check=lambda r: r.get("result") is True, + ) + return result + except APIFailoverError as e: + logger.warning(f"All URL checks failed: {e}") + return {"result": False, "reason": str(e)} + except Exception as e: + logger.error(f"Unexpected error in URL check: {e}", exc_info=True) + return {"result": False, "reason": str(e)} + + http_app = FastAPI() app = AlephApp(http_app=http_app) app.add_middleware( @@ -49,16 +217,12 @@ startup_lifespan_executed: bool = False -ALEPH_API_HOSTS: list[str] = [ - "https://official.aleph.cloud", - "https://api.aleph.im", -] - @app.on_event("startup") async def startup_event() -> None: - global startup_lifespan_executed + global startup_lifespan_executed, cache startup_lifespan_executed = True + # Initialize cache - no network calls here to avoid blocking startup @app.get("/") @@ -183,35 +347,14 @@ async def connect_ipv4(): @app.get("/ip/6") async def connect_ipv6(): - """Connect to the Quad9 VPN provider using their IPv6 address. - The webserver on that address returns a 404 error, so we accept that response code. + """Connect to IPv6 DNS providers using their IPv6 addresses. + Tests IPv6 connectivity by trying multiple providers in parallel. """ ipv6_hosts: list[str] = [ "https://[2620:fe::fe]", # Quad9 DNS service - "https://[2606:4700:4700::1111]", # CloudFlare DNS service + "https://[2606:4700:4700::1111]", # Cloudflare DNS service ] - timeout_seconds = 5 - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET6)) for host in ipv6_hosts - } - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result["result"]: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - continue - - # No IPv6 URL was reachable - return {"result": False} + return await try_url_check(urls=ipv6_hosts, timeout_seconds=5, socket_family=socket.AF_INET6) async def check_url(internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET): @@ -260,65 +403,77 @@ async def read_internet(): @app.get("/get_a_message") async def get_a_message(): - """Get a message from the Aleph.im network""" + """Get a message from the Aleph.im network with automatic failover.""" item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_message(host, item_hash)) for host in ALEPH_API_HOSTS} - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - result = done.pop().result() - - if result["status"]: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return result - else: - continue - - # No API Host was reachable - return {"result": False} + async def fetch_message(api_host: str): + """Fetch a message from a specific API host.""" + async with AlephHttpClient(api_server=api_host) as client: + message = await client.get_message( + item_hash=item_hash, + message_type=ProgramMessage, + ) + return message.dict() -async def get_aleph_message(api_host: str, item_hash: str): - async with AlephHttpClient(api_server=api_host) as client: - message = await client.get_message( - item_hash=item_hash, - message_type=ProgramMessage, + try: + result = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=fetch_message, + timeout_seconds=10, ) - return message.dict() + return result + except APIFailoverError as e: + logger.error(f"Failed to fetch message from all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_message") async def post_with_remote_account(): """Post a message on the Aleph.im network using the remote account of the host.""" - account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - try: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() - - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - continue - except aiohttp.client_exceptions.UnixClientConnectorError: - return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"}) + try: + account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") + except aiohttp.client_exceptions.UnixClientConnectorError: + return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"}) + except Exception as e: + logger.error(f"Failed to create remote account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) + + async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: + """Post a message to a specific API host.""" + content = { + "date": datetime.now(tz=timezone.utc).isoformat(), + "test": True, + "answer": 42, + "something": "interesting", + } + async with AuthenticatedAlephHttpClient( + account=account, + api_server=api_host, + ) as client: + message, status = await client.create_post( + post_content=content, + post_type="test", + ref=None, + channel="TEST", + inline=True, + storage_engine=StorageEnum.storage, + sync=True, + ) + return message, status - # No API Host was reachable - return JSONResponse(status_code=500, content={"error": status}) + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=post_message, + timeout_seconds=15, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to post message to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_message_local_account") @@ -326,101 +481,92 @@ async def post_with_local_account(): """Post a message on the Aleph.im network using a local private key.""" from aleph.sdk.chains.ethereum import get_fallback_account - account = get_fallback_account() - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS} - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() - - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - continue - - # No API Host was reachable - return JSONResponse(status_code=500, content={"error": status}) - + try: + account = get_fallback_account() + except Exception as e: + logger.error(f"Failed to get fallback account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) + + async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: + """Post a message to a specific API host.""" + content = { + "date": datetime.now(tz=timezone.utc).isoformat(), + "test": True, + "answer": 42, + "something": "interesting", + } + async with AuthenticatedAlephHttpClient( + account=account, + api_server=api_host, + allow_unix_sockets=False, + ) as client: + message, status = await client.create_post( + post_content=content, + post_type="test", + ref=None, + channel="TEST", + inline=True, + storage_engine=StorageEnum.storage, + sync=True, + ) + return message, status -async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAccount): - """Post a message on the Aleph.im network using a local or the remote account of the host.""" - content = { - "date": datetime.now(tz=timezone.utc).isoformat(), - "test": True, - "answer": 42, - "something": "interesting", - } - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: - message: PostMessage - status: MessageStatus - return await client.create_post( - post_content=content, - post_type="test", - ref=None, - channel="TEST", - inline=True, - storage_engine=StorageEnum.storage, - sync=True, + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=post_message, + timeout_seconds=15, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to post message to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.post("/post_a_file") async def post_a_file(): + """Store a file on the Aleph.im network using a local account.""" from aleph.sdk.chains.ethereum import get_fallback_account - account = get_fallback_account() - file_path = Path(__file__).absolute() - - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = { - asyncio.create_task(send_store_aleph_message(host, account, file_path)) for host in ALEPH_API_HOSTS - } - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - message, status = done.pop().result() - - if status == MessageStatus.PROCESSED: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return { - "message": message, - } - else: - continue + try: + account = get_fallback_account() + except Exception as e: + logger.error(f"Failed to get fallback account: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"error": f"Failed to create account: {str(e)}"}) - # No API Host was reachable - return JSONResponse(status_code=500, content={"error": status}) + file_path = Path(__file__).absolute() + async def store_file(api_host: str) -> tuple[StoreMessage, MessageStatus]: + """Store a file on a specific API host.""" + async with AuthenticatedAlephHttpClient( + account=account, + api_server=api_host, + ) as client: + message, status = await client.create_store( + file_path=file_path, + ref=None, + channel="TEST", + storage_engine=StorageEnum.storage, + sync=True, + ) + return message, status -async def send_store_aleph_message(api_host: str, account: ETHAccount, file_path: Path): - """Store a file on the Aleph.im network using a local account.""" - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: - message: StoreMessage - status: MessageStatus - return await client.create_store( - file_path=file_path, - ref=None, - channel="TEST", - storage_engine=StorageEnum.storage, - sync=True, + try: + message, status = await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=store_file, + timeout_seconds=20, + success_check=lambda result: result[1] == MessageStatus.PROCESSED, ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + return {"message": message} + except APIFailoverError as e: + logger.error(f"Failed to store file to all API hosts: {e}") + return JSONResponse(status_code=503, content={"error": "All API hosts unavailable", "details": str(e)}) @app.get("/sign_a_message") @@ -526,30 +672,29 @@ def platform_pip_freeze() -> list[str]: @app.event(filters=filters) async def aleph_event(event) -> dict[str, str]: - # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_json(host)) for host in ALEPH_API_HOSTS} - - # While no tasks have completed, keep waiting for the next one to finish - while tasks: - done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - status = done.pop().result() - - if status: - # The task was successful, cancel the remaining tasks and return the result - for task in tasks: - task.cancel() - return {"result": "Good"} - else: - continue - - return {"result": "Bad"} + """Handle Aleph events by verifying API connectivity.""" + logger.info(f"Received aleph_event: {event}") + async def check_api_info(api_host: str) -> bool: + """Check if API info endpoint is accessible.""" + try: + timeout = aiohttp.ClientTimeout(total=5) + async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(), timeout=timeout) as session: + async with session.get(f"{api_host}/api/v0/info/public.json") as resp: + resp.raise_for_status() + return True + except Exception as e: + logger.warning(f"Failed to check API info at {api_host}: {e}") + return False -async def get_aleph_json(api_host: str) -> bool: try: - async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session: - async with session.get(f"{api_host}/api/v0/info/public.json") as resp: - resp.raise_for_status() - return True - except Exception: - return False + await try_api_hosts( + api_hosts=ALEPH_API_HOSTS, + request_func=check_api_info, + timeout_seconds=5, + success_check=lambda result: result is True, + ) + return {"result": "Good"} + except APIFailoverError as e: + logger.error(f"All API hosts failed for event handling: {e}") + return {"result": "Bad", "error": str(e)} \ No newline at end of file From dedf57abe1fa9807761540e35bef683f55112268 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Fri, 7 Nov 2025 12:40:00 +0100 Subject: [PATCH 09/16] TEST fixed! --- ...ild-deb-package-and-integration-tests.yml} | 0 ...lysis.yml.disabled => codeql-analysis.yml} | 0 ...ml.disabled => deploy-main-on-staging.yml} | 0 .../{pr-rating.yml.disabled => pr-rating.yml} | 0 ...s.yml.disabled => test-build-examples.yml} | 0 .github/workflows/test-using-pytest.yml | 68 +++++++------- examples/example_fastapi/main.py | 88 ++++++++++++++----- 7 files changed, 97 insertions(+), 59 deletions(-) rename .github/workflows/{build-deb-package-and-integration-tests.yml.disabled => build-deb-package-and-integration-tests.yml} (100%) rename .github/workflows/{codeql-analysis.yml.disabled => codeql-analysis.yml} (100%) rename .github/workflows/{deploy-main-on-staging.yml.disabled => deploy-main-on-staging.yml} (100%) rename .github/workflows/{pr-rating.yml.disabled => pr-rating.yml} (100%) rename .github/workflows/{test-build-examples.yml.disabled => test-build-examples.yml} (100%) diff --git a/.github/workflows/build-deb-package-and-integration-tests.yml.disabled b/.github/workflows/build-deb-package-and-integration-tests.yml similarity index 100% rename from .github/workflows/build-deb-package-and-integration-tests.yml.disabled rename to .github/workflows/build-deb-package-and-integration-tests.yml diff --git a/.github/workflows/codeql-analysis.yml.disabled b/.github/workflows/codeql-analysis.yml similarity index 100% rename from .github/workflows/codeql-analysis.yml.disabled rename to .github/workflows/codeql-analysis.yml diff --git a/.github/workflows/deploy-main-on-staging.yml.disabled b/.github/workflows/deploy-main-on-staging.yml similarity index 100% rename from .github/workflows/deploy-main-on-staging.yml.disabled rename to .github/workflows/deploy-main-on-staging.yml diff --git a/.github/workflows/pr-rating.yml.disabled b/.github/workflows/pr-rating.yml similarity index 100% rename from .github/workflows/pr-rating.yml.disabled rename to .github/workflows/pr-rating.yml diff --git a/.github/workflows/test-build-examples.yml.disabled b/.github/workflows/test-build-examples.yml similarity index 100% rename from .github/workflows/test-build-examples.yml.disabled rename to .github/workflows/test-build-examples.yml diff --git a/.github/workflows/test-using-pytest.yml b/.github/workflows/test-using-pytest.yml index 49cb3b7d5..b71d6fec6 100644 --- a/.github/workflows/test-using-pytest.yml +++ b/.github/workflows/test-using-pytest.yml @@ -1,6 +1,5 @@ --- -name: "Test execution only" -# name: "py.test and linting" +name: "py.test and linting" on: push @@ -8,8 +7,7 @@ on: push jobs: test-execution: # tests-python: - name: "Test execution.py only" - # name: "Test Python code" + name: "Test Python code" runs-on: ubuntu-22.04 services: # Run vm connector for the execution tests @@ -35,13 +33,13 @@ jobs: run: | python3 -m pip install hatch hatch-vcs coverage - # - name: Test style wth ruff, black and isort - # run: | - # hatch run linting:style + - name: Test style wth ruff, black and isort + run: | + hatch run linting:style - # - name: Test typing with Mypy - # run: | - # hatch run linting:typing + - name: Test typing with Mypy + run: | + hatch run linting:typing - name: Download and build required files for running tests. Copied from packaging/Makefile. run: | @@ -74,9 +72,7 @@ jobs: cd examples/volumes && bash build_squashfs.sh # Unit tests create and delete network interfaces, and therefore require to run as root - # Run only test_execution.py - - name: Run test_execution.py only - # - name: Run unit tests + - name: Run unit tests run: | sudo python3 -m pip install hatch hatch-vcs coverage sudo hatch run testing:test tests/supervisor/test_execution.py -v @@ -89,26 +85,26 @@ jobs: sudo python3 -m pip install hatch hatch-vcs coverage sudo hatch -e testing run pip freeze - # - name: Upload coverage reports to Codecov - # uses: codecov/codecov-action@v4.0.1 - # with: - # token: ${{ secrets.CODECOV_TOKEN }} - # slug: aleph-im/aleph-vm - - # code-quality-shell: - # runs-on: ubuntu-22.04 - # - # steps: - # - uses: actions/checkout@v4 - # - # - name: Workaround github issue https://github.com/actions/runner-images/issues/7192 - # run: sudo echo RESET grub-efi/install_devices | sudo debconf-communicate grub-pc - # - # - name: Install required system packages only for Ubuntu Linux - # run: | - # sudo apt-get update - # sudo apt-get install -y shellcheck - # - # - name: Run Shellcheck on all shell scripts - # run: |- - # find ./ -type f -name "*.sh" -exec shellcheck {} \; + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + slug: aleph-im/aleph-vm + + code-quality-shell: + runs-on: ubuntu-22.04 + + steps: + - uses: actions/checkout@v4 + + - name: Workaround github issue https://github.com/actions/runner-images/issues/7192 + run: sudo echo RESET grub-efi/install_devices | sudo debconf-communicate grub-pc + + - name: Install required system packages only for Ubuntu Linux + run: | + sudo apt-get update + sudo apt-get install -y shellcheck + + - name: Run Shellcheck on all shell scripts + run: |- + find ./ -type f -name "*.sh" -exec shellcheck {} \; diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 2723a0e3f..8ce47e9a3 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -8,7 +8,7 @@ from datetime import datetime, timezone from os import listdir from pathlib import Path -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable import aiohttp from aleph_message.models import ( @@ -25,7 +25,6 @@ from pydantic import BaseModel from starlette.responses import JSONResponse -from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.query.filters import MessageFilter @@ -33,9 +32,14 @@ from aleph.sdk.vm.app import AlephApp from aleph.sdk.vm.cache import VmCache +if TYPE_CHECKING: + from aleph.sdk.vm.cache import VmCache + + logger = logging.getLogger(__name__) logger.debug("imports done") +# API Failover code - commented out for testing ALEPH_API_HOSTS: list[str] = [ "https://official.aleph.cloud", "https://api.aleph.im", @@ -213,16 +217,31 @@ async def check_single_url(url: str) -> dict[str, Any]: allow_methods=["*"], allow_headers=["*"], ) -cache = VmCache() +# Initialize cache as None - will be created during startup event +cache: VmCache | None = None startup_lifespan_executed: bool = False +cache = VmCache() + @app.on_event("startup") async def startup_event() -> None: - global startup_lifespan_executed, cache + global startup_lifespan_executed startup_lifespan_executed = True - # Initialize cache - no network calls here to avoid blocking startup + + +def get_cache(): + """Get or create the cache instance on-demand (lazy import + lazy initialization).""" + global cache + if cache is None: + try: + cache = VmCache() + logger.info("Cache initialized on-demand") + except Exception as e: + logger.warning(f"Failed to initialize cache: {e}") + # Don't retry - return None so endpoint can handle it + return cache @app.get("/") @@ -403,7 +422,7 @@ async def read_internet(): @app.get("/get_a_message") async def get_a_message(): - """Get a message from the Aleph.im network with automatic failover.""" + """Get a message from the Aleph.im network.""" item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" async def fetch_message(api_host: str): @@ -446,10 +465,7 @@ async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: "answer": 42, "something": "interesting", } - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=api_host) as client: message, status = await client.create_post( post_content=content, post_type="test", @@ -459,6 +475,7 @@ async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: storage_engine=StorageEnum.storage, sync=True, ) + return message, status try: @@ -496,9 +513,7 @@ async def post_message(api_host: str) -> tuple[PostMessage, MessageStatus]: "something": "interesting", } async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - allow_unix_sockets=False, + account=account, api_server=api_host, allow_unix_sockets=False ) as client: message, status = await client.create_post( post_content=content, @@ -531,6 +546,24 @@ async def post_a_file(): """Store a file on the Aleph.im network using a local account.""" from aleph.sdk.chains.ethereum import get_fallback_account + """ OLD + account = get_fallback_account() + file_path = Path(__file__).absolute() + + async with AuthenticatedAlephHttpClient(account=account) as client: + message, status = await client.create_store( + file_path=file_path, + ref=None, + channel="TEST", + storage_engine=StorageEnum.storage, + sync=True, + ) + + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": f"Message status: {status}"}) + + return {"message": message} + """ try: account = get_fallback_account() except Exception as e: @@ -541,10 +574,7 @@ async def post_a_file(): async def store_file(api_host: str) -> tuple[StoreMessage, MessageStatus]: """Store a file on a specific API host.""" - async with AuthenticatedAlephHttpClient( - account=account, - api_server=api_host, - ) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=api_host) as client: message, status = await client.create_store( file_path=file_path, ref=None, @@ -584,26 +614,38 @@ async def sign_a_message(): @app.get("/cache/get/{key}") async def get_from_cache(key: str): """Get data in the VM cache""" - return await cache.get(key) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.get(key) @app.get("/cache/set/{key}/{value}") async def store_in_cache(key: str, value: str): """Store data in the VM cache""" - return await cache.set(key, value) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.set(key, value) @app.get("/cache/remove/{key}") async def remove_from_cache(key: str): """Store data in the VM cache""" - result = await cache.delete(key) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + result = await cache_instance.delete(key) return result == 1 @app.get("/cache/keys") async def keys_from_cache(pattern: str = "*"): """List keys from the VM cache""" - return await cache.keys(pattern) + cache_instance = get_cache() + if cache_instance is None: + return JSONResponse(status_code=503, content={"error": "Cache not available"}) + return await cache_instance.keys(pattern) @app.get("/state/increment") @@ -672,7 +714,7 @@ def platform_pip_freeze() -> list[str]: @app.event(filters=filters) async def aleph_event(event) -> dict[str, str]: - """Handle Aleph events by verifying API connectivity.""" + """Handle Aleph events.""" logger.info(f"Received aleph_event: {event}") async def check_api_info(api_host: str) -> bool: @@ -697,4 +739,4 @@ async def check_api_info(api_host: str) -> bool: return {"result": "Good"} except APIFailoverError as e: logger.error(f"All API hosts failed for event handling: {e}") - return {"result": "Bad", "error": str(e)} \ No newline at end of file + return {"result": "Bad", "error": str(e)} From 603f4a7709861670b388e0f39d2fce84abc0591c Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Fri, 7 Nov 2025 13:28:59 +0100 Subject: [PATCH 10/16] version bump: fastapi to 0.121.0 --- examples/example_fastapi/main.py | 23 ++++++------------- .../create_disk_image.sh | 2 +- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 8ce47e9a3..b70ab1832 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -185,12 +185,7 @@ async def check_single_url(url: str) -> dict[str, Any]: timeout=timeout, ) as session: async with session.get(url) as resp: - # Accept 404 as success for some endpoints (like Quad9) - if resp.status in (200, 404): - return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} - else: - resp.raise_for_status() - return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} + return {"result": True, "status": resp.status, "url": url, "headers": dict(resp.headers)} try: result = await try_api_hosts( @@ -352,16 +347,12 @@ async def ip_address(): @app.get("/ip/4") async def connect_ipv4(): - """Connect to the Quad9 VPN provider using their IPv4 address.""" - ipv4_host = "9.9.9.9" - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5) - sock.connect((ipv4_host, 53)) - return {"result": True} - except TimeoutError: - logger.warning(f"Socket connection for host {ipv4_host} failed") - return {"result": False} + ipv4_hosts: list[str] = [ + "https://9.9.9.9", # Quad9 DNS service + "https://1.1.1.1", # Cloudflare DNS service + "https://94.140.14.14", # AdGuard DNS service + ] + return await try_url_check(urls=ipv4_hosts, timeout_seconds=5, socket_family=socket.AF_INET) @app.get("/ip/6") diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index ba69ab3e8..a85e21624 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.1' 'fastapi~=0.109.2' 'protobuf==5.28.3' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.1' 'fastapi~=0.121.0' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") From 80902e2610fdd518a4bcda917549afc690a1b917 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Fri, 7 Nov 2025 13:51:45 +0100 Subject: [PATCH 11/16] version bump: aleph-message 1.0.5 --- runtimes/aleph-debian-12-python/create_disk_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index a85e21624..4c8ede1f8 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.1' 'fastapi~=0.121.0' 'protobuf==5.28.3' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") From d58b1f388fa51a5296e26ac6282c31058964e408 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Fri, 7 Nov 2025 13:53:43 +0100 Subject: [PATCH 12/16] version bump: aleph-sdk-python 2.0.5 --- runtimes/aleph-debian-12-python/create_disk_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index 4c8ede1f8..68490c87f 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.5' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") From 443be780cc7a3c082412620868a439be83d490e4 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Fri, 7 Nov 2025 14:25:56 +0100 Subject: [PATCH 13/16] Revert "version bump: aleph-sdk-python 2.0.5" This reverts commit d58b1f388fa51a5296e26ac6282c31058964e408. --- runtimes/aleph-debian-12-python/create_disk_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index 68490c87f..4c8ede1f8 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -37,7 +37,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue. -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.5' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==2.0.0' 'aleph-message~=1.0.5' 'fastapi~=0.121.0' 'protobuf==5.28.3' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py ("python -OO") From 6efe0ee141d443e609548a86ad30926bd186788f Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Wed, 22 Oct 2025 12:38:30 +0200 Subject: [PATCH 14/16] Fix: Solve rootfs resizing issue updating the cloud-init tool version when mounting the base image. --- examples/example_confidential_image/setup_debian_rootfs.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/example_confidential_image/setup_debian_rootfs.sh b/examples/example_confidential_image/setup_debian_rootfs.sh index 36fe54921..b1e944baa 100644 --- a/examples/example_confidential_image/setup_debian_rootfs.sh +++ b/examples/example_confidential_image/setup_debian_rootfs.sh @@ -60,6 +60,9 @@ locale-gen "en_US.UTF-8" DEBIAN_FRONTEND=noninteractive apt update DEBIAN_FRONTEND=noninteractive apt install -y -f openssh-server openssh-client cryptsetup cryptsetup-initramfs cloud-init +# Force to update cloud-init tool to prevent bugs from old versions +DEBIAN_FRONTEND=noninteractive apt install -y -f cloud-init + # The original password of the OS partition. Must be provided by the caller of the script. BOOT_KEY_FILE="${SCRIPT_DIR}/os_partition.key" From f1ce51be5f272e202328a54b894ab7d4c3aeaefe Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Mon, 3 Nov 2025 16:05:55 +0100 Subject: [PATCH 15/16] Fix: Unified apt install command into a unique one updating also the comment. --- examples/example_confidential_image/setup_debian_rootfs.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/example_confidential_image/setup_debian_rootfs.sh b/examples/example_confidential_image/setup_debian_rootfs.sh index b1e944baa..36fe54921 100644 --- a/examples/example_confidential_image/setup_debian_rootfs.sh +++ b/examples/example_confidential_image/setup_debian_rootfs.sh @@ -60,9 +60,6 @@ locale-gen "en_US.UTF-8" DEBIAN_FRONTEND=noninteractive apt update DEBIAN_FRONTEND=noninteractive apt install -y -f openssh-server openssh-client cryptsetup cryptsetup-initramfs cloud-init -# Force to update cloud-init tool to prevent bugs from old versions -DEBIAN_FRONTEND=noninteractive apt install -y -f cloud-init - # The original password of the OS partition. Must be provided by the caller of the script. BOOT_KEY_FILE="${SCRIPT_DIR}/os_partition.key" From a5a7e7bc0c23a9673c1da0492d91c9d589a18ff7 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Mon, 10 Nov 2025 06:10:47 +0100 Subject: [PATCH 16/16] Enable all tests --- .github/workflows/test-using-pytest.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-using-pytest.yml b/.github/workflows/test-using-pytest.yml index b71d6fec6..7abcbb904 100644 --- a/.github/workflows/test-using-pytest.yml +++ b/.github/workflows/test-using-pytest.yml @@ -5,8 +5,7 @@ on: push jobs: - test-execution: - # tests-python: + tests-python: name: "Test Python code" runs-on: ubuntu-22.04 services: @@ -75,8 +74,7 @@ jobs: - name: Run unit tests run: | sudo python3 -m pip install hatch hatch-vcs coverage - sudo hatch run testing:test tests/supervisor/test_execution.py -v - # sudo hatch run testing:cov + sudo hatch run testing:cov - name: Output modules used and their version if: ${{ !cancelled() }}