Skip to content

Commit 3593774

Browse files
committed
WIP: tweak coop threads
1 parent 421d4a6 commit 3593774

2 files changed

Lines changed: 209 additions & 53 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 93 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -392,8 +393,10 @@ def pending(self):
392393
return not self.running() and self.ready_func is not None
393394

394395
def ready(self):
395-
assert(self.pending())
396-
return self.ready_func()
396+
return self.pending() and self.ready_func()
397+
398+
def ready_list(inst: ComponentInstance) -> list[Thread]:
399+
return [t for t in inst.threads.array if t and t.ready()]
397400

398401
def __init__(self, task, thread_func):
399402
self.task = task
@@ -410,17 +413,48 @@ def fiber_func():
410413
self.fiber_lock.acquire()
411414
assert(self.running() and self.suspend_result == SuspendResult.NOT_CANCELLED)
412415
self.suspend_result = None
413-
thread_func(self)
414-
assert(self.running())
415-
self.task.thread_stop(self)
416-
if self.index is not None:
417-
self.task.inst.threads.remove(self.index)
418-
self.parent_lock.release()
416+
try:
417+
thread_func(self)
418+
self.exit()
419+
except ThreadExit:
420+
return
421+
assert(False)
419422
self.fiber = threading.Thread(target = fiber_func)
420423
self.fiber.start()
421424
self.task.thread_start(self)
422425
assert(self.suspended())
423426

427+
def exit(self):
428+
assert(self.running() and self.task.may_block())
429+
if self.index is not None:
430+
self.task.inst.threads.remove(self.index)
431+
self.task.thread_stop(self)
432+
if self.task.is_sync_before_return():
433+
self.release_any_ready_sibling()
434+
else:
435+
self.parent_lock.release()
436+
raise ThreadExit()
437+
438+
def release_any_ready_sibling(self):
439+
other = random.choice(Thread.ready_list(self.task.inst))
440+
assert(self is not other)
441+
other.clear_pending()
442+
self.release_sibling(other)
443+
444+
def clear_pending(self, suspend_result = SuspendResult.NOT_CANCELLED):
445+
assert(suspend_result == SuspendResult.CANCELLED or self.ready_func())
446+
self.ready_func = None
447+
self.task.inst.store.pending.remove(self)
448+
449+
def release_sibling(self, other: Thread):
450+
assert(other.suspend_result is None)
451+
other.suspend_result = SuspendResult.NOT_CANCELLED
452+
assert(self.parent_lock and not other.parent_lock)
453+
other.parent_lock = self.parent_lock
454+
self.parent_lock = None
455+
assert(not self.running() and other.running())
456+
other.fiber_lock.release()
457+
424458
def resume_later(self):
425459
assert(self.suspended())
426460
self.ready_func = lambda: True
@@ -429,9 +463,7 @@ def resume_later(self):
429463
def resume(self, suspend_result = SuspendResult.NOT_CANCELLED):
430464
assert(not self.running())
431465
if self.pending():
432-
assert(suspend_result == SuspendResult.CANCELLED or self.ready_func())
433-
self.ready_func = None
434-
self.task.inst.store.pending.remove(self)
466+
self.clear_pending(suspend_result)
435467
assert(self.cancellable or suspend_result == SuspendResult.NOT_CANCELLED)
436468
assert(self.suspend_result is None)
437469
self.suspend_result = suspend_result
@@ -446,7 +478,10 @@ def suspend(self, cancellable) -> SuspendResult:
446478
assert(self.running() and self.task.may_block())
447479
assert(not self.cancellable and self.suspend_result is None)
448480
self.cancellable = cancellable
449-
self.parent_lock.release()
481+
if self.task.is_sync_before_return():
482+
self.release_any_ready_sibling()
483+
else:
484+
self.parent_lock.release()
450485
self.fiber_lock.acquire()
451486
assert(self.running())
452487
self.cancellable = False
@@ -464,16 +499,11 @@ def suspend_until(self, ready_func, cancellable = False) -> SuspendResult:
464499
self.task.inst.store.pending.append(self)
465500
return self.suspend(cancellable)
466501

467-
def switch_to(self, cancellable, other: Thread) -> SuspendResult:
502+
def suspend_to_suspended(self, cancellable, other: Thread) -> SuspendResult:
468503
assert(self.running() and other.suspended())
469-
assert(not self.cancellable and self.suspend_result is None and other.suspend_result is None)
504+
assert(not self.cancellable and self.suspend_result is None)
470505
self.cancellable = cancellable
471-
other.suspend_result = SuspendResult.NOT_CANCELLED
472-
assert(self.parent_lock and not other.parent_lock)
473-
other.parent_lock = self.parent_lock
474-
self.parent_lock = None
475-
assert(not self.running() and other.running())
476-
other.fiber_lock.release()
506+
self.release_sibling(other)
477507
self.fiber_lock.acquire()
478508
assert(self.running())
479509
self.cancellable = False
@@ -483,11 +513,21 @@ def switch_to(self, cancellable, other: Thread) -> SuspendResult:
483513
assert(cancellable or suspend_result == SuspendResult.NOT_CANCELLED)
484514
return suspend_result
485515

486-
def yield_to(self, cancellable, other: Thread) -> SuspendResult:
487-
assert(not self.ready_func)
516+
def yield_to_suspended(self, cancellable, other: Thread) -> SuspendResult:
517+
assert(self.running() and other.suspended())
488518
self.ready_func = lambda: True
489519
self.task.inst.store.pending.append(self)
490-
return self.switch_to(cancellable, other)
520+
return self.suspend_to_suspended(cancellable, other)
521+
522+
def suspend_to_ready(self, cancellable, other: Thread) -> SuspendResult:
523+
assert(self.running() and other.ready())
524+
other.clear_pending()
525+
return self.suspend_to_suspended(cancellable, other)
526+
527+
def yield_to_ready(self, cancellable, other: Thread) -> SuspendResult:
528+
assert(self.running() and other.ready())
529+
other.clear_pending()
530+
return self.yield_to_suspended(cancellable, other)
491531

492532
#### Waitable State
493533

@@ -597,8 +637,11 @@ def thread_stop(self, thread):
597637
def needs_exclusive(self):
598638
return not self.opts.async_ or self.opts.callback
599639

640+
def is_sync_before_return(self):
641+
return not self.ft.async_ and self.state != Task.State.RESOLVED
642+
600643
def may_block(self):
601-
return self.ft.async_ or self.state == Task.State.RESOLVED
644+
return not self.is_sync_before_return() or Thread.ready_list(self.inst)
602645

603646
def enter(self, thread):
604647
assert(thread in self.threads and thread.task is self)
@@ -1998,10 +2041,7 @@ def thread_func(thread):
19982041
inst.exclusive = False
19992042
match code:
20002043
case CallbackCode.YIELD:
2001-
if task.may_block():
2002-
event = task.yield_until(lambda: not inst.exclusive, thread, cancellable = True)
2003-
else:
2004-
event = (EventCode.NONE, 0, 0)
2044+
event = task.yield_until(lambda: not inst.exclusive, thread, cancellable = True)
20052045
case CallbackCode.WAIT:
20062046
trap_if(not task.may_block())
20072047
wset = inst.handles.get(si)
@@ -2514,6 +2554,13 @@ def canon_thread_resume_later(thread, i):
25142554
other_thread.resume_later()
25152555
return []
25162556

2557+
### 🧵 `canon thread.exit`
2558+
2559+
def canon_thread_exit(thread):
2560+
trap_if(not thread.task.inst.may_leave)
2561+
thread.exit()
2562+
assert(False)
2563+
25172564
### 🧵 `canon thread.suspend`
25182565

25192566
def canon_thread_suspend(cancellable, thread):
@@ -2529,60 +2576,62 @@ def canon_thread_suspend(cancellable, thread):
25292576

25302577
def canon_thread_yield(cancellable, thread):
25312578
trap_if(not thread.task.inst.may_leave)
2532-
if not thread.task.may_block():
2533-
return [SuspendResult.NOT_CANCELLED]
25342579
if thread.task.deliver_pending_cancel(cancellable):
25352580
suspend_result = SuspendResult.CANCELLED
25362581
else:
25372582
suspend_result = thread.suspend_until(lambda:True, cancellable)
25382583
return [suspend_result]
25392584

2540-
### 🧵 `canon thread.switch-to`
2585+
### 🧵 `canon thread.suspend-to-suspended`
25412586

2542-
def canon_thread_switch_to(cancellable, thread, i):
2587+
def canon_thread_suspend_to_suspended(cancellable, thread, i):
25432588
trap_if(not thread.task.inst.may_leave)
25442589
other_thread = thread.task.inst.threads.get(i)
25452590
trap_if(not other_thread.suspended())
25462591
if thread.task.deliver_pending_cancel(cancellable):
25472592
suspend_result = SuspendResult.CANCELLED
25482593
else:
2549-
suspend_result = thread.switch_to(cancellable, other_thread)
2594+
suspend_result = thread.suspend_to_suspended(cancellable, other_thread)
25502595
return [suspend_result]
25512596

2552-
### 🧵 `canon thread.yield-to`
2597+
### 🧵 `canon thread.yield-to-suspended`
25532598

2554-
def canon_thread_yield_to(cancellable, thread, i):
2599+
def canon_thread_yield_to_suspended(cancellable, thread, i):
25552600
trap_if(not thread.task.inst.may_leave)
25562601
other_thread = thread.task.inst.threads.get(i)
25572602
trap_if(not other_thread.suspended())
25582603
if thread.task.deliver_pending_cancel(cancellable):
25592604
suspend_result = SuspendResult.CANCELLED
25602605
else:
2561-
suspend_result = thread.yield_to(cancellable, other_thread)
2606+
suspend_result = thread.yield_to_suspended(cancellable, other_thread)
25622607
return [suspend_result]
25632608

2564-
### 🧵 `canon thread.switch-to`
2609+
### 🧵 `canon thread.suspend-then-promote`
25652610

2566-
def canon_thread_switch_to(cancellable, thread, i):
2611+
def canon_thread_suspend_then_promote(cancellable, thread, i):
25672612
trap_if(not thread.task.inst.may_leave)
2613+
trap_if(not thread.task.may_block())
25682614
other_thread = thread.task.inst.threads.get(i)
2569-
trap_if(not other_thread.suspended())
25702615
if thread.task.deliver_pending_cancel(cancellable):
25712616
suspend_result = SuspendResult.CANCELLED
2617+
elif other_thread.ready():
2618+
suspend_result = thread.suspend_to_ready(cancellable, other_thread)
25722619
else:
2573-
suspend_result = thread.switch_to(cancellable, other_thread)
2620+
suspend_result = thread.suspend(cancellable)
25742621
return [suspend_result]
25752622

2576-
### 🧵 `canon thread.yield-to`
2623+
### 🧵 `canon thread.yield-then-promote`
25772624

2578-
def canon_thread_yield_to(cancellable, thread, i):
2625+
def canon_thread_yield_then_promote(cancellable, thread, i):
25792626
trap_if(not thread.task.inst.may_leave)
2627+
trap_if(not thread.task.may_block())
25802628
other_thread = thread.task.inst.threads.get(i)
2581-
trap_if(not other_thread.suspended())
25822629
if thread.task.deliver_pending_cancel(cancellable):
25832630
suspend_result = SuspendResult.CANCELLED
2631+
elif other_thread.ready():
2632+
suspend_result = thread.yield_to_ready(cancellable, other_thread)
25842633
else:
2585-
suspend_result = thread.yield_to(cancellable, other_thread)
2634+
suspend_result = thread.suspend_until(lambda:True, cancellable)
25862635
return [suspend_result]
25872636

25882637
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)