// retina_core/subscription/connection_frame.rs
use crate::conntrack::conn_id::FiveTuple;
28use crate::conntrack::pdu::{L4Context, L4Pdu};
29use crate::conntrack::ConnTracker;
30use crate::filter::FilterResult;
31use crate::memory::mbuf::Mbuf;
32use crate::protocols::stream::{ConnParser, Session};
33use crate::subscription::{Level, Subscribable, Subscription, Trackable};
34
35use std::net::SocketAddr;
36
37#[derive(Debug, Clone)]
39pub struct ConnectionFrame {
40 pub five_tuple: FiveTuple,
41 pub data: Vec<u8>,
42}
43
44impl ConnectionFrame {
45 pub(crate) fn new(five_tuple: FiveTuple, mbuf: &Mbuf) -> Self {
47 ConnectionFrame {
48 five_tuple,
49 data: mbuf.data().to_vec(),
50 }
51 }
52
53 #[inline]
55 pub fn client(&self) -> SocketAddr {
56 self.five_tuple.orig
57 }
58
59 #[inline]
61 pub fn server(&self) -> SocketAddr {
62 self.five_tuple.resp
63 }
64}
65
66impl Subscribable for ConnectionFrame {
67 type Tracked = TrackedConnectionFrame;
68
69 fn level() -> Level {
70 Level::Connection
71 }
72
73 fn parsers() -> Vec<ConnParser> {
74 vec![]
75 }
76
77 fn process_packet(
78 mbuf: Mbuf,
79 subscription: &Subscription<Self>,
80 conn_tracker: &mut ConnTracker<Self::Tracked>,
81 ) {
82 match subscription.filter_packet(&mbuf) {
83 FilterResult::MatchTerminal(idx) | FilterResult::MatchNonTerminal(idx) => {
84 if let Ok(ctxt) = L4Context::new(&mbuf, idx) {
85 conn_tracker.process(mbuf, ctxt, subscription);
86 }
87 }
88 FilterResult::NoMatch => drop(mbuf),
89 }
90 }
91}
92
93#[doc(hidden)]
99pub struct TrackedConnectionFrame {
100 five_tuple: FiveTuple,
102 buf: Vec<ConnectionFrame>,
104}
105
106impl Trackable for TrackedConnectionFrame {
107 type Subscribed = ConnectionFrame;
108
109 fn new(five_tuple: FiveTuple) -> Self {
110 TrackedConnectionFrame {
111 five_tuple,
112 buf: vec![],
113 }
114 }
115
116 fn pre_match(&mut self, pdu: L4Pdu, _session_id: Option<usize>) {
117 self.buf
118 .push(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
119 }
120
121 fn on_match(&mut self, _session: Session, subscription: &Subscription<Self::Subscribed>) {
122 self.buf.drain(..).for_each(|frame| {
123 subscription.invoke(frame);
124 });
125 }
126
127 fn post_match(&mut self, pdu: L4Pdu, subscription: &Subscription<Self::Subscribed>) {
128 subscription.invoke(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
129 }
130
131 fn on_terminate(&mut self, subscription: &Subscription<Self::Subscribed>) {
132 self.buf.drain(..).for_each(|frame| {
133 subscription.invoke(frame);
134 });
135 }
136}