Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
683 changes: 443 additions & 240 deletions src/btree.c

Large diffs are not rendered by default.

22 changes: 15 additions & 7 deletions src/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,21 @@ typedef struct btree_iterator {
bool32 do_prefetch;
uint32 height;
page_type page_type;
comparison min_key_comparison;
key min_key;
comparison max_key_comparison;
key max_key;
// Active memtable iterators copy nodes here and release page locks.
bool32 copy_nodes;
comparison min_key_comparison;
key min_key;
comparison max_key_comparison;
key max_key;

uint64 root_addr;
btree_node curr;
char *node_copy;
int64 idx;
int64 curr_min_idx;
uint64 end_addr;
int64 end_idx;
uint64 end_generation;
bool32 end_idx_valid;
} btree_iterator;

typedef struct btree_pack_req {
Expand Down Expand Up @@ -296,7 +299,7 @@ btree_lookup_and_merge_async(btree_lookup_async_state *state);
async_status
btree_lookup_async(btree_lookup_async_state *state);

void
platform_status
btree_iterator_init(cache *cc,
const btree_config *cfg,
btree_iterator *itor,
Expand All @@ -309,6 +312,7 @@ btree_iterator_init(cache *cc,
comparison start_type,
key start_key,
bool32 do_prefetch,
bool32 copy_nodes,
uint32 height);

// clang-format off
Expand All @@ -325,13 +329,14 @@ DEFINE_ASYNC_STATE(btree_iterator_async_state, 5,
param, comparison, start_type,
param, key, start_key,
param, bool32, do_prefetch,
param, bool32, copy_nodes,
param, uint32, height,
param, async_callback_fn, callback,
param, void *, callback_arg,
local, platform_status, __async_result,
local, btree_lookup_async_state, lookup_state,
local, page_get_async_state_buffer, cache_get_state,
local, btree_node, end,
local, btree_node, live_curr,
local, key, target,
local, comparison, position_rule,
local, bool32, found,
Expand All @@ -349,6 +354,9 @@ DEFINE_ASYNC_STATE(btree_iterator_async_state, 5,
async_status
btree_iterator_init_async(btree_iterator_async_state *state);

platform_status
btree_iterator_init_async_result(btree_iterator_async_state *state);

void
btree_iterator_deinit(btree_iterator *itor);

Expand Down
85 changes: 53 additions & 32 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ core_memtable_dec_ref(core_handle *spl, uint64 root_addr)


/* Wrappers for creating/destroying memtable btree iterators. */
static void
static platform_status
core_memtable_iterator_init(core_handle *spl,
btree_iterator *itor,
uint64 root_addr,
Expand All @@ -376,19 +376,20 @@ core_memtable_iterator_init(core_handle *spl,
comparison start_key_comparison,
key start_key)
{
btree_iterator_init(spl->cc,
spl->cfg.btree_cfg,
itor,
root_addr,
PAGE_TYPE_MEMTABLE,
min_key_comparison,
min_key,
max_key_comparison,
max_key,
start_key_comparison,
start_key,
FALSE,
0);
return btree_iterator_init(spl->cc,
spl->cfg.btree_cfg,
itor,
root_addr,
PAGE_TYPE_MEMTABLE,
min_key_comparison,
min_key,
max_key_comparison,
max_key,
start_key_comparison,
start_key,
FALSE,
FALSE,
0);
}

static void
Expand Down Expand Up @@ -468,15 +469,16 @@ core_memtable_compact(core_handle *spl, uint64 generation, const threadid tid)
btree_iterator btree_itor;
iterator *itor = &btree_itor.super;

core_memtable_iterator_init(spl,
&btree_itor,
memtable_root_addr,
greater_than_or_equal,
NEGATIVE_INFINITY_KEY,
less_than,
POSITIVE_INFINITY_KEY,
greater_than_or_equal,
NEGATIVE_INFINITY_KEY);
platform_status rc = core_memtable_iterator_init(spl,
&btree_itor,
memtable_root_addr,
greater_than_or_equal,
NEGATIVE_INFINITY_KEY,
less_than,
POSITIVE_INFINITY_KEY,
greater_than_or_equal,
NEGATIVE_INFINITY_KEY);
platform_assert_status_ok(rc);
const routing_config *rfcfg = spl->cfg.trunk_node_cfg->filter_cfg;
uint64 rflimit = routing_filter_max_fingerprints(spl->cfg.cache_cfg, rfcfg);
btree_pack_req req;
Expand Down Expand Up @@ -734,14 +736,21 @@ core_memtable_flush_virtual(void *arg, uint64 generation)
static inline uint64
core_memtable_root_addr_for_lookup(core_handle *spl,
uint64 generation,
bool32 *is_compacted)
bool32 *is_compacted,
bool32 *is_active)
{
memtable *mt = core_get_memtable(spl, generation);
platform_assert(memtable_ok_to_lookup(mt));
if (is_active != NULL) {
*is_active = mt->state == MEMTABLE_STATE_READY;
}

if (memtable_ok_to_lookup_compacted(mt)) {
// lookup in packed tree
*is_compacted = TRUE;
if (is_active != NULL) {
*is_active = FALSE;
}
core_compacted_memtable *cmt =
core_get_compacted_memtable(spl, generation);
return cmt->branch.root_addr;
Expand Down Expand Up @@ -772,7 +781,7 @@ core_memtable_lookup(core_handle *spl,
btree_config *const cfg = spl->cfg.btree_cfg;
bool32 memtable_is_compacted;
uint64 root_addr = core_memtable_root_addr_for_lookup(
spl, generation, &memtable_is_compacted);
spl, generation, &memtable_is_compacted, NULL);
page_type type =
memtable_is_compacted ? PAGE_TYPE_BRANCH : PAGE_TYPE_MEMTABLE;

Expand Down Expand Up @@ -867,7 +876,8 @@ core_start_btree_iterator_init_async(
key max_key,
comparison start_key_comparison,
key start_key,
bool32 do_prefetch)
bool32 do_prefetch,
bool32 copy_nodes)
{
btree_iterator_async_state_init(&ctxt->state,
spl->cc,
Expand All @@ -882,6 +892,7 @@ core_start_btree_iterator_init_async(
start_key_comparison,
start_key,
do_prefetch,
copy_nodes,
0,
core_btree_iterator_init_async_callback,
ctxt);
Expand All @@ -890,7 +901,7 @@ core_start_btree_iterator_init_async(

if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) {
ctxt->done = TRUE;
return async_result(&ctxt->state);
return btree_iterator_init_async_result(&ctxt->state);
}

return STATUS_OK;
Expand All @@ -907,7 +918,7 @@ core_drain_btree_iterator_init_async(
for (uint64 i = 0; i < num_inits; i++) {
if (ctxt[i].done) {
done_count++;
platform_status rc = async_result(&ctxt[i].state);
platform_status rc = btree_iterator_init_async_result(&ctxt[i].state);
if (!SUCCESS(rc) && SUCCESS(result)) {
result = rc;
}
Expand All @@ -926,7 +937,8 @@ core_drain_btree_iterator_init_async(
if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) {
ctxt[i].done = TRUE;
done_count++;
platform_status rc = async_result(&ctxt[i].state);
platform_status rc =
btree_iterator_init_async_result(&ctxt[i].state);
if (!SUCCESS(rc) && SUCCESS(result)) {
result = rc;
}
Expand Down Expand Up @@ -1071,6 +1083,7 @@ core_range_iterator_init(core_handle *spl,
range_itor->memtable_end_gen = memtable_generation_retired(&spl->mt_ctxt);
range_itor->num_memtable_branches =
range_itor->memtable_start_gen - range_itor->memtable_end_gen;
bool32 first_memtable_copy_nodes = FALSE;
for (uint64 mt_gen = range_itor->memtable_start_gen;
mt_gen != range_itor->memtable_end_gen;
mt_gen--)
Expand All @@ -1083,9 +1096,16 @@ core_range_iterator_init(core_handle *spl,
debug_assert(range_itor->num_branches < ARRAY_SIZE(range_itor->branch));

bool32 compacted;
bool32 active;
uint64 root_addr =
core_memtable_root_addr_for_lookup(spl, mt_gen, &compacted);
core_memtable_root_addr_for_lookup(spl, mt_gen, &compacted, &active);
range_itor->compacted[range_itor->num_branches] = compacted;
// Only READY memtables can be modified while this iterator is live.
if (range_itor->num_branches == 0) {
first_memtable_copy_nodes = active;
} else {
debug_assert(!active);
}
if (compacted) {
btree_inc_ref(spl->cc, spl->cfg.btree_cfg, root_addr);
} else {
Expand Down Expand Up @@ -1190,7 +1210,8 @@ core_range_iterator_init(core_handle *spl,
key_buffer_key(&range_itor->local_max_key),
start_key_comparison,
start_key,
do_prefetch);
do_prefetch,
branch_no == 0 ? first_memtable_copy_nodes : FALSE);
started_inits++;
if (!SUCCESS(rc)) {
break;
Expand Down Expand Up @@ -2360,7 +2381,7 @@ core_print_lookup(core_handle *spl, key target, platform_log_handle *log_handle)
for (uint64 mt_gen = mt_gen_start; mt_gen != mt_gen_end; mt_gen--) {
bool32 memtable_is_compacted;
uint64 root_addr = core_memtable_root_addr_for_lookup(
spl, mt_gen, &memtable_is_compacted);
spl, mt_gen, &memtable_is_compacted, NULL);
platform_status rc;

rc = btree_lookup(spl->cc,
Expand Down
39 changes: 25 additions & 14 deletions src/trunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2274,25 +2274,36 @@ trunk_branch_merger_add_branch(trunk_branch_merger *merger,
"%s():%d: platform_malloc() failed", __func__, __LINE__);
return STATUS_NO_MEMORY;
}
btree_iterator_init(cc,
btree_cfg,
iter,
addr,
type,
greater_than_or_equal,
merger->min_key,
less_than,
merger->max_key,
greater_than_or_equal,
merger->min_key,
TRUE,
merger->height);
platform_status rc = vector_append(&merger->itors, (iterator *)iter);
platform_status rc = btree_iterator_init(cc,
btree_cfg,
iter,
addr,
type,
greater_than_or_equal,
merger->min_key,
less_than,
merger->max_key,
greater_than_or_equal,
merger->min_key,
TRUE,
FALSE,
merger->height);
if (!SUCCESS(rc)) {
platform_error_log("%s():%d: btree_iterator_init() failed: %s",
__func__,
__LINE__,
platform_status_to_string(rc));
platform_free(merger->hid, iter);
return rc;
}
rc = vector_append(&merger->itors, (iterator *)iter);
if (!SUCCESS(rc)) {
platform_error_log("%s():%d: vector_append() failed: %s",
__func__,
__LINE__,
platform_status_to_string(rc));
btree_iterator_deinit(iter);
platform_free(merger->hid, iter);
}
return rc;
}
Expand Down
Loading
Loading