1/* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15//! hdcfile 16#![allow(missing_docs)] 17 18use crate::transfer; 19use std::fs::metadata; 20 21use std::collections::HashMap; 22use std::io; 23use std::path::Path; 24use std::sync::Arc; 25use std::io::{Error, ErrorKind}; 26#[cfg(feature = "host")] 27extern crate ylong_runtime_static as ylong_runtime; 28use ylong_runtime::sync::Mutex; 29 30use crate::common::filemanager::FileManager; 31use crate::common::hdctransfer::*; 32use crate::config::CompressType; 33use crate::config::HdcCommand; 34use crate::config::MessageLevel; 35use crate::config::TaskMessage; 36use crate::config::MAX_SIZE_IOBUF; 37use crate::serializer::serialize::Serialization; 38 39use super::base::Base; 40use super::hdctransfer; 41use crate::serializer::native_struct::TransferConfig; 42use crate::utils; 43#[cfg(not(feature = "host"))] 44use crate::utils::hdc_log::*; 45#[derive(Debug, Default, Clone, PartialEq, Eq)] 46pub struct HdcFile { 47 pub file_cnt: u32, 48 pub dir_size: u64, 49 pub file_size: u64, 50 pub file_begin_time: u64, 51 pub dir_begin_time: u64, 52 pub transfer: HdcTransferBase, 53} 54 55impl HdcFile { 56 pub fn new(_session_id: u32, _channel_id: u32) -> Self { 57 Self { 58 transfer: HdcTransferBase::new(_session_id, _channel_id), 59 ..Default::default() 60 } 61 } 62} 63type HdcFile_ = Arc<Mutex<HdcFile>>; 64type FileTaskMap_ = Arc<Mutex<HashMap<(u32, u32), HdcFile_>>>; 65pub struct FileTaskMap {} 66impl FileTaskMap { 67 fn get_instance() -> FileTaskMap_ { 68 static mut MAP: Option<FileTaskMap_> = None; 69 unsafe { 70 MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new()))) 71 .clone() 72 } 73 } 74 75 pub async fn put(session_id: u32, channel_id: u32, value: HdcFile) { 76 let map = Self::get_instance(); 77 let mut map = map.lock().await; 78 map.insert((session_id, channel_id), Arc::new(Mutex::new(value))); 79 } 80 81 pub async fn exsit(session_id: u32, channel_id: u32) -> bool { 82 let arc = Self::get_instance(); 83 let map = arc.lock().await; 84 let task = map.get(&(session_id, channel_id)); 85 task.is_some() 86 } 87 88 pub async fn remove(session_id: u32, channel_id: u32) -> Option<HdcFile_> { 89 let arc = Self::get_instance(); 90 let mut map = arc.lock().await; 91 map.remove(&(session_id, channel_id)) 92 } 93 94 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcFile_> { 95 let arc = Self::get_instance(); 96 let map = arc.lock().await; 97 let task = map.get(&(session_id, channel_id)); 98 task.cloned() 99 } 100 101 async fn stop_task(session_id: u32) { 102 let arc = Self::get_instance(); 103 let map = arc.lock().await; 104 crate::info!("hdcfile stop task, session_id:{}, task_size: {}", session_id, map.len()); 105 for _iter in map.iter() { 106 if _iter.0 .0 != session_id { 107 continue; 108 } 109 let mut task = _iter.1.lock().await; 110 task.transfer.stop_run = true; 111 crate::info!( 112 "session_id:{}, channel_id:{}, set stop_run as true.", 113 session_id, 114 _iter.0 .1 115 ); 116 } 117 } 118 119 async fn dump_task() -> String { 120 let arc = Self::get_instance(); 121 let map = arc.lock().await; 122 let mut result = String::new(); 123 for _iter in map.iter() { 124 let task = _iter.1.lock().await; 125 let command = task.transfer.command_str.clone(); 126 let line = format!( 127 "session_id:{},\tchannel_id:{},\tcommand:{}\n", 128 _iter.0 .0, _iter.0 .1, command 129 ); 130 result.push_str(line.as_str()); 131 } 132 result 133 } 134} 135 136async fn check_local_path(session_id: u32, channel_id: u32) -> bool { 137 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 138 crate::error!( 139 "check_local_path get task is none session_id={session_id},channel_id={channel_id}" 140 ); 141 return false; 142 }; 143 let mut file_task = task.lock().await; 144 let local_path = file_task.transfer.local_path.clone(); 145 let mut file_manager = FileManager::new(local_path); 146 let (open_result, err_msg) = file_manager.open(); 147 if open_result { 148 file_task.transfer.transfer_config.file_size = file_manager.file_size(); 149 file_task.transfer.file_size = file_task.transfer.transfer_config.file_size; 150 file_task.file_size = file_task.transfer.transfer_config.file_size; 151 file_task.transfer.transfer_config.optional_name = file_task.transfer.local_name.clone(); 152 if transfer::base::CheckCompressVersion::get().await 153 && (file_task.transfer.transfer_config.file_size > (MAX_SIZE_IOBUF as u64)) 154 { 155 file_task.transfer.transfer_config.compress_type = CompressType::Lz4 as u8; 156 } 157 file_task.transfer.transfer_config.path = file_task.transfer.remote_path.clone(); 158 let command_str = format!( 159 "[file send], local_path:{}, optional_name:{}", 160 file_task.transfer.local_path.clone(), 161 file_task.transfer.transfer_config.optional_name 162 ); 163 file_task 164 .transfer 165 .command_str 166 .push_str(command_str.as_str()); 167 return true; 168 } else { 169 hdctransfer::echo_client( 170 session_id, 171 channel_id, 172 err_msg.as_str(), 173 MessageLevel::Fail, 174 ) 175 .await; 176 } 177 false 178} 179 180async fn echo_finish(session_id: u32, channel_id: u32, msg: String) { 181 hdctransfer::echo_client( 182 session_id, 183 channel_id, 184 msg.as_str(), 185 MessageLevel::Ok, 186 ) 187 .await; 188 task_finish(session_id, channel_id).await; 189} 190 191pub async fn begin_transfer(session_id: u32, channel_id: u32, command: &String) -> bool { 192 let (argv, argc) = Base::split_command_to_args(command); 193 if argc < 2 { 194 echo_finish( 195 session_id, 196 channel_id, 197 "Transfer path split failed.".to_string(), 198 ) 199 .await; 200 return false; 201 } 202 match set_master_parameters(session_id, channel_id, command, argc, argv).await { 203 Ok(_) => (), 204 Err(e) => { 205 echo_fail(session_id, channel_id, e, false).await; 206 return false; 207 } 208 } 209 210 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 211 crate::error!( 212 "begin_transfer get task is none session_id={session_id},channel_id={channel_id}" 213 ); 214 return false; 215 }; 216 let mut task = task.lock().await; 217 task.transfer.is_master = true; 218 drop(task); 219 220 let ret = check_local_path(session_id, channel_id).await; 221 if !ret { 222 do_file_finish(session_id, channel_id, &[1]).await; 223 return true; 224 } 225 226 put_file_check(session_id, channel_id).await; 227 true 228} 229 230async fn set_master_parameters( 231 session_id: u32, 232 channel_id: u32, 233 _command: &str, 234 argc: u32, 235 argv: Vec<String>, 236) -> Result<bool, Error> { 237 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 238 crate::error!( 239 "set_master_parameters get task is none session_id={session_id},channel_id={channel_id}" 240 ); 241 return Err(Error::new(ErrorKind::Other, "Other failed")); 242 }; 243 let mut task = task.lock().await; 244 let mut i: usize = 0; 245 let mut src_argv_index = 0u32; 246 if task.transfer.server_or_daemon { 247 src_argv_index += 2; // 2: represent the host parameters: "file" "send". 248 } // else: src_argv_index += 0: the host parameters "file" "recv" will be filtered. 249 while i < argc as usize { 250 match &argv[i] as &str { 251 "-z" => { 252 task.transfer.transfer_config.compress_type = CompressType::Lz4 as u8; 253 src_argv_index += 1; 254 } 255 "-a" => { 256 task.transfer.transfer_config.hold_timestamp = true; 257 src_argv_index += 1; 258 } 259 "-sync" => { 260 task.transfer.transfer_config.update_if_new = true; 261 src_argv_index += 1; 262 } 263 "-m" => { 264 src_argv_index += 1; 265 } 266 "-remote" => { 267 src_argv_index += 1; 268 } 269 "-cwd" => { 270 src_argv_index += 2; 271 task.transfer.transfer_config.client_cwd = argv.get(i + 1).unwrap().clone(); 272 } 273 _ => {} 274 } 275 i += 1; 276 } 277 if argc == src_argv_index { 278 crate::error!("set_master_parameters argc = {:#?} return false", argc); 279 return Err(Error::new(ErrorKind::Other, "There is no local and remote path")); 280 } 281 task.transfer.remote_path = argv.last().unwrap().clone(); 282 task.transfer.local_path = argv.get(argv.len() - 2).unwrap().clone(); 283 if task.transfer.server_or_daemon { 284 if src_argv_index + 1 == argc { 285 crate::error!("src_argv_index = {:#?} return false", src_argv_index); 286 return Err(Error::new(ErrorKind::Other, "There is no remote path")); 287 } 288 let cwd = task.transfer.transfer_config.client_cwd.clone(); 289 task.transfer.local_path = Base::extract_relative_path(&cwd, &task.transfer.local_path); 290 } else if src_argv_index + 1 == argc { 291 task.transfer.remote_path = String::from("."); 292 task.transfer.local_path = argv.get((argc - 1) as usize).unwrap().clone(); 293 } 294 task.transfer.local_name = Base::get_file_name(&mut task.transfer.local_path).unwrap(); 295 match metadata(task.transfer.local_path.clone()) { 296 Ok(metadata) => { 297 if !metadata.is_dir() { 298 task.transfer.is_dir = false; 299 return Ok(true); 300 } 301 task.transfer.is_dir = true; 302 task.transfer.task_queue = get_sub_files_resurively(&task.transfer.local_path.clone()); 303 task.transfer.base_local_path = get_base_path(task.transfer.local_path.clone()); 304 305 if !task.transfer.task_queue.is_empty() { 306 task.transfer.local_path = task.transfer.task_queue.pop().unwrap(); 307 task.transfer.local_name = 308 match Base::get_relative_path(&task.transfer.base_local_path, &task.transfer.local_path) { 309 Some(relative_path) => relative_path, 310 None => task.transfer.local_path.clone() 311 }; 312 } else { 313 crate::error!("task transfer task_queue is empty"); 314 return Err(Error::new(ErrorKind::Other, "Operation failed, because the source folder is empty.")); 315 } 316 }, 317 Err(error) => { 318 let err_msg = format!("Error opening file: {}, path: {}", error, task.transfer.local_path); 319 crate::error!("{}", err_msg); 320 return Err(Error::new(ErrorKind::Other, err_msg)); 321 }, 322 } 323 Ok(true) 324} 325 326fn get_base_path(path: String) -> String { 327 let p = Path::new(path.as_str()); 328 let parent_path = p.parent(); 329 if let Some(pp) = parent_path { 330 pp.display().to_string() 331 } else { 332 path 333 } 334} 335 336async fn put_file_check(session_id: u32, channel_id: u32) { 337 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 338 return; 339 }; 340 let task = task.lock().await; 341 let file_check_message = TaskMessage { 342 channel_id, 343 command: HdcCommand::FileCheck, 344 payload: task.transfer.transfer_config.serialize(), 345 }; 346 transfer::put(task.transfer.session_id, file_check_message).await; 347} 348 349pub async fn check_slaver(session_id: u32, channel_id: u32, _payload: &[u8]) -> Result<bool, Error> { 350 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 351 crate::error!( 352 "check_slaver get task is none session_id={session_id:?},channel_id={channel_id:?}" 353 ); 354 return Err(Error::new(ErrorKind::Other, "Other failed")); 355 }; 356 let mut task = task.lock().await; 357 let mut transconfig = TransferConfig { 358 ..Default::default() 359 }; 360 let _ = transconfig.parse(_payload.to_owned()); 361 task.transfer.file_size = transconfig.file_size; 362 task.file_size = transconfig.file_size; 363 task.transfer.local_path = transconfig.path; 364 task.transfer.is_master = false; 365 task.transfer.index = 0; 366 let command_str = format!( 367 "[file recv],\t local_path:{},\t optional_name:{}\t", 368 task.transfer.local_path.clone(), 369 transconfig.optional_name 370 ); 371 task.transfer.command_str.push_str(command_str.as_str()); 372 let local_path = task.transfer.local_path.clone(); 373 let optional_name = transconfig.optional_name.clone(); 374 task.transfer.transfer_config.compress_type = transconfig.compress_type; 375 match hdctransfer::check_local_path(&mut task.transfer, &local_path, &optional_name) { 376 Ok(_) => (), 377 Err(e) => { 378 crate::error!("check_local_path return false channel_id={:#?}", channel_id); 379 return Err(e); 380 }, 381 } 382 if task.transfer.transfer_config.update_if_new { 383 crate::error!("task.transfer.transfer_config.update_if_new is true"); 384 return Err(Error::new(ErrorKind::Other, "Other failed")); 385 } 386 if task.dir_begin_time == 0 { 387 task.dir_begin_time = utils::get_current_time(); 388 } 389 task.file_begin_time = utils::get_current_time(); 390 Ok(true) 391} 392 393pub async fn wake_up_slaver(session_id: u32, channel_id: u32) { 394 let wake_up_message = TaskMessage { 395 channel_id, 396 command: HdcCommand::KernelWakeupSlavetask, 397 payload: Vec::<u8>::new(), 398 }; 399 transfer::put(session_id, wake_up_message).await; 400} 401 402async fn put_file_begin(session_id: u32, channel_id: u32) { 403 let file_begin_message = TaskMessage { 404 channel_id, 405 command: HdcCommand::FileBegin, 406 payload: Vec::<u8>::new(), 407 }; 408 transfer::put(session_id, file_begin_message).await; 409} 410 411async fn transfer_next(session_id: u32, channel_id: u32) -> bool { 412 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 413 crate::error!( 414 "transfer_next get task is none session_id={session_id:?},channel_id={channel_id:?}" 415 ); 416 return false; 417 }; 418 let mut task = task.lock().await; 419 let Some(local_path) = task.transfer.task_queue.pop() else { 420 crate::error!( 421 "transfer_next get local path is none session_id={session_id:?},channel_id={channel_id:?}" 422 ); 423 return false; 424 }; 425 task.transfer.local_path = local_path; 426 task.transfer.local_name = 427 match Base::get_relative_path(&task.transfer.base_local_path, &task.transfer.local_path) { 428 Some(relative_path) => relative_path, 429 None => task.transfer.local_path.clone() 430 }; 431 drop(task); 432 check_local_path(session_id, channel_id).await 433} 434 435async fn on_all_transfer_finish(session_id: u32, channel_id: u32) { 436 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 437 crate::error!( 438 "on_all_transfer_finish get task is none session_id={session_id:?},channel_id={channel_id:?}" 439 ); 440 return; 441 }; 442 let task = task.lock().await; 443 let last_error = task.transfer.last_error; 444 let size = if task.file_cnt > 1 { 445 task.dir_size 446 } else { 447 task.file_size 448 }; 449 let time = if task.file_cnt > 1 { 450 utils::get_current_time() - task.dir_begin_time 451 } else { 452 utils::get_current_time() - task.file_begin_time 453 }; 454 let rate = size as f64 / time as f64; 455 #[allow(unused_variables)] 456 let message = if last_error == 0 { 457 format!( 458 "FileTransfer finish, Size:{}, File count = {}, time:{}ms rate:{:.2}kB/s", 459 size, task.file_cnt, time, rate 460 ) 461 } else { 462 format!( 463 "Transfer failed: {}: {}", 464 task.transfer.local_path, 465 io::Error::from_raw_os_error(last_error as i32), 466 ) 467 }; 468 #[cfg(feature = "host")] 469 { 470 let level = if last_error == 0 { 471 transfer::EchoLevel::OK 472 } else { 473 transfer::EchoLevel::FAIL 474 }; 475 let _ = 476 transfer::send_channel_msg(task.transfer.channel_id, level, message) 477 .await; 478 hdctransfer::close_channel(channel_id).await; 479 return; 480 } 481 #[allow(unreachable_code)] 482 { 483 let level = if last_error == 0 { 484 MessageLevel::Ok 485 } else { 486 MessageLevel::Fail 487 }; 488 hdctransfer::echo_client( 489 task.transfer.session_id, 490 task.transfer.channel_id, 491 message.as_str(), 492 level, 493 ) 494 .await; 495 hdctransfer::close_channel(channel_id).await; 496 } 497} 498 499async fn is_task_queue_empty(session_id: u32, channel_id: u32) -> bool { 500 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 501 crate::error!( 502 "do_file_finish get task is none session_id={session_id:?},channel_id={channel_id:?}" 503 ); 504 return false; 505 }; 506 let task = task.lock().await; 507 task.transfer.task_queue.is_empty() 508} 509 510async fn do_file_finish(session_id: u32, channel_id: u32, _payload: &[u8]) { 511 if _payload[0] == 1 { 512 while !is_task_queue_empty(session_id, channel_id).await { 513 if transfer_next(session_id, channel_id).await { 514 put_file_check(session_id, channel_id).await; 515 return; 516 } 517 } 518 519 if is_task_queue_empty(session_id, channel_id).await { 520 let _finish_message = TaskMessage { 521 channel_id, 522 command: HdcCommand::FileFinish, 523 payload: [0].to_vec(), 524 }; 525 526 transfer::put(session_id, _finish_message).await; 527 } 528 } else { 529 on_all_transfer_finish(session_id, channel_id).await; 530 task_finish(session_id, channel_id).await; 531 } 532} 533 534async fn put_file_finish(session_id: u32, channel_id: u32) { 535 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 536 crate::error!( 537 "put_file_finish get task is none session_id={session_id:?},channel_id={channel_id:?}" 538 ); 539 return; 540 }; 541 let mut task = task.lock().await; 542 let _payload: [u8; 1] = [1]; 543 task.file_cnt += 1; 544 task.dir_size += task.file_size; 545 let task_finish_message = TaskMessage { 546 channel_id, 547 command: HdcCommand::FileFinish, 548 payload: _payload.to_vec(), 549 }; 550 transfer::put(session_id, task_finish_message).await; 551} 552 553pub async fn command_dispatch( 554 session_id: u32, 555 channel_id: u32, 556 _command: HdcCommand, 557 _payload: &[u8], 558 _payload_size: u16, 559) -> bool { 560 match _command { 561 HdcCommand::FileInit => { 562 let s = String::from_utf8(_payload.to_vec()); 563 match s { 564 Ok(str) => { 565 wake_up_slaver(session_id, channel_id).await; 566 begin_transfer(session_id, channel_id, &str).await; 567 } 568 Err(e) => { 569 let err_msg = format!("Transfer failed: arguments is invalid {:?}", e); 570 crate::error!("HdcCommand::FileInit: {}", err_msg); 571 echo_finish(session_id, channel_id, err_msg.to_string()).await; 572 } 573 } 574 } 575 HdcCommand::FileCheck => { 576 match check_slaver(session_id, channel_id, _payload).await { 577 Ok(_) => { 578 put_file_begin(session_id, channel_id).await; 579 }, 580 Err(e) => { 581 echo_fail(session_id, channel_id, e, true).await; 582 } 583 } 584 } 585 HdcCommand::FileBegin => { 586 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 587 crate::error!( 588 "command_dispatch get task is none session_id={session_id:?},channel_id={channel_id:?}" 589 ); 590 return false; 591 }; 592 let task = task.lock().await; 593 hdctransfer::transfer_begin(&task.transfer, HdcCommand::FileData).await; 594 } 595 HdcCommand::FileData => { 596 let Some(task) = FileTaskMap::get(session_id, channel_id).await else { 597 crate::error!( 598 "command_dispatch get task is none session_id={session_id:?},channel_id={channel_id:?}" 599 ); 600 return false; 601 }; 602 let mut task = task.lock().await; 603 if hdctransfer::transfer_data(&mut task.transfer, _payload) { 604 drop(task); 605 put_file_finish(session_id, channel_id).await; 606 } 607 } 608 HdcCommand::FileMode | HdcCommand::DirMode => { 609 put_file_mode(session_id, channel_id).await; 610 } 611 HdcCommand::FileFinish => { 612 do_file_finish(session_id, channel_id, _payload).await; 613 } 614 _ => { 615 crate::error!("others, command {:?}", _command); 616 } 617 } 618 619 true 620} 621 622async fn put_file_mode(session_id: u32, channel_id: u32) { 623 let task_message = TaskMessage { 624 channel_id, 625 command: HdcCommand::FileMode, 626 payload: Vec::<u8>::new(), 627 }; 628 transfer::put(session_id, task_message).await; 629} 630 631async fn task_finish(session_id: u32, channel_id: u32) { 632 hdctransfer::transfer_task_finish(channel_id, session_id).await; 633} 634 635pub async fn stop_task(session_id: u32) { 636 FileTaskMap::stop_task(session_id).await; 637} 638 639pub async fn dump_task() -> String { 640 FileTaskMap::dump_task().await 641} 642 643pub async fn echo_fail(session_id: u32, channel_id: u32, error: Error, is_checked: bool) { 644 let message = match FileTaskMap::get(session_id, channel_id).await { 645 Some(task) => { 646 if is_checked { 647 let task = task.lock().await; 648 format!("Error opening file: {}, path: {}", error, task.transfer.local_path) 649 } else { 650 format!("{}", error) 651 } 652 } 653 None => format!( 654 "Error opening file: {}, path: {}", 655 error, 656 "cannot get file path from FileTaskMap", 657 ) 658 }; 659 hdctransfer::echo_client( 660 session_id, 661 channel_id, 662 message.as_str(), 663 MessageLevel::Fail, 664 ) 665 .await; 666 task_finish(session_id, channel_id).await; 667}