xref: /developtools/hdc/hdc_rust/src/transfer/tcp.rs (revision cc290419)
1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15//! tcp
16#![allow(missing_docs)]
17
18use crate::config::*;
19use crate::serializer;
20#[allow(unused)]
21use crate::utils::hdc_log::*;
22
23use std::io::{self, Error, ErrorKind};
24
25#[cfg(feature = "host")]
26extern crate ylong_runtime_static as ylong_runtime;
27use ylong_runtime::io::AsyncReadExt;
28use ylong_runtime::net::SplitReadHalf;
29
30async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> {
31    if expected_size == 0 {
32        return Ok(vec![]);
33    }
34    let mut data = vec![0_u8; expected_size];
35    let mut index: usize = 0;
36    while index < expected_size {
37        crate::trace!("before read {index} / {expected_size}");
38        match rd.read(&mut data[index..]).await {
39            Ok(recv_size) => {
40                crate::trace!("after read {recv_size}");
41                if recv_size == 0 {
42                    crate::debug!("peer shutdown");
43                    return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown"));
44                }
45                index += recv_size;
46            }
47            Err(e) => {
48                crate::warn!("read tcp failed: {}", e.to_string());
49                return Err(Error::new(ErrorKind::Other, "read tcp failed"));
50            }
51        }
52    }
53    Ok(data)
54}
55
56pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> {
57    let data = read_frame(rd, serializer::HEAD_SIZE).await?;
58    let payload_head = serializer::unpack_payload_head(data)?;
59    crate::trace!("get payload_head: {:?}", payload_head);
60
61    let expected_head_size = u16::from_be(payload_head.head_size) as usize;
62    let expected_data_size = u32::from_be(payload_head.data_size) as usize;
63    if expected_head_size + expected_data_size == 0
64        || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE
65    {
66        return Err(Error::new(ErrorKind::Other, "Packet size incorrect"));
67    }
68
69    let data = read_frame(rd, expected_head_size).await?;
70    let payload_protect = serializer::unpack_payload_protect(data)?;
71    crate::trace!("get payload_protect: {:?}", payload_protect);
72    let channel_id = payload_protect.channel_id;
73
74    let command = match HdcCommand::try_from(payload_protect.command_flag) {
75        Ok(command) => command,
76        Err(_) => {
77            return Err(Error::new(ErrorKind::Other, "unknown command"));
78        }
79    };
80
81    let payload = read_frame(rd, expected_data_size).await?;
82    Ok(TaskMessage {
83        channel_id,
84        command,
85        payload,
86    })
87}
88
89pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> {
90    let data = read_frame(rd, 4).await?;
91    let Ok(data) = data.try_into() else {
92        return Err(Error::new(
93            ErrorKind::Other,
94            "Data forced conversion failed",
95        ));
96    };
97    let expected_size = u32::from_be_bytes(data);
98    read_frame(rd, expected_size as usize).await
99}
100