xref: /kernel/linux/linux-5.10/samples/bpf/xsk_fwd.c (revision 8c2ecf20)
1// SPDX-License-Identifier: GPL-2.0
2/* Copyright(c) 2020 Intel Corporation. */
3
4#define _GNU_SOURCE
5#include <poll.h>
6#include <pthread.h>
7#include <signal.h>
8#include <sched.h>
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/mman.h>
13#include <sys/resource.h>
14#include <sys/socket.h>
15#include <sys/types.h>
16#include <time.h>
17#include <unistd.h>
18#include <getopt.h>
19#include <netinet/ether.h>
20#include <net/if.h>
21
22#include <linux/bpf.h>
23#include <linux/if_link.h>
24#include <linux/if_xdp.h>
25
26#include <bpf/libbpf.h>
27#include <bpf/xsk.h>
28#include <bpf/bpf.h>
29
/* Element count of a true array (not a pointer): total size / element size. */
#define ARRAY_SIZE(x) (sizeof(x) / sizeof(*(x)))
31
32typedef __u64 u64;
33typedef __u32 u32;
34typedef __u16 u16;
35typedef __u8  u8;
36
37/* This program illustrates the packet forwarding between multiple AF_XDP
38 * sockets in multi-threaded environment. All threads are sharing a common
39 * buffer pool, with each socket having its own private buffer cache.
40 *
41 * Example 1: Single thread handling two sockets. The packets received by socket
42 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
43 * QB), while the packets received by socket B are forwarded to socket A. The
44 * thread is running on CPU core X:
45 *
46 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
47 *
48 * Example 2: Two threads, each handling two sockets. The thread running on CPU
49 * core X forwards all the packets received by socket A to socket B, and all the
50 * packets received by socket B to socket A. The thread running on CPU core Y is
51 * performing the same packet forwarding between sockets C and D:
52 *
53 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
54 *         -c CX -c CY
55 */
56
57/*
58 * Buffer pool and buffer cache
59 *
60 * For packet forwarding, the packet buffers are typically allocated from the
61 * pool for packet reception and freed back to the pool for further reuse once
62 * the packet transmission is completed.
63 *
64 * The buffer pool is shared between multiple threads. In order to minimize the
65 * access latency to the shared buffer pool, each thread creates one (or
66 * several) buffer caches, which, unlike the buffer pool, are private to the
67 * thread that creates them and therefore cannot be shared with other threads.
68 * The access to the shared pool is only needed either (A) when the cache gets
69 * empty due to repeated buffer allocations and it needs to be replenished from
70 * the pool, or (B) when the cache gets full due to repeated buffer free and it
 * needs to be flushed back to the pool.
72 *
73 * In a packet forwarding system, a packet received on any input port can
74 * potentially be transmitted on any output port, depending on the forwarding
75 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
 * packet buffers, it is required that the buffer pool memory fits into the
77 * UMEM area shared by all the sockets.
78 */
79
80struct bpool_params {
81	u32 n_buffers;
82	u32 buffer_size;
83	int mmap_flags;
84
85	u32 n_users_max;
86	u32 n_buffers_per_slab;
87};
88
89/* This buffer pool implementation organizes the buffers into equally sized
90 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
91 * pool that are completely filled with buffer pointers (full slabs).
92 *
93 * Each buffer cache has a slab for buffer allocation and a slab for buffer
94 * free, with both of these slabs initially empty. When the cache's allocation
95 * slab goes empty, it is swapped with one of the available full slabs from the
96 * pool, if any is available. When the cache's free slab goes full, it is
97 * swapped for one of the empty slabs from the pool, which is guaranteed to
98 * succeed.
99 *
100 * Partially filled slabs never get traded between the cache and the pool
101 * (except when the cache itself is destroyed), which enables fast operation
102 * through pointer swapping.
103 */
104struct bpool {
105	struct bpool_params params;
106	pthread_mutex_t lock;
107	void *addr;
108
109	u64 **slabs;
110	u64 **slabs_reserved;
111	u64 *buffers;
112	u64 *buffers_reserved;
113
114	u64 n_slabs;
115	u64 n_slabs_reserved;
116	u64 n_buffers;
117
118	u64 n_slabs_available;
119	u64 n_slabs_reserved_available;
120
121	struct xsk_umem_config umem_cfg;
122	struct xsk_ring_prod umem_fq;
123	struct xsk_ring_cons umem_cq;
124	struct xsk_umem *umem;
125};
126
127static struct bpool *
128bpool_init(struct bpool_params *params,
129	   struct xsk_umem_config *umem_cfg)
130{
131	struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
132	u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
133	u64 slabs_size, slabs_reserved_size;
134	u64 buffers_size, buffers_reserved_size;
135	u64 total_size, i;
136	struct bpool *bp;
137	u8 *p;
138	int status;
139
140	/* mmap prep. */
141	if (setrlimit(RLIMIT_MEMLOCK, &r))
142		return NULL;
143
144	/* bpool internals dimensioning. */
145	n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
146		params->n_buffers_per_slab;
147	n_slabs_reserved = params->n_users_max * 2;
148	n_buffers = n_slabs * params->n_buffers_per_slab;
149	n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
150
151	slabs_size = n_slabs * sizeof(u64 *);
152	slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
153	buffers_size = n_buffers * sizeof(u64);
154	buffers_reserved_size = n_buffers_reserved * sizeof(u64);
155
156	total_size = sizeof(struct bpool) +
157		slabs_size + slabs_reserved_size +
158		buffers_size + buffers_reserved_size;
159
160	/* bpool memory allocation. */
161	p = calloc(total_size, sizeof(u8));
162	if (!p)
163		return NULL;
164
165	/* bpool memory initialization. */
166	bp = (struct bpool *)p;
167	memcpy(&bp->params, params, sizeof(*params));
168	bp->params.n_buffers = n_buffers;
169
170	bp->slabs = (u64 **)&p[sizeof(struct bpool)];
171	bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
172		slabs_size];
173	bp->buffers = (u64 *)&p[sizeof(struct bpool) +
174		slabs_size + slabs_reserved_size];
175	bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
176		slabs_size + slabs_reserved_size + buffers_size];
177
178	bp->n_slabs = n_slabs;
179	bp->n_slabs_reserved = n_slabs_reserved;
180	bp->n_buffers = n_buffers;
181
182	for (i = 0; i < n_slabs; i++)
183		bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
184	bp->n_slabs_available = n_slabs;
185
186	for (i = 0; i < n_slabs_reserved; i++)
187		bp->slabs_reserved[i] = &bp->buffers_reserved[i *
188			params->n_buffers_per_slab];
189	bp->n_slabs_reserved_available = n_slabs_reserved;
190
191	for (i = 0; i < n_buffers; i++)
192		bp->buffers[i] = i * params->buffer_size;
193
194	/* lock. */
195	status = pthread_mutex_init(&bp->lock, NULL);
196	if (status) {
197		free(p);
198		return NULL;
199	}
200
201	/* mmap. */
202	bp->addr = mmap(NULL,
203			n_buffers * params->buffer_size,
204			PROT_READ | PROT_WRITE,
205			MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
206			-1,
207			0);
208	if (bp->addr == MAP_FAILED) {
209		pthread_mutex_destroy(&bp->lock);
210		free(p);
211		return NULL;
212	}
213
214	/* umem. */
215	status = xsk_umem__create(&bp->umem,
216				  bp->addr,
217				  bp->params.n_buffers * bp->params.buffer_size,
218				  &bp->umem_fq,
219				  &bp->umem_cq,
220				  umem_cfg);
221	if (status) {
222		munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
223		pthread_mutex_destroy(&bp->lock);
224		free(p);
225		return NULL;
226	}
227	memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
228
229	return bp;
230}
231
232static void
233bpool_free(struct bpool *bp)
234{
235	if (!bp)
236		return;
237
238	xsk_umem__delete(bp->umem);
239	munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
240	pthread_mutex_destroy(&bp->lock);
241	free(bp);
242}
243
244struct bcache {
245	struct bpool *bp;
246
247	u64 *slab_cons;
248	u64 *slab_prod;
249
250	u64 n_buffers_cons;
251	u64 n_buffers_prod;
252};
253
254static u32
255bcache_slab_size(struct bcache *bc)
256{
257	struct bpool *bp = bc->bp;
258
259	return bp->params.n_buffers_per_slab;
260}
261
262static struct bcache *
263bcache_init(struct bpool *bp)
264{
265	struct bcache *bc;
266
267	bc = calloc(1, sizeof(struct bcache));
268	if (!bc)
269		return NULL;
270
271	bc->bp = bp;
272	bc->n_buffers_cons = 0;
273	bc->n_buffers_prod = 0;
274
275	pthread_mutex_lock(&bp->lock);
276	if (bp->n_slabs_reserved_available == 0) {
277		pthread_mutex_unlock(&bp->lock);
278		free(bc);
279		return NULL;
280	}
281
282	bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
283	bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
284	bp->n_slabs_reserved_available -= 2;
285	pthread_mutex_unlock(&bp->lock);
286
287	return bc;
288}
289
290static void
291bcache_free(struct bcache *bc)
292{
293	struct bpool *bp;
294
295	if (!bc)
296		return;
297
298	/* In order to keep this example simple, the case of freeing any
299	 * existing buffers from the cache back to the pool is ignored.
300	 */
301
302	bp = bc->bp;
303	pthread_mutex_lock(&bp->lock);
304	bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
305	bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
306	bp->n_slabs_reserved_available += 2;
307	pthread_mutex_unlock(&bp->lock);
308
309	free(bc);
310}
311
312/* To work correctly, the implementation requires that the *n_buffers* input
313 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
314 * is typically the case, with one exception taking place when large number of
315 * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
316 */
317static inline u32
318bcache_cons_check(struct bcache *bc, u32 n_buffers)
319{
320	struct bpool *bp = bc->bp;
321	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
322	u64 n_buffers_cons = bc->n_buffers_cons;
323	u64 n_slabs_available;
324	u64 *slab_full;
325
326	/*
327	 * Consumer slab is not empty: Use what's available locally. Do not
328	 * look for more buffers from the pool when the ask can only be
329	 * partially satisfied.
330	 */
331	if (n_buffers_cons)
332		return (n_buffers_cons < n_buffers) ?
333			n_buffers_cons :
334			n_buffers;
335
336	/*
337	 * Consumer slab is empty: look to trade the current consumer slab
338	 * (full) for a full slab from the pool, if any is available.
339	 */
340	pthread_mutex_lock(&bp->lock);
341	n_slabs_available = bp->n_slabs_available;
342	if (!n_slabs_available) {
343		pthread_mutex_unlock(&bp->lock);
344		return 0;
345	}
346
347	n_slabs_available--;
348	slab_full = bp->slabs[n_slabs_available];
349	bp->slabs[n_slabs_available] = bc->slab_cons;
350	bp->n_slabs_available = n_slabs_available;
351	pthread_mutex_unlock(&bp->lock);
352
353	bc->slab_cons = slab_full;
354	bc->n_buffers_cons = n_buffers_per_slab;
355	return n_buffers;
356}
357
358static inline u64
359bcache_cons(struct bcache *bc)
360{
361	u64 n_buffers_cons = bc->n_buffers_cons - 1;
362	u64 buffer;
363
364	buffer = bc->slab_cons[n_buffers_cons];
365	bc->n_buffers_cons = n_buffers_cons;
366	return buffer;
367}
368
369static inline void
370bcache_prod(struct bcache *bc, u64 buffer)
371{
372	struct bpool *bp = bc->bp;
373	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
374	u64 n_buffers_prod = bc->n_buffers_prod;
375	u64 n_slabs_available;
376	u64 *slab_empty;
377
378	/*
379	 * Producer slab is not yet full: store the current buffer to it.
380	 */
381	if (n_buffers_prod < n_buffers_per_slab) {
382		bc->slab_prod[n_buffers_prod] = buffer;
383		bc->n_buffers_prod = n_buffers_prod + 1;
384		return;
385	}
386
387	/*
388	 * Producer slab is full: trade the cache's current producer slab
389	 * (full) for an empty slab from the pool, then store the current
390	 * buffer to the new producer slab. As one full slab exists in the
391	 * cache, it is guaranteed that there is at least one empty slab
392	 * available in the pool.
393	 */
394	pthread_mutex_lock(&bp->lock);
395	n_slabs_available = bp->n_slabs_available;
396	slab_empty = bp->slabs[n_slabs_available];
397	bp->slabs[n_slabs_available] = bc->slab_prod;
398	bp->n_slabs_available = n_slabs_available + 1;
399	pthread_mutex_unlock(&bp->lock);
400
401	slab_empty[0] = buffer;
402	bc->slab_prod = slab_empty;
403	bc->n_buffers_prod = 1;
404}
405
406/*
407 * Port
408 *
409 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
410 * packet forwarding to happen with no packet buffer copy, all the sockets need
411 * to share the same UMEM area, which is used as the buffer pool memory.
412 */
413#ifndef MAX_BURST_RX
414#define MAX_BURST_RX 64
415#endif
416
417#ifndef MAX_BURST_TX
418#define MAX_BURST_TX 64
419#endif
420
421struct burst_rx {
422	u64 addr[MAX_BURST_RX];
423	u32 len[MAX_BURST_RX];
424};
425
426struct burst_tx {
427	u64 addr[MAX_BURST_TX];
428	u32 len[MAX_BURST_TX];
429	u32 n_pkts;
430};
431
432struct port_params {
433	struct xsk_socket_config xsk_cfg;
434	struct bpool *bp;
435	const char *iface;
436	u32 iface_queue;
437};
438
439struct port {
440	struct port_params params;
441
442	struct bcache *bc;
443
444	struct xsk_ring_cons rxq;
445	struct xsk_ring_prod txq;
446	struct xsk_ring_prod umem_fq;
447	struct xsk_ring_cons umem_cq;
448	struct xsk_socket *xsk;
449	int umem_fq_initialized;
450
451	u64 n_pkts_rx;
452	u64 n_pkts_tx;
453};
454
455static void
456port_free(struct port *p)
457{
458	if (!p)
459		return;
460
461	/* To keep this example simple, the code to free the buffers from the
462	 * socket's receive and transmit queues, as well as from the UMEM fill
463	 * and completion queues, is not included.
464	 */
465
466	if (p->xsk)
467		xsk_socket__delete(p->xsk);
468
469	bcache_free(p->bc);
470
471	free(p);
472}
473
474static struct port *
475port_init(struct port_params *params)
476{
477	struct port *p;
478	u32 umem_fq_size, pos = 0;
479	int status, i;
480
481	/* Memory allocation and initialization. */
482	p = calloc(sizeof(struct port), 1);
483	if (!p)
484		return NULL;
485
486	memcpy(&p->params, params, sizeof(p->params));
487	umem_fq_size = params->bp->umem_cfg.fill_size;
488
489	/* bcache. */
490	p->bc = bcache_init(params->bp);
491	if (!p->bc ||
492	    (bcache_slab_size(p->bc) < umem_fq_size) ||
493	    (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
494		port_free(p);
495		return NULL;
496	}
497
498	/* xsk socket. */
499	status = xsk_socket__create_shared(&p->xsk,
500					   params->iface,
501					   params->iface_queue,
502					   params->bp->umem,
503					   &p->rxq,
504					   &p->txq,
505					   &p->umem_fq,
506					   &p->umem_cq,
507					   &params->xsk_cfg);
508	if (status) {
509		port_free(p);
510		return NULL;
511	}
512
513	/* umem fq. */
514	xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
515
516	for (i = 0; i < umem_fq_size; i++)
517		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
518			bcache_cons(p->bc);
519
520	xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
521	p->umem_fq_initialized = 1;
522
523	return p;
524}
525
526static inline u32
527port_rx_burst(struct port *p, struct burst_rx *b)
528{
529	u32 n_pkts, pos, i;
530
531	/* Free buffers for FQ replenish. */
532	n_pkts = ARRAY_SIZE(b->addr);
533
534	n_pkts = bcache_cons_check(p->bc, n_pkts);
535	if (!n_pkts)
536		return 0;
537
538	/* RXQ. */
539	n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
540	if (!n_pkts) {
541		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
542			struct pollfd pollfd = {
543				.fd = xsk_socket__fd(p->xsk),
544				.events = POLLIN,
545			};
546
547			poll(&pollfd, 1, 0);
548		}
549		return 0;
550	}
551
552	for (i = 0; i < n_pkts; i++) {
553		b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
554		b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
555	}
556
557	xsk_ring_cons__release(&p->rxq, n_pkts);
558	p->n_pkts_rx += n_pkts;
559
560	/* UMEM FQ. */
561	for ( ; ; ) {
562		int status;
563
564		status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
565		if (status == n_pkts)
566			break;
567
568		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
569			struct pollfd pollfd = {
570				.fd = xsk_socket__fd(p->xsk),
571				.events = POLLIN,
572			};
573
574			poll(&pollfd, 1, 0);
575		}
576	}
577
578	for (i = 0; i < n_pkts; i++)
579		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
580			bcache_cons(p->bc);
581
582	xsk_ring_prod__submit(&p->umem_fq, n_pkts);
583
584	return n_pkts;
585}
586
587static inline void
588port_tx_burst(struct port *p, struct burst_tx *b)
589{
590	u32 n_pkts, pos, i;
591	int status;
592
593	/* UMEM CQ. */
594	n_pkts = p->params.bp->umem_cfg.comp_size;
595
596	n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
597
598	for (i = 0; i < n_pkts; i++) {
599		u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
600
601		bcache_prod(p->bc, addr);
602	}
603
604	xsk_ring_cons__release(&p->umem_cq, n_pkts);
605
606	/* TXQ. */
607	n_pkts = b->n_pkts;
608
609	for ( ; ; ) {
610		status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
611		if (status == n_pkts)
612			break;
613
614		if (xsk_ring_prod__needs_wakeup(&p->txq))
615			sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
616			       NULL, 0);
617	}
618
619	for (i = 0; i < n_pkts; i++) {
620		xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
621		xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
622	}
623
624	xsk_ring_prod__submit(&p->txq, n_pkts);
625	if (xsk_ring_prod__needs_wakeup(&p->txq))
626		sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
627	p->n_pkts_tx += n_pkts;
628}
629
630/*
631 * Thread
632 *
633 * Packet forwarding threads.
634 */
635#ifndef MAX_PORTS_PER_THREAD
636#define MAX_PORTS_PER_THREAD 16
637#endif
638
639struct thread_data {
640	struct port *ports_rx[MAX_PORTS_PER_THREAD];
641	struct port *ports_tx[MAX_PORTS_PER_THREAD];
642	u32 n_ports_rx;
643	struct burst_rx burst_rx;
644	struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
645	u32 cpu_core_id;
646	int quit;
647};
648
649static void swap_mac_addresses(void *data)
650{
651	struct ether_header *eth = (struct ether_header *)data;
652	struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
653	struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
654	struct ether_addr tmp;
655
656	tmp = *src_addr;
657	*src_addr = *dst_addr;
658	*dst_addr = tmp;
659}
660
661static void *
662thread_func(void *arg)
663{
664	struct thread_data *t = arg;
665	cpu_set_t cpu_cores;
666	u32 i;
667
668	CPU_ZERO(&cpu_cores);
669	CPU_SET(t->cpu_core_id, &cpu_cores);
670	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
671
672	for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
673		struct port *port_rx = t->ports_rx[i];
674		struct port *port_tx = t->ports_tx[i];
675		struct burst_rx *brx = &t->burst_rx;
676		struct burst_tx *btx = &t->burst_tx[i];
677		u32 n_pkts, j;
678
679		/* RX. */
680		n_pkts = port_rx_burst(port_rx, brx);
681		if (!n_pkts)
682			continue;
683
684		/* Process & TX. */
685		for (j = 0; j < n_pkts; j++) {
686			u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
687			u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
688						     addr);
689
690			swap_mac_addresses(pkt);
691
692			btx->addr[btx->n_pkts] = brx->addr[j];
693			btx->len[btx->n_pkts] = brx->len[j];
694			btx->n_pkts++;
695
696			if (btx->n_pkts == MAX_BURST_TX) {
697				port_tx_burst(port_tx, btx);
698				btx->n_pkts = 0;
699			}
700		}
701	}
702
703	return NULL;
704}
705
706/*
707 * Process
708 */
709static const struct bpool_params bpool_params_default = {
710	.n_buffers = 64 * 1024,
711	.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
712	.mmap_flags = 0,
713
714	.n_users_max = 16,
715	.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
716};
717
718static const struct xsk_umem_config umem_cfg_default = {
719	.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
720	.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
721	.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
722	.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
723	.flags = 0,
724};
725
726static const struct port_params port_params_default = {
727	.xsk_cfg = {
728		.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
729		.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
730		.libbpf_flags = 0,
731		.xdp_flags = XDP_FLAGS_DRV_MODE,
732		.bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
733	},
734
735	.bp = NULL,
736	.iface = NULL,
737	.iface_queue = 0,
738};
739
740#ifndef MAX_PORTS
741#define MAX_PORTS 64
742#endif
743
744#ifndef MAX_THREADS
745#define MAX_THREADS 64
746#endif
747
748static struct bpool_params bpool_params;
749static struct xsk_umem_config umem_cfg;
750static struct bpool *bp;
751
752static struct port_params port_params[MAX_PORTS];
753static struct port *ports[MAX_PORTS];
754static u64 n_pkts_rx[MAX_PORTS];
755static u64 n_pkts_tx[MAX_PORTS];
756static int n_ports;
757
758static pthread_t threads[MAX_THREADS];
759static struct thread_data thread_data[MAX_THREADS];
760static int n_threads;
761
762static void
763print_usage(char *prog_name)
764{
765	const char *usage =
766		"Usage:\n"
767		"\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
768		"\n"
769		"-c CORE        CPU core to run a packet forwarding thread\n"
770		"               on. May be invoked multiple times.\n"
771		"\n"
772		"-b SIZE        Number of buffers in the buffer pool shared\n"
773		"               by all the forwarding threads. Default: %u.\n"
774		"\n"
775		"-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
776		"               pair specifies one forwarding port. May be\n"
777		"               invoked multiple times.\n"
778		"\n"
779		"-q QUEUE       Network interface queue for RX and TX. Each\n"
780		"               (INTERFACE, QUEUE) pair specified one\n"
781		"               forwarding port. Default: %u. May be invoked\n"
782		"               multiple times.\n"
783		"\n";
784	printf(usage,
785	       prog_name,
786	       bpool_params_default.n_buffers,
787	       port_params_default.iface_queue);
788}
789
790static int
791parse_args(int argc, char **argv)
792{
793	struct option lgopts[] = {
794		{ NULL,  0, 0, 0 }
795	};
796	int opt, option_index;
797
798	/* Parse the input arguments. */
799	for ( ; ;) {
800		opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
801		if (opt == EOF)
802			break;
803
804		switch (opt) {
805		case 'b':
806			bpool_params.n_buffers = atoi(optarg);
807			break;
808
809		case 'c':
810			if (n_threads == MAX_THREADS) {
811				printf("Max number of threads (%d) reached.\n",
812				       MAX_THREADS);
813				return -1;
814			}
815
816			thread_data[n_threads].cpu_core_id = atoi(optarg);
817			n_threads++;
818			break;
819
820		case 'i':
821			if (n_ports == MAX_PORTS) {
822				printf("Max number of ports (%d) reached.\n",
823				       MAX_PORTS);
824				return -1;
825			}
826
827			port_params[n_ports].iface = optarg;
828			port_params[n_ports].iface_queue = 0;
829			n_ports++;
830			break;
831
832		case 'q':
833			if (n_ports == 0) {
834				printf("No port specified for queue.\n");
835				return -1;
836			}
837			port_params[n_ports - 1].iface_queue = atoi(optarg);
838			break;
839
840		default:
841			printf("Illegal argument.\n");
842			return -1;
843		}
844	}
845
846	optind = 1; /* reset getopt lib */
847
848	/* Check the input arguments. */
849	if (!n_ports) {
850		printf("No ports specified.\n");
851		return -1;
852	}
853
854	if (!n_threads) {
855		printf("No threads specified.\n");
856		return -1;
857	}
858
859	if (n_ports % n_threads) {
860		printf("Ports cannot be evenly distributed to threads.\n");
861		return -1;
862	}
863
864	return 0;
865}
866
867static void
868print_port(u32 port_id)
869{
870	struct port *port = ports[port_id];
871
872	printf("Port %u: interface = %s, queue = %u\n",
873	       port_id, port->params.iface, port->params.iface_queue);
874}
875
876static void
877print_thread(u32 thread_id)
878{
879	struct thread_data *t = &thread_data[thread_id];
880	u32 i;
881
882	printf("Thread %u (CPU core %u): ",
883	       thread_id, t->cpu_core_id);
884
885	for (i = 0; i < t->n_ports_rx; i++) {
886		struct port *port_rx = t->ports_rx[i];
887		struct port *port_tx = t->ports_tx[i];
888
889		printf("(%s, %u) -> (%s, %u), ",
890		       port_rx->params.iface,
891		       port_rx->params.iface_queue,
892		       port_tx->params.iface,
893		       port_tx->params.iface_queue);
894	}
895
896	printf("\n");
897}
898
/* Print the horizontal rule of the statistics table. */
static void
print_port_stats_separator(void)
{
	const char *dash4 = "----";
	const char *dash12 = "------------";
	const char *dash13 = "-------------";

	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
	       dash4, dash12, dash13, dash12, dash13);
}
909
/* Print the statistics table header: column titles between two rules. */
static void
print_port_stats_header(void)
{
	print_port_stats_separator();

	printf("| %4s | %12s | %13s | %12s | %13s |\n",
	       "Port", "RX packets", "RX rate (pps)",
	       "TX packets", "TX_rate (pps)");

	print_port_stats_separator();
}
922
/* Close the statistics table: a final rule followed by an empty line. */
static void
print_port_stats_trailer(void)
{
	print_port_stats_separator();

	printf("\n");
}
929
930static void
931print_port_stats(int port_id, u64 ns_diff)
932{
933	struct port *p = ports[port_id];
934	double rx_pps, tx_pps;
935
936	rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
937	tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
938
939	printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
940	       port_id,
941	       p->n_pkts_rx,
942	       rx_pps,
943	       p->n_pkts_tx,
944	       tx_pps);
945
946	n_pkts_rx[port_id] = p->n_pkts_rx;
947	n_pkts_tx[port_id] = p->n_pkts_tx;
948}
949
950static void
951print_port_stats_all(u64 ns_diff)
952{
953	int i;
954
955	print_port_stats_header();
956	for (i = 0; i < n_ports; i++)
957		print_port_stats(i, ns_diff);
958	print_port_stats_trailer();
959}
960
/*
 * Termination request flag. It is written from a signal handler, so it must
 * be a volatile sig_atomic_t object.
 */
static volatile sig_atomic_t quit;

/* Signal handler: request termination by setting the quit flag. */
static void
signal_handler(int sig)
{
	(void)sig;

	quit = 1;
}
968
969static void remove_xdp_program(void)
970{
971	int i;
972
973	for (i = 0 ; i < n_ports; i++)
974		bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
975				    port_params[i].xsk_cfg.xdp_flags);
976}
977
978int main(int argc, char **argv)
979{
980	struct timespec time;
981	u64 ns0;
982	int i;
983
984	/* Parse args. */
985	memcpy(&bpool_params, &bpool_params_default,
986	       sizeof(struct bpool_params));
987	memcpy(&umem_cfg, &umem_cfg_default,
988	       sizeof(struct xsk_umem_config));
989	for (i = 0; i < MAX_PORTS; i++)
990		memcpy(&port_params[i], &port_params_default,
991		       sizeof(struct port_params));
992
993	if (parse_args(argc, argv)) {
994		print_usage(argv[0]);
995		return -1;
996	}
997
998	/* Buffer pool initialization. */
999	bp = bpool_init(&bpool_params, &umem_cfg);
1000	if (!bp) {
1001		printf("Buffer pool initialization failed.\n");
1002		return -1;
1003	}
1004	printf("Buffer pool created successfully.\n");
1005
1006	/* Ports initialization. */
1007	for (i = 0; i < MAX_PORTS; i++)
1008		port_params[i].bp = bp;
1009
1010	for (i = 0; i < n_ports; i++) {
1011		ports[i] = port_init(&port_params[i]);
1012		if (!ports[i]) {
1013			printf("Port %d initialization failed.\n", i);
1014			return -1;
1015		}
1016		print_port(i);
1017	}
1018	printf("All ports created successfully.\n");
1019
1020	/* Threads. */
1021	for (i = 0; i < n_threads; i++) {
1022		struct thread_data *t = &thread_data[i];
1023		u32 n_ports_per_thread = n_ports / n_threads, j;
1024
1025		for (j = 0; j < n_ports_per_thread; j++) {
1026			t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1027			t->ports_tx[j] = ports[i * n_ports_per_thread +
1028				(j + 1) % n_ports_per_thread];
1029		}
1030
1031		t->n_ports_rx = n_ports_per_thread;
1032
1033		print_thread(i);
1034	}
1035
1036	for (i = 0; i < n_threads; i++) {
1037		int status;
1038
1039		status = pthread_create(&threads[i],
1040					NULL,
1041					thread_func,
1042					&thread_data[i]);
1043		if (status) {
1044			printf("Thread %d creation failed.\n", i);
1045			return -1;
1046		}
1047	}
1048	printf("All threads created successfully.\n");
1049
1050	/* Print statistics. */
1051	signal(SIGINT, signal_handler);
1052	signal(SIGTERM, signal_handler);
1053	signal(SIGABRT, signal_handler);
1054
1055	clock_gettime(CLOCK_MONOTONIC, &time);
1056	ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1057	for ( ; !quit; ) {
1058		u64 ns1, ns_diff;
1059
1060		sleep(1);
1061		clock_gettime(CLOCK_MONOTONIC, &time);
1062		ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1063		ns_diff = ns1 - ns0;
1064		ns0 = ns1;
1065
1066		print_port_stats_all(ns_diff);
1067	}
1068
1069	/* Threads completion. */
1070	printf("Quit.\n");
1071	for (i = 0; i < n_threads; i++)
1072		thread_data[i].quit = 1;
1073
1074	for (i = 0; i < n_threads; i++)
1075		pthread_join(threads[i], NULL);
1076
1077	for (i = 0; i < n_ports; i++)
1078		port_free(ports[i]);
1079
1080	bpool_free(bp);
1081
1082	remove_xdp_program();
1083
1084	return 0;
1085}
1086