Browse code

lavfi: add zmq filters

Stefano Sabatini authored on 2013/04/21 22:00:11
Showing 7 changed files
... ...
@@ -45,6 +45,7 @@ version <next>:
45 45
 - RSD demuxer
46 46
 - RedSpark demuxer
47 47
 - ADPCM IMA Radical decoder
48
+- zmq filters
48 49
 
49 50
 
50 51
 version 1.2:
... ...
@@ -236,6 +236,7 @@ External library support:
236 236
   --enable-libxavs         enable AVS encoding via xavs [no]
237 237
   --enable-libxvid         enable Xvid encoding via xvidcore,
238 238
                            native MPEG-4/Xvid encoder exists [no]
239
+  --enable-libzmq          enable message passing via libzmq [no]
239 240
   --enable-openal          enable OpenAL 1.1 capture support [no]
240 241
   --enable-opencl          enable OpenCL code
241 242
   --enable-openssl         enable openssl [no]
... ...
@@ -1195,6 +1196,7 @@ EXTERNAL_LIBRARY_LIST="
1195 1195
     libx264
1196 1196
     libxavs
1197 1197
     libxvid
1198
+    libzmq
1198 1199
     openal
1199 1200
     opencl
1200 1201
     openssl
... ...
@@ -2122,6 +2124,7 @@ aresample_filter_deps="swresample"
2122 2122
 ass_filter_deps="libass"
2123 2123
 asyncts_filter_deps="avresample"
2124 2124
 atempo_filter_deps="avcodec rdft"
2125
+azmq_filter_deps="libzmq"
2125 2126
 blackframe_filter_deps="gpl"
2126 2127
 boxblur_filter_deps="gpl"
2127 2128
 colormatrix_filter_deps="gpl"
... ...
@@ -2166,6 +2169,7 @@ yadif_filter_deps="gpl"
2166 2166
 pixfmts_super2xsai_test_deps="super2xsai_filter"
2167 2167
 tinterlace_merge_test_deps="tinterlace_filter"
2168 2168
 tinterlace_pad_test_deps="tinterlace_filter"
2169
+zmq_filter_deps="libzmq"
2169 2170
 
2170 2171
 # libraries
2171 2172
 avcodec_deps="avutil"
... ...
@@ -4055,6 +4059,7 @@ enabled libx264    && require  libx264 x264.h x264_encoder_encode -lx264 &&
4055 4055
                         die "ERROR: libx264 must be installed and version must be >= 0.118."; }
4056 4056
 enabled libxavs    && require  libxavs xavs.h xavs_encoder_encode -lxavs
4057 4057
 enabled libxvid    && require  libxvid xvid.h xvid_global -lxvidcore
4058
+enabled libzmq     && require_pkg_config libzmq zmq.h zmq_ctx_new
4058 4059
 enabled openal     && { { for al_libs in "${OPENAL_LIBS}" "-lopenal" "-lOpenAL32"; do
4059 4060
                         check_lib 'AL/al.h' alGetError "${al_libs}" && break; done } ||
4060 4061
                         die "ERROR: openal not found"; } &&
... ...
@@ -8371,6 +8371,48 @@ ffmpeg -i INPUT -filter_complex asplit=5 OUTPUT
8371 8371
 @end example
8372 8372
 @end itemize
8373 8373
 
8374
+@section zmq, azmq
8375
+
8376
+Receive commands sent through a libzmq client, and forward them to
8377
+filters in the filtergraph.
8378
+
8379
+@code{zmq} and @code{azmq} work as a pass-through filters. @code{zmq}
8380
+must be inserted between two video filters, @code{azmq} between two
8381
+audio filters.
8382
+
8383
+To enable these filters you need to install the libzmq library and
8384
+headers and configure FFmpeg with @code{--enable-libzmq}.
8385
+
8386
+For more information about libzmq see:
8387
+@url{http://www.zeromq.org/}
8388
+
8389
+The @code{zmq} and @code{azmq} filters work as a libzmq server, which
8390
+receives messages sent through a network interface defined by the
8391
+@option{bind_address} option.
8392
+
8393
+The received message must be in the form:
8394
+@example
8395
+@var{TARGET} @var{COMMAND} [@var{ARG}]
8396
+@end example
8397
+
8398
+@var{TARGET} specifies the target of the command, usually the name of
8399
+the filter class or a specific filter instance name.
8400
+
8401
+@var{COMMAND} specifies the name of the command for the target filter.
8402
+
8403
+@var{ARG} is optional and specifies the optional argument list for the
8404
+given @var{COMMAND}.
8405
+
8406
+Upon reception, the message is processed and the corresponding command
8407
+is injected into the filtergraph. Depending on the result, the filter
8408
+will send a reply to the client, adopting the format:
8409
+@example
8410
+@var{ERROR_CODE} @var{ERROR_REASON}
8411
+@var{MESSAGE}
8412
+@end example
8413
+
8414
+@var{MESSAGE} is optional.
8415
+
8374 8416
 @c man end MULTIMEDIA FILTERS
8375 8417
 
8376 8418
 @chapter Multimedia Sources
... ...
@@ -74,6 +74,7 @@ OBJS-$(CONFIG_ASTREAMSYNC_FILTER)            += af_astreamsync.o
74 74
 OBJS-$(CONFIG_ASYNCTS_FILTER)                += af_asyncts.o
75 75
 OBJS-$(CONFIG_ATEMPO_FILTER)                 += af_atempo.o
76 76
 OBJS-$(CONFIG_ATRIM_FILTER)                  += trim.o
77
+OBJS-$(CONFIG_AZMQ_FILTER)                   += f_zmq.o
77 78
 OBJS-$(CONFIG_BANDPASS_FILTER)               += af_biquads.o
78 79
 OBJS-$(CONFIG_BANDREJECT_FILTER)             += af_biquads.o
79 80
 OBJS-$(CONFIG_BASS_FILTER)                   += af_biquads.o
... ...
@@ -187,6 +188,7 @@ OBJS-$(CONFIG_VFLIP_FILTER)                  += vf_vflip.o
187 187
 OBJS-$(CONFIG_VIDSTABDETECT_FILTER)          += vidstabutils.o vf_vidstabdetect.o
188 188
 OBJS-$(CONFIG_VIDSTABTRANSFORM_FILTER)       += vidstabutils.o vf_vidstabtransform.o
189 189
 OBJS-$(CONFIG_YADIF_FILTER)                  += vf_yadif.o
190
+OBJS-$(CONFIG_ZMQ_FILTER)                    += f_zmq.o
190 191
 
191 192
 OBJS-$(CONFIG_CELLAUTO_FILTER)               += vsrc_cellauto.o
192 193
 OBJS-$(CONFIG_COLOR_FILTER)                  += vsrc_testsrc.o
... ...
@@ -72,6 +72,7 @@ void avfilter_register_all(void)
72 72
     REGISTER_FILTER(ASYNCTS,        asyncts,        af);
73 73
     REGISTER_FILTER(ATEMPO,         atempo,         af);
74 74
     REGISTER_FILTER(ATRIM,          atrim,          af);
75
+    REGISTER_FILTER(AZMQ,           azmq,           af);
75 76
     REGISTER_FILTER(BANDPASS,       bandpass,       af);
76 77
     REGISTER_FILTER(BANDREJECT,     bandreject,     af);
77 78
     REGISTER_FILTER(BASS,           bass,           af);
... ...
@@ -184,6 +185,7 @@ void avfilter_register_all(void)
184 184
     REGISTER_FILTER(VIDSTABDETECT,  vidstabdetect,  vf);
185 185
     REGISTER_FILTER(VIDSTABTRANSFORM, vidstabtransform, vf);
186 186
     REGISTER_FILTER(YADIF,          yadif,          vf);
187
+    REGISTER_FILTER(ZMQ,            zmq,            vf);
187 188
 
188 189
     REGISTER_FILTER(CELLAUTO,       cellauto,       vsrc);
189 190
     REGISTER_FILTER(COLOR,          color,          vsrc);
190 191
new file mode 100644
... ...
@@ -0,0 +1,276 @@
0
+/*
1
+ * Copyright (c) 2013 Stefano Sabatini
2
+ *
3
+ * This file is part of FFmpeg.
4
+ *
5
+ * FFmpeg is free software; you can redistribute it and/or
6
+ * modify it under the terms of the GNU Lesser General Public
7
+ * License as published by the Free Software Foundation; either
8
+ * version 2.1 of the License, or (at your option) any later version.
9
+ *
10
+ * FFmpeg is distributed in the hope that it will be useful,
11
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
+ * Lesser General Public License for more details.
14
+ *
15
+ * You should have received a copy of the GNU Lesser General Public
16
+ * License along with FFmpeg; if not, write to the Free Software
17
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
+ */
19
+
20
+/**
21
+ * @file
22
+ * receive commands through libzeromq and broker them to filters
23
+ */
24
+
25
+#include <zmq.h>
26
+#include "libavutil/avstring.h"
27
+#include "libavutil/bprint.h"
28
+#include "libavutil/opt.h"
29
+#include "avfilter.h"
30
+#include "internal.h"
31
+#include "avfiltergraph.h"
32
+#include "audio.h"
33
+#include "video.h"
34
+
35
+typedef struct {
36
+    const AVClass *class;
37
+    void *zmq;
38
+    void *responder;
39
+    char *bind_address;
40
+    int command_count;
41
+} ZMQContext;
42
+
43
+#define OFFSET(x) offsetof(ZMQContext, x)
44
+#define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
45
+static const AVOption options[] = {
46
+    { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
47
+    { "b",            "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
48
+    { NULL }
49
+};
50
+
51
+static av_cold int init(AVFilterContext *ctx)
52
+{
53
+    ZMQContext *zmq = ctx->priv;
54
+
55
+    zmq->zmq = zmq_ctx_new();
56
+    if (!zmq->zmq) {
57
+        av_log(ctx, AV_LOG_ERROR,
58
+               "Could not create ZMQ context: %s\n", zmq_strerror(errno));
59
+        return AVERROR_EXTERNAL;
60
+    }
61
+
62
+    zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
63
+    if (!zmq->responder) {
64
+        av_log(ctx, AV_LOG_ERROR,
65
+               "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
66
+        return AVERROR_EXTERNAL;
67
+    }
68
+
69
+    if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
70
+        av_log(ctx, AV_LOG_ERROR,
71
+               "Could not bind ZMQ socket to address '%s': %s\n",
72
+               zmq->bind_address, zmq_strerror(errno));
73
+        return AVERROR_EXTERNAL;
74
+    }
75
+
76
+    zmq->command_count = -1;
77
+    return 0;
78
+}
79
+
80
+static void av_cold uninit(AVFilterContext *ctx)
81
+{
82
+    ZMQContext *zmq = ctx->priv;
83
+
84
+    zmq_close(zmq->responder);
85
+    zmq_ctx_destroy(zmq->zmq);
86
+}
87
+
88
+typedef struct {
89
+    char *target, *command, *arg;
90
+} Command;
91
+
92
+#define SPACES " \f\t\n\r"
93
+
94
+static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
95
+{
96
+    const char **buf = &command_str;
97
+
98
+    cmd->target = av_get_token(buf, SPACES);
99
+    if (!cmd->target || !cmd->target[0]) {
100
+        av_log(log_ctx, AV_LOG_ERROR,
101
+               "No target specified in command '%s'\n", command_str);
102
+        return AVERROR(EINVAL);
103
+    }
104
+
105
+    cmd->command = av_get_token(buf, SPACES);
106
+    if (!cmd->command || !cmd->command[0]) {
107
+        av_log(log_ctx, AV_LOG_ERROR,
108
+               "No command specified in command '%s'\n", command_str);
109
+        return AVERROR(EINVAL);
110
+    }
111
+
112
+    cmd->arg = av_get_token(buf, SPACES);
113
+    return 0;
114
+}
115
+
116
+static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
117
+{
118
+    ZMQContext *zmq = ctx->priv;
119
+    zmq_msg_t msg;
120
+    int ret = 0;
121
+
122
+    if (zmq_msg_init(&msg) == -1) {
123
+        av_log(ctx, AV_LOG_WARNING,
124
+               "Could not initialize receive message: %s\n", zmq_strerror(errno));
125
+        return AVERROR_EXTERNAL;
126
+    }
127
+
128
+    if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
129
+        if (errno != EAGAIN)
130
+            av_log(ctx, AV_LOG_WARNING,
131
+                   "Could not receive message: %s\n", zmq_strerror(errno));
132
+        ret = AVERROR_EXTERNAL;
133
+        goto end;
134
+    }
135
+
136
+    *buf_size = zmq_msg_size(&msg) + 1;
137
+    *buf = av_malloc(*buf_size);
138
+    if (!*buf) {
139
+        ret = AVERROR(ENOMEM);
140
+        goto end;
141
+    }
142
+    memcpy(*buf, zmq_msg_data(&msg), *buf_size);
143
+    (*buf)[*buf_size-1] = 0;
144
+
145
+end:
146
+    zmq_msg_close(&msg);
147
+    return ret;
148
+}
149
+
150
+static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
151
+{
152
+    AVFilterContext *ctx = inlink->dst;
153
+    ZMQContext *zmq = ctx->priv;
154
+
155
+    while (1) {
156
+        char cmd_buf[1024];
157
+        char *recv_buf, *send_buf;
158
+        int recv_buf_size;
159
+        Command cmd = {0};
160
+        int ret;
161
+
162
+        /* receive command */
163
+        if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
164
+            break;
165
+        zmq->command_count++;
166
+
167
+        /* parse command */
168
+        if (parse_command(&cmd, recv_buf, ctx) < 0) {
169
+            av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
170
+            goto end;
171
+        }
172
+
173
+        /* process command */
174
+        av_log(ctx, AV_LOG_VERBOSE,
175
+               "Processing command #%d target:%s command:%s arg:%s\n",
176
+               zmq->command_count, cmd.target, cmd.command, cmd.arg);
177
+        ret = avfilter_graph_send_command(inlink->graph,
178
+                                          cmd.target, cmd.command, cmd.arg,
179
+                                          cmd_buf, sizeof(cmd_buf),
180
+                                          AVFILTER_CMD_FLAG_ONE);
181
+        send_buf = av_asprintf("%d %s%s%s",
182
+                               -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
183
+        if (!send_buf) {
184
+            ret = AVERROR(ENOMEM);
185
+            goto end;
186
+        }
187
+        av_log(ctx, AV_LOG_VERBOSE,
188
+               "Sending command reply for command #%d:\n%s\n",
189
+               zmq->command_count, send_buf);
190
+        if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
191
+            av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
192
+                   zmq->command_count, zmq_strerror(ret));
193
+
194
+    end:
195
+        av_freep(&send_buf);
196
+        av_freep(&recv_buf);
197
+        recv_buf_size = 0;
198
+        av_freep(&cmd.target);
199
+        av_freep(&cmd.command);
200
+        av_freep(&cmd.arg);
201
+    }
202
+
203
+    return ff_filter_frame(ctx->outputs[0], ref);
204
+}
205
+
206
+#if CONFIG_ZMQ_FILTER
207
+
208
+#define zmq_options options
209
+AVFILTER_DEFINE_CLASS(zmq);
210
+
211
+static const AVFilterPad zmq_inputs[] = {
212
+    {
213
+        .name             = "default",
214
+        .type             = AVMEDIA_TYPE_VIDEO,
215
+        .filter_frame     = filter_frame,
216
+    },
217
+    { NULL }
218
+};
219
+
220
+static const AVFilterPad zmq_outputs[] = {
221
+    {
222
+        .name = "default",
223
+        .type = AVMEDIA_TYPE_VIDEO,
224
+    },
225
+    { NULL }
226
+};
227
+
228
+AVFilter avfilter_vf_zmq = {
229
+    .name        = "zmq",
230
+    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
231
+    .init        = init,
232
+    .uninit      = uninit,
233
+    .priv_size   = sizeof(ZMQContext),
234
+    .inputs      = zmq_inputs,
235
+    .outputs     = zmq_outputs,
236
+    .priv_class  = &zmq_class,
237
+};
238
+
239
+#endif
240
+
241
+#if CONFIG_AZMQ_FILTER
242
+
243
+#define azmq_options options
244
+AVFILTER_DEFINE_CLASS(azmq);
245
+
246
+static const AVFilterPad azmq_inputs[] = {
247
+    {
248
+        .name             = "default",
249
+        .type             = AVMEDIA_TYPE_AUDIO,
250
+        .get_audio_buffer = ff_null_get_audio_buffer,
251
+        .filter_frame     = filter_frame,
252
+    },
253
+    { NULL }
254
+};
255
+
256
+static const AVFilterPad azmq_outputs[] = {
257
+    {
258
+        .name = "default",
259
+        .type = AVMEDIA_TYPE_AUDIO,
260
+    },
261
+    { NULL }
262
+};
263
+
264
+AVFilter avfilter_af_azmq = {
265
+    .name        = "azmq",
266
+    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
267
+    .init        = init,
268
+    .uninit      = uninit,
269
+    .priv_size   = sizeof(ZMQContext),
270
+    .inputs      = azmq_inputs,
271
+    .outputs     = azmq_outputs,
272
+    .priv_class  = &azmq_class,
273
+};
274
+
275
+#endif
... ...
@@ -29,7 +29,7 @@
29 29
 #include "libavutil/avutil.h"
30 30
 
31 31
 #define LIBAVFILTER_VERSION_MAJOR  3
32
-#define LIBAVFILTER_VERSION_MINOR  65
32
+#define LIBAVFILTER_VERSION_MINOR  66
33 33
 #define LIBAVFILTER_VERSION_MICRO 100
34 34
 
35 35
 #define LIBAVFILTER_VERSION_INT AV_VERSION_INT(LIBAVFILTER_VERSION_MAJOR, \