Skip to main content

retina_core/subscription/
connection.rs

1//! Connection records.
2//!
3//! This is a connection-level subscription that provides TCP and/or UDP connection information,
4//! statistics, and state history. It does not deliver payload data.
5//!
6//!
7//! ## Example
8//! Logs TCP/80 and TCP/443 connection records to a file:
9//! ```
10//! #[filter("tcp.port = 80 or tcp.port = 443")]
11//! fn main() {
12//!     let config = default_config();
13//!     let file = Mutex::new(File::create("conn.jsonl").unwrap());
14//!     let cb = |conn: Connection| {
15//!         let mut wtr = file.lock().unwrap();
16//!         jsonl::write(&mut *wtr, &conn).unwrap();
17//!     };
18//!     let mut runtime = Runtime::new(config, filter, cb).unwrap();
19//!     runtime.run();
20//! }
21//! ```
22
23use crate::conntrack::conn::tcp_conn::reassembly::wrapping_lt;
24use crate::conntrack::conn_id::FiveTuple;
25use crate::conntrack::pdu::{L4Context, L4Pdu};
26use crate::conntrack::ConnTracker;
27use crate::filter::FilterResult;
28use crate::memory::mbuf::Mbuf;
29use crate::protocols::packet::tcp::{ACK, FIN, RST, SYN};
30use crate::protocols::stream::{ConnParser, Session};
31use crate::subscription::{Level, Subscribable, Subscription, Trackable};
32
33use serde::ser::{SerializeStruct, Serializer};
34use serde::Serialize;
35
36use std::collections::HashMap;
37use std::fmt;
38use std::net::SocketAddr;
39use std::time::{Duration, Instant};
40
/// History letter for a segment whose flags are exactly SYN.
const HIST_SYN: u8 = b'S';
/// History letter for a segment whose flags are exactly SYN and ACK.
const HIST_SYNACK: u8 = b'H';
/// History letter for a segment whose flags are exactly ACK and that carries no payload.
const HIST_ACK: u8 = b'A';
/// History letter for a segment with a non-zero payload length.
const HIST_DATA: u8 = b'D';
/// History letter for a segment with the FIN bit set.
const HIST_FIN: u8 = b'F';
/// History letter for a segment with the RST bit set.
const HIST_RST: u8 = b'R';
53
54/// A connection record.
55///
56/// This subscribable type returns general information regarding TCP and UDP connections but does
57/// does not track payload data. If applicable, Retina internally manages stream reassembly. All
58/// connections are interpreted using flow semantics.
59#[derive(Debug)]
60pub struct Connection {
61    /// The connection 5-tuple.
62    pub five_tuple: FiveTuple,
63    /// Timestamp of the first packet.
64    ///
65    /// ## Remarks
66    /// This represents the time Retina observed the first packet in the connection, and does not
67    /// reflect timestamps read from a packet capture in offline analysis.
68    // TODO: embed a hardware timestamp in the Mbuf itself.
69    pub ts: Instant,
70    /// The duration of the connection.
71    ///
72    /// ## Remarks
73    /// This does not represent the actual duration of the connection in offline analysis. It
74    /// approximates the elapsed time between observation of the first and last observed packet in
75    /// the connection.
76    pub duration: Duration,
77    /// Maximum duration of inactivity (the maximum time between observed segments).
78    pub max_inactivity: Duration,
79    /// The duration between the first and second packets.
80    pub time_to_second_packet: Duration,
81    /// Connection history.
82    ///
83    /// This represents a summary of the connection history in the order the packets were observed,
84    /// with letters encoded as a vector of bytes. This is a simplified version of [state history in
85    /// Zeek](https://docs.zeek.org/en/v5.0.0/scripts/base/protocols/conn/main.zeek.html), and the
86    /// meanings of each letter are similar: If the event comes from the originator, the letter is
87    /// uppercase; if the event comes from the responder, the letter is lowercase.
88    /// - S: a pure SYN with only the SYN bit set (may have payload)
89    /// - H: a pure SYNACK with only the SYN and ACK bits set (may have payload)
90    /// - A: a pure ACK with only the ACK bit set and no payload
91    /// - D: segment contains non-zero payload length
92    /// - F: the segment has the FIN bit set (may have other flags and/or payload)
93    /// - R: segment has the RST bit set (may have other flags and/or payload)
94    ///
95    /// Each letter is recorded a maximum of once in either direction.
96    pub history: Vec<u8>,
97    /// Originator flow.
98    pub orig: Flow,
99    /// Responder flow.
100    pub resp: Flow,
101}
102
103impl Connection {
104    /// Returns the client (originator) socket address.
105    #[inline]
106    pub fn client(&self) -> SocketAddr {
107        self.five_tuple.orig
108    }
109
110    /// Returns the server (responder) socket address.
111    #[inline]
112    pub fn server(&self) -> SocketAddr {
113        self.five_tuple.resp
114    }
115
116    /// Returns the total number of packets observed in the connection.
117    #[inline]
118    pub fn total_pkts(&self) -> u64 {
119        self.orig.nb_pkts + self.resp.nb_pkts
120    }
121
122    /// Returns the total number of payload bytes observed, excluding those from malformed packets.
123    #[inline]
124    pub fn total_bytes(&self) -> u64 {
125        self.orig.nb_bytes + self.resp.nb_bytes
126    }
127
128    /// Returns the connection history.
129    #[inline]
130    pub fn history(&self) -> String {
131        String::from_utf8_lossy(&self.history).into_owned()
132    }
133}
134
135impl Serialize for Connection {
136    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
137    where
138        S: Serializer,
139    {
140        let mut state = serializer.serialize_struct("Connection", 6)?;
141        state.serialize_field("five_tuple", &self.five_tuple)?;
142        state.serialize_field("duration", &self.duration)?;
143        state.serialize_field("max_inactivity", &self.max_inactivity)?;
144        state.serialize_field("history", &self.history())?;
145        state.serialize_field("orig", &self.orig)?;
146        state.serialize_field("resp", &self.resp)?;
147        state.end()
148    }
149}
150
151impl fmt::Display for Connection {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        write!(f, "{}: {}", self.five_tuple, self.history())?;
154        Ok(())
155    }
156}
157
158impl Subscribable for Connection {
159    type Tracked = TrackedConnection;
160
161    fn level() -> Level {
162        Level::Connection
163    }
164
165    // TODO: return a vector of all known parsers.
166    fn parsers() -> Vec<ConnParser> {
167        vec![]
168    }
169
170    fn process_packet(
171        mbuf: Mbuf,
172        subscription: &Subscription<Self>,
173        conn_tracker: &mut ConnTracker<Self::Tracked>,
174    ) {
175        match subscription.filter_packet(&mbuf) {
176            FilterResult::MatchTerminal(idx) | FilterResult::MatchNonTerminal(idx) => {
177                if let Ok(ctxt) = L4Context::new(&mbuf, idx) {
178                    conn_tracker.process(mbuf, ctxt, subscription);
179                }
180            }
181            FilterResult::NoMatch => drop(mbuf),
182        }
183    }
184}
185
186/// Tracks a connection record throughout its lifetime.
187///
188/// ## Note
189/// Internal connection state is an associated type of a `pub` trait, and therefore must also be
190/// public. Documentation is hidden by default to avoid confusing users.
191#[doc(hidden)]
192pub struct TrackedConnection {
193    five_tuple: FiveTuple,
194    first_seen_ts: Instant,
195    second_seen_ts: Instant,
196    last_seen_ts: Instant,
197    max_inactivity: Duration,
198    history: Vec<u8>,
199    ctos: Flow,
200    stoc: Flow,
201}
202
203impl TrackedConnection {
204    #[inline]
205    fn update(&mut self, segment: L4Pdu) {
206        let now = Instant::now();
207        let inactivity = now - self.last_seen_ts;
208        if inactivity > self.max_inactivity {
209            self.max_inactivity = inactivity;
210        }
211        self.last_seen_ts = now;
212
213        if segment.dir {
214            self.update_history(&segment, 0x0);
215            self.ctos.insert_segment(segment);
216        } else {
217            self.update_history(&segment, 0x20);
218            self.stoc.insert_segment(segment);
219        }
220
221        if self.ctos.nb_pkts + self.stoc.nb_pkts == 2 {
222            self.second_seen_ts = now;
223        }
224    }
225
226    #[inline]
227    fn update_history(&mut self, segment: &L4Pdu, mask: u8) {
228        fn insert(history: &mut Vec<u8>, event: u8) {
229            if !history.contains(&event) {
230                history.push(event);
231            }
232        }
233        if segment.flags() == SYN {
234            insert(&mut self.history, HIST_SYN ^ mask);
235        } else if segment.flags() == (SYN | ACK) {
236            insert(&mut self.history, HIST_SYNACK ^ mask);
237        } else if segment.flags() == ACK && segment.length() == 0 {
238            insert(&mut self.history, HIST_ACK ^ mask);
239        }
240
241        if segment.flags() & FIN != 0 {
242            insert(&mut self.history, HIST_FIN ^ mask);
243        }
244        if segment.flags() & RST != 0 {
245            insert(&mut self.history, HIST_RST ^ mask);
246        }
247        if segment.length() > 0 {
248            insert(&mut self.history, HIST_DATA ^ mask);
249        }
250    }
251}
252
253impl Trackable for TrackedConnection {
254    type Subscribed = Connection;
255
256    fn new(five_tuple: FiveTuple) -> Self {
257        let now = Instant::now();
258        TrackedConnection {
259            five_tuple,
260            first_seen_ts: now,
261            second_seen_ts: now,
262            last_seen_ts: now,
263            max_inactivity: Duration::default(),
264            history: Vec::with_capacity(16),
265            ctos: Flow::new(),
266            stoc: Flow::new(),
267        }
268    }
269
270    fn pre_match(&mut self, pdu: L4Pdu, _session_id: Option<usize>) {
271        self.update(pdu);
272    }
273
274    fn on_match(&mut self, _session: Session, _subscription: &Subscription<Self::Subscribed>) {
275        // do nothing, should stay tracked
276    }
277
278    fn post_match(&mut self, pdu: L4Pdu, _subscription: &Subscription<Self::Subscribed>) {
279        self.update(pdu)
280    }
281
282    fn on_terminate(&mut self, subscription: &Subscription<Self::Subscribed>) {
283        let (duration, max_inactivity, time_to_second_packet) =
284            if self.ctos.nb_pkts + self.stoc.nb_pkts == 1 {
285                (
286                    Duration::default(),
287                    Duration::default(),
288                    Duration::default(),
289                )
290            } else {
291                (
292                    self.last_seen_ts - self.first_seen_ts,
293                    self.max_inactivity,
294                    self.second_seen_ts - self.first_seen_ts,
295                )
296            };
297
298        let conn = Connection {
299            five_tuple: self.five_tuple,
300            ts: self.first_seen_ts,
301            duration,
302            max_inactivity,
303            time_to_second_packet,
304            history: self.history.clone(),
305            orig: self.ctos.clone(),
306            resp: self.stoc.clone(),
307        };
308        subscription.invoke(conn);
309    }
310}
311
312/// A uni-directional flow.
313#[derive(Debug, Clone, Serialize)]
314pub struct Flow {
315    /// Number of packets seen for this flow, including malformed and late start segments.
316    ///
317    /// - Malformed segments are defined as those that have a payload offset (start of the payload,
318    ///   as computed from the header length field) beyond the end of the packet buffer, or the end
319    ///   of the payload exceeds the end of the packet buffer.
320    /// - Late start segments are those that arrive after the first packet seen in the flow, but
321    ///   have an earlier sequence number. Only applies to TCP flows.
322    pub nb_pkts: u64,
323    /// Number of malformed packets.
324    pub nb_malformed_pkts: u64,
325    /// Number of late start packets.
326    pub nb_late_start_pkts: u64,
327    /// Number of payload bytes observed in the flow. Does not include bytes from malformed
328    /// segments.
329    pub nb_bytes: u64,
330    /// Maximum number of simultaneous content gaps.
331    ///
332    /// A content gap is a "hole" in the TCP sequence number, indicated re-ordered or missing
333    /// packets. Only applies to TCP flows.
334    pub max_simult_gaps: u64,
335    /// Starting sequence number of the first byte in the first payload (ISN + 1). Only applies to
336    /// TCP flows, and is set to `0` for UDP.
337    pub data_start: u32,
338    /// Maximum chunk capacity (the maximum number of simultaneous gaps + 1). Only applies to TCP
339    /// flows.
340    pub capacity: usize,
341    /// The set of non-overlapping content intervals. Only applies to TCP flows.
342    pub chunks: Vec<Chunk>,
343    /// Maps relative sequence number of a content gap to the number of packets observed before it
344    /// is filled. Only applies to TCP flows.
345    pub gaps: HashMap<u32, u64>,
346}
347
348impl Flow {
349    fn new() -> Self {
350        Flow {
351            nb_pkts: 0,
352            nb_malformed_pkts: 0,
353            nb_late_start_pkts: 0,
354            nb_bytes: 0,
355            max_simult_gaps: 0,
356            data_start: 0,
357            capacity: 100, // temp hardcode for now
358            chunks: Vec::with_capacity(100),
359            gaps: HashMap::new(),
360        }
361    }
362
363    #[inline]
364    fn insert_segment(&mut self, segment: L4Pdu) {
365        self.nb_pkts += 1;
366
367        if segment.offset() > segment.mbuf.data_len()
368            || (segment.offset() + segment.length()) > segment.mbuf.data_len()
369        {
370            self.nb_malformed_pkts += 1;
371            return;
372        }
373        self.nb_bytes += segment.length() as u64;
374
375        let seq_no = if segment.flags() & SYN != 0 {
376            segment.seq_no().wrapping_add(1)
377        } else {
378            segment.seq_no()
379        };
380
381        if self.chunks.is_empty() {
382            self.data_start = seq_no;
383        }
384
385        if wrapping_lt(seq_no, self.data_start) {
386            self.nb_late_start_pkts += 1;
387            return;
388        }
389
390        if self.chunks.len() < self.capacity {
391            let seg_start = seq_no.wrapping_sub(self.data_start);
392            let seg_end = seg_start + segment.length() as u32;
393
394            self.merge_chunk(Chunk(seg_start, seg_end));
395        }
396    }
397
398    /// Insert `chunk` into flow, merging intervals as necessary. Flow `chunks` are a sorted set of
399    /// non-overlapping intervals.
400    #[inline]
401    fn merge_chunk(&mut self, chunk: Chunk) {
402        let mut start = chunk.0;
403        let mut end = chunk.1;
404
405        let mut result = vec![];
406        let mut inserted = false;
407        for chunk in self.chunks.iter() {
408            if inserted || start > chunk.1 {
409                result.push(*chunk);
410            } else if end < chunk.0 {
411                inserted = true;
412                result.push(Chunk(start, end));
413                result.push(*chunk);
414            } else {
415                start = std::cmp::min(start, chunk.0);
416                end = std::cmp::max(end, chunk.1);
417            }
418        }
419        if !inserted {
420            result.push(Chunk(start, end));
421        }
422
423        for chunk in result[..result.len() - 1].iter() {
424            *self.gaps.entry(chunk.1).or_insert(0) += 1;
425        }
426
427        if result.len().saturating_sub(1) as u64 > self.max_simult_gaps {
428            self.max_simult_gaps += 1;
429        }
430        self.chunks = result;
431    }
432
433    /// Returns the number of content gaps at the connection end.
434    ///
435    /// This is not the total number of content gaps ever observed, rather, it represents the total
436    /// number of gaps remaining in the final state of the connection.
437    #[inline]
438    pub fn content_gaps(&self) -> u64 {
439        self.chunks.len().saturating_sub(1) as u64
440    }
441
442    /// Number of bytes missed in content gaps at connection end.
443    ///
444    /// This is not the total size of all content gaps ever observed, rather, it represents the
445    /// total number of missing bytes in the final state of the connection.
446    #[inline]
447    pub fn missed_bytes(&self) -> u64 {
448        self.chunks.windows(2).map(|w| w[1].0 - w[0].1).sum::<u32>() as u64
449    }
450
451    /// Returns the mean number of packet arrivals before a content gap is filled, or `0` if there
452    /// were no gaps.
453    #[inline]
454    pub fn mean_pkts_to_fill(&self) -> Option<f64> {
455        if self.gaps.is_empty() {
456            return None;
457        }
458        let mut sum = 0;
459        for val in self.gaps.values() {
460            sum += *val;
461        }
462        Some(sum as f64 / self.gaps.len() as f64)
463    }
464
465    /// Returns the median number of packet arrivals before a content gap is filled, or `0` if there
466    /// were no gaps.
467    #[inline]
468    pub fn median_pkts_to_fill(&self) -> Option<u64> {
469        if self.gaps.is_empty() {
470            return None;
471        }
472        let mut values = self.gaps.values().collect::<Vec<_>>();
473        values.sort();
474        let mid = values.len() / 2;
475        Some(*values[mid])
476    }
477}
478
479/// Start (inclusive) and end (exclusive) interval of contiguous TCP payload bytes.
480#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Serialize)]
481pub struct Chunk(u32, u32);
482
#[cfg(test)]
mod tests {
    use super::*;

    /// Seeds a fresh flow with `existing`, merges `incoming` into it, and returns the resulting
    /// chunks.
    fn merge_into(existing: Vec<Chunk>, incoming: Chunk) -> Vec<Chunk> {
        let mut flow = Flow::new();
        flow.chunks = existing;
        flow.merge_chunk(incoming);
        flow.chunks
    }

    /// A chunk that exactly fills the only gap joins both neighbours.
    #[test]
    fn core_merge_chunk_fill_single() {
        let merged = merge_into(vec![Chunk(0, 3), Chunk(4, 5)], Chunk(3, 4));
        assert_eq!(merged, vec![Chunk(0, 5)]);
    }

    /// A chunk spanning several gaps collapses everything it touches.
    #[test]
    fn core_merge_chunk_fill_multiple() {
        let merged = merge_into(vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)], Chunk(2, 12));
        assert_eq!(merged, vec![Chunk(0, 12)]);
    }

    /// A chunk strictly inside a gap splits it in two.
    #[test]
    fn core_merge_chunk_create_hole() {
        let merged = merge_into(vec![Chunk(0, 3), Chunk(8, 10)], Chunk(4, 5));
        assert_eq!(merged, vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)]);
    }

    /// A chunk overlapping only its right neighbour extends that neighbour to the left.
    #[test]
    fn core_merge_chunk_fill_overlap() {
        let merged = merge_into(vec![Chunk(0, 3), Chunk(8, 10)], Chunk(5, 9));
        assert_eq!(merged, vec![Chunk(0, 3), Chunk(5, 10)]);
    }

    /// A chunk before all existing chunks is placed first.
    #[test]
    fn core_merge_chunk_start() {
        let merged = merge_into(vec![Chunk(4, 6), Chunk(8, 10)], Chunk(0, 2));
        assert_eq!(merged, vec![Chunk(0, 2), Chunk(4, 6), Chunk(8, 10)]);
    }

    /// A chunk after all existing chunks is placed last.
    #[test]
    fn core_merge_chunk_end() {
        let merged = merge_into(vec![Chunk(4, 6), Chunk(8, 10)], Chunk(11, 15));
        assert_eq!(merged, vec![Chunk(4, 6), Chunk(8, 10), Chunk(11, 15)]);
    }
}