iris_core/conntrack/
mod.rs1pub mod conn;
8pub mod conn_id;
9pub mod pdu;
10mod timerwheel;
11
12#[cfg(test)]
13mod tests;
14
15pub use conn::conn_actions::{Actions, TrackedActions};
16pub use conn::conn_layers::Layer;
17pub use conn::conn_state::{LayerState, StateTransition, StateTxData};
18pub use conn::ConnInfo;
19
20use self::conn::{Conn, L4Conn};
21use self::conn_id::ConnId;
22use self::pdu::{L4Context, L4Pdu};
23use self::timerwheel::TimerWheel;
24use crate::config::ConnTrackConfig;
25use crate::lcore::CoreId;
26use crate::memory::mbuf::Mbuf;
27use crate::protocols::packet::tcp::TCP_PROTOCOL;
28use crate::protocols::packet::udp::UDP_PROTOCOL;
29use crate::protocols::stream::ParserRegistry;
30use crate::stats::{StatExt, TCP_NEW_CONNECTIONS, UDP_NEW_CONNECTIONS};
31use crate::subscription::{Subscription, Trackable};
32
33use std::cmp;
34use std::time::Instant;
35
36use anyhow::anyhow;
37use hashlink::linked_hash_map::{LinkedHashMap, RawEntryMut};
38
39#[doc(hidden)]
46pub struct ConnTracker<T>
47where
48 T: Trackable,
49{
50 config: TrackerConfig,
52 registry: ParserRegistry,
54 table: LinkedHashMap<ConnId, Conn<T>>,
56 timerwheel: TimerWheel,
58 core_id: CoreId,
60}
61
62impl<T> ConnTracker<T>
63where
64 T: Trackable,
65{
66 pub(crate) fn new(config: TrackerConfig, registry: ParserRegistry, core_id: CoreId) -> Self {
68 let table = LinkedHashMap::with_capacity(config.max_connections);
69 let timerwheel = TimerWheel::new(
70 cmp::max(config.tcp_inactivity_timeout, config.udp_inactivity_timeout),
71 config.timeout_resolution,
72 );
73 ConnTracker {
74 config,
75 registry,
76 table,
77 timerwheel,
78 core_id,
79 }
80 }
81
82 #[inline]
84 pub(crate) fn size(&self) -> usize {
85 self.table.len()
86 }
87
88 pub(crate) fn process(
90 &mut self,
91 mbuf: Mbuf,
92 ctxt: L4Context,
93 subscription: &Subscription<T::Subscribed>,
94 ) {
95 let conn_id = ConnId::new(ctxt.src, ctxt.dst, ctxt.proto);
96 match self.table.raw_entry_mut().from_key(&conn_id) {
97 RawEntryMut::Occupied(mut occupied) => {
98 let conn = occupied.get_mut();
99 conn.last_seen_ts = Instant::now();
100 if conn.remove_from_table() {
101 log::error!("Conn in Drop state when occupied in table");
102 return;
103 }
104 if conn.drop_pdu() {
105 return;
106 }
107 let dir = conn.packet_dir(&ctxt);
108 let pdu = L4Pdu::new(mbuf, ctxt, dir, conn.last_seen_ts);
109
110 conn.update(pdu, subscription, &self.registry);
112
113 if conn.remove_from_table() {
115 occupied.remove();
116 } else if conn.drop_pdu() {
117 conn.info.clear();
118 } else if conn.terminated() {
119 conn.terminate(subscription);
120 occupied.remove();
121 } else {
122 conn.inactivity_window = match &conn.l4conn {
124 L4Conn::Tcp(tcp) => tcp.inactivity_timeout(
125 self.config.tcp_inactivity_timeout,
126 self.config.tcp_reassembly_timeout,
127 ),
128 L4Conn::Udp(_) => self.config.udp_inactivity_timeout,
129 };
130 }
131 }
132 RawEntryMut::Vacant(_) => {
133 if self.size() < self.config.max_connections {
134 let mut pdu = L4Pdu::new(mbuf, ctxt, true, Instant::now());
135 let conn = match ctxt.proto {
136 TCP_PROTOCOL => Conn::<T>::new_tcp(
137 self.config.tcp_establish_timeout,
138 self.config.max_out_of_order,
139 &pdu,
140 self.core_id,
141 ),
142 UDP_PROTOCOL => Conn::<T>::new_udp(
143 self.config.udp_inactivity_timeout,
144 &pdu,
145 self.core_id,
146 ),
147 _ => Err(anyhow!("Invalid L4 Protocol")),
148 };
149 if let Ok(mut conn) = conn {
150 conn.info.filter_first_packet(subscription, &pdu);
151 if conn.info.needs_update() {
153 conn.info.new_packet(&pdu, subscription);
154 }
155 if conn.info.needs_reassembly() {
157 pdu.ctxt.reassembled = true;
158 conn.info
159 .consume_stream(&mut pdu, subscription, &self.registry);
160 }
161 if !conn.remove_from_table() {
162 self.timerwheel.insert(
163 &conn_id,
164 conn.last_seen_ts,
165 conn.inactivity_window,
166 );
167 self.table.insert(conn_id, conn);
168 match ctxt.proto {
169 TCP_PROTOCOL => {
170 TCP_NEW_CONNECTIONS.inc();
171 }
172 UDP_PROTOCOL => {
173 UDP_NEW_CONNECTIONS.inc();
174 }
175 _ => {}
176 }
177 }
178 }
179 } else {
180 log::error!("Table full. Dropping packet.");
181 }
182 }
183 }
184 }
185
186 pub(crate) fn drain(&mut self, subscription: &Subscription<T::Subscribed>) {
188 log::info!("Draining Connection table");
189 for (_, mut conn) in self.table.drain() {
190 conn.terminate(subscription);
191 }
192 }
193
194 pub(crate) fn check_inactive(
196 &mut self,
197 subscription: &Subscription<T::Subscribed>,
198 now: Instant,
199 ) {
200 self.timerwheel
201 .check_inactive(&mut self.table, subscription, now);
202 }
203
204 #[allow(dead_code)]
206 pub(crate) fn clear_registry(&mut self) {
207 self.registry = ParserRegistry::from_strings(vec![]);
208 }
209}
210
211#[derive(Debug)]
213pub(crate) struct TrackerConfig {
214 pub(super) max_connections: usize,
216 pub(super) max_out_of_order: usize,
218 pub(super) udp_inactivity_timeout: usize,
220 pub(super) tcp_inactivity_timeout: usize,
222 pub(super) tcp_establish_timeout: usize,
224 pub(super) tcp_reassembly_timeout: usize,
227 pub(super) timeout_resolution: usize,
229}
230
231impl From<&ConnTrackConfig> for TrackerConfig {
232 fn from(config: &ConnTrackConfig) -> Self {
233 TrackerConfig {
234 max_connections: config.max_connections,
235 max_out_of_order: config.max_out_of_order,
236 udp_inactivity_timeout: config.udp_inactivity_timeout,
237 tcp_inactivity_timeout: config.tcp_inactivity_timeout,
238 tcp_establish_timeout: config.tcp_establish_timeout,
239 tcp_reassembly_timeout: config.tcp_reassembly_timeout,
240 timeout_resolution: config.timeout_resolution,
241 }
242 }
243}