1#![cfg(any(target_os = "android", target_os = "linux"))] 2 3use rustix::fd::{AsFd, OwnedFd}; 4use rustix::io::epoll::{self, Epoll}; 5use rustix::io::{ioctl_fionbio, read, write}; 6use rustix::net::{ 7 accept, bind_v4, connect_v4, getsockname, listen, socket, AddressFamily, Ipv4Addr, Protocol, 8 SocketAddrAny, SocketAddrV4, SocketType, 9}; 10use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 11use std::sync::{Arc, Condvar, Mutex}; 12use std::thread; 13 14const BUFFER_SIZE: usize = 20; 15 16fn server(ready: Arc<(Mutex<u16>, Condvar)>) { 17 let listen_sock = socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap(); 18 bind_v4(&listen_sock, &SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap(); 19 listen(&listen_sock, 1).unwrap(); 20 21 let who = match getsockname(&listen_sock).unwrap() { 22 SocketAddrAny::V4(addr) => addr, 23 _ => panic!(), 24 }; 25 26 { 27 let (lock, cvar) = &*ready; 28 let mut port = lock.lock().unwrap(); 29 *port = who.port(); 30 cvar.notify_all(); 31 } 32 33 let epoll = Epoll::new(epoll::CreateFlags::CLOEXEC, epoll::Owning::<OwnedFd>::new()).unwrap(); 34 35 // Test into conversions. 36 let fd: OwnedFd = epoll.into(); 37 let epoll: Epoll<epoll::Owning<OwnedFd>> = fd.into(); 38 let fd: RawFd = epoll.into_raw_fd(); 39 let epoll = unsafe { Epoll::<epoll::Owning<OwnedFd>>::from_raw_fd(fd) }; 40 41 let raw_listen_sock = listen_sock.as_fd().as_raw_fd(); 42 epoll.add(listen_sock, epoll::EventFlags::IN).unwrap(); 43 44 let mut event_list = epoll::EventVec::with_capacity(4); 45 loop { 46 epoll.wait(&mut event_list, -1).unwrap(); 47 for (_event_flags, target) in &event_list { 48 if target.as_raw_fd() == raw_listen_sock { 49 let conn_sock = accept(&*target).unwrap(); 50 ioctl_fionbio(&conn_sock, true).unwrap(); 51 epoll 52 .add(conn_sock, epoll::EventFlags::OUT | epoll::EventFlags::ET) 53 .unwrap(); 54 } else { 55 write(&*target, b"hello\n").unwrap(); 56 let _ = epoll.del(target).unwrap(); 57 } 58 } 59 } 60} 61 62fn client(ready: Arc<(Mutex<u16>, Condvar)>) { 63 let port = { 64 let (lock, cvar) = &*ready; 65 let mut port = lock.lock().unwrap(); 66 while *port == 0 { 67 port = cvar.wait(port).unwrap(); 68 } 69 *port 70 }; 71 72 let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port); 73 let mut buffer = vec![0; BUFFER_SIZE]; 74 75 for _ in 0..16 { 76 let data_socket = 77 socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap(); 78 connect_v4(&data_socket, &addr).unwrap(); 79 80 let nread = read(&data_socket, &mut buffer).unwrap(); 81 assert_eq!(String::from_utf8_lossy(&buffer[..nread]), "hello\n"); 82 } 83} 84 85#[test] 86fn test_epoll() { 87 let ready = Arc::new((Mutex::new(0_u16), Condvar::new())); 88 let ready_clone = Arc::clone(&ready); 89 90 let _server = thread::Builder::new() 91 .name("server".to_string()) 92 .spawn(move || { 93 server(ready); 94 }) 95 .unwrap(); 96 let client = thread::Builder::new() 97 .name("client".to_string()) 98 .spawn(move || { 99 client(ready_clone); 100 }) 101 .unwrap(); 102 client.join().unwrap(); 103} 104