diff --git a/.github/workflows/session-e2e.yaml b/.github/workflows/session-e2e.yaml new file mode 100644 index 00000000..f6eb4df1 --- /dev/null +++ b/.github/workflows/session-e2e.yaml @@ -0,0 +1,78 @@ +name: Session E2E + +# Drives the real `lk agent session` start/say/end lifecycle against the minimal +# one-file echo agent in cmd/lk/testdata/echo-agent, on Linux and Windows. This +# exercises the detached daemon, the readiness handshake, the console IPC +# transport, and the model round-trip end to end -- runtime behavior that +# `go test` alone never covers. +# +# Runs on manual dispatch and on pushes to any repo branch (forks can't trigger +# `push`, so secrets are only exposed to trusted collaborators). It needs live +# LiveKit credentials -- set these repo secrets first: LIVEKIT_API_KEY, +# LIVEKIT_API_SECRET, LIVEKIT_URL. The echo agent drives its LLM through LiveKit +# Inference, so no other provider keys are needed. +# +# The echo agent depends on plain PyPI livekit-agents (synced by `uv sync` from +# its pyproject.toml). Note: on current releases/main, `cli.run_app()` routes +# `console` through the legacy click CLI, which has no --connect-addr (that +# lives behind `python -m livekit.agents`). The fixture's __main__ dispatches +# console mode to the TCP console directly to bridge the daemon's +# `python console --connect-addr` launch. +# +# Node is intentionally not in the matrix yet: this branch's session daemon only +# supports Python agents (`detectProject` rejects non-Python), and Node console +# support depends on the brian/agent-session-node-support CLI line (#868/#878) +# plus agents-js #1804. Add a node arm once those land. 
+ +on: + workflow_dispatch: + push: + branches: ['**'] + +concurrency: + group: session-e2e-${{ github.ref }} + cancel-in-progress: true + +jobs: + e2e: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest] + + runs-on: ${{ matrix.os }} + name: python on ${{ matrix.os }} + + permissions: + contents: read + + steps: + - name: Checkout livekit-cli + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + + - name: Set up Go + uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6 + with: + go-version-file: go.mod + cache: true + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Set up uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + + - name: Sync echo agent deps + working-directory: cmd/lk/testdata/echo-agent + run: uv sync + + - name: Run session e2e + env: + LIVEKIT_API_KEY: ${{ secrets.LIVEKIT_API_KEY }} + LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }} + LIVEKIT_URL: ${{ secrets.LIVEKIT_URL }} + run: go test ./cmd/lk -run TestSessionE2E -count=1 -v -timeout 600s diff --git a/.gitignore b/.gitignore index fa396b82..9901cd1c 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,13 @@ dist/ .DS_Store /lk + +# local secrets copied for e2e testing +.env + +# python venvs created for e2e agent fixtures +.venv/ +cmd/lk/testdata/**/.venv + +# uv lockfiles for test agent fixtures (resolved fresh in CI) +cmd/lk/testdata/**/uv.lock diff --git a/cmd/lk/session.go b/cmd/lk/session.go index 0b964f1d..4ecb43ee 100644 --- a/cmd/lk/session.go +++ b/cmd/lk/session.go @@ -15,7 +15,6 @@ package main import ( - "bufio" "context" "encoding/binary" "encoding/json" @@ -26,6 +25,7 @@ import ( "os/exec" "strconv" "strings" + "time" "github.com/urfave/cli/v3" ) @@ -38,11 +38,11 @@ const ( sessionHost = "127.0.0.1" defaultSessionPort = 8775 - envSessionPort = "LK_SESSION_PORT" // fixed port - envSessionDir = "LK_SESSION_DIR" // resolved project dir - envSessionEntry 
= "LK_SESSION_ENTRY" // resolved entrypoint (project-relative) - envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string - envSessionReadyFD = "LK_SESSION_READY_FD" + envSessionPort = "LK_SESSION_PORT" // fixed port + envSessionDir = "LK_SESSION_DIR" // resolved project dir + envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative) + envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string + envSessionReadyFile = "LK_SESSION_READY_FILE" // path the daemon writes its status to // sessionDaemonSubcommand is the hidden entrypoint `start` re-execs into. sessionDaemonSubcommand = "daemon" @@ -92,7 +92,7 @@ var agentSessionCommand = &cli.Command{ Name: sessionDaemonSubcommand, Hidden: true, Action: func(ctx context.Context, cmd *cli.Command) error { - if os.Getenv(envSessionReadyFD) == "" { + if os.Getenv(envSessionReadyFile) == "" { return fmt.Errorf("`session daemon` is an internal entrypoint; run `lk agent session start ` instead") } runSessionDaemon() @@ -118,19 +118,20 @@ func runSessionStart(ctx context.Context, cmd *cli.Command) error { return fmt.Errorf("could not resolve own binary: %w", err) } - // Pipe the daemon uses to report readiness (or a startup error) before we - // return. This avoids racing a TCP probe against the agent's own connect. - readyR, readyW, err := os.Pipe() + // Readiness file the daemon writes once it is up (or failed) before we + // return, so we don't race a TCP probe against the agent's own connect. + readyFile, err := os.CreateTemp("", "lk-session-ready-*.txt") if err != nil { return err } - defer readyR.Close() + readyPath := readyFile.Name() + readyFile.Close() + defer os.Remove(readyPath) // The daemon is detached, so its own stdout/stderr (panics etc.) go to a // temp log rather than the user's terminal. 
// awaitDaemonReady blocks until the detached daemon reports its startup status
// through the readiness file at readyPath and returns that status line
// ("ready" or "error: ..."). It returns "" if the daemon exits, or the deadline
// passes, without anything having been written.
//
// daemon must already have been started. This function calls daemon.Wait, so
// the caller must not call Wait on it as well.
func awaitDaemonReady(daemon *exec.Cmd, readyPath string) string {
	const (
		// Slightly longer than the daemon's own 60s agent-connect timeout so its
		// "error: timed out ..." status reaches us before we give up.
		readyTimeout = 65 * time.Second
		// How often the readiness file is re-read while waiting.
		pollInterval = 50 * time.Millisecond
	)

	// exited is closed once the daemon process has terminated. If we return on
	// the deadline instead, this goroutine stays parked in Wait until the
	// daemon eventually exits.
	exited := make(chan struct{})
	go func() {
		_ = daemon.Wait() // only the fact of exit matters here, not the exit status
		close(exited)
	}()

	// One timer and one ticker for the whole wait, instead of allocating a new
	// timer with time.After on every poll iteration.
	deadline := time.NewTimer(readyTimeout)
	defer deadline.Stop()
	poll := time.NewTicker(pollInterval)
	defer poll.Stop()

	for {
		if status, ok := readReadyStatus(readyPath); ok {
			return status
		}
		select {
		case <-exited:
			// The daemon may have written its status right before exiting, so
			// look one last time; status is "" when nothing was reported.
			status, _ := readReadyStatus(readyPath)
			return status
		case <-deadline.C:
			return ""
		case <-poll.C:
		}
	}
}

// readReadyStatus reads the readiness file at path and returns its contents
// with surrounding whitespace trimmed. ok is false while the file cannot be
// read or is still empty, i.e. the daemon has not reported yet.
func readReadyStatus(path string) (string, bool) {
	data, err := os.ReadFile(path)
	if err != nil || len(data) == 0 {
		return "", false
	}
	return strings.TrimSpace(string(data)), true
}
// signalReady reports the daemon's status line to the parent `start`, which
// polls the readiness file at path. An empty path means the daemon was not
// launched via start, and the call is a no-op.
//
// The status is written to "<path>.tmp" and then renamed over path so the
// parent does not observe a partially written line. If either step fails, the
// temporary file is removed and the status is written to path directly:
// delivering the status non-atomically is better than leaving the parent
// waiting for its timeout. Reporting is best-effort, so a failure of the
// fallback itself is ignored.
func signalReady(path, msg string) {
	if path == "" {
		return
	}
	payload := []byte(msg + "\n")

	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, payload, 0o600); err == nil {
		if err := os.Rename(tmp, path); err == nil {
			return
		}
		// Rename can fail for OS-specific reasons (for example, replacing a
		// file another process currently has open is not always permitted on
		// Windows). Don't leave the temporary file behind.
		_ = os.Remove(tmp)
	}

	// Fallback: a single short write straight to the readiness file. The
	// parent treats an empty file as "not reported yet" and polls again.
	_ = os.WriteFile(path, payload, 0o600)
}
+func TestSessionE2E(t *testing.T) { + if os.Getenv("LIVEKIT_API_KEY") == "" { + t.Skip("set LIVEKIT_API_KEY (and prepare the agent venv) to run the session e2e test") + } + entrypoint := os.Getenv("LK_SESSION_E2E_AGENT") + if entrypoint == "" { + entrypoint = filepath.Join("testdata", "echo-agent", "agent.py") + } + entrypoint, err := filepath.Abs(entrypoint) + require.NoError(t, err) + require.FileExists(t, entrypoint, "agent entrypoint not found (set LK_SESSION_E2E_AGENT to override)") + + // Dedicated port so the test can't collide with a real session on 8775. + port := "18775" + if p := os.Getenv("LK_SESSION_E2E_PORT"); p != "" { + port = p + } + + bin := buildLK(t) + + run := func(timeout time.Duration, args ...string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + cmd := exec.CommandContext(ctx, bin, args...) + cmd.Env = os.Environ() + out, err := cmd.CombinedOutput() + return string(out), err + } + + // Best-effort teardown so a mid-run failure doesn't leave the daemon alive. + t.Cleanup(func() { + _, _ = run(15*time.Second, "agent", "session", "end", "--port", port) + }) + + // start: launches the detached daemon and returns once the agent is ready. + startOut, err := run(90*time.Second, "agent", "session", "start", "--port", port, entrypoint) + require.NoError(t, err, "session start failed:\n%s", startOut) + require.Contains(t, startOut, "Session started.", "start did not report readiness:\n%s", startOut) + + // say: the token appears once in the echoed prompt and again in the reply, so + // >=2 occurrences proves the agent answered, not just the local echo. 
+ token := "PINEAPPLE7351" + sayOut, err := run(90*time.Second, "agent", "session", "say", "--port", port, + "Repeat this token back to me exactly and nothing else: "+token) + require.NoError(t, err, "session say failed:\n%s", sayOut) + require.GreaterOrEqualf(t, strings.Count(sayOut, token), 2, + "agent did not echo the token back; say output:\n%s", sayOut) + + endOut, err := run(30*time.Second, "agent", "session", "end", "--port", port) + require.NoError(t, err, "session end failed:\n%s", endOut) + require.Contains(t, endOut, "Session ended.", "end did not confirm shutdown:\n%s", endOut) + + // The detached daemon should now be gone: nothing should answer on the port. + require.Eventually(t, func() bool { + conn, derr := net.DialTimeout("tcp", "127.0.0.1:"+port, 200*time.Millisecond) + if derr != nil { + return true // refused → daemon exited + } + conn.Close() + return false + }, 10*time.Second, 200*time.Millisecond, "session daemon still listening on port %s after end", port) +} + +// buildLK compiles the lk binary into a temp dir and returns its path. +func buildLK(t *testing.T) string { + t.Helper() + bin := filepath.Join(t.TempDir(), "lk") + if runtime.GOOS == "windows" { + bin += ".exe" + } + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + build := exec.CommandContext(ctx, "go", "build", "-o", bin, ".") + out, err := build.CombinedOutput() + require.NoErrorf(t, err, "failed to build lk binary:\n%s", out) + return bin +} diff --git a/cmd/lk/testdata/echo-agent/agent.py b/cmd/lk/testdata/echo-agent/agent.py new file mode 100644 index 00000000..ed9ea971 --- /dev/null +++ b/cmd/lk/testdata/echo-agent/agent.py @@ -0,0 +1,48 @@ +"""Minimal one-file echo agent for the `lk agent session` e2e test. + +Driven in text mode, so an LLM is the only component needed. Echoes the user's +text verbatim, which the test asserts on. 
+""" + +from dotenv import load_dotenv +from livekit.agents import Agent, AgentServer, AgentSession, JobContext, cli, inference + +load_dotenv() + +server = AgentServer() + + +@server.rtc_session() +async def entrypoint(ctx: JobContext): + session = AgentSession(llm=inference.LLM(model="openai/gpt-4o-mini")) + await session.start( + agent=Agent( + instructions=( + "You are an echo bot. Reply with exactly the text the user " + "sends, verbatim, and nothing else." + ), + ), + room=ctx.room, + ) + # No TTS, so disable audio output or the turn crashes in tts_node. + session.output.set_audio_enabled(False) + await ctx.connect() + + +if __name__ == "__main__": + import sys + + argv = sys.argv[1:] + if argv and argv[0] == "console": + # The daemon launches `python agent.py console --connect-addr `, but + # cli.run_app() sends `console` to the legacy click CLI (no --connect-addr), + # so dispatch to the TCP console directly. + from livekit.agents.cli.cli import _run_tcp_console + + _run_tcp_console( + server=server, + connect_addr=argv[argv.index("--connect-addr") + 1], + record="--record" in argv, + ) + else: + cli.run_app(server) diff --git a/cmd/lk/testdata/echo-agent/pyproject.toml b/cmd/lk/testdata/echo-agent/pyproject.toml new file mode 100644 index 00000000..a0f07ffa --- /dev/null +++ b/cmd/lk/testdata/echo-agent/pyproject.toml @@ -0,0 +1,10 @@ +# uv project marker: [tool.uv] lets the daemon's `uv run python` auto-sync +# these deps -- no separate install step. +[project] +name = "lk-session-e2e-echo-agent" +version = "0" +requires-python = ">=3.12" +dependencies = ["livekit-agents", "python-dotenv"] + +[tool.uv] +package = false