Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7531,6 +7531,9 @@ Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(Bool, export_merge_tree_partition_mark_as_ttl, false, R"(
When set on `ALTER ... EXPORT PARTITION`, marks the manifest with `export_origin = 'ttl'` so it is treated as if submitted by the TTL scheduler: it is exempt from manifest-TTL eviction and participates in the cross-partition ordering check against other ttl-origin manifests. The TTL scheduler always sets this implicitly when it submits.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabasesCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ void validateCreateQuery(const ASTCreateQuery & query, ContextPtr context)
primary_key = KeyDescription::getKeyFromAST(storage.order_by->ptr(), columns_desc, context);
if (storage.primary_key)
primary_key = KeyDescription::getKeyFromAST(storage.primary_key->ptr(), columns_desc, context);
KeyDescription partition_key;
if (storage.partition_by)
KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
partition_key = KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
if (storage.sample_by)
KeyDescription::getKeyFromAST(storage.sample_by->ptr(), columns_desc, context);
if (storage.ttl_table && primary_key.has_value())
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, true);
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, partition_key, true);
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/Parsers/ASTTTLElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settin
ostr << " RECOMPRESS ";
recompression_codec->format(ostr, settings, state, frame);
}
else if (mode == TTLMode::EXPORT)
{
if (destination_type != DataDestinationType::TABLE)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unsupported destination type {} for TTL EXPORT",
magic_enum::enum_name(destination_type));

ostr << " EXPORT TO TABLE ";
auto dot_pos = destination_name.find('.');
if (dot_pos == String::npos)
ostr << backQuoteIfNeed(destination_name);
else
ostr << backQuoteIfNeed(std::string_view(destination_name).substr(0, dot_pos))
<< '.'
<< backQuoteIfNeed(std::string_view(destination_name).substr(dot_pos + 1));
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ namespace DB
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(EXPORT_TO_TABLE, "EXPORT TO TABLE") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
16 changes: 16 additions & 0 deletions src/Parsers/ExpressionElementParsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>

#include <Interpreters/StorageID.h>

Expand Down Expand Up @@ -2449,6 +2450,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_set(Keyword::SET);
ParserKeyword s_recompress(Keyword::RECOMPRESS);
ParserKeyword s_codec(Keyword::CODEC);
ParserKeyword s_export_to_table(Keyword::EXPORT_TO_TABLE);
ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL);
ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL);
ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL);
Expand Down Expand Up @@ -2496,6 +2498,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
mode = TTLMode::RECOMPRESS;
}
else if (s_export_to_table.ignore(pos, expected))
{
mode = TTLMode::EXPORT;
destination_type = DataDestinationType::TABLE;
}
else
{
/// DELETE is the default mode.
Expand Down Expand Up @@ -2547,6 +2554,15 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parser_codec.parse(pos, recompression_codec, expected))
return false;
}
else if (mode == TTLMode::EXPORT)
{
String dst_database;
String dst_table;
if (!parseDatabaseAndTableName(pos, expected, dst_database, dst_table))
return false;

destination_name = dst_database.empty() ? dst_table : dst_database + "." + dst_table;
}

auto ttl_element = make_intrusive<ASTTTLElement>(mode, destination_type, destination_name, if_exists);
ttl_element->setTTL(std::move(ttl_expr));
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) cons
data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (ttl_update_field == TTLUpdateField::EXPORT_TTL)
{
data_part->ttl_infos.export_ttl[ttl_update_key] = new_ttl_info;
}

}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum class TTLUpdateField : uint8_t
MOVES_TTL,
RECOMPRESSION_TTL,
GROUP_BY_TTL,
EXPORT_TTL,
};

/// Calculates new ttl_info and does nothing with data.
Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Transforms/TTLCalcTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ TTLCalcTransform::TTLCalcTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_ttl.destination_name, old_ttl_infos.export_ttl[export_ttl.destination_name], current_time_, force_));
}

void TTLCalcTransform::consume(Chunk chunk)
Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Transforms/TTLTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ TTLTransform::TTLTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_ttl.destination_name, old_ttl_infos.export_ttl[export_ttl.destination_name], current_time_, force_));
}

Block reorderColumns(Block block, const Block & header)
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/AlterCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else if (type == MODIFY_TTL)
{
metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST(
ttl, metadata.columns, context, metadata.primary_key, context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
ttl, metadata.columns, context, metadata.primary_key, metadata.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
}
else if (type == REMOVE_TTL)
{
Expand Down Expand Up @@ -1393,6 +1394,7 @@ void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context
metadata_copy.columns,
context,
metadata_copy.primary_key,
metadata_copy.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);

metadata = std::move(metadata_copy);
Expand Down
18 changes: 18 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
namespace DB
{

/// Distinguishes manifests submitted by manual `ALTER ... EXPORT PARTITION` from those
/// submitted by the TTL scheduler. Persisted in the manifest body and surfaced through
/// `system.replicated_partition_exports.export_origin`.
enum class ExportOrigin : int8_t
{
alter = 0,
ttl = 1,
};

struct ExportReplicatedMergeTreePartitionProcessingPartEntry
{

Expand Down Expand Up @@ -121,6 +130,7 @@ struct ExportReplicatedMergeTreePartitionManifest
String filename_pattern;
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;
ExportOrigin export_origin = ExportOrigin::alter;

std::string toJsonString() const
{
Expand Down Expand Up @@ -154,6 +164,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("export_origin", String(magic_enum::enum_name(export_origin)));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -208,6 +219,13 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

/// Manifests written before this field existed default to `alter`.
if (json->has("export_origin"))
{
if (auto parsed = magic_enum::enum_cast<ExportOrigin>(json->getValue<String>("export_origin")))
manifest.export_origin = *parsed;
}

return manifest;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ namespace
auto & entries_by_key
)
{
bool has_expired = metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);
/// Manifests submitted by the TTL scheduler are durable by design: the scheduler relies on the
/// last manifest for `(src, dest)` to know where to resume, so manifest-TTL eviction must skip them.
bool has_expired = metadata.export_origin != ExportOrigin::ttl
&& metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);

bool task_timed_out = is_pending
&& metadata.task_timeout_seconds > 0
Expand Down Expand Up @@ -545,6 +548,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
info.last_exception = last_exception;
info.exception_part = exception_part;
info.exception_count = exception_count;
info.export_origin = metadata.export_origin;
infos.emplace_back(std::move(info));
}

Expand Down Expand Up @@ -572,6 +576,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
info.parts_to_do = entry.manifest.parts.size();
info.parts = entry.manifest.parts;
info.status = magic_enum::enum_name(entry.status);
info.export_origin = entry.manifest.export_origin;

infos.emplace_back(std::move(info));
}
Expand Down
25 changes: 25 additions & 0 deletions src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include <Common/ProfileEvents.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <Parsers/IAST.h>
#include "Storages/ColumnsDescription.h"
#include "Storages/ExportReplicatedMergeTreePartitionManifest.h"
#include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h"
#include "Storages/StorageInMemoryMetadata.h"
#include <Storages/MergeTree/MergeTreeData.h>
#include <filesystem>
#include <thread>
Expand Down Expand Up @@ -32,6 +35,7 @@ namespace ErrorCodes
{
extern const int FAULT_INJECTED;
extern const int BAD_ARGUMENTS;
extern const int INCOMPATIBLE_COLUMNS;
extern const int NO_SUCH_DATA_PART;
extern const int CORRUPTED_DATA;
extern const int NETWORK_ERROR;
Expand All @@ -47,6 +51,27 @@ namespace fs = std::filesystem;

namespace ExportPartitionUtils
{
void verifyExportDestinationCompatibility(
const ColumnsDescription & src_columns,
const ASTPtr & src_partition_key_ast,
const StorageInMemoryMetadata & dest_metadata,
const IStorage & dest_storage)
{
if (src_columns.getReadable().sizeOfDifference(dest_metadata.getColumns().getInsertable()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

if (dest_storage.isDataLake())
return;

const auto ast_to_string = [](const ASTPtr & ast) -> String
{
return ast ? ast->formatWithSecretsOneLine() : "";
};

if (ast_to_string(src_partition_key_ast) != ast_to_string(dest_metadata.getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
}

std::vector<Field> getPartitionValuesForIcebergCommit(
MergeTreeData & storage, const String & partition_id)
{
Expand Down
17 changes: 16 additions & 1 deletion src/Storages/MergeTree/ExportPartitionUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,37 @@
#include <Core/Field.h>
#include <Common/Logger.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/IAST_fwd.h>
#include "Storages/IStorage.h"
#include <config.h>

#if USE_AVRO
#include <Parsers/IAST.h>
#include <Poco/JSON/Object.h>
#endif

namespace DB
{

class ColumnsDescription;
class MergeTreeData;
struct StorageInMemoryMetadata;
struct ExportReplicatedMergeTreePartitionManifest;

namespace ExportPartitionUtils
{
/// Verifies that the destination table is structurally compatible with the source so that
/// `EXPORT PARTITION` (manual or TTL-driven) can succeed:
/// - source readable columns must equal destination insertable columns (ephemeral columns excluded);
/// - for non-data-lake destinations, the partition key ASTs must match;
/// - for data-lake destinations, partition-key compatibility is verified later at submission time
/// by `verifyIcebergPartitionCompatibility` (it needs the runtime iceberg metadata).
/// Throws `INCOMPATIBLE_COLUMNS` or `BAD_ARGUMENTS` on mismatch.
void verifyExportDestinationCompatibility(
const ColumnsDescription & src_columns,
const ASTPtr & src_partition_key_ast,
const StorageInMemoryMetadata & dest_metadata,
const IStorage & dest_storage);

std::vector<std::string> getExportedPaths(const LoggerPtr & log, const zkutil::ZooKeeperPtr & zk, const std::string & export_path);

ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest);
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2874,6 +2874,12 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
return false;
}

for (const auto & export_desc : metadata_snapshot->getExportTTLs())
{
if (!ttl_infos.export_ttl.contains(export_desc.destination_name))
return false;
}

return true;
}

Expand Down
Loading
Loading