first batch of license fixes (boring)
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback-gst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file conversation/gnunet-helper-audio-playback-gst.c
17  * @brief program to playback audio data to the speaker (GStreamer version)
18  * @author LRN
19  */
20 #include "platform.h"
21 #include "gnunet_util_lib.h"
22 #include "gnunet_protocols.h"
23 #include "conversation.h"
24 #include "gnunet_constants.h"
25 #include "gnunet_core_service.h"
26
27 #include <gst/gst.h>
28 #include <gst/audio/gstaudiobasesrc.h>
29 #include <gst/app/gstappsrc.h>
30 #include <glib.h>
31
32 #define DEBUG_READ_PURE_OGG 1
33
34 /**
35  * How much data to read in one go
36  */
37 #define MAXLINE 4096
38
39 /**
40  * Max number of microseconds to buffer in audiosink.
41  * Default is 1000
42  */
43 #define BUFFER_TIME 1000
44
45 /**
46  * Min number of microseconds to buffer in audiosink.
47  * Default is 1000
48  */
49 #define LATENCY_TIME 1000
50
51 /**
52  * Tokenizer for the data we get from stdin
53  */
54 struct GNUNET_MessageStreamTokenizer *stdin_mst;
55
56 /**
57  * Main pipeline.
58  */
59 static GstElement *pipeline;
60
61 /**
62  * Appsrc instance into which we write data for the pipeline.
63  */
64 static GstElement *source;
65
66 static GstElement *demuxer;
67 static GstElement *decoder;
68 static GstElement *conv;
69 static GstElement *resampler;
70 static GstElement *sink;
71
72 /**
73  * Set to 1 to break the reading loop
74  */
75 static int abort_read;
76
77
78 static void
79 sink_child_added (GstChildProxy *child_proxy,
80                   GObject *object,
81                   gchar *name,
82                   gpointer user_data)
83 {
84   if (GST_IS_AUDIO_BASE_SRC (object))
85     g_object_set (object,
86                   "buffer-time", (gint64) BUFFER_TIME,
87                   "latency-time", (gint64) LATENCY_TIME,
88                   NULL);
89 }
90
91
92 static void
93 ogg_pad_added (GstElement *element,
94                GstPad *pad,
95                gpointer data)
96 {
97   GstPad *sinkpad;
98   GstElement *decoder = (GstElement *) data;
99
100   /* We can now link this pad with the opus-decoder sink pad */
101   sinkpad = gst_element_get_static_pad (decoder, "sink");
102
103   gst_pad_link (pad, sinkpad);
104
105   gst_element_link_many (decoder, conv, resampler, sink, NULL);
106
107   gst_object_unref (sinkpad);
108 }
109
110
111 static void
112 quit ()
113 {
114   if (NULL != source)
115     gst_app_src_end_of_stream (GST_APP_SRC (source));
116   if (NULL != pipeline)
117     gst_element_set_state (pipeline, GST_STATE_NULL);
118   abort_read = 1;
119 }
120
121
122 static gboolean
123 bus_call (GstBus *bus, GstMessage *msg, gpointer data)
124 {
125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
126               "Bus message\n");
127   switch (GST_MESSAGE_TYPE (msg))
128   {
129   case GST_MESSAGE_EOS:
130     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
131                 "End of stream\n");
132     quit ();
133     break;
134
135   case GST_MESSAGE_ERROR:
136     {
137       gchar  *debug;
138       GError *error;
139
140       gst_message_parse_error (msg, &error, &debug);
141       g_free (debug);
142
143       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
144                   "Error: %s\n",
145                   error->message);
146       g_error_free (error);
147
148       quit ();
149       break;
150     }
151   default:
152     break;
153   }
154
155   return TRUE;
156 }
157
158
159 static void
160 signalhandler (int s)
161 {
162   quit ();
163 }
164
165
166 static int
167 feed_buffer_to_gst (const char *audio, size_t b_len)
168 {
169   GstBuffer *b;
170   gchar *bufspace;
171   GstFlowReturn flow;
172
173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
174               "Feeding %u bytes to GStreamer\n",
175               (unsigned int) b_len);
176
177   bufspace = g_memdup (audio, b_len);
178   b = gst_buffer_new_wrapped (bufspace, b_len);
179   if (NULL == b)
180   {
181     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
182                 "Failed to wrap a buffer\n");
183     g_free (bufspace);
184     return GNUNET_SYSERR;
185   }
186   flow = gst_app_src_push_buffer (GST_APP_SRC (source), b);
187   /* They all return GNUNET_OK, because currently player stops when
188    * data stops coming. This might need to be changed for the player
189    * to also stop when pipeline breaks.
190    */
191   switch (flow)
192   {
193   case GST_FLOW_OK:
194     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
195                 "Fed %u bytes to the pipeline\n",
196                 (unsigned int) b_len);
197     break;
198   case GST_FLOW_FLUSHING:
199     /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
200     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
201                 "Dropped a buffer\n");
202     break;
203   case GST_FLOW_EOS:
204     /* end of stream */
205     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
206                 "EOS\n");
207     break;
208   default:
209     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
210                 "Unexpected push result\n");
211     break;
212   }
213   return GNUNET_OK;
214 }
215
216
217 /**
218  * Message callback
219  *
220  * @param msg message we received.
221  * @return #GNUNET_OK on success,
222  *     #GNUNET_NO to stop further processing due to disconnect (no error)
223  *     #GNUNET_SYSERR to stop further processing due to error
224  */
225 static int
226 stdin_receiver (void *cls,
227                 const struct GNUNET_MessageHeader *msg)
228 {
229   struct AudioMessage *audio;
230   size_t b_len;
231
232   switch (ntohs (msg->type))
233   {
234   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
235     audio = (struct AudioMessage *) msg;
236
237     b_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
238     feed_buffer_to_gst ((const char *) &audio[1], b_len);
239     break;
240   default:
241     break;
242   }
243   return GNUNET_OK;
244 }
245
246
247 int
248 main (int argc, char **argv)
249 {
250   GstBus *bus;
251   guint bus_watch_id;
252   uint64_t toff;
253
254   typedef void (*SignalHandlerPointer) (int);
255
256   SignalHandlerPointer inthandler, termhandler;
257 #ifdef DEBUG_READ_PURE_OGG
258   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
259 #endif
260
261   inthandler = signal (SIGINT,
262                        &signalhandler);
263   termhandler = signal (SIGTERM,
264                         &signalhandler);
265
266 #ifdef WINDOWS
267   setmode (0, _O_BINARY);
268 #endif
269
270   /* Initialisation */
271   gst_init (&argc, &argv);
272
273   GNUNET_assert (GNUNET_OK ==
274                  GNUNET_log_setup ("gnunet-helper-audio-playback-gst",
275                                    "WARNING",
276                                    NULL));
277
278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
279               "Audio sink starts\n");
280
281   stdin_mst = GNUNET_MST_create (&stdin_receiver,
282                                  NULL);
283
284   /* Create gstreamer elements */
285   pipeline = gst_pipeline_new ("audio-player");
286   source   = gst_element_factory_make ("appsrc",        "audio-input");
287   demuxer  = gst_element_factory_make ("oggdemux",      "ogg-demuxer");
288   decoder  = gst_element_factory_make ("opusdec",       "opus-decoder");
289   conv     = gst_element_factory_make ("audioconvert",  "converter");
290   resampler= gst_element_factory_make ("audioresample", "resampler");
291   sink     = gst_element_factory_make ("autoaudiosink", "audiosink");
292
293   if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
294   {
295     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
296                 "One element could not be created. Exiting.\n");
297     return -1;
298   }
299
300   g_signal_connect (sink,
301                     "child-added",
302                     G_CALLBACK (sink_child_added),
303                     NULL);
304   g_signal_connect (demuxer,
305                     "pad-added",
306                     G_CALLBACK (ogg_pad_added),
307                     decoder);
308
309   /* Keep a reference to it, we operate on it */
310   gst_object_ref (GST_OBJECT (source));
311
312   /* Set up the pipeline */
313
314   /* we feed appsrc as fast as possible, it just blocks when it's full */
315   g_object_set (G_OBJECT (source),
316 /*      "format", GST_FORMAT_TIME,*/
317       "block", TRUE,
318       "is-live", TRUE,
319       NULL);
320
321   g_object_set (G_OBJECT (decoder),
322 /*      "plc", FALSE,*/
323 /*      "apply-gain", TRUE,*/
324       "use-inband-fec", TRUE,
325       NULL);
326
327   /* we add a message handler */
328   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
329   bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline);
330   gst_object_unref (bus);
331
332   /* we add all elements into the pipeline */
333   /* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
334   gst_bin_add_many (GST_BIN (pipeline), source, demuxer, decoder, conv,
335       resampler, sink, NULL);
336
337   /* we link the elements together */
338   gst_element_link_many (source, demuxer, NULL);
339
340   /* Set the pipeline to "playing" state*/
341   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n");
342   gst_element_set_state (pipeline, GST_STATE_PLAYING);
343
344   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running...\n");
345   /* Iterate */
346   toff = 0;
347   while (!abort_read)
348   {
349     char readbuf[MAXLINE];
350     int ret;
351
352     ret = read (0, readbuf, sizeof (readbuf));
353     if (0 > ret)
354     {
355       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
356                   _("Read error from STDIN: %d %s\n"),
357                   ret, strerror (errno));
358       break;
359     }
360     toff += ret;
361     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362                 "Received %d bytes of audio data (total: %llu)\n",
363                 (int) ret,
364                 (unsigned long long) toff);
365     if (0 == ret)
366       break;
367 #ifdef DEBUG_READ_PURE_OGG
368     if (read_pure_ogg)
369     {
370       feed_buffer_to_gst (readbuf, ret);
371     }
372     else
373 #endif
374     GNUNET_MST_from_buffer (stdin_mst,
375                             readbuf,
376                             ret,
377                             GNUNET_NO,
378                             GNUNET_NO);
379   }
380   GNUNET_MST_destroy (stdin_mst);
381
382   signal (SIGINT, inthandler);
383   signal (SIGINT, termhandler);
384
385   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
386               "Returned, stopping playback\n");
387   quit ();
388
389   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
390               "Deleting pipeline\n");
391   gst_object_unref (GST_OBJECT (source));
392   source = NULL;
393   gst_object_unref (GST_OBJECT (pipeline));
394   pipeline = NULL;
395   g_source_remove (bus_watch_id);
396
397   return 0;
398 }