-add reverse autoadd; with test
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file conversation/gnunet-helper-audio-playback.c
22  * @brief program to playback audio data to the speaker
23  * @author Siomon Dieterle
24  * @author Andreas Fuchs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
41 #include <ogg/ogg.h>
42
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
45
46 #define MAXLINE 4096
47
48 #define SAMPLING_RATE 48000
49
50 #define CHANNELS 1
51
52 /* 120ms at 48000 */
53 #define MAX_FRAME_SIZE (960 * 6)
54
55 /**
56  * Pulseaudio specification. May change in the future.
57  */
58 static pa_sample_spec sample_spec = {
59   .format = PA_SAMPLE_FLOAT32LE,
60   .rate = SAMPLING_RATE,
61   .channels = CHANNELS
62 };
63
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
66 #endif
67
68 /**
69  * Pulseaudio mainloop api
70  */
71 static pa_mainloop_api *mainloop_api;
72
73 /**
74  * Pulseaudio threaded mainloop
75  */
76 static pa_threaded_mainloop *m;
77
78 /**
79  * Pulseaudio context
80  */
81 static pa_context *context;
82
83 /**
84  * Pulseaudio output stream
85  */
86 static pa_stream *stream_out;
87
88 /**
89  * OPUS decoder
90  */
91 static OpusDecoder *dec;
92
93 /**
94  * PCM data buffer
95  */
96 static float *pcm_buffer;
97
98 /**
99  * Number of samples for one frame
100  */
101 static int frame_size;
102
103 /**
104  * Pipe we use to signal the main loop that we are ready to receive.
105  */
106 static int ready_pipe[2];
107
108 /**
109  * Ogg I/O state.
110  */
111 static ogg_sync_state oy;
112
113 /**
114  * Ogg stream state.
115  */
116 static ogg_stream_state os;
117
118 static int channels;
119
120 static int preskip;
121
122 static float gain;
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
128 {
129   uint8_t magic[8];
130   uint8_t version;
131   uint8_t channels;
132   uint16_t preskip GNUNET_PACKED;
133   uint32_t sampling_rate GNUNET_PACKED;
134   uint16_t gain GNUNET_PACKED;
135   uint8_t channel_mapping;
136 };
137
138 GNUNET_NETWORK_STRUCT_END
139
140 /**
141  * Process an Opus header and setup the opus decoder based on it.
142  * It takes several pointers for header values which are needed
143  * elsewhere in the code.
144  */
145 static OpusDecoder *
146 process_header (ogg_packet *op)
147 {
148   int err;
149   OpusDecoder *dec;
150   struct OpusHeadPacket header;
151
152   if (op->bytes < sizeof (header))
153     return NULL;
154   GNUNET_memcpy (&header, op->packet, sizeof (header));
155   header.preskip = GNUNET_le16toh (header.preskip);
156   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157   header.gain = GNUNET_le16toh (header.gain);
158
159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161                header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
162
163   channels = header.channels;
164   preskip = header.preskip;
165
166   if (header.channel_mapping != 0)
167   {
168     fprintf (stderr,
169              "This implementation does not support non-mono streams\n");
170     return NULL;
171   }
172
173   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
174   if (OPUS_OK != err)
175   {
176     fprintf (stderr,
177              "Cannot create encoder: %s\n",
178              opus_strerror (err));
179     return NULL;
180   }
181   if (! dec)
182   {
183     fprintf (stderr,
184              "Decoder initialization failed: %s\n",
185              opus_strerror (err));
186     return NULL;
187   }
188
189   if (0 != header.gain)
190   {
191     /*Gain API added in a newer libopus version, if we don't have it
192       we apply the gain ourselves. We also add in a user provided
193       manual gain at the same time.*/
194     int gainadj = (int) header.gain;
195     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196     if(OPUS_UNIMPLEMENTED == err)
197     {
198       gain = pow (10.0, gainadj / 5120.0);
199     }
200     else if (OPUS_OK != err)
201     {
202       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
203       return NULL;
204     }
205   }
206
207   return dec;
208 }
209
210
211 #ifdef DEBUG_DUMP_DECODED_OGG
212 static size_t
213 fwrite_le32(opus_int32 i32, FILE *file)
214 {
215    unsigned char buf[4];
216    buf[0]=(unsigned char)(i32&0xFF);
217    buf[1]=(unsigned char)(i32>>8&0xFF);
218    buf[2]=(unsigned char)(i32>>16&0xFF);
219    buf[3]=(unsigned char)(i32>>24&0xFF);
220    return fwrite(buf,4,1,file);
221 }
222
223
224 static size_t
225 fwrite_le16(int i16, FILE *file)
226 {
227    unsigned char buf[2];
228    buf[0]=(unsigned char)(i16&0xFF);
229    buf[1]=(unsigned char)(i16>>8&0xFF);
230    return fwrite(buf,2,1,file);
231 }
232
233
234 static int
235 write_wav_header()
236 {
237    int ret;
238    FILE *file = stdout;
239
240    ret = fprintf (file, "RIFF") >= 0;
241    ret &= fwrite_le32 (0x7fffffff, file);
242
243    ret &= fprintf (file, "WAVEfmt ") >= 0;
244    ret &= fwrite_le32 (16, file);
245    ret &= fwrite_le16 (1, file);
246    ret &= fwrite_le16 (channels, file);
247    ret &= fwrite_le32 (SAMPLING_RATE, file);
248    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249    ret &= fwrite_le16 (2*channels, file);
250    ret &= fwrite_le16 (16, file);
251
252    ret &= fprintf (file, "data") >= 0;
253    ret &= fwrite_le32 (0x7fffffff, file);
254
255    return !ret ? -1 : 16;
256 }
257
258 #endif
259
260
261 static int64_t
262 audio_write (int64_t maxout)
263 {
264   int64_t sampout = 0;
265   int tmp_skip;
266   unsigned out_len;
267   unsigned to_write;
268   float *output;
269 #ifdef DEBUG_DUMP_DECODED_OGG
270   static int wrote_wav_header;
271
272   if (dump_to_stdout && !wrote_wav_header)
273   {
274     write_wav_header ();
275     wrote_wav_header = 1;
276   }
277 #endif
278   maxout = 0 > maxout ? 0 : maxout;
279   do
280   {
281     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
282     preskip -= tmp_skip;
283     output = pcm_buffer + channels * tmp_skip;
284     out_len = frame_size - tmp_skip;
285     if (out_len > MAX_FRAME_SIZE)
286       exit (6);
287     frame_size = 0;
288
289     to_write = out_len < maxout ? out_len : (unsigned) maxout;
290     if (0 < maxout)
291     {
292       int64_t wrote = 0;
293       wrote = to_write;
294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295                   "Writing %u * %u * %u = %llu bytes into PA\n",
296                   to_write,
297                   channels,
298                   (unsigned int) sizeof (float),
299                   (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
301       if (dump_to_stdout)
302       {
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
306         int i;
307         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
308         for (i=0;i<(int)out_len*channels;i++)
309           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
310
311         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
312       }
313       else
314 #endif
315       if (pa_stream_write
316           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317           PA_SEEK_RELATIVE) < 0)
318       {
319         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320                     _("pa_stream_write() failed: %s\n"),
321                     pa_strerror (pa_context_errno (context)));
322       }
323       sampout += wrote;
324       maxout -= wrote;
325     }
326   } while (0 < frame_size && 0 < maxout);
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Wrote %" PRId64 " samples\n",
330               sampout);
331   return sampout;
332 }
333
334
335 /**
336  * Pulseaudio shutdown task
337  */
338 static void
339 quit (int ret)
340 {
341   mainloop_api->quit (mainloop_api, ret);
342   exit (ret);
343 }
344
345
346 static void
347 ogg_demux_and_decode ()
348 {
349   ogg_page og;
350   static int stream_init;
351   int64_t page_granule = 0;
352   ogg_packet op;
353   static int has_opus_stream;
354   static int has_tags_packet;
355   static int32_t opus_serialno;
356   static int64_t link_out;
357   static int64_t packet_count;
358   int eos = 0;
359   static int total_links;
360   static int gran_offset;
361
362   while (1 == ogg_sync_pageout (&oy, &og))
363   {
364     if (0 == stream_init)
365     {
366       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367                   "Initialized the stream\n");
368       ogg_stream_init (&os, ogg_page_serialno (&og));
369       stream_init = 1;
370     }
371     if (ogg_page_serialno (&og) != os.serialno)
372     {
373       /* so all streams are read. */
374       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375                   "Re-set serial number\n");
376       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
377     }
378     /*Add page to the bitstream*/
379     ogg_stream_pagein (&os, &og);
380     page_granule = ogg_page_granulepos (&og);
381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382                 "Reading page that ends at %" PRId64 "\n",
383                 page_granule);
384     /*Extract all available packets*/
385     while (1 == ogg_stream_packetout (&os, &op))
386     {
387       /*OggOpus streams are identified by a magic string in the initial
388         stream header.*/
389       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
390       {
391         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                     "Got Opus Header\n");
393         if (has_opus_stream && has_tags_packet)
394         {
395           /*If we're seeing another BOS OpusHead now it means
396             the stream is chained without an EOS.
397             This can easily happen if record helper is terminated unexpectedly.
398            */
399           has_opus_stream = 0;
400           if (dec)
401             opus_decoder_destroy (dec);
402           dec = NULL;
403           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
404         }
405         if (!has_opus_stream)
406         {
407           if (packet_count > 0 && opus_serialno == os.serialno)
408           {
409             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
410               (int64_t) opus_serialno, (int64_t) os.serialno);
411             quit(1);
412           }
413           opus_serialno = os.serialno;
414           has_opus_stream = 1;
415           has_tags_packet = 0;
416           link_out = 0;
417           packet_count = 0;
418           eos = 0;
419           total_links++;
420           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421                       "Got header for stream %" PRId64 ", this is %dth link\n",
422                       (int64_t) opus_serialno, total_links);
423         }
424         else
425         {
426           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
427         }
428       }
429       if (!has_opus_stream || os.serialno != opus_serialno)
430       {
431         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432                     "breaking out\n");
433         break;
434       }
435       /*If first packet in a logical stream, process the Opus header*/
436       if (0 == packet_count)
437       {
438         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439                     "Decoding header\n");
440         dec = process_header (&op);
441         if (!dec)
442            quit (1);
443
444         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
445         {
446           /*The format specifies that the initial header and tags packets are on their
447             own pages. To aid implementors in discovering that their files are wrong
448             we reject them explicitly here. In some player designs files like this would
449             fail even without an explicit test.*/
450           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
451           quit (1);
452         }
453
454         /*Remember how many samples at the front we were told to skip
455           so that we can adjust the timestamp counting.*/
456         gran_offset = preskip;
457
458         if (! pcm_buffer)
459         {
460           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461                       "Allocating %u * %u * %u = %llu bytes of buffer space\n",
462                       MAX_FRAME_SIZE,
463                       channels,
464                       (unsigned int) sizeof (float),
465                       (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
466           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
467         }
468       }
469       else if (1 == packet_count)
470       {
471         has_tags_packet = 1;
472         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
473         {
474           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
475           quit (1);
476         }
477       }
478       else
479       {
480         int ret;
481         int64_t maxout;
482         int64_t outsamp;
483
484         /*End of stream condition*/
485         if (op.e_o_s && os.serialno == opus_serialno)
486         {
487           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488                       "Got EOS\n");
489           eos = 1; /* don't care for anything except opus eos */
490         }
491
492         /*Decode Opus packet*/
493         ret = opus_decode_float (dec,
494                                  (const unsigned char *) op.packet,
495                                  op.bytes,
496                                  pcm_buffer,
497                                  MAX_FRAME_SIZE, 0);
498
499         /*If the decoder returned less than zero, we have an error.*/
500         if (0 > ret)
501         {
502           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
503           break;
504         }
505         frame_size = ret;
506         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
508                     ret,
509                     ret * channels,
510                     (unsigned int) op.bytes);
511
512         /*Apply header gain, if we're not using an opus library new
513           enough to do this internally.*/
514         if (0 != gain)
515         {
516           int i;
517           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                       "Applying gain %f\n",
519                       gain);
520           for (i = 0; i < frame_size * channels; i++)
521             pcm_buffer[i] *= gain;
522         }
523
524         /*This handles making sure that our output duration respects
525           the final end-trim by not letting the output sample count
526           get ahead of the granpos indicated value.*/
527         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
528         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
530                     packet_count, maxout);
531
532         outsamp = audio_write (0 > maxout ? 0 : maxout);
533         link_out += outsamp;
534       }
535       packet_count++;
536     }
537     if (eos)
538     {
539       has_opus_stream = 0;
540       if (dec)
541         opus_decoder_destroy (dec);
542       dec = NULL;
543     }
544   }
545 }
546
547 /**
548  * Message callback
549  */
550 static int
551 stdin_receiver (void *cls,
552                 void *client,
553                 const struct GNUNET_MessageHeader *msg)
554 {
555   struct AudioMessage *audio;
556   char *data;
557   size_t payload_len;
558
559   switch (ntohs (msg->type))
560   {
561   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
562     audio = (struct AudioMessage *) msg;
563     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
564
565     /*Get the ogg buffer for writing*/
566     data = ogg_sync_buffer (&oy, payload_len);
567     /*Read bitstream from input file*/
568     GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
569     ogg_sync_wrote (&oy, payload_len);
570
571     ogg_demux_and_decode ();
572     break;
573   default:
574     break;
575   }
576   return GNUNET_OK;
577 }
578
579
580 /**
581  * Callback when data is there for playback
582  */
583 static void
584 stream_write_callback (pa_stream *s,
585                        size_t length,
586                        void *userdata)
587 {
588   /* unblock 'main' */
589   if (-1 != ready_pipe[1])
590   {
591     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592                 "Unblocking main loop!\n");
593     (void) write (ready_pipe[1], "r", 1);
594   }
595 }
596
597
598 /**
599  * Exit callback for SIGTERM and SIGINT
600  */
601 static void
602 exit_signal_callback (pa_mainloop_api *m,
603                       pa_signal_event *e,
604                       int sig,
605                       void *userdata)
606 {
607   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
608               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
609   quit (1);
610 }
611
612
613 /**
614  * Pulseaudio stream state callback
615  */
616 static void
617 context_state_callback (pa_context *c,
618                         void *userdata)
619 {
620   int p;
621
622   GNUNET_assert (NULL != c);
623   switch (pa_context_get_state (c))
624   {
625   case PA_CONTEXT_CONNECTING:
626   case PA_CONTEXT_AUTHORIZING:
627   case PA_CONTEXT_SETTING_NAME:
628     break;
629   case PA_CONTEXT_READY:
630   {
631     GNUNET_assert (! stream_out);
632     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633                 _("Connection established.\n"));
634     if (! (stream_out =
635            pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
636     {
637       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
638                   _("pa_stream_new() failed: %s\n"),
639                   pa_strerror (pa_context_errno (c)));
640       goto fail;
641     }
642     pa_stream_set_write_callback (stream_out,
643                                   &stream_write_callback,
644                                   NULL);
645     if ((p =
646          pa_stream_connect_playback (stream_out, NULL,
647                                      NULL,
648                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
649                                      NULL,  NULL)) < 0)
650     {
651       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
652                   _("pa_stream_connect_playback() failed: %s\n"),
653                   pa_strerror (pa_context_errno (c)));
654       goto fail;
655     }
656     break;
657   }
658   case PA_CONTEXT_TERMINATED:
659     quit (0);
660     break;
661
662   case PA_CONTEXT_FAILED:
663   default:
664     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
665                 _("Connection failure: %s\n"),
666                 pa_strerror (pa_context_errno (c)));
667     goto fail;
668   }
669   return;
670  fail:
671   quit (1);
672 }
673
674
675 /**
676  * Pulseaudio initialization
677  */
678 static void
679 pa_init ()
680 {
681   int r;
682
683   if (!pa_sample_spec_valid (&sample_spec))
684   {
685     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
686                 _("Wrong Spec\n"));
687   }
688   /* set up threaded playback mainloop */
689   if (!(m = pa_threaded_mainloop_new ()))
690   {
691     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
692                 _("pa_mainloop_new() failed.\n"));
693   }
694   mainloop_api = pa_threaded_mainloop_get_api (m);
695   /* listen to signals */
696   r = pa_signal_init (mainloop_api);
697   GNUNET_assert (r == 0);
698   pa_signal_new (SIGINT, exit_signal_callback, NULL);
699   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
700
701
702   /* connect to the main pulseaudio context */
703   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
704   {
705     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
706                 _("pa_context_new() failed.\n"));
707   }
708   pa_context_set_state_callback (context, context_state_callback, NULL);
709
710   if (pa_context_connect (context, NULL, 0, NULL) < 0)
711   {
712     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
713                 _("pa_context_connect() failed: %s\n"),
714                 pa_strerror (pa_context_errno (context)));
715   }
716   if (pa_threaded_mainloop_start (m) < 0)
717   {
718     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
719                 _("pa_mainloop_run() failed.\n"));
720   }
721 }
722
723
724 static void
725 ogg_init ()
726 {
727   ogg_sync_init (&oy);
728 }
729
730 static void
731 drain_callback (pa_stream*s, int success, void *userdata)
732 {
733   pa_threaded_mainloop_signal (m, 0);
734 }
735
736 /**
737  * The main function for the playback helper.
738  *
739  * @param argc number of arguments from the command line
740  * @param argv command line arguments
741  * @return 0 ok, 1 on error
742  */
743 int
744 main (int argc, char *argv[])
745 {
746   static unsigned long long toff;
747
748   char readbuf[MAXLINE];
749   struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
750   char c;
751   ssize_t ret;
752 #ifdef DEBUG_READ_PURE_OGG
753   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
754 #endif
755
756   GNUNET_assert (GNUNET_OK ==
757                  GNUNET_log_setup ("gnunet-helper-audio-playback",
758                                    "WARNING",
759                                    NULL));
760   if (0 != pipe (ready_pipe))
761   {
762     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
763     return 1;
764   }
765   stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
766   ogg_init ();
767   pa_init ();
768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769               "Waiting for PulseAudio to be ready.\n");
770   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
771   close (ready_pipe[0]);
772   close (ready_pipe[1]);
773   ready_pipe[0] = -1;
774   ready_pipe[1] = -1;
775 #ifdef DEBUG_DUMP_DECODED_OGG
776   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
777 #endif
778   while (1)
779   {
780     ret = read (0, readbuf, sizeof (readbuf));
781     toff += ret;
782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783                 "Received %d bytes of audio data (total: %llu)\n",
784                 (int) ret,
785                 toff);
786     if (0 > ret)
787     {
788       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
789                   _("Read error from STDIN: %s\n"),
790                   strerror (errno));
791       break;
792     }
793     if (0 == ret)
794       break;
795 #ifdef DEBUG_READ_PURE_OGG
796     if (read_pure_ogg)
797     {
798       char *data = ogg_sync_buffer (&oy, ret);
799       GNUNET_memcpy (data, readbuf, ret);
800       ogg_sync_wrote (&oy, ret);
801       ogg_demux_and_decode ();
802     }
803     else
804 #endif
805     GNUNET_SERVER_mst_receive (stdin_mst, NULL,
806                                readbuf, ret,
807                                GNUNET_NO, GNUNET_NO);
808   }
809   GNUNET_SERVER_mst_destroy (stdin_mst);
810   if (stream_out)
811   {
812     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                 "Locking\n");
814     pa_threaded_mainloop_lock (m);
815     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
816                 "Draining\n");
817     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
818     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821                   "Waiting\n");
822       pa_threaded_mainloop_wait (m);
823     }
824     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
825                 "Unreffing\n");
826     pa_operation_unref (o);
827     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828                 "Unlocking\n");
829     pa_threaded_mainloop_unlock (m);
830   }
831   return 0;
832 }