diff --git a/src/plugins/input/parser/parser.cpp b/src/plugins/input/parser/parser.cpp index fcd117ea5..74cc0c476 100644 --- a/src/plugins/input/parser/parser.cpp +++ b/src/plugins/input/parser/parser.cpp @@ -682,6 +682,8 @@ void parse_packet( return; } Packet* pkt = &opt->pblock->pkts[opt->pblock->cnt]; + // reset all packet data + *pkt = Packet(); uint16_t data_offset = 0; DEBUG_MSG("---------- packet parser #%u -------------\n", ++s_total_pkts); @@ -693,18 +695,6 @@ void parse_packet( pkt->packet_len_wire = len; pkt->ts = ts; - pkt->src_port = 0; - pkt->dst_port = 0; - pkt->ip_proto = 0; - pkt->ip_ttl = 0; - pkt->ip_flags = 0; - pkt->ip_version = 0; - pkt->ip_payload_len = 0; - pkt->tcp_flags = 0; - pkt->tcp_window = 0; - pkt->tcp_options = 0; - pkt->tcp_mss = 0; - pkt->mplsTop = 0; stats.seen_packets++; @@ -758,12 +748,14 @@ void parse_packet( } l4_hdr_offset = data_offset; - if (pkt->ip_proto == IPPROTO_TCP) { - data_offset += parse_tcp_hdr(data + data_offset, caplen - data_offset, pkt, stats); - stats.tcp_packets++; - } else if (pkt->ip_proto == IPPROTO_UDP) { - data_offset += parse_udp_hdr(data + data_offset, caplen - data_offset, pkt, stats); - stats.udp_packets++; + if (pkt->frag_off == 0) { + if (pkt->ip_proto == IPPROTO_TCP) { + data_offset += parse_tcp_hdr(data + data_offset, caplen - data_offset, pkt, stats); + stats.tcp_packets++; + } else if (pkt->ip_proto == IPPROTO_UDP) { + data_offset += parse_udp_hdr(data + data_offset, caplen - data_offset, pkt, stats); + stats.udp_packets++; + } } } catch (const char* err) { DEBUG_MSG("%s\n", err); diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index bda8bb4ac..0fe470c48 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -321,11 +321,15 @@ void NHTFlowCache::flush(Packet& pkt, size_t flow_index, int ret, bool source_fl int NHTFlowCache::put_pkt(Packet& pkt) { - int ret = plugins_pre_create(pkt); - if (m_enable_fragmentation_cache) { try_to_fill_ports_to_fragmented_packet(pkt); } + return put_pkt_recursive(pkt); +} + +int NHTFlowCache::put_pkt_recursive(Packet& pkt) +{ + int ret = plugins_pre_create(pkt); if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key // and NHTFlowCache::m_keylen @@ -429,7 +433,7 @@ int NHTFlowCache::put_pkt(Packet& pkt) // Flows with FIN or RST TCP flags are exported when new SYN packet arrives m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_EOF; export_flow(flow_index); - put_pkt(pkt); + put_pkt_recursive(pkt); return 0; } @@ -453,7 +457,7 @@ int NHTFlowCache::put_pkt(Packet& pkt) #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); + return put_pkt_recursive(pkt); } /* Check if flow record is expired (active timeout). */ @@ -464,7 +468,7 @@ int NHTFlowCache::put_pkt(Packet& pkt) #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); + return put_pkt_recursive(pkt); } ret = plugins_pre_update(flow->m_flow, pkt); diff --git a/src/plugins/storage/cache/src/cache.hpp b/src/plugins/storage/cache/src/cache.hpp index 704d6e402..8f0f5db30 100644 --- a/src/plugins/storage/cache/src/cache.hpp +++ b/src/plugins/storage/cache/src/cache.hpp @@ -308,6 +308,7 @@ class NHTFlowCache FlowEndReasonStats m_flow_end_reason_stats = {}; FlowRecordStats m_flow_record_stats = {}; + int put_pkt_recursive(Packet& pkt); void try_to_fill_ports_to_fragmented_packet(Packet& packet); void flush(Packet& pkt, size_t flow_index, int ret, bool source_flow); bool create_hash_key(Packet& pkt); diff --git a/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.cpp b/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.cpp index ea79dc570..000799f24 100644 --- a/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.cpp +++ b/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.cpp @@ -74,6 +74,8 @@ void FragmentationCache::fill_missing_packet_data( { if (!is_fragmentation_data_timedouted(packet, fragmentation_data)) { fill_ports_to_packet(packet, fragmentation_data); + } else { + m_stats.timeouted_fragments++; } } @@ -104,6 +106,7 @@ telemetry::Content FragmentationCache::get_cache_telemetry() dict["fragmentedTraffic"] = telemetry::ScalarWithUnit {trafficPercentage, "%"}; dict["fragmentedPackets"] = m_stats.fragmented_packets; dict["notFoundFragments"] = m_stats.not_found_fragments; + dict["timeoutedFragments"] = m_stats.timeouted_fragments; return dict; } diff --git a/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.hpp b/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.hpp index bb1633616..d8f2fa4b8 100644 --- a/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.hpp +++ b/src/plugins/storage/cache/src/fragmentationCache/fragmentationCache.hpp @@ -94,6 +94,7 @@ class FragmentationCache : TelemetryUtils { uint64_t first_fragments; uint64_t fragmented_packets; uint64_t not_found_fragments; + uint64_t timeouted_fragments; uint64_t total_packets; };