retina_datatypes/streaming.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
use crate::Tracked;
use retina_core::conntrack::pdu::L4Pdu;
/// Infrastructure for managing the state of streaming subscriptions.
/// This should not be accessed directly by the user.
use retina_core::filter::datatypes::Streaming;
use std::time::{Duration, Instant};
/// Callback timer wrapper
pub struct CallbackTimer<T>
where
T: Tracked,
{
/// The type of counter (time-based, packet-based, or byte-based)
counter_type: Streaming,
/// Whether a callback has "unsubscribed" from streaming data
unsubscribed: bool,
/// Whether the subscription's filter has matched
deliverable: bool,
/// For time-based counters, when the callback was last invoked
last_invoked: Option<Instant>,
/// For packet- and byte-based counters, the number of packets/bytes
/// remaining until the callback will be invoked again.
/// Can be set to 0 for time-based counters.
count_remaining: Option<u32>,
/// TMP - TODO move this into the TrackedWrapper to be shared
/// TODO - ideally could have multiple tracked datatypes
data: T,
}
impl<T> CallbackTimer<T>
where
T: Tracked,
{
/// Create a new CallbackTimer with the given counter type and data.
pub fn new(counter_type: Streaming, first_pkt: &L4Pdu) -> Self {
Self {
counter_type,
unsubscribed: false,
deliverable: false,
last_invoked: None,
count_remaining: None,
data: T::new(first_pkt),
}
}
/// Clear internal data.
/// Should be invoked after delivery.
#[inline]
pub fn clear(&mut self) {
self.data.clear();
}
#[inline]
pub fn update(&mut self, pdu: &L4Pdu, reassembled: bool) {
if !self.unsubscribed {
self.data.update(pdu, reassembled);
}
}
pub fn stream_protocols() -> Vec<&'static str> {
T::stream_protocols()
}
#[inline]
pub fn unsubscribe(&mut self) {
self.unsubscribed = true;
}
#[inline]
pub fn matched(&mut self) {
if !self.unsubscribed {
self.deliverable = true;
}
}
/// Check if the callback should be invoked. Update counters.
pub fn invoke(&mut self, pdu: &L4Pdu) -> bool {
if self.unsubscribed || !self.deliverable {
return false;
}
match self.counter_type {
Streaming::Seconds(duration) => {
// Deliver when first ready for delivery, then every N seconds
if self.last_invoked.is_none() {
self.last_invoked = Some(Instant::now());
return true;
}
if self.last_invoked.unwrap().elapsed()
>= Duration::from_millis((duration * 1000.0).round() as u64)
{
self.last_invoked = Some(Instant::now());
return true;
}
false
}
Streaming::Packets(count) => {
// Deliver when first ready for delivery, then every N packets
if self.count_remaining.is_none() {
self.count_remaining = Some(count);
return true;
}
// New packet received
let count_remaining = self.count_remaining.unwrap();
if count_remaining == 1 {
self.count_remaining = Some(count);
return true;
}
self.count_remaining = Some(count_remaining - 1);
false
}
Streaming::Bytes(count) => {
// Deliver when first ready for delivery, then every N packets
if self.count_remaining.is_none() {
self.count_remaining = Some(count);
return true;
}
let count_remaining = self.count_remaining.unwrap();
let len = pdu.mbuf_ref().data_len() as u32;
if count_remaining <= len {
self.count_remaining = Some(count);
return true;
}
self.count_remaining = Some(count_remaining - len);
false
}
}
}
}