From aaf989796e69be2a3fe26ce7bc72005cf763ab75 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Tue, 19 May 2026 11:52:52 +0200 Subject: [PATCH 01/13] Add tests for TTL EXPORT partition (test-first) Tests describe the contract for the upcoming `TTL ... EXPORT TO db.table` action. They are added before the C++ implementation so they double as the acceptance criteria. Stateless (tests/queries/0_stateless): - 04206_ttl_export_partition_syntax: parser/metadata round-trip and rejection of (a) two EXPORT TTLs to the same destination and (b) EXPORT TTL on a table without a partition key. - 04207_ttl_export_partition_basic: happy path, plus an in-line assertion that a future-dated partition is not exported. - 04208_ttl_export_partition_skip_already_exported: re-triggering after a partition has been exported does not duplicate it. Integration (tests/integration/test_ttl_export_partition): - test_basic_to_iceberg, test_only_one_replica_submits, test_failure_and_backoff, test_serial_across_partitions, test_replica_restart_mid_export, test_modify_ttl_picks_up_with_materialize, test_disabled_replica, test_dedup_after_restart. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test_ttl_export_partition/__init__.py | 0 .../allow_experimental_export_partition.xml | 3 + .../disable_experimental_export_partition.xml | 3 + .../configs/named_collections.xml | 9 + .../configs/users.d/profile.xml | 8 + .../test_ttl_export_partition/test.py | 403 ++++++++++++++++++ ...4206_ttl_export_partition_syntax.reference | 2 + .../04206_ttl_export_partition_syntax.sql | 52 +++ ...04207_ttl_export_partition_basic.reference | 13 + .../04207_ttl_export_partition_basic.sh | 70 +++ ..._partition_skip_already_exported.reference | 6 + ..._export_partition_skip_already_exported.sh | 60 +++ 12 files changed, 629 insertions(+) create mode 100644 tests/integration/test_ttl_export_partition/__init__.py create mode 100644 tests/integration/test_ttl_export_partition/configs/allow_experimental_export_partition.xml create mode 100644 tests/integration/test_ttl_export_partition/configs/disable_experimental_export_partition.xml create mode 100644 tests/integration/test_ttl_export_partition/configs/named_collections.xml create mode 100644 tests/integration/test_ttl_export_partition/configs/users.d/profile.xml create mode 100644 tests/integration/test_ttl_export_partition/test.py create mode 100644 tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference create mode 100644 tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql create mode 100644 tests/queries/0_stateless/04207_ttl_export_partition_basic.reference create mode 100755 tests/queries/0_stateless/04207_ttl_export_partition_basic.sh create mode 100644 tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.reference create mode 100755 tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh diff --git a/tests/integration/test_ttl_export_partition/__init__.py b/tests/integration/test_ttl_export_partition/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git 
a/tests/integration/test_ttl_export_partition/configs/allow_experimental_export_partition.xml b/tests/integration/test_ttl_export_partition/configs/allow_experimental_export_partition.xml new file mode 100644 index 000000000000..514cd710836a --- /dev/null +++ b/tests/integration/test_ttl_export_partition/configs/allow_experimental_export_partition.xml @@ -0,0 +1,3 @@ + + 1 + diff --git a/tests/integration/test_ttl_export_partition/configs/disable_experimental_export_partition.xml b/tests/integration/test_ttl_export_partition/configs/disable_experimental_export_partition.xml new file mode 100644 index 000000000000..21be4dc894df --- /dev/null +++ b/tests/integration/test_ttl_export_partition/configs/disable_experimental_export_partition.xml @@ -0,0 +1,3 @@ + + 0 + diff --git a/tests/integration/test_ttl_export_partition/configs/named_collections.xml b/tests/integration/test_ttl_export_partition/configs/named_collections.xml new file mode 100644 index 000000000000..573822539c50 --- /dev/null +++ b/tests/integration/test_ttl_export_partition/configs/named_collections.xml @@ -0,0 +1,9 @@ + + + + http://minio1:9001/root/data + minio + ClickHouse_Minio_P@ssw0rd + + + diff --git a/tests/integration/test_ttl_export_partition/configs/users.d/profile.xml b/tests/integration/test_ttl_export_partition/configs/users.d/profile.xml new file mode 100644 index 000000000000..7427b889a5c5 --- /dev/null +++ b/tests/integration/test_ttl_export_partition/configs/users.d/profile.xml @@ -0,0 +1,8 @@ + + + + 3 + 1 + + + diff --git a/tests/integration/test_ttl_export_partition/test.py b/tests/integration/test_ttl_export_partition/test.py new file mode 100644 index 000000000000..38099b0045ae --- /dev/null +++ b/tests/integration/test_ttl_export_partition/test.py @@ -0,0 +1,403 @@ +"""Integration tests for `TTL ... EXPORT TO db.table`. + +The stateless suite covers parser, single-replica happy path and skip-already-exported. 
+This suite covers what can only be observed on a multi-replica cluster: ZK race, +failure injection with backoff, serial behaviour, restart recovery, ALTER pickup, +the disabled-replica case, and dedup across a high-water-mark lookup. +""" + +import logging +import time + +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.export_partition_helpers import ( + make_iceberg_s3, + make_rmt, + unique_suffix, + wait_for_exception_count, + wait_for_export_status, + wait_for_export_to_start, +) +from helpers.network import PartitionManager + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + common = dict( + main_configs=[ + "configs/named_collections.xml", + "configs/allow_experimental_export_partition.xml", + ], + user_configs=["configs/users.d/profile.xml"], + with_minio=True, + stay_alive=True, + with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], + ) + cluster.add_instance("replica1", **common) + cluster.add_instance("replica2", **common) + cluster.add_instance( + "replica_disabled", + main_configs=[ + "configs/named_collections.xml", + "configs/disable_experimental_export_partition.xml", + ], + user_configs=["configs/users.d/profile.xml"], + with_minio=True, + stay_alive=True, + with_zookeeper=True, + keeper_required_feature_flags=["multi_read"], + ) + logging.info("Starting cluster...") + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(autouse=True) +def drop_tables_after_test(cluster): + yield + for name, instance in cluster.instances.items(): + try: + tables = instance.query( + "SELECT name FROM system.tables WHERE database = 'default' FORMAT TabSeparated" + ).strip() + for table in tables.split("\n"): + table = table.strip() + if table: + instance.query(f"DROP TABLE IF EXISTS default.`{table}` SYNC") + except Exception as exc: + logging.warning("cleanup on %s failed: %s", name, exc) + + +# 
--------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def create_rmt_with_export_ttl(node, name, dst, interval="INTERVAL 1 DAY"): + """ReplicatedMergeTree with EXPORT TTL set at create time so parts get export_ttl info on write.""" + node.query( + f""" + CREATE TABLE {name} (event_date Date, id UInt64, year UInt16) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{database}}/{name}', '{node.name}') + PARTITION BY year ORDER BY id + TTL event_date + {interval} EXPORT TO {dst} + """ + ) + + +def create_s3_dst(node, name): + node.query( + f""" + CREATE TABLE {name} (event_date Date, id UInt64, year UInt16) + ENGINE = S3(s3_conn, filename='{name}', format=Parquet, partition_strategy='hive') + PARTITION BY year + """ + ) + + +def insert_expired_partition(node, name, year, ids): + values = ", ".join(f"(toDate('{year}-01-01'), {i}, {year})" for i in ids) + node.query(f"INSERT INTO {name} VALUES {values}") + + +def count_pending(node, src): + return int(node.query( + f"SELECT count() FROM system.replicated_partition_exports" + f" WHERE source_table = '{src}' AND status = 'PENDING'" + ).strip()) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_basic_to_iceberg(cluster): + """Iceberg destination: expired partition exports end to end, single replica.""" + node = cluster.instances["replica1"] + uid = unique_suffix() + src = f"src_{uid}" + dst = f"iceberg_{uid}" + + # Iceberg only accepts its own partition transforms (`toYearNumSinceEpoch`, not `toYear`) + # and only signed integer types, so source and destination must be created inline to match. 
+ make_iceberg_s3(node, dst, "event_date Date, id Int64", partition_by="toYearNumSinceEpoch(event_date)") + node.query( + f""" + CREATE TABLE {src} (event_date Date, id Int64) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{database}}/{src}', '{node.name}') + PARTITION BY toYearNumSinceEpoch(event_date) ORDER BY id + TTL event_date + INTERVAL 1 DAY EXPORT TO TABLE {dst} + """ + ) + insert_expired_partition(node, src, 2000, [1, 2, 3]) + + # `toYearNumSinceEpoch('2000-01-01')` = 2000 - 1970 = 30, so the partition_id is "30". + wait_for_export_status(node, src, dst, "30", "COMPLETED") + + assert int(node.query(f"SELECT count() FROM {dst}").strip()) == 3 + assert int(node.query(f"SELECT count() FROM {src}").strip()) == 3 # not dropped locally + + +def test_only_one_replica_submits(cluster): + """Both replicas have the TTL; only one ZK manifest is created.""" + r1 = cluster.instances["replica1"] + r2 = cluster.instances["replica2"] + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + # S3-engine tables are not replicated, so the destination must exist on every replica + # that creates the source — the DDL-time schema check resolves it locally. + create_s3_dst(r1, dst) + create_s3_dst(r2, dst) + create_rmt_with_export_ttl(r1, src, dst) + create_rmt_with_export_ttl(r2, src, dst) + + insert_expired_partition(r1, src, 2000, [1, 2]) + r2.query(f"SYSTEM SYNC REPLICA {src}") + + wait_for_export_status(r1, src, dst, "2000", "COMPLETED") + + # Each replica sees the manifest via ZK, but only one row exists in the history + # (keyed by partition_id, dest_db, dest_table). Both replicas report the same row. 
+ for node in (r1, r2): + count = int(node.query( + f"SELECT count() FROM system.replicated_partition_exports" + f" WHERE source_table = '{src}' AND destination_table = '{dst}' AND partition_id = '2000'" + ).strip()) + assert count == 1, f"{node.name}: expected exactly one entry, got {count}" + + +def test_failure_and_backoff(cluster): + """Block S3, watch retries accumulate, unblock, expect eventual COMPLETED.""" + node = cluster.instances["replica1"] + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(node, dst) + create_rmt_with_export_ttl(node, src, dst) + + with PartitionManager() as pm: + pm.add_rule({ + "instance": node, "destination": minio_ip, "protocol": "tcp", + "destination_port": minio_port, "action": "REJECT --reject-with tcp-reset", + }) + insert_expired_partition(node, src, 2000, [1, 2]) + + # Wait until the scheduler has attempted submission and recorded at least one failure. + # The TTL scheduler's own backoff is in-memory; we observe its retries through + # exception_count growing on the manifest. + wait_for_exception_count(node, src, dst, "2000", min_exception_count=2, timeout=60) + + # MinIO is reachable again; the next retry of the scheduler must succeed. + wait_for_export_status(node, src, dst, "2000", "COMPLETED", timeout=120) + + rows = int(node.query(f"SELECT count() FROM {dst}").strip()) + assert rows == 2 + + +def test_serial_across_partitions(cluster): + """While any partition is in flight (PENDING / FAILED), no other partition for the same + (source, dest) reaches PENDING. We force the in-flight to linger by blocking MinIO, + insert three eligible partitions, and assert at-most-one in flight throughout. 
+ """ + node = cluster.instances["replica1"] + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(node, dst) + create_rmt_with_export_ttl(node, src, dst) + + with PartitionManager() as pm: + pm.add_rule({ + "instance": node, "destination": minio_ip, "protocol": "tcp", + "destination_port": minio_port, "action": "REJECT --reject-with tcp-reset", + }) + + for year, ids in [(2000, [1, 2]), (2001, [3]), (2002, [4])]: + insert_expired_partition(node, src, year, ids) + + wait_for_export_to_start(node, src, dst, "2000", timeout=30) + + # Sample at 0.5s for 15s. With serial scheduling there must never be more than one + # in-flight entry for this (source, dest). Track the maximum observed instead of + # asserting per-sample so a single transient blip isn't the cause of a flake report. + observations = [] + deadline = time.time() + 15 + while time.time() < deadline: + observations.append(count_pending(node, src)) + time.sleep(0.5) + + assert observations, "no observations collected" + assert max(observations) <= 1, ( + f"observed multiple in-flight partitions: {observations}" + ) + + # Drain everything once MinIO returns. Only the trailing partition is guaranteed to be + # observable as COMPLETED — earlier ttl-origin markers are removed when the next one is + # submitted. The destination row count below verifies that all three actually exported. + wait_for_export_status(node, src, dst, "2002", "COMPLETED", timeout=120) + + assert int(node.query(f"SELECT count() FROM {dst}").strip()) == 4 + + +def test_replica_restart_mid_export(cluster): + """If the replica restarts mid-flight, the scheduler recovers current_export from ZK + and does not queue a second partition before the first completes. 
+ """ + node = cluster.instances["replica1"] + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(node, dst) + create_rmt_with_export_ttl(node, src, dst) + + with PartitionManager() as pm: + pm.add_rule({ + "instance": node, "destination": minio_ip, "protocol": "tcp", + "destination_port": minio_port, "action": "REJECT --reject-with tcp-reset", + }) + + insert_expired_partition(node, src, 2000, [1]) + insert_expired_partition(node, src, 2001, [2]) + wait_for_export_to_start(node, src, dst, "2000", timeout=30) + + node.restart_clickhouse() + + # Both partitions complete after MinIO is back and the scheduler has re-attached + # to the in-flight 2000 manifest from ZK. + wait_for_export_status(node, src, dst, "2000", "COMPLETED", timeout=120) + wait_for_export_status(node, src, dst, "2001", "COMPLETED", timeout=120) + + # No duplicate manifest for the trailing partition. The 2000 ttl-marker is removed + # when 2001 is submitted (the "at most one ttl-origin manifest per (src, dest)" invariant), + # so we check 2001, which is the latest and therefore not pruned. + n_2001 = int(node.query( + f"SELECT count() FROM system.replicated_partition_exports" + f" WHERE source_table = '{src}' AND destination_table = '{dst}' AND partition_id = '2001'" + ).strip()) + assert n_2001 == 1 + + +def test_modify_ttl_picks_up_with_materialize(cluster): + """ALTER MODIFY TTL adding an EXPORT TTL must take effect. + Parts that pre-date the TTL require MATERIALIZE TTL to populate per-part export info. + """ + node = cluster.instances["replica1"] + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(node, dst) + # No EXPORT TTL at create time. 
+ node.query( + f""" + CREATE TABLE {src} (event_date Date, id UInt64, year UInt16) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{database}}/{src}', 'r1') + PARTITION BY year ORDER BY id + """ + ) + insert_expired_partition(node, src, 2000, [1, 2]) + + # Add the TTL; old parts lack export_ttl[result_column] so the scheduler skips them. + # Disable the implicit materialise so we can demonstrate the explicit `MATERIALIZE TTL` + # back-fill below — otherwise `MODIFY TTL` populates the per-part info itself. + node.query( + f"ALTER TABLE {src} MODIFY TTL event_date + INTERVAL 1 DAY EXPORT TO {dst}", + settings={"materialize_ttl_after_modify": 0}, + ) + + # Confirm no export happens within one tick. + time.sleep(8) + assert int(node.query( + f"SELECT count() FROM system.replicated_partition_exports WHERE source_table = '{src}'" + ).strip()) == 0 + + # MATERIALIZE TTL back-fills per-part info; export becomes possible. + node.query(f"ALTER TABLE {src} MATERIALIZE TTL", settings={"mutations_sync": 2}) + wait_for_export_status(node, src, dst, "2000", "COMPLETED", timeout=60) + + assert int(node.query(f"SELECT count() FROM {dst}").strip()) == 2 + + +def test_disabled_replica(cluster): + """A replica with allow_experimental_export_merge_tree_partition=0 must not submit.""" + r_on = cluster.instances["replica1"] + r_off = cluster.instances["replica_disabled"] + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(r_on, dst) + create_s3_dst(r_off, dst) + create_rmt_with_export_ttl(r_on, src, dst) + create_rmt_with_export_ttl(r_off, src, dst) + + insert_expired_partition(r_on, src, 2000, [1]) + r_off.query(f"SYSTEM SYNC REPLICA {src}") + + wait_for_export_status(r_on, src, dst, "2000", "COMPLETED", timeout=60) + + # The disabled replica must not appear as the submitter (source_replica column). 
+ source_replica = r_on.query( + f"SELECT source_replica FROM system.replicated_partition_exports" + f" WHERE source_table = '{src}' AND destination_table = '{dst}' AND partition_id = '2000'" + ).strip() + assert source_replica != r_off.name, f"disabled replica submitted: {source_replica}" + + +def test_dedup_after_restart(cluster): + """Restart wipes in-memory scheduler state. The scheduler reads the latest ttl-origin + manifest from ZK on each tick and walks forward by `partition_id`, so already-exported + partitions are not re-submitted. + """ + node = cluster.instances["replica1"] + uid = unique_suffix() + src = f"src_{uid}" + dst = f"dst_{uid}" + + create_s3_dst(node, dst) + create_rmt_with_export_ttl(node, src, dst) + + insert_expired_partition(node, src, 2000, [1]) + insert_expired_partition(node, src, 2001, [2]) + # Only the trailing partition is guaranteed to be observable; earlier markers are pruned + # when the next one is submitted. + wait_for_export_status(node, src, dst, "2001", "COMPLETED", timeout=60) + + node.restart_clickhouse() + + # A fresh partition with a strictly newer expiration must still export, and 2000/2001 + # must not be re-submitted — the destination row count is the witness. + insert_expired_partition(node, src, 2002, [3]) + wait_for_export_status(node, src, dst, "2002", "COMPLETED", timeout=60) + + n_2002 = int(node.query( + f"SELECT count() FROM system.replicated_partition_exports" + f" WHERE source_table = '{src}' AND destination_table = '{dst}' AND partition_id = '2002'" + ).strip()) + assert n_2002 == 1, f"partition 2002 appears {n_2002} times (expected 1)" + + # Destination has rows from all three partitions, no duplicates. 
+ rows = int(node.query(f"SELECT count() FROM {dst}").strip()) + assert rows == 3 diff --git a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference new file mode 100644 index 000000000000..b7156fa9f148 --- /dev/null +++ b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference @@ -0,0 +1,2 @@ +TTL event_date + toIntervalDay(7) EXPORT TO ttl_export_dst +TTL event_date + toIntervalDay(7) EXPORT TO ttl_export_dst, event_date + toIntervalDay(30) diff --git a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql new file mode 100644 index 000000000000..515f62a0dd21 --- /dev/null +++ b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql @@ -0,0 +1,52 @@ +-- Tags: zookeeper, no-replicated-database +-- Parser and metadata round-trip for `TTL ... EXPORT TO db.table`, plus validation. + +DROP TABLE IF EXISTS ttl_export_src SYNC; +DROP TABLE IF EXISTS ttl_export_dst SYNC; + +CREATE TABLE ttl_export_dst (event_date Date, id UInt64) +ENGINE = MergeTree() PARTITION BY toYear(event_date) ORDER BY tuple(); + +-- 1. CREATE TABLE with an EXPORT TTL round-trips through system.tables. +CREATE TABLE ttl_export_src (event_date Date, id UInt64) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src', 'r1') +PARTITION BY toYear(event_date) +ORDER BY id +TTL event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; + +SELECT replaceRegexpOne(extract(create_table_query, 'TTL [^\n]+'), ' SETTINGS .*$', '') FROM system.tables +WHERE database = currentDatabase() AND name = 'ttl_export_src'; + +DROP TABLE ttl_export_src SYNC; + +-- 2. ALTER MODIFY TTL adds EXPORT alongside DELETE. 
+CREATE TABLE ttl_export_src (event_date Date, id UInt64) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src', 'r1') +PARTITION BY toYear(event_date) +ORDER BY id; + +ALTER TABLE ttl_export_src MODIFY TTL + event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst, + event_date + INTERVAL 30 DAY DELETE; + +SELECT replaceRegexpOne(extract(create_table_query, 'TTL [^\n]+'), ' SETTINGS .*$', '') FROM system.tables +WHERE database = currentDatabase() AND name = 'ttl_export_src'; + +DROP TABLE ttl_export_src SYNC; + +-- 3. Two EXPORT TTLs to the same destination must be rejected. +CREATE TABLE ttl_export_src (event_date Date, id UInt64) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src', 'r1') +PARTITION BY toYear(event_date) +ORDER BY id +TTL + event_date + INTERVAL 1 DAY EXPORT TO ttl_export_dst, + event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; -- { serverError BAD_ARGUMENTS } + +-- 4. EXPORT TTL on a table without a partition key must be rejected. 
+CREATE TABLE ttl_export_src (event_date Date, id UInt64) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src_nopk', 'r1') +ORDER BY id +TTL event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; -- { serverError BAD_ARGUMENTS } + +DROP TABLE IF EXISTS ttl_export_dst SYNC; diff --git a/tests/queries/0_stateless/04207_ttl_export_partition_basic.reference b/tests/queries/0_stateless/04207_ttl_export_partition_basic.reference new file mode 100644 index 000000000000..04e676e64715 --- /dev/null +++ b/tests/queries/0_stateless/04207_ttl_export_partition_basic.reference @@ -0,0 +1,13 @@ +---- destination contents +1 2000-01-01 +2 2000-01-02 +3 2000-01-03 +4 2001-01-01 +5 2001-01-02 +---- source still has the data (EXPORT does not drop locally) +6 +---- not-yet-expired partition 2100 is absent from destination and from history +0 +0 +---- no BAD_ARGUMENTS attributed to the scheduler +0 diff --git a/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh b/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh new file mode 100755 index 000000000000..6a9731a07bf1 --- /dev/null +++ b/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-parallel, no-replicated-database, replica +# Tag no-fasttest: requires S3 storage. +# Happy path for TTL EXPORT: expired partitions show up in destination, source data is preserved. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +src="ttl_export_basic_src_${RANDOM}" +dst="ttl_export_basic_dst_${RANDOM}" + +query() { $CLICKHOUSE_CLIENT --query "$1"; } + +poll_status() { + local partition="$1" + local expected="$2" + local deadline=$(( $(date +%s) + 90 )) + local s="" + while [ "$(date +%s)" -lt "$deadline" ]; do + s=$(query "SELECT status FROM system.replicated_partition_exports WHERE source_table = '$src' AND destination_table = '$dst' AND partition_id = '$partition'") + if [ "$s" = "$expected" ]; then return 0; fi + sleep 1 + done + echo "Timed out waiting for partition $partition to reach $expected (last: '$s')" >&2 + return 1 +} + +query "DROP TABLE IF EXISTS $src SYNC" +query "DROP TABLE IF EXISTS $dst SYNC" + +query "CREATE TABLE $dst (event_date Date, id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dst', format=Parquet, partition_strategy='hive') PARTITION BY year" + +# Source carries the EXPORT TTL from creation, so parts get export_ttl info populated at write time. +# This avoids the MATERIALIZE TTL requirement that applies to parts predating the TTL. +# Identity partition key on `year` so the hive S3 destination accepts it. +query "CREATE TABLE $src (event_date Date, id UInt64, year UInt16) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$src', 'r1') + PARTITION BY year ORDER BY id + TTL event_date + INTERVAL 1 DAY EXPORT TO $dst" + +query "INSERT INTO $src VALUES (toDate('2000-01-01'), 1, 2000), (toDate('2000-01-02'), 2, 2000), (toDate('2000-01-03'), 3, 2000)" +query "INSERT INTO $src VALUES (toDate('2001-01-01'), 4, 2001), (toDate('2001-01-02'), 5, 2001)" +# Far-future partition: must remain unexported because `event_date + INTERVAL 1 DAY` is well past now(). +query "INSERT INTO $src VALUES (toDate('2100-01-01'), 99, 2100)" + +# Only the trailing partition is guaranteed to be observable as COMPLETED — the 2000 +# ttl-marker is removed when 2001 is submitted ("at most one ttl-origin per (src, dest)"). 
+# The destination row check below confirms that both partitions actually exported. +poll_status 2001 COMPLETED + +# Give the scheduler one more tick after 2001 completes so a buggy "ignore the `< now()` check" +# implementation has the chance to pick 2100 up. Default poll interval is 5s + ~25% jitter. +sleep 8 + +echo "---- destination contents" +query "SELECT id, event_date FROM $dst ORDER BY id" + +echo "---- source still has the data (EXPORT does not drop locally)" +query "SELECT count() FROM $src" + +echo "---- not-yet-expired partition 2100 is absent from destination and from history" +query "SELECT count() FROM $dst WHERE year = 2100" +query "SELECT count() FROM system.replicated_partition_exports WHERE source_table = '$src' AND destination_table = '$dst' AND partition_id = '2100'" + +echo "---- no BAD_ARGUMENTS attributed to the scheduler" +query "SELECT count() FROM system.errors WHERE name = 'BAD_ARGUMENTS' AND last_error_message ILIKE '%${src}%'" + +query "DROP TABLE $src SYNC" +query "DROP TABLE $dst SYNC" diff --git a/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.reference b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.reference new file mode 100644 index 000000000000..aff0e91a230e --- /dev/null +++ b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.reference @@ -0,0 +1,6 @@ +---- after first export, destination row count +2 +---- after second export, destination row count +3 +---- partition 2000 has no duplicate rows in destination +2 diff --git a/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh new file mode 100755 index 000000000000..d8d6fb042d6d --- /dev/null +++ b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-parallel, no-replicated-database, replica +# Tag 
no-fasttest: requires S3 storage. +# An already-exported partition must not be re-exported on the next scheduler tick. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +src="ttl_export_skip_src_${RANDOM}" +dst="ttl_export_skip_dst_${RANDOM}" + +query() { $CLICKHOUSE_CLIENT --query "$1"; } + +poll_status() { + local partition="$1" + local expected="$2" + local deadline=$(( $(date +%s) + 90 )) + local s="" + while [ "$(date +%s)" -lt "$deadline" ]; do + s=$(query "SELECT status FROM system.replicated_partition_exports WHERE source_table = '$src' AND destination_table = '$dst' AND partition_id = '$partition'") + if [ "$s" = "$expected" ]; then return 0; fi + sleep 1 + done + echo "Timed out waiting for partition $partition to reach $expected (last: '$s')" >&2 + return 1 +} + +query "DROP TABLE IF EXISTS $src SYNC" +query "DROP TABLE IF EXISTS $dst SYNC" + +query "CREATE TABLE $dst (event_date Date, id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dst', format=Parquet, partition_strategy='hive') PARTITION BY year" + +# Identity partition key on `year` so the hive S3 destination accepts it. +query "CREATE TABLE $src (event_date Date, id UInt64, year UInt16) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$src', 'r1') + PARTITION BY year ORDER BY id + TTL event_date + INTERVAL 1 DAY EXPORT TO $dst" + +query "INSERT INTO $src VALUES (toDate('2000-01-01'), 1, 2000), (toDate('2000-01-02'), 2, 2000)" +poll_status 2000 COMPLETED + +echo "---- after first export, destination row count" +query "SELECT count() FROM $dst" + +# Add a second partition; it should export and the first must not export again. 
+query "INSERT INTO $src VALUES (toDate('2001-01-01'), 3, 2001)" +poll_status 2001 COMPLETED + +echo "---- after second export, destination row count" +query "SELECT count() FROM $dst" + +# The destination row count for partition 2000 is the witness that it was not re-exported: +# the ttl-marker for 2000 is removed when 2001 is submitted, so the history-count check is +# no longer meaningful under the new "at most one ttl-origin per (src, dest)" invariant. +echo "---- partition 2000 has no duplicate rows in destination" +query "SELECT count() FROM $dst WHERE year = 2000" + +query "DROP TABLE $src SYNC" +query "DROP TABLE $dst SYNC" From 399bd77f382fc1eb79cc41c98e4ffa49eef54878 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Tue, 19 May 2026 12:24:46 +0200 Subject: [PATCH 02/13] Add `EXPORT` TTL mode for partition export to a destination table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce parser, AST, and metadata plumbing for `TTL ... EXPORT TO db.table`. No background scheduler or per-part TTL info yet — those land in follow-up commits. The clause is recognised, round-trips through `SHOW CREATE TABLE`, and the resulting `TTLDescription` is collected into `TTLTableDescription`'s new `export_ttl` list (exposed via `StorageInMemoryMetadata::getExportTTLs`). Validation in `TTLTableDescription::getTTLForTableFromAST`: * reject two `EXPORT` clauses to the same destination, * reject `EXPORT` TTL on a table with no partition key. The destination-specific override of `TTLDescription::result_column` (`"_export_" + db + "." + table`) is required so that future per-part TTL info (keyed by `result_column`) keeps separate clocks per destination. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Databases/DatabasesCommon.cpp | 5 ++- src/Parsers/ASTTTLElement.cpp | 16 ++++++++ src/Parsers/CommonParsers.h | 1 + src/Parsers/ExpressionElementParsers.cpp | 16 ++++++++ src/Storages/AlterCommands.cpp | 4 +- .../ReplicatedMergeTreeTableMetadata.cpp | 9 +++-- .../MergeTree/registerStorageMergeTree.cpp | 2 +- src/Storages/StorageInMemoryMetadata.cpp | 10 +++++ src/Storages/StorageInMemoryMetadata.h | 4 ++ src/Storages/TTLDescription.cpp | 38 ++++++++++++++++++- src/Storages/TTLDescription.h | 18 ++++++++- src/Storages/TTLMode.h | 1 + 12 files changed, 113 insertions(+), 11 deletions(-) diff --git a/src/Databases/DatabasesCommon.cpp b/src/Databases/DatabasesCommon.cpp index 7932c4a69a9a..4274b21e6e33 100644 --- a/src/Databases/DatabasesCommon.cpp +++ b/src/Databases/DatabasesCommon.cpp @@ -129,12 +129,13 @@ void validateCreateQuery(const ASTCreateQuery & query, ContextPtr context) primary_key = KeyDescription::getKeyFromAST(storage.order_by->ptr(), columns_desc, context); if (storage.primary_key) primary_key = KeyDescription::getKeyFromAST(storage.primary_key->ptr(), columns_desc, context); + KeyDescription partition_key; if (storage.partition_by) - KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context); + partition_key = KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context); if (storage.sample_by) KeyDescription::getKeyFromAST(storage.sample_by->ptr(), columns_desc, context); if (storage.ttl_table && primary_key.has_value()) - TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, true); + TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, partition_key, true); } } diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index c6c8d1eb8a08..84597e2a2c1b 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ 
-79,6 +79,22 @@ void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settin ostr << " RECOMPRESS "; recompression_codec->format(ostr, settings, state, frame); } + else if (mode == TTLMode::EXPORT) + { + if (destination_type != DataDestinationType::TABLE) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Unsupported destination type {} for TTL EXPORT", + magic_enum::enum_name(destination_type)); + + ostr << " EXPORT TO "; + auto dot_pos = destination_name.find('.'); + if (dot_pos == String::npos) + ostr << backQuoteIfNeed(destination_name); + else + ostr << backQuoteIfNeed(std::string_view(destination_name).substr(0, dot_pos)) + << '.' + << backQuoteIfNeed(std::string_view(destination_name).substr(dot_pos + 1)); + } else if (mode == TTLMode::DELETE) { /// It would be better to output "DELETE" here but that will break compatibility with earlier versions. diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 29ea00e6d04f..101c6e899ee1 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -351,6 +351,7 @@ namespace DB MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ + MR_MACROS(EXPORT_TO, "EXPORT TO") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index aebd8d4b1094..82ca6f3695a8 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include @@ -2449,6 +2450,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_set(Keyword::SET); ParserKeyword s_recompress(Keyword::RECOMPRESS); ParserKeyword s_codec(Keyword::CODEC); + ParserKeyword s_export_to(Keyword::EXPORT_TO); ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL); ParserKeyword 
s_remove_ttl(Keyword::REMOVE_TTL); ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL); @@ -2496,6 +2498,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { mode = TTLMode::RECOMPRESS; } + else if (s_export_to.ignore(pos, expected)) + { + mode = TTLMode::EXPORT; + destination_type = DataDestinationType::TABLE; + } else { /// DELETE is the default mode. @@ -2547,6 +2554,15 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (!parser_codec.parse(pos, recompression_codec, expected)) return false; } + else if (mode == TTLMode::EXPORT) + { + String dst_database; + String dst_table; + if (!parseDatabaseAndTableName(pos, expected, dst_database, dst_table)) + return false; + + destination_name = dst_database.empty() ? dst_table : dst_database + "." + dst_table; + } auto ttl_element = make_intrusive(mode, destination_type, destination_name, if_exists); ttl_element->setTTL(std::move(ttl_expr)); diff --git a/src/Storages/AlterCommands.cpp b/src/Storages/AlterCommands.cpp index 8a926885f1f8..7872d6a2924f 100644 --- a/src/Storages/AlterCommands.cpp +++ b/src/Storages/AlterCommands.cpp @@ -870,7 +870,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context) else if (type == MODIFY_TTL) { metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST( - ttl, metadata.columns, context, metadata.primary_key, context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]); + ttl, metadata.columns, context, metadata.primary_key, metadata.partition_key, + context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]); } else if (type == REMOVE_TTL) { @@ -1393,6 +1394,7 @@ void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context metadata_copy.columns, context, metadata_copy.primary_key, + metadata_copy.partition_key, context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]); metadata = std::move(metadata_copy); diff --git 
a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp index dcbd9a0dde2e..32c4cc3c883b 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp @@ -412,9 +412,10 @@ bool ReplicatedMergeTreeTableMetadata::checkEquals( } auto parsed_primary_key = KeyDescription::parse(primary_key, columns, context, true); + auto parsed_partition_key = KeyDescription::parse(partition_key, columns, context, false); // Strict checking of suspicious TTL is not needed here String parsed_zk_ttl_table = formattedAST( - TTLTableDescription::parse(from_zk.ttl_table, columns, context, parsed_primary_key, /* is_attach = */ true).definition_ast); + TTLTableDescription::parse(from_zk.ttl_table, columns, context, parsed_primary_key, parsed_partition_key, /* is_attach = */ true).definition_ast); if (ttl_table != parsed_zk_ttl_table) { handleTableMetadataMismatch(table_name_for_error_message, "TTL", from_zk.ttl_table, parsed_zk_ttl_table, ttl_table, strict_check, logger); @@ -574,7 +575,8 @@ StorageInMemoryMetadata ReplicatedMergeTreeTableMetadata::Diff::getNewMetadata(c ParserTTLExpressionList parser; auto ttl_for_table_ast = parseQuery(parser, new_ttl_table, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS); new_metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST( - ttl_for_table_ast, new_metadata.columns, context, new_metadata.primary_key, true /* allow_suspicious; because it is replication */); + ttl_for_table_ast, new_metadata.columns, context, new_metadata.primary_key, new_metadata.partition_key, + true /* allow_suspicious; because it is replication */); } else /// TTL was removed { @@ -636,7 +638,8 @@ StorageInMemoryMetadata ReplicatedMergeTreeTableMetadata::Diff::getNewMetadata(c if (!ttl_table_changed && new_metadata.table_ttl.definition_ast != nullptr) new_metadata.table_ttl = 
TTLTableDescription::getTTLForTableFromAST( - new_metadata.table_ttl.definition_ast, new_metadata.columns, context, new_metadata.primary_key, true /* allow_suspicious; because it is replication */); + new_metadata.table_ttl.definition_ast, new_metadata.columns, context, new_metadata.primary_key, new_metadata.partition_key, + true /* allow_suspicious; because it is replication */); if (!projections_changed) { diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index e63605989cde..18eb62740458 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -710,7 +710,7 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (args.storage_def->ttl_table) { metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST( - args.storage_def->ttl_table->ptr(), metadata.columns, context, metadata.primary_key, allow_suspicious_ttl); + args.storage_def->ttl_table->ptr(), metadata.columns, context, metadata.primary_key, metadata.partition_key, allow_suspicious_ttl); } storage_settings->loadFromQuery(*args.storage_def, context, LoadingStrictnessLevel::ATTACH <= args.mode); diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index bb0e26c4cea0..b03106d22db4 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -358,6 +358,16 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const return !table_ttl.group_by_ttl.empty(); } +TTLDescriptions StorageInMemoryMetadata::getExportTTLs() const +{ + return table_ttl.export_ttl; +} + +bool StorageInMemoryMetadata::hasAnyExportTTL() const +{ + return !table_ttl.export_ttl.empty(); +} + ColumnDependencies StorageInMemoryMetadata::getColumnDependencies( const NameSet & updated_columns, bool include_ttl_target, diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h 
index aa577f8d02ff..ce3516713ca8 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -188,6 +188,10 @@ struct StorageInMemoryMetadata TTLDescriptions getGroupByTTLs() const; bool hasAnyGroupByTTL() const; + /// Wrapper for table TTLs, returns partition-export TTLs. + TTLDescriptions getExportTTLs() const; + bool hasAnyExportTTL() const; + using HasDependencyCallback = std::function; /// Returns columns, which will be needed to calculate dependencies (skip indices, projections, diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index 1bb3587b28c2..ff14737b85b1 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -354,6 +354,17 @@ TTLDescription TTLDescription::getTTLFromAST( } checkTTLExpression(expression, result.result_column, is_attach || context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]); + + if (result.mode == TTLMode::EXPORT) + { + /// The TTL expression's natural result column name is derived from the expression text and + /// is therefore identical across multiple EXPORT TTLs that share the same expression but + /// target different destinations. Override it with a destination-specific name so it can be + /// used as a stable key into `MergeTreeDataPartTTLInfos::export_ttl`. The override must + /// happen after `checkTTLExpression`, which looks the column up in the sample block. 
+ result.result_column = "_export_" + result.destination_name; + } + return result; } @@ -365,6 +376,7 @@ TTLTableDescription::TTLTableDescription(const TTLTableDescription & other) , move_ttl(other.move_ttl) , recompression_ttl(other.recompression_ttl) , group_by_ttl(other.group_by_ttl) + , export_ttl(other.export_ttl) { } @@ -383,6 +395,7 @@ TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription & move_ttl = other.move_ttl; recompression_ttl = other.recompression_ttl; group_by_ttl = other.group_by_ttl; + export_ttl = other.export_ttl; return *this; } @@ -392,6 +405,7 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, + const KeyDescription & partition_key, bool is_attach) { TTLTableDescription result; @@ -427,6 +441,21 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( { result.group_by_ttl.emplace_back(std::move(ttl)); } + else if (ttl.mode == TTLMode::EXPORT) + { + if (partition_key.column_names.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "TTL EXPORT TO {} requires the table to have a partition key", ttl.destination_name); + + for (const auto & existing : result.export_ttl) + { + if (existing.destination_name == ttl.destination_name) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Two TTL EXPORT clauses target the same destination {}", ttl.destination_name); + } + + result.export_ttl.emplace_back(std::move(ttl)); + } else { result.move_ttl.emplace_back(std::move(ttl)); @@ -436,7 +465,12 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( } TTLTableDescription TTLTableDescription::parse( - const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach) + const String & str, + const ColumnsDescription & columns, + ContextPtr context, + const KeyDescription & primary_key, + const KeyDescription & partition_key, + bool is_attach) 
{ TTLTableDescription result; if (str.empty()) @@ -446,7 +480,7 @@ TTLTableDescription TTLTableDescription::parse( ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS); FunctionNameNormalizer::visit(ast.get()); - return getTTLForTableFromAST(ast, columns, context, primary_key, is_attach); + return getTTLForTableFromAST(ast, columns, context, primary_key, partition_key, is_attach); } } diff --git a/src/Storages/TTLDescription.h b/src/Storages/TTLDescription.h index 59566d67f6ad..9e64f58b499a 100644 --- a/src/Storages/TTLDescription.h +++ b/src/Storages/TTLDescription.h @@ -131,15 +131,29 @@ struct TTLTableDescription TTLDescriptions group_by_ttl; + /// Per-partition export TTLs (`TTL ... EXPORT TO db.table`). + TTLDescriptions export_ttl; + TTLTableDescription() = default; TTLTableDescription(const TTLTableDescription & other); TTLTableDescription & operator=(const TTLTableDescription & other); static TTLTableDescription getTTLForTableFromAST( - const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach); + const ASTPtr & definition_ast, + const ColumnsDescription & columns, + ContextPtr context, + const KeyDescription & primary_key, + const KeyDescription & partition_key, + bool is_attach); /// Parse description from string - static TTLTableDescription parse(const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach); + static TTLTableDescription parse( + const String & str, + const ColumnsDescription & columns, + ContextPtr context, + const KeyDescription & primary_key, + const KeyDescription & partition_key, + bool is_attach); }; } diff --git a/src/Storages/TTLMode.h b/src/Storages/TTLMode.h index bbbdbee400ae..92f994a96b80 100644 --- a/src/Storages/TTLMode.h +++ b/src/Storages/TTLMode.h @@ -10,6 +10,7 @@ enum class TTLMode : uint8_t MOVE, GROUP_BY, RECOMPRESS, + 
EXPORT, }; } From 0fb5adbdcbb732dadcc834e7adbd02c75fbf975a Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Tue, 19 May 2026 12:58:54 +0200 Subject: [PATCH 03/13] Add per-part `export_ttl` info for partition export TTLs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stores per-part TTL info under `MergeTreeDataPartTTLInfos::export_ttl`, keyed by `TTLDescription::destination_name` (the destination-specific `result_column` override added in the previous commit is removed in favour of this key). The map is: * populated at write time in `MergeTreeDataWriter` for every TTL returned by `getExportTTLs`, * recomputed during `MATERIALIZE TTL` and merge-time TTL recompute via `TTLCalcTransform` / `TTLTransform` (a new `TTLUpdateField::EXPORT_TTL` finalizes into the right map), * serialized in JSON under the `"export"` key (mirroring the `recompression` entry), * propagated across merges through the existing `update` aggregation, * surfaced through `hasAnyNonFinishedTTLs` and `checkAllTTLCalculated` so old parts that predate the TTL are flagged for `MATERIALIZE TTL`. Adds the partition-wide helper `getPartitionExportTTLMax`: returns the max `export_ttl.max` across all parts of a partition, or `nullopt` if any part is missing the entry (with optional `missing_parts_out` for the scheduler to log). Deliberate: no on-the-fly evaluation — the user runs `ALTER TABLE ... MATERIALIZE TTL` to backfill, same UX as moves and recompression TTLs. Also pulls `export_ttl` into `hasAnyTableTTL` / `hasOnlyRowsTTL` and the `getColumnDependencies` TTL-column-set walk. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp | 4 ++ src/Processors/TTL/TTLUpdateInfoAlgorithm.h | 1 + .../Transforms/TTLCalcTransform.cpp | 5 ++ src/Processors/Transforms/TTLTransform.cpp | 5 ++ src/Storages/MergeTree/IMergeTreeDataPart.cpp | 6 +++ .../MergeTree/MergeTreeDataPartTTLInfo.cpp | 52 +++++++++++++++++++ .../MergeTree/MergeTreeDataPartTTLInfo.h | 20 ++++++- .../MergeTree/MergeTreeDataWriter.cpp | 3 ++ src/Storages/StorageInMemoryMetadata.cpp | 7 ++- src/Storages/TTLDescription.cpp | 11 ---- 10 files changed, 100 insertions(+), 14 deletions(-) diff --git a/src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp b/src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp index 2bf098a385df..3c6d1fc9ee3d 100644 --- a/src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp +++ b/src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp @@ -60,6 +60,10 @@ void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) cons data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info; data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max); } + else if (ttl_update_field == TTLUpdateField::EXPORT_TTL) + { + data_part->ttl_infos.export_ttl[ttl_update_key] = new_ttl_info; + } } diff --git a/src/Processors/TTL/TTLUpdateInfoAlgorithm.h b/src/Processors/TTL/TTLUpdateInfoAlgorithm.h index 52cd15095674..6991e120e88f 100644 --- a/src/Processors/TTL/TTLUpdateInfoAlgorithm.h +++ b/src/Processors/TTL/TTLUpdateInfoAlgorithm.h @@ -13,6 +13,7 @@ enum class TTLUpdateField : uint8_t MOVES_TTL, RECOMPRESSION_TTL, GROUP_BY_TTL, + EXPORT_TTL, }; /// Calculates new ttl_info and does nothing with data. 
diff --git a/src/Processors/Transforms/TTLCalcTransform.cpp b/src/Processors/Transforms/TTLCalcTransform.cpp index b23aef1bc705..f0a345be14b9 100644 --- a/src/Processors/Transforms/TTLCalcTransform.cpp +++ b/src/Processors/Transforms/TTLCalcTransform.cpp @@ -73,6 +73,11 @@ TTLCalcTransform::TTLCalcTransform( algorithms.emplace_back(std::make_unique( getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); + + for (const auto & export_ttl : metadata_snapshot_->getExportTTLs()) + algorithms.emplace_back(std::make_unique( + getExpressions(export_ttl, subqueries_for_sets, context), export_ttl, + TTLUpdateField::EXPORT_TTL, export_ttl.destination_name, old_ttl_infos.export_ttl[export_ttl.destination_name], current_time_, force_)); } void TTLCalcTransform::consume(Chunk chunk) diff --git a/src/Processors/Transforms/TTLTransform.cpp b/src/Processors/Transforms/TTLTransform.cpp index 1d1154f217ce..c3f42da87df3 100644 --- a/src/Processors/Transforms/TTLTransform.cpp +++ b/src/Processors/Transforms/TTLTransform.cpp @@ -146,6 +146,11 @@ TTLTransform::TTLTransform( algorithms.emplace_back(std::make_unique( getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); + + for (const auto & export_ttl : metadata_snapshot_->getExportTTLs()) + algorithms.emplace_back(std::make_unique( + getExpressions(export_ttl, subqueries_for_sets, context), export_ttl, + TTLUpdateField::EXPORT_TTL, export_ttl.destination_name, old_ttl_infos.export_ttl[export_ttl.destination_name], current_time_, force_)); } Block reorderColumns(Block block, const Block & header) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp 
b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 1d6ad1ef20a2..ad2a71497bbf 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2874,6 +2874,12 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada return false; } + for (const auto & export_desc : metadata_snapshot->getExportTTLs()) + { + if (!ttl_infos.export_ttl.contains(export_desc.destination_name)) + return false; + } + return true; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp index 4f169084b2f8..fcdd03a34833 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,9 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i for (const auto & [expression, ttl_info] : other_infos.moves_ttl) moves_ttl[expression].update(ttl_info); + for (const auto & [name, ttl_info] : other_infos.export_ttl) + export_ttl[name].update(ttl_info); + table_ttl.update(other_infos.table_ttl); updatePartMinMaxTTL(table_ttl.min, table_ttl.max); } @@ -140,6 +144,11 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) const JSON & rows_where = json["rows_where"]; fill_ttl_info_map(rows_where, rows_where_ttl, true); } + if (json.has("export")) + { + const JSON & export_part = json["export"]; + fill_ttl_info_map(export_part, export_ttl, false); + } } @@ -225,7 +234,13 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const } if (!rows_where_ttl.empty()) + { write_infos(rows_where_ttl, "rows_where", is_first); + is_first = false; + } + + if (!export_ttl.empty()) + write_infos(export_ttl, "export", is_first); writeString("}", out); } @@ -273,6 +288,9 @@ bool MergeTreeDataPartTTLInfos::hasAnyNonFinishedTTLs() const if (has_non_finished_ttl(group_by_ttl)) return true; + if 
(has_non_finished_ttl(export_ttl)) + return true; + return false; } @@ -333,5 +351,39 @@ std::optional selectTTLDescriptionForTTLInfos(const TTLDescripti return best_ttl_time ? *best_entry_it : std::optional(); } +std::optional getPartitionExportTTLMax( + const TTLDescription & desc, + const DataPartsVector & parts_in_partition, + std::vector * missing_parts_out) +{ + if (parts_in_partition.empty()) + return std::nullopt; + + time_t result = 0; + bool any_missing = false; + for (const auto & part : parts_in_partition) + { + const auto & map = part->ttl_infos.export_ttl; + auto it = map.find(desc.destination_name); + if (it == map.end()) + { + any_missing = true; + if (missing_parts_out) + missing_parts_out->push_back(part->name); + else + return std::nullopt; + continue; + } + + if (it->second.max > result) + result = it->second.max; + } + + if (any_missing) + return std::nullopt; + + return result; +} + } diff --git a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h index 9924b3ce6f06..42042499bbb4 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h +++ b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h @@ -53,6 +53,11 @@ struct MergeTreeDataPartTTLInfos TTLInfoMap group_by_ttl; + /// Per-partition export TTL info; keyed by `TTLDescription::destination_name`. + /// Two EXPORT TTLs sharing the same expression but targeting different destinations + /// would alias under `result_column`, so `destination_name` is used instead. 
+ TTLInfoMap export_ttl; + /// Return the smallest max recompression TTL value time_t getMinimalMaxRecompressionTTL() const; @@ -75,11 +80,24 @@ struct MergeTreeDataPartTTLInfos bool empty() const { /// part_min_ttl in minimum of rows, rows_where and group_by TTLs - return !part_min_ttl && moves_ttl.empty() && recompression_ttl.empty() && columns_ttl.empty() && rows_where_ttl.empty() && group_by_ttl.empty(); + return !part_min_ttl && moves_ttl.empty() && recompression_ttl.empty() && columns_ttl.empty() && rows_where_ttl.empty() && group_by_ttl.empty() && export_ttl.empty(); } }; /// Selects the most appropriate TTLDescription using TTL info and current time. std::optional selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max); +/// Returns the partition-wide max EXPORT TTL across all parts. Returns nullopt if any part lacks +/// an entry for `desc.destination_name` — such parts predate the TTL and require `ALTER TABLE ... +/// MATERIALIZE TTL` before the partition can be exported. Names of those parts are appended to +/// `missing_parts_out` (if non-null) so the caller can log them. 
+class IMergeTreeDataPart; +using DataPartPtr = std::shared_ptr; +using DataPartsVector = std::vector; + +std::optional getPartitionExportTTLMax( + const TTLDescription & desc, + const DataPartsVector & parts_in_partition, + std::vector * missing_parts_out = nullptr); + } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index a3583fc0d081..55ad2226763e 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -873,6 +873,9 @@ MergeTreeTemporaryPartPtr MergeTreeDataWriter::writeTempPartImpl( for (const auto & ttl_entry : recompression_ttl_entries) updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false); + for (const auto & ttl_entry : metadata_snapshot->getExportTTLs()) + updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.export_ttl[ttl_entry.destination_name], block, false); + new_data_part->ttl_infos.update(move_ttl_infos); /// This effectively chooses minimal compression method: diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index b03106d22db4..dd7012b7054e 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -289,12 +289,12 @@ TTLTableDescription StorageInMemoryMetadata::getTableTTLs() const bool StorageInMemoryMetadata::hasAnyTableTTL() const { - return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL(); + return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyExportTTL(); } bool StorageInMemoryMetadata::hasOnlyRowsTTL() const { - bool has_any_other_ttl = hasAnyMoveTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL(); + bool has_any_other_ttl = hasAnyMoveTTL() || 
hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL() || hasAnyExportTTL(); return hasRowsTTL() && !has_any_other_ttl; } @@ -447,6 +447,9 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies( for (const auto & entry : getMoveTTLs()) add_dependent_columns(entry.expression_columns.getNames(), required_ttl_columns); + for (const auto & entry : getExportTTLs()) + add_dependent_columns(entry.expression_columns.getNames(), required_ttl_columns); + //TODO what about rows_where_ttl and group_by_ttl ?? for (const auto & column : indices_columns) diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index ff14737b85b1..c19c62239eeb 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -354,17 +354,6 @@ TTLDescription TTLDescription::getTTLFromAST( } checkTTLExpression(expression, result.result_column, is_attach || context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]); - - if (result.mode == TTLMode::EXPORT) - { - /// The TTL expression's natural result column name is derived from the expression text and - /// is therefore identical across multiple EXPORT TTLs that share the same expression but - /// target different destinations. Override it with a destination-specific name so it can be - /// used as a stable key into `MergeTreeDataPartTTLInfos::export_ttl`. The override must - /// happen after `checkTTLExpression`, which looks the column up in the sample block. - result.result_column = "_export_" + result.destination_name; - } - return result; } From be213b777b84434702d6386e880fa4bd3fb50ee1 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 08:29:15 +0200 Subject: [PATCH 04/13] Switch TTL EXPORT syntax to `EXPORT TO TABLE` Align the TTL syntax with `ALTER ... EXPORT PARTITION TO TABLE`: the keyword is now `EXPORT TO TABLE ` instead of `EXPORT TO `. Parser, formatter, exception messages, and tests updated to match. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Parsers/ASTTTLElement.cpp | 2 +- src/Parsers/CommonParsers.h | 2 +- src/Parsers/ExpressionElementParsers.cpp | 4 ++-- src/Storages/TTLDescription.cpp | 2 +- src/Storages/TTLDescription.h | 2 +- tests/integration/test_ttl_export_partition/test.py | 6 +++--- .../04206_ttl_export_partition_syntax.reference | 4 ++-- .../04206_ttl_export_partition_syntax.sql | 12 ++++++------ .../0_stateless/04207_ttl_export_partition_basic.sh | 2 +- ...208_ttl_export_partition_skip_already_exported.sh | 2 +- 10 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index 84597e2a2c1b..b43ac15ef546 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -86,7 +86,7 @@ void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settin "Unsupported destination type {} for TTL EXPORT", magic_enum::enum_name(destination_type)); - ostr << " EXPORT TO "; + ostr << " EXPORT TO TABLE "; auto dot_pos = destination_name.find('.'); if (dot_pos == String::npos) ostr << backQuoteIfNeed(destination_name); diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 101c6e899ee1..6cae6f02ab02 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -351,7 +351,7 @@ namespace DB MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ - MR_MACROS(EXPORT_TO, "EXPORT TO") \ + MR_MACROS(EXPORT_TO_TABLE, "EXPORT TO TABLE") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 82ca6f3695a8..cf52edb1dc15 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -2450,7 +2450,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) 
ParserKeyword s_set(Keyword::SET); ParserKeyword s_recompress(Keyword::RECOMPRESS); ParserKeyword s_codec(Keyword::CODEC); - ParserKeyword s_export_to(Keyword::EXPORT_TO); + ParserKeyword s_export_to_table(Keyword::EXPORT_TO_TABLE); ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL); ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL); ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL); @@ -2498,7 +2498,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { mode = TTLMode::RECOMPRESS; } - else if (s_export_to.ignore(pos, expected)) + else if (s_export_to_table.ignore(pos, expected)) { mode = TTLMode::EXPORT; destination_type = DataDestinationType::TABLE; diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index c19c62239eeb..23ebbd55aaa6 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -434,7 +434,7 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( { if (partition_key.column_names.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, - "TTL EXPORT TO {} requires the table to have a partition key", ttl.destination_name); + "TTL EXPORT TO TABLE {} requires the table to have a partition key", ttl.destination_name); for (const auto & existing : result.export_ttl) { diff --git a/src/Storages/TTLDescription.h b/src/Storages/TTLDescription.h index 9e64f58b499a..196759fcff09 100644 --- a/src/Storages/TTLDescription.h +++ b/src/Storages/TTLDescription.h @@ -131,7 +131,7 @@ struct TTLTableDescription TTLDescriptions group_by_ttl; - /// Per-partition export TTLs (`TTL ... EXPORT TO db.table`). + /// Per-partition export TTLs (`TTL ... EXPORT TO TABLE db.table`). 
TTLDescriptions export_ttl; TTLTableDescription() = default; diff --git a/tests/integration/test_ttl_export_partition/test.py b/tests/integration/test_ttl_export_partition/test.py index 38099b0045ae..ad2a1a26eb02 100644 --- a/tests/integration/test_ttl_export_partition/test.py +++ b/tests/integration/test_ttl_export_partition/test.py @@ -1,4 +1,4 @@ -"""Integration tests for `TTL ... EXPORT TO db.table`. +"""Integration tests for `TTL ... EXPORT TO TABLE db.table`. The stateless suite covers parser, single-replica happy path and skip-already-exported. This suite covers what can only be observed on a multi-replica cluster: ZK race, @@ -87,7 +87,7 @@ def create_rmt_with_export_ttl(node, name, dst, interval="INTERVAL 1 DAY"): CREATE TABLE {name} (event_date Date, id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{database}}/{name}', '{node.name}') PARTITION BY year ORDER BY id - TTL event_date + {interval} EXPORT TO {dst} + TTL event_date + {interval} EXPORT TO TABLE {dst} """ ) @@ -323,7 +323,7 @@ def test_modify_ttl_picks_up_with_materialize(cluster): # Disable the implicit materialise so we can demonstrate the explicit `MATERIALIZE TTL` # back-fill below — otherwise `MODIFY TTL` populates the per-part info itself. 
node.query( - f"ALTER TABLE {src} MODIFY TTL event_date + INTERVAL 1 DAY EXPORT TO {dst}", + f"ALTER TABLE {src} MODIFY TTL event_date + INTERVAL 1 DAY EXPORT TO TABLE {dst}", settings={"materialize_ttl_after_modify": 0}, ) diff --git a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference index b7156fa9f148..0ff0ac51256c 100644 --- a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference +++ b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.reference @@ -1,2 +1,2 @@ -TTL event_date + toIntervalDay(7) EXPORT TO ttl_export_dst -TTL event_date + toIntervalDay(7) EXPORT TO ttl_export_dst, event_date + toIntervalDay(30) +TTL event_date + toIntervalDay(7) EXPORT TO TABLE ttl_export_dst +TTL event_date + toIntervalDay(7) EXPORT TO TABLE ttl_export_dst, event_date + toIntervalDay(30) diff --git a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql index 515f62a0dd21..5f65654e1a54 100644 --- a/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql +++ b/tests/queries/0_stateless/04206_ttl_export_partition_syntax.sql @@ -1,5 +1,5 @@ -- Tags: zookeeper, no-replicated-database --- Parser and metadata round-trip for `TTL ... EXPORT TO db.table`, plus validation. +-- Parser and metadata round-trip for `TTL ... EXPORT TO TABLE db.table`, plus validation. 
DROP TABLE IF EXISTS ttl_export_src SYNC; DROP TABLE IF EXISTS ttl_export_dst SYNC; @@ -12,7 +12,7 @@ CREATE TABLE ttl_export_src (event_date Date, id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src', 'r1') PARTITION BY toYear(event_date) ORDER BY id -TTL event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; +TTL event_date + INTERVAL 7 DAY EXPORT TO TABLE ttl_export_dst; SELECT replaceRegexpOne(extract(create_table_query, 'TTL [^\n]+'), ' SETTINGS .*$', '') FROM system.tables WHERE database = currentDatabase() AND name = 'ttl_export_src'; @@ -26,7 +26,7 @@ PARTITION BY toYear(event_date) ORDER BY id; ALTER TABLE ttl_export_src MODIFY TTL - event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst, + event_date + INTERVAL 7 DAY EXPORT TO TABLE ttl_export_dst, event_date + INTERVAL 30 DAY DELETE; SELECT replaceRegexpOne(extract(create_table_query, 'TTL [^\n]+'), ' SETTINGS .*$', '') FROM system.tables @@ -40,13 +40,13 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src', 'r1 PARTITION BY toYear(event_date) ORDER BY id TTL - event_date + INTERVAL 1 DAY EXPORT TO ttl_export_dst, - event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; -- { serverError BAD_ARGUMENTS } + event_date + INTERVAL 1 DAY EXPORT TO TABLE ttl_export_dst, + event_date + INTERVAL 7 DAY EXPORT TO TABLE ttl_export_dst; -- { serverError BAD_ARGUMENTS } -- 4. EXPORT TTL on a table without a partition key must be rejected. 
CREATE TABLE ttl_export_src (event_date Date, id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/ttl_export_src_nopk', 'r1') ORDER BY id -TTL event_date + INTERVAL 7 DAY EXPORT TO ttl_export_dst; -- { serverError BAD_ARGUMENTS } +TTL event_date + INTERVAL 7 DAY EXPORT TO TABLE ttl_export_dst; -- { serverError BAD_ARGUMENTS } DROP TABLE IF EXISTS ttl_export_dst SYNC; diff --git a/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh b/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh index 6a9731a07bf1..9e19cfd27b2f 100755 --- a/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh +++ b/tests/queries/0_stateless/04207_ttl_export_partition_basic.sh @@ -37,7 +37,7 @@ query "CREATE TABLE $dst (event_date Date, id UInt64, year UInt16) ENGINE = S3(s query "CREATE TABLE $src (event_date Date, id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$src', 'r1') PARTITION BY year ORDER BY id - TTL event_date + INTERVAL 1 DAY EXPORT TO $dst" + TTL event_date + INTERVAL 1 DAY EXPORT TO TABLE $dst" query "INSERT INTO $src VALUES (toDate('2000-01-01'), 1, 2000), (toDate('2000-01-02'), 2, 2000), (toDate('2000-01-03'), 3, 2000)" query "INSERT INTO $src VALUES (toDate('2001-01-01'), 4, 2001), (toDate('2001-01-02'), 5, 2001)" diff --git a/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh index d8d6fb042d6d..47e5f921b928 100755 --- a/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh +++ b/tests/queries/0_stateless/04208_ttl_export_partition_skip_already_exported.sh @@ -35,7 +35,7 @@ query "CREATE TABLE $dst (event_date Date, id UInt64, year UInt16) ENGINE = S3(s query "CREATE TABLE $src (event_date Date, id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$src', 'r1') PARTITION BY year ORDER BY id - TTL event_date + INTERVAL 1 DAY EXPORT TO 
$dst" + TTL event_date + INTERVAL 1 DAY EXPORT TO TABLE $dst" query "INSERT INTO $src VALUES (toDate('2000-01-01'), 1, 2000), (toDate('2000-01-02'), 2, 2000)" poll_status 2000 COMPLETED From 0664ab8bfa5867d5594040796b84cf8ed71b88cb Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 08:45:45 +0200 Subject: [PATCH 05/13] Add `export_origin` to partition export manifests Adds an `ExportOrigin` enum (`alter` | `ttl`) to the manifest body so manifests submitted manually (`ALTER ... EXPORT PARTITION`) can be told apart from manifests submitted by the upcoming TTL scheduler. Surfaced as `system.replicated_partition_exports.export_origin Enum8('alter' = 0, 'ttl' = 1)`. Existing manifests in ZooKeeper that don't carry the field read back as `alter` for backwards compatibility. `ttl`-origin manifests are skipped by manifest-TTL eviction: the background cleanup in `ExportPartitionManifestUpdatingTask` and the overwrite path in `StorageReplicatedMergeTree::exportPartitionToTable` both refuse to consider them expired. The existing `export_merge_tree_partition_force_export` setting still overrides via the unchanged gate. The write site keeps the default `ExportOrigin::alter`; the TTL submitter writes `ExportOrigin::ttl` in a follow-up commit. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ...xportReplicatedMergeTreePartitionManifest.h | 18 ++++++++++++++++++ .../ExportPartitionManifestUpdatingTask.cpp | 7 ++++++- src/Storages/StorageReplicatedMergeTree.cpp | 4 +++- ...StorageSystemReplicatedPartitionExports.cpp | 8 ++++++++ .../StorageSystemReplicatedPartitionExports.h | 2 ++ 5 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index dd5ef9886ded..d020ba755a0b 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -11,6 +11,15 @@ namespace DB { +/// Distinguishes manifests submitted by manual `ALTER ... EXPORT PARTITION` from those +/// submitted by the TTL scheduler. Persisted in the manifest body and surfaced through +/// `system.replicated_partition_exports.export_origin`. +enum class ExportOrigin : int8_t +{ + alter = 0, + ttl = 1, +}; + struct ExportReplicatedMergeTreePartitionProcessingPartEntry { @@ -121,6 +130,7 @@ struct ExportReplicatedMergeTreePartitionManifest String filename_pattern; bool write_full_path_in_iceberg_metadata = false; String iceberg_metadata_json; + ExportOrigin export_origin = ExportOrigin::alter; std::string toJsonString() const { @@ -154,6 +164,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("ttl_seconds", ttl_seconds); json.set("task_timeout_seconds", task_timeout_seconds); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); + json.set("export_origin", String(magic_enum::enum_name(export_origin))); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -208,6 +219,13 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.write_full_path_in_iceberg_metadata = json->getValue("write_full_path_in_iceberg_metadata"); + 
/// Manifests written before this field existed default to `alter`. + if (json->has("export_origin")) + { + if (auto parsed = magic_enum::enum_cast(json->getValue("export_origin"))) + manifest.export_origin = *parsed; + } + return manifest; } }; diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 0ed8d1033135..db8d3da8d377 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -56,7 +56,10 @@ namespace auto & entries_by_key ) { - bool has_expired = metadata.create_time < now - static_cast(metadata.ttl_seconds); + /// Manifests submitted by the TTL scheduler are durable by design: the scheduler relies on the + /// last manifest for `(src, dest)` to know where to resume, so manifest-TTL eviction must skip them. + bool has_expired = metadata.export_origin != ExportOrigin::ttl + && metadata.create_time < now - static_cast(metadata.ttl_seconds); bool task_timed_out = is_pending && metadata.task_timeout_seconds > 0 @@ -545,6 +548,7 @@ std::vector ExportPartitionManifestUpdatingTask:: info.last_exception = last_exception; info.exception_part = exception_part; info.exception_count = exception_count; + info.export_origin = metadata.export_origin; infos.emplace_back(std::move(info)); } @@ -572,6 +576,7 @@ std::vector ExportPartitionManifestUpdatingTask:: info.parts_to_do = entry.manifest.parts.size(); info.parts = entry.manifest.parts; info.status = magic_enum::enum_name(entry.status); + info.export_origin = entry.manifest.export_origin; infos.emplace_back(std::move(info)); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6bf954a2bd89..07d27bcbba34 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8365,7 +8365,9 @@ void 
StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & LOG_INFO(log, "Export with key {} has expiration time {}, now is {}", export_key, expiration_time, now); - if (static_cast(expiration_time) < now) + /// TTL-origin manifests are never considered expired here — only `export_merge_tree_partition_force_export` + /// can overwrite them. Conflict semantics for the alter-vs-ttl interplay are handled separately. + if (manifest.export_origin != ExportOrigin::ttl && static_cast(expiration_time) < now) { has_expired = true; } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index e088e4f77214..396164fa03e0 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include "Columns/ColumnString.h" @@ -42,6 +43,12 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio {"last_exception", std::make_shared(), "Last exception message of any part (not necessarily the last global exception)"}, {"exception_part", std::make_shared(), "Part that caused the last exception"}, {"exception_count", std::make_shared(), "Number of global exceptions"}, + {"export_origin", + std::make_shared(DataTypeEnum8::Values{ + {String(magic_enum::enum_name(ExportOrigin::alter)), static_cast(ExportOrigin::alter)}, + {String(magic_enum::enum_name(ExportOrigin::ttl)), static_cast(ExportOrigin::ttl)}, + }), + "Submitter of the manifest: `alter` for `ALTER ... 
EXPORT PARTITION`, `ttl` for the TTL scheduler."}, }; } @@ -144,6 +151,7 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu res_columns[i++]->insert(info.last_exception); res_columns[i++]->insert(info.exception_part); res_columns[i++]->insert(info.exception_count); + res_columns[i++]->insert(static_cast(info.export_origin)); } } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index 15eb54f38c0e..e27545b5b584 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace DB @@ -24,6 +25,7 @@ struct ReplicatedPartitionExportInfo std::string last_exception; std::string exception_part; size_t exception_count = 0; + ExportOrigin export_origin = ExportOrigin::alter; }; class StorageSystemReplicatedPartitionExports final : public IStorageSystemOneBlock From 97d6ae15773ecae70c330bb4d8d4e65db872aa91 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 09:39:08 +0200 Subject: [PATCH 06/13] Add `export_merge_tree_partition_mark_as_ttl` and ttl-marker invariant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new query-level setting `export_merge_tree_partition_mark_as_ttl` (default false). When set on `ALTER ... EXPORT PARTITION`, the resulting manifest is written with `export_origin = ttl` (same as what the TTL scheduler will write in a follow-up). The TTL scheduler always sets this implicitly when it submits. Enforces the "at most one ttl-origin manifest per (src, dest)" invariant at submission time: when a ttl-origin manifest is being created, scan siblings under `/exports/` for an existing ttl-origin marker at a different `partition_id`. 
If one is found at `P_old` and the new partition sorts before it (`new < P_old`, i.e. a back-fill), reject the submission unless `export_merge_tree_partition_force_export` is set; in every other case where a marker is found, best-effort `tryRemoveRecursive` the old marker before creating the new one. Same-key collisions continue to be handled by the existing block. A plain `alter` over a ttl marker at a different partition is allowed without friction — alter manifests coexist with the ttl marker, and the TTL scheduler will filter by `export_origin = ttl` when reading its own state. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Core/Settings.cpp | 3 + src/Storages/StorageReplicatedMergeTree.cpp | 67 ++++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index b292429b836f..f24ee27bda64 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7531,6 +7531,9 @@ Overwrite file if it already exists when exporting a merge tree part )", 0) \ DECLARE(Bool, export_merge_tree_partition_force_export, false, R"( Ignore existing partition export and overwrite the zookeeper entry +)", 0) \ + DECLARE(Bool, export_merge_tree_partition_mark_as_ttl, false, R"( +When set on `ALTER ... EXPORT PARTITION`, marks the manifest with `export_origin = 'ttl'` so it is treated as if submitted by the TTL scheduler: it is exempt from manifest-TTL eviction and participates in the cross-partition ordering check against other ttl-origin manifests. The TTL scheduler always sets this implicitly when it submits.
)", 0) \ DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"( Maximum number of retries for exporting a merge tree part in an export partition task diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 07d27bcbba34..6a42fabf4d28 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -213,6 +213,7 @@ namespace Setting extern const SettingsBool update_sequential_consistency; extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsBool export_merge_tree_partition_force_export; + extern const SettingsBool export_merge_tree_partition_mark_as_ttl; extern const SettingsUInt64 export_merge_tree_partition_max_retries; extern const SettingsUInt64 export_merge_tree_partition_manifest_ttl; extern const SettingsUInt64 export_merge_tree_partition_task_timeout_seconds; @@ -8336,10 +8337,73 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & const auto exports_path = fs::path(zookeeper_path) / "exports"; - const auto export_key = partition_id + "_" + dest_storage_id.getQualifiedName().getFullName(); + const auto dest_full_name = dest_storage_id.getQualifiedName().getFullName(); + const auto export_key = partition_id + "_" + dest_full_name; const auto partition_exports_path = fs::path(exports_path) / export_key; + const auto new_export_origin = query_context->getSettingsRef()[Setting::export_merge_tree_partition_mark_as_ttl] + ? ExportOrigin::ttl : ExportOrigin::alter; + const bool force_export = query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export]; + + /// Maintain the "at most one ttl-origin manifest per (src, dest)" invariant: when submitting a ttl-origin + /// manifest, locate the current ttl marker (if any) at a different partition_id and either reject + /// (back-fill without force) or best-effort remove it before creating the new manifest. 
Same-partition + /// collisions are handled by the block below. + if (new_export_origin == ExportOrigin::ttl) + { + const auto dest_suffix = "_" + dest_full_name; + std::vector sibling_children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + zookeeper->tryGetChildren(exports_path, sibling_children); + + std::optional existing_ttl_partition_id; + fs::path existing_ttl_marker_path; + for (const auto & child : sibling_children) + { + if (!child.ends_with(dest_suffix)) + continue; + const String child_partition_id = child.substr(0, child.size() - dest_suffix.size()); + if (child_partition_id == partition_id) + continue; + + std::string metadata_json; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + if (!zookeeper->tryGet(fs::path(exports_path) / child / "metadata.json", metadata_json)) + continue; + + const auto sibling = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); + if (sibling.export_origin != ExportOrigin::ttl) + continue; + + existing_ttl_partition_id = child_partition_id; + existing_ttl_marker_path = fs::path(exports_path) / child; + break; + } + + if (existing_ttl_partition_id) + { + if (partition_id < *existing_ttl_partition_id && !force_export) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "TTL-origin export of partition {} to {} would move the ttl marker backwards " + "(current marker is at partition {}). 
" + "Set `export_merge_tree_partition_force_export` to allow this.", + partition_id, dest_full_name, *existing_ttl_partition_id); + } + + LOG_INFO(log, + "Replacing ttl-origin marker for {} (partition {} -> {})", + dest_full_name, *existing_ttl_partition_id, partition_id); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); + zookeeper->tryRemoveRecursive(existing_ttl_marker_path); + } + } + /// check if entry already exists ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); @@ -8469,6 +8533,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value; manifest.write_full_path_in_iceberg_metadata = query_context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]; + manifest.export_origin = new_export_origin; if (dest_storage->isDataLake()) { From 77df5e6db4f103e1bc1ac5183aca84e2b0eb696a Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 09:51:22 +0200 Subject: [PATCH 07/13] Verify export destination compatibility at TTL DDL time Extracts the readable-vs-insertable column diff and the partition-key AST compare from `StorageReplicatedMergeTree::exportPartitionToTable` into `ExportPartitionUtils::verifyExportDestinationCompatibility`, and calls it from `TTLTableDescription::getTTLForTableFromAST` for every `TTLMode::EXPORT` clause when not attaching. The destination is resolved through `DatabaseCatalog::getTable`, matching the manual `ALTER ... EXPORT PARTITION` flow (throws `UNKNOWN_TABLE` if missing). 
The check is skipped under `is_attach=true` because the destination table may not yet be loaded at server startup; submission-time validation in `exportPartitionToTable` still covers that path. Iceberg destinations skip the partition-key AST compare here; the existing `verifyIcebergPartitionCompatibility` runs against the runtime iceberg metadata at submission time. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MergeTree/ExportPartitionUtils.cpp | 25 +++++++++++++++++++ src/Storages/MergeTree/ExportPartitionUtils.h | 17 ++++++++++++- .../MergeTree/registerStorageMergeTree.cpp | 4 ++- src/Storages/StorageReplicatedMergeTree.cpp | 22 ++++------------ src/Storages/TTLDescription.cpp | 18 +++++++++++++ 5 files changed, 67 insertions(+), 19 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index da11b5a11bbd..61251827cba9 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -3,8 +3,11 @@ #include #include #include +#include +#include "Storages/ColumnsDescription.h" #include "Storages/ExportReplicatedMergeTreePartitionManifest.h" #include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h" +#include "Storages/StorageInMemoryMetadata.h" #include #include #include @@ -32,6 +35,7 @@ namespace ErrorCodes { extern const int FAULT_INJECTED; extern const int BAD_ARGUMENTS; + extern const int INCOMPATIBLE_COLUMNS; extern const int NO_SUCH_DATA_PART; extern const int CORRUPTED_DATA; extern const int NETWORK_ERROR; @@ -47,6 +51,27 @@ namespace fs = std::filesystem; namespace ExportPartitionUtils { + void verifyExportDestinationCompatibility( + const ColumnsDescription & src_columns, + const ASTPtr & src_partition_key_ast, + const StorageInMemoryMetadata & dest_metadata, + const IStorage & dest_storage) + { + if (src_columns.getReadable().sizeOfDifference(dest_metadata.getColumns().getInsertable())) + throw 
Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + + if (dest_storage.isDataLake()) + return; + + const auto ast_to_string = [](const ASTPtr & ast) -> String + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + if (ast_to_string(src_partition_key_ast) != ast_to_string(dest_metadata.getPartitionKeyAST())) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); + } + std::vector getPartitionValuesForIcebergCommit( MergeTreeData & storage, const String & partition_id) { diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index 411f3b5224be..c6cccfee5ad0 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -6,22 +6,37 @@ #include #include #include +#include #include "Storages/IStorage.h" #include #if USE_AVRO -#include #include #endif namespace DB { +class ColumnsDescription; class MergeTreeData; +struct StorageInMemoryMetadata; struct ExportReplicatedMergeTreePartitionManifest; namespace ExportPartitionUtils { + /// Verifies that the destination table is structurally compatible with the source so that + /// `EXPORT PARTITION` (manual or TTL-driven) can succeed: + /// - source readable columns must equal destination insertable columns (ephemeral columns excluded); + /// - for non-data-lake destinations, the partition key ASTs must match; + /// - for data-lake destinations, partition-key compatibility is verified later at submission time + /// by `verifyIcebergPartitionCompatibility` (it needs the runtime iceberg metadata). + /// Throws `INCOMPATIBLE_COLUMNS` or `BAD_ARGUMENTS` on mismatch. 
+ void verifyExportDestinationCompatibility( + const ColumnsDescription & src_columns, + const ASTPtr & src_partition_key_ast, + const StorageInMemoryMetadata & dest_metadata, + const IStorage & dest_storage); + std::vector getExportedPaths(const LoggerPtr & log, const zkutil::ZooKeeperPtr & zk, const std::string & export_path); ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest); diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 18eb62740458..795a1b2be97e 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -709,8 +709,10 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (args.storage_def->ttl_table) { + /// Local (query) context carries the user's `current_database`, which the EXPORT TTL + /// destination resolution needs to honour `EXPORT TO TABLE `. 
metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST( - args.storage_def->ttl_table->ptr(), metadata.columns, context, metadata.primary_key, metadata.partition_key, allow_suspicious_ttl); + args.storage_def->ttl_table->ptr(), metadata.columns, args.getLocalContext(), metadata.primary_key, metadata.partition_key, allow_suspicious_ttl); } storage_settings->loadFromQuery(*args.storage_def, context, LoadingStrictnessLevel::ATTACH <= args.mode); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6a42fabf4d28..55640218e061 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8309,27 +8309,15 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & if (!dest_storage->supportsImport(query_context)) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName()); - auto query_to_string = [] (const ASTPtr & ast) - { - return ast ? ast->formatWithSecretsOneLine() : ""; - }; - auto src_snapshot = getInMemoryMetadataPtr(); auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); - /// compare all source readable columns with all destination insertable columns - /// this allows us to skip ephemeral columns - if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable())) - throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + ExportPartitionUtils::verifyExportDestinationCompatibility( + src_snapshot->getColumns(), + src_snapshot->getPartitionKeyAST(), + *destination_snapshot, + *dest_storage); - /// for data lakes this check is performed later. It is a bit more complex as we need to convert the iceberg partition spec - /// to the MergeTree partition spec and compare the two. 
- if (!dest_storage->isDataLake()) - { - if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); - } - zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly(); diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index 23ebbd55aaa6..d1ba08760727 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -2,8 +2,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -15,6 +17,8 @@ #include #include #include +#include +#include #include #include @@ -443,6 +447,20 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( "Two TTL EXPORT clauses target the same destination {}", ttl.destination_name); } + /// Skip on ATTACH because the destination table may not yet be loaded at startup. + /// Submission-time validation in `exportPartitionToTable` still covers this path. + if (!is_attach) + { + const auto qualified = QualifiedTableName::parseFromString(ttl.destination_name); + const auto dest_database = context->resolveDatabase(qualified.database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, qualified.table}, context); + ExportPartitionUtils::verifyExportDestinationCompatibility( + columns, + partition_key.definition_ast, + *dest_storage->getInMemoryMetadataPtr(), + *dest_storage); + } + result.export_ttl.emplace_back(std::move(ttl)); } else From 7a4acf4106c08ba1595a07c0d4705036242860d4 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 10:52:01 +0200 Subject: [PATCH 08/13] Add `TTLExportScheduler` + table-level backoff settings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces `TTLExportScheduler`, a per-`StorageReplicatedMergeTree` background driver that submits partition exports for tables with `TTL ... 
EXPORT TO TABLE db.table`. The scheduler is stateless across restarts: it reads the latest `export_origin = ttl` manifest from ZooKeeper on every tick and acts on its status — no manifest → submit the smallest eligible partition; PENDING → wait; COMPLETED → walk forward to `partition_id > completed`; FAILED → resubmit with `force_export=1` after per-partition exponential backoff; KILLED → idle with a `LOG_WARNING` carrying the recovery recipe. `submit` classifies outcomes as `Submitted | Transient | Failure` so ZK CAS races and `UNKNOWN_TABLE` (destination dropped post-DDL) do not bump backoff, while genuine submission errors do. Adds three table-level settings used by the scheduler: `export_merge_tree_partition_ttl_poll_interval_seconds` (default 5), `export_merge_tree_partition_ttl_min_backoff_seconds` (default 1), `export_merge_tree_partition_ttl_max_backoff_seconds` (default 60). The scheduler is not yet wired into the background task pool; that follows in a separate commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Storages/MergeTree/MergeTreeSettings.cpp | 13 + src/Storages/MergeTree/TTLExportScheduler.cpp | 306 ++++++++++++++++++ src/Storages/MergeTree/TTLExportScheduler.h | 87 +++++ src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 407 insertions(+) create mode 100644 src/Storages/MergeTree/TTLExportScheduler.cpp create mode 100644 src/Storages/MergeTree/TTLExportScheduler.h diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index a6ed4d9ec463..b2d95d8a4001 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -1657,6 +1657,19 @@ namespace ErrorCodes DECLARE(Bool, materialize_ttl_recalculate_only, false, R"( Only recalculate ttl info when MATERIALIZE TTL )", 0) \ + DECLARE(UInt64, export_merge_tree_partition_ttl_poll_interval_seconds, 5, R"( + Base interval (in seconds) at which the TTL export scheduler ticks for tables 
with + `TTL ... EXPORT TO TABLE ...`. The actual reschedule delay applies ±25% jitter on top. + )", 0) \ + DECLARE(UInt64, export_merge_tree_partition_ttl_min_backoff_seconds, 1, R"( + Initial backoff (in seconds) after a TTL-driven partition export reports `FAILED`. The + scheduler retries the same partition after the backoff elapses. The delay doubles on + each subsequent failure up to `export_merge_tree_partition_ttl_max_backoff_seconds`. + )", 0) \ + DECLARE(UInt64, export_merge_tree_partition_ttl_max_backoff_seconds, 60, R"( + Upper bound (in seconds) on the per-partition backoff used by the TTL export scheduler + after a `FAILED` manifest. See `export_merge_tree_partition_ttl_min_backoff_seconds`. + )", 0) \ DECLARE(Bool, enable_mixed_granularity_parts, true, R"( Enables or disables transitioning to control the granule size with the `index_granularity_bytes` setting. Before version 19.11, there was only the diff --git a/src/Storages/MergeTree/TTLExportScheduler.cpp b/src/Storages/MergeTree/TTLExportScheduler.cpp new file mode 100644 index 000000000000..42169a1b4326 --- /dev/null +++ b/src/Storages/MergeTree/TTLExportScheduler.cpp @@ -0,0 +1,306 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace fs = std::filesystem; + +namespace ProfileEvents +{ + extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGet; + extern const Event ExportPartitionZooKeeperGetChildren; +} + +namespace DB +{ + +namespace ServerSetting +{ + extern const ServerSettingsBool allow_experimental_export_merge_tree_partition; +} + +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsUInt64 export_merge_tree_partition_ttl_min_backoff_seconds; + extern const MergeTreeSettingsUInt64 export_merge_tree_partition_ttl_max_backoff_seconds; +} + +namespace ErrorCodes +{ + 
extern const int UNKNOWN_TABLE; +} + +namespace +{ + +/// Uniform ±25% jitter multiplier. +double jitter25() +{ + thread_local std::mt19937 rng{std::random_device{}()}; + std::uniform_real_distribution dist(0.75, 1.25); + return dist(rng); +} + +time_t computeBackoffDelay(size_t tries, UInt64 min_seconds, UInt64 max_seconds) +{ + if (tries == 0) + tries = 1; + /// Cap the shift to avoid UB; anything past 63 saturates to max anyway. + const size_t shift = std::min(tries - 1, 63); + UInt64 base = min_seconds << shift; + if (base == 0 || base > max_seconds) + base = max_seconds; + return static_cast(static_cast(base) * jitter25()); +} + +} + +TTLExportScheduler::TTLExportScheduler(StorageReplicatedMergeTree & storage_) + : storage(storage_) + , log(getLogger(storage.getStorageID().getNameForLogs() + " (TTLExport)")) +{ +} + +void TTLExportScheduler::run() +{ + if (storage.is_readonly || storage.shutdown_called) + return; + + auto metadata = storage.getInMemoryMetadataPtr(); + if (!metadata->hasAnyExportTTL()) + return; + + const auto global_context = Context::getGlobalContextInstance(); + if (!global_context->getServerSettings()[ServerSetting::allow_experimental_export_merge_tree_partition]) + return; + + for (const auto & export_ttl : metadata->getExportTTLs()) + { + try + { + processExportTTL(export_ttl); + } + catch (const Coordination::Exception &) + { + tryLogCurrentException(log, "ZK race while processing TTL export; will retry on next tick"); + } + catch (...) + { + tryLogCurrentException(log, "Unhandled exception while processing TTL export"); + } + } +} + +std::optional TTLExportScheduler::findTtlMarker( + const String & dest_database, const String & dest_table) +{ + auto zk = storage.getZooKeeper(); + const auto exports_path = fs::path(storage.zookeeper_path) / "exports"; + const String dest_full = dest_database + "." 
+ dest_table; + const String dest_suffix = "_" + dest_full; + + std::vector children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + zk->tryGetChildren(exports_path.string(), children); + + std::optional latest; + for (const auto & child : children) + { + if (!child.ends_with(dest_suffix)) + continue; + const auto child_path = exports_path / child; + + std::string metadata_json; + std::string status_str; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests, 2); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, 2); + if (!zk->tryGet((child_path / "metadata.json").string(), metadata_json)) + continue; + if (!zk->tryGet((child_path / "status").string(), status_str)) + continue; + + const auto manifest = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); + if (manifest.export_origin != ExportOrigin::ttl) + continue; + + if (!latest || manifest.create_time > latest->create_time) + latest = TtlMarker{manifest.partition_id, status_str, manifest.create_time}; + } + return latest; +} + +std::optional TTLExportScheduler::pickPartition( + const TTLDescription & export_ttl, const std::optional & floor) +{ + const auto active_parts = storage.getDataPartsVectorForInternalUsage(); + if (active_parts.empty()) + return std::nullopt; + + std::map by_partition; + for (const auto & part : active_parts) + by_partition[part->info.getPartitionId()].push_back(part); + + const auto now = time(nullptr); + std::optional best; + std::optional best_max; + + for (const auto & [pid, parts] : by_partition) + { + if (floor && pid <= *floor) + continue; + const auto max_ttl = getPartitionExportTTLMax(export_ttl, parts); + if (!max_ttl || *max_ttl >= now) + continue; + if (!best_max || *max_ttl < *best_max) + { + best = pid; + best_max = max_ttl; + } + } + return best; +} + +TTLExportScheduler::SubmitResult 
TTLExportScheduler::submit( + const String & dest_database, const String & dest_table, const String & partition_id, bool force) +{ + auto cmd_context = Context::createCopy(storage.getContext()); + cmd_context->setSetting("export_merge_tree_partition_mark_as_ttl", true); + if (force) + cmd_context->setSetting("export_merge_tree_partition_force_export", true); + + PartitionCommand cmd; + cmd.type = PartitionCommand::EXPORT_PARTITION; + cmd.to_database = dest_database; + cmd.to_table = dest_table; + auto partition_ast = make_intrusive(); + partition_ast->setPartitionID(make_intrusive(partition_id)); + cmd.partition = partition_ast; + + try + { + storage.exportPartitionToTable(cmd, cmd_context); + LOG_INFO(log, "Submitted TTL export of partition {} to {}.{} (force={})", + partition_id, dest_database, dest_table, force); + return SubmitResult::Submitted; + } + catch (const Coordination::Exception &) + { + tryLogCurrentException(log, "ZK race while submitting TTL export"); + return SubmitResult::Transient; + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_TABLE) + { + LOG_WARNING(log, "TTL EXPORT destination {}.{} disappeared at submit time: {}", + dest_database, dest_table, e.message()); + return SubmitResult::Transient; + } + tryLogCurrentException(log, "TTL export submission failed"); + return SubmitResult::Failure; + } + catch (...) + { + tryLogCurrentException(log, "TTL export submission failed"); + return SubmitResult::Failure; + } +} + +void TTLExportScheduler::processExportTTL(const TTLDescription & export_ttl) +{ + const auto context = storage.getContext(); + const auto qualified = QualifiedTableName::parseFromString(export_ttl.destination_name); + const auto dest_database = context->resolveDatabase(qualified.database); + const auto & dest_table = qualified.table; + const String dest_full = dest_database + "." 
+ dest_table; + + if (!DatabaseCatalog::instance().tryGetTable({dest_database, dest_table}, context)) + { + LOG_WARNING(log, "TTL EXPORT destination {} does not exist; will retry on next tick", dest_full); + return; + } + + const auto marker = findTtlMarker(dest_database, dest_table); + + if (!marker) + { + if (auto pid = pickPartition(export_ttl, std::nullopt)) + (void)submit(dest_database, dest_table, *pid, /* force = */ false); + return; + } + + if (marker->status == "PENDING") + return; + + if (marker->status == "COMPLETED") + { + if (auto pid = pickPartition(export_ttl, marker->partition_id)) + (void)submit(dest_database, dest_table, *pid, /* force = */ false); + return; + } + + if (marker->status == "FAILED") + { + const BackoffKey key{marker->partition_id, dest_database, dest_table}; + const auto now = time(nullptr); + auto & state = backoff[key]; + if (now < state.next_attempt_at) + return; + + const auto result = submit(dest_database, dest_table, marker->partition_id, /* force = */ true); + if (result == SubmitResult::Submitted) + { + backoff.erase(key); + } + else if (result == SubmitResult::Failure) + { + const auto settings = storage.getSettings(); + const auto min_s = (*settings)[MergeTreeSetting::export_merge_tree_partition_ttl_min_backoff_seconds]; + const auto max_s = (*settings)[MergeTreeSetting::export_merge_tree_partition_ttl_max_backoff_seconds]; + state.tries += 1; + const auto delay = computeBackoffDelay(state.tries, min_s, max_s); + state.next_attempt_at = now + delay; + LOG_INFO(log, "TTL export of partition {} to {} failed (try {}); next attempt in {}s", + marker->partition_id, dest_full, state.tries, delay); + } + return; + } + + if (marker->status == "KILLED") + { + LOG_WARNING(log, + "TTL export scheduler is idle for {}: most recent ttl-origin manifest at partition {} is KILLED. 
" + "To retry: `ALTER TABLE {} EXPORT PARTITION '{}' TO TABLE {} SETTINGS " + "export_merge_tree_partition_mark_as_ttl=1, export_merge_tree_partition_force_export=1`. " + "Or advance past it by exporting a newer partition with mark_as_ttl=1.", + dest_full, marker->partition_id, + storage.getStorageID().getFullTableName(), marker->partition_id, dest_full); + return; + } + + LOG_WARNING(log, "Unrecognised TTL export status `{}` for partition {} of {}; ignoring", + marker->status, marker->partition_id, dest_full); +} + +} diff --git a/src/Storages/MergeTree/TTLExportScheduler.h b/src/Storages/MergeTree/TTLExportScheduler.h new file mode 100644 index 000000000000..7be17c945065 --- /dev/null +++ b/src/Storages/MergeTree/TTLExportScheduler.h @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class StorageReplicatedMergeTree; +struct TTLDescription; + +/// Drives automatic partition exports for `TTL ... EXPORT TO TABLE db.table` clauses. +/// One instance per `StorageReplicatedMergeTree`; the background task pool calls `run` +/// on each tick. The scheduler is stateless across restarts — the manifest in ZooKeeper +/// is the source of truth and per-tick state is rebuilt from it. Only the per-partition +/// retry backoff is held in memory. +class TTLExportScheduler +{ +public: + explicit TTLExportScheduler(StorageReplicatedMergeTree & storage_); + + /// One tick: iterate every EXPORT TTL on the source table. For each destination, look up + /// the most recent ttl-origin manifest and act on it: + /// - no manifest → submit the oldest eligible partition (smallest expiration max); + /// - PENDING → wait; + /// - COMPLETED → walk forward, submit the next eligible partition with + /// `partition_id > completed`; + /// - FAILED → respect per-partition in-memory backoff; on elapse resubmit the + /// same partition with `force_export=1` to overwrite the failed manifest; + /// - KILLED → idle; log a recovery recipe. 
The operator either retries the killed + /// partition with `force_export=1` or steps past it by exporting a newer + /// partition with `mark_as_ttl=1`. + /// Exceptions surface as either transient (ZK race → next tick retries without bumping + /// backoff) or terminal (logged; backoff bumped if applicable). `UNKNOWN_TABLE` at submit + /// time is a soft failure — the destination disappeared post-DDL. + void run(); + +private: + struct BackoffState + { + size_t tries = 0; + time_t next_attempt_at = 0; + }; + + using BackoffKey = std::tuple; /// (partition_id, dest_db, dest_table) + + struct TtlMarker + { + String partition_id; + String status; + time_t create_time = 0; + }; + + StorageReplicatedMergeTree & storage; + LoggerPtr log; + std::map backoff; + + void processExportTTL(const TTLDescription & export_ttl); + + /// Locate the current ttl-origin manifest for `(this src, dest)`. The submission-time + /// invariant keeps this set bounded; on transient races where more than one ttl-origin + /// manifest coexists, the most recent by `create_time` is returned. + std::optional findTtlMarker(const String & dest_database, const String & dest_table); + + /// Return the eligible partition with the smallest expiration max. If `floor` is set, + /// only partitions with `partition_id > *floor` are considered — that's how forward-walking + /// is enforced from a COMPLETED marker. + std::optional pickPartition(const TTLDescription & export_ttl, const std::optional & floor); + + enum class SubmitResult + { + Submitted, /// Manifest was created; backoff (if any) should be reset. + Transient, /// Lost a ZK race or destination missing at submit time — neither failure + /// nor success; let the next tick retry without bumping backoff. + Failure, /// Any other exception. Caller should bump backoff if it tracks one. + }; + + /// Submit a TTL-driven export via `exportPartitionToTable`. Catches and classifies the + /// outcome; the caller decides whether to bump backoff. 
Diagnostics are logged here so + /// each call site stays focused on state transitions. + SubmitResult submit(const String & dest_database, const String & dest_table, const String & partition_id, bool force); +}; + +} diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2cda66289c6c..832cb4fa4767 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -403,6 +403,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; + friend class TTLExportScheduler; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; From f7d8a2768b6cca4de47559c8f670b25a532f35cd Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 12:20:50 +0200 Subject: [PATCH 09/13] Wire `TTLExportScheduler` into `StorageReplicatedMergeTree` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Declare the scheduler and its background task next to the other `export_merge_tree_partition_*` task holders. Under the `allow_experimental_export_merge_tree_partition` server gate: - Construct the scheduler and create the `TTLExport` task, logging any exceptions from `run` via `tryLogCurrentException`. - `ReplicatedMergeTreeRestartingThread::tryStartup` activates the task alongside the other export tasks; `partialShutdown` deactivates it. - `alter` calls `ttl_export_task->schedule` when any `MODIFY TTL` command is in the alter so newly added EXPORT TTLs take effect immediately. - `TTLExportScheduler::run` reschedules itself with `export_merge_tree_partition_ttl_poll_interval_seconds * jitter25` on the polling path. 
Early returns (shutdown, readonly, no EXPORT TTL, experimental gate off) intentionally skip the reschedule — deactivation, the `alter` hook, and server-level config drive those paths instead. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ReplicatedMergeTreeRestartingThread.cpp | 1 + src/Storages/MergeTree/TTLExportScheduler.cpp | 17 +++++++++++- src/Storages/StorageReplicatedMergeTree.cpp | 26 +++++++++++++++++++ src/Storages/StorageReplicatedMergeTree.h | 4 +++ 4 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index d06b6d7a9d24..00821b59487f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -191,6 +191,7 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() storage.export_merge_tree_partition_updating_task->activateAndSchedule(); storage.export_merge_tree_partition_select_task->activateAndSchedule(); storage.export_merge_tree_partition_status_handling_task->activateAndSchedule(); + storage.ttl_export_task->activateAndSchedule(); } storage.cleanup_thread.start(); diff --git a/src/Storages/MergeTree/TTLExportScheduler.cpp b/src/Storages/MergeTree/TTLExportScheduler.cpp index 42169a1b4326..572573a05421 100644 --- a/src/Storages/MergeTree/TTLExportScheduler.cpp +++ b/src/Storages/MergeTree/TTLExportScheduler.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -41,6 +42,7 @@ namespace ServerSetting namespace MergeTreeSetting { + extern const MergeTreeSettingsUInt64 export_merge_tree_partition_ttl_poll_interval_seconds; extern const MergeTreeSettingsUInt64 export_merge_tree_partition_ttl_min_backoff_seconds; extern const MergeTreeSettingsUInt64 export_merge_tree_partition_ttl_max_backoff_seconds; } @@ -83,7 +85,15 @@ TTLExportScheduler::TTLExportScheduler(StorageReplicatedMergeTree & storage_) void 
TTLExportScheduler::run() { - if (storage.is_readonly || storage.shutdown_called) + auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::ttl_export_task"); + + /// Early returns intentionally skip the reschedule at the bottom: + /// - shutdown_called: the task will be deactivated by `partialShutdown`. + /// - is_readonly: `ReplicatedMergeTreeRestartingThread` deactivates and reactivates these + /// tasks across readonly transitions. + /// - no export TTL: the `alter` path calls `schedule()` when a TTL is added. + /// - experimental gate off: not toggleable at runtime in practice. + if (storage.shutdown_called || storage.is_readonly) return; auto metadata = storage.getInMemoryMetadataPtr(); @@ -109,6 +119,11 @@ void TTLExportScheduler::run() tryLogCurrentException(log, "Unhandled exception while processing TTL export"); } } + + const auto settings = storage.getSettings(); + const UInt64 poll_seconds = (*settings)[MergeTreeSetting::export_merge_tree_partition_ttl_poll_interval_seconds]; + const auto delay_ms = static_cast(static_cast(poll_seconds) * 1000.0 * jitter25()); + storage.ttl_export_task->scheduleAfter(delay_ms); } std::optional TTLExportScheduler::findTtlMarker( diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 55640218e061..351b3c0c0341 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -544,6 +544,23 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( getStorageID(), getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_select_task)", [this] { selectPartsToExport(); }); export_merge_tree_partition_select_task->deactivate(); + + ttl_export_scheduler = std::make_unique(*this); + + ttl_export_task = getContext()->getSchedulePool().createTask( + getStorageID(), getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::ttl_export_task)", [this] + { + try + 
{ + ttl_export_scheduler->run(); + } + catch (...) + { + tryLogCurrentException(log); + } + }); + + ttl_export_task->deactivate(); } @@ -6061,6 +6078,7 @@ void StorageReplicatedMergeTree::partialShutdown() export_merge_tree_partition_updating_task->deactivate(); export_merge_tree_partition_select_task->deactivate(); export_merge_tree_partition_status_handling_task->deactivate(); + ttl_export_task->deactivate(); } cleanup_thread.stop(); @@ -7116,6 +7134,14 @@ void StorageReplicatedMergeTree::alter( waitMutation(*mutation_znode, query_settings[Setting::alter_sync]); LOG_DEBUG(log, "Data changes applied."); } + + if (ttl_export_task) + { + const bool ttl_changed = std::any_of(commands.begin(), commands.end(), + [](const AlterCommand & c) { return c.type == AlterCommand::MODIFY_TTL; }); + if (ttl_changed) + ttl_export_task->schedule(); + } } /// If new version returns ordinary name, else returns part name containing the first and last month of the month diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 832cb4fa4767..314c4046f27c 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -530,6 +531,9 @@ class StorageReplicatedMergeTree final : public MergeTreeData BackgroundSchedulePoolTaskHolder export_merge_tree_partition_select_task; + std::unique_ptr ttl_export_scheduler; + BackgroundSchedulePoolTaskHolder ttl_export_task; + ExportPartitionTaskEntriesContainer export_merge_tree_partition_task_entries; // Convenience references to indexes From b876bb34dd395a34eea962e84ad25e83ffa32617 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 18:31:22 +0200 Subject: [PATCH 10/13] fix a test broken during rebase --- tests/integration/test_ttl_export_partition/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/tests/integration/test_ttl_export_partition/test.py b/tests/integration/test_ttl_export_partition/test.py index ad2a1a26eb02..2e87fc49d990 100644 --- a/tests/integration/test_ttl_export_partition/test.py +++ b/tests/integration/test_ttl_export_partition/test.py @@ -128,10 +128,10 @@ def test_basic_to_iceberg(cluster): # Iceberg only accepts its own partition transforms (`toYearNumSinceEpoch`, not `toYear`) # and only signed integer types, so source and destination must be created inline to match. - make_iceberg_s3(node, dst, "event_date Date, id Int64", partition_by="toYearNumSinceEpoch(event_date)") + make_iceberg_s3(node, dst, "event_date Date, id Int64, year Int32", partition_by="toYearNumSinceEpoch(event_date)") node.query( f""" - CREATE TABLE {src} (event_date Date, id Int64) + CREATE TABLE {src} (event_date Date, id Int64, year Int32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{database}}/{src}', '{node.name}') PARTITION BY toYearNumSinceEpoch(event_date) ORDER BY id TTL event_date + INTERVAL 1 DAY EXPORT TO TABLE {dst} From 3e6e29e556b156ebaed73b614c62229001dab748 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 18:32:02 +0200 Subject: [PATCH 11/13] use export_merge_tree_partition_task_entries instead of requests to zookeeper --- src/Storages/MergeTree/TTLExportScheduler.cpp | 57 ++++++------------- src/Storages/StorageReplicatedMergeTree.cpp | 39 +++++-------- 2 files changed, 30 insertions(+), 66 deletions(-) diff --git a/src/Storages/MergeTree/TTLExportScheduler.cpp b/src/Storages/MergeTree/TTLExportScheduler.cpp index 572573a05421..a102cd3a4663 100644 --- a/src/Storages/MergeTree/TTLExportScheduler.cpp +++ b/src/Storages/MergeTree/TTLExportScheduler.cpp @@ -8,29 +8,19 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include -#include -#include #include -#include -#include - -namespace fs = std::filesystem; +#include -namespace ProfileEvents -{ - extern const Event 
ExportPartitionZooKeeperRequests; - extern const Event ExportPartitionZooKeeperGet; - extern const Event ExportPartitionZooKeeperGetChildren; -} +#include namespace DB { @@ -85,8 +75,6 @@ TTLExportScheduler::TTLExportScheduler(StorageReplicatedMergeTree & storage_) void TTLExportScheduler::run() { - auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::ttl_export_task"); - /// Early returns intentionally skip the reschedule at the bottom: /// - shutdown_called: the task will be deactivated by `partialShutdown`. /// - is_readonly: `ReplicatedMergeTreeRestartingThread` deactivates and reactivates these @@ -129,38 +117,25 @@ void TTLExportScheduler::run() std::optional TTLExportScheduler::findTtlMarker( const String & dest_database, const String & dest_table) { - auto zk = storage.getZooKeeper(); - const auto exports_path = fs::path(storage.zookeeper_path) / "exports"; - const String dest_full = dest_database + "." + dest_table; - const String dest_suffix = "_" + dest_full; - - std::vector children; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); - zk->tryGetChildren(exports_path.string(), children); + std::lock_guard lock(storage.export_merge_tree_partition_mutex); std::optional latest; - for (const auto & child : children) + for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) { - if (!child.ends_with(dest_suffix)) - continue; - const auto child_path = exports_path / child; - - std::string metadata_json; - std::string status_str; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests, 2); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, 2); - if (!zk->tryGet((child_path / "metadata.json").string(), metadata_json)) + if (entry.manifest.export_origin != ExportOrigin::ttl) continue; - if (!zk->tryGet((child_path / "status").string(), status_str)) + if 
(entry.manifest.destination_database != dest_database + || entry.manifest.destination_table != dest_table) continue; - const auto manifest = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - if (manifest.export_origin != ExportOrigin::ttl) - continue; - - if (!latest || manifest.create_time > latest->create_time) - latest = TtlMarker{manifest.partition_id, status_str, manifest.create_time}; + if (!latest || entry.manifest.create_time > latest->create_time) + { + latest = TtlMarker{ + entry.manifest.partition_id, + String(magic_enum::enum_name(entry.status)), + entry.manifest.create_time + }; + } } return latest; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 351b3c0c0341..7d8d57fdba31 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8366,35 +8366,24 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & /// collisions are handled by the block below. 
if (new_export_origin == ExportOrigin::ttl) { - const auto dest_suffix = "_" + dest_full_name; - std::vector sibling_children; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); - zookeeper->tryGetChildren(exports_path, sibling_children); - std::optional existing_ttl_partition_id; fs::path existing_ttl_marker_path; - for (const auto & child : sibling_children) { - if (!child.ends_with(dest_suffix)) - continue; - const String child_partition_id = child.substr(0, child.size() - dest_suffix.size()); - if (child_partition_id == partition_id) - continue; - - std::string metadata_json; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (!zookeeper->tryGet(fs::path(exports_path) / child / "metadata.json", metadata_json)) - continue; - - const auto sibling = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - if (sibling.export_origin != ExportOrigin::ttl) - continue; + std::lock_guard task_entries_lock(export_merge_tree_partition_mutex); + for (const auto & entry : export_merge_tree_partition_task_entries_by_key) + { + if (entry.manifest.export_origin != ExportOrigin::ttl) + continue; + if (entry.manifest.destination_database != dest_database + || entry.manifest.destination_table != dest_table) + continue; + if (entry.manifest.partition_id == partition_id) + continue; - existing_ttl_partition_id = child_partition_id; - existing_ttl_marker_path = fs::path(exports_path) / child; - break; + existing_ttl_partition_id = entry.manifest.partition_id; + existing_ttl_marker_path = fs::path(exports_path) / entry.getCompositeKey(); + break; + } } if (existing_ttl_partition_id) From 00657632397dddef4c0e7644c527a06116cecfa3 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 19:37:49 +0200 Subject: [PATCH 12/13] Defer ttl-marker cleanup 
until after the new manifest is durable In `exportPartitionToTable`, the cross-partition swap used to delete the existing ttl-origin marker before the `tryMulti` that creates the new manifest. Any throw between the delete and the multi (parts.empty, pending mutations, iceberg compatibility, or a ZK error mid-multi) left the scheduler with no ttl marker at all, so the next tick of `TTLExportScheduler` would treat the table as fresh and restart from the oldest expired partition. Now: - The sibling cache walk collects every stale ttl-origin entry for the destination, not just the first one encountered. The freshest by `create_time` is used for the back-fill check; the others are kept for cleanup. This makes the walk deterministic (no break-on-first- match in unspecified iteration order) and self-healing for any stragglers a previous cleanup may have missed. - The `tryRemoveRecursive` of those stale markers runs after the `tryMulti` succeeds. Failures during validation or in the multi itself now leave the existing marker intact, so the scheduler keeps its high-water mark. - The post-multi cleanup is best-effort; a ZK error there at worst leaves dead nodes, which the next ttl submission will reap. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Storages/StorageReplicatedMergeTree.cpp | 55 +++++++++++++-------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7d8d57fdba31..098a66527db7 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8360,14 +8360,18 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & ? 
ExportOrigin::ttl : ExportOrigin::alter; const bool force_export = query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export]; - /// Maintain the "at most one ttl-origin manifest per (src, dest)" invariant: when submitting a ttl-origin - /// manifest, locate the current ttl marker (if any) at a different partition_id and either reject - /// (back-fill without force) or best-effort remove it before creating the new manifest. Same-partition - /// collisions are handled by the block below. + /// Cross-partition ttl-marker invariant. When submitting a ttl-origin manifest, collect every + /// existing ttl-origin marker at a different partition_id from the in-memory cache. The latest + /// (by create_time) is the current marker; the new partition must not move it backwards unless + /// `force_export` is set. Stale markers are recorded for best-effort removal AFTER the new + /// manifest is durably created — deleting before would risk losing the scheduler's high-water + /// mark if a subsequent step throws. Recording all stale markers (not just one) is the + /// self-healing path for any stragglers a previous post-multi cleanup left behind. 
+ std::vector stale_ttl_marker_paths; if (new_export_origin == ExportOrigin::ttl) { std::optional existing_ttl_partition_id; - fs::path existing_ttl_marker_path; + time_t existing_ttl_create_time = 0; { std::lock_guard task_entries_lock(export_merge_tree_partition_mutex); for (const auto & entry : export_merge_tree_partition_task_entries_by_key) @@ -8380,30 +8384,29 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & if (entry.manifest.partition_id == partition_id) continue; - existing_ttl_partition_id = entry.manifest.partition_id; - existing_ttl_marker_path = fs::path(exports_path) / entry.getCompositeKey(); - break; + stale_ttl_marker_paths.push_back(fs::path(exports_path) / entry.getCompositeKey()); + if (!existing_ttl_partition_id || entry.manifest.create_time > existing_ttl_create_time) + { + existing_ttl_partition_id = entry.manifest.partition_id; + existing_ttl_create_time = entry.manifest.create_time; + } } } - if (existing_ttl_partition_id) + if (existing_ttl_partition_id && partition_id < *existing_ttl_partition_id && !force_export) { - if (partition_id < *existing_ttl_partition_id && !force_export) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "TTL-origin export of partition {} to {} would move the ttl marker backwards " - "(current marker is at partition {}). " - "Set `export_merge_tree_partition_force_export` to allow this.", - partition_id, dest_full_name, *existing_ttl_partition_id); - } + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "TTL-origin export of partition {} to {} would move the ttl marker backwards " + "(current marker is at partition {}). 
" + "Set `export_merge_tree_partition_force_export` to allow this.", + partition_id, dest_full_name, *existing_ttl_partition_id); + } + if (existing_ttl_partition_id) + { LOG_INFO(log, "Replacing ttl-origin marker for {} (partition {} -> {})", dest_full_name, *existing_ttl_partition_id, partition_id); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); - zookeeper->tryRemoveRecursive(existing_ttl_marker_path); } } @@ -8647,6 +8650,16 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & } throw zkutil::KeeperException::fromPath(code, partition_exports_path); } + + /// Best-effort cleanup of stale ttl-origin markers (cross-partition replacement). The new + /// manifest is durable; failures here at worst leave dead nodes that the next ttl submission + /// will reap. We deliberately do not throw if this fails. + for (const auto & stale_path : stale_ttl_marker_paths) + { + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); + zookeeper->tryRemoveRecursive(stale_path); + } } From e7e728771adaaf2407ddc0595457c7d7499ac053 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Wed, 20 May 2026 19:42:16 +0200 Subject: [PATCH 13/13] Register new TTL EXPORT settings in SettingsChangesHistory Add `export_merge_tree_partition_mark_as_ttl` to the Antalya 26.3 session block, and the three TTL EXPORT scheduler MergeTree settings (`export_merge_tree_partition_ttl_poll_interval_seconds`, `export_merge_tree_partition_ttl_min_backoff_seconds`, `export_merge_tree_partition_ttl_max_backoff_seconds`) to the 26.3 MergeTree block, so `02995_new_settings_history` passes. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Core/SettingsChangesHistory.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9520753d7864..2c3789302420 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -42,6 +42,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya", { {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, + {"export_merge_tree_partition_mark_as_ttl", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "26.3", { @@ -1151,6 +1152,9 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory() { addSettingsChanges(merge_tree_settings_changes_history, "26.3", { + {"export_merge_tree_partition_ttl_poll_interval_seconds", 5, 5, "New setting for the TTL EXPORT scheduler poll interval."}, + {"export_merge_tree_partition_ttl_min_backoff_seconds", 1, 1, "New setting for the TTL EXPORT scheduler minimum backoff."}, + {"export_merge_tree_partition_ttl_max_backoff_seconds", 60, 60, "New setting for the TTL EXPORT scheduler maximum backoff."}, {"vertical_merge_optimize_ttl_delete", false, true, "Allow vertical merge algorithm for merges that need to remove rows expired by TTL"}, {"shared_merge_tree_replica_set_max_lifetime_seconds", 300, 300, "New setting"}, {"auto_statistics_types", "", "minmax, uniq", "Enable auto statistics by default"},