tighten formatting rules
[oweals/gnunet.git] / src / conversation / gnunet-helper-audio-playback.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file conversation/gnunet-helper-audio-playback.c
22  * @brief program to playback audio data to the speaker
23  * @author Siomon Dieterle
24  * @author Andreas Fuchs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
41 #include <ogg/ogg.h>
42
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
45
46 #define MAXLINE 4096
47
48 #define SAMPLING_RATE 48000
49
50 #define CHANNELS 1
51
52 /* 120ms at 48000 */
53 #define MAX_FRAME_SIZE (960 * 6)
54
55 /**
56  * Pulseaudio specification. May change in the future.
57  */
58 static pa_sample_spec sample_spec = {
59   .format = PA_SAMPLE_FLOAT32LE,
60   .rate = SAMPLING_RATE,
61   .channels = CHANNELS
62 };
63
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
66 #endif
67
68 /**
69  * Pulseaudio mainloop api
70  */
71 static pa_mainloop_api *mainloop_api;
72
73 /**
74  * Pulseaudio threaded mainloop
75  */
76 static pa_threaded_mainloop *m;
77
78 /**
79  * Pulseaudio context
80  */
81 static pa_context *context;
82
83 /**
84  * Pulseaudio output stream
85  */
86 static pa_stream *stream_out;
87
88 /**
89  * OPUS decoder
90  */
91 static OpusDecoder *dec;
92
93 /**
94  * PCM data buffer
95  */
96 static float *pcm_buffer;
97
98 /**
99  * Number of samples for one frame
100  */
101 static int frame_size;
102
103 /**
104  * Pipe we use to signal the main loop that we are ready to receive.
105  */
106 static int ready_pipe[2];
107
108 /**
109  * Ogg I/O state.
110  */
111 static ogg_sync_state oy;
112
113 /**
114  * Ogg stream state.
115  */
116 static ogg_stream_state os;
117
118 static int channels;
119
120 static int preskip;
121
122 static float gain;
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
128 {
129   uint8_t magic[8];
130   uint8_t version;
131   uint8_t channels;
132   uint16_t preskip GNUNET_PACKED;
133   uint32_t sampling_rate GNUNET_PACKED;
134   uint16_t gain GNUNET_PACKED;
135   uint8_t channel_mapping;
136 };
137
138 GNUNET_NETWORK_STRUCT_END
139
140 /**
141  * Process an Opus header and setup the opus decoder based on it.
142  * It takes several pointers for header values which are needed
143  * elsewhere in the code.
144  */
145 static OpusDecoder *
146 process_header (ogg_packet *op)
147 {
148   int err;
149   OpusDecoder *dec;
150   struct OpusHeadPacket header;
151
152   if (((unsigned int) op->bytes) < sizeof(header))
153     return NULL;
154   GNUNET_memcpy (&header,
155                  op->packet,
156                  sizeof(header));
157   header.preskip = GNUNET_le16toh (header.preskip);
158   header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
159   header.gain = GNUNET_le16toh (header.gain);
160
161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
162               "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
163               header.version,
164               header.channels,
165               header.preskip,
166               header.sampling_rate,
167               header.gain);
168   channels = header.channels;
169   preskip = header.preskip;
170
171   if (header.channel_mapping != 0)
172   {
173     fprintf (stderr,
174              "This implementation does not support non-mono streams\n");
175     return NULL;
176   }
177
178   dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
179   if (OPUS_OK != err)
180   {
181     fprintf (stderr,
182              "Cannot create encoder: %s\n",
183              opus_strerror (err));
184     return NULL;
185   }
186   if (! dec)
187   {
188     fprintf (stderr,
189              "Decoder initialization failed: %s\n",
190              opus_strerror (err));
191     return NULL;
192   }
193
194   if (0 != header.gain)
195   {
196     /*Gain API added in a newer libopus version, if we don't have it
197        we apply the gain ourselves. We also add in a user provided
198        manual gain at the same time.*/
199     int gainadj = (int) header.gain;
200     err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
201     if (OPUS_UNIMPLEMENTED == err)
202     {
203       gain = pow (10.0, gainadj / 5120.0);
204     }
205     else if (OPUS_OK != err)
206     {
207       fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
208       return NULL;
209     }
210   }
211
212   return dec;
213 }
214
215
216 #ifdef DEBUG_DUMP_DECODED_OGG
217 static size_t
218 fwrite_le32 (opus_int32 i32, FILE *file)
219 {
220   unsigned char buf[4];
221
222   buf[0] = (unsigned char) (i32 & 0xFF);
223   buf[1] = (unsigned char) (i32 >> 8 & 0xFF);
224   buf[2] = (unsigned char) (i32 >> 16 & 0xFF);
225   buf[3] = (unsigned char) (i32 >> 24 & 0xFF);
226   return fwrite (buf, 4, 1, file);
227 }
228
229
230 static size_t
231 fwrite_le16 (int i16, FILE *file)
232 {
233   unsigned char buf[2];
234
235   buf[0] = (unsigned char) (i16 & 0xFF);
236   buf[1] = (unsigned char) (i16 >> 8 & 0xFF);
237   return fwrite (buf, 2, 1, file);
238 }
239
240
241 static int
242 write_wav_header ()
243 {
244   int ret;
245   FILE *file = stdout;
246
247   ret = fprintf (file, "RIFF") >= 0;
248   ret &= fwrite_le32 (0x7fffffff, file);
249
250   ret &= fprintf (file, "WAVEfmt ") >= 0;
251   ret &= fwrite_le32 (16, file);
252   ret &= fwrite_le16 (1, file);
253   ret &= fwrite_le16 (channels, file);
254   ret &= fwrite_le32 (SAMPLING_RATE, file);
255   ret &= fwrite_le32 (2 * channels * SAMPLING_RATE, file);
256   ret &= fwrite_le16 (2 * channels, file);
257   ret &= fwrite_le16 (16, file);
258
259   ret &= fprintf (file, "data") >= 0;
260   ret &= fwrite_le32 (0x7fffffff, file);
261
262   return ! ret ? -1 : 16;
263 }
264
265
266 #endif
267
268
269 static int64_t
270 audio_write (int64_t maxout)
271 {
272   int64_t sampout = 0;
273   int tmp_skip;
274   unsigned out_len;
275   unsigned to_write;
276   float *output;
277
278 #ifdef DEBUG_DUMP_DECODED_OGG
279   static int wrote_wav_header;
280
281   if (dump_to_stdout && ! wrote_wav_header)
282   {
283     write_wav_header ();
284     wrote_wav_header = 1;
285   }
286 #endif
287   maxout = 0 > maxout ? 0 : maxout;
288   do
289   {
290     tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
291     preskip -= tmp_skip;
292     output = pcm_buffer + channels * tmp_skip;
293     out_len = frame_size - tmp_skip;
294     if (out_len > MAX_FRAME_SIZE)
295       exit (6);
296     frame_size = 0;
297
298     to_write = out_len < maxout ? out_len : (unsigned) maxout;
299     if (0 < maxout)
300     {
301       int64_t wrote = 0;
302       wrote = to_write;
303       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
304                   "Writing %u * %u * %u = %llu bytes into PA\n",
305                   to_write,
306                   channels,
307                   (unsigned int) sizeof(float),
308                   (unsigned long long) (to_write * channels * sizeof(float)));
309 #ifdef DEBUG_DUMP_DECODED_OGG
310       if (dump_to_stdout)
311       {
312 # define fminf(_x, _y) ((_x) < (_y) ? (_x) : (_y))
313 # define fmaxf(_x, _y) ((_x) > (_y) ? (_x) : (_y))
314 # define float2int(flt) ((int) (floor (.5 + flt)))
315         int i;
316         int16_t *out = alloca (sizeof(short) * MAX_FRAME_SIZE * channels);
317         for (i = 0; i < (int) out_len * channels; i++)
318           out[i] = (short) float2int (fmaxf (-32768, fminf (output[i] * 32768.f,
319                                                             32767)));
320
321         fwrite (out, 2 * channels, out_len < maxout ? out_len : maxout, stdout);
322       }
323       else
324 #endif
325       if (pa_stream_write
326             (stream_out, output, to_write * channels * sizeof(float), NULL, 0,
327             PA_SEEK_RELATIVE) < 0)
328       {
329         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
330                     _ ("pa_stream_write() failed: %s\n"),
331                     pa_strerror (pa_context_errno (context)));
332       }
333       sampout += wrote;
334       maxout -= wrote;
335     }
336   }
337   while (0 < frame_size && 0 < maxout);
338
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Wrote %" PRId64 " samples\n",
341               sampout);
342   return sampout;
343 }
344
345
346 /**
347  * Pulseaudio shutdown task
348  */
349 static void
350 quit (int ret)
351 {
352   mainloop_api->quit (mainloop_api,
353                       ret);
354   exit (ret);
355 }
356
357
358 static void
359 ogg_demux_and_decode ()
360 {
361   ogg_page og;
362   static int stream_init;
363   int64_t page_granule = 0;
364   ogg_packet op;
365   static int has_opus_stream;
366   static int has_tags_packet;
367   static int32_t opus_serialno;
368   static int64_t link_out;
369   static int64_t packet_count;
370   int eos = 0;
371   static int total_links;
372   static int gran_offset;
373
374   while (1 == ogg_sync_pageout (&oy, &og))
375   {
376     if (0 == stream_init)
377     {
378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379                   "Initialized the stream\n");
380       ogg_stream_init (&os, ogg_page_serialno (&og));
381       stream_init = 1;
382     }
383     if (ogg_page_serialno (&og) != os.serialno)
384     {
385       /* so all streams are read. */
386       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387                   "Re-set serial number\n");
388       ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
389     }
390     /*Add page to the bitstream*/
391     ogg_stream_pagein (&os, &og);
392     page_granule = ogg_page_granulepos (&og);
393     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
394                 "Reading page that ends at %" PRId64 "\n",
395                 page_granule);
396     /*Extract all available packets*/
397     while (1 == ogg_stream_packetout (&os, &op))
398     {
399       /*OggOpus streams are identified by a magic string in the initial
400          stream header.*/
401       if (op.b_o_s && (op.bytes >= 8) && ! memcmp (op.packet, "OpusHead", 8))
402       {
403         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404                     "Got Opus Header\n");
405         if (has_opus_stream && has_tags_packet)
406         {
407           /*If we're seeing another BOS OpusHead now it means
408              the stream is chained without an EOS.
409              This can easily happen if record helper is terminated unexpectedly.
410            */
411           has_opus_stream = 0;
412           if (dec)
413             opus_decoder_destroy (dec);
414           dec = NULL;
415           fprintf (stderr,
416                    "\nWarning: stream %" PRId64
417                    " ended without EOS and a new stream began.\n",
418                    (int64_t) os.serialno);
419         }
420         if (! has_opus_stream)
421         {
422           if ((packet_count > 0) && (opus_serialno == os.serialno) )
423           {
424             fprintf (stderr,
425                      "\nError: Apparent chaining without changing serial number (%"
426                      PRId64 "==%" PRId64 ").\n",
427                      (int64_t) opus_serialno, (int64_t) os.serialno);
428             quit (1);
429           }
430           opus_serialno = os.serialno;
431           has_opus_stream = 1;
432           has_tags_packet = 0;
433           link_out = 0;
434           packet_count = 0;
435           eos = 0;
436           total_links++;
437           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438                       "Got header for stream %" PRId64 ", this is %dth link\n",
439                       (int64_t) opus_serialno, total_links);
440         }
441         else
442         {
443           fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n",
444                    (int64_t) os.serialno);
445         }
446       }
447       if (! has_opus_stream || (os.serialno != opus_serialno) )
448       {
449         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450                     "breaking out\n");
451         break;
452       }
453       /*If first packet in a logical stream, process the Opus header*/
454       if (0 == packet_count)
455       {
456         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457                     "Decoding header\n");
458         dec = process_header (&op);
459         if (! dec)
460           quit (1);
461
462         if ((0 != ogg_stream_packetout (&os, &op)) || (255 ==
463                                                        og.header[og.header_len
464                                                                  - 1]) )
465         {
466           /*The format specifies that the initial header and tags packets are on their
467              own pages. To aid implementors in discovering that their files are wrong
468              we reject them explicitly here. In some player designs files like this would
469              fail even without an explicit test.*/
470           fprintf (stderr,
471                    "Extra packets on initial header page. Invalid stream.\n");
472           quit (1);
473         }
474
475         /*Remember how many samples at the front we were told to skip
476            so that we can adjust the timestamp counting.*/
477         gran_offset = preskip;
478
479         if (! pcm_buffer)
480         {
481           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482                       "Allocating %u * %u * %u = %llu bytes of buffer space\n",
483                       MAX_FRAME_SIZE,
484                       channels,
485                       (unsigned int) sizeof(float),
486                       (unsigned long long) (MAX_FRAME_SIZE * channels
487                                             * sizeof(float)));
488           pcm_buffer = pa_xmalloc (sizeof(float) * MAX_FRAME_SIZE * channels);
489         }
490       }
491       else if (1 == packet_count)
492       {
493         has_tags_packet = 1;
494         if ((0 != ogg_stream_packetout (&os, &op)) || (255 ==
495                                                        og.header[og.header_len
496                                                                  - 1]) )
497         {
498           fprintf (stderr,
499                    "Extra packets on initial tags page. Invalid stream.\n");
500           quit (1);
501         }
502       }
503       else
504       {
505         int ret;
506         int64_t maxout;
507         int64_t outsamp;
508
509         /*End of stream condition*/
510         if (op.e_o_s && (os.serialno == opus_serialno) )
511         {
512           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                       "Got EOS\n");
514           eos = 1;         /* don't care for anything except opus eos */
515         }
516
517         /*Decode Opus packet*/
518         ret = opus_decode_float (dec,
519                                  (const unsigned char *) op.packet,
520                                  op.bytes,
521                                  pcm_buffer,
522                                  MAX_FRAME_SIZE, 0);
523
524         /*If the decoder returned less than zero, we have an error.*/
525         if (0 > ret)
526         {
527           fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
528           break;
529         }
530         frame_size = ret;
531         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532                     "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
533                     ret,
534                     ret * channels,
535                     (unsigned int) op.bytes);
536
537         /*Apply header gain, if we're not using an opus library new
538            enough to do this internally.*/
539         if (0 != gain)
540         {
541           int i;
542           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543                       "Applying gain %f\n",
544                       gain);
545           for (i = 0; i < frame_size * channels; i++)
546             pcm_buffer[i] *= gain;
547         }
548
549         /*This handles making sure that our output duration respects
550            the final end-trim by not letting the output sample count
551            get ahead of the granpos indicated value.*/
552         maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000)
553                  - link_out;
554         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                     "Writing audio packet %" PRId64 ", at most %" PRId64
556                     " samples\n",
557                     packet_count, maxout);
558
559         outsamp = audio_write (0 > maxout ? 0 : maxout);
560         link_out += outsamp;
561       }
562       packet_count++;
563     }
564     if (eos)
565     {
566       has_opus_stream = 0;
567       if (dec)
568         opus_decoder_destroy (dec);
569       dec = NULL;
570     }
571   }
572 }
573
574
575 /**
576  * Message callback
577  *
578  * @param msg message we received.
579  * @return #GNUNET_OK on success,
580  *     #GNUNET_NO to stop further processing due to disconnect (no error)
581  *     #GNUNET_SYSERR to stop further processing due to error
582  */
583 static int
584 stdin_receiver (void *cls,
585                 const struct GNUNET_MessageHeader *msg)
586 {
587   struct AudioMessage *audio;
588   char *data;
589   size_t payload_len;
590
591   (void) cls;
592   switch (ntohs (msg->type))
593   {
594   case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
595     audio = (struct AudioMessage *) msg;
596     payload_len = ntohs (audio->header.size) - sizeof(struct AudioMessage);
597
598     /*Get the ogg buffer for writing*/
599     data = ogg_sync_buffer (&oy, payload_len);
600     /*Read bitstream from input file*/
601     GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
602     ogg_sync_wrote (&oy, payload_len);
603
604     ogg_demux_and_decode ();
605     break;
606
607   default:
608     break;
609   }
610   return GNUNET_OK;
611 }
612
613
614 /**
615  * Callback when data is there for playback
616  */
617 static void
618 stream_write_callback (pa_stream *s,
619                        size_t length,
620                        void *userdata)
621 {
622   /* unblock 'main' */
623   (void) userdata;
624   (void) length;
625   (void) s;
626   if (-1 != ready_pipe[1])
627   {
628     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                 "Unblocking main loop!\n");
630     (void) write (ready_pipe[1], "r", 1);
631   }
632 }
633
634
635 /**
636  * Exit callback for SIGTERM and SIGINT
637  */
638 static void
639 exit_signal_callback (pa_mainloop_api *m,
640                       pa_signal_event *e,
641                       int sig,
642                       void *userdata)
643 {
644   (void) m;
645   (void) e;
646   (void) sig;
647   (void) userdata;
648   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
649               _ ("gnunet-helper-audio-playback - Got signal, exiting\n"));
650   quit (1);
651 }
652
653
654 /**
655  * Pulseaudio stream state callback
656  */
657 static void
658 context_state_callback (pa_context *c,
659                         void *userdata)
660 {
661   int p;
662
663   (void) userdata;
664   GNUNET_assert (NULL != c);
665   switch (pa_context_get_state (c))
666   {
667   case PA_CONTEXT_CONNECTING:
668   case PA_CONTEXT_AUTHORIZING:
669   case PA_CONTEXT_SETTING_NAME:
670     break;
671
672   case PA_CONTEXT_READY:
673     {
674       GNUNET_assert (! stream_out);
675       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
676                   _ ("Connection established.\n"));
677       if (! (stream_out =
678                pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
679       {
680         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
681                     _ ("pa_stream_new() failed: %s\n"),
682                     pa_strerror (pa_context_errno (c)));
683         goto fail;
684       }
685       pa_stream_set_write_callback (stream_out,
686                                     &stream_write_callback,
687                                     NULL);
688       if ((p =
689              pa_stream_connect_playback (stream_out, NULL,
690                                          NULL,
691                                          PA_STREAM_ADJUST_LATENCY
692                                          | PA_STREAM_INTERPOLATE_TIMING
693                                          | PA_STREAM_AUTO_TIMING_UPDATE,
694                                          NULL, NULL)) < 0)
695       {
696         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
697                     _ ("pa_stream_connect_playback() failed: %s\n"),
698                     pa_strerror (pa_context_errno (c)));
699         goto fail;
700       }
701       break;
702     }
703
704   case PA_CONTEXT_TERMINATED:
705     quit (0);
706     break;
707
708   case PA_CONTEXT_FAILED:
709   default:
710     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
711                 _ ("Connection failure: %s\n"),
712                 pa_strerror (pa_context_errno (c)));
713     goto fail;
714   }
715   return;
716 fail:
717   quit (1);
718 }
719
720
721 /**
722  * Pulseaudio initialization
723  */
724 static void
725 pa_init ()
726 {
727   int r;
728
729   if (! pa_sample_spec_valid (&sample_spec))
730   {
731     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
732                 _ ("Wrong Spec\n"));
733   }
734   /* set up threaded playback mainloop */
735   if (! (m = pa_threaded_mainloop_new ()))
736   {
737     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
738                 _ ("pa_mainloop_new() failed.\n"));
739   }
740   mainloop_api = pa_threaded_mainloop_get_api (m);
741   /* listen to signals */
742   r = pa_signal_init (mainloop_api);
743   GNUNET_assert (r == 0);
744   pa_signal_new (SIGINT, exit_signal_callback, NULL);
745   pa_signal_new (SIGTERM, exit_signal_callback, NULL);
746
747
748   /* connect to the main pulseaudio context */
749   if (! (context = pa_context_new (mainloop_api, "GNUnet VoIP")))
750   {
751     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
752                 _ ("pa_context_new() failed.\n"));
753   }
754   pa_context_set_state_callback (context, context_state_callback, NULL);
755
756   if (pa_context_connect (context, NULL, 0, NULL) < 0)
757   {
758     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
759                 _ ("pa_context_connect() failed: %s\n"),
760                 pa_strerror (pa_context_errno (context)));
761   }
762   if (pa_threaded_mainloop_start (m) < 0)
763   {
764     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
765                 _ ("pa_mainloop_run() failed.\n"));
766   }
767 }
768
769
770 static void
771 ogg_init ()
772 {
773   ogg_sync_init (&oy);
774 }
775
776
777 static void
778 drain_callback (pa_stream*s, int success, void *userdata)
779 {
780   (void) s;
781   (void) success;
782   (void) userdata;
783   pa_threaded_mainloop_signal (m,
784                                0);
785 }
786
787
788 /**
789  * The main function for the playback helper.
790  *
791  * @param argc number of arguments from the command line
792  * @param argv command line arguments
793  * @return 0 ok, 1 on error
794  */
795 int
796 main (int argc, char *argv[])
797 {
798   static unsigned long long toff;
799   char readbuf[MAXLINE];
800   struct GNUNET_MessageStreamTokenizer *stdin_mst;
801   char c;
802   ssize_t ret;
803
804 #ifdef DEBUG_READ_PURE_OGG
805   int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
806 #endif
807
808   (void) argc;
809   (void) argv;
810   GNUNET_assert (GNUNET_OK ==
811                  GNUNET_log_setup ("gnunet-helper-audio-playback",
812                                    "WARNING",
813                                    NULL));
814   if (0 != pipe (ready_pipe))
815   {
816     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
817     return 1;
818   }
819   stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
820   ogg_init ();
821   pa_init ();
822   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823               "Waiting for PulseAudio to be ready.\n");
824   GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
825   close (ready_pipe[0]);
826   close (ready_pipe[1]);
827   ready_pipe[0] = -1;
828   ready_pipe[1] = -1;
829 #ifdef DEBUG_DUMP_DECODED_OGG
830   dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
831 #endif
832   while (1)
833   {
834     ret = read (STDIN_FILENO,
835                 readbuf,
836                 sizeof(readbuf));
837     toff += ret;
838     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839                 "Received %d bytes of audio data (total: %llu)\n",
840                 (int) ret,
841                 toff);
842     if (0 > ret)
843     {
844       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
845                   _ ("Read error from STDIN: %s\n"),
846                   strerror (errno));
847       break;
848     }
849     if (0 == ret)
850       break;
851 #ifdef DEBUG_READ_PURE_OGG
852     if (read_pure_ogg)
853     {
854       char *data = ogg_sync_buffer (&oy, ret);
855       GNUNET_memcpy (data, readbuf, ret);
856       ogg_sync_wrote (&oy, ret);
857       ogg_demux_and_decode ();
858     }
859     else
860 #endif
861     GNUNET_MST_from_buffer (stdin_mst,
862                             readbuf, ret,
863                             GNUNET_NO, GNUNET_NO);
864   }
865   GNUNET_MST_destroy (stdin_mst);
866   if (stream_out)
867   {
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                 "Locking\n");
870     pa_threaded_mainloop_lock (m);
871     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872                 "Draining\n");
873     pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
874     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
875     {
876       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877                   "Waiting\n");
878       pa_threaded_mainloop_wait (m);
879     }
880     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881                 "Unreffing\n");
882     pa_operation_unref (o);
883     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884                 "Unlocking\n");
885     pa_threaded_mainloop_unlock (m);
886   }
887   return 0;
888 }