Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import datetime
import hmac
import io
import tempfile
import json
from pathlib import Path
import sys
import time
from typing import Any, Optional
from typing import Any, Optional, BinaryIO
import aiohttp
import anyio
import anyio.abc
Expand Down Expand Up @@ -227,7 +228,7 @@ async def _download_file(
for _ in range(10):
size = 0
hash = utils.get_hash_obj(file.hash)
tmp_file = io.BytesIO()
tmp_file = tempfile.TemporaryFile()
try:
async with session.get(
file.path
Expand All @@ -242,6 +243,7 @@ async def _download_file(
if hash.hexdigest() != file.hash or size != file.size:
await anyio.sleep(50)
raise Exception(f"hash mismatch, got {hash.hexdigest()} expected {file.hash}")
tmp_file.seek(0)
await self.upload_storage(file, tmp_file, size)
self.update_success()
Comment on lines +246 to 248
Copy link

Copilot AI Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `seek(0)` call should sit inside the `try` block together with the upload, since it is part of the file-processing logic and could itself fail. The file position must be reset to the start of the file before `upload_storage` reads from it; otherwise the upload would start from the wrong position.

Suggested change
tmp_file.seek(0)
await self.upload_storage(file, tmp_file, size)
self.update_success()
tmp_file.seek(0)
await self.upload_storage(file, tmp_file, size)
self.update_success()

Copilot uses AI. Check for mistakes.
except Exception as e:
Expand All @@ -259,7 +261,7 @@ async def _download_file(
async def upload_storage(
self,
file: BMCLAPIFile,
data: io.BytesIO,
data: BinaryIO,
size: int
):
missing_storage = [
Expand Down Expand Up @@ -598,14 +600,19 @@ async def get_files(self) -> list[BMCLAPIFile]:
if resp.status == 204: # no new files
#logger.tdebug("cluster.get_files.no_new_files", id=self.id)
return results
reader = utils.AvroParser(zstd.decompress(await resp.read()))
for _ in range(reader.read_long()):
results.append(BMCLAPIFile(
reader.read_string(),
reader.read_string(),
reader.read_long(),
reader.read_long() / 1000.0,
))
tmp = tempfile.TemporaryFile()
async for chunk in resp.content.iter_chunked(1024 * 1024 * 4):
tmp.write(chunk)
tmp.seek(0)
with zstd.open(tmp, 'rb') as zf:
Copy link

Copilot AI Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temporary file `tmp` is not closed after use. The `with zstd.open(tmp, 'rb')` context manager only handles the zstd file object; the underlying temporary file should also be closed (for example by opening it in its own `with` block) to free resources.

Copilot uses AI. Check for mistakes.
reader = utils.AvroParser(zf)
for _ in range(reader.read_long()):
results.append(BMCLAPIFile(
reader.read_string(),
reader.read_string(),
reader.read_long(),
reader.read_long() / 1000.0,
))
self._last_modified = max(results, key=lambda x: x.mtime).mtime
logger.tdebug("cluster.get_files", id=self.id, name=self.display_name, count=len(results), size=units.format_bytes(sum([f.size for f in results])), last_modified=units.format_datetime_from_timestamp(self._last_modified))
except:
Expand Down
5 changes: 3 additions & 2 deletions core/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ def stop(
):
self._running = 0

def update(
async def update(
self,
):
while self._running:
try:
self.cpus.append(self.process.cpu_percent(interval=1))
self.cpus.append(self.process.cpu_percent(interval=None))

memory = self.process.memory_full_info()

Expand All @@ -83,6 +83,7 @@ def update(
))
except:
break
await anyio.sleep(1)

def get_info(self) -> dict[str, Any]:
return {
Expand Down
4 changes: 2 additions & 2 deletions core/database/memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import defaultdict
from collections import defaultdict, deque
from datetime import datetime
from typing import Any
from .abc import DataBase, ClusterCounterInfo
Expand All @@ -11,7 +11,7 @@ def __init__(
):
super().__init__(database_name)

self._clusters_logs = []
self._clusters_logs = deque(maxlen=10000)
self._clusters_counters: defaultdict[int, defaultdict[str, defaultdict[str, int]]] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))

async def insert_cluster_info(self, cluster_id: str, type: str, event: str, data: Any | None = None):
Expand Down
18 changes: 14 additions & 4 deletions core/storage/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import anyio.abc
import io
import tempfile
from typing import BinaryIO

from core import utils
from core.abc import BMCLAPIFile, ResponseFile, ResponseFileNotFound, ResponseFileMemory, ResponseFileLocal, ResponseFileRemote
Expand Down Expand Up @@ -151,12 +153,12 @@ async def works(root_ids: list[int]):
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
raise NotImplementedError

async def upload_download_file(self, path: str, data: io.BytesIO, size: int):
async def upload_download_file(self, path: str, data: BinaryIO, size: int):
if self.download_dir:
path = f"download/{path}"
await self.upload(f"download/{path}", data, size)
Expand Down Expand Up @@ -196,12 +198,20 @@ def path(self) -> 'CPath':
async def write_measure(self, size: int):
path = f"measure/{size}"
size = size * 1024 * 1024

tmp = tempfile.TemporaryFile()
chunk = b"\x00" * (1024 * 1024)
remain = size
while remain > 0:
w = min(remain, len(chunk))
tmp.write(chunk[:w])
remain -= w
tmp.seek(0)
await self.upload(
path,
io.BytesIO(b"\x00" * size),
tmp,
size
)
tmp.close()
Comment on lines +201 to +214
Copy link

Copilot AI Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temporary file should be closed with a try/finally block or a context manager (e.g. `with tempfile.TemporaryFile() as tmp:`) so that it is always closed, even if an exception occurs during the upload.

Copilot uses AI. Check for mistakes.
logger.tsuccess("storage.write_measure", size=int(size / (1024 * 1024)), name=self.name, type=self.type)


Expand Down
6 changes: 3 additions & 3 deletions core/storage/alist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import time
from typing import Any
from typing import Any, BinaryIO
import urllib.parse as urlparse
import aiohttp
import anyio.abc
Expand Down Expand Up @@ -174,7 +174,7 @@ async def list_files(self, path: str) -> list[abc.FileInfo]:
))
return res

async def upload(self, path: str, data: io.BytesIO, size: int):
async def upload(self, path: str, data: BinaryIO, size: int):
async with aiohttp.ClientSession(
base_url=self._endpoint,
headers={
Expand All @@ -187,7 +187,7 @@ async def upload(self, path: str, data: io.BytesIO, size: int):
headers={
"File-Path": urlparse.quote(str(self._path / path)),
},
data=data.getbuffer()
data=data
) as resp:
alist_resp = AlistResponse(await resp.json())
alist_resp.raise_for_status()
Expand Down
7 changes: 5 additions & 2 deletions core/storage/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from pathlib import Path
import io
import time
import shutil
from typing import BinaryIO
import anyio.abc

from core.abc import ResponseFile, ResponseFileLocal, ResponseFileNotFound
Expand Down Expand Up @@ -53,13 +55,14 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
root = Path(str(self.path)) / path
root.parent.mkdir(parents=True, exist_ok=True)
with open(root, "wb") as f:
f.write(data.getbuffer())
data.seek(0)
shutil.copyfileobj(data, f)
return True

async def _check(
Expand Down
7 changes: 4 additions & 3 deletions core/storage/minio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta
import datetime
import io
from typing import BinaryIO
import time
from typing import Optional
import urllib.parse as urlparse
Expand Down Expand Up @@ -92,15 +93,15 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
root = self.path / path

await self.minio.put_object(
self.bucket,
str(root)[1:],
data.getbuffer(),
data,
size
)
return True
Expand Down
3 changes: 2 additions & 1 deletion core/storage/s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from io import BytesIO
import io
from typing import BinaryIO
import time
import aioboto3.session
import anyio.abc
Expand Down Expand Up @@ -136,7 +137,7 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
async with self.session.resource(
Expand Down
5 changes: 3 additions & 2 deletions core/storage/webdav.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import time
from typing import BinaryIO
import aiohttp
import aiowebdav.client
import anyio
Expand Down Expand Up @@ -128,10 +129,10 @@ async def _mkdir(self, parent: abc.CPath):
for parent in parent.parents:
await self.client.mkdir(str(parent))

async def upload(self, path: str, data: io.BytesIO, size: int):
async def upload(self, path: str, data: BinaryIO, size: int):
# check dir
await self._mkdir((self._path / path).parent)
await self.client.upload_to(io.BytesIO(data.getbuffer()), str(self._path / path))
await self.client.upload_to(data, str(self._path / path))
return True


Expand Down
9 changes: 7 additions & 2 deletions core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@
V = TypeVar("V")
T = TypeVar("T")

from typing import BinaryIO

class AvroParser:
def __init__(
self,
data: bytes
data: bytes | BinaryIO
):
self.data = io.BytesIO(data)
if isinstance(data, (bytes, bytearray)):
self.data = io.BytesIO(data)
else:
self.data = data

def read_long(self):
result, shift = 0, 0
Expand Down