1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
//! A sample connection record that provides various TCP and/or UDP connection
//! information, statistics, and state history. It does not deliver payload data.

use retina_core::conntrack::conn::tcp_conn::reassembly::wrapping_lt;
use retina_core::conntrack::conn_id::FiveTuple;
use retina_core::conntrack::pdu::L4Pdu;
use retina_core::protocols::packet::tcp::{ACK, FIN, RST, SYN};

use super::Tracked;

use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::time::{Duration, Instant};

use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;

/// Pure SYN
pub(crate) const HIST_SYN: u8 = b'S';
/// Pure SYNACK
pub(crate) const HIST_SYNACK: u8 = b'H';
/// Pure ACK (no payload)
pub(crate) const HIST_ACK: u8 = b'A';
/// Has non-zero payload length
pub(crate) const HIST_DATA: u8 = b'D';
/// Has FIN set
pub(crate) const HIST_FIN: u8 = b'F';
/// Has RST set
pub(crate) const HIST_RST: u8 = b'R';

impl ConnRecord {
    /// Returns the client (originator) socket address.
    #[inline]
    pub fn client(&self) -> SocketAddr {
        self.five_tuple.orig
    }

    /// Returns the server (responder) socket address.
    #[inline]
    pub fn server(&self) -> SocketAddr {
        self.five_tuple.resp
    }

    /// Returns the total number of packets observed in the connection.
    #[inline]
    pub fn total_pkts(&self) -> u64 {
        self.orig.nb_pkts + self.resp.nb_pkts
    }

    /// Returns the total number of payload bytes observed, excluding those from malformed packets.
    #[inline]
    pub fn total_bytes(&self) -> u64 {
        self.orig.nb_bytes + self.resp.nb_bytes
    }

    /// Returns the connection history.
    #[inline]
    pub fn history(&self) -> String {
        String::from_utf8_lossy(&self.history).into_owned()
    }

    /// Returns te duration of the connection.
    ///
    /// ## Remarks
    /// This does not represent the actual duration of the connection in offline analysis. It
    /// approximates the elapsed time between observation of the first and last observed packet in
    /// the connection.
    #[inline]
    pub fn duration(&self) -> Duration {
        if self.orig.nb_pkts + self.resp.nb_pkts == 1 {
            return Duration::default();
        }
        self.last_seen_ts - self.first_seen_ts
    }

    /// The duration (approximate) between the first and second packets.
    #[inline]
    pub fn time_to_second_packet(&self) -> Duration {
        if self.orig.nb_pkts + self.resp.nb_pkts == 1 {
            return Duration::default();
        }
        self.second_seen_ts - self.first_seen_ts
    }
}

impl Serialize for ConnRecord {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("ConnRecord", 6)?;
        state.serialize_field("five_tuple", &self.five_tuple)?;
        state.serialize_field("duration", &self.duration())?;
        state.serialize_field("time_to_second_pkt", &self.time_to_second_packet())?;
        state.serialize_field("max_inactivity", &self.max_inactivity)?;
        state.serialize_field("history", &self.history())?;
        state.serialize_field("orig", &self.orig)?;
        state.serialize_field("resp", &self.resp)?;
        state.end()
    }
}

impl fmt::Display for ConnRecord {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.five_tuple, self.history())?;
        Ok(())
    }
}

/// Tracks a connection record throughout its lifetime.
///
/// ## Note
/// Internal connection state is an associated type of a `pub` trait, and therefore must also be
/// public. Documentation is hidden by default to avoid confusing users.
#[derive(Debug)]
pub struct ConnRecord {
    /// The connection 5-tuple.
    pub five_tuple: FiveTuple,
    /// Timestamp of the first packet.
    ///
    /// ## Remarks
    /// This represents the time Retina observed the first packet in the connection, and does not
    /// reflect timestamps read from a packet capture in offline analysis.
    pub first_seen_ts: Instant,
    /// Timestamp of the second packet (approximate).
    pub second_seen_ts: Instant,
    /// Timestamp of the last packet (approximate).
    pub last_seen_ts: Instant,
    /// Maximum duration of inactivity (the maximum time between observed segments).
    pub max_inactivity: Duration,
    // Connection history.
    ///
    /// This represents a summary of the connection history in the order the packets were observed,
    /// with letters encoded as a vector of bytes. This is a simplified version of [state history in
    /// Zeek](https://docs.zeek.org/en/v5.0.0/scripts/base/protocols/conn/main.zeek.html), and the
    /// meanings of each letter are similar: If the event comes from the originator, the letter is
    /// uppercase; if the event comes from the responder, the letter is lowercase.
    /// - S: a pure SYN with only the SYN bit set (may have payload)
    /// - H: a pure SYNACK with only the SYN and ACK bits set (may have payload)
    /// - A: a pure ACK with only the ACK bit set and no payload
    /// - D: segment contains non-zero payload length
    /// - F: the segment has the FIN bit set (may have other flags and/or payload)
    /// - R: segment has the RST bit set (may have other flags and/or payload)
    ///
    /// Each letter is recorded a maximum of once in either direction.
    pub history: Vec<u8>,
    /// Originator flow.
    pub orig: Flow,
    /// Responder flow.
    pub resp: Flow,
}

#[inline]
pub(crate) fn update_history(history: &mut Vec<u8>, segment: &L4Pdu, mask: u8) {
    fn insert(history: &mut Vec<u8>, event: u8) {
        if !history.contains(&event) {
            history.push(event);
        }
    }
    if segment.flags() == SYN {
        insert(history, HIST_SYN ^ mask);
    } else if segment.flags() == (SYN | ACK) {
        insert(history, HIST_SYNACK ^ mask);
    } else if segment.flags() == ACK && segment.length() == 0 {
        insert(history, HIST_ACK ^ mask);
    }

    if segment.flags() & FIN != 0 {
        insert(history, HIST_FIN ^ mask);
    }
    if segment.flags() & RST != 0 {
        insert(history, HIST_RST ^ mask);
    }
    if segment.length() > 0 {
        insert(history, HIST_DATA ^ mask);
    }
}

impl ConnRecord {
    #[inline]
    fn update_data(&mut self, segment: &L4Pdu) {
        let now = Instant::now();
        let inactivity = now - self.last_seen_ts;
        if inactivity > self.max_inactivity {
            self.max_inactivity = inactivity;
        }
        self.last_seen_ts = now;

        if segment.dir {
            update_history(&mut self.history, segment, 0x0);
            // TODO need a separate `update` for `update_owned`
            // Cloning segment is a non-starter.
            self.orig.insert_segment(segment);
        } else {
            update_history(&mut self.history, segment, 0x20);
            self.resp.insert_segment(segment);
        }

        if self.orig.nb_pkts + self.resp.nb_pkts == 2 {
            self.second_seen_ts = now;
        }
    }
}

impl Tracked for ConnRecord {
    fn new(first_pkt: &L4Pdu) -> Self {
        let five_tuple = FiveTuple::from_ctxt(first_pkt.ctxt);
        let now = Instant::now();
        Self {
            five_tuple,
            first_seen_ts: now,
            second_seen_ts: now,
            last_seen_ts: now,
            max_inactivity: Duration::default(),
            history: Vec::with_capacity(16),
            orig: Flow::new(),
            resp: Flow::new(),
        }
    }

    // Clear the more memory-intensive data structures
    fn clear(&mut self) {
        self.orig.chunks = Vec::with_capacity(0);
        self.orig.gaps = HashMap::with_capacity(0);
        self.resp.chunks = Vec::with_capacity(0);
        self.resp.gaps = HashMap::with_capacity(0);
        self.history = Vec::with_capacity(0);
    }

    fn update(&mut self, pdu: &L4Pdu, reassembled: bool) {
        if !reassembled {
            self.update_data(pdu);
        }
    }

    fn stream_protocols() -> Vec<&'static str> {
        vec![]
    }
}

/// Default value for maximum chunk capacity.
const DEFAULT_CHUNK_CAPACITY: usize = 100;

/// A uni-directional flow.
#[derive(Debug, Clone, Serialize)]
pub struct Flow {
    /// Number of packets seen for this flow, including malformed and late start segments.
    ///
    /// - Malformed segments are defined as those that have a payload offset (start of the payload,
    ///   as computed from the header length field) beyond the end of the packet buffer, or the end
    ///   of the payload exceeds the end of the packet buffer.
    /// - Late start segments are those that arrive after the first packet seen in the flow, but
    ///   have an earlier sequence number. Only applies to TCP flows.
    pub nb_pkts: u64,
    /// Number of malformed packets.
    pub nb_malformed_pkts: u64,
    /// Number of late start packets.
    pub nb_late_start_pkts: u64,
    /// Number of payload bytes observed in the flow. Does not include bytes from malformed
    /// segments.
    pub nb_bytes: u64,
    /// Maximum number of simultaneous content gaps.
    ///
    /// A content gap is a "hole" in the TCP sequence number, indicated re-ordered or missing
    /// packets. Only applies to TCP flows.
    pub max_simult_gaps: u64,
    /// Starting sequence number of the first byte in the first payload (ISN + 1). Only applies to
    /// TCP flows, and is set to `0` for UDP.
    pub data_start: u32,
    /// Maximum chunk capacity (the maximum number of simultaneous gaps + 1). Only applies to TCP
    /// flows. Used to prevent resizing `chunks` vector.
    pub capacity: usize,
    /// The set of non-overlapping content intervals. Only applies to TCP flows.
    pub chunks: Vec<Chunk>,
    /// Maps relative sequence number of a content gap to the number of packets observed before it
    /// is filled. Only applies to TCP flows.
    pub gaps: HashMap<u32, u64>,
}

impl Flow {
    fn new() -> Self {
        Flow {
            nb_pkts: 0,
            nb_malformed_pkts: 0,
            nb_late_start_pkts: 0,
            nb_bytes: 0,
            max_simult_gaps: 0,
            data_start: 0,
            capacity: DEFAULT_CHUNK_CAPACITY,
            chunks: Vec::with_capacity(DEFAULT_CHUNK_CAPACITY),
            gaps: HashMap::new(),
        }
    }

    #[inline]
    fn insert_segment(&mut self, segment: &L4Pdu) {
        self.nb_pkts += 1;

        if segment.offset() > segment.mbuf.data_len()
            || (segment.offset() + segment.length()) > segment.mbuf.data_len()
        {
            self.nb_malformed_pkts += 1;
            return;
        }
        self.nb_bytes += segment.length() as u64;

        let seq_no = if segment.flags() & SYN != 0 {
            segment.seq_no().wrapping_add(1)
        } else {
            segment.seq_no()
        };

        if self.chunks.is_empty() {
            self.data_start = seq_no;
        }

        if wrapping_lt(seq_no, self.data_start) {
            self.nb_late_start_pkts += 1;
            return;
        }

        if self.chunks.len() < self.capacity {
            let seg_start = seq_no.wrapping_sub(self.data_start);
            let seg_end = seg_start + segment.length() as u32;

            self.merge_chunk(Chunk(seg_start, seg_end));
        }
    }

    /// Insert `chunk` into flow, merging intervals as necessary. Flow `chunks` are a sorted set of
    /// non-overlapping intervals.
    #[inline]
    fn merge_chunk(&mut self, chunk: Chunk) {
        let mut start = chunk.0;
        let mut end = chunk.1;

        let mut result = vec![];
        let mut inserted = false;
        for chunk in self.chunks.iter() {
            if inserted || start > chunk.1 {
                result.push(*chunk);
            } else if end < chunk.0 {
                inserted = true;
                result.push(Chunk(start, end));
                result.push(*chunk);
            } else {
                start = std::cmp::min(start, chunk.0);
                end = std::cmp::max(end, chunk.1);
            }
        }
        if !inserted {
            result.push(Chunk(start, end));
        }

        for chunk in result[..result.len() - 1].iter() {
            *self.gaps.entry(chunk.1).or_insert(0) += 1;
        }

        if result.len().saturating_sub(1) as u64 > self.max_simult_gaps {
            self.max_simult_gaps += 1;
        }
        self.chunks = result;
    }

    /// Returns the number of content gaps at the connection end.
    ///
    /// This is not the total number of content gaps ever observed, rather, it represents the total
    /// number of gaps remaining in the final state of the connection.
    #[inline]
    pub fn content_gaps(&self) -> u64 {
        self.chunks.len().saturating_sub(1) as u64
    }

    /// Number of bytes missed in content gaps at connection end.
    ///
    /// This is not the total size of all content gaps ever observed, rather, it represents the
    /// total number of missing bytes in the final state of the connection.
    #[inline]
    pub fn missed_bytes(&self) -> u64 {
        self.chunks.windows(2).map(|w| w[1].0 - w[0].1).sum::<u32>() as u64
    }

    /// Returns the mean number of packet arrivals before a content gap is filled, or `0` if there
    /// were no gaps.
    #[inline]
    pub fn mean_pkts_to_fill(&self) -> Option<f64> {
        if self.gaps.is_empty() {
            return None;
        }
        let mut sum = 0;
        for val in self.gaps.values() {
            sum += *val;
        }
        Some(sum as f64 / self.gaps.len() as f64)
    }

    /// Returns the median number of packet arrivals before a content gap is filled, or `0` if there
    /// were no gaps.
    #[inline]
    pub fn median_pkts_to_fill(&self) -> Option<u64> {
        if self.gaps.is_empty() {
            return None;
        }
        let mut values = self.gaps.values().collect::<Vec<_>>();
        values.sort();
        let mid = values.len() / 2;
        Some(*values[mid])
    }
}

/// Start (inclusive) and end (exclusive) interval of contiguous TCP payload bytes.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Serialize)]
pub struct Chunk(u32, u32);

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn core_merge_chunk_fill_single() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(0, 3), Chunk(4, 5)];
        flow.merge_chunk(Chunk(3, 4));
        assert_eq!(flow.chunks, vec![Chunk(0, 5)]);
    }

    #[test]
    fn core_merge_chunk_fill_multiple() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)];
        flow.merge_chunk(Chunk(2, 12));
        assert_eq!(flow.chunks, vec![Chunk(0, 12)]);
    }

    #[test]
    fn core_merge_chunk_create_hole() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(0, 3), Chunk(8, 10)];
        flow.merge_chunk(Chunk(4, 5));
        assert_eq!(flow.chunks, vec![Chunk(0, 3), Chunk(4, 5), Chunk(8, 10)]);
    }

    #[test]
    fn core_merge_chunk_fill_overlap() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(0, 3), Chunk(8, 10)];
        flow.merge_chunk(Chunk(5, 9));
        assert_eq!(flow.chunks, vec![Chunk(0, 3), Chunk(5, 10)]);
    }

    #[test]
    fn core_merge_chunk_start() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(4, 6), Chunk(8, 10)];
        flow.merge_chunk(Chunk(0, 2));
        assert_eq!(flow.chunks, vec![Chunk(0, 2), Chunk(4, 6), Chunk(8, 10)]);
    }

    #[test]
    fn core_merge_chunk_end() {
        let mut flow = Flow::new();
        flow.chunks = vec![Chunk(4, 6), Chunk(8, 10)];
        flow.merge_chunk(Chunk(11, 15));
        assert_eq!(flow.chunks, vec![Chunk(4, 6), Chunk(8, 10), Chunk(11, 15)]);
    }
}