1 /*
2 * Tee pseudo-muxer
3 * Copyright (c) 2012 Nicolas George
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22
23 #include "libavutil/avutil.h"
24 #include "libavutil/avstring.h"
25 #include "libavutil/opt.h"
26 #include "libavcodec/bsf.h"
27 #include "internal.h"
28 #include "avformat.h"
29 #include "mux.h"
30 #include "tee_common.h"
31
32 typedef enum {
33 ON_SLAVE_FAILURE_ABORT = 1,
34 ON_SLAVE_FAILURE_IGNORE = 2
35 } SlaveFailurePolicy;
36
37 #define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
38
39 typedef struct {
40 AVFormatContext *avf;
41 AVBSFContext **bsfs; ///< bitstream filters per stream
42
43 SlaveFailurePolicy on_fail;
44 int use_fifo;
45 AVDictionary *fifo_options;
46
47 /** map from input to output streams indexes,
48 * disabled output streams are set to -1 */
49 int *stream_map;
50 int header_written;
51 } TeeSlave;
52
53 typedef struct TeeContext {
54 const AVClass *class;
55 unsigned nb_slaves;
56 unsigned nb_alive;
57 TeeSlave *slaves;
58 int use_fifo;
59 AVDictionary *fifo_options;
60 } TeeContext;
61
62 static const char *const slave_delim = "|";
63 static const char *const slave_bsfs_spec_sep = "/";
64 static const char *const slave_select_sep = ",";
65
66 #define OFFSET(x) offsetof(TeeContext, x)
67 static const AVOption options[] = {
68 {"use_fifo", "Use fifo pseudo-muxer to separate actual muxers from encoder",
69 OFFSET(use_fifo), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
70 {"fifo_options", "fifo pseudo-muxer options", OFFSET(fifo_options),
71 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
72 {NULL}
73 };
74
75 static const AVClass tee_muxer_class = {
76 .class_name = "Tee muxer",
77 .item_name = av_default_item_name,
78 .option = options,
79 .version = LIBAVUTIL_VERSION_INT,
80 };
81
parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)82 static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
83 {
84 if (!opt) {
85 tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
86 return 0;
87 } else if (!av_strcasecmp("abort", opt)) {
88 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
89 return 0;
90 } else if (!av_strcasecmp("ignore", opt)) {
91 tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
92 return 0;
93 }
94 /* Set failure behaviour to abort, so invalid option error will not be ignored */
95 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
96 return AVERROR(EINVAL);
97 }
98
parse_slave_fifo_policy(const char *use_fifo, TeeSlave *tee_slave)99 static int parse_slave_fifo_policy(const char *use_fifo, TeeSlave *tee_slave)
100 {
101 /*TODO - change this to use proper function for parsing boolean
102 * options when there is one */
103 if (av_match_name(use_fifo, "true,y,yes,enable,enabled,on,1")) {
104 tee_slave->use_fifo = 1;
105 } else if (av_match_name(use_fifo, "false,n,no,disable,disabled,off,0")) {
106 tee_slave->use_fifo = 0;
107 } else {
108 return AVERROR(EINVAL);
109 }
110 return 0;
111 }
112
parse_slave_fifo_options(const char *fifo_options, TeeSlave *tee_slave)113 static int parse_slave_fifo_options(const char *fifo_options, TeeSlave *tee_slave)
114 {
115 return av_dict_parse_string(&tee_slave->fifo_options, fifo_options, "=", ":", 0);
116 }
117
close_slave(TeeSlave *tee_slave)118 static int close_slave(TeeSlave *tee_slave)
119 {
120 AVFormatContext *avf;
121 unsigned i;
122 int ret = 0;
123
124 av_dict_free(&tee_slave->fifo_options);
125 avf = tee_slave->avf;
126 if (!avf)
127 return 0;
128
129 if (tee_slave->header_written)
130 ret = av_write_trailer(avf);
131
132 if (tee_slave->bsfs) {
133 for (i = 0; i < avf->nb_streams; ++i)
134 av_bsf_free(&tee_slave->bsfs[i]);
135 }
136 av_freep(&tee_slave->stream_map);
137 av_freep(&tee_slave->bsfs);
138
139 ff_format_io_close(avf, &avf->pb);
140 avformat_free_context(avf);
141 tee_slave->avf = NULL;
142 return ret;
143 }
144
close_slaves(AVFormatContext *avf)145 static void close_slaves(AVFormatContext *avf)
146 {
147 TeeContext *tee = avf->priv_data;
148 unsigned i;
149
150 for (i = 0; i < tee->nb_slaves; i++) {
151 close_slave(&tee->slaves[i]);
152 }
153 av_freep(&tee->slaves);
154 }
155
open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)156 static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
157 {
158 int i, ret;
159 AVDictionary *options = NULL, *bsf_options = NULL;
160 AVDictionaryEntry *entry;
161 char *filename;
162 char *format = NULL, *select = NULL, *on_fail = NULL;
163 char *use_fifo = NULL, *fifo_options_str = NULL;
164 AVFormatContext *avf2 = NULL;
165 AVStream *st, *st2;
166 int stream_count;
167 int fullret;
168 char *subselect = NULL, *next_subselect = NULL, *first_subselect = NULL, *tmp_select = NULL;
169
170 if ((ret = ff_tee_parse_slave_options(avf, slave, &options, &filename)) < 0)
171 return ret;
172
173 #define CONSUME_OPTION(option, field, action) do { \
174 if ((entry = av_dict_get(options, option, NULL, 0))) { \
175 field = entry->value; \
176 { action } \
177 av_dict_set(&options, option, NULL, 0); \
178 } \
179 } while (0)
180 #define STEAL_OPTION(option, field) \
181 CONSUME_OPTION(option, field, \
182 entry->value = NULL; /* prevent it from being freed */)
183 #define PROCESS_OPTION(option, field, function, on_error) \
184 CONSUME_OPTION(option, field, if ((ret = function) < 0) { { on_error } goto end; })
185
186 STEAL_OPTION("f", format);
187 STEAL_OPTION("select", select);
188 PROCESS_OPTION("onfail", on_fail,
189 parse_slave_failure_policy_option(on_fail, tee_slave),
190 av_log(avf, AV_LOG_ERROR, "Invalid onfail option value, "
191 "valid options are 'abort' and 'ignore'\n"););
192 PROCESS_OPTION("use_fifo", use_fifo,
193 parse_slave_fifo_policy(use_fifo, tee_slave),
194 av_log(avf, AV_LOG_ERROR, "Error parsing fifo options: %s\n",
195 av_err2str(ret)););
196 PROCESS_OPTION("fifo_options", fifo_options_str,
197 parse_slave_fifo_options(fifo_options_str, tee_slave), ;);
198 entry = NULL;
199 while ((entry = av_dict_get(options, "bsfs", entry, AV_DICT_IGNORE_SUFFIX))) {
200 /* trim out strlen("bsfs") characters from key */
201 av_dict_set(&bsf_options, entry->key + 4, entry->value, 0);
202 av_dict_set(&options, entry->key, NULL, 0);
203 }
204
205 if (tee_slave->use_fifo) {
206
207 if (options) {
208 char *format_options_str = NULL;
209 ret = av_dict_get_string(options, &format_options_str, '=', ':');
210 if (ret < 0)
211 goto end;
212
213 ret = av_dict_set(&tee_slave->fifo_options, "format_opts", format_options_str,
214 AV_DICT_DONT_STRDUP_VAL);
215 if (ret < 0)
216 goto end;
217 }
218
219 if (format) {
220 ret = av_dict_set(&tee_slave->fifo_options, "fifo_format", format,
221 AV_DICT_DONT_STRDUP_VAL);
222 format = NULL;
223 if (ret < 0)
224 goto end;
225 }
226
227 av_dict_free(&options);
228 options = tee_slave->fifo_options;
229 tee_slave->fifo_options = NULL;
230 }
231 ret = avformat_alloc_output_context2(&avf2, NULL,
232 tee_slave->use_fifo ? "fifo" :format, filename);
233 if (ret < 0)
234 goto end;
235 tee_slave->avf = avf2;
236 av_dict_copy(&avf2->metadata, avf->metadata, 0);
237 avf2->opaque = avf->opaque;
238 avf2->io_open = avf->io_open;
239 avf2->io_close = avf->io_close;
240 avf2->io_close2 = avf->io_close2;
241 avf2->interrupt_callback = avf->interrupt_callback;
242 avf2->flags = avf->flags;
243 avf2->strict_std_compliance = avf->strict_std_compliance;
244
245 tee_slave->stream_map = av_calloc(avf->nb_streams, sizeof(*tee_slave->stream_map));
246 if (!tee_slave->stream_map) {
247 ret = AVERROR(ENOMEM);
248 goto end;
249 }
250
251 stream_count = 0;
252 for (i = 0; i < avf->nb_streams; i++) {
253 st = avf->streams[i];
254 if (select) {
255 tmp_select = av_strdup(select); // av_strtok is destructive so we regenerate it in each loop
256 if (!tmp_select) {
257 ret = AVERROR(ENOMEM);
258 goto end;
259 }
260 fullret = 0;
261 first_subselect = tmp_select;
262 next_subselect = NULL;
263 while (subselect = av_strtok(first_subselect, slave_select_sep, &next_subselect)) {
264 first_subselect = NULL;
265
266 ret = avformat_match_stream_specifier(avf, avf->streams[i], subselect);
267 if (ret < 0) {
268 av_log(avf, AV_LOG_ERROR,
269 "Invalid stream specifier '%s' for output '%s'\n",
270 subselect, slave);
271 goto end;
272 }
273 if (ret != 0) {
274 fullret = 1; // match
275 break;
276 }
277 }
278 av_freep(&tmp_select);
279
280 if (fullret == 0) { /* no match */
281 tee_slave->stream_map[i] = -1;
282 continue;
283 }
284 }
285 tee_slave->stream_map[i] = stream_count++;
286
287 if (!(st2 = avformat_new_stream(avf2, NULL))) {
288 ret = AVERROR(ENOMEM);
289 goto end;
290 }
291
292 ret = ff_stream_encode_params_copy(st2, st);
293 if (ret < 0)
294 goto end;
295 }
296
297 ret = ff_format_output_open(avf2, filename, &options);
298 if (ret < 0) {
299 av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n", slave,
300 av_err2str(ret));
301 goto end;
302 }
303
304 if ((ret = avformat_write_header(avf2, &options)) < 0) {
305 av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
306 slave, av_err2str(ret));
307 goto end;
308 }
309 tee_slave->header_written = 1;
310
311 tee_slave->bsfs = av_calloc(avf2->nb_streams, sizeof(*tee_slave->bsfs));
312 if (!tee_slave->bsfs) {
313 ret = AVERROR(ENOMEM);
314 goto end;
315 }
316
317 entry = NULL;
318 while (entry = av_dict_get(bsf_options, "", NULL, AV_DICT_IGNORE_SUFFIX)) {
319 const char *spec = entry->key;
320 if (*spec) {
321 if (strspn(spec, slave_bsfs_spec_sep) != 1) {
322 av_log(avf, AV_LOG_ERROR,
323 "Specifier separator in '%s' is '%c', but only characters '%s' "
324 "are allowed\n", entry->key, *spec, slave_bsfs_spec_sep);
325 ret = AVERROR(EINVAL);
326 goto end;
327 }
328 spec++; /* consume separator */
329 }
330
331 for (i = 0; i < avf2->nb_streams; i++) {
332 ret = avformat_match_stream_specifier(avf2, avf2->streams[i], spec);
333 if (ret < 0) {
334 av_log(avf, AV_LOG_ERROR,
335 "Invalid stream specifier '%s' in bsfs option '%s' for slave "
336 "output '%s'\n", spec, entry->key, filename);
337 goto end;
338 }
339
340 if (ret > 0) {
341 av_log(avf, AV_LOG_DEBUG, "spec:%s bsfs:%s matches stream %d of slave "
342 "output '%s'\n", spec, entry->value, i, filename);
343 if (tee_slave->bsfs[i]) {
344 av_log(avf, AV_LOG_WARNING,
345 "Duplicate bsfs specification associated to stream %d of slave "
346 "output '%s', filters will be ignored\n", i, filename);
347 continue;
348 }
349 ret = av_bsf_list_parse_str(entry->value, &tee_slave->bsfs[i]);
350 if (ret < 0) {
351 av_log(avf, AV_LOG_ERROR,
352 "Error parsing bitstream filter sequence '%s' associated to "
353 "stream %d of slave output '%s'\n", entry->value, i, filename);
354 goto end;
355 }
356 }
357 }
358
359 av_dict_set(&bsf_options, entry->key, NULL, 0);
360 }
361
362 for (i = 0; i < avf->nb_streams; i++){
363 int target_stream = tee_slave->stream_map[i];
364 if (target_stream < 0)
365 continue;
366
367 if (!tee_slave->bsfs[target_stream]) {
368 /* Add pass-through bitstream filter */
369 ret = av_bsf_get_null_filter(&tee_slave->bsfs[target_stream]);
370 if (ret < 0) {
371 av_log(avf, AV_LOG_ERROR,
372 "Failed to create pass-through bitstream filter: %s\n",
373 av_err2str(ret));
374 goto end;
375 }
376 }
377
378 tee_slave->bsfs[target_stream]->time_base_in = avf->streams[i]->time_base;
379 ret = avcodec_parameters_copy(tee_slave->bsfs[target_stream]->par_in,
380 avf->streams[i]->codecpar);
381 if (ret < 0)
382 goto end;
383
384 ret = av_bsf_init(tee_slave->bsfs[target_stream]);
385 if (ret < 0) {
386 av_log(avf, AV_LOG_ERROR,
387 "Failed to initialize bitstream filter(s): %s\n",
388 av_err2str(ret));
389 goto end;
390 }
391 }
392
393 if (options) {
394 entry = NULL;
395 while ((entry = av_dict_get(options, "", entry, AV_DICT_IGNORE_SUFFIX)))
396 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
397 ret = AVERROR_OPTION_NOT_FOUND;
398 goto end;
399 }
400
401 end:
402 av_free(format);
403 av_free(select);
404 av_dict_free(&options);
405 av_dict_free(&bsf_options);
406 av_freep(&tmp_select);
407 return ret;
408 }
409
log_slave(TeeSlave *slave, void *log_ctx, int log_level)410 static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
411 {
412 int i;
413 av_log(log_ctx, log_level, "filename:'%s' format:%s\n",
414 slave->avf->url, slave->avf->oformat->name);
415 for (i = 0; i < slave->avf->nb_streams; i++) {
416 AVStream *st = slave->avf->streams[i];
417 AVBSFContext *bsf = slave->bsfs[i];
418 const char *bsf_name;
419
420 av_log(log_ctx, log_level, " stream:%d codec:%s type:%s",
421 i, avcodec_get_name(st->codecpar->codec_id),
422 av_get_media_type_string(st->codecpar->codec_type));
423
424 bsf_name = bsf->filter->priv_class ?
425 bsf->filter->priv_class->item_name(bsf) : bsf->filter->name;
426 av_log(log_ctx, log_level, " bsfs: %s\n", bsf_name);
427 }
428 }
429
tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)430 static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
431 {
432 TeeContext *tee = avf->priv_data;
433 TeeSlave *tee_slave = &tee->slaves[slave_idx];
434
435 tee->nb_alive--;
436
437 close_slave(tee_slave);
438
439 if (!tee->nb_alive) {
440 av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
441 return err_n;
442 } else if (tee_slave->on_fail == ON_SLAVE_FAILURE_ABORT) {
443 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
444 return err_n;
445 } else {
446 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
447 slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
448 return 0;
449 }
450 }
451
tee_write_header(AVFormatContext *avf)452 static int tee_write_header(AVFormatContext *avf)
453 {
454 TeeContext *tee = avf->priv_data;
455 unsigned nb_slaves = 0, i;
456 const char *filename = avf->url;
457 char **slaves = NULL;
458 int ret;
459
460 while (*filename) {
461 char *slave = av_get_token(&filename, slave_delim);
462 if (!slave) {
463 ret = AVERROR(ENOMEM);
464 goto fail;
465 }
466 ret = av_dynarray_add_nofree(&slaves, &nb_slaves, slave);
467 if (ret < 0) {
468 av_free(slave);
469 goto fail;
470 }
471 if (strspn(filename, slave_delim))
472 filename++;
473 }
474
475 if (!FF_ALLOCZ_TYPED_ARRAY(tee->slaves, nb_slaves)) {
476 ret = AVERROR(ENOMEM);
477 goto fail;
478 }
479 tee->nb_slaves = tee->nb_alive = nb_slaves;
480
481 for (i = 0; i < nb_slaves; i++) {
482
483 tee->slaves[i].use_fifo = tee->use_fifo;
484 ret = av_dict_copy(&tee->slaves[i].fifo_options, tee->fifo_options, 0);
485 if (ret < 0)
486 goto fail;
487
488 if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
489 ret = tee_process_slave_failure(avf, i, ret);
490 if (ret < 0)
491 goto fail;
492 } else {
493 log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
494 }
495 av_freep(&slaves[i]);
496 }
497
498 for (i = 0; i < avf->nb_streams; i++) {
499 int j, mapped = 0;
500 for (j = 0; j < tee->nb_slaves; j++)
501 if (tee->slaves[j].avf)
502 mapped += tee->slaves[j].stream_map[i] >= 0;
503 if (!mapped)
504 av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
505 "to any slave.\n", i);
506 }
507 av_free(slaves);
508 return 0;
509
510 fail:
511 for (i = 0; i < nb_slaves; i++)
512 av_freep(&slaves[i]);
513 close_slaves(avf);
514 av_free(slaves);
515 return ret;
516 }
517
tee_write_trailer(AVFormatContext *avf)518 static int tee_write_trailer(AVFormatContext *avf)
519 {
520 TeeContext *tee = avf->priv_data;
521 int ret_all = 0, ret;
522 unsigned i;
523
524 for (i = 0; i < tee->nb_slaves; i++) {
525 if ((ret = close_slave(&tee->slaves[i])) < 0) {
526 ret = tee_process_slave_failure(avf, i, ret);
527 if (!ret_all && ret < 0)
528 ret_all = ret;
529 }
530 }
531 av_freep(&tee->slaves);
532 return ret_all;
533 }
534
tee_write_packet(AVFormatContext *avf, AVPacket *pkt)535 static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
536 {
537 TeeContext *tee = avf->priv_data;
538 AVFormatContext *avf2;
539 AVBSFContext *bsfs;
540 AVPacket *const pkt2 = ffformatcontext(avf)->pkt;
541 int ret_all = 0, ret;
542 unsigned i, s;
543 int s2;
544
545 for (i = 0; i < tee->nb_slaves; i++) {
546 if (!(avf2 = tee->slaves[i].avf))
547 continue;
548
549 /* Flush slave if pkt is NULL*/
550 if (!pkt) {
551 ret = av_interleaved_write_frame(avf2, NULL);
552 if (ret < 0) {
553 ret = tee_process_slave_failure(avf, i, ret);
554 if (!ret_all && ret < 0)
555 ret_all = ret;
556 }
557 continue;
558 }
559
560 s = pkt->stream_index;
561 s2 = tee->slaves[i].stream_map[s];
562 if (s2 < 0)
563 continue;
564
565 if ((ret = av_packet_ref(pkt2, pkt)) < 0) {
566 if (!ret_all)
567 ret_all = ret;
568 continue;
569 }
570 bsfs = tee->slaves[i].bsfs[s2];
571 pkt2->stream_index = s2;
572
573 ret = av_bsf_send_packet(bsfs, pkt2);
574 if (ret < 0) {
575 av_packet_unref(pkt2);
576 av_log(avf, AV_LOG_ERROR, "Error while sending packet to bitstream filter: %s\n",
577 av_err2str(ret));
578 ret = tee_process_slave_failure(avf, i, ret);
579 if (!ret_all && ret < 0)
580 ret_all = ret;
581 }
582
583 while(1) {
584 ret = av_bsf_receive_packet(bsfs, pkt2);
585 if (ret == AVERROR(EAGAIN)) {
586 ret = 0;
587 break;
588 } else if (ret < 0) {
589 break;
590 }
591
592 av_packet_rescale_ts(pkt2, bsfs->time_base_out,
593 avf2->streams[s2]->time_base);
594 ret = av_interleaved_write_frame(avf2, pkt2);
595 if (ret < 0)
596 break;
597 };
598
599 if (ret < 0) {
600 ret = tee_process_slave_failure(avf, i, ret);
601 if (!ret_all && ret < 0)
602 ret_all = ret;
603 }
604 }
605 return ret_all;
606 }
607
608 const AVOutputFormat ff_tee_muxer = {
609 .name = "tee",
610 .long_name = NULL_IF_CONFIG_SMALL("Multiple muxer tee"),
611 .priv_data_size = sizeof(TeeContext),
612 .write_header = tee_write_header,
613 .write_trailer = tee_write_trailer,
614 .write_packet = tee_write_packet,
615 .priv_class = &tee_muxer_class,
616 .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
617 };
618