1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::io::{self, SeekFrom};
15 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
16 use std::sync::{Mutex, MutexGuard};
17 use std::time::Duration;
18
19 use ylong_http_client::async_impl::{Body, Client, Request, RequestBuilder, Response};
20 use ylong_http_client::{ErrorKind, HttpClientError};
21 use ylong_runtime::io::{AsyncSeekExt, AsyncWriteExt};
22
23 cfg_oh! {
24 use crate::manage::SystemConfig;
25 }
26
27 use super::config::{Mode, Version};
28 use super::info::{CommonTaskInfo, State, TaskInfo, UpdateInfo};
29 use super::notify::{EachFileStatus, NotifyData, Progress};
30 use super::reason::Reason;
31 use crate::error::ErrorCode;
32 use crate::manage::database::RequestDb;
33 use crate::manage::network::Network;
34 use crate::manage::notifier::Notifier;
35 use crate::service::client::ClientManagerEntry;
36 use crate::task::client::build_client;
37 use crate::task::config::{Action, TaskConfig};
38 use crate::task::files::{AttachedFiles, Files};
39 use crate::utils::form_item::FileSpec;
40 use crate::utils::get_current_timestamp;
41
42 const RETRY_TIMES: u32 = 4;
43 const RETRY_INTERVAL: u64 = 400;
44
45 pub(crate) struct RequestTask {
46 pub(crate) conf: TaskConfig,
47 pub(crate) client: Client,
48 pub(crate) files: Files,
49 pub(crate) body_files: Files,
50 pub(crate) ctime: u64,
51 pub(crate) mime_type: Mutex<String>,
52 pub(crate) progress: Mutex<Progress>,
53 pub(crate) status: Mutex<TaskStatus>,
54 pub(crate) code: Mutex<Vec<Reason>>,
55 pub(crate) tries: AtomicU32,
56 pub(crate) background_notify_time: AtomicU64,
57 pub(crate) file_total_size: AtomicI64,
58 pub(crate) rate_limiting: AtomicU64,
59 pub(crate) last_notify: AtomicU64,
60 pub(crate) client_manager: ClientManagerEntry,
61 pub(crate) running_result: Mutex<Option<Result<(), Reason>>>,
62 pub(crate) network: Network,
63 pub(crate) timeout_tries: AtomicU32,
64 pub(crate) upload_resume: AtomicBool,
65 }
66
67 impl RequestTask {
68 pub(crate) fn task_id(&self) -> u32 {
69 self.conf.common_data.task_id
70 }
71
72 pub(crate) fn uid(&self) -> u64 {
73 self.conf.common_data.uid
74 }
75
76 pub(crate) fn config(&self) -> &TaskConfig {
77 &self.conf
78 }
79
80 // only use for download task
81 pub(crate) fn mime_type(&self) -> String {
82 self.mime_type.lock().unwrap().clone()
83 }
84
85 pub(crate) fn action(&self) -> Action {
86 self.conf.common_data.action
87 }
88
89 pub(crate) fn mode(&self) -> Mode {
90 self.conf.common_data.mode
91 }
92
93 pub(crate) fn speed_limit(&self, limit: u64) {
94 let old = self.rate_limiting.swap(limit, Ordering::SeqCst);
95 if old != limit {
96 info!("task {} speed_limit {}", self.task_id(), limit);
97 }
98 }
99
100 pub(crate) async fn network_retry(&self) -> Result<(), TaskError> {
101 if self.tries.load(Ordering::SeqCst) < RETRY_TIMES {
102 self.tries.fetch_add(1, Ordering::SeqCst);
103 if !self.network.is_online() {
104 return Err(TaskError::Waiting(TaskPhase::NetworkOffline));
105 } else {
106 ylong_runtime::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
107 return Err(TaskError::Waiting(TaskPhase::NeedRetry));
108 }
109 }
110 Ok(())
111 }
112 }
113
114 pub(crate) fn change_upload_size(begins: u64, mut ends: i64, size: i64) -> i64 {
115 if ends < 0 || ends >= size {
116 ends = size - 1;
117 }
118 if begins as i64 > ends {
119 return size;
120 }
121 ends - begins as i64 + 1
122 }
123
124 impl RequestTask {
125 pub(crate) fn new(
126 config: TaskConfig,
127 files: AttachedFiles,
128 client: Client,
129 client_manager: ClientManagerEntry,
130 network: Network,
131 upload_resume: bool,
132 ) -> RequestTask {
133 let file_len = files.files.len();
134 let action = config.common_data.action;
135
136 let file_total_size = match action {
137 Action::Upload => {
138 let mut file_total_size = 0i64;
139 // If the total size overflows, ignore it.
140 for size in files.sizes.iter() {
141 file_total_size += *size;
142 }
143 file_total_size
144 }
145 Action::Download => -1,
146 _ => unreachable!("Action::Any in RequestTask::new never reach"),
147 };
148
149 let mut sizes = files.sizes.clone();
150
151 if action == Action::Upload && config.common_data.index < sizes.len() as u32 {
152 sizes[config.common_data.index as usize] = change_upload_size(
153 config.common_data.begins,
154 config.common_data.ends,
155 sizes[config.common_data.index as usize],
156 );
157 }
158
159 let time = get_current_timestamp();
160 let status = TaskStatus::new(time);
161 let progress = Progress::new(sizes);
162
163 RequestTask {
164 conf: config,
165 client,
166 files: files.files,
167 body_files: files.body_files,
168 ctime: time,
169 mime_type: Mutex::new(String::new()),
170 progress: Mutex::new(progress),
171 tries: AtomicU32::new(0),
172 status: Mutex::new(status),
173 code: Mutex::new(vec![Reason::Default; file_len]),
174 background_notify_time: AtomicU64::new(time),
175 file_total_size: AtomicI64::new(file_total_size),
176 rate_limiting: AtomicU64::new(0),
177 last_notify: AtomicU64::new(time),
178 client_manager,
179 running_result: Mutex::new(None),
180 network,
181 timeout_tries: AtomicU32::new(0),
182 upload_resume: AtomicBool::new(upload_resume),
183 }
184 }
185
186 pub(crate) fn new_by_info(
187 config: TaskConfig,
188 #[cfg(feature = "oh")] system: SystemConfig,
189 info: TaskInfo,
190 client_manager: ClientManagerEntry,
191 network: Network,
192 upload_resume: bool,
193 ) -> Result<RequestTask, ErrorCode> {
194 #[cfg(feature = "oh")]
195 let (files, client) = check_config(&config, system)?;
196 #[cfg(not(feature = "oh"))]
197 let (files, client) = check_config(&config)?;
198
199 let file_len = files.files.len();
200 let action = config.common_data.action;
201 let time = get_current_timestamp();
202
203 let file_total_size = match action {
204 Action::Upload => {
205 let mut file_total_size = 0i64;
206 // If the total size overflows, ignore it.
207 for size in files.sizes.iter() {
208 file_total_size += *size;
209 }
210 file_total_size
211 }
212 Action::Download => *info.progress.sizes.first().unwrap_or(&-1),
213 _ => unreachable!("Action::Any in RequestTask::new never reach"),
214 };
215
216 // If `TaskInfo` is provided, use data of it.
217 let ctime = info.common_data.ctime;
218 let mime_type = info.mime_type.clone();
219 let tries = info.common_data.tries;
220 let status = TaskStatus {
221 mtime: time,
222 state: State::from(info.progress.common_data.state),
223 reason: Reason::from(info.common_data.reason),
224 };
225 let progress = info.progress;
226
227 Ok(RequestTask {
228 conf: config,
229 client,
230 files: files.files,
231 body_files: files.body_files,
232 ctime,
233 mime_type: Mutex::new(mime_type),
234 progress: Mutex::new(progress),
235 tries: AtomicU32::new(tries),
236 status: Mutex::new(status),
237 code: Mutex::new(vec![Reason::Default; file_len]),
238 background_notify_time: AtomicU64::new(time),
239 file_total_size: AtomicI64::new(file_total_size),
240 rate_limiting: AtomicU64::new(0),
241 last_notify: AtomicU64::new(time),
242 client_manager,
243 running_result: Mutex::new(None),
244 network,
245 timeout_tries: AtomicU32::new(0),
246 upload_resume: AtomicBool::new(upload_resume),
247 })
248 }
249
250 pub(crate) fn build_notify_data(&self) -> NotifyData {
251 let vec = self.get_each_file_status();
252 NotifyData {
253 bundle: self.conf.bundle.clone(),
254 // `unwrap` for propagating panics among threads.
255 progress: self.progress.lock().unwrap().clone(),
256 action: self.conf.common_data.action,
257 version: self.conf.version,
258 each_file_status: vec,
259 task_id: self.conf.common_data.task_id,
260 }
261 }
262
263 pub(crate) fn update_progress_in_database(&self) {
264 let mtime = self.status.lock().unwrap().mtime;
265 let reason = self.status.lock().unwrap().reason;
266 let progress = self.progress.lock().unwrap().clone();
267 let update_info = UpdateInfo {
268 mtime,
269 reason: reason.repr,
270 progress,
271 each_file_status: RequestTask::get_each_file_status_by_code(
272 &self.code.lock().unwrap(),
273 &self.conf.file_specs,
274 ),
275 tries: self.tries.load(Ordering::SeqCst),
276 mime_type: self.mime_type(),
277 };
278 RequestDb::get_instance().update_task(self.task_id(), update_info);
279 }
280
281 pub(crate) fn build_request_builder(&self) -> Result<RequestBuilder, HttpClientError> {
282 use ylong_http_client::async_impl::PercentEncoder;
283
284 let url = self.conf.url.clone();
285 let url = match PercentEncoder::encode(url.as_str()) {
286 Ok(value) => value,
287 Err(e) => {
288 error!("url percent encoding error is {:?}", e);
289 return Err(e);
290 }
291 };
292
293 let method = match self.conf.method.to_uppercase().as_str() {
294 "PUT" => "PUT",
295 "POST" => "POST",
296 "GET" => "GET",
297 _ => match self.conf.common_data.action {
298 Action::Upload => {
299 if self.conf.version == Version::API10 {
300 "PUT"
301 } else {
302 "POST"
303 }
304 }
305 Action::Download => "GET",
306 _ => "",
307 },
308 };
309 let mut request = RequestBuilder::new().method(method).url(url.as_str());
310 for (key, value) in self.conf.headers.iter() {
311 request = request.header(key.as_str(), value.as_str());
312 }
313 Ok(request)
314 }
315
316 pub(crate) async fn clear_downloaded_file(&self) -> Result<(), std::io::Error> {
317 info!("task {} clear downloaded file", self.task_id());
318 let file = self.files.get_mut(0).unwrap();
319 file.set_len(0).await?;
320 file.seek(SeekFrom::Start(0)).await?;
321
322 let mut progress_guard = self.progress.lock().unwrap();
323 progress_guard.common_data.total_processed = 0;
324 progress_guard.processed[0] = 0;
325
326 Ok(())
327 }
328
329 pub(crate) async fn build_download_request(&self) -> Result<Request, TaskError> {
330 let mut request_builder = self.build_request_builder()?;
331
332 let file = self.files.get_mut(0).unwrap();
333
334 let has_downloaded = file.metadata().await?.len();
335 let resume_download = has_downloaded > 0;
336 let require_range = self.require_range();
337
338 let begins = self.conf.common_data.begins;
339 let ends = self.conf.common_data.ends;
340
341 debug!(
342 "task {} build download request, resume_download: {}, require_range: {}",
343 self.task_id(),
344 resume_download,
345 require_range
346 );
347 match (resume_download, require_range) {
348 (true, false) => {
349 let (builder, support_range) = self.support_range(request_builder);
350 request_builder = builder;
351 if support_range {
352 request_builder =
353 self.range_request(request_builder, begins + has_downloaded, ends);
354 } else {
355 self.clear_downloaded_file().await?;
356 }
357 }
358 (false, true) => {
359 request_builder = self.range_request(request_builder, begins, ends);
360 }
361 (true, true) => {
362 let (builder, support_range) = self.support_range(request_builder);
363 request_builder = builder;
364 if support_range {
365 request_builder =
366 self.range_request(request_builder, begins + has_downloaded, ends);
367 } else {
368 return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
369 }
370 }
371 (false, false) => {}
372 };
373
374 let request = request_builder.body(Body::slice(self.conf.data.clone()))?;
375 Ok(request)
376 }
377
range_requestnull378 fn range_request(
379 &self,
380 request_builder: RequestBuilder,
381 begins: u64,
382 ends: i64,
383 ) -> RequestBuilder {
384 let range = if ends < 0 {
385 format!("bytes={begins}-")
386 } else {
387 format!("bytes={begins}-{ends}")
388 };
389 request_builder.header("Range", range.as_str())
390 }
391
support_rangenull392 fn support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool) {
393 let progress_guard = self.progress.lock().unwrap();
394 let mut support_range = false;
395 if let Some(etag) = progress_guard.extras.get("etag") {
396 request_builder = request_builder.header("If-Range", etag.as_str());
397 support_range = true;
398 } else if let Some(last_modified) = progress_guard.extras.get("last-modified") {
399 request_builder = request_builder.header("If-Range", last_modified.as_str());
400 support_range = true;
401 }
402 if !support_range {
403 info!("task {} not support range", self.task_id());
404 }
405 (request_builder, support_range)
406 }
407
408 pub(crate) fn get_file_info(&self, response: &Response) -> Result<(), TaskError> {
409 let content_type = response.headers().get("content-type");
410 if let Some(mime_type) = content_type {
411 if let Ok(value) = mime_type.to_string() {
412 *self.mime_type.lock().unwrap() = value;
413 }
414 }
415
416 let content_length = response.headers().get("content-length");
417 if let Some(Ok(len)) = content_length.map(|v| v.to_string()) {
418 match len.parse::<i64>() {
419 Ok(v) => {
420 let mut progress = self.progress.lock().unwrap();
421 progress.sizes = vec![v + progress.processed[0] as i64];
422 self.file_total_size.store(v, Ordering::SeqCst);
423 debug!("the download task content-length is {}", v);
424 }
425 Err(e) => {
426 error!("convert string to i64 error: {:?}", e);
427 }
428 }
429 } else {
430 error!("cannot get content-length of the task");
431 if self.conf.common_data.precise {
432 return Err(TaskError::Failed(Reason::GetFileSizeFailed));
433 }
434 }
435 Ok(())
436 }
437
438 pub(crate) async fn handle_download_error(
439 &self,
440 err: HttpClientError,
441 ) -> Result<(), TaskError> {
442 if err.error_kind() != ErrorKind::UserAborted {
443 error!("Task {} {:?}", self.task_id(), err);
444 }
445 match err.error_kind() {
446 ErrorKind::Timeout => Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
447 // user triggered
448 ErrorKind::UserAborted => Err(TaskError::Waiting(TaskPhase::UserAbort)),
449 ErrorKind::BodyTransfer | ErrorKind::BodyDecode => {
450 self.network_retry().await?;
451 Err(TaskError::Failed(Reason::OthersError))
452 }
453 _ => {
454 if format!("{}", err).contains("No space left on device") {
455 Err(TaskError::Failed(Reason::InsufficientSpace))
456 } else {
457 Err(TaskError::Failed(Reason::OthersError))
458 }
459 }
460 }
461 }
462
463 #[cfg(feature = "oh")]
464 pub(crate) fn notify_response(&self, response: &Response) {
465 let tid = self.conf.common_data.task_id;
466 let version: String = response.version().as_str().into();
467 let status_code: u32 = response.status().as_u16() as u32;
468 let status_message: String;
469 if let Some(reason) = response.status().reason() {
470 status_message = reason.into();
471 } else {
472 error!("bad status_message {:?}", status_code);
473 return;
474 }
475 let headers = response.headers().clone();
476 debug!("notify_response");
477 self.client_manager
478 .send_response(tid, version, status_code, status_message, headers)
479 }
480
481 pub(crate) fn require_range(&self) -> bool {
482 self.conf.common_data.begins > 0 || self.conf.common_data.ends >= 0
483 }
484
485 pub(crate) async fn record_upload_response(
486 &self,
487 index: usize,
488 response: Result<Response, HttpClientError>,
489 ) {
490 if let Ok(mut r) = response {
491 {
492 let mut guard = self.progress.lock().unwrap();
493 guard.extras.clear();
494 for (k, v) in r.headers() {
495 if let Ok(value) = v.to_string() {
496 guard.extras.insert(k.to_string().to_lowercase(), value);
497 }
498 }
499 }
500
501 let file = match self.body_files.get_mut(index) {
502 Some(file) => file,
503 None => return,
504 };
505 let _ = file.set_len(0).await;
506 loop {
507 let mut buf = [0u8; 1024];
508 let size = r.data(&mut buf).await;
509 let size = match size {
510 Ok(size) => size,
511 Err(_e) => break,
512 };
513
514 if size == 0 {
515 break;
516 }
517 let _ = file.write_all(&buf[..size]).await;
518 }
519 // Makes sure all the data has been written to the target file.
520 let _ = file.sync_all().await;
521 }
522 }
523
524 pub(crate) fn get_each_file_status(&self) -> Vec<EachFileStatus> {
525 let mut vec = Vec::new();
526 // `unwrap` for propagating panics among threads.
527 let codes_guard = self.code.lock().unwrap();
528 for (i, file_spec) in self.conf.file_specs.iter().enumerate() {
529 let reason = *codes_guard.get(i).unwrap_or(&Reason::Default);
530 vec.push(EachFileStatus {
531 path: file_spec.path.clone(),
532 reason,
533 message: reason.to_str().into(),
534 });
535 }
536 vec
537 }
538
539 pub(crate) fn get_each_file_status_by_code(
540 codes_guard: &MutexGuard<Vec<Reason>>,
541 file_specs: &[FileSpec],
542 ) -> Vec<EachFileStatus> {
543 let mut vec = Vec::new();
544 for (i, file_spec) in file_specs.iter().enumerate() {
545 let reason = *codes_guard.get(i).unwrap_or(&Reason::Default);
546 vec.push(EachFileStatus {
547 path: file_spec.path.clone(),
548 reason,
549 message: reason.to_str().into(),
550 });
551 }
552 vec
553 }
554
555 pub(crate) fn info(&self) -> TaskInfo {
556 let status = self.status.lock().unwrap();
557 let progress = self.progress.lock().unwrap();
558 TaskInfo {
559 bundle: self.conf.bundle.clone(),
560 url: self.conf.url.clone(),
561 data: self.conf.data.clone(),
562 token: self.conf.token.clone(),
563 form_items: self.conf.form_items.clone(),
564 file_specs: self.conf.file_specs.clone(),
565 title: self.conf.title.clone(),
566 description: self.conf.description.clone(),
567 mime_type: {
568 match self.conf.version {
569 Version::API10 => match self.conf.common_data.action {
570 Action::Download => match self.conf.headers.get("Content-Type") {
571 None => "".into(),
572 Some(v) => v.clone(),
573 },
574 Action::Upload => "multipart/form-data".into(),
575 _ => "".into(),
576 },
577 Version::API9 => self.mime_type.lock().unwrap().clone(),
578 }
579 },
580 progress: progress.clone(),
581 extras: progress.extras.clone(),
582 each_file_status: self.get_each_file_status(),
583 common_data: CommonTaskInfo {
584 task_id: self.conf.common_data.task_id,
585 uid: self.conf.common_data.uid,
586 action: self.conf.common_data.action.repr,
587 mode: self.conf.common_data.mode.repr,
588 ctime: self.ctime,
589 mtime: status.mtime,
590 reason: status.reason.repr,
591 gauge: self.conf.common_data.gauge,
592 retry: self.conf.common_data.retry,
593 tries: self.tries.load(Ordering::SeqCst),
594 version: self.conf.version as u8,
595 priority: self.conf.common_data.priority,
596 },
597 }
598 }
599
600 pub(crate) fn notify_header_receive(&self) {
601 if self.conf.version == Version::API9 && self.conf.common_data.action == Action::Upload {
602 let notify_data = self.build_notify_data();
603
604 Notifier::header_receive(&self.client_manager, notify_data);
605 }
606 }
607 }
608
609 #[derive(Clone, Debug)]
610 pub(crate) struct TaskStatus {
611 pub(crate) mtime: u64,
612 pub(crate) state: State,
613 pub(crate) reason: Reason,
614 }
615
616 impl TaskStatus {
617 pub(crate) fn new(mtime: u64) -> Self {
618 TaskStatus {
619 mtime,
620 state: State::Initialized,
621 reason: Reason::Default,
622 }
623 }
624 }
625
check_file_specsnull626 fn check_file_specs(file_specs: &[FileSpec]) -> bool {
627 const EL1: &str = "/data/storage/el1/base/";
628 const EL2: &str = "/data/storage/el2/base/";
629 const EL5: &str = "/data/storage/el5/base/";
630
631 let mut result = true;
632 for (idx, spec) in file_specs.iter().enumerate() {
633 let path = &spec.path;
634 if !spec.is_user_file
635 && !path.starts_with(EL1)
636 && !path.starts_with(EL2)
637 && !path.starts_with(EL5)
638 {
639 error!("File path invalid - path: {}, idx: {}", path, idx);
640 result = false;
641 break;
642 }
643 }
644
645 result
646 }
647
648 pub(crate) fn check_config(
649 config: &TaskConfig,
650 #[cfg(feature = "oh")] system: SystemConfig,
651 ) -> Result<(AttachedFiles, Client), ErrorCode> {
652 if !check_file_specs(&config.file_specs) {
653 return Err(ErrorCode::Other);
654 }
655 let files = AttachedFiles::open(config).map_err(|_| ErrorCode::FileOperationErr)?;
656 #[cfg(feature = "oh")]
657 let client = build_client(config, system).map_err(|_| ErrorCode::Other)?;
658
659 #[cfg(not(feature = "oh"))]
660 let client = build_client(config).map_err(|_| ErrorCode::Other)?;
661 Ok((files, client))
662 }
663
664 impl From<HttpClientError> for TaskError {
fromnull665 fn from(_value: HttpClientError) -> Self {
666 TaskError::Failed(Reason::BuildRequestFailed)
667 }
668 }
669
670 impl From<io::Error> for TaskError {
fromnull671 fn from(_value: io::Error) -> Self {
672 TaskError::Failed(Reason::IoError)
673 }
674 }
675
676 #[derive(Debug, PartialEq, Eq)]
677 pub enum TaskPhase {
678 NeedRetry,
679 UserAbort,
680 NetworkOffline,
681 }
682
683 #[derive(Debug, PartialEq, Eq)]
684 pub enum TaskError {
685 Failed(Reason),
686 Waiting(TaskPhase),
687 }
688
689 #[cfg(test)]
690 mod test {
691 use crate::task::request_task::change_upload_size;
692
693 #[test]
ut_upload_sizenull694 fn ut_upload_size() {
695 assert_eq!(change_upload_size(0, -1, 30), 30);
696 assert_eq!(change_upload_size(10, -1, 30), 20);
697 assert_eq!(change_upload_size(0, 10, 30), 11);
698 assert_eq!(change_upload_size(10, 10, 100), 1);
699 assert_eq!(change_upload_size(0, 30, 30), 30);
700 assert_eq!(change_upload_size(0, 0, 0), 0);
701 assert_eq!(change_upload_size(10, 9, 100), 100);
702 }
703 }
704