From d03e43faac413e8baae6915bd739098d4d9b11ec Mon Sep 17 00:00:00 2001 From: Sebastian Rath Date: Wed, 18 Feb 2026 21:44:03 -0500 Subject: [PATCH 1/4] Add _disable_concurrency input to serialize concurrent node execution --- core/base.go | 15 +++++++++++++++ core/context.go | 11 +++++++++-- core/executions.go | 40 +++++++++++++++++++++++++++++++++++++++- core/graph.go | 16 +++++++++++----- 4 files changed, 74 insertions(+), 8 deletions(-) diff --git a/core/base.go b/core/base.go index a0cd38a..9b86e2f 100644 --- a/core/base.go +++ b/core/base.go @@ -562,6 +562,21 @@ func RegisterNodeFactory(nodeDefStr string, fn nodeFactoryFunc) error { nodeDef.Outputs[outputId] = outputDef } + // inject the hidden _disable_concurrency input for execution nodes. + // When set to true at runtime, concurrent calls to this nodes + // ExecuteImpl are serialized via a per-node-ID mutex + if countExec > 0 { + nodeDef.Inputs[InputId("_disable_concurrency")] = InputDefinition{ + PortDefinition: PortDefinition{ + Name: "Disable Concurrency", + Type: "bool", + Index: 999999, + }, + HideSocket: true, + Default: false, + } + } + id := fmt.Sprintf("%v@v%v", nodeDef.Id, nodeDef.Version) _, ok := registries[id] if ok { diff --git a/core/context.go b/core/context.go index 5a649a2..7ec9734 100644 --- a/core/context.go +++ b/core/context.go @@ -169,6 +169,12 @@ type ExecutionState struct { JobConclusion string `json:"jobConclusion"` DebugCallback DebugCallback `json:"-"` + + // PendingConcurrencyLocks tracks concurrency locks that are held during + // ExecuteImpl. The key is node id → *sync.Mutex. Released when the + // node calls Execute to dispatch downstream node, or as a fallback when + // ExecuteImpl returns without any dispatching + PendingConcurrencyLocks *sync.Map `json:"-"` } type ExecutionStateOptions struct { @@ -244,8 +250,9 @@ func (c *ExecutionState) PushNewExecutionState(parentNode NodeBaseInterface) *Ex PostSteps: c.PostSteps, JobConclusion: c.JobConclusion, - Visited: visited, - DebugCallback: c.DebugCallback, + Visited: visited, + DebugCallback: c.DebugCallback, + PendingConcurrencyLocks: &sync.Map{}, } return newEc diff --git a/core/executions.go b/core/executions.go index ca2b66a..b074420 100644 --- a/core/executions.go +++ b/core/executions.go @@ -1,6 +1,10 @@ package core -import "github.com/actionforge/actrun-cli/utils" +import ( + "sync" + + "github.com/actionforge/actrun-cli/utils" +) type ExecutionSource struct { SrcNode HasExecutionInterface @@ -30,6 +34,16 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error) dest, hasDest := n.GetExecutionTarget(outputPort) + // Release any pending concurrency lock for the source node. When a node + // calls Execute on its own Executions to dispatch downstream, it means + // the nodes own ExecuteImpl work is done so we can release its lock + // and let the next concurrent caller continue + if hasDest && dest.SrcNode != nil { + if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dest.SrcNode.GetId()); loaded { + lockVal.(*sync.Mutex).Unlock() + } + } + // Set the step conclusion for the SOURCE node based on which output port is being executed. // This enables ${{ steps.X.conclusion }} syntax similar to GitHub Actions. // The conclusion is set BEFORE downstream nodes execute so they can read it. @@ -65,6 +79,30 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error) return nil } + // If the destination node has _disable_concurrency set to true, serialize execution + // through a per-node-ID mutex to prevent concurrent ExecuteImpl calls. + // The lock is stored as pending and released when the node calls Execute + // to dispatch downstream (above), or as a fallback when ExecuteImpl + // returns without dispatching (below). + var dcNodeId string + if nwi, ok := dest.DstNode.(NodeWithInputs); ok { + dcVal, dcErr := InputValueById[bool](ec, nwi, InputId("_disable_concurrency")) + if dcErr == nil && dcVal { + dcNodeId = dest.DstNode.GetId() + actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{}) + mu := actual.(*sync.Mutex) + mu.Lock() + ec.PendingConcurrencyLocks.Store(dcNodeId, mu) + // Fallback: release if ExecuteImpl returns without calling Execute + // (end of chain, error, or panic). + defer func() { + if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded { + lockVal.(*sync.Mutex).Unlock() + } + }() + } + } + err = dest.DstNode.ExecuteImpl(ec, dest.Port, err) if err != nil { return err diff --git a/core/graph.go b/core/graph.go index 363248f..d09addf 100644 --- a/core/graph.go +++ b/core/graph.go @@ -40,6 +40,10 @@ type ActionGraph struct { Outputs map[OutputId]OutputDefinition `yaml:"outputs" json:"outputs" bson:"outputs"` Entry string + + // ConcurrencyLocks maps node ID → *sync.Mutex. Used to serialize concurrent + // calls to a node's ExecuteImpl when the node's _disable_concurrency input is true. + ConcurrencyLocks *sync.Map `yaml:"-" json:"-"` } func (ag *ActionGraph) AddNode(nodeId string, node NodeBaseInterface) { @@ -78,7 +82,8 @@ func (ag *ActionGraph) GetEntry() (NodeEntryInterface, error) { func NewActionGraph() ActionGraph { return ActionGraph{ - Nodes: make(map[string]NodeBaseInterface), + Nodes: make(map[string]NodeBaseInterface), + ConcurrencyLocks: &sync.Map{}, } } @@ -245,10 +250,11 @@ func NewExecutionState( ctx, cancel := context.WithCancel(ctx) return &ExecutionState{ - Graph: graph, - Hierarchy: make([]NodeBaseInterface, 0), - ContextStackLock: &sync.RWMutex{}, - OutputCacheLock: &sync.RWMutex{}, + Graph: graph, + Hierarchy: make([]NodeBaseInterface, 0), + ContextStackLock: &sync.RWMutex{}, + OutputCacheLock: &sync.RWMutex{}, + PendingConcurrencyLocks: &sync.Map{}, IsDebugSession: debugCb != nil, DebugCallback: debugCb, From c9b16267365f74fccf5f8a15684725c9dd84e178 Mon Sep 17 00:00:00 2001 From: Sebastian Rath Date: Wed, 18 Feb 2026 21:44:09 -0500 Subject: [PATCH 2/4] Add unit and e2e tests for _disable_concurrency --- ...e_concurrent_for_disable_concurrency.sh_l9 | 11 ++ .../concurrent_for_disable_concurrency.act | 128 ++++++++++++++++++ .../concurrent_for_disable_concurrency.sh | 10 ++ tests_unit/disable_concurrency_input_test.go | 52 +++++++ 4 files changed, 201 insertions(+) create mode 100644 tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9 create mode 100644 tests_e2e/scripts/concurrent_for_disable_concurrency.act create mode 100644 tests_e2e/scripts/concurrent_for_disable_concurrency.sh create mode 100644 tests_unit/disable_concurrency_input_test.go diff --git a/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9 b/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9 new file mode 100644 index 0000000..58d8ca6 --- /dev/null +++ b/tests_e2e/references/reference_concurrent_for_disable_concurrency.sh_l9 @@ -0,0 +1,11 @@ +start-0 +end-0 +start-1 +end-1 +start-2 +end-2 +start-3 +end-3 +start-4 +end-4 +all done diff --git a/tests_e2e/scripts/concurrent_for_disable_concurrency.act b/tests_e2e/scripts/concurrent_for_disable_concurrency.act new file mode 100644 index 0000000..8f8dc51 --- /dev/null +++ b/tests_e2e/scripts/concurrent_for_disable_concurrency.act @@ -0,0 +1,128 @@ +entry: start +nodes: + - id: start + type: core/start@v1 + position: + x: 10 + y: 500 + - id: loop + type: core/concurrent-for-loop@v1 + position: + x: 290 + y: 400 + inputs: + first_index: 0 + last_index: 4 + worker_count: 0 + - id: mul + type: core/math-multiply@v1 + position: + x: 600 + y: 200 + inputs: + inputs[0]: null + inputs[1]: 100 + - id: stagger + type: core/sleep@v1 + position: + x: 900 + y: 300 + inputs: + unit: milliseconds + - id: fmt + type: core/string-fmt@v1 + position: + x: 600 + y: 500 + inputs: + substitutes[0]: null + fmt: IDX=%v + - id: env + type: core/env-array@v1 + position: + x: 1000 + y: 500 + inputs: + env[0]: '' + - id: run + type: core/run@v1 + position: + x: 1400 + y: 300 + inputs: + _disable_concurrency: true + shell: python + script: |- + import time, sys, os + idx = os.environ["IDX"] + sys.stdout.write("start-" + idx + "\n") + sys.stdout.flush() + time.sleep(0.5) + sys.stdout.write("end-" + idx + "\n") + sys.stdout.flush() + - id: done + type: core/run@v1 + position: + x: 1400 + y: 800 + inputs: + shell: python + script: |- + import sys + sys.stdout.write("all done\n") +connections: + - src: + node: loop + port: index + dst: + node: mul + port: inputs[0] + - src: + node: mul + port: result + dst: + node: stagger + port: duration + - src: + node: loop + port: index + dst: + node: fmt + port: substitutes[0] + - src: + node: fmt + port: result + dst: + node: env + port: env[0] + - src: + node: env + port: env + dst: + node: run + port: env +executions: + - src: + node: start + port: exec + dst: + node: loop + port: exec + - src: + node: loop + port: exec-body + dst: + node: stagger + port: exec + - src: + node: stagger + port: exec + dst: + node: run + port: exec + - src: + node: loop + port: exec-completed + dst: + node: done + port: exec diff --git a/tests_e2e/scripts/concurrent_for_disable_concurrency.sh b/tests_e2e/scripts/concurrent_for_disable_concurrency.sh new file mode 100644 index 0000000..279fb4f --- /dev/null +++ b/tests_e2e/scripts/concurrent_for_disable_concurrency.sh @@ -0,0 +1,10 @@ +echo "Test Concurrent For Loop with Disable Concurrency" + +TEST_NAME=concurrent_for_disable_concurrency +GRAPH_FILE="${ACT_GRAPH_FILES_DIR}${PATH_SEPARATOR}${TEST_NAME}.act" +cp $GRAPH_FILE $TEST_NAME.act +export ACT_GRAPH_FILE=$TEST_NAME.act +export ACT_LOGLEVEL=normal + +#! test actrun + diff --git a/tests_unit/disable_concurrency_input_test.go b/tests_unit/disable_concurrency_input_test.go new file mode 100644 index 0000000..5c04ff6 --- /dev/null +++ b/tests_unit/disable_concurrency_input_test.go @@ -0,0 +1,52 @@ +//go:build tests_unit + +package tests_unit + +import ( + "testing" + + "github.com/actionforge/actrun-cli/core" + + // initialize all nodes + _ "github.com/actionforge/actrun-cli/nodes" +) + +func TestDisableConcurrencyInputInjection(t *testing.T) { + registries := core.GetRegistries() + if len(registries) == 0 { + t.Fatal("no node types registered; node factories not loaded") + } + + for id, nodeDef := range registries { + // Count execution inputs to determine if this is an execution node. + hasExecInput := false + for _, inputDef := range nodeDef.Inputs { + if inputDef.Exec { + hasExecInput = true + break + } + } + + dcDef, hasDC := nodeDef.Inputs[core.InputId("_disable_concurrency")] + + if hasExecInput { + if !hasDC { + t.Errorf("node %s has execution inputs but is missing _disable_concurrency input", id) + continue + } + if dcDef.Type != "bool" { + t.Errorf("node %s: _disable_concurrency type = %q, want \"bool\"", id, dcDef.Type) + } + if dcDef.Default != false { + t.Errorf("node %s: _disable_concurrency default = %v, want false", id, dcDef.Default) + } + if !dcDef.HideSocket { + t.Errorf("node %s: _disable_concurrency HideSocket = false, want true", id) + } + } else { + if hasDC { + t.Errorf("node %s has no execution inputs but has _disable_concurrency input", id) + } + } + } +} From 42dfac511877d899990336d09af4c12d4db47ce6 Mon Sep 17 00:00:00 2001 From: Sebastian Rath Date: Wed, 18 Feb 2026 21:44:15 -0500 Subject: [PATCH 3/4] Use _disable_concurrency in build workflow --- .github/workflows/graphs/build-test-publish.act | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/graphs/build-test-publish.act b/.github/workflows/graphs/build-test-publish.act index 02ae836..3296bde 100644 --- a/.github/workflows/graphs/build-test-publish.act +++ b/.github/workflows/graphs/build-test-publish.act @@ -6021,6 +6021,7 @@ nodes: x: 13320 y: 620 inputs: + _disable_concurrency: true dependency-snapshot: 'true' - id: core-string-fmt-v1-lychee-coconut-gray type: core/string-fmt@v1 From d48e921504b666b0836d413cf7c74bf744bac035 Mon Sep 17 00:00:00 2001 From: Sebastian Rath Date: Wed, 18 Feb 2026 21:50:06 -0500 Subject: [PATCH 4/4] Move _disable_concurrency to static field to avoid unnecessary input lookups --- core/base.go | 33 +++--- core/executions.go | 32 +++--- core/graph.go | 19 +++- tests_unit/disable_concurrency_input_test.go | 101 ++++++++++++------- 4 files changed, 109 insertions(+), 76 deletions(-) diff --git a/core/base.go b/core/base.go index 9b86e2f..6597d8c 100644 --- a/core/base.go +++ b/core/base.go @@ -169,6 +169,9 @@ type NodeBaseInterface interface { IsExecutionNode() bool SetExecutionNode(execNode bool) + DisableConcurrency() bool + SetDisableConcurrency(v bool) + // Returns the cache type where data is stored or should be stored to // By default this depends on if this is an execution node or not. GetCacheType() CacheType @@ -183,9 +186,10 @@ type NodeBaseComponent struct { FullPath string // Full path of the node within the graph hierarchy CacheId string // Unique identifier for the cache NodeType string // Node type of the node (e.g. core/run@v1 or github.com/actions/checkout@v3) - Graph *ActionGraph - Parent NodeBaseInterface - isExecutionNode bool + Graph *ActionGraph + Parent NodeBaseInterface + isExecutionNode bool + disableConcurrency bool } func (n *NodeBaseComponent) GetCacheType() CacheType { @@ -204,6 +208,14 @@ func (n *NodeBaseComponent) SetExecutionNode(execNode bool) { n.isExecutionNode = execNode } +func (n *NodeBaseComponent) DisableConcurrency() bool { + return n.disableConcurrency +} + +func (n *NodeBaseComponent) SetDisableConcurrency(v bool) { + n.disableConcurrency = v +} + func (n *NodeBaseComponent) SetId(id string) { n.Id = id n.CacheId = fmt.Sprintf("%s:%s", n.Id, uuid.New().String()) @@ -562,21 +574,6 @@ func RegisterNodeFactory(nodeDefStr string, fn nodeFactoryFunc) error { nodeDef.Outputs[outputId] = outputDef } - // inject the hidden _disable_concurrency input for execution nodes. - // When set to true at runtime, concurrent calls to this nodes - // ExecuteImpl are serialized via a per-node-ID mutex - if countExec > 0 { - nodeDef.Inputs[InputId("_disable_concurrency")] = InputDefinition{ - PortDefinition: PortDefinition{ - Name: "Disable Concurrency", - Type: "bool", - Index: 999999, - }, - HideSocket: true, - Default: false, - } - } - id := fmt.Sprintf("%v@v%v", nodeDef.Id, nodeDef.Version) _, ok := registries[id] if ok { diff --git a/core/executions.go b/core/executions.go index b074420..12ff733 100644 --- a/core/executions.go +++ b/core/executions.go @@ -79,28 +79,24 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error) return nil } - // If the destination node has _disable_concurrency set to true, serialize execution + // If the destination node has _disable_concurrency set, serialize execution // through a per-node-ID mutex to prevent concurrent ExecuteImpl calls. // The lock is stored as pending and released when the node calls Execute // to dispatch downstream (above), or as a fallback when ExecuteImpl // returns without dispatching (below). - var dcNodeId string - if nwi, ok := dest.DstNode.(NodeWithInputs); ok { - dcVal, dcErr := InputValueById[bool](ec, nwi, InputId("_disable_concurrency")) - if dcErr == nil && dcVal { - dcNodeId = dest.DstNode.GetId() - actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{}) - mu := actual.(*sync.Mutex) - mu.Lock() - ec.PendingConcurrencyLocks.Store(dcNodeId, mu) - // Fallback: release if ExecuteImpl returns without calling Execute - // (end of chain, error, or panic). - defer func() { - if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded { - lockVal.(*sync.Mutex).Unlock() - } - }() - } + if dest.DstNode.DisableConcurrency() { + dcNodeId := dest.DstNode.GetId() + actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{}) + mu := actual.(*sync.Mutex) + mu.Lock() + ec.PendingConcurrencyLocks.Store(dcNodeId, mu) + // Fallback: release if ExecuteImpl returns without calling Execute + // (end of chain, error, or panic). + defer func() { + if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded { + lockVal.(*sync.Mutex).Unlock() + } + }() } err = dest.DstNode.ExecuteImpl(ec, dest.Port, err) diff --git a/core/graph.go b/core/graph.go index d09addf..60b675f 100644 --- a/core/graph.go +++ b/core/graph.go @@ -82,7 +82,7 @@ func (ag *ActionGraph) GetEntry() (NodeEntryInterface, error) { func NewActionGraph() ActionGraph { return ActionGraph{ - Nodes: make(map[string]NodeBaseInterface), + Nodes: make(map[string]NodeBaseInterface), ConcurrencyLocks: &sync.Map{}, } } @@ -250,10 +250,10 @@ func NewExecutionState( ctx, cancel := context.WithCancel(ctx) return &ExecutionState{ - Graph: graph, - Hierarchy: make([]NodeBaseInterface, 0), - ContextStackLock: &sync.RWMutex{}, - OutputCacheLock: &sync.RWMutex{}, + Graph: graph, + Hierarchy: make([]NodeBaseInterface, 0), + ContextStackLock: &sync.RWMutex{}, + OutputCacheLock: &sync.RWMutex{}, PendingConcurrencyLocks: &sync.Map{}, IsDebugSession: debugCb != nil, @@ -821,6 +821,15 @@ func LoadInputValues(node NodeBaseInterface, nodeI map[string]any, validate bool subInputs := map[string][]subInput{} + // _disable_concurrency is not a regular input, its stored directly on + // the node instance so we pull it out before processing the rest. + if v, ok := inputValues["_disable_concurrency"]; ok { + if b, ok := v.(bool); ok && b { + node.SetDisableConcurrency(true) + } + delete(inputValues, "_disable_concurrency") + } + for portId, inputValue := range inputValues { groupInputId, portIndex, isIndexPort := IsValidIndexPortId(portId) if isIndexPort { diff --git a/tests_unit/disable_concurrency_input_test.go b/tests_unit/disable_concurrency_input_test.go index 5c04ff6..a4b0bae 100644 --- a/tests_unit/disable_concurrency_input_test.go +++ b/tests_unit/disable_concurrency_input_test.go @@ -6,47 +6,78 @@ import ( "testing" "github.com/actionforge/actrun-cli/core" + "go.yaml.in/yaml/v4" // initialize all nodes _ "github.com/actionforge/actrun-cli/nodes" ) -func TestDisableConcurrencyInputInjection(t *testing.T) { - registries := core.GetRegistries() - if len(registries) == 0 { - t.Fatal("no node types registered; node factories not loaded") +func loadTestGraph(t *testing.T, graphStr string) core.ActionGraph { + t.Helper() + var graphYaml map[string]any + if err := yaml.Unmarshal([]byte(graphStr), &graphYaml); err != nil { + t.Fatalf("unmarshal YAML: %v", err) } + ag, errs := core.LoadGraph(graphYaml, nil, "", false, core.RunOpts{}) + if len(errs) > 0 { + t.Fatalf("LoadGraph: %v", errs[0]) + } + return ag +} + +func TestDisableConcurrencyNotSetByDefault(t *testing.T) { + ag := loadTestGraph(t, ` +entry: start +nodes: + - id: start + type: core/start@v1 + position: {x: 0, y: 0} + - id: run1 + type: core/run@v1 + position: {x: 100, y: 0} + inputs: + shell: bash + script: echo hello +connections: [] +executions: + - src: {node: start, port: exec} + dst: {node: run1, port: exec} +`) - for id, nodeDef := range registries { - // Count execution inputs to determine if this is an execution node. - hasExecInput := false - for _, inputDef := range nodeDef.Inputs { - if inputDef.Exec { - hasExecInput = true - break - } - } - - dcDef, hasDC := nodeDef.Inputs[core.InputId("_disable_concurrency")] - - if hasExecInput { - if !hasDC { - t.Errorf("node %s has execution inputs but is missing _disable_concurrency input", id) - continue - } - if dcDef.Type != "bool" { - t.Errorf("node %s: _disable_concurrency type = %q, want \"bool\"", id, dcDef.Type) - } - if dcDef.Default != false { - t.Errorf("node %s: _disable_concurrency default = %v, want false", id, dcDef.Default) - } - if !dcDef.HideSocket { - t.Errorf("node %s: _disable_concurrency HideSocket = false, want true", id) - } - } else { - if hasDC { - t.Errorf("node %s has no execution inputs but has _disable_concurrency input", id) - } - } + node := ag.Nodes["run1"] + if node == nil { + t.Fatal("node run1 not found") + } + if node.DisableConcurrency() { + t.Error("expected DisableConcurrency to be false by default") + } +} + +func TestDisableConcurrencySetFromYaml(t *testing.T) { + ag := loadTestGraph(t, ` +entry: start +nodes: + - id: start + type: core/start@v1 + position: {x: 0, y: 0} + - id: run1 + type: core/run@v1 + position: {x: 100, y: 0} + inputs: + _disable_concurrency: true + shell: bash + script: echo hello +connections: [] +executions: + - src: {node: start, port: exec} + dst: {node: run1, port: exec} +`) + + node := ag.Nodes["run1"] + if node == nil { + t.Fatal("node run1 not found") + } + if !node.DisableConcurrency() { + t.Error("expected DisableConcurrency to be true when set in YAML") } }