Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ var (
ArgsUsage: "[working-dir]",
},
privateLinkCommands,
simulateCommand,
},
},
}
Expand Down
29 changes: 12 additions & 17 deletions cmd/lk/agent_private_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"strconv"

"github.com/livekit/livekit-cli/v2/pkg/util"
lkproto "github.com/livekit/protocol/livekit"
Expand Down Expand Up @@ -87,11 +86,9 @@ var privateLinkCommands = &cli.Command{
},
}

func buildCreatePrivateLinkRequest(name, region string, port uint32, awsEndpoint string) *lkproto.CreatePrivateLinkRequest {
func buildCreatePrivateLinkRequest(name, awsEndpoint string) *lkproto.CreatePrivateLinkRequest {
return &lkproto.CreatePrivateLinkRequest{
Name: name,
Region: region,
Port: port,
Name: name,
Config: &lkproto.CreatePrivateLinkRequest_Aws{
Aws: &lkproto.CreatePrivateLinkRequest_AWSCreateConfig{
Endpoint: awsEndpoint,
Expand All @@ -104,14 +101,14 @@ func privateLinkServiceDNS(name, projectID string) string {
return fmt.Sprintf("%s-%s.plg.svc", name, projectID)
}

func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[string]*lkproto.PrivateLinkStatus, healthErrByID map[string]error) [][]string {
func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[string]*lkproto.PrivateLinkHealthStatus, healthErrByID map[string]error) [][]string {
var rows [][]string
for _, link := range links {
if link == nil {
continue
}

status := lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_UNKNOWN.String()
status := lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_UNKNOWN.String()
updatedAt := "-"

if err, ok := healthErrByID[link.PrivateLinkId]; ok && err != nil {
Expand All @@ -127,8 +124,6 @@ func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[strin
rows = append(rows, []string{
link.PrivateLinkId,
link.Name,
link.Region,
strconv.FormatUint(uint64(link.Port), 10),
status,
updatedAt,
})
Expand All @@ -144,7 +139,7 @@ func formatPrivateLinkClientError(action string, err error) error {
}

func createPrivateLink(ctx context.Context, cmd *cli.Command) error {
req := buildCreatePrivateLinkRequest(cmd.String("name"), cmd.String("region"), uint32(cmd.Uint("port")), cmd.String("endpoint"))
req := buildCreatePrivateLinkRequest(cmd.String("name"), cmd.String("endpoint"))
resp, err := agentsClient.CreatePrivateLink(ctx, req)
if err != nil {
return formatPrivateLinkClientError("create", err)
Expand Down Expand Up @@ -173,13 +168,13 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {
return formatPrivateLinkClientError("list", err)
}

healthByID := make(map[string]*lkproto.PrivateLinkStatus, len(resp.Items))
healthByID := make(map[string]*lkproto.PrivateLinkHealthStatus, len(resp.Items))
healthErrByID := make(map[string]error)
for _, link := range resp.Items {
if link == nil || link.PrivateLinkId == "" {
continue
}
health, healthErr := agentsClient.GetPrivateLinkStatus(ctx, &lkproto.GetPrivateLinkStatusRequest{
health, healthErr := agentsClient.GetPrivateLinkHealthStatus(ctx, &lkproto.GetPrivateLinkHealthStatusRequest{
PrivateLinkId: link.PrivateLinkId,
})
if healthErr != nil {
Expand All @@ -193,9 +188,9 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {

if cmd.Bool("json") {
type privateLinkWithHealth struct {
PrivateLink *lkproto.PrivateLink `json:"private_link"`
Status *lkproto.PrivateLinkStatus `json:"health"`
HealthError string `json:"health_error,omitempty"`
PrivateLink *lkproto.PrivateLink `json:"private_link"`
Status *lkproto.PrivateLinkHealthStatus `json:"health"`
HealthError string `json:"health_error,omitempty"`
}
items := make([]privateLinkWithHealth, 0, len(resp.Items))
for _, link := range resp.Items {
Expand All @@ -221,7 +216,7 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {
}

rows := buildPrivateLinkListRows(resp.Items, healthByID, healthErrByID)
table := util.CreateTable().Headers("ID", "Name", "Region", "Port", "Health", "Updated At").Rows(rows...)
table := util.CreateTable().Headers("ID", "Name", "Health", "Updated At").Rows(rows...)
fmt.Println(table)
return nil
}
Expand All @@ -245,7 +240,7 @@ func deletePrivateLink(ctx context.Context, cmd *cli.Command) error {

func getPrivateLinkHealthStatus(ctx context.Context, cmd *cli.Command) error {
privateLinkID := cmd.String("id")
resp, err := agentsClient.GetPrivateLinkStatus(ctx, &lkproto.GetPrivateLinkStatusRequest{
resp, err := agentsClient.GetPrivateLinkHealthStatus(ctx, &lkproto.GetPrivateLinkHealthStatusRequest{
PrivateLinkId: privateLinkID,
})
if err != nil {
Expand Down
34 changes: 11 additions & 23 deletions cmd/lk/agent_private_link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ func TestAgentPrivateLinkCommandTree(t *testing.T) {
}

func TestBuildCreatePrivateLinkRequest_HappyPath(t *testing.T) {
req := buildCreatePrivateLinkRequest("orders-db", "us-east-1", 6379, "com.amazonaws.vpce.us-east-1.vpce-svc-abc123")
req := buildCreatePrivateLinkRequest("orders-db", "com.amazonaws.vpce.us-east-1.vpce-svc-abc123")
require.NotNil(t, req)

assert.Equal(t, "orders-db", req.Name)
assert.Equal(t, "us-east-1", req.Region)
assert.Equal(t, uint32(6379), req.Port)

aws := req.GetAws()
require.NotNil(t, aws)
Expand All @@ -62,7 +60,7 @@ func TestPrivateLinkServiceDNS(t *testing.T) {
}

func TestBuildPrivateLinkListRows_EmptyList(t *testing.T) {
rows := buildPrivateLinkListRows([]*lkproto.PrivateLink{}, map[string]*lkproto.PrivateLinkStatus{}, map[string]error{})
rows := buildPrivateLinkListRows([]*lkproto.PrivateLink{}, map[string]*lkproto.PrivateLinkHealthStatus{}, map[string]error{})
assert.Empty(t, rows)
}

Expand All @@ -71,15 +69,13 @@ func TestBuildPrivateLinkListRows_OnePrivateLink(t *testing.T) {
{
PrivateLinkId: "pl-1",
Name: "orders-db",
Region: "us-east-1",
Port: 6379,
},
}

now := time.Now().UTC()
healthByID := map[string]*lkproto.PrivateLinkStatus{
healthByID := map[string]*lkproto.PrivateLinkHealthStatus{
"pl-1": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
UpdatedAt: timestamppb.New(now),
},
}
Expand All @@ -88,41 +84,33 @@ func TestBuildPrivateLinkListRows_OnePrivateLink(t *testing.T) {
require.Len(t, rows, 1)
assert.Equal(t, "pl-1", rows[0][0])
assert.Equal(t, "orders-db", rows[0][1])
assert.Equal(t, "us-east-1", rows[0][2])
assert.Equal(t, "6379", rows[0][3])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[0][4])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[0][2])
}

func TestBuildPrivateLinkListRows_TwoPrivateLinksDifferentRegions(t *testing.T) {
func TestBuildPrivateLinkListRows_TwoPrivateLinks(t *testing.T) {
links := []*lkproto.PrivateLink{
{
PrivateLinkId: "pl-1",
Name: "orders-db",
Region: "us-east-1",
Port: 6379,
},
{
PrivateLinkId: "pl-2",
Name: "cache",
Region: "eu-west-1",
Port: 6380,
},
}

healthByID := map[string]*lkproto.PrivateLinkStatus{
healthByID := map[string]*lkproto.PrivateLinkHealthStatus{
"pl-1": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
},
"pl-2": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
},
}

rows := buildPrivateLinkListRows(links, healthByID, map[string]error{})
require.Len(t, rows, 2)

assert.Equal(t, "us-east-1", rows[0][2])
assert.Equal(t, "eu-west-1", rows[1][2])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[0][4])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[1][4])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[0][2])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[1][2])
}
99 changes: 99 additions & 0 deletions cmd/lk/agent_reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"fmt"
"net"
"sync"
"time"

agent "github.com/livekit/protocol/livekit/agent"

"github.com/livekit/livekit-cli/v2/pkg/ipc"
)

// reloadServer manages the dev-mode reload protocol between Go and Python processes.
// Flow:
// 1. Go → old Python: GetRunningJobsRequest → receives GetRunningJobsResponse (capture)
// 2. New Python → Go: GetRunningJobsRequest → Go replies with saved GetRunningJobsResponse (restore)
type reloadServer struct {
listener *ipc.Listener
mu sync.Mutex
savedJobs *agent.GetRunningJobsResponse
}

func newReloadServer() (*reloadServer, error) {
ln, err := ipc.Listen("127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("reload server: %w", err)
}
return &reloadServer{listener: ln}, nil
}

func (rs *reloadServer) addr() string {
return rs.listener.Addr().String()
}

// captureJobs sends GetRunningJobsRequest to the old Python process and stores the response.
func (rs *reloadServer) captureJobs(conn net.Conn) {
conn.SetDeadline(time.Now().Add(1500 * time.Millisecond))
defer conn.SetDeadline(time.Time{})

req := &agent.DevMessage{
Message: &agent.DevMessage_GetRunningJobsRequest{
GetRunningJobsRequest: &agent.GetRunningJobsRequest{},
},
}
if err := ipc.WriteProto(conn, req); err != nil {
fmt.Printf("reload: failed to send capture request: %v\n", err)
return
}

resp := &agent.DevMessage{}
if err := ipc.ReadProto(conn, resp); err != nil {
fmt.Printf("reload: failed to read capture response: %v\n", err)
return
}

if jobs := resp.GetGetRunningJobsResponse(); jobs != nil {
rs.mu.Lock()
rs.savedJobs = jobs
rs.mu.Unlock()
fmt.Printf("reload: captured %d running job(s)\n", len(jobs.Jobs))
}
}

// serveNewProcess handles a GetRunningJobsRequest from the new Python process,
// replying with the previously captured jobs.
func (rs *reloadServer) serveNewProcess(conn net.Conn) {
req := &agent.DevMessage{}
if err := ipc.ReadProto(conn, req); err != nil {
return
}
if req.GetGetRunningJobsRequest() == nil {
return
}

rs.mu.Lock()
saved := rs.savedJobs
rs.savedJobs = nil
rs.mu.Unlock()

if saved == nil {
saved = &agent.GetRunningJobsResponse{}
}

resp := &agent.DevMessage{
Message: &agent.DevMessage_GetRunningJobsResponse{
GetRunningJobsResponse: saved,
},
}
if err := ipc.WriteProto(conn, resp); err != nil {
fmt.Printf("reload: failed to send restore response: %v\n", err)
} else if len(saved.Jobs) > 0 {
fmt.Printf("reload: restored %d job(s) to new process\n", len(saved.Jobs))
}
}

func (rs *reloadServer) close() error {
return rs.listener.Close()
}
Loading