diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 1107b7717db45f..ec1503ac88cf14 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -105,7 +105,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque } // MUST sync rowsets before capturing rowset readers and building DeleteHandler SyncOptions options; - options.query_version = request.alter_version; + // The SC boundary (V1) must be calculated from the latest visible rowsets of the base + // tablet. Do not cap this sync by request.alter_version, which may be stale across retries. RETURN_IF_ERROR(_base_tablet->sync_rowsets(options)); // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1] _output_cumulative_point = _base_tablet->cumulative_layer_point(); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index ee9b4b20e0b01a..0ac983898853a9 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -289,6 +289,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* st RETURN_IF_ERROR(sync_if_not_running(stats)); if (options.query_version > 0) { + DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", { + auto target_tablet_id = dp->param("tablet_id", -1); + auto stale_version = dp->param("version", -1); + if (target_tablet_id == tablet_id() && stale_version >= 0) { + std::unique_lock wlock(_meta_lock); + LOG(INFO) << "override cloud tablet local max_version for query_version sync" + << ", tablet_id=" << tablet_id() << ", old_max_version=" << _max_version + << ", stale_version=" << stale_version + << ", query_version=" << options.query_version; + _max_version = stale_version; + } + }); auto lock_start = std::chrono::steady_clock::now(); std::shared_lock rlock(_meta_lock); if (stats) { diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp b/be/test/cloud/cloud_schema_change_job_test.cpp index 972ff2af255ca8..74b27bed7ad934 100644 --- a/be/test/cloud/cloud_schema_change_job_test.cpp +++ b/be/test/cloud/cloud_schema_change_job_test.cpp @@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest, FillVersionHolesBeforeNewTabletRunning) { ASSERT_EQ(versions[1], Version(4, 4)); } +TEST_F(CloudSchemaChangeJobTest, RefreshBaseTabletBeforeRegisteringSchemaChangeJob) { + int64_t base_tablet_id = 50001; + int64_t new_tablet_id = 50002; + + TabletMetaSharedPtr base_meta(new TabletMeta( + 1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + TabletMetaSharedPtr new_meta(new TabletMeta( + 1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + + auto base_tablet = std::make_shared(_engine, std::move(base_meta)); + auto new_tablet = std::make_shared(_engine, std::move(new_meta)); + static_cast(new_tablet->set_tablet_state(TABLET_NOTREADY)); + + auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(), base_tablet_id, 2, 6); + auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(), base_tablet_id, 7, 10); + + auto* sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) { + auto tablet_id = try_any_cast(args[0]); + auto* meta_ptr = try_any_cast(args[1]); + if (tablet_id == base_tablet_id) { + *meta_ptr = base_tablet->tablet_meta(); + } else if (tablet_id == new_tablet_id) { + *meta_ptr = new_tablet->tablet_meta(); + } + try_any_cast_ret(args)->second = true; + }); + + int base_sync_count = 0; + sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome) { + auto* tablet = try_any_cast(outcome[0]); + if (tablet->tablet_id() == base_tablet_id) { + ++base_sync_count; + std::unique_lock lock(tablet->get_header_lock()); + if (base_sync_count == 1) { + tablet->add_rowsets({base_initial_rowset}, false, lock); + } else { + tablet->add_rowsets({base_latest_rowset}, false, lock); + } + } + auto* pairs = try_any_cast_ret(outcome); + pairs->second = true; + pairs->first = Status::OK(); + }); + + int64_t prepared_alter_version = -1; + sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) { + auto job = try_any_cast(outcome[0]); + ASSERT_TRUE(job.has_schema_change()); + prepared_alter_version = job.schema_change().alter_version(); + + auto* pairs = try_any_cast_ret(outcome); + pairs->second = true; + pairs->first = Status::InternalError("mock job already success"); + + auto* resp = try_any_cast(outcome[1]); + resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS); + }); + + TAlterTabletReqV2 request; + request.base_tablet_id = base_tablet_id; + request.new_tablet_id = new_tablet_id; + request.alter_version = 4; + request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE); + + CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc", 9999999999); + auto status = sc_job.process_alter_tablet(request); + + ASSERT_TRUE(status.ok()) << status.to_string(); + ASSERT_EQ(base_sync_count, 2); + ASSERT_EQ(prepared_alter_version, 10); +} + // Test: cross-V1 compaction detected → abort SC job → return SC_COMPACTION_CONFLICT TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) { int64_t base_tablet_id = 10001; diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy new file mode 100644 index 00000000000000..679385562b56b3 --- /dev/null +++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Test: schema change retry refreshes base tablet rowsets before registering a fresh V1. +// +// Timeline: +// 1. Block SC, insert data, and let new tablet compaction create a cross-V1 rowset. +// 2. Force V1=6 once to prove the cross-V1 retry path is active. +// 3. Disable the V1 override, but inject stale local base max=6 only for query_version sync. +// 4. Retry must refresh the base tablet and finish with the latest V1. +// +// The stale-local-max debug point only fires when the caller uses SyncOptions.query_version. +// Old code capped SC base sync by request.alter_version, so it keeps V1 stale and cannot finish. +// The fixed path does not set query_version, so it refreshes from meta-service and succeeds. + +import org.apache.doris.regression.suite.ClusterOptions + +suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') { + + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += ["enable_java_support=false"] + options.beConfigs += ["enable_new_tablet_do_compaction=true"] + options.beConfigs += ["alter_tablet_worker_count=1"] + options.beConfigs += ["cumulative_compaction_min_deltas=2"] + options.beNum = 1 + options.feConfigs += ["http_port=8030"] + options.feConfigs += ["rpc_port=9020"] + options.feConfigs += ["query_port=9030"] + options.feConfigs += ["edit_log_port=9010"] + options.feConfigs += ["enable_schema_change_retry=true"] + options.feConfigs += ["schema_change_max_retry_time=10"] + + docker(options) { + def getJobState = { + def result = sql """ + SHOW ALTER TABLE COLUMN + WHERE IndexName='sc_cross_v1_stale_base_refresh_test' + ORDER BY createtime DESC LIMIT 1 + """ + logger.info("getJobState: ${result}") + return result[0][9] + } + + sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test" + sql """ + CREATE TABLE sc_cross_v1_stale_base_refresh_test ( + k1 int NOT NULL, + v1 varchar(100) NOT NULL, + v2 int NOT NULL + ) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def tablets = sql_return_maparray "SHOW TABLETS FROM sc_cross_v1_stale_base_refresh_test" + assertEquals(1, tablets.size()) + def baseTabletId = tablets[0].TabletId.toString() + logger.info("base tablet id for stale refresh test: ${baseTabletId}") + + for (int i = 0; i < 3; i++) { + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES ") + for (int j = 0; j < 20; j++) { + if (j > 0) { + sb.append(", ") + } + def key = i * 20 + j + 1 + sb.append("(${key}, 'val_${key}', ${key * 10})") + } + sql sb.toString() + } + assertEquals(60L, (sql "SELECT count(*) FROM sc_cross_v1_stale_base_refresh_test")[0][0]) + + def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block' + def overrideDP = 'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version' + def staleMaxDP = 'CloudTablet::sync_rowsets.stale_local_max_for_query_version' + + try { + GetDebugPoint().enableDebugPointForAllBEs(scBlock) + try { + sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY COLUMN v2 bigint" + sleep(10000) + assertEquals("RUNNING", getJobState()) + + for (int i = 0; i < 6; i++) { + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES ") + for (int j = 0; j < 10; j++) { + if (j > 0) { + sb.append(", ") + } + def key = 100 + i * 10 + j + 1 + sb.append("(${key}, 'new_${key}', ${key * 10})") + } + sql sb.toString() + } + + sleep(30000) + GetDebugPoint().enableDebugPointForAllBEs(overrideDP, [version: 6]) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(scBlock) + } + + sleep(10000) + assertEquals("RUNNING", getJobState(), + "SC should still be RUNNING while V1 is forced to cross the compacted rowset") + + GetDebugPoint().enableDebugPointForAllBEs( + staleMaxDP, [tablet_id: baseTabletId, version: 6]) + GetDebugPoint().disableDebugPointForAllBEs(overrideDP) + + int maxTries = 180 + def finalState = "" + while (maxTries-- > 0) { + finalState = getJobState() + if (finalState == "FINISHED" || finalState == "CANCELLED") { + break + } + sleep(1000) + } + + logger.info("SC final state after stale base refresh retry: ${finalState}") + assertEquals("FINISHED", finalState) + + assertEquals(120L, + (sql "SELECT count(*) FROM sc_cross_v1_stale_base_refresh_test")[0][0]) + + def columns = sql "DESC sc_cross_v1_stale_base_refresh_test" + def v2Col = columns.find { it[0] == "v2" } + assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"), + "v2 column should be bigint after schema change, got: ${v2Col[1]}") + + def backends = sql_return_maparray("SHOW BACKENDS") + assertTrue(backends.every { it.Alive.toString() == "true" }, + "BE should be alive after stale base refresh retry") + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +}