From 223fd936aa6959b3c66576d189189571625b3722 Mon Sep 17 00:00:00 2001
From: Hugo Herter
Date: Wed, 12 Mar 2025 12:10:20 +0100
Subject: [PATCH 1/2] Fix: Trusted 3rd party IPFS server

IPFS Kubo is now present on all CRNs but unused. Downloading files from
this local service is more reliable than relying on a third-party gateway,
and it provides content integrity checks.

This bypasses the vm-connector when fetching resources from IPFS.
---
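
Note: a minimal sketch (not part of the commit itself) of how a CRN operator
could check that the local Kubo gateway used by this change is reachable. It
assumes the new IPFS_SERVER default added in conf.py below; the CID is a
placeholder, not a real item hash.

    import asyncio

    import aiohttp

    IPFS_SERVER = "http://localhost:8080/ipfs"  # new settings.IPFS_SERVER default
    EXAMPLE_CID = "Qm..."  # placeholder -- substitute the CID of a file pinned on IPFS

    async def fetch_from_gateway(cid: str) -> bytes:
        # Kubo only serves blocks that hash to the requested CID, which is
        # where the content integrity check mentioned above comes from.
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{IPFS_SERVER}/{cid}") as response:
                response.raise_for_status()
                return await response.read()

    if __name__ == "__main__":
        print(len(asyncio.run(fetch_from_gateway(EXAMPLE_CID))))
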
 src/aleph/vm/conf.py                    |  1 +
 src/aleph/vm/orchestrator/messages.py   |  4 +-
 src/aleph/vm/storage.py                 | 90 ++++++++++++++++++-------
 tests/supervisor/test_checkpayment.py   |  1 +
 tests/supervisor/test_execution.py      |  6 +-
 tests/supervisor/test_instance.py       |  5 +-
 tests/supervisor/test_qemu_instance.py  |  7 +-
 tests/supervisor/views/test_operator.py | 12 ++--
 8 files changed, 87 insertions(+), 39 deletions(-)
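
Note: a minimal sketch of how the reworked storage helpers are meant to be
used, mirroring the logic added in storage.py below. get_executable_message()
keeps the old get_message() behaviour for PROGRAM/INSTANCE refs, while
get_store_message() resolves a volume ref to its STORE message so the code can
tell whether the payload lives on IPFS.

    from aleph_message.models import ItemType

    from aleph.vm.storage import get_store_message

    async def where_is_volume(volume_ref: str) -> str:
        # STORE messages describe stored files; item_type says where the payload lives.
        store = await get_store_message(volume_ref)
        if store.content.item_type == ItemType.ipfs:
            return "local Kubo gateway"  # fetched via settings.IPFS_SERVER
        return "vm-connector"  # fetched via settings.CONNECTOR_URL
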
diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py
index 0c6d9cbec..5de7816c0 100644
--- a/src/aleph/vm/conf.py
+++ b/src/aleph/vm/conf.py
@@ -126,6 +126,7 @@ class Settings(BaseSettings):
     WATCH_FOR_UPDATES: bool = True
 
     API_SERVER: str = "https://official.aleph.cloud"
+    IPFS_SERVER: Url = Url("http://localhost:8080/ipfs")
     # Connect to the Quad9 VPN provider using their IPv4 and IPv6 addresses.
     CONNECTIVITY_IPV4_URL: str = "https://9.9.9.9/"
     CONNECTIVITY_IPV6_URL: str = "https://[2620:fe::fe]/"
diff --git a/src/aleph/vm/orchestrator/messages.py b/src/aleph/vm/orchestrator/messages.py
index 5ae67102c..f1826ba2c 100644
--- a/src/aleph/vm/orchestrator/messages.py
+++ b/src/aleph/vm/orchestrator/messages.py
@@ -7,13 +7,13 @@
 from aleph_message.status import MessageStatus
 
 from aleph.vm.conf import settings
-from aleph.vm.storage import get_latest_amend, get_message
+from aleph.vm.storage import get_executable_message, get_latest_amend
 
 
 async def try_get_message(ref: str) -> ExecutableMessage:
     """Get the message or raise an aiohttp HTTP error"""
     try:
-        return await get_message(ref)
+        return await get_executable_message(ref)
     except ClientConnectorError as error:
         raise HTTPServiceUnavailable(reason="Aleph Connector unavailable") from error
     except ClientResponseError as error:
diff --git a/src/aleph/vm/storage.py b/src/aleph/vm/storage.py
index 58d6f78c2..a6eef447c 100644
--- a/src/aleph/vm/storage.py
+++ b/src/aleph/vm/storage.py
@@ -10,7 +10,6 @@
 import logging
 import re
 import sys
-import uuid
 from datetime import datetime, timezone
 from pathlib import Path
 from shutil import copy2, make_archive
@@ -18,9 +17,12 @@
 
 import aiohttp
 from aleph_message.models import (
+    AlephMessage,
     InstanceMessage,
     ItemHash,
+    ItemType,
     ProgramMessage,
+    StoreMessage,
     parse_message,
 )
 from aleph_message.models.execution.instance import RootfsVolume
@@ -133,6 +135,32 @@ async def download_file(url: str, local_path: Path) -> None:
         tmp_path.unlink(missing_ok=True)
 
 
+async def download_file_from_ipfs_or_connector(ref: str, cache_path: Path, filetype: str) -> None:
+    """Download a file from the IPFS Gateway if possible, else from the vm-connector."""
+
+    if cache_path.is_file():
+        logger.debug(f"File already exists: {cache_path}")
+        return
+
+    message: StoreMessage = await get_store_message(ref)
+
+    if message.content.item_type == ItemType.ipfs:
+        # Download IPFS files from the IPFS gateway directly
+        cid = message.content.item_hash
+        url = f"{settings.IPFS_SERVER}/{cid}"
+        await download_file(url, cache_path)
+    else:
+        # Download via the vm-connector
+        path_mapping = {
+            "runtime": "/download/runtime",
+            "code": "/download/code",
+            "data": "/download/data",
+        }
+        path = path_mapping[filetype]
+        url = f"{settings.CONNECTOR_URL}{path}/{ref}"
+        await download_file(url, cache_path)
+
+
 async def get_latest_amend(item_hash: str) -> str:
     if settings.FAKE_DATA_PROGRAM:
         return item_hash
@@ -146,7 +174,26 @@ async def get_latest_amend(item_hash: str) -> str:
         return result or item_hash
 
 
-async def get_message(ref: str) -> ProgramMessage | InstanceMessage:
+async def load_message(path: Path) -> AlephMessage:
+    """Load a message from the cache on disk."""
+    with open(path) as cache_file:
+        msg = json.load(cache_file)
+
+    if path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
+        # Ensure validation passes while tweaking message content
+        msg = fix_message_validation(msg)
+
+    return parse_message(message_dict=msg)
+
+
+async def get_message(ref: str) -> AlephMessage:
+    cache_path = (Path(settings.MESSAGE_CACHE) / ref).with_suffix(".json")
+    url = f"{settings.CONNECTOR_URL}/download/message/{ref}"
+    await download_file(url, cache_path)
+    return await load_message(cache_path)
+
+
+async def get_executable_message(ref: str) -> ProgramMessage | InstanceMessage:
     if ref == settings.FAKE_INSTANCE_ID:
         logger.debug("Using the fake instance message since the ref matches")
         cache_path = settings.FAKE_INSTANCE_MESSAGE
@@ -158,23 +205,22 @@
         url = f"{settings.CONNECTOR_URL}/download/message/{ref}"
         await download_file(url, cache_path)
 
-    with open(cache_path) as cache_file:
-        msg = json.load(cache_file)
+    return await load_message(cache_path)
 
-    if cache_path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
-        # Ensure validation passes while tweaking message content
-        msg = fix_message_validation(msg)
 
-    result = parse_message(message_dict=msg)
-    assert isinstance(result, InstanceMessage | ProgramMessage), "Parsed message is not executable"
-    return result
+async def get_store_message(ref: str) -> StoreMessage:
+    message = await get_message(ref)
+    if not isinstance(message, StoreMessage):
+        msg = f"Expected a store message, got {message.type}"
+        raise ValueError(msg)
+    return message
 
 
 async def get_code_path(ref: str) -> Path:
     if settings.FAKE_DATA_PROGRAM:
         archive_path = Path(settings.FAKE_DATA_PROGRAM)
 
-        encoding: Encoding = (await get_message(ref="fake-message")).content.code.encoding
+        encoding: Encoding = (await get_executable_message(ref="fake-message")).content.code.encoding
         if encoding == Encoding.squashfs:
             squashfs_path = Path(archive_path.name + ".squashfs")
             squashfs_path.unlink(missing_ok=True)
@@ -191,8 +237,7 @@
         raise ValueError(msg)
 
     cache_path = Path(settings.CODE_CACHE) / ref
-    url = f"{settings.CONNECTOR_URL}/download/code/{ref}"
-    await download_file(url, cache_path)
+    await download_file_from_ipfs_or_connector(ref, cache_path, "code")
     return cache_path
 
 
@@ -203,8 +248,7 @@
         return Path(f"{data_dir}.zip")
 
     cache_path = Path(settings.DATA_CACHE) / ref
-    url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
-    await download_file(url, cache_path)
+    await download_file_from_ipfs_or_connector(ref, cache_path, "data")
    return cache_path
 
 
@@ -224,11 +268,7 @@
         return Path(settings.FAKE_DATA_RUNTIME)
 
     cache_path = Path(settings.RUNTIME_CACHE) / ref
-    url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"
-
-    if not cache_path.is_file():
-        # File does not exist, download it
-        await download_file(url, cache_path)
+    await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")
 
     await check_squashfs_integrity(cache_path)
     await chown_to_jailman(cache_path)
@@ -242,8 +282,10 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path:
         return Path(settings.FAKE_INSTANCE_BASE)
 
     cache_path = Path(settings.RUNTIME_CACHE) / ref
-    url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"
-    await download_file(url, cache_path)
+
+    # if not cache_path.is_file():
+    await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")
+
     await chown_to_jailman(cache_path)
     return cache_path
 
@@ -364,8 +406,8 @@ async def get_existing_file(ref: str) -> Path:
         return Path(settings.FAKE_DATA_VOLUME)
 
     cache_path = Path(settings.DATA_CACHE) / ref
-    url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
-    await download_file(url, cache_path)
+    await download_file_from_ipfs_or_connector(ref, cache_path, "data")
+
     await chown_to_jailman(cache_path)
     return cache_path
 
diff --git a/tests/supervisor/test_checkpayment.py b/tests/supervisor/test_checkpayment.py
index 3671114de..d554b7bc7 100644
--- a/tests/supervisor/test_checkpayment.py
+++ b/tests/supervisor/test_checkpayment.py
@@ -149,6 +149,7 @@ async def compute_required_flow(executions):
 async def test_not_enough_flow(mocker, fake_instance_content):
     mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False)
     mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288")
+    mocker.patch.object(settings, "IPFS_SERVER", "https://ipfs.io/ipfs")
 
     mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90"
     mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address)
diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py
index b064a084a..812404441 100644
--- a/tests/supervisor/test_execution.py
+++ b/tests/supervisor/test_execution.py
@@ -12,7 +12,7 @@
 from aleph.vm.models import VmExecution
 from aleph.vm.orchestrator import metrics
 from aleph.vm.orchestrator.messages import load_updated_message
-from aleph.vm.storage import get_message
+from aleph.vm.storage import get_executable_message
 from aleph.vm.utils import fix_message_validation
 
 
@@ -33,6 +33,7 @@ async def test_create_execution(mocker):
     mock_settings.FAKE_DATA_PROGRAM = mock_settings.BENCHMARK_FAKE_DATA_PROGRAM
     mock_settings.ALLOW_VM_NETWORKING = False
     mock_settings.USE_JAILER = False
+    mock_settings.IPFS_SERVER = "https://ipfs.io/ipfs"
 
     logging.basicConfig(level=logging.DEBUG)
     mock_settings.PRINT_SYSTEM_LOGS = True
@@ -46,7 +47,7 @@
     await metrics.create_tables(engine)
 
     vm_hash = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe")
-    message = await get_message(ref=vm_hash)
+    message = await get_executable_message(ref=vm_hash)
 
     execution = VmExecution(
         vm_hash=vm_hash,
@@ -78,6 +79,7 @@ async def test_create_execution_online(vm_hash: ItemHash = None):
     """
     vm_hash = vm_hash or settings.CHECK_FASTAPI_VM_ID
+    settings.IPFS_SERVER = "https://ipfs.io/ipfs"
 
     # Ensure that the settings are correct and required files present.
     settings.setup()
diff --git a/tests/supervisor/test_instance.py b/tests/supervisor/test_instance.py
index 1fc1f12ba..69e0d0fa2 100644
--- a/tests/supervisor/test_instance.py
+++ b/tests/supervisor/test_instance.py
@@ -13,7 +13,7 @@
 from aleph.vm.models import VmExecution
 from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
 from aleph.vm.orchestrator import metrics
-from aleph.vm.storage import get_message
+from aleph.vm.storage import get_executable_message
 from aleph.vm.systemd import SystemDManager
 from aleph.vm.vm_type import VmType
 
@@ -55,6 +55,7 @@ async def test_create_instance():
     # settings.FAKE_INSTANCE_MESSAGE
     settings.ALLOW_VM_NETWORKING = True
     settings.USE_JAILER = True
+    settings.IPFS_SERVER = "https://ipfs.io/ipfs"
 
     logging.basicConfig(level=logging.DEBUG)
     settings.PRINT_SYSTEM_LOGS = True
@@ -70,7 +71,7 @@
     await metrics.create_tables(engine)
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    message = await get_message(ref=vm_hash)
+    message = await get_executable_message(ref=vm_hash)
 
     mock_systemd_manager = MockSystemDManager()
 
diff --git a/tests/supervisor/test_qemu_instance.py b/tests/supervisor/test_qemu_instance.py
index 56d4fc145..1e76d19a4 100644
--- a/tests/supervisor/test_qemu_instance.py
+++ b/tests/supervisor/test_qemu_instance.py
@@ -13,7 +13,7 @@
 from aleph.vm.models import VmExecution
 from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
 from aleph.vm.orchestrator import metrics
-from aleph.vm.storage import get_message
+from aleph.vm.storage import get_executable_message
 from aleph.vm.systemd import SystemDManager
 from aleph.vm.vm_type import VmType
 
@@ -69,7 +69,7 @@
     await metrics.create_tables(engine)
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    message = await get_message(ref=vm_hash)
+    message = await get_executable_message(ref=vm_hash)
 
     mock_systemd_manager = MockSystemDManager()
 
@@ -112,6 +112,7 @@ async def test_create_qemu_instance_online():
     settings.ENABLE_CONFIDENTIAL_COMPUTING = False
     settings.ALLOW_VM_NETWORKING = True
     settings.USE_JAILER = False
+    settings.IPFS_SERVER = "https://ipfs.io/ipfs"
 
     logging.basicConfig(level=logging.DEBUG)
 
@@ -126,7 +127,7 @@
     await metrics.create_tables(engine)
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    message = await get_message(ref=vm_hash)
+    message = await get_executable_message(ref=vm_hash)
 
     mock_systemd_manager = MockSystemDManager()
 
diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py
index 8a6c70485..51ad5323d 100644
--- a/tests/supervisor/views/test_operator.py
+++ b/tests/supervisor/views/test_operator.py
@@ -14,7 +14,7 @@
 from aleph.vm.conf import settings
 from aleph.vm.orchestrator.metrics import ExecutionRecord
 from aleph.vm.orchestrator.supervisor import setup_webapp
-from aleph.vm.storage import get_message
+from aleph.vm.storage import get_executable_message
 from aleph.vm.utils.logs import EntryDict
 from aleph.vm.utils.test_helpers import (
     generate_signer_and_signed_headers_for_operation,
@@ -72,7 +72,7 @@ async def test_operator_confidential_initialize_already_running(aiohttp_client,
     settings.setup()
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    instance_message = await get_message(ref=vm_hash)
+    instance_message = await get_executable_message(ref=vm_hash)
 
     fake_vm_pool = mocker.Mock(
         executions={
@@ -115,7 +115,7 @@ async def test_operator_expire(aiohttp_client, mocker):
     settings.setup()
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    instance_message = await get_message(ref=vm_hash)
+    instance_message = await get_executable_message(ref=vm_hash)
 
     fake_vm_pool = mocker.Mock(
         executions={
@@ -154,7 +154,7 @@ async def test_operator_stop(aiohttp_client, mocker):
     settings.setup()
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    instance_message = await get_message(ref=vm_hash)
+    instance_message = await get_executable_message(ref=vm_hash)
 
     fake_vm_pool = mocker.AsyncMock(
         executions={
@@ -190,7 +190,7 @@ async def test_operator_confidential_initialize_not_confidential(aiohttp_client,
     settings.setup()
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    instance_message = await get_message(ref=vm_hash)
+    instance_message = await get_executable_message(ref=vm_hash)
 
     fake_vm_pool = mocker.Mock(
         executions={
@@ -232,7 +232,7 @@ async def test_operator_confidential_initialize(aiohttp_client):
     settings.setup()
 
     vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
-    instance_message = await get_message(ref=vm_hash)
+    instance_message = await get_executable_message(ref=vm_hash)
 
     class FakeExecution:
         message = instance_message.content

From dadad88f73a9ed52a24e378992945ea2f7325925 Mon Sep 17 00:00:00 2001
From: Hugo Herter
Date: Wed, 12 Mar 2025 15:29:28 +0100
Subject: [PATCH 2/2] Fix: IPFS config was not properly provided.

The Kubo service could also use unlimited memory.
---
 packaging/aleph-vm/etc/ipfs/kubo.json        | 17 -----------------
 .../aleph-vm/etc/systemd/system/ipfs.service | 11 ++++++++++-
 2 files changed, 10 insertions(+), 18 deletions(-)
 delete mode 100644 packaging/aleph-vm/etc/ipfs/kubo.json
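
Note: the kubo.json file dropped here is replaced by explicit `ipfs config`
calls run from ExecStartPre, plus Go/systemd memory limits (GOMEMLIMIT,
MemoryHigh, MemoryMax). Below is a minimal sketch of how one could verify
that the running daemon picked these settings up, assuming the Kubo RPC API
listens on its default address 127.0.0.1:5001 (RPC endpoints are called with
POST).

    import json
    import urllib.request

    RPC_URL = "http://127.0.0.1:5001/api/v0/config/show"

    def check_kubo_config() -> None:
        # `ipfs config show` over the RPC API returns the effective configuration.
        request = urllib.request.Request(RPC_URL, method="POST")
        with urllib.request.urlopen(request) as response:
            config = json.load(response)
        print("Reprovider.Strategy:", config["Reprovider"]["Strategy"])
        print("Swarm.ResourceMgr.MaxMemory:", config["Swarm"]["ResourceMgr"].get("MaxMemory"))

    if __name__ == "__main__":
        check_kubo_config()
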
diff --git a/packaging/aleph-vm/etc/ipfs/kubo.json b/packaging/aleph-vm/etc/ipfs/kubo.json
deleted file mode 100644
index 9957b142e..000000000
--- a/packaging/aleph-vm/etc/ipfs/kubo.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "AutoNAT": {
-    "ServiceMode": "disabled"
-  },
-  "AddrFilters": [
-    "/ip4/86.84.0.0/ipcidr/16"
-  ],
-  "Reprovider": {
-    "Strategy": "roots"
-  },
-  "Swarm": {
-    "EnableHolePunching":false,
-    "RelayService": {
-      "Enabled": false
-    }
-  }
-}
diff --git a/packaging/aleph-vm/etc/systemd/system/ipfs.service b/packaging/aleph-vm/etc/systemd/system/ipfs.service
index 2009361e3..0708ae8f1 100644
--- a/packaging/aleph-vm/etc/systemd/system/ipfs.service
+++ b/packaging/aleph-vm/etc/systemd/system/ipfs.service
@@ -50,6 +50,11 @@
 ProtectHome=true
 RemoveIPC=true
 RestrictSUIDSGID=true
 CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+# Set a memory limit to avoid taking all the CRN resources and getting OOM-killed
+# https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgrmaxmemory
+Environment=GOMEMLIMIT=1900m
+MemoryHigh=2G
+MemoryMax=4G
 # enable for 1-1024 port listening
 #AmbientCapabilities=CAP_NET_BIND_SERVICE
@@ -76,7 +81,11 @@
 Type=notify
 User=ipfs
 Group=ipfs
 Environment=IPFS_PATH="/var/lib/ipfs"
-ExecStart=/opt/kubo/ipfs daemon --init --migrate --init-profile=server --config-file /etc/ipfs/kubo.json
+ExecStartPre=/opt/kubo/ipfs init
+ExecStartPre=/opt/kubo/ipfs config --json Gateway.PublicGateways '{"localhost": {"UseSubdomains": false, "Paths": ["/ipfs", "/ipns"]}}'
+ExecStartPre=/opt/kubo/ipfs config --json Reprovider.Strategy '"roots"'
+ExecStartPre=/opt/kubo/ipfs config --json Swarm.ResourceMgr '{"MaxMemory" : "1GB"}'
+ExecStart=/opt/kubo/ipfs daemon --migrate=true --init-profile=server
 Restart=on-failure
 KillSignal=SIGINT