From 215d371a244e03821faa462e195cdce142448184 Mon Sep 17 00:00:00 2001 From: Jiadong Bai Date: Sun, 21 Sep 2025 21:50:10 -0700 Subject: [PATCH 1/4] try to fix file upload --- .../service/resource/DatasetResource.scala | 141 ++++++++++-------- 1 file changed, 75 insertions(+), 66 deletions(-) diff --git a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala index 5e7ee3964d8..47538f56b3e 100644 --- a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala +++ b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala @@ -289,9 +289,9 @@ class DatasetResource { createdDataset.getOwnerUid, createdDataset.getName, createdDataset.getIsPublic, - createdDataset.getIsDownloadable, createdDataset.getDescription, - createdDataset.getCreationTime + createdDataset.getCreationTime, + createdDataset.getIsDownloadable, ), user.getEmail, PrivilegeEnum.WRITE, @@ -449,6 +449,7 @@ class DatasetResource { @PathParam("did") did: Integer, @QueryParam("filePath") encodedFilePath: String, @QueryParam("message") message: String, + @DefaultValue("true") @QueryParam("useMultipartUpload") useMultipartUpload: Boolean, fileStream: InputStream, @Context headers: HttpHeaders, @Auth user: SessionUser @@ -469,74 +470,82 @@ class DatasetResource { repoName = dataset.getName filePath = URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name) - // ---------- decide part-size & number-of-parts ---------- - val declaredLen = Option(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).map(_.toLong) - var partSize = StorageConfig.s3MultipartUploadPartSize + if (useMultipartUpload) { + // ---------- decide part-size & number-of-parts ---------- + val declaredLen = Option(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).map(_.toLong) + var partSize = StorageConfig.s3MultipartUploadPartSize + + declaredLen.foreach { ln => + val needed = ((ln + partSize - 1) / partSize).toInt + if (needed > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) + partSize = math.max( + MINIMUM_NUM_OF_MULTIPART_S3_PART, + ln / (MAXIMUM_NUM_OF_MULTIPART_S3_PARTS - 1) + ) + } - declaredLen.foreach { ln => - val needed = ((ln + partSize - 1) / partSize).toInt - if (needed > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) - partSize = math.max( - MINIMUM_NUM_OF_MULTIPART_S3_PART, - ln / (MAXIMUM_NUM_OF_MULTIPART_S3_PARTS - 1) - ) - } + val expectedParts = declaredLen + .map(ln => + ((ln + partSize - 1) / partSize).toInt + 1 + ) // "+1" for the last (possibly small) part + .getOrElse(MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) + + // ---------- ask LakeFS for presigned URLs ---------- + val presign = LakeFSStorageClient + .initiatePresignedMultipartUploads(repoName, filePath, expectedParts) + uploadId = presign.getUploadId + val presignedUrls = presign.getPresignedUrls.asScala.iterator + physicalAddress = presign.getPhysicalAddress + + // ---------- stream & upload parts ---------- + /* + 1. Reads the input stream in chunks of 'partSize' bytes by stacking them in a buffer + 2. Uploads each chunk (part) using a presigned URL + 3. Tracks each part number and ETag returned from S3 + 4. 
After all parts are uploaded, completes the multipart upload + */ + val buf = new Array[Byte](partSize.toInt) + var buffered = 0 + var partNumber = 1 + val completedParts = ListBuffer[(Int, String)]() + + @inline def flush(): Unit = { + if (buffered == 0) return + if (!presignedUrls.hasNext) + throw new WebApplicationException("Ran out of presigned part URLs – ask for more parts") + + val etag = put(buf, buffered, presignedUrls.next(), partNumber) + completedParts += ((partNumber, etag)) + partNumber += 1 + buffered = 0 + } - val expectedParts = declaredLen - .map(ln => - ((ln + partSize - 1) / partSize).toInt + 1 - ) // “+1” for the last (possibly small) part - .getOrElse(MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) - - // ---------- ask LakeFS for presigned URLs ---------- - val presign = LakeFSStorageClient - .initiatePresignedMultipartUploads(repoName, filePath, expectedParts) - uploadId = presign.getUploadId - val presignedUrls = presign.getPresignedUrls.asScala.iterator - physicalAddress = presign.getPhysicalAddress - - // ---------- stream & upload parts ---------- - /* - 1. Reads the input stream in chunks of 'partSize' bytes by stacking them in a buffer - 2. Uploads each chunk (part) using a presigned URL - 3. Tracks each part number and ETag returned from S3 - 4. After all parts are uploaded, completes the multipart upload - */ - val buf = new Array[Byte](partSize.toInt) - var buffered = 0 - var partNumber = 1 - val completedParts = ListBuffer[(Int, String)]() - - @inline def flush(): Unit = { - if (buffered == 0) return - if (!presignedUrls.hasNext) - throw new WebApplicationException("Ran out of presigned part URLs – ask for more parts") - - val etag = put(buf, buffered, presignedUrls.next(), partNumber) - completedParts += ((partNumber, etag)) - partNumber += 1 - buffered = 0 - } + var read = fileStream.read(buf, buffered, buf.length - buffered) + while (read != -1) { + buffered += read + if (buffered == buf.length) flush() // buffer full + read = fileStream.read(buf, buffered, buf.length - buffered) + } + fileStream.close() + flush() - var read = fileStream.read(buf, buffered, buf.length - buffered) - while (read != -1) { - buffered += read - if (buffered == buf.length) flush() // buffer full - read = fileStream.read(buf, buffered, buf.length - buffered) - } - fileStream.close() - flush() - - // ---------- complete upload ---------- - LakeFSStorageClient.completePresignedMultipartUploads( - repoName, - filePath, - uploadId, - completedParts.toList, - physicalAddress - ) + // ---------- complete upload ---------- + LakeFSStorageClient.completePresignedMultipartUploads( + repoName, + filePath, + uploadId, + completedParts.toList, + physicalAddress + ) - Response.ok(Map("message" -> s"Uploaded $filePath in ${completedParts.size} parts")).build() + Response.ok(Map("message" -> s"Uploaded $filePath in ${completedParts.size} parts")).build() + } else { + // Use single file upload method + LakeFSStorageClient.writeFileToRepo(repoName, filePath, fileStream) + fileStream.close() + + Response.ok(Map("message" -> s"Uploaded $filePath using single upload")).build() + } } } catch { case e: Exception => From 6e5b4d87f750793a52b39e40a7d83e3c846269a0 Mon Sep 17 00:00:00 2001 From: Jiadong Bai Date: Sun, 21 Sep 2025 22:04:33 -0700 Subject: [PATCH 2/4] revert download change --- .../edu/uci/ics/texera/service/resource/DatasetResource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala 
b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
index 47538f56b3e..07c7fe6269e 100644
--- a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
+++ b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
@@ -289,9 +289,9 @@ class DatasetResource {
         createdDataset.getOwnerUid,
         createdDataset.getName,
         createdDataset.getIsPublic,
+        createdDataset.getIsDownloadable,
         createdDataset.getDescription,
         createdDataset.getCreationTime,
-        createdDataset.getIsDownloadable,
       ),
       user.getEmail,
       PrivilegeEnum.WRITE,

From 3b93199fabc69aa5f82f112087ec7200164e4bcb Mon Sep 17 00:00:00 2001
From: Jiadong Bai
Date: Sun, 21 Sep 2025 23:38:27 -0700
Subject: [PATCH 3/4] avoid unnecessary change

---
 .../edu/uci/ics/texera/service/resource/DatasetResource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
index 07c7fe6269e..524889c821e 100644
--- a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
+++ b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
@@ -291,7 +291,7 @@ class DatasetResource {
         createdDataset.getIsPublic,
         createdDataset.getIsDownloadable,
         createdDataset.getDescription,
-        createdDataset.getCreationTime,
+        createdDataset.getCreationTime
       ),
       user.getEmail,
       PrivilegeEnum.WRITE,

From 2c2f52bab1484ea2467250d9cc87665d527752a4 Mon Sep 17 00:00:00 2001
From: Jiadong Bai
Date: Mon, 22 Sep 2025 16:33:53 -0700
Subject: [PATCH 4/4] feat(config): allow default.conf to be set using environment variables

Add environment variable support for all configuration properties in
default.conf, following the same pattern used in gui.conf. This allows
deployment configurations to override defaults without modifying the
configuration file directly.

Environment variables added:
- CONFIG_SERVICE_ALWAYS_RESET_CONFIGURATIONS_TO_DEFAULT_VALUES
- GUI_LOGO_LOGO, GUI_LOGO_MINI_LOGO, GUI_LOGO_FAVICON
- GUI_TABS_* for all tab configurations
- DATASET_* for dataset upload configurations
---
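The ${?...} lines added below rely on two HOCON rules: a later assignment to the
same key wins, and an optional substitution ${?SOME_VAR} is silently dropped when
SOME_VAR is undefined, so each "key = ${?ENV_VAR}" line only takes effect when that
environment variable is actually set. A minimal, self-contained sketch of that
behavior, assuming the Lightbend (Typesafe) Config library that presumably loads
these HOCON files; the object name and the inlined config snippet are illustrative
and not part of this patch:

    import com.typesafe.config.ConfigFactory

    object EnvOverrideSketch extends App {
      // Tiny stand-in for a slice of default.conf: default first, optional override second.
      val conf = ConfigFactory
        .parseString(
          """
            |gui {
            |  logo {
            |    logo = "assets/logos/logo.png"   // bundled default
            |    logo = ${?GUI_LOGO_LOGO}         // applied only if GUI_LOGO_LOGO is set
            |  }
            |}
            |""".stripMargin
        )
        .resolve() // resolution falls back to system environment variables for ${?...}

      // Prints the bundled default unless GUI_LOGO_LOGO is exported in the environment.
      println(conf.getString("gui.logo.logo"))
    }

Running this with GUI_LOGO_LOGO=/branding/custom.png exported prints the override;
unsetting the variable falls back to the value shipped in this file.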
 core/config/src/main/resources/default.conf | 28 +++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/core/config/src/main/resources/default.conf b/core/config/src/main/resources/default.conf
index 548a4882d3a..55ca9caf87c 100644
--- a/core/config/src/main/resources/default.conf
+++ b/core/config/src/main/resources/default.conf
@@ -22,43 +22,71 @@
 config-service {
     # Setting to true resets all site settings in the database to the defaults defined in this file.
     always-reset-configurations-to-default-values = false
+    always-reset-configurations-to-default-values = ${?CONFIG_SERVICE_ALWAYS_RESET_CONFIGURATIONS_TO_DEFAULT_VALUES}
 }
 
 gui {
     logo {
         logo = "assets/logos/logo.png"
+        logo = ${?GUI_LOGO_LOGO}
+
         mini_logo = "assets/logos/full_logo_small.png"
+        mini_logo = ${?GUI_LOGO_MINI_LOGO}
+
         favicon = "assets/logos/favicon-32x32.png"
+        favicon = ${?GUI_LOGO_FAVICON}
     }
 
     tabs {
         # config for hub tabs
         hub_enabled = true
+        hub_enabled = ${?GUI_TABS_HUB_ENABLED}
+
         home_enabled = true
+        home_enabled = ${?GUI_TABS_HOME_ENABLED}
+
         workflow_enabled = true
+        workflow_enabled = ${?GUI_TABS_WORKFLOW_ENABLED}
+
         dataset_enabled = true
+        dataset_enabled = ${?GUI_TABS_DATASET_ENABLED}
 
         # config for your work tabs
         your_work_enabled = true
+        your_work_enabled = ${?GUI_TABS_YOUR_WORK_ENABLED}
+
         projects_enabled = false
+        projects_enabled = ${?GUI_TABS_PROJECTS_ENABLED}
+
         workflows_enabled = true
+        workflows_enabled = ${?GUI_TABS_WORKFLOWS_ENABLED}
+
         datasets_enabled = true
+        datasets_enabled = ${?GUI_TABS_DATASETS_ENABLED}
+
         quota_enabled = true
+        quota_enabled = ${?GUI_TABS_QUOTA_ENABLED}
+
         forum_enabled = false
+        forum_enabled = ${?GUI_TABS_FORUM_ENABLED}
 
         # config for about tab
         about_enabled = true
+        about_enabled = ${?GUI_TABS_ABOUT_ENABLED}
     }
 }
 
 dataset {
     # the file size limit for dataset upload
     single_file_upload_max_size_mib = 20
+    single_file_upload_max_size_mib = ${?DATASET_SINGLE_FILE_UPLOAD_MAX_SIZE_MIB}
 
     # the maximum number of file chunks that can be held in the memory;
     # you may increase this number if your deployment environment has enough memory resource
     max_number_of_concurrent_uploading_file_chunks = 10
+    max_number_of_concurrent_uploading_file_chunks = ${?DATASET_MAX_NUMBER_OF_CONCURRENT_UPLOADING_FILE_CHUNKS}
 
     # the size of each chunk during the multipart upload of file
     multipart_upload_chunk_size_mib = 50
+    multipart_upload_chunk_size_mib = ${?DATASET_MULTIPART_UPLOAD_CHUNK_SIZE_MIB}
 }
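Addendum on the upload path changed in PATCH 1/4: the new loop keeps topping up a
part-sized buffer because InputStream.read may return fewer bytes than requested,
issues exactly one PUT per presigned URL, and flushes the last (possibly smaller)
part after the stream ends. A self-contained sketch of that accumulate-then-flush
pattern with the presigned PUT replaced by a stub; the uploadPart name and the toy
8-byte part size are illustrative and not part of the Texera code:

    import java.io.{ByteArrayInputStream, InputStream}
    import scala.collection.mutable.ListBuffer

    object MultipartBufferingSketch extends App {
      val partSize = 8 // toy value; the real code uses the configured multipart part size

      // Stand-in for PUTting one part to a presigned URL and reading back its ETag.
      def uploadPart(partNumber: Int, data: Array[Byte], length: Int): String =
        s"etag-$partNumber-$length"

      def upload(in: InputStream): List[(Int, String)] = {
        val buf = new Array[Byte](partSize)
        var buffered = 0
        var partNumber = 1
        val completed = ListBuffer[(Int, String)]()

        def flush(): Unit =
          if (buffered > 0) {
            completed += ((partNumber, uploadPart(partNumber, buf, buffered)))
            partNumber += 1
            buffered = 0
          }

        // read may return a short count, so only ship a part once the buffer is full.
        var read = in.read(buf, buffered, buf.length - buffered)
        while (read != -1) {
          buffered += read
          if (buffered == buf.length) flush()
          read = in.read(buf, buffered, buf.length - buffered)
        }
        in.close()
        flush() // the last part may be smaller than partSize
        completed.toList
      }

      // 20 bytes with 8-byte parts -> parts of 8, 8, and 4 bytes.
      println(upload(new ByteArrayInputStream(("x" * 20).getBytes)))
    }

With useMultipartUpload defaulting to true, existing callers keep this streaming
path; passing useMultipartUpload=false sends the whole stream through
LakeFSStorageClient.writeFileToRepo in a single request instead.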