1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15//! forward
16#![allow(missing_docs)]
17#[cfg(feature = "host")]
18extern crate ylong_runtime_static as ylong_runtime;
19
20#[cfg(not(feature = "host"))]
21use libc::SOCK_STREAM;
22#[cfg(not(target_os = "windows"))]
23use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24use std::collections::HashMap;
25#[cfg(not(target_os = "windows"))]
26use std::fs::{self, File, OpenOptions};
27#[cfg(not(target_os = "windows"))]
28use std::io::{self, Error, ErrorKind, Read, Write};
29use ylong_runtime::sync::{Mutex, RwLock};
30
31use crate::common::base::Base;
32use crate::common::hdctransfer::transfer_task_finish;
33use crate::common::hdctransfer::{self, HdcTransferBase};
34#[cfg(not(feature = "host"))]
35use crate::common::jdwp::Jdwp;
36#[cfg(not(target_os = "windows"))]
37use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
38use crate::config::HdcCommand;
39use crate::config::MessageLevel;
40use crate::config::TaskMessage;
41use crate::transfer;
42#[allow(unused)]
43use crate::utils::hdc_log::*;
44use crate::{config, utils};
45use std::mem::MaybeUninit;
46use std::sync::Arc;
47use std::sync::Once;
48#[cfg(not(feature = "host"))]
49use std::time::Duration;
50use ylong_runtime::io::AsyncReadExt;
51use ylong_runtime::io::AsyncWriteExt;
52use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
53use ylong_runtime::task::JoinHandle;
54
55pub const ARG_COUNT2: u32 = 2;
56pub const BUF_SIZE_SMALL: usize = 256;
57pub const SOCKET_BUFFER_SIZE: usize = 65535;
58pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
59pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
60
61#[cfg(feature = "host")]
62#[derive(Clone, Debug)]
63pub struct HdcForwardInfo {
64    pub session_id: u32,
65    pub channel_id: u32,
66    pub forward_direction: bool,
67    pub task_string: String,
68    pub connect_key: String,
69}
70
71#[cfg(feature = "host")]
72impl HdcForwardInfo {
73    fn new(
74        session_id: u32,
75        channel_id: u32,
76        forward_direction: bool,
77        task_string: String,
78        connect_key: String,
79    ) -> Self {
80        Self {
81            session_id,
82            channel_id,
83            forward_direction,
84            task_string,
85            connect_key,
86        }
87    }
88}
89
90#[derive(Default, Eq, PartialEq, Clone, Debug)]
91enum ForwardType {
92    #[default]
93    Tcp = 0,
94    Device,
95    Abstract,
96    FileSystem,
97    Jdwp,
98    Ark,
99    Reserved,
100}
101
102#[derive(Debug, Default, PartialEq, Eq, Clone)]
103pub struct ContextForward {
104    session_id: u32,
105    channel_id: u32,
106    check_order: bool,
107    is_master: bool,
108    id: u32,
109    fd: i32,
110    target_fd: i32,
111    forward_type: ForwardType,
112    local_args: Vec<String>,
113    remote_args: Vec<String>,
114    task_command: String,
115    last_error: String,
116    remote_parameters: String,
117    dev_path: String,
118}
119
120#[cfg(feature = "host")]
121type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
122#[cfg(feature = "host")]
123type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
124#[cfg(feature = "host")]
125pub struct HdcForwardInfoMap {}
126#[cfg(feature = "host")]
127impl HdcForwardInfoMap {
128    fn get_instance() -> HdcForwardInfoMap_ {
129        static mut MAP: Option<HdcForwardInfoMap_> = None;
130        unsafe {
131            MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
132                .clone()
133        }
134    }
135
136    async fn put(forward_info: HdcForwardInfo) {
137        let instance = Self::get_instance();
138        let mut map = instance.lock().await;
139        map.insert(
140            forward_info.task_string.clone(),
141            Arc::new(Mutex::new(forward_info)),
142        );
143    }
144
145    pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
146        let instance = Self::get_instance();
147        let map = instance.lock().await;
148        let mut result = Vec::new();
149        for (_key, value) in map.iter() {
150            let info = value.lock().await;
151            result.push((*info).clone());
152        }
153        result
154    }
155
156    pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
157        crate::info!(
158            "remove_forward task_string:{}, direction:{}",
159            task_string,
160            forward_direction
161        );
162        let instance = Self::get_instance();
163        let map = instance.lock().await;
164        let mut remove_key = String::new();
165        let prefix = if forward_direction {
166            "1|".to_string()
167        } else {
168            "0|".to_string()
169        };
170        let mut task_string1 = prefix;
171        task_string1.push_str(task_string.as_str());
172        for (key, value) in map.iter() {
173            let info = value.lock().await;
174            if info.task_string.contains(&task_string1)
175                && info.forward_direction == forward_direction
176            {
177                remove_key = (*key.clone()).to_string();
178                break;
179            }
180        }
181        drop(map);
182        if remove_key.is_empty() {
183            return false;
184        }
185
186        let mut map = instance.lock().await;
187        let result = map.remove(&remove_key);
188        result.is_some()
189    }
190}
191
192type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
193type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
194pub struct TcpWriteStreamMap {}
195impl TcpWriteStreamMap {
196    fn get_instance() -> TcpWriterMap_ {
197        static mut TCP_MAP: Option<TcpWriterMap_> = None;
198        unsafe {
199            TCP_MAP
200                .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
201                .clone()
202        }
203    }
204    #[allow(unused)]
205    async fn put(id: u32, wr: SplitWriteHalf) {
206        let instance = Self::get_instance();
207        let mut map = instance.write().await;
208        let arc_wr = Arc::new(Mutex::new(wr));
209        map.insert(id, arc_wr);
210    }
211    #[allow(unused)]
212    async fn write(id: u32, data: Vec<u8>) -> bool {
213        let arc_map = Self::get_instance();
214        let map = arc_map.write().await;
215        let Some(arc_wr) = map.get(&id) else {
216            crate::error!("TcpWriteStreamMap failed to get id {:#?}", id);
217            return false;
218        };
219        let mut wr = arc_wr.lock().await;
220        let write_result = wr.write_all(data.as_slice()).await;
221        if write_result.is_err() {
222            crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id);
223        }
224        true
225    }
226
227    pub async fn end(id: u32) {
228        let instance = Self::get_instance();
229        let mut map = instance.write().await;
230        if let Some(arc_wr) = map.remove(&id) {
231            let mut wr = arc_wr.lock().await;
232            let _ = wr.shutdown().await;
233        }
234    }
235}
236
237type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
238type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
239pub struct TcpListenerMap {}
240impl TcpListenerMap {
241    fn get_instance() -> TcpListenerMap_ {
242        static mut TCP_MAP: Option<TcpListenerMap_> = None;
243        unsafe {
244            TCP_MAP
245                .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
246                .clone()
247        }
248    }
249    #[allow(unused)]
250    async fn put(id: u32, listener: JoinHandle<()>) {
251        let instance = Self::get_instance();
252        let mut map = instance.write().await;
253        let arc_listener = Arc::new(Mutex::new(listener));
254        map.insert(id, arc_listener);
255        crate::info!("forward tcp put listener id = {id}");
256    }
257
258    pub async fn end(id: u32) {
259        let instance = Self::get_instance();
260        let mut map = instance.write().await;
261        if let Some(arc_listener) = map.remove(&id) {
262            let join_handle = arc_listener.lock().await;
263            join_handle.cancel();
264        }
265    }
266}
267
268type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>;
269pub struct ForwardContextMap {}
270impl ForwardContextMap {
271    fn get_instance() -> &'static MapContextForward_ {
272        static mut CONTEXT_MAP: MaybeUninit<MapContextForward_> = MaybeUninit::uninit();
273        static ONCE: Once = Once::new();
274        unsafe {
275            ONCE.call_once(|| {
276                    CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new())));
277                }
278            );
279            &*CONTEXT_MAP.as_ptr()
280        }
281    }
282
283    pub async fn update(cid: u32, value: ContextForward) {
284        let map = Self::get_instance();
285        let mut map = map.lock().await;
286        map.insert(cid, value.clone());
287    }
288
289    pub async fn remove(cid: u32) {
290        crate::info!("ContextForward remove, cid:{}", cid);
291        let map = Self::get_instance();
292        let mut map = map.lock().await;
293        let _ = map.remove(&cid);
294    }
295
296    pub async fn get(cid: u32) -> Option<ContextForward> {
297        let arc = Self::get_instance();
298        let map = arc.lock().await;
299        let task = map.get(&cid);
300        match task {
301            Some(task) => Some(task.clone()),
302            None => {
303                crate::debug!("ContextForward result:is none,cid={:#?}", cid,);
304                Option::None
305            }
306        }
307    }
308}
309
310type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
311pub struct ForwardTaskMap {}
312impl ForwardTaskMap {
313    fn get_instance() -> MapForward_ {
314        static mut FORWARD_MAP: Option<MapForward_> = None;
315        unsafe {
316            FORWARD_MAP
317                .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
318                .clone()
319        }
320    }
321
322    pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
323        let map = Self::get_instance();
324        let mut map = map.lock().await;
325        map.insert((session_id, channel_id), value.clone());
326    }
327
328    pub async fn remove(session_id: u32, channel_id: u32) {
329        crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
330        let map = Self::get_instance();
331        let mut map = map.lock().await;
332        let _ = map.remove(&(session_id, channel_id));
333    }
334
335    pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
336        let arc = Self::get_instance();
337        let map = arc.lock().await;
338        let task = map.get(&(session_id, channel_id));
339        match task {
340            Some(task) => Some(task.clone()),
341            None => {
342                crate::debug!(
343                    "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}",
344                    session_id,
345                    channel_id
346                );
347                Option::None
348            }
349        }
350    }
351
352    pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
353        let arc = Self::get_instance();
354        let map = arc.lock().await;
355        for ((_session_id, _channel_id), value) in map.iter() {
356            if *_session_id == session_id && task_string.contains(&value.task_command) {
357                return Some(*_channel_id);
358            }
359        }
360        None
361    }
362
363    pub async fn clear(session_id: u32) {
364        let arc = Self::get_instance();
365        let mut channel_list = Vec::new();
366        {
367            let map = arc.lock().await;
368            if map.is_empty() {
369                return;
370            }
371            for (&key, _) in map.iter() {
372                if key.0 == session_id {
373                    let id = key;
374                    channel_list.push(id);
375                }
376            }
377        }
378        for id in channel_list {
379            free_channel_task(id.0, id.1).await;
380        }
381    }
382
383    pub async fn dump_task() -> String {
384        let arc = Self::get_instance();
385        let map = arc.lock().await;
386        let mut result = String::new();
387        for (_id, forward_task) in map.iter() {
388            let forward_type = match forward_task.remote_args.len() {
389                0 => "fport".to_string(),
390                2 => "rport".to_string(),
391                _ => "unknown".to_string(),
392            };
393            let first_args = match forward_task.remote_args.len() {
394                0 => "unknown".to_string(),
395                2 => format!(
396                    "{}:{}",
397                    forward_task.local_args[0], forward_task.local_args[1]
398                ),
399                _ => "unknown".to_string(),
400            };
401            let second_args = match forward_task.remote_args.len() {
402                0 => format!(
403                    "{}:{}",
404                    forward_task.local_args[0], forward_task.local_args[1]
405                ),
406                2 => format!(
407                    "{}:{}",
408                    forward_task.remote_args[0], forward_task.remote_args[1]
409                ),
410                _ => "unknown".to_string(),
411            };
412            result.push_str(&format!(
413                "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
414                forward_task.session_id,
415                forward_task.channel_id,
416                forward_type,
417                first_args,
418                second_args
419            ));
420        }
421        result
422    }
423}
424
425pub async fn free_channel_task(session_id: u32, channel_id: u32) {
426    let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
427        return;
428    };
429    let task = &mut task.clone();
430    let cid = task.context_forward.id;
431    crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}, cid:{cid}");
432
433    match task.forward_type {
434        ForwardType::Tcp => {
435            TcpWriteStreamMap::end(cid).await;
436            TcpListenerMap::end(channel_id).await;
437        }
438        ForwardType::Jdwp | ForwardType::Ark => {
439            TcpWriteStreamMap::end(cid).await;
440            let ret = unsafe { libc::close(task.context_forward.fd) };
441            crate::debug!(
442                "close context_forward fd, ret={}, session_id={}, channel_id={}",
443                ret,
444                session_id,
445                channel_id,
446            );
447            let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) };
448            crate::debug!(
449                "close context_forward target fd, ret={}, session_id={}, channel_id={}",
450                target_fd_ret,
451                session_id,
452                channel_id,
453            );
454            TcpListenerMap::end(channel_id).await;
455        }
456        ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
457            #[cfg(not(target_os = "windows"))]
458            UdsServer::wrap_close(task.context_forward.fd);
459        }
460        ForwardType::Device => {
461            return;
462        }
463    }
464    ForwardTaskMap::remove(session_id, channel_id).await;
465}
466
467pub async fn stop_task(session_id: u32) {
468    ForwardTaskMap::clear(session_id).await;
469}
470
471pub async fn dump_task() -> String {
472    ForwardTaskMap::dump_task().await
473}
474
475#[derive(Debug, Default, Clone, PartialEq, Eq)]
476pub struct HdcForward {
477    session_id: u32,
478    channel_id: u32,
479    server_or_daemon: bool,
480    local_args: Vec<String>,
481    remote_args: Vec<String>,
482    task_command: String,
483    forward_type: ForwardType,
484    context_forward: ContextForward,
485    pub transfer: HdcTransferBase,
486}
487
488impl HdcForward {
489    pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
490        Self {
491            session_id,
492            channel_id,
493            server_or_daemon,
494            local_args: Default::default(),
495            remote_args: Default::default(),
496            task_command: Default::default(),
497            forward_type: Default::default(),
498            context_forward: Default::default(),
499            transfer: HdcTransferBase::new(session_id, channel_id),
500        }
501    }
502}
503
504pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
505    crate::info!("check cmd args is : {:#?}", value);
506    if !value.contains(':') {
507        return false;
508    }
509    let array = value.split(':').collect::<Vec<&str>>();
510    if array[0] == "tcp" {
511        if array[1].len() > config::MAX_PORT_LEN {
512            crate::error!(
513                "forward port = {:#?} it'slength is wrong, can not more than five",
514                array[1]
515            );
516            return false;
517        }
518
519        match array[1].parse::<u32>() {
520            Ok(port) => {
521                if port == 0 || port > config::MAX_PORT_NUM {
522                    crate::error!("port can not greater than: 65535");
523                    return false;
524                }
525            }
526            Err(_) => {
527                crate::error!("port must is int type, port is: {:#?}", array[1]);
528                return false;
529            }
530        }
531    }
532    for item in array.iter() {
533        arg.push(String::from(item.to_owned()));
534    }
535    true
536}
537
538#[cfg(feature = "host")]
539pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
540    crate::info!("on_forward_success");
541    let channel_id = task_message.channel_id;
542    let payload = task_message.payload;
543    let forward_direction = payload[0] == b'1';
544    let connect_key = "unknow key".to_string();
545    let task_string = String::from_utf8(payload);
546    match task_string {
547        Ok(task_string) => {
548            let info = HdcForwardInfo::new(
549                session_id,
550                channel_id,
551                forward_direction,
552                task_string,
553                connect_key,
554            );
555            HdcForwardInfoMap::put(info).await;
556        }
557        Err(err) => {
558            crate::error!("payload to String failed. {err}");
559        }
560    }
561    transfer::TcpMap::end(task_message.channel_id).await;
562    Ok(())
563}
564
565pub async fn check_command(
566    ctx: &mut ContextForward,
567    _payload: &[u8],
568    server_or_daemon: bool,
569) -> bool {
570    let channel_id = ctx.channel_id;
571    if !_payload.is_empty() {
572        hdctransfer::echo_client(
573            ctx.session_id,
574            channel_id,
575            "Forwardport result:OK",
576            MessageLevel::Ok,
577        )
578        .await;
579        let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;
580
581        let mut command_string = vec![0_u8; map_info.len() + 1];
582        map_info
583            .as_bytes()
584            .to_vec()
585            .iter()
586            .enumerate()
587            .for_each(|(i, e)| {
588                command_string[i] = *e;
589            });
590        let forward_success_message = TaskMessage {
591            channel_id,
592            command: HdcCommand::ForwardSuccess,
593            payload: command_string,
594        };
595        #[cfg(feature = "host")]
596        {
597            let _ = on_forward_success(forward_success_message, ctx.session_id).await;
598        }
599        #[cfg(not(feature = "host"))]
600        {
601            transfer::put(ctx.session_id, forward_success_message).await;
602        }
603    } else {
604        hdctransfer::echo_client(
605            ctx.session_id,
606            channel_id,
607            "Forwardport result: Failed",
608            MessageLevel::Fail,
609        )
610        .await;
611        free_context(ctx.id, false).await;
612        return false;
613    }
614    true
615}
616
617pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
618    let type_str = &ctx_point.local_args[0];
619    match type_str.as_str() {
620        "tcp" => {
621            ctx_point.forward_type = ForwardType::Tcp;
622        }
623        "dev" => {
624            ctx_point.forward_type = ForwardType::Device;
625        }
626        "localabstract" => {
627            ctx_point.forward_type = ForwardType::Abstract;
628        }
629        "localfilesystem" => {
630            ctx_point.local_args[1] =
631                HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
632            ctx_point.forward_type = ForwardType::FileSystem;
633        }
634        "jdwp" => {
635            ctx_point.forward_type = ForwardType::Jdwp;
636        }
637        "ark" => {
638            ctx_point.forward_type = ForwardType::Ark;
639        }
640        "localreserved" => {
641            ctx_point.local_args[1] =
642                FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
643            ctx_point.forward_type = ForwardType::Reserved;
644        }
645        _ => {
646            crate::error!("this forward type may is not expected");
647            return false;
648        }
649    }
650    true
651}
652
653pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
654    let saddr = format!("127.0.0.1:{}", port);
655    let session_tmp = ctx.session_id;
656    let channel_tmp = ctx.channel_id;
657    let remote_parameters = ctx.remote_parameters.clone();
658    let ctx_type = ctx.forward_type.clone();
659
660    let result = TcpListener::bind(saddr.clone()).await;
661    match result {
662        Ok(listener) => {
663            crate::info!("forward tcp accept bind ok, addr:{:#?}", saddr);
664            let join_handle = utils::spawn(async move {
665                loop {
666                    let (stream, _addr) = match listener.accept().await {
667                        Ok((stream, _addr)) => (stream, _addr),
668                        Err(err) => {
669                            crate::error!("listener accept failed, {err}");
670                            continue;
671                        }
672                    };
673                    let (rd, wr) = stream.into_split();
674                    let mut client_context = malloc_context(session_tmp, channel_tmp, true).await;
675                    client_context.remote_parameters = remote_parameters.clone();
676                    client_context.forward_type = ctx_type.clone();
677                    ForwardContextMap::update(client_context.id, client_context.clone()).await;
678                    crate::info!("malloc client_context id = {:#?}", client_context.id);
679                    TcpWriteStreamMap::put(client_context.id, wr).await;
680                    on_accept(client_context.id).await;
681                    utils::spawn(async move {
682                        recv_tcp_msg(session_tmp, channel_tmp, rd, client_context.id).await;
683                    });
684                }
685            });
686            TcpListenerMap::put(channel_tmp, join_handle).await;
687            Ok(())
688        }
689        Err(e) => {
690            crate::error!("forward_ tcp_accept fail:{:#?}", e);
691            ctx.last_error = format!("TCP Port listen failed at {}", port);
692            Err(e)
693        }
694    }
695}
696
697pub async fn on_accept(cid: u32) {
698    let Some(context_forward) = ForwardContextMap::get(cid).await else {
699        crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
700        return;
701    };
702    let ctx = &mut context_forward.clone();
703
704    let buf_string: Vec<u8> = ctx.remote_parameters.clone().as_bytes().to_vec();
705    let mut new_buf = vec![0_u8; buf_string.len() + 9];
706
707    buf_string.iter().enumerate().for_each(|(i, e)| {
708        new_buf[i + 8] = *e;
709    });
710    send_to_task(
711        ctx.session_id,
712        ctx.channel_id,
713        HdcCommand::ForwardActiveSlave,
714        &new_buf,
715        buf_string.len() + 9,
716        ctx.id,
717    )
718    .await;
719}
720
721pub async fn daemon_connect_tcp(cid: u32, port: u32) {
722    let Some(context_forward) = ForwardContextMap::get(cid).await else {
723        crate::error!("daemon connect tcp get context is none cid={cid}");
724        return;
725    };
726    let ctx = &mut context_forward.clone();
727
728    let saddr = format!("127.0.0.1:{}", port);
729    let stream = match TcpStream::connect(saddr).await {
730        Err(err) => {
731            crate::error!("TcpStream::stream failed {:?}", err);
732            free_context(cid, true).await;
733            return;
734        }
735        Ok(addr) => addr,
736    };
737    send_active_master(ctx).await;
738    let (rd, wr) = stream.into_split();
739    TcpWriteStreamMap::put(ctx.id, wr).await;
740    ForwardContextMap::update(ctx.id, ctx.clone()).await;
741
742    let session_tmp = ctx.session_id;
743    let channel_tmp = ctx.channel_id;
744    update_context_to_task(session_tmp, channel_tmp, ctx).await;
745    utils::spawn(async move {
746        recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
747    });
748}
749
750pub async fn update_context_to_task(session_id: u32, channel_id:u32, ctx: &mut ContextForward) {
751    let Some(task) = ForwardTaskMap::get(ctx.session_id, ctx.channel_id).await else {
752        crate::error!(
753            "update context to task is none session_id={:#?},channel_id={:#?}",
754            session_id,
755            channel_id
756        );
757        return;
758    };
759    let task = &mut task.clone();
760    task.context_forward = ctx.clone();
761    ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
762}
763
764pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
765    let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
766    loop {
767        match rd.read(&mut data).await {
768            Ok(recv_size) => {
769                if recv_size == 0 {
770                    crate::info!("recv_size is 0, tcp temporarily closed");
771                    free_context(cid, true).await;
772                    return;
773                }
774                send_to_task(
775                    session_id,
776                    channel_id,
777                    HdcCommand::ForwardData,
778                    &data[0..recv_size],
779                    recv_size,
780                    cid,
781                )
782                .await;
783            }
784            Err(_e) => {
785                free_context(cid, true).await;
786                crate::error!(
787                    "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
788                );
789            }
790        }
791    }
792}
793
794#[cfg(not(target_os = "windows"))]
795pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
796    loop {
797        let result = ylong_runtime::spawn_blocking(move || {
798            let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
799            let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
800            (recv_size, buffer)
801        })
802        .await;
803        let (recv_size, buffer) = match result {
804            Ok((recv_size, _)) if recv_size < 0 => {
805                crate::error!("local abstract close shutdown fd = {fd}");
806                free_context(cid, true).await;
807                return;
808            }
809            Ok((recv_size, buffer)) => (recv_size, buffer),
810            Err(err) => {
811                crate::error!("read socket msg failed. {err}");
812                free_context(cid, true).await;
813                return;
814            }
815        };
816        send_to_task(
817            session_id,
818            channel_id,
819            HdcCommand::ForwardData,
820            &buffer[0..recv_size as usize],
821            recv_size as usize,
822            cid,
823        )
824        .await;
825    }
826}
827
828pub async fn free_context(cid: u32, notify_remote: bool) {
829    let Some(context_forward) = ForwardContextMap::get(cid).await else {
830        crate::error!("free forward context get cid  is none. cid = {cid}");
831        return;
832    };
833    let ctx = &mut context_forward.clone();
834
835    if notify_remote {
836        let vec_none = Vec::<u8>::new();
837        send_to_task(
838            ctx.session_id,
839            ctx.channel_id,
840            HdcCommand::ForwardFreeContext,
841            &vec_none,
842            0,
843            ctx.id,
844        )
845        .await;
846    }
847    crate::error!("begin to free forward context cid. cid = {cid}");
848    match ctx.forward_type {
849        ForwardType::Tcp => {
850            TcpWriteStreamMap::end(ctx.id).await;
851        }
852        ForwardType::Jdwp | ForwardType::Ark => {
853            TcpWriteStreamMap::end(ctx.id).await;
854            let ret = unsafe { libc::close(ctx.fd) };
855            crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
856            let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
857            crate::debug!(
858                "close context_forward target fd, ret={}, id={}",
859                target_fd_ret,
860                ctx.id,
861            );
862        }
863        ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
864            crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
865            #[cfg(not(target_os = "windows"))]
866            UdsServer::wrap_close(ctx.fd);
867            ctx.fd = -1;
868        }
869        ForwardType::Device => {}
870    }
871    ForwardContextMap::remove(cid).await;
872}
873
874pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
875    let Ok(port) = ctx.local_args[1].parse::<u32>() else {
876        crate::error!("setup tcp point parse error");
877        return false;
878    };
879    let cid = ctx.id;
880    if ctx.is_master {
881        let result = forward_tcp_accept(ctx, port).await;
882        return result.is_ok();
883    } else {
884        ForwardContextMap::update(ctx.id, ctx.clone()).await;
885        utils::spawn(async move { daemon_connect_tcp(cid, port).await });
886    }
887    true
888}
889
890#[cfg(not(target_os = "windows"))]
891async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
892    let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
893    ctx.fd = fd;
894    let name: Vec<u8> = path.as_bytes().to_vec();
895    let mut socket_name = vec![0_u8; name.len() + 1];
896    socket_name[0] = b'\0';
897    name.iter().enumerate().for_each(|(i, e)| {
898        socket_name[i + 1] = *e;
899    });
900    let addr = UdsAddr::parse_abstract(&socket_name[1..]);
901    ForwardContextMap::update(ctx.id, ctx.clone()).await;
902    let cid = ctx.id;
903    if let Ok(addr_obj) = &addr {
904        let ret = UdsServer::wrap_bind(fd, addr_obj);
905        if ret.is_err() {
906            hdctransfer::echo_client(
907                ctx.session_id,
908                ctx.channel_id,
909                "Unix pipe bind failed",
910                MessageLevel::Fail,
911            )
912            .await;
913            crate::error!("bind fail");
914            return false;
915        }
916        let ret = UdsServer::wrap_listen(fd);
917        if ret < 0 {
918            hdctransfer::echo_client(
919                ctx.session_id,
920                ctx.channel_id,
921                "Unix pipe listen failed",
922                MessageLevel::Fail,
923            )
924            .await;
925            crate::error!("listen fail");
926            return false;
927        }
928        utils::spawn(async move {
929            loop {
930                let client_fd = UdsServer::wrap_accept(fd);
931                if client_fd == -1 {
932                    break;
933                }
934                on_accept(cid).await;
935            }
936        });
937    }
938    true
939}
940
941pub async fn canonicalize(path: String) -> Result<String, Error> {
942    match fs::canonicalize(path) {
943        Ok(abs_path) => match abs_path.to_str() {
944            Some(path) => Ok(path.to_string()),
945            None => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
946        },
947        Err(_) => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
948    }
949}
950
951#[cfg(target_os = "windows")]
952pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
953    false
954}
955
956#[cfg(not(target_os = "windows"))]
957pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
958    let s_node_cfg = ctx.local_args[1].clone();
959    let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
960        crate::error!("Open unix-dev failed");
961        return false;
962    };
963    ctx.dev_path = resolv_path.clone();
964    crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
965    let thread_path_ref = Arc::new(Mutex::new(resolv_path));
966    if !send_active_master(ctx).await {
967        crate::error!("send active_master return failed ctx={:?}", ctx);
968        return false;
969    }
970    let session = ctx.session_id;
971    let channel = ctx.channel_id;
972    let cid = ctx.id;
973
974    ForwardContextMap::update(ctx.id, ctx.clone()).await;
975    utils::spawn(async move {
976        loop {
977            let path = thread_path_ref.lock().await;
978            let Ok(mut file) = File::open(&*path) else {
979                crate::error!("open {} failed.", *path);
980                break;
981            };
982            let mut total = Vec::new();
983            let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
984                [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
985            let Ok(read_len) = file.read(&mut buf[4..]) else {
986                crate::error!("read {} failed.", *path);
987                break;
988            };
989            if read_len == 0 {
990                free_context(cid, true).await;
991                break;
992            }
993            total.append(&mut buf[0..read_len].to_vec());
994            send_to_task(
995                session,
996                channel,
997                HdcCommand::ForwardData,
998                &total,
999                read_len,
1000                cid,
1001            )
1002            .await;
1003        }
1004    });
1005    true
1006}
1007
1008#[cfg(not(feature = "host"))]
1009fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
1010    match forward_type == ForwardType::Jdwp {
1011        true => parameter.parse::<u32>().unwrap_or_else(|e| {
1012            crate::error!("Jdwp get pid err :{:?}", e);
1013            0_u32
1014        }),
1015        false => {
1016            let params: Vec<&str> = parameter.split('@').collect();
1017            params[0].parse::<u32>().unwrap_or_else(|e| {
1018                crate::error!("get pid err :{:?}", e);
1019                0_u32
1020            })
1021        }
1022    }
1023}
1024
1025#[cfg(feature = "host")]
1026pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
1027    crate::info!("host not setup_jdwp _point");
1028    false
1029}
1030
1031#[cfg(not(feature = "host"))]
1032pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
1033    let local_args = ctx.local_args[1].clone();
1034    let parameter = local_args.as_str();
1035    let style = &ctx.forward_type;
1036    let pid = get_pid(parameter, style.clone());
1037    let cid = ctx.id;
1038    let session = ctx.session_id;
1039    let channel = ctx.channel_id;
1040    if pid == 0 {
1041        crate::error!("setup jdwp point get pid is 0");
1042        return false;
1043    }
1044
1045    let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1046    if result.is_err() {
1047        crate::error!("wrap socketpair failed");
1048        return false;
1049    }
1050    let mut target_fd = 0;
1051    let mut local_fd = 0;
1052    if let Ok((fd0, fd1)) = result {
1053        crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1054        local_fd = fd0;
1055        target_fd = fd1;
1056        ctx.fd = local_fd;
1057        ctx.target_fd = target_fd;
1058        target_fd = fd1;
1059    }
1060
1061    utils::spawn(async move {
1062        loop {
1063            let result = ylong_runtime::spawn_blocking(move || {
1064                let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
1065                let size = UdsServer::wrap_read(local_fd, &mut buffer);
1066                (size, buffer)
1067            })
1068            .await;
1069            let (size, buffer) = match result {
1070                Ok((size, _)) if size < 0 => {
1071                    crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
1072                    free_context(cid, true).await;
1073                    break;
1074                }
1075                Ok((0, _)) => {
1076                    ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1077                    continue;
1078                }
1079                Ok((size, buffer)) => (size, buffer),
1080                Err(err) => {
1081                    crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
1082                    free_context(cid, true).await;
1083                    break;
1084                }
1085            };
1086            send_to_task(
1087                session,
1088                channel,
1089                HdcCommand::ForwardData,
1090                &buffer[0..size as usize],
1091                size as usize,
1092                cid,
1093            )
1094            .await;
1095        }
1096    });
1097
1098    let jdwp = Jdwp::get_instance();
1099    let mut param = ctx.local_args[0].clone();
1100    param.push(':');
1101    param.push_str(parameter);
1102
1103    let ret = jdwp
1104        .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
1105        .await;
1106    if !ret {
1107        crate::error!("not found pid:{:?}", pid);
1108        hdctransfer::echo_client(
1109            session,
1110            channel,
1111            format!("fport fail:pid not found:{}", pid).as_str(),
1112            MessageLevel::Fail,
1113        )
1114        .await;
1115        task_finish(session, channel).await;
1116        return false;
1117    }
1118
1119    let vec_none = Vec::<u8>::new();
1120    send_to_task(
1121        session,
1122        channel,
1123        HdcCommand::ForwardActiveMaster, // 04
1124        &vec_none,
1125        0,
1126        cid,
1127    )
1128    .await;
1129    crate::info!("setup_jdwp_ point return true");
1130    true
1131}
1132
1133async fn task_finish(session_id: u32, channel_id: u32) {
1134    transfer_task_finish(channel_id, session_id).await;
1135}
1136
1137#[cfg(not(target_os = "windows"))]
1138pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
1139    let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
1140    let mut socket_name = vec![0_u8; name.len() + 1];
1141    socket_name[0] = b'\0';
1142    name.iter().enumerate().for_each(|(i, e)| {
1143        socket_name[i + 1] = *e;
1144    });
1145    let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1146    if let Ok(addr_obj) = &addr {
1147        let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
1148        if ret.is_err() {
1149            hdctransfer::echo_client(
1150                ctx.session_id,
1151                ctx.channel_id,
1152                "localabstract connect fail",
1153                MessageLevel::Fail,
1154            )
1155            .await;
1156            free_context(ctx.id, true).await;
1157            return;
1158        }
1159        send_active_master(ctx).await;
1160        read_data_to_forward(ctx).await;
1161    }
1162}
1163
1164#[cfg(target_os = "windows")]
1165pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool {
1166    false
1167}
1168
1169#[cfg(not(target_os = "windows"))]
1170pub async fn setup_file_point(ctx: &mut ContextForward) -> bool {
1171    let s_node_cfg = ctx.local_args[1].clone();
1172    if ctx.is_master {
1173        if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem
1174        {
1175            let _ = fs::remove_file(s_node_cfg.clone());
1176        }
1177        if !server_socket_bind_listen(ctx, s_node_cfg).await {
1178            crate::error!("server socket bind listen failed id={:?}", ctx.id);
1179            task_finish(ctx.session_id, ctx.channel_id).await;
1180            return false;
1181        }
1182    } else {
1183        if ctx.fd <= 0 {
1184            crate::info!("setup_file _point fd: {:?}", ctx.fd);
1185            if ctx.forward_type == ForwardType::Abstract {
1186                ctx.fd = UdsClient::wrap_socket(AF_LOCAL);
1187                unsafe {
1188                    libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC);
1189                }
1190            } else {
1191                ctx.fd = UdsClient::wrap_socket(AF_UNIX);
1192            }
1193        }
1194        ForwardContextMap::update(ctx.id, ctx.clone()).await;
1195        daemon_connect_pipe(ctx).await;
1196    }
1197    true
1198}
1199
1200pub async fn setup_point(ctx: &mut ContextForward) -> bool {
1201    if !detech_forward_type(ctx).await {
1202        crate::error!("forward type is not true");
1203        return false;
1204    }
1205
1206    if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp {
1207        ctx.last_error = String::from("Not support forward-type");
1208        return false;
1209    }
1210
1211    let mut ret = false;
1212    match ctx.forward_type {
1213        ForwardType::Tcp => {
1214            ret = setup_tcp_point(ctx).await;
1215        }
1216        ForwardType::Device => {
1217            if !cfg!(target_os = "windows") {
1218                ret = setup_device_point(ctx).await;
1219            }
1220        }
1221        ForwardType::Jdwp | ForwardType::Ark => {
1222            crate::info!("setup point ark case");
1223            if !cfg!(feature = "host") {
1224                ret = setup_jdwp_point(ctx).await;
1225            }
1226        }
1227        ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1228            if !cfg!(target_os = "windows") {
1229                ret = setup_file_point(ctx).await;
1230            }
1231        }
1232    };
1233    ForwardContextMap::update(ctx.id, ctx.clone()).await;
1234    update_context_to_task(ctx.session_id, ctx.channel_id, ctx).await;
1235    ret
1236}
1237
1238pub async fn send_to_task(
1239    session_id: u32,
1240    channel_id: u32,
1241    command: HdcCommand,
1242    buf_ptr: &[u8],
1243    buf_size: usize,
1244    cid: u32,
1245) -> bool {
1246    if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1247        crate::error!("send task buf_size oversize");
1248        return false;
1249    }
1250
1251    let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1252    new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1253    let file_check_message = TaskMessage {
1254        channel_id,
1255        command,
1256        payload: new_buf,
1257    };
1258    transfer::put(session_id, file_check_message).await;
1259    true
1260}
1261
1262pub fn get_cid(_payload: &[u8]) -> u32 {
1263    let mut id_bytes = [0u8; 4];
1264    id_bytes.copy_from_slice(&_payload[0..4]);
1265    let id: u32 = u32::from_be_bytes(id_bytes);
1266    id
1267}
1268
1269pub async fn send_active_master(ctx: &mut ContextForward) -> bool {
1270    if ctx.check_order {
1271        let flag = [0u8; 1];
1272        send_to_task(
1273            ctx.session_id,
1274            ctx.channel_id,
1275            HdcCommand::ForwardCheckResult,
1276            &flag,
1277            1,
1278            ctx.id,
1279        )
1280        .await;
1281        free_context(ctx.id, false).await;
1282        return true;
1283    }
1284    if !send_to_task(
1285        ctx.session_id,
1286        ctx.channel_id,
1287        HdcCommand::ForwardActiveMaster,
1288        &Vec::<u8>::new(),
1289        0,
1290        ctx.id,
1291    )
1292    .await
1293    {
1294        free_context(ctx.id, true).await;
1295        return false;
1296    }
1297    true
1298}
1299
1300pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool {
1301    let command = context_forward.task_command.clone();
1302    let result = Base::split_command_to_args(&command);
1303    let argv = result.0;
1304    let argc = result.1;
1305
1306    if argc < ARG_COUNT2 {
1307        crate::error!("argc < 2 parse is failed.");
1308        context_forward.last_error = "Too few arguments.".to_string();
1309        return false;
1310    }
1311    if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1312        crate::error!("parse's length is flase.");
1313        context_forward.last_error = "Some argument too long.".to_string();
1314        return false;
1315    }
1316    if !check_node_info(&argv[0], &mut context_forward.local_args) {
1317        crate::error!("check argv[0] node info is flase.");
1318        context_forward.last_error = "Arguments parsing failed.".to_string();
1319        return false;
1320    }
1321    if !check_node_info(&argv[1], &mut context_forward.remote_args) {
1322        crate::error!("check argv[1] node info is flase.");
1323        context_forward.last_error = "Arguments parsing failed.".to_string();
1324        return false;
1325    }
1326    context_forward.remote_parameters = argv[1].clone();
1327    true
1328}
1329
1330pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1331    let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1332        crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}"
1333        );
1334        return false;
1335    };
1336    let task: &mut HdcForward = &mut task.clone();
1337    let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1338        crate::error!("cmd argv is not int utf8");
1339        return false;
1340    };
1341    let mut context_forward = malloc_context(session_id, channel_id, true).await;
1342    context_forward.task_command = command.clone();
1343
1344    if !forward_parse_cmd(&mut context_forward) {
1345        task.context_forward = context_forward.clone();
1346        ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1347        return false;
1348    }
1349    if !setup_point(&mut context_forward).await {
1350        task.context_forward = context_forward.clone();
1351        ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1352        return false;
1353    }
1354
1355    let wake_up_message = TaskMessage {
1356        channel_id,
1357        command: HdcCommand::KernelWakeupSlavetask,
1358        payload: Vec::<u8>::new(),
1359    };
1360    transfer::put(context_forward.session_id, wake_up_message).await;
1361
1362    let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec();
1363    let mut new_buf = vec![0_u8; buf_string.len() + 9];
1364    buf_string.iter().enumerate().for_each(|(i, e)| {
1365        new_buf[i + 8] = *e;
1366    });
1367    send_to_task(
1368        context_forward.session_id,
1369        context_forward.channel_id,
1370        HdcCommand::ForwardCheck,
1371        &new_buf,
1372        buf_string.len() + 9,
1373        context_forward.id,
1374    )
1375    .await;
1376    true
1377}
1378
1379pub async fn slave_connect(
1380    session_id: u32,
1381    channel_id: u32,
1382    payload: &[u8],
1383    check_order: bool,
1384) -> bool {
1385    let mut context_forward = malloc_context(session_id, channel_id, false).await;
1386    context_forward.check_order = check_order;
1387    if let Ok((content, id)) = filter_command(payload) {
1388        let content = &content[8..].trim_end_matches('\0').to_string();
1389        context_forward.task_command = content.clone();
1390        if !check_node_info(content, &mut context_forward.local_args) {
1391            crate::error!("check local args is false");
1392            return false;
1393        }
1394        context_forward.id = id;
1395        ForwardContextMap::update(id, context_forward.clone()).await;
1396    }
1397
1398    if !check_order {
1399        if !setup_point(&mut context_forward).await {
1400            crate::error!("setup point return false, free context");
1401            free_context(context_forward.id, true).await;
1402            ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1403            return false;
1404        }
1405    } else {
1406        send_active_master(&mut context_forward).await;
1407    }
1408    ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1409    true
1410}
1411
1412pub async fn read_data_to_forward(ctx: &mut ContextForward) {
1413    let cid = ctx.id;
1414    let session = ctx.session_id;
1415    let channel = ctx.channel_id;
1416    match ctx.forward_type {
1417        ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1418            #[cfg(not(target_os = "windows"))]
1419            {
1420                let fd_temp = ctx.fd;
1421                utils::spawn(async move {
1422                    deamon_read_socket_msg(session, channel, fd_temp, cid).await
1423                });
1424            }
1425        }
1426        ForwardType::Device => {
1427            #[cfg(not(target_os = "windows"))]
1428            setup_device_point(ctx).await;
1429        }
1430        _ => {}
1431    }
1432}
1433
1434pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
1435    let bytes = &_payload[4..];
1436    let ct: Result<String, std::string::FromUtf8Error> = String::from_utf8(bytes.to_vec());
1437    if let Ok(content) = ct {
1438        let mut id_bytes = [0u8; 4];
1439        id_bytes.copy_from_slice(&_payload[0..4]);
1440        let id: u32 = u32::from_be_bytes(id_bytes);
1441        return Ok((content, id));
1442    }
1443    Err(Error::new(ErrorKind::Other, "filter command failure"))
1444}
1445
1446pub async fn dev_write_bufer(path: String, content: Vec<u8>) {
1447    utils::spawn(async move {
1448        let open_result = OpenOptions::new()
1449            .write(true)
1450            .create(true)
1451            .open(path.clone());
1452
1453        match open_result {
1454            Ok(mut file) => {
1455                let write_result = file.write_all(content.as_slice());
1456                match write_result {
1457                    Ok(()) => {}
1458                    Err(e) => {
1459                        crate::error!("dev write bufer to file fail:{:#?}", e);
1460                    }
1461                }
1462            }
1463            Err(e) => {
1464                crate::error!("dev write bufer fail:{:#?}", e);
1465            }
1466        }
1467    });
1468}
1469
1470pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool {
1471    if ctx.forward_type == ForwardType::Tcp {
1472        return TcpWriteStreamMap::write(ctx.id, content).await;
1473    } else if ctx.forward_type == ForwardType::Device {
1474        let path_ref = ctx.dev_path.clone();
1475        if path_ref.is_empty() {
1476            crate::error!(
1477                "write_forward_bufer get dev_path  is failed ctx.id = {:#?}",
1478                ctx.id
1479            );
1480            return false;
1481        }
1482        dev_write_bufer(path_ref, content).await;
1483    } else {
1484        #[cfg(not(target_os = "windows"))]
1485        {
1486            let fd = ctx.fd;
1487            if UdsClient::wrap_send(fd, &content) < 0 {
1488                crate::info!("write forward bufer failed. fd = {fd}");
1489                return false;
1490            }
1491        }
1492    }
1493    true
1494}
1495
1496#[allow(unused)]
1497pub async fn malloc_context(
1498    session_id: u32,
1499    channel_id: u32,
1500    master_slave: bool,
1501) -> ContextForward {
1502    let mut ctx = ContextForward {
1503        ..Default::default()
1504    };
1505    ctx.id = utils::get_current_time() as u32;
1506    ctx.session_id = session_id;
1507    ctx.channel_id = channel_id;
1508    ctx.is_master = master_slave;
1509    crate::info!("malloc_context success id = {:#?}", ctx.id);
1510    ForwardContextMap::update(ctx.id, ctx.clone()).await;
1511    ctx
1512}
1513
1514pub async fn forward_command_dispatch(
1515    session_id: u32,
1516    channel_id: u32,
1517    command: HdcCommand,
1518    _payload: &[u8],
1519) -> bool {
1520    let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1521        crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1522        );
1523        return false;
1524    };
1525    let task: &mut HdcForward = &mut task.clone();
1526    let mut ret: bool = true;
1527    let cid = get_cid(_payload);
1528    let send_msg = _payload[4..].to_vec();
1529
1530    let Some(context_forward) = ForwardContextMap::get(cid).await else {
1531        crate::error!("forward command dispatch get context is none cid={cid}");
1532        return false;
1533    };
1534    let ctx = &mut context_forward.clone();
1535    match command {
1536        HdcCommand::ForwardCheckResult => {
1537            ret = check_command(ctx, _payload, task.server_or_daemon).await;
1538        }
1539        HdcCommand::ForwardData => {
1540            ret = write_forward_bufer(ctx, send_msg).await;
1541        }
1542        HdcCommand::ForwardFreeContext => {
1543            free_context(ctx.id, false).await;
1544        }
1545        HdcCommand::ForwardActiveMaster => {
1546            read_data_to_forward(ctx).await;
1547            ret = true;
1548        }
1549        _ => {
1550            ret = false;
1551        }
1552    }
1553    ret
1554}
1555
1556async fn print_error_info(session_id: u32, channel_id: u32) {
1557    let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1558        crate::error!(
1559            "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}",
1560            session_id,
1561            channel_id
1562        );
1563        return;
1564    };
1565    let task = &mut task.clone();
1566    let ctx = &task.context_forward;
1567    let mut echo_content = ctx.last_error.clone();
1568
1569    if echo_content.is_empty() {
1570        echo_content = "Forward parament failed".to_string();
1571    }
1572
1573    hdctransfer::echo_client(
1574        session_id,
1575        channel_id,
1576        echo_content.as_str(),
1577        MessageLevel::Fail,
1578    )
1579    .await;
1580}
1581
1582pub async fn command_dispatch(
1583    session_id: u32,
1584    channel_id: u32,
1585    command: HdcCommand,
1586    payload: &[u8],
1587    _payload_size: u16,
1588) -> bool {
1589    if command != HdcCommand::ForwardData {
1590        crate::info!("command_dispatch command recv: {:?}", command);
1591    }
1592
1593    let ret = match command {
1594        HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await,
1595        HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await,
1596        HdcCommand::ForwardActiveSlave => {
1597            slave_connect(session_id, channel_id, payload, false).await
1598        }
1599        _ => forward_command_dispatch(session_id, channel_id, command, payload).await,
1600    };
1601    if !ret {
1602        print_error_info(session_id, channel_id).await;
1603        task_finish(session_id, channel_id).await;
1604        return false;
1605    }
1606    ret
1607}
1608