2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to playback audio data to the speaker
23 * @author Siomon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
48 #define SAMPLING_RATE 48000
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
91 static OpusDecoder *dec;
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
111 static ogg_sync_state oy;
116 static ogg_stream_state os;
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
132 uint16_t preskip GNUNET_PACKED;
133 uint32_t sampling_rate GNUNET_PACKED;
134 uint16_t gain GNUNET_PACKED;
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
141 * Process an Opus header and setup the opus decoder based on it.
142 * It takes several pointers for header values which are needed
143 * elsewhere in the code.
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
152 if ( ((unsigned int) op->bytes) < sizeof (header))
154 GNUNET_memcpy (&header,
157 header.preskip = GNUNET_le16toh (header.preskip);
158 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
159 header.gain = GNUNET_le16toh (header.gain);
161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
162 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
166 header.sampling_rate,
168 channels = header.channels;
169 preskip = header.preskip;
171 if (header.channel_mapping != 0)
174 "This implementation does not support non-mono streams\n");
178 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
182 "Cannot create encoder: %s\n",
183 opus_strerror (err));
189 "Decoder initialization failed: %s\n",
190 opus_strerror (err));
194 if (0 != header.gain)
196 /*Gain API added in a newer libopus version, if we don't have it
197 we apply the gain ourselves. We also add in a user provided
198 manual gain at the same time.*/
199 int gainadj = (int) header.gain;
200 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
201 if(OPUS_UNIMPLEMENTED == err)
203 gain = pow (10.0, gainadj / 5120.0);
205 else if (OPUS_OK != err)
207 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
216 #ifdef DEBUG_DUMP_DECODED_OGG
218 fwrite_le32(opus_int32 i32, FILE *file)
220 unsigned char buf[4];
221 buf[0]=(unsigned char)(i32&0xFF);
222 buf[1]=(unsigned char)(i32>>8&0xFF);
223 buf[2]=(unsigned char)(i32>>16&0xFF);
224 buf[3]=(unsigned char)(i32>>24&0xFF);
225 return fwrite(buf,4,1,file);
230 fwrite_le16(int i16, FILE *file)
232 unsigned char buf[2];
233 buf[0]=(unsigned char)(i16&0xFF);
234 buf[1]=(unsigned char)(i16>>8&0xFF);
235 return fwrite(buf,2,1,file);
245 ret = fprintf (file, "RIFF") >= 0;
246 ret &= fwrite_le32 (0x7fffffff, file);
248 ret &= fprintf (file, "WAVEfmt ") >= 0;
249 ret &= fwrite_le32 (16, file);
250 ret &= fwrite_le16 (1, file);
251 ret &= fwrite_le16 (channels, file);
252 ret &= fwrite_le32 (SAMPLING_RATE, file);
253 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
254 ret &= fwrite_le16 (2*channels, file);
255 ret &= fwrite_le16 (16, file);
257 ret &= fprintf (file, "data") >= 0;
258 ret &= fwrite_le32 (0x7fffffff, file);
260 return !ret ? -1 : 16;
267 audio_write (int64_t maxout)
274 #ifdef DEBUG_DUMP_DECODED_OGG
275 static int wrote_wav_header;
277 if (dump_to_stdout && !wrote_wav_header)
280 wrote_wav_header = 1;
283 maxout = 0 > maxout ? 0 : maxout;
286 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
288 output = pcm_buffer + channels * tmp_skip;
289 out_len = frame_size - tmp_skip;
290 if (out_len > MAX_FRAME_SIZE)
294 to_write = out_len < maxout ? out_len : (unsigned) maxout;
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Writing %u * %u * %u = %llu bytes into PA\n",
303 (unsigned int) sizeof (float),
304 (unsigned long long) (to_write * channels * sizeof (float)));
305 #ifdef DEBUG_DUMP_DECODED_OGG
308 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
309 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
310 # define float2int(flt) ((int)(floor(.5+flt)))
312 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
313 for (i=0;i<(int)out_len*channels;i++)
314 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
316 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
321 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
322 PA_SEEK_RELATIVE) < 0)
324 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
325 _("pa_stream_write() failed: %s\n"),
326 pa_strerror (pa_context_errno (context)));
331 } while (0 < frame_size && 0 < maxout);
333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
334 "Wrote %" PRId64 " samples\n",
341 * Pulseaudio shutdown task
346 mainloop_api->quit (mainloop_api,
353 ogg_demux_and_decode ()
356 static int stream_init;
357 int64_t page_granule = 0;
359 static int has_opus_stream;
360 static int has_tags_packet;
361 static int32_t opus_serialno;
362 static int64_t link_out;
363 static int64_t packet_count;
365 static int total_links;
366 static int gran_offset;
368 while (1 == ogg_sync_pageout (&oy, &og))
370 if (0 == stream_init)
372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
373 "Initialized the stream\n");
374 ogg_stream_init (&os, ogg_page_serialno (&og));
377 if (ogg_page_serialno (&og) != os.serialno)
379 /* so all streams are read. */
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Re-set serial number\n");
382 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
384 /*Add page to the bitstream*/
385 ogg_stream_pagein (&os, &og);
386 page_granule = ogg_page_granulepos (&og);
387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
388 "Reading page that ends at %" PRId64 "\n",
390 /*Extract all available packets*/
391 while (1 == ogg_stream_packetout (&os, &op))
393 /*OggOpus streams are identified by a magic string in the initial
395 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398 "Got Opus Header\n");
399 if (has_opus_stream && has_tags_packet)
401 /*If we're seeing another BOS OpusHead now it means
402 the stream is chained without an EOS.
403 This can easily happen if record helper is terminated unexpectedly.
407 opus_decoder_destroy (dec);
409 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
411 if (!has_opus_stream)
413 if (packet_count > 0 && opus_serialno == os.serialno)
415 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
416 (int64_t) opus_serialno, (int64_t) os.serialno);
419 opus_serialno = os.serialno;
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "Got header for stream %" PRId64 ", this is %dth link\n",
428 (int64_t) opus_serialno, total_links);
432 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
435 if (!has_opus_stream || os.serialno != opus_serialno)
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441 /*If first packet in a logical stream, process the Opus header*/
442 if (0 == packet_count)
444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445 "Decoding header\n");
446 dec = process_header (&op);
450 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
452 /*The format specifies that the initial header and tags packets are on their
453 own pages. To aid implementors in discovering that their files are wrong
454 we reject them explicitly here. In some player designs files like this would
455 fail even without an explicit test.*/
456 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
460 /*Remember how many samples at the front we were told to skip
461 so that we can adjust the timestamp counting.*/
462 gran_offset = preskip;
466 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
467 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
470 (unsigned int) sizeof (float),
471 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
472 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
475 else if (1 == packet_count)
478 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
480 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
490 /*End of stream condition*/
491 if (op.e_o_s && os.serialno == opus_serialno)
493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495 eos = 1; /* don't care for anything except opus eos */
498 /*Decode Opus packet*/
499 ret = opus_decode_float (dec,
500 (const unsigned char *) op.packet,
505 /*If the decoder returned less than zero, we have an error.*/
508 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
516 (unsigned int) op.bytes);
518 /*Apply header gain, if we're not using an opus library new
519 enough to do this internally.*/
523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524 "Applying gain %f\n",
526 for (i = 0; i < frame_size * channels; i++)
527 pcm_buffer[i] *= gain;
530 /*This handles making sure that our output duration respects
531 the final end-trim by not letting the output sample count
532 get ahead of the granpos indicated value.*/
533 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
536 packet_count, maxout);
538 outsamp = audio_write (0 > maxout ? 0 : maxout);
547 opus_decoder_destroy (dec);
557 * @param msg message we received.
558 * @return #GNUNET_OK on success,
559 * #GNUNET_NO to stop further processing due to disconnect (no error)
560 * #GNUNET_SYSERR to stop further processing due to error
563 stdin_receiver (void *cls,
564 const struct GNUNET_MessageHeader *msg)
566 struct AudioMessage *audio;
571 switch (ntohs (msg->type))
573 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
574 audio = (struct AudioMessage *) msg;
575 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
577 /*Get the ogg buffer for writing*/
578 data = ogg_sync_buffer (&oy, payload_len);
579 /*Read bitstream from input file*/
580 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
581 ogg_sync_wrote (&oy, payload_len);
583 ogg_demux_and_decode ();
593 * Callback when data is there for playback
596 stream_write_callback (pa_stream *s,
604 if (-1 != ready_pipe[1])
606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
607 "Unblocking main loop!\n");
608 (void) write (ready_pipe[1], "r", 1);
614 * Exit callback for SIGTERM and SIGINT
617 exit_signal_callback (pa_mainloop_api *m,
626 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
633 * Pulseaudio stream state callback
636 context_state_callback (pa_context *c,
642 GNUNET_assert (NULL != c);
643 switch (pa_context_get_state (c))
645 case PA_CONTEXT_CONNECTING:
646 case PA_CONTEXT_AUTHORIZING:
647 case PA_CONTEXT_SETTING_NAME:
649 case PA_CONTEXT_READY:
651 GNUNET_assert (! stream_out);
652 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
653 _("Connection established.\n"));
655 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
657 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
658 _("pa_stream_new() failed: %s\n"),
659 pa_strerror (pa_context_errno (c)));
662 pa_stream_set_write_callback (stream_out,
663 &stream_write_callback,
666 pa_stream_connect_playback (stream_out, NULL,
668 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
671 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
672 _("pa_stream_connect_playback() failed: %s\n"),
673 pa_strerror (pa_context_errno (c)));
678 case PA_CONTEXT_TERMINATED:
682 case PA_CONTEXT_FAILED:
684 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
685 _("Connection failure: %s\n"),
686 pa_strerror (pa_context_errno (c)));
696 * Pulseaudio initialization
703 if (!pa_sample_spec_valid (&sample_spec))
705 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
708 /* set up threaded playback mainloop */
709 if (!(m = pa_threaded_mainloop_new ()))
711 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
712 _("pa_mainloop_new() failed.\n"));
714 mainloop_api = pa_threaded_mainloop_get_api (m);
715 /* listen to signals */
716 r = pa_signal_init (mainloop_api);
717 GNUNET_assert (r == 0);
718 pa_signal_new (SIGINT, exit_signal_callback, NULL);
719 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
722 /* connect to the main pulseaudio context */
723 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
725 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
726 _("pa_context_new() failed.\n"));
728 pa_context_set_state_callback (context, context_state_callback, NULL);
730 if (pa_context_connect (context, NULL, 0, NULL) < 0)
732 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733 _("pa_context_connect() failed: %s\n"),
734 pa_strerror (pa_context_errno (context)));
736 if (pa_threaded_mainloop_start (m) < 0)
738 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
739 _("pa_mainloop_run() failed.\n"));
752 drain_callback (pa_stream*s, int success, void *userdata)
757 pa_threaded_mainloop_signal (m,
763 * The main function for the playback helper.
765 * @param argc number of arguments from the command line
766 * @param argv command line arguments
767 * @return 0 ok, 1 on error
770 main (int argc, char *argv[])
772 static unsigned long long toff;
773 char readbuf[MAXLINE];
774 struct GNUNET_MessageStreamTokenizer *stdin_mst;
777 #ifdef DEBUG_READ_PURE_OGG
778 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
783 GNUNET_assert (GNUNET_OK ==
784 GNUNET_log_setup ("gnunet-helper-audio-playback",
787 if (0 != pipe (ready_pipe))
789 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
792 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796 "Waiting for PulseAudio to be ready.\n");
797 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
798 close (ready_pipe[0]);
799 close (ready_pipe[1]);
802 #ifdef DEBUG_DUMP_DECODED_OGG
803 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
807 ret = read (STDIN_FILENO,
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "Received %d bytes of audio data (total: %llu)\n",
817 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
818 _("Read error from STDIN: %s\n"),
824 #ifdef DEBUG_READ_PURE_OGG
827 char *data = ogg_sync_buffer (&oy, ret);
828 GNUNET_memcpy (data, readbuf, ret);
829 ogg_sync_wrote (&oy, ret);
830 ogg_demux_and_decode ();
834 GNUNET_MST_from_buffer (stdin_mst,
836 GNUNET_NO, GNUNET_NO);
838 GNUNET_MST_destroy (stdin_mst);
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843 pa_threaded_mainloop_lock (m);
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
847 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851 pa_threaded_mainloop_wait (m);
853 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855 pa_operation_unref (o);
856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858 pa_threaded_mainloop_unlock (m);