Skip to content

Commit 49dfe6e

Browse files
author
sidey79
committed
feat: Enhance MQTT response parsing and logging
- Updated the response parser to support additional delimiters (`;` and `=`) for better command matching. - Implemented strict prefix matching to prevent incorrect command matches. - Introduced a new logging mechanism to clean payloads by removing preambles before logging. - Added functionality to skip publishing messages with empty or invalid data. - Ensured JSON output for MQTT messages is compact to maintain compatibility with brokers. - Added tests to verify the new parsing logic and ensure robustness against invalid payloads. - Refactored MQTT publisher to handle raw lines and improved error handling for empty payloads.
1 parent d5e3bb6 commit 49dfe6e

15 files changed

Lines changed: 700 additions & 46 deletions

docs/architecture/decisions/ADR-004-mqtt-response-parsing.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ Dies stellt sicher, dass alle erfolgreichen `GET` Anfragen über MQTT eine struk
2424

2525
=== Detaillierte Logik-Anpassungen
2626

27-
1. **`get_config` (CG):**
27+
1. **Erweitertes Parsing (Update 2026-01-13):**
28+
* Der Response-Parser unterstützt nun `;` und `=` als Trennzeichen zusätzlich zu Leerzeichen (z.B. für `SR;R=...`).
29+
* Es wird ein striktes Prefix-Matching erzwungen, um zu verhindern, dass Befehls-Prefixe längere Nachrichtentypen matchen (z.B. darf Befehl `M` nicht `MN;...` matchen).
30+
31+
2. **`get_config` (CG):**
2832
* Wird eine private Hilfsfunktion `_parse_decoder_config(response: str) -> Dict[str, int]` in [`signalduino/commands.py`](signalduino/commands.py) implementiert.
2933
* Diese Funktion parst den `key=value;` String in ein Dictionary (z.B. `{'MS': 1, 'MU': 1, 'MC': 1, 'Mred': 1}`).
3034
* Der Rückgabetyp von `get_config` wird von `str` auf `Dict[str, int]` geändert.

docs/architecture/decisions/ADR-006-json-output-schema.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ Wir werden das JSON-Output-Schema von `DecodedMessage` wie folgt anpassen:
1717

1818
Die Umbenennung des Nutzdatenfeldes von `payload` zu `data` sowie die Einführung des Feldes `raw` (für die ursprüngliche Nachricht) sind in link:ADR-007-data-and-raw-fields.adoc[ADR-007] dokumentiert, das dieses Schema ergänzt und präzisiert. Dieses ADR dient als Grundlage für die Einführung des `protocol`-Feldes und die Bereinigung des Nutzdateninhalts.
1919

20+
=== Serialisierungsformat
21+
Der JSON-Output für MQTT-Nutzdaten muss **kompakt** (ohne Zeilenumbrüche und Einrückungen) serialisiert werden, um die Kompatibilität mit MQTT-Brokern und Downstream-Systemen zu gewährleisten, die multiline-Nutzdaten falsch interpretieren könnten. Dies wird durch das Weglassen des `indent`-Parameters beim `json.dumps`-Aufruf in `MqttPublisher` sichergestellt.
22+
2023
=== Details zur neuen Struktur (Präzisiert durch ADR-007)
2124

2225
[cols="1,1,4"]

main.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@
1414
from signalduino.transport import SerialTransport, TCPTransport
1515
from signalduino.types import DecodedMessage, RawFrame # NEU: RawFrame
1616

17+
# NEU: Importiere Protokoll-Handler für Log-Bereinigung
18+
try:
19+
from sd_protocols.loader import _protocol_handler
20+
except ImportError:
21+
_protocol_handler = None
22+
23+
1724
# Konfiguration des Loggings
1825
def initialize_logging(log_level_str: str):
1926
"""Initialisiert das Logging basierend auf dem übergebenen String."""
@@ -39,10 +46,28 @@ def initialize_logging(log_level_str: str):
3946
async def message_callback(message: DecodedMessage):
4047
"""Callback-Funktion, die aufgerufen wird, wenn eine Nachricht dekodiert wurde."""
4148
model = message.metadata.get("model", "Unknown")
49+
50+
# NEU: Bereinige die Payload für die Log-Ausgabe, da der Parser die Preamble möglicherweise nicht entfernt hat
51+
# (oder der Preamble-Eintrag im Protokoll-Handler fehlt).
52+
log_payload = message.data
53+
preamble = ""
54+
55+
if _protocol_handler and message.protocol.get('id'):
56+
try:
57+
protocol_id = message.protocol['id']
58+
# Abrufen der Preamble, falls vorhanden
59+
preamble = _protocol_handler.check_property(protocol_id, 'preamble', '')
60+
except Exception:
61+
logger.debug("Konnte Preamble nicht abrufen für Protokoll %s", message.protocol.get('id'))
62+
63+
if preamble and log_payload.upper().startswith(preamble.upper()):
64+
# Entferne die Preamble aus der Payload für das Logging
65+
log_payload = log_payload[len(preamble):]
66+
4267
logger.info(
4368
f"Decoded message received: protocol={message.protocol_id}, "
4469
f"model={model}, "
45-
f"payload={message.payload}"
70+
f"payload={log_payload}"
4671
)
4772
logger.debug(f"Full Metadata: {message.metadata}")
4873
# NEU: Überprüfe, ob RawFrame vorhanden ist und das Attribut 'line' hat

signalduino/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,7 @@
1717
SDUINO_MC_DISPATCH_LOG_ID = "12.1"
1818
SDUINO_PARSE_DEFAULT_LENGHT_MIN = 8
1919
SDUINO_GET_CONFIGQUERY_DELAY = 0.75
20+
21+
# Protocol Framing
22+
ASCII_STX = "\x02"
23+
ASCII_ETX = "\x03"

signalduino/controller.py

Lines changed: 82 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import asyncio
77
from datetime import datetime, timedelta, timezone
8+
from dataclasses import asdict
89
from typing import Any, Awaitable, Callable, List, Optional, Dict, Tuple, Pattern
910

1011
from .commands import SignalduinoCommands
@@ -250,13 +251,42 @@ async def _parser_task(self) -> None:
250251
# Führe die rechenintensive Parsing-Logik in einem separaten Thread aus.
251252
# Dadurch wird die asyncio-Event-Schleife nicht blockiert.
252253
decoded = await asyncio.to_thread(self.parser.parse_line, line)
253-
if decoded and self.message_callback:
254-
await self.message_callback(decoded[0])
255-
if self.mqtt_publisher and decoded:
256-
# Verwende die neue MqttPublisher.publish(message: DecodedMessage) Signatur
257-
await self.mqtt_publisher.publish(decoded[0])
258-
await self._handle_as_command_response(line)
259-
254+
if decoded:
255+
for message in decoded:
256+
if isinstance(message, DecodedMessage):
257+
# Überspringe die Veröffentlichung, wenn DecodedMessage keine Daten enthält.
258+
# Das Feld 'data' kann leer sein, wenn die Decodierung fehlschlägt (z.B. Checksumme)
259+
# oder ungültige Werte wie '[]' (leere JSON-Liste als String) enthält,
260+
# was auf eine nicht parsbare Nachricht hinweist, die aber als DecodedMessage
261+
# zurückgegeben wurde.
262+
if not message.data or message.data.strip() == "[]":
263+
self.logger.info("Skipping decoded message with empty/invalid data for protocol %s: %s",
264+
message.protocol.get('id', 'N/A'), message)
265+
continue
266+
267+
if self.message_callback:
268+
try:
269+
await self.message_callback(message)
270+
except Exception as exc:
271+
self.logger.error("Error in message callback: %s", exc)
272+
273+
if self.mqtt_publisher:
274+
try:
275+
# message is a DecodedMessage dataclass, pass directly to publish
276+
# The MqttPublisher handles serialization.
277+
await self.mqtt_publisher.publish(message)
278+
except Exception as exc:
279+
self.logger.error("Error publishing message to MQTT: %s", exc)
280+
matched_cmd = False
281+
if not decoded:
282+
# Nur die Zeile als Befehlsantwort verarbeiten, wenn sie NICHT erfolgreich als Sensordaten geparst wurde.
283+
matched_cmd = await self._handle_as_command_response(line)
284+
285+
# If line was not parsed as a decoded message and was not a command response,
286+
# publish it as a raw line via MQTT (Problem 2).
287+
if not decoded and not matched_cmd and self.mqtt_publisher and line.strip():
288+
await self.mqtt_publisher.publish_raw_line(line)
289+
260290
# Ensure a minimal yield time for other tasks when the queue is rapidly processed.
261291
await asyncio.sleep(0.01)
262292
except Exception as e:
@@ -357,34 +387,69 @@ async def _send_and_wait(self, command: str, timeout: float, response_pattern: O
357387
raise SignalduinoConnectionError(str(e))
358388
raise
359389

360-
async def _handle_as_command_response(self, line: str) -> None:
361-
"""Check if the received line matches any pending command response."""
390+
async def _handle_as_command_response(self, line: str) -> bool:
391+
"""Check if the received line matches any pending command response.
392+
393+
Returns:
394+
True if a response was matched and processed, False otherwise.
395+
"""
362396
self.logger.debug("Hardware response received: %s", line)
397+
matched = False
398+
363399
async with self._pending_responses_lock:
364400
self.logger.debug(f"Current pending responses: {len(self._pending_responses)}")
365-
for pending in self._pending_responses:
401+
402+
# Use a copy of the list for safe iteration/removal if performance allows.
403+
# Since the number of pending responses is usually small, this is safe.
404+
for pending in list(self._pending_responses):
366405
try:
367-
self.logger.debug(f"Checking pending response for command: {pending.command.payload}. Line: {line.strip()}")
406+
cmd_payload = pending.command.payload
407+
self.logger.debug(f"Checking pending response for command: {cmd_payload}. Line: {line.strip()}")
368408

369409
pattern = pending.command.response_pattern
370410
if pattern:
371411
self.logger.debug(f"Testing pattern: {pattern.pattern}")
372412
if pattern.match(line):
373-
self.logger.debug(f"Matched response pattern for command: {pending.command.payload}")
413+
self.logger.debug(f"Matched response pattern for command: {cmd_payload}")
374414
pending.future.set_result(line)
375415
self._pending_responses.remove(pending)
376-
return
416+
matched = True
417+
break # Exit for-loop
377418

378-
self.logger.debug(f"Testing direct match for: {pending.command.payload}")
379-
if line.startswith(pending.command.payload):
380-
self.logger.debug(f"Matched direct response for command: {pending.command.payload}")
419+
self.logger.debug(f"Testing direct match for: {cmd_payload}")
420+
421+
# Robust check for command match:
422+
# 1. Startswith command payload (e.g., 'V', 'SR')
423+
# 2. Response must be either exact match, or followed by a valid separator
424+
# Separators: space (V command), ';' (SR command), '=' (Config)
425+
# This ensures "V" matches "V 4.0.0", "SR" matches "SR;R=1", but "M" does NOT match "MN..."
426+
if line.startswith(cmd_payload):
427+
if len(line) == len(cmd_payload):
428+
is_direct_match = True
429+
else:
430+
separator = line[len(cmd_payload)]
431+
is_direct_match = separator.isspace() or separator in (';', '=')
432+
else:
433+
is_direct_match = False
434+
435+
if is_direct_match:
436+
self.logger.debug(f"Matched direct response for command: {cmd_payload}")
381437
pending.future.set_result(line)
382438
self._pending_responses.remove(pending)
383-
return
439+
matched = True
440+
break # Exit for-loop
441+
384442
except Exception as e:
385443
self.logger.error(f"Error processing pending response: {e}")
444+
# Remove the potentially failing pending response to prevent further errors
445+
if pending in self._pending_responses:
446+
self._pending_responses.remove(pending)
386447
continue
448+
449+
if not matched:
387450
self.logger.debug("No matching pending response found")
451+
452+
return matched
388453

389454
async def _init_task_start_loop(self) -> None:
390455
"""Main initialization task that handles version check and XQ command."""

signalduino/mqtt.py

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import json
22
import logging
33
import os
4-
from dataclasses import asdict
5-
from typing import Optional, Any, Callable, Awaitable # NEU: Awaitable für async callbacks
4+
from dataclasses import asdict, is_dataclass
5+
from typing import Optional, Any, Callable, Awaitable, Union # NEU: Awaitable für async callbacks
66

77
from .commands import MqttCommandDispatcher, CommandValidationError, SignalduinoCommandTimeout # NEU: Import Dispatcher
88
import aiomqtt as mqtt
@@ -231,8 +231,28 @@ async def _handle_command(self, command_name: str, payload: str) -> None:
231231
)
232232

233233

234-
def _message_to_json(self, message: DecodedMessage) -> str:
235-
"""Serializes a DecodedMessage to a JSON string."""
234+
def _message_to_json(self, message: Union[DecodedMessage, Any]) -> Optional[str]:
235+
"""Serializes a DecodedMessage or other payload to a JSON string."""
236+
237+
# Check if message is a dataclass instance
238+
if not is_dataclass(message):
239+
# If not a dataclass, try to serialize it directly or wrap it
240+
if isinstance(message, dict):
241+
return json.dumps(message)
242+
elif isinstance(message, str):
243+
try:
244+
# Check if it's already valid JSON
245+
json.loads(message)
246+
return message
247+
except json.JSONDecodeError:
248+
# Wrap string in a simple object
249+
return json.dumps({"data": message})
250+
else:
251+
# Fallback for other types
252+
try:
253+
return json.dumps(message)
254+
except (TypeError, ValueError):
255+
return json.dumps({"data": str(message)})
236256

237257
# DecodedMessage uses dataclasses, but RawFrame inside it also uses a dataclass.
238258
# We need a custom serializer to handle nested dataclasses like RawFrame.
@@ -249,11 +269,13 @@ def _raw_frame_to_dict(raw_frame: RawFrame) -> dict:
249269
# Note: 'raw' is now a string (ADR-007) and should be published.
250270
# The pop operation (line 249 in original) is removed to include it.
251271

252-
# Append preamble to data for FHEM compatibility (PreambleProtocolID#HexData)
253272
preamble = ""
254273
if self._protocol_handler:
255274
try:
256-
protocol_id = message.protocol.get('id')
275+
# Use .get only if message has a protocol attribute (it should if it's DecodedMessage)
276+
protocol = getattr(message, 'protocol', {})
277+
protocol_id = protocol.get('id') if isinstance(protocol, dict) else None
278+
257279
if protocol_id:
258280
# check_property returns the value or default
259281
preamble = self._protocol_handler.check_property(protocol_id, 'preamble', '')
@@ -264,9 +286,32 @@ def _raw_frame_to_dict(raw_frame: RawFrame) -> dict:
264286
message_dict["preamble"] = preamble
265287

266288
# Ensure data (formerly payload) is uppercase
267-
message_dict["data"] = message.data.upper()
289+
# Use getattr to be safe even if dataclass structure changed
290+
message_data = getattr(message, 'data', '')
291+
if isinstance(message_data, str):
292+
message_data = message_data.upper()
293+
else:
294+
message_data = str(message_data).upper()
295+
296+
# REMOVE PREAMBLE FROM DATA FIELD IF PRESENT
297+
# This ensures the 'data' field only contains the payload, consistent with ADR-007.
298+
if preamble and message_data.startswith(preamble.upper()):
299+
message_data = message_data[len(preamble):]
300+
301+
# NEU: Fehlerbehebung für ungültige Parser-Rückgaben (dmsg=[])
302+
# Wenn der Parser eine leere Liste als String-Literal zurückgibt, wird dies als ungültiger
303+
# Datenwert interpretiert. Setze auf leeren String.
304+
if message_data.strip() == "[]":
305+
message_data = ""
306+
self.logger.warning("Invalid data '[]' received from parser, setting to empty string.")
268307

269-
return json.dumps(message_dict, indent=4)
308+
# Check for empty payload to prevent sending empty messages
309+
if not message_data:
310+
return None
311+
312+
message_dict["data"] = message_data
313+
314+
return json.dumps(message_dict)
270315

271316
async def publish_simple(self, subtopic: str, payload: str, retain: bool = False) -> None:
272317
"""Publishes a simple string payload to a subtopic of the main topic."""
@@ -276,11 +321,19 @@ async def publish_simple(self, subtopic: str, payload: str, retain: bool = False
276321

277322
try:
278323
topic = f"{self.base_topic}/{subtopic}"
279-
await self.client.publish(topic, payload, retain=retain)
324+
await self.client.publish(topic, payload.encode("utf-8"), retain=retain)
280325
self.logger.debug("Published simple message to %s: %s", topic, payload)
281326
except Exception:
282327
self.logger.error("Failed to publish simple message to %s", subtopic, exc_info=True)
283328

329+
async def publish_raw_line(self, line: str) -> None:
330+
"""Publishes a raw, non-decoded line from the transport (e.g., non-command status messages)."""
331+
await self.publish_simple(
332+
subtopic="state/raw_lines",
333+
payload=json.dumps({"line": line.strip()}),
334+
retain=False
335+
)
336+
284337
async def publish(self, message: DecodedMessage) -> None:
285338
"""Publishes a DecodedMessage."""
286339
if not self.client:
@@ -290,8 +343,14 @@ async def publish(self, message: DecodedMessage) -> None:
290343
try:
291344
topic = f"{self.base_topic}/state/messages"
292345
payload = self._message_to_json(message)
293-
await self.client.publish(topic, payload)
346+
347+
if payload is None:
348+
protocol_id = message.protocol.get('id', 'N/A')
349+
self.logger.debug("Skipping MQTT publish due to empty payload for protocol %s", protocol_id)
350+
return
351+
352+
await self.client.publish(topic, payload.encode("utf-8"))
294353
protocol_id = message.protocol.get('id', 'N/A')
295-
self.logger.debug("Published message for protocol %s to %s", protocol_id, topic)
354+
self.logger.debug("Published message for protocol %s to %s (Payload length: %s)", protocol_id, topic, len(payload))
296355
except Exception:
297356
self.logger.error("Failed to publish message", exc_info=True)

signalduino/parser/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from typing import Optional, List, Tuple
77

88
from ..exceptions import SignalduinoParserError
9+
from ..constants import ASCII_STX, ASCII_ETX
910

10-
_STX_ETX = re.compile(r"^\x02(M[sSuUcCNOo];.*;)\x03$")
11+
_STX_ETX = re.compile(f"^{ASCII_STX}(M[sSuUcCNOo];.*;){ASCII_ETX}$")
1112

1213

1314
def decompress_payload(compressed_payload: str) -> str:

signalduino/parser/mc.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,14 @@ def parse(self, frame: RawFrame) -> Iterable[DecodedMessage]:
100100
"preamble": protocol_data.get("preamble", ""),
101101
}
102102

103-
# 1. Entferne die Preamble aus der Payload
104-
preamble_len = len(protocol_meta["preamble"])
105-
payload = raw_payload[preamble_len:]
103+
# 1. Entferne die Preamble aus der Payload, falls vorhanden.
104+
# Normalisiere Preamble und Payload zur korrekten Erkennung der Groß-/Kleinschreibung.
105+
preamble = protocol_meta["preamble"]
106+
if preamble and raw_payload.upper().startswith(preamble.upper()):
107+
preamble_len = len(preamble)
108+
payload = raw_payload[preamble_len:]
109+
else:
110+
payload = raw_payload
106111

107112
yield DecodedMessage(
108113
data=payload,

signalduino/types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ class DecodedMessage:
3030
metadata: dict = field(default_factory=dict)
3131
protocol: dict = field(default_factory=dict)
3232

33+
@property
34+
def protocol_id(self) -> Optional[str]:
35+
"""Provides backward compatibility for message.protocol_id."""
36+
return self.protocol.get('id')
37+
3338

3439
@dataclass(slots=True)
3540
class QueuedCommand:

0 commit comments

Comments
 (0)