Skip to main content

iris_core/conntrack/conn/tcp_conn/
reassembly.rs

1use crate::conntrack::conn::conn_info::ConnInfo;
2use crate::conntrack::pdu::L4Pdu;
3use crate::protocols::packet::tcp::{ACK, FIN, RST, SYN};
4use crate::protocols::stream::ParserRegistry;
5use crate::subscription::{Subscription, Trackable};
6
7use anyhow::{bail, Result};
8use std::collections::VecDeque;
9
/// Represents a uni-directional TCP flow
#[derive(Debug)]
pub(crate) struct TcpFlow {
    /// Expected sequence number of the next in-order segment.
    /// `None` until it has been established (see `insert_segment`).
    pub(super) next_seq: Option<u32>,
    /// Last-seen ack number for peer's flow
    pub(crate) last_ack: Option<u32>,
    /// Flow status for consumed control packets: the bitwise OR of the
    /// flags of every consumed segment. Matches TCP flag bits.
    pub(super) consumed_flags: u8,
    /// Out-of-order buffer holding segments that arrived ahead of `next_seq`
    pub(crate) ooo_buf: OutOfOrderBuffer,
    /// Number of observed (not necessarily reassembled) packets
    pub(crate) observed: usize,
}
25
26impl TcpFlow {
27    /// Creates a default TCP flow
28    #[inline]
29    pub(super) fn default(capacity: usize) -> Self {
30        TcpFlow {
31            next_seq: None,
32            last_ack: None,
33            consumed_flags: 0,
34            ooo_buf: OutOfOrderBuffer::new(capacity),
35            observed: 0,
36        }
37    }
38
39    /// Creates a new TCP flow with given next sequence number, flags,
40    /// and out-of-order buffer
41    #[inline]
42    pub(super) fn new(capacity: usize, next_seq: u32, flags: u8, ack: u32) -> Self {
43        TcpFlow {
44            next_seq: Some(next_seq),
45            last_ack: Some(ack),
46            consumed_flags: flags,
47            ooo_buf: OutOfOrderBuffer::new(capacity),
48            observed: 1,
49        }
50    }
51
52    /// Attempt to insert incoming data segment into flow.
53    /// Buffer future segments and drop old segments.
54    /// Shunts TcpStream if the incoming segment causes out-of-order buffer overflow
55    #[inline]
56    pub(super) fn insert_segment<T: Trackable>(
57        &mut self,
58        mut segment: L4Pdu,
59        info: &mut ConnInfo<T>,
60        subscription: &Subscription<T::Subscribed>,
61        registry: &ParserRegistry,
62    ) {
63        let length = segment.length() as u32;
64        let cur_seq = segment.seq_no();
65        self.observed += 1;
66        segment.ctxt.reassembled = true;
67
68        if let Some(next_seq) = self.next_seq {
69            if next_seq == cur_seq {
70                // Segment is the next expected segment in the sequence
71                self.consumed_flags |= segment.flags();
72                if segment.flags() & RST != 0 {
73                    info.consume_stream(&mut segment, subscription, registry);
74                    return;
75                }
76                let mut expected_seq = cur_seq.wrapping_add(length);
77                if segment.flags() & FIN != 0 {
78                    expected_seq = cur_seq.wrapping_add(1);
79                }
80                info.consume_stream(&mut segment, subscription, registry);
81                self.last_ack = Some(segment.ack_no());
82                self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
83            } else if wrapping_lt(next_seq, cur_seq) {
84                // Segment comes after the next expected segment
85                self.buffer_ooo_seg(segment, info);
86            } else if let Some(expected_seq) = overlap(&mut segment, next_seq) {
87                // Segment starts before the next expected segment but has new data
88                self.consumed_flags |= segment.flags();
89                info.consume_stream(&mut segment, subscription, registry);
90                self.last_ack = Some(segment.ack_no());
91                self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
92            } else {
93                // Segment contains old data
94                log::debug!(
95                    "Dropping old segment. cur: {} expect: {}",
96                    cur_seq,
97                    next_seq
98                );
99                segment.mark_no_payload();
100                drop(segment);
101            }
102        } else {
103            // expecting SYNACK in response to the originator's SYN
104            if segment.flags() & (SYN | ACK) != 0 {
105                let expected_seq = cur_seq.wrapping_add(1 + length);
106                self.next_seq = Some(expected_seq);
107                self.consumed_flags |= segment.flags();
108                self.last_ack = Some(segment.ack_no());
109                info.consume_stream(&mut segment, subscription, registry);
110                self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
111            } else {
112                // Buffer out-of-order non-SYNACK packets
113                self.buffer_ooo_seg(segment, info);
114            }
115        }
116    }
117
118    /// Insert packet into ooo buffer and handle overflow
119    #[inline]
120    fn buffer_ooo_seg<T: Trackable>(&mut self, segment: L4Pdu, info: &mut ConnInfo<T>) {
121        if self.ooo_buf.insert_back(segment).is_err() {
122            log::warn!("Out-of-order buffer overflow");
123            // Drop the connection
124            info.exec_drop();
125        }
126    }
127
128    /// Flushes the flow's out-of-order buffer given the next expected
129    /// sequence number and updates the flow's new next expected
130    /// sequence number and status after the flush.
131    #[inline]
132    pub(super) fn flush_ooo_buffer<T: Trackable>(
133        &mut self,
134        expected_seq: u32,
135        info: &mut ConnInfo<T>,
136        subscription: &Subscription<T::Subscribed>,
137        registry: &ParserRegistry,
138    ) {
139        if info.drop() {
140            return;
141        }
142        let next_seq = self.ooo_buf.flush_ordered::<T>(
143            expected_seq,
144            &mut self.last_ack,
145            &mut self.consumed_flags,
146            info,
147            subscription,
148            registry,
149        );
150        self.next_seq = Some(next_seq);
151    }
152}
153
/// A buffer to hold reordered TCP segments
#[derive(Debug)]
pub(crate) struct OutOfOrderBuffer {
    /// Maximum number of segments the buffer accepts; `insert_back`
    /// fails once `buf` holds this many.
    capacity: usize,
    /// Buffered segments, appended at the back as they arrive
    pub(crate) buf: VecDeque<L4Pdu>,
}
160
161impl OutOfOrderBuffer {
162    /// Creates a new OutOfOrderBuffer with capacity
163    fn new(capacity: usize) -> Self {
164        OutOfOrderBuffer {
165            capacity,
166            buf: VecDeque::new(),
167        }
168    }
169
170    /// Is empty
171    pub(crate) fn is_empty(&self) -> bool {
172        self.buf.is_empty()
173    }
174
175    /// Returns the number of elements in the buffer
176    #[allow(dead_code)]
177    pub(crate) fn len(&self) -> usize {
178        self.buf.len()
179    }
180
181    /// Inserts segment at the end of the buffer.
182    fn insert_back(&mut self, segment: L4Pdu) -> Result<()> {
183        log::debug!("insert with seq : {:#?}", segment.seq_no());
184        if self.len() >= self.capacity {
185            // // must clear to drop buffered Mbufs
186            // self.buf.clear();
187            bail!("Out-of-order buffer overflow.");
188        }
189        self.buf.push_back(segment);
190        Ok(())
191    }
192
193    /// Consumes segments with expected data, retains segments with future data,
194    /// and drops segments with old data.
195    /// Returns the next expected sequence number and control flags of consumed segments.
196    #[allow(clippy::too_many_arguments)]
197    #[inline]
198    fn flush_ordered<T: Trackable>(
199        &mut self,
200        expected_seq: u32,
201        last_ack: &mut Option<u32>,
202        consumed_flags: &mut u8,
203        info: &mut ConnInfo<T>,
204        subscription: &Subscription<T::Subscribed>,
205        registry: &ParserRegistry,
206    ) -> u32 {
207        let mut next_seq = expected_seq;
208        let mut index = 0;
209        while index < self.len() {
210            if info.drop() {
211                return next_seq;
212            }
213
214            // unwraps ok because index < len
215            let cur_seq = self.buf.get_mut(index).unwrap().seq_no();
216            log::debug!("Flushing...current seq: {:#?}", cur_seq);
217
218            if next_seq == cur_seq {
219                let mut segment = self.buf.remove(index).unwrap();
220                *consumed_flags |= segment.flags();
221                if segment.flags() & RST != 0 {
222                    info.consume_stream(&mut segment, subscription, registry);
223                    return next_seq;
224                }
225                next_seq = next_seq.wrapping_add(segment.length() as u32);
226                if segment.flags() & FIN != 0 {
227                    next_seq = next_seq.wrapping_add(1);
228                }
229                info.consume_stream(&mut segment, subscription, registry);
230                *last_ack = Some(segment.ack_no());
231                index = 0;
232            } else if wrapping_lt(next_seq, cur_seq) {
233                index += 1;
234            } else {
235                let mut segment = self.buf.remove(index).unwrap();
236                if let Some(update_seq) = overlap(&mut segment, next_seq) {
237                    next_seq = update_seq;
238                    *consumed_flags |= segment.flags();
239                    info.consume_stream(&mut segment, subscription, registry);
240                    *last_ack = Some(segment.ack_no());
241                    index = 0;
242                } else {
243                    log::debug!("Dropping old segment during flush.");
244                    segment.mark_no_payload();
245                    drop(segment);
246                    index += 1;
247                }
248            }
249        }
250        next_seq
251    }
252}
253
/// Returns `true` when sequence number `lhs` comes strictly before `rhs`
/// under 32-bit wrap-around arithmetic, i.e. when `rhs` is ahead of `lhs`
/// by more than zero and less than 2**31. Equal numbers, and numbers
/// exactly 2**31 apart, yield `false`.
pub fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
    // From RFC1323:
    //     TCP determines if a data segment is "old" or "new" by testing
    //     whether its sequence number is within 2**31 bytes of the left edge
    //     of the window, and if it is not, discarding the data as "old".  To
    //     insure that new data is never mistakenly considered old and vice-
    //     versa, the left edge of the sender's window has to be at most
    //     2**31 away from the right edge of the receiver's window.
    const HALF_SEQ_SPACE: u32 = 1 << 31;
    let distance = lhs.wrapping_sub(rhs);
    distance > HALF_SEQ_SPACE
}
264
265/// Check if a segment has overlapping data with the received bytes.
266/// Returns the new expected sequence number if there is overlap
267fn overlap(segment: &mut L4Pdu, expected_seq: u32) -> Option<u32> {
268    let length = segment.length();
269    let cur_seq = segment.seq_no();
270    let mut end_seq = cur_seq.wrapping_add(length as u32);
271    if segment.flags() & FIN != 0 {
272        end_seq = end_seq.wrapping_add(1);
273    }
274
275    if wrapping_lt(expected_seq, end_seq) {
276        // contains new data
277        let new_data_len = end_seq.wrapping_sub(expected_seq);
278        let overlap_data_len = expected_seq.wrapping_sub(cur_seq);
279
280        log::debug!("Overlap with new data size : {:#?}", new_data_len);
281        segment.ctxt.offset += overlap_data_len as usize;
282        segment.ctxt.length = new_data_len as usize;
283        Some(end_seq)
284    } else {
285        None
286    }
287}