iris_core/multicore/channel_dispatcher.rs
1//! Channel Dispatcher
2//!
3//! This module provides a channel dispatching system that operate in two modes:
4//! - **Shared**: All data is sent through a single shared channel.
5//! - **PerCore**: Data is dispatched to specific channels based on coreID
6//!
7//! The dispatcher automatically tracks statistics for dispatched and dropped subscriptions
8//! and provides thread-safe access to receivers for consumption.
9
10use super::SubscriptionStats;
11use crate::CoreId;
12use crossbeam::channel::{bounded, Receiver, Sender, TrySendError};
13use std::collections::HashMap;
14use std::sync::{atomic::Ordering, Arc, Mutex};
15use thiserror::Error;
16
17/// Defines the operating mode for the channel dispatcher.
18///
19/// # Examples
20///
21/// ```rust,ignore
22/// // Create a shared mode dispatcher
23/// let shared_mode = ChannelMode::Shared;
24/// ```
25///
26/// ```rust,ignore
27/// // Create a per-core mode dispatcher for specific cores
28/// let cores = vec![CoreId(0), CoreId(1), CoreId(2)];
29/// let per_core_mode = ChannelMode::PerCore(cores);
30/// ```
31#[derive(Clone)]
32pub enum ChannelMode {
33 /// All subscriptions are sent through a single shared channel.
34 Shared,
35 /// Messages are routed to specific channels based on core ID.
36 PerCore(Vec<CoreId>),
37}
38
39type Channel<T> = (Option<Sender<T>>, Arc<Receiver<T>>);
40
41/// Internal representation of the channel configuration based on chosen operating mode.
42pub enum Channels<T> {
43 /// Single shared sender and receiver pair.
44 Shared(Channel<T>),
45 /// HashMap mapping core IDs to their dedicated sender/receiver pairs.
46 PerCore(HashMap<CoreId, Channel<T>>),
47}
48
49/// A unified thread-safe interface for dispatching subscriptions.
50///
51/// # Type Parameters
52///
53/// * `T` - The type of subscriptions being dispatched. Must implement `Send + 'static`.
54pub struct ChannelDispatcher<T> {
55 name: String,
56 channels: Mutex<Channels<T>>,
57 stats: SubscriptionStats,
58}
59
60impl<T: Send + 'static> ChannelDispatcher<T> {
61 /// Creates a new channel dispatcher with the specified mode and channel capacity.
62 pub fn new(mode: ChannelMode, channel_size: usize, name: String) -> Self {
63 match mode {
64 ChannelMode::Shared => Self::new_shared(channel_size, name),
65 ChannelMode::PerCore(rx_cores) => Self::new_percore(&rx_cores, channel_size, name),
66 }
67 }
68
69 /// Creates a new shared-mode dispatcher.
70 fn new_shared(channel_size: usize, name: String) -> Self {
71 let (tx, rx) = bounded(channel_size);
72
73 Self {
74 name,
75 channels: Mutex::new(Channels::Shared((Some(tx), Arc::new(rx)))),
76 stats: SubscriptionStats::new(),
77 }
78 }
79
80 /// Creates a new per-core mode dispatcher.
81 fn new_percore(rx_cores: &[CoreId], channel_size: usize, name: String) -> Self {
82 let mut map = HashMap::with_capacity(rx_cores.len());
83
84 for &core in rx_cores {
85 let (tx, rx) = bounded(channel_size);
86 map.insert(core, (Some(tx), Arc::new(rx)));
87 }
88
89 Self {
90 name,
91 channels: Mutex::new(Channels::PerCore(map)),
92 stats: SubscriptionStats::new(),
93 }
94 }
95
96 /// Dispatches data to appropriate channel based on the dispatcher's mode.
97 ///
98 /// In either case, the subscription passing is non-blocking through crossbeam's try_send
99 /// operation and doesn't rely on mutexes internally (relies on lower-level atomic operations).
100 pub fn dispatch(&self, data: T, core_id: Option<&CoreId>) -> Result<(), DispatchError<T>> {
101 let channels = self.channels.lock().unwrap();
102
103 let result = match &*channels {
104 Channels::PerCore(map) => {
105 let core = core_id.ok_or(DispatchError::CoreIdRequired)?;
106 let (sender_result, _) = map.get(core).ok_or(DispatchError::CoreNotFound(*core))?;
107 match sender_result {
108 Some(sender) => sender.try_send(data),
109 None => Err(TrySendError::Disconnected(data)),
110 }
111 }
112 Channels::Shared((sender_result, _)) => match sender_result {
113 Some(sender) => sender.try_send(data),
114 None => Err(TrySendError::Disconnected(data)),
115 },
116 };
117
118 match result {
119 Ok(()) => {
120 self.stats.dispatched.fetch_add(1, Ordering::Relaxed);
121 Ok(())
122 }
123 Err(e) => {
124 self.stats.dropped.fetch_add(1, Ordering::Relaxed);
125 Err(DispatchError::SendFailed(e))
126 }
127 }
128 }
129
130 /// Returns a vector of all receivers for subscription consumption.
131 pub fn receivers(&self) -> Vec<Arc<Receiver<T>>> {
132 let channels = self.channels.lock().unwrap();
133
134 match &*channels {
135 Channels::PerCore(map) => map.values().map(|(_, rx)| Arc::clone(rx)).collect(),
136 Channels::Shared((_, rx)) => vec![Arc::clone(rx)],
137 }
138 }
139
140 /// Manually closes all channels.
141 pub fn close_channels(&self) {
142 let mut channels = self.channels.lock().unwrap();
143
144 match &mut *channels {
145 Channels::PerCore(map) => {
146 for (_, (sender_result, _)) in map.iter_mut() {
147 *sender_result = None;
148 }
149 }
150 Channels::Shared((sender_result, _)) => {
151 *sender_result = None;
152 }
153 }
154 }
155
156 /// Returns a reference to the dispatch statistics.
157 pub fn stats(&self) -> &SubscriptionStats {
158 &self.stats
159 }
160
161 /// Returns a reference to the name.
162 pub fn name(&self) -> &str {
163 &self.name
164 }
165}
166
167/// Errors that can occur during message dispatch.
168#[derive(Debug, Error)]
169pub enum DispatchError<T> {
170 /// A core ID was required for PerCore mode dispatch but none was provided.
171 #[error("Core ID required for PerCore dispatch")]
172 CoreIdRequired,
173
174 /// The specified core ID doesn't have a configured channel.
175 ///
176 /// This error occurs when the core ID provided for dispatch wasn't included in the original
177 /// core list when created the PerCore dispatcher.
178 #[error("No sender found for core: {0}")]
179 CoreNotFound(CoreId),
180
181 /// The underlying channel send operation failed.
182 ///
183 /// This error wraps the `TrySendError` from the crossbeam channel, which can occur when:
184 /// - The channel is full (`TrySendError::Full`)
185 /// - All receivers have been dropped (`TrySendError::Disconnected`)
186 #[error("Failed to send data")]
187 SendFailed(#[from] TrySendError<T>),
188}