Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions Lib/test/test_free_threading/test_code.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,41 @@
import unittest

try:
    import ctypes
except ImportError:
    ctypes = None

from threading import Thread
from unittest import TestCase

from test.support import threading_helper
from test.support.threading_helper import run_concurrently

if ctypes is not None:
    capi = ctypes.pythonapi

    # C signature of the cleanup callback accepted by
    # PyUnstable_Eval_RequestCodeExtraIndex: void (*freefunc)(void *).
    # Note: ctypes.c_void_p is the canonical name; the original used the
    # undocumented backwards-compatibility alias ctypes.c_voidp.
    freefunc = ctypes.CFUNCTYPE(None, ctypes.c_void_p)

    RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
    RequestCodeExtraIndex.argtypes = (freefunc,)
    RequestCodeExtraIndex.restype = ctypes.c_ssize_t

    SetExtra = capi.PyUnstable_Code_SetExtra
    SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_void_p)
    SetExtra.restype = ctypes.c_int

    GetExtra = capi.PyUnstable_Code_GetExtra
    GetExtra.argtypes = (
        ctypes.py_object,
        ctypes.c_ssize_t,
        ctypes.POINTER(ctypes.c_void_p),
    )
    GetExtra.restype = ctypes.c_int

# Note: each call to RequestCodeExtraIndex permanently allocates a slot
# (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255).
NTHREADS = 20


@threading_helper.requires_working_threading()
class TestCode(TestCase):
Expand All @@ -25,6 +57,83 @@ def run_in_thread():
for thread in threads:
thread.join()

@unittest.skipUnless(ctypes, "ctypes is required")
def test_request_code_extra_index_concurrent(self):
    """Threads racing on RequestCodeExtraIndex each receive a fresh slot."""
    indices = []

    def request_slot():
        # A NULL freefunc means the slot never needs cleanup.
        slot = RequestCodeExtraIndex(freefunc(0))
        self.assertGreaterEqual(slot, 0)
        indices.append(slot)

    run_concurrently(worker_func=request_slot, nthreads=NTHREADS)

    # One index per thread, and no two threads may share one.
    self.assertEqual(len(indices), NTHREADS)
    self.assertEqual(len(set(indices)), NTHREADS)

@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_all_ops_concurrent(self):
    """Run RequestCodeExtraIndex, SetExtra and GetExtra together in threads."""
    LOOP = 100

    def f():
        pass

    code = f.__code__

    def exercise_private_slot():
        # Each thread gets a slot of its own, so its reads can only
        # observe its own writes.
        slot = RequestCodeExtraIndex(freefunc(0))
        self.assertGreaterEqual(slot, 0)

        for value in range(1, LOOP + 1):
            self.assertEqual(SetExtra(code, slot, ctypes.c_void_p(value)), 0)

        for _ in range(LOOP):
            out = ctypes.c_void_p()
            self.assertEqual(GetExtra(code, slot, out), 0)
            # Only this thread wrote the slot, so the last write wins.
            self.assertEqual(out.value, LOOP)

    run_concurrently(worker_func=exercise_private_slot, nthreads=NTHREADS)

@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_set_get_concurrent(self):
    """Race SetExtra against GetExtra on a single shared slot."""
    LOOP = 100

    def f():
        pass

    code = f.__code__

    slot = RequestCodeExtraIndex(freefunc(0))
    self.assertGreaterEqual(slot, 0)

    def hammer_shared_slot():
        for value in range(1, LOOP + 1):
            self.assertEqual(SetExtra(code, slot, ctypes.c_void_p(value)), 0)

        for _ in range(LOOP):
            out = ctypes.c_void_p()
            self.assertEqual(GetExtra(code, slot, out), 0)
            # Any writer thread may have stored the slot; accept the
            # whole range of values that were ever written.
            self.assertTrue(1 <= out.value <= LOOP)

    run_concurrently(worker_func=hammer_shared_slot, nthreads=NTHREADS)

    # Every thread's final write was LOOP, so that value must remain.
    out = ctypes.c_void_p()
    self.assertEqual(GetExtra(code, slot, out), 0)
    self.assertEqual(out.value, LOOP)


# Allow running this test file directly.
if __name__ == "__main__":
    unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Made :c:func:`PyUnstable_Code_SetExtra`, :c:func:`PyUnstable_Code_GetExtra`,
and :c:func:`PyUnstable_Eval_RequestCodeExtraIndex` thread-safe on the
:term:`free threaded <free threading>` build.
142 changes: 113 additions & 29 deletions Objects/codeobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,67 @@ typedef struct {
} _PyCodeObjectExtra;


/* Number of bytes needed for a _PyCodeObjectExtra holding n slots.
 *
 * NOTE(review): the (n - 1) presumably accounts for one ce_extras
 * element already embedded in the struct declaration, which implies the
 * caller must pass n >= 1 — confirm against the _PyCodeObjectExtra
 * definition (not fully visible here). */
static inline size_t
code_extra_size(Py_ssize_t n)
{
    return sizeof(_PyCodeObjectExtra) + (n - 1) * sizeof(void *);
}

#ifdef Py_GIL_DISABLED
/* Free-threaded build: replace co->co_extra with a larger buffer.
 *
 * Must be called with co's per-object lock held (asserted below).
 * Copies the old slots, NULL-fills the newly added ones, writes `extra`
 * into `index`, then publishes the buffer with a release store so that
 * lock-free readers observe fully initialized contents.  The old buffer
 * is reclaimed via delayed (QSBR) freeing rather than freed immediately,
 * since a reader may still hold a pointer to it.
 *
 * Returns 0 on success, -1 with MemoryError set on allocation failure. */
static int
code_extra_grow_ft(PyCodeObject *co, _PyCodeObjectExtra *old_co_extra,
                   Py_ssize_t old_ce_size, Py_ssize_t new_ce_size,
                   Py_ssize_t index, void *extra)
{
    _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(co);
    _PyCodeObjectExtra *new_co_extra = PyMem_Malloc(
        code_extra_size(new_ce_size));
    if (new_co_extra == NULL) {
        PyErr_NoMemory();
        return -1;
    }

    if (old_ce_size > 0) {
        memcpy(new_co_extra->ce_extras, old_co_extra->ce_extras,
               old_ce_size * sizeof(void *));
    }
    for (Py_ssize_t i = old_ce_size; i < new_ce_size; i++) {
        new_co_extra->ce_extras[i] = NULL;
    }
    new_co_extra->ce_size = new_ce_size;
    new_co_extra->ce_extras[index] = extra;

    // Publish new buffer and its contents to lock-free readers.
    FT_ATOMIC_STORE_PTR_RELEASE(co->co_extra, new_co_extra);
    if (old_co_extra != NULL) {
        // QSBR: defer old-buffer free until lock-free readers quiesce.
        _PyMem_FreeDelayed(old_co_extra, code_extra_size(old_ce_size));
    }
    return 0;
}
#else
/* GIL build: grow co->co_extra in place with realloc.
 *
 * With the GIL held there can be no concurrent readers, so a plain
 * pointer assignment is enough to publish the buffer.
 *
 * Returns 0 on success, -1 with MemoryError set on allocation failure. */
static int
code_extra_grow_gil(PyCodeObject *co, _PyCodeObjectExtra *old_co_extra,
                    Py_ssize_t old_ce_size, Py_ssize_t new_ce_size,
                    Py_ssize_t index, void *extra)
{
    _PyCodeObjectExtra *grown =
        PyMem_Realloc(old_co_extra, code_extra_size(new_ce_size));
    if (grown == NULL) {
        PyErr_NoMemory();
        return -1;
    }

    /* Realloc preserved the first old_ce_size slots; clear the rest. */
    Py_ssize_t slot = old_ce_size;
    while (slot < new_ce_size) {
        grown->ce_extras[slot] = NULL;
        slot++;
    }

    grown->ce_size = new_ce_size;
    grown->ce_extras[index] = extra;
    co->co_extra = grown;
    return 0;
}
#endif

int
PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
{
Expand All @@ -1583,15 +1644,19 @@ PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
return -1;
}

PyCodeObject *o = (PyCodeObject*) code;
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra*) o->co_extra;
PyCodeObject *co = (PyCodeObject *)code;
*extra = NULL;

if (co_extra == NULL || index < 0 || co_extra->ce_size <= index) {
*extra = NULL;
if (index < 0) {
return 0;
}

*extra = co_extra->ce_extras[index];
// Lock-free read; pairs with release stores in SetExtra.
_PyCodeObjectExtra *co_extra = FT_ATOMIC_LOAD_PTR_ACQUIRE(co->co_extra);
if (co_extra != NULL && index < co_extra->ce_size) {
*extra = FT_ATOMIC_LOAD_PTR_ACQUIRE(co_extra->ce_extras[index]);
}

return 0;
}

Expand All @@ -1601,40 +1666,59 @@ PyUnstable_Code_SetExtra(PyObject *code, Py_ssize_t index, void *extra)
{
PyInterpreterState *interp = _PyInterpreterState_GET();

if (!PyCode_Check(code) || index < 0 ||
index >= interp->co_extra_user_count) {
// co_extra_user_count is monotonically increasing and published with
// release store in RequestCodeExtraIndex, so once an index is valid
// it stays valid.
Py_ssize_t user_count = FT_ATOMIC_LOAD_SSIZE_ACQUIRE(
interp->co_extra_user_count);

if (!PyCode_Check(code) || index < 0 || index >= user_count) {
PyErr_BadInternalCall();
return -1;
}

PyCodeObject *o = (PyCodeObject*) code;
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra *) o->co_extra;
PyCodeObject *co = (PyCodeObject *)code;
int result = 0;
void *old_slot_value = NULL;

if (co_extra == NULL || co_extra->ce_size <= index) {
Py_ssize_t i = (co_extra == NULL ? 0 : co_extra->ce_size);
co_extra = PyMem_Realloc(
co_extra,
sizeof(_PyCodeObjectExtra) +
(interp->co_extra_user_count-1) * sizeof(void*));
if (co_extra == NULL) {
return -1;
}
for (; i < interp->co_extra_user_count; i++) {
co_extra->ce_extras[i] = NULL;
}
co_extra->ce_size = interp->co_extra_user_count;
o->co_extra = co_extra;
Py_BEGIN_CRITICAL_SECTION(co);

_PyCodeObjectExtra *old_co_extra = (_PyCodeObjectExtra *)co->co_extra;
Py_ssize_t old_ce_size = (old_co_extra == NULL)
? 0 : old_co_extra->ce_size;

// Fast path: slot already exists, update in place.
if (index < old_ce_size) {
old_slot_value = old_co_extra->ce_extras[index];
FT_ATOMIC_STORE_PTR_RELEASE(old_co_extra->ce_extras[index], extra);
goto done;
}

if (co_extra->ce_extras[index] != NULL) {
freefunc free = interp->co_extra_freefuncs[index];
if (free != NULL) {
free(co_extra->ce_extras[index]);
// Slow path: buffer needs to grow.
Py_ssize_t new_ce_size = user_count;
#ifdef Py_GIL_DISABLED
// FT build: allocate new buffer and swap; QSBR reclaims the old one.
result = code_extra_grow_ft(
co, old_co_extra, old_ce_size, new_ce_size, index, extra);
#else
// GIL build: grow with realloc.
result = code_extra_grow_gil(
co, old_co_extra, old_ce_size, new_ce_size, index, extra);
#endif

done:;
Py_END_CRITICAL_SECTION();
if (old_slot_value != NULL) {
// Free the old slot value if a free function was registered.
// The caller must ensure no other thread can still access the old
// value after this overwrite.
freefunc free_extra = interp->co_extra_freefuncs[index];
if (free_extra != NULL) {
free_extra(old_slot_value);
}
}

co_extra->ce_extras[index] = extra;
return 0;
return result;
}


Expand Down
20 changes: 18 additions & 2 deletions Python/ceval.c
Original file line number Diff line number Diff line change
Expand Up @@ -3493,11 +3493,27 @@ PyUnstable_Eval_RequestCodeExtraIndex(freefunc free)
PyInterpreterState *interp = _PyInterpreterState_GET();
Py_ssize_t new_index;

if (interp->co_extra_user_count == MAX_CO_EXTRA_USERS - 1) {
#ifdef Py_GIL_DISABLED
struct _py_code_state *state = &interp->code_state;
FT_MUTEX_LOCK(&state->mutex);
#endif

if (interp->co_extra_user_count >= MAX_CO_EXTRA_USERS - 1) {
#ifdef Py_GIL_DISABLED
FT_MUTEX_UNLOCK(&state->mutex);
#endif
return -1;
}
new_index = interp->co_extra_user_count++;

new_index = interp->co_extra_user_count;
interp->co_extra_freefuncs[new_index] = free;

// Publish freefuncs[new_index] before making the index visible.
FT_ATOMIC_STORE_SSIZE_RELEASE(interp->co_extra_user_count, new_index + 1);

#ifdef Py_GIL_DISABLED
FT_MUTEX_UNLOCK(&state->mutex);
#endif
return new_index;
}

Expand Down
Loading