1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15//! uart_wrapper
16#![allow(missing_docs)]
17use super::uart::UartWriter;
18use super::{uart, UartMap};
19use crate::config::{self, TaskMessage};
20use crate::serializer::native_struct::UartHead;
21use crate::serializer::serialize::Serialization;
22use crate::serializer::{self, UART_HEAD_SIZE};
23use crate::utils;
24#[allow(unused)]
25use crate::utils::hdc_log::*;
26use std::collections::HashMap;
27use std::sync::Arc;
28#[cfg(feature = "host")]
29extern crate ylong_runtime_static as ylong_runtime;
30use ylong_runtime::sync::waiter::Waiter;
31use ylong_runtime::sync::Mutex;
32use ylong_runtime::task::JoinHandle;
33
/// Option flags carried in the `option` field of a UART package header.
///
/// Every variant occupies its own bit, so several options can be OR-ed
/// together into one byte.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum UartOption {
    /// Marks the last package of a message; the data can be handed to the session.
    Tail = 1,
    /// Host requests the daemon to reset the session.
    Reset = 2,
    /// Response: the package was received.
    Ack = 4,
    /// Response: request that the package be sent again.
    Nak = 8,
    /// Request to free this session because an unrecoverable error happened.
    Free = 16,
}

impl TryFrom<u8> for UartOption {
    type Error = ();

    /// Converts a raw byte into the variant whose discriminant equals `cmd`.
    ///
    /// Only a byte holding exactly one known flag converts; any other value
    /// (including combinations of flags) yields `Err(())`.
    fn try_from(cmd: u8) -> Result<Self, ()> {
        const KNOWN: [UartOption; 5] = [
            UartOption::Tail,
            UartOption::Reset,
            UartOption::Ack,
            UartOption::Nak,
            UartOption::Free,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == cmd)
            .ok_or(())
    }
}
57
/// Per-session waiters, keyed by session id.
struct WaiterManager {
    // Waiter used to pair each sent package with its response, one at a time.
    response_waiters: HashMap<u32, Waiter>,
    // Waiter used to wait while the session has no queued packages.
    empty_waiters: HashMap<u32, Waiter>,
}
64
65impl WaiterManager {
66    fn get_instance() -> &'static mut WaiterManager {
67        static mut INSTANCE: Option<WaiterManager> = None;
68        unsafe {
69            INSTANCE.get_or_insert(WaiterManager {
70                response_waiters: HashMap::new(),
71                empty_waiters: HashMap::new(),
72            })
73        }
74    }
75
76    async fn start_session(session_id: u32) {
77        let instance = Self::get_instance();
78        instance.response_waiters.insert(session_id, Waiter::new());
79        instance.empty_waiters.insert(session_id, Waiter::new());
80    }
81
82    #[allow(unused)]
83    async fn wait_response(session_id: u32) {
84        let instance = Self::get_instance();
85        let waiter = instance.response_waiters.get(&session_id);
86        if let Some(w) = waiter {
87            w.wait().await;
88        }
89    }
90
91    #[allow(unused)]
92    async fn wakeup_response_wait(session_id: u32) {
93        let instance = Self::get_instance();
94        let waiter = instance.response_waiters.get(&session_id);
95        if let Some(w) = waiter {
96            w.wake_one();
97        }
98    }
99
100    #[allow(unused)]
101    async fn wait_empty(session_id: u32) {
102        let instance = Self::get_instance();
103        let waiter = instance.empty_waiters.get(&session_id);
104        if let Some(w) = waiter {
105            w.wait().await;
106        }
107    }
108
109    #[allow(unused)]
110    async fn wakeup_empty_wait(session_id: u32) {
111        let instance = Self::get_instance();
112        let waiter = instance.empty_waiters.get(&session_id);
113        if let Some(w) = waiter {
114            w.wake_one();
115        }
116    }
117}
118
/// Send state of a queued package.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
enum OutputDataStatus {
    /// The package still has to be written to the UART.
    WaitSend = 0,
    /// The package was written and its response has not been processed yet.
    WaitResponse = 1,
    /// The peer acknowledged the package.
    ResponseOk = 2,
}

/// One outgoing UART package together with its bookkeeping.
#[derive(PartialEq, Debug, Clone)]
struct OutputData {
    // Session the package belongs to.
    session_id: u32,
    // True when the package is itself a response; such packages are sent
    // once and not tracked for a reply.
    response: bool,
    // Raw option bits of the package (see `UartOption`).
    option: u8,
    // Package index written into the header.
    package_index: u32,
    // Bytes handed to the UART writer.
    data: Vec<u8>,
    // Current position in the send / acknowledge cycle.
    status: OutputDataStatus,
    // Remaining send attempts.
    retry_count: u32,
}

impl std::fmt::Display for OutputData {
    /// Writes a one-line summary; the payload is reported by length only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data_size = self.data.len();
        f.write_fmt(format_args!(
            "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
            self.session_id,
            self.response,
            self.option,
            self.package_index,
            self.status,
            self.retry_count,
            data_size
        ))
    }
}
144
/// Shared, lock-protected handle to a single queued package.
type OutputData_ = Arc<Mutex<OutputData>>;

/// Shared, lock-protected list of queued packages.
type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
148
149struct DataQueue {
150    data_map: HashMap<u32, OutputDataVec_>,
151    thread_map: HashMap<u32, JoinHandle<()>>,
152    stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
153}
154
155impl DataQueue {
156    fn new() -> Self {
157        Self {
158            data_map: HashMap::new(),
159            thread_map: HashMap::new(),
160            stop_flag_map: HashMap::new(),
161        }
162    }
163}
164
/// Shared, lock-protected handle to the [`DataQueue`].
type DataQueue_ = Arc<Mutex<DataQueue>>;
166
/// Namespace type: all functionality is provided through associated functions.
pub struct QueueManager {}
168
169impl QueueManager {
170    fn get_instance() -> DataQueue_ {
171        static mut INSTANCE: Option<DataQueue_> = None;
172        unsafe {
173            INSTANCE
174                .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
175                .clone()
176        }
177    }
178
179    async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
180        let instance = Self::get_instance();
181        let mtx = instance.lock().await;
182        let data_map = &mtx.data_map;
183        if let Some(vec) = data_map.get(&session_id) {
184            let vec = vec.lock().await;
185            if !vec.is_empty() {
186                let Some(arc) = vec.get(index) else {
187                    crate::error!("get_package get is None");
188                    return None;
189                };
190                let data_mtx = arc.lock().await;
191                return Some(data_mtx.clone());
192            }
193        }
194        None
195    }
196
197    async fn put_package(session_id: u32, data: OutputData) {
198        let instance = Self::get_instance();
199        let mut mtx = instance.lock().await;
200        let data_map = &mut mtx.data_map;
201        if let Some(vec) = data_map.get(&session_id) {
202            let mut vec = vec.lock().await;
203            let item = Arc::new(Mutex::new(data));
204            vec.push(item);
205        } else {
206            let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
207            let d = Arc::new(Mutex::new(data));
208            vec.push(d);
209            let v = Arc::new(Mutex::new(vec));
210            data_map.insert(session_id, v);
211        }
212    }
213
214    async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
215        let instance = Self::get_instance();
216        let mtx = instance.lock().await;
217        let data_map = &mtx.data_map;
218        if let Some(vec) = data_map.get(&session_id) {
219            let vec = vec.lock().await;
220            if !vec.is_empty() {
221                let Some(arc) = vec.get(index) else {
222                    crate::error!("update_package get is None");
223                    return false;
224                };
225                let mut data_mtx = arc.lock().await;
226                *data_mtx = data;
227                return true;
228            }
229        }
230        false
231    }
232
233    async fn get_stop_flag(session_id: u32) -> Option<u8> {
234        let instance = Self::get_instance();
235        let mtx = instance.lock().await;
236        let stop_flag_map = &mtx.stop_flag_map;
237        if let Some(flag) = stop_flag_map.get(&session_id) {
238            let v = flag.lock().await;
239            Some(*v)
240        } else {
241            None
242        }
243    }
244
245    #[allow(unused)]
246    async fn set_stop_flag(session_id: u32) {
247        let instance = Self::get_instance();
248        let mut mtx = instance.lock().await;
249        let stop_flag_map = &mut mtx.stop_flag_map;
250        stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
251    }
252
253    async fn remove_package(session_id: u32, index: usize) -> bool {
254        let instance = Self::get_instance();
255        let mtx = instance.lock().await;
256        let data_map = &mtx.data_map;
257        if let Some(vec) = data_map.get(&session_id) {
258            let mut vec = vec.lock().await;
259            if !vec.is_empty() && index < vec.len() {
260                vec.remove(index);
261                return true;
262            }
263        }
264        false
265    }
266
267    async fn remove_session(session_id: u32) {
268        let instance = Self::get_instance();
269        let mut mtx = instance.lock().await;
270        mtx.data_map.remove(&session_id);
271        mtx.stop_flag_map.remove(&session_id);
272        mtx.thread_map.remove(&session_id);
273        crate::info!("remove_session:{session_id}");
274    }
275
276    async fn check_stop(session_id: u32) -> bool {
277        if let Some(stop) = Self::get_stop_flag(session_id).await {
278            return stop == 0;
279        }
280        false
281    }
282
283    async fn session_loop(session_id: u32) {
284        // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
285        //   2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
286        //   3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
287        //      如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
288        //      retry count为0, 则表示连接中断,stop session
289        crate::info!("session_loop for {}", session_id);
290        loop {
291            if Self::check_stop(session_id).await {
292                crate::info!("session_loop stop");
293                break;
294            }
295            let mut first_pkg = Self::get_package(session_id, 0).await;
296            while first_pkg.is_none() {
297                WaiterManager::wait_empty(session_id).await;
298                first_pkg = Self::get_package(session_id, 0).await;
299                if Self::check_stop(session_id).await {
300                    crate::info!("session_loop stop");
301                    break;
302                }
303            }
304            if Self::check_stop(session_id).await {
305                crate::info!("session_loop stop");
306                break;
307            }
308            let Some(mut first_pkg) = first_pkg else {
309                crate::info!("session_loop first_pkg is None");
310                break;
311            };
312            let mut status = first_pkg.status;
313            let mut retry_count = first_pkg.retry_count;
314
315            if status == OutputDataStatus::WaitSend {
316                // 发送数据
317                let data = first_pkg.data.clone();
318                let _ret = UartMap::put(session_id, data).await;
319                // 如果是ack报文 则不需要等待回应
320                if first_pkg.response {
321                    QueueManager::remove_package(session_id, 0).await;
322                    continue;
323                }
324                // 修改data 的status = WaitResponse
325                first_pkg.status = OutputDataStatus::WaitResponse;
326                retry_count -= 1;
327                first_pkg.retry_count = retry_count;
328                // 更新数据
329                QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
330                // 等待response
331                WaiterManager::wait_response(session_id).await;
332
333                if Self::check_stop(session_id).await {
334                    crate::info!("session_loop stop");
335                    break;
336                }
337                // 收到回复
338                // 重新获取数据
339
340                let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
341                    crate::info!("session_loop first_pkg is None");
342                    break;
343                };
344                // 得到新状态
345                status = first_pkg.status;
346
347                if status == OutputDataStatus::ResponseOk {
348                    // 删除当前data
349                    QueueManager::remove_package(session_id, 0).await;
350                    continue;
351                }
352                retry_count = first_pkg.retry_count;
353                while retry_count > 0 && status == OutputDataStatus::WaitResponse {
354                    // 保存retry_count
355                    retry_count -= 1;
356                    first_pkg.retry_count = retry_count;
357                    QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
358
359                    // 再次发送数据
360                    let data = first_pkg.data.clone();
361                    let _ret = UartMap::put(session_id, data).await;
362                    WaiterManager::wait_response(session_id).await;
363
364                    if Self::check_stop(session_id).await {
365                        break;
366                    }
367
368                    let Some(first_pkg) = Self::get_package(session_id, 0).await else {
369                        break;
370                    };
371                    status = first_pkg.status;
372
373                    match status {
374                        OutputDataStatus::ResponseOk => {
375                            QueueManager::remove_package(session_id, 0).await;
376                            break;
377                        }
378                        OutputDataStatus::WaitResponse => {
379                            let Some(first_pkg) = Self::get_package(session_id, 0).await else {
380                                break;
381                            };
382                            status = first_pkg.status;
383                            retry_count = first_pkg.retry_count;
384                            continue;
385                        }
386                        OutputDataStatus::WaitSend => {
387                            QueueManager::remove_package(session_id, 0).await;
388                            break;
389                        }
390                    }
391                }
392            }
393        }
394        Self::remove_session(session_id).await;
395        crate::info!("session_loop for {} end.", session_id);
396    }
397}
398
399pub async fn start_session(session_id: u32) {
400    let instance = QueueManager::get_instance();
401    let mut mtx = instance.lock().await;
402    let thread_map = &mut mtx.thread_map;
403    if thread_map.contains_key(&session_id) {
404        crate::error!("session thread has started.");
405        return;
406    }
407
408    WaiterManager::start_session(session_id).await;
409
410    let handle = utils::spawn(QueueManager::session_loop(session_id));
411    thread_map.insert(session_id, handle);
412
413    let stop_flag_map = &mut mtx.stop_flag_map;
414    stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
415}
416
417async fn stop_session(session_id: u32) {
418    let instance = QueueManager::get_instance();
419    let mut mtx = instance.lock().await;
420    let stop_flag_map = &mut mtx.stop_flag_map;
421    stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
422
423    WaiterManager::wakeup_empty_wait(session_id).await;
424    WaiterManager::wakeup_response_wait(session_id).await;
425}
426
427pub async fn stop_other_session(session_id: u32) {
428    let instance = QueueManager::get_instance();
429    let mtx = instance.lock().await;
430    let session_ids = mtx.data_map.keys();
431    let mut remove_sessions = Vec::new();
432    for k in session_ids {
433        if *k != session_id {
434            remove_sessions.push(*k);
435        }
436    }
437    drop(mtx);
438    for id in remove_sessions {
439        stop_session(id).await;
440    }
441}
442
443async fn output_package(
444    session_id: u32,
445    response: bool,
446    option: u8,
447    package_index: u32,
448    data: Vec<u8>,
449) {
450    let pkg = OutputData {
451        session_id,
452        response,
453        option,
454        package_index,
455        data: data.clone(),
456        retry_count: 5,
457        status: OutputDataStatus::WaitSend,
458    };
459    QueueManager::put_package(session_id, pkg).await;
460    WaiterManager::wakeup_empty_wait(session_id).await;
461}
462
463#[allow(unused)]
464fn is_response(option: u8) -> bool {
465    let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
466    ret != 0
467}
468
469pub async fn on_read_head(head: UartHead) {
470    let session_id = head.session_id;
471    let option = head.option;
472    let package_index = head.package_index;
473    if option & (UartOption::Free as u16) != 0 {
474        stop_session(session_id).await;
475        return;
476    }
477    if is_response(option as u8) {
478        let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
479            return;
480        };
481        pkg.status = if option & (UartOption::Ack as u16) > 1 {
482            OutputDataStatus::ResponseOk
483        } else {
484            OutputDataStatus::WaitSend
485        };
486        QueueManager::update_package(session_id, 0, pkg).await;
487        WaiterManager::wakeup_response_wait(session_id).await;
488    } else {
489        let mut header_obj =
490            uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
491        let header = header_obj.serialize();
492        let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
493        header_obj.head_checksum = u32::to_le(head_sum);
494        let data = header_obj.serialize();
495        output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
496    }
497}
498
/// Returns the process-wide package index.
///
/// With `is_create == true` the counter is advanced by one and the new value
/// is returned; otherwise the current value is returned unchanged. The
/// counter starts at 888, so the first created index is 889.
///
/// The counter is atomic, so concurrent callers never receive the same new
/// index, and it wraps around on overflow instead of panicking.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    static PACKAGE_INDEX: std::sync::atomic::AtomicU32 =
        std::sync::atomic::AtomicU32::new(888);
    let order = std::sync::atomic::Ordering::SeqCst;

    if is_create {
        // `fetch_add` yields the previous value; callers expect the new one.
        PACKAGE_INDEX.fetch_add(1, order).wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(order)
    }
}
512
/// Forwards `session_id` and the writer `wr` to `UartMap::start`.
pub async fn start_uart(session_id: u32, wr: UartWriter) {
    UartMap::start(session_id, wr).await;
}
516
517#[allow(unused)]
518pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
519    let mut pkg_index = package_index;
520    if package_index == 0 {
521        pkg_index = get_package_index(true);
522    }
523    let send = serializer::concat_pack(data);
524    crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
525
526    let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
527    let mut index = 0;
528    let len = send.len();
529
530    loop {
531        if index >= len {
532            println!("wrap_put break");
533            break;
534        }
535        let size;
536        let mut op = option;
537        if index + payload_max_len <= len {
538            size = payload_max_len;
539        } else {
540            size = len - index;
541            op = UartOption::Tail as u8 | option;
542        }
543
544        let data = send[index..index + size].to_vec().clone();
545        let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
546        let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
547        header_obj.data_checksum = u32::to_le(data_sum);
548
549        let header = header_obj.serialize();
550        let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
551        header_obj.head_checksum = u32::to_le(head_sum);
552
553        let header = header_obj.serialize();
554        crate::info!("header, header_len:{}", header.len());
555        let total = [header, send[index..index + size].to_vec().clone()].concat();
556
557        output_package(
558            session_id,
559            (op & UartOption::Ack as u8) > 0,
560            op,
561            pkg_index,
562            total,
563        )
564        .await;
565        pkg_index = get_package_index(true);
566        index += size;
567    }
568}
569