Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ea8c082
feature ok / needs to be tested
IdirLISN Mar 31, 2026
3052ac6
feature deactivated by default + hiden behind a button by default + v…
IdirLISN Apr 2, 2026
115f74c
worker monitoring button behind user menu
IdirLISN Apr 2, 2026
15ef892
files blacked for fixing the formatting issues
IdirLISN Apr 2, 2026
bb80f24
fixing synthax and format
IdirLISN Apr 2, 2026
84b8922
rebase on dev
IdirLISN May 27, 2026
bda93c1
clean feature
IdirLISN May 27, 2026
923c16c
git rebase continue
IdirLISN May 27, 2026
f92891f
feature in progress
IdirLISN Apr 7, 2026
9be3ec9
git rebase continue
IdirLISN May 27, 2026
3231a2a
git rebase continue
IdirLISN May 27, 2026
8fae6a5
compute worker monitoring on private queues (amazing stuff)
IdirLISN May 7, 2026
4b870fc
test number 245
IdirLISN May 7, 2026
7a8dd0b
git rebase continue
IdirLISN May 27, 2026
de9a401
rebase and fix incoming
IdirLISN May 27, 2026
99347b2
rebase and fix incoming
IdirLISN May 27, 2026
b4fd5a6
feature clean
IdirLISN May 27, 2026
88ac295
conflicts solved
IdirLISN May 28, 2026
a49cf13
conflicts solved
IdirLISN May 28, 2026
f43d5fd
private CW pb solved
IdirLISN May 28, 2026
e430db5
linter fix
IdirLISN May 28, 2026
2f7d290
remove comment
IdirLISN May 28, 2026
f08280f
feature en cours
IdirLISN Jun 1, 2026
2aa19aa
monitor queues feature for admin ok
IdirLISN Jun 2, 2026
2ee6d3b
UX/UI improved
IdirLISN Jun 2, 2026
9f0e970
UI/UX improvment
IdirLISN Jun 4, 2026
c15117e
final push
IdirLISN Jun 4, 2026
ddca9b4
adding colors (final push)
IdirLISN Jun 4, 2026
3f8033c
panel resize
IdirLISN Jun 4, 2026
1c42616
UI cleaning
IdirLISN Jun 4, 2026
6dd6214
save panel state
IdirLISN Jun 4, 2026
c440a67
revert details.tag before CW monitoring merge
IdirLISN Jun 4, 2026
4d77c5f
worker jobs fix first try
IdirLISN Jun 4, 2026
d6265ce
adding queue jobs display
IdirLISN Jun 5, 2026
a63a8e8
queue jobs feaute ux/ui polish
IdirLISN Jun 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
579 changes: 319 additions & 260 deletions src/apps/competitions/tasks.py

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,6 @@
'task': 'profiles.tasks.clean_non_activated_users',
'schedule': timedelta(days=1), # Run every 24 hours
},
"refresh_compute_worker_health": {
"task": "competitions.tasks.refresh_compute_worker_health",
"schedule": 60,
},
}
CELERY_TIMEZONE = 'UTC'
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Expand Down
2 changes: 0 additions & 2 deletions src/static/riot/competitions/detail/_header.tag
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
Migrate
</button>

<!--
<worker-monitor-toggle
if="{competition.admin}"
can_view_workers_panel="true"
competition_id="{ competition.id }">
</worker-monitor-toggle>
-->

</div>
<div class="row">
Expand Down
679 changes: 48 additions & 631 deletions src/static/riot/competitions/detail/detail.tag

Large diffs are not rendered by default.

1,148 changes: 931 additions & 217 deletions src/static/riot/competitions/detail/worker-monitor-toggle.tag

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions src/templates/pages/monitor_queues.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,21 @@ <h1>Monitor queues</h1>
</div>
</div>
{% endif %}

{% if user.is_superuser %}
<div class="ui container">

<div class="ui segment">
<worker-monitor-toggle
can_view_workers_panel="true"
all_workers="true"
inline_mode="true">
</worker-monitor-toggle>
</div>

<div id="external_monitors" class="ui two column grid">
</div>
</div>
{% endif %}

{% endblock %}
122 changes: 47 additions & 75 deletions src/utils/consumers.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,20 @@
import asyncio
import json
import logging
import time

from competitions.models import Competition

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django_redis import get_redis_connection

from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY
from utils.worker_utils import fetch_compute_workers

logger = logging.getLogger(__name__)

r = get_redis_connection("default")


def _load_snapshot(competition_queue_name=None):
"""
Charge les workers depuis Redis.
- workers par défaut : toujours inclus (queue_source == 'default')
- workers privés : inclus uniquement si leur queue_source correspond
à la queue de la compétition courante
"""
raw = r.hgetall(WORKERS_REGISTRY_KEY)
workers = []
private_workers = []
now = time.time()

for _, value in raw.items():
try:
worker = json.loads(value)
except Exception:
continue

if now - worker.get("last_seen", 0) > WORKER_HEARTBEAT_TTL:
continue

if worker.get("queue_source") == "default":
workers.append(worker)
else:
# Worker privé : n'afficher que si la queue correspond à la compétition
if competition_queue_name and worker.get("queue_source") == competition_queue_name:
private_workers.append(worker)

workers.sort(key=lambda x: x.get("hostname", ""))
private_workers.sort(key=lambda x: (x.get("queue_source", ""), x.get("hostname", "")))
return workers, private_workers


def _get_competition_queue_name(competition_id):
"""Retourne le nom de la queue de la compétition, ou None."""
if not competition_id:
return None
try:
from competitions.models import Competition

competition = Competition.objects.select_related("queue").get(pk=competition_id)
if competition.queue and competition.queue.name:
return competition.queue.name
Expand All @@ -62,6 +23,30 @@ def _get_competition_queue_name(competition_id):
return None


def _load_snapshot(competition_queue_name=None, show_all=False):
workers, private_workers, queue_stats = fetch_compute_workers()

if show_all:
pass
elif competition_queue_name:
private_workers = [
w
for w in private_workers
if w.get("queue_source") == competition_queue_name
]
queue_stats = [
q
for q in queue_stats
if q.get("source_name") == competition_queue_name
or q.get("source_name") == "default"
]
else:
private_workers = []
queue_stats = [q for q in queue_stats if q.get("source_name") == "default"]

return workers, private_workers, queue_stats


class ComputeWorkersConsumer(AsyncJsonWebsocketConsumer):

async def connect(self):
Expand All @@ -72,6 +57,7 @@ async def connect(self):
await self.accept()
await self.channel_layer.group_add("compute_workers", self.channel_name)
self._competition_queue_name = None
self._show_all = False
self._running = True
self._subscribed = asyncio.Event()
self._task = asyncio.create_task(self._push_workers_loop())
Expand All @@ -88,55 +74,41 @@ async def disconnect(self, close_code):
pass

async def receive_json(self, content):
logger.debug("WebSocket received: %s", content)
if content.get("type") == "subscribe":
competition_id = content.get("competition_id")
self._competition_queue_name = await sync_to_async(_get_competition_queue_name)(
competition_id)
if content.get("all_workers"):
self._show_all = True
else:
competition_id = content.get("competition_id")
self._competition_queue_name = await sync_to_async(
_get_competition_queue_name
)(competition_id)
self._subscribed.set()

async def _push_workers_loop(self):
try:
try:
await asyncio.wait_for(self._subscribed.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("WebSocket subscribe timeout, proceeding without competition filter")
logger.warning("WebSocket subscribe timeout, proceeding without filter")

while self._running:
workers, private_workers = await sync_to_async(_load_snapshot)(
self._competition_queue_name
workers, private_workers, queue_stats = await sync_to_async(_load_snapshot)(
competition_queue_name=self._competition_queue_name,
show_all=self._show_all,
)
if not self._running:
break
try:
await self.send_json({
"type": "workers.snapshot",
"workers": workers,
"private_workers": private_workers,
})
await self.send_json(
{
"type": "workers.snapshot",
"workers": workers,
"private_workers": private_workers,
"queue_stats": queue_stats,
}
)
except RuntimeError:
break
await asyncio.sleep(3)
except asyncio.CancelledError:
pass

async def worker_health(self, event):
worker = event["worker"]
is_default = worker.get("queue_source") == "default"
is_mine = (
self._competition_queue_name is not None
and worker.get("queue_source") == self._competition_queue_name
)
if not is_default and not is_mine:
return
try:
workers, private_workers = await sync_to_async(_load_snapshot)(
self._competition_queue_name
)
await self.send_json({
"type": "workers.snapshot",
"workers": workers,
"private_workers": private_workers,
})
except RuntimeError:
pass
129 changes: 110 additions & 19 deletions src/utils/worker_utils.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,119 @@
import logging
import time

import requests
from django.conf import settings
from queues.models import Queue

WORKERS_REGISTRY_KEY = "workers:registry"
WORKER_HEARTBEAT_TTL = 180
logger = logging.getLogger(__name__)

PIDBOX_SUFFIX = ".celery.pidbox"

def extract_queue_names(active_queues):
names = set()
for q in active_queues or []:
if isinstance(q, dict) and q.get("name"):
names.add(q["name"])
return names

def _rabbitmq_auth():
return (settings.RABBITMQ_DEFAULT_USER, settings.RABBITMQ_DEFAULT_PASS)

def known_compute_queue_names():
return set(
Queue.objects.exclude(name__isnull=True)
.exclude(name="")
.values_list("name", flat=True)
)

def _rabbitmq_base_url():
return f"http://{settings.RABBITMQ_HOST}:{settings.RABBITMQ_MANAGEMENT_PORT}/api"


def is_compute_worker(worker_name, queue_names, known_queue_names):
return (
bool(queue_names & known_queue_names)
or "compute-worker" in queue_names
or worker_name.startswith("compute-worker")
def _build_vhost_to_source_map():
"""{ vhost_string: queue_source_name }. Default vhost '/' → 'default'."""
mapping = {"/": "default"}
for q in Queue.objects.exclude(vhost__isnull=True).values("vhost", "name"):
mapping[str(q["vhost"])] = q["name"]
return mapping


def _fetch_all_queues():
resp = requests.get(
f"{_rabbitmq_base_url()}/queues",
auth=_rabbitmq_auth(),
timeout=5,
)
resp.raise_for_status()
return resp.json()


def is_compute_worker(worker_name):
return worker_name.startswith("compute-worker")


def fetch_compute_workers():
try:
all_queues = _fetch_all_queues()
except Exception:
logger.exception("Failed to fetch queues from RabbitMQ Management API")
return [], [], []

try:
vhost_to_source = _build_vhost_to_source_map()
except Exception:
logger.exception("Failed to build vhost→source map")
return [], [], []

by_vhost: dict[str, list] = {}
for q in all_queues:
by_vhost.setdefault(q["vhost"], []).append(q)

workers = []
private_workers = []
queue_stats = []
now = time.time()

for vhost, queues in by_vhost.items():
source_name = vhost_to_source.get(vhost)
if not source_name:
continue

cw_queue = next((q for q in queues if q["name"] == "compute-worker"), None)
messages_ready = cw_queue.get("messages_ready", 0) if cw_queue else 0
messages_unacked = cw_queue.get("messages_unacknowledged", 0) if cw_queue else 0
cw_consumers = cw_queue.get("consumers", 0) if cw_queue else 0

queue_stats.append(
{
"source_name": source_name,
"jobs_pending": messages_ready,
"jobs_running": messages_unacked,
"workers_count": cw_consumers,
}
)

for pidbox_q in queues:
name = pidbox_q["name"]
if not (
name.endswith(PIDBOX_SUFFIX) and name.startswith("compute-worker@")
):
continue
hostname = name[: -len(PIDBOX_SUFFIX)]
if not is_compute_worker(hostname):
continue

pidbox_alive = pidbox_q.get("consumers", 0) > 0

if not pidbox_alive or cw_consumers == 0:
status = "unavailable"
elif messages_unacked > 0:
status = "busy"
else:
status = "available"

worker = {
"hostname": hostname,
"status": status,
"last_seen": now,
"queue_source": source_name,
"queue_names": ["compute-worker"],
}

if source_name == "default":
workers.append(worker)
else:
private_workers.append(worker)

workers.sort(key=lambda x: x["hostname"])
private_workers.sort(key=lambda x: (x["queue_source"], x["hostname"]))
queue_stats.sort(key=lambda x: x["source_name"])
return workers, private_workers, queue_stats