2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to play back audio data to the speaker
23 * @author Siomon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
/* Debug switch: when defined, main() consults the GNUNET_READ_PURE_OGG
   environment variable and the guarded code path copies stdin data straight
   into the Ogg sync layer. */
43 #define DEBUG_READ_PURE_OGG 1
/* Debug switch: when defined, the dump_to_stdout flag exists; main() sets it
   from the GNUNET_DUMP_DECODED_OGG environment variable. */
44 #define DEBUG_DUMP_DECODED_OGG 1
/* Sample rate used for the PulseAudio sample spec and passed to
   opus_decoder_create(). */
48 #define SAMPLING_RATE 48000
/* Per-channel sample capacity used to size pcm_buffer and the temporary
   16-bit dump buffer in audio_write(). */
53 #define MAX_FRAME_SIZE (960 * 6)
/* Sample specification checked with pa_sample_spec_valid() during
   initialization and handed to pa_stream_new() when the context is ready. */
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
/* Samples are 32-bit floats; this matches the float pcm_buffer that
   audio_write() passes to the stream. */
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
/* Flag set in main() from the GNUNET_DUMP_DECODED_OGG environment variable;
   audio_write() tests it together with its wrote_wav_header flag. */
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
/* Obtained from pa_threaded_mainloop_get_api() during initialization; its
   quit hook is invoked by the shutdown helper. */
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
/* Created with pa_threaded_mainloop_new(); main() locks and waits on it
   while draining the output stream. */
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
/* PulseAudio context created with pa_context_new() and used to look up
   error codes via pa_context_errno(). */
81 static pa_context *context;
/* Playback stream; receives decoded samples in audio_write() and is drained
   in main(). */
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
/* Opus decoder handle; assigned from opus_decoder_create() and from the
   return value of process_header(). */
91 static OpusDecoder *dec;
/* Float sample buffer, allocated in ogg_demux_and_decode() with room for
   MAX_FRAME_SIZE samples per channel. */
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
/* Created with pipe() in main(); stream_write_callback() writes one byte to
   ready_pipe[1] and main() blocks reading one byte from ready_pipe[0]. */
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
/* Ogg sync state: raw input bytes go in via ogg_sync_buffer() and
   ogg_sync_wrote(), complete pages come out via ogg_sync_pageout(). */
111 static ogg_sync_state oy;
/* Ogg stream state: pages go in via ogg_stream_pagein(), packets come out
   via ogg_stream_packetout(). */
116 static ogg_stream_state os;
/* Layout of the OpusHead identification packet. process_header() copies the
   start of the packet into this struct and converts the multi-byte fields
   from little-endian to host byte order. */
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
/* Copied into the file-level preskip by process_header(); audio_write()
   uses that value to skip samples at the start of a frame. */
132 uint16_t preskip GNUNET_PACKED;
/* Only logged by process_header(); the decoder itself is created with
   SAMPLING_RATE. */
133 uint32_t sampling_rate GNUNET_PACKED;
/* Output gain; applied through OPUS_SET_GAIN or, as a fallback, converted
   to a linear factor in process_header(). */
134 uint16_t gain GNUNET_PACKED;
/* process_header() treats any value other than 0 as unsupported. */
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
/* Parses an OpusHead packet and prepares decoding.
   Reads op->packet and op->bytes; sets the channels and preskip variables,
   creates the decoder with opus_decoder_create() and applies the header
   gain. The caller assigns the return value to dec. */
141 * Process an Opus header and setup the opus decoder based on it.
142 * It takes several pointers for header values which are needed
143 * elsewhere in the code.
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
/* Length check before the header is copied out of the packet. */
152 if (op->bytes < sizeof (header))
154 GNUNET_memcpy (&header, op->packet, sizeof (header));
/* Multi-byte header fields are little-endian on the wire; convert them to
   host byte order before use. */
155 header.preskip = GNUNET_le16toh (header.preskip);
156 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157 header.gain = GNUNET_le16toh (header.gain);
159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161 header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
163 channels = header.channels;
164 preskip = header.preskip;
/* Only channel mapping 0 is accepted.
   NOTE(review): the message below speaks of non-mono streams although the
   test is on channel_mapping, not on the channel count - confirm wording. */
166 if (header.channel_mapping != 0)
169 "This implementation does not support non-mono streams\n");
/* The decoder always runs at SAMPLING_RATE, independent of
   header.sampling_rate.
   NOTE(review): the first failure message below says encoder although a
   decoder is being created - confirm wording. */
173 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177 "Cannot create encoder: %s\n",
178 opus_strerror (err));
184 "Decoder initialization failed: %s\n",
185 opus_strerror (err));
/* NOTE(review): header.gain is an unsigned 16-bit field here, so the cast
   below can never yield a negative adjustment; RFC 7845 presumably defines
   the output gain as signed - confirm. */
189 if (0 != header.gain)
191 /*Gain API added in a newer libopus version, if we don't have it
192 we apply the gain ourselves. We also add in a user provided
193 manual gain at the same time.*/
194 int gainadj = (int) header.gain;
195 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
/* Library cannot apply the gain itself: compute a linear factor that
   ogg_demux_and_decode() multiplies into every decoded sample. The divisor
   5120 is 20 * 256, presumably dB in 1/256 steps converted to amplitude. */
196 if(OPUS_UNIMPLEMENTED == err)
198 gain = pow (10.0, gainadj / 5120.0);
200 else if (OPUS_OK != err)
202 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
/* Helpers below are only compiled when decoded-audio dumping is enabled. */
211 #ifdef DEBUG_DUMP_DECODED_OGG
/* Writes i32 to file as four bytes, least significant byte first.
   Returns the fwrite() result, i.e. the number of 4-byte items written. */
213 fwrite_le32(opus_int32 i32, FILE *file)
215 unsigned char buf[4];
/* Serialize byte by byte so the output does not depend on host byte order. */
216 buf[0]=(unsigned char)(i32&0xFF);
217 buf[1]=(unsigned char)(i32>>8&0xFF);
218 buf[2]=(unsigned char)(i32>>16&0xFF);
219 buf[3]=(unsigned char)(i32>>24&0xFF);
220 return fwrite(buf,4,1,file);
/* Writes the low 16 bits of i16 to file as two bytes, least significant
   byte first. Returns the fwrite() result, i.e. the number of 2-byte items
   written. */
225 fwrite_le16(int i16, FILE *file)
227 unsigned char buf[2];
228 buf[0]=(unsigned char)(i16&0xFF);
229 buf[1]=(unsigned char)(i16>>8&0xFF);
230 return fwrite(buf,2,1,file);
/* Emits a RIFF/WAVE header to file. ret accumulates success: each fprintf
   result is reduced to 0 or 1 and each fwrite_le* result is AND-ed in. */
240 ret = fprintf (file, "RIFF") >= 0;
/* RIFF chunk size is written as 0x7fffffff, presumably a placeholder
   because the total length is not known while streaming. */
241 ret &= fwrite_le32 (0x7fffffff, file);
243 ret &= fprintf (file, "WAVEfmt ") >= 0;
/* Format chunk: length 16, format tag 1, channel count, sample rate, bytes
   per second, bytes per sample frame, bits per sample. The values describe
   16-bit samples, two bytes per channel. */
244 ret &= fwrite_le32 (16, file);
245 ret &= fwrite_le16 (1, file);
246 ret &= fwrite_le16 (channels, file);
247 ret &= fwrite_le32 (SAMPLING_RATE, file);
248 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
249 ret &= fwrite_le16 (2*channels, file);
250 ret &= fwrite_le16 (16, file);
252 ret &= fprintf (file, "data") >= 0;
/* Data chunk size uses the same 0x7fffffff value as the RIFF chunk size. */
253 ret &= fwrite_le32 (0x7fffffff, file);
/* Returns -1 if any write failed, otherwise 16. */
255 return !ret ? -1 : 16;
/* Sends decoded samples from pcm_buffer to the PulseAudio stream and, in
   dump mode, to stdout as 16-bit samples. maxout is the largest number of
   samples that may be written; negative values are treated as 0. */
262 audio_write (int64_t maxout)
269 #ifdef DEBUG_DUMP_DECODED_OGG
/* Remembers across calls that the WAV header has been handled already. */
270 static int wrote_wav_header;
272 if (dump_to_stdout && !wrote_wav_header)
275 wrote_wav_header = 1;
278 maxout = 0 > maxout ? 0 : maxout;
/* Skip at most one frame worth of the pending preskip samples. */
281 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
/* Samples are interleaved, so the skip is scaled by the channel count. */
283 output = pcm_buffer + channels * tmp_skip;
284 out_len = frame_size - tmp_skip;
285 if (out_len > MAX_FRAME_SIZE)
/* Write the smaller of the available samples and the allowed maximum. */
289 to_write = out_len < maxout ? out_len : (unsigned) maxout;
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Writing %u * %u * %u = %llu bytes into PA\n",
298 (unsigned int) sizeof (float),
299 (unsigned long long) (to_write * channels * sizeof (float)));
300 #ifdef DEBUG_DUMP_DECODED_OGG
/* NOTE(review): these macros reuse the names of the math.h functions fminf
   and fmaxf - confirm this cannot clash with an included header. */
303 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
304 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
305 # define float2int(flt) ((int)(floor(.5+flt)))
/* NOTE(review): stack allocation sized by channels, which is taken from the
   stream header - confirm the value is bounded. */
307 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
/* Scale each float sample by 32768, clamp to the range -32768..32767 and
   round to a 16-bit integer. */
308 for (i=0;i<(int)out_len*channels;i++)
309 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
/* One item is a sample frame of 2 * channels bytes; the item count is the
   smaller of out_len and maxout. */
311 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
/* Hand the float samples to the playback stream; the length is in bytes. */
316 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
317 PA_SEEK_RELATIVE) < 0)
319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320 _("pa_stream_write() failed: %s\n"),
321 pa_strerror (pa_context_errno (context)));
/* Repeat while a frame is pending and output is still allowed. */
326 } while (0 < frame_size && 0 < maxout);
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Wrote %" PRId64 " samples\n",
/* Asks the PulseAudio main loop to stop by calling the quit hook of
   mainloop_api with ret as the value handed to the loop. */
336 * Pulseaudio shutdown task
341 mainloop_api->quit (mainloop_api, ret);
/* Processes every complete Ogg page currently available in the sync state
   oy: feeds the page to the stream state os, extracts its packets, handles
   the OpusHead and tags packets, decodes audio packets with
   opus_decode_float() and passes the result to audio_write(). Called after
   new input has been handed to the sync layer. */
347 ogg_demux_and_decode ()
/* The static locals below keep demuxer state between calls. */
350 static int stream_init;
351 int64_t page_granule = 0;
353 static int has_opus_stream;
354 static int has_tags_packet;
355 static int32_t opus_serialno;
356 static int64_t link_out;
357 static int64_t packet_count;
359 static int total_links;
360 static int gran_offset;
/* One iteration per complete page returned by the sync layer. */
362 while (1 == ogg_sync_pageout (&oy, &og))
/* Stream state not set up yet: initialize it with the serial number of
   this page. */
364 if (0 == stream_init)
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "Initialized the stream\n");
368 ogg_stream_init (&os, ogg_page_serialno (&og));
/* Page carries a different serial number than the stream state: switch the
   stream state over to it. */
371 if (ogg_page_serialno (&og) != os.serialno)
373 /* so all streams are read. */
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375 "Re-set serial number\n");
376 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
378 /*Add page to the bitstream*/
379 ogg_stream_pagein (&os, &og);
380 page_granule = ogg_page_granulepos (&og);
381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382 "Reading page that ends at %" PRId64 "\n",
384 /*Extract all available packets*/
385 while (1 == ogg_stream_packetout (&os, &op))
387 /*OggOpus streams are identified by a magic string in the initial
389 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Got Opus Header\n");
393 if (has_opus_stream && has_tags_packet)
395 /*If we're seeing another BOS OpusHead now it means
396 the stream is chained without an EOS.
397 This can easily happen if record helper is terminated unexpectedly.
401 opus_decoder_destroy (dec);
403 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
405 if (!has_opus_stream)
407 if (packet_count > 0 && opus_serialno == os.serialno)
409 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
410 (int64_t) opus_serialno, (int64_t) os.serialno);
413 opus_serialno = os.serialno;
420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421 "Got header for stream %" PRId64 ", this is %dth link\n",
422 (int64_t) opus_serialno, total_links);
426 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
429 if (!has_opus_stream || os.serialno != opus_serialno)
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435 /*If first packet in a logical stream, process the Opus header*/
436 if (0 == packet_count)
438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439 "Decoding header\n");
/* process_header() also sets channels, preskip and gain. */
440 dec = process_header (&op);
/* The header page is rejected if another packet can be pulled from it or
   if the last byte of the page header is 255, presumably meaning the final
   packet continues on the next page - confirm against the Ogg format. */
444 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
446 /*The format specifies that the initial header and tags packets are on their
447 own pages. To aid implementors in discovering that their files are wrong
448 we reject them explicitly here. In some player designs files like this would
449 fail even without an explicit test.*/
450 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
454 /*Remember how many samples at the front we were told to skip
455 so that we can adjust the timestamp counting.*/
456 gran_offset = preskip;
460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
464 (unsigned int) sizeof (float),
465 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
/* Room for MAX_FRAME_SIZE float samples per channel. */
466 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
/* Second packet of the stream: same page-exclusivity test as for the
   header, reported as the tags page. */
469 else if (1 == packet_count)
472 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
474 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
484 /*End of stream condition*/
485 if (op.e_o_s && os.serialno == opus_serialno)
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 eos = 1; /* don't care for anything except opus eos */
492 /*Decode Opus packet*/
493 ret = opus_decode_float (dec,
494 (const unsigned char *) op.packet,
499 /*If the decoder returned less than zero, we have an error.*/
502 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
510 (unsigned int) op.bytes);
512 /*Apply header gain, if we're not using an opus library new
513 enough to do this internally.*/
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Applying gain %f\n",
/* Scale every interleaved sample of the frame by the linear gain factor. */
520 for (i = 0; i < frame_size * channels; i++)
521 pcm_buffer[i] *= gain;
524 /*This handles making sure that our output duration respects
525 the final end-trim by not letting the output sample count
526 get ahead of the granpos indicated value.*/
/* Page granule position minus the preskip offset, rescaled by
   SAMPLING_RATE / 48000, minus link_out (presumably the samples already
   written for this link - confirm). */
527 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
530 packet_count, maxout);
/* Negative limits are clamped to zero before writing. */
532 outsamp = audio_write (0 > maxout ? 0 : maxout);
/* Releases the decoder instance. */
541 opus_decoder_destroy (dec);
/* Message handler registered with GNUNET_MST_create() in main(). For audio
   messages it copies the payload into the Ogg sync buffer and runs the
   demuxer and decoder. */
550 * @param msg message we received.
551 * @return #GNUNET_OK on success,
552 * #GNUNET_NO to stop further processing due to disconnect (no error)
553 * #GNUNET_SYSERR to stop further processing due to error
556 stdin_receiver (void *cls,
557 const struct GNUNET_MessageHeader *msg)
559 struct AudioMessage *audio;
/* The message type is stored in network byte order. */
563 switch (ntohs (msg->type))
565 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
566 audio = (struct AudioMessage *) msg;
/* Payload is whatever follows the fixed-size AudioMessage header.
   NOTE(review): no check is visible here that the announced size is at
   least sizeof (struct AudioMessage) - confirm the tokenizer or the message
   type guarantees it. */
567 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
569 /*Get the ogg buffer for writing*/
/* NOTE(review): the returned pointer is used below without a visible NULL
   check - confirm. */
570 data = ogg_sync_buffer (&oy, payload_len);
571 /*Read bitstream from input file*/
/* &audio[1] is the first byte after the AudioMessage header. */
572 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
573 ogg_sync_wrote (&oy, payload_len);
575 ogg_demux_and_decode ();
/* Write callback installed on stream_out by context_state_callback().
   While the write end of ready_pipe is not -1 it writes a single byte to
   the pipe; main() blocks on the read end until that byte arrives. */
585 * Callback when data is there for playback
588 stream_write_callback (pa_stream *s,
593 if (-1 != ready_pipe[1])
595 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596 "Unblocking main loop!\n");
/* The result of write() is deliberately discarded. */
597 (void) write (ready_pipe[1], "r", 1);
/* Handler registered through pa_signal_new() for SIGINT and SIGTERM during
   initialization; logs that a signal was received. */
603 * Exit callback for SIGTERM and SIGINT
606 exit_signal_callback (pa_mainloop_api *m,
611 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
612 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
/* State callback installed on the context with
   pa_context_set_state_callback(). Dispatches on pa_context_get_state():
   once the context is ready it creates the playback stream from
   sample_spec, registers stream_write_callback and connects the stream for
   playback; on failure it logs the context error. */
618 * Pulseaudio stream state callback
621 context_state_callback (pa_context *c,
626 GNUNET_assert (NULL != c);
627 switch (pa_context_get_state (c))
/* Intermediate connection states share one case. */
629 case PA_CONTEXT_CONNECTING:
630 case PA_CONTEXT_AUTHORIZING:
631 case PA_CONTEXT_SETTING_NAME:
633 case PA_CONTEXT_READY:
/* The stream must not have been created before this point. */
635 GNUNET_assert (! stream_out);
636 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
637 _("Connection established.\n"));
639 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
641 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
642 _("pa_stream_new() failed: %s\n"),
643 pa_strerror (pa_context_errno (c)));
/* The write callback is what unblocks main() through ready_pipe. */
646 pa_stream_set_write_callback (stream_out,
647 &stream_write_callback,
/* NULL as second argument, presumably selecting the default sink. */
650 pa_stream_connect_playback (stream_out, NULL,
652 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
655 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
656 _("pa_stream_connect_playback() failed: %s\n"),
657 pa_strerror (pa_context_errno (c)));
662 case PA_CONTEXT_TERMINATED:
666 case PA_CONTEXT_FAILED:
668 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
669 _("Connection failure: %s\n"),
670 pa_strerror (pa_context_errno (c)));
/* Sets up PulseAudio in this order: validate sample_spec, create the
   threaded main loop, fetch its API, install SIGINT and SIGTERM handlers,
   create the context, register context_state_callback, start connecting
   the context and finally start the main loop thread. */
680 * Pulseaudio initialization
687 if (!pa_sample_spec_valid (&sample_spec))
689 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
692 /* set up threaded playback mainloop */
/* NOTE(review): the message below names pa_mainloop_new() although the
   call is pa_threaded_mainloop_new() - confirm wording. */
693 if (!(m = pa_threaded_mainloop_new ()))
695 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
696 _("pa_mainloop_new() failed.\n"));
698 mainloop_api = pa_threaded_mainloop_get_api (m);
699 /* listen to signals */
700 r = pa_signal_init (mainloop_api);
701 GNUNET_assert (r == 0);
/* Both signals are routed to the same handler. */
702 pa_signal_new (SIGINT, exit_signal_callback, NULL);
703 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
706 /* connect to the main pulseaudio context */
707 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
709 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
710 _("pa_context_new() failed.\n"));
/* Stream creation happens later, inside the state callback. */
712 pa_context_set_state_callback (context, context_state_callback, NULL);
714 if (pa_context_connect (context, NULL, 0, NULL) < 0)
716 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
717 _("pa_context_connect() failed: %s\n"),
718 pa_strerror (pa_context_errno (context)));
/* NOTE(review): the message below names pa_mainloop_run() although the
   call is pa_threaded_mainloop_start() - confirm wording. */
720 if (pa_threaded_mainloop_start (m) < 0)
722 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
723 _("pa_mainloop_run() failed.\n"));
/* Completion callback passed to pa_stream_drain() in main(). Signals the
   threaded main loop m so that the pa_threaded_mainloop_wait() call in
   main() can return. None of the three parameters is read. */
736 drain_callback (pa_stream*s, int success, void *userdata)
738 pa_threaded_mainloop_signal (m, 0);
743 * The main function for the playback helper.
745 * @param argc number of arguments from the command line
746 * @param argv command line arguments
747 * @return 0 ok, 1 on error
750 main (int argc, char *argv[])
752 static unsigned long long toff;
754 char readbuf[MAXLINE];
755 struct GNUNET_MessageStreamTokenizer *stdin_mst;
758 #ifdef DEBUG_READ_PURE_OGG
759 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
762 GNUNET_assert (GNUNET_OK ==
763 GNUNET_log_setup ("gnunet-helper-audio-playback",
766 if (0 != pipe (ready_pipe))
768 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
771 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775 "Waiting for PulseAudio to be ready.\n");
776 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
777 close (ready_pipe[0]);
778 close (ready_pipe[1]);
781 #ifdef DEBUG_DUMP_DECODED_OGG
782 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
786 ret = read (0, readbuf, sizeof (readbuf));
788 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789 "Received %d bytes of audio data (total: %llu)\n",
794 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
795 _("Read error from STDIN: %s\n"),
801 #ifdef DEBUG_READ_PURE_OGG
804 char *data = ogg_sync_buffer (&oy, ret);
805 GNUNET_memcpy (data, readbuf, ret);
806 ogg_sync_wrote (&oy, ret);
807 ogg_demux_and_decode ();
811 GNUNET_MST_from_buffer (stdin_mst,
813 GNUNET_NO, GNUNET_NO);
815 GNUNET_MST_destroy (stdin_mst);
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820 pa_threaded_mainloop_lock (m);
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
824 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
826 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 pa_threaded_mainloop_wait (m);
830 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832 pa_operation_unref (o);
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835 pa_threaded_mainloop_unlock (m);