Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/session-e2e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
name: Session E2E

# Drives the real `lk agent session` start/say/end lifecycle against the minimal
# one-file echo agent in cmd/lk/testdata/echo-agent, on Linux and Windows. This
# exercises the detached daemon, the readiness handshake, the console IPC
# transport, and the model round-trip end to end -- runtime behavior that
# `go test` alone never covers.
#
# Runs on manual dispatch and on pushes to any repo branch (forks can't trigger
# `push`, so secrets are only exposed to trusted collaborators). It needs live
# LiveKit credentials -- set these repo secrets first: LIVEKIT_API_KEY,
# LIVEKIT_API_SECRET, LIVEKIT_URL. The echo agent drives its LLM through LiveKit
# Inference, so no other provider keys are needed.
#
# The echo agent depends on plain PyPI livekit-agents (synced by `uv sync` from
# its pyproject.toml). Note: on current releases/main, `cli.run_app()` routes
# `console` through the legacy click CLI, which has no --connect-addr (that
# lives behind `python -m livekit.agents`). The fixture's __main__ dispatches
# console mode to the TCP console directly to bridge the daemon's
# `python <entry> console --connect-addr` launch.
#
# Node is intentionally not in the matrix yet: this branch's session daemon only
# supports Python agents (`detectProject` rejects non-Python), and Node console
# support depends on the brian/agent-session-node-support CLI line (#868/#878)
# plus agents-js #1804. Add a node arm once those land.

on:
workflow_dispatch:
push:
branches: ['**']

concurrency:
group: session-e2e-${{ github.ref }}
cancel-in-progress: true

jobs:
e2e:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]

runs-on: ${{ matrix.os }}
name: python on ${{ matrix.os }}

permissions:
contents: read

steps:
- name: Checkout livekit-cli
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6

- name: Set up Go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6
with:
go-version-file: go.mod
cache: true

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true

- name: Sync echo agent deps
working-directory: cmd/lk/testdata/echo-agent
run: uv sync

- name: Run session e2e
env:
LIVEKIT_API_KEY: ${{ secrets.LIVEKIT_API_KEY }}
LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }}
LIVEKIT_URL: ${{ secrets.LIVEKIT_URL }}
run: go test ./cmd/lk -run TestSessionE2E -count=1 -v -timeout 600s
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@ dist/

.DS_Store
/lk

# local secrets copied for e2e testing
.env

# python venvs created for e2e agent fixtures
.venv/
cmd/lk/testdata/**/.venv

# uv lockfiles for test agent fixtures (resolved fresh in CI)
cmd/lk/testdata/**/uv.lock
71 changes: 52 additions & 19 deletions cmd/lk/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package main

import (
"bufio"
"context"
"encoding/binary"
"encoding/json"
Expand All @@ -26,6 +25,7 @@ import (
"os/exec"
"strconv"
"strings"
"time"

"github.com/urfave/cli/v3"
)
Expand All @@ -38,11 +38,11 @@ const (
sessionHost = "127.0.0.1"
defaultSessionPort = 8775

envSessionPort = "LK_SESSION_PORT" // fixed port
envSessionDir = "LK_SESSION_DIR" // resolved project dir
envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative)
envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string
envSessionReadyFD = "LK_SESSION_READY_FD"
envSessionPort = "LK_SESSION_PORT" // fixed port
envSessionDir = "LK_SESSION_DIR" // resolved project dir
envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative)
envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string
envSessionReadyFile = "LK_SESSION_READY_FILE" // path the daemon writes its status to

// sessionDaemonSubcommand is the hidden entrypoint `start` re-execs into.
sessionDaemonSubcommand = "daemon"
Expand Down Expand Up @@ -92,7 +92,7 @@ var agentSessionCommand = &cli.Command{
Name: sessionDaemonSubcommand,
Hidden: true,
Action: func(ctx context.Context, cmd *cli.Command) error {
if os.Getenv(envSessionReadyFD) == "" {
if os.Getenv(envSessionReadyFile) == "" {
return fmt.Errorf("`session daemon` is an internal entrypoint; run `lk agent session start <entrypoint>` instead")
}
runSessionDaemon()
Expand All @@ -118,19 +118,20 @@ func runSessionStart(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("could not resolve own binary: %w", err)
}

// Pipe the daemon uses to report readiness (or a startup error) before we
// return. This avoids racing a TCP probe against the agent's own connect.
readyR, readyW, err := os.Pipe()
// Readiness file the daemon writes once it is up (or failed) before we
// return, so we don't race a TCP probe against the agent's own connect.
readyFile, err := os.CreateTemp("", "lk-session-ready-*.txt")
if err != nil {
return err
}
defer readyR.Close()
readyPath := readyFile.Name()
readyFile.Close()
defer os.Remove(readyPath)

// The daemon is detached, so its own stdout/stderr (panics etc.) go to a
// temp log rather than the user's terminal.
logFile, err := os.CreateTemp("", "lk-session-daemon-*.log")
if err != nil {
readyW.Close()
return err
}

Expand All @@ -140,24 +141,19 @@ func runSessionStart(ctx context.Context, cmd *cli.Command) error {
envSessionDir+"="+projectDir,
envSessionEntry+"="+entrypoint,
envSessionPType+"="+string(projectType),
envSessionReadyFD+"=3", // ExtraFiles[0] is fd 3 in the child
envSessionReadyFile+"="+readyPath,
)
daemon.ExtraFiles = []*os.File{readyW}
daemon.Stdout = logFile
daemon.Stderr = logFile
setDetachedProcAttr(daemon)

if err := daemon.Start(); err != nil {
readyW.Close()
logFile.Close()
return fmt.Errorf("failed to start session daemon: %w", err)
}
// Close our copy of the write end so the read below sees EOF if the daemon dies.
readyW.Close()
logFile.Close()

status, _ := bufio.NewReader(readyR).ReadString('\n')
status = strings.TrimSpace(status)
status := awaitDaemonReady(daemon, readyPath)
switch {
case status == "ready":
fmt.Fprintf(os.Stderr, "Detected %s agent (%s in %s)\n", projectType.Lang(), entrypoint, projectDir)
Expand All @@ -170,6 +166,43 @@ func runSessionStart(ctx context.Context, cmd *cli.Command) error {
}
}

// awaitDaemonReady waits for the detached daemon to report via the readiness
// file, returning its status line ("ready" or "error: ...") or "" if the
// daemon exits or times out without reporting.
func awaitDaemonReady(daemon *exec.Cmd, readyPath string) string {
exited := make(chan struct{})
go func() { _ = daemon.Wait(); close(exited) }()

// Slightly longer than the daemon's own 60s agent-connect timeout so its
// "error: timed out ..." status reaches us before we give up.
timeout := time.After(65 * time.Second)
for {
if status, ok := readReadyStatus(readyPath); ok {
return status
}
select {
case <-exited:
if status, ok := readReadyStatus(readyPath); ok {
return status
}
return ""
case <-timeout:
return ""
case <-time.After(50 * time.Millisecond):
}
}
}

// readReadyStatus returns the daemon's status line once the readiness file has
// content (written atomically via rename), or ok=false while it is still empty.
func readReadyStatus(path string) (string, bool) {
data, err := os.ReadFile(path)
if err != nil || len(data) == 0 {
return "", false
}
return strings.TrimSpace(string(data)), true
}

func runSessionSay(ctx context.Context, cmd *cli.Command) error {
text := strings.TrimSpace(strings.Join(cmd.Args().Slice(), " "))
if text == "" {
Expand Down
31 changes: 15 additions & 16 deletions cmd/lk/session_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,25 @@ func runSessionDaemon() {
agentProc.Kill()
}

// readyWriter returns the inherited pipe `lk agent session start` reads to learn the
// daemon became ready (or failed). Nil if not launched via start.
func readyWriter() *os.File {
fdStr := os.Getenv(envSessionReadyFD)
if fdStr == "" {
return nil
}
fd, err := strconv.Atoi(fdStr)
if err != nil {
return nil
}
return os.NewFile(uintptr(fd), "ready")
// readyWriter returns the path of the readiness file `lk agent session start`
// polls to learn the daemon became ready (or failed). Empty if not launched
// via start.
func readyWriter() string {
return os.Getenv(envSessionReadyFile)
}

func signalReady(f *os.File, msg string) {
if f == nil {
// signalReady atomically writes the daemon's status to the readiness file the
// parent `start` is polling. The write-then-rename keeps the parent from
// reading a partial line.
func signalReady(path, msg string) {
if path == "" {
return
}
tmp := path + ".tmp"
if err := os.WriteFile(tmp, []byte(msg+"\n"), 0o600); err != nil {
return
}
fmt.Fprintln(f, msg)
f.Close()
_ = os.Rename(tmp, path)
}

type sessionDaemon struct {
Expand Down
114 changes: 114 additions & 0 deletions cmd/lk/session_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestSessionE2E drives the real `lk agent session` lifecycle end to end:
// build the binary, `start` the detached daemon, `say` to make the model echo
// a token (asserting the CLI→daemon→agent→LLM round-trip), `end`, then confirm
// the daemon exited (nothing answers on the port).
//
// Opt-in: needs a prepared agent venv + live creds, so it skips unless
// LIVEKIT_API_KEY is set. Defaults to testdata/echo-agent; override with LK_SESSION_E2E_AGENT.
func TestSessionE2E(t *testing.T) {
if os.Getenv("LIVEKIT_API_KEY") == "" {
t.Skip("set LIVEKIT_API_KEY (and prepare the agent venv) to run the session e2e test")
}
entrypoint := os.Getenv("LK_SESSION_E2E_AGENT")
if entrypoint == "" {
entrypoint = filepath.Join("testdata", "echo-agent", "agent.py")
}
entrypoint, err := filepath.Abs(entrypoint)
require.NoError(t, err)
require.FileExists(t, entrypoint, "agent entrypoint not found (set LK_SESSION_E2E_AGENT to override)")

// Dedicated port so the test can't collide with a real session on 8775.
port := "18775"
if p := os.Getenv("LK_SESSION_E2E_PORT"); p != "" {
port = p
}

bin := buildLK(t)

run := func(timeout time.Duration, args ...string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, bin, args...)
cmd.Env = os.Environ()
out, err := cmd.CombinedOutput()
return string(out), err
}

// Best-effort teardown so a mid-run failure doesn't leave the daemon alive.
t.Cleanup(func() {
_, _ = run(15*time.Second, "agent", "session", "end", "--port", port)
})

// start: launches the detached daemon and returns once the agent is ready.
startOut, err := run(90*time.Second, "agent", "session", "start", "--port", port, entrypoint)
require.NoError(t, err, "session start failed:\n%s", startOut)
require.Contains(t, startOut, "Session started.", "start did not report readiness:\n%s", startOut)

// say: the token appears once in the echoed prompt and again in the reply, so
// >=2 occurrences proves the agent answered, not just the local echo.
token := "PINEAPPLE7351"
sayOut, err := run(90*time.Second, "agent", "session", "say", "--port", port,
"Repeat this token back to me exactly and nothing else: "+token)
require.NoError(t, err, "session say failed:\n%s", sayOut)
require.GreaterOrEqualf(t, strings.Count(sayOut, token), 2,
"agent did not echo the token back; say output:\n%s", sayOut)

endOut, err := run(30*time.Second, "agent", "session", "end", "--port", port)
require.NoError(t, err, "session end failed:\n%s", endOut)
require.Contains(t, endOut, "Session ended.", "end did not confirm shutdown:\n%s", endOut)

// The detached daemon should now be gone: nothing should answer on the port.
require.Eventually(t, func() bool {
conn, derr := net.DialTimeout("tcp", "127.0.0.1:"+port, 200*time.Millisecond)
if derr != nil {
return true // refused → daemon exited
}
conn.Close()
return false
}, 10*time.Second, 200*time.Millisecond, "session daemon still listening on port %s after end", port)
}

// buildLK compiles the lk binary into a temp dir and returns its path.
func buildLK(t *testing.T) string {
t.Helper()
bin := filepath.Join(t.TempDir(), "lk")
if runtime.GOOS == "windows" {
bin += ".exe"
}
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
build := exec.CommandContext(ctx, "go", "build", "-o", bin, ".")
out, err := build.CombinedOutput()
require.NoErrorf(t, err, "failed to build lk binary:\n%s", out)
return bin
}
Loading
Loading