2 This file is part of GNUnet.
3 Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to playback audio data to the speaker
23 * @author Simon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
/* When defined, main() honours the GNUNET_READ_PURE_OGG environment
   variable and can feed stdin straight into the Ogg sync layer. */
43 #define DEBUG_READ_PURE_OGG 1
/* When defined, main() honours the GNUNET_DUMP_DECODED_OGG environment
   variable (see dump_to_stdout below). */
44 #define DEBUG_DUMP_DECODED_OGG 1
/* Rate used for the Opus decoder and for the PulseAudio sample spec. */
48 #define SAMPLING_RATE 48000
/* Per-channel capacity (in samples) of pcm_buffer. */
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
/* Set in main() from GNUNET_DUMP_DECODED_OGG; checked by audio_write(). */
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
/* PulseAudio context, created with pa_context_new() during initialization. */
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
/* Opus decoder, obtained from process_header(). */
91 static OpusDecoder *dec;
/* Decoded float samples; allocated for MAX_FRAME_SIZE * channels values. */
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
/* Ogg sync state: raw input is copied into it and pages are pulled out. */
111 static ogg_sync_state oy;
/* Ogg stream state: pages are pushed in and packets are pulled out. */
116 static ogg_stream_state os;
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
/* Layout of the "OpusHead" packet as read by process_header(), which
   copies the start of the packet into this struct and converts
   preskip, sampling_rate and gain from little-endian to host order.
   process_header() also reads 'version' and 'channels' members that
   are not visible in this excerpt. */
127 struct OpusHeadPacket
/* copied into the file-level 'preskip' by process_header() */
132 uint16_t preskip GNUNET_PACKED;
/* only logged by process_header(); the decoder is always created with SAMPLING_RATE */
133 uint32_t sampling_rate GNUNET_PACKED;
/* NOTE(review): declared unsigned; the OggOpus header presumably defines
   the output gain as a signed value -- confirm against the spec */
134 uint16_t gain GNUNET_PACKED;
/* process_header() checks this against 0 */
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
141 * Process an Opus header and setup the opus decoder based on it.
142 * It takes several pointers for header values which are needed
143 * elsewhere in the code.
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
152 if (op->bytes < sizeof (header))
154 memcpy (&header, op->packet, sizeof (header));
155 header.preskip = GNUNET_le16toh (header.preskip);
156 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157 header.gain = GNUNET_le16toh (header.gain);
159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
161 header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
163 channels = header.channels;
164 preskip = header.preskip;
166 if (header.channel_mapping != 0)
169 "This implementation does not support non-mono streams\n");
173 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
177 "Cannot create encoder: %s\n",
178 opus_strerror (err));
184 "Decoder initialization failed: %s\n",
185 opus_strerror (err));
189 if (0 != header.gain)
191 /*Gain API added in a newer libopus version, if we don't have it
192 we apply the gain ourselves. We also add in a user provided
193 manual gain at the same time.*/
194 int gainadj = (int) header.gain;
195 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
196 if(OPUS_UNIMPLEMENTED == err)
198 gain = pow (10.0, gainadj / 5120.0);
200 else if (OPUS_OK != err)
202 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
211 #ifdef DEBUG_DUMP_DECODED_OGG
/**
 * Write a 32-bit value to a stream in little-endian byte order.
 *
 * @param i32 value to write
 * @param file stream to write to
 * @return result of fwrite(): 1 if the four bytes were written, 0 otherwise
 */
static size_t
fwrite_le32(opus_int32 i32, FILE *file)
{
  /* Work on an unsigned copy: right-shifting a negative signed value
     is implementation-defined. */
  const uint32_t v = (uint32_t) i32;
  unsigned char buf[4];

  buf[0] = (unsigned char) (v & 0xFF);
  buf[1] = (unsigned char) ((v >> 8) & 0xFF);
  buf[2] = (unsigned char) ((v >> 16) & 0xFF);
  buf[3] = (unsigned char) ((v >> 24) & 0xFF);
  return fwrite (buf, 4, 1, file);
}
/**
 * Write the low 16 bits of a value to a stream in little-endian byte order.
 *
 * @param i16 value to write (only the low 16 bits are used)
 * @param file stream to write to
 * @return result of fwrite(): 1 if the two bytes were written, 0 otherwise
 */
static size_t
fwrite_le16(int i16, FILE *file)
{
  /* Work on an unsigned copy: right-shifting a negative signed value
     is implementation-defined. */
  const unsigned int v = (unsigned int) i16;
  unsigned char buf[2];

  buf[0] = (unsigned char) (v & 0xFF);
  buf[1] = (unsigned char) ((v >> 8) & 0xFF);
  return fwrite (buf, 2, 1, file);
}
/* Emit a RIFF/WAVE header to 'file'.  'ret' accumulates success: it
   stays non-zero only while every write so far has succeeded. */
240 ret = fprintf (file, "RIFF") >= 0;
/* fixed placeholder for the overall size (presumably because the total
   length is not known while streaming -- confirm) */
241 ret &= fwrite_le32 (0x7fffffff, file);
243 ret &= fprintf (file, "WAVEfmt ") >= 0;
/* size of the format chunk that follows */
244 ret &= fwrite_le32 (16, file);
/* format tag 1 (PCM in the WAVE format) */
245 ret &= fwrite_le16 (1, file);
246 ret &= fwrite_le16 (channels, file);
247 ret &= fwrite_le32 (SAMPLING_RATE, file);
/* bytes per second: 2 bytes per sample and channel */
248 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
/* bytes per sample frame */
249 ret &= fwrite_le16 (2*channels, file);
/* bits per sample */
250 ret &= fwrite_le16 (16, file);
252 ret &= fprintf (file, "data") >= 0;
/* fixed placeholder for the size of the data chunk */
253 ret &= fwrite_le32 (0x7fffffff, file);
/* -1 if any write failed, 16 otherwise */
255 return !ret ? -1 : 16;
/* Hand the decoded frame in pcm_buffer to the PulseAudio output stream,
   honouring the pending pre-skip and the sample budget 'maxout'.  When
   dump_to_stdout is set the samples are also written to stdout as
   16-bit PCM.  The caller stores the result as the number of samples
   written (see ogg_demux_and_decode()). */
262 audio_write (int64_t maxout)
269 #ifdef DEBUG_DUMP_DECODED_OGG
/* static: remembers across calls that the WAV header has been emitted */
270 static int wrote_wav_header;
272 if (dump_to_stdout && !wrote_wav_header)
275 wrote_wav_header = 1;
/* a negative budget is treated as "write nothing" */
278 maxout = 0 > maxout ? 0 : maxout;
/* samples to drop at the front: the pre-skip, capped at the frame length */
281 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
/* first sample to output; the offset is scaled by the channel count */
283 output = pcm_buffer + channels * tmp_skip;
284 out_len = frame_size - tmp_skip;
285 if (out_len > MAX_FRAME_SIZE)
/* never write more than the remaining budget */
289 to_write = out_len < maxout ? out_len : (unsigned) maxout;
/* NOTE(review): the sizeof operands below are size_t but are printed
   with %u -- confirm the format specifiers */
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Writing %u * %u * %u = %u bytes into PA\n",
296 to_write, channels, sizeof (float),
297 to_write * channels * sizeof (float));
298 #ifdef DEBUG_DUMP_DECODED_OGG
301 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
302 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
303 # define float2int(flt) ((int)(floor(.5+flt)))
/* NOTE(review): alloca() inside the enclosing do/while loop; the memory
   is only released when the function returns */
305 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
/* scale the float samples by 32768 and clamp them to the int16 range */
306 for (i=0;i<(int)out_len*channels;i++)
307 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
309 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
/* append the float samples to the playback stream */
314 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
315 PA_SEEK_RELATIVE) < 0)
317 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
318 _("pa_stream_write() failed: %s\n"),
319 pa_strerror (pa_context_errno (context)));
/* continue while frame data and sample budget remain */
324 } while (0 < frame_size && 0 < maxout);
326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327 "Wrote %" PRId64 " samples\n",
334 * Pulseaudio shutdown task
/* ask the PulseAudio main loop to terminate, passing 'ret' on to it */
339 mainloop_api->quit (mainloop_api, ret);
/* Pull every complete page out of the Ogg sync state 'oy', extract the
   packets of each page through the stream state 'os', and handle them:
   the Opus header sets up the decoder, a tags packet is checked, and
   audio packets are decoded and passed to audio_write().
   The static locals below keep the demuxer state between calls. */
345 ogg_demux_and_decode ()
348 static int stream_init;
349 int64_t page_granule = 0;
351 static int has_opus_stream;
352 static int has_tags_packet;
353 static int32_t opus_serialno;
354 static int64_t link_out;
355 static int64_t packet_count;
357 static int total_links;
358 static int gran_offset;
/* one iteration per complete page currently available */
360 while (1 == ogg_sync_pageout (&oy, &og))
/* the first page ever seen initializes the stream state with its serial number */
362 if (0 == stream_init)
364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365 "Initialized the stream\n");
366 ogg_stream_init (&os, ogg_page_serialno (&og));
/* the page carries a different serial number than the stream state: follow it */
369 if (ogg_page_serialno (&og) != os.serialno)
371 /* so all streams are read. */
372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
373 "Re-set serial number\n");
374 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
376 /*Add page to the bitstream*/
377 ogg_stream_pagein (&os, &og);
/* granule position of this page; used further down to bound the output */
378 page_granule = ogg_page_granulepos (&og);
379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380 "Reading page that ends at %" PRId64 "\n",
382 /*Extract all available packets*/
383 while (1 == ogg_stream_packetout (&os, &op))
385 /*OggOpus streams are identified by a magic string in the initial
387 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390 "Got Opus Header\n");
391 if (has_opus_stream && has_tags_packet)
393 /*If we're seeing another BOS OpusHead now it means
394 the stream is chained without an EOS.
395 This can easily happen if record helper is terminated unexpectedly.
399 opus_decoder_destroy (dec);
401 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
403 if (!has_opus_stream)
405 if (packet_count > 0 && opus_serialno == os.serialno)
407 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
408 (int64_t) opus_serialno, (int64_t) os.serialno);
411 opus_serialno = os.serialno;
418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419 "Got header for stream %" PRId64 ", this is %dth link\n",
420 (int64_t) opus_serialno, total_links);
424 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
427 if (!has_opus_stream || os.serialno != opus_serialno)
429 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433 /*If first packet in a logical stream, process the Opus header*/
434 if (0 == packet_count)
436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437 "Decoding header\n");
/* process_header() also sets the file-level 'channels' and 'preskip' */
438 dec = process_header (&op);
/* reject the stream if another packet is available on this page, or if
   the last lacing value of the page header is 255 (presumably: the
   final packet continues on the next page -- confirm) */
442 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
444 /*The format specifies that the initial header and tags packets are on their
445 own pages. To aid implementors in discovering that their files are wrong
446 we reject them explicitly here. In some player designs files like this would
447 fail even without an explicit test.*/
448 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
452 /*Remember how many samples at the front we were told to skip
453 so that we can adjust the timestamp counting.*/
454 gran_offset = preskip;
/* NOTE(review): the sizeof operands below are size_t but are printed
   with %u -- confirm the format specifiers */
458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459 "Allocating %u * %u * %u = %u bytes of buffer space\n",
460 MAX_FRAME_SIZE, channels, sizeof (float),
461 MAX_FRAME_SIZE * channels * sizeof (float));
/* room for one maximum-size frame for all channels */
462 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
/* the second packet: same "own page" check as for the header */
465 else if (1 == packet_count)
468 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
470 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
480 /*End of stream condition*/
481 if (op.e_o_s && os.serialno == opus_serialno)
483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
485 eos = 1; /* don't care for anything except opus eos */
488 /*Decode Opus packet*/
489 ret = opus_decode_float (dec,
490 (const unsigned char *) op.packet,
495 /*If the decoder returned less than zero, we have an error.*/
498 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
/* NOTE(review): op.bytes is printed with %u -- confirm the specifier
   matches the type libogg declares for it */
502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
504 ret, ret * channels, op.bytes);
506 /*Apply header gain, if we're not using an opus library new
507 enough to do this internally.*/
511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512 "Applying gain %f\n",
/* scale every decoded sample of the frame by the gain */
514 for (i = 0; i < frame_size * channels; i++)
515 pcm_buffer[i] *= gain;
518 /*This handles making sure that our output duration respects
519 the final end-trim by not letting the output sample count
520 get ahead of the granpos indicated value.*/
/* with SAMPLING_RATE == 48000 the rate factor below equals 1 */
521 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
522 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
524 packet_count, maxout);
/* audio_write() clamps a negative budget as well, so this clamp is redundant */
526 outsamp = audio_write (0 > maxout ? 0 : maxout);
535 opus_decoder_destroy (dec);
545 stdin_receiver (void *cls,
547 const struct GNUNET_MessageHeader *msg)
549 struct AudioMessage *audio;
553 switch (ntohs (msg->type))
555 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
556 audio = (struct AudioMessage *) msg;
557 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
559 /*Get the ogg buffer for writing*/
560 data = ogg_sync_buffer (&oy, payload_len);
561 /*Read bitstream from input file*/
562 memcpy (data, (const unsigned char *) &audio[1], payload_len);
563 ogg_sync_wrote (&oy, payload_len);
565 ogg_demux_and_decode ();
575 * Callback when data is there for playback
/* Registered with pa_stream_set_write_callback() in
   context_state_callback(); writes one byte to the ready pipe so that
   the blocking read in main() returns. */
578 stream_write_callback (pa_stream *s,
/* -1 marks the write end as no longer usable */
583 if (-1 != ready_pipe[1])
585 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586 "Unblocking main loop!\n");
/* the result of write() is deliberately ignored */
587 (void) write (ready_pipe[1], "r", 1);
593 * Exit callback for SIGTERM and SIGINT
/* Registered for SIGINT and SIGTERM via pa_signal_new().  Note that
   the parameter 'm' shadows the file-level threaded mainloop 'm'. */
596 exit_signal_callback (pa_mainloop_api *m,
601 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
608 * Pulseaudio stream state callback
/* Registered with pa_context_set_state_callback(); once the context is
   ready it creates the playback stream, installs the write callback
   and connects the stream for playback.  Failures are logged. */
611 context_state_callback (pa_context *c,
616 GNUNET_assert (NULL != c);
617 switch (pa_context_get_state (c))
/* intermediate states while the connection is being set up */
619 case PA_CONTEXT_CONNECTING:
620 case PA_CONTEXT_AUTHORIZING:
621 case PA_CONTEXT_SETTING_NAME:
623 case PA_CONTEXT_READY:
/* the stream must not have been created before */
625 GNUNET_assert (! stream_out);
626 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627 _("Connection established.\n"));
/* the stream uses the file-level sample_spec */
629 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
631 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
632 _("pa_stream_new() failed: %s\n"),
633 pa_strerror (pa_context_errno (c)));
/* stream_write_callback() unblocks main() */
636 pa_stream_set_write_callback (stream_out,
637 &stream_write_callback,
640 pa_stream_connect_playback (stream_out, NULL,
642 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
645 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
646 _("pa_stream_connect_playback() failed: %s\n"),
647 pa_strerror (pa_context_errno (c)));
652 case PA_CONTEXT_TERMINATED:
656 case PA_CONTEXT_FAILED:
658 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
659 _("Connection failure: %s\n"),
660 pa_strerror (pa_context_errno (c)));
670 * Pulseaudio initialization
/* Initialization sequence: validate the sample spec, create the
   threaded mainloop, hook up signal handling, create and connect the
   context, then start the mainloop. */
677 if (!pa_sample_spec_valid (&sample_spec))
679 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
682 /* set up threaded playback mainloop */
683 if (!(m = pa_threaded_mainloop_new ()))
/* NOTE(review): the message names pa_mainloop_new(), but the call that
   failed is pa_threaded_mainloop_new() */
685 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
686 _("pa_mainloop_new() failed.\n"));
688 mainloop_api = pa_threaded_mainloop_get_api (m);
689 /* listen to signals */
690 r = pa_signal_init (mainloop_api);
691 GNUNET_assert (r == 0);
/* the results of pa_signal_new() are not kept */
692 pa_signal_new (SIGINT, exit_signal_callback, NULL);
693 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
696 /* connect to the main pulseaudio context */
697 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
699 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
700 _("pa_context_new() failed.\n"));
/* context_state_callback() creates the playback stream once the context is ready */
702 pa_context_set_state_callback (context, context_state_callback, NULL);
704 if (pa_context_connect (context, NULL, 0, NULL) < 0)
706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707 _("pa_context_connect() failed: %s\n"),
708 pa_strerror (pa_context_errno (context)));
710 if (pa_threaded_mainloop_start (m) < 0)
/* NOTE(review): the message names pa_mainloop_run(), but the call that
   failed is pa_threaded_mainloop_start() */
712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
713 _("pa_mainloop_run() failed.\n"));
/* Passed to pa_stream_drain() in main(); signals the threaded mainloop
   so that the pa_threaded_mainloop_wait() there returns.  All three
   parameters are unused. */
725 drain_callback (pa_stream*s, int success, void *userdata)
727 pa_threaded_mainloop_signal (m, 0);
731 * The main function for the playback helper.
733 * @param argc number of arguments from the command line
734 * @param argv command line arguments
735 * @return 0 ok, 1 on error
738 main (int argc, char *argv[])
740 static unsigned long long toff;
742 char readbuf[MAXLINE];
743 struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
746 #ifdef DEBUG_READ_PURE_OGG
747 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
750 GNUNET_assert (GNUNET_OK ==
751 GNUNET_log_setup ("gnunet-helper-audio-playback",
754 if (0 != pipe (ready_pipe))
756 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
759 stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
762 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763 "Waiting for PulseAudio to be ready.\n");
764 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
765 close (ready_pipe[0]);
766 close (ready_pipe[1]);
769 #ifdef DEBUG_DUMP_DECODED_OGG
770 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
774 ret = read (0, readbuf, sizeof (readbuf));
776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777 "Received %d bytes of audio data (total: %llu)\n",
782 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
783 _("Read error from STDIN: %s\n"),
789 #ifdef DEBUG_READ_PURE_OGG
792 char *data = ogg_sync_buffer (&oy, ret);
793 memcpy (data, readbuf, ret);
794 ogg_sync_wrote (&oy, ret);
795 ogg_demux_and_decode ();
799 GNUNET_SERVER_mst_receive (stdin_mst, NULL,
801 GNUNET_NO, GNUNET_NO);
803 GNUNET_SERVER_mst_destroy (stdin_mst);
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 pa_threaded_mainloop_lock (m);
809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
812 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
814 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
816 pa_threaded_mainloop_wait (m);
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820 pa_operation_unref (o);
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823 pa_threaded_mainloop_unlock (m);