1cac7dca0Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd. 2cac7dca0Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License"); 3cac7dca0Sopenharmony_ci// you may not use this file except in compliance with the License. 4cac7dca0Sopenharmony_ci// You may obtain a copy of the License at 5cac7dca0Sopenharmony_ci// 6cac7dca0Sopenharmony_ci// http://www.apache.org/licenses/LICENSE-2.0 7cac7dca0Sopenharmony_ci// 8cac7dca0Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software 9cac7dca0Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS, 10cac7dca0Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11cac7dca0Sopenharmony_ci// See the License for the specific language governing permissions and 12cac7dca0Sopenharmony_ci// limitations under the License. 13cac7dca0Sopenharmony_ci 14cac7dca0Sopenharmony_ci//! One-shot channel is used to send a single message from a single sender to a 15cac7dca0Sopenharmony_ci//! single receiver. The [`channel`] function returns a [`Sender`] and 16cac7dca0Sopenharmony_ci//! [`Receiver`] handle pair that controls channel. 17cac7dca0Sopenharmony_ci//! 18cac7dca0Sopenharmony_ci//! The `Sender` handle is used by the producer to send a message. 19cac7dca0Sopenharmony_ci//! The `Receiver` handle is used by the consumer to receive the message. It has 20cac7dca0Sopenharmony_ci//! implemented the `Future` trait 21cac7dca0Sopenharmony_ci//! 22cac7dca0Sopenharmony_ci//! The `send` method is not async. It can be called from non-async context. 23cac7dca0Sopenharmony_ci//! 24cac7dca0Sopenharmony_ci//! # Examples 25cac7dca0Sopenharmony_ci//! 26cac7dca0Sopenharmony_ci//! ``` 27cac7dca0Sopenharmony_ci//! use ylong_runtime::sync::oneshot; 28cac7dca0Sopenharmony_ci//! async fn io_func() { 29cac7dca0Sopenharmony_ci//! let (tx, rx) = oneshot::channel(); 30cac7dca0Sopenharmony_ci//! ylong_runtime::spawn(async move { 31cac7dca0Sopenharmony_ci//! if let Err(_) = tx.send(6) { 32cac7dca0Sopenharmony_ci//! println!("Receiver dropped"); 33cac7dca0Sopenharmony_ci//! } 34cac7dca0Sopenharmony_ci//! }); 35cac7dca0Sopenharmony_ci//! 36cac7dca0Sopenharmony_ci//! match rx.await { 37cac7dca0Sopenharmony_ci//! Ok(v) => println!("received : {:?}", v), 38cac7dca0Sopenharmony_ci//! Err(_) => println!("Sender dropped"), 39cac7dca0Sopenharmony_ci//! } 40cac7dca0Sopenharmony_ci//! } 41cac7dca0Sopenharmony_ci//! ``` 42cac7dca0Sopenharmony_ciuse std::cell::RefCell; 43cac7dca0Sopenharmony_ciuse std::fmt::{Debug, Formatter}; 44cac7dca0Sopenharmony_ciuse std::future::Future; 45cac7dca0Sopenharmony_ciuse std::pin::Pin; 46cac7dca0Sopenharmony_ciuse std::sync::atomic::AtomicUsize; 47cac7dca0Sopenharmony_ciuse std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst}; 48cac7dca0Sopenharmony_ciuse std::sync::Arc; 49cac7dca0Sopenharmony_ciuse std::task::Poll::{Pending, Ready}; 50cac7dca0Sopenharmony_ciuse std::task::{Context, Poll}; 51cac7dca0Sopenharmony_ci 52cac7dca0Sopenharmony_ciuse super::atomic_waker::AtomicWaker; 53cac7dca0Sopenharmony_ciuse super::error::{RecvError, TryRecvError}; 54cac7dca0Sopenharmony_ci 55cac7dca0Sopenharmony_ci/// Initial state. 56cac7dca0Sopenharmony_ciconst INIT: usize = 0b00; 57cac7dca0Sopenharmony_ci/// Sender has sent the value. 58cac7dca0Sopenharmony_ciconst SENT: usize = 0b01; 59cac7dca0Sopenharmony_ci/// Channel is closed. 60cac7dca0Sopenharmony_ciconst CLOSED: usize = 0b10; 61cac7dca0Sopenharmony_ci 62cac7dca0Sopenharmony_ci/// Creates a new one-shot channel with a `Sender` and `Receiver` handle pair. 63cac7dca0Sopenharmony_ci/// 64cac7dca0Sopenharmony_ci/// The `Sender` can send a single value to the `Receiver`. 65cac7dca0Sopenharmony_ci/// 66cac7dca0Sopenharmony_ci/// # Examples 67cac7dca0Sopenharmony_ci/// 68cac7dca0Sopenharmony_ci/// ``` 69cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot; 70cac7dca0Sopenharmony_ci/// async fn io_func() { 71cac7dca0Sopenharmony_ci/// let (tx, rx) = oneshot::channel(); 72cac7dca0Sopenharmony_ci/// ylong_runtime::spawn(async move { 73cac7dca0Sopenharmony_ci/// if let Err(_) = tx.send(6) { 74cac7dca0Sopenharmony_ci/// println!("Receiver dropped"); 75cac7dca0Sopenharmony_ci/// } 76cac7dca0Sopenharmony_ci/// }); 77cac7dca0Sopenharmony_ci/// 78cac7dca0Sopenharmony_ci/// match rx.await { 79cac7dca0Sopenharmony_ci/// Ok(v) => println!("received : {:?}", v), 80cac7dca0Sopenharmony_ci/// Err(_) => println!("Sender dropped"), 81cac7dca0Sopenharmony_ci/// } 82cac7dca0Sopenharmony_ci/// } 83cac7dca0Sopenharmony_ci/// ``` 84cac7dca0Sopenharmony_cipub fn channel<T>() -> (Sender<T>, Receiver<T>) { 85cac7dca0Sopenharmony_ci let channel = Arc::new(Channel::new()); 86cac7dca0Sopenharmony_ci let tx = Sender { 87cac7dca0Sopenharmony_ci channel: channel.clone(), 88cac7dca0Sopenharmony_ci }; 89cac7dca0Sopenharmony_ci let rx = Receiver { channel }; 90cac7dca0Sopenharmony_ci (tx, rx) 91cac7dca0Sopenharmony_ci} 92cac7dca0Sopenharmony_ci 93cac7dca0Sopenharmony_ci/// Sends a single value to the associated [`Receiver`]. 94cac7dca0Sopenharmony_ci/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`] 95cac7dca0Sopenharmony_ci/// function. 96cac7dca0Sopenharmony_ci/// 97cac7dca0Sopenharmony_ci/// # Examples 98cac7dca0Sopenharmony_ci/// 99cac7dca0Sopenharmony_ci/// ``` 100cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot; 101cac7dca0Sopenharmony_ci/// async fn io_func() { 102cac7dca0Sopenharmony_ci/// let (tx, rx) = oneshot::channel(); 103cac7dca0Sopenharmony_ci/// ylong_runtime::spawn(async move { 104cac7dca0Sopenharmony_ci/// if let Err(_) = tx.send(6) { 105cac7dca0Sopenharmony_ci/// println!("Receiver dropped"); 106cac7dca0Sopenharmony_ci/// } 107cac7dca0Sopenharmony_ci/// }); 108cac7dca0Sopenharmony_ci/// 109cac7dca0Sopenharmony_ci/// match rx.await { 110cac7dca0Sopenharmony_ci/// Ok(v) => println!("received : {:?}", v), 111cac7dca0Sopenharmony_ci/// Err(_) => println!("Sender dropped"), 112cac7dca0Sopenharmony_ci/// } 113cac7dca0Sopenharmony_ci/// } 114cac7dca0Sopenharmony_ci/// ``` 115cac7dca0Sopenharmony_ci/// 116cac7dca0Sopenharmony_ci/// The receiver will fail with a [`RecvError`] if the sender is dropped without 117cac7dca0Sopenharmony_ci/// sending a value. 118cac7dca0Sopenharmony_ci/// 119cac7dca0Sopenharmony_ci/// # Examples 120cac7dca0Sopenharmony_ci/// 121cac7dca0Sopenharmony_ci/// ``` 122cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot; 123cac7dca0Sopenharmony_ci/// async fn io_func() { 124cac7dca0Sopenharmony_ci/// let (tx, rx) = oneshot::channel::<()>(); 125cac7dca0Sopenharmony_ci/// ylong_runtime::spawn(async move { 126cac7dca0Sopenharmony_ci/// drop(tx); 127cac7dca0Sopenharmony_ci/// }); 128cac7dca0Sopenharmony_ci/// 129cac7dca0Sopenharmony_ci/// match rx.await { 130cac7dca0Sopenharmony_ci/// Ok(v) => panic!("This won't happen"), 131cac7dca0Sopenharmony_ci/// Err(_) => println!("Sender dropped"), 132cac7dca0Sopenharmony_ci/// } 133cac7dca0Sopenharmony_ci/// } 134cac7dca0Sopenharmony_ci/// ``` 135cac7dca0Sopenharmony_ci#[derive(Debug)] 136cac7dca0Sopenharmony_cipub struct Sender<T> { 137cac7dca0Sopenharmony_ci channel: Arc<Channel<T>>, 138cac7dca0Sopenharmony_ci} 139cac7dca0Sopenharmony_ci 140cac7dca0Sopenharmony_ciimpl<T> Sender<T> { 141cac7dca0Sopenharmony_ci /// Sends a single value to the associated [`Receiver`], returns the value 142cac7dca0Sopenharmony_ci /// back if it fails to send. 143cac7dca0Sopenharmony_ci /// 144cac7dca0Sopenharmony_ci /// The sender will consume itself when calling this method. It can send a 145cac7dca0Sopenharmony_ci /// single value in synchronous code as it doesn't need waiting. 146cac7dca0Sopenharmony_ci /// 147cac7dca0Sopenharmony_ci /// # Examples 148cac7dca0Sopenharmony_ci /// 149cac7dca0Sopenharmony_ci /// ``` 150cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 151cac7dca0Sopenharmony_ci /// async fn io_func() { 152cac7dca0Sopenharmony_ci /// let (tx, rx) = oneshot::channel(); 153cac7dca0Sopenharmony_ci /// ylong_runtime::spawn(async move { 154cac7dca0Sopenharmony_ci /// if let Err(_) = tx.send(6) { 155cac7dca0Sopenharmony_ci /// println!("Receiver dropped"); 156cac7dca0Sopenharmony_ci /// } 157cac7dca0Sopenharmony_ci /// }); 158cac7dca0Sopenharmony_ci /// 159cac7dca0Sopenharmony_ci /// match rx.await { 160cac7dca0Sopenharmony_ci /// Ok(v) => println!("received : {:?}", v), 161cac7dca0Sopenharmony_ci /// Err(_) => println!("Sender dropped"), 162cac7dca0Sopenharmony_ci /// } 163cac7dca0Sopenharmony_ci /// } 164cac7dca0Sopenharmony_ci /// ``` 165cac7dca0Sopenharmony_ci pub fn send(self, value: T) -> Result<(), T> { 166cac7dca0Sopenharmony_ci self.channel.value.borrow_mut().replace(value); 167cac7dca0Sopenharmony_ci 168cac7dca0Sopenharmony_ci loop { 169cac7dca0Sopenharmony_ci match self.channel.state.load(Acquire) { 170cac7dca0Sopenharmony_ci INIT => { 171cac7dca0Sopenharmony_ci if self 172cac7dca0Sopenharmony_ci .channel 173cac7dca0Sopenharmony_ci .state 174cac7dca0Sopenharmony_ci .compare_exchange(INIT, SENT, AcqRel, Acquire) 175cac7dca0Sopenharmony_ci .is_ok() 176cac7dca0Sopenharmony_ci { 177cac7dca0Sopenharmony_ci self.channel.waker.wake(); 178cac7dca0Sopenharmony_ci return Ok(()); 179cac7dca0Sopenharmony_ci } 180cac7dca0Sopenharmony_ci } 181cac7dca0Sopenharmony_ci CLOSED => { 182cac7dca0Sopenharmony_ci // value is stored in this function before. 183cac7dca0Sopenharmony_ci return Err(self.channel.take_value().unwrap()); 184cac7dca0Sopenharmony_ci } 185cac7dca0Sopenharmony_ci _ => unreachable!(), 186cac7dca0Sopenharmony_ci } 187cac7dca0Sopenharmony_ci } 188cac7dca0Sopenharmony_ci } 189cac7dca0Sopenharmony_ci 190cac7dca0Sopenharmony_ci /// Checks whether channel is closed. if so, the sender could not 191cac7dca0Sopenharmony_ci /// send any value anymore. It returns true if the [`Receiver`] is dropped 192cac7dca0Sopenharmony_ci /// or calls the [`close`] method. 193cac7dca0Sopenharmony_ci /// 194cac7dca0Sopenharmony_ci /// [`close`]: Receiver::close 195cac7dca0Sopenharmony_ci /// 196cac7dca0Sopenharmony_ci /// # Examples 197cac7dca0Sopenharmony_ci /// 198cac7dca0Sopenharmony_ci /// ``` 199cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 200cac7dca0Sopenharmony_ci /// async fn io_func() { 201cac7dca0Sopenharmony_ci /// let (tx, rx) = oneshot::channel(); 202cac7dca0Sopenharmony_ci /// assert!(!tx.is_closed()); 203cac7dca0Sopenharmony_ci /// 204cac7dca0Sopenharmony_ci /// drop(rx); 205cac7dca0Sopenharmony_ci /// 206cac7dca0Sopenharmony_ci /// assert!(tx.is_closed()); 207cac7dca0Sopenharmony_ci /// assert!(tx.send("no receive").is_err()); 208cac7dca0Sopenharmony_ci /// } 209cac7dca0Sopenharmony_ci /// ``` 210cac7dca0Sopenharmony_ci pub fn is_closed(&self) -> bool { 211cac7dca0Sopenharmony_ci self.channel.state.load(Acquire) == CLOSED 212cac7dca0Sopenharmony_ci } 213cac7dca0Sopenharmony_ci} 214cac7dca0Sopenharmony_ci 215cac7dca0Sopenharmony_ciimpl<T> Drop for Sender<T> { 216cac7dca0Sopenharmony_ci fn drop(&mut self) { 217cac7dca0Sopenharmony_ci if self.channel.state.swap(SENT, SeqCst) == INIT { 218cac7dca0Sopenharmony_ci self.channel.waker.wake(); 219cac7dca0Sopenharmony_ci } 220cac7dca0Sopenharmony_ci } 221cac7dca0Sopenharmony_ci} 222cac7dca0Sopenharmony_ci 223cac7dca0Sopenharmony_ci/// Receives a single value from the associated [`Sender`]. 224cac7dca0Sopenharmony_ci/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`] 225cac7dca0Sopenharmony_ci/// function. 226cac7dca0Sopenharmony_ci/// 227cac7dca0Sopenharmony_ci/// There is no `recv` method to receive the message because the receiver itself 228cac7dca0Sopenharmony_ci/// implements the [`Future`] trait. To receive a value, `.await` the `Receiver` 229cac7dca0Sopenharmony_ci/// object directly. 230cac7dca0Sopenharmony_ci/// 231cac7dca0Sopenharmony_ci/// # Examples 232cac7dca0Sopenharmony_ci/// 233cac7dca0Sopenharmony_ci/// ``` 234cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot; 235cac7dca0Sopenharmony_ci/// async fn io_func() { 236cac7dca0Sopenharmony_ci/// let (tx, rx) = oneshot::channel(); 237cac7dca0Sopenharmony_ci/// ylong_runtime::spawn(async move { 238cac7dca0Sopenharmony_ci/// if let Err(_) = tx.send(6) { 239cac7dca0Sopenharmony_ci/// println!("Receiver dropped"); 240cac7dca0Sopenharmony_ci/// } 241cac7dca0Sopenharmony_ci/// }); 242cac7dca0Sopenharmony_ci/// 243cac7dca0Sopenharmony_ci/// match rx.await { 244cac7dca0Sopenharmony_ci/// Ok(v) => println!("received : {:?}", v), 245cac7dca0Sopenharmony_ci/// Err(_) => println!("Sender dropped"), 246cac7dca0Sopenharmony_ci/// } 247cac7dca0Sopenharmony_ci/// } 248cac7dca0Sopenharmony_ci/// ``` 249cac7dca0Sopenharmony_ci/// 250cac7dca0Sopenharmony_ci/// The receiver will fail with [`RecvError`], if the sender is dropped without 251cac7dca0Sopenharmony_ci/// sending a value. 252cac7dca0Sopenharmony_ci/// 253cac7dca0Sopenharmony_ci/// # Examples 254cac7dca0Sopenharmony_ci/// 255cac7dca0Sopenharmony_ci/// ``` 256cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot; 257cac7dca0Sopenharmony_ci/// async fn io_func() { 258cac7dca0Sopenharmony_ci/// let (tx, rx) = oneshot::channel::<u32>(); 259cac7dca0Sopenharmony_ci/// ylong_runtime::spawn(async move { 260cac7dca0Sopenharmony_ci/// drop(tx); 261cac7dca0Sopenharmony_ci/// }); 262cac7dca0Sopenharmony_ci/// 263cac7dca0Sopenharmony_ci/// match rx.await { 264cac7dca0Sopenharmony_ci/// Ok(v) => panic!("This won't happen"), 265cac7dca0Sopenharmony_ci/// Err(_) => println!("Sender dropped"), 266cac7dca0Sopenharmony_ci/// } 267cac7dca0Sopenharmony_ci/// } 268cac7dca0Sopenharmony_ci/// ``` 269cac7dca0Sopenharmony_ci#[derive(Debug)] 270cac7dca0Sopenharmony_cipub struct Receiver<T> { 271cac7dca0Sopenharmony_ci channel: Arc<Channel<T>>, 272cac7dca0Sopenharmony_ci} 273cac7dca0Sopenharmony_ci 274cac7dca0Sopenharmony_ciimpl<T> Receiver<T> { 275cac7dca0Sopenharmony_ci /// Attempts to receive a value from the associated [`Sender`]. 276cac7dca0Sopenharmony_ci /// 277cac7dca0Sopenharmony_ci /// The method will still receive the result if the `Sender` gets dropped 278cac7dca0Sopenharmony_ci /// after sending the message. 279cac7dca0Sopenharmony_ci /// 280cac7dca0Sopenharmony_ci /// # Return value 281cac7dca0Sopenharmony_ci /// The function returns: 282cac7dca0Sopenharmony_ci /// * `Ok(T)` if receiving a value successfully. 283cac7dca0Sopenharmony_ci /// * `Err(TryRecvError::Empty)` if no value has been sent yet. 284cac7dca0Sopenharmony_ci /// * `Err(TryRecvError::Closed)` if the sender has dropped without sending 285cac7dca0Sopenharmony_ci /// a value, or if the message has already been received. 286cac7dca0Sopenharmony_ci /// 287cac7dca0Sopenharmony_ci /// # Examples 288cac7dca0Sopenharmony_ci /// 289cac7dca0Sopenharmony_ci /// `try_recv` before a value is sent, then after. 290cac7dca0Sopenharmony_ci /// 291cac7dca0Sopenharmony_ci /// ``` 292cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::error::TryRecvError; 293cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 294cac7dca0Sopenharmony_ci /// async fn io_func() { 295cac7dca0Sopenharmony_ci /// let (tx, mut rx) = oneshot::channel(); 296cac7dca0Sopenharmony_ci /// match rx.try_recv() { 297cac7dca0Sopenharmony_ci /// Err(TryRecvError::Empty) => {} 298cac7dca0Sopenharmony_ci /// _ => panic!("This won't happen"), 299cac7dca0Sopenharmony_ci /// } 300cac7dca0Sopenharmony_ci /// 301cac7dca0Sopenharmony_ci /// // Send a value 302cac7dca0Sopenharmony_ci /// tx.send("Hello").unwrap(); 303cac7dca0Sopenharmony_ci /// 304cac7dca0Sopenharmony_ci /// match rx.try_recv() { 305cac7dca0Sopenharmony_ci /// Ok(value) => assert_eq!(value, "Hello"), 306cac7dca0Sopenharmony_ci /// _ => panic!("This won't happen"), 307cac7dca0Sopenharmony_ci /// } 308cac7dca0Sopenharmony_ci /// } 309cac7dca0Sopenharmony_ci /// ``` 310cac7dca0Sopenharmony_ci /// 311cac7dca0Sopenharmony_ci /// `try_recv` when the sender dropped before sending a value 312cac7dca0Sopenharmony_ci /// 313cac7dca0Sopenharmony_ci /// ``` 314cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::error::TryRecvError; 315cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 316cac7dca0Sopenharmony_ci /// async fn io_func() { 317cac7dca0Sopenharmony_ci /// let (tx, mut rx) = oneshot::channel::<()>(); 318cac7dca0Sopenharmony_ci /// drop(tx); 319cac7dca0Sopenharmony_ci /// 320cac7dca0Sopenharmony_ci /// match rx.try_recv() { 321cac7dca0Sopenharmony_ci /// Err(TryRecvError::Closed) => {} 322cac7dca0Sopenharmony_ci /// _ => panic!("This won't happen"), 323cac7dca0Sopenharmony_ci /// } 324cac7dca0Sopenharmony_ci /// } 325cac7dca0Sopenharmony_ci /// ``` 326cac7dca0Sopenharmony_ci pub fn try_recv(&mut self) -> Result<T, TryRecvError> { 327cac7dca0Sopenharmony_ci match self.channel.state.load(Acquire) { 328cac7dca0Sopenharmony_ci INIT => Err(TryRecvError::Empty), 329cac7dca0Sopenharmony_ci SENT => self 330cac7dca0Sopenharmony_ci .channel 331cac7dca0Sopenharmony_ci .take_value_sent() 332cac7dca0Sopenharmony_ci .map_err(|_| TryRecvError::Closed), 333cac7dca0Sopenharmony_ci CLOSED => Err(TryRecvError::Closed), 334cac7dca0Sopenharmony_ci _ => unreachable!(), 335cac7dca0Sopenharmony_ci } 336cac7dca0Sopenharmony_ci } 337cac7dca0Sopenharmony_ci 338cac7dca0Sopenharmony_ci /// Closes the channel, prevents the `Sender` from sending a value. 339cac7dca0Sopenharmony_ci /// 340cac7dca0Sopenharmony_ci /// The `Sender` will fail to call [`send`] after the `Receiver` called 341cac7dca0Sopenharmony_ci /// `close`. It will do nothing if the channel is already closed or the 342cac7dca0Sopenharmony_ci /// message has been already received. 343cac7dca0Sopenharmony_ci /// 344cac7dca0Sopenharmony_ci /// [`send`]: Sender::send 345cac7dca0Sopenharmony_ci /// [`try_recv`]: Receiver::try_recv 346cac7dca0Sopenharmony_ci /// 347cac7dca0Sopenharmony_ci /// # Examples 348cac7dca0Sopenharmony_ci /// ``` 349cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 350cac7dca0Sopenharmony_ci /// async fn io_func() { 351cac7dca0Sopenharmony_ci /// let (tx, mut rx) = oneshot::channel(); 352cac7dca0Sopenharmony_ci /// assert!(!tx.is_closed()); 353cac7dca0Sopenharmony_ci /// 354cac7dca0Sopenharmony_ci /// rx.close(); 355cac7dca0Sopenharmony_ci /// 356cac7dca0Sopenharmony_ci /// assert!(tx.is_closed()); 357cac7dca0Sopenharmony_ci /// assert!(tx.send("no receive").is_err()); 358cac7dca0Sopenharmony_ci /// } 359cac7dca0Sopenharmony_ci /// ``` 360cac7dca0Sopenharmony_ci /// 361cac7dca0Sopenharmony_ci /// Receive a value sent **before** calling `close` 362cac7dca0Sopenharmony_ci /// 363cac7dca0Sopenharmony_ci /// ``` 364cac7dca0Sopenharmony_ci /// use ylong_runtime::sync::oneshot; 365cac7dca0Sopenharmony_ci /// async fn io_func() { 366cac7dca0Sopenharmony_ci /// let (tx, mut rx) = oneshot::channel(); 367cac7dca0Sopenharmony_ci /// assert!(tx.send("Hello").is_ok()); 368cac7dca0Sopenharmony_ci /// 369cac7dca0Sopenharmony_ci /// rx.close(); 370cac7dca0Sopenharmony_ci /// 371cac7dca0Sopenharmony_ci /// let msg = rx.try_recv().unwrap(); 372cac7dca0Sopenharmony_ci /// assert_eq!(msg, "Hello"); 373cac7dca0Sopenharmony_ci /// } 374cac7dca0Sopenharmony_ci /// ``` 375cac7dca0Sopenharmony_ci pub fn close(&mut self) { 376cac7dca0Sopenharmony_ci let _ = self 377cac7dca0Sopenharmony_ci .channel 378cac7dca0Sopenharmony_ci .state 379cac7dca0Sopenharmony_ci .compare_exchange(INIT, CLOSED, AcqRel, Acquire); 380cac7dca0Sopenharmony_ci } 381cac7dca0Sopenharmony_ci} 382cac7dca0Sopenharmony_ci 383cac7dca0Sopenharmony_ciimpl<T> Drop for Receiver<T> { 384cac7dca0Sopenharmony_ci fn drop(&mut self) { 385cac7dca0Sopenharmony_ci self.close(); 386cac7dca0Sopenharmony_ci } 387cac7dca0Sopenharmony_ci} 388cac7dca0Sopenharmony_ci 389cac7dca0Sopenharmony_ciimpl<T> Future for Receiver<T> { 390cac7dca0Sopenharmony_ci type Output = Result<T, RecvError>; 391cac7dca0Sopenharmony_ci 392cac7dca0Sopenharmony_ci fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 393cac7dca0Sopenharmony_ci match self.channel.state.load(Acquire) { 394cac7dca0Sopenharmony_ci INIT => { 395cac7dca0Sopenharmony_ci self.channel.waker.register_by_ref(cx.waker()); 396cac7dca0Sopenharmony_ci if self.channel.state.load(Acquire) == SENT { 397cac7dca0Sopenharmony_ci Ready(self.channel.take_value_sent()) 398cac7dca0Sopenharmony_ci } else { 399cac7dca0Sopenharmony_ci Pending 400cac7dca0Sopenharmony_ci } 401cac7dca0Sopenharmony_ci } 402cac7dca0Sopenharmony_ci SENT => Ready(self.channel.take_value_sent()), 403cac7dca0Sopenharmony_ci CLOSED => Ready(Err(RecvError)), 404cac7dca0Sopenharmony_ci _ => unreachable!(), 405cac7dca0Sopenharmony_ci } 406cac7dca0Sopenharmony_ci } 407cac7dca0Sopenharmony_ci} 408cac7dca0Sopenharmony_ci 409cac7dca0Sopenharmony_cistruct Channel<T> { 410cac7dca0Sopenharmony_ci /// The state of the channel. 411cac7dca0Sopenharmony_ci state: AtomicUsize, 412cac7dca0Sopenharmony_ci 413cac7dca0Sopenharmony_ci /// The value passed by channel, it is set by `Sender` and read by 414cac7dca0Sopenharmony_ci /// `Receiver`. 415cac7dca0Sopenharmony_ci value: RefCell<Option<T>>, 416cac7dca0Sopenharmony_ci 417cac7dca0Sopenharmony_ci /// The waker to notify the sender task or the receiver task. 418cac7dca0Sopenharmony_ci waker: AtomicWaker, 419cac7dca0Sopenharmony_ci} 420cac7dca0Sopenharmony_ci 421cac7dca0Sopenharmony_ciimpl<T> Channel<T> { 422cac7dca0Sopenharmony_ci fn new() -> Channel<T> { 423cac7dca0Sopenharmony_ci Channel { 424cac7dca0Sopenharmony_ci state: AtomicUsize::new(INIT), 425cac7dca0Sopenharmony_ci value: RefCell::new(None), 426cac7dca0Sopenharmony_ci waker: AtomicWaker::new(), 427cac7dca0Sopenharmony_ci } 428cac7dca0Sopenharmony_ci } 429cac7dca0Sopenharmony_ci 430cac7dca0Sopenharmony_ci fn take_value_sent(&self) -> Result<T, RecvError> { 431cac7dca0Sopenharmony_ci match self.take_value() { 432cac7dca0Sopenharmony_ci Some(val) => { 433cac7dca0Sopenharmony_ci self.state.store(CLOSED, Release); 434cac7dca0Sopenharmony_ci Ok(val) 435cac7dca0Sopenharmony_ci } 436cac7dca0Sopenharmony_ci None => Err(RecvError), 437cac7dca0Sopenharmony_ci } 438cac7dca0Sopenharmony_ci } 439cac7dca0Sopenharmony_ci 440cac7dca0Sopenharmony_ci fn take_value(&self) -> Option<T> { 441cac7dca0Sopenharmony_ci self.value.borrow_mut().take() 442cac7dca0Sopenharmony_ci } 443cac7dca0Sopenharmony_ci} 444cac7dca0Sopenharmony_ci 445cac7dca0Sopenharmony_ciunsafe impl<T: Send> Send for Channel<T> {} 446cac7dca0Sopenharmony_ciunsafe impl<T: Send> Sync for Channel<T> {} 447cac7dca0Sopenharmony_ci 448cac7dca0Sopenharmony_ciimpl<T> Drop for Channel<T> { 449cac7dca0Sopenharmony_ci fn drop(&mut self) { 450cac7dca0Sopenharmony_ci self.waker.take_waker(); 451cac7dca0Sopenharmony_ci } 452cac7dca0Sopenharmony_ci} 453cac7dca0Sopenharmony_ci 454cac7dca0Sopenharmony_ciimpl<T: Debug> Debug for Channel<T> { 455cac7dca0Sopenharmony_ci fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 456cac7dca0Sopenharmony_ci f.debug_struct("Channel") 457cac7dca0Sopenharmony_ci .field("state", &self.state.load(Acquire)) 458cac7dca0Sopenharmony_ci .finish() 459cac7dca0Sopenharmony_ci } 460cac7dca0Sopenharmony_ci} 461cac7dca0Sopenharmony_ci 462cac7dca0Sopenharmony_ci#[cfg(test)] 463cac7dca0Sopenharmony_cimod tests { 464cac7dca0Sopenharmony_ci use crate::spawn; 465cac7dca0Sopenharmony_ci use crate::sync::error::TryRecvError; 466cac7dca0Sopenharmony_ci use crate::sync::oneshot; 467cac7dca0Sopenharmony_ci 468cac7dca0Sopenharmony_ci /// UT test cases for `send()` and `try_recv()`. 469cac7dca0Sopenharmony_ci /// 470cac7dca0Sopenharmony_ci /// # Brief 471cac7dca0Sopenharmony_ci /// 1. Call channel to create a sender and a receiver handle pair. 472cac7dca0Sopenharmony_ci /// 2. Receiver tries receiving a message before the sender sends one. 473cac7dca0Sopenharmony_ci /// 3. Receiver tries receiving a message after the sender sends one. 474cac7dca0Sopenharmony_ci /// 4. Check if the test results are correct. 475cac7dca0Sopenharmony_ci #[test] 476cac7dca0Sopenharmony_ci fn send_try_recv() { 477cac7dca0Sopenharmony_ci let (tx, mut rx) = oneshot::channel(); 478cac7dca0Sopenharmony_ci assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 479cac7dca0Sopenharmony_ci tx.send("hello").unwrap(); 480cac7dca0Sopenharmony_ci 481cac7dca0Sopenharmony_ci assert_eq!(rx.try_recv().unwrap(), "hello"); 482cac7dca0Sopenharmony_ci assert_eq!(rx.try_recv(), Err(TryRecvError::Closed)); 483cac7dca0Sopenharmony_ci } 484cac7dca0Sopenharmony_ci 485cac7dca0Sopenharmony_ci /// UT test cases for `send()` and async receive. 486cac7dca0Sopenharmony_ci /// 487cac7dca0Sopenharmony_ci /// # Brief 488cac7dca0Sopenharmony_ci /// 1. Call channel to create a sender and a receiver handle pair. 489cac7dca0Sopenharmony_ci /// 2. Sender sends message in one thread. 490cac7dca0Sopenharmony_ci /// 3. Receiver receives message in another thread. 491cac7dca0Sopenharmony_ci /// 4. Check if the test results are correct. 492cac7dca0Sopenharmony_ci #[test] 493cac7dca0Sopenharmony_ci fn send_recv_await() { 494cac7dca0Sopenharmony_ci let (tx, rx) = oneshot::channel(); 495cac7dca0Sopenharmony_ci if tx.send(6).is_err() { 496cac7dca0Sopenharmony_ci panic!("Receiver dropped"); 497cac7dca0Sopenharmony_ci } 498cac7dca0Sopenharmony_ci spawn(async move { 499cac7dca0Sopenharmony_ci match rx.await { 500cac7dca0Sopenharmony_ci Ok(v) => assert_eq!(v, 6), 501cac7dca0Sopenharmony_ci Err(_) => panic!("Sender dropped"), 502cac7dca0Sopenharmony_ci } 503cac7dca0Sopenharmony_ci }); 504cac7dca0Sopenharmony_ci } 505cac7dca0Sopenharmony_ci 506cac7dca0Sopenharmony_ci /// UT test cases for `is_closed()` and `close`. 507cac7dca0Sopenharmony_ci /// 508cac7dca0Sopenharmony_ci /// # Brief 509cac7dca0Sopenharmony_ci /// 1. Call channel to create a sender and a receiver handle pair. 510cac7dca0Sopenharmony_ci /// 2. Check whether the sender is closed. 511cac7dca0Sopenharmony_ci /// 3. Close the receiver. 512cac7dca0Sopenharmony_ci /// 4. Check whether the receiver will receive the message sent before it 513cac7dca0Sopenharmony_ci /// closed. 514cac7dca0Sopenharmony_ci /// 5. Check if the test results are correct. 515cac7dca0Sopenharmony_ci #[test] 516cac7dca0Sopenharmony_ci fn close_rx() { 517cac7dca0Sopenharmony_ci let (tx, mut rx) = oneshot::channel(); 518cac7dca0Sopenharmony_ci assert!(!tx.is_closed()); 519cac7dca0Sopenharmony_ci rx.close(); 520cac7dca0Sopenharmony_ci 521cac7dca0Sopenharmony_ci assert!(tx.is_closed()); 522cac7dca0Sopenharmony_ci assert!(tx.send("never received").is_err()); 523cac7dca0Sopenharmony_ci 524cac7dca0Sopenharmony_ci let (tx, mut rx) = oneshot::channel(); 525cac7dca0Sopenharmony_ci assert!(tx.send("will receive").is_ok()); 526cac7dca0Sopenharmony_ci 527cac7dca0Sopenharmony_ci rx.close(); 528cac7dca0Sopenharmony_ci 529cac7dca0Sopenharmony_ci let msg = rx.try_recv().unwrap(); 530cac7dca0Sopenharmony_ci assert_eq!(msg, "will receive"); 531cac7dca0Sopenharmony_ci } 532cac7dca0Sopenharmony_ci} 533