glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file conversation/gnunet-helper-audio-playback.c
17  * @brief program to playback audio data to the speaker
18  * @author Siomon Dieterle
19  * @author Andreas Fuchs
20  * @author Christian Grothoff
21  */
22 #include "platform.h"
23 #include "gnunet_util_lib.h"
24 #include "gnunet_protocols.h"
25 #include "conversation.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_core_service.h"
28
29 #include <pulse/simple.h>
30 #include <pulse/error.h>
31 #include <pulse/rtclock.h>
32
33 #include <pulse/pulseaudio.h>
34 #include <opus/opus.h>
35 #include <opus/opus_types.h>
36 #include <ogg/ogg.h>
37
38 #define DEBUG_READ_PURE_OGG 1
39 #define DEBUG_DUMP_DECODED_OGG 1
40
41 #define MAXLINE 4096
42
43 #define SAMPLING_RATE 48000
44
45 #define CHANNELS 1
46
47 /* 120ms at 48000 */
48 #define MAX_FRAME_SIZE (960 * 6)
49
50 /**
51  * Pulseaudio specification. May change in the future.
52  */
53 static pa_sample_spec sample_spec = {
54   .format = PA_SAMPLE_FLOAT32LE,
55   .rate = SAMPLING_RATE,
56   .channels = CHANNELS
57 };
58
59 #ifdef DEBUG_DUMP_DECODED_OGG
60 static int dump_to_stdout;
61 #endif
62
63 /**
64  * Pulseaudio mainloop api
65  */
66 static pa_mainloop_api *mainloop_api;
67
68 /**
69  * Pulseaudio threaded mainloop
70  */
71 static pa_threaded_mainloop *m;
72
73 /**
74  * Pulseaudio context
75  */
76 static pa_context *context;
77
78 /**
79  * Pulseaudio output stream
80  */
81 static pa_stream *stream_out;
82
83 /**
84  * OPUS decoder
85  */
86 static OpusDecoder *dec;
87
88 /**
89  * PCM data buffer
90  */
91 static float *pcm_buffer;
92
93 /**
94  * Number of samples for one frame
95  */
96 static int frame_size;
97
98 /**
99  * Pipe we use to signal the main loop that we are ready to receive.
100  */
101 static int ready_pipe[2];
102
103 /**
104  * Ogg I/O state.
105  */
106 static ogg_sync_state oy;
107
108 /**
109  * Ogg stream state.
110  */
111 static ogg_stream_state os;
112
113 static int channels;
114
115 static int preskip;
116
117 static float gain;
118
119 GNUNET_NETWORK_STRUCT_BEGIN
120
121 /* OggOpus spec says the numbers must be in little-endian order */
122 struct OpusHeadPacket
123 {
124   uint8_t magic[8];
125   uint8_t version;
126   uint8_t channels;
127   uint16_t preskip GNUNET_PACKED;
128   uint32_t sampling_rate GNUNET_PACKED;
129   uint16_t gain GNUNET_PACKED;
130   uint8_t channel_mapping;
131 };
132
133 GNUNET_NETWORK_STRUCT_END
134
135 /**
136  * Process an Opus header and setup the opus decoder based on it.
137  * It takes several pointers for header values which are needed
138  * elsewhere in the code.
139  */
140 static OpusDecoder *
141 process_header (ogg_packet *op)
142 {
143   int err;
144   OpusDecoder *dec;
145   struct OpusHeadPacket header;
146
147   if ( ((unsigned int) op->bytes) < sizeof (header))
148     return NULL;
149   GNUNET_memcpy (&header,
150                  op->packet,
151                  sizeof (header));
152   header.preskip = GNUNET_le16toh (header.preskip);
153   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
154   header.gain = GNUNET_le16toh (header.gain);
155
156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
157               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
158               header.version,
159               header.channels,
160               header.preskip,
161               header.sampling_rate,
162               header.gain);
163   channels = header.channels;
164   preskip = header.preskip;
165
166   if (header.channel_mapping != 0)
167   {
168     fprintf (stderr,
169              "This implementation does not support non-mono streams\n");
170     return NULL;
171   }
172
173   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
174   if (OPUS_OK != err)
175   {
176     fprintf (stderr,
177              "Cannot create encoder: %s\n",
178              opus_strerror (err));
179     return NULL;
180   }
181   if (! dec)
182   {
183     fprintf (stderr,
184              "Decoder initialization failed: %s\n",
185              opus_strerror (err));
186     return NULL;
187   }
188
189   if (0 != header.gain)
190   {
191     /*Gain API added in a newer libopus version, if we don't have it
192       we apply the gain ourselves. We also add in a user provided
193       manual gain at the same time.*/
194     int gainadj = (int) header.gain;
195     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196     if(OPUS_UNIMPLEMENTED == err)
197     {
198       gain = pow (10.0, gainadj / 5120.0);
199     }
200     else if (OPUS_OK != err)
201     {
202       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
203       return NULL;
204     }
205   }
206
207   return dec;
208 }
209
210
211 #ifdef DEBUG_DUMP_DECODED_OGG
212 static size_t
213 fwrite_le32(opus_int32 i32, FILE *file)
214 {
215    unsigned char buf[4];
216    buf[0]=(unsigned char)(i32&0xFF);
217    buf[1]=(unsigned char)(i32>>8&0xFF);
218    buf[2]=(unsigned char)(i32>>16&0xFF);
219    buf[3]=(unsigned char)(i32>>24&0xFF);
220    return fwrite(buf,4,1,file);
221 }
222
223
224 static size_t
225 fwrite_le16(int i16, FILE *file)
226 {
227    unsigned char buf[2];
228    buf[0]=(unsigned char)(i16&0xFF);
229    buf[1]=(unsigned char)(i16>>8&0xFF);
230    return fwrite(buf,2,1,file);
231 }
232
233
234 static int
235 write_wav_header()
236 {
237    int ret;
238    FILE *file = stdout;
239
240    ret = fprintf (file, "RIFF") >= 0;
241    ret &= fwrite_le32 (0x7fffffff, file);
242
243    ret &= fprintf (file, "WAVEfmt ") >= 0;
244    ret &= fwrite_le32 (16, file);
245    ret &= fwrite_le16 (1, file);
246    ret &= fwrite_le16 (channels, file);
247    ret &= fwrite_le32 (SAMPLING_RATE, file);
248    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249    ret &= fwrite_le16 (2*channels, file);
250    ret &= fwrite_le16 (16, file);
251
252    ret &= fprintf (file, "data") >= 0;
253    ret &= fwrite_le32 (0x7fffffff, file);
254
255    return !ret ? -1 : 16;
256 }
257
258 #endif
259
260
261 static int64_t
262 audio_write (int64_t maxout)
263 {
264   int64_t sampout = 0;
265   int tmp_skip;
266   unsigned out_len;
267   unsigned to_write;
268   float *output;
269 #ifdef DEBUG_DUMP_DECODED_OGG
270   static int wrote_wav_header;
271
272   if (dump_to_stdout && !wrote_wav_header)
273   {
274     write_wav_header ();
275     wrote_wav_header = 1;
276   }
277 #endif
278   maxout = 0 > maxout ? 0 : maxout;
279   do
280   {
281     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
282     preskip -= tmp_skip;
283     output = pcm_buffer + channels * tmp_skip;
284     out_len = frame_size - tmp_skip;
285     if (out_len > MAX_FRAME_SIZE)
286       exit (6);
287     frame_size = 0;
288
289     to_write = out_len < maxout ? out_len : (unsigned) maxout;
290     if (0 < maxout)
291     {
292       int64_t wrote = 0;
293       wrote = to_write;
294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295                   "Writing %u * %u * %u = %llu bytes into PA\n",
296                   to_write,
297                   channels,
298                   (unsigned int) sizeof (float),
299                   (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
301       if (dump_to_stdout)
302       {
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
306         int i;
307         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
308         for (i=0;i<(int)out_len*channels;i++)
309           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
310
311         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
312       }
313       else
314 #endif
315       if (pa_stream_write
316           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317           PA_SEEK_RELATIVE) < 0)
318       {
319         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320                     _("pa_stream_write() failed: %s\n"),
321                     pa_strerror (pa_context_errno (context)));
322       }
323       sampout += wrote;
324       maxout -= wrote;
325     }
326   } while (0 < frame_size && 0 < maxout);
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Wrote %" PRId64 " samples\n",
330               sampout);
331   return sampout;
332 }
333
334
335 /**
336  * Pulseaudio shutdown task
337  */
338 static void
339 quit (int ret)
340 {
341   mainloop_api->quit (mainloop_api,
342                       ret);
343   exit (ret);
344 }
345
346
347 static void
348 ogg_demux_and_decode ()
349 {
350   ogg_page og;
351   static int stream_init;
352   int64_t page_granule = 0;
353   ogg_packet op;
354   static int has_opus_stream;
355   static int has_tags_packet;
356   static int32_t opus_serialno;
357   static int64_t link_out;
358   static int64_t packet_count;
359   int eos = 0;
360   static int total_links;
361   static int gran_offset;
362
363   while (1 == ogg_sync_pageout (&oy, &og))
364   {
365     if (0 == stream_init)
366     {
367       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368                   "Initialized the stream\n");
369       ogg_stream_init (&os, ogg_page_serialno (&og));
370       stream_init = 1;
371     }
372     if (ogg_page_serialno (&og) != os.serialno)
373     {
374       /* so all streams are read. */
375       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376                   "Re-set serial number\n");
377       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
378     }
379     /*Add page to the bitstream*/
380     ogg_stream_pagein (&os, &og);
381     page_granule = ogg_page_granulepos (&og);
382     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383                 "Reading page that ends at %" PRId64 "\n",
384                 page_granule);
385     /*Extract all available packets*/
386     while (1 == ogg_stream_packetout (&os, &op))
387     {
388       /*OggOpus streams are identified by a magic string in the initial
389         stream header.*/
390       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
391       {
392         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393                     "Got Opus Header\n");
394         if (has_opus_stream && has_tags_packet)
395         {
396           /*If we're seeing another BOS OpusHead now it means
397             the stream is chained without an EOS.
398             This can easily happen if record helper is terminated unexpectedly.
399            */
400           has_opus_stream = 0;
401           if (dec)
402             opus_decoder_destroy (dec);
403           dec = NULL;
404           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
405         }
406         if (!has_opus_stream)
407         {
408           if (packet_count > 0 && opus_serialno == os.serialno)
409           {
410             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
411               (int64_t) opus_serialno, (int64_t) os.serialno);
412             quit(1);
413           }
414           opus_serialno = os.serialno;
415           has_opus_stream = 1;
416           has_tags_packet = 0;
417           link_out = 0;
418           packet_count = 0;
419           eos = 0;
420           total_links++;
421           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422                       "Got header for stream %" PRId64 ", this is %dth link\n",
423                       (int64_t) opus_serialno, total_links);
424         }
425         else
426         {
427           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
428         }
429       }
430       if (!has_opus_stream || os.serialno != opus_serialno)
431       {
432         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433                     "breaking out\n");
434         break;
435       }
436       /*If first packet in a logical stream, process the Opus header*/
437       if (0 == packet_count)
438       {
439         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440                     "Decoding header\n");
441         dec = process_header (&op);
442         if (!dec)
443            quit (1);
444
445         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
446         {
447           /*The format specifies that the initial header and tags packets are on their
448             own pages. To aid implementors in discovering that their files are wrong
449             we reject them explicitly here. In some player designs files like this would
450             fail even without an explicit test.*/
451           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
452           quit (1);
453         }
454
455         /*Remember how many samples at the front we were told to skip
456           so that we can adjust the timestamp counting.*/
457         gran_offset = preskip;
458
459         if (! pcm_buffer)
460         {
461           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462                       "Allocating %u * %u * %u = %llu bytes of buffer space\n",
463                       MAX_FRAME_SIZE,
464                       channels,
465                       (unsigned int) sizeof (float),
466                       (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
467           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
468         }
469       }
470       else if (1 == packet_count)
471       {
472         has_tags_packet = 1;
473         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
474         {
475           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
476           quit (1);
477         }
478       }
479       else
480       {
481         int ret;
482         int64_t maxout;
483         int64_t outsamp;
484
485         /*End of stream condition*/
486         if (op.e_o_s && os.serialno == opus_serialno)
487         {
488           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489                       "Got EOS\n");
490           eos = 1; /* don't care for anything except opus eos */
491         }
492
493         /*Decode Opus packet*/
494         ret = opus_decode_float (dec,
495                                  (const unsigned char *) op.packet,
496                                  op.bytes,
497                                  pcm_buffer,
498                                  MAX_FRAME_SIZE, 0);
499
500         /*If the decoder returned less than zero, we have an error.*/
501         if (0 > ret)
502         {
503           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
504           break;
505         }
506         frame_size = ret;
507         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
509                     ret,
510                     ret * channels,
511                     (unsigned int) op.bytes);
512
513         /*Apply header gain, if we're not using an opus library new
514           enough to do this internally.*/
515         if (0 != gain)
516         {
517           int i;
518           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519                       "Applying gain %f\n",
520                       gain);
521           for (i = 0; i < frame_size * channels; i++)
522             pcm_buffer[i] *= gain;
523         }
524
525         /*This handles making sure that our output duration respects
526           the final end-trim by not letting the output sample count
527           get ahead of the granpos indicated value.*/
528         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
529         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
531                     packet_count, maxout);
532
533         outsamp = audio_write (0 > maxout ? 0 : maxout);
534         link_out += outsamp;
535       }
536       packet_count++;
537     }
538     if (eos)
539     {
540       has_opus_stream = 0;
541       if (dec)
542         opus_decoder_destroy (dec);
543       dec = NULL;
544     }
545   }
546 }
547
548
549 /**
550  * Message callback
551  *
552  * @param msg message we received.
553  * @return #GNUNET_OK on success,
554  *     #GNUNET_NO to stop further processing due to disconnect (no error)
555  *     #GNUNET_SYSERR to stop further processing due to error
556  */
557 static int
558 stdin_receiver (void *cls,
559                 const struct GNUNET_MessageHeader *msg)
560 {
561   struct AudioMessage *audio;
562   char *data;
563   size_t payload_len;
564
565   (void) cls;
566   switch (ntohs (msg->type))
567   {
568   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
569     audio = (struct AudioMessage *) msg;
570     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
571
572     /*Get the ogg buffer for writing*/
573     data = ogg_sync_buffer (&oy, payload_len);
574     /*Read bitstream from input file*/
575     GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
576     ogg_sync_wrote (&oy, payload_len);
577
578     ogg_demux_and_decode ();
579     break;
580   default:
581     break;
582   }
583   return GNUNET_OK;
584 }
585
586
587 /**
588  * Callback when data is there for playback
589  */
590 static void
591 stream_write_callback (pa_stream *s,
592                        size_t length,
593                        void *userdata)
594 {
595   /* unblock 'main' */
596   (void) userdata;
597   (void) length;
598   (void) s;
599   if (-1 != ready_pipe[1])
600   {
601     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                 "Unblocking main loop!\n");
603     (void) write (ready_pipe[1], "r", 1);
604   }
605 }
606
607
608 /**
609  * Exit callback for SIGTERM and SIGINT
610  */
611 static void
612 exit_signal_callback (pa_mainloop_api *m,
613                       pa_signal_event *e,
614                       int sig,
615                       void *userdata)
616 {
617   (void) m;
618   (void) e;
619   (void) sig;
620   (void) userdata;
621   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
622               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
623   quit (1);
624 }
625
626
627 /**
628  * Pulseaudio stream state callback
629  */
630 static void
631 context_state_callback (pa_context *c,
632                         void *userdata)
633 {
634   int p;
635
636   (void) userdata;
637   GNUNET_assert (NULL != c);
638   switch (pa_context_get_state (c))
639   {
640   case PA_CONTEXT_CONNECTING:
641   case PA_CONTEXT_AUTHORIZING:
642   case PA_CONTEXT_SETTING_NAME:
643     break;
644   case PA_CONTEXT_READY:
645   {
646     GNUNET_assert (! stream_out);
647     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
648                 _("Connection established.\n"));
649     if (! (stream_out =
650            pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
651     {
652       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
653                   _("pa_stream_new() failed: %s\n"),
654                   pa_strerror (pa_context_errno (c)));
655       goto fail;
656     }
657     pa_stream_set_write_callback (stream_out,
658                                   &stream_write_callback,
659                                   NULL);
660     if ((p =
661          pa_stream_connect_playback (stream_out, NULL,
662                                      NULL,
663                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
664                                      NULL,  NULL)) < 0)
665     {
666       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
667                   _("pa_stream_connect_playback() failed: %s\n"),
668                   pa_strerror (pa_context_errno (c)));
669       goto fail;
670     }
671     break;
672   }
673   case PA_CONTEXT_TERMINATED:
674     quit (0);
675     break;
676
677   case PA_CONTEXT_FAILED:
678   default:
679     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
680                 _("Connection failure: %s\n"),
681                 pa_strerror (pa_context_errno (c)));
682     goto fail;
683   }
684   return;
685  fail:
686   quit (1);
687 }
688
689
690 /**
691  * Pulseaudio initialization
692  */
693 static void
694 pa_init ()
695 {
696   int r;
697
698   if (!pa_sample_spec_valid (&sample_spec))
699   {
700     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
701                 _("Wrong Spec\n"));
702   }
703   /* set up threaded playback mainloop */
704   if (!(m = pa_threaded_mainloop_new ()))
705   {
706     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707                 _("pa_mainloop_new() failed.\n"));
708   }
709   mainloop_api = pa_threaded_mainloop_get_api (m);
710   /* listen to signals */
711   r = pa_signal_init (mainloop_api);
712   GNUNET_assert (r == 0);
713   pa_signal_new (SIGINT, exit_signal_callback, NULL);
714   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
715
716
717   /* connect to the main pulseaudio context */
718   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
719   {
720     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
721                 _("pa_context_new() failed.\n"));
722   }
723   pa_context_set_state_callback (context, context_state_callback, NULL);
724
725   if (pa_context_connect (context, NULL, 0, NULL) < 0)
726   {
727     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
728                 _("pa_context_connect() failed: %s\n"),
729                 pa_strerror (pa_context_errno (context)));
730   }
731   if (pa_threaded_mainloop_start (m) < 0)
732   {
733     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
734                 _("pa_mainloop_run() failed.\n"));
735   }
736 }
737
738
739 static void
740 ogg_init ()
741 {
742   ogg_sync_init (&oy);
743 }
744
745
746 static void
747 drain_callback (pa_stream*s, int success, void *userdata)
748 {
749   (void) s;
750   (void) success;
751   (void) userdata;
752   pa_threaded_mainloop_signal (m,
753                                0);
754 }
755
756
757 /**
758  * The main function for the playback helper.
759  *
760  * @param argc number of arguments from the command line
761  * @param argv command line arguments
762  * @return 0 ok, 1 on error
763  */
764 int
765 main (int argc, char *argv[])
766 {
767   static unsigned long long toff;
768   char readbuf[MAXLINE];
769   struct GNUNET_MessageStreamTokenizer *stdin_mst;
770   char c;
771   ssize_t ret;
772 #ifdef DEBUG_READ_PURE_OGG
773   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
774 #endif
775
776   (void) argc;
777   (void) argv;
778   GNUNET_assert (GNUNET_OK ==
779                  GNUNET_log_setup ("gnunet-helper-audio-playback",
780                                    "WARNING",
781                                    NULL));
782   if (0 != pipe (ready_pipe))
783   {
784     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
785     return 1;
786   }
787   stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
788   ogg_init ();
789   pa_init ();
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "Waiting for PulseAudio to be ready.\n");
792   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
793   close (ready_pipe[0]);
794   close (ready_pipe[1]);
795   ready_pipe[0] = -1;
796   ready_pipe[1] = -1;
797 #ifdef DEBUG_DUMP_DECODED_OGG
798   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
799 #endif
800   while (1)
801   {
802     ret = read (STDIN_FILENO,
803                 readbuf,
804                 sizeof (readbuf));
805     toff += ret;
806     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807                 "Received %d bytes of audio data (total: %llu)\n",
808                 (int) ret,
809                 toff);
810     if (0 > ret)
811     {
812       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
813                   _("Read error from STDIN: %s\n"),
814                   strerror (errno));
815       break;
816     }
817     if (0 == ret)
818       break;
819 #ifdef DEBUG_READ_PURE_OGG
820     if (read_pure_ogg)
821     {
822       char *data = ogg_sync_buffer (&oy, ret);
823       GNUNET_memcpy (data, readbuf, ret);
824       ogg_sync_wrote (&oy, ret);
825       ogg_demux_and_decode ();
826     }
827     else
828 #endif
829     GNUNET_MST_from_buffer (stdin_mst,
830                             readbuf, ret,
831                             GNUNET_NO, GNUNET_NO);
832   }
833   GNUNET_MST_destroy (stdin_mst);
834   if (stream_out)
835   {
836     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
837                 "Locking\n");
838     pa_threaded_mainloop_lock (m);
839     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840                 "Draining\n");
841     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
842     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
843     {
844       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845                   "Waiting\n");
846       pa_threaded_mainloop_wait (m);
847     }
848     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849                 "Unreffing\n");
850     pa_operation_unref (o);
851     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852                 "Unlocking\n");
853     pa_threaded_mainloop_unlock (m);
854   }
855   return 0;
856 }