1cc290419Sopenharmony_ci/*
2cc290419Sopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd.
3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License.
5cc290419Sopenharmony_ci * You may obtain a copy of the License at
6cc290419Sopenharmony_ci *
7cc290419Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8cc290419Sopenharmony_ci *
9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and
13cc290419Sopenharmony_ci * limitations under the License.
14cc290419Sopenharmony_ci */
15cc290419Sopenharmony_ci//! uart_wrapper
16cc290419Sopenharmony_ci#![allow(missing_docs)]
17cc290419Sopenharmony_ciuse super::uart::UartWriter;
18cc290419Sopenharmony_ciuse super::{uart, UartMap};
19cc290419Sopenharmony_ciuse crate::config::{self, TaskMessage};
20cc290419Sopenharmony_ciuse crate::serializer::native_struct::UartHead;
21cc290419Sopenharmony_ciuse crate::serializer::serialize::Serialization;
22cc290419Sopenharmony_ciuse crate::serializer::{self, UART_HEAD_SIZE};
23cc290419Sopenharmony_ciuse crate::utils;
24cc290419Sopenharmony_ci#[allow(unused)]
25cc290419Sopenharmony_ciuse crate::utils::hdc_log::*;
26cc290419Sopenharmony_ciuse std::collections::HashMap;
27cc290419Sopenharmony_ciuse std::sync::Arc;
28cc290419Sopenharmony_ci#[cfg(feature = "host")]
29cc290419Sopenharmony_ciextern crate ylong_runtime_static as ylong_runtime;
30cc290419Sopenharmony_ciuse ylong_runtime::sync::waiter::Waiter;
31cc290419Sopenharmony_ciuse ylong_runtime::sync::Mutex;
32cc290419Sopenharmony_ciuse ylong_runtime::task::JoinHandle;
33cc290419Sopenharmony_ci
/// Option bits carried in the `option` field of a UART packet header.
///
/// Every discriminant is a distinct power of two; code in this file tests
/// them with bitwise `&` and combines them with `|`.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum UartOption {
    Tail = 1,  // marks the last package of a message; it can be sent on to the session.
    Reset = 2, // host requests a session reset in the daemon
    Ack = 4,   // response: the package has been received
    Nak = 8,   // request to resend the package
    Free = 16, // request to free this session; an unrecoverable error happened
}
43cc290419Sopenharmony_ci
44cc290419Sopenharmony_ciimpl TryFrom<u8> for UartOption {
45cc290419Sopenharmony_ci    type Error = ();
46cc290419Sopenharmony_ci    fn try_from(cmd: u8) -> Result<Self, ()> {
47cc290419Sopenharmony_ci        match cmd {
48cc290419Sopenharmony_ci            1 => Ok(Self::Tail),
49cc290419Sopenharmony_ci            2 => Ok(Self::Reset),
50cc290419Sopenharmony_ci            4 => Ok(Self::Ack),
51cc290419Sopenharmony_ci            8 => Ok(Self::Nak),
52cc290419Sopenharmony_ci            16 => Ok(Self::Free),
53cc290419Sopenharmony_ci            _ => Err(()),
54cc290419Sopenharmony_ci        }
55cc290419Sopenharmony_ci    }
56cc290419Sopenharmony_ci}
57cc290419Sopenharmony_ci
/// Registry of the per-session `Waiter`s, keyed by session id.
///
/// Two independent waiters are kept for every session; both are created by
/// `WaiterManager::start_session`.
struct WaiterManager {
    // waiter used to synchronize package send/response, one package at a time.
    response_waiters: HashMap<u32, Waiter>,
    // waiter used for waiting while there are no packages to send.
    empty_waiters: HashMap<u32, Waiter>,
}
64cc290419Sopenharmony_ci
65cc290419Sopenharmony_ciimpl WaiterManager {
66cc290419Sopenharmony_ci    fn get_instance() -> &'static mut WaiterManager {
67cc290419Sopenharmony_ci        static mut INSTANCE: Option<WaiterManager> = None;
68cc290419Sopenharmony_ci        unsafe {
69cc290419Sopenharmony_ci            INSTANCE.get_or_insert(WaiterManager {
70cc290419Sopenharmony_ci                response_waiters: HashMap::new(),
71cc290419Sopenharmony_ci                empty_waiters: HashMap::new(),
72cc290419Sopenharmony_ci            })
73cc290419Sopenharmony_ci        }
74cc290419Sopenharmony_ci    }
75cc290419Sopenharmony_ci
76cc290419Sopenharmony_ci    async fn start_session(session_id: u32) {
77cc290419Sopenharmony_ci        let instance = Self::get_instance();
78cc290419Sopenharmony_ci        instance.response_waiters.insert(session_id, Waiter::new());
79cc290419Sopenharmony_ci        instance.empty_waiters.insert(session_id, Waiter::new());
80cc290419Sopenharmony_ci    }
81cc290419Sopenharmony_ci
82cc290419Sopenharmony_ci    #[allow(unused)]
83cc290419Sopenharmony_ci    async fn wait_response(session_id: u32) {
84cc290419Sopenharmony_ci        let instance = Self::get_instance();
85cc290419Sopenharmony_ci        let waiter = instance.response_waiters.get(&session_id);
86cc290419Sopenharmony_ci        if let Some(w) = waiter {
87cc290419Sopenharmony_ci            w.wait().await;
88cc290419Sopenharmony_ci        }
89cc290419Sopenharmony_ci    }
90cc290419Sopenharmony_ci
91cc290419Sopenharmony_ci    #[allow(unused)]
92cc290419Sopenharmony_ci    async fn wakeup_response_wait(session_id: u32) {
93cc290419Sopenharmony_ci        let instance = Self::get_instance();
94cc290419Sopenharmony_ci        let waiter = instance.response_waiters.get(&session_id);
95cc290419Sopenharmony_ci        if let Some(w) = waiter {
96cc290419Sopenharmony_ci            w.wake_one();
97cc290419Sopenharmony_ci        }
98cc290419Sopenharmony_ci    }
99cc290419Sopenharmony_ci
100cc290419Sopenharmony_ci    #[allow(unused)]
101cc290419Sopenharmony_ci    async fn wait_empty(session_id: u32) {
102cc290419Sopenharmony_ci        let instance = Self::get_instance();
103cc290419Sopenharmony_ci        let waiter = instance.empty_waiters.get(&session_id);
104cc290419Sopenharmony_ci        if let Some(w) = waiter {
105cc290419Sopenharmony_ci            w.wait().await;
106cc290419Sopenharmony_ci        }
107cc290419Sopenharmony_ci    }
108cc290419Sopenharmony_ci
109cc290419Sopenharmony_ci    #[allow(unused)]
110cc290419Sopenharmony_ci    async fn wakeup_empty_wait(session_id: u32) {
111cc290419Sopenharmony_ci        let instance = Self::get_instance();
112cc290419Sopenharmony_ci        let waiter = instance.empty_waiters.get(&session_id);
113cc290419Sopenharmony_ci        if let Some(w) = waiter {
114cc290419Sopenharmony_ci            w.wake_one();
115cc290419Sopenharmony_ci        }
116cc290419Sopenharmony_ci    }
117cc290419Sopenharmony_ci}
118cc290419Sopenharmony_ci
/// Send state of a queued [`OutputData`] package.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
enum OutputDataStatus {
    // Initial state set by `output_package`; `on_read_head` also stores it
    // when a response arrives that is not an ACK.
    WaitSend = 0,
    // Stored by `session_loop` after the package has been written to the UART.
    WaitResponse = 1,
    // Stored by `on_read_head` when an ACK response arrives.
    ResponseOk = 2,
}
126cc290419Sopenharmony_ci
/// One outgoing UART package together with its send bookkeeping.
#[derive(PartialEq, Debug, Clone)]
struct OutputData {
    // Session the package belongs to.
    session_id: u32,
    // True when the package is itself a response; `session_loop` removes such
    // a package right after sending it instead of waiting for a reply.
    response: bool,
    // Option bits of the package (see `UartOption`).
    option: u8,
    // Package index that was written into the package header.
    package_index: u32,
    // Bytes handed to `UartMap::put` when the package is sent.
    data: Vec<u8>,
    // Current send state.
    status: OutputDataStatus,
    // Remaining send attempts; `output_package` starts it at 5 and
    // `session_loop` decrements it on every send.
    retry_count: u32,
}
137cc290419Sopenharmony_ci
138cc290419Sopenharmony_ciimpl std::fmt::Display for OutputData {
139cc290419Sopenharmony_ci    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140cc290419Sopenharmony_ci        write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
141cc290419Sopenharmony_ci        self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len())
142cc290419Sopenharmony_ci    }
143cc290419Sopenharmony_ci}
144cc290419Sopenharmony_ci
/// A single queued package behind its own async mutex, shareable via `Arc`.
type OutputData_ = Arc<Mutex<OutputData>>;

/// The package list of one session: a shared, lockable vector of packages.
type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
148cc290419Sopenharmony_ci
/// Per-session state of the UART output queue; every map is keyed by session id.
struct DataQueue {
    // Packages queued for each session.
    data_map: HashMap<u32, OutputDataVec_>,
    // Handle of the task spawned for each session's `session_loop`.
    thread_map: HashMap<u32, JoinHandle<()>>,
    // Per-session flag: `start_session` stores 1, `stop_session` stores 0,
    // and `check_stop` treats 0 as "stop requested".
    stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
}
154cc290419Sopenharmony_ci
155cc290419Sopenharmony_ciimpl DataQueue {
156cc290419Sopenharmony_ci    fn new() -> Self {
157cc290419Sopenharmony_ci        Self {
158cc290419Sopenharmony_ci            data_map: HashMap::new(),
159cc290419Sopenharmony_ci            thread_map: HashMap::new(),
160cc290419Sopenharmony_ci            stop_flag_map: HashMap::new(),
161cc290419Sopenharmony_ci        }
162cc290419Sopenharmony_ci    }
163cc290419Sopenharmony_ci}
164cc290419Sopenharmony_ci
/// Shared handle to the single [`DataQueue`], guarded by an async mutex.
type DataQueue_ = Arc<Mutex<DataQueue>>;
166cc290419Sopenharmony_ci
/// Namespace type: all of its functions are associated functions that operate
/// on the shared `DataQueue` returned by `QueueManager::get_instance`.
pub struct QueueManager {}
168cc290419Sopenharmony_ci
169cc290419Sopenharmony_ciimpl QueueManager {
170cc290419Sopenharmony_ci    fn get_instance() -> DataQueue_ {
171cc290419Sopenharmony_ci        static mut INSTANCE: Option<DataQueue_> = None;
172cc290419Sopenharmony_ci        unsafe {
173cc290419Sopenharmony_ci            INSTANCE
174cc290419Sopenharmony_ci                .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
175cc290419Sopenharmony_ci                .clone()
176cc290419Sopenharmony_ci        }
177cc290419Sopenharmony_ci    }
178cc290419Sopenharmony_ci
179cc290419Sopenharmony_ci    async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
180cc290419Sopenharmony_ci        let instance = Self::get_instance();
181cc290419Sopenharmony_ci        let mtx = instance.lock().await;
182cc290419Sopenharmony_ci        let data_map = &mtx.data_map;
183cc290419Sopenharmony_ci        if let Some(vec) = data_map.get(&session_id) {
184cc290419Sopenharmony_ci            let vec = vec.lock().await;
185cc290419Sopenharmony_ci            if !vec.is_empty() {
186cc290419Sopenharmony_ci                let Some(arc) = vec.get(index) else {
187cc290419Sopenharmony_ci                    crate::error!("get_package get is None");
188cc290419Sopenharmony_ci                    return None;
189cc290419Sopenharmony_ci                };
190cc290419Sopenharmony_ci                let data_mtx = arc.lock().await;
191cc290419Sopenharmony_ci                return Some(data_mtx.clone());
192cc290419Sopenharmony_ci            }
193cc290419Sopenharmony_ci        }
194cc290419Sopenharmony_ci        None
195cc290419Sopenharmony_ci    }
196cc290419Sopenharmony_ci
197cc290419Sopenharmony_ci    async fn put_package(session_id: u32, data: OutputData) {
198cc290419Sopenharmony_ci        let instance = Self::get_instance();
199cc290419Sopenharmony_ci        let mut mtx = instance.lock().await;
200cc290419Sopenharmony_ci        let data_map = &mut mtx.data_map;
201cc290419Sopenharmony_ci        if let Some(vec) = data_map.get(&session_id) {
202cc290419Sopenharmony_ci            let mut vec = vec.lock().await;
203cc290419Sopenharmony_ci            let item = Arc::new(Mutex::new(data));
204cc290419Sopenharmony_ci            vec.push(item);
205cc290419Sopenharmony_ci        } else {
206cc290419Sopenharmony_ci            let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
207cc290419Sopenharmony_ci            let d = Arc::new(Mutex::new(data));
208cc290419Sopenharmony_ci            vec.push(d);
209cc290419Sopenharmony_ci            let v = Arc::new(Mutex::new(vec));
210cc290419Sopenharmony_ci            data_map.insert(session_id, v);
211cc290419Sopenharmony_ci        }
212cc290419Sopenharmony_ci    }
213cc290419Sopenharmony_ci
214cc290419Sopenharmony_ci    async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
215cc290419Sopenharmony_ci        let instance = Self::get_instance();
216cc290419Sopenharmony_ci        let mtx = instance.lock().await;
217cc290419Sopenharmony_ci        let data_map = &mtx.data_map;
218cc290419Sopenharmony_ci        if let Some(vec) = data_map.get(&session_id) {
219cc290419Sopenharmony_ci            let vec = vec.lock().await;
220cc290419Sopenharmony_ci            if !vec.is_empty() {
221cc290419Sopenharmony_ci                let Some(arc) = vec.get(index) else {
222cc290419Sopenharmony_ci                    crate::error!("update_package get is None");
223cc290419Sopenharmony_ci                    return false;
224cc290419Sopenharmony_ci                };
225cc290419Sopenharmony_ci                let mut data_mtx = arc.lock().await;
226cc290419Sopenharmony_ci                *data_mtx = data;
227cc290419Sopenharmony_ci                return true;
228cc290419Sopenharmony_ci            }
229cc290419Sopenharmony_ci        }
230cc290419Sopenharmony_ci        false
231cc290419Sopenharmony_ci    }
232cc290419Sopenharmony_ci
233cc290419Sopenharmony_ci    async fn get_stop_flag(session_id: u32) -> Option<u8> {
234cc290419Sopenharmony_ci        let instance = Self::get_instance();
235cc290419Sopenharmony_ci        let mtx = instance.lock().await;
236cc290419Sopenharmony_ci        let stop_flag_map = &mtx.stop_flag_map;
237cc290419Sopenharmony_ci        if let Some(flag) = stop_flag_map.get(&session_id) {
238cc290419Sopenharmony_ci            let v = flag.lock().await;
239cc290419Sopenharmony_ci            Some(*v)
240cc290419Sopenharmony_ci        } else {
241cc290419Sopenharmony_ci            None
242cc290419Sopenharmony_ci        }
243cc290419Sopenharmony_ci    }
244cc290419Sopenharmony_ci
245cc290419Sopenharmony_ci    #[allow(unused)]
246cc290419Sopenharmony_ci    async fn set_stop_flag(session_id: u32) {
247cc290419Sopenharmony_ci        let instance = Self::get_instance();
248cc290419Sopenharmony_ci        let mut mtx = instance.lock().await;
249cc290419Sopenharmony_ci        let stop_flag_map = &mut mtx.stop_flag_map;
250cc290419Sopenharmony_ci        stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
251cc290419Sopenharmony_ci    }
252cc290419Sopenharmony_ci
253cc290419Sopenharmony_ci    async fn remove_package(session_id: u32, index: usize) -> bool {
254cc290419Sopenharmony_ci        let instance = Self::get_instance();
255cc290419Sopenharmony_ci        let mtx = instance.lock().await;
256cc290419Sopenharmony_ci        let data_map = &mtx.data_map;
257cc290419Sopenharmony_ci        if let Some(vec) = data_map.get(&session_id) {
258cc290419Sopenharmony_ci            let mut vec = vec.lock().await;
259cc290419Sopenharmony_ci            if !vec.is_empty() && index < vec.len() {
260cc290419Sopenharmony_ci                vec.remove(index);
261cc290419Sopenharmony_ci                return true;
262cc290419Sopenharmony_ci            }
263cc290419Sopenharmony_ci        }
264cc290419Sopenharmony_ci        false
265cc290419Sopenharmony_ci    }
266cc290419Sopenharmony_ci
267cc290419Sopenharmony_ci    async fn remove_session(session_id: u32) {
268cc290419Sopenharmony_ci        let instance = Self::get_instance();
269cc290419Sopenharmony_ci        let mut mtx = instance.lock().await;
270cc290419Sopenharmony_ci        mtx.data_map.remove(&session_id);
271cc290419Sopenharmony_ci        mtx.stop_flag_map.remove(&session_id);
272cc290419Sopenharmony_ci        mtx.thread_map.remove(&session_id);
273cc290419Sopenharmony_ci        crate::info!("remove_session:{session_id}");
274cc290419Sopenharmony_ci    }
275cc290419Sopenharmony_ci
276cc290419Sopenharmony_ci    async fn check_stop(session_id: u32) -> bool {
277cc290419Sopenharmony_ci        if let Some(stop) = Self::get_stop_flag(session_id).await {
278cc290419Sopenharmony_ci            return stop == 0;
279cc290419Sopenharmony_ci        }
280cc290419Sopenharmony_ci        false
281cc290419Sopenharmony_ci    }
282cc290419Sopenharmony_ci
283cc290419Sopenharmony_ci    async fn session_loop(session_id: u32) {
284cc290419Sopenharmony_ci        // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
285cc290419Sopenharmony_ci        //   2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
286cc290419Sopenharmony_ci        //   3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
287cc290419Sopenharmony_ci        //      如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
288cc290419Sopenharmony_ci        //      retry count为0, 则表示连接中断,stop session
289cc290419Sopenharmony_ci        crate::info!("session_loop for {}", session_id);
290cc290419Sopenharmony_ci        loop {
291cc290419Sopenharmony_ci            if Self::check_stop(session_id).await {
292cc290419Sopenharmony_ci                crate::info!("session_loop stop");
293cc290419Sopenharmony_ci                break;
294cc290419Sopenharmony_ci            }
295cc290419Sopenharmony_ci            let mut first_pkg = Self::get_package(session_id, 0).await;
296cc290419Sopenharmony_ci            while first_pkg.is_none() {
297cc290419Sopenharmony_ci                WaiterManager::wait_empty(session_id).await;
298cc290419Sopenharmony_ci                first_pkg = Self::get_package(session_id, 0).await;
299cc290419Sopenharmony_ci                if Self::check_stop(session_id).await {
300cc290419Sopenharmony_ci                    crate::info!("session_loop stop");
301cc290419Sopenharmony_ci                    break;
302cc290419Sopenharmony_ci                }
303cc290419Sopenharmony_ci            }
304cc290419Sopenharmony_ci            if Self::check_stop(session_id).await {
305cc290419Sopenharmony_ci                crate::info!("session_loop stop");
306cc290419Sopenharmony_ci                break;
307cc290419Sopenharmony_ci            }
308cc290419Sopenharmony_ci            let Some(mut first_pkg) = first_pkg else {
309cc290419Sopenharmony_ci                crate::info!("session_loop first_pkg is None");
310cc290419Sopenharmony_ci                break;
311cc290419Sopenharmony_ci            };
312cc290419Sopenharmony_ci            let mut status = first_pkg.status;
313cc290419Sopenharmony_ci            let mut retry_count = first_pkg.retry_count;
314cc290419Sopenharmony_ci
315cc290419Sopenharmony_ci            if status == OutputDataStatus::WaitSend {
316cc290419Sopenharmony_ci                // 发送数据
317cc290419Sopenharmony_ci                let data = first_pkg.data.clone();
318cc290419Sopenharmony_ci                let _ret = UartMap::put(session_id, data).await;
319cc290419Sopenharmony_ci                // 如果是ack报文 则不需要等待回应
320cc290419Sopenharmony_ci                if first_pkg.response {
321cc290419Sopenharmony_ci                    QueueManager::remove_package(session_id, 0).await;
322cc290419Sopenharmony_ci                    continue;
323cc290419Sopenharmony_ci                }
324cc290419Sopenharmony_ci                // 修改data 的status = WaitResponse
325cc290419Sopenharmony_ci                first_pkg.status = OutputDataStatus::WaitResponse;
326cc290419Sopenharmony_ci                retry_count -= 1;
327cc290419Sopenharmony_ci                first_pkg.retry_count = retry_count;
328cc290419Sopenharmony_ci                // 更新数据
329cc290419Sopenharmony_ci                QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
330cc290419Sopenharmony_ci                // 等待response
331cc290419Sopenharmony_ci                WaiterManager::wait_response(session_id).await;
332cc290419Sopenharmony_ci
333cc290419Sopenharmony_ci                if Self::check_stop(session_id).await {
334cc290419Sopenharmony_ci                    crate::info!("session_loop stop");
335cc290419Sopenharmony_ci                    break;
336cc290419Sopenharmony_ci                }
337cc290419Sopenharmony_ci                // 收到回复
338cc290419Sopenharmony_ci                // 重新获取数据
339cc290419Sopenharmony_ci
340cc290419Sopenharmony_ci                let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
341cc290419Sopenharmony_ci                    crate::info!("session_loop first_pkg is None");
342cc290419Sopenharmony_ci                    break;
343cc290419Sopenharmony_ci                };
344cc290419Sopenharmony_ci                // 得到新状态
345cc290419Sopenharmony_ci                status = first_pkg.status;
346cc290419Sopenharmony_ci
347cc290419Sopenharmony_ci                if status == OutputDataStatus::ResponseOk {
348cc290419Sopenharmony_ci                    // 删除当前data
349cc290419Sopenharmony_ci                    QueueManager::remove_package(session_id, 0).await;
350cc290419Sopenharmony_ci                    continue;
351cc290419Sopenharmony_ci                }
352cc290419Sopenharmony_ci                retry_count = first_pkg.retry_count;
353cc290419Sopenharmony_ci                while retry_count > 0 && status == OutputDataStatus::WaitResponse {
354cc290419Sopenharmony_ci                    // 保存retry_count
355cc290419Sopenharmony_ci                    retry_count -= 1;
356cc290419Sopenharmony_ci                    first_pkg.retry_count = retry_count;
357cc290419Sopenharmony_ci                    QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
358cc290419Sopenharmony_ci
359cc290419Sopenharmony_ci                    // 再次发送数据
360cc290419Sopenharmony_ci                    let data = first_pkg.data.clone();
361cc290419Sopenharmony_ci                    let _ret = UartMap::put(session_id, data).await;
362cc290419Sopenharmony_ci                    WaiterManager::wait_response(session_id).await;
363cc290419Sopenharmony_ci
364cc290419Sopenharmony_ci                    if Self::check_stop(session_id).await {
365cc290419Sopenharmony_ci                        break;
366cc290419Sopenharmony_ci                    }
367cc290419Sopenharmony_ci
368cc290419Sopenharmony_ci                    let Some(first_pkg) = Self::get_package(session_id, 0).await else {
369cc290419Sopenharmony_ci                        break;
370cc290419Sopenharmony_ci                    };
371cc290419Sopenharmony_ci                    status = first_pkg.status;
372cc290419Sopenharmony_ci
373cc290419Sopenharmony_ci                    match status {
374cc290419Sopenharmony_ci                        OutputDataStatus::ResponseOk => {
375cc290419Sopenharmony_ci                            QueueManager::remove_package(session_id, 0).await;
376cc290419Sopenharmony_ci                            break;
377cc290419Sopenharmony_ci                        }
378cc290419Sopenharmony_ci                        OutputDataStatus::WaitResponse => {
379cc290419Sopenharmony_ci                            let Some(first_pkg) = Self::get_package(session_id, 0).await else {
380cc290419Sopenharmony_ci                                break;
381cc290419Sopenharmony_ci                            };
382cc290419Sopenharmony_ci                            status = first_pkg.status;
383cc290419Sopenharmony_ci                            retry_count = first_pkg.retry_count;
384cc290419Sopenharmony_ci                            continue;
385cc290419Sopenharmony_ci                        }
386cc290419Sopenharmony_ci                        OutputDataStatus::WaitSend => {
387cc290419Sopenharmony_ci                            QueueManager::remove_package(session_id, 0).await;
388cc290419Sopenharmony_ci                            break;
389cc290419Sopenharmony_ci                        }
390cc290419Sopenharmony_ci                    }
391cc290419Sopenharmony_ci                }
392cc290419Sopenharmony_ci            }
393cc290419Sopenharmony_ci        }
394cc290419Sopenharmony_ci        Self::remove_session(session_id).await;
395cc290419Sopenharmony_ci        crate::info!("session_loop for {} end.", session_id);
396cc290419Sopenharmony_ci    }
397cc290419Sopenharmony_ci}
398cc290419Sopenharmony_ci
399cc290419Sopenharmony_cipub async fn start_session(session_id: u32) {
400cc290419Sopenharmony_ci    let instance = QueueManager::get_instance();
401cc290419Sopenharmony_ci    let mut mtx = instance.lock().await;
402cc290419Sopenharmony_ci    let thread_map = &mut mtx.thread_map;
403cc290419Sopenharmony_ci    if thread_map.contains_key(&session_id) {
404cc290419Sopenharmony_ci        crate::error!("session thread has started.");
405cc290419Sopenharmony_ci        return;
406cc290419Sopenharmony_ci    }
407cc290419Sopenharmony_ci
408cc290419Sopenharmony_ci    WaiterManager::start_session(session_id).await;
409cc290419Sopenharmony_ci
410cc290419Sopenharmony_ci    let handle = utils::spawn(QueueManager::session_loop(session_id));
411cc290419Sopenharmony_ci    thread_map.insert(session_id, handle);
412cc290419Sopenharmony_ci
413cc290419Sopenharmony_ci    let stop_flag_map = &mut mtx.stop_flag_map;
414cc290419Sopenharmony_ci    stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
415cc290419Sopenharmony_ci}
416cc290419Sopenharmony_ci
417cc290419Sopenharmony_ciasync fn stop_session(session_id: u32) {
418cc290419Sopenharmony_ci    let instance = QueueManager::get_instance();
419cc290419Sopenharmony_ci    let mut mtx = instance.lock().await;
420cc290419Sopenharmony_ci    let stop_flag_map = &mut mtx.stop_flag_map;
421cc290419Sopenharmony_ci    stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
422cc290419Sopenharmony_ci
423cc290419Sopenharmony_ci    WaiterManager::wakeup_empty_wait(session_id).await;
424cc290419Sopenharmony_ci    WaiterManager::wakeup_response_wait(session_id).await;
425cc290419Sopenharmony_ci}
426cc290419Sopenharmony_ci
427cc290419Sopenharmony_cipub async fn stop_other_session(session_id: u32) {
428cc290419Sopenharmony_ci    let instance = QueueManager::get_instance();
429cc290419Sopenharmony_ci    let mtx = instance.lock().await;
430cc290419Sopenharmony_ci    let session_ids = mtx.data_map.keys();
431cc290419Sopenharmony_ci    let mut remove_sessions = Vec::new();
432cc290419Sopenharmony_ci    for k in session_ids {
433cc290419Sopenharmony_ci        if *k != session_id {
434cc290419Sopenharmony_ci            remove_sessions.push(*k);
435cc290419Sopenharmony_ci        }
436cc290419Sopenharmony_ci    }
437cc290419Sopenharmony_ci    drop(mtx);
438cc290419Sopenharmony_ci    for id in remove_sessions {
439cc290419Sopenharmony_ci        stop_session(id).await;
440cc290419Sopenharmony_ci    }
441cc290419Sopenharmony_ci}
442cc290419Sopenharmony_ci
443cc290419Sopenharmony_ciasync fn output_package(
444cc290419Sopenharmony_ci    session_id: u32,
445cc290419Sopenharmony_ci    response: bool,
446cc290419Sopenharmony_ci    option: u8,
447cc290419Sopenharmony_ci    package_index: u32,
448cc290419Sopenharmony_ci    data: Vec<u8>,
449cc290419Sopenharmony_ci) {
450cc290419Sopenharmony_ci    let pkg = OutputData {
451cc290419Sopenharmony_ci        session_id,
452cc290419Sopenharmony_ci        response,
453cc290419Sopenharmony_ci        option,
454cc290419Sopenharmony_ci        package_index,
455cc290419Sopenharmony_ci        data: data.clone(),
456cc290419Sopenharmony_ci        retry_count: 5,
457cc290419Sopenharmony_ci        status: OutputDataStatus::WaitSend,
458cc290419Sopenharmony_ci    };
459cc290419Sopenharmony_ci    QueueManager::put_package(session_id, pkg).await;
460cc290419Sopenharmony_ci    WaiterManager::wakeup_empty_wait(session_id).await;
461cc290419Sopenharmony_ci}
462cc290419Sopenharmony_ci
463cc290419Sopenharmony_ci#[allow(unused)]
464cc290419Sopenharmony_cifn is_response(option: u8) -> bool {
465cc290419Sopenharmony_ci    let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
466cc290419Sopenharmony_ci    ret != 0
467cc290419Sopenharmony_ci}
468cc290419Sopenharmony_ci
469cc290419Sopenharmony_cipub async fn on_read_head(head: UartHead) {
470cc290419Sopenharmony_ci    let session_id = head.session_id;
471cc290419Sopenharmony_ci    let option = head.option;
472cc290419Sopenharmony_ci    let package_index = head.package_index;
473cc290419Sopenharmony_ci    if option & (UartOption::Free as u16) != 0 {
474cc290419Sopenharmony_ci        stop_session(session_id).await;
475cc290419Sopenharmony_ci        return;
476cc290419Sopenharmony_ci    }
477cc290419Sopenharmony_ci    if is_response(option as u8) {
478cc290419Sopenharmony_ci        let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
479cc290419Sopenharmony_ci            return;
480cc290419Sopenharmony_ci        };
481cc290419Sopenharmony_ci        pkg.status = if option & (UartOption::Ack as u16) > 1 {
482cc290419Sopenharmony_ci            OutputDataStatus::ResponseOk
483cc290419Sopenharmony_ci        } else {
484cc290419Sopenharmony_ci            OutputDataStatus::WaitSend
485cc290419Sopenharmony_ci        };
486cc290419Sopenharmony_ci        QueueManager::update_package(session_id, 0, pkg).await;
487cc290419Sopenharmony_ci        WaiterManager::wakeup_response_wait(session_id).await;
488cc290419Sopenharmony_ci    } else {
489cc290419Sopenharmony_ci        let mut header_obj =
490cc290419Sopenharmony_ci            uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
491cc290419Sopenharmony_ci        let header = header_obj.serialize();
492cc290419Sopenharmony_ci        let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
493cc290419Sopenharmony_ci        header_obj.head_checksum = u32::to_le(head_sum);
494cc290419Sopenharmony_ci        let data = header_obj.serialize();
495cc290419Sopenharmony_ci        output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
496cc290419Sopenharmony_ci    }
497cc290419Sopenharmony_ci}
498cc290419Sopenharmony_ci
/// Returns a package index from the process-wide counter.
///
/// With `is_create == true` the counter is advanced by one and the new value
/// is returned; otherwise the current value is returned unchanged. The
/// counter starts at 888, so the first created index is 889.
///
/// The counter is atomic, so concurrent callers never observe a torn or
/// duplicated index, and it wraps around on overflow instead of panicking.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    use std::sync::atomic::{AtomicU32, Ordering};

    static PACKAGE_INDEX: AtomicU32 = AtomicU32::new(888);

    if is_create {
        // `fetch_add` returns the previous value; callers expect the new one.
        PACKAGE_INDEX.fetch_add(1, Ordering::SeqCst).wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(Ordering::SeqCst)
    }
}
512cc290419Sopenharmony_ci
513cc290419Sopenharmony_cipub async fn start_uart(session_id: u32, wr: UartWriter) {
514cc290419Sopenharmony_ci    UartMap::start(session_id, wr).await;
515cc290419Sopenharmony_ci}
516cc290419Sopenharmony_ci
517cc290419Sopenharmony_ci#[allow(unused)]
518cc290419Sopenharmony_cipub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
519cc290419Sopenharmony_ci    let mut pkg_index = package_index;
520cc290419Sopenharmony_ci    if package_index == 0 {
521cc290419Sopenharmony_ci        pkg_index = get_package_index(true);
522cc290419Sopenharmony_ci    }
523cc290419Sopenharmony_ci    let send = serializer::concat_pack(data);
524cc290419Sopenharmony_ci    crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
525cc290419Sopenharmony_ci
526cc290419Sopenharmony_ci    let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
527cc290419Sopenharmony_ci    let mut index = 0;
528cc290419Sopenharmony_ci    let len = send.len();
529cc290419Sopenharmony_ci
530cc290419Sopenharmony_ci    loop {
531cc290419Sopenharmony_ci        if index >= len {
532cc290419Sopenharmony_ci            println!("wrap_put break");
533cc290419Sopenharmony_ci            break;
534cc290419Sopenharmony_ci        }
535cc290419Sopenharmony_ci        let size;
536cc290419Sopenharmony_ci        let mut op = option;
537cc290419Sopenharmony_ci        if index + payload_max_len <= len {
538cc290419Sopenharmony_ci            size = payload_max_len;
539cc290419Sopenharmony_ci        } else {
540cc290419Sopenharmony_ci            size = len - index;
541cc290419Sopenharmony_ci            op = UartOption::Tail as u8 | option;
542cc290419Sopenharmony_ci        }
543cc290419Sopenharmony_ci
544cc290419Sopenharmony_ci        let data = send[index..index + size].to_vec().clone();
545cc290419Sopenharmony_ci        let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
546cc290419Sopenharmony_ci        let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
547cc290419Sopenharmony_ci        header_obj.data_checksum = u32::to_le(data_sum);
548cc290419Sopenharmony_ci
549cc290419Sopenharmony_ci        let header = header_obj.serialize();
550cc290419Sopenharmony_ci        let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
551cc290419Sopenharmony_ci        header_obj.head_checksum = u32::to_le(head_sum);
552cc290419Sopenharmony_ci
553cc290419Sopenharmony_ci        let header = header_obj.serialize();
554cc290419Sopenharmony_ci        crate::info!("header, header_len:{}", header.len());
555cc290419Sopenharmony_ci        let total = [header, send[index..index + size].to_vec().clone()].concat();
556cc290419Sopenharmony_ci
557cc290419Sopenharmony_ci        output_package(
558cc290419Sopenharmony_ci            session_id,
559cc290419Sopenharmony_ci            (op & UartOption::Ack as u8) > 0,
560cc290419Sopenharmony_ci            op,
561cc290419Sopenharmony_ci            pkg_index,
562cc290419Sopenharmony_ci            total,
563cc290419Sopenharmony_ci        )
564cc290419Sopenharmony_ci        .await;
565cc290419Sopenharmony_ci        pkg_index = get_package_index(true);
566cc290419Sopenharmony_ci        index += size;
567cc290419Sopenharmony_ci    }
568cc290419Sopenharmony_ci}
569