/* * Copyright (C) 2023 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //! buffer #![allow(missing_docs)] use super::base::{self, Writer}; use super::uart::UartWriter; use super::usb::{self, UsbReader, UsbWriter}; use super::{tcp, uart_wrapper}; #[cfg(feature = "emulator")] use crate::daemon_lib::bridge::BridgeMap; #[cfg(feature = "host")] use crate::host_transfer::host_usb::HostUsbMap; use crate::config::TaskMessage; use crate::config::{self, ConnectType}; use crate::serializer; #[allow(unused)] use crate::utils::hdc_log::*; #[cfg(not(feature = "host"))] use crate::daemon_lib::task_manager; use std::collections::{HashMap, HashSet, VecDeque}; use std::io::{self, Error, ErrorKind}; use std::sync::Arc; use std::sync::Once; use std::mem::MaybeUninit; use crate::transfer::usb::usb_write_all; use std::time::Duration; #[cfg(feature = "host")] extern crate ylong_runtime_static as ylong_runtime; use ylong_runtime::io::AsyncWriteExt; use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf}; use ylong_runtime::sync::{mpsc, Mutex, RwLock}; type ConnectTypeMap_ = Arc>>; pub struct ConnectTypeMap {} impl ConnectTypeMap { fn get_instance() -> ConnectTypeMap_ { static mut CONNECT_TYPE_MAP: Option = None; unsafe { CONNECT_TYPE_MAP .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) .clone() } } pub async fn put(session_id: u32, conn_type: ConnectType) { let arc_map = Self::get_instance(); let mut map = arc_map.write().await; map.insert(session_id, conn_type.clone()); crate::debug!("connect map: add {session_id}, {:?}", conn_type); } pub async fn get(session_id: u32) -> Option { let arc_map = Self::get_instance(); let map = arc_map.read().await; map.get(&session_id).cloned() } pub async fn del(session_id: u32) { let arc_map = Self::get_instance(); let mut map = arc_map.write().await; let item = map.remove(&session_id); DiedSession::add(session_id).await; crate::debug!("connect map: del {session_id}: {:?}", item); } pub async fn get_all_session() -> Vec { let mut sessiones = Vec::new(); let arc_map = Self::get_instance(); let map = arc_map.read().await; for item in map.iter() { sessiones.push(*item.0); } sessiones } pub async fn dump() -> String { let arc_map = Self::get_instance(); let map = arc_map.read().await; let mut result = "".to_string(); for item in map.iter() { let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1); result.push_str(line.as_str()); } result } } pub async fn dump_session() -> String { ConnectTypeMap::dump().await } type TcpWriter_ = Arc>; pub struct TcpMap { map: Mutex>, } impl TcpMap { pub(crate) fn get_instance() -> &'static TcpMap { static mut TCP_MAP: MaybeUninit = MaybeUninit::uninit(); static ONCE: Once = Once::new(); unsafe { ONCE.call_once(|| { let global = TcpMap { map: Mutex::new(HashMap::new()) }; TCP_MAP = MaybeUninit::new(global); }); &*TCP_MAP.as_ptr() } } async fn put(session_id: u32, data: TaskMessage) { let send = serializer::concat_pack(data); let instance = Self::get_instance(); let map = instance.map.lock().await; let Some(arc_wr) = map.get(&session_id) else { crate::error!("get tcp is None, session_id={session_id}"); return; }; let mut wr = arc_wr.lock().await; let _ = wr.write_all(send.as_slice()).await; } pub async fn send_channel_message(channel_id: u32, buf: Vec) -> io::Result<()> { crate::trace!( "send channel({channel_id}) msg: {:?}", buf.iter() .map(|&c| format!("{c:02X}")) .collect::>() .join(" ") ); let send = [ u32::to_be_bytes(buf.len() as u32).as_slice(), buf.as_slice(), ] .concat(); let instance = Self::get_instance(); let map = instance.map.lock().await; if let Some(guard) = map.get(&channel_id) { let mut wr = guard.lock().await; let _ = wr.write_all(send.as_slice()).await; return Ok(()); } Err(Error::new(ErrorKind::NotFound, "channel not found")) } pub async fn start(id: u32, wr: SplitWriteHalf) { let instance = Self::get_instance(); let mut map = instance.map.lock().await; let arc_wr = Arc::new(Mutex::new(wr)); map.insert(id, arc_wr); ConnectTypeMap::put(id, ConnectType::Tcp).await; crate::warn!("tcp start {id}"); } pub async fn end(id: u32) { let instance = Self::get_instance(); let mut map = instance.map.lock().await; if let Some(arc_wr) = map.remove(&id) { let mut wr = arc_wr.lock().await; let _ = wr.shutdown().await; } crate::warn!("tcp end {id}"); ConnectTypeMap::del(id).await; } } pub struct UsbMap { map: std::sync::Mutex>, lock: std::sync::Mutex, } impl UsbMap { pub(crate) fn get_instance() -> &'static UsbMap { static mut USB_MAP: MaybeUninit = MaybeUninit::uninit(); static ONCE: Once = Once::new(); unsafe { ONCE.call_once(|| { let global = UsbMap { map: std::sync::Mutex::new(HashMap::new()), lock: std::sync::Mutex::new(0) }; USB_MAP = MaybeUninit::new(global); }); &*USB_MAP.as_ptr() } } #[allow(unused)] async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> { if DiedSession::get(session_id).await { return Err(Error::new(ErrorKind::NotFound, "session already died"));; } let mut fd = 0; { let instance = Self::get_instance(); let mut map = instance.map.lock().unwrap(); let Some(arc_wr) = map.get(&session_id) else { return Err(Error::new(ErrorKind::NotFound, "session not found")); }; fd =arc_wr.fd; } let body = serializer::concat_pack(data); let head = usb::build_header(session_id, 1, body.len()); let instance = Self::get_instance(); let _guard = instance.lock.lock().unwrap(); let mut child_ret = 0; match usb_write_all(fd, head) { Ok(_) => {} Err(e) => { return Err(Error::new(ErrorKind::Other, "Error writing head")); } } match usb_write_all(fd, body) { Ok(ret) => { child_ret = ret; } Err(e) => { return Err(Error::new(ErrorKind::Other, "Error writing body")); } } if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) { let tail = usb::build_header(session_id, 0, 0); // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect // so, we send dummy packet to prevent zero packet generate match usb_write_all(fd, tail) { Ok(_) => {} Err(e) => { return Err(Error::new(ErrorKind::Other, "Error writing tail")); } } } Ok(()) } pub async fn start(session_id: u32, wr: UsbWriter) { let buffer_map = Self::get_instance(); let mut try_times = 0; let max_try_time = 10; let wait_one_seconds = 1000; loop { try_times += 1; if let Ok(mut map) = buffer_map.map.try_lock() { map.insert(session_id, wr); crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times"); break; } else { if try_times > max_try_time { crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd"); std::process::exit(0); } crate::error!("start usb session_id:{session_id} try lock failed {try_times} times"); std::thread::sleep(Duration::from_millis(wait_one_seconds)); } } ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await; } pub async fn end(session_id: u32) { let buffer_map = Self::get_instance(); let mut try_times = 0; let max_try_time = 10; let wait_ten_ms = 10; loop { try_times += 1; if let Ok(mut map) = buffer_map.map.try_lock() { let _ = map.remove(&session_id); crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times"); break; } else { if try_times > max_try_time { crate::error!("end usb session_id:{session_id} get lock failed will force break"); break; } crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times"); std::thread::sleep(Duration::from_millis(wait_ten_ms)); } } ConnectTypeMap::del(session_id).await; } } type UartWriter_ = Arc>; type UartMap_ = Arc>>; pub struct UartMap {} impl UartMap { fn get_instance() -> UartMap_ { static mut UART_MAP: Option = None; unsafe { UART_MAP .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) .clone() } } #[allow(unused)] pub async fn put(session_id: u32, data: Vec) -> io::Result<()> { let instance = Self::get_instance(); let map = instance.read().await; let Some(arc_wr) = map.get(&session_id) else { return Err(Error::new(ErrorKind::NotFound, "session not found")); }; let wr = arc_wr.lock().await; wr.write_all(data)?; Ok(()) } pub async fn start(session_id: u32, wr: UartWriter) { let instance = Self::get_instance(); let mut map = instance.write().await; let arc_wr = Arc::new(Mutex::new(wr)); if map.contains_key(&session_id) { crate::error!("uart start, contain session:{}", session_id); return; } map.insert(session_id, arc_wr); ConnectTypeMap::put(session_id, ConnectType::Uart).await; } } pub async fn put(session_id: u32, data: TaskMessage) { match ConnectTypeMap::get(session_id).await { Some(ConnectType::Tcp) => { TcpMap::put(session_id, data).await; } Some(ConnectType::Usb(_mount_point)) => { if let Err(e) = UsbMap::put(session_id, data).await { crate::error!("{e:?}"); #[cfg(not(feature = "host"))] task_manager::free_session(session_id).await; } } Some(ConnectType::Uart) => { uart_wrapper::wrap_put(session_id, data, 0, 0).await; } Some(ConnectType::Bt) => {} Some(ConnectType::Bridge) => { #[cfg(feature = "emulator")] BridgeMap::put(session_id, data).await; } Some(ConnectType::HostUsb(_mount_point)) => { #[cfg(feature = "host")] if let Err(e) = HostUsbMap::put(session_id, data).await { crate::error!("{e:?}"); } } None => { crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command); } } } pub async fn send_channel_data(channel_id: u32, data: Vec) { let _ = TcpMap::send_channel_message(channel_id, data).await; } pub enum EchoLevel { INFO, FAIL, RAW, OK, // this echo maybe OK with carriage return and newline } impl EchoLevel { pub fn convert_from_message_level(cmd: u8) -> Result { match cmd { 0 => Ok(Self::FAIL), 1 => Ok(Self::INFO), 2 => Ok(Self::OK), _ => Err(Error::new(ErrorKind::Other, "invalid message level type")) } } } pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> { let data = match level { EchoLevel::INFO => format!("[Info]{msg}") + "\r\n", EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n", EchoLevel::RAW => msg.to_string() + "\r\n", EchoLevel::OK => msg.to_string(), }; TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await } // client recv and print msg type TcpRecver_ = Arc>; type ChannelMap_ = Arc>>; pub struct ChannelMap {} impl ChannelMap { fn get_instance() -> ChannelMap_ { static mut TCP_RECVER: Option = None; unsafe { TCP_RECVER .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) .clone() } } pub async fn start(rd: SplitReadHalf) { let instance = Self::get_instance(); let mut map = instance.write().await; let arc_rd = Arc::new(Mutex::new(rd)); map.insert(0, arc_rd); } pub async fn recv() -> io::Result> { let instance = Self::get_instance(); let map = instance.read().await; let Some(arc_rd) = map.get(&0) else { return Err(Error::new(ErrorKind::NotFound, "channel not found")); }; let mut rd = arc_rd.lock().await; tcp::recv_channel_message(&mut rd).await } } pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> { let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN); ylong_runtime::spawn(async move { let mut rd = UsbReader { fd }; loop { if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) { crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); break; } } }); rx } pub struct DiedSession { set: Arc>>, queue: Arc>>, } impl DiedSession { pub(crate) fn get_instance() -> &'static DiedSession { static mut DIED_SESSION: MaybeUninit = MaybeUninit::uninit(); static ONCE: Once = Once::new(); unsafe { ONCE.call_once(|| { let global = DiedSession { set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))), queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM))) }; DIED_SESSION = MaybeUninit::new(global); }); &*DIED_SESSION.as_ptr() } } pub async fn add(session_id: u32) { let instance = Self::get_instance(); let mut set = instance.set.write().await; let mut queue = instance.queue.write().await; if queue.len() >= config::MAX_DIED_SESSION_NUM { if let Some(front_session) = queue.pop_front(){ set.remove(&front_session); } } if !set.contains(&session_id) { set.insert(session_id); queue.push_back(session_id) } } pub async fn get(session_id: u32) -> bool { let instance = Self::get_instance(); let set = instance.set.read().await; set.contains(&session_id) } }