// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring *rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return -EINVAL;
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return -ENOMEM;
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return -ENOMEM;
	rb->events = tmp;

	r = &rb->rings[rb->ring_cnt];
	memset(r, 0, sizeof(*r));

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice the data
	 * size to allow simple reading of samples that wrap around the end
	 * of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	rb->ring_cnt++;
	return 0;
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_unmap_ring(rb, &rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return NULL;
}

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}

/* Consume committed samples from a single ring, invoking the registered
 * callback for each one. Returns the number of samples consumed, or the
 * negative error returned by the callback.
 */
static int64_t ringbuf_process_ring(struct ring *r)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns the number of records consumed across all registered ring buffers
 * (or INT_MAX, whichever is less), or a negative number if any of the
 * callbacks returned an error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = &rb->rings[i];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return err;
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns the number of records consumed (or INT_MAX, whichever is less), or
 * a negative number if any of the registered callbacks returned an error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return -errno;

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = &rb->rings[ring_id];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return err;
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}
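
/*
 * Usage sketch (illustrative only, not part of this file): a minimal
 * consumer built on the API above. handle_event() and the way the ring
 * buffer map fd is obtained are hypothetical; a real program would get the
 * fd from its own skeleton or bpf_object and know its own record layout.
 *
 *	#include <errno.h>
 *	#include <stdio.h>
 *	#include <bpf/libbpf.h>
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// data points at one committed sample of 'size' bytes
 *		printf("got %zu-byte sample\n", size);
 *		return 0;	// a negative return aborts consumption and is propagated
 *	}
 *
 *	int consume_ringbuf(int ringbuf_map_fd)	// hypothetical helper
 *	{
 *		struct ring_buffer *rb;
 *		int err;
 *
 *		rb = ring_buffer__new(ringbuf_map_fd, handle_event, NULL, NULL);
 *		if (!rb)
 *			return -1;	// ring_buffer__new() returns NULL on failure
 *
 *		for (;;) {
 *			err = ring_buffer__poll(rb, 100);	// timeout in ms
 *			if (err == -EINTR)
 *				continue;	// interrupted by a signal, keep going
 *			if (err < 0)
 *				break;		// error from epoll or a callback
 *		}
 *
 *		ring_buffer__free(rb);
 *		return err;
 *	}
 *
 * ring_buffer__consume() drains the same rings without blocking in epoll_wait().
 */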