1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
//! Per-connection state management.
//!
//! Tracks a TCP or UDP connection, performs stream reassembly, and (via ConnInfo)
//! manages protocol parser state throughout the duration of the connection.

pub mod conn_info;
pub mod tcp_conn;
pub mod udp_conn;

use self::conn_info::ConnInfo;
use self::tcp_conn::TcpConn;
use self::udp_conn::UdpConn;
use crate::conntrack::conn_id::FiveTuple;
use crate::conntrack::pdu::{L4Context, L4Pdu};
use crate::lcore::CoreId;
use crate::protocols::packet::tcp::{ACK, RST, SYN};
use crate::protocols::stream::ParserRegistry;
use crate::subscription::{Subscription, Trackable};

use anyhow::{bail, Result};
use std::time::Instant;

/// Tracks either a TCP or a UDP connection.
///
/// Performs light-weight stream reassembly for TCP connections and tracks UDP connections.
pub(crate) enum L4Conn {
    Tcp(TcpConn),
    Udp(UdpConn),
}

/// Connection state.
pub(crate) struct Conn<T>
where
    T: Trackable,
{
    /// Timestamp of the last observed packet in the connection.
    pub(crate) last_seen_ts: Instant,
    /// Amount of time (in milliseconds) before the connection should be expired for inactivity.
    pub(crate) inactivity_window: usize,
    /// Layer-4 connection tracking.
    pub(crate) l4conn: L4Conn,
    /// Connection tracking for filtering and parsing.
    pub(crate) info: ConnInfo<T>,
}

impl<T> Conn<T>
where
    T: Trackable,
{
    /// Creates a new TCP connection from `ctxt` with an initial inactivity window of
    /// `initial_timeout` and a maximum out-or-order tolerance of `max_ooo`. This means that there
    /// can be at most `max_ooo` packets buffered out of sequence before Retina chooses to discard
    /// the connection.
    pub(super) fn new_tcp(
        initial_timeout: usize,
        max_ooo: usize,
        pdu: &L4Pdu,
        core_id: CoreId,
    ) -> Result<Self> {
        let tcp_conn = if pdu.ctxt.flags & SYN != 0
            && pdu.ctxt.flags & ACK == 0
            && pdu.ctxt.flags & RST == 0
        {
            TcpConn::new_on_syn(pdu.ctxt, max_ooo)
        } else {
            bail!("Not SYN")
        };
        Ok(Conn {
            last_seen_ts: Instant::now(),
            inactivity_window: initial_timeout,
            l4conn: L4Conn::Tcp(tcp_conn),
            info: ConnInfo::new(pdu, core_id),
        })
    }

    /// Creates a new UDP connection from `ctxt` with an initial inactivity window of
    /// `initial_timeout`.
    #[allow(clippy::unnecessary_wraps)]
    pub(super) fn new_udp(initial_timeout: usize, pdu: &L4Pdu, core_id: CoreId) -> Result<Self> {
        let udp_conn = UdpConn;
        Ok(Conn {
            last_seen_ts: Instant::now(),
            inactivity_window: initial_timeout,
            l4conn: L4Conn::Udp(udp_conn),
            info: ConnInfo::new(pdu, core_id),
        })
    }

    /// Updates a connection on the arrival of a new packet.
    pub(super) fn update(
        &mut self,
        pdu: L4Pdu,
        subscription: &Subscription<T::Subscribed>,
        registry: &ParserRegistry,
    ) {
        match &mut self.l4conn {
            L4Conn::Tcp(tcp_conn) => {
                tcp_conn.reassemble(pdu, &mut self.info, subscription, registry);
                // Check if, after actions update, the framework/subscriptions no longer require
                // receiving reassembled traffic
                if !self.info.actions.reassemble() {
                    // Safe to discard out-of-order buffers
                    if tcp_conn.ctos.ooo_buf.len() != 0 {
                        tcp_conn.ctos.ooo_buf.buf.clear();
                    }
                    if tcp_conn.stoc.ooo_buf.len() != 0 {
                        tcp_conn.stoc.ooo_buf.buf.clear();
                    }
                }
            }
            L4Conn::Udp(_udp_conn) => self.info.consume_pdu(pdu, subscription, registry),
        }
    }

    /// Updates flags
    #[inline]
    pub(super) fn update_tcp_flags(&mut self, flags: u8, dir: bool) {
        if let L4Conn::Tcp(tcp_conn) = &mut self.l4conn {
            tcp_conn.update_flags(flags, dir);
        }
    }

    /// Returns the connection 5-tuple.
    pub(super) fn five_tuple(&self) -> FiveTuple {
        self.info.cdata.five_tuple
    }

    /// Returns `true` if the connection should be removed from the conn. table.
    /// Note UDP connections are kept for a buffer period. UDP packets
    /// that pass the packet filter stage are assumed to represent an
    /// existing or new connection and are inserted into the connection
    /// table. Keeping UDP connections in "drop" state for a buffer
    /// period prevents dropped connections from being re-inserted.
    pub(super) fn remove_from_table(&self) -> bool {
        match &self.l4conn {
            L4Conn::Udp(_) => false,
            _ => self.info.actions.drop(),
        }
    }

    /// Returns `true` if PDUs for this connection should be dropped.
    pub(super) fn drop_pdu(&self) -> bool {
        self.info.actions.drop()
    }

    /// Returns `true` if the connection has been naturally terminated.
    pub(super) fn terminated(&self) -> bool {
        match &self.l4conn {
            L4Conn::Tcp(tcp_conn) => tcp_conn.is_terminated(),
            L4Conn::Udp(_udp_conn) => false,
        }
    }

    /// Returns the `true` if the packet represented by `ctxt` is in the direction of originator ->
    /// responder.
    pub(super) fn packet_dir(&self, ctxt: &L4Context) -> bool {
        self.five_tuple().orig == ctxt.src
    }

    /// Invokes connection termination tasks that are triggered when any of the following conditions
    /// occur:
    /// - the connection naturally terminates (e.g., FIN/RST)
    /// - the connection expires due to inactivity
    /// - the connection is drained at the end of the run
    pub(crate) fn terminate(&mut self, subscription: &Subscription<T::Subscribed>) {
        self.info.handle_terminate(subscription);
    }
}