retina_core/multicore/
channel_dispatcher.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Channel Dispatcher
//!
//! This module provides a channel dispatching system that operates in one of two modes:
//! - **Shared**: All data is sent through a single shared channel.
//! - **PerCore**: Data is dispatched to a dedicated channel selected by core ID.
//!
//! The dispatcher automatically tracks statistics for dispatched and dropped subscriptions
//! and provides thread-safe access to receivers for consumption.

use super::SubscriptionStats;
use crate::CoreId;
use crossbeam::channel::{bounded, Receiver, Sender, TrySendError};
use std::collections::HashMap;
use std::sync::{atomic::Ordering, Arc, Mutex};
use thiserror::Error;

/// Defines the operating mode for the channel dispatcher.
///
/// # Examples
///
/// ```rust
/// // Create a shared mode dispatcher
/// let shared_mode = ChannelMode::Shared;
/// ```
///
/// ```rust
/// // Create a per-core mode dispatcher for specific cores
/// let cores = vec![CoreId(0), CoreId(1), CoreId(2)];
/// let per_core_mode = ChannelMode::PerCore(cores);
/// ```
#[derive(Clone)]
pub enum ChannelMode {
    /// All subscriptions are sent through a single shared channel.
    Shared,
    /// Messages are routed to specific channels based on core ID.
    PerCore(Vec<CoreId>),
}

/// One sender/receiver pair managed by the dispatcher.
///
/// The sender is wrapped in `Option` so that `close_channels` can set it to
/// `None`; the receiver is wrapped in `Arc` so that `receivers` can hand out
/// clones of it.
type Channel<T> = (Option<Sender<T>>, Arc<Receiver<T>>);

/// Internal representation of the channel configuration based on chosen operating mode.
///
/// Which variant is used is decided once, when the dispatcher is constructed.
pub enum Channels<T> {
    /// Single shared sender and receiver pair.
    Shared(Channel<T>),
    /// HashMap mapping core IDs to their dedicated sender/receiver pairs.
    PerCore(HashMap<CoreId, Channel<T>>),
}

/// A unified thread-safe interface for dispatching subscriptions.
///
/// The dispatcher owns its channels behind a mutex and keeps counters of how
/// many items were dispatched and how many were dropped.
///
/// # Type Parameters
///
/// * `T` - The type of subscriptions being dispatched. Must implement `Send + 'static`.
pub struct ChannelDispatcher<T> {
    /// Label supplied at construction time; returned by `name()`.
    name: String,
    /// Channel set for the selected mode. Locked by `dispatch`, `receivers`
    /// and `close_channels`.
    channels: Mutex<Channels<T>>,
    /// Counters updated by `dispatch`; exposed through `stats()`.
    stats: SubscriptionStats,
}

impl<T: Send + 'static> ChannelDispatcher<T> {
    /// Creates a new channel dispatcher with the specified mode and channel capacity.
    pub fn new(mode: ChannelMode, channel_size: usize, name: String) -> Self {
        match mode {
            ChannelMode::Shared => Self::new_shared(channel_size, name),
            ChannelMode::PerCore(rx_cores) => Self::new_percore(&rx_cores, channel_size, name),
        }
    }

    /// Creates a new shared-mode dispatcher.
    fn new_shared(channel_size: usize, name: String) -> Self {
        let (tx, rx) = bounded(channel_size);

        Self {
            name,
            channels: Mutex::new(Channels::Shared((Some(tx), Arc::new(rx)))),
            stats: SubscriptionStats::new(),
        }
    }

    /// Creates a new per-core mode dispatcher.
    fn new_percore(rx_cores: &[CoreId], channel_size: usize, name: String) -> Self {
        let mut map = HashMap::with_capacity(rx_cores.len());

        for &core in rx_cores {
            let (tx, rx) = bounded(channel_size);
            map.insert(core, (Some(tx), Arc::new(rx)));
        }

        Self {
            name,
            channels: Mutex::new(Channels::PerCore(map)),
            stats: SubscriptionStats::new(),
        }
    }

    /// Dispatches data to appropriate channel based on the dispatcher's mode.
    ///
    /// In either case, the subscription passing is non-blocking through crossbeam's try_send
    /// operation and doesn't rely on mutexes internally (relies on lower-level atomic operations).
    pub fn dispatch(&self, data: T, core_id: Option<&CoreId>) -> Result<(), DispatchError<T>> {
        let channels = self.channels.lock().unwrap();

        let result = match &*channels {
            Channels::PerCore(map) => {
                let core = core_id.ok_or(DispatchError::CoreIdRequired)?;
                let (sender_result, _) = map.get(core).ok_or(DispatchError::CoreNotFound(*core))?;
                match sender_result {
                    Some(sender) => sender.try_send(data),
                    None => Err(TrySendError::Disconnected(data)),
                }
            }
            Channels::Shared((sender_result, _)) => match sender_result {
                Some(sender) => sender.try_send(data),
                None => Err(TrySendError::Disconnected(data)),
            },
        };

        match result {
            Ok(()) => {
                self.stats.dispatched.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(e) => {
                self.stats.dropped.fetch_add(1, Ordering::Relaxed);
                Err(DispatchError::SendFailed(e))
            }
        }
    }

    /// Returns a vector of all receivers for subscription consumption.
    pub fn receivers(&self) -> Vec<Arc<Receiver<T>>> {
        let channels = self.channels.lock().unwrap();

        match &*channels {
            Channels::PerCore(map) => map.values().map(|(_, rx)| Arc::clone(rx)).collect(),
            Channels::Shared((_, rx)) => vec![Arc::clone(rx)],
        }
    }

    /// Manually closes all channels.
    pub fn close_channels(&self) {
        let mut channels = self.channels.lock().unwrap();

        match &mut *channels {
            Channels::PerCore(map) => {
                for (_, (sender_result, _)) in map.iter_mut() {
                    *sender_result = None;
                }
            }
            Channels::Shared((sender_result, _)) => {
                *sender_result = None;
            }
        }
    }

    /// Returns a reference to the dispatch statistics.
    pub fn stats(&self) -> &SubscriptionStats {
        &self.stats
    }

    /// Returns a reference to the name.
    pub fn name(&self) -> &str {
        &self.name
    }
}

/// Errors that can occur during message dispatch.
///
/// `T` is the dispatched item type; a failed send carries the item back to the
/// caller inside the wrapped `TrySendError<T>`.
#[derive(Debug, Error)]
pub enum DispatchError<T> {
    /// A core ID was required for PerCore mode dispatch but none was provided.
    #[error("Core ID required for PerCore dispatch")]
    CoreIdRequired,

    /// The specified core ID doesn't have a configured channel.
    ///
    /// This error occurs when the core ID provided for dispatch wasn't included in the original
    /// core list when the PerCore dispatcher was created.
    #[error("No sender found for core: {0}")]
    CoreNotFound(CoreId),

    /// The underlying channel send operation failed.
    ///
    /// This error wraps the `TrySendError` from the crossbeam channel, which can occur when:
    /// - The channel is full (`TrySendError::Full`)
    /// - All receivers have been dropped (`TrySendError::Disconnected`)
    ///
    /// The dispatcher also reports `TrySendError::Disconnected` itself when its sender has
    /// already been removed by `close_channels`.
    #[error("Failed to send data")]
    SendFailed(#[from] TrySendError<T>),
}