a1c7e8461cd2222da94fd12ccf613eee3708b07b
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Window change handler.
295    */
296   GNUNET_CADET_WindowSizeEventHandler window_changes;
297
298   /**
299    * Disconnect handler.
300    */
301   GNUNET_CADET_DisconnectEventHandler disconnects;
302
303 };
304
305
306 /**
307  * Opaque handle to a port.
308  */
309 struct GNUNET_CADET_Port
310 {
311   /**
312    * Handle to the CADET session this port belongs to.
313    */
314   struct GNUNET_CADET_Handle *cadet;
315
316   /**
317    * Port ID.
318    */
319   struct GNUNET_HashCode *hash;
320
321   /**
322    * Callback handler for incoming channels on this port.
323    */
324   GNUNET_CADET_InboundChannelNotificationHandler *handler;
325
326   /**
327    * Closure for @a handler.
328    */
329   void *cls;
330
331   /*****************************    MQ     ************************************/
332
333   /**
334    * Port "number"
335    */
336   struct GNUNET_HashCode id;
337
338   /**
339    * Handler for incoming channels on this port
340    */
341   GNUNET_CADET_ConnectEventHandler connects;
342
343   /**
344    * Closure for @ref connects
345    */
346   void * connects_cls;
347
348   /**
349    * Window size change handler.
350    */
351   GNUNET_CADET_WindowSizeEventHandler window_changes;
352
353   /**
354    * Handler called when an incoming channel is destroyed..
355    */
356   GNUNET_CADET_DisconnectEventHandler disconnects;
357
358   /**
359    * Payload handlers for incoming channels.
360    */
361   const struct GNUNET_MQ_MessageHandler *handlers;
362 };
363
364
365 /**
366  * Implementation state for cadet's message queue.
367  */
368 struct CadetMQState
369 {
370   /**
371    * The current transmit handle, or NULL
372    * if no transmit is active.
373    */
374   struct GNUNET_CADET_TransmitHandle *th;
375
376   /**
377    * Channel to send the data over.
378    */
379   struct GNUNET_CADET_Channel *channel;
380 };
381
382
383
384 /******************************************************************************/
385 /*********************      FUNCTION DECLARATIONS     *************************/
386 /******************************************************************************/
387
388 /**
389  * Reconnect to the service, retransmit all infomation to try to restore the
390  * original state.
391  *
392  * @param h Handle to the CADET service.
393  */
394 static void
395 schedule_reconnect (struct GNUNET_CADET_Handle *h);
396
397
398 /**
399  * Reconnect callback: tries to reconnect again after a failer previous
400  * reconnection.
401  *
402  * @param cls Closure (cadet handle).
403  */
404 static void
405 reconnect_cbk (void *cls);
406
407
408 /**
409  * Reconnect to the service, retransmit all infomation to try to restore the
410  * original state.
411  *
412  * @param h handle to the cadet
413  */
414 static void
415 reconnect (struct GNUNET_CADET_Handle *h);
416
417
418 /******************************************************************************/
419 /***********************     AUXILIARY FUNCTIONS      *************************/
420 /******************************************************************************/
421
422 /**
423  * Check if transmission is a payload packet.
424  *
425  * @param th Transmission handle.
426  *
427  * @return #GNUNET_YES if it is a payload packet,
428  *         #GNUNET_NO if it is a cadet management packet.
429  */
430 static int
431 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
432 {
433   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
434 }
435
436
437 /**
438  * Find the Port struct for a hash.
439  *
440  * @param h CADET handle.
441  * @param hash HashCode for the port number.
442  *
443  * @return The port handle if known, NULL otherwise.
444  */
445 static struct GNUNET_CADET_Port *
446 find_port (const struct GNUNET_CADET_Handle *h,
447            const struct GNUNET_HashCode *hash)
448 {
449   struct GNUNET_CADET_Port *p;
450
451   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
452
453   return p;
454 }
455
456
457 /**
458  * Get the channel handler for the channel specified by id from the given handle
459  *
460  * @param h Cadet handle
461  * @param ccn ID of the wanted channel
462  * @return handle to the required channel or NULL if not found
463  */
464 static struct GNUNET_CADET_Channel *
465 retrieve_channel (struct GNUNET_CADET_Handle *h,
466                   struct GNUNET_CADET_ClientChannelNumber ccn)
467 {
468   struct GNUNET_CADET_Channel *ch;
469
470   for (ch = h->channels_head; NULL != ch; ch = ch->next)
471     if (ch->ccn.channel_of_client == ccn.channel_of_client)
472       return ch;
473   return NULL;
474 }
475
476
477 /**
478  * Create a new channel and insert it in the channel list of the cadet handle
479  *
480  * @param h Cadet handle
481  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
482  *
483  * @return Handle to the created channel.
484  */
485 static struct GNUNET_CADET_Channel *
486 create_channel (struct GNUNET_CADET_Handle *h,
487                 struct GNUNET_CADET_ClientChannelNumber ccn)
488 {
489   struct GNUNET_CADET_Channel *ch;
490
491   ch = GNUNET_new (struct GNUNET_CADET_Channel);
492   GNUNET_CONTAINER_DLL_insert (h->channels_head,
493                                h->channels_tail,
494                                ch);
495   ch->cadet = h;
496   if (0 == ccn.channel_of_client)
497   {
498     ch->ccn = h->next_ccn;
499     while (NULL != retrieve_channel (h,
500                                      h->next_ccn))
501     {
502       h->next_ccn.channel_of_client
503         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
504       if (0 == ntohl (h->next_ccn.channel_of_client))
505         h->next_ccn.channel_of_client
506           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
507     }
508   }
509   else
510   {
511     ch->ccn = ccn;
512   }
513   return ch;
514 }
515
516
517 /**
518  * Destroy the specified channel.
519  * - Destroys all peers, calling the disconnect callback on each if needed
520  * - Cancels all outgoing traffic for that channel, calling respective notifys
521  * - Calls cleaner if channel was inbound
522  * - Frees all memory used
523  *
524  * @param ch Pointer to the channel.
525  * @param call_cleaner Whether to call the cleaner handler.
526  *
527  * @return Handle to the required channel or NULL if not found.
528  */
529 static void
530 destroy_channel (struct GNUNET_CADET_Channel *ch)
531 {
532   struct GNUNET_CADET_Handle *h;
533   struct GNUNET_CADET_TransmitHandle *th;
534   struct GNUNET_CADET_TransmitHandle *next;
535
536   if (NULL == ch)
537   {
538     GNUNET_break (0);
539     return;
540   }
541   h = ch->cadet;
542   LOG (GNUNET_ERROR_TYPE_DEBUG,
543        " destroy_channel %X of %p\n",
544        ch->ccn,
545        h);
546
547   GNUNET_CONTAINER_DLL_remove (h->channels_head,
548                                h->channels_tail,
549                                ch);
550
551   /* signal channel destruction */
552   if (0 != ch->peer)
553   {
554     if (NULL != h->cleaner)
555     {
556       /** @a deprecated  */
557       LOG (GNUNET_ERROR_TYPE_DEBUG,
558           " calling cleaner\n");
559       h->cleaner (h->cls, ch, ch->ctx);
560     }
561     else if (NULL != ch->disconnects)
562     {
563       LOG (GNUNET_ERROR_TYPE_DEBUG,
564           " calling disconnect handler\n");
565       ch->disconnects (ch->ctx, ch);
566     }
567     else
568     {
569       /* Application won't be aware of the channel destruction and use
570        * a pointer to free'd memory.
571        */
572       GNUNET_assert (0);
573     }
574   }
575
576   /* check that clients did not leave messages behind in the queue */
577   for (th = h->th_head; NULL != th; th = next)
578   {
579     next = th->next;
580     if (th->channel != ch)
581       continue;
582     /* Clients should have aborted their requests already.
583      * Management traffic should be ok, as clients can't cancel that.
584      * If the service crashed and we are reconnecting, it's ok.
585      */
586     GNUNET_break (GNUNET_NO == th_is_payload (th));
587     GNUNET_CADET_notify_transmit_ready_cancel (th);
588   }
589
590   if (0 != ch->peer)
591     GNUNET_PEER_change_rc (ch->peer, -1);
592   GNUNET_free (ch);
593 }
594
595
596 /**
597  * Add a transmit handle to the transmission queue and set the
598  * timeout if needed.
599  *
600  * @param h cadet handle with the queue head and tail
601  * @param th handle to the packet to be transmitted
602  */
603 static void
604 add_to_queue (struct GNUNET_CADET_Handle *h,
605               struct GNUNET_CADET_TransmitHandle *th)
606 {
607   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
608                                     h->th_tail,
609                                     th);
610 }
611
612
613 /**
614  * Remove a transmit handle from the transmission queue, if present.
615  *
616  * Safe to call even if not queued.
617  *
618  * @param th handle to the packet to be unqueued.
619  */
620 static void
621 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
622 {
623   struct GNUNET_CADET_Handle *h = th->channel->cadet;
624
625   /* It might or might not have been queued (rarely not), but check anyway. */
626   if (NULL != th->next || h->th_tail == th)
627   {
628     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
629   }
630 }
631
632
633 /******************************************************************************/
634 /***********************      MQ API CALLBACKS     ****************************/
635 /******************************************************************************/
636
637
638 /**
639  * Implement sending functionality of a message queue for
640  * us sending messages to a peer.
641  *
642  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
643  * in order to label the message with the channel ID and send the
644  * encapsulated message to the service.
645  *
646  * @param mq the message queue
647  * @param msg the message to send
648  * @param impl_state state of the implementation
649  */
650 static void
651 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
652                     const struct GNUNET_MessageHeader *msg,
653                     void *impl_state)
654 {
655   struct GNUNET_CADET_Channel *ch = impl_state;
656   struct GNUNET_CADET_Handle *h = ch->cadet;
657   uint16_t msize;
658   struct GNUNET_MQ_Envelope *env;
659   struct GNUNET_CADET_LocalData *cadet_msg;
660
661
662   if (NULL == h->mq)
663   {
664     /* We're currently reconnecting, pretend this worked */
665     GNUNET_MQ_impl_send_continue (mq);
666     return;
667   }
668
669   /* check message size for sanity */
670   msize = ntohs (msg->size);
671   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
672   {
673     GNUNET_break (0);
674     GNUNET_MQ_impl_send_continue (mq);
675     return;
676   }
677
678   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
679                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
680                                  msg);
681   cadet_msg->ccn = ch->ccn;
682   GNUNET_MQ_send (h->mq, env);
683   GNUNET_MQ_impl_send_continue (mq);
684 }
685
686
687 /**
688  * Handle destruction of a message queue.  Implementations must not
689  * free @a mq, but should take care of @a impl_state.
690  *
691  * @param mq the message queue to destroy
692  * @param impl_state state of the implementation
693  */
694 static void
695 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
696                        void *impl_state)
697 {
698   struct GNUNET_CADET_Channel *ch = impl_state;
699
700   GNUNET_assert (mq == ch->mq);
701   ch->mq = NULL;
702 }
703
704
705 /**
706  * We had an error processing a message we forwarded from a peer to
707  * the CADET service.  We should just complain about it but otherwise
708  * continue processing.
709  *
710  * @param cls closure
711  * @param error error code
712  */
713 static void
714 cadet_mq_error_handler (void *cls,
715                         enum GNUNET_MQ_Error error)
716 {
717   GNUNET_break_op (0);
718 }
719
720
721 /**
722  * Implementation function that cancels the currently sent message.
723  * Should basically undo whatever #mq_send_impl() did.
724  *
725  * @param mq message queue
726  * @param impl_state state specific to the implementation
727  */
728 static void
729 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
730                      void *impl_state)
731 {
732   struct GNUNET_CADET_Channel *ch = impl_state;
733
734   LOG (GNUNET_ERROR_TYPE_WARNING,
735        "Cannot cancel mq message on channel %X of %p\n",
736        ch->ccn.channel_of_client, ch->cadet);
737
738   GNUNET_break (0);
739 }
740
741
742 /******************************************************************************/
743 /***********************      RECEIVE HANDLERS     ****************************/
744 /******************************************************************************/
745
746
747 /**
748  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
749  * request the data to send over MQ. Since MQ manages the queue, this function
750  * is scheduled immediatly after a transmit ready notification.
751  *
752  * @param cls Closure (transmit handle).
753  */
754 static void
755 request_data (void *cls)
756 {
757   struct GNUNET_CADET_TransmitHandle *th = cls;
758   struct GNUNET_CADET_LocalData *msg;
759   struct GNUNET_MQ_Envelope *env;
760   size_t osize;
761
762   LOG (GNUNET_ERROR_TYPE_DEBUG,
763        "Requesting Data: %u bytes (allow send is %u)\n",
764        th->size,
765        th->channel->allow_send);
766
767   GNUNET_assert (0 < th->channel->allow_send);
768   th->channel->allow_send--;
769   /* NOTE: we may be allowed to send another packet immediately,
770      albeit the current logic waits for the ACK. */
771   th->request_data_task = NULL;
772   remove_from_queue (th);
773
774   env = GNUNET_MQ_msg_extra (msg,
775                              th->size,
776                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
777   msg->ccn = th->channel->ccn;
778   osize = th->notify (th->notify_cls,
779                       th->size,
780                       &msg[1]);
781   GNUNET_assert (osize == th->size);
782
783   GNUNET_MQ_send (th->channel->cadet->mq,
784                   env);
785   GNUNET_free (th);
786 }
787
788
789 /**
790  * Process the new channel notification and add it to the channels in the handle
791  *
792  * @param h     The cadet handle
793  * @param msg   A message with the details of the new incoming channel
794  */
795 static void
796 handle_channel_created (void *cls,
797                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
798 {
799   struct GNUNET_CADET_Handle *h = cls;
800   struct GNUNET_CADET_Channel *ch;
801   struct GNUNET_CADET_Port *port;
802   const struct GNUNET_HashCode *port_number;
803   struct GNUNET_CADET_ClientChannelNumber ccn;
804
805   ccn = msg->ccn;
806   port_number = &msg->port;
807   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
808   {
809     GNUNET_break (0);
810     return;
811   }
812   port = find_port (h, port_number);
813   if (NULL == port)
814   {
815     /* We could have closed the port but the service didn't know about it yet
816      * This is not an error.
817      */
818     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
819     struct GNUNET_MQ_Envelope *env;
820
821     GNUNET_break (0);
822     LOG (GNUNET_ERROR_TYPE_DEBUG,
823          "No handler for incoming channel %X (on port %s, recently closed?)\n",
824          ntohl (ccn.channel_of_client),
825          GNUNET_h2s (port_number));
826     env = GNUNET_MQ_msg (d_msg,
827                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
828     d_msg->ccn = msg->ccn;
829     GNUNET_MQ_send (h->mq,
830                     env);
831     return;
832   }
833
834   ch = create_channel (h,
835                        ccn);
836   ch->peer = GNUNET_PEER_intern (&msg->peer);
837   ch->cadet = h;
838   ch->ccn = ccn;
839   ch->incoming_port = port;
840   ch->options = ntohl (msg->opt);
841   LOG (GNUNET_ERROR_TYPE_DEBUG,
842        "Creating incoming channel %X [%s] %p\n",
843        ntohl (ccn.channel_of_client),
844        GNUNET_h2s (port_number),
845        ch);
846
847   if (NULL != port->handler)
848   {
849     /** @deprecated */
850     /* Old style API */
851     ch->ctx = port->handler (port->cls,
852                             ch,
853                             &msg->peer,
854                             port->hash,
855                             ch->options);
856   } else {
857     /* MQ API */
858     GNUNET_assert (NULL != port->connects);
859     ch->window_changes = port->window_changes;
860     ch->disconnects = port->disconnects;
861     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
862                                             &cadet_mq_destroy_impl,
863                                             &cadet_mq_cancel_impl,
864                                             ch,
865                                             port->handlers,
866                                             &cadet_mq_error_handler,
867                                             ch);
868     ch->ctx = port->connects (port->cadet->cls,
869                               ch,
870                               &msg->peer);
871     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
872   }
873 }
874
875
876 /**
877  * Process the channel destroy notification and free associated resources
878  *
879  * @param h     The cadet handle
880  * @param msg   A message with the details of the channel being destroyed
881  */
882 static void
883 handle_channel_destroy (void *cls,
884                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
885 {
886   struct GNUNET_CADET_Handle *h = cls;
887   struct GNUNET_CADET_Channel *ch;
888   struct GNUNET_CADET_ClientChannelNumber ccn;
889
890   ccn = msg->ccn;
891   LOG (GNUNET_ERROR_TYPE_DEBUG,
892        "Channel %X Destroy from service\n",
893        ntohl (ccn.channel_of_client));
894   ch = retrieve_channel (h,
895                          ccn);
896
897   if (NULL == ch)
898   {
899     LOG (GNUNET_ERROR_TYPE_DEBUG,
900          "channel %X unknown\n",
901          ntohl (ccn.channel_of_client));
902     return;
903   }
904   destroy_channel (ch);
905 }
906
907
908 /**
909  * Check that message received from CADET service is well-formed.
910  *
911  * @param cls the `struct GNUNET_CADET_Handle`
912  * @param message the message we got
913  * @return #GNUNET_OK if the message is well-formed,
914  *         #GNUNET_SYSERR otherwise
915  */
916 static int
917 check_local_data (void *cls,
918                   const struct GNUNET_CADET_LocalData *message)
919 {
920   struct GNUNET_CADET_Handle *h = cls;
921   struct GNUNET_CADET_Channel *ch;
922   uint16_t size;
923
924   size = ntohs (message->header.size);
925   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
926   {
927     GNUNET_break_op (0);
928     return GNUNET_SYSERR;
929   }
930
931   ch = retrieve_channel (h,
932                          message->ccn);
933   if (NULL == ch)
934   {
935     GNUNET_break_op (0);
936     return GNUNET_SYSERR;
937   }
938
939   return GNUNET_OK;
940 }
941
942
943 /**
944  * Process the incoming data packets, call appropriate handlers.
945  *
946  * @param h       The cadet handle
947  * @param message A message encapsulating the data
948  */
949 static void
950 handle_local_data (void *cls,
951                    const struct GNUNET_CADET_LocalData *message)
952 {
953   struct GNUNET_CADET_Handle *h = cls;
954   const struct GNUNET_MessageHeader *payload;
955   const struct GNUNET_CADET_MessageHandler *handler;
956   struct GNUNET_CADET_Channel *ch;
957   uint16_t type;
958
959   ch = retrieve_channel (h,
960                          message->ccn);
961   if (NULL == ch)
962   {
963     GNUNET_break_op (0);
964     reconnect (h);
965     return;
966   }
967
968   payload = (struct GNUNET_MessageHeader *) &message[1];
969   type = ntohs (payload->type);
970   LOG (GNUNET_ERROR_TYPE_DEBUG,
971        "Got a %s data on channel %s [%X] of type %s (%u)\n",
972        GC_f2s (ntohl (ch->ccn.channel_of_client) >=
973                GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
974        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
975        ntohl (message->ccn.channel_of_client),
976        GC_m2s (type),
977        type);
978   for (unsigned i=0;i<h->n_handlers;i++)
979   {
980     handler = &h->message_handlers[i];
981     if (handler->type == type)
982     {
983       if (GNUNET_OK !=
984           handler->callback (h->cls,
985                              ch,
986                              &ch->ctx,
987                              payload))
988       {
989         LOG (GNUNET_ERROR_TYPE_DEBUG,
990              "callback caused disconnection\n");
991         GNUNET_CADET_channel_destroy (ch);
992         return;
993       }
994       return;
995     }
996   }
997   /* Other peer sent message we do not comprehend. */
998   GNUNET_break_op (0);
999   GNUNET_CADET_receive_done (ch);
1000 }
1001
1002
1003 /**
1004  * Process a local ACK message, enabling the client to send
1005  * more data to the service.
1006  *
1007  * @param h Cadet handle.
1008  * @param message Message itself.
1009  */
1010 static void
1011 handle_local_ack (void *cls,
1012                   const struct GNUNET_CADET_LocalAck *message)
1013 {
1014   struct GNUNET_CADET_Handle *h = cls;
1015   struct GNUNET_CADET_Channel *ch;
1016   struct GNUNET_CADET_ClientChannelNumber ccn;
1017   struct GNUNET_CADET_TransmitHandle *th;
1018
1019   ccn = message->ccn;
1020   ch = retrieve_channel (h, ccn);
1021   if (NULL == ch)
1022   {
1023     LOG (GNUNET_ERROR_TYPE_DEBUG,
1024          "ACK on unknown channel %X\n",
1025          ntohl (ccn.channel_of_client));
1026     return;
1027   }
1028   ch->allow_send++;
1029   LOG (GNUNET_ERROR_TYPE_DEBUG,
1030        "Got an ACK on channel %X, allow send now %u!\n",
1031        ntohl (ch->ccn.channel_of_client),
1032        ch->allow_send);
1033   for (th = h->th_head; NULL != th; th = th->next)
1034   {
1035     if ( (th->channel == ch) &&
1036          (NULL == th->request_data_task) )
1037     {
1038       th->request_data_task
1039         = GNUNET_SCHEDULER_add_now (&request_data,
1040                                     th);
1041       break;
1042     }
1043   }
1044 }
1045
1046
1047 /**
1048  * Generic error handler, called with the appropriate error code and
1049  * the same closure specified at the creation of the message queue.
1050  * Not every message queue implementation supports an error handler.
1051  *
1052  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1053  * @param error error code
1054  */
1055 static void
1056 handle_mq_error (void *cls,
1057                  enum GNUNET_MQ_Error error)
1058 {
1059   struct GNUNET_CADET_Handle *h = cls;
1060
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1062   GNUNET_MQ_destroy (h->mq);
1063   h->mq = NULL;
1064   reconnect (h);
1065 }
1066
1067
1068 /*
1069  * Process a local reply about info on all channels, pass info to the user.
1070  *
1071  * @param h Cadet handle.
1072  * @param message Message itself.
1073  */
1074 // static void
1075 // process_get_channels (struct GNUNET_CADET_Handle *h,
1076 //                      const struct GNUNET_MessageHeader *message)
1077 // {
1078 //   struct GNUNET_CADET_LocalInfo *msg;
1079 //
1080 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1081 //
1082 //   if (NULL == h->channels_cb)
1083 //   {
1084 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1085 //     return;
1086 //   }
1087 //
1088 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1089 //   if (ntohs (message->size) !=
1090 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1091 //        sizeof (struct GNUNET_PeerIdentity)))
1092 //   {
1093 //     GNUNET_break_op (0);
1094 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1095 //                 "Get channels message: size %hu - expected %u\n",
1096 //                 ntohs (message->size),
1097 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1098 //     return;
1099 //   }
1100 //   h->channels_cb (h->channels_cls,
1101 //                   ntohl (msg->channel_id),
1102 //                   &msg->owner,
1103 //                   &msg->destination);
1104 // }
1105
1106
1107
1108 /*
1109  * Process a local monitor_channel reply, pass info to the user.
1110  *
1111  * @param h Cadet handle.
1112  * @param message Message itself.
1113  */
1114 // static void
1115 // process_show_channel (struct GNUNET_CADET_Handle *h,
1116 //                      const struct GNUNET_MessageHeader *message)
1117 // {
1118 //   struct GNUNET_CADET_LocalInfo *msg;
1119 //   size_t esize;
1120 //
1121 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1122 //
1123 //   if (NULL == h->channel_cb)
1124 //   {
1125 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1126 //     return;
1127 //   }
1128 //
1129 //   /* Verify message sanity */
1130 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1131 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1132 //   if (ntohs (message->size) != esize)
1133 //   {
1134 //     GNUNET_break_op (0);
1135 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136 //                 "Show channel message: size %hu - expected %u\n",
1137 //                 ntohs (message->size),
1138 //                 esize);
1139 //
1140 //     h->channel_cb (h->channel_cls, NULL, NULL);
1141 //     h->channel_cb = NULL;
1142 //     h->channel_cls = NULL;
1143 //
1144 //     return;
1145 //   }
1146 //
1147 //   h->channel_cb (h->channel_cls,
1148 //                  &msg->destination,
1149 //                  &msg->owner);
1150 // }
1151
1152
1153
1154 /**
1155  * Check that message received from CADET service is well-formed.
1156  *
1157  * @param cls the `struct GNUNET_CADET_Handle`
1158  * @param message the message we got
1159  * @return #GNUNET_OK if the message is well-formed,
1160  *         #GNUNET_SYSERR otherwise
1161  */
1162 static int
1163 check_get_peers (void *cls,
1164                  const struct GNUNET_CADET_LocalInfoPeer *message)
1165 {
1166   struct GNUNET_CADET_Handle *h = cls;
1167   uint16_t size;
1168
1169   if (NULL == h->info_cb.peers_cb)
1170   {
1171     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1172                 "  no handler for peesr monitor message!\n");
1173     return GNUNET_SYSERR;
1174   }
1175
1176   size = ntohs (message->header.size);
1177   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1178   {
1179     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1180     h->info_cb.peers_cb = NULL;
1181     h->info_cls = NULL;
1182     return GNUNET_SYSERR;
1183   }
1184
1185   return GNUNET_OK;
1186 }
1187
1188
1189 /**
1190  * Process a local reply about info on all tunnels, pass info to the user.
1191  *
1192  * @param cls Closure (Cadet handle).
1193  * @param msg Message itself.
1194  */
1195 static void
1196 handle_get_peers (void *cls,
1197                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1198 {
1199   struct GNUNET_CADET_Handle *h = cls;
1200   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1201                        (int) ntohs (msg->tunnel),
1202                        (unsigned int ) ntohs (msg->paths),
1203                        0);
1204 }
1205
1206
1207 /**
1208  * Check that message received from CADET service is well-formed.
1209  *
1210  * @param cls the `struct GNUNET_CADET_Handle`
1211  * @param message the message we got
1212  * @return #GNUNET_OK if the message is well-formed,
1213  *         #GNUNET_SYSERR otherwise
1214  */
1215 static int
1216 check_get_peer (void *cls,
1217                 const struct GNUNET_CADET_LocalInfoPeer *message)
1218 {
1219   struct GNUNET_CADET_Handle *h = cls;
1220   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1221   struct GNUNET_PeerIdentity *paths_array;
1222   size_t esize;
1223   unsigned int epaths;
1224   unsigned int paths;
1225   unsigned int peers;
1226
1227   if (NULL == h->info_cb.peer_cb)
1228   {
1229     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1230                 "  no handler for peer monitor message!\n");
1231     goto clean_cls;
1232   }
1233
1234   /* Verify message sanity */
1235   esize = ntohs (message->header.size);
1236   if (esize < msize)
1237   {
1238     GNUNET_break_op (0);
1239     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1240     goto clean_cls;
1241   }
1242   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1243   {
1244     GNUNET_break_op (0);
1245     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1246     goto clean_cls;
1247
1248   }
1249   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1250   epaths = (unsigned int) ntohs (message->paths);
1251   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1252   paths = 0;
1253   for (int i = 0; i < peers; i++)
1254   {
1255     if (0 == memcmp (&paths_array[i], &message->destination,
1256                      sizeof (struct GNUNET_PeerIdentity)))
1257     {
1258       paths++;
1259     }
1260   }
1261   if (paths != epaths)
1262   {
1263     GNUNET_break_op (0);
1264     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1265     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1266     goto clean_cls;
1267   }
1268
1269   return GNUNET_OK;
1270
1271 clean_cls:
1272   h->info_cb.peer_cb = NULL;
1273   h->info_cls = NULL;
1274   return GNUNET_SYSERR;
1275 }
1276
1277
1278 /**
1279  * Process a local peer info reply, pass info to the user.
1280  *
1281  * @param cls Closure (Cadet handle).
1282  * @param message Message itself.
1283  */
1284 static void
1285 handle_get_peer (void *cls,
1286                  const struct GNUNET_CADET_LocalInfoPeer *message)
1287 {
1288   struct GNUNET_CADET_Handle *h = cls;
1289   struct GNUNET_PeerIdentity *paths_array;
1290   unsigned int paths;
1291   unsigned int path_length;
1292   int neighbor;
1293   unsigned int peers;
1294
1295   paths = (unsigned int) ntohs (message->paths);
1296   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1297   peers = (ntohs (message->header.size) - sizeof (*message))
1298           / sizeof (struct GNUNET_PeerIdentity);
1299   path_length = 0;
1300   neighbor = GNUNET_NO;
1301
1302   for (int i = 0; i < peers; i++)
1303   {
1304     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1305     path_length++;
1306     if (0 == memcmp (&paths_array[i], &message->destination,
1307                      sizeof (struct GNUNET_PeerIdentity)))
1308     {
1309       if (1 == path_length)
1310         neighbor = GNUNET_YES;
1311       path_length = 0;
1312     }
1313   }
1314
1315   /* Call Callback with tunnel info. */
1316   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1317   h->info_cb.peer_cb (h->info_cls,
1318                       &message->destination,
1319                       (int) ntohs (message->tunnel),
1320                       neighbor,
1321                       paths,
1322                       paths_array);
1323 }
1324
1325
1326 /**
1327  * Check that message received from CADET service is well-formed.
1328  *
1329  * @param cls the `struct GNUNET_CADET_Handle`
1330  * @param msg the message we got
1331  * @return #GNUNET_OK if the message is well-formed,
1332  *         #GNUNET_SYSERR otherwise
1333  */
1334 static int
1335 check_get_tunnels (void *cls,
1336                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1337 {
1338   struct GNUNET_CADET_Handle *h = cls;
1339   uint16_t size;
1340
1341   if (NULL == h->info_cb.tunnels_cb)
1342   {
1343     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1344                 "  no handler for tunnels monitor message!\n");
1345     return GNUNET_SYSERR;
1346   }
1347
1348   size = ntohs (msg->header.size);
1349   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1350   {
1351     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1352     h->info_cb.tunnels_cb = NULL;
1353     h->info_cls = NULL;
1354     return GNUNET_SYSERR;
1355   }
1356   return GNUNET_OK;
1357 }
1358
1359
1360 /**
1361  * Process a local reply about info on all tunnels, pass info to the user.
1362  *
1363  * @param cls Closure (Cadet handle).
1364  * @param message Message itself.
1365  */
1366 static void
1367 handle_get_tunnels (void *cls,
1368                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1369 {
1370   struct GNUNET_CADET_Handle *h = cls;
1371
1372   h->info_cb.tunnels_cb (h->info_cls,
1373                          &msg->destination,
1374                          ntohl (msg->channels),
1375                          ntohl (msg->connections),
1376                          ntohs (msg->estate),
1377                          ntohs (msg->cstate));
1378
1379 }
1380
1381
1382 /**
1383  * Check that message received from CADET service is well-formed.
1384  *
1385  * @param cls the `struct GNUNET_CADET_Handle`
1386  * @param msg the message we got
1387  * @return #GNUNET_OK if the message is well-formed,
1388  *         #GNUNET_SYSERR otherwise
1389  */
1390 static int
1391 check_get_tunnel (void *cls,
1392                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1393 {
1394   struct GNUNET_CADET_Handle *h = cls;
1395   unsigned int ch_n;
1396   unsigned int c_n;
1397   size_t esize;
1398   size_t msize;
1399
1400   if (NULL == h->info_cb.tunnel_cb)
1401   {
1402     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1403                 "  no handler for tunnel monitor message!\n");
1404     goto clean_cls;
1405   }
1406
1407   /* Verify message sanity */
1408   msize = ntohs (msg->header.size);
1409   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1410   if (esize > msize)
1411   {
1412     GNUNET_break_op (0);
1413     h->info_cb.tunnel_cb (h->info_cls,
1414                           NULL, 0, 0, NULL, NULL, 0, 0);
1415     goto clean_cls;
1416   }
1417   ch_n = ntohl (msg->channels);
1418   c_n = ntohl (msg->connections);
1419   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1420   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1421   if (msize != esize)
1422   {
1423     GNUNET_break_op (0);
1424     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425                 "m:%u, e: %u (%u ch, %u conn)\n",
1426                 (unsigned int) msize,
1427                 (unsigned int) esize,
1428                 ch_n,
1429                 c_n);
1430     h->info_cb.tunnel_cb (h->info_cls,
1431                           NULL, 0, 0, NULL, NULL, 0, 0);
1432     goto clean_cls;
1433   }
1434
1435   return GNUNET_OK;
1436
1437 clean_cls:
1438   h->info_cb.tunnel_cb = NULL;
1439   h->info_cls = NULL;
1440   return GNUNET_SYSERR;
1441 }
1442
1443
1444 /**
1445  * Process a local tunnel info reply, pass info to the user.
1446  *
1447  * @param cls Closure (Cadet handle).
1448  * @param msg Message itself.
1449  */
1450 static void
1451 handle_get_tunnel (void *cls,
1452                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1453 {
1454   struct GNUNET_CADET_Handle *h = cls;
1455   unsigned int ch_n;
1456   unsigned int c_n;
1457   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1458   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1459
1460   ch_n = ntohl (msg->channels);
1461   c_n = ntohl (msg->connections);
1462
1463   /* Call Callback with tunnel info. */
1464   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1465   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1466   h->info_cb.tunnel_cb (h->info_cls,
1467                         &msg->destination,
1468                         ch_n,
1469                         c_n,
1470                         chns,
1471                         conns,
1472                         ntohs (msg->estate),
1473                         ntohs (msg->cstate));
1474 }
1475
1476
1477 /**
1478  * Reconnect to the service, retransmit all infomation to try to restore the
1479  * original state.
1480  *
1481  * @param h handle to the cadet
1482  */
1483 static void
1484 reconnect (struct GNUNET_CADET_Handle *h)
1485 {
1486   struct GNUNET_MQ_MessageHandler handlers[] = {
1487     GNUNET_MQ_hd_fixed_size (channel_created,
1488                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1489                              struct GNUNET_CADET_LocalChannelCreateMessage,
1490                              h),
1491     GNUNET_MQ_hd_fixed_size (channel_destroy,
1492                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1493                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1494                              h),
1495     GNUNET_MQ_hd_var_size (local_data,
1496                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1497                            struct GNUNET_CADET_LocalData,
1498                            h),
1499     GNUNET_MQ_hd_fixed_size (local_ack,
1500                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1501                              struct GNUNET_CADET_LocalAck,
1502                              h),
1503     GNUNET_MQ_hd_var_size (get_peers,
1504                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1505                            struct GNUNET_CADET_LocalInfoPeer,
1506                            h),
1507     GNUNET_MQ_hd_var_size (get_peer,
1508                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1509                            struct GNUNET_CADET_LocalInfoPeer,
1510                            h),
1511     GNUNET_MQ_hd_var_size (get_tunnels,
1512                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1513                            struct GNUNET_CADET_LocalInfoTunnel,
1514                            h),
1515     GNUNET_MQ_hd_var_size (get_tunnel,
1516                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1517                            struct GNUNET_CADET_LocalInfoTunnel,
1518                            h),
1519 // FIXME
1520 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1521 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1522 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1523     GNUNET_MQ_handler_end ()
1524   };
1525   struct GNUNET_CADET_Channel *ch;
1526
1527   while (NULL != (ch = h->channels_head))
1528   {
1529     LOG (GNUNET_ERROR_TYPE_DEBUG,
1530          "Destroying channel due to a reconnect\n");
1531     destroy_channel (ch);
1532   }
1533
1534   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1535
1536   if (NULL != h->mq)
1537   {
1538     GNUNET_MQ_destroy (h->mq);
1539     h->mq = NULL;
1540   }
1541   h->mq = GNUNET_CLIENT_connect (h->cfg,
1542                                  "cadet",
1543                                  handlers,
1544                                  &handle_mq_error,
1545                                  h);
1546   if (NULL == h->mq)
1547   {
1548     schedule_reconnect (h);
1549     return;
1550   }
1551   else
1552   {
1553     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1554   }
1555 }
1556
1557 /**
1558  * Reconnect callback: tries to reconnect again after a failer previous
1559  * reconnecttion
1560  *
1561  * @param cls closure (cadet handle)
1562  */
1563 static void
1564 reconnect_cbk (void *cls)
1565 {
1566   struct GNUNET_CADET_Handle *h = cls;
1567
1568   h->reconnect_task = NULL;
1569   reconnect (h);
1570 }
1571
1572
1573 /**
1574  * Reconnect to the service, retransmit all infomation to try to restore the
1575  * original state.
1576  *
1577  * @param h handle to the cadet
1578  *
1579  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1580  */
1581 static void
1582 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1583 {
1584   if (NULL == h->reconnect_task)
1585   {
1586     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1587                                                       &reconnect_cbk, h);
1588     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1589   }
1590 }
1591
1592
1593 /******************************************************************************/
1594 /**********************      API CALL DEFINITIONS     *************************/
1595 /******************************************************************************/
1596
1597 struct GNUNET_CADET_Handle *
1598 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1599                       void *cls,
1600                       GNUNET_CADET_ChannelEndHandler cleaner,
1601                       const struct GNUNET_CADET_MessageHandler *handlers)
1602 {
1603   struct GNUNET_CADET_Handle *h;
1604
1605   h = GNUNET_new (struct GNUNET_CADET_Handle);
1606   LOG (GNUNET_ERROR_TYPE_DEBUG,
1607        "GNUNET_CADET_connect() %p\n",
1608        h);
1609   h->cfg = cfg;
1610   h->cleaner = cleaner;
1611   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1612   reconnect (h);
1613   if (h->mq == NULL)
1614   {
1615     GNUNET_break (0);
1616     GNUNET_CADET_disconnect (h);
1617     return NULL;
1618   }
1619   h->cls = cls;
1620   h->message_handlers = handlers;
1621   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1622   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1623   h->reconnect_task = NULL;
1624
1625   /* count handlers */
1626   for (h->n_handlers = 0;
1627        handlers && handlers[h->n_handlers].type;
1628        h->n_handlers++) ;
1629   return h;
1630 }
1631
1632
1633 /**
1634  * Disconnect from the cadet service. All channels will be destroyed. All channel
1635  * disconnect callbacks will be called on any still connected peers, notifying
1636  * about their disconnection. The registered inbound channel cleaner will be
1637  * called should any inbound channels still exist.
1638  *
1639  * @param handle connection to cadet to disconnect
1640  */
1641 void
1642 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1643 {
1644   struct GNUNET_CADET_Channel *ch;
1645   struct GNUNET_CADET_Channel *aux;
1646   struct GNUNET_CADET_TransmitHandle *th;
1647
1648   LOG (GNUNET_ERROR_TYPE_DEBUG,
1649        "CADET DISCONNECT\n");
1650   ch = handle->channels_head;
1651   while (NULL != ch)
1652   {
1653     aux = ch->next;
1654     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1655     {
1656       GNUNET_break (0);
1657       LOG (GNUNET_ERROR_TYPE_DEBUG,
1658            "channel %X not destroyed\n",
1659            ntohl (ch->ccn.channel_of_client));
1660     }
1661     destroy_channel (ch);
1662     ch = aux;
1663   }
1664   while (NULL != (th = handle->th_head))
1665   {
1666     struct GNUNET_MessageHeader *msg;
1667
1668     /* Make sure it is an allowed packet (everything else should have been
1669      * already canceled).
1670      */
1671     GNUNET_break (GNUNET_NO == th_is_payload (th));
1672     msg = (struct GNUNET_MessageHeader *) &th[1];
1673     switch (ntohs(msg->type))
1674     {
1675       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1676       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1677       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1678       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1679       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1680       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1681       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1682       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1683       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1684       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1685         break;
1686       default:
1687         GNUNET_break (0);
1688         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1689              GC_m2s (ntohs(msg->type)));
1690     }
1691
1692     GNUNET_CADET_notify_transmit_ready_cancel (th);
1693   }
1694
1695   if (NULL != handle->mq)
1696   {
1697     GNUNET_MQ_destroy (handle->mq);
1698     handle->mq = NULL;
1699   }
1700   if (NULL != handle->reconnect_task)
1701   {
1702     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1703     handle->reconnect_task = NULL;
1704   }
1705
1706   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1707   handle->ports = NULL;
1708   GNUNET_free (handle);
1709 }
1710
1711
1712 /**
1713  * Open a port to receive incomming channels.
1714  *
1715  * @param h CADET handle.
1716  * @param port Hash representing the port number.
1717  * @param new_channel Function called when an channel is received.
1718  * @param new_channel_cls Closure for @a new_channel.
1719  * @return Port handle.
1720  */
1721 struct GNUNET_CADET_Port *
1722 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1723                         const struct GNUNET_HashCode *port,
1724                         GNUNET_CADET_InboundChannelNotificationHandler
1725                             new_channel,
1726                         void *new_channel_cls)
1727 {
1728   struct GNUNET_CADET_PortMessage *msg;
1729   struct GNUNET_MQ_Envelope *env;
1730   struct GNUNET_CADET_Port *p;
1731
1732   GNUNET_assert (NULL != new_channel);
1733   p = GNUNET_new (struct GNUNET_CADET_Port);
1734   p->cadet = h;
1735   p->hash = GNUNET_new (struct GNUNET_HashCode);
1736   *p->hash = *port;
1737   p->handler = new_channel;
1738   p->cls = new_channel_cls;
1739   GNUNET_assert (GNUNET_OK ==
1740                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1741                                                     p->hash,
1742                                                     p,
1743                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1744
1745   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1746   msg->port = *p->hash;
1747   GNUNET_MQ_send (h->mq, env);
1748
1749   return p;
1750 }
1751
1752 /**
1753  * Close a port opened with @a GNUNET_CADET_open_port.
1754  * The @a new_channel callback will no longer be called.
1755  *
1756  * @param p Port handle.
1757  */
1758 void
1759 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1760 {
1761   struct GNUNET_CADET_PortMessage *msg;
1762   struct GNUNET_MQ_Envelope *env;
1763   struct GNUNET_HashCode *id;
1764
1765   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1766
1767   id = NULL != p->hash ? p->hash : &p->id;
1768   msg->port = *id;
1769   GNUNET_MQ_send (p->cadet->mq, env);
1770   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1771   GNUNET_free_non_null (p->hash);
1772   GNUNET_free (p);
1773 }
1774
1775
1776 /**
1777  * Create a new channel towards a remote peer.
1778  *
1779  * If the destination port is not open by any peer or the destination peer
1780  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1781  * for this channel.
1782  *
1783  * @param h cadet handle
1784  * @param channel_ctx client's channel context to associate with the channel
1785  * @param peer peer identity the channel should go to
1786  * @param port Port hash (port number).
1787  * @param options CadetOption flag field, with all desired option bits set to 1.
1788  * @return handle to the channel
1789  */
1790 struct GNUNET_CADET_Channel *
1791 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1792                             void *channel_ctx,
1793                             const struct GNUNET_PeerIdentity *peer,
1794                             const struct GNUNET_HashCode *port,
1795                             enum GNUNET_CADET_ChannelOption options)
1796 {
1797   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1798   struct GNUNET_MQ_Envelope *env;
1799   struct GNUNET_CADET_Channel *ch;
1800   struct GNUNET_CADET_ClientChannelNumber ccn;
1801
1802   ccn.channel_of_client = htonl (0);
1803   ch = create_channel (h, ccn);
1804   ch->ctx = channel_ctx;
1805   ch->peer = GNUNET_PEER_intern (peer);
1806
1807   LOG (GNUNET_ERROR_TYPE_DEBUG,
1808        "Creating new channel to %s:%u at %p number %X\n",
1809        GNUNET_i2s (peer),
1810        port,
1811        ch,
1812        ntohl (ch->ccn.channel_of_client));
1813   env = GNUNET_MQ_msg (msg,
1814                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1815   msg->ccn = ch->ccn;
1816   msg->port = *port;
1817   msg->peer = *peer;
1818   msg->opt = htonl (options);
1819   GNUNET_MQ_send (h->mq,
1820                   env);
1821   return ch;
1822 }
1823
1824
1825 void
1826 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1827 {
1828   struct GNUNET_CADET_Handle *h;
1829   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1830   struct GNUNET_MQ_Envelope *env;
1831   struct GNUNET_CADET_TransmitHandle *th;
1832   struct GNUNET_CADET_TransmitHandle *next;
1833
1834   LOG (GNUNET_ERROR_TYPE_DEBUG,
1835        "Destroying channel\n");
1836   h = channel->cadet;
1837   for  (th = h->th_head; th != NULL; th = next)
1838   {
1839     next = th->next;
1840     if (th->channel == channel)
1841     {
1842       GNUNET_break (0);
1843       if (GNUNET_YES == th_is_payload (th))
1844       {
1845         /* applications should cancel before destroying channel */
1846         LOG (GNUNET_ERROR_TYPE_WARNING,
1847              "Channel destroyed without cancelling transmission requests\n");
1848         th->notify (th->notify_cls, 0, NULL);
1849       }
1850       else
1851       {
1852         LOG (GNUNET_ERROR_TYPE_WARNING,
1853              "no meta-traffic should be queued\n");
1854       }
1855       GNUNET_CONTAINER_DLL_remove (h->th_head,
1856                                    h->th_tail,
1857                                    th);
1858       GNUNET_CADET_notify_transmit_ready_cancel (th);
1859     }
1860   }
1861
1862   env = GNUNET_MQ_msg (msg,
1863                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1864   msg->ccn = channel->ccn;
1865   GNUNET_MQ_send (h->mq,
1866                   env);
1867
1868   destroy_channel (channel);
1869 }
1870
1871
1872 /**
1873  * Get information about a channel.
1874  *
1875  * @param channel Channel handle.
1876  * @param option Query (GNUNET_CADET_OPTION_*).
1877  * @param ... dependant on option, currently not used
1878  *
1879  * @return Union with an answer to the query.
1880  */
1881 const union GNUNET_CADET_ChannelInfo *
1882 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1883                               enum GNUNET_CADET_ChannelOption option, ...)
1884 {
1885   static int bool_flag;
1886   const union GNUNET_CADET_ChannelInfo *ret;
1887
1888   switch (option)
1889   {
1890     case GNUNET_CADET_OPTION_NOBUFFER:
1891     case GNUNET_CADET_OPTION_RELIABLE:
1892     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1893       if (0 != (option & channel->options))
1894         bool_flag = GNUNET_YES;
1895       else
1896         bool_flag = GNUNET_NO;
1897       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1898       break;
1899     case GNUNET_CADET_OPTION_PEER:
1900       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1901       break;
1902     default:
1903       GNUNET_break (0);
1904       return NULL;
1905   }
1906
1907   return ret;
1908 }
1909
1910
1911 struct GNUNET_CADET_TransmitHandle *
1912 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
1913                                     int cork,
1914                                     struct GNUNET_TIME_Relative maxdelay,
1915                                     size_t notify_size,
1916                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
1917                                     void *notify_cls)
1918 {
1919   struct GNUNET_CADET_TransmitHandle *th;
1920
1921   GNUNET_assert (NULL != channel);
1922   GNUNET_assert (NULL != notify);
1923   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
1924   LOG (GNUNET_ERROR_TYPE_DEBUG,
1925        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
1926        ntohl (channel->ccn.channel_of_client),
1927        channel->allow_send,
1928        (ntohl (channel->ccn.channel_of_client) >=
1929         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1930        ? "origin"
1931        : "destination",
1932        (unsigned int) notify_size);
1933   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
1934   {
1935     LOG (GNUNET_ERROR_TYPE_WARNING,
1936          "CADET transmit ready timeout is deprected (has no effect)\n");
1937   }
1938
1939   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
1940   th->channel = channel;
1941   th->size = notify_size;
1942   th->notify = notify;
1943   th->notify_cls = notify_cls;
1944   if (0 != channel->allow_send)
1945     th->request_data_task
1946       = GNUNET_SCHEDULER_add_now (&request_data,
1947                                   th);
1948   else
1949     add_to_queue (channel->cadet,
1950                   th);
1951   return th;
1952 }
1953
1954
1955 void
1956 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
1957 {
1958   if (NULL != th->request_data_task)
1959   {
1960     GNUNET_SCHEDULER_cancel (th->request_data_task);
1961     th->request_data_task = NULL;
1962   }
1963   remove_from_queue (th);
1964   GNUNET_free (th);
1965 }
1966
1967
1968 /**
1969  * Send an ack on the channel to confirm the processing of a message.
1970  *
1971  * @param ch Channel on which to send the ACK.
1972  */
1973 void
1974 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
1975 {
1976   struct GNUNET_CADET_LocalAck *msg;
1977   struct GNUNET_MQ_Envelope *env;
1978
1979   env = GNUNET_MQ_msg (msg,
1980                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1981   LOG (GNUNET_ERROR_TYPE_DEBUG,
1982        "Sending ACK on channel %X\n",
1983        ntohl (channel->ccn.channel_of_client));
1984   msg->ccn = channel->ccn;
1985   GNUNET_MQ_send (channel->cadet->mq,
1986                   env);
1987 }
1988
1989
1990 static void
1991 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
1992 {
1993   struct GNUNET_MessageHeader *msg;
1994   struct GNUNET_MQ_Envelope *env;
1995
1996   LOG (GNUNET_ERROR_TYPE_DEBUG,
1997        " Sending %s monitor message to service\n",
1998        GC_m2s(type));
1999
2000   env = GNUNET_MQ_msg (msg, type);
2001   GNUNET_MQ_send (h->mq, env);
2002 }
2003
2004
2005 /**
2006  * Request a debug dump on the service's STDERR.
2007  *
2008  * WARNING: unstable API, likely to change in the future!
2009  *
2010  * @param h cadet handle
2011  */
2012 void
2013 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2014 {
2015   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2016   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2017 }
2018
2019
2020 /**
2021  * Request information about peers known to the running cadet service.
2022  * The callback will be called for every peer known to the service.
2023  * Only one info request (of any kind) can be active at once.
2024  *
2025  *
2026  * WARNING: unstable API, likely to change in the future!
2027  *
2028  * @param h Handle to the cadet peer.
2029  * @param callback Function to call with the requested data.
2030  * @param callback_cls Closure for @c callback.
2031  *
2032  * @return #GNUNET_OK / #GNUNET_SYSERR
2033  */
2034 int
2035 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2036                        GNUNET_CADET_PeersCB callback,
2037                        void *callback_cls)
2038 {
2039   if (NULL != h->info_cb.peers_cb)
2040   {
2041     GNUNET_break (0);
2042     return GNUNET_SYSERR;
2043   }
2044   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2045   h->info_cb.peers_cb = callback;
2046   h->info_cls = callback_cls;
2047   return GNUNET_OK;
2048 }
2049
2050
2051 /**
2052  * Cancel a peer info request. The callback will not be called (anymore).
2053  *
2054  * WARNING: unstable API, likely to change in the future!
2055  *
2056  * @param h Cadet handle.
2057  *
2058  * @return Closure given to GNUNET_CADET_get_peers.
2059  */
2060 void *
2061 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2062 {
2063   void *cls;
2064
2065   cls = h->info_cls;
2066   h->info_cb.peers_cb = NULL;
2067   h->info_cls = NULL;
2068   return cls;
2069 }
2070
2071
2072 /**
2073  * Request information about a peer known to the running cadet peer.
2074  * The callback will be called for the tunnel once.
2075  * Only one info request (of any kind) can be active at once.
2076  *
2077  * WARNING: unstable API, likely to change in the future!
2078  *
2079  * @param h Handle to the cadet peer.
2080  * @param id Peer whose tunnel to examine.
2081  * @param callback Function to call with the requested data.
2082  * @param callback_cls Closure for @c callback.
2083  *
2084  * @return #GNUNET_OK / #GNUNET_SYSERR
2085  */
2086 int
2087 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2088                        const struct GNUNET_PeerIdentity *id,
2089                        GNUNET_CADET_PeerCB callback,
2090                        void *callback_cls)
2091 {
2092   struct GNUNET_CADET_LocalInfo *msg;
2093   struct GNUNET_MQ_Envelope *env;
2094
2095   if (NULL != h->info_cb.peer_cb)
2096   {
2097     GNUNET_break (0);
2098     return GNUNET_SYSERR;
2099   }
2100
2101   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2102   msg->peer = *id;
2103   GNUNET_MQ_send (h->mq, env);
2104
2105   h->info_cb.peer_cb = callback;
2106   h->info_cls = callback_cls;
2107   return GNUNET_OK;
2108 }
2109
2110
2111 /**
2112  * Request information about tunnels of the running cadet peer.
2113  * The callback will be called for every tunnel of the service.
2114  * Only one info request (of any kind) can be active at once.
2115  *
2116  * WARNING: unstable API, likely to change in the future!
2117  *
2118  * @param h Handle to the cadet peer.
2119  * @param callback Function to call with the requested data.
2120  * @param callback_cls Closure for @c callback.
2121  *
2122  * @return #GNUNET_OK / #GNUNET_SYSERR
2123  */
2124 int
2125 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2126                          GNUNET_CADET_TunnelsCB callback,
2127                          void *callback_cls)
2128 {
2129   if (NULL != h->info_cb.tunnels_cb)
2130   {
2131     GNUNET_break (0);
2132     return GNUNET_SYSERR;
2133   }
2134   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2135   h->info_cb.tunnels_cb = callback;
2136   h->info_cls = callback_cls;
2137   return GNUNET_OK;
2138 }
2139
2140
2141 /**
2142  * Cancel a monitor request. The monitor callback will not be called.
2143  *
2144  * @param h Cadet handle.
2145  *
2146  * @return Closure given to GNUNET_CADET_get_tunnels.
2147  */
2148 void *
2149 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2150 {
2151   void *cls;
2152
2153   h->info_cb.tunnels_cb = NULL;
2154   cls = h->info_cls;
2155   h->info_cls = NULL;
2156
2157   return cls;
2158 }
2159
2160
2161
2162 /**
2163  * Request information about a tunnel of the running cadet peer.
2164  * The callback will be called for the tunnel once.
2165  * Only one info request (of any kind) can be active at once.
2166  *
2167  * WARNING: unstable API, likely to change in the future!
2168  *
2169  * @param h Handle to the cadet peer.
2170  * @param id Peer whose tunnel to examine.
2171  * @param callback Function to call with the requested data.
2172  * @param callback_cls Closure for @c callback.
2173  *
2174  * @return #GNUNET_OK / #GNUNET_SYSERR
2175  */
2176 int
2177 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2178                         const struct GNUNET_PeerIdentity *id,
2179                         GNUNET_CADET_TunnelCB callback,
2180                         void *callback_cls)
2181 {
2182   struct GNUNET_CADET_LocalInfo *msg;
2183   struct GNUNET_MQ_Envelope *env;
2184
2185   if (NULL != h->info_cb.tunnel_cb)
2186   {
2187     GNUNET_break (0);
2188     return GNUNET_SYSERR;
2189   }
2190
2191   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2192   msg->peer = *id;
2193   GNUNET_MQ_send (h->mq, env);
2194
2195   h->info_cb.tunnel_cb = callback;
2196   h->info_cls = callback_cls;
2197   return GNUNET_OK;
2198 }
2199
2200
2201 /**
2202  * Request information about a specific channel of the running cadet peer.
2203  *
2204  * WARNING: unstable API, likely to change in the future!
2205  * FIXME Add destination option.
2206  *
2207  * @param h Handle to the cadet peer.
2208  * @param initiator ID of the owner of the channel.
2209  * @param channel_number Channel number.
2210  * @param callback Function to call with the requested data.
2211  * @param callback_cls Closure for @c callback.
2212  *
2213  * @return #GNUNET_OK / #GNUNET_SYSERR
2214  */
2215 int
2216 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2217                            struct GNUNET_PeerIdentity *initiator,
2218                            unsigned int channel_number,
2219                            GNUNET_CADET_ChannelCB callback,
2220                            void *callback_cls)
2221 {
2222   struct GNUNET_CADET_LocalInfo *msg;
2223   struct GNUNET_MQ_Envelope *env;
2224
2225   if (NULL != h->info_cb.channel_cb)
2226   {
2227     GNUNET_break (0);
2228     return GNUNET_SYSERR;
2229   }
2230
2231   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2232   msg->peer = *initiator;
2233   msg->ccn.channel_of_client = htonl (channel_number);
2234   GNUNET_MQ_send (h->mq, env);
2235
2236   h->info_cb.channel_cb = callback;
2237   h->info_cls = callback_cls;
2238   return GNUNET_OK;
2239 }
2240
2241
2242 /**
2243  * Function called to notify a client about the connection
2244  * begin ready to queue more data.  "buf" will be
2245  * NULL and "size" zero if the connection was closed for
2246  * writing in the meantime.
2247  *
2248  * @param cls closure
2249  * @param size number of bytes available in buf
2250  * @param buf where the callee should write the message
2251  * @return number of bytes written to buf
2252  */
2253 static size_t
2254 cadet_mq_ntr (void *cls, size_t size,
2255              void *buf)
2256 {
2257   struct GNUNET_MQ_Handle *mq = cls;
2258   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2259   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2260   uint16_t msize;
2261
2262   state->th = NULL;
2263   if (NULL == buf)
2264   {
2265     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2266     return 0;
2267   }
2268   msize = ntohs (msg->size);
2269   GNUNET_assert (msize <= size);
2270   GNUNET_memcpy (buf, msg, msize);
2271   GNUNET_MQ_impl_send_continue (mq);
2272   return msize;
2273 }
2274
2275
2276 /**
2277  * Signature of functions implementing the
2278  * sending functionality of a message queue.
2279  *
2280  * @param mq the message queue
2281  * @param msg the message to send
2282  * @param impl_state state of the implementation
2283  */
2284 static void
2285 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2286                         const struct GNUNET_MessageHeader *msg,
2287                         void *impl_state)
2288 {
2289   struct CadetMQState *state = impl_state;
2290
2291   GNUNET_assert (NULL == state->th);
2292   state->th =
2293       GNUNET_CADET_notify_transmit_ready (state->channel,
2294                                          /* FIXME: add option for corking */
2295                                          GNUNET_NO,
2296                                          GNUNET_TIME_UNIT_FOREVER_REL,
2297                                          ntohs (msg->size),
2298                                          &cadet_mq_ntr, mq);
2299
2300 }
2301
2302
2303 /**
2304  * Signature of functions implementing the
2305  * destruction of a message queue.
2306  * Implementations must not free 'mq', but should
2307  * take care of 'impl_state'.
2308  *
2309  * @param mq the message queue to destroy
2310  * @param impl_state state of the implementation
2311  */
2312 static void
2313 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2314                            void *impl_state)
2315 {
2316   struct CadetMQState *state = impl_state;
2317
2318   if (NULL != state->th)
2319     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2320
2321   GNUNET_free (state);
2322 }
2323
2324
2325 /**
2326  * Create a message queue for a cadet channel.
2327  * The message queue can only be used to transmit messages,
2328  * not to receive them.
2329  *
2330  * @param channel the channel to create the message qeue for
2331  * @return a message queue to messages over the channel
2332  */
2333 struct GNUNET_MQ_Handle *
2334 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2335 {
2336   struct GNUNET_MQ_Handle *mq;
2337   struct CadetMQState *state;
2338
2339   state = GNUNET_new (struct CadetMQState);
2340   state->channel = channel;
2341
2342   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2343                                       &cadet_mq_destroy_impl_old,
2344                                       NULL, /* FIXME: cancel impl. */
2345                                       state,
2346                                       NULL, /* no msg handlers */
2347                                       NULL, /* no err handlers */
2348                                       NULL); /* no handler cls */
2349   return mq;
2350 }
2351
2352
2353 /**
2354  * Transitional function to convert an unsigned int port to a hash value.
2355  * WARNING: local static value returned, NOT reentrant!
2356  * WARNING: do not use this function for new code!
2357  *
2358  * @param port Numerical port (unsigned int format).
2359  *
2360  * @return A GNUNET_HashCode usable for the new CADET API.
2361  */
2362 const struct GNUNET_HashCode *
2363 GC_u2h (uint32_t port)
2364 {
2365   static struct GNUNET_HashCode hash;
2366
2367   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2368               "This is a transitional function, "
2369               "use proper crypto hashes as CADET ports\n");
2370   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2371
2372   return &hash;
2373 }
2374
2375
2376
2377 /******************************************************************************/
2378 /******************************* MQ-BASED API *********************************/
2379 /******************************************************************************/
2380
2381 /**
2382  * Connect to the MQ-based cadet service.
2383  *
2384  * @param cfg Configuration to use.
2385  *
2386  * @return Handle to the cadet service NULL on error.
2387  */
2388 struct GNUNET_CADET_Handle *
2389 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2390 {
2391   struct GNUNET_CADET_Handle *h;
2392
2393   LOG (GNUNET_ERROR_TYPE_DEBUG,
2394        "GNUNET_CADET_connecT()\n");
2395   h = GNUNET_new (struct GNUNET_CADET_Handle);
2396   h->cfg = cfg;
2397   h->mq_api = GNUNET_YES;
2398   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2399   reconnect (h);
2400   if (NULL == h->mq)
2401   {
2402     GNUNET_break (0);
2403     GNUNET_CADET_disconnect (h);
2404     return NULL;
2405   }
2406   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2407   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2408   h->reconnect_task = NULL;
2409
2410   return h;
2411 }
2412
2413
2414 /**
2415  * Open a port to receive incomming MQ-based channels.
2416  *
2417  * @param h CADET handle.
2418  * @param port Hash identifying the port.
2419  * @param connects Function called when an incoming channel is connected.
2420  * @param connects_cls Closure for the @a connects handler.
2421  * @param window_changes Function called when the transmit window size changes.
2422  * @param disconnects Function called when a channel is disconnected.
2423  * @param handlers Callbacks for messages we care about, NULL-terminated.
2424  *
2425  * @return Port handle.
2426  */
2427 struct GNUNET_CADET_Port *
2428 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2429                         const struct GNUNET_HashCode *port,
2430                         GNUNET_CADET_ConnectEventHandler connects,
2431                         void * connects_cls,
2432                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2433                         GNUNET_CADET_DisconnectEventHandler disconnects,
2434                         const struct GNUNET_MQ_MessageHandler *handlers)
2435 {
2436   struct GNUNET_CADET_PortMessage *msg;
2437   struct GNUNET_MQ_Envelope *env;
2438   struct GNUNET_CADET_Port *p;
2439
2440   GNUNET_assert (NULL != connects);
2441   GNUNET_assert (NULL != disconnects);
2442
2443   p = GNUNET_new (struct GNUNET_CADET_Port);
2444   p->cadet = h;
2445   p->id = *port;
2446   p->connects = connects;
2447   p->cls = connects_cls;
2448   p->window_changes = window_changes;
2449   p->disconnects = disconnects;
2450   p->handlers = handlers;
2451
2452   GNUNET_assert (GNUNET_OK ==
2453                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2454                                                     p->hash,
2455                                                     p,
2456                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2457
2458   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2459   msg->port = p->id;
2460   GNUNET_MQ_send (h->mq, env);
2461
2462   return p;
2463 }
2464
2465
2466 /**
2467  * Create a new channel towards a remote peer.
2468  *
2469  * If the destination port is not open by any peer or the destination peer
2470  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2471  * for this channel.
2472  *
2473  * @param h CADET handle.
2474  * @param channel_cls Closure for the channel. It's given to:
2475  *                    - The disconnect handler @a disconnects
2476  *                    - Each message type callback in @a handlers
2477  * @param destination Peer identity the channel should go to.
2478  * @param port Identification of the destination port.
2479  * @param options CadetOption flag field, with all desired option bits set to 1.
2480  * @param window_changes Function called when the transmit window size changes.
2481  * @param disconnects Function called when the channel is disconnected.
2482  * @param handlers Callbacks for messages we care about, NULL-terminated.
2483  *
2484  * @return Handle to the channel.
2485  */
2486 struct GNUNET_CADET_Channel *
2487 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2488                              void *channel_cls,
2489                              const struct GNUNET_PeerIdentity *destination,
2490                              const struct GNUNET_HashCode *port,
2491                              enum GNUNET_CADET_ChannelOption options,
2492                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2493                              GNUNET_CADET_DisconnectEventHandler disconnects,
2494                              const struct GNUNET_MQ_MessageHandler *handlers)
2495 {
2496   struct GNUNET_CADET_Channel *ch;
2497   struct GNUNET_CADET_ClientChannelNumber ccn;
2498   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2499   struct GNUNET_MQ_Envelope *env;
2500
2501   GNUNET_assert (NULL != disconnects);
2502
2503   /* Save parameters */
2504   ccn.channel_of_client = htonl (0);
2505   ch = create_channel (h, ccn);
2506   ch->ctx = channel_cls;
2507   ch->peer = GNUNET_PEER_intern (destination);
2508   ch->options = options;
2509   ch->window_changes = window_changes;
2510   ch->disconnects = disconnects;
2511
2512   /* Create MQ for channel */
2513   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2514                                           &cadet_mq_destroy_impl,
2515                                           &cadet_mq_cancel_impl,
2516                                           ch,
2517                                           handlers,
2518                                           &cadet_mq_error_handler,
2519                                           ch);
2520   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2521
2522   /* Request channel creation to service */
2523   env = GNUNET_MQ_msg (msg,
2524                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2525   msg->ccn = ch->ccn;
2526   msg->port = *port;
2527   msg->peer = *destination;
2528   msg->opt = htonl (options);
2529   GNUNET_MQ_send (h->mq,
2530                   env);
2531
2532   return ch;
2533 }
2534
2535
2536 /**
2537  * Obtain the message queue for a connected peer.
2538  *
2539  * @param channel The channel handle from which to get the MQ.
2540  *
2541  * @return NULL if @a channel is not yet connected.
2542  */
2543 struct GNUNET_MQ_Handle *
2544 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2545 {
2546   return channel->mq;
2547 }