// retina_core/subscription/mod.rs
use crate::conntrack::pdu::{L4Context, L4Pdu};
use crate::conntrack::ConnTracker;
use crate::filter::*;
use crate::lcore::CoreId;
use crate::memory::mbuf::Mbuf;
use crate::protocols::packet::tcp::TCP_PROTOCOL;
use crate::protocols::packet::udp::UDP_PROTOCOL;
use crate::protocols::stream::{ConnData, ParserRegistry, Session};
use crate::stats::{StatExt, TCP_BYTE, TCP_PKT, UDP_BYTE, UDP_PKT};
#[cfg(feature = "timing")]
use crate::timing::timer::Timers;
/// Pairs a type with its [`Trackable`] counterpart.
///
/// The bound on [`Subscribable::Tracked`] requires the tracked type's
/// `Subscribed` associated type to point back at `Self`, so the two traits
/// always name each other.
pub trait Subscribable {
/// The [`Trackable`] type associated with this type; its `Subscribed` must be `Self`.
type Tracked: Trackable<Subscribed = Self>;
}
/// Interface implemented by the tracked counterpart of a [`Subscribable`] type.
///
/// The bound on [`Trackable::Subscribed`] requires the subscribed type's
/// `Tracked` associated type to be `Self`.
// NOTE(review): the per-method descriptions below are derived from the
// signatures only; the exact semantics are defined by implementors — confirm
// against a concrete implementation before relying on them.
pub trait Trackable {
/// The [`Subscribable`] type associated with this type; its `Tracked` must be `Self`.
type Subscribed: Subscribable<Tracked = Self>;
/// Constructs a value from `first_pkt` and `core_id`.
fn new(first_pkt: &L4Pdu, core_id: CoreId) -> Self;
/// Updates this value with `pdu`.
// NOTE(review): `reassembled` presumably marks whether the PDU has been
// through stream reassembly — confirm.
fn update(&mut self, pdu: &L4Pdu, reassembled: bool);
/// Returns a borrowed vector of [`Session`]s.
fn sessions(&self) -> &Vec<Session>;
/// Takes ownership of `session`.
// NOTE(review): presumably the session becomes visible through `sessions` — confirm.
fn track_session(&mut self, session: Session);
/// Receives `pdu` together with the current `actions` and the `reassembled` flag.
// NOTE(review): presumably decides from `actions` whether to keep the packet — confirm.
fn buffer_packet(&mut self, pdu: &L4Pdu, actions: &Actions, reassembled: bool);
/// Returns a borrowed vector of [`Mbuf`]s.
fn packets(&self) -> &Vec<Mbuf>;
/// Mutating hook with no arguments and no return value.
// NOTE(review): presumably empties the packets kept for tracking — confirm.
fn drain_tracked_packets(&mut self);
/// Mutating hook with no arguments and no return value.
// NOTE(review): presumably empties the packets kept as a cache — confirm.
fn drain_cached_packets(&mut self);
/// Returns a borrowed [`CoreId`].
fn core_id(&self) -> &CoreId;
/// Returns a [`ParserRegistry`]; associated function, takes no receiver.
fn parsers() -> ParserRegistry;
/// Mutating hook with no arguments and no return value.
// NOTE(review): presumably resets internal state — confirm.
fn clear(&mut self);
}
/// Holds the filter and delivery callables for a subscription over `S`.
///
/// Every callable is parameterised (where applicable) by `S::Tracked`, the
/// [`Trackable`] counterpart of the subscribed type. The fields are populated
/// from a [`FilterFactory`] in `Subscription::new` and invoked through the
/// like-named methods on this type.
pub struct Subscription<S: Subscribable> {
    /// Invoked by `continue_packet` with an mbuf and a core id.
    packet_continue: PacketContFn,
    /// Invoked by `filter_packet` with an mbuf and the tracked state.
    packet_filter: PacketFilterFn<S::Tracked>,
    /// Invoked by `filter_protocol` with connection data and the tracked state.
    proto_filter: ProtoFilterFn<S::Tracked>,
    /// Invoked by `filter_session` with a session, connection data and the tracked state.
    session_filter: SessionFilterFn<S::Tracked>,
    /// Invoked by `deliver_packet` with an mbuf, connection data and the tracked state.
    packet_deliver: PacketDeliverFn<S::Tracked>,
    /// Invoked by `deliver_conn` with connection data and the tracked state.
    conn_deliver: ConnDeliverFn<S::Tracked>,
    /// Timers, present only when the `timing` feature is enabled.
    #[cfg(feature = "timing")]
    pub(crate) timers: Timers,
}
impl<S> Subscription<S>
where
S: Subscribable,
{
pub fn new(factory: FilterFactory<S::Tracked>) -> Self {
Subscription {
packet_continue: factory.packet_continue,
packet_filter: factory.packet_filter,
proto_filter: factory.proto_filter,
session_filter: factory.session_filter,
packet_deliver: factory.packet_deliver,
conn_deliver: factory.conn_deliver,
#[cfg(feature = "timing")]
timers: Timers::new(),
}
}
pub fn process_packet(
&self,
mbuf: Mbuf,
conn_tracker: &mut ConnTracker<S::Tracked>,
actions: Actions,
) {
if actions.data.intersects(ActionData::PacketContinue) {
if let Ok(ctxt) = L4Context::new(&mbuf) {
match ctxt.proto {
TCP_PROTOCOL => {
TCP_PKT.inc();
TCP_BYTE.inc_by(mbuf.data_len() as u64);
}
UDP_PROTOCOL => {
UDP_PKT.inc();
UDP_BYTE.inc_by(mbuf.data_len() as u64);
}
_ => {}
}
conn_tracker.process(mbuf, ctxt, self);
}
}
}
pub fn continue_packet(&self, mbuf: &Mbuf, core_id: &CoreId) -> Actions {
(self.packet_continue)(mbuf, core_id)
}
pub fn filter_packet(&self, mbuf: &Mbuf, tracked: &S::Tracked) -> Actions {
(self.packet_filter)(mbuf, tracked)
}
pub fn filter_protocol(&self, conn: &ConnData, tracked: &S::Tracked) -> Actions {
(self.proto_filter)(conn, tracked)
}
pub fn filter_session(
&self,
session: &Session,
conn: &ConnData,
tracked: &S::Tracked,
) -> Actions {
(self.session_filter)(session, conn, tracked)
}
pub fn deliver_packet(&self, mbuf: &Mbuf, conn_data: &ConnData, tracked: &S::Tracked) {
(self.packet_deliver)(mbuf, conn_data, tracked)
}
pub fn deliver_conn(&self, conn_data: &ConnData, tracked: &S::Tracked) {
(self.conn_deliver)(conn_data, tracked)
}
}