1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
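//! Session-keyed registries of transport writers (TCP, USB, UART) and helpers for sending task and channel messages through them.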
16 #![allow(missing_docs)]
17
18 use super::base::{self, Writer};
19 use super::uart::UartWriter;
20 use super::usb::{self, UsbReader, UsbWriter};
21 use super::{tcp, uart_wrapper};
22 #[cfg(feature = "emulator")]
23 use crate::daemon_lib::bridge::BridgeMap;
24 #[cfg(feature = "host")]
25 use crate::host_transfer::host_usb::HostUsbMap;
26
27 use crate::config::TaskMessage;
28 use crate::config::{self, ConnectType};
29 use crate::serializer;
30 #[allow(unused)]
31 use crate::utils::hdc_log::*;
32 #[cfg(not(feature = "host"))]
33 use crate::daemon_lib::task_manager;
34 use std::collections::{HashMap, HashSet, VecDeque};
35 use std::io::{self, Error, ErrorKind};
36 use std::sync::Arc;
37 use std::sync::Once;
38 use std::mem::MaybeUninit;
39 use crate::transfer::usb::usb_write_all;
40 use std::time::Duration;
41
42 #[cfg(feature = "host")]
43 extern crate ylong_runtime_static as ylong_runtime;
44 use ylong_runtime::io::AsyncWriteExt;
45 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf};
46 use ylong_runtime::sync::{mpsc, Mutex, RwLock};
47
48 type ConnectTypeMap_ = Arc<RwLock<HashMap<u32, ConnectType>>>;
49
50 pub struct ConnectTypeMap {}
51 impl ConnectTypeMap {
get_instancenull52 fn get_instance() -> ConnectTypeMap_ {
53 static mut CONNECT_TYPE_MAP: Option<ConnectTypeMap_> = None;
54 unsafe {
55 CONNECT_TYPE_MAP
56 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
57 .clone()
58 }
59 }
60
61 pub async fn put(session_id: u32, conn_type: ConnectType) {
62 let arc_map = Self::get_instance();
63 let mut map = arc_map.write().await;
64 map.insert(session_id, conn_type.clone());
65 crate::debug!("connect map: add {session_id}, {:?}", conn_type);
66 }
67
68 pub async fn get(session_id: u32) -> Option<ConnectType> {
69 let arc_map = Self::get_instance();
70 let map = arc_map.read().await;
71 map.get(&session_id).cloned()
72 }
73
74 pub async fn del(session_id: u32) {
75 let arc_map = Self::get_instance();
76 let mut map = arc_map.write().await;
77 let item = map.remove(&session_id);
78 DiedSession::add(session_id).await;
79 crate::debug!("connect map: del {session_id}: {:?}", item);
80 }
81
82 pub async fn get_all_session() -> Vec<u32> {
83 let mut sessiones = Vec::new();
84 let arc_map = Self::get_instance();
85 let map = arc_map.read().await;
86 for item in map.iter() {
87 sessiones.push(*item.0);
88 }
89 sessiones
90 }
91
92 pub async fn dump() -> String {
93 let arc_map = Self::get_instance();
94 let map = arc_map.read().await;
95 let mut result = "".to_string();
96 for item in map.iter() {
97 let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1);
98 result.push_str(line.as_str());
99 }
100 result
101 }
102 }
103
104 pub async fn dump_session() -> String {
105 ConnectTypeMap::dump().await
106 }
107
108 type TcpWriter_ = Arc<Mutex<SplitWriteHalf>>;
109
110 pub struct TcpMap {
111 map: Mutex<HashMap<u32, TcpWriter_>>,
112 }
113 impl TcpMap {
114 pub(crate) fn get_instance() -> &'static TcpMap {
115 static mut TCP_MAP: MaybeUninit<TcpMap> = MaybeUninit::uninit();
116 static ONCE: Once = Once::new();
117
118 unsafe {
119 ONCE.call_once(|| {
120 let global = TcpMap {
121 map: Mutex::new(HashMap::new())
122 };
123 TCP_MAP = MaybeUninit::new(global);
124 });
125 &*TCP_MAP.as_ptr()
126 }
127 }
128
129 async fn put(session_id: u32, data: TaskMessage) {
130 let send = serializer::concat_pack(data);
131 let instance = Self::get_instance();
132 let map = instance.map.lock().await;
133 let Some(arc_wr) = map.get(&session_id) else {
134 crate::error!("get tcp is None, session_id={session_id}");
135 return;
136 };
137 let mut wr = arc_wr.lock().await;
138 let _ = wr.write_all(send.as_slice()).await;
139 }
140
141 pub async fn send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()> {
142 crate::trace!(
143 "send channel({channel_id}) msg: {:?}",
144 buf.iter()
145 .map(|&c| format!("{c:02X}"))
146 .collect::<Vec<_>>()
147 .join(" ")
148 );
149 let send = [
150 u32::to_be_bytes(buf.len() as u32).as_slice(),
151 buf.as_slice(),
152 ]
153 .concat();
154 let instance = Self::get_instance();
155 let map = instance.map.lock().await;
156 if let Some(guard) = map.get(&channel_id) {
157 let mut wr = guard.lock().await;
158 let _ = wr.write_all(send.as_slice()).await;
159 return Ok(());
160 }
161 Err(Error::new(ErrorKind::NotFound, "channel not found"))
162 }
163
164 pub async fn start(id: u32, wr: SplitWriteHalf) {
165 let instance = Self::get_instance();
166 let mut map = instance.map.lock().await;
167 let arc_wr = Arc::new(Mutex::new(wr));
168 map.insert(id, arc_wr);
169 ConnectTypeMap::put(id, ConnectType::Tcp).await;
170 crate::warn!("tcp start {id}");
171 }
172
173 pub async fn end(id: u32) {
174 let instance = Self::get_instance();
175 let mut map = instance.map.lock().await;
176 if let Some(arc_wr) = map.remove(&id) {
177 let mut wr = arc_wr.lock().await;
178 let _ = wr.shutdown().await;
179 }
180 crate::warn!("tcp end {id}");
181 ConnectTypeMap::del(id).await;
182 }
183 }
184
185 pub struct UsbMap {
186 map: std::sync::Mutex<HashMap<u32, UsbWriter>>,
187 lock: std::sync::Mutex<i32>,
188 }
189 impl UsbMap {
190 pub(crate) fn get_instance() -> &'static UsbMap {
191 static mut USB_MAP: MaybeUninit<UsbMap> = MaybeUninit::uninit();
192 static ONCE: Once = Once::new();
193
194 unsafe {
195 ONCE.call_once(|| {
196 let global = UsbMap {
197 map: std::sync::Mutex::new(HashMap::new()),
198 lock: std::sync::Mutex::new(0)
199 };
200 USB_MAP = MaybeUninit::new(global);
201 });
202 &*USB_MAP.as_ptr()
203 }
204 }
205
206 #[allow(unused)]
207 async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> {
208 if DiedSession::get(session_id).await {
209 return Err(Error::new(ErrorKind::NotFound, "session already died"));;
210 }
211 let mut fd = 0;
212 {
213 let instance = Self::get_instance();
214 let mut map = instance.map.lock().unwrap();
215 let Some(arc_wr) = map.get(&session_id) else {
216 return Err(Error::new(ErrorKind::NotFound, "session not found"));
217 };
218 fd =arc_wr.fd;
219 }
220 let body = serializer::concat_pack(data);
221 let head = usb::build_header(session_id, 1, body.len());
222 let instance = Self::get_instance();
223 let _guard = instance.lock.lock().unwrap();
224 let mut child_ret = 0;
225 match usb_write_all(fd, head) {
226 Ok(_) => {}
227 Err(e) => {
228 return Err(Error::new(ErrorKind::Other, "Error writing head"));
229 }
230 }
231
232 match usb_write_all(fd, body) {
233 Ok(ret) => {
234 child_ret = ret;
235 }
236 Err(e) => {
237 return Err(Error::new(ErrorKind::Other, "Error writing body"));
238 }
239 }
240
241 if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) {
242 let tail = usb::build_header(session_id, 0, 0);
243 // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect
244 // so, we send dummy packet to prevent zero packet generate
245 match usb_write_all(fd, tail) {
246 Ok(_) => {}
247 Err(e) => {
248 return Err(Error::new(ErrorKind::Other, "Error writing tail"));
249 }
250 }
251 }
252 Ok(())
253 }
254
255 pub async fn start(session_id: u32, wr: UsbWriter) {
256 let buffer_map = Self::get_instance();
257 let mut try_times = 0;
258 let max_try_time = 10;
259 let wait_one_seconds = 1000;
260 loop {
261 try_times += 1;
262 if let Ok(mut map) = buffer_map.map.try_lock() {
263 map.insert(session_id, wr);
264 crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times");
265 break;
266 } else {
267 if try_times > max_try_time {
268 crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd");
269 std::process::exit(0);
270 }
271 crate::error!("start usb session_id:{session_id} try lock failed {try_times} times");
272 std::thread::sleep(Duration::from_millis(wait_one_seconds));
273 }
274 }
275 ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await;
276 }
277
278 pub async fn end(session_id: u32) {
279 let buffer_map = Self::get_instance();
280 let mut try_times = 0;
281 let max_try_time = 10;
282 let wait_ten_ms = 10;
283 loop {
284 try_times += 1;
285 if let Ok(mut map) = buffer_map.map.try_lock() {
286 let _ = map.remove(&session_id);
287 crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times");
288 break;
289 } else {
290 if try_times > max_try_time {
291 crate::error!("end usb session_id:{session_id} get lock failed will force break");
292 break;
293 }
294 crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times");
295 std::thread::sleep(Duration::from_millis(wait_ten_ms));
296 }
297 }
298 ConnectTypeMap::del(session_id).await;
299 }
300 }
301
302 type UartWriter_ = Arc<Mutex<UartWriter>>;
303 type UartMap_ = Arc<RwLock<HashMap<u32, UartWriter_>>>;
304
305 pub struct UartMap {}
306 impl UartMap {
get_instancenull307 fn get_instance() -> UartMap_ {
308 static mut UART_MAP: Option<UartMap_> = None;
309 unsafe {
310 UART_MAP
311 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
312 .clone()
313 }
314 }
315
316 #[allow(unused)]
317 pub async fn put(session_id: u32, data: Vec<u8>) -> io::Result<()> {
318 let instance = Self::get_instance();
319 let map = instance.read().await;
320 let Some(arc_wr) = map.get(&session_id) else {
321 return Err(Error::new(ErrorKind::NotFound, "session not found"));
322 };
323 let wr = arc_wr.lock().await;
324 wr.write_all(data)?;
325 Ok(())
326 }
327
328 pub async fn start(session_id: u32, wr: UartWriter) {
329 let instance = Self::get_instance();
330 let mut map = instance.write().await;
331 let arc_wr = Arc::new(Mutex::new(wr));
332 if map.contains_key(&session_id) {
333 crate::error!("uart start, contain session:{}", session_id);
334 return;
335 }
336 map.insert(session_id, arc_wr);
337 ConnectTypeMap::put(session_id, ConnectType::Uart).await;
338 }
339 }
340
341 pub async fn put(session_id: u32, data: TaskMessage) {
342 match ConnectTypeMap::get(session_id).await {
343 Some(ConnectType::Tcp) => {
344 TcpMap::put(session_id, data).await;
345 }
346 Some(ConnectType::Usb(_mount_point)) => {
347 if let Err(e) = UsbMap::put(session_id, data).await {
348 crate::error!("{e:?}");
349 #[cfg(not(feature = "host"))]
350 task_manager::free_session(session_id).await;
351 }
352 }
353 Some(ConnectType::Uart) => {
354 uart_wrapper::wrap_put(session_id, data, 0, 0).await;
355 }
356 Some(ConnectType::Bt) => {}
357 Some(ConnectType::Bridge) => {
358 #[cfg(feature = "emulator")]
359 BridgeMap::put(session_id, data).await;
360 }
361 Some(ConnectType::HostUsb(_mount_point)) => {
362 #[cfg(feature = "host")]
363 if let Err(e) = HostUsbMap::put(session_id, data).await {
364 crate::error!("{e:?}");
365 }
366 }
367 None => {
368 crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command);
369 }
370 }
371 }
372
373 pub async fn send_channel_data(channel_id: u32, data: Vec<u8>) {
374 let _ = TcpMap::send_channel_message(channel_id, data).await;
375 }
376
/// Kind of message echoed back to a channel.
pub enum EchoLevel {
    /// Informational message.
    INFO,
    /// Failure message.
    FAIL,
    /// Raw message.
    RAW,
    /// "OK" message; per the original note it may carry a carriage return and
    /// newline.
    OK,
}

impl EchoLevel {
    /// Maps a numeric message level to an [`EchoLevel`].
    ///
    /// `0` is `FAIL`, `1` is `INFO` and `2` is `OK`.
    ///
    /// # Errors
    /// Any other value yields an `ErrorKind::Other` error.
    pub fn convert_from_message_level(cmd: u8) -> Result<Self, Error> {
        let level = match cmd {
            0 => Self::FAIL,
            1 => Self::INFO,
            2 => Self::OK,
            _ => return Err(Error::new(ErrorKind::Other, "invalid message level type")),
        };
        Ok(level)
    }
}
394
395 pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> {
396 let data = match level {
397 EchoLevel::INFO => format!("[Info]{msg}") + "\r\n",
398 EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n",
399 EchoLevel::RAW => msg.to_string() + "\r\n",
400 EchoLevel::OK => msg.to_string(),
401 };
402 TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await
403 }
404
405 // client recv and print msg
406 type TcpRecver_ = Arc<Mutex<SplitReadHalf>>;
407 type ChannelMap_ = Arc<RwLock<HashMap<u32, TcpRecver_>>>;
408
409 pub struct ChannelMap {}
410 impl ChannelMap {
get_instancenull411 fn get_instance() -> ChannelMap_ {
412 static mut TCP_RECVER: Option<ChannelMap_> = None;
413 unsafe {
414 TCP_RECVER
415 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
416 .clone()
417 }
418 }
419
420 pub async fn start(rd: SplitReadHalf) {
421 let instance = Self::get_instance();
422 let mut map = instance.write().await;
423 let arc_rd = Arc::new(Mutex::new(rd));
424 map.insert(0, arc_rd);
425 }
426
427 pub async fn recv() -> io::Result<Vec<u8>> {
428 let instance = Self::get_instance();
429 let map = instance.read().await;
430 let Some(arc_rd) = map.get(&0) else {
431 return Err(Error::new(ErrorKind::NotFound, "channel not found"));
432 };
433 let mut rd = arc_rd.lock().await;
434 tcp::recv_channel_message(&mut rd).await
435 }
436 }
437
usb_start_recvnull438 pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> {
439 let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN);
440 ylong_runtime::spawn(async move {
441 let mut rd = UsbReader { fd };
442 loop {
443 if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) {
444 crate::warn!("unpack task failed: {}, reopen fd...", e.to_string());
445 break;
446 }
447 }
448 });
449 rx
450 }
451
452 pub struct DiedSession {
453 set: Arc<RwLock<HashSet<u32>>>,
454 queue: Arc<RwLock<VecDeque<u32>>>,
455 }
456 impl DiedSession {
457 pub(crate) fn get_instance() -> &'static DiedSession {
458 static mut DIED_SESSION: MaybeUninit<DiedSession> = MaybeUninit::uninit();
459 static ONCE: Once = Once::new();
460
461 unsafe {
462 ONCE.call_once(|| {
463 let global = DiedSession {
464 set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))),
465 queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM)))
466 };
467 DIED_SESSION = MaybeUninit::new(global);
468 });
469 &*DIED_SESSION.as_ptr()
470 }
471 }
472
473 pub async fn add(session_id: u32) {
474 let instance = Self::get_instance();
475 let mut set = instance.set.write().await;
476 let mut queue = instance.queue.write().await;
477 if queue.len() >= config::MAX_DIED_SESSION_NUM {
478 if let Some(front_session) = queue.pop_front(){
479 set.remove(&front_session);
480 }
481 }
482 if !set.contains(&session_id) {
483 set.insert(session_id);
484 queue.push_back(session_id)
485 }
486 }
487
488 pub async fn get(session_id: u32) -> bool {
489 let instance = Self::get_instance();
490 let set = instance.set.read().await;
491 set.contains(&session_id)
492 }
493 }