From e6aee9a083d7d86cce2a18f5f092a45f9163578e Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 7 Jan 2025 15:49:23 +0800 Subject: [PATCH 01/88] init --- .../client/async/CnToDnAsyncRequestType.java | 1 + .../rpc/DataNodeAsyncRequestRPCHandler.java | 1 + .../consensus/request/ConfigPhysicalPlan.java | 4 + .../request/ConfigPhysicalPlanType.java | 2 + .../write/table/AlterColumnDataTypePlan.java | 44 ++++ .../response/table/DescTableResp.java | 21 +- .../confignode/manager/ConfigManager.java | 5 +- .../confignode/manager/ProcedureManager.java | 23 +- .../manager/schema/ClusterSchemaManager.java | 34 ++- .../executor/ConfigPlanExecutor.java | 4 + .../persistence/schema/ClusterSchemaInfo.java | 47 +++- .../persistence/schema/ConfigMTree.java | 79 ++++++- .../schema/mnode/impl/ConfigTableNode.java | 15 ++ .../schema/mnode/info/ConfigTableInfo.java | 16 ++ .../AlterTableColumnDataTypeProcedure.java | 210 ++++++++++++++++++ .../table/DropTableColumnProcedure.java | 2 +- .../schema/AlterTableColumnDataTypeState.java | 27 +++ .../procedure/store/ProcedureFactory.java | 4 + .../procedure/store/ProcedureType.java | 1 + .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../db/queryengine/plan/Coordinator.java | 2 + .../config/TableConfigTaskVisitor.java | 21 ++ .../executor/ClusterConfigTaskExecutor.java | 41 +++- .../config/executor/IConfigTaskExecutor.java | 10 + .../relational/AlterColumnDataTypeTask.java | 53 +++++ .../relational/DescribeTableDetailsTask.java | 17 +- .../sql/ast/AlterColumnDataType.java | 91 ++++++++ .../plan/relational/sql/ast/AstVisitor.java | 4 + .../relational/sql/parser/AstBuilder.java | 21 ++ .../storageengine/dataregion/DataRegion.java | 2 +- .../table/AlterOrDropTableOperationType.java | 4 +- .../iotdb/commons/schema/table/TsTable.java | 21 +- .../table/column/AttributeColumnSchema.java | 7 + .../table/column/FieldColumnSchema.java | 7 + .../schema/table/column/TagColumnSchema.java | 6 + .../schema/table/column/TimeColumnSchema.java | 6 + .../table/column/TsTableColumnSchema.java | 2 + .../table/column/TsTableColumnSchemaUtil.java | 12 + .../relational/grammar/sql/RelationalSql.g4 | 1 + .../src/main/thrift/confignode.thrift | 1 + .../src/main/thrift/datanode.thrift | 1 - 41 files changed, 830 insertions(+), 41 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AlterColumnDataTypePlan.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AlterTableColumnDataTypeState.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterColumnDataTypeTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterColumnDataType.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index a27be4e6025d..d982b3886e27 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -120,4 +120,5 @@ public enum CnToDnAsyncRequestType { INVALIDATE_MATCHED_TABLE_DEVICE_CACHE, DELETE_DATA_FOR_TABLE_DEVICE, DELETE_TABLE_DEVICE_IN_BLACK_LIST, + ALTER_COLUMN_DATATYPE } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index b1e8753ab49c..1419482e8954 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -89,6 +89,7 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case ROLLBACK_VIEW_SCHEMA_BLACK_LIST: case DELETE_VIEW: case ALTER_VIEW: + case ALTER_COLUMN_DATATYPE: return new SchemaUpdateRPCHandler( requestType, requestId, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index cc1aaee05553..806a4033a240 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -97,6 +97,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan; import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1; import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan; @@ -368,6 +369,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case PreDeleteColumn: plan = new PreDeleteColumnPlan(); break; + case AlterColumnDataType: + plan = new AlterColumnDataTypePlan(); + break; case CommitDeleteColumn: plan = new CommitDeleteColumnPlan(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 5d3bdcc6710e..e3857b1f03ce 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -171,6 +171,8 @@ public enum ConfigPhysicalPlanType { PreDeleteColumn((short) 860), CommitDeleteColumn((short) 861), DescTable((short) 862), + AlterColumnDataType((short) 863), + CommitAlterColumnDataType((short) 864), /** Deprecated types for sync, restored them for upgrade. */ @Deprecated diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AlterColumnDataTypePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AlterColumnDataTypePlan.java new file mode 100644 index 000000000000..4b5350c22d03 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AlterColumnDataTypePlan.java @@ -0,0 +1,44 @@ +package org.apache.iotdb.confignode.consensus.request.write.table; + +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import org.apache.tsfile.enums.TSDataType; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class AlterColumnDataTypePlan extends AbstractTableColumnPlan { + + private TSDataType newType; + + public AlterColumnDataTypePlan() { + super(ConfigPhysicalPlanType.AlterColumnDataType); + } + + public AlterColumnDataTypePlan( + String database, String tableName, String columnName, TSDataType newType) { + super(ConfigPhysicalPlanType.AlterColumnDataType, database, tableName, columnName); + this.newType = newType; + } + + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + super.serializeImpl(stream); + stream.writeInt(newType.serialize()); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + super.deserializeImpl(buffer); + newType = TSDataType.deserializeFrom(buffer); + } + + public void setNewType(TSDataType newType) { + this.newType = newType; + } + + public TSDataType getNewType() { + return newType; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/table/DescTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/table/DescTableResp.java index fe09d00d2511..7808c0688994 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/table/DescTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/table/DescTableResp.java @@ -25,6 +25,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TDescTableResp; import org.apache.iotdb.consensus.common.DataSet; +import org.apache.tsfile.enums.TSDataType; + +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -32,12 +36,17 @@ public class DescTableResp implements DataSet { private final TSStatus status; private final TsTable table; private final Set preDeletedColumns; + private final Map preAlteredColumns; public DescTableResp( - final TSStatus status, final TsTable table, final Set preDeletedColumns) { + final TSStatus status, + final TsTable table, + final Set preDeletedColumns, + final Map preAlteredColumns) { this.status = status; this.table = table; this.preDeletedColumns = preDeletedColumns; + this.preAlteredColumns = preAlteredColumns; } public TDescTableResp convertToTDescTableResp() { @@ -47,6 +56,14 @@ public TDescTableResp convertToTDescTableResp() { Objects.nonNull(table) ? TsTableInternalRPCUtil.serializeSingleTsTable(table) : null); - return Objects.nonNull(preDeletedColumns) ? resp.setPreDeletedColumns(preDeletedColumns) : resp; + if (Objects.nonNull(preDeletedColumns)) { + resp.setPreDeletedColumns(preDeletedColumns); + } + if (Objects.nonNull(preAlteredColumns)) { + Map preAlteredColumnsMap = new HashMap<>(); + preAlteredColumns.forEach((col, type) -> preAlteredColumnsMap.put(col, type.serialize())); + resp.setPreAlteredColumns(preAlteredColumnsMap); + } + return resp; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index d1006aaba6ff..be0f1dc85fed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -2626,8 +2626,11 @@ public TSStatus alterOrDropTable(final TAlterOrDropTableReq req) { return procedureManager.alterTableDropColumn(req); case DROP_TABLE: return procedureManager.dropTable(req); + case ALTER_COLUMN_DATA_TYPE: + return procedureManager.alterTableColumnDataType(req); default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException( + AlterOrDropTableOperationType.getType(req.operationType).toString()); } } else { return status; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index b26b8c6b3beb..f79cff952a99 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -87,10 +87,10 @@ import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.AbstractAlterOrDropTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure; +import org.apache.iotdb.confignode.procedure.impl.schema.table.AlterTableColumnDataTypeProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DeleteDevicesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableColumnProcedure; -import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.RenameTableColumnProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.SetTablePropertiesProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure; @@ -135,6 +135,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -1539,14 +1540,30 @@ public TSStatus alterTableDropColumn(final TAlterOrDropTableReq req) { req.database, req.tableName, req.queryId, ReadWriteIOUtils.readString(req.updateInfo))); } + public TSStatus alterTableColumnDataType(TAlterOrDropTableReq req) { + return executeWithoutDuplicate( + req.database, + null, + req.tableName, + req.queryId, + ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE, + new AlterTableColumnDataTypeProcedure( + req.database, + req.tableName, + req.queryId, + ReadWriteIOUtils.readVarIntString(req.updateInfo), + TSDataType.deserialize(req.updateInfo.get()))); + } + public TSStatus dropTable(final TAlterOrDropTableReq req) { return executeWithoutDuplicate( req.database, null, req.tableName, req.queryId, - ProcedureType.DROP_TABLE_PROCEDURE, - new DropTableProcedure(req.database, req.tableName, req.queryId)); + ProcedureType.DROP_TABLE_COLUMN_PROCEDURE, + new DropTableColumnProcedure( + req.database, req.tableName, req.queryId, ReadWriteIOUtils.readString(req.updateInfo))); } public TDeleteTableDeviceResp deleteDevices(final TDeleteTableDeviceReq req) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index 353d97995b0b..7ce0176c2ece 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -99,6 +99,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; @@ -1180,7 +1181,7 @@ public synchronized Pair tableColumnCheckForColumnExtension( null); } - final TsTable expandedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize())); + final TsTable expandedTable = new TsTable(originalTable); final String errorMsg = String.format( @@ -1203,6 +1204,33 @@ public synchronized Pair tableColumnCheckForColumnExtension( return new Pair<>(RpcUtils.SUCCESS_STATUS, expandedTable); } + public synchronized Pair tableColumnCheckForColumnAltering( + final String database, + final String tableName, + final String columnName, + final TSDataType dataType) + throws MetadataException { + final TsTable originalTable = getTableIfExists(database, tableName).orElse(null); + + if (Objects.isNull(originalTable)) { + return new Pair<>( + RpcUtils.getStatus( + TSStatusCode.TABLE_NOT_EXISTS, + String.format("Table '%s.%s' does not exist", database, tableName)), + null); + } + TSStatus tsStatus = + clusterSchemaInfo.preAlterColumnDataType(database, tableName, columnName, dataType); + if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return new Pair<>(tsStatus, null); + } + + final TsTable alteredTable = new TsTable(originalTable); + alteredTable.getColumnSchema(columnName).setDataType(dataType); + + return new Pair<>(RpcUtils.SUCCESS_STATUS, alteredTable); + } + public synchronized Pair tableColumnCheckForColumnRenaming( final String database, final String tableName, final String oldName, final String newName) throws MetadataException { @@ -1216,7 +1244,7 @@ public synchronized Pair tableColumnCheckForColumnRenaming( null); } - final TsTable expandedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize())); + final TsTable expandedTable = new TsTable(originalTable); final TsTableColumnSchema schema = originalTable.getColumnSchema(oldName); if (Objects.isNull(schema)) { @@ -1318,7 +1346,7 @@ public synchronized Pair updateTableProperties( return new Pair<>(RpcUtils.SUCCESS_STATUS, null); } - final TsTable updatedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize())); + final TsTable updatedTable = new TsTable(originalTable); updatedProperties.forEach( (k, v) -> { originalProperties.put(k, originalTable.getPropValue(k).orElse(null)); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 712d8e3bf4c5..cb912794343c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -119,6 +119,7 @@ import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan; import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan; @@ -541,6 +542,9 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) return clusterSchemaInfo.preDeleteTable((PreDeleteTablePlan) physicalPlan); case CommitDeleteTable: return clusterSchemaInfo.dropTable((CommitDeleteTablePlan) physicalPlan); + case AlterColumnDataType: + return clusterSchemaInfo.commitAlterColumnDataType( + ((AlterColumnDataTypePlan) physicalPlan)); case CreatePipeV2: return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan); case SetPipeStatusV2: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java index 069c5d99400f..8c03e76a2d14 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java @@ -49,6 +49,7 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan; import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan; @@ -77,6 +78,7 @@ import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp; import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; +import org.apache.iotdb.confignode.persistence.schema.ConfigMTree.TableSchemaDetails; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; @@ -87,6 +89,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1226,16 +1229,19 @@ public DescTableResp descTable(final DescTablePlan plan) { try { final PartialPath databasePath = getQualifiedDatabasePartialPath(plan.getDatabase()); if (plan.isDetails()) { - final Pair> pair = + final TableSchemaDetails details = tableModelMTree.getTableSchemaDetails(databasePath, plan.getTableName()); - return new DescTableResp(StatusUtils.OK, pair.getLeft(), pair.getRight()); + return new DescTableResp( + StatusUtils.OK, details.table, details.preDeletedColumns, details.preAlteredColumns); } return new DescTableResp( StatusUtils.OK, tableModelMTree.getUsingTableSchema(databasePath, plan.getTableName()), + null, null); } catch (final MetadataException e) { - return new DescTableResp(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()), null, null); + return new DescTableResp( + RpcUtils.getStatus(e.getErrorCode(), e.getMessage()), null, null, null); } finally { databaseReadWriteLock.readLock().unlock(); } @@ -1363,6 +1369,41 @@ public TSStatus commitDeleteColumn(final CommitDeleteColumnPlan plan) { } } + public TSStatus preAlterColumnDataType( + String databaseName, String tableName, String columnName, TSDataType dataType) { + databaseReadWriteLock.writeLock().lock(); + try { + final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + tableModelMTree.preAlterColumnDataType( + getQualifiedDatabasePartialPath(databaseName), tableName, columnName, dataType); + return status; + } catch (final MetadataException e) { + LOGGER.warn(e.getMessage(), e); + return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); + } catch (final SemanticException e) { + return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode(), e.getMessage()); + } finally { + databaseReadWriteLock.writeLock().unlock(); + } + } + + public TSStatus commitAlterColumnDataType(AlterColumnDataTypePlan plan) { + databaseReadWriteLock.writeLock().lock(); + try { + tableModelMTree.commitAlterColumnDataType( + getQualifiedDatabasePartialPath(plan.getDatabase()), + plan.getTableName(), + plan.getColumnName(), + plan.getNewType()); + return RpcUtils.SUCCESS_STATUS; + } catch (final MetadataException e) { + LOGGER.warn(e.getMessage(), e); + return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); + } finally { + databaseReadWriteLock.writeLock().unlock(); + } + } + // endregion @TestOnly diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java index bb6474258bd0..5c6babfea32f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java @@ -53,7 +53,9 @@ import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaFormatUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -71,6 +72,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -850,7 +852,7 @@ public boolean preDeleteColumn( } if (columnSchema.getColumnCategory() == TsTableColumnCategory.TAG || columnSchema.getColumnCategory() == TsTableColumnCategory.TIME) { - throw new SemanticException("Dropping id or time column is not supported."); + throw new SemanticException("Dropping tag or time column is not supported."); } node.addPreDeletedColumn(columnName); @@ -868,21 +870,70 @@ public void commitDeleteColumn( } } + public void preAlterColumnDataType( + PartialPath database, String tableName, String columnName, TSDataType dataType) + throws MetadataException { + final ConfigTableNode node = getTableNode(database, tableName); + final TsTableColumnSchema columnSchema = node.getTable().getColumnSchema(columnName); + if (Objects.isNull(columnSchema)) { + throw new ColumnNotExistsException( + PathUtils.unQualifyDatabaseName(database.getFullPath()), tableName, columnName); + } + if (columnSchema.getColumnCategory() != TsTableColumnCategory.FIELD) { + throw new SemanticException("Can only alter datatype of FIELD columns"); + } + if (!dataType.isCompatible(columnSchema.getDataType())) { + throw new SemanticException( + String.format( + "New type %s is not compatible with the existing one %s", + dataType, columnSchema.getDataType())); + } + + node.addPreAlteredColumn(columnName, dataType); + } + + public void commitAlterColumnDataType( + PartialPath database, String tableName, String columnName, TSDataType dataType) + throws MetadataException { + final ConfigTableNode node = getTableNode(database, tableName); + final TsTable table = getTable(database, tableName); + if (Objects.nonNull(table.getColumnSchema(columnName))) { + table.getColumnSchema(columnName).setDataType(dataType); + node.removePreAlteredColumn(columnName); + } + } + public TsTable getUsingTableSchema(final PartialPath database, final String tableName) throws MetadataException { final ConfigTableNode node = getTableNode(database, tableName); - if (node.getPreDeletedColumns().isEmpty()) { + if (node.getPreDeletedColumns().isEmpty() && node.getPreAlteredColumns().isEmpty()) { return node.getTable(); } - final TsTable newTable = TsTable.deserialize(ByteBuffer.wrap(node.getTable().serialize())); - node.getPreDeletedColumns().forEach(newTable::removeColumnSchema); + final TsTable newTable = new TsTable(node.getTable()); + if (!node.getPreDeletedColumns().isEmpty()) { + node.getPreDeletedColumns().forEach(newTable::removeColumnSchema); + } + if (!node.getPreAlteredColumns().isEmpty()) { + node.getPreAlteredColumns() + .forEach((col, type) -> newTable.getColumnSchema(col).setDataType(type)); + } return newTable; } - public Pair> getTableSchemaDetails( + public TableSchemaDetails getTableSchemaDetails( final PartialPath database, final String tableName) throws MetadataException { final ConfigTableNode node = getTableNode(database, tableName); - return new Pair<>(node.getTable(), node.getPreDeletedColumns()); + TableSchemaDetails tableSchemaDetails = new TableSchemaDetails(); + tableSchemaDetails.table = node.getTable(); + tableSchemaDetails.preDeletedColumns = node.getPreDeletedColumns(); + tableSchemaDetails.preAlteredColumns = node.getPreAlteredColumns(); + return tableSchemaDetails; + } + + public static class TableSchemaDetails { + public TsTable table; + public Set preDeletedColumns; + public Map preAlteredColumns; } private TsTable getTable(final PartialPath database, final String tableName) @@ -964,6 +1015,11 @@ private void serializeTableNode(final ConfigTableNode tableNode, final OutputStr for (final String column : preDeletedColumns) { ReadWriteIOUtils.write(column, outputStream); } + ReadWriteForEncodingUtils.writeVarInt(tableNode.getPreAlteredColumns().size(), outputStream); + for (Entry entry : tableNode.getPreAlteredColumns().entrySet()) { + ReadWriteIOUtils.writeVar(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } } public void deserialize(final InputStream inputStream) throws IOException { @@ -1057,10 +1113,17 @@ private IConfigMNode deserializeTableMNode(final InputStream inputStream) throws new ConfigTableNode(null, ReadWriteIOUtils.readString(inputStream)); tableNode.setTable(TsTable.deserialize(inputStream)); tableNode.setStatus(TableNodeStatus.deserialize(inputStream)); - final int size = ReadWriteIOUtils.readInt(inputStream); + int size = ReadWriteIOUtils.readInt(inputStream); for (int i = 0; i < size; ++i) { tableNode.addPreDeletedColumn(ReadWriteIOUtils.readString(inputStream)); } + + size = ReadWriteForEncodingUtils.readVarInt(inputStream); + for (int i = 0; i < size; i++) { + tableNode.addPreAlteredColumn( + ReadWriteIOUtils.readVarIntString(inputStream), + ReadWriteIOUtils.readDataType(inputStream)); + } return tableNode; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/impl/ConfigTableNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/impl/ConfigTableNode.java index 0bb306e3f3f0..b95b50ff0bef 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/impl/ConfigTableNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/impl/ConfigTableNode.java @@ -31,8 +31,11 @@ import org.apache.iotdb.confignode.persistence.schema.mnode.container.ConfigMNodeContainer; import org.apache.iotdb.confignode.persistence.schema.mnode.info.ConfigTableInfo; +import org.apache.tsfile.enums.TSDataType; + import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE; @@ -70,6 +73,10 @@ public Set getPreDeletedColumns() { return tableNodeInfo.getPreDeletedColumns(); } + public Map getPreAlteredColumns() { + return tableNodeInfo.getPreAlteredColumns(); + } + public void addPreDeletedColumn(final String column) { tableNodeInfo.addPreDeletedColumn(column); } @@ -78,6 +85,14 @@ public void removePreDeletedColumn(final String column) { tableNodeInfo.removePreDeletedColumn(column); } + public void addPreAlteredColumn(final String column, TSDataType dataType) { + tableNodeInfo.addPreAlteredColumn(column, dataType); + } + + public void removePreAlteredColumn(final String column) { + tableNodeInfo.removePreAlteredColumn(column); + } + @Override public String getName() { return tableNodeInfo.getName(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/info/ConfigTableInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/info/ConfigTableInfo.java index 5e0fdfba7fda..23b988101dfc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/info/ConfigTableInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/mnode/info/ConfigTableInfo.java @@ -23,9 +23,12 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.BasicMNodeInfo; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class ConfigTableInfo extends BasicMNodeInfo { @@ -37,6 +40,7 @@ public class ConfigTableInfo extends BasicMNodeInfo { // This shall be only one because concurrent modifications of one table is not allowed private final Set preDeletedColumns = new HashSet<>(); + private final Map preAlteredColumns = new HashMap<>(); public ConfigTableInfo(final String name) { super(name); @@ -62,6 +66,10 @@ public Set getPreDeletedColumns() { return preDeletedColumns; } + public Map getPreAlteredColumns() { + return preAlteredColumns; + } + public void addPreDeletedColumn(final String column) { preDeletedColumns.add(column); } @@ -81,4 +89,12 @@ public int estimateSize() { .map(column -> (int) RamUsageEstimator.sizeOf(column)) .reduce(0, Integer::sum); } + + public void addPreAlteredColumn(String column, TSDataType dataType) { + preAlteredColumns.put(column, dataType); + } + + public void removePreAlteredColumn(String column) { + preAlteredColumns.remove(column); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java new file mode 100644 index 000000000000..24ab17f8266c --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema.table; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; +import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils; +import org.apache.iotdb.confignode.procedure.state.schema.AlterTableColumnDataTypeState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class AlterTableColumnDataTypeProcedure + extends AbstractAlterOrDropTableProcedure { + private static final Logger LOGGER = + LoggerFactory.getLogger(AlterTableColumnDataTypeProcedure.class); + + private String columnName; + private TSDataType dataType; + + public AlterTableColumnDataTypeProcedure() { + super(); + } + + public AlterTableColumnDataTypeProcedure( + final String database, + final String tableName, + final String queryId, + final String columnName, + final TSDataType dataType) { + super(database, tableName, queryId); + this.columnName = columnName; + this.dataType = dataType; + } + + @Override + protected String getActionMessage() { + return "Alter table column data type"; + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final AlterTableColumnDataTypeState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case CHECK_AND_INVALIDATE_COLUMN: + LOGGER.info( + "Check and invalidate column {} in {}.{} when altering column data type", + columnName, + database, + tableName); + checkAndPreAlterColumn(env); + break; + case PRE_RELEASE: + LOGGER.info("Pre-release info of table {}.{} when altering column", database, tableName); + preRelease(env); + break; + case ALTER_TABLE_COLUMN_DATA_TYPE: + LOGGER.info("Altering column {} in {}.{} on configNode", columnName, database, tableName); + alterColumnDataType(env); + break; + case COMMIT_RELEASE: + LOGGER.info( + "Commit release info of table {}.{} when altering column", database, tableName); + commitRelease(env); + return Flow.NO_MORE_STATE; + default: + setFailure( + new ProcedureException("Unrecognized AlterTableColumnDataTypeProcedure " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "AlterTableColumnDataType-{}.{}-{} costs {}ms", + database, + tableName, + state, + (System.currentTimeMillis() - startTime)); + } + } + + @Override + protected void preRelease(ConfigNodeProcedureEnv env) { + super.preRelease(env); + setNextState(AlterTableColumnDataTypeState.ALTER_TABLE_COLUMN_DATA_TYPE); + } + + private void checkAndPreAlterColumn(final ConfigNodeProcedureEnv env) { + try { + final Pair result = + env.getConfigManager() + .getClusterSchemaManager() + .tableColumnCheckForColumnAltering(database, tableName, columnName, dataType); + final TSStatus status = result.getLeft(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure( + new ProcedureException(new IoTDBException(status.getMessage(), status.getCode()))); + return; + } + table = result.getRight(); + setNextState(AlterTableColumnDataTypeState.PRE_RELEASE); + } catch (final MetadataException e) { + setFailure(new ProcedureException(e)); + } + } + + private void alterColumnDataType(final ConfigNodeProcedureEnv env) { + final TSStatus status = + SchemaUtils.executeInConsensusLayer( + new AlterColumnDataTypePlan(database, tableName, columnName, dataType), env, LOGGER); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status.getMessage(), status.getCode()))); + } + setNextState(AlterTableColumnDataTypeState.COMMIT_RELEASE); + } + + @Override + protected boolean isRollbackSupported(final AlterTableColumnDataTypeState state) { + return false; + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv configNodeProcedureEnv, + final AlterTableColumnDataTypeState alterTableColumnDataTypeState) + throws IOException, InterruptedException, ProcedureException { + // Do nothing + } + + @Override + protected AlterTableColumnDataTypeState getState(final int stateId) { + return AlterTableColumnDataTypeState.values()[stateId]; + } + + @Override + protected int getStateId(final AlterTableColumnDataTypeState alterTableColumnDataTypeState) { + return alterTableColumnDataTypeState.ordinal(); + } + + @Override + protected AlterTableColumnDataTypeState getInitialState() { + return AlterTableColumnDataTypeState.CHECK_AND_INVALIDATE_COLUMN; + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE.getTypeCode()); + super.serialize(stream); + + ReadWriteIOUtils.write(columnName, stream); + ReadWriteIOUtils.write(dataType, stream); + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + this.columnName = ReadWriteIOUtils.readString(byteBuffer); + this.dataType = ReadWriteIOUtils.readDataType(byteBuffer); + } + + @Override + public boolean equals(final Object o) { + return super.equals(o) + && Objects.equals(columnName, ((AlterTableColumnDataTypeProcedure) o).columnName) + && Objects.equals(dataType, ((AlterTableColumnDataTypeProcedure) o).dataType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), columnName, dataType); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DropTableColumnProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DropTableColumnProcedure.java index 42523cd02a8b..764ee61ee0f0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DropTableColumnProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DropTableColumnProcedure.java @@ -114,7 +114,7 @@ protected Flow executeFromState( dropColumn(env); return Flow.NO_MORE_STATE; default: - setFailure(new ProcedureException("Unrecognized CreateTableState " + state)); + setFailure(new ProcedureException("Unrecognized DropTableColumnState " + state)); return Flow.NO_MORE_STATE; } return Flow.HAS_MORE_STATE; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AlterTableColumnDataTypeState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AlterTableColumnDataTypeState.java new file mode 100644 index 000000000000..736e9fa4918d --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AlterTableColumnDataTypeState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.state.schema; + +public enum AlterTableColumnDataTypeState { + CHECK_AND_INVALIDATE_COLUMN, + PRE_RELEASE, + ALTER_TABLE_COLUMN_DATA_TYPE, + COMMIT_RELEASE +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 42ef4a1b8069..3805db28be42 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure; +import org.apache.iotdb.confignode.procedure.impl.schema.table.AlterTableColumnDataTypeProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DeleteDevicesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableColumnProcedure; @@ -207,6 +208,9 @@ public Procedure create(ByteBuffer buffer) throws IOException { case DROP_TABLE_COLUMN_PROCEDURE: procedure = new DropTableColumnProcedure(); break; + case ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE: + procedure = new AlterTableColumnDataTypeProcedure(); + break; case DROP_TABLE_PROCEDURE: procedure = new DropTableProcedure(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 9928a214b757..3135fd30561c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -70,6 +70,7 @@ public enum ProcedureType { RENAME_TABLE_COLUMN_PROCEDURE((short) 754), DROP_TABLE_COLUMN_PROCEDURE((short) 755), DELETE_DEVICES_PROCEDURE((short) 756), + ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE((short) 757), /** AI Model */ CREATE_MODEL_PROCEDURE((short) 800), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 4bd5f675c32c..cfdcdd2ec77f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -164,6 +164,7 @@ import org.apache.iotdb.db.service.RegionMigrateService; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 9151ca14cba4..fb0980fff19f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -58,6 +58,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.security.AllowAllAccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; @@ -390,6 +391,7 @@ private IQueryExecution createQueryExecutionForTableModel( || statement instanceof DescribeTable || statement instanceof ShowTables || statement instanceof AddColumn + || statement instanceof AlterColumnDataType || statement instanceof SetProperties || statement instanceof DropColumn || statement instanceof DropTable diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 8abe782d5fda..acd70459748a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowVariablesTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterColumnDataTypeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterDBTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterTableAddColumnTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterTableDropColumnTask; @@ -87,6 +88,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableHeaderSchemaValidator; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; @@ -404,6 +406,25 @@ protected IConfigTask visitCreateTable(final CreateTable node, final MPPQueryCon return new CreateTableTask(table, databaseTablePair.getLeft(), node.isIfNotExists()); } + @Override + protected IConfigTask visitAlterColumnDataType( + AlterColumnDataType node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + final Pair databaseTablePair = splitQualifiedName(node.getTableName(), true); + final String columnName = node.getColumnName().getValue(); + final DataType dataType = node.getDataType(); + final boolean ifTableExists = node.isIfTableExists(); + final boolean ifColumnExists = node.isIfColumnExists(); + return new AlterColumnDataTypeTask( + databaseTablePair.getLeft(), + databaseTablePair.getRight(), + context.getQueryId().getId(), + ifTableExists, + ifColumnExists, + columnName, + getDataType(dataType)); + } + private boolean checkTimeColumnIdempotent( final TsTableColumnCategory category, final String columnName, final TSDataType dataType) { if (category == TsTableColumnCategory.TIME || columnName.equals(TsTable.TIME_COLUMN_NAME)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 4a14c71a3d37..42f99f634070 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -286,6 +286,7 @@ import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3262,7 +3263,8 @@ public SettableFuture describeTable( } final TsTable table = TsTableInternalRPCUtil.deserializeSingleTsTable(resp.getTableInfo()); if (isDetails) { - DescribeTableDetailsTask.buildTsBlock(table, resp.getPreDeletedColumns(), future); + DescribeTableDetailsTask.buildTsBlock( + table, resp.getPreDeletedColumns(), resp.preAlteredColumns, future); } else { DescribeTableTask.buildTsBlock(table, future); } @@ -3393,6 +3395,43 @@ public SettableFuture alterTableAddColumn( return future; } + @Override + public SettableFuture alterColumnDataType( + String database, + String tableName, + String columnName, + TSDataType newType, + String queryId, + boolean ifTableExists, + boolean ifColumnExists) { + final SettableFuture future = SettableFuture.create(); + try (final ConfigNodeClient client = + CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + + final TSStatus tsStatus = + sendAlterReq2ConfigNode( + database, + tableName, + queryId, + AlterOrDropTableOperationType.ALTER_COLUMN_DATA_TYPE, + TsTableColumnSchemaUtil.serialize(columnName, newType), + client); + + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode() + || (TSStatusCode.TABLE_NOT_EXISTS.getStatusCode() == tsStatus.getCode() && ifTableExists) + || (TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode() == tsStatus.getCode() + && ifColumnExists)) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException( + new IoTDBException(getTableErrorMessage(tsStatus, database), tsStatus.getCode())); + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + return future; + } + @Override public SettableFuture alterTableRenameColumn( final String database, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 179d24550376..3ab2dd11df48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -91,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import com.google.common.util.concurrent.SettableFuture; +import org.apache.tsfile.enums.TSDataType; import javax.annotation.Nullable; @@ -323,6 +324,15 @@ SettableFuture alterTableAddColumn( final boolean tableIfExists, final boolean columnIfExists); + SettableFuture alterColumnDataType( + final String database, + final String tableName, + final String columnName, + final TSDataType newType, + final String queryId, + final boolean tableIfExists, + boolean ifColumnExists); + SettableFuture alterTableRenameColumn( final String database, final String tableName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterColumnDataTypeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterColumnDataTypeTask.java new file mode 100644 index 000000000000..fc769020f075 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterColumnDataTypeTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.enums.TSDataType; + +public class AlterColumnDataTypeTask extends AbstractAlterOrDropTableTask { + private final String columnName; + private final TSDataType newType; + private final boolean ifColumnExists; + + public AlterColumnDataTypeTask( + String database, + String tableName, + String queryId, + boolean tableIfExists, + boolean ifColumnExists, + String columnName, + TSDataType newType) { + super(database, tableName, queryId, tableIfExists); + this.columnName = columnName; + this.newType = newType; + this.ifColumnExists = ifColumnExists; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.alterColumnDataType( + database, tableName, columnName, newType, queryId, tableIfExists, ifColumnExists); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DescribeTableDetailsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DescribeTableDetailsTask.java index ad6220ffc1a8..c2cfe8ce3105 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DescribeTableDetailsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DescribeTableDetailsTask.java @@ -37,6 +37,7 @@ import org.apache.tsfile.utils.Binary; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -54,6 +55,7 @@ public ListenableFuture execute(final IConfigTaskExecutor conf public static void buildTsBlock( final TsTable table, final Set preDeletedColumns, + final Map preAlteredColumns, final SettableFuture future) { final List outputDataTypes = ColumnHeaderConstant.describeTableDetailsColumnHeaders.stream() @@ -63,22 +65,27 @@ public static void buildTsBlock( final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes); for (final TsTableColumnSchema columnSchema : table.getColumnList()) { builder.getTimeColumnBuilder().writeLong(0L); + String columnStatus = "USING"; + String dataTypeName = columnSchema.getDataType().name(); + if (preDeletedColumns.contains(columnSchema.getColumnName())) { + columnStatus = "PRE_DELETE"; + } else if (preAlteredColumns.containsKey(columnSchema.getColumnName())) { + columnStatus = "PRE_ALTER"; + dataTypeName += "->" + preAlteredColumns.get(columnSchema.getColumnName()); + } builder .getColumnBuilder(0) .writeBinary(new Binary(columnSchema.getColumnName(), TSFileConfig.STRING_CHARSET)); builder .getColumnBuilder(1) - .writeBinary(new Binary(columnSchema.getDataType().name(), TSFileConfig.STRING_CHARSET)); + .writeBinary(new Binary(dataTypeName, TSFileConfig.STRING_CHARSET)); builder .getColumnBuilder(2) .writeBinary( new Binary(columnSchema.getColumnCategory().name(), TSFileConfig.STRING_CHARSET)); builder .getColumnBuilder(3) - .writeBinary( - new Binary( - preDeletedColumns.contains(columnSchema.getColumnName()) ? "PRE_DELETE" : "USING", - TSFileConfig.STRING_CHARSET)); + .writeBinary(new Binary(columnStatus, TSFileConfig.STRING_CHARSET)); builder.declarePosition(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterColumnDataType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterColumnDataType.java new file mode 100644 index 000000000000..a427d2342ba9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterColumnDataType.java @@ -0,0 +1,91 @@ +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class AlterColumnDataType extends Statement { + private final QualifiedName tableName; + private final Identifier columnName; + private final DataType dataType; + private final boolean ifTableExists; + private final boolean ifColumnExists; + + public AlterColumnDataType( + @Nullable NodeLocation location, + QualifiedName tableName, + Identifier columnName, + DataType dataType, + boolean ifTableExists, + boolean ifColumnExists) { + super(location); + this.tableName = tableName; + this.columnName = columnName; + this.dataType = dataType; + this.ifTableExists = ifTableExists; + this.ifColumnExists = ifColumnExists; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AlterColumnDataType that = (AlterColumnDataType) o; + return Objects.equals(tableName, that.tableName) + && Objects.equals(columnName, that.columnName) + && Objects.equals(dataType, that.dataType); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, columnName, dataType); + } + + @Override + public String toString() { + return "AlterColumnDataType{" + + "tableName=" + + tableName + + ", columnName=" + + columnName + + ", dataType=" + + dataType + + '}'; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitAlterColumnDataType(this, context); + } + + public QualifiedName getTableName() { + return tableName; + } + + public Identifier getColumnName() { + return columnName; + } + + public DataType getDataType() { + return dataType; + } + + public boolean isIfTableExists() { + return ifTableExists; + } + + public boolean isIfColumnExists() { + return ifColumnExists; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 43bc9223b114..80cad24934fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -616,4 +616,8 @@ protected R visitShowStatement(ShowStatement node, C context) { protected R visitKillQuery(KillQuery node, C context) { return visitStatement(node, context); } + + protected R visitAlterColumnDataType(AlterColumnDataType node, C context) { + return visitStatement(node, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index d3b38af0f581..9e0db8fc2be9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllRows; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ArithmeticBinaryExpression; @@ -184,6 +185,7 @@ import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlBaseVisitor; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlLexer; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser; +import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser.AlterColumnDataTypeContext; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; import org.apache.iotdb.db.utils.DateTimeUtils; @@ -443,6 +445,25 @@ public Node visitSetTableProperties(final RelationalSqlParser.SetTableProperties Objects.nonNull(ctx.EXISTS())); } + @Override + public Node visitAlterColumnDataType(AlterColumnDataTypeContext ctx) { + QualifiedName tableName = getQualifiedName(ctx.tableName); + Identifier columnName = lowerIdentifier((Identifier) visit(ctx.identifier())); + DataType dataType = (DataType) visit(ctx.new_type); + boolean ifTableExists = + ctx.EXISTS().stream() + .anyMatch( + node -> + node.getSymbol().getTokenIndex() < ctx.COLUMN().getSymbol().getTokenIndex()); + boolean ifColumnExists = + ctx.EXISTS().stream() + .anyMatch( + node -> + node.getSymbol().getTokenIndex() > ctx.COLUMN().getSymbol().getTokenIndex()); + return new AlterColumnDataType( + getLocation(ctx), tableName, columnName, dataType, ifTableExists, ifColumnExists); + } + @Override public Node visitCreateIndexStatement(RelationalSqlParser.CreateIndexStatementContext ctx) { return new CreateIndex( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 2efc10e08518..794ece538422 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1883,7 +1883,7 @@ private void waitClosingTsFileProcessorFinished() throws InterruptedException { } /** close all working tsfile processors */ - private List> asyncCloseAllWorkingTsFileProcessors() { + public List> asyncCloseAllWorkingTsFileProcessors() { writeLock("asyncCloseAllWorkingTsFileProcessors"); List> futures = new ArrayList<>(); int count = 0; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java index bdae8196a5d1..b22575f5c469 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java @@ -25,7 +25,9 @@ public enum AlterOrDropTableOperationType { RENAME_COLUMN((byte) 2), DROP_COLUMN((byte) 3), RENAME_TABLE((byte) 4), - DROP_TABLE((byte) 5); + DROP_TABLE((byte) 5), + ALTER_COLUMN_DATA_TYPE((byte) 6), + ; private final byte type; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 905620f4613c..e37ee361da0a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -33,7 +33,6 @@ import javax.annotation.concurrent.ThreadSafe; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -87,6 +86,16 @@ public TsTable(String tableName, ImmutableList columnSchema columnSchema -> columnSchemaMap.put(columnSchema.getColumnName(), columnSchema)); } + public TsTable(TsTable origin) { + this.tableName = origin.tableName; + origin.columnSchemaMap.forEach((col, schema) -> this.columnSchemaMap.put(col, schema.copy())); + this.idColumnIndexMap.putAll(origin.idColumnIndexMap); + this.props = origin.props == null ? null : new HashMap<>(origin.props); + this.ttlValue = origin.ttlValue; + this.idNums = origin.idNums; + this.measurementNum = origin.measurementNum; + } + public String getTableName() { return tableName; } @@ -263,16 +272,6 @@ public void removeProp(final String key) { } } - public byte[] serialize() { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - try { - serialize(stream); - } catch (IOException ignored) { - // won't happen - } - return stream.toByteArray(); - } - public void serialize(final OutputStream stream) throws IOException { ReadWriteIOUtils.write(tableName, stream); ReadWriteIOUtils.write(columnSchemaMap.size(), stream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/AttributeColumnSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/AttributeColumnSchema.java index 8a3f21930d2c..513aecdf61ca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/AttributeColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/AttributeColumnSchema.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; public class AttributeColumnSchema extends TsTableColumnSchema { @@ -55,4 +56,10 @@ static AttributeColumnSchema deserialize(final ByteBuffer buffer) { final Map props = ReadWriteIOUtils.readMap(buffer); return new AttributeColumnSchema(columnName, dataType, props); } + + @Override + public TsTableColumnSchema copy() { + return new AttributeColumnSchema( + columnName, dataType, props == null ? null : new HashMap<>(props)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/FieldColumnSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/FieldColumnSchema.java index c2728fb47e8b..4cb78ab41ce8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/FieldColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/FieldColumnSchema.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; public class FieldColumnSchema extends TsTableColumnSchema { @@ -109,4 +110,10 @@ static FieldColumnSchema deserialize(final ByteBuffer buffer) { final Map props = ReadWriteIOUtils.readMap(buffer); return new FieldColumnSchema(columnName, dataType, encoding, compressor, props); } + + @Override + public TsTableColumnSchema copy() { + return new FieldColumnSchema( + columnName, dataType, encoding, compressor, props == null ? null : new HashMap<>(props)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TagColumnSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TagColumnSchema.java index 83c7bde36b17..8ba20a146f9a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TagColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TagColumnSchema.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; public class TagColumnSchema extends TsTableColumnSchema { @@ -55,4 +56,9 @@ static TagColumnSchema deserialize(final ByteBuffer buffer) { Map props = ReadWriteIOUtils.readMap(buffer); return new TagColumnSchema(columnName, dataType, props); } + + @Override + public TsTableColumnSchema copy() { + return new TagColumnSchema(columnName, dataType, props == null ? null : new HashMap<>(props)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TimeColumnSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TimeColumnSchema.java index 4349d3e134a7..597cf51b53cf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TimeColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TimeColumnSchema.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; public class TimeColumnSchema extends TsTableColumnSchema { @@ -55,4 +56,9 @@ static TimeColumnSchema deserialize(final ByteBuffer buffer) { final Map props = ReadWriteIOUtils.readMap(buffer); return new TimeColumnSchema(columnName, dataType, props); } + + @Override + public TsTableColumnSchema copy() { + return new TimeColumnSchema(columnName, dataType, props == null ? null : new HashMap<>(props)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java index 2bbb1562fe83..9ad6114aea9b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java @@ -76,4 +76,6 @@ public int hashCode() { public void setDataType(final TSDataType dataType) { this.dataType = dataType; } + + public abstract TsTableColumnSchema copy(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java index 2c2f2340fdef..ab6bbe2ac8a1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.schema.table.column; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.ByteArrayOutputStream; @@ -103,6 +104,17 @@ public static void serialize(List columnSchemaList, OutputS } } + public static byte[] serialize(String columnName, TSDataType dataType) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + ReadWriteIOUtils.writeVar(columnName, stream); + stream.write(dataType.serialize()); + } catch (IOException ignored) { + + } + return stream.toByteArray(); + } + public static List deserializeColumnSchemaList(ByteBuffer buffer) { int size = ReadWriteIOUtils.readInt(buffer); if (size == -1) { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index fa599b9178f5..794dff013a2d 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -197,6 +197,7 @@ alterTableStatement | ALTER TABLE (IF EXISTS)? tableName=qualifiedName DROP COLUMN (IF EXISTS)? column=identifier #dropColumn // set TTL can use this | ALTER TABLE (IF EXISTS)? tableName=qualifiedName SET PROPERTIES propertyAssignments #setTableProperties + | ALTER TABLE (IF EXISTS)? tableName=qualifiedName ALTER COLUMN column=identifier (IF EXISTS)? SET DATA TYPE new_type=type #alterColumnDataType ; diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index f2e6c1400916..cbca5db7c4a7 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1075,6 +1075,7 @@ struct TDescTableResp { 1: required common.TSStatus status 2: optional binary tableInfo 3: optional set preDeletedColumns + 4: optional map preAlteredColumns } struct TFetchTableResp { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cfdd634a92f4..e02505d51a78 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -1139,7 +1139,6 @@ service IDataNodeRPCService { */ common.TSStatus deleteColumnData(TDeleteColumnDataReq req) - /** * Construct table device black list */ From e58e94d7caa6bc4b61fb60e5c787c02cd30302e9 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 7 Jan 2025 17:30:20 +0800 Subject: [PATCH 02/88] adapt write and add tests --- .../it/schema/IoTDBAlterColumnTypeIT.java | 145 ++++++++++++++++++ .../it/session/IoTDBSessionRelationalIT.java | 2 +- .../client/async/CnToDnAsyncRequestType.java | 3 +- .../rpc/DataNodeAsyncRequestRPCHandler.java | 1 - .../procedure/store/ProcedureFactory.java | 2 + .../DataTypeInconsistentException.java | 12 ++ .../storageengine/dataregion/DataRegion.java | 69 +++++++-- .../dataregion/memtable/AbstractMemTable.java | 30 ++++ .../memtable/AlignedWritableMemChunk.java | 19 +++ .../AlignedWritableMemChunkGroup.java | 7 + .../dataregion/memtable/IMemTable.java | 4 + .../memtable/IWritableMemChunkGroup.java | 4 + .../dataregion/memtable/TsFileProcessor.java | 4 + .../memtable/WritableMemChunkGroup.java | 18 +++ .../table/AlterOrDropTableOperationType.java | 2 + 15 files changed, 308 insertions(+), 14 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java new file mode 100644 index 000000000000..019d218934ea --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.schema; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.RowRecord; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue; +import static org.junit.Assert.assertEquals; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBAlterColumnTypeIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test"); + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testWriteAndAlter() throws IoTDBConnectionException, StatementExecutionException { + for (TSDataType from : TSDataType.values()) { + for (TSDataType to : TSDataType.values()) { + System.out.printf("testing %s to %s%n", from, to); + doWriteAndAlter(from, to, false); + doWriteAndAlter(from, to, true); + } + } + } + + private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush) + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + // create a table with type of "from" + session.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS write_and_alter_column_type (s1 " + from + ")"); + + // write a point of "from" + session.executeNonQueryStatement( + "INSERT INTO write_and_alter_column_type (time, s1) VALUES (1, " + + genValue(from, 1) + + ")"); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } + + // alter the type to "to" + boolean isCompatible = to.isCompatible(from); + if (isCompatible) { + session.executeNonQueryStatement( + "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } else { + try { + session.executeNonQueryStatement( + "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } catch (StatementExecutionException e) { + assertEquals("", e.getMessage()); + } + } + + SessionDataSet dataSet = + session.executeQueryStatement("select * from write_and_alter_column_type order by time"); + RowRecord rec = dataSet.next(); + assertEquals(1, rec.getFields().get(0).getLongV()); + if (to == TSDataType.BLOB) { + assertEquals(genValue(to, 1), rec.getFields().get(1).getBinaryV()); + } else if (to == TSDataType.DATE) { + assertEquals(genValue(to, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(to, 1).toString(), rec.getFields().get(1).toString()); + } + + // write a point + session.executeNonQueryStatement( + "INSERT INTO write_and_alter_column_type (time, s1) VALUES (2, " + + genValue(isCompatible ? to : from, 2) + + ")"); + + dataSet = + session.executeQueryStatement("select * from write_and_alter_column_type order by time"); + rec = dataSet.next(); + assertEquals(1, rec.getFields().get(0).getLongV()); + TSDataType newType = isCompatible ? to : from; + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); + } + + rec = dataSet.next(); + assertEquals(2, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 2), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 2), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 2).toString(), rec.getFields().get(1).toString()); + } + + session.executeNonQueryStatement("DROP TABLE write_and_alter_column_type"); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index 50ae3f55b6e6..5514c966d9e4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -1324,7 +1324,7 @@ private void testOneCastWithRow( } @SuppressWarnings("SameParameterValue") - private Object genValue(TSDataType dataType, int i) { + public static Object genValue(TSDataType dataType, int i) { switch (dataType) { case INT32: return i; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index d982b3886e27..ffe95cd805ef 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -119,6 +119,5 @@ public enum CnToDnAsyncRequestType { ROLLBACK_TABLE_DEVICE_BLACK_LIST, INVALIDATE_MATCHED_TABLE_DEVICE_CACHE, DELETE_DATA_FOR_TABLE_DEVICE, - DELETE_TABLE_DEVICE_IN_BLACK_LIST, - ALTER_COLUMN_DATATYPE + DELETE_TABLE_DEVICE_IN_BLACK_LIST } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 1419482e8954..b1e8753ab49c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -89,7 +89,6 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case ROLLBACK_VIEW_SCHEMA_BLACK_LIST: case DELETE_VIEW: case ALTER_VIEW: - case ALTER_COLUMN_DATATYPE: return new SchemaUpdateRPCHandler( requestType, requestId, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 3805db28be42..0dbd4a33e1b1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -437,6 +437,8 @@ public static ProcedureType getProcedureType(final Procedure procedure) { return ProcedureType.NEVER_FINISH_PROCEDURE; } else if (procedure instanceof AddNeverFinishSubProcedureProcedure) { return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE; + } else if (procedure instanceof AlterTableColumnDataTypeProcedure) { + return ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE; } throw new UnsupportedOperationException( "Procedure type " + procedure.getClass() + " is not supported"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java new file mode 100644 index 000000000000..ce0dca64cb9b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java @@ -0,0 +1,12 @@ +package org.apache.iotdb.db.exception; + +import org.apache.tsfile.enums.TSDataType; + +public class DataTypeInconsistentException extends WriteProcessException { + + public DataTypeInconsistentException(TSDataType existing, TSDataType incoming) { + super( + String.format( + "Inconsistent data types, existing data type: %s, incoming: %s", existing, incoming)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 794ece538422..f6ba264bf5a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; @@ -1052,8 +1053,14 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException { > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID()); // insert to sequence or unSequence file - TsFileProcessor tsFileProcessor = - insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId); + TsFileProcessor tsFileProcessor; + try { + tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId); + } catch (DataTypeInconsistentException e) { + tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, isSequence); + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId); + } // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) { @@ -1311,11 +1318,10 @@ private boolean insertTabletToTsFileProcessor( return false; } - // register TableSchema (and maybe more) for table insertion - registerToTsFile(insertTabletNode, tsFileProcessor); - try { - tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); + tsFileProcessor = + insertTabletWithTypeConsistencyCheck( + tsFileProcessor, insertTabletNode, rangeList, results, noFailure, infoForMetrics); } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); return false; @@ -1331,6 +1337,29 @@ private boolean insertTabletToTsFileProcessor( return true; } + private TsFileProcessor insertTabletWithTypeConsistencyCheck( + TsFileProcessor tsFileProcessor, + InsertTabletNode insertTabletNode, + List rangeList, + TSStatus[] results, + boolean noFailure, + long[] infoForMetrics) + throws WriteProcessException { + try { + // register TableSchema (and maybe more) for table insertion + registerToTsFile(insertTabletNode, tsFileProcessor); + tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); + } catch (DataTypeInconsistentException e) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + tsFileProcessor = + getOrCreateTsFileProcessor( + tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence()); + registerToTsFile(insertTabletNode, tsFileProcessor); + tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); + } + return tsFileProcessor; + } + private void registerToTsFile(InsertNode node, TsFileProcessor tsFileProcessor) { final String tableName = node.getTableName(); if (tableName != null) { @@ -1415,7 +1444,8 @@ private List insertToTsFileProcessors( TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); + tsFileProcessor = + insertRowsWithTypeConsistencyCheck(tsFileProcessor, subInsertRowsNode, infoForMetrics); } catch (WriteProcessException e) { insertRowsNode .getResults() @@ -1424,8 +1454,7 @@ private List insertToTsFileProcessors( RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); - // register TableSchema (and maybe more) for table insertion - registerToTsFile(subInsertRowsNode, tsFileProcessor); + // check memtable size and may asyncTryToFlush the work memtable if (entry.getKey().shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); @@ -1434,6 +1463,24 @@ private List insertToTsFileProcessors( return executedInsertRowNodeList; } + private TsFileProcessor insertRowsWithTypeConsistencyCheck( + TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, long[] infoForMetrics) + throws WriteProcessException { + try { + // register TableSchema (and maybe more) for table insertion + registerToTsFile(subInsertRowsNode, tsFileProcessor); + tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); + } catch (DataTypeInconsistentException e) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + tsFileProcessor = + getOrCreateTsFileProcessor( + tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence()); + registerToTsFile(subInsertRowsNode, tsFileProcessor); + tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); + } + return tsFileProcessor; + } + private void tryToUpdateInsertRowsLastCache(List nodeList) { for (InsertRowNode node : nodeList) { node.updateLastCache(databaseName); @@ -3384,7 +3431,9 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics); + tsFileProcessor = + insertRowsWithTypeConsistencyCheck( + tsFileProcessor, subInsertRowsNode, infoForMetrics); } catch (WriteProcessException e) { insertRowsOfOneDeviceNode .getResults() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 027de99166ba..4076c2aad7ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -29,11 +29,15 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; @@ -1057,4 +1061,30 @@ public void markAsNotGeneratedByPipe() { public boolean isTotallyGeneratedByPipe() { return this.isTotallyGeneratedByPipe.get(); } + + @Override + public void checkDataType(InsertNode node) throws DataTypeInconsistentException { + if (node instanceof InsertRowsNode) { + List insertRowNodeList = ((InsertRowsNode) node).getInsertRowNodeList(); + for (InsertRowNode insertRowNode : insertRowNodeList) { + doCheckDataType(insertRowNode); + } + } else if (node instanceof InsertMultiTabletsNode) { + List insertTabletNodeList = + ((InsertMultiTabletsNode) node).getInsertTabletNodeList(); + for (InsertTabletNode insertTabletNode : insertTabletNodeList) { + doCheckDataType(insertTabletNode); + } + } else { + doCheckDataType(node); + } + } + + private void doCheckDataType(InsertNode node) throws DataTypeInconsistentException { + IWritableMemChunkGroup memChunkGroup = memTableMap.get(node.getDeviceID()); + if (memChunkGroup == null) { + return; + } + memChunkGroup.checkDataType(node); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 3c906104c17e..4beb010e2879 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; @@ -558,4 +560,21 @@ public List getSchemaList() { public boolean isAllDeleted() { return list.isAllDeleted(); } + + public void checkDataType(InsertNode node) throws DataTypeInconsistentException { + for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) { + if (incomingSchema == null) { + continue; + } + + Integer index = measurementIndexMap.get(incomingSchema.getMeasurementName()); + if (index != null) { + IMeasurementSchema existingSchema = schemaList.get(index); + if (existingSchema.getType() != incomingSchema.getType()) { + throw new DataTypeInconsistentException( + existingSchema.getType(), incomingSchema.getType()); + } + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java index 609c37c3581c..9d5acb038a05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java @@ -21,6 +21,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; @@ -162,4 +164,9 @@ public static AlignedWritableMemChunkGroup deserialize( memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream, isTableModel); return memChunkGroup; } + + @Override + public void checkDataType(InsertNode node) throws DataTypeInconsistentException { + memChunk.checkDataType(node); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 35d9ad0e95c3..e5494cf23916 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -21,9 +21,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.IFullPath; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; @@ -202,4 +204,6 @@ void queryForDeviceRegionScan( void markAsNotGeneratedByPipe(); boolean isTotallyGeneratedByPipe(); + + void checkDataType(InsertNode node) throws DataTypeInconsistentException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java index a2d23dc9db04..b4bee0a3fae4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; @@ -60,4 +62,6 @@ boolean writeWithFlushCheck( long getCurrentTVListSize(String measurement); long getMaxTime(); + + void checkDataType(InsertNode node) throws DataTypeInconsistentException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 21e8b7515505..4467b360b222 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -269,6 +269,8 @@ public void insert(InsertRowNode insertRowNode, long[] infoForMetrics) throws WriteProcessException { ensureMemTable(infoForMetrics); + workMemTable.checkDataType(insertRowNode); + long[] memIncrements; long memControlStartTime = System.nanoTime(); @@ -349,6 +351,7 @@ public void insertRows(InsertRowsNode insertRowsNode, long[] infoForMetrics) throws WriteProcessException { ensureMemTable(infoForMetrics); + workMemTable.checkDataType(insertRowsNode); long[] memIncrements; @@ -541,6 +544,7 @@ public void insertTablet( throws WriteProcessException { ensureMemTable(infoForMetrics); + workMemTable.checkDataType(insertTabletNode); long[] memIncrements = scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, infoForMetrics); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index ac5d588eb4fd..c271087f0506 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.exception.DataTypeInconsistentException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; @@ -27,6 +29,7 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.DataInputStream; import java.io.IOException; @@ -199,4 +202,19 @@ public static WritableMemChunkGroup deserialize(DataInputStream stream) throws I } return memChunkGroup; } + + @Override + public void checkDataType(InsertNode node) throws DataTypeInconsistentException { + for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) { + if (incomingSchema == null) { + continue; + } + + IWritableMemChunk memChunk = memChunkMap.get(incomingSchema.getMeasurementName()); + if (memChunk != null && memChunk.getTVList().getDataType() != incomingSchema.getType()) { + throw new DataTypeInconsistentException( + memChunk.getTVList().getDataType(), incomingSchema.getType()); + } + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java index b22575f5c469..6dafc77d3038 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java @@ -53,6 +53,8 @@ public static AlterOrDropTableOperationType getType(final byte value) { return RENAME_TABLE; case 5: return DROP_TABLE; + case 6: + return ALTER_COLUMN_DATA_TYPE; default: throw new IllegalArgumentException(); } From c85cfa87584149fbdff5fe4d8fc48989f3afd0e2 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Fri, 10 Jan 2025 11:22:29 +0800 Subject: [PATCH 03/88] add test case --- .../it/schema/IoTDBAlterColumnTypeIT.java | 182 ++++++++++++++++-- .../confignode/manager/ProcedureManager.java | 6 +- .../operator/source/SeriesScanUtil.java | 35 +++- .../db/utils/datastructure/AlignedTVList.java | 31 ++- .../relational/grammar/sql/RelationalSql.g4 | 2 +- pom.xml | 2 +- 6 files changed, 226 insertions(+), 32 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java index 019d218934ea..737b9c46af54 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java @@ -30,14 +30,22 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.record.Tablet.ColumnCategory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({TableLocalStandaloneIT.class, TableClusterIT.class}) @@ -58,8 +66,13 @@ public static void tearDown() throws Exception { @Test public void testWriteAndAlter() throws IoTDBConnectionException, StatementExecutionException { - for (TSDataType from : TSDataType.values()) { - for (TSDataType to : TSDataType.values()) { + Set typesToTest = new HashSet<>(); + Collections.addAll(typesToTest, TSDataType.values()); + typesToTest.remove(TSDataType.VECTOR); + typesToTest.remove(TSDataType.UNKNOWN); + + for (TSDataType from : typesToTest) { + for (TSDataType to : typesToTest) { System.out.printf("testing %s to %s%n", from, to); doWriteAndAlter(from, to, false); doWriteAndAlter(from, to, true); @@ -75,10 +88,16 @@ private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush) "CREATE TABLE IF NOT EXISTS write_and_alter_column_type (s1 " + from + ")"); // write a point of "from" - session.executeNonQueryStatement( - "INSERT INTO write_and_alter_column_type (time, s1) VALUES (1, " - + genValue(from, 1) - + ")"); + Tablet tablet = + new Tablet( + "write_and_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(from), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(from, 1)); + session.insert(tablet); + tablet.reset(); if (flush) { session.executeNonQueryStatement("FLUSH"); @@ -94,33 +113,45 @@ private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush) session.executeNonQueryStatement( "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); } catch (StatementExecutionException e) { - assertEquals("", e.getMessage()); + assertEquals( + "701: New type " + to + " is not compatible with the existing one " + from, + e.getMessage()); } } SessionDataSet dataSet = session.executeQueryStatement("select * from write_and_alter_column_type order by time"); RowRecord rec = dataSet.next(); + TSDataType newType = isCompatible ? to : from; assertEquals(1, rec.getFields().get(0).getLongV()); - if (to == TSDataType.BLOB) { - assertEquals(genValue(to, 1), rec.getFields().get(1).getBinaryV()); - } else if (to == TSDataType.DATE) { - assertEquals(genValue(to, 1), rec.getFields().get(1).getDateV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); } else { - assertEquals(genValue(to, 1).toString(), rec.getFields().get(1).toString()); + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); } // write a point - session.executeNonQueryStatement( - "INSERT INTO write_and_alter_column_type (time, s1) VALUES (2, " - + genValue(isCompatible ? to : from, 2) - + ")"); + tablet = + new Tablet( + "write_and_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(newType), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 2); + tablet.addValue("s1", 0, genValue(newType, 2)); + session.insert(tablet); + tablet.reset(); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } dataSet = session.executeQueryStatement("select * from write_and_alter_column_type order by time"); rec = dataSet.next(); assertEquals(1, rec.getFields().get(0).getLongV()); - TSDataType newType = isCompatible ? to : from; if (newType == TSDataType.BLOB) { assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); } else if (newType == TSDataType.DATE) { @@ -142,4 +173,121 @@ private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush) session.executeNonQueryStatement("DROP TABLE write_and_alter_column_type"); } } + + @Test + public void testAlterWithoutWrite() throws IoTDBConnectionException, StatementExecutionException { + Set typesToTest = new HashSet<>(); + Collections.addAll(typesToTest, TSDataType.values()); + typesToTest.remove(TSDataType.VECTOR); + typesToTest.remove(TSDataType.UNKNOWN); + + for (TSDataType from : typesToTest) { + for (TSDataType to : typesToTest) { + System.out.printf("testing %s to %s%n", from, to); + doAlterWithoutWrite(from, to, false); + doAlterWithoutWrite(from, to, true); + } + } + } + + private void doAlterWithoutWrite(TSDataType from, TSDataType to, boolean flush) + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + // create a table with type of "from" + session.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS just_alter_column_type (s1 " + from + ")"); + + // alter the type to "to" + boolean isCompatible = to.isCompatible(from); + if (isCompatible) { + session.executeNonQueryStatement( + "ALTER TABLE just_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } else { + try { + session.executeNonQueryStatement( + "ALTER TABLE just_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } catch (StatementExecutionException e) { + assertEquals( + "701: New type " + to + " is not compatible with the existing one " + from, + e.getMessage()); + } + } + + TSDataType newType = isCompatible ? to : from; + + // write a point + Tablet tablet = + new Tablet( + "just_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(newType), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(newType, 1)); + session.insert(tablet); + tablet.reset(); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } + + SessionDataSet dataSet = + session.executeQueryStatement("select * from just_alter_column_type order by time"); + RowRecord rec = dataSet.next(); + assertEquals(1, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); + } + + assertFalse(dataSet.hasNext()); + + session.executeNonQueryStatement("DROP TABLE just_alter_column_type"); + } + } + + @Test + public void testAlterNonExist() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + try { + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN s1 SET DATA TYPE INT64"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("550: Table 'test.non_exist' does not exist", e.getMessage()); + } + session.executeNonQueryStatement( + "ALTER TABLE IF EXISTS non_exist ALTER COLUMN s1 SET DATA TYPE INT64"); + + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS non_exist (s1 int32)"); + + try { + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN s2 SET DATA TYPE INT64"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("616: Column s2 in table 'test.non_exist' does not exist.", e.getMessage()); + } + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN IF EXISTS s2 SET DATA TYPE INT64"); + } + } + + @Test + public void testAlterWrongType() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS wrong_type (s1 int32)"); + + try { + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN s1 SET DATA TYPE VECTOR"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("701: Unknown type: VECTOR", e.getMessage()); + } + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index f79cff952a99..36c3a3c097e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -91,6 +91,7 @@ import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DeleteDevicesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableColumnProcedure; +import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.RenameTableColumnProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.table.SetTablePropertiesProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure; @@ -1561,9 +1562,8 @@ public TSStatus dropTable(final TAlterOrDropTableReq req) { null, req.tableName, req.queryId, - ProcedureType.DROP_TABLE_COLUMN_PROCEDURE, - new DropTableColumnProcedure( - req.database, req.tableName, req.queryId, ReadWriteIOUtils.readString(req.updateInfo))); + ProcedureType.DROP_TABLE_PROCEDURE, + new DropTableProcedure(req.database, req.tableName, req.queryId)); } public TDeleteTableDeviceResp deleteDevices(final TDeleteTableDeviceReq req) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index cf7e633ec262..00f16bdcf296 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -36,10 +36,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IMetadata; import org.apache.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.block.TsBlock; @@ -1174,7 +1176,7 @@ private Optional unpackSeqTsFileResource() throws IOExcepti ITimeSeriesMetadata timeseriesMetadata = loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true); // skip if data type is mismatched which may be caused by delete - if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) { + if (timeseriesMetadata != null && typeCompatible(timeseriesMetadata)) { timeseriesMetadata.setSeq(true); seqTimeSeriesMetadata.add(timeseriesMetadata); return Optional.of(timeseriesMetadata); @@ -1183,11 +1185,40 @@ private Optional unpackSeqTsFileResource() throws IOExcepti } } + private boolean typeCompatible(ITimeSeriesMetadata timeseriesMetadata) { + if (timeseriesMetadata instanceof TimeseriesMetadata) { + return getTsDataTypeList() + .get(0) + .isCompatible(((TimeseriesMetadata) timeseriesMetadata).getTsDataType()); + } else { + List valueTimeseriesMetadataList = + ((AbstractAlignedTimeSeriesMetadata) timeseriesMetadata).getValueTimeseriesMetadataList(); + if (getTsDataTypeList().isEmpty()) { + return true; + } + if (valueTimeseriesMetadataList != null) { + int incompactibleCount = 0; + for (int i = 0, size = getTsDataTypeList().size(); i < size; i++) { + TimeseriesMetadata valueTimeSeriesMetadata = valueTimeseriesMetadataList.get(i); + if (valueTimeSeriesMetadata != null + && !getTsDataTypeList() + .get(i) + .isCompatible(valueTimeSeriesMetadata.getTsDataType())) { + valueTimeseriesMetadataList.set(i, null); + incompactibleCount++; + } + } + return incompactibleCount != getTsDataTypeList().size(); + } + return true; + } + } + private void unpackUnseqTsFileResource() throws IOException { ITimeSeriesMetadata timeseriesMetadata = loadTimeSeriesMetadata(orderUtils.getNextUnseqFileResource(true), false); // skip if data type is mismatched which may be caused by delete - if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) { + if (timeseriesMetadata != null && typeCompatible(timeseriesMetadata)) { timeseriesMetadata.setSeq(false); unSeqTimeSeriesMetadata.add(timeseriesMetadata); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index b146b25c2abd..f6160b5e833f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; @@ -112,25 +113,39 @@ public static AlignedTVList newAlignedList(List dataTypes) { } } + private List convertToType(TSDataType to, TSDataType from, List originalValues) { + if (!to.isCompatible(from)) { + return null; + } + return originalValues.stream().map(o -> to.castFromArray(from, o)).collect(Collectors.toList()); + } + @Override public TVList getTvListByColumnIndex( - List columnIndex, List dataTypeList, boolean ignoreAllNullRows) { + List columnIndexList, List dataTypeList, boolean ignoreAllNullRows) { List> values = new ArrayList<>(); List> bitMaps = null; - for (int i = 0; i < columnIndex.size(); i++) { + for (int i = 0; i < columnIndexList.size(); i++) { // columnIndex == -1 means querying a non-exist column, add null column here - if (columnIndex.get(i) == -1) { + Integer columnIndex = columnIndexList.get(i); + if (columnIndex == -1) { values.add(null); } else { - values.add(this.values.get(columnIndex.get(i))); - if (this.bitMaps != null && this.bitMaps.get(columnIndex.get(i)) != null) { + List column = this.values.get(columnIndex); + if (dataTypeList.get(i) == this.dataTypes.get(columnIndex)) { + values.add(column); + } else { + values.add(convertToType(dataTypeList.get(i), this.dataTypes.get(columnIndex), column)); + } + + if (this.bitMaps != null && this.bitMaps.get(columnIndex) != null) { if (bitMaps == null) { - bitMaps = new ArrayList<>(columnIndex.size()); - for (int j = 0; j < columnIndex.size(); j++) { + bitMaps = new ArrayList<>(columnIndexList.size()); + for (int j = 0; j < columnIndexList.size(); j++) { bitMaps.add(null); } } - bitMaps.set(i, this.bitMaps.get(columnIndex.get(i))); + bitMaps.set(i, this.bitMaps.get(columnIndex)); } } } diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 794dff013a2d..9908843cec49 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -197,7 +197,7 @@ alterTableStatement | ALTER TABLE (IF EXISTS)? tableName=qualifiedName DROP COLUMN (IF EXISTS)? column=identifier #dropColumn // set TTL can use this | ALTER TABLE (IF EXISTS)? tableName=qualifiedName SET PROPERTIES propertyAssignments #setTableProperties - | ALTER TABLE (IF EXISTS)? tableName=qualifiedName ALTER COLUMN column=identifier (IF EXISTS)? SET DATA TYPE new_type=type #alterColumnDataType + | ALTER TABLE (IF EXISTS)? tableName=qualifiedName ALTER COLUMN (IF EXISTS)? column=identifier SET DATA TYPE new_type=type #alterColumnDataType ; diff --git a/pom.xml b/pom.xml index 28f72ddc9bcc..ee9194757a76 100644 --- a/pom.xml +++ b/pom.xml @@ -167,7 +167,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-241224-SNAPSHOT + 2.0.0-typeconv-SNAPSHOT - 2.2.0-SNAPSHOT + 2.2.0-250903-SNAPSHOT + 2.2.0-2511131-SNAPSHOT