2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to play back audio data on the speaker
23 * @author Simon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
/* When defined, main() consults the GNUNET_READ_PURE_OGG environment
 * variable (see the DEBUG_READ_PURE_OGG sections in main). */
43 #define DEBUG_READ_PURE_OGG 1
/* When defined, the dump_to_stdout flag and the WAV/PCM dump code are
 * compiled in; the flag is set from GNUNET_DUMP_DECODED_OGG in main(). */
44 #define DEBUG_DUMP_DECODED_OGG 1
/* Sample rate in Hz used for the PulseAudio sample spec and for
 * opus_decoder_create(). */
48 #define SAMPLING_RATE 48000
/* Capacity of pcm_buffer in samples per channel: 5760, which is 120 ms
 * at SAMPLING_RATE. */
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
91 static OpusDecoder *dec;
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
111 static ogg_sync_state oy;
116 static ogg_stream_state os;
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
132 uint16_t preskip GNUNET_PACKED;
133 uint32_t sampling_rate GNUNET_PACKED;
134 uint16_t gain GNUNET_PACKED;
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
141 * Process an Opus header and set up the opus decoder based on it.
142 * Header values which are needed elsewhere in the code (such as the
143 * channel count and preskip) are stored in variables shared across this file.
/* Parse the OpusHead packet carried in op and create an Opus decoder for it.
 *
 * The packet is copied into a local struct OpusHeadPacket, its multi-byte
 * fields are converted from little-endian to host byte order, and the
 * channel count and preskip are stored in channels / preskip, which other
 * functions in this file read as well.  A non-zero header gain is handed to
 * the decoder through OPUS_SET_GAIN, with a manual fallback factor stored
 * in gain.
 *
 * NOTE(review): this listing omits several short lines (braces, local
 * declarations, return statements), so the exact error paths are not
 * visible here. */
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
/* The packet size is compared against the fixed header struct before the
 * copy below reads sizeof (header) bytes from it. */
152 if (op->bytes < sizeof (header))
154 GNUNET_memcpy (&header, op->packet, sizeof (header));
/* Convert the multi-byte fields to host byte order. */
155 header.preskip = GNUNET_le16toh (header.preskip);
156 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157 header.gain = GNUNET_le16toh (header.gain);
159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161 header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
/* Publish the values that the rest of the file needs. */
163 channels = header.channels;
164 preskip = header.preskip;
/* NOTE(review): the test is on channel_mapping, while the message below
 * talks about non-mono streams; confirm which condition is intended. */
166 if (header.channel_mapping != 0)
169 "This implementation does not support non-mono streams\n");
/* The decoder is always created at SAMPLING_RATE; in the visible code
 * header.sampling_rate is only converted and logged.
 * NOTE(review): the first error message below says encoder although
 * opus_decoder_create() is what was called. */
173 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177 "Cannot create encoder: %s\n",
178 opus_strerror (err));
184 "Decoder initialization failed: %s\n",
185 opus_strerror (err));
189 if (0 != header.gain)
191 /*Gain API added in a newer libopus version, if we don't have it
192 we apply the gain ourselves. We also add in a user provided
193 manual gain at the same time.*/
/* header.gain is declared uint16_t, so this cast yields 0..65535.
 * NOTE(review): if the header gain is meant to be a signed value, negative
 * gains would be misread here; confirm against the OggOpus specification. */
194 int gainadj = (int) header.gain;
195 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
/* Fallback when the library lacks OPUS_SET_GAIN: compute a linear factor.
 * Dividing by 5120 (256 * 20) presumably treats gainadj as dB in 1/256
 * steps; confirm. */
196 if(OPUS_UNIMPLEMENTED == err)
198 gain = pow (10.0, gainadj / 5120.0);
200 else if (OPUS_OK != err)
202 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
211 #ifdef DEBUG_DUMP_DECODED_OGG
/* Write i32 to file as four bytes, least significant byte first.
 *
 * Returns the result of fwrite() for a single 4-byte item, i.e. 1 when the
 * item was written and 0 otherwise. */
213 fwrite_le32(opus_int32 i32, FILE *file)
215 unsigned char buf[4];
/* Shift binds tighter than the bitwise AND, so each line is (i32 >> k) & 0xFF. */
216 buf[0]=(unsigned char)(i32&0xFF);
217 buf[1]=(unsigned char)(i32>>8&0xFF);
218 buf[2]=(unsigned char)(i32>>16&0xFF);
219 buf[3]=(unsigned char)(i32>>24&0xFF);
220 return fwrite(buf,4,1,file);
/* Write the low 16 bits of i16 to file as two bytes, least significant
 * byte first.
 *
 * Returns the result of fwrite() for a single 2-byte item, i.e. 1 when the
 * item was written and 0 otherwise. */
225 fwrite_le16(int i16, FILE *file)
227 unsigned char buf[2];
228 buf[0]=(unsigned char)(i16&0xFF);
229 buf[1]=(unsigned char)(i16>>8&0xFF);
230 return fwrite(buf,2,1,file);
/* Body of the WAV header writer (its signature line is not part of this
 * listing).  It emits a 44-byte RIFF/WAVE header describing 16-bit samples
 * with the current channel count at SAMPLING_RATE to file.
 *
 * ret accumulates success: each fwrite_le16/fwrite_le32 call yields 1 on
 * success, and every fprintf result is reduced to 0 or 1 by the >= 0 test. */
240 ret = fprintf (file, "RIFF") >= 0;
/* RIFF chunk size; 0x7fffffff is presumably a placeholder because the
 * total length is not known when the header is written. */
241 ret &= fwrite_le32 (0x7fffffff, file);
243 ret &= fprintf (file, "WAVEfmt ") >= 0;
/* fmt chunk: chunk size 16, format tag 1, channel count, sample rate,
 * bytes per second, bytes per sample frame, bits per sample. */
244 ret &= fwrite_le32 (16, file);
245 ret &= fwrite_le16 (1, file);
246 ret &= fwrite_le16 (channels, file);
247 ret &= fwrite_le32 (SAMPLING_RATE, file);
248 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249 ret &= fwrite_le16 (2*channels, file);
250 ret &= fwrite_le16 (16, file);
252 ret &= fprintf (file, "data") >= 0;
/* data chunk size, again written as 0x7fffffff. */
253 ret &= fwrite_le32 (0x7fffffff, file);
/* -1 if any write above failed, otherwise 16. */
255 return !ret ? -1 : 16;
/* Hand the decoded frame held in pcm_buffer to the PulseAudio output stream.
 *
 * maxout is the upper bound on the number of samples (per channel) to emit;
 * negative values are treated as 0.  With DEBUG_DUMP_DECODED_OGG compiled in,
 * the samples are also converted to 16-bit integers and written to stdout.
 *
 * NOTE(review): this listing omits several short lines (local declarations,
 * braces, the loop head and the return), so only the visible statements are
 * described. */
262 audio_write (int64_t maxout)
269 #ifdef DEBUG_DUMP_DECODED_OGG
/* Remembers across calls whether the one-time WAV header step has run. */
270 static int wrote_wav_header;
272 if (dump_to_stdout && !wrote_wav_header)
275 wrote_wav_header = 1;
278 maxout = 0 > maxout ? 0 : maxout;
/* Skip at most one frame worth of the preskip samples: output begins
 * tmp_skip samples (times channels) into pcm_buffer. */
281 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
283 output = pcm_buffer + channels * tmp_skip;
284 out_len = frame_size - tmp_skip;
285 if (out_len > MAX_FRAME_SIZE)
/* Never write more than maxout samples. */
289 to_write = out_len < maxout ? out_len : (unsigned) maxout;
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Writing %u * %u * %u = %llu bytes into PA\n",
298 (unsigned int) sizeof (float),
299 (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
/* Local helpers for the float to 16-bit conversion.
 * NOTE(review): fminf and fmaxf are also names of C math library functions;
 * these macros replace them from here on. */
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
/* Stack buffer sized for a full MAX_FRAME_SIZE frame of all channels. */
307 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
/* Scale each sample by 32768, clamp to [-32768, 32767], then round. */
308 for (i=0;i<(int)out_len*channels;i++)
309 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
311 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
316 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317 PA_SEEK_RELATIVE) < 0)
319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320 _("pa_stream_write() failed: %s\n"),
321 pa_strerror (pa_context_errno (context)));
326 } while (0 < frame_size && 0 < maxout);
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Wrote %" PRId64 " samples\n",
336 * Pulseaudio shutdown task
341 mainloop_api->quit (mainloop_api, ret);
/* Drain the Ogg sync state oy: every complete page is fed into the stream
 * state os and each packet extracted from it is handled in turn.
 *
 * Packet 0 of a logical stream is treated as the Opus header and passed to
 * process_header(); packet 1 is the tags packet; later packets are decoded
 * with opus_decode_float() and passed to audio_write().  State that has to
 * survive between calls lives in the function-local statics below.
 *
 * NOTE(review): this listing omits many short lines (braces, local
 * declarations, counters being updated, break/continue/return statements),
 * so control flow between the visible statements cannot be fully read off
 * here. */
347 ogg_demux_and_decode ()
350 static int stream_init;
351 int64_t page_granule = 0;
353 static int has_opus_stream;
354 static int has_tags_packet;
355 static int32_t opus_serialno;
356 static int64_t link_out;
357 static int64_t packet_count;
359 static int total_links;
360 static int gran_offset;
/* One iteration per complete page currently available in oy. */
362 while (1 == ogg_sync_pageout (&oy, &og))
/* The stream state is initialised from the serial number of the first page seen. */
364 if (0 == stream_init)
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "Initialized the stream\n");
368 ogg_stream_init (&os, ogg_page_serialno (&og));
/* A page with a different serial number re-targets os to that serial. */
371 if (ogg_page_serialno (&og) != os.serialno)
373 /* so all streams are read. */
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375 "Re-set serial number\n");
376 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
378 /*Add page to the bitstream*/
379 ogg_stream_pagein (&os, &og);
380 page_granule = ogg_page_granulepos (&og);
381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382 "Reading page that ends at %" PRId64 "\n",
384 /*Extract all available packets*/
385 while (1 == ogg_stream_packetout (&os, &op))
387 /*OggOpus streams are identified by a magic string in the initial
389 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Got Opus Header\n");
393 if (has_opus_stream && has_tags_packet)
395 /*If we're seeing another BOS OpusHead now it means
396 the stream is chained without an EOS.
397 This can easily happen if record helper is terminated unexpectedly.
401 opus_decoder_destroy (dec);
403 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
405 if (!has_opus_stream)
407 if (packet_count > 0 && opus_serialno == os.serialno)
409 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
410 (int64_t) opus_serialno, (int64_t) os.serialno);
413 opus_serialno = os.serialno;
420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421 "Got header for stream %" PRId64 ", this is %dth link\n",
422 (int64_t) opus_serialno, total_links);
426 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
429 if (!has_opus_stream || os.serialno != opus_serialno)
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435 /*If first packet in a logical stream, process the Opus header*/
436 if (0 == packet_count)
438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439 "Decoding header\n");
440 dec = process_header (&op);
444 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
446 /*The format specifies that the initial header and tags packets are on their
447 own pages. To aid implementors in discovering that their files are wrong
448 we reject them explicitly here. In some player designs files like this would
449 fail even without an explicit test.*/
450 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
454 /*Remember how many samples at the front we were told to skip
455 so that we can adjust the timestamp counting.*/
456 gran_offset = preskip;
460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
464 (unsigned int) sizeof (float),
465 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
/* Decode buffer for one frame of up to MAX_FRAME_SIZE samples per channel.
 * NOTE(review): no release of a previously allocated pcm_buffer is visible
 * in this listing; confirm that chained streams do not leak the old buffer. */
466 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
469 else if (1 == packet_count)
472 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
474 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
484 /*End of stream condition*/
485 if (op.e_o_s && os.serialno == opus_serialno)
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 eos = 1; /* don't care for anything except opus eos */
492 /*Decode Opus packet*/
493 ret = opus_decode_float (dec,
494 (const unsigned char *) op.packet,
499 /*If the decoder returned less than zero, we have an error.*/
502 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
510 (unsigned int) op.bytes);
512 /*Apply header gain, if we're not using an opus library new
513 enough to do this internally.*/
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Applying gain %f\n",
520 for (i = 0; i < frame_size * channels; i++)
521 pcm_buffer[i] *= gain;
524 /*This handles making sure that our output duration respects
525 the final end-trim by not letting the output sample count
526 get ahead of the granpos indicated value.*/
527 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
530 packet_count, maxout);
/* Negative limits are clamped to zero before writing. */
532 outsamp = audio_write (0 > maxout ? 0 : maxout);
541 opus_decoder_destroy (dec);
/* Message handler installed via GNUNET_SERVER_mst_create() in main.
 *
 * For GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO messages, the bytes following
 * the fixed struct AudioMessage are copied into the Ogg sync buffer and
 * ogg_demux_and_decode() is run on them.  cls is not used in the visible
 * code. */
551 stdin_receiver (void *cls,
553 const struct GNUNET_MessageHeader *msg)
555 struct AudioMessage *audio;
559 switch (ntohs (msg->type))
561 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
562 audio = (struct AudioMessage *) msg;
/* Payload is everything after the fixed AudioMessage struct.
 * NOTE(review): no lower bound check on the message size is visible before
 * this subtraction; confirm that size >= sizeof (struct AudioMessage) is
 * guaranteed by the caller. */
563 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
565 /*Get the ogg buffer for writing*/
566 data = ogg_sync_buffer (&oy, payload_len);
567 /*Copy the audio payload that follows the message struct into the ogg buffer;
NOTE(review): data is used without a NULL check*/
568 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
569 ogg_sync_wrote (&oy, payload_len);
571 ogg_demux_and_decode ();
581 * Callback when data is there for playback
/* Write callback registered on stream_out with
 * pa_stream_set_write_callback().
 *
 * It writes a single byte to ready_pipe[1]; main blocks on a one-byte read
 * from ready_pipe[0] while waiting for PulseAudio to be ready, so this is
 * what unblocks it. */
584 stream_write_callback (pa_stream *s,
/* -1 presumably marks the write end as already closed; confirm where it is set. */
589 if (-1 != ready_pipe[1])
591 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592 "Unblocking main loop!\n");
/* The result of write() is deliberately discarded. */
593 (void) write (ready_pipe[1], "r", 1);
599 * Exit callback for SIGTERM and SIGINT
/* Signal handler registered for SIGINT and SIGTERM with pa_signal_new().
 * The visible part logs that a signal arrived.
 *
 * The parameter named m shadows the file-level threaded mainloop pointer m
 * inside this function. */
602 exit_signal_callback (pa_mainloop_api *m,
607 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
608 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
614 * Pulseaudio context state callback
/* State callback registered on the PulseAudio context with
 * pa_context_set_state_callback().
 *
 * Dispatches on pa_context_get_state(c): once the context is READY the
 * playback stream stream_out is created, given stream_write_callback as its
 * write callback and connected for playback; a FAILED context is logged
 * with the context error string. */
617 context_state_callback (pa_context *c,
622 GNUNET_assert (NULL != c);
623 switch (pa_context_get_state (c))
/* Transitional states share one case group. */
625 case PA_CONTEXT_CONNECTING:
626 case PA_CONTEXT_AUTHORIZING:
627 case PA_CONTEXT_SETTING_NAME:
629 case PA_CONTEXT_READY:
/* The stream must not exist yet when the context becomes ready. */
631 GNUNET_assert (! stream_out);
632 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633 _("Connection established.\n"));
635 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
637 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
638 _("pa_stream_new() failed: %s\n"),
639 pa_strerror (pa_context_errno (c)));
642 pa_stream_set_write_callback (stream_out,
643 &stream_write_callback,
646 pa_stream_connect_playback (stream_out, NULL,
648 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
651 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
652 _("pa_stream_connect_playback() failed: %s\n"),
653 pa_strerror (pa_context_errno (c)));
658 case PA_CONTEXT_TERMINATED:
662 case PA_CONTEXT_FAILED:
664 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
665 _("Connection failure: %s\n"),
666 pa_strerror (pa_context_errno (c)));
676 * Pulseaudio initialization
/* Body of the PulseAudio initialisation routine (its signature line is not
 * part of this listing).  Visible steps, in order: validate sample_spec,
 * create the threaded mainloop m and fetch its API, install SIGINT/SIGTERM
 * handlers, create the context, register context_state_callback, connect
 * the context and start the mainloop thread.
 *
 * NOTE(review): the statements that follow each failed check (short lines)
 * are not visible here. */
683 if (!pa_sample_spec_valid (&sample_spec))
685 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
688 /* set up threaded playback mainloop */
/* NOTE(review): the log text below names pa_mainloop_new() although the
 * call is pa_threaded_mainloop_new(). */
689 if (!(m = pa_threaded_mainloop_new ()))
691 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
692 _("pa_mainloop_new() failed.\n"));
694 mainloop_api = pa_threaded_mainloop_get_api (m);
695 /* listen to signals */
696 r = pa_signal_init (mainloop_api);
697 GNUNET_assert (r == 0);
698 pa_signal_new (SIGINT, exit_signal_callback, NULL);
699 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
702 /* connect to the main pulseaudio context */
703 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
705 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
706 _("pa_context_new() failed.\n"));
708 pa_context_set_state_callback (context, context_state_callback, NULL);
710 if (pa_context_connect (context, NULL, 0, NULL) < 0)
712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
713 _("pa_context_connect() failed: %s\n"),
714 pa_strerror (pa_context_errno (context)));
/* NOTE(review): the log text below names pa_mainloop_run() although the
 * call is pa_threaded_mainloop_start(). */
716 if (pa_threaded_mainloop_start (m) < 0)
718 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
719 _("pa_mainloop_run() failed.\n"));
/* Completion callback passed to pa_stream_drain() in main.
 *
 * It signals the threaded mainloop m so that the pa_threaded_mainloop_wait()
 * loop in main can re-check the drain operation state.  None of the three
 * parameters is used in the visible code. */
731 drain_callback (pa_stream*s, int success, void *userdata)
733 pa_threaded_mainloop_signal (m, 0);
737 * The main function for the playback helper.
739 * @param argc number of arguments from the command line
740 * @param argv command line arguments
741 * @return 0 ok, 1 on error
744 main (int argc, char *argv[])
746 static unsigned long long toff;
748 char readbuf[MAXLINE];
749 struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
752 #ifdef DEBUG_READ_PURE_OGG
753 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
756 GNUNET_assert (GNUNET_OK ==
757 GNUNET_log_setup ("gnunet-helper-audio-playback",
760 if (0 != pipe (ready_pipe))
762 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
765 stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
768 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769 "Waiting for PulseAudio to be ready.\n");
770 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
771 close (ready_pipe[0]);
772 close (ready_pipe[1]);
775 #ifdef DEBUG_DUMP_DECODED_OGG
776 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
780 ret = read (0, readbuf, sizeof (readbuf));
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "Received %d bytes of audio data (total: %llu)\n",
788 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
789 _("Read error from STDIN: %s\n"),
795 #ifdef DEBUG_READ_PURE_OGG
798 char *data = ogg_sync_buffer (&oy, ret);
799 GNUNET_memcpy (data, readbuf, ret);
800 ogg_sync_wrote (&oy, ret);
801 ogg_demux_and_decode ();
805 GNUNET_SERVER_mst_receive (stdin_mst, NULL,
807 GNUNET_NO, GNUNET_NO);
809 GNUNET_SERVER_mst_destroy (stdin_mst);
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
814 pa_threaded_mainloop_lock (m);
815 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
818 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 pa_threaded_mainloop_wait (m);
824 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826 pa_operation_unref (o);
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829 pa_threaded_mainloop_unlock (m);