1cc290419Sopenharmony_ci/*
2cc290419Sopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd.
3cc290419Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4cc290419Sopenharmony_ci * you may not use this file except in compliance with the License.
5cc290419Sopenharmony_ci * You may obtain a copy of the License at
6cc290419Sopenharmony_ci *
7cc290419Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8cc290419Sopenharmony_ci *
9cc290419Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10cc290419Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11cc290419Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12cc290419Sopenharmony_ci * See the License for the specific language governing permissions and
13cc290419Sopenharmony_ci * limitations under the License.
14cc290419Sopenharmony_ci */
15cc290419Sopenharmony_ci//! hdctransfer
16cc290419Sopenharmony_ci#![allow(missing_docs)]
17cc290419Sopenharmony_ciuse std::collections::VecDeque;
18cc290419Sopenharmony_ciuse std::fs::{self, File, OpenOptions, metadata};
19cc290419Sopenharmony_ciuse std::io::{Read, Seek, Write, Error};
20cc290419Sopenharmony_ciuse std::path::PathBuf;
21cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))]
22cc290419Sopenharmony_ciuse std::path::Path;
23cc290419Sopenharmony_ci#[cfg(not(target_os = "windows"))]
24cc290419Sopenharmony_ciuse std::os::unix::fs::PermissionsExt;
25cc290419Sopenharmony_ciuse std::sync::Arc;
26cc290419Sopenharmony_ci
27cc290419Sopenharmony_ciuse crate::common::base::Base;
28cc290419Sopenharmony_ciuse crate::common::hdcfile::FileTaskMap;
29cc290419Sopenharmony_ciuse crate::config::HdcCommand;
30cc290419Sopenharmony_ciuse crate::config::TaskMessage;
31cc290419Sopenharmony_ciuse crate::{config::*, utils};
32cc290419Sopenharmony_ciuse crate::serializer::native_struct::TransferConfig;
33cc290419Sopenharmony_ciuse crate::serializer::native_struct::TransferPayload;
34cc290419Sopenharmony_ciuse crate::serializer::serialize::Serialization;
35cc290419Sopenharmony_ciuse crate::transfer;
36cc290419Sopenharmony_ci#[cfg(not(feature = "host"))]
37cc290419Sopenharmony_ciuse crate::utils::hdc_log::*;
38cc290419Sopenharmony_ci#[cfg(feature = "host")]
39cc290419Sopenharmony_ciextern crate ylong_runtime_static as ylong_runtime;
40cc290419Sopenharmony_ciuse ylong_runtime::sync::Mutex;
41cc290419Sopenharmony_ciuse ylong_runtime::task::JoinHandle;
42cc290419Sopenharmony_ci
// LZ4 helpers declared with the C ABI; the symbols are provided outside this
// file. Callers in this file pass raw pointers into fixed-size byte buffers.
extern "C" {
    /// Compresses `data_size` bytes starting at `data` into `dataCompress`,
    /// which has room for `compressCapacity` bytes.
    ///
    /// `spawn_handler` treats a return value greater than zero as the number
    /// of compressed bytes written and any other value as "could not
    /// compress" (it then sends the block uncompressed).
    /// NOTE(review): the exact return contract lives in the C implementation —
    /// confirm there.
    fn LZ4CompressTransfer(
        data: *const libc::c_char,
        dataCompress: *mut libc::c_char,
        data_size: i32,
        compressCapacity: i32,
    ) -> i32;
    /// Decompresses `data_size` bytes starting at `data` into
    /// `dataDecompress`, which has room for `decompressCapacity` bytes.
    ///
    /// `recv_and_write_file` treats a return value greater than zero as the
    /// number of decompressed bytes and any other value as a failure.
    /// NOTE(review): the spelling "Deompress" has to match the exported C
    /// symbol, so it must not be corrected on the Rust side alone.
    fn LZ4DeompressTransfer(
        data: *const libc::c_char,
        dataDecompress: *mut libc::c_char,
        data_size: i32,
        decompressCapacity: i32,
    ) -> i32;
}
57cc290419Sopenharmony_ci
/// Mutable state shared by the file-transfer helpers in this module.
///
/// Only the fields whose use is visible in this file are described; the
/// remaining ones are maintained by callers elsewhere in the crate.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HdcTransferBase {
    pub need_close_notify: bool,
    pub io_index: u64,
    /// Error code of the last failed local I/O operation.
    /// `put_last_error` stores the OS errno here (or `i32::MAX` when the
    /// error carries no errno) on the task's transfer state.
    pub last_error: u32,
    pub is_io_finish: bool,
    pub is_master: bool,
    pub remote_path: String,
    pub base_local_path: String,
    /// Local path of the current transfer. `check_local_path` rewrites it and
    /// `recv_and_write_file` opens it for writing.
    pub local_path: String,
    pub server_or_daemon: bool,
    pub task_queue: Vec<String>,
    pub local_name: String,
    pub local_tar_raw_path: String,
    /// Set by `check_local_path` from the metadata of the local path.
    pub is_dir: bool,
    /// Compared against `index` by `recv_and_write_file` to decide whether the
    /// file is complete.
    pub file_size: u64,
    pub dir_size: u64,
    /// Session identifier supplied to `new`; used to report write errors.
    pub session_id: u32,
    /// Channel identifier supplied to `new`; used to report write errors.
    pub channel_id: u32,
    /// Running total of payload bytes accepted by `recv_and_write_file`.
    pub index: u64,
    pub file_cnt: u32,
    pub is_file_mode_sync: bool,
    pub file_begin_time: u64,
    pub dir_begin_time: u64,
    /// `None` until `check_local_path` runs for the first time; afterwards
    /// records whether the local path's metadata could be read on that first
    /// call. (The field name's spelling is part of the public interface.)
    pub is_local_dir_exsit: Option<bool>,
    pub empty_dirs: String,
    pub stop_run: bool,
    pub command_str: String,

    /// Per-transfer configuration; `compress_type` is read when receiving.
    pub transfer_config: TransferConfig,
}
89cc290419Sopenharmony_ci
90cc290419Sopenharmony_ciimpl HdcTransferBase {
91cc290419Sopenharmony_ci    pub fn new(_session_id: u32, _channel_id: u32) -> Self {
92cc290419Sopenharmony_ci        Self {
93cc290419Sopenharmony_ci            need_close_notify: false,
94cc290419Sopenharmony_ci            io_index: 0,
95cc290419Sopenharmony_ci            last_error: 0,
96cc290419Sopenharmony_ci            is_io_finish: false,
97cc290419Sopenharmony_ci            is_master: false,
98cc290419Sopenharmony_ci            remote_path: String::new(),
99cc290419Sopenharmony_ci            base_local_path: String::new(),
100cc290419Sopenharmony_ci            local_path: String::new(),
101cc290419Sopenharmony_ci            local_tar_raw_path: String::new(),
102cc290419Sopenharmony_ci            server_or_daemon: false,
103cc290419Sopenharmony_ci            task_queue: Vec::<String>::new(),
104cc290419Sopenharmony_ci            local_name: String::new(),
105cc290419Sopenharmony_ci            is_dir: false,
106cc290419Sopenharmony_ci            file_size: 0,
107cc290419Sopenharmony_ci            dir_size: 0,
108cc290419Sopenharmony_ci            session_id: _session_id,
109cc290419Sopenharmony_ci            channel_id: _channel_id,
110cc290419Sopenharmony_ci            index: 0,
111cc290419Sopenharmony_ci            file_cnt: 0,
112cc290419Sopenharmony_ci            is_file_mode_sync: false,
113cc290419Sopenharmony_ci            file_begin_time: 0,
114cc290419Sopenharmony_ci            dir_begin_time: 0,
115cc290419Sopenharmony_ci            is_local_dir_exsit: None,
116cc290419Sopenharmony_ci            empty_dirs: String::new(),
117cc290419Sopenharmony_ci            stop_run: false,
118cc290419Sopenharmony_ci            command_str: String::new(),
119cc290419Sopenharmony_ci            transfer_config: TransferConfig::default(),
120cc290419Sopenharmony_ci        }
121cc290419Sopenharmony_ci    }
122cc290419Sopenharmony_ci}
123cc290419Sopenharmony_ci
/// Applies the Unix permission bits `mode` to the file system entry at `path`.
///
/// Returns whatever error `fs::set_permissions` reports (for example when the
/// path does not exist).
#[cfg(not(target_os = "windows"))]
fn set_file_permission(path: String, mode: u32) -> std::io::Result<()> {
    fs::set_permissions(&path, fs::Permissions::from_mode(mode))
}
129cc290419Sopenharmony_ci
/// Applies the Unix permission bits `mode` to `dir` and to every directory
/// below it. Regular files are left untouched.
///
/// Symbolic links are never followed: a link that points at a directory is
/// skipped, so the walk cannot leave the tree rooted at `dir` and cannot loop
/// on a link cycle.
///
/// # Errors
/// Returns the first I/O error hit while changing permissions or listing a
/// directory.
#[cfg(not(target_os = "windows"))]
fn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> {
    fs::set_permissions(dir, fs::Permissions::from_mode(mode))?;

    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // `DirEntry::file_type` reports the entry itself, not a link's target.
        if entry.file_type()?.is_dir() {
            set_dir_permissions_recursive(&entry.path(), mode)?;
        }
    }
    Ok(())
}
144cc290419Sopenharmony_ci
145cc290419Sopenharmony_ci#[allow(unused)]
146cc290419Sopenharmony_cifn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> {
147cc290419Sopenharmony_ci    let mut dir_path = std::path::Path::new(&path);
148cc290419Sopenharmony_ci    while let Some(p) = dir_path.parent() {
149cc290419Sopenharmony_ci        if p.exists() {
150cc290419Sopenharmony_ci            break;
151cc290419Sopenharmony_ci        }
152cc290419Sopenharmony_ci        dir_path = p;
153cc290419Sopenharmony_ci    }
154cc290419Sopenharmony_ci    #[cfg(not(target_os = "windows"))]
155cc290419Sopenharmony_ci    let exsit = dir_path.exists();
156cc290419Sopenharmony_ci    std::fs::create_dir_all(path.clone())?;
157cc290419Sopenharmony_ci    #[cfg(not(target_os = "windows"))]
158cc290419Sopenharmony_ci    if !exsit {
159cc290419Sopenharmony_ci        set_dir_permissions_recursive(dir_path, mode)
160cc290419Sopenharmony_ci    } else {
161cc290419Sopenharmony_ci        Ok(())
162cc290419Sopenharmony_ci    }
163cc290419Sopenharmony_ci    #[cfg(target_os = "windows")]
164cc290419Sopenharmony_ci    Ok(())
165cc290419Sopenharmony_ci}
166cc290419Sopenharmony_ci
167cc290419Sopenharmony_cipub fn check_local_path(
168cc290419Sopenharmony_ci    transfer: &mut HdcTransferBase,
169cc290419Sopenharmony_ci    _local_path: &str,
170cc290419Sopenharmony_ci    _optional_name: &str,
171cc290419Sopenharmony_ci) -> Result<bool, Error> {
172cc290419Sopenharmony_ci    crate::info!(
173cc290419Sopenharmony_ci        "check_local_path, local_path:{}, optional_name:{}",
174cc290419Sopenharmony_ci        _local_path,
175cc290419Sopenharmony_ci        _optional_name
176cc290419Sopenharmony_ci    );
177cc290419Sopenharmony_ci    let file = metadata(_local_path);
178cc290419Sopenharmony_ci    if let Ok(f) = file {
179cc290419Sopenharmony_ci        if transfer.is_local_dir_exsit.is_none() {
180cc290419Sopenharmony_ci            transfer.is_local_dir_exsit = Some(true);
181cc290419Sopenharmony_ci        }
182cc290419Sopenharmony_ci        transfer.is_dir = f.is_dir();
183cc290419Sopenharmony_ci        if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
184cc290419Sopenharmony_ci            transfer
185cc290419Sopenharmony_ci                .local_path
186cc290419Sopenharmony_ci                .push_str(Base::get_path_sep().to_string().as_str());
187cc290419Sopenharmony_ci        }
188cc290419Sopenharmony_ci    } else if transfer.is_local_dir_exsit.is_none() {
189cc290419Sopenharmony_ci        transfer.is_local_dir_exsit = Some(false);
190cc290419Sopenharmony_ci    }
191cc290419Sopenharmony_ci    let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
192cc290419Sopenharmony_ci    op = op.replace('/', Base::get_path_sep().to_string().as_str());
193cc290419Sopenharmony_ci
194cc290419Sopenharmony_ci    if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
195cc290419Sopenharmony_ci        transfer
196cc290419Sopenharmony_ci            .local_path
197cc290419Sopenharmony_ci            .push_str(Base::get_path_sep().to_string().as_str());
198cc290419Sopenharmony_ci    }
199cc290419Sopenharmony_ci
200cc290419Sopenharmony_ci    if transfer.local_path.ends_with(Base::get_path_sep()) {
201cc290419Sopenharmony_ci        let local_dir = transfer
202cc290419Sopenharmony_ci            .local_path
203cc290419Sopenharmony_ci            .clone()
204cc290419Sopenharmony_ci            .replace('/', Base::get_path_sep().to_string().as_str());
205cc290419Sopenharmony_ci
206cc290419Sopenharmony_ci        if let Some(false) = transfer.is_local_dir_exsit {
207cc290419Sopenharmony_ci            if op.contains(Base::get_path_sep()) {
208cc290419Sopenharmony_ci                let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default();
209cc290419Sopenharmony_ci                op = op.as_str()[first_sep_index..].to_string();
210cc290419Sopenharmony_ci                crate::debug!(
211cc290419Sopenharmony_ci                    "check_local_path, combine 2 local_dir:{}, op:{}",
212cc290419Sopenharmony_ci                    local_dir,
213cc290419Sopenharmony_ci                    op
214cc290419Sopenharmony_ci                );
215cc290419Sopenharmony_ci            }
216cc290419Sopenharmony_ci        }
217cc290419Sopenharmony_ci
218cc290419Sopenharmony_ci        transfer.local_path = Base::combine(local_dir, op);
219cc290419Sopenharmony_ci    }
220cc290419Sopenharmony_ci    crate::debug!(
221cc290419Sopenharmony_ci        "check_local_path, final transfer.local_path:{}",
222cc290419Sopenharmony_ci        transfer.local_path
223cc290419Sopenharmony_ci    );
224cc290419Sopenharmony_ci    if transfer.local_path.ends_with(Base::get_path_sep()) {
225cc290419Sopenharmony_ci        match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) {
226cc290419Sopenharmony_ci            Ok(_) => Ok(true),
227cc290419Sopenharmony_ci            Err(error) => {
228cc290419Sopenharmony_ci                crate::error!("dir create failed, error:{}", &error);
229cc290419Sopenharmony_ci                Err(error)
230cc290419Sopenharmony_ci            },
231cc290419Sopenharmony_ci        }
232cc290419Sopenharmony_ci    } else {
233cc290419Sopenharmony_ci        let last = transfer.local_path.rfind(Base::get_path_sep());
234cc290419Sopenharmony_ci        match last {
235cc290419Sopenharmony_ci            Some(index) => {
236cc290419Sopenharmony_ci                match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) {
237cc290419Sopenharmony_ci                    Ok(_) => {
238cc290419Sopenharmony_ci                        match File::create(transfer.local_path.clone()) {
239cc290419Sopenharmony_ci                    Ok(_) => {
240cc290419Sopenharmony_ci                        #[cfg(not(target_os = "windows"))]
241cc290419Sopenharmony_ci                        set_file_permission(transfer.local_path.clone(), 0o644)?;
242cc290419Sopenharmony_ci                        Ok(true)
243cc290419Sopenharmony_ci                            },
244cc290419Sopenharmony_ci                    Err(error) => {
245cc290419Sopenharmony_ci                        crate::error!("file create failed, error:{}", &error);
246cc290419Sopenharmony_ci                        Err(error)
247cc290419Sopenharmony_ci                            },
248cc290419Sopenharmony_ci                        }
249cc290419Sopenharmony_ci                    }
250cc290419Sopenharmony_ci                Err(error) => {
251cc290419Sopenharmony_ci                    crate::error!("dir create failed, error:{}", &error);
252cc290419Sopenharmony_ci                    Err(error)
253cc290419Sopenharmony_ci                    },
254cc290419Sopenharmony_ci                }
255cc290419Sopenharmony_ci            }
256cc290419Sopenharmony_ci            None => {
257cc290419Sopenharmony_ci                match File::create(transfer.local_path.clone()) {
258cc290419Sopenharmony_ci                Ok(_) => {
259cc290419Sopenharmony_ci                    #[cfg(not(target_os = "windows"))]
260cc290419Sopenharmony_ci                    set_file_permission(transfer.local_path.clone(), 0o644)?;
261cc290419Sopenharmony_ci                    Ok(true)
262cc290419Sopenharmony_ci                    },
263cc290419Sopenharmony_ci                Err(error) => {
264cc290419Sopenharmony_ci                    crate::error!("file create failed, error:{}", &error);
265cc290419Sopenharmony_ci                    Err(error)
266cc290419Sopenharmony_ci                    },
267cc290419Sopenharmony_ci                }
268cc290419Sopenharmony_ci            }
269cc290419Sopenharmony_ci        }
270cc290419Sopenharmony_ci    }
271cc290419Sopenharmony_ci}
272cc290419Sopenharmony_ci
273cc290419Sopenharmony_cifn spawn_handler(
274cc290419Sopenharmony_ci    _command_data: HdcCommand,
275cc290419Sopenharmony_ci    index: usize,
276cc290419Sopenharmony_ci    local_path: String,
277cc290419Sopenharmony_ci    _channel_id_: u32,
278cc290419Sopenharmony_ci    transfer_config: &TransferConfig,
279cc290419Sopenharmony_ci) -> JoinHandle<(bool, TaskMessage)> {
280cc290419Sopenharmony_ci    let thread_path_ref = Arc::new(Mutex::new(local_path));
281cc290419Sopenharmony_ci    let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64);
282cc290419Sopenharmony_ci    let compress_type = transfer_config.compress_type;
283cc290419Sopenharmony_ci    let file_size = transfer_config.file_size as u64;
284cc290419Sopenharmony_ci    ylong_runtime::spawn(async move {
285cc290419Sopenharmony_ci        let path = thread_path_ref.lock().await;
286cc290419Sopenharmony_ci        let Ok(mut file) = File::open(&*path) else {
287cc290419Sopenharmony_ci            crate::debug!("open file failed, path:{}", *path);
288cc290419Sopenharmony_ci            let _data_message = TaskMessage {
289cc290419Sopenharmony_ci                channel_id: _channel_id_,
290cc290419Sopenharmony_ci                command: _command_data,
291cc290419Sopenharmony_ci                payload: Vec::new(),
292cc290419Sopenharmony_ci            };
293cc290419Sopenharmony_ci            return (false, _data_message);
294cc290419Sopenharmony_ci        };
295cc290419Sopenharmony_ci        let _ = file.seek(std::io::SeekFrom::Start(pos));
296cc290419Sopenharmony_ci        let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
297cc290419Sopenharmony_ci        let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
298cc290419Sopenharmony_ci        let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
299cc290419Sopenharmony_ci        let mut read_len = 0usize;
300cc290419Sopenharmony_ci        let mut package_read_len = (file_size - pos) as usize;
301cc290419Sopenharmony_ci        if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE {
302cc290419Sopenharmony_ci            package_read_len = FILE_PACKAGE_PAYLOAD_SIZE;
303cc290419Sopenharmony_ci        }
304cc290419Sopenharmony_ci        while read_len < package_read_len {
305cc290419Sopenharmony_ci            let Ok(single_len) = file.read(&mut buf[read_len..]) else {
306cc290419Sopenharmony_ci                crate::debug!("file read failed, path:{}", *path);
307cc290419Sopenharmony_ci                break;
308cc290419Sopenharmony_ci            };
309cc290419Sopenharmony_ci            read_len += single_len;
310cc290419Sopenharmony_ci            if single_len == 0 && read_len < package_read_len {
311cc290419Sopenharmony_ci                break;
312cc290419Sopenharmony_ci            }
313cc290419Sopenharmony_ci        }
314cc290419Sopenharmony_ci        let transfer_compress_type = match CompressType::try_from(compress_type) {
315cc290419Sopenharmony_ci            Ok(compress_type) => compress_type,
316cc290419Sopenharmony_ci            Err(_) => CompressType::None,
317cc290419Sopenharmony_ci        };
318cc290419Sopenharmony_ci
319cc290419Sopenharmony_ci        let mut header: TransferPayload = TransferPayload {
320cc290419Sopenharmony_ci            index: pos,
321cc290419Sopenharmony_ci            compress_type,
322cc290419Sopenharmony_ci            compress_size: 0,
323cc290419Sopenharmony_ci            uncompress_size: 0,
324cc290419Sopenharmony_ci        };
325cc290419Sopenharmony_ci        header.uncompress_size = read_len as u32;
326cc290419Sopenharmony_ci        let capacity = read_len as i32;
327cc290419Sopenharmony_ci
328cc290419Sopenharmony_ci        match transfer_compress_type {
329cc290419Sopenharmony_ci            CompressType::Lz4 => {
330cc290419Sopenharmony_ci                let compress_size: i32;
331cc290419Sopenharmony_ci                header.compress_type = CompressType::Lz4 as u8;
332cc290419Sopenharmony_ci                unsafe {
333cc290419Sopenharmony_ci                    compress_size = LZ4CompressTransfer(
334cc290419Sopenharmony_ci                        buf.as_ptr() as *const libc::c_char,
335cc290419Sopenharmony_ci                        data_buf.as_ptr() as *mut libc::c_char,
336cc290419Sopenharmony_ci                        capacity,
337cc290419Sopenharmony_ci                        capacity,
338cc290419Sopenharmony_ci                    );
339cc290419Sopenharmony_ci                }
340cc290419Sopenharmony_ci                if compress_size > 0 {
341cc290419Sopenharmony_ci                    header.compress_size = compress_size as u32;
342cc290419Sopenharmony_ci                } else {
343cc290419Sopenharmony_ci                    header.compress_type = CompressType::None as u8;
344cc290419Sopenharmony_ci                    header.compress_size = read_len as u32;
345cc290419Sopenharmony_ci                    data_buf = buf;
346cc290419Sopenharmony_ci                }
347cc290419Sopenharmony_ci            }
348cc290419Sopenharmony_ci            _ => {
349cc290419Sopenharmony_ci                header.compress_type = CompressType::None as u8;
350cc290419Sopenharmony_ci                header.compress_size = read_len as u32;
351cc290419Sopenharmony_ci                data_buf = buf;
352cc290419Sopenharmony_ci            }
353cc290419Sopenharmony_ci        }
354cc290419Sopenharmony_ci
355cc290419Sopenharmony_ci        let head_buffer = header.serialize();
356cc290419Sopenharmony_ci        total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
357cc290419Sopenharmony_ci        let data_len = header.compress_size as usize;
358cc290419Sopenharmony_ci        total.append(&mut data_buf[..data_len].to_vec());
359cc290419Sopenharmony_ci        let _data_message = TaskMessage {
360cc290419Sopenharmony_ci            channel_id: _channel_id_,
361cc290419Sopenharmony_ci            command: _command_data,
362cc290419Sopenharmony_ci            payload: total,
363cc290419Sopenharmony_ci        };
364cc290419Sopenharmony_ci        (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
365cc290419Sopenharmony_ci    })
366cc290419Sopenharmony_ci}
367cc290419Sopenharmony_ci
368cc290419Sopenharmony_cifn is_dir_link(path: String) -> bool {
369cc290419Sopenharmony_ci    let ret = std::fs::read_link(path);
370cc290419Sopenharmony_ci    match ret {
371cc290419Sopenharmony_ci        Ok(p) => {
372cc290419Sopenharmony_ci            crate::debug!("link to file:{}", p.display().to_string());
373cc290419Sopenharmony_ci            p.exists() && p.is_dir()
374cc290419Sopenharmony_ci        }
375cc290419Sopenharmony_ci        Err(e) => {
376cc290419Sopenharmony_ci            crate::error!("read_link fail:{:#?}", e);
377cc290419Sopenharmony_ci            false
378cc290419Sopenharmony_ci        }
379cc290419Sopenharmony_ci    }
380cc290419Sopenharmony_ci}
381cc290419Sopenharmony_ci
382cc290419Sopenharmony_cifn is_file_access(path: String) -> bool {
383cc290419Sopenharmony_ci    let file = metadata(path.clone());
384cc290419Sopenharmony_ci    match file {
385cc290419Sopenharmony_ci        Ok(f) => {
386cc290419Sopenharmony_ci            if !f.is_symlink() {
387cc290419Sopenharmony_ci                crate::debug!("file is not a link, path:{}", path);
388cc290419Sopenharmony_ci                return true;
389cc290419Sopenharmony_ci            }
390cc290419Sopenharmony_ci        }
391cc290419Sopenharmony_ci        Err(_e) => {
392cc290419Sopenharmony_ci            crate::error!("metadata file is error, path:{}", path);
393cc290419Sopenharmony_ci            return false;
394cc290419Sopenharmony_ci        }
395cc290419Sopenharmony_ci    }
396cc290419Sopenharmony_ci    let ret = std::fs::read_link(path);
397cc290419Sopenharmony_ci    match ret {
398cc290419Sopenharmony_ci        Ok(p) => {
399cc290419Sopenharmony_ci            crate::debug!("link to file:{}", p.display().to_string());
400cc290419Sopenharmony_ci            p.exists()
401cc290419Sopenharmony_ci        }
402cc290419Sopenharmony_ci        Err(e) => {
403cc290419Sopenharmony_ci            crate::error!("read_link fail:{:#?}", e);
404cc290419Sopenharmony_ci            false
405cc290419Sopenharmony_ci        }
406cc290419Sopenharmony_ci    }
407cc290419Sopenharmony_ci}
408cc290419Sopenharmony_ci
409cc290419Sopenharmony_cipub async fn read_and_send_data(
410cc290419Sopenharmony_ci    local_path: &str,
411cc290419Sopenharmony_ci    session_id: u32,
412cc290419Sopenharmony_ci    _channel_id_: u32,
413cc290419Sopenharmony_ci    _file_size: u64,
414cc290419Sopenharmony_ci    _command_data: HdcCommand,
415cc290419Sopenharmony_ci    transfer_config: &TransferConfig,
416cc290419Sopenharmony_ci) -> bool {
417cc290419Sopenharmony_ci    const MAX_WORKER_COUNT: usize = 5;
418cc290419Sopenharmony_ci    let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
419cc290419Sopenharmony_ci    if pieces_count == 0 {
420cc290419Sopenharmony_ci        pieces_count = 1;
421cc290419Sopenharmony_ci    }
422cc290419Sopenharmony_ci    let workers_count = if pieces_count > MAX_WORKER_COUNT {
423cc290419Sopenharmony_ci        MAX_WORKER_COUNT
424cc290419Sopenharmony_ci    } else {
425cc290419Sopenharmony_ci        pieces_count
426cc290419Sopenharmony_ci    };
427cc290419Sopenharmony_ci    let mut index = 0;
428cc290419Sopenharmony_ci    let mut queue = VecDeque::new();
429cc290419Sopenharmony_ci    while index < workers_count {
430cc290419Sopenharmony_ci        let worker = spawn_handler(
431cc290419Sopenharmony_ci            _command_data,
432cc290419Sopenharmony_ci            index,
433cc290419Sopenharmony_ci            local_path.to_owned(),
434cc290419Sopenharmony_ci            _channel_id_,
435cc290419Sopenharmony_ci            transfer_config,
436cc290419Sopenharmony_ci        );
437cc290419Sopenharmony_ci        queue.push_back(worker);
438cc290419Sopenharmony_ci        index += 1;
439cc290419Sopenharmony_ci    }
440cc290419Sopenharmony_ci    loop {
441cc290419Sopenharmony_ci        if queue.is_empty() {
442cc290419Sopenharmony_ci            crate::debug!("read_and_send_data queue is empty");
443cc290419Sopenharmony_ci            break;
444cc290419Sopenharmony_ci        }
445cc290419Sopenharmony_ci        let Some(handler) = queue.pop_front() else {
446cc290419Sopenharmony_ci            continue;
447cc290419Sopenharmony_ci        };
448cc290419Sopenharmony_ci        let Ok((is_finish, task_message)) = handler.await else {
449cc290419Sopenharmony_ci            continue;
450cc290419Sopenharmony_ci        };
451cc290419Sopenharmony_ci        transfer::put(session_id, task_message).await;
452cc290419Sopenharmony_ci        if is_finish {
453cc290419Sopenharmony_ci            crate::debug!("read_and_send_data is finish return false");
454cc290419Sopenharmony_ci            return false;
455cc290419Sopenharmony_ci        }
456cc290419Sopenharmony_ci
457cc290419Sopenharmony_ci        if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size {
458cc290419Sopenharmony_ci            let worker = spawn_handler(
459cc290419Sopenharmony_ci                _command_data,
460cc290419Sopenharmony_ci                index,
461cc290419Sopenharmony_ci                local_path.to_owned(),
462cc290419Sopenharmony_ci                _channel_id_,
463cc290419Sopenharmony_ci                transfer_config,
464cc290419Sopenharmony_ci            );
465cc290419Sopenharmony_ci            queue.push_back(worker);
466cc290419Sopenharmony_ci            index += 1;
467cc290419Sopenharmony_ci        }
468cc290419Sopenharmony_ci    }
469cc290419Sopenharmony_ci    true
470cc290419Sopenharmony_ci}
471cc290419Sopenharmony_ci
472cc290419Sopenharmony_cipub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
473cc290419Sopenharmony_ci    let mut header = TransferPayload {
474cc290419Sopenharmony_ci        ..Default::default()
475cc290419Sopenharmony_ci    };
476cc290419Sopenharmony_ci    let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
477cc290419Sopenharmony_ci    let file_index = header.index;
478cc290419Sopenharmony_ci    let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
479cc290419Sopenharmony_ci    let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
480cc290419Sopenharmony_ci        Ok(compress_type) => compress_type,
481cc290419Sopenharmony_ci        Err(_) => CompressType::None,
482cc290419Sopenharmony_ci    };
483cc290419Sopenharmony_ci
484cc290419Sopenharmony_ci    if let CompressType::Lz4 = compress_type {
485cc290419Sopenharmony_ci        let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
486cc290419Sopenharmony_ci        let decompress_size = unsafe {
487cc290419Sopenharmony_ci            LZ4DeompressTransfer(
488cc290419Sopenharmony_ci                _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
489cc290419Sopenharmony_ci                buf.as_ptr() as *mut libc::c_char,
490cc290419Sopenharmony_ci                header.compress_size as i32,
491cc290419Sopenharmony_ci                header.uncompress_size as i32,
492cc290419Sopenharmony_ci            )
493cc290419Sopenharmony_ci        };
494cc290419Sopenharmony_ci        if decompress_size > 0 {
495cc290419Sopenharmony_ci            buffer = buf[..(decompress_size as usize)].to_vec();
496cc290419Sopenharmony_ci        }
497cc290419Sopenharmony_ci    }
498cc290419Sopenharmony_ci
499cc290419Sopenharmony_ci    let path = tbase.local_path.clone();
500cc290419Sopenharmony_ci    let write_buf = buffer.clone();
501cc290419Sopenharmony_ci    let session_id = tbase.session_id.to_owned();
502cc290419Sopenharmony_ci    let channel_id = tbase.channel_id.to_owned();
503cc290419Sopenharmony_ci    utils::spawn(async move {
504cc290419Sopenharmony_ci        let open_result = OpenOptions::new()
505cc290419Sopenharmony_ci            .write(true)
506cc290419Sopenharmony_ci            .create(true)
507cc290419Sopenharmony_ci            .open(path.clone());
508cc290419Sopenharmony_ci        match open_result {
509cc290419Sopenharmony_ci            Ok(mut file) => {
510cc290419Sopenharmony_ci                let _ = file.seek(std::io::SeekFrom::Start(file_index));
511cc290419Sopenharmony_ci                let write_result = file.write_all(write_buf.as_slice());
512cc290419Sopenharmony_ci                match write_result {
513cc290419Sopenharmony_ci                    Ok(()) => {}
514cc290419Sopenharmony_ci                    Err(e) => {
515cc290419Sopenharmony_ci                        let _ = put_last_error(e, session_id, channel_id).await;
516cc290419Sopenharmony_ci                    }
517cc290419Sopenharmony_ci                }
518cc290419Sopenharmony_ci            }
519cc290419Sopenharmony_ci            Err(e) => {
520cc290419Sopenharmony_ci                let _ = put_last_error(e, session_id, channel_id).await;
521cc290419Sopenharmony_ci            }
522cc290419Sopenharmony_ci        }
523cc290419Sopenharmony_ci    });
524cc290419Sopenharmony_ci
525cc290419Sopenharmony_ci    tbase.index += buffer.len() as u64;
526cc290419Sopenharmony_ci    crate::debug!(
527cc290419Sopenharmony_ci        "transfer file [{}] index {} / {}",
528cc290419Sopenharmony_ci        tbase.local_path.clone(),
529cc290419Sopenharmony_ci        tbase.index,
530cc290419Sopenharmony_ci        tbase.file_size
531cc290419Sopenharmony_ci    );
532cc290419Sopenharmony_ci    if tbase.index >= tbase.file_size {
533cc290419Sopenharmony_ci        return true;
534cc290419Sopenharmony_ci    }
535cc290419Sopenharmony_ci    false
536cc290419Sopenharmony_ci}
537cc290419Sopenharmony_ci
538cc290419Sopenharmony_ciasync fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool {
539cc290419Sopenharmony_ci    crate::warn!(
540cc290419Sopenharmony_ci        "put_last_error sesssion_id:{}, channel_id:{}, error:{}",
541cc290419Sopenharmony_ci        session_id,
542cc290419Sopenharmony_ci        channel_id,
543cc290419Sopenharmony_ci        error,
544cc290419Sopenharmony_ci    );
545cc290419Sopenharmony_ci    let errno = match error.raw_os_error() {
546cc290419Sopenharmony_ci        Some(errno) => errno as u32,
547cc290419Sopenharmony_ci        None => std::i32::MAX as u32
548cc290419Sopenharmony_ci    };
549cc290419Sopenharmony_ci    match FileTaskMap::get(session_id, channel_id).await {
550cc290419Sopenharmony_ci        Some(task) => {
551cc290419Sopenharmony_ci            let mut task = task.lock().await;
552cc290419Sopenharmony_ci            task.transfer.last_error = errno;
553cc290419Sopenharmony_ci        }
554cc290419Sopenharmony_ci        None => {
555cc290419Sopenharmony_ci            crate::error!(
556cc290419Sopenharmony_ci                "recv_and_write_file get task is none session_id:{},channel_id:{}",
557cc290419Sopenharmony_ci                session_id,
558cc290419Sopenharmony_ci                channel_id,
559cc290419Sopenharmony_ci            );
560cc290419Sopenharmony_ci            return false;
561cc290419Sopenharmony_ci        }
562cc290419Sopenharmony_ci    }
563cc290419Sopenharmony_ci    true
564cc290419Sopenharmony_ci}
565cc290419Sopenharmony_ci
566cc290419Sopenharmony_cipub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
567cc290419Sopenharmony_ci    let mut result = Vec::new();
568cc290419Sopenharmony_ci    let dir_path = PathBuf::from(_path);
569cc290419Sopenharmony_ci    if !is_file_access(_path.clone()) {
570cc290419Sopenharmony_ci        crate::error!("file is invalid link, path:{}", _path);
571cc290419Sopenharmony_ci        return result;
572cc290419Sopenharmony_ci    }
573cc290419Sopenharmony_ci    let Ok(dir_list) = fs::read_dir(dir_path) else {
574cc290419Sopenharmony_ci        crate::error!("read dir fail, path:{}", _path);
575cc290419Sopenharmony_ci        return result;
576cc290419Sopenharmony_ci    };
577cc290419Sopenharmony_ci    for entry in dir_list {
578cc290419Sopenharmony_ci        let Ok(path) = entry else {
579cc290419Sopenharmony_ci            continue;
580cc290419Sopenharmony_ci        };
581cc290419Sopenharmony_ci        let path = path.path();
582cc290419Sopenharmony_ci        if is_dir_link(path.clone().display().to_string()) {
583cc290419Sopenharmony_ci            continue;
584cc290419Sopenharmony_ci        } else if path.is_file() {
585cc290419Sopenharmony_ci            result.push(Base::normalized_path(path).display().to_string());
586cc290419Sopenharmony_ci        } else {
587cc290419Sopenharmony_ci            let p = path.display().to_string();
588cc290419Sopenharmony_ci            let mut sub_list = get_sub_files_resurively(&p);
589cc290419Sopenharmony_ci            result.append(&mut sub_list);
590cc290419Sopenharmony_ci        }
591cc290419Sopenharmony_ci    }
592cc290419Sopenharmony_ci    result.sort();
593cc290419Sopenharmony_ci    result
594cc290419Sopenharmony_ci}
595cc290419Sopenharmony_ci
596cc290419Sopenharmony_cipub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
597cc290419Sopenharmony_ci    let local_path_ = transfer.local_path.clone();
598cc290419Sopenharmony_ci
599cc290419Sopenharmony_ci    read_and_send_data(
600cc290419Sopenharmony_ci        &local_path_,
601cc290419Sopenharmony_ci        transfer.session_id,
602cc290419Sopenharmony_ci        transfer.channel_id,
603cc290419Sopenharmony_ci        transfer.file_size,
604cc290419Sopenharmony_ci        _command_data,
605cc290419Sopenharmony_ci        &transfer.transfer_config,
606cc290419Sopenharmony_ci    )
607cc290419Sopenharmony_ci    .await;
608cc290419Sopenharmony_ci}
609cc290419Sopenharmony_ci
610cc290419Sopenharmony_cipub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
611cc290419Sopenharmony_ci    recv_and_write_file(tbase, _payload)
612cc290419Sopenharmony_ci}
613cc290419Sopenharmony_ci
614cc290419Sopenharmony_cipub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
615cc290419Sopenharmony_ci    let task_message = TaskMessage {
616cc290419Sopenharmony_ci        channel_id,
617cc290419Sopenharmony_ci        command: HdcCommand::KernelChannelClose,
618cc290419Sopenharmony_ci        payload: [1].to_vec(),
619cc290419Sopenharmony_ci    };
620cc290419Sopenharmony_ci    transfer::put(_session_id, task_message).await;
621cc290419Sopenharmony_ci}
622cc290419Sopenharmony_ci
623cc290419Sopenharmony_cipub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
624cc290419Sopenharmony_ci    let task_message = TaskMessage {
625cc290419Sopenharmony_ci        channel_id,
626cc290419Sopenharmony_ci        command: comamnd_finish,
627cc290419Sopenharmony_ci        payload: [1].to_vec(),
628cc290419Sopenharmony_ci    };
629cc290419Sopenharmony_ci    transfer::put(_session_id, task_message).await;
630cc290419Sopenharmony_ci}
631cc290419Sopenharmony_ci
632cc290419Sopenharmony_cipub async fn close_channel(channel_id: u32) {
633cc290419Sopenharmony_ci    transfer::TcpMap::end(channel_id).await;
634cc290419Sopenharmony_ci}
635cc290419Sopenharmony_ci
636cc290419Sopenharmony_cipub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) {
637cc290419Sopenharmony_ci    #[cfg(feature = "host")]
638cc290419Sopenharmony_ci    {
639cc290419Sopenharmony_ci        let echo_level = match level {
640cc290419Sopenharmony_ci            MessageLevel::Ok => transfer::EchoLevel::OK,
641cc290419Sopenharmony_ci            MessageLevel::Fail => transfer::EchoLevel::FAIL,
642cc290419Sopenharmony_ci            MessageLevel::Info => transfer::EchoLevel::INFO,
643cc290419Sopenharmony_ci        };
644cc290419Sopenharmony_ci        let _ =
645cc290419Sopenharmony_ci            transfer::send_channel_msg(channel_id, echo_level, message.to_string())
646cc290419Sopenharmony_ci                .await;
647cc290419Sopenharmony_ci    }
648cc290419Sopenharmony_ci    #[cfg(not(feature = "host"))]
649cc290419Sopenharmony_ci    {
650cc290419Sopenharmony_ci        let mut data = Vec::<u8>::new();
651cc290419Sopenharmony_ci        data.push(level as u8);
652cc290419Sopenharmony_ci        data.append(&mut message.as_bytes().to_vec());
653cc290419Sopenharmony_ci        let echo_message = TaskMessage {
654cc290419Sopenharmony_ci            channel_id,
655cc290419Sopenharmony_ci            command: HdcCommand::KernelEcho,
656cc290419Sopenharmony_ci            payload: data,
657cc290419Sopenharmony_ci        };
658cc290419Sopenharmony_ci        transfer::put(_session_id, echo_message).await;
659cc290419Sopenharmony_ci    }
660cc290419Sopenharmony_ci}