1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io::{self, SeekFrom};
15 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
16 use std::sync::{Mutex, MutexGuard};
17 use std::time::Duration;
18 
19 use ylong_http_client::async_impl::{Body, Client, Request, RequestBuilder, Response};
20 use ylong_http_client::{ErrorKind, HttpClientError};
21 use ylong_runtime::io::{AsyncSeekExt, AsyncWriteExt};
22 
23 cfg_oh! {
24     use crate::manage::SystemConfig;
25 }
26 
27 use super::config::{Mode, Version};
28 use super::info::{CommonTaskInfo, State, TaskInfo, UpdateInfo};
29 use super::notify::{EachFileStatus, NotifyData, Progress};
30 use super::reason::Reason;
31 use crate::error::ErrorCode;
32 use crate::manage::database::RequestDb;
33 use crate::manage::network::Network;
34 use crate::manage::notifier::Notifier;
35 use crate::service::client::ClientManagerEntry;
36 use crate::task::client::build_client;
37 use crate::task::config::{Action, TaskConfig};
38 use crate::task::files::{AttachedFiles, Files};
39 use crate::utils::form_item::FileSpec;
40 use crate::utils::get_current_timestamp;
41 
/// Retry budget: `RequestTask::network_retry` only asks for another attempt while the task's
/// `tries` counter is below this value.
const RETRY_TIMES: u32 = 4;
/// Delay, in milliseconds, that `RequestTask::network_retry` sleeps before asking for a retry
/// while the network is online.
const RETRY_INTERVAL: u64 = 400;
44 
/// Runtime state of a single upload or download task.
///
/// The fields that change while the task runs are wrapped in `Mutex`es or atomics because the
/// methods of this type only take `&self`.
pub(crate) struct RequestTask {
    /// Configuration the task was created from (URL, method, headers, file specs, ...).
    pub(crate) conf: TaskConfig,
    /// HTTP client built for this task.
    pub(crate) client: Client,
    /// Files attached to the task; for downloads, index 0 is the file being written.
    pub(crate) files: Files,
    /// Files that receive the response body of each upload (see `record_upload_response`).
    pub(crate) body_files: Files,
    /// Timestamp taken at construction, or restored from a stored `TaskInfo`.
    // NOTE(review): presumably the task creation time — confirm against `TaskInfo` users.
    pub(crate) ctime: u64,
    /// Value of the response `content-type` header, filled in by `get_file_info`.
    pub(crate) mime_type: Mutex<String>,
    /// Progress counters, sizes and the `extras` map holding response headers.
    pub(crate) progress: Mutex<Progress>,
    /// Current state, reason and modification time of the task.
    pub(crate) status: Mutex<TaskStatus>,
    /// One `Reason` per attached file, initialised to `Reason::Default`.
    pub(crate) code: Mutex<Vec<Reason>>,
    /// Number of retries requested so far; incremented by `network_retry`.
    pub(crate) tries: AtomicU32,
    /// Initialised to the construction timestamp; its consumers are not visible in this file.
    pub(crate) background_notify_time: AtomicU64,
    /// Sum of the upload file sizes, or for downloads `-1` until a size is known.
    pub(crate) file_total_size: AtomicI64,
    /// Current rate limit set through `speed_limit`; starts at 0.
    // NOTE(review): unit and the meaning of 0 are defined by the consumer — confirm there.
    pub(crate) rate_limiting: AtomicU64,
    /// Initialised to the construction timestamp; its consumers are not visible in this file.
    pub(crate) last_notify: AtomicU64,
    /// Handle used to send responses and notifications to the client side.
    pub(crate) client_manager: ClientManagerEntry,
    /// Starts as `None`.
    // NOTE(review): presumably the outcome of the most recent run — confirm against the runner.
    pub(crate) running_result: Mutex<Option<Result<(), Reason>>>,
    /// Network state handle; `network_retry` queries it with `is_online`.
    pub(crate) network: Network,
    /// Starts at 0.
    // NOTE(review): presumably counts retries caused by timeouts — confirm against its users.
    pub(crate) timeout_tries: AtomicU32,
    /// Flag supplied at construction time.
    // NOTE(review): presumably enables resuming an interrupted upload — confirm in upload code.
    pub(crate) upload_resume: AtomicBool,
}
66 
67 impl RequestTask {
68     pub(crate) fn task_id(&self) -> u32 {
69         self.conf.common_data.task_id
70     }
71 
72     pub(crate) fn uid(&self) -> u64 {
73         self.conf.common_data.uid
74     }
75 
76     pub(crate) fn config(&self) -> &TaskConfig {
77         &self.conf
78     }
79 
80     // only use for download task
81     pub(crate) fn mime_type(&self) -> String {
82         self.mime_type.lock().unwrap().clone()
83     }
84 
85     pub(crate) fn action(&self) -> Action {
86         self.conf.common_data.action
87     }
88 
89     pub(crate) fn mode(&self) -> Mode {
90         self.conf.common_data.mode
91     }
92 
93     pub(crate) fn speed_limit(&self, limit: u64) {
94         let old = self.rate_limiting.swap(limit, Ordering::SeqCst);
95         if old != limit {
96             info!("task {} speed_limit {}", self.task_id(), limit);
97         }
98     }
99 
100     pub(crate) async fn network_retry(&self) -> Result<(), TaskError> {
101         if self.tries.load(Ordering::SeqCst) < RETRY_TIMES {
102             self.tries.fetch_add(1, Ordering::SeqCst);
103             if !self.network.is_online() {
104                 return Err(TaskError::Waiting(TaskPhase::NetworkOffline));
105             } else {
106                 ylong_runtime::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
107                 return Err(TaskError::Waiting(TaskPhase::NeedRetry));
108             }
109         }
110         Ok(())
111     }
112 }
113 
/// Computes how many bytes of a file of `size` bytes are covered by the inclusive byte range
/// `[begins, ends]`.
///
/// * `begins` - offset of the first byte of the range.
/// * `ends` - offset of the last byte of the range; a negative value or a value at or beyond
///   `size` means "up to the last byte of the file".
/// * `size` - total size of the file.
///
/// Returns the length of the range, or `size` itself when `begins` lies behind the (clamped)
/// end of the range, i.e. when the range is invalid.
pub(crate) fn change_upload_size(begins: u64, mut ends: i64, size: i64) -> i64 {
    if ends < 0 || ends >= size {
        ends = size - 1;
    }
    // A plain `begins as i64` wraps to a negative number for offsets above `i64::MAX`, which
    // used to slip past the range check and produce a length larger than `size` (or overflow).
    // Such an offset is necessarily behind `ends`, so it is an invalid range.
    if begins > i64::MAX as u64 {
        return size;
    }
    let begins = begins as i64;
    if begins > ends {
        return size;
    }
    ends - begins + 1
}
123 
124 impl RequestTask {
125     pub(crate) fn new(
126         config: TaskConfig,
127         files: AttachedFiles,
128         client: Client,
129         client_manager: ClientManagerEntry,
130         network: Network,
131         upload_resume: bool,
132     ) -> RequestTask {
133         let file_len = files.files.len();
134         let action = config.common_data.action;
135 
136         let file_total_size = match action {
137             Action::Upload => {
138                 let mut file_total_size = 0i64;
139                 // If the total size overflows, ignore it.
140                 for size in files.sizes.iter() {
141                     file_total_size += *size;
142                 }
143                 file_total_size
144             }
145             Action::Download => -1,
146             _ => unreachable!("Action::Any in RequestTask::new never reach"),
147         };
148 
149         let mut sizes = files.sizes.clone();
150 
151         if action == Action::Upload && config.common_data.index < sizes.len() as u32 {
152             sizes[config.common_data.index as usize] = change_upload_size(
153                 config.common_data.begins,
154                 config.common_data.ends,
155                 sizes[config.common_data.index as usize],
156             );
157         }
158 
159         let time = get_current_timestamp();
160         let status = TaskStatus::new(time);
161         let progress = Progress::new(sizes);
162 
163         RequestTask {
164             conf: config,
165             client,
166             files: files.files,
167             body_files: files.body_files,
168             ctime: time,
169             mime_type: Mutex::new(String::new()),
170             progress: Mutex::new(progress),
171             tries: AtomicU32::new(0),
172             status: Mutex::new(status),
173             code: Mutex::new(vec![Reason::Default; file_len]),
174             background_notify_time: AtomicU64::new(time),
175             file_total_size: AtomicI64::new(file_total_size),
176             rate_limiting: AtomicU64::new(0),
177             last_notify: AtomicU64::new(time),
178             client_manager,
179             running_result: Mutex::new(None),
180             network,
181             timeout_tries: AtomicU32::new(0),
182             upload_resume: AtomicBool::new(upload_resume),
183         }
184     }
185 
186     pub(crate) fn new_by_info(
187         config: TaskConfig,
188         #[cfg(feature = "oh")] system: SystemConfig,
189         info: TaskInfo,
190         client_manager: ClientManagerEntry,
191         network: Network,
192         upload_resume: bool,
193     ) -> Result<RequestTask, ErrorCode> {
194         #[cfg(feature = "oh")]
195         let (files, client) = check_config(&config, system)?;
196         #[cfg(not(feature = "oh"))]
197         let (files, client) = check_config(&config)?;
198 
199         let file_len = files.files.len();
200         let action = config.common_data.action;
201         let time = get_current_timestamp();
202 
203         let file_total_size = match action {
204             Action::Upload => {
205                 let mut file_total_size = 0i64;
206                 // If the total size overflows, ignore it.
207                 for size in files.sizes.iter() {
208                     file_total_size += *size;
209                 }
210                 file_total_size
211             }
212             Action::Download => *info.progress.sizes.first().unwrap_or(&-1),
213             _ => unreachable!("Action::Any in RequestTask::new never reach"),
214         };
215 
216         // If `TaskInfo` is provided, use data of it.
217         let ctime = info.common_data.ctime;
218         let mime_type = info.mime_type.clone();
219         let tries = info.common_data.tries;
220         let status = TaskStatus {
221             mtime: time,
222             state: State::from(info.progress.common_data.state),
223             reason: Reason::from(info.common_data.reason),
224         };
225         let progress = info.progress;
226 
227         Ok(RequestTask {
228             conf: config,
229             client,
230             files: files.files,
231             body_files: files.body_files,
232             ctime,
233             mime_type: Mutex::new(mime_type),
234             progress: Mutex::new(progress),
235             tries: AtomicU32::new(tries),
236             status: Mutex::new(status),
237             code: Mutex::new(vec![Reason::Default; file_len]),
238             background_notify_time: AtomicU64::new(time),
239             file_total_size: AtomicI64::new(file_total_size),
240             rate_limiting: AtomicU64::new(0),
241             last_notify: AtomicU64::new(time),
242             client_manager,
243             running_result: Mutex::new(None),
244             network,
245             timeout_tries: AtomicU32::new(0),
246             upload_resume: AtomicBool::new(upload_resume),
247         })
248     }
249 
250     pub(crate) fn build_notify_data(&self) -> NotifyData {
251         let vec = self.get_each_file_status();
252         NotifyData {
253             bundle: self.conf.bundle.clone(),
254             // `unwrap` for propagating panics among threads.
255             progress: self.progress.lock().unwrap().clone(),
256             action: self.conf.common_data.action,
257             version: self.conf.version,
258             each_file_status: vec,
259             task_id: self.conf.common_data.task_id,
260         }
261     }
262 
263     pub(crate) fn update_progress_in_database(&self) {
264         let mtime = self.status.lock().unwrap().mtime;
265         let reason = self.status.lock().unwrap().reason;
266         let progress = self.progress.lock().unwrap().clone();
267         let update_info = UpdateInfo {
268             mtime,
269             reason: reason.repr,
270             progress,
271             each_file_status: RequestTask::get_each_file_status_by_code(
272                 &self.code.lock().unwrap(),
273                 &self.conf.file_specs,
274             ),
275             tries: self.tries.load(Ordering::SeqCst),
276             mime_type: self.mime_type(),
277         };
278         RequestDb::get_instance().update_task(self.task_id(), update_info);
279     }
280 
281     pub(crate) fn build_request_builder(&self) -> Result<RequestBuilder, HttpClientError> {
282         use ylong_http_client::async_impl::PercentEncoder;
283 
284         let url = self.conf.url.clone();
285         let url = match PercentEncoder::encode(url.as_str()) {
286             Ok(value) => value,
287             Err(e) => {
288                 error!("url percent encoding error is {:?}", e);
289                 return Err(e);
290             }
291         };
292 
293         let method = match self.conf.method.to_uppercase().as_str() {
294             "PUT" => "PUT",
295             "POST" => "POST",
296             "GET" => "GET",
297             _ => match self.conf.common_data.action {
298                 Action::Upload => {
299                     if self.conf.version == Version::API10 {
300                         "PUT"
301                     } else {
302                         "POST"
303                     }
304                 }
305                 Action::Download => "GET",
306                 _ => "",
307             },
308         };
309         let mut request = RequestBuilder::new().method(method).url(url.as_str());
310         for (key, value) in self.conf.headers.iter() {
311             request = request.header(key.as_str(), value.as_str());
312         }
313         Ok(request)
314     }
315 
316     pub(crate) async fn clear_downloaded_file(&self) -> Result<(), std::io::Error> {
317         info!("task {} clear downloaded file", self.task_id());
318         let file = self.files.get_mut(0).unwrap();
319         file.set_len(0).await?;
320         file.seek(SeekFrom::Start(0)).await?;
321 
322         let mut progress_guard = self.progress.lock().unwrap();
323         progress_guard.common_data.total_processed = 0;
324         progress_guard.processed[0] = 0;
325 
326         Ok(())
327     }
328 
329     pub(crate) async fn build_download_request(&self) -> Result<Request, TaskError> {
330         let mut request_builder = self.build_request_builder()?;
331 
332         let file = self.files.get_mut(0).unwrap();
333 
334         let has_downloaded = file.metadata().await?.len();
335         let resume_download = has_downloaded > 0;
336         let require_range = self.require_range();
337 
338         let begins = self.conf.common_data.begins;
339         let ends = self.conf.common_data.ends;
340 
341         debug!(
342             "task {} build download request, resume_download: {}, require_range: {}",
343             self.task_id(),
344             resume_download,
345             require_range
346         );
347         match (resume_download, require_range) {
348             (true, false) => {
349                 let (builder, support_range) = self.support_range(request_builder);
350                 request_builder = builder;
351                 if support_range {
352                     request_builder =
353                         self.range_request(request_builder, begins + has_downloaded, ends);
354                 } else {
355                     self.clear_downloaded_file().await?;
356                 }
357             }
358             (false, true) => {
359                 request_builder = self.range_request(request_builder, begins, ends);
360             }
361             (true, true) => {
362                 let (builder, support_range) = self.support_range(request_builder);
363                 request_builder = builder;
364                 if support_range {
365                     request_builder =
366                         self.range_request(request_builder, begins + has_downloaded, ends);
367                 } else {
368                     return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
369                 }
370             }
371             (false, false) => {}
372         };
373 
374         let request = request_builder.body(Body::slice(self.conf.data.clone()))?;
375         Ok(request)
376     }
377 
range_requestnull378     fn range_request(
379         &self,
380         request_builder: RequestBuilder,
381         begins: u64,
382         ends: i64,
383     ) -> RequestBuilder {
384         let range = if ends < 0 {
385             format!("bytes={begins}-")
386         } else {
387             format!("bytes={begins}-{ends}")
388         };
389         request_builder.header("Range", range.as_str())
390     }
391 
support_rangenull392     fn support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool) {
393         let progress_guard = self.progress.lock().unwrap();
394         let mut support_range = false;
395         if let Some(etag) = progress_guard.extras.get("etag") {
396             request_builder = request_builder.header("If-Range", etag.as_str());
397             support_range = true;
398         } else if let Some(last_modified) = progress_guard.extras.get("last-modified") {
399             request_builder = request_builder.header("If-Range", last_modified.as_str());
400             support_range = true;
401         }
402         if !support_range {
403             info!("task {} not support range", self.task_id());
404         }
405         (request_builder, support_range)
406     }
407 
408     pub(crate) fn get_file_info(&self, response: &Response) -> Result<(), TaskError> {
409         let content_type = response.headers().get("content-type");
410         if let Some(mime_type) = content_type {
411             if let Ok(value) = mime_type.to_string() {
412                 *self.mime_type.lock().unwrap() = value;
413             }
414         }
415 
416         let content_length = response.headers().get("content-length");
417         if let Some(Ok(len)) = content_length.map(|v| v.to_string()) {
418             match len.parse::<i64>() {
419                 Ok(v) => {
420                     let mut progress = self.progress.lock().unwrap();
421                     progress.sizes = vec![v + progress.processed[0] as i64];
422                     self.file_total_size.store(v, Ordering::SeqCst);
423                     debug!("the download task content-length is {}", v);
424                 }
425                 Err(e) => {
426                     error!("convert string to i64 error: {:?}", e);
427                 }
428             }
429         } else {
430             error!("cannot get content-length of the task");
431             if self.conf.common_data.precise {
432                 return Err(TaskError::Failed(Reason::GetFileSizeFailed));
433             }
434         }
435         Ok(())
436     }
437 
438     pub(crate) async fn handle_download_error(
439         &self,
440         err: HttpClientError,
441     ) -> Result<(), TaskError> {
442         if err.error_kind() != ErrorKind::UserAborted {
443             error!("Task {} {:?}", self.task_id(), err);
444         }
445         match err.error_kind() {
446             ErrorKind::Timeout => Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
447             // user triggered
448             ErrorKind::UserAborted => Err(TaskError::Waiting(TaskPhase::UserAbort)),
449             ErrorKind::BodyTransfer | ErrorKind::BodyDecode => {
450                 self.network_retry().await?;
451                 Err(TaskError::Failed(Reason::OthersError))
452             }
453             _ => {
454                 if format!("{}", err).contains("No space left on device") {
455                     Err(TaskError::Failed(Reason::InsufficientSpace))
456                 } else {
457                     Err(TaskError::Failed(Reason::OthersError))
458                 }
459             }
460         }
461     }
462 
463     #[cfg(feature = "oh")]
464     pub(crate) fn notify_response(&self, response: &Response) {
465         let tid = self.conf.common_data.task_id;
466         let version: String = response.version().as_str().into();
467         let status_code: u32 = response.status().as_u16() as u32;
468         let status_message: String;
469         if let Some(reason) = response.status().reason() {
470             status_message = reason.into();
471         } else {
472             error!("bad status_message {:?}", status_code);
473             return;
474         }
475         let headers = response.headers().clone();
476         debug!("notify_response");
477         self.client_manager
478             .send_response(tid, version, status_code, status_message, headers)
479     }
480 
481     pub(crate) fn require_range(&self) -> bool {
482         self.conf.common_data.begins > 0 || self.conf.common_data.ends >= 0
483     }
484 
485     pub(crate) async fn record_upload_response(
486         &self,
487         index: usize,
488         response: Result<Response, HttpClientError>,
489     ) {
490         if let Ok(mut r) = response {
491             {
492                 let mut guard = self.progress.lock().unwrap();
493                 guard.extras.clear();
494                 for (k, v) in r.headers() {
495                     if let Ok(value) = v.to_string() {
496                         guard.extras.insert(k.to_string().to_lowercase(), value);
497                     }
498                 }
499             }
500 
501             let file = match self.body_files.get_mut(index) {
502                 Some(file) => file,
503                 None => return,
504             };
505             let _ = file.set_len(0).await;
506             loop {
507                 let mut buf = [0u8; 1024];
508                 let size = r.data(&mut buf).await;
509                 let size = match size {
510                     Ok(size) => size,
511                     Err(_e) => break,
512                 };
513 
514                 if size == 0 {
515                     break;
516                 }
517                 let _ = file.write_all(&buf[..size]).await;
518             }
519             // Makes sure all the data has been written to the target file.
520             let _ = file.sync_all().await;
521         }
522     }
523 
524     pub(crate) fn get_each_file_status(&self) -> Vec<EachFileStatus> {
525         let mut vec = Vec::new();
526         // `unwrap` for propagating panics among threads.
527         let codes_guard = self.code.lock().unwrap();
528         for (i, file_spec) in self.conf.file_specs.iter().enumerate() {
529             let reason = *codes_guard.get(i).unwrap_or(&Reason::Default);
530             vec.push(EachFileStatus {
531                 path: file_spec.path.clone(),
532                 reason,
533                 message: reason.to_str().into(),
534             });
535         }
536         vec
537     }
538 
539     pub(crate) fn get_each_file_status_by_code(
540         codes_guard: &MutexGuard<Vec<Reason>>,
541         file_specs: &[FileSpec],
542     ) -> Vec<EachFileStatus> {
543         let mut vec = Vec::new();
544         for (i, file_spec) in file_specs.iter().enumerate() {
545             let reason = *codes_guard.get(i).unwrap_or(&Reason::Default);
546             vec.push(EachFileStatus {
547                 path: file_spec.path.clone(),
548                 reason,
549                 message: reason.to_str().into(),
550             });
551         }
552         vec
553     }
554 
555     pub(crate) fn info(&self) -> TaskInfo {
556         let status = self.status.lock().unwrap();
557         let progress = self.progress.lock().unwrap();
558         TaskInfo {
559             bundle: self.conf.bundle.clone(),
560             url: self.conf.url.clone(),
561             data: self.conf.data.clone(),
562             token: self.conf.token.clone(),
563             form_items: self.conf.form_items.clone(),
564             file_specs: self.conf.file_specs.clone(),
565             title: self.conf.title.clone(),
566             description: self.conf.description.clone(),
567             mime_type: {
568                 match self.conf.version {
569                     Version::API10 => match self.conf.common_data.action {
570                         Action::Download => match self.conf.headers.get("Content-Type") {
571                             None => "".into(),
572                             Some(v) => v.clone(),
573                         },
574                         Action::Upload => "multipart/form-data".into(),
575                         _ => "".into(),
576                     },
577                     Version::API9 => self.mime_type.lock().unwrap().clone(),
578                 }
579             },
580             progress: progress.clone(),
581             extras: progress.extras.clone(),
582             each_file_status: self.get_each_file_status(),
583             common_data: CommonTaskInfo {
584                 task_id: self.conf.common_data.task_id,
585                 uid: self.conf.common_data.uid,
586                 action: self.conf.common_data.action.repr,
587                 mode: self.conf.common_data.mode.repr,
588                 ctime: self.ctime,
589                 mtime: status.mtime,
590                 reason: status.reason.repr,
591                 gauge: self.conf.common_data.gauge,
592                 retry: self.conf.common_data.retry,
593                 tries: self.tries.load(Ordering::SeqCst),
594                 version: self.conf.version as u8,
595                 priority: self.conf.common_data.priority,
596             },
597         }
598     }
599 
600     pub(crate) fn notify_header_receive(&self) {
601         if self.conf.version == Version::API9 && self.conf.common_data.action == Action::Upload {
602             let notify_data = self.build_notify_data();
603 
604             Notifier::header_receive(&self.client_manager, notify_data);
605         }
606     }
607 }
608 
/// State of a task together with the reason for it and a timestamp.
#[derive(Clone, Debug)]
pub(crate) struct TaskStatus {
    /// Timestamp attached to this status.
    // NOTE(review): presumably the last modification time — confirm against the code that
    // updates the status.
    pub(crate) mtime: u64,
    /// Current state of the task; `State::Initialized` for a new task.
    pub(crate) state: State,
    /// Reason associated with the state; `Reason::Default` for a new task.
    pub(crate) reason: Reason,
}
615 
616 impl TaskStatus {
617     pub(crate) fn new(mtime: u64) -> Self {
618         TaskStatus {
619             mtime,
620             state: State::Initialized,
621             reason: Reason::Default,
622         }
623     }
624 }
625 
check_file_specsnull626 fn check_file_specs(file_specs: &[FileSpec]) -> bool {
627     const EL1: &str = "/data/storage/el1/base/";
628     const EL2: &str = "/data/storage/el2/base/";
629     const EL5: &str = "/data/storage/el5/base/";
630 
631     let mut result = true;
632     for (idx, spec) in file_specs.iter().enumerate() {
633         let path = &spec.path;
634         if !spec.is_user_file
635             && !path.starts_with(EL1)
636             && !path.starts_with(EL2)
637             && !path.starts_with(EL5)
638         {
639             error!("File path invalid - path: {}, idx: {}", path, idx);
640             result = false;
641             break;
642         }
643     }
644 
645     result
646 }
647 
648 pub(crate) fn check_config(
649     config: &TaskConfig,
650     #[cfg(feature = "oh")] system: SystemConfig,
651 ) -> Result<(AttachedFiles, Client), ErrorCode> {
652     if !check_file_specs(&config.file_specs) {
653         return Err(ErrorCode::Other);
654     }
655     let files = AttachedFiles::open(config).map_err(|_| ErrorCode::FileOperationErr)?;
656     #[cfg(feature = "oh")]
657     let client = build_client(config, system).map_err(|_| ErrorCode::Other)?;
658 
659     #[cfg(not(feature = "oh"))]
660     let client = build_client(config).map_err(|_| ErrorCode::Other)?;
661     Ok((files, client))
662 }
663 
664 impl From<HttpClientError> for TaskError {
fromnull665     fn from(_value: HttpClientError) -> Self {
666         TaskError::Failed(Reason::BuildRequestFailed)
667     }
668 }
669 
670 impl From<io::Error> for TaskError {
fromnull671     fn from(_value: io::Error) -> Self {
672         TaskError::Failed(Reason::IoError)
673     }
674 }
675 
/// Why a task stopped without having failed; carried by `TaskError::Waiting`.
#[derive(Debug, PartialEq, Eq)]
pub enum TaskPhase {
    /// Returned by `RequestTask::network_retry` after the retry delay while the network is
    /// online.
    NeedRetry,
    /// Returned by `RequestTask::handle_download_error` for `ErrorKind::UserAborted`.
    UserAbort,
    /// Returned by `RequestTask::network_retry` when the network is not online.
    NetworkOffline,
}
682 
/// Error type returned by the task operations in this file.
#[derive(Debug, PartialEq, Eq)]
pub enum TaskError {
    /// The operation failed; carries the `Reason` for the failure.
    Failed(Reason),
    /// The operation did not complete and the task has to wait; carries the `TaskPhase` that
    /// says why.
    // NOTE(review): how callers react to each phase is not visible here — confirm at the
    // call sites.
    Waiting(TaskPhase),
}
688 
#[cfg(test)]
mod test {
    use crate::task::request_task::change_upload_size;

    // Checks `change_upload_size` for open-ended, bounded, clamped, empty-file and invalid
    // ranges.
    #[test]
    fn ut_upload_size() {
        // (begins, ends, size, expected)
        let cases: [(u64, i64, i64, i64); 7] = [
            (0, -1, 30, 30),
            (10, -1, 30, 20),
            (0, 10, 30, 11),
            (10, 10, 100, 1),
            (0, 30, 30, 30),
            (0, 0, 0, 0),
            (10, 9, 100, 100),
        ];
        for &(begins, ends, size, expected) in cases.iter() {
            assert_eq!(change_upload_size(begins, ends, size), expected);
        }
    }
}
704