1use std::{ 2 io::{Read, Seek, Write}, 3 ops::Deref, 4 os::unix::io::AsRawFd, 5 pin::Pin, 6 sync::atomic::{AtomicBool, Ordering}, 7 thread, time, 8}; 9 10use libc::c_int; 11use nix::{ 12 errno::*, 13 sys::{ 14 aio::*, 15 signal::{ 16 sigaction, SaFlags, SigAction, SigHandler, SigSet, SigevNotify, 17 Signal, 18 }, 19 time::{TimeSpec, TimeValLike}, 20 }, 21}; 22use tempfile::tempfile; 23 24lazy_static! { 25 pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); 26} 27 28extern "C" fn sigfunc(_: c_int) { 29 SIGNALED.store(true, Ordering::Relaxed); 30} 31 32// Helper that polls an AioCb for completion or error 33macro_rules! poll_aio { 34 ($aiocb: expr) => { 35 loop { 36 let err = $aiocb.as_mut().error(); 37 if err != Err(Errno::EINPROGRESS) { 38 break err; 39 }; 40 thread::sleep(time::Duration::from_millis(10)); 41 } 42 }; 43} 44 45mod aio_fsync { 46 use super::*; 47 48 #[test] 49 fn test_accessors() { 50 let aiocb = AioFsync::new( 51 1001, 52 AioFsyncMode::O_SYNC, 53 42, 54 SigevNotify::SigevSignal { 55 signal: Signal::SIGUSR2, 56 si_value: 99, 57 }, 58 ); 59 assert_eq!(1001, aiocb.fd()); 60 assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode()); 61 assert_eq!(42, aiocb.priority()); 62 let sev = aiocb.sigevent().sigevent(); 63 assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); 64 assert_eq!(99, sev.sigev_value.sival_ptr as i64); 65 } 66 67 /// `AioFsync::submit` should not modify the `AioCb` object if 68 /// `libc::aio_fsync` returns an error 69 // Skip on Linux, because Linux's AIO implementation can't detect errors 70 // synchronously 71 #[test] 72 #[cfg(any(target_os = "freebsd", target_os = "macos"))] 73 fn error() { 74 use std::mem; 75 76 const INITIAL: &[u8] = b"abcdef123456"; 77 // Create an invalid AioFsyncMode 78 let mode = unsafe { mem::transmute(666) }; 79 let mut f = tempfile().unwrap(); 80 f.write_all(INITIAL).unwrap(); 81 let mut aiof = Box::pin(AioFsync::new( 82 f.as_raw_fd(), 83 mode, 84 0, 85 SigevNotify::SigevNone, 86 )); 87 let err = aiof.as_mut().submit(); 88 err.expect_err("assertion failed"); 89 } 90 91 #[test] 92 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 93 fn ok() { 94 const INITIAL: &[u8] = b"abcdef123456"; 95 let mut f = tempfile().unwrap(); 96 f.write_all(INITIAL).unwrap(); 97 let fd = f.as_raw_fd(); 98 let mut aiof = Box::pin(AioFsync::new( 99 fd, 100 AioFsyncMode::O_SYNC, 101 0, 102 SigevNotify::SigevNone, 103 )); 104 aiof.as_mut().submit().unwrap(); 105 poll_aio!(&mut aiof).unwrap(); 106 aiof.as_mut().aio_return().unwrap(); 107 } 108} 109 110mod aio_read { 111 use super::*; 112 113 #[test] 114 fn test_accessors() { 115 let mut rbuf = vec![0; 4]; 116 let aiocb = AioRead::new( 117 1001, 118 2, //offset 119 &mut rbuf, 120 42, //priority 121 SigevNotify::SigevSignal { 122 signal: Signal::SIGUSR2, 123 si_value: 99, 124 }, 125 ); 126 assert_eq!(1001, aiocb.fd()); 127 assert_eq!(4, aiocb.nbytes()); 128 assert_eq!(2, aiocb.offset()); 129 assert_eq!(42, aiocb.priority()); 130 let sev = aiocb.sigevent().sigevent(); 131 assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); 132 assert_eq!(99, sev.sigev_value.sival_ptr as i64); 133 } 134 135 // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, 136 // only our bindings. So it's sufficient to check that cancel 137 // returned any AioCancelStat value. 138 #[test] 139 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 140 fn cancel() { 141 const INITIAL: &[u8] = b"abcdef123456"; 142 let mut rbuf = vec![0; 4]; 143 let mut f = tempfile().unwrap(); 144 f.write_all(INITIAL).unwrap(); 145 let fd = f.as_raw_fd(); 146 let mut aior = 147 Box::pin(AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone)); 148 aior.as_mut().submit().unwrap(); 149 150 aior.as_mut().cancel().unwrap(); 151 152 // Wait for aiow to complete, but don't care whether it succeeded 153 let _ = poll_aio!(&mut aior); 154 let _ = aior.as_mut().aio_return(); 155 } 156 157 /// `AioRead::submit` should not modify the `AioCb` object if 158 /// `libc::aio_read` returns an error 159 // Skip on Linux, because Linux's AIO implementation can't detect errors 160 // synchronously 161 #[test] 162 #[cfg(any(target_os = "freebsd", target_os = "macos"))] 163 fn error() { 164 const INITIAL: &[u8] = b"abcdef123456"; 165 let mut rbuf = vec![0; 4]; 166 let mut f = tempfile().unwrap(); 167 f.write_all(INITIAL).unwrap(); 168 let mut aior = Box::pin(AioRead::new( 169 f.as_raw_fd(), 170 -1, //an invalid offset 171 &mut rbuf, 172 0, //priority 173 SigevNotify::SigevNone, 174 )); 175 aior.as_mut().submit().expect_err("assertion failed"); 176 } 177 178 // Test a simple aio operation with no completion notification. We must 179 // poll for completion 180 #[test] 181 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 182 fn ok() { 183 const INITIAL: &[u8] = b"abcdef123456"; 184 let mut rbuf = vec![0; 4]; 185 const EXPECT: &[u8] = b"cdef"; 186 let mut f = tempfile().unwrap(); 187 f.write_all(INITIAL).unwrap(); 188 { 189 let fd = f.as_raw_fd(); 190 let mut aior = Box::pin(AioRead::new( 191 fd, 192 2, 193 &mut rbuf, 194 0, 195 SigevNotify::SigevNone, 196 )); 197 aior.as_mut().submit().unwrap(); 198 199 let err = poll_aio!(&mut aior); 200 assert_eq!(err, Ok(())); 201 assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); 202 } 203 assert_eq!(EXPECT, rbuf.deref().deref()); 204 } 205 206 // Like ok, but allocates the structure on the stack. 207 #[test] 208 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 209 fn on_stack() { 210 const INITIAL: &[u8] = b"abcdef123456"; 211 let mut rbuf = vec![0; 4]; 212 const EXPECT: &[u8] = b"cdef"; 213 let mut f = tempfile().unwrap(); 214 f.write_all(INITIAL).unwrap(); 215 { 216 let fd = f.as_raw_fd(); 217 let mut aior = 218 AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone); 219 let mut aior = unsafe { Pin::new_unchecked(&mut aior) }; 220 aior.as_mut().submit().unwrap(); 221 222 let err = poll_aio!(&mut aior); 223 assert_eq!(err, Ok(())); 224 assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); 225 } 226 assert_eq!(EXPECT, rbuf.deref().deref()); 227 } 228} 229 230#[cfg(target_os = "freebsd")] 231#[cfg(fbsd14)] 232mod aio_readv { 233 use std::io::IoSliceMut; 234 235 use super::*; 236 237 #[test] 238 fn test_accessors() { 239 let mut rbuf0 = vec![0; 4]; 240 let mut rbuf1 = vec![0; 8]; 241 let mut rbufs = 242 [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; 243 let aiocb = AioReadv::new( 244 1001, 245 2, //offset 246 &mut rbufs, 247 42, //priority 248 SigevNotify::SigevSignal { 249 signal: Signal::SIGUSR2, 250 si_value: 99, 251 }, 252 ); 253 assert_eq!(1001, aiocb.fd()); 254 assert_eq!(2, aiocb.iovlen()); 255 assert_eq!(2, aiocb.offset()); 256 assert_eq!(42, aiocb.priority()); 257 let sev = aiocb.sigevent().sigevent(); 258 assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); 259 assert_eq!(99, sev.sigev_value.sival_ptr as i64); 260 } 261 262 #[test] 263 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 264 fn ok() { 265 const INITIAL: &[u8] = b"abcdef123456"; 266 let mut rbuf0 = vec![0; 4]; 267 let mut rbuf1 = vec![0; 2]; 268 let mut rbufs = 269 [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; 270 const EXPECT0: &[u8] = b"cdef"; 271 const EXPECT1: &[u8] = b"12"; 272 let mut f = tempfile().unwrap(); 273 f.write_all(INITIAL).unwrap(); 274 { 275 let fd = f.as_raw_fd(); 276 let mut aior = Box::pin(AioReadv::new( 277 fd, 278 2, 279 &mut rbufs, 280 0, 281 SigevNotify::SigevNone, 282 )); 283 aior.as_mut().submit().unwrap(); 284 285 let err = poll_aio!(&mut aior); 286 assert_eq!(err, Ok(())); 287 assert_eq!( 288 aior.as_mut().aio_return().unwrap(), 289 EXPECT0.len() + EXPECT1.len() 290 ); 291 } 292 assert_eq!(&EXPECT0, &rbuf0); 293 assert_eq!(&EXPECT1, &rbuf1); 294 } 295} 296 297mod aio_write { 298 use super::*; 299 300 #[test] 301 fn test_accessors() { 302 let wbuf = vec![0; 4]; 303 let aiocb = AioWrite::new( 304 1001, 305 2, //offset 306 &wbuf, 307 42, //priority 308 SigevNotify::SigevSignal { 309 signal: Signal::SIGUSR2, 310 si_value: 99, 311 }, 312 ); 313 assert_eq!(1001, aiocb.fd()); 314 assert_eq!(4, aiocb.nbytes()); 315 assert_eq!(2, aiocb.offset()); 316 assert_eq!(42, aiocb.priority()); 317 let sev = aiocb.sigevent().sigevent(); 318 assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); 319 assert_eq!(99, sev.sigev_value.sival_ptr as i64); 320 } 321 322 // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, 323 // only our bindings. So it's sufficient to check that cancel 324 // returned any AioCancelStat value. 325 #[test] 326 #[cfg_attr(target_env = "musl", ignore)] 327 fn cancel() { 328 let wbuf: &[u8] = b"CDEF"; 329 330 let f = tempfile().unwrap(); 331 let mut aiow = Box::pin(AioWrite::new( 332 f.as_raw_fd(), 333 0, 334 wbuf, 335 0, 336 SigevNotify::SigevNone, 337 )); 338 aiow.as_mut().submit().unwrap(); 339 let err = aiow.as_mut().error(); 340 assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); 341 342 aiow.as_mut().cancel().unwrap(); 343 344 // Wait for aiow to complete, but don't care whether it succeeded 345 let _ = poll_aio!(&mut aiow); 346 let _ = aiow.as_mut().aio_return(); 347 } 348 349 // Test a simple aio operation with no completion notification. We must 350 // poll for completion. 351 #[test] 352 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 353 fn ok() { 354 const INITIAL: &[u8] = b"abcdef123456"; 355 let wbuf = "CDEF".to_string().into_bytes(); 356 let mut rbuf = Vec::new(); 357 const EXPECT: &[u8] = b"abCDEF123456"; 358 359 let mut f = tempfile().unwrap(); 360 f.write_all(INITIAL).unwrap(); 361 let mut aiow = Box::pin(AioWrite::new( 362 f.as_raw_fd(), 363 2, 364 &wbuf, 365 0, 366 SigevNotify::SigevNone, 367 )); 368 aiow.as_mut().submit().unwrap(); 369 370 let err = poll_aio!(&mut aiow); 371 assert_eq!(err, Ok(())); 372 assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); 373 374 f.rewind().unwrap(); 375 let len = f.read_to_end(&mut rbuf).unwrap(); 376 assert_eq!(len, EXPECT.len()); 377 assert_eq!(rbuf, EXPECT); 378 } 379 380 // Like ok, but allocates the structure on the stack. 381 #[test] 382 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 383 fn on_stack() { 384 const INITIAL: &[u8] = b"abcdef123456"; 385 let wbuf = "CDEF".to_string().into_bytes(); 386 let mut rbuf = Vec::new(); 387 const EXPECT: &[u8] = b"abCDEF123456"; 388 389 let mut f = tempfile().unwrap(); 390 f.write_all(INITIAL).unwrap(); 391 let mut aiow = AioWrite::new( 392 f.as_raw_fd(), 393 2, //offset 394 &wbuf, 395 0, //priority 396 SigevNotify::SigevNone, 397 ); 398 let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) }; 399 aiow.as_mut().submit().unwrap(); 400 401 let err = poll_aio!(&mut aiow); 402 assert_eq!(err, Ok(())); 403 assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); 404 405 f.rewind().unwrap(); 406 let len = f.read_to_end(&mut rbuf).unwrap(); 407 assert_eq!(len, EXPECT.len()); 408 assert_eq!(rbuf, EXPECT); 409 } 410 411 /// `AioWrite::write` should not modify the `AioCb` object if 412 /// `libc::aio_write` returns an error. 413 // Skip on Linux, because Linux's AIO implementation can't detect errors 414 // synchronously 415 #[test] 416 #[cfg(any(target_os = "freebsd", target_os = "macos"))] 417 fn error() { 418 let wbuf = "CDEF".to_string().into_bytes(); 419 let mut aiow = Box::pin(AioWrite::new( 420 666, // An invalid file descriptor 421 0, //offset 422 &wbuf, 423 0, //priority 424 SigevNotify::SigevNone, 425 )); 426 aiow.as_mut().submit().expect_err("assertion failed"); 427 // Dropping the AioWrite at this point should not panic 428 } 429} 430 431#[cfg(target_os = "freebsd")] 432#[cfg(fbsd14)] 433mod aio_writev { 434 use std::io::IoSlice; 435 436 use super::*; 437 438 #[test] 439 fn test_accessors() { 440 let wbuf0 = vec![0; 4]; 441 let wbuf1 = vec![0; 8]; 442 let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)]; 443 let aiocb = AioWritev::new( 444 1001, 445 2, //offset 446 &wbufs, 447 42, //priority 448 SigevNotify::SigevSignal { 449 signal: Signal::SIGUSR2, 450 si_value: 99, 451 }, 452 ); 453 assert_eq!(1001, aiocb.fd()); 454 assert_eq!(2, aiocb.iovlen()); 455 assert_eq!(2, aiocb.offset()); 456 assert_eq!(42, aiocb.priority()); 457 let sev = aiocb.sigevent().sigevent(); 458 assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); 459 assert_eq!(99, sev.sigev_value.sival_ptr as i64); 460 } 461 462 // Test a simple aio operation with no completion notification. We must 463 // poll for completion. 464 #[test] 465 #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] 466 fn ok() { 467 const INITIAL: &[u8] = b"abcdef123456"; 468 let wbuf0 = b"BC"; 469 let wbuf1 = b"DEF"; 470 let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; 471 let wlen = wbuf0.len() + wbuf1.len(); 472 let mut rbuf = Vec::new(); 473 const EXPECT: &[u8] = b"aBCDEF123456"; 474 475 let mut f = tempfile().unwrap(); 476 f.write_all(INITIAL).unwrap(); 477 let mut aiow = Box::pin(AioWritev::new( 478 f.as_raw_fd(), 479 1, 480 &wbufs, 481 0, 482 SigevNotify::SigevNone, 483 )); 484 aiow.as_mut().submit().unwrap(); 485 486 let err = poll_aio!(&mut aiow); 487 assert_eq!(err, Ok(())); 488 assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen); 489 490 f.rewind().unwrap(); 491 let len = f.read_to_end(&mut rbuf).unwrap(); 492 assert_eq!(len, EXPECT.len()); 493 assert_eq!(rbuf, EXPECT); 494 } 495} 496 497// Test an aio operation with completion delivered by a signal 498#[test] 499#[cfg_attr( 500 any( 501 all(target_env = "musl", target_arch = "x86_64"), 502 target_arch = "mips", 503 target_arch = "mips64" 504 ), 505 ignore 506)] 507fn sigev_signal() { 508 let _m = crate::SIGNAL_MTX.lock(); 509 let sa = SigAction::new( 510 SigHandler::Handler(sigfunc), 511 SaFlags::SA_RESETHAND, 512 SigSet::empty(), 513 ); 514 SIGNALED.store(false, Ordering::Relaxed); 515 unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); 516 517 const INITIAL: &[u8] = b"abcdef123456"; 518 const WBUF: &[u8] = b"CDEF"; 519 let mut rbuf = Vec::new(); 520 const EXPECT: &[u8] = b"abCDEF123456"; 521 522 let mut f = tempfile().unwrap(); 523 f.write_all(INITIAL).unwrap(); 524 let mut aiow = Box::pin(AioWrite::new( 525 f.as_raw_fd(), 526 2, //offset 527 WBUF, 528 0, //priority 529 SigevNotify::SigevSignal { 530 signal: Signal::SIGUSR2, 531 si_value: 0, //TODO: validate in sigfunc 532 }, 533 )); 534 aiow.as_mut().submit().unwrap(); 535 while !SIGNALED.load(Ordering::Relaxed) { 536 thread::sleep(time::Duration::from_millis(10)); 537 } 538 539 assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); 540 f.rewind().unwrap(); 541 let len = f.read_to_end(&mut rbuf).unwrap(); 542 assert_eq!(len, EXPECT.len()); 543 assert_eq!(rbuf, EXPECT); 544} 545 546// Tests using aio_cancel_all for all outstanding IOs. 547#[test] 548#[cfg_attr(target_env = "musl", ignore)] 549fn test_aio_cancel_all() { 550 let wbuf: &[u8] = b"CDEF"; 551 552 let f = tempfile().unwrap(); 553 let mut aiocb = Box::pin(AioWrite::new( 554 f.as_raw_fd(), 555 0, //offset 556 wbuf, 557 0, //priority 558 SigevNotify::SigevNone, 559 )); 560 aiocb.as_mut().submit().unwrap(); 561 let err = aiocb.as_mut().error(); 562 assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); 563 564 aio_cancel_all(f.as_raw_fd()).unwrap(); 565 566 // Wait for aiocb to complete, but don't care whether it succeeded 567 let _ = poll_aio!(&mut aiocb); 568 let _ = aiocb.as_mut().aio_return(); 569} 570 571#[test] 572// On Cirrus on Linux, this test fails due to a glibc bug. 573// https://github.com/nix-rust/nix/issues/1099 574#[cfg_attr(target_os = "linux", ignore)] 575// On Cirrus, aio_suspend is failing with EINVAL 576// https://github.com/nix-rust/nix/issues/1361 577#[cfg_attr(target_os = "macos", ignore)] 578fn test_aio_suspend() { 579 const INITIAL: &[u8] = b"abcdef123456"; 580 const WBUF: &[u8] = b"CDEFG"; 581 let timeout = TimeSpec::seconds(10); 582 let mut rbuf = vec![0; 4]; 583 let rlen = rbuf.len(); 584 let mut f = tempfile().unwrap(); 585 f.write_all(INITIAL).unwrap(); 586 587 let mut wcb = Box::pin(AioWrite::new( 588 f.as_raw_fd(), 589 2, //offset 590 WBUF, 591 0, //priority 592 SigevNotify::SigevNone, 593 )); 594 595 let mut rcb = Box::pin(AioRead::new( 596 f.as_raw_fd(), 597 8, //offset 598 &mut rbuf, 599 0, //priority 600 SigevNotify::SigevNone, 601 )); 602 wcb.as_mut().submit().unwrap(); 603 rcb.as_mut().submit().unwrap(); 604 loop { 605 { 606 let cbbuf = [ 607 &*wcb as &dyn AsRef<libc::aiocb>, 608 &*rcb as &dyn AsRef<libc::aiocb>, 609 ]; 610 let r = aio_suspend(&cbbuf[..], Some(timeout)); 611 match r { 612 Err(Errno::EINTR) => continue, 613 Err(e) => panic!("aio_suspend returned {:?}", e), 614 Ok(_) => (), 615 }; 616 } 617 if rcb.as_mut().error() != Err(Errno::EINPROGRESS) 618 && wcb.as_mut().error() != Err(Errno::EINPROGRESS) 619 { 620 break; 621 } 622 } 623 624 assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len()); 625 assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen); 626} 627