use crate::conntrack::pdu::L4Pdu;
use crate::filter::Actions;
use crate::lcore::CoreId;
use crate::protocols::packet::tcp::TCP_PROTOCOL;
use crate::protocols::stream::{
ConnData, ParseResult, ParserRegistry, ParsingState, ProbeRegistryResult,
};
use crate::subscription::{Subscription, Trackable};
use crate::FiveTuple;
#[derive(Debug)]
pub(crate) struct ConnInfo<T>
where
T: Trackable,
{
pub(crate) actions: Actions,
pub(crate) cdata: ConnData,
pub(crate) sdata: T,
}
impl<T> ConnInfo<T>
where
T: Trackable,
{
pub(super) fn new(pdu: &L4Pdu, core_id: CoreId) -> Self {
let five_tuple = FiveTuple::from_ctxt(pdu.ctxt);
ConnInfo {
actions: Actions::new(),
cdata: ConnData::new(five_tuple),
sdata: T::new(pdu, core_id),
}
}
pub(crate) fn filter_first_packet(
&mut self,
pdu: &L4Pdu,
subscription: &Subscription<T::Subscribed>,
) {
assert!(self.actions.drop());
let pkt_actions = subscription.filter_packet(pdu.mbuf_ref(), &self.sdata);
self.actions = pkt_actions;
}
pub(crate) fn consume_pdu(
&mut self,
pdu: L4Pdu,
subscription: &Subscription<T::Subscribed>,
registry: &ParserRegistry,
) {
if self.actions.drop() {
drop(pdu);
return;
}
if self.actions.parse_any() {
self.handle_parse(&pdu, subscription, registry);
}
if self.actions.update_pdu_reassembled() && pdu.ctxt.proto == TCP_PROTOCOL {
self.sdata.update(&pdu, true);
}
if self.actions.packet_deliver() {
subscription.deliver_packet(pdu.mbuf_ref(), &self.cdata, &self.sdata);
}
if self.actions.buffer_frame() {
self.sdata.track_packet(pdu.mbuf_own());
}
}
fn handle_parse(
&mut self,
pdu: &L4Pdu,
subscription: &Subscription<T::Subscribed>,
registry: &ParserRegistry,
) {
if self.actions.session_probe() {
self.on_probe(pdu, subscription, registry);
}
if self.actions.session_parse() {
self.on_parse(pdu, subscription);
}
}
fn on_probe(
&mut self,
pdu: &L4Pdu,
subscription: &Subscription<T::Subscribed>,
registry: &ParserRegistry,
) {
match registry.probe_all(pdu) {
ProbeRegistryResult::Some(conn_parser) => {
self.cdata.conn_parser = conn_parser;
self.done_probe(subscription);
}
ProbeRegistryResult::None => {
self.done_probe(subscription);
}
ProbeRegistryResult::Unsure => { }
}
}
fn done_probe(&mut self, subscription: &Subscription<T::Subscribed>) {
#[cfg(debug_assertions)]
{
if !self.actions.apply_proto_filter() {
assert!(self.actions.drop() || !self.actions.terminal_actions.is_none());
}
}
if self.actions.apply_proto_filter() {
let actions = subscription.filter_protocol(&self.cdata, &self.sdata);
self.clear_stale_data(&actions);
self.actions.update(&actions);
}
self.actions.session_done_probe();
}
fn on_parse(&mut self, pdu: &L4Pdu, subscription: &Subscription<T::Subscribed>) {
match self.cdata.conn_parser.parse(pdu) {
ParseResult::Done(id) => self.handle_session(subscription, id),
ParseResult::None => self.session_done_parse(subscription),
_ => {} }
}
fn handle_session(&mut self, subscription: &Subscription<T::Subscribed>, id: usize) {
if let Some(session) = self.cdata.conn_parser.remove_session(id) {
let session_track = self.actions.session_track();
if self.actions.apply_session_filter() {
let actions = subscription.filter_session(&session, &self.cdata, &self.sdata);
self.clear_stale_data(&actions);
self.actions.update(&actions);
}
if session_track || self.actions.session_track() {
self.sdata.track_session(session);
}
} else {
log::error!("Done parsing but no session found");
}
self.session_done_parse(subscription);
}
fn session_done_parse(&mut self, subscription: &Subscription<T::Subscribed>) {
match self.cdata.conn_parser.session_parsed_state() {
ParsingState::Probing => {
self.actions.session_set_probe();
}
ParsingState::Stop => {
self.actions.session_clear_parse();
if self.actions.conn_deliver_only() {
self.handle_terminate(subscription);
self.actions.clear();
}
}
ParsingState::Parsing => {
}
}
}
pub(crate) fn handle_terminate(&mut self, subscription: &Subscription<T::Subscribed>) {
if self.actions.session_parse() {
for session in self.cdata.conn_parser.drain_sessions() {
let session_track = self.actions.session_track();
if self.actions.apply_session_filter() {
let actions = subscription.filter_session(&session, &self.cdata, &self.sdata);
self.actions.update(&actions);
}
if session_track || self.actions.session_track() {
self.sdata.track_session(session);
}
}
}
if self.actions.connection_matched() {
subscription.deliver_conn(&self.cdata, &self.sdata)
}
self.actions.clear();
}
pub(crate) fn clear_packets(&mut self) {
self.sdata.drain_packets();
}
pub(crate) fn clear_stale_data(&mut self, new_actions: &Actions) {
if self.actions.buffer_frame() && !new_actions.buffer_frame() && !self.actions.drop() {
self.clear_packets();
assert!(!new_actions.buffer_frame());
}
}
pub(crate) fn clear(&mut self) {
self.cdata.clear();
self.sdata.clear();
}
}