diff --git a/core/application/p2p.go b/core/application/p2p.go index 451e381214c2..dbd6f74aa9b2 100644 --- a/core/application/p2p.go +++ b/core/application/p2p.go @@ -53,9 +53,21 @@ func (a *Application) StartP2P() error { return err } + // modelsFn reports the model names this instance currently serves so the + // federation proxy can route a request only to peers that have the + // requested model. It is re-evaluated on every announce tick. + modelsFn := func() []string { + cfgs := a.ModelConfigLoader().GetAllModelsConfigs() + names := make([]string, 0, len(cfgs)) + for _, c := range cfgs { + names = append(names, c.Name) + } + return names + } + // Here a new node is created and started // and a service is exposed by the node - node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID)) + node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), modelsFn) if err != nil { return err } diff --git a/core/cli/federated.go b/core/cli/federated.go index c61adab0f072..a19e11df0965 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -14,12 +14,14 @@ type FederatedCLI struct { RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"` Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"` TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"` + UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"` + AffinitySync bool `env:"LOCALAI_FEDERATED_AFFINITY_SYNC,FEDERATED_AFFINITY_SYNC" default:"false" help:"Broadcast prefix-cache affinity observations to other federation servers over the p2p generic channel (enable on every federation server that should cohere)" group:"p2p"` } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { warnDeprecatedFlags() - fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker) + fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024, f.AffinitySync) c, cancel := context.WithCancel(context.Background()) diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go index c7ff254ea451..223f4f90889a 100644 --- a/core/cli/worker/worker_p2p.go +++ b/core/cli/worker/worker_p2p.go @@ -62,7 +62,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { p = r.RunnerPort } - _, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID)) + _, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil) if err != nil { return err } @@ -104,7 +104,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { } }() - _, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID)) + _, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil) if err != nil { return err } diff --git a/core/cli/worker/worker_p2p_mlx.go b/core/cli/worker/worker_p2p_mlx.go index 7edd1673def2..77a95394eece 100644 --- a/core/cli/worker/worker_p2p_mlx.go +++ b/core/cli/worker/worker_p2p_mlx.go @@ -81,7 +81,7 @@ func (r *P2PMLX) Run(ctx *cliContext.Context) error { } }() - _, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID)) + _, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID), nil) if err != nil { return err } diff --git a/core/http/react-ui/e2e/cluster.spec.js b/core/http/react-ui/e2e/cluster.spec.js new file mode 100644 index 000000000000..523e0f60d50a --- /dev/null +++ b/core/http/react-ui/e2e/cluster.spec.js @@ -0,0 +1,37 @@ +import { test, expect } from './coverage-fixtures.js' + +// The Cluster page composes two capability sections: "Distributed (NATS)" (the +// former Nodes page) and "Swarm (p2p)" (the former P2P page). Each section only +// mounts when its mode is enabled — distributed when /api/nodes answers OK, swarm +// when a non-empty p2p network token is present. We mock those probes so the page +// renders against the standalone ui-test-server without NATS / p2p running. + +async function mockDistributedOnly(page) { + await page.route('**/api/nodes', (route) => { + route.fulfill({ status: 200, contentType: 'application/json', body: '[]' }) + }) + await page.route('**/api/nodes/scheduling', (route) => { + route.fulfill({ status: 200, contentType: 'application/json', body: '[]' }) + }) + // Swarm disabled: token probe fails, so the swarm section stays hidden. + await page.route('**/api/p2p/token', (route) => { + route.fulfill({ status: 503, contentType: 'text/plain', body: '' }) + }) +} + +test.describe('Cluster page', () => { + test('shows the page title', async ({ page }) => { + await mockDistributedOnly(page) + await page.goto('/app/cluster') + await expect(page).toHaveURL(/\/app\/cluster$/) + await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible() + }) + + test('shows the distributed section when /api/nodes responds', async ({ page }) => { + await mockDistributedOnly(page) + await page.goto('/app/cluster') + await expect(page).toHaveURL(/\/app\/cluster$/) + // The distributed capability section is titled "Distributed (NATS)". + await expect(page.getByText(/Distributed \(NATS\)/i)).toBeVisible() + }) +}) diff --git a/core/http/react-ui/e2e/navigation.spec.js b/core/http/react-ui/e2e/navigation.spec.js index d22dbe0a6c7f..cf2cf0a9a719 100644 --- a/core/http/react-ui/e2e/navigation.spec.js +++ b/core/http/react-ui/e2e/navigation.spec.js @@ -23,4 +23,11 @@ test.describe('Navigation', () => { await expect(page).toHaveURL(/\/app\/traces/) await expect(page.getByRole('heading', { name: 'Traces', exact: true })).toBeVisible() }) + + test('old cluster routes redirect to /app/cluster', async ({ page }) => { + await page.goto('/app/p2p') + await expect(page).toHaveURL(/\/app\/cluster$/) + await page.goto('/app/nodes') + await expect(page).toHaveURL(/\/app\/cluster$/) + }) }) diff --git a/core/http/react-ui/e2e/nodes-per-node-backend-actions.spec.js b/core/http/react-ui/e2e/nodes-per-node-backend-actions.spec.js index 76855437f15e..784a815f8202 100644 --- a/core/http/react-ui/e2e/nodes-per-node-backend-actions.spec.js +++ b/core/http/react-ui/e2e/nodes-per-node-backend-actions.spec.js @@ -81,7 +81,7 @@ async function mockDistributedNodes(page, { onDelete } = {}) { } async function expandNodeAndWaitForBackends(page) { - await page.goto('/app/nodes') + await page.goto('/app/cluster') // Click the row to expand it. The chevron toggle and the row both work, // but clicking the name cell is the most user-like. await page.getByText(NODE_NAME).first().click() diff --git a/core/http/react-ui/e2e/p2p.spec.js b/core/http/react-ui/e2e/p2p.spec.js index 8ea92faf85d9..70d0e8f397e3 100644 --- a/core/http/react-ui/e2e/p2p.spec.js +++ b/core/http/react-ui/e2e/p2p.spec.js @@ -1,26 +1,65 @@ import { test, expect } from './coverage-fixtures.js' -// P2P (Swarm) admin page — renders in the no-auth test harness (isAdmin). -test.describe('P2P page', () => { - test.beforeEach(async ({ page }) => { - await page.goto('/app/p2p') +// The standalone P2P (Swarm) page was merged into the Cluster page: /app/p2p now +// redirects to /app/cluster, and the p2p content lives under the "Swarm (p2p)" +// section. That section only mounts when p2p is enabled (a network token is +// present), so we mock /api/p2p/token to return a non-empty token and assert the +// swarm content renders under the cluster page. +const P2P_TOKEN = 'test-network-token' + +async function mockSwarmEnabled(page) { + await page.route('**/api/p2p/token', (route) => { + route.fulfill({ + status: 200, + contentType: 'text/plain', + body: P2P_TOKEN, + }) + }) + await page.route('**/api/p2p/workers', (route) => { + route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' }) + }) + await page.route('**/api/p2p/federation', (route) => { + route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' }) + }) + await page.route('**/api/p2p/stats', (route) => { + route.fulfill({ + status: 200, + contentType: 'application/json', + body: JSON.stringify({ + llama_cpp_workers: { online: 0, total: 0 }, + federated: { online: 0, total: 0 }, + mlx_workers: { online: 0, total: 0 }, + }), + }) }) + // The cluster page also probes /api/nodes for the distributed section; keep it + // failing (distributed disabled) so only the swarm section renders here. + await page.route('**/api/nodes', (route) => { + route.fulfill({ status: 503, contentType: 'application/json', body: '{}' }) + }) +} - test('renders the P2P distribution overview and capability cards', async ({ page }) => { - await expect(page).toHaveURL(/\/app\/p2p$/) - await expect(page.getByRole('heading', { name: /P2P Distribution Not Enabled/i })).toBeVisible() - await expect(page.getByRole('heading', { name: 'Instance Federation' })).toBeVisible() - await expect(page.getByRole('heading', { name: 'Model Sharding' })).toBeVisible() - await expect(page.getByRole('heading', { name: 'Resource Sharing' })).toBeVisible() - await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible() +test.describe('P2P (Swarm) section on the Cluster page', () => { + test('the old /app/p2p route lands on the cluster page', async ({ page }) => { + await mockSwarmEnabled(page) + // /app/p2p redirects to /app/cluster. + await page.goto('/app/p2p') + await expect(page).toHaveURL(/\/app\/cluster$/) + await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible() }) - test('hardware selector offers build targets and responds to selection', async ({ page }) => { - const cpu = page.getByRole('button').filter({ hasText: /^CPU$/ }) - const cuda = page.getByRole('button').filter({ hasText: /^CUDA 12$/ }) - await expect(cpu).toBeVisible() - await expect(cuda).toBeVisible() - await cuda.click() // selecting a build target must not break the page - await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible() + test('renders the Swarm (p2p) section when p2p is enabled', async ({ page }) => { + await mockSwarmEnabled(page) + await page.goto('/app/cluster') + await expect(page).toHaveURL(/\/app\/cluster$/) + + // The collapsible swarm section is titled "Swarm (p2p)". + await expect(page.getByText(/Swarm \(p2p\)/i)).toBeVisible() + + // The enabled p2p content (Network Token panel + the federation / sharding + // tabs) is rendered inside the swarm section. + await expect(page.getByRole('heading', { name: /Network Token/i })).toBeVisible() + await expect(page.getByText('Federation', { exact: true })).toBeVisible() + await expect(page.getByText('Model Sharding', { exact: true })).toBeVisible() }) }) diff --git a/core/http/react-ui/public/locales/de/admin.json b/core/http/react-ui/public/locales/de/admin.json index 88582b5a20f3..767131bea32e 100644 --- a/core/http/react-ui/public/locales/de/admin.json +++ b/core/http/react-ui/public/locales/de/admin.json @@ -58,5 +58,21 @@ "explorer": { "title": "Explorer", "subtitle": "Dateien und Konfiguration durchsuchen" + }, + "cluster": { + "title": "Cluster", + "subtitle": "Verteilte und Peer-to-Peer-Knoten, die diese Instanz bedienen", + "empty": "Es ist kein verteiltes oder p2p-Clustering aktiviert. Starte LocalAI im verteilten oder föderierten/p2p-Modus, um hier Cluster-Knoten zu verwalten.", + "distributed": { + "title": "Verteilt (NATS)" + }, + "swarm": { + "title": "Swarm (p2p)" + }, + "summary": { + "nodes": "Verteilte Knoten", + "inFlight": "Laufende Anfragen", + "peers": "Swarm-Peers online" + } } } diff --git a/core/http/react-ui/public/locales/de/nav.json b/core/http/react-ui/public/locales/de/nav.json index 891a15cae7dc..ddc1520a913e 100644 --- a/core/http/react-ui/public/locales/de/nav.json +++ b/core/http/react-ui/public/locales/de/nav.json @@ -40,6 +40,7 @@ "traces": "Traces", "nodes": "Knoten", "swarm": "Swarm", + "cluster": "Cluster", "system": "System", "settings": "Einstellungen", "api": "API" diff --git a/core/http/react-ui/public/locales/en/admin.json b/core/http/react-ui/public/locales/en/admin.json index f4a380ae33ba..c88e4c225fa3 100644 --- a/core/http/react-ui/public/locales/en/admin.json +++ b/core/http/react-ui/public/locales/en/admin.json @@ -81,5 +81,21 @@ "explorer": { "title": "Explorer", "subtitle": "Browse files and configuration" + }, + "cluster": { + "title": "Cluster", + "subtitle": "Distributed and peer-to-peer nodes serving this instance", + "empty": "No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.", + "distributed": { + "title": "Distributed (NATS)" + }, + "swarm": { + "title": "Swarm (p2p)" + }, + "summary": { + "nodes": "Distributed nodes", + "inFlight": "In-flight requests", + "peers": "Swarm peers online" + } } } diff --git a/core/http/react-ui/public/locales/en/nav.json b/core/http/react-ui/public/locales/en/nav.json index ac85d49794db..6d6f314925f2 100644 --- a/core/http/react-ui/public/locales/en/nav.json +++ b/core/http/react-ui/public/locales/en/nav.json @@ -41,6 +41,7 @@ "traces": "Traces", "nodes": "Nodes", "swarm": "Swarm", + "cluster": "Cluster", "system": "System", "settings": "Settings", "api": "API" diff --git a/core/http/react-ui/public/locales/es/admin.json b/core/http/react-ui/public/locales/es/admin.json index fee37c1ab959..85141ef112d1 100644 --- a/core/http/react-ui/public/locales/es/admin.json +++ b/core/http/react-ui/public/locales/es/admin.json @@ -58,5 +58,21 @@ "explorer": { "title": "Explorador", "subtitle": "Explora archivos y configuración" + }, + "cluster": { + "title": "Clúster", + "subtitle": "Nodos distribuidos y entre pares que sirven a esta instancia", + "empty": "No hay clustering distribuido ni p2p habilitado. Inicia LocalAI en modo distribuido o federado/p2p para gestionar aquí los nodos del clúster.", + "distributed": { + "title": "Distribuido (NATS)" + }, + "swarm": { + "title": "Swarm (p2p)" + }, + "summary": { + "nodes": "Nodos distribuidos", + "inFlight": "Solicitudes en curso", + "peers": "Pares de Swarm en línea" + } } } diff --git a/core/http/react-ui/public/locales/es/nav.json b/core/http/react-ui/public/locales/es/nav.json index 0c831a599af4..ab91f2072afb 100644 --- a/core/http/react-ui/public/locales/es/nav.json +++ b/core/http/react-ui/public/locales/es/nav.json @@ -40,6 +40,7 @@ "traces": "Trazas", "nodes": "Nodos", "swarm": "Swarm", + "cluster": "Clúster", "system": "Sistema", "settings": "Configuración", "api": "API" diff --git a/core/http/react-ui/public/locales/it/admin.json b/core/http/react-ui/public/locales/it/admin.json index 2bd575b661be..90b05e8cc719 100644 --- a/core/http/react-ui/public/locales/it/admin.json +++ b/core/http/react-ui/public/locales/it/admin.json @@ -58,5 +58,21 @@ "explorer": { "title": "Esplora risorse", "subtitle": "Sfoglia file e configurazioni" + }, + "cluster": { + "title": "Cluster", + "subtitle": "Nodi distribuiti e peer-to-peer al servizio di questa istanza", + "empty": "Nessun clustering distribuito o p2p è abilitato. Avvia LocalAI in modalità distribuita o federata/p2p per gestire qui i nodi del cluster.", + "distributed": { + "title": "Distribuito (NATS)" + }, + "swarm": { + "title": "Swarm (p2p)" + }, + "summary": { + "nodes": "Nodi distribuiti", + "inFlight": "Richieste in corso", + "peers": "Peer Swarm online" + } } } diff --git a/core/http/react-ui/public/locales/it/nav.json b/core/http/react-ui/public/locales/it/nav.json index e3d3ec434295..f9ef6f11c0bc 100644 --- a/core/http/react-ui/public/locales/it/nav.json +++ b/core/http/react-ui/public/locales/it/nav.json @@ -40,6 +40,7 @@ "traces": "Tracce", "nodes": "Nodi", "swarm": "Swarm", + "cluster": "Cluster", "system": "Sistema", "settings": "Impostazioni", "api": "API" diff --git a/core/http/react-ui/public/locales/zh-CN/admin.json b/core/http/react-ui/public/locales/zh-CN/admin.json index d55487e69e88..1e938c950c70 100644 --- a/core/http/react-ui/public/locales/zh-CN/admin.json +++ b/core/http/react-ui/public/locales/zh-CN/admin.json @@ -58,5 +58,21 @@ "explorer": { "title": "资源浏览器", "subtitle": "浏览文件和配置" + }, + "cluster": { + "title": "集群", + "subtitle": "为此实例提供服务的分布式和点对点节点", + "empty": "未启用分布式或 p2p 集群。请以分布式或联邦/p2p 模式启动 LocalAI,以便在此管理集群节点。", + "distributed": { + "title": "分布式 (NATS)" + }, + "swarm": { + "title": "Swarm (p2p)" + }, + "summary": { + "nodes": "分布式节点", + "inFlight": "进行中的请求", + "peers": "在线 Swarm 节点" + } } } diff --git a/core/http/react-ui/public/locales/zh-CN/nav.json b/core/http/react-ui/public/locales/zh-CN/nav.json index 84fff7c91942..9ff29d370748 100644 --- a/core/http/react-ui/public/locales/zh-CN/nav.json +++ b/core/http/react-ui/public/locales/zh-CN/nav.json @@ -40,6 +40,7 @@ "traces": "追踪", "nodes": "节点", "swarm": "Swarm", + "cluster": "集群", "system": "系统", "settings": "设置", "api": "API" diff --git a/core/http/react-ui/src/components/ClusterSection.jsx b/core/http/react-ui/src/components/ClusterSection.jsx new file mode 100644 index 000000000000..9ff7ce1d7232 --- /dev/null +++ b/core/http/react-ui/src/components/ClusterSection.jsx @@ -0,0 +1,27 @@ +import { useState } from 'react' + +// ClusterSection is a collapsible, titled container for one capability area of +// the Cluster page (Distributed / Swarm). Default expanded. +export default function ClusterSection({ icon, title, subtitle, defaultOpen = true, children }) { + const [open, setOpen] = useState(defaultOpen) + return ( +
+ + {open &&
{children}
} +
+ ) +} diff --git a/core/http/react-ui/src/components/ClusterSummary.jsx b/core/http/react-ui/src/components/ClusterSummary.jsx new file mode 100644 index 000000000000..0bd78b3cdee1 --- /dev/null +++ b/core/http/react-ui/src/components/ClusterSummary.jsx @@ -0,0 +1,44 @@ +import { useEffect, useState } from 'react' +import { useTranslation } from 'react-i18next' +import StatCard from './StatCard' +import { nodesApi, p2pApi } from '../utils/api' + +// ClusterSummary shows merged totals across both transports. Self-contained +// (own lightweight fetch) so the page composes without lifting state out of the +// two large section components. +export default function ClusterSummary({ distributedEnabled, p2pEnabled }) { + const { t } = useTranslation('admin') + const [nats, setNats] = useState({ nodes: 0, inFlight: 0 }) + const [swarm, setSwarm] = useState({ online: 0, total: 0 }) + + useEffect(() => { + let active = true + async function load() { + if (distributedEnabled) { + try { + const list = await nodesApi.list() + const nodes = Array.isArray(list) ? list : (list?.nodes ?? []) + if (active) setNats({ nodes: nodes.length, inFlight: nodes.reduce((a, n) => a + (n.in_flight_count || 0), 0) }) + } catch { /* leave zeros */ } + } + if (p2pEnabled) { + try { + const stats = await p2pApi.getStats() + const online = (stats?.federated?.online || 0) + (stats?.llama_cpp_workers?.online || 0) + (stats?.mlx_workers?.online || 0) + const total = (stats?.federated?.total || 0) + (stats?.llama_cpp_workers?.total || 0) + (stats?.mlx_workers?.total || 0) + if (active) setSwarm({ online, total }) + } catch { /* leave zeros */ } + } + } + load() + return () => { active = false } + }, [distributedEnabled, p2pEnabled]) + + return ( +
+ {distributedEnabled && } + {distributedEnabled && } + {p2pEnabled && } +
+ ) +} diff --git a/core/http/react-ui/src/components/NodeInstallPicker.jsx b/core/http/react-ui/src/components/NodeInstallPicker.jsx index 517d7156ce85..8264224683ef 100644 --- a/core/http/react-ui/src/components/NodeInstallPicker.jsx +++ b/core/http/react-ui/src/components/NodeInstallPicker.jsx @@ -479,7 +479,7 @@ export default function NodeInstallPicker({ Approve pending workers or register new ones. {pendingCount > 0 && ` (${pendingCount} awaiting approval.)`}

- + Manage nodes @@ -672,7 +672,7 @@ export default function NodeInstallPicker({ {pendingCount > 0 && (

- +{pendingCount} awaiting approval — approve from Nodes. + +{pendingCount} awaiting approval — approve from Nodes.

)} diff --git a/core/http/react-ui/src/components/Sidebar.jsx b/core/http/react-ui/src/components/Sidebar.jsx index c75ed377d1e0..453b7bada510 100644 --- a/core/http/react-ui/src/components/Sidebar.jsx +++ b/core/http/react-ui/src/components/Sidebar.jsx @@ -75,8 +75,7 @@ const sections = [ { path: '/app/middleware', icon: 'fas fa-shield-halved', labelKey: 'items.middleware', adminOnly: true }, { path: '/app/backends', icon: 'fas fa-server', labelKey: 'items.backends', adminOnly: true }, { path: '/app/traces', icon: 'fas fa-chart-line', labelKey: 'items.traces', adminOnly: true }, - { path: '/app/nodes', icon: 'fas fa-network-wired', labelKey: 'items.nodes', adminOnly: true, feature: 'distributed' }, - { path: '/app/p2p', icon: 'fas fa-circle-nodes', labelKey: 'items.swarm', adminOnly: true }, + { path: '/app/cluster', icon: 'fas fa-network-wired', labelKey: 'items.cluster', adminOnly: true }, { path: '/app/manage', icon: 'fas fa-desktop', labelKey: 'items.system', adminOnly: true }, { path: '/app/settings', icon: 'fas fa-cog', labelKey: 'items.settings', adminOnly: true }, ], diff --git a/core/http/react-ui/src/hooks/useP2PMode.js b/core/http/react-ui/src/hooks/useP2PMode.js new file mode 100644 index 000000000000..98676df9f0a2 --- /dev/null +++ b/core/http/react-ui/src/hooks/useP2PMode.js @@ -0,0 +1,31 @@ +import { useState, useEffect, useCallback } from 'react' +import { p2pApi } from '../utils/api' + +// useP2PMode reports whether p2p / swarm mode is available, mirroring +// useDistributedMode. Availability is "a network token exists" (the same signal +// the standalone P2P page used). One-shot probe on mount plus a manual refetch. +// +// Returns: +// enabled — true when a non-empty network token is present +// loading — true until the first probe completes +// refetch — manual trigger to re-run the probe +export function useP2PMode() { + const [enabled, setEnabled] = useState(false) + const [loading, setLoading] = useState(true) + + const probe = useCallback(async () => { + setLoading(true) + try { + const token = await p2pApi.getToken() + setEnabled(!!(token && String(token).trim())) + } catch { + setEnabled(false) + } finally { + setLoading(false) + } + }, []) + + useEffect(() => { probe() }, [probe]) + + return { enabled, loading, refetch: probe } +} diff --git a/core/http/react-ui/src/pages/BackendLogs.jsx b/core/http/react-ui/src/pages/BackendLogs.jsx index 3f5216dbc5e9..83c042137f2e 100644 --- a/core/http/react-ui/src/pages/BackendLogs.jsx +++ b/core/http/react-ui/src/pages/BackendLogs.jsx @@ -339,7 +339,7 @@ function DistributedBackendLogsResolver({ modelId, fromTimestamp }) {

Model not loaded on any worker

{modelId} isn't currently loaded on any node in the cluster. - Check the Nodes page to see which models are running where. + Check the Nodes page to see which models are running where.

diff --git a/core/http/react-ui/src/pages/Backends.jsx b/core/http/react-ui/src/pages/Backends.jsx index 53f1ef547c44..56d249b82fc4 100644 --- a/core/http/react-ui/src/pages/Backends.jsx +++ b/core/http/react-ui/src/pages/Backends.jsx @@ -49,7 +49,7 @@ export default function Backends() { // whenever splitMenuFor changes to a different row index. const splitMenuAnchorRef = useRef(null) - // Target-node mode: set when navigated from /app/nodes via "+ Add backend". + // Target-node mode: set when navigated from /app/cluster via "+ Add backend". // The gallery page header banners the scope; rows collapse their split-button // to a single Install-on-this-node action; manual install posts to the // per-node endpoint. @@ -323,7 +323,7 @@ export default function Backends() { return (
{/* Target-node banner: when this gallery is scoped to one node via - ?target= (entered from /app/nodes), show the scope clearly and + ?target= (entered from /app/cluster), show the scope clearly and give a fast way to clear it. Visually a primary-tinted strip so the user knows they're in a special mode without it feeling alarming. */} {targetNode && ( diff --git a/core/http/react-ui/src/pages/Cluster.jsx b/core/http/react-ui/src/pages/Cluster.jsx new file mode 100644 index 000000000000..3e87ddd36cdf --- /dev/null +++ b/core/http/react-ui/src/pages/Cluster.jsx @@ -0,0 +1,45 @@ +import { useTranslation } from 'react-i18next' +import { useDistributedMode } from '../hooks/useDistributedMode' +import { useP2PMode } from '../hooks/useP2PMode' +import ClusterSection from '../components/ClusterSection' +import ClusterSummary from '../components/ClusterSummary' +import Nodes from './Nodes' +import P2P from './P2P' + +export default function Cluster() { + const { t } = useTranslation('admin') + const distributed = useDistributedMode() + const p2p = useP2PMode() + + const loading = distributed.loading || p2p.loading + const nothingEnabled = !loading && !distributed.enabled && !p2p.enabled + + return ( +
+
+

{t('cluster.title', 'Cluster')}

+

{t('cluster.subtitle', 'Distributed and peer-to-peer nodes serving this instance')}

+
+ + {!loading && } + + {distributed.enabled && ( + + + + )} + + {p2p.enabled && ( + + + + )} + + {nothingEnabled && ( +
+ {t('cluster.empty', 'No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.')} +
+ )} +
+ ) +} diff --git a/core/http/react-ui/src/pages/NodeBackendLogs.jsx b/core/http/react-ui/src/pages/NodeBackendLogs.jsx index 58e7982339b5..4e09393a978f 100644 --- a/core/http/react-ui/src/pages/NodeBackendLogs.jsx +++ b/core/http/react-ui/src/pages/NodeBackendLogs.jsx @@ -162,7 +162,7 @@ export default function NodeBackendLogs() {

No node/model selected

View backend logs from the{' '} - Nodes page. + Nodes page.

@@ -220,7 +220,7 @@ export default function NodeBackendLogs() {

Backend logs from node {nodeName || nodeId} - {' '}(back to nodes) + {' '}(back to nodes)

diff --git a/core/http/react-ui/src/pages/Nodes.jsx b/core/http/react-ui/src/pages/Nodes.jsx index f2eb9d955770..7ddcf94e83c3 100644 --- a/core/http/react-ui/src/pages/Nodes.jsx +++ b/core/http/react-ui/src/pages/Nodes.jsx @@ -689,7 +689,7 @@ function SchedulingForm({ onSave, onCancel }) { ) } -export default function Nodes() { +export default function Nodes({ embedded = false }) { const { addToast } = useOutletContext() const navigate = useNavigate() const { t } = useTranslation('admin') @@ -983,16 +983,18 @@ export default function Nodes() { const pending = filteredNodes.filter(n => n.status === 'pending').length return ( -
-
-

- - {t('nodes.title')} -

-

- {t('nodes.subtitle')} -

-
+
+ {!embedded && ( +
+

+ + {t('nodes.title')} +

+

+ {t('nodes.subtitle')} +

+
+ )} {/* Tabs */}
diff --git a/core/http/react-ui/src/pages/P2P.jsx b/core/http/react-ui/src/pages/P2P.jsx index 7e52d1c77d5e..02641499bbf5 100644 --- a/core/http/react-ui/src/pages/P2P.jsx +++ b/core/http/react-ui/src/pages/P2P.jsx @@ -102,7 +102,7 @@ function StepNumber({ n, bg, color }) { ) } -export default function P2P() { +export default function P2P({ embedded = false }) { const { addToast } = useOutletContext() const { t } = useTranslation('admin') const [workers, setWorkers] = useState([]) @@ -172,7 +172,7 @@ export default function P2P() { if (loading) { return ( -
+
) @@ -181,7 +181,7 @@ export default function P2P() { // ── P2P Disabled ── if (!enabled) { return ( -
+

@@ -294,21 +294,23 @@ export default function P2P() { const mlxTotal = stats.mlx_workers?.total ?? 0 return ( -
-
-

- - {t('p2p.title')} -

-

- {t('p2p.subtitle')} - {' '} - - - -

-
+
+ {!embedded && ( +
+

+ + {t('p2p.title')} +

+

+ {t('p2p.subtitle')} + {' '} + + + +

+
+ )} {/* Network Token */}
import('./pages/Talk')) const Backends = page('backends', () => import('./pages/Backends')) const Settings = page('settings', () => import('./pages/Settings')) const Traces = page('traces', () => import('./pages/Traces')) -const P2P = page('p2p', () => import('./pages/P2P')) +const Cluster = page('cluster', () => import('./pages/Cluster')) const Agents = page('agents', () => import('./pages/Agents')) const AgentCreate = page(null, () => import('./pages/AgentCreate')) const AgentChat = page(null, () => import('./pages/AgentChat')) @@ -68,7 +68,6 @@ const Quantize = page('quantize', () => import('./pages/Quantize')) const Studio = page('studio', () => import('./pages/Studio')) const FaceRecognition = page('face', () => import('./pages/FaceRecognition')) const VoiceRecognition = page('voice', () => import('./pages/VoiceRecognition')) -const Nodes = page('nodes', () => import('./pages/Nodes')) const NodeBackendLogs = page(null, () => import('./pages/NodeBackendLogs')) const NotFound = page(null, () => import('./pages/NotFound')) const Usage = page('usage', () => import('./pages/Usage')) @@ -120,8 +119,9 @@ const appChildren = [ { path: 'settings', element: }, { path: 'traces', element: }, { path: 'backend-logs/:modelId', element: }, - { path: 'p2p', element: }, - { path: 'nodes', element: }, + { path: 'cluster', element: }, + { path: 'p2p', element: }, + { path: 'nodes', element: }, { path: 'node-backend-logs/:nodeId/:modelId', element: }, { path: 'agents', element: }, { path: 'agents/new', element: }, diff --git a/core/p2p/affinity_sync.go b/core/p2p/affinity_sync.go new file mode 100644 index 000000000000..6cef494e96e1 --- /dev/null +++ b/core/p2p/affinity_sync.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/edgevpn/pkg/blockchain" + "github.com/mudler/edgevpn/pkg/hub" + "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/xlog" +) + +// affinitySubjectKey is the hub.Message annotation carrying the logical subject +// (observe vs invalidate) so the receiver can dispatch the way a NATS subject +// would. The generic channel has no subject routing, so we carry it ourselves. +const affinitySubjectKey = "subject" + +// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to +// the prefixcache publisher interface (Publish(subject, v)). It lets a +// federation server reuse prefixcache.Sync for cross-server affinity coherence +// without NATS: each event is JSON-encoded into a hub.Message and broadcast over +// the generic channel (not the slow blockchain ledger). +type genericChannelPublisher struct { + node *node.Node +} + +// Publish satisfies prefixcache's (unexported) publisher interface structurally. +func (p *genericChannelPublisher) Publish(subject string, v any) error { + payload, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("marshalling affinity event: %w", err) + } + return p.node.PublishMessage(&hub.Message{ + Message: string(payload), + Annotations: map[string]interface{}{affinitySubjectKey: subject}, + }) +} + +// applyAffinityMessage decodes a generic-channel affinity message and applies it +// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the +// local clock so TTL is measured per server. Unknown subjects, malformed +// payloads, and nil inputs are ignored (debug-logged), never fatal. +func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) { + if sync == nil || m == nil { + return + } + subject, _ := m.Annotations[affinitySubjectKey].(string) + switch subject { + case messaging.SubjectPrefixCacheObserve: + var ev messaging.PrefixCacheObserveEvent + if err := json.Unmarshal([]byte(m.Message), &ev); err != nil { + xlog.Debug("affinity: bad observe payload", "error", err) + return + } + sync.ApplyObserve(ev, now) + case messaging.SubjectPrefixCacheInvalidate: + var ev messaging.PrefixCacheInvalidateEvent + if err := json.Unmarshal([]byte(m.Message), &ev); err != nil { + xlog.Debug("affinity: bad invalidate payload", "error", err) + return + } + sync.ApplyInvalidate(ev) + default: + // Other generic-channel traffic; not ours. + } +} + +// affinityHandler returns the edgevpn generic-channel handler that applies remote +// affinity events to this server's index. It is registered at node construction +// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is +// safe because messages only arrive after Start, by which point Start has wired +// fs.prefixSync. +func (fs *FederatedServer) affinityHandler() node.Handler { + return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error { + applyAffinityMessage(fs.prefixSync, m, time.Now()) + return nil + } +} diff --git a/core/p2p/affinity_sync_test.go b/core/p2p/affinity_sync_test.go new file mode 100644 index 000000000000..066f7f335958 --- /dev/null +++ b/core/p2p/affinity_sync_test.go @@ -0,0 +1,48 @@ +package p2p + +import ( + "encoding/json" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/edgevpn/pkg/hub" +) + +var _ = Describe("applyAffinityMessage", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message { + payload, _ := json.Marshal(ev) + return &hub.Message{ + Message: string(payload), + Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}, + } + } + + It("applies a peer observe so the local index resolves the warm peer", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + sync := prefixcache.NewSync(idx, nil) + chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg) + + applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref) + + d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref) + Expect(d.HasHot).To(BeTrue()) + Expect(d.Hot.NodeID).To(Equal("warm")) + }) + + It("ignores malformed, unknown-subject, and nil inputs without panicking", func() { + sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil) + applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref) + applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref) + applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref) + applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref) + applyAffinityMessage(sync, nil, ref) + Expect(true).To(BeTrue()) + }) +}) diff --git a/core/p2p/federated.go b/core/p2p/federated.go index 7ed8cbab6871..3a5853935925 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -1,10 +1,17 @@ package p2p import ( + "context" + "encoding/json" "fmt" "math/rand/v2" + "strings" "sync" + "time" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/LocalAI/pkg/clusterrouting" "github.com/mudler/xlog" ) @@ -23,16 +30,29 @@ type FederatedServer struct { requestTable map[string]int loadBalanced bool workerTarget string + bodyLimit int64 // max request body bytes (0 = unlimited) + prefixCfg prefixcache.Config + prefixIndex *prefixcache.Index + prefixSync *prefixcache.Sync + prefixProvider prefixcache.Provider // Index (sync off) or Sync (sync on) + syncAffinity bool } -func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer { +func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64, syncAffinity bool) *FederatedServer { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) return &FederatedServer{ - listenAddr: listenAddr, - service: service, - p2ptoken: p2pToken, - requestTable: map[string]int{}, - loadBalanced: loadBalanced, - workerTarget: workerTarget, + listenAddr: listenAddr, + service: service, + p2ptoken: p2pToken, + requestTable: map[string]int{}, + loadBalanced: loadBalanced, + workerTarget: workerTarget, + bodyLimit: bodyLimit, + prefixCfg: cfg, + prefixIndex: idx, + prefixProvider: idx, + syncAffinity: syncAffinity, } } @@ -74,28 +94,141 @@ func (fs *FederatedServer) syncTableStatus() { } } -func (fs *FederatedServer) SelectLeastUsedServer() string { - fs.syncTableStatus() +// buildFederatedCandidates maps the currently-online federated peers into the +// shared routing policy's candidate form, optionally filtered to peers that can +// serve model. A peer with a non-empty advertised model set that lacks model is +// excluded; a peer with an empty set is treated as "unknown" and stays eligible +// (so older peers and mid-convergence peers are not starved). When model is "", +// no model filtering is applied. InFlight comes from the per-peer request +// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero. +func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate { + candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes)) + for _, nd := range nodes { + if !nd.IsOnlineAt(now) { + continue + } + if !servesModel(nd, model) { + continue + } + candidates = append(candidates, clusterrouting.ReplicaCandidate{ + NodeID: nd.ID, + InFlight: requestTable[nd.ID], + AvailableVRAM: nd.AvailableVRAM, + }) + } + return candidates +} + +// servesModel reports whether nd is eligible to serve model. An empty model +// means "no filter". An empty advertised set means "unknown" and is eligible. +func servesModel(nd schema.NodeData, model string) bool { + if model == "" || len(nd.Models) == 0 { + return true + } + for _, m := range nd.Models { + if m == model { + return true + } + } + return false +} + +// extractModel best-effort resolves the target model of a buffered request, +// cheapest source first: an explicit query value, then the JSON body "model" +// field. Returns "" when it cannot be determined (for example a multipart or +// websocket request), in which case the caller routes by load/affinity only. +func extractModel(queryModel string, body []byte) string { + if strings.TrimSpace(queryModel) != "" { + return queryModel + } + if len(body) == 0 { + return "" + } + var probe struct { + Model string `json:"model"` + } + if err := json.Unmarshal(body, &probe); err != nil { + return "" + } + return probe.Model +} +// affinityPreferred returns the peer the prefix index considers warm for this +// chain, or "" when there is no match strong enough among the candidates. It +// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick +// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved. +func affinityPreferred(idx prefixcache.Provider, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string { + if idx == nil || len(chain) == 0 || len(candidates) == 0 { + return "" + } + keys := make([]prefixcache.ReplicaKey, 0, len(candidates)) + for _, c := range candidates { + keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID}) + } + d := idx.Decide(model, chain, keys, now) + if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch { + return d.Hot.NodeID + } + return "" +} + +// selectPeer chooses the federated peer to serve a request for model with the +// given raw body. It filters candidates by model, computes the prefix chain, +// consults the affinity index, and makes the final load+VRAM-aware pick. It +// returns the chosen peer ID and the chain (so the caller can Observe after a +// successful forward). An empty model and nil body degrade to load+VRAM only. +// Returns "" when no eligible peer is online. +func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) { + fs.syncTableStatus() + nodes := GetAvailableNodes(fs.service) + // Snapshot candidates under the lock (it only guards requestTable), then + // release before the prefix hashing and tree walk, which are lock-free + // (candidates is a copy; prefixIndex/prefixCfg are set once at construction). fs.Lock() - defer fs.Unlock() + candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model) + fs.Unlock() + if len(candidates) == 0 { + return "", nil + } + var chain []uint64 + preferred := "" + if fs.prefixProvider != nil && model != "" && len(body) > 0 { + chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg) + preferred = affinityPreferred(fs.prefixProvider, model, chain, candidates, fs.prefixCfg, now) + } + best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold) + if best == nil { + return "", chain + } + return best.NodeID, chain +} - xlog.Debug("SelectLeastUsedServer()", "request_table", fs.requestTable) - - // cycle over requestTable and find the entry with the lower number - // if there are multiple entries with the same number, select one randomly - // if there are no entries, return an empty string - var min int - var minKey string - for k, v := range fs.requestTable { - if min == 0 || v < min { - min = v - minKey = k - } +// observeServed records that peerID served the given chain for model, so the +// next request sharing that prefix is routed back to the same warm peer. +func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) { + if fs.prefixProvider == nil || len(chain) == 0 || peerID == "" || model == "" { + return } - xlog.Debug("Selected tunnel", "tunnel", minKey, "requests_served", min, "request_table", fs.requestTable) + fs.prefixProvider.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now) +} - return minKey +// evictLoop periodically sweeps expired affinity entries so the in-memory tree +// does not grow unbounded. Runs for the lifetime of the proxy. +func (fs *FederatedServer) evictLoop(ctx context.Context) { + interval := fs.prefixCfg.TTL / 2 + if interval <= 0 { + interval = time.Minute + } + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-t.C: + fs.prefixProvider.Evict(now) + } + } } func (fs *FederatedServer) RecordRequest(nodeID string) { diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index 36d7eb45dac2..46ba5cf15688 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -1,22 +1,81 @@ package p2p import ( + "bufio" + "bytes" "context" "errors" "fmt" "io" "net" + "net/http" + "strings" + "time" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" "github.com/mudler/edgevpn/pkg/node" "github.com/mudler/xlog" ) +// ErrBodyTooLarge is returned by readRequest when the buffered request body +// exceeds the configured limit. The proxy turns it into a 413 response. +var ErrBodyTooLarge = errors.New("request body exceeds limit") + +// readRequest parses a single HTTP request from r and buffers its body (so the +// body can both be inspected for the model/prefix and replayed to the chosen +// peer). limit caps the buffered body in bytes; 0 means unlimited. A body over +// the cap returns ErrBodyTooLarge. The returned request has its body replaced +// with the buffered bytes and RequestURI cleared so it can be re-serialized +// with req.Write to the peer stream. +func readRequest(r *bufio.Reader, limit int64) (*http.Request, []byte, error) { + req, err := http.ReadRequest(r) + if err != nil { + return nil, nil, err + } + var body []byte + if req.Body != nil { + reader := io.Reader(req.Body) + if limit > 0 { + reader = io.LimitReader(req.Body, limit+1) + } + body, err = io.ReadAll(reader) + _ = req.Body.Close() + if err != nil { + return nil, nil, err + } + if limit > 0 && int64(len(body)) > limit { + return nil, nil, ErrBodyTooLarge + } + } + req.Body = io.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) + req.RequestURI = "" + return req, body, nil +} + +// isWebsocketUpgrade reports whether req is a websocket handshake, which must be +// forwarded as a raw bidirectional duplex (not request/streamed-response) and +// is not body-capped or model-routed. +func isWebsocketUpgrade(req *http.Request) bool { + return strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && + strings.EqualFold(req.Header.Get("Upgrade"), "websocket") +} + func (f *FederatedServer) Start(ctx context.Context) error { - n, err := NewNode(f.p2ptoken) + var extraOpts []node.Option + if f.syncAffinity { + extraOpts = append(extraOpts, node.EnableGenericHub, node.GenericChannelHandlers(f.affinityHandler())) + } + n, err := NewNode(f.p2ptoken, extraOpts...) if err != nil { return fmt.Errorf("creating a new node: %w", err) } + if f.syncAffinity { + f.prefixSync = prefixcache.NewSync(f.prefixIndex, &genericChannelPublisher{node: n}) + f.prefixProvider = f.prefixSync + xlog.Info("Federation affinity sync enabled (generic channel)") + } err = n.Start(ctx) if err != nil { return fmt.Errorf("creating a new node: %w", err) @@ -28,6 +87,8 @@ func (f *FederatedServer) Start(ctx context.Context) error { return err } + go f.evictLoop(ctx) + return f.proxy(ctx, n) } @@ -62,40 +123,60 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { continue } - // Handle connections in a new goroutine, forwarding to the p2p service + // Handle connections in a new goroutine, terminating HTTP and + // forwarding the request to the chosen p2p peer. go func() { - workerID := "" - if fs.workerTarget != "" { - workerID = fs.workerTarget - } else if fs.loadBalanced { - xlog.Debug("Load balancing request") - - workerID = fs.SelectLeastUsedServer() - if workerID == "" { - xlog.Debug("Least used server not found, selecting random") - workerID = fs.RandomServer() + br := bufio.NewReader(conn) + req, body, err := readRequest(br, fs.bodyLimit) + if err != nil { + if err == ErrBodyTooLarge { + fs.sendHTMLResponse(conn, 413, "Request body too large") + return } - } else { + xlog.Error("Failed to read request", "error", err) + _ = conn.Close() + return + } + + upgrade := isWebsocketUpgrade(req) + + now := time.Now() + var ( + workerID string + model string + chain []uint64 + ) + switch { + case fs.workerTarget != "": + workerID = fs.workerTarget + case !fs.loadBalanced: + // Explicit random mode (the RandomWorker flag): keep the + // historical random pick, no model/affinity routing. workerID = fs.RandomServer() + case upgrade: + // Websocket: no readable model; route by load only. + workerID, _ = fs.selectPeer("", nil, now) + default: + model = extractModel(req.URL.Query().Get("model"), body) + workerID, chain = fs.selectPeer(model, body, now) } if workerID == "" { - xlog.Error("No available nodes yet") - fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") + fs.sendHTMLResponse(conn, 503, "No federated peer available for this request") return } - xlog.Debug("Selected node", "node", workerID) nodeData, exists := GetNode(fs.service, workerID) if !exists { - xlog.Error("Node not found", "node", workerID) fs.sendHTMLResponse(conn, 404, "Node not found") return } - proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) - if fs.loadBalanced { - fs.RecordRequest(workerID) + proxyHTTPToPeer(ctx, node, nodeData.ServiceID, conn, req, upgrade) + + fs.RecordRequest(workerID) + if !upgrade { + fs.observeServed(model, chain, workerID, now) } }() } @@ -132,6 +213,8 @@ func getHTTPStatusText(statusCode int) string { switch statusCode { case 503: return "Service Unavailable" + case 413: + return "Request Entity Too Large" case 404: return "Not Found" case 200: diff --git a/core/p2p/federated_test.go b/core/p2p/federated_test.go new file mode 100644 index 000000000000..407058d9cdb5 --- /dev/null +++ b/core/p2p/federated_test.go @@ -0,0 +1,196 @@ +package p2p + +import ( + "bufio" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/LocalAI/pkg/clusterrouting" +) + +var _ = Describe("buildFederatedCandidates", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + onlineSeen := ref.Add(-10 * time.Second) + offlineSeen := ref.Add(-2 * time.Minute) + + It("excludes offline nodes", func() { + nodes := []schema.NodeData{ + {ID: "online", LastSeen: onlineSeen}, + {ID: "offline", LastSeen: offlineSeen}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") + Expect(cands).To(HaveLen(1)) + Expect(cands[0].NodeID).To(Equal("online")) + }) + + It("maps the request counter to InFlight and defaults missing entries to zero", func() { + nodes := []schema.NodeData{ + {ID: "a", LastSeen: onlineSeen}, + {ID: "b", LastSeen: onlineSeen}, + } + cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "") + byID := map[string]int{} + for _, c := range cands { + byID[c.NodeID] = c.InFlight + } + Expect(byID["a"]).To(Equal(4)) + Expect(byID["b"]).To(Equal(0)) + }) + + It("carries gossiped AvailableVRAM into the candidate", func() { + nodes := []schema.NodeData{ + {ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") + Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000))) + }) + + It("produces candidates the shared policy ranks by least in-flight then most VRAM", func() { + // busy-big has the most VRAM but is busy, so it must lose. Among the two + // idle peers, the one with more free VRAM wins (VRAM breaks the tie). + nodes := []schema.NodeData{ + {ID: "busy-big", LastSeen: onlineSeen, AvailableVRAM: 80_000_000_000}, + {ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000}, + {ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000}, + } + cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "") + best := clusterrouting.PickBestReplica(cands) + Expect(best).ToNot(BeNil()) + Expect(best.NodeID).To(Equal("idle-big")) + }) +}) + +var _ = Describe("model-aware candidate building", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + seen := ref.Add(-10 * time.Second) + + It("keeps peers that advertise the requested model", func() { + nodes := []schema.NodeData{ + {ID: "has", LastSeen: seen, Models: []string{"m1", "m2"}}, + {ID: "hasnot", LastSeen: seen, Models: []string{"other"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1") + Expect(cands).To(HaveLen(1)) + Expect(cands[0].NodeID).To(Equal("has")) + }) + + It("keeps peers with an empty (unknown) model set eligible for any model", func() { + nodes := []schema.NodeData{ + {ID: "unknown", LastSeen: seen, Models: nil}, + {ID: "hasnot", LastSeen: seen, Models: []string{"other"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1") + Expect(cands).To(HaveLen(1)) + Expect(cands[0].NodeID).To(Equal("unknown")) + }) + + It("does not filter when the requested model is empty", func() { + nodes := []schema.NodeData{ + {ID: "a", LastSeen: seen, Models: []string{"x"}}, + {ID: "b", LastSeen: seen, Models: []string{"y"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") + Expect(cands).To(HaveLen(2)) + }) +}) + +var _ = Describe("extractModel", func() { + It("reads the JSON body model field", func() { + body := []byte(`{"model":"llama-3","messages":[]}`) + Expect(extractModel("", body)).To(Equal("llama-3")) + }) + + It("prefers a path/query model over the body", func() { + body := []byte(`{"model":"frombody"}`) + Expect(extractModel("fromquery", body)).To(Equal("fromquery")) + }) + + It("returns empty when no model is present", func() { + Expect(extractModel("", []byte(`{"messages":[]}`))).To(Equal("")) + }) + + It("returns empty on non-JSON / unparseable body without panicking", func() { + Expect(extractModel("", []byte("--multipart-boundary--"))).To(Equal("")) + }) +}) + +var _ = Describe("affinityPreferred", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + It("returns the warm peer once a chain has been observed for it", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg) + idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref) + + cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}} + Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm")) + }) + + It("returns empty when no chain has been observed", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg) + cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}} + Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("")) + }) + + It("returns empty for a nil index or empty chain", func() { + cfg := prefixcache.DefaultConfig() + Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal("")) + idx := prefixcache.NewIndex(cfg) + Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal("")) + }) +}) + +var _ = Describe("L7 request handling", func() { + It("reads a buffered request and its body under the cap", func() { + raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 28\r\n\r\n" + + `{"model":"m1","messages":[]}` + req, body, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024) + Expect(err).ToNot(HaveOccurred()) + Expect(req.URL.Path).To(Equal("/v1/chat/completions")) + Expect(string(body)).To(ContainSubstring(`"model":"m1"`)) + }) + + It("rejects a body over the cap with ErrBodyTooLarge", func() { + big := strings.Repeat("a", 200) + raw := "POST /x HTTP/1.1\r\nHost: x\r\nContent-Length: 200\r\n\r\n" + big + _, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 64) + Expect(err).To(MatchError(ErrBodyTooLarge)) + }) + + It("detects a websocket upgrade request", func() { + raw := "GET /v1/realtime HTTP/1.1\r\nHost: x\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n" + req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024) + Expect(err).ToNot(HaveOccurred()) + Expect(isWebsocketUpgrade(req)).To(BeTrue()) + }) + + It("does not flag a normal POST as a websocket upgrade", func() { + raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 2\r\n\r\n{}" + req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024) + Expect(err).ToNot(HaveOccurred()) + Expect(isWebsocketUpgrade(req)).To(BeFalse()) + }) +}) + +var _ = Describe("affinityPreferred with a sync provider", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + It("returns the warm peer when the provider is a Sync wrapping the index", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + sync := prefixcache.NewSync(idx, nil) + chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"a long shared system prompt for affinity"}]}`, cfg) + sync.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref) + + cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}} + Expect(affinityPreferred(sync, "m1", chain, cands, cfg, ref)).To(Equal("warm")) + }) +}) diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index d03a9c50c200..9e42b30e2ea0 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -6,15 +6,18 @@ import ( "fmt" "io" "net" + "net/http" "os" "strings" "sync" "time" "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/utils" + "github.com/mudler/LocalAI/pkg/xsysinfo" "github.com/mudler/edgevpn/pkg/config" "github.com/mudler/edgevpn/pkg/node" "github.com/mudler/edgevpn/pkg/protocol" @@ -86,37 +89,39 @@ func nodeAnnounce(ctx context.Context, node *node.Node) { ) } -func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) { - ledger, _ := node.Ledger() +// openPeerStream resolves serviceID to its advertised peer in the services +// ledger and opens a libp2p stream to that peer over the service protocol. +// Returns the stream or an error describing which lookup step failed. +func openPeerStream(ctx context.Context, n *node.Node, serviceID string) (network.Stream, error) { + ledger, _ := n.Ledger() // Retrieve current ID for ip in the blockchain existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID) service := &types.Service{} existingValue.Unmarshal(service) // If mismatch, update the blockchain if !found { - zlog.Error("Service not found on blockchain") - conn.Close() - // ll.Debugf("service '%s' not found on blockchain", serviceID) - return + return nil, errors.New("service not found on blockchain") } // Decode the Peer d, err := peer.Decode(service.PeerID) if err != nil { - zlog.Error("cannot decode peer") - - conn.Close() - // ll.Debugf("could not decode peer '%s'", service.PeerID) - return + return nil, fmt.Errorf("cannot decode peer: %w", err) } // Open a stream - stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID()) + stream, err := n.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID()) if err != nil { - zlog.Error("cannot open stream peer", "error", err) + return nil, fmt.Errorf("cannot open stream peer: %w", err) + } + return stream, nil +} +func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) { + stream, err := openPeerStream(ctx, node, serviceID) + if err != nil { + zlog.Error("Could not open peer stream", "error", err) conn.Close() - // ll.Debugf("could not open stream '%s'", err.Error()) return } // ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String()) @@ -130,6 +135,45 @@ func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn.Close() } +// proxyHTTPToPeer forwards an already-parsed HTTP request to the chosen peer +// over a libp2p stream and streams the response back to conn. When duplex is +// true (a websocket upgrade) it runs a bidirectional copy after writing the +// request, so post-101 frames flow both ways. The response is never buffered, +// so SSE keeps flowing. +func proxyHTTPToPeer(ctx context.Context, n *node.Node, serviceID string, conn net.Conn, req *http.Request, duplex bool) { + stream, err := openPeerStream(ctx, n, serviceID) + if err != nil { + zlog.Error("Could not open peer stream", "error", err) + _ = conn.Close() + return + } + // Force the peer to close after responding so the one-way io.Copy below + // terminates. Without this the peer keeps the HTTP/1.1 connection alive and + // io.Copy(conn, stream) blocks forever, leaking the goroutine, conn, and + // stream. Websocket upgrades keep keep-alive: their duplex copy owns the + // lifetime. + req.Header.Del("Connection") + req.Close = !duplex + if err := req.Write(stream); err != nil { + zlog.Error("Could not write request to peer", "error", err) + _ = stream.Close() + _ = conn.Close() + return + } + if duplex { + closer := make(chan struct{}, 2) + go copyStream(closer, stream, conn) + go copyStream(closer, conn, stream) + <-closer + _ = stream.Close() + _ = conn.Close() + return + } + _, _ = io.Copy(conn, stream) + _ = stream.Close() + _ = conn.Close() +} + func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error { zlog.Info("Allocating service", "service", service, "address", listenAddr) // Open local port for listening @@ -311,7 +355,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv } // This is the P2P worker main -func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) { +func ExposeService(ctx context.Context, host, port, token, servicesID string, modelsFn func() []string) (*node.Node, error) { if servicesID == "" { servicesID = defaultServicesID } @@ -347,10 +391,16 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (* 20*time.Second, func() { updatedMap := map[string]any{} + var models []string + if modelsFn != nil { + models = modelsFn() + } updatedMap[name] = &schema.NodeData{ - Name: name, - LastSeen: time.Now(), - ID: nodeID(name), + Name: name, + LastSeen: time.Now(), + ID: nodeID(name), + AvailableVRAM: xsysinfo.GetGPUAggregateInfo().FreeVRAM, + Models: models, } ledger.Add(servicesID, updatedMap) }, @@ -359,11 +409,12 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (* return n, err } -func NewNode(token string) (*node.Node, error) { +func NewNode(token string, extraOpts ...node.Option) (*node.Node, error) { nodeOpts, err := newNodeOpts(token) if err != nil { return nil, err } + nodeOpts = append(nodeOpts, extraOpts...) n, err := node.New(nodeOpts...) if err != nil { diff --git a/core/p2p/p2p_suite_test.go b/core/p2p/p2p_suite_test.go new file mode 100644 index 000000000000..f24f9bbc26f6 --- /dev/null +++ b/core/p2p/p2p_suite_test.go @@ -0,0 +1,13 @@ +package p2p + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestP2P(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "P2P Suite") +} diff --git a/core/schema/localai.go b/core/schema/localai.go index 8704f8ad84ae..74f559a04bab 100644 --- a/core/schema/localai.go +++ b/core/schema/localai.go @@ -121,18 +121,38 @@ type StoresFindResponse struct { Similarities []float32 `json:"similarities" yaml:"similarities"` } +// NodeOnlineWindow is how long after its last announce a node still counts as +// online. Nodes re-announce into the edgevpn ledger every 20s (see core/p2p +// ExposeService), so 40s tolerates a single missed announce. +const NodeOnlineWindow = 40 * time.Second + type NodeData struct { Name string ID string TunnelAddress string ServiceID string LastSeen time.Time + // AvailableVRAM is the node's free GPU VRAM in bytes at its last announce, + // gossiped so federation selection can prefer peers with more headroom. + // Zero for CPU-only nodes and for peers on an older version that does not + // publish it; the routing policy treats zero as the lowest VRAM tier. + AvailableVRAM uint64 + // Models is the set of model names this peer currently serves, gossiped so + // the federation proxy can route a request only to peers that have the + // requested model. Empty means "unknown" (an older peer, or one that has + // not loaded any model yet) and is treated as eligible for any model so a + // mixed-version swarm is not starved. + Models []string } func (d NodeData) IsOnline() bool { - now := time.Now() - // if the node was seen in the last 40 seconds, it's online - return now.Sub(d.LastSeen) < 40*time.Second + return d.IsOnlineAt(time.Now()) +} + +// IsOnlineAt reports whether the node counts as online relative to now. It is +// split from IsOnline so selection logic can be exercised with a fixed clock. +func (d NodeData) IsOnlineAt(now time.Time) bool { + return now.Sub(d.LastSeen) < NodeOnlineWindow } type P2PNodesResponse struct { diff --git a/core/schema/nodedata_test.go b/core/schema/nodedata_test.go new file mode 100644 index 000000000000..3c38fc2afccf --- /dev/null +++ b/core/schema/nodedata_test.go @@ -0,0 +1,39 @@ +package schema_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/schema" +) + +var _ = Describe("NodeData", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + It("is online within the online window", func() { + nd := schema.NodeData{LastSeen: ref.Add(-30 * time.Second)} + Expect(nd.IsOnlineAt(ref)).To(BeTrue()) + }) + + It("is offline once the online window has elapsed", func() { + nd := schema.NodeData{LastSeen: ref.Add(-50 * time.Second)} + Expect(nd.IsOnlineAt(ref)).To(BeFalse()) + }) + + It("treats exactly the window boundary as offline (strict less-than)", func() { + nd := schema.NodeData{LastSeen: ref.Add(-schema.NodeOnlineWindow)} + Expect(nd.IsOnlineAt(ref)).To(BeFalse()) + }) + + It("carries AvailableVRAM in bytes", func() { + nd := schema.NodeData{AvailableVRAM: 8_000_000_000} + Expect(nd.AvailableVRAM).To(Equal(uint64(8_000_000_000))) + }) + + It("carries the advertised model set", func() { + nd := schema.NodeData{Models: []string{"llama-3", "qwen"}} + Expect(nd.Models).To(ConsistOf("llama-3", "qwen")) + }) +}) diff --git a/pkg/clusterrouting/affinity.go b/pkg/clusterrouting/affinity.go new file mode 100644 index 000000000000..4067002cad11 --- /dev/null +++ b/pkg/clusterrouting/affinity.go @@ -0,0 +1,33 @@ +package clusterrouting + +// PickWithAffinity prefers the candidate whose NodeID equals preferredNodeID +// when that candidate's in-flight count is within slack of the least-loaded +// candidate; otherwise it falls back to PickBestReplica (least in-flight, then +// oldest last-used, then most free VRAM). This keeps a warm prefix-cache peer +// sticky without letting it become a hot-spot: once it is more than slack +// requests busier than the least-loaded peer, load wins. With an empty +// preferredNodeID, or a preferred node not in the set, it is exactly +// PickBestReplica. slack mirrors prefixcache's BalanceAbsThreshold. +func PickWithAffinity(candidates []ReplicaCandidate, preferredNodeID string, slack int) *ReplicaCandidate { + if len(candidates) == 0 { + return nil + } + if preferredNodeID == "" { + return PickBestReplica(candidates) + } + var preferred *ReplicaCandidate + minInFlight := candidates[0].InFlight + for i := range candidates { + c := &candidates[i] + if c.InFlight < minInFlight { + minInFlight = c.InFlight + } + if c.NodeID == preferredNodeID { + preferred = c + } + } + if preferred != nil && preferred.InFlight <= minInFlight+slack { + return preferred + } + return PickBestReplica(candidates) +} diff --git a/pkg/clusterrouting/affinity_test.go b/pkg/clusterrouting/affinity_test.go new file mode 100644 index 000000000000..50ed7f3a5e43 --- /dev/null +++ b/pkg/clusterrouting/affinity_test.go @@ -0,0 +1,48 @@ +package clusterrouting + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("PickWithAffinity", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + It("returns nil for an empty candidate list", func() { + Expect(PickWithAffinity(nil, "x", 2)).To(BeNil()) + }) + + It("falls back to PickBestReplica when no preferred node is given", func() { + cs := []ReplicaCandidate{ + {NodeID: "busy", InFlight: 3, LastUsed: ref, AvailableVRAM: 80}, + {NodeID: "idle", InFlight: 0, LastUsed: ref, AvailableVRAM: 8}, + } + Expect(PickWithAffinity(cs, "", 2).NodeID).To(Equal("idle")) + }) + + It("honors the preferred node when it is within the in-flight slack of the least-loaded", func() { + cs := []ReplicaCandidate{ + {NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80}, + {NodeID: "warm", InFlight: 2, LastUsed: ref, AvailableVRAM: 8}, + } + Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("warm")) + }) + + It("ignores the preferred node when it is beyond the slack and falls back to load+VRAM", func() { + cs := []ReplicaCandidate{ + {NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80}, + {NodeID: "warm", InFlight: 5, LastUsed: ref, AvailableVRAM: 8}, + } + Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("cold")) + }) + + It("falls back to load+VRAM when the preferred node is not among the candidates", func() { + cs := []ReplicaCandidate{ + {NodeID: "a", InFlight: 1, LastUsed: ref, AvailableVRAM: 8}, + {NodeID: "b", InFlight: 1, LastUsed: ref, AvailableVRAM: 24}, + } + Expect(PickWithAffinity(cs, "ghost", 2).NodeID).To(Equal("b")) + }) +})