2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file conversation/gnunet-helper-audio-playback.c
20 * @brief program to play back audio data to the speaker
21 * @author Siomon Dieterle
22 * @author Andreas Fuchs
23 * @author Christian Grothoff
26 #include "gnunet_util_lib.h"
27 #include "gnunet_protocols.h"
28 #include "conversation.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_core_service.h"
32 #include <pulse/simple.h>
33 #include <pulse/error.h>
34 #include <pulse/rtclock.h>
36 #include <pulse/pulseaudio.h>
37 #include <opus/opus.h>
38 #include <opus/opus_types.h>
41 #define DEBUG_READ_PURE_OGG 1
42 #define DEBUG_DUMP_DECODED_OGG 1
46 #define SAMPLING_RATE 48000
51 #define MAX_FRAME_SIZE (960 * 6)
54 * Pulseaudio sample specification. May change in the future.
56 static pa_sample_spec sample_spec = {
57 .format = PA_SAMPLE_FLOAT32LE,
58 .rate = SAMPLING_RATE,
62 #ifdef DEBUG_DUMP_DECODED_OGG
63 static int dump_to_stdout;
67 * Pulseaudio mainloop api
69 static pa_mainloop_api *mainloop_api;
72 * Pulseaudio threaded mainloop
74 static pa_threaded_mainloop *m;
79 static pa_context *context;
82 * Pulseaudio output stream
84 static pa_stream *stream_out;
89 static OpusDecoder *dec;
94 static float *pcm_buffer;
97 * Number of samples for one frame
99 static int frame_size;
102 * Pipe we use to signal the main loop that we are ready to receive.
104 static int ready_pipe[2];
109 static ogg_sync_state oy;
114 static ogg_stream_state os;
122 GNUNET_NETWORK_STRUCT_BEGIN
124 /* OggOpus spec says the numbers must be in little-endian order */
125 struct OpusHeadPacket
130 uint16_t preskip GNUNET_PACKED;
131 uint32_t sampling_rate GNUNET_PACKED;
132 uint16_t gain GNUNET_PACKED;
133 uint8_t channel_mapping;
136 GNUNET_NETWORK_STRUCT_END
139 * Process an Opus header and set up the Opus decoder based on it.
140 * The channel count and pre-skip found in the header are stored
141 * for use elsewhere in the code.
144 process_header (ogg_packet *op)
148 struct OpusHeadPacket header;
150 if ( ((unsigned int) op->bytes) < sizeof (header))
152 GNUNET_memcpy (&header,
155 header.preskip = GNUNET_le16toh (header.preskip);
156 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
157 header.gain = GNUNET_le16toh (header.gain);
159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
160 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
164 header.sampling_rate,
166 channels = header.channels;
167 preskip = header.preskip;
169 if (header.channel_mapping != 0)
172 "This implementation does not support non-mono streams\n");
176 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
180 "Cannot create encoder: %s\n",
181 opus_strerror (err));
187 "Decoder initialization failed: %s\n",
188 opus_strerror (err));
192 if (0 != header.gain)
194 /*Gain API added in a newer libopus version, if we don't have it
195 we apply the gain ourselves. We also add in a user provided
196 manual gain at the same time.*/
197 int gainadj = (int) header.gain;
198 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
199 if(OPUS_UNIMPLEMENTED == err)
201 gain = pow (10.0, gainadj / 5120.0);
203 else if (OPUS_OK != err)
205 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
214 #ifdef DEBUG_DUMP_DECODED_OGG
216 fwrite_le32(opus_int32 i32, FILE *file)
218 unsigned char buf[4];
219 buf[0]=(unsigned char)(i32&0xFF);
220 buf[1]=(unsigned char)(i32>>8&0xFF);
221 buf[2]=(unsigned char)(i32>>16&0xFF);
222 buf[3]=(unsigned char)(i32>>24&0xFF);
223 return fwrite(buf,4,1,file);
/**
 * Write the low 16 bits of a value to @a file as two bytes in
 * little-endian order.
 *
 * @param i16 value to write (only the low 16 bits are used)
 * @param file stream to write to
 * @return result of fwrite(): 1 if the 2-byte item was written, 0 otherwise
 */
static size_t
fwrite_le16 (int i16, FILE *file)
{
  /* Work on an unsigned copy: right-shifting a negative signed value is
     implementation-defined in C, while unsigned shifts are fully defined. */
  const unsigned int v = (unsigned int) i16;
  unsigned char buf[2];

  buf[0] = (unsigned char) (v & 0xFF);
  buf[1] = (unsigned char) ((v >> 8) & 0xFF);
  return fwrite (buf, sizeof (buf), 1, file);
}
243 ret = fprintf (file, "RIFF") >= 0;
244 ret &= fwrite_le32 (0x7fffffff, file);
246 ret &= fprintf (file, "WAVEfmt ") >= 0;
247 ret &= fwrite_le32 (16, file);
248 ret &= fwrite_le16 (1, file);
249 ret &= fwrite_le16 (channels, file);
250 ret &= fwrite_le32 (SAMPLING_RATE, file);
251 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
252 ret &= fwrite_le16 (2*channels, file);
253 ret &= fwrite_le16 (16, file);
255 ret &= fprintf (file, "data") >= 0;
256 ret &= fwrite_le32 (0x7fffffff, file);
258 return !ret ? -1 : 16;
265 audio_write (int64_t maxout)
272 #ifdef DEBUG_DUMP_DECODED_OGG
273 static int wrote_wav_header;
275 if (dump_to_stdout && !wrote_wav_header)
278 wrote_wav_header = 1;
281 maxout = 0 > maxout ? 0 : maxout;
284 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
286 output = pcm_buffer + channels * tmp_skip;
287 out_len = frame_size - tmp_skip;
288 if (out_len > MAX_FRAME_SIZE)
292 to_write = out_len < maxout ? out_len : (unsigned) maxout;
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298 "Writing %u * %u * %u = %llu bytes into PA\n",
301 (unsigned int) sizeof (float),
302 (unsigned long long) (to_write * channels * sizeof (float)));
303 #ifdef DEBUG_DUMP_DECODED_OGG
306 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
307 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
308 # define float2int(flt) ((int)(floor(.5+flt)))
310 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
311 for (i=0;i<(int)out_len*channels;i++)
312 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
314 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
319 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
320 PA_SEEK_RELATIVE) < 0)
322 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
323 _("pa_stream_write() failed: %s\n"),
324 pa_strerror (pa_context_errno (context)));
329 } while (0 < frame_size && 0 < maxout);
331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332 "Wrote %" PRId64 " samples\n",
339 * Pulseaudio shutdown task
344 mainloop_api->quit (mainloop_api,
351 ogg_demux_and_decode ()
354 static int stream_init;
355 int64_t page_granule = 0;
357 static int has_opus_stream;
358 static int has_tags_packet;
359 static int32_t opus_serialno;
360 static int64_t link_out;
361 static int64_t packet_count;
363 static int total_links;
364 static int gran_offset;
366 while (1 == ogg_sync_pageout (&oy, &og))
368 if (0 == stream_init)
370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371 "Initialized the stream\n");
372 ogg_stream_init (&os, ogg_page_serialno (&og));
375 if (ogg_page_serialno (&og) != os.serialno)
377 /* so all streams are read. */
378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379 "Re-set serial number\n");
380 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
382 /*Add page to the bitstream*/
383 ogg_stream_pagein (&os, &og);
384 page_granule = ogg_page_granulepos (&og);
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "Reading page that ends at %" PRId64 "\n",
388 /*Extract all available packets*/
389 while (1 == ogg_stream_packetout (&os, &op))
391 /*OggOpus streams are identified by a magic string in the initial
393 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396 "Got Opus Header\n");
397 if (has_opus_stream && has_tags_packet)
399 /*If we're seeing another BOS OpusHead now it means
400 the stream is chained without an EOS.
401 This can easily happen if record helper is terminated unexpectedly.
405 opus_decoder_destroy (dec);
407 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
409 if (!has_opus_stream)
411 if (packet_count > 0 && opus_serialno == os.serialno)
413 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
414 (int64_t) opus_serialno, (int64_t) os.serialno);
417 opus_serialno = os.serialno;
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Got header for stream %" PRId64 ", this is %dth link\n",
426 (int64_t) opus_serialno, total_links);
430 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
433 if (!has_opus_stream || os.serialno != opus_serialno)
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439 /*If first packet in a logical stream, process the Opus header*/
440 if (0 == packet_count)
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Decoding header\n");
444 dec = process_header (&op);
448 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
450 /*The format specifies that the initial header and tags packets are on their
451 own pages. To aid implementors in discovering that their files are wrong
452 we reject them explicitly here. In some player designs files like this would
453 fail even without an explicit test.*/
454 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
458 /*Remember how many samples at the front we were told to skip
459 so that we can adjust the timestamp counting.*/
460 gran_offset = preskip;
464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
468 (unsigned int) sizeof (float),
469 (unsigned long long) (MAX_FRAME_SIZE * channels * sizeof (float)));
470 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
473 else if (1 == packet_count)
476 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
478 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
488 /*End of stream condition*/
489 if (op.e_o_s && os.serialno == opus_serialno)
491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
493 eos = 1; /* don't care for anything except opus eos */
496 /*Decode Opus packet*/
497 ret = opus_decode_float (dec,
498 (const unsigned char *) op.packet,
503 /*If the decoder returned less than zero, we have an error.*/
506 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
514 (unsigned int) op.bytes);
516 /*Apply header gain, if we're not using an opus library new
517 enough to do this internally.*/
521 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522 "Applying gain %f\n",
524 for (i = 0; i < frame_size * channels; i++)
525 pcm_buffer[i] *= gain;
528 /*This handles making sure that our output duration respects
529 the final end-trim by not letting the output sample count
530 get ahead of the granpos indicated value.*/
531 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
533 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
534 packet_count, maxout);
536 outsamp = audio_write (0 > maxout ? 0 : maxout);
545 opus_decoder_destroy (dec);
555 * @param msg message we received.
556 * @return #GNUNET_OK on success,
557 * #GNUNET_NO to stop further processing due to disconnect (no error)
558 * #GNUNET_SYSERR to stop further processing due to error
561 stdin_receiver (void *cls,
562 const struct GNUNET_MessageHeader *msg)
564 struct AudioMessage *audio;
569 switch (ntohs (msg->type))
571 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
572 audio = (struct AudioMessage *) msg;
573 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
575 /*Get the ogg buffer for writing*/
576 data = ogg_sync_buffer (&oy, payload_len);
577 /*Read bitstream from input file*/
578 GNUNET_memcpy (data, (const unsigned char *) &audio[1], payload_len);
579 ogg_sync_wrote (&oy, payload_len);
581 ogg_demux_and_decode ();
591 * Callback when data is there for playback
594 stream_write_callback (pa_stream *s,
602 if (-1 != ready_pipe[1])
604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605 "Unblocking main loop!\n");
606 (void) write (ready_pipe[1], "r", 1);
612 * Exit callback for SIGTERM and SIGINT
615 exit_signal_callback (pa_mainloop_api *m,
624 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
625 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
631 * Pulseaudio context state callback
634 context_state_callback (pa_context *c,
640 GNUNET_assert (NULL != c);
641 switch (pa_context_get_state (c))
643 case PA_CONTEXT_CONNECTING:
644 case PA_CONTEXT_AUTHORIZING:
645 case PA_CONTEXT_SETTING_NAME:
647 case PA_CONTEXT_READY:
649 GNUNET_assert (! stream_out);
650 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
651 _("Connection established.\n"));
653 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
655 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
656 _("pa_stream_new() failed: %s\n"),
657 pa_strerror (pa_context_errno (c)));
660 pa_stream_set_write_callback (stream_out,
661 &stream_write_callback,
664 pa_stream_connect_playback (stream_out, NULL,
666 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
669 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
670 _("pa_stream_connect_playback() failed: %s\n"),
671 pa_strerror (pa_context_errno (c)));
676 case PA_CONTEXT_TERMINATED:
680 case PA_CONTEXT_FAILED:
682 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
683 _("Connection failure: %s\n"),
684 pa_strerror (pa_context_errno (c)));
694 * Pulseaudio initialization
701 if (!pa_sample_spec_valid (&sample_spec))
703 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
706 /* set up threaded playback mainloop */
707 if (!(m = pa_threaded_mainloop_new ()))
709 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
710 _("pa_mainloop_new() failed.\n"));
712 mainloop_api = pa_threaded_mainloop_get_api (m);
713 /* listen to signals */
714 r = pa_signal_init (mainloop_api);
715 GNUNET_assert (r == 0);
716 pa_signal_new (SIGINT, exit_signal_callback, NULL);
717 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
720 /* connect to the main pulseaudio context */
721 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
723 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
724 _("pa_context_new() failed.\n"));
726 pa_context_set_state_callback (context, context_state_callback, NULL);
728 if (pa_context_connect (context, NULL, 0, NULL) < 0)
730 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
731 _("pa_context_connect() failed: %s\n"),
732 pa_strerror (pa_context_errno (context)));
734 if (pa_threaded_mainloop_start (m) < 0)
736 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
737 _("pa_mainloop_run() failed.\n"));
750 drain_callback (pa_stream*s, int success, void *userdata)
755 pa_threaded_mainloop_signal (m,
761 * The main function for the playback helper.
763 * @param argc number of arguments from the command line
764 * @param argv command line arguments
765 * @return 0 ok, 1 on error
768 main (int argc, char *argv[])
770 static unsigned long long toff;
771 char readbuf[MAXLINE];
772 struct GNUNET_MessageStreamTokenizer *stdin_mst;
775 #ifdef DEBUG_READ_PURE_OGG
776 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
781 GNUNET_assert (GNUNET_OK ==
782 GNUNET_log_setup ("gnunet-helper-audio-playback",
785 if (0 != pipe (ready_pipe))
787 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
790 stdin_mst = GNUNET_MST_create (&stdin_receiver, NULL);
793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 "Waiting for PulseAudio to be ready.\n");
795 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
796 close (ready_pipe[0]);
797 close (ready_pipe[1]);
800 #ifdef DEBUG_DUMP_DECODED_OGG
801 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
805 ret = read (STDIN_FILENO,
809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810 "Received %d bytes of audio data (total: %llu)\n",
815 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
816 _("Read error from STDIN: %s\n"),
822 #ifdef DEBUG_READ_PURE_OGG
825 char *data = ogg_sync_buffer (&oy, ret);
826 GNUNET_memcpy (data, readbuf, ret);
827 ogg_sync_wrote (&oy, ret);
828 ogg_demux_and_decode ();
832 GNUNET_MST_from_buffer (stdin_mst,
834 GNUNET_NO, GNUNET_NO);
836 GNUNET_MST_destroy (stdin_mst);
839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841 pa_threaded_mainloop_lock (m);
842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
845 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
847 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 pa_threaded_mainloop_wait (m);
851 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853 pa_operation_unref (o);
854 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856 pa_threaded_mainloop_unlock (m);