retina_core/multicore/subscription_stats.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
//! Statistics tracking for subscription processing.
//!
//! This module provides thread-safe statistics tracking allowing monitoring of subscription dispatch, processing, and completion states.
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Thread-safe statistics tracker for the various stages of subscription processing.
/// All counters use atomic operations for thread safety.
#[derive(Default)]
pub struct SubscriptionStats {
/// Number of messages dispatched to processing queues.
pub dispatched: AtomicU64,
/// Number of messages dropped due to queue overflow or errors.
pub dropped: AtomicU64,
/// Number of messages that have completed processing.
/// Wrapped in `Arc` for thread sharing.
pub processed: Arc<AtomicU64>,
/// Number of messages currently being processed.
/// Wrapped in `Arc` for thread sharing.
pub actively_processing: Arc<AtomicU64>,
/// Number of messages saved to disk (not processed).
/// Wrapped in `Arc` for thread sharing.
pub flushed: Arc<AtomicU64>,
}
impl SubscriptionStats {
/// Creates a new instance with all counters initialized to zero.
pub fn new() -> Self {
Self {
dispatched: AtomicU64::new(0),
dropped: AtomicU64::new(0),
processed: Arc::new(AtomicU64::new(0)),
actively_processing: Arc::new(AtomicU64::new(0)),
flushed: Arc::new(AtomicU64::new(0)),
}
}
/// This creates a new `SubscriptionStats` instance with identical atomic counters.
pub fn snapshot(&self) -> SubscriptionStats {
SubscriptionStats {
dispatched: AtomicU64::new(self.get_dispatched()),
dropped: AtomicU64::new(self.get_dropped()),
processed: Arc::new(AtomicU64::new(self.get_processed())),
actively_processing: Arc::new(AtomicU64::new(self.get_actively_processing())),
flushed: Arc::new(AtomicU64::new(self.get_flushed())),
}
}
/// Returns the current number of dispatched messages.
pub fn get_dispatched(&self) -> u64 {
self.dispatched.load(Ordering::Relaxed)
}
/// Returns the current number of dropped messages.
pub fn get_dropped(&self) -> u64 {
self.dropped.load(Ordering::Relaxed)
}
/// Returns the current number of processed messages.
pub fn get_processed(&self) -> u64 {
self.processed.load(Ordering::Relaxed)
}
/// Returns the current number of messages actively being processed.
pub fn get_actively_processing(&self) -> u64 {
self.actively_processing.load(Ordering::Relaxed)
}
/// Returns the current number of messages flushed to disk.
pub fn get_flushed(&self) -> u64 {
self.flushed.load(Ordering::Relaxed)
}
}
/// Prints current statistics to stdout.
impl fmt::Display for SubscriptionStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Processed: {}\nDropped: {}\nFlushed (to disk): {}",
self.get_processed(),
self.get_dropped(),
self.get_flushed(),
)
}
}