paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file conversation/gnunet-helper-audio-playback.c
20  * @brief program to playback audio data to the speaker
21  * @author Siomon Dieterle
22  * @author Andreas Fuchs
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_protocols.h"
28 #include "conversation.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_core_service.h"
31
32 #include <pulse/simple.h>
33 #include <pulse/error.h>
34 #include <pulse/rtclock.h>
35
36 #include <pulse/pulseaudio.h>
37 #include <opus/opus.h>
38 #include <opus/opus_types.h>
39 #include <ogg/ogg.h>
40
41 #define DEBUG_READ_PURE_OGG 1
42 #define DEBUG_DUMP_DECODED_OGG 1
43
44 #define MAXLINE 4096
45
46 #define SAMPLING_RATE 48000
47
48 #define CHANNELS 1
49
50 /* 120ms at 48000 */
51 #define MAX_FRAME_SIZE (960 * 6)
52
53 /**
54  * Pulseaudio specification. May change in the future.
55  */
56 static pa_sample_spec sample_spec = {
57   .format = PA_SAMPLE_FLOAT32LE,
58   .rate = SAMPLING_RATE,
59   .channels = CHANNELS
60 };
61
62 #ifdef DEBUG_DUMP_DECODED_OGG
63 static int dump_to_stdout;
64 #endif
65
66 /**
67  * Pulseaudio mainloop api
68  */
69 static pa_mainloop_api *mainloop_api;
70
71 /**
72  * Pulseaudio threaded mainloop
73  */
74 static pa_threaded_mainloop *m;
75
76 /**
77  * Pulseaudio context
78  */
79 static pa_context *context;
80
81 /**
82  * Pulseaudio output stream
83  */
84 static pa_stream *stream_out;
85
86 /**
87  * OPUS decoder
88  */
89 static OpusDecoder *dec;
90
91 /**
92  * PCM data buffer
93  */
94 static float *pcm_buffer;
95
96 /**
97  * Number of samples for one frame
98  */
99 static int frame_size;
100
101 /**
102  * Pipe we use to signal the main loop that we are ready to receive.
103  */
104 static int ready_pipe[2];
105
106 /**
107  * Ogg I/O state.
108  */
109 static ogg_sync_state oy;
110
111 /**
112  * Ogg stream state.
113  */
114 static ogg_stream_state os;
115
116 static int channels;
117
118 static int preskip;
119
120 static float gain;
121
122 GNUNET_NETWORK_STRUCT_BEGIN
123
124 /* OggOpus spec says the numbers must be in little-endian order */
125 struct OpusHeadPacket
126 {
127   uint8_t magic[8];
128   uint8_t version;
129   uint8_t channels;
130   uint16_t preskip GNUNET_PACKED;
131   uint32_t sampling_rate GNUNET_PACKED;
132   uint16_t gain GNUNET_PACKED;
133   uint8_t channel_mapping;
134 };
135
136 GNUNET_NETWORK_STRUCT_END
137
138 /**
139  * Process an Opus header and setup the opus decoder based on it.
140  * It takes several pointers for header values which are needed
141  * elsewhere in the code.
142  */
143 static OpusDecoder *
144 process_header (ogg_packet *op)
145 {
146   int err;
147   OpusDecoder *dec;
148   struct OpusHeadPacket header;
149
150   if ( ((unsigned int) op->bytes) < sizeof (header))
151     return NULL;
152   GNUNET_memcpy (&header,
153                  op->packet,
154                  sizeof (header));
155   header.preskip = GNUNET_le16toh (header.preskip);
156   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157   header.gain = GNUNET_le16toh (header.gain);
158
159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161               header.version,
162               header.channels,
163               header.preskip,
164               header.sampling_rate,
165               header.gain);
166   channels = header.channels;
167   preskip = header.preskip;
168
169   if (header.channel_mapping != 0)
170   {
171     fprintf (stderr,
172              "This implementation does not support non-mono streams\n");
173     return NULL;
174   }
175
176   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177   if (OPUS_OK != err)
178   {
179     fprintf (stderr,
180              "Cannot create encoder: %s\n",
181              opus_strerror (err));
182     return NULL;
183   }
184   if (! dec)
185   {
186     fprintf (stderr,
187              "Decoder initialization failed: %s\n",
188              opus_strerror (err));
189     return NULL;
190   }
191
192   if (0 != header.gain)
193   {
194     /*Gain API added in a newer libopus version, if we don't have it
195       we apply the gain ourselves. We also add in a user provided
196       manual gain at the same time.*/
197     int gainadj = (int) header.gain;
198     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
199     if(OPUS_UNIMPLEMENTED == err)
200     {
201       gain = pow (10.0, gainadj / 5120.0);
202     }
203     else if (OPUS_OK != err)
204     {
205       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
206       return NULL;
207     }
208   }
209
210   return dec;
211 }
212
213
214 #ifdef DEBUG_DUMP_DECODED_OGG
215 static size_t
216 fwrite_le32(opus_int32 i32, FILE *file)
217 {
218    unsigned char buf[4];
219    buf[0]=(unsigned char)(i32&0xFF);
220    buf[1]=(unsigned char)(i32>>8&0xFF);
221    buf[2]=(unsigned char)(i32>>16&0xFF);
222    buf[3]=(unsigned char)(i32>>24&0xFF);
223    return fwrite(buf,4,1,file);
224 }
225
226
227 static size_t
228 fwrite_le16(int i16, FILE *file)
229 {
230    unsigned char buf[2];
231    buf[0]=(unsigned char)(i16&0xFF);
232    buf[1]=(unsigned char)(i16>>8&0xFF);
233    return fwrite(buf,2,1,file);
234 }
235
236
237 static int
238 write_wav_header()
239 {
240    int ret;
241    FILE *file = stdout;
242
243    ret = fprintf (file, "RIFF") >= 0;
244    ret &= fwrite_le32 (0x7fffffff, file);
245
246    ret &= fprintf (file, "WAVEfmt ") >= 0;
247    ret &= fwrite_le32 (16, file);
248    ret &= fwrite_le16 (1, file);
249    ret &= fwrite_le16 (channels, file);
250    ret &= fwrite_le32 (SAMPLING_RATE, file);
251    ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
252    ret &= fwrite_le16 (2*channels, file);
253    ret &= fwrite_le16 (16, file);
254
255    ret &= fprintf (file, "data") >= 0;
256    ret &= fwrite_le32 (0x7fffffff, file);
257
258    return !ret ? -1 : 16;
259 }
260
261 #endif
262
263
264 static int64_t
265 audio_write (int64_t maxout)
266 {
267   int64_t sampout = 0;
268   int tmp_skip;
269   unsigned out_len;
270   unsigned to_write;
271   float *output;
272 #ifdef DEBUG_DUMP_DECODED_OGG
273   static int wrote_wav_header;
274
275   if (dump_to_stdout && !wrote_wav_header)
276   {
277     write_wav_header ();
278     wrote_wav_header = 1;
279   }
280 #endif
281   maxout = 0 > maxout ? 0 : maxout;
282   do
283   {
284     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
285     preskip -= tmp_skip;
286     output = pcm_buffer + channels * tmp_skip;
287     out_len = frame_size - tmp_skip;
288     if (out_len > MAX_FRAME_SIZE)
289       exit (6);
290     frame_size = 0;
291
292     to_write = out_len < maxout ? out_len : (unsigned) maxout;
293     if (0 < maxout)
294     {
295       int64_t wrote = 0;
296       wrote = to_write;
297       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298                   "Writing %u * %u * %u = %llu bytes into PA\n",
299                   to_write,
300                   channels,
301                   (unsigned int) sizeof (float),
302                   (unsigned long long) (to_write * channels * sizeof (float)));
303 #ifdef DEBUG_DUMP_DECODED_OGG
304       if (dump_to_stdout)
305       {
306 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
307 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
308 # define float2int(flt) ((int)(floor(.5+flt)))
309         int i;
310         int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
311         for (i=0;i<(int)out_len*channels;i++)
312           out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
313
314         fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
315       }
316       else
317 #endif
318       if (pa_stream_write
319           (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
320           PA_SEEK_RELATIVE) < 0)
321       {
322         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
323                     _("pa_stream_write() failed: %s\n"),
324                     pa_strerror (pa_context_errno (context)));
325       }
326       sampout += wrote;
327       maxout -= wrote;
328     }
329   } while (0 < frame_size && 0 < maxout);
330
331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332               "Wrote %" PRId64 " samples\n",
333               sampout);
334   return sampout;
335 }
336
337
338 /**
339  * Pulseaudio shutdown task
340  */
341 static void
342 quit (int ret)
343 {
344   mainloop_api->quit (mainloop_api,
345                       ret);
346   exit (ret);
347 }
348
349
350 static void
351 ogg_demux_and_decode ()
352 {
353   ogg_page og;
354   static int stream_init;
355   int64_t page_granule = 0;
356   ogg_packet op;
357   static int has_opus_stream;
358   static int has_tags_packet;
359   static int32_t opus_serialno;
360   static int64_t link_out;
361   static int64_t packet_count;
362   int eos = 0;
363   static int total_links;
364   static int gran_offset;
365
366   while (1 == ogg_sync_pageout (&oy, &og))
367   {
368     if (0 == stream_init)
369     {
370       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371                   "Initialized the stream\n");
372       ogg_stream_init (&os, ogg_page_serialno (&og));
373       stream_init = 1;
374     }
375     if (ogg_page_serialno (&og) != os.serialno)
376     {
377       /* so all streams are read. */
378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379                   "Re-set serial number\n");
380       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
381     }
382     /*Add page to the bitstream*/
383     ogg_stream_pagein (&os, &og);
384     page_granule = ogg_page_granulepos (&og);
385     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386                 "Reading page that ends at %" PRId64 "\n",
387                 page_granule);
388     /*Extract all available packets*/
389     while (1 == ogg_stream_packetout (&os, &op))
390     {
391       /*OggOpus streams are identified by a magic string in the initial
392         stream header.*/
393       if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
394       {
395         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396                     "Got Opus Header\n");
397         if (has_opus_stream && has_tags_packet)
398         {
399           /*If we're seeing another BOS OpusHead now it means
400             the stream is chained without an EOS.
401             This can easily happen if record helper is terminated unexpectedly.
402            */
403           has_opus_stream = 0;
404           if (dec)
405             opus_decoder_destroy (dec);
406           dec = NULL;
407           fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
408         }
409         if (!has_opus_stream)
410         {
411           if (packet_count > 0 && opus_serialno == os.serialno)
412           {
413             fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
414               (int64_t) opus_serialno, (int64_t) os.serialno);
415             quit(1);
416           }
417           opus_serialno = os.serialno;
418           has_opus_stream = 1;
419           has_tags_packet = 0;
420           link_out = 0;
421           packet_count = 0;
422           eos = 0;
423           total_links++;
424           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425                       "Got header for stream %" PRId64 ", this is %dth link\n",
426                       (int64_t) opus_serialno, total_links);
427         }
428         else
429         {
430           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
431         }
432       }
433       if (!has_opus_stream || os.serialno != opus_serialno)
434       {
435         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436                     "breaking out\n");
437         break;
438       }
439       /*If first packet in a logical stream, process the Opus header*/
440       if (0 == packet_count)
441       {
442         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443                     "Decoding header\n");
444         dec = process_header (&op);
445         if (!dec)
446            quit (1);
447
448         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
449         {
450           /*The format specifies that the initial header and tags packets are on their
451             own pages. To aid implementors in discovering that their files are wrong
452             we reject them explicitly here. In some player designs files like this would
453             fail even without an explicit test.*/
454           fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
455           quit (1);
456         }
457
458         /*Remember how many samples at the front we were told to skip
459           so that we can adjust the timestamp counting.*/
460         gran_offset = preskip;
461
462         if (! pcm_buffer)
463         {
464           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465                       "Allocating %u * %u * %u = %llu bytes of buffer space\n",
466                       MAX_FRAME_SIZE,
467                       channels,
468                       (unsigned int) sizeof (float),
469                       (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
470           pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
471         }
472       }
473       else if (1 == packet_count)
474       {
475         has_tags_packet = 1;
476         if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
477         {
478           fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
479           quit (1);
480         }
481       }
482       else
483       {
484         int ret;
485         int64_t maxout;
486         int64_t outsamp;
487
488         /*End of stream condition*/
489         if (op.e_o_s && os.serialno == opus_serialno)
490         {
491           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492                       "Got EOS\n");
493           eos = 1; /* don't care for anything except opus eos */
494         }
495
496         /*Decode Opus packet*/
497         ret = opus_decode_float (dec,
498                                  (const unsigned char *) op.packet,
499                                  op.bytes,
500                                  pcm_buffer,
501                                  MAX_FRAME_SIZE, 0);
502
503         /*If the decoder returned less than zero, we have an error.*/
504         if (0 > ret)
505         {
506           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
507           break;
508         }
509         frame_size = ret;
510         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
512                     ret,
513                     ret * channels,
514                     (unsigned int) op.bytes);
515
516         /*Apply header gain, if we're not using an opus library new
517           enough to do this internally.*/
518         if (0 != gain)
519         {
520           int i;
521           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522                       "Applying gain %f\n",
523                       gain);
524           for (i = 0; i < frame_size * channels; i++)
525             pcm_buffer[i] *= gain;
526         }
527
528         /*This handles making sure that our output duration respects
529           the final end-trim by not letting the output sample count
530           get ahead of the granpos indicated value.*/
531         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
532         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
533                     "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
534                     packet_count, maxout);
535
536         outsamp = audio_write (0 > maxout ? 0 : maxout);
537         link_out += outsamp;
538       }
539       packet_count++;
540     }
541     if (eos)
542     {
543       has_opus_stream = 0;
544       if (dec)
545         opus_decoder_destroy (dec);
546       dec = NULL;
547     }
548   }
549 }
550
551
552 /**
553  * Message callback
554  *
555  * @param msg message we received.
556  * @return #GNUNET_OK on success,
557  *     #GNUNET_NO to stop further processing due to disconnect (no error)
558  *     #GNUNET_SYSERR to stop further processing due to error
559  */
560 static int
561 stdin_receiver (void *cls,
562                 const struct GNUNET_MessageHeader *msg)
563 {
564   struct AudioMessage *audio;
565   char *data;
566   size_t payload_len;
567
568   (void) cls;
569   switch (ntohs (msg->type))
570   {
571   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
572     audio = (struct AudioMessage *) msg;
573     payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
574
575     /*Get the ogg buffer for writing*/
576     data = ogg_sync_buffer (&oy, payload_len);
577     /*Read bitstream from input file*/
578     GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
579     ogg_sync_wrote (&oy, payload_len);
580
581     ogg_demux_and_decode ();
582     break;
583   default:
584     break;
585   }
586   return GNUNET_OK;
587 }
588
589
590 /**
591  * Callback when data is there for playback
592  */
593 static void
594 stream_write_callback (pa_stream *s,
595                        size_t length,
596                        void *userdata)
597 {
598   /* unblock 'main' */
599   (void) userdata;
600   (void) length;
601   (void) s;
602   if (-1 != ready_pipe[1])
603   {
604     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605                 "Unblocking main loop!\n");
606     (void) write (ready_pipe[1], "r", 1);
607   }
608 }
609
610
611 /**
612  * Exit callback for SIGTERM and SIGINT
613  */
614 static void
615 exit_signal_callback (pa_mainloop_api *m,
616                       pa_signal_event *e,
617                       int sig,
618                       void *userdata)
619 {
620   (void) m;
621   (void) e;
622   (void) sig;
623   (void) userdata;
624   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
625               _("gnunet-helper-audio-playback - Got signal, exiting\n"));
626   quit (1);
627 }
628
629
630 /**
631  * Pulseaudio stream state callback
632  */
633 static void
634 context_state_callback (pa_context *c,
635                         void *userdata)
636 {
637   int p;
638
639   (void) userdata;
640   GNUNET_assert (NULL != c);
641   switch (pa_context_get_state (c))
642   {
643   case PA_CONTEXT_CONNECTING:
644   case PA_CONTEXT_AUTHORIZING:
645   case PA_CONTEXT_SETTING_NAME:
646     break;
647   case PA_CONTEXT_READY:
648   {
649     GNUNET_assert (! stream_out);
650     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
651                 _("Connection established.\n"));
652     if (! (stream_out =
653            pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
654     {
655       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
656                   _("pa_stream_new() failed: %s\n"),
657                   pa_strerror (pa_context_errno (c)));
658       goto fail;
659     }
660     pa_stream_set_write_callback (stream_out,
661                                   &stream_write_callback,
662                                   NULL);
663     if ((p =
664          pa_stream_connect_playback (stream_out, NULL,
665                                      NULL,
666                                      PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
667                                      NULL,  NULL)) < 0)
668     {
669       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
670                   _("pa_stream_connect_playback() failed: %s\n"),
671                   pa_strerror (pa_context_errno (c)));
672       goto fail;
673     }
674     break;
675   }
676   case PA_CONTEXT_TERMINATED:
677     quit (0);
678     break;
679
680   case PA_CONTEXT_FAILED:
681   default:
682     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
683                 _("Connection failure: %s\n"),
684                 pa_strerror (pa_context_errno (c)));
685     goto fail;
686   }
687   return;
688  fail:
689   quit (1);
690 }
691
692
693 /**
694  * Pulseaudio initialization
695  */
696 static void
697 pa_init ()
698 {
699   int r;
700
701   if (!pa_sample_spec_valid (&sample_spec))
702   {
703     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
704                 _("Wrong Spec\n"));
705   }
706   /* set up threaded playback mainloop */
707   if (!(m = pa_threaded_mainloop_new ()))
708   {
709     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
710                 _("pa_mainloop_new() failed.\n"));
711   }
712   mainloop_api = pa_threaded_mainloop_get_api (m);
713   /* listen to signals */
714   r = pa_signal_init (mainloop_api);
715   GNUNET_assert (r == 0);
716   pa_signal_new (SIGINT, exit_signal_callback, NULL);
717   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
718
719
720   /* connect to the main pulseaudio context */
721   if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
722   {
723     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
724                 _("pa_context_new() failed.\n"));
725   }
726   pa_context_set_state_callback (context, context_state_callback, NULL);
727
728   if (pa_context_connect (context, NULL, 0, NULL) < 0)
729   {
730     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
731                 _("pa_context_connect() failed: %s\n"),
732                 pa_strerror (pa_context_errno (context)));
733   }
734   if (pa_threaded_mainloop_start (m) < 0)
735   {
736     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
737                 _("pa_mainloop_run() failed.\n"));
738   }
739 }
740
741
742 static void
743 ogg_init ()
744 {
745   ogg_sync_init (&oy);
746 }
747
748
749 static void
750 drain_callback (pa_stream*s, int success, void *userdata)
751 {
752   (void) s;
753   (void) success;
754   (void) userdata;
755   pa_threaded_mainloop_signal (m,
756                                0);
757 }
758
759
760 /**
761  * The main function for the playback helper.
762  *
763  * @param argc number of arguments from the command line
764  * @param argv command line arguments
765  * @return 0 ok, 1 on error
766  */
767 int
768 main (int argc, char *argv[])
769 {
770   static unsigned long long toff;
771   char readbuf[MAXLINE];
772   struct GNUNET_MessageStreamTokenizer *stdin_mst;
773   char c;
774   ssize_t ret;
775 #ifdef DEBUG_READ_PURE_OGG
776   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
777 #endif
778
779   (void) argc;
780   (void) argv;
781   GNUNET_assert (GNUNET_OK ==
782                  GNUNET_log_setup ("gnunet-helper-audio-playback",
783                                    "WARNING",
784                                    NULL));
785   if (0 != pipe (ready_pipe))
786   {
787     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
788     return 1;
789   }
790   stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
791   ogg_init ();
792   pa_init ();
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Waiting for PulseAudio to be ready.\n");
795   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
796   close (ready_pipe[0]);
797   close (ready_pipe[1]);
798   ready_pipe[0] = -1;
799   ready_pipe[1] = -1;
800 #ifdef DEBUG_DUMP_DECODED_OGG
801   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
802 #endif
803   while (1)
804   {
805     ret = read (STDIN_FILENO,
806                 readbuf,
807                 sizeof (readbuf));
808     toff += ret;
809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810                 "Received %d bytes of audio data (total: %llu)\n",
811                 (int) ret,
812                 toff);
813     if (0 > ret)
814     {
815       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
816                   _("Read error from STDIN: %s\n"),
817                   strerror (errno));
818       break;
819     }
820     if (0 == ret)
821       break;
822 #ifdef DEBUG_READ_PURE_OGG
823     if (read_pure_ogg)
824     {
825       char *data = ogg_sync_buffer (&oy, ret);
826       GNUNET_memcpy (data, readbuf, ret);
827       ogg_sync_wrote (&oy, ret);
828       ogg_demux_and_decode ();
829     }
830     else
831 #endif
832     GNUNET_MST_from_buffer (stdin_mst,
833                             readbuf, ret,
834                             GNUNET_NO, GNUNET_NO);
835   }
836   GNUNET_MST_destroy (stdin_mst);
837   if (stream_out)
838   {
839     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840                 "Locking\n");
841     pa_threaded_mainloop_lock (m);
842     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843                 "Draining\n");
844     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
845     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
846     {
847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848                   "Waiting\n");
849       pa_threaded_mainloop_wait (m);
850     }
851     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852                 "Unreffing\n");
853     pa_operation_unref (o);
854     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855                 "Unlocking\n");
856     pa_threaded_mainloop_unlock (m);
857   }
858   return 0;
859 }