/*
 * Tee pseudo-muxer
 * Copyright (c) 2012 Nicolas George
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
21 
22 
23 #include "libavutil/avutil.h"
24 #include "libavutil/avstring.h"
25 #include "libavutil/opt.h"
26 #include "libavcodec/bsf.h"
27 #include "internal.h"
28 #include "avformat.h"
29 #include "mux.h"
30 #include "tee_common.h"
31 
/**
 * Reaction of the tee muxer when one of its slave outputs fails.
 * The values start at 1, so a zero-initialized field holds neither policy.
 */
typedef enum {
    ON_SLAVE_FAILURE_ABORT = 1, ///< the failure is reported to the caller
    ON_SLAVE_FAILURE_IGNORE,    ///< (= 2) the slave is dropped, the others go on
} SlaveFailurePolicy;

/** Policy selected by parse_slave_failure_policy_option() for a NULL string. */
#define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
38 
39 typedef struct {
40     AVFormatContext *avf;
41     AVBSFContext **bsfs; ///< bitstream filters per stream
42 
43     SlaveFailurePolicy on_fail;
44     int use_fifo;
45     AVDictionary *fifo_options;
46 
47     /** map from input to output streams indexes,
48      * disabled output streams are set to -1 */
49     int *stream_map;
50     int header_written;
51 } TeeSlave;
52 
53 typedef struct TeeContext {
54     const AVClass *class;
55     unsigned nb_slaves;
56     unsigned nb_alive;
57     TeeSlave *slaves;
58     int use_fifo;
59     AVDictionary *fifo_options;
60 } TeeContext;
61 
/* Separator characters used while parsing the tee output specification. */
static const char *const slave_delim         = "|"; ///< between slave outputs
static const char *const slave_bsfs_spec_sep = "/"; ///< between "bsfs" and its stream specifier
static const char *const slave_select_sep    = ","; ///< between "select" stream specifiers
65 
66 #define OFFSET(x) offsetof(TeeContext, x)
67 static const AVOption options[] = {
68         {"use_fifo", "Use fifo pseudo-muxer to separate actual muxers from encoder",
69          OFFSET(use_fifo), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
70         {"fifo_options", "fifo pseudo-muxer options", OFFSET(fifo_options),
71          AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
72         {NULL}
73 };
74 
75 static const AVClass tee_muxer_class = {
76     .class_name = "Tee muxer",
77     .item_name  = av_default_item_name,
78     .option = options,
79     .version    = LIBAVUTIL_VERSION_INT,
80 };
81 
/**
 * Translate the textual "onfail" value into tee_slave->on_fail.
 *
 * @param opt       option string, compared case-insensitively against
 *                  "abort" and "ignore"; NULL selects the default policy
 * @param tee_slave slave whose on_fail field is written
 * @return 0 on success, AVERROR(EINVAL) for any other string
 */
static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
{
    int status = 0;

    if (!opt) {
        tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
    } else if (av_strcasecmp("abort", opt) == 0) {
        tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
    } else if (av_strcasecmp("ignore", opt) == 0) {
        tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
    } else {
        /* Select abort so that the invalid-option error is not ignored. */
        tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
        status = AVERROR(EINVAL);
    }

    return status;
}
98 
/**
 * Translate the textual per-slave "use_fifo" value into tee_slave->use_fifo.
 *
 * @param use_fifo  option string, matched with av_match_name() against the
 *                  accepted spellings of true and false
 * @param tee_slave slave whose use_fifo field is written
 * @return 0 on success, AVERROR(EINVAL) if the string matches neither list
 */
static int parse_slave_fifo_policy(const char *use_fifo, TeeSlave *tee_slave)
{
    /* TODO - switch to a proper boolean option parsing function
     *        once there is one */
    static const char enabled_names[]  = "true,y,yes,enable,enabled,on,1";
    static const char disabled_names[] = "false,n,no,disable,disabled,off,0";

    if (av_match_name(use_fifo, enabled_names)) {
        tee_slave->use_fifo = 1;
        return 0;
    }
    if (av_match_name(use_fifo, disabled_names)) {
        tee_slave->use_fifo = 0;
        return 0;
    }
    return AVERROR(EINVAL);
}
112 
/**
 * Parse a "key=value:key=value" string into tee_slave->fifo_options.
 *
 * @return the result of av_dict_parse_string()
 */
static int parse_slave_fifo_options(const char *fifo_options, TeeSlave *tee_slave)
{
    AVDictionary **target = &tee_slave->fifo_options;

    return av_dict_parse_string(target, fifo_options, "=", ":", 0);
}
117 
/**
 * Release everything held by one slave.
 *
 * The fifo options are always freed. If the slave has a muxer context, the
 * trailer is written (only when the header was written), the bitstream
 * filters and the stream map are freed, the output is closed and the context
 * is freed; tee_slave->avf is NULL afterwards, so a second call only frees
 * the fifo options again.
 *
 * @return 0 if there was no muxer context or no trailer to write,
 *         otherwise the result of av_write_trailer()
 */
static int close_slave(TeeSlave *tee_slave)
{
    AVFormatContext *ctx = tee_slave->avf;
    int trailer_ret = 0;
    unsigned idx;

    av_dict_free(&tee_slave->fifo_options);
    if (!ctx)
        return 0;

    if (tee_slave->header_written)
        trailer_ret = av_write_trailer(ctx);

    if (tee_slave->bsfs)
        for (idx = 0; idx < ctx->nb_streams; idx++)
            av_bsf_free(&tee_slave->bsfs[idx]);
    av_freep(&tee_slave->stream_map);
    av_freep(&tee_slave->bsfs);

    ff_format_io_close(ctx, &ctx->pb);
    avformat_free_context(ctx);
    tee_slave->avf = NULL;

    return trailer_ret;
}
144 
/**
 * Close every slave of the tee and free the slave array.
 * Return values of close_slave() are discarded.
 */
static void close_slaves(AVFormatContext *avf)
{
    TeeContext *tee = avf->priv_data;
    unsigned idx = 0;

    while (idx < tee->nb_slaves) {
        close_slave(&tee->slaves[idx]);
        idx++;
    }
    av_freep(&tee->slaves);
}
155 
/**
 * Parse one slave specification, create its muxer and write its header.
 *
 * Steps, in order:
 *  - split the specification into options and filename;
 *  - consume the tee-specific options ("f", "select", "onfail", "use_fifo",
 *    "fifo_options", "bsfs*");
 *  - when the fifo muxer is requested, fold the remaining options and the
 *    format name into the fifo options;
 *  - allocate the slave context, create one output stream per selected
 *    input stream and record the mapping in tee_slave->stream_map;
 *  - open the output and write the header;
 *  - build and initialize one bitstream filter per output stream
 *    (a pass-through filter where none was requested);
 *  - reject options that nobody consumed.
 *
 * @param avf       tee muxer context (source of streams, metadata, callbacks)
 * @param slave     slave specification string
 * @param tee_slave slave state to fill in; use_fifo and fifo_options are read
 *                  as defaults
 * @return 0 on success, a negative AVERROR code on failure. On failure
 *         tee_slave may be partially initialized; nothing stored in it is
 *         released here, the caller is expected to run close_slave().
 */
static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
{
    int i, ret;
    AVDictionary *options = NULL, *bsf_options = NULL;
    AVDictionaryEntry *entry;
    char *filename;
    char *format = NULL, *select = NULL, *on_fail = NULL;
    char *use_fifo = NULL, *fifo_options_str = NULL;
    AVFormatContext *avf2 = NULL;
    AVStream *st, *st2;
    int stream_count;
    int fullret;
    char *subselect = NULL, *next_subselect = NULL, *first_subselect = NULL, *tmp_select = NULL;

    /* The "onfail" parser below only runs when the option is present, so the
     * default policy has to be installed up front. Otherwise on_fail keeps
     * whatever the caller put there (0 for a zeroed slave), which is neither
     * ABORT nor IGNORE and is handled like "ignore" on failure. */
    tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;

    if ((ret = ff_tee_parse_slave_options(avf, slave, &options, &filename)) < 0)
        return ret;

/* Look up `option`; if present, store its value in `field`, run `action`
 * and remove the entry from `options`. */
#define CONSUME_OPTION(option, field, action) do {                      \
        if ((entry = av_dict_get(options, option, NULL, 0))) {          \
            field = entry->value;                                       \
            { action }                                                  \
            av_dict_set(&options, option, NULL, 0);                     \
        }                                                               \
    } while (0)
/* Take ownership of the value string: `field` must be freed by us. */
#define STEAL_OPTION(option, field)                                     \
    CONSUME_OPTION(option, field,                                       \
                   entry->value = NULL; /* prevent it from being freed */)
/* Parse the value in place; `field` is only valid inside `function`. */
#define PROCESS_OPTION(option, field, function, on_error)               \
    CONSUME_OPTION(option, field, if ((ret = function) < 0) { { on_error } goto end; })

    STEAL_OPTION("f", format);
    STEAL_OPTION("select", select);
    PROCESS_OPTION("onfail", on_fail,
                   parse_slave_failure_policy_option(on_fail, tee_slave),
                   av_log(avf, AV_LOG_ERROR, "Invalid onfail option value, "
                          "valid options are 'abort' and 'ignore'\n"););
    PROCESS_OPTION("use_fifo", use_fifo,
                   parse_slave_fifo_policy(use_fifo, tee_slave),
                   av_log(avf, AV_LOG_ERROR, "Error parsing fifo options: %s\n",
                          av_err2str(ret)););
    PROCESS_OPTION("fifo_options", fifo_options_str,
                   parse_slave_fifo_options(fifo_options_str, tee_slave), ;);

    /* Move every "bsfs<suffix>" entry into bsf_options. Each pass deletes the
     * entry it just found, so the search restarts from the beginning rather
     * than continuing from a pointer into the modified dictionary (which
     * could skip entries). The delete result is checked because a failed
     * delete would make this loop find the same entry forever. */
    while ((entry = av_dict_get(options, "bsfs", NULL, AV_DICT_IGNORE_SUFFIX))) {
        /* trim out strlen("bsfs") characters from key */
        ret = av_dict_set(&bsf_options, entry->key + 4, entry->value, 0);
        if (ret < 0)
            goto end;
        ret = av_dict_set(&options, entry->key, NULL, 0);
        if (ret < 0)
            goto end;
    }

    if (tee_slave->use_fifo) {

        if (options) {
            /* Remaining options belong to the real muxer: serialize them
             * into the fifo muxer's "format_opts" option. */
            char *format_options_str = NULL;
            ret = av_dict_get_string(options, &format_options_str, '=', ':');
            if (ret < 0)
                goto end;

            ret = av_dict_set(&tee_slave->fifo_options, "format_opts", format_options_str,
                              AV_DICT_DONT_STRDUP_VAL);
            if (ret < 0)
                goto end;
        }

        if (format) {
            ret = av_dict_set(&tee_slave->fifo_options, "fifo_format", format,
                              AV_DICT_DONT_STRDUP_VAL);
            format = NULL; /* passed with DONT_STRDUP_VAL, must not be freed at end */
            if (ret < 0)
                goto end;
        }

        /* From here on `options` holds the fifo muxer options. */
        av_dict_free(&options);
        options = tee_slave->fifo_options;
        tee_slave->fifo_options = NULL;
    }
    ret = avformat_alloc_output_context2(&avf2, NULL,
                                         tee_slave->use_fifo ? "fifo" :format, filename);
    if (ret < 0)
        goto end;
    tee_slave->avf = avf2;
    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
    if (ret < 0)
        goto end;
    avf2->opaque   = avf->opaque;
    avf2->io_open  = avf->io_open;
    avf2->io_close = avf->io_close;
    avf2->io_close2 = avf->io_close2;
    avf2->interrupt_callback = avf->interrupt_callback;
    avf2->flags = avf->flags;
    avf2->strict_std_compliance = avf->strict_std_compliance;

    tee_slave->stream_map = av_calloc(avf->nb_streams, sizeof(*tee_slave->stream_map));
    if (!tee_slave->stream_map) {
        ret = AVERROR(ENOMEM);
        goto end;
    }

    stream_count = 0;
    for (i = 0; i < avf->nb_streams; i++) {
        st = avf->streams[i];
        if (select) {
            tmp_select = av_strdup(select);  // av_strtok is destructive so we regenerate it in each loop
            if (!tmp_select) {
                ret = AVERROR(ENOMEM);
                goto end;
            }
            fullret = 0;
            first_subselect = tmp_select;
            next_subselect = NULL;
            while ((subselect = av_strtok(first_subselect, slave_select_sep, &next_subselect))) {
                first_subselect = NULL;

                ret = avformat_match_stream_specifier(avf, avf->streams[i], subselect);
                if (ret < 0) {
                    av_log(avf, AV_LOG_ERROR,
                           "Invalid stream specifier '%s' for output '%s'\n",
                           subselect, slave);
                    goto end;
                }
                if (ret != 0) {
                    fullret = 1; // match
                    break;
                }
            }
            av_freep(&tmp_select);

            if (fullret == 0) { /* no match */
                tee_slave->stream_map[i] = -1;
                continue;
            }
        }
        tee_slave->stream_map[i] = stream_count++;

        if (!(st2 = avformat_new_stream(avf2, NULL))) {
            ret = AVERROR(ENOMEM);
            goto end;
        }

        ret = ff_stream_encode_params_copy(st2, st);
        if (ret < 0)
            goto end;
    }

    ret = ff_format_output_open(avf2, filename, &options);
    if (ret < 0) {
        av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n", slave,
               av_err2str(ret));
        goto end;
    }

    if ((ret = avformat_write_header(avf2, &options)) < 0) {
        av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
               slave, av_err2str(ret));
        goto end;
    }
    tee_slave->header_written = 1;

    tee_slave->bsfs = av_calloc(avf2->nb_streams, sizeof(*tee_slave->bsfs));
    if (!tee_slave->bsfs) {
        ret = AVERROR(ENOMEM);
        goto end;
    }

    /* Consume bsf_options one entry at a time; the entry is deleted at the
     * bottom of the loop, so the lookup always restarts from the beginning. */
    while ((entry = av_dict_get(bsf_options, "", NULL, AV_DICT_IGNORE_SUFFIX))) {
        const char *spec = entry->key;
        if (*spec) {
            if (strspn(spec, slave_bsfs_spec_sep) != 1) {
                av_log(avf, AV_LOG_ERROR,
                       "Specifier separator in '%s' is '%c', but only characters '%s' "
                       "are allowed\n", entry->key, *spec, slave_bsfs_spec_sep);
                ret = AVERROR(EINVAL);
                goto end;
            }
            spec++; /* consume separator */
        }

        for (i = 0; i < avf2->nb_streams; i++) {
            ret = avformat_match_stream_specifier(avf2, avf2->streams[i], spec);
            if (ret < 0) {
                av_log(avf, AV_LOG_ERROR,
                       "Invalid stream specifier '%s' in bsfs option '%s' for slave "
                       "output '%s'\n", spec, entry->key, filename);
                goto end;
            }

            if (ret > 0) {
                av_log(avf, AV_LOG_DEBUG, "spec:%s bsfs:%s matches stream %d of slave "
                       "output '%s'\n", spec, entry->value, i, filename);
                if (tee_slave->bsfs[i]) {
                    av_log(avf, AV_LOG_WARNING,
                           "Duplicate bsfs specification associated to stream %d of slave "
                           "output '%s', filters will be ignored\n", i, filename);
                    continue;
                }
                ret = av_bsf_list_parse_str(entry->value, &tee_slave->bsfs[i]);
                if (ret < 0) {
                    av_log(avf, AV_LOG_ERROR,
                           "Error parsing bitstream filter sequence '%s' associated to "
                           "stream %d of slave output '%s'\n", entry->value, i, filename);
                    goto end;
                }
            }
        }

        /* A failed delete would make the loop find this entry again. */
        ret = av_dict_set(&bsf_options, entry->key, NULL, 0);
        if (ret < 0)
            goto end;
    }

    for (i = 0; i < avf->nb_streams; i++){
        int target_stream = tee_slave->stream_map[i];
        if (target_stream < 0)
            continue;

        if (!tee_slave->bsfs[target_stream]) {
            /* Add pass-through bitstream filter */
            ret = av_bsf_get_null_filter(&tee_slave->bsfs[target_stream]);
            if (ret < 0) {
                av_log(avf, AV_LOG_ERROR,
                       "Failed to create pass-through bitstream filter: %s\n",
                       av_err2str(ret));
                goto end;
            }
        }

        tee_slave->bsfs[target_stream]->time_base_in = avf->streams[i]->time_base;
        ret = avcodec_parameters_copy(tee_slave->bsfs[target_stream]->par_in,
                                      avf->streams[i]->codecpar);
        if (ret < 0)
            goto end;

        ret = av_bsf_init(tee_slave->bsfs[target_stream]);
        if (ret < 0) {
            av_log(avf, AV_LOG_ERROR,
            "Failed to initialize bitstream filter(s): %s\n",
            av_err2str(ret));
            goto end;
        }
    }

    /* Anything still left in `options` was not consumed by the calls above. */
    if (options) {
        entry = NULL;
        while ((entry = av_dict_get(options, "", entry, AV_DICT_IGNORE_SUFFIX)))
            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
        ret = AVERROR_OPTION_NOT_FOUND;
        goto end;
    }

end:
    av_free(format);
    av_free(select);
    av_dict_free(&options);
    av_dict_free(&bsf_options);
    av_freep(&tmp_select);
    return ret;
}
409 
/**
 * Log a description of an opened slave: its URL and format name, then one
 * line per stream with codec, media type and bitstream filter name.
 *
 * @param slave     slave to describe; avf and every bsfs[] entry are dereferenced
 * @param log_ctx   context passed to av_log()
 * @param log_level level passed to av_log()
 */
static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
{
    AVFormatContext *out = slave->avf;
    int idx;

    av_log(log_ctx, log_level, "filename:'%s' format:%s\n",
           out->url, out->oformat->name);

    for (idx = 0; idx < out->nb_streams; idx++) {
        AVStream *stream = out->streams[idx];
        AVBSFContext *filter_ctx = slave->bsfs[idx];
        const char *filter_label;

        av_log(log_ctx, log_level, "    stream:%d codec:%s type:%s",
               idx, avcodec_get_name(stream->codecpar->codec_id),
               av_get_media_type_string(stream->codecpar->codec_type));

        /* Use the class' item name when the filter has a private class,
         * the plain filter name otherwise. */
        if (filter_ctx->filter->priv_class)
            filter_label = filter_ctx->filter->priv_class->item_name(filter_ctx);
        else
            filter_label = filter_ctx->filter->name;
        av_log(log_ctx, log_level, " bsfs: %s\n", filter_label);
    }
}
429 
/**
 * Account for the failure of one slave and decide whether the tee goes on.
 *
 * The alive counter is decremented and the slave is closed. The error is
 * returned when no slave is left alive or when the slave's policy is
 * ON_SLAVE_FAILURE_ABORT; any other policy value lets the tee continue.
 *
 * @param avf       tee muxer context
 * @param slave_idx index of the failed slave in tee->slaves
 * @param err_n     error code that caused the failure
 * @return err_n if the tee must fail, 0 if it continues with the other slaves
 */
static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
{
    TeeContext *tee = avf->priv_data;
    TeeSlave *failed = &tee->slaves[slave_idx];

    tee->nb_alive--;
    close_slave(failed);

    if (tee->nb_alive == 0) {
        av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
        return err_n;
    }

    if (failed->on_fail == ON_SLAVE_FAILURE_ABORT) {
        av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
        return err_n;
    }

    av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
           slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
    return 0;
}
451 
/**
 * write_header callback: split the URL into slave specifications and open
 * each slave.
 *
 * Every slave starts from the muxer-wide use_fifo / fifo_options settings.
 * A slave that fails to open is handed to tee_process_slave_failure(), which
 * decides whether the whole header fails. A warning is logged for every input
 * stream that no open slave carries.
 *
 * @return 0 on success; on failure a negative AVERROR code, after all slaves
 *         have been closed
 */
static int tee_write_header(AVFormatContext *avf)
{
    TeeContext *tee = avf->priv_data;
    const char *cursor = avf->url;
    char **specs = NULL;
    unsigned nb_specs = 0, i;
    int ret;

    /* Cut the URL at the slave delimiter; each token is a separate allocation. */
    while (*cursor) {
        char *spec = av_get_token(&cursor, slave_delim);
        if (!spec) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }
        ret = av_dynarray_add_nofree(&specs, &nb_specs, spec);
        if (ret < 0) {
            av_free(spec);
            goto fail;
        }
        /* step over the delimiter that ended the token, if any */
        if (strspn(cursor, slave_delim))
            cursor++;
    }

    if (!FF_ALLOCZ_TYPED_ARRAY(tee->slaves, nb_specs)) {
        ret = AVERROR(ENOMEM);
        goto fail;
    }
    tee->nb_slaves = tee->nb_alive = nb_specs;

    for (i = 0; i < nb_specs; i++) {
        TeeSlave *cur = &tee->slaves[i];

        cur->use_fifo = tee->use_fifo;
        ret = av_dict_copy(&cur->fifo_options, tee->fifo_options, 0);
        if (ret < 0)
            goto fail;

        ret = open_slave(avf, specs[i], cur);
        if (ret >= 0) {
            log_slave(cur, avf, AV_LOG_VERBOSE);
        } else {
            ret = tee_process_slave_failure(avf, i, ret);
            if (ret < 0)
                goto fail;
        }
        av_freep(&specs[i]);
    }

    for (i = 0; i < avf->nb_streams; i++) {
        int j, mapped = 0;

        for (j = 0; j < tee->nb_slaves; j++) {
            const TeeSlave *cur = &tee->slaves[j];

            /* closed slaves have no stream map left to consult */
            if (cur->avf && cur->stream_map[i] >= 0)
                mapped++;
        }
        if (!mapped)
            av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
                   "to any slave.\n", i);
    }
    av_free(specs);
    return 0;

fail:
    for (i = 0; i < nb_specs; i++)
        av_freep(&specs[i]);
    close_slaves(avf);
    av_free(specs);
    return ret;
}
517 
/**
 * write_trailer callback: close every slave and free the slave array.
 *
 * A slave whose close fails is passed to tee_process_slave_failure(); the
 * first negative result obtained that way is what gets returned.
 *
 * @return 0 if no failure had to be reported, otherwise the first error
 */
static int tee_write_trailer(AVFormatContext *avf)
{
    TeeContext *tee = avf->priv_data;
    int first_err = 0;
    unsigned idx;

    for (idx = 0; idx < tee->nb_slaves; idx++) {
        int err = close_slave(&tee->slaves[idx]);

        if (err >= 0)
            continue;

        err = tee_process_slave_failure(avf, idx, err);
        if (err < 0 && !first_err)
            first_err = err;
    }

    av_freep(&tee->slaves);
    return first_err;
}
534 
/**
 * write_packet callback: forward one packet, or a flush request, to every
 * slave that is still open.
 *
 * For each open slave the packet is referenced into the muxer's scratch
 * packet, remapped to the slave's stream index, pushed through that stream's
 * bitstream filter, and every filtered packet is rescaled to the slave
 * stream's time base and written interleaved. Slaves that do not carry the
 * packet's stream are skipped.
 *
 * A failing slave is handed to tee_process_slave_failure(), which closes it
 * and decides whether the error is reported.
 *
 * @param avf tee muxer context
 * @param pkt packet to write, or NULL to flush all slaves
 * @return 0 on success, otherwise the first error that had to be reported
 */
static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    TeeContext *tee = avf->priv_data;
    AVFormatContext *avf2;
    AVBSFContext *bsfs;
    AVPacket *const pkt2 = ffformatcontext(avf)->pkt;
    int ret_all = 0, ret;
    unsigned i, s;
    int s2;

    for (i = 0; i < tee->nb_slaves; i++) {
        /* closed (failed) slaves have no context any more */
        if (!(avf2 = tee->slaves[i].avf))
            continue;

        /* Flush slave if pkt is NULL*/
        if (!pkt) {
            ret = av_interleaved_write_frame(avf2, NULL);
            if (ret < 0) {
                ret = tee_process_slave_failure(avf, i, ret);
                if (!ret_all && ret < 0)
                    ret_all = ret;
            }
            continue;
        }

        s = pkt->stream_index;
        s2 = tee->slaves[i].stream_map[s];
        if (s2 < 0)
            continue;

        if ((ret = av_packet_ref(pkt2, pkt)) < 0) {
            if (!ret_all)
                ret_all = ret;
            continue;
        }
        bsfs = tee->slaves[i].bsfs[s2];
        pkt2->stream_index = s2;

        ret = av_bsf_send_packet(bsfs, pkt2);
        if (ret < 0) {
            av_packet_unref(pkt2);
            av_log(avf, AV_LOG_ERROR, "Error while sending packet to bitstream filter: %s\n",
                   av_err2str(ret));
            ret = tee_process_slave_failure(avf, i, ret);
            if (!ret_all && ret < 0)
                ret_all = ret;
            /* The slave has just been closed: the filter behind bsfs and the
             * muxer behind avf2 are freed, so neither may be touched again.
             * Skipping the rest also keeps this failure from being counted
             * a second time below. */
            continue;
        }

        /* Drain the filter; EAGAIN means it wants more input, not an error. */
        while (1) {
            ret = av_bsf_receive_packet(bsfs, pkt2);
            if (ret == AVERROR(EAGAIN)) {
                ret = 0;
                break;
            } else if (ret < 0) {
                break;
            }

            av_packet_rescale_ts(pkt2, bsfs->time_base_out,
                                 avf2->streams[s2]->time_base);
            ret = av_interleaved_write_frame(avf2, pkt2);
            if (ret < 0)
                break;
        }

        if (ret < 0) {
            ret = tee_process_slave_failure(avf, i, ret);
            if (!ret_all && ret < 0)
                ret_all = ret;
        }
    }
    return ret_all;
}
607 
608 const AVOutputFormat ff_tee_muxer = {
609     .name              = "tee",
610     .long_name         = NULL_IF_CONFIG_SMALL("Multiple muxer tee"),
611     .priv_data_size    = sizeof(TeeContext),
612     .write_header      = tee_write_header,
613     .write_trailer     = tee_write_trailer,
614     .write_packet      = tee_write_packet,
615     .priv_class        = &tee_muxer_class,
616     .flags             = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
617 };
618