1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! buffer
16 #![allow(missing_docs)]
17 
18 use super::base::{self, Writer};
19 use super::uart::UartWriter;
20 use super::usb::{self, UsbReader, UsbWriter};
21 use super::{tcp, uart_wrapper};
22 #[cfg(feature = "emulator")]
23 use crate::daemon_lib::bridge::BridgeMap;
24 #[cfg(feature = "host")]
25 use crate::host_transfer::host_usb::HostUsbMap;
26 
27 use crate::config::TaskMessage;
28 use crate::config::{self, ConnectType};
29 use crate::serializer;
30 #[allow(unused)]
31 use crate::utils::hdc_log::*;
32 #[cfg(not(feature = "host"))]
33 use crate::daemon_lib::task_manager;
34 use std::collections::{HashMap, HashSet, VecDeque};
35 use std::io::{self, Error, ErrorKind};
36 use std::sync::Arc;
37 use std::sync::Once;
38 use std::mem::MaybeUninit;
39 use crate::transfer::usb::usb_write_all;
40 use std::time::Duration;
41 
42 #[cfg(feature = "host")]
43 extern crate ylong_runtime_static as ylong_runtime;
44 use ylong_runtime::io::AsyncWriteExt;
45 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf};
46 use ylong_runtime::sync::{mpsc, Mutex, RwLock};
47 
48 type ConnectTypeMap_ = Arc<RwLock<HashMap<u32, ConnectType>>>;
49 
50 pub struct ConnectTypeMap {}
51 impl ConnectTypeMap {
get_instancenull52     fn get_instance() -> ConnectTypeMap_ {
53         static mut CONNECT_TYPE_MAP: Option<ConnectTypeMap_> = None;
54         unsafe {
55             CONNECT_TYPE_MAP
56                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
57                 .clone()
58         }
59     }
60 
61     pub async fn put(session_id: u32, conn_type: ConnectType) {
62         let arc_map = Self::get_instance();
63         let mut map = arc_map.write().await;
64         map.insert(session_id, conn_type.clone());
65         crate::debug!("connect map: add {session_id}, {:?}", conn_type);
66     }
67 
68     pub async fn get(session_id: u32) -> Option<ConnectType> {
69         let arc_map = Self::get_instance();
70         let map = arc_map.read().await;
71         map.get(&session_id).cloned()
72     }
73 
74     pub async fn del(session_id: u32) {
75         let arc_map = Self::get_instance();
76         let mut map = arc_map.write().await;
77         let item = map.remove(&session_id);
78         DiedSession::add(session_id).await;
79         crate::debug!("connect map: del {session_id}: {:?}", item);
80     }
81 
82     pub async fn get_all_session() -> Vec<u32> {
83         let mut sessiones = Vec::new();
84         let arc_map = Self::get_instance();
85         let map = arc_map.read().await;
86         for item in map.iter() {
87             sessiones.push(*item.0);
88         }
89         sessiones
90     }
91 
92     pub async fn dump() -> String {
93         let arc_map = Self::get_instance();
94         let map = arc_map.read().await;
95         let mut result = "".to_string();
96         for item in map.iter() {
97             let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1);
98             result.push_str(line.as_str());
99         }
100         result
101     }
102 }
103 
104 pub async fn dump_session() -> String {
105     ConnectTypeMap::dump().await
106 }
107 
108 type TcpWriter_ = Arc<Mutex<SplitWriteHalf>>;
109 
110 pub struct TcpMap {
111     map: Mutex<HashMap<u32, TcpWriter_>>,
112 }
113 impl TcpMap {
114     pub(crate)  fn get_instance() -> &'static TcpMap {
115         static mut TCP_MAP: MaybeUninit<TcpMap> = MaybeUninit::uninit();
116         static ONCE: Once = Once::new();
117 
118         unsafe {
119             ONCE.call_once(|| {
120                 let global = TcpMap {
121                     map: Mutex::new(HashMap::new())
122                 };
123                 TCP_MAP = MaybeUninit::new(global);
124             });
125             &*TCP_MAP.as_ptr()
126         }
127     }
128 
129     async fn put(session_id: u32, data: TaskMessage) {
130         let send = serializer::concat_pack(data);
131         let instance = Self::get_instance();
132         let map = instance.map.lock().await;
133         let Some(arc_wr) = map.get(&session_id) else {
134             crate::error!("get tcp is None, session_id={session_id}");
135             return;
136         };
137         let mut wr = arc_wr.lock().await;
138         let _ = wr.write_all(send.as_slice()).await;
139     }
140 
141     pub async fn send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()> {
142         crate::trace!(
143             "send channel({channel_id}) msg: {:?}",
144             buf.iter()
145                 .map(|&c| format!("{c:02X}"))
146                 .collect::<Vec<_>>()
147                 .join(" ")
148         );
149         let send = [
150             u32::to_be_bytes(buf.len() as u32).as_slice(),
151             buf.as_slice(),
152         ]
153         .concat();
154         let instance = Self::get_instance();
155         let map = instance.map.lock().await;
156         if let Some(guard) = map.get(&channel_id) {
157             let mut wr = guard.lock().await;
158             let _ = wr.write_all(send.as_slice()).await;
159             return Ok(());
160         }
161         Err(Error::new(ErrorKind::NotFound, "channel not found"))
162     }
163 
164     pub async fn start(id: u32, wr: SplitWriteHalf) {
165         let instance = Self::get_instance();
166         let mut map = instance.map.lock().await;
167         let arc_wr = Arc::new(Mutex::new(wr));
168         map.insert(id, arc_wr);
169         ConnectTypeMap::put(id, ConnectType::Tcp).await;
170         crate::warn!("tcp start {id}");
171     }
172 
173     pub async fn end(id: u32) {
174         let instance = Self::get_instance();
175         let mut map = instance.map.lock().await;
176         if let Some(arc_wr) = map.remove(&id) {
177             let mut wr = arc_wr.lock().await;
178             let _ = wr.shutdown().await;
179         }
180         crate::warn!("tcp end {id}");
181         ConnectTypeMap::del(id).await;
182     }
183 }
184 
185 pub struct UsbMap {
186     map: std::sync::Mutex<HashMap<u32, UsbWriter>>,
187     lock: std::sync::Mutex<i32>,
188 }
189 impl UsbMap {
190     pub(crate)  fn get_instance() -> &'static UsbMap {
191         static mut USB_MAP: MaybeUninit<UsbMap> = MaybeUninit::uninit();
192         static ONCE: Once = Once::new();
193 
194         unsafe {
195             ONCE.call_once(|| {
196                 let global = UsbMap {
197                     map: std::sync::Mutex::new(HashMap::new()),
198                     lock: std::sync::Mutex::new(0)
199                 };
200                 USB_MAP = MaybeUninit::new(global);
201             });
202             &*USB_MAP.as_ptr()
203         }
204     }
205 
206     #[allow(unused)]
207     async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> {
208         if DiedSession::get(session_id).await {
209             return Err(Error::new(ErrorKind::NotFound, "session already died"));;
210         }
211         let mut fd = 0;
212         {
213             let instance = Self::get_instance();
214             let mut map = instance.map.lock().unwrap();
215             let Some(arc_wr) = map.get(&session_id) else {
216                 return Err(Error::new(ErrorKind::NotFound, "session not found"));
217             };
218             fd =arc_wr.fd;
219         }
220         let body = serializer::concat_pack(data);
221         let head = usb::build_header(session_id, 1, body.len());
222         let instance = Self::get_instance();
223         let _guard = instance.lock.lock().unwrap();
224         let mut child_ret = 0;
225         match usb_write_all(fd, head) {
226             Ok(_) => {}
227             Err(e) => {
228                 return Err(Error::new(ErrorKind::Other, "Error writing head"));
229             }
230         }
231 
232         match usb_write_all(fd, body) {
233             Ok(ret) => {
234                 child_ret = ret;
235             }
236             Err(e) => {
237                 return Err(Error::new(ErrorKind::Other, "Error writing body"));
238             }
239         }
240 
241         if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) {
242             let tail = usb::build_header(session_id, 0, 0);
243             // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect
244             // so, we send dummy packet to prevent zero packet generate
245             match usb_write_all(fd, tail) {
246                 Ok(_) => {}
247                 Err(e) => {
248                     return Err(Error::new(ErrorKind::Other, "Error writing tail"));
249                 }
250             }
251         }
252         Ok(())
253     }
254 
255     pub async fn start(session_id: u32, wr: UsbWriter) {
256         let buffer_map = Self::get_instance();
257         let mut try_times = 0;
258         let max_try_time = 10;
259         let wait_one_seconds = 1000;
260         loop {
261             try_times += 1;
262             if let Ok(mut map) = buffer_map.map.try_lock() {
263                 map.insert(session_id, wr);
264                 crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times");
265                 break;
266             } else {
267                 if try_times > max_try_time {
268                     crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd");
269                     std::process::exit(0);
270                 }
271                 crate::error!("start usb session_id:{session_id} try lock failed {try_times} times");
272                 std::thread::sleep(Duration::from_millis(wait_one_seconds));
273             }
274         }
275         ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await;
276     }
277 
278     pub async fn end(session_id: u32) {
279         let buffer_map = Self::get_instance();
280         let mut try_times = 0;
281         let max_try_time = 10;
282         let wait_ten_ms = 10;
283         loop {
284             try_times += 1;
285             if let Ok(mut map) = buffer_map.map.try_lock() {
286                 let _ = map.remove(&session_id);
287                 crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times");
288                 break;
289             } else {
290                 if try_times > max_try_time {
291                     crate::error!("end usb session_id:{session_id} get lock failed will force break");
292                     break;
293                 }
294                 crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times");
295                 std::thread::sleep(Duration::from_millis(wait_ten_ms));
296             }
297         }
298         ConnectTypeMap::del(session_id).await;
299     }
300 }
301 
302 type UartWriter_ = Arc<Mutex<UartWriter>>;
303 type UartMap_ = Arc<RwLock<HashMap<u32, UartWriter_>>>;
304 
305 pub struct UartMap {}
306 impl UartMap {
get_instancenull307     fn get_instance() -> UartMap_ {
308         static mut UART_MAP: Option<UartMap_> = None;
309         unsafe {
310             UART_MAP
311                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
312                 .clone()
313         }
314     }
315 
316     #[allow(unused)]
317     pub async fn put(session_id: u32, data: Vec<u8>) -> io::Result<()> {
318         let instance = Self::get_instance();
319         let map = instance.read().await;
320         let Some(arc_wr) = map.get(&session_id) else {
321             return Err(Error::new(ErrorKind::NotFound, "session not found"));
322         };
323         let wr = arc_wr.lock().await;
324         wr.write_all(data)?;
325         Ok(())
326     }
327 
328     pub async fn start(session_id: u32, wr: UartWriter) {
329         let instance = Self::get_instance();
330         let mut map = instance.write().await;
331         let arc_wr = Arc::new(Mutex::new(wr));
332         if map.contains_key(&session_id) {
333             crate::error!("uart start, contain session:{}", session_id);
334             return;
335         }
336         map.insert(session_id, arc_wr);
337         ConnectTypeMap::put(session_id, ConnectType::Uart).await;
338     }
339 }
340 
341 pub async fn put(session_id: u32, data: TaskMessage) {
342     match ConnectTypeMap::get(session_id).await {
343         Some(ConnectType::Tcp) => {
344             TcpMap::put(session_id, data).await;
345         }
346         Some(ConnectType::Usb(_mount_point)) => {
347             if let Err(e) = UsbMap::put(session_id, data).await {
348                 crate::error!("{e:?}");
349                 #[cfg(not(feature = "host"))]
350                 task_manager::free_session(session_id).await;
351             }
352         }
353         Some(ConnectType::Uart) => {
354             uart_wrapper::wrap_put(session_id, data, 0, 0).await;
355         }
356         Some(ConnectType::Bt) => {}
357         Some(ConnectType::Bridge) => {
358             #[cfg(feature = "emulator")]
359             BridgeMap::put(session_id, data).await;
360         }
361         Some(ConnectType::HostUsb(_mount_point)) => {
362             #[cfg(feature = "host")]
363             if let Err(e) = HostUsbMap::put(session_id, data).await {
364                 crate::error!("{e:?}");
365             }
366         }
367         None => {
368             crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command);
369         }
370     }
371 }
372 
373 pub async fn send_channel_data(channel_id: u32, data: Vec<u8>) {
374     let _ = TcpMap::send_channel_message(channel_id, data).await;
375 }
376 
377 pub enum EchoLevel {
378     INFO,
379     FAIL,
380     RAW,
381     OK, // this echo maybe OK with carriage return and newline
382 }
383 
384 impl EchoLevel {
convert_from_message_levelnull385     pub fn convert_from_message_level(cmd: u8) -> Result<Self, Error> {
386         match cmd {
387             0 => Ok(Self::FAIL),
388             1 => Ok(Self::INFO),
389             2 => Ok(Self::OK),
390             _ => Err(Error::new(ErrorKind::Other, "invalid message level type"))
391         }
392     }
393 }
394 
395 pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> {
396     let data = match level {
397         EchoLevel::INFO => format!("[Info]{msg}") + "\r\n",
398         EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n",
399         EchoLevel::RAW => msg.to_string() + "\r\n",
400         EchoLevel::OK => msg.to_string(),
401     };
402     TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await
403 }
404 
405 // client recv and print msg
406 type TcpRecver_ = Arc<Mutex<SplitReadHalf>>;
407 type ChannelMap_ = Arc<RwLock<HashMap<u32, TcpRecver_>>>;
408 
409 pub struct ChannelMap {}
410 impl ChannelMap {
get_instancenull411     fn get_instance() -> ChannelMap_ {
412         static mut TCP_RECVER: Option<ChannelMap_> = None;
413         unsafe {
414             TCP_RECVER
415                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
416                 .clone()
417         }
418     }
419 
420     pub async fn start(rd: SplitReadHalf) {
421         let instance = Self::get_instance();
422         let mut map = instance.write().await;
423         let arc_rd = Arc::new(Mutex::new(rd));
424         map.insert(0, arc_rd);
425     }
426 
427     pub async fn recv() -> io::Result<Vec<u8>> {
428         let instance = Self::get_instance();
429         let map = instance.read().await;
430         let Some(arc_rd) = map.get(&0) else {
431             return Err(Error::new(ErrorKind::NotFound, "channel not found"));
432         };
433         let mut rd = arc_rd.lock().await;
434         tcp::recv_channel_message(&mut rd).await
435     }
436 }
437 
usb_start_recvnull438 pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> {
439     let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN);
440     ylong_runtime::spawn(async move {
441         let mut rd = UsbReader { fd };
442         loop {
443             if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) {
444                 crate::warn!("unpack task failed: {}, reopen fd...", e.to_string());
445                 break;
446             }
447         }
448     });
449     rx
450 }
451 
452 pub struct DiedSession {
453     set: Arc<RwLock<HashSet<u32>>>,
454     queue: Arc<RwLock<VecDeque<u32>>>,
455 }
456 impl DiedSession {
457     pub(crate)  fn get_instance() -> &'static DiedSession {
458         static mut DIED_SESSION: MaybeUninit<DiedSession> = MaybeUninit::uninit();
459         static ONCE: Once = Once::new();
460 
461         unsafe {
462             ONCE.call_once(|| {
463                 let global = DiedSession {
464                     set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))),
465                     queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM)))
466                 };
467                 DIED_SESSION = MaybeUninit::new(global);
468             });
469             &*DIED_SESSION.as_ptr()
470         }
471     }
472 
473     pub async fn add(session_id: u32) {
474         let instance = Self::get_instance();
475         let mut set = instance.set.write().await;
476         let mut queue = instance.queue.write().await;
477         if queue.len() >= config::MAX_DIED_SESSION_NUM {
478             if let Some(front_session) = queue.pop_front(){
479                 set.remove(&front_session);
480             }
481         }
482         if !set.contains(&session_id) {
483             set.insert(session_id);
484             queue.push_back(session_id)
485         }
486     }
487 
488     pub async fn get(session_id: u32) -> bool {
489         let instance = Self::get_instance();
490         let set = instance.set.read().await;
491         set.contains(&session_id)
492     }
493 }