feat: add GatewayMQTT component for ESP32 MQTT integration #3557

Open
mgazza wants to merge 6 commits into main from feat/gateway-mqtt-component
Conversation

@mgazza
Collaborator

@mgazza mgazza commented Mar 13, 2026

Summary

  • Add GWMQTT inverter type to INVERTER_DEF in config.py for ESP32 gateway MQTT integration
  • Add gateway.py — full MQTT component (aiomqtt) with telemetry decoding, command publishing, token refresh, exponential backoff reconnection
  • Register GatewayMQTT in COMPONENT_LIST (components.py) with conditional import
  • Add per-inverter HA-style entity injection (sensor., number., switch., select. prefixes with serial suffix)
  • Add automatic_config() for PredBat set_arg() registration on first telemetry
  • Add plan change detection to skip redundant ExecutionPlan publishes
  • Add energy counter, schedule time, battery SoH/rate_max, inverter_time entities
  • Add Python protobuf bindings and proto schema (PlanEntry, ExecutionPlan)
  • Add MQTT integration test for plan publish format
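
The plan change detection bullet above can be illustrated with a small digest comparison. This is a minimal sketch under stated assumptions, not the code in gateway.py: PlanPublisher, publish_if_changed, and the injected publish callable are hypothetical names, and the payload is assumed to be the already-serialized ExecutionPlan bytes.

```python
import hashlib


class PlanPublisher:
    """Hypothetical helper: publish a plan only when its serialized bytes change."""

    def __init__(self, publish):
        self._publish = publish      # callable that sends bytes, e.g. an MQTT publish
        self._last_digest = None     # digest of the last payload actually sent

    def publish_if_changed(self, payload: bytes) -> bool:
        """Publish payload unless it matches the previous one; return True if sent."""
        digest = hashlib.sha256(payload).hexdigest()
        if digest == self._last_digest:
            return False             # identical plan, skip the redundant publish
        self._publish(payload)
        self._last_digest = digest
        return True
```

Comparing digests of the serialized bytes means an empty plan is just another distinct payload, so it is still sent once when the plan first becomes empty.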

Test plan

  • Verify gateway.py imports cleanly with aiomqtt + protobuf installed
  • Verify conditional import degrades gracefully when aiomqtt not installed
  • Verify GWMQTT inverter type is recognized by plan engine
  • Verify entity names match between _inject_inverter_entities and automatic_config set_arg mappings
  • Run existing gateway MQTT integration test

🤖 Generated with Claude Code

@mgazza
Collaborator Author

mgazza commented Mar 13, 2026

Code Review: GatewayMQTT Component

Verdict: Approve

Correctness

  • decode_telemetry() static method and ENTITY_MAP approach is clean and testable
  • Token refresh via oauth-refresh with provider: "predbat_gateway" is consistent with FoxESS OAuth pattern
  • _check_token_refresh correctly uses _refresh_in_progress guard to prevent concurrent refreshes
  • is_alive() handles the "gateway offline but broker connected" case well

Minor Issues (follow-up)

  1. instance_id extraction (gateway.py): self.args.get("user_id", "") — verify user_id is injected by ComponentBase framework since it's not in the explicit args defined in components.py

  2. Reconnect backoff has no cap: _RECONNECT_BASE_DELAY * (2 ** attempt) with 10 max attempts means max delay is ~85 min. Consider capping at 60s

  3. build_command selective fields: Only mode, power_w, target_soc kwargs are included — other kwargs silently dropped. Fine for v1, note for future commands
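
The cap suggested in item 2 could look like the sketch below. It is illustrative only: the 5-second base delay is an assumption (it is the value for which 2 ** 10 works out to roughly 85 minutes), and reconnect_delay and _RECONNECT_MAX_DELAY are hypothetical names rather than identifiers from gateway.py.

```python
# Assumed base delay: 5 s * 2**10 = 5120 s, roughly 85 minutes when uncapped.
_RECONNECT_BASE_DELAY = 5.0
# Hypothetical cap, following the review suggestion of 60 s.
_RECONNECT_MAX_DELAY = 60.0


def reconnect_delay(attempt: int) -> float:
    """Return the delay in seconds before reconnect attempt `attempt` (0-based)."""
    uncapped = _RECONNECT_BASE_DELAY * (2 ** attempt)
    return min(uncapped, _RECONNECT_MAX_DELAY)


# Delays for the first six attempts under the assumptions above.
delays = [reconnect_delay(n) for n in range(6)]
```

With these values the delay doubles until it reaches the cap and then stays there for every later attempt.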

Test Coverage

12 tests covering protobuf round-trip, entity mapping, plan serialization, command format, EMS entities, and JWT token handling. Good coverage for v1.

🤖 Generated with Claude Code

@springfall2008
Owner

From an infra perspective, the 'proto' directory won't be downloaded by default by the installer.
Also, there's a new library here from Google; do we know how big it is?
It might make sense to guard this in some way so that it's safe if the library is missing.

@mgazza
Collaborator Author

mgazza commented Mar 14, 2026

The OSS community can't really use this standalone, so I'm not sure what we should do about this.

@mgazza
Collaborator Author

mgazza commented Mar 14, 2026

Good points, let me address each:

Proto directory: The proto/ dir contains pre-compiled Python bindings (just .py files) — no .proto compilation needed at runtime. We can move them into the apps/predbat/ tree so they ship with the standard install, or alternatively bundle them directly inside gateway.py since there are only two message types.

Protobuf library size: protobuf (google-protobuf) is ~1.8MB installed. It's a pure-Python package with no native compilation needed. That said, if size is a concern we could use lightweight alternatives like proto-plus or even hand-roll the decode since our messages are simple (2 message types, ~15 fields total).

Import guard: Agreed — I'll wrap the protobuf import in a try/except so if the library isn't installed the component simply doesn't register. PredBat continues to work normally without it. This is SaaS-only functionality anyway so OSS users won't be affected.

I'll push the import guard fix shortly.
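
One way the import guard described above might be written is sketched here. It is not the code that was pushed: optional_import is a hypothetical helper, and catching Exception rather than only ImportError is a design assumption, chosen so that a module which is present but fails while initialising also degrades gracefully.

```python
import importlib


def optional_import(module_name: str):
    """Import a module by name, returning None if it is missing or fails to load."""
    try:
        return importlib.import_module(module_name)
    except Exception:  # ImportError, plus anything raised while the module initialises
        return None


# "proto.gateway_status_pb2" is the module path used in this PR. The result is None
# wherever the bindings or the protobuf runtime are unavailable.
pb = optional_import("proto.gateway_status_pb2")
HAS_PROTOBUF = pb is not None
```

Callers can then check HAS_PROTOBUF before registering the component, leaving the rest of PredBat unaffected when the flag is False.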

self.set_state_wrapper(f"sensor.{pfx}_battery_current", bat.current_a)
self.set_state_wrapper(f"sensor.{pfx}_battery_temperature", bat.temperature_c)
if bat.capacity_wh:
    self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", bat.capacity_wh)
Collaborator Author

soc_max unit mismatch: this injects raw Wh from the proto (e.g. 15970) but PredBat's soc_max expects kWh (e.g. 15.97) when soc_units: "%". The energy counter entities already divide by 1000 — this needs the same.

Suggested change
-    self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", bat.capacity_wh)
+    self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", round(bat.capacity_wh / 1000.0, 2))

@mgazza
Collaborator Author

mgazza commented Mar 16, 2026

Code review

Found 1 issue:

  1. battery.rate_max_w is referenced in ENTITY_MAP, _inject_inverter_entities, and automatic_config, but the BatteryStatus proto message only defines 8 fields (soc_percent through capacity_wh) and has no rate_max_w field. In proto3, a defined but unset scalar field reads as 0, whereas a field missing from the schema does not exist on the generated Python message at all, so the if bat.rate_max_w > 0 check can never succeed and the battery rate max entity/arg is never populated from live gateway data.

"battery.capacity_wh": "predbat_gateway_battery_capacity",
"battery.rate_max_w": "predbat_gateway_battery_rate_max",
# Power flows

Proto schema (no rate_max_w field):

// Proto3 zero-value omission means unpopulated future fields cost nothing on the wire.
// Mirrors InverterType enum in types.h (values must match 1:1 for safe casting)
enum InverterType {
INVERTER_TYPE_UNKNOWN = 0;
INVERTER_TYPE_SOLIS_HYBRID = 1;
INVERTER_TYPE_SOLIS_AC = 2;
INVERTER_TYPE_SOFAR_G3 = 3;
INVERTER_TYPE_GROWATT_SPH = 4;
INVERTER_TYPE_DEYE_SUNSYNK = 5;
INVERTER_TYPE_GIVENERGY = 6;

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@mgazza
Collaborator Author

mgazza commented Mar 16, 2026

Code review

Found 3 issues:

  1. soc_max injected as Wh instead of kWh -- _inject_inverter_entities stores bat.capacity_wh raw (e.g. 9500 for a 9.5 kWh battery) into the battery_capacity entity. automatic_config then registers this entity as soc_max, but PredBat's core engine treats soc_max as kWh. This makes the battery appear 1000x larger than it is, breaking all charge/discharge calculations. The energy counter entities in the same file correctly convert Wh / 1000.0 -- battery capacity should do the same.

if bat.capacity_wh:
    self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", bat.capacity_wh)
if bat.soh_percent > 0:

  2. Protobuf version mismatch raises RuntimeError, not ImportError -- The generated gateway_status_pb2.py calls ValidateProtobufRuntimeVersion(..., 6, 33, 4, ...) which raises a RuntimeError (or VersionError) if the installed protobuf version doesn't match. Both gateway.py (line 24) and components.py (line 40) only catch ImportError. If a user has protobuf installed at the wrong version, the uncaught exception crashes PredBat startup for all users, not just gateway users. Fix: broaden the handler to except Exception (which already includes ImportError), or catch the version-mismatch exception explicitly alongside ImportError.

try:
    from proto import gateway_status_pb2 as pb
    HAS_PROTOBUF = True
except ImportError:
    pb = None
    HAS_PROTOBUF = False

try:
    from gateway import GatewayMQTT
    HAS_GATEWAY = True
except ImportError:
    HAS_GATEWAY = False
    GatewayMQTT = None

  3. Token refresh fires every 60 seconds when JWT parse fails -- When extract_jwt_expiry() returns 0 (malformed token), mqtt_token_expires_at stays 0 (falsy). The early-return condition if self.mqtt_token_expires_at and not self.token_needs_refresh(...) short-circuits, skipping the return. The method falls through to the HTTP refresh call on every 60-second housekeeping cycle indefinitely. Fix: track whether extraction was already attempted, or set a sentinel value on parse failure.

# Extract expiry from JWT if not yet known
if not self.mqtt_token_expires_at and self.mqtt_token:
    self.mqtt_token_expires_at = self.extract_jwt_expiry(self.mqtt_token)
if self.mqtt_token_expires_at and not self.token_needs_refresh(self.mqtt_token_expires_at):
    return
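
The sentinel approach proposed for the token refresh issue can be sketched as follows. The method names mirror the snippet above, but everything else is an assumption made for illustration: the TokenState class, the _EXPIRY_UNPARSEABLE value, the 300-second threshold, the refresh_calls counter standing in for the HTTP request, and the choice to attempt exactly one refresh after a parse failure.

```python
import base64
import json
import time

_EXPIRY_UNPARSEABLE = -1  # hypothetical sentinel: extraction was tried and failed


def extract_jwt_expiry(token: str) -> int:
    """Return the `exp` claim of a JWT, or 0 if the token cannot be parsed."""
    try:
        payload_b64 = token.split(".")[1]
        payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return int(payload.get("exp", 0))
    except Exception:
        return 0


class TokenState:
    """Hypothetical stand-in for the component's token bookkeeping."""

    def __init__(self, token: str):
        self.mqtt_token = token
        self.mqtt_token_expires_at = 0
        self.refresh_calls = 0  # counts what would be HTTP refresh requests

    def token_needs_refresh(self, expires_at: int, threshold_s: int = 300) -> bool:
        return expires_at - time.time() < threshold_s

    def check_token_refresh(self) -> None:
        if self.mqtt_token_expires_at == _EXPIRY_UNPARSEABLE:
            return  # already failed to parse once; do not refresh on every cycle
        if self.mqtt_token_expires_at == 0 and self.mqtt_token:
            expiry = extract_jwt_expiry(self.mqtt_token)
            if expiry == 0:
                self.mqtt_token_expires_at = _EXPIRY_UNPARSEABLE
                self.refresh_calls += 1  # one attempt to obtain a parseable token
                return
            self.mqtt_token_expires_at = expiry
        if not self.token_needs_refresh(self.mqtt_token_expires_at):
            return
        self.refresh_calls += 1
```

A real implementation would reset mqtt_token_expires_at to 0 whenever a new token is stored, so the sentinel only suppresses repeated refreshes for the same unparseable token.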

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@mgazza
Collaborator Author

mgazza commented Mar 20, 2026

Bug: test asserts raw Wh instead of converted kWh for battery_capacity

_inject_inverter_entities correctly converts capacity_wh to kWh:

self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", round(bat.capacity_wh / 1000.0, 2))

But the test asserts the unconverted value:

assert entities["predbat_gateway_battery_capacity"] == 9500  # should be 9.5

This test will fail. Fix: assert entities["predbat_gateway_battery_capacity"] == 9.5

@mgazza
Collaborator Author

mgazza commented Mar 23, 2026

Code review

Found 6 issues:

  1. Empty plan early-return prevents gateway schedule clearing. _on_plan_executed returns early when both charge_windows and export_windows are empty, but never publishes an empty ExecutionPlan to clear the retained MQTT schedule. The gateway keeps executing the last published plan even when PredBat decides no action is needed.

client_id = f"predbat-{self.gateway_device_id}-{uuid.uuid4().hex[:8]}"
async with aiomqtt.Client(
    hostname=self.mqtt_host,

  2. Proto3 HasField() on singular message fields will raise ValueError at runtime. inv.HasField("battery"), inv.HasField("schedule"), and inv.HasField("energy") are called on regular embedded message fields in a proto3 schema. In proto3, HasField() is only valid for optional fields or oneof members. These calls will crash _inject_inverter_entities() on every telemetry receipt.

self.set_arg("export_today", [f"sensor.{base0}_export_today"])
self.set_arg("load_today", [f"sensor.{base0}_load_today"])
# Battery health (first inverter)
self.set_arg("battery_temperature_history", f"sensor.{base0}_battery_temperature")
self.set_arg("battery_scaling", [f"sensor.{base0}_battery_dod"])
# Battery rate max
rate_max = inverters[0].battery.rate_max_w
if rate_max > 0:
    self.set_arg("battery_rate_max", [f"sensor.{base0}_battery_rate_max"])
else:
    self.log("Warn: GatewayMQTT: no battery_rate_max from gateway, using default 6000W")
    self.set_arg("battery_rate_max", [6000])
# Inverter time (clock drift detection — uses GatewayStatus.timestamp)
self.set_arg("inverter_time", [f"sensor.{base0}_inverter_time"])
# EMS aggregate entities (GivEnergy EMS only)
inv0 = inverters[0]
if inv0.type == pb.INVERTER_TYPE_GIVENERGY_EMS and inv0.ems.num_inverters > 0:
    pfx = f"{self.prefix}_gateway"
    self.set_arg("ems_total_soc", f"sensor.{pfx}_ems_total_soc")
    self.set_arg("ems_total_charge", f"sensor.{pfx}_ems_total_charge")
    self.set_arg("ems_total_discharge", f"sensor.{pfx}_ems_total_discharge")
    self.set_arg("ems_total_grid", f"sensor.{pfx}_ems_total_grid")
    self.set_arg("ems_total_pv", f"sensor.{pfx}_ems_total_pv")
    self.set_arg("ems_total_load", f"sensor.{pfx}_ems_total_load")
# EMS idle slots (discharge pause windows) — use same slot entities for all inverters
self.set_arg(
"idle_start_time",
[f"select.{base0}_discharge_slot1_start" for _ in range(num_inverters)],

  3. depth_of_discharge_pct does not exist in BatteryStatus proto. inv.battery.depth_of_discharge_pct is referenced but the field is not defined in gateway_status.proto. The proto defines: soc_percent, voltage_v, current_a, power_w, temperature_c, soh_percent, cycle_count, capacity_wh. Accessing a non-existent attribute on a proto3 generated message raises AttributeError.

self.set_arg("ems_total_charge", f"sensor.{pfx}_ems_total_charge")
self.set_arg("ems_total_discharge", f"sensor.{pfx}_ems_total_discharge")
self.set_arg("ems_total_grid", f"sensor.{pfx}_ems_total_grid")
self.set_arg("ems_total_pv", f"sensor.{pfx}_ems_total_pv")

  4. Charge/discharge rate conversion uses * 1000 instead of * MINUTE_WATT. battery_rate_max_charge is stored as kWh/min (divided by MINUTE_WATT = 60000). Converting back to watts requires * 60000, not * 1000. The current code produces rates 60x too small (e.g. 3kW battery reports 50W to the gateway).

    self.fetch_inverter_data()
    status, status_extra = self.execute_plan()
else:
    self.log("Will not recompute the plan, it is {} minutes old and max age is {} minutes".format(dp1(plan_age_minutes), self.calculate_plan_every))

  5. test_gateway.py not registered in CI test runner. Tests are pytest-style but unit_test.py TEST_REGISTRY is not updated. The repo runs tests via coverage/run_all -> unit_test.py, not pytest. These tests will not execute in CI. Same issue was flagged and required as a fix for PR feat: add KrakenAPI component for EDF/E.ON tariff discovery #3635.

"""Tests for GatewayMQTT component."""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from proto import gateway_status_pb2 as pb
import importlib.util

  6. update_success_timestamp() only called on telemetry receipt. If the ESP32 device goes offline for 60+ minutes, Components.is_alive() will mark the component dead even though the MQTT connection is healthy and run() returns True. Should also update on successful connection/housekeeping, not just telemetry. Same pattern was required for PR feat: add KrakenAPI component for EDF/E.ON tariff discovery #3635.

if grid.voltage_v:
    self.set_state_wrapper(f"sensor.{pfx}_grid_voltage", grid.voltage_v)
if grid.frequency_hz:
    self.set_state_wrapper(f"sensor.{pfx}_grid_frequency", grid.frequency_hz)
self.set_state_wrapper(f"sensor.{pfx}_load_power", inv.load.power_w)
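
The arithmetic behind the charge/discharge rate conversion issue above can be checked with a short worked example. MINUTE_WATT = 60000 and the 3 kW figure come from the review; the two helper function names are hypothetical.

```python
MINUTE_WATT = 60 * 1000  # watts corresponding to a rate of 1 kWh per minute


def watts_to_kwh_per_min(power_w: float) -> float:
    """Convert a power in watts to the kWh/min form used for stored rates."""
    return power_w / MINUTE_WATT


def kwh_per_min_to_watts(rate_kwh_per_min: float) -> float:
    """Convert a stored kWh/min rate back to watts."""
    return rate_kwh_per_min * MINUTE_WATT


stored = watts_to_kwh_per_min(3000)      # a 3 kW battery stored as kWh/min
wrong_w = stored * 1000                  # the flagged conversion, about 50 W
right_w = kwh_per_min_to_watts(stored)   # the corrected conversion, about 3000 W
```

The ratio between the two results is MINUTE_WATT / 1000 = 60, which matches the "60x too small" figure in the review.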

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

mgazza and others added 3 commits March 23, 2026 20:33
…xecution

Add Python protobuf definitions matching the ESP32 firmware proto schema:
- GatewayStatus with device diagnostics (uptime, RSSI, heap, log level)
- BatteryStatus including depth_of_discharge_pct and rate_max_w
- InverterEntry with managed flag, schedule, EMS, and energy counters
- ExecutionPlan for cloud-to-gateway schedule publishing

Proto is synced with src/predbat-gateway/include/proto/gateway_status.proto.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

GatewayMQTT is a ComponentBase-derived component that bridges ESP32
gateway devices to PredBat via MQTT. Key capabilities:

- MQTT connection with TLS, auto-reconnect, and exponential backoff
- Protobuf telemetry decoding → Home Assistant entity injection
- Per-inverter entity mapping with auto-config from gateway status
- EMS multi-inverter aggregate entity support (GivEnergy EMS)
- JWT token refresh via oauth-refresh edge function
- Execution plan publishing (charge/export windows → protobuf → MQTT)
- Plan deduplication and periodic re-publish for reliability
- Proto3-compatible sub-message checks using ByteSize() > 0
- Battery DoD from firmware telemetry with apps.yaml fallback
- Correct charge/discharge rate conversion using MINUTE_WATT

Registers GWMQTT inverter type and GatewayMQTT in COMPONENT_LIST
with all required config args wired via set_arg().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Comprehensive tests covering:
- Protobuf encode/decode roundtrip and entity mapping
- Plan serialization (entries, empty plan, version monotonicity)
- MQTT command format (mode, charge rate, reserve, schedule slots)
- EMS aggregate entity generation for multi-inverter setups
- JWT token expiry extraction and refresh threshold logic
- Plan hook conversion (charge/export windows, caps, edge cases)
- Empty window plan publishing (clears gateway schedule)

Tests work both with pytest (local dev) and without (CI runner):
- pytest.approx replaced with approx_equal() helper
- pytest import made conditional
- run_gateway_tests() wrapper registered in unit_test.py TEST_REGISTRY
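
A test module that runs both with and without pytest, as the commit message describes, might be laid out like the sketch below. Only the names approx_equal and run_gateway_tests come from the commit message; the signatures, the tolerances, the example test, and the convention of returning True on failure are assumptions.

```python
try:
    import pytest  # optional: only needed when running under pytest locally
except ImportError:
    pytest = None


def approx_equal(actual, expected, rel=1e-6, abs_tol=1e-12):
    """Stand-in for pytest.approx: True when the two numbers agree within tolerance."""
    return abs(actual - expected) <= max(rel * abs(expected), abs_tol)


def test_capacity_conversion():
    # Hypothetical test body: 9500 Wh should be reported as 9.5 kWh.
    assert approx_equal(round(9500 / 1000.0, 2), 9.5)


def run_gateway_tests(my_predbat=None):
    """Wrapper for a plain test runner; assumed to return True if any test failed."""
    failed = False
    for test in (test_capacity_conversion,):
        try:
            test()
        except AssertionError:
            failed = True
    return failed
```

Because the test functions use bare assert statements, pytest can still collect them directly, while the wrapper lets a registry-based runner call them without pytest installed.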

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@mgazza mgazza force-pushed the feat/gateway-mqtt-component branch from 1948a9f to 1505651 Compare March 23, 2026 20:36

springfall2008 and others added 3 commits March 24, 2026 19:46
GivEnergy Cloud returns "24:00:00" for discharge_end_time (meaning
midnight/end of day). This is valid ISO 8601 but crashes strptime.
Normalise to "00:00:00" before parsing.
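
The normalisation described above can be sketched in a few lines. parse_time_of_day is a hypothetical helper name, and the "%H:%M:%S" format is assumed from the example value; strptime rejects an hour of 24 because %H only accepts 00 to 23.

```python
from datetime import datetime, time


def parse_time_of_day(value: str) -> time:
    """Parse an HH:MM:SS string, treating "24:00:00" as midnight."""
    if value == "24:00:00":
        value = "00:00:00"  # normalise end-of-day before strptime sees it
    return datetime.strptime(value, "%H:%M:%S").time()
```

Note that this maps end of day onto the start of the same day, so any code comparing start and end times still needs to treat a midnight end time as wrapping past the start.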

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>