1/*
2 * Permission is hereby granted, free of charge, to any person obtaining a copy
3 * of this software and associated documentation files (the "Software"), to deal
4 * in the Software without restriction, including without limitation the rights
5 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6 * copies of the Software, and to permit persons to whom the Software is
7 * furnished to do so, subject to the following conditions:
8 *
9 * The above copyright notice and this permission notice shall be included in
10 * all copies or substantial portions of the Software.
11 *
12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18 * THE SOFTWARE.
19 */
20
21/**
22 * Thread message API test
23 */
24
25#include "libavutil/avassert.h"
26#include "libavutil/avstring.h"
27#include "libavutil/frame.h"
28#include "libavutil/threadmessage.h"
29#include "libavutil/thread.h" // not public
30
31struct sender_data {
32    int id;
33    pthread_t tid;
34    int workload;
35    AVThreadMessageQueue *queue;
36};
37
38/* same as sender_data but shuffled for testing purpose */
39struct receiver_data {
40    pthread_t tid;
41    int workload;
42    int id;
43    AVThreadMessageQueue *queue;
44};
45
46struct message {
47    AVFrame *frame;
48    // we add some junk in the message to make sure the message size is >
49    // sizeof(void*)
50    int magic;
51};
52
53#define MAGIC 0xdeadc0de
54
55static void free_frame(void *arg)
56{
57    struct message *msg = arg;
58    av_assert0(msg->magic == MAGIC);
59    av_frame_free(&msg->frame);
60}
61
62static void *sender_thread(void *arg)
63{
64    int i, ret = 0;
65    struct sender_data *wd = arg;
66
67    av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68    for (i = 0; i < wd->workload; i++) {
69        if (rand() % wd->workload < wd->workload / 10) {
70            av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
71            av_thread_message_flush(wd->queue);
72        } else {
73            char *val;
74            AVDictionary *meta = NULL;
75            struct message msg = {
76                .magic = MAGIC,
77                .frame = av_frame_alloc(),
78            };
79
80            if (!msg.frame) {
81                ret = AVERROR(ENOMEM);
82                break;
83            }
84
85            /* we add some metadata to identify the frames */
86            val = av_asprintf("frame %d/%d from sender %d",
87                              i + 1, wd->workload, wd->id);
88            if (!val) {
89                av_frame_free(&msg.frame);
90                ret = AVERROR(ENOMEM);
91                break;
92            }
93            ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94            if (ret < 0) {
95                av_frame_free(&msg.frame);
96                break;
97            }
98            msg.frame->metadata = meta;
99
100            /* allocate a real frame in order to simulate "real" work */
101            msg.frame->format = AV_PIX_FMT_RGBA;
102            msg.frame->width  = 320;
103            msg.frame->height = 240;
104            ret = av_frame_get_buffer(msg.frame, 0);
105            if (ret < 0) {
106                av_frame_free(&msg.frame);
107                break;
108            }
109
110            /* push the frame in the common queue */
111            av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112                   wd->id, i + 1, wd->workload, msg.frame);
113            ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114            if (ret < 0) {
115                av_frame_free(&msg.frame);
116                break;
117            }
118        }
119    }
120    av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121           wd->id, av_err2str(ret));
122    av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
123    return NULL;
124}
125
126static void *receiver_thread(void *arg)
127{
128    int i, ret = 0;
129    struct receiver_data *rd = arg;
130
131    for (i = 0; i < rd->workload; i++) {
132        if (rand() % rd->workload < rd->workload / 10) {
133            av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134                   "discarding %d message(s)\n", rd->id,
135                   av_thread_message_queue_nb_elems(rd->queue));
136            av_thread_message_flush(rd->queue);
137        } else {
138            struct message msg;
139            AVDictionary *meta;
140            AVDictionaryEntry *e;
141
142            ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143            if (ret < 0)
144                break;
145            av_assert0(msg.magic == MAGIC);
146            meta = msg.frame->metadata;
147            e = av_dict_get(meta, "sig", NULL, 0);
148            av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149            av_frame_free(&msg.frame);
150        }
151    }
152
153    av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
154    av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
155
156    return NULL;
157}
158
159static int get_workload(int minv, int maxv)
160{
161    return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
162}
163
164int main(int ac, char **av)
165{
166    int i, ret = 0;
167    int max_queue_size;
168    int nb_senders, sender_min_load, sender_max_load;
169    int nb_receivers, receiver_min_load, receiver_max_load;
170    struct sender_data *senders;
171    struct receiver_data *receivers;
172    AVThreadMessageQueue *queue = NULL;
173
174    if (ac != 8) {
175        av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176               "<nb_senders> <sender_min_send> <sender_max_send> "
177               "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178        return 1;
179    }
180
181    max_queue_size    = atoi(av[1]);
182    nb_senders        = atoi(av[2]);
183    sender_min_load   = atoi(av[3]);
184    sender_max_load   = atoi(av[4]);
185    nb_receivers      = atoi(av[5]);
186    receiver_min_load = atoi(av[6]);
187    receiver_max_load = atoi(av[7]);
188
189    if (max_queue_size <= 0 ||
190        nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191        nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192        av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193        return 1;
194    }
195
196    av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197           "%d receivers receiving [%d-%d]\n", max_queue_size,
198           nb_senders, sender_min_load, sender_max_load,
199           nb_receivers, receiver_min_load, receiver_max_load);
200
201    senders   = av_calloc(nb_senders,   sizeof(*senders));
202    receivers = av_calloc(nb_receivers, sizeof(*receivers));
203    if (!senders || !receivers) {
204        ret = AVERROR(ENOMEM);
205        goto end;
206    }
207
208    ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209    if (ret < 0)
210        goto end;
211
212    av_thread_message_queue_set_free_func(queue, free_frame);
213
214#define SPAWN_THREADS(type) do {                                                \
215    for (i = 0; i < nb_##type##s; i++) {                                        \
216        struct type##_data *td = &type##s[i];                                   \
217                                                                                \
218        td->id = i;                                                             \
219        td->queue = queue;                                                      \
220        td->workload = get_workload(type##_min_load, type##_max_load);          \
221                                                                                \
222        ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
223        if (ret) {                                                              \
224            const int err = AVERROR(ret);                                       \
225            av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
226                   " thread: %s\n", av_err2str(err));                           \
227            goto end;                                                           \
228        }                                                                       \
229    }                                                                           \
230} while (0)
231
232#define WAIT_THREADS(type) do {                                                 \
233    for (i = 0; i < nb_##type##s; i++) {                                        \
234        struct type##_data *td = &type##s[i];                                   \
235                                                                                \
236        ret = pthread_join(td->tid, NULL);                                      \
237        if (ret) {                                                              \
238            const int err = AVERROR(ret);                                       \
239            av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
240                   " thread: %s\n", av_err2str(err));                           \
241            goto end;                                                           \
242        }                                                                       \
243    }                                                                           \
244} while (0)
245
246    SPAWN_THREADS(receiver);
247    SPAWN_THREADS(sender);
248
249    WAIT_THREADS(sender);
250    WAIT_THREADS(receiver);
251
252end:
253    av_thread_message_queue_free(&queue);
254    av_freep(&senders);
255    av_freep(&receivers);
256
257    if (ret < 0 && ret != AVERROR_EOF) {
258        av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259        return 1;
260    }
261    return 0;
262}
263