ignore in lint/
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback-gst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file conversation/gnunet-helper-audio-playback-gst.c
22  * @brief program to playback audio data to the speaker (GStreamer version)
23  * @author LRN
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_protocols.h"
28 #include "conversation.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_core_service.h"
31
32 #include <gst/gst.h>
33 #include <gst/audio/gstaudiobasesrc.h>
34 #include <gst/app/gstappsrc.h>
35 #include <glib.h>
36
37 #define DEBUG_READ_PURE_OGG 1
38
39 /**
40  * How much data to read in one go
41  */
42 #define MAXLINE 4096
43
44 /**
45  * Max number of microseconds to buffer in audiosink.
46  * Default is 1000
47  */
48 #define BUFFER_TIME 1000
49
50 /**
51  * Min number of microseconds to buffer in audiosink.
52  * Default is 1000
53  */
54 #define LATENCY_TIME 1000
55
56 /**
57  * Tokenizer for the data we get from stdin
58  */
59 struct GNUNET_MessageStreamTokenizer *stdin_mst;
60
61 /**
62  * Main pipeline.
63  */
64 static GstElement *pipeline;
65
66 /**
67  * Appsrc instance into which we write data for the pipeline.
68  */
69 static GstElement *source;
70
71 static GstElement *demuxer;
72 static GstElement *decoder;
73 static GstElement *conv;
74 static GstElement *resampler;
75 static GstElement *sink;
76
77 /**
78  * Set to 1 to break the reading loop
79  */
80 static int abort_read;
81
82
83 static void
84 sink_child_added (GstChildProxy *child_proxy,
85                   GObject *object,
86                   gchar *name,
87                   gpointer user_data)
88 {
89   if (GST_IS_AUDIO_BASE_SRC (object))
90     g_object_set (object,
91                   "buffer-time", (gint64) BUFFER_TIME,
92                   "latency-time", (gint64) LATENCY_TIME,
93                   NULL);
94 }
95
96
97 static void
98 ogg_pad_added (GstElement *element,
99                GstPad *pad,
100                gpointer data)
101 {
102   GstPad *sinkpad;
103   GstElement *decoder = (GstElement *) data;
104
105   /* We can now link this pad with the opus-decoder sink pad */
106   sinkpad = gst_element_get_static_pad (decoder, "sink");
107
108   gst_pad_link (pad, sinkpad);
109
110   gst_element_link_many (decoder, conv, resampler, sink, NULL);
111
112   gst_object_unref (sinkpad);
113 }
114
115
116 static void
117 quit ()
118 {
119   if (NULL != source)
120     gst_app_src_end_of_stream (GST_APP_SRC (source));
121   if (NULL != pipeline)
122     gst_element_set_state (pipeline, GST_STATE_NULL);
123   abort_read = 1;
124 }
125
126
127 static gboolean
128 bus_call (GstBus *bus, GstMessage *msg, gpointer data)
129 {
130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
131               "Bus message\n");
132   switch (GST_MESSAGE_TYPE (msg))
133   {
134   case GST_MESSAGE_EOS:
135     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
136                 "End of stream\n");
137     quit ();
138     break;
139
140   case GST_MESSAGE_ERROR:
141     {
142       gchar  *debug;
143       GError *error;
144
145       gst_message_parse_error (msg, &error, &debug);
146       g_free (debug);
147
148       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
149                   "Error: %s\n",
150                   error->message);
151       g_error_free (error);
152
153       quit ();
154       break;
155     }
156   default:
157     break;
158   }
159
160   return TRUE;
161 }
162
163
164 static void
165 signalhandler (int s)
166 {
167   quit ();
168 }
169
170
171 static int
172 feed_buffer_to_gst (const char *audio, size_t b_len)
173 {
174   GstBuffer *b;
175   gchar *bufspace;
176   GstFlowReturn flow;
177
178   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
179               "Feeding %u bytes to GStreamer\n",
180               (unsigned int) b_len);
181
182   bufspace = g_memdup (audio, b_len);
183   b = gst_buffer_new_wrapped (bufspace, b_len);
184   if (NULL == b)
185   {
186     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
187                 "Failed to wrap a buffer\n");
188     g_free (bufspace);
189     return GNUNET_SYSERR;
190   }
191   flow = gst_app_src_push_buffer (GST_APP_SRC (source), b);
192   /* They all return GNUNET_OK, because currently player stops when
193    * data stops coming. This might need to be changed for the player
194    * to also stop when pipeline breaks.
195    */
196   switch (flow)
197   {
198   case GST_FLOW_OK:
199     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200                 "Fed %u bytes to the pipeline\n",
201                 (unsigned int) b_len);
202     break;
203   case GST_FLOW_FLUSHING:
204     /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
205     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
206                 "Dropped a buffer\n");
207     break;
208   case GST_FLOW_EOS:
209     /* end of stream */
210     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
211                 "EOS\n");
212     break;
213   default:
214     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
215                 "Unexpected push result\n");
216     break;
217   }
218   return GNUNET_OK;
219 }
220
221
222 /**
223  * Message callback
224  *
225  * @param msg message we received.
226  * @return #GNUNET_OK on success,
227  *     #GNUNET_NO to stop further processing due to disconnect (no error)
228  *     #GNUNET_SYSERR to stop further processing due to error
229  */
230 static int
231 stdin_receiver (void *cls,
232                 const struct GNUNET_MessageHeader *msg)
233 {
234   struct AudioMessage *audio;
235   size_t b_len;
236
237   switch (ntohs (msg->type))
238   {
239   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
240     audio = (struct AudioMessage *) msg;
241
242     b_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
243     feed_buffer_to_gst ((const char *) &audio[1], b_len);
244     break;
245   default:
246     break;
247   }
248   return GNUNET_OK;
249 }
250
251
252 int
253 main (int argc, char **argv)
254 {
255   GstBus *bus;
256   guint bus_watch_id;
257   uint64_t toff;
258
259   typedef void (*SignalHandlerPointer) (int);
260
261   SignalHandlerPointer inthandler, termhandler;
262 #ifdef DEBUG_READ_PURE_OGG
263   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
264 #endif
265
266   inthandler = signal (SIGINT,
267                        &signalhandler);
268   termhandler = signal (SIGTERM,
269                         &signalhandler);
270
271 #ifdef WINDOWS
272   setmode (0, _O_BINARY);
273 #endif
274
275   /* Initialisation */
276   gst_init (&argc, &argv);
277
278   GNUNET_assert (GNUNET_OK ==
279                  GNUNET_log_setup ("gnunet-helper-audio-playback-gst",
280                                    "WARNING",
281                                    NULL));
282
283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284               "Audio sink starts\n");
285
286   stdin_mst = GNUNET_MST_create (&stdin_receiver,
287                                  NULL);
288
289   /* Create gstreamer elements */
290   pipeline = gst_pipeline_new ("audio-player");
291   source   = gst_element_factory_make ("appsrc",        "audio-input");
292   demuxer  = gst_element_factory_make ("oggdemux",      "ogg-demuxer");
293   decoder  = gst_element_factory_make ("opusdec",       "opus-decoder");
294   conv     = gst_element_factory_make ("audioconvert",  "converter");
295   resampler= gst_element_factory_make ("audioresample", "resampler");
296   sink     = gst_element_factory_make ("autoaudiosink", "audiosink");
297
298   if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
299   {
300     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
301                 "One element could not be created. Exiting.\n");
302     return -1;
303   }
304
305   g_signal_connect (sink,
306                     "child-added",
307                     G_CALLBACK (sink_child_added),
308                     NULL);
309   g_signal_connect (demuxer,
310                     "pad-added",
311                     G_CALLBACK (ogg_pad_added),
312                     decoder);
313
314   /* Keep a reference to it, we operate on it */
315   gst_object_ref (GST_OBJECT (source));
316
317   /* Set up the pipeline */
318
319   /* we feed appsrc as fast as possible, it just blocks when it's full */
320   g_object_set (G_OBJECT (source),
321 /*      "format", GST_FORMAT_TIME,*/
322       "block", TRUE,
323       "is-live", TRUE,
324       NULL);
325
326   g_object_set (G_OBJECT (decoder),
327 /*      "plc", FALSE,*/
328 /*      "apply-gain", TRUE,*/
329       "use-inband-fec", TRUE,
330       NULL);
331
332   /* we add a message handler */
333   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
334   bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline);
335   gst_object_unref (bus);
336
337   /* we add all elements into the pipeline */
338   /* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
339   gst_bin_add_many (GST_BIN (pipeline), source, demuxer, decoder, conv,
340       resampler, sink, NULL);
341
342   /* we link the elements together */
343   gst_element_link_many (source, demuxer, NULL);
344
345   /* Set the pipeline to "playing" state*/
346   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n");
347   gst_element_set_state (pipeline, GST_STATE_PLAYING);
348
349   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running...\n");
350   /* Iterate */
351   toff = 0;
352   while (!abort_read)
353   {
354     char readbuf[MAXLINE];
355     int ret;
356
357     ret = read (0, readbuf, sizeof (readbuf));
358     if (0 > ret)
359     {
360       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
361                   _("Read error from STDIN: %d %s\n"),
362                   ret, strerror (errno));
363       break;
364     }
365     toff += ret;
366     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367                 "Received %d bytes of audio data (total: %llu)\n",
368                 (int) ret,
369                 (unsigned long long) toff);
370     if (0 == ret)
371       break;
372 #ifdef DEBUG_READ_PURE_OGG
373     if (read_pure_ogg)
374     {
375       feed_buffer_to_gst (readbuf, ret);
376     }
377     else
378 #endif
379     GNUNET_MST_from_buffer (stdin_mst,
380                             readbuf,
381                             ret,
382                             GNUNET_NO,
383                             GNUNET_NO);
384   }
385   GNUNET_MST_destroy (stdin_mst);
386
387   signal (SIGINT, inthandler);
388   signal (SIGINT, termhandler);
389
390   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
391               "Returned, stopping playback\n");
392   quit ();
393
394   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
395               "Deleting pipeline\n");
396   gst_object_unref (GST_OBJECT (source));
397   source = NULL;
398   gst_object_unref (GST_OBJECT (pipeline));
399   pipeline = NULL;
400   g_source_remove (bus_watch_id);
401
402   return 0;
403 }