2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file conversation/gnunet-helper-audio-playback.c
17 * @brief program to playback audio data to the speaker
18 * @author Siomon Dieterle
19 * @author Andreas Fuchs
20 * @author Christian Grothoff
23 #include "gnunet_util_lib.h"
24 #include "gnunet_protocols.h"
25 #include "conversation.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_core_service.h"
29 #include <pulse/simple.h>
30 #include <pulse/error.h>
31 #include <pulse/rtclock.h>
33 #include <pulse/pulseaudio.h>
34 #include <opus/opus.h>
35 #include <opus/opus_types.h>
38 #define DEBUG_READ_PURE_OGG 1
39 #define DEBUG_DUMP_DECODED_OGG 1
43 #define SAMPLING_RATE 48000
48 #define MAX_FRAME_SIZE (960 * 6)
51 * Pulseaudio specification. May change in the future.
53 static pa_sample_spec sample_spec = {
54 .format = PA_SAMPLE_FLOAT32LE,
55 .rate = SAMPLING_RATE,
59 #ifdef DEBUG_DUMP_DECODED_OGG
60 static int dump_to_stdout;
64 * Pulseaudio mainloop api
66 static pa_mainloop_api *mainloop_api;
69 * Pulseaudio threaded mainloop
71 static pa_threaded_mainloop *m;
76 static pa_context *context;
79 * Pulseaudio output stream
81 static pa_stream *stream_out;
86 static OpusDecoder *dec;
91 static float *pcm_buffer;
94 * Number of samples for one frame
96 static int frame_size;
99 * Pipe we use to signal the main loop that we are ready to receive.
101 static int ready_pipe[2];
106 static ogg_sync_state oy;
111 static ogg_stream_state os;
119 GNUNET_NETWORK_STRUCT_BEGIN
121 /* OggOpus spec says the numbers must be in little-endian order */
122 struct OpusHeadPacket
127 uint16_t preskip GNUNET_PACKED;
128 uint32_t sampling_rate GNUNET_PACKED;
129 uint16_t gain GNUNET_PACKED;
130 uint8_t channel_mapping;
133 GNUNET_NETWORK_STRUCT_END
136 * Process an Opus header and setup the opus decoder based on it.
137 * It takes several pointers for header values which are needed
138 * elsewhere in the code.
141 process_header (ogg_packet *op)
145 struct OpusHeadPacket header;
147 if ( ((unsigned int) op->bytes) < sizeof (header))
149 GNUNET_memcpy (&header,
152 header.preskip = GNUNET_le16toh (header.preskip);
153 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
154 header.gain = GNUNET_le16toh (header.gain);
156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
157 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161 header.sampling_rate,
163 channels = header.channels;
164 preskip = header.preskip;
166 if (header.channel_mapping != 0)
169 "This implementation does not support non-mono streams\n");
173 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177 "Cannot create encoder: %s\n",
178 opus_strerror (err));
184 "Decoder initialization failed: %s\n",
185 opus_strerror (err));
189 if (0 != header.gain)
191 /*Gain API added in a newer libopus version, if we don't have it
192 we apply the gain ourselves. We also add in a user provided
193 manual gain at the same time.*/
194 int gainadj = (int) header.gain;
195 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196 if(OPUS_UNIMPLEMENTED == err)
198 gain = pow (10.0, gainadj / 5120.0);
200 else if (OPUS_OK != err)
202 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
211 #ifdef DEBUG_DUMP_DECODED_OGG
213 fwrite_le32(opus_int32 i32, FILE *file)
215 unsigned char buf[4];
216 buf[0]=(unsigned char)(i32&0xFF);
217 buf[1]=(unsigned char)(i32>>8&0xFF);
218 buf[2]=(unsigned char)(i32>>16&0xFF);
219 buf[3]=(unsigned char)(i32>>24&0xFF);
220 return fwrite(buf,4,1,file);
225 fwrite_le16(int i16, FILE *file)
227 unsigned char buf[2];
228 buf[0]=(unsigned char)(i16&0xFF);
229 buf[1]=(unsigned char)(i16>>8&0xFF);
230 return fwrite(buf,2,1,file);
240 ret = fprintf (file, "RIFF") >= 0;
241 ret &= fwrite_le32 (0x7fffffff, file);
243 ret &= fprintf (file, "WAVEfmt ") >= 0;
244 ret &= fwrite_le32 (16, file);
245 ret &= fwrite_le16 (1, file);
246 ret &= fwrite_le16 (channels, file);
247 ret &= fwrite_le32 (SAMPLING_RATE, file);
248 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249 ret &= fwrite_le16 (2*channels, file);
250 ret &= fwrite_le16 (16, file);
252 ret &= fprintf (file, "data") >= 0;
253 ret &= fwrite_le32 (0x7fffffff, file);
255 return !ret ? -1 : 16;
262 audio_write (int64_t maxout)
269 #ifdef DEBUG_DUMP_DECODED_OGG
270 static int wrote_wav_header;
272 if (dump_to_stdout && !wrote_wav_header)
275 wrote_wav_header = 1;
278 maxout = 0 > maxout ? 0 : maxout;
281 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
283 output = pcm_buffer + channels * tmp_skip;
284 out_len = frame_size - tmp_skip;
285 if (out_len > MAX_FRAME_SIZE)
289 to_write = out_len < maxout ? out_len : (unsigned) maxout;
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Writing %u * %u * %u = %llu bytes into PA\n",
298 (unsigned int) sizeof (float),
299 (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
307 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
308 for (i=0;i<(int)out_len*channels;i++)
309 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
311 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
316 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317 PA_SEEK_RELATIVE) < 0)
319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320 _("pa_stream_write() failed: %s\n"),
321 pa_strerror (pa_context_errno (context)));
326 } while (0 < frame_size && 0 < maxout);
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Wrote %" PRId64 " samples\n",
336 * Pulseaudio shutdown task
341 mainloop_api->quit (mainloop_api,
348 ogg_demux_and_decode ()
351 static int stream_init;
352 int64_t page_granule = 0;
354 static int has_opus_stream;
355 static int has_tags_packet;
356 static int32_t opus_serialno;
357 static int64_t link_out;
358 static int64_t packet_count;
360 static int total_links;
361 static int gran_offset;
363 while (1 == ogg_sync_pageout (&oy, &og))
365 if (0 == stream_init)
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Initialized the stream\n");
369 ogg_stream_init (&os, ogg_page_serialno (&og));
372 if (ogg_page_serialno (&og) != os.serialno)
374 /* so all streams are read. */
375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376 "Re-set serial number\n");
377 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
379 /*Add page to the bitstream*/
380 ogg_stream_pagein (&os, &og);
381 page_granule = ogg_page_granulepos (&og);
382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383 "Reading page that ends at %" PRId64 "\n",
385 /*Extract all available packets*/
386 while (1 == ogg_stream_packetout (&os, &op))
388 /*OggOpus streams are identified by a magic string in the initial
390 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "Got Opus Header\n");
394 if (has_opus_stream && has_tags_packet)
396 /*If we're seeing another BOS OpusHead now it means
397 the stream is chained without an EOS.
398 This can easily happen if record helper is terminated unexpectedly.
402 opus_decoder_destroy (dec);
404 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
406 if (!has_opus_stream)
408 if (packet_count > 0 && opus_serialno == os.serialno)
410 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
411 (int64_t) opus_serialno, (int64_t) os.serialno);
414 opus_serialno = os.serialno;
421 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422 "Got header for stream %" PRId64 ", this is %dth link\n",
423 (int64_t) opus_serialno, total_links);
427 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
430 if (!has_opus_stream || os.serialno != opus_serialno)
432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436 /*If first packet in a logical stream, process the Opus header*/
437 if (0 == packet_count)
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440 "Decoding header\n");
441 dec = process_header (&op);
445 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
447 /*The format specifies that the initial header and tags packets are on their
448 own pages. To aid implementors in discovering that their files are wrong
449 we reject them explicitly here. In some player designs files like this would
450 fail even without an explicit test.*/
451 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
455 /*Remember how many samples at the front we were told to skip
456 so that we can adjust the timestamp counting.*/
457 gran_offset = preskip;
461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
465 (unsigned int) sizeof (float),
466 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
467 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
470 else if (1 == packet_count)
473 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
475 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
485 /*End of stream condition*/
486 if (op.e_o_s && os.serialno == opus_serialno)
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490 eos = 1; /* don't care for anything except opus eos */
493 /*Decode Opus packet*/
494 ret = opus_decode_float (dec,
495 (const unsigned char *) op.packet,
500 /*If the decoder returned less than zero, we have an error.*/
503 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
507 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
511 (unsigned int) op.bytes);
513 /*Apply header gain, if we're not using an opus library new
514 enough to do this internally.*/
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Applying gain %f\n",
521 for (i = 0; i < frame_size * channels; i++)
522 pcm_buffer[i] *= gain;
525 /*This handles making sure that our output duration respects
526 the final end-trim by not letting the output sample count
527 get ahead of the granpos indicated value.*/
528 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
531 packet_count, maxout);
533 outsamp = audio_write (0 > maxout ? 0 : maxout);
542 opus_decoder_destroy (dec);
552 * @param msg message we received.
553 * @return #GNUNET_OK on success,
554 * #GNUNET_NO to stop further processing due to disconnect (no error)
555 * #GNUNET_SYSERR to stop further processing due to error
558 stdin_receiver (void *cls,
559 const struct GNUNET_MessageHeader *msg)
561 struct AudioMessage *audio;
566 switch (ntohs (msg->type))
568 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
569 audio = (struct AudioMessage *) msg;
570 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
572 /*Get the ogg buffer for writing*/
573 data = ogg_sync_buffer (&oy, payload_len);
574 /*Read bitstream from input file*/
575 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
576 ogg_sync_wrote (&oy, payload_len);
578 ogg_demux_and_decode ();
588 * Callback when data is there for playback
591 stream_write_callback (pa_stream *s,
599 if (-1 != ready_pipe[1])
601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602 "Unblocking main loop!\n");
603 (void) write (ready_pipe[1], "r", 1);
609 * Exit callback for SIGTERM and SIGINT
612 exit_signal_callback (pa_mainloop_api *m,
621 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
622 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
628 * Pulseaudio stream state callback
631 context_state_callback (pa_context *c,
637 GNUNET_assert (NULL != c);
638 switch (pa_context_get_state (c))
640 case PA_CONTEXT_CONNECTING:
641 case PA_CONTEXT_AUTHORIZING:
642 case PA_CONTEXT_SETTING_NAME:
644 case PA_CONTEXT_READY:
646 GNUNET_assert (! stream_out);
647 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
648 _("Connection established.\n"));
650 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
652 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
653 _("pa_stream_new() failed: %s\n"),
654 pa_strerror (pa_context_errno (c)));
657 pa_stream_set_write_callback (stream_out,
658 &stream_write_callback,
661 pa_stream_connect_playback (stream_out, NULL,
663 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
666 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
667 _("pa_stream_connect_playback() failed: %s\n"),
668 pa_strerror (pa_context_errno (c)));
673 case PA_CONTEXT_TERMINATED:
677 case PA_CONTEXT_FAILED:
679 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
680 _("Connection failure: %s\n"),
681 pa_strerror (pa_context_errno (c)));
691 * Pulseaudio initialization
698 if (!pa_sample_spec_valid (&sample_spec))
700 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
703 /* set up threaded playback mainloop */
704 if (!(m = pa_threaded_mainloop_new ()))
706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707 _("pa_mainloop_new() failed.\n"));
709 mainloop_api = pa_threaded_mainloop_get_api (m);
710 /* listen to signals */
711 r = pa_signal_init (mainloop_api);
712 GNUNET_assert (r == 0);
713 pa_signal_new (SIGINT, exit_signal_callback, NULL);
714 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
717 /* connect to the main pulseaudio context */
718 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
720 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
721 _("pa_context_new() failed.\n"));
723 pa_context_set_state_callback (context, context_state_callback, NULL);
725 if (pa_context_connect (context, NULL, 0, NULL) < 0)
727 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
728 _("pa_context_connect() failed: %s\n"),
729 pa_strerror (pa_context_errno (context)));
731 if (pa_threaded_mainloop_start (m) < 0)
733 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
734 _("pa_mainloop_run() failed.\n"));
747 drain_callback (pa_stream*s, int success, void *userdata)
752 pa_threaded_mainloop_signal (m,
758 * The main function for the playback helper.
760 * @param argc number of arguments from the command line
761 * @param argv command line arguments
762 * @return 0 ok, 1 on error
765 main (int argc, char *argv[])
767 static unsigned long long toff;
768 char readbuf[MAXLINE];
769 struct GNUNET_MessageStreamTokenizer *stdin_mst;
772 #ifdef DEBUG_READ_PURE_OGG
773 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
778 GNUNET_assert (GNUNET_OK ==
779 GNUNET_log_setup ("gnunet-helper-audio-playback",
782 if (0 != pipe (ready_pipe))
784 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
787 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "Waiting for PulseAudio to be ready.\n");
792 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
793 close (ready_pipe[0]);
794 close (ready_pipe[1]);
797 #ifdef DEBUG_DUMP_DECODED_OGG
798 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
802 ret = read (STDIN_FILENO,
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "Received %d bytes of audio data (total: %llu)\n",
812 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
813 _("Read error from STDIN: %s\n"),
819 #ifdef DEBUG_READ_PURE_OGG
822 char *data = ogg_sync_buffer (&oy, ret);
823 GNUNET_memcpy (data, readbuf, ret);
824 ogg_sync_wrote (&oy, ret);
825 ogg_demux_and_decode ();
829 GNUNET_MST_from_buffer (stdin_mst,
831 GNUNET_NO, GNUNET_NO);
833 GNUNET_MST_destroy (stdin_mst);
836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 pa_threaded_mainloop_lock (m);
839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
842 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846 pa_threaded_mainloop_wait (m);
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 pa_operation_unref (o);
851 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853 pa_threaded_mainloop_unlock (m);