1cc290419Sopenharmony_ci/* 2cc290419Sopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd. 3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License. 5cc290419Sopenharmony_ci * You may obtain a copy of the License at 6cc290419Sopenharmony_ci * 7cc290419Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8cc290419Sopenharmony_ci * 9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and 13cc290419Sopenharmony_ci * limitations under the License. 14cc290419Sopenharmony_ci */ 15cc290419Sopenharmony_ci//! hdctransfer 16cc290419Sopenharmony_ci#![allow(missing_docs)] 17cc290419Sopenharmony_ciuse std::collections::VecDeque; 18cc290419Sopenharmony_ciuse std::fs::{self, File, OpenOptions, metadata}; 19cc290419Sopenharmony_ciuse std::io::{Read, Seek, Write, Error}; 20cc290419Sopenharmony_ciuse std::path::PathBuf; 21cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))] 22cc290419Sopenharmony_ciuse std::path::Path; 23cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))] 24cc290419Sopenharmony_ciuse std::os::unix::fs::PermissionsExt; 25cc290419Sopenharmony_ciuse std::sync::Arc; 26cc290419Sopenharmony_ci 27cc290419Sopenharmony_ciuse crate::common::base::Base; 28cc290419Sopenharmony_ciuse crate::common::hdcfile::FileTaskMap; 29cc290419Sopenharmony_ciuse crate::config::HdcCommand; 30cc290419Sopenharmony_ciuse crate::config::TaskMessage; 31cc290419Sopenharmony_ciuse crate::{config::*, utils}; 32cc290419Sopenharmony_ciuse crate::serializer::native_struct::TransferConfig; 33cc290419Sopenharmony_ciuse crate::serializer::native_struct::TransferPayload; 34cc290419Sopenharmony_ciuse crate::serializer::serialize::Serialization; 35cc290419Sopenharmony_ciuse crate::transfer; 36cc290419Sopenharmony_ci#[cfg(not(feature = "host"))] 37cc290419Sopenharmony_ciuse crate::utils::hdc_log::*; 38cc290419Sopenharmony_ci#[cfg(feature = "host")] 39cc290419Sopenharmony_ciextern crate ylong_runtime_static as ylong_runtime; 40cc290419Sopenharmony_ciuse ylong_runtime::sync::Mutex; 41cc290419Sopenharmony_ciuse ylong_runtime::task::JoinHandle; 42cc290419Sopenharmony_ci 43cc290419Sopenharmony_ciextern "C" { 44cc290419Sopenharmony_ci fn LZ4CompressTransfer( 45cc290419Sopenharmony_ci data: *const libc::c_char, 46cc290419Sopenharmony_ci dataCompress: *mut libc::c_char, 47cc290419Sopenharmony_ci data_size: i32, 48cc290419Sopenharmony_ci compressCapacity: i32, 49cc290419Sopenharmony_ci ) -> i32; 50cc290419Sopenharmony_ci fn LZ4DeompressTransfer( 51cc290419Sopenharmony_ci data: *const libc::c_char, 52cc290419Sopenharmony_ci dataDecompress: *mut libc::c_char, 53cc290419Sopenharmony_ci data_size: i32, 54cc290419Sopenharmony_ci decompressCapacity: i32, 55cc290419Sopenharmony_ci ) -> i32; 56cc290419Sopenharmony_ci} 57cc290419Sopenharmony_ci 58cc290419Sopenharmony_ci#[derive(Debug, Default, Clone, PartialEq, Eq)] 59cc290419Sopenharmony_cipub struct HdcTransferBase { 60cc290419Sopenharmony_ci pub need_close_notify: bool, 61cc290419Sopenharmony_ci pub io_index: u64, 62cc290419Sopenharmony_ci pub last_error: u32, 63cc290419Sopenharmony_ci pub is_io_finish: bool, 64cc290419Sopenharmony_ci pub is_master: bool, 65cc290419Sopenharmony_ci pub remote_path: String, 66cc290419Sopenharmony_ci pub base_local_path: String, 67cc290419Sopenharmony_ci pub local_path: String, 68cc290419Sopenharmony_ci pub server_or_daemon: bool, 69cc290419Sopenharmony_ci pub task_queue: Vec<String>, 70cc290419Sopenharmony_ci pub local_name: String, 71cc290419Sopenharmony_ci pub local_tar_raw_path: String, 72cc290419Sopenharmony_ci pub is_dir: bool, 73cc290419Sopenharmony_ci pub file_size: u64, 74cc290419Sopenharmony_ci pub dir_size: u64, 75cc290419Sopenharmony_ci pub session_id: u32, 76cc290419Sopenharmony_ci pub channel_id: u32, 77cc290419Sopenharmony_ci pub index: u64, 78cc290419Sopenharmony_ci pub file_cnt: u32, 79cc290419Sopenharmony_ci pub is_file_mode_sync: bool, 80cc290419Sopenharmony_ci pub file_begin_time: u64, 81cc290419Sopenharmony_ci pub dir_begin_time: u64, 82cc290419Sopenharmony_ci pub is_local_dir_exsit: Option<bool>, 83cc290419Sopenharmony_ci pub empty_dirs: String, 84cc290419Sopenharmony_ci pub stop_run: bool, 85cc290419Sopenharmony_ci pub command_str: String, 86cc290419Sopenharmony_ci 87cc290419Sopenharmony_ci pub transfer_config: TransferConfig, 88cc290419Sopenharmony_ci} 89cc290419Sopenharmony_ci 90cc290419Sopenharmony_ciimpl HdcTransferBase { 91cc290419Sopenharmony_ci pub fn new(_session_id: u32, _channel_id: u32) -> Self { 92cc290419Sopenharmony_ci Self { 93cc290419Sopenharmony_ci need_close_notify: false, 94cc290419Sopenharmony_ci io_index: 0, 95cc290419Sopenharmony_ci last_error: 0, 96cc290419Sopenharmony_ci is_io_finish: false, 97cc290419Sopenharmony_ci is_master: false, 98cc290419Sopenharmony_ci remote_path: String::new(), 99cc290419Sopenharmony_ci base_local_path: String::new(), 100cc290419Sopenharmony_ci local_path: String::new(), 101cc290419Sopenharmony_ci local_tar_raw_path: String::new(), 102cc290419Sopenharmony_ci server_or_daemon: false, 103cc290419Sopenharmony_ci task_queue: Vec::<String>::new(), 104cc290419Sopenharmony_ci local_name: String::new(), 105cc290419Sopenharmony_ci is_dir: false, 106cc290419Sopenharmony_ci file_size: 0, 107cc290419Sopenharmony_ci dir_size: 0, 108cc290419Sopenharmony_ci session_id: _session_id, 109cc290419Sopenharmony_ci channel_id: _channel_id, 110cc290419Sopenharmony_ci index: 0, 111cc290419Sopenharmony_ci file_cnt: 0, 112cc290419Sopenharmony_ci is_file_mode_sync: false, 113cc290419Sopenharmony_ci file_begin_time: 0, 114cc290419Sopenharmony_ci dir_begin_time: 0, 115cc290419Sopenharmony_ci is_local_dir_exsit: None, 116cc290419Sopenharmony_ci empty_dirs: String::new(), 117cc290419Sopenharmony_ci stop_run: false, 118cc290419Sopenharmony_ci command_str: String::new(), 119cc290419Sopenharmony_ci transfer_config: TransferConfig::default(), 120cc290419Sopenharmony_ci } 121cc290419Sopenharmony_ci } 122cc290419Sopenharmony_ci} 123cc290419Sopenharmony_ci 124cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))] 125cc290419Sopenharmony_cifn set_file_permission(path: String, mode: u32) -> std::io::Result<()> { 126cc290419Sopenharmony_ci let perms = std::fs::Permissions::from_mode(mode); 127cc290419Sopenharmony_ci fs::set_permissions(std::path::Path::new(&path), perms) 128cc290419Sopenharmony_ci} 129cc290419Sopenharmony_ci 130cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))] 131cc290419Sopenharmony_cifn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> { 132cc290419Sopenharmony_ci let perms = std::fs::Permissions::from_mode(mode); 133cc290419Sopenharmony_ci fs::set_permissions(dir, perms)?; 134cc290419Sopenharmony_ci 135cc290419Sopenharmony_ci for entry in fs::read_dir(dir)? { 136cc290419Sopenharmony_ci let entry = entry?; 137cc290419Sopenharmony_ci let entry_path = dir.join(entry.file_name()); 138cc290419Sopenharmony_ci if entry_path.is_dir() { 139cc290419Sopenharmony_ci set_dir_permissions_recursive(&entry_path, mode)?; 140cc290419Sopenharmony_ci } 141cc290419Sopenharmony_ci } 142cc290419Sopenharmony_ci Ok(()) 143cc290419Sopenharmony_ci} 144cc290419Sopenharmony_ci 145cc290419Sopenharmony_ci#[allow(unused)] 146cc290419Sopenharmony_cifn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> { 147cc290419Sopenharmony_ci let mut dir_path = std::path::Path::new(&path); 148cc290419Sopenharmony_ci while let Some(p) = dir_path.parent() { 149cc290419Sopenharmony_ci if p.exists() { 150cc290419Sopenharmony_ci break; 151cc290419Sopenharmony_ci } 152cc290419Sopenharmony_ci dir_path = p; 153cc290419Sopenharmony_ci } 154cc290419Sopenharmony_ci #[cfg(not(target_os = "windows"))] 155cc290419Sopenharmony_ci let exsit = dir_path.exists(); 156cc290419Sopenharmony_ci std::fs::create_dir_all(path.clone())?; 157cc290419Sopenharmony_ci #[cfg(not(target_os = "windows"))] 158cc290419Sopenharmony_ci if !exsit { 159cc290419Sopenharmony_ci set_dir_permissions_recursive(dir_path, mode) 160cc290419Sopenharmony_ci } else { 161cc290419Sopenharmony_ci Ok(()) 162cc290419Sopenharmony_ci } 163cc290419Sopenharmony_ci #[cfg(target_os = "windows")] 164cc290419Sopenharmony_ci Ok(()) 165cc290419Sopenharmony_ci} 166cc290419Sopenharmony_ci 167cc290419Sopenharmony_cipub fn check_local_path( 168cc290419Sopenharmony_ci transfer: &mut HdcTransferBase, 169cc290419Sopenharmony_ci _local_path: &str, 170cc290419Sopenharmony_ci _optional_name: &str, 171cc290419Sopenharmony_ci) -> Result<bool, Error> { 172cc290419Sopenharmony_ci crate::info!( 173cc290419Sopenharmony_ci "check_local_path, local_path:{}, optional_name:{}", 174cc290419Sopenharmony_ci _local_path, 175cc290419Sopenharmony_ci _optional_name 176cc290419Sopenharmony_ci ); 177cc290419Sopenharmony_ci let file = metadata(_local_path); 178cc290419Sopenharmony_ci if let Ok(f) = file { 179cc290419Sopenharmony_ci if transfer.is_local_dir_exsit.is_none() { 180cc290419Sopenharmony_ci transfer.is_local_dir_exsit = Some(true); 181cc290419Sopenharmony_ci } 182cc290419Sopenharmony_ci transfer.is_dir = f.is_dir(); 183cc290419Sopenharmony_ci if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) { 184cc290419Sopenharmony_ci transfer 185cc290419Sopenharmony_ci .local_path 186cc290419Sopenharmony_ci .push_str(Base::get_path_sep().to_string().as_str()); 187cc290419Sopenharmony_ci } 188cc290419Sopenharmony_ci } else if transfer.is_local_dir_exsit.is_none() { 189cc290419Sopenharmony_ci transfer.is_local_dir_exsit = Some(false); 190cc290419Sopenharmony_ci } 191cc290419Sopenharmony_ci let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str()); 192cc290419Sopenharmony_ci op = op.replace('/', Base::get_path_sep().to_string().as_str()); 193cc290419Sopenharmony_ci 194cc290419Sopenharmony_ci if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) { 195cc290419Sopenharmony_ci transfer 196cc290419Sopenharmony_ci .local_path 197cc290419Sopenharmony_ci .push_str(Base::get_path_sep().to_string().as_str()); 198cc290419Sopenharmony_ci } 199cc290419Sopenharmony_ci 200cc290419Sopenharmony_ci if transfer.local_path.ends_with(Base::get_path_sep()) { 201cc290419Sopenharmony_ci let local_dir = transfer 202cc290419Sopenharmony_ci .local_path 203cc290419Sopenharmony_ci .clone() 204cc290419Sopenharmony_ci .replace('/', Base::get_path_sep().to_string().as_str()); 205cc290419Sopenharmony_ci 206cc290419Sopenharmony_ci if let Some(false) = transfer.is_local_dir_exsit { 207cc290419Sopenharmony_ci if op.contains(Base::get_path_sep()) { 208cc290419Sopenharmony_ci let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default(); 209cc290419Sopenharmony_ci op = op.as_str()[first_sep_index..].to_string(); 210cc290419Sopenharmony_ci crate::debug!( 211cc290419Sopenharmony_ci "check_local_path, combine 2 local_dir:{}, op:{}", 212cc290419Sopenharmony_ci local_dir, 213cc290419Sopenharmony_ci op 214cc290419Sopenharmony_ci ); 215cc290419Sopenharmony_ci } 216cc290419Sopenharmony_ci } 217cc290419Sopenharmony_ci 218cc290419Sopenharmony_ci transfer.local_path = Base::combine(local_dir, op); 219cc290419Sopenharmony_ci } 220cc290419Sopenharmony_ci crate::debug!( 221cc290419Sopenharmony_ci "check_local_path, final transfer.local_path:{}", 222cc290419Sopenharmony_ci transfer.local_path 223cc290419Sopenharmony_ci ); 224cc290419Sopenharmony_ci if transfer.local_path.ends_with(Base::get_path_sep()) { 225cc290419Sopenharmony_ci match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) { 226cc290419Sopenharmony_ci Ok(_) => Ok(true), 227cc290419Sopenharmony_ci Err(error) => { 228cc290419Sopenharmony_ci crate::error!("dir create failed, error:{}", &error); 229cc290419Sopenharmony_ci Err(error) 230cc290419Sopenharmony_ci }, 231cc290419Sopenharmony_ci } 232cc290419Sopenharmony_ci } else { 233cc290419Sopenharmony_ci let last = transfer.local_path.rfind(Base::get_path_sep()); 234cc290419Sopenharmony_ci match last { 235cc290419Sopenharmony_ci Some(index) => { 236cc290419Sopenharmony_ci match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) { 237cc290419Sopenharmony_ci Ok(_) => { 238cc290419Sopenharmony_ci match File::create(transfer.local_path.clone()) { 239cc290419Sopenharmony_ci Ok(_) => { 240cc290419Sopenharmony_ci #[cfg(not(target_os = "windows"))] 241cc290419Sopenharmony_ci set_file_permission(transfer.local_path.clone(), 0o644)?; 242cc290419Sopenharmony_ci Ok(true) 243cc290419Sopenharmony_ci }, 244cc290419Sopenharmony_ci Err(error) => { 245cc290419Sopenharmony_ci crate::error!("file create failed, error:{}", &error); 246cc290419Sopenharmony_ci Err(error) 247cc290419Sopenharmony_ci }, 248cc290419Sopenharmony_ci } 249cc290419Sopenharmony_ci } 250cc290419Sopenharmony_ci Err(error) => { 251cc290419Sopenharmony_ci crate::error!("dir create failed, error:{}", &error); 252cc290419Sopenharmony_ci Err(error) 253cc290419Sopenharmony_ci }, 254cc290419Sopenharmony_ci } 255cc290419Sopenharmony_ci } 256cc290419Sopenharmony_ci None => { 257cc290419Sopenharmony_ci match File::create(transfer.local_path.clone()) { 258cc290419Sopenharmony_ci Ok(_) => { 259cc290419Sopenharmony_ci #[cfg(not(target_os = "windows"))] 260cc290419Sopenharmony_ci set_file_permission(transfer.local_path.clone(), 0o644)?; 261cc290419Sopenharmony_ci Ok(true) 262cc290419Sopenharmony_ci }, 263cc290419Sopenharmony_ci Err(error) => { 264cc290419Sopenharmony_ci crate::error!("file create failed, error:{}", &error); 265cc290419Sopenharmony_ci Err(error) 266cc290419Sopenharmony_ci }, 267cc290419Sopenharmony_ci } 268cc290419Sopenharmony_ci } 269cc290419Sopenharmony_ci } 270cc290419Sopenharmony_ci } 271cc290419Sopenharmony_ci} 272cc290419Sopenharmony_ci 273cc290419Sopenharmony_cifn spawn_handler( 274cc290419Sopenharmony_ci _command_data: HdcCommand, 275cc290419Sopenharmony_ci index: usize, 276cc290419Sopenharmony_ci local_path: String, 277cc290419Sopenharmony_ci _channel_id_: u32, 278cc290419Sopenharmony_ci transfer_config: &TransferConfig, 279cc290419Sopenharmony_ci) -> JoinHandle<(bool, TaskMessage)> { 280cc290419Sopenharmony_ci let thread_path_ref = Arc::new(Mutex::new(local_path)); 281cc290419Sopenharmony_ci let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64); 282cc290419Sopenharmony_ci let compress_type = transfer_config.compress_type; 283cc290419Sopenharmony_ci let file_size = transfer_config.file_size as u64; 284cc290419Sopenharmony_ci ylong_runtime::spawn(async move { 285cc290419Sopenharmony_ci let path = thread_path_ref.lock().await; 286cc290419Sopenharmony_ci let Ok(mut file) = File::open(&*path) else { 287cc290419Sopenharmony_ci crate::debug!("open file failed, path:{}", *path); 288cc290419Sopenharmony_ci let _data_message = TaskMessage { 289cc290419Sopenharmony_ci channel_id: _channel_id_, 290cc290419Sopenharmony_ci command: _command_data, 291cc290419Sopenharmony_ci payload: Vec::new(), 292cc290419Sopenharmony_ci }; 293cc290419Sopenharmony_ci return (false, _data_message); 294cc290419Sopenharmony_ci }; 295cc290419Sopenharmony_ci let _ = file.seek(std::io::SeekFrom::Start(pos)); 296cc290419Sopenharmony_ci let mut total = Vec::from([0; FILE_PACKAGE_HEAD]); 297cc290419Sopenharmony_ci let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE]; 298cc290419Sopenharmony_ci let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE]; 299cc290419Sopenharmony_ci let mut read_len = 0usize; 300cc290419Sopenharmony_ci let mut package_read_len = (file_size - pos) as usize; 301cc290419Sopenharmony_ci if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE { 302cc290419Sopenharmony_ci package_read_len = FILE_PACKAGE_PAYLOAD_SIZE; 303cc290419Sopenharmony_ci } 304cc290419Sopenharmony_ci while read_len < package_read_len { 305cc290419Sopenharmony_ci let Ok(single_len) = file.read(&mut buf[read_len..]) else { 306cc290419Sopenharmony_ci crate::debug!("file read failed, path:{}", *path); 307cc290419Sopenharmony_ci break; 308cc290419Sopenharmony_ci }; 309cc290419Sopenharmony_ci read_len += single_len; 310cc290419Sopenharmony_ci if single_len == 0 && read_len < package_read_len { 311cc290419Sopenharmony_ci break; 312cc290419Sopenharmony_ci } 313cc290419Sopenharmony_ci } 314cc290419Sopenharmony_ci let transfer_compress_type = match CompressType::try_from(compress_type) { 315cc290419Sopenharmony_ci Ok(compress_type) => compress_type, 316cc290419Sopenharmony_ci Err(_) => CompressType::None, 317cc290419Sopenharmony_ci }; 318cc290419Sopenharmony_ci 319cc290419Sopenharmony_ci let mut header: TransferPayload = TransferPayload { 320cc290419Sopenharmony_ci index: pos, 321cc290419Sopenharmony_ci compress_type, 322cc290419Sopenharmony_ci compress_size: 0, 323cc290419Sopenharmony_ci uncompress_size: 0, 324cc290419Sopenharmony_ci }; 325cc290419Sopenharmony_ci header.uncompress_size = read_len as u32; 326cc290419Sopenharmony_ci let capacity = read_len as i32; 327cc290419Sopenharmony_ci 328cc290419Sopenharmony_ci match transfer_compress_type { 329cc290419Sopenharmony_ci CompressType::Lz4 => { 330cc290419Sopenharmony_ci let compress_size: i32; 331cc290419Sopenharmony_ci header.compress_type = CompressType::Lz4 as u8; 332cc290419Sopenharmony_ci unsafe { 333cc290419Sopenharmony_ci compress_size = LZ4CompressTransfer( 334cc290419Sopenharmony_ci buf.as_ptr() as *const libc::c_char, 335cc290419Sopenharmony_ci data_buf.as_ptr() as *mut libc::c_char, 336cc290419Sopenharmony_ci capacity, 337cc290419Sopenharmony_ci capacity, 338cc290419Sopenharmony_ci ); 339cc290419Sopenharmony_ci } 340cc290419Sopenharmony_ci if compress_size > 0 { 341cc290419Sopenharmony_ci header.compress_size = compress_size as u32; 342cc290419Sopenharmony_ci } else { 343cc290419Sopenharmony_ci header.compress_type = CompressType::None as u8; 344cc290419Sopenharmony_ci header.compress_size = read_len as u32; 345cc290419Sopenharmony_ci data_buf = buf; 346cc290419Sopenharmony_ci } 347cc290419Sopenharmony_ci } 348cc290419Sopenharmony_ci _ => { 349cc290419Sopenharmony_ci header.compress_type = CompressType::None as u8; 350cc290419Sopenharmony_ci header.compress_size = read_len as u32; 351cc290419Sopenharmony_ci data_buf = buf; 352cc290419Sopenharmony_ci } 353cc290419Sopenharmony_ci } 354cc290419Sopenharmony_ci 355cc290419Sopenharmony_ci let head_buffer = header.serialize(); 356cc290419Sopenharmony_ci total[..head_buffer.len()].copy_from_slice(&head_buffer[..]); 357cc290419Sopenharmony_ci let data_len = header.compress_size as usize; 358cc290419Sopenharmony_ci total.append(&mut data_buf[..data_len].to_vec()); 359cc290419Sopenharmony_ci let _data_message = TaskMessage { 360cc290419Sopenharmony_ci channel_id: _channel_id_, 361cc290419Sopenharmony_ci command: _command_data, 362cc290419Sopenharmony_ci payload: total, 363cc290419Sopenharmony_ci }; 364cc290419Sopenharmony_ci (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message) 365cc290419Sopenharmony_ci }) 366cc290419Sopenharmony_ci} 367cc290419Sopenharmony_ci 368cc290419Sopenharmony_cifn is_dir_link(path: String) -> bool { 369cc290419Sopenharmony_ci let ret = std::fs::read_link(path); 370cc290419Sopenharmony_ci match ret { 371cc290419Sopenharmony_ci Ok(p) => { 372cc290419Sopenharmony_ci crate::debug!("link to file:{}", p.display().to_string()); 373cc290419Sopenharmony_ci p.exists() && p.is_dir() 374cc290419Sopenharmony_ci } 375cc290419Sopenharmony_ci Err(e) => { 376cc290419Sopenharmony_ci crate::error!("read_link fail:{:#?}", e); 377cc290419Sopenharmony_ci false 378cc290419Sopenharmony_ci } 379cc290419Sopenharmony_ci } 380cc290419Sopenharmony_ci} 381cc290419Sopenharmony_ci 382cc290419Sopenharmony_cifn is_file_access(path: String) -> bool { 383cc290419Sopenharmony_ci let file = metadata(path.clone()); 384cc290419Sopenharmony_ci match file { 385cc290419Sopenharmony_ci Ok(f) => { 386cc290419Sopenharmony_ci if !f.is_symlink() { 387cc290419Sopenharmony_ci crate::debug!("file is not a link, path:{}", path); 388cc290419Sopenharmony_ci return true; 389cc290419Sopenharmony_ci } 390cc290419Sopenharmony_ci } 391cc290419Sopenharmony_ci Err(_e) => { 392cc290419Sopenharmony_ci crate::error!("metadata file is error, path:{}", path); 393cc290419Sopenharmony_ci return false; 394cc290419Sopenharmony_ci } 395cc290419Sopenharmony_ci } 396cc290419Sopenharmony_ci let ret = std::fs::read_link(path); 397cc290419Sopenharmony_ci match ret { 398cc290419Sopenharmony_ci Ok(p) => { 399cc290419Sopenharmony_ci crate::debug!("link to file:{}", p.display().to_string()); 400cc290419Sopenharmony_ci p.exists() 401cc290419Sopenharmony_ci } 402cc290419Sopenharmony_ci Err(e) => { 403cc290419Sopenharmony_ci crate::error!("read_link fail:{:#?}", e); 404cc290419Sopenharmony_ci false 405cc290419Sopenharmony_ci } 406cc290419Sopenharmony_ci } 407cc290419Sopenharmony_ci} 408cc290419Sopenharmony_ci 409cc290419Sopenharmony_cipub async fn read_and_send_data( 410cc290419Sopenharmony_ci local_path: &str, 411cc290419Sopenharmony_ci session_id: u32, 412cc290419Sopenharmony_ci _channel_id_: u32, 413cc290419Sopenharmony_ci _file_size: u64, 414cc290419Sopenharmony_ci _command_data: HdcCommand, 415cc290419Sopenharmony_ci transfer_config: &TransferConfig, 416cc290419Sopenharmony_ci) -> bool { 417cc290419Sopenharmony_ci const MAX_WORKER_COUNT: usize = 5; 418cc290419Sopenharmony_ci let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize; 419cc290419Sopenharmony_ci if pieces_count == 0 { 420cc290419Sopenharmony_ci pieces_count = 1; 421cc290419Sopenharmony_ci } 422cc290419Sopenharmony_ci let workers_count = if pieces_count > MAX_WORKER_COUNT { 423cc290419Sopenharmony_ci MAX_WORKER_COUNT 424cc290419Sopenharmony_ci } else { 425cc290419Sopenharmony_ci pieces_count 426cc290419Sopenharmony_ci }; 427cc290419Sopenharmony_ci let mut index = 0; 428cc290419Sopenharmony_ci let mut queue = VecDeque::new(); 429cc290419Sopenharmony_ci while index < workers_count { 430cc290419Sopenharmony_ci let worker = spawn_handler( 431cc290419Sopenharmony_ci _command_data, 432cc290419Sopenharmony_ci index, 433cc290419Sopenharmony_ci local_path.to_owned(), 434cc290419Sopenharmony_ci _channel_id_, 435cc290419Sopenharmony_ci transfer_config, 436cc290419Sopenharmony_ci ); 437cc290419Sopenharmony_ci queue.push_back(worker); 438cc290419Sopenharmony_ci index += 1; 439cc290419Sopenharmony_ci } 440cc290419Sopenharmony_ci loop { 441cc290419Sopenharmony_ci if queue.is_empty() { 442cc290419Sopenharmony_ci crate::debug!("read_and_send_data queue is empty"); 443cc290419Sopenharmony_ci break; 444cc290419Sopenharmony_ci } 445cc290419Sopenharmony_ci let Some(handler) = queue.pop_front() else { 446cc290419Sopenharmony_ci continue; 447cc290419Sopenharmony_ci }; 448cc290419Sopenharmony_ci let Ok((is_finish, task_message)) = handler.await else { 449cc290419Sopenharmony_ci continue; 450cc290419Sopenharmony_ci }; 451cc290419Sopenharmony_ci transfer::put(session_id, task_message).await; 452cc290419Sopenharmony_ci if is_finish { 453cc290419Sopenharmony_ci crate::debug!("read_and_send_data is finish return false"); 454cc290419Sopenharmony_ci return false; 455cc290419Sopenharmony_ci } 456cc290419Sopenharmony_ci 457cc290419Sopenharmony_ci if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size { 458cc290419Sopenharmony_ci let worker = spawn_handler( 459cc290419Sopenharmony_ci _command_data, 460cc290419Sopenharmony_ci index, 461cc290419Sopenharmony_ci local_path.to_owned(), 462cc290419Sopenharmony_ci _channel_id_, 463cc290419Sopenharmony_ci transfer_config, 464cc290419Sopenharmony_ci ); 465cc290419Sopenharmony_ci queue.push_back(worker); 466cc290419Sopenharmony_ci index += 1; 467cc290419Sopenharmony_ci } 468cc290419Sopenharmony_ci } 469cc290419Sopenharmony_ci true 470cc290419Sopenharmony_ci} 471cc290419Sopenharmony_ci 472cc290419Sopenharmony_cipub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool { 473cc290419Sopenharmony_ci let mut header = TransferPayload { 474cc290419Sopenharmony_ci ..Default::default() 475cc290419Sopenharmony_ci }; 476cc290419Sopenharmony_ci let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec()); 477cc290419Sopenharmony_ci let file_index = header.index; 478cc290419Sopenharmony_ci let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec(); 479cc290419Sopenharmony_ci let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) { 480cc290419Sopenharmony_ci Ok(compress_type) => compress_type, 481cc290419Sopenharmony_ci Err(_) => CompressType::None, 482cc290419Sopenharmony_ci }; 483cc290419Sopenharmony_ci 484cc290419Sopenharmony_ci if let CompressType::Lz4 = compress_type { 485cc290419Sopenharmony_ci let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE]; 486cc290419Sopenharmony_ci let decompress_size = unsafe { 487cc290419Sopenharmony_ci LZ4DeompressTransfer( 488cc290419Sopenharmony_ci _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char, 489cc290419Sopenharmony_ci buf.as_ptr() as *mut libc::c_char, 490cc290419Sopenharmony_ci header.compress_size as i32, 491cc290419Sopenharmony_ci header.uncompress_size as i32, 492cc290419Sopenharmony_ci ) 493cc290419Sopenharmony_ci }; 494cc290419Sopenharmony_ci if decompress_size > 0 { 495cc290419Sopenharmony_ci buffer = buf[..(decompress_size as usize)].to_vec(); 496cc290419Sopenharmony_ci } 497cc290419Sopenharmony_ci } 498cc290419Sopenharmony_ci 499cc290419Sopenharmony_ci let path = tbase.local_path.clone(); 500cc290419Sopenharmony_ci let write_buf = buffer.clone(); 501cc290419Sopenharmony_ci let session_id = tbase.session_id.to_owned(); 502cc290419Sopenharmony_ci let channel_id = tbase.channel_id.to_owned(); 503cc290419Sopenharmony_ci utils::spawn(async move { 504cc290419Sopenharmony_ci let open_result = OpenOptions::new() 505cc290419Sopenharmony_ci .write(true) 506cc290419Sopenharmony_ci .create(true) 507cc290419Sopenharmony_ci .open(path.clone()); 508cc290419Sopenharmony_ci match open_result { 509cc290419Sopenharmony_ci Ok(mut file) => { 510cc290419Sopenharmony_ci let _ = file.seek(std::io::SeekFrom::Start(file_index)); 511cc290419Sopenharmony_ci let write_result = file.write_all(write_buf.as_slice()); 512cc290419Sopenharmony_ci match write_result { 513cc290419Sopenharmony_ci Ok(()) => {} 514cc290419Sopenharmony_ci Err(e) => { 515cc290419Sopenharmony_ci let _ = put_last_error(e, session_id, channel_id).await; 516cc290419Sopenharmony_ci } 517cc290419Sopenharmony_ci } 518cc290419Sopenharmony_ci } 519cc290419Sopenharmony_ci Err(e) => { 520cc290419Sopenharmony_ci let _ = put_last_error(e, session_id, channel_id).await; 521cc290419Sopenharmony_ci } 522cc290419Sopenharmony_ci } 523cc290419Sopenharmony_ci }); 524cc290419Sopenharmony_ci 525cc290419Sopenharmony_ci tbase.index += buffer.len() as u64; 526cc290419Sopenharmony_ci crate::debug!( 527cc290419Sopenharmony_ci "transfer file [{}] index {} / {}", 528cc290419Sopenharmony_ci tbase.local_path.clone(), 529cc290419Sopenharmony_ci tbase.index, 530cc290419Sopenharmony_ci tbase.file_size 531cc290419Sopenharmony_ci ); 532cc290419Sopenharmony_ci if tbase.index >= tbase.file_size { 533cc290419Sopenharmony_ci return true; 534cc290419Sopenharmony_ci } 535cc290419Sopenharmony_ci false 536cc290419Sopenharmony_ci} 537cc290419Sopenharmony_ci 538cc290419Sopenharmony_ciasync fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool { 539cc290419Sopenharmony_ci crate::warn!( 540cc290419Sopenharmony_ci "put_last_error sesssion_id:{}, channel_id:{}, error:{}", 541cc290419Sopenharmony_ci session_id, 542cc290419Sopenharmony_ci channel_id, 543cc290419Sopenharmony_ci error, 544cc290419Sopenharmony_ci ); 545cc290419Sopenharmony_ci let errno = match error.raw_os_error() { 546cc290419Sopenharmony_ci Some(errno) => errno as u32, 547cc290419Sopenharmony_ci None => std::i32::MAX as u32 548cc290419Sopenharmony_ci }; 549cc290419Sopenharmony_ci match FileTaskMap::get(session_id, channel_id).await { 550cc290419Sopenharmony_ci Some(task) => { 551cc290419Sopenharmony_ci let mut task = task.lock().await; 552cc290419Sopenharmony_ci task.transfer.last_error = errno; 553cc290419Sopenharmony_ci } 554cc290419Sopenharmony_ci None => { 555cc290419Sopenharmony_ci crate::error!( 556cc290419Sopenharmony_ci "recv_and_write_file get task is none session_id:{},channel_id:{}", 557cc290419Sopenharmony_ci session_id, 558cc290419Sopenharmony_ci channel_id, 559cc290419Sopenharmony_ci ); 560cc290419Sopenharmony_ci return false; 561cc290419Sopenharmony_ci } 562cc290419Sopenharmony_ci } 563cc290419Sopenharmony_ci true 564cc290419Sopenharmony_ci} 565cc290419Sopenharmony_ci 566cc290419Sopenharmony_cipub fn get_sub_files_resurively(_path: &String) -> Vec<String> { 567cc290419Sopenharmony_ci let mut result = Vec::new(); 568cc290419Sopenharmony_ci let dir_path = PathBuf::from(_path); 569cc290419Sopenharmony_ci if !is_file_access(_path.clone()) { 570cc290419Sopenharmony_ci crate::error!("file is invalid link, path:{}", _path); 571cc290419Sopenharmony_ci return result; 572cc290419Sopenharmony_ci } 573cc290419Sopenharmony_ci let Ok(dir_list) = fs::read_dir(dir_path) else { 574cc290419Sopenharmony_ci crate::error!("read dir fail, path:{}", _path); 575cc290419Sopenharmony_ci return result; 576cc290419Sopenharmony_ci }; 577cc290419Sopenharmony_ci for entry in dir_list { 578cc290419Sopenharmony_ci let Ok(path) = entry else { 579cc290419Sopenharmony_ci continue; 580cc290419Sopenharmony_ci }; 581cc290419Sopenharmony_ci let path = path.path(); 582cc290419Sopenharmony_ci if is_dir_link(path.clone().display().to_string()) { 583cc290419Sopenharmony_ci continue; 584cc290419Sopenharmony_ci } else if path.is_file() { 585cc290419Sopenharmony_ci result.push(Base::normalized_path(path).display().to_string()); 586cc290419Sopenharmony_ci } else { 587cc290419Sopenharmony_ci let p = path.display().to_string(); 588cc290419Sopenharmony_ci let mut sub_list = get_sub_files_resurively(&p); 589cc290419Sopenharmony_ci result.append(&mut sub_list); 590cc290419Sopenharmony_ci } 591cc290419Sopenharmony_ci } 592cc290419Sopenharmony_ci result.sort(); 593cc290419Sopenharmony_ci result 594cc290419Sopenharmony_ci} 595cc290419Sopenharmony_ci 596cc290419Sopenharmony_cipub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) { 597cc290419Sopenharmony_ci let local_path_ = transfer.local_path.clone(); 598cc290419Sopenharmony_ci 599cc290419Sopenharmony_ci read_and_send_data( 600cc290419Sopenharmony_ci &local_path_, 601cc290419Sopenharmony_ci transfer.session_id, 602cc290419Sopenharmony_ci transfer.channel_id, 603cc290419Sopenharmony_ci transfer.file_size, 604cc290419Sopenharmony_ci _command_data, 605cc290419Sopenharmony_ci &transfer.transfer_config, 606cc290419Sopenharmony_ci ) 607cc290419Sopenharmony_ci .await; 608cc290419Sopenharmony_ci} 609cc290419Sopenharmony_ci 610cc290419Sopenharmony_cipub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool { 611cc290419Sopenharmony_ci recv_and_write_file(tbase, _payload) 612cc290419Sopenharmony_ci} 613cc290419Sopenharmony_ci 614cc290419Sopenharmony_cipub async fn transfer_task_finish(channel_id: u32, _session_id: u32) { 615cc290419Sopenharmony_ci let task_message = TaskMessage { 616cc290419Sopenharmony_ci channel_id, 617cc290419Sopenharmony_ci command: HdcCommand::KernelChannelClose, 618cc290419Sopenharmony_ci payload: [1].to_vec(), 619cc290419Sopenharmony_ci }; 620cc290419Sopenharmony_ci transfer::put(_session_id, task_message).await; 621cc290419Sopenharmony_ci} 622cc290419Sopenharmony_ci 623cc290419Sopenharmony_cipub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) { 624cc290419Sopenharmony_ci let task_message = TaskMessage { 625cc290419Sopenharmony_ci channel_id, 626cc290419Sopenharmony_ci command: comamnd_finish, 627cc290419Sopenharmony_ci payload: [1].to_vec(), 628cc290419Sopenharmony_ci }; 629cc290419Sopenharmony_ci transfer::put(_session_id, task_message).await; 630cc290419Sopenharmony_ci} 631cc290419Sopenharmony_ci 632cc290419Sopenharmony_cipub async fn close_channel(channel_id: u32) { 633cc290419Sopenharmony_ci transfer::TcpMap::end(channel_id).await; 634cc290419Sopenharmony_ci} 635cc290419Sopenharmony_ci 636cc290419Sopenharmony_cipub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) { 637cc290419Sopenharmony_ci #[cfg(feature = "host")] 638cc290419Sopenharmony_ci { 639cc290419Sopenharmony_ci let echo_level = match level { 640cc290419Sopenharmony_ci MessageLevel::Ok => transfer::EchoLevel::OK, 641cc290419Sopenharmony_ci MessageLevel::Fail => transfer::EchoLevel::FAIL, 642cc290419Sopenharmony_ci MessageLevel::Info => transfer::EchoLevel::INFO, 643cc290419Sopenharmony_ci }; 644cc290419Sopenharmony_ci let _ = 645cc290419Sopenharmony_ci transfer::send_channel_msg(channel_id, echo_level, message.to_string()) 646cc290419Sopenharmony_ci .await; 647cc290419Sopenharmony_ci } 648cc290419Sopenharmony_ci #[cfg(not(feature = "host"))] 649cc290419Sopenharmony_ci { 650cc290419Sopenharmony_ci let mut data = Vec::<u8>::new(); 651cc290419Sopenharmony_ci data.push(level as u8); 652cc290419Sopenharmony_ci data.append(&mut message.as_bytes().to_vec()); 653cc290419Sopenharmony_ci let echo_message = TaskMessage { 654cc290419Sopenharmony_ci channel_id, 655cc290419Sopenharmony_ci command: HdcCommand::KernelEcho, 656cc290419Sopenharmony_ci payload: data, 657cc290419Sopenharmony_ci }; 658cc290419Sopenharmony_ci transfer::put(_session_id, echo_message).await; 659cc290419Sopenharmony_ci } 660cc290419Sopenharmony_ci}