social cli
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of new cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "cadet.h"
30 #include "cadet_protocol.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
33 #define DATA_OVERHEAD sizeof(struct GNUNET_CADET_LocalData)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  */
42 struct GNUNET_CADET_TransmitHandle
43 {
44
45     /**
46      * Double Linked list
47      */
48   struct GNUNET_CADET_TransmitHandle *next;
49
50     /**
51      * Double Linked list
52      */
53   struct GNUNET_CADET_TransmitHandle *prev;
54
55     /**
56      * Channel this message is sent on / for (may be NULL for control messages).
57      */
58   struct GNUNET_CADET_Channel *channel;
59
60     /**
61      * Callback to obtain the message to transmit, or NULL if we
62      * got the message in 'data'.  Notice that messages built
63      * by 'notify' need to be encapsulated with information about
64      * the 'target'.
65      */
66   GNUNET_CONNECTION_TransmitReadyNotify notify;
67
68     /**
69      * Closure for 'notify'
70      */
71   void *notify_cls;
72
73     /**
74      * How long is this message valid.  Once the timeout has been
75      * reached, the message must no longer be sent.  If this
76      * is a message with a 'notify' callback set, the 'notify'
77      * function should be called with 'buf' NULL and size 0.
78      */
79   struct GNUNET_TIME_Absolute timeout;
80
81     /**
82      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
83      */
84   struct GNUNET_SCHEDULER_Task * timeout_task;
85
86     /**
87      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
88      */
89   size_t size;
90 };
91
92 union CadetInfoCB {
93
94   /**
95    * Channel callback.
96    */
97   GNUNET_CADET_ChannelCB channel_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeersCB peers_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_PeerCB peer_cb;
108
109   /**
110    * Monitor callback
111    */
112   GNUNET_CADET_TunnelsCB tunnels_cb;
113
114   /**
115    * Tunnel callback.
116    */
117   GNUNET_CADET_TunnelCB tunnel_cb;
118 };
119
120
121 /**
122  * Opaque handle to the service.
123  */
124 struct GNUNET_CADET_Handle
125 {
126
127     /**
128      * Handle to the server connection, to send messages later
129      */
130   struct GNUNET_CLIENT_Connection *client;
131
132     /**
133      * Set of handlers used for processing incoming messages in the channels
134      */
135   const struct GNUNET_CADET_MessageHandler *message_handlers;
136
137   /**
138    * Number of handlers in the handlers array.
139    */
140   unsigned int n_handlers;
141
142   /**
143    * Ports open.
144    */
145   const uint32_t *ports;
146
147   /**
148    * Number of ports.
149    */
150   unsigned int n_ports;
151
152     /**
153      * Double linked list of the channels this client is connected to, head.
154      */
155   struct GNUNET_CADET_Channel *channels_head;
156
157     /**
158      * Double linked list of the channels this client is connected to, tail.
159      */
160   struct GNUNET_CADET_Channel *channels_tail;
161
162     /**
163      * Callback for inbound channel creation
164      */
165   GNUNET_CADET_InboundChannelNotificationHandler *new_channel;
166
167     /**
168      * Callback for inbound channel disconnection
169      */
170   GNUNET_CADET_ChannelEndHandler *cleaner;
171
172     /**
173      * Handle to cancel pending transmissions in case of disconnection
174      */
175   struct GNUNET_CLIENT_TransmitHandle *th;
176
177     /**
178      * Closure for all the handlers given by the client
179      */
180   void *cls;
181
182     /**
183      * Messages to send to the service, head.
184      */
185   struct GNUNET_CADET_TransmitHandle *th_head;
186
187     /**
188      * Messages to send to the service, tail.
189      */
190   struct GNUNET_CADET_TransmitHandle *th_tail;
191
192     /**
193      * chid of the next channel to create (to avoid reusing IDs often)
194      */
195   CADET_ChannelNumber next_chid;
196
197     /**
198      * Have we started the task to receive messages from the service
199      * yet? We do this after we send the 'CADET_LOCAL_CONNECT' message.
200      */
201   int in_receive;
202
203   /**
204    * Configuration given by the client, in case of reconnection
205    */
206   const struct GNUNET_CONFIGURATION_Handle *cfg;
207
208   /**
209    * Time to the next reconnect in case one reconnect fails
210    */
211   struct GNUNET_TIME_Relative reconnect_time;
212
213   /**
214    * Task for trying to reconnect.
215    */
216   struct GNUNET_SCHEDULER_Task * reconnect_task;
217
218   /**
219    * Callback for an info task (only one active at a time).
220    */
221   union CadetInfoCB info_cb;
222
223   /**
224    * Info callback closure for @c info_cb.
225    */
226   void *info_cls;
227 };
228
229
230 /**
231  * Description of a peer
232  */
233 struct GNUNET_CADET_Peer
234 {
235     /**
236      * ID of the peer in short form
237      */
238   GNUNET_PEER_Id id;
239
240   /**
241    * Channel this peer belongs to
242    */
243   struct GNUNET_CADET_Channel *t;
244 };
245
246
247 /**
248  * Opaque handle to a channel.
249  */
250 struct GNUNET_CADET_Channel
251 {
252
253     /**
254      * DLL next
255      */
256   struct GNUNET_CADET_Channel *next;
257
258     /**
259      * DLL prev
260      */
261   struct GNUNET_CADET_Channel *prev;
262
263     /**
264      * Handle to the cadet this channel belongs to
265      */
266   struct GNUNET_CADET_Handle *cadet;
267
268     /**
269      * Local ID of the channel
270      */
271   CADET_ChannelNumber chid;
272
273     /**
274      * Port number.
275      */
276   uint32_t port;
277
278     /**
279      * Other end of the channel.
280      */
281   GNUNET_PEER_Id peer;
282
283   /**
284    * Any data the caller wants to put in here
285    */
286   void *ctx;
287
288     /**
289      * Size of packet queued in this channel
290      */
291   unsigned int packet_size;
292
293     /**
294      * Channel options: reliability, etc.
295      */
296   enum GNUNET_CADET_ChannelOption options;
297
298     /**
299      * Are we allowed to send to the service?
300      */
301   int allow_send;
302
303 };
304
305
306 /**
307  * Implementation state for cadet's message queue.
308  */
309 struct CadetMQState
310 {
311   /**
312    * The current transmit handle, or NULL
313    * if no transmit is active.
314    */
315   struct GNUNET_CADET_TransmitHandle *th;
316
317   /**
318    * Channel to send the data over.
319    */
320   struct GNUNET_CADET_Channel *channel;
321 };
322
323
324 /******************************************************************************/
325 /***********************         DECLARATIONS         *************************/
326 /******************************************************************************/
327
328 /**
329  * Function called to send a message to the service.
330  * "buf" will be NULL and "size" zero if the socket was closed for writing in
331  * the meantime.
332  *
333  * @param cls closure, the cadet handle
334  * @param size number of bytes available in buf
335  * @param buf where the callee should write the connect message
336  * @return number of bytes written to buf
337  */
338 static size_t
339 send_callback (void *cls, size_t size, void *buf);
340
341
342 /******************************************************************************/
343 /***********************     AUXILIARY FUNCTIONS      *************************/
344 /******************************************************************************/
345
346 /**
347  * Check if transmission is a payload packet.
348  *
349  * @param th Transmission handle.
350  *
351  * @return #GNUNET_YES if it is a payload packet,
352  *         #GNUNET_NO if it is a cadet management packet.
353  */
354 static int
355 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
356 {
357   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
358 }
359
360
361 /**
362  * Check whether there is any message ready in the queue and find the size.
363  *
364  * @param h Cadet handle.
365  *
366  * @return The size of the first ready message in the queue, including overhead.
367  *         0 if there is none.
368  */
369 static size_t
370 message_ready_size (struct GNUNET_CADET_Handle *h)
371 {
372   struct GNUNET_CADET_TransmitHandle *th;
373   struct GNUNET_CADET_Channel *ch;
374
375   for (th = h->th_head; NULL != th; th = th->next)
376   {
377     ch = th->channel;
378     if (GNUNET_NO == th_is_payload (th) || GNUNET_YES == ch->allow_send)
379       return th->size;
380   }
381   return 0;
382 }
383
384
385 /**
386  * Get the channel handler for the channel specified by id from the given handle
387  * @param h Cadet handle
388  * @param chid ID of the wanted channel
389  * @return handle to the required channel or NULL if not found
390  */
391 static struct GNUNET_CADET_Channel *
392 retrieve_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid)
393 {
394   struct GNUNET_CADET_Channel *ch;
395
396   ch = h->channels_head;
397   while (ch != NULL)
398   {
399     if (ch->chid == chid)
400       return ch;
401     ch = ch->next;
402   }
403   return NULL;
404 }
405
406
407 /**
408  * Create a new channel and insert it in the channel list of the cadet handle
409  *
410  * @param h Cadet handle
411  * @param chid Desired chid of the channel, 0 to assign one automatically.
412  *
413  * @return Handle to the created channel.
414  */
415 static struct GNUNET_CADET_Channel *
416 create_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid)
417 {
418   struct GNUNET_CADET_Channel *ch;
419
420   ch = GNUNET_new (struct GNUNET_CADET_Channel);
421   GNUNET_CONTAINER_DLL_insert (h->channels_head, h->channels_tail, ch);
422   ch->cadet = h;
423   if (0 == chid)
424   {
425     ch->chid = h->next_chid;
426     while (NULL != retrieve_channel (h, h->next_chid))
427     {
428       h->next_chid++;
429       h->next_chid &= ~GNUNET_CADET_LOCAL_CHANNEL_ID_SERV;
430       h->next_chid |= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
431     }
432   }
433   else
434   {
435     ch->chid = chid;
436   }
437   ch->allow_send = GNUNET_NO;
438   return ch;
439 }
440
441
442 /**
443  * Destroy the specified channel.
444  * - Destroys all peers, calling the disconnect callback on each if needed
445  * - Cancels all outgoing traffic for that channel, calling respective notifys
446  * - Calls cleaner if channel was inbound
447  * - Frees all memory used
448  *
449  * @param ch Pointer to the channel.
450  * @param call_cleaner Whether to call the cleaner handler.
451  *
452  * @return Handle to the required channel or NULL if not found.
453  */
454 static void
455 destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner)
456 {
457   struct GNUNET_CADET_Handle *h;
458   struct GNUNET_CADET_TransmitHandle *th;
459   struct GNUNET_CADET_TransmitHandle *next;
460
461   LOG (GNUNET_ERROR_TYPE_DEBUG, " destroy_channel %X\n", ch->chid);
462
463   if (NULL == ch)
464   {
465     GNUNET_break (0);
466     return;
467   }
468   h = ch->cadet;
469
470   GNUNET_CONTAINER_DLL_remove (h->channels_head, h->channels_tail, ch);
471
472   /* signal channel destruction */
473   if ( (NULL != h->cleaner) && (0 != ch->peer) && (GNUNET_YES == call_cleaner) )
474   {
475     LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cleaner\n");
476     h->cleaner (h->cls, ch, ch->ctx);
477   }
478
479   /* check that clients did not leave messages behind in the queue */
480   for (th = h->th_head; NULL != th; th = next)
481   {
482     next = th->next;
483     if (th->channel != ch)
484       continue;
485     /* Clients should have aborted their requests already.
486      * Management traffic should be ok, as clients can't cancel that.
487      * If the service crashed and we are reconnecting, it's ok.
488      */
489     GNUNET_break (GNUNET_NO == th_is_payload (th)
490                   || GNUNET_NO == h->in_receive);
491     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
492
493     /* clean up request */
494     if (NULL != th->timeout_task)
495       GNUNET_SCHEDULER_cancel (th->timeout_task);
496     GNUNET_free (th);
497   }
498
499   /* if there are no more pending requests with cadet service, cancel active request */
500   /* Note: this should be unnecessary... */
501   if ((0 == message_ready_size (h)) && (NULL != h->th))
502   {
503     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
504     h->th = NULL;
505   }
506
507   if (0 != ch->peer)
508     GNUNET_PEER_change_rc (ch->peer, -1);
509   GNUNET_free (ch);
510   return;
511 }
512
513
514 /**
515  * Notify client that the transmission has timed out
516  *
517  * @param cls closure
518  */
519 static void
520 timeout_transmission (void *cls)
521 {
522   struct GNUNET_CADET_TransmitHandle *th = cls;
523   struct GNUNET_CADET_Handle *cadet = th->channel->cadet;
524
525   th->timeout_task = NULL;
526   th->channel->packet_size = 0;
527   GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th);
528   if (GNUNET_YES == th_is_payload (th))
529     GNUNET_break (0 == th->notify (th->notify_cls, 0, NULL));
530   GNUNET_free (th);
531   if ((0 == message_ready_size (cadet)) && (NULL != cadet->th))
532   {
533     /* nothing ready to transmit, no point in asking for transmission */
534     GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th);
535     cadet->th = NULL;
536   }
537 }
538
539
540 /**
541  * Add a transmit handle to the transmission queue and set the
542  * timeout if needed.
543  *
544  * @param h cadet handle with the queue head and tail
545  * @param th handle to the packet to be transmitted
546  */
547 static void
548 add_to_queue (struct GNUNET_CADET_Handle *h,
549               struct GNUNET_CADET_TransmitHandle *th)
550 {
551   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
552   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us)
553     return;
554   th->timeout_task =
555       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
556                                     (th->timeout), &timeout_transmission, th);
557 }
558
559
560 /**
561  * Auxiliary function to send an already constructed packet to the service.
562  * Takes care of creating a new queue element, copying the message and
563  * calling the tmt_rdy function if necessary.
564  *
565  * @param h cadet handle
566  * @param msg message to transmit
567  * @param channel channel this send is related to (NULL if N/A)
568  */
569 static void
570 send_packet (struct GNUNET_CADET_Handle *h,
571              const struct GNUNET_MessageHeader *msg,
572              struct GNUNET_CADET_Channel *channel);
573
574
575 /**
576  * Send an ack on the channel to confirm the processing of a message.
577  *
578  * @param ch Channel on which to send the ACK.
579  */
580 static void
581 send_ack (struct GNUNET_CADET_Channel *ch)
582 {
583   struct GNUNET_CADET_LocalAck msg;
584
585   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid);
586   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
587   msg.header.size = htons (sizeof (msg));
588   msg.channel_id = htonl (ch->chid);
589
590   send_packet (ch->cadet, &msg.header, ch);
591   return;
592 }
593
594
595
596 /**
597  * Reconnect callback: tries to reconnect again after a failer previous
598  * reconnection.
599  *
600  * @param cls closure (cadet handle)
601  */
602 static void
603 reconnect_cbk (void *cls);
604
605
606 /**
607  * Send a connect packet to the service with the applications and types
608  * requested by the user.
609  *
610  * @param h The cadet handle.
611  *
612  */
613 static void
614 send_connect (struct GNUNET_CADET_Handle *h)
615 {
616   size_t size;
617
618   size = sizeof (struct GNUNET_CADET_ClientConnect);
619   size += h->n_ports * sizeof (uint32_t);
620   {
621     char buf[size] GNUNET_ALIGN;
622     struct GNUNET_CADET_ClientConnect *msg;
623     uint32_t *ports;
624     uint16_t i;
625
626     /* build connection packet */
627     msg = (struct GNUNET_CADET_ClientConnect *) buf;
628     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_CONNECT);
629     msg->header.size = htons (size);
630     ports = (uint32_t *) &msg[1];
631     for (i = 0; i < h->n_ports; i++)
632     {
633       ports[i] = htonl (h->ports[i]);
634       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
635            h->ports[i]);
636     }
637     LOG (GNUNET_ERROR_TYPE_DEBUG,
638          "Sending %lu bytes long message with %u ports\n",
639          ntohs (msg->header.size), h->n_ports);
640     send_packet (h, &msg->header, NULL);
641   }
642 }
643
644
645 /**
646  * Reconnect to the service, retransmit all infomation to try to restore the
647  * original state.
648  *
649  * @param h handle to the cadet
650  *
651  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
652  */
653 static int
654 do_reconnect (struct GNUNET_CADET_Handle *h)
655 {
656   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
657   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
658   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
659   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
660   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
661
662   /* disconnect */
663   if (NULL != h->th)
664   {
665     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
666     h->th = NULL;
667   }
668   if (NULL != h->client)
669   {
670     GNUNET_CLIENT_disconnect (h->client);
671   }
672
673   /* connect again */
674   h->client = GNUNET_CLIENT_connect ("cadet", h->cfg);
675   if (h->client == NULL)
676   {
677     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
678                                                       &reconnect_cbk, h);
679     h->reconnect_time =
680         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
681                                   GNUNET_TIME_relative_multiply
682                                   (h->reconnect_time, 2));
683     LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n",
684          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
685                                                  GNUNET_NO));
686     GNUNET_break (0);
687     return GNUNET_NO;
688   }
689   else
690   {
691     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
692   }
693   send_connect (h);
694   return GNUNET_YES;
695 }
696
697 /**
698  * Reconnect callback: tries to reconnect again after a failer previous
699  * reconnecttion
700  *
701  * @param cls closure (cadet handle)
702  */
703 static void
704 reconnect_cbk (void *cls)
705 {
706   struct GNUNET_CADET_Handle *h = cls;
707
708   h->reconnect_task = NULL;
709   do_reconnect (h);
710 }
711
712
713 /**
714  * Reconnect to the service, retransmit all infomation to try to restore the
715  * original state.
716  *
717  * @param h handle to the cadet
718  *
719  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
720  */
721 static void
722 reconnect (struct GNUNET_CADET_Handle *h)
723 {
724   struct GNUNET_CADET_Channel *ch;
725
726   LOG (GNUNET_ERROR_TYPE_DEBUG,
727        "Requested RECONNECT, destroying all channels\n");
728   h->in_receive = GNUNET_NO;
729   for (ch = h->channels_head; NULL != ch; ch = h->channels_head)
730     destroy_channel (ch, GNUNET_YES);
731   if (NULL == h->reconnect_task)
732     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
733                                                       &reconnect_cbk, h);
734 }
735
736
737 /******************************************************************************/
738 /***********************      RECEIVE HANDLERS     ****************************/
739 /******************************************************************************/
740
741 /**
742  * Process the new channel notification and add it to the channels in the handle
743  *
744  * @param h     The cadet handle
745  * @param msg   A message with the details of the new incoming channel
746  */
747 static void
748 process_channel_created (struct GNUNET_CADET_Handle *h,
749                          const struct GNUNET_CADET_ChannelMessage *msg)
750 {
751   struct GNUNET_CADET_Channel *ch;
752   CADET_ChannelNumber chid;
753   uint32_t port;
754
755   chid = ntohl (msg->channel_id);
756   port = ntohl (msg->port);
757   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming channel %X:%u\n", chid, port);
758   if (chid < GNUNET_CADET_LOCAL_CHANNEL_ID_SERV)
759   {
760     GNUNET_break (0);
761     return;
762   }
763   if (NULL != h->new_channel)
764   {
765     void *ctx;
766
767     ch = create_channel (h, chid);
768     ch->allow_send = GNUNET_NO;
769     ch->peer = GNUNET_PEER_intern (&msg->peer);
770     ch->cadet = h;
771     ch->chid = chid;
772     ch->port = port;
773     ch->options = ntohl (msg->opt);
774
775     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created channel %p\n", ch);
776     ctx = h->new_channel (h->cls, ch, &msg->peer, ch->port, ch->options);
777     if (NULL != ctx)
778       ch->ctx = ctx;
779     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
780   }
781   else
782   {
783     struct GNUNET_CADET_ChannelMessage d_msg;
784
785     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
786
787     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
788     d_msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelMessage));
789     d_msg.channel_id = msg->channel_id;
790     memset (&d_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
791     d_msg.port = 0;
792     d_msg.opt = 0;
793
794     send_packet (h, &d_msg.header, NULL);
795   }
796   return;
797 }
798
799
800 /**
801  * Process the channel destroy notification and free associated resources
802  *
803  * @param h     The cadet handle
804  * @param msg   A message with the details of the channel being destroyed
805  */
806 static void
807 process_channel_destroy (struct GNUNET_CADET_Handle *h,
808                          const struct GNUNET_CADET_ChannelMessage *msg)
809 {
810   struct GNUNET_CADET_Channel *ch;
811   CADET_ChannelNumber chid;
812
813   chid = ntohl (msg->channel_id);
814   LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %X Destroy from service\n", chid);
815   ch = retrieve_channel (h, chid);
816
817   if (NULL == ch)
818   {
819     LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X unknown\n", chid);
820     return;
821   }
822   destroy_channel (ch, GNUNET_YES);
823 }
824
825
826 /**
827  * Process the incoming data packets, call appropriate handlers.
828  *
829  * @param h         The cadet handle
830  * @param message   A message encapsulating the data
831  */
832 static void
833 process_incoming_data (struct GNUNET_CADET_Handle *h,
834                        const struct GNUNET_MessageHeader *message)
835 {
836   const struct GNUNET_MessageHeader *payload;
837   const struct GNUNET_CADET_MessageHandler *handler;
838   struct GNUNET_CADET_LocalData *dmsg;
839   struct GNUNET_CADET_Channel *ch;
840   size_t size;
841   unsigned int i;
842   uint16_t type;
843
844   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
845   dmsg = (struct GNUNET_CADET_LocalData *) message;
846   ch = retrieve_channel (h, ntohl (dmsg->id));
847   if (NULL == ch)
848   {
849     GNUNET_break (0);
850     return;
851   }
852
853   payload = (struct GNUNET_MessageHeader *) &dmsg[1];
854   LOG (GNUNET_ERROR_TYPE_DEBUG, "  %s data on channel %s [%X]\n",
855        GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV),
856        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id));
857
858   size = ntohs (message->size);
859   LOG (GNUNET_ERROR_TYPE_DEBUG, "  %u bytes\n", size);
860
861   type = ntohs (payload->type);
862   size = ntohs (payload->size);
863   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %s\n", GC_m2s (type));
864   for (i = 0; i < h->n_handlers; i++)
865   {
866     handler = &h->message_handlers[i];
867     LOG (GNUNET_ERROR_TYPE_DEBUG, "    checking handler for type %u\n",
868          handler->type);
869     if (handler->type == type)
870     {
871       if (GNUNET_OK !=
872           handler->callback (h->cls, ch, &ch->ctx, payload))
873       {
874         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
875         GNUNET_CADET_channel_destroy (ch);
876         return;
877       }
878       else
879       {
880         LOG (GNUNET_ERROR_TYPE_DEBUG,
881              "callback completed successfully\n");
882         return;
883       }
884     }
885   }
886 }
887
888
889 /**
890  * Process a local ACK message, enabling the client to send
891  * more data to the service.
892  *
893  * @param h Cadet handle.
894  * @param message Message itself.
895  */
896 static void
897 process_ack (struct GNUNET_CADET_Handle *h,
898              const struct GNUNET_MessageHeader *message)
899 {
900   struct GNUNET_CADET_LocalAck *msg;
901   struct GNUNET_CADET_Channel *ch;
902   CADET_ChannelNumber chid;
903
904   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
905   msg = (struct GNUNET_CADET_LocalAck *) message;
906   chid = ntohl (msg->channel_id);
907   ch = retrieve_channel (h, chid);
908   if (NULL == ch)
909   {
910     LOG (GNUNET_ERROR_TYPE_DEBUG, "ACK on unknown channel %X\n", chid);
911     return;
912   }
913   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on channel %X!\n", ch->chid);
914   ch->allow_send = GNUNET_YES;
915   if (NULL == h->th && 0 < ch->packet_size)
916   {
917     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n");
918     h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size,
919                                                  GNUNET_TIME_UNIT_FOREVER_REL,
920                                                  GNUNET_YES, &send_callback, h);
921   }
922 }
923
924
925 /*
926  * Process a local reply about info on all channels, pass info to the user.
927  *
928  * @param h Cadet handle.
929  * @param message Message itself.
930  */
931 // static void
932 // process_get_channels (struct GNUNET_CADET_Handle *h,
933 //                      const struct GNUNET_MessageHeader *message)
934 // {
935 //   struct GNUNET_CADET_LocalInfo *msg;
936 //
937 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
938 //
939 //   if (NULL == h->channels_cb)
940 //   {
941 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
942 //     return;
943 //   }
944 //
945 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
946 //   if (ntohs (message->size) !=
947 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
948 //        sizeof (struct GNUNET_PeerIdentity)))
949 //   {
950 //     GNUNET_break_op (0);
951 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
952 //                 "Get channels message: size %hu - expected %u\n",
953 //                 ntohs (message->size),
954 //                 sizeof (struct GNUNET_CADET_LocalInfo));
955 //     return;
956 //   }
957 //   h->channels_cb (h->channels_cls,
958 //                   ntohl (msg->channel_id),
959 //                   &msg->owner,
960 //                   &msg->destination);
961 // }
962
963
964
965 /*
966  * Process a local monitor_channel reply, pass info to the user.
967  *
968  * @param h Cadet handle.
969  * @param message Message itself.
970  */
971 // static void
972 // process_show_channel (struct GNUNET_CADET_Handle *h,
973 //                      const struct GNUNET_MessageHeader *message)
974 // {
975 //   struct GNUNET_CADET_LocalInfo *msg;
976 //   size_t esize;
977 //
978 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
979 //
980 //   if (NULL == h->channel_cb)
981 //   {
982 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
983 //     return;
984 //   }
985 //
986 //   /* Verify message sanity */
987 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
988 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
989 //   if (ntohs (message->size) != esize)
990 //   {
991 //     GNUNET_break_op (0);
992 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
993 //                 "Show channel message: size %hu - expected %u\n",
994 //                 ntohs (message->size),
995 //                 esize);
996 //
997 //     h->channel_cb (h->channel_cls, NULL, NULL);
998 //     h->channel_cb = NULL;
999 //     h->channel_cls = NULL;
1000 //
1001 //     return;
1002 //   }
1003 //
1004 //   h->channel_cb (h->channel_cls,
1005 //                  &msg->destination,
1006 //                  &msg->owner);
1007 // }
1008
1009
1010
1011 /**
1012  * Process a local reply about info on all tunnels, pass info to the user.
1013  *
1014  * @param h Cadet handle.
1015  * @param message Message itself.
1016  */
1017 static void
1018 process_get_peers (struct GNUNET_CADET_Handle *h,
1019                      const struct GNUNET_MessageHeader *message)
1020 {
1021   struct GNUNET_CADET_LocalInfoPeer *msg;
1022   uint16_t size;
1023
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Peer messasge received\n");
1025
1026   if (NULL == h->info_cb.peers_cb)
1027   {
1028     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  ignored\n");
1029     return;
1030   }
1031
1032   size = ntohs (message->size);
1033   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1034   {
1035     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1036     h->info_cb.peers_cb = NULL;
1037     h->info_cls = NULL;
1038     return;
1039   }
1040
1041   msg = (struct GNUNET_CADET_LocalInfoPeer *) message;
1042   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1043                        (int) ntohs (msg->tunnel),
1044                        (unsigned int ) ntohs (msg->paths),
1045                        0);
1046 }
1047
1048
1049 /**
1050  * Process a local peer info reply, pass info to the user.
1051  *
1052  * @param h Cadet handle.
1053  * @param message Message itself.
1054  */
1055 static void
1056 process_get_peer (struct GNUNET_CADET_Handle *h,
1057                   const struct GNUNET_MessageHeader *message)
1058 {
1059   struct GNUNET_CADET_LocalInfoPeer *msg;
1060   struct GNUNET_PeerIdentity *id;
1061   unsigned int epaths;
1062   unsigned int paths;
1063   unsigned int path_length;
1064   unsigned int i;
1065   int neighbor;
1066   size_t esize;
1067   size_t msize;
1068
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Info Peer messasge received\n");
1070   if (NULL == h->info_cb.peer_cb)
1071   {
1072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  ignored\n");
1073     return;
1074   }
1075
1076   /* Verify message sanity */
1077   msg = (struct GNUNET_CADET_LocalInfoPeer *) message;
1078   esize = ntohs (message->size);
1079   msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1080   if (esize < msize)
1081   {
1082     GNUNET_break_op (0);
1083     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1084     goto clean_cls;
1085   }
1086   epaths = (unsigned int) ntohs (msg->paths);
1087   paths = 0;
1088   path_length = 0;
1089   neighbor = GNUNET_NO;
1090   id = (struct GNUNET_PeerIdentity *) &msg[1];
1091   for (i = 0; msize < esize; i++)
1092   {
1093     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&id[i]));
1094     msize += sizeof (struct GNUNET_PeerIdentity);
1095     path_length++;
1096     if (0 == memcmp (&id[i], &msg->destination,
1097                      sizeof (struct GNUNET_PeerIdentity)))
1098     {
1099       if (1 == path_length)
1100         neighbor = GNUNET_YES;
1101       path_length = 0;
1102       paths++;
1103     }
1104   }
1105   if (msize != esize)
1106   {
1107     GNUNET_break_op (0);
1108     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "m:%u, e: %u\n", msize, esize);
1109     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1110     goto clean_cls;
1111   }
1112   if (paths != epaths)
1113   {
1114     GNUNET_break_op (0);
1115     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1116     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1117     goto clean_cls;
1118   }
1119
1120   /* Call Callback with tunnel info. */
1121   id = (struct GNUNET_PeerIdentity *) &msg[1];
1122   h->info_cb.peer_cb (h->info_cls,
1123                       &msg->destination,
1124                       (int) ntohs (msg->tunnel),
1125                       neighbor,
1126                       paths,
1127                       id);
1128
1129   clean_cls:
1130   h->info_cb.peer_cb = NULL;
1131   h->info_cls = NULL;
1132 }
1133
1134
1135 /**
1136  * Process a local reply about info on all tunnels, pass info to the user.
1137  *
1138  * @param h Cadet handle.
1139  * @param message Message itself.
1140  */
1141 static void
1142 process_get_tunnels (struct GNUNET_CADET_Handle *h,
1143                      const struct GNUNET_MessageHeader *message)
1144 {
1145   struct GNUNET_CADET_LocalInfoTunnel *msg;
1146   uint16_t size;
1147
1148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Tunnels messasge received\n");
1149
1150   if (NULL == h->info_cb.tunnels_cb)
1151   {
1152     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  ignored\n");
1153     return;
1154   }
1155
1156   size = ntohs (message->size);
1157   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1158   {
1159     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1160     h->info_cb.tunnels_cb = NULL;
1161     h->info_cls = NULL;
1162     return;
1163   }
1164
1165   msg = (struct GNUNET_CADET_LocalInfoTunnel *) message;
1166   h->info_cb.tunnels_cb (h->info_cls, &msg->destination,
1167                          ntohl (msg->channels), ntohl (msg->connections),
1168                          ntohs (msg->estate), ntohs (msg->cstate));
1169
1170 }
1171
1172
1173 /**
1174  * Process a local tunnel info reply, pass info to the user.
1175  *
1176  * @param h Cadet handle.
1177  * @param message Message itself.
1178  */
1179 static void
1180 process_get_tunnel (struct GNUNET_CADET_Handle *h,
1181                     const struct GNUNET_MessageHeader *message)
1182 {
1183   struct GNUNET_CADET_LocalInfoTunnel *msg;
1184   size_t esize;
1185   size_t msize;
1186   unsigned int ch_n;
1187   unsigned int c_n;
1188   struct GNUNET_CADET_Hash *conns;
1189   CADET_ChannelNumber *chns;
1190
1191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Tunnel messasge received\n");
1192   if (NULL == h->info_cb.tunnel_cb)
1193   {
1194     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  ignored\n");
1195     return;
1196   }
1197
1198   /* Verify message sanity */
1199   msg = (struct GNUNET_CADET_LocalInfoTunnel *) message;
1200   msize = ntohs (message->size);
1201   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1202   if (esize > msize)
1203   {
1204     GNUNET_break_op (0);
1205     h->info_cb.tunnel_cb (h->info_cls, NULL, 0, 0, NULL, NULL, 0, 0);
1206     goto clean_cls;
1207   }
1208   ch_n = ntohl (msg->channels);
1209   c_n = ntohl (msg->connections);
1210   esize += ch_n * sizeof (CADET_ChannelNumber);
1211   esize += c_n * sizeof (struct GNUNET_CADET_Hash);
1212   if (msize != esize)
1213   {
1214     GNUNET_break_op (0);
1215     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "m:%u, e: %u (%u ch, %u conn)\n",
1216                 msize, esize, ch_n, c_n);
1217     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u (%u ch, %u conn)\n",
1218                 sizeof (struct GNUNET_CADET_LocalInfoTunnel),
1219                 sizeof (CADET_ChannelNumber), sizeof (struct GNUNET_HashCode));
1220     h->info_cb.tunnel_cb (h->info_cls, NULL, 0, 0, NULL, NULL, 0, 0);
1221     goto clean_cls;
1222   }
1223
1224   /* Call Callback with tunnel info. */
1225   conns = (struct GNUNET_CADET_Hash *) &msg[1];
1226   chns = (CADET_ChannelNumber *) &conns[c_n];
1227   h->info_cb.tunnel_cb (h->info_cls, &msg->destination,
1228                 ch_n, c_n, chns, conns,
1229                 ntohs (msg->estate), ntohs (msg->cstate));
1230
1231 clean_cls:
1232   h->info_cb.tunnel_cb = NULL;
1233   h->info_cls = NULL;
1234 }
1235
1236
1237 /**
1238  * Function to process all messages received from the service
1239  *
1240  * @param cls closure
1241  * @param msg message received, NULL on timeout or fatal error
1242  */
1243 static void
1244 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1245 {
1246   struct GNUNET_CADET_Handle *h = cls;
1247   uint16_t type;
1248
1249   if (msg == NULL)
1250   {
1251     LOG (GNUNET_ERROR_TYPE_DEBUG,
1252          "Cadet service disconnected, reconnecting\n", h);
1253     reconnect (h);
1254     return;
1255   }
1256   type = ntohs (msg->type);
1257   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1259        GC_m2s (type));
1260   switch (type)
1261   {
1262     /* Notify of a new incoming channel */
1263   case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1264     process_channel_created (h, (struct GNUNET_CADET_ChannelMessage *) msg);
1265     break;
1266     /* Notify of a channel disconnection */
1267   case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/
1268   case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
1269     process_channel_destroy (h, (struct GNUNET_CADET_ChannelMessage *) msg);
1270     break;
1271   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA:
1272     process_incoming_data (h, msg);
1273     break;
1274   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK:
1275     process_ack (h, msg);
1276     break;
1277 //   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1278 //     process_get_channels (h, msg);
1279 //     break;
1280 //   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1281 //     process_show_channel (h, msg);
1282 //     break;
1283   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1284     process_get_peers (h, msg);
1285     break;
1286   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1287     process_get_peer (h, msg);
1288     break;
1289   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1290     process_get_tunnels (h, msg);
1291     break;
1292   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1293     process_get_tunnel (h, msg);
1294     break;
1295 //   case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1296 //     process_show_channel (h, msg);
1297 //     break;
1298   default:
1299     /* We shouldn't get any other packages, log and ignore */
1300     LOG (GNUNET_ERROR_TYPE_WARNING,
1301          "unsolicited message form service (type %s)\n",
1302          GC_m2s (ntohs (msg->type)));
1303   }
1304   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1305   if (GNUNET_YES == h->in_receive)
1306   {
1307     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1308                            GNUNET_TIME_UNIT_FOREVER_REL);
1309   }
1310   else
1311   {
1312     LOG (GNUNET_ERROR_TYPE_DEBUG,
1313          "in receive off, not calling CLIENT_receive\n");
1314   }
1315 }
1316
1317
1318 /******************************************************************************/
1319 /************************       SEND FUNCTIONS     ****************************/
1320 /******************************************************************************/
1321
1322 /**
1323  * Function called to send a message to the service.
1324  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1325  * the meantime.
1326  *
1327  * @param cls closure, the cadet handle
1328  * @param size number of bytes available in buf
1329  * @param buf where the callee should write the connect message
1330  * @return number of bytes written to buf
1331  */
1332 static size_t
1333 send_callback (void *cls, size_t size, void *buf)
1334 {
1335   struct GNUNET_CADET_Handle *h = cls;
1336   struct GNUNET_CADET_TransmitHandle *th;
1337   struct GNUNET_CADET_TransmitHandle *next;
1338   struct GNUNET_CADET_Channel *ch;
1339   char *cbuf = buf;
1340   size_t tsize;
1341   size_t psize;
1342   size_t nsize;
1343
1344   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1345   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback, buffer %u\n", size);
1346   if ((0 == size) || (NULL == buf))
1347   {
1348     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1349     reconnect (h);
1350     h->th = NULL;
1351     return 0;
1352   }
1353   tsize = 0;
1354   next = h->th_head;
1355   nsize = message_ready_size (h);
1356   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1357   {
1358     ch = th->channel;
1359     if (GNUNET_YES == th_is_payload (th))
1360     {
1361       struct GNUNET_CADET_LocalData *dmsg;
1362       struct GNUNET_MessageHeader *mh;
1363
1364       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload, %u bytes on %X (%p)\n",
1365            th->size, ch->chid, ch);
1366       if (GNUNET_NO == ch->allow_send)
1367       {
1368         /* This channel is not ready to transmit yet, Try the next message */
1369         next = th->next;
1370         continue;
1371       }
1372       ch->packet_size = 0;
1373       GNUNET_assert (size >= th->size);
1374       dmsg = (struct GNUNET_CADET_LocalData *) cbuf;
1375       mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1376       psize = th->notify (th->notify_cls, size - DATA_OVERHEAD, mh);
1377
1378       if (psize > 0)
1379       {
1380         GNUNET_assert (sizeof (struct GNUNET_MessageHeader) <= psize);
1381         psize += DATA_OVERHEAD;
1382         GNUNET_assert (size >= psize);
1383         dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1384         dmsg->header.size = htons (psize);
1385         dmsg->id = htonl (ch->chid);
1386         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  sending, type %s\n",
1387              GC_m2s (ntohs (mh->type)));
1388         ch->allow_send = GNUNET_NO;
1389       }
1390       else
1391       {
1392         LOG (GNUNET_ERROR_TYPE_DEBUG,
1393              "#  callback returned size 0, "
1394              "application canceled transmission\n");
1395       }
1396     }
1397     else
1398     {
1399       const struct GNUNET_MessageHeader *mh;
1400
1401       mh = (const struct GNUNET_MessageHeader *) &th[1];
1402       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  cadet internal traffic, type %s\n",
1403            GC_m2s (ntohs (mh->type)));
1404       memcpy (cbuf, &th[1], th->size);
1405       psize = th->size;
1406     }
1407     GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= psize);
1408     if (th->timeout_task != NULL)
1409       GNUNET_SCHEDULER_cancel (th->timeout_task);
1410     next = th->next;
1411     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1412     GNUNET_free (th);
1413     nsize = message_ready_size (h);
1414     cbuf += psize;
1415     size -= psize;
1416     tsize += psize;
1417   }
1418   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1419   h->th = NULL;
1420   size = message_ready_size (h);
1421   if (0 != size)
1422   {
1423     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1424     h->th =
1425         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1426                                              GNUNET_TIME_UNIT_FOREVER_REL,
1427                                              GNUNET_YES, &send_callback, h);
1428   }
1429   else
1430   {
1431     if (NULL != h->th_head)
1432       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing ready to transmit\n");
1433     else
1434       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1435   }
1436   if (GNUNET_NO == h->in_receive)
1437   {
1438     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1439     h->in_receive = GNUNET_YES;
1440     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1441                            GNUNET_TIME_UNIT_FOREVER_REL);
1442   }
1443   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback() END\n");
1444   return tsize;
1445 }
1446
1447
1448 /**
1449  * Auxiliary function to send an already constructed packet to the service.
1450  * Takes care of creating a new queue element, copying the message and
1451  * calling the tmt_rdy function if necessary.
1452  *
1453  * @param h cadet handle
1454  * @param msg message to transmit
1455  * @param channel channel this send is related to (NULL if N/A)
1456  */
1457 static void
1458 send_packet (struct GNUNET_CADET_Handle *h,
1459              const struct GNUNET_MessageHeader *msg,
1460              struct GNUNET_CADET_Channel *channel)
1461 {
1462   struct GNUNET_CADET_TransmitHandle *th;
1463   size_t msize;
1464
1465   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1466        GC_m2s(ntohs(msg->type)));
1467   msize = ntohs (msg->size);
1468   th = GNUNET_malloc (sizeof (struct GNUNET_CADET_TransmitHandle) + msize);
1469   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1470   th->size = msize;
1471   th->channel = channel;
1472   memcpy (&th[1], msg, msize);
1473   add_to_queue (h, th);
1474   if (NULL != h->th)
1475     return;
1476   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1477   h->th =
1478       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1479                                            GNUNET_TIME_UNIT_FOREVER_REL,
1480                                            GNUNET_YES, &send_callback, h);
1481 }
1482
1483
1484 /******************************************************************************/
1485 /**********************      API CALL DEFINITIONS     *************************/
1486 /******************************************************************************/
1487
1488 struct GNUNET_CADET_Handle *
1489 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1490                      GNUNET_CADET_InboundChannelNotificationHandler new_channel,
1491                      GNUNET_CADET_ChannelEndHandler cleaner,
1492                      const struct GNUNET_CADET_MessageHandler *handlers,
1493                      const uint32_t *ports)
1494 {
1495   struct GNUNET_CADET_Handle *h;
1496
1497   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n");
1498   h = GNUNET_new (struct GNUNET_CADET_Handle);
1499   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1500   h->cfg = cfg;
1501   h->new_channel = new_channel;
1502   h->cleaner = cleaner;
1503   h->client = GNUNET_CLIENT_connect ("cadet", cfg);
1504   if (h->client == NULL)
1505   {
1506     GNUNET_break (0);
1507     GNUNET_free (h);
1508     return NULL;
1509   }
1510   h->cls = cls;
1511   h->message_handlers = handlers;
1512   h->ports = ports;
1513   h->next_chid = GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
1514   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1515   h->reconnect_task = NULL;
1516
1517   if (NULL != ports && ports[0] != 0 && NULL == new_channel)
1518   {
1519     GNUNET_break (0);
1520     LOG (GNUNET_ERROR_TYPE_DEBUG,
1521          "no new channel handler given, ports parameter is useless!!\n");
1522   }
1523   if ((NULL == ports || ports[0] == 0) && NULL != new_channel)
1524   {
1525     GNUNET_break (0);
1526     LOG (GNUNET_ERROR_TYPE_DEBUG,
1527          "no ports given, new channel handler will never be called!!\n");
1528   }
1529   /* count handlers */
1530   for (h->n_handlers = 0;
1531        handlers && handlers[h->n_handlers].type;
1532        h->n_handlers++) ;
1533   for (h->n_ports = 0;
1534        ports && ports[h->n_ports];
1535        h->n_ports++) ;
1536   send_connect (h);
1537   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect() END\n");
1538   return h;
1539 }
1540
1541
1542 void
1543 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1544 {
1545   struct GNUNET_CADET_Channel *ch;
1546   struct GNUNET_CADET_Channel *aux;
1547   struct GNUNET_CADET_TransmitHandle *th;
1548
1549   LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET DISCONNECT\n");
1550
1551   ch = handle->channels_head;
1552   while (NULL != ch)
1553   {
1554     aux = ch->next;
1555     if (ch->chid < GNUNET_CADET_LOCAL_CHANNEL_ID_SERV)
1556     {
1557       GNUNET_break (0);
1558       LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X not destroyed\n", ch->chid);
1559     }
1560     destroy_channel (ch, GNUNET_YES);
1561     ch = aux;
1562   }
1563   while ( (th = handle->th_head) != NULL)
1564   {
1565     struct GNUNET_MessageHeader *msg;
1566
1567     /* Make sure it is an allowed packet (everything else should have been
1568      * already canceled).
1569      */
1570     GNUNET_break (GNUNET_NO == th_is_payload (th));
1571     msg = (struct GNUNET_MessageHeader *) &th[1];
1572     switch (ntohs(msg->type))
1573     {
1574       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_CONNECT:
1575       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1576       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1577       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1578       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1579       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1580       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1581       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1582       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1583         break;
1584       default:
1585         GNUNET_break (0);
1586         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1587              GC_m2s (ntohs(msg->type)));
1588     }
1589
1590     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1591     GNUNET_free (th);
1592   }
1593
1594   if (NULL != handle->th)
1595   {
1596     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1597     handle->th = NULL;
1598   }
1599   if (NULL != handle->client)
1600   {
1601     GNUNET_CLIENT_disconnect (handle->client);
1602     handle->client = NULL;
1603   }
1604   if (NULL != handle->reconnect_task)
1605   {
1606     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1607     handle->reconnect_task = NULL;
1608   }
1609   GNUNET_free (handle);
1610 }
1611
1612
1613 /**
1614  * Create a new channel towards a remote peer.
1615  *
1616  * If the destination port is not open by any peer or the destination peer
1617  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1618  * for this channel.
1619  *
1620  * @param h cadet handle
1621  * @param channel_ctx client's channel context to associate with the channel
1622  * @param peer peer identity the channel should go to
1623  * @param port Port number.
1624  * @param options CadetOption flag field, with all desired option bits set to 1.
1625  *
1626  * @return handle to the channel
1627  */
1628 struct GNUNET_CADET_Channel *
1629 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1630                             void *channel_ctx,
1631                             const struct GNUNET_PeerIdentity *peer,
1632                             uint32_t port,
1633                             enum GNUNET_CADET_ChannelOption options)
1634 {
1635   struct GNUNET_CADET_Channel *ch;
1636   struct GNUNET_CADET_ChannelMessage msg;
1637
1638   LOG (GNUNET_ERROR_TYPE_DEBUG,
1639        "Creating new channel to %s:%u\n",
1640        GNUNET_i2s (peer), port);
1641   ch = create_channel (h, 0);
1642   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", ch);
1643   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", ch->chid);
1644   ch->ctx = channel_ctx;
1645   ch->peer = GNUNET_PEER_intern (peer);
1646   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
1647   msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelMessage));
1648   msg.channel_id = htonl (ch->chid);
1649   msg.port = htonl (port);
1650   msg.peer = *peer;
1651   msg.opt = htonl (options);
1652   ch->allow_send = GNUNET_NO;
1653   send_packet (h, &msg.header, ch);
1654   return ch;
1655 }
1656
1657
1658 void
1659 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1660 {
1661   struct GNUNET_CADET_Handle *h;
1662   struct GNUNET_CADET_ChannelMessage msg;
1663   struct GNUNET_CADET_TransmitHandle *th;
1664
1665   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
1666   h = channel->cadet;
1667
1668   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
1669   msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelMessage));
1670   msg.channel_id = htonl (channel->chid);
1671   memset (&msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
1672   msg.port = 0;
1673   msg.opt = 0;
1674   th = h->th_head;
1675   while (th != NULL)
1676   {
1677     struct GNUNET_CADET_TransmitHandle *aux;
1678     if (th->channel == channel)
1679     {
1680       aux = th->next;
1681       if (GNUNET_YES == th_is_payload (th))
1682       {
1683         /* applications should cancel before destroying channel */
1684         GNUNET_break (0);
1685         LOG (GNUNET_ERROR_TYPE_WARNING,
1686              "Channel destroyed without cancelling transmission requests\n");
1687         th->notify (th->notify_cls, 0, NULL);
1688       }
1689       GNUNET_CADET_notify_transmit_ready_cancel (th);
1690       th = aux;
1691     }
1692     else
1693       th = th->next;
1694   }
1695
1696   destroy_channel (channel, GNUNET_YES);
1697   send_packet (h, &msg.header, NULL);
1698 }
1699
1700
1701 /**
1702  * Get information about a channel.
1703  *
1704  * @param channel Channel handle.
1705  * @param option Query (GNUNET_CADET_OPTION_*).
1706  * @param ... dependant on option, currently not used
1707  *
1708  * @return Union with an answer to the query.
1709  */
1710 const union GNUNET_CADET_ChannelInfo *
1711 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1712                               enum GNUNET_CADET_ChannelOption option, ...)
1713 {
1714   static int bool_flag;
1715   const union GNUNET_CADET_ChannelInfo *ret;
1716
1717   switch (option)
1718   {
1719     case GNUNET_CADET_OPTION_NOBUFFER:
1720     case GNUNET_CADET_OPTION_RELIABLE:
1721     case GNUNET_CADET_OPTION_OOORDER:
1722       if (0 != (option & channel->options))
1723         bool_flag = GNUNET_YES;
1724       else
1725         bool_flag = GNUNET_NO;
1726       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1727       break;
1728     case GNUNET_CADET_OPTION_PEER:
1729       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1730       break;
1731     default:
1732       GNUNET_break (0);
1733       return NULL;
1734   }
1735
1736   return ret;
1737 }
1738
1739 struct GNUNET_CADET_TransmitHandle *
1740 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int cork,
1741                                     struct GNUNET_TIME_Relative maxdelay,
1742                                     size_t notify_size,
1743                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
1744                                     void *notify_cls)
1745 {
1746   struct GNUNET_CADET_TransmitHandle *th;
1747
1748   GNUNET_assert (NULL != channel);
1749   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
1750   LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY\n");
1751   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on channel %X\n", channel->chid);
1752   LOG (GNUNET_ERROR_TYPE_DEBUG, "    allow_send %d\n", channel->allow_send);
1753   if (channel->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV)
1754     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1755   else
1756     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1757   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1758   GNUNET_assert (NULL != notify);
1759   GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed
1760   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
1761   th->channel = channel;
1762   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1763   th->size = notify_size + DATA_OVERHEAD;
1764   channel->packet_size = th->size;
1765   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1766   th->notify = notify;
1767   th->notify_cls = notify_cls;
1768   add_to_queue (channel->cadet, th);
1769   if (NULL != channel->cadet->th)
1770     return th;
1771   if (GNUNET_NO == channel->allow_send)
1772     return th;
1773   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1774   channel->cadet->th =
1775       GNUNET_CLIENT_notify_transmit_ready (channel->cadet->client, th->size,
1776                                            GNUNET_TIME_UNIT_FOREVER_REL,
1777                                            GNUNET_YES, &send_callback,
1778                                            channel->cadet);
1779   LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n");
1780   return th;
1781 }
1782
1783
1784 void
1785 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
1786 {
1787   struct GNUNET_CADET_Handle *cadet;
1788
1789   LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL\n");
1790   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on channel %X (%p)\n",
1791        th->channel->chid, th->channel);
1792   LOG (GNUNET_ERROR_TYPE_DEBUG, "    size %u bytes\n", th->size);
1793   th->channel->packet_size = 0;
1794   cadet = th->channel->cadet;
1795   if (th->timeout_task != NULL)
1796     GNUNET_SCHEDULER_cancel (th->timeout_task);
1797   GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th);
1798   GNUNET_free (th);
1799   if ((0 == message_ready_size (cadet)) && (NULL != cadet->th))
1800   {
1801     /* queue empty, no point in asking for transmission */
1802     GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th);
1803     cadet->th = NULL;
1804   }
1805   LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL END\n");
1806 }
1807
1808
1809 void
1810 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
1811 {
1812   send_ack (channel);
1813 }
1814
1815
1816 static void
1817 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
1818 {
1819   struct GNUNET_MessageHeader msg;
1820
1821   msg.size = htons (sizeof (msg));
1822   msg.type = htons (type);
1823   send_packet (h, &msg, NULL);
1824 }
1825
1826
1827 /**
1828  * Request a debug dump on the service's STDERR.
1829  *
1830  * WARNING: unstable API, likely to change in the future!
1831  *
1832  * @param h cadet handle
1833  */
1834 void
1835 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
1836 {
1837   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
1838   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
1839 }
1840
1841
1842 /**
1843  * Request information about peers known to the running cadet service.
1844  * The callback will be called for every peer known to the service.
1845  * Only one info request (of any kind) can be active at once.
1846  *
1847  *
1848  * WARNING: unstable API, likely to change in the future!
1849  *
1850  * @param h Handle to the cadet peer.
1851  * @param callback Function to call with the requested data.
1852  * @param callback_cls Closure for @c callback.
1853  *
1854  * @return #GNUNET_OK / #GNUNET_SYSERR
1855  */
1856 int
1857 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
1858                        GNUNET_CADET_PeersCB callback,
1859                        void *callback_cls)
1860 {
1861   if (NULL != h->info_cb.peers_cb)
1862   {
1863     GNUNET_break (0);
1864     return GNUNET_SYSERR;
1865   }
1866   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
1867   h->info_cb.peers_cb = callback;
1868   h->info_cls = callback_cls;
1869   return GNUNET_OK;
1870 }
1871
1872
1873 /**
1874  * Cancel a peer info request. The callback will not be called (anymore).
1875  *
1876  * WARNING: unstable API, likely to change in the future!
1877  *
1878  * @param h Cadet handle.
1879  *
1880  * @return Closure given to GNUNET_CADET_get_peers.
1881  */
1882 void *
1883 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
1884 {
1885   void *cls;
1886
1887   cls = h->info_cls;
1888   h->info_cb.peers_cb = NULL;
1889   h->info_cls = NULL;
1890   return cls;
1891 }
1892
1893
1894 /**
1895  * Request information about a peer known to the running cadet peer.
1896  * The callback will be called for the tunnel once.
1897  * Only one info request (of any kind) can be active at once.
1898  *
1899  * WARNING: unstable API, likely to change in the future!
1900  *
1901  * @param h Handle to the cadet peer.
1902  * @param id Peer whose tunnel to examine.
1903  * @param callback Function to call with the requested data.
1904  * @param callback_cls Closure for @c callback.
1905  *
1906  * @return #GNUNET_OK / #GNUNET_SYSERR
1907  */
1908 int
1909 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
1910                       const struct GNUNET_PeerIdentity *id,
1911                       GNUNET_CADET_PeerCB callback,
1912                       void *callback_cls)
1913 {
1914   struct GNUNET_CADET_LocalInfo msg;
1915
1916   if (NULL != h->info_cb.peer_cb)
1917   {
1918     GNUNET_break (0);
1919     return GNUNET_SYSERR;
1920   }
1921
1922   memset (&msg, 0, sizeof (msg));
1923   msg.header.size = htons (sizeof (msg));
1924   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
1925   msg.peer = *id;
1926   send_packet (h, &msg.header, NULL);
1927   h->info_cb.peer_cb = callback;
1928   h->info_cls = callback_cls;
1929   return GNUNET_OK;
1930 }
1931
1932
1933 /**
1934  * Request information about tunnels of the running cadet peer.
1935  * The callback will be called for every tunnel of the service.
1936  * Only one info request (of any kind) can be active at once.
1937  *
1938  * WARNING: unstable API, likely to change in the future!
1939  *
1940  * @param h Handle to the cadet peer.
1941  * @param callback Function to call with the requested data.
1942  * @param callback_cls Closure for @c callback.
1943  *
1944  * @return #GNUNET_OK / #GNUNET_SYSERR
1945  */
1946 int
1947 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
1948                          GNUNET_CADET_TunnelsCB callback,
1949                          void *callback_cls)
1950 {
1951   if (NULL != h->info_cb.tunnels_cb)
1952   {
1953     GNUNET_break (0);
1954     return GNUNET_SYSERR;
1955   }
1956   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
1957   h->info_cb.tunnels_cb = callback;
1958   h->info_cls = callback_cls;
1959   return GNUNET_OK;
1960 }
1961
1962
1963 /**
1964  * Cancel a monitor request. The monitor callback will not be called.
1965  *
1966  * @param h Cadet handle.
1967  *
1968  * @return Closure given to GNUNET_CADET_get_tunnels.
1969  */
1970 void *
1971 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
1972 {
1973   void *cls;
1974
1975   h->info_cb.tunnels_cb = NULL;
1976   cls = h->info_cls;
1977   h->info_cls = NULL;
1978
1979   return cls;
1980 }
1981
1982
1983
1984 /**
1985  * Request information about a tunnel of the running cadet peer.
1986  * The callback will be called for the tunnel once.
1987  * Only one info request (of any kind) can be active at once.
1988  *
1989  * WARNING: unstable API, likely to change in the future!
1990  *
1991  * @param h Handle to the cadet peer.
1992  * @param id Peer whose tunnel to examine.
1993  * @param callback Function to call with the requested data.
1994  * @param callback_cls Closure for @c callback.
1995  *
1996  * @return #GNUNET_OK / #GNUNET_SYSERR
1997  */
1998 int
1999 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2000                         const struct GNUNET_PeerIdentity *id,
2001                         GNUNET_CADET_TunnelCB callback,
2002                         void *callback_cls)
2003 {
2004   struct GNUNET_CADET_LocalInfo msg;
2005
2006   if (NULL != h->info_cb.tunnel_cb)
2007   {
2008     GNUNET_break (0);
2009     return GNUNET_SYSERR;
2010   }
2011
2012   memset (&msg, 0, sizeof (msg));
2013   msg.header.size = htons (sizeof (msg));
2014   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2015   msg.peer = *id;
2016   send_packet (h, &msg.header, NULL);
2017   h->info_cb.tunnel_cb = callback;
2018   h->info_cls = callback_cls;
2019   return GNUNET_OK;
2020 }
2021
2022
2023 /**
2024  * Request information about a specific channel of the running cadet peer.
2025  *
2026  * WARNING: unstable API, likely to change in the future!
2027  * FIXME Add destination option.
2028  *
2029  * @param h Handle to the cadet peer.
2030  * @param initiator ID of the owner of the channel.
2031  * @param channel_number Channel number.
2032  * @param callback Function to call with the requested data.
2033  * @param callback_cls Closure for @c callback.
2034  *
2035  * @return #GNUNET_OK / #GNUNET_SYSERR
2036  */
2037 int
2038 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2039                          struct GNUNET_PeerIdentity *initiator,
2040                          unsigned int channel_number,
2041                          GNUNET_CADET_ChannelCB callback,
2042                          void *callback_cls)
2043 {
2044   struct GNUNET_CADET_LocalInfo msg;
2045
2046   if (NULL != h->info_cb.channel_cb)
2047   {
2048     GNUNET_break (0);
2049     return GNUNET_SYSERR;
2050   }
2051
2052   msg.header.size = htons (sizeof (msg));
2053   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2054   msg.peer = *initiator;
2055   msg.channel_id = htonl (channel_number);
2056 //   msg.reserved = 0;
2057   send_packet (h, &msg.header, NULL);
2058   h->info_cb.channel_cb = callback;
2059   h->info_cls = callback_cls;
2060   return GNUNET_OK;
2061 }
2062
2063
2064 /**
2065  * Function called to notify a client about the connection
2066  * begin ready to queue more data.  "buf" will be
2067  * NULL and "size" zero if the connection was closed for
2068  * writing in the meantime.
2069  *
2070  * @param cls closure
2071  * @param size number of bytes available in buf
2072  * @param buf where the callee should write the message
2073  * @return number of bytes written to buf
2074  */
2075 static size_t
2076 cadet_mq_ntr (void *cls, size_t size,
2077              void *buf)
2078 {
2079   struct GNUNET_MQ_Handle *mq = cls;
2080   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2081   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2082   uint16_t msize;
2083
2084   state->th = NULL;
2085   if (NULL == buf)
2086   {
2087     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2088     return 0;
2089   }
2090   msize = ntohs (msg->size);
2091   GNUNET_assert (msize <= size);
2092   memcpy (buf, msg, msize);
2093   GNUNET_MQ_impl_send_continue (mq);
2094   return msize;
2095 }
2096
2097
2098 /**
2099  * Signature of functions implementing the
2100  * sending functionality of a message queue.
2101  *
2102  * @param mq the message queue
2103  * @param msg the message to send
2104  * @param impl_state state of the implementation
2105  */
2106 static void
2107 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
2108                     const struct GNUNET_MessageHeader *msg,
2109                     void *impl_state)
2110 {
2111   struct CadetMQState *state = impl_state;
2112
2113   GNUNET_assert (NULL == state->th);
2114   state->th =
2115       GNUNET_CADET_notify_transmit_ready (state->channel,
2116                                          /* FIXME: add option for corking */
2117                                          GNUNET_NO,
2118                                          GNUNET_TIME_UNIT_FOREVER_REL,
2119                                          ntohs (msg->size),
2120                                          &cadet_mq_ntr, mq);
2121
2122 }
2123
2124
2125 /**
2126  * Signature of functions implementing the
2127  * destruction of a message queue.
2128  * Implementations must not free 'mq', but should
2129  * take care of 'impl_state'.
2130  *
2131  * @param mq the message queue to destroy
2132  * @param impl_state state of the implementation
2133  */
2134 static void
2135 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
2136                        void *impl_state)
2137 {
2138   struct CadetMQState *state = impl_state;
2139
2140   if (NULL != state->th)
2141     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2142
2143   GNUNET_free (state);
2144 }
2145
2146
2147 /**
2148  * Create a message queue for a cadet channel.
2149  * The message queue can only be used to transmit messages,
2150  * not to receive them.
2151  *
2152  * @param channel the channel to create the message qeue for
2153  * @return a message queue to messages over the channel
2154  */
2155 struct GNUNET_MQ_Handle *
2156 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2157 {
2158   struct GNUNET_MQ_Handle *mq;
2159   struct CadetMQState *state;
2160
2161   state = GNUNET_new (struct CadetMQState);
2162   state->channel = channel;
2163
2164   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2165                                       &cadet_mq_destroy_impl,
2166                                       NULL, /* FIXME: cancel impl. */
2167                                       state,
2168                                       NULL, /* no msg handlers */
2169                                       NULL, /* no err handlers */
2170                                       NULL); /* no handler cls */
2171   return mq;
2172 }