1/* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15//! tcp 16#![allow(missing_docs)] 17 18use crate::config::*; 19use crate::serializer; 20#[allow(unused)] 21use crate::utils::hdc_log::*; 22 23use std::io::{self, Error, ErrorKind}; 24 25#[cfg(feature = "host")] 26extern crate ylong_runtime_static as ylong_runtime; 27use ylong_runtime::io::AsyncReadExt; 28use ylong_runtime::net::SplitReadHalf; 29 30async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> { 31 if expected_size == 0 { 32 return Ok(vec![]); 33 } 34 let mut data = vec![0_u8; expected_size]; 35 let mut index: usize = 0; 36 while index < expected_size { 37 crate::trace!("before read {index} / {expected_size}"); 38 match rd.read(&mut data[index..]).await { 39 Ok(recv_size) => { 40 crate::trace!("after read {recv_size}"); 41 if recv_size == 0 { 42 crate::debug!("peer shutdown"); 43 return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown")); 44 } 45 index += recv_size; 46 } 47 Err(e) => { 48 crate::warn!("read tcp failed: {}", e.to_string()); 49 return Err(Error::new(ErrorKind::Other, "read tcp failed")); 50 } 51 } 52 } 53 Ok(data) 54} 55 56pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> { 57 let data = read_frame(rd, serializer::HEAD_SIZE).await?; 58 let payload_head = serializer::unpack_payload_head(data)?; 59 crate::trace!("get payload_head: {:?}", payload_head); 60 61 let expected_head_size = u16::from_be(payload_head.head_size) as usize; 62 let expected_data_size = u32::from_be(payload_head.data_size) as usize; 63 if expected_head_size + expected_data_size == 0 64 || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE 65 { 66 return Err(Error::new(ErrorKind::Other, "Packet size incorrect")); 67 } 68 69 let data = read_frame(rd, expected_head_size).await?; 70 let payload_protect = serializer::unpack_payload_protect(data)?; 71 crate::trace!("get payload_protect: {:?}", payload_protect); 72 let channel_id = payload_protect.channel_id; 73 74 let command = match HdcCommand::try_from(payload_protect.command_flag) { 75 Ok(command) => command, 76 Err(_) => { 77 return Err(Error::new(ErrorKind::Other, "unknown command")); 78 } 79 }; 80 81 let payload = read_frame(rd, expected_data_size).await?; 82 Ok(TaskMessage { 83 channel_id, 84 command, 85 payload, 86 }) 87} 88 89pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> { 90 let data = read_frame(rd, 4).await?; 91 let Ok(data) = data.try_into() else { 92 return Err(Error::new( 93 ErrorKind::Other, 94 "Data forced conversion failed", 95 )); 96 }; 97 let expected_size = u32::from_be_bytes(data); 98 read_frame(rd, expected_size as usize).await 99} 100