Skip to main content

iris_core/conntrack/conn/
conn_info.rs

1#![doc(hidden)]
2/// Per-connection data structure for subscription management.
3use super::conn_actions::TrackedActions;
4use crate::lcore::CoreId;
5use crate::protocols::packet::tcp::TCP_PROTOCOL;
6use crate::protocols::stream::{ConnData, ParserRegistry};
7use crate::subscription::{Subscription, Trackable};
8use crate::FiveTuple;
9use crate::L4Pdu;
10
11use super::{conn_layers::*, conn_state::*};
12
13/// Per-connection. Tracks all subscription-requested
14/// datatypes (`tracked` data). Maintains the State of the connection
15/// at each layer, including the `Actions` to execute when new
16/// packets are received.
17/// This must be public in order to be accessible by generated filter
18/// and update code.
19#[derive(Debug)]
20pub struct ConnInfo<T>
21where
22    T: Trackable,
23{
24    /// Actions and state from the perspective of L4 (TCP or UDP).
25    /// Valid states are Payload or None.
26    pub linfo: LayerInfo,
27    /// Connection five-tuple for filtering and determining directionality
28    /// of future packets.
29    pub cdata: ConnData,
30    /// Additional Layers that the L4 conn. should pass
31    /// data to.
32    pub layers: [Layer; NUM_LAYERS],
33    /// Subscription data (for delivering)
34    pub tracked: T,
35}
36
37impl<T> ConnInfo<T>
38where
39    T: Trackable,
40{
41    pub(super) fn new(pdu: &L4Pdu, core_id: CoreId) -> Self {
42        let five_tuple = FiveTuple::from_ctxt(&pdu.ctxt);
43        ConnInfo {
44            linfo: LayerInfo {
45                state: if pdu.ctxt.proto == TCP_PROTOCOL {
46                    LayerState::Headers // Pre-TCP handshake
47                } else {
48                    LayerState::Payload
49                },
50                actions: TrackedActions::new(),
51            },
52            cdata: ConnData::new(five_tuple),
53            layers: [Layer::L7(L7Session::new())],
54            tracked: T::new(pdu, core_id),
55        }
56    }
57
58    /// Initializes actions at all layers when first packet
59    /// in L4 connection is observed.
60    pub(crate) fn filter_first_packet(
61        &mut self,
62        subscription: &Subscription<T::Subscribed>,
63        pdu: &L4Pdu,
64    ) {
65        subscription.state_tx::<T>(self, &StateTransition::L4FirstPacket, Some(pdu));
66    }
67
68    /// Update tracked data when new packet is observed.
69    /// This is invoked up to twice:
70    /// - InL4Conn (pre-reassembly, if applicable) by `conn`
71    /// - InL4Stream (post-reassembly, if applicable) by `consume_stream`
72    pub(crate) fn new_packet(&mut self, pdu: &L4Pdu, subscription: &Subscription<T::Subscribed>) {
73        let mut needs_update = self.linfo.actions.needs_update();
74        let tx = if pdu.ctxt.reassembled {
75            needs_update = self.linfo.actions.needs_parse();
76            StateTransition::InL4Stream
77        } else {
78            StateTransition::InL4Conn
79        };
80        if needs_update && subscription.update(self, pdu, tx) {
81            self.exec_state_tx(tx, subscription);
82        }
83    }
84
85    /// Invoked by reassembly infrastructure when the TCP handshake is completed.
86    pub(super) fn handshake_done(&mut self, subscription: &Subscription<T::Subscribed>) {
87        self.linfo.state = LayerState::Payload;
88        self.exec_state_tx(StateTransition::L4EndHshk, subscription);
89    }
90
91    /// Invoked by transport layer to update data for encapsulated layers.
92    /// This is invoked in reassembled order for TCP and received order for UDP.
93    pub(crate) fn consume_stream(
94        &mut self,
95        pdu: &mut L4Pdu,
96        subscription: &Subscription<T::Subscribed>,
97        registry: &ParserRegistry,
98    ) {
99        // Pass to next layer(s) if applicable for parsing
100        if self.layers[0].needs_stream() {
101            let tx = self.layers[0].process_stream(pdu, registry);
102            self.exec_state_tx(tx, subscription);
103            if self.layers[0].needs_process(tx, pdu) {
104                let tx = self.layers[0].process_stream(pdu, registry);
105                self.exec_state_tx(tx, subscription);
106            }
107        }
108
109        // Update tracked data post-reassembly if needed
110        self.new_packet(pdu, subscription);
111    }
112
113    /// Drop the connection, e.g. due to timeout
114    pub(crate) fn exec_drop(&mut self) {
115        self.linfo.state = LayerState::None
116    }
117
118    /// Returns true if the connection should be dropped
119    pub(crate) fn drop(&self) -> bool {
120        self.linfo.state == LayerState::None
121    }
122
123    /// Invoked when the connection has terminated (by timeout or TCP FIN/ACK sequence)
124    /// Delivers any "end of connection" data.
125    pub(crate) fn handle_terminate(&mut self, subscription: &Subscription<T::Subscribed>) {
126        while let Some(tx) = self.layers[0].handle_terminate() {
127            self.exec_state_tx(tx, subscription);
128            if self.drop() {
129                break;
130            }
131        }
132        if !self.drop() {
133            self.exec_state_tx(StateTransition::L4Terminated, subscription);
134        }
135    }
136
137    /// Update subscription data and current state, including actions,
138    /// upon state transition.
139    fn exec_state_tx(&mut self, tx: StateTransition, subscription: &Subscription<T::Subscribed>) {
140        // Packet is "no-op"; FirstPacket is handled separately
141        if matches!(tx, StateTransition::Packet | StateTransition::L4FirstPacket) {
142            return;
143        }
144
145        // Nothing to do at all layers
146        if self.linfo.actions.skip_tx(&tx)
147            && self
148                .layers
149                .iter()
150                .all(|l| l.layer_info().actions.skip_tx(&tx))
151        {
152            return;
153        }
154        self.linfo.actions.start_state_tx(tx);
155        for layer in self.layers.iter_mut() {
156            layer.layer_info_mut().actions.start_state_tx(tx);
157        }
158        subscription.state_tx::<T>(self, &tx, None);
159        for layer in &mut self.layers {
160            layer.end_state_tx();
161        }
162        if self.linfo.drop() && self.layers.iter().all(|l| l.drop()) {
163            self.exec_drop();
164        } else {
165            if self.layers.iter().any(|l| !l.drop()) {
166                self.linfo.actions.set_next_layer();
167            }
168        }
169    }
170
171    pub(crate) fn clear(&mut self) {
172        self.tracked.clear();
173    }
174
175    pub(crate) fn needs_reassembly(&self) -> bool {
176        self.linfo.actions.needs_parse() || self.layers.iter().any(|l| l.needs_stream())
177    }
178
179    pub(crate) fn needs_update(&self) -> bool {
180        self.linfo.actions.needs_update()
181    }
182}