Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/continuous-integration-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: CI
on:
pull_request:
branches:
- develop
- main
merge_group:

jobs:
Expand Down
2 changes: 1 addition & 1 deletion model/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ def problem_stats(user: User, problem: Problem):

@problem_api.post('/<int:problem_id>/migrate-test-case')
@login_required
@identity_verify(0) # admin only
@identity_verify(0) # admin only
@Request.doc('problem_id', 'problem', Problem)
def problem_migrate_test_case(user: User, problem: Problem):
if not problem.permission(user, problem.Permission.MANAGE):
Expand Down
9 changes: 4 additions & 5 deletions model/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,12 @@ def rejudge(user, submission: Submission):
return HTTPError('forbidden.', 403)
try:
success = submission.rejudge()
except FileExistsError:
exit(10086)
except ValueError as e:
return HTTPError(str(e), 400)
except JudgeQueueFullError as e:
return HTTPResponse(str(e), 202)
except ValidationError as e:
return HTTPError(str(e), data=e.to_dict())
return HTTPError(str(e), 422, data=e.to_dict())
if success:
return HTTPResponse(f'{submission} is sent to judgement.')
else:
Expand Down Expand Up @@ -526,14 +524,15 @@ def modify_config(rate_limit, sandbox_instances):
methods = {'GET': get_config, 'PUT': modify_config}
return methods[request.method]()


@submission_api.post('/<submission>/migrate-code')
@login_required
@identity_verify(0)
@Request.doc('submission', Submission)
def migrate_code(user: User, submission: Submission):
if not submission.permission(
user,
Submission.Permission.MANAGER,
user,
Submission.Permission.MANAGER,
):
return HTTPError('forbidden.', 403)

Expand Down
6 changes: 3 additions & 3 deletions mongo/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

FLASK_DEBUG = os.getenv('FLASK_DEBUG', 'False') == "True"
MINIO_HOST = os.getenv('MINIO_HOST', "localhost:9000")
FLASK_DEBUG = os.getenv('FLASK_DEBUG', 'False') == 'True'
MINIO_HOST = os.getenv('MINIO_HOST')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
MINIO_BUCKET = os.getenv('MINIO_BUCKET', "normal-oj-testing")
MINIO_BUCKET = os.getenv('MINIO_BUCKET', 'normal-oj-testing')
7 changes: 6 additions & 1 deletion mongo/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ class CaseResult(EmbeddedDocument):
null=True,
max_size=11**9,
)
output_minio_path = StringField(
null=True,
max_length=256,
db_field='outputMinioPath',
)


class TaskResult(EmbeddedDocument):
Expand Down Expand Up @@ -403,7 +408,7 @@ class Submission(Document):
tasks = EmbeddedDocumentListField(TaskResult, default=list)
exec_time = IntField(default=-1, db_field='runTime')
memory_usage = IntField(default=-1, db_field='memoryUsage')
code = ZipField(required=True, null=True, max_size=10**7)
code = ZipField(null=True, max_size=10**7)
code_minio_path = StringField(
null=True,
max_length=256,
Expand Down
98 changes: 75 additions & 23 deletions mongo/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Optional,
Union,
List,
TypedDict,
)
import enum
import tempfile
Expand Down Expand Up @@ -74,6 +75,14 @@ class SubmissionCodeNotFound(Exception):
'''


class SubmissionResultOutput(TypedDict):
'''
output of a submission result, including stdout and stderr
'''
stdout: str | bytes
stderr: str | bytes


class SubmissionConfig(MongoBase, engine=engine.SubmissionConfig):
TMP_DIR = pathlib.Path(
os.getenv(
Expand Down Expand Up @@ -181,21 +190,41 @@ def get_single_output(
task_no: int,
case_no: int,
text: bool = True,
):
) -> SubmissionResultOutput:
try:
case = self.tasks[task_no].cases[case_no]
except IndexError:
raise FileNotFoundError('task not exist')
ret = {}
try:
with ZipFile(case.output) as zf:
with ZipFile(self._get_output_raw(case)) as zf:
ret = {k: zf.read(k) for k in ('stdout', 'stderr')}
if text:
ret = {k: v.decode('utf-8') for k, v in ret.items()}
except AttributeError:
raise AttributeError('The submission is still in pending')
return ret

def _get_output_raw(self, case: engine.CaseResult) -> io.BytesIO:
'''
get a output blob of a submission result
'''
if case.output_minio_path is not None:
# get from minio
minio_client = MinioClient()
try:
resp = minio_client.client.get_object(
minio_client.bucket,
case.output_minio_path,
)
return io.BytesIO(resp.read())
finally:
if 'resp' in locals():
resp.close()
resp.release_conn()
# fallback to gridfs
return case.output

def delete_output(self, *args):
'''
delete stdout/stderr of this submission
Expand All @@ -206,6 +235,8 @@ def delete_output(self, *args):
for task in self.tasks:
for case in task.cases:
case.output.delete()
case.output_minio_path = None
self.save()

def delete(self, *keeps):
'''
Expand Down Expand Up @@ -328,7 +359,7 @@ def rejudge(self) -> bool:
return True
return self.send()

def _generate_obj_path(self):
def _generate_code_minio_path(self):
return f'submissions/{ULID()}.zip'

def _put_code(self, code_file) -> str:
Expand All @@ -339,7 +370,7 @@ def _put_code(self, code_file) -> str:
raise ValueError(err)

minio_client = MinioClient()
path = self._generate_obj_path()
path = self._generate_code_minio_path()
minio_client.client.put_object(
minio_client.bucket,
path,
Expand Down Expand Up @@ -451,6 +482,7 @@ def process_result(self, tasks: list):
# convert status into integer
case['status'] = self.status2code.get(case['status'], -3)
# process task
minio_client = MinioClient()
for i, cases in enumerate(tasks):
# save stdout/stderr
fds = ['stdout', 'stderr']
Expand All @@ -465,13 +497,22 @@ def process_result(self, tasks: list):
)
zf.writestr(fd, content)
tf.seek(0)
case['output'] = tf
# upload to minio
output_minio_path = self._generate_output_minio_path(i, j)
minio_client.client.put_object(
minio_client.bucket,
output_minio_path,
io.BytesIO(tf.read()),
-1,
part_size=5 * 1024 * 1024, # 5MB
content_type='application/zip',
)
# convert dict to document
cases[j] = engine.CaseResult(
status=case['status'],
exec_time=case['execTime'],
memory_usage=case['memoryUsage'],
output=case['output'],
output_minio_path=output_minio_path,
)
status = max(c.status for c in cases)
exec_time = max(c.exec_time for c in cases)
Expand All @@ -498,6 +539,12 @@ def process_result(self, tasks: list):
self.finish_judging()
return True

def _generate_output_minio_path(self, task_no: int, case_no: int) -> str:
'''
generate a output file path for minio
'''
return f'submissions/task{task_no:02d}_case{case_no:02d}_{ULID()}.zip'

def finish_judging(self):
# update user's submission
User(self.username).add_submission(self)
Expand Down Expand Up @@ -749,15 +796,12 @@ def get_detailed_result(self) -> List[Dict[str, Any]]:
Get all results (including stdout/stderr) of this submission
'''
tasks = [task.to_mongo() for task in self.tasks]
for task in tasks:
for case in task.cases:
# extract zip file
output = case.pop('output', None)
if output is not None:
output = engine.GridFSProxy(output)
with ZipFile(output) as zf:
case['stdout'] = zf.read('stdout').decode('utf-8')
case['stderr'] = zf.read('stderr').decode('utf-8')
for i, task in enumerate(tasks):
for j, case in enumerate(task.cases):
output = self.get_single_output(i, j)
case['stdout'] = output['stdout']
case['stderr'] = output['stderr']
del case['output'] # non-serializable field
return [task.to_dict() for task in tasks]

def _get_code_raw(self):
Expand Down Expand Up @@ -862,18 +906,22 @@ def migrate_code_to_minio(self):
# upload code to minio
if self.code_minio_path is None:
self.logger.info(f"uploading code to minio. submission={self.id}")
self.update(
code_minio_path=self._put_code(self.code),
)
self.update(code_minio_path=self._put_code(self.code), )
self.reload()
self.logger.info(f"code uploaded to minio. submission={self.id} path={self.code_minio_path}")
self.logger.info(
f"code uploaded to minio. submission={self.id} path={self.code_minio_path}"
)

# remove code in gridfs if it is consistent
if self._check_code_consistency():
self.logger.info(f"data consistency validated, removing code in gridfs. submission={self.id}")
self.logger.info(
f"data consistency validated, removing code in gridfs. submission={self.id}"
)
self._remove_code_in_mongodb()
else:
self.logger.warning(f"data inconsistent, keeping code in gridfs. submission={self.id}")
self.logger.warning(
f"data inconsistent, keeping code in gridfs. submission={self.id}"
)

def _remove_code_in_mongodb(self):
self.code.delete()
Expand All @@ -891,7 +939,9 @@ def _check_code_consistency(self):
# if file is deleted but GridFS proxy is not updated
return False
gridfs_checksum = md5(gridfs_code).hexdigest()
self.logger.info(f"calculated grid checksum. submission={self.id} checksum={gridfs_checksum}")
self.logger.info(
f"calculated grid checksum. submission={self.id} checksum={gridfs_checksum}"
)

minio_client = MinioClient()
try:
Expand All @@ -906,5 +956,7 @@ def _check_code_consistency(self):
resp.release_conn()

minio_checksum = md5(minio_code).hexdigest()
self.logger.info(f"calculated minio checksum. submission={self.id} checksum={minio_checksum}")
self.logger.info(
f"calculated minio checksum. submission={self.id} checksum={minio_checksum}"
)
return minio_checksum == gridfs_checksum
11 changes: 6 additions & 5 deletions mongo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from minio import Minio
import redis
from . import engine
from . import config
from .config import FLASK_DEBUG, MINIO_HOST, MINIO_SECRET_KEY, MINIO_ACCESS_KEY, MINIO_BUCKET

if TYPE_CHECKING:
Expand Down Expand Up @@ -176,9 +177,9 @@ class MinioClient:

def __init__(self):
self.client = Minio(
MINIO_HOST,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=not FLASK_DEBUG,
config.MINIO_HOST,
access_key=config.MINIO_ACCESS_KEY,
secret_key=config.MINIO_SECRET_KEY,
secure=not config.FLASK_DEBUG,
)
self.bucket = MINIO_BUCKET
self.bucket = config.MINIO_BUCKET
Loading