iris_core/conntrack/conn/tcp_conn/
reassembly.rs1use crate::conntrack::conn::conn_info::ConnInfo;
2use crate::conntrack::pdu::L4Pdu;
3use crate::protocols::packet::tcp::{ACK, FIN, RST, SYN};
4use crate::protocols::stream::ParserRegistry;
5use crate::subscription::{Subscription, Trackable};
6
7use anyhow::{bail, Result};
8use std::collections::VecDeque;
9
10#[derive(Debug)]
12pub(crate) struct TcpFlow {
13 pub(super) next_seq: Option<u32>,
15 pub(crate) last_ack: Option<u32>,
17 pub(super) consumed_flags: u8,
20 pub(crate) ooo_buf: OutOfOrderBuffer,
22 pub(crate) observed: usize,
24}
25
26impl TcpFlow {
27 #[inline]
29 pub(super) fn default(capacity: usize) -> Self {
30 TcpFlow {
31 next_seq: None,
32 last_ack: None,
33 consumed_flags: 0,
34 ooo_buf: OutOfOrderBuffer::new(capacity),
35 observed: 0,
36 }
37 }
38
39 #[inline]
42 pub(super) fn new(capacity: usize, next_seq: u32, flags: u8, ack: u32) -> Self {
43 TcpFlow {
44 next_seq: Some(next_seq),
45 last_ack: Some(ack),
46 consumed_flags: flags,
47 ooo_buf: OutOfOrderBuffer::new(capacity),
48 observed: 1,
49 }
50 }
51
52 #[inline]
56 pub(super) fn insert_segment<T: Trackable>(
57 &mut self,
58 mut segment: L4Pdu,
59 info: &mut ConnInfo<T>,
60 subscription: &Subscription<T::Subscribed>,
61 registry: &ParserRegistry,
62 ) {
63 let length = segment.length() as u32;
64 let cur_seq = segment.seq_no();
65 self.observed += 1;
66 segment.ctxt.reassembled = true;
67
68 if let Some(next_seq) = self.next_seq {
69 if next_seq == cur_seq {
70 self.consumed_flags |= segment.flags();
72 if segment.flags() & RST != 0 {
73 info.consume_stream(&mut segment, subscription, registry);
74 return;
75 }
76 let mut expected_seq = cur_seq.wrapping_add(length);
77 if segment.flags() & FIN != 0 {
78 expected_seq = cur_seq.wrapping_add(1);
79 }
80 info.consume_stream(&mut segment, subscription, registry);
81 self.last_ack = Some(segment.ack_no());
82 self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
83 } else if wrapping_lt(next_seq, cur_seq) {
84 self.buffer_ooo_seg(segment, info);
86 } else if let Some(expected_seq) = overlap(&mut segment, next_seq) {
87 self.consumed_flags |= segment.flags();
89 info.consume_stream(&mut segment, subscription, registry);
90 self.last_ack = Some(segment.ack_no());
91 self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
92 } else {
93 log::debug!(
95 "Dropping old segment. cur: {} expect: {}",
96 cur_seq,
97 next_seq
98 );
99 segment.mark_no_payload();
100 drop(segment);
101 }
102 } else {
103 if segment.flags() & (SYN | ACK) != 0 {
105 let expected_seq = cur_seq.wrapping_add(1 + length);
106 self.next_seq = Some(expected_seq);
107 self.consumed_flags |= segment.flags();
108 self.last_ack = Some(segment.ack_no());
109 info.consume_stream(&mut segment, subscription, registry);
110 self.flush_ooo_buffer::<T>(expected_seq, info, subscription, registry);
111 } else {
112 self.buffer_ooo_seg(segment, info);
114 }
115 }
116 }
117
118 #[inline]
120 fn buffer_ooo_seg<T: Trackable>(&mut self, segment: L4Pdu, info: &mut ConnInfo<T>) {
121 if self.ooo_buf.insert_back(segment).is_err() {
122 log::warn!("Out-of-order buffer overflow");
123 info.exec_drop();
125 }
126 }
127
128 #[inline]
132 pub(super) fn flush_ooo_buffer<T: Trackable>(
133 &mut self,
134 expected_seq: u32,
135 info: &mut ConnInfo<T>,
136 subscription: &Subscription<T::Subscribed>,
137 registry: &ParserRegistry,
138 ) {
139 if info.drop() {
140 return;
141 }
142 let next_seq = self.ooo_buf.flush_ordered::<T>(
143 expected_seq,
144 &mut self.last_ack,
145 &mut self.consumed_flags,
146 info,
147 subscription,
148 registry,
149 );
150 self.next_seq = Some(next_seq);
151 }
152}
153
154#[derive(Debug)]
156pub(crate) struct OutOfOrderBuffer {
157 capacity: usize,
158 pub(crate) buf: VecDeque<L4Pdu>,
159}
160
161impl OutOfOrderBuffer {
162 fn new(capacity: usize) -> Self {
164 OutOfOrderBuffer {
165 capacity,
166 buf: VecDeque::new(),
167 }
168 }
169
170 pub(crate) fn is_empty(&self) -> bool {
172 self.buf.is_empty()
173 }
174
175 #[allow(dead_code)]
177 pub(crate) fn len(&self) -> usize {
178 self.buf.len()
179 }
180
181 fn insert_back(&mut self, segment: L4Pdu) -> Result<()> {
183 log::debug!("insert with seq : {:#?}", segment.seq_no());
184 if self.len() >= self.capacity {
185 bail!("Out-of-order buffer overflow.");
188 }
189 self.buf.push_back(segment);
190 Ok(())
191 }
192
193 #[allow(clippy::too_many_arguments)]
197 #[inline]
198 fn flush_ordered<T: Trackable>(
199 &mut self,
200 expected_seq: u32,
201 last_ack: &mut Option<u32>,
202 consumed_flags: &mut u8,
203 info: &mut ConnInfo<T>,
204 subscription: &Subscription<T::Subscribed>,
205 registry: &ParserRegistry,
206 ) -> u32 {
207 let mut next_seq = expected_seq;
208 let mut index = 0;
209 while index < self.len() {
210 if info.drop() {
211 return next_seq;
212 }
213
214 let cur_seq = self.buf.get_mut(index).unwrap().seq_no();
216 log::debug!("Flushing...current seq: {:#?}", cur_seq);
217
218 if next_seq == cur_seq {
219 let mut segment = self.buf.remove(index).unwrap();
220 *consumed_flags |= segment.flags();
221 if segment.flags() & RST != 0 {
222 info.consume_stream(&mut segment, subscription, registry);
223 return next_seq;
224 }
225 next_seq = next_seq.wrapping_add(segment.length() as u32);
226 if segment.flags() & FIN != 0 {
227 next_seq = next_seq.wrapping_add(1);
228 }
229 info.consume_stream(&mut segment, subscription, registry);
230 *last_ack = Some(segment.ack_no());
231 index = 0;
232 } else if wrapping_lt(next_seq, cur_seq) {
233 index += 1;
234 } else {
235 let mut segment = self.buf.remove(index).unwrap();
236 if let Some(update_seq) = overlap(&mut segment, next_seq) {
237 next_seq = update_seq;
238 *consumed_flags |= segment.flags();
239 info.consume_stream(&mut segment, subscription, registry);
240 *last_ack = Some(segment.ack_no());
241 index = 0;
242 } else {
243 log::debug!("Dropping old segment during flush.");
244 segment.mark_no_payload();
245 drop(segment);
246 index += 1;
247 }
248 }
249 }
250 next_seq
251 }
252}
253
/// Returns `true` when sequence number `lhs` precedes `rhs`, taking 32-bit
/// wraparound into account.
///
/// `lhs` counts as "before" `rhs` when the wrapping difference `lhs - rhs`
/// is strictly greater than 2^31. Equal values, and values exactly 2^31
/// apart, compare as `false` in both directions.
pub fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
    const HALF_SEQ_SPACE: u32 = 1 << 31;
    let distance = lhs.wrapping_sub(rhs);
    distance > HALF_SEQ_SPACE
}
264
265fn overlap(segment: &mut L4Pdu, expected_seq: u32) -> Option<u32> {
268 let length = segment.length();
269 let cur_seq = segment.seq_no();
270 let mut end_seq = cur_seq.wrapping_add(length as u32);
271 if segment.flags() & FIN != 0 {
272 end_seq = end_seq.wrapping_add(1);
273 }
274
275 if wrapping_lt(expected_seq, end_seq) {
276 let new_data_len = end_seq.wrapping_sub(expected_seq);
278 let overlap_data_len = expected_seq.wrapping_sub(cur_seq);
279
280 log::debug!("Overlap with new data size : {:#?}", new_data_len);
281 segment.ctxt.offset += overlap_data_len as usize;
282 segment.ctxt.length = new_data_len as usize;
283 Some(end_seq)
284 } else {
285 None
286 }
287}