1cc290419Sopenharmony_ci/* 2cc290419Sopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd. 3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License. 5cc290419Sopenharmony_ci * You may obtain a copy of the License at 6cc290419Sopenharmony_ci * 7cc290419Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8cc290419Sopenharmony_ci * 9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and 13cc290419Sopenharmony_ci * limitations under the License. 14cc290419Sopenharmony_ci */ 15cc290419Sopenharmony_ci//! buffer 16cc290419Sopenharmony_ci#![allow(missing_docs)] 17cc290419Sopenharmony_ci 18cc290419Sopenharmony_ciuse super::base::{self, Writer}; 19cc290419Sopenharmony_ciuse super::uart::UartWriter; 20cc290419Sopenharmony_ciuse super::usb::{self, UsbReader, UsbWriter}; 21cc290419Sopenharmony_ciuse super::{tcp, uart_wrapper}; 22cc290419Sopenharmony_ci#[cfg(feature = "emulator")] 23cc290419Sopenharmony_ciuse crate::daemon_lib::bridge::BridgeMap; 24cc290419Sopenharmony_ci#[cfg(feature = "host")] 25cc290419Sopenharmony_ciuse crate::host_transfer::host_usb::HostUsbMap; 26cc290419Sopenharmony_ci 27cc290419Sopenharmony_ciuse crate::config::TaskMessage; 28cc290419Sopenharmony_ciuse crate::config::{self, ConnectType}; 29cc290419Sopenharmony_ciuse crate::serializer; 30cc290419Sopenharmony_ci#[allow(unused)] 31cc290419Sopenharmony_ciuse crate::utils::hdc_log::*; 32cc290419Sopenharmony_ci#[cfg(not(feature = "host"))] 33cc290419Sopenharmony_ciuse crate::daemon_lib::task_manager; 34cc290419Sopenharmony_ciuse std::collections::{HashMap, HashSet, VecDeque}; 35cc290419Sopenharmony_ciuse std::io::{self, Error, ErrorKind}; 36cc290419Sopenharmony_ciuse std::sync::Arc; 37cc290419Sopenharmony_ciuse std::sync::Once; 38cc290419Sopenharmony_ciuse std::mem::MaybeUninit; 39cc290419Sopenharmony_ciuse crate::transfer::usb::usb_write_all; 40cc290419Sopenharmony_ciuse std::time::Duration; 41cc290419Sopenharmony_ci 42cc290419Sopenharmony_ci#[cfg(feature = "host")] 43cc290419Sopenharmony_ciextern crate ylong_runtime_static as ylong_runtime; 44cc290419Sopenharmony_ciuse ylong_runtime::io::AsyncWriteExt; 45cc290419Sopenharmony_ciuse ylong_runtime::net::{SplitReadHalf, SplitWriteHalf}; 46cc290419Sopenharmony_ciuse ylong_runtime::sync::{mpsc, Mutex, RwLock}; 47cc290419Sopenharmony_ci 48cc290419Sopenharmony_citype ConnectTypeMap_ = Arc<RwLock<HashMap<u32, ConnectType>>>; 49cc290419Sopenharmony_ci 50cc290419Sopenharmony_cipub struct ConnectTypeMap {} 51cc290419Sopenharmony_ciimpl ConnectTypeMap { 52cc290419Sopenharmony_ci fn get_instance() -> ConnectTypeMap_ { 53cc290419Sopenharmony_ci static mut CONNECT_TYPE_MAP: Option<ConnectTypeMap_> = None; 54cc290419Sopenharmony_ci unsafe { 55cc290419Sopenharmony_ci CONNECT_TYPE_MAP 56cc290419Sopenharmony_ci .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) 57cc290419Sopenharmony_ci .clone() 58cc290419Sopenharmony_ci } 59cc290419Sopenharmony_ci } 60cc290419Sopenharmony_ci 61cc290419Sopenharmony_ci pub async fn put(session_id: u32, conn_type: ConnectType) { 62cc290419Sopenharmony_ci let arc_map = Self::get_instance(); 63cc290419Sopenharmony_ci let mut map = arc_map.write().await; 64cc290419Sopenharmony_ci map.insert(session_id, conn_type.clone()); 65cc290419Sopenharmony_ci crate::debug!("connect map: add {session_id}, {:?}", conn_type); 66cc290419Sopenharmony_ci } 67cc290419Sopenharmony_ci 68cc290419Sopenharmony_ci pub async fn get(session_id: u32) -> Option<ConnectType> { 69cc290419Sopenharmony_ci let arc_map = Self::get_instance(); 70cc290419Sopenharmony_ci let map = arc_map.read().await; 71cc290419Sopenharmony_ci map.get(&session_id).cloned() 72cc290419Sopenharmony_ci } 73cc290419Sopenharmony_ci 74cc290419Sopenharmony_ci pub async fn del(session_id: u32) { 75cc290419Sopenharmony_ci let arc_map = Self::get_instance(); 76cc290419Sopenharmony_ci let mut map = arc_map.write().await; 77cc290419Sopenharmony_ci let item = map.remove(&session_id); 78cc290419Sopenharmony_ci DiedSession::add(session_id).await; 79cc290419Sopenharmony_ci crate::debug!("connect map: del {session_id}: {:?}", item); 80cc290419Sopenharmony_ci } 81cc290419Sopenharmony_ci 82cc290419Sopenharmony_ci pub async fn get_all_session() -> Vec<u32> { 83cc290419Sopenharmony_ci let mut sessiones = Vec::new(); 84cc290419Sopenharmony_ci let arc_map = Self::get_instance(); 85cc290419Sopenharmony_ci let map = arc_map.read().await; 86cc290419Sopenharmony_ci for item in map.iter() { 87cc290419Sopenharmony_ci sessiones.push(*item.0); 88cc290419Sopenharmony_ci } 89cc290419Sopenharmony_ci sessiones 90cc290419Sopenharmony_ci } 91cc290419Sopenharmony_ci 92cc290419Sopenharmony_ci pub async fn dump() -> String { 93cc290419Sopenharmony_ci let arc_map = Self::get_instance(); 94cc290419Sopenharmony_ci let map = arc_map.read().await; 95cc290419Sopenharmony_ci let mut result = "".to_string(); 96cc290419Sopenharmony_ci for item in map.iter() { 97cc290419Sopenharmony_ci let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1); 98cc290419Sopenharmony_ci result.push_str(line.as_str()); 99cc290419Sopenharmony_ci } 100cc290419Sopenharmony_ci result 101cc290419Sopenharmony_ci } 102cc290419Sopenharmony_ci} 103cc290419Sopenharmony_ci 104cc290419Sopenharmony_cipub async fn dump_session() -> String { 105cc290419Sopenharmony_ci ConnectTypeMap::dump().await 106cc290419Sopenharmony_ci} 107cc290419Sopenharmony_ci 108cc290419Sopenharmony_citype TcpWriter_ = Arc<Mutex<SplitWriteHalf>>; 109cc290419Sopenharmony_ci 110cc290419Sopenharmony_cipub struct TcpMap { 111cc290419Sopenharmony_ci map: Mutex<HashMap<u32, TcpWriter_>>, 112cc290419Sopenharmony_ci} 113cc290419Sopenharmony_ciimpl TcpMap { 114cc290419Sopenharmony_ci pub(crate) fn get_instance() -> &'static TcpMap { 115cc290419Sopenharmony_ci static mut TCP_MAP: MaybeUninit<TcpMap> = MaybeUninit::uninit(); 116cc290419Sopenharmony_ci static ONCE: Once = Once::new(); 117cc290419Sopenharmony_ci 118cc290419Sopenharmony_ci unsafe { 119cc290419Sopenharmony_ci ONCE.call_once(|| { 120cc290419Sopenharmony_ci let global = TcpMap { 121cc290419Sopenharmony_ci map: Mutex::new(HashMap::new()) 122cc290419Sopenharmony_ci }; 123cc290419Sopenharmony_ci TCP_MAP = MaybeUninit::new(global); 124cc290419Sopenharmony_ci }); 125cc290419Sopenharmony_ci &*TCP_MAP.as_ptr() 126cc290419Sopenharmony_ci } 127cc290419Sopenharmony_ci } 128cc290419Sopenharmony_ci 129cc290419Sopenharmony_ci async fn put(session_id: u32, data: TaskMessage) { 130cc290419Sopenharmony_ci let send = serializer::concat_pack(data); 131cc290419Sopenharmony_ci let instance = Self::get_instance(); 132cc290419Sopenharmony_ci let map = instance.map.lock().await; 133cc290419Sopenharmony_ci let Some(arc_wr) = map.get(&session_id) else { 134cc290419Sopenharmony_ci crate::error!("get tcp is None, session_id={session_id}"); 135cc290419Sopenharmony_ci return; 136cc290419Sopenharmony_ci }; 137cc290419Sopenharmony_ci let mut wr = arc_wr.lock().await; 138cc290419Sopenharmony_ci let _ = wr.write_all(send.as_slice()).await; 139cc290419Sopenharmony_ci } 140cc290419Sopenharmony_ci 141cc290419Sopenharmony_ci pub async fn send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()> { 142cc290419Sopenharmony_ci crate::trace!( 143cc290419Sopenharmony_ci "send channel({channel_id}) msg: {:?}", 144cc290419Sopenharmony_ci buf.iter() 145cc290419Sopenharmony_ci .map(|&c| format!("{c:02X}")) 146cc290419Sopenharmony_ci .collect::<Vec<_>>() 147cc290419Sopenharmony_ci .join(" ") 148cc290419Sopenharmony_ci ); 149cc290419Sopenharmony_ci let send = [ 150cc290419Sopenharmony_ci u32::to_be_bytes(buf.len() as u32).as_slice(), 151cc290419Sopenharmony_ci buf.as_slice(), 152cc290419Sopenharmony_ci ] 153cc290419Sopenharmony_ci .concat(); 154cc290419Sopenharmony_ci let instance = Self::get_instance(); 155cc290419Sopenharmony_ci let map = instance.map.lock().await; 156cc290419Sopenharmony_ci if let Some(guard) = map.get(&channel_id) { 157cc290419Sopenharmony_ci let mut wr = guard.lock().await; 158cc290419Sopenharmony_ci let _ = wr.write_all(send.as_slice()).await; 159cc290419Sopenharmony_ci return Ok(()); 160cc290419Sopenharmony_ci } 161cc290419Sopenharmony_ci Err(Error::new(ErrorKind::NotFound, "channel not found")) 162cc290419Sopenharmony_ci } 163cc290419Sopenharmony_ci 164cc290419Sopenharmony_ci pub async fn start(id: u32, wr: SplitWriteHalf) { 165cc290419Sopenharmony_ci let instance = Self::get_instance(); 166cc290419Sopenharmony_ci let mut map = instance.map.lock().await; 167cc290419Sopenharmony_ci let arc_wr = Arc::new(Mutex::new(wr)); 168cc290419Sopenharmony_ci map.insert(id, arc_wr); 169cc290419Sopenharmony_ci ConnectTypeMap::put(id, ConnectType::Tcp).await; 170cc290419Sopenharmony_ci crate::warn!("tcp start {id}"); 171cc290419Sopenharmony_ci } 172cc290419Sopenharmony_ci 173cc290419Sopenharmony_ci pub async fn end(id: u32) { 174cc290419Sopenharmony_ci let instance = Self::get_instance(); 175cc290419Sopenharmony_ci let mut map = instance.map.lock().await; 176cc290419Sopenharmony_ci if let Some(arc_wr) = map.remove(&id) { 177cc290419Sopenharmony_ci let mut wr = arc_wr.lock().await; 178cc290419Sopenharmony_ci let _ = wr.shutdown().await; 179cc290419Sopenharmony_ci } 180cc290419Sopenharmony_ci crate::warn!("tcp end {id}"); 181cc290419Sopenharmony_ci ConnectTypeMap::del(id).await; 182cc290419Sopenharmony_ci } 183cc290419Sopenharmony_ci} 184cc290419Sopenharmony_ci 185cc290419Sopenharmony_cipub struct UsbMap { 186cc290419Sopenharmony_ci map: std::sync::Mutex<HashMap<u32, UsbWriter>>, 187cc290419Sopenharmony_ci lock: std::sync::Mutex<i32>, 188cc290419Sopenharmony_ci} 189cc290419Sopenharmony_ciimpl UsbMap { 190cc290419Sopenharmony_ci pub(crate) fn get_instance() -> &'static UsbMap { 191cc290419Sopenharmony_ci static mut USB_MAP: MaybeUninit<UsbMap> = MaybeUninit::uninit(); 192cc290419Sopenharmony_ci static ONCE: Once = Once::new(); 193cc290419Sopenharmony_ci 194cc290419Sopenharmony_ci unsafe { 195cc290419Sopenharmony_ci ONCE.call_once(|| { 196cc290419Sopenharmony_ci let global = UsbMap { 197cc290419Sopenharmony_ci map: std::sync::Mutex::new(HashMap::new()), 198cc290419Sopenharmony_ci lock: std::sync::Mutex::new(0) 199cc290419Sopenharmony_ci }; 200cc290419Sopenharmony_ci USB_MAP = MaybeUninit::new(global); 201cc290419Sopenharmony_ci }); 202cc290419Sopenharmony_ci &*USB_MAP.as_ptr() 203cc290419Sopenharmony_ci } 204cc290419Sopenharmony_ci } 205cc290419Sopenharmony_ci 206cc290419Sopenharmony_ci #[allow(unused)] 207cc290419Sopenharmony_ci async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> { 208cc290419Sopenharmony_ci if DiedSession::get(session_id).await { 209cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::NotFound, "session already died"));; 210cc290419Sopenharmony_ci } 211cc290419Sopenharmony_ci let mut fd = 0; 212cc290419Sopenharmony_ci { 213cc290419Sopenharmony_ci let instance = Self::get_instance(); 214cc290419Sopenharmony_ci let mut map = instance.map.lock().unwrap(); 215cc290419Sopenharmony_ci let Some(arc_wr) = map.get(&session_id) else { 216cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::NotFound, "session not found")); 217cc290419Sopenharmony_ci }; 218cc290419Sopenharmony_ci fd =arc_wr.fd; 219cc290419Sopenharmony_ci } 220cc290419Sopenharmony_ci let body = serializer::concat_pack(data); 221cc290419Sopenharmony_ci let head = usb::build_header(session_id, 1, body.len()); 222cc290419Sopenharmony_ci let instance = Self::get_instance(); 223cc290419Sopenharmony_ci let _guard = instance.lock.lock().unwrap(); 224cc290419Sopenharmony_ci let mut child_ret = 0; 225cc290419Sopenharmony_ci match usb_write_all(fd, head) { 226cc290419Sopenharmony_ci Ok(_) => {} 227cc290419Sopenharmony_ci Err(e) => { 228cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::Other, "Error writing head")); 229cc290419Sopenharmony_ci } 230cc290419Sopenharmony_ci } 231cc290419Sopenharmony_ci 232cc290419Sopenharmony_ci match usb_write_all(fd, body) { 233cc290419Sopenharmony_ci Ok(ret) => { 234cc290419Sopenharmony_ci child_ret = ret; 235cc290419Sopenharmony_ci } 236cc290419Sopenharmony_ci Err(e) => { 237cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::Other, "Error writing body")); 238cc290419Sopenharmony_ci } 239cc290419Sopenharmony_ci } 240cc290419Sopenharmony_ci 241cc290419Sopenharmony_ci if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) { 242cc290419Sopenharmony_ci let tail = usb::build_header(session_id, 0, 0); 243cc290419Sopenharmony_ci // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect 244cc290419Sopenharmony_ci // so, we send dummy packet to prevent zero packet generate 245cc290419Sopenharmony_ci match usb_write_all(fd, tail) { 246cc290419Sopenharmony_ci Ok(_) => {} 247cc290419Sopenharmony_ci Err(e) => { 248cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::Other, "Error writing tail")); 249cc290419Sopenharmony_ci } 250cc290419Sopenharmony_ci } 251cc290419Sopenharmony_ci } 252cc290419Sopenharmony_ci Ok(()) 253cc290419Sopenharmony_ci } 254cc290419Sopenharmony_ci 255cc290419Sopenharmony_ci pub async fn start(session_id: u32, wr: UsbWriter) { 256cc290419Sopenharmony_ci let buffer_map = Self::get_instance(); 257cc290419Sopenharmony_ci let mut try_times = 0; 258cc290419Sopenharmony_ci let max_try_time = 10; 259cc290419Sopenharmony_ci let wait_one_seconds = 1000; 260cc290419Sopenharmony_ci loop { 261cc290419Sopenharmony_ci try_times += 1; 262cc290419Sopenharmony_ci if let Ok(mut map) = buffer_map.map.try_lock() { 263cc290419Sopenharmony_ci map.insert(session_id, wr); 264cc290419Sopenharmony_ci crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times"); 265cc290419Sopenharmony_ci break; 266cc290419Sopenharmony_ci } else { 267cc290419Sopenharmony_ci if try_times > max_try_time { 268cc290419Sopenharmony_ci crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd"); 269cc290419Sopenharmony_ci std::process::exit(0); 270cc290419Sopenharmony_ci } 271cc290419Sopenharmony_ci crate::error!("start usb session_id:{session_id} try lock failed {try_times} times"); 272cc290419Sopenharmony_ci std::thread::sleep(Duration::from_millis(wait_one_seconds)); 273cc290419Sopenharmony_ci } 274cc290419Sopenharmony_ci } 275cc290419Sopenharmony_ci ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await; 276cc290419Sopenharmony_ci } 277cc290419Sopenharmony_ci 278cc290419Sopenharmony_ci pub async fn end(session_id: u32) { 279cc290419Sopenharmony_ci let buffer_map = Self::get_instance(); 280cc290419Sopenharmony_ci let mut try_times = 0; 281cc290419Sopenharmony_ci let max_try_time = 10; 282cc290419Sopenharmony_ci let wait_ten_ms = 10; 283cc290419Sopenharmony_ci loop { 284cc290419Sopenharmony_ci try_times += 1; 285cc290419Sopenharmony_ci if let Ok(mut map) = buffer_map.map.try_lock() { 286cc290419Sopenharmony_ci let _ = map.remove(&session_id); 287cc290419Sopenharmony_ci crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times"); 288cc290419Sopenharmony_ci break; 289cc290419Sopenharmony_ci } else { 290cc290419Sopenharmony_ci if try_times > max_try_time { 291cc290419Sopenharmony_ci crate::error!("end usb session_id:{session_id} get lock failed will force break"); 292cc290419Sopenharmony_ci break; 293cc290419Sopenharmony_ci } 294cc290419Sopenharmony_ci crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times"); 295cc290419Sopenharmony_ci std::thread::sleep(Duration::from_millis(wait_ten_ms)); 296cc290419Sopenharmony_ci } 297cc290419Sopenharmony_ci } 298cc290419Sopenharmony_ci ConnectTypeMap::del(session_id).await; 299cc290419Sopenharmony_ci } 300cc290419Sopenharmony_ci} 301cc290419Sopenharmony_ci 302cc290419Sopenharmony_citype UartWriter_ = Arc<Mutex<UartWriter>>; 303cc290419Sopenharmony_citype UartMap_ = Arc<RwLock<HashMap<u32, UartWriter_>>>; 304cc290419Sopenharmony_ci 305cc290419Sopenharmony_cipub struct UartMap {} 306cc290419Sopenharmony_ciimpl UartMap { 307cc290419Sopenharmony_ci fn get_instance() -> UartMap_ { 308cc290419Sopenharmony_ci static mut UART_MAP: Option<UartMap_> = None; 309cc290419Sopenharmony_ci unsafe { 310cc290419Sopenharmony_ci UART_MAP 311cc290419Sopenharmony_ci .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) 312cc290419Sopenharmony_ci .clone() 313cc290419Sopenharmony_ci } 314cc290419Sopenharmony_ci } 315cc290419Sopenharmony_ci 316cc290419Sopenharmony_ci #[allow(unused)] 317cc290419Sopenharmony_ci pub async fn put(session_id: u32, data: Vec<u8>) -> io::Result<()> { 318cc290419Sopenharmony_ci let instance = Self::get_instance(); 319cc290419Sopenharmony_ci let map = instance.read().await; 320cc290419Sopenharmony_ci let Some(arc_wr) = map.get(&session_id) else { 321cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::NotFound, "session not found")); 322cc290419Sopenharmony_ci }; 323cc290419Sopenharmony_ci let wr = arc_wr.lock().await; 324cc290419Sopenharmony_ci wr.write_all(data)?; 325cc290419Sopenharmony_ci Ok(()) 326cc290419Sopenharmony_ci } 327cc290419Sopenharmony_ci 328cc290419Sopenharmony_ci pub async fn start(session_id: u32, wr: UartWriter) { 329cc290419Sopenharmony_ci let instance = Self::get_instance(); 330cc290419Sopenharmony_ci let mut map = instance.write().await; 331cc290419Sopenharmony_ci let arc_wr = Arc::new(Mutex::new(wr)); 332cc290419Sopenharmony_ci if map.contains_key(&session_id) { 333cc290419Sopenharmony_ci crate::error!("uart start, contain session:{}", session_id); 334cc290419Sopenharmony_ci return; 335cc290419Sopenharmony_ci } 336cc290419Sopenharmony_ci map.insert(session_id, arc_wr); 337cc290419Sopenharmony_ci ConnectTypeMap::put(session_id, ConnectType::Uart).await; 338cc290419Sopenharmony_ci } 339cc290419Sopenharmony_ci} 340cc290419Sopenharmony_ci 341cc290419Sopenharmony_cipub async fn put(session_id: u32, data: TaskMessage) { 342cc290419Sopenharmony_ci match ConnectTypeMap::get(session_id).await { 343cc290419Sopenharmony_ci Some(ConnectType::Tcp) => { 344cc290419Sopenharmony_ci TcpMap::put(session_id, data).await; 345cc290419Sopenharmony_ci } 346cc290419Sopenharmony_ci Some(ConnectType::Usb(_mount_point)) => { 347cc290419Sopenharmony_ci if let Err(e) = UsbMap::put(session_id, data).await { 348cc290419Sopenharmony_ci crate::error!("{e:?}"); 349cc290419Sopenharmony_ci #[cfg(not(feature = "host"))] 350cc290419Sopenharmony_ci task_manager::free_session(session_id).await; 351cc290419Sopenharmony_ci } 352cc290419Sopenharmony_ci } 353cc290419Sopenharmony_ci Some(ConnectType::Uart) => { 354cc290419Sopenharmony_ci uart_wrapper::wrap_put(session_id, data, 0, 0).await; 355cc290419Sopenharmony_ci } 356cc290419Sopenharmony_ci Some(ConnectType::Bt) => {} 357cc290419Sopenharmony_ci Some(ConnectType::Bridge) => { 358cc290419Sopenharmony_ci #[cfg(feature = "emulator")] 359cc290419Sopenharmony_ci BridgeMap::put(session_id, data).await; 360cc290419Sopenharmony_ci } 361cc290419Sopenharmony_ci Some(ConnectType::HostUsb(_mount_point)) => { 362cc290419Sopenharmony_ci #[cfg(feature = "host")] 363cc290419Sopenharmony_ci if let Err(e) = HostUsbMap::put(session_id, data).await { 364cc290419Sopenharmony_ci crate::error!("{e:?}"); 365cc290419Sopenharmony_ci } 366cc290419Sopenharmony_ci } 367cc290419Sopenharmony_ci None => { 368cc290419Sopenharmony_ci crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command); 369cc290419Sopenharmony_ci } 370cc290419Sopenharmony_ci } 371cc290419Sopenharmony_ci} 372cc290419Sopenharmony_ci 373cc290419Sopenharmony_cipub async fn send_channel_data(channel_id: u32, data: Vec<u8>) { 374cc290419Sopenharmony_ci let _ = TcpMap::send_channel_message(channel_id, data).await; 375cc290419Sopenharmony_ci} 376cc290419Sopenharmony_ci 377cc290419Sopenharmony_cipub enum EchoLevel { 378cc290419Sopenharmony_ci INFO, 379cc290419Sopenharmony_ci FAIL, 380cc290419Sopenharmony_ci RAW, 381cc290419Sopenharmony_ci OK, // this echo maybe OK with carriage return and newline 382cc290419Sopenharmony_ci} 383cc290419Sopenharmony_ci 384cc290419Sopenharmony_ciimpl EchoLevel { 385cc290419Sopenharmony_ci pub fn convert_from_message_level(cmd: u8) -> Result<Self, Error> { 386cc290419Sopenharmony_ci match cmd { 387cc290419Sopenharmony_ci 0 => Ok(Self::FAIL), 388cc290419Sopenharmony_ci 1 => Ok(Self::INFO), 389cc290419Sopenharmony_ci 2 => Ok(Self::OK), 390cc290419Sopenharmony_ci _ => Err(Error::new(ErrorKind::Other, "invalid message level type")) 391cc290419Sopenharmony_ci } 392cc290419Sopenharmony_ci } 393cc290419Sopenharmony_ci} 394cc290419Sopenharmony_ci 395cc290419Sopenharmony_cipub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> { 396cc290419Sopenharmony_ci let data = match level { 397cc290419Sopenharmony_ci EchoLevel::INFO => format!("[Info]{msg}") + "\r\n", 398cc290419Sopenharmony_ci EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n", 399cc290419Sopenharmony_ci EchoLevel::RAW => msg.to_string() + "\r\n", 400cc290419Sopenharmony_ci EchoLevel::OK => msg.to_string(), 401cc290419Sopenharmony_ci }; 402cc290419Sopenharmony_ci TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await 403cc290419Sopenharmony_ci} 404cc290419Sopenharmony_ci 405cc290419Sopenharmony_ci// client recv and print msg 406cc290419Sopenharmony_citype TcpRecver_ = Arc<Mutex<SplitReadHalf>>; 407cc290419Sopenharmony_citype ChannelMap_ = Arc<RwLock<HashMap<u32, TcpRecver_>>>; 408cc290419Sopenharmony_ci 409cc290419Sopenharmony_cipub struct ChannelMap {} 410cc290419Sopenharmony_ciimpl ChannelMap { 411cc290419Sopenharmony_ci fn get_instance() -> ChannelMap_ { 412cc290419Sopenharmony_ci static mut TCP_RECVER: Option<ChannelMap_> = None; 413cc290419Sopenharmony_ci unsafe { 414cc290419Sopenharmony_ci TCP_RECVER 415cc290419Sopenharmony_ci .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) 416cc290419Sopenharmony_ci .clone() 417cc290419Sopenharmony_ci } 418cc290419Sopenharmony_ci } 419cc290419Sopenharmony_ci 420cc290419Sopenharmony_ci pub async fn start(rd: SplitReadHalf) { 421cc290419Sopenharmony_ci let instance = Self::get_instance(); 422cc290419Sopenharmony_ci let mut map = instance.write().await; 423cc290419Sopenharmony_ci let arc_rd = Arc::new(Mutex::new(rd)); 424cc290419Sopenharmony_ci map.insert(0, arc_rd); 425cc290419Sopenharmony_ci } 426cc290419Sopenharmony_ci 427cc290419Sopenharmony_ci pub async fn recv() -> io::Result<Vec<u8>> { 428cc290419Sopenharmony_ci let instance = Self::get_instance(); 429cc290419Sopenharmony_ci let map = instance.read().await; 430cc290419Sopenharmony_ci let Some(arc_rd) = map.get(&0) else { 431cc290419Sopenharmony_ci return Err(Error::new(ErrorKind::NotFound, "channel not found")); 432cc290419Sopenharmony_ci }; 433cc290419Sopenharmony_ci let mut rd = arc_rd.lock().await; 434cc290419Sopenharmony_ci tcp::recv_channel_message(&mut rd).await 435cc290419Sopenharmony_ci } 436cc290419Sopenharmony_ci} 437cc290419Sopenharmony_ci 438cc290419Sopenharmony_cipub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> { 439cc290419Sopenharmony_ci let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN); 440cc290419Sopenharmony_ci ylong_runtime::spawn(async move { 441cc290419Sopenharmony_ci let mut rd = UsbReader { fd }; 442cc290419Sopenharmony_ci loop { 443cc290419Sopenharmony_ci if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) { 444cc290419Sopenharmony_ci crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); 445cc290419Sopenharmony_ci break; 446cc290419Sopenharmony_ci } 447cc290419Sopenharmony_ci } 448cc290419Sopenharmony_ci }); 449cc290419Sopenharmony_ci rx 450cc290419Sopenharmony_ci} 451cc290419Sopenharmony_ci 452cc290419Sopenharmony_cipub struct DiedSession { 453cc290419Sopenharmony_ci set: Arc<RwLock<HashSet<u32>>>, 454cc290419Sopenharmony_ci queue: Arc<RwLock<VecDeque<u32>>>, 455cc290419Sopenharmony_ci} 456cc290419Sopenharmony_ciimpl DiedSession { 457cc290419Sopenharmony_ci pub(crate) fn get_instance() -> &'static DiedSession { 458cc290419Sopenharmony_ci static mut DIED_SESSION: MaybeUninit<DiedSession> = MaybeUninit::uninit(); 459cc290419Sopenharmony_ci static ONCE: Once = Once::new(); 460cc290419Sopenharmony_ci 461cc290419Sopenharmony_ci unsafe { 462cc290419Sopenharmony_ci ONCE.call_once(|| { 463cc290419Sopenharmony_ci let global = DiedSession { 464cc290419Sopenharmony_ci set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))), 465cc290419Sopenharmony_ci queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM))) 466cc290419Sopenharmony_ci }; 467cc290419Sopenharmony_ci DIED_SESSION = MaybeUninit::new(global); 468cc290419Sopenharmony_ci }); 469cc290419Sopenharmony_ci &*DIED_SESSION.as_ptr() 470cc290419Sopenharmony_ci } 471cc290419Sopenharmony_ci } 472cc290419Sopenharmony_ci 473cc290419Sopenharmony_ci pub async fn add(session_id: u32) { 474cc290419Sopenharmony_ci let instance = Self::get_instance(); 475cc290419Sopenharmony_ci let mut set = instance.set.write().await; 476cc290419Sopenharmony_ci let mut queue = instance.queue.write().await; 477cc290419Sopenharmony_ci if queue.len() >= config::MAX_DIED_SESSION_NUM { 478cc290419Sopenharmony_ci if let Some(front_session) = queue.pop_front(){ 479cc290419Sopenharmony_ci set.remove(&front_session); 480cc290419Sopenharmony_ci } 481cc290419Sopenharmony_ci } 482cc290419Sopenharmony_ci if !set.contains(&session_id) { 483cc290419Sopenharmony_ci set.insert(session_id); 484cc290419Sopenharmony_ci queue.push_back(session_id) 485cc290419Sopenharmony_ci } 486cc290419Sopenharmony_ci } 487cc290419Sopenharmony_ci 488cc290419Sopenharmony_ci pub async fn get(session_id: u32) -> bool { 489cc290419Sopenharmony_ci let instance = Self::get_instance(); 490cc290419Sopenharmony_ci let set = instance.set.read().await; 491cc290419Sopenharmony_ci set.contains(&session_id) 492cc290419Sopenharmony_ci } 493cc290419Sopenharmony_ci}