note about performance issue, indentation fix
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Task to allow mq to send more traffic.
295    */
296   struct GNUNET_SCHEDULER_Task *mq_cont;
297
298   /**
299    * Pending envelope in case we don't have an ACK from the service.
300    */
301   struct GNUNET_MQ_Envelope *pending_env;
302
303   /**
304    * Window change handler.
305    */
306   GNUNET_CADET_WindowSizeEventHandler window_changes;
307
308   /**
309    * Disconnect handler.
310    */
311   GNUNET_CADET_DisconnectEventHandler disconnects;
312
313 };
314
315
316 /**
317  * Opaque handle to a port.
318  */
319 struct GNUNET_CADET_Port
320 {
321   /**
322    * Handle to the CADET session this port belongs to.
323    */
324   struct GNUNET_CADET_Handle *cadet;
325
326   /**
327    * Port ID.
328    *
329    * @deprecated
330    */
331   struct GNUNET_HashCode *hash;
332
333   /**
334    * Callback handler for incoming channels on this port.
335    */
336   GNUNET_CADET_InboundChannelNotificationHandler *handler;
337
338   /**
339    * Closure for @a handler.
340    */
341   void *cls;
342
343   /*****************************    MQ     ************************************/
344
345   /**
346    * Port "number"
347    */
348   struct GNUNET_HashCode id;
349
350   /**
351    * Handler for incoming channels on this port
352    */
353   GNUNET_CADET_ConnectEventHandler connects;
354
355   /**
356    * Closure for @ref connects
357    */
358   void * connects_cls;
359
360   /**
361    * Window size change handler.
362    */
363   GNUNET_CADET_WindowSizeEventHandler window_changes;
364
365   /**
366    * Handler called when an incoming channel is destroyed..
367    */
368   GNUNET_CADET_DisconnectEventHandler disconnects;
369
370   /**
371    * Payload handlers for incoming channels.
372    */
373   const struct GNUNET_MQ_MessageHandler *handlers;
374 };
375
376
377 /**
378  * Implementation state for cadet's message queue.
379  */
380 struct CadetMQState
381 {
382   /**
383    * The current transmit handle, or NULL
384    * if no transmit is active.
385    */
386   struct GNUNET_CADET_TransmitHandle *th;
387
388   /**
389    * Channel to send the data over.
390    */
391   struct GNUNET_CADET_Channel *channel;
392 };
393
394
395
396 /******************************************************************************/
397 /*********************      FUNCTION DECLARATIONS     *************************/
398 /******************************************************************************/
399
400 /**
401  * Reconnect to the service, retransmit all infomation to try to restore the
402  * original state.
403  *
404  * @param h Handle to the CADET service.
405  */
406 static void
407 schedule_reconnect (struct GNUNET_CADET_Handle *h);
408
409
410 /**
411  * Reconnect callback: tries to reconnect again after a failer previous
412  * reconnection.
413  *
414  * @param cls Closure (cadet handle).
415  */
416 static void
417 reconnect_cbk (void *cls);
418
419
420 /**
421  * Reconnect to the service, retransmit all infomation to try to restore the
422  * original state.
423  *
424  * @param h handle to the cadet
425  */
426 static void
427 reconnect (struct GNUNET_CADET_Handle *h);
428
429
430 /******************************************************************************/
431 /***********************     AUXILIARY FUNCTIONS      *************************/
432 /******************************************************************************/
433
434 /**
435  * Check if transmission is a payload packet.
436  *
437  * @param th Transmission handle.
438  *
439  * @return #GNUNET_YES if it is a payload packet,
440  *         #GNUNET_NO if it is a cadet management packet.
441  */
442 static int
443 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
444 {
445   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
446 }
447
448
449 /**
450  * Find the Port struct for a hash.
451  *
452  * @param h CADET handle.
453  * @param hash HashCode for the port number.
454  *
455  * @return The port handle if known, NULL otherwise.
456  */
457 static struct GNUNET_CADET_Port *
458 find_port (const struct GNUNET_CADET_Handle *h,
459            const struct GNUNET_HashCode *hash)
460 {
461   struct GNUNET_CADET_Port *p;
462
463   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
464
465   return p;
466 }
467
468
469 /**
470  * Get the channel handler for the channel specified by id from the given handle
471  *
472  * @param h Cadet handle
473  * @param ccn ID of the wanted channel
474  * @return handle to the required channel or NULL if not found
475  */
476 static struct GNUNET_CADET_Channel *
477 retrieve_channel (struct GNUNET_CADET_Handle *h,
478                   struct GNUNET_CADET_ClientChannelNumber ccn)
479 {
480   struct GNUNET_CADET_Channel *ch;
481
482   for (ch = h->channels_head; NULL != ch; ch = ch->next)
483     if (ch->ccn.channel_of_client == ccn.channel_of_client)
484       return ch;
485   return NULL;
486 }
487
488
489 /**
490  * Create a new channel and insert it in the channel list of the cadet handle
491  *
492  * @param h Cadet handle
493  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
494  *
495  * @return Handle to the created channel.
496  */
497 static struct GNUNET_CADET_Channel *
498 create_channel (struct GNUNET_CADET_Handle *h,
499                 struct GNUNET_CADET_ClientChannelNumber ccn)
500 {
501   struct GNUNET_CADET_Channel *ch;
502
503   ch = GNUNET_new (struct GNUNET_CADET_Channel);
504   GNUNET_CONTAINER_DLL_insert (h->channels_head,
505                                h->channels_tail,
506                                ch);
507   ch->cadet = h;
508   if (0 == ccn.channel_of_client)
509   {
510     ch->ccn = h->next_ccn;
511     while (NULL != retrieve_channel (h,
512                                      h->next_ccn))
513     {
514       h->next_ccn.channel_of_client
515         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
516       if (0 == ntohl (h->next_ccn.channel_of_client))
517         h->next_ccn.channel_of_client
518           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
519     }
520   }
521   else
522   {
523     ch->ccn = ccn;
524   }
525   return ch;
526 }
527
528
529 /**
530  * Destroy the specified channel.
531  * - Destroys all peers, calling the disconnect callback on each if needed
532  * - Cancels all outgoing traffic for that channel, calling respective notifys
533  * - Calls cleaner if channel was inbound
534  * - Frees all memory used
535  *
536  * @param ch Pointer to the channel.
537  * @param call_cleaner Whether to call the cleaner handler.
538  *
539  * @return Handle to the required channel or NULL if not found.
540  */
541 static void
542 destroy_channel (struct GNUNET_CADET_Channel *ch)
543 {
544   struct GNUNET_CADET_Handle *h;
545   struct GNUNET_CADET_TransmitHandle *th;
546   struct GNUNET_CADET_TransmitHandle *next;
547
548   if (NULL == ch)
549   {
550     GNUNET_break (0);
551     return;
552   }
553   h = ch->cadet;
554   LOG (GNUNET_ERROR_TYPE_DEBUG,
555        " destroy_channel %X of %p\n",
556        ch->ccn,
557        h);
558
559   GNUNET_CONTAINER_DLL_remove (h->channels_head,
560                                h->channels_tail,
561                                ch);
562   if (NULL != ch->mq_cont)
563   {
564     GNUNET_SCHEDULER_cancel (ch->mq_cont);
565     ch->mq_cont = NULL;
566   }
567   /* signal channel destruction */
568   if (0 != ch->peer)
569   {
570     if (NULL != h->cleaner)
571     {
572       /** @a deprecated  */
573       LOG (GNUNET_ERROR_TYPE_DEBUG,
574           " calling cleaner\n");
575       h->cleaner (h->cls, ch, ch->ctx);
576     }
577     else if (NULL != ch->disconnects)
578     {
579       LOG (GNUNET_ERROR_TYPE_DEBUG,
580           " calling disconnect handler\n");
581       ch->disconnects (ch->ctx, ch);
582     }
583     else
584     {
585       /* Application won't be aware of the channel destruction and use
586        * a pointer to free'd memory.
587        */
588       GNUNET_assert (0);
589     }
590   }
591
592   /* check that clients did not leave messages behind in the queue */
593   for (th = h->th_head; NULL != th; th = next)
594   {
595     next = th->next;
596     if (th->channel != ch)
597       continue;
598     /* Clients should have aborted their requests already.
599      * Management traffic should be ok, as clients can't cancel that.
600      * If the service crashed and we are reconnecting, it's ok.
601      */
602     GNUNET_break (GNUNET_NO == th_is_payload (th));
603     GNUNET_CADET_notify_transmit_ready_cancel (th);
604   }
605
606   if (0 != ch->peer)
607     GNUNET_PEER_change_rc (ch->peer, -1);
608   GNUNET_free (ch);
609 }
610
611
612 /**
613  * Add a transmit handle to the transmission queue and set the
614  * timeout if needed.
615  *
616  * @param h cadet handle with the queue head and tail
617  * @param th handle to the packet to be transmitted
618  */
619 static void
620 add_to_queue (struct GNUNET_CADET_Handle *h,
621               struct GNUNET_CADET_TransmitHandle *th)
622 {
623   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
624                                     h->th_tail,
625                                     th);
626 }
627
628
629 /**
630  * Remove a transmit handle from the transmission queue, if present.
631  *
632  * Safe to call even if not queued.
633  *
634  * @param th handle to the packet to be unqueued.
635  */
636 static void
637 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
638 {
639   struct GNUNET_CADET_Handle *h = th->channel->cadet;
640
641   /* It might or might not have been queued (rarely not), but check anyway. */
642   if (NULL != th->next || h->th_tail == th)
643   {
644     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
645   }
646 }
647
648
649 /**
650  * Notify the application about a change in the window size (if needed).
651  *
652  * @param ch Channel to notify about.
653  */
654 static void
655 notify_window_size (struct GNUNET_CADET_Channel *ch)
656 {
657   if (NULL != ch->window_changes)
658   {
659     ch->window_changes (ch->ctx, ch, ch->allow_send);
660   }
661 }
662
663 /******************************************************************************/
664 /***********************      MQ API CALLBACKS     ****************************/
665 /******************************************************************************/
666
667 /**
668  * Allow the MQ implementation to send the next message.
669  *
670  * @param cls Closure (channel whose mq to activate).
671  */
672 static void
673 cadet_mq_send_continue (void *cls)
674 {
675   struct GNUNET_CADET_Channel *ch = cls;
676
677   ch->mq_cont = NULL;
678   GNUNET_MQ_impl_send_continue (ch->mq);
679 }
680
681 /**
682  * Implement sending functionality of a message queue for
683  * us sending messages to a peer.
684  *
685  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
686  * in order to label the message with the channel ID and send the
687  * encapsulated message to the service.
688  *
689  * @param mq the message queue
690  * @param msg the message to send
691  * @param impl_state state of the implementation
692  */
693 static void
694 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
695                     const struct GNUNET_MessageHeader *msg,
696                     void *impl_state)
697 {
698   struct GNUNET_CADET_Channel *ch = impl_state;
699   struct GNUNET_CADET_Handle *h = ch->cadet;
700   uint16_t msize;
701   struct GNUNET_MQ_Envelope *env;
702   struct GNUNET_CADET_LocalData *cadet_msg;
703
704
705   if (NULL == h->mq)
706   {
707     /* We're currently reconnecting, pretend this worked */
708     GNUNET_MQ_impl_send_continue (mq);
709     return;
710   }
711
712   /* check message size for sanity */
713   msize = ntohs (msg->size);
714   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
715   {
716     GNUNET_break (0);
717     GNUNET_MQ_impl_send_continue (mq);
718     return;
719   }
720
721   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
722                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
723                                  msg);
724   cadet_msg->ccn = ch->ccn;
725
726   if (0 < ch->allow_send)
727   {
728     /* Service has allowed this message, just send it and continue accepting */
729     GNUNET_MQ_send (h->mq, env);
730     ch->allow_send--;
731     ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
732     // notify_window_size (ch); /* FIXME add "verbose" setting? */
733   }
734   else
735   {
736     /* Service has NOT allowed this message, queue it and wait for an ACK */
737     GNUNET_assert (NULL == ch->pending_env);
738     ch->pending_env = env;
739   }
740 }
741
742
743 /**
744  * Handle destruction of a message queue.  Implementations must not
745  * free @a mq, but should take care of @a impl_state.
746  *
747  * @param mq the message queue to destroy
748  * @param impl_state state of the implementation
749  */
750 static void
751 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
752                        void *impl_state)
753 {
754   struct GNUNET_CADET_Channel *ch = impl_state;
755
756   GNUNET_assert (mq == ch->mq);
757   ch->mq = NULL;
758 }
759
760
761 /**
762  * We had an error processing a message we forwarded from a peer to
763  * the CADET service.  We should just complain about it but otherwise
764  * continue processing.
765  *
766  * @param cls closure
767  * @param error error code
768  */
769 static void
770 cadet_mq_error_handler (void *cls,
771                         enum GNUNET_MQ_Error error)
772 {
773   GNUNET_break_op (0);
774 }
775
776
777 /**
778  * Implementation function that cancels the currently sent message.
779  * Should basically undo whatever #mq_send_impl() did.
780  *
781  * @param mq message queue
782  * @param impl_state state specific to the implementation
783  */
784
785 static void
786 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
787                      void *impl_state)
788 {
789   struct GNUNET_CADET_Channel *ch = impl_state;
790
791   LOG (GNUNET_ERROR_TYPE_WARNING,
792        "Cannot cancel mq message on channel %X of %p\n",
793        ch->ccn.channel_of_client, ch->cadet);
794
795   GNUNET_break (0);
796 }
797
798
799 /******************************************************************************/
800 /***********************      RECEIVE HANDLERS     ****************************/
801 /******************************************************************************/
802
803
804 /**
805  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
806  * request the data to send over MQ. Since MQ manages the queue, this function
807  * is scheduled immediatly after a transmit ready notification.
808  *
809  * @param cls Closure (transmit handle).
810  */
811 static void
812 request_data (void *cls)
813 {
814   struct GNUNET_CADET_TransmitHandle *th = cls;
815   struct GNUNET_CADET_LocalData *msg;
816   struct GNUNET_MQ_Envelope *env;
817   size_t osize;
818
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Requesting Data: %u bytes (allow send is %u)\n",
821        th->size,
822        th->channel->allow_send);
823
824   GNUNET_assert (0 < th->channel->allow_send);
825   th->channel->allow_send--;
826   /* NOTE: we may be allowed to send another packet immediately,
827      albeit the current logic waits for the ACK. */
828   th->request_data_task = NULL;
829   remove_from_queue (th);
830
831   env = GNUNET_MQ_msg_extra (msg,
832                              th->size,
833                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
834   msg->ccn = th->channel->ccn;
835   osize = th->notify (th->notify_cls,
836                       th->size,
837                       &msg[1]);
838   GNUNET_assert (osize == th->size);
839
840   GNUNET_MQ_send (th->channel->cadet->mq,
841                   env);
842   GNUNET_free (th);
843 }
844
845
846 /**
847  * Process the new channel notification and add it to the channels in the handle
848  *
849  * @param h     The cadet handle
850  * @param msg   A message with the details of the new incoming channel
851  */
852 static void
853 handle_channel_created (void *cls,
854                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
855 {
856   struct GNUNET_CADET_Handle *h = cls;
857   struct GNUNET_CADET_Channel *ch;
858   struct GNUNET_CADET_Port *port;
859   const struct GNUNET_HashCode *port_number;
860   struct GNUNET_CADET_ClientChannelNumber ccn;
861
862   ccn = msg->ccn;
863   port_number = &msg->port;
864   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
865   {
866     GNUNET_break (0);
867     return;
868   }
869   port = find_port (h, port_number);
870   if (NULL == port)
871   {
872     /* We could have closed the port but the service didn't know about it yet
873      * This is not an error.
874      */
875     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
876     struct GNUNET_MQ_Envelope *env;
877
878     GNUNET_break (0);
879     LOG (GNUNET_ERROR_TYPE_DEBUG,
880          "No handler for incoming channel %X (on port %s, recently closed?)\n",
881          ntohl (ccn.channel_of_client),
882          GNUNET_h2s (port_number));
883     env = GNUNET_MQ_msg (d_msg,
884                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
885     d_msg->ccn = msg->ccn;
886     GNUNET_MQ_send (h->mq,
887                     env);
888     return;
889   }
890
891   ch = create_channel (h,
892                        ccn);
893   ch->peer = GNUNET_PEER_intern (&msg->peer);
894   ch->cadet = h;
895   ch->ccn = ccn;
896   ch->incoming_port = port;
897   ch->options = ntohl (msg->opt);
898   LOG (GNUNET_ERROR_TYPE_DEBUG,
899        "Creating incoming channel %X [%s] %p\n",
900        ntohl (ccn.channel_of_client),
901        GNUNET_h2s (port_number),
902        ch);
903
904   if (NULL != port->handler)
905   {
906     /** @deprecated */
907     /* Old style API */
908     ch->ctx = port->handler (port->cls,
909                              ch,
910                              &msg->peer,
911                              port->hash,
912                              ch->options);
913   }
914   else
915   {
916     /* MQ API */
917     GNUNET_assert (NULL != port->connects);
918     ch->window_changes = port->window_changes;
919     ch->disconnects = port->disconnects;
920     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
921                                             &cadet_mq_destroy_impl,
922                                             &cadet_mq_cancel_impl,
923                                             ch,
924                                             port->handlers,
925                                             &cadet_mq_error_handler,
926                                             ch);
927     ch->ctx = port->connects (port->cls,
928                               ch,
929                               &msg->peer);
930     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
931   }
932 }
933
934
935 /**
936  * Process the channel destroy notification and free associated resources
937  *
938  * @param h     The cadet handle
939  * @param msg   A message with the details of the channel being destroyed
940  */
941 static void
942 handle_channel_destroy (void *cls,
943                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
944 {
945   struct GNUNET_CADET_Handle *h = cls;
946   struct GNUNET_CADET_Channel *ch;
947   struct GNUNET_CADET_ClientChannelNumber ccn;
948
949   ccn = msg->ccn;
950   LOG (GNUNET_ERROR_TYPE_DEBUG,
951        "Channel %X Destroy from service\n",
952        ntohl (ccn.channel_of_client));
953   ch = retrieve_channel (h,
954                          ccn);
955
956   if (NULL == ch)
957   {
958     LOG (GNUNET_ERROR_TYPE_DEBUG,
959          "channel %X unknown\n",
960          ntohl (ccn.channel_of_client));
961     return;
962   }
963   destroy_channel (ch);
964 }
965
966
967 /**
968  * Check that message received from CADET service is well-formed.
969  *
970  * @param cls the `struct GNUNET_CADET_Handle`
971  * @param message the message we got
972  * @return #GNUNET_OK if the message is well-formed,
973  *         #GNUNET_SYSERR otherwise
974  */
975 static int
976 check_local_data (void *cls,
977                   const struct GNUNET_CADET_LocalData *message)
978 {
979   struct GNUNET_CADET_Handle *h = cls;
980   struct GNUNET_CADET_Channel *ch;
981   uint16_t size;
982
983   size = ntohs (message->header.size);
984   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
985   {
986     GNUNET_break_op (0);
987     return GNUNET_SYSERR;
988   }
989
990   ch = retrieve_channel (h,
991                          message->ccn);
992   if (NULL == ch)
993   {
994     GNUNET_break_op (0);
995     return GNUNET_SYSERR;
996   }
997
998   return GNUNET_OK;
999 }
1000
1001
1002 /**
1003  * Process the incoming data packets, call appropriate handlers.
1004  *
1005  * @param h       The cadet handle
1006  * @param message A message encapsulating the data
1007  */
1008 static void
1009 handle_local_data (void *cls,
1010                    const struct GNUNET_CADET_LocalData *message)
1011 {
1012   struct GNUNET_CADET_Handle *h = cls;
1013   const struct GNUNET_MessageHeader *payload;
1014   const struct GNUNET_CADET_MessageHandler *handler;
1015   struct GNUNET_CADET_Channel *ch;
1016   uint16_t type;
1017   int fwd;
1018
1019   ch = retrieve_channel (h,
1020                          message->ccn);
1021   if (NULL == ch)
1022   {
1023     GNUNET_break_op (0);
1024     reconnect (h);
1025     return;
1026   }
1027
1028   payload = (struct GNUNET_MessageHeader *) &message[1];
1029   type = ntohs (payload->type);
1030   fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032        "Got a %s data on channel %s [%X] of type %s (%u)\n",
1033        GC_f2s (fwd),
1034        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
1035        ntohl (message->ccn.channel_of_client),
1036        GC_m2s (type),
1037        type);
1038   if (NULL != ch->mq)
1039   {
1040     LOG (GNUNET_ERROR_TYPE_DEBUG,
1041          "injecting msg %s into mq %p\n",
1042          GC_m2s (ntohs (payload->type)),
1043          ch->mq);
1044     GNUNET_MQ_inject_message (ch->mq,
1045                               payload);
1046     return;
1047   }
1048   /** @a deprecated */
1049   for (unsigned i=0;i<h->n_handlers;i++)
1050   {
1051     handler = &h->message_handlers[i];
1052     if (handler->type == type)
1053     {
1054       if (GNUNET_OK !=
1055           handler->callback (h->cls,
1056                              ch,
1057                              &ch->ctx,
1058                              payload))
1059       {
1060         LOG (GNUNET_ERROR_TYPE_DEBUG,
1061              "callback caused disconnection\n");
1062         GNUNET_CADET_channel_destroy (ch);
1063         return;
1064       }
1065       return;
1066     }
1067   }
1068   /* Other peer sent message we do not comprehend. */
1069   GNUNET_break_op (0);
1070   GNUNET_CADET_receive_done (ch);
1071 }
1072
1073
1074 /**
1075  * Process a local ACK message, enabling the client to send
1076  * more data to the service.
1077  *
1078  * @param h Cadet handle.
1079  * @param message Message itself.
1080  */
1081 static void
1082 handle_local_ack (void *cls,
1083                   const struct GNUNET_CADET_LocalAck *message)
1084 {
1085   struct GNUNET_CADET_Handle *h = cls;
1086   struct GNUNET_CADET_Channel *ch;
1087   struct GNUNET_CADET_ClientChannelNumber ccn;
1088   struct GNUNET_CADET_TransmitHandle *th;
1089
1090   ccn = message->ccn;
1091   ch = retrieve_channel (h, ccn);
1092   if (NULL == ch)
1093   {
1094     LOG (GNUNET_ERROR_TYPE_DEBUG,
1095          "ACK on unknown channel %X\n",
1096          ntohl (ccn.channel_of_client));
1097     return;
1098   }
1099   ch->allow_send++;
1100   if (NULL != ch->mq)
1101   {
1102     if (NULL == ch->pending_env)
1103     {
1104       LOG (GNUNET_ERROR_TYPE_DEBUG,
1105            "Got an ACK on mq channel %X, allow send now %u!\n",
1106            ntohl (ch->ccn.channel_of_client),
1107            ch->allow_send);
1108       notify_window_size (ch);
1109     }
1110     else
1111     {
1112       LOG (GNUNET_ERROR_TYPE_DEBUG,
1113            "Got an ACK on mq channel %X, sending pending message!\n",
1114            ntohl (ch->ccn.channel_of_client));
1115       GNUNET_MQ_send (h->mq, ch->pending_env);
1116       ch->allow_send--;
1117       ch->pending_env = NULL;
1118       ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
1119     }
1120     return;
1121   }
1122
1123   /** @deprecated */
1124   /* Old style API */
1125   for (th = h->th_head; NULL != th; th = th->next)
1126   {
1127     if ( (th->channel == ch) &&
1128          (NULL == th->request_data_task) )
1129     {
1130       th->request_data_task
1131         = GNUNET_SCHEDULER_add_now (&request_data,
1132                                     th);
1133       break;
1134     }
1135   }
1136 }
1137
1138
1139 /**
1140  * Generic error handler, called with the appropriate error code and
1141  * the same closure specified at the creation of the message queue.
1142  * Not every message queue implementation supports an error handler.
1143  *
1144  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1145  * @param error error code
1146  */
1147 static void
1148 handle_mq_error (void *cls,
1149                  enum GNUNET_MQ_Error error)
1150 {
1151   struct GNUNET_CADET_Handle *h = cls;
1152
1153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1154   GNUNET_MQ_destroy (h->mq);
1155   h->mq = NULL;
1156   reconnect (h);
1157 }
1158
1159
1160 /*
1161  * Process a local reply about info on all channels, pass info to the user.
1162  *
1163  * @param h Cadet handle.
1164  * @param message Message itself.
1165  */
1166 // static void
1167 // process_get_channels (struct GNUNET_CADET_Handle *h,
1168 //                      const struct GNUNET_MessageHeader *message)
1169 // {
1170 //   struct GNUNET_CADET_LocalInfo *msg;
1171 //
1172 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1173 //
1174 //   if (NULL == h->channels_cb)
1175 //   {
1176 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1177 //     return;
1178 //   }
1179 //
1180 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1181 //   if (ntohs (message->size) !=
1182 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1183 //        sizeof (struct GNUNET_PeerIdentity)))
1184 //   {
1185 //     GNUNET_break_op (0);
1186 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1187 //                 "Get channels message: size %hu - expected %u\n",
1188 //                 ntohs (message->size),
1189 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1190 //     return;
1191 //   }
1192 //   h->channels_cb (h->channels_cls,
1193 //                   ntohl (msg->channel_id),
1194 //                   &msg->owner,
1195 //                   &msg->destination);
1196 // }
1197
1198
1199
1200 /*
1201  * Process a local monitor_channel reply, pass info to the user.
1202  *
1203  * @param h Cadet handle.
1204  * @param message Message itself.
1205  */
1206 // static void
1207 // process_show_channel (struct GNUNET_CADET_Handle *h,
1208 //                      const struct GNUNET_MessageHeader *message)
1209 // {
1210 //   struct GNUNET_CADET_LocalInfo *msg;
1211 //   size_t esize;
1212 //
1213 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1214 //
1215 //   if (NULL == h->channel_cb)
1216 //   {
1217 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1218 //     return;
1219 //   }
1220 //
1221 //   /* Verify message sanity */
1222 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1223 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1224 //   if (ntohs (message->size) != esize)
1225 //   {
1226 //     GNUNET_break_op (0);
1227 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1228 //                 "Show channel message: size %hu - expected %u\n",
1229 //                 ntohs (message->size),
1230 //                 esize);
1231 //
1232 //     h->channel_cb (h->channel_cls, NULL, NULL);
1233 //     h->channel_cb = NULL;
1234 //     h->channel_cls = NULL;
1235 //
1236 //     return;
1237 //   }
1238 //
1239 //   h->channel_cb (h->channel_cls,
1240 //                  &msg->destination,
1241 //                  &msg->owner);
1242 // }
1243
1244
1245
1246 /**
1247  * Check that message received from CADET service is well-formed.
1248  *
1249  * @param cls the `struct GNUNET_CADET_Handle`
1250  * @param message the message we got
1251  * @return #GNUNET_OK if the message is well-formed,
1252  *         #GNUNET_SYSERR otherwise
1253  */
1254 static int
1255 check_get_peers (void *cls,
1256                  const struct GNUNET_CADET_LocalInfoPeer *message)
1257 {
1258   struct GNUNET_CADET_Handle *h = cls;
1259   uint16_t size;
1260
1261   if (NULL == h->info_cb.peers_cb)
1262   {
1263     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1264                 "  no handler for peesr monitor message!\n");
1265     return GNUNET_SYSERR;
1266   }
1267
1268   size = ntohs (message->header.size);
1269   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1270   {
1271     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1272     h->info_cb.peers_cb = NULL;
1273     h->info_cls = NULL;
1274     return GNUNET_SYSERR;
1275   }
1276
1277   return GNUNET_OK;
1278 }
1279
1280
1281 /**
1282  * Process a local reply about info on all tunnels, pass info to the user.
1283  *
1284  * @param cls Closure (Cadet handle).
1285  * @param msg Message itself.
1286  */
1287 static void
1288 handle_get_peers (void *cls,
1289                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1290 {
1291   struct GNUNET_CADET_Handle *h = cls;
1292   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1293                        (int) ntohs (msg->tunnel),
1294                        (unsigned int ) ntohs (msg->paths),
1295                        0);
1296 }
1297
1298
1299 /**
1300  * Check that message received from CADET service is well-formed.
1301  *
1302  * @param cls the `struct GNUNET_CADET_Handle`
1303  * @param message the message we got
1304  * @return #GNUNET_OK if the message is well-formed,
1305  *         #GNUNET_SYSERR otherwise
1306  */
1307 static int
1308 check_get_peer (void *cls,
1309                 const struct GNUNET_CADET_LocalInfoPeer *message)
1310 {
1311   struct GNUNET_CADET_Handle *h = cls;
1312   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1313   struct GNUNET_PeerIdentity *paths_array;
1314   size_t esize;
1315   unsigned int epaths;
1316   unsigned int paths;
1317   unsigned int peers;
1318
1319   if (NULL == h->info_cb.peer_cb)
1320   {
1321     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1322                 "  no handler for peer monitor message!\n");
1323     goto clean_cls;
1324   }
1325
1326   /* Verify message sanity */
1327   esize = ntohs (message->header.size);
1328   if (esize < msize)
1329   {
1330     GNUNET_break_op (0);
1331     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1332     goto clean_cls;
1333   }
1334   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1335   {
1336     GNUNET_break_op (0);
1337     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1338     goto clean_cls;
1339
1340   }
1341   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1342   epaths = (unsigned int) ntohs (message->paths);
1343   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1344   paths = 0;
1345   for (int i = 0; i < peers; i++)
1346   {
1347     if (0 == memcmp (&paths_array[i], &message->destination,
1348                      sizeof (struct GNUNET_PeerIdentity)))
1349     {
1350       paths++;
1351     }
1352   }
1353   if (paths != epaths)
1354   {
1355     GNUNET_break_op (0);
1356     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1357     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1358     goto clean_cls;
1359   }
1360
1361   return GNUNET_OK;
1362
1363 clean_cls:
1364   h->info_cb.peer_cb = NULL;
1365   h->info_cls = NULL;
1366   return GNUNET_SYSERR;
1367 }
1368
1369
1370 /**
1371  * Process a local peer info reply, pass info to the user.
1372  *
1373  * @param cls Closure (Cadet handle).
1374  * @param message Message itself.
1375  */
1376 static void
1377 handle_get_peer (void *cls,
1378                  const struct GNUNET_CADET_LocalInfoPeer *message)
1379 {
1380   struct GNUNET_CADET_Handle *h = cls;
1381   struct GNUNET_PeerIdentity *paths_array;
1382   unsigned int paths;
1383   unsigned int path_length;
1384   int neighbor;
1385   unsigned int peers;
1386
1387   paths = (unsigned int) ntohs (message->paths);
1388   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1389   peers = (ntohs (message->header.size) - sizeof (*message))
1390           / sizeof (struct GNUNET_PeerIdentity);
1391   path_length = 0;
1392   neighbor = GNUNET_NO;
1393
1394   for (int i = 0; i < peers; i++)
1395   {
1396     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1397     path_length++;
1398     if (0 == memcmp (&paths_array[i], &message->destination,
1399                      sizeof (struct GNUNET_PeerIdentity)))
1400     {
1401       if (1 == path_length)
1402         neighbor = GNUNET_YES;
1403       path_length = 0;
1404     }
1405   }
1406
1407   /* Call Callback with tunnel info. */
1408   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1409   h->info_cb.peer_cb (h->info_cls,
1410                       &message->destination,
1411                       (int) ntohs (message->tunnel),
1412                       neighbor,
1413                       paths,
1414                       paths_array);
1415 }
1416
1417
1418 /**
1419  * Check that message received from CADET service is well-formed.
1420  *
1421  * @param cls the `struct GNUNET_CADET_Handle`
1422  * @param msg the message we got
1423  * @return #GNUNET_OK if the message is well-formed,
1424  *         #GNUNET_SYSERR otherwise
1425  */
1426 static int
1427 check_get_tunnels (void *cls,
1428                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1429 {
1430   struct GNUNET_CADET_Handle *h = cls;
1431   uint16_t size;
1432
1433   if (NULL == h->info_cb.tunnels_cb)
1434   {
1435     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1436                 "  no handler for tunnels monitor message!\n");
1437     return GNUNET_SYSERR;
1438   }
1439
1440   size = ntohs (msg->header.size);
1441   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1442   {
1443     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1444     h->info_cb.tunnels_cb = NULL;
1445     h->info_cls = NULL;
1446     return GNUNET_SYSERR;
1447   }
1448   return GNUNET_OK;
1449 }
1450
1451
1452 /**
1453  * Process a local reply about info on all tunnels, pass info to the user.
1454  *
1455  * @param cls Closure (Cadet handle).
1456  * @param message Message itself.
1457  */
1458 static void
1459 handle_get_tunnels (void *cls,
1460                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1461 {
1462   struct GNUNET_CADET_Handle *h = cls;
1463
1464   h->info_cb.tunnels_cb (h->info_cls,
1465                          &msg->destination,
1466                          ntohl (msg->channels),
1467                          ntohl (msg->connections),
1468                          ntohs (msg->estate),
1469                          ntohs (msg->cstate));
1470
1471 }
1472
1473
1474 /**
1475  * Check that message received from CADET service is well-formed.
1476  *
1477  * @param cls the `struct GNUNET_CADET_Handle`
1478  * @param msg the message we got
1479  * @return #GNUNET_OK if the message is well-formed,
1480  *         #GNUNET_SYSERR otherwise
1481  */
1482 static int
1483 check_get_tunnel (void *cls,
1484                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1485 {
1486   struct GNUNET_CADET_Handle *h = cls;
1487   unsigned int ch_n;
1488   unsigned int c_n;
1489   size_t esize;
1490   size_t msize;
1491
1492   if (NULL == h->info_cb.tunnel_cb)
1493   {
1494     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1495                 "  no handler for tunnel monitor message!\n");
1496     goto clean_cls;
1497   }
1498
1499   /* Verify message sanity */
1500   msize = ntohs (msg->header.size);
1501   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1502   if (esize > msize)
1503   {
1504     GNUNET_break_op (0);
1505     h->info_cb.tunnel_cb (h->info_cls,
1506                           NULL, 0, 0, NULL, NULL, 0, 0);
1507     goto clean_cls;
1508   }
1509   ch_n = ntohl (msg->channels);
1510   c_n = ntohl (msg->connections);
1511   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1512   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1513   if (msize != esize)
1514   {
1515     GNUNET_break_op (0);
1516     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517                 "m:%u, e: %u (%u ch, %u conn)\n",
1518                 (unsigned int) msize,
1519                 (unsigned int) esize,
1520                 ch_n,
1521                 c_n);
1522     h->info_cb.tunnel_cb (h->info_cls,
1523                           NULL, 0, 0, NULL, NULL, 0, 0);
1524     goto clean_cls;
1525   }
1526
1527   return GNUNET_OK;
1528
1529 clean_cls:
1530   h->info_cb.tunnel_cb = NULL;
1531   h->info_cls = NULL;
1532   return GNUNET_SYSERR;
1533 }
1534
1535
1536 /**
1537  * Process a local tunnel info reply, pass info to the user.
1538  *
1539  * @param cls Closure (Cadet handle).
1540  * @param msg Message itself.
1541  */
1542 static void
1543 handle_get_tunnel (void *cls,
1544                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1545 {
1546   struct GNUNET_CADET_Handle *h = cls;
1547   unsigned int ch_n;
1548   unsigned int c_n;
1549   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1550   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1551
1552   ch_n = ntohl (msg->channels);
1553   c_n = ntohl (msg->connections);
1554
1555   /* Call Callback with tunnel info. */
1556   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1557   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1558   h->info_cb.tunnel_cb (h->info_cls,
1559                         &msg->destination,
1560                         ch_n,
1561                         c_n,
1562                         chns,
1563                         conns,
1564                         ntohs (msg->estate),
1565                         ntohs (msg->cstate));
1566 }
1567
1568
1569 /**
1570  * Reconnect to the service, retransmit all infomation to try to restore the
1571  * original state.
1572  *
1573  * @param h handle to the cadet
1574  */
1575 static void
1576 reconnect (struct GNUNET_CADET_Handle *h)
1577 {
1578   struct GNUNET_MQ_MessageHandler handlers[] = {
1579     GNUNET_MQ_hd_fixed_size (channel_created,
1580                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1581                              struct GNUNET_CADET_LocalChannelCreateMessage,
1582                              h),
1583     GNUNET_MQ_hd_fixed_size (channel_destroy,
1584                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1585                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1586                              h),
1587     GNUNET_MQ_hd_var_size (local_data,
1588                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1589                            struct GNUNET_CADET_LocalData,
1590                            h),
1591     GNUNET_MQ_hd_fixed_size (local_ack,
1592                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1593                              struct GNUNET_CADET_LocalAck,
1594                              h),
1595     GNUNET_MQ_hd_var_size (get_peers,
1596                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1597                            struct GNUNET_CADET_LocalInfoPeer,
1598                            h),
1599     GNUNET_MQ_hd_var_size (get_peer,
1600                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1601                            struct GNUNET_CADET_LocalInfoPeer,
1602                            h),
1603     GNUNET_MQ_hd_var_size (get_tunnels,
1604                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1605                            struct GNUNET_CADET_LocalInfoTunnel,
1606                            h),
1607     GNUNET_MQ_hd_var_size (get_tunnel,
1608                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1609                            struct GNUNET_CADET_LocalInfoTunnel,
1610                            h),
1611 // FIXME
1612 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1613 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1614 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1615     GNUNET_MQ_handler_end ()
1616   };
1617   struct GNUNET_CADET_Channel *ch;
1618
1619   while (NULL != (ch = h->channels_head))
1620   {
1621     LOG (GNUNET_ERROR_TYPE_DEBUG,
1622          "Destroying channel due to a reconnect\n");
1623     destroy_channel (ch);
1624   }
1625
1626   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1627
1628   if (NULL != h->mq)
1629   {
1630     GNUNET_MQ_destroy (h->mq);
1631     h->mq = NULL;
1632   }
1633   h->mq = GNUNET_CLIENT_connect (h->cfg,
1634                                  "cadet",
1635                                  handlers,
1636                                  &handle_mq_error,
1637                                  h);
1638   if (NULL == h->mq)
1639   {
1640     schedule_reconnect (h);
1641     return;
1642   }
1643   else
1644   {
1645     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1646   }
1647 }
1648
1649 /**
1650  * Reconnect callback: tries to reconnect again after a failer previous
1651  * reconnecttion
1652  *
1653  * @param cls closure (cadet handle)
1654  */
1655 static void
1656 reconnect_cbk (void *cls)
1657 {
1658   struct GNUNET_CADET_Handle *h = cls;
1659
1660   h->reconnect_task = NULL;
1661   reconnect (h);
1662 }
1663
1664
1665 /**
1666  * Reconnect to the service, retransmit all infomation to try to restore the
1667  * original state.
1668  *
1669  * @param h handle to the cadet
1670  *
1671  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1672  */
1673 static void
1674 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1675 {
1676   if (NULL == h->reconnect_task)
1677   {
1678     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1679                                                       &reconnect_cbk, h);
1680     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1681   }
1682 }
1683
1684
1685 /******************************************************************************/
1686 /**********************      API CALL DEFINITIONS     *************************/
1687 /******************************************************************************/
1688
1689 struct GNUNET_CADET_Handle *
1690 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1691                       void *cls,
1692                       GNUNET_CADET_ChannelEndHandler cleaner,
1693                       const struct GNUNET_CADET_MessageHandler *handlers)
1694 {
1695   struct GNUNET_CADET_Handle *h;
1696
1697   h = GNUNET_new (struct GNUNET_CADET_Handle);
1698   LOG (GNUNET_ERROR_TYPE_DEBUG,
1699        "GNUNET_CADET_connect() %p\n",
1700        h);
1701   h->cfg = cfg;
1702   h->cleaner = cleaner;
1703   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1704   reconnect (h);
1705   if (h->mq == NULL)
1706   {
1707     GNUNET_break (0);
1708     GNUNET_CADET_disconnect (h);
1709     return NULL;
1710   }
1711   h->cls = cls;
1712   h->message_handlers = handlers;
1713   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1714   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1715   h->reconnect_task = NULL;
1716
1717   /* count handlers */
1718   for (h->n_handlers = 0;
1719        handlers && handlers[h->n_handlers].type;
1720        h->n_handlers++) ;
1721   return h;
1722 }
1723
1724
1725 /**
1726  * Disconnect from the cadet service. All channels will be destroyed. All channel
1727  * disconnect callbacks will be called on any still connected peers, notifying
1728  * about their disconnection. The registered inbound channel cleaner will be
1729  * called should any inbound channels still exist.
1730  *
1731  * @param handle connection to cadet to disconnect
1732  */
1733 void
1734 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1735 {
1736   struct GNUNET_CADET_Channel *ch;
1737   struct GNUNET_CADET_Channel *aux;
1738   struct GNUNET_CADET_TransmitHandle *th;
1739
1740   LOG (GNUNET_ERROR_TYPE_DEBUG,
1741        "CADET DISCONNECT\n");
1742   ch = handle->channels_head;
1743   while (NULL != ch)
1744   {
1745     aux = ch->next;
1746     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1747     {
1748       GNUNET_break (0);
1749       LOG (GNUNET_ERROR_TYPE_DEBUG,
1750            "channel %X not destroyed\n",
1751            ntohl (ch->ccn.channel_of_client));
1752     }
1753     destroy_channel (ch);
1754     ch = aux;
1755   }
1756   while (NULL != (th = handle->th_head))
1757   {
1758     struct GNUNET_MessageHeader *msg;
1759
1760     /* Make sure it is an allowed packet (everything else should have been
1761      * already canceled).
1762      */
1763     GNUNET_break (GNUNET_NO == th_is_payload (th));
1764     msg = (struct GNUNET_MessageHeader *) &th[1];
1765     switch (ntohs(msg->type))
1766     {
1767       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1768       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1769       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1770       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1771       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1772       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1773       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1774       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1775       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1776       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1777         break;
1778       default:
1779         GNUNET_break (0);
1780         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1781              GC_m2s (ntohs(msg->type)));
1782     }
1783
1784     GNUNET_CADET_notify_transmit_ready_cancel (th);
1785   }
1786
1787   if (NULL != handle->mq)
1788   {
1789     GNUNET_MQ_destroy (handle->mq);
1790     handle->mq = NULL;
1791   }
1792   if (NULL != handle->reconnect_task)
1793   {
1794     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1795     handle->reconnect_task = NULL;
1796   }
1797
1798   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1799   handle->ports = NULL;
1800   GNUNET_free (handle);
1801 }
1802
1803
1804 /**
1805  * Open a port to receive incomming channels.
1806  *
1807  * @param h CADET handle.
1808  * @param port Hash representing the port number.
1809  * @param new_channel Function called when an channel is received.
1810  * @param new_channel_cls Closure for @a new_channel.
1811  * @return Port handle.
1812  */
1813 struct GNUNET_CADET_Port *
1814 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1815                         const struct GNUNET_HashCode *port,
1816                         GNUNET_CADET_InboundChannelNotificationHandler
1817                             new_channel,
1818                         void *new_channel_cls)
1819 {
1820   struct GNUNET_CADET_PortMessage *msg;
1821   struct GNUNET_MQ_Envelope *env;
1822   struct GNUNET_CADET_Port *p;
1823
1824   GNUNET_assert (NULL != new_channel);
1825   p = GNUNET_new (struct GNUNET_CADET_Port);
1826   p->cadet = h;
1827   p->hash = GNUNET_new (struct GNUNET_HashCode);
1828   *p->hash = *port;
1829   p->handler = new_channel;
1830   p->cls = new_channel_cls;
1831   GNUNET_assert (GNUNET_OK ==
1832                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1833                                                     p->hash,
1834                                                     p,
1835                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1836
1837   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1838   msg->port = *p->hash;
1839   GNUNET_MQ_send (h->mq, env);
1840
1841   return p;
1842 }
1843
1844 /**
1845  * Close a port opened with @a GNUNET_CADET_open_port.
1846  * The @a new_channel callback will no longer be called.
1847  *
1848  * @param p Port handle.
1849  */
1850 void
1851 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1852 {
1853   struct GNUNET_CADET_PortMessage *msg;
1854   struct GNUNET_MQ_Envelope *env;
1855   struct GNUNET_HashCode *id;
1856
1857   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1858
1859   id = NULL != p->hash ? p->hash : &p->id;
1860   msg->port = *id;
1861   GNUNET_MQ_send (p->cadet->mq, env);
1862   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1863   GNUNET_free_non_null (p->hash);
1864   GNUNET_free (p);
1865 }
1866
1867
1868 /**
1869  * Create a new channel towards a remote peer.
1870  *
1871  * If the destination port is not open by any peer or the destination peer
1872  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1873  * for this channel.
1874  *
1875  * @param h cadet handle
1876  * @param channel_ctx client's channel context to associate with the channel
1877  * @param peer peer identity the channel should go to
1878  * @param port Port hash (port number).
1879  * @param options CadetOption flag field, with all desired option bits set to 1.
1880  * @return handle to the channel
1881  */
1882 struct GNUNET_CADET_Channel *
1883 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1884                             void *channel_ctx,
1885                             const struct GNUNET_PeerIdentity *peer,
1886                             const struct GNUNET_HashCode *port,
1887                             enum GNUNET_CADET_ChannelOption options)
1888 {
1889   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1890   struct GNUNET_MQ_Envelope *env;
1891   struct GNUNET_CADET_Channel *ch;
1892   struct GNUNET_CADET_ClientChannelNumber ccn;
1893
1894   ccn.channel_of_client = htonl (0);
1895   ch = create_channel (h, ccn);
1896   ch->ctx = channel_ctx;
1897   ch->peer = GNUNET_PEER_intern (peer);
1898
1899   LOG (GNUNET_ERROR_TYPE_DEBUG,
1900        "Creating new channel to %s:%u at %p number %X\n",
1901        GNUNET_i2s (peer),
1902        port,
1903        ch,
1904        ntohl (ch->ccn.channel_of_client));
1905   env = GNUNET_MQ_msg (msg,
1906                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1907   msg->ccn = ch->ccn;
1908   msg->port = *port;
1909   msg->peer = *peer;
1910   msg->opt = htonl (options);
1911   GNUNET_MQ_send (h->mq,
1912                   env);
1913   return ch;
1914 }
1915
1916
1917 void
1918 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1919 {
1920   struct GNUNET_CADET_Handle *h;
1921   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1922   struct GNUNET_MQ_Envelope *env;
1923   struct GNUNET_CADET_TransmitHandle *th;
1924   struct GNUNET_CADET_TransmitHandle *next;
1925
1926   LOG (GNUNET_ERROR_TYPE_DEBUG,
1927        "Destroying channel\n");
1928   h = channel->cadet;
1929   for  (th = h->th_head; th != NULL; th = next)
1930   {
1931     next = th->next;
1932     if (th->channel == channel)
1933     {
1934       GNUNET_break (0);
1935       if (GNUNET_YES == th_is_payload (th))
1936       {
1937         /* applications should cancel before destroying channel */
1938         LOG (GNUNET_ERROR_TYPE_WARNING,
1939              "Channel destroyed without cancelling transmission requests\n");
1940         th->notify (th->notify_cls, 0, NULL);
1941       }
1942       else
1943       {
1944         LOG (GNUNET_ERROR_TYPE_WARNING,
1945              "no meta-traffic should be queued\n");
1946       }
1947       GNUNET_CONTAINER_DLL_remove (h->th_head,
1948                                    h->th_tail,
1949                                    th);
1950       GNUNET_CADET_notify_transmit_ready_cancel (th);
1951     }
1952   }
1953
1954   env = GNUNET_MQ_msg (msg,
1955                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1956   msg->ccn = channel->ccn;
1957   GNUNET_MQ_send (h->mq,
1958                   env);
1959
1960   destroy_channel (channel);
1961 }
1962
1963
1964 /**
1965  * Get information about a channel.
1966  *
1967  * @param channel Channel handle.
1968  * @param option Query (GNUNET_CADET_OPTION_*).
1969  * @param ... dependant on option, currently not used
1970  *
1971  * @return Union with an answer to the query.
1972  */
1973 const union GNUNET_CADET_ChannelInfo *
1974 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1975                               enum GNUNET_CADET_ChannelOption option, ...)
1976 {
1977   static int bool_flag;
1978   const union GNUNET_CADET_ChannelInfo *ret;
1979
1980   switch (option)
1981   {
1982     case GNUNET_CADET_OPTION_NOBUFFER:
1983     case GNUNET_CADET_OPTION_RELIABLE:
1984     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1985       if (0 != (option & channel->options))
1986         bool_flag = GNUNET_YES;
1987       else
1988         bool_flag = GNUNET_NO;
1989       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1990       break;
1991     case GNUNET_CADET_OPTION_PEER:
1992       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1993       break;
1994     default:
1995       GNUNET_break (0);
1996       return NULL;
1997   }
1998
1999   return ret;
2000 }
2001
2002
2003 struct GNUNET_CADET_TransmitHandle *
2004 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
2005                                     int cork,
2006                                     struct GNUNET_TIME_Relative maxdelay,
2007                                     size_t notify_size,
2008                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
2009                                     void *notify_cls)
2010 {
2011   struct GNUNET_CADET_TransmitHandle *th;
2012
2013   GNUNET_assert (NULL != channel);
2014   GNUNET_assert (NULL != notify);
2015   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
2016   LOG (GNUNET_ERROR_TYPE_DEBUG,
2017        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
2018        ntohl (channel->ccn.channel_of_client),
2019        channel->allow_send,
2020        (ntohl (channel->ccn.channel_of_client) >=
2021         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
2022        ? "origin"
2023        : "destination",
2024        (unsigned int) notify_size);
2025   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
2026   {
2027     LOG (GNUNET_ERROR_TYPE_WARNING,
2028          "CADET transmit ready timeout is deprected (has no effect)\n");
2029   }
2030
2031   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
2032   th->channel = channel;
2033   th->size = notify_size;
2034   th->notify = notify;
2035   th->notify_cls = notify_cls;
2036   if (0 != channel->allow_send)
2037     th->request_data_task
2038       = GNUNET_SCHEDULER_add_now (&request_data,
2039                                   th);
2040   else
2041     add_to_queue (channel->cadet,
2042                   th);
2043   return th;
2044 }
2045
2046
2047 void
2048 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
2049 {
2050   if (NULL != th->request_data_task)
2051   {
2052     GNUNET_SCHEDULER_cancel (th->request_data_task);
2053     th->request_data_task = NULL;
2054   }
2055   remove_from_queue (th);
2056   GNUNET_free (th);
2057 }
2058
2059
2060 /**
2061  * Send an ack on the channel to confirm the processing of a message.
2062  *
2063  * @param ch Channel on which to send the ACK.
2064  */
2065 void
2066 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
2067 {
2068   struct GNUNET_CADET_LocalAck *msg;
2069   struct GNUNET_MQ_Envelope *env;
2070
2071   env = GNUNET_MQ_msg (msg,
2072                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
2073   LOG (GNUNET_ERROR_TYPE_DEBUG,
2074        "Sending ACK on channel %X\n",
2075        ntohl (channel->ccn.channel_of_client));
2076   msg->ccn = channel->ccn;
2077   GNUNET_MQ_send (channel->cadet->mq,
2078                   env);
2079 }
2080
2081
2082 static void
2083 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
2084 {
2085   struct GNUNET_MessageHeader *msg;
2086   struct GNUNET_MQ_Envelope *env;
2087
2088   LOG (GNUNET_ERROR_TYPE_DEBUG,
2089        " Sending %s monitor message to service\n",
2090        GC_m2s(type));
2091
2092   env = GNUNET_MQ_msg (msg, type);
2093   GNUNET_MQ_send (h->mq, env);
2094 }
2095
2096
2097 /**
2098  * Request a debug dump on the service's STDERR.
2099  *
2100  * WARNING: unstable API, likely to change in the future!
2101  *
2102  * @param h cadet handle
2103  */
2104 void
2105 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2106 {
2107   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2108   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2109 }
2110
2111
2112 /**
2113  * Request information about peers known to the running cadet service.
2114  * The callback will be called for every peer known to the service.
2115  * Only one info request (of any kind) can be active at once.
2116  *
2117  *
2118  * WARNING: unstable API, likely to change in the future!
2119  *
2120  * @param h Handle to the cadet peer.
2121  * @param callback Function to call with the requested data.
2122  * @param callback_cls Closure for @c callback.
2123  *
2124  * @return #GNUNET_OK / #GNUNET_SYSERR
2125  */
2126 int
2127 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2128                        GNUNET_CADET_PeersCB callback,
2129                        void *callback_cls)
2130 {
2131   if (NULL != h->info_cb.peers_cb)
2132   {
2133     GNUNET_break (0);
2134     return GNUNET_SYSERR;
2135   }
2136   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2137   h->info_cb.peers_cb = callback;
2138   h->info_cls = callback_cls;
2139   return GNUNET_OK;
2140 }
2141
2142
2143 /**
2144  * Cancel a peer info request. The callback will not be called (anymore).
2145  *
2146  * WARNING: unstable API, likely to change in the future!
2147  *
2148  * @param h Cadet handle.
2149  *
2150  * @return Closure given to GNUNET_CADET_get_peers.
2151  */
2152 void *
2153 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2154 {
2155   void *cls;
2156
2157   cls = h->info_cls;
2158   h->info_cb.peers_cb = NULL;
2159   h->info_cls = NULL;
2160   return cls;
2161 }
2162
2163
2164 /**
2165  * Request information about a peer known to the running cadet peer.
2166  * The callback will be called for the tunnel once.
2167  * Only one info request (of any kind) can be active at once.
2168  *
2169  * WARNING: unstable API, likely to change in the future!
2170  *
2171  * @param h Handle to the cadet peer.
2172  * @param id Peer whose tunnel to examine.
2173  * @param callback Function to call with the requested data.
2174  * @param callback_cls Closure for @c callback.
2175  *
2176  * @return #GNUNET_OK / #GNUNET_SYSERR
2177  */
2178 int
2179 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2180                        const struct GNUNET_PeerIdentity *id,
2181                        GNUNET_CADET_PeerCB callback,
2182                        void *callback_cls)
2183 {
2184   struct GNUNET_CADET_LocalInfo *msg;
2185   struct GNUNET_MQ_Envelope *env;
2186
2187   if (NULL != h->info_cb.peer_cb)
2188   {
2189     GNUNET_break (0);
2190     return GNUNET_SYSERR;
2191   }
2192
2193   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2194   msg->peer = *id;
2195   GNUNET_MQ_send (h->mq, env);
2196
2197   h->info_cb.peer_cb = callback;
2198   h->info_cls = callback_cls;
2199   return GNUNET_OK;
2200 }
2201
2202
2203 /**
2204  * Request information about tunnels of the running cadet peer.
2205  * The callback will be called for every tunnel of the service.
2206  * Only one info request (of any kind) can be active at once.
2207  *
2208  * WARNING: unstable API, likely to change in the future!
2209  *
2210  * @param h Handle to the cadet peer.
2211  * @param callback Function to call with the requested data.
2212  * @param callback_cls Closure for @c callback.
2213  *
2214  * @return #GNUNET_OK / #GNUNET_SYSERR
2215  */
2216 int
2217 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2218                          GNUNET_CADET_TunnelsCB callback,
2219                          void *callback_cls)
2220 {
2221   if (NULL != h->info_cb.tunnels_cb)
2222   {
2223     GNUNET_break (0);
2224     return GNUNET_SYSERR;
2225   }
2226   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2227   h->info_cb.tunnels_cb = callback;
2228   h->info_cls = callback_cls;
2229   return GNUNET_OK;
2230 }
2231
2232
2233 /**
2234  * Cancel a monitor request. The monitor callback will not be called.
2235  *
2236  * @param h Cadet handle.
2237  *
2238  * @return Closure given to GNUNET_CADET_get_tunnels.
2239  */
2240 void *
2241 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2242 {
2243   void *cls;
2244
2245   h->info_cb.tunnels_cb = NULL;
2246   cls = h->info_cls;
2247   h->info_cls = NULL;
2248
2249   return cls;
2250 }
2251
2252
2253
2254 /**
2255  * Request information about a tunnel of the running cadet peer.
2256  * The callback will be called for the tunnel once.
2257  * Only one info request (of any kind) can be active at once.
2258  *
2259  * WARNING: unstable API, likely to change in the future!
2260  *
2261  * @param h Handle to the cadet peer.
2262  * @param id Peer whose tunnel to examine.
2263  * @param callback Function to call with the requested data.
2264  * @param callback_cls Closure for @c callback.
2265  *
2266  * @return #GNUNET_OK / #GNUNET_SYSERR
2267  */
2268 int
2269 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2270                         const struct GNUNET_PeerIdentity *id,
2271                         GNUNET_CADET_TunnelCB callback,
2272                         void *callback_cls)
2273 {
2274   struct GNUNET_CADET_LocalInfo *msg;
2275   struct GNUNET_MQ_Envelope *env;
2276
2277   if (NULL != h->info_cb.tunnel_cb)
2278   {
2279     GNUNET_break (0);
2280     return GNUNET_SYSERR;
2281   }
2282
2283   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2284   msg->peer = *id;
2285   GNUNET_MQ_send (h->mq, env);
2286
2287   h->info_cb.tunnel_cb = callback;
2288   h->info_cls = callback_cls;
2289   return GNUNET_OK;
2290 }
2291
2292
2293 /**
2294  * Request information about a specific channel of the running cadet peer.
2295  *
2296  * WARNING: unstable API, likely to change in the future!
2297  * FIXME Add destination option.
2298  *
2299  * @param h Handle to the cadet peer.
2300  * @param initiator ID of the owner of the channel.
2301  * @param channel_number Channel number.
2302  * @param callback Function to call with the requested data.
2303  * @param callback_cls Closure for @c callback.
2304  *
2305  * @return #GNUNET_OK / #GNUNET_SYSERR
2306  */
2307 int
2308 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2309                            struct GNUNET_PeerIdentity *initiator,
2310                            unsigned int channel_number,
2311                            GNUNET_CADET_ChannelCB callback,
2312                            void *callback_cls)
2313 {
2314   struct GNUNET_CADET_LocalInfo *msg;
2315   struct GNUNET_MQ_Envelope *env;
2316
2317   if (NULL != h->info_cb.channel_cb)
2318   {
2319     GNUNET_break (0);
2320     return GNUNET_SYSERR;
2321   }
2322
2323   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2324   msg->peer = *initiator;
2325   msg->ccn.channel_of_client = htonl (channel_number);
2326   GNUNET_MQ_send (h->mq, env);
2327
2328   h->info_cb.channel_cb = callback;
2329   h->info_cls = callback_cls;
2330   return GNUNET_OK;
2331 }
2332
2333
2334 /**
2335  * Function called to notify a client about the connection
2336  * begin ready to queue more data.  "buf" will be
2337  * NULL and "size" zero if the connection was closed for
2338  * writing in the meantime.
2339  *
2340  * @param cls closure
2341  * @param size number of bytes available in buf
2342  * @param buf where the callee should write the message
2343  * @return number of bytes written to buf
2344  */
2345 static size_t
2346 cadet_mq_ntr (void *cls, size_t size,
2347              void *buf)
2348 {
2349   struct GNUNET_MQ_Handle *mq = cls;
2350   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2351   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2352   uint16_t msize;
2353
2354   state->th = NULL;
2355   if (NULL == buf)
2356   {
2357     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2358     return 0;
2359   }
2360   msize = ntohs (msg->size);
2361   GNUNET_assert (msize <= size);
2362   GNUNET_memcpy (buf, msg, msize);
2363   GNUNET_MQ_impl_send_continue (mq);
2364   return msize;
2365 }
2366
2367
2368 /**
2369  * Signature of functions implementing the
2370  * sending functionality of a message queue.
2371  *
2372  * @param mq the message queue
2373  * @param msg the message to send
2374  * @param impl_state state of the implementation
2375  */
2376 static void
2377 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2378                         const struct GNUNET_MessageHeader *msg,
2379                         void *impl_state)
2380 {
2381   struct CadetMQState *state = impl_state;
2382
2383   GNUNET_assert (NULL == state->th);
2384   state->th =
2385       GNUNET_CADET_notify_transmit_ready (state->channel,
2386                                          /* FIXME: add option for corking */
2387                                          GNUNET_NO,
2388                                          GNUNET_TIME_UNIT_FOREVER_REL,
2389                                          ntohs (msg->size),
2390                                          &cadet_mq_ntr, mq);
2391
2392 }
2393
2394
2395 /**
2396  * Signature of functions implementing the
2397  * destruction of a message queue.
2398  * Implementations must not free 'mq', but should
2399  * take care of 'impl_state'.
2400  *
2401  * @param mq the message queue to destroy
2402  * @param impl_state state of the implementation
2403  */
2404 static void
2405 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2406                            void *impl_state)
2407 {
2408   struct CadetMQState *state = impl_state;
2409
2410   if (NULL != state->th)
2411     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2412
2413   GNUNET_free (state);
2414 }
2415
2416
2417 /**
2418  * Create a message queue for a cadet channel.
2419  * The message queue can only be used to transmit messages,
2420  * not to receive them.
2421  *
2422  * @param channel the channel to create the message qeue for
2423  * @return a message queue to messages over the channel
2424  */
2425 struct GNUNET_MQ_Handle *
2426 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2427 {
2428   struct GNUNET_MQ_Handle *mq;
2429   struct CadetMQState *state;
2430
2431   state = GNUNET_new (struct CadetMQState);
2432   state->channel = channel;
2433
2434   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2435                                       &cadet_mq_destroy_impl_old,
2436                                       NULL, /* FIXME: cancel impl. */
2437                                       state,
2438                                       NULL, /* no msg handlers */
2439                                       NULL, /* no err handlers */
2440                                       NULL); /* no handler cls */
2441   return mq;
2442 }
2443
2444
2445 /**
2446  * Transitional function to convert an unsigned int port to a hash value.
2447  * WARNING: local static value returned, NOT reentrant!
2448  * WARNING: do not use this function for new code!
2449  *
2450  * @param port Numerical port (unsigned int format).
2451  *
2452  * @return A GNUNET_HashCode usable for the new CADET API.
2453  */
2454 const struct GNUNET_HashCode *
2455 GC_u2h (uint32_t port)
2456 {
2457   static struct GNUNET_HashCode hash;
2458
2459   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2460               "This is a transitional function, "
2461               "use proper crypto hashes as CADET ports\n");
2462   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2463
2464   return &hash;
2465 }
2466
2467
2468
2469 /******************************************************************************/
2470 /******************************* MQ-BASED API *********************************/
2471 /******************************************************************************/
2472
2473 /**
2474  * Connect to the MQ-based cadet service.
2475  *
2476  * @param cfg Configuration to use.
2477  *
2478  * @return Handle to the cadet service NULL on error.
2479  */
2480 struct GNUNET_CADET_Handle *
2481 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2482 {
2483   struct GNUNET_CADET_Handle *h;
2484
2485   LOG (GNUNET_ERROR_TYPE_DEBUG,
2486        "GNUNET_CADET_connecT()\n");
2487   h = GNUNET_new (struct GNUNET_CADET_Handle);
2488   h->cfg = cfg;
2489   h->mq_api = GNUNET_YES;
2490   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2491   reconnect (h);
2492   if (NULL == h->mq)
2493   {
2494     GNUNET_break (0);
2495     GNUNET_CADET_disconnect (h);
2496     return NULL;
2497   }
2498   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2499   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2500   h->reconnect_task = NULL;
2501
2502   return h;
2503 }
2504
2505
2506 /**
2507  * Open a port to receive incomming MQ-based channels.
2508  *
2509  * @param h CADET handle.
2510  * @param port Hash identifying the port.
2511  * @param connects Function called when an incoming channel is connected.
2512  * @param connects_cls Closure for the @a connects handler.
2513  * @param window_changes Function called when the transmit window size changes.
2514  * @param disconnects Function called when a channel is disconnected.
2515  * @param handlers Callbacks for messages we care about, NULL-terminated.
2516  *
2517  * @return Port handle.
2518  */
2519 struct GNUNET_CADET_Port *
2520 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2521                         const struct GNUNET_HashCode *port,
2522                         GNUNET_CADET_ConnectEventHandler connects,
2523                         void * connects_cls,
2524                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2525                         GNUNET_CADET_DisconnectEventHandler disconnects,
2526                         const struct GNUNET_MQ_MessageHandler *handlers)
2527 {
2528   struct GNUNET_CADET_PortMessage *msg;
2529   struct GNUNET_MQ_Envelope *env;
2530   struct GNUNET_CADET_Port *p;
2531
2532   GNUNET_assert (NULL != connects);
2533   GNUNET_assert (NULL != disconnects);
2534
2535   p = GNUNET_new (struct GNUNET_CADET_Port);
2536   p->cadet = h;
2537   p->id = *port;
2538   p->connects = connects;
2539   p->cls = connects_cls;
2540   p->window_changes = window_changes;
2541   p->disconnects = disconnects;
2542   if (NULL != handlers)
2543   {
2544     unsigned int i;
2545     for (i=0;NULL != handlers[i].cb; i++) ;
2546     p->handlers = GNUNET_new_array (i + 1,
2547                                     struct GNUNET_MQ_MessageHandler);
2548     GNUNET_memcpy ((struct GNUNET_MQ_MessageHandler *) p->handlers,
2549                    handlers,
2550                    i * sizeof (struct GNUNET_MQ_MessageHandler));
2551   }
2552
2553   GNUNET_assert (GNUNET_OK ==
2554                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2555                                                     &p->id,
2556                                                     p,
2557                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2558
2559   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2560   msg->port = p->id;
2561   GNUNET_MQ_send (h->mq, env);
2562
2563   return p;
2564 }
2565
2566
2567 /**
2568  * Create a new channel towards a remote peer.
2569  *
2570  * If the destination port is not open by any peer or the destination peer
2571  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2572  * for this channel.
2573  *
2574  * @param h CADET handle.
2575  * @param channel_cls Closure for the channel. It's given to:
2576  *                    - The disconnect handler @a disconnects
2577  *                    - Each message type callback in @a handlers
2578  * @param destination Peer identity the channel should go to.
2579  * @param port Identification of the destination port.
2580  * @param options CadetOption flag field, with all desired option bits set to 1.
2581  * @param window_changes Function called when the transmit window size changes.
2582  * @param disconnects Function called when the channel is disconnected.
2583  * @param handlers Callbacks for messages we care about, NULL-terminated.
2584  *
2585  * @return Handle to the channel.
2586  */
2587 struct GNUNET_CADET_Channel *
2588 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2589                              void *channel_cls,
2590                              const struct GNUNET_PeerIdentity *destination,
2591                              const struct GNUNET_HashCode *port,
2592                              enum GNUNET_CADET_ChannelOption options,
2593                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2594                              GNUNET_CADET_DisconnectEventHandler disconnects,
2595                              const struct GNUNET_MQ_MessageHandler *handlers)
2596 {
2597   struct GNUNET_CADET_Channel *ch;
2598   struct GNUNET_CADET_ClientChannelNumber ccn;
2599   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2600   struct GNUNET_MQ_Envelope *env;
2601
2602   GNUNET_assert (NULL != disconnects);
2603
2604   /* Save parameters */
2605   ccn.channel_of_client = htonl (0);
2606   ch = create_channel (h, ccn);
2607   ch->ctx = channel_cls;
2608   ch->peer = GNUNET_PEER_intern (destination);
2609   ch->options = options;
2610   ch->window_changes = window_changes;
2611   ch->disconnects = disconnects;
2612
2613   /* Create MQ for channel */
2614   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2615                                           &cadet_mq_destroy_impl,
2616                                           &cadet_mq_cancel_impl,
2617                                           ch,
2618                                           handlers,
2619                                           &cadet_mq_error_handler,
2620                                           ch);
2621   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2622
2623   /* Request channel creation to service */
2624   env = GNUNET_MQ_msg (msg,
2625                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2626   msg->ccn = ch->ccn;
2627   msg->port = *port;
2628   msg->peer = *destination;
2629   msg->opt = htonl (options);
2630   GNUNET_MQ_send (h->mq,
2631                   env);
2632
2633   return ch;
2634 }
2635
2636
2637 /**
2638  * Obtain the message queue for a connected peer.
2639  *
2640  * @param channel The channel handle from which to get the MQ.
2641  *
2642  * @return NULL if @a channel is not yet connected.
2643  */
2644 struct GNUNET_MQ_Handle *
2645 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2646 {
2647   return channel->mq;
2648 }