1cac7dca0Sopenharmony_ci// Copyright (c) 2023 Huawei Device Co., Ltd.
2cac7dca0Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License");
3cac7dca0Sopenharmony_ci// you may not use this file except in compliance with the License.
4cac7dca0Sopenharmony_ci// You may obtain a copy of the License at
5cac7dca0Sopenharmony_ci//
6cac7dca0Sopenharmony_ci//     http://www.apache.org/licenses/LICENSE-2.0
7cac7dca0Sopenharmony_ci//
8cac7dca0Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software
9cac7dca0Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS,
10cac7dca0Sopenharmony_ci// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11cac7dca0Sopenharmony_ci// See the License for the specific language governing permissions and
12cac7dca0Sopenharmony_ci// limitations under the License.
13cac7dca0Sopenharmony_ci
14cac7dca0Sopenharmony_ci//! One-shot channel is used to send a single message from a single sender to a
15cac7dca0Sopenharmony_ci//! single receiver. The [`channel`] function returns a [`Sender`] and
16cac7dca0Sopenharmony_ci//! [`Receiver`] handle pair that controls channel.
17cac7dca0Sopenharmony_ci//!
18cac7dca0Sopenharmony_ci//! The `Sender` handle is used by the producer to send a message.
//! The `Receiver` handle is used by the consumer to receive the message. It
//! implements the `Future` trait, so it can be awaited directly.
21cac7dca0Sopenharmony_ci//!
22cac7dca0Sopenharmony_ci//! The `send` method is not async. It can be called from non-async context.
23cac7dca0Sopenharmony_ci//!
24cac7dca0Sopenharmony_ci//! # Examples
25cac7dca0Sopenharmony_ci//!
26cac7dca0Sopenharmony_ci//! ```
27cac7dca0Sopenharmony_ci//! use ylong_runtime::sync::oneshot;
28cac7dca0Sopenharmony_ci//! async fn io_func() {
29cac7dca0Sopenharmony_ci//!     let (tx, rx) = oneshot::channel();
30cac7dca0Sopenharmony_ci//!     ylong_runtime::spawn(async move {
31cac7dca0Sopenharmony_ci//!         if let Err(_) = tx.send(6) {
32cac7dca0Sopenharmony_ci//!             println!("Receiver dropped");
33cac7dca0Sopenharmony_ci//!         }
34cac7dca0Sopenharmony_ci//!     });
35cac7dca0Sopenharmony_ci//!
36cac7dca0Sopenharmony_ci//!     match rx.await {
37cac7dca0Sopenharmony_ci//!         Ok(v) => println!("received : {:?}", v),
38cac7dca0Sopenharmony_ci//!         Err(_) => println!("Sender dropped"),
39cac7dca0Sopenharmony_ci//!     }
40cac7dca0Sopenharmony_ci//! }
41cac7dca0Sopenharmony_ci//! ```
42cac7dca0Sopenharmony_ciuse std::cell::RefCell;
43cac7dca0Sopenharmony_ciuse std::fmt::{Debug, Formatter};
44cac7dca0Sopenharmony_ciuse std::future::Future;
45cac7dca0Sopenharmony_ciuse std::pin::Pin;
46cac7dca0Sopenharmony_ciuse std::sync::atomic::AtomicUsize;
47cac7dca0Sopenharmony_ciuse std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
48cac7dca0Sopenharmony_ciuse std::sync::Arc;
49cac7dca0Sopenharmony_ciuse std::task::Poll::{Pending, Ready};
50cac7dca0Sopenharmony_ciuse std::task::{Context, Poll};
51cac7dca0Sopenharmony_ci
52cac7dca0Sopenharmony_ciuse super::atomic_waker::AtomicWaker;
53cac7dca0Sopenharmony_ciuse super::error::{RecvError, TryRecvError};
54cac7dca0Sopenharmony_ci
/// Initial state: nothing has been sent and the channel has not been closed.
/// A freshly created `Channel` starts in this state.
const INIT: usize = 0b00;
/// Sender has sent the value.
///
/// Dropping the `Sender` also stores this state, so a `SENT` channel whose
/// value slot is empty means the sender went away without sending anything.
const SENT: usize = 0b01;
/// Channel is closed: either the receiver closed it before anything was sent,
/// or the sent value has already been taken out by the receiver.
const CLOSED: usize = 0b10;
61cac7dca0Sopenharmony_ci
62cac7dca0Sopenharmony_ci/// Creates a new one-shot channel with a `Sender` and `Receiver` handle pair.
63cac7dca0Sopenharmony_ci///
64cac7dca0Sopenharmony_ci/// The `Sender` can send a single value to the `Receiver`.
65cac7dca0Sopenharmony_ci///
66cac7dca0Sopenharmony_ci/// # Examples
67cac7dca0Sopenharmony_ci///
68cac7dca0Sopenharmony_ci/// ```
69cac7dca0Sopenharmony_ci/// use ylong_runtime::sync::oneshot;
70cac7dca0Sopenharmony_ci/// async fn io_func() {
71cac7dca0Sopenharmony_ci///     let (tx, rx) = oneshot::channel();
72cac7dca0Sopenharmony_ci///     ylong_runtime::spawn(async move {
73cac7dca0Sopenharmony_ci///         if let Err(_) = tx.send(6) {
74cac7dca0Sopenharmony_ci///             println!("Receiver dropped");
75cac7dca0Sopenharmony_ci///         }
76cac7dca0Sopenharmony_ci///     });
77cac7dca0Sopenharmony_ci///
78cac7dca0Sopenharmony_ci///     match rx.await {
79cac7dca0Sopenharmony_ci///         Ok(v) => println!("received : {:?}", v),
80cac7dca0Sopenharmony_ci///         Err(_) => println!("Sender dropped"),
81cac7dca0Sopenharmony_ci///     }
82cac7dca0Sopenharmony_ci/// }
83cac7dca0Sopenharmony_ci/// ```
84cac7dca0Sopenharmony_cipub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85cac7dca0Sopenharmony_ci    let channel = Arc::new(Channel::new());
86cac7dca0Sopenharmony_ci    let tx = Sender {
87cac7dca0Sopenharmony_ci        channel: channel.clone(),
88cac7dca0Sopenharmony_ci    };
89cac7dca0Sopenharmony_ci    let rx = Receiver { channel };
90cac7dca0Sopenharmony_ci    (tx, rx)
91cac7dca0Sopenharmony_ci}
92cac7dca0Sopenharmony_ci
/// Sends a single value to the associated [`Receiver`].
/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
/// function.
///
/// [`Sender::send`] takes the sender by value, so each sender can be used for
/// at most one message.
///
/// # Examples
///
/// Sending a value from a spawned task:
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// async fn io_func() {
///     let (tx, rx) = oneshot::channel();
///     ylong_runtime::spawn(async move {
///         if let Err(_) = tx.send(6) {
///             println!("Receiver dropped");
///         }
///     });
///
///     match rx.await {
///         Ok(v) => println!("received : {:?}", v),
///         Err(_) => println!("Sender dropped"),
///     }
/// }
/// ```
///
/// The receiver will fail with a [`RecvError`] if the sender is dropped without
/// sending a value:
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// async fn io_func() {
///     let (tx, rx) = oneshot::channel::<()>();
///     ylong_runtime::spawn(async move {
///         drop(tx);
///     });
///
///     match rx.await {
///         Ok(_) => panic!("This won't happen"),
///         Err(_) => println!("Sender dropped"),
///     }
/// }
/// ```
#[derive(Debug)]
pub struct Sender<T> {
    /// Channel state shared with the paired `Receiver`.
    channel: Arc<Channel<T>>,
}
139cac7dca0Sopenharmony_ci
140cac7dca0Sopenharmony_ciimpl<T> Sender<T> {
141cac7dca0Sopenharmony_ci    /// Sends a single value to the associated [`Receiver`], returns the value
142cac7dca0Sopenharmony_ci    /// back if it fails to send.
143cac7dca0Sopenharmony_ci    ///
144cac7dca0Sopenharmony_ci    /// The sender will consume itself when calling this method. It can send a
145cac7dca0Sopenharmony_ci    /// single value in synchronous code as it doesn't need waiting.
146cac7dca0Sopenharmony_ci    ///
147cac7dca0Sopenharmony_ci    /// # Examples
148cac7dca0Sopenharmony_ci    ///
149cac7dca0Sopenharmony_ci    /// ```
150cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
151cac7dca0Sopenharmony_ci    /// async fn io_func() {
152cac7dca0Sopenharmony_ci    ///     let (tx, rx) = oneshot::channel();
153cac7dca0Sopenharmony_ci    ///     ylong_runtime::spawn(async move {
154cac7dca0Sopenharmony_ci    ///         if let Err(_) = tx.send(6) {
155cac7dca0Sopenharmony_ci    ///             println!("Receiver dropped");
156cac7dca0Sopenharmony_ci    ///         }
157cac7dca0Sopenharmony_ci    ///     });
158cac7dca0Sopenharmony_ci    ///
159cac7dca0Sopenharmony_ci    ///     match rx.await {
160cac7dca0Sopenharmony_ci    ///         Ok(v) => println!("received : {:?}", v),
161cac7dca0Sopenharmony_ci    ///         Err(_) => println!("Sender dropped"),
162cac7dca0Sopenharmony_ci    ///     }
163cac7dca0Sopenharmony_ci    /// }
164cac7dca0Sopenharmony_ci    /// ```
165cac7dca0Sopenharmony_ci    pub fn send(self, value: T) -> Result<(), T> {
166cac7dca0Sopenharmony_ci        self.channel.value.borrow_mut().replace(value);
167cac7dca0Sopenharmony_ci
168cac7dca0Sopenharmony_ci        loop {
169cac7dca0Sopenharmony_ci            match self.channel.state.load(Acquire) {
170cac7dca0Sopenharmony_ci                INIT => {
171cac7dca0Sopenharmony_ci                    if self
172cac7dca0Sopenharmony_ci                        .channel
173cac7dca0Sopenharmony_ci                        .state
174cac7dca0Sopenharmony_ci                        .compare_exchange(INIT, SENT, AcqRel, Acquire)
175cac7dca0Sopenharmony_ci                        .is_ok()
176cac7dca0Sopenharmony_ci                    {
177cac7dca0Sopenharmony_ci                        self.channel.waker.wake();
178cac7dca0Sopenharmony_ci                        return Ok(());
179cac7dca0Sopenharmony_ci                    }
180cac7dca0Sopenharmony_ci                }
181cac7dca0Sopenharmony_ci                CLOSED => {
182cac7dca0Sopenharmony_ci                    // value is stored in this function before.
183cac7dca0Sopenharmony_ci                    return Err(self.channel.take_value().unwrap());
184cac7dca0Sopenharmony_ci                }
185cac7dca0Sopenharmony_ci                _ => unreachable!(),
186cac7dca0Sopenharmony_ci            }
187cac7dca0Sopenharmony_ci        }
188cac7dca0Sopenharmony_ci    }
189cac7dca0Sopenharmony_ci
190cac7dca0Sopenharmony_ci    /// Checks whether channel is closed. if so, the sender could not
191cac7dca0Sopenharmony_ci    /// send any value anymore. It returns true if the [`Receiver`] is dropped
192cac7dca0Sopenharmony_ci    /// or calls the [`close`] method.
193cac7dca0Sopenharmony_ci    ///
194cac7dca0Sopenharmony_ci    /// [`close`]: Receiver::close
195cac7dca0Sopenharmony_ci    ///
196cac7dca0Sopenharmony_ci    /// # Examples
197cac7dca0Sopenharmony_ci    ///
198cac7dca0Sopenharmony_ci    /// ```
199cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
200cac7dca0Sopenharmony_ci    /// async fn io_func() {
201cac7dca0Sopenharmony_ci    ///     let (tx, rx) = oneshot::channel();
202cac7dca0Sopenharmony_ci    ///     assert!(!tx.is_closed());
203cac7dca0Sopenharmony_ci    ///
204cac7dca0Sopenharmony_ci    ///     drop(rx);
205cac7dca0Sopenharmony_ci    ///
206cac7dca0Sopenharmony_ci    ///     assert!(tx.is_closed());
207cac7dca0Sopenharmony_ci    ///     assert!(tx.send("no receive").is_err());
208cac7dca0Sopenharmony_ci    /// }
209cac7dca0Sopenharmony_ci    /// ```
210cac7dca0Sopenharmony_ci    pub fn is_closed(&self) -> bool {
211cac7dca0Sopenharmony_ci        self.channel.state.load(Acquire) == CLOSED
212cac7dca0Sopenharmony_ci    }
213cac7dca0Sopenharmony_ci}
214cac7dca0Sopenharmony_ci
215cac7dca0Sopenharmony_ciimpl<T> Drop for Sender<T> {
216cac7dca0Sopenharmony_ci    fn drop(&mut self) {
217cac7dca0Sopenharmony_ci        if self.channel.state.swap(SENT, SeqCst) == INIT {
218cac7dca0Sopenharmony_ci            self.channel.waker.wake();
219cac7dca0Sopenharmony_ci        }
220cac7dca0Sopenharmony_ci    }
221cac7dca0Sopenharmony_ci}
222cac7dca0Sopenharmony_ci
/// Receives a single value from the associated [`Sender`].
/// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
/// function.
///
/// There is no `recv` method to receive the message because the receiver itself
/// implements the [`Future`] trait. To receive a value, `.await` the `Receiver`
/// object directly, or poll it without waiting through
/// [`Receiver::try_recv`].
///
/// # Examples
///
/// Awaiting a value sent from a spawned task:
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// async fn io_func() {
///     let (tx, rx) = oneshot::channel();
///     ylong_runtime::spawn(async move {
///         if let Err(_) = tx.send(6) {
///             println!("Receiver dropped");
///         }
///     });
///
///     match rx.await {
///         Ok(v) => println!("received : {:?}", v),
///         Err(_) => println!("Sender dropped"),
///     }
/// }
/// ```
///
/// The receiver will fail with [`RecvError`] if the sender is dropped without
/// sending a value:
///
/// ```
/// use ylong_runtime::sync::oneshot;
/// async fn io_func() {
///     let (tx, rx) = oneshot::channel::<u32>();
///     ylong_runtime::spawn(async move {
///         drop(tx);
///     });
///
///     match rx.await {
///         Ok(_) => panic!("This won't happen"),
///         Err(_) => println!("Sender dropped"),
///     }
/// }
/// ```
#[derive(Debug)]
pub struct Receiver<T> {
    /// Channel state shared with the paired `Sender`.
    channel: Arc<Channel<T>>,
}
273cac7dca0Sopenharmony_ci
274cac7dca0Sopenharmony_ciimpl<T> Receiver<T> {
275cac7dca0Sopenharmony_ci    /// Attempts to receive a value from the associated [`Sender`].
276cac7dca0Sopenharmony_ci    ///
277cac7dca0Sopenharmony_ci    /// The method will still receive the result if the `Sender` gets dropped
278cac7dca0Sopenharmony_ci    /// after sending the message.
279cac7dca0Sopenharmony_ci    ///
280cac7dca0Sopenharmony_ci    /// # Return value
281cac7dca0Sopenharmony_ci    /// The function returns:
282cac7dca0Sopenharmony_ci    ///  * `Ok(T)` if receiving a value successfully.
283cac7dca0Sopenharmony_ci    ///  * `Err(TryRecvError::Empty)` if no value has been sent yet.
284cac7dca0Sopenharmony_ci    ///  * `Err(TryRecvError::Closed)` if the sender has dropped without sending
285cac7dca0Sopenharmony_ci    ///   a value, or if the message has already been received.
286cac7dca0Sopenharmony_ci    ///
287cac7dca0Sopenharmony_ci    /// # Examples
288cac7dca0Sopenharmony_ci    ///
289cac7dca0Sopenharmony_ci    /// `try_recv` before a value is sent, then after.
290cac7dca0Sopenharmony_ci    ///
291cac7dca0Sopenharmony_ci    /// ```
292cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::error::TryRecvError;
293cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
294cac7dca0Sopenharmony_ci    /// async fn io_func() {
295cac7dca0Sopenharmony_ci    ///     let (tx, mut rx) = oneshot::channel();
296cac7dca0Sopenharmony_ci    ///     match rx.try_recv() {
297cac7dca0Sopenharmony_ci    ///         Err(TryRecvError::Empty) => {}
298cac7dca0Sopenharmony_ci    ///         _ => panic!("This won't happen"),
299cac7dca0Sopenharmony_ci    ///     }
300cac7dca0Sopenharmony_ci    ///
301cac7dca0Sopenharmony_ci    ///     // Send a value
302cac7dca0Sopenharmony_ci    ///     tx.send("Hello").unwrap();
303cac7dca0Sopenharmony_ci    ///
304cac7dca0Sopenharmony_ci    ///     match rx.try_recv() {
305cac7dca0Sopenharmony_ci    ///         Ok(value) => assert_eq!(value, "Hello"),
306cac7dca0Sopenharmony_ci    ///         _ => panic!("This won't happen"),
307cac7dca0Sopenharmony_ci    ///     }
308cac7dca0Sopenharmony_ci    /// }
309cac7dca0Sopenharmony_ci    /// ```
310cac7dca0Sopenharmony_ci    ///
311cac7dca0Sopenharmony_ci    /// `try_recv` when the sender dropped before sending a value
312cac7dca0Sopenharmony_ci    ///
313cac7dca0Sopenharmony_ci    /// ```
314cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::error::TryRecvError;
315cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
316cac7dca0Sopenharmony_ci    /// async fn io_func() {
317cac7dca0Sopenharmony_ci    ///     let (tx, mut rx) = oneshot::channel::<()>();
318cac7dca0Sopenharmony_ci    ///     drop(tx);
319cac7dca0Sopenharmony_ci    ///
320cac7dca0Sopenharmony_ci    ///     match rx.try_recv() {
321cac7dca0Sopenharmony_ci    ///         Err(TryRecvError::Closed) => {}
322cac7dca0Sopenharmony_ci    ///         _ => panic!("This won't happen"),
323cac7dca0Sopenharmony_ci    ///     }
324cac7dca0Sopenharmony_ci    /// }
325cac7dca0Sopenharmony_ci    /// ```
326cac7dca0Sopenharmony_ci    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
327cac7dca0Sopenharmony_ci        match self.channel.state.load(Acquire) {
328cac7dca0Sopenharmony_ci            INIT => Err(TryRecvError::Empty),
329cac7dca0Sopenharmony_ci            SENT => self
330cac7dca0Sopenharmony_ci                .channel
331cac7dca0Sopenharmony_ci                .take_value_sent()
332cac7dca0Sopenharmony_ci                .map_err(|_| TryRecvError::Closed),
333cac7dca0Sopenharmony_ci            CLOSED => Err(TryRecvError::Closed),
334cac7dca0Sopenharmony_ci            _ => unreachable!(),
335cac7dca0Sopenharmony_ci        }
336cac7dca0Sopenharmony_ci    }
337cac7dca0Sopenharmony_ci
338cac7dca0Sopenharmony_ci    /// Closes the channel, prevents the `Sender` from sending a value.
339cac7dca0Sopenharmony_ci    ///
340cac7dca0Sopenharmony_ci    /// The `Sender` will fail to call [`send`] after the `Receiver` called
341cac7dca0Sopenharmony_ci    /// `close`. It will do nothing if the channel is already closed or the
342cac7dca0Sopenharmony_ci    /// message has been already received.
343cac7dca0Sopenharmony_ci    ///
344cac7dca0Sopenharmony_ci    /// [`send`]: Sender::send
345cac7dca0Sopenharmony_ci    /// [`try_recv`]: Receiver::try_recv
346cac7dca0Sopenharmony_ci    ///
347cac7dca0Sopenharmony_ci    /// # Examples
348cac7dca0Sopenharmony_ci    /// ```
349cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
350cac7dca0Sopenharmony_ci    /// async fn io_func() {
351cac7dca0Sopenharmony_ci    ///     let (tx, mut rx) = oneshot::channel();
352cac7dca0Sopenharmony_ci    ///     assert!(!tx.is_closed());
353cac7dca0Sopenharmony_ci    ///
354cac7dca0Sopenharmony_ci    ///     rx.close();
355cac7dca0Sopenharmony_ci    ///
356cac7dca0Sopenharmony_ci    ///     assert!(tx.is_closed());
357cac7dca0Sopenharmony_ci    ///     assert!(tx.send("no receive").is_err());
358cac7dca0Sopenharmony_ci    /// }
359cac7dca0Sopenharmony_ci    /// ```
360cac7dca0Sopenharmony_ci    ///
361cac7dca0Sopenharmony_ci    /// Receive a value sent **before** calling `close`
362cac7dca0Sopenharmony_ci    ///
363cac7dca0Sopenharmony_ci    /// ```
364cac7dca0Sopenharmony_ci    /// use ylong_runtime::sync::oneshot;
365cac7dca0Sopenharmony_ci    /// async fn io_func() {
366cac7dca0Sopenharmony_ci    ///     let (tx, mut rx) = oneshot::channel();
367cac7dca0Sopenharmony_ci    ///     assert!(tx.send("Hello").is_ok());
368cac7dca0Sopenharmony_ci    ///
369cac7dca0Sopenharmony_ci    ///     rx.close();
370cac7dca0Sopenharmony_ci    ///
371cac7dca0Sopenharmony_ci    ///     let msg = rx.try_recv().unwrap();
372cac7dca0Sopenharmony_ci    ///     assert_eq!(msg, "Hello");
373cac7dca0Sopenharmony_ci    /// }
374cac7dca0Sopenharmony_ci    /// ```
375cac7dca0Sopenharmony_ci    pub fn close(&mut self) {
376cac7dca0Sopenharmony_ci        let _ = self
377cac7dca0Sopenharmony_ci            .channel
378cac7dca0Sopenharmony_ci            .state
379cac7dca0Sopenharmony_ci            .compare_exchange(INIT, CLOSED, AcqRel, Acquire);
380cac7dca0Sopenharmony_ci    }
381cac7dca0Sopenharmony_ci}
382cac7dca0Sopenharmony_ci
383cac7dca0Sopenharmony_ciimpl<T> Drop for Receiver<T> {
384cac7dca0Sopenharmony_ci    fn drop(&mut self) {
385cac7dca0Sopenharmony_ci        self.close();
386cac7dca0Sopenharmony_ci    }
387cac7dca0Sopenharmony_ci}
388cac7dca0Sopenharmony_ci
389cac7dca0Sopenharmony_ciimpl<T> Future for Receiver<T> {
390cac7dca0Sopenharmony_ci    type Output = Result<T, RecvError>;
391cac7dca0Sopenharmony_ci
392cac7dca0Sopenharmony_ci    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
393cac7dca0Sopenharmony_ci        match self.channel.state.load(Acquire) {
394cac7dca0Sopenharmony_ci            INIT => {
395cac7dca0Sopenharmony_ci                self.channel.waker.register_by_ref(cx.waker());
396cac7dca0Sopenharmony_ci                if self.channel.state.load(Acquire) == SENT {
397cac7dca0Sopenharmony_ci                    Ready(self.channel.take_value_sent())
398cac7dca0Sopenharmony_ci                } else {
399cac7dca0Sopenharmony_ci                    Pending
400cac7dca0Sopenharmony_ci                }
401cac7dca0Sopenharmony_ci            }
402cac7dca0Sopenharmony_ci            SENT => Ready(self.channel.take_value_sent()),
403cac7dca0Sopenharmony_ci            CLOSED => Ready(Err(RecvError)),
404cac7dca0Sopenharmony_ci            _ => unreachable!(),
405cac7dca0Sopenharmony_ci        }
406cac7dca0Sopenharmony_ci    }
407cac7dca0Sopenharmony_ci}
408cac7dca0Sopenharmony_ci
/// State shared between the `Sender` and the `Receiver` of one channel.
struct Channel<T> {
    /// The state of the channel: one of `INIT`, `SENT` or `CLOSED`.
    state: AtomicUsize,

    /// The value passed by channel, it is set by `Sender` and read by
    /// `Receiver`.
    value: RefCell<Option<T>>,

    /// The waker registered by the receiver when it is polled; the sender
    /// wakes it after sending a value or when it is dropped.
    waker: AtomicWaker,
}
420cac7dca0Sopenharmony_ci
421cac7dca0Sopenharmony_ciimpl<T> Channel<T> {
422cac7dca0Sopenharmony_ci    fn new() -> Channel<T> {
423cac7dca0Sopenharmony_ci        Channel {
424cac7dca0Sopenharmony_ci            state: AtomicUsize::new(INIT),
425cac7dca0Sopenharmony_ci            value: RefCell::new(None),
426cac7dca0Sopenharmony_ci            waker: AtomicWaker::new(),
427cac7dca0Sopenharmony_ci        }
428cac7dca0Sopenharmony_ci    }
429cac7dca0Sopenharmony_ci
430cac7dca0Sopenharmony_ci    fn take_value_sent(&self) -> Result<T, RecvError> {
431cac7dca0Sopenharmony_ci        match self.take_value() {
432cac7dca0Sopenharmony_ci            Some(val) => {
433cac7dca0Sopenharmony_ci                self.state.store(CLOSED, Release);
434cac7dca0Sopenharmony_ci                Ok(val)
435cac7dca0Sopenharmony_ci            }
436cac7dca0Sopenharmony_ci            None => Err(RecvError),
437cac7dca0Sopenharmony_ci        }
438cac7dca0Sopenharmony_ci    }
439cac7dca0Sopenharmony_ci
440cac7dca0Sopenharmony_ci    fn take_value(&self) -> Option<T> {
441cac7dca0Sopenharmony_ci        self.value.borrow_mut().take()
442cac7dca0Sopenharmony_ci    }
443cac7dca0Sopenharmony_ci}
444cac7dca0Sopenharmony_ci
// SAFETY (review note): `value` is a `RefCell`, which is not `Sync` on its own.
// Within this file the sender only touches `value` inside `send` (before it
// publishes `SENT`, or to take the value back after observing `CLOSED`), while
// the receiver only touches it after observing `SENT`. The atomic `state` is
// what is relied upon to keep the two sides from borrowing the cell at the
// same time.
// NOTE(review): this also assumes `AtomicWaker` may be shared across threads —
// confirm against `super::atomic_waker`.
unsafe impl<T: Send> Send for Channel<T> {}
unsafe impl<T: Send> Sync for Channel<T> {}
447cac7dca0Sopenharmony_ci
impl<T> Drop for Channel<T> {
    /// Takes the waker out of the `AtomicWaker` when the channel is destroyed.
    // NOTE(review): presumably this releases a waker that the receiver
    // registered but that was never woken — confirm against
    // `AtomicWaker::take_waker`.
    fn drop(&mut self) {
        self.waker.take_waker();
    }
}
453cac7dca0Sopenharmony_ci
454cac7dca0Sopenharmony_ciimpl<T: Debug> Debug for Channel<T> {
455cac7dca0Sopenharmony_ci    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
456cac7dca0Sopenharmony_ci        f.debug_struct("Channel")
457cac7dca0Sopenharmony_ci            .field("state", &self.state.load(Acquire))
458cac7dca0Sopenharmony_ci            .finish()
459cac7dca0Sopenharmony_ci    }
460cac7dca0Sopenharmony_ci}
461cac7dca0Sopenharmony_ci
#[cfg(test)]
mod tests {
    use crate::sync::error::TryRecvError;
    use crate::sync::oneshot;
    use crate::{block_on, spawn};

    /// UT test cases for `send()` and `try_recv()`.
    ///
    /// # Brief
    /// 1. Call channel to create a sender and a receiver handle pair.
    /// 2. Receiver tries receiving a message before the sender sends one.
    /// 3. Receiver tries receiving a message after the sender sends one.
    /// 4. Check if the test results are correct.
    #[test]
    fn send_try_recv() {
        let (tx, mut rx) = oneshot::channel();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send("hello").unwrap();

        assert_eq!(rx.try_recv().unwrap(), "hello");
        assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
    }

    /// UT test cases for `send()` and async receive.
    ///
    /// # Brief
    /// 1. Call channel to create a sender and a receiver handle pair.
    /// 2. Sender sends message in the test thread.
    /// 3. Receiver receives message in a spawned task.
    /// 4. Wait for the task and check if the test results are correct.
    #[test]
    fn send_recv_await() {
        let (tx, rx) = oneshot::channel();
        if tx.send(6).is_err() {
            panic!("Receiver dropped");
        }

        // Hand the outcome back to the test thread and wait for it there.
        // Without waiting on the handle the test could return before the task
        // runs, and a failure inside the task would never fail the test.
        let handle = spawn(async move { rx.await.ok() });
        let received = block_on(handle).unwrap();
        assert_eq!(received, Some(6), "Sender dropped");
    }

    /// UT test cases for `is_closed()` and `close`.
    ///
    /// # Brief
    /// 1. Call channel to create a sender and a receiver handle pair.
    /// 2. Check whether the sender is closed.
    /// 3. Close the receiver.
    /// 4. Check whether the receiver will receive the message sent before it
    ///    closed.
    /// 5. Check if the test results are correct.
    #[test]
    fn close_rx() {
        let (tx, mut rx) = oneshot::channel();
        assert!(!tx.is_closed());
        rx.close();

        assert!(tx.is_closed());
        assert!(tx.send("never received").is_err());

        let (tx, mut rx) = oneshot::channel();
        assert!(tx.send("will receive").is_ok());

        rx.close();

        let msg = rx.try_recv().unwrap();
        assert_eq!(msg, "will receive");
    }
}
533