-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathunittest.py
More file actions
93 lines (70 loc) · 2.62 KB
/
unittest.py
File metadata and controls
93 lines (70 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
'''
Created on 27 jun. 2016
@author: pim
'''
import socket
import sys
# Try the top-level ``sflow`` and ``util`` modules first; if either cannot
# be imported, use the ones inside the ``sflow`` package instead.
# Only ImportError triggers the fallback so that genuine errors inside the
# modules (syntax errors, Ctrl-C, ...) are not silently masked.
try:
    import sflow
    import util
except ImportError:
    from sflow import sflow
    from sflow import util
def show_num_records(s_records):
    """Summarise the number of records carried by each sample.

    Args:
        s_records: iterable of sample objects. Each needs a ``sample_type``
            attribute; type 1 samples are read for ``num_flow_records`` and
            type 2 samples for ``num_counter_records``.

    Returns:
        One text line per flow/counters sample, concatenated in input
        order. An empty string when no sample of either type is present,
        so the result can always be passed to ``sys.stdout.write``.
    """
    lines = []
    for sample in s_records:
        if sample.sample_type == 1:  # FlowSample
            lines.append(" FlowSample: %d records\n" % sample.num_flow_records)
        elif sample.sample_type == 2:  # CountersSample
            lines.append(" CountersSample: %d records\n" % sample.num_counter_records)
        # Samples of any other type are skipped.
    return "".join(lines)
def repr_flow(flow_datagram):
    """Return the ``repr()`` text of *flow_datagram*; nothing is printed."""
    text = repr(flow_datagram)
    return text
def show_ipv4_addr(flow_datagram):
    """
    Report the IPv4 src and dst addresses found in the raw flow sample
    records of a datagram.

    Args:
        flow_datagram: unpacked datagram exposing ``sequence_number``,
            ``num_samples`` and ``sample_records``.

    Returns:
        A multi-line string: a header line for the datagram, then one line
        per raw-header flow record and, when that record has a sampled
        packet with a payload, a line with the payload's src/dst addresses.
    """
    parts = ["\nFlow: %d (%d samples)\n" % (flow_datagram.sequence_number, flow_datagram.num_samples)]
    # n is the 1-based position of the sample within the datagram.
    # NOTE(review): assumes every sample (not only FlowSamples) advances n — confirm.
    for n, sample in enumerate(flow_datagram.sample_records, start=1):
        if sample.sample_type != 1:  # 1 == FlowSample; other types are skipped
            continue
        m = 1  # 1-based counter of raw-header records within this sample
        for rec in sample.flow_records:
            if rec.type != sflow.FLOW_DATA_RAW_HEADER:
                continue
            parts.append(" Raw FlowSample %d(%d).%d (proto %d)\n" % (n, sample.num_flow_records, m, rec.header_protocol))
            m += 1
            pkt = rec.sampled_packet
            if pkt is None:
                continue
            payl = pkt.payload
            if payl is not None:
                parts.append(" src: %s; dst: %s\n" % (util.ip_to_string(payl.src), util.ip_to_string(payl.dst)))
    return "".join(parts)
if __name__ == '__main__':
    # Manual test harness: receive datagrams on UDP port 5700 (all
    # interfaces), unpack each one and print it. Runs until interrupted.
    listen_addr = ("0.0.0.0", 5700)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(listen_addr)
        while True:
            data, addr = sock.recvfrom(65535)
            flow_data = sflow.Datagram()
            flow_data.unpack(addr, data)

            # Test 1
            #print("Flow: %d (%d samples)" % (flow_data.sequence_number, flow_data.num_samples))
            #sys.stdout.write(show_num_records(flow_data.sample_records))

            # Test 2
            sys.stdout.write(repr_flow(flow_data))

            # Test 3
            #sys.stdout.write(show_ipv4_addr(flow_data))

            # Test 4
            # Round trip: pack the unpacked datagram again, unpack the
            # result, and print both forms so they can be compared by eye.
            print("\n\nOrignal data from stream")
            sys.stdout.write(repr(flow_data))
            new_data = flow_data.pack()
            new_flow_data = sflow.Datagram()
            new_flow_data.unpack(addr, new_data)
            print("Data unpacked and packed again")
            sys.stdout.write(repr(new_flow_data))
            print("Hexdump original data")
            util.hexdump_bytes(data)
            print("Hexdump repacked data")
            util.hexdump_bytes(new_data)
    finally:
        # Release the UDP port however the loop ends (Ctrl-C, parse error).
        sock.close()