1/* 2 * Permission is hereby granted, free of charge, to any person obtaining a copy 3 * of this software and associated documentation files (the "Software"), to deal 4 * in the Software without restriction, including without limitation the rights 5 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 6 * copies of the Software, and to permit persons to whom the Software is 7 * furnished to do so, subject to the following conditions: 8 * 9 * The above copyright notice and this permission notice shall be included in 10 * all copies or substantial portions of the Software. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 15 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 17 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 18 * THE SOFTWARE. 19 */ 20 21/** 22 * Thread message API test 23 */ 24 25#include "libavutil/avassert.h" 26#include "libavutil/avstring.h" 27#include "libavutil/frame.h" 28#include "libavutil/threadmessage.h" 29#include "libavutil/thread.h" // not public 30 31struct sender_data { 32 int id; 33 pthread_t tid; 34 int workload; 35 AVThreadMessageQueue *queue; 36}; 37 38/* same as sender_data but shuffled for testing purpose */ 39struct receiver_data { 40 pthread_t tid; 41 int workload; 42 int id; 43 AVThreadMessageQueue *queue; 44}; 45 46struct message { 47 AVFrame *frame; 48 // we add some junk in the message to make sure the message size is > 49 // sizeof(void*) 50 int magic; 51}; 52 53#define MAGIC 0xdeadc0de 54 55static void free_frame(void *arg) 56{ 57 struct message *msg = arg; 58 av_assert0(msg->magic == MAGIC); 59 av_frame_free(&msg->frame); 60} 61 62static void *sender_thread(void *arg) 63{ 64 int i, ret = 0; 65 struct sender_data *wd = arg; 66 67 av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); 68 for (i = 0; i < wd->workload; i++) { 69 if (rand() % wd->workload < wd->workload / 10) { 70 av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); 71 av_thread_message_flush(wd->queue); 72 } else { 73 char *val; 74 AVDictionary *meta = NULL; 75 struct message msg = { 76 .magic = MAGIC, 77 .frame = av_frame_alloc(), 78 }; 79 80 if (!msg.frame) { 81 ret = AVERROR(ENOMEM); 82 break; 83 } 84 85 /* we add some metadata to identify the frames */ 86 val = av_asprintf("frame %d/%d from sender %d", 87 i + 1, wd->workload, wd->id); 88 if (!val) { 89 av_frame_free(&msg.frame); 90 ret = AVERROR(ENOMEM); 91 break; 92 } 93 ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); 94 if (ret < 0) { 95 av_frame_free(&msg.frame); 96 break; 97 } 98 msg.frame->metadata = meta; 99 100 /* allocate a real frame in order to simulate "real" work */ 101 msg.frame->format = AV_PIX_FMT_RGBA; 102 msg.frame->width = 320; 103 msg.frame->height = 240; 104 ret = av_frame_get_buffer(msg.frame, 0); 105 if (ret < 0) { 106 av_frame_free(&msg.frame); 107 break; 108 } 109 110 /* push the frame in the common queue */ 111 av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", 112 wd->id, i + 1, wd->workload, msg.frame); 113 ret = av_thread_message_queue_send(wd->queue, &msg, 0); 114 if (ret < 0) { 115 av_frame_free(&msg.frame); 116 break; 117 } 118 } 119 } 120 av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", 121 wd->id, av_err2str(ret)); 122 av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); 123 return NULL; 124} 125 126static void *receiver_thread(void *arg) 127{ 128 int i, ret = 0; 129 struct receiver_data *rd = arg; 130 131 for (i = 0; i < rd->workload; i++) { 132 if (rand() % rd->workload < rd->workload / 10) { 133 av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, " 134 "discarding %d message(s)\n", rd->id, 135 av_thread_message_queue_nb_elems(rd->queue)); 136 av_thread_message_flush(rd->queue); 137 } else { 138 struct message msg; 139 AVDictionary *meta; 140 AVDictionaryEntry *e; 141 142 ret = av_thread_message_queue_recv(rd->queue, &msg, 0); 143 if (ret < 0) 144 break; 145 av_assert0(msg.magic == MAGIC); 146 meta = msg.frame->metadata; 147 e = av_dict_get(meta, "sig", NULL, 0); 148 av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); 149 av_frame_free(&msg.frame); 150 } 151 } 152 153 av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); 154 av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); 155 156 return NULL; 157} 158 159static int get_workload(int minv, int maxv) 160{ 161 return maxv == minv ? maxv : rand() % (maxv - minv) + minv; 162} 163 164int main(int ac, char **av) 165{ 166 int i, ret = 0; 167 int max_queue_size; 168 int nb_senders, sender_min_load, sender_max_load; 169 int nb_receivers, receiver_min_load, receiver_max_load; 170 struct sender_data *senders; 171 struct receiver_data *receivers; 172 AVThreadMessageQueue *queue = NULL; 173 174 if (ac != 8) { 175 av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " 176 "<nb_senders> <sender_min_send> <sender_max_send> " 177 "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); 178 return 1; 179 } 180 181 max_queue_size = atoi(av[1]); 182 nb_senders = atoi(av[2]); 183 sender_min_load = atoi(av[3]); 184 sender_max_load = atoi(av[4]); 185 nb_receivers = atoi(av[5]); 186 receiver_min_load = atoi(av[6]); 187 receiver_max_load = atoi(av[7]); 188 189 if (max_queue_size <= 0 || 190 nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || 191 nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { 192 av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); 193 return 1; 194 } 195 196 av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " 197 "%d receivers receiving [%d-%d]\n", max_queue_size, 198 nb_senders, sender_min_load, sender_max_load, 199 nb_receivers, receiver_min_load, receiver_max_load); 200 201 senders = av_calloc(nb_senders, sizeof(*senders)); 202 receivers = av_calloc(nb_receivers, sizeof(*receivers)); 203 if (!senders || !receivers) { 204 ret = AVERROR(ENOMEM); 205 goto end; 206 } 207 208 ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); 209 if (ret < 0) 210 goto end; 211 212 av_thread_message_queue_set_free_func(queue, free_frame); 213 214#define SPAWN_THREADS(type) do { \ 215 for (i = 0; i < nb_##type##s; i++) { \ 216 struct type##_data *td = &type##s[i]; \ 217 \ 218 td->id = i; \ 219 td->queue = queue; \ 220 td->workload = get_workload(type##_min_load, type##_max_load); \ 221 \ 222 ret = pthread_create(&td->tid, NULL, type##_thread, td); \ 223 if (ret) { \ 224 const int err = AVERROR(ret); \ 225 av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ 226 " thread: %s\n", av_err2str(err)); \ 227 goto end; \ 228 } \ 229 } \ 230} while (0) 231 232#define WAIT_THREADS(type) do { \ 233 for (i = 0; i < nb_##type##s; i++) { \ 234 struct type##_data *td = &type##s[i]; \ 235 \ 236 ret = pthread_join(td->tid, NULL); \ 237 if (ret) { \ 238 const int err = AVERROR(ret); \ 239 av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ 240 " thread: %s\n", av_err2str(err)); \ 241 goto end; \ 242 } \ 243 } \ 244} while (0) 245 246 SPAWN_THREADS(receiver); 247 SPAWN_THREADS(sender); 248 249 WAIT_THREADS(sender); 250 WAIT_THREADS(receiver); 251 252end: 253 av_thread_message_queue_free(&queue); 254 av_freep(&senders); 255 av_freep(&receivers); 256 257 if (ret < 0 && ret != AVERROR_EOF) { 258 av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); 259 return 1; 260 } 261 return 0; 262} 263