Skip to content

Commit 7f5ec04

Browse files
committed
WIP: tweak coop threads
1 parent e42e8ea commit 7f5ec04

2 files changed

Lines changed: 255 additions & 48 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 110 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -395,8 +396,10 @@ def pending(self):
395396
return not self.running() and self.ready_func is not None
396397

397398
def ready(self):
398-
assert(self.pending())
399-
return self.ready_func()
399+
return self.pending() and self.ready_func()
400+
401+
def ready_list(inst: ComponentInstance) -> list[Thread]:
402+
return [t for t in inst.threads.array if t and t.ready()]
400403

401404
def __init__(self, task, thread_func):
402405
self.task = task
@@ -413,28 +416,57 @@ def fiber_func():
413416
self.fiber_lock.acquire()
414417
assert(self.running() and not cancelled(self.suspend_result))
415418
self.suspend_result = None
416-
thread_func(self)
417-
assert(self.running())
418-
self.task.thread_stop(self)
419-
if self.index is not None:
420-
self.task.inst.threads.remove(self.index)
421-
self.parent_lock.release()
419+
try:
420+
thread_func(self)
421+
self.exit()
422+
except ThreadExit:
423+
return
424+
assert(False)
422425
self.fiber = threading.Thread(target = fiber_func)
423426
self.fiber.start()
424427
self.task.thread_start(self)
425428
assert(self.suspended())
426429

427-
def resume_later(self):
430+
def exit(self):
431+
assert(self.running() and self.task.may_block())
432+
if self.index is not None:
433+
self.task.inst.threads.remove(self.index)
434+
self.task.thread_stop(self)
435+
if self.task.is_sync_before_return():
436+
self.release_some_ready_sibling()
437+
else:
438+
self.parent_lock.release()
439+
raise ThreadExit()
440+
441+
def release_some_ready_sibling(self):
442+
other = random.choice(Thread.ready_list(self.task.inst))
443+
assert(self is not other)
444+
other.clear_pending()
445+
self.release_sibling(other)
446+
447+
def clear_pending(self, suspend_result = SuspendResult.NOT_CANCELLED):
448+
assert(cancelled(suspend_result) or self.ready_func())
449+
self.ready_func = None
450+
self.task.inst.store.pending.remove(self)
451+
452+
def release_sibling(self, other: Thread):
453+
assert(other.suspend_result is None)
454+
other.suspend_result = SuspendResult.NOT_CANCELLED
455+
assert(self.parent_lock and not other.parent_lock)
456+
other.parent_lock = self.parent_lock
457+
self.parent_lock = None
458+
assert(not self.running() and other.running())
459+
other.fiber_lock.release()
460+
461+
def unsuspend(self):
428462
assert(self.suspended())
429463
self.ready_func = lambda: True
430464
self.task.inst.store.pending.append(self)
431465

432466
def resume(self, suspend_result = SuspendResult.NOT_CANCELLED):
433467
assert(not self.running())
434468
if self.pending():
435-
assert(cancelled(suspend_result) or self.ready_func())
436-
self.ready_func = None
437-
self.task.inst.store.pending.remove(self)
469+
self.clear_pending(suspend_result)
438470
assert(self.cancellable or not cancelled(suspend_result))
439471
assert(self.suspend_result is None)
440472
self.suspend_result = suspend_result
@@ -449,7 +481,10 @@ def suspend(self, cancellable) -> SuspendResult:
449481
assert(self.running() and self.task.may_block())
450482
assert(not self.cancellable and self.suspend_result is None)
451483
self.cancellable = cancellable
452-
self.parent_lock.release()
484+
if self.task.is_sync_before_return():
485+
self.release_some_ready_sibling()
486+
else:
487+
self.parent_lock.release()
453488
self.fiber_lock.acquire()
454489
assert(self.running())
455490
self.cancellable = False
@@ -477,16 +512,11 @@ def yield_until(self, ready_func, cancellable) -> SuspendResult:
477512
def yield_(self, cancellable) -> SuspendResult:
478513
return self.yield_until(lambda: True, cancellable)
479514

480-
def switch_to(self, cancellable, other: Thread) -> SuspendResult:
515+
def suspend_to_suspended(self, cancellable, other: Thread) -> SuspendResult:
481516
assert(self.running() and other.suspended())
482-
assert(not self.cancellable and self.suspend_result is None and other.suspend_result is None)
517+
assert(not self.cancellable and self.suspend_result is None)
483518
self.cancellable = cancellable
484-
other.suspend_result = SuspendResult.NOT_CANCELLED
485-
assert(self.parent_lock and not other.parent_lock)
486-
other.parent_lock = self.parent_lock
487-
self.parent_lock = None
488-
assert(not self.running() and other.running())
489-
other.fiber_lock.release()
519+
self.release_sibling(other)
490520
self.fiber_lock.acquire()
491521
assert(self.running())
492522
self.cancellable = False
@@ -496,11 +526,21 @@ def switch_to(self, cancellable, other: Thread) -> SuspendResult:
496526
assert(cancellable or not cancelled(suspend_result))
497527
return suspend_result
498528

499-
def yield_to(self, cancellable, other: Thread) -> SuspendResult:
529+
def yield_to_suspended(self, cancellable, other: Thread) -> SuspendResult:
500530
assert(self.running() and other.suspended())
501531
self.ready_func = lambda: True
502532
self.task.inst.store.pending.append(self)
503-
return self.switch_to(cancellable, other)
533+
return self.suspend_to_suspended(cancellable, other)
534+
535+
def suspend_to_ready(self, cancellable, other: Thread) -> SuspendResult:
536+
assert(self.running() and other.ready())
537+
other.clear_pending()
538+
return self.suspend_to_suspended(cancellable, other)
539+
540+
def yield_to_ready(self, cancellable, other: Thread) -> SuspendResult:
541+
assert(self.running() and other.ready())
542+
other.clear_pending()
543+
return self.yield_to_suspended(cancellable, other)
504544

505545
#### Waitable State
506546

@@ -624,8 +664,11 @@ def thread_stop(self, thread):
624664
def needs_exclusive(self):
625665
return not self.opts.async_ or self.opts.callback
626666

667+
def is_sync_before_return(self):
668+
return not self.ft.async_ and self.state != Task.State.RESOLVED
669+
627670
def may_block(self):
628-
return self.ft.async_ or self.state == Task.State.RESOLVED
671+
return not self.is_sync_before_return() or bool(Thread.ready_list(self.inst))
629672

630673
def enter(self, thread):
631674
assert(thread in self.threads and thread.task is self)
@@ -2513,15 +2556,22 @@ def thread_func(thread):
25132556
new_thread.index = thread.task.inst.threads.add(new_thread)
25142557
return [new_thread.index]
25152558

2516-
### 🧵 `canon thread.resume-later`
2559+
### 🧵 `canon thread.unsuspend`
25172560

2518-
def canon_thread_resume_later(thread, i):
2561+
def canon_thread_unsuspend(thread, i):
25192562
trap_if(not thread.task.inst.may_leave)
25202563
other_thread = thread.task.inst.threads.get(i)
25212564
trap_if(not other_thread.suspended())
2522-
other_thread.resume_later()
2565+
other_thread.unsuspend()
25232566
return []
25242567

2568+
### 🧵 `canon thread.exit`
2569+
2570+
def canon_thread_exit(thread):
2571+
trap_if(not thread.task.inst.may_leave)
2572+
thread.exit()
2573+
assert(False)
2574+
25252575
### 🧵 `canon thread.suspend`
25262576

25272577
def canon_thread_suspend(cancellable, thread):
@@ -2543,28 +2593,55 @@ def canon_thread_yield(cancellable, thread):
25432593
suspend_result = thread.yield_(cancellable)
25442594
return [suspend_result]
25452595

2546-
### 🧵 `canon thread.switch-to`
2596+
### 🧵 `canon thread.suspend-to-suspended`
25472597

2548-
def canon_thread_switch_to(cancellable, thread, i):
2598+
def canon_thread_suspend_to_suspended(cancellable, thread, i):
25492599
trap_if(not thread.task.inst.may_leave)
25502600
other_thread = thread.task.inst.threads.get(i)
25512601
trap_if(not other_thread.suspended())
25522602
if thread.task.deliver_pending_cancel(cancellable):
25532603
suspend_result = SuspendResult.CANCELLED
25542604
else:
2555-
suspend_result = thread.switch_to(cancellable, other_thread)
2605+
suspend_result = thread.suspend_to_suspended(cancellable, other_thread)
25562606
return [suspend_result]
25572607

2558-
### 🧵 `canon thread.yield-to`
2608+
### 🧵 `canon thread.yield-to-suspended`
25592609

2560-
def canon_thread_yield_to(cancellable, thread, i):
2610+
def canon_thread_yield_to_suspended(cancellable, thread, i):
25612611
trap_if(not thread.task.inst.may_leave)
25622612
other_thread = thread.task.inst.threads.get(i)
25632613
trap_if(not other_thread.suspended())
25642614
if thread.task.deliver_pending_cancel(cancellable):
25652615
suspend_result = SuspendResult.CANCELLED
25662616
else:
2567-
suspend_result = thread.yield_to(cancellable, other_thread)
2617+
suspend_result = thread.yield_to_suspended(cancellable, other_thread)
2618+
return [suspend_result]
2619+
2620+
### 🧵 `canon thread.suspend-then-promote`
2621+
2622+
def canon_thread_suspend_then_promote(cancellable, thread, i):
2623+
trap_if(not thread.task.inst.may_leave)
2624+
trap_if(not thread.task.may_block())
2625+
other_thread = thread.task.inst.threads.get(i)
2626+
if thread.task.deliver_pending_cancel(cancellable):
2627+
suspend_result = SuspendResult.CANCELLED
2628+
elif other_thread.ready():
2629+
suspend_result = thread.suspend_to_ready(cancellable, other_thread)
2630+
else:
2631+
suspend_result = thread.suspend(cancellable)
2632+
return [suspend_result]
2633+
2634+
### 🧵 `canon thread.yield-then-promote`
2635+
2636+
def canon_thread_yield_then_promote(cancellable, thread, i):
2637+
trap_if(not thread.task.inst.may_leave)
2638+
other_thread = thread.task.inst.threads.get(i)
2639+
if thread.task.deliver_pending_cancel(cancellable):
2640+
suspend_result = SuspendResult.CANCELLED
2641+
elif other_thread.ready():
2642+
suspend_result = thread.yield_to_ready(cancellable, other_thread)
2643+
else:
2644+
suspend_result = thread.yield_(cancellable)
25682645
return [suspend_result]
25692646

25702647
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)