-preparations for replacement of try_connect call
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file conversation/gnunet-helper-audio-playback.c
22  * @brief program to playback audio data to the speaker
23  * @author Siomon Dieterle
24  * @author Andreas Fuchs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
41 #include <ogg/ogg.h>
42
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
45
46 #define MAXLINE 4096
47
48 #define SAMPLING_RATE 48000
49
50 #define CHANNELS 1
51
52 /* 120ms at 48000 */
53 #define MAX_FRAME_SIZE (960 * 6)
54
55 /**
56  * Pulseaudio specification. May change in the future.
57  */
58 static pa_sample_spec sample_spec = {
59   .format = PA_SAMPLE_FLOAT32LE,
60   .rate = SAMPLING_RATE,
61   .channels = CHANNELS
62 };
63
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
66 #endif
67
68 /**
69  * Pulseaudio mainloop api
70  */
71 static pa_mainloop_api *mainloop_api;
72
73 /**
74  * Pulseaudio threaded mainloop
75  */
76 static pa_threaded_mainloop *m;
77
78 /**
79  * Pulseaudio context
80  */
81 static pa_context *context;
82
83 /**
84  * Pulseaudio output stream
85  */
86 static pa_stream *stream_out;
87
88 /**
89  * OPUS decoder
90  */
91 static OpusDecoder *dec;
92
93 /**
94  * PCM data buffer
95  */
96 static float *pcm_buffer;
97
98 /**
99  * Number of samples for one frame
100  */
101 static int frame_size;
102
103 /**
104  * Pipe we use to signal the main loop that we are ready to receive.
105  */
106 static int ready_pipe[2];
107
108 /**
109  * Ogg I/O state.
110  */
111 static ogg_sync_state oy;
112
113 /**
114  * Ogg stream state.
115  */
116 static ogg_stream_state os;
117
118 static int channels;
119
120 static int preskip;
121
122 static float gain;
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
128 {
129   uint8_t magic[8];
130   uint8_t version;
131   uint8_t channels;
132   uint16_t preskip GNUNET_PACKED;
133   uint32_t sampling_rate GNUNET_PACKED;
134   uint16_t gain GNUNET_PACKED;
135   uint8_t channel_mapping;
136 };
137
138 GNUNET_NETWORK_STRUCT_END
139
140 /**
141  * Process an Opus header and setup the opus decoder based on it.
142  * It takes several pointers for header values which are needed
143  * elsewhere in the code.
144  */
145 static OpusDecoder *
146 process_header (ogg_packet *op)
147 {
148   int err;
149   OpusDecoder *dec;
150   struct OpusHeadPacket header;
151
152   if (op->bytes < sizeof (header))
153     return NULL;
154   memcpy (&header, op->packet, sizeof (header));
155   header.preskip = GNUNET_le16toh (header.preskip);
156   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157   header.gain = GNUNET_le16toh (header.gain);
158
159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161                header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
162
163   channels = header.channels;
164   preskip = header.preskip;
165
166   if (header.channel_mapping != 0)
167   {
168     fprintf (stderr,
169              "This implementation does not support non-mono streams\n");
170     return NULL;
171   }
172
173   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
174   if (OPUS_OK != err)
175   {
176     fprintf (stderr, 
177              "Cannot create encoder: %s\n", 
178              opus_strerror (err));
179     return NULL;
180   }
181   if (! dec)
182   {
183     fprintf (stderr,
184              "Decoder initialization failed: %s\n", 
185              opus_strerror (err));
186     return NULL;
187   }
188
189   if (0 != header.gain)
190   {
191     /*Gain API added in a newer libopus version, if we don't have it
192       we apply the gain ourselves. We also add in a user provided
193       manual gain at the same time.*/
194     int gainadj = (int) header.gain;
195     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196     if(OPUS_UNIMPLEMENTED == err)
197     {
198       gain = pow (10.0, gainadj / 5120.0);
199     }
200     else if (OPUS_OK != err)
201     {
202       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
203       return NULL;
204     }
205   }
206
207   return dec;
208 }
209
210
211 #ifdef DEBUG_DUMP_DECODED_OGG
212 static size_t 
213 fwrite_le32(opus_int32 i32, FILE *file)
214 {
215    unsigned char buf[4];
216    buf[0]=(unsigned char)(i32&0xFF);
217    buf[1]=(unsigned char)(i32>>8&0xFF);
218    buf[2]=(unsigned char)(i32>>16&0xFF);
219    buf[3]=(unsigned char)(i32>>24&0xFF);
220    return fwrite(buf,4,1,file);
221 }
222
223
224 static size_t 
225 fwrite_le16(int i16, FILE *file)
226 {
227    unsigned char buf[2];
228    buf[0]=(unsigned char)(i16&0xFF);
229    buf[1]=(unsigned char)(i16>>8&0xFF);
230    return fwrite(buf,2,1,file);
231 }
232
233
234 static int
235 write_wav_header()
236 {
237    int ret;
238    FILE *file = stdout;
239
240    ret = fprintf (file, "RIFF") >= 0;
241    ret &= fwrite_le32 (0x7fffffff, file);
242
243    ret &= fprintf (file, "WAVEfmt ") >= 0;
244    ret &= fwrite_le32 (16, file);
245    ret &= fwrite_le16 (1, file);
246    ret &= fwrite_le16 (channels, file);
247    ret &= fwrite_le32 (SAMPLING_RATE, file);
248    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249    ret &= fwrite_le16 (2*channels, file);
250    ret &= fwrite_le16 (16, file);
251
252    ret &= fprintf (file, "data") >= 0;
253    ret &= fwrite_le32 (0x7fffffff, file);
254
255    return !ret ? -1 : 16;
256 }
257
258 #endif
259
260
261 static int64_t
262 audio_write (int64_t maxout)
263 {
264   int64_t sampout = 0;
265   int tmp_skip;
266   unsigned out_len;
267   unsigned to_write;
268   float *output;
269 #ifdef DEBUG_DUMP_DECODED_OGG
270   static int wrote_wav_header;
271
272   if (dump_to_stdout && !wrote_wav_header)
273   {
274     write_wav_header ();
275     wrote_wav_header = 1;
276   }
277 #endif
278   maxout = 0 > maxout ? 0 : maxout;
279   do
280   {
281     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
282     preskip -= tmp_skip;
283     output = pcm_buffer + channels * tmp_skip;
284     out_len = frame_size - tmp_skip;
285     if (out_len > MAX_FRAME_SIZE)
286       exit (6);
287     frame_size = 0;
288
289     to_write = out_len < maxout ? out_len : (unsigned) maxout;
290     if (0 < maxout)
291     {
292       int64_t wrote = 0;
293       wrote = to_write;
294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295                   "Writing %u * %u * %u = %u bytes into PA\n",
296                   to_write, channels, sizeof (float),
297                   to_write * channels * sizeof (float));
298 #ifdef DEBUG_DUMP_DECODED_OGG
299       if (dump_to_stdout)
300       {
301 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
302 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
303 # define float2int(flt) ((int)(floor(.5+flt)))
304         int i;
305         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
306         for (i=0;i<(int)out_len*channels;i++)
307           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
308
309         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
310       }
311       else
312 #endif
313       if (pa_stream_write
314           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
315           PA_SEEK_RELATIVE) < 0)
316       {
317         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
318                     _("pa_stream_write() failed: %s\n"),
319                     pa_strerror (pa_context_errno (context)));
320       }
321       sampout += wrote;
322       maxout -= wrote;
323     }
324   } while (0 < frame_size && 0 < maxout);
325
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Wrote %" PRId64 " samples\n",
328               sampout);
329   return sampout;
330 }
331
332
333 /**
334  * Pulseaudio shutdown task
335  */
336 static void
337 quit (int ret)
338 {
339   mainloop_api->quit (mainloop_api, ret);
340   exit (ret);
341 }
342
343
344 static void
345 ogg_demux_and_decode ()
346 {
347   ogg_page og;
348   static int stream_init;
349   int64_t page_granule = 0;
350   ogg_packet op;
351   static int has_opus_stream;
352   static int has_tags_packet;
353   static int32_t opus_serialno;
354   static int64_t link_out;
355   static int64_t packet_count;
356   int eos = 0;
357   static int total_links;
358   static int gran_offset;
359
360   while (1 == ogg_sync_pageout (&oy, &og))
361   {
362     if (0 == stream_init)
363     {
364       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                   "Initialized the stream\n");
366       ogg_stream_init (&os, ogg_page_serialno (&og));
367       stream_init = 1;
368     }
369     if (ogg_page_serialno (&og) != os.serialno)
370     {
371       /* so all streams are read. */
372       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
373                   "Re-set serial number\n");
374       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
375     }
376     /*Add page to the bitstream*/
377     ogg_stream_pagein (&os, &og);
378     page_granule = ogg_page_granulepos (&og);
379     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380                 "Reading page that ends at %" PRId64 "\n",
381                 page_granule);
382     /*Extract all available packets*/
383     while (1 == ogg_stream_packetout (&os, &op))
384     {
385       /*OggOpus streams are identified by a magic string in the initial
386         stream header.*/
387       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
388       {
389         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390                     "Got Opus Header\n");
391         if (has_opus_stream && has_tags_packet)
392         {
393           /*If we're seeing another BOS OpusHead now it means
394             the stream is chained without an EOS.
395             This can easily happen if record helper is terminated unexpectedly.
396            */
397           has_opus_stream = 0;
398           if (dec)
399             opus_decoder_destroy (dec);
400           dec = NULL;
401           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
402         }
403         if (!has_opus_stream)
404         {
405           if (packet_count > 0 && opus_serialno == os.serialno)
406           {
407             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
408               (int64_t) opus_serialno, (int64_t) os.serialno);
409             quit(1);
410           }
411           opus_serialno = os.serialno;
412           has_opus_stream = 1;
413           has_tags_packet = 0;
414           link_out = 0;
415           packet_count = 0;
416           eos = 0;
417           total_links++;
418           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419                       "Got header for stream %" PRId64 ", this is %dth link\n",
420                       (int64_t) opus_serialno, total_links);
421         }
422         else
423         {
424           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
425         }
426       }
427       if (!has_opus_stream || os.serialno != opus_serialno)
428       {
429         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430                     "breaking out\n");
431         break;
432       }
433       /*If first packet in a logical stream, process the Opus header*/
434       if (0 == packet_count)
435       {
436         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437                     "Decoding header\n");
438         dec = process_header (&op);
439         if (!dec)
440            quit (1);
441
442         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
443         {
444           /*The format specifies that the initial header and tags packets are on their
445             own pages. To aid implementors in discovering that their files are wrong
446             we reject them explicitly here. In some player designs files like this would
447             fail even without an explicit test.*/
448           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
449           quit (1);
450         }
451
452         /*Remember how many samples at the front we were told to skip
453           so that we can adjust the timestamp counting.*/
454         gran_offset = preskip;
455
456         if (!pcm_buffer)
457         {
458           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459                   "Allocating %u * %u * %u = %u bytes of buffer space\n",
460                   MAX_FRAME_SIZE, channels, sizeof (float),
461                   MAX_FRAME_SIZE * channels * sizeof (float));
462           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
463         }
464       }
465       else if (1 == packet_count)
466       {
467         has_tags_packet = 1;
468         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
469         {
470           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
471           quit (1);
472         }
473       }
474       else
475       {
476         int ret;
477         int64_t maxout;
478         int64_t outsamp;
479
480         /*End of stream condition*/
481         if (op.e_o_s && os.serialno == opus_serialno)
482         {
483           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
484                       "Got EOS\n");
485           eos = 1; /* don't care for anything except opus eos */
486         }
487
488         /*Decode Opus packet*/
489         ret = opus_decode_float (dec,
490                                  (const unsigned char *) op.packet,
491                                  op.bytes,
492                                  pcm_buffer,
493                                  MAX_FRAME_SIZE, 0);
494
495         /*If the decoder returned less than zero, we have an error.*/
496         if (0 > ret)
497         {
498           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
499           break;
500         }
501         frame_size = ret;
502         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
504                     ret, ret * channels, op.bytes);
505
506         /*Apply header gain, if we're not using an opus library new
507           enough to do this internally.*/
508         if (0 != gain)
509         {
510           int i;
511           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512                       "Applying gain %f\n",
513                       gain);
514           for (i = 0; i < frame_size * channels; i++)
515             pcm_buffer[i] *= gain;
516         }
517
518         /*This handles making sure that our output duration respects
519           the final end-trim by not letting the output sample count
520           get ahead of the granpos indicated value.*/
521         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
522         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
524                     packet_count, maxout);
525
526         outsamp = audio_write (0 > maxout ? 0 : maxout);
527         link_out += outsamp;
528       }
529       packet_count++;
530     }
531     if (eos)
532     {
533       has_opus_stream = 0;
534       if (dec)
535         opus_decoder_destroy (dec);
536       dec = NULL;
537     }
538   }
539 }
540
541 /**
542  * Message callback
543  */
544 static int
545 stdin_receiver (void *cls,
546                 void *client,
547                 const struct GNUNET_MessageHeader *msg)
548 {
549   struct AudioMessage *audio;
550   char *data;
551   size_t payload_len;
552
553   switch (ntohs (msg->type))
554   {
555   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
556     audio = (struct AudioMessage *) msg;
557     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
558
559     /*Get the ogg buffer for writing*/
560     data = ogg_sync_buffer (&oy, payload_len);
561     /*Read bitstream from input file*/
562     memcpy (data, (const unsigned char *) &audio[1], payload_len);
563     ogg_sync_wrote (&oy, payload_len);
564
565     ogg_demux_and_decode ();
566     break;
567   default:
568     break;
569   }
570   return GNUNET_OK;
571 }
572
573
574 /**
575  * Callback when data is there for playback
576  */
577 static void
578 stream_write_callback (pa_stream *s,
579                        size_t length,
580                        void *userdata)
581 {
582   /* unblock 'main' */
583   if (-1 != ready_pipe[1])
584   {
585     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                 "Unblocking main loop!\n");
587     (void) write (ready_pipe[1], "r", 1);
588   }
589 }
590
591
592 /**
593  * Exit callback for SIGTERM and SIGINT
594  */
595 static void
596 exit_signal_callback (pa_mainloop_api *m,
597                       pa_signal_event *e,
598                       int sig,
599                       void *userdata)
600 {
601   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
603   quit (1);
604 }
605
606
607 /**
608  * Pulseaudio stream state callback
609  */
610 static void
611 context_state_callback (pa_context *c,
612                         void *userdata)
613 {
614   int p;
615
616   GNUNET_assert (NULL != c);
617   switch (pa_context_get_state (c))
618   {
619   case PA_CONTEXT_CONNECTING:
620   case PA_CONTEXT_AUTHORIZING:
621   case PA_CONTEXT_SETTING_NAME:
622     break;
623   case PA_CONTEXT_READY:
624   {
625     GNUNET_assert (! stream_out);
626     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627                 _("Connection established.\n"));
628     if (! (stream_out =
629            pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
630     {
631       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
632                   _("pa_stream_new() failed: %s\n"),
633                   pa_strerror (pa_context_errno (c)));
634       goto fail;
635     }
636     pa_stream_set_write_callback (stream_out,
637                                   &stream_write_callback,
638                                   NULL);
639     if ((p =
640          pa_stream_connect_playback (stream_out, NULL,
641                                      NULL,
642                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
643                                      NULL,  NULL)) < 0)
644     {
645       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
646                   _("pa_stream_connect_playback() failed: %s\n"),
647                   pa_strerror (pa_context_errno (c)));
648       goto fail;
649     }
650     break;
651   }
652   case PA_CONTEXT_TERMINATED:
653     quit (0);
654     break;
655
656   case PA_CONTEXT_FAILED:
657   default:
658     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
659                 _("Connection failure: %s\n"),
660                 pa_strerror (pa_context_errno (c)));
661     goto fail;
662   }
663   return;
664  fail:
665   quit (1);
666 }
667
668
669 /**
670  * Pulseaudio initialization
671  */
672 static void
673 pa_init ()
674 {
675   int r;
676
677   if (!pa_sample_spec_valid (&sample_spec))
678   {
679     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
680                 _("Wrong Spec\n"));
681   }
682   /* set up threaded playback mainloop */
683   if (!(m = pa_threaded_mainloop_new ()))
684   {
685     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
686                 _("pa_mainloop_new() failed.\n"));
687   }
688   mainloop_api = pa_threaded_mainloop_get_api (m);
689   /* listen to signals */
690   r = pa_signal_init (mainloop_api);
691   GNUNET_assert (r == 0);
692   pa_signal_new (SIGINT, exit_signal_callback, NULL);
693   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
694
695
696   /* connect to the main pulseaudio context */
697   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
698   {
699     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
700                 _("pa_context_new() failed.\n"));
701   }
702   pa_context_set_state_callback (context, context_state_callback, NULL);
703
704   if (pa_context_connect (context, NULL, 0, NULL) < 0)
705   {
706     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707                 _("pa_context_connect() failed: %s\n"),
708                 pa_strerror (pa_context_errno (context)));
709   }
710   if (pa_threaded_mainloop_start (m) < 0)
711   {
712     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
713                 _("pa_mainloop_run() failed.\n"));
714   }
715 }
716
717
718 static void
719 ogg_init ()
720 {
721   ogg_sync_init (&oy);
722 }
723
724 static void
725 drain_callback (pa_stream*s, int success, void *userdata)
726 {
727   pa_threaded_mainloop_signal (m, 0);
728 }
729
730 /**
731  * The main function for the playback helper.
732  *
733  * @param argc number of arguments from the command line
734  * @param argv command line arguments
735  * @return 0 ok, 1 on error
736  */
737 int
738 main (int argc, char *argv[])
739 {
740   static unsigned long long toff;
741
742   char readbuf[MAXLINE];
743   struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
744   char c;
745   ssize_t ret;
746 #ifdef DEBUG_READ_PURE_OGG
747   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
748 #endif
749
750   GNUNET_assert (GNUNET_OK ==
751                  GNUNET_log_setup ("gnunet-helper-audio-playback",
752                                    "WARNING",
753                                    NULL));
754   if (0 != pipe (ready_pipe))
755   {
756     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
757     return 1;
758   }
759   stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
760   ogg_init ();
761   pa_init ();
762   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763               "Waiting for PulseAudio to be ready.\n");
764   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
765   close (ready_pipe[0]);
766   close (ready_pipe[1]);
767   ready_pipe[0] = -1;
768   ready_pipe[1] = -1;
769 #ifdef DEBUG_DUMP_DECODED_OGG
770   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
771 #endif
772   while (1)
773   {
774     ret = read (0, readbuf, sizeof (readbuf));
775     toff += ret;
776     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777                 "Received %d bytes of audio data (total: %llu)\n",
778                 (int) ret,
779                 toff);
780     if (0 > ret)
781     {
782       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
783                   _("Read error from STDIN: %s\n"),
784                   strerror (errno));
785       break;
786     }
787     if (0 == ret)
788       break;
789 #ifdef DEBUG_READ_PURE_OGG
790     if (read_pure_ogg)
791     {
792       char *data = ogg_sync_buffer (&oy, ret);
793       memcpy (data, readbuf, ret);
794       ogg_sync_wrote (&oy, ret);
795       ogg_demux_and_decode ();
796     }
797     else
798 #endif
799     GNUNET_SERVER_mst_receive (stdin_mst, NULL,
800                                readbuf, ret,
801                                GNUNET_NO, GNUNET_NO);
802   }
803   GNUNET_SERVER_mst_destroy (stdin_mst);
804   if (stream_out)
805   {
806     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807                 "Locking\n");
808     pa_threaded_mainloop_lock (m);
809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810                 "Draining\n");
811     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
812     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
813     {
814       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815                   "Waiting\n");
816       pa_threaded_mainloop_wait (m);
817     }
818     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819                 "Unreffing\n");
820     pa_operation_unref (o);
821     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822                 "Unlocking\n");
823     pa_threaded_mainloop_unlock (m);
824   }
825   return 0;
826 }