1/*
2 * Copyright 2016 Patrick Rudolph <siro@das-labor.org>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * on the rights to use, copy, modify, merge, publish, distribute, sub
8 * license, and/or sell copies of the Software, and to permit persons to whom
9 * the Software is furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice (including the next
12 * paragraph) shall be included in all copies or substantial portions of the
13 * Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL
18 * THE AUTHOR(S) AND/OR THEIR SUPPLIERS BE LIABLE FOR ANY CLAIM,
19 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
20 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
21 * USE OR OTHER DEALINGS IN THE SOFTWARE. */
22
23#include "nine_queue.h"
24#include "os/os_thread.h"
25#include "util/macros.h"
26#include "nine_helpers.h"
27
28#define NINE_CMD_BUF_INSTR (256)
29
30#define NINE_CMD_BUFS (32)
31#define NINE_CMD_BUFS_MASK (NINE_CMD_BUFS - 1)
32
33#define NINE_QUEUE_SIZE (8192 * 16 + 128)
34
35#define DBG_CHANNEL DBG_DEVICE
36
37/*
38 * Single producer - single consumer pool queue
39 *
40 * Producer:
41 * Calls nine_queue_alloc to get a slice of memory in current cmdbuf.
42 * Calls nine_queue_flush to flush the queue by request.
43 * The queue is flushed automatically on insufficient space or once the
44 * cmdbuf contains NINE_CMD_BUF_INSTR instructions.
45 *
46 * nine_queue_flush does block, while nine_queue_alloc doesn't block.
47 *
48 * nine_queue_alloc returns NULL on insufficent space.
49 *
50 * Consumer:
51 * Calls nine_queue_wait_flush to wait for a cmdbuf.
52 * After waiting for a cmdbuf it calls nine_queue_get until NULL is returned.
53 *
54 * nine_queue_wait_flush does block, while nine_queue_get doesn't block.
55 *
56 * Constrains:
57 * Only a single consumer and a single producer are supported.
58 *
59 */
60
61struct nine_cmdbuf {
62    unsigned instr_size[NINE_CMD_BUF_INSTR];
63    unsigned num_instr;
64    unsigned offset;
65    void *mem_pool;
66    BOOL full;
67};
68
69struct nine_queue_pool {
70    struct nine_cmdbuf pool[NINE_CMD_BUFS];
71    unsigned head;
72    unsigned tail;
73    unsigned cur_instr;
74    BOOL worker_wait;
75    cnd_t event_pop;
76    cnd_t event_push;
77    mtx_t mutex_pop;
78    mtx_t mutex_push;
79};
80
81/* Consumer functions: */
82void
83nine_queue_wait_flush(struct nine_queue_pool* ctx)
84{
85    struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->tail];
86
87    /* wait for cmdbuf full */
88    mtx_lock(&ctx->mutex_push);
89    while (!cmdbuf->full)
90    {
91        DBG("waiting for full cmdbuf\n");
92        cnd_wait(&ctx->event_push, &ctx->mutex_push);
93    }
94    DBG("got cmdbuf=%p\n", cmdbuf);
95    mtx_unlock(&ctx->mutex_push);
96
97    cmdbuf->offset = 0;
98    ctx->cur_instr = 0;
99}
100
101/* Gets a pointer to the next memory slice.
102 * Does not block.
103 * Returns NULL on empty cmdbuf. */
104void *
105nine_queue_get(struct nine_queue_pool* ctx)
106{
107    struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->tail];
108    unsigned offset;
109
110    /* At this pointer there's always a cmdbuf. */
111
112    if (ctx->cur_instr == cmdbuf->num_instr) {
113        /* signal waiting producer */
114        mtx_lock(&ctx->mutex_pop);
115        DBG("freeing cmdbuf=%p\n", cmdbuf);
116        cmdbuf->full = 0;
117        cnd_signal(&ctx->event_pop);
118        mtx_unlock(&ctx->mutex_pop);
119
120        ctx->tail = (ctx->tail + 1) & NINE_CMD_BUFS_MASK;
121
122        return NULL;
123    }
124
125    /* At this pointer there's always a cmdbuf with instruction to process. */
126    offset = cmdbuf->offset;
127    cmdbuf->offset += cmdbuf->instr_size[ctx->cur_instr];
128    ctx->cur_instr ++;
129
130    return cmdbuf->mem_pool + offset;
131}
132
133/* Producer functions: */
134
135/* Flushes the queue.
136 * Moves the current cmdbuf to worker thread.
137 * Blocks until next cmdbuf is free. */
138void
139nine_queue_flush(struct nine_queue_pool* ctx)
140{
141    struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
142
143    DBG("flushing cmdbuf=%p instr=%d size=%d\n",
144           cmdbuf, cmdbuf->num_instr, cmdbuf->offset);
145
146    /* Nothing to flush */
147    if (!cmdbuf->num_instr)
148        return;
149
150    /* signal waiting worker */
151    mtx_lock(&ctx->mutex_push);
152    cmdbuf->full = 1;
153    cnd_signal(&ctx->event_push);
154    mtx_unlock(&ctx->mutex_push);
155
156    ctx->head = (ctx->head + 1) & NINE_CMD_BUFS_MASK;
157
158    cmdbuf = &ctx->pool[ctx->head];
159
160    /* wait for queue empty */
161    mtx_lock(&ctx->mutex_pop);
162    while (cmdbuf->full)
163    {
164        DBG("waiting for empty cmdbuf\n");
165        cnd_wait(&ctx->event_pop, &ctx->mutex_pop);
166    }
167    DBG("got empty cmdbuf=%p\n", cmdbuf);
168    mtx_unlock(&ctx->mutex_pop);
169    cmdbuf->offset = 0;
170    cmdbuf->num_instr = 0;
171}
172
173/* Gets a a pointer to slice of memory with size @space.
174 * Does block if queue is full.
175 * Returns NULL on @space > NINE_QUEUE_SIZE. */
176void *
177nine_queue_alloc(struct nine_queue_pool* ctx, unsigned space)
178{
179    unsigned offset;
180    struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
181
182    if (space > NINE_QUEUE_SIZE)
183        return NULL;
184
185    /* at this pointer there's always a free queue available */
186
187    if ((cmdbuf->offset + space > NINE_QUEUE_SIZE) ||
188        (cmdbuf->num_instr == NINE_CMD_BUF_INSTR)) {
189
190        nine_queue_flush(ctx);
191
192        cmdbuf = &ctx->pool[ctx->head];
193    }
194
195    DBG("cmdbuf=%p space=%d\n", cmdbuf, space);
196
197    /* at this pointer there's always a free queue with sufficient space available */
198
199    offset = cmdbuf->offset;
200    cmdbuf->offset += space;
201    cmdbuf->instr_size[cmdbuf->num_instr] = space;
202    cmdbuf->num_instr ++;
203
204    return cmdbuf->mem_pool + offset;
205}
206
207/* Returns the current queue flush state.
208 * TRUE nothing flushed
209 * FALSE one ore more instructions queued flushed. */
210bool
211nine_queue_no_flushed_work(struct nine_queue_pool* ctx)
212{
213    return (ctx->tail == ctx->head);
214}
215
216/* Returns the current queue empty state.
217 * TRUE no instructions queued.
218 * FALSE one ore more instructions queued. */
219bool
220nine_queue_isempty(struct nine_queue_pool* ctx)
221{
222    struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
223
224    return (ctx->tail == ctx->head) && !cmdbuf->num_instr;
225}
226
227struct nine_queue_pool*
228nine_queue_create(void)
229{
230    unsigned i;
231    struct nine_queue_pool *ctx;
232
233    ctx = CALLOC_STRUCT(nine_queue_pool);
234    if (!ctx)
235        goto failed;
236
237    for (i = 0; i < NINE_CMD_BUFS; i++) {
238        ctx->pool[i].mem_pool = MALLOC(NINE_QUEUE_SIZE);
239        if (!ctx->pool[i].mem_pool)
240            goto failed;
241    }
242
243    cnd_init(&ctx->event_pop);
244    (void) mtx_init(&ctx->mutex_pop, mtx_plain);
245
246    cnd_init(&ctx->event_push);
247    (void) mtx_init(&ctx->mutex_push, mtx_plain);
248
249    /* Block until first cmdbuf has been flushed. */
250    ctx->worker_wait = TRUE;
251
252    return ctx;
253failed:
254    if (ctx) {
255        for (i = 0; i < NINE_CMD_BUFS; i++) {
256            if (ctx->pool[i].mem_pool)
257                FREE(ctx->pool[i].mem_pool);
258        }
259        FREE(ctx);
260    }
261    return NULL;
262}
263
264void
265nine_queue_delete(struct nine_queue_pool *ctx)
266{
267    unsigned i;
268
269    mtx_destroy(&ctx->mutex_pop);
270    cnd_destroy(&ctx->event_pop);
271
272    mtx_destroy(&ctx->mutex_push);
273    cnd_destroy(&ctx->event_push);
274
275    for (i = 0; i < NINE_CMD_BUFS; i++)
276        FREE(ctx->pool[i].mem_pool);
277
278    FREE(ctx);
279}
280