-fixed _clear() in peers "helper"
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback-gst.c
old mode 100755 (executable)
new mode 100644 (file)
index d6d2316..4f1978a
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2013 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 /**
  * @file conversation/gnunet-helper-audio-playback-gst.c
 #include "gnunet_core_service.h"
 
 #include <gst/gst.h>
-#include <gst/app/gstappsrc.h>
 #include <gst/audio/gstaudiobasesrc.h>
+#include <gst/app/gstappsrc.h>
 #include <glib.h>
 
-#include <opus/opus.h>
-#include <opus/opus_types.h>
+#define DEBUG_READ_PURE_OGG 1
 
 /**
  * How much data to read in one go
  */
 #define MAXLINE 4096
 
-#define SAMPLING_RATE 48000
-
-#define CHANNELS 1
-
-#define FRAME_SIZE (SAMPLING_RATE / 50)
-
-#define PCM_LENGTH (FRAME_SIZE * CHANNELS * sizeof (int16_t))
-
 /**
  * Max number of microseconds to buffer in audiosink.
- * Default is 200000
+ * Default is 1000
  */
 #define BUFFER_TIME 1000
 
 /**
  * Min number of microseconds to buffer in audiosink.
- * Default is 10000
+ * Default is 1000
  */
 #define LATENCY_TIME 1000
 
@@ -77,11 +68,11 @@ static GstElement *pipeline;
  */
 static GstElement *source;
 
-/**
- * OPUS decoder
- */
-static OpusDecoder *dec;
-
+static GstElement *demuxer;
+static GstElement *decoder;
+static GstElement *conv;
+static GstElement *resampler;
+static GstElement *sink;
 
 /**
  * Set to 1 to break the reading loop
@@ -89,25 +80,39 @@ static OpusDecoder *dec;
 static int abort_read;
 
 
-/**
- * OPUS initialization
- */
 static void
-opus_init ()
+sink_child_added (GstChildProxy *child_proxy,
+                 GObject *object, 
+                 gchar *name,
+                 gpointer user_data)
 {
-  int err;
-  int channels = 1;
-
-  dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
+  if (GST_IS_AUDIO_BASE_SRC (object))
+    g_object_set (object,
+                 "buffer-time", (gint64) BUFFER_TIME, 
+                 "latency-time", (gint64) LATENCY_TIME,
+                 NULL);
 }
 
-void
-sink_child_added (GstChildProxy *child_proxy, GObject *object, gchar *name, gpointer user_data)
+
+static void
+ogg_pad_added (GstElement *element, 
+              GstPad *pad,
+              gpointer data)
 {
-  if (GST_IS_AUDIO_BASE_SRC (object))
-    g_object_set (object, "buffer-time", (gint64) BUFFER_TIME, "latency-time", (gint64) LATENCY_TIME, NULL);
+  GstPad *sinkpad;
+  GstElement *decoder = (GstElement *) data;
+
+  /* We can now link this pad with the opus-decoder sink pad */
+  sinkpad = gst_element_get_static_pad (decoder, "sink");
+
+  gst_pad_link (pad, sinkpad);
+
+  gst_element_link_many (decoder, conv, resampler, sink, NULL);
+
+  gst_object_unref (sinkpad);
 }
 
+
 static void
 quit ()
 {
@@ -118,14 +123,17 @@ quit ()
   abort_read = 1;
 }
 
+
 static gboolean
 bus_call (GstBus *bus, GstMessage *msg, gpointer data)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bus message\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+             "Bus message\n");
   switch (GST_MESSAGE_TYPE (msg))
   {
   case GST_MESSAGE_EOS:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "End of stream\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "End of stream\n");
     quit ();
     break;
 
@@ -137,7 +145,9 @@ bus_call (GstBus *bus, GstMessage *msg, gpointer data)
       gst_message_parse_error (msg, &error, &debug);
       g_free (debug);
       
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error: %s\n", error->message);
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
+                 "Error: %s\n", 
+                 error->message);
       g_error_free (error);
       
       quit ();
@@ -158,6 +168,57 @@ signalhandler (int s)
 }
 
 
+static int
+feed_buffer_to_gst (const char *audio, size_t b_len)
+{
+  GstBuffer *b;
+  gchar *bufspace;
+  GstFlowReturn flow;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Feeding %u bytes to GStreamer\n",
+             (unsigned int) b_len);
+
+  bufspace = g_memdup (audio, b_len);
+  b = gst_buffer_new_wrapped (bufspace, b_len);
+  if (NULL == b)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Failed to wrap a buffer\n");
+    g_free (bufspace);
+    return GNUNET_SYSERR;
+  }
+  flow = gst_app_src_push_buffer (GST_APP_SRC (source), b);
+  /* They all return GNUNET_OK, because currently player stops when
+   * data stops coming. This might need to be changed for the player
+   * to also stop when pipeline breaks.
+   */
+  switch (flow)
+  {
+  case GST_FLOW_OK:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+               "Fed %u bytes to the pipeline\n",
+               (unsigned int) b_len);
+    break;
+  case GST_FLOW_FLUSHING:
+    /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Dropped a buffer\n");
+    break;
+  case GST_FLOW_EOS:
+    /* end of stream */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+               "EOS\n");
+    break;
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Unexpected push result\n");
+    break;
+  }
+  return GNUNET_OK;
+}
+
+
 /**
  * Message callback
  */
@@ -167,68 +228,15 @@ stdin_receiver (void *cls,
                const struct GNUNET_MessageHeader *msg)
 {
   struct AudioMessage *audio;
-  GstBuffer *b;
-  int16_t *bufspace;
-  GstFlowReturn flow;
-  int ret;
+  size_t b_len;
 
   switch (ntohs (msg->type))
   {
   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
     audio = (struct AudioMessage *) msg;
 
-    bufspace = (int16_t *) g_malloc (PCM_LENGTH);
-
-    ret = opus_decode (dec,
-                      (const unsigned char *) &audio[1],
-                      ntohs (audio->header.size) - sizeof (struct AudioMessage),
-                      bufspace,
-                      FRAME_SIZE, 0);
-    if (ret < 0)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 "Opus decoding failed: %d\n",
-                 ret);
-      g_free (bufspace);
-      return GNUNET_OK;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Decoded frame with %u bytes\n",
-               ntohs (audio->header.size));
-
-    b = gst_buffer_new_wrapped (bufspace, ret * sizeof (int16_t));
-    if (NULL == b)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to wrap a buffer\n");
-      g_free (bufspace);
-      return GNUNET_SYSERR;
-    }
-
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pushing...\n");
-    flow = gst_app_src_push_buffer (GST_APP_SRC (source), b);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pushed!\n");
-    /* They all return GNUNET_OK, because currently player stops when
-     * data stops coming. This might need to be changed for the player
-     * to also stop when pipeline breaks.
-     */
-    switch (flow)
-    {
-    case GST_FLOW_OK:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fed %u bytes to the pipeline\n",
-          (unsigned int) ret * sizeof (int16_t));
-      break;
-    case GST_FLOW_FLUSHING:
-      /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Dropped a buffer\n");
-      break;
-    case GST_FLOW_EOS:
-      /* end of stream */
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "EOS\n");
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unexpected push result\n");
-      break;
-    }
+    b_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
+    feed_buffer_to_gst ((const char *) &audio[1], b_len);
     break;
   default:
     break;
@@ -240,62 +248,64 @@ stdin_receiver (void *cls,
 int
 main (int argc, char **argv)
 {
-  GstElement *conv, *resampler, *sink;
   GstBus *bus;
-  GstCaps *caps;
   guint bus_watch_id;
   uint64_t toff;
 
   typedef void (*SignalHandlerPointer) (int);
  
   SignalHandlerPointer inthandler, termhandler;
+#ifdef DEBUG_READ_PURE_OGG
+  int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
+#endif
 
-  inthandler = signal (SIGINT, signalhandler);
-  termhandler = signal (SIGTERM, signalhandler);
-
+  inthandler = signal (SIGINT, 
+                      &signalhandler);
+  termhandler = signal (SIGTERM, 
+                       &signalhandler);
+  
 #ifdef WINDOWS
   setmode (0, _O_BINARY);
 #endif
-
-  opus_init ();
-
+  
   /* Initialisation */
   gst_init (&argc, &argv);
 
   GNUNET_assert (GNUNET_OK ==
-                GNUNET_log_setup ("gnunet-helper-audio-playback",
+                GNUNET_log_setup ("gnunet-helper-audio-playback-gst",
                                   "WARNING",
                                   NULL));
-
+  
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Audio sink starts\n");
-
-  stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
-
+  
+  stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, 
+                                       NULL);
+  
   /* Create gstreamer elements */
   pipeline = gst_pipeline_new ("audio-player");
   source   = gst_element_factory_make ("appsrc",        "audio-input");
+  demuxer  = gst_element_factory_make ("oggdemux",      "ogg-demuxer");
+  decoder  = gst_element_factory_make ("opusdec",       "opus-decoder");
   conv     = gst_element_factory_make ("audioconvert",  "converter");
   resampler= gst_element_factory_make ("audioresample", "resampler");
   sink     = gst_element_factory_make ("autoaudiosink", "audiosink");
 
-  if (!pipeline || !source || !conv || !resampler || !sink)
+  if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-        "One element could not be created. Exiting.\n");
+               "One element could not be created. Exiting.\n");
     return -1;
   }
 
-  g_signal_connect (sink, "child-added", G_CALLBACK (sink_child_added), NULL);
-
-  caps = gst_caps_new_simple ("audio/x-raw",
-    "format", G_TYPE_STRING, "S16LE",
-    "rate", G_TYPE_INT, SAMPLING_RATE,
-    "channels", G_TYPE_INT, CHANNELS,
-    "layout", G_TYPE_STRING, "interleaved",
-     NULL);
-  gst_app_src_set_caps (GST_APP_SRC (source), caps);
-  gst_caps_unref (caps);
+  g_signal_connect (sink, 
+                   "child-added",
+                   G_CALLBACK (sink_child_added), 
+                   NULL);
+  g_signal_connect (demuxer, 
+                   "pad-added",
+                   G_CALLBACK (ogg_pad_added), 
+                   decoder);
 
   /* Keep a reference to it, we operate on it */
   gst_object_ref (GST_OBJECT (source));
@@ -304,23 +314,29 @@ main (int argc, char **argv)
 
   /* we feed appsrc as fast as possible, it just blocks when it's full */
   g_object_set (G_OBJECT (source),
-      "format", GST_FORMAT_TIME,
+/*      "format", GST_FORMAT_TIME,*/
       "block", TRUE,
       "is-live", TRUE,
       NULL);
 
+  g_object_set (G_OBJECT (decoder),
+/*      "plc", FALSE,*/
+/*      "apply-gain", TRUE,*/
+      "use-inband-fec", TRUE,
+      NULL);
+
   /* we add a message handler */
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline);
   gst_object_unref (bus);
 
   /* we add all elements into the pipeline */
-  /* audio-input | converter | resampler | audiosink */
-  gst_bin_add_many (GST_BIN (pipeline), source, conv,
+  /* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
+  gst_bin_add_many (GST_BIN (pipeline), source, demuxer, decoder, conv,
       resampler, sink, NULL);
 
   /* we link the elements together */
-  gst_element_link_many (source, conv, resampler, sink, NULL);
+  gst_element_link_many (source, demuxer, NULL);
 
   /* Set the pipeline to "playing" state*/
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n");
@@ -349,6 +365,13 @@ main (int argc, char **argv)
                toff);
     if (0 == ret)
       break;
+#ifdef DEBUG_READ_PURE_OGG
+    if (read_pure_ogg)
+    {
+      feed_buffer_to_gst (readbuf, ret);
+    }
+    else
+#endif
     GNUNET_SERVER_mst_receive (stdin_mst, NULL,
                               readbuf, ret,
                               GNUNET_NO, GNUNET_NO);
@@ -358,10 +381,12 @@ main (int argc, char **argv)
   signal (SIGINT, inthandler);
   signal (SIGINT, termhandler);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Returned, stopping playback\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+             "Returned, stopping playback\n");
   quit ();
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Deleting pipeline\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+             "Deleting pipeline\n");
   gst_object_unref (GST_OBJECT (source));
   source = NULL;
   gst_object_unref (GST_OBJECT (pipeline));