1use crate::conntrack::conn::tcp_conn::reassembly::wrapping_lt;
24use crate::conntrack::conn_id::FiveTuple;
25use crate::conntrack::pdu::{L4Context, L4Pdu};
26use crate::conntrack::ConnTracker;
27use crate::filter::FilterResult;
28use crate::memory::mbuf::Mbuf;
29use crate::protocols::packet::tcp::{ACK, FIN, RST, SYN};
30use crate::protocols::stream::{ConnParser, Session};
31use crate::subscription::{Level, Subscribable, Subscription, Trackable};
32
33use serde::ser::{SerializeStruct, Serializer};
34use serde::Serialize;
35
36use std::collections::HashMap;
37use std::fmt;
38use std::net::SocketAddr;
39use std::time::{Duration, Instant};
40
41const HIST_SYN: u8 = b'S';
43const HIST_SYNACK: u8 = b'H';
45const HIST_ACK: u8 = b'A';
47const HIST_DATA: u8 = b'D';
49const HIST_FIN: u8 = b'F';
51const HIST_RST: u8 = b'R';
53
54#[derive(Debug)]
60pub struct Connection {
61 pub five_tuple: FiveTuple,
63 pub ts: Instant,
70 pub duration: Duration,
77 pub max_inactivity: Duration,
79 pub time_to_second_packet: Duration,
81 pub history: Vec<u8>,
97 pub orig: Flow,
99 pub resp: Flow,
101}
102
103impl Connection {
104 #[inline]
106 pub fn client(&self) -> SocketAddr {
107 self.five_tuple.orig
108 }
109
110 #[inline]
112 pub fn server(&self) -> SocketAddr {
113 self.five_tuple.resp
114 }
115
116 #[inline]
118 pub fn total_pkts(&self) -> u64 {
119 self.orig.nb_pkts + self.resp.nb_pkts
120 }
121
122 #[inline]
124 pub fn total_bytes(&self) -> u64 {
125 self.orig.nb_bytes + self.resp.nb_bytes
126 }
127
128 #[inline]
130 pub fn history(&self) -> String {
131 String::from_utf8_lossy(&self.history).into_owned()
132 }
133}
134
135impl Serialize for Connection {
136 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
137 where
138 S: Serializer,
139 {
140 let mut state = serializer.serialize_struct("Connection", 6)?;
141 state.serialize_field("five_tuple", &self.five_tuple)?;
142 state.serialize_field("duration", &self.duration)?;
143 state.serialize_field("max_inactivity", &self.max_inactivity)?;
144 state.serialize_field("history", &self.history())?;
145 state.serialize_field("orig", &self.orig)?;
146 state.serialize_field("resp", &self.resp)?;
147 state.end()
148 }
149}
150
151impl fmt::Display for Connection {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(f, "{}: {}", self.five_tuple, self.history())?;
154 Ok(())
155 }
156}
157
158impl Subscribable for Connection {
159 type Tracked = TrackedConnection;
160
161 fn level() -> Level {
162 Level::Connection
163 }
164
165 fn parsers() -> Vec<ConnParser> {
167 vec![]
168 }
169
170 fn process_packet(
171 mbuf: Mbuf,
172 subscription: &Subscription<Self>,
173 conn_tracker: &mut ConnTracker<Self::Tracked>,
174 ) {
175 match subscription.filter_packet(&mbuf) {
176 FilterResult::MatchTerminal(idx) | FilterResult::MatchNonTerminal(idx) => {
177 if let Ok(ctxt) = L4Context::new(&mbuf, idx) {
178 conn_tracker.process(mbuf, ctxt, subscription);
179 }
180 }
181 FilterResult::NoMatch => drop(mbuf),
182 }
183 }
184}
185
186#[doc(hidden)]
192pub struct TrackedConnection {
193 five_tuple: FiveTuple,
194 first_seen_ts: Instant,
195 second_seen_ts: Instant,
196 last_seen_ts: Instant,
197 max_inactivity: Duration,
198 history: Vec<u8>,
199 ctos: Flow,
200 stoc: Flow,
201}
202
203impl TrackedConnection {
204 #[inline]
205 fn update(&mut self, segment: L4Pdu) {
206 let now = Instant::now();
207 let inactivity = now - self.last_seen_ts;
208 if inactivity > self.max_inactivity {
209 self.max_inactivity = inactivity;
210 }
211 self.last_seen_ts = now;
212
213 if segment.dir {
214 self.update_history(&segment, 0x0);
215 self.ctos.insert_segment(segment);
216 } else {
217 self.update_history(&segment, 0x20);
218 self.stoc.insert_segment(segment);
219 }
220
221 if self.ctos.nb_pkts + self.stoc.nb_pkts == 2 {
222 self.second_seen_ts = now;
223 }
224 }
225
226 #[inline]
227 fn update_history(&mut self, segment: &L4Pdu, mask: u8) {
228 fn insert(history: &mut Vec<u8>, event: u8) {
229 if !history.contains(&event) {
230 history.push(event);
231 }
232 }
233 if segment.flags() == SYN {
234 insert(&mut self.history, HIST_SYN ^ mask);
235 } else if segment.flags() == (SYN | ACK) {
236 insert(&mut self.history, HIST_SYNACK ^ mask);
237 } else if segment.flags() == ACK && segment.length() == 0 {
238 insert(&mut self.history, HIST_ACK ^ mask);
239 }
240
241 if segment.flags() & FIN != 0 {
242 insert(&mut self.history, HIST_FIN ^ mask);
243 }
244 if segment.flags() & RST != 0 {
245 insert(&mut self.history, HIST_RST ^ mask);
246 }
247 if segment.length() > 0 {
248 insert(&mut self.history, HIST_DATA ^ mask);
249 }
250 }
251}
252
253impl Trackable for TrackedConnection {
254 type Subscribed = Connection;
255
256 fn new(five_tuple: FiveTuple) -> Self {
257 let now = Instant::now();
258 TrackedConnection {
259 five_tuple,
260 first_seen_ts: now,
261 second_seen_ts: now,
262 last_seen_ts: now,
263 max_inactivity: Duration::default(),
264 history: Vec::with_capacity(16),
265 ctos: Flow::new(),
266 stoc: Flow::new(),
267 }
268 }
269
270 fn pre_match(&mut self, pdu: L4Pdu, _session_id: Option<usize>) {
271 self.update(pdu);
272 }
273
274 fn on_match(&mut self, _session: Session, _subscription: &Subscription<Self::Subscribed>) {
275 }
277
278 fn post_match(&mut self, pdu: L4Pdu, _subscription: &Subscription<Self::Subscribed>) {
279 self.update(pdu)
280 }
281
282 fn on_terminate(&mut self, subscription: &Subscription<Self::Subscribed>) {
283 let (duration, max_inactivity, time_to_second_packet) =
284 if self.ctos.nb_pkts + self.stoc.nb_pkts == 1 {
285 (
286 Duration::default(),
287 Duration::default(),
288 Duration::default(),
289 )
290 } else {
291 (
292 self.last_seen_ts - self.first_seen_ts,
293 self.max_inactivity,
294 self.second_seen_ts - self.first_seen_ts,
295 )
296 };
297
298 let conn = Connection {
299 five_tuple: self.five_tuple,
300 ts: self.first_seen_ts,
301 duration,
302 max_inactivity,
303 time_to_second_packet,
304 history: self.history.clone(),
305 orig: self.ctos.clone(),
306 resp: self.stoc.clone(),
307 };
308 subscription.invoke(conn);
309 }
310}
311
312#[derive(Debug, Clone, Serialize)]
314pub struct Flow {
315 pub nb_pkts: u64,
323 pub nb_malformed_pkts: u64,
325 pub nb_late_start_pkts: u64,
327 pub nb_bytes: u64,
330 pub max_simult_gaps: u64,
335 pub data_start: u32,
338 pub capacity: usize,
341 pub chunks: Vec<Chunk>,
343 pub gaps: HashMap<u32, u64>,
346}
347
348impl Flow {
349 fn new() -> Self {
350 Flow {
351 nb_pkts: 0,
352 nb_malformed_pkts: 0,
353 nb_late_start_pkts: 0,
354 nb_bytes: 0,
355 max_simult_gaps: 0,
356 data_start: 0,
357 capacity: 100, chunks: Vec::with_capacity(100),
359 gaps: HashMap::new(),
360 }
361 }
362
363 #[inline]
364 fn insert_segment(&mut self, segment: L4Pdu) {
365 self.nb_pkts += 1;
366
367 if segment.offset() > segment.mbuf.data_len()
368 || (segment.offset() + segment.length()) > segment.mbuf.data_len()
369 {
370 self.nb_malformed_pkts += 1;
371 return;
372 }
373 self.nb_bytes += segment.length() as u64;
374
375 let seq_no = if segment.flags() & SYN != 0 {
376 segment.seq_no().wrapping_add(1)
377 } else {
378 segment.seq_no()
379 };
380
381 if self.chunks.is_empty() {
382 self.data_start = seq_no;
383 }
384
385 if wrapping_lt(seq_no, self.data_start) {
386 self.nb_late_start_pkts += 1;
387 return;
388 }
389
390 if self.chunks.len() < self.capacity {
391 let seg_start = seq_no.wrapping_sub(self.data_start);
392 let seg_end = seg_start + segment.length() as u32;
393
394 self.merge_chunk(Chunk(seg_start, seg_end));
395 }
396 }
397
398 #[inline]
401 fn merge_chunk(&mut self, chunk: Chunk) {
402 let mut start = chunk.0;
403 let mut end = chunk.1;
404
405 let mut result = vec![];
406 let mut inserted = false;
407 for chunk in self.chunks.iter() {
408 if inserted || start > chunk.1 {
409 result.push(*chunk);
410 } else if end < chunk.0 {
411 inserted = true;
412 result.push(Chunk(start, end));
413 result.push(*chunk);
414 } else {
415 start = std::cmp::min(start, chunk.0);
416 end = std::cmp::max(end, chunk.1);
417 }
418 }
419 if !inserted {
420 result.push(Chunk(start, end));
421 }
422
423 for chunk in result[..result.len() - 1].iter() {
424 *self.gaps.entry(chunk.1).or_insert(0) += 1;
425 }
426
427 if result.len().saturating_sub(1) as u64 > self.max_simult_gaps {
428 self.max_simult_gaps += 1;
429 }
430 self.chunks = result;
431 }
432
433 #[inline]
438 pub fn content_gaps(&self) -> u64 {
439 self.chunks.len().saturating_sub(1) as u64
440 }
441
442 #[inline]
447 pub fn missed_bytes(&self) -> u64 {
448 self.chunks.windows(2).map(|w| w[1].0 - w[0].1).sum::<u32>() as u64
449 }
450
451 #[inline]
454 pub fn mean_pkts_to_fill(&self) -> Option<f64> {
455 if self.gaps.is_empty() {
456 return None;
457 }
458 let mut sum = 0;
459 for val in self.gaps.values() {
460 sum += *val;
461 }
462 Some(sum as f64 / self.gaps.len() as f64)
463 }
464
465 #[inline]
468 pub fn median_pkts_to_fill(&self) -> Option<u64> {
469 if self.gaps.is_empty() {
470 return None;
471 }
472 let mut values = self.gaps.values().collect::<Vec<_>>();
473 values.sort();
474 let mid = values.len() / 2;
475 Some(*values[mid])
476 }
477}
478
479#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Serialize)]
481pub struct Chunk(u32, u32);
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486
487 #[test]
488 fn core_merge_chunk_fill_single() {
489 let mut flow = Flow::new();
490 flow.chunks = vec![Chunk(0, 3), Chunk(4, 5)];
491 flow.merge_chunk(Chunk(3, 4));
492 assert_eq!(flow.chunks, vec![Chunk(0, 5)]);
493 }
494
495 #[test]
496 fn core_merge_chunk_fill_multiple() {
497 let mut flow = Flow::new();
498 flow.chunks = vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)];
499 flow.merge_chunk(Chunk(2, 12));
500 assert_eq!(flow.chunks, vec![Chunk(0, 12)]);
501 }
502
503 #[test]
504 fn core_merge_chunk_create_hole() {
505 let mut flow = Flow::new();
506 flow.chunks = vec![Chunk(0, 3), Chunk(8, 10)];
507 flow.merge_chunk(Chunk(4, 5));
508 assert_eq!(flow.chunks, vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)]);
509 }
510
511 #[test]
512 fn core_merge_chunk_fill_overlap() {
513 let mut flow = Flow::new();
514 flow.chunks = vec![Chunk(0, 3), Chunk(8, 10)];
515 flow.merge_chunk(Chunk(5, 9));
516 assert_eq!(flow.chunks, vec![Chunk(0, 3), Chunk(5, 10)]);
517 }
518
519 #[test]
520 fn core_merge_chunk_start() {
521 let mut flow = Flow::new();
522 flow.chunks = vec![Chunk(4, 6), Chunk(8, 10)];
523 flow.merge_chunk(Chunk(0, 2));
524 assert_eq!(flow.chunks, vec![Chunk(0, 2), Chunk(4, 6), Chunk(8, 10)]);
525 }
526
527 #[test]
528 fn core_merge_chunk_end() {
529 let mut flow = Flow::new();
530 flow.chunks = vec![Chunk(4, 6), Chunk(8, 10)];
531 flow.merge_chunk(Chunk(11, 15));
532 assert_eq!(flow.chunks, vec![Chunk(4, 6), Chunk(8, 10), Chunk(11, 15)]);
533 }
534}