From 29aabd1d7449918765c1b2de1d137ee18882b3b5 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Thu, 11 Jun 2026 19:55:52 +0800 Subject: [PATCH] [fix](cloud) Refresh base tablet before schema change V1 (#64312) Related PR: #62272 Problem Summary: Cloud schema change retries could keep registering a stale alter version (V1, the schema change boundary) after a cross-V1 compaction conflict. The retry path synced the base tablet only up to the request alter version, so a local base tablet whose max version already exceeded that version could skip refreshing newer visible rowsets. When a new schema-change tablet had a compaction rowset crossing the stale V1, every retry reused that V1 and the job eventually exhausted the retry limit. This change refreshes the base tablet without capping the sync at the request alter version before computing the schema change V1, allowing retries to register a fresh boundary. It also adds a docker regression case that blocks schema change, creates a cross-V1 retry, injects stale local max-version state for query-version sync, and verifies that the retry finishes after the base tablet is refreshed. ### Release note Fix cloud schema change retries repeatedly failing with a stale V1 boundary. 
--- be/src/cloud/cloud_schema_change_job.cpp | 3 +- be/src/cloud/cloud_tablet.cpp | 12 ++ .../cloud/cloud_schema_change_job_test.cpp | 78 +++++++++ ...paction_cross_v1_stale_base_refresh.groovy | 158 ++++++++++++++++++ 4 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 1107b7717db45f..ec1503ac88cf14 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -105,7 +105,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque } // MUST sync rowsets before capturing rowset readers and building DeleteHandler SyncOptions options; - options.query_version = request.alter_version; + // The SC boundary (V1) must be calculated from the latest visible rowsets of the base + // tablet. Do not cap this sync by request.alter_version, which may be stale across retries. 
RETURN_IF_ERROR(_base_tablet->sync_rowsets(options)); // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1] _output_cumulative_point = _base_tablet->cumulative_layer_point(); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index ee9b4b20e0b01a..0ac983898853a9 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -289,6 +289,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* st RETURN_IF_ERROR(sync_if_not_running(stats)); if (options.query_version > 0) { + DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", { + auto target_tablet_id = dp->param("tablet_id", -1); + auto stale_version = dp->param("version", -1); + if (target_tablet_id == tablet_id() && stale_version >= 0) { + std::unique_lock wlock(_meta_lock); + LOG(INFO) << "override cloud tablet local max_version for query_version sync" + << ", tablet_id=" << tablet_id() << ", old_max_version=" << _max_version + << ", stale_version=" << stale_version + << ", query_version=" << options.query_version; + _max_version = stale_version; + } + }); auto lock_start = std::chrono::steady_clock::now(); std::shared_lock rlock(_meta_lock); if (stats) { diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp b/be/test/cloud/cloud_schema_change_job_test.cpp index 972ff2af255ca8..74b27bed7ad934 100644 --- a/be/test/cloud/cloud_schema_change_job_test.cpp +++ b/be/test/cloud/cloud_schema_change_job_test.cpp @@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest, FillVersionHolesBeforeNewTabletRunning) { ASSERT_EQ(versions[1], Version(4, 4)); } +TEST_F(CloudSchemaChangeJobTest, RefreshBaseTabletBeforeRegisteringSchemaChangeJob) { + int64_t base_tablet_id = 50001; + int64_t new_tablet_id = 50002; + + TabletMetaSharedPtr base_meta(new TabletMeta( + 1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), 
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + TabletMetaSharedPtr new_meta(new TabletMeta( + 1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + + auto base_tablet = std::make_shared(_engine, std::move(base_meta)); + auto new_tablet = std::make_shared(_engine, std::move(new_meta)); + static_cast(new_tablet->set_tablet_state(TABLET_NOTREADY)); + + auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(), base_tablet_id, 2, 6); + auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(), base_tablet_id, 7, 10); + + auto* sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) { + auto tablet_id = try_any_cast(args[0]); + auto* meta_ptr = try_any_cast(args[1]); + if (tablet_id == base_tablet_id) { + *meta_ptr = base_tablet->tablet_meta(); + } else if (tablet_id == new_tablet_id) { + *meta_ptr = new_tablet->tablet_meta(); + } + try_any_cast_ret(args)->second = true; + }); + + int base_sync_count = 0; + sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome) { + auto* tablet = try_any_cast(outcome[0]); + if (tablet->tablet_id() == base_tablet_id) { + ++base_sync_count; + std::unique_lock lock(tablet->get_header_lock()); + if (base_sync_count == 1) { + tablet->add_rowsets({base_initial_rowset}, false, lock); + } else { + tablet->add_rowsets({base_latest_rowset}, false, lock); + } + } + auto* pairs = try_any_cast_ret(outcome); + pairs->second = true; + pairs->first = Status::OK(); + }); + + int64_t prepared_alter_version = -1; + sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) { + auto job = try_any_cast(outcome[0]); + ASSERT_TRUE(job.has_schema_change()); + prepared_alter_version = job.schema_change().alter_version(); + + auto* pairs = try_any_cast_ret(outcome); + pairs->second = true; 
+ pairs->first = Status::InternalError("mock job already success"); + + auto* resp = try_any_cast(outcome[1]); + resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS); + }); + + TAlterTabletReqV2 request; + request.base_tablet_id = base_tablet_id; + request.new_tablet_id = new_tablet_id; + request.alter_version = 4; + request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE); + + CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc", 9999999999); + auto status = sc_job.process_alter_tablet(request); + + ASSERT_TRUE(status.ok()) << status.to_string(); + ASSERT_EQ(base_sync_count, 2); + ASSERT_EQ(prepared_alter_version, 10); +} + // Test: cross-V1 compaction detected → abort SC job → return SC_COMPACTION_CONFLICT TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) { int64_t base_tablet_id = 10001; diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy new file mode 100644 index 00000000000000..679385562b56b3 --- /dev/null +++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Test: schema change retry refreshes base tablet rowsets before registering a fresh V1. +// +// Timeline: +// 1. Block SC, insert data, and let new tablet compaction create a cross-V1 rowset. +// 2. Force V1=6 once to prove the cross-V1 retry path is active. +// 3. Disable the V1 override, but inject stale local base max=6 only for query_version sync. +// 4. Retry must refresh the base tablet and finish with the latest V1. +// +// The stale-local-max debug point only fires when the caller uses SyncOptions.query_version. +// Old code capped SC base sync by request.alter_version, so it keeps V1 stale and cannot finish. +// The fixed path does not set query_version, so it refreshes from meta-service and succeeds. 
+ +import org.apache.doris.regression.suite.ClusterOptions + +suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') { + + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += ["enable_java_support=false"] + options.beConfigs += ["enable_new_tablet_do_compaction=true"] + options.beConfigs += ["alter_tablet_worker_count=1"] + options.beConfigs += ["cumulative_compaction_min_deltas=2"] + options.beNum = 1 + options.feConfigs += ["http_port=8030"] + options.feConfigs += ["rpc_port=9020"] + options.feConfigs += ["query_port=9030"] + options.feConfigs += ["edit_log_port=9010"] + options.feConfigs += ["enable_schema_change_retry=true"] + options.feConfigs += ["schema_change_max_retry_time=10"] + + docker(options) { + def getJobState = { + def result = sql """ + SHOW ALTER TABLE COLUMN + WHERE IndexName='sc_cross_v1_stale_base_refresh_test' + ORDER BY createtime DESC LIMIT 1 + """ + logger.info("getJobState: ${result}") + return result[0][9] + } + + sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test" + sql """ + CREATE TABLE sc_cross_v1_stale_base_refresh_test ( + k1 int NOT NULL, + v1 varchar(100) NOT NULL, + v2 int NOT NULL + ) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def tablets = sql_return_maparray "SHOW TABLETS FROM sc_cross_v1_stale_base_refresh_test" + assertEquals(1, tablets.size()) + def baseTabletId = tablets[0].TabletId.toString() + logger.info("base tablet id for stale refresh test: ${baseTabletId}") + + for (int i = 0; i < 3; i++) { + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES ") + for (int j = 0; j < 20; j++) { + if (j > 0) { + sb.append(", ") + } + def key = i * 20 + j + 1 + sb.append("(${key}, 'val_${key}', ${key * 10})") + } + sql sb.toString() + } + assertEquals(60L, (sql "SELECT count(*) FROM sc_cross_v1_stale_base_refresh_test")[0][0]) + + def scBlock = 
'CloudSchemaChangeJob::process_alter_tablet.block' + def overrideDP = 'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version' + def staleMaxDP = 'CloudTablet::sync_rowsets.stale_local_max_for_query_version' + + try { + GetDebugPoint().enableDebugPointForAllBEs(scBlock) + try { + sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY COLUMN v2 bigint" + sleep(10000) + assertEquals("RUNNING", getJobState()) + + for (int i = 0; i < 6; i++) { + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES ") + for (int j = 0; j < 10; j++) { + if (j > 0) { + sb.append(", ") + } + def key = 100 + i * 10 + j + 1 + sb.append("(${key}, 'new_${key}', ${key * 10})") + } + sql sb.toString() + } + + sleep(30000) + GetDebugPoint().enableDebugPointForAllBEs(overrideDP, [version: 6]) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(scBlock) + } + + sleep(10000) + assertEquals("RUNNING", getJobState(), + "SC should still be RUNNING while V1 is forced to cross the compacted rowset") + + GetDebugPoint().enableDebugPointForAllBEs( + staleMaxDP, [tablet_id: baseTabletId, version: 6]) + GetDebugPoint().disableDebugPointForAllBEs(overrideDP) + + int maxTries = 180 + def finalState = "" + while (maxTries-- > 0) { + finalState = getJobState() + if (finalState == "FINISHED" || finalState == "CANCELLED") { + break + } + sleep(1000) + } + + logger.info("SC final state after stale base refresh retry: ${finalState}") + assertEquals("FINISHED", finalState) + + assertEquals(120L, + (sql "SELECT count(*) FROM sc_cross_v1_stale_base_refresh_test")[0][0]) + + def columns = sql "DESC sc_cross_v1_stale_base_refresh_test" + def v2Col = columns.find { it[0] == "v2" } + assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"), + "v2 column should be bigint after schema change, got: ${v2Col[1]}") + + def backends = sql_return_maparray("SHOW BACKENDS") + assertTrue(backends.every { it.Alive.toString() == "true" }, + "BE 
should be alive after stale base refresh retry") + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +}