retina_core/runtime/
mod.rs1mod offline;
7mod online;
8use self::offline::*;
9use self::online::*;
10
11use crate::config::*;
12use crate::dpdk;
13use crate::filter::{Filter, FilterFactory};
14use crate::lcore::SocketId;
15use crate::memory::mempool::Mempool;
16use crate::subscription::*;
17
18use std::collections::BTreeMap;
19use std::ffi::CString;
20use std::sync::Arc;
21
22use anyhow::{bail, Result};
23
24pub struct Runtime<'a, S>
29where
30 S: Subscribable,
31{
32 #[allow(dead_code)]
33 mempools: BTreeMap<SocketId, Mempool>,
34 online: Option<OnlineRuntime<'a, S>>,
35 offline: Option<OfflineRuntime<'a, S>>,
36 #[cfg(feature = "timing")]
37 subscription: Arc<Subscription<'a, S>>,
38}
39
40impl<'a, S> Runtime<'a, S>
41where
42 S: Subscribable,
43{
44 pub fn new(
58 config: RuntimeConfig,
59 factory: fn() -> FilterFactory,
60 cb: impl Fn(S) + 'a,
61 ) -> Result<Self> {
62 let factory = factory();
63 let filter =
64 Filter::from_str(factory.filter_str.as_str(), true).expect("Failed to parse filter");
65 let subscription = Arc::new(Subscription::new(factory, cb));
66
67 println!("Initializing Retina runtime...");
68 log::info!("Initializing EAL...");
69 dpdk::load_drivers();
70 {
71 let eal_params = config.get_eal_params();
72 let eal_params_len = eal_params.len() as i32;
73
74 let mut args = vec![];
75 let mut ptrs = vec![];
76 for arg in eal_params.into_iter() {
77 let s = CString::new(arg).unwrap();
78 ptrs.push(s.as_ptr() as *mut u8);
79 args.push(s);
80 }
81
82 let ret = unsafe { dpdk::rte_eal_init(eal_params_len, ptrs.as_ptr() as *mut _) };
83 if ret < 0 {
84 bail!("Failure initializing EAL");
85 }
86 }
87
88 log::info!("Initializing Mempools...");
89 let mut mempools = BTreeMap::new();
90 let socket_ids = config.get_all_socket_ids();
91 let mtu = if let Some(online) = &config.online {
92 online.mtu
93 } else if let Some(offline) = &config.offline {
94 offline.mtu
95 } else {
96 Mempool::default_mtu()
97 };
98 for socket_id in socket_ids {
99 log::debug!("Socket ID: {}", socket_id);
100 let mempool = Mempool::new(&config.mempool, socket_id, mtu)?;
101 mempools.insert(socket_id, mempool);
102 }
103
104 let online = config.online.as_ref().map(|cfg| {
105 log::info!("Initializing Online Runtime...");
106 let online_opts = OnlineOptions {
107 online: cfg.clone(),
108 conntrack: config.conntrack.clone(),
109 };
110 OnlineRuntime::new(
111 &config,
112 online_opts,
113 &mut mempools,
114 filter.clone(),
115 Arc::clone(&subscription),
116 )
117 });
118
119 let offline = config.offline.as_ref().map(|cfg| {
120 log::info!("Initializing Offline Analysis...");
121 let offline_opts = OfflineOptions {
122 offline: cfg.clone(),
123 conntrack: config.conntrack.clone(),
124 };
125 OfflineRuntime::new(
126 offline_opts,
127 &mempools,
128 filter.clone(),
129 Arc::clone(&subscription),
130 )
131 });
132
133 log::info!("Runtime ready.");
134 Ok(Runtime {
135 mempools,
136 online,
137 offline,
138 #[cfg(feature = "timing")]
139 subscription,
140 })
141 }
142
143 pub fn run(&mut self) {
151 if let Some(online) = &mut self.online {
152 online.run();
153 } else if let Some(offline) = &self.offline {
154 offline.run();
155 } else {
156 log::error!("No runtime");
157 }
158 #[cfg(feature = "timing")]
159 {
160 self.subscription.timers.display_stats();
161 self.subscription.timers.dump_stats();
162 }
163 log::info!("Done.");
164 }
165}