/* * Copyright (C) 2023 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //! host_usb #![allow(missing_docs)] #[cfg(feature = "host")] extern crate ylong_runtime_static as ylong_runtime; use crate::config; use crate::config::*; use crate::serializer; use crate::serializer::native_struct::UsbHead; use crate::serializer::serialize::Serialization; use crate::serializer::serialize::SerializedBuffer; use crate::transfer::base; use crate::utils; #[allow(unused)] use crate::utils::hdc_log::*; use crate::config::ConnectType; use crate::config::TaskMessage; use crate::transfer::base::Reader; use crate::transfer::base::Writer; use crate::transfer::buffer::ConnectTypeMap; use std::collections::HashMap; use std::io::{self, Error, ErrorKind}; use std::string::FromUtf8Error; use std::sync::Arc; use ylong_runtime::sync::mpsc; use ylong_runtime::sync::mpsc::BoundedSender; use ylong_runtime::sync::Mutex; use ylong_runtime::sync::RwLock; #[repr(C)] pub struct PersistBuffer { pub ptr: *const libc::c_char, pub size: libc::c_ulonglong, } pub fn buf_to_vec(buf: PersistBuffer) -> Vec { let slice = unsafe { std::slice::from_raw_parts(buf.ptr as *const libc::c_uchar, buf.size as usize) }; slice.to_vec() } extern "C" { fn InitHostUsb() -> *mut libc::c_void; fn GetReadyUsbDevice(ptr: *mut libc::c_void) -> PersistBuffer; fn OnDeviceConnected( ptr: *mut libc::c_void, connect_key: *mut libc::c_char, len: i32, connectSuccess: bool, ); fn WriteUsb( ptr: *mut libc::c_void, connect_key: *mut libc::c_char, len: i32, buf: SerializedBuffer, ) -> libc::c_int; fn ReadUsb( ptr: *mut libc::c_void, connect_key: *mut libc::c_char, len: i32, excepted_size: i32, ) -> PersistBuffer; fn CancelUsbIo(ptr: *mut libc::c_void, connect_key: *mut libc::c_char, len: i32); fn Stop(ptr: *mut libc::c_void) -> bool; } pub fn init_host_usb() -> *mut libc::c_void { unsafe { InitHostUsb() } } pub fn get_ready_usb_devices_string(ptr: u64) -> Result { let buf = get_ready_usb_devices(ptr); String::from_utf8(buf_to_vec(buf)) } pub fn get_ready_usb_devices(ptr: u64) -> PersistBuffer { unsafe { GetReadyUsbDevice(ptr as *mut libc::c_void) } } pub fn on_device_connected(ptr: u64, connect_key: String, connect_success: bool) { unsafe { OnDeviceConnected( ptr as *mut libc::c_void, connect_key.as_str().as_ptr() as *mut libc::c_char, connect_key.len() as i32, connect_success, ); } } pub fn write_usb(ptr: u64, connect_key: String, buf: SerializedBuffer) -> i32 { unsafe { WriteUsb( ptr as *mut libc::c_void, connect_key.as_str().as_ptr() as *mut libc::c_char, connect_key.len() as i32, buf, ) } } pub fn read_usb(ptr: u64, connect_key: String, excepted_size: i32) -> PersistBuffer { unsafe { ReadUsb( ptr as *mut libc::c_void, connect_key.as_str().as_ptr() as *mut libc::c_char, connect_key.len() as i32, excepted_size, ) } } pub fn cancel_usb_io(ptr: u64, connect_key: String) { unsafe { CancelUsbIo( ptr as *mut libc::c_void, connect_key.as_str().as_ptr() as *mut libc::c_char, connect_key.len() as i32, ); } } pub fn stop(ptr: u64) { unsafe { Stop(ptr as *mut libc::c_void); } } pub struct HostUsbReader { pub connect_key: String, pub ptr: u64, } pub struct HostUsbWriter { pub connect_key: String, pub ptr: u64, } impl base::Reader for HostUsbReader { fn read_frame(&self, expected_size: usize) -> io::Result> { let buf = read_usb(self.ptr, self.connect_key.clone(), expected_size as i32); if buf.size == 0 { crate::warn!("usb read result < 0"); return Err(utils::error_other("usb read error".to_string())); } Ok(buf_to_vec(buf)) } fn check_protocol_head(&mut self) -> io::Result<(u32, u32, u32)> { let buf = self.read_frame(serializer::USB_HEAD_SIZE)?; if buf[..config::USB_PACKET_FLAG.len()] != config::USB_PACKET_FLAG[..] { return Err(Error::new( ErrorKind::Other, format!("USB_PACKET_FLAG incorrect, content: {:#?}", buf), )); } let mut head = serializer::native_struct::UsbHead::default(); if let Err(e) = head.parse(buf) { crate::warn!("parse usb head error: {}", e.to_string()); return Err(e); } Ok((u32::from_be(head.data_size), 0, head.session_id)) } } impl base::Writer for HostUsbWriter { fn write_all(&self, data: Vec) -> io::Result { let buf = SerializedBuffer { ptr: data.as_ptr() as *const libc::c_char, size: data.len() as u64, }; let ret = write_usb(self.ptr, self.connect_key.clone(), buf); if ret < 0 { Err(utils::error_other("usb write failed".to_string())) } else { Ok(ret) } } } pub fn build_header(session_id: u32, option: u8, length: usize) -> Vec { UsbHead { session_id: u32::to_be(session_id), flag: [config::USB_PACKET_FLAG[0], config::USB_PACKET_FLAG[1]], option, data_size: u32::to_be(length as u32), } .serialize() } pub async fn recv_channel_message(rd: &mut HostUsbReader) -> io::Result> { let data = rd.read_frame(4)?; let expected_size = u32::from_be_bytes(data.try_into().unwrap()); rd.read_frame(expected_size as usize) } async fn unpack_task_message( rd: &mut dyn Reader, tx: BoundedSender<(TaskMessage, u32)>, ) -> io::Result<()> { let (pack_size, package_index, _session_id) = rd.check_protocol_head()?; if pack_size == 0 { return Ok(()); } let data = rd.read_frame(pack_size as usize)?; let (head, body) = data.split_at(serializer::HEAD_SIZE); let payload_head = serializer::unpack_payload_head(head.to_vec())?; let expected_head_size = u16::from_be(payload_head.head_size) as usize; let expected_data_size = u32::from_be(payload_head.data_size) as usize; if serializer::HEAD_SIZE + expected_head_size + expected_data_size != pack_size as usize { crate::warn!( "protocol size diff: {pack_size} != {} + {expected_head_size} + {expected_data_size}", serializer::HEAD_SIZE ); } if expected_head_size + expected_data_size == 0 || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE { return Err(Error::new(ErrorKind::Other, "Packet size incorrect")); } let (protect, payload_raw) = body.split_at(expected_head_size); let (payload, _) = payload_raw.split_at(expected_data_size); let payload_protect = serializer::unpack_payload_protect(protect.to_vec())?; let channel_id = payload_protect.channel_id; let command = match HdcCommand::try_from(payload_protect.command_flag) { Ok(command) => command, Err(_) => { return Err(Error::new(ErrorKind::Other, "unknown command")); } }; let _ = tx .send(( TaskMessage { channel_id, command, payload: payload.to_vec(), }, package_index, )) .await; Ok(()) } pub fn start_recv( ptr: u64, connect_key: String, _session_id: u32, ) -> mpsc::BoundedReceiver<(TaskMessage, u32)> { let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32)>(config::USB_QUEUE_LEN); ylong_runtime::spawn(async move { let mut rd: HostUsbReader = HostUsbReader { connect_key, ptr }; loop { if let Err(e) = unpack_task_message(&mut rd, tx.clone()).await { crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); break; } } }); rx } pub fn start_recv_once( ptr: u64, connect_key: String, _session_id: u32, ) -> mpsc::BoundedReceiver<(TaskMessage, u32)> { let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32)>(config::USB_QUEUE_LEN); ylong_runtime::spawn(async move { let mut rd: HostUsbReader = HostUsbReader { connect_key, ptr }; if let Err(e) = unpack_task_message(&mut rd, tx.clone()).await { crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); } }); rx } type HostUsbWriter_ = Arc>; type HostUsbMap_ = Arc>>; pub struct HostUsbMap {} impl HostUsbMap { fn get_instance() -> HostUsbMap_ { static mut USB_MAP: Option = None; unsafe { USB_MAP .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) .clone() } } #[allow(unused)] pub async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> { let body = serializer::concat_pack(data); crate::debug!("transfer put data {:?}", body); let head = build_header(session_id, 1, body.len()); let tail = build_header(session_id, 0, 0); let instance = Self::get_instance(); let map: ylong_runtime::sync::RwLockReadGuard<'_, HashMap>>> = instance.read().await; let arc_wr = map.get(&session_id).unwrap(); let mut wr = arc_wr.lock().await; wr.write_all(head)?; wr.write_all(body)?; wr.write_all(tail)?; Ok(()) } #[allow(unused)] pub async fn send_channel_message(channel_id: u32, buf: Vec) -> io::Result<()> { crate::trace!("send channel msg: {:#?}", buf.clone()); let send = [ u32::to_be_bytes(buf.len() as u32).as_slice(), buf.as_slice(), ] .concat(); let instance = Self::get_instance(); let map = instance.read().await; if let Some(guard) = map.get(&channel_id) { let mut wr = guard.lock().await; let _ = wr.write_all(send); return Ok(()); } Err(Error::new(ErrorKind::NotFound, "channel not found")) } pub async fn start(session_id: u32, wr: HostUsbWriter) { let buffer_map = Self::get_instance(); let mut map = buffer_map.write().await; let arc_wr = Arc::new(Mutex::new(wr)); map.insert(session_id, arc_wr); ConnectTypeMap::put( session_id, ConnectType::HostUsb("some_mount_point".to_string()), ) .await; } #[allow(unused)] pub async fn end(id: u32) { crate::warn!("usb session {} will end", id); let instance = Self::get_instance(); let mut map = instance.write().await; let _ = map.remove(&id); ConnectTypeMap::del(id).await; } }