1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! forward
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 #[cfg(not(target_os = "windows"))]
26 use std::fs::{self, File, OpenOptions};
27 #[cfg(not(target_os = "windows"))]
28 use std::io::{self, Error, ErrorKind, Read, Write};
29 use ylong_runtime::sync::{Mutex, RwLock};
30
31 use crate::common::base::Base;
32 use crate::common::hdctransfer::transfer_task_finish;
33 use crate::common::hdctransfer::{self, HdcTransferBase};
34 #[cfg(not(feature = "host"))]
35 use crate::common::jdwp::Jdwp;
36 #[cfg(not(target_os = "windows"))]
37 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
38 use crate::config::HdcCommand;
39 use crate::config::MessageLevel;
40 use crate::config::TaskMessage;
41 use crate::transfer;
42 #[allow(unused)]
43 use crate::utils::hdc_log::*;
44 use crate::{config, utils};
45 use std::mem::MaybeUninit;
46 use std::sync::Arc;
47 use std::sync::Once;
48 #[cfg(not(feature = "host"))]
49 use std::time::Duration;
50 use ylong_runtime::io::AsyncReadExt;
51 use ylong_runtime::io::AsyncWriteExt;
52 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
53 use ylong_runtime::task::JoinHandle;
54
55 pub const ARG_COUNT2: u32 = 2;
56 pub const BUF_SIZE_SMALL: usize = 256;
57 pub const SOCKET_BUFFER_SIZE: usize = 65535;
58 pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
59 pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
60
61 #[cfg(feature = "host")]
62 #[derive(Clone, Debug)]
63 pub struct HdcForwardInfo {
64 pub session_id: u32,
65 pub channel_id: u32,
66 pub forward_direction: bool,
67 pub task_string: String,
68 pub connect_key: String,
69 }
70
71 #[cfg(feature = "host")]
72 impl HdcForwardInfo {
newnull73 fn new(
74 session_id: u32,
75 channel_id: u32,
76 forward_direction: bool,
77 task_string: String,
78 connect_key: String,
79 ) -> Self {
80 Self {
81 session_id,
82 channel_id,
83 forward_direction,
84 task_string,
85 connect_key,
86 }
87 }
88 }
89
90 #[derive(Default, Eq, PartialEq, Clone, Debug)]
91 enum ForwardType {
92 #[default]
93 Tcp = 0,
94 Device,
95 Abstract,
96 FileSystem,
97 Jdwp,
98 Ark,
99 Reserved,
100 }
101
102 #[derive(Debug, Default, PartialEq, Eq, Clone)]
103 pub struct ContextForward {
104 session_id: u32,
105 channel_id: u32,
106 check_order: bool,
107 is_master: bool,
108 id: u32,
109 fd: i32,
110 target_fd: i32,
111 forward_type: ForwardType,
112 local_args: Vec<String>,
113 remote_args: Vec<String>,
114 task_command: String,
115 last_error: String,
116 remote_parameters: String,
117 dev_path: String,
118 }
119
120 #[cfg(feature = "host")]
121 type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
122 #[cfg(feature = "host")]
123 type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
124 #[cfg(feature = "host")]
125 pub struct HdcForwardInfoMap {}
126 #[cfg(feature = "host")]
127 impl HdcForwardInfoMap {
get_instancenull128 fn get_instance() -> HdcForwardInfoMap_ {
129 static mut MAP: Option<HdcForwardInfoMap_> = None;
130 unsafe {
131 MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
132 .clone()
133 }
134 }
135
136 async fn put(forward_info: HdcForwardInfo) {
137 let instance = Self::get_instance();
138 let mut map = instance.lock().await;
139 map.insert(
140 forward_info.task_string.clone(),
141 Arc::new(Mutex::new(forward_info)),
142 );
143 }
144
145 pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
146 let instance = Self::get_instance();
147 let map = instance.lock().await;
148 let mut result = Vec::new();
149 for (_key, value) in map.iter() {
150 let info = value.lock().await;
151 result.push((*info).clone());
152 }
153 result
154 }
155
156 pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
157 crate::info!(
158 "remove_forward task_string:{}, direction:{}",
159 task_string,
160 forward_direction
161 );
162 let instance = Self::get_instance();
163 let map = instance.lock().await;
164 let mut remove_key = String::new();
165 let prefix = if forward_direction {
166 "1|".to_string()
167 } else {
168 "0|".to_string()
169 };
170 let mut task_string1 = prefix;
171 task_string1.push_str(task_string.as_str());
172 for (key, value) in map.iter() {
173 let info = value.lock().await;
174 if info.task_string.contains(&task_string1)
175 && info.forward_direction == forward_direction
176 {
177 remove_key = (*key.clone()).to_string();
178 break;
179 }
180 }
181 drop(map);
182 if remove_key.is_empty() {
183 return false;
184 }
185
186 let mut map = instance.lock().await;
187 let result = map.remove(&remove_key);
188 result.is_some()
189 }
190 }
191
192 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
193 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
194 pub struct TcpWriteStreamMap {}
195 impl TcpWriteStreamMap {
get_instancenull196 fn get_instance() -> TcpWriterMap_ {
197 static mut TCP_MAP: Option<TcpWriterMap_> = None;
198 unsafe {
199 TCP_MAP
200 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
201 .clone()
202 }
203 }
204 #[allow(unused)]
205 async fn put(id: u32, wr: SplitWriteHalf) {
206 let instance = Self::get_instance();
207 let mut map = instance.write().await;
208 let arc_wr = Arc::new(Mutex::new(wr));
209 map.insert(id, arc_wr);
210 }
211 #[allow(unused)]
212 async fn write(id: u32, data: Vec<u8>) -> bool {
213 let arc_map = Self::get_instance();
214 let map = arc_map.write().await;
215 let Some(arc_wr) = map.get(&id) else {
216 crate::error!("TcpWriteStreamMap failed to get id {:#?}", id);
217 return false;
218 };
219 let mut wr = arc_wr.lock().await;
220 let write_result = wr.write_all(data.as_slice()).await;
221 if write_result.is_err() {
222 crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id);
223 }
224 true
225 }
226
227 pub async fn end(id: u32) {
228 let instance = Self::get_instance();
229 let mut map = instance.write().await;
230 if let Some(arc_wr) = map.remove(&id) {
231 let mut wr = arc_wr.lock().await;
232 let _ = wr.shutdown().await;
233 }
234 }
235 }
236
237 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
238 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
239 pub struct TcpListenerMap {}
240 impl TcpListenerMap {
get_instancenull241 fn get_instance() -> TcpListenerMap_ {
242 static mut TCP_MAP: Option<TcpListenerMap_> = None;
243 unsafe {
244 TCP_MAP
245 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
246 .clone()
247 }
248 }
249 #[allow(unused)]
250 async fn put(id: u32, listener: JoinHandle<()>) {
251 let instance = Self::get_instance();
252 let mut map = instance.write().await;
253 let arc_listener = Arc::new(Mutex::new(listener));
254 map.insert(id, arc_listener);
255 crate::info!("forward tcp put listener id = {id}");
256 }
257
258 pub async fn end(id: u32) {
259 let instance = Self::get_instance();
260 let mut map = instance.write().await;
261 if let Some(arc_listener) = map.remove(&id) {
262 let join_handle = arc_listener.lock().await;
263 join_handle.cancel();
264 }
265 }
266 }
267
268 type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>;
269 pub struct ForwardContextMap {}
270 impl ForwardContextMap {
get_instancenull271 fn get_instance() -> &'static MapContextForward_ {
272 static mut CONTEXT_MAP: MaybeUninit<MapContextForward_> = MaybeUninit::uninit();
273 static ONCE: Once = Once::new();
274 unsafe {
275 ONCE.call_once(|| {
276 CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new())));
277 }
278 );
279 &*CONTEXT_MAP.as_ptr()
280 }
281 }
282
283 pub async fn update(cid: u32, value: ContextForward) {
284 let map = Self::get_instance();
285 let mut map = map.lock().await;
286 map.insert(cid, value.clone());
287 }
288
289 pub async fn remove(cid: u32) {
290 crate::info!("ContextForward remove, cid:{}", cid);
291 let map = Self::get_instance();
292 let mut map = map.lock().await;
293 let _ = map.remove(&cid);
294 }
295
296 pub async fn get(cid: u32) -> Option<ContextForward> {
297 let arc = Self::get_instance();
298 let map = arc.lock().await;
299 let task = map.get(&cid);
300 match task {
301 Some(task) => Some(task.clone()),
302 None => {
303 crate::debug!("ContextForward result:is none,cid={:#?}", cid,);
304 Option::None
305 }
306 }
307 }
308 }
309
310 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
311 pub struct ForwardTaskMap {}
312 impl ForwardTaskMap {
get_instancenull313 fn get_instance() -> MapForward_ {
314 static mut FORWARD_MAP: Option<MapForward_> = None;
315 unsafe {
316 FORWARD_MAP
317 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
318 .clone()
319 }
320 }
321
322 pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
323 let map = Self::get_instance();
324 let mut map = map.lock().await;
325 map.insert((session_id, channel_id), value.clone());
326 }
327
328 pub async fn remove(session_id: u32, channel_id: u32) {
329 crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
330 let map = Self::get_instance();
331 let mut map = map.lock().await;
332 let _ = map.remove(&(session_id, channel_id));
333 }
334
335 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
336 let arc = Self::get_instance();
337 let map = arc.lock().await;
338 let task = map.get(&(session_id, channel_id));
339 match task {
340 Some(task) => Some(task.clone()),
341 None => {
342 crate::debug!(
343 "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}",
344 session_id,
345 channel_id
346 );
347 Option::None
348 }
349 }
350 }
351
352 pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
353 let arc = Self::get_instance();
354 let map = arc.lock().await;
355 for ((_session_id, _channel_id), value) in map.iter() {
356 if *_session_id == session_id && task_string.contains(&value.task_command) {
357 return Some(*_channel_id);
358 }
359 }
360 None
361 }
362
363 pub async fn clear(session_id: u32) {
364 let arc = Self::get_instance();
365 let mut channel_list = Vec::new();
366 {
367 let map = arc.lock().await;
368 if map.is_empty() {
369 return;
370 }
371 for (&key, _) in map.iter() {
372 if key.0 == session_id {
373 let id = key;
374 channel_list.push(id);
375 }
376 }
377 }
378 for id in channel_list {
379 free_channel_task(id.0, id.1).await;
380 }
381 }
382
383 pub async fn dump_task() -> String {
384 let arc = Self::get_instance();
385 let map = arc.lock().await;
386 let mut result = String::new();
387 for (_id, forward_task) in map.iter() {
388 let forward_type = match forward_task.remote_args.len() {
389 0 => "fport".to_string(),
390 2 => "rport".to_string(),
391 _ => "unknown".to_string(),
392 };
393 let first_args = match forward_task.remote_args.len() {
394 0 => "unknown".to_string(),
395 2 => format!(
396 "{}:{}",
397 forward_task.local_args[0], forward_task.local_args[1]
398 ),
399 _ => "unknown".to_string(),
400 };
401 let second_args = match forward_task.remote_args.len() {
402 0 => format!(
403 "{}:{}",
404 forward_task.local_args[0], forward_task.local_args[1]
405 ),
406 2 => format!(
407 "{}:{}",
408 forward_task.remote_args[0], forward_task.remote_args[1]
409 ),
410 _ => "unknown".to_string(),
411 };
412 result.push_str(&format!(
413 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
414 forward_task.session_id,
415 forward_task.channel_id,
416 forward_type,
417 first_args,
418 second_args
419 ));
420 }
421 result
422 }
423 }
424
425 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
426 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
427 return;
428 };
429 let task = &mut task.clone();
430 let cid = task.context_forward.id;
431 crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}, cid:{cid}");
432
433 match task.forward_type {
434 ForwardType::Tcp => {
435 TcpWriteStreamMap::end(cid).await;
436 TcpListenerMap::end(channel_id).await;
437 }
438 ForwardType::Jdwp | ForwardType::Ark => {
439 TcpWriteStreamMap::end(cid).await;
440 let ret = unsafe { libc::close(task.context_forward.fd) };
441 crate::debug!(
442 "close context_forward fd, ret={}, session_id={}, channel_id={}",
443 ret,
444 session_id,
445 channel_id,
446 );
447 let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) };
448 crate::debug!(
449 "close context_forward target fd, ret={}, session_id={}, channel_id={}",
450 target_fd_ret,
451 session_id,
452 channel_id,
453 );
454 TcpListenerMap::end(channel_id).await;
455 }
456 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
457 #[cfg(not(target_os = "windows"))]
458 UdsServer::wrap_close(task.context_forward.fd);
459 }
460 ForwardType::Device => {
461 return;
462 }
463 }
464 ForwardTaskMap::remove(session_id, channel_id).await;
465 }
466
467 pub async fn stop_task(session_id: u32) {
468 ForwardTaskMap::clear(session_id).await;
469 }
470
471 pub async fn dump_task() -> String {
472 ForwardTaskMap::dump_task().await
473 }
474
475 #[derive(Debug, Default, Clone, PartialEq, Eq)]
476 pub struct HdcForward {
477 session_id: u32,
478 channel_id: u32,
479 server_or_daemon: bool,
480 local_args: Vec<String>,
481 remote_args: Vec<String>,
482 task_command: String,
483 forward_type: ForwardType,
484 context_forward: ContextForward,
485 pub transfer: HdcTransferBase,
486 }
487
488 impl HdcForward {
newnull489 pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
490 Self {
491 session_id,
492 channel_id,
493 server_or_daemon,
494 local_args: Default::default(),
495 remote_args: Default::default(),
496 task_command: Default::default(),
497 forward_type: Default::default(),
498 context_forward: Default::default(),
499 transfer: HdcTransferBase::new(session_id, channel_id),
500 }
501 }
502 }
503
check_node_infonull504 pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
505 crate::info!("check cmd args is : {:#?}", value);
506 if !value.contains(':') {
507 return false;
508 }
509 let array = value.split(':').collect::<Vec<&str>>();
510 if array[0] == "tcp" {
511 if array[1].len() > config::MAX_PORT_LEN {
512 crate::error!(
513 "forward port = {:#?} it'slength is wrong, can not more than five",
514 array[1]
515 );
516 return false;
517 }
518
519 match array[1].parse::<u32>() {
520 Ok(port) => {
521 if port == 0 || port > config::MAX_PORT_NUM {
522 crate::error!("port can not greater than: 65535");
523 return false;
524 }
525 }
526 Err(_) => {
527 crate::error!("port must is int type, port is: {:#?}", array[1]);
528 return false;
529 }
530 }
531 }
532 for item in array.iter() {
533 arg.push(String::from(item.to_owned()));
534 }
535 true
536 }
537
538 #[cfg(feature = "host")]
539 pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
540 crate::info!("on_forward_success");
541 let channel_id = task_message.channel_id;
542 let payload = task_message.payload;
543 let forward_direction = payload[0] == b'1';
544 let connect_key = "unknow key".to_string();
545 let task_string = String::from_utf8(payload);
546 match task_string {
547 Ok(task_string) => {
548 let info = HdcForwardInfo::new(
549 session_id,
550 channel_id,
551 forward_direction,
552 task_string,
553 connect_key,
554 );
555 HdcForwardInfoMap::put(info).await;
556 }
557 Err(err) => {
558 crate::error!("payload to String failed. {err}");
559 }
560 }
561 transfer::TcpMap::end(task_message.channel_id).await;
562 Ok(())
563 }
564
565 pub async fn check_command(
566 ctx: &mut ContextForward,
567 _payload: &[u8],
568 server_or_daemon: bool,
569 ) -> bool {
570 let channel_id = ctx.channel_id;
571 if !_payload.is_empty() {
572 hdctransfer::echo_client(
573 ctx.session_id,
574 channel_id,
575 "Forwardport result:OK",
576 MessageLevel::Ok,
577 )
578 .await;
579 let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;
580
581 let mut command_string = vec![0_u8; map_info.len() + 1];
582 map_info
583 .as_bytes()
584 .to_vec()
585 .iter()
586 .enumerate()
587 .for_each(|(i, e)| {
588 command_string[i] = *e;
589 });
590 let forward_success_message = TaskMessage {
591 channel_id,
592 command: HdcCommand::ForwardSuccess,
593 payload: command_string,
594 };
595 #[cfg(feature = "host")]
596 {
597 let _ = on_forward_success(forward_success_message, ctx.session_id).await;
598 }
599 #[cfg(not(feature = "host"))]
600 {
601 transfer::put(ctx.session_id, forward_success_message).await;
602 }
603 } else {
604 hdctransfer::echo_client(
605 ctx.session_id,
606 channel_id,
607 "Forwardport result: Failed",
608 MessageLevel::Fail,
609 )
610 .await;
611 free_context(ctx.id, false).await;
612 return false;
613 }
614 true
615 }
616
617 pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
618 let type_str = &ctx_point.local_args[0];
619 match type_str.as_str() {
620 "tcp" => {
621 ctx_point.forward_type = ForwardType::Tcp;
622 }
623 "dev" => {
624 ctx_point.forward_type = ForwardType::Device;
625 }
626 "localabstract" => {
627 ctx_point.forward_type = ForwardType::Abstract;
628 }
629 "localfilesystem" => {
630 ctx_point.local_args[1] =
631 HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
632 ctx_point.forward_type = ForwardType::FileSystem;
633 }
634 "jdwp" => {
635 ctx_point.forward_type = ForwardType::Jdwp;
636 }
637 "ark" => {
638 ctx_point.forward_type = ForwardType::Ark;
639 }
640 "localreserved" => {
641 ctx_point.local_args[1] =
642 FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
643 ctx_point.forward_type = ForwardType::Reserved;
644 }
645 _ => {
646 crate::error!("this forward type may is not expected");
647 return false;
648 }
649 }
650 true
651 }
652
653 pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
654 let saddr = format!("127.0.0.1:{}", port);
655 let session_tmp = ctx.session_id;
656 let channel_tmp = ctx.channel_id;
657 let remote_parameters = ctx.remote_parameters.clone();
658 let ctx_type = ctx.forward_type.clone();
659
660 let result = TcpListener::bind(saddr.clone()).await;
661 match result {
662 Ok(listener) => {
663 crate::info!("forward tcp accept bind ok, addr:{:#?}", saddr);
664 let join_handle = utils::spawn(async move {
665 loop {
666 let (stream, _addr) = match listener.accept().await {
667 Ok((stream, _addr)) => (stream, _addr),
668 Err(err) => {
669 crate::error!("listener accept failed, {err}");
670 continue;
671 }
672 };
673 let (rd, wr) = stream.into_split();
674 let mut client_context = malloc_context(session_tmp, channel_tmp, true).await;
675 client_context.remote_parameters = remote_parameters.clone();
676 client_context.forward_type = ctx_type.clone();
677 ForwardContextMap::update(client_context.id, client_context.clone()).await;
678 crate::info!("malloc client_context id = {:#?}", client_context.id);
679 TcpWriteStreamMap::put(client_context.id, wr).await;
680 on_accept(client_context.id).await;
681 utils::spawn(async move {
682 recv_tcp_msg(session_tmp, channel_tmp, rd, client_context.id).await;
683 });
684 }
685 });
686 TcpListenerMap::put(channel_tmp, join_handle).await;
687 Ok(())
688 }
689 Err(e) => {
690 crate::error!("forward_ tcp_accept fail:{:#?}", e);
691 ctx.last_error = format!("TCP Port listen failed at {}", port);
692 Err(e)
693 }
694 }
695 }
696
697 pub async fn on_accept(cid: u32) {
698 let Some(context_forward) = ForwardContextMap::get(cid).await else {
699 crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
700 return;
701 };
702 let ctx = &mut context_forward.clone();
703
704 let buf_string: Vec<u8> = ctx.remote_parameters.clone().as_bytes().to_vec();
705 let mut new_buf = vec![0_u8; buf_string.len() + 9];
706
707 buf_string.iter().enumerate().for_each(|(i, e)| {
708 new_buf[i + 8] = *e;
709 });
710 send_to_task(
711 ctx.session_id,
712 ctx.channel_id,
713 HdcCommand::ForwardActiveSlave,
714 &new_buf,
715 buf_string.len() + 9,
716 ctx.id,
717 )
718 .await;
719 }
720
721 pub async fn daemon_connect_tcp(cid: u32, port: u32) {
722 let Some(context_forward) = ForwardContextMap::get(cid).await else {
723 crate::error!("daemon connect tcp get context is none cid={cid}");
724 return;
725 };
726 let ctx = &mut context_forward.clone();
727
728 let saddr = format!("127.0.0.1:{}", port);
729 let stream = match TcpStream::connect(saddr).await {
730 Err(err) => {
731 crate::error!("TcpStream::stream failed {:?}", err);
732 free_context(cid, true).await;
733 return;
734 }
735 Ok(addr) => addr,
736 };
737 send_active_master(ctx).await;
738 let (rd, wr) = stream.into_split();
739 TcpWriteStreamMap::put(ctx.id, wr).await;
740 ForwardContextMap::update(ctx.id, ctx.clone()).await;
741
742 let session_tmp = ctx.session_id;
743 let channel_tmp = ctx.channel_id;
744 update_context_to_task(session_tmp, channel_tmp, ctx).await;
745 utils::spawn(async move {
746 recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
747 });
748 }
749
750 pub async fn update_context_to_task(session_id: u32, channel_id:u32, ctx: &mut ContextForward) {
751 let Some(task) = ForwardTaskMap::get(ctx.session_id, ctx.channel_id).await else {
752 crate::error!(
753 "update context to task is none session_id={:#?},channel_id={:#?}",
754 session_id,
755 channel_id
756 );
757 return;
758 };
759 let task = &mut task.clone();
760 task.context_forward = ctx.clone();
761 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
762 }
763
764 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
765 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
766 loop {
767 match rd.read(&mut data).await {
768 Ok(recv_size) => {
769 if recv_size == 0 {
770 crate::info!("recv_size is 0, tcp temporarily closed");
771 free_context(cid, true).await;
772 return;
773 }
774 send_to_task(
775 session_id,
776 channel_id,
777 HdcCommand::ForwardData,
778 &data[0..recv_size],
779 recv_size,
780 cid,
781 )
782 .await;
783 }
784 Err(_e) => {
785 free_context(cid, true).await;
786 crate::error!(
787 "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
788 );
789 }
790 }
791 }
792 }
793
794 #[cfg(not(target_os = "windows"))]
795 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
796 loop {
797 let result = ylong_runtime::spawn_blocking(move || {
798 let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
799 let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
800 (recv_size, buffer)
801 })
802 .await;
803 let (recv_size, buffer) = match result {
804 Ok((recv_size, _)) if recv_size < 0 => {
805 crate::error!("local abstract close shutdown fd = {fd}");
806 free_context(cid, true).await;
807 return;
808 }
809 Ok((recv_size, buffer)) => (recv_size, buffer),
810 Err(err) => {
811 crate::error!("read socket msg failed. {err}");
812 free_context(cid, true).await;
813 return;
814 }
815 };
816 send_to_task(
817 session_id,
818 channel_id,
819 HdcCommand::ForwardData,
820 &buffer[0..recv_size as usize],
821 recv_size as usize,
822 cid,
823 )
824 .await;
825 }
826 }
827
828 pub async fn free_context(cid: u32, notify_remote: bool) {
829 let Some(context_forward) = ForwardContextMap::get(cid).await else {
830 crate::error!("free forward context get cid is none. cid = {cid}");
831 return;
832 };
833 let ctx = &mut context_forward.clone();
834
835 if notify_remote {
836 let vec_none = Vec::<u8>::new();
837 send_to_task(
838 ctx.session_id,
839 ctx.channel_id,
840 HdcCommand::ForwardFreeContext,
841 &vec_none,
842 0,
843 ctx.id,
844 )
845 .await;
846 }
847 crate::error!("begin to free forward context cid. cid = {cid}");
848 match ctx.forward_type {
849 ForwardType::Tcp => {
850 TcpWriteStreamMap::end(ctx.id).await;
851 }
852 ForwardType::Jdwp | ForwardType::Ark => {
853 TcpWriteStreamMap::end(ctx.id).await;
854 let ret = unsafe { libc::close(ctx.fd) };
855 crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
856 let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
857 crate::debug!(
858 "close context_forward target fd, ret={}, id={}",
859 target_fd_ret,
860 ctx.id,
861 );
862 }
863 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
864 crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
865 #[cfg(not(target_os = "windows"))]
866 UdsServer::wrap_close(ctx.fd);
867 ctx.fd = -1;
868 }
869 ForwardType::Device => {}
870 }
871 ForwardContextMap::remove(cid).await;
872 }
873
874 pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
875 let Ok(port) = ctx.local_args[1].parse::<u32>() else {
876 crate::error!("setup tcp point parse error");
877 return false;
878 };
879 let cid = ctx.id;
880 if ctx.is_master {
881 let result = forward_tcp_accept(ctx, port).await;
882 return result.is_ok();
883 } else {
884 ForwardContextMap::update(ctx.id, ctx.clone()).await;
885 utils::spawn(async move { daemon_connect_tcp(cid, port).await });
886 }
887 true
888 }
889
890 #[cfg(not(target_os = "windows"))]
891 async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
892 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
893 ctx.fd = fd;
894 let name: Vec<u8> = path.as_bytes().to_vec();
895 let mut socket_name = vec![0_u8; name.len() + 1];
896 socket_name[0] = b'\0';
897 name.iter().enumerate().for_each(|(i, e)| {
898 socket_name[i + 1] = *e;
899 });
900 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
901 ForwardContextMap::update(ctx.id, ctx.clone()).await;
902 let cid = ctx.id;
903 if let Ok(addr_obj) = &addr {
904 let ret = UdsServer::wrap_bind(fd, addr_obj);
905 if ret.is_err() {
906 hdctransfer::echo_client(
907 ctx.session_id,
908 ctx.channel_id,
909 "Unix pipe bind failed",
910 MessageLevel::Fail,
911 )
912 .await;
913 crate::error!("bind fail");
914 return false;
915 }
916 let ret = UdsServer::wrap_listen(fd);
917 if ret < 0 {
918 hdctransfer::echo_client(
919 ctx.session_id,
920 ctx.channel_id,
921 "Unix pipe listen failed",
922 MessageLevel::Fail,
923 )
924 .await;
925 crate::error!("listen fail");
926 return false;
927 }
928 utils::spawn(async move {
929 loop {
930 let client_fd = UdsServer::wrap_accept(fd);
931 if client_fd == -1 {
932 break;
933 }
934 on_accept(cid).await;
935 }
936 });
937 }
938 true
939 }
940
941 pub async fn canonicalize(path: String) -> Result<String, Error> {
942 match fs::canonicalize(path) {
943 Ok(abs_path) => match abs_path.to_str() {
944 Some(path) => Ok(path.to_string()),
945 None => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
946 },
947 Err(_) => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
948 }
949 }
950
951 #[cfg(target_os = "windows")]
952 pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
953 false
954 }
955
956 #[cfg(not(target_os = "windows"))]
957 pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
958 let s_node_cfg = ctx.local_args[1].clone();
959 let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
960 crate::error!("Open unix-dev failed");
961 return false;
962 };
963 ctx.dev_path = resolv_path.clone();
964 crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
965 let thread_path_ref = Arc::new(Mutex::new(resolv_path));
966 if !send_active_master(ctx).await {
967 crate::error!("send active_master return failed ctx={:?}", ctx);
968 return false;
969 }
970 let session = ctx.session_id;
971 let channel = ctx.channel_id;
972 let cid = ctx.id;
973
974 ForwardContextMap::update(ctx.id, ctx.clone()).await;
975 utils::spawn(async move {
976 loop {
977 let path = thread_path_ref.lock().await;
978 let Ok(mut file) = File::open(&*path) else {
979 crate::error!("open {} failed.", *path);
980 break;
981 };
982 let mut total = Vec::new();
983 let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
984 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
985 let Ok(read_len) = file.read(&mut buf[4..]) else {
986 crate::error!("read {} failed.", *path);
987 break;
988 };
989 if read_len == 0 {
990 free_context(cid, true).await;
991 break;
992 }
993 total.append(&mut buf[0..read_len].to_vec());
994 send_to_task(
995 session,
996 channel,
997 HdcCommand::ForwardData,
998 &total,
999 read_len,
1000 cid,
1001 )
1002 .await;
1003 }
1004 });
1005 true
1006 }
1007
1008 #[cfg(not(feature = "host"))]
get_pidnull1009 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
1010 match forward_type == ForwardType::Jdwp {
1011 true => parameter.parse::<u32>().unwrap_or_else(|e| {
1012 crate::error!("Jdwp get pid err :{:?}", e);
1013 0_u32
1014 }),
1015 false => {
1016 let params: Vec<&str> = parameter.split('@').collect();
1017 params[0].parse::<u32>().unwrap_or_else(|e| {
1018 crate::error!("get pid err :{:?}", e);
1019 0_u32
1020 })
1021 }
1022 }
1023 }
1024
1025 #[cfg(feature = "host")]
1026 pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
1027 crate::info!("host not setup_jdwp _point");
1028 false
1029 }
1030
1031 #[cfg(not(feature = "host"))]
1032 pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
1033 let local_args = ctx.local_args[1].clone();
1034 let parameter = local_args.as_str();
1035 let style = &ctx.forward_type;
1036 let pid = get_pid(parameter, style.clone());
1037 let cid = ctx.id;
1038 let session = ctx.session_id;
1039 let channel = ctx.channel_id;
1040 if pid == 0 {
1041 crate::error!("setup jdwp point get pid is 0");
1042 return false;
1043 }
1044
1045 let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1046 if result.is_err() {
1047 crate::error!("wrap socketpair failed");
1048 return false;
1049 }
1050 let mut target_fd = 0;
1051 let mut local_fd = 0;
1052 if let Ok((fd0, fd1)) = result {
1053 crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1054 local_fd = fd0;
1055 target_fd = fd1;
1056 ctx.fd = local_fd;
1057 ctx.target_fd = target_fd;
1058 target_fd = fd1;
1059 }
1060
1061 utils::spawn(async move {
1062 loop {
1063 let result = ylong_runtime::spawn_blocking(move || {
1064 let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
1065 let size = UdsServer::wrap_read(local_fd, &mut buffer);
1066 (size, buffer)
1067 })
1068 .await;
1069 let (size, buffer) = match result {
1070 Ok((size, _)) if size < 0 => {
1071 crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
1072 free_context(cid, true).await;
1073 break;
1074 }
1075 Ok((0, _)) => {
1076 ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1077 continue;
1078 }
1079 Ok((size, buffer)) => (size, buffer),
1080 Err(err) => {
1081 crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
1082 free_context(cid, true).await;
1083 break;
1084 }
1085 };
1086 send_to_task(
1087 session,
1088 channel,
1089 HdcCommand::ForwardData,
1090 &buffer[0..size as usize],
1091 size as usize,
1092 cid,
1093 )
1094 .await;
1095 }
1096 });
1097
1098 let jdwp = Jdwp::get_instance();
1099 let mut param = ctx.local_args[0].clone();
1100 param.push(':');
1101 param.push_str(parameter);
1102
1103 let ret = jdwp
1104 .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
1105 .await;
1106 if !ret {
1107 crate::error!("not found pid:{:?}", pid);
1108 hdctransfer::echo_client(
1109 session,
1110 channel,
1111 format!("fport fail:pid not found:{}", pid).as_str(),
1112 MessageLevel::Fail,
1113 )
1114 .await;
1115 task_finish(session, channel).await;
1116 return false;
1117 }
1118
1119 let vec_none = Vec::<u8>::new();
1120 send_to_task(
1121 session,
1122 channel,
1123 HdcCommand::ForwardActiveMaster, // 04
1124 &vec_none,
1125 0,
1126 cid,
1127 )
1128 .await;
1129 crate::info!("setup_jdwp_ point return true");
1130 true
1131 }
1132
1133 async fn task_finish(session_id: u32, channel_id: u32) {
1134 transfer_task_finish(channel_id, session_id).await;
1135 }
1136
1137 #[cfg(not(target_os = "windows"))]
1138 pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
1139 let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
1140 let mut socket_name = vec![0_u8; name.len() + 1];
1141 socket_name[0] = b'\0';
1142 name.iter().enumerate().for_each(|(i, e)| {
1143 socket_name[i + 1] = *e;
1144 });
1145 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1146 if let Ok(addr_obj) = &addr {
1147 let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
1148 if ret.is_err() {
1149 hdctransfer::echo_client(
1150 ctx.session_id,
1151 ctx.channel_id,
1152 "localabstract connect fail",
1153 MessageLevel::Fail,
1154 )
1155 .await;
1156 free_context(ctx.id, true).await;
1157 return;
1158 }
1159 send_active_master(ctx).await;
1160 read_data_to_forward(ctx).await;
1161 }
1162 }
1163
1164 #[cfg(target_os = "windows")]
1165 pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool {
1166 false
1167 }
1168
1169 #[cfg(not(target_os = "windows"))]
1170 pub async fn setup_file_point(ctx: &mut ContextForward) -> bool {
1171 let s_node_cfg = ctx.local_args[1].clone();
1172 if ctx.is_master {
1173 if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem
1174 {
1175 let _ = fs::remove_file(s_node_cfg.clone());
1176 }
1177 if !server_socket_bind_listen(ctx, s_node_cfg).await {
1178 crate::error!("server socket bind listen failed id={:?}", ctx.id);
1179 task_finish(ctx.session_id, ctx.channel_id).await;
1180 return false;
1181 }
1182 } else {
1183 if ctx.fd <= 0 {
1184 crate::info!("setup_file _point fd: {:?}", ctx.fd);
1185 if ctx.forward_type == ForwardType::Abstract {
1186 ctx.fd = UdsClient::wrap_socket(AF_LOCAL);
1187 unsafe {
1188 libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC);
1189 }
1190 } else {
1191 ctx.fd = UdsClient::wrap_socket(AF_UNIX);
1192 }
1193 }
1194 ForwardContextMap::update(ctx.id, ctx.clone()).await;
1195 daemon_connect_pipe(ctx).await;
1196 }
1197 true
1198 }
1199
1200 pub async fn setup_point(ctx: &mut ContextForward) -> bool {
1201 if !detech_forward_type(ctx).await {
1202 crate::error!("forward type is not true");
1203 return false;
1204 }
1205
1206 if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp {
1207 ctx.last_error = String::from("Not support forward-type");
1208 return false;
1209 }
1210
1211 let mut ret = false;
1212 match ctx.forward_type {
1213 ForwardType::Tcp => {
1214 ret = setup_tcp_point(ctx).await;
1215 }
1216 ForwardType::Device => {
1217 if !cfg!(target_os = "windows") {
1218 ret = setup_device_point(ctx).await;
1219 }
1220 }
1221 ForwardType::Jdwp | ForwardType::Ark => {
1222 crate::info!("setup point ark case");
1223 if !cfg!(feature = "host") {
1224 ret = setup_jdwp_point(ctx).await;
1225 }
1226 }
1227 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1228 if !cfg!(target_os = "windows") {
1229 ret = setup_file_point(ctx).await;
1230 }
1231 }
1232 };
1233 ForwardContextMap::update(ctx.id, ctx.clone()).await;
1234 update_context_to_task(ctx.session_id, ctx.channel_id, ctx).await;
1235 ret
1236 }
1237
1238 pub async fn send_to_task(
1239 session_id: u32,
1240 channel_id: u32,
1241 command: HdcCommand,
1242 buf_ptr: &[u8],
1243 buf_size: usize,
1244 cid: u32,
1245 ) -> bool {
1246 if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1247 crate::error!("send task buf_size oversize");
1248 return false;
1249 }
1250
1251 let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1252 new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1253 let file_check_message = TaskMessage {
1254 channel_id,
1255 command,
1256 payload: new_buf,
1257 };
1258 transfer::put(session_id, file_check_message).await;
1259 true
1260 }
1261
get_cidnull1262 pub fn get_cid(_payload: &[u8]) -> u32 {
1263 let mut id_bytes = [0u8; 4];
1264 id_bytes.copy_from_slice(&_payload[0..4]);
1265 let id: u32 = u32::from_be_bytes(id_bytes);
1266 id
1267 }
1268
1269 pub async fn send_active_master(ctx: &mut ContextForward) -> bool {
1270 if ctx.check_order {
1271 let flag = [0u8; 1];
1272 send_to_task(
1273 ctx.session_id,
1274 ctx.channel_id,
1275 HdcCommand::ForwardCheckResult,
1276 &flag,
1277 1,
1278 ctx.id,
1279 )
1280 .await;
1281 free_context(ctx.id, false).await;
1282 return true;
1283 }
1284 if !send_to_task(
1285 ctx.session_id,
1286 ctx.channel_id,
1287 HdcCommand::ForwardActiveMaster,
1288 &Vec::<u8>::new(),
1289 0,
1290 ctx.id,
1291 )
1292 .await
1293 {
1294 free_context(ctx.id, true).await;
1295 return false;
1296 }
1297 true
1298 }
1299
forward_parse_cmdnull1300 pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool {
1301 let command = context_forward.task_command.clone();
1302 let result = Base::split_command_to_args(&command);
1303 let argv = result.0;
1304 let argc = result.1;
1305
1306 if argc < ARG_COUNT2 {
1307 crate::error!("argc < 2 parse is failed.");
1308 context_forward.last_error = "Too few arguments.".to_string();
1309 return false;
1310 }
1311 if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1312 crate::error!("parse's length is flase.");
1313 context_forward.last_error = "Some argument too long.".to_string();
1314 return false;
1315 }
1316 if !check_node_info(&argv[0], &mut context_forward.local_args) {
1317 crate::error!("check argv[0] node info is flase.");
1318 context_forward.last_error = "Arguments parsing failed.".to_string();
1319 return false;
1320 }
1321 if !check_node_info(&argv[1], &mut context_forward.remote_args) {
1322 crate::error!("check argv[1] node info is flase.");
1323 context_forward.last_error = "Arguments parsing failed.".to_string();
1324 return false;
1325 }
1326 context_forward.remote_parameters = argv[1].clone();
1327 true
1328 }
1329
1330 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1331 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1332 crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}"
1333 );
1334 return false;
1335 };
1336 let task: &mut HdcForward = &mut task.clone();
1337 let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1338 crate::error!("cmd argv is not int utf8");
1339 return false;
1340 };
1341 let mut context_forward = malloc_context(session_id, channel_id, true).await;
1342 context_forward.task_command = command.clone();
1343
1344 if !forward_parse_cmd(&mut context_forward) {
1345 task.context_forward = context_forward.clone();
1346 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1347 return false;
1348 }
1349 if !setup_point(&mut context_forward).await {
1350 task.context_forward = context_forward.clone();
1351 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1352 return false;
1353 }
1354
1355 let wake_up_message = TaskMessage {
1356 channel_id,
1357 command: HdcCommand::KernelWakeupSlavetask,
1358 payload: Vec::<u8>::new(),
1359 };
1360 transfer::put(context_forward.session_id, wake_up_message).await;
1361
1362 let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec();
1363 let mut new_buf = vec![0_u8; buf_string.len() + 9];
1364 buf_string.iter().enumerate().for_each(|(i, e)| {
1365 new_buf[i + 8] = *e;
1366 });
1367 send_to_task(
1368 context_forward.session_id,
1369 context_forward.channel_id,
1370 HdcCommand::ForwardCheck,
1371 &new_buf,
1372 buf_string.len() + 9,
1373 context_forward.id,
1374 )
1375 .await;
1376 true
1377 }
1378
1379 pub async fn slave_connect(
1380 session_id: u32,
1381 channel_id: u32,
1382 payload: &[u8],
1383 check_order: bool,
1384 ) -> bool {
1385 let mut context_forward = malloc_context(session_id, channel_id, false).await;
1386 context_forward.check_order = check_order;
1387 if let Ok((content, id)) = filter_command(payload) {
1388 let content = &content[8..].trim_end_matches('\0').to_string();
1389 context_forward.task_command = content.clone();
1390 if !check_node_info(content, &mut context_forward.local_args) {
1391 crate::error!("check local args is false");
1392 return false;
1393 }
1394 context_forward.id = id;
1395 ForwardContextMap::update(id, context_forward.clone()).await;
1396 }
1397
1398 if !check_order {
1399 if !setup_point(&mut context_forward).await {
1400 crate::error!("setup point return false, free context");
1401 free_context(context_forward.id, true).await;
1402 ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1403 return false;
1404 }
1405 } else {
1406 send_active_master(&mut context_forward).await;
1407 }
1408 ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1409 true
1410 }
1411
1412 pub async fn read_data_to_forward(ctx: &mut ContextForward) {
1413 let cid = ctx.id;
1414 let session = ctx.session_id;
1415 let channel = ctx.channel_id;
1416 match ctx.forward_type {
1417 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1418 #[cfg(not(target_os = "windows"))]
1419 {
1420 let fd_temp = ctx.fd;
1421 utils::spawn(async move {
1422 deamon_read_socket_msg(session, channel, fd_temp, cid).await
1423 });
1424 }
1425 }
1426 ForwardType::Device => {
1427 #[cfg(not(target_os = "windows"))]
1428 setup_device_point(ctx).await;
1429 }
1430 _ => {}
1431 }
1432 }
1433
filter_commandnull1434 pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
1435 let bytes = &_payload[4..];
1436 let ct: Result<String, std::string::FromUtf8Error> = String::from_utf8(bytes.to_vec());
1437 if let Ok(content) = ct {
1438 let mut id_bytes = [0u8; 4];
1439 id_bytes.copy_from_slice(&_payload[0..4]);
1440 let id: u32 = u32::from_be_bytes(id_bytes);
1441 return Ok((content, id));
1442 }
1443 Err(Error::new(ErrorKind::Other, "filter command failure"))
1444 }
1445
1446 pub async fn dev_write_bufer(path: String, content: Vec<u8>) {
1447 utils::spawn(async move {
1448 let open_result = OpenOptions::new()
1449 .write(true)
1450 .create(true)
1451 .open(path.clone());
1452
1453 match open_result {
1454 Ok(mut file) => {
1455 let write_result = file.write_all(content.as_slice());
1456 match write_result {
1457 Ok(()) => {}
1458 Err(e) => {
1459 crate::error!("dev write bufer to file fail:{:#?}", e);
1460 }
1461 }
1462 }
1463 Err(e) => {
1464 crate::error!("dev write bufer fail:{:#?}", e);
1465 }
1466 }
1467 });
1468 }
1469
1470 pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool {
1471 if ctx.forward_type == ForwardType::Tcp {
1472 return TcpWriteStreamMap::write(ctx.id, content).await;
1473 } else if ctx.forward_type == ForwardType::Device {
1474 let path_ref = ctx.dev_path.clone();
1475 if path_ref.is_empty() {
1476 crate::error!(
1477 "write_forward_bufer get dev_path is failed ctx.id = {:#?}",
1478 ctx.id
1479 );
1480 return false;
1481 }
1482 dev_write_bufer(path_ref, content).await;
1483 } else {
1484 #[cfg(not(target_os = "windows"))]
1485 {
1486 let fd = ctx.fd;
1487 if UdsClient::wrap_send(fd, &content) < 0 {
1488 crate::info!("write forward bufer failed. fd = {fd}");
1489 return false;
1490 }
1491 }
1492 }
1493 true
1494 }
1495
1496 #[allow(unused)]
1497 pub async fn malloc_context(
1498 session_id: u32,
1499 channel_id: u32,
1500 master_slave: bool,
1501 ) -> ContextForward {
1502 let mut ctx = ContextForward {
1503 ..Default::default()
1504 };
1505 ctx.id = utils::get_current_time() as u32;
1506 ctx.session_id = session_id;
1507 ctx.channel_id = channel_id;
1508 ctx.is_master = master_slave;
1509 crate::info!("malloc_context success id = {:#?}", ctx.id);
1510 ForwardContextMap::update(ctx.id, ctx.clone()).await;
1511 ctx
1512 }
1513
1514 pub async fn forward_command_dispatch(
1515 session_id: u32,
1516 channel_id: u32,
1517 command: HdcCommand,
1518 _payload: &[u8],
1519 ) -> bool {
1520 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1521 crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1522 );
1523 return false;
1524 };
1525 let task: &mut HdcForward = &mut task.clone();
1526 let mut ret: bool = true;
1527 let cid = get_cid(_payload);
1528 let send_msg = _payload[4..].to_vec();
1529
1530 let Some(context_forward) = ForwardContextMap::get(cid).await else {
1531 crate::error!("forward command dispatch get context is none cid={cid}");
1532 return false;
1533 };
1534 let ctx = &mut context_forward.clone();
1535 match command {
1536 HdcCommand::ForwardCheckResult => {
1537 ret = check_command(ctx, _payload, task.server_or_daemon).await;
1538 }
1539 HdcCommand::ForwardData => {
1540 ret = write_forward_bufer(ctx, send_msg).await;
1541 }
1542 HdcCommand::ForwardFreeContext => {
1543 free_context(ctx.id, false).await;
1544 }
1545 HdcCommand::ForwardActiveMaster => {
1546 read_data_to_forward(ctx).await;
1547 ret = true;
1548 }
1549 _ => {
1550 ret = false;
1551 }
1552 }
1553 ret
1554 }
1555
1556 async fn print_error_info(session_id: u32, channel_id: u32) {
1557 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1558 crate::error!(
1559 "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}",
1560 session_id,
1561 channel_id
1562 );
1563 return;
1564 };
1565 let task = &mut task.clone();
1566 let ctx = &task.context_forward;
1567 let mut echo_content = ctx.last_error.clone();
1568
1569 if echo_content.is_empty() {
1570 echo_content = "Forward parament failed".to_string();
1571 }
1572
1573 hdctransfer::echo_client(
1574 session_id,
1575 channel_id,
1576 echo_content.as_str(),
1577 MessageLevel::Fail,
1578 )
1579 .await;
1580 }
1581
1582 pub async fn command_dispatch(
1583 session_id: u32,
1584 channel_id: u32,
1585 command: HdcCommand,
1586 payload: &[u8],
1587 _payload_size: u16,
1588 ) -> bool {
1589 if command != HdcCommand::ForwardData {
1590 crate::info!("command_dispatch command recv: {:?}", command);
1591 }
1592
1593 let ret = match command {
1594 HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await,
1595 HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await,
1596 HdcCommand::ForwardActiveSlave => {
1597 slave_connect(session_id, channel_id, payload, false).await
1598 }
1599 _ => forward_command_dispatch(session_id, channel_id, command, payload).await,
1600 };
1601 if !ret {
1602 print_error_info(session_id, channel_id).await;
1603 task_finish(session_id, channel_id).await;
1604 return false;
1605 }
1606 ret
1607 }
1608