feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59
Open
dotjae wants to merge 16 commits into
Open
feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59dotjae wants to merge 16 commits into
dotjae wants to merge 16 commits into
Conversation
UpdateRunViewSet
- Flatten serializer.validated_data["details"] into payload before
process_event. Guards like is_*_authorized read actor_role at the
top level of payload (consistent with the Celery path in
realtime_engine/tasks.py), so without this every UI-driven
cancel/interrupt/short-turn/run_rejected returned 422 with
"actor_role '' is not authorized…".
Transitions
- INTERRUPT_RUN, SHORT_TURN_RUN, COMPLETE_RUN: add sync_lifecycle_state
so Redis's run:{id} hash reflects the terminal state. Without this
the simulator's RunBinder polled an "In Progress" value forever and
the UI never recognized the terminal state.
- RUN_TRACKING_LOST: stop removing the run from runs:tracking — the
set is scan_stale_runs's work queue, and removing on lost prevented
the eventual RUN_TRACKING_EXPIRED transition.
- RUN_TRACKING_EXPIRED: add sync_lifecycle_state + remove_from_tracking_set
so the run finally exits the queue when fully terminal.
Also includes RunHistoryView (read-only GET /api/runs/{id}/history/)
in views.py — drives the history panel in the simulator's Runs tab.
Note: committed against feat/run-lifecycle-mqtt which still has
unrelated WIP from another contributor (uncommitted) — only the two
files touched here are part of this fix.
Replaces the management-command mqtt_consumer with an in-worker
Celery bootstep at realtime_engine/mqtt.py. It subscribes to
transit/vehicle/+/{position,progression,occupancy}, updates Redis
on every ping, and emits lifecycle events (run_tracking_started,
run_started, complete_run, run_tracking_restored) based on the
current run state.
- backend/realtime_engine/mqtt.py: new MQTT bootstep
- backend/realtime_engine/management/commands/*: removed
(mqtt_consumer + bootstrap_simulator_runs no longer needed)
- backend/databus/celery.py: register the bootstep on workers that
set MQTT_CONSUMER_ENABLED=true
- backend/api/urls.py: add /api/runs/<uuid>/history/
- backend/Dockerfile, compose.dev.yml, compose.prod.yml: wire
telemetry-broker + realtime-engine environment so the consumer
attaches at startup
- backend/uv.lock: pick up paho-mqtt
- docs/content/processes/run-lifecycle.md, README, realtime_engine/README:
document the new flow
…update related views and URLs feat(run): add RunStateViewSet for retrieving current run lifecycle state docs: add README for run lifecycle states refactor(redis): change vehicle data key from 'data' to 'metadata' across scripts and tasks
…caffolding for the detection logic
…h run progress model
…tection Consolidate the lifecycle detection heuristics that were inlined in realtime_engine/mqtt.py (_maybe_fire_lifecycle_event) and scan_stale_runs into a dedicated, pure, unit-testable detection layer: - detection/result.py: DetectionResult value object - detection/thresholds.py: single source for telemetry grace/expiry (fixes the prior 300-vs-600 mismatch between tasks.py and lifecycle/guards.py) - detection/lifecycle_detectors.py: tracking-started, run-started (speed>0.5), tracking-restored, completion detectors - detection/periodic_detectors.py: tracking-lost / tracking-expired (staleness) - detection/registry.py + dispatch.py: ordered detector lists and a pure planner plus thin Redis/Celery wrappers mqtt.py and scan_stale_runs now delegate to the dispatcher and hold no detection logic. Lifecycle FSM behaviour is unchanged. 30 pure tests added. Detectors stay I/O-free; the runs:tracking seed that a guard depends on stays in the dispatcher.
Introduce backend/runs/domain/telemetry/ as the single shared contract for the edge/server Redis entity division. One file per GTFS-RT entity (position, occupancy, vehicle_stop_status, congestion_level, trip) plus centralized Redis key templates (keys.py). Each entity exposes a tolerant from_redis (str->typed, drops bad optional values) and a strict validate_for_write (typed->str, raises on missing required / invalid enum) so producers and builders agree on field names and types. occupancy.classify_status centralizes the GTFS enum bucketing policy; trip.project_from_run_hash projects the flat run hash to the trip subset. Modules are import-light (no Django/Redis at top level) and unit-tested under plain pytest (89 tests). No producers/builders wired yet.
Route inbound telemetry through the telemetry contract instead of blindly hset-ing every field as a string: - Subscribe to position and occupancy only; stop subscribing to progression (decommissioned server-side; the simulator may still publish it). - position payloads go through position.validate_for_write; occupancy through occupancy.validate_for_write. occupancy_status is server policy: the edge-sent value is discarded and recomputed via occupancy.classify_status from the raw percentage at write time. - Malformed payloads (e.g. position missing lat/lon) are logged and dropped without touching last_seen or the detection delegate — a bad payload never crashes ingestion. Unknown leaves are dropped at debug. - All Redis keys come from telemetry.keys; the runs:last_seen update and the detect_from_telemetry delegate (raw data) are preserved. Unit-tested with a fake redis + patched dispatch (7 tests).
In update_system_state, additively write the GTFS-RT-shaped TripDescriptor projection to run:<id>:trip alongside the flat run:<id> hash. Values are already computed in the run mapping, so this is a near-free extra hset in the existing pipeline, reusing trip.project_from_run_hash and telemetry.keys. Gated on a present trip_id so a run without a trip never writes an invalid TripDescriptor (a non-empty projection of only direction_id/schedule_ relationship is skipped). This makes the builder's v["trip"] an as-is read instead of hand-picking the subset from the flat hash. Unit-tested import-light via a stubbed runs.models + fake redis pipeline (2 tests).
Introduce runs/domain/progression/ to produce run:<id>:vehicle_stop_status server-side, decoupling stop status from edge telemetry (it depends on the assigned trip, so it is server-computed and run-keyed). - compute.py: pure compute_stop_status(run_hash, position_hash, *, prev_state) with a seam/placeholder body — defaults current_status to IN_TRANSIT_TO and carries forward current_stop_sequence/stop_id from prev_state as a monotonic floor. The signature is locked (the progress-FSM branch consumes it); the real map-matching port (GPS->polyline projection, radius rules, GTFS shape model) is deferred and marked TODO. - producer.py: reads position + run + prior stop-status from Redis, delegates to compute, validates via the contract, writes the typed hash. - mqtt: one guarded call after each successful position write derives stop status; failures are logged and never break ingestion. Minimum seam — real map-matching deferred. Unit-tested (pure compute + monkeypatched-redis producer + mqtt wiring assertion).
The payoff of the entity division: the GTFS-RT feed builders now read one ready-made, correctly-typed hash per entity (written by the MQTT consumer and lifecycle actions) and assemble the message by update/assignment instead of the ~110-line per-field try/except block. - Extract a Django-free builders.py (pure assembly) from tasks.py (now thin I/O shells). builders.py imports only the telemetry contracts + keys + fake_stop_times, so the feed dict can be unit-tested against the protobuf without a Django harness. - build_vehicle_position_entity / build_trip_update_entity read run:<id>:trip, vehicle:<id>:position, vehicle:<id>:occupancy, run:<id>:vehicle_stop_status, run:<id>:congestion_level, vehicle:<id>:metadata via from_redis. The decommissioned vehicle:<id>:progression read is gone; stop status now comes from the run-keyed vehicle_stop_status hash. - timestamp is lifted to the VehiclePosition/TripUpdate level (the Position sub-message has no timestamp). occupancy_count is intentionally NOT emitted — it is not a GTFS-RT VehiclePosition field and would break ParseDict. - trip falls back to projecting the flat run hash when run:<id>:trip is absent. - fake_stop_times is untouched: it receives the vehicle_stop_status hash via its existing progression= parameter. Add __init__.py to realtime_engine and schedule_engine so their tests/ packages are uniquely qualified (the two app-level tests dirs otherwise collide as a bare 'tests' package under pytest). Tests assert the assembled feed ParseDicts + SerializeToStrings into gtfs_realtime_pb2 (the contract gate), the timestamp lift, and the occupancy_count exclusion.
Bring the operational scripts and the Run model's mapping docstring in line with the entity key schema now that producers/builders use it. - cleanup_redis.py: drop the decommissioned vehicle:*:progression deletes; force_cleanup_all now also clears the server run-keyed entity hashes (run:*:trip, run:*:vehicle_stop_status, run:*:congestion_level). - inspect_redis.py: stop reading vehicle:<id>:progression; resolve the run via vehicle:<id>:current_run and read run:<id>:vehicle_stop_status (stop status) and run:<id>:congestion_level, printed as separate sections. - runs/models.py: rewrite the stale bottom docstring (which claimed run:<id>:position / :current_status / :occupancy_status) to the real schema, split by producer (edge vehicle:* vs server run:*), noting the timestamp lift and that vehicle:<id>:progression is decommissioned. (No docs reference these keys; backend/scripts/cleanup_runs.py was also aligned but is untracked and left for separate staging.)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
realtime_engine/mqtt.py): subscribes to vehicle telemetry broker, drives run lifecycle FSM transitions in real timeruns/domain/): complete guards, actions, and transitions for all lifecycle states — including flatten/sync fixes for terminal transitionsschedule_engine/tasks.py): ported publisher builders into the unified backend, removed legacypublisher/,scheduler/, andscripts/top-level service directoriesdatabus/celery.py): addedbuild_vehicle_positionsandbuild_trip_updatesbeat schedules (15 s cadence)schedule_engine/fake_stop_times.py): handle dict-shaped run/progression entries to unblock dev simulation