Skip to main content

iris_core/conntrack/conn/
conn_layers.rs

1#![doc(hidden)]
2/// Additional traffic layers built on top of the L4 base transport layer.
3use super::conn_actions::TrackedActions;
4use super::conn_state::{LayerState, StateTransition};
5use crate::conntrack::Actions;
6use crate::protocols::stream::{
7    ConnParser, ParseResult, ParserRegistry, ParsingState, ProbeRegistryResult, SessionData,
8    SessionProto,
9};
10use crate::protocols::Session;
11use crate::L4Pdu;
12
13lazy_static! {
14    static ref DEFAULT_SESSION: Session = Session {
15        data: SessionData::Null,
16        id: 0,
17    };
18}
19
/// "Layers" that can be built on top of the transport layer (L4).
/// Each variant's payload type implements the `TrackableLayer` trait and
/// carries a `LayerInfo` (reachable through `Layer::layer_info`).
#[derive(Debug)]
pub enum Layer {
    /// L6/L7 Session
    L7(L7Session),
}
/// Count of `Layer` variants.
/// NOTE(review): presumably must be kept in sync by hand when a variant is
/// added to `Layer` — confirm against the users of this constant.
pub const NUM_LAYERS: usize = 1;
28
/// Compile-time identifier for every layer a connection can expose:
/// the transport layer itself plus one entry per `Layer` variant.
///
/// The enum is `#[repr(usize)]`, so a variant can be cast to `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(usize)]
pub enum SupportedLayer {
    /// Base transport layer.
    L4 = 0,
    /// L6/L7 session layer (see `Layer::L7`).
    L7 = 1,
}
37
/// Trait implemented for each Layer variant
pub(crate) trait TrackableLayer {
    /// Ingest the next packet in the stream (reassembled, if TCP).
    /// Returns the state transition(s) triggered.
    /// If multiple state transitions are triggered, the "Streaming" (InX)
    /// transition should be returned first. This will invoke methods on `T`
    /// based on the Layer and current State.
    /// NOTE(review): `T` is not defined in this file — confirm what it
    /// refers to.
    fn process_stream(&mut self, pdu: &mut L4Pdu, registry: &ParserRegistry) -> StateTransition;

    /// Should be checked directly after a state transition to see
    /// if `process_stream` needs to be called again with the same PDU.
    /// For example, the packet used to "discover" a protocol will
    /// also be part of its header.
    fn needs_process(&self, tx: StateTransition, pdu: &L4Pdu) -> bool;

    /// Returns true when no actions are active and, if applicable, no
    /// sub-layers have active actions.
    fn drop(&self) -> bool;

    /// Returns true when "consume_stream" must be called by the transport
    /// layer. TCP reassembly is expected if applicable.
    fn needs_stream(&self) -> bool;

    /// Used to remove any actions that are invalid at this layer.
    /// For example: an L4 Update may trigger an L7 "parse" action, which
    /// would be invalid once in payload if another session is not expected.
    fn end_state_tx(&mut self);

    /// Indicate that the connection has terminated.
    /// Should be invoked repeatedly until it returns `None`; each `Some`
    /// carries a state transition for the caller to handle.
    fn handle_terminate(&mut self) -> Option<StateTransition>;
}
70
71impl Layer {
72    /// Accessors for LayerInfo
73    pub fn layer_info_mut(&mut self) -> &mut LayerInfo {
74        match self {
75            Layer::L7(session) => &mut session.linfo,
76        }
77    }
78
79    pub fn layer_info(&self) -> &LayerInfo {
80        match self {
81            Layer::L7(session) => &session.linfo,
82        }
83    }
84
85    /// Push an action
86    pub fn extend_actions(&mut self, action: &TrackedActions) {
87        self.layer_info_mut().actions.extend(action)
88    }
89
90    /// Accessors
91    pub fn last_session(&self) -> &Session {
92        match self {
93            Layer::L7(session) => match session.sessions.last() {
94                Some(s) => s,
95                None => &DEFAULT_SESSION,
96            },
97        }
98    }
99
100    pub fn drain_sessions(&mut self) -> Vec<Session> {
101        match self {
102            Layer::L7(session) => session.parser.drain_sessions(),
103        }
104    }
105
106    pub fn first_session(&self) -> &Session {
107        match self {
108            Layer::L7(session) => match session.sessions.first() {
109                Some(s) => s,
110                None => &DEFAULT_SESSION,
111            },
112        }
113    }
114
115    pub fn sessions(&self) -> &Vec<Session> {
116        match self {
117            Layer::L7(session) => &session.sessions,
118        }
119    }
120
121    pub fn last_protocol(&self) -> SessionProto {
122        match self {
123            Layer::L7(session) => session.get_protocol(),
124        }
125    }
126}
127
128impl TrackableLayer for Layer {
129    fn process_stream(&mut self, pdu: &mut L4Pdu, registry: &ParserRegistry) -> StateTransition {
130        match self {
131            Layer::L7(session) => session.process_stream(pdu, registry),
132        }
133    }
134
135    fn needs_process(&self, tx: StateTransition, pdu: &L4Pdu) -> bool {
136        match self {
137            Layer::L7(session) => session.needs_process(tx, pdu),
138        }
139    }
140
141    fn drop(&self) -> bool {
142        match self {
143            Layer::L7(session) => session.drop(),
144        }
145    }
146
147    fn needs_stream(&self) -> bool {
148        match self {
149            Layer::L7(session) => session.needs_stream(),
150        }
151    }
152
153    fn end_state_tx(&mut self) {
154        match self {
155            Layer::L7(session) => session.end_state_tx(),
156        }
157    }
158
159    fn handle_terminate(&mut self) -> Option<StateTransition> {
160        match self {
161            Layer::L7(session) => session.handle_terminate(),
162        }
163    }
164}
165
/// Stored for each Layer: the layer's current state together with the
/// actions currently tracked for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LayerInfo {
    /// Current state of the layer (a fresh `LayerInfo` starts in
    /// `LayerState::Discovery`).
    pub state: LayerState,
    /// Actions tracked for this layer.
    pub actions: TrackedActions,
}
172
173impl Default for LayerInfo {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179impl LayerInfo {
180    pub fn new() -> Self {
181        Self {
182            state: LayerState::Discovery,
183            actions: TrackedActions::new(),
184        }
185    }
186
187    pub(crate) fn drop(&self) -> bool {
188        self.state == LayerState::None || self.actions.drop()
189    }
190}
191
/// L6/L7 parsing infrastructure: layer bookkeeping, the protocol parser,
/// and the sessions produced so far.
#[derive(Debug)]
pub struct L7Session {
    /// Layer management (current state and tracked actions)
    pub linfo: LayerInfo,
    /// Stateful protocol parser; `ConnParser::Unknown` until a protocol
    /// has been identified
    pub parser: ConnParser,
    /// Parsed sessions, if applicable
    pub sessions: Vec<Session>,
    /// Sessions seen on terminate that are not fully parsed; they are
    /// moved into `sessions` one at a time by `handle_terminate`
    pub pending_sessions: Vec<Session>,
    // Further encapsulated layers could go here.
}
205
206impl L7Session {
207    /// Initialize infrastructure for probing, parsing, and tracking
208    /// L6/L7 (application-layer) sessions.
209    pub fn new() -> Self {
210        Self {
211            linfo: LayerInfo::new(),
212            parser: ConnParser::Unknown,
213            sessions: Vec::new(),
214            pending_sessions: Vec::new(),
215        }
216    }
217
218    /// Accessor for Protocol
219    pub fn get_protocol(&self) -> SessionProto {
220        match self.linfo.state {
221            LayerState::Discovery => SessionProto::Probing,
222            _ => self.parser.protocol(),
223        }
224    }
225}
226
227// Clippy #new_without_default warning for pub types
228impl Default for L7Session {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234impl TrackableLayer for L7Session {
235    fn end_state_tx(&mut self) {
236        // Nothing to parse if in payload and no more sessions expected
237        if self.linfo.actions.needs_parse()
238            && matches!(self.linfo.state, LayerState::Payload)
239            && !matches!(
240                self.parser.session_parsed_state(),
241                ParsingState::Parsing | ParsingState::Probing
242            )
243        {
244            self.linfo.actions.clear(&Actions::Parse);
245        }
246    }
247
248    fn needs_process(&self, tx: StateTransition, pdu: &L4Pdu) -> bool {
249        if self.linfo.state == LayerState::None {
250            return false;
251        }
252        (tx == StateTransition::L7OnDisc && pdu.length() > 0)
253            || (tx == StateTransition::L7EndHdrs && pdu.ctxt.app_offset.is_some())
254    }
255
256    fn drop(&self) -> bool {
257        self.linfo.drop()
258    }
259
260    fn needs_stream(&self) -> bool {
261        self.linfo.actions.needs_parse()
262    }
263
264    /// If some subscription is waiting for sessions, drain
265    /// pending (not yet fully parsed) sessions from the parser.
266    /// Move these sessions one-by-one to `self.sessions` until
267    /// none are left. This should be invoked until it returns None.
268    fn handle_terminate(&mut self) -> Option<StateTransition> {
269        if !self.linfo.actions.needs_parse() {
270            return None;
271        }
272        if matches!(self.linfo.state, LayerState::None) {
273            return None;
274        }
275        // Discovery failed
276        if matches!(self.linfo.state, LayerState::Discovery) {
277            return Some(StateTransition::L7OnDisc);
278        }
279        self.pending_sessions.extend(self.parser.drain_sessions());
280        // Parsing failed
281        if self.pending_sessions.is_empty() {
282            return Some(StateTransition::L7EndHdrs);
283        }
284        // New session ready
285        self.sessions.push(self.pending_sessions.pop().unwrap());
286        Some(StateTransition::L7EndHdrs)
287    }
288
289    fn process_stream(&mut self, pdu: &mut L4Pdu, registry: &ParserRegistry) -> StateTransition {
290        match self.linfo.state {
291            LayerState::Discovery => {
292                match registry.probe_all(pdu) {
293                    ProbeRegistryResult::Some(conn_parser) => {
294                        // Application-layer protocol known
295                        self.parser = conn_parser;
296                        self.linfo.state = LayerState::Headers;
297                        return StateTransition::L7OnDisc;
298                    }
299                    ProbeRegistryResult::None => {
300                        // All relevant parsers have failed to match
301                        self.linfo.state = LayerState::None;
302                        return StateTransition::L7OnDisc;
303                    }
304                    ProbeRegistryResult::Unsure => { /* skip */ }
305                }
306            }
307            LayerState::Headers => {
308                match self.parser.parse(pdu) {
309                    ParseResult::HeadersDone(id) => {
310                        if let Some(session) = self.parser.remove_session(id) {
311                            self.sessions.push(session);
312                        }
313                        if let Some(offset) = self.parser.body_offset() {
314                            pdu.ctxt.app_offset = Some(offset);
315                        }
316                        self.linfo.state = LayerState::Payload;
317                        return StateTransition::L7EndHdrs;
318                    }
319                    ParseResult::None => {
320                        self.linfo.state = LayerState::None;
321                        return StateTransition::L7EndHdrs;
322                    }
323                    ParseResult::Done(id) => {
324                        if let Some(session) = self.parser.remove_session(id) {
325                            self.sessions.push(session);
326                        }
327                        self.linfo.state = LayerState::None;
328                        return StateTransition::L7EndHdrs;
329                    }
330                    _ => { /* continue */ }
331                }
332            }
333            LayerState::Payload => {
334                pdu.ctxt.app_offset = Some(0);
335                if self.linfo.actions.needs_parse() {
336                    match self.parser.session_parsed_state() {
337                        ParsingState::Probing => {
338                            // TODO unimplemented: nested sessions
339                        }
340                        ParsingState::Parsing => {
341                            // TODO unimplemented: pipelined sessions
342                        }
343                        _ => {}
344                    }
345                }
346                // TODO - add API for parser to consume payload
347                // if applicable and return when session is "done"
348            }
349            LayerState::None => {
350                // Do nothing
351            }
352        }
353        StateTransition::Packet
354    }
355}