Browse code

avformat: Add fifo pseudo-muxer

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
Signed-off-by: Marton Balint <cus@passwd.hu>

Jan Sebechlebsky authored on 2016/08/09 20:26:03
Showing 7 changed files
... ...
@@ -17,6 +17,7 @@ version <next>:
17 17
 - acrusher audio filter
18 18
 - bitplanenoise video filter
19 19
 - floating point support in als decoder
20
+- fifo muxer
20 21
 
21 22
 
22 23
 version 3.1:
... ...
@@ -2834,6 +2834,7 @@ dv_muxer_select="dvprofile"
2834 2834
 dxa_demuxer_select="riffdec"
2835 2835
 eac3_demuxer_select="ac3_parser"
2836 2836
 f4v_muxer_select="mov_muxer"
2837
+fifo_muxer_deps="pthreads"
2837 2838
 flac_demuxer_select="flac_parser"
2838 2839
 hds_muxer_select="flv_muxer"
2839 2840
 hls_muxer_select="mpegts_muxer"
... ...
@@ -1440,6 +1440,101 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
1440 1440
 
1441 1441
 @end table
1442 1442
 
1443
+@section fifo
1444
+
1445
+The fifo pseudo-muxer allows the separation of encoding and muxing by using
1446
+first-in-first-out queue and running the actual muxer in a separate thread. This
1447
+is especially useful in combination with the @ref{tee} muxer and can be used to
1448
+send data to several destinations with different reliability/writing speed/latency.
1449
+
1450
+API users should be aware that callback functions (interrupt_callback,
1451
+io_open and io_close) used within its AVFormatContext must be thread-safe.
1452
+
1453
+The behavior of the fifo muxer if the queue fills up or if the output fails is
1454
+selectable,
1455
+
1456
+@itemize @bullet
1457
+
1458
+@item
1459
+output can be transparently restarted with configurable delay between retries
1460
+based on real time or time of the processed stream.
1461
+
1462
+@item
1463
+encoding can be blocked during temporary failure, or continue transparently
1464
+dropping packets in case fifo queue fills up.
1465
+
1466
+@end itemize
1467
+
1468
+@table @option
1469
+
1470
+@item fifo_format
1471
+Specify the format name. Useful if it cannot be guessed from the
1472
+output name suffix.
1473
+
1474
+@item queue_size
1475
+Specify size of the queue (number of packets). Default value is 60.
1476
+
1477
+@item format_opts
1478
+Specify format options for the underlying muxer. Muxer options can be specified
1479
+as a list of @var{key}=@var{value} pairs separated by ':'.
1480
+
1481
+@item drop_pkts_on_overflow @var{bool}
1482
+If set to 1 (true), in case the fifo queue fills up, packets will be dropped
1483
+rather than blocking the encoder. This allows to continue streaming without
1484
+delaying the input, at the cost of ommiting part of the stream. By default
1485
+this option is set to 0 (false), so in such cases the encoder will be blocked
1486
+until the muxer processes some of the packets and none of them is lost.
1487
+
1488
+@item attempt_recovery @var{bool}
1489
+If failure occurs, attempt to recover the output. This is especially useful
1490
+when used with network output, allows to restart streaming transparently.
1491
+By default this option is set to 0 (false).
1492
+
1493
+@item max_recovery_attempts
1494
+Sets maximum number of successive unsuccessful recovery attempts after which
1495
+the output fails permanently. By default this option is set to 0 (unlimited).
1496
+
1497
+@item recovery_wait_time @var{duration}
1498
+Waiting time before the next recovery attempt after previous unsuccessful
1499
+recovery attempt. Default value is 5 seconds.
1500
+
1501
+@item recovery_wait_streamtime @var{bool}
1502
+If set to 0 (false), the real time is used when waiting for the recovery
1503
+attempt (i.e. the recovery will be attempted after at least
1504
+recovery_wait_time seconds).
1505
+If set to 1 (true), the time of the processed stream is taken into account
1506
+instead (i.e. the recovery will be attempted after at least @var{recovery_wait_time}
1507
+seconds of the stream is omitted).
1508
+By default, this option is set to 0 (false).
1509
+
1510
+@item recover_any_error @var{bool}
1511
+If set to 1 (true), recovery will be attempted regardless of type of the error
1512
+causing the failure. By default this option is set to 0 (false) and in case of
1513
+certain (usually permanent) errors the recovery is not attempted even when
1514
+@var{attempt_recovery} is set to 1.
1515
+
1516
+@item restart_with_keyframe @var{bool}
1517
+Specify whether to wait for the keyframe after recovering from
1518
+queue overflow or failure. This option is set to 0 (false) by default.
1519
+
1520
+@end table
1521
+
1522
+@subsection Examples
1523
+
1524
+@itemize
1525
+
1526
+@item
1527
+Stream something to rtmp server, continue processing the stream at real-time
1528
+rate even in case of temporary failure (network outage) and attempt to recover
1529
+streaming every second indefinitely.
1530
+@example
1531
+ffmpeg -re -i ... -c:v libx264 -c:a aac -f fifo -fifo_format flv -map 0:v -map 0:a
1532
+  -drop_pkts_on_overflow 1 -attempt_recovery 1 -recovery_wait_time 1 rtmp://example.com/live/stream_name
1533
+@end example
1534
+
1535
+@end itemize
1536
+
1537
+@anchor{tee}
1443 1538
 @section tee
1444 1539
 
1445 1540
 The tee muxer can be used to write the same data to several files or any
... ...
@@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
162 162
 OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
163 163
 OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
164 164
 OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
165
+OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
165 166
 OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
166 167
 OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
167 168
 OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
... ...
@@ -123,6 +123,7 @@ void av_register_all(void)
123 123
     REGISTER_MUXER   (F4V,              f4v);
124 124
     REGISTER_MUXDEMUX(FFM,              ffm);
125 125
     REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
126
+    REGISTER_MUXER   (FIFO,             fifo);
126 127
     REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
127 128
     REGISTER_MUXDEMUX(FLAC,             flac);
128 129
     REGISTER_DEMUXER (FLIC,             flic);
129 130
new file mode 100644
... ...
@@ -0,0 +1,660 @@
0
+/*
1
+ * FIFO pseudo-muxer
2
+ * Copyright (c) 2016 Jan Sebechlebsky
3
+ *
4
+ * This file is part of FFmpeg.
5
+ *
6
+ * FFmpeg is free software; you can redistribute it and/or
7
+ * modify it under the terms of the GNU Lesser General Public License
8
+ * as published by the Free Software Foundation; either
9
+ * version 2.1 of the License, or (at your option) any later version.
10
+ *
11
+ * FFmpeg is distributed in the hope that it will be useful,
12
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
+ * GNU Lesser General Public License for more details.
15
+ *
16
+ * You should have received a copy of the GNU Lesser General Public License
17
+ * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
18
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
+ */
20
+
21
+#include "libavutil/opt.h"
22
+#include "libavutil/time.h"
23
+#include "libavutil/thread.h"
24
+#include "libavutil/threadmessage.h"
25
+#include "avformat.h"
26
+#include "internal.h"
27
+
28
+#define FIFO_DEFAULT_QUEUE_SIZE              60
29
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
30
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
31
+
32
+typedef struct FifoContext {
33
+    const AVClass *class;
34
+    AVFormatContext *avf;
35
+
36
+    char *format;
37
+    char *format_options_str;
38
+    AVDictionary *format_options;
39
+
40
+    int queue_size;
41
+    AVThreadMessageQueue *queue;
42
+
43
+    pthread_t writer_thread;
44
+
45
+    /* Return value of last write_trailer_call */
46
+    int write_trailer_ret;
47
+
48
+    /* Time to wait before next recovery attempt
49
+     * This can refer to the time in processed stream,
50
+     * or real time. */
51
+    int64_t recovery_wait_time;
52
+
53
+    /* Maximal number of unsuccessful successive recovery attempts */
54
+    int max_recovery_attempts;
55
+
56
+    /* Whether to attempt recovery from failure */
57
+    int attempt_recovery;
58
+
59
+    /* If >0 stream time will be used when waiting
60
+     * for the recovery attempt instead of real time */
61
+    int recovery_wait_streamtime;
62
+
63
+    /* If >0 recovery will be attempted regardless of error code
64
+     * (except AVERROR_EXIT, so exit request is never ignored) */
65
+    int recover_any_error;
66
+
67
+    /* Whether to drop packets in case the queue is full. */
68
+    int drop_pkts_on_overflow;
69
+
70
+    /* Whether to wait for keyframe when recovering
71
+     * from failure or queue overflow */
72
+    int restart_with_keyframe;
73
+
74
+    pthread_mutex_t overflow_flag_lock;
75
+    /* Value > 0 signals queue overflow */
76
+    volatile uint8_t overflow_flag;
77
+
78
+} FifoContext;
79
+
80
+typedef struct FifoThreadContext {
81
+    AVFormatContext *avf;
82
+
83
+    /* Timestamp of last failure.
84
+     * This is either pts in case stream time is used,
85
+     * or microseconds as returned by av_getttime_relative() */
86
+    int64_t last_recovery_ts;
87
+
88
+    /* Number of current recovery process
89
+     * Value > 0 means we are in recovery process */
90
+    int recovery_nr;
91
+
92
+    /* If > 0 all frames will be dropped until keyframe is received */
93
+    uint8_t drop_until_keyframe;
94
+
95
+    /* Value > 0 means that the previous write_header call was successful
96
+     * so finalization by calling write_trailer and ff_io_close must be done
97
+     * before exiting / reinitialization of underlying muxer */
98
+    uint8_t header_written;
99
+} FifoThreadContext;
100
+
101
+typedef enum FifoMessageType {
102
+    FIFO_WRITE_HEADER,
103
+    FIFO_WRITE_PACKET,
104
+    FIFO_FLUSH_OUTPUT
105
+} FifoMessageType;
106
+
107
+typedef struct FifoMessage {
108
+    FifoMessageType type;
109
+    AVPacket pkt;
110
+} FifoMessage;
111
+
112
+static int fifo_thread_write_header(FifoThreadContext *ctx)
113
+{
114
+    AVFormatContext *avf = ctx->avf;
115
+    FifoContext *fifo = avf->priv_data;
116
+    AVFormatContext *avf2 = fifo->avf;
117
+    AVDictionary *format_options = NULL;
118
+    int ret, i;
119
+
120
+    ret = av_dict_copy(&format_options, fifo->format_options, 0);
121
+    if (ret < 0)
122
+        return ret;
123
+
124
+    ret = ff_format_output_open(avf2, avf->filename, &format_options);
125
+    if (ret < 0) {
126
+        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
127
+               av_err2str(ret));
128
+        goto end;
129
+    }
130
+
131
+    for (i = 0;i < avf2->nb_streams; i++)
132
+        avf2->streams[i]->cur_dts = 0;
133
+
134
+    ret = avformat_write_header(avf2, &format_options);
135
+    if (!ret)
136
+        ctx->header_written = 1;
137
+
138
+    // Check for options unrecognized by underlying muxer
139
+    if (format_options) {
140
+        AVDictionaryEntry *entry = NULL;
141
+        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
142
+            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
143
+        ret = AVERROR(EINVAL);
144
+    }
145
+
146
+end:
147
+    av_dict_free(&format_options);
148
+    return ret;
149
+}
150
+
151
+static int fifo_thread_flush_output(FifoThreadContext *ctx)
152
+{
153
+    AVFormatContext *avf = ctx->avf;
154
+    FifoContext *fifo = avf->priv_data;
155
+    AVFormatContext *avf2 = fifo->avf;
156
+
157
+    return av_write_frame(avf2, NULL);
158
+}
159
+
160
+static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
161
+{
162
+    AVFormatContext *avf = ctx->avf;
163
+    FifoContext *fifo = avf->priv_data;
164
+    AVFormatContext *avf2 = fifo->avf;
165
+    AVRational src_tb, dst_tb;
166
+    int ret, s_idx;
167
+
168
+    if (ctx->drop_until_keyframe) {
169
+        if (pkt->flags & AV_PKT_FLAG_KEY) {
170
+            ctx->drop_until_keyframe = 0;
171
+            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
172
+        } else {
173
+            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
174
+            av_packet_unref(pkt);
175
+            return 0;
176
+        }
177
+    }
178
+
179
+    s_idx = pkt->stream_index;
180
+    src_tb = avf->streams[s_idx]->time_base;
181
+    dst_tb = avf2->streams[s_idx]->time_base;
182
+    av_packet_rescale_ts(pkt, src_tb, dst_tb);
183
+
184
+    ret = av_write_frame(avf2, pkt);
185
+    if (ret >= 0)
186
+        av_packet_unref(pkt);
187
+    return ret;
188
+}
189
+
190
+static int fifo_thread_write_trailer(FifoThreadContext *ctx)
191
+{
192
+    AVFormatContext *avf = ctx->avf;
193
+    FifoContext *fifo = avf->priv_data;
194
+    AVFormatContext *avf2 = fifo->avf;
195
+    int ret;
196
+
197
+    if (!ctx->header_written)
198
+        return 0;
199
+
200
+    ret = av_write_trailer(avf2);
201
+    ff_format_io_close(avf2, &avf2->pb);
202
+
203
+    return ret;
204
+}
205
+
206
+static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
207
+{
208
+    int ret;
209
+
210
+    if (!ctx->header_written) {
211
+        ret = fifo_thread_write_header(ctx);
212
+        if (ret < 0)
213
+            return ret;
214
+    }
215
+
216
+    switch(msg->type) {
217
+    case FIFO_WRITE_HEADER:
218
+        return ret;
219
+    case FIFO_WRITE_PACKET:
220
+        return fifo_thread_write_packet(ctx, &msg->pkt);
221
+    case FIFO_FLUSH_OUTPUT:
222
+        return fifo_thread_flush_output(ctx);
223
+    }
224
+
225
+    return AVERROR(EINVAL);
226
+}
227
+
228
+static int is_recoverable(const FifoContext *fifo, int err_no) {
229
+    if (!fifo->attempt_recovery)
230
+        return 0;
231
+
232
+    if (fifo->recover_any_error)
233
+        return err_no != AVERROR_EXIT;
234
+
235
+    switch (err_no) {
236
+    case AVERROR(EINVAL):
237
+    case AVERROR(ENOSYS):
238
+    case AVERROR_EOF:
239
+    case AVERROR_EXIT:
240
+    case AVERROR_PATCHWELCOME:
241
+        return 0;
242
+    default:
243
+        return 1;
244
+    }
245
+}
246
+
247
+static void free_message(void *msg)
248
+{
249
+    FifoMessage *fifo_msg = msg;
250
+
251
+    if (fifo_msg->type == FIFO_WRITE_PACKET)
252
+        av_packet_unref(&fifo_msg->pkt);
253
+}
254
+
255
+static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
256
+                                                int err_no)
257
+{
258
+    AVFormatContext *avf = ctx->avf;
259
+    FifoContext *fifo = avf->priv_data;
260
+    int ret;
261
+
262
+    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
263
+           av_err2str(err_no));
264
+
265
+    if (fifo->recovery_wait_streamtime) {
266
+        if (pkt->pts == AV_NOPTS_VALUE)
267
+            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
268
+                   " timestamp, recovery will be attempted immediately");
269
+        ctx->last_recovery_ts = pkt->pts;
270
+    } else {
271
+        ctx->last_recovery_ts = av_gettime_relative();
272
+    }
273
+
274
+    if (fifo->max_recovery_attempts &&
275
+        ctx->recovery_nr >= fifo->max_recovery_attempts) {
276
+        av_log(avf, AV_LOG_ERROR,
277
+               "Maximal number of %d recovery attempts reached.\n",
278
+               fifo->max_recovery_attempts);
279
+        ret = err_no;
280
+    } else {
281
+        ret = AVERROR(EAGAIN);
282
+    }
283
+
284
+    return ret;
285
+}
286
+
287
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
288
+{
289
+    AVFormatContext *avf = ctx->avf;
290
+    FifoContext *fifo = avf->priv_data;
291
+    AVPacket *pkt = &msg->pkt;
292
+    int64_t time_since_recovery;
293
+    int ret;
294
+
295
+    if (!is_recoverable(fifo, err_no)) {
296
+        ret = err_no;
297
+        goto fail;
298
+    }
299
+
300
+    if (ctx->header_written) {
301
+        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
302
+        ctx->header_written = 0;
303
+    }
304
+
305
+    if (!ctx->recovery_nr) {
306
+        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
307
+                                AV_NOPTS_VALUE : 0;
308
+    } else {
309
+        if (fifo->recovery_wait_streamtime) {
310
+            if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
311
+                AVRational tb = avf->streams[pkt->stream_index]->time_base;
312
+                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
313
+                                                   tb, AV_TIME_BASE_Q);
314
+            } else {
315
+                /* Enforce recovery immediately */
316
+                time_since_recovery = fifo->recovery_wait_time;
317
+            }
318
+        } else {
319
+            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
320
+        }
321
+
322
+        if (time_since_recovery < fifo->recovery_wait_time)
323
+            return AVERROR(EAGAIN);
324
+    }
325
+
326
+    ctx->recovery_nr++;
327
+
328
+    if (fifo->max_recovery_attempts) {
329
+        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
330
+               ctx->recovery_nr, fifo->max_recovery_attempts);
331
+    } else {
332
+        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
333
+               ctx->recovery_nr);
334
+    }
335
+
336
+    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
337
+        ctx->drop_until_keyframe = 1;
338
+
339
+    ret = fifo_thread_dispatch_message(ctx, msg);
340
+    if (ret < 0) {
341
+        if (is_recoverable(fifo, ret)) {
342
+            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
343
+        } else {
344
+            goto fail;
345
+        }
346
+    } else {
347
+        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
348
+        ctx->recovery_nr = 0;
349
+    }
350
+
351
+    return 0;
352
+
353
+fail:
354
+    free_message(msg);
355
+    return ret;
356
+}
357
+
358
+static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
359
+{
360
+    AVFormatContext *avf = ctx->avf;
361
+    FifoContext *fifo = avf->priv_data;
362
+    int ret;
363
+
364
+    do {
365
+        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
366
+            int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
367
+            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
368
+            if (time_to_wait)
369
+                av_usleep(FFMIN(10000, time_to_wait));
370
+        }
371
+
372
+        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
373
+    } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
374
+
375
+    if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
376
+        if (msg->type == FIFO_WRITE_PACKET)
377
+            av_packet_unref(&msg->pkt);
378
+        ret = 0;
379
+    }
380
+
381
+    return ret;
382
+}
383
+
384
+static void *fifo_consumer_thread(void *data)
385
+{
386
+    AVFormatContext *avf = data;
387
+    FifoContext *fifo = avf->priv_data;
388
+    AVThreadMessageQueue *queue = fifo->queue;
389
+    FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
390
+    int ret;
391
+
392
+    FifoThreadContext fifo_thread_ctx;
393
+    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
394
+    fifo_thread_ctx.avf = avf;
395
+
396
+    while (1) {
397
+        uint8_t just_flushed = 0;
398
+
399
+        if (!fifo_thread_ctx.recovery_nr)
400
+            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
401
+
402
+        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
403
+            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
404
+            if (rec_ret < 0) {
405
+                av_thread_message_queue_set_err_send(queue, rec_ret);
406
+                break;
407
+            }
408
+        }
409
+
410
+        /* If the queue is full at the moment when fifo_write_packet
411
+         * attempts to insert new message (packet) to the queue,
412
+         * it sets the fifo->overflow_flag to 1 and drops packet.
413
+         * Here in consumer thread, the flag is checked and if it is
414
+         * set, the queue is flushed and flag cleared. */
415
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
416
+        if (fifo->overflow_flag) {
417
+            av_thread_message_flush(queue);
418
+            if (fifo->restart_with_keyframe)
419
+                fifo_thread_ctx.drop_until_keyframe = 1;
420
+            fifo->overflow_flag = 0;
421
+            just_flushed = 1;
422
+        }
423
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
424
+
425
+        if (just_flushed)
426
+            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
427
+
428
+        ret = av_thread_message_queue_recv(queue, &msg, 0);
429
+        if (ret < 0) {
430
+            av_thread_message_queue_set_err_send(queue, ret);
431
+            break;
432
+        }
433
+    }
434
+
435
+    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
436
+
437
+    return NULL;
438
+}
439
+
440
+static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat)
441
+{
442
+    FifoContext *fifo = avf->priv_data;
443
+    AVFormatContext *avf2;
444
+    int ret = 0, i;
445
+
446
+    ret = avformat_alloc_output_context2(&avf2, oformat, NULL, NULL);
447
+    if (ret < 0)
448
+        return ret;
449
+
450
+    fifo->avf = avf2;
451
+
452
+    avf2->interrupt_callback = avf->interrupt_callback;
453
+    avf2->max_delay = avf->max_delay;
454
+    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
455
+    if (ret < 0)
456
+        return ret;
457
+    avf2->opaque = avf->opaque;
458
+    avf2->io_close = avf->io_close;
459
+    avf2->io_open = avf->io_open;
460
+    avf2->flags = avf->flags;
461
+
462
+    for (i = 0; i < avf->nb_streams; ++i) {
463
+        AVStream *st = avformat_new_stream(avf2, NULL);
464
+        if (!st)
465
+            return AVERROR(ENOMEM);
466
+
467
+        ret = ff_stream_encode_params_copy(st, avf->streams[i]);
468
+        if (ret < 0)
469
+            return ret;
470
+    }
471
+
472
+    return 0;
473
+}
474
+
475
+static int fifo_init(AVFormatContext *avf)
476
+{
477
+    FifoContext *fifo = avf->priv_data;
478
+    AVOutputFormat *oformat;
479
+    int ret = 0;
480
+
481
+    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
482
+        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
483
+               " only when drop_pkts_on_overflow is also turned on\n");
484
+        return AVERROR(EINVAL);
485
+    }
486
+
487
+    if (fifo->format_options_str) {
488
+        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
489
+                                   "=", ":", 0);
490
+        if (ret < 0) {
491
+            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
492
+                   fifo->format_options_str);
493
+            return ret;
494
+        }
495
+    }
496
+
497
+    oformat = av_guess_format(fifo->format, avf->filename, NULL);
498
+    if (!oformat) {
499
+        ret = AVERROR_MUXER_NOT_FOUND;
500
+        return ret;
501
+    }
502
+
503
+    ret = fifo_mux_init(avf, oformat);
504
+    if (ret < 0)
505
+        return ret;
506
+
507
+    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
508
+                                        sizeof(FifoMessage));
509
+    if (ret < 0)
510
+        return ret;
511
+
512
+    av_thread_message_queue_set_free_func(fifo->queue, free_message);
513
+
514
+    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
515
+    if (ret < 0)
516
+        return AVERROR(ret);
517
+
518
+    return 0;
519
+}
520
+
521
+static int fifo_write_header(AVFormatContext *avf)
522
+{
523
+    FifoContext * fifo = avf->priv_data;
524
+    int ret;
525
+
526
+    ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
527
+    if (ret) {
528
+        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
529
+               av_err2str(AVERROR(ret)));
530
+        ret = AVERROR(ret);
531
+    }
532
+
533
+    return ret;
534
+}
535
+
536
+static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
537
+{
538
+    FifoContext *fifo = avf->priv_data;
539
+    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
540
+    int ret;
541
+
542
+    if (pkt) {
543
+        av_init_packet(&msg.pkt);
544
+        ret = av_packet_ref(&msg.pkt,pkt);
545
+        if (ret < 0)
546
+            return ret;
547
+    }
548
+
549
+    ret = av_thread_message_queue_send(fifo->queue, &msg,
550
+                                       fifo->drop_pkts_on_overflow ?
551
+                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
552
+    if (ret == AVERROR(EAGAIN)) {
553
+        uint8_t overflow_set = 0;
554
+
555
+        /* Queue is full, set fifo->overflow_flag to 1
556
+         * to let consumer thread know the queue should
557
+         * be flushed. */
558
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
559
+        if (!fifo->overflow_flag)
560
+            fifo->overflow_flag = overflow_set = 1;
561
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
562
+
563
+        if (overflow_set)
564
+            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
565
+        ret = 0;
566
+        goto fail;
567
+    } else if (ret < 0) {
568
+        goto fail;
569
+    }
570
+
571
+    return ret;
572
+fail:
573
+    if (pkt)
574
+        av_packet_unref(&msg.pkt);
575
+    return ret;
576
+}
577
+
578
+static int fifo_write_trailer(AVFormatContext *avf)
579
+{
580
+    FifoContext *fifo= avf->priv_data;
581
+    int ret;
582
+
583
+    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
584
+
585
+    ret = pthread_join(fifo->writer_thread, NULL);
586
+    if (ret < 0) {
587
+        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
588
+               av_err2str(AVERROR(ret)));
589
+        return AVERROR(ret);
590
+    }
591
+
592
+    ret = fifo->write_trailer_ret;
593
+    return ret;
594
+}
595
+
596
+static void fifo_deinit(AVFormatContext *avf)
597
+{
598
+    FifoContext *fifo = avf->priv_data;
599
+
600
+    av_dict_free(&fifo->format_options);
601
+    avformat_free_context(fifo->avf);
602
+    av_thread_message_queue_free(&fifo->queue);
603
+    pthread_mutex_destroy(&fifo->overflow_flag_lock);
604
+}
605
+
606
+#define OFFSET(x) offsetof(FifoContext, x)
607
+static const AVOption options[] = {
608
+        {"fifo_format", "Target muxer", OFFSET(format),
609
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
610
+
611
+        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
612
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
613
+
614
+        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
615
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
616
+
617
+        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
618
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
619
+
620
+        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
621
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
622
+
623
+        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
624
+        AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
625
+
626
+        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
627
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
628
+
629
+        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
630
+         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
631
+
632
+        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
633
+         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
634
+
635
+        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
636
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
637
+
638
+        {NULL},
639
+};
640
+
641
+static const AVClass fifo_muxer_class = {
642
+    .class_name = "Fifo muxer",
643
+    .item_name  = av_default_item_name,
644
+    .option     = options,
645
+    .version    = LIBAVUTIL_VERSION_INT,
646
+};
647
+
648
+AVOutputFormat ff_fifo_muxer = {
649
+    .name           = "fifo",
650
+    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
651
+    .priv_data_size = sizeof(FifoContext),
652
+    .init           = fifo_init,
653
+    .write_header   = fifo_write_header,
654
+    .write_packet   = fifo_write_packet,
655
+    .write_trailer  = fifo_write_trailer,
656
+    .deinit         = fifo_deinit,
657
+    .priv_class     = &fifo_muxer_class,
658
+    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
659
+};
... ...
@@ -32,8 +32,8 @@
32 32
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
33 33
 // Also please add any ticket numbers that you believe might be affected here
34 34
 #define LIBAVFORMAT_VERSION_MAJOR  57
35
-#define LIBAVFORMAT_VERSION_MINOR  47
36
-#define LIBAVFORMAT_VERSION_MICRO 101
35
+#define LIBAVFORMAT_VERSION_MINOR  48
36
+#define LIBAVFORMAT_VERSION_MICRO 100
37 37
 
38 38
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
39 39
                                                LIBAVFORMAT_VERSION_MINOR, \