2 This file is part of GNUnet.
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file conversation/gnunet-helper-audio-playback.c
22 * @brief program to playback audio data to the speaker
23 * @author Siomon Dieterle
24 * @author Andreas Fuchs
25 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "conversation.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
34 #include <pulse/simple.h>
35 #include <pulse/error.h>
36 #include <pulse/rtclock.h>
38 #include <pulse/pulseaudio.h>
39 #include <opus/opus.h>
40 #include <opus/opus_types.h>
43 #define DEBUG_READ_PURE_OGG 1
44 #define DEBUG_DUMP_DECODED_OGG 1
48 #define SAMPLING_RATE 48000
53 #define MAX_FRAME_SIZE (960 * 6)
56 * Pulseaudio specification. May change in the future.
58 static pa_sample_spec sample_spec = {
59 .format = PA_SAMPLE_FLOAT32LE,
60 .rate = SAMPLING_RATE,
64 #ifdef DEBUG_DUMP_DECODED_OGG
65 static int dump_to_stdout;
69 * Pulseaudio mainloop api
71 static pa_mainloop_api *mainloop_api;
74 * Pulseaudio threaded mainloop
76 static pa_threaded_mainloop *m;
81 static pa_context *context;
84 * Pulseaudio output stream
86 static pa_stream *stream_out;
91 static OpusDecoder *dec;
96 static float *pcm_buffer;
99 * Number of samples for one frame
101 static int frame_size;
104 * Pipe we use to signal the main loop that we are ready to receive.
106 static int ready_pipe[2];
111 static ogg_sync_state oy;
116 static ogg_stream_state os;
124 GNUNET_NETWORK_STRUCT_BEGIN
126 /* OggOpus spec says the numbers must be in little-endian order */
127 struct OpusHeadPacket
132 uint16_t preskip GNUNET_PACKED;
133 uint32_t sampling_rate GNUNET_PACKED;
134 uint16_t gain GNUNET_PACKED;
135 uint8_t channel_mapping;
138 GNUNET_NETWORK_STRUCT_END
140 /*Process an Opus header and setup the opus decoder based on it.
141 It takes several pointers for header values which are needed
142 elsewhere in the code.*/
144 process_header (ogg_packet *op)
148 struct OpusHeadPacket header;
150 if (op->bytes < sizeof (header))
152 memcpy (&header, op->packet, sizeof (header));
153 header.preskip = GNUNET_le16toh (header.preskip);
154 header.sampling_rate = GNUNET_le32toh (header.sampling_rate);
155 header.gain = GNUNET_le16toh (header.gain);
157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
158 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
159 header.version, header.channels, header.preskip, header.sampling_rate, header.gain);
161 channels = header.channels;
162 preskip = header.preskip;
164 if (header.channel_mapping != 0)
166 fprintf (stderr, "This implementation does not support non-mono streams\n");
170 dec = opus_decoder_create (SAMPLING_RATE, channels, &err);
173 fprintf (stderr, "Cannot create encoder: %s\n", opus_strerror (err));
178 fprintf (stderr, "Decoder initialization failed: %s\n", opus_strerror (err));
182 if (0 != header.gain)
184 /*Gain API added in a newer libopus version, if we don't have it
185 we apply the gain ourselves. We also add in a user provided
186 manual gain at the same time.*/
187 int gainadj = (int) header.gain;
188 err = opus_decoder_ctl (dec, OPUS_SET_GAIN (gainadj));
189 if(OPUS_UNIMPLEMENTED == err)
191 gain = pow (10.0, gainadj / 5120.0);
193 else if (OPUS_OK != err)
195 fprintf (stderr, "Error setting gain: %s\n", opus_strerror (err));
204 #ifdef DEBUG_DUMP_DECODED_OGG
205 static size_t fwrite_le32(opus_int32 i32, FILE *file)
207 unsigned char buf[4];
208 buf[0]=(unsigned char)(i32&0xFF);
209 buf[1]=(unsigned char)(i32>>8&0xFF);
210 buf[2]=(unsigned char)(i32>>16&0xFF);
211 buf[3]=(unsigned char)(i32>>24&0xFF);
212 return fwrite(buf,4,1,file);
215 static size_t fwrite_le16(int i16, FILE *file)
217 unsigned char buf[2];
218 buf[0]=(unsigned char)(i16&0xFF);
219 buf[1]=(unsigned char)(i16>>8&0xFF);
220 return fwrite(buf,2,1,file);
223 static int write_wav_header()
228 ret = fprintf (file, "RIFF") >= 0;
229 ret &= fwrite_le32 (0x7fffffff, file);
231 ret &= fprintf (file, "WAVEfmt ") >= 0;
232 ret &= fwrite_le32 (16, file);
233 ret &= fwrite_le16 (1, file);
234 ret &= fwrite_le16 (channels, file);
235 ret &= fwrite_le32 (SAMPLING_RATE, file);
236 ret &= fwrite_le32 (2*channels*SAMPLING_RATE, file);
237 ret &= fwrite_le16 (2*channels, file);
238 ret &= fwrite_le16 (16, file);
240 ret &= fprintf (file, "data") >= 0;
241 ret &= fwrite_le32 (0x7fffffff, file);
243 return !ret ? -1 : 16;
249 audio_write (int64_t maxout)
256 #ifdef DEBUG_DUMP_DECODED_OGG
257 static int wrote_wav_header;
259 if (dump_to_stdout && !wrote_wav_header)
262 wrote_wav_header = 1;
265 maxout = 0 > maxout ? 0 : maxout;
268 tmp_skip = (preskip > frame_size) ? (int) frame_size : preskip;
270 output = pcm_buffer + channels * tmp_skip;
271 out_len = frame_size - tmp_skip;
272 if (out_len > MAX_FRAME_SIZE)
276 to_write = out_len < maxout ? out_len : (unsigned) maxout;
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Writing %u * %u * %u = %u bytes into PA\n",
283 to_write, channels, sizeof (float),
284 to_write * channels * sizeof (float));
285 #ifdef DEBUG_DUMP_DECODED_OGG
288 # define fminf(_x,_y) ((_x)<(_y)?(_x):(_y))
289 # define fmaxf(_x,_y) ((_x)>(_y)?(_x):(_y))
290 # define float2int(flt) ((int)(floor(.5+flt)))
292 int16_t *out = alloca(sizeof(short)*MAX_FRAME_SIZE*channels);
293 for (i=0;i<(int)out_len*channels;i++)
294 out[i]=(short)float2int(fmaxf(-32768,fminf(output[i]*32768.f,32767)));
296 fwrite (out, 2 * channels, out_len<maxout?out_len:maxout, stdout);
301 (stream_out, output, to_write * channels * sizeof (float), NULL, 0,
302 PA_SEEK_RELATIVE) < 0)
304 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
305 _("pa_stream_write() failed: %s\n"),
306 pa_strerror (pa_context_errno (context)));
311 } while (0 < frame_size && 0 < maxout);
313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
314 "Wrote %" PRId64 " samples\n",
321 * Pulseaudio shutdown task
326 mainloop_api->quit (mainloop_api, ret);
332 ogg_demux_and_decode ()
335 static int stream_init;
336 int64_t page_granule = 0;
338 static int has_opus_stream;
339 static int has_tags_packet;
340 static int32_t opus_serialno;
341 static int64_t link_out;
342 static int64_t packet_count;
344 static int total_links;
345 static int gran_offset;
347 while (1 == ogg_sync_pageout (&oy, &og))
349 if (0 == stream_init)
351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352 "Initialized the stream\n");
353 ogg_stream_init (&os, ogg_page_serialno (&og));
356 if (ogg_page_serialno (&og) != os.serialno)
358 /* so all streams are read. */
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "Re-set serial number\n");
361 ogg_stream_reset_serialno (&os, ogg_page_serialno (&og));
363 /*Add page to the bitstream*/
364 ogg_stream_pagein (&os, &og);
365 page_granule = ogg_page_granulepos (&og);
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "Reading page that ends at %" PRId64 "\n",
369 /*Extract all available packets*/
370 while (1 == ogg_stream_packetout (&os, &op))
372 /*OggOpus streams are identified by a magic string in the initial
374 if (op.b_o_s && op.bytes >= 8 && !memcmp (op.packet, "OpusHead", 8))
376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377 "Got Opus Header\n");
378 if (has_opus_stream && has_tags_packet)
380 /*If we're seeing another BOS OpusHead now it means
381 the stream is chained without an EOS.
382 This can easily happen if record helper is terminated unexpectedly.
386 opus_decoder_destroy (dec);
388 fprintf (stderr, "\nWarning: stream %" PRId64 " ended without EOS and a new stream began.\n", (int64_t) os.serialno);
390 if (!has_opus_stream)
392 if (packet_count > 0 && opus_serialno == os.serialno)
394 fprintf (stderr, "\nError: Apparent chaining without changing serial number (%" PRId64 "==%" PRId64 ").\n",
395 (int64_t) opus_serialno, (int64_t) os.serialno);
398 opus_serialno = os.serialno;
405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406 "Got header for stream %" PRId64 ", this is %dth link\n",
407 (int64_t) opus_serialno, total_links);
411 fprintf (stderr, "\nWarning: ignoring opus stream %" PRId64 "\n", (int64_t) os.serialno);
414 if (!has_opus_stream || os.serialno != opus_serialno)
416 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
420 /*If first packet in a logical stream, process the Opus header*/
421 if (0 == packet_count)
423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424 "Decoding header\n");
425 dec = process_header (&op);
429 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
431 /*The format specifies that the initial header and tags packets are on their
432 own pages. To aid implementors in discovering that their files are wrong
433 we reject them explicitly here. In some player designs files like this would
434 fail even without an explicit test.*/
435 fprintf (stderr, "Extra packets on initial header page. Invalid stream.\n");
439 /*Remember how many samples at the front we were told to skip
440 so that we can adjust the timestamp counting.*/
441 gran_offset = preskip;
445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446 "Allocating %u * %u * %u = %u bytes of buffer space\n",
447 MAX_FRAME_SIZE, channels, sizeof (float),
448 MAX_FRAME_SIZE * channels * sizeof (float));
449 pcm_buffer = pa_xmalloc (sizeof (float) * MAX_FRAME_SIZE * channels);
452 else if (1 == packet_count)
455 if (0 != ogg_stream_packetout (&os, &op) || 255 == og.header[og.header_len - 1])
457 fprintf (stderr, "Extra packets on initial tags page. Invalid stream.\n");
467 /*End of stream condition*/
468 if (op.e_o_s && os.serialno == opus_serialno)
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
472 eos = 1; /* don't care for anything except opus eos */
475 /*Decode Opus packet*/
476 ret = opus_decode_float (dec,
477 (const unsigned char *) op.packet,
482 /*If the decoder returned less than zero, we have an error.*/
485 fprintf (stderr, "Decoding error: %s\n", opus_strerror (ret));
489 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
491 ret, ret * channels, op.bytes);
493 /*Apply header gain, if we're not using an opus library new
494 enough to do this internally.*/
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "Applying gain %f\n",
501 for (i = 0; i < frame_size * channels; i++)
502 pcm_buffer[i] *= gain;
505 /*This handles making sure that our output duration respects
506 the final end-trim by not letting the output sample count
507 get ahead of the granpos indicated value.*/
508 maxout = ((page_granule - gran_offset) * SAMPLING_RATE / 48000) - link_out;
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510 "Writing audio packet %" PRId64 ", at most %" PRId64 " samples\n",
511 packet_count, maxout);
513 outsamp = audio_write (0 > maxout ? 0 : maxout);
522 opus_decoder_destroy (dec);
532 stdin_receiver (void *cls,
534 const struct GNUNET_MessageHeader *msg)
536 struct AudioMessage *audio;
540 switch (ntohs (msg->type))
542 case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
543 audio = (struct AudioMessage *) msg;
544 payload_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
546 /*Get the ogg buffer for writing*/
547 data = ogg_sync_buffer (&oy, payload_len);
548 /*Read bitstream from input file*/
549 memcpy (data, (const unsigned char *) &audio[1], payload_len);
550 ogg_sync_wrote (&oy, payload_len);
552 ogg_demux_and_decode ();
562 * Callback when data is there for playback
565 stream_write_callback (pa_stream * s,
570 if (-1 != ready_pipe[1])
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "Unblocking main loop!\n");
574 write (ready_pipe[1], "r", 1);
580 * Exit callback for SIGTERM and SIGINT
583 exit_signal_callback (pa_mainloop_api * m, pa_signal_event * e, int sig,
586 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
587 _("gnunet-helper-audio-playback - Got signal, exiting\n"));
593 * Pulseaudio stream state callback
596 context_state_callback (pa_context * c,
601 GNUNET_assert (NULL != c);
602 switch (pa_context_get_state (c))
604 case PA_CONTEXT_CONNECTING:
605 case PA_CONTEXT_AUTHORIZING:
606 case PA_CONTEXT_SETTING_NAME:
608 case PA_CONTEXT_READY:
610 GNUNET_assert (!stream_out);
611 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
612 _("Connection established.\n"));
614 pa_stream_new (c, "GNUNET VoIP playback", &sample_spec, NULL)))
616 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
617 _("pa_stream_new() failed: %s\n"),
618 pa_strerror (pa_context_errno (c)));
621 pa_stream_set_write_callback (stream_out,
622 &stream_write_callback,
625 pa_stream_connect_playback (stream_out, NULL,
627 PA_STREAM_ADJUST_LATENCY | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE,
630 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
631 _("pa_stream_connect_playback() failed: %s\n"),
632 pa_strerror (pa_context_errno (c)));
637 case PA_CONTEXT_TERMINATED:
641 case PA_CONTEXT_FAILED:
643 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
644 _("Connection failure: %s\n"),
645 pa_strerror (pa_context_errno (c)));
655 * Pulseaudio initialization
662 if (!pa_sample_spec_valid (&sample_spec))
664 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
667 /* set up threaded playback mainloop */
668 if (!(m = pa_threaded_mainloop_new ()))
670 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671 _("pa_mainloop_new() failed.\n"));
673 mainloop_api = pa_threaded_mainloop_get_api (m);
674 /* listen to signals */
675 r = pa_signal_init (mainloop_api);
676 GNUNET_assert (r == 0);
677 pa_signal_new (SIGINT, exit_signal_callback, NULL);
678 pa_signal_new (SIGTERM, exit_signal_callback, NULL);
681 /* connect to the main pulseaudio context */
682 if (!(context = pa_context_new (mainloop_api, "GNUnet VoIP")))
684 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
685 _("pa_context_new() failed.\n"));
687 pa_context_set_state_callback (context, context_state_callback, NULL);
689 if (pa_context_connect (context, NULL, 0, NULL) < 0)
691 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
692 _("pa_context_connect() failed: %s\n"),
693 pa_strerror (pa_context_errno (context)));
695 if (pa_threaded_mainloop_start (m) < 0)
697 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
698 _("pa_mainloop_run() failed.\n"));
710 drain_callback (pa_stream*s, int success, void *userdata)
712 pa_threaded_mainloop_signal (m, 0);
716 * The main function for the playback helper.
718 * @param argc number of arguments from the command line
719 * @param argv command line arguments
720 * @return 0 ok, 1 on error
723 main (int argc, char *argv[])
725 static unsigned long long toff;
727 char readbuf[MAXLINE];
728 struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst;
731 #ifdef DEBUG_READ_PURE_OGG
732 int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
735 GNUNET_assert (GNUNET_OK ==
736 GNUNET_log_setup ("gnunet-helper-audio-playback",
739 if (0 != pipe (ready_pipe))
741 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "pipe");
744 stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL);
747 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748 "Waiting for PulseAudio to be ready.\n");
749 GNUNET_assert (1 == read (ready_pipe[0], &c, 1));
750 close (ready_pipe[0]);
751 close (ready_pipe[1]);
754 #ifdef DEBUG_DUMP_DECODED_OGG
755 dump_to_stdout = getenv ("GNUNET_DUMP_DECODED_OGG") ? 1 : 0;
759 ret = read (0, readbuf, sizeof (readbuf));
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "Received %d bytes of audio data (total: %llu)\n",
767 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
768 _("Read error from STDIN: %s\n"),
774 #ifdef DEBUG_READ_PURE_OGG
777 char *data = ogg_sync_buffer (&oy, ret);
778 memcpy (data, readbuf, ret);
779 ogg_sync_wrote (&oy, ret);
780 ogg_demux_and_decode ();
784 GNUNET_SERVER_mst_receive (stdin_mst, NULL,
786 GNUNET_NO, GNUNET_NO);
788 GNUNET_SERVER_mst_destroy (stdin_mst);
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 pa_threaded_mainloop_lock (m);
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796 pa_operation *o = pa_stream_drain (stream_out, drain_callback, NULL);
797 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 pa_threaded_mainloop_wait (m);
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805 pa_operation_unref (o);
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 pa_threaded_mainloop_unlock (m);