1cabdff1aSopenharmony_ci/* 2cabdff1aSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy 3cabdff1aSopenharmony_ci * of this software and associated documentation files (the "Software"), to deal 4cabdff1aSopenharmony_ci * in the Software without restriction, including without limitation the rights 5cabdff1aSopenharmony_ci * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 6cabdff1aSopenharmony_ci * copies of the Software, and to permit persons to whom the Software is 7cabdff1aSopenharmony_ci * furnished to do so, subject to the following conditions: 8cabdff1aSopenharmony_ci * 9cabdff1aSopenharmony_ci * The above copyright notice and this permission notice shall be included in 10cabdff1aSopenharmony_ci * all copies or substantial portions of the Software. 11cabdff1aSopenharmony_ci * 12cabdff1aSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 13cabdff1aSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 14cabdff1aSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 15cabdff1aSopenharmony_ci * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 16cabdff1aSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 17cabdff1aSopenharmony_ci * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 18cabdff1aSopenharmony_ci * THE SOFTWARE. 19cabdff1aSopenharmony_ci */ 20cabdff1aSopenharmony_ci 21cabdff1aSopenharmony_ci/** 22cabdff1aSopenharmony_ci * Thread message API test 23cabdff1aSopenharmony_ci */ 24cabdff1aSopenharmony_ci 25cabdff1aSopenharmony_ci#include "libavutil/avassert.h" 26cabdff1aSopenharmony_ci#include "libavutil/avstring.h" 27cabdff1aSopenharmony_ci#include "libavutil/frame.h" 28cabdff1aSopenharmony_ci#include "libavutil/threadmessage.h" 29cabdff1aSopenharmony_ci#include "libavutil/thread.h" // not public 30cabdff1aSopenharmony_ci 31cabdff1aSopenharmony_cistruct sender_data { 32cabdff1aSopenharmony_ci int id; 33cabdff1aSopenharmony_ci pthread_t tid; 34cabdff1aSopenharmony_ci int workload; 35cabdff1aSopenharmony_ci AVThreadMessageQueue *queue; 36cabdff1aSopenharmony_ci}; 37cabdff1aSopenharmony_ci 38cabdff1aSopenharmony_ci/* same as sender_data but shuffled for testing purpose */ 39cabdff1aSopenharmony_cistruct receiver_data { 40cabdff1aSopenharmony_ci pthread_t tid; 41cabdff1aSopenharmony_ci int workload; 42cabdff1aSopenharmony_ci int id; 43cabdff1aSopenharmony_ci AVThreadMessageQueue *queue; 44cabdff1aSopenharmony_ci}; 45cabdff1aSopenharmony_ci 46cabdff1aSopenharmony_cistruct message { 47cabdff1aSopenharmony_ci AVFrame *frame; 48cabdff1aSopenharmony_ci // we add some junk in the message to make sure the message size is > 49cabdff1aSopenharmony_ci // sizeof(void*) 50cabdff1aSopenharmony_ci int magic; 51cabdff1aSopenharmony_ci}; 52cabdff1aSopenharmony_ci 53cabdff1aSopenharmony_ci#define MAGIC 0xdeadc0de 54cabdff1aSopenharmony_ci 55cabdff1aSopenharmony_cistatic void free_frame(void *arg) 56cabdff1aSopenharmony_ci{ 57cabdff1aSopenharmony_ci struct message *msg = arg; 58cabdff1aSopenharmony_ci av_assert0(msg->magic == MAGIC); 59cabdff1aSopenharmony_ci av_frame_free(&msg->frame); 60cabdff1aSopenharmony_ci} 61cabdff1aSopenharmony_ci 62cabdff1aSopenharmony_cistatic void *sender_thread(void *arg) 63cabdff1aSopenharmony_ci{ 64cabdff1aSopenharmony_ci int i, ret = 0; 65cabdff1aSopenharmony_ci struct sender_data *wd = arg; 66cabdff1aSopenharmony_ci 67cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); 68cabdff1aSopenharmony_ci for (i = 0; i < wd->workload; i++) { 69cabdff1aSopenharmony_ci if (rand() % wd->workload < wd->workload / 10) { 70cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); 71cabdff1aSopenharmony_ci av_thread_message_flush(wd->queue); 72cabdff1aSopenharmony_ci } else { 73cabdff1aSopenharmony_ci char *val; 74cabdff1aSopenharmony_ci AVDictionary *meta = NULL; 75cabdff1aSopenharmony_ci struct message msg = { 76cabdff1aSopenharmony_ci .magic = MAGIC, 77cabdff1aSopenharmony_ci .frame = av_frame_alloc(), 78cabdff1aSopenharmony_ci }; 79cabdff1aSopenharmony_ci 80cabdff1aSopenharmony_ci if (!msg.frame) { 81cabdff1aSopenharmony_ci ret = AVERROR(ENOMEM); 82cabdff1aSopenharmony_ci break; 83cabdff1aSopenharmony_ci } 84cabdff1aSopenharmony_ci 85cabdff1aSopenharmony_ci /* we add some metadata to identify the frames */ 86cabdff1aSopenharmony_ci val = av_asprintf("frame %d/%d from sender %d", 87cabdff1aSopenharmony_ci i + 1, wd->workload, wd->id); 88cabdff1aSopenharmony_ci if (!val) { 89cabdff1aSopenharmony_ci av_frame_free(&msg.frame); 90cabdff1aSopenharmony_ci ret = AVERROR(ENOMEM); 91cabdff1aSopenharmony_ci break; 92cabdff1aSopenharmony_ci } 93cabdff1aSopenharmony_ci ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); 94cabdff1aSopenharmony_ci if (ret < 0) { 95cabdff1aSopenharmony_ci av_frame_free(&msg.frame); 96cabdff1aSopenharmony_ci break; 97cabdff1aSopenharmony_ci } 98cabdff1aSopenharmony_ci msg.frame->metadata = meta; 99cabdff1aSopenharmony_ci 100cabdff1aSopenharmony_ci /* allocate a real frame in order to simulate "real" work */ 101cabdff1aSopenharmony_ci msg.frame->format = AV_PIX_FMT_RGBA; 102cabdff1aSopenharmony_ci msg.frame->width = 320; 103cabdff1aSopenharmony_ci msg.frame->height = 240; 104cabdff1aSopenharmony_ci ret = av_frame_get_buffer(msg.frame, 0); 105cabdff1aSopenharmony_ci if (ret < 0) { 106cabdff1aSopenharmony_ci av_frame_free(&msg.frame); 107cabdff1aSopenharmony_ci break; 108cabdff1aSopenharmony_ci } 109cabdff1aSopenharmony_ci 110cabdff1aSopenharmony_ci /* push the frame in the common queue */ 111cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", 112cabdff1aSopenharmony_ci wd->id, i + 1, wd->workload, msg.frame); 113cabdff1aSopenharmony_ci ret = av_thread_message_queue_send(wd->queue, &msg, 0); 114cabdff1aSopenharmony_ci if (ret < 0) { 115cabdff1aSopenharmony_ci av_frame_free(&msg.frame); 116cabdff1aSopenharmony_ci break; 117cabdff1aSopenharmony_ci } 118cabdff1aSopenharmony_ci } 119cabdff1aSopenharmony_ci } 120cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", 121cabdff1aSopenharmony_ci wd->id, av_err2str(ret)); 122cabdff1aSopenharmony_ci av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); 123cabdff1aSopenharmony_ci return NULL; 124cabdff1aSopenharmony_ci} 125cabdff1aSopenharmony_ci 126cabdff1aSopenharmony_cistatic void *receiver_thread(void *arg) 127cabdff1aSopenharmony_ci{ 128cabdff1aSopenharmony_ci int i, ret = 0; 129cabdff1aSopenharmony_ci struct receiver_data *rd = arg; 130cabdff1aSopenharmony_ci 131cabdff1aSopenharmony_ci for (i = 0; i < rd->workload; i++) { 132cabdff1aSopenharmony_ci if (rand() % rd->workload < rd->workload / 10) { 133cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, " 134cabdff1aSopenharmony_ci "discarding %d message(s)\n", rd->id, 135cabdff1aSopenharmony_ci av_thread_message_queue_nb_elems(rd->queue)); 136cabdff1aSopenharmony_ci av_thread_message_flush(rd->queue); 137cabdff1aSopenharmony_ci } else { 138cabdff1aSopenharmony_ci struct message msg; 139cabdff1aSopenharmony_ci AVDictionary *meta; 140cabdff1aSopenharmony_ci AVDictionaryEntry *e; 141cabdff1aSopenharmony_ci 142cabdff1aSopenharmony_ci ret = av_thread_message_queue_recv(rd->queue, &msg, 0); 143cabdff1aSopenharmony_ci if (ret < 0) 144cabdff1aSopenharmony_ci break; 145cabdff1aSopenharmony_ci av_assert0(msg.magic == MAGIC); 146cabdff1aSopenharmony_ci meta = msg.frame->metadata; 147cabdff1aSopenharmony_ci e = av_dict_get(meta, "sig", NULL, 0); 148cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); 149cabdff1aSopenharmony_ci av_frame_free(&msg.frame); 150cabdff1aSopenharmony_ci } 151cabdff1aSopenharmony_ci } 152cabdff1aSopenharmony_ci 153cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); 154cabdff1aSopenharmony_ci av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); 155cabdff1aSopenharmony_ci 156cabdff1aSopenharmony_ci return NULL; 157cabdff1aSopenharmony_ci} 158cabdff1aSopenharmony_ci 159cabdff1aSopenharmony_cistatic int get_workload(int minv, int maxv) 160cabdff1aSopenharmony_ci{ 161cabdff1aSopenharmony_ci return maxv == minv ? maxv : rand() % (maxv - minv) + minv; 162cabdff1aSopenharmony_ci} 163cabdff1aSopenharmony_ci 164cabdff1aSopenharmony_ciint main(int ac, char **av) 165cabdff1aSopenharmony_ci{ 166cabdff1aSopenharmony_ci int i, ret = 0; 167cabdff1aSopenharmony_ci int max_queue_size; 168cabdff1aSopenharmony_ci int nb_senders, sender_min_load, sender_max_load; 169cabdff1aSopenharmony_ci int nb_receivers, receiver_min_load, receiver_max_load; 170cabdff1aSopenharmony_ci struct sender_data *senders; 171cabdff1aSopenharmony_ci struct receiver_data *receivers; 172cabdff1aSopenharmony_ci AVThreadMessageQueue *queue = NULL; 173cabdff1aSopenharmony_ci 174cabdff1aSopenharmony_ci if (ac != 8) { 175cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " 176cabdff1aSopenharmony_ci "<nb_senders> <sender_min_send> <sender_max_send> " 177cabdff1aSopenharmony_ci "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); 178cabdff1aSopenharmony_ci return 1; 179cabdff1aSopenharmony_ci } 180cabdff1aSopenharmony_ci 181cabdff1aSopenharmony_ci max_queue_size = atoi(av[1]); 182cabdff1aSopenharmony_ci nb_senders = atoi(av[2]); 183cabdff1aSopenharmony_ci sender_min_load = atoi(av[3]); 184cabdff1aSopenharmony_ci sender_max_load = atoi(av[4]); 185cabdff1aSopenharmony_ci nb_receivers = atoi(av[5]); 186cabdff1aSopenharmony_ci receiver_min_load = atoi(av[6]); 187cabdff1aSopenharmony_ci receiver_max_load = atoi(av[7]); 188cabdff1aSopenharmony_ci 189cabdff1aSopenharmony_ci if (max_queue_size <= 0 || 190cabdff1aSopenharmony_ci nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || 191cabdff1aSopenharmony_ci nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { 192cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); 193cabdff1aSopenharmony_ci return 1; 194cabdff1aSopenharmony_ci } 195cabdff1aSopenharmony_ci 196cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " 197cabdff1aSopenharmony_ci "%d receivers receiving [%d-%d]\n", max_queue_size, 198cabdff1aSopenharmony_ci nb_senders, sender_min_load, sender_max_load, 199cabdff1aSopenharmony_ci nb_receivers, receiver_min_load, receiver_max_load); 200cabdff1aSopenharmony_ci 201cabdff1aSopenharmony_ci senders = av_calloc(nb_senders, sizeof(*senders)); 202cabdff1aSopenharmony_ci receivers = av_calloc(nb_receivers, sizeof(*receivers)); 203cabdff1aSopenharmony_ci if (!senders || !receivers) { 204cabdff1aSopenharmony_ci ret = AVERROR(ENOMEM); 205cabdff1aSopenharmony_ci goto end; 206cabdff1aSopenharmony_ci } 207cabdff1aSopenharmony_ci 208cabdff1aSopenharmony_ci ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); 209cabdff1aSopenharmony_ci if (ret < 0) 210cabdff1aSopenharmony_ci goto end; 211cabdff1aSopenharmony_ci 212cabdff1aSopenharmony_ci av_thread_message_queue_set_free_func(queue, free_frame); 213cabdff1aSopenharmony_ci 214cabdff1aSopenharmony_ci#define SPAWN_THREADS(type) do { \ 215cabdff1aSopenharmony_ci for (i = 0; i < nb_##type##s; i++) { \ 216cabdff1aSopenharmony_ci struct type##_data *td = &type##s[i]; \ 217cabdff1aSopenharmony_ci \ 218cabdff1aSopenharmony_ci td->id = i; \ 219cabdff1aSopenharmony_ci td->queue = queue; \ 220cabdff1aSopenharmony_ci td->workload = get_workload(type##_min_load, type##_max_load); \ 221cabdff1aSopenharmony_ci \ 222cabdff1aSopenharmony_ci ret = pthread_create(&td->tid, NULL, type##_thread, td); \ 223cabdff1aSopenharmony_ci if (ret) { \ 224cabdff1aSopenharmony_ci const int err = AVERROR(ret); \ 225cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ 226cabdff1aSopenharmony_ci " thread: %s\n", av_err2str(err)); \ 227cabdff1aSopenharmony_ci goto end; \ 228cabdff1aSopenharmony_ci } \ 229cabdff1aSopenharmony_ci } \ 230cabdff1aSopenharmony_ci} while (0) 231cabdff1aSopenharmony_ci 232cabdff1aSopenharmony_ci#define WAIT_THREADS(type) do { \ 233cabdff1aSopenharmony_ci for (i = 0; i < nb_##type##s; i++) { \ 234cabdff1aSopenharmony_ci struct type##_data *td = &type##s[i]; \ 235cabdff1aSopenharmony_ci \ 236cabdff1aSopenharmony_ci ret = pthread_join(td->tid, NULL); \ 237cabdff1aSopenharmony_ci if (ret) { \ 238cabdff1aSopenharmony_ci const int err = AVERROR(ret); \ 239cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ 240cabdff1aSopenharmony_ci " thread: %s\n", av_err2str(err)); \ 241cabdff1aSopenharmony_ci goto end; \ 242cabdff1aSopenharmony_ci } \ 243cabdff1aSopenharmony_ci } \ 244cabdff1aSopenharmony_ci} while (0) 245cabdff1aSopenharmony_ci 246cabdff1aSopenharmony_ci SPAWN_THREADS(receiver); 247cabdff1aSopenharmony_ci SPAWN_THREADS(sender); 248cabdff1aSopenharmony_ci 249cabdff1aSopenharmony_ci WAIT_THREADS(sender); 250cabdff1aSopenharmony_ci WAIT_THREADS(receiver); 251cabdff1aSopenharmony_ci 252cabdff1aSopenharmony_ciend: 253cabdff1aSopenharmony_ci av_thread_message_queue_free(&queue); 254cabdff1aSopenharmony_ci av_freep(&senders); 255cabdff1aSopenharmony_ci av_freep(&receivers); 256cabdff1aSopenharmony_ci 257cabdff1aSopenharmony_ci if (ret < 0 && ret != AVERROR_EOF) { 258cabdff1aSopenharmony_ci av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); 259cabdff1aSopenharmony_ci return 1; 260cabdff1aSopenharmony_ci } 261cabdff1aSopenharmony_ci return 0; 262cabdff1aSopenharmony_ci} 263