1/* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15//! forward 16#![allow(missing_docs)] 17#[cfg(feature = "host")] 18extern crate ylong_runtime_static as ylong_runtime; 19 20#[cfg(not(feature = "host"))] 21use libc::SOCK_STREAM; 22#[cfg(not(target_os = "windows"))] 23use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD}; 24use std::collections::HashMap; 25#[cfg(not(target_os = "windows"))] 26use std::fs::{self, File, OpenOptions}; 27#[cfg(not(target_os = "windows"))] 28use std::io::{self, Error, ErrorKind, Read, Write}; 29use ylong_runtime::sync::{Mutex, RwLock}; 30 31use crate::common::base::Base; 32use crate::common::hdctransfer::transfer_task_finish; 33use crate::common::hdctransfer::{self, HdcTransferBase}; 34#[cfg(not(feature = "host"))] 35use crate::common::jdwp::Jdwp; 36#[cfg(not(target_os = "windows"))] 37use crate::common::uds::{UdsAddr, UdsClient, UdsServer}; 38use crate::config::HdcCommand; 39use crate::config::MessageLevel; 40use crate::config::TaskMessage; 41use crate::transfer; 42#[allow(unused)] 43use crate::utils::hdc_log::*; 44use crate::{config, utils}; 45use std::mem::MaybeUninit; 46use std::sync::Arc; 47use std::sync::Once; 48#[cfg(not(feature = "host"))] 49use std::time::Duration; 50use ylong_runtime::io::AsyncReadExt; 51use ylong_runtime::io::AsyncWriteExt; 52use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream}; 53use ylong_runtime::task::JoinHandle; 54 
55pub const ARG_COUNT2: u32 = 2; 56pub const BUF_SIZE_SMALL: usize = 256; 57pub const SOCKET_BUFFER_SIZE: usize = 65535; 58pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket"; 59pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/"; 60 61#[cfg(feature = "host")] 62#[derive(Clone, Debug)] 63pub struct HdcForwardInfo { 64 pub session_id: u32, 65 pub channel_id: u32, 66 pub forward_direction: bool, 67 pub task_string: String, 68 pub connect_key: String, 69} 70 71#[cfg(feature = "host")] 72impl HdcForwardInfo { 73 fn new( 74 session_id: u32, 75 channel_id: u32, 76 forward_direction: bool, 77 task_string: String, 78 connect_key: String, 79 ) -> Self { 80 Self { 81 session_id, 82 channel_id, 83 forward_direction, 84 task_string, 85 connect_key, 86 } 87 } 88} 89 90#[derive(Default, Eq, PartialEq, Clone, Debug)] 91enum ForwardType { 92 #[default] 93 Tcp = 0, 94 Device, 95 Abstract, 96 FileSystem, 97 Jdwp, 98 Ark, 99 Reserved, 100} 101 102#[derive(Debug, Default, PartialEq, Eq, Clone)] 103pub struct ContextForward { 104 session_id: u32, 105 channel_id: u32, 106 check_order: bool, 107 is_master: bool, 108 id: u32, 109 fd: i32, 110 target_fd: i32, 111 forward_type: ForwardType, 112 local_args: Vec<String>, 113 remote_args: Vec<String>, 114 task_command: String, 115 last_error: String, 116 remote_parameters: String, 117 dev_path: String, 118} 119 120#[cfg(feature = "host")] 121type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>; 122#[cfg(feature = "host")] 123type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>; 124#[cfg(feature = "host")] 125pub struct HdcForwardInfoMap {} 126#[cfg(feature = "host")] 127impl HdcForwardInfoMap { 128 fn get_instance() -> HdcForwardInfoMap_ { 129 static mut MAP: Option<HdcForwardInfoMap_> = None; 130 unsafe { 131 MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new()))) 132 .clone() 133 } 134 } 135 136 async fn put(forward_info: HdcForwardInfo) { 137 let instance = Self::get_instance(); 138 let mut map = 
instance.lock().await; 139 map.insert( 140 forward_info.task_string.clone(), 141 Arc::new(Mutex::new(forward_info)), 142 ); 143 } 144 145 pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> { 146 let instance = Self::get_instance(); 147 let map = instance.lock().await; 148 let mut result = Vec::new(); 149 for (_key, value) in map.iter() { 150 let info = value.lock().await; 151 result.push((*info).clone()); 152 } 153 result 154 } 155 156 pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool { 157 crate::info!( 158 "remove_forward task_string:{}, direction:{}", 159 task_string, 160 forward_direction 161 ); 162 let instance = Self::get_instance(); 163 let map = instance.lock().await; 164 let mut remove_key = String::new(); 165 let prefix = if forward_direction { 166 "1|".to_string() 167 } else { 168 "0|".to_string() 169 }; 170 let mut task_string1 = prefix; 171 task_string1.push_str(task_string.as_str()); 172 for (key, value) in map.iter() { 173 let info = value.lock().await; 174 if info.task_string.contains(&task_string1) 175 && info.forward_direction == forward_direction 176 { 177 remove_key = (*key.clone()).to_string(); 178 break; 179 } 180 } 181 drop(map); 182 if remove_key.is_empty() { 183 return false; 184 } 185 186 let mut map = instance.lock().await; 187 let result = map.remove(&remove_key); 188 result.is_some() 189 } 190} 191 192type TcpWriter = Arc<Mutex<SplitWriteHalf>>; 193type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>; 194pub struct TcpWriteStreamMap {} 195impl TcpWriteStreamMap { 196 fn get_instance() -> TcpWriterMap_ { 197 static mut TCP_MAP: Option<TcpWriterMap_> = None; 198 unsafe { 199 TCP_MAP 200 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) 201 .clone() 202 } 203 } 204 #[allow(unused)] 205 async fn put(id: u32, wr: SplitWriteHalf) { 206 let instance = Self::get_instance(); 207 let mut map = instance.write().await; 208 let arc_wr = Arc::new(Mutex::new(wr)); 209 map.insert(id, arc_wr); 
210 } 211 #[allow(unused)] 212 async fn write(id: u32, data: Vec<u8>) -> bool { 213 let arc_map = Self::get_instance(); 214 let map = arc_map.write().await; 215 let Some(arc_wr) = map.get(&id) else { 216 crate::error!("TcpWriteStreamMap failed to get id {:#?}", id); 217 return false; 218 }; 219 let mut wr = arc_wr.lock().await; 220 let write_result = wr.write_all(data.as_slice()).await; 221 if write_result.is_err() { 222 crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id); 223 } 224 true 225 } 226 227 pub async fn end(id: u32) { 228 let instance = Self::get_instance(); 229 let mut map = instance.write().await; 230 if let Some(arc_wr) = map.remove(&id) { 231 let mut wr = arc_wr.lock().await; 232 let _ = wr.shutdown().await; 233 } 234 } 235} 236 237type TcpListener_ = Arc<Mutex<JoinHandle<()>>>; 238type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>; 239pub struct TcpListenerMap {} 240impl TcpListenerMap { 241 fn get_instance() -> TcpListenerMap_ { 242 static mut TCP_MAP: Option<TcpListenerMap_> = None; 243 unsafe { 244 TCP_MAP 245 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) 246 .clone() 247 } 248 } 249 #[allow(unused)] 250 async fn put(id: u32, listener: JoinHandle<()>) { 251 let instance = Self::get_instance(); 252 let mut map = instance.write().await; 253 let arc_listener = Arc::new(Mutex::new(listener)); 254 map.insert(id, arc_listener); 255 crate::info!("forward tcp put listener id = {id}"); 256 } 257 258 pub async fn end(id: u32) { 259 let instance = Self::get_instance(); 260 let mut map = instance.write().await; 261 if let Some(arc_listener) = map.remove(&id) { 262 let join_handle = arc_listener.lock().await; 263 join_handle.cancel(); 264 } 265 } 266} 267 268type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>; 269pub struct ForwardContextMap {} 270impl ForwardContextMap { 271 fn get_instance() -> &'static MapContextForward_ { 272 static mut CONTEXT_MAP: MaybeUninit<MapContextForward_> = 
MaybeUninit::uninit(); 273 static ONCE: Once = Once::new(); 274 unsafe { 275 ONCE.call_once(|| { 276 CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new()))); 277 } 278 ); 279 &*CONTEXT_MAP.as_ptr() 280 } 281 } 282 283 pub async fn update(cid: u32, value: ContextForward) { 284 let map = Self::get_instance(); 285 let mut map = map.lock().await; 286 map.insert(cid, value.clone()); 287 } 288 289 pub async fn remove(cid: u32) { 290 crate::info!("ContextForward remove, cid:{}", cid); 291 let map = Self::get_instance(); 292 let mut map = map.lock().await; 293 let _ = map.remove(&cid); 294 } 295 296 pub async fn get(cid: u32) -> Option<ContextForward> { 297 let arc = Self::get_instance(); 298 let map = arc.lock().await; 299 let task = map.get(&cid); 300 match task { 301 Some(task) => Some(task.clone()), 302 None => { 303 crate::debug!("ContextForward result:is none,cid={:#?}", cid,); 304 Option::None 305 } 306 } 307 } 308} 309 310type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>; 311pub struct ForwardTaskMap {} 312impl ForwardTaskMap { 313 fn get_instance() -> MapForward_ { 314 static mut FORWARD_MAP: Option<MapForward_> = None; 315 unsafe { 316 FORWARD_MAP 317 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new()))) 318 .clone() 319 } 320 } 321 322 pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) { 323 let map = Self::get_instance(); 324 let mut map = map.lock().await; 325 map.insert((session_id, channel_id), value.clone()); 326 } 327 328 pub async fn remove(session_id: u32, channel_id: u32) { 329 crate::info!("remove, session:{}, channel:{}", session_id, channel_id); 330 let map = Self::get_instance(); 331 let mut map = map.lock().await; 332 let _ = map.remove(&(session_id, channel_id)); 333 } 334 335 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> { 336 let arc = Self::get_instance(); 337 let map = arc.lock().await; 338 let task = map.get(&(session_id, channel_id)); 339 match task { 340 
Some(task) => Some(task.clone()), 341 None => { 342 crate::debug!( 343 "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}", 344 session_id, 345 channel_id 346 ); 347 Option::None 348 } 349 } 350 } 351 352 pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> { 353 let arc = Self::get_instance(); 354 let map = arc.lock().await; 355 for ((_session_id, _channel_id), value) in map.iter() { 356 if *_session_id == session_id && task_string.contains(&value.task_command) { 357 return Some(*_channel_id); 358 } 359 } 360 None 361 } 362 363 pub async fn clear(session_id: u32) { 364 let arc = Self::get_instance(); 365 let mut channel_list = Vec::new(); 366 { 367 let map = arc.lock().await; 368 if map.is_empty() { 369 return; 370 } 371 for (&key, _) in map.iter() { 372 if key.0 == session_id { 373 let id = key; 374 channel_list.push(id); 375 } 376 } 377 } 378 for id in channel_list { 379 free_channel_task(id.0, id.1).await; 380 } 381 } 382 383 pub async fn dump_task() -> String { 384 let arc = Self::get_instance(); 385 let map = arc.lock().await; 386 let mut result = String::new(); 387 for (_id, forward_task) in map.iter() { 388 let forward_type = match forward_task.remote_args.len() { 389 0 => "fport".to_string(), 390 2 => "rport".to_string(), 391 _ => "unknown".to_string(), 392 }; 393 let first_args = match forward_task.remote_args.len() { 394 0 => "unknown".to_string(), 395 2 => format!( 396 "{}:{}", 397 forward_task.local_args[0], forward_task.local_args[1] 398 ), 399 _ => "unknown".to_string(), 400 }; 401 let second_args = match forward_task.remote_args.len() { 402 0 => format!( 403 "{}:{}", 404 forward_task.local_args[0], forward_task.local_args[1] 405 ), 406 2 => format!( 407 "{}:{}", 408 forward_task.remote_args[0], forward_task.remote_args[1] 409 ), 410 _ => "unknown".to_string(), 411 }; 412 result.push_str(&format!( 413 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n", 414 forward_task.session_id, 415 
forward_task.channel_id, 416 forward_type, 417 first_args, 418 second_args 419 )); 420 } 421 result 422 } 423} 424 425pub async fn free_channel_task(session_id: u32, channel_id: u32) { 426 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else { 427 return; 428 }; 429 let task = &mut task.clone(); 430 let cid = task.context_forward.id; 431 crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}, cid:{cid}"); 432 433 match task.forward_type { 434 ForwardType::Tcp => { 435 TcpWriteStreamMap::end(cid).await; 436 TcpListenerMap::end(channel_id).await; 437 } 438 ForwardType::Jdwp | ForwardType::Ark => { 439 TcpWriteStreamMap::end(cid).await; 440 let ret = unsafe { libc::close(task.context_forward.fd) }; 441 crate::debug!( 442 "close context_forward fd, ret={}, session_id={}, channel_id={}", 443 ret, 444 session_id, 445 channel_id, 446 ); 447 let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) }; 448 crate::debug!( 449 "close context_forward target fd, ret={}, session_id={}, channel_id={}", 450 target_fd_ret, 451 session_id, 452 channel_id, 453 ); 454 TcpListenerMap::end(channel_id).await; 455 } 456 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => { 457 #[cfg(not(target_os = "windows"))] 458 UdsServer::wrap_close(task.context_forward.fd); 459 } 460 ForwardType::Device => { 461 return; 462 } 463 } 464 ForwardTaskMap::remove(session_id, channel_id).await; 465} 466 467pub async fn stop_task(session_id: u32) { 468 ForwardTaskMap::clear(session_id).await; 469} 470 471pub async fn dump_task() -> String { 472 ForwardTaskMap::dump_task().await 473} 474 475#[derive(Debug, Default, Clone, PartialEq, Eq)] 476pub struct HdcForward { 477 session_id: u32, 478 channel_id: u32, 479 server_or_daemon: bool, 480 local_args: Vec<String>, 481 remote_args: Vec<String>, 482 task_command: String, 483 forward_type: ForwardType, 484 context_forward: ContextForward, 485 pub transfer: HdcTransferBase, 486} 

impl HdcForward {
    /// Creates an empty task for `(session_id, channel_id)`; every other field
    /// starts at its default.
    pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
        Self {
            session_id,
            channel_id,
            server_or_daemon,
            local_args: Default::default(),
            remote_args: Default::default(),
            task_command: Default::default(),
            forward_type: Default::default(),
            context_forward: Default::default(),
            transfer: HdcTransferBase::new(session_id, channel_id),
        }
    }
}

/// Validates a `type:value[:…]` node description and appends its
/// colon-separated parts to `arg`.
///
/// Returns `false` (leaving `arg` untouched) when `value` has no `:` or when
/// a `tcp` node carries a port that is too long, not a number, zero, or above
/// `config::MAX_PORT_NUM`.
pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
    crate::info!("check cmd args is : {:#?}", value);
    if !value.contains(':') {
        return false;
    }
    // `value` contains ':' so the split yields at least two parts.
    let array = value.split(':').collect::<Vec<&str>>();
    if array[0] == "tcp" {
        if array[1].len() > config::MAX_PORT_LEN {
            crate::error!(
                "forward port = {:#?} it'slength is wrong, can not more than five",
                array[1]
            );
            return false;
        }

        match array[1].parse::<u32>() {
            Ok(port) if port == 0 || port > config::MAX_PORT_NUM => {
                crate::error!("port can not greater than: 65535");
                return false;
            }
            Ok(_) => {}
            Err(_) => {
                crate::error!("port must is int type, port is: {:#?}", array[1]);
                return false;
            }
        }
    }
    arg.extend(array.iter().map(|item| item.to_string()));
    true
}

/// Host-side handler for a successful forward: records it in
/// `HdcForwardInfoMap` and ends the channel's entry in `transfer::TcpMap`.
///
/// The direction is taken from the first payload byte (`'1'` means `true`);
/// an empty payload is treated as direction `false` instead of panicking.
#[cfg(feature = "host")]
pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
    crate::info!("on_forward_success");
    let channel_id = task_message.channel_id;
    let payload = task_message.payload;
    let forward_direction = payload.first() == Some(&b'1');
    let connect_key = "unknow key".to_string();
    match String::from_utf8(payload) {
        Ok(task_string) => {
            let info = HdcForwardInfo::new(
                session_id,
                channel_id,
                forward_direction,
                task_string,
                connect_key,
            );
            HdcForwardInfoMap::put(info).await;
        }
        Err(err) => {
            crate::error!("payload to String failed. {err}");
        }
    }
    transfer::TcpMap::end(channel_id).await;
    Ok(())
}

/// Reports the outcome of a forward request to the client.
///
/// A non-empty `_payload` counts as success: the client is told OK and a
/// `ForwardSuccess` message carrying `"<1|0>|<task_command>\0"` is handled
/// locally (host) or queued for the session (daemon). An empty payload counts
/// as failure: the client is told so, the context is freed and `false` is
/// returned.
pub async fn check_command(
    ctx: &mut ContextForward,
    _payload: &[u8],
    server_or_daemon: bool,
) -> bool {
    let channel_id = ctx.channel_id;
    if _payload.is_empty() {
        hdctransfer::echo_client(
            ctx.session_id,
            channel_id,
            "Forwardport result: Failed",
            MessageLevel::Fail,
        )
        .await;
        free_context(ctx.id, false).await;
        return false;
    }

    hdctransfer::echo_client(
        ctx.session_id,
        channel_id,
        "Forwardport result:OK",
        MessageLevel::Ok,
    )
    .await;
    let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;

    // The payload is the task string followed by a single NUL byte.
    let mut command_string = map_info.into_bytes();
    command_string.push(0);
    let forward_success_message = TaskMessage {
        channel_id,
        command: HdcCommand::ForwardSuccess,
        payload: command_string,
    };
    #[cfg(feature = "host")]
    {
        let _ = on_forward_success(forward_success_message, ctx.session_id).await;
    }
    #[cfg(not(feature = "host"))]
    {
        transfer::put(ctx.session_id, forward_success_message).await;
    }
    true
}

/// Sets `ctx_point.forward_type` from the first local argument and, for
/// `localfilesystem` / `localreserved`, prefixes the second argument with the
/// matching socket directory.
///
/// Returns `false` for an unknown type or when a required argument is missing.
pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
    let Some(type_str) = ctx_point.local_args.first().cloned() else {
        crate::error!("this forward type may is not expected");
        return false;
    };
    // NOTE(review): judging by the constant names the two prefixes below look
    // swapped ("localfilesystem" gets the reserved prefix and vice versa);
    // behaviour is kept as-is — confirm against the peer implementation.
    let (forward_type, path_prefix) = match type_str.as_str() {
        "tcp" => (ForwardType::Tcp, None),
        "dev" => (ForwardType::Device, None),
        "localabstract" => (ForwardType::Abstract, None),
        "localfilesystem" => (
            ForwardType::FileSystem,
            Some(HARMONY_RESERVED_SOCKET_PREFIX),
        ),
        "jdwp" => (ForwardType::Jdwp, None),
        "ark" => (ForwardType::Ark, None),
        "localreserved" => (ForwardType::Reserved, Some(FILE_SYSTEM_SOCKET_PREFIX)),
        _ => {
            crate::error!("this forward type may is not expected");
            return false;
        }
    };
    if let Some(prefix) = path_prefix {
        let Some(path) = ctx_point.local_args.get_mut(1) else {
            crate::error!("forward type {} is missing its socket path", type_str);
            return false;
        };
        path.insert_str(0, prefix);
    }
    ctx_point.forward_type = forward_type;
    true
}

/// Binds `127.0.0.1:port` and spawns an accept loop for the channel.
///
/// Every accepted connection gets its own context (via `malloc_context`), has
/// its write half registered in `TcpWriteStreamMap`, is announced with
/// `on_accept`, and is then read by a spawned `recv_tcp_msg` task. The loop's
/// task handle is stored in `TcpListenerMap` under the channel id.
///
/// # Errors
/// Returns the bind error and records a message in `ctx.last_error`.
pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
    let saddr = format!("127.0.0.1:{}", port);
    let session_tmp = ctx.session_id;
    let channel_tmp = ctx.channel_id;
    let remote_parameters = ctx.remote_parameters.clone();
    let ctx_type = ctx.forward_type.clone();

    let listener = match TcpListener::bind(saddr.clone()).await {
        Ok(listener) => listener,
        Err(e) => {
            crate::error!("forward_ tcp_accept fail:{:#?}", e);
            ctx.last_error = format!("TCP Port listen failed at {}", port);
            return Err(e);
        }
    };
    crate::info!("forward tcp accept bind ok, addr:{:#?}", saddr);

    let join_handle = utils::spawn(async move {
        loop {
            let (stream, _addr) = match listener.accept().await {
                Ok(accepted) => accepted,
                Err(err) => {
                    crate::error!("listener accept failed, {err}");
                    continue;
                }
            };
            let (rd, wr) = stream.into_split();
            let mut client_context = malloc_context(session_tmp, channel_tmp, true).await;
            client_context.remote_parameters = remote_parameters.clone();
            client_context.forward_type = ctx_type.clone();
            ForwardContextMap::update(client_context.id, client_context.clone()).await;
            crate::info!("malloc client_context id = {:#?}", client_context.id);
            TcpWriteStreamMap::put(client_context.id, wr).await;
            on_accept(client_context.id).await;
            utils::spawn(async move {
                recv_tcp_msg(session_tmp, channel_tmp, rd, client_context.id).await;
            });
        }
    });
    TcpListenerMap::put(channel_tmp, join_handle).await;
    Ok(())
}

/// Sends `ForwardActiveSlave` for context `cid`.
///
/// The payload is 8 zero bytes, the context's `remote_parameters`, and one
/// trailing zero byte.
pub async fn on_accept(cid: u32) {
    let Some(ctx) = ForwardContextMap::get(cid).await else {
        crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
        return;
    };

    let params = ctx.remote_parameters.as_bytes();
    let mut new_buf = vec![0_u8; params.len() + 9];
    new_buf[8..8 + params.len()].copy_from_slice(params);
    send_to_task(
        ctx.session_id,
        ctx.channel_id,
        HdcCommand::ForwardActiveSlave,
        &new_buf,
        new_buf.len(),
        ctx.id,
    )
    .await;
}

/// Connects context `cid` to `127.0.0.1:port`, registers the stream's write
/// half, and spawns `recv_tcp_msg` on the read half.
///
/// On connect failure the context is freed and the remote side is notified.
pub async fn daemon_connect_tcp(cid: u32, port: u32) {
    let Some(mut ctx) = ForwardContextMap::get(cid).await else {
        crate::error!("daemon connect tcp get context is none cid={cid}");
        return;
    };

    let saddr = format!("127.0.0.1:{}", port);
    let stream = match TcpStream::connect(saddr).await {
        Ok(stream) => stream,
        Err(err) => {
            crate::error!("TcpStream::stream failed {:?}", err);
            free_context(cid, true).await;
            return;
        }
    };
    send_active_master(&mut ctx).await;
    let (rd, wr) = stream.into_split();
    TcpWriteStreamMap::put(ctx.id, wr).await;
    ForwardContextMap::update(ctx.id, ctx.clone()).await;

    let session_tmp = ctx.session_id;
    let channel_tmp = ctx.channel_id;
    update_context_to_task(session_tmp, channel_tmp, &mut ctx).await;
    utils::spawn(async move {
        recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
    });
}

/// Copies `ctx` into the task looked up by the context's own ids and stores
/// the task back under `(session_id, channel_id)`.
pub async fn update_context_to_task(session_id: u32, channel_id: u32, ctx: &mut ContextForward) {
    let Some(mut task) = ForwardTaskMap::get(ctx.session_id, ctx.channel_id).await else {
        crate::error!(
            "update context to task is none session_id={:#?},channel_id={:#?}",
            session_id,
            channel_id
        );
        return;
    };
    task.context_forward = ctx.clone();
    ForwardTaskMap::update(session_id, channel_id, task).await;
}

/// Pumps bytes from `rd` to the peer as `ForwardData` until the stream ends.
///
/// On end-of-stream or on a read error the context `cid` is freed (notifying
/// the remote side) and the function returns.
pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
    let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
    loop {
        match rd.read(&mut data).await {
            Ok(0) => {
                crate::info!("recv_size is 0, tcp temporarily closed");
                free_context(cid, true).await;
                return;
            }
            Ok(recv_size) => {
                send_to_task(
                    session_id,
                    channel_id,
                    HdcCommand::ForwardData,
                    &data[0..recv_size],
                    recv_size,
                    cid,
                )
                .await;
            }
            Err(_e) => {
                free_context(cid, true).await;
                crate::error!(
                    "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
                );
                // The context is gone; stop instead of retrying the read forever.
                return;
            }
        }
    }
}

/// Pumps bytes received on socket `fd` to the peer as `ForwardData`.
///
/// Each receive runs on the blocking pool. A negative size or a failed
/// blocking task frees context `cid` and ends the loop.
#[cfg(not(target_os = "windows"))]
pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
    loop {
        let result = ylong_runtime::spawn_blocking(move || {
            let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
            let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
            (recv_size, buffer)
        })
        .await;
        // NOTE(review): a size of 0 is forwarded as an empty message and the
        // loop continues — confirm `wrap_recv` cannot return 0 repeatedly
        // after the peer has closed.
        let (recv_size, buffer) = match result {
            Ok((recv_size, _)) if recv_size < 0 => {
                crate::error!("local abstract close shutdown fd = {fd}");
                free_context(cid, true).await;
                return;
            }
            Ok(received) => received,
            Err(err) => {
                crate::error!("read socket msg failed. {err}");
                free_context(cid, true).await;
                return;
            }
        };
        send_to_task(
            session_id,
            channel_id,
            HdcCommand::ForwardData,
            &buffer[0..recv_size as usize],
            recv_size as usize,
            cid,
        )
        .await;
    }
}

/// Releases context `cid`: optionally tells the remote side
/// (`ForwardFreeContext`), closes the context's stream/descriptors according
/// to its forward type, and removes it from `ForwardContextMap`.
pub async fn free_context(cid: u32, notify_remote: bool) {
    let Some(ctx) = ForwardContextMap::get(cid).await else {
        crate::error!("free forward context get cid is none. cid = {cid}");
        return;
    };

    if notify_remote {
        let vec_none = Vec::<u8>::new();
        send_to_task(
            ctx.session_id,
            ctx.channel_id,
            HdcCommand::ForwardFreeContext,
            &vec_none,
            0,
            ctx.id,
        )
        .await;
    }
    crate::error!("begin to free forward context cid. cid = {cid}");
    match ctx.forward_type {
        ForwardType::Tcp => {
            TcpWriteStreamMap::end(ctx.id).await;
        }
        ForwardType::Jdwp | ForwardType::Ark => {
            TcpWriteStreamMap::end(ctx.id).await;
            // SAFETY: `close` only takes an integer descriptor; an invalid one
            // yields an error return value, which is logged below.
            let ret = unsafe { libc::close(ctx.fd) };
            crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
            // SAFETY: as above.
            let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
            crate::debug!(
                "close context_forward target fd, ret={}, id={}",
                target_fd_ret,
                ctx.id,
            );
        }
        ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
            crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
            #[cfg(not(target_os = "windows"))]
            UdsServer::wrap_close(ctx.fd);
        }
        ForwardType::Device => {}
    }
    ForwardContextMap::remove(cid).await;
}

/// Sets up the TCP endpoint of a forward.
///
/// A master context listens on the port given in `local_args[1]`; any other
/// context is stored and a task is spawned to connect to that port. Returns
/// `false` when the port is missing/invalid or when listening fails.
pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
    let Some(Ok(port)) = ctx.local_args.get(1).map(|p| p.parse::<u32>()) else {
        crate::error!("setup tcp point parse error");
        return false;
    };
    let cid = ctx.id;
    if ctx.is_master {
        return forward_tcp_accept(ctx, port).await.is_ok();
    }
    ForwardContextMap::update(cid, ctx.clone()).await;
    utils::spawn(async move { daemon_connect_tcp(cid, port).await });
    true
}

/// Creates a Unix socket, binds it to the abstract address `path`, listens,
/// and spawns an accept loop that calls `on_accept` per connection.
///
/// Returns `false` when bind or listen fails (the client is told why).
#[cfg(not(target_os = "windows"))]
async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
    let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
    ctx.fd = fd;
    let addr = UdsAddr::parse_abstract(path.as_bytes());
    ForwardContextMap::update(ctx.id, ctx.clone()).await;
    let cid = ctx.id;
    // NOTE(review): an unparsable address is reported as success without
    // starting a listener (unchanged behaviour) — confirm this is intended.
    let Ok(addr_obj) = &addr else {
        return true;
    };

    if UdsServer::wrap_bind(fd, addr_obj).is_err() {
        hdctransfer::echo_client(
            ctx.session_id,
            ctx.channel_id,
            "Unix pipe bind failed",
            MessageLevel::Fail,
        )
        .await;
        crate::error!("bind fail");
        return false;
    }
    if UdsServer::wrap_listen(fd) < 0 {
        hdctransfer::echo_client(
            ctx.session_id,
            ctx.channel_id,
            "Unix pipe listen failed",
            MessageLevel::Fail,
        )
        .await;
        crate::error!("listen fail");
        return false;
    }
    utils::spawn(async move {
        loop {
            let client_fd = UdsServer::wrap_accept(fd);
            if client_fd == -1 {
                break;
            }
            on_accept(cid).await;
        }
    });
    true
}

/// Resolves `path` to a canonical absolute path as a UTF-8 string.
///
/// # Errors
/// Returns an `ErrorKind::Other` error when the path cannot be resolved or is
/// not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    let failed = || Error::new(ErrorKind::Other, "forward canonicalize failed");
    let abs_path = fs::canonicalize(path).map_err(|_| failed())?;
    abs_path.to_str().map(str::to_string).ok_or_else(failed)
}

/// Device forwarding is not available on Windows.
#[cfg(target_os = "windows")]
pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
    false
}

/// Resolves the device path in `local_args[1]`, announces the master side,
/// and spawns a task that reads the device and forwards the bytes as
/// `ForwardData`. A zero-length read frees the context and stops the task.
#[cfg(not(target_os = "windows"))]
pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
    let Some(s_node_cfg) = ctx.local_args.get(1).cloned() else {
        crate::error!("Open unix-dev failed");
        return false;
    };
    let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
        crate::error!("Open unix-dev failed");
        return false;
    };
    ctx.dev_path = resolv_path.clone();
    crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
    if !send_active_master(ctx).await {
        crate::error!("send active_master return failed ctx={:?}", ctx);
        return false;
    }
    let session = ctx.session_id;
    let channel = ctx.channel_id;
    let cid = ctx.id;

    ForwardContextMap::update(ctx.id, ctx.clone()).await;
    utils::spawn(async move {
        loop {
            let Ok(mut file) = File::open(&resolv_path) else {
                crate::error!("open {} failed.", resolv_path);
                break;
            };
            let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
                [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
            // The first 4 bytes of the buffer are left unused, so at most
            // SIZE - 4 bytes are read per message.
            let Ok(read_len) = file.read(&mut buf[4..]) else {
                crate::error!("read {} failed.", resolv_path);
                break;
            };
            if read_len == 0 {
                free_context(cid, true).await;
                break;
            }
            // Forward exactly the bytes that were read: they start at offset 4.
            send_to_task(
                session,
                channel,
                HdcCommand::ForwardData,
                &buf[4..4 + read_len],
                read_len,
                cid,
            )
            .await;
        }
    });
    true
}

/// Extracts the target pid from a jdwp/ark parameter.
///
/// For jdwp the whole parameter is the pid; otherwise the pid is the part
/// before the first `@`. Returns 0 when it cannot be parsed.
#[cfg(not(feature = "host"))]
fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
    if forward_type == ForwardType::Jdwp {
        parameter.parse::<u32>().unwrap_or_else(|e| {
            crate::error!("Jdwp get pid err :{:?}", e);
            0_u32
        })
    } else {
        let pid_part = parameter.split('@').next().unwrap_or_default();
        pid_part.parse::<u32>().unwrap_or_else(|e| {
            crate::error!("get pid err :{:?}", e);
            0_u32
        })
    }
}

/// jdwp/ark endpoints are only set up on the daemon side.
#[cfg(feature = "host")]
pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
    crate::info!("host not setup_jdwp _point");
    false
}

/// Sets up a jdwp/ark endpoint: creates a socket pair, spawns a reader that
/// forwards bytes from the local end as `ForwardData`, hands the other end to
/// the target process through `Jdwp`, and finally sends `ForwardActiveMaster`.
///
/// Returns `false` when the arguments are missing, the pid is invalid, the
/// socket pair cannot be created, or the target process is not found.
#[cfg(not(feature = "host"))]
pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
    let [kind, parameter, ..] = ctx.local_args.as_slice() else {
        crate::error!("setup jdwp point get pid is 0");
        return false;
    };
    let (kind, parameter) = (kind.clone(), parameter.clone());
    let pid = get_pid(&parameter, ctx.forward_type.clone());
    let cid = ctx.id;
    let session = ctx.session_id;
    let channel = ctx.channel_id;
    if pid == 0 {
        crate::error!("setup jdwp point get pid is 0");
        return false;
    }

    let (local_fd, target_fd) = match UdsServer::wrap_socketpair(SOCK_STREAM) {
        Ok(pair) => pair,
        Err(_) => {
            crate::error!("wrap socketpair failed");
            return false;
        }
    };
    crate::info!("pipe, fd0:{}, fd1:{}", local_fd, target_fd);
    ctx.fd = local_fd;
    ctx.target_fd = target_fd;

    // NOTE(review): the reader below is started before the fd hand-off is
    // known to succeed; on hand-off failure it keeps running and the fds stay
    // open (unchanged behaviour) — confirm cleanup happens elsewhere.
    utils::spawn(async move {
        loop {
            let result = ylong_runtime::spawn_blocking(move || {
                let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
                let size = UdsServer::wrap_read(local_fd, &mut buffer);
                (size, buffer)
            })
            .await;
            let (size, buffer) = match result {
                Ok((size, _)) if size < 0 => {
                    crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
                    free_context(cid, true).await;
                    break;
                }
                Ok((0, _)) => {
                    // Nothing to forward yet; back off before polling again.
                    ylong_runtime::time::sleep(Duration::from_millis(200)).await;
                    continue;
                }
                Ok(read) => read,
                Err(err) => {
                    crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
                    free_context(cid, true).await;
                    break;
                }
            };
            send_to_task(
                session,
                channel,
                HdcCommand::ForwardData,
                &buffer[0..size as usize],
                size as usize,
                cid,
            )
            .await;
        }
    });

    let jdwp = Jdwp::get_instance();
    let param = format!("{}:{}", kind, parameter);

    let ret = jdwp
        .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
        .await;
    if !ret {
        crate::error!("not found pid:{:?}", pid);
        hdctransfer::echo_client(
            session,
            channel,
            format!("fport fail:pid not found:{}", pid).as_str(),
            MessageLevel::Fail,
        )
        .await;
        task_finish(session, channel).await;
        return false;
    }

    let vec_none = Vec::<u8>::new();
    send_to_task(
        session,
        channel,
        HdcCommand::ForwardActiveMaster, // 04
        &vec_none,
        0,
        cid,
    )
    .await;
    crate::info!("setup_jdwp_ point return true");
    true
}

/// Marks the transfer task of `(session_id, channel_id)` as finished.
async fn task_finish(session_id: u32, channel_id: u32) {
    transfer_task_finish(channel_id, session_id).await;
}

#[cfg(not(target_os = "windows"))]
pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
    let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
    let mut socket_name = vec![0_u8; name.len() + 1];
    socket_name[0] = b'\0';
    name.iter().enumerate().for_each(|(i, e)| {
        socket_name[i + 1] = *e;
    });
    let addr = UdsAddr::parse_abstract(&socket_name[1..]);
    if let Ok(addr_obj) = &addr {
        let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
        if ret.is_err() {
            hdctransfer::echo_client(
                ctx.session_id,
                ctx.channel_id,
                "localabstract connect fail",
                MessageLevel::Fail,
            )
            .await;
            free_context(ctx.id, true).await;
            return;
        }
send_active_master(ctx).await; 1160 read_data_to_forward(ctx).await; 1161 } 1162} 1163 1164#[cfg(target_os = "windows")] 1165pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool { 1166 false 1167} 1168 1169#[cfg(not(target_os = "windows"))] 1170pub async fn setup_file_point(ctx: &mut ContextForward) -> bool { 1171 let s_node_cfg = ctx.local_args[1].clone(); 1172 if ctx.is_master { 1173 if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem 1174 { 1175 let _ = fs::remove_file(s_node_cfg.clone()); 1176 } 1177 if !server_socket_bind_listen(ctx, s_node_cfg).await { 1178 crate::error!("server socket bind listen failed id={:?}", ctx.id); 1179 task_finish(ctx.session_id, ctx.channel_id).await; 1180 return false; 1181 } 1182 } else { 1183 if ctx.fd <= 0 { 1184 crate::info!("setup_file _point fd: {:?}", ctx.fd); 1185 if ctx.forward_type == ForwardType::Abstract { 1186 ctx.fd = UdsClient::wrap_socket(AF_LOCAL); 1187 unsafe { 1188 libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC); 1189 } 1190 } else { 1191 ctx.fd = UdsClient::wrap_socket(AF_UNIX); 1192 } 1193 } 1194 ForwardContextMap::update(ctx.id, ctx.clone()).await; 1195 daemon_connect_pipe(ctx).await; 1196 } 1197 true 1198} 1199 1200pub async fn setup_point(ctx: &mut ContextForward) -> bool { 1201 if !detech_forward_type(ctx).await { 1202 crate::error!("forward type is not true"); 1203 return false; 1204 } 1205 1206 if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp { 1207 ctx.last_error = String::from("Not support forward-type"); 1208 return false; 1209 } 1210 1211 let mut ret = false; 1212 match ctx.forward_type { 1213 ForwardType::Tcp => { 1214 ret = setup_tcp_point(ctx).await; 1215 } 1216 ForwardType::Device => { 1217 if !cfg!(target_os = "windows") { 1218 ret = setup_device_point(ctx).await; 1219 } 1220 } 1221 ForwardType::Jdwp | ForwardType::Ark => { 1222 crate::info!("setup point ark case"); 1223 if !cfg!(feature = "host") { 1224 ret = 
setup_jdwp_point(ctx).await; 1225 } 1226 } 1227 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => { 1228 if !cfg!(target_os = "windows") { 1229 ret = setup_file_point(ctx).await; 1230 } 1231 } 1232 }; 1233 ForwardContextMap::update(ctx.id, ctx.clone()).await; 1234 update_context_to_task(ctx.session_id, ctx.channel_id, ctx).await; 1235 ret 1236} 1237 1238pub async fn send_to_task( 1239 session_id: u32, 1240 channel_id: u32, 1241 command: HdcCommand, 1242 buf_ptr: &[u8], 1243 buf_size: usize, 1244 cid: u32, 1245) -> bool { 1246 if buf_size > (config::MAX_SIZE_IOBUF * 2) { 1247 crate::error!("send task buf_size oversize"); 1248 return false; 1249 } 1250 1251 let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat(); 1252 new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]); 1253 let file_check_message = TaskMessage { 1254 channel_id, 1255 command, 1256 payload: new_buf, 1257 }; 1258 transfer::put(session_id, file_check_message).await; 1259 true 1260} 1261 1262pub fn get_cid(_payload: &[u8]) -> u32 { 1263 let mut id_bytes = [0u8; 4]; 1264 id_bytes.copy_from_slice(&_payload[0..4]); 1265 let id: u32 = u32::from_be_bytes(id_bytes); 1266 id 1267} 1268 1269pub async fn send_active_master(ctx: &mut ContextForward) -> bool { 1270 if ctx.check_order { 1271 let flag = [0u8; 1]; 1272 send_to_task( 1273 ctx.session_id, 1274 ctx.channel_id, 1275 HdcCommand::ForwardCheckResult, 1276 &flag, 1277 1, 1278 ctx.id, 1279 ) 1280 .await; 1281 free_context(ctx.id, false).await; 1282 return true; 1283 } 1284 if !send_to_task( 1285 ctx.session_id, 1286 ctx.channel_id, 1287 HdcCommand::ForwardActiveMaster, 1288 &Vec::<u8>::new(), 1289 0, 1290 ctx.id, 1291 ) 1292 .await 1293 { 1294 free_context(ctx.id, true).await; 1295 return false; 1296 } 1297 true 1298} 1299 1300pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool { 1301 let command = context_forward.task_command.clone(); 1302 let result = Base::split_command_to_args(&command); 1303 let 
argv = result.0; 1304 let argc = result.1; 1305 1306 if argc < ARG_COUNT2 { 1307 crate::error!("argc < 2 parse is failed."); 1308 context_forward.last_error = "Too few arguments.".to_string(); 1309 return false; 1310 } 1311 if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL { 1312 crate::error!("parse's length is flase."); 1313 context_forward.last_error = "Some argument too long.".to_string(); 1314 return false; 1315 } 1316 if !check_node_info(&argv[0], &mut context_forward.local_args) { 1317 crate::error!("check argv[0] node info is flase."); 1318 context_forward.last_error = "Arguments parsing failed.".to_string(); 1319 return false; 1320 } 1321 if !check_node_info(&argv[1], &mut context_forward.remote_args) { 1322 crate::error!("check argv[1] node info is flase."); 1323 context_forward.last_error = "Arguments parsing failed.".to_string(); 1324 return false; 1325 } 1326 context_forward.remote_parameters = argv[1].clone(); 1327 true 1328} 1329 1330pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool { 1331 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else { 1332 crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}" 1333 ); 1334 return false; 1335 }; 1336 let task: &mut HdcForward = &mut task.clone(); 1337 let Ok(command) = String::from_utf8(_payload.to_vec()) else { 1338 crate::error!("cmd argv is not int utf8"); 1339 return false; 1340 }; 1341 let mut context_forward = malloc_context(session_id, channel_id, true).await; 1342 context_forward.task_command = command.clone(); 1343 1344 if !forward_parse_cmd(&mut context_forward) { 1345 task.context_forward = context_forward.clone(); 1346 ForwardTaskMap::update(session_id, channel_id, task.clone()).await; 1347 return false; 1348 } 1349 if !setup_point(&mut context_forward).await { 1350 task.context_forward = context_forward.clone(); 1351 ForwardTaskMap::update(session_id, channel_id, task.clone()).await; 
1352 return false; 1353 } 1354 1355 let wake_up_message = TaskMessage { 1356 channel_id, 1357 command: HdcCommand::KernelWakeupSlavetask, 1358 payload: Vec::<u8>::new(), 1359 }; 1360 transfer::put(context_forward.session_id, wake_up_message).await; 1361 1362 let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec(); 1363 let mut new_buf = vec![0_u8; buf_string.len() + 9]; 1364 buf_string.iter().enumerate().for_each(|(i, e)| { 1365 new_buf[i + 8] = *e; 1366 }); 1367 send_to_task( 1368 context_forward.session_id, 1369 context_forward.channel_id, 1370 HdcCommand::ForwardCheck, 1371 &new_buf, 1372 buf_string.len() + 9, 1373 context_forward.id, 1374 ) 1375 .await; 1376 true 1377} 1378 1379pub async fn slave_connect( 1380 session_id: u32, 1381 channel_id: u32, 1382 payload: &[u8], 1383 check_order: bool, 1384) -> bool { 1385 let mut context_forward = malloc_context(session_id, channel_id, false).await; 1386 context_forward.check_order = check_order; 1387 if let Ok((content, id)) = filter_command(payload) { 1388 let content = &content[8..].trim_end_matches('\0').to_string(); 1389 context_forward.task_command = content.clone(); 1390 if !check_node_info(content, &mut context_forward.local_args) { 1391 crate::error!("check local args is false"); 1392 return false; 1393 } 1394 context_forward.id = id; 1395 ForwardContextMap::update(id, context_forward.clone()).await; 1396 } 1397 1398 if !check_order { 1399 if !setup_point(&mut context_forward).await { 1400 crate::error!("setup point return false, free context"); 1401 free_context(context_forward.id, true).await; 1402 ForwardContextMap::update(context_forward.id, context_forward.clone()).await; 1403 return false; 1404 } 1405 } else { 1406 send_active_master(&mut context_forward).await; 1407 } 1408 ForwardContextMap::update(context_forward.id, context_forward.clone()).await; 1409 true 1410} 1411 1412pub async fn read_data_to_forward(ctx: &mut ContextForward) { 1413 let cid = ctx.id; 1414 let session = 
ctx.session_id; 1415 let channel = ctx.channel_id; 1416 match ctx.forward_type { 1417 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => { 1418 #[cfg(not(target_os = "windows"))] 1419 { 1420 let fd_temp = ctx.fd; 1421 utils::spawn(async move { 1422 deamon_read_socket_msg(session, channel, fd_temp, cid).await 1423 }); 1424 } 1425 } 1426 ForwardType::Device => { 1427 #[cfg(not(target_os = "windows"))] 1428 setup_device_point(ctx).await; 1429 } 1430 _ => {} 1431 } 1432} 1433 1434pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> { 1435 let bytes = &_payload[4..]; 1436 let ct: Result<String, std::string::FromUtf8Error> = String::from_utf8(bytes.to_vec()); 1437 if let Ok(content) = ct { 1438 let mut id_bytes = [0u8; 4]; 1439 id_bytes.copy_from_slice(&_payload[0..4]); 1440 let id: u32 = u32::from_be_bytes(id_bytes); 1441 return Ok((content, id)); 1442 } 1443 Err(Error::new(ErrorKind::Other, "filter command failure")) 1444} 1445 1446pub async fn dev_write_bufer(path: String, content: Vec<u8>) { 1447 utils::spawn(async move { 1448 let open_result = OpenOptions::new() 1449 .write(true) 1450 .create(true) 1451 .open(path.clone()); 1452 1453 match open_result { 1454 Ok(mut file) => { 1455 let write_result = file.write_all(content.as_slice()); 1456 match write_result { 1457 Ok(()) => {} 1458 Err(e) => { 1459 crate::error!("dev write bufer to file fail:{:#?}", e); 1460 } 1461 } 1462 } 1463 Err(e) => { 1464 crate::error!("dev write bufer fail:{:#?}", e); 1465 } 1466 } 1467 }); 1468} 1469 1470pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool { 1471 if ctx.forward_type == ForwardType::Tcp { 1472 return TcpWriteStreamMap::write(ctx.id, content).await; 1473 } else if ctx.forward_type == ForwardType::Device { 1474 let path_ref = ctx.dev_path.clone(); 1475 if path_ref.is_empty() { 1476 crate::error!( 1477 "write_forward_bufer get dev_path is failed ctx.id = {:#?}", 1478 ctx.id 1479 ); 1480 return false; 1481 } 
1482 dev_write_bufer(path_ref, content).await; 1483 } else { 1484 #[cfg(not(target_os = "windows"))] 1485 { 1486 let fd = ctx.fd; 1487 if UdsClient::wrap_send(fd, &content) < 0 { 1488 crate::info!("write forward bufer failed. fd = {fd}"); 1489 return false; 1490 } 1491 } 1492 } 1493 true 1494} 1495 1496#[allow(unused)] 1497pub async fn malloc_context( 1498 session_id: u32, 1499 channel_id: u32, 1500 master_slave: bool, 1501) -> ContextForward { 1502 let mut ctx = ContextForward { 1503 ..Default::default() 1504 }; 1505 ctx.id = utils::get_current_time() as u32; 1506 ctx.session_id = session_id; 1507 ctx.channel_id = channel_id; 1508 ctx.is_master = master_slave; 1509 crate::info!("malloc_context success id = {:#?}", ctx.id); 1510 ForwardContextMap::update(ctx.id, ctx.clone()).await; 1511 ctx 1512} 1513 1514pub async fn forward_command_dispatch( 1515 session_id: u32, 1516 channel_id: u32, 1517 command: HdcCommand, 1518 _payload: &[u8], 1519) -> bool { 1520 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else { 1521 crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}" 1522 ); 1523 return false; 1524 }; 1525 let task: &mut HdcForward = &mut task.clone(); 1526 let mut ret: bool = true; 1527 let cid = get_cid(_payload); 1528 let send_msg = _payload[4..].to_vec(); 1529 1530 let Some(context_forward) = ForwardContextMap::get(cid).await else { 1531 crate::error!("forward command dispatch get context is none cid={cid}"); 1532 return false; 1533 }; 1534 let ctx = &mut context_forward.clone(); 1535 match command { 1536 HdcCommand::ForwardCheckResult => { 1537 ret = check_command(ctx, _payload, task.server_or_daemon).await; 1538 } 1539 HdcCommand::ForwardData => { 1540 ret = write_forward_bufer(ctx, send_msg).await; 1541 } 1542 HdcCommand::ForwardFreeContext => { 1543 free_context(ctx.id, false).await; 1544 } 1545 HdcCommand::ForwardActiveMaster => { 1546 read_data_to_forward(ctx).await; 1547 ret = true; 
1548 } 1549 _ => { 1550 ret = false; 1551 } 1552 } 1553 ret 1554} 1555 1556async fn print_error_info(session_id: u32, channel_id: u32) { 1557 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else { 1558 crate::error!( 1559 "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}", 1560 session_id, 1561 channel_id 1562 ); 1563 return; 1564 }; 1565 let task = &mut task.clone(); 1566 let ctx = &task.context_forward; 1567 let mut echo_content = ctx.last_error.clone(); 1568 1569 if echo_content.is_empty() { 1570 echo_content = "Forward parament failed".to_string(); 1571 } 1572 1573 hdctransfer::echo_client( 1574 session_id, 1575 channel_id, 1576 echo_content.as_str(), 1577 MessageLevel::Fail, 1578 ) 1579 .await; 1580} 1581 1582pub async fn command_dispatch( 1583 session_id: u32, 1584 channel_id: u32, 1585 command: HdcCommand, 1586 payload: &[u8], 1587 _payload_size: u16, 1588) -> bool { 1589 if command != HdcCommand::ForwardData { 1590 crate::info!("command_dispatch command recv: {:?}", command); 1591 } 1592 1593 let ret = match command { 1594 HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await, 1595 HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await, 1596 HdcCommand::ForwardActiveSlave => { 1597 slave_connect(session_id, channel_id, payload, false).await 1598 } 1599 _ => forward_command_dispatch(session_id, channel_id, command, payload).await, 1600 }; 1601 if !ret { 1602 print_error_info(session_id, channel_id).await; 1603 task_finish(session_id, channel_id).await; 1604 return false; 1605 } 1606 ret 1607} 1608