1cabdff1aSopenharmony_ci/*
2cabdff1aSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy
3cabdff1aSopenharmony_ci * of this software and associated documentation files (the "Software"), to deal
4cabdff1aSopenharmony_ci * in the Software without restriction, including without limitation the rights
5cabdff1aSopenharmony_ci * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6cabdff1aSopenharmony_ci * copies of the Software, and to permit persons to whom the Software is
7cabdff1aSopenharmony_ci * furnished to do so, subject to the following conditions:
8cabdff1aSopenharmony_ci *
9cabdff1aSopenharmony_ci * The above copyright notice and this permission notice shall be included in
10cabdff1aSopenharmony_ci * all copies or substantial portions of the Software.
11cabdff1aSopenharmony_ci *
12cabdff1aSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13cabdff1aSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14cabdff1aSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15cabdff1aSopenharmony_ci * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16cabdff1aSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17cabdff1aSopenharmony_ci * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18cabdff1aSopenharmony_ci * THE SOFTWARE.
19cabdff1aSopenharmony_ci */
20cabdff1aSopenharmony_ci
21cabdff1aSopenharmony_ci/**
22cabdff1aSopenharmony_ci * Thread message API test
23cabdff1aSopenharmony_ci */
24cabdff1aSopenharmony_ci
25cabdff1aSopenharmony_ci#include "libavutil/avassert.h"
26cabdff1aSopenharmony_ci#include "libavutil/avstring.h"
27cabdff1aSopenharmony_ci#include "libavutil/frame.h"
28cabdff1aSopenharmony_ci#include "libavutil/threadmessage.h"
29cabdff1aSopenharmony_ci#include "libavutil/thread.h" // not public
30cabdff1aSopenharmony_ci
31cabdff1aSopenharmony_cistruct sender_data {
32cabdff1aSopenharmony_ci    int id;
33cabdff1aSopenharmony_ci    pthread_t tid;
34cabdff1aSopenharmony_ci    int workload;
35cabdff1aSopenharmony_ci    AVThreadMessageQueue *queue;
36cabdff1aSopenharmony_ci};
37cabdff1aSopenharmony_ci
38cabdff1aSopenharmony_ci/* same as sender_data but shuffled for testing purpose */
39cabdff1aSopenharmony_cistruct receiver_data {
40cabdff1aSopenharmony_ci    pthread_t tid;
41cabdff1aSopenharmony_ci    int workload;
42cabdff1aSopenharmony_ci    int id;
43cabdff1aSopenharmony_ci    AVThreadMessageQueue *queue;
44cabdff1aSopenharmony_ci};
45cabdff1aSopenharmony_ci
46cabdff1aSopenharmony_cistruct message {
47cabdff1aSopenharmony_ci    AVFrame *frame;
48cabdff1aSopenharmony_ci    // we add some junk in the message to make sure the message size is >
49cabdff1aSopenharmony_ci    // sizeof(void*)
50cabdff1aSopenharmony_ci    int magic;
51cabdff1aSopenharmony_ci};
52cabdff1aSopenharmony_ci
53cabdff1aSopenharmony_ci#define MAGIC 0xdeadc0de
54cabdff1aSopenharmony_ci
55cabdff1aSopenharmony_cistatic void free_frame(void *arg)
56cabdff1aSopenharmony_ci{
57cabdff1aSopenharmony_ci    struct message *msg = arg;
58cabdff1aSopenharmony_ci    av_assert0(msg->magic == MAGIC);
59cabdff1aSopenharmony_ci    av_frame_free(&msg->frame);
60cabdff1aSopenharmony_ci}
61cabdff1aSopenharmony_ci
62cabdff1aSopenharmony_cistatic void *sender_thread(void *arg)
63cabdff1aSopenharmony_ci{
64cabdff1aSopenharmony_ci    int i, ret = 0;
65cabdff1aSopenharmony_ci    struct sender_data *wd = arg;
66cabdff1aSopenharmony_ci
67cabdff1aSopenharmony_ci    av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68cabdff1aSopenharmony_ci    for (i = 0; i < wd->workload; i++) {
69cabdff1aSopenharmony_ci        if (rand() % wd->workload < wd->workload / 10) {
70cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
71cabdff1aSopenharmony_ci            av_thread_message_flush(wd->queue);
72cabdff1aSopenharmony_ci        } else {
73cabdff1aSopenharmony_ci            char *val;
74cabdff1aSopenharmony_ci            AVDictionary *meta = NULL;
75cabdff1aSopenharmony_ci            struct message msg = {
76cabdff1aSopenharmony_ci                .magic = MAGIC,
77cabdff1aSopenharmony_ci                .frame = av_frame_alloc(),
78cabdff1aSopenharmony_ci            };
79cabdff1aSopenharmony_ci
80cabdff1aSopenharmony_ci            if (!msg.frame) {
81cabdff1aSopenharmony_ci                ret = AVERROR(ENOMEM);
82cabdff1aSopenharmony_ci                break;
83cabdff1aSopenharmony_ci            }
84cabdff1aSopenharmony_ci
85cabdff1aSopenharmony_ci            /* we add some metadata to identify the frames */
86cabdff1aSopenharmony_ci            val = av_asprintf("frame %d/%d from sender %d",
87cabdff1aSopenharmony_ci                              i + 1, wd->workload, wd->id);
88cabdff1aSopenharmony_ci            if (!val) {
89cabdff1aSopenharmony_ci                av_frame_free(&msg.frame);
90cabdff1aSopenharmony_ci                ret = AVERROR(ENOMEM);
91cabdff1aSopenharmony_ci                break;
92cabdff1aSopenharmony_ci            }
93cabdff1aSopenharmony_ci            ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94cabdff1aSopenharmony_ci            if (ret < 0) {
95cabdff1aSopenharmony_ci                av_frame_free(&msg.frame);
96cabdff1aSopenharmony_ci                break;
97cabdff1aSopenharmony_ci            }
98cabdff1aSopenharmony_ci            msg.frame->metadata = meta;
99cabdff1aSopenharmony_ci
100cabdff1aSopenharmony_ci            /* allocate a real frame in order to simulate "real" work */
101cabdff1aSopenharmony_ci            msg.frame->format = AV_PIX_FMT_RGBA;
102cabdff1aSopenharmony_ci            msg.frame->width  = 320;
103cabdff1aSopenharmony_ci            msg.frame->height = 240;
104cabdff1aSopenharmony_ci            ret = av_frame_get_buffer(msg.frame, 0);
105cabdff1aSopenharmony_ci            if (ret < 0) {
106cabdff1aSopenharmony_ci                av_frame_free(&msg.frame);
107cabdff1aSopenharmony_ci                break;
108cabdff1aSopenharmony_ci            }
109cabdff1aSopenharmony_ci
110cabdff1aSopenharmony_ci            /* push the frame in the common queue */
111cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112cabdff1aSopenharmony_ci                   wd->id, i + 1, wd->workload, msg.frame);
113cabdff1aSopenharmony_ci            ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114cabdff1aSopenharmony_ci            if (ret < 0) {
115cabdff1aSopenharmony_ci                av_frame_free(&msg.frame);
116cabdff1aSopenharmony_ci                break;
117cabdff1aSopenharmony_ci            }
118cabdff1aSopenharmony_ci        }
119cabdff1aSopenharmony_ci    }
120cabdff1aSopenharmony_ci    av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121cabdff1aSopenharmony_ci           wd->id, av_err2str(ret));
122cabdff1aSopenharmony_ci    av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
123cabdff1aSopenharmony_ci    return NULL;
124cabdff1aSopenharmony_ci}
125cabdff1aSopenharmony_ci
126cabdff1aSopenharmony_cistatic void *receiver_thread(void *arg)
127cabdff1aSopenharmony_ci{
128cabdff1aSopenharmony_ci    int i, ret = 0;
129cabdff1aSopenharmony_ci    struct receiver_data *rd = arg;
130cabdff1aSopenharmony_ci
131cabdff1aSopenharmony_ci    for (i = 0; i < rd->workload; i++) {
132cabdff1aSopenharmony_ci        if (rand() % rd->workload < rd->workload / 10) {
133cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134cabdff1aSopenharmony_ci                   "discarding %d message(s)\n", rd->id,
135cabdff1aSopenharmony_ci                   av_thread_message_queue_nb_elems(rd->queue));
136cabdff1aSopenharmony_ci            av_thread_message_flush(rd->queue);
137cabdff1aSopenharmony_ci        } else {
138cabdff1aSopenharmony_ci            struct message msg;
139cabdff1aSopenharmony_ci            AVDictionary *meta;
140cabdff1aSopenharmony_ci            AVDictionaryEntry *e;
141cabdff1aSopenharmony_ci
142cabdff1aSopenharmony_ci            ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143cabdff1aSopenharmony_ci            if (ret < 0)
144cabdff1aSopenharmony_ci                break;
145cabdff1aSopenharmony_ci            av_assert0(msg.magic == MAGIC);
146cabdff1aSopenharmony_ci            meta = msg.frame->metadata;
147cabdff1aSopenharmony_ci            e = av_dict_get(meta, "sig", NULL, 0);
148cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149cabdff1aSopenharmony_ci            av_frame_free(&msg.frame);
150cabdff1aSopenharmony_ci        }
151cabdff1aSopenharmony_ci    }
152cabdff1aSopenharmony_ci
153cabdff1aSopenharmony_ci    av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
154cabdff1aSopenharmony_ci    av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
155cabdff1aSopenharmony_ci
156cabdff1aSopenharmony_ci    return NULL;
157cabdff1aSopenharmony_ci}
158cabdff1aSopenharmony_ci
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * The bounds may be given in either order: they are normalised first, so that
 * a reversed pair (maxv < minv) no longer yields a value above both bounds.
 *
 * @param minv one bound of the range
 * @param maxv the other bound of the range
 * @return the bound itself when both are equal, otherwise a value derived
 *         from rand() in [lower bound, upper bound)
 */
static int get_workload(int minv, int maxv)
{
    const int lo = minv < maxv ? minv : maxv;
    const int hi = minv < maxv ? maxv : minv;

    /* an empty span would make the modulo below a division by zero */
    if (lo == hi)
        return lo;

    return rand() % (hi - lo) + lo;
}
163cabdff1aSopenharmony_ci
164cabdff1aSopenharmony_ciint main(int ac, char **av)
165cabdff1aSopenharmony_ci{
166cabdff1aSopenharmony_ci    int i, ret = 0;
167cabdff1aSopenharmony_ci    int max_queue_size;
168cabdff1aSopenharmony_ci    int nb_senders, sender_min_load, sender_max_load;
169cabdff1aSopenharmony_ci    int nb_receivers, receiver_min_load, receiver_max_load;
170cabdff1aSopenharmony_ci    struct sender_data *senders;
171cabdff1aSopenharmony_ci    struct receiver_data *receivers;
172cabdff1aSopenharmony_ci    AVThreadMessageQueue *queue = NULL;
173cabdff1aSopenharmony_ci
174cabdff1aSopenharmony_ci    if (ac != 8) {
175cabdff1aSopenharmony_ci        av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176cabdff1aSopenharmony_ci               "<nb_senders> <sender_min_send> <sender_max_send> "
177cabdff1aSopenharmony_ci               "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178cabdff1aSopenharmony_ci        return 1;
179cabdff1aSopenharmony_ci    }
180cabdff1aSopenharmony_ci
181cabdff1aSopenharmony_ci    max_queue_size    = atoi(av[1]);
182cabdff1aSopenharmony_ci    nb_senders        = atoi(av[2]);
183cabdff1aSopenharmony_ci    sender_min_load   = atoi(av[3]);
184cabdff1aSopenharmony_ci    sender_max_load   = atoi(av[4]);
185cabdff1aSopenharmony_ci    nb_receivers      = atoi(av[5]);
186cabdff1aSopenharmony_ci    receiver_min_load = atoi(av[6]);
187cabdff1aSopenharmony_ci    receiver_max_load = atoi(av[7]);
188cabdff1aSopenharmony_ci
189cabdff1aSopenharmony_ci    if (max_queue_size <= 0 ||
190cabdff1aSopenharmony_ci        nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191cabdff1aSopenharmony_ci        nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192cabdff1aSopenharmony_ci        av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193cabdff1aSopenharmony_ci        return 1;
194cabdff1aSopenharmony_ci    }
195cabdff1aSopenharmony_ci
196cabdff1aSopenharmony_ci    av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197cabdff1aSopenharmony_ci           "%d receivers receiving [%d-%d]\n", max_queue_size,
198cabdff1aSopenharmony_ci           nb_senders, sender_min_load, sender_max_load,
199cabdff1aSopenharmony_ci           nb_receivers, receiver_min_load, receiver_max_load);
200cabdff1aSopenharmony_ci
201cabdff1aSopenharmony_ci    senders   = av_calloc(nb_senders,   sizeof(*senders));
202cabdff1aSopenharmony_ci    receivers = av_calloc(nb_receivers, sizeof(*receivers));
203cabdff1aSopenharmony_ci    if (!senders || !receivers) {
204cabdff1aSopenharmony_ci        ret = AVERROR(ENOMEM);
205cabdff1aSopenharmony_ci        goto end;
206cabdff1aSopenharmony_ci    }
207cabdff1aSopenharmony_ci
208cabdff1aSopenharmony_ci    ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209cabdff1aSopenharmony_ci    if (ret < 0)
210cabdff1aSopenharmony_ci        goto end;
211cabdff1aSopenharmony_ci
212cabdff1aSopenharmony_ci    av_thread_message_queue_set_free_func(queue, free_frame);
213cabdff1aSopenharmony_ci
214cabdff1aSopenharmony_ci#define SPAWN_THREADS(type) do {                                                \
215cabdff1aSopenharmony_ci    for (i = 0; i < nb_##type##s; i++) {                                        \
216cabdff1aSopenharmony_ci        struct type##_data *td = &type##s[i];                                   \
217cabdff1aSopenharmony_ci                                                                                \
218cabdff1aSopenharmony_ci        td->id = i;                                                             \
219cabdff1aSopenharmony_ci        td->queue = queue;                                                      \
220cabdff1aSopenharmony_ci        td->workload = get_workload(type##_min_load, type##_max_load);          \
221cabdff1aSopenharmony_ci                                                                                \
222cabdff1aSopenharmony_ci        ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
223cabdff1aSopenharmony_ci        if (ret) {                                                              \
224cabdff1aSopenharmony_ci            const int err = AVERROR(ret);                                       \
225cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
226cabdff1aSopenharmony_ci                   " thread: %s\n", av_err2str(err));                           \
227cabdff1aSopenharmony_ci            goto end;                                                           \
228cabdff1aSopenharmony_ci        }                                                                       \
229cabdff1aSopenharmony_ci    }                                                                           \
230cabdff1aSopenharmony_ci} while (0)
231cabdff1aSopenharmony_ci
232cabdff1aSopenharmony_ci#define WAIT_THREADS(type) do {                                                 \
233cabdff1aSopenharmony_ci    for (i = 0; i < nb_##type##s; i++) {                                        \
234cabdff1aSopenharmony_ci        struct type##_data *td = &type##s[i];                                   \
235cabdff1aSopenharmony_ci                                                                                \
236cabdff1aSopenharmony_ci        ret = pthread_join(td->tid, NULL);                                      \
237cabdff1aSopenharmony_ci        if (ret) {                                                              \
238cabdff1aSopenharmony_ci            const int err = AVERROR(ret);                                       \
239cabdff1aSopenharmony_ci            av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
240cabdff1aSopenharmony_ci                   " thread: %s\n", av_err2str(err));                           \
241cabdff1aSopenharmony_ci            goto end;                                                           \
242cabdff1aSopenharmony_ci        }                                                                       \
243cabdff1aSopenharmony_ci    }                                                                           \
244cabdff1aSopenharmony_ci} while (0)
245cabdff1aSopenharmony_ci
246cabdff1aSopenharmony_ci    SPAWN_THREADS(receiver);
247cabdff1aSopenharmony_ci    SPAWN_THREADS(sender);
248cabdff1aSopenharmony_ci
249cabdff1aSopenharmony_ci    WAIT_THREADS(sender);
250cabdff1aSopenharmony_ci    WAIT_THREADS(receiver);
251cabdff1aSopenharmony_ci
252cabdff1aSopenharmony_ciend:
253cabdff1aSopenharmony_ci    av_thread_message_queue_free(&queue);
254cabdff1aSopenharmony_ci    av_freep(&senders);
255cabdff1aSopenharmony_ci    av_freep(&receivers);
256cabdff1aSopenharmony_ci
257cabdff1aSopenharmony_ci    if (ret < 0 && ret != AVERROR_EOF) {
258cabdff1aSopenharmony_ci        av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259cabdff1aSopenharmony_ci        return 1;
260cabdff1aSopenharmony_ci    }
261cabdff1aSopenharmony_ci    return 0;
262cabdff1aSopenharmony_ci}
263