diff --git a/spec/leviathan_node/src/generated/jobs/v1/jobs_pb.ts b/spec/leviathan_node/src/generated/jobs/v1/jobs_pb.ts index 6f9f17a..e8455d4 100644 --- a/spec/leviathan_node/src/generated/jobs/v1/jobs_pb.ts +++ b/spec/leviathan_node/src/generated/jobs/v1/jobs_pb.ts @@ -12,7 +12,7 @@ import type { Message } from "@bufbuild/protobuf"; * Describes the file jobs/v1/jobs.proto. */ export const file_jobs_v1_jobs: GenFile = /*@__PURE__*/ - fileDesc("ChJqb2JzL3YxL2pvYnMucHJvdG8SB2pvYnMudjEizAEKDU5ld0pvYlJlcXVlc3QSJgoIam9iRmlsZXMYASADKAsyFC50eXBlcy52MS5GaWxlVXBsb2FkEigKCmRvY2tlckZpbGUYAiABKAsyFC50eXBlcy52MS5GaWxlVXBsb2FkEhEKCWltYWdlTmFtZRgDIAEoCRIbChNqb2JUaW1lb3V0SW5TZWNvbmRzGAQgASgEEhAKCGVudHJ5Q21kGAUgASgJEicKBmxpbWl0cxgGIAEoCzIXLnR5cGVzLnYxLk1hY2hpbmVMaW1pdHMiHwoOTmV3Sm9iUmVzcG9uc2USDQoFam9iSWQYASABKAkiIQoQQ2FuY2VsSm9iUmVxdWVzdBINCgVqb2JJZBgBIAEoCSITChFDYW5jZWxKb2JSZXNwb25zZSIeCg1Kb2JMb2dSZXF1ZXN0Eg0KBWpvYklkGAEgASgJIkQKD0pvYkxvZ3NSZXNwb25zZRIjCgdqb2JJbmZvGAEgASgLMhIuam9icy52MS5Kb2JTdGF0dXMSDAoEbG9ncxgCIAEoCSJDCglKb2JTdGF0dXMSDgoGam9iX2lkGAEgASgJEg4KBnN0YXR1cxgCIAEoCRIWCg5zdGF0dXNfbWVzc2FnZRgDIAEoCTLVAQoKSm9iU2VydmljZRI7CgZOZXdKb2ISFi5qb2JzLnYxLk5ld0pvYlJlcXVlc3QaFy5qb2JzLnYxLk5ld0pvYlJlc3BvbnNlIgASRAoMU3RyZWFtU3RhdHVzEhYuam9icy52MS5Kb2JMb2dSZXF1ZXN0Ghguam9icy52MS5Kb2JMb2dzUmVzcG9uc2UiADABEkQKCUNhbmNlbEpvYhIZLmpvYnMudjEuQ2FuY2VsSm9iUmVxdWVzdBoaLmpvYnMudjEuQ2FuY2VsSm9iUmVzcG9uc2UiAEKMAQoLY29tLmpvYnMudjFCCUpvYnNQcm90b1ABWjVnaXRodWIuY29tL21ha2VvcGVuc291cmNlL2xldmlhdGhhbi9nZW5lcmF0ZWQvam9icy92MaICA0pYWKoCB0pvYnMuVjHKAgdKb2JzXFYx4gITSm9ic1xWMVxHUEJNZXRhZGF0YeoCCEpvYnM6OlYxYgZwcm90bzM", [file_types_v1_types]); + fileDesc("ChJqb2JzL3YxL2pvYnMucHJvdG8SB2pvYnMudjEizAEKDU5ld0pvYlJlcXVlc3QSJgoIam9iRmlsZXMYASADKAsyFC50eXBlcy52MS5GaWxlVXBsb2FkEigKCmRvY2tlckZpbGUYAiABKAsyFC50eXBlcy52MS5GaWxlVXBsb2FkEhEKCWltYWdlTmFtZRgDIAEoCRIbChNqb2JUaW1lb3V0SW5TZWNvbmRzGAQgASgEEhAKCGVudHJ5Q21kGAUgASgJEicKBmxpbWl0cxgGIAEoCzIXLnR5cGVzLnYxLk1hY2hpbmVMaW1pdHMiHwoOTmV3Sm9iUmVzcG9uc2USDQoFam9iSWQYASABKAkiIQoQQ2FuY2VsSm9iUmVxdWVzdBINCgVqb2JJZBgBIAEoCSITChFDYW5jZWxKb2JSZXNwb25zZSIeCg1Kb2JMb2dSZXF1ZXN0Eg0KBWpvYklkGAEgASgJIkQKD0pvYkxvZ3NSZXNwb25zZRIjCgdqb2JJbmZvGAEgASgLMhIuam9icy52MS5Kb2JTdGF0dXMSDAoEbG9ncxgCIAEoCSJDCglKb2JTdGF0dXMSDgoGam9iX2lkGAEgASgJEg4KBnN0YXR1cxgCIAEoCRIWCg5zdGF0dXNfbWVzc2FnZRgDIAEoCTKWAgoKSm9iU2VydmljZRI7CgZOZXdKb2ISFi5qb2JzLnYxLk5ld0pvYlJlcXVlc3QaFy5qb2JzLnYxLk5ld0pvYlJlc3BvbnNlIgASPwoJR2V0U3RhdHVzEhYuam9icy52MS5Kb2JMb2dSZXF1ZXN0Ghguam9icy52MS5Kb2JMb2dzUmVzcG9uc2UiABJECgxTdHJlYW1TdGF0dXMSFi5qb2JzLnYxLkpvYkxvZ1JlcXVlc3QaGC5qb2JzLnYxLkpvYkxvZ3NSZXNwb25zZSIAMAESRAoJQ2FuY2VsSm9iEhkuam9icy52MS5DYW5jZWxKb2JSZXF1ZXN0Ghouam9icy52MS5DYW5jZWxKb2JSZXNwb25zZSIAQowBCgtjb20uam9icy52MUIJSm9ic1Byb3RvUAFaNWdpdGh1Yi5jb20vbWFrZW9wZW5zb3VyY2UvbGV2aWF0aGFuL2dlbmVyYXRlZC9qb2JzL3YxogIDSlhYqgIHSm9icy5WMcoCB0pvYnNcVjHiAhNKb2JzXFYxXEdQQk1ldGFkYXRh6gIISm9iczo6VjFiBnByb3RvMw", [file_types_v1_types]); /** * @generated from message jobs.v1.NewJobRequest @@ -182,6 +182,18 @@ export const JobService: GenService<{ output: typeof NewJobResponseSchema; }, /** + * Gets job status at call time, whatever it may be + * + * @generated from rpc jobs.v1.JobService.GetStatus + */ + getStatus: { + methodKind: "unary"; + input: typeof JobLogRequestSchema; + output: typeof JobLogsResponseSchema; + }, + /** + * Streams job status until it is complete + * * @generated from rpc jobs.v1.JobService.StreamStatus */ streamStatus: { diff --git a/spec/proto/jobs/v1/jobs.proto b/spec/proto/jobs/v1/jobs.proto index 32388fd..6327021 100644 --- a/spec/proto/jobs/v1/jobs.proto +++ b/spec/proto/jobs/v1/jobs.proto @@ -8,6 +8,9 @@ import "types/v1/types.proto"; service JobService { rpc NewJob(NewJobRequest) returns (NewJobResponse) {} + // Gets job status at call time, whatever it may be + rpc GetStatus (JobLogRequest) returns (JobLogsResponse) {} + // Streams job status until it is complete rpc StreamStatus(JobLogRequest) returns (stream JobLogsResponse) {} rpc CancelJob(CancelJobRequest) returns (CancelJobResponse) {} } diff --git a/src/api/v1/job_impl.go b/src/api/v1/job_impl.go index c3ca855..5fd1669 100644 --- a/src/api/v1/job_impl.go +++ b/src/api/v1/job_impl.go @@ -63,15 +63,24 @@ func (job *JobServer) NewJob(ctx context.Context, req *connect.Request[v1.NewJob return res, nil } +func (job *JobServer) GetStatus(_ context.Context, req *connect.Request[v1.JobLogRequest]) (*connect.Response[v1.JobLogsResponse], error) { + status, logs, err := job.srv.GetJobStatusAndLogs(req.Msg.GetJobId()) + if err != nil { + return nil, err + } + + res := connect.NewResponse(&v1.JobLogsResponse{ + JobInfo: status.ToProto(), + Logs: logs, + }) + return res, nil +} + func (job *JobServer) StreamStatus(ctx context.Context, req *connect.Request[v1.JobLogRequest], stream *connect.ServerStream[v1.JobLogsResponse]) error { streamFunc := func(jobInfo *models.Job, logs string) error { return stream.Send(&v1.JobLogsResponse{ - JobInfo: &v1.JobStatus{ - JobId: jobInfo.JobId, - Status: string(jobInfo.Status), - StatusMessage: jobInfo.StatusMessage, - }, - Logs: logs, + JobInfo: jobInfo.ToProto(), + Logs: logs, }) } diff --git a/src/generated/jobs/v1/jobs.pb.go b/src/generated/jobs/v1/jobs.pb.go index 36255b8..0a7d98d 100644 --- a/src/generated/jobs/v1/jobs.pb.go +++ b/src/generated/jobs/v1/jobs.pb.go @@ -413,10 +413,11 @@ const file_jobs_v1_jobs_proto_rawDesc = "" + "\tJobStatus\x12\x15\n" + "\x06job_id\x18\x01 \x01(\tR\x05jobId\x12\x16\n" + "\x06status\x18\x02 \x01(\tR\x06status\x12%\n" + - "\x0estatus_message\x18\x03 \x01(\tR\rstatusMessage2\xd5\x01\n" + + "\x0estatus_message\x18\x03 \x01(\tR\rstatusMessage2\x96\x02\n" + "\n" + "JobService\x12;\n" + - "\x06NewJob\x12\x16.jobs.v1.NewJobRequest\x1a\x17.jobs.v1.NewJobResponse\"\x00\x12D\n" + + "\x06NewJob\x12\x16.jobs.v1.NewJobRequest\x1a\x17.jobs.v1.NewJobResponse\"\x00\x12?\n" + + "\tGetStatus\x12\x16.jobs.v1.JobLogRequest\x1a\x18.jobs.v1.JobLogsResponse\"\x00\x12D\n" + "\fStreamStatus\x12\x16.jobs.v1.JobLogRequest\x1a\x18.jobs.v1.JobLogsResponse\"\x000\x01\x12D\n" + "\tCancelJob\x12\x19.jobs.v1.CancelJobRequest\x1a\x1a.jobs.v1.CancelJobResponse\"\x00B\x8c\x01\n" + "\vcom.jobs.v1B\tJobsProtoP\x01Z5github.com/makeopensource/leviathan/generated/jobs/v1\xa2\x02\x03JXX\xaa\x02\aJobs.V1\xca\x02\aJobs\\V1\xe2\x02\x13Jobs\\V1\\GPBMetadata\xea\x02\bJobs::V1b\x06proto3" @@ -450,14 +451,16 @@ var file_jobs_v1_jobs_proto_depIdxs = []int32{ 7, // 1: jobs.v1.NewJobRequest.dockerFile:type_name -> types.v1.FileUpload 8, // 2: jobs.v1.NewJobRequest.limits:type_name -> types.v1.MachineLimits 6, // 3: jobs.v1.JobLogsResponse.jobInfo:type_name -> jobs.v1.JobStatus - 0, // 4: jobs.v1.JobService.NewJobFromRPC:input_type -> jobs.v1.NewJobRequest - 4, // 5: jobs.v1.JobService.StreamStatus:input_type -> jobs.v1.JobLogRequest - 2, // 6: jobs.v1.JobService.CancelJob:input_type -> jobs.v1.CancelJobRequest - 1, // 7: jobs.v1.JobService.NewJobFromRPC:output_type -> jobs.v1.NewJobResponse - 5, // 8: jobs.v1.JobService.StreamStatus:output_type -> jobs.v1.JobLogsResponse - 3, // 9: jobs.v1.JobService.CancelJob:output_type -> jobs.v1.CancelJobResponse - 7, // [7:10] is the sub-list for method output_type - 4, // [4:7] is the sub-list for method input_type + 0, // 4: jobs.v1.JobService.NewJob:input_type -> jobs.v1.NewJobRequest + 4, // 5: jobs.v1.JobService.GetStatus:input_type -> jobs.v1.JobLogRequest + 4, // 6: jobs.v1.JobService.StreamStatus:input_type -> jobs.v1.JobLogRequest + 2, // 7: jobs.v1.JobService.CancelJob:input_type -> jobs.v1.CancelJobRequest + 1, // 8: jobs.v1.JobService.NewJob:output_type -> jobs.v1.NewJobResponse + 5, // 9: jobs.v1.JobService.GetStatus:output_type -> jobs.v1.JobLogsResponse + 5, // 10: jobs.v1.JobService.StreamStatus:output_type -> jobs.v1.JobLogsResponse + 3, // 11: jobs.v1.JobService.CancelJob:output_type -> jobs.v1.CancelJobResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name 4, // [4:4] is the sub-list for extension extendee 0, // [0:4] is the sub-list for field type_name diff --git a/src/generated/jobs/v1/v1connect/jobs.connect.go b/src/generated/jobs/v1/v1connect/jobs.connect.go index e96af44..b01dc4c 100644 --- a/src/generated/jobs/v1/v1connect/jobs.connect.go +++ b/src/generated/jobs/v1/v1connect/jobs.connect.go @@ -33,8 +33,10 @@ const ( // reflection-formatted method names, remove the leading slash and convert the remaining slash to a // period. const ( - // JobServiceNewJobProcedure is the fully-qualified name of the JobService's NewJobFromRPC RPC. - JobServiceNewJobProcedure = "/jobs.v1.JobService/NewJobFromRPC" + // JobServiceNewJobProcedure is the fully-qualified name of the JobService's NewJob RPC. + JobServiceNewJobProcedure = "/jobs.v1.JobService/NewJob" + // JobServiceGetStatusProcedure is the fully-qualified name of the JobService's GetStatus RPC. + JobServiceGetStatusProcedure = "/jobs.v1.JobService/GetStatus" // JobServiceStreamStatusProcedure is the fully-qualified name of the JobService's StreamStatus RPC. JobServiceStreamStatusProcedure = "/jobs.v1.JobService/StreamStatus" // JobServiceCancelJobProcedure is the fully-qualified name of the JobService's CancelJob RPC. @@ -44,6 +46,9 @@ const ( // JobServiceClient is a client for the jobs.v1.JobService service. type JobServiceClient interface { NewJob(context.Context, *connect.Request[v1.NewJobRequest]) (*connect.Response[v1.NewJobResponse], error) + // Gets job status at call time, whatever it may be + GetStatus(context.Context, *connect.Request[v1.JobLogRequest]) (*connect.Response[v1.JobLogsResponse], error) + // Streams job status until it is complete StreamStatus(context.Context, *connect.Request[v1.JobLogRequest]) (*connect.ServerStreamForClient[v1.JobLogsResponse], error) CancelJob(context.Context, *connect.Request[v1.CancelJobRequest]) (*connect.Response[v1.CancelJobResponse], error) } @@ -62,7 +67,13 @@ func NewJobServiceClient(httpClient connect.HTTPClient, baseURL string, opts ... newJob: connect.NewClient[v1.NewJobRequest, v1.NewJobResponse]( httpClient, baseURL+JobServiceNewJobProcedure, - connect.WithSchema(jobServiceMethods.ByName("NewJobFromRPC")), + connect.WithSchema(jobServiceMethods.ByName("NewJob")), + connect.WithClientOptions(opts...), + ), + getStatus: connect.NewClient[v1.JobLogRequest, v1.JobLogsResponse]( + httpClient, + baseURL+JobServiceGetStatusProcedure, + connect.WithSchema(jobServiceMethods.ByName("GetStatus")), connect.WithClientOptions(opts...), ), streamStatus: connect.NewClient[v1.JobLogRequest, v1.JobLogsResponse]( @@ -83,15 +94,21 @@ func NewJobServiceClient(httpClient connect.HTTPClient, baseURL string, opts ... // jobServiceClient implements JobServiceClient. type jobServiceClient struct { newJob *connect.Client[v1.NewJobRequest, v1.NewJobResponse] + getStatus *connect.Client[v1.JobLogRequest, v1.JobLogsResponse] streamStatus *connect.Client[v1.JobLogRequest, v1.JobLogsResponse] cancelJob *connect.Client[v1.CancelJobRequest, v1.CancelJobResponse] } -// NewJob calls jobs.v1.JobService.NewJobFromRPC. +// NewJob calls jobs.v1.JobService.NewJob. func (c *jobServiceClient) NewJob(ctx context.Context, req *connect.Request[v1.NewJobRequest]) (*connect.Response[v1.NewJobResponse], error) { return c.newJob.CallUnary(ctx, req) } +// GetStatus calls jobs.v1.JobService.GetStatus. +func (c *jobServiceClient) GetStatus(ctx context.Context, req *connect.Request[v1.JobLogRequest]) (*connect.Response[v1.JobLogsResponse], error) { + return c.getStatus.CallUnary(ctx, req) +} + // StreamStatus calls jobs.v1.JobService.StreamStatus. func (c *jobServiceClient) StreamStatus(ctx context.Context, req *connect.Request[v1.JobLogRequest]) (*connect.ServerStreamForClient[v1.JobLogsResponse], error) { return c.streamStatus.CallServerStream(ctx, req) @@ -105,6 +122,9 @@ func (c *jobServiceClient) CancelJob(ctx context.Context, req *connect.Request[v // JobServiceHandler is an implementation of the jobs.v1.JobService service. type JobServiceHandler interface { NewJob(context.Context, *connect.Request[v1.NewJobRequest]) (*connect.Response[v1.NewJobResponse], error) + // Gets job status at call time, whatever it may be + GetStatus(context.Context, *connect.Request[v1.JobLogRequest]) (*connect.Response[v1.JobLogsResponse], error) + // Streams job status until it is complete StreamStatus(context.Context, *connect.Request[v1.JobLogRequest], *connect.ServerStream[v1.JobLogsResponse]) error CancelJob(context.Context, *connect.Request[v1.CancelJobRequest]) (*connect.Response[v1.CancelJobResponse], error) } @@ -119,7 +139,13 @@ func NewJobServiceHandler(svc JobServiceHandler, opts ...connect.HandlerOption) jobServiceNewJobHandler := connect.NewUnaryHandler( JobServiceNewJobProcedure, svc.NewJob, - connect.WithSchema(jobServiceMethods.ByName("NewJobFromRPC")), + connect.WithSchema(jobServiceMethods.ByName("NewJob")), + connect.WithHandlerOptions(opts...), + ) + jobServiceGetStatusHandler := connect.NewUnaryHandler( + JobServiceGetStatusProcedure, + svc.GetStatus, + connect.WithSchema(jobServiceMethods.ByName("GetStatus")), connect.WithHandlerOptions(opts...), ) jobServiceStreamStatusHandler := connect.NewServerStreamHandler( @@ -138,6 +164,8 @@ func NewJobServiceHandler(svc JobServiceHandler, opts ...connect.HandlerOption) switch r.URL.Path { case JobServiceNewJobProcedure: jobServiceNewJobHandler.ServeHTTP(w, r) + case JobServiceGetStatusProcedure: + jobServiceGetStatusHandler.ServeHTTP(w, r) case JobServiceStreamStatusProcedure: jobServiceStreamStatusHandler.ServeHTTP(w, r) case JobServiceCancelJobProcedure: @@ -152,7 +180,11 @@ func NewJobServiceHandler(svc JobServiceHandler, opts ...connect.HandlerOption) type UnimplementedJobServiceHandler struct{} func (UnimplementedJobServiceHandler) NewJob(context.Context, *connect.Request[v1.NewJobRequest]) (*connect.Response[v1.NewJobResponse], error) { - return nil, connect.NewError(connect.CodeUnimplemented, errors.New("jobs.v1.JobService.NewJobFromRPC is not implemented")) + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("jobs.v1.JobService.NewJob is not implemented")) +} + +func (UnimplementedJobServiceHandler) GetStatus(context.Context, *connect.Request[v1.JobLogRequest]) (*connect.Response[v1.JobLogsResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("jobs.v1.JobService.GetStatus is not implemented")) } func (UnimplementedJobServiceHandler) StreamStatus(context.Context, *connect.Request[v1.JobLogRequest], *connect.ServerStream[v1.JobLogsResponse]) error { diff --git a/src/models/job.go b/src/models/job.go index fecfd6c..e6c8406 100644 --- a/src/models/job.go +++ b/src/models/job.go @@ -3,6 +3,7 @@ package models import ( "context" "fmt" + v1 "github.com/makeopensource/leviathan/generated/jobs/v1" "github.com/rs/zerolog/log" "gorm.io/gorm" ) @@ -63,6 +64,14 @@ type Job struct { JobCtx context.Context `gorm:"-"` } +func (j *Job) ToProto() *v1.JobStatus { + return &v1.JobStatus{ + JobId: j.JobId, + Status: string(j.Status), + StatusMessage: j.StatusMessage, + } +} + // ValidateForQueue checks for fields required before sending job to queue func (j *Job) ValidateForQueue() error { if j == nil { diff --git a/src/service/jobs/job_service.go b/src/service/jobs/job_service.go index 5697ae0..a260a2d 100644 --- a/src/service/jobs/job_service.go +++ b/src/service/jobs/job_service.go @@ -249,6 +249,15 @@ func (job *JobService) StreamJobAndLogs( } } +// GetJobStatusAndLogs gets the status once whatever it may be and current logs +func (job *JobService) GetJobStatusAndLogs(jobUuid string) (*models.Job, string, error) { + jobInfo, _, logs, err := job.checkJob(jobUuid) + if err != nil { + return nil, "", err + } + return jobInfo, logs, nil +} + func (job *JobService) ListenToJobLogs(ctx context.Context, jobInfo *models.Job) chan string { logChannel := make(chan string, 2) go func(ctx context.Context) {