Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b419d8e
Add S7CommPlus protocol scaffolding for S7-1200/1500 support
gijzelaerr Feb 28, 2026
8e545c0
Add S7CommPlus server emulator, async client, and integration tests
gijzelaerr Feb 28, 2026
015314c
Clean up security-focused wording in S7CommPlus docstrings
gijzelaerr Feb 28, 2026
81b9e9f
Fix CI: remove pytest-asyncio dependency, fix formatting
gijzelaerr Feb 28, 2026
a399d19
Add pytest-asyncio dependency and use native async tests
gijzelaerr Feb 28, 2026
e5ac49e
Fix CI and add S7CommPlus end-to-end tests
gijzelaerr Mar 2, 2026
e48afb6
Enhance S7CommPlus connection with variable-length TSAP support and a…
gijzelaerr Mar 5, 2026
9fc9901
Add extensive debug logging to S7CommPlus protocol stack for real PLC…
gijzelaerr Mar 6, 2026
9a6ffcf
Fix S7CommPlus wire format for real PLC compatibility
gijzelaerr Mar 6, 2026
51179ae
Fix S7CommPlus LID byte offsets to use 1-based addressing
gijzelaerr Mar 9, 2026
010b358
Add S7CommPlus session setup and legacy S7 fallback for data operations
gijzelaerr Mar 9, 2026
61ba9f0
Potential fix for code scanning alert no. 9: Binding a socket to all …
gijzelaerr Mar 10, 2026
a35ef5a
Add S7CommPlus protocol scaffolding for S7-1200/1500 support
gijzelaerr Feb 28, 2026
2de52f5
Add S7CommPlus server emulator, async client, and integration tests
gijzelaerr Feb 28, 2026
9760271
Clean up security-focused wording in S7CommPlus docstrings
gijzelaerr Feb 28, 2026
189dc20
Fix CI: remove pytest-asyncio dependency, fix formatting
gijzelaerr Feb 28, 2026
4ad928c
Add pytest-asyncio dependency and use native async tests
gijzelaerr Feb 28, 2026
727afde
Fix CI and add S7CommPlus end-to-end tests
gijzelaerr Mar 2, 2026
38fb46b
Enhance S7CommPlus connection with variable-length TSAP support and a…
gijzelaerr Mar 5, 2026
607b169
Add extensive debug logging to S7CommPlus protocol stack for real PLC…
gijzelaerr Mar 6, 2026
d78b0eb
Fix S7CommPlus wire format for real PLC compatibility
gijzelaerr Mar 6, 2026
a4b75e6
Fix S7CommPlus LID byte offsets to use 1-based addressing
gijzelaerr Mar 9, 2026
e63ca22
Add S7CommPlus session setup and legacy S7 fallback for data operations
gijzelaerr Mar 9, 2026
1149257
Add S7CommPlus V2 protocol support (TLS + IntegrityId)
gijzelaerr Mar 20, 2026
4fd702a
Complete V2 legitimation with cryptography optional dependency
gijzelaerr Mar 20, 2026
440e470
Merge s7commplus-scaffolding into feature/s7commplus-v2
gijzelaerr Mar 24, 2026
823dd22
Merge master into feature/s7commplus-v2
gijzelaerr Mar 24, 2026
645cca4
Fix CI: skip V2 crypto tests when cryptography not installed
gijzelaerr Mar 24, 2026
23ed549
Install s7commplus extras in CI to test cryptography-dependent code
gijzelaerr Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ jobs:
- name: Install dependencies
run: |
uv venv --python python${{ matrix.python-version }}
uv pip install ".[test]"
uv pip install ".[test,s7commplus]"
- name: Run pytest
run: uv run pytest --cov=snap7 --cov-report=term
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Documentation = "https://python-snap7.readthedocs.io/en/latest/"

[project.optional-dependencies]
test = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-html", "hypothesis", "mypy", "types-setuptools", "ruff", "tox", "tox-uv", "types-click", "uv"]
s7commplus = ["cryptography"]
cli = ["rich", "click" ]
doc = ["sphinx", "sphinx_rtd_theme"]

Expand Down
2 changes: 1 addition & 1 deletion snap7/s7commplus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
The wire protocol (VLQ encoding, data types, function codes, object model)
is the same across all versions -- only the session authentication differs.

Status: experimental scaffolding -- not yet functional.
Status: V1 connection functional, V2 (TLS + IntegrityId) scaffolding complete.

Reference implementation:
https://github.com/thomas-v2/S7CommPlusDriver (C#, LGPL-3.0)
Expand Down
65 changes: 49 additions & 16 deletions snap7/s7commplus/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
ObjectId,
Opcode,
ProtocolVersion,
READ_FUNCTION_CODES,
S7COMMPLUS_LOCAL_TSAP,
S7COMMPLUS_REMOTE_TSAP,
)
from .codec import encode_header, decode_header, encode_typed_value, encode_object_qualifier
from .vlq import encode_uint32_vlq, decode_uint64_vlq
from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq
from .client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response

logger = logging.getLogger(__name__)
Expand All @@ -46,7 +47,7 @@
class S7CommPlusAsyncClient:
"""Async S7CommPlus client for S7-1200/1500 PLCs.

Supports V1 protocol. V2/V3/TLS planned for future.
Supports V1 and V2 protocols. V3/TLS planned for future.

Uses asyncio for all I/O operations and asyncio.Lock for
concurrent safety when shared between multiple coroutines.
Expand All @@ -70,6 +71,11 @@ def __init__(self) -> None:
self._rack: int = 0
self._slot: int = 1

# V2+ IntegrityId tracking
self._integrity_id_read: int = 0
self._integrity_id_write: int = 0
self._with_integrity_id: bool = False

@property
def connected(self) -> bool:
if self._use_legacy_data and self._legacy_client is not None:
Expand Down Expand Up @@ -189,6 +195,9 @@ async def disconnect(self) -> None:
self._session_id = 0
self._sequence_number = 0
self._protocol_version = 0
self._with_integrity_id = False
self._integrity_id_read = 0
self._integrity_id_write = 0

if self._writer:
try:
Expand Down Expand Up @@ -283,31 +292,48 @@ async def explore(self) -> bytes:
# -- Internal methods --

async def _send_request(self, function_code: int, payload: bytes) -> bytes:
"""Send an S7CommPlus request and receive the response."""
"""Send an S7CommPlus request and receive the response.

For V2+ with IntegrityId tracking, inserts IntegrityId after the
14-byte request header and strips it from the response.
"""
async with self._lock:
if not self._connected or self._writer is None or self._reader is None:
raise RuntimeError("Not connected")

seq_num = self._next_sequence_number()

request = (
struct.pack(
">BHHHHIB",
Opcode.REQUEST,
0x0000,
function_code,
0x0000,
seq_num,
self._session_id,
0x36,
)
+ payload
request_header = struct.pack(
">BHHHHIB",
Opcode.REQUEST,
0x0000,
function_code,
0x0000,
seq_num,
self._session_id,
0x36,
)

# For V2+ with IntegrityId, insert after header
integrity_id_bytes = b""
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
is_read = function_code in READ_FUNCTION_CODES
integrity_id = self._integrity_id_read if is_read else self._integrity_id_write
integrity_id_bytes = encode_uint32_vlq(integrity_id)

request = request_header + integrity_id_bytes + payload

frame = encode_header(self._protocol_version, len(request)) + request
frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000)
await self._send_cotp_dt(frame)

# Increment appropriate IntegrityId counter
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
if function_code in READ_FUNCTION_CODES:
self._integrity_id_read = (self._integrity_id_read + 1) & 0xFFFFFFFF
else:
self._integrity_id_write = (self._integrity_id_write + 1) & 0xFFFFFFFF

response_data = await self._recv_cotp_dt()

version, data_length, consumed = decode_header(response_data)
Expand All @@ -316,7 +342,14 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
if len(response) < 14:
raise RuntimeError("Response too short")

return response[14:]
# For V2+, skip IntegrityId in response
resp_offset = 14
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
if resp_offset < len(response):
_resp_iid, iid_consumed = decode_uint32_vlq(response, resp_offset)
resp_offset += iid_consumed

return response[resp_offset:]

async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None:
"""Perform COTP Connection Request / Confirm handshake."""
Expand Down
11 changes: 9 additions & 2 deletions snap7/s7commplus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
the client transparently falls back to the legacy S7 protocol for
data block read/write operations.

Status: V1 connection is functional. V2/V3/TLS authentication planned.
Status: V1 and V2 connections are functional. V3/TLS authentication planned.

Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0)
"""
Expand Down Expand Up @@ -108,6 +108,7 @@ def connect(
tls_cert: Optional[str] = None,
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""Connect to an S7-1200/1500 PLC using S7CommPlus.

Expand All @@ -119,10 +120,11 @@ def connect(
port: TCP port (default 102)
rack: PLC rack number
slot: PLC slot number
use_tls: Whether to attempt TLS (requires V3 PLC + certs)
use_tls: Whether to activate TLS (required for V2)
tls_cert: Path to client TLS certificate (PEM)
tls_key: Path to client private key (PEM)
tls_ca: Path to CA certificate for PLC verification (PEM)
password: PLC password for legitimation (V2+ with TLS)
"""
self._host = host
self._port = port
Expand All @@ -141,6 +143,11 @@ def connect(
tls_ca=tls_ca,
)

# Handle legitimation for password-protected PLCs
if password is not None and self._connection.tls_active:
logger.info("Performing PLC legitimation (password authentication)")
self._connection.authenticate(password)

# Probe S7CommPlus data operations with a minimal request
if not self._probe_s7commplus_data():
logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol")
Expand Down
Loading
Loading