1// SPDX-License-Identifier: GPL-2.0
2
3#define _GNU_SOURCE
4
5#include <arpa/inet.h>
6#include <error.h>
7#include <errno.h>
8#include <limits.h>
9#include <linux/errqueue.h>
10#include <linux/if_packet.h>
11#include <linux/socket.h>
12#include <linux/sockios.h>
13#include <net/ethernet.h>
14#include <net/if.h>
15#include <netinet/ip.h>
16#include <netinet/ip6.h>
17#include <netinet/tcp.h>
18#include <netinet/udp.h>
19#include <poll.h>
20#include <sched.h>
21#include <stdbool.h>
22#include <stdio.h>
23#include <stdint.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/ioctl.h>
27#include <sys/socket.h>
28#include <sys/stat.h>
29#include <sys/time.h>
30#include <sys/types.h>
31#include <sys/wait.h>
32#include <unistd.h>
33
34#ifndef UDP_GRO
35#define UDP_GRO		104
36#endif
37
38static int  cfg_port		= 8000;
39static bool cfg_tcp;
40static bool cfg_verify;
41static bool cfg_read_all;
42static bool cfg_gro_segment;
43static int  cfg_family		= PF_INET6;
44static int  cfg_alen 		= sizeof(struct sockaddr_in6);
45static int  cfg_expected_pkt_nr;
46static int  cfg_expected_pkt_len;
47static int  cfg_expected_gso_size;
48static int  cfg_connect_timeout_ms;
49static int  cfg_rcv_timeout_ms;
50static struct sockaddr_storage cfg_bind_addr;
51
52static bool interrupted;
53static unsigned long packets, bytes;
54
55static void sigint_handler(int signum)
56{
57	if (signum == SIGINT)
58		interrupted = true;
59}
60
61static void setup_sockaddr(int domain, const char *str_addr, void *sockaddr)
62{
63	struct sockaddr_in6 *addr6 = (void *) sockaddr;
64	struct sockaddr_in *addr4 = (void *) sockaddr;
65
66	switch (domain) {
67	case PF_INET:
68		addr4->sin_family = AF_INET;
69		addr4->sin_port = htons(cfg_port);
70		if (inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
71			error(1, 0, "ipv4 parse error: %s", str_addr);
72		break;
73	case PF_INET6:
74		addr6->sin6_family = AF_INET6;
75		addr6->sin6_port = htons(cfg_port);
76		if (inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
77			error(1, 0, "ipv6 parse error: %s", str_addr);
78		break;
79	default:
80		error(1, 0, "illegal domain");
81	}
82}
83
84static unsigned long gettimeofday_ms(void)
85{
86	struct timeval tv;
87
88	gettimeofday(&tv, NULL);
89	return (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
90}
91
92static void do_poll(int fd, int timeout_ms)
93{
94	struct pollfd pfd;
95	int ret;
96
97	pfd.events = POLLIN;
98	pfd.revents = 0;
99	pfd.fd = fd;
100
101	do {
102		ret = poll(&pfd, 1, 10);
103		if (interrupted)
104			break;
105		if (ret == -1)
106			error(1, errno, "poll");
107		if (ret == 0) {
108			if (!timeout_ms)
109				continue;
110
111			timeout_ms -= 10;
112			if (timeout_ms <= 0) {
113				interrupted = true;
114				break;
115			}
116
117			/* no events and more time to wait, do poll again */
118			continue;
119		}
120		if (pfd.revents != POLLIN)
121			error(1, errno, "poll: 0x%x expected 0x%x\n",
122					pfd.revents, POLLIN);
123	} while (!ret);
124}
125
126static int do_socket(bool do_tcp)
127{
128	int fd, val;
129
130	fd = socket(cfg_family, cfg_tcp ? SOCK_STREAM : SOCK_DGRAM, 0);
131	if (fd == -1)
132		error(1, errno, "socket");
133
134	val = 1 << 21;
135	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)))
136		error(1, errno, "setsockopt rcvbuf");
137	val = 1;
138	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)))
139		error(1, errno, "setsockopt reuseport");
140
141	if (bind(fd, (void *)&cfg_bind_addr, cfg_alen))
142		error(1, errno, "bind");
143
144	if (do_tcp) {
145		int accept_fd = fd;
146
147		if (listen(accept_fd, 1))
148			error(1, errno, "listen");
149
150		do_poll(accept_fd, cfg_connect_timeout_ms);
151		if (interrupted)
152			exit(0);
153
154		fd = accept(accept_fd, NULL, NULL);
155		if (fd == -1)
156			error(1, errno, "accept");
157		if (close(accept_fd))
158			error(1, errno, "close accept fd");
159	}
160
161	return fd;
162}
163
164/* Flush all outstanding bytes for the tcp receive queue */
165static void do_flush_tcp(int fd)
166{
167	int ret;
168
169	while (true) {
170		/* MSG_TRUNC flushes up to len bytes */
171		ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
172		if (ret == -1 && errno == EAGAIN)
173			return;
174		if (ret == -1)
175			error(1, errno, "flush");
176		if (ret == 0) {
177			/* client detached */
178			exit(0);
179		}
180
181		packets++;
182		bytes += ret;
183	}
184
185}
186
187static char sanitized_char(char val)
188{
189	return (val >= 'a' && val <= 'z') ? val : '.';
190}
191
192static void do_verify_udp(const char *data, int len)
193{
194	char cur = data[0];
195	int i;
196
197	/* verify contents */
198	if (cur < 'a' || cur > 'z')
199		error(1, 0, "data initial byte out of range");
200
201	for (i = 1; i < len; i++) {
202		if (cur == 'z')
203			cur = 'a';
204		else
205			cur++;
206
207		if (data[i] != cur)
208			error(1, 0, "data[%d]: len %d, %c(%hhu) != %c(%hhu)\n",
209			      i, len,
210			      sanitized_char(data[i]), data[i],
211			      sanitized_char(cur), cur);
212	}
213}
214
215static int recv_msg(int fd, char *buf, int len, int *gso_size)
216{
217	char control[CMSG_SPACE(sizeof(int))] = {0};
218	struct msghdr msg = {0};
219	struct iovec iov = {0};
220	struct cmsghdr *cmsg;
221	int ret;
222
223	iov.iov_base = buf;
224	iov.iov_len = len;
225
226	msg.msg_iov = &iov;
227	msg.msg_iovlen = 1;
228
229	msg.msg_control = control;
230	msg.msg_controllen = sizeof(control);
231
232	*gso_size = -1;
233	ret = recvmsg(fd, &msg, MSG_TRUNC | MSG_DONTWAIT);
234	if (ret != -1) {
235		for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
236		     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
237			if (cmsg->cmsg_level == SOL_UDP
238			    && cmsg->cmsg_type == UDP_GRO) {
239				*gso_size = *(int *)CMSG_DATA(cmsg);
240				break;
241			}
242		}
243	}
244	return ret;
245}
246
247/* Flush all outstanding datagrams. Verify first few bytes of each. */
248static void do_flush_udp(int fd)
249{
250	static char rbuf[ETH_MAX_MTU];
251	int ret, len, gso_size = 0, budget = 256;
252
253	len = cfg_read_all ? sizeof(rbuf) : 0;
254	while (budget--) {
255		/* MSG_TRUNC will make return value full datagram length */
256		if (!cfg_expected_gso_size)
257			ret = recv(fd, rbuf, len, MSG_TRUNC | MSG_DONTWAIT);
258		else
259			ret = recv_msg(fd, rbuf, len, &gso_size);
260		if (ret == -1 && errno == EAGAIN)
261			break;
262		if (ret == -1)
263			error(1, errno, "recv");
264		if (cfg_expected_pkt_len && ret != cfg_expected_pkt_len)
265			error(1, 0, "recv: bad packet len, got %d,"
266			      " expected %d\n", ret, cfg_expected_pkt_len);
267		if (len && cfg_verify) {
268			if (ret == 0)
269				error(1, errno, "recv: 0 byte datagram\n");
270
271			do_verify_udp(rbuf, ret);
272		}
273		if (cfg_expected_gso_size && cfg_expected_gso_size != gso_size)
274			error(1, 0, "recv: bad gso size, got %d, expected %d "
275			      "(-1 == no gso cmsg))\n", gso_size,
276			      cfg_expected_gso_size);
277
278		packets++;
279		bytes += ret;
280		if (cfg_expected_pkt_nr && packets >= cfg_expected_pkt_nr)
281			break;
282	}
283}
284
285static void usage(const char *filepath)
286{
287	error(1, 0, "Usage: %s [-C connect_timeout] [-Grtv] [-b addr] [-p port]"
288	      " [-l pktlen] [-n packetnr] [-R rcv_timeout] [-S gsosize]",
289	      filepath);
290}
291
292static void parse_opts(int argc, char **argv)
293{
294	const char *bind_addr = NULL;
295	int c;
296
297	while ((c = getopt(argc, argv, "4b:C:Gl:n:p:rR:S:tv")) != -1) {
298		switch (c) {
299		case '4':
300			cfg_family = PF_INET;
301			cfg_alen = sizeof(struct sockaddr_in);
302			break;
303		case 'b':
304			bind_addr = optarg;
305			break;
306		case 'C':
307			cfg_connect_timeout_ms = strtoul(optarg, NULL, 0);
308			break;
309		case 'G':
310			cfg_gro_segment = true;
311			break;
312		case 'l':
313			cfg_expected_pkt_len = strtoul(optarg, NULL, 0);
314			break;
315		case 'n':
316			cfg_expected_pkt_nr = strtoul(optarg, NULL, 0);
317			break;
318		case 'p':
319			cfg_port = strtoul(optarg, NULL, 0);
320			break;
321		case 'r':
322			cfg_read_all = true;
323			break;
324		case 'R':
325			cfg_rcv_timeout_ms = strtoul(optarg, NULL, 0);
326			break;
327		case 'S':
328			cfg_expected_gso_size = strtol(optarg, NULL, 0);
329			break;
330		case 't':
331			cfg_tcp = true;
332			break;
333		case 'v':
334			cfg_verify = true;
335			cfg_read_all = true;
336			break;
337		default:
338			exit(1);
339		}
340	}
341
342	if (!bind_addr)
343		bind_addr = cfg_family == PF_INET6 ? "::" : "0.0.0.0";
344
345	setup_sockaddr(cfg_family, bind_addr, &cfg_bind_addr);
346
347	if (optind != argc)
348		usage(argv[0]);
349
350	if (cfg_tcp && cfg_verify)
351		error(1, 0, "TODO: implement verify mode for tcp");
352}
353
354static void do_recv(void)
355{
356	int timeout_ms = cfg_tcp ? cfg_rcv_timeout_ms : cfg_connect_timeout_ms;
357	unsigned long tnow, treport;
358	int fd;
359
360	fd = do_socket(cfg_tcp);
361
362	if (cfg_gro_segment && !cfg_tcp) {
363		int val = 1;
364		if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)))
365			error(1, errno, "setsockopt UDP_GRO");
366	}
367
368	treport = gettimeofday_ms() + 1000;
369	do {
370		do_poll(fd, timeout_ms);
371
372		if (cfg_tcp)
373			do_flush_tcp(fd);
374		else
375			do_flush_udp(fd);
376
377		tnow = gettimeofday_ms();
378		if (!cfg_expected_pkt_nr && tnow > treport) {
379			if (packets)
380				fprintf(stderr,
381					"%s rx: %6lu MB/s %8lu calls/s\n",
382					cfg_tcp ? "tcp" : "udp",
383					bytes >> 20, packets);
384			bytes = packets = 0;
385			treport = tnow + 1000;
386		}
387
388		timeout_ms = cfg_rcv_timeout_ms;
389
390	} while (!interrupted);
391
392	if (cfg_expected_pkt_nr && (packets != cfg_expected_pkt_nr))
393		error(1, 0, "wrong packet number! got %ld, expected %d\n",
394		      packets, cfg_expected_pkt_nr);
395
396	if (close(fd))
397		error(1, errno, "close");
398}
399
400int main(int argc, char **argv)
401{
402	parse_opts(argc, argv);
403
404	signal(SIGINT, sigint_handler);
405
406	do_recv();
407
408	return 0;
409}
410