refactor(dataset): Redirect multipart upload through File Service #4136
Conversation
```sql
    FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS dataset_upload_session
```
Could you also add a separate DDL update file for the new table? It would make it easier to apply the schema change.
Do you have an example in another PR I can follow? Thanks!
You can take a look at the files under `sql/updates`.
Thanks, got it.
Done. Thanks
aicam left a comment:
LGTM
```scala
    DeleteObjectRequest.builder().bucket(bucketName).key(objectKey).build()
  )
}

def uploadPart(
```
Please add a comment to this function and fix its formatting.
Done
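For reference, a commented and formatted helper along these lines might look like the sketch below. This is only a sketch assuming the AWS SDK for Java v2 sync `S3Client`; the wrapper class name and parameter list are illustrative, not taken from this PR.

```scala
import java.io.InputStream
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.UploadPartRequest

// Hypothetical wrapper class, not the exact code in this PR.
class S3MultipartHelper(s3Client: S3Client) {

  /**
    * Uploads a single part of a multipart upload and returns its ETag,
    * which the caller must keep (or persist) for the final complete call.
    */
  def uploadPart(
      bucketName: String,
      objectKey: String,
      uploadId: String,
      partNumber: Int,
      inputStream: InputStream,
      contentLength: Long
  ): String = {
    val request = UploadPartRequest
      .builder()
      .bucket(bucketName)
      .key(objectKey)
      .uploadId(uploadId)
      .partNumber(partNumber)
      .build()

    // Stream the part body instead of buffering it fully in memory.
    val response =
      s3Client.uploadPart(request, RequestBody.fromInputStream(inputStream, contentLength))
    response.eTag()
  }
}
```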
```scala
@Path("/multipart-upload/part")
@Consumes(Array(MediaType.APPLICATION_OCTET_STREAM))
def uploadPart(
    @QueryParam("ownerEmail") ownerEmail: String,
```
Why do we need `ownerEmail` here? We already have the user ID from their token, and it can be fetched. Please move these query parameters into the request body as JSON.
It's up to you.
> Why do we need `ownerEmail` here? We already have the user ID from their token, and it can be fetched. Please move these query parameters into the request body as JSON.

I am unsure whether we can use the user's email directly; maybe @xuang7 can confirm. If so, I will change it as you suggested. I was just thinking of cases where the dataset is shared...
Also, I would like to keep the query params and reserve the request body purely for the stream, since that is a convenient way to use streaming `application/octet-stream`. What do you think?
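For context, the pattern described here — metadata in query parameters and headers, with the raw `application/octet-stream` body consumed as an `InputStream` — would look roughly like the following JAX-RS sketch. The service trait and parameter names are assumptions for illustration, not the PR's actual resource code.

```scala
import java.io.InputStream
import javax.ws.rs.{Consumes, HeaderParam, POST, Path, QueryParam}
import javax.ws.rs.core.{MediaType, Response}

// Hypothetical service interface, only to make the sketch self-contained.
trait UploadService {
  def uploadPart(filePath: String, partNumber: Int, body: InputStream, contentLength: Long): Unit
}

@Path("/dataset")
class DatasetUploadResource(uploadService: UploadService) {

  @POST
  @Path("/multipart-upload/part")
  @Consumes(Array(MediaType.APPLICATION_OCTET_STREAM))
  def uploadPart(
      @QueryParam("filePath") filePath: String,
      @QueryParam("partNumber") partNumber: Int,
      @HeaderParam("Content-Length") contentLength: Long,
      inputStream: InputStream // the raw request body, consumed as a stream
  ): Response = {
    // Metadata travels in query params / headers; the body carries only the part bytes.
    uploadService.uploadPart(filePath, partNumber, inputStream, contentLength)
    Response.ok().build()
  }
}
```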
```scala
    inputStream: InputStream,
    contentLength: Option[Long]
): Unit = {
  val body: RequestBody = contentLength match {
```
We need streaming here; this just reads all the bytes at once.
Yes, when the user does not specify Content-Length, we read all the bytes. (However, this case is forbidden in the `uploadPart` endpoint.)
When the user does specify Content-Length, with `RequestBody.fromInputStream(inputStream, contentLength /* e.g. 5 GiB */)` the SDK does not read and buffer the whole 5 GiB in memory first. For retries (depending on support), the SDK tries rewinding by calling `InputStream.reset()` with a read limit of 128 KiB.
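As an illustration of that behaviour, the body-selection logic looks roughly like the sketch below (assuming the AWS SDK for Java v2; the `None` branch is the one the `uploadPart` endpoint rejects):

```scala
import java.io.InputStream
import software.amazon.awssdk.core.sync.RequestBody

// Pick the request body strategy based on whether the client sent Content-Length.
def buildRequestBody(inputStream: InputStream, contentLength: Option[Long]): RequestBody =
  contentLength match {
    case Some(length) =>
      // Streams the body; the SDK relies on InputStream mark/reset (with a small
      // read limit) for retry rewinds rather than loading the whole part into memory.
      RequestBody.fromInputStream(inputStream, length)
    case None =>
      // No Content-Length: the bytes must be read eagerly to know the size.
      // (The uploadPart endpoint rejects this case.)
      RequestBody.fromBytes(inputStream.readAllBytes())
  }
```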
@aicam do you agree with this?
What changes were proposed in this PR?

DB / schema

- Add `dataset_upload_session` to track multipart upload sessions, including:
  - `(uid, did, file_path)` as the primary key
  - `upload_id` (UNIQUE), `physical_address`
  - `num_parts_requested` to enforce the expected part count
- Add `dataset_upload_session_part` to track per-part completion for a multipart upload:
  - `(upload_id, part_number)` as the primary key
  - `etag` (TEXT NOT NULL DEFAULT '') to persist per-part ETags for finalize
  - `CHECK (part_number > 0)` for sanity
  - `FOREIGN KEY (upload_id) REFERENCES dataset_upload_session(upload_id) ON DELETE CASCADE`

Backend (`DatasetResource`)

Multipart upload API (server-side streaming to S3; LakeFS manages multipart state):
- `POST /dataset/multipart-upload?type=init`
  - Accepts `num_parts_requested`.
  - Pre-creates rows in `dataset_upload_session_part` for part numbers `1..num_parts_requested` with `etag = ''` (enables deterministic per-part locking and simple completeness checks).
  - Allows only one active session per `(uid, did, file_path)` (409 Conflict). The race is handled via PK/duplicate handling plus a best-effort LakeFS abort for the losing initializer.
- `POST /dataset/multipart-upload/part?filePath=...&partNumber=...`
  - Requires `Content-Length` for streaming uploads.
  - Validates `partNumber <= num_parts_requested`.
  - Locks the `(upload_id, part_number)` row using `SELECT … FOR UPDATE NOWAIT` to prevent concurrent uploads of the same part (see the locking sketch after this section).
  - Persists the returned ETag in `dataset_upload_session_part.etag` (upsert/overwrite for retries).
- `POST /dataset/multipart-upload?type=finish`
  - Locks the session row using `SELECT … FOR UPDATE NOWAIT` to prevent concurrent finalize/abort.
  - Validates completeness using DB state: all `num_parts_requested` rows for the `upload_id` must have a persisted ETag.
  - Fetches `(part_number, etag)` ordered by `part_number` from the DB and completes the multipart upload via LakeFS.
  - Deletes the DB session row; part rows are cleaned up via `ON DELETE CASCADE`.
  - NOWAIT lock contention is handled (mapped to "already being finalized/aborted", 409).
- `POST /dataset/multipart-upload?type=abort`
  - Locks the session row using `SELECT … FOR UPDATE NOWAIT`.
  - Follows the same locking/cleanup flow as `finish`.

Access control and dataset permissions remain enforced on all endpoints.
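A minimal sketch of the per-part locking referenced above, using plain JDBC against the new tables. The table and column names come from this PR's schema; the exact SQL, connection handling, and error mapping are simplified assumptions rather than the actual resource code.

```scala
import java.sql.Connection

// Lock one (upload_id, part_number) row so concurrent uploads of the same part fail fast,
// then persist the ETag returned by the storage layer for the later `finish` step.
def lockPartAndStoreEtag(conn: Connection, uploadId: String, partNumber: Int, etag: String): Unit = {
  conn.setAutoCommit(false)
  try {
    // NOWAIT makes a second uploader of the same part fail immediately instead of blocking;
    // the resource layer can map that error to a 409 response.
    val lock = conn.prepareStatement(
      "SELECT etag FROM dataset_upload_session_part " +
        "WHERE upload_id = ? AND part_number = ? FOR UPDATE NOWAIT"
    )
    lock.setString(1, uploadId)
    lock.setInt(2, partNumber)
    lock.executeQuery()

    // Overwrite any previous ETag so retries of the same part are idempotent.
    val update = conn.prepareStatement(
      "UPDATE dataset_upload_session_part SET etag = ? WHERE upload_id = ? AND part_number = ?"
    )
    update.setString(1, etag)
    update.setString(2, uploadId)
    update.setInt(3, partNumber)
    update.executeUpdate()

    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  }
}
```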
Frontend service (`dataset.service.ts`)

- `multipartUpload(...)` updated to reflect the server flow and return values (ETag persistence is server-side; the frontend does not need to track ETags).

Frontend component (`dataset-detail.component.ts`)

- Calls `type=abort` to clean up the upload session.

Any related issues, documentation, discussions?
Closes #4110
How was this PR tested?
- Unit tests added/updated (multipart upload spec).
- Manual testing via the dataset detail page (single and multiple uploads).
Was this PR authored or co-authored using generative AI tooling?
Partial use of GPT.