-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
62 lines (53 loc) · 2.33 KB
/
parser.py
File metadata and controls
62 lines (53 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import struct
from ..enums import FunctionCode, HeaderError, MessageType
from .error import S7Error
from .headers import S7AckDataHeader, S7Header
from .packet import S7Packet
from .rw_variable import (
ReadVariableResponse,
VariableReadRequest,
VariableWriteRequest,
WriteVariableResponse,
)
from .setup_communication import SetupCommunicationRequest
from .user_data import UserDataResponse
# Dispatch table for Response/Ack messages: the function code read from the
# first parameter byte selects the packet class whose ``parse`` decodes the
# parameter block.
# NOTE(review): SetupCommunication maps to SetupCommunicationRequest here as
# well -- presumably the response shares the request layout; confirm.
response_code_to_packet: dict[FunctionCode, type[S7Packet]] = {
    FunctionCode.SetupCommunication: SetupCommunicationRequest,
    FunctionCode.ReadVariable: ReadVariableResponse,
    FunctionCode.WriteVariable: WriteVariableResponse,
}
# Dispatch table for JobRequest messages: the function code read from the
# first parameter byte selects the packet class whose ``parse`` decodes the
# parameter block.
request_code_to_packet: dict[FunctionCode, type[S7Packet]] = {
    FunctionCode.SetupCommunication: SetupCommunicationRequest,
    FunctionCode.ReadVariable: VariableReadRequest,
    FunctionCode.WriteVariable: VariableWriteRequest,
}
class S7PacketParser:
    """Decode a raw S7 message into the packet object matching its type."""

    # ``parse`` reads byte 0 (protocol id) and byte 1 (message type) before
    # any header class is involved, so at least this many bytes are required.
    _MIN_LENGTH = 2

    @staticmethod
    def _parse_parameter(
        packet: bytes,
        offset: int,
        code_to_packet: dict[FunctionCode, type[S7Packet]],
    ) -> S7Packet:
        """Parse the parameter block that begins at ``offset``.

        The byte at ``offset`` is read as the function code and used to pick
        a packet class from ``code_to_packet``; that class then parses
        everything from ``offset`` to the end of ``packet``.

        Raises:
            KeyError: if ``code_to_packet`` has no entry for the function code.
        """
        function_code = struct.unpack_from("!B", packet, offset)[0]
        return code_to_packet[function_code].parse(packet[offset:])

    @staticmethod
    def parse(packet: bytes) -> S7Packet:
        """Parse ``packet`` according to its message type.

        * Response / Ack: the header is parsed with ``S7AckDataHeader``. If
          its error class is not ``HeaderError.NO_ERROR`` an ``S7Error``
          built from the header is returned; otherwise the parameter block
          is dispatched through ``response_code_to_packet``.
        * JobRequest: the header is parsed with ``S7Header`` and the
          parameter block is dispatched through ``request_code_to_packet``.
        * Userdata: the header is parsed with ``S7Header`` and the remainder
          is parsed by ``UserDataResponse``.

        Except for the ``S7Error`` early return, the parsed header is stored
        on the result's ``header`` attribute.

        Raises:
            ValueError: if ``packet`` is shorter than two bytes, if byte 0 is
                not ``S7Header.PROTOCOL_ID``, or if the message type is not
                one of the handled kinds.
            KeyError: if the function code has no entry in the dispatch
                table for the message direction.
        """
        header: S7Header
        response: S7Packet

        # Reject truncated input explicitly so callers see the same
        # exception type as for every other malformed packet, instead of a
        # bare IndexError from the byte lookups below.
        if len(packet) < S7PacketParser._MIN_LENGTH:
            raise ValueError(
                f"Packet too short. received: {len(packet)} bytes, "
                f"expected at least: {S7PacketParser._MIN_LENGTH}"
            )

        protocol_id = packet[0]
        if protocol_id != S7Header.PROTOCOL_ID:
            raise ValueError(f"Invalid protocol id. received: {protocol_id}, expected: {S7Header.PROTOCOL_ID}")

        message_type = MessageType(packet[1])

        if message_type in (MessageType.Response, MessageType.Ack):
            ack_header = S7AckDataHeader.parse(packet)
            # A header-level error short-circuits parsing: the parameter
            # block is not inspected at all in that case.
            if HeaderError(ack_header.error_class) != HeaderError.NO_ERROR:
                return S7Error(header=ack_header)
            header = ack_header
            response = S7PacketParser._parse_parameter(packet, header.LENGTH, response_code_to_packet)
        elif message_type == MessageType.JobRequest:
            header = S7Header.parse(packet)
            response = S7PacketParser._parse_parameter(packet, header.LENGTH, request_code_to_packet)
        elif message_type == MessageType.Userdata:
            header = S7Header.parse(packet)
            response = UserDataResponse.parse(packet[header.LENGTH :])
        else:
            raise ValueError(f"Invalid message type: {message_type}")

        response.header = header
        return response