61 changes: 59 additions & 2 deletions gc/mmtk/mmtk.c
@@ -48,13 +48,20 @@ struct objspace {
     unsigned int fork_hook_vm_lock_lev;
 };
 
+#define OBJ_FREE_BUF_CAPACITY 128
+
 struct MMTk_ractor_cache {
     struct ccan_list_node list_node;
 
     MMTk_Mutator *mutator;
     bool gc_mutator_p;
 
     MMTk_BumpPointer *bump_pointer;
+
+    MMTk_ObjectReference obj_free_parallel_buf[OBJ_FREE_BUF_CAPACITY];
+    size_t obj_free_parallel_count;
+    MMTk_ObjectReference obj_free_non_parallel_buf[OBJ_FREE_BUF_CAPACITY];
+    size_t obj_free_non_parallel_count;
 };
 
 struct MMTk_final_job {
@@ -143,6 +150,8 @@ rb_mmtk_resume_mutators(void)
     }
 }
 
+static void mmtk_flush_obj_free_buffer(struct MMTk_ractor_cache *cache);
+
 static void
 rb_mmtk_block_for_gc(MMTk_VMMutatorThread mutator)
 {
@@ -173,6 +182,11 @@ rb_mmtk_block_for_gc(MMTk_VMMutatorThread mutator)
 
     rb_gc_vm_barrier();
 
+    struct MMTk_ractor_cache *rc;
+    ccan_list_for_each(&objspace->ractor_caches, rc, list_node) {
+        mmtk_flush_obj_free_buffer(rc);
+    }
+
     objspace->world_stopped = true;
 
     pthread_cond_broadcast(&objspace->cond_world_stopped);
@@ -584,7 +598,7 @@ rb_gc_impl_ractor_cache_alloc(void *objspace_ptr, void *ractor)
     }
     objspace->live_ractor_cache_count++;
 
-    struct MMTk_ractor_cache *cache = malloc(sizeof(struct MMTk_ractor_cache));
+    struct MMTk_ractor_cache *cache = calloc(1, sizeof(struct MMTk_ractor_cache));
     ccan_list_add(&objspace->ractor_caches, &cache->list_node);
 
     cache->mutator = mmtk_bind_mutator(cache);
@@ -601,6 +615,8 @@ rb_gc_impl_ractor_cache_free(void *objspace_ptr, void *cache_ptr)
 
     ccan_list_del(&cache->list_node);
 
+    mmtk_flush_obj_free_buffer(cache);
+
     if (ruby_free_at_exit_p()) {
         MMTK_ASSERT(objspace->live_ractor_cache_count > 0);
     }
@@ -801,6 +817,42 @@ obj_can_parallel_free_p(VALUE obj)
     }
 }
 
+static void
+mmtk_flush_obj_free_buffer(struct MMTk_ractor_cache *cache)
+{
+    if (cache->obj_free_parallel_count > 0) {
+        mmtk_add_obj_free_candidates(cache->obj_free_parallel_buf,
+                                     cache->obj_free_parallel_count, true);
+        cache->obj_free_parallel_count = 0;
+    }
+    if (cache->obj_free_non_parallel_count > 0) {
+        mmtk_add_obj_free_candidates(cache->obj_free_non_parallel_buf,
+                                     cache->obj_free_non_parallel_count, false);
+        cache->obj_free_non_parallel_count = 0;
+    }
+}
+
+static inline void
+mmtk_buffer_obj_free_candidate(struct MMTk_ractor_cache *cache, VALUE obj)
+{
+    if (obj_can_parallel_free_p(obj)) {
+        cache->obj_free_parallel_buf[cache->obj_free_parallel_count++] = (MMTk_ObjectReference)obj;
+        if (cache->obj_free_parallel_count >= OBJ_FREE_BUF_CAPACITY) {
+            mmtk_add_obj_free_candidates(cache->obj_free_parallel_buf,
+                                         cache->obj_free_parallel_count, true);
+            cache->obj_free_parallel_count = 0;
+        }
+    }
+    else {
+        cache->obj_free_non_parallel_buf[cache->obj_free_non_parallel_count++] = (MMTk_ObjectReference)obj;
+        if (cache->obj_free_non_parallel_count >= OBJ_FREE_BUF_CAPACITY) {
+            mmtk_add_obj_free_candidates(cache->obj_free_non_parallel_buf,
+                                         cache->obj_free_non_parallel_count, false);
+            cache->obj_free_non_parallel_count = 0;
+        }
+    }
+}
+
 VALUE
 rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size)
 {
@@ -837,7 +889,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags
     mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT);
 
     // TODO: only add when object needs obj_free to be called
-    mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj));
+    mmtk_buffer_obj_free_candidate(ractor_cache, (VALUE)alloc_obj);
 
     objspace->total_allocated_objects++;
 
@@ -1277,6 +1329,11 @@ rb_gc_impl_shutdown_call_finalizer(void *objspace_ptr)
 
     unsigned int lev = RB_GC_VM_LOCK();
     {
+        struct MMTk_ractor_cache *rc;
+        ccan_list_for_each(&objspace->ractor_caches, rc, list_node) {
+            mmtk_flush_obj_free_buffer(rc);
+        }
+
         struct MMTk_RawVecOfObjRef registered_candidates = mmtk_get_all_obj_free_candidates();
         for (size_t i = 0; i < registered_candidates.len; i++) {
             VALUE obj = (VALUE)registered_candidates.ptr[i];
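Taken together, the mmtk.c changes replace one FFI call per allocated object with a fixed-size, ractor-local buffer that is flushed in three places: when it fills up, when the world is stopped for GC (so every candidate allocated since the last flush is registered before sweeping), and when a ractor cache is torn down. Below is a minimal, self-contained Rust sketch of that buffer-and-flush pattern; the names and the closure standing in for the batched FFI call are illustrative, not part of this PR:

```rust
// Sketch of the mutator-local buffering in mmtk.c. A closure plays the role
// of the batched FFI call mmtk_add_obj_free_candidates.
const OBJ_FREE_BUF_CAPACITY: usize = 128;

struct ObjFreeBuffer {
    buf: [u64; OBJ_FREE_BUF_CAPACITY], // object references as raw words
    count: usize,
}

impl ObjFreeBuffer {
    fn new() -> Self {
        Self { buf: [0; OBJ_FREE_BUF_CAPACITY], count: 0 }
    }

    // Mirrors mmtk_buffer_obj_free_candidate: append, flush when full.
    fn push(&mut self, obj: u64, sink: &mut impl FnMut(&[u64])) {
        self.buf[self.count] = obj;
        self.count += 1;
        if self.count >= OBJ_FREE_BUF_CAPACITY {
            self.flush(sink);
        }
    }

    // Mirrors mmtk_flush_obj_free_buffer: hand over the whole batch at once.
    // The C code also calls this at stop-the-world and on cache teardown.
    fn flush(&mut self, sink: &mut impl FnMut(&[u64])) {
        if self.count > 0 {
            sink(&self.buf[..self.count]);
            self.count = 0;
        }
    }
}

fn main() {
    let mut registered = Vec::new();
    let mut sink = |batch: &[u64]| registered.extend_from_slice(batch);
    let mut buffer = ObjFreeBuffer::new();
    for obj in 1..=300u64 {
        buffer.push(obj, &mut sink); // auto-flushes at 128 and 256 objects
    }
    buffer.flush(&mut sink); // final flush, as at stop-the-world
    assert_eq!(registered.len(), 300);
}
```

The PR keeps two such buffers per ractor cache (parallel and non-parallel candidates), so the `can_parallel_free` flag can be passed once per batch instead of being stored per object.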
4 changes: 3 additions & 1 deletion gc/mmtk/mmtk.h
@@ -123,7 +123,9 @@ void mmtk_post_alloc(MMTk_Mutator *mutator,
                      size_t bytes,
                      MMTk_AllocationSemantics semantics);
 
-void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free);
+void mmtk_add_obj_free_candidates(const MMTk_ObjectReference *objects,
+                                  size_t count,
+                                  bool can_parallel_free);
 
 void mmtk_declare_weak_references(MMTk_ObjectReference object);
 
10 changes: 7 additions & 3 deletions gc/mmtk/src/api.rs
@@ -297,12 +297,16 @@ pub unsafe extern "C" fn mmtk_post_alloc(
     memory_manager::post_alloc::<Ruby>(unsafe { &mut *mutator }, refer, bytes, semantics)
 }
 
-// TODO: Replace with buffered mmtk_add_obj_free_candidates
 #[no_mangle]
-pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) {
+pub unsafe extern "C" fn mmtk_add_obj_free_candidates(
+    objects: *const ObjectReference,
+    count: usize,
+    can_parallel_free: bool,
+) {
+    let objects = unsafe { std::slice::from_raw_parts(objects, count) };
     binding()
         .weak_proc
-        .add_obj_free_candidate(object, can_parallel_free)
+        .add_obj_free_candidates_batch(objects, can_parallel_free)
 }
 
 // =============== Weak references ===============
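One contract worth keeping in mind around the new `unsafe` entry point: `std::slice::from_raw_parts` requires a non-null, well-aligned pointer even when `count` is 0, so the `objects.is_empty()` check inside `add_obj_free_candidates_batch` does not excuse a null pointer here; the C callers uphold the contract by only flushing non-empty buffers. A sketch of a defensive variant that would also tolerate `(NULL, 0)` (hypothetical name, assuming the items already in scope in api.rs):

```rust
// Hypothetical defensive variant of mmtk_add_obj_free_candidates. Assumes
// api.rs's existing items: ObjectReference and binding().
#[no_mangle]
pub unsafe extern "C" fn mmtk_add_obj_free_candidates_checked(
    objects: *const ObjectReference,
    count: usize,
    can_parallel_free: bool,
) {
    let objects: &[ObjectReference] = if objects.is_null() || count == 0 {
        &[] // from_raw_parts is UB on a null pointer, even with length 0
    } else {
        unsafe { std::slice::from_raw_parts(objects, count) }
    };
    binding()
        .weak_proc
        .add_obj_free_candidates_batch(objects, can_parallel_free)
}
```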
41 changes: 22 additions & 19 deletions gc/mmtk/src/weak_proc.rs
@@ -1,5 +1,3 @@
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
 use std::sync::Mutex;
 
 use mmtk::scheduler::GCWork;
@@ -15,7 +13,6 @@ use crate::Ruby;
 pub struct WeakProcessor {
     non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>,
     parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>,
-    parallel_obj_free_candidates_counter: AtomicUsize,
 
     /// Objects that needs `obj_free` called when dying.
     /// If it is a bottleneck, replace it with a lock-free data structure,
@@ -34,7 +31,6 @@ impl WeakProcessor {
         Self {
             non_parallel_obj_free_candidates: Mutex::new(Vec::new()),
             parallel_obj_free_candidates: vec![Mutex::new(Vec::new())],
-            parallel_obj_free_candidates_counter: AtomicUsize::new(0),
             weak_references: Mutex::new(Vec::new()),
         }
     }
@@ -48,27 +44,34 @@ impl WeakProcessor {
         }
     }
 
-    /// Add an object as a candidate for `obj_free`.
+    /// Add a batch of objects as candidates for `obj_free`.
     ///
     /// Multiple mutators can call it concurrently, so it has `&self`.
-    pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) {
+    /// Amortizes mutex acquisition over the entire batch. Called when a
+    /// mutator's local buffer is flushed (buffer full or stop-the-world).
+    pub fn add_obj_free_candidates_batch(
+        &self,
+        objects: &[ObjectReference],
+        can_parallel_free: bool,
+    ) {
+        if objects.is_empty() {
+            return;
+        }
+
         if can_parallel_free {
-            // Newly allocated objects are placed in parallel_obj_free_candidates using
-            // round-robin. This may not be ideal for load balancing.
-            let idx = self
-                .parallel_obj_free_candidates_counter
-                .fetch_add(1, Ordering::Relaxed)
-                % self.parallel_obj_free_candidates.len();
-
-            self.parallel_obj_free_candidates[idx]
-                .lock()
-                .unwrap()
-                .push(object);
+            let num_buckets = self.parallel_obj_free_candidates.len();
+            for idx in 0..num_buckets {
+                let mut bucket = self.parallel_obj_free_candidates[idx].lock().unwrap();
+                for (i, &obj) in objects.iter().enumerate() {
+                    if i % num_buckets == idx {
+                        bucket.push(obj);
+                    }
+                }
+            }
         } else {
             self.non_parallel_obj_free_candidates
                 .lock()
                 .unwrap()
-                .push(object);
+                .extend_from_slice(objects);
         }
     }
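The batch path also changes how parallel candidates are spread across buckets: instead of bumping an atomic counter per object (round-robin), element `i` of a batch lands in bucket `i % num_buckets`, and each bucket's mutex is acquired exactly once per batch rather than once per object. A small self-contained sketch of that stride distribution, with the bucket element type simplified to `u64` (illustrative only; the PR's inner filter loop is equivalent):

```rust
use std::sync::Mutex;

// Distribute a batch the way add_obj_free_candidates_batch does: element i
// goes to bucket i % n, taking each bucket's lock exactly once per batch.
fn distribute(batch: &[u64], buckets: &[Mutex<Vec<u64>>]) {
    let n = buckets.len();
    for (idx, bucket) in buckets.iter().enumerate() {
        let mut guard = bucket.lock().unwrap();
        // Indices idx, idx + n, idx + 2n, ... belong to this bucket.
        guard.extend(batch.iter().skip(idx).step_by(n).copied());
    }
}

fn main() {
    let buckets = vec![Mutex::new(Vec::new()), Mutex::new(Vec::new())];
    distribute(&[10, 11, 12, 13, 14], &buckets);
    assert_eq!(*buckets[0].lock().unwrap(), vec![10, 12, 14]);
    assert_eq!(*buckets[1].lock().unwrap(), vec![11, 13]);
}
```

With the constructor's current single bucket (`vec![Mutex::new(Vec::new())]`), every batch lands in bucket 0, so the stride only starts to matter once the bucket count grows.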