2de8106a017c4c179215d8310dd689f47ce1875f
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Task to allow mq to send more traffic.
295    */
296   struct GNUNET_SCHEDULER_Task *mq_cont;
297
298   /**
299    * Pending envelope in case we don't have an ACK from the service.
300    */
301   struct GNUNET_MQ_Envelope *pending_env;
302
303   /**
304    * Window change handler.
305    */
306   GNUNET_CADET_WindowSizeEventHandler window_changes;
307
308   /**
309    * Disconnect handler.
310    */
311   GNUNET_CADET_DisconnectEventHandler disconnects;
312
313 };
314
315
316 /**
317  * Opaque handle to a port.
318  */
319 struct GNUNET_CADET_Port
320 {
321   /**
322    * Handle to the CADET session this port belongs to.
323    */
324   struct GNUNET_CADET_Handle *cadet;
325
326   /**
327    * Port ID.
328    *
329    * @deprecated
330    */
331   struct GNUNET_HashCode *hash;
332
333   /**
334    * Callback handler for incoming channels on this port.
335    */
336   GNUNET_CADET_InboundChannelNotificationHandler *handler;
337
338   /**
339    * Closure for @a handler.
340    */
341   void *cls;
342
343   /*****************************    MQ     ************************************/
344
345   /**
346    * Port "number"
347    */
348   struct GNUNET_HashCode id;
349
350   /**
351    * Handler for incoming channels on this port
352    */
353   GNUNET_CADET_ConnectEventHandler connects;
354
355   /**
356    * Closure for @ref connects
357    */
358   void * connects_cls;
359
360   /**
361    * Window size change handler.
362    */
363   GNUNET_CADET_WindowSizeEventHandler window_changes;
364
365   /**
366    * Handler called when an incoming channel is destroyed..
367    */
368   GNUNET_CADET_DisconnectEventHandler disconnects;
369
370   /**
371    * Payload handlers for incoming channels.
372    */
373   const struct GNUNET_MQ_MessageHandler *handlers;
374 };
375
376
377 /**
378  * Implementation state for cadet's message queue.
379  */
380 struct CadetMQState
381 {
382   /**
383    * The current transmit handle, or NULL
384    * if no transmit is active.
385    */
386   struct GNUNET_CADET_TransmitHandle *th;
387
388   /**
389    * Channel to send the data over.
390    */
391   struct GNUNET_CADET_Channel *channel;
392 };
393
394
395
396 /******************************************************************************/
397 /*********************      FUNCTION DECLARATIONS     *************************/
398 /******************************************************************************/
399
400 /**
401  * Reconnect to the service, retransmit all infomation to try to restore the
402  * original state.
403  *
404  * @param h Handle to the CADET service.
405  */
406 static void
407 schedule_reconnect (struct GNUNET_CADET_Handle *h);
408
409
410 /**
411  * Reconnect callback: tries to reconnect again after a failer previous
412  * reconnection.
413  *
414  * @param cls Closure (cadet handle).
415  */
416 static void
417 reconnect_cbk (void *cls);
418
419
420 /**
421  * Reconnect to the service, retransmit all infomation to try to restore the
422  * original state.
423  *
424  * @param h handle to the cadet
425  */
426 static void
427 reconnect (struct GNUNET_CADET_Handle *h);
428
429
430 /******************************************************************************/
431 /***********************     AUXILIARY FUNCTIONS      *************************/
432 /******************************************************************************/
433
434 /**
435  * Check if transmission is a payload packet.
436  *
437  * @param th Transmission handle.
438  *
439  * @return #GNUNET_YES if it is a payload packet,
440  *         #GNUNET_NO if it is a cadet management packet.
441  */
442 static int
443 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
444 {
445   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
446 }
447
448
449 /**
450  * Find the Port struct for a hash.
451  *
452  * @param h CADET handle.
453  * @param hash HashCode for the port number.
454  *
455  * @return The port handle if known, NULL otherwise.
456  */
457 static struct GNUNET_CADET_Port *
458 find_port (const struct GNUNET_CADET_Handle *h,
459            const struct GNUNET_HashCode *hash)
460 {
461   struct GNUNET_CADET_Port *p;
462
463   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
464
465   return p;
466 }
467
468
469 /**
470  * Get the channel handler for the channel specified by id from the given handle
471  *
472  * @param h Cadet handle
473  * @param ccn ID of the wanted channel
474  * @return handle to the required channel or NULL if not found
475  */
476 static struct GNUNET_CADET_Channel *
477 retrieve_channel (struct GNUNET_CADET_Handle *h,
478                   struct GNUNET_CADET_ClientChannelNumber ccn)
479 {
480   struct GNUNET_CADET_Channel *ch;
481
482   for (ch = h->channels_head; NULL != ch; ch = ch->next)
483     if (ch->ccn.channel_of_client == ccn.channel_of_client)
484       return ch;
485   return NULL;
486 }
487
488
489 /**
490  * Create a new channel and insert it in the channel list of the cadet handle
491  *
492  * @param h Cadet handle
493  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
494  *
495  * @return Handle to the created channel.
496  */
497 static struct GNUNET_CADET_Channel *
498 create_channel (struct GNUNET_CADET_Handle *h,
499                 struct GNUNET_CADET_ClientChannelNumber ccn)
500 {
501   struct GNUNET_CADET_Channel *ch;
502
503   ch = GNUNET_new (struct GNUNET_CADET_Channel);
504   GNUNET_CONTAINER_DLL_insert (h->channels_head,
505                                h->channels_tail,
506                                ch);
507   ch->cadet = h;
508   if (0 == ccn.channel_of_client)
509   {
510     ch->ccn = h->next_ccn;
511     while (NULL != retrieve_channel (h,
512                                      h->next_ccn))
513     {
514       h->next_ccn.channel_of_client
515         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
516       if (0 == ntohl (h->next_ccn.channel_of_client))
517         h->next_ccn.channel_of_client
518           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
519     }
520   }
521   else
522   {
523     ch->ccn = ccn;
524   }
525   return ch;
526 }
527
528
529 /**
530  * Destroy the specified channel.
531  * - Destroys all peers, calling the disconnect callback on each if needed
532  * - Cancels all outgoing traffic for that channel, calling respective notifys
533  * - Calls cleaner if channel was inbound
534  * - Frees all memory used
535  *
536  * @param ch Pointer to the channel.
537  * @param call_cleaner Whether to call the cleaner handler.
538  *
539  * @return Handle to the required channel or NULL if not found.
540  */
541 static void
542 destroy_channel (struct GNUNET_CADET_Channel *ch)
543 {
544   struct GNUNET_CADET_Handle *h;
545   struct GNUNET_CADET_TransmitHandle *th;
546   struct GNUNET_CADET_TransmitHandle *next;
547
548   if (NULL == ch)
549   {
550     GNUNET_break (0);
551     return;
552   }
553   h = ch->cadet;
554   LOG (GNUNET_ERROR_TYPE_DEBUG,
555        " destroy_channel %X of %p\n",
556        ch->ccn,
557        h);
558
559   GNUNET_CONTAINER_DLL_remove (h->channels_head,
560                                h->channels_tail,
561                                ch);
562   if (NULL != ch->mq_cont)
563   {
564     GNUNET_SCHEDULER_cancel (ch->mq_cont);
565     ch->mq_cont = NULL;
566   }
567   /* signal channel destruction */
568   if (0 != ch->peer)
569   {
570     if (NULL != h->cleaner)
571     {
572       /** @a deprecated  */
573       LOG (GNUNET_ERROR_TYPE_DEBUG,
574           " calling cleaner\n");
575       h->cleaner (h->cls, ch, ch->ctx);
576     }
577     else if (NULL != ch->disconnects)
578     {
579       LOG (GNUNET_ERROR_TYPE_DEBUG,
580           " calling disconnect handler\n");
581       ch->disconnects (ch->ctx, ch);
582     }
583     else
584     {
585       /* Application won't be aware of the channel destruction and use
586        * a pointer to free'd memory.
587        */
588       GNUNET_assert (0);
589     }
590   }
591
592   /* check that clients did not leave messages behind in the queue */
593   for (th = h->th_head; NULL != th; th = next)
594   {
595     next = th->next;
596     if (th->channel != ch)
597       continue;
598     /* Clients should have aborted their requests already.
599      * Management traffic should be ok, as clients can't cancel that.
600      * If the service crashed and we are reconnecting, it's ok.
601      */
602     GNUNET_break (GNUNET_NO == th_is_payload (th));
603     GNUNET_CADET_notify_transmit_ready_cancel (th);
604   }
605
606   if (0 != ch->peer)
607     GNUNET_PEER_change_rc (ch->peer, -1);
608   GNUNET_free (ch);
609 }
610
611
612 /**
613  * Add a transmit handle to the transmission queue and set the
614  * timeout if needed.
615  *
616  * @param h cadet handle with the queue head and tail
617  * @param th handle to the packet to be transmitted
618  */
619 static void
620 add_to_queue (struct GNUNET_CADET_Handle *h,
621               struct GNUNET_CADET_TransmitHandle *th)
622 {
623   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
624                                     h->th_tail,
625                                     th);
626 }
627
628
629 /**
630  * Remove a transmit handle from the transmission queue, if present.
631  *
632  * Safe to call even if not queued.
633  *
634  * @param th handle to the packet to be unqueued.
635  */
636 static void
637 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
638 {
639   struct GNUNET_CADET_Handle *h = th->channel->cadet;
640
641   /* It might or might not have been queued (rarely not), but check anyway. */
642   if (NULL != th->next || h->th_tail == th)
643   {
644     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
645   }
646 }
647
648
649 /**
650  * Notify the application about a change in the window size (if needed).
651  *
652  * @param ch Channel to notify about.
653  */
654 static void
655 notify_window_size (struct GNUNET_CADET_Channel *ch)
656 {
657   if (NULL != ch->window_changes)
658   {
659     ch->window_changes (ch->ctx, ch, ch->allow_send);
660   }
661 }
662
663 /******************************************************************************/
664 /***********************      MQ API CALLBACKS     ****************************/
665 /******************************************************************************/
666
667 /**
668  * Allow the MQ implementation to send the next message.
669  *
670  * @param cls Closure (channel whose mq to activate).
671  */
672 static void
673 cadet_mq_send_continue (void *cls)
674 {
675   struct GNUNET_CADET_Channel *ch = cls;
676
677   ch->mq_cont = NULL;
678   GNUNET_MQ_impl_send_continue (ch->mq);
679 }
680
681 /**
682  * Implement sending functionality of a message queue for
683  * us sending messages to a peer.
684  *
685  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
686  * in order to label the message with the channel ID and send the
687  * encapsulated message to the service.
688  *
689  * @param mq the message queue
690  * @param msg the message to send
691  * @param impl_state state of the implementation
692  */
693 static void
694 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
695                     const struct GNUNET_MessageHeader *msg,
696                     void *impl_state)
697 {
698   struct GNUNET_CADET_Channel *ch = impl_state;
699   struct GNUNET_CADET_Handle *h = ch->cadet;
700   uint16_t msize;
701   struct GNUNET_MQ_Envelope *env;
702   struct GNUNET_CADET_LocalData *cadet_msg;
703
704
705   if (NULL == h->mq)
706   {
707     /* We're currently reconnecting, pretend this worked */
708     GNUNET_MQ_impl_send_continue (mq);
709     return;
710   }
711
712   /* check message size for sanity */
713   msize = ntohs (msg->size);
714   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
715   {
716     GNUNET_break (0);
717     GNUNET_MQ_impl_send_continue (mq);
718     return;
719   }
720
721   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
722                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
723                                  msg);
724   cadet_msg->ccn = ch->ccn;
725
726   if (0 < ch->allow_send)
727   {
728     /* Service has allowed this message, just send it and continue accepting */
729     GNUNET_MQ_send (h->mq, env);
730     ch->allow_send--;
731     ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
732     // notify_window_size (ch); /* FIXME add "verbose" setting? */
733   }
734   else
735   {
736     /* Service has NOT allowed this message, queue it and wait for an ACK */
737     GNUNET_assert (NULL == ch->pending_env);
738     ch->pending_env = env;
739   }
740 }
741
742
743 /**
744  * Handle destruction of a message queue.  Implementations must not
745  * free @a mq, but should take care of @a impl_state.
746  *
747  * @param mq the message queue to destroy
748  * @param impl_state state of the implementation
749  */
750 static void
751 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
752                        void *impl_state)
753 {
754   struct GNUNET_CADET_Channel *ch = impl_state;
755
756   GNUNET_assert (mq == ch->mq);
757   ch->mq = NULL;
758 }
759
760
761 /**
762  * We had an error processing a message we forwarded from a peer to
763  * the CADET service.  We should just complain about it but otherwise
764  * continue processing.
765  *
766  * @param cls closure
767  * @param error error code
768  */
769 static void
770 cadet_mq_error_handler (void *cls,
771                         enum GNUNET_MQ_Error error)
772 {
773   GNUNET_break_op (0);
774 }
775
776
777 /**
778  * Implementation function that cancels the currently sent message.
779  * Should basically undo whatever #mq_send_impl() did.
780  *
781  * @param mq message queue
782  * @param impl_state state specific to the implementation
783  */
784
785 static void
786 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
787                      void *impl_state)
788 {
789   struct GNUNET_CADET_Channel *ch = impl_state;
790
791   LOG (GNUNET_ERROR_TYPE_WARNING,
792        "Cannot cancel mq message on channel %X of %p\n",
793        ch->ccn.channel_of_client, ch->cadet);
794
795   GNUNET_break (0);
796 }
797
798
799 /******************************************************************************/
800 /***********************      RECEIVE HANDLERS     ****************************/
801 /******************************************************************************/
802
803
804 /**
805  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
806  * request the data to send over MQ. Since MQ manages the queue, this function
807  * is scheduled immediatly after a transmit ready notification.
808  *
809  * @param cls Closure (transmit handle).
810  */
811 static void
812 request_data (void *cls)
813 {
814   struct GNUNET_CADET_TransmitHandle *th = cls;
815   struct GNUNET_CADET_LocalData *msg;
816   struct GNUNET_MQ_Envelope *env;
817   size_t osize;
818
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Requesting Data: %u bytes (allow send is %u)\n",
821        th->size,
822        th->channel->allow_send);
823
824   GNUNET_assert (0 < th->channel->allow_send);
825   th->channel->allow_send--;
826   /* NOTE: we may be allowed to send another packet immediately,
827      albeit the current logic waits for the ACK. */
828   th->request_data_task = NULL;
829   remove_from_queue (th);
830
831   env = GNUNET_MQ_msg_extra (msg,
832                              th->size,
833                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
834   msg->ccn = th->channel->ccn;
835   osize = th->notify (th->notify_cls,
836                       th->size,
837                       &msg[1]);
838   GNUNET_assert (osize == th->size);
839
840   GNUNET_MQ_send (th->channel->cadet->mq,
841                   env);
842   GNUNET_free (th);
843 }
844
845
846 /**
847  * Process the new channel notification and add it to the channels in the handle
848  *
849  * @param h     The cadet handle
850  * @param msg   A message with the details of the new incoming channel
851  */
852 static void
853 handle_channel_created (void *cls,
854                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
855 {
856   struct GNUNET_CADET_Handle *h = cls;
857   struct GNUNET_CADET_Channel *ch;
858   struct GNUNET_CADET_Port *port;
859   const struct GNUNET_HashCode *port_number;
860   struct GNUNET_CADET_ClientChannelNumber ccn;
861
862   ccn = msg->ccn;
863   port_number = &msg->port;
864   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
865   {
866     GNUNET_break (0);
867     return;
868   }
869   port = find_port (h, port_number);
870   if (NULL == port)
871   {
872     /* We could have closed the port but the service didn't know about it yet
873      * This is not an error.
874      */
875     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
876     struct GNUNET_MQ_Envelope *env;
877
878     GNUNET_break (0);
879     LOG (GNUNET_ERROR_TYPE_DEBUG,
880          "No handler for incoming channel %X (on port %s, recently closed?)\n",
881          ntohl (ccn.channel_of_client),
882          GNUNET_h2s (port_number));
883     env = GNUNET_MQ_msg (d_msg,
884                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
885     d_msg->ccn = msg->ccn;
886     GNUNET_MQ_send (h->mq,
887                     env);
888     return;
889   }
890
891   ch = create_channel (h,
892                        ccn);
893   ch->peer = GNUNET_PEER_intern (&msg->peer);
894   ch->cadet = h;
895   ch->ccn = ccn;
896   ch->incoming_port = port;
897   ch->options = ntohl (msg->opt);
898   LOG (GNUNET_ERROR_TYPE_DEBUG,
899        "Creating incoming channel %X [%s] %p\n",
900        ntohl (ccn.channel_of_client),
901        GNUNET_h2s (port_number),
902        ch);
903
904   if (NULL != port->handler)
905   {
906     /** @deprecated */
907     /* Old style API */
908     ch->ctx = port->handler (port->cls,
909                             ch,
910                             &msg->peer,
911                             port->hash,
912                             ch->options);
913   } else {
914     /* MQ API */
915     GNUNET_assert (NULL != port->connects);
916     ch->window_changes = port->window_changes;
917     ch->disconnects = port->disconnects;
918     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
919                                             &cadet_mq_destroy_impl,
920                                             &cadet_mq_cancel_impl,
921                                             ch,
922                                             port->handlers,
923                                             &cadet_mq_error_handler,
924                                             ch);
925     ch->ctx = port->connects (port->cls,
926                               ch,
927                               &msg->peer);
928     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
929   }
930 }
931
932
933 /**
934  * Process the channel destroy notification and free associated resources
935  *
936  * @param h     The cadet handle
937  * @param msg   A message with the details of the channel being destroyed
938  */
939 static void
940 handle_channel_destroy (void *cls,
941                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
942 {
943   struct GNUNET_CADET_Handle *h = cls;
944   struct GNUNET_CADET_Channel *ch;
945   struct GNUNET_CADET_ClientChannelNumber ccn;
946
947   ccn = msg->ccn;
948   LOG (GNUNET_ERROR_TYPE_DEBUG,
949        "Channel %X Destroy from service\n",
950        ntohl (ccn.channel_of_client));
951   ch = retrieve_channel (h,
952                          ccn);
953
954   if (NULL == ch)
955   {
956     LOG (GNUNET_ERROR_TYPE_DEBUG,
957          "channel %X unknown\n",
958          ntohl (ccn.channel_of_client));
959     return;
960   }
961   destroy_channel (ch);
962 }
963
964
965 /**
966  * Check that message received from CADET service is well-formed.
967  *
968  * @param cls the `struct GNUNET_CADET_Handle`
969  * @param message the message we got
970  * @return #GNUNET_OK if the message is well-formed,
971  *         #GNUNET_SYSERR otherwise
972  */
973 static int
974 check_local_data (void *cls,
975                   const struct GNUNET_CADET_LocalData *message)
976 {
977   struct GNUNET_CADET_Handle *h = cls;
978   struct GNUNET_CADET_Channel *ch;
979   uint16_t size;
980
981   size = ntohs (message->header.size);
982   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
983   {
984     GNUNET_break_op (0);
985     return GNUNET_SYSERR;
986   }
987
988   ch = retrieve_channel (h,
989                          message->ccn);
990   if (NULL == ch)
991   {
992     GNUNET_break_op (0);
993     return GNUNET_SYSERR;
994   }
995
996   return GNUNET_OK;
997 }
998
999
1000 /**
1001  * Process the incoming data packets, call appropriate handlers.
1002  *
1003  * @param h       The cadet handle
1004  * @param message A message encapsulating the data
1005  */
1006 static void
1007 handle_local_data (void *cls,
1008                    const struct GNUNET_CADET_LocalData *message)
1009 {
1010   struct GNUNET_CADET_Handle *h = cls;
1011   const struct GNUNET_MessageHeader *payload;
1012   const struct GNUNET_CADET_MessageHandler *handler;
1013   struct GNUNET_CADET_Channel *ch;
1014   uint16_t type;
1015
1016   ch = retrieve_channel (h,
1017                          message->ccn);
1018   if (NULL == ch)
1019   {
1020     GNUNET_break_op (0);
1021     reconnect (h);
1022     return;
1023   }
1024
1025   payload = (struct GNUNET_MessageHeader *) &message[1];
1026   type = ntohs (payload->type);
1027   LOG (GNUNET_ERROR_TYPE_DEBUG,
1028        "Got a %s data on channel %s [%X] of type %s (%u)\n",
1029        GC_f2s (ntohl (ch->ccn.channel_of_client) >=
1030                GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
1031        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
1032        ntohl (message->ccn.channel_of_client),
1033        GC_m2s (type),
1034        type);
1035   if (NULL != ch->mq)
1036   {
1037     GNUNET_MQ_inject_message (ch->mq, payload);
1038     return;
1039   }
1040   /** @a deprecated */
1041   for (unsigned i=0;i<h->n_handlers;i++)
1042   {
1043     handler = &h->message_handlers[i];
1044     if (handler->type == type)
1045     {
1046       if (GNUNET_OK !=
1047           handler->callback (h->cls,
1048                              ch,
1049                              &ch->ctx,
1050                              payload))
1051       {
1052         LOG (GNUNET_ERROR_TYPE_DEBUG,
1053              "callback caused disconnection\n");
1054         GNUNET_CADET_channel_destroy (ch);
1055         return;
1056       }
1057       return;
1058     }
1059   }
1060   /* Other peer sent message we do not comprehend. */
1061   GNUNET_break_op (0);
1062   GNUNET_CADET_receive_done (ch);
1063 }
1064
1065
1066 /**
1067  * Process a local ACK message, enabling the client to send
1068  * more data to the service.
1069  *
1070  * @param h Cadet handle.
1071  * @param message Message itself.
1072  */
1073 static void
1074 handle_local_ack (void *cls,
1075                   const struct GNUNET_CADET_LocalAck *message)
1076 {
1077   struct GNUNET_CADET_Handle *h = cls;
1078   struct GNUNET_CADET_Channel *ch;
1079   struct GNUNET_CADET_ClientChannelNumber ccn;
1080   struct GNUNET_CADET_TransmitHandle *th;
1081
1082   ccn = message->ccn;
1083   ch = retrieve_channel (h, ccn);
1084   if (NULL == ch)
1085   {
1086     LOG (GNUNET_ERROR_TYPE_DEBUG,
1087          "ACK on unknown channel %X\n",
1088          ntohl (ccn.channel_of_client));
1089     return;
1090   }
1091   ch->allow_send++;
1092   if (NULL != ch->mq)
1093   {
1094     if (NULL == ch->pending_env)
1095     {
1096       LOG (GNUNET_ERROR_TYPE_DEBUG,
1097            "Got an ACK on mq channel %X, allow send now %u!\n",
1098            ntohl (ch->ccn.channel_of_client),
1099            ch->allow_send);
1100       notify_window_size (ch);
1101     }
1102     else
1103     {
1104       LOG (GNUNET_ERROR_TYPE_DEBUG,
1105            "Got an ACK on mq channel %X, sending pending message!\n",
1106            ntohl (ch->ccn.channel_of_client));
1107       GNUNET_MQ_send (h->mq, ch->pending_env);
1108       ch->allow_send--;
1109       ch->pending_env = NULL;
1110       ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
1111     }
1112     return;
1113   }
1114
1115   /** @deprecated */
1116   /* Old style API */
1117   for (th = h->th_head; NULL != th; th = th->next)
1118   {
1119     if ( (th->channel == ch) &&
1120          (NULL == th->request_data_task) )
1121     {
1122       th->request_data_task
1123         = GNUNET_SCHEDULER_add_now (&request_data,
1124                                     th);
1125       break;
1126     }
1127   }
1128 }
1129
1130
1131 /**
1132  * Generic error handler, called with the appropriate error code and
1133  * the same closure specified at the creation of the message queue.
1134  * Not every message queue implementation supports an error handler.
1135  *
1136  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1137  * @param error error code
1138  */
1139 static void
1140 handle_mq_error (void *cls,
1141                  enum GNUNET_MQ_Error error)
1142 {
1143   struct GNUNET_CADET_Handle *h = cls;
1144
1145   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1146   GNUNET_MQ_destroy (h->mq);
1147   h->mq = NULL;
1148   reconnect (h);
1149 }
1150
1151
1152 /*
1153  * Process a local reply about info on all channels, pass info to the user.
1154  *
1155  * @param h Cadet handle.
1156  * @param message Message itself.
1157  */
1158 // static void
1159 // process_get_channels (struct GNUNET_CADET_Handle *h,
1160 //                      const struct GNUNET_MessageHeader *message)
1161 // {
1162 //   struct GNUNET_CADET_LocalInfo *msg;
1163 //
1164 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1165 //
1166 //   if (NULL == h->channels_cb)
1167 //   {
1168 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1169 //     return;
1170 //   }
1171 //
1172 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1173 //   if (ntohs (message->size) !=
1174 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1175 //        sizeof (struct GNUNET_PeerIdentity)))
1176 //   {
1177 //     GNUNET_break_op (0);
1178 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1179 //                 "Get channels message: size %hu - expected %u\n",
1180 //                 ntohs (message->size),
1181 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1182 //     return;
1183 //   }
1184 //   h->channels_cb (h->channels_cls,
1185 //                   ntohl (msg->channel_id),
1186 //                   &msg->owner,
1187 //                   &msg->destination);
1188 // }
1189
1190
1191
1192 /*
1193  * Process a local monitor_channel reply, pass info to the user.
1194  *
1195  * @param h Cadet handle.
1196  * @param message Message itself.
1197  */
1198 // static void
1199 // process_show_channel (struct GNUNET_CADET_Handle *h,
1200 //                      const struct GNUNET_MessageHeader *message)
1201 // {
1202 //   struct GNUNET_CADET_LocalInfo *msg;
1203 //   size_t esize;
1204 //
1205 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1206 //
1207 //   if (NULL == h->channel_cb)
1208 //   {
1209 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1210 //     return;
1211 //   }
1212 //
1213 //   /* Verify message sanity */
1214 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1215 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1216 //   if (ntohs (message->size) != esize)
1217 //   {
1218 //     GNUNET_break_op (0);
1219 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1220 //                 "Show channel message: size %hu - expected %u\n",
1221 //                 ntohs (message->size),
1222 //                 esize);
1223 //
1224 //     h->channel_cb (h->channel_cls, NULL, NULL);
1225 //     h->channel_cb = NULL;
1226 //     h->channel_cls = NULL;
1227 //
1228 //     return;
1229 //   }
1230 //
1231 //   h->channel_cb (h->channel_cls,
1232 //                  &msg->destination,
1233 //                  &msg->owner);
1234 // }
1235
1236
1237
1238 /**
1239  * Check that message received from CADET service is well-formed.
1240  *
1241  * @param cls the `struct GNUNET_CADET_Handle`
1242  * @param message the message we got
1243  * @return #GNUNET_OK if the message is well-formed,
1244  *         #GNUNET_SYSERR otherwise
1245  */
1246 static int
1247 check_get_peers (void *cls,
1248                  const struct GNUNET_CADET_LocalInfoPeer *message)
1249 {
1250   struct GNUNET_CADET_Handle *h = cls;
1251   uint16_t size;
1252
1253   if (NULL == h->info_cb.peers_cb)
1254   {
1255     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1256                 "  no handler for peesr monitor message!\n");
1257     return GNUNET_SYSERR;
1258   }
1259
1260   size = ntohs (message->header.size);
1261   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1262   {
1263     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1264     h->info_cb.peers_cb = NULL;
1265     h->info_cls = NULL;
1266     return GNUNET_SYSERR;
1267   }
1268
1269   return GNUNET_OK;
1270 }
1271
1272
1273 /**
1274  * Process a local reply about info on all tunnels, pass info to the user.
1275  *
1276  * @param cls Closure (Cadet handle).
1277  * @param msg Message itself.
1278  */
1279 static void
1280 handle_get_peers (void *cls,
1281                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1282 {
1283   struct GNUNET_CADET_Handle *h = cls;
1284   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1285                        (int) ntohs (msg->tunnel),
1286                        (unsigned int ) ntohs (msg->paths),
1287                        0);
1288 }
1289
1290
1291 /**
1292  * Check that message received from CADET service is well-formed.
1293  *
1294  * @param cls the `struct GNUNET_CADET_Handle`
1295  * @param message the message we got
1296  * @return #GNUNET_OK if the message is well-formed,
1297  *         #GNUNET_SYSERR otherwise
1298  */
1299 static int
1300 check_get_peer (void *cls,
1301                 const struct GNUNET_CADET_LocalInfoPeer *message)
1302 {
1303   struct GNUNET_CADET_Handle *h = cls;
1304   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1305   struct GNUNET_PeerIdentity *paths_array;
1306   size_t esize;
1307   unsigned int epaths;
1308   unsigned int paths;
1309   unsigned int peers;
1310
1311   if (NULL == h->info_cb.peer_cb)
1312   {
1313     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1314                 "  no handler for peer monitor message!\n");
1315     goto clean_cls;
1316   }
1317
1318   /* Verify message sanity */
1319   esize = ntohs (message->header.size);
1320   if (esize < msize)
1321   {
1322     GNUNET_break_op (0);
1323     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1324     goto clean_cls;
1325   }
1326   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1327   {
1328     GNUNET_break_op (0);
1329     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1330     goto clean_cls;
1331
1332   }
1333   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1334   epaths = (unsigned int) ntohs (message->paths);
1335   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1336   paths = 0;
1337   for (int i = 0; i < peers; i++)
1338   {
1339     if (0 == memcmp (&paths_array[i], &message->destination,
1340                      sizeof (struct GNUNET_PeerIdentity)))
1341     {
1342       paths++;
1343     }
1344   }
1345   if (paths != epaths)
1346   {
1347     GNUNET_break_op (0);
1348     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1349     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1350     goto clean_cls;
1351   }
1352
1353   return GNUNET_OK;
1354
1355 clean_cls:
1356   h->info_cb.peer_cb = NULL;
1357   h->info_cls = NULL;
1358   return GNUNET_SYSERR;
1359 }
1360
1361
1362 /**
1363  * Process a local peer info reply, pass info to the user.
1364  *
1365  * @param cls Closure (Cadet handle).
1366  * @param message Message itself.
1367  */
1368 static void
1369 handle_get_peer (void *cls,
1370                  const struct GNUNET_CADET_LocalInfoPeer *message)
1371 {
1372   struct GNUNET_CADET_Handle *h = cls;
1373   struct GNUNET_PeerIdentity *paths_array;
1374   unsigned int paths;
1375   unsigned int path_length;
1376   int neighbor;
1377   unsigned int peers;
1378
1379   paths = (unsigned int) ntohs (message->paths);
1380   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1381   peers = (ntohs (message->header.size) - sizeof (*message))
1382           / sizeof (struct GNUNET_PeerIdentity);
1383   path_length = 0;
1384   neighbor = GNUNET_NO;
1385
1386   for (int i = 0; i < peers; i++)
1387   {
1388     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1389     path_length++;
1390     if (0 == memcmp (&paths_array[i], &message->destination,
1391                      sizeof (struct GNUNET_PeerIdentity)))
1392     {
1393       if (1 == path_length)
1394         neighbor = GNUNET_YES;
1395       path_length = 0;
1396     }
1397   }
1398
1399   /* Call Callback with tunnel info. */
1400   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1401   h->info_cb.peer_cb (h->info_cls,
1402                       &message->destination,
1403                       (int) ntohs (message->tunnel),
1404                       neighbor,
1405                       paths,
1406                       paths_array);
1407 }
1408
1409
1410 /**
1411  * Check that message received from CADET service is well-formed.
1412  *
1413  * @param cls the `struct GNUNET_CADET_Handle`
1414  * @param msg the message we got
1415  * @return #GNUNET_OK if the message is well-formed,
1416  *         #GNUNET_SYSERR otherwise
1417  */
1418 static int
1419 check_get_tunnels (void *cls,
1420                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1421 {
1422   struct GNUNET_CADET_Handle *h = cls;
1423   uint16_t size;
1424
1425   if (NULL == h->info_cb.tunnels_cb)
1426   {
1427     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1428                 "  no handler for tunnels monitor message!\n");
1429     return GNUNET_SYSERR;
1430   }
1431
1432   size = ntohs (msg->header.size);
1433   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1434   {
1435     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1436     h->info_cb.tunnels_cb = NULL;
1437     h->info_cls = NULL;
1438     return GNUNET_SYSERR;
1439   }
1440   return GNUNET_OK;
1441 }
1442
1443
1444 /**
1445  * Process a local reply about info on all tunnels, pass info to the user.
1446  *
1447  * @param cls Closure (Cadet handle).
1448  * @param message Message itself.
1449  */
1450 static void
1451 handle_get_tunnels (void *cls,
1452                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1453 {
1454   struct GNUNET_CADET_Handle *h = cls;
1455
1456   h->info_cb.tunnels_cb (h->info_cls,
1457                          &msg->destination,
1458                          ntohl (msg->channels),
1459                          ntohl (msg->connections),
1460                          ntohs (msg->estate),
1461                          ntohs (msg->cstate));
1462
1463 }
1464
1465
1466 /**
1467  * Check that message received from CADET service is well-formed.
1468  *
1469  * @param cls the `struct GNUNET_CADET_Handle`
1470  * @param msg the message we got
1471  * @return #GNUNET_OK if the message is well-formed,
1472  *         #GNUNET_SYSERR otherwise
1473  */
1474 static int
1475 check_get_tunnel (void *cls,
1476                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1477 {
1478   struct GNUNET_CADET_Handle *h = cls;
1479   unsigned int ch_n;
1480   unsigned int c_n;
1481   size_t esize;
1482   size_t msize;
1483
1484   if (NULL == h->info_cb.tunnel_cb)
1485   {
1486     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1487                 "  no handler for tunnel monitor message!\n");
1488     goto clean_cls;
1489   }
1490
1491   /* Verify message sanity */
1492   msize = ntohs (msg->header.size);
1493   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1494   if (esize > msize)
1495   {
1496     GNUNET_break_op (0);
1497     h->info_cb.tunnel_cb (h->info_cls,
1498                           NULL, 0, 0, NULL, NULL, 0, 0);
1499     goto clean_cls;
1500   }
1501   ch_n = ntohl (msg->channels);
1502   c_n = ntohl (msg->connections);
1503   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1504   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1505   if (msize != esize)
1506   {
1507     GNUNET_break_op (0);
1508     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509                 "m:%u, e: %u (%u ch, %u conn)\n",
1510                 (unsigned int) msize,
1511                 (unsigned int) esize,
1512                 ch_n,
1513                 c_n);
1514     h->info_cb.tunnel_cb (h->info_cls,
1515                           NULL, 0, 0, NULL, NULL, 0, 0);
1516     goto clean_cls;
1517   }
1518
1519   return GNUNET_OK;
1520
1521 clean_cls:
1522   h->info_cb.tunnel_cb = NULL;
1523   h->info_cls = NULL;
1524   return GNUNET_SYSERR;
1525 }
1526
1527
1528 /**
1529  * Process a local tunnel info reply, pass info to the user.
1530  *
1531  * @param cls Closure (Cadet handle).
1532  * @param msg Message itself.
1533  */
1534 static void
1535 handle_get_tunnel (void *cls,
1536                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1537 {
1538   struct GNUNET_CADET_Handle *h = cls;
1539   unsigned int ch_n;
1540   unsigned int c_n;
1541   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1542   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1543
1544   ch_n = ntohl (msg->channels);
1545   c_n = ntohl (msg->connections);
1546
1547   /* Call Callback with tunnel info. */
1548   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1549   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1550   h->info_cb.tunnel_cb (h->info_cls,
1551                         &msg->destination,
1552                         ch_n,
1553                         c_n,
1554                         chns,
1555                         conns,
1556                         ntohs (msg->estate),
1557                         ntohs (msg->cstate));
1558 }
1559
1560
1561 /**
1562  * Reconnect to the service, retransmit all infomation to try to restore the
1563  * original state.
1564  *
1565  * @param h handle to the cadet
1566  */
1567 static void
1568 reconnect (struct GNUNET_CADET_Handle *h)
1569 {
1570   struct GNUNET_MQ_MessageHandler handlers[] = {
1571     GNUNET_MQ_hd_fixed_size (channel_created,
1572                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1573                              struct GNUNET_CADET_LocalChannelCreateMessage,
1574                              h),
1575     GNUNET_MQ_hd_fixed_size (channel_destroy,
1576                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1577                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1578                              h),
1579     GNUNET_MQ_hd_var_size (local_data,
1580                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1581                            struct GNUNET_CADET_LocalData,
1582                            h),
1583     GNUNET_MQ_hd_fixed_size (local_ack,
1584                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1585                              struct GNUNET_CADET_LocalAck,
1586                              h),
1587     GNUNET_MQ_hd_var_size (get_peers,
1588                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1589                            struct GNUNET_CADET_LocalInfoPeer,
1590                            h),
1591     GNUNET_MQ_hd_var_size (get_peer,
1592                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1593                            struct GNUNET_CADET_LocalInfoPeer,
1594                            h),
1595     GNUNET_MQ_hd_var_size (get_tunnels,
1596                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1597                            struct GNUNET_CADET_LocalInfoTunnel,
1598                            h),
1599     GNUNET_MQ_hd_var_size (get_tunnel,
1600                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1601                            struct GNUNET_CADET_LocalInfoTunnel,
1602                            h),
1603 // FIXME
1604 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1605 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1606 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1607     GNUNET_MQ_handler_end ()
1608   };
1609   struct GNUNET_CADET_Channel *ch;
1610
1611   while (NULL != (ch = h->channels_head))
1612   {
1613     LOG (GNUNET_ERROR_TYPE_DEBUG,
1614          "Destroying channel due to a reconnect\n");
1615     destroy_channel (ch);
1616   }
1617
1618   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1619
1620   if (NULL != h->mq)
1621   {
1622     GNUNET_MQ_destroy (h->mq);
1623     h->mq = NULL;
1624   }
1625   h->mq = GNUNET_CLIENT_connect (h->cfg,
1626                                  "cadet",
1627                                  handlers,
1628                                  &handle_mq_error,
1629                                  h);
1630   if (NULL == h->mq)
1631   {
1632     schedule_reconnect (h);
1633     return;
1634   }
1635   else
1636   {
1637     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1638   }
1639 }
1640
1641 /**
1642  * Reconnect callback: tries to reconnect again after a failer previous
1643  * reconnecttion
1644  *
1645  * @param cls closure (cadet handle)
1646  */
1647 static void
1648 reconnect_cbk (void *cls)
1649 {
1650   struct GNUNET_CADET_Handle *h = cls;
1651
1652   h->reconnect_task = NULL;
1653   reconnect (h);
1654 }
1655
1656
1657 /**
1658  * Reconnect to the service, retransmit all infomation to try to restore the
1659  * original state.
1660  *
1661  * @param h handle to the cadet
1662  *
1663  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1664  */
1665 static void
1666 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1667 {
1668   if (NULL == h->reconnect_task)
1669   {
1670     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1671                                                       &reconnect_cbk, h);
1672     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1673   }
1674 }
1675
1676
1677 /******************************************************************************/
1678 /**********************      API CALL DEFINITIONS     *************************/
1679 /******************************************************************************/
1680
1681 struct GNUNET_CADET_Handle *
1682 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1683                       void *cls,
1684                       GNUNET_CADET_ChannelEndHandler cleaner,
1685                       const struct GNUNET_CADET_MessageHandler *handlers)
1686 {
1687   struct GNUNET_CADET_Handle *h;
1688
1689   h = GNUNET_new (struct GNUNET_CADET_Handle);
1690   LOG (GNUNET_ERROR_TYPE_DEBUG,
1691        "GNUNET_CADET_connect() %p\n",
1692        h);
1693   h->cfg = cfg;
1694   h->cleaner = cleaner;
1695   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1696   reconnect (h);
1697   if (h->mq == NULL)
1698   {
1699     GNUNET_break (0);
1700     GNUNET_CADET_disconnect (h);
1701     return NULL;
1702   }
1703   h->cls = cls;
1704   h->message_handlers = handlers;
1705   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1706   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1707   h->reconnect_task = NULL;
1708
1709   /* count handlers */
1710   for (h->n_handlers = 0;
1711        handlers && handlers[h->n_handlers].type;
1712        h->n_handlers++) ;
1713   return h;
1714 }
1715
1716
1717 /**
1718  * Disconnect from the cadet service. All channels will be destroyed. All channel
1719  * disconnect callbacks will be called on any still connected peers, notifying
1720  * about their disconnection. The registered inbound channel cleaner will be
1721  * called should any inbound channels still exist.
1722  *
1723  * @param handle connection to cadet to disconnect
1724  */
1725 void
1726 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1727 {
1728   struct GNUNET_CADET_Channel *ch;
1729   struct GNUNET_CADET_Channel *aux;
1730   struct GNUNET_CADET_TransmitHandle *th;
1731
1732   LOG (GNUNET_ERROR_TYPE_DEBUG,
1733        "CADET DISCONNECT\n");
1734   ch = handle->channels_head;
1735   while (NULL != ch)
1736   {
1737     aux = ch->next;
1738     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1739     {
1740       GNUNET_break (0);
1741       LOG (GNUNET_ERROR_TYPE_DEBUG,
1742            "channel %X not destroyed\n",
1743            ntohl (ch->ccn.channel_of_client));
1744     }
1745     destroy_channel (ch);
1746     ch = aux;
1747   }
1748   while (NULL != (th = handle->th_head))
1749   {
1750     struct GNUNET_MessageHeader *msg;
1751
1752     /* Make sure it is an allowed packet (everything else should have been
1753      * already canceled).
1754      */
1755     GNUNET_break (GNUNET_NO == th_is_payload (th));
1756     msg = (struct GNUNET_MessageHeader *) &th[1];
1757     switch (ntohs(msg->type))
1758     {
1759       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1760       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1761       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1762       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1763       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1764       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1765       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1766       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1767       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1768       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1769         break;
1770       default:
1771         GNUNET_break (0);
1772         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1773              GC_m2s (ntohs(msg->type)));
1774     }
1775
1776     GNUNET_CADET_notify_transmit_ready_cancel (th);
1777   }
1778
1779   if (NULL != handle->mq)
1780   {
1781     GNUNET_MQ_destroy (handle->mq);
1782     handle->mq = NULL;
1783   }
1784   if (NULL != handle->reconnect_task)
1785   {
1786     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1787     handle->reconnect_task = NULL;
1788   }
1789
1790   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1791   handle->ports = NULL;
1792   GNUNET_free (handle);
1793 }
1794
1795
1796 /**
1797  * Open a port to receive incomming channels.
1798  *
1799  * @param h CADET handle.
1800  * @param port Hash representing the port number.
1801  * @param new_channel Function called when an channel is received.
1802  * @param new_channel_cls Closure for @a new_channel.
1803  * @return Port handle.
1804  */
1805 struct GNUNET_CADET_Port *
1806 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1807                         const struct GNUNET_HashCode *port,
1808                         GNUNET_CADET_InboundChannelNotificationHandler
1809                             new_channel,
1810                         void *new_channel_cls)
1811 {
1812   struct GNUNET_CADET_PortMessage *msg;
1813   struct GNUNET_MQ_Envelope *env;
1814   struct GNUNET_CADET_Port *p;
1815
1816   GNUNET_assert (NULL != new_channel);
1817   p = GNUNET_new (struct GNUNET_CADET_Port);
1818   p->cadet = h;
1819   p->hash = GNUNET_new (struct GNUNET_HashCode);
1820   *p->hash = *port;
1821   p->handler = new_channel;
1822   p->cls = new_channel_cls;
1823   GNUNET_assert (GNUNET_OK ==
1824                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1825                                                     p->hash,
1826                                                     p,
1827                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1828
1829   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1830   msg->port = *p->hash;
1831   GNUNET_MQ_send (h->mq, env);
1832
1833   return p;
1834 }
1835
1836 /**
1837  * Close a port opened with @a GNUNET_CADET_open_port.
1838  * The @a new_channel callback will no longer be called.
1839  *
1840  * @param p Port handle.
1841  */
1842 void
1843 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1844 {
1845   struct GNUNET_CADET_PortMessage *msg;
1846   struct GNUNET_MQ_Envelope *env;
1847   struct GNUNET_HashCode *id;
1848
1849   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1850
1851   id = NULL != p->hash ? p->hash : &p->id;
1852   msg->port = *id;
1853   GNUNET_MQ_send (p->cadet->mq, env);
1854   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1855   GNUNET_free_non_null (p->hash);
1856   GNUNET_free (p);
1857 }
1858
1859
1860 /**
1861  * Create a new channel towards a remote peer.
1862  *
1863  * If the destination port is not open by any peer or the destination peer
1864  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1865  * for this channel.
1866  *
1867  * @param h cadet handle
1868  * @param channel_ctx client's channel context to associate with the channel
1869  * @param peer peer identity the channel should go to
1870  * @param port Port hash (port number).
1871  * @param options CadetOption flag field, with all desired option bits set to 1.
1872  * @return handle to the channel
1873  */
1874 struct GNUNET_CADET_Channel *
1875 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1876                             void *channel_ctx,
1877                             const struct GNUNET_PeerIdentity *peer,
1878                             const struct GNUNET_HashCode *port,
1879                             enum GNUNET_CADET_ChannelOption options)
1880 {
1881   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1882   struct GNUNET_MQ_Envelope *env;
1883   struct GNUNET_CADET_Channel *ch;
1884   struct GNUNET_CADET_ClientChannelNumber ccn;
1885
1886   ccn.channel_of_client = htonl (0);
1887   ch = create_channel (h, ccn);
1888   ch->ctx = channel_ctx;
1889   ch->peer = GNUNET_PEER_intern (peer);
1890
1891   LOG (GNUNET_ERROR_TYPE_DEBUG,
1892        "Creating new channel to %s:%u at %p number %X\n",
1893        GNUNET_i2s (peer),
1894        port,
1895        ch,
1896        ntohl (ch->ccn.channel_of_client));
1897   env = GNUNET_MQ_msg (msg,
1898                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1899   msg->ccn = ch->ccn;
1900   msg->port = *port;
1901   msg->peer = *peer;
1902   msg->opt = htonl (options);
1903   GNUNET_MQ_send (h->mq,
1904                   env);
1905   return ch;
1906 }
1907
1908
1909 void
1910 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1911 {
1912   struct GNUNET_CADET_Handle *h;
1913   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1914   struct GNUNET_MQ_Envelope *env;
1915   struct GNUNET_CADET_TransmitHandle *th;
1916   struct GNUNET_CADET_TransmitHandle *next;
1917
1918   LOG (GNUNET_ERROR_TYPE_DEBUG,
1919        "Destroying channel\n");
1920   h = channel->cadet;
1921   for  (th = h->th_head; th != NULL; th = next)
1922   {
1923     next = th->next;
1924     if (th->channel == channel)
1925     {
1926       GNUNET_break (0);
1927       if (GNUNET_YES == th_is_payload (th))
1928       {
1929         /* applications should cancel before destroying channel */
1930         LOG (GNUNET_ERROR_TYPE_WARNING,
1931              "Channel destroyed without cancelling transmission requests\n");
1932         th->notify (th->notify_cls, 0, NULL);
1933       }
1934       else
1935       {
1936         LOG (GNUNET_ERROR_TYPE_WARNING,
1937              "no meta-traffic should be queued\n");
1938       }
1939       GNUNET_CONTAINER_DLL_remove (h->th_head,
1940                                    h->th_tail,
1941                                    th);
1942       GNUNET_CADET_notify_transmit_ready_cancel (th);
1943     }
1944   }
1945
1946   env = GNUNET_MQ_msg (msg,
1947                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1948   msg->ccn = channel->ccn;
1949   GNUNET_MQ_send (h->mq,
1950                   env);
1951
1952   destroy_channel (channel);
1953 }
1954
1955
1956 /**
1957  * Get information about a channel.
1958  *
1959  * @param channel Channel handle.
1960  * @param option Query (GNUNET_CADET_OPTION_*).
1961  * @param ... dependant on option, currently not used
1962  *
1963  * @return Union with an answer to the query.
1964  */
1965 const union GNUNET_CADET_ChannelInfo *
1966 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1967                               enum GNUNET_CADET_ChannelOption option, ...)
1968 {
1969   static int bool_flag;
1970   const union GNUNET_CADET_ChannelInfo *ret;
1971
1972   switch (option)
1973   {
1974     case GNUNET_CADET_OPTION_NOBUFFER:
1975     case GNUNET_CADET_OPTION_RELIABLE:
1976     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1977       if (0 != (option & channel->options))
1978         bool_flag = GNUNET_YES;
1979       else
1980         bool_flag = GNUNET_NO;
1981       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1982       break;
1983     case GNUNET_CADET_OPTION_PEER:
1984       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1985       break;
1986     default:
1987       GNUNET_break (0);
1988       return NULL;
1989   }
1990
1991   return ret;
1992 }
1993
1994
1995 struct GNUNET_CADET_TransmitHandle *
1996 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
1997                                     int cork,
1998                                     struct GNUNET_TIME_Relative maxdelay,
1999                                     size_t notify_size,
2000                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
2001                                     void *notify_cls)
2002 {
2003   struct GNUNET_CADET_TransmitHandle *th;
2004
2005   GNUNET_assert (NULL != channel);
2006   GNUNET_assert (NULL != notify);
2007   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
2008   LOG (GNUNET_ERROR_TYPE_DEBUG,
2009        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
2010        ntohl (channel->ccn.channel_of_client),
2011        channel->allow_send,
2012        (ntohl (channel->ccn.channel_of_client) >=
2013         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
2014        ? "origin"
2015        : "destination",
2016        (unsigned int) notify_size);
2017   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
2018   {
2019     LOG (GNUNET_ERROR_TYPE_WARNING,
2020          "CADET transmit ready timeout is deprected (has no effect)\n");
2021   }
2022
2023   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
2024   th->channel = channel;
2025   th->size = notify_size;
2026   th->notify = notify;
2027   th->notify_cls = notify_cls;
2028   if (0 != channel->allow_send)
2029     th->request_data_task
2030       = GNUNET_SCHEDULER_add_now (&request_data,
2031                                   th);
2032   else
2033     add_to_queue (channel->cadet,
2034                   th);
2035   return th;
2036 }
2037
2038
2039 void
2040 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
2041 {
2042   if (NULL != th->request_data_task)
2043   {
2044     GNUNET_SCHEDULER_cancel (th->request_data_task);
2045     th->request_data_task = NULL;
2046   }
2047   remove_from_queue (th);
2048   GNUNET_free (th);
2049 }
2050
2051
2052 /**
2053  * Send an ack on the channel to confirm the processing of a message.
2054  *
2055  * @param ch Channel on which to send the ACK.
2056  */
2057 void
2058 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
2059 {
2060   struct GNUNET_CADET_LocalAck *msg;
2061   struct GNUNET_MQ_Envelope *env;
2062
2063   env = GNUNET_MQ_msg (msg,
2064                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
2065   LOG (GNUNET_ERROR_TYPE_DEBUG,
2066        "Sending ACK on channel %X\n",
2067        ntohl (channel->ccn.channel_of_client));
2068   msg->ccn = channel->ccn;
2069   GNUNET_MQ_send (channel->cadet->mq,
2070                   env);
2071 }
2072
2073
2074 static void
2075 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
2076 {
2077   struct GNUNET_MessageHeader *msg;
2078   struct GNUNET_MQ_Envelope *env;
2079
2080   LOG (GNUNET_ERROR_TYPE_DEBUG,
2081        " Sending %s monitor message to service\n",
2082        GC_m2s(type));
2083
2084   env = GNUNET_MQ_msg (msg, type);
2085   GNUNET_MQ_send (h->mq, env);
2086 }
2087
2088
2089 /**
2090  * Request a debug dump on the service's STDERR.
2091  *
2092  * WARNING: unstable API, likely to change in the future!
2093  *
2094  * @param h cadet handle
2095  */
2096 void
2097 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2098 {
2099   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2100   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2101 }
2102
2103
2104 /**
2105  * Request information about peers known to the running cadet service.
2106  * The callback will be called for every peer known to the service.
2107  * Only one info request (of any kind) can be active at once.
2108  *
2109  *
2110  * WARNING: unstable API, likely to change in the future!
2111  *
2112  * @param h Handle to the cadet peer.
2113  * @param callback Function to call with the requested data.
2114  * @param callback_cls Closure for @c callback.
2115  *
2116  * @return #GNUNET_OK / #GNUNET_SYSERR
2117  */
2118 int
2119 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2120                        GNUNET_CADET_PeersCB callback,
2121                        void *callback_cls)
2122 {
2123   if (NULL != h->info_cb.peers_cb)
2124   {
2125     GNUNET_break (0);
2126     return GNUNET_SYSERR;
2127   }
2128   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2129   h->info_cb.peers_cb = callback;
2130   h->info_cls = callback_cls;
2131   return GNUNET_OK;
2132 }
2133
2134
2135 /**
2136  * Cancel a peer info request. The callback will not be called (anymore).
2137  *
2138  * WARNING: unstable API, likely to change in the future!
2139  *
2140  * @param h Cadet handle.
2141  *
2142  * @return Closure given to GNUNET_CADET_get_peers.
2143  */
2144 void *
2145 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2146 {
2147   void *cls;
2148
2149   cls = h->info_cls;
2150   h->info_cb.peers_cb = NULL;
2151   h->info_cls = NULL;
2152   return cls;
2153 }
2154
2155
2156 /**
2157  * Request information about a peer known to the running cadet peer.
2158  * The callback will be called for the tunnel once.
2159  * Only one info request (of any kind) can be active at once.
2160  *
2161  * WARNING: unstable API, likely to change in the future!
2162  *
2163  * @param h Handle to the cadet peer.
2164  * @param id Peer whose tunnel to examine.
2165  * @param callback Function to call with the requested data.
2166  * @param callback_cls Closure for @c callback.
2167  *
2168  * @return #GNUNET_OK / #GNUNET_SYSERR
2169  */
2170 int
2171 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2172                        const struct GNUNET_PeerIdentity *id,
2173                        GNUNET_CADET_PeerCB callback,
2174                        void *callback_cls)
2175 {
2176   struct GNUNET_CADET_LocalInfo *msg;
2177   struct GNUNET_MQ_Envelope *env;
2178
2179   if (NULL != h->info_cb.peer_cb)
2180   {
2181     GNUNET_break (0);
2182     return GNUNET_SYSERR;
2183   }
2184
2185   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2186   msg->peer = *id;
2187   GNUNET_MQ_send (h->mq, env);
2188
2189   h->info_cb.peer_cb = callback;
2190   h->info_cls = callback_cls;
2191   return GNUNET_OK;
2192 }
2193
2194
2195 /**
2196  * Request information about tunnels of the running cadet peer.
2197  * The callback will be called for every tunnel of the service.
2198  * Only one info request (of any kind) can be active at once.
2199  *
2200  * WARNING: unstable API, likely to change in the future!
2201  *
2202  * @param h Handle to the cadet peer.
2203  * @param callback Function to call with the requested data.
2204  * @param callback_cls Closure for @c callback.
2205  *
2206  * @return #GNUNET_OK / #GNUNET_SYSERR
2207  */
2208 int
2209 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2210                          GNUNET_CADET_TunnelsCB callback,
2211                          void *callback_cls)
2212 {
2213   if (NULL != h->info_cb.tunnels_cb)
2214   {
2215     GNUNET_break (0);
2216     return GNUNET_SYSERR;
2217   }
2218   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2219   h->info_cb.tunnels_cb = callback;
2220   h->info_cls = callback_cls;
2221   return GNUNET_OK;
2222 }
2223
2224
2225 /**
2226  * Cancel a monitor request. The monitor callback will not be called.
2227  *
2228  * @param h Cadet handle.
2229  *
2230  * @return Closure given to GNUNET_CADET_get_tunnels.
2231  */
2232 void *
2233 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2234 {
2235   void *cls;
2236
2237   h->info_cb.tunnels_cb = NULL;
2238   cls = h->info_cls;
2239   h->info_cls = NULL;
2240
2241   return cls;
2242 }
2243
2244
2245
2246 /**
2247  * Request information about a tunnel of the running cadet peer.
2248  * The callback will be called for the tunnel once.
2249  * Only one info request (of any kind) can be active at once.
2250  *
2251  * WARNING: unstable API, likely to change in the future!
2252  *
2253  * @param h Handle to the cadet peer.
2254  * @param id Peer whose tunnel to examine.
2255  * @param callback Function to call with the requested data.
2256  * @param callback_cls Closure for @c callback.
2257  *
2258  * @return #GNUNET_OK / #GNUNET_SYSERR
2259  */
2260 int
2261 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2262                         const struct GNUNET_PeerIdentity *id,
2263                         GNUNET_CADET_TunnelCB callback,
2264                         void *callback_cls)
2265 {
2266   struct GNUNET_CADET_LocalInfo *msg;
2267   struct GNUNET_MQ_Envelope *env;
2268
2269   if (NULL != h->info_cb.tunnel_cb)
2270   {
2271     GNUNET_break (0);
2272     return GNUNET_SYSERR;
2273   }
2274
2275   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2276   msg->peer = *id;
2277   GNUNET_MQ_send (h->mq, env);
2278
2279   h->info_cb.tunnel_cb = callback;
2280   h->info_cls = callback_cls;
2281   return GNUNET_OK;
2282 }
2283
2284
2285 /**
2286  * Request information about a specific channel of the running cadet peer.
2287  *
2288  * WARNING: unstable API, likely to change in the future!
2289  * FIXME Add destination option.
2290  *
2291  * @param h Handle to the cadet peer.
2292  * @param initiator ID of the owner of the channel.
2293  * @param channel_number Channel number.
2294  * @param callback Function to call with the requested data.
2295  * @param callback_cls Closure for @c callback.
2296  *
2297  * @return #GNUNET_OK / #GNUNET_SYSERR
2298  */
2299 int
2300 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2301                            struct GNUNET_PeerIdentity *initiator,
2302                            unsigned int channel_number,
2303                            GNUNET_CADET_ChannelCB callback,
2304                            void *callback_cls)
2305 {
2306   struct GNUNET_CADET_LocalInfo *msg;
2307   struct GNUNET_MQ_Envelope *env;
2308
2309   if (NULL != h->info_cb.channel_cb)
2310   {
2311     GNUNET_break (0);
2312     return GNUNET_SYSERR;
2313   }
2314
2315   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2316   msg->peer = *initiator;
2317   msg->ccn.channel_of_client = htonl (channel_number);
2318   GNUNET_MQ_send (h->mq, env);
2319
2320   h->info_cb.channel_cb = callback;
2321   h->info_cls = callback_cls;
2322   return GNUNET_OK;
2323 }
2324
2325
2326 /**
2327  * Function called to notify a client about the connection
2328  * begin ready to queue more data.  "buf" will be
2329  * NULL and "size" zero if the connection was closed for
2330  * writing in the meantime.
2331  *
2332  * @param cls closure
2333  * @param size number of bytes available in buf
2334  * @param buf where the callee should write the message
2335  * @return number of bytes written to buf
2336  */
2337 static size_t
2338 cadet_mq_ntr (void *cls, size_t size,
2339              void *buf)
2340 {
2341   struct GNUNET_MQ_Handle *mq = cls;
2342   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2343   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2344   uint16_t msize;
2345
2346   state->th = NULL;
2347   if (NULL == buf)
2348   {
2349     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2350     return 0;
2351   }
2352   msize = ntohs (msg->size);
2353   GNUNET_assert (msize <= size);
2354   GNUNET_memcpy (buf, msg, msize);
2355   GNUNET_MQ_impl_send_continue (mq);
2356   return msize;
2357 }
2358
2359
2360 /**
2361  * Signature of functions implementing the
2362  * sending functionality of a message queue.
2363  *
2364  * @param mq the message queue
2365  * @param msg the message to send
2366  * @param impl_state state of the implementation
2367  */
2368 static void
2369 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2370                         const struct GNUNET_MessageHeader *msg,
2371                         void *impl_state)
2372 {
2373   struct CadetMQState *state = impl_state;
2374
2375   GNUNET_assert (NULL == state->th);
2376   state->th =
2377       GNUNET_CADET_notify_transmit_ready (state->channel,
2378                                          /* FIXME: add option for corking */
2379                                          GNUNET_NO,
2380                                          GNUNET_TIME_UNIT_FOREVER_REL,
2381                                          ntohs (msg->size),
2382                                          &cadet_mq_ntr, mq);
2383
2384 }
2385
2386
2387 /**
2388  * Signature of functions implementing the
2389  * destruction of a message queue.
2390  * Implementations must not free 'mq', but should
2391  * take care of 'impl_state'.
2392  *
2393  * @param mq the message queue to destroy
2394  * @param impl_state state of the implementation
2395  */
2396 static void
2397 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2398                            void *impl_state)
2399 {
2400   struct CadetMQState *state = impl_state;
2401
2402   if (NULL != state->th)
2403     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2404
2405   GNUNET_free (state);
2406 }
2407
2408
2409 /**
2410  * Create a message queue for a cadet channel.
2411  * The message queue can only be used to transmit messages,
2412  * not to receive them.
2413  *
2414  * @param channel the channel to create the message qeue for
2415  * @return a message queue to messages over the channel
2416  */
2417 struct GNUNET_MQ_Handle *
2418 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2419 {
2420   struct GNUNET_MQ_Handle *mq;
2421   struct CadetMQState *state;
2422
2423   state = GNUNET_new (struct CadetMQState);
2424   state->channel = channel;
2425
2426   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2427                                       &cadet_mq_destroy_impl_old,
2428                                       NULL, /* FIXME: cancel impl. */
2429                                       state,
2430                                       NULL, /* no msg handlers */
2431                                       NULL, /* no err handlers */
2432                                       NULL); /* no handler cls */
2433   return mq;
2434 }
2435
2436
2437 /**
2438  * Transitional function to convert an unsigned int port to a hash value.
2439  * WARNING: local static value returned, NOT reentrant!
2440  * WARNING: do not use this function for new code!
2441  *
2442  * @param port Numerical port (unsigned int format).
2443  *
2444  * @return A GNUNET_HashCode usable for the new CADET API.
2445  */
2446 const struct GNUNET_HashCode *
2447 GC_u2h (uint32_t port)
2448 {
2449   static struct GNUNET_HashCode hash;
2450
2451   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2452               "This is a transitional function, "
2453               "use proper crypto hashes as CADET ports\n");
2454   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2455
2456   return &hash;
2457 }
2458
2459
2460
2461 /******************************************************************************/
2462 /******************************* MQ-BASED API *********************************/
2463 /******************************************************************************/
2464
2465 /**
2466  * Connect to the MQ-based cadet service.
2467  *
2468  * @param cfg Configuration to use.
2469  *
2470  * @return Handle to the cadet service NULL on error.
2471  */
2472 struct GNUNET_CADET_Handle *
2473 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2474 {
2475   struct GNUNET_CADET_Handle *h;
2476
2477   LOG (GNUNET_ERROR_TYPE_DEBUG,
2478        "GNUNET_CADET_connecT()\n");
2479   h = GNUNET_new (struct GNUNET_CADET_Handle);
2480   h->cfg = cfg;
2481   h->mq_api = GNUNET_YES;
2482   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2483   reconnect (h);
2484   if (NULL == h->mq)
2485   {
2486     GNUNET_break (0);
2487     GNUNET_CADET_disconnect (h);
2488     return NULL;
2489   }
2490   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2491   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2492   h->reconnect_task = NULL;
2493
2494   return h;
2495 }
2496
2497
2498 /**
2499  * Open a port to receive incomming MQ-based channels.
2500  *
2501  * @param h CADET handle.
2502  * @param port Hash identifying the port.
2503  * @param connects Function called when an incoming channel is connected.
2504  * @param connects_cls Closure for the @a connects handler.
2505  * @param window_changes Function called when the transmit window size changes.
2506  * @param disconnects Function called when a channel is disconnected.
2507  * @param handlers Callbacks for messages we care about, NULL-terminated.
2508  *
2509  * @return Port handle.
2510  */
2511 struct GNUNET_CADET_Port *
2512 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2513                         const struct GNUNET_HashCode *port,
2514                         GNUNET_CADET_ConnectEventHandler connects,
2515                         void * connects_cls,
2516                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2517                         GNUNET_CADET_DisconnectEventHandler disconnects,
2518                         const struct GNUNET_MQ_MessageHandler *handlers)
2519 {
2520   struct GNUNET_CADET_PortMessage *msg;
2521   struct GNUNET_MQ_Envelope *env;
2522   struct GNUNET_CADET_Port *p;
2523
2524   GNUNET_assert (NULL != connects);
2525   GNUNET_assert (NULL != disconnects);
2526
2527   p = GNUNET_new (struct GNUNET_CADET_Port);
2528   p->cadet = h;
2529   p->id = *port;
2530   p->connects = connects;
2531   p->cls = connects_cls;
2532   p->window_changes = window_changes;
2533   p->disconnects = disconnects;
2534   p->handlers = handlers;
2535
2536   GNUNET_assert (GNUNET_OK ==
2537                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2538                                                     &p->id,
2539                                                     p,
2540                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2541
2542   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2543   msg->port = p->id;
2544   GNUNET_MQ_send (h->mq, env);
2545
2546   return p;
2547 }
2548
2549
2550 /**
2551  * Create a new channel towards a remote peer.
2552  *
2553  * If the destination port is not open by any peer or the destination peer
2554  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2555  * for this channel.
2556  *
2557  * @param h CADET handle.
2558  * @param channel_cls Closure for the channel. It's given to:
2559  *                    - The disconnect handler @a disconnects
2560  *                    - Each message type callback in @a handlers
2561  * @param destination Peer identity the channel should go to.
2562  * @param port Identification of the destination port.
2563  * @param options CadetOption flag field, with all desired option bits set to 1.
2564  * @param window_changes Function called when the transmit window size changes.
2565  * @param disconnects Function called when the channel is disconnected.
2566  * @param handlers Callbacks for messages we care about, NULL-terminated.
2567  *
2568  * @return Handle to the channel.
2569  */
2570 struct GNUNET_CADET_Channel *
2571 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2572                              void *channel_cls,
2573                              const struct GNUNET_PeerIdentity *destination,
2574                              const struct GNUNET_HashCode *port,
2575                              enum GNUNET_CADET_ChannelOption options,
2576                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2577                              GNUNET_CADET_DisconnectEventHandler disconnects,
2578                              const struct GNUNET_MQ_MessageHandler *handlers)
2579 {
2580   struct GNUNET_CADET_Channel *ch;
2581   struct GNUNET_CADET_ClientChannelNumber ccn;
2582   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2583   struct GNUNET_MQ_Envelope *env;
2584
2585   GNUNET_assert (NULL != disconnects);
2586
2587   /* Save parameters */
2588   ccn.channel_of_client = htonl (0);
2589   ch = create_channel (h, ccn);
2590   ch->ctx = channel_cls;
2591   ch->peer = GNUNET_PEER_intern (destination);
2592   ch->options = options;
2593   ch->window_changes = window_changes;
2594   ch->disconnects = disconnects;
2595
2596   /* Create MQ for channel */
2597   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2598                                           &cadet_mq_destroy_impl,
2599                                           &cadet_mq_cancel_impl,
2600                                           ch,
2601                                           handlers,
2602                                           &cadet_mq_error_handler,
2603                                           ch);
2604   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2605
2606   /* Request channel creation to service */
2607   env = GNUNET_MQ_msg (msg,
2608                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2609   msg->ccn = ch->ccn;
2610   msg->port = *port;
2611   msg->peer = *destination;
2612   msg->opt = htonl (options);
2613   GNUNET_MQ_send (h->mq,
2614                   env);
2615
2616   return ch;
2617 }
2618
2619
2620 /**
2621  * Obtain the message queue for a connected peer.
2622  *
2623  * @param channel The channel handle from which to get the MQ.
2624  *
2625  * @return NULL if @a channel is not yet connected.
2626  */
2627 struct GNUNET_MQ_Handle *
2628 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2629 {
2630   return channel->mq;
2631 }