1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! forward
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19 
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 #[cfg(not(target_os = "windows"))]
26 use std::fs::{self, File, OpenOptions};
27 #[cfg(not(target_os = "windows"))]
28 use std::io::{self, Error, ErrorKind, Read, Write};
29 use ylong_runtime::sync::{Mutex, RwLock};
30 
31 use crate::common::base::Base;
32 use crate::common::hdctransfer::transfer_task_finish;
33 use crate::common::hdctransfer::{self, HdcTransferBase};
34 #[cfg(not(feature = "host"))]
35 use crate::common::jdwp::Jdwp;
36 #[cfg(not(target_os = "windows"))]
37 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
38 use crate::config::HdcCommand;
39 use crate::config::MessageLevel;
40 use crate::config::TaskMessage;
41 use crate::transfer;
42 #[allow(unused)]
43 use crate::utils::hdc_log::*;
44 use crate::{config, utils};
45 use std::mem::MaybeUninit;
46 use std::sync::Arc;
47 use std::sync::Once;
48 #[cfg(not(feature = "host"))]
49 use std::time::Duration;
50 use ylong_runtime::io::AsyncReadExt;
51 use ylong_runtime::io::AsyncWriteExt;
52 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
53 use ylong_runtime::task::JoinHandle;
54 
/// Argument-count constant (2); its consumers are not visible in this part of the file.
pub const ARG_COUNT2: u32 = 2;
/// Small buffer size (256); its consumers are not visible in this part of the file.
pub const BUF_SIZE_SMALL: usize = 256;
/// Size in bytes of the buffers used when reading forwarded TCP / Unix-socket data.
pub const SOCKET_BUFFER_SIZE: usize = 65535;
/// Path prefix that `detech_forward_type` prepends to the node of a `localfilesystem` forward.
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
/// Path prefix that `detech_forward_type` prepends to the node of a `localreserved` forward.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
60 
/// Host-side record describing one forward, stored in `HdcForwardInfoMap`.
#[cfg(feature = "host")]
#[derive(Clone, Debug)]
pub struct HdcForwardInfo {
    /// Session the forward belongs to.
    pub session_id: u32,
    /// Channel the forward belongs to.
    pub channel_id: u32,
    /// Direction flag; `on_forward_success` sets it when the payload starts with `'1'`.
    pub forward_direction: bool,
    /// Task description string; also used as the key inside `HdcForwardInfoMap`.
    pub task_string: String,
    /// Connect key; `on_forward_success` currently stores a placeholder value here.
    pub connect_key: String,
}
70 
#[cfg(feature = "host")]
impl HdcForwardInfo {
    /// Builds a forward record from its five fields; no validation is performed.
    fn new(
        session_id: u32,
        channel_id: u32,
        forward_direction: bool,
        task_string: String,
        connect_key: String,
    ) -> Self {
        Self {
            session_id,
            channel_id,
            forward_direction,
            task_string,
            connect_key,
        }
    }
}
89 
/// Kind of endpoint a forward is attached to, selected by `detech_forward_type` from the
/// first `:`-separated argument of the node description.
#[derive(Default, Eq, PartialEq, Clone, Debug)]
enum ForwardType {
    /// `tcp` node; also the default value.
    #[default]
    Tcp = 0,
    /// `dev` node.
    Device,
    /// `localabstract` node.
    Abstract,
    /// `localfilesystem` node.
    FileSystem,
    /// `jdwp` node.
    Jdwp,
    /// `ark` node.
    Ark,
    /// `localreserved` node.
    Reserved,
}
101 
/// State of one forwarded connection; instances are stored by `id` in `ForwardContextMap`.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct ContextForward {
    /// Session this context belongs to.
    session_id: u32,
    /// Channel this context belongs to.
    channel_id: u32,
    // NOTE(review): not read or written in the visible part of the file.
    check_order: bool,
    /// When `true`, `setup_tcp_point` listens for connections; otherwise it connects out.
    is_master: bool,
    /// Context id; the key used for `ForwardContextMap` and `TcpWriteStreamMap`.
    id: u32,
    /// Raw descriptor closed by `free_context` for the non-TCP forward types.
    fd: i32,
    /// Second raw descriptor, closed by `free_context` for `Jdwp` / `Ark` forwards.
    target_fd: i32,
    /// Kind of endpoint this context is attached to.
    forward_type: ForwardType,
    /// `:`-separated parts of the local node description.
    local_args: Vec<String>,
    /// `:`-separated parts of the remote node description.
    remote_args: Vec<String>,
    /// Command text; `check_command` reports it back prefixed with the direction flag.
    task_command: String,
    /// Last error description; `forward_tcp_accept` sets it when binding fails.
    last_error: String,
    /// Parameters sent to the peer by `on_accept` when a client connects.
    remote_parameters: String,
    // NOTE(review): not read or written in the visible part of the file.
    dev_path: String,
}
119 
/// One forward record behind its own lock.
#[cfg(feature = "host")]
type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
/// Shared table of forward records keyed by task string.
#[cfg(feature = "host")]
type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
/// Process-wide registry of the forwards known to the host.
#[cfg(feature = "host")]
pub struct HdcForwardInfoMap {}
#[cfg(feature = "host")]
impl HdcForwardInfoMap {
    /// Returns a handle to the process-wide table, creating it on first use.
    ///
    /// Initialization is guarded by `Once`, so concurrent first calls cannot race on the
    /// static the way an unguarded `static mut Option` would.
    fn get_instance() -> HdcForwardInfoMap_ {
        static mut MAP: MaybeUninit<HdcForwardInfoMap_> = MaybeUninit::uninit();
        static ONCE: Once = Once::new();
        // SAFETY: `MAP` is written exactly once, inside `call_once`, and `call_once` only
        // returns after that write has completed. Afterwards the slot is only read.
        unsafe {
            ONCE.call_once(|| {
                std::ptr::addr_of_mut!(MAP)
                    .write(MaybeUninit::new(Arc::new(Mutex::new(HashMap::new()))));
            });
            Arc::clone(&*std::ptr::addr_of!(MAP).cast::<HdcForwardInfoMap_>())
        }
    }

    /// Stores `forward_info` under its `task_string`, replacing any previous entry.
    async fn put(forward_info: HdcForwardInfo) {
        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        map.insert(
            forward_info.task_string.clone(),
            Arc::new(Mutex::new(forward_info)),
        );
    }

    /// Returns a snapshot (clones) of every registered forward record.
    pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
        let instance = Self::get_instance();
        let map = instance.lock().await;
        let mut result = Vec::with_capacity(map.len());
        for value in map.values() {
            let info = value.lock().await;
            result.push(info.clone());
        }
        result
    }

    /// Removes the first record whose task string contains `"<flag>|<task_string>"`
    /// (flag is `1` when `forward_direction` is true, `0` otherwise) and whose direction
    /// matches. Returns `true` when a record was removed.
    ///
    /// The lookup and the removal happen under one lock acquisition, so no other task can
    /// modify the table between the two steps.
    pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
        crate::info!(
            "remove_forward task_string:{}, direction:{}",
            task_string,
            forward_direction
        );
        let prefix = if forward_direction { "1|" } else { "0|" };
        let needle = format!("{prefix}{task_string}");

        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        let mut remove_key: Option<String> = None;
        for (key, value) in map.iter() {
            let info = value.lock().await;
            if info.task_string.contains(&needle) && info.forward_direction == forward_direction {
                remove_key = Some(key.clone());
                break;
            }
        }
        match remove_key {
            Some(key) => map.remove(&key).is_some(),
            None => false,
        }
    }
}
191 
192 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
193 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
194 pub struct TcpWriteStreamMap {}
195 impl TcpWriteStreamMap {
get_instancenull196     fn get_instance() -> TcpWriterMap_ {
197         static mut TCP_MAP: Option<TcpWriterMap_> = None;
198         unsafe {
199             TCP_MAP
200                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
201                 .clone()
202         }
203     }
204     #[allow(unused)]
205     async fn put(id: u32, wr: SplitWriteHalf) {
206         let instance = Self::get_instance();
207         let mut map = instance.write().await;
208         let arc_wr = Arc::new(Mutex::new(wr));
209         map.insert(id, arc_wr);
210     }
211     #[allow(unused)]
212     async fn write(id: u32, data: Vec<u8>) -> bool {
213         let arc_map = Self::get_instance();
214         let map = arc_map.write().await;
215         let Some(arc_wr) = map.get(&id) else {
216             crate::error!("TcpWriteStreamMap failed to get id {:#?}", id);
217             return false;
218         };
219         let mut wr = arc_wr.lock().await;
220         let write_result = wr.write_all(data.as_slice()).await;
221         if write_result.is_err() {
222             crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id);
223         }
224         true
225     }
226 
227     pub async fn end(id: u32) {
228         let instance = Self::get_instance();
229         let mut map = instance.write().await;
230         if let Some(arc_wr) = map.remove(&id) {
231             let mut wr = arc_wr.lock().await;
232             let _ = wr.shutdown().await;
233         }
234     }
235 }
236 
237 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
238 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
239 pub struct TcpListenerMap {}
240 impl TcpListenerMap {
get_instancenull241     fn get_instance() -> TcpListenerMap_ {
242         static mut TCP_MAP: Option<TcpListenerMap_> = None;
243         unsafe {
244             TCP_MAP
245                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
246                 .clone()
247         }
248     }
249     #[allow(unused)]
250     async fn put(id: u32, listener: JoinHandle<()>) {
251         let instance = Self::get_instance();
252         let mut map = instance.write().await;
253         let arc_listener = Arc::new(Mutex::new(listener));
254         map.insert(id, arc_listener);
255         crate::info!("forward tcp put listener id = {id}");
256     }
257 
258     pub async fn end(id: u32) {
259         let instance = Self::get_instance();
260         let mut map = instance.write().await;
261         if let Some(arc_listener) = map.remove(&id) {
262             let join_handle = arc_listener.lock().await;
263             join_handle.cancel();
264         }
265     }
266 }
267 
268 type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>;
269 pub struct ForwardContextMap {}
270 impl ForwardContextMap {
get_instancenull271     fn get_instance() -> &'static MapContextForward_ {
272         static mut CONTEXT_MAP: MaybeUninit<MapContextForward_> = MaybeUninit::uninit();
273         static ONCE: Once = Once::new();
274         unsafe {
275             ONCE.call_once(|| {
276                     CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new())));
277                 }
278             );
279             &*CONTEXT_MAP.as_ptr()
280         }
281     }
282 
283     pub async fn update(cid: u32, value: ContextForward) {
284         let map = Self::get_instance();
285         let mut map = map.lock().await;
286         map.insert(cid, value.clone());
287     }
288 
289     pub async fn remove(cid: u32) {
290         crate::info!("ContextForward remove, cid:{}", cid);
291         let map = Self::get_instance();
292         let mut map = map.lock().await;
293         let _ = map.remove(&cid);
294     }
295 
296     pub async fn get(cid: u32) -> Option<ContextForward> {
297         let arc = Self::get_instance();
298         let map = arc.lock().await;
299         let task = map.get(&cid);
300         match task {
301             Some(task) => Some(task.clone()),
302             None => {
303                 crate::debug!("ContextForward result:is none,cid={:#?}", cid,);
304                 Option::None
305             }
306         }
307     }
308 }
309 
310 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
311 pub struct ForwardTaskMap {}
312 impl ForwardTaskMap {
get_instancenull313     fn get_instance() -> MapForward_ {
314         static mut FORWARD_MAP: Option<MapForward_> = None;
315         unsafe {
316             FORWARD_MAP
317                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
318                 .clone()
319         }
320     }
321 
322     pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
323         let map = Self::get_instance();
324         let mut map = map.lock().await;
325         map.insert((session_id, channel_id), value.clone());
326     }
327 
328     pub async fn remove(session_id: u32, channel_id: u32) {
329         crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
330         let map = Self::get_instance();
331         let mut map = map.lock().await;
332         let _ = map.remove(&(session_id, channel_id));
333     }
334 
335     pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
336         let arc = Self::get_instance();
337         let map = arc.lock().await;
338         let task = map.get(&(session_id, channel_id));
339         match task {
340             Some(task) => Some(task.clone()),
341             None => {
342                 crate::debug!(
343                     "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}",
344                     session_id,
345                     channel_id
346                 );
347                 Option::None
348             }
349         }
350     }
351 
352     pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
353         let arc = Self::get_instance();
354         let map = arc.lock().await;
355         for ((_session_id, _channel_id), value) in map.iter() {
356             if *_session_id == session_id && task_string.contains(&value.task_command) {
357                 return Some(*_channel_id);
358             }
359         }
360         None
361     }
362 
363     pub async fn clear(session_id: u32) {
364         let arc = Self::get_instance();
365         let mut channel_list = Vec::new();
366         {
367             let map = arc.lock().await;
368             if map.is_empty() {
369                 return;
370             }
371             for (&key, _) in map.iter() {
372                 if key.0 == session_id {
373                     let id = key;
374                     channel_list.push(id);
375                 }
376             }
377         }
378         for id in channel_list {
379             free_channel_task(id.0, id.1).await;
380         }
381     }
382 
383     pub async fn dump_task() -> String {
384         let arc = Self::get_instance();
385         let map = arc.lock().await;
386         let mut result = String::new();
387         for (_id, forward_task) in map.iter() {
388             let forward_type = match forward_task.remote_args.len() {
389                 0 => "fport".to_string(),
390                 2 => "rport".to_string(),
391                 _ => "unknown".to_string(),
392             };
393             let first_args = match forward_task.remote_args.len() {
394                 0 => "unknown".to_string(),
395                 2 => format!(
396                     "{}:{}",
397                     forward_task.local_args[0], forward_task.local_args[1]
398                 ),
399                 _ => "unknown".to_string(),
400             };
401             let second_args = match forward_task.remote_args.len() {
402                 0 => format!(
403                     "{}:{}",
404                     forward_task.local_args[0], forward_task.local_args[1]
405                 ),
406                 2 => format!(
407                     "{}:{}",
408                     forward_task.remote_args[0], forward_task.remote_args[1]
409                 ),
410                 _ => "unknown".to_string(),
411             };
412             result.push_str(&format!(
413                 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
414                 forward_task.session_id,
415                 forward_task.channel_id,
416                 forward_type,
417                 first_args,
418                 second_args
419             ));
420         }
421         result
422     }
423 }
424 
425 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
426     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
427         return;
428     };
429     let task = &mut task.clone();
430     let cid = task.context_forward.id;
431     crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}, cid:{cid}");
432 
433     match task.forward_type {
434         ForwardType::Tcp => {
435             TcpWriteStreamMap::end(cid).await;
436             TcpListenerMap::end(channel_id).await;
437         }
438         ForwardType::Jdwp | ForwardType::Ark => {
439             TcpWriteStreamMap::end(cid).await;
440             let ret = unsafe { libc::close(task.context_forward.fd) };
441             crate::debug!(
442                 "close context_forward fd, ret={}, session_id={}, channel_id={}",
443                 ret,
444                 session_id,
445                 channel_id,
446             );
447             let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) };
448             crate::debug!(
449                 "close context_forward target fd, ret={}, session_id={}, channel_id={}",
450                 target_fd_ret,
451                 session_id,
452                 channel_id,
453             );
454             TcpListenerMap::end(channel_id).await;
455         }
456         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
457             #[cfg(not(target_os = "windows"))]
458             UdsServer::wrap_close(task.context_forward.fd);
459         }
460         ForwardType::Device => {
461             return;
462         }
463     }
464     ForwardTaskMap::remove(session_id, channel_id).await;
465 }
466 
/// Frees every forward task that belongs to `session_id` (delegates to `ForwardTaskMap::clear`).
pub async fn stop_task(session_id: u32) {
    ForwardTaskMap::clear(session_id).await;
}
470 
/// Returns a textual listing of all forward tasks (delegates to `ForwardTaskMap::dump_task`).
pub async fn dump_task() -> String {
    ForwardTaskMap::dump_task().await
}
474 
/// One forward task; instances are stored in `ForwardTaskMap` by `(session_id, channel_id)`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HdcForward {
    /// Session this task belongs to.
    session_id: u32,
    /// Channel this task belongs to.
    channel_id: u32,
    /// Role flag supplied to `new`; its consumers are not visible in this part of the file.
    server_or_daemon: bool,
    /// `:`-separated parts of the local node description.
    local_args: Vec<String>,
    /// `:`-separated parts of the remote node description.
    remote_args: Vec<String>,
    /// Command text; `ForwardTaskMap::get_channel_id` matches tasks against it.
    task_command: String,
    /// Kind of endpoint; decides what `free_channel_task` has to release.
    forward_type: ForwardType,
    /// Context whose id and descriptors `free_channel_task` releases.
    context_forward: ContextForward,
    /// Transfer state created by `HdcTransferBase::new` for the same session and channel.
    pub transfer: HdcTransferBase,
}
487 
impl HdcForward {
    /// Creates an empty task for `(session_id, channel_id)`: all argument lists, the
    /// command and the context take their default values.
    pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
        Self {
            session_id,
            channel_id,
            server_or_daemon,
            local_args: Default::default(),
            remote_args: Default::default(),
            task_command: Default::default(),
            forward_type: Default::default(),
            context_forward: Default::default(),
            transfer: HdcTransferBase::new(session_id, channel_id),
        }
    }
}
503 
check_node_infonull504 pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
505     crate::info!("check cmd args is : {:#?}", value);
506     if !value.contains(':') {
507         return false;
508     }
509     let array = value.split(':').collect::<Vec<&str>>();
510     if array[0] == "tcp" {
511         if array[1].len() > config::MAX_PORT_LEN {
512             crate::error!(
513                 "forward port = {:#?} it'slength is wrong, can not more than five",
514                 array[1]
515             );
516             return false;
517         }
518 
519         match array[1].parse::<u32>() {
520             Ok(port) => {
521                 if port == 0 || port > config::MAX_PORT_NUM {
522                     crate::error!("port can not greater than: 65535");
523                     return false;
524                 }
525             }
526             Err(_) => {
527                 crate::error!("port must is int type, port is: {:#?}", array[1]);
528                 return false;
529             }
530         }
531     }
532     for item in array.iter() {
533         arg.push(String::from(item.to_owned()));
534     }
535     true
536 }
537 
/// Records a successfully established forward on the host and ends the channel's TCP map
/// entry.
///
/// The payload is the task string; its first byte is the direction flag (`'1'` means
/// `forward_direction == true`). An empty or non-UTF-8 payload is logged and not recorded;
/// the channel is ended in every case and `Ok(())` is returned.
#[cfg(feature = "host")]
pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
    crate::info!("on_forward_success");
    let channel_id = task_message.channel_id;
    let payload = task_message.payload;
    // `first()` instead of `payload[0]`: an empty payload must not panic.
    match payload.first().copied() {
        None => {
            crate::error!("on_forward_success got an empty payload, channel_id={channel_id}");
        }
        Some(flag) => {
            let forward_direction = flag == b'1';
            let connect_key = "unknow key".to_string();
            match String::from_utf8(payload) {
                Ok(task_string) => {
                    let info = HdcForwardInfo::new(
                        session_id,
                        channel_id,
                        forward_direction,
                        task_string,
                        connect_key,
                    );
                    HdcForwardInfoMap::put(info).await;
                }
                Err(err) => {
                    crate::error!("payload to String failed. {err}");
                }
            }
        }
    }
    transfer::TcpMap::end(channel_id).await;
    Ok(())
}
564 
565 pub async fn check_command(
566     ctx: &mut ContextForward,
567     _payload: &[u8],
568     server_or_daemon: bool,
569 ) -> bool {
570     let channel_id = ctx.channel_id;
571     if !_payload.is_empty() {
572         hdctransfer::echo_client(
573             ctx.session_id,
574             channel_id,
575             "Forwardport result:OK",
576             MessageLevel::Ok,
577         )
578         .await;
579         let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;
580 
581         let mut command_string = vec![0_u8; map_info.len() + 1];
582         map_info
583             .as_bytes()
584             .to_vec()
585             .iter()
586             .enumerate()
587             .for_each(|(i, e)| {
588                 command_string[i] = *e;
589             });
590         let forward_success_message = TaskMessage {
591             channel_id,
592             command: HdcCommand::ForwardSuccess,
593             payload: command_string,
594         };
595         #[cfg(feature = "host")]
596         {
597             let _ = on_forward_success(forward_success_message, ctx.session_id).await;
598         }
599         #[cfg(not(feature = "host"))]
600         {
601             transfer::put(ctx.session_id, forward_success_message).await;
602         }
603     } else {
604         hdctransfer::echo_client(
605             ctx.session_id,
606             channel_id,
607             "Forwardport result: Failed",
608             MessageLevel::Fail,
609         )
610         .await;
611         free_context(ctx.id, false).await;
612         return false;
613     }
614     true
615 }
616 
617 pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
618     let type_str = &ctx_point.local_args[0];
619     match type_str.as_str() {
620         "tcp" => {
621             ctx_point.forward_type = ForwardType::Tcp;
622         }
623         "dev" => {
624             ctx_point.forward_type = ForwardType::Device;
625         }
626         "localabstract" => {
627             ctx_point.forward_type = ForwardType::Abstract;
628         }
629         "localfilesystem" => {
630             ctx_point.local_args[1] =
631                 HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
632             ctx_point.forward_type = ForwardType::FileSystem;
633         }
634         "jdwp" => {
635             ctx_point.forward_type = ForwardType::Jdwp;
636         }
637         "ark" => {
638             ctx_point.forward_type = ForwardType::Ark;
639         }
640         "localreserved" => {
641             ctx_point.local_args[1] =
642                 FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
643             ctx_point.forward_type = ForwardType::Reserved;
644         }
645         _ => {
646             crate::error!("this forward type may is not expected");
647             return false;
648         }
649     }
650     true
651 }
652 
653 pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
654     let saddr = format!("127.0.0.1:{}", port);
655     let session_tmp = ctx.session_id;
656     let channel_tmp = ctx.channel_id;
657     let remote_parameters = ctx.remote_parameters.clone();
658     let ctx_type = ctx.forward_type.clone();
659 
660     let result = TcpListener::bind(saddr.clone()).await;
661     match result {
662         Ok(listener) => {
663             crate::info!("forward tcp accept bind ok, addr:{:#?}", saddr);
664             let join_handle = utils::spawn(async move {
665                 loop {
666                     let (stream, _addr) = match listener.accept().await {
667                         Ok((stream, _addr)) => (stream, _addr),
668                         Err(err) => {
669                             crate::error!("listener accept failed, {err}");
670                             continue;
671                         }
672                     };
673                     let (rd, wr) = stream.into_split();
674                     let mut client_context = malloc_context(session_tmp, channel_tmp, true).await;
675                     client_context.remote_parameters = remote_parameters.clone();
676                     client_context.forward_type = ctx_type.clone();
677                     ForwardContextMap::update(client_context.id, client_context.clone()).await;
678                     crate::info!("malloc client_context id = {:#?}", client_context.id);
679                     TcpWriteStreamMap::put(client_context.id, wr).await;
680                     on_accept(client_context.id).await;
681                     utils::spawn(async move {
682                         recv_tcp_msg(session_tmp, channel_tmp, rd, client_context.id).await;
683                     });
684                 }
685             });
686             TcpListenerMap::put(channel_tmp, join_handle).await;
687             Ok(())
688         }
689         Err(e) => {
690             crate::error!("forward_ tcp_accept fail:{:#?}", e);
691             ctx.last_error = format!("TCP Port listen failed at {}", port);
692             Err(e)
693         }
694     }
695 }
696 
697 pub async fn on_accept(cid: u32) {
698     let Some(context_forward) = ForwardContextMap::get(cid).await else {
699         crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
700         return;
701     };
702     let ctx = &mut context_forward.clone();
703 
704     let buf_string: Vec<u8> = ctx.remote_parameters.clone().as_bytes().to_vec();
705     let mut new_buf = vec![0_u8; buf_string.len() + 9];
706 
707     buf_string.iter().enumerate().for_each(|(i, e)| {
708         new_buf[i + 8] = *e;
709     });
710     send_to_task(
711         ctx.session_id,
712         ctx.channel_id,
713         HdcCommand::ForwardActiveSlave,
714         &new_buf,
715         buf_string.len() + 9,
716         ctx.id,
717     )
718     .await;
719 }
720 
721 pub async fn daemon_connect_tcp(cid: u32, port: u32) {
722     let Some(context_forward) = ForwardContextMap::get(cid).await else {
723         crate::error!("daemon connect tcp get context is none cid={cid}");
724         return;
725     };
726     let ctx = &mut context_forward.clone();
727 
728     let saddr = format!("127.0.0.1:{}", port);
729     let stream = match TcpStream::connect(saddr).await {
730         Err(err) => {
731             crate::error!("TcpStream::stream failed {:?}", err);
732             free_context(cid, true).await;
733             return;
734         }
735         Ok(addr) => addr,
736     };
737     send_active_master(ctx).await;
738     let (rd, wr) = stream.into_split();
739     TcpWriteStreamMap::put(ctx.id, wr).await;
740     ForwardContextMap::update(ctx.id, ctx.clone()).await;
741 
742     let session_tmp = ctx.session_id;
743     let channel_tmp = ctx.channel_id;
744     update_context_to_task(session_tmp, channel_tmp, ctx).await;
745     utils::spawn(async move {
746         recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
747     });
748 }
749 
750 pub async fn update_context_to_task(session_id: u32, channel_id:u32, ctx: &mut ContextForward) {
751     let Some(task) = ForwardTaskMap::get(ctx.session_id, ctx.channel_id).await else {
752         crate::error!(
753             "update context to task is none session_id={:#?},channel_id={:#?}",
754             session_id,
755             channel_id
756         );
757         return;
758     };
759     let task = &mut task.clone();
760     task.context_forward = ctx.clone();
761     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
762 }
763 
764 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
765     let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
766     loop {
767         match rd.read(&mut data).await {
768             Ok(recv_size) => {
769                 if recv_size == 0 {
770                     crate::info!("recv_size is 0, tcp temporarily closed");
771                     free_context(cid, true).await;
772                     return;
773                 }
774                 send_to_task(
775                     session_id,
776                     channel_id,
777                     HdcCommand::ForwardData,
778                     &data[0..recv_size],
779                     recv_size,
780                     cid,
781                 )
782                 .await;
783             }
784             Err(_e) => {
785                 free_context(cid, true).await;
786                 crate::error!(
787                     "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
788                 );
789             }
790         }
791     }
792 }
793 
794 #[cfg(not(target_os = "windows"))]
795 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
796     loop {
797         let result = ylong_runtime::spawn_blocking(move || {
798             let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
799             let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
800             (recv_size, buffer)
801         })
802         .await;
803         let (recv_size, buffer) = match result {
804             Ok((recv_size, _)) if recv_size < 0 => {
805                 crate::error!("local abstract close shutdown fd = {fd}");
806                 free_context(cid, true).await;
807                 return;
808             }
809             Ok((recv_size, buffer)) => (recv_size, buffer),
810             Err(err) => {
811                 crate::error!("read socket msg failed. {err}");
812                 free_context(cid, true).await;
813                 return;
814             }
815         };
816         send_to_task(
817             session_id,
818             channel_id,
819             HdcCommand::ForwardData,
820             &buffer[0..recv_size as usize],
821             recv_size as usize,
822             cid,
823         )
824         .await;
825     }
826 }
827 
828 pub async fn free_context(cid: u32, notify_remote: bool) {
829     let Some(context_forward) = ForwardContextMap::get(cid).await else {
830         crate::error!("free forward context get cid  is none. cid = {cid}");
831         return;
832     };
833     let ctx = &mut context_forward.clone();
834 
835     if notify_remote {
836         let vec_none = Vec::<u8>::new();
837         send_to_task(
838             ctx.session_id,
839             ctx.channel_id,
840             HdcCommand::ForwardFreeContext,
841             &vec_none,
842             0,
843             ctx.id,
844         )
845         .await;
846     }
847     crate::error!("begin to free forward context cid. cid = {cid}");
848     match ctx.forward_type {
849         ForwardType::Tcp => {
850             TcpWriteStreamMap::end(ctx.id).await;
851         }
852         ForwardType::Jdwp | ForwardType::Ark => {
853             TcpWriteStreamMap::end(ctx.id).await;
854             let ret = unsafe { libc::close(ctx.fd) };
855             crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
856             let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
857             crate::debug!(
858                 "close context_forward target fd, ret={}, id={}",
859                 target_fd_ret,
860                 ctx.id,
861             );
862         }
863         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
864             crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
865             #[cfg(not(target_os = "windows"))]
866             UdsServer::wrap_close(ctx.fd);
867             ctx.fd = -1;
868         }
869         ForwardType::Device => {}
870     }
871     ForwardContextMap::remove(cid).await;
872 }
873 
874 pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
875     let Ok(port) = ctx.local_args[1].parse::<u32>() else {
876         crate::error!("setup tcp point parse error");
877         return false;
878     };
879     let cid = ctx.id;
880     if ctx.is_master {
881         let result = forward_tcp_accept(ctx, port).await;
882         return result.is_ok();
883     } else {
884         ForwardContextMap::update(ctx.id, ctx.clone()).await;
885         utils::spawn(async move { daemon_connect_tcp(cid, port).await });
886     }
887     true
888 }
889 
890 #[cfg(not(target_os = "windows"))]
891 async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
892     let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
893     ctx.fd = fd;
894     let name: Vec<u8> = path.as_bytes().to_vec();
895     let mut socket_name = vec![0_u8; name.len() + 1];
896     socket_name[0] = b'\0';
897     name.iter().enumerate().for_each(|(i, e)| {
898         socket_name[i + 1] = *e;
899     });
900     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
901     ForwardContextMap::update(ctx.id, ctx.clone()).await;
902     let cid = ctx.id;
903     if let Ok(addr_obj) = &addr {
904         let ret = UdsServer::wrap_bind(fd, addr_obj);
905         if ret.is_err() {
906             hdctransfer::echo_client(
907                 ctx.session_id,
908                 ctx.channel_id,
909                 "Unix pipe bind failed",
910                 MessageLevel::Fail,
911             )
912             .await;
913             crate::error!("bind fail");
914             return false;
915         }
916         let ret = UdsServer::wrap_listen(fd);
917         if ret < 0 {
918             hdctransfer::echo_client(
919                 ctx.session_id,
920                 ctx.channel_id,
921                 "Unix pipe listen failed",
922                 MessageLevel::Fail,
923             )
924             .await;
925             crate::error!("listen fail");
926             return false;
927         }
928         utils::spawn(async move {
929             loop {
930                 let client_fd = UdsServer::wrap_accept(fd);
931                 if client_fd == -1 {
932                     break;
933                 }
934                 on_accept(cid).await;
935             }
936         });
937     }
938     true
939 }
940 
/// Resolves `path` to its canonical absolute form and returns it as a
/// `String`.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message
/// `"forward canonicalize failed"` when the path cannot be resolved or the
/// resolved path is not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    let failure = || Error::new(ErrorKind::Other, "forward canonicalize failed");
    let resolved = fs::canonicalize(path).map_err(|_| failure())?;
    resolved.to_str().map(str::to_owned).ok_or_else(failure)
}
950 
/// Windows stub for device-node forwarding: nothing is set up and the call
/// always returns `false`.
#[cfg(target_os = "windows")]
pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
    false
}
955 
956 #[cfg(not(target_os = "windows"))]
957 pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
958     let s_node_cfg = ctx.local_args[1].clone();
959     let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
960         crate::error!("Open unix-dev failed");
961         return false;
962     };
963     ctx.dev_path = resolv_path.clone();
964     crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
965     let thread_path_ref = Arc::new(Mutex::new(resolv_path));
966     if !send_active_master(ctx).await {
967         crate::error!("send active_master return failed ctx={:?}", ctx);
968         return false;
969     }
970     let session = ctx.session_id;
971     let channel = ctx.channel_id;
972     let cid = ctx.id;
973 
974     ForwardContextMap::update(ctx.id, ctx.clone()).await;
975     utils::spawn(async move {
976         loop {
977             let path = thread_path_ref.lock().await;
978             let Ok(mut file) = File::open(&*path) else {
979                 crate::error!("open {} failed.", *path);
980                 break;
981             };
982             let mut total = Vec::new();
983             let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
984                 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
985             let Ok(read_len) = file.read(&mut buf[4..]) else {
986                 crate::error!("read {} failed.", *path);
987                 break;
988             };
989             if read_len == 0 {
990                 free_context(cid, true).await;
991                 break;
992             }
993             total.append(&mut buf[0..read_len].to_vec());
994             send_to_task(
995                 session,
996                 channel,
997                 HdcCommand::ForwardData,
998                 &total,
999                 read_len,
1000                 cid,
1001             )
1002             .await;
1003         }
1004     });
1005     true
1006 }
1007 
1008 #[cfg(not(feature = "host"))]
get_pidnull1009 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
1010     match forward_type == ForwardType::Jdwp {
1011         true => parameter.parse::<u32>().unwrap_or_else(|e| {
1012             crate::error!("Jdwp get pid err :{:?}", e);
1013             0_u32
1014         }),
1015         false => {
1016             let params: Vec<&str> = parameter.split('@').collect();
1017             params[0].parse::<u32>().unwrap_or_else(|e| {
1018                 crate::error!("get pid err :{:?}", e);
1019                 0_u32
1020             })
1021         }
1022     }
1023 }
1024 
/// Host-build stub: no JDWP/Ark endpoint is set up on the host side, so this
/// logs an informational message and returns `false`.
#[cfg(feature = "host")]
pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
    crate::info!("host not setup_jdwp _point");
    false
}
1030 
1031 #[cfg(not(feature = "host"))]
1032 pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
1033     let local_args = ctx.local_args[1].clone();
1034     let parameter = local_args.as_str();
1035     let style = &ctx.forward_type;
1036     let pid = get_pid(parameter, style.clone());
1037     let cid = ctx.id;
1038     let session = ctx.session_id;
1039     let channel = ctx.channel_id;
1040     if pid == 0 {
1041         crate::error!("setup jdwp point get pid is 0");
1042         return false;
1043     }
1044 
1045     let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1046     if result.is_err() {
1047         crate::error!("wrap socketpair failed");
1048         return false;
1049     }
1050     let mut target_fd = 0;
1051     let mut local_fd = 0;
1052     if let Ok((fd0, fd1)) = result {
1053         crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1054         local_fd = fd0;
1055         target_fd = fd1;
1056         ctx.fd = local_fd;
1057         ctx.target_fd = target_fd;
1058         target_fd = fd1;
1059     }
1060 
1061     utils::spawn(async move {
1062         loop {
1063             let result = ylong_runtime::spawn_blocking(move || {
1064                 let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
1065                 let size = UdsServer::wrap_read(local_fd, &mut buffer);
1066                 (size, buffer)
1067             })
1068             .await;
1069             let (size, buffer) = match result {
1070                 Ok((size, _)) if size < 0 => {
1071                     crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
1072                     free_context(cid, true).await;
1073                     break;
1074                 }
1075                 Ok((0, _)) => {
1076                     ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1077                     continue;
1078                 }
1079                 Ok((size, buffer)) => (size, buffer),
1080                 Err(err) => {
1081                     crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
1082                     free_context(cid, true).await;
1083                     break;
1084                 }
1085             };
1086             send_to_task(
1087                 session,
1088                 channel,
1089                 HdcCommand::ForwardData,
1090                 &buffer[0..size as usize],
1091                 size as usize,
1092                 cid,
1093             )
1094             .await;
1095         }
1096     });
1097 
1098     let jdwp = Jdwp::get_instance();
1099     let mut param = ctx.local_args[0].clone();
1100     param.push(':');
1101     param.push_str(parameter);
1102 
1103     let ret = jdwp
1104         .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
1105         .await;
1106     if !ret {
1107         crate::error!("not found pid:{:?}", pid);
1108         hdctransfer::echo_client(
1109             session,
1110             channel,
1111             format!("fport fail:pid not found:{}", pid).as_str(),
1112             MessageLevel::Fail,
1113         )
1114         .await;
1115         task_finish(session, channel).await;
1116         return false;
1117     }
1118 
1119     let vec_none = Vec::<u8>::new();
1120     send_to_task(
1121         session,
1122         channel,
1123         HdcCommand::ForwardActiveMaster, // 04
1124         &vec_none,
1125         0,
1126         cid,
1127     )
1128     .await;
1129     crate::info!("setup_jdwp_ point return true");
1130     true
1131 }
1132 
/// Finishes the transfer task of the given session/channel by delegating to
/// `transfer_task_finish`. Note that the helper takes the channel id first
/// and the session id second.
async fn task_finish(session_id: u32, channel_id: u32) {
    transfer_task_finish(channel_id, session_id).await;
}
1136 
1137 #[cfg(not(target_os = "windows"))]
1138 pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
1139     let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
1140     let mut socket_name = vec![0_u8; name.len() + 1];
1141     socket_name[0] = b'\0';
1142     name.iter().enumerate().for_each(|(i, e)| {
1143         socket_name[i + 1] = *e;
1144     });
1145     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1146     if let Ok(addr_obj) = &addr {
1147         let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
1148         if ret.is_err() {
1149             hdctransfer::echo_client(
1150                 ctx.session_id,
1151                 ctx.channel_id,
1152                 "localabstract connect fail",
1153                 MessageLevel::Fail,
1154             )
1155             .await;
1156             free_context(ctx.id, true).await;
1157             return;
1158         }
1159         send_active_master(ctx).await;
1160         read_data_to_forward(ctx).await;
1161     }
1162 }
1163 
/// Windows stub for Unix-socket based forwarding: nothing is set up and the
/// call always returns `false`.
#[cfg(target_os = "windows")]
pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool {
    false
}
1168 
1169 #[cfg(not(target_os = "windows"))]
1170 pub async fn setup_file_point(ctx: &mut ContextForward) -> bool {
1171     let s_node_cfg = ctx.local_args[1].clone();
1172     if ctx.is_master {
1173         if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem
1174         {
1175             let _ = fs::remove_file(s_node_cfg.clone());
1176         }
1177         if !server_socket_bind_listen(ctx, s_node_cfg).await {
1178             crate::error!("server socket bind listen failed id={:?}", ctx.id);
1179             task_finish(ctx.session_id, ctx.channel_id).await;
1180             return false;
1181         }
1182     } else {
1183         if ctx.fd <= 0 {
1184             crate::info!("setup_file _point fd: {:?}", ctx.fd);
1185             if ctx.forward_type == ForwardType::Abstract {
1186                 ctx.fd = UdsClient::wrap_socket(AF_LOCAL);
1187                 unsafe {
1188                     libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC);
1189                 }
1190             } else {
1191                 ctx.fd = UdsClient::wrap_socket(AF_UNIX);
1192             }
1193         }
1194         ForwardContextMap::update(ctx.id, ctx.clone()).await;
1195         daemon_connect_pipe(ctx).await;
1196     }
1197     true
1198 }
1199 
1200 pub async fn setup_point(ctx: &mut ContextForward) -> bool {
1201     if !detech_forward_type(ctx).await {
1202         crate::error!("forward type is not true");
1203         return false;
1204     }
1205 
1206     if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp {
1207         ctx.last_error = String::from("Not support forward-type");
1208         return false;
1209     }
1210 
1211     let mut ret = false;
1212     match ctx.forward_type {
1213         ForwardType::Tcp => {
1214             ret = setup_tcp_point(ctx).await;
1215         }
1216         ForwardType::Device => {
1217             if !cfg!(target_os = "windows") {
1218                 ret = setup_device_point(ctx).await;
1219             }
1220         }
1221         ForwardType::Jdwp | ForwardType::Ark => {
1222             crate::info!("setup point ark case");
1223             if !cfg!(feature = "host") {
1224                 ret = setup_jdwp_point(ctx).await;
1225             }
1226         }
1227         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1228             if !cfg!(target_os = "windows") {
1229                 ret = setup_file_point(ctx).await;
1230             }
1231         }
1232     };
1233     ForwardContextMap::update(ctx.id, ctx.clone()).await;
1234     update_context_to_task(ctx.session_id, ctx.channel_id, ctx).await;
1235     ret
1236 }
1237 
1238 pub async fn send_to_task(
1239     session_id: u32,
1240     channel_id: u32,
1241     command: HdcCommand,
1242     buf_ptr: &[u8],
1243     buf_size: usize,
1244     cid: u32,
1245 ) -> bool {
1246     if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1247         crate::error!("send task buf_size oversize");
1248         return false;
1249     }
1250 
1251     let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1252     new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1253     let file_check_message = TaskMessage {
1254         channel_id,
1255         command,
1256         payload: new_buf,
1257     };
1258     transfer::put(session_id, file_check_message).await;
1259     true
1260 }
1261 
/// Reads the context id stored big-endian in the first four bytes of
/// `_payload`.
///
/// Payloads shorter than four bytes yield `0` instead of panicking, since
/// the payload originates from the remote peer.
pub fn get_cid(_payload: &[u8]) -> u32 {
    match _payload {
        [b0, b1, b2, b3, ..] => u32::from_be_bytes([*b0, *b1, *b2, *b3]),
        _ => 0,
    }
}
1268 
1269 pub async fn send_active_master(ctx: &mut ContextForward) -> bool {
1270     if ctx.check_order {
1271         let flag = [0u8; 1];
1272         send_to_task(
1273             ctx.session_id,
1274             ctx.channel_id,
1275             HdcCommand::ForwardCheckResult,
1276             &flag,
1277             1,
1278             ctx.id,
1279         )
1280         .await;
1281         free_context(ctx.id, false).await;
1282         return true;
1283     }
1284     if !send_to_task(
1285         ctx.session_id,
1286         ctx.channel_id,
1287         HdcCommand::ForwardActiveMaster,
1288         &Vec::<u8>::new(),
1289         0,
1290         ctx.id,
1291     )
1292     .await
1293     {
1294         free_context(ctx.id, true).await;
1295         return false;
1296     }
1297     true
1298 }
1299 
forward_parse_cmdnull1300 pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool {
1301     let command = context_forward.task_command.clone();
1302     let result = Base::split_command_to_args(&command);
1303     let argv = result.0;
1304     let argc = result.1;
1305 
1306     if argc < ARG_COUNT2 {
1307         crate::error!("argc < 2 parse is failed.");
1308         context_forward.last_error = "Too few arguments.".to_string();
1309         return false;
1310     }
1311     if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1312         crate::error!("parse's length is flase.");
1313         context_forward.last_error = "Some argument too long.".to_string();
1314         return false;
1315     }
1316     if !check_node_info(&argv[0], &mut context_forward.local_args) {
1317         crate::error!("check argv[0] node info is flase.");
1318         context_forward.last_error = "Arguments parsing failed.".to_string();
1319         return false;
1320     }
1321     if !check_node_info(&argv[1], &mut context_forward.remote_args) {
1322         crate::error!("check argv[1] node info is flase.");
1323         context_forward.last_error = "Arguments parsing failed.".to_string();
1324         return false;
1325     }
1326     context_forward.remote_parameters = argv[1].clone();
1327     true
1328 }
1329 
1330 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1331     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1332         crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}"
1333         );
1334         return false;
1335     };
1336     let task: &mut HdcForward = &mut task.clone();
1337     let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1338         crate::error!("cmd argv is not int utf8");
1339         return false;
1340     };
1341     let mut context_forward = malloc_context(session_id, channel_id, true).await;
1342     context_forward.task_command = command.clone();
1343 
1344     if !forward_parse_cmd(&mut context_forward) {
1345         task.context_forward = context_forward.clone();
1346         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1347         return false;
1348     }
1349     if !setup_point(&mut context_forward).await {
1350         task.context_forward = context_forward.clone();
1351         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1352         return false;
1353     }
1354 
1355     let wake_up_message = TaskMessage {
1356         channel_id,
1357         command: HdcCommand::KernelWakeupSlavetask,
1358         payload: Vec::<u8>::new(),
1359     };
1360     transfer::put(context_forward.session_id, wake_up_message).await;
1361 
1362     let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec();
1363     let mut new_buf = vec![0_u8; buf_string.len() + 9];
1364     buf_string.iter().enumerate().for_each(|(i, e)| {
1365         new_buf[i + 8] = *e;
1366     });
1367     send_to_task(
1368         context_forward.session_id,
1369         context_forward.channel_id,
1370         HdcCommand::ForwardCheck,
1371         &new_buf,
1372         buf_string.len() + 9,
1373         context_forward.id,
1374     )
1375     .await;
1376     true
1377 }
1378 
1379 pub async fn slave_connect(
1380     session_id: u32,
1381     channel_id: u32,
1382     payload: &[u8],
1383     check_order: bool,
1384 ) -> bool {
1385     let mut context_forward = malloc_context(session_id, channel_id, false).await;
1386     context_forward.check_order = check_order;
1387     if let Ok((content, id)) = filter_command(payload) {
1388         let content = &content[8..].trim_end_matches('\0').to_string();
1389         context_forward.task_command = content.clone();
1390         if !check_node_info(content, &mut context_forward.local_args) {
1391             crate::error!("check local args is false");
1392             return false;
1393         }
1394         context_forward.id = id;
1395         ForwardContextMap::update(id, context_forward.clone()).await;
1396     }
1397 
1398     if !check_order {
1399         if !setup_point(&mut context_forward).await {
1400             crate::error!("setup point return false, free context");
1401             free_context(context_forward.id, true).await;
1402             ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1403             return false;
1404         }
1405     } else {
1406         send_active_master(&mut context_forward).await;
1407     }
1408     ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1409     true
1410 }
1411 
1412 pub async fn read_data_to_forward(ctx: &mut ContextForward) {
1413     let cid = ctx.id;
1414     let session = ctx.session_id;
1415     let channel = ctx.channel_id;
1416     match ctx.forward_type {
1417         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1418             #[cfg(not(target_os = "windows"))]
1419             {
1420                 let fd_temp = ctx.fd;
1421                 utils::spawn(async move {
1422                     deamon_read_socket_msg(session, channel, fd_temp, cid).await
1423                 });
1424             }
1425         }
1426         ForwardType::Device => {
1427             #[cfg(not(target_os = "windows"))]
1428             setup_device_point(ctx).await;
1429         }
1430         _ => {}
1431     }
1432 }
1433 
/// Splits a forward payload into its text content and context id.
///
/// The first four bytes hold the id in big-endian order; the remaining bytes
/// must be valid UTF-8 and are returned as the content.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error (`"filter command failure"`) when the
/// payload is shorter than four bytes or the content is not valid UTF-8.
pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    let failure = || Error::new(ErrorKind::Other, "filter command failure");

    // Pattern matching keeps short payloads on the error path instead of
    // panicking on an out-of-range slice.
    let (id_bytes, body) = match _payload {
        [b0, b1, b2, b3, rest @ ..] => ([*b0, *b1, *b2, *b3], rest),
        _ => return Err(failure()),
    };

    // Validate in place; only a valid payload is copied into a `String`.
    let content = std::str::from_utf8(body).map_err(|_| failure())?;
    Ok((content.to_owned(), u32::from_be_bytes(id_bytes)))
}
1445 
1446 pub async fn dev_write_bufer(path: String, content: Vec<u8>) {
1447     utils::spawn(async move {
1448         let open_result = OpenOptions::new()
1449             .write(true)
1450             .create(true)
1451             .open(path.clone());
1452 
1453         match open_result {
1454             Ok(mut file) => {
1455                 let write_result = file.write_all(content.as_slice());
1456                 match write_result {
1457                     Ok(()) => {}
1458                     Err(e) => {
1459                         crate::error!("dev write bufer to file fail:{:#?}", e);
1460                     }
1461                 }
1462             }
1463             Err(e) => {
1464                 crate::error!("dev write bufer fail:{:#?}", e);
1465             }
1466         }
1467     });
1468 }
1469 
1470 pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool {
1471     if ctx.forward_type == ForwardType::Tcp {
1472         return TcpWriteStreamMap::write(ctx.id, content).await;
1473     } else if ctx.forward_type == ForwardType::Device {
1474         let path_ref = ctx.dev_path.clone();
1475         if path_ref.is_empty() {
1476             crate::error!(
1477                 "write_forward_bufer get dev_path  is failed ctx.id = {:#?}",
1478                 ctx.id
1479             );
1480             return false;
1481         }
1482         dev_write_bufer(path_ref, content).await;
1483     } else {
1484         #[cfg(not(target_os = "windows"))]
1485         {
1486             let fd = ctx.fd;
1487             if UdsClient::wrap_send(fd, &content) < 0 {
1488                 crate::info!("write forward bufer failed. fd = {fd}");
1489                 return false;
1490             }
1491         }
1492     }
1493     true
1494 }
1495 
1496 #[allow(unused)]
1497 pub async fn malloc_context(
1498     session_id: u32,
1499     channel_id: u32,
1500     master_slave: bool,
1501 ) -> ContextForward {
1502     let mut ctx = ContextForward {
1503         ..Default::default()
1504     };
1505     ctx.id = utils::get_current_time() as u32;
1506     ctx.session_id = session_id;
1507     ctx.channel_id = channel_id;
1508     ctx.is_master = master_slave;
1509     crate::info!("malloc_context success id = {:#?}", ctx.id);
1510     ForwardContextMap::update(ctx.id, ctx.clone()).await;
1511     ctx
1512 }
1513 
1514 pub async fn forward_command_dispatch(
1515     session_id: u32,
1516     channel_id: u32,
1517     command: HdcCommand,
1518     _payload: &[u8],
1519 ) -> bool {
1520     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1521         crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1522         );
1523         return false;
1524     };
1525     let task: &mut HdcForward = &mut task.clone();
1526     let mut ret: bool = true;
1527     let cid = get_cid(_payload);
1528     let send_msg = _payload[4..].to_vec();
1529 
1530     let Some(context_forward) = ForwardContextMap::get(cid).await else {
1531         crate::error!("forward command dispatch get context is none cid={cid}");
1532         return false;
1533     };
1534     let ctx = &mut context_forward.clone();
1535     match command {
1536         HdcCommand::ForwardCheckResult => {
1537             ret = check_command(ctx, _payload, task.server_or_daemon).await;
1538         }
1539         HdcCommand::ForwardData => {
1540             ret = write_forward_bufer(ctx, send_msg).await;
1541         }
1542         HdcCommand::ForwardFreeContext => {
1543             free_context(ctx.id, false).await;
1544         }
1545         HdcCommand::ForwardActiveMaster => {
1546             read_data_to_forward(ctx).await;
1547             ret = true;
1548         }
1549         _ => {
1550             ret = false;
1551         }
1552     }
1553     ret
1554 }
1555 
1556 async fn print_error_info(session_id: u32, channel_id: u32) {
1557     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1558         crate::error!(
1559             "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}",
1560             session_id,
1561             channel_id
1562         );
1563         return;
1564     };
1565     let task = &mut task.clone();
1566     let ctx = &task.context_forward;
1567     let mut echo_content = ctx.last_error.clone();
1568 
1569     if echo_content.is_empty() {
1570         echo_content = "Forward parament failed".to_string();
1571     }
1572 
1573     hdctransfer::echo_client(
1574         session_id,
1575         channel_id,
1576         echo_content.as_str(),
1577         MessageLevel::Fail,
1578     )
1579     .await;
1580 }
1581 
1582 pub async fn command_dispatch(
1583     session_id: u32,
1584     channel_id: u32,
1585     command: HdcCommand,
1586     payload: &[u8],
1587     _payload_size: u16,
1588 ) -> bool {
1589     if command != HdcCommand::ForwardData {
1590         crate::info!("command_dispatch command recv: {:?}", command);
1591     }
1592 
1593     let ret = match command {
1594         HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await,
1595         HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await,
1596         HdcCommand::ForwardActiveSlave => {
1597             slave_connect(session_id, channel_id, payload, false).await
1598         }
1599         _ => forward_command_dispatch(session_id, channel_id, command, payload).await,
1600     };
1601     if !ret {
1602         print_error_info(session_id, channel_id).await;
1603         task_finish(session_id, channel_id).await;
1604         return false;
1605     }
1606     ret
1607 }
1608