Skip to main content

retina_core/subscription/
connection_frame.rs

1//! Connection packet stream.
2//!
3//! This is a connection-level subscription that provides a stream of raw Ethernet frames associated
4//! with connections that satisfy the subscription filter in the order of arrival. The callback is
5//! invoked once per frame.
6//!
7//! ## Example
8//! Prints raw packet data from TLS connections on TCP/443 with subdomains of `google.com`:
9//! ```
10//! #[filter("tcp.port = 443 and tls.sni ~ 'google\\.com$'")]
11//! fn main() {
12//!     let config = default_config();
13//!     let cb = |frame: ConnectionFrame| {
14//!         println!("{:?}", frame.data);
15//!     };
16//!     let mut runtime = Runtime::new(config, filter, cb).unwrap();
17//!     runtime.run();
18//! }
19//! ```
20//!
21//! ## Remarks
22//! The first few packets in the connection may be delivered in sequence order if the subscription's
23//! filter requires Retina to reassemble the stream. Once the filter is satisfied, all remaining
24//! packets in the connection are delivered in the order of observation.
25// TODO: find a workaround for this, perhaps timestamping all packets by default.
26
27use crate::conntrack::conn_id::FiveTuple;
28use crate::conntrack::pdu::{L4Context, L4Pdu};
29use crate::conntrack::ConnTracker;
30use crate::filter::FilterResult;
31use crate::memory::mbuf::Mbuf;
32use crate::protocols::stream::{ConnParser, Session};
33use crate::subscription::{Level, Subscribable, Subscription, Trackable};
34
35use std::net::SocketAddr;
36
37/// Ethernet frames in a TCP or UDP connection.
38#[derive(Debug, Clone)]
39pub struct ConnectionFrame {
40    pub five_tuple: FiveTuple,
41    pub data: Vec<u8>,
42}
43
44impl ConnectionFrame {
45    /// Creates a new `ConnectionFrame`.
46    pub(crate) fn new(five_tuple: FiveTuple, mbuf: &Mbuf) -> Self {
47        ConnectionFrame {
48            five_tuple,
49            data: mbuf.data().to_vec(),
50        }
51    }
52
53    /// Returns the associated connection originator's socket address.
54    #[inline]
55    pub fn client(&self) -> SocketAddr {
56        self.five_tuple.orig
57    }
58
59    /// Returns the associated connection responder's socket address.
60    #[inline]
61    pub fn server(&self) -> SocketAddr {
62        self.five_tuple.resp
63    }
64}
65
66impl Subscribable for ConnectionFrame {
67    type Tracked = TrackedConnectionFrame;
68
69    fn level() -> Level {
70        Level::Connection
71    }
72
73    fn parsers() -> Vec<ConnParser> {
74        vec![]
75    }
76
77    fn process_packet(
78        mbuf: Mbuf,
79        subscription: &Subscription<Self>,
80        conn_tracker: &mut ConnTracker<Self::Tracked>,
81    ) {
82        match subscription.filter_packet(&mbuf) {
83            FilterResult::MatchTerminal(idx) | FilterResult::MatchNonTerminal(idx) => {
84                if let Ok(ctxt) = L4Context::new(&mbuf, idx) {
85                    conn_tracker.process(mbuf, ctxt, subscription);
86                }
87            }
88            FilterResult::NoMatch => drop(mbuf),
89        }
90    }
91}
92
93/// Tracks connection frames throughout the duration of the connection lifetime.
94///
95/// ## Note
96/// Internal connection state is an associated type of a `pub` trait, and therefore must also be
97/// public. Documentation is hidden by default to avoid confusing users.
98#[doc(hidden)]
99pub struct TrackedConnectionFrame {
100    /// Connection 5-tuple.
101    five_tuple: FiveTuple,
102    /// Buffers packets in the connection prior to a filter match.
103    buf: Vec<ConnectionFrame>,
104}
105
106impl Trackable for TrackedConnectionFrame {
107    type Subscribed = ConnectionFrame;
108
109    fn new(five_tuple: FiveTuple) -> Self {
110        TrackedConnectionFrame {
111            five_tuple,
112            buf: vec![],
113        }
114    }
115
116    fn pre_match(&mut self, pdu: L4Pdu, _session_id: Option<usize>) {
117        self.buf
118            .push(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
119    }
120
121    fn on_match(&mut self, _session: Session, subscription: &Subscription<Self::Subscribed>) {
122        self.buf.drain(..).for_each(|frame| {
123            subscription.invoke(frame);
124        });
125    }
126
127    fn post_match(&mut self, pdu: L4Pdu, subscription: &Subscription<Self::Subscribed>) {
128        subscription.invoke(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
129    }
130
131    fn on_terminate(&mut self, subscription: &Subscription<Self::Subscribed>) {
132        self.buf.drain(..).for_each(|frame| {
133            subscription.invoke(frame);
134        });
135    }
136}