Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/graphs/build-test-publish.act
Original file line number Diff line number Diff line change
Expand Up @@ -6021,6 +6021,7 @@ nodes:
x: 13320
y: 620
inputs:
_disable_concurrency: true
dependency-snapshot: 'true'
- id: core-string-fmt-v1-lychee-coconut-gray
type: core/string-fmt@v1
Expand Down
18 changes: 15 additions & 3 deletions core/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type NodeBaseInterface interface {
IsExecutionNode() bool
SetExecutionNode(execNode bool)

DisableConcurrency() bool
SetDisableConcurrency(v bool)

// Returns the cache type where data is stored or should be stored to
// By default this depends on if this is an execution node or not.
GetCacheType() CacheType
Expand All @@ -183,9 +186,10 @@ type NodeBaseComponent struct {
FullPath string // Full path of the node within the graph hierarchy
CacheId string // Unique identifier for the cache
NodeType string // Node type of the node (e.g. core/run@v1 or github.com/actions/checkout@v3)
Graph *ActionGraph
Parent NodeBaseInterface
isExecutionNode bool
Graph *ActionGraph
Parent NodeBaseInterface
isExecutionNode bool
disableConcurrency bool
}

func (n *NodeBaseComponent) GetCacheType() CacheType {
Expand All @@ -204,6 +208,14 @@ func (n *NodeBaseComponent) SetExecutionNode(execNode bool) {
n.isExecutionNode = execNode
}

func (n *NodeBaseComponent) DisableConcurrency() bool {
return n.disableConcurrency
}

func (n *NodeBaseComponent) SetDisableConcurrency(v bool) {
n.disableConcurrency = v
}

func (n *NodeBaseComponent) SetId(id string) {
n.Id = id
n.CacheId = fmt.Sprintf("%s:%s", n.Id, uuid.New().String())
Expand Down
11 changes: 9 additions & 2 deletions core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ type ExecutionState struct {
JobConclusion string `json:"jobConclusion"`

DebugCallback DebugCallback `json:"-"`

// PendingConcurrencyLocks tracks concurrency locks that are held during
// ExecuteImpl. The key is node id → *sync.Mutex. Released when the
// node calls Execute to dispatch downstream node, or as a fallback when
// ExecuteImpl returns without any dispatching
PendingConcurrencyLocks *sync.Map `json:"-"`
}

type ExecutionStateOptions struct {
Expand Down Expand Up @@ -244,8 +250,9 @@ func (c *ExecutionState) PushNewExecutionState(parentNode NodeBaseInterface) *Ex
PostSteps: c.PostSteps,
JobConclusion: c.JobConclusion,

Visited: visited,
DebugCallback: c.DebugCallback,
Visited: visited,
DebugCallback: c.DebugCallback,
PendingConcurrencyLocks: &sync.Map{},
}

return newEc
Expand Down
36 changes: 35 additions & 1 deletion core/executions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package core

import "github.com/actionforge/actrun-cli/utils"
import (
"sync"

"github.com/actionforge/actrun-cli/utils"
)

type ExecutionSource struct {
SrcNode HasExecutionInterface
Expand Down Expand Up @@ -30,6 +34,16 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)

dest, hasDest := n.GetExecutionTarget(outputPort)

// Release any pending concurrency lock for the source node. When a node
// calls Execute on its own Executions to dispatch downstream, it means
// the nodes own ExecuteImpl work is done so we can release its lock
// and let the next concurrent caller continue
if hasDest && dest.SrcNode != nil {
if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dest.SrcNode.GetId()); loaded {
lockVal.(*sync.Mutex).Unlock()
}
}

// Set the step conclusion for the SOURCE node based on which output port is being executed.
// This enables ${{ steps.X.conclusion }} syntax similar to GitHub Actions.
// The conclusion is set BEFORE downstream nodes execute so they can read it.
Expand Down Expand Up @@ -65,6 +79,26 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)
return nil
}

// If the destination node has _disable_concurrency set, serialize execution
// through a per-node-ID mutex to prevent concurrent ExecuteImpl calls.
// The lock is stored as pending and released when the node calls Execute
// to dispatch downstream (above), or as a fallback when ExecuteImpl
// returns without dispatching (below).
if dest.DstNode.DisableConcurrency() {
dcNodeId := dest.DstNode.GetId()
actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{})
mu := actual.(*sync.Mutex)
mu.Lock()
ec.PendingConcurrencyLocks.Store(dcNodeId, mu)
// Fallback: release if ExecuteImpl returns without calling Execute
// (end of chain, error, or panic).
defer func() {
if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded {
lockVal.(*sync.Mutex).Unlock()
}
}()
}

err = dest.DstNode.ExecuteImpl(ec, dest.Port, err)
if err != nil {
return err
Expand Down
25 changes: 20 additions & 5 deletions core/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type ActionGraph struct {
Outputs map[OutputId]OutputDefinition `yaml:"outputs" json:"outputs" bson:"outputs"`

Entry string

// ConcurrencyLocks maps node ID → *sync.Mutex. Used to serialize concurrent
// calls to a node's ExecuteImpl when the node's _disable_concurrency input is true.
ConcurrencyLocks *sync.Map `yaml:"-" json:"-"`
}

func (ag *ActionGraph) AddNode(nodeId string, node NodeBaseInterface) {
Expand Down Expand Up @@ -78,7 +82,8 @@ func (ag *ActionGraph) GetEntry() (NodeEntryInterface, error) {

func NewActionGraph() ActionGraph {
return ActionGraph{
Nodes: make(map[string]NodeBaseInterface),
Nodes: make(map[string]NodeBaseInterface),
ConcurrencyLocks: &sync.Map{},
}
}

Expand Down Expand Up @@ -245,10 +250,11 @@ func NewExecutionState(
ctx, cancel := context.WithCancel(ctx)

return &ExecutionState{
Graph: graph,
Hierarchy: make([]NodeBaseInterface, 0),
ContextStackLock: &sync.RWMutex{},
OutputCacheLock: &sync.RWMutex{},
Graph: graph,
Hierarchy: make([]NodeBaseInterface, 0),
ContextStackLock: &sync.RWMutex{},
OutputCacheLock: &sync.RWMutex{},
PendingConcurrencyLocks: &sync.Map{},

IsDebugSession: debugCb != nil,
DebugCallback: debugCb,
Expand Down Expand Up @@ -815,6 +821,15 @@ func LoadInputValues(node NodeBaseInterface, nodeI map[string]any, validate bool

subInputs := map[string][]subInput{}

// _disable_concurrency is not a regular input, its stored directly on
// the node instance so we pull it out before processing the rest.
if v, ok := inputValues["_disable_concurrency"]; ok {
if b, ok := v.(bool); ok && b {
node.SetDisableConcurrency(true)
}
delete(inputValues, "_disable_concurrency")
}

for portId, inputValue := range inputValues {
groupInputId, portIndex, isIndexPort := IsValidIndexPortId(portId)
if isIndexPort {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
start-0
end-0
start-1
end-1
start-2
end-2
start-3
end-3
start-4
end-4
all done
128 changes: 128 additions & 0 deletions tests_e2e/scripts/concurrent_for_disable_concurrency.act
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
entry: start
nodes:
- id: start
type: core/start@v1
position:
x: 10
y: 500
- id: loop
type: core/concurrent-for-loop@v1
position:
x: 290
y: 400
inputs:
first_index: 0
last_index: 4
worker_count: 0
- id: mul
type: core/math-multiply@v1
position:
x: 600
y: 200
inputs:
inputs[0]: null
inputs[1]: 100
- id: stagger
type: core/sleep@v1
position:
x: 900
y: 300
inputs:
unit: milliseconds
- id: fmt
type: core/string-fmt@v1
position:
x: 600
y: 500
inputs:
substitutes[0]: null
fmt: IDX=%v
- id: env
type: core/env-array@v1
position:
x: 1000
y: 500
inputs:
env[0]: ''
- id: run
type: core/run@v1
position:
x: 1400
y: 300
inputs:
_disable_concurrency: true
shell: python
script: |-
import time, sys, os
idx = os.environ["IDX"]
sys.stdout.write("start-" + idx + "\n")
sys.stdout.flush()
time.sleep(0.5)
sys.stdout.write("end-" + idx + "\n")
sys.stdout.flush()
- id: done
type: core/run@v1
position:
x: 1400
y: 800
inputs:
shell: python
script: |-
import sys
sys.stdout.write("all done\n")
connections:
- src:
node: loop
port: index
dst:
node: mul
port: inputs[0]
- src:
node: mul
port: result
dst:
node: stagger
port: duration
- src:
node: loop
port: index
dst:
node: fmt
port: substitutes[0]
- src:
node: fmt
port: result
dst:
node: env
port: env[0]
- src:
node: env
port: env
dst:
node: run
port: env
executions:
- src:
node: start
port: exec
dst:
node: loop
port: exec
- src:
node: loop
port: exec-body
dst:
node: stagger
port: exec
- src:
node: stagger
port: exec
dst:
node: run
port: exec
- src:
node: loop
port: exec-completed
dst:
node: done
port: exec
10 changes: 10 additions & 0 deletions tests_e2e/scripts/concurrent_for_disable_concurrency.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
echo "Test Concurrent For Loop with Disable Concurrency"

TEST_NAME=concurrent_for_disable_concurrency
GRAPH_FILE="${ACT_GRAPH_FILES_DIR}${PATH_SEPARATOR}${TEST_NAME}.act"
cp $GRAPH_FILE $TEST_NAME.act
export ACT_GRAPH_FILE=$TEST_NAME.act
export ACT_LOGLEVEL=normal

#! test actrun

Loading
Loading