Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ namespace DB
M(force_set_proxy_state_machine_cpu_cores) \
M(force_join_v2_probe_enable_lm) \
M(force_join_v2_probe_disable_lm) \
M(force_gc_try_segment_merge_generic_error) \
M(force_gc_try_segment_merge_s3_error) \
M(force_s3_random_access_file_init_fail) \
M(force_s3_random_access_file_read_fail) \
M(force_s3_random_access_file_seek_fail) \
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@ class DeltaMergeStore
/// Iterator over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit, const GCOptions & gc_options);

/// Test hook for verifying the background GC merge fan-in cap.
UInt32 getGcMergeableSegmentsCapForTest() const;

/// Test hook for overriding the background GC merge fan-in cap.
void setGcMergeableSegmentsCapForTest(UInt32 cap);

/**
* Try to merge the segment in the current thread as the GC operation.
* This function may be blocking, and should be called in the GC background thread.
Expand Down Expand Up @@ -552,6 +558,9 @@ class DeltaMergeStore

void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment);

void reduceGcMergeableSegmentsCap(std::string_view reason);
void recoverGcMergeableSegmentsCap(std::string_view reason);

/// Should be called after every write into DeltaMergeStore.
/// If the delta cache reaches the foreground flush limit, it will also trigger a KVStore flush of related regions,
/// by returning a non-empty DM::WriteResult.
Expand Down Expand Up @@ -834,6 +843,10 @@ class DeltaMergeStore
public:
#endif

static constexpr UInt32 gc_mergeable_segments_cap_default = 32;
static constexpr UInt32 gc_mergeable_segments_cap_min = 2;
static constexpr UInt32 gc_mergeable_segments_cap_recover_step = 2;

/**
* Ensure the segment has delta index.
* If the segment has no delta index, it will be built in background.
Expand Down Expand Up @@ -918,6 +931,7 @@ class DeltaMergeStore
MergeDeltaTaskPool background_tasks;

std::atomic<DB::Timestamp> latest_gc_safe_point = 0;
std::atomic<UInt32> gc_mergeable_segments_cap = gc_mergeable_segments_cap_default;

RowKeyValue next_gc_check_key;

Expand Down
92 changes: 91 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,30 @@ namespace DB
{
namespace FailPoints
{
extern const char force_gc_try_segment_merge_generic_error[];
extern const char force_gc_try_segment_merge_s3_error[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
} // namespace FailPoints

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int S3_ERROR;
} // namespace ErrorCodes

namespace DM
{
UInt32 DeltaMergeStore::getGcMergeableSegmentsCapForTest() const
{
    // Expose the current GC merge fan-in cap so unit tests can observe reduce/recover behavior.
    const auto current_cap = gc_mergeable_segments_cap.load(std::memory_order_relaxed);
    return current_cap;
}

void DeltaMergeStore::setGcMergeableSegmentsCapForTest(UInt32 cap)
{
    // Clamp to the minimum so a test cannot push the cap below what production code allows.
    const auto clamped_cap = std::max(cap, gc_mergeable_segments_cap_min);
    gc_mergeable_segments_cap.store(clamped_cap, std::memory_order_relaxed);
}

// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
Expand Down Expand Up @@ -326,6 +344,7 @@ std::vector<SegmentPtr> DeltaMergeStore::getMergeableSegments(
// segment merging for this case in future.
auto max_total_rows = context->segment_limit_rows;
auto max_total_bytes = context->segment_limit_bytes;
const auto max_mergeable_segments = gc_mergeable_segments_cap.load(std::memory_order_relaxed);

std::vector<SegmentPtr> results;
{
Expand All @@ -342,6 +361,8 @@ std::vector<SegmentPtr> DeltaMergeStore::getMergeableSegments(
auto it = segments.upper_bound(base_segment->getRowKeyRange().getEnd());
while (it != segments.end())
{
if (results.size() >= max_mergeable_segments)
break;
const auto & this_seg = it->second;
const auto this_rows = this_seg->getEstimatedRows();
const auto this_bytes = this_seg->getEstimatedBytes();
Expand All @@ -366,6 +387,51 @@ std::vector<SegmentPtr> DeltaMergeStore::getMergeableSegments(
return results;
}

void DeltaMergeStore::reduceGcMergeableSegmentsCap(std::string_view reason)
{
    // Halve the GC merge fan-in cap (floored at the minimum) via a lock-free CAS loop.
    for (auto current = gc_mergeable_segments_cap.load(std::memory_order_relaxed);;)
    {
        const auto halved = std::max(gc_mergeable_segments_cap_min, current / 2);
        // Already at the floor; nothing to reduce.
        if (halved == current)
            return;
        // On CAS failure `current` is refreshed with the latest value; just retry.
        if (!gc_mergeable_segments_cap.compare_exchange_weak(current, halved, std::memory_order_relaxed))
            continue;
        // On success `current` still holds the pre-exchange value, i.e. the old cap.
        LOG_INFO(
            log,
            "GC - Reduce mergeable segments cap, table_id={} old_cap={} new_cap={} reason={}",
            physical_table_id,
            current,
            halved,
            reason);
        return;
    }
}

void DeltaMergeStore::recoverGcMergeableSegmentsCap(std::string_view reason)
{
    // Raise the GC merge fan-in cap by one recovery step, saturating at the default, via CAS.
    for (auto current = gc_mergeable_segments_cap.load(std::memory_order_relaxed);;)
    {
        const auto raised
            = std::min(gc_mergeable_segments_cap_default, current + gc_mergeable_segments_cap_recover_step);
        // Already fully recovered; nothing to do.
        if (raised == current)
            return;
        // On CAS failure `current` is refreshed with the latest value; just retry.
        if (!gc_mergeable_segments_cap.compare_exchange_weak(current, raised, std::memory_order_relaxed))
            continue;
        // On success `current` still holds the pre-exchange value, i.e. the old cap.
        LOG_INFO(
            log,
            "GC - Recover mergeable segments cap, table_id={} old_cap={} new_cap={} reason={}",
            physical_table_id,
            current,
            raised,
            reason);
        return;
    }
}

bool DeltaMergeStore::updateGCSafePoint()
{
if (auto pd_client = global_context.getTMTContext().getPDClient(); !pd_client->isMock())
Expand Down Expand Up @@ -736,10 +802,23 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
}

LOG_INFO(log, "GC - Trigger Merge, segment={}", segment->simpleInfo());
fiu_do_on(FailPoints::force_gc_try_segment_merge_s3_error, {
throw Exception(
ErrorCodes::S3_ERROR,
"Injected S3_ERROR in gcTrySegmentMerge, segment={}",
segment->simpleInfo());
});
fiu_do_on(FailPoints::force_gc_try_segment_merge_generic_error, {
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Injected non-S3 error in gcTrySegmentMerge, segment={}",
segment->simpleInfo());
});
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
checkSegmentUpdate(dm_context, new_segment, ThreadType::BG_GC, InputType::NotRaft);
recoverGcMergeableSegmentsCap("background_gc_merge_success");
}

return new_segment;
Expand Down Expand Up @@ -936,7 +1015,18 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
{
SegmentPtr new_seg = nullptr;
if (!new_seg && gc_options.do_merge)
new_seg = gcTrySegmentMerge(dm_context, segment);
{
try
{
new_seg = gcTrySegmentMerge(dm_context, segment);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::S3_ERROR)
reduceGcMergeableSegmentsCap("background_gc_merge_s3_error");
throw;
}
}
if (!new_seg && gc_options.do_merge_delta)
new_seg = gcTrySegmentMergeDelta(dm_context, segment, prev_segment, next_segment, gc_safe_point);

Expand Down
57 changes: 56 additions & 1 deletion dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace DB
{
namespace FailPoints
{
extern const char force_gc_try_segment_merge_generic_error[];
extern const char force_gc_try_segment_merge_s3_error[];
extern const char skip_check_segment_update[];
} // namespace FailPoints

Expand Down Expand Up @@ -139,6 +141,59 @@ try
}
CATCH

// Verify that getMergeableSegments never collects more segments than the fan-in cap,
// even when the row/size limits would otherwise admit all of them.
TEST_F(DeltaMergeStoreGCMergeTest, MergeableSegmentsCapLimitsFanIn)
try
{
    // Six small segments; generous row/size limits so only the cap constrains the scan.
    ensureSegmentBreakpoints({0, 10, 20, 30, 40, 50});
    db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 1000;
    db_context->getGlobalContext().getSettingsRef().dt_segment_limit_size = 1ULL << 30;

    store->setGcMergeableSegmentsCapForTest(3);
    auto segments_to_merge = store->getMergeableSegments(dm_context, getSegmentAt(0));
    ASSERT_EQ(segments_to_merge.size(), 3);
    // Scanning for mergeable segments must not itself mutate the cap.
    ASSERT_EQ(store->getGcMergeableSegmentsCapForTest(), 3);
}
CATCH

// Verify the adaptive cap: each S3_ERROR during a GC merge halves the cap, and a later
// successful merge raises it back by one recovery step.
TEST_F(DeltaMergeStoreGCMergeTest, S3ErrorReducesCapAndSuccessfulMergeRecoversIt)
try
{
    ensureSegmentBreakpoints({0, 10, 20, 30, 40, 50});
    db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 1000;

    // Starts at the default cap before any failure is observed.
    ASSERT_EQ(store->getGcMergeableSegmentsCapForTest(), DeltaMergeStore::gc_mergeable_segments_cap_default);

    // First injected S3 error: exception propagates and the cap is halved once.
    FailPointHelper::enableFailPoint(FailPoints::force_gc_try_segment_merge_s3_error);
    ASSERT_THROW(store->onSyncGc(1, gc_options), DB::Exception);
    ASSERT_EQ(store->getGcMergeableSegmentsCapForTest(), DeltaMergeStore::gc_mergeable_segments_cap_default / 2);

    // Second injected S3 error: halved again (reduction compounds per failure).
    ASSERT_THROW(store->onSyncGc(1, gc_options), DB::Exception);
    ASSERT_EQ(store->getGcMergeableSegmentsCapForTest(), DeltaMergeStore::gc_mergeable_segments_cap_default / 4);

    // With the failpoint off, one successful GC merge recovers the cap by a single step
    // (not back to the default in one jump).
    FailPointHelper::disableFailPoint(FailPoints::force_gc_try_segment_merge_s3_error);
    ASSERT_EQ(store->onSyncGc(1, gc_options), 1);
    ASSERT_EQ(
        store->getGcMergeableSegmentsCapForTest(),
        DeltaMergeStore::gc_mergeable_segments_cap_default / 4
            + DeltaMergeStore::gc_mergeable_segments_cap_recover_step);
}
CATCH

// Verify the cap reduction is specific to S3_ERROR: a non-S3 failure during the GC merge
// still propagates, but leaves the fan-in cap untouched.
TEST_F(DeltaMergeStoreGCMergeTest, NonS3ErrorDoesNotReduceCap)
try
{
    ensureSegmentBreakpoints({0, 10, 20, 30, 40, 50});
    db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 1000;
    store->setGcMergeableSegmentsCapForTest(8);

    FailPointHelper::enableFailPoint(FailPoints::force_gc_try_segment_merge_generic_error);
    // Ensure the failpoint is cleared even if an assertion below fails.
    SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_gc_try_segment_merge_generic_error); });

    ASSERT_THROW(store->onSyncGc(1, gc_options), DB::Exception);
    // Cap unchanged: only S3-classified errors trigger reduceGcMergeableSegmentsCap.
    ASSERT_EQ(store->getGcMergeableSegmentsCapForTest(), 8);
}
CATCH


class DeltaMergeStoreGCMergeDeltaTest : public DeltaMergeStoreGCTest
{
Expand Down Expand Up @@ -476,4 +531,4 @@ CATCH

} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB
69 changes: 60 additions & 9 deletions dbms/src/Storages/S3/S3RandomAccessFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace DB::S3
namespace
{
constexpr size_t s3_read_limiter_preferred_chunk_size = 128 * 1024;
constexpr size_t s3_forward_seek_reopen_threshold = 128 * 1024;
constexpr size_t s3_forward_seek_reopen_threshold = 64 * 1024;
} // namespace

String S3RandomAccessFile::summary() const
Expand Down Expand Up @@ -120,18 +120,43 @@ bool shouldRetryStreamError(Int32 retried_times, int ret, int err, Int32 max_ret
{
return retried_times + 1 < max_retry_times && isRetryableError(ret, err);
}

// Map a retryable read/seek failure to a short human-readable tag for error messages.
// A stream-level failure takes precedence over the saved errno classification.
const char * retryableErrorName(int ret, int err)
{
    if (ret == S3StreamError)
        return "stream_error";
    switch (err)
    {
    case ECONNRESET:
        return "ECONNRESET";
    case EAGAIN:
        return "EAGAIN";
    case EINPROGRESS:
        return "EINPROGRESS";
    default:
        return "unknown";
    }
}
} // namespace

ssize_t S3RandomAccessFile::read(char * buf, size_t size)
{
for (Int32 stream_retry_times = 0;; ++stream_retry_times)
{
auto n = readImpl(buf, size);
if (unlikely(n < 0 && shouldRetryStreamError(stream_retry_times, n, errno, max_retry)))
const auto err = errno;
if (unlikely(n < 0))
{
// Stream-side retries reopen from the last committed offset instead of sharing initialize state.
reopenAt(cur_offset, "read meet retryable error");
continue;
const auto retryable = isRetryableError(n, err);
const auto can_retry = shouldRetryStreamError(stream_retry_times, n, err, max_retry);
if (can_retry)
{
// Stream-side retries reopen from the last committed offset instead of sharing initialize state.
reopenAt(cur_offset, "read meet retryable error");
continue;
}
if (retryable)
{
// The failure is still retryable, but this call has already used up the bounded stream-side
// retries. Convert it to S3_ERROR here so upper layers can classify the final remote read.
throwRetryExhaustedError("read", n, err);
}
}
return n;
}
Expand Down Expand Up @@ -246,16 +271,42 @@ off_t S3RandomAccessFile::seek(off_t offset_, int whence)
for (Int32 stream_retry_times = 0;; ++stream_retry_times)
{
auto off = seekImpl(offset_, whence);
if (unlikely(off < 0 && shouldRetryStreamError(stream_retry_times, off, errno, max_retry)))
const auto err = errno;
if (unlikely(off < 0))
{
// Retry the seek from the last committed offset rather than from a partially drained stream.
reopenAt(cur_offset, "seek meet retryable error");
continue;
const auto retryable = isRetryableError(off, err);
const auto can_retry = shouldRetryStreamError(stream_retry_times, off, err, max_retry);
if (can_retry)
{
// Retry the seek from the last committed offset rather than from a partially drained stream.
reopenAt(cur_offset, "seek meet retryable error");
continue;
}
if (retryable)
{
// The failure is still retryable, but this call has already used up the bounded stream-side
// retries. Convert it to S3_ERROR here so upper layers can classify the final remote read.
throwRetryExhaustedError("seek", off, err);
}
}
return off;
}
}

/// Raised when a read/seek failure is still classified as retryable but this call has
/// already consumed its bounded stream-side retries. Converting to S3_ERROR here lets
/// upper layers classify the final remote-read failure (see callers in read()/seek()).
/// `action` names the failing operation ("read"/"seek"); `ret`/`err` are the raw return
/// value and saved errno from the last attempt.
void S3RandomAccessFile::throwRetryExhaustedError(std::string_view action, int ret, int err) const
{
    throw Exception(
        ErrorCodes::S3_ERROR,
        "S3RandomAccessFile {} failed after {} stream retries, key={}, cur_offset={}, ret={}, errno={} ({})",
        action,
        max_retry,
        remote_fname,
        cur_offset,
        ret,
        err,
        retryableErrorName(ret, err));
}

off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence)
{
RUNTIME_CHECK_MSG(whence == SEEK_SET, "Only SEEK_SET mode is allowed, but {} is received", whence);
Expand Down
Loading