From 69690af87c4d81896ae141fd43d56a23c8bd6f4f Mon Sep 17 00:00:00 2001 From: RA341 Date: Sun, 30 Mar 2025 18:36:13 -0400 Subject: [PATCH 1/2] switched to an inverted worker pool for better system efficiency --- src/models/worker_semaphore.go | 25 +++++++++ src/service/jobs/job_queue.go | 98 +++++++++++++++------------------- 2 files changed, 69 insertions(+), 54 deletions(-) create mode 100644 src/models/worker_semaphore.go diff --git a/src/models/worker_semaphore.go b/src/models/worker_semaphore.go new file mode 100644 index 0000000..5b42dea --- /dev/null +++ b/src/models/worker_semaphore.go @@ -0,0 +1,25 @@ +package models + +type WorkerSemaphore struct { + semaphore chan struct{} +} + +func NewWorkerSemaphore(maxWorkers int) *WorkerSemaphore { + sem := &WorkerSemaphore{ + semaphore: make(chan struct{}, maxWorkers), + } + for i := 0; i < maxWorkers; i++ { + sem.Release() // fill the semaphore with maxWorkers + } + return sem +} + +// Acquire returns the semaphore channel for select statements +// will block until resource is available +func (s *WorkerSemaphore) Acquire() <-chan struct{} { + return s.semaphore +} + +func (s *WorkerSemaphore) Release() { + s.semaphore <- struct{}{} +} diff --git a/src/service/jobs/job_queue.go b/src/service/jobs/job_queue.go index bd0606e..7c47088 100644 --- a/src/service/jobs/job_queue.go +++ b/src/service/jobs/job_queue.go @@ -2,12 +2,11 @@ package jobs import ( "context" - "errors" "fmt" dk "github.com/docker/docker/api/types/container" "github.com/docker/docker/pkg/stdcopy" "github.com/makeopensource/leviathan/common" - "github.com/makeopensource/leviathan/models" + . "github.com/makeopensource/leviathan/models" "github.com/makeopensource/leviathan/service/docker" "github.com/rs/zerolog/log" "gorm.io/gorm" @@ -17,41 +16,31 @@ import ( ) type JobQueue struct { - jobChannel chan *models.Job - db *gorm.DB - dkSrv *docker.DkService - contextMap *models.Map[string, func()] + jobSemaphore *WorkerSemaphore + db *gorm.DB + dkSrv *docker.DkService + contextMap *Map[string, func()] } func NewJobQueue(totalJobs uint, db *gorm.DB, dk *docker.DkService) *JobQueue { queue := &JobQueue{ - jobChannel: make(chan *models.Job, totalJobs), - contextMap: &models.Map[string, func()]{}, - db: db, - dkSrv: dk, + contextMap: &Map[string, func()]{}, + db: db, + dkSrv: dk, + jobSemaphore: NewWorkerSemaphore(int(totalJobs)), } - queue.spawnWorkers(int(totalJobs)) return queue } -func (q *JobQueue) spawnWorkers(workerCount int) { - for i := 0; i < workerCount; i++ { - go q.worker() - } -} - -func (q *JobQueue) AddJob(mes *models.Job) error { +func (q *JobQueue) AddJob(mes *Job) error { jog(mes.JobCtx).Info().Msg("sending job to queue") err := mes.ValidateForQueue() if err != nil { return common.ErrLog("job validation failed: "+err.Error(), err, jog(mes.JobCtx).Error()) } - // run in go routine in case queue is full and gets blocked - go func() { - q.jobChannel <- mes - }() + go q.worker(mes) return nil } @@ -75,27 +64,28 @@ func (q *JobQueue) CancelJob(messageId string) { cancel() } -func (q *JobQueue) worker() { - for msg := range q.jobChannel { - if msg == nil { - log.Error().Msg("job received was nil, THIS SHOULD NEVER HAPPEN") - continue - } - - if errors.Is(msg.JobCtx.Err(), context.Canceled) { - q.setJobAsCancelled(msg) - q.cleanupJob(msg, nil) - jog(msg.JobCtx).Warn().Msgf("job context was canceled before queue could process") - continue - } +func (q *JobQueue) worker(msg *Job) { + if msg == nil { + log.Error().Msg("job received was nil, THIS SHOULD NEVER HAPPEN") + return + } + select { + case <-q.jobSemaphore.Acquire(): + defer q.jobSemaphore.Release() jog(msg.JobCtx).Info().Msgf("worker is now processing job") q.runJob(msg) + return + case <-msg.JobCtx.Done(): + q.setJobAsCancelled(msg) + q.cleanupJob(msg, nil) + jog(msg.JobCtx).Warn().Msgf("job context was canceled before queue could process") + return } } // runJob should ALWAYS BE BLOCKING, as it prevents the worker from moving on to a new job -func (q *JobQueue) runJob(job *models.Job) { +func (q *JobQueue) runJob(job *Job) { client, contId, err, reason := q.setupJob(job) defer q.cleanupJob(job, client) if err != nil { @@ -155,7 +145,7 @@ func (q *JobQueue) runJob(job *models.Job) { // setupJob Set up job like king, yes! // returns nil client if an error occurred while setup, // make sure to handle null ptr dereference -func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, string) { +func (q *JobQueue) setupJob(msg *Job) (*docker.DkClient, string, error, string) { q.setJobInSetup(msg) machine, err := q.dkSrv.ClientManager.GetClientById(msg.MachineId) @@ -180,8 +170,8 @@ func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, s } resources := dk.Resources{ - NanoCPUs: msg.LabData.JobLimits.NanoCPU * models.CPUQuota, - Memory: msg.LabData.JobLimits.Memory * models.MB, + NanoCPUs: msg.LabData.JobLimits.NanoCPU * CPUQuota, + Memory: msg.LabData.JobLimits.Memory * MB, PidsLimit: &msg.LabData.JobLimits.PidsLimit, } @@ -206,7 +196,7 @@ func (q *JobQueue) setupJob(msg *models.Job) (*docker.DkClient, string, error, s // cleanupJob clean up job, // updates job in DB, removes the container and associated tmp job data -func (q *JobQueue) cleanupJob(msg *models.Job, client *docker.DkClient) { +func (q *JobQueue) cleanupJob(msg *Job, client *docker.DkClient) { jog(msg.JobCtx).Info().Msg("cleaning up job") q.updateJobVeryNice(msg) @@ -231,9 +221,9 @@ func (q *JobQueue) cleanupJob(msg *models.Job, client *docker.DkClient) { // Very nice! // // jobResult is the last line expected to be valid json string, returned to the job caller -func (q *JobQueue) greatSuccess(job *models.Job, jobResult string) { +func (q *JobQueue) greatSuccess(job *Job, jobResult string) { jog(job.JobCtx).Info().Msg("job completed successfully") - job.Status = models.Complete + job.Status = Complete job.StatusMessage = jobResult } @@ -244,46 +234,46 @@ func (q *JobQueue) greatSuccess(job *models.Job, jobResult string) { // The publicReason will be displayed to the end user, providing a user-friendly message. // // The err parameter holds the underlying error, used for debugging purposes. -func (q *JobQueue) bigProblem(job *models.Job, publicReason string, err error) { +func (q *JobQueue) bigProblem(job *Job, publicReason string, err error) { jog(job.JobCtx).Error().Err(err).Str("reason", publicReason).Msg("job failed") - job.Status = models.Failed + job.Status = Failed job.StatusMessage = publicReason if err != nil { job.Error = err.Error() } } -func (q *JobQueue) setJobAsCancelled(job *models.Job) { +func (q *JobQueue) setJobAsCancelled(job *Job) { jog(job.JobCtx).Info().Msg("job was cancelled") - job.Status = models.Canceled + job.Status = Canceled job.StatusMessage = "Job was cancelled" } // setJobInProgress set job status as models.Running // // Job is in progress, success soon! -func (q *JobQueue) setJobInProgress(msg *models.Job) { - msg.Status = models.Running +func (q *JobQueue) setJobInProgress(msg *Job) { + msg.Status = Running q.updateJobVeryNice(msg) } // setJobInSetup set job status as models.Preparing // // job is being setup standby -func (q *JobQueue) setJobInSetup(msg *models.Job) { - msg.Status = models.Preparing +func (q *JobQueue) setJobInSetup(msg *Job) { + msg.Status = Preparing q.updateJobVeryNice(msg) } // updateJobVeryNice Database updated, fresh like new wife! -func (q *JobQueue) updateJobVeryNice(msg *models.Job) { +func (q *JobQueue) updateJobVeryNice(msg *Job) { res := q.db.Save(msg) if res.Error != nil { jog(msg.JobCtx).Error().Err(res.Error).Msg("error occurred while saving job to db") } } -func writeLogs(client *docker.DkClient, msg *models.Job) (string, error) { +func writeLogs(client *docker.DkClient, msg *Job) (string, error) { outputFile, err := os.OpenFile(msg.OutputLogFilePath, os.O_RDWR|os.O_CREATE, 0660) if err != nil { return "unable to open log file", err @@ -308,8 +298,8 @@ func writeLogs(client *docker.DkClient, msg *models.Job) (string, error) { return "", nil } -func verifyLogs(msg *models.Job) (string, string, error) { - if msg.Status == models.Failed { +func verifyLogs(msg *Job) (string, string, error) { + if msg.Status == Failed { return "", "Job failed, skipping parsing log file", nil } From 51e07257562f88a5552a8f963f4db625ef6d72fc Mon Sep 17 00:00:00 2001 From: RA341 Date: Sun, 30 Mar 2025 21:16:39 -0400 Subject: [PATCH 2/2] fix linter errors --- src/api/v1/file_manager_impl.go | 43 +++--- src/common/logger.go | 4 +- src/common/utils.go | 5 +- src/models/job_error.go | 35 +++++ src/service/docker/docker_utils_ssh.go | 10 +- .../file_manager/file_manager_service.go | 24 ++-- src/service/jobs/job_queue.go | 136 ++++++++---------- src/service/jobs/job_service.go | 46 +++--- src/service/jobs/job_service_tango_test.go | 8 +- src/service/labs/lab_service.go | 24 ++-- 10 files changed, 180 insertions(+), 155 deletions(-) create mode 100644 src/models/job_error.go diff --git a/src/api/v1/file_manager_impl.go b/src/api/v1/file_manager_impl.go index d1c0f15..cd83561 100644 --- a/src/api/v1/file_manager_impl.go +++ b/src/api/v1/file_manager_impl.go @@ -2,8 +2,8 @@ package v1 import ( "encoding/json" - . "github.com/makeopensource/leviathan/common" - . "github.com/makeopensource/leviathan/service/file_manager" + com "github.com/makeopensource/leviathan/common" + fm "github.com/makeopensource/leviathan/service/file_manager" "github.com/rs/zerolog/log" "mime/multipart" "net/http" @@ -20,7 +20,7 @@ type FileManagerHandler struct { BasePath string UploadLabPath string UploadSubmissionPath string - service FileManagerService + service fm.FileManagerService } func NewFileManagerHandler(basePath string) *FileManagerHandler { @@ -28,7 +28,7 @@ func NewFileManagerHandler(basePath string) *FileManagerHandler { BasePath: basePath, UploadLabPath: basePath + "/upload/lab", UploadSubmissionPath: basePath + "/upload/submission", - service: FileManagerService{}, + service: fm.FileManagerService{}, } } @@ -44,7 +44,6 @@ func (f *FileManagerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusMethodNotAllowed) - return } func (f *FileManagerHandler) UploadLabData(w http.ResponseWriter, r *http.Request) { @@ -58,12 +57,12 @@ func (f *FileManagerHandler) UploadLabData(w http.ResponseWriter, r *http.Reques if err != nil { http.Error( w, - ErrLog("Failed to get dockerfile in form", err, log.Error()).Error(), + com.ErrLog("Failed to get dockerfile in form", err, log.Error()).Error(), http.StatusBadRequest, ) return } - defer CloseFile(dockerFile) + defer com.CloseFile(dockerFile) jobFiles, ok := r.MultipartForm.File[LabFilesKey] if !ok || len(jobFiles) == 0 { @@ -76,9 +75,9 @@ func (f *FileManagerHandler) UploadLabData(w http.ResponseWriter, r *http.Reques http.Error(w, err.Error(), http.StatusBadRequest) return } - defer func(files []*FileInfo) { + defer func(files []*fm.FileInfo) { for _, file := range files { - CloseFile(file.Reader) + com.CloseFile(file.Reader) } }(fileInfos) @@ -88,7 +87,7 @@ func (f *FileManagerHandler) UploadLabData(w http.ResponseWriter, r *http.Reques return } - sendResponse(w, err, folderID) + sendResponse(w, folderID) } func (f *FileManagerHandler) UploadSubmissionData(w http.ResponseWriter, r *http.Request) { @@ -109,9 +108,9 @@ func (f *FileManagerHandler) UploadSubmissionData(w http.ResponseWriter, r *http http.Error(w, err.Error(), http.StatusBadRequest) return } - defer func(files []*FileInfo) { + defer func(files []*fm.FileInfo) { for _, file := range files { - CloseFile(file.Reader) + com.CloseFile(file.Reader) } }(fileInfos) @@ -121,11 +120,11 @@ func (f *FileManagerHandler) UploadSubmissionData(w http.ResponseWriter, r *http return } - sendResponse(w, err, folderID) + sendResponse(w, folderID) } -func sendResponse(w http.ResponseWriter, err error, folderID string) { - jsonData, err := toJson(folderID, err) +func sendResponse(w http.ResponseWriter, folderID string) { + jsonData, err := toJson(folderID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -135,32 +134,32 @@ func sendResponse(w http.ResponseWriter, err error, folderID string) { if err != nil { http.Error( w, - ErrLog("Failed to write response", err, log.Error()).Error(), + com.ErrLog("Failed to write response", err, log.Error()).Error(), http.StatusInternalServerError, ) return } } -func toJson(folderID string, err error) ([]byte, error) { +func toJson(folderID string) ([]byte, error) { resultMap := map[string]string{ "folderId": folderID, } jsonData, err := json.Marshal(resultMap) if err != nil { - return nil, ErrLog("Failed to marshal json", err, log.Error()) + return nil, com.ErrLog("Failed to marshal json", err, log.Error()) } return jsonData, nil } -func mapToFileInfo(jobFiles []*multipart.FileHeader) ([]*FileInfo, error) { - var fileInfos []*FileInfo +func mapToFileInfo(jobFiles []*multipart.FileHeader) ([]*fm.FileInfo, error) { + var fileInfos []*fm.FileInfo for _, jobFile := range jobFiles { file, err := jobFile.Open() if err != nil { - return fileInfos, ErrLog("unable to open file: "+err.Error(), err, log.Error()) + return fileInfos, com.ErrLog("unable to open file: "+err.Error(), err, log.Error()) } - fileInfos = append(fileInfos, &FileInfo{ + fileInfos = append(fileInfos, &fm.FileInfo{ Reader: file, Filename: jobFile.Filename, }) diff --git a/src/common/logger.go b/src/common/logger.go index ab1f086..441542d 100644 --- a/src/common/logger.go +++ b/src/common/logger.go @@ -52,8 +52,8 @@ func FileConsoleLogger() zerolog.Logger { // // This hides implementation details from users while ensuring full error information is available for debugging. func ErrLog(message string, err error, eventLevel *zerolog.Event) error { - eventLevel.Err(err).Msgf(message) - return fmt.Errorf(message) + eventLevel.Err(err).Msg(message) + return fmt.Errorf("%s", message) } func ConsoleLogger() zerolog.Logger { diff --git a/src/common/utils.go b/src/common/utils.go index 528303c..b90dfa1 100644 --- a/src/common/utils.go +++ b/src/common/utils.go @@ -11,10 +11,7 @@ import ( func FileExists(filename string) bool { _, err := os.Stat(filename) - if os.IsNotExist(err) { - return false - } - return true + return !os.IsNotExist(err) } func CloseFile(file io.ReadCloser) { diff --git a/src/models/job_error.go b/src/models/job_error.go new file mode 100644 index 0000000..a5a967e --- /dev/null +++ b/src/models/job_error.go @@ -0,0 +1,35 @@ +package models + +type JobError interface { + // Reason will be displayed to the end user, providing a user-friendly message. + Reason() string + // Err err parameter holds the underlying error, used for debugging purposes. + Err() error + // ErrStr returns string from the error, if nil return empty string + ErrStr() string +} + +// JErr implements JobError +type JErr struct { + reason string + err error +} + +func JError(reason string, err error) JErr { + return JErr{reason: reason, err: err} +} + +func (err JErr) Reason() string { + return err.reason +} + +func (err JErr) Err() error { + return err.err +} + +func (err JErr) ErrStr() string { + if err.err != nil { + return err.err.Error() + } + return "" +} diff --git a/src/service/docker/docker_utils_ssh.go b/src/service/docker/docker_utils_ssh.go index 2ee01bf..d585dbb 100644 --- a/src/service/docker/docker_utils_ssh.go +++ b/src/service/docker/docker_utils_ssh.go @@ -9,7 +9,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" - . "github.com/makeopensource/leviathan/common" + com "github.com/makeopensource/leviathan/common" "github.com/makeopensource/leviathan/models" "github.com/rs/zerolog/log" "github.com/spf13/viper" @@ -70,7 +70,7 @@ func saveHostKey(machine models.MachineOptions) func(hostname string, remote net } func writeMachineToConfigFile(machine models.MachineOptions) { - machineKey := fmt.Sprintf("%s.%s", ClientsSSH.ConfigKey, machine.Name()) + machineKey := fmt.Sprintf("%s.%s", com.ClientsSSH.ConfigKey, machine.Name()) viper.Set(machineKey, machine) err := viper.WriteConfig() if err != nil { @@ -109,7 +109,7 @@ func GenerateKeyPair() (privateKey []byte, publicKey []byte, err error) { // // the generated keys can be found in common.SSHConfigFolder func initKeyPairFile() { - basePath := SSHConfigFolder.GetStr() + basePath := com.SSHConfigFolder.GetStr() privateKeyPath := fmt.Sprintf("%s/%s", basePath, "id_rsa") publicKeyPath := fmt.Sprintf("%s/%s", basePath, "id_rsa.pub") @@ -120,7 +120,7 @@ func initKeyPairFile() { Str("private_key_file", privateKeyPath). Str("public_key_file", publicKeyPath) - if FileExists(privateKeyPath) && FileExists(publicKeyPath) { + if com.FileExists(privateKeyPath) && com.FileExists(publicKeyPath) { logF.Msg("found existing keys... skipping generation") return } @@ -143,7 +143,7 @@ func initKeyPairFile() { func LoadPrivateKey() ([]byte, error) { return os.ReadFile(fmt.Sprintf( "%s/%s", - SSHConfigFolder.GetStr(), + com.SSHConfigFolder.GetStr(), "id_rsa", )) } diff --git a/src/service/file_manager/file_manager_service.go b/src/service/file_manager/file_manager_service.go index 8866662..5c5f55c 100644 --- a/src/service/file_manager/file_manager_service.go +++ b/src/service/file_manager/file_manager_service.go @@ -3,7 +3,7 @@ package file_manager import ( "fmt" "github.com/google/uuid" - . "github.com/makeopensource/leviathan/common" + com "github.com/makeopensource/leviathan/common" "github.com/rs/zerolog/log" "io" "os" @@ -36,7 +36,7 @@ func (f *FileManagerService) CreateTmpLabFolder(dockerfile io.Reader, jobFiles . jobDataDir := filepath.Join(basePath, JobDataFolderName) err = os.MkdirAll(jobDataDir, os.ModePerm) if err != nil { - return "", ErrLog("unable to create job data folder", err, log.Error()) + return "", com.ErrLog("unable to create job data folder", err, log.Error()) } if err = f.SaveFile(basePath, DockerfileName, dockerfile); err != nil { @@ -71,14 +71,14 @@ func (f *FileManagerService) CreateSubmissionFolder(jobFiles ...*FileInfo) (stri func (f *FileManagerService) createBaseFolder() (string, string, error) { folderUUID, err := uuid.NewUUID() if err != nil { - return "", "", ErrLog("Unable to generate uuid", err, log.Error()) + return "", "", com.ErrLog("Unable to generate uuid", err, log.Error()) } stringUuid := folderUUID.String() - basePath := filepath.Join(TmpUploadFolder.GetStr(), stringUuid) + basePath := filepath.Join(com.TmpUploadFolder.GetStr(), stringUuid) - err = os.Mkdir(basePath, DefaultFilePerm) + err = os.Mkdir(basePath, com.DefaultFilePerm) if err != nil { - return "", "", ErrLog("Unable to create tmp folder", err, log.Error()) + return "", "", com.ErrLog("Unable to create tmp folder", err, log.Error()) } return folderUUID.String(), basePath, nil @@ -90,7 +90,7 @@ func (f *FileManagerService) SaveFile(basePath string, filename string, file io. dst, err := os.Create(fPath) if err != nil { - return ErrLog( + return com.ErrLog( "Failed to create destination file", err, log.Error(), @@ -106,7 +106,7 @@ func (f *FileManagerService) SaveFile(basePath string, filename string, file io. // Copy the file contents written, err := io.Copy(dst, file) if err != nil { - return ErrLog( + return com.ErrLog( "Failed to write file", err, log.Error(), @@ -122,14 +122,14 @@ func (f *FileManagerService) SaveFile(basePath string, filename string, file io. } func (f *FileManagerService) DeleteFolder(folderUuid string) { - basePath := filepath.Join(TmpUploadFolder.GetStr(), folderUuid) + basePath := filepath.Join(com.TmpUploadFolder.GetStr(), folderUuid) if err := os.RemoveAll(basePath); err != nil { log.Warn().Err(err).Msgf("failed to delete tmp folder %s", folderUuid) } } func (f *FileManagerService) GetLabFilePaths(folderUuid string) (basePath string, err error) { - basePath = filepath.Join(TmpUploadFolder.GetStr(), folderUuid) + basePath = filepath.Join(com.TmpUploadFolder.GetStr(), folderUuid) jobData := filepath.Join(basePath, JobDataFolderName) dockerFile := filepath.Join(basePath, DockerfileName) @@ -144,12 +144,12 @@ func (f *FileManagerService) GetLabFilePaths(folderUuid string) (basePath string } func (f *FileManagerService) GetSubmissionPath(uuid string) (string, error) { - path := filepath.Join(TmpUploadFolder.GetStr(), uuid) + path := filepath.Join(com.TmpUploadFolder.GetStr(), uuid) return f.checkFolder(path) } func (f *FileManagerService) checkFolder(path string) (jobData string, err error) { - if !FileExists(path) { + if !com.FileExists(path) { return "", fmt.Errorf("could not find path") } return path, err diff --git a/src/service/jobs/job_queue.go b/src/service/jobs/job_queue.go index 7c47088..f405f3d 100644 --- a/src/service/jobs/job_queue.go +++ b/src/service/jobs/job_queue.go @@ -6,7 +6,7 @@ import ( dk "github.com/docker/docker/api/types/container" "github.com/docker/docker/pkg/stdcopy" "github.com/makeopensource/leviathan/common" - . "github.com/makeopensource/leviathan/models" + md "github.com/makeopensource/leviathan/models" "github.com/makeopensource/leviathan/service/docker" "github.com/rs/zerolog/log" "gorm.io/gorm" @@ -16,24 +16,24 @@ import ( ) type JobQueue struct { - jobSemaphore *WorkerSemaphore + jobSemaphore *md.WorkerSemaphore db *gorm.DB dkSrv *docker.DkService - contextMap *Map[string, func()] + contextMap *md.Map[string, func()] } func NewJobQueue(totalJobs uint, db *gorm.DB, dk *docker.DkService) *JobQueue { queue := &JobQueue{ - contextMap: &Map[string, func()]{}, + contextMap: &md.Map[string, func()]{}, db: db, dkSrv: dk, - jobSemaphore: NewWorkerSemaphore(int(totalJobs)), + jobSemaphore: md.NewWorkerSemaphore(int(totalJobs)), } return queue } -func (q *JobQueue) AddJob(mes *Job) error { +func (q *JobQueue) AddJob(mes *md.Job) error { jog(mes.JobCtx).Info().Msg("sending job to queue") err := mes.ValidateForQueue() if err != nil { @@ -64,7 +64,7 @@ func (q *JobQueue) CancelJob(messageId string) { cancel() } -func (q *JobQueue) worker(msg *Job) { +func (q *JobQueue) worker(msg *md.Job) { if msg == nil { log.Error().Msg("job received was nil, THIS SHOULD NEVER HAPPEN") return @@ -85,56 +85,48 @@ func (q *JobQueue) worker(msg *Job) { } // runJob should ALWAYS BE BLOCKING, as it prevents the worker from moving on to a new job -func (q *JobQueue) runJob(job *Job) { - client, contId, err, reason := q.setupJob(job) +func (q *JobQueue) runJob(job *md.Job) { + client, contId, err := q.setupJob(job) defer q.cleanupJob(job, client) if err != nil { - q.bigProblem(job, reason, err) + q.bigProblem(job, err) return } - logStatusCh := make(chan struct { - message string - err error - }) + logStatusCh := make(chan md.JobError, 1) q.setJobInProgress(job) - err = client.StartContainer(contId) - if err != nil { - q.bigProblem(job, "unable to start job container", err) + err2 := client.StartContainer(contId) + if err2 != nil { + q.bigProblem(job, md.JError("unable to start job container", err2)) return } // start writing to log file so that // we can stream changes to the log file to the user go func() { - statusMessage, err2 := writeLogs(client, job) - logStatusCh <- struct { - message string - err error - }{message: statusMessage, err: err2} + logStatusCh <- writeLogs(client, job) }() statusCh, errCh := client.Client.ContainerWait(context.Background(), contId, dk.WaitConditionNotRunning) select { case <-statusCh: - mes := <-logStatusCh - if mes.err != nil { - q.bigProblem(job, mes.message, mes.err) + if mes := <-logStatusCh; mes != nil { + q.bigProblem(job, mes) return } - logLine, errMessage, err := verifyLogs(job) - if err != nil || errMessage != "" { - q.bigProblem(job, errMessage, err) + logLine, err := verifyLogs(job) + if err != nil { + q.bigProblem(job, err) return } q.greatSuccess(job, logLine) return case err := <-errCh: - q.bigProblem(job, "error occurred while waiting for job process", err) + q.bigProblem(job, md.JError("error occurred while waiting for job process", err)) return case <-time.After(job.LabData.JobTimeout): - q.bigProblem(job, fmt.Sprintf("Maximum timeout reached for job, job ran for %s", job.LabData.JobTimeout), nil) + q.bigProblem(job, md.JError(timeoutErrLog(job.LabData.JobTimeout), nil)) return case <-job.JobCtx.Done(): q.setJobAsCancelled(job) @@ -142,61 +134,65 @@ func (q *JobQueue) runJob(job *Job) { } } +func timeoutErrLog(timeout time.Duration) string { + return fmt.Sprintf("Maximum timeout reached for job, job ran for %s", timeout) +} + // setupJob Set up job like king, yes! // returns nil client if an error occurred while setup, // make sure to handle null ptr dereference -func (q *JobQueue) setupJob(msg *Job) (*docker.DkClient, string, error, string) { +func (q *JobQueue) setupJob(msg *md.Job) (*docker.DkClient, string, md.JobError) { q.setJobInSetup(msg) machine, err := q.dkSrv.ClientManager.GetClientById(msg.MachineId) if err != nil { - return nil, "", err, "Failed to get machine info" + return nil, "", md.JError("Failed to get machine info", err) } // incase dockerfile is not passed and referenced via tag name if msg.LabData.DockerFilePath != "" { err = machine.BuildImageFromDockerfile(msg.LabData.DockerFilePath, msg.LabData.ImageTag) if err != nil { - return nil, "", err, "Failed to create image" + return nil, "", md.JError("Failed to create image", err) } // folder structure is '//autolab/Dockerfile, get the folder path parent := filepath.Base(filepath.Dir(filepath.Dir(msg.LabData.DockerFilePath))) if parent != "labs" { // do not delete if job is from a saved lab if err = os.RemoveAll(parent); err != nil { - return nil, "", err, "failed to delete dockerfile" + return nil, "", md.JError("failed to delete dockerfile", err) } } } resources := dk.Resources{ - NanoCPUs: msg.LabData.JobLimits.NanoCPU * CPUQuota, - Memory: msg.LabData.JobLimits.Memory * MB, + NanoCPUs: msg.LabData.JobLimits.NanoCPU * md.CPUQuota, + Memory: msg.LabData.JobLimits.Memory * md.MB, PidsLimit: &msg.LabData.JobLimits.PidsLimit, } contId, err := machine.CreateNewContainer(msg.JobId, msg.LabData.ImageTag, filepath.Base(msg.TmpJobFolderPath), msg.LabData.JobEntryCmd, resources) if err != nil { - return nil, "", err, "unable to create job container" + return nil, "", md.JError("unable to create job container", err) } err = machine.CopyToContainer(contId, msg.TmpJobFolderPath) if err != nil { - return nil, "", err, "unable to copy files to job container" + return nil, "", md.JError("unable to copy files to job container", err) } msg.ContainerId = contId res := q.db.Save(msg) if res.Error != nil { - return nil, "", err, "unable to update job in db" + return nil, "", md.JError("unable to update job in db", err) } - return machine, contId, nil, "" + return machine, contId, nil } // cleanupJob clean up job, // updates job in DB, removes the container and associated tmp job data -func (q *JobQueue) cleanupJob(msg *Job, client *docker.DkClient) { +func (q *JobQueue) cleanupJob(msg *md.Job, client *docker.DkClient) { jog(msg.JobCtx).Info().Msg("cleaning up job") q.updateJobVeryNice(msg) @@ -221,62 +217,56 @@ func (q *JobQueue) cleanupJob(msg *Job, client *docker.DkClient) { // Very nice! // // jobResult is the last line expected to be valid json string, returned to the job caller -func (q *JobQueue) greatSuccess(job *Job, jobResult string) { +func (q *JobQueue) greatSuccess(job *md.Job, jobResult string) { jog(job.JobCtx).Info().Msg("job completed successfully") - job.Status = Complete + job.Status = md.Complete job.StatusMessage = jobResult } // bigProblem set job status to models.Failed // // job failed, Not good! -// -// The publicReason will be displayed to the end user, providing a user-friendly message. -// -// The err parameter holds the underlying error, used for debugging purposes. -func (q *JobQueue) bigProblem(job *Job, publicReason string, err error) { - jog(job.JobCtx).Error().Err(err).Str("reason", publicReason).Msg("job failed") - job.Status = Failed - job.StatusMessage = publicReason - if err != nil { - job.Error = err.Error() - } +func (q *JobQueue) bigProblem(job *md.Job, jErr md.JobError) { + jog(job.JobCtx).Error().Err(jErr.Err()).Str("reason", jErr.Reason()).Msg("job failed") + job.Status = md.Failed + job.StatusMessage = jErr.Reason() + job.Error = jErr.ErrStr() } -func (q *JobQueue) setJobAsCancelled(job *Job) { +func (q *JobQueue) setJobAsCancelled(job *md.Job) { jog(job.JobCtx).Info().Msg("job was cancelled") - job.Status = Canceled + job.Status = md.Canceled job.StatusMessage = "Job was cancelled" } // setJobInProgress set job status as models.Running // // Job is in progress, success soon! -func (q *JobQueue) setJobInProgress(msg *Job) { - msg.Status = Running +func (q *JobQueue) setJobInProgress(msg *md.Job) { + msg.Status = md.Running q.updateJobVeryNice(msg) } // setJobInSetup set job status as models.Preparing // // job is being setup standby -func (q *JobQueue) setJobInSetup(msg *Job) { - msg.Status = Preparing +func (q *JobQueue) setJobInSetup(msg *md.Job) { + msg.Status = md.Preparing q.updateJobVeryNice(msg) } // updateJobVeryNice Database updated, fresh like new wife! -func (q *JobQueue) updateJobVeryNice(msg *Job) { +func (q *JobQueue) updateJobVeryNice(msg *md.Job) { res := q.db.Save(msg) if res.Error != nil { jog(msg.JobCtx).Error().Err(res.Error).Msg("error occurred while saving job to db") } } -func writeLogs(client *docker.DkClient, msg *Job) (string, error) { +func writeLogs(client *docker.DkClient, msg *md.Job) md.JobError { outputFile, err := os.OpenFile(msg.OutputLogFilePath, os.O_RDWR|os.O_CREATE, 0660) if err != nil { - return "unable to open log file", err + return md.JError("unable to open log file", err) } defer func() { @@ -288,24 +278,24 @@ func writeLogs(client *docker.DkClient, msg *Job) (string, error) { logs, err := client.TailContainerLogs(context.Background(), msg.ContainerId) if err != nil { - return "unable to tail job container", err + return md.JError("unable to tail job container", err) } _, err = stdcopy.StdCopy(outputFile, outputFile, logs) if err != nil { - return "unable to write to log file", err + return md.JError("unable to write to log file", err) } - return "", nil + return nil } -func verifyLogs(msg *Job) (string, string, error) { - if msg.Status == Failed { - return "", "Job failed, skipping parsing log file", nil +func verifyLogs(msg *md.Job) (string, md.JobError) { + if msg.Status == md.Failed { + return "", md.JError("Job failed, skipping parsing log file", nil) } outputFile, err := os.Open(msg.OutputLogFilePath) if err != nil { - return "", "unable to open log file", err + return "", md.JError("unable to open log file", err) } defer func(open *os.File) { err := open.Close() @@ -316,11 +306,11 @@ func verifyLogs(msg *Job) (string, string, error) { line, err := GetLastLine(outputFile) if err != nil { - return "", "unable to get logs", err + return "", md.JError("unable to get logs", err) } if !IsValidJSON(line) { - return "", "unable to parse log output", nil + return "", md.JError("unable to parse log output", err) } - return line, "", nil + return line, nil } diff --git a/src/service/jobs/job_service.go b/src/service/jobs/job_service.go index 2bdf9e7..4616a83 100644 --- a/src/service/jobs/job_service.go +++ b/src/service/jobs/job_service.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/google/uuid" - . "github.com/makeopensource/leviathan/common" + com "github.com/makeopensource/leviathan/common" "github.com/makeopensource/leviathan/models" "github.com/makeopensource/leviathan/service/docker" "github.com/makeopensource/leviathan/service/file_manager" @@ -37,7 +37,7 @@ func NewJobService( db: db, broadcastCh: bc, dockerSrv: dockerService, - queue: NewJobQueue(uint(ConcurrentJobs.GetUint64()), db, dockerService), + queue: NewJobQueue(uint(com.ConcurrentJobs.GetUint64()), db, dockerService), labSrv: labService, fileManSrv: tmpFileService, } @@ -56,7 +56,7 @@ func (job *JobService) NewJob(newJob *models.Job, submissionFolderId string) (st jobId, err := uuid.NewUUID() if err != nil { - return "", ErrLog("failed to generate job ID", err, log.Error()) + return "", com.ErrLog("failed to generate job ID", err, log.Error()) } newJob.JobId = jobId.String() @@ -65,9 +65,9 @@ func (job *JobService) NewJob(newJob *models.Job, submissionFolderId string) (st // job context, so that it can be cancelled, and store sub logger ctx := job.queue.NewJobContext(newJob.JobId) - jobDir, err := CreateTmpJobDir(newJob.JobId, SubmissionFolder.GetStr()) + jobDir, err := CreateTmpJobDir(newJob.JobId, com.SubmissionFolder.GetStr()) if err != nil { - return "", ErrLog("failed to create job dir", err, jog(ctx).Error()) + return "", com.ErrLog("failed to create job dir", err, jog(ctx).Error()) } submissionFolder, err := job.fileManSrv.GetSubmissionPath(submissionFolderId) @@ -76,16 +76,20 @@ func (job *JobService) NewJob(newJob *models.Job, submissionFolderId string) (st } defer job.fileManSrv.DeleteFolder(submissionFolderId) - if err = HardLinkFolder(submissionFolder, jobDir); err != nil { - return "", ErrLog("unable to copy files to job dir", err, log.Error()) + if err = com.HardLinkFolder(submissionFolder, jobDir); err != nil { + return "", com.ErrLog("unable to copy files to job dir", err, log.Error()) } - if err = HardLinkFolder(newJob.LabData.JobFilesDirPath, jobDir); err != nil { - return "", ErrLog("unable to copy files to job dir", err, log.Error()) + if err = com.HardLinkFolder(newJob.LabData.JobFilesDirPath, jobDir); err != nil { + return "", com.ErrLog("unable to copy files to job dir", err, log.Error()) } - logPath, err, reason := setupLogFile(newJob.JobId) - if err != nil { - return "", ErrLog("failed to setup log file: "+reason, err, jog(ctx).Error().Str("reason", reason)) + logPath, err2 := setupLogFile(newJob.JobId) + if err2 != nil { + return "", com.ErrLog( + "failed to setup log file: "+err2.Reason(), + err, + jog(ctx).Error().Str("reason", err2.Reason()), + ) } // setup job metadata @@ -99,7 +103,7 @@ func (job *JobService) NewJob(newJob *models.Job, submissionFolderId string) (st res := job.db.Create(newJob) if res.Error != nil { - return "", ErrLog("failed to save job to db", res.Error, jog(ctx).Error()) + return "", com.ErrLog("failed to save job to db", res.Error, jog(ctx).Error()) } err = job.queue.AddJob(newJob) @@ -220,13 +224,13 @@ func (job *JobService) ListenToJobLogs(ctx context.Context, jobInfo *models.Job) content := ReadLogFile(jobInfo.OutputLogFilePath) contLen := len(content) if contLen > prevLength { // send if content changed - log.Debug().Str(JobLogKey, jobInfo.JobId).Msgf("sending log, length changed from %d to %d", prevLength, contLen) + log.Debug().Str(com.JobLogKey, jobInfo.JobId).Msgf("sending log, length changed from %d to %d", prevLength, contLen) prevLength = contLen logChannel <- content } case <-ctx.Done(): close(logChannel) - log.Debug().Str(JobLogKey, jobInfo.JobId).Msg("stopping listening for logs") + log.Debug().Str(com.JobLogKey, jobInfo.JobId).Msg("stopping listening for logs") return } } @@ -250,7 +254,7 @@ func (job *JobService) checkJob(jobUuid string) (*models.Job, bool, string, erro } if jobInf.Status.Done() { - log.Debug().Str(JobLogKey, jobUuid).Msg("job is already done") + log.Debug().Str(com.JobLogKey, jobUuid).Msg("job is already done") content := ReadLogFile(jobInf.OutputLogFilePath) return jobInf, true, content, nil @@ -328,11 +332,11 @@ func (job *JobService) cleanupOrphanJobs() { } // setupLogFile store grader output -func setupLogFile(jobId string) (string, error, string) { - outputFile := fmt.Sprintf("%s/%s.txt", OutputFolder.GetStr(), jobId) +func setupLogFile(jobId string) (string, models.JobError) { + outputFile := fmt.Sprintf("%s/%s.txt", com.OutputFolder.GetStr(), jobId) outFile, err := os.Create(outputFile) if err != nil { - return "", err, fmt.Sprintf("error while creating log file at %s", outputFile) + return "", models.JError(fmt.Sprintf("error while creating log file at %s", outputFile), err) } defer func() { err := outFile.Close() @@ -343,10 +347,10 @@ func setupLogFile(jobId string) (string, error, string) { full, err := filepath.Abs(outputFile) if err != nil { - return "", err, "error while getting absolute path" + return "", models.JError("error while getting absolute path", err) } - return full, nil, "" + return full, nil } func (job *JobService) getLab(labId uint) (*models.Lab, error) { diff --git a/src/service/jobs/job_service_tango_test.go b/src/service/jobs/job_service_tango_test.go index fc59007..0ebd7d9 100644 --- a/src/service/jobs/job_service_tango_test.go +++ b/src/service/jobs/job_service_tango_test.go @@ -14,10 +14,10 @@ var ( tangoDockerFile = basePath + "/tango-Dockerfile" autolab0 = basePath + "/tango0" autolab1 = basePath + "/tango1" - autolab2 = basePath + "/tango2" - autolab3 = basePath + "/tango3" - autolab4 = basePath + "/tango4" - tangoTimeout = 10 * time.Second + //autolab2 = basePath + "/tango2" + autolab3 = basePath + "/tango3" + autolab4 = basePath + "/tango4" + tangoTimeout = 10 * time.Second ) type testMap = map[string]testCase diff --git a/src/service/labs/lab_service.go b/src/service/labs/lab_service.go index cf33da1..90ddb45 100644 --- a/src/service/labs/lab_service.go +++ b/src/service/labs/lab_service.go @@ -2,10 +2,10 @@ package labs import ( "fmt" - . "github.com/makeopensource/leviathan/common" + com "github.com/makeopensource/leviathan/common" "github.com/makeopensource/leviathan/models" "github.com/makeopensource/leviathan/service/docker" - . "github.com/makeopensource/leviathan/service/file_manager" + fm "github.com/makeopensource/leviathan/service/file_manager" "github.com/rs/zerolog/log" "gorm.io/gorm" "os" @@ -16,10 +16,10 @@ import ( type LabService struct { db *gorm.DB dk *docker.DkService - fileMan *FileManagerService + fileMan *fm.FileManagerService } -func NewLabService(db *gorm.DB, dk *docker.DkService, service *FileManagerService) *LabService { +func NewLabService(db *gorm.DB, dk *docker.DkService, service *fm.FileManagerService) *LabService { return &LabService{ db: db, dk: dk, @@ -35,21 +35,21 @@ func (service *LabService) CreateLab(lab *models.Lab, jobDirId string) (uint, er defer service.fileMan.DeleteFolder(jobDirId) jobFolderName := fmt.Sprintf("%s_%s", lab.Name, jobDirId) - jobDataDirPath := fmt.Sprintf("%s/%s", LabsFolder.GetStr(), jobFolderName) - if err = os.MkdirAll(jobDataDirPath, DefaultFilePerm); err != nil { - return 0, ErrLog( + jobDataDirPath := fmt.Sprintf("%s/%s", com.LabsFolder.GetStr(), jobFolderName) + if err = os.MkdirAll(jobDataDirPath, com.DefaultFilePerm); err != nil { + return 0, com.ErrLog( "unable to create directories for lab: "+lab.Name, err, log.Error(), ) } - if err = HardLinkFolder(tmpDir, jobDataDirPath); err != nil { - return 0, ErrLog("unable to copy files to job dir", err, log.Error()) + if err = com.HardLinkFolder(tmpDir, jobDataDirPath); err != nil { + return 0, com.ErrLog("unable to copy files to job dir", err, log.Error()) } - lab.DockerFilePath = filepath.Join(jobDataDirPath, DockerfileName) - lab.JobFilesDirPath = filepath.Join(jobDataDirPath, JobDataFolderName) + lab.DockerFilePath = filepath.Join(jobDataDirPath, fm.DockerfileName) + lab.JobFilesDirPath = filepath.Join(jobDataDirPath, fm.JobDataFolderName) lab.ImageTag = fmt.Sprintf("%s:v1", lab.Name) lab.ImageTag = strings.ToLower(strings.Trim(strings.TrimSpace(lab.ImageTag), " ")) @@ -111,7 +111,7 @@ func (service *LabService) DeleteLab(id uint) error { func (service *LabService) deleteLabFiles(labData *models.Lab) error { err := os.RemoveAll(filepath.Base(labData.DockerFilePath)) if err != nil { - return ErrLog( + return com.ErrLog( "unable to delete directories for lab: "+labData.Name, err, log.Error(),