-fix crash observed on FreeBSD
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file conversation/gnunet-helper-audio-playback.c
22  * @brief program to playback audio data to the speaker
23  * @author Siomon Dieterle
24  * @author Andreas Fuchs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
41 #include <ogg/ogg.h>
42
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
45
46 #define MAXLINE 4096
47
48 #define SAMPLING_RATE 48000
49
50 #define CHANNELS 1
51
52 /* 120ms at 48000 */
53 #define MAX_FRAME_SIZE (960 * 6)
54
55 /**
56  * Pulseaudio specification. May change in the future.
57  */
58 static pa_sample_spec sample_spec = {
59   .format = PA_SAMPLE_FLOAT32LE,
60   .rate = SAMPLING_RATE,
61   .channels = CHANNELS
62 };
63
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
66 #endif
67
68 /**
69  * Pulseaudio mainloop api
70  */
71 static pa_mainloop_api *mainloop_api;
72
73 /**
74  * Pulseaudio threaded mainloop
75  */
76 static pa_threaded_mainloop *m;
77
78 /**
79  * Pulseaudio context
80  */
81 static pa_context *context;
82
83 /**
84  * Pulseaudio output stream
85  */
86 static pa_stream *stream_out;
87
88 /**
89  * OPUS decoder
90  */
91 static OpusDecoder *dec;
92
93 /**
94  * PCM data buffer
95  */
96 static float *pcm_buffer;
97
98 /**
99  * Number of samples for one frame
100  */
101 static int frame_size;
102
103 /**
104  * Pipe we use to signal the main loop that we are ready to receive.
105  */
106 static int ready_pipe[2];
107
108 /**
109  * Ogg I/O state.
110  */
111 static ogg_sync_state oy;
112
113 /**
114  * Ogg stream state.
115  */
116 static ogg_stream_state os;
117
118 static int channels;
119
120 static int preskip;
121
122 static float gain;
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
128 {
129   uint8_t magic[8];
130   uint8_t version;
131   uint8_t channels;
132   uint16_t preskip GNUNET_PACKED;
133   uint32_t sampling_rate GNUNET_PACKED;
134   uint16_t gain GNUNET_PACKED;
135   uint8_t channel_mapping;
136 };
137
138 GNUNET_NETWORK_STRUCT_END
139
140 /*Process an Opus header and setup the opus decoder based on it.
141   It takes several pointers for header values which are needed
142   elsewhere in the code.*/
143 static OpusDecoder *
144 process_header (ogg_packet *op)
145 {
146   int err;
147   OpusDecoder *dec;
148   struct OpusHeadPacket header;
149
150   if (op->bytes < sizeof (header))
151     return NULL;
152   memcpy (&header, op->packet, sizeof (header));
153   header.preskip = GNUNET_le16toh (header.preskip);
154   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
155   header.gain = GNUNET_le16toh (header.gain);
156
157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
158               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
159                header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
160
161   channels = header.channels;
162   preskip = header.preskip;
163
164   if (header.channel_mapping != 0)
165   {
166     fprintf (stderr, "This implementation does not support non-mono streams\n");
167     return NULL;
168   }
169
170   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
171   if (OPUS_OK != err)
172   {
173     fprintf (stderr, "Cannot create encoder: %s\n", opus_strerror (err));
174     return NULL;
175   }
176   if (!dec)
177   {
178     fprintf (stderr, "Decoder initialization failed: %s\n", opus_strerror (err));
179     return NULL;
180   }
181
182   if (0 != header.gain)
183   {
184     /*Gain API added in a newer libopus version, if we don't have it
185       we apply the gain ourselves. We also add in a user provided
186       manual gain at the same time.*/
187     int gainadj = (int) header.gain;
188     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
189     if(OPUS_UNIMPLEMENTED == err)
190     {
191       gain = pow (10.0, gainadj / 5120.0);
192     }
193     else if (OPUS_OK != err)
194     {
195       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
196       return NULL;
197     }
198   }
199
200   return dec;
201 }
202
203
204 #ifdef DEBUG_DUMP_DECODED_OGG
205 static size_t fwrite_le32(opus_int32 i32, FILE *file)
206 {
207    unsigned char buf[4];
208    buf[0]=(unsigned char)(i32&0xFF);
209    buf[1]=(unsigned char)(i32>>8&0xFF);
210    buf[2]=(unsigned char)(i32>>16&0xFF);
211    buf[3]=(unsigned char)(i32>>24&0xFF);
212    return fwrite(buf,4,1,file);
213 }
214
215 static size_t fwrite_le16(int i16, FILE *file)
216 {
217    unsigned char buf[2];
218    buf[0]=(unsigned char)(i16&0xFF);
219    buf[1]=(unsigned char)(i16>>8&0xFF);
220    return fwrite(buf,2,1,file);
221 }
222
223 static int write_wav_header()
224 {
225    int ret;
226    FILE *file = stdout;
227
228    ret = fprintf (file, "RIFF") >= 0;
229    ret &= fwrite_le32 (0x7fffffff, file);
230
231    ret &= fprintf (file, "WAVEfmt ") >= 0;
232    ret &= fwrite_le32 (16, file);
233    ret &= fwrite_le16 (1, file);
234    ret &= fwrite_le16 (channels, file);
235    ret &= fwrite_le32 (SAMPLING_RATE, file);
236    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
237    ret &= fwrite_le16 (2*channels, file);
238    ret &= fwrite_le16 (16, file);
239
240    ret &= fprintf (file, "data") >= 0;
241    ret &= fwrite_le32 (0x7fffffff, file);
242
243    return !ret ? -1 : 16;
244 }
245
246 #endif
247
248 static int64_t
249 audio_write (int64_t maxout)
250 {
251   int64_t sampout = 0;
252   int tmp_skip;
253   unsigned out_len;
254   unsigned to_write;
255   float *output;
256 #ifdef DEBUG_DUMP_DECODED_OGG
257   static int wrote_wav_header;
258
259   if (dump_to_stdout && !wrote_wav_header)
260   {
261     write_wav_header ();
262     wrote_wav_header = 1;
263   }
264 #endif
265   maxout = 0 > maxout ? 0 : maxout;
266   do
267   {
268     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
269     preskip -= tmp_skip;
270     output = pcm_buffer + channels * tmp_skip;
271     out_len = frame_size - tmp_skip;
272     if (out_len > MAX_FRAME_SIZE)
273       exit (6);
274     frame_size = 0;
275
276     to_write = out_len < maxout ? out_len : (unsigned) maxout;
277     if (0 < maxout)
278     {
279       int64_t wrote = 0;
280       wrote = to_write;
281       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282                   "Writing %u * %u * %u = %u bytes into PA\n",
283                   to_write, channels, sizeof (float),
284                   to_write * channels * sizeof (float));
285 #ifdef DEBUG_DUMP_DECODED_OGG
286       if (dump_to_stdout)
287       {
288 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
289 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
290 # define float2int(flt) ((int)(floor(.5+flt)))
291         int i;
292         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
293         for (i=0;i<(int)out_len*channels;i++)
294           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
295
296         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
297       }
298       else
299 #endif
300       if (pa_stream_write
301           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
302           PA_SEEK_RELATIVE) < 0)
303       {
304         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
305                     _("pa_stream_write() failed: %s\n"),
306                     pa_strerror (pa_context_errno (context)));
307       }
308       sampout += wrote;
309       maxout -= wrote;
310     }
311   } while (0 < frame_size && 0 < maxout);
312
313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
314               "Wrote %" PRId64 " samples\n",
315               sampout);
316   return sampout;
317 }
318
319
320 /**
321  * Pulseaudio shutdown task
322  */
323 static void
324 quit (int ret)
325 {
326   mainloop_api->quit (mainloop_api, ret);
327   exit (ret);
328 }
329
330
331 static void
332 ogg_demux_and_decode ()
333 {
334   ogg_page og;
335   static int stream_init;
336   int64_t page_granule = 0;
337   ogg_packet op;
338   static int has_opus_stream;
339   static int has_tags_packet;
340   static int32_t opus_serialno;
341   static int64_t link_out;
342   static int64_t packet_count;
343   int eos = 0;
344   static int total_links;
345   static int gran_offset;
346
347   while (1 == ogg_sync_pageout (&oy, &og))
348   {
349     if (0 == stream_init)
350     {
351       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352                   "Initialized the stream\n");
353       ogg_stream_init (&os, ogg_page_serialno (&og));
354       stream_init = 1;
355     }
356     if (ogg_page_serialno (&og) != os.serialno)
357     {
358       /* so all streams are read. */
359       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360                   "Re-set serial number\n");
361       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
362     }
363     /*Add page to the bitstream*/
364     ogg_stream_pagein (&os, &og);
365     page_granule = ogg_page_granulepos (&og);
366     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367                 "Reading page that ends at %" PRId64 "\n",
368                 page_granule);
369     /*Extract all available packets*/
370     while (1 == ogg_stream_packetout (&os, &op))
371     {
372       /*OggOpus streams are identified by a magic string in the initial
373         stream header.*/
374       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
375       {
376         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377                     "Got Opus Header\n");
378         if (has_opus_stream && has_tags_packet)
379         {
380           /*If we're seeing another BOS OpusHead now it means
381             the stream is chained without an EOS.
382             This can easily happen if record helper is terminated unexpectedly.
383            */
384           has_opus_stream = 0;
385           if (dec)
386             opus_decoder_destroy (dec);
387           dec = NULL;
388           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
389         }
390         if (!has_opus_stream)
391         {
392           if (packet_count > 0 && opus_serialno == os.serialno)
393           {
394             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
395               (int64_t) opus_serialno, (int64_t) os.serialno);
396             quit(1);
397           }
398           opus_serialno = os.serialno;
399           has_opus_stream = 1;
400           has_tags_packet = 0;
401           link_out = 0;
402           packet_count = 0;
403           eos = 0;
404           total_links++;
405           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406                       "Got header for stream %" PRId64 ", this is %dth link\n",
407                       (int64_t) opus_serialno, total_links);
408         }
409         else
410         {
411           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
412         }
413       }
414       if (!has_opus_stream || os.serialno != opus_serialno)
415       {
416         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                     "breaking out\n");
418         break;
419       }
420       /*If first packet in a logical stream, process the Opus header*/
421       if (0 == packet_count)
422       {
423         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424                     "Decoding header\n");
425         dec = process_header (&op);
426         if (!dec)
427            quit (1);
428
429         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
430         {
431           /*The format specifies that the initial header and tags packets are on their
432             own pages. To aid implementors in discovering that their files are wrong
433             we reject them explicitly here. In some player designs files like this would
434             fail even without an explicit test.*/
435           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
436           quit (1);
437         }
438
439         /*Remember how many samples at the front we were told to skip
440           so that we can adjust the timestamp counting.*/
441         gran_offset = preskip;
442
443         if (!pcm_buffer)
444         {
445           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446                   "Allocating %u * %u * %u = %u bytes of buffer space\n",
447                   MAX_FRAME_SIZE, channels, sizeof (float),
448                   MAX_FRAME_SIZE * channels * sizeof (float));
449           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
450         }
451       }
452       else if (1 == packet_count)
453       {
454         has_tags_packet = 1;
455         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
456         {
457           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
458           quit (1);
459         }
460       }
461       else
462       {
463         int ret;
464         int64_t maxout;
465         int64_t outsamp;
466
467         /*End of stream condition*/
468         if (op.e_o_s && os.serialno == opus_serialno)
469         {
470           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471                       "Got EOS\n");
472           eos = 1; /* don't care for anything except opus eos */
473         }
474
475         /*Decode Opus packet*/
476         ret = opus_decode_float (dec,
477                                  (const unsigned char *) op.packet,
478                                  op.bytes,
479                                  pcm_buffer,
480                                  MAX_FRAME_SIZE, 0);
481
482         /*If the decoder returned less than zero, we have an error.*/
483         if (0 > ret)
484         {
485           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
486           break;
487         }
488         frame_size = ret;
489         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
491                     ret, ret * channels, op.bytes);
492
493         /*Apply header gain, if we're not using an opus library new
494           enough to do this internally.*/
495         if (0 != gain)
496         {
497           int i;
498           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499                       "Applying gain %f\n",
500                       gain);
501           for (i = 0; i < frame_size * channels; i++)
502             pcm_buffer[i] *= gain;
503         }
504
505         /*This handles making sure that our output duration respects
506           the final end-trim by not letting the output sample count
507           get ahead of the granpos indicated value.*/
508         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
509         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
511                     packet_count, maxout);
512
513         outsamp = audio_write (0 > maxout ? 0 : maxout);
514         link_out += outsamp;
515       }
516       packet_count++;
517     }
518     if (eos)
519     {
520       has_opus_stream = 0;
521       if (dec)
522         opus_decoder_destroy (dec);
523       dec = NULL;
524     }
525   }
526 }
527
528 /**
529  * Message callback
530  */
531 static int
532 stdin_receiver (void *cls,
533                 void *client,
534                 const struct GNUNET_MessageHeader *msg)
535 {
536   struct AudioMessage *audio;
537   char *data;
538   size_t payload_len;
539
540   switch (ntohs (msg->type))
541   {
542   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
543     audio = (struct AudioMessage *) msg;
544     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
545
546     /*Get the ogg buffer for writing*/
547     data = ogg_sync_buffer (&oy, payload_len);
548     /*Read bitstream from input file*/
549     memcpy (data, (const unsigned char *) &audio[1], payload_len);
550     ogg_sync_wrote (&oy, payload_len);
551
552     ogg_demux_and_decode ();
553     break;
554   default:
555     break;
556   }
557   return GNUNET_OK;
558 }
559
560
561 /**
562  * Callback when data is there for playback
563  */
564 static void
565 stream_write_callback (pa_stream * s,
566                        size_t length,
567                        void *userdata)
568 {
569   /* unblock 'main' */
570   if (-1 != ready_pipe[1])
571   {
572     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573                 "Unblocking main loop!\n");
574     write (ready_pipe[1], "r", 1);
575   }
576 }
577
578
579 /**
580  * Exit callback for SIGTERM and SIGINT
581  */
582 static void
583 exit_signal_callback (pa_mainloop_api * m, pa_signal_event * e, int sig,
584                       void *userdata)
585 {
586   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
587               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
588   quit (1);
589 }
590
591
592 /**
593  * Pulseaudio stream state callback
594  */
595 static void
596 context_state_callback (pa_context * c,
597                         void *userdata)
598 {
599   int p;
600
601   GNUNET_assert (NULL != c);
602   switch (pa_context_get_state (c))
603   {
604   case PA_CONTEXT_CONNECTING:
605   case PA_CONTEXT_AUTHORIZING:
606   case PA_CONTEXT_SETTING_NAME:
607     break;
608   case PA_CONTEXT_READY:
609   {
610     GNUNET_assert (!stream_out);
611     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
612                 _("Connection established.\n"));
613     if (!(stream_out =
614           pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
615     {
616       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
617                   _("pa_stream_new() failed: %s\n"),
618                   pa_strerror (pa_context_errno (c)));
619       goto fail;
620     }
621     pa_stream_set_write_callback (stream_out,
622                                   &stream_write_callback,
623                                   NULL);
624     if ((p =
625          pa_stream_connect_playback (stream_out, NULL,
626                                      NULL,
627                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
628                                      NULL,  NULL)) < 0)
629     {
630       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
631                   _("pa_stream_connect_playback() failed: %s\n"),
632                   pa_strerror (pa_context_errno (c)));
633       goto fail;
634     }
635     break;
636   }
637   case PA_CONTEXT_TERMINATED:
638     quit (0);
639     break;
640
641   case PA_CONTEXT_FAILED:
642   default:
643     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
644                 _("Connection failure: %s\n"),
645                 pa_strerror (pa_context_errno (c)));
646     goto fail;
647   }
648   return;
649  fail:
650   quit (1);
651 }
652
653
654 /**
655  * Pulseaudio initialization
656  */
657 static void
658 pa_init ()
659 {
660   int r;
661
662   if (!pa_sample_spec_valid (&sample_spec))
663   {
664     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
665                 _("Wrong Spec\n"));
666   }
667   /* set up threaded playback mainloop */
668   if (!(m = pa_threaded_mainloop_new ()))
669   {
670     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671                 _("pa_mainloop_new() failed.\n"));
672   }
673   mainloop_api = pa_threaded_mainloop_get_api (m);
674   /* listen to signals */
675   r = pa_signal_init (mainloop_api);
676   GNUNET_assert (r == 0);
677   pa_signal_new (SIGINT, exit_signal_callback, NULL);
678   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
679
680
681   /* connect to the main pulseaudio context */
682   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
683   {
684     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
685                 _("pa_context_new() failed.\n"));
686   }
687   pa_context_set_state_callback (context, context_state_callback, NULL);
688
689   if (pa_context_connect (context, NULL, 0, NULL) < 0)
690   {
691     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
692                 _("pa_context_connect() failed: %s\n"),
693                 pa_strerror (pa_context_errno (context)));
694   }
695   if (pa_threaded_mainloop_start (m) < 0)
696   {
697     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
698                 _("pa_mainloop_run() failed.\n"));
699   }
700 }
701
702
703 static void
704 ogg_init ()
705 {
706   ogg_sync_init (&oy);
707 }
708
709 static void
710 drain_callback (pa_stream*s, int success, void *userdata)
711 {
712   pa_threaded_mainloop_signal (m, 0);
713 }
714
715 /**
716  * The main function for the playback helper.
717  *
718  * @param argc number of arguments from the command line
719  * @param argv command line arguments
720  * @return 0 ok, 1 on error
721  */
722 int
723 main (int argc, char *argv[])
724 {
725   static unsigned long long toff;
726
727   char readbuf[MAXLINE];
728   struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
729   char c;
730   ssize_t ret;
731 #ifdef DEBUG_READ_PURE_OGG
732   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
733 #endif
734
735   GNUNET_assert (GNUNET_OK ==
736                  GNUNET_log_setup ("gnunet-helper-audio-playback",
737                                    "WARNING",
738                                    NULL));
739   if (0 != pipe (ready_pipe))
740   {
741     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
742     return 1;
743   }
744   stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
745   ogg_init ();
746   pa_init ();
747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748               "Waiting for PulseAudio to be ready.\n");
749   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
750   close (ready_pipe[0]);
751   close (ready_pipe[1]);
752   ready_pipe[0] = -1;
753   ready_pipe[1] = -1;
754 #ifdef DEBUG_DUMP_DECODED_OGG
755   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
756 #endif
757   while (1)
758   {
759     ret = read (0, readbuf, sizeof (readbuf));
760     toff += ret;
761     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762                 "Received %d bytes of audio data (total: %llu)\n",
763                 (int) ret,
764                 toff);
765     if (0 > ret)
766     {
767       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
768                   _("Read error from STDIN: %s\n"),
769                   strerror (errno));
770       break;
771     }
772     if (0 == ret)
773       break;
774 #ifdef DEBUG_READ_PURE_OGG
775     if (read_pure_ogg)
776     {
777       char *data = ogg_sync_buffer (&oy, ret);
778       memcpy (data, readbuf, ret);
779       ogg_sync_wrote (&oy, ret);
780       ogg_demux_and_decode ();
781     }
782     else
783 #endif
784     GNUNET_SERVER_mst_receive (stdin_mst, NULL,
785                                readbuf, ret,
786                                GNUNET_NO, GNUNET_NO);
787   }
788   GNUNET_SERVER_mst_destroy (stdin_mst);
789   if (stream_out)
790   {
791     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792                 "Locking\n");
793     pa_threaded_mainloop_lock (m);
794     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795                 "Draining\n");
796     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
797     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
798     {
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Waiting\n");
801       pa_threaded_mainloop_wait (m);
802     }
803     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804                 "Unreffing\n");
805     pa_operation_unref (o);
806     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807                 "Unlocking\n");
808     pa_threaded_mainloop_unlock (m);
809   }
810   return 0;
811 }