diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala index 5689eaafbc0..7e31be1ca7f 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala @@ -23,6 +23,5 @@ import org.apache.texera.web.service.ExecutionResultService.WebResultUpdate case class WebResultUpdateEvent( updates: Map[String, WebResultUpdate], - tableStats: Map[String, Map[String, Map[String, Any]]], - resultStorageMode: String + tableStats: Map[String, Map[String, Map[String, Any]]] ) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 8810e9891f4..285c836b607 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -382,7 +382,7 @@ class ExecutionResultService( outputPort.mode == OutputMode.SINGLE_SNAPSHOT } - if (StorageConfig.resultStorageMode == ICEBERG && !hasSingleSnapshot) { + if (!hasSingleSnapshot) { val storageUri = WorkflowExecutionsResource .getResultUriByLogicalPortId( executionId, @@ -408,8 +408,7 @@ class ExecutionResultService( Iterable( WebResultUpdateEvent( buf.toMap, - allTableStats.toMap, - StorageConfig.resultStorageMode.toLowerCase + allTableStats.toMap ) ) }) diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 85a62b77a3b..276d1491cdb 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -17,8 +17,6 @@ # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines. storage { - result-storage-mode = iceberg # either mongodb or iceberg, mongodb will be deprecated soon - result-storage-mode = ${?STORAGE_RESULT_MODE} # Configuration for Apache Iceberg, used for storing the workflow results & stats iceberg { diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala index 099e12260d2..1adc3233055 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala @@ -44,11 +44,6 @@ object EnvironmentalVariable { */ val ENV_USER_JWT_TOKEN = "USER_JWT_TOKEN" - /** - * Variables in storage.conf - */ - val ENV_RESULT_STORAGE_MODE = "STORAGE_RESULT_MODE" - // JDBC val ENV_JDBC_URL = "STORAGE_JDBC_URL" val ENV_JDBC_USERNAME = "STORAGE_JDBC_USERNAME" diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c5bd3302862..3bc1e05a9b5 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -28,9 +28,6 @@ object StorageConfig { // Load configuration private val conf: Config = ConfigFactory.parseResources("storage.conf").resolve() - // General storage settings - val resultStorageMode: String = conf.getString("storage.result-storage-mode") - // JDBC specifics val jdbcUrl: String = conf.getString("storage.jdbc.url") val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases") diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 4c37c33bb20..15949ef4717 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -76,32 +76,25 @@ object DocumentFactory { throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } - StorageConfig.resultStorageMode.toLowerCase match { - case ICEBERG => - val icebergSchema = IcebergUtil.toIcebergSchema(schema) - IcebergUtil.createTable( - IcebergCatalogInstance.getInstance(), - namespace, - storageKey, - icebergSchema, - overrideIfExists = true - ) - val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => - IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) - - new IcebergDocument[Tuple]( - namespace, - storageKey, - icebergSchema, - serde, - deserde - ) - case unsupportedMode => - throw new IllegalArgumentException( - s"Storage mode '$unsupportedMode' is not supported" - ) - } + val icebergSchema = IcebergUtil.toIcebergSchema(schema) + IcebergUtil.createTable( + IcebergCatalogInstance.getInstance(), + namespace, + storageKey, + icebergSchema, + overrideIfExists = true + ) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) + + new IcebergDocument[Tuple]( + namespace, + storageKey, + icebergSchema, + serde, + deserde + ) case unsupportedScheme => throw new UnsupportedOperationException( s"Unsupported URI scheme: $unsupportedScheme for creating the document" @@ -130,38 +123,31 @@ object DocumentFactory { throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } - StorageConfig.resultStorageMode.toLowerCase match { - case ICEBERG => - val table = IcebergUtil - .loadTableMetadata( - IcebergCatalogInstance.getInstance(), - namespace, - storageKey - ) - .getOrElse( - throw new IllegalArgumentException("No storage is found for the given URI") - ) - - val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) - val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => - IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) - - ( - new IcebergDocument[Tuple]( - namespace, - storageKey, - table.schema(), - serde, - deserde - ), - Some(amberSchema) - ) - case mode => - throw new IllegalArgumentException( - s"Storage mode '$mode' is not supported" - ) - } + val table = IcebergUtil + .loadTableMetadata( + IcebergCatalogInstance.getInstance(), + namespace, + storageKey + ) + .getOrElse( + throw new IllegalArgumentException("No storage is found for the given URI") + ) + + val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) + + ( + new IcebergDocument[Tuple]( + namespace, + storageKey, + table.schema(), + serde, + deserde + ), + Some(amberSchema) + ) case unsupportedScheme => throw new UnsupportedOperationException( s"Unsupported URI scheme: $unsupportedScheme for opening the document" diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index abb6daa8825..383c6daa716 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -29,7 +29,6 @@ import { RowModalComponent } from "../result-panel-modal.component"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { DomSanitizer, SafeHtml } from "@angular/platform-browser"; import { ResultExportationComponent } from "../../result-exportation/result-exportation.component"; -import { SchemaAttribute } from "../../../types/workflow-compiling.interface"; import { WorkflowStatusService } from "../../../service/workflow-status/workflow-status.service"; import { GuiConfigService } from "../../../../common/service/gui-config.service"; @@ -74,8 +73,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { tableStats: Record> = {}; prevTableStats: Record> = {}; widthPercent: string = ""; - sinkStorageMode: string = ""; - private schema: ReadonlyArray = []; isOperatorFinished: boolean = false; constructor( @@ -101,7 +98,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { this.tableStats = paginatedResultService.getStats(); this.prevTableStats = this.tableStats; - this.schema = paginatedResultService.getSchema(); } } } @@ -160,13 +156,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } }); - this.workflowResultService - .getSinkStorageMode() - .pipe(untilDestroyed(this)) - .subscribe(sinkStorageMode => { - this.sinkStorageMode = sinkStorageMode; - }); - this.resizeService.currentSize.pipe(untilDestroyed(this)).subscribe(size => { this.panelHeight = size.height; this.adjustPageSizeBasedOnPanelSize(size.height); @@ -179,7 +168,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { if (this.operatorId) { const paginatedResultService = this.workflowResultService.getPaginatedResultService(this.operatorId); if (paginatedResultService) { - this.schema = paginatedResultService.getSchema(); } } } @@ -207,8 +195,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { compare(field: string, stats: string): SafeHtml { let current = this.tableStats[field][stats]; let previous = this.prevTableStats[field][stats]; - let currentStr = ""; - let previousStr = ""; + let currentStr: string; + let previousStr: string; if (typeof current === "number" && typeof previous === "number") { currentStr = current.toFixed(2); @@ -370,7 +358,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { .subscribe(pageData => { if (this.currentPageIndex === pageData.pageIndex) { this.setupResultTable(pageData.table, paginatedResultService.getCurrentTotalNumTuples()); - this.schema = pageData.schema; this.changeDetectorRef.detectChanges(); } }); diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 96937ccbece..9fd18e0f161 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -29,7 +29,7 @@ import { } from "../../types/execute-workflow.interface"; import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; import { PaginatedResultEvent, WorkflowAvailableResultEvent } from "../../types/workflow-websocket.interface"; -import { BehaviorSubject, map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs"; +import { map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs"; import { v4 as uuid } from "uuid"; import { IndexableObject } from "../../types/result-table.interface"; import { isDefined } from "../../../common/util/predicate"; @@ -49,13 +49,11 @@ export class WorkflowResultService { private resultUpdateStream = new Subject>(); private resultTableStats = new ReplaySubject>>>(1); private resultInitiateStream = new Subject(); - private sinkStorageModeSubject = new BehaviorSubject(""); constructor(private wsService: WorkflowWebsocketService) { this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => { this.handleResultUpdate(event.updates); this.handleTableStatsUpdate(event.tableStats); - this.handleSinkStorageModeUpdate(event.sinkStorageMode); }); this.wsService .subscribeToEvent("WorkflowAvailableResultEvent") @@ -165,14 +163,6 @@ export class WorkflowResultService { this.resultTableStats.next(event); } - private handleSinkStorageModeUpdate(sinkStorageMode: string): void { - this.sinkStorageModeSubject.next(sinkStorageMode); - } - - public getSinkStorageMode(): BehaviorSubject { - return this.sinkStorageModeSubject; - } - private getOrInitPaginatedResultService(operatorID: string): OperatorPaginationResultService { let service = this.getPaginatedResultService(operatorID); if (!service) { diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 23ade231998..1633e4bfdf7 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -120,7 +120,6 @@ export interface WorkflowResultUpdateEvent extends Readonly<{ updates: WorkflowResultUpdate; tableStats: WorkflowResultTableStats; - sinkStorageMode: string; }> {} // user-defined type guards to check the type of the result update