Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ import org.apache.texera.web.service.ExecutionResultService.WebResultUpdate

case class WebResultUpdateEvent(
updates: Map[String, WebResultUpdate],
tableStats: Map[String, Map[String, Map[String, Any]]],
resultStorageMode: String
tableStats: Map[String, Map[String, Map[String, Any]]]
) extends TexeraWebSocketEvent
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ class ExecutionResultService(
outputPort.mode == OutputMode.SINGLE_SNAPSHOT
}

if (StorageConfig.resultStorageMode == ICEBERG && !hasSingleSnapshot) {
if (!hasSingleSnapshot) {
val storageUri = WorkflowExecutionsResource
.getResultUriByLogicalPortId(
executionId,
Expand All @@ -408,8 +408,7 @@ class ExecutionResultService(
Iterable(
WebResultUpdateEvent(
buf.toMap,
allTableStats.toMap,
StorageConfig.resultStorageMode.toLowerCase
allTableStats.toMap
)
)
})
Expand Down
2 changes: 0 additions & 2 deletions common/config/src/main/resources/storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
storage {
result-storage-mode = iceberg # either mongodb or iceberg, mongodb will be deprecated soon
result-storage-mode = ${?STORAGE_RESULT_MODE}

# Configuration for Apache Iceberg, used for storing the workflow results & stats
iceberg {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ object EnvironmentalVariable {
*/
val ENV_USER_JWT_TOKEN = "USER_JWT_TOKEN"

/**
* Variables in storage.conf
*/
val ENV_RESULT_STORAGE_MODE = "STORAGE_RESULT_MODE"

// JDBC
val ENV_JDBC_URL = "STORAGE_JDBC_URL"
val ENV_JDBC_USERNAME = "STORAGE_JDBC_USERNAME"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ object StorageConfig {
// Load configuration
private val conf: Config = ConfigFactory.parseResources("storage.conf").resolve()

// General storage settings
val resultStorageMode: String = conf.getString("storage.result-storage-mode")

// JDBC specifics
val jdbcUrl: String = conf.getString("storage.jdbc.url")
val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,25 @@ object DocumentFactory {
throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
}

StorageConfig.resultStorageMode.toLowerCase match {
case ICEBERG =>
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
IcebergUtil.createTable(
IcebergCatalogInstance.getInstance(),
namespace,
storageKey,
icebergSchema,
overrideIfExists = true
)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))

new IcebergDocument[Tuple](
namespace,
storageKey,
icebergSchema,
serde,
deserde
)
case unsupportedMode =>
throw new IllegalArgumentException(
s"Storage mode '$unsupportedMode' is not supported"
)
}
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
IcebergUtil.createTable(
IcebergCatalogInstance.getInstance(),
namespace,
storageKey,
icebergSchema,
overrideIfExists = true
)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))

new IcebergDocument[Tuple](
namespace,
storageKey,
icebergSchema,
serde,
deserde
)
case unsupportedScheme =>
throw new UnsupportedOperationException(
s"Unsupported URI scheme: $unsupportedScheme for creating the document"
Expand Down Expand Up @@ -130,38 +123,31 @@ object DocumentFactory {
throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
}

StorageConfig.resultStorageMode.toLowerCase match {
case ICEBERG =>
val table = IcebergUtil
.loadTableMetadata(
IcebergCatalogInstance.getInstance(),
namespace,
storageKey
)
.getOrElse(
throw new IllegalArgumentException("No storage is found for the given URI")
)

val amberSchema = IcebergUtil.fromIcebergSchema(table.schema())
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))

(
new IcebergDocument[Tuple](
namespace,
storageKey,
table.schema(),
serde,
deserde
),
Some(amberSchema)
)
case mode =>
throw new IllegalArgumentException(
s"Storage mode '$mode' is not supported"
)
}
val table = IcebergUtil
.loadTableMetadata(
IcebergCatalogInstance.getInstance(),
namespace,
storageKey
)
.getOrElse(
throw new IllegalArgumentException("No storage is found for the given URI")
)

val amberSchema = IcebergUtil.fromIcebergSchema(table.schema())
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))

(
new IcebergDocument[Tuple](
namespace,
storageKey,
table.schema(),
serde,
deserde
),
Some(amberSchema)
)
case unsupportedScheme =>
throw new UnsupportedOperationException(
s"Unsupported URI scheme: $unsupportedScheme for opening the document"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import { RowModalComponent } from "../result-panel-modal.component";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { DomSanitizer, SafeHtml } from "@angular/platform-browser";
import { ResultExportationComponent } from "../../result-exportation/result-exportation.component";
import { SchemaAttribute } from "../../../types/workflow-compiling.interface";
import { WorkflowStatusService } from "../../../service/workflow-status/workflow-status.service";
import { GuiConfigService } from "../../../../common/service/gui-config.service";

Expand Down Expand Up @@ -74,8 +73,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {
tableStats: Record<string, Record<string, number>> = {};
prevTableStats: Record<string, Record<string, number>> = {};
widthPercent: string = "";
sinkStorageMode: string = "";
private schema: ReadonlyArray<SchemaAttribute> = [];
isOperatorFinished: boolean = false;

constructor(
Expand All @@ -101,7 +98,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {

this.tableStats = paginatedResultService.getStats();
this.prevTableStats = this.tableStats;
this.schema = paginatedResultService.getSchema();
}
}
}
Expand Down Expand Up @@ -160,13 +156,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {
}
});

this.workflowResultService
.getSinkStorageMode()
.pipe(untilDestroyed(this))
.subscribe(sinkStorageMode => {
this.sinkStorageMode = sinkStorageMode;
});

this.resizeService.currentSize.pipe(untilDestroyed(this)).subscribe(size => {
this.panelHeight = size.height;
this.adjustPageSizeBasedOnPanelSize(size.height);
Expand All @@ -179,7 +168,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {
if (this.operatorId) {
const paginatedResultService = this.workflowResultService.getPaginatedResultService(this.operatorId);
if (paginatedResultService) {
this.schema = paginatedResultService.getSchema();
}
}
}
Expand Down Expand Up @@ -207,8 +195,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {
compare(field: string, stats: string): SafeHtml {
let current = this.tableStats[field][stats];
let previous = this.prevTableStats[field][stats];
let currentStr = "";
let previousStr = "";
let currentStr: string;
let previousStr: string;

if (typeof current === "number" && typeof previous === "number") {
currentStr = current.toFixed(2);
Expand Down Expand Up @@ -370,7 +358,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges {
.subscribe(pageData => {
if (this.currentPageIndex === pageData.pageIndex) {
this.setupResultTable(pageData.table, paginatedResultService.getCurrentTotalNumTuples());
this.schema = pageData.schema;
this.changeDetectorRef.detectChanges();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
} from "../../types/execute-workflow.interface";
import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service";
import { PaginatedResultEvent, WorkflowAvailableResultEvent } from "../../types/workflow-websocket.interface";
import { BehaviorSubject, map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs";
import { map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs";
import { v4 as uuid } from "uuid";
import { IndexableObject } from "../../types/result-table.interface";
import { isDefined } from "../../../common/util/predicate";
Expand All @@ -49,13 +49,11 @@ export class WorkflowResultService {
private resultUpdateStream = new Subject<Record<string, WebResultUpdate | undefined>>();
private resultTableStats = new ReplaySubject<Record<string, Record<string, Record<string, number>>>>(1);
private resultInitiateStream = new Subject<string>();
private sinkStorageModeSubject = new BehaviorSubject<string>("");

constructor(private wsService: WorkflowWebsocketService) {
this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => {
this.handleResultUpdate(event.updates);
this.handleTableStatsUpdate(event.tableStats);
this.handleSinkStorageModeUpdate(event.sinkStorageMode);
});
this.wsService
.subscribeToEvent("WorkflowAvailableResultEvent")
Expand Down Expand Up @@ -165,14 +163,6 @@ export class WorkflowResultService {
this.resultTableStats.next(event);
}

private handleSinkStorageModeUpdate(sinkStorageMode: string): void {
this.sinkStorageModeSubject.next(sinkStorageMode);
}

public getSinkStorageMode(): BehaviorSubject<string> {
return this.sinkStorageModeSubject;
}

private getOrInitPaginatedResultService(operatorID: string): OperatorPaginationResultService {
let service = this.getPaginatedResultService(operatorID);
if (!service) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export interface WorkflowResultUpdateEvent
extends Readonly<{
updates: WorkflowResultUpdate;
tableStats: WorkflowResultTableStats;
sinkStorageMode: string;
}> {}

// user-defined type guards to check the type of the result update
Expand Down
Loading