Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file conversation/gnunet-helper-audio-playback.c
22  * @brief program to playback audio data to the speaker
23  * @author Siomon Dieterle
24  * @author Andreas Fuchs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
41 #include <ogg/ogg.h>
42
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
45
46 #define MAXLINE 4096
47
48 #define SAMPLING_RATE 48000
49
50 #define CHANNELS 1
51
52 /* 120ms at 48000 */
53 #define MAX_FRAME_SIZE (960 * 6)
54
55 /**
56  * Pulseaudio specification. May change in the future.
57  */
58 static pa_sample_spec sample_spec = {
59   .format = PA_SAMPLE_FLOAT32LE,
60   .rate = SAMPLING_RATE,
61   .channels = CHANNELS
62 };
63
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
66 #endif
67
68 /**
69  * Pulseaudio mainloop api
70  */
71 static pa_mainloop_api *mainloop_api;
72
73 /**
74  * Pulseaudio threaded mainloop
75  */
76 static pa_threaded_mainloop *m;
77
78 /**
79  * Pulseaudio context
80  */
81 static pa_context *context;
82
83 /**
84  * Pulseaudio output stream
85  */
86 static pa_stream *stream_out;
87
88 /**
89  * OPUS decoder
90  */
91 static OpusDecoder *dec;
92
93 /**
94  * PCM data buffer
95  */
96 static float *pcm_buffer;
97
98 /**
99  * Number of samples for one frame
100  */
101 static int frame_size;
102
103 /**
104  * Pipe we use to signal the main loop that we are ready to receive.
105  */
106 static int ready_pipe[2];
107
108 /**
109  * Ogg I/O state.
110  */
111 static ogg_sync_state oy;
112
113 /**
114  * Ogg stream state.
115  */
116 static ogg_stream_state os;
117
118 static int channels;
119
120 static int preskip;
121
122 static float gain;
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
128 {
129   uint8_t magic[8];
130   uint8_t version;
131   uint8_t channels;
132   uint16_t preskip GNUNET_PACKED;
133   uint32_t sampling_rate GNUNET_PACKED;
134   uint16_t gain GNUNET_PACKED;
135   uint8_t channel_mapping;
136 };
137
138 GNUNET_NETWORK_STRUCT_END
139
140 /**
141  * Process an Opus header and setup the opus decoder based on it.
142  * It takes several pointers for header values which are needed
143  * elsewhere in the code.
144  */
145 static OpusDecoder *
146 process_header (ogg_packet *op)
147 {
148   int err;
149   OpusDecoder *dec;
150   struct OpusHeadPacket header;
151
152   if (op->bytes < sizeof (header))
153     return NULL;
154   GNUNET_memcpy (&header, op->packet, sizeof (header));
155   header.preskip = GNUNET_le16toh (header.preskip);
156   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157   header.gain = GNUNET_le16toh (header.gain);
158
159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161                header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
162
163   channels = header.channels;
164   preskip = header.preskip;
165
166   if (header.channel_mapping != 0)
167   {
168     fprintf (stderr,
169              "This implementation does not support non-mono streams\n");
170     return NULL;
171   }
172
173   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
174   if (OPUS_OK != err)
175   {
176     fprintf (stderr,
177              "Cannot create encoder: %s\n",
178              opus_strerror (err));
179     return NULL;
180   }
181   if (! dec)
182   {
183     fprintf (stderr,
184              "Decoder initialization failed: %s\n",
185              opus_strerror (err));
186     return NULL;
187   }
188
189   if (0 != header.gain)
190   {
191     /*Gain API added in a newer libopus version, if we don't have it
192       we apply the gain ourselves. We also add in a user provided
193       manual gain at the same time.*/
194     int gainadj = (int) header.gain;
195     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196     if(OPUS_UNIMPLEMENTED == err)
197     {
198       gain = pow (10.0, gainadj / 5120.0);
199     }
200     else if (OPUS_OK != err)
201     {
202       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
203       return NULL;
204     }
205   }
206
207   return dec;
208 }
209
210
211 #ifdef DEBUG_DUMP_DECODED_OGG
212 static size_t
213 fwrite_le32(opus_int32 i32, FILE *file)
214 {
215    unsigned char buf[4];
216    buf[0]=(unsigned char)(i32&0xFF);
217    buf[1]=(unsigned char)(i32>>8&0xFF);
218    buf[2]=(unsigned char)(i32>>16&0xFF);
219    buf[3]=(unsigned char)(i32>>24&0xFF);
220    return fwrite(buf,4,1,file);
221 }
222
223
224 static size_t
225 fwrite_le16(int i16, FILE *file)
226 {
227    unsigned char buf[2];
228    buf[0]=(unsigned char)(i16&0xFF);
229    buf[1]=(unsigned char)(i16>>8&0xFF);
230    return fwrite(buf,2,1,file);
231 }
232
233
234 static int
235 write_wav_header()
236 {
237    int ret;
238    FILE *file = stdout;
239
240    ret = fprintf (file, "RIFF") >= 0;
241    ret &= fwrite_le32 (0x7fffffff, file);
242
243    ret &= fprintf (file, "WAVEfmt ") >= 0;
244    ret &= fwrite_le32 (16, file);
245    ret &= fwrite_le16 (1, file);
246    ret &= fwrite_le16 (channels, file);
247    ret &= fwrite_le32 (SAMPLING_RATE, file);
248    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249    ret &= fwrite_le16 (2*channels, file);
250    ret &= fwrite_le16 (16, file);
251
252    ret &= fprintf (file, "data") >= 0;
253    ret &= fwrite_le32 (0x7fffffff, file);
254
255    return !ret ? -1 : 16;
256 }
257
258 #endif
259
260
261 static int64_t
262 audio_write (int64_t maxout)
263 {
264   int64_t sampout = 0;
265   int tmp_skip;
266   unsigned out_len;
267   unsigned to_write;
268   float *output;
269 #ifdef DEBUG_DUMP_DECODED_OGG
270   static int wrote_wav_header;
271
272   if (dump_to_stdout && !wrote_wav_header)
273   {
274     write_wav_header ();
275     wrote_wav_header = 1;
276   }
277 #endif
278   maxout = 0 > maxout ? 0 : maxout;
279   do
280   {
281     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
282     preskip -= tmp_skip;
283     output = pcm_buffer + channels * tmp_skip;
284     out_len = frame_size - tmp_skip;
285     if (out_len > MAX_FRAME_SIZE)
286       exit (6);
287     frame_size = 0;
288
289     to_write = out_len < maxout ? out_len : (unsigned) maxout;
290     if (0 < maxout)
291     {
292       int64_t wrote = 0;
293       wrote = to_write;
294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295                   "Writing %u * %u * %u = %llu bytes into PA\n",
296                   to_write,
297                   channels,
298                   (unsigned int) sizeof (float),
299                   (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
301       if (dump_to_stdout)
302       {
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
306         int i;
307         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
308         for (i=0;i<(int)out_len*channels;i++)
309           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
310
311         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
312       }
313       else
314 #endif
315       if (pa_stream_write
316           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317           PA_SEEK_RELATIVE) < 0)
318       {
319         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320                     _("pa_stream_write() failed: %s\n"),
321                     pa_strerror (pa_context_errno (context)));
322       }
323       sampout += wrote;
324       maxout -= wrote;
325     }
326   } while (0 < frame_size && 0 < maxout);
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Wrote %" PRId64 " samples\n",
330               sampout);
331   return sampout;
332 }
333
334
335 /**
336  * Pulseaudio shutdown task
337  */
338 static void
339 quit (int ret)
340 {
341   mainloop_api->quit (mainloop_api, ret);
342   exit (ret);
343 }
344
345
346 static void
347 ogg_demux_and_decode ()
348 {
349   ogg_page og;
350   static int stream_init;
351   int64_t page_granule = 0;
352   ogg_packet op;
353   static int has_opus_stream;
354   static int has_tags_packet;
355   static int32_t opus_serialno;
356   static int64_t link_out;
357   static int64_t packet_count;
358   int eos = 0;
359   static int total_links;
360   static int gran_offset;
361
362   while (1 == ogg_sync_pageout (&oy, &og))
363   {
364     if (0 == stream_init)
365     {
366       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367                   "Initialized the stream\n");
368       ogg_stream_init (&os, ogg_page_serialno (&og));
369       stream_init = 1;
370     }
371     if (ogg_page_serialno (&og) != os.serialno)
372     {
373       /* so all streams are read. */
374       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375                   "Re-set serial number\n");
376       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
377     }
378     /*Add page to the bitstream*/
379     ogg_stream_pagein (&os, &og);
380     page_granule = ogg_page_granulepos (&og);
381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382                 "Reading page that ends at %" PRId64 "\n",
383                 page_granule);
384     /*Extract all available packets*/
385     while (1 == ogg_stream_packetout (&os, &op))
386     {
387       /*OggOpus streams are identified by a magic string in the initial
388         stream header.*/
389       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
390       {
391         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                     "Got Opus Header\n");
393         if (has_opus_stream && has_tags_packet)
394         {
395           /*If we're seeing another BOS OpusHead now it means
396             the stream is chained without an EOS.
397             This can easily happen if record helper is terminated unexpectedly.
398            */
399           has_opus_stream = 0;
400           if (dec)
401             opus_decoder_destroy (dec);
402           dec = NULL;
403           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
404         }
405         if (!has_opus_stream)
406         {
407           if (packet_count > 0 && opus_serialno == os.serialno)
408           {
409             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
410               (int64_t) opus_serialno, (int64_t) os.serialno);
411             quit(1);
412           }
413           opus_serialno = os.serialno;
414           has_opus_stream = 1;
415           has_tags_packet = 0;
416           link_out = 0;
417           packet_count = 0;
418           eos = 0;
419           total_links++;
420           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421                       "Got header for stream %" PRId64 ", this is %dth link\n",
422                       (int64_t) opus_serialno, total_links);
423         }
424         else
425         {
426           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
427         }
428       }
429       if (!has_opus_stream || os.serialno != opus_serialno)
430       {
431         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432                     "breaking out\n");
433         break;
434       }
435       /*If first packet in a logical stream, process the Opus header*/
436       if (0 == packet_count)
437       {
438         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439                     "Decoding header\n");
440         dec = process_header (&op);
441         if (!dec)
442            quit (1);
443
444         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
445         {
446           /*The format specifies that the initial header and tags packets are on their
447             own pages. To aid implementors in discovering that their files are wrong
448             we reject them explicitly here. In some player designs files like this would
449             fail even without an explicit test.*/
450           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
451           quit (1);
452         }
453
454         /*Remember how many samples at the front we were told to skip
455           so that we can adjust the timestamp counting.*/
456         gran_offset = preskip;
457
458         if (! pcm_buffer)
459         {
460           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461                       "Allocating %u * %u * %u = %llu bytes of buffer space\n",
462                       MAX_FRAME_SIZE,
463                       channels,
464                       (unsigned int) sizeof (float),
465                       (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
466           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
467         }
468       }
469       else if (1 == packet_count)
470       {
471         has_tags_packet = 1;
472         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
473         {
474           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
475           quit (1);
476         }
477       }
478       else
479       {
480         int ret;
481         int64_t maxout;
482         int64_t outsamp;
483
484         /*End of stream condition*/
485         if (op.e_o_s && os.serialno == opus_serialno)
486         {
487           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488                       "Got EOS\n");
489           eos = 1; /* don't care for anything except opus eos */
490         }
491
492         /*Decode Opus packet*/
493         ret = opus_decode_float (dec,
494                                  (const unsigned char *) op.packet,
495                                  op.bytes,
496                                  pcm_buffer,
497                                  MAX_FRAME_SIZE, 0);
498
499         /*If the decoder returned less than zero, we have an error.*/
500         if (0 > ret)
501         {
502           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
503           break;
504         }
505         frame_size = ret;
506         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
508                     ret,
509                     ret * channels,
510                     (unsigned int) op.bytes);
511
512         /*Apply header gain, if we're not using an opus library new
513           enough to do this internally.*/
514         if (0 != gain)
515         {
516           int i;
517           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                       "Applying gain %f\n",
519                       gain);
520           for (i = 0; i < frame_size * channels; i++)
521             pcm_buffer[i] *= gain;
522         }
523
524         /*This handles making sure that our output duration respects
525           the final end-trim by not letting the output sample count
526           get ahead of the granpos indicated value.*/
527         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
528         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
530                     packet_count, maxout);
531
532         outsamp = audio_write (0 > maxout ? 0 : maxout);
533         link_out += outsamp;
534       }
535       packet_count++;
536     }
537     if (eos)
538     {
539       has_opus_stream = 0;
540       if (dec)
541         opus_decoder_destroy (dec);
542       dec = NULL;
543     }
544   }
545 }
546
547 /**
548  * Message callback
549  *
550  * @param msg message we received.
551  * @return #GNUNET_OK on success,
552  *     #GNUNET_NO to stop further processing due to disconnect (no error)
553  *     #GNUNET_SYSERR to stop further processing due to error
554  */
555 static int
556 stdin_receiver (void *cls,
557                 const struct GNUNET_MessageHeader *msg)
558 {
559   struct AudioMessage *audio;
560   char *data;
561   size_t payload_len;
562
563   switch (ntohs (msg->type))
564   {
565   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
566     audio = (struct AudioMessage *) msg;
567     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
568
569     /*Get the ogg buffer for writing*/
570     data = ogg_sync_buffer (&oy, payload_len);
571     /*Read bitstream from input file*/
572     GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
573     ogg_sync_wrote (&oy, payload_len);
574
575     ogg_demux_and_decode ();
576     break;
577   default:
578     break;
579   }
580   return GNUNET_OK;
581 }
582
583
584 /**
585  * Callback when data is there for playback
586  */
587 static void
588 stream_write_callback (pa_stream *s,
589                        size_t length,
590                        void *userdata)
591 {
592   /* unblock 'main' */
593   if (-1 != ready_pipe[1])
594   {
595     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                 "Unblocking main loop!\n");
597     (void) write (ready_pipe[1], "r", 1);
598   }
599 }
600
601
602 /**
603  * Exit callback for SIGTERM and SIGINT
604  */
605 static void
606 exit_signal_callback (pa_mainloop_api *m,
607                       pa_signal_event *e,
608                       int sig,
609                       void *userdata)
610 {
611   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
612               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
613   quit (1);
614 }
615
616
617 /**
618  * Pulseaudio stream state callback
619  */
620 static void
621 context_state_callback (pa_context *c,
622                         void *userdata)
623 {
624   int p;
625
626   GNUNET_assert (NULL != c);
627   switch (pa_context_get_state (c))
628   {
629   case PA_CONTEXT_CONNECTING:
630   case PA_CONTEXT_AUTHORIZING:
631   case PA_CONTEXT_SETTING_NAME:
632     break;
633   case PA_CONTEXT_READY:
634   {
635     GNUNET_assert (! stream_out);
636     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
637                 _("Connection established.\n"));
638     if (! (stream_out =
639            pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
640     {
641       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
642                   _("pa_stream_new() failed: %s\n"),
643                   pa_strerror (pa_context_errno (c)));
644       goto fail;
645     }
646     pa_stream_set_write_callback (stream_out,
647                                   &stream_write_callback,
648                                   NULL);
649     if ((p =
650          pa_stream_connect_playback (stream_out, NULL,
651                                      NULL,
652                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
653                                      NULL,  NULL)) < 0)
654     {
655       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
656                   _("pa_stream_connect_playback() failed: %s\n"),
657                   pa_strerror (pa_context_errno (c)));
658       goto fail;
659     }
660     break;
661   }
662   case PA_CONTEXT_TERMINATED:
663     quit (0);
664     break;
665
666   case PA_CONTEXT_FAILED:
667   default:
668     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
669                 _("Connection failure: %s\n"),
670                 pa_strerror (pa_context_errno (c)));
671     goto fail;
672   }
673   return;
674  fail:
675   quit (1);
676 }
677
678
679 /**
680  * Pulseaudio initialization
681  */
682 static void
683 pa_init ()
684 {
685   int r;
686
687   if (!pa_sample_spec_valid (&sample_spec))
688   {
689     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
690                 _("Wrong Spec\n"));
691   }
692   /* set up threaded playback mainloop */
693   if (!(m = pa_threaded_mainloop_new ()))
694   {
695     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
696                 _("pa_mainloop_new() failed.\n"));
697   }
698   mainloop_api = pa_threaded_mainloop_get_api (m);
699   /* listen to signals */
700   r = pa_signal_init (mainloop_api);
701   GNUNET_assert (r == 0);
702   pa_signal_new (SIGINT, exit_signal_callback, NULL);
703   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
704
705
706   /* connect to the main pulseaudio context */
707   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
708   {
709     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
710                 _("pa_context_new() failed.\n"));
711   }
712   pa_context_set_state_callback (context, context_state_callback, NULL);
713
714   if (pa_context_connect (context, NULL, 0, NULL) < 0)
715   {
716     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
717                 _("pa_context_connect() failed: %s\n"),
718                 pa_strerror (pa_context_errno (context)));
719   }
720   if (pa_threaded_mainloop_start (m) < 0)
721   {
722     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
723                 _("pa_mainloop_run() failed.\n"));
724   }
725 }
726
727
728 static void
729 ogg_init ()
730 {
731   ogg_sync_init (&oy);
732 }
733
734
735 static void
736 drain_callback (pa_stream*s, int success, void *userdata)
737 {
738   pa_threaded_mainloop_signal (m, 0);
739 }
740
741
742 /**
743  * The main function for the playback helper.
744  *
745  * @param argc number of arguments from the command line
746  * @param argv command line arguments
747  * @return 0 ok, 1 on error
748  */
749 int
750 main (int argc, char *argv[])
751 {
752   static unsigned long long toff;
753
754   char readbuf[MAXLINE];
755   struct GNUNET_MessageStreamTokenizer *stdin_mst;
756   char c;
757   ssize_t ret;
758 #ifdef DEBUG_READ_PURE_OGG
759   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
760 #endif
761
762   GNUNET_assert (GNUNET_OK ==
763                  GNUNET_log_setup ("gnunet-helper-audio-playback",
764                                    "WARNING",
765                                    NULL));
766   if (0 != pipe (ready_pipe))
767   {
768     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
769     return 1;
770   }
771   stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
772   ogg_init ();
773   pa_init ();
774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775               "Waiting for PulseAudio to be ready.\n");
776   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
777   close (ready_pipe[0]);
778   close (ready_pipe[1]);
779   ready_pipe[0] = -1;
780   ready_pipe[1] = -1;
781 #ifdef DEBUG_DUMP_DECODED_OGG
782   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
783 #endif
784   while (1)
785   {
786     ret = read (0, readbuf, sizeof (readbuf));
787     toff += ret;
788     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789                 "Received %d bytes of audio data (total: %llu)\n",
790                 (int) ret,
791                 toff);
792     if (0 > ret)
793     {
794       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
795                   _("Read error from STDIN: %s\n"),
796                   strerror (errno));
797       break;
798     }
799     if (0 == ret)
800       break;
801 #ifdef DEBUG_READ_PURE_OGG
802     if (read_pure_ogg)
803     {
804       char *data = ogg_sync_buffer (&oy, ret);
805       GNUNET_memcpy (data, readbuf, ret);
806       ogg_sync_wrote (&oy, ret);
807       ogg_demux_and_decode ();
808     }
809     else
810 #endif
811     GNUNET_MST_from_buffer (stdin_mst,
812                             readbuf, ret,
813                             GNUNET_NO, GNUNET_NO);
814   }
815   GNUNET_MST_destroy (stdin_mst);
816   if (stream_out)
817   {
818     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819                 "Locking\n");
820     pa_threaded_mainloop_lock (m);
821     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822                 "Draining\n");
823     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
824     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
825     {
826       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827                   "Waiting\n");
828       pa_threaded_mainloop_wait (m);
829     }
830     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831                 "Unreffing\n");
832     pa_operation_unref (o);
833     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834                 "Unlocking\n");
835     pa_threaded_mainloop_unlock (m);
836   }
837   return 0;
838 }