1// vim: tw=80 2//! POSIX Asynchronous I/O 3//! 4//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like 5//! devices. It supports [`read`](struct.AioRead.html#method.new), 6//! [`write`](struct.AioWrite.html#method.new), 7//! [`fsync`](struct.AioFsync.html#method.new), 8//! [`readv`](struct.AioReadv.html#method.new), and 9//! [`writev`](struct.AioWritev.html#method.new), operations, subject to 10//! platform support. Completion 11//! notifications can optionally be delivered via 12//! [signals](../signal/enum.SigevNotify.html#variant.SigevSignal), via the 13//! [`aio_suspend`](fn.aio_suspend.html) function, or via polling. Some 14//! platforms support other completion 15//! notifications, such as 16//! [kevent](../signal/enum.SigevNotify.html#variant.SigevKevent). 17//! 18//! Multiple operations may be submitted in a batch with 19//! [`lio_listio`](fn.lio_listio.html), though the standard does not guarantee 20//! that they will be executed atomically. 21//! 22//! Outstanding operations may be cancelled with 23//! [`cancel`](trait.Aio.html#method.cancel) or 24//! [`aio_cancel_all`](fn.aio_cancel_all.html), though the operating system may 25//! not support this for all filesystems and devices. 26#[cfg(target_os = "freebsd")] 27use std::io::{IoSlice, IoSliceMut}; 28use std::{ 29 convert::TryFrom, 30 fmt::{self, Debug}, 31 marker::{PhantomData, PhantomPinned}, 32 mem, 33 os::unix::io::RawFd, 34 pin::Pin, 35 ptr, thread, 36}; 37 38use libc::{c_void, off_t}; 39use pin_utils::unsafe_pinned; 40 41use crate::{ 42 errno::Errno, 43 sys::{signal::*, time::TimeSpec}, 44 Result, 45}; 46 47libc_enum! { 48 /// Mode for `AioCb::fsync`. Controls whether only data or both data and 49 /// metadata are synced. 50 #[repr(i32)] 51 #[non_exhaustive] 52 pub enum AioFsyncMode { 53 /// do it like `fsync` 54 O_SYNC, 55 /// on supported operating systems only, do it like `fdatasync` 56 #[cfg(any(target_os = "ios", 57 target_os = "linux", 58 target_os = "macos", 59 target_os = "netbsd", 60 target_os = "openbsd"))] 61 #[cfg_attr(docsrs, doc(cfg(all())))] 62 O_DSYNC 63 } 64 impl TryFrom<i32> 65} 66 67libc_enum! { 68 /// Mode for [`lio_listio`](fn.lio_listio.html) 69 #[repr(i32)] 70 pub enum LioMode { 71 /// Requests that [`lio_listio`](fn.lio_listio.html) block until all 72 /// requested operations have been completed 73 LIO_WAIT, 74 /// Requests that [`lio_listio`](fn.lio_listio.html) return immediately 75 LIO_NOWAIT, 76 } 77} 78 79/// Return values for [`AioCb::cancel`](struct.AioCb.html#method.cancel) and 80/// [`aio_cancel_all`](fn.aio_cancel_all.html) 81#[repr(i32)] 82#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] 83pub enum AioCancelStat { 84 /// All outstanding requests were canceled 85 AioCanceled = libc::AIO_CANCELED, 86 /// Some requests were not canceled. Their status should be checked with 87 /// `AioCb::error` 88 AioNotCanceled = libc::AIO_NOTCANCELED, 89 /// All of the requests have already finished 90 AioAllDone = libc::AIO_ALLDONE, 91} 92 93/// Newtype that adds Send and Sync to libc::aiocb, which contains raw pointers 94#[repr(transparent)] 95struct LibcAiocb(libc::aiocb); 96 97unsafe impl Send for LibcAiocb {} 98unsafe impl Sync for LibcAiocb {} 99 100/// Base class for all AIO operations. Should only be used directly when 101/// checking for completion. 102// We could create some kind of AsPinnedMut trait, and implement it for all aio 103// ops, allowing the crate's users to get pinned references to `AioCb`. That 104// could save some code for things like polling methods. But IMHO it would 105// provide polymorphism at the wrong level. Instead, the best place for 106// polymorphism is at the level of `Futures`. 107#[repr(C)] 108struct AioCb { 109 aiocb: LibcAiocb, 110 /// Could this `AioCb` potentially have any in-kernel state? 111 // It would be really nice to perform the in-progress check entirely at 112 // compile time. But I can't figure out how, because: 113 // * Future::poll takes a `Pin<&mut self>` rather than `self`, and 114 // * Rust's lack of an equivalent of C++'s Guaranteed Copy Elision means 115 // that there's no way to write an AioCb constructor that neither boxes 116 // the object itself, nor moves it during return. 117 in_progress: bool, 118} 119 120impl AioCb { 121 pin_utils::unsafe_unpinned!(aiocb: LibcAiocb); 122 123 fn aio_return(mut self: Pin<&mut Self>) -> Result<usize> { 124 self.in_progress = false; 125 unsafe { 126 let p: *mut libc::aiocb = &mut self.aiocb.0; 127 Errno::result(libc::aio_return(p)) 128 } 129 .map(|r| r as usize) 130 } 131 132 fn cancel(mut self: Pin<&mut Self>) -> Result<AioCancelStat> { 133 let r = unsafe { 134 libc::aio_cancel(self.aiocb.0.aio_fildes, &mut self.aiocb.0) 135 }; 136 match r { 137 libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), 138 libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), 139 libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), 140 -1 => Err(Errno::last()), 141 _ => panic!("unknown aio_cancel return value"), 142 } 143 } 144 145 fn common_init(fd: RawFd, prio: i32, sigev_notify: SigevNotify) -> Self { 146 // Use mem::zeroed instead of explicitly zeroing each field, because the 147 // number and name of reserved fields is OS-dependent. On some OSes, 148 // some reserved fields are used the kernel for state, and must be 149 // explicitly zeroed when allocated. 150 let mut a = unsafe { mem::zeroed::<libc::aiocb>() }; 151 a.aio_fildes = fd; 152 a.aio_reqprio = prio; 153 a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); 154 AioCb { 155 aiocb: LibcAiocb(a), 156 in_progress: false, 157 } 158 } 159 160 fn error(self: Pin<&mut Self>) -> Result<()> { 161 let r = unsafe { libc::aio_error(&self.aiocb().0) }; 162 match r { 163 0 => Ok(()), 164 num if num > 0 => Err(Errno::from_i32(num)), 165 -1 => Err(Errno::last()), 166 num => panic!("unknown aio_error return value {:?}", num), 167 } 168 } 169 170 fn in_progress(&self) -> bool { 171 self.in_progress 172 } 173 174 fn set_in_progress(mut self: Pin<&mut Self>) { 175 self.as_mut().in_progress = true; 176 } 177 178 /// Update the notification settings for an existing AIO operation that has 179 /// not yet been submitted. 180 // Takes a normal reference rather than a pinned one because this method is 181 // normally called before the object needs to be pinned, that is, before 182 // it's been submitted to the kernel. 183 fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) { 184 assert!( 185 !self.in_progress, 186 "Can't change notification settings for an in-progress operation" 187 ); 188 self.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); 189 } 190} 191 192impl Debug for AioCb { 193 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 194 fmt.debug_struct("AioCb") 195 .field("aiocb", &self.aiocb.0) 196 .field("in_progress", &self.in_progress) 197 .finish() 198 } 199} 200 201impl Drop for AioCb { 202 /// If the `AioCb` has no remaining state in the kernel, just drop it. 203 /// Otherwise, dropping constitutes a resource leak, which is an error 204 fn drop(&mut self) { 205 assert!( 206 thread::panicking() || !self.in_progress, 207 "Dropped an in-progress AioCb" 208 ); 209 } 210} 211 212/// Methods common to all AIO operations 213pub trait Aio { 214 /// The return type of [`Aio::aio_return`]. 215 type Output; 216 217 /// Retrieve return status of an asynchronous operation. 218 /// 219 /// Should only be called once for each operation, after [`Aio::error`] 220 /// indicates that it has completed. The result is the same as for the 221 /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions. 222 /// 223 /// # References 224 /// 225 /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) 226 fn aio_return(self: Pin<&mut Self>) -> Result<Self::Output>; 227 228 /// Cancels an outstanding AIO request. 229 /// 230 /// The operating system is not required to implement cancellation for all 231 /// file and device types. Even if it does, there is no guarantee that the 232 /// operation has not already completed. So the caller must check the 233 /// result and handle operations that were not canceled or that have already 234 /// completed. 235 /// 236 /// # Examples 237 /// 238 /// Cancel an outstanding aio operation. Note that we must still call 239 /// `aio_return` to free resources, even though we don't care about the 240 /// result. 241 /// 242 /// ``` 243 /// # use nix::errno::Errno; 244 /// # use nix::Error; 245 /// # use nix::sys::aio::*; 246 /// # use nix::sys::signal::SigevNotify; 247 /// # use std::{thread, time}; 248 /// # use std::io::Write; 249 /// # use std::os::unix::io::AsRawFd; 250 /// # use tempfile::tempfile; 251 /// let wbuf = b"CDEF"; 252 /// let mut f = tempfile().unwrap(); 253 /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), 254 /// 2, //offset 255 /// &wbuf[..], 256 /// 0, //priority 257 /// SigevNotify::SigevNone)); 258 /// aiocb.as_mut().submit().unwrap(); 259 /// let cs = aiocb.as_mut().cancel().unwrap(); 260 /// if cs == AioCancelStat::AioNotCanceled { 261 /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { 262 /// thread::sleep(time::Duration::from_millis(10)); 263 /// } 264 /// } 265 /// // Must call `aio_return`, but ignore the result 266 /// let _ = aiocb.as_mut().aio_return(); 267 /// ``` 268 /// 269 /// # References 270 /// 271 /// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) 272 fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat>; 273 274 /// Retrieve error status of an asynchronous operation. 275 /// 276 /// If the request has not yet completed, returns `EINPROGRESS`. Otherwise, 277 /// returns `Ok` or any other error. 278 /// 279 /// # Examples 280 /// 281 /// Issue an aio operation and use `error` to poll for completion. Polling 282 /// is an alternative to `aio_suspend`, used by most of the other examples. 283 /// 284 /// ``` 285 /// # use nix::errno::Errno; 286 /// # use nix::Error; 287 /// # use nix::sys::aio::*; 288 /// # use nix::sys::signal::SigevNotify; 289 /// # use std::{thread, time}; 290 /// # use std::os::unix::io::AsRawFd; 291 /// # use tempfile::tempfile; 292 /// const WBUF: &[u8] = b"abcdef123456"; 293 /// let mut f = tempfile().unwrap(); 294 /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), 295 /// 2, //offset 296 /// WBUF, 297 /// 0, //priority 298 /// SigevNotify::SigevNone)); 299 /// aiocb.as_mut().submit().unwrap(); 300 /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { 301 /// thread::sleep(time::Duration::from_millis(10)); 302 /// } 303 /// assert_eq!(aiocb.as_mut().aio_return().unwrap(), WBUF.len()); 304 /// ``` 305 /// 306 /// # References 307 /// 308 /// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html) 309 fn error(self: Pin<&mut Self>) -> Result<()>; 310 311 /// Returns the underlying file descriptor associated with the operation. 312 fn fd(&self) -> RawFd; 313 314 /// Does this operation currently have any in-kernel state? 315 /// 316 /// Dropping an operation that does have in-kernel state constitutes a 317 /// resource leak. 318 /// 319 /// # Examples 320 /// 321 /// ``` 322 /// # use nix::errno::Errno; 323 /// # use nix::Error; 324 /// # use nix::sys::aio::*; 325 /// # use nix::sys::signal::SigevNotify::SigevNone; 326 /// # use std::{thread, time}; 327 /// # use std::os::unix::io::AsRawFd; 328 /// # use tempfile::tempfile; 329 /// let f = tempfile().unwrap(); 330 /// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, 331 /// 0, SigevNone)); 332 /// assert!(!aiof.as_mut().in_progress()); 333 /// aiof.as_mut().submit().expect("aio_fsync failed early"); 334 /// assert!(aiof.as_mut().in_progress()); 335 /// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { 336 /// thread::sleep(time::Duration::from_millis(10)); 337 /// } 338 /// aiof.as_mut().aio_return().expect("aio_fsync failed late"); 339 /// assert!(!aiof.as_mut().in_progress()); 340 /// ``` 341 fn in_progress(&self) -> bool; 342 343 /// Returns the priority of the `AioCb` 344 fn priority(&self) -> i32; 345 346 /// Update the notification settings for an existing AIO operation that has 347 /// not yet been submitted. 348 fn set_sigev_notify(&mut self, sev: SigevNotify); 349 350 /// Returns the `SigEvent` that will be used for notification. 351 fn sigevent(&self) -> SigEvent; 352 353 /// Actually start the I/O operation. 354 /// 355 /// After calling this method and until [`Aio::aio_return`] returns `Ok`, 356 /// the structure may not be moved in memory. 357 fn submit(self: Pin<&mut Self>) -> Result<()>; 358} 359 360macro_rules! aio_methods { 361 () => { 362 fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat> { 363 self.aiocb().cancel() 364 } 365 366 fn error(self: Pin<&mut Self>) -> Result<()> { 367 self.aiocb().error() 368 } 369 370 fn fd(&self) -> RawFd { 371 self.aiocb.aiocb.0.aio_fildes 372 } 373 374 fn in_progress(&self) -> bool { 375 self.aiocb.in_progress() 376 } 377 378 fn priority(&self) -> i32 { 379 self.aiocb.aiocb.0.aio_reqprio 380 } 381 382 fn set_sigev_notify(&mut self, sev: SigevNotify) { 383 self.aiocb.set_sigev_notify(sev) 384 } 385 386 fn sigevent(&self) -> SigEvent { 387 SigEvent::from(&self.aiocb.aiocb.0.aio_sigevent) 388 } 389 }; 390 ($func:ident) => { 391 aio_methods!(); 392 393 fn aio_return(self: Pin<&mut Self>) -> Result<<Self as Aio>::Output> { 394 self.aiocb().aio_return() 395 } 396 397 fn submit(mut self: Pin<&mut Self>) -> Result<()> { 398 let p: *mut libc::aiocb = &mut self.as_mut().aiocb().aiocb.0; 399 Errno::result({ unsafe { libc::$func(p) } }).map(|_| { 400 self.aiocb().set_in_progress(); 401 }) 402 } 403 }; 404} 405 406/// An asynchronous version of `fsync(2)`. 407/// 408/// # References 409/// 410/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) 411/// # Examples 412/// 413/// ``` 414/// # use nix::errno::Errno; 415/// # use nix::Error; 416/// # use nix::sys::aio::*; 417/// # use nix::sys::signal::SigevNotify::SigevNone; 418/// # use std::{thread, time}; 419/// # use std::os::unix::io::AsRawFd; 420/// # use tempfile::tempfile; 421/// let f = tempfile().unwrap(); 422/// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, 423/// 0, SigevNone)); 424/// aiof.as_mut().submit().expect("aio_fsync failed early"); 425/// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { 426/// thread::sleep(time::Duration::from_millis(10)); 427/// } 428/// aiof.as_mut().aio_return().expect("aio_fsync failed late"); 429/// ``` 430#[derive(Debug)] 431#[repr(transparent)] 432pub struct AioFsync { 433 aiocb: AioCb, 434 _pin: PhantomPinned, 435} 436 437impl AioFsync { 438 unsafe_pinned!(aiocb: AioCb); 439 440 /// Returns the operation's fsync mode: data and metadata or data only? 441 pub fn mode(&self) -> AioFsyncMode { 442 AioFsyncMode::try_from(self.aiocb.aiocb.0.aio_lio_opcode).unwrap() 443 } 444 445 /// Create a new `AioFsync`. 446 /// 447 /// # Arguments 448 /// 449 /// * `fd`: File descriptor to sync. 450 /// * `mode`: Whether to sync file metadata too, or just data. 451 /// * `prio`: If POSIX Prioritized IO is supported, then the 452 /// operation will be prioritized at the process's 453 /// priority level minus `prio`. 454 /// * `sigev_notify`: Determines how you will be notified of event 455 /// completion. 456 pub fn new( 457 fd: RawFd, 458 mode: AioFsyncMode, 459 prio: i32, 460 sigev_notify: SigevNotify, 461 ) -> Self { 462 let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); 463 // To save some memory, store mode in an unused field of the AioCb. 464 // True it isn't very much memory, but downstream creates will likely 465 // create an enum containing this and other AioCb variants and pack 466 // those enums into data structures like Vec, so it adds up. 467 aiocb.aiocb.0.aio_lio_opcode = mode as libc::c_int; 468 AioFsync { 469 aiocb, 470 _pin: PhantomPinned, 471 } 472 } 473} 474 475impl Aio for AioFsync { 476 type Output = (); 477 478 aio_methods!(); 479 480 fn aio_return(self: Pin<&mut Self>) -> Result<()> { 481 self.aiocb().aio_return().map(drop) 482 } 483 484 fn submit(mut self: Pin<&mut Self>) -> Result<()> { 485 let aiocb = &mut self.as_mut().aiocb().aiocb.0; 486 let mode = mem::replace(&mut aiocb.aio_lio_opcode, 0); 487 let p: *mut libc::aiocb = aiocb; 488 Errno::result(unsafe { libc::aio_fsync(mode, p) }).map(|_| { 489 self.aiocb().set_in_progress(); 490 }) 491 } 492} 493 494// AioFsync does not need AsMut, since it can't be used with lio_listio 495 496impl AsRef<libc::aiocb> for AioFsync { 497 fn as_ref(&self) -> &libc::aiocb { 498 &self.aiocb.aiocb.0 499 } 500} 501 502/// Asynchronously reads from a file descriptor into a buffer 503/// 504/// # References 505/// 506/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) 507/// 508/// # Examples 509/// 510/// 511/// ``` 512/// # use nix::errno::Errno; 513/// # use nix::Error; 514/// # use nix::sys::aio::*; 515/// # use nix::sys::signal::SigevNotify; 516/// # use std::{thread, time}; 517/// # use std::io::Write; 518/// # use std::os::unix::io::AsRawFd; 519/// # use tempfile::tempfile; 520/// const INITIAL: &[u8] = b"abcdef123456"; 521/// const LEN: usize = 4; 522/// let mut rbuf = vec![0; LEN]; 523/// let mut f = tempfile().unwrap(); 524/// f.write_all(INITIAL).unwrap(); 525/// { 526/// let mut aior = Box::pin( 527/// AioRead::new( 528/// f.as_raw_fd(), 529/// 2, //offset 530/// &mut rbuf, 531/// 0, //priority 532/// SigevNotify::SigevNone 533/// ) 534/// ); 535/// aior.as_mut().submit().unwrap(); 536/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { 537/// thread::sleep(time::Duration::from_millis(10)); 538/// } 539/// assert_eq!(aior.as_mut().aio_return().unwrap(), LEN); 540/// } 541/// assert_eq!(rbuf, b"cdef"); 542/// ``` 543#[derive(Debug)] 544#[repr(transparent)] 545pub struct AioRead<'a> { 546 aiocb: AioCb, 547 _data: PhantomData<&'a [u8]>, 548 _pin: PhantomPinned, 549} 550 551impl<'a> AioRead<'a> { 552 unsafe_pinned!(aiocb: AioCb); 553 554 /// Returns the requested length of the aio operation in bytes 555 /// 556 /// This method returns the *requested* length of the operation. To get the 557 /// number of bytes actually read or written by a completed operation, use 558 /// `aio_return` instead. 559 pub fn nbytes(&self) -> usize { 560 self.aiocb.aiocb.0.aio_nbytes 561 } 562 563 /// Create a new `AioRead`, placing the data in a mutable slice. 564 /// 565 /// # Arguments 566 /// 567 /// * `fd`: File descriptor to read from 568 /// * `offs`: File offset 569 /// * `buf`: A memory buffer. It must outlive the `AioRead`. 570 /// * `prio`: If POSIX Prioritized IO is supported, then the 571 /// operation will be prioritized at the process's 572 /// priority level minus `prio` 573 /// * `sigev_notify`: Determines how you will be notified of event 574 /// completion. 575 pub fn new( 576 fd: RawFd, 577 offs: off_t, 578 buf: &'a mut [u8], 579 prio: i32, 580 sigev_notify: SigevNotify, 581 ) -> Self { 582 let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); 583 aiocb.aiocb.0.aio_nbytes = buf.len(); 584 aiocb.aiocb.0.aio_buf = buf.as_mut_ptr() as *mut c_void; 585 aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READ; 586 aiocb.aiocb.0.aio_offset = offs; 587 AioRead { 588 aiocb, 589 _data: PhantomData, 590 _pin: PhantomPinned, 591 } 592 } 593 594 /// Returns the file offset of the operation. 595 pub fn offset(&self) -> off_t { 596 self.aiocb.aiocb.0.aio_offset 597 } 598} 599 600impl<'a> Aio for AioRead<'a> { 601 type Output = usize; 602 603 aio_methods!(aio_read); 604} 605 606impl<'a> AsMut<libc::aiocb> for AioRead<'a> { 607 fn as_mut(&mut self) -> &mut libc::aiocb { 608 &mut self.aiocb.aiocb.0 609 } 610} 611 612impl<'a> AsRef<libc::aiocb> for AioRead<'a> { 613 fn as_ref(&self) -> &libc::aiocb { 614 &self.aiocb.aiocb.0 615 } 616} 617 618/// Asynchronously reads from a file descriptor into a scatter/gather list of buffers. 619/// 620/// # References 621/// 622/// [aio_readv](https://www.freebsd.org/cgi/man.cgi?query=aio_readv) 623/// 624/// # Examples 625/// 626/// 627#[cfg_attr(fbsd14, doc = " ```")] 628#[cfg_attr(not(fbsd14), doc = " ```no_run")] 629/// # use nix::errno::Errno; 630/// # use nix::Error; 631/// # use nix::sys::aio::*; 632/// # use nix::sys::signal::SigevNotify; 633/// # use std::{thread, time}; 634/// # use std::io::{IoSliceMut, Write}; 635/// # use std::os::unix::io::AsRawFd; 636/// # use tempfile::tempfile; 637/// const INITIAL: &[u8] = b"abcdef123456"; 638/// let mut rbuf0 = vec![0; 4]; 639/// let mut rbuf1 = vec![0; 2]; 640/// let expected_len = rbuf0.len() + rbuf1.len(); 641/// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; 642/// let mut f = tempfile().unwrap(); 643/// f.write_all(INITIAL).unwrap(); 644/// { 645/// let mut aior = Box::pin( 646/// AioReadv::new( 647/// f.as_raw_fd(), 648/// 2, //offset 649/// &mut rbufs, 650/// 0, //priority 651/// SigevNotify::SigevNone 652/// ) 653/// ); 654/// aior.as_mut().submit().unwrap(); 655/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { 656/// thread::sleep(time::Duration::from_millis(10)); 657/// } 658/// assert_eq!(aior.as_mut().aio_return().unwrap(), expected_len); 659/// } 660/// assert_eq!(rbuf0, b"cdef"); 661/// assert_eq!(rbuf1, b"12"); 662/// ``` 663#[cfg(target_os = "freebsd")] 664#[derive(Debug)] 665#[repr(transparent)] 666pub struct AioReadv<'a> { 667 aiocb: AioCb, 668 _data: PhantomData<&'a [&'a [u8]]>, 669 _pin: PhantomPinned, 670} 671 672#[cfg(target_os = "freebsd")] 673impl<'a> AioReadv<'a> { 674 unsafe_pinned!(aiocb: AioCb); 675 676 /// Returns the number of buffers the operation will read into. 677 pub fn iovlen(&self) -> usize { 678 self.aiocb.aiocb.0.aio_nbytes 679 } 680 681 /// Create a new `AioReadv`, placing the data in a list of mutable slices. 682 /// 683 /// # Arguments 684 /// 685 /// * `fd`: File descriptor to read from 686 /// * `offs`: File offset 687 /// * `bufs`: A scatter/gather list of memory buffers. They must 688 /// outlive the `AioReadv`. 689 /// * `prio`: If POSIX Prioritized IO is supported, then the 690 /// operation will be prioritized at the process's 691 /// priority level minus `prio` 692 /// * `sigev_notify`: Determines how you will be notified of event 693 /// completion. 694 pub fn new( 695 fd: RawFd, 696 offs: off_t, 697 bufs: &mut [IoSliceMut<'a>], 698 prio: i32, 699 sigev_notify: SigevNotify, 700 ) -> Self { 701 let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); 702 // In vectored mode, aio_nbytes stores the length of the iovec array, 703 // not the byte count. 704 aiocb.aiocb.0.aio_nbytes = bufs.len(); 705 aiocb.aiocb.0.aio_buf = bufs.as_mut_ptr() as *mut c_void; 706 aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READV; 707 aiocb.aiocb.0.aio_offset = offs; 708 AioReadv { 709 aiocb, 710 _data: PhantomData, 711 _pin: PhantomPinned, 712 } 713 } 714 715 /// Returns the file offset of the operation. 716 pub fn offset(&self) -> off_t { 717 self.aiocb.aiocb.0.aio_offset 718 } 719} 720 721#[cfg(target_os = "freebsd")] 722impl<'a> Aio for AioReadv<'a> { 723 type Output = usize; 724 725 aio_methods!(aio_readv); 726} 727 728#[cfg(target_os = "freebsd")] 729impl<'a> AsMut<libc::aiocb> for AioReadv<'a> { 730 fn as_mut(&mut self) -> &mut libc::aiocb { 731 &mut self.aiocb.aiocb.0 732 } 733} 734 735#[cfg(target_os = "freebsd")] 736impl<'a> AsRef<libc::aiocb> for AioReadv<'a> { 737 fn as_ref(&self) -> &libc::aiocb { 738 &self.aiocb.aiocb.0 739 } 740} 741 742/// Asynchronously writes from a buffer to a file descriptor 743/// 744/// # References 745/// 746/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) 747/// 748/// # Examples 749/// 750/// ``` 751/// # use nix::errno::Errno; 752/// # use nix::Error; 753/// # use nix::sys::aio::*; 754/// # use nix::sys::signal::SigevNotify; 755/// # use std::{thread, time}; 756/// # use std::os::unix::io::AsRawFd; 757/// # use tempfile::tempfile; 758/// const WBUF: &[u8] = b"abcdef123456"; 759/// let mut f = tempfile().unwrap(); 760/// let mut aiow = Box::pin( 761/// AioWrite::new( 762/// f.as_raw_fd(), 763/// 2, //offset 764/// WBUF, 765/// 0, //priority 766/// SigevNotify::SigevNone 767/// ) 768/// ); 769/// aiow.as_mut().submit().unwrap(); 770/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { 771/// thread::sleep(time::Duration::from_millis(10)); 772/// } 773/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); 774/// ``` 775#[derive(Debug)] 776#[repr(transparent)] 777pub struct AioWrite<'a> { 778 aiocb: AioCb, 779 _data: PhantomData<&'a [u8]>, 780 _pin: PhantomPinned, 781} 782 783impl<'a> AioWrite<'a> { 784 unsafe_pinned!(aiocb: AioCb); 785 786 /// Returns the requested length of the aio operation in bytes 787 /// 788 /// This method returns the *requested* length of the operation. To get the 789 /// number of bytes actually read or written by a completed operation, use 790 /// `aio_return` instead. 791 pub fn nbytes(&self) -> usize { 792 self.aiocb.aiocb.0.aio_nbytes 793 } 794 795 /// Construct a new `AioWrite`. 796 /// 797 /// # Arguments 798 /// 799 /// * `fd`: File descriptor to write to 800 /// * `offs`: File offset 801 /// * `buf`: A memory buffer. It must outlive the `AioWrite`. 802 /// * `prio`: If POSIX Prioritized IO is supported, then the 803 /// operation will be prioritized at the process's 804 /// priority level minus `prio` 805 /// * `sigev_notify`: Determines how you will be notified of event 806 /// completion. 807 pub fn new( 808 fd: RawFd, 809 offs: off_t, 810 buf: &'a [u8], 811 prio: i32, 812 sigev_notify: SigevNotify, 813 ) -> Self { 814 let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); 815 aiocb.aiocb.0.aio_nbytes = buf.len(); 816 // casting an immutable buffer to a mutable pointer looks unsafe, 817 // but technically its only unsafe to dereference it, not to create 818 // it. Type Safety guarantees that we'll never pass aiocb to 819 // aio_read or aio_readv. 820 aiocb.aiocb.0.aio_buf = buf.as_ptr() as *mut c_void; 821 aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITE; 822 aiocb.aiocb.0.aio_offset = offs; 823 AioWrite { 824 aiocb, 825 _data: PhantomData, 826 _pin: PhantomPinned, 827 } 828 } 829 830 /// Returns the file offset of the operation. 831 pub fn offset(&self) -> off_t { 832 self.aiocb.aiocb.0.aio_offset 833 } 834} 835 836impl<'a> Aio for AioWrite<'a> { 837 type Output = usize; 838 839 aio_methods!(aio_write); 840} 841 842impl<'a> AsMut<libc::aiocb> for AioWrite<'a> { 843 fn as_mut(&mut self) -> &mut libc::aiocb { 844 &mut self.aiocb.aiocb.0 845 } 846} 847 848impl<'a> AsRef<libc::aiocb> for AioWrite<'a> { 849 fn as_ref(&self) -> &libc::aiocb { 850 &self.aiocb.aiocb.0 851 } 852} 853 854/// Asynchronously writes from a scatter/gather list of buffers to a file descriptor. 855/// 856/// # References 857/// 858/// [aio_writev](https://www.freebsd.org/cgi/man.cgi?query=aio_writev) 859/// 860/// # Examples 861/// 862#[cfg_attr(fbsd14, doc = " ```")] 863#[cfg_attr(not(fbsd14), doc = " ```no_run")] 864/// # use nix::errno::Errno; 865/// # use nix::Error; 866/// # use nix::sys::aio::*; 867/// # use nix::sys::signal::SigevNotify; 868/// # use std::{thread, time}; 869/// # use std::io::IoSlice; 870/// # use std::os::unix::io::AsRawFd; 871/// # use tempfile::tempfile; 872/// const wbuf0: &[u8] = b"abcdef"; 873/// const wbuf1: &[u8] = b"123456"; 874/// let len = wbuf0.len() + wbuf1.len(); 875/// let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; 876/// let mut f = tempfile().unwrap(); 877/// let mut aiow = Box::pin( 878/// AioWritev::new( 879/// f.as_raw_fd(), 880/// 2, //offset 881/// &wbufs, 882/// 0, //priority 883/// SigevNotify::SigevNone 884/// ) 885/// ); 886/// aiow.as_mut().submit().unwrap(); 887/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { 888/// thread::sleep(time::Duration::from_millis(10)); 889/// } 890/// assert_eq!(aiow.as_mut().aio_return().unwrap(), len); 891/// ``` 892#[cfg(target_os = "freebsd")] 893#[derive(Debug)] 894#[repr(transparent)] 895pub struct AioWritev<'a> { 896 aiocb: AioCb, 897 _data: PhantomData<&'a [&'a [u8]]>, 898 _pin: PhantomPinned, 899} 900 901#[cfg(target_os = "freebsd")] 902impl<'a> AioWritev<'a> { 903 unsafe_pinned!(aiocb: AioCb); 904 905 /// Returns the number of buffers the operation will read into. 906 pub fn iovlen(&self) -> usize { 907 self.aiocb.aiocb.0.aio_nbytes 908 } 909 910 /// Construct a new `AioWritev`. 911 /// 912 /// # Arguments 913 /// 914 /// * `fd`: File descriptor to write to 915 /// * `offs`: File offset 916 /// * `bufs`: A scatter/gather list of memory buffers. They must 917 /// outlive the `AioWritev`. 918 /// * `prio`: If POSIX Prioritized IO is supported, then the 919 /// operation will be prioritized at the process's 920 /// priority level minus `prio` 921 /// * `sigev_notify`: Determines how you will be notified of event 922 /// completion. 923 pub fn new( 924 fd: RawFd, 925 offs: off_t, 926 bufs: &[IoSlice<'a>], 927 prio: i32, 928 sigev_notify: SigevNotify, 929 ) -> Self { 930 let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); 931 // In vectored mode, aio_nbytes stores the length of the iovec array, 932 // not the byte count. 933 aiocb.aiocb.0.aio_nbytes = bufs.len(); 934 // casting an immutable buffer to a mutable pointer looks unsafe, 935 // but technically its only unsafe to dereference it, not to create 936 // it. Type Safety guarantees that we'll never pass aiocb to 937 // aio_read or aio_readv. 938 aiocb.aiocb.0.aio_buf = bufs.as_ptr() as *mut c_void; 939 aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITEV; 940 aiocb.aiocb.0.aio_offset = offs; 941 AioWritev { 942 aiocb, 943 _data: PhantomData, 944 _pin: PhantomPinned, 945 } 946 } 947 948 /// Returns the file offset of the operation. 949 pub fn offset(&self) -> off_t { 950 self.aiocb.aiocb.0.aio_offset 951 } 952} 953 954#[cfg(target_os = "freebsd")] 955impl<'a> Aio for AioWritev<'a> { 956 type Output = usize; 957 958 aio_methods!(aio_writev); 959} 960 961#[cfg(target_os = "freebsd")] 962impl<'a> AsMut<libc::aiocb> for AioWritev<'a> { 963 fn as_mut(&mut self) -> &mut libc::aiocb { 964 &mut self.aiocb.aiocb.0 965 } 966} 967 968#[cfg(target_os = "freebsd")] 969impl<'a> AsRef<libc::aiocb> for AioWritev<'a> { 970 fn as_ref(&self) -> &libc::aiocb { 971 &self.aiocb.aiocb.0 972 } 973} 974 975/// Cancels outstanding AIO requests for a given file descriptor. 976/// 977/// # Examples 978/// 979/// Issue an aio operation, then cancel all outstanding operations on that file 980/// descriptor. 981/// 982/// ``` 983/// # use nix::errno::Errno; 984/// # use nix::Error; 985/// # use nix::sys::aio::*; 986/// # use nix::sys::signal::SigevNotify; 987/// # use std::{thread, time}; 988/// # use std::io::Write; 989/// # use std::os::unix::io::AsRawFd; 990/// # use tempfile::tempfile; 991/// let wbuf = b"CDEF"; 992/// let mut f = tempfile().unwrap(); 993/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), 994/// 2, //offset 995/// &wbuf[..], 996/// 0, //priority 997/// SigevNotify::SigevNone)); 998/// aiocb.as_mut().submit().unwrap(); 999/// let cs = aio_cancel_all(f.as_raw_fd()).unwrap(); 1000/// if cs == AioCancelStat::AioNotCanceled { 1001/// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { 1002/// thread::sleep(time::Duration::from_millis(10)); 1003/// } 1004/// } 1005/// // Must call `aio_return`, but ignore the result 1006/// let _ = aiocb.as_mut().aio_return(); 1007/// ``` 1008/// 1009/// # References 1010/// 1011/// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) 1012pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> { 1013 match unsafe { libc::aio_cancel(fd, ptr::null_mut()) } { 1014 libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), 1015 libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), 1016 libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), 1017 -1 => Err(Errno::last()), 1018 _ => panic!("unknown aio_cancel return value"), 1019 } 1020} 1021 1022/// Suspends the calling process until at least one of the specified operations 1023/// have completed, a signal is delivered, or the timeout has passed. 1024/// 1025/// If `timeout` is `None`, `aio_suspend` will block indefinitely. 1026/// 1027/// # Examples 1028/// 1029/// Use `aio_suspend` to block until an aio operation completes. 1030/// 1031/// ``` 1032/// # use nix::sys::aio::*; 1033/// # use nix::sys::signal::SigevNotify; 1034/// # use std::os::unix::io::AsRawFd; 1035/// # use tempfile::tempfile; 1036/// const WBUF: &[u8] = b"abcdef123456"; 1037/// let mut f = tempfile().unwrap(); 1038/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), 1039/// 2, //offset 1040/// WBUF, 1041/// 0, //priority 1042/// SigevNotify::SigevNone)); 1043/// aiocb.as_mut().submit().unwrap(); 1044/// aio_suspend(&[&*aiocb], None).expect("aio_suspend failed"); 1045/// assert_eq!(aiocb.as_mut().aio_return().unwrap() as usize, WBUF.len()); 1046/// ``` 1047/// # References 1048/// 1049/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html) 1050pub fn aio_suspend( 1051 list: &[&dyn AsRef<libc::aiocb>], 1052 timeout: Option<TimeSpec>, 1053) -> Result<()> { 1054 let p = list as *const [&dyn AsRef<libc::aiocb>] 1055 as *const [*const libc::aiocb] as *const *const libc::aiocb; 1056 let timep = match timeout { 1057 None => ptr::null::<libc::timespec>(), 1058 Some(x) => x.as_ref() as *const libc::timespec, 1059 }; 1060 Errno::result(unsafe { libc::aio_suspend(p, list.len() as i32, timep) }) 1061 .map(drop) 1062} 1063 1064/// Submits multiple asynchronous I/O requests with a single system call. 1065/// 1066/// They are not guaranteed to complete atomically, and the order in which the 1067/// requests are carried out is not specified. Reads, and writes may be freely 1068/// mixed. 1069/// 1070/// # Examples 1071/// 1072/// Use `lio_listio` to submit an aio operation and wait for its completion. In 1073/// this case, there is no need to use aio_suspend to wait or `error` to poll. 1074/// This mode is useful for otherwise-synchronous programs that want to execute 1075/// a handful of I/O operations in parallel. 1076/// ``` 1077/// # use std::os::unix::io::AsRawFd; 1078/// # use nix::sys::aio::*; 1079/// # use nix::sys::signal::SigevNotify; 1080/// # use tempfile::tempfile; 1081/// const WBUF: &[u8] = b"abcdef123456"; 1082/// let mut f = tempfile().unwrap(); 1083/// let mut aiow = Box::pin(AioWrite::new( 1084/// f.as_raw_fd(), 1085/// 2, // offset 1086/// WBUF, 1087/// 0, // priority 1088/// SigevNotify::SigevNone 1089/// )); 1090/// lio_listio(LioMode::LIO_WAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) 1091/// .unwrap(); 1092/// // At this point, we are guaranteed that aiow is complete. 1093/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); 1094/// ``` 1095/// 1096/// Use `lio_listio` to submit multiple asynchronous operations with a single 1097/// syscall, but receive notification individually. This is an efficient 1098/// technique for reducing overall context-switch overhead, especially when 1099/// combined with kqueue. 1100/// ``` 1101/// # use std::os::unix::io::AsRawFd; 1102/// # use std::thread; 1103/// # use std::time; 1104/// # use nix::errno::Errno; 1105/// # use nix::sys::aio::*; 1106/// # use nix::sys::signal::SigevNotify; 1107/// # use tempfile::tempfile; 1108/// const WBUF: &[u8] = b"abcdef123456"; 1109/// let mut f = tempfile().unwrap(); 1110/// let mut aiow = Box::pin(AioWrite::new( 1111/// f.as_raw_fd(), 1112/// 2, // offset 1113/// WBUF, 1114/// 0, // priority 1115/// SigevNotify::SigevNone 1116/// )); 1117/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) 1118/// .unwrap(); 1119/// // We must wait for the completion of each individual operation 1120/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { 1121/// thread::sleep(time::Duration::from_millis(10)); 1122/// } 1123/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); 1124/// ``` 1125/// 1126/// Use `lio_listio` to submit multiple operations, and receive notification 1127/// only when all of them are complete. This can be useful when there is some 1128/// logical relationship between the operations. But beware! Errors or system 1129/// resource limitations may cause `lio_listio` to return `EIO`, `EAGAIN`, or 1130/// `EINTR`, in which case some but not all operations may have been submitted. 1131/// In that case, you must check the status of each individual operation, and 1132/// possibly resubmit some. 1133/// ``` 1134/// # use libc::c_int; 1135/// # use std::os::unix::io::AsRawFd; 1136/// # use std::sync::atomic::{AtomicBool, Ordering}; 1137/// # use std::thread; 1138/// # use std::time; 1139/// # use lazy_static::lazy_static; 1140/// # use nix::errno::Errno; 1141/// # use nix::sys::aio::*; 1142/// # use nix::sys::signal::*; 1143/// # use tempfile::tempfile; 1144/// lazy_static! { 1145/// pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); 1146/// } 1147/// 1148/// extern fn sigfunc(_: c_int) { 1149/// SIGNALED.store(true, Ordering::Relaxed); 1150/// } 1151/// let sa = SigAction::new(SigHandler::Handler(sigfunc), 1152/// SaFlags::SA_RESETHAND, 1153/// SigSet::empty()); 1154/// SIGNALED.store(false, Ordering::Relaxed); 1155/// unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); 1156/// 1157/// const WBUF: &[u8] = b"abcdef123456"; 1158/// let mut f = tempfile().unwrap(); 1159/// let mut aiow = Box::pin(AioWrite::new( 1160/// f.as_raw_fd(), 1161/// 2, // offset 1162/// WBUF, 1163/// 0, // priority 1164/// SigevNotify::SigevNone 1165/// )); 1166/// let sev = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, si_value: 0 }; 1167/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], sev).unwrap(); 1168/// while !SIGNALED.load(Ordering::Relaxed) { 1169/// thread::sleep(time::Duration::from_millis(10)); 1170/// } 1171/// // At this point, since `lio_listio` returned success and delivered its 1172/// // notification, we know that all operations are complete. 1173/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); 1174/// ``` 1175pub fn lio_listio( 1176 mode: LioMode, 1177 list: &mut [Pin<&mut dyn AsMut<libc::aiocb>>], 1178 sigev_notify: SigevNotify, 1179) -> Result<()> { 1180 let p = list as *mut [Pin<&mut dyn AsMut<libc::aiocb>>] 1181 as *mut [*mut libc::aiocb] as *mut *mut libc::aiocb; 1182 let sigev = SigEvent::new(sigev_notify); 1183 let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; 1184 Errno::result(unsafe { 1185 libc::lio_listio(mode as i32, p, list.len() as i32, sigevp) 1186 }) 1187 .map(drop) 1188} 1189 1190#[cfg(test)] 1191mod t { 1192 use super::*; 1193 1194 /// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb 1195 /// pointers. This test ensures that such casts are valid. 1196 #[test] 1197 fn casting() { 1198 let sev = SigevNotify::SigevNone; 1199 let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev); 1200 assert_eq!( 1201 aiof.as_ref() as *const libc::aiocb, 1202 &aiof as *const AioFsync as *const libc::aiocb 1203 ); 1204 1205 let mut rbuf = []; 1206 let aior = AioRead::new(666, 0, &mut rbuf, 0, sev); 1207 assert_eq!( 1208 aior.as_ref() as *const libc::aiocb, 1209 &aior as *const AioRead as *const libc::aiocb 1210 ); 1211 1212 let wbuf = []; 1213 let aiow = AioWrite::new(666, 0, &wbuf, 0, sev); 1214 assert_eq!( 1215 aiow.as_ref() as *const libc::aiocb, 1216 &aiow as *const AioWrite as *const libc::aiocb 1217 ); 1218 } 1219 1220 #[cfg(target_os = "freebsd")] 1221 #[test] 1222 fn casting_vectored() { 1223 let sev = SigevNotify::SigevNone; 1224 1225 let mut rbuf = []; 1226 let mut rbufs = [IoSliceMut::new(&mut rbuf)]; 1227 let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev); 1228 assert_eq!( 1229 aiorv.as_ref() as *const libc::aiocb, 1230 &aiorv as *const AioReadv as *const libc::aiocb 1231 ); 1232 1233 let wbuf = []; 1234 let wbufs = [IoSlice::new(&wbuf)]; 1235 let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev); 1236 assert_eq!( 1237 aiowv.as_ref() as *const libc::aiocb, 1238 &aiowv as *const AioWritev as *const libc::aiocb 1239 ); 1240 } 1241} 1242