Skip to main content

iris_core/multicore/
subscription_stats.rs

1//! Statistics tracking for subscription processing.
2//!
3//! This module provides thread-safe statistics tracking allowing monitoring of subscription dispatch, processing, and completion states.
4
5use std::fmt;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9/// Thread-safe statistics tracker for the various stages of subscription processing.
10/// All counters use atomic operations for thread safety.
11#[derive(Default)]
12pub struct SubscriptionStats {
13    /// Number of messages dispatched to processing queues.
14    pub dispatched: AtomicU64,
15
16    /// Number of messages dropped due to queue overflow or errors.
17    pub dropped: AtomicU64,
18
19    /// Number of messages that have completed processing.
20    /// Wrapped in `Arc` for thread sharing.
21    pub processed: Arc<AtomicU64>,
22
23    /// Number of messages currently being processed.
24    /// Wrapped in `Arc` for thread sharing.
25    pub actively_processing: Arc<AtomicU64>,
26
27    /// Number of messages saved to disk (not processed).
28    /// Wrapped in `Arc` for thread sharing.
29    pub flushed: Arc<AtomicU64>,
30}
31
32impl SubscriptionStats {
33    /// Creates a new instance with all counters initialized to zero.
34    pub fn new() -> Self {
35        Self {
36            dispatched: AtomicU64::new(0),
37            dropped: AtomicU64::new(0),
38            processed: Arc::new(AtomicU64::new(0)),
39            actively_processing: Arc::new(AtomicU64::new(0)),
40            flushed: Arc::new(AtomicU64::new(0)),
41        }
42    }
43
44    /// This creates a new `SubscriptionStats` instance with identical atomic counters.
45    pub fn snapshot(&self) -> SubscriptionStats {
46        SubscriptionStats {
47            dispatched: AtomicU64::new(self.get_dispatched()),
48            dropped: AtomicU64::new(self.get_dropped()),
49            processed: Arc::new(AtomicU64::new(self.get_processed())),
50            actively_processing: Arc::new(AtomicU64::new(self.get_actively_processing())),
51            flushed: Arc::new(AtomicU64::new(self.get_flushed())),
52        }
53    }
54
55    /// Returns the current number of dispatched messages.
56    pub fn get_dispatched(&self) -> u64 {
57        self.dispatched.load(Ordering::Relaxed)
58    }
59
60    /// Returns the current number of dropped messages.
61    pub fn get_dropped(&self) -> u64 {
62        self.dropped.load(Ordering::Relaxed)
63    }
64
65    /// Returns the current number of processed messages.
66    pub fn get_processed(&self) -> u64 {
67        self.processed.load(Ordering::Relaxed)
68    }
69
70    /// Returns the current number of messages actively being processed.
71    pub fn get_actively_processing(&self) -> u64 {
72        self.actively_processing.load(Ordering::Relaxed)
73    }
74
75    /// Returns the current number of messages flushed to disk.
76    pub fn get_flushed(&self) -> u64 {
77        self.flushed.load(Ordering::Relaxed)
78    }
79}
80
81/// Prints current statistics to stdout.
82impl fmt::Display for SubscriptionStats {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        write!(
85            f,
86            "Processed: {}\nDropped: {}\nFlushed (to disk): {}",
87            self.get_processed(),
88            self.get_dropped(),
89            self.get_flushed(),
90        )
91    }
92}