uncrustify as demanded.
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback-gst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file conversation/gnunet-helper-audio-playback-gst.c
22  * @brief program to playback audio data to the speaker (GStreamer version)
23  * @author LRN
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_protocols.h"
28 #include "conversation.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_core_service.h"
31
32 #include <gst/gst.h>
33 #include <gst/audio/gstaudiobasesrc.h>
34 #include <gst/app/gstappsrc.h>
35 #include <glib.h>
36
37 #define DEBUG_READ_PURE_OGG 1
38
39 /**
40  * How much data to read in one go
41  */
42 #define MAXLINE 4096
43
44 /**
45  * Max number of microseconds to buffer in audiosink.
46  * Default is 1000
47  */
48 #define BUFFER_TIME 1000
49
50 /**
51  * Min number of microseconds to buffer in audiosink.
52  * Default is 1000
53  */
54 #define LATENCY_TIME 1000
55
56 /**
57  * Tokenizer for the data we get from stdin
58  */
59 struct GNUNET_MessageStreamTokenizer *stdin_mst;
60
61 /**
62  * Main pipeline.
63  */
64 static GstElement *pipeline;
65
66 /**
67  * Appsrc instance into which we write data for the pipeline.
68  */
69 static GstElement *source;
70
71 static GstElement *demuxer;
72 static GstElement *decoder;
73 static GstElement *conv;
74 static GstElement *resampler;
75 static GstElement *sink;
76
77 /**
78  * Set to 1 to break the reading loop
79  */
80 static int abort_read;
81
82
83 static void
84 sink_child_added(GstChildProxy *child_proxy,
85                  GObject *object,
86                  gchar *name,
87                  gpointer user_data)
88 {
89   if (GST_IS_AUDIO_BASE_SRC(object))
90     g_object_set(object,
91                  "buffer-time", (gint64)BUFFER_TIME,
92                  "latency-time", (gint64)LATENCY_TIME,
93                  NULL);
94 }
95
96
97 static void
98 ogg_pad_added(GstElement *element,
99               GstPad *pad,
100               gpointer data)
101 {
102   GstPad *sinkpad;
103   GstElement *decoder = (GstElement *)data;
104
105   /* We can now link this pad with the opus-decoder sink pad */
106   sinkpad = gst_element_get_static_pad(decoder, "sink");
107
108   gst_pad_link(pad, sinkpad);
109
110   gst_element_link_many(decoder, conv, resampler, sink, NULL);
111
112   gst_object_unref(sinkpad);
113 }
114
115
116 static void
117 quit()
118 {
119   if (NULL != source)
120     gst_app_src_end_of_stream(GST_APP_SRC(source));
121   if (NULL != pipeline)
122     gst_element_set_state(pipeline, GST_STATE_NULL);
123   abort_read = 1;
124 }
125
126
127 static gboolean
128 bus_call(GstBus *bus, GstMessage *msg, gpointer data)
129 {
130   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
131              "Bus message\n");
132   switch (GST_MESSAGE_TYPE(msg))
133     {
134     case GST_MESSAGE_EOS:
135       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
136                  "End of stream\n");
137       quit();
138       break;
139
140     case GST_MESSAGE_ERROR:
141     {
142       gchar  *debug;
143       GError *error;
144
145       gst_message_parse_error(msg, &error, &debug);
146       g_free(debug);
147
148       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
149                  "Error: %s\n",
150                  error->message);
151       g_error_free(error);
152
153       quit();
154       break;
155     }
156
157     default:
158       break;
159     }
160
161   return TRUE;
162 }
163
164
165 static void
166 signalhandler(int s)
167 {
168   quit();
169 }
170
171
172 static int
173 feed_buffer_to_gst(const char *audio, size_t b_len)
174 {
175   GstBuffer *b;
176   gchar *bufspace;
177   GstFlowReturn flow;
178
179   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
180              "Feeding %u bytes to GStreamer\n",
181              (unsigned int)b_len);
182
183   bufspace = g_memdup(audio, b_len);
184   b = gst_buffer_new_wrapped(bufspace, b_len);
185   if (NULL == b)
186     {
187       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
188                  "Failed to wrap a buffer\n");
189       g_free(bufspace);
190       return GNUNET_SYSERR;
191     }
192   flow = gst_app_src_push_buffer(GST_APP_SRC(source), b);
193   /* They all return GNUNET_OK, because currently player stops when
194    * data stops coming. This might need to be changed for the player
195    * to also stop when pipeline breaks.
196    */
197   switch (flow)
198     {
199     case GST_FLOW_OK:
200       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
201                  "Fed %u bytes to the pipeline\n",
202                  (unsigned int)b_len);
203       break;
204
205     case GST_FLOW_FLUSHING:
206       /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
207       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
208                  "Dropped a buffer\n");
209       break;
210
211     case GST_FLOW_EOS:
212       /* end of stream */
213       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
214                  "EOS\n");
215       break;
216
217     default:
218       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
219                  "Unexpected push result\n");
220       break;
221     }
222   return GNUNET_OK;
223 }
224
225
226 /**
227  * Message callback
228  *
229  * @param msg message we received.
230  * @return #GNUNET_OK on success,
231  *     #GNUNET_NO to stop further processing due to disconnect (no error)
232  *     #GNUNET_SYSERR to stop further processing due to error
233  */
234 static int
235 stdin_receiver(void *cls,
236                const struct GNUNET_MessageHeader *msg)
237 {
238   struct AudioMessage *audio;
239   size_t b_len;
240
241   switch (ntohs(msg->type))
242     {
243     case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
244       audio = (struct AudioMessage *)msg;
245
246       b_len = ntohs(audio->header.size) - sizeof(struct AudioMessage);
247       feed_buffer_to_gst((const char *)&audio[1], b_len);
248       break;
249
250     default:
251       break;
252     }
253   return GNUNET_OK;
254 }
255
256
257 int
258 main(int argc, char **argv)
259 {
260   GstBus *bus;
261   guint bus_watch_id;
262   uint64_t toff;
263
264   typedef void (*SignalHandlerPointer) (int);
265
266   SignalHandlerPointer inthandler, termhandler;
267 #ifdef DEBUG_READ_PURE_OGG
268   int read_pure_ogg = getenv("GNUNET_READ_PURE_OGG") ? 1 : 0;
269 #endif
270
271   inthandler = signal(SIGINT,
272                       &signalhandler);
273   termhandler = signal(SIGTERM,
274                        &signalhandler);
275
276 #ifdef WINDOWS
277   setmode(0, _O_BINARY);
278 #endif
279
280   /* Initialisation */
281   gst_init(&argc, &argv);
282
283   GNUNET_assert(GNUNET_OK ==
284                 GNUNET_log_setup("gnunet-helper-audio-playback-gst",
285                                  "WARNING",
286                                  NULL));
287
288   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
289              "Audio sink starts\n");
290
291   stdin_mst = GNUNET_MST_create(&stdin_receiver,
292                                 NULL);
293
294   /* Create gstreamer elements */
295   pipeline = gst_pipeline_new("audio-player");
296   source = gst_element_factory_make("appsrc", "audio-input");
297   demuxer = gst_element_factory_make("oggdemux", "ogg-demuxer");
298   decoder = gst_element_factory_make("opusdec", "opus-decoder");
299   conv = gst_element_factory_make("audioconvert", "converter");
300   resampler = gst_element_factory_make("audioresample", "resampler");
301   sink = gst_element_factory_make("autoaudiosink", "audiosink");
302
303   if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
304     {
305       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
306                  "One element could not be created. Exiting.\n");
307       return -1;
308     }
309
310   g_signal_connect(sink,
311                    "child-added",
312                    G_CALLBACK(sink_child_added),
313                    NULL);
314   g_signal_connect(demuxer,
315                    "pad-added",
316                    G_CALLBACK(ogg_pad_added),
317                    decoder);
318
319   /* Keep a reference to it, we operate on it */
320   gst_object_ref(GST_OBJECT(source));
321
322   /* Set up the pipeline */
323
324   /* we feed appsrc as fast as possible, it just blocks when it's full */
325   g_object_set(G_OBJECT(source),
326 /*      "format", GST_FORMAT_TIME,*/
327                "block", TRUE,
328                "is-live", TRUE,
329                NULL);
330
331   g_object_set(G_OBJECT(decoder),
332 /*      "plc", FALSE,*/
333 /*      "apply-gain", TRUE,*/
334                "use-inband-fec", TRUE,
335                NULL);
336
337   /* we add a message handler */
338   bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
339   bus_watch_id = gst_bus_add_watch(bus, bus_call, pipeline);
340   gst_object_unref(bus);
341
342   /* we add all elements into the pipeline */
343   /* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
344   gst_bin_add_many(GST_BIN(pipeline), source, demuxer, decoder, conv,
345                    resampler, sink, NULL);
346
347   /* we link the elements together */
348   gst_element_link_many(source, demuxer, NULL);
349
350   /* Set the pipeline to "playing" state*/
351   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Now playing\n");
352   gst_element_set_state(pipeline, GST_STATE_PLAYING);
353
354   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Running...\n");
355   /* Iterate */
356   toff = 0;
357   while (!abort_read)
358     {
359       char readbuf[MAXLINE];
360       int ret;
361
362       ret = read(0, readbuf, sizeof(readbuf));
363       if (0 > ret)
364         {
365           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
366                      _("Read error from STDIN: %d %s\n"),
367                      ret, strerror(errno));
368           break;
369         }
370       toff += ret;
371       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
372                  "Received %d bytes of audio data (total: %llu)\n",
373                  (int)ret,
374                  (unsigned long long)toff);
375       if (0 == ret)
376         break;
377 #ifdef DEBUG_READ_PURE_OGG
378       if (read_pure_ogg)
379         {
380           feed_buffer_to_gst(readbuf, ret);
381         }
382       else
383 #endif
384       GNUNET_MST_from_buffer(stdin_mst,
385                              readbuf,
386                              ret,
387                              GNUNET_NO,
388                              GNUNET_NO);
389     }
390   GNUNET_MST_destroy(stdin_mst);
391
392   signal(SIGINT, inthandler);
393   signal(SIGINT, termhandler);
394
395   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
396              "Returned, stopping playback\n");
397   quit();
398
399   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
400              "Deleting pipeline\n");
401   gst_object_unref(GST_OBJECT(source));
402   source = NULL;
403   gst_object_unref(GST_OBJECT(pipeline));
404   pipeline = NULL;
405   g_source_remove(bus_watch_id);
406
407   return 0;
408 }