diff --git a/openevsehttp/client.py b/openevsehttp/client.py index 9901293..5daf0dc 100644 --- a/openevsehttp/client.py +++ b/openevsehttp/client.py @@ -6,7 +6,6 @@ import inspect import json import logging -import re import threading from collections.abc import Callable, Mapping from typing import Any @@ -31,6 +30,7 @@ from .managers import ManagersMixin from .properties import PropertiesMixin from .sensors import SensorsMixin +from .utils import get_awesome_version from .websocket import ( SIGNAL_CONNECTION_STATE, STATE_CONNECTED, @@ -441,38 +441,18 @@ def _version_check(self, min_version: str, max_version: str = "") -> bool: _LOGGER.warning("Unable to find firmware version.") return False cutoff = AwesomeVersion(min_version) - current = "" limit = "" if max_version != "": limit = AwesomeVersion(max_version) - firmware_filtered = None - firmware_search = re.search(r"\d+\.\d+\.\d+", self._config["version"]) - if firmware_search: - firmware_filtered = firmware_search.group(0) - - if firmware_filtered is None: + current = get_awesome_version(self._config["version"]) + if current.strategy == "unknown": _LOGGER.warning( "Non-standard versioning string: %s", self._config["version"] ) _LOGGER.debug("Non-semver firmware version detected.") return False - _LOGGER.debug("Detected firmware: %s", self._config["version"]) - _LOGGER.debug("Filtered firmware: %s", firmware_filtered) - - if "dev" in self._config["version"]: - value = self._config["version"] - _LOGGER.debug("Stripping 'dev' from version.") - value = value.split(".") - value = ".".join(value[0:3]) - elif "master" in self._config["version"]: - value = "dev" - else: - value = firmware_filtered - - current = AwesomeVersion(value) - if limit: try: if cutoff <= current < limit: diff --git a/openevsehttp/commands.py b/openevsehttp/commands.py index 4ff0e65..b3283c9 100644 --- a/openevsehttp/commands.py +++ b/openevsehttp/commands.py @@ -14,6 +14,7 @@ from .const import MAX_AMPS, MIN_AMPS, RAPI_ERRORS, divert_mode from .exceptions import UnknownError, UnsupportedFeature +from .utils import get_awesome_version _LOGGER = logging.getLogger(__name__) @@ -364,22 +365,10 @@ async def firmware_check(self) -> dict | None: method = "get" cutoff = AwesomeVersion("3.0.0") - current = "" - _LOGGER.debug("Detected firmware: %s", self._config["version"]) - if "dev" in self._config["version"]: - value = self._config["version"] - _LOGGER.debug("Stripping 'dev' from version.") - value = value.split(".") - value = ".".join(value[0:3]) - elif "master" in self._config["version"]: - value = "dev" - else: - value = self._config["version"] - - _LOGGER.debug("Using version: %s", value) - current = AwesomeVersion(value) + current = get_awesome_version(self._config["version"]) + _LOGGER.debug("Using version: %s", current) try: if current >= cutoff: diff --git a/openevsehttp/properties.py b/openevsehttp/properties.py index a658f34..3025397 100644 --- a/openevsehttp/properties.py +++ b/openevsehttp/properties.py @@ -9,6 +9,7 @@ from .const import MAX_AMPS, MIN_AMPS, states from .exceptions import UnsupportedFeature +from .utils import normalize_version _LOGGER = logging.getLogger(__name__) @@ -130,10 +131,8 @@ def max_current(self) -> int | None: def wifi_firmware(self) -> str | None: """Return the ESP firmware version.""" value = self._config.get("version") - if value is not None and "dev" in value: - _LOGGER.debug("Stripping 'dev' from version.") - value = value.split(".") - value = ".".join(value[0:3]) + if value is not None: + value = normalize_version(value) return value @property diff --git a/openevsehttp/utils.py b/openevsehttp/utils.py new file mode 100644 index 0000000..8c8e30d --- /dev/null +++ b/openevsehttp/utils.py @@ -0,0 +1,29 @@ +"""Utility functions for python-openevse-http.""" + +import logging +import re + +from awesomeversion import AwesomeVersion + +_LOGGER = logging.getLogger(__name__) + + +def normalize_version(version: str) -> str: + """Normalize the version string to strip 'dev' tag.""" + if "dev" in version: + _LOGGER.debug("Stripping 'dev' from version.") + value = version.split(".") + return ".".join(value[0:3]) + return version + + +def get_awesome_version(version: str) -> AwesomeVersion: + """Parse and normalize the version string, returning an AwesomeVersion.""" + if "master" in version: + version = "dev" + value = normalize_version(version) + if "dev" not in version: + firmware_search = re.search(r"\d+\.\d+\.\d+", value) + if firmware_search: + value = firmware_search.group(0) + return AwesomeVersion(value)