2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to playback audio data to the speaker
23 * @author Siomon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
/* Debug switches.  They are tested with #ifdef further down, so merely
   being defined enables the guarded code. */
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
/* Sample rate in Hz used for the PulseAudio sample spec, for the Opus
   decoder and for the WAV header of the debug dump. */
48 #define SAMPLING_RATE 48000
/* Per-channel sample capacity used when sizing pcm_buffer and the
   16-bit conversion buffer of the debug dump. */
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
/* Sample specification that is validated with pa_sample_spec_valid()
   and handed to pa_stream_new() when the playback stream is created. */
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
/* Set in main() from the GNUNET_DUMP_DECODED_OGG environment variable;
   audio_write() checks it before dumping decoded audio to stdout. */
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
/* PulseAudio context, created with pa_context_new(). */
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
/* Opus decoder; ogg_demux_and_decode() assigns it from process_header(). */
91 static OpusDecoder *dec;
/* Decoded float samples; allocated with room for
   MAX_FRAME_SIZE * channels floats once a header has been processed. */
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
/* Ogg sync state: raw input bytes are appended with ogg_sync_buffer() and
   ogg_sync_wrote(), pages are extracted with ogg_sync_pageout(). */
111 static ogg_sync_state oy;
/* Ogg stream state: pages go in with ogg_stream_pagein(), packets come
   out with ogg_stream_packetout(). */
116 static ogg_stream_state os;
/* In-memory image of the OpusHead packet.  process_header() copies the
   packet bytes into this struct and then converts preskip, sampling_rate
   and gain with GNUNET_le16toh / GNUNET_le32toh. */
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
132 uint16_t preskip GNUNET_PACKED;
133 uint32_t sampling_rate GNUNET_PACKED;
/* NOTE(review): declared unsigned, so the gain read from the header is
   never negative; confirm whether the field should be read as signed. */
134 uint16_t gain GNUNET_PACKED;
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
141 * Process an Opus header and setup the opus decoder based on it.
142 * The header values that are needed elsewhere in the code (channels,
143 * preskip) are stored in shared variables.
/*
 * Parse an OpusHead packet and create the matching Opus decoder.
 *
 * The packet bytes are copied into a struct OpusHeadPacket and the
 * multi-byte fields are converted from little-endian.  The channel count
 * and the preskip are stored in the shared variables channels and preskip,
 * which other functions in this file read.
 *
 * @param op Ogg packet expected to contain the OpusHead header
 * @return presumably the created decoder, since the caller assigns the
 *         result to dec -- TODO confirm
 */
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
/* Packets shorter than the header struct cannot be parsed. */
152 if (op->bytes < sizeof (header))
154 GNUNET_memcpy (&header, op->packet, sizeof (header));
155 header.preskip = GNUNET_le16toh (header.preskip);
156 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157 header.gain = GNUNET_le16toh (header.gain);
159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161 header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
163 channels = header.channels;
164 preskip = header.preskip;
/* Only a channel_mapping value of 0 is accepted. */
166 if (header.channel_mapping != 0)
169 "This implementation does not support non-mono streams\n");
/* The decoder is created for SAMPLING_RATE, not for the sampling_rate
   value found in the header. */
173 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177 "Cannot create encoder: %s\n",
178 opus_strerror (err));
/* NOTE(review): the message above says encoder although the failing call
   is opus_decoder_create(). */
184 "Decoder initialization failed: %s\n",
185 opus_strerror (err));
/* A non-zero header gain is handed to the decoder via OPUS_SET_GAIN.  If
   the library answers OPUS_UNIMPLEMENTED, a linear factor is computed
   into gain instead; ogg_demux_and_decode() multiplies samples by it. */
189 if (0 != header.gain)
191 /*Gain API added in a newer libopus version, if we don't have it
192 we apply the gain ourselves. We also add in a user provided
193 manual gain at the same time.*/
194 int gainadj = (int) header.gain;
195 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196 if(OPUS_UNIMPLEMENTED == err)
/* gainadj / 5120 equals (gainadj / 256) / 20; presumably the header value
   is in units of 1/256 dB -- TODO confirm against the Ogg Opus spec. */
198 gain = pow (10.0, gainadj / 5120.0);
200 else if (OPUS_OK != err)
202 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
211 #ifdef DEBUG_DUMP_DECODED_OGG
/*
 * Write a 32-bit value to file as four bytes, least significant first.
 *
 * @param i32 value to write
 * @param file output stream
 * @return result of fwrite() for one 4-byte item: 1 on success, else 0
 */
213 fwrite_le32(opus_int32 i32, FILE *file)
215 unsigned char buf[4];
216 buf[0]=(unsigned char)(i32&0xFF);
217 buf[1]=(unsigned char)(i32>>8&0xFF);
218 buf[2]=(unsigned char)(i32>>16&0xFF);
219 buf[3]=(unsigned char)(i32>>24&0xFF);
220 return fwrite(buf,4,1,file);
/*
 * Write the low 16 bits of a value to file as two bytes, least
 * significant first.
 *
 * @param i16 value to write
 * @param file output stream
 * @return result of fwrite() for one 2-byte item: 1 on success, else 0
 */
225 fwrite_le16(int i16, FILE *file)
227 unsigned char buf[2];
228 buf[0]=(unsigned char)(i16&0xFF);
229 buf[1]=(unsigned char)(i16>>8&0xFF);
230 return fwrite(buf,2,1,file);
/* Emit a 44-byte RIFF/WAVE header to file.  ret starts as the outcome of
   the first write and is AND-ed with every later one, so it ends up
   non-zero only if all writes succeeded. */
/* Both size fields are written as 0x7fffffff, presumably because the
   total length is not known while streaming -- TODO confirm. */
240 ret = fprintf (file, "RIFF") >= 0;
241 ret &= fwrite_le32 (0x7fffffff, file);
/* fmt chunk: 16 bytes of payload, format tag 1, then channel count,
   sample rate, bytes per second, bytes per sample frame and 16 bits
   per sample. */
243 ret &= fprintf (file, "WAVEfmt ") >= 0;
244 ret &= fwrite_le32 (16, file);
245 ret &= fwrite_le16 (1, file);
246 ret &= fwrite_le16 (channels, file);
247 ret &= fwrite_le32 (SAMPLING_RATE, file);
248 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249 ret &= fwrite_le16 (2*channels, file);
250 ret &= fwrite_le16 (16, file);
252 ret &= fprintf (file, "data") >= 0;
253 ret &= fwrite_le32 (0x7fffffff, file);
/* -1 if any write failed, otherwise 16. */
255 return !ret ? -1 : 16;
/*
 * Hand the decoded samples in pcm_buffer to the PulseAudio output stream.
 * When dump_to_stdout is set, the samples are additionally converted to
 * clipped 16-bit integers and written to stdout.
 *
 * @param maxout upper bound on the number of samples to write; negative
 *        values are treated as 0
 */
262 audio_write (int64_t maxout)
269 #ifdef DEBUG_DUMP_DECODED_OGG
/* Remembers across calls that the one-time WAV header step has run. */
270 static int wrote_wav_header;
272 if (dump_to_stdout && !wrote_wav_header)
275 wrote_wav_header = 1;
278 maxout = 0 > maxout ? 0 : maxout;
/* Skip the pending preskip samples, but never more than one frame. */
281 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
283 output = pcm_buffer + channels * tmp_skip;
284 out_len = frame_size - tmp_skip;
285 if (out_len > MAX_FRAME_SIZE)
/* Write the smaller of the available samples and the maxout limit. */
289 to_write = out_len < maxout ? out_len : (unsigned) maxout;
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Writing %u * %u * %u = %llu bytes into PA\n",
298 (unsigned int) sizeof (float),
299 (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
/* NOTE(review): alloca memory is only released when the function returns;
   confirm this allocation cannot run repeatedly inside the loop that
   ends at the while below. */
307 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
/* Scale each float sample by 32768 and clip it to the int16 range. */
308 for (i=0;i<(int)out_len*channels;i++)
309 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
311 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
316 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317 PA_SEEK_RELATIVE) < 0)
319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320 _("pa_stream_write() failed: %s\n"),
321 pa_strerror (pa_context_errno (context)));
326 } while (0 < frame_size && 0 < maxout);
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Wrote %" PRId64 " samples\n",
336 * Pulseaudio shutdown task
/* Ask the PulseAudio mainloop to quit, passing ret as its return value. */
341 mainloop_api->quit (mainloop_api, ret);
/*
 * Extract every complete page currently available in the sync state oy,
 * feed it into the stream state os and process the packets it yields.
 *
 * Packet number 0 of a logical stream is handed to process_header(),
 * packet number 1 is checked as the tags packet, and later packets are
 * decoded with opus_decode_float() and passed on to audio_write().
 */
347 ogg_demux_and_decode ()
/* The static variables below keep the demuxer state across calls. */
350 static int stream_init;
351 int64_t page_granule = 0;
353 static int has_opus_stream;
354 static int has_tags_packet;
355 static int32_t opus_serialno;
356 static int64_t link_out;
357 static int64_t packet_count;
359 static int total_links;
360 static int gran_offset;
362 while (1 == ogg_sync_pageout (&oy, &og))
/* While stream_init is still 0, set up the stream state using the serial
   number of this page. */
364 if (0 == stream_init)
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "Initialized the stream\n");
368 ogg_stream_init (&os, ogg_page_serialno (&og));
371 if (ogg_page_serialno (&og) != os.serialno)
373 /* so all streams are read. */
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375 "Re-set serial number\n");
376 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
378 /*Add page to the bitstream*/
379 ogg_stream_pagein (&os, &og);
380 page_granule = ogg_page_granulepos (&og);
381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382 "Reading page that ends at %" PRId64 "\n",
384 /*Extract all available packets*/
385 while (1 == ogg_stream_packetout (&os, &op))
387 /*OggOpus streams are identified by a magic string in the initial
389 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Got Opus Header\n");
393 if (has_opus_stream && has_tags_packet)
395 /*If we're seeing another BOS OpusHead now it means
396 the stream is chained without an EOS.
397 This can easily happen if record helper is terminated unexpectedly.
401 opus_decoder_destroy (dec);
403 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
405 if (!has_opus_stream)
407 if (packet_count > 0 && opus_serialno == os.serialno)
409 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
410 (int64_t) opus_serialno, (int64_t) os.serialno);
413 opus_serialno = os.serialno;
420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421 "Got header for stream %" PRId64 ", this is %dth link\n",
422 (int64_t) opus_serialno, total_links);
426 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
429 if (!has_opus_stream || os.serialno != opus_serialno)
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435 /*If first packet in a logical stream, process the Opus header*/
436 if (0 == packet_count)
438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439 "Decoding header\n");
440 dec = process_header (&op);
444 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
446 /*The format specifies that the initial header and tags packets are on their
447 own pages. To aid implementors in discovering that their files are wrong
448 we reject them explicitly here. In some player designs files like this would
449 fail even without an explicit test.*/
450 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
454 /*Remember how many samples at the front we were told to skip
455 so that we can adjust the timestamp counting.*/
456 gran_offset = preskip;
460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
464 (unsigned int) sizeof (float),
465 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
/* NOTE(review): a new buffer is allocated whenever a header is processed;
   confirm that a buffer from an earlier link is released first. */
466 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
469 else if (1 == packet_count)
472 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
474 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
484 /*End of stream condition*/
485 if (op.e_o_s && os.serialno == opus_serialno)
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 eos = 1; /* don't care for anything except opus eos */
492 /*Decode Opus packet*/
493 ret = opus_decode_float (dec,
494 (const unsigned char *) op.packet,
499 /*If the decoder returned less than zero, we have an error.*/
502 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
510 (unsigned int) op.bytes);
512 /*Apply header gain, if we're not using an opus library new
513 enough to do this internally.*/
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Applying gain %f\n",
520 for (i = 0; i < frame_size * channels; i++)
521 pcm_buffer[i] *= gain;
524 /*This handles making sure that our output duration respects
525 the final end-trim by not letting the output sample count
526 get ahead of the granpos indicated value.*/
/* With SAMPLING_RATE equal to 48000 the rate scaling below leaves the
   value unchanged. */
527 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
530 packet_count, maxout);
/* audio_write() clamps a negative limit to 0 as well. */
532 outsamp = audio_write (0 > maxout ? 0 : maxout);
541 opus_decoder_destroy (dec);
/*
 * Message handler given to GNUNET_MST_create() in main().  For messages
 * of type GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO the bytes following the
 * struct AudioMessage are copied into the Ogg sync buffer and
 * ogg_demux_and_decode() is run.
 *
 * @param cls closure; main() passes NULL
 * @param msg message to process
 */
551 stdin_receiver (void *cls,
552 const struct GNUNET_MessageHeader *msg)
554 struct AudioMessage *audio;
558 switch (ntohs (msg->type))
560 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
561 audio = (struct AudioMessage *) msg;
/* NOTE(review): confirm the size is guaranteed to be at least
   sizeof (struct AudioMessage) before this subtraction. */
562 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
564 /*Get the ogg buffer for writing*/
565 data = ogg_sync_buffer (&oy, payload_len);
566 /*Read bitstream from input file*/
567 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
568 ogg_sync_wrote (&oy, payload_len);
570 ogg_demux_and_decode ();
580 * Callback when data is there for playback
/*
 * Write callback installed on stream_out with
 * pa_stream_set_write_callback().  As long as ready_pipe[1] is not -1 it
 * writes one byte to that pipe end; main() blocks reading the other end.
 *
 * @param s the stream the callback was invoked for
 */
583 stream_write_callback (pa_stream *s,
588 if (-1 != ready_pipe[1])
590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591 "Unblocking main loop!\n");
/* The result of write() is deliberately discarded. */
592 (void) write (ready_pipe[1], "r", 1);
598 * Exit callback for SIGTERM and SIGINT
/*
 * Handler registered with pa_signal_new() for SIGINT and SIGTERM; logs
 * that a signal arrived.
 *
 * @param m mainloop API the signal was delivered through
 */
601 exit_signal_callback (pa_mainloop_api *m,
606 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
607 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
613 * Pulseaudio context state callback
/*
 * State callback registered on the context with
 * pa_context_set_state_callback().  When the context reaches
 * PA_CONTEXT_READY it creates the playback stream, installs
 * stream_write_callback on it and connects it for playback.  Failures of
 * those calls and the PA_CONTEXT_FAILED state are logged.
 *
 * @param c the context whose state changed; must not be NULL
 */
616 context_state_callback (pa_context *c,
621 GNUNET_assert (NULL != c);
622 switch (pa_context_get_state (c))
/* Intermediate connection states share one case group. */
624 case PA_CONTEXT_CONNECTING:
625 case PA_CONTEXT_AUTHORIZING:
626 case PA_CONTEXT_SETTING_NAME:
628 case PA_CONTEXT_READY:
/* The stream must not exist yet when the context becomes ready. */
630 GNUNET_assert (! stream_out);
631 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
632 _("Connection established.\n"));
634 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
636 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
637 _("pa_stream_new() failed: %s\n"),
638 pa_strerror (pa_context_errno (c)));
641 pa_stream_set_write_callback (stream_out,
642 &stream_write_callback,
645 pa_stream_connect_playback (stream_out, NULL,
647 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
650 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
651 _("pa_stream_connect_playback() failed: %s\n"),
652 pa_strerror (pa_context_errno (c)));
657 case PA_CONTEXT_TERMINATED:
661 case PA_CONTEXT_FAILED:
663 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
664 _("Connection failure: %s\n"),
665 pa_strerror (pa_context_errno (c)));
675 * Pulseaudio initialization
/* Validate sample_spec, create the threaded mainloop, register the
   SIGINT and SIGTERM handlers, create the context and begin connecting
   it, then start the mainloop thread. */
682 if (!pa_sample_spec_valid (&sample_spec))
684 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
687 /* set up threaded playback mainloop */
688 if (!(m = pa_threaded_mainloop_new ()))
690 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
691 _("pa_mainloop_new() failed.\n"));
/* NOTE(review): the message above names pa_mainloop_new() although the
   failing call is pa_threaded_mainloop_new(). */
693 mainloop_api = pa_threaded_mainloop_get_api (m);
694 /* listen to signals */
695 r = pa_signal_init (mainloop_api);
696 GNUNET_assert (r == 0);
697 pa_signal_new (SIGINT, exit_signal_callback, NULL);
698 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
701 /* connect to the main pulseaudio context */
702 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
704 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
705 _("pa_context_new() failed.\n"));
/* Stream setup happens later, in context_state_callback. */
707 pa_context_set_state_callback (context, context_state_callback, NULL);
709 if (pa_context_connect (context, NULL, 0, NULL) < 0)
711 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
712 _("pa_context_connect() failed: %s\n"),
713 pa_strerror (pa_context_errno (context)));
715 if (pa_threaded_mainloop_start (m) < 0)
717 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
718 _("pa_mainloop_run() failed.\n"));
/* NOTE(review): the message above names pa_mainloop_run() although the
   failing call is pa_threaded_mainloop_start(). */
/*
 * Completion callback passed to pa_stream_drain() in main(); signals the
 * threaded mainloop so that the pa_threaded_mainloop_wait() call in
 * main() returns.
 *
 * @param s stream that was drained
 * @param success status reported for the drain operation (unused)
 * @param userdata closure (unused)
 */
731 drain_callback (pa_stream*s, int success, void *userdata)
733 pa_threaded_mainloop_signal (m, 0);
738 * The main function for the playback helper.
740 * @param argc number of arguments from the command line
741 * @param argv command line arguments
742 * @return 0 ok, 1 on error
/* Entry point: waits until PulseAudio signals readiness through
   ready_pipe, then reads audio data from stdin in a loop and finally
   drains the output stream. */
745 main (int argc, char *argv[])
/* Running byte total reported in the debug message below. */
747 static unsigned long long toff;
749 char readbuf[MAXLINE];
750 struct GNUNET_MessageStreamTokenizer *stdin_mst;
753 #ifdef DEBUG_READ_PURE_OGG
/* When GNUNET_READ_PURE_OGG is set in the environment, stdin is treated
   as a raw Ogg stream instead of a stream of GNUnet messages. */
754 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
757 GNUNET_assert (GNUNET_OK ==
758 GNUNET_log_setup ("gnunet-helper-audio-playback",
/* stream_write_callback writes to this pipe to wake us up. */
761 if (0 != pipe (ready_pipe))
763 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
766 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
769 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770 "Waiting for PulseAudio to be ready.\n");
/* Block until exactly one byte arrives on the pipe. */
771 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
772 close (ready_pipe[0]);
773 close (ready_pipe[1]);
776 #ifdef DEBUG_DUMP_DECODED_OGG
777 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
/* File descriptor 0 is stdin. */
781 ret = read (0, readbuf, sizeof (readbuf));
783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784 "Received %d bytes of audio data (total: %llu)\n",
789 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
790 _("Read error from STDIN: %s\n"),
796 #ifdef DEBUG_READ_PURE_OGG
/* Raw Ogg path: feed the bytes straight into the Ogg sync state. */
799 char *data = ogg_sync_buffer (&oy, ret);
800 GNUNET_memcpy (data, readbuf, ret);
801 ogg_sync_wrote (&oy, ret);
802 ogg_demux_and_decode ();
/* Message path: the tokenizer calls stdin_receiver for each message. */
806 GNUNET_MST_from_buffer (stdin_mst,
808 GNUNET_NO, GNUNET_NO);
810 GNUNET_MST_destroy (stdin_mst);
813 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* Drain the stream while holding the mainloop lock; drain_callback
   signals the mainloop so that the wait below returns. */
815 pa_threaded_mainloop_lock (m);
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
818 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
819 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823 pa_threaded_mainloop_wait (m);
825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827 pa_operation_unref (o);
828 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830 pa_threaded_mainloop_unlock (m);