use crate::conntrack::conn::conn_info::ConnInfo;
use crate::conntrack::pdu::L4Pdu;
use crate::filter::Actions;
use crate::protocols::packet::tcp::{ACK, FIN, RST, SYN};
use crate::protocols::stream::ParserRegistry;
use crate::subscription::{Subscription, Trackable};
use anyhow::{bail, Result};
use std::collections::VecDeque;
/// Reassembly state for a TCP flow: the next expected sequence number, the
/// flags consumed so far, and a buffer for segments that cannot be consumed yet.
#[derive(Debug)]
pub(crate) struct TcpFlow {
/// Sequence number expected from the next in-order segment, or `None` when no
/// starting sequence number has been established yet.
pub(super) next_seq: Option<u32>,
/// Bitwise OR of the flags of the segments consumed so far (seeded with the
/// `flags` argument when the flow is built with `new`).
pub(super) consumed_flags: u8,
/// Segments that could not be consumed in order yet.
pub(crate) ooo_buf: OutOfOrderBuffer,
}
impl TcpFlow {
    /// Creates a flow whose starting sequence number is not yet known.
    ///
    /// `capacity` is the maximum number of segments the out-of-order buffer
    /// will accept.
    #[inline]
    pub(super) fn default(capacity: usize) -> Self {
        TcpFlow {
            next_seq: None,
            consumed_flags: 0,
            ooo_buf: OutOfOrderBuffer::new(capacity),
        }
    }

    /// Creates a flow that already expects `next_seq` and starts with `flags`
    /// recorded as consumed.
    ///
    /// `capacity` is the maximum number of segments the out-of-order buffer
    /// will accept.
    #[inline]
    pub(super) fn new(capacity: usize, next_seq: u32, flags: u8) -> Self {
        TcpFlow {
            next_seq: Some(next_seq),
            consumed_flags: flags,
            ooo_buf: OutOfOrderBuffer::new(capacity),
        }
    }

    /// Feeds one TCP segment into the flow.
    ///
    /// When a next sequence number is known:
    /// * a segment starting exactly at it is consumed and the out-of-order
    ///   buffer is flushed (an RST is consumed but neither advances the
    ///   expected sequence number nor triggers a flush);
    /// * a segment starting after it is buffered;
    /// * a segment starting before it that still carries new bytes is trimmed
    ///   to the new bytes, consumed, and followed by a flush;
    /// * a segment carrying only old bytes is dropped.
    ///
    /// When no next sequence number is known, a segment with SYN or ACK set
    /// establishes it; any other segment is buffered.
    #[inline]
    pub(super) fn insert_segment<T: Trackable>(
        &mut self,
        mut segment: L4Pdu,
        info: &mut ConnInfo<T>,
        subscription: &Subscription<T::Subscribed>,
        registry: &ParserRegistry,
    ) {
        let length = segment.length() as u32;
        let cur_seq = segment.seq_no();

        if let Some(next_seq) = self.next_seq {
            if next_seq == cur_seq {
                // Segment is exactly the one we were waiting for.
                self.consumed_flags |= segment.flags();
                if segment.flags() & RST != 0 {
                    info.consume_pdu(segment, subscription, registry);
                    return;
                }
                let mut expected_seq = cur_seq.wrapping_add(length);
                if segment.flags() & FIN != 0 {
                    // A FIN occupies one sequence number *after* any payload it
                    // carries, so the payload length must not be discarded.
                    // This matches how `OutOfOrderBuffer::flush_ordered`
                    // advances past a buffered FIN.
                    expected_seq = expected_seq.wrapping_add(1);
                }
                info.consume_pdu(segment, subscription, registry);
                self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
            } else if wrapping_lt(next_seq, cur_seq) {
                // Segment starts after the expected sequence number.
                self.buffer_ooo_seg(segment, info);
            } else if let Some(expected_seq) = overlap(&mut segment, next_seq) {
                // Segment starts before the expected sequence number but
                // extends past it; `overlap` trimmed it to the new bytes.
                self.consumed_flags |= segment.flags();
                info.consume_pdu(segment, subscription, registry);
                self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
            } else {
                // Segment carries nothing beyond the expected sequence number.
                log::debug!(
                    "Dropping old segment. cur: {} expect: {}",
                    cur_seq,
                    next_seq
                );
                drop(segment);
            }
        } else if segment.flags() & (SYN | ACK) != 0 {
            // NOTE(review): this accepts any segment with SYN *or* ACK set and
            // always skips one sequence number — confirm that is intended for
            // ACK-only segments.
            // Chained wrapping adds avoid overflow in a plain `1 + length`.
            let expected_seq = cur_seq.wrapping_add(1).wrapping_add(length);
            self.next_seq = Some(expected_seq);
            self.consumed_flags |= segment.flags();
            info.consume_pdu(segment, subscription, registry);
            self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
        } else {
            // No starting point yet: hold the segment until one is established.
            self.buffer_ooo_seg(segment, info);
        }
    }

    /// Appends `segment` to the out-of-order buffer.
    ///
    /// If the buffer is full, logs a warning and replaces `info.actions` with
    /// `Actions::new()` (presumably an empty action set that stops further
    /// processing of the connection — confirm against `Actions`).
    #[inline]
    fn buffer_ooo_seg<T: Trackable>(&mut self, segment: L4Pdu, info: &mut ConnInfo<T>) {
        if self.ooo_buf.insert_back(segment).is_err() {
            log::warn!("Out-of-order buffer overflow");
            info.actions = Actions::new();
        }
    }

    /// Consumes buffered segments that have become in-order, starting from
    /// `expected_seq`, and stores the resulting next expected sequence number.
    ///
    /// Returns immediately, leaving `next_seq` untouched, when
    /// `info.actions.drop()` is true.
    #[inline]
    pub(super) fn flush_ooo_buffer<T: Trackable>(
        &mut self,
        expected_seq: u32,
        info: &mut ConnInfo<T>,
        subscription: &Subscription<T::Subscribed>,
        registry: &ParserRegistry,
    ) {
        if info.actions.drop() {
            return;
        }
        let next_seq = self.ooo_buf.flush_ordered::<T>(
            expected_seq,
            &mut self.consumed_flags,
            info,
            subscription,
            registry,
        );
        self.next_seq = Some(next_seq);
    }
}
/// Capacity-bounded queue of TCP segments held until they can be consumed in
/// sequence order.
#[derive(Debug)]
pub(crate) struct OutOfOrderBuffer {
/// Maximum number of segments `insert_back` will accept.
capacity: usize,
/// Buffered segments; `insert_back` appends new ones at the back.
pub(crate) buf: VecDeque<L4Pdu>,
}
impl OutOfOrderBuffer {
    /// Creates an empty buffer that accepts at most `capacity` segments.
    fn new(capacity: usize) -> Self {
        OutOfOrderBuffer {
            capacity,
            buf: VecDeque::new(),
        }
    }

    /// Returns the number of segments currently buffered.
    pub(crate) fn len(&self) -> usize {
        self.buf.len()
    }

    /// Appends `segment` to the back of the buffer.
    ///
    /// # Errors
    ///
    /// Returns an error, without storing the segment, when the buffer already
    /// holds `capacity` segments.
    fn insert_back(&mut self, segment: L4Pdu) -> Result<()> {
        log::debug!("insert with seq : {:#?}", segment.seq_no());
        if self.len() >= self.capacity {
            bail!("Out-of-order buffer overflow.");
        }
        self.buf.push_back(segment);
        Ok(())
    }

    /// Consumes every buffered segment that can be delivered in order,
    /// starting at `expected_seq`, and returns the next expected sequence
    /// number afterwards.
    ///
    /// The buffer is scanned from the front. A segment starting exactly at the
    /// expected sequence number is consumed and the scan restarts. A segment
    /// starting later is skipped. A segment starting earlier is trimmed and
    /// consumed if it still carries new bytes, otherwise it is discarded.
    /// The flags of every consumed segment are OR-ed into `consumed_flags`.
    ///
    /// The scan stops early when `info.actions.drop()` becomes true, or after
    /// an in-order RST has been consumed (the RST does not advance the
    /// sequence number).
    #[inline]
    fn flush_ordered<T: Trackable>(
        &mut self,
        expected_seq: u32,
        consumed_flags: &mut u8,
        info: &mut ConnInfo<T>,
        subscription: &Subscription<T::Subscribed>,
        registry: &ParserRegistry,
    ) -> u32 {
        let mut next_seq = expected_seq;
        let mut index = 0;
        while index < self.buf.len() {
            if info.actions.drop() {
                return next_seq;
            }
            // In bounds: guarded by the loop condition above.
            let cur_seq = self.buf[index].seq_no();
            log::debug!("Flushing...current seq: {:#?}", cur_seq);

            if next_seq == cur_seq {
                let segment = self
                    .buf
                    .remove(index)
                    .expect("index is within bounds (checked by the loop condition)");
                *consumed_flags |= segment.flags();
                if segment.flags() & RST != 0 {
                    info.consume_pdu(segment, subscription, registry);
                    return next_seq;
                }
                next_seq = next_seq.wrapping_add(segment.length() as u32);
                if segment.flags() & FIN != 0 {
                    next_seq = next_seq.wrapping_add(1);
                }
                info.consume_pdu(segment, subscription, registry);
                // The expected sequence number moved: rescan from the front.
                index = 0;
            } else if wrapping_lt(next_seq, cur_seq) {
                // Still ahead of the expected sequence number; leave it buffered.
                index += 1;
            } else {
                let mut segment = self
                    .buf
                    .remove(index)
                    .expect("index is within bounds (checked by the loop condition)");
                if let Some(update_seq) = overlap(&mut segment, next_seq) {
                    next_seq = update_seq;
                    *consumed_flags |= segment.flags();
                    info.consume_pdu(segment, subscription, registry);
                    index = 0;
                } else {
                    log::debug!("Dropping old segment during flush.");
                    drop(segment);
                    // Do not advance `index`: the removal already shifted the
                    // following segment into this slot, and advancing would
                    // skip it without ever examining it.
                }
            }
        }
        next_seq
    }
}
/// Returns `true` when `lhs` comes before `rhs` in wrapping 32-bit sequence
/// space.
///
/// `lhs` counts as "less" when `rhs` lies between 1 and 2^31 - 1 steps ahead of
/// it modulo 2^32. Equal values, and values exactly 2^31 apart, yield `false`.
pub fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
    const HALF_RANGE: u32 = 0x8000_0000;
    let distance = lhs.wrapping_sub(rhs);
    distance > HALF_RANGE
}
/// Trims `segment` to the bytes at or after `expected_seq`.
///
/// If the segment's end (its sequence number plus its length) lies beyond
/// `expected_seq`, `segment.ctxt.offset` is advanced past the already-seen
/// bytes, `segment.ctxt.length` is set to the number of new bytes, and the
/// segment's end sequence number is returned. Otherwise the segment is left
/// untouched and `None` is returned.
fn overlap(segment: &mut L4Pdu, expected_seq: u32) -> Option<u32> {
    let payload_len = segment.length() as u32;
    let start_seq = segment.seq_no();
    let end_seq = start_seq.wrapping_add(payload_len);

    // Nothing past the expected sequence number: the segment is entirely old.
    if !wrapping_lt(expected_seq, end_seq) {
        return None;
    }

    let stale_bytes = expected_seq.wrapping_sub(start_seq);
    let fresh_bytes = end_seq.wrapping_sub(expected_seq);
    log::debug!("Overlap with new data size : {:#?}", fresh_bytes);

    segment.ctxt.offset += stale_bytes as usize;
    segment.ctxt.length = fresh_bytes as usize;
    Some(end_seq)
}