retina_datatypes/
streaming.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::Tracked;
use retina_core::conntrack::pdu::L4Pdu;
/// Infrastructure for managing the state of streaming subscriptions.
/// This should not be accessed directly by the user.
use retina_core::filter::datatypes::Streaming;
use std::time::{Duration, Instant};

/// Per-subscription state deciding when a streaming callback is invoked.
///
/// Wraps the tracked datatype `T` together with the counter that controls
/// how often that data is delivered.
pub struct CallbackTimer<T: Tracked> {
    /// Which kind of counter drives delivery: time-, packet-, or byte-based.
    counter_type: Streaming,
    /// Set once the callback has "unsubscribed" from streaming data.
    unsubscribed: bool,
    /// Set once the subscription's filter has matched.
    deliverable: bool,
    /// Time-based counters only: when the callback was last invoked.
    /// `None` until the first delivery.
    last_invoked: Option<Instant>,
    /// Packet- and byte-based counters only: how many packets/bytes remain
    /// before the callback is invoked again. `None` until the first delivery;
    /// time-based counters leave it as `None`.
    count_remaining: Option<u32>,
    /// The tracked data handed to the callback.
    /// TMP - TODO: move this into the TrackedWrapper so it can be shared.
    /// TODO: ideally multiple tracked datatypes could be supported.
    data: T,
}

impl<T> CallbackTimer<T>
where
    T: Tracked,
{
    /// Creates a timer for the given counter type.
    ///
    /// The tracked data is built from `first_pkt` via `T::new`. The timer
    /// starts subscribed, not yet deliverable, and with no delivery recorded.
    pub fn new(counter_type: Streaming, first_pkt: &L4Pdu) -> Self {
        Self {
            counter_type,
            unsubscribed: false,
            deliverable: false,
            last_invoked: None,
            count_remaining: None,
            data: T::new(first_pkt),
        }
    }

    /// Clears the tracked data.
    /// Should be invoked after delivery.
    #[inline]
    pub fn clear(&mut self) {
        self.data.clear();
    }

    /// Forwards `pdu` to the tracked data, unless the callback has
    /// unsubscribed, in which case the PDU is ignored.
    #[inline]
    pub fn update(&mut self, pdu: &L4Pdu, reassembled: bool) {
        if !self.unsubscribed {
            self.data.update(pdu, reassembled);
        }
    }

    /// Returns the stream protocols reported by the tracked datatype `T`.
    pub fn stream_protocols() -> Vec<&'static str> {
        T::stream_protocols()
    }

    /// Marks the callback as unsubscribed. From then on `update` ignores
    /// PDUs, `matched` has no effect, and `invoke` returns `false`.
    #[inline]
    pub fn unsubscribe(&mut self) {
        self.unsubscribed = true;
    }

    /// Records that the subscription's filter has matched, making the
    /// callback deliverable. Has no effect once unsubscribed.
    #[inline]
    pub fn matched(&mut self) {
        if !self.unsubscribed {
            self.deliverable = true;
        }
    }

    /// Checks whether the callback should be invoked for `pdu` and updates
    /// the counters accordingly.
    ///
    /// Returns `false` while the callback is unsubscribed or the filter has
    /// not matched yet. Otherwise the first call returns `true`, and later
    /// calls return `true` each time the configured interval (seconds,
    /// packets, or bytes) has been used up.
    ///
    /// An interval of zero packets or zero bytes delivers on every call.
    pub fn invoke(&mut self, pdu: &L4Pdu) -> bool {
        if self.unsubscribed || !self.deliverable {
            return false;
        }
        match self.counter_type {
            Streaming::Seconds(duration) => {
                // Deliver when first ready for delivery, then every N seconds.
                // The interval is rounded to whole milliseconds.
                let interval = Duration::from_millis((duration * 1000.0).round() as u64);
                let due = match self.last_invoked {
                    None => true,
                    Some(last) => last.elapsed() >= interval,
                };
                if due {
                    self.last_invoked = Some(Instant::now());
                }
                due
            }
            Streaming::Packets(count) => match self.count_remaining {
                // More than one packet still to go: count this one and wait.
                Some(remaining) if remaining > 1 => {
                    self.count_remaining = Some(remaining - 1);
                    false
                }
                // First delivery (`None`) or the interval is used up. Treating
                // a remaining count of 0 as "due" keeps `Packets(0)` from
                // underflowing the subtraction above.
                _ => {
                    self.count_remaining = Some(count);
                    true
                }
            },
            Streaming::Bytes(count) => {
                // Deliver when first ready for delivery, then every N bytes.
                // NOTE(review): `as u32` would truncate if `data_len()` could
                // exceed `u32::MAX` - confirm against its return type.
                let len = pdu.mbuf_ref().data_len() as u32;
                match self.count_remaining {
                    // This PDU does not use up the remaining byte budget.
                    Some(remaining) if remaining > len => {
                        self.count_remaining = Some(remaining - len);
                        false
                    }
                    // First delivery (`None`) or the budget is used up. Bytes
                    // beyond the budget are not carried into the next interval.
                    _ => {
                        self.count_remaining = Some(count);
                        true
                    }
                }
            }
        }
    }
}