retina_core/subscription/connection_frame.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
//! Connection packet stream.
//!
//! This is a connection-level subscription that provides a stream of raw Ethernet frames associated
//! with connections that satisfy the subscription filter in the order of arrival. The callback is
//! invoked once per frame.
//!
//! ## Example
//! Prints raw packet data from TLS connections on TCP/443 with subdomains of `google.com`:
//! ```
//! #[filter("tcp.port = 443 and tls.sni ~ 'google\\.com$'")]
//! fn main() {
//! let config = default_config();
//! let cb = |frame: ConnectionFrame| {
//! println!("{:?}", frame.data);
//! };
//! let mut runtime = Runtime::new(config, filter, cb).unwrap();
//! runtime.run();
//! }
//! ```
//!
//! ## Remarks
//! The first few packets in the connection may be delivered in sequence order if the subscription's
//! filter requires Retina to reassemble the stream. Once the filter is satisfied, all remaining
//! packets in the connection are delivered in the order of observation.
// TODO: find a workaround for this, perhaps timestamping all packets by default.
use crate::conntrack::conn_id::FiveTuple;
use crate::conntrack::pdu::{L4Context, L4Pdu};
use crate::conntrack::ConnTracker;
use crate::filter::FilterResult;
use crate::memory::mbuf::Mbuf;
use crate::protocols::stream::{ConnParser, Session};
use crate::subscription::{Level, Subscribable, Subscription, Trackable};
use std::net::SocketAddr;
/// Ethernet frames in a TCP or UDP connection.
#[derive(Debug, Clone)]
pub struct ConnectionFrame {
pub five_tuple: FiveTuple,
pub data: Vec<u8>,
}
impl ConnectionFrame {
/// Creates a new `ConnectionFrame`.
pub(crate) fn new(five_tuple: FiveTuple, mbuf: &Mbuf) -> Self {
ConnectionFrame {
five_tuple,
data: mbuf.data().to_vec(),
}
}
/// Returns the associated connection originator's socket address.
#[inline]
pub fn client(&self) -> SocketAddr {
self.five_tuple.orig
}
/// Returns the associated connection responder's socket address.
#[inline]
pub fn server(&self) -> SocketAddr {
self.five_tuple.resp
}
}
impl Subscribable for ConnectionFrame {
type Tracked = TrackedConnectionFrame;
fn level() -> Level {
Level::Connection
}
fn parsers() -> Vec<ConnParser> {
vec![]
}
fn process_packet(
mbuf: Mbuf,
subscription: &Subscription<Self>,
conn_tracker: &mut ConnTracker<Self::Tracked>,
) {
match subscription.filter_packet(&mbuf) {
FilterResult::MatchTerminal(idx) | FilterResult::MatchNonTerminal(idx) => {
if let Ok(ctxt) = L4Context::new(&mbuf, idx) {
conn_tracker.process(mbuf, ctxt, subscription);
}
}
FilterResult::NoMatch => drop(mbuf),
}
}
}
/// Tracks connection frames throughout the duration of the connection lifetime.
///
/// ## Note
/// Internal connection state is an associated type of a `pub` trait, and therefore must also be
/// public. Documentation is hidden by default to avoid confusing users.
#[doc(hidden)]
pub struct TrackedConnectionFrame {
/// Connection 5-tuple.
five_tuple: FiveTuple,
/// Buffers packets in the connection prior to a filter match.
buf: Vec<ConnectionFrame>,
}
impl Trackable for TrackedConnectionFrame {
type Subscribed = ConnectionFrame;
fn new(five_tuple: FiveTuple) -> Self {
TrackedConnectionFrame {
five_tuple,
buf: vec![],
}
}
fn pre_match(&mut self, pdu: L4Pdu, _session_id: Option<usize>) {
self.buf
.push(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
}
fn on_match(&mut self, _session: Session, subscription: &Subscription<Self::Subscribed>) {
self.buf.drain(..).for_each(|frame| {
subscription.invoke(frame);
});
}
fn post_match(&mut self, pdu: L4Pdu, subscription: &Subscription<Self::Subscribed>) {
subscription.invoke(ConnectionFrame::new(self.five_tuple, pdu.mbuf_ref()));
}
fn on_terminate(&mut self, subscription: &Subscription<Self::Subscribed>) {
self.buf.drain(..).for_each(|frame| {
subscription.invoke(frame);
});
}
}