2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to playback audio data to the speaker
23 * @author Siomon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
48 #define SAMPLING_RATE 48000
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
91 static OpusDecoder *dec;
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
111 static ogg_sync_state oy;
116 static ogg_stream_state os;
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
132 uint16_t preskip GNUNET_PACKED;
133 uint32_t sampling_rate GNUNET_PACKED;
134 uint16_t gain GNUNET_PACKED;
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
141 * Process an Opus header and setup the opus decoder based on it.
142 * It takes several pointers for header values which are needed
143 * elsewhere in the code.
146 process_header (ogg_packet *op)
150 struct OpusHeadPacket header;
152 if (((unsigned int) op->bytes) < sizeof(header))
154 GNUNET_memcpy (&header,
157 header.preskip = GNUNET_le16toh (header.preskip);
158 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
159 header.gain = GNUNET_le16toh (header.gain);
161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
162 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
166 header.sampling_rate,
168 channels = header.channels;
169 preskip = header.preskip;
171 if (header.channel_mapping != 0)
174 "This implementation does not support non-mono streams\n");
178 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
182 "Cannot create encoder: %s\n",
183 opus_strerror (err));
189 "Decoder initialization failed: %s\n",
190 opus_strerror (err));
194 if (0 != header.gain)
196 /*Gain API added in a newer libopus version, if we don't have it
197 we apply the gain ourselves. We also add in a user provided
198 manual gain at the same time.*/
199 int gainadj = (int) header.gain;
200 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
201 if (OPUS_UNIMPLEMENTED == err)
203 gain = pow (10.0, gainadj / 5120.0);
205 else if (OPUS_OK != err)
207 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
216 #ifdef DEBUG_DUMP_DECODED_OGG
218 fwrite_le32 (opus_int32 i32, FILE *file)
220 unsigned char buf[4];
222 buf[0] = (unsigned char) (i32 & 0xFF);
223 buf[1] = (unsigned char) (i32 >> 8 & 0xFF);
224 buf[2] = (unsigned char) (i32 >> 16 & 0xFF);
225 buf[3] = (unsigned char) (i32 >> 24 & 0xFF);
226 return fwrite (buf, 4, 1, file);
231 fwrite_le16 (int i16, FILE *file)
233 unsigned char buf[2];
235 buf[0] = (unsigned char) (i16 & 0xFF);
236 buf[1] = (unsigned char) (i16 >> 8 & 0xFF);
237 return fwrite (buf, 2, 1, file);
247 ret = fprintf (file, "RIFF") >= 0;
248 ret &= fwrite_le32 (0x7fffffff, file);
250 ret &= fprintf (file, "WAVEfmt ") >= 0;
251 ret &= fwrite_le32 (16, file);
252 ret &= fwrite_le16 (1, file);
253 ret &= fwrite_le16 (channels, file);
254 ret &= fwrite_le32 (SAMPLING_RATE, file);
255 ret &= fwrite_le32 (2 * channels * SAMPLING_RATE, file);
256 ret &= fwrite_le16 (2 * channels, file);
257 ret &= fwrite_le16 (16, file);
259 ret &= fprintf (file, "data") >= 0;
260 ret &= fwrite_le32 (0x7fffffff, file);
262 return ! ret ? -1 : 16;
270 audio_write (int64_t maxout)
278 #ifdef DEBUG_DUMP_DECODED_OGG
279 static int wrote_wav_header;
281 if (dump_to_stdout && ! wrote_wav_header)
284 wrote_wav_header = 1;
287 maxout = 0 > maxout ? 0 : maxout;
290 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
292 output = pcm_buffer + channels * tmp_skip;
293 out_len = frame_size - tmp_skip;
294 if (out_len > MAX_FRAME_SIZE)
298 to_write = out_len < maxout ? out_len : (unsigned) maxout;
303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
304 "Writing %u * %u * %u = %llu bytes into PA\n",
307 (unsigned int) sizeof(float),
308 (unsigned long long) (to_write * channels * sizeof(float)));
309 #ifdef DEBUG_DUMP_DECODED_OGG
312 # define fminf(_x, _y) ((_x) < (_y) ? (_x) : (_y))
313 # define fmaxf(_x, _y) ((_x) > (_y) ? (_x) : (_y))
314 # define float2int(flt) ((int) (floor (.5 + flt)))
316 int16_t *out = alloca (sizeof(short) * MAX_FRAME_SIZE * channels);
317 for (i = 0; i < (int) out_len * channels; i++)
318 out[i] = (short) float2int (fmaxf (-32768, fminf (output[i] * 32768.f,
321 fwrite (out, 2 * channels, out_len < maxout ? out_len : maxout, stdout);
326 (stream_out, output, to_write * channels * sizeof(float), NULL, 0,
327 PA_SEEK_RELATIVE) < 0)
329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
330 _ ("pa_stream_write() failed: %s\n"),
331 pa_strerror (pa_context_errno (context)));
337 while (0 < frame_size && 0 < maxout);
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340 "Wrote %" PRId64 " samples\n",
347 * Pulseaudio shutdown task
352 mainloop_api->quit (mainloop_api,
359 ogg_demux_and_decode ()
362 static int stream_init;
363 int64_t page_granule = 0;
365 static int has_opus_stream;
366 static int has_tags_packet;
367 static int32_t opus_serialno;
368 static int64_t link_out;
369 static int64_t packet_count;
371 static int total_links;
372 static int gran_offset;
374 while (1 == ogg_sync_pageout (&oy, &og))
376 if (0 == stream_init)
378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379 "Initialized the stream\n");
380 ogg_stream_init (&os, ogg_page_serialno (&og));
383 if (ogg_page_serialno (&og) != os.serialno)
385 /* so all streams are read. */
386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387 "Re-set serial number\n");
388 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
390 /*Add page to the bitstream*/
391 ogg_stream_pagein (&os, &og);
392 page_granule = ogg_page_granulepos (&og);
393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
394 "Reading page that ends at %" PRId64 "\n",
396 /*Extract all available packets*/
397 while (1 == ogg_stream_packetout (&os, &op))
399 /*OggOpus streams are identified by a magic string in the initial
401 if (op.b_o_s && (op.bytes >= 8) && ! memcmp (op.packet, "OpusHead", 8))
403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404 "Got Opus Header\n");
405 if (has_opus_stream && has_tags_packet)
407 /*If we're seeing another BOS OpusHead now it means
408 the stream is chained without an EOS.
409 This can easily happen if record helper is terminated unexpectedly.
413 opus_decoder_destroy (dec);
416 "\nWarning: stream %" PRId64
417 " ended without EOS and a new stream began.\n",
418 (int64_t) os.serialno);
420 if (! has_opus_stream)
422 if ((packet_count > 0) && (opus_serialno == os.serialno) )
425 "\nError: Apparent chaining without changing serial number (%"
426 PRId64 "==%" PRId64 ").\n",
427 (int64_t) opus_serialno, (int64_t) os.serialno);
430 opus_serialno = os.serialno;
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Got header for stream %" PRId64 ", this is %dth link\n",
439 (int64_t) opus_serialno, total_links);
443 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n",
444 (int64_t) os.serialno);
447 if (! has_opus_stream || (os.serialno != opus_serialno) )
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 /*If first packet in a logical stream, process the Opus header*/
454 if (0 == packet_count)
456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457 "Decoding header\n");
458 dec = process_header (&op);
462 if ((0 != ogg_stream_packetout (&os, &op)) || (255 ==
463 og.header[og.header_len
466 /*The format specifies that the initial header and tags packets are on their
467 own pages. To aid implementors in discovering that their files are wrong
468 we reject them explicitly here. In some player designs files like this would
469 fail even without an explicit test.*/
471 "Extra packets on initial header page. Invalid stream.\n");
475 /*Remember how many samples at the front we were told to skip
476 so that we can adjust the timestamp counting.*/
477 gran_offset = preskip;
481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
485 (unsigned int) sizeof(float),
486 (unsigned long long) (MAX_FRAME_SIZE * channels
488 pcm_buffer = pa_xmalloc (sizeof(float) * MAX_FRAME_SIZE * channels);
491 else if (1 == packet_count)
494 if ((0 != ogg_stream_packetout (&os, &op)) || (255 ==
495 og.header[og.header_len
499 "Extra packets on initial tags page. Invalid stream.\n");
509 /*End of stream condition*/
510 if (op.e_o_s && (os.serialno == opus_serialno) )
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514 eos = 1; /* don't care for anything except opus eos */
517 /*Decode Opus packet*/
518 ret = opus_decode_float (dec,
519 (const unsigned char *) op.packet,
524 /*If the decoder returned less than zero, we have an error.*/
527 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
531 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
535 (unsigned int) op.bytes);
537 /*Apply header gain, if we're not using an opus library new
538 enough to do this internally.*/
542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Applying gain %f\n",
545 for (i = 0; i < frame_size * channels; i++)
546 pcm_buffer[i] *= gain;
549 /*This handles making sure that our output duration respects
550 the final end-trim by not letting the output sample count
551 get ahead of the granpos indicated value.*/
552 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000)
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "Writing audio packet %" PRId64 ", at most %" PRId64
557 packet_count, maxout);
559 outsamp = audio_write (0 > maxout ? 0 : maxout);
568 opus_decoder_destroy (dec);
578 * @param msg message we received.
579 * @return #GNUNET_OK on success,
580 * #GNUNET_NO to stop further processing due to disconnect (no error)
581 * #GNUNET_SYSERR to stop further processing due to error
584 stdin_receiver (void *cls,
585 const struct GNUNET_MessageHeader *msg)
587 struct AudioMessage *audio;
592 switch (ntohs (msg->type))
594 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
595 audio = (struct AudioMessage *) msg;
596 payload_len = ntohs (audio->header.size) - sizeof(struct AudioMessage);
598 /*Get the ogg buffer for writing*/
599 data = ogg_sync_buffer (&oy, payload_len);
600 /*Read bitstream from input file*/
601 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
602 ogg_sync_wrote (&oy, payload_len);
604 ogg_demux_and_decode ();
615 * Callback when data is there for playback
618 stream_write_callback (pa_stream *s,
626 if (-1 != ready_pipe[1])
628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629 "Unblocking main loop!\n");
630 (void) write (ready_pipe[1], "r", 1);
636 * Exit callback for SIGTERM and SIGINT
639 exit_signal_callback (pa_mainloop_api *m,
648 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
649 _ ("gnunet-helper-audio-playback - Got signal, exiting\n"));
655 * Pulseaudio stream state callback
658 context_state_callback (pa_context *c,
664 GNUNET_assert (NULL != c);
665 switch (pa_context_get_state (c))
667 case PA_CONTEXT_CONNECTING:
668 case PA_CONTEXT_AUTHORIZING:
669 case PA_CONTEXT_SETTING_NAME:
672 case PA_CONTEXT_READY:
674 GNUNET_assert (! stream_out);
675 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
676 _ ("Connection established.\n"));
678 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
680 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
681 _ ("pa_stream_new() failed: %s\n"),
682 pa_strerror (pa_context_errno (c)));
685 pa_stream_set_write_callback (stream_out,
686 &stream_write_callback,
689 pa_stream_connect_playback (stream_out, NULL,
691 PA_STREAM_ADJUST_LATENCY
692 | PA_STREAM_INTERPOLATE_TIMING
693 | PA_STREAM_AUTO_TIMING_UPDATE,
696 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
697 _ ("pa_stream_connect_playback() failed: %s\n"),
698 pa_strerror (pa_context_errno (c)));
704 case PA_CONTEXT_TERMINATED:
708 case PA_CONTEXT_FAILED:
710 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
711 _ ("Connection failure: %s\n"),
712 pa_strerror (pa_context_errno (c)));
722 * Pulseaudio initialization
729 if (! pa_sample_spec_valid (&sample_spec))
731 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
734 /* set up threaded playback mainloop */
735 if (! (m = pa_threaded_mainloop_new ()))
737 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
738 _ ("pa_mainloop_new() failed.\n"));
740 mainloop_api = pa_threaded_mainloop_get_api (m);
741 /* listen to signals */
742 r = pa_signal_init (mainloop_api);
743 GNUNET_assert (r == 0);
744 pa_signal_new (SIGINT, exit_signal_callback, NULL);
745 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
748 /* connect to the main pulseaudio context */
749 if (! (context = pa_context_new (mainloop_api, "GNUnet VoIP")))
751 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
752 _ ("pa_context_new() failed.\n"));
754 pa_context_set_state_callback (context, context_state_callback, NULL);
756 if (pa_context_connect (context, NULL, 0, NULL) < 0)
758 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
759 _ ("pa_context_connect() failed: %s\n"),
760 pa_strerror (pa_context_errno (context)));
762 if (pa_threaded_mainloop_start (m) < 0)
764 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
765 _ ("pa_mainloop_run() failed.\n"));
778 drain_callback (pa_stream*s, int success, void *userdata)
783 pa_threaded_mainloop_signal (m,
789 * The main function for the playback helper.
791 * @param argc number of arguments from the command line
792 * @param argv command line arguments
793 * @return 0 ok, 1 on error
796 main (int argc, char *argv[])
798 static unsigned long long toff;
799 char readbuf[MAXLINE];
800 struct GNUNET_MessageStreamTokenizer *stdin_mst;
804 #ifdef DEBUG_READ_PURE_OGG
805 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
810 GNUNET_assert (GNUNET_OK ==
811 GNUNET_log_setup ("gnunet-helper-audio-playback",
814 if (0 != pipe (ready_pipe))
816 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
819 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
822 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823 "Waiting for PulseAudio to be ready.\n");
824 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
825 close (ready_pipe[0]);
826 close (ready_pipe[1]);
829 #ifdef DEBUG_DUMP_DECODED_OGG
830 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
834 ret = read (STDIN_FILENO,
838 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839 "Received %d bytes of audio data (total: %llu)\n",
844 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
845 _ ("Read error from STDIN: %s\n"),
851 #ifdef DEBUG_READ_PURE_OGG
854 char *data = ogg_sync_buffer (&oy, ret);
855 GNUNET_memcpy (data, readbuf, ret);
856 ogg_sync_wrote (&oy, ret);
857 ogg_demux_and_decode ();
861 GNUNET_MST_from_buffer (stdin_mst,
863 GNUNET_NO, GNUNET_NO);
865 GNUNET_MST_destroy (stdin_mst);
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870 pa_threaded_mainloop_lock (m);
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
874 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
876 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878 pa_threaded_mainloop_wait (m);
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882 pa_operation_unref (o);
883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885 pa_threaded_mainloop_unlock (m);