Skip to content

feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59

Open
dotjae wants to merge 16 commits into
mainfrom
feat/run-lifecycle-mqtt
Open

feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59
dotjae wants to merge 16 commits into
mainfrom
feat/run-lifecycle-mqtt

Conversation

@dotjae

@dotjae dotjae commented May 23, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • MQTT consumer (realtime_engine/mqtt.py): subscribes to vehicle telemetry broker, drives run lifecycle FSM transitions in real time
  • Run lifecycle pipeline (runs/domain/): complete guards, actions, and transitions for all lifecycle states — including flatten/sync fixes for terminal transitions
  • Schedule engine cleanup (schedule_engine/tasks.py): ported publisher builders into the unified backend, removed legacy publisher/, scheduler/, and scripts/ top-level service directories
  • Celery beat (databus/celery.py): added build_vehicle_positions and build_trip_updates beat schedules (15 s cadence)
  • Fix (schedule_engine/fake_stop_times.py): handle dict-shaped run/progression entries to unblock dev simulation

dotjae and others added 16 commits May 18, 2026 18:54
UpdateRunViewSet
- Flatten serializer.validated_data["details"] into payload before
  process_event. Guards like is_*_authorized read actor_role at the
  top level of payload (consistent with the Celery path in
  realtime_engine/tasks.py), so without this every UI-driven
  cancel/interrupt/short-turn/run_rejected returned 422 with
  "actor_role '' is not authorized…".

Transitions
- INTERRUPT_RUN, SHORT_TURN_RUN, COMPLETE_RUN: add sync_lifecycle_state
  so Redis's run:{id} hash reflects the terminal state. Without this
  the simulator's RunBinder polled an "In Progress" value forever and
  the UI never recognized the terminal state.
- RUN_TRACKING_LOST: stop removing the run from runs:tracking — the
  set is scan_stale_runs's work queue, and removing on lost prevented
  the eventual RUN_TRACKING_EXPIRED transition.
- RUN_TRACKING_EXPIRED: add sync_lifecycle_state + remove_from_tracking_set
  so the run finally exits the queue when fully terminal.

Also includes RunHistoryView (read-only GET /api/runs/{id}/history/)
in views.py — drives the history panel in the simulator's Runs tab.

Note: committed against feat/run-lifecycle-mqtt which still has
unrelated WIP from another contributor (uncommitted) — only the two
files touched here are part of this fix.
Replaces the management-command mqtt_consumer with an in-worker
Celery bootstep at realtime_engine/mqtt.py. It subscribes to
transit/vehicle/+/{position,progression,occupancy}, updates Redis
on every ping, and emits lifecycle events (run_tracking_started,
run_started, complete_run, run_tracking_restored) based on the
current run state.

- backend/realtime_engine/mqtt.py: new MQTT bootstep
- backend/realtime_engine/management/commands/*: removed
  (mqtt_consumer + bootstrap_simulator_runs no longer needed)
- backend/databus/celery.py: register the bootstep on workers that
  set MQTT_CONSUMER_ENABLED=true
- backend/api/urls.py: add /api/runs/<uuid>/history/
- backend/Dockerfile, compose.dev.yml, compose.prod.yml: wire
  telemetry-broker + realtime-engine environment so the consumer
  attaches at startup
- backend/uv.lock: pick up paho-mqtt
- docs/content/processes/run-lifecycle.md, README, realtime_engine/README:
  document the new flow
…update related views and URLs

feat(run): add RunStateViewSet for retrieving current run lifecycle state
docs: add README for run lifecycle states
refactor(redis): change vehicle data key from 'data' to 'metadata' across scripts and tasks
…tection

Consolidate the lifecycle detection heuristics that were inlined in
realtime_engine/mqtt.py (_maybe_fire_lifecycle_event) and scan_stale_runs into a
dedicated, pure, unit-testable detection layer:

- detection/result.py: DetectionResult value object
- detection/thresholds.py: single source for telemetry grace/expiry (fixes the
  prior 300-vs-600 mismatch between tasks.py and lifecycle/guards.py)
- detection/lifecycle_detectors.py: tracking-started, run-started (speed>0.5),
  tracking-restored, completion detectors
- detection/periodic_detectors.py: tracking-lost / tracking-expired (staleness)
- detection/registry.py + dispatch.py: ordered detector lists and a pure planner
  plus thin Redis/Celery wrappers

mqtt.py and scan_stale_runs now delegate to the dispatcher and hold no detection
logic. Lifecycle FSM behaviour is unchanged. 30 pure tests added.

Detectors stay I/O-free; the runs:tracking seed that a guard depends on stays in
the dispatcher.
Introduce backend/runs/domain/telemetry/ as the single shared contract for
the edge/server Redis entity division. One file per GTFS-RT entity (position,
occupancy, vehicle_stop_status, congestion_level, trip) plus centralized Redis
key templates (keys.py).

Each entity exposes a tolerant from_redis (str->typed, drops bad optional
values) and a strict validate_for_write (typed->str, raises on missing
required / invalid enum) so producers and builders agree on field names and
types. occupancy.classify_status centralizes the GTFS enum bucketing policy;
trip.project_from_run_hash projects the flat run hash to the trip subset.

Modules are import-light (no Django/Redis at top level) and unit-tested under
plain pytest (89 tests). No producers/builders wired yet.
Route inbound telemetry through the telemetry contract instead of blindly
hset-ing every field as a string:

- Subscribe to position and occupancy only; stop subscribing to progression
  (decommissioned server-side; the simulator may still publish it).
- position payloads go through position.validate_for_write; occupancy through
  occupancy.validate_for_write. occupancy_status is server policy: the
  edge-sent value is discarded and recomputed via occupancy.classify_status
  from the raw percentage at write time.
- Malformed payloads (e.g. position missing lat/lon) are logged and dropped
  without touching last_seen or the detection delegate — a bad payload never
  crashes ingestion. Unknown leaves are dropped at debug.
- All Redis keys come from telemetry.keys; the runs:last_seen update and the
  detect_from_telemetry delegate (raw data) are preserved.

Unit-tested with a fake redis + patched dispatch (7 tests).
In update_system_state, additively write the GTFS-RT-shaped TripDescriptor
projection to run:<id>:trip alongside the flat run:<id> hash. Values are
already computed in the run mapping, so this is a near-free extra hset in the
existing pipeline, reusing trip.project_from_run_hash and telemetry.keys.

Gated on a present trip_id so a run without a trip never writes an invalid
TripDescriptor (a non-empty projection of only direction_id/schedule_
relationship is skipped). This makes the builder's v["trip"] an as-is read
instead of hand-picking the subset from the flat hash.

Unit-tested import-light via a stubbed runs.models + fake redis pipeline
(2 tests).
Introduce runs/domain/progression/ to produce run:<id>:vehicle_stop_status
server-side, decoupling stop status from edge telemetry (it depends on the
assigned trip, so it is server-computed and run-keyed).

- compute.py: pure compute_stop_status(run_hash, position_hash, *, prev_state)
  with a seam/placeholder body — defaults current_status to IN_TRANSIT_TO and
  carries forward current_stop_sequence/stop_id from prev_state as a monotonic
  floor. The signature is locked (the progress-FSM branch consumes it); the
  real map-matching port (GPS->polyline projection, radius rules, GTFS shape
  model) is deferred and marked TODO.
- producer.py: reads position + run + prior stop-status from Redis, delegates
  to compute, validates via the contract, writes the typed hash.
- mqtt: one guarded call after each successful position write derives stop
  status; failures are logged and never break ingestion.

Minimum seam — real map-matching deferred. Unit-tested (pure compute +
monkeypatched-redis producer + mqtt wiring assertion).
The payoff of the entity division: the GTFS-RT feed builders now read one
ready-made, correctly-typed hash per entity (written by the MQTT consumer and
lifecycle actions) and assemble the message by update/assignment instead of
the ~110-line per-field try/except block.

- Extract a Django-free builders.py (pure assembly) from tasks.py (now thin
  I/O shells). builders.py imports only the telemetry contracts + keys +
  fake_stop_times, so the feed dict can be unit-tested against the protobuf
  without a Django harness.
- build_vehicle_position_entity / build_trip_update_entity read run:<id>:trip,
  vehicle:<id>:position, vehicle:<id>:occupancy, run:<id>:vehicle_stop_status,
  run:<id>:congestion_level, vehicle:<id>:metadata via from_redis. The
  decommissioned vehicle:<id>:progression read is gone; stop status now comes
  from the run-keyed vehicle_stop_status hash.
- timestamp is lifted to the VehiclePosition/TripUpdate level (the Position
  sub-message has no timestamp). occupancy_count is intentionally NOT
  emitted — it is not a GTFS-RT VehiclePosition field and would break ParseDict.
- trip falls back to projecting the flat run hash when run:<id>:trip is absent.
- fake_stop_times is untouched: it receives the vehicle_stop_status hash via
  its existing progression= parameter.

Add __init__.py to realtime_engine and schedule_engine so their tests/ packages
are uniquely qualified (the two app-level tests dirs otherwise collide as a
bare 'tests' package under pytest).

Tests assert the assembled feed ParseDicts + SerializeToStrings into
gtfs_realtime_pb2 (the contract gate), the timestamp lift, and the
occupancy_count exclusion.
Bring the operational scripts and the Run model's mapping docstring in line
with the entity key schema now that producers/builders use it.

- cleanup_redis.py: drop the decommissioned vehicle:*:progression deletes;
  force_cleanup_all now also clears the server run-keyed entity hashes
  (run:*:trip, run:*:vehicle_stop_status, run:*:congestion_level).
- inspect_redis.py: stop reading vehicle:<id>:progression; resolve the run via
  vehicle:<id>:current_run and read run:<id>:vehicle_stop_status (stop status)
  and run:<id>:congestion_level, printed as separate sections.
- runs/models.py: rewrite the stale bottom docstring (which claimed
  run:<id>:position / :current_status / :occupancy_status) to the real schema,
  split by producer (edge vehicle:* vs server run:*), noting the timestamp lift
  and that vehicle:<id>:progression is decommissioned.

(No docs reference these keys; backend/scripts/cleanup_runs.py was also aligned
but is untracked and left for separate staging.)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants