1cc290419Sopenharmony_ci/* 2cc290419Sopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd. 3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License. 5cc290419Sopenharmony_ci * You may obtain a copy of the License at 6cc290419Sopenharmony_ci * 7cc290419Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8cc290419Sopenharmony_ci * 9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and 13cc290419Sopenharmony_ci * limitations under the License. 14cc290419Sopenharmony_ci */ 15cc290419Sopenharmony_ci//! uart_wrapper 16cc290419Sopenharmony_ci#![allow(missing_docs)] 17cc290419Sopenharmony_ciuse super::uart::UartWriter; 18cc290419Sopenharmony_ciuse super::{uart, UartMap}; 19cc290419Sopenharmony_ciuse crate::config::{self, TaskMessage}; 20cc290419Sopenharmony_ciuse crate::serializer::native_struct::UartHead; 21cc290419Sopenharmony_ciuse crate::serializer::serialize::Serialization; 22cc290419Sopenharmony_ciuse crate::serializer::{self, UART_HEAD_SIZE}; 23cc290419Sopenharmony_ciuse crate::utils; 24cc290419Sopenharmony_ci#[allow(unused)] 25cc290419Sopenharmony_ciuse crate::utils::hdc_log::*; 26cc290419Sopenharmony_ciuse std::collections::HashMap; 27cc290419Sopenharmony_ciuse std::sync::Arc; 28cc290419Sopenharmony_ci#[cfg(feature = "host")] 29cc290419Sopenharmony_ciextern crate ylong_runtime_static as ylong_runtime; 30cc290419Sopenharmony_ciuse ylong_runtime::sync::waiter::Waiter; 31cc290419Sopenharmony_ciuse ylong_runtime::sync::Mutex; 32cc290419Sopenharmony_ciuse ylong_runtime::task::JoinHandle; 33cc290419Sopenharmony_ci 34cc290419Sopenharmony_ci#[derive(PartialEq, Debug, Clone, Copy)] 35cc290419Sopenharmony_ci#[repr(u8)] 36cc290419Sopenharmony_cipub enum UartOption { 37cc290419Sopenharmony_ci Tail = 1, // makr is the last packget, can be send to session. 38cc290419Sopenharmony_ci Reset = 2, // host request reset session in daemon 39cc290419Sopenharmony_ci Ack = 4, // response the pkg is received 40cc290419Sopenharmony_ci Nak = 8, // request resend pkg again 41cc290419Sopenharmony_ci Free = 16, // request free this session, some unable recovery error happened 42cc290419Sopenharmony_ci} 43cc290419Sopenharmony_ci 44cc290419Sopenharmony_ciimpl TryFrom<u8> for UartOption { 45cc290419Sopenharmony_ci type Error = (); 46cc290419Sopenharmony_ci fn try_from(cmd: u8) -> Result<Self, ()> { 47cc290419Sopenharmony_ci match cmd { 48cc290419Sopenharmony_ci 1 => Ok(Self::Tail), 49cc290419Sopenharmony_ci 2 => Ok(Self::Reset), 50cc290419Sopenharmony_ci 4 => Ok(Self::Ack), 51cc290419Sopenharmony_ci 8 => Ok(Self::Nak), 52cc290419Sopenharmony_ci 16 => Ok(Self::Free), 53cc290419Sopenharmony_ci _ => Err(()), 54cc290419Sopenharmony_ci } 55cc290419Sopenharmony_ci } 56cc290419Sopenharmony_ci} 57cc290419Sopenharmony_ci 58cc290419Sopenharmony_cistruct WaiterManager { 59cc290419Sopenharmony_ci // waiter used for sync package send-response one by one. 60cc290419Sopenharmony_ci response_waiters: HashMap<u32, Waiter>, 61cc290419Sopenharmony_ci // waiter used for waiting if no packages. 62cc290419Sopenharmony_ci empty_waiters: HashMap<u32, Waiter>, 63cc290419Sopenharmony_ci} 64cc290419Sopenharmony_ci 65cc290419Sopenharmony_ciimpl WaiterManager { 66cc290419Sopenharmony_ci fn get_instance() -> &'static mut WaiterManager { 67cc290419Sopenharmony_ci static mut INSTANCE: Option<WaiterManager> = None; 68cc290419Sopenharmony_ci unsafe { 69cc290419Sopenharmony_ci INSTANCE.get_or_insert(WaiterManager { 70cc290419Sopenharmony_ci response_waiters: HashMap::new(), 71cc290419Sopenharmony_ci empty_waiters: HashMap::new(), 72cc290419Sopenharmony_ci }) 73cc290419Sopenharmony_ci } 74cc290419Sopenharmony_ci } 75cc290419Sopenharmony_ci 76cc290419Sopenharmony_ci async fn start_session(session_id: u32) { 77cc290419Sopenharmony_ci let instance = Self::get_instance(); 78cc290419Sopenharmony_ci instance.response_waiters.insert(session_id, Waiter::new()); 79cc290419Sopenharmony_ci instance.empty_waiters.insert(session_id, Waiter::new()); 80cc290419Sopenharmony_ci } 81cc290419Sopenharmony_ci 82cc290419Sopenharmony_ci #[allow(unused)] 83cc290419Sopenharmony_ci async fn wait_response(session_id: u32) { 84cc290419Sopenharmony_ci let instance = Self::get_instance(); 85cc290419Sopenharmony_ci let waiter = instance.response_waiters.get(&session_id); 86cc290419Sopenharmony_ci if let Some(w) = waiter { 87cc290419Sopenharmony_ci w.wait().await; 88cc290419Sopenharmony_ci } 89cc290419Sopenharmony_ci } 90cc290419Sopenharmony_ci 91cc290419Sopenharmony_ci #[allow(unused)] 92cc290419Sopenharmony_ci async fn wakeup_response_wait(session_id: u32) { 93cc290419Sopenharmony_ci let instance = Self::get_instance(); 94cc290419Sopenharmony_ci let waiter = instance.response_waiters.get(&session_id); 95cc290419Sopenharmony_ci if let Some(w) = waiter { 96cc290419Sopenharmony_ci w.wake_one(); 97cc290419Sopenharmony_ci } 98cc290419Sopenharmony_ci } 99cc290419Sopenharmony_ci 100cc290419Sopenharmony_ci #[allow(unused)] 101cc290419Sopenharmony_ci async fn wait_empty(session_id: u32) { 102cc290419Sopenharmony_ci let instance = Self::get_instance(); 103cc290419Sopenharmony_ci let waiter = instance.empty_waiters.get(&session_id); 104cc290419Sopenharmony_ci if let Some(w) = waiter { 105cc290419Sopenharmony_ci w.wait().await; 106cc290419Sopenharmony_ci } 107cc290419Sopenharmony_ci } 108cc290419Sopenharmony_ci 109cc290419Sopenharmony_ci #[allow(unused)] 110cc290419Sopenharmony_ci async fn wakeup_empty_wait(session_id: u32) { 111cc290419Sopenharmony_ci let instance = Self::get_instance(); 112cc290419Sopenharmony_ci let waiter = instance.empty_waiters.get(&session_id); 113cc290419Sopenharmony_ci if let Some(w) = waiter { 114cc290419Sopenharmony_ci w.wake_one(); 115cc290419Sopenharmony_ci } 116cc290419Sopenharmony_ci } 117cc290419Sopenharmony_ci} 118cc290419Sopenharmony_ci 119cc290419Sopenharmony_ci#[derive(PartialEq, Debug, Clone, Copy)] 120cc290419Sopenharmony_ci#[repr(u8)] 121cc290419Sopenharmony_cienum OutputDataStatus { 122cc290419Sopenharmony_ci WaitSend = 0, 123cc290419Sopenharmony_ci WaitResponse = 1, 124cc290419Sopenharmony_ci ResponseOk = 2, 125cc290419Sopenharmony_ci} 126cc290419Sopenharmony_ci 127cc290419Sopenharmony_ci#[derive(PartialEq, Debug, Clone)] 128cc290419Sopenharmony_cistruct OutputData { 129cc290419Sopenharmony_ci session_id: u32, 130cc290419Sopenharmony_ci response: bool, 131cc290419Sopenharmony_ci option: u8, 132cc290419Sopenharmony_ci package_index: u32, 133cc290419Sopenharmony_ci data: Vec<u8>, 134cc290419Sopenharmony_ci status: OutputDataStatus, 135cc290419Sopenharmony_ci retry_count: u32, 136cc290419Sopenharmony_ci} 137cc290419Sopenharmony_ci 138cc290419Sopenharmony_ciimpl std::fmt::Display for OutputData { 139cc290419Sopenharmony_ci fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 140cc290419Sopenharmony_ci write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}", 141cc290419Sopenharmony_ci self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len()) 142cc290419Sopenharmony_ci } 143cc290419Sopenharmony_ci} 144cc290419Sopenharmony_ci 145cc290419Sopenharmony_citype OutputData_ = Arc<Mutex<OutputData>>; 146cc290419Sopenharmony_ci 147cc290419Sopenharmony_citype OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>; 148cc290419Sopenharmony_ci 149cc290419Sopenharmony_cistruct DataQueue { 150cc290419Sopenharmony_ci data_map: HashMap<u32, OutputDataVec_>, 151cc290419Sopenharmony_ci thread_map: HashMap<u32, JoinHandle<()>>, 152cc290419Sopenharmony_ci stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>, 153cc290419Sopenharmony_ci} 154cc290419Sopenharmony_ci 155cc290419Sopenharmony_ciimpl DataQueue { 156cc290419Sopenharmony_ci fn new() -> Self { 157cc290419Sopenharmony_ci Self { 158cc290419Sopenharmony_ci data_map: HashMap::new(), 159cc290419Sopenharmony_ci thread_map: HashMap::new(), 160cc290419Sopenharmony_ci stop_flag_map: HashMap::new(), 161cc290419Sopenharmony_ci } 162cc290419Sopenharmony_ci } 163cc290419Sopenharmony_ci} 164cc290419Sopenharmony_ci 165cc290419Sopenharmony_citype DataQueue_ = Arc<Mutex<DataQueue>>; 166cc290419Sopenharmony_ci 167cc290419Sopenharmony_cipub struct QueueManager {} 168cc290419Sopenharmony_ci 169cc290419Sopenharmony_ciimpl QueueManager { 170cc290419Sopenharmony_ci fn get_instance() -> DataQueue_ { 171cc290419Sopenharmony_ci static mut INSTANCE: Option<DataQueue_> = None; 172cc290419Sopenharmony_ci unsafe { 173cc290419Sopenharmony_ci INSTANCE 174cc290419Sopenharmony_ci .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new()))) 175cc290419Sopenharmony_ci .clone() 176cc290419Sopenharmony_ci } 177cc290419Sopenharmony_ci } 178cc290419Sopenharmony_ci 179cc290419Sopenharmony_ci async fn get_package(session_id: u32, index: usize) -> Option<OutputData> { 180cc290419Sopenharmony_ci let instance = Self::get_instance(); 181cc290419Sopenharmony_ci let mtx = instance.lock().await; 182cc290419Sopenharmony_ci let data_map = &mtx.data_map; 183cc290419Sopenharmony_ci if let Some(vec) = data_map.get(&session_id) { 184cc290419Sopenharmony_ci let vec = vec.lock().await; 185cc290419Sopenharmony_ci if !vec.is_empty() { 186cc290419Sopenharmony_ci let Some(arc) = vec.get(index) else { 187cc290419Sopenharmony_ci crate::error!("get_package get is None"); 188cc290419Sopenharmony_ci return None; 189cc290419Sopenharmony_ci }; 190cc290419Sopenharmony_ci let data_mtx = arc.lock().await; 191cc290419Sopenharmony_ci return Some(data_mtx.clone()); 192cc290419Sopenharmony_ci } 193cc290419Sopenharmony_ci } 194cc290419Sopenharmony_ci None 195cc290419Sopenharmony_ci } 196cc290419Sopenharmony_ci 197cc290419Sopenharmony_ci async fn put_package(session_id: u32, data: OutputData) { 198cc290419Sopenharmony_ci let instance = Self::get_instance(); 199cc290419Sopenharmony_ci let mut mtx = instance.lock().await; 200cc290419Sopenharmony_ci let data_map = &mut mtx.data_map; 201cc290419Sopenharmony_ci if let Some(vec) = data_map.get(&session_id) { 202cc290419Sopenharmony_ci let mut vec = vec.lock().await; 203cc290419Sopenharmony_ci let item = Arc::new(Mutex::new(data)); 204cc290419Sopenharmony_ci vec.push(item); 205cc290419Sopenharmony_ci } else { 206cc290419Sopenharmony_ci let mut vec = Vec::<Arc<Mutex<OutputData>>>::new(); 207cc290419Sopenharmony_ci let d = Arc::new(Mutex::new(data)); 208cc290419Sopenharmony_ci vec.push(d); 209cc290419Sopenharmony_ci let v = Arc::new(Mutex::new(vec)); 210cc290419Sopenharmony_ci data_map.insert(session_id, v); 211cc290419Sopenharmony_ci } 212cc290419Sopenharmony_ci } 213cc290419Sopenharmony_ci 214cc290419Sopenharmony_ci async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool { 215cc290419Sopenharmony_ci let instance = Self::get_instance(); 216cc290419Sopenharmony_ci let mtx = instance.lock().await; 217cc290419Sopenharmony_ci let data_map = &mtx.data_map; 218cc290419Sopenharmony_ci if let Some(vec) = data_map.get(&session_id) { 219cc290419Sopenharmony_ci let vec = vec.lock().await; 220cc290419Sopenharmony_ci if !vec.is_empty() { 221cc290419Sopenharmony_ci let Some(arc) = vec.get(index) else { 222cc290419Sopenharmony_ci crate::error!("update_package get is None"); 223cc290419Sopenharmony_ci return false; 224cc290419Sopenharmony_ci }; 225cc290419Sopenharmony_ci let mut data_mtx = arc.lock().await; 226cc290419Sopenharmony_ci *data_mtx = data; 227cc290419Sopenharmony_ci return true; 228cc290419Sopenharmony_ci } 229cc290419Sopenharmony_ci } 230cc290419Sopenharmony_ci false 231cc290419Sopenharmony_ci } 232cc290419Sopenharmony_ci 233cc290419Sopenharmony_ci async fn get_stop_flag(session_id: u32) -> Option<u8> { 234cc290419Sopenharmony_ci let instance = Self::get_instance(); 235cc290419Sopenharmony_ci let mtx = instance.lock().await; 236cc290419Sopenharmony_ci let stop_flag_map = &mtx.stop_flag_map; 237cc290419Sopenharmony_ci if let Some(flag) = stop_flag_map.get(&session_id) { 238cc290419Sopenharmony_ci let v = flag.lock().await; 239cc290419Sopenharmony_ci Some(*v) 240cc290419Sopenharmony_ci } else { 241cc290419Sopenharmony_ci None 242cc290419Sopenharmony_ci } 243cc290419Sopenharmony_ci } 244cc290419Sopenharmony_ci 245cc290419Sopenharmony_ci #[allow(unused)] 246cc290419Sopenharmony_ci async fn set_stop_flag(session_id: u32) { 247cc290419Sopenharmony_ci let instance = Self::get_instance(); 248cc290419Sopenharmony_ci let mut mtx = instance.lock().await; 249cc290419Sopenharmony_ci let stop_flag_map = &mut mtx.stop_flag_map; 250cc290419Sopenharmony_ci stop_flag_map.insert(session_id, Arc::new(Mutex::new(1))); 251cc290419Sopenharmony_ci } 252cc290419Sopenharmony_ci 253cc290419Sopenharmony_ci async fn remove_package(session_id: u32, index: usize) -> bool { 254cc290419Sopenharmony_ci let instance = Self::get_instance(); 255cc290419Sopenharmony_ci let mtx = instance.lock().await; 256cc290419Sopenharmony_ci let data_map = &mtx.data_map; 257cc290419Sopenharmony_ci if let Some(vec) = data_map.get(&session_id) { 258cc290419Sopenharmony_ci let mut vec = vec.lock().await; 259cc290419Sopenharmony_ci if !vec.is_empty() && index < vec.len() { 260cc290419Sopenharmony_ci vec.remove(index); 261cc290419Sopenharmony_ci return true; 262cc290419Sopenharmony_ci } 263cc290419Sopenharmony_ci } 264cc290419Sopenharmony_ci false 265cc290419Sopenharmony_ci } 266cc290419Sopenharmony_ci 267cc290419Sopenharmony_ci async fn remove_session(session_id: u32) { 268cc290419Sopenharmony_ci let instance = Self::get_instance(); 269cc290419Sopenharmony_ci let mut mtx = instance.lock().await; 270cc290419Sopenharmony_ci mtx.data_map.remove(&session_id); 271cc290419Sopenharmony_ci mtx.stop_flag_map.remove(&session_id); 272cc290419Sopenharmony_ci mtx.thread_map.remove(&session_id); 273cc290419Sopenharmony_ci crate::info!("remove_session:{session_id}"); 274cc290419Sopenharmony_ci } 275cc290419Sopenharmony_ci 276cc290419Sopenharmony_ci async fn check_stop(session_id: u32) -> bool { 277cc290419Sopenharmony_ci if let Some(stop) = Self::get_stop_flag(session_id).await { 278cc290419Sopenharmony_ci return stop == 0; 279cc290419Sopenharmony_ci } 280cc290419Sopenharmony_ci false 281cc290419Sopenharmony_ci } 282cc290419Sopenharmony_ci 283cc290419Sopenharmony_ci async fn session_loop(session_id: u32) { 284cc290419Sopenharmony_ci // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait 285cc290419Sopenharmony_ci // 2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup 286cc290419Sopenharmony_ci // 3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1; 287cc290419Sopenharmony_ci // 如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait) 288cc290419Sopenharmony_ci // retry count为0, 则表示连接中断,stop session 289cc290419Sopenharmony_ci crate::info!("session_loop for {}", session_id); 290cc290419Sopenharmony_ci loop { 291cc290419Sopenharmony_ci if Self::check_stop(session_id).await { 292cc290419Sopenharmony_ci crate::info!("session_loop stop"); 293cc290419Sopenharmony_ci break; 294cc290419Sopenharmony_ci } 295cc290419Sopenharmony_ci let mut first_pkg = Self::get_package(session_id, 0).await; 296cc290419Sopenharmony_ci while first_pkg.is_none() { 297cc290419Sopenharmony_ci WaiterManager::wait_empty(session_id).await; 298cc290419Sopenharmony_ci first_pkg = Self::get_package(session_id, 0).await; 299cc290419Sopenharmony_ci if Self::check_stop(session_id).await { 300cc290419Sopenharmony_ci crate::info!("session_loop stop"); 301cc290419Sopenharmony_ci break; 302cc290419Sopenharmony_ci } 303cc290419Sopenharmony_ci } 304cc290419Sopenharmony_ci if Self::check_stop(session_id).await { 305cc290419Sopenharmony_ci crate::info!("session_loop stop"); 306cc290419Sopenharmony_ci break; 307cc290419Sopenharmony_ci } 308cc290419Sopenharmony_ci let Some(mut first_pkg) = first_pkg else { 309cc290419Sopenharmony_ci crate::info!("session_loop first_pkg is None"); 310cc290419Sopenharmony_ci break; 311cc290419Sopenharmony_ci }; 312cc290419Sopenharmony_ci let mut status = first_pkg.status; 313cc290419Sopenharmony_ci let mut retry_count = first_pkg.retry_count; 314cc290419Sopenharmony_ci 315cc290419Sopenharmony_ci if status == OutputDataStatus::WaitSend { 316cc290419Sopenharmony_ci // 发送数据 317cc290419Sopenharmony_ci let data = first_pkg.data.clone(); 318cc290419Sopenharmony_ci let _ret = UartMap::put(session_id, data).await; 319cc290419Sopenharmony_ci // 如果是ack报文 则不需要等待回应 320cc290419Sopenharmony_ci if first_pkg.response { 321cc290419Sopenharmony_ci QueueManager::remove_package(session_id, 0).await; 322cc290419Sopenharmony_ci continue; 323cc290419Sopenharmony_ci } 324cc290419Sopenharmony_ci // 修改data 的status = WaitResponse 325cc290419Sopenharmony_ci first_pkg.status = OutputDataStatus::WaitResponse; 326cc290419Sopenharmony_ci retry_count -= 1; 327cc290419Sopenharmony_ci first_pkg.retry_count = retry_count; 328cc290419Sopenharmony_ci // 更新数据 329cc290419Sopenharmony_ci QueueManager::update_package(session_id, 0, first_pkg.clone()).await; 330cc290419Sopenharmony_ci // 等待response 331cc290419Sopenharmony_ci WaiterManager::wait_response(session_id).await; 332cc290419Sopenharmony_ci 333cc290419Sopenharmony_ci if Self::check_stop(session_id).await { 334cc290419Sopenharmony_ci crate::info!("session_loop stop"); 335cc290419Sopenharmony_ci break; 336cc290419Sopenharmony_ci } 337cc290419Sopenharmony_ci // 收到回复 338cc290419Sopenharmony_ci // 重新获取数据 339cc290419Sopenharmony_ci 340cc290419Sopenharmony_ci let Some(mut first_pkg) = Self::get_package(session_id, 0).await else { 341cc290419Sopenharmony_ci crate::info!("session_loop first_pkg is None"); 342cc290419Sopenharmony_ci break; 343cc290419Sopenharmony_ci }; 344cc290419Sopenharmony_ci // 得到新状态 345cc290419Sopenharmony_ci status = first_pkg.status; 346cc290419Sopenharmony_ci 347cc290419Sopenharmony_ci if status == OutputDataStatus::ResponseOk { 348cc290419Sopenharmony_ci // 删除当前data 349cc290419Sopenharmony_ci QueueManager::remove_package(session_id, 0).await; 350cc290419Sopenharmony_ci continue; 351cc290419Sopenharmony_ci } 352cc290419Sopenharmony_ci retry_count = first_pkg.retry_count; 353cc290419Sopenharmony_ci while retry_count > 0 && status == OutputDataStatus::WaitResponse { 354cc290419Sopenharmony_ci // 保存retry_count 355cc290419Sopenharmony_ci retry_count -= 1; 356cc290419Sopenharmony_ci first_pkg.retry_count = retry_count; 357cc290419Sopenharmony_ci QueueManager::update_package(session_id, 0, first_pkg.clone()).await; 358cc290419Sopenharmony_ci 359cc290419Sopenharmony_ci // 再次发送数据 360cc290419Sopenharmony_ci let data = first_pkg.data.clone(); 361cc290419Sopenharmony_ci let _ret = UartMap::put(session_id, data).await; 362cc290419Sopenharmony_ci WaiterManager::wait_response(session_id).await; 363cc290419Sopenharmony_ci 364cc290419Sopenharmony_ci if Self::check_stop(session_id).await { 365cc290419Sopenharmony_ci break; 366cc290419Sopenharmony_ci } 367cc290419Sopenharmony_ci 368cc290419Sopenharmony_ci let Some(first_pkg) = Self::get_package(session_id, 0).await else { 369cc290419Sopenharmony_ci break; 370cc290419Sopenharmony_ci }; 371cc290419Sopenharmony_ci status = first_pkg.status; 372cc290419Sopenharmony_ci 373cc290419Sopenharmony_ci match status { 374cc290419Sopenharmony_ci OutputDataStatus::ResponseOk => { 375cc290419Sopenharmony_ci QueueManager::remove_package(session_id, 0).await; 376cc290419Sopenharmony_ci break; 377cc290419Sopenharmony_ci } 378cc290419Sopenharmony_ci OutputDataStatus::WaitResponse => { 379cc290419Sopenharmony_ci let Some(first_pkg) = Self::get_package(session_id, 0).await else { 380cc290419Sopenharmony_ci break; 381cc290419Sopenharmony_ci }; 382cc290419Sopenharmony_ci status = first_pkg.status; 383cc290419Sopenharmony_ci retry_count = first_pkg.retry_count; 384cc290419Sopenharmony_ci continue; 385cc290419Sopenharmony_ci } 386cc290419Sopenharmony_ci OutputDataStatus::WaitSend => { 387cc290419Sopenharmony_ci QueueManager::remove_package(session_id, 0).await; 388cc290419Sopenharmony_ci break; 389cc290419Sopenharmony_ci } 390cc290419Sopenharmony_ci } 391cc290419Sopenharmony_ci } 392cc290419Sopenharmony_ci } 393cc290419Sopenharmony_ci } 394cc290419Sopenharmony_ci Self::remove_session(session_id).await; 395cc290419Sopenharmony_ci crate::info!("session_loop for {} end.", session_id); 396cc290419Sopenharmony_ci } 397cc290419Sopenharmony_ci} 398cc290419Sopenharmony_ci 399cc290419Sopenharmony_cipub async fn start_session(session_id: u32) { 400cc290419Sopenharmony_ci let instance = QueueManager::get_instance(); 401cc290419Sopenharmony_ci let mut mtx = instance.lock().await; 402cc290419Sopenharmony_ci let thread_map = &mut mtx.thread_map; 403cc290419Sopenharmony_ci if thread_map.contains_key(&session_id) { 404cc290419Sopenharmony_ci crate::error!("session thread has started."); 405cc290419Sopenharmony_ci return; 406cc290419Sopenharmony_ci } 407cc290419Sopenharmony_ci 408cc290419Sopenharmony_ci WaiterManager::start_session(session_id).await; 409cc290419Sopenharmony_ci 410cc290419Sopenharmony_ci let handle = utils::spawn(QueueManager::session_loop(session_id)); 411cc290419Sopenharmony_ci thread_map.insert(session_id, handle); 412cc290419Sopenharmony_ci 413cc290419Sopenharmony_ci let stop_flag_map = &mut mtx.stop_flag_map; 414cc290419Sopenharmony_ci stop_flag_map.insert(session_id, Arc::new(Mutex::new(1))); 415cc290419Sopenharmony_ci} 416cc290419Sopenharmony_ci 417cc290419Sopenharmony_ciasync fn stop_session(session_id: u32) { 418cc290419Sopenharmony_ci let instance = QueueManager::get_instance(); 419cc290419Sopenharmony_ci let mut mtx = instance.lock().await; 420cc290419Sopenharmony_ci let stop_flag_map = &mut mtx.stop_flag_map; 421cc290419Sopenharmony_ci stop_flag_map.insert(session_id, Arc::new(Mutex::new(0))); 422cc290419Sopenharmony_ci 423cc290419Sopenharmony_ci WaiterManager::wakeup_empty_wait(session_id).await; 424cc290419Sopenharmony_ci WaiterManager::wakeup_response_wait(session_id).await; 425cc290419Sopenharmony_ci} 426cc290419Sopenharmony_ci 427cc290419Sopenharmony_cipub async fn stop_other_session(session_id: u32) { 428cc290419Sopenharmony_ci let instance = QueueManager::get_instance(); 429cc290419Sopenharmony_ci let mtx = instance.lock().await; 430cc290419Sopenharmony_ci let session_ids = mtx.data_map.keys(); 431cc290419Sopenharmony_ci let mut remove_sessions = Vec::new(); 432cc290419Sopenharmony_ci for k in session_ids { 433cc290419Sopenharmony_ci if *k != session_id { 434cc290419Sopenharmony_ci remove_sessions.push(*k); 435cc290419Sopenharmony_ci } 436cc290419Sopenharmony_ci } 437cc290419Sopenharmony_ci drop(mtx); 438cc290419Sopenharmony_ci for id in remove_sessions { 439cc290419Sopenharmony_ci stop_session(id).await; 440cc290419Sopenharmony_ci } 441cc290419Sopenharmony_ci} 442cc290419Sopenharmony_ci 443cc290419Sopenharmony_ciasync fn output_package( 444cc290419Sopenharmony_ci session_id: u32, 445cc290419Sopenharmony_ci response: bool, 446cc290419Sopenharmony_ci option: u8, 447cc290419Sopenharmony_ci package_index: u32, 448cc290419Sopenharmony_ci data: Vec<u8>, 449cc290419Sopenharmony_ci) { 450cc290419Sopenharmony_ci let pkg = OutputData { 451cc290419Sopenharmony_ci session_id, 452cc290419Sopenharmony_ci response, 453cc290419Sopenharmony_ci option, 454cc290419Sopenharmony_ci package_index, 455cc290419Sopenharmony_ci data: data.clone(), 456cc290419Sopenharmony_ci retry_count: 5, 457cc290419Sopenharmony_ci status: OutputDataStatus::WaitSend, 458cc290419Sopenharmony_ci }; 459cc290419Sopenharmony_ci QueueManager::put_package(session_id, pkg).await; 460cc290419Sopenharmony_ci WaiterManager::wakeup_empty_wait(session_id).await; 461cc290419Sopenharmony_ci} 462cc290419Sopenharmony_ci 463cc290419Sopenharmony_ci#[allow(unused)] 464cc290419Sopenharmony_cifn is_response(option: u8) -> bool { 465cc290419Sopenharmony_ci let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8); 466cc290419Sopenharmony_ci ret != 0 467cc290419Sopenharmony_ci} 468cc290419Sopenharmony_ci 469cc290419Sopenharmony_cipub async fn on_read_head(head: UartHead) { 470cc290419Sopenharmony_ci let session_id = head.session_id; 471cc290419Sopenharmony_ci let option = head.option; 472cc290419Sopenharmony_ci let package_index = head.package_index; 473cc290419Sopenharmony_ci if option & (UartOption::Free as u16) != 0 { 474cc290419Sopenharmony_ci stop_session(session_id).await; 475cc290419Sopenharmony_ci return; 476cc290419Sopenharmony_ci } 477cc290419Sopenharmony_ci if is_response(option as u8) { 478cc290419Sopenharmony_ci let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else { 479cc290419Sopenharmony_ci return; 480cc290419Sopenharmony_ci }; 481cc290419Sopenharmony_ci pkg.status = if option & (UartOption::Ack as u16) > 1 { 482cc290419Sopenharmony_ci OutputDataStatus::ResponseOk 483cc290419Sopenharmony_ci } else { 484cc290419Sopenharmony_ci OutputDataStatus::WaitSend 485cc290419Sopenharmony_ci }; 486cc290419Sopenharmony_ci QueueManager::update_package(session_id, 0, pkg).await; 487cc290419Sopenharmony_ci WaiterManager::wakeup_response_wait(session_id).await; 488cc290419Sopenharmony_ci } else { 489cc290419Sopenharmony_ci let mut header_obj = 490cc290419Sopenharmony_ci uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index); 491cc290419Sopenharmony_ci let header = header_obj.serialize(); 492cc290419Sopenharmony_ci let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32); 493cc290419Sopenharmony_ci header_obj.head_checksum = u32::to_le(head_sum); 494cc290419Sopenharmony_ci let data = header_obj.serialize(); 495cc290419Sopenharmony_ci output_package(session_id, true, UartOption::Ack as u8, package_index, data).await; 496cc290419Sopenharmony_ci } 497cc290419Sopenharmony_ci} 498cc290419Sopenharmony_ci 499cc290419Sopenharmony_ci#[allow(unused)] 500cc290419Sopenharmony_cifn get_package_index(is_create: bool) -> u32 { 501cc290419Sopenharmony_ci static mut PACKAGE_INDEX: u32 = 888; 502cc290419Sopenharmony_ci 503cc290419Sopenharmony_ci unsafe { 504cc290419Sopenharmony_ci if is_create { 505cc290419Sopenharmony_ci PACKAGE_INDEX += 1; 506cc290419Sopenharmony_ci PACKAGE_INDEX 507cc290419Sopenharmony_ci } else { 508cc290419Sopenharmony_ci PACKAGE_INDEX 509cc290419Sopenharmony_ci } 510cc290419Sopenharmony_ci } 511cc290419Sopenharmony_ci} 512cc290419Sopenharmony_ci 513cc290419Sopenharmony_cipub async fn start_uart(session_id: u32, wr: UartWriter) { 514cc290419Sopenharmony_ci UartMap::start(session_id, wr).await; 515cc290419Sopenharmony_ci} 516cc290419Sopenharmony_ci 517cc290419Sopenharmony_ci#[allow(unused)] 518cc290419Sopenharmony_cipub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) { 519cc290419Sopenharmony_ci let mut pkg_index = package_index; 520cc290419Sopenharmony_ci if package_index == 0 { 521cc290419Sopenharmony_ci pkg_index = get_package_index(true); 522cc290419Sopenharmony_ci } 523cc290419Sopenharmony_ci let send = serializer::concat_pack(data); 524cc290419Sopenharmony_ci crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send); 525cc290419Sopenharmony_ci 526cc290419Sopenharmony_ci let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE; 527cc290419Sopenharmony_ci let mut index = 0; 528cc290419Sopenharmony_ci let len = send.len(); 529cc290419Sopenharmony_ci 530cc290419Sopenharmony_ci loop { 531cc290419Sopenharmony_ci if index >= len { 532cc290419Sopenharmony_ci println!("wrap_put break"); 533cc290419Sopenharmony_ci break; 534cc290419Sopenharmony_ci } 535cc290419Sopenharmony_ci let size; 536cc290419Sopenharmony_ci let mut op = option; 537cc290419Sopenharmony_ci if index + payload_max_len <= len { 538cc290419Sopenharmony_ci size = payload_max_len; 539cc290419Sopenharmony_ci } else { 540cc290419Sopenharmony_ci size = len - index; 541cc290419Sopenharmony_ci op = UartOption::Tail as u8 | option; 542cc290419Sopenharmony_ci } 543cc290419Sopenharmony_ci 544cc290419Sopenharmony_ci let data = send[index..index + size].to_vec().clone(); 545cc290419Sopenharmony_ci let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32); 546cc290419Sopenharmony_ci let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index); 547cc290419Sopenharmony_ci header_obj.data_checksum = u32::to_le(data_sum); 548cc290419Sopenharmony_ci 549cc290419Sopenharmony_ci let header = header_obj.serialize(); 550cc290419Sopenharmony_ci let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32); 551cc290419Sopenharmony_ci header_obj.head_checksum = u32::to_le(head_sum); 552cc290419Sopenharmony_ci 553cc290419Sopenharmony_ci let header = header_obj.serialize(); 554cc290419Sopenharmony_ci crate::info!("header, header_len:{}", header.len()); 555cc290419Sopenharmony_ci let total = [header, send[index..index + size].to_vec().clone()].concat(); 556cc290419Sopenharmony_ci 557cc290419Sopenharmony_ci output_package( 558cc290419Sopenharmony_ci session_id, 559cc290419Sopenharmony_ci (op & UartOption::Ack as u8) > 0, 560cc290419Sopenharmony_ci op, 561cc290419Sopenharmony_ci pkg_index, 562cc290419Sopenharmony_ci total, 563cc290419Sopenharmony_ci ) 564cc290419Sopenharmony_ci .await; 565cc290419Sopenharmony_ci pkg_index = get_package_index(true); 566cc290419Sopenharmony_ci index += size; 567cc290419Sopenharmony_ci } 568cc290419Sopenharmony_ci} 569