diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala index 32649c75de8..4a3e0a58a3e 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala @@ -23,5 +23,8 @@ case class ResultPaginationRequest( requestID: String, operatorID: String, pageIndex: Int, - pageSize: Int + pageSize: Int, + columnOffset: Int = 0, + columnLimit: Int = Int.MaxValue, + columnSearch: Option[String] = None ) extends TexeraWebSocketRequest diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index b90de7e01f6..8810e9891f4 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -437,12 +437,25 @@ class ExecutionResultService( storageUriOption match { case Some(storageUri) => + val (document, schemaOption) = DocumentFactory.openDocument(storageUri) + val virtualDocument = document.asInstanceOf[VirtualDocument[Tuple]] + + val columns = { + val schema = schemaOption.get + val allColumns = schema.getAttributeNames + val filteredColumns = request.columnSearch match { + case Some(search) => + allColumns.filter(col => col.toLowerCase.contains(search.toLowerCase)) + case None => allColumns + } + Some( + filteredColumns.slice(request.columnOffset, request.columnOffset + request.columnLimit) + ) + } + val paginationIterable = { - DocumentFactory - .openDocument(storageUri) - ._1 - .asInstanceOf[VirtualDocument[Tuple]] - .getRange(from, from + request.pageSize) + virtualDocument + .getRange(from, from + request.pageSize, columns) .to(Iterable) } val mappedResults = convertTuplesToJson(paginationIterable) diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index f73cba82c3e..8039441b130 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -108,5 +108,9 @@ gui { # whether AI copilot feature is enabled copilot-enabled = false copilot-enabled = ${?GUI_WORKFLOW_WORKSPACE_COPILOT_ENABLED} + + # the limit of columns to be displayed in the result table + limit-columns = 15 + limit-columns = ${?GUI_WORKFLOW_WORKSPACE_LIMIT_COLUMNS} } } \ No newline at end of file diff --git a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala index 5e16529bb6f..14016f43743 100644 --- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala @@ -69,4 +69,6 @@ object GuiConfig { conf.getInt("gui.workflow-workspace.active-time-in-minutes") val guiWorkflowWorkspaceCopilotEnabled: Boolean = conf.getBoolean("gui.workflow-workspace.copilot-enabled") + val guiWorkflowWorkspaceLimitColumns: Int = + conf.getInt("gui.workflow-workspace.limit-columns") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index a828ab12b7d..4c37c33bb20 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -87,8 +87,8 @@ object DocumentFactory { overrideIfExists = true ) val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (_, record) => - IcebergUtil.fromRecord(record, schema) + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) new IcebergDocument[Tuple]( namespace, @@ -144,8 +144,8 @@ object DocumentFactory { val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (_, record) => - IcebergUtil.fromRecord(record, amberSchema) + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) ( new IcebergDocument[Tuple]( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala index 75302656c4f..14c9d1688e5 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyLocalFileDocument.scala @@ -54,7 +54,11 @@ private[storage] class ReadonlyLocalFileDocument(uri: URI) override def get(): Iterator[Nothing] = throw new NotImplementedError("get is not supported for ReadonlyLocalFileDocument") - override def getRange(from: Int, until: Int): Iterator[Nothing] = + override def getRange( + from: Int, + until: Int, + columns: Option[Seq[String]] = None + ): Iterator[Nothing] = throw new NotImplementedError("getRange is not supported for ReadonlyLocalFileDocument") override def getAfter(offset: Int): Iterator[Nothing] = diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala index 19710e2fc18..47a55112842 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/ReadonlyVirtualDocument.scala @@ -52,9 +52,10 @@ trait ReadonlyVirtualDocument[T] { * Get an iterator of a sequence starting from index `from`, until index `until`. * @param from the starting index (inclusive) * @param until the ending index (exclusive) + * @param columns optional sequence of column names to retrieve. If None, retrieves all columns. * @return an iterator that returns data items of type T */ - def getRange(from: Int, until: Int): Iterator[T] + def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] /** * Get an iterator of all items after the specified index `offset`. diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala index c0407c37d20..8e72c52a803 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala @@ -55,9 +55,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] { * get an iterator of a sequence starting from index `from`, until index `until` * @param from the starting index (inclusive) * @param until the ending index (exclusive) + * @param columns the columns to be projected * @return an iterator that returns data items of type T */ - def getRange(from: Int, until: Int): Iterator[T] = + def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] = throw new NotImplementedError("getRange method is not implemented") /** diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala index cef8273afb2..e238ab7417a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -105,8 +105,8 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( /** * Get records within a specified range [from, until). */ - override def getRange(from: Int, until: Int): Iterator[T] = { - getUsingFileSequenceOrder(from, Some(until)) + override def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] = { + getUsingFileSequenceOrder(from, Some(until), columns) } /** @@ -150,8 +150,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( * * @param from start from which record inclusively, if 0 means start from the first * @param until end at which record exclusively, if None means read to the table's EOF + * @param columns columns to be projected */ - private def getUsingFileSequenceOrder(from: Int, until: Option[Int]): Iterator[T] = + private def getUsingFileSequenceOrder( + from: Int, + until: Option[Int], + columns: Option[Seq[String]] = None + ): Iterator[T] = withReadLock(lock) { new Iterator[T] { private val iteLock = new ReentrantLock() @@ -259,9 +264,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( while (!currentRecordIterator.hasNext && usableFileIterator.hasNext) { val nextFile = usableFileIterator.next() + val schemaToUse = columns match { + case Some(cols) => tableSchema.select(cols.asJava) + case None => tableSchema + } currentRecordIterator = IcebergUtil.readDataFileAsIterator( nextFile.file(), - tableSchema, + schemaToUse, table.get ) @@ -281,7 +290,11 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( val record = currentRecordIterator.next() numOfReturnedRecords += 1 - deserde(tableSchema, record) + val schemaToUse = columns match { + case Some(cols) => tableSchema.select(cols.asJava) + case None => tableSchema + } + deserde(schemaToUse, record) } } } diff --git a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala index aa907ec3030..30c657746fb 100644 --- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala +++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala @@ -56,6 +56,7 @@ class ConfigResource { ), "activeTimeInMinutes" -> GuiConfig.guiWorkflowWorkspaceActiveTimeInMinutes, "copilotEnabled" -> GuiConfig.guiWorkflowWorkspaceCopilotEnabled, + "limitColumns" -> GuiConfig.guiWorkflowWorkspaceLimitColumns, // flags from the auth.conf if needed "expirationTimeInMinutes" -> AuthConfig.jwtExpirationMinutes ) diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts index 610169a7862..daa8adfd224 100644 --- a/frontend/src/app/common/service/gui-config.service.mock.ts +++ b/frontend/src/app/common/service/gui-config.service.mock.ts @@ -49,6 +49,7 @@ export class MockGuiConfigService { expirationTimeInMinutes: 2880, activeTimeInMinutes: 15, copilotEnabled: false, + limitColumns: 15, }; get env(): GuiConfig { diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts index d9b4ad279ad..b47dfa0ab1b 100644 --- a/frontend/src/app/common/type/gui-config.ts +++ b/frontend/src/app/common/type/gui-config.ts @@ -40,6 +40,7 @@ export interface GuiConfig { expirationTimeInMinutes: number; activeTimeInMinutes: number; copilotEnabled: boolean; + limitColumns: number; } export interface SidebarTabs { diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index 628fd3c7a30..077fac2de3a 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -52,7 +52,7 @@ import { CompilationState } from "../../types/workflow-compiling.interface"; import { WorkflowFatalError } from "../../types/workflow-websocket.interface"; export const DEFAULT_WIDTH = 800; -export const DEFAULT_HEIGHT = 300; +export const DEFAULT_HEIGHT = 500; /** * ResultPanelComponent is the bottom level area that displays the * execution result of a workflow after the execution finishes. diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html index 2f13bd6984d..8284d8a9ace 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.html @@ -25,6 +25,39 @@

Empty result set

+ +
+ + +
{ let component: ResultTableFrameComponent; @@ -39,6 +40,14 @@ describe("ResultTableFrameComponent", () => { provide: OperatorMetadataService, useClass: StubOperatorMetadataService, }, + { + provide: GuiConfigService, + useValue: { + env: { + limitColumns: 15, + }, + }, + }, ...commonTestProviders, ], }).compileComponents(); @@ -60,4 +69,8 @@ describe("ResultTableFrameComponent", () => { expect(component.currentResult).toEqual([{ test: "property" }]); }); + + it("should set columnLimit from config", () => { + expect(component.columnLimit).toEqual(15); + }); }); diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index 591c41e53ac..abb6daa8825 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -31,6 +31,7 @@ import { DomSanitizer, SafeHtml } from "@angular/platform-browser"; import { ResultExportationComponent } from "../../result-exportation/result-exportation.component"; import { SchemaAttribute } from "../../../types/workflow-compiling.interface"; import { WorkflowStatusService } from "../../../service/workflow-status/workflow-status.service"; +import { GuiConfigService } from "../../../../common/service/gui-config.service"; /** * The Component will display the result in an excel table format, @@ -66,6 +67,9 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { currentPageIndex: number = 1; totalNumTuples: number = 0; pageSize = 5; + currentColumnOffset = 0; + columnLimit = 25; + columnSearch = ""; panelHeight = 0; tableStats: Record> = {}; prevTableStats: Record> = {}; @@ -81,7 +85,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { private resizeService: PanelResizeService, private changeDetectorRef: ChangeDetectorRef, private sanitizer: DomSanitizer, - private workflowStatusService: WorkflowStatusService + private workflowStatusService: WorkflowStatusService, + private guiConfigService: GuiConfigService ) {} ngOnChanges(changes: SimpleChanges): void { @@ -114,6 +119,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } }); + this.columnLimit = this.guiConfigService.env.limitColumns; + this.workflowResultService .getResultUpdateStream() .pipe(untilDestroyed(this)) @@ -233,8 +240,37 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { return this.sanitizer.bypassSecurityTrustHtml(styledValue); } + /** + * Adjusts the number of result rows displayed per page based on the + * available vertical space of the Texera results panel. + * + * The method accounts for fixed UI elements within the panel—such as + * headers, column navigation controls, pagination, and the search bar— + * to determine the remaining space available for rendering result rows. + * The page size is then recalculated using the height of a single table row. + * + * To maintain a stable user experience during panel resizes, the current + * page index is recomputed so that the previously visible results remain + * in view and the user does not experience an abrupt jump in the dataset. + * + * @param panelHeight - The total height (in pixels) of the results panel. + */ private adjustPageSizeBasedOnPanelSize(panelHeight: number) { - const newPageSize = Math.max(1, Math.floor((panelHeight - 38.62 - 64.27 - 56.6 - 32.63) / 38.62)); + const TABLE_HEADER_HEIGHT = 38.62; + const PANEL_HEADER_HEIGHT = 64.27; // Includes panel title and tab bar + const COLUMN_NAVIGATION_HEIGHT = 56.6; // Previous/Next columns controls + const PAGINATION_HEIGHT = 32.63; + const SEARCH_BAR_HEIGHT_WITH_MARGIN = 77; // Approximate height for search bar and margins + const ROW_HEIGHT = 38.62; + + const usedHeight = + TABLE_HEADER_HEIGHT + + PANEL_HEADER_HEIGHT + + COLUMN_NAVIGATION_HEIGHT + + PAGINATION_HEIGHT + + SEARCH_BAR_HEIGHT_WITH_MARGIN; + + const newPageSize = Math.max(1, Math.floor((panelHeight - usedHeight) / ROW_HEIGHT)); const oldOffset = (this.currentPageIndex - 1) * this.pageSize; @@ -329,7 +365,7 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } this.isLoadingResult = true; paginatedResultService - .selectPage(this.currentPageIndex, this.pageSize) + .selectPage(this.currentPageIndex, this.pageSize, this.currentColumnOffset, this.columnLimit, this.columnSearch) .pipe(untilDestroyed(this)) .subscribe(pageData => { if (this.currentPageIndex === pageData.pageIndex) { @@ -405,4 +441,25 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { nzFooter: null, }); } + + onColumnShiftLeft(): void { + if (this.currentColumnOffset > 0) { + this.currentColumnOffset = Math.max(0, this.currentColumnOffset - this.columnLimit); + this.changePaginatedResultData(); + } + } + + onColumnShiftRight(): void { + if (this.currentColumns && this.currentColumns.length === this.columnLimit) { + this.currentColumnOffset += this.columnLimit; + this.changePaginatedResultData(); + } + } + + onColumnSearch(event: Event): void { + const input = event.target as HTMLInputElement; + this.columnSearch = input.value; + this.currentColumnOffset = 0; + this.changePaginatedResultData(); + } } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 330b9b5bad5..96937ccbece 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -312,11 +312,18 @@ export class OperatorPaginationResultService { ); } - public selectPage(pageIndex: number, pageSize: number): Observable { + public selectPage( + pageIndex: number, + pageSize: number, + columnOffset: number = 0, + columnLimit: number = Number.MAX_SAFE_INTEGER, + columnSearch: string = "" + ): Observable { // update currently selected page this.currentPageIndex = pageIndex; // first fetch from frontend result cache - const pageCache = this.resultCache.get(pageIndex); + const useCache = columnOffset === 0 && columnLimit === Number.MAX_SAFE_INTEGER && columnSearch === ""; + const pageCache = useCache ? this.resultCache.get(pageIndex) : undefined; if (pageCache) { return of({ requestID: "", @@ -334,6 +341,9 @@ export class OperatorPaginationResultService { operatorID, pageIndex, pageSize, + columnOffset, + columnLimit, + columnSearch, }); const pendingRequestSubject = new Subject(); this.pendingRequests.set(requestID, pendingRequestSubject); diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index 15e2a2809d5..afd5ea6f04a 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -94,6 +94,9 @@ export type PaginationRequest = Readonly<{ operatorID: string; pageIndex: number; pageSize: number; + columnOffset?: number; + columnLimit?: number; + columnSearch?: string; }>; export type PaginatedResultEvent = Readonly<{