1/* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15//! uart_wrapper 16#![allow(missing_docs)] 17use super::uart::UartWriter; 18use super::{uart, UartMap}; 19use crate::config::{self, TaskMessage}; 20use crate::serializer::native_struct::UartHead; 21use crate::serializer::serialize::Serialization; 22use crate::serializer::{self, UART_HEAD_SIZE}; 23use crate::utils; 24#[allow(unused)] 25use crate::utils::hdc_log::*; 26use std::collections::HashMap; 27use std::sync::Arc; 28#[cfg(feature = "host")] 29extern crate ylong_runtime_static as ylong_runtime; 30use ylong_runtime::sync::waiter::Waiter; 31use ylong_runtime::sync::Mutex; 32use ylong_runtime::task::JoinHandle; 33 34#[derive(PartialEq, Debug, Clone, Copy)] 35#[repr(u8)] 36pub enum UartOption { 37 Tail = 1, // makr is the last packget, can be send to session. 38 Reset = 2, // host request reset session in daemon 39 Ack = 4, // response the pkg is received 40 Nak = 8, // request resend pkg again 41 Free = 16, // request free this session, some unable recovery error happened 42} 43 44impl TryFrom<u8> for UartOption { 45 type Error = (); 46 fn try_from(cmd: u8) -> Result<Self, ()> { 47 match cmd { 48 1 => Ok(Self::Tail), 49 2 => Ok(Self::Reset), 50 4 => Ok(Self::Ack), 51 8 => Ok(Self::Nak), 52 16 => Ok(Self::Free), 53 _ => Err(()), 54 } 55 } 56} 57 58struct WaiterManager { 59 // waiter used for sync package send-response one by one. 60 response_waiters: HashMap<u32, Waiter>, 61 // waiter used for waiting if no packages. 62 empty_waiters: HashMap<u32, Waiter>, 63} 64 65impl WaiterManager { 66 fn get_instance() -> &'static mut WaiterManager { 67 static mut INSTANCE: Option<WaiterManager> = None; 68 unsafe { 69 INSTANCE.get_or_insert(WaiterManager { 70 response_waiters: HashMap::new(), 71 empty_waiters: HashMap::new(), 72 }) 73 } 74 } 75 76 async fn start_session(session_id: u32) { 77 let instance = Self::get_instance(); 78 instance.response_waiters.insert(session_id, Waiter::new()); 79 instance.empty_waiters.insert(session_id, Waiter::new()); 80 } 81 82 #[allow(unused)] 83 async fn wait_response(session_id: u32) { 84 let instance = Self::get_instance(); 85 let waiter = instance.response_waiters.get(&session_id); 86 if let Some(w) = waiter { 87 w.wait().await; 88 } 89 } 90 91 #[allow(unused)] 92 async fn wakeup_response_wait(session_id: u32) { 93 let instance = Self::get_instance(); 94 let waiter = instance.response_waiters.get(&session_id); 95 if let Some(w) = waiter { 96 w.wake_one(); 97 } 98 } 99 100 #[allow(unused)] 101 async fn wait_empty(session_id: u32) { 102 let instance = Self::get_instance(); 103 let waiter = instance.empty_waiters.get(&session_id); 104 if let Some(w) = waiter { 105 w.wait().await; 106 } 107 } 108 109 #[allow(unused)] 110 async fn wakeup_empty_wait(session_id: u32) { 111 let instance = Self::get_instance(); 112 let waiter = instance.empty_waiters.get(&session_id); 113 if let Some(w) = waiter { 114 w.wake_one(); 115 } 116 } 117} 118 119#[derive(PartialEq, Debug, Clone, Copy)] 120#[repr(u8)] 121enum OutputDataStatus { 122 WaitSend = 0, 123 WaitResponse = 1, 124 ResponseOk = 2, 125} 126 127#[derive(PartialEq, Debug, Clone)] 128struct OutputData { 129 session_id: u32, 130 response: bool, 131 option: u8, 132 package_index: u32, 133 data: Vec<u8>, 134 status: OutputDataStatus, 135 retry_count: u32, 136} 137 138impl std::fmt::Display for OutputData { 139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 140 write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}", 141 self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len()) 142 } 143} 144 145type OutputData_ = Arc<Mutex<OutputData>>; 146 147type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>; 148 149struct DataQueue { 150 data_map: HashMap<u32, OutputDataVec_>, 151 thread_map: HashMap<u32, JoinHandle<()>>, 152 stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>, 153} 154 155impl DataQueue { 156 fn new() -> Self { 157 Self { 158 data_map: HashMap::new(), 159 thread_map: HashMap::new(), 160 stop_flag_map: HashMap::new(), 161 } 162 } 163} 164 165type DataQueue_ = Arc<Mutex<DataQueue>>; 166 167pub struct QueueManager {} 168 169impl QueueManager { 170 fn get_instance() -> DataQueue_ { 171 static mut INSTANCE: Option<DataQueue_> = None; 172 unsafe { 173 INSTANCE 174 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new()))) 175 .clone() 176 } 177 } 178 179 async fn get_package(session_id: u32, index: usize) -> Option<OutputData> { 180 let instance = Self::get_instance(); 181 let mtx = instance.lock().await; 182 let data_map = &mtx.data_map; 183 if let Some(vec) = data_map.get(&session_id) { 184 let vec = vec.lock().await; 185 if !vec.is_empty() { 186 let Some(arc) = vec.get(index) else { 187 crate::error!("get_package get is None"); 188 return None; 189 }; 190 let data_mtx = arc.lock().await; 191 return Some(data_mtx.clone()); 192 } 193 } 194 None 195 } 196 197 async fn put_package(session_id: u32, data: OutputData) { 198 let instance = Self::get_instance(); 199 let mut mtx = instance.lock().await; 200 let data_map = &mut mtx.data_map; 201 if let Some(vec) = data_map.get(&session_id) { 202 let mut vec = vec.lock().await; 203 let item = Arc::new(Mutex::new(data)); 204 vec.push(item); 205 } else { 206 let mut vec = Vec::<Arc<Mutex<OutputData>>>::new(); 207 let d = Arc::new(Mutex::new(data)); 208 vec.push(d); 209 let v = Arc::new(Mutex::new(vec)); 210 data_map.insert(session_id, v); 211 } 212 } 213 214 async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool { 215 let instance = Self::get_instance(); 216 let mtx = instance.lock().await; 217 let data_map = &mtx.data_map; 218 if let Some(vec) = data_map.get(&session_id) { 219 let vec = vec.lock().await; 220 if !vec.is_empty() { 221 let Some(arc) = vec.get(index) else { 222 crate::error!("update_package get is None"); 223 return false; 224 }; 225 let mut data_mtx = arc.lock().await; 226 *data_mtx = data; 227 return true; 228 } 229 } 230 false 231 } 232 233 async fn get_stop_flag(session_id: u32) -> Option<u8> { 234 let instance = Self::get_instance(); 235 let mtx = instance.lock().await; 236 let stop_flag_map = &mtx.stop_flag_map; 237 if let Some(flag) = stop_flag_map.get(&session_id) { 238 let v = flag.lock().await; 239 Some(*v) 240 } else { 241 None 242 } 243 } 244 245 #[allow(unused)] 246 async fn set_stop_flag(session_id: u32) { 247 let instance = Self::get_instance(); 248 let mut mtx = instance.lock().await; 249 let stop_flag_map = &mut mtx.stop_flag_map; 250 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1))); 251 } 252 253 async fn remove_package(session_id: u32, index: usize) -> bool { 254 let instance = Self::get_instance(); 255 let mtx = instance.lock().await; 256 let data_map = &mtx.data_map; 257 if let Some(vec) = data_map.get(&session_id) { 258 let mut vec = vec.lock().await; 259 if !vec.is_empty() && index < vec.len() { 260 vec.remove(index); 261 return true; 262 } 263 } 264 false 265 } 266 267 async fn remove_session(session_id: u32) { 268 let instance = Self::get_instance(); 269 let mut mtx = instance.lock().await; 270 mtx.data_map.remove(&session_id); 271 mtx.stop_flag_map.remove(&session_id); 272 mtx.thread_map.remove(&session_id); 273 crate::info!("remove_session:{session_id}"); 274 } 275 276 async fn check_stop(session_id: u32) -> bool { 277 if let Some(stop) = Self::get_stop_flag(session_id).await { 278 return stop == 0; 279 } 280 false 281 } 282 283 async fn session_loop(session_id: u32) { 284 // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait 285 // 2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup 286 // 3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1; 287 // 如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait) 288 // retry count为0, 则表示连接中断,stop session 289 crate::info!("session_loop for {}", session_id); 290 loop { 291 if Self::check_stop(session_id).await { 292 crate::info!("session_loop stop"); 293 break; 294 } 295 let mut first_pkg = Self::get_package(session_id, 0).await; 296 while first_pkg.is_none() { 297 WaiterManager::wait_empty(session_id).await; 298 first_pkg = Self::get_package(session_id, 0).await; 299 if Self::check_stop(session_id).await { 300 crate::info!("session_loop stop"); 301 break; 302 } 303 } 304 if Self::check_stop(session_id).await { 305 crate::info!("session_loop stop"); 306 break; 307 } 308 let Some(mut first_pkg) = first_pkg else { 309 crate::info!("session_loop first_pkg is None"); 310 break; 311 }; 312 let mut status = first_pkg.status; 313 let mut retry_count = first_pkg.retry_count; 314 315 if status == OutputDataStatus::WaitSend { 316 // 发送数据 317 let data = first_pkg.data.clone(); 318 let _ret = UartMap::put(session_id, data).await; 319 // 如果是ack报文 则不需要等待回应 320 if first_pkg.response { 321 QueueManager::remove_package(session_id, 0).await; 322 continue; 323 } 324 // 修改data 的status = WaitResponse 325 first_pkg.status = OutputDataStatus::WaitResponse; 326 retry_count -= 1; 327 first_pkg.retry_count = retry_count; 328 // 更新数据 329 QueueManager::update_package(session_id, 0, first_pkg.clone()).await; 330 // 等待response 331 WaiterManager::wait_response(session_id).await; 332 333 if Self::check_stop(session_id).await { 334 crate::info!("session_loop stop"); 335 break; 336 } 337 // 收到回复 338 // 重新获取数据 339 340 let Some(mut first_pkg) = Self::get_package(session_id, 0).await else { 341 crate::info!("session_loop first_pkg is None"); 342 break; 343 }; 344 // 得到新状态 345 status = first_pkg.status; 346 347 if status == OutputDataStatus::ResponseOk { 348 // 删除当前data 349 QueueManager::remove_package(session_id, 0).await; 350 continue; 351 } 352 retry_count = first_pkg.retry_count; 353 while retry_count > 0 && status == OutputDataStatus::WaitResponse { 354 // 保存retry_count 355 retry_count -= 1; 356 first_pkg.retry_count = retry_count; 357 QueueManager::update_package(session_id, 0, first_pkg.clone()).await; 358 359 // 再次发送数据 360 let data = first_pkg.data.clone(); 361 let _ret = UartMap::put(session_id, data).await; 362 WaiterManager::wait_response(session_id).await; 363 364 if Self::check_stop(session_id).await { 365 break; 366 } 367 368 let Some(first_pkg) = Self::get_package(session_id, 0).await else { 369 break; 370 }; 371 status = first_pkg.status; 372 373 match status { 374 OutputDataStatus::ResponseOk => { 375 QueueManager::remove_package(session_id, 0).await; 376 break; 377 } 378 OutputDataStatus::WaitResponse => { 379 let Some(first_pkg) = Self::get_package(session_id, 0).await else { 380 break; 381 }; 382 status = first_pkg.status; 383 retry_count = first_pkg.retry_count; 384 continue; 385 } 386 OutputDataStatus::WaitSend => { 387 QueueManager::remove_package(session_id, 0).await; 388 break; 389 } 390 } 391 } 392 } 393 } 394 Self::remove_session(session_id).await; 395 crate::info!("session_loop for {} end.", session_id); 396 } 397} 398 399pub async fn start_session(session_id: u32) { 400 let instance = QueueManager::get_instance(); 401 let mut mtx = instance.lock().await; 402 let thread_map = &mut mtx.thread_map; 403 if thread_map.contains_key(&session_id) { 404 crate::error!("session thread has started."); 405 return; 406 } 407 408 WaiterManager::start_session(session_id).await; 409 410 let handle = utils::spawn(QueueManager::session_loop(session_id)); 411 thread_map.insert(session_id, handle); 412 413 let stop_flag_map = &mut mtx.stop_flag_map; 414 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1))); 415} 416 417async fn stop_session(session_id: u32) { 418 let instance = QueueManager::get_instance(); 419 let mut mtx = instance.lock().await; 420 let stop_flag_map = &mut mtx.stop_flag_map; 421 stop_flag_map.insert(session_id, Arc::new(Mutex::new(0))); 422 423 WaiterManager::wakeup_empty_wait(session_id).await; 424 WaiterManager::wakeup_response_wait(session_id).await; 425} 426 427pub async fn stop_other_session(session_id: u32) { 428 let instance = QueueManager::get_instance(); 429 let mtx = instance.lock().await; 430 let session_ids = mtx.data_map.keys(); 431 let mut remove_sessions = Vec::new(); 432 for k in session_ids { 433 if *k != session_id { 434 remove_sessions.push(*k); 435 } 436 } 437 drop(mtx); 438 for id in remove_sessions { 439 stop_session(id).await; 440 } 441} 442 443async fn output_package( 444 session_id: u32, 445 response: bool, 446 option: u8, 447 package_index: u32, 448 data: Vec<u8>, 449) { 450 let pkg = OutputData { 451 session_id, 452 response, 453 option, 454 package_index, 455 data: data.clone(), 456 retry_count: 5, 457 status: OutputDataStatus::WaitSend, 458 }; 459 QueueManager::put_package(session_id, pkg).await; 460 WaiterManager::wakeup_empty_wait(session_id).await; 461} 462 463#[allow(unused)] 464fn is_response(option: u8) -> bool { 465 let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8); 466 ret != 0 467} 468 469pub async fn on_read_head(head: UartHead) { 470 let session_id = head.session_id; 471 let option = head.option; 472 let package_index = head.package_index; 473 if option & (UartOption::Free as u16) != 0 { 474 stop_session(session_id).await; 475 return; 476 } 477 if is_response(option as u8) { 478 let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else { 479 return; 480 }; 481 pkg.status = if option & (UartOption::Ack as u16) > 1 { 482 OutputDataStatus::ResponseOk 483 } else { 484 OutputDataStatus::WaitSend 485 }; 486 QueueManager::update_package(session_id, 0, pkg).await; 487 WaiterManager::wakeup_response_wait(session_id).await; 488 } else { 489 let mut header_obj = 490 uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index); 491 let header = header_obj.serialize(); 492 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32); 493 header_obj.head_checksum = u32::to_le(head_sum); 494 let data = header_obj.serialize(); 495 output_package(session_id, true, UartOption::Ack as u8, package_index, data).await; 496 } 497} 498 499#[allow(unused)] 500fn get_package_index(is_create: bool) -> u32 { 501 static mut PACKAGE_INDEX: u32 = 888; 502 503 unsafe { 504 if is_create { 505 PACKAGE_INDEX += 1; 506 PACKAGE_INDEX 507 } else { 508 PACKAGE_INDEX 509 } 510 } 511} 512 513pub async fn start_uart(session_id: u32, wr: UartWriter) { 514 UartMap::start(session_id, wr).await; 515} 516 517#[allow(unused)] 518pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) { 519 let mut pkg_index = package_index; 520 if package_index == 0 { 521 pkg_index = get_package_index(true); 522 } 523 let send = serializer::concat_pack(data); 524 crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send); 525 526 let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE; 527 let mut index = 0; 528 let len = send.len(); 529 530 loop { 531 if index >= len { 532 println!("wrap_put break"); 533 break; 534 } 535 let size; 536 let mut op = option; 537 if index + payload_max_len <= len { 538 size = payload_max_len; 539 } else { 540 size = len - index; 541 op = UartOption::Tail as u8 | option; 542 } 543 544 let data = send[index..index + size].to_vec().clone(); 545 let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32); 546 let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index); 547 header_obj.data_checksum = u32::to_le(data_sum); 548 549 let header = header_obj.serialize(); 550 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32); 551 header_obj.head_checksum = u32::to_le(head_sum); 552 553 let header = header_obj.serialize(); 554 crate::info!("header, header_len:{}", header.len()); 555 let total = [header, send[index..index + size].to_vec().clone()].concat(); 556 557 output_package( 558 session_id, 559 (op & UartOption::Ack as u8) > 0, 560 op, 561 pkg_index, 562 total, 563 ) 564 .await; 565 pkg_index = get_package_index(true); 566 index += size; 567 } 568} 569