Skip to main content

iris_core/multicore/
channel_dispatcher.rs

1//! Channel Dispatcher
2//!
//! This module provides a channel dispatching system that operates in two modes:
//! - **Shared**: All data is sent through a single shared channel.
//! - **PerCore**: Data is dispatched to a dedicated channel selected by core ID.
6//!
7//! The dispatcher automatically tracks statistics for dispatched and dropped subscriptions
8//! and provides thread-safe access to receivers for consumption.
9
10use super::SubscriptionStats;
11use crate::CoreId;
12use crossbeam::channel::{bounded, Receiver, Sender, TrySendError};
13use std::collections::HashMap;
14use std::sync::{atomic::Ordering, Arc, Mutex};
15use thiserror::Error;
16
17/// Defines the operating mode for the channel dispatcher.
18///
19/// # Examples
20///
21/// ```rust,ignore
22/// // Create a shared mode dispatcher
23/// let shared_mode = ChannelMode::Shared;
24/// ```
25///
26/// ```rust,ignore
27/// // Create a per-core mode dispatcher for specific cores
28/// let cores = vec![CoreId(0), CoreId(1), CoreId(2)];
29/// let per_core_mode = ChannelMode::PerCore(cores);
30/// ```
31#[derive(Clone)]
32pub enum ChannelMode {
33    /// All subscriptions are sent through a single shared channel.
34    Shared,
35    /// Messages are routed to specific channels based on core ID.
36    PerCore(Vec<CoreId>),
37}
38
39type Channel<T> = (Option<Sender<T>>, Arc<Receiver<T>>);
40
41/// Internal representation of the channel configuration based on chosen operating mode.
42pub enum Channels<T> {
43    /// Single shared sender and receiver pair.
44    Shared(Channel<T>),
45    /// HashMap mapping core IDs to their dedicated sender/receiver pairs.
46    PerCore(HashMap<CoreId, Channel<T>>),
47}
48
49/// A unified thread-safe interface for dispatching subscriptions.
50///
51/// # Type Parameters
52///
53/// * `T` - The type of subscriptions being dispatched. Must implement `Send + 'static`.
54pub struct ChannelDispatcher<T> {
55    name: String,
56    channels: Mutex<Channels<T>>,
57    stats: SubscriptionStats,
58}
59
60impl<T: Send + 'static> ChannelDispatcher<T> {
61    /// Creates a new channel dispatcher with the specified mode and channel capacity.
62    pub fn new(mode: ChannelMode, channel_size: usize, name: String) -> Self {
63        match mode {
64            ChannelMode::Shared => Self::new_shared(channel_size, name),
65            ChannelMode::PerCore(rx_cores) => Self::new_percore(&rx_cores, channel_size, name),
66        }
67    }
68
69    /// Creates a new shared-mode dispatcher.
70    fn new_shared(channel_size: usize, name: String) -> Self {
71        let (tx, rx) = bounded(channel_size);
72
73        Self {
74            name,
75            channels: Mutex::new(Channels::Shared((Some(tx), Arc::new(rx)))),
76            stats: SubscriptionStats::new(),
77        }
78    }
79
80    /// Creates a new per-core mode dispatcher.
81    fn new_percore(rx_cores: &[CoreId], channel_size: usize, name: String) -> Self {
82        let mut map = HashMap::with_capacity(rx_cores.len());
83
84        for &core in rx_cores {
85            let (tx, rx) = bounded(channel_size);
86            map.insert(core, (Some(tx), Arc::new(rx)));
87        }
88
89        Self {
90            name,
91            channels: Mutex::new(Channels::PerCore(map)),
92            stats: SubscriptionStats::new(),
93        }
94    }
95
96    /// Dispatches data to appropriate channel based on the dispatcher's mode.
97    ///
98    /// In either case, the subscription passing is non-blocking through crossbeam's try_send
99    /// operation and doesn't rely on mutexes internally (relies on lower-level atomic operations).
100    pub fn dispatch(&self, data: T, core_id: Option<&CoreId>) -> Result<(), DispatchError<T>> {
101        let channels = self.channels.lock().unwrap();
102
103        let result = match &*channels {
104            Channels::PerCore(map) => {
105                let core = core_id.ok_or(DispatchError::CoreIdRequired)?;
106                let (sender_result, _) = map.get(core).ok_or(DispatchError::CoreNotFound(*core))?;
107                match sender_result {
108                    Some(sender) => sender.try_send(data),
109                    None => Err(TrySendError::Disconnected(data)),
110                }
111            }
112            Channels::Shared((sender_result, _)) => match sender_result {
113                Some(sender) => sender.try_send(data),
114                None => Err(TrySendError::Disconnected(data)),
115            },
116        };
117
118        match result {
119            Ok(()) => {
120                self.stats.dispatched.fetch_add(1, Ordering::Relaxed);
121                Ok(())
122            }
123            Err(e) => {
124                self.stats.dropped.fetch_add(1, Ordering::Relaxed);
125                Err(DispatchError::SendFailed(e))
126            }
127        }
128    }
129
130    /// Returns a vector of all receivers for subscription consumption.
131    pub fn receivers(&self) -> Vec<Arc<Receiver<T>>> {
132        let channels = self.channels.lock().unwrap();
133
134        match &*channels {
135            Channels::PerCore(map) => map.values().map(|(_, rx)| Arc::clone(rx)).collect(),
136            Channels::Shared((_, rx)) => vec![Arc::clone(rx)],
137        }
138    }
139
140    /// Manually closes all channels.
141    pub fn close_channels(&self) {
142        let mut channels = self.channels.lock().unwrap();
143
144        match &mut *channels {
145            Channels::PerCore(map) => {
146                for (_, (sender_result, _)) in map.iter_mut() {
147                    *sender_result = None;
148                }
149            }
150            Channels::Shared((sender_result, _)) => {
151                *sender_result = None;
152            }
153        }
154    }
155
156    /// Returns a reference to the dispatch statistics.
157    pub fn stats(&self) -> &SubscriptionStats {
158        &self.stats
159    }
160
161    /// Returns a reference to the name.
162    pub fn name(&self) -> &str {
163        &self.name
164    }
165}
166
/// Errors that can occur during message dispatch.
///
/// The type parameter `T` is the subscription type; only `SendFailed` carries a `T`, inside
/// the wrapped `TrySendError<T>`.
#[derive(Debug, Error)]
pub enum DispatchError<T> {
    /// A core ID was required for PerCore mode dispatch but none was provided.
    #[error("Core ID required for PerCore dispatch")]
    CoreIdRequired,

    /// The specified core ID doesn't have a configured channel.
    ///
    /// This error occurs when the core ID provided for dispatch wasn't included in the original
    /// core list used when creating the PerCore dispatcher.
    #[error("No sender found for core: {0}")]
    CoreNotFound(CoreId),

    /// The underlying channel send operation failed.
    ///
    /// This error wraps the `TrySendError` from the crossbeam channel, which can occur when:
    /// - The channel is full (`TrySendError::Full`)
    /// - All receivers have been dropped (`TrySendError::Disconnected`)
    ///
    /// The dispatcher also reports `TrySendError::Disconnected` itself when the sender has
    /// already been removed by `close_channels`.
    #[error("Failed to send data")]
    SendFailed(#[from] TrySendError<T>),
}