Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Task to allow mq to send more traffic.
295    */
296   struct GNUNET_SCHEDULER_Task *mq_cont;
297
298   /**
299    * Pending envelope in case we don't have an ACK from the service.
300    */
301   struct GNUNET_MQ_Envelope *pending_env;
302
303   /**
304    * Window change handler.
305    */
306   GNUNET_CADET_WindowSizeEventHandler window_changes;
307
308   /**
309    * Disconnect handler.
310    */
311   GNUNET_CADET_DisconnectEventHandler disconnects;
312
313 };
314
315
316 /**
317  * Opaque handle to a port.
318  */
319 struct GNUNET_CADET_Port
320 {
321   /**
322    * Handle to the CADET session this port belongs to.
323    */
324   struct GNUNET_CADET_Handle *cadet;
325
326   /**
327    * Port ID.
328    */
329   struct GNUNET_HashCode *hash;
330
331   /**
332    * Callback handler for incoming channels on this port.
333    */
334   GNUNET_CADET_InboundChannelNotificationHandler *handler;
335
336   /**
337    * Closure for @a handler.
338    */
339   void *cls;
340
341   /*****************************    MQ     ************************************/
342
343   /**
344    * Port "number"
345    */
346   struct GNUNET_HashCode id;
347
348   /**
349    * Handler for incoming channels on this port
350    */
351   GNUNET_CADET_ConnectEventHandler connects;
352
353   /**
354    * Closure for @ref connects
355    */
356   void * connects_cls;
357
358   /**
359    * Window size change handler.
360    */
361   GNUNET_CADET_WindowSizeEventHandler window_changes;
362
363   /**
364    * Handler called when an incoming channel is destroyed..
365    */
366   GNUNET_CADET_DisconnectEventHandler disconnects;
367
368   /**
369    * Payload handlers for incoming channels.
370    */
371   const struct GNUNET_MQ_MessageHandler *handlers;
372 };
373
374
375 /**
376  * Implementation state for cadet's message queue.
377  */
378 struct CadetMQState
379 {
380   /**
381    * The current transmit handle, or NULL
382    * if no transmit is active.
383    */
384   struct GNUNET_CADET_TransmitHandle *th;
385
386   /**
387    * Channel to send the data over.
388    */
389   struct GNUNET_CADET_Channel *channel;
390 };
391
392
393
394 /******************************************************************************/
395 /*********************      FUNCTION DECLARATIONS     *************************/
396 /******************************************************************************/
397
398 /**
399  * Reconnect to the service, retransmit all infomation to try to restore the
400  * original state.
401  *
402  * @param h Handle to the CADET service.
403  */
404 static void
405 schedule_reconnect (struct GNUNET_CADET_Handle *h);
406
407
408 /**
409  * Reconnect callback: tries to reconnect again after a failer previous
410  * reconnection.
411  *
412  * @param cls Closure (cadet handle).
413  */
414 static void
415 reconnect_cbk (void *cls);
416
417
418 /**
419  * Reconnect to the service, retransmit all infomation to try to restore the
420  * original state.
421  *
422  * @param h handle to the cadet
423  */
424 static void
425 reconnect (struct GNUNET_CADET_Handle *h);
426
427
428 /******************************************************************************/
429 /***********************     AUXILIARY FUNCTIONS      *************************/
430 /******************************************************************************/
431
432 /**
433  * Check if transmission is a payload packet.
434  *
435  * @param th Transmission handle.
436  *
437  * @return #GNUNET_YES if it is a payload packet,
438  *         #GNUNET_NO if it is a cadet management packet.
439  */
440 static int
441 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
442 {
443   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
444 }
445
446
447 /**
448  * Find the Port struct for a hash.
449  *
450  * @param h CADET handle.
451  * @param hash HashCode for the port number.
452  *
453  * @return The port handle if known, NULL otherwise.
454  */
455 static struct GNUNET_CADET_Port *
456 find_port (const struct GNUNET_CADET_Handle *h,
457            const struct GNUNET_HashCode *hash)
458 {
459   struct GNUNET_CADET_Port *p;
460
461   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
462
463   return p;
464 }
465
466
467 /**
468  * Get the channel handler for the channel specified by id from the given handle
469  *
470  * @param h Cadet handle
471  * @param ccn ID of the wanted channel
472  * @return handle to the required channel or NULL if not found
473  */
474 static struct GNUNET_CADET_Channel *
475 retrieve_channel (struct GNUNET_CADET_Handle *h,
476                   struct GNUNET_CADET_ClientChannelNumber ccn)
477 {
478   struct GNUNET_CADET_Channel *ch;
479
480   for (ch = h->channels_head; NULL != ch; ch = ch->next)
481     if (ch->ccn.channel_of_client == ccn.channel_of_client)
482       return ch;
483   return NULL;
484 }
485
486
487 /**
488  * Create a new channel and insert it in the channel list of the cadet handle
489  *
490  * @param h Cadet handle
491  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
492  *
493  * @return Handle to the created channel.
494  */
495 static struct GNUNET_CADET_Channel *
496 create_channel (struct GNUNET_CADET_Handle *h,
497                 struct GNUNET_CADET_ClientChannelNumber ccn)
498 {
499   struct GNUNET_CADET_Channel *ch;
500
501   ch = GNUNET_new (struct GNUNET_CADET_Channel);
502   GNUNET_CONTAINER_DLL_insert (h->channels_head,
503                                h->channels_tail,
504                                ch);
505   ch->cadet = h;
506   if (0 == ccn.channel_of_client)
507   {
508     ch->ccn = h->next_ccn;
509     while (NULL != retrieve_channel (h,
510                                      h->next_ccn))
511     {
512       h->next_ccn.channel_of_client
513         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
514       if (0 == ntohl (h->next_ccn.channel_of_client))
515         h->next_ccn.channel_of_client
516           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
517     }
518   }
519   else
520   {
521     ch->ccn = ccn;
522   }
523   return ch;
524 }
525
526
527 /**
528  * Destroy the specified channel.
529  * - Destroys all peers, calling the disconnect callback on each if needed
530  * - Cancels all outgoing traffic for that channel, calling respective notifys
531  * - Calls cleaner if channel was inbound
532  * - Frees all memory used
533  *
534  * @param ch Pointer to the channel.
535  * @param call_cleaner Whether to call the cleaner handler.
536  *
537  * @return Handle to the required channel or NULL if not found.
538  */
539 static void
540 destroy_channel (struct GNUNET_CADET_Channel *ch)
541 {
542   struct GNUNET_CADET_Handle *h;
543   struct GNUNET_CADET_TransmitHandle *th;
544   struct GNUNET_CADET_TransmitHandle *next;
545
546   if (NULL == ch)
547   {
548     GNUNET_break (0);
549     return;
550   }
551   h = ch->cadet;
552   LOG (GNUNET_ERROR_TYPE_DEBUG,
553        " destroy_channel %X of %p\n",
554        ch->ccn,
555        h);
556
557   GNUNET_CONTAINER_DLL_remove (h->channels_head,
558                                h->channels_tail,
559                                ch);
560   if (NULL != ch->mq_cont)
561   {
562     GNUNET_SCHEDULER_cancel (ch->mq_cont);
563     ch->mq_cont = NULL;
564   }
565   /* signal channel destruction */
566   if (0 != ch->peer)
567   {
568     if (NULL != h->cleaner)
569     {
570       /** @a deprecated  */
571       LOG (GNUNET_ERROR_TYPE_DEBUG,
572           " calling cleaner\n");
573       h->cleaner (h->cls, ch, ch->ctx);
574     }
575     else if (NULL != ch->disconnects)
576     {
577       LOG (GNUNET_ERROR_TYPE_DEBUG,
578           " calling disconnect handler\n");
579       ch->disconnects (ch->ctx, ch);
580     }
581     else
582     {
583       /* Application won't be aware of the channel destruction and use
584        * a pointer to free'd memory.
585        */
586       GNUNET_assert (0);
587     }
588   }
589
590   /* check that clients did not leave messages behind in the queue */
591   for (th = h->th_head; NULL != th; th = next)
592   {
593     next = th->next;
594     if (th->channel != ch)
595       continue;
596     /* Clients should have aborted their requests already.
597      * Management traffic should be ok, as clients can't cancel that.
598      * If the service crashed and we are reconnecting, it's ok.
599      */
600     GNUNET_break (GNUNET_NO == th_is_payload (th));
601     GNUNET_CADET_notify_transmit_ready_cancel (th);
602   }
603
604   if (0 != ch->peer)
605     GNUNET_PEER_change_rc (ch->peer, -1);
606   GNUNET_free (ch);
607 }
608
609
610 /**
611  * Add a transmit handle to the transmission queue and set the
612  * timeout if needed.
613  *
614  * @param h cadet handle with the queue head and tail
615  * @param th handle to the packet to be transmitted
616  */
617 static void
618 add_to_queue (struct GNUNET_CADET_Handle *h,
619               struct GNUNET_CADET_TransmitHandle *th)
620 {
621   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
622                                     h->th_tail,
623                                     th);
624 }
625
626
627 /**
628  * Remove a transmit handle from the transmission queue, if present.
629  *
630  * Safe to call even if not queued.
631  *
632  * @param th handle to the packet to be unqueued.
633  */
634 static void
635 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
636 {
637   struct GNUNET_CADET_Handle *h = th->channel->cadet;
638
639   /* It might or might not have been queued (rarely not), but check anyway. */
640   if (NULL != th->next || h->th_tail == th)
641   {
642     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
643   }
644 }
645
646
647 /**
648  * Notify the application about a change in the window size (if needed).
649  *
650  * @param ch Channel to notify about.
651  */
652 static void
653 notify_window_size (struct GNUNET_CADET_Channel *ch)
654 {
655   if (NULL != ch->window_changes)
656   {
657     ch->window_changes (ch->ctx, ch, ch->allow_send);
658   }
659 }
660
661 /******************************************************************************/
662 /***********************      MQ API CALLBACKS     ****************************/
663 /******************************************************************************/
664
665 /**
666  * Allow the MQ implementation to send the next message.
667  *
668  * @param cls Closure (channel whose mq to activate).
669  */
670 static void
671 cadet_mq_send_continue (void *cls)
672 {
673   struct GNUNET_CADET_Channel *ch = cls;
674
675   ch->mq_cont = NULL;
676   GNUNET_MQ_impl_send_continue (ch->mq);
677 }
678
679 /**
680  * Implement sending functionality of a message queue for
681  * us sending messages to a peer.
682  *
683  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
684  * in order to label the message with the channel ID and send the
685  * encapsulated message to the service.
686  *
687  * @param mq the message queue
688  * @param msg the message to send
689  * @param impl_state state of the implementation
690  */
691 static void
692 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
693                     const struct GNUNET_MessageHeader *msg,
694                     void *impl_state)
695 {
696   struct GNUNET_CADET_Channel *ch = impl_state;
697   struct GNUNET_CADET_Handle *h = ch->cadet;
698   uint16_t msize;
699   struct GNUNET_MQ_Envelope *env;
700   struct GNUNET_CADET_LocalData *cadet_msg;
701
702
703   if (NULL == h->mq)
704   {
705     /* We're currently reconnecting, pretend this worked */
706     GNUNET_MQ_impl_send_continue (mq);
707     return;
708   }
709
710   /* check message size for sanity */
711   msize = ntohs (msg->size);
712   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
713   {
714     GNUNET_break (0);
715     GNUNET_MQ_impl_send_continue (mq);
716     return;
717   }
718
719   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
720                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
721                                  msg);
722   cadet_msg->ccn = ch->ccn;
723
724   if (0 < ch->allow_send)
725   {
726     /* Service has allowed this message, just send it and continue accepting */
727     GNUNET_MQ_send (h->mq, env);
728     ch->allow_send--;
729     ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
730     // notify_window_size (ch); /* FIXME add "verbose" setting? */
731   }
732   else
733   {
734     /* Service has NOT allowed this message, queue it and wait for an ACK */
735     GNUNET_assert (NULL != ch->pending_env);
736     ch->pending_env = env;
737   }
738 }
739
740
741 /**
742  * Handle destruction of a message queue.  Implementations must not
743  * free @a mq, but should take care of @a impl_state.
744  *
745  * @param mq the message queue to destroy
746  * @param impl_state state of the implementation
747  */
748 static void
749 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
750                        void *impl_state)
751 {
752   struct GNUNET_CADET_Channel *ch = impl_state;
753
754   GNUNET_assert (mq == ch->mq);
755   ch->mq = NULL;
756 }
757
758
759 /**
760  * We had an error processing a message we forwarded from a peer to
761  * the CADET service.  We should just complain about it but otherwise
762  * continue processing.
763  *
764  * @param cls closure
765  * @param error error code
766  */
767 static void
768 cadet_mq_error_handler (void *cls,
769                         enum GNUNET_MQ_Error error)
770 {
771   GNUNET_break_op (0);
772 }
773
774
775 /**
776  * Implementation function that cancels the currently sent message.
777  * Should basically undo whatever #mq_send_impl() did.
778  *
779  * @param mq message queue
780  * @param impl_state state specific to the implementation
781  */
782
783 static void
784 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
785                      void *impl_state)
786 {
787   struct GNUNET_CADET_Channel *ch = impl_state;
788
789   LOG (GNUNET_ERROR_TYPE_WARNING,
790        "Cannot cancel mq message on channel %X of %p\n",
791        ch->ccn.channel_of_client, ch->cadet);
792
793   GNUNET_break (0);
794 }
795
796
797 /******************************************************************************/
798 /***********************      RECEIVE HANDLERS     ****************************/
799 /******************************************************************************/
800
801
802 /**
803  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
804  * request the data to send over MQ. Since MQ manages the queue, this function
805  * is scheduled immediatly after a transmit ready notification.
806  *
807  * @param cls Closure (transmit handle).
808  */
809 static void
810 request_data (void *cls)
811 {
812   struct GNUNET_CADET_TransmitHandle *th = cls;
813   struct GNUNET_CADET_LocalData *msg;
814   struct GNUNET_MQ_Envelope *env;
815   size_t osize;
816
817   LOG (GNUNET_ERROR_TYPE_DEBUG,
818        "Requesting Data: %u bytes (allow send is %u)\n",
819        th->size,
820        th->channel->allow_send);
821
822   GNUNET_assert (0 < th->channel->allow_send);
823   th->channel->allow_send--;
824   /* NOTE: we may be allowed to send another packet immediately,
825      albeit the current logic waits for the ACK. */
826   th->request_data_task = NULL;
827   remove_from_queue (th);
828
829   env = GNUNET_MQ_msg_extra (msg,
830                              th->size,
831                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
832   msg->ccn = th->channel->ccn;
833   osize = th->notify (th->notify_cls,
834                       th->size,
835                       &msg[1]);
836   GNUNET_assert (osize == th->size);
837
838   GNUNET_MQ_send (th->channel->cadet->mq,
839                   env);
840   GNUNET_free (th);
841 }
842
843
844 /**
845  * Process the new channel notification and add it to the channels in the handle
846  *
847  * @param h     The cadet handle
848  * @param msg   A message with the details of the new incoming channel
849  */
850 static void
851 handle_channel_created (void *cls,
852                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
853 {
854   struct GNUNET_CADET_Handle *h = cls;
855   struct GNUNET_CADET_Channel *ch;
856   struct GNUNET_CADET_Port *port;
857   const struct GNUNET_HashCode *port_number;
858   struct GNUNET_CADET_ClientChannelNumber ccn;
859
860   ccn = msg->ccn;
861   port_number = &msg->port;
862   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
863   {
864     GNUNET_break (0);
865     return;
866   }
867   port = find_port (h, port_number);
868   if (NULL == port)
869   {
870     /* We could have closed the port but the service didn't know about it yet
871      * This is not an error.
872      */
873     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
874     struct GNUNET_MQ_Envelope *env;
875
876     GNUNET_break (0);
877     LOG (GNUNET_ERROR_TYPE_DEBUG,
878          "No handler for incoming channel %X (on port %s, recently closed?)\n",
879          ntohl (ccn.channel_of_client),
880          GNUNET_h2s (port_number));
881     env = GNUNET_MQ_msg (d_msg,
882                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
883     d_msg->ccn = msg->ccn;
884     GNUNET_MQ_send (h->mq,
885                     env);
886     return;
887   }
888
889   ch = create_channel (h,
890                        ccn);
891   ch->peer = GNUNET_PEER_intern (&msg->peer);
892   ch->cadet = h;
893   ch->ccn = ccn;
894   ch->incoming_port = port;
895   ch->options = ntohl (msg->opt);
896   LOG (GNUNET_ERROR_TYPE_DEBUG,
897        "Creating incoming channel %X [%s] %p\n",
898        ntohl (ccn.channel_of_client),
899        GNUNET_h2s (port_number),
900        ch);
901
902   if (NULL != port->handler)
903   {
904     /** @deprecated */
905     /* Old style API */
906     ch->ctx = port->handler (port->cls,
907                             ch,
908                             &msg->peer,
909                             port->hash,
910                             ch->options);
911   } else {
912     /* MQ API */
913     GNUNET_assert (NULL != port->connects);
914     ch->window_changes = port->window_changes;
915     ch->disconnects = port->disconnects;
916     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
917                                             &cadet_mq_destroy_impl,
918                                             &cadet_mq_cancel_impl,
919                                             ch,
920                                             port->handlers,
921                                             &cadet_mq_error_handler,
922                                             ch);
923     ch->ctx = port->connects (port->cadet->cls,
924                               ch,
925                               &msg->peer);
926     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
927   }
928 }
929
930
931 /**
932  * Process the channel destroy notification and free associated resources
933  *
934  * @param h     The cadet handle
935  * @param msg   A message with the details of the channel being destroyed
936  */
937 static void
938 handle_channel_destroy (void *cls,
939                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
940 {
941   struct GNUNET_CADET_Handle *h = cls;
942   struct GNUNET_CADET_Channel *ch;
943   struct GNUNET_CADET_ClientChannelNumber ccn;
944
945   ccn = msg->ccn;
946   LOG (GNUNET_ERROR_TYPE_DEBUG,
947        "Channel %X Destroy from service\n",
948        ntohl (ccn.channel_of_client));
949   ch = retrieve_channel (h,
950                          ccn);
951
952   if (NULL == ch)
953   {
954     LOG (GNUNET_ERROR_TYPE_DEBUG,
955          "channel %X unknown\n",
956          ntohl (ccn.channel_of_client));
957     return;
958   }
959   destroy_channel (ch);
960 }
961
962
963 /**
964  * Check that message received from CADET service is well-formed.
965  *
966  * @param cls the `struct GNUNET_CADET_Handle`
967  * @param message the message we got
968  * @return #GNUNET_OK if the message is well-formed,
969  *         #GNUNET_SYSERR otherwise
970  */
971 static int
972 check_local_data (void *cls,
973                   const struct GNUNET_CADET_LocalData *message)
974 {
975   struct GNUNET_CADET_Handle *h = cls;
976   struct GNUNET_CADET_Channel *ch;
977   uint16_t size;
978
979   size = ntohs (message->header.size);
980   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
981   {
982     GNUNET_break_op (0);
983     return GNUNET_SYSERR;
984   }
985
986   ch = retrieve_channel (h,
987                          message->ccn);
988   if (NULL == ch)
989   {
990     GNUNET_break_op (0);
991     return GNUNET_SYSERR;
992   }
993
994   return GNUNET_OK;
995 }
996
997
998 /**
999  * Process the incoming data packets, call appropriate handlers.
1000  *
1001  * @param h       The cadet handle
1002  * @param message A message encapsulating the data
1003  */
1004 static void
1005 handle_local_data (void *cls,
1006                    const struct GNUNET_CADET_LocalData *message)
1007 {
1008   struct GNUNET_CADET_Handle *h = cls;
1009   const struct GNUNET_MessageHeader *payload;
1010   const struct GNUNET_CADET_MessageHandler *handler;
1011   struct GNUNET_CADET_Channel *ch;
1012   uint16_t type;
1013
1014   ch = retrieve_channel (h,
1015                          message->ccn);
1016   if (NULL == ch)
1017   {
1018     GNUNET_break_op (0);
1019     reconnect (h);
1020     return;
1021   }
1022
1023   payload = (struct GNUNET_MessageHeader *) &message[1];
1024   type = ntohs (payload->type);
1025   LOG (GNUNET_ERROR_TYPE_DEBUG,
1026        "Got a %s data on channel %s [%X] of type %s (%u)\n",
1027        GC_f2s (ntohl (ch->ccn.channel_of_client) >=
1028                GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
1029        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
1030        ntohl (message->ccn.channel_of_client),
1031        GC_m2s (type),
1032        type);
1033   if (NULL != ch->mq)
1034   {
1035     GNUNET_MQ_inject_message (ch->mq, payload);
1036     return;
1037   }
1038   /** @a deprecated */
1039   for (unsigned i=0;i<h->n_handlers;i++)
1040   {
1041     handler = &h->message_handlers[i];
1042     if (handler->type == type)
1043     {
1044       if (GNUNET_OK !=
1045           handler->callback (h->cls,
1046                              ch,
1047                              &ch->ctx,
1048                              payload))
1049       {
1050         LOG (GNUNET_ERROR_TYPE_DEBUG,
1051              "callback caused disconnection\n");
1052         GNUNET_CADET_channel_destroy (ch);
1053         return;
1054       }
1055       return;
1056     }
1057   }
1058   /* Other peer sent message we do not comprehend. */
1059   GNUNET_break_op (0);
1060   GNUNET_CADET_receive_done (ch);
1061 }
1062
1063
1064 /**
1065  * Process a local ACK message, enabling the client to send
1066  * more data to the service.
1067  *
1068  * @param h Cadet handle.
1069  * @param message Message itself.
1070  */
1071 static void
1072 handle_local_ack (void *cls,
1073                   const struct GNUNET_CADET_LocalAck *message)
1074 {
1075   struct GNUNET_CADET_Handle *h = cls;
1076   struct GNUNET_CADET_Channel *ch;
1077   struct GNUNET_CADET_ClientChannelNumber ccn;
1078   struct GNUNET_CADET_TransmitHandle *th;
1079
1080   ccn = message->ccn;
1081   ch = retrieve_channel (h, ccn);
1082   if (NULL == ch)
1083   {
1084     LOG (GNUNET_ERROR_TYPE_DEBUG,
1085          "ACK on unknown channel %X\n",
1086          ntohl (ccn.channel_of_client));
1087     return;
1088   }
1089   ch->allow_send++;
1090   if (NULL != ch->mq)
1091   {
1092     if (NULL == ch->pending_env)
1093     {
1094       LOG (GNUNET_ERROR_TYPE_DEBUG,
1095            "Got an ACK on mq channel %X, allow send now %u!\n",
1096            ntohl (ch->ccn.channel_of_client),
1097            ch->allow_send);
1098       notify_window_size (ch);
1099     }
1100     else
1101     {
1102       LOG (GNUNET_ERROR_TYPE_DEBUG,
1103            "Got an ACK on channel %X, sending pending message!\n",
1104            ntohl (ch->ccn.channel_of_client));
1105       GNUNET_MQ_send (h->mq, ch->pending_env);
1106       ch->allow_send--;
1107       ch->pending_env = NULL;
1108       ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
1109     }
1110     return;
1111   }
1112
1113   /** @deprecated */
1114   /* Old style API */
1115   for (th = h->th_head; NULL != th; th = th->next)
1116   {
1117     if ( (th->channel == ch) &&
1118          (NULL == th->request_data_task) )
1119     {
1120       th->request_data_task
1121         = GNUNET_SCHEDULER_add_now (&request_data,
1122                                     th);
1123       break;
1124     }
1125   }
1126 }
1127
1128
1129 /**
1130  * Generic error handler, called with the appropriate error code and
1131  * the same closure specified at the creation of the message queue.
1132  * Not every message queue implementation supports an error handler.
1133  *
1134  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1135  * @param error error code
1136  */
1137 static void
1138 handle_mq_error (void *cls,
1139                  enum GNUNET_MQ_Error error)
1140 {
1141   struct GNUNET_CADET_Handle *h = cls;
1142
1143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1144   GNUNET_MQ_destroy (h->mq);
1145   h->mq = NULL;
1146   reconnect (h);
1147 }
1148
1149
1150 /*
1151  * Process a local reply about info on all channels, pass info to the user.
1152  *
1153  * @param h Cadet handle.
1154  * @param message Message itself.
1155  */
1156 // static void
1157 // process_get_channels (struct GNUNET_CADET_Handle *h,
1158 //                      const struct GNUNET_MessageHeader *message)
1159 // {
1160 //   struct GNUNET_CADET_LocalInfo *msg;
1161 //
1162 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1163 //
1164 //   if (NULL == h->channels_cb)
1165 //   {
1166 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1167 //     return;
1168 //   }
1169 //
1170 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1171 //   if (ntohs (message->size) !=
1172 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1173 //        sizeof (struct GNUNET_PeerIdentity)))
1174 //   {
1175 //     GNUNET_break_op (0);
1176 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1177 //                 "Get channels message: size %hu - expected %u\n",
1178 //                 ntohs (message->size),
1179 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1180 //     return;
1181 //   }
1182 //   h->channels_cb (h->channels_cls,
1183 //                   ntohl (msg->channel_id),
1184 //                   &msg->owner,
1185 //                   &msg->destination);
1186 // }
1187
1188
1189
1190 /*
1191  * Process a local monitor_channel reply, pass info to the user.
1192  *
1193  * @param h Cadet handle.
1194  * @param message Message itself.
1195  */
1196 // static void
1197 // process_show_channel (struct GNUNET_CADET_Handle *h,
1198 //                      const struct GNUNET_MessageHeader *message)
1199 // {
1200 //   struct GNUNET_CADET_LocalInfo *msg;
1201 //   size_t esize;
1202 //
1203 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1204 //
1205 //   if (NULL == h->channel_cb)
1206 //   {
1207 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1208 //     return;
1209 //   }
1210 //
1211 //   /* Verify message sanity */
1212 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1213 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1214 //   if (ntohs (message->size) != esize)
1215 //   {
1216 //     GNUNET_break_op (0);
1217 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1218 //                 "Show channel message: size %hu - expected %u\n",
1219 //                 ntohs (message->size),
1220 //                 esize);
1221 //
1222 //     h->channel_cb (h->channel_cls, NULL, NULL);
1223 //     h->channel_cb = NULL;
1224 //     h->channel_cls = NULL;
1225 //
1226 //     return;
1227 //   }
1228 //
1229 //   h->channel_cb (h->channel_cls,
1230 //                  &msg->destination,
1231 //                  &msg->owner);
1232 // }
1233
1234
1235
1236 /**
1237  * Check that message received from CADET service is well-formed.
1238  *
1239  * @param cls the `struct GNUNET_CADET_Handle`
1240  * @param message the message we got
1241  * @return #GNUNET_OK if the message is well-formed,
1242  *         #GNUNET_SYSERR otherwise
1243  */
1244 static int
1245 check_get_peers (void *cls,
1246                  const struct GNUNET_CADET_LocalInfoPeer *message)
1247 {
1248   struct GNUNET_CADET_Handle *h = cls;
1249   uint16_t size;
1250
1251   if (NULL == h->info_cb.peers_cb)
1252   {
1253     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1254                 "  no handler for peesr monitor message!\n");
1255     return GNUNET_SYSERR;
1256   }
1257
1258   size = ntohs (message->header.size);
1259   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1260   {
1261     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1262     h->info_cb.peers_cb = NULL;
1263     h->info_cls = NULL;
1264     return GNUNET_SYSERR;
1265   }
1266
1267   return GNUNET_OK;
1268 }
1269
1270
1271 /**
1272  * Process a local reply about info on all tunnels, pass info to the user.
1273  *
1274  * @param cls Closure (Cadet handle).
1275  * @param msg Message itself.
1276  */
1277 static void
1278 handle_get_peers (void *cls,
1279                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1280 {
1281   struct GNUNET_CADET_Handle *h = cls;
1282   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1283                        (int) ntohs (msg->tunnel),
1284                        (unsigned int ) ntohs (msg->paths),
1285                        0);
1286 }
1287
1288
1289 /**
1290  * Check that message received from CADET service is well-formed.
1291  *
1292  * @param cls the `struct GNUNET_CADET_Handle`
1293  * @param message the message we got
1294  * @return #GNUNET_OK if the message is well-formed,
1295  *         #GNUNET_SYSERR otherwise
1296  */
1297 static int
1298 check_get_peer (void *cls,
1299                 const struct GNUNET_CADET_LocalInfoPeer *message)
1300 {
1301   struct GNUNET_CADET_Handle *h = cls;
1302   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1303   struct GNUNET_PeerIdentity *paths_array;
1304   size_t esize;
1305   unsigned int epaths;
1306   unsigned int paths;
1307   unsigned int peers;
1308
1309   if (NULL == h->info_cb.peer_cb)
1310   {
1311     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1312                 "  no handler for peer monitor message!\n");
1313     goto clean_cls;
1314   }
1315
1316   /* Verify message sanity */
1317   esize = ntohs (message->header.size);
1318   if (esize < msize)
1319   {
1320     GNUNET_break_op (0);
1321     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1322     goto clean_cls;
1323   }
1324   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1325   {
1326     GNUNET_break_op (0);
1327     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1328     goto clean_cls;
1329
1330   }
1331   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1332   epaths = (unsigned int) ntohs (message->paths);
1333   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1334   paths = 0;
1335   for (int i = 0; i < peers; i++)
1336   {
1337     if (0 == memcmp (&paths_array[i], &message->destination,
1338                      sizeof (struct GNUNET_PeerIdentity)))
1339     {
1340       paths++;
1341     }
1342   }
1343   if (paths != epaths)
1344   {
1345     GNUNET_break_op (0);
1346     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1347     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1348     goto clean_cls;
1349   }
1350
1351   return GNUNET_OK;
1352
1353 clean_cls:
1354   h->info_cb.peer_cb = NULL;
1355   h->info_cls = NULL;
1356   return GNUNET_SYSERR;
1357 }
1358
1359
1360 /**
1361  * Process a local peer info reply, pass info to the user.
1362  *
1363  * @param cls Closure (Cadet handle).
1364  * @param message Message itself.
1365  */
1366 static void
1367 handle_get_peer (void *cls,
1368                  const struct GNUNET_CADET_LocalInfoPeer *message)
1369 {
1370   struct GNUNET_CADET_Handle *h = cls;
1371   struct GNUNET_PeerIdentity *paths_array;
1372   unsigned int paths;
1373   unsigned int path_length;
1374   int neighbor;
1375   unsigned int peers;
1376
1377   paths = (unsigned int) ntohs (message->paths);
1378   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1379   peers = (ntohs (message->header.size) - sizeof (*message))
1380           / sizeof (struct GNUNET_PeerIdentity);
1381   path_length = 0;
1382   neighbor = GNUNET_NO;
1383
1384   for (int i = 0; i < peers; i++)
1385   {
1386     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1387     path_length++;
1388     if (0 == memcmp (&paths_array[i], &message->destination,
1389                      sizeof (struct GNUNET_PeerIdentity)))
1390     {
1391       if (1 == path_length)
1392         neighbor = GNUNET_YES;
1393       path_length = 0;
1394     }
1395   }
1396
1397   /* Call Callback with tunnel info. */
1398   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1399   h->info_cb.peer_cb (h->info_cls,
1400                       &message->destination,
1401                       (int) ntohs (message->tunnel),
1402                       neighbor,
1403                       paths,
1404                       paths_array);
1405 }
1406
1407
1408 /**
1409  * Check that message received from CADET service is well-formed.
1410  *
1411  * @param cls the `struct GNUNET_CADET_Handle`
1412  * @param msg the message we got
1413  * @return #GNUNET_OK if the message is well-formed,
1414  *         #GNUNET_SYSERR otherwise
1415  */
1416 static int
1417 check_get_tunnels (void *cls,
1418                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1419 {
1420   struct GNUNET_CADET_Handle *h = cls;
1421   uint16_t size;
1422
1423   if (NULL == h->info_cb.tunnels_cb)
1424   {
1425     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1426                 "  no handler for tunnels monitor message!\n");
1427     return GNUNET_SYSERR;
1428   }
1429
1430   size = ntohs (msg->header.size);
1431   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1432   {
1433     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1434     h->info_cb.tunnels_cb = NULL;
1435     h->info_cls = NULL;
1436     return GNUNET_SYSERR;
1437   }
1438   return GNUNET_OK;
1439 }
1440
1441
1442 /**
1443  * Process a local reply about info on all tunnels, pass info to the user.
1444  *
1445  * @param cls Closure (Cadet handle).
1446  * @param message Message itself.
1447  */
1448 static void
1449 handle_get_tunnels (void *cls,
1450                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1451 {
1452   struct GNUNET_CADET_Handle *h = cls;
1453
1454   h->info_cb.tunnels_cb (h->info_cls,
1455                          &msg->destination,
1456                          ntohl (msg->channels),
1457                          ntohl (msg->connections),
1458                          ntohs (msg->estate),
1459                          ntohs (msg->cstate));
1460
1461 }
1462
1463
1464 /**
1465  * Check that message received from CADET service is well-formed.
1466  *
1467  * @param cls the `struct GNUNET_CADET_Handle`
1468  * @param msg the message we got
1469  * @return #GNUNET_OK if the message is well-formed,
1470  *         #GNUNET_SYSERR otherwise
1471  */
1472 static int
1473 check_get_tunnel (void *cls,
1474                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1475 {
1476   struct GNUNET_CADET_Handle *h = cls;
1477   unsigned int ch_n;
1478   unsigned int c_n;
1479   size_t esize;
1480   size_t msize;
1481
1482   if (NULL == h->info_cb.tunnel_cb)
1483   {
1484     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1485                 "  no handler for tunnel monitor message!\n");
1486     goto clean_cls;
1487   }
1488
1489   /* Verify message sanity */
1490   msize = ntohs (msg->header.size);
1491   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1492   if (esize > msize)
1493   {
1494     GNUNET_break_op (0);
1495     h->info_cb.tunnel_cb (h->info_cls,
1496                           NULL, 0, 0, NULL, NULL, 0, 0);
1497     goto clean_cls;
1498   }
1499   ch_n = ntohl (msg->channels);
1500   c_n = ntohl (msg->connections);
1501   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1502   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1503   if (msize != esize)
1504   {
1505     GNUNET_break_op (0);
1506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507                 "m:%u, e: %u (%u ch, %u conn)\n",
1508                 (unsigned int) msize,
1509                 (unsigned int) esize,
1510                 ch_n,
1511                 c_n);
1512     h->info_cb.tunnel_cb (h->info_cls,
1513                           NULL, 0, 0, NULL, NULL, 0, 0);
1514     goto clean_cls;
1515   }
1516
1517   return GNUNET_OK;
1518
1519 clean_cls:
1520   h->info_cb.tunnel_cb = NULL;
1521   h->info_cls = NULL;
1522   return GNUNET_SYSERR;
1523 }
1524
1525
1526 /**
1527  * Process a local tunnel info reply, pass info to the user.
1528  *
1529  * @param cls Closure (Cadet handle).
1530  * @param msg Message itself.
1531  */
1532 static void
1533 handle_get_tunnel (void *cls,
1534                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1535 {
1536   struct GNUNET_CADET_Handle *h = cls;
1537   unsigned int ch_n;
1538   unsigned int c_n;
1539   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1540   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1541
1542   ch_n = ntohl (msg->channels);
1543   c_n = ntohl (msg->connections);
1544
1545   /* Call Callback with tunnel info. */
1546   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1547   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1548   h->info_cb.tunnel_cb (h->info_cls,
1549                         &msg->destination,
1550                         ch_n,
1551                         c_n,
1552                         chns,
1553                         conns,
1554                         ntohs (msg->estate),
1555                         ntohs (msg->cstate));
1556 }
1557
1558
1559 /**
1560  * Reconnect to the service, retransmit all infomation to try to restore the
1561  * original state.
1562  *
1563  * @param h handle to the cadet
1564  */
1565 static void
1566 reconnect (struct GNUNET_CADET_Handle *h)
1567 {
1568   struct GNUNET_MQ_MessageHandler handlers[] = {
1569     GNUNET_MQ_hd_fixed_size (channel_created,
1570                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1571                              struct GNUNET_CADET_LocalChannelCreateMessage,
1572                              h),
1573     GNUNET_MQ_hd_fixed_size (channel_destroy,
1574                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1575                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1576                              h),
1577     GNUNET_MQ_hd_var_size (local_data,
1578                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1579                            struct GNUNET_CADET_LocalData,
1580                            h),
1581     GNUNET_MQ_hd_fixed_size (local_ack,
1582                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1583                              struct GNUNET_CADET_LocalAck,
1584                              h),
1585     GNUNET_MQ_hd_var_size (get_peers,
1586                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1587                            struct GNUNET_CADET_LocalInfoPeer,
1588                            h),
1589     GNUNET_MQ_hd_var_size (get_peer,
1590                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1591                            struct GNUNET_CADET_LocalInfoPeer,
1592                            h),
1593     GNUNET_MQ_hd_var_size (get_tunnels,
1594                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1595                            struct GNUNET_CADET_LocalInfoTunnel,
1596                            h),
1597     GNUNET_MQ_hd_var_size (get_tunnel,
1598                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1599                            struct GNUNET_CADET_LocalInfoTunnel,
1600                            h),
1601 // FIXME
1602 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1603 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1604 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1605     GNUNET_MQ_handler_end ()
1606   };
1607   struct GNUNET_CADET_Channel *ch;
1608
1609   while (NULL != (ch = h->channels_head))
1610   {
1611     LOG (GNUNET_ERROR_TYPE_DEBUG,
1612          "Destroying channel due to a reconnect\n");
1613     destroy_channel (ch);
1614   }
1615
1616   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1617
1618   if (NULL != h->mq)
1619   {
1620     GNUNET_MQ_destroy (h->mq);
1621     h->mq = NULL;
1622   }
1623   h->mq = GNUNET_CLIENT_connect (h->cfg,
1624                                  "cadet",
1625                                  handlers,
1626                                  &handle_mq_error,
1627                                  h);
1628   if (NULL == h->mq)
1629   {
1630     schedule_reconnect (h);
1631     return;
1632   }
1633   else
1634   {
1635     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1636   }
1637 }
1638
1639 /**
1640  * Reconnect callback: tries to reconnect again after a failer previous
1641  * reconnecttion
1642  *
1643  * @param cls closure (cadet handle)
1644  */
1645 static void
1646 reconnect_cbk (void *cls)
1647 {
1648   struct GNUNET_CADET_Handle *h = cls;
1649
1650   h->reconnect_task = NULL;
1651   reconnect (h);
1652 }
1653
1654
1655 /**
1656  * Reconnect to the service, retransmit all infomation to try to restore the
1657  * original state.
1658  *
1659  * @param h handle to the cadet
1660  *
1661  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1662  */
1663 static void
1664 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1665 {
1666   if (NULL == h->reconnect_task)
1667   {
1668     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1669                                                       &reconnect_cbk, h);
1670     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1671   }
1672 }
1673
1674
1675 /******************************************************************************/
1676 /**********************      API CALL DEFINITIONS     *************************/
1677 /******************************************************************************/
1678
1679 struct GNUNET_CADET_Handle *
1680 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1681                       void *cls,
1682                       GNUNET_CADET_ChannelEndHandler cleaner,
1683                       const struct GNUNET_CADET_MessageHandler *handlers)
1684 {
1685   struct GNUNET_CADET_Handle *h;
1686
1687   h = GNUNET_new (struct GNUNET_CADET_Handle);
1688   LOG (GNUNET_ERROR_TYPE_DEBUG,
1689        "GNUNET_CADET_connect() %p\n",
1690        h);
1691   h->cfg = cfg;
1692   h->cleaner = cleaner;
1693   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1694   reconnect (h);
1695   if (h->mq == NULL)
1696   {
1697     GNUNET_break (0);
1698     GNUNET_CADET_disconnect (h);
1699     return NULL;
1700   }
1701   h->cls = cls;
1702   h->message_handlers = handlers;
1703   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1704   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1705   h->reconnect_task = NULL;
1706
1707   /* count handlers */
1708   for (h->n_handlers = 0;
1709        handlers && handlers[h->n_handlers].type;
1710        h->n_handlers++) ;
1711   return h;
1712 }
1713
1714
1715 /**
1716  * Disconnect from the cadet service. All channels will be destroyed. All channel
1717  * disconnect callbacks will be called on any still connected peers, notifying
1718  * about their disconnection. The registered inbound channel cleaner will be
1719  * called should any inbound channels still exist.
1720  *
1721  * @param handle connection to cadet to disconnect
1722  */
1723 void
1724 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1725 {
1726   struct GNUNET_CADET_Channel *ch;
1727   struct GNUNET_CADET_Channel *aux;
1728   struct GNUNET_CADET_TransmitHandle *th;
1729
1730   LOG (GNUNET_ERROR_TYPE_DEBUG,
1731        "CADET DISCONNECT\n");
1732   ch = handle->channels_head;
1733   while (NULL != ch)
1734   {
1735     aux = ch->next;
1736     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1737     {
1738       GNUNET_break (0);
1739       LOG (GNUNET_ERROR_TYPE_DEBUG,
1740            "channel %X not destroyed\n",
1741            ntohl (ch->ccn.channel_of_client));
1742     }
1743     destroy_channel (ch);
1744     ch = aux;
1745   }
1746   while (NULL != (th = handle->th_head))
1747   {
1748     struct GNUNET_MessageHeader *msg;
1749
1750     /* Make sure it is an allowed packet (everything else should have been
1751      * already canceled).
1752      */
1753     GNUNET_break (GNUNET_NO == th_is_payload (th));
1754     msg = (struct GNUNET_MessageHeader *) &th[1];
1755     switch (ntohs(msg->type))
1756     {
1757       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1758       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1759       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1760       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1761       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1762       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1763       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1764       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1765       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1766       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1767         break;
1768       default:
1769         GNUNET_break (0);
1770         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1771              GC_m2s (ntohs(msg->type)));
1772     }
1773
1774     GNUNET_CADET_notify_transmit_ready_cancel (th);
1775   }
1776
1777   if (NULL != handle->mq)
1778   {
1779     GNUNET_MQ_destroy (handle->mq);
1780     handle->mq = NULL;
1781   }
1782   if (NULL != handle->reconnect_task)
1783   {
1784     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1785     handle->reconnect_task = NULL;
1786   }
1787
1788   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1789   handle->ports = NULL;
1790   GNUNET_free (handle);
1791 }
1792
1793
1794 /**
1795  * Open a port to receive incomming channels.
1796  *
1797  * @param h CADET handle.
1798  * @param port Hash representing the port number.
1799  * @param new_channel Function called when an channel is received.
1800  * @param new_channel_cls Closure for @a new_channel.
1801  * @return Port handle.
1802  */
1803 struct GNUNET_CADET_Port *
1804 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1805                         const struct GNUNET_HashCode *port,
1806                         GNUNET_CADET_InboundChannelNotificationHandler
1807                             new_channel,
1808                         void *new_channel_cls)
1809 {
1810   struct GNUNET_CADET_PortMessage *msg;
1811   struct GNUNET_MQ_Envelope *env;
1812   struct GNUNET_CADET_Port *p;
1813
1814   GNUNET_assert (NULL != new_channel);
1815   p = GNUNET_new (struct GNUNET_CADET_Port);
1816   p->cadet = h;
1817   p->hash = GNUNET_new (struct GNUNET_HashCode);
1818   *p->hash = *port;
1819   p->handler = new_channel;
1820   p->cls = new_channel_cls;
1821   GNUNET_assert (GNUNET_OK ==
1822                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1823                                                     p->hash,
1824                                                     p,
1825                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1826
1827   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1828   msg->port = *p->hash;
1829   GNUNET_MQ_send (h->mq, env);
1830
1831   return p;
1832 }
1833
1834 /**
1835  * Close a port opened with @a GNUNET_CADET_open_port.
1836  * The @a new_channel callback will no longer be called.
1837  *
1838  * @param p Port handle.
1839  */
1840 void
1841 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1842 {
1843   struct GNUNET_CADET_PortMessage *msg;
1844   struct GNUNET_MQ_Envelope *env;
1845   struct GNUNET_HashCode *id;
1846
1847   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1848
1849   id = NULL != p->hash ? p->hash : &p->id;
1850   msg->port = *id;
1851   GNUNET_MQ_send (p->cadet->mq, env);
1852   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1853   GNUNET_free_non_null (p->hash);
1854   GNUNET_free (p);
1855 }
1856
1857
1858 /**
1859  * Create a new channel towards a remote peer.
1860  *
1861  * If the destination port is not open by any peer or the destination peer
1862  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1863  * for this channel.
1864  *
1865  * @param h cadet handle
1866  * @param channel_ctx client's channel context to associate with the channel
1867  * @param peer peer identity the channel should go to
1868  * @param port Port hash (port number).
1869  * @param options CadetOption flag field, with all desired option bits set to 1.
1870  * @return handle to the channel
1871  */
1872 struct GNUNET_CADET_Channel *
1873 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1874                             void *channel_ctx,
1875                             const struct GNUNET_PeerIdentity *peer,
1876                             const struct GNUNET_HashCode *port,
1877                             enum GNUNET_CADET_ChannelOption options)
1878 {
1879   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1880   struct GNUNET_MQ_Envelope *env;
1881   struct GNUNET_CADET_Channel *ch;
1882   struct GNUNET_CADET_ClientChannelNumber ccn;
1883
1884   ccn.channel_of_client = htonl (0);
1885   ch = create_channel (h, ccn);
1886   ch->ctx = channel_ctx;
1887   ch->peer = GNUNET_PEER_intern (peer);
1888
1889   LOG (GNUNET_ERROR_TYPE_DEBUG,
1890        "Creating new channel to %s:%u at %p number %X\n",
1891        GNUNET_i2s (peer),
1892        port,
1893        ch,
1894        ntohl (ch->ccn.channel_of_client));
1895   env = GNUNET_MQ_msg (msg,
1896                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1897   msg->ccn = ch->ccn;
1898   msg->port = *port;
1899   msg->peer = *peer;
1900   msg->opt = htonl (options);
1901   GNUNET_MQ_send (h->mq,
1902                   env);
1903   return ch;
1904 }
1905
1906
1907 void
1908 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1909 {
1910   struct GNUNET_CADET_Handle *h;
1911   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1912   struct GNUNET_MQ_Envelope *env;
1913   struct GNUNET_CADET_TransmitHandle *th;
1914   struct GNUNET_CADET_TransmitHandle *next;
1915
1916   LOG (GNUNET_ERROR_TYPE_DEBUG,
1917        "Destroying channel\n");
1918   h = channel->cadet;
1919   for  (th = h->th_head; th != NULL; th = next)
1920   {
1921     next = th->next;
1922     if (th->channel == channel)
1923     {
1924       GNUNET_break (0);
1925       if (GNUNET_YES == th_is_payload (th))
1926       {
1927         /* applications should cancel before destroying channel */
1928         LOG (GNUNET_ERROR_TYPE_WARNING,
1929              "Channel destroyed without cancelling transmission requests\n");
1930         th->notify (th->notify_cls, 0, NULL);
1931       }
1932       else
1933       {
1934         LOG (GNUNET_ERROR_TYPE_WARNING,
1935              "no meta-traffic should be queued\n");
1936       }
1937       GNUNET_CONTAINER_DLL_remove (h->th_head,
1938                                    h->th_tail,
1939                                    th);
1940       GNUNET_CADET_notify_transmit_ready_cancel (th);
1941     }
1942   }
1943
1944   env = GNUNET_MQ_msg (msg,
1945                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1946   msg->ccn = channel->ccn;
1947   GNUNET_MQ_send (h->mq,
1948                   env);
1949
1950   destroy_channel (channel);
1951 }
1952
1953
1954 /**
1955  * Get information about a channel.
1956  *
1957  * @param channel Channel handle.
1958  * @param option Query (GNUNET_CADET_OPTION_*).
1959  * @param ... dependant on option, currently not used
1960  *
1961  * @return Union with an answer to the query.
1962  */
1963 const union GNUNET_CADET_ChannelInfo *
1964 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1965                               enum GNUNET_CADET_ChannelOption option, ...)
1966 {
1967   static int bool_flag;
1968   const union GNUNET_CADET_ChannelInfo *ret;
1969
1970   switch (option)
1971   {
1972     case GNUNET_CADET_OPTION_NOBUFFER:
1973     case GNUNET_CADET_OPTION_RELIABLE:
1974     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1975       if (0 != (option & channel->options))
1976         bool_flag = GNUNET_YES;
1977       else
1978         bool_flag = GNUNET_NO;
1979       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1980       break;
1981     case GNUNET_CADET_OPTION_PEER:
1982       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1983       break;
1984     default:
1985       GNUNET_break (0);
1986       return NULL;
1987   }
1988
1989   return ret;
1990 }
1991
1992
1993 struct GNUNET_CADET_TransmitHandle *
1994 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
1995                                     int cork,
1996                                     struct GNUNET_TIME_Relative maxdelay,
1997                                     size_t notify_size,
1998                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
1999                                     void *notify_cls)
2000 {
2001   struct GNUNET_CADET_TransmitHandle *th;
2002
2003   GNUNET_assert (NULL != channel);
2004   GNUNET_assert (NULL != notify);
2005   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
2006   LOG (GNUNET_ERROR_TYPE_DEBUG,
2007        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
2008        ntohl (channel->ccn.channel_of_client),
2009        channel->allow_send,
2010        (ntohl (channel->ccn.channel_of_client) >=
2011         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
2012        ? "origin"
2013        : "destination",
2014        (unsigned int) notify_size);
2015   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
2016   {
2017     LOG (GNUNET_ERROR_TYPE_WARNING,
2018          "CADET transmit ready timeout is deprected (has no effect)\n");
2019   }
2020
2021   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
2022   th->channel = channel;
2023   th->size = notify_size;
2024   th->notify = notify;
2025   th->notify_cls = notify_cls;
2026   if (0 != channel->allow_send)
2027     th->request_data_task
2028       = GNUNET_SCHEDULER_add_now (&request_data,
2029                                   th);
2030   else
2031     add_to_queue (channel->cadet,
2032                   th);
2033   return th;
2034 }
2035
2036
2037 void
2038 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
2039 {
2040   if (NULL != th->request_data_task)
2041   {
2042     GNUNET_SCHEDULER_cancel (th->request_data_task);
2043     th->request_data_task = NULL;
2044   }
2045   remove_from_queue (th);
2046   GNUNET_free (th);
2047 }
2048
2049
2050 /**
2051  * Send an ack on the channel to confirm the processing of a message.
2052  *
2053  * @param ch Channel on which to send the ACK.
2054  */
2055 void
2056 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
2057 {
2058   struct GNUNET_CADET_LocalAck *msg;
2059   struct GNUNET_MQ_Envelope *env;
2060
2061   env = GNUNET_MQ_msg (msg,
2062                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
2063   LOG (GNUNET_ERROR_TYPE_DEBUG,
2064        "Sending ACK on channel %X\n",
2065        ntohl (channel->ccn.channel_of_client));
2066   msg->ccn = channel->ccn;
2067   GNUNET_MQ_send (channel->cadet->mq,
2068                   env);
2069 }
2070
2071
2072 static void
2073 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
2074 {
2075   struct GNUNET_MessageHeader *msg;
2076   struct GNUNET_MQ_Envelope *env;
2077
2078   LOG (GNUNET_ERROR_TYPE_DEBUG,
2079        " Sending %s monitor message to service\n",
2080        GC_m2s(type));
2081
2082   env = GNUNET_MQ_msg (msg, type);
2083   GNUNET_MQ_send (h->mq, env);
2084 }
2085
2086
2087 /**
2088  * Request a debug dump on the service's STDERR.
2089  *
2090  * WARNING: unstable API, likely to change in the future!
2091  *
2092  * @param h cadet handle
2093  */
2094 void
2095 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2096 {
2097   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2098   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2099 }
2100
2101
2102 /**
2103  * Request information about peers known to the running cadet service.
2104  * The callback will be called for every peer known to the service.
2105  * Only one info request (of any kind) can be active at once.
2106  *
2107  *
2108  * WARNING: unstable API, likely to change in the future!
2109  *
2110  * @param h Handle to the cadet peer.
2111  * @param callback Function to call with the requested data.
2112  * @param callback_cls Closure for @c callback.
2113  *
2114  * @return #GNUNET_OK / #GNUNET_SYSERR
2115  */
2116 int
2117 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2118                        GNUNET_CADET_PeersCB callback,
2119                        void *callback_cls)
2120 {
2121   if (NULL != h->info_cb.peers_cb)
2122   {
2123     GNUNET_break (0);
2124     return GNUNET_SYSERR;
2125   }
2126   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2127   h->info_cb.peers_cb = callback;
2128   h->info_cls = callback_cls;
2129   return GNUNET_OK;
2130 }
2131
2132
2133 /**
2134  * Cancel a peer info request. The callback will not be called (anymore).
2135  *
2136  * WARNING: unstable API, likely to change in the future!
2137  *
2138  * @param h Cadet handle.
2139  *
2140  * @return Closure given to GNUNET_CADET_get_peers.
2141  */
2142 void *
2143 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2144 {
2145   void *cls;
2146
2147   cls = h->info_cls;
2148   h->info_cb.peers_cb = NULL;
2149   h->info_cls = NULL;
2150   return cls;
2151 }
2152
2153
2154 /**
2155  * Request information about a peer known to the running cadet peer.
2156  * The callback will be called for the tunnel once.
2157  * Only one info request (of any kind) can be active at once.
2158  *
2159  * WARNING: unstable API, likely to change in the future!
2160  *
2161  * @param h Handle to the cadet peer.
2162  * @param id Peer whose tunnel to examine.
2163  * @param callback Function to call with the requested data.
2164  * @param callback_cls Closure for @c callback.
2165  *
2166  * @return #GNUNET_OK / #GNUNET_SYSERR
2167  */
2168 int
2169 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2170                        const struct GNUNET_PeerIdentity *id,
2171                        GNUNET_CADET_PeerCB callback,
2172                        void *callback_cls)
2173 {
2174   struct GNUNET_CADET_LocalInfo *msg;
2175   struct GNUNET_MQ_Envelope *env;
2176
2177   if (NULL != h->info_cb.peer_cb)
2178   {
2179     GNUNET_break (0);
2180     return GNUNET_SYSERR;
2181   }
2182
2183   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2184   msg->peer = *id;
2185   GNUNET_MQ_send (h->mq, env);
2186
2187   h->info_cb.peer_cb = callback;
2188   h->info_cls = callback_cls;
2189   return GNUNET_OK;
2190 }
2191
2192
2193 /**
2194  * Request information about tunnels of the running cadet peer.
2195  * The callback will be called for every tunnel of the service.
2196  * Only one info request (of any kind) can be active at once.
2197  *
2198  * WARNING: unstable API, likely to change in the future!
2199  *
2200  * @param h Handle to the cadet peer.
2201  * @param callback Function to call with the requested data.
2202  * @param callback_cls Closure for @c callback.
2203  *
2204  * @return #GNUNET_OK / #GNUNET_SYSERR
2205  */
2206 int
2207 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2208                          GNUNET_CADET_TunnelsCB callback,
2209                          void *callback_cls)
2210 {
2211   if (NULL != h->info_cb.tunnels_cb)
2212   {
2213     GNUNET_break (0);
2214     return GNUNET_SYSERR;
2215   }
2216   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2217   h->info_cb.tunnels_cb = callback;
2218   h->info_cls = callback_cls;
2219   return GNUNET_OK;
2220 }
2221
2222
2223 /**
2224  * Cancel a monitor request. The monitor callback will not be called.
2225  *
2226  * @param h Cadet handle.
2227  *
2228  * @return Closure given to GNUNET_CADET_get_tunnels.
2229  */
2230 void *
2231 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2232 {
2233   void *cls;
2234
2235   h->info_cb.tunnels_cb = NULL;
2236   cls = h->info_cls;
2237   h->info_cls = NULL;
2238
2239   return cls;
2240 }
2241
2242
2243
2244 /**
2245  * Request information about a tunnel of the running cadet peer.
2246  * The callback will be called for the tunnel once.
2247  * Only one info request (of any kind) can be active at once.
2248  *
2249  * WARNING: unstable API, likely to change in the future!
2250  *
2251  * @param h Handle to the cadet peer.
2252  * @param id Peer whose tunnel to examine.
2253  * @param callback Function to call with the requested data.
2254  * @param callback_cls Closure for @c callback.
2255  *
2256  * @return #GNUNET_OK / #GNUNET_SYSERR
2257  */
2258 int
2259 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2260                         const struct GNUNET_PeerIdentity *id,
2261                         GNUNET_CADET_TunnelCB callback,
2262                         void *callback_cls)
2263 {
2264   struct GNUNET_CADET_LocalInfo *msg;
2265   struct GNUNET_MQ_Envelope *env;
2266
2267   if (NULL != h->info_cb.tunnel_cb)
2268   {
2269     GNUNET_break (0);
2270     return GNUNET_SYSERR;
2271   }
2272
2273   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2274   msg->peer = *id;
2275   GNUNET_MQ_send (h->mq, env);
2276
2277   h->info_cb.tunnel_cb = callback;
2278   h->info_cls = callback_cls;
2279   return GNUNET_OK;
2280 }
2281
2282
2283 /**
2284  * Request information about a specific channel of the running cadet peer.
2285  *
2286  * WARNING: unstable API, likely to change in the future!
2287  * FIXME Add destination option.
2288  *
2289  * @param h Handle to the cadet peer.
2290  * @param initiator ID of the owner of the channel.
2291  * @param channel_number Channel number.
2292  * @param callback Function to call with the requested data.
2293  * @param callback_cls Closure for @c callback.
2294  *
2295  * @return #GNUNET_OK / #GNUNET_SYSERR
2296  */
2297 int
2298 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2299                            struct GNUNET_PeerIdentity *initiator,
2300                            unsigned int channel_number,
2301                            GNUNET_CADET_ChannelCB callback,
2302                            void *callback_cls)
2303 {
2304   struct GNUNET_CADET_LocalInfo *msg;
2305   struct GNUNET_MQ_Envelope *env;
2306
2307   if (NULL != h->info_cb.channel_cb)
2308   {
2309     GNUNET_break (0);
2310     return GNUNET_SYSERR;
2311   }
2312
2313   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2314   msg->peer = *initiator;
2315   msg->ccn.channel_of_client = htonl (channel_number);
2316   GNUNET_MQ_send (h->mq, env);
2317
2318   h->info_cb.channel_cb = callback;
2319   h->info_cls = callback_cls;
2320   return GNUNET_OK;
2321 }
2322
2323
2324 /**
2325  * Function called to notify a client about the connection
2326  * begin ready to queue more data.  "buf" will be
2327  * NULL and "size" zero if the connection was closed for
2328  * writing in the meantime.
2329  *
2330  * @param cls closure
2331  * @param size number of bytes available in buf
2332  * @param buf where the callee should write the message
2333  * @return number of bytes written to buf
2334  */
2335 static size_t
2336 cadet_mq_ntr (void *cls, size_t size,
2337              void *buf)
2338 {
2339   struct GNUNET_MQ_Handle *mq = cls;
2340   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2341   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2342   uint16_t msize;
2343
2344   state->th = NULL;
2345   if (NULL == buf)
2346   {
2347     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2348     return 0;
2349   }
2350   msize = ntohs (msg->size);
2351   GNUNET_assert (msize <= size);
2352   GNUNET_memcpy (buf, msg, msize);
2353   GNUNET_MQ_impl_send_continue (mq);
2354   return msize;
2355 }
2356
2357
2358 /**
2359  * Signature of functions implementing the
2360  * sending functionality of a message queue.
2361  *
2362  * @param mq the message queue
2363  * @param msg the message to send
2364  * @param impl_state state of the implementation
2365  */
2366 static void
2367 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2368                         const struct GNUNET_MessageHeader *msg,
2369                         void *impl_state)
2370 {
2371   struct CadetMQState *state = impl_state;
2372
2373   GNUNET_assert (NULL == state->th);
2374   state->th =
2375       GNUNET_CADET_notify_transmit_ready (state->channel,
2376                                          /* FIXME: add option for corking */
2377                                          GNUNET_NO,
2378                                          GNUNET_TIME_UNIT_FOREVER_REL,
2379                                          ntohs (msg->size),
2380                                          &cadet_mq_ntr, mq);
2381
2382 }
2383
2384
2385 /**
2386  * Signature of functions implementing the
2387  * destruction of a message queue.
2388  * Implementations must not free 'mq', but should
2389  * take care of 'impl_state'.
2390  *
2391  * @param mq the message queue to destroy
2392  * @param impl_state state of the implementation
2393  */
2394 static void
2395 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2396                            void *impl_state)
2397 {
2398   struct CadetMQState *state = impl_state;
2399
2400   if (NULL != state->th)
2401     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2402
2403   GNUNET_free (state);
2404 }
2405
2406
2407 /**
2408  * Create a message queue for a cadet channel.
2409  * The message queue can only be used to transmit messages,
2410  * not to receive them.
2411  *
2412  * @param channel the channel to create the message qeue for
2413  * @return a message queue to messages over the channel
2414  */
2415 struct GNUNET_MQ_Handle *
2416 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2417 {
2418   struct GNUNET_MQ_Handle *mq;
2419   struct CadetMQState *state;
2420
2421   state = GNUNET_new (struct CadetMQState);
2422   state->channel = channel;
2423
2424   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2425                                       &cadet_mq_destroy_impl_old,
2426                                       NULL, /* FIXME: cancel impl. */
2427                                       state,
2428                                       NULL, /* no msg handlers */
2429                                       NULL, /* no err handlers */
2430                                       NULL); /* no handler cls */
2431   return mq;
2432 }
2433
2434
2435 /**
2436  * Transitional function to convert an unsigned int port to a hash value.
2437  * WARNING: local static value returned, NOT reentrant!
2438  * WARNING: do not use this function for new code!
2439  *
2440  * @param port Numerical port (unsigned int format).
2441  *
2442  * @return A GNUNET_HashCode usable for the new CADET API.
2443  */
2444 const struct GNUNET_HashCode *
2445 GC_u2h (uint32_t port)
2446 {
2447   static struct GNUNET_HashCode hash;
2448
2449   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2450               "This is a transitional function, "
2451               "use proper crypto hashes as CADET ports\n");
2452   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2453
2454   return &hash;
2455 }
2456
2457
2458
2459 /******************************************************************************/
2460 /******************************* MQ-BASED API *********************************/
2461 /******************************************************************************/
2462
2463 /**
2464  * Connect to the MQ-based cadet service.
2465  *
2466  * @param cfg Configuration to use.
2467  *
2468  * @return Handle to the cadet service NULL on error.
2469  */
2470 struct GNUNET_CADET_Handle *
2471 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2472 {
2473   struct GNUNET_CADET_Handle *h;
2474
2475   LOG (GNUNET_ERROR_TYPE_DEBUG,
2476        "GNUNET_CADET_connecT()\n");
2477   h = GNUNET_new (struct GNUNET_CADET_Handle);
2478   h->cfg = cfg;
2479   h->mq_api = GNUNET_YES;
2480   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2481   reconnect (h);
2482   if (NULL == h->mq)
2483   {
2484     GNUNET_break (0);
2485     GNUNET_CADET_disconnect (h);
2486     return NULL;
2487   }
2488   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2489   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2490   h->reconnect_task = NULL;
2491
2492   return h;
2493 }
2494
2495
2496 /**
2497  * Open a port to receive incomming MQ-based channels.
2498  *
2499  * @param h CADET handle.
2500  * @param port Hash identifying the port.
2501  * @param connects Function called when an incoming channel is connected.
2502  * @param connects_cls Closure for the @a connects handler.
2503  * @param window_changes Function called when the transmit window size changes.
2504  * @param disconnects Function called when a channel is disconnected.
2505  * @param handlers Callbacks for messages we care about, NULL-terminated.
2506  *
2507  * @return Port handle.
2508  */
2509 struct GNUNET_CADET_Port *
2510 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2511                         const struct GNUNET_HashCode *port,
2512                         GNUNET_CADET_ConnectEventHandler connects,
2513                         void * connects_cls,
2514                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2515                         GNUNET_CADET_DisconnectEventHandler disconnects,
2516                         const struct GNUNET_MQ_MessageHandler *handlers)
2517 {
2518   struct GNUNET_CADET_PortMessage *msg;
2519   struct GNUNET_MQ_Envelope *env;
2520   struct GNUNET_CADET_Port *p;
2521
2522   GNUNET_assert (NULL != connects);
2523   GNUNET_assert (NULL != disconnects);
2524
2525   p = GNUNET_new (struct GNUNET_CADET_Port);
2526   p->cadet = h;
2527   p->id = *port;
2528   p->connects = connects;
2529   p->cls = connects_cls;
2530   p->window_changes = window_changes;
2531   p->disconnects = disconnects;
2532   p->handlers = handlers;
2533
2534   GNUNET_assert (GNUNET_OK ==
2535                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2536                                                     p->hash,
2537                                                     p,
2538                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2539
2540   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2541   msg->port = p->id;
2542   GNUNET_MQ_send (h->mq, env);
2543
2544   return p;
2545 }
2546
2547
2548 /**
2549  * Create a new channel towards a remote peer.
2550  *
2551  * If the destination port is not open by any peer or the destination peer
2552  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2553  * for this channel.
2554  *
2555  * @param h CADET handle.
2556  * @param channel_cls Closure for the channel. It's given to:
2557  *                    - The disconnect handler @a disconnects
2558  *                    - Each message type callback in @a handlers
2559  * @param destination Peer identity the channel should go to.
2560  * @param port Identification of the destination port.
2561  * @param options CadetOption flag field, with all desired option bits set to 1.
2562  * @param window_changes Function called when the transmit window size changes.
2563  * @param disconnects Function called when the channel is disconnected.
2564  * @param handlers Callbacks for messages we care about, NULL-terminated.
2565  *
2566  * @return Handle to the channel.
2567  */
2568 struct GNUNET_CADET_Channel *
2569 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2570                              void *channel_cls,
2571                              const struct GNUNET_PeerIdentity *destination,
2572                              const struct GNUNET_HashCode *port,
2573                              enum GNUNET_CADET_ChannelOption options,
2574                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2575                              GNUNET_CADET_DisconnectEventHandler disconnects,
2576                              const struct GNUNET_MQ_MessageHandler *handlers)
2577 {
2578   struct GNUNET_CADET_Channel *ch;
2579   struct GNUNET_CADET_ClientChannelNumber ccn;
2580   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2581   struct GNUNET_MQ_Envelope *env;
2582
2583   GNUNET_assert (NULL != disconnects);
2584
2585   /* Save parameters */
2586   ccn.channel_of_client = htonl (0);
2587   ch = create_channel (h, ccn);
2588   ch->ctx = channel_cls;
2589   ch->peer = GNUNET_PEER_intern (destination);
2590   ch->options = options;
2591   ch->window_changes = window_changes;
2592   ch->disconnects = disconnects;
2593
2594   /* Create MQ for channel */
2595   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2596                                           &cadet_mq_destroy_impl,
2597                                           &cadet_mq_cancel_impl,
2598                                           ch,
2599                                           handlers,
2600                                           &cadet_mq_error_handler,
2601                                           ch);
2602   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2603
2604   /* Request channel creation to service */
2605   env = GNUNET_MQ_msg (msg,
2606                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2607   msg->ccn = ch->ccn;
2608   msg->port = *port;
2609   msg->peer = *destination;
2610   msg->opt = htonl (options);
2611   GNUNET_MQ_send (h->mq,
2612                   env);
2613
2614   return ch;
2615 }
2616
2617
2618 /**
2619  * Obtain the message queue for a connected peer.
2620  *
2621  * @param channel The channel handle from which to get the MQ.
2622  *
2623  * @return NULL if @a channel is not yet connected.
2624  */
2625 struct GNUNET_MQ_Handle *
2626 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2627 {
2628   return channel->mq;
2629 }