Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 236 additions & 18 deletions snap7/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"""

import logging
import random
import struct
import threading
import time
from typing import List, Any, Optional, Tuple, Union, Callable, cast
from datetime import datetime
Expand Down Expand Up @@ -58,12 +60,33 @@ class Client(ClientMixin):

MAX_VARS = 20 # Max variables per multi-read/multi-write request

def __init__(self, lib_location: Optional[str] = None, **kwargs: Any):
def __init__(
self,
lib_location: Optional[str] = None,
*,
auto_reconnect: bool = False,
max_retries: int = 3,
retry_delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 30.0,
heartbeat_interval: float = 0,
on_disconnect: Optional[Callable[[], None]] = None,
on_reconnect: Optional[Callable[[], None]] = None,
**kwargs: Any,
):
"""
Initialize S7 client.

Args:
lib_location: Ignored. Kept for backwards compatibility.
auto_reconnect: Enable automatic reconnection on connection loss.
max_retries: Maximum number of reconnection attempts.
retry_delay: Initial delay between reconnection attempts in seconds.
backoff_factor: Multiplier for exponential backoff between retries.
max_delay: Maximum delay between reconnection attempts in seconds.
heartbeat_interval: Interval in seconds for heartbeat probes (0=disabled).
on_disconnect: Optional callback invoked when connection is lost.
on_reconnect: Optional callback invoked after successful reconnection.
**kwargs: Ignored. Kept for backwards compatibility.
"""
self.connection: Optional[ISOTCPConnection] = None
Expand Down Expand Up @@ -107,8 +130,37 @@ def __init__(self, lib_location: Optional[str] = None, **kwargs: Any):
self._last_error = 0
self._exec_time = 0

# Auto-reconnection settings
self._auto_reconnect = auto_reconnect
self._max_retries = max_retries
self._retry_delay = retry_delay
self._backoff_factor = backoff_factor
self._max_delay = max_delay
self._on_disconnect = on_disconnect
self._on_reconnect = on_reconnect

# Heartbeat settings
self._heartbeat_interval = heartbeat_interval
self._heartbeat_thread: Optional[threading.Thread] = None
self._heartbeat_stop_event = threading.Event()
self._is_alive = False

# Lock for thread safety during reconnection
self._reconnect_lock = threading.Lock()

logger.info("S7Client initialized (pure Python implementation)")

@property
def is_alive(self) -> bool:
    """Report connection liveness.

    When heartbeat probing is disabled (interval <= 0) this simply
    mirrors ``connected``; otherwise it reflects the outcome of the
    most recent heartbeat probe.
    """
    heartbeat_enabled = self._heartbeat_interval > 0
    return self._is_alive if heartbeat_enabled else self.connected

def _get_connection(self) -> ISOTCPConnection:
"""Get connection, raising if not connected."""
if self.connection is None:
Expand Down Expand Up @@ -150,6 +202,152 @@ def _send_receive(self, request: bytes, max_stale_retries: int = 3) -> dict[str,

raise S7ProtocolError("Failed to receive valid response") # Should not reach here

def _send_receive_with_reconnect(self, request_builder: Callable[[], bytes], max_stale_retries: int = 3) -> dict[str, Any]:
"""Send a request with automatic reconnection on connection loss.

If auto_reconnect is disabled, behaves identically to _send_receive.
When enabled, catches connection errors, reconnects, rebuilds the request
(since the protocol sequence counter may have changed), and retries.

Args:
request_builder: Callable that builds the request bytes. Called again
after reconnection to get a fresh request with updated sequence.
max_stale_retries: Max times to retry receive on stale packets.

Returns:
Parsed S7 response dict.
"""
try:
return self._send_receive(request_builder(), max_stale_retries)
except (S7ConnectionError, OSError) as e:
if not self._auto_reconnect:
raise
logger.warning(f"Connection lost during operation: {e}")
self._do_reconnect()
return self._send_receive(request_builder(), max_stale_retries)

def _do_reconnect(self) -> None:
    """Perform reconnection with exponential backoff and jitter.

    Serialized by ``_reconnect_lock`` so concurrent callers (worker
    threads plus the heartbeat thread) cannot race to rebuild the
    connection.  The ``on_disconnect`` callback fires once the loss is
    confirmed; ``on_reconnect`` fires after a successful attempt.

    NOTE(review): backoff sleeps happen while holding the lock, so other
    threads needing to reconnect block for the whole retry sequence —
    confirm this is the intended behavior.

    Raises:
        S7ConnectionError: If all reconnection attempts fail.
    """
    with self._reconnect_lock:
        # Check if another thread already reconnected while we waited
        # for the lock; if the link checks out, there is nothing to do.
        if self.connected and self.connection is not None:
            try:
                # presumably a lightweight liveness probe — verify
                # ISOTCPConnection.check_connection semantics.
                if self.connection.check_connection():
                    return
            except Exception:
                # A failing probe just means we proceed to reconnect.
                pass

        self._is_alive = False
        if self._on_disconnect is not None:
            try:
                self._on_disconnect()
            except Exception:
                # User callbacks must never abort the reconnect sequence.
                logger.debug("on_disconnect callback raised an exception", exc_info=True)

        delay = self._retry_delay
        last_error: Optional[Exception] = None

        for attempt in range(1, self._max_retries + 1):
            logger.info(f"Reconnection attempt {attempt}/{self._max_retries}")

            # Clean up old connection before opening a new socket.
            try:
                if self.connection is not None:
                    self.connection.disconnect()
                    self.connection = None
            except Exception:
                # Best-effort teardown; the old socket may already be dead.
                pass
            self.connected = False

            try:
                # Re-establish connection using stored parameters from the
                # original connect() call.
                self.connection = ISOTCPConnection(
                    host=self.host, port=self.port, local_tsap=self.local_tsap, remote_tsap=self.remote_tsap
                )
                self.connection.connect()

                # Re-create protocol to reset sequence counters; in-flight
                # request bytes built before this point are stale.
                self.protocol = S7Protocol()
                self._setup_communication()

                self.connected = True
                self._is_alive = True
                logger.info(f"Reconnected to {self.host}:{self.port}")

                if self._on_reconnect is not None:
                    try:
                        self._on_reconnect()
                    except Exception:
                        logger.debug("on_reconnect callback raised an exception", exc_info=True)
                return
            except Exception as e:
                last_error = e
                logger.warning(f"Reconnection attempt {attempt} failed: {e}")

            if attempt < self._max_retries:
                # Exponential backoff with up to 10% jitter, both the per-wait
                # sleep and the growing base delay capped at _max_delay.
                jitter = random.uniform(0, delay * 0.1)
                sleep_time = min(delay + jitter, self._max_delay)
                logger.debug(f"Waiting {sleep_time:.2f}s before next attempt")
                time.sleep(sleep_time)
                delay = min(delay * self._backoff_factor, self._max_delay)

        raise S7ConnectionError(f"Reconnection failed after {self._max_retries} attempts: {last_error}")

def _start_heartbeat(self) -> None:
"""Start the heartbeat background thread."""
if self._heartbeat_interval <= 0:
return

self._heartbeat_stop_event.clear()
self._is_alive = True
self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True, name="s7-heartbeat")
self._heartbeat_thread.start()
logger.debug(f"Heartbeat started with interval {self._heartbeat_interval}s")

def _stop_heartbeat(self) -> None:
"""Stop the heartbeat background thread."""
self._heartbeat_stop_event.set()
if self._heartbeat_thread is not None:
self._heartbeat_thread.join(timeout=self._heartbeat_interval + 2)
self._heartbeat_thread = None
logger.debug("Heartbeat stopped")

def _heartbeat_loop(self) -> None:
    """Background loop that periodically probes the PLC connection.

    Runs on the daemon "s7-heartbeat" thread and exits as soon as
    ``_heartbeat_stop_event`` is set.  Each cycle either (a) notices the
    client is disconnected and, when auto-reconnect is enabled, tries to
    restore the link, or (b) issues a probe request and records the
    outcome in ``_is_alive``.
    """
    while not self._heartbeat_stop_event.is_set():
        # Sleep between probes; wait() returns True only when the stop
        # event is set, which doubles as a prompt-shutdown signal.
        if self._heartbeat_stop_event.wait(timeout=self._heartbeat_interval):
            break  # Stop event was set

        if not self.connected:
            # Known-disconnected: skip probing, optionally try to recover.
            self._is_alive = False
            if self._auto_reconnect:
                try:
                    self._do_reconnect()
                except S7ConnectionError:
                    logger.warning("Heartbeat reconnection failed")
            continue

        try:
            # Hold the reconnect lock so the probe cannot interleave with
            # a reconnection in progress; re-check state under the lock.
            with self._reconnect_lock:
                if self.connected and self.connection is not None:
                    # get_cpu_state() serves as a cheap round-trip probe;
                    # its return value is intentionally ignored.
                    self.get_cpu_state()
            self._is_alive = True
        except Exception as e:
            logger.warning(f"Heartbeat probe failed: {e}")
            self._is_alive = False
            self.connected = False

            if self._auto_reconnect:
                # NOTE(review): _do_reconnect re-acquires _reconnect_lock,
                # which is a non-reentrant Lock — it must stay outside the
                # `with` block above to avoid self-deadlock.
                try:
                    self._do_reconnect()
                except S7ConnectionError:
                    logger.warning("Heartbeat reconnection failed")

def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "Client":
"""
Connect to S7 PLC.
Expand Down Expand Up @@ -187,9 +385,13 @@ def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "C
self._setup_communication()

self.connected = True
self._is_alive = True
self._exec_time = int((time.time() - start_time) * 1000)
logger.info(f"Connected to {address}:{tcp_port} rack {rack} slot {slot}")

# Start heartbeat if configured
self._start_heartbeat()

except Exception as e:
self.disconnect()
if isinstance(e, S7Error):
Expand All @@ -205,11 +407,15 @@ def disconnect(self) -> int:
Returns:
0 on success
"""
# Stop heartbeat first
self._stop_heartbeat()

if self.connection:
self.connection.disconnect()
self.connection = None

self.connected = False
self._is_alive = False
logger.info(f"Disconnected from {self.host}:{self.port}")
return 0

Expand Down Expand Up @@ -359,11 +565,13 @@ def read_area(self, area: Area, db_number: int, start: int, size: int, word_len:

max_chunk = self._max_read_size()
if size <= max_chunk:
# Single request
request = self.protocol.build_read_request(
area=s7_area, db_number=db_number, start=start, word_len=s7_word_len, count=size
)
response = self._send_receive(request)
# Single request - use reconnect-aware send/receive
def build_request() -> bytes:
return self.protocol.build_read_request(
area=s7_area, db_number=db_number, start=start, word_len=s7_word_len, count=size
)

response = self._send_receive_with_reconnect(build_request)
values = self.protocol.extract_read_data(response, s7_word_len, size)
self._exec_time = int((time.time() - start_time) * 1000)
return bytearray(values)
Expand All @@ -374,10 +582,14 @@ def read_area(self, area: Area, db_number: int, start: int, size: int, word_len:
remaining = size
while remaining > 0:
chunk_size = min(remaining, max_chunk)
request = self.protocol.build_read_request(
area=s7_area, db_number=db_number, start=start + offset, word_len=s7_word_len, count=chunk_size
)
response = self._send_receive(request)
chunk_offset = offset

def build_chunk_request(o: int = chunk_offset, cs: int = chunk_size) -> bytes:
return self.protocol.build_read_request(
area=s7_area, db_number=db_number, start=start + o, word_len=s7_word_len, count=cs
)

response = self._send_receive_with_reconnect(build_chunk_request)
values = self.protocol.extract_read_data(response, s7_word_len, chunk_size)
result.extend(values)
offset += chunk_size
Expand Down Expand Up @@ -421,10 +633,12 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray, wo
max_chunk = self._max_write_size()
if len(data) <= max_chunk:
# Single request
request = self.protocol.build_write_request(
area=s7_area, db_number=db_number, start=start, word_len=s7_word_len, data=bytes(data)
)
response = self._send_receive(request)
def build_request() -> bytes:
return self.protocol.build_write_request(
area=s7_area, db_number=db_number, start=start, word_len=s7_word_len, data=bytes(data)
)

response = self._send_receive_with_reconnect(build_request)
self.protocol.check_write_response(response)
self._exec_time = int((time.time() - start_time) * 1000)
return 0
Expand All @@ -435,10 +649,14 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray, wo
while remaining > 0:
chunk_size = min(remaining, max_chunk)
chunk_data = data[offset : offset + chunk_size]
request = self.protocol.build_write_request(
area=s7_area, db_number=db_number, start=start + offset, word_len=s7_word_len, data=bytes(chunk_data)
)
response = self._send_receive(request)
chunk_offset = offset

def build_chunk_request(o: int = chunk_offset, cd: bytes = bytes(chunk_data)) -> bytes:
return self.protocol.build_write_request(
area=s7_area, db_number=db_number, start=start + o, word_len=s7_word_len, data=cd
)

response = self._send_receive_with_reconnect(build_chunk_request)
self.protocol.check_write_response(response)
offset += chunk_size
remaining -= chunk_size
Expand Down
Loading
Loading