Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
754261c
feat(run-lifecycle): complete run lifecycle + MQTT pipeline
dotjae May 19, 2026
de1fa06
fix(run-lifecycle): flatten details + sync state on terminal transitions
dotjae May 20, 2026
595f9ad
feat(run-lifecycle-mqtt): MQTT consumer + lifecycle infra
dotjae May 20, 2026
fa54280
fix(schedule_engine): handle dict-shaped run/progression in fake stop…
dotjae May 21, 2026
cacf60e
refactor(run): rename UpdateRunSerializer to RunUpdateSerializer and …
fabianabarca May 25, 2026
eec4e60
refactor(domain): consolidate imports from runs.domain for clarity an…
fabianabarca May 25, 2026
430edaf
refactor(domain): separate finite state machines in folders and add s…
fabianabarca May 27, 2026
76e7172
refactor(transitions): update state and event references to align wit…
fabianabarca May 29, 2026
c4a44b8
refactor(runs): extract telemetry/stale detection into runs/domain/de…
dotjae Jun 2, 2026
9fe1573
refactor(runs): map models to GTFS Realtime VehiclePosition objects
fabianabarca Jun 4, 2026
49d095b
feat(telemetry): add GTFS-RT entity contract module + keys
dotjae Jun 8, 2026
c2ca1ed
refactor(mqtt): ingest only edge entities, write typed hashes
dotjae Jun 9, 2026
361fef9
feat(runs): write run:<id>:trip hash in lifecycle action
dotjae Jun 9, 2026
5ab7fc2
feat(progression): server-side stop-status producer (seam)
dotjae Jun 9, 2026
603d8c4
refactor(schedule_engine): builders read per-entity hashes
dotjae Jun 9, 2026
8ee2a87
chore(redis): align ops scripts + mapping docstring to entity keys
dotjae Jun 9, 2026
04cfb66
refactor(detection): instance-based detectors with enum events
dotjae Jun 10, 2026
e63af5d
feat(telemetry): add run:<id>:stop_time_updates projection contract +…
dotjae Jun 10, 2026
ad63d8d
feat(progression): build cached shape/stop geometry from GTFS
dotjae Jun 10, 2026
8cf5f5a
feat(progression): real map-matching for vehicle_stop_status
dotjae Jun 10, 2026
7b07889
refactor(schedule_engine): builder reads stop_time_updates projection…
dotjae Jun 10, 2026
41de4cd
feat(progression): use GTFS shape_dist_traveled for polyline cum_dist…
dotjae Jun 10, 2026
353b30c
fix(progression): DP-based monotonic stop projection for loop-back sh…
dotjae Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ The application will be available at `http://localhost:8000`

For the full documentation site, run `mkdocs serve` and visit http://localhost:8000

## Demo: full run lifecycle

End-to-end demo of a complete run lifecycle driven by MQTT telemetry from the simulator.

```bash
# Terminal 1 — start the full databus stack
cd databus && bash scripts/dev.sh

# Terminal 2 — load GTFS feed
docker compose -f compose.dev.yml exec orchestrator \
uv run python manage.py loaddata gtfs.json

# Terminal 3 — start the simulator (wired to databus broker)
# The simulator's scheduler posts to /api/create-run on each schedule entry's
# start_time. The UI's Operator tab handles confirmation. No databus-side
# bootstrap command is required.
cd ../simulator && docker compose up simulator web

# Terminal 4 — observe (optional)
open http://localhost:8080 # live map
watch ls backend/feed/files/ # GTFS-RT outputs (refresh every 15 s)
```

Within ~30 s of starting the simulator:

- Every run advances `CONFIRMED → TRACKING → IN_PROGRESS`
- `backend/feed/files/vehicle_positions.pb` contains one `FeedEntity` per active run
- `backend/feed/files/trip_updates.pb` contains stop-time predictions

Killing the simulator triggers `RUN_TRACKING_LOST` after 60 s and
`RUN_TRACKING_EXPIRED → CANCELLED` after 300 s.

Verify the protobuf output:

```python
from google.transit import gtfs_realtime_pb2
msg = gtfs_realtime_pb2.FeedMessage()
msg.ParseFromString(open("backend/feed/files/vehicle_positions.pb", "rb").read())
print(len(msg.entity)) # should equal the number of active runs
```

## 🛣️ Roadmap

Where is this going? Check SIMOVI's [roadmap](https://github.com/simovilab/context/blob/main/roadmap.md).
Expand Down
30 changes: 19 additions & 11 deletions backend/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from runs.models import (
Run,
Position,
Progression,
Occupancy,
VehicleStopStatus,
CongestionLevel,
OccupancyStatus,
)
from runs.domain.events import RunLifecycleEvents
from runs.domain.lifecycle import RunLifecycleEvents
from feed.models import *
from django.contrib.auth.models import User
from rest_framework import serializers
Expand Down Expand Up @@ -130,10 +131,9 @@ class CreateRunSerializer(serializers.Serializer):
)


class UpdateRunSerializer(serializers.Serializer):
run_id = serializers.CharField(max_length=100)
class RunUpdateSerializer(serializers.Serializer):
event = serializers.ChoiceField(choices=RunLifecycleEvents)
details = serializers.JSONField()
details = serializers.JSONField(required=False, default=dict)


class PositionSerializer(serializers.HyperlinkedModelSerializer):
Expand Down Expand Up @@ -175,23 +175,31 @@ def get_longitude(self, obj):
# return Position.objects.create(point=point, **validated_data)


class ProgressionSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())
class VehicleStopStatusSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = Progression
model = VehicleStopStatus
fields = "__all__"
fields = "__all__"
ordering = ["id"]


class OccupancySerializer(serializers.HyperlinkedModelSerializer):
class CongestionLevelSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = CongestionLevel
fields = "__all__"
fields = "__all__"
ordering = ["id"]


class OccupancyStatusSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = Occupancy
model = OccupancyStatus
fields = "__all__"
fields = "__all__"
ordering = ["id"]
Expand Down
17 changes: 15 additions & 2 deletions backend/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
router.register(r"equipment-log", views.EquipmentLogViewSet)
# router.register(r"run", views.RunViewSet)
router.register(r"position", views.PositionViewSet)
router.register(r"progression", views.ProgressionViewSet)
router.register(r"stop-status", views.VehicleStopStatusViewSet)
router.register(r"occupancy", views.OccupancyViewSet)
router.register(r"congestion", views.CongestionLevelViewSet)
# GTFS Schedule
router.register(r"agency", views.AgencyViewSet)
router.register(r"stops", views.StopViewSet)
Expand All @@ -39,7 +40,19 @@
path("login/", views.LoginView.as_view(), name="login"),
# path("route-stops/", views.RouteStopView.as_view(), name="route_stops"),
path("create-run/", views.CreateRunViewSet.as_view(), name="create_run"),
path("update-run/", views.UpdateRunViewSet.as_view(), name="update_run"),
path(
"runs/<uuid:run_id>/state/", views.RunStateViewSet.as_view(), name="run_state"
),
path(
"runs/<uuid:run_id>/update/",
views.RunUpdateViewSet.as_view(),
name="run_update",
),
path(
"runs/<uuid:run_id>/history/",
views.RunHistoryView.as_view(),
name="run_history",
),
path("service-today/", views.ServiceTodayView.as_view(), name="service_today"),
path("which-shapes/", views.WhichShapesView.as_view(), name="which_shapes"),
path("find-trips/", views.FindTripsView.as_view(), name="find_trips"),
Expand Down
139 changes: 114 additions & 25 deletions backend/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from realtime_engine.tasks import run_lifecycle_event
from runs.services.exceptions import RunLifecycleError
from runs.services.lifecycle import RunLifecycleService
from runs.domain.events import RunLifecycleEvents
from runs.domain.states import RunLifecycleStates
from runs.domain.lifecycle import RunLifecycleEvents
from runs.domain.lifecycle import RunLifecycleStates
from operations.models import (
Vehicle,
Operator,
Expand All @@ -26,9 +26,11 @@
)
from runs.models import (
Run,
RunLifecycleTransition,
Position,
Progression,
Occupancy,
VehicleStopStatus,
CongestionLevel,
OccupancyStatus,
)
from feed.models import (
Feed,
Expand All @@ -55,10 +57,11 @@
EquipmentLogSerializer,
OperatorSerializer,
CreateRunSerializer,
UpdateRunSerializer,
RunUpdateSerializer,
PositionSerializer,
ProgressionSerializer,
OccupancySerializer,
VehicleStopStatusSerializer,
CongestionLevelSerializer,
OccupancyStatusSerializer,
AgencySerializer,
StopSerializer,
GeoStopSerializer,
Expand Down Expand Up @@ -206,12 +209,14 @@ def post(self, request):
{"status": "error", "step": "operational_validation", "errors": errors},
status=status.HTTP_400_BAD_REQUEST,
)
# Registration of the run (event: RUN_REQUESTED, state: REQUESTED)
# Record creation puts the run in REQUESTED state (run_requested event)
try:
run = Run.objects.create(**payload)
run.vehicle.set([vehicle])
run.operator.set([operator_obj])
payload["run_id"] = run.id
payload["vehicle_id"] = vehicle_id
payload["operator_id"] = operator_id
except Exception as e:
return Response(
{
Expand All @@ -221,26 +226,23 @@ def post(self, request):
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# First transition: GTFS validation (run_lifecycle_state = REQUESTED)
# REQUESTED → VALIDATED: GTFS consistency check
try:
service.process_event(RunLifecycleEvents.RUN_REQUESTED, payload)
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
except RunLifecycleError as e:
payload["guards"] = e.errors.attempts.guards
payload["actions"] = e.errors.attempts.actions
service.process_event(RunLifecycleEvents.RUN_REJECTED, payload)
return Response(
{"status": "error", "step": "gtfs_validation", "errors": e.errors},
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
# System initialization (run_lifecycle_state = VALIDATED)
# VALIDATED → INITIALIZED: write system state
try:
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
service.process_event(RunLifecycleEvents.INITIALIZE_RUN, payload)
except RunLifecycleError as e:
return Response(
{"status": "error", "step": "initialization", "errors": e.errors},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# If successful, give a 200 OK response (run_lifecycle_state = INITIALIZED)
return Response(
{
"status": "success",
Expand All @@ -251,32 +253,76 @@ def post(self, request):
)


class UpdateRunViewSet(APIView):
class RunStateViewSet(APIView):
"""
Endpoint to get the current lifecycle state of a run.

It only allows the GET method with the run_id as path parameter.
"""

def get(self, request, run_id):
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
return Response(
{"status": "success", "run_lifecycle_state": run.run_lifecycle_state},
status=status.HTTP_200_OK,
)


class RunUpdateViewSet(APIView):
"""
Endpoint to request an update of the lifecycle state of an existing run.

It only allows the POST method with the event to process.
"""

def post(self, request):
def post(self, request, run_id):
service = RunLifecycleService()
serializer = UpdateRunSerializer(data=request.data)
serializer = RunUpdateSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"status": "error", "errors": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
payload = dict(serializer.validated_data)
run_id = payload.get("run_id")
# Flatten `details` into payload so guards/actions can read fields like
# `actor_role` at the top level — matches the convention used by the
# internal Celery path (realtime_engine/tasks.py).
details = payload.pop("details", {}) or {}
if isinstance(details, dict):
payload.update(details)
payload["run_id"] = run_id
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
# Ensure the effective event (after payload normalization) is valid.
event = payload.get("event")
event_value = (
event.value if isinstance(event, RunLifecycleEvents) else str(event)
)
allowed_events = {e.value for e in RunLifecycleEvents}
if event_value not in allowed_events:
return Response(
{
"status": "error",
"errors": {
"event": f"Invalid event '{event_value}'. Allowed values: {sorted(allowed_events)}"
},
},
status=status.HTTP_400_BAD_REQUEST,
)
payload["event"] = event_value
try:
new_run_lifecycle_state = service.process_event(event, payload)
new_run_lifecycle_state, _guards, _actions = service.process_event(
event_value, payload
)
except RunLifecycleError as e:
return Response(
{"status": "error", "errors": e.errors},
Expand All @@ -288,21 +334,64 @@ def post(self, request):
)


class RunHistoryView(APIView):
"""
Return the ordered list of FSM transitions for a run.

Read-only audit log derived from RunLifecycleTransition, which the
lifecycle service writes before any external side-effect (so the log
is authoritative even if a downstream action later fails).
"""

def get(self, request, run_id):
if not Run.objects.filter(id=run_id).exists():
return Response(
{"status": "error", "errors": {"detail": f"run {run_id} not found"}},
status=status.HTTP_404_NOT_FOUND,
)
transitions = RunLifecycleTransition.objects.filter(run_id=run_id).order_by(
"timestamp", "created_at"
)
return Response(
{
"run_id": str(run_id),
"transitions": [
{
"event": t.event_name,
"from_state": t.from_state,
"to_state": t.to_state,
"timestamp": t.timestamp.isoformat(),
"actions": t.actions or {},
"guards": t.guards or {},
}
for t in transitions
],
},
status=status.HTTP_200_OK,
)


class PositionViewSet(viewsets.ModelViewSet):
queryset = Position.objects.all()
serializer_class = PositionSerializer
authentication_classes = [TokenAuthentication]


class ProgressionViewSet(viewsets.ModelViewSet):
queryset = Progression.objects.all()
serializer_class = ProgressionSerializer
class VehicleStopStatusViewSet(viewsets.ModelViewSet):
queryset = VehicleStopStatus.objects.all()
serializer_class = VehicleStopStatusSerializer
authentication_classes = [TokenAuthentication]


class CongestionLevelViewSet(viewsets.ModelViewSet):
queryset = CongestionLevel.objects.all()
serializer_class = CongestionLevelSerializer
authentication_classes = [TokenAuthentication]


class OccupancyViewSet(viewsets.ModelViewSet):
queryset = Occupancy.objects.all()
serializer_class = OccupancySerializer
queryset = OccupancyStatus.objects.all()
serializer_class = OccupancyStatusSerializer
authentication_classes = [TokenAuthentication]


Expand Down
Loading