-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.go
More file actions
125 lines (105 loc) · 2.72 KB
/
engine.go
File metadata and controls
125 lines (105 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package rely
import (
"context"
"encoding/json"
"fmt"
"github.com/exeebit/rely/journal"
)
// WorkflowFunc is the function signature for a workflow.
type WorkflowFunc func(ctx Context, input ...interface{}) error
// Engine manages the durable execution.
type Engine struct {
journal journal.Journal
workflows map[string]WorkflowFunc
}
// New creates a new Engine with the given journal backend.
func New(j journal.Journal) *Engine {
return &Engine{
journal: j,
workflows: make(map[string]WorkflowFunc),
}
}
// Define registers a workflow definition.
func (e *Engine) Define(name string, fn WorkflowFunc) *Workflow {
e.workflows[name] = fn
return &Workflow{
name: name,
engine: e,
}
}
type Workflow struct {
name string
engine *Engine
}
// Execute starts or follows a workflow execution.
func (w *Workflow) Execute(ctx context.Context, args ...interface{}) error {
// 1. Replay History to build current state
events, err := w.engine.journal.Read()
if err != nil {
return fmt.Errorf("failed to read journal: %w", err)
}
// 2. Create the Replay Context
rctx := &replayContext{
Context: ctx,
journal: w.engine.journal,
history: make(map[string]*journal.Event),
workflowName: w.name,
}
// Index history by step name
for i := range events {
ev := events[i]
if ev.Type == journal.EventStepCompleted {
rctx.history[ev.StepName] = &ev
}
}
// 3. Log Workflow Started (omitted for MVP)
// 4. Run the Workflow Function
fn, ok := w.engine.workflows[w.name]
if !ok {
return fmt.Errorf("workflow %s not found", w.name)
}
return fn(rctx, args...)
}
// replayContext implements Context
type replayContext struct {
context.Context
journal journal.Journal
history map[string]*journal.Event
workflowName string
}
func (c *replayContext) Step(name string, fn func() (interface{}, error), opts ...StepOption) StepResult {
// 1. Check if step already completed
if event, ok := c.history[name]; ok {
// Replay: Return cached result
return &stepResult{
payload: event.Payload,
err: nil,
}
}
// 2. Not found? Execute it.
val, err := fn()
// 3. Handle Failure
if err != nil {
return &stepResult{err: err}
}
// 4. Handle Success: Serialize
payload, err := json.Marshal(val)
if err != nil {
return &stepResult{err: fmt.Errorf("failed to marshal step result: %w", err)}
}
// 5. Persist to Journal
je := journal.Event{
Type: journal.EventStepCompleted,
Workflow: c.workflowName,
StepName: name,
Payload: payload,
Timestamp: 0, // Should be time.Now().Unix()
}
if err := c.journal.Append(je); err != nil {
return &stepResult{err: fmt.Errorf("failed to write to journal: %w", err)}
}
return &stepResult{
payload: payload,
err: nil,
}
}