diff --git a/gc/mmtk/mmtk.c b/gc/mmtk/mmtk.c
index 4832916..21d97b8 100644
--- a/gc/mmtk/mmtk.c
+++ b/gc/mmtk/mmtk.c
@@ -48,6 +48,8 @@ struct objspace {
     unsigned int fork_hook_vm_lock_lev;
 };
 
+#define OBJ_FREE_BUF_CAPACITY 128
+
 struct MMTk_ractor_cache {
     struct ccan_list_node list_node;
 
@@ -55,6 +57,11 @@ struct MMTk_ractor_cache {
     bool gc_mutator_p;
     MMTk_BumpPointer *bump_pointer;
+
+    MMTk_ObjectReference obj_free_parallel_buf[OBJ_FREE_BUF_CAPACITY];
+    size_t obj_free_parallel_count;
+    MMTk_ObjectReference obj_free_non_parallel_buf[OBJ_FREE_BUF_CAPACITY];
+    size_t obj_free_non_parallel_count;
 };
 
 struct MMTk_final_job {
@@ -143,6 +150,8 @@ rb_mmtk_resume_mutators(void)
     }
 }
 
+static void mmtk_flush_obj_free_buffer(struct MMTk_ractor_cache *cache);
+
 static void
 rb_mmtk_block_for_gc(MMTk_VMMutatorThread mutator)
 {
@@ -173,6 +182,11 @@ rb_mmtk_block_for_gc(MMTk_VMMutatorThread mutator)
 
     rb_gc_vm_barrier();
 
+    struct MMTk_ractor_cache *rc;
+    ccan_list_for_each(&objspace->ractor_caches, rc, list_node) {
+        mmtk_flush_obj_free_buffer(rc);
+    }
+
     objspace->world_stopped = true;
 
     pthread_cond_broadcast(&objspace->cond_world_stopped);
@@ -584,7 +598,7 @@ rb_gc_impl_ractor_cache_alloc(void *objspace_ptr, void *ractor)
     }
     objspace->live_ractor_cache_count++;
 
-    struct MMTk_ractor_cache *cache = malloc(sizeof(struct MMTk_ractor_cache));
+    struct MMTk_ractor_cache *cache = calloc(1, sizeof(struct MMTk_ractor_cache));
     ccan_list_add(&objspace->ractor_caches, &cache->list_node);
 
     cache->mutator = mmtk_bind_mutator(cache);
@@ -601,6 +615,8 @@ rb_gc_impl_ractor_cache_free(void *objspace_ptr, void *cache_ptr)
 
     ccan_list_del(&cache->list_node);
 
+    mmtk_flush_obj_free_buffer(cache);
+
     if (ruby_free_at_exit_p()) {
         MMTK_ASSERT(objspace->live_ractor_cache_count > 0);
     }
@@ -801,6 +817,42 @@ obj_can_parallel_free_p(VALUE obj)
     }
 }
 
+static void
+mmtk_flush_obj_free_buffer(struct MMTk_ractor_cache *cache)
+{
+    if (cache->obj_free_parallel_count > 0) {
+        mmtk_add_obj_free_candidates(cache->obj_free_parallel_buf,
+                                     cache->obj_free_parallel_count, true);
+        cache->obj_free_parallel_count = 0;
+    }
+    if (cache->obj_free_non_parallel_count > 0) {
+        mmtk_add_obj_free_candidates(cache->obj_free_non_parallel_buf,
+                                     cache->obj_free_non_parallel_count, false);
+        cache->obj_free_non_parallel_count = 0;
+    }
+}
+
+static inline void
+mmtk_buffer_obj_free_candidate(struct MMTk_ractor_cache *cache, VALUE obj)
+{
+    if (obj_can_parallel_free_p(obj)) {
+        cache->obj_free_parallel_buf[cache->obj_free_parallel_count++] = (MMTk_ObjectReference)obj;
+        if (cache->obj_free_parallel_count >= OBJ_FREE_BUF_CAPACITY) {
+            mmtk_add_obj_free_candidates(cache->obj_free_parallel_buf,
+                                         cache->obj_free_parallel_count, true);
+            cache->obj_free_parallel_count = 0;
+        }
+    }
+    else {
+        cache->obj_free_non_parallel_buf[cache->obj_free_non_parallel_count++] = (MMTk_ObjectReference)obj;
+        if (cache->obj_free_non_parallel_count >= OBJ_FREE_BUF_CAPACITY) {
+            mmtk_add_obj_free_candidates(cache->obj_free_non_parallel_buf,
+                                         cache->obj_free_non_parallel_count, false);
+            cache->obj_free_non_parallel_count = 0;
+        }
+    }
+}
+
 VALUE
 rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size)
 {
@@ -837,7 +889,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags
     mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT);
 
     // TODO: only add when object needs obj_free to be called
-    mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj));
+    mmtk_buffer_obj_free_candidate(ractor_cache, (VALUE)alloc_obj);
 
     objspace->total_allocated_objects++;
 
@@ -1277,6 +1329,11 @@ rb_gc_impl_shutdown_call_finalizer(void *objspace_ptr)
 
     unsigned int lev = RB_GC_VM_LOCK();
     {
+        struct MMTk_ractor_cache *rc;
+        ccan_list_for_each(&objspace->ractor_caches, rc, list_node) {
+            mmtk_flush_obj_free_buffer(rc);
+        }
+
         struct MMTk_RawVecOfObjRef registered_candidates = mmtk_get_all_obj_free_candidates();
 
         for (size_t i = 0; i < registered_candidates.len; i++) {
             VALUE obj = (VALUE)registered_candidates.ptr[i];
diff --git a/gc/mmtk/mmtk.h b/gc/mmtk/mmtk.h
index ffbad1a..20d2684 100644
--- a/gc/mmtk/mmtk.h
+++ b/gc/mmtk/mmtk.h
@@ -123,7 +125,9 @@ void mmtk_post_alloc(MMTk_Mutator *mutator,
                      size_t bytes,
                      MMTk_AllocationSemantics semantics);
 
-void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free);
+void mmtk_add_obj_free_candidates(const MMTk_ObjectReference *objects,
+                                  size_t count,
+                                  bool can_parallel_free);
 
 void mmtk_declare_weak_references(MMTk_ObjectReference object);
 
diff --git a/gc/mmtk/src/api.rs b/gc/mmtk/src/api.rs
index 5eac068..b9797f6 100644
--- a/gc/mmtk/src/api.rs
+++ b/gc/mmtk/src/api.rs
@@ -297,12 +297,16 @@ pub unsafe extern "C" fn mmtk_post_alloc(
     memory_manager::post_alloc::<Ruby>(unsafe { &mut *mutator }, refer, bytes, semantics)
 }
 
-// TODO: Replace with buffered mmtk_add_obj_free_candidates
 #[no_mangle]
-pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) {
+pub unsafe extern "C" fn mmtk_add_obj_free_candidates(
+    objects: *const ObjectReference,
+    count: usize,
+    can_parallel_free: bool,
+) {
+    let objects = unsafe { std::slice::from_raw_parts(objects, count) };
     binding()
         .weak_proc
-        .add_obj_free_candidate(object, can_parallel_free)
+        .add_obj_free_candidates_batch(objects, can_parallel_free)
 }
 
 // =============== Weak references ===============
diff --git a/gc/mmtk/src/weak_proc.rs b/gc/mmtk/src/weak_proc.rs
index f103822..d38dbe0 100644
--- a/gc/mmtk/src/weak_proc.rs
+++ b/gc/mmtk/src/weak_proc.rs
@@ -1,5 +1,3 @@
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
 use std::sync::Mutex;
 
 use mmtk::scheduler::GCWork;
@@ -15,7 +13,6 @@ use crate::Ruby;
 pub struct WeakProcessor {
     non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>,
     parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>,
-    parallel_obj_free_candidates_counter: AtomicUsize,
 
     /// Objects that needs `obj_free` called when dying.
     /// If it is a bottleneck, replace it with a lock-free data structure,
@@ -34,7 +31,6 @@ impl WeakProcessor {
         Self {
             non_parallel_obj_free_candidates: Mutex::new(Vec::new()),
             parallel_obj_free_candidates: vec![Mutex::new(Vec::new())],
-            parallel_obj_free_candidates_counter: AtomicUsize::new(0),
             weak_references: Mutex::new(Vec::new()),
         }
     }
@@ -48,27 +44,34 @@ impl WeakProcessor {
         }
     }
 
-    /// Add an object as a candidate for `obj_free`.
+    /// Add a batch of objects as candidates for `obj_free`.
     ///
-    /// Multiple mutators can call it concurrently, so it has `&self`.
-    pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) {
+    /// Amortizes mutex acquisition over the entire batch. Called when a
+    /// mutator's local buffer is flushed (buffer full or stop-the-world).
+    pub fn add_obj_free_candidates_batch(
+        &self,
+        objects: &[ObjectReference],
+        can_parallel_free: bool,
+    ) {
+        if objects.is_empty() {
+            return;
+        }
+
         if can_parallel_free {
-            // Newly allocated objects are placed in parallel_obj_free_candidates using
-            // round-robin. This may not be ideal for load balancing.
-            let idx = self
-                .parallel_obj_free_candidates_counter
-                .fetch_add(1, Ordering::Relaxed)
-                % self.parallel_obj_free_candidates.len();
-
-            self.parallel_obj_free_candidates[idx]
-                .lock()
-                .unwrap()
-                .push(object);
+            let num_buckets = self.parallel_obj_free_candidates.len();
+            for idx in 0..num_buckets {
+                let mut bucket = self.parallel_obj_free_candidates[idx].lock().unwrap();
+                for (i, &obj) in objects.iter().enumerate() {
+                    if i % num_buckets == idx {
+                        bucket.push(obj);
+                    }
+                }
+            }
         } else {
             self.non_parallel_obj_free_candidates
                 .lock()
                 .unwrap()
-                .push(object);
+                .extend_from_slice(objects);
         }
     }