From 623577520728d121732818302cb3e033adf128ce Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 3 Oct 2025 17:09:39 -0700 Subject: [PATCH 1/8] got texera to load a csv with a lot of columns 1000+ but need to fix that the result panel trying to show it and schema propagation are going so slow it crashes the system --- .../ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala | 1 + .../ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala index c1fe63cc6ba..c3e304e61a3 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala @@ -92,6 +92,7 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(hasHeader) csvSetting.setNullValue("") + csvSetting.setMaxColumns(100000) val parser = new CsvParser(csvSetting) parser.beginParsing(inputReader) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala index 968744094d3..4cefa7f90e7 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala @@ -92,6 +92,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat csvSetting.setMaxCharsPerColumn(-1) csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(desc.hasHeader) + csvSetting.setMaxColumns(10000) parser = new CsvParser(csvSetting) parser.beginParsing(inputReader) From d6add973a9cd9beb1799682be1f7906bcb45755e Mon Sep 17 00:00:00 2001 From: Ma77Ball Date: Wed, 26 Nov 2025 00:42:52 -0800 Subject: [PATCH 2/8] Added the ability to view and work with wide column tables with left and right column shift, and a search bar --- .../request/ResultPaginationRequest.scala | 5 +- .../web/service/ExecutionResultService.scala | 23 +++++-- .../amber/core/storage/DocumentFactory.scala | 8 +-- .../core/storage/model/VirtualDocument.scala | 10 +++ .../result/iceberg/IcebergDocument.scala | 32 +++++++-- .../result-table-frame.component.html | 68 +++++++------------ .../result-table-frame.component.ts | 26 ++++++- .../workflow-result.service.ts | 14 +++- .../types/workflow-websocket.interface.ts | 3 + 9 files changed, 127 insertions(+), 62 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala index 32649c75de8..4a3e0a58a3e 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala @@ -23,5 +23,8 @@ case class ResultPaginationRequest( requestID: String, operatorID: String, pageIndex: Int, - pageSize: Int + pageSize: Int, + columnOffset: Int = 0, + columnLimit: Int = Int.MaxValue, + columnSearch: Option[String] = None ) extends TexeraWebSocketRequest diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 246c4e49671..7e18abf6d43 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -433,12 +433,25 @@ class ExecutionResultService( storageUriOption match { case Some(storageUri) => + val (document, schemaOption) = DocumentFactory.openDocument(storageUri) + val virtualDocument = document.asInstanceOf[VirtualDocument[Tuple]] + + val columns = if (request.columnLimit < Int.MaxValue && schemaOption.isDefined) { + val schema = schemaOption.get + val allColumns = schema.getAttributeNames + val filteredColumns = request.columnSearch match { + case Some(search) => + allColumns.filter(col => col.toLowerCase.contains(search.toLowerCase)) + case None => allColumns + } + Some(filteredColumns.slice(request.columnOffset, request.columnOffset + request.columnLimit)) + } else { + None + } + val paginationIterable = { - DocumentFactory - .openDocument(storageUri) - ._1 - .asInstanceOf[VirtualDocument[Tuple]] - .getRange(from, from + request.pageSize) + virtualDocument + .getRange(from, from + request.pageSize, columns) .to(Iterable) } val mappedResults = convertTuplesToJson(paginationIterable) diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/DocumentFactory.scala index 7c343b13df1..1d506581670 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/DocumentFactory.scala @@ -87,8 +87,8 @@ object DocumentFactory { overrideIfExists = true ) val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (_, record) => - IcebergUtil.fromRecord(record, schema) + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) new IcebergDocument[Tuple]( namespace, @@ -144,8 +144,8 @@ object DocumentFactory { val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (_, record) => - IcebergUtil.fromRecord(record, amberSchema) + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) ( new IcebergDocument[Tuple]( diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/model/VirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/model/VirtualDocument.scala index 4b7e11e4a0b..3176150d3b5 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/model/VirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/model/VirtualDocument.scala @@ -60,6 +60,16 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] { def getRange(from: Int, until: Int): Iterator[T] = throw new NotImplementedError("getRange method is not implemented") + /** + * get an iterator of a sequence starting from index `from`, until index `until` + * @param from the starting index (inclusive) + * @param until the ending index (exclusive) + * @param columns the columns to be projected + * @return an iterator that returns data items of type T + */ + def getRange(from: Int, until: Int, columns: Option[Seq[String]]): Iterator[T] = + getRange(from, until) + /** * get an iterator of all items after the specified index `offset` * @param offset the starting index (exclusive) diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala index a05e0cdfdde..f160aab7367 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -100,20 +100,27 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( /** * Get an iterator for reading records from the table. */ - override def get(): Iterator[T] = getUsingFileSequenceOrder(0, None) + override def get(): Iterator[T] = getUsingFileSequenceOrder(0, None, None) /** * Get records within a specified range [from, until). */ override def getRange(from: Int, until: Int): Iterator[T] = { - getUsingFileSequenceOrder(from, Some(until)) + getUsingFileSequenceOrder(from, Some(until), None) + } + + /** + * Get records within a specified range [from, until) with specific columns. + */ + override def getRange(from: Int, until: Int, columns: Option[Seq[String]]): Iterator[T] = { + getUsingFileSequenceOrder(from, Some(until), columns) } /** * Get records starting after a specified offset. */ override def getAfter(offset: Int): Iterator[T] = { - getUsingFileSequenceOrder(offset, None) + getUsingFileSequenceOrder(offset, None, None) } /** @@ -150,8 +157,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( * * @param from start from which record inclusively, if 0 means start from the first * @param until end at which record exclusively, if None means read to the table's EOF + * @param columns columns to be projected */ - private def getUsingFileSequenceOrder(from: Int, until: Option[Int]): Iterator[T] = + private def getUsingFileSequenceOrder( + from: Int, + until: Option[Int], + columns: Option[Seq[String]] + ): Iterator[T] = withReadLock(lock) { new Iterator[T] { private val iteLock = new ReentrantLock() @@ -259,9 +271,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( while (!currentRecordIterator.hasNext && usableFileIterator.hasNext) { val nextFile = usableFileIterator.next() + val schemaToUse = columns match { + case Some(cols) => tableSchema.select(cols.asJava) + case None => tableSchema + } currentRecordIterator = IcebergUtil.readDataFileAsIterator( nextFile.file(), - tableSchema, + schemaToUse, table.get ) @@ -281,7 +297,11 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( val record = currentRecordIterator.next() numOfReturnedRecords += 1 - deserde(tableSchema, record) + val schemaToUse = columns match { + case Some(cols) => tableSchema.select(cols.asJava) + case None => tableSchema + } + deserde(schemaToUse, record) } } } diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html index 2f13bd6984d..35ed6efbcb6 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html @@ -17,43 +17,35 @@ under the License. --> -
+

Empty result set

-
+
+
+ + +
+
- + - {{ column.header }} - +
- - + {{ column.getCell(row) }}
-
+
\ No newline at end of file diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index 591c41e53ac..726be332806 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -66,6 +66,9 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { currentPageIndex: number = 1; totalNumTuples: number = 0; pageSize = 5; + currentColumnOffset = 0; + columnLimit = 25; + columnSearch = ""; panelHeight = 0; tableStats: Record> = {}; prevTableStats: Record> = {}; @@ -329,7 +332,7 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } this.isLoadingResult = true; paginatedResultService - .selectPage(this.currentPageIndex, this.pageSize) + .selectPage(this.currentPageIndex, this.pageSize, this.currentColumnOffset, this.columnLimit, this.columnSearch) .pipe(untilDestroyed(this)) .subscribe(pageData => { if (this.currentPageIndex === pageData.pageIndex) { @@ -405,4 +408,25 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { nzFooter: null, }); } + + onColumnShiftLeft(): void { + if (this.currentColumnOffset > 0) { + this.currentColumnOffset = Math.max(0, this.currentColumnOffset - this.columnLimit); + this.changePaginatedResultData(); + } + } + + onColumnShiftRight(): void { + if (this.currentColumns && this.currentColumns.length === this.columnLimit) { + this.currentColumnOffset += this.columnLimit; + this.changePaginatedResultData(); + } + } + + onColumnSearch(event: Event): void { + const input = event.target as HTMLInputElement; + this.columnSearch = input.value; + this.currentColumnOffset = 0; + this.changePaginatedResultData(); + } } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 330b9b5bad5..96937ccbece 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -312,11 +312,18 @@ export class OperatorPaginationResultService { ); } - public selectPage(pageIndex: number, pageSize: number): Observable { + public selectPage( + pageIndex: number, + pageSize: number, + columnOffset: number = 0, + columnLimit: number = Number.MAX_SAFE_INTEGER, + columnSearch: string = "" + ): Observable { // update currently selected page this.currentPageIndex = pageIndex; // first fetch from frontend result cache - const pageCache = this.resultCache.get(pageIndex); + const useCache = columnOffset === 0 && columnLimit === Number.MAX_SAFE_INTEGER && columnSearch === ""; + const pageCache = useCache ? this.resultCache.get(pageIndex) : undefined; if (pageCache) { return of({ requestID: "", @@ -334,6 +341,9 @@ export class OperatorPaginationResultService { operatorID, pageIndex, pageSize, + columnOffset, + columnLimit, + columnSearch, }); const pendingRequestSubject = new Subject(); this.pendingRequests.set(requestID, pendingRequestSubject); diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index 15e2a2809d5..afd5ea6f04a 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -94,6 +94,9 @@ export type PaginationRequest = Readonly<{ operatorID: string; pageIndex: number; pageSize: number; + columnOffset?: number; + columnLimit?: number; + columnSearch?: string; }>; export type PaginatedResultEvent = Readonly<{ From 38cc9eeeaba1d42b93c5f468b7ea327969e2d3de Mon Sep 17 00:00:00 2001 From: Ma77Ball Date: Wed, 26 Nov 2025 04:25:04 -0800 Subject: [PATCH 3/8] fixed format issues --- .../web/service/ExecutionResultService.scala | 4 +- .../result-table-frame.component.html | 91 +++++++++++++++---- 2 files changed, 74 insertions(+), 21 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 7e18abf6d43..0c92053c792 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -444,7 +444,9 @@ class ExecutionResultService( allColumns.filter(col => col.toLowerCase.contains(search.toLowerCase)) case None => allColumns } - Some(filteredColumns.slice(request.columnOffset, request.columnOffset + request.columnLimit)) + Some( + filteredColumns.slice(request.columnOffset, request.columnOffset + request.columnLimit) + ) } else { None } diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html index 35ed6efbcb6..7e5b0b4b06b 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html @@ -17,35 +17,76 @@ under the License. --> -
+

Empty result set

-
-
- -
- From f7027369cdcff9692bd2219f9fc3b549f178554f Mon Sep 17 00:00:00 2001 From: Ma77Ball Date: Wed, 26 Nov 2025 10:04:54 -0800 Subject: [PATCH 4/8] made max columns 10,000 --- .../amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala index 7387087c0fc..859fbb46f3f 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala @@ -92,7 +92,7 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(hasHeader) csvSetting.setNullValue("") - csvSetting.setMaxColumns(100000) + csvSetting.setMaxColumns(10000) val parser = new CsvParser(csvSetting) parser.beginParsing(inputReader) From dbee5d28e64980d9f14b820bd0c58341cd9aa9f5 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 15 Dec 2025 13:49:15 -0800 Subject: [PATCH 5/8] got rid of if statement to check if column limit was set --- .../apache/texera/web/service/ExecutionResultService.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index e9b0d3441a6..8810e9891f4 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -440,7 +440,7 @@ class ExecutionResultService( val (document, schemaOption) = DocumentFactory.openDocument(storageUri) val virtualDocument = document.asInstanceOf[VirtualDocument[Tuple]] - val columns = if (request.columnLimit < Int.MaxValue && schemaOption.isDefined) { + val columns = { val schema = schemaOption.get val allColumns = schema.getAttributeNames val filteredColumns = request.columnSearch match { @@ -451,8 +451,6 @@ class ExecutionResultService( Some( filteredColumns.slice(request.columnOffset, request.columnOffset + request.columnLimit) ) - } else { - None } val paginationIterable = { From 6ec61e91d093f785fac7b74f30f9e9d1f59f65ae Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 15 Dec 2025 16:28:31 -0800 Subject: [PATCH 6/8] fixed comments in pr #4086 (refactor code) --- .../storage/model/ReadonlyLocalFileDocument.scala | 6 +++++- .../storage/model/ReadonlyVirtualDocument.scala | 2 +- .../core/storage/model/VirtualDocument.scala | 13 ++----------- .../storage/result/iceberg/IcebergDocument.scala | 15 ++++----------- .../source/scan/csv/CSVScanSourceOpDesc.scala | 1 - .../source/scan/csv/CSVScanSourceOpExec.scala | 1 - 6 files changed, 12 insertions(+), 26 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala index 75302656c4f..14c9d1688e5 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala @@ -54,7 +54,11 @@ private[storage] class ReadonlyLocalFileDocument(uri: URI) override def get(): Iterator[Nothing] = throw new NotImplementedError("get is not supported for ReadonlyLocalFileDocument") - override def getRange(from: Int, until: Int): Iterator[Nothing] = + override def getRange( + from: Int, + until: Int, + columns: Option[Seq[String]] = None + ): Iterator[Nothing] = throw new NotImplementedError("getRange is not supported for ReadonlyLocalFileDocument") override def getAfter(offset: Int): Iterator[Nothing] = diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala index 19710e2fc18..80d8b72277b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala @@ -54,7 +54,7 @@ trait ReadonlyVirtualDocument[T] { * @param until the ending index (exclusive) * @return an iterator that returns data items of type T */ - def getRange(from: Int, until: Int): Iterator[T] + def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] /** * Get an iterator of all items after the specified index `offset`. diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala index ca9b4ea8419..8e72c52a803 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala @@ -51,15 +51,6 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] { */ def get(): Iterator[T] = throw new NotImplementedError("get method is not implemented") - /** - * get an iterator of a sequence starting from index `from`, until index `until` - * @param from the starting index (inclusive) - * @param until the ending index (exclusive) - * @return an iterator that returns data items of type T - */ - def getRange(from: Int, until: Int): Iterator[T] = - throw new NotImplementedError("getRange method is not implemented") - /** * get an iterator of a sequence starting from index `from`, until index `until` * @param from the starting index (inclusive) @@ -67,8 +58,8 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] { * @param columns the columns to be projected * @return an iterator that returns data items of type T */ - def getRange(from: Int, until: Int, columns: Option[Seq[String]]): Iterator[T] = - getRange(from, until) + def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] = + throw new NotImplementedError("getRange method is not implemented") /** * get an iterator of all items after the specified index `offset` diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala index 3665dc99b18..e238ab7417a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -100,19 +100,12 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( /** * Get an iterator for reading records from the table. */ - override def get(): Iterator[T] = getUsingFileSequenceOrder(0, None, None) + override def get(): Iterator[T] = getUsingFileSequenceOrder(0, None) /** * Get records within a specified range [from, until). */ - override def getRange(from: Int, until: Int): Iterator[T] = { - getUsingFileSequenceOrder(from, Some(until), None) - } - - /** - * Get records within a specified range [from, until) with specific columns. - */ - override def getRange(from: Int, until: Int, columns: Option[Seq[String]]): Iterator[T] = { + override def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] = { getUsingFileSequenceOrder(from, Some(until), columns) } @@ -120,7 +113,7 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( * Get records starting after a specified offset. */ override def getAfter(offset: Int): Iterator[T] = { - getUsingFileSequenceOrder(offset, None, None) + getUsingFileSequenceOrder(offset, None) } /** @@ -162,7 +155,7 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( private def getUsingFileSequenceOrder( from: Int, until: Option[Int], - columns: Option[Seq[String]] + columns: Option[Seq[String]] = None ): Iterator[T] = withReadLock(lock) { new Iterator[T] { diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala index f4c08182709..9879835e763 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala @@ -92,7 +92,6 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(hasHeader) csvSetting.setNullValue("") - csvSetting.setMaxColumns(10000) val parser = new CsvParser(csvSetting) parser.beginParsing(inputReader) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala index 5d96adf458e..c3fbbe9bb55 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala @@ -92,7 +92,6 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat csvSetting.setMaxCharsPerColumn(-1) csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(desc.hasHeader) - csvSetting.setMaxColumns(10000) parser = new CsvParser(csvSetting) parser.beginParsing(inputReader) From b48d46f04a8d9daea5b2a8f07ea4e69ad01bddb7 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Thu, 18 Dec 2025 10:06:56 -0800 Subject: [PATCH 7/8] fixed result panel layout --- .../result-panel/result-panel.component.ts | 2 +- .../result-table-frame.component.html | 18 +++++------ .../result-table-frame.component.ts | 31 ++++++++++++++++++- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index 628fd3c7a30..077fac2de3a 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -52,7 +52,7 @@ import { CompilationState } from "../../types/workflow-compiling.interface"; import { WorkflowFatalError } from "../../types/workflow-websocket.interface"; export const DEFAULT_WIDTH = 800; -export const DEFAULT_HEIGHT = 300; +export const DEFAULT_HEIGHT = 500; /** * ResultPanelComponent is the bottom level area that displays the * execution result of a workflow after the execution finishes. diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html index 7e5b0b4b06b..8284d8a9ace 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html @@ -25,6 +25,15 @@

Empty result set

+
@@ -49,15 +58,6 @@

Empty result set

nzType="right">
-
Date: Mon, 22 Dec 2025 09:56:24 -0800 Subject: [PATCH 8/8] addressed comments in pr#4086: added a comment explaining the column param in getRange(...) and made the column limit displayed on the result panel a configurable parameter --- common/config/src/main/resources/gui.conf | 4 ++++ .../scala/org/apache/texera/config/GuiConfig.scala | 2 ++ .../storage/model/ReadonlyVirtualDocument.scala | 1 + .../texera/service/resource/ConfigResource.scala | 1 + .../app/common/service/gui-config.service.mock.ts | 1 + frontend/src/app/common/type/gui-config.ts | 1 + .../result-table-frame.component.spec.ts | 13 +++++++++++++ .../result-table-frame.component.ts | 6 +++++- 8 files changed, 28 insertions(+), 1 deletion(-) diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index f73cba82c3e..8039441b130 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -108,5 +108,9 @@ gui { # whether AI copilot feature is enabled copilot-enabled = false copilot-enabled = ${?GUI_WORKFLOW_WORKSPACE_COPILOT_ENABLED} + + # the limit of columns to be displayed in the result table + limit-columns = 15 + limit-columns = ${?GUI_WORKFLOW_WORKSPACE_LIMIT_COLUMNS} } } \ No newline at end of file diff --git a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala index 5e16529bb6f..14016f43743 100644 --- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala @@ -69,4 +69,6 @@ object GuiConfig { conf.getInt("gui.workflow-workspace.active-time-in-minutes") val guiWorkflowWorkspaceCopilotEnabled: Boolean = conf.getBoolean("gui.workflow-workspace.copilot-enabled") + val guiWorkflowWorkspaceLimitColumns: Int = + conf.getInt("gui.workflow-workspace.limit-columns") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala index 80d8b72277b..47a55112842 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala @@ -52,6 +52,7 @@ trait ReadonlyVirtualDocument[T] { * Get an iterator of a sequence starting from index `from`, until index `until`. * @param from the starting index (inclusive) * @param until the ending index (exclusive) + * @param columns optional sequence of column names to retrieve. If None, retrieves all columns. * @return an iterator that returns data items of type T */ def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] diff --git a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala index aa907ec3030..30c657746fb 100644 --- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala +++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala @@ -56,6 +56,7 @@ class ConfigResource { ), "activeTimeInMinutes" -> GuiConfig.guiWorkflowWorkspaceActiveTimeInMinutes, "copilotEnabled" -> GuiConfig.guiWorkflowWorkspaceCopilotEnabled, + "limitColumns" -> GuiConfig.guiWorkflowWorkspaceLimitColumns, // flags from the auth.conf if needed "expirationTimeInMinutes" -> AuthConfig.jwtExpirationMinutes ) diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts index 610169a7862..daa8adfd224 100644 --- a/frontend/src/app/common/service/gui-config.service.mock.ts +++ b/frontend/src/app/common/service/gui-config.service.mock.ts @@ -49,6 +49,7 @@ export class MockGuiConfigService { expirationTimeInMinutes: 2880, activeTimeInMinutes: 15, copilotEnabled: false, + limitColumns: 15, }; get env(): GuiConfig { diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts index d9b4ad279ad..b47dfa0ab1b 100644 --- a/frontend/src/app/common/type/gui-config.ts +++ b/frontend/src/app/common/type/gui-config.ts @@ -40,6 +40,7 @@ export interface GuiConfig { expirationTimeInMinutes: number; activeTimeInMinutes: number; copilotEnabled: boolean; + limitColumns: number; } export interface SidebarTabs { diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.spec.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.spec.ts index 8ef0ad4d9ac..8c36110ae37 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.spec.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.spec.ts @@ -25,6 +25,7 @@ import { StubOperatorMetadataService } from "../../../service/operator-metadata/ import { HttpClientTestingModule } from "@angular/common/http/testing"; import { NzModalModule } from "ng-zorro-antd/modal"; import { commonTestProviders } from "../../../../common/testing/test-utils"; +import { GuiConfigService } from "../../../../common/service/gui-config.service"; describe("ResultTableFrameComponent", () => { let component: ResultTableFrameComponent; @@ -39,6 +40,14 @@ describe("ResultTableFrameComponent", () => { provide: OperatorMetadataService, useClass: StubOperatorMetadataService, }, + { + provide: GuiConfigService, + useValue: { + env: { + limitColumns: 15, + }, + }, + }, ...commonTestProviders, ], }).compileComponents(); @@ -60,4 +69,8 @@ describe("ResultTableFrameComponent", () => { expect(component.currentResult).toEqual([{ test: "property" }]); }); + + it("should set columnLimit from config", () => { + expect(component.columnLimit).toEqual(15); + }); }); diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index 3055eaa13df..abb6daa8825 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -31,6 +31,7 @@ import { DomSanitizer, SafeHtml } from "@angular/platform-browser"; import { ResultExportationComponent } from "../../result-exportation/result-exportation.component"; import { SchemaAttribute } from "../../../types/workflow-compiling.interface"; import { WorkflowStatusService } from "../../../service/workflow-status/workflow-status.service"; +import { GuiConfigService } from "../../../../common/service/gui-config.service"; /** * The Component will display the result in an excel table format, @@ -84,7 +85,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { private resizeService: PanelResizeService, private changeDetectorRef: ChangeDetectorRef, private sanitizer: DomSanitizer, - private workflowStatusService: WorkflowStatusService + private workflowStatusService: WorkflowStatusService, + private guiConfigService: GuiConfigService ) {} ngOnChanges(changes: SimpleChanges): void { @@ -117,6 +119,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } }); + this.columnLimit = this.guiConfigService.env.limitColumns; + this.workflowResultService .getResultUpdateStream() .pipe(untilDestroyed(this))