Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,55 @@ Example script and workflow are available in the [examples/polling-simple](examp

---

### `dispatch`

Fires only when another workflow (or the CLI) hands it a run via `otter dispatch`.
The workflow stays idle until a dispatch arrives; it never polls and never fires on
its own. This lets one workflow programmatically start another **with data**: a
payload string plus a set of files that pre-populate the run's `trigger-context/`
— the same directory a polling `context_command` would write.

**Fields:**
- `type` (required): `"dispatch"`

**Example:**
```toml
name = "on-demand-handler"
type = "triggered"

[trigger]
type = "dispatch"

[[steps]]
type = "shell"
command = ["cat", "trigger-context/summary.txt"]
```

**Usage:**
```bash
# Hand a run to a running dispatch-triggered workflow, with context files.
otter dispatch on-demand-handler \
--payload "change-42" \
--context-dir ./my-context-dir \
--context-file summary.txt=/tmp/summary.txt
```

**Behavior:**
- The target workflow must be **running** (started or enabled) — its engine
registers the dispatch inbox on start. Dispatching to a stopped workflow, or to a
workflow whose trigger is not `dispatch`, returns an error.
- `--context-dir <dir>` copies every regular file in `<dir>` into
`trigger-context/`. `--context-file <name>=<path>` adds a single file under
`<name>`. Both may be combined and `--context-file` repeated. Context file
contents must be valid UTF-8 — they are carried as text, so binary files are
rejected at dispatch time.
- `--payload <str>` is recorded as the run's trigger payload.
- Each dispatch fires exactly one run; dispatches arriving while a run is in progress
are queued (same as polling events).
- File names are restricted to plain names; path-traversal entries are skipped.

---

## Workflow management

A **workflow package** is a directory containing a `workflow.toml` plus any companion scripts used by the workflow's steps. Companion scripts in the package directory are automatically prepended to `PATH` when any step in that workflow runs.
Expand Down
16 changes: 16 additions & 0 deletions crates/otter-cli/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,22 @@ async fn handle_connection<S>(
}
let _ = write_json(&mut writer, &result_to_response(result)).await;
}
DaemonCommand::Dispatch {
workflow,
payload,
context_files,
} => {
info!(workflow = %workflow, "Dispatch workflow requested");
let result = manager
.lock()
.await
.dispatch(&workflow, payload, context_files)
.await;
if let Err(e) = &result {
warn!(workflow = %workflow, error = %e, "Dispatch workflow failed");
}
let _ = write_json(&mut writer, &result_to_response(result)).await;
}
DaemonCommand::Stop { name } => {
info!(workflow = %name, "Stop workflow requested");
let result = manager.lock().await.stop(&name).await;
Expand Down
64 changes: 64 additions & 0 deletions crates/otter-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ enum Commands {
Ui,
/// Start a dormant workflow
Start { name: String },
/// Hand a one-off run to a running `dispatch`-triggered workflow, passing a
/// payload and/or files to pre-populate its `trigger-context/`.
Dispatch {
/// Name of the target dispatch-triggered workflow
workflow: String,
/// Optional payload string, available to the run as the trigger payload
#[arg(long)]
payload: Option<String>,
/// A file to place in trigger-context/, as `name=path` (repeatable)
#[arg(long = "context-file", value_name = "NAME=PATH")]
context_file: Vec<String>,
/// A directory whose files are all copied into trigger-context/
#[arg(long = "context-dir", value_name = "DIR")]
context_dir: Option<String>,
},
/// Stop a running workflow
Stop { name: String },
/// Print the status of all registered workflows
Expand Down Expand Up @@ -209,6 +224,41 @@ enum TriggersCommands {
DeleteConsumed { workflow: String, trigger: String },
}

/// Build the `(filename, contents)` list for `otter dispatch` from any
/// `--context-file name=path` args plus every file in an optional `--context-dir`.
fn collect_context_files(
context_file: &[String],
context_dir: Option<&str>,
) -> anyhow::Result<Vec<(String, String)>> {
let mut files = Vec::new();

if let Some(dir) = context_dir {
for entry in std::fs::read_dir(dir)
.map_err(|e| anyhow::anyhow!("reading --context-dir {dir}: {e}"))?
{
let entry = entry?;
if !entry.file_type()?.is_file() {
continue;
}
let name = entry.file_name().to_string_lossy().into_owned();
let contents = std::fs::read_to_string(entry.path())
.map_err(|e| anyhow::anyhow!("reading {}: {e}", entry.path().display()))?;
files.push((name, contents));
}
}

for spec in context_file {
let (name, path) = spec
.split_once('=')
.ok_or_else(|| anyhow::anyhow!("--context-file must be NAME=PATH, got '{spec}'"))?;
let contents =
std::fs::read_to_string(path).map_err(|e| anyhow::anyhow!("reading {path}: {e}"))?;
files.push((name.to_string(), contents));
}

Ok(files)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
Expand Down Expand Up @@ -241,6 +291,20 @@ async fn main() -> anyhow::Result<()> {
Some(Commands::Start { name }) => {
client::send_command_print(DaemonCommand::Start { name }).await
}
Some(Commands::Dispatch {
workflow,
payload,
context_file,
context_dir,
}) => {
let context_files = collect_context_files(&context_file, context_dir.as_deref())?;
client::send_command_print(DaemonCommand::Dispatch {
workflow,
payload,
context_files,
})
.await
}
Some(Commands::Stop { name }) => {
client::send_command_print(DaemonCommand::Stop { name }).await
}
Expand Down
37 changes: 37 additions & 0 deletions crates/otter-core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Engine {
scripts_dir: Option<std::path::PathBuf>,
secret_store: Arc<dyn SecretStore>,
requirements: Option<Arc<Requirements>>,
dispatch_registry: Option<crate::triggers::DispatchRegistry>,
}

impl Engine {
Expand All @@ -45,6 +46,7 @@ impl Engine {
scripts_dir: None,
secret_store: Arc::new(NoOpSecretStore),
requirements: None,
dispatch_registry: None,
}
}

Expand All @@ -62,6 +64,7 @@ impl Engine {
scripts_dir,
secret_store: Arc::new(NoOpSecretStore),
requirements: None,
dispatch_registry: None,
}
}

Expand All @@ -80,6 +83,7 @@ impl Engine {
scripts_dir,
secret_store,
requirements: None,
dispatch_registry: None,
}
}

Expand All @@ -90,6 +94,16 @@ impl Engine {
self
}

/// Builder-style setter for the shared dispatch inbox registry. Required for
/// `dispatch`-triggered workflows so they can register and receive runs.
pub fn with_dispatch_registry(
mut self,
registry: Option<crate::triggers::DispatchRegistry>,
) -> Self {
self.dispatch_registry = registry;
self
}

pub fn with_executors(
storage: Arc<dyn StorageBackend>,
scratch_base: std::path::PathBuf,
Expand All @@ -103,6 +117,7 @@ impl Engine {
scripts_dir: None,
secret_store: Arc::new(NoOpSecretStore),
requirements: None,
dispatch_registry: None,
}
}

Expand Down Expand Up @@ -258,6 +273,7 @@ impl Engine {
self.scripts_dir.as_deref(),
self.secret_store.clone(),
self.requirements.clone(),
self.dispatch_registry.clone(),
)?;

let (trigger_tx, mut trigger_rx) = mpsc::channel::<TriggerEvent>(32);
Expand Down Expand Up @@ -467,6 +483,27 @@ impl Engine {
}
}

// Materialize inline context files (from a `dispatch` trigger) into
// trigger-context/ — the inline counterpart to a polling context command.
if let Some(files) = event.and_then(|e| e.inline_context.as_ref()) {
let ctx_dir = match &workspace_dir {
Some(ws) => ws.join("trigger-context"),
None => scratch_dir.join("trigger-context"),
};
std::fs::create_dir_all(&ctx_dir)?;
for (name, contents) in files {
// Guard against path traversal: only allow plain file names.
let safe = std::path::Path::new(name)
.file_name()
.map(|n| n == std::ffi::OsStr::new(name));
if safe != Some(true) {
warn!(run_id = %run.id, "skipping unsafe inline context file name {:?}", name);
continue;
}
std::fs::write(ctx_dir.join(name), contents)?;
}
}

let stop = if context_failed {
true
} else {
Expand Down
50 changes: 50 additions & 0 deletions crates/otter-core/src/engine_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,56 @@ async fn triggered_workflow_runs_once_per_event() {
shutdown.store(true, Ordering::Relaxed);
}

#[tokio::test]
async fn inline_context_is_written_to_trigger_context() {
// GIVEN a run seeded with an event carrying inline trigger-context files
let storage = Arc::new(InMemoryStorage::new());
let temp = tempfile::tempdir().unwrap();
let scratch = temp.path().to_path_buf();
let engine = Engine::new(
storage,
scratch.clone(),
Arc::new(otter_notify::NoOpNotifier),
);

let run_id = uuid::Uuid::new_v4();
let event = TriggerEvent {
source: "dispatch".to_string(),
payload: "ch-42".to_string(),
preallocated_run_id: Some(run_id),
pending_context: None,
inline_context: Some(vec![
("summary.txt".to_string(), "Change: 42".to_string()),
("otter_command.txt".to_string(), "review".to_string()),
// A traversal attempt must be ignored, not written.
("../escape.txt".to_string(), "nope".to_string()),
]),
};

let wf = workflow("inline-ctx", WorkflowType::Triggered, vec![]);
let shutdown = Arc::new(AtomicBool::new(false));

// WHEN the run executes
engine
.run_once(&wf, Some(&event), shutdown, None)
.await
.unwrap();

// THEN the inline files land in the run's trigger-context/ (scratch workspace)
let ctx = scratch.join(run_id.to_string()).join("trigger-context");
assert_eq!(
std::fs::read_to_string(ctx.join("summary.txt")).unwrap(),
"Change: 42"
);
assert_eq!(
std::fs::read_to_string(ctx.join("otter_command.txt")).unwrap(),
"review"
);
// AND the path-traversal entry was skipped: had the guard failed, joining
// "../escape.txt" onto trigger-context/ would have landed it in the run dir.
assert!(!scratch.join(run_id.to_string()).join("escape.txt").exists());
}

#[tokio::test]
async fn stop_prevents_next_iteration() {
// GIVEN a looping workflow with a fast shell step
Expand Down
Loading