Skip to content

Commit ea652f3

Browse files
committed
修复重复下载文件时间间隔过短的文件,大约30/s请求,对主控造成不好的影响
1 parent da2491e commit ea652f3

File tree

3 files changed

+59
-46
lines changed

3 files changed

+59
-46
lines changed

core/cluster.py

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from core.i18n import locale
3333
import aiowebdav.client as webdav3_client
3434
import aiowebdav.exceptions as webdav3_exceptions
35-
from core.exceptions import ClusterIdNotSet, ClusterSecretNotSet
35+
from core.exceptions import ClusterIdNotSet, ClusterSecretNotSet, PutQueueIgnoreError
3636

3737
from core.const import *
3838

@@ -136,6 +136,7 @@ class FileDownloader:
136136
def __init__(self) -> None:
137137
self.files = []
138138
self.queues: asyncio.Queue[BMCLAPIFile] = asyncio.Queue()
139+
self.failed_files: defaultdict[BMCLAPIFile, int] = defaultdict(int)
139140
self.last_modified: int = 0
140141

141142
async def get_files(self) -> list[BMCLAPIFile]:
@@ -215,7 +216,9 @@ async def _download_temporarily_file(self, hash: str):
215216
async def _download(self, pbar: tqdm, lock: asyncio.Semaphore):
216217
async def put(size, file: BMCLAPIFile):
217218
await self.queues.put(file)
219+
self.failed_files[file] += 1
218220
pbar.update(-size)
221+
raise PutQueueIgnoreError # raised ignore error, continue to next code.
219222
async def error(*responses: aiohttp.ClientResponse):
220223
msg = []
221224
history = list((ResponseRedirects(resp.status, str(resp.real_url)) for resp in responses))
@@ -226,53 +229,59 @@ async def error(*responses: aiohttp.ClientResponse):
226229
logger.terror("cluster.error.download.failed", hash=file.hash, size=unit.format_bytes(file.size),
227230
source=source, host=responses[-1].host, status=responses[-1].status, history=history)
228231
while not self.queues.empty() and storages.available:
229-
async with aiohttp.ClientSession(
230-
BASE_URL,
231-
headers={
232-
"User-Agent": USER_AGENT,
233-
"Authorization": f"Bearer {await token.getToken()}",
234-
},
235-
) as session:
232+
try:
236233
file = await self.queues.get()
237-
hash = get_hash(file.hash)
238-
size = 0
239-
content: io.BytesIO = io.BytesIO()
240-
resp = None
241-
try:
242-
async with lock:
243-
resp = await session.get(file.path)
244-
while data := await resp.content.read(IO_BUFFER):
245-
if not data:
246-
break
247-
byte = len(data)
248-
size += byte
249-
pbar.update(byte)
250-
content.write(data)
251-
hash.update(data)
252-
resp.close()
253-
except asyncio.CancelledError:
254-
return
255-
except aiohttp.client_exceptions.ClientConnectionError:
256-
if resp is not None:
257-
await error(*resp.history, resp)
258-
await put(size, file)
259-
continue
260-
except:
261-
logger.error(traceback.format_exc())
262-
if resp is not None:
234+
if file in self.failed_files:
235+
logger.error(f"该文件 {file.hash}({unit.format_bytes(file.size)}) 将在 {DOWNLOAD_RETRY_DELAY} 后执行,已重试 {self.failed_files[file]} 次下载")
236+
try:
237+
await asyncio.sleep(DOWNLOAD_RETRY_DELAY)
238+
except asyncio.CancelledError:
239+
return
240+
async with aiohttp.ClientSession(
241+
BASE_URL,
242+
headers={
243+
"User-Agent": USER_AGENT,
244+
"Authorization": f"Bearer {await token.getToken()}",
245+
},
246+
) as session:
247+
hash = get_hash(file.hash)
248+
size = 0
249+
content: io.BytesIO = io.BytesIO()
250+
resp = None
251+
try:
252+
async with lock:
253+
resp = await session.get(file.path)
254+
while data := await resp.content.read(IO_BUFFER):
255+
if not data:
256+
break
257+
byte = len(data)
258+
size += byte
259+
pbar.update(byte)
260+
content.write(data)
261+
hash.update(data)
262+
resp.close()
263+
except asyncio.CancelledError:
264+
return
265+
except aiohttp.client_exceptions.ClientConnectionError:
266+
if resp is not None:
267+
await error(*resp.history, resp)
268+
await put(size, file)
269+
except:
270+
logger.error(traceback.format_exc())
271+
if resp is not None:
272+
await error(*resp.history, resp)
273+
await put(size, file)
274+
if file.hash != hash.hexdigest():
263275
await error(*resp.history, resp)
264-
await put(size, file)
265-
continue
266-
if file.hash != hash.hexdigest():
267-
await error(*resp.history, resp)
268-
await put(size, file)
269-
await asyncio.sleep(5)
270-
continue
271-
r = await self._mount_file(file, content)
272-
if r[0] == -1:
273-
logger.terror("cluster.error.download.failed_to_upload")
274-
await put(size, file)
275-
continue
276+
await put(size, file)
277+
r = await self._mount_file(file, content)
278+
if r[0] == -1:
279+
logger.terror("cluster.error.download.failed_to_upload")
280+
await put(size, file)
281+
except PutQueueIgnoreError:
282+
...
283+
except Exception as r:
284+
raise r
276285

277286
async def _mount_file(
278287
self, file: BMCLAPIFile, buf: io.BytesIO

core/const.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
}
5656
CLUSTER_PATTERN = re.compile(r'https?://([a-fA-F0-9]*)\.openbmclapi\.933\.moe(:\d+)/')
5757
DOWNLOAD_ACCESS_LOG: bool = True
58+
DOWNLOAD_RETRY_DELAY: int = 60
5859
DOWNLOAD_FILE: bool = False
5960
RESPONSE_DATE = "%a, %d %b %Y %H:%M:%S GMT"
6061
RESPONSE_COMPRESSION_IGNORE_SIZE_THRESHOLD: int = 16777216

core/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ class ServerWebSocketError(WebSocketError): ...
2020

2121

2222
class ServerWebSocketUnknownDataError(ServerWebSocketError): ...
23+
24+
25+
class PutQueueIgnoreError(Exception): ...

0 commit comments

Comments
 (0)