1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15//! hdcfile
16#![allow(missing_docs)]
17
18use crate::transfer;
19use std::fs::metadata;
20
21use std::collections::HashMap;
22use std::io;
23use std::path::Path;
24use std::sync::Arc;
25use std::io::{Error, ErrorKind};
26#[cfg(feature = "host")]
27extern crate ylong_runtime_static as ylong_runtime;
28use ylong_runtime::sync::Mutex;
29
30use crate::common::filemanager::FileManager;
31use crate::common::hdctransfer::*;
32use crate::config::CompressType;
33use crate::config::HdcCommand;
34use crate::config::MessageLevel;
35use crate::config::TaskMessage;
36use crate::config::MAX_SIZE_IOBUF;
37use crate::serializer::serialize::Serialization;
38
39use super::base::Base;
40use super::hdctransfer;
41use crate::serializer::native_struct::TransferConfig;
42use crate::utils;
43#[cfg(not(feature = "host"))]
44use crate::utils::hdc_log::*;
45#[derive(Debug, Default, Clone, PartialEq, Eq)]
46pub struct HdcFile {
47    pub file_cnt: u32,
48    pub dir_size: u64,
49    pub file_size: u64,
50    pub file_begin_time: u64,
51    pub dir_begin_time: u64,
52    pub transfer: HdcTransferBase,
53}
54
55impl HdcFile {
56    pub fn new(_session_id: u32, _channel_id: u32) -> Self {
57        Self {
58            transfer: HdcTransferBase::new(_session_id, _channel_id),
59            ..Default::default()
60        }
61    }
62}
63type HdcFile_ = Arc<Mutex<HdcFile>>;
64type FileTaskMap_ = Arc<Mutex<HashMap<(u32, u32), HdcFile_>>>;
65pub struct FileTaskMap {}
66impl FileTaskMap {
67    fn get_instance() -> FileTaskMap_ {
68        static mut MAP: Option<FileTaskMap_> = None;
69        unsafe {
70            MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
71                .clone()
72        }
73    }
74
75    pub async fn put(session_id: u32, channel_id: u32, value: HdcFile) {
76        let map = Self::get_instance();
77        let mut map = map.lock().await;
78        map.insert((session_id, channel_id), Arc::new(Mutex::new(value)));
79    }
80
81    pub async fn exsit(session_id: u32, channel_id: u32) -> bool {
82        let arc = Self::get_instance();
83        let map = arc.lock().await;
84        let task = map.get(&(session_id, channel_id));
85        task.is_some()
86    }
87
88    pub async fn remove(session_id: u32, channel_id: u32) -> Option<HdcFile_> {
89        let arc = Self::get_instance();
90        let mut map = arc.lock().await;
91        map.remove(&(session_id, channel_id))
92    }
93
94    pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcFile_> {
95        let arc = Self::get_instance();
96        let map = arc.lock().await;
97        let task = map.get(&(session_id, channel_id));
98        task.cloned()
99    }
100
101    async fn stop_task(session_id: u32) {
102        let arc = Self::get_instance();
103        let map = arc.lock().await;
104        crate::info!("hdcfile stop task, session_id:{}, task_size: {}", session_id, map.len());
105        for _iter in map.iter() {
106            if _iter.0 .0 != session_id {
107                continue;
108            }
109            let mut task = _iter.1.lock().await;
110            task.transfer.stop_run = true;
111            crate::info!(
112                "session_id:{}, channel_id:{}, set stop_run as true.",
113                session_id,
114                _iter.0 .1
115            );
116        }
117    }
118
119    async fn dump_task() -> String {
120        let arc = Self::get_instance();
121        let map = arc.lock().await;
122        let mut result = String::new();
123        for _iter in map.iter() {
124            let task = _iter.1.lock().await;
125            let command = task.transfer.command_str.clone();
126            let line = format!(
127                "session_id:{},\tchannel_id:{},\tcommand:{}\n",
128                _iter.0 .0, _iter.0 .1, command
129            );
130            result.push_str(line.as_str());
131        }
132        result
133    }
134}
135
136async fn check_local_path(session_id: u32, channel_id: u32) -> bool {
137    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
138        crate::error!(
139            "check_local_path get task is none session_id={session_id},channel_id={channel_id}"
140        );
141        return false;
142    };
143    let mut file_task = task.lock().await;
144    let local_path = file_task.transfer.local_path.clone();
145    let mut file_manager = FileManager::new(local_path);
146    let (open_result, err_msg) = file_manager.open();
147    if open_result {
148        file_task.transfer.transfer_config.file_size = file_manager.file_size();
149        file_task.transfer.file_size = file_task.transfer.transfer_config.file_size;
150        file_task.file_size = file_task.transfer.transfer_config.file_size;
151        file_task.transfer.transfer_config.optional_name = file_task.transfer.local_name.clone();
152        if transfer::base::CheckCompressVersion::get().await
153            && (file_task.transfer.transfer_config.file_size > (MAX_SIZE_IOBUF as u64))
154        {
155            file_task.transfer.transfer_config.compress_type = CompressType::Lz4 as u8;
156        }
157        file_task.transfer.transfer_config.path = file_task.transfer.remote_path.clone();
158        let command_str = format!(
159            "[file send], local_path:{}, optional_name:{}",
160            file_task.transfer.local_path.clone(),
161            file_task.transfer.transfer_config.optional_name
162        );
163        file_task
164            .transfer
165            .command_str
166            .push_str(command_str.as_str());
167        return true;
168    } else {
169        hdctransfer::echo_client(
170            session_id,
171            channel_id,
172            err_msg.as_str(),
173            MessageLevel::Fail,
174        )
175        .await;
176    }
177    false
178}
179
180async fn echo_finish(session_id: u32, channel_id: u32, msg: String) {
181    hdctransfer::echo_client(
182        session_id,
183        channel_id,
184        msg.as_str(),
185        MessageLevel::Ok,
186    )
187    .await;
188    task_finish(session_id, channel_id).await;
189}
190
191pub async fn begin_transfer(session_id: u32, channel_id: u32, command: &String) -> bool {
192    let (argv, argc) = Base::split_command_to_args(command);
193    if argc < 2 {
194        echo_finish(
195            session_id,
196            channel_id,
197            "Transfer path split failed.".to_string(),
198        )
199        .await;
200        return false;
201    }
202    match set_master_parameters(session_id, channel_id, command, argc, argv).await {
203        Ok(_) => (),
204        Err(e) => {
205            echo_fail(session_id, channel_id, e, false).await;
206            return false;
207        }
208    }
209
210    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
211        crate::error!(
212            "begin_transfer get task is none session_id={session_id},channel_id={channel_id}"
213        );
214        return false;
215    };
216    let mut task = task.lock().await;
217    task.transfer.is_master = true;
218    drop(task);
219
220    let ret = check_local_path(session_id, channel_id).await;
221    if !ret {
222        do_file_finish(session_id, channel_id, &[1]).await;
223        return true;
224    }
225
226    put_file_check(session_id, channel_id).await;
227    true
228}
229
230async fn set_master_parameters(
231    session_id: u32,
232    channel_id: u32,
233    _command: &str,
234    argc: u32,
235    argv: Vec<String>,
236) -> Result<bool, Error> {
237    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
238        crate::error!(
239            "set_master_parameters get task is none session_id={session_id},channel_id={channel_id}"
240        );
241        return Err(Error::new(ErrorKind::Other, "Other failed"));
242    };
243    let mut task = task.lock().await;
244    let mut i: usize = 0;
245    let mut src_argv_index = 0u32;
246    if task.transfer.server_or_daemon {
247        src_argv_index += 2; // 2: represent the host parameters: "file" "send".
248    } // else: src_argv_index += 0: the host parameters "file" "recv" will be filtered.
249    while i < argc as usize {
250        match &argv[i] as &str {
251            "-z" => {
252                task.transfer.transfer_config.compress_type = CompressType::Lz4 as u8;
253                src_argv_index += 1;
254            }
255            "-a" => {
256                task.transfer.transfer_config.hold_timestamp = true;
257                src_argv_index += 1;
258            }
259            "-sync" => {
260                task.transfer.transfer_config.update_if_new = true;
261                src_argv_index += 1;
262            }
263            "-m" => {
264                src_argv_index += 1;
265            }
266            "-remote" => {
267                src_argv_index += 1;
268            }
269            "-cwd" => {
270                src_argv_index += 2;
271                task.transfer.transfer_config.client_cwd = argv.get(i + 1).unwrap().clone();
272            }
273            _ => {}
274        }
275        i += 1;
276    }
277    if argc == src_argv_index {
278        crate::error!("set_master_parameters argc = {:#?} return false", argc);
279        return Err(Error::new(ErrorKind::Other, "There is no local and remote path"));
280    }
281    task.transfer.remote_path = argv.last().unwrap().clone();
282    task.transfer.local_path = argv.get(argv.len() - 2).unwrap().clone();
283    if task.transfer.server_or_daemon {
284        if src_argv_index + 1 == argc {
285            crate::error!("src_argv_index = {:#?} return false", src_argv_index);
286            return Err(Error::new(ErrorKind::Other, "There is no remote path"));
287        }
288        let cwd = task.transfer.transfer_config.client_cwd.clone();
289        task.transfer.local_path = Base::extract_relative_path(&cwd, &task.transfer.local_path);
290    } else if src_argv_index + 1 == argc {
291        task.transfer.remote_path = String::from(".");
292        task.transfer.local_path = argv.get((argc - 1) as usize).unwrap().clone();
293    }
294    task.transfer.local_name = Base::get_file_name(&mut task.transfer.local_path).unwrap();
295    match metadata(task.transfer.local_path.clone()) {
296        Ok(metadata) => {
297            if !metadata.is_dir() {
298                task.transfer.is_dir = false;
299                return Ok(true);
300            }
301            task.transfer.is_dir = true;
302            task.transfer.task_queue = get_sub_files_resurively(&task.transfer.local_path.clone());
303            task.transfer.base_local_path = get_base_path(task.transfer.local_path.clone());
304
305            if !task.transfer.task_queue.is_empty() {
306                task.transfer.local_path = task.transfer.task_queue.pop().unwrap();
307                task.transfer.local_name =
308                    match Base::get_relative_path(&task.transfer.base_local_path, &task.transfer.local_path) {
309                        Some(relative_path) => relative_path,
310                        None => task.transfer.local_path.clone()
311                    };
312            } else {
313                crate::error!("task transfer task_queue is empty");
314                return Err(Error::new(ErrorKind::Other, "Operation failed, because the source folder is empty."));
315            }
316        },
317        Err(error) => {
318            let err_msg = format!("Error opening file: {}, path: {}", error, task.transfer.local_path);
319            crate::error!("{}", err_msg);
320            return Err(Error::new(ErrorKind::Other, err_msg));
321        },
322    }
323    Ok(true)
324}
325
326fn get_base_path(path: String) -> String {
327    let p = Path::new(path.as_str());
328    let parent_path = p.parent();
329    if let Some(pp) = parent_path {
330        pp.display().to_string()
331    } else {
332        path
333    }
334}
335
336async fn put_file_check(session_id: u32, channel_id: u32) {
337    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
338        return;
339    };
340    let task = task.lock().await;
341    let file_check_message = TaskMessage {
342        channel_id,
343        command: HdcCommand::FileCheck,
344        payload: task.transfer.transfer_config.serialize(),
345    };
346    transfer::put(task.transfer.session_id, file_check_message).await;
347}
348
349pub async fn check_slaver(session_id: u32, channel_id: u32, _payload: &[u8]) -> Result<bool, Error> {
350    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
351        crate::error!(
352            "check_slaver get task is none session_id={session_id:?},channel_id={channel_id:?}"
353        );
354        return Err(Error::new(ErrorKind::Other, "Other failed"));
355    };
356    let mut task = task.lock().await;
357    let mut transconfig = TransferConfig {
358        ..Default::default()
359    };
360    let _ = transconfig.parse(_payload.to_owned());
361    task.transfer.file_size = transconfig.file_size;
362    task.file_size = transconfig.file_size;
363    task.transfer.local_path = transconfig.path;
364    task.transfer.is_master = false;
365    task.transfer.index = 0;
366    let command_str = format!(
367        "[file recv],\t local_path:{},\t optional_name:{}\t",
368        task.transfer.local_path.clone(),
369        transconfig.optional_name
370    );
371    task.transfer.command_str.push_str(command_str.as_str());
372    let local_path = task.transfer.local_path.clone();
373    let optional_name = transconfig.optional_name.clone();
374    task.transfer.transfer_config.compress_type = transconfig.compress_type;
375    match hdctransfer::check_local_path(&mut task.transfer, &local_path, &optional_name) {
376        Ok(_) => (),
377        Err(e) => {
378            crate::error!("check_local_path return false channel_id={:#?}", channel_id);
379            return Err(e);
380        },
381    }
382    if task.transfer.transfer_config.update_if_new {
383        crate::error!("task.transfer.transfer_config.update_if_new is true");
384        return Err(Error::new(ErrorKind::Other, "Other failed"));
385    }
386    if task.dir_begin_time == 0 {
387        task.dir_begin_time = utils::get_current_time();
388    }
389    task.file_begin_time = utils::get_current_time();
390    Ok(true)
391}
392
393pub async fn wake_up_slaver(session_id: u32, channel_id: u32) {
394    let wake_up_message = TaskMessage {
395        channel_id,
396        command: HdcCommand::KernelWakeupSlavetask,
397        payload: Vec::<u8>::new(),
398    };
399    transfer::put(session_id, wake_up_message).await;
400}
401
402async fn put_file_begin(session_id: u32, channel_id: u32) {
403    let file_begin_message = TaskMessage {
404        channel_id,
405        command: HdcCommand::FileBegin,
406        payload: Vec::<u8>::new(),
407    };
408    transfer::put(session_id, file_begin_message).await;
409}
410
411async fn transfer_next(session_id: u32, channel_id: u32) -> bool {
412    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
413        crate::error!(
414            "transfer_next get task is none session_id={session_id:?},channel_id={channel_id:?}"
415        );
416        return false;
417    };
418    let mut task = task.lock().await;
419    let Some(local_path) = task.transfer.task_queue.pop() else {
420        crate::error!(
421            "transfer_next get local path is none session_id={session_id:?},channel_id={channel_id:?}"
422        );
423        return false;
424    };
425    task.transfer.local_path = local_path;
426    task.transfer.local_name =
427        match Base::get_relative_path(&task.transfer.base_local_path, &task.transfer.local_path) {
428            Some(relative_path) => relative_path,
429            None => task.transfer.local_path.clone()
430        };
431    drop(task);
432    check_local_path(session_id, channel_id).await
433}
434
435async fn on_all_transfer_finish(session_id: u32, channel_id: u32) {
436    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
437        crate::error!(
438            "on_all_transfer_finish get task is none session_id={session_id:?},channel_id={channel_id:?}"
439        );
440        return;
441    };
442    let task = task.lock().await;
443    let last_error = task.transfer.last_error;
444    let size = if task.file_cnt > 1 {
445        task.dir_size
446    } else {
447        task.file_size
448    };
449    let time = if task.file_cnt > 1 {
450        utils::get_current_time() - task.dir_begin_time
451    } else {
452        utils::get_current_time() - task.file_begin_time
453    };
454    let rate = size as f64 / time as f64;
455    #[allow(unused_variables)]
456    let message = if last_error == 0 {
457        format!(
458            "FileTransfer finish, Size:{}, File count = {}, time:{}ms rate:{:.2}kB/s",
459            size, task.file_cnt, time, rate
460        )
461    } else {
462        format!(
463            "Transfer failed: {}: {}",
464            task.transfer.local_path,
465            io::Error::from_raw_os_error(last_error as i32),
466        )
467    };
468    #[cfg(feature = "host")]
469    {
470        let level = if last_error == 0 {
471            transfer::EchoLevel::OK
472        } else {
473            transfer::EchoLevel::FAIL
474        };
475        let _ =
476            transfer::send_channel_msg(task.transfer.channel_id, level, message)
477                .await;
478        hdctransfer::close_channel(channel_id).await;
479        return;
480    }
481    #[allow(unreachable_code)]
482    {
483        let level = if last_error == 0 {
484            MessageLevel::Ok
485        } else {
486            MessageLevel::Fail
487        };
488        hdctransfer::echo_client(
489            task.transfer.session_id,
490            task.transfer.channel_id,
491            message.as_str(),
492            level,
493        )
494        .await;
495        hdctransfer::close_channel(channel_id).await;
496    }
497}
498
499async fn is_task_queue_empty(session_id: u32, channel_id: u32) -> bool {
500    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
501        crate::error!(
502            "do_file_finish get task is none session_id={session_id:?},channel_id={channel_id:?}"
503        );
504        return false;
505    };
506    let task = task.lock().await;
507    task.transfer.task_queue.is_empty()
508}
509
510async fn do_file_finish(session_id: u32, channel_id: u32, _payload: &[u8]) {
511    if _payload[0] == 1 {
512        while !is_task_queue_empty(session_id, channel_id).await {
513            if transfer_next(session_id, channel_id).await {
514                put_file_check(session_id, channel_id).await;
515                return;
516            }
517        }
518
519        if is_task_queue_empty(session_id, channel_id).await {
520            let _finish_message = TaskMessage {
521                channel_id,
522                command: HdcCommand::FileFinish,
523                payload: [0].to_vec(),
524            };
525
526            transfer::put(session_id, _finish_message).await;
527        }
528    } else {
529        on_all_transfer_finish(session_id, channel_id).await;
530        task_finish(session_id, channel_id).await;
531    }
532}
533
534async fn put_file_finish(session_id: u32, channel_id: u32) {
535    let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
536        crate::error!(
537            "put_file_finish get task is none session_id={session_id:?},channel_id={channel_id:?}"
538        );
539        return;
540    };
541    let mut task = task.lock().await;
542    let _payload: [u8; 1] = [1];
543    task.file_cnt += 1;
544    task.dir_size += task.file_size;
545    let task_finish_message = TaskMessage {
546        channel_id,
547        command: HdcCommand::FileFinish,
548        payload: _payload.to_vec(),
549    };
550    transfer::put(session_id, task_finish_message).await;
551}
552
553pub async fn command_dispatch(
554    session_id: u32,
555    channel_id: u32,
556    _command: HdcCommand,
557    _payload: &[u8],
558    _payload_size: u16,
559) -> bool {
560    match _command {
561        HdcCommand::FileInit => {
562            let s = String::from_utf8(_payload.to_vec());
563            match s {
564                Ok(str) => {
565                    wake_up_slaver(session_id, channel_id).await;
566                    begin_transfer(session_id, channel_id, &str).await;
567                }
568                Err(e) => {
569                    let err_msg = format!("Transfer failed: arguments is invalid {:?}", e);
570                    crate::error!("HdcCommand::FileInit: {}", err_msg);
571                    echo_finish(session_id, channel_id, err_msg.to_string()).await;
572                }
573            }
574        }
575        HdcCommand::FileCheck => {
576            match check_slaver(session_id, channel_id, _payload).await {
577                Ok(_) => {
578                    put_file_begin(session_id, channel_id).await;
579                },
580                Err(e) => {
581                    echo_fail(session_id, channel_id, e, true).await;
582                }
583            }
584        }
585        HdcCommand::FileBegin => {
586            let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
587                crate::error!(
588                    "command_dispatch get task is none session_id={session_id:?},channel_id={channel_id:?}"
589                );
590                return false;
591            };
592            let task = task.lock().await;
593            hdctransfer::transfer_begin(&task.transfer, HdcCommand::FileData).await;
594        }
595        HdcCommand::FileData => {
596            let Some(task) = FileTaskMap::get(session_id, channel_id).await else {
597                crate::error!(
598                    "command_dispatch get task is none session_id={session_id:?},channel_id={channel_id:?}"
599                );
600                return false;
601            };
602            let mut task = task.lock().await;
603            if hdctransfer::transfer_data(&mut task.transfer, _payload) {
604                drop(task);
605                put_file_finish(session_id, channel_id).await;
606            }
607        }
608        HdcCommand::FileMode | HdcCommand::DirMode => {
609            put_file_mode(session_id, channel_id).await;
610        }
611        HdcCommand::FileFinish => {
612            do_file_finish(session_id, channel_id, _payload).await;
613        }
614        _ => {
615            crate::error!("others, command {:?}", _command);
616        }
617    }
618
619    true
620}
621
622async fn put_file_mode(session_id: u32, channel_id: u32) {
623    let task_message = TaskMessage {
624        channel_id,
625        command: HdcCommand::FileMode,
626        payload: Vec::<u8>::new(),
627    };
628    transfer::put(session_id, task_message).await;
629}
630
631async fn task_finish(session_id: u32, channel_id: u32) {
632    hdctransfer::transfer_task_finish(channel_id, session_id).await;
633}
634
635pub async fn stop_task(session_id: u32) {
636    FileTaskMap::stop_task(session_id).await;
637}
638
639pub async fn dump_task() -> String {
640    FileTaskMap::dump_task().await
641}
642
643pub async fn echo_fail(session_id: u32, channel_id: u32, error: Error, is_checked: bool) {
644    let message = match FileTaskMap::get(session_id, channel_id).await {
645        Some(task) => {
646            if is_checked {
647                let task = task.lock().await;
648                format!("Error opening file: {}, path: {}", error, task.transfer.local_path)
649            } else {
650                format!("{}", error)
651            }
652        }
653        None => format!(
654            "Error opening file: {}, path: {}",
655            error,
656            "cannot get file path from FileTaskMap",
657        )
658    };
659    hdctransfer::echo_client(
660        session_id,
661        channel_id,
662        message.as_str(),
663        MessageLevel::Fail,
664    )
665    .await;
666    task_finish(session_id, channel_id).await;
667}