diff --git a/.github/workflows/graphs/build-test-publish.act b/.github/workflows/graphs/build-test-publish.act
index 02ae836..3296bde 100644
--- a/.github/workflows/graphs/build-test-publish.act
+++ b/.github/workflows/graphs/build-test-publish.act
@@ -6021,6 +6021,7 @@ nodes:
       x: 13320
       y: 620
     inputs:
+      _disable_concurrency: true
       dependency-snapshot: 'true'
   - id: core-string-fmt-v1-lychee-coconut-gray
     type: core/string-fmt@v1
diff --git a/core/base.go b/core/base.go
index a0cd38a..6597d8c 100644
--- a/core/base.go
+++ b/core/base.go
@@ -169,6 +169,9 @@ type NodeBaseInterface interface {
 	IsExecutionNode() bool
 	SetExecutionNode(execNode bool)
 
+	DisableConcurrency() bool
+	SetDisableConcurrency(v bool)
+
 	// Returns the cache type where data is stored or should be stored to
 	// By default this depends on if this is an execution node or not.
 	GetCacheType() CacheType
@@ -183,9 +186,10 @@ type NodeBaseComponent struct {
 	FullPath string // Full path of the node within the graph hierarchy
 	CacheId  string // Unique identifier for the cache
 	NodeType string // Node type of the node (e.g. core/run@v1 or github.com/actions/checkout@v3)
-	Graph           *ActionGraph
-	Parent          NodeBaseInterface
-	isExecutionNode bool
+	Graph              *ActionGraph
+	Parent             NodeBaseInterface
+	isExecutionNode    bool
+	disableConcurrency bool
 }
 
 func (n *NodeBaseComponent) GetCacheType() CacheType {
@@ -204,6 +208,14 @@ func (n *NodeBaseComponent) SetExecutionNode(execNode bool) {
 	n.isExecutionNode = execNode
 }
 
+func (n *NodeBaseComponent) DisableConcurrency() bool {
+	return n.disableConcurrency
+}
+
+func (n *NodeBaseComponent) SetDisableConcurrency(v bool) {
+	n.disableConcurrency = v
+}
+
 func (n *NodeBaseComponent) SetId(id string) {
 	n.Id = id
 	n.CacheId = fmt.Sprintf("%s:%s", n.Id, uuid.New().String())
diff --git a/core/context.go b/core/context.go
index 5a649a2..7ec9734 100644
--- a/core/context.go
+++ b/core/context.go
@@ -169,6 +169,12 @@ type ExecutionState struct {
 
 	JobConclusion string        `json:"jobConclusion"`
 	DebugCallback DebugCallback `json:"-"`
+
+	// PendingConcurrencyLocks tracks concurrency locks that are held during
+	// ExecuteImpl. The key is the node ID → *sync.Mutex. A lock is released
+	// when the node calls Execute to dispatch a downstream node, or as a
+	// fallback when ExecuteImpl returns without dispatching anything.
+	PendingConcurrencyLocks *sync.Map `json:"-"`
 }
 
 type ExecutionStateOptions struct {
@@ -244,8 +250,9 @@ func (c *ExecutionState) PushNewExecutionState(parentNode NodeBaseInterface) *Ex
 		PostSteps:     c.PostSteps,
 		JobConclusion: c.JobConclusion,
 
-		Visited:       visited,
-		DebugCallback: c.DebugCallback,
+		Visited:                 visited,
+		DebugCallback:           c.DebugCallback,
+		PendingConcurrencyLocks: &sync.Map{},
 	}
 
 	return newEc
diff --git a/core/executions.go b/core/executions.go
index ca2b66a..12ff733 100644
--- a/core/executions.go
+++ b/core/executions.go
@@ -1,6 +1,10 @@
 package core
 
-import "github.com/actionforge/actrun-cli/utils"
+import (
+	"sync"
+
+	"github.com/actionforge/actrun-cli/utils"
+)
 
 type ExecutionSource struct {
 	SrcNode HasExecutionInterface
@@ -30,6 +34,16 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)
 
 	dest, hasDest := n.GetExecutionTarget(outputPort)
 
+	// Release any pending concurrency lock for the source node. When a node
+	// calls Execute on its own Executions to dispatch downstream, it means
+	// the node's own ExecuteImpl work is done, so we can release its lock
+	// and let the next concurrent caller continue.
+	if hasDest && dest.SrcNode != nil {
+		if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dest.SrcNode.GetId()); loaded {
+			lockVal.(*sync.Mutex).Unlock()
+		}
+	}
+
 	// Set the step conclusion for the SOURCE node based on which output port is being executed.
 	// This enables ${{ steps.X.conclusion }} syntax similar to GitHub Actions.
 	// The conclusion is set BEFORE downstream nodes execute so they can read it.
@@ -65,6 +79,26 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)
 		return nil
 	}
 
+	// If the destination node has _disable_concurrency set, serialize execution
+	// through a per-node-ID mutex to prevent concurrent ExecuteImpl calls.
+	// The lock is stored as pending and released when the node calls Execute
+	// to dispatch downstream (above), or as a fallback when ExecuteImpl
+	// returns without dispatching (below).
+	if dest.DstNode.DisableConcurrency() {
+		dcNodeId := dest.DstNode.GetId()
+		actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{})
+		mu := actual.(*sync.Mutex)
+		mu.Lock()
+		ec.PendingConcurrencyLocks.Store(dcNodeId, mu)
+		// Fallback: release if ExecuteImpl returns without calling Execute
+		// (end of chain, error, or panic).
+		defer func() {
+			if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded {
+				lockVal.(*sync.Mutex).Unlock()
+			}
+		}()
+	}
+
 	err = dest.DstNode.ExecuteImpl(ec, dest.Port, err)
 	if err != nil {
 		return err
diff --git a/core/graph.go b/core/graph.go
index 363248f..60b675f 100644
--- a/core/graph.go
+++ b/core/graph.go
@@ -40,6 +40,10 @@ type ActionGraph struct {
 
 	Outputs map[OutputId]OutputDefinition `yaml:"outputs" json:"outputs" bson:"outputs"`
 	Entry   string
+
+	// ConcurrencyLocks maps node ID → *sync.Mutex. Used to serialize concurrent
+	// calls to a node's ExecuteImpl when the node's _disable_concurrency input is true.
+	ConcurrencyLocks *sync.Map `yaml:"-" json:"-"`
 }
 
 func (ag *ActionGraph) AddNode(nodeId string, node NodeBaseInterface) {
@@ -78,7 +82,8 @@ func (ag *ActionGraph) GetEntry() (NodeEntryInterface, error) {
 
 func NewActionGraph() ActionGraph {
 	return ActionGraph{
-		Nodes: make(map[string]NodeBaseInterface),
+		Nodes:            make(map[string]NodeBaseInterface),
+		ConcurrencyLocks: &sync.Map{},
 	}
 }
 
@@ -245,10 +250,11 @@ func NewExecutionState(
 	ctx, cancel := context.WithCancel(ctx)
 
 	return &ExecutionState{
-		Graph:            graph,
-		Hierarchy:        make([]NodeBaseInterface, 0),
-		ContextStackLock: &sync.RWMutex{},
-		OutputCacheLock:  &sync.RWMutex{},
+		Graph:                   graph,
+		Hierarchy:               make([]NodeBaseInterface, 0),
+		ContextStackLock:        &sync.RWMutex{},
+		OutputCacheLock:         &sync.RWMutex{},
+		PendingConcurrencyLocks: &sync.Map{},
 
 		IsDebugSession: debugCb != nil,
 		DebugCallback:  debugCb,
@@ -815,6 +821,15 @@ func LoadInputValues(node NodeBaseInterface, nodeI map[string]any, validate bool
 
 	subInputs := map[string][]subInput{}
 
+	// _disable_concurrency is not a regular input; it's stored directly on
+	// the node instance, so we pull it out before processing the rest.
+	if v, ok := inputValues["_disable_concurrency"]; ok {
+		if b, ok := v.(bool); ok && b {
+			node.SetDisableConcurrency(true)
+		}
+		delete(inputValues, "_disable_concurrency")
+	}
+
 	for portId, inputValue := range inputValues {
 		groupInputId, portIndex, isIndexPort := IsValidIndexPortId(portId)
 		if isIndexPort {
diff --git a/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9 b/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9
new file mode 100644
index 0000000..58d8ca6
--- /dev/null
+++ b/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9
@@ -0,0 +1,11 @@
+start-0
+end-0
+start-1
+end-1
+start-2
+end-2
+start-3
+end-3
+start-4
+end-4
+all done
diff --git a/tests_e2e/scripts/concurrent_for_disable_concurrency.act b/tests_e2e/scripts/concurrent_for_disable_concurrency.act
new file mode 100644
index 0000000..8f8dc51
--- /dev/null
+++ b/tests_e2e/scripts/concurrent_for_disable_concurrency.act
@@ -0,0 +1,128 @@
+entry: start
+nodes:
+  - id: start
+    type: core/start@v1
+    position:
+      x: 10
+      y: 500
+  - id: loop
+    type: core/concurrent-for-loop@v1
+    position:
+      x: 290
+      y: 400
+    inputs:
+      first_index: 0
+      last_index: 4
+      worker_count: 0
+  - id: mul
+    type: core/math-multiply@v1
+    position:
+      x: 600
+      y: 200
+    inputs:
+      inputs[0]: null
+      inputs[1]: 100
+  - id: stagger
+    type: core/sleep@v1
+    position:
+      x: 900
+      y: 300
+    inputs:
+      unit: milliseconds
+  - id: fmt
+    type: core/string-fmt@v1
+    position:
+      x: 600
+      y: 500
+    inputs:
+      substitutes[0]: null
+      fmt: IDX=%v
+  - id: env
+    type: core/env-array@v1
+    position:
+      x: 1000
+      y: 500
+    inputs:
+      env[0]: ''
+  - id: run
+    type: core/run@v1
+    position:
+      x: 1400
+      y: 300
+    inputs:
+      _disable_concurrency: true
+      shell: python
+      script: |-
+        import time, sys, os
+        idx = os.environ["IDX"]
+        sys.stdout.write("start-" + idx + "\n")
+        sys.stdout.flush()
+        time.sleep(0.5)
+        sys.stdout.write("end-" + idx + "\n")
+        sys.stdout.flush()
+  - id: done
+    type: core/run@v1
+    position:
+      x: 1400
+      y: 800
+    inputs:
+      shell: python
+      script: |-
+        import sys
+        sys.stdout.write("all done\n")
+connections:
+  - src:
+      node: loop
+      port: index
+    dst:
+      node: mul
+      port: inputs[0]
+  - src:
+      node: mul
+      port: result
+    dst:
+      node: stagger
+      port: duration
+  - src:
+      node: loop
+      port: index
+    dst:
+      node: fmt
+      port: substitutes[0]
+  - src:
+      node: fmt
+      port: result
+    dst:
+      node: env
+      port: env[0]
+  - src:
+      node: env
+      port: env
+    dst:
+      node: run
+      port: env
+executions:
+  - src:
+      node: start
+      port: exec
+    dst:
+      node: loop
+      port: exec
+  - src:
+      node: loop
+      port: exec-body
+    dst:
+      node: stagger
+      port: exec
+  - src:
+      node: stagger
+      port: exec
+    dst:
+      node: run
+      port: exec
+  - src:
+      node: loop
+      port: exec-completed
+    dst:
+      node: done
+      port: exec
diff --git a/tests_e2e/scripts/concurrent_for_disable_concurrency.sh b/tests_e2e/scripts/concurrent_for_disable_concurrency.sh
new file mode 100644
index 0000000..279fb4f
--- /dev/null
+++ b/tests_e2e/scripts/concurrent_for_disable_concurrency.sh
@@ -0,0 +1,10 @@
+echo "Test Concurrent For Loop with Disable Concurrency"
+
+TEST_NAME=concurrent_for_disable_concurrency
+GRAPH_FILE="${ACT_GRAPH_FILES_DIR}${PATH_SEPARATOR}${TEST_NAME}.act"
+cp $GRAPH_FILE $TEST_NAME.act
+export ACT_GRAPH_FILE=$TEST_NAME.act
+export ACT_LOGLEVEL=normal
+
+#! test actrun
+
diff --git a/tests_unit/disable_concurrency_input_test.go b/tests_unit/disable_concurrency_input_test.go
new file mode 100644
index 0000000..a4b0bae
--- /dev/null
+++ b/tests_unit/disable_concurrency_input_test.go
@@ -0,0 +1,83 @@
+//go:build tests_unit
+
+package tests_unit
+
+import (
+	"testing"
+
+	"github.com/actionforge/actrun-cli/core"
+	"go.yaml.in/yaml/v4"
+
+	// initialize all nodes
+	_ "github.com/actionforge/actrun-cli/nodes"
+)
+
+func loadTestGraph(t *testing.T, graphStr string) core.ActionGraph {
+	t.Helper()
+	var graphYaml map[string]any
+	if err := yaml.Unmarshal([]byte(graphStr), &graphYaml); err != nil {
+		t.Fatalf("unmarshal YAML: %v", err)
+	}
+	ag, errs := core.LoadGraph(graphYaml, nil, "", false, core.RunOpts{})
+	if len(errs) > 0 {
+		t.Fatalf("LoadGraph: %v", errs[0])
+	}
+	return ag
+}
+
+func TestDisableConcurrencyNotSetByDefault(t *testing.T) {
+	ag := loadTestGraph(t, `
+entry: start
+nodes:
+  - id: start
+    type: core/start@v1
+    position: {x: 0, y: 0}
+  - id: run1
+    type: core/run@v1
+    position: {x: 100, y: 0}
+    inputs:
+      shell: bash
+      script: echo hello
+connections: []
+executions:
+  - src: {node: start, port: exec}
+    dst: {node: run1, port: exec}
+`)
+
+	node := ag.Nodes["run1"]
+	if node == nil {
+		t.Fatal("node run1 not found")
+	}
+	if node.DisableConcurrency() {
+		t.Error("expected DisableConcurrency to be false by default")
+	}
+}
+
+func TestDisableConcurrencySetFromYaml(t *testing.T) {
+	ag := loadTestGraph(t, `
+entry: start
+nodes:
+  - id: start
+    type: core/start@v1
+    position: {x: 0, y: 0}
+  - id: run1
+    type: core/run@v1
+    position: {x: 100, y: 0}
+    inputs:
+      _disable_concurrency: true
+      shell: bash
+      script: echo hello
+connections: []
+executions:
+  - src: {node: start, port: exec}
+    dst: {node: run1, port: exec}
+`)
+
+	node := ag.Nodes["run1"]
+	if node == nil {
+		t.Fatal("node run1 not found")
+	}
+	if !node.DisableConcurrency() {
+		t.Error("expected DisableConcurrency to be true when set in YAML")
+	}
+}
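
A minimal standalone sketch (not part of the patch) of the serialization pattern the executions.go change relies on: a per-node-ID mutex obtained via sync.Map.LoadOrStore, so at most one execution runs per node at a time. The helper name runSerialized and the main harness are hypothetical; the real implementation additionally parks the lock in PendingConcurrencyLocks and releases it when the node dispatches its downstream node.

package main

import (
	"fmt"
	"sync"
)

// concurrencyLocks maps a node ID to its *sync.Mutex, loosely mirroring
// ActionGraph.ConcurrencyLocks in the patch above.
var concurrencyLocks sync.Map

// runSerialized runs fn for nodeId, allowing at most one execution of that
// node at a time (hypothetical helper, for illustration only).
func runSerialized(nodeId string, fn func()) {
	actual, _ := concurrencyLocks.LoadOrStore(nodeId, &sync.Mutex{})
	mu := actual.(*sync.Mutex)
	mu.Lock()
	defer mu.Unlock()
	fn()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			runSerialized("run", func() {
				// With the lock held, start/end pairs never interleave,
				// which is the property the e2e reference output checks.
				fmt.Printf("start-%d\n", i)
				fmt.Printf("end-%d\n", i)
			})
		}(i)
	}
	wg.Wait()
	fmt.Println("all done")
}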