Skip to main content

iris_core/conntrack/
mod.rs

1//! Per-core connection state management.
2//!
3//! Most of this module's functionality is maintained internally by Iris and is not meant to be
4//! directly managed by users. However, it publicly exposes some useful connection identifiers for
5//! convenience.
6
7pub mod conn;
8pub mod conn_id;
9pub mod pdu;
10mod timerwheel;
11
12#[cfg(test)]
13mod tests;
14
15pub use conn::conn_actions::{Actions, TrackedActions};
16pub use conn::conn_layers::Layer;
17pub use conn::conn_state::{LayerState, StateTransition, StateTxData};
18pub use conn::ConnInfo;
19
20use self::conn::{Conn, L4Conn};
21use self::conn_id::ConnId;
22use self::pdu::{L4Context, L4Pdu};
23use self::timerwheel::TimerWheel;
24use crate::config::ConnTrackConfig;
25use crate::lcore::CoreId;
26use crate::memory::mbuf::Mbuf;
27use crate::protocols::packet::tcp::TCP_PROTOCOL;
28use crate::protocols::packet::udp::UDP_PROTOCOL;
29use crate::protocols::stream::ParserRegistry;
30use crate::stats::{StatExt, TCP_NEW_CONNECTIONS, UDP_NEW_CONNECTIONS};
31use crate::subscription::{Subscription, Trackable};
32
33use std::cmp;
34use std::time::Instant;
35
36use anyhow::anyhow;
37use hashlink::linked_hash_map::{LinkedHashMap, RawEntryMut};
38
39/// Manages state for all TCP and UDP connections.
40///
41/// One `ConnTracker` is maintained per core. `ConnTracker` is not meant to be directly managed by
42/// users, but can be configured at runtime with a maximum capacity, out-of-order tolerance,
43/// different timeout values, and other options. See
44/// [ConnTrackConfig](crate::config::ConnTrackConfig) for details.
45#[doc(hidden)]
46pub struct ConnTracker<T>
47where
48    T: Trackable,
49{
50    /// Configuration
51    config: TrackerConfig,
52    /// Contains required protocol parsers for `T`.
53    registry: ParserRegistry,
54    /// Manages `ConnId` to `Conn<T>` mappings.
55    table: LinkedHashMap<ConnId, Conn<T>>,
56    /// Manages connection timeouts.
57    timerwheel: TimerWheel,
58    /// ID of the core that the table is assigned to.
59    core_id: CoreId,
60}
61
62impl<T> ConnTracker<T>
63where
64    T: Trackable,
65{
66    /// Creates a new `ConnTracker`.
67    pub(crate) fn new(config: TrackerConfig, registry: ParserRegistry, core_id: CoreId) -> Self {
68        let table = LinkedHashMap::with_capacity(config.max_connections);
69        let timerwheel = TimerWheel::new(
70            cmp::max(config.tcp_inactivity_timeout, config.udp_inactivity_timeout),
71            config.timeout_resolution,
72        );
73        ConnTracker {
74            config,
75            registry,
76            table,
77            timerwheel,
78            core_id,
79        }
80    }
81
82    /// Returns the number of entries in the table.
83    #[inline]
84    pub(crate) fn size(&self) -> usize {
85        self.table.len()
86    }
87
88    /// Process a single incoming packet `mbuf` with layer-4 context `ctxt`.
89    pub(crate) fn process(
90        &mut self,
91        mbuf: Mbuf,
92        ctxt: L4Context,
93        subscription: &Subscription<T::Subscribed>,
94    ) {
95        let conn_id = ConnId::new(ctxt.src, ctxt.dst, ctxt.proto);
96        match self.table.raw_entry_mut().from_key(&conn_id) {
97            RawEntryMut::Occupied(mut occupied) => {
98                let conn = occupied.get_mut();
99                conn.last_seen_ts = Instant::now();
100                if conn.remove_from_table() {
101                    log::error!("Conn in Drop state when occupied in table");
102                    return;
103                }
104                if conn.drop_pdu() {
105                    return;
106                }
107                let dir = conn.packet_dir(&ctxt);
108                let pdu = L4Pdu::new(mbuf, ctxt, dir, conn.last_seen_ts);
109
110                // Consume PDU for update, reassembly, and/or parsing
111                conn.update(pdu, subscription, &self.registry);
112
113                // Delete stale data for connections no longer matching
114                if conn.remove_from_table() {
115                    occupied.remove();
116                } else if conn.drop_pdu() {
117                    conn.info.clear();
118                } else if conn.terminated() {
119                    conn.terminate(subscription);
120                    occupied.remove();
121                } else {
122                    // Update inactivity timeout
123                    conn.inactivity_window = match &conn.l4conn {
124                        L4Conn::Tcp(tcp) => tcp.inactivity_timeout(
125                            self.config.tcp_inactivity_timeout,
126                            self.config.tcp_reassembly_timeout,
127                        ),
128                        L4Conn::Udp(_) => self.config.udp_inactivity_timeout,
129                    };
130                }
131            }
132            RawEntryMut::Vacant(_) => {
133                if self.size() < self.config.max_connections {
134                    let mut pdu = L4Pdu::new(mbuf, ctxt, true, Instant::now());
135                    let conn = match ctxt.proto {
136                        TCP_PROTOCOL => Conn::<T>::new_tcp(
137                            self.config.tcp_establish_timeout,
138                            self.config.max_out_of_order,
139                            &pdu,
140                            self.core_id,
141                        ),
142                        UDP_PROTOCOL => Conn::<T>::new_udp(
143                            self.config.udp_inactivity_timeout,
144                            &pdu,
145                            self.core_id,
146                        ),
147                        _ => Err(anyhow!("Invalid L4 Protocol")),
148                    };
149                    if let Ok(mut conn) = conn {
150                        conn.info.filter_first_packet(subscription, &pdu);
151                        // Pre-reassembly update
152                        if conn.info.needs_update() {
153                            conn.info.new_packet(&pdu, subscription);
154                        }
155                        // Post-reassembly update
156                        if conn.info.needs_reassembly() {
157                            pdu.ctxt.reassembled = true;
158                            conn.info
159                                .consume_stream(&mut pdu, subscription, &self.registry);
160                        }
161                        if !conn.remove_from_table() {
162                            self.timerwheel.insert(
163                                &conn_id,
164                                conn.last_seen_ts,
165                                conn.inactivity_window,
166                            );
167                            self.table.insert(conn_id, conn);
168                            match ctxt.proto {
169                                TCP_PROTOCOL => {
170                                    TCP_NEW_CONNECTIONS.inc();
171                                }
172                                UDP_PROTOCOL => {
173                                    UDP_NEW_CONNECTIONS.inc();
174                                }
175                                _ => {}
176                            }
177                        }
178                    }
179                } else {
180                    log::error!("Table full. Dropping packet.");
181                }
182            }
183        }
184    }
185
186    /// Drains any remaining connections that satisfy the filter on runtime termination.
187    pub(crate) fn drain(&mut self, subscription: &Subscription<T::Subscribed>) {
188        log::info!("Draining Connection table");
189        for (_, mut conn) in self.table.drain() {
190            conn.terminate(subscription);
191        }
192    }
193
194    /// Checks for and removes inactive connections.
195    pub(crate) fn check_inactive(
196        &mut self,
197        subscription: &Subscription<T::Subscribed>,
198        now: Instant,
199    ) {
200        self.timerwheel
201            .check_inactive(&mut self.table, subscription, now);
202    }
203
204    /// Clears the parser registry. Used in testing.
205    #[allow(dead_code)]
206    pub(crate) fn clear_registry(&mut self) {
207        self.registry = ParserRegistry::from_strings(vec![]);
208    }
209}
210
211/// Configurable options for a `ConnTracker`.
212#[derive(Debug)]
213pub(crate) struct TrackerConfig {
214    /// Maximum number of connections that can be tracked per-core.
215    pub(super) max_connections: usize,
216    /// Maximum number of out-of-order packets allowed per TCP connection.
217    pub(super) max_out_of_order: usize,
218    /// Time to expire inactive UDP connections (in milliseconds).
219    pub(super) udp_inactivity_timeout: usize,
220    /// Time to expire inactive TCP connections (in milliseconds).
221    pub(super) tcp_inactivity_timeout: usize,
222    /// Time to expire unestablished TCP connections (in milliseconds).
223    pub(super) tcp_establish_timeout: usize,
224    /// Time to expire TCP connections that have received termination flags
225    /// but have not yet been removed due to out-of-order packets (in milliseconds).
226    pub(super) tcp_reassembly_timeout: usize,
227    /// Frequency to check for inactive streams (in milliseconds).
228    pub(super) timeout_resolution: usize,
229}
230
231impl From<&ConnTrackConfig> for TrackerConfig {
232    fn from(config: &ConnTrackConfig) -> Self {
233        TrackerConfig {
234            max_connections: config.max_connections,
235            max_out_of_order: config.max_out_of_order,
236            udp_inactivity_timeout: config.udp_inactivity_timeout,
237            tcp_inactivity_timeout: config.tcp_inactivity_timeout,
238            tcp_establish_timeout: config.tcp_establish_timeout,
239            tcp_reassembly_timeout: config.tcp_reassembly_timeout,
240            timeout_resolution: config.timeout_resolution,
241        }
242    }
243}