- adjust FWD/BCK to new channel numbering
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Task to allow mq to send more traffic.
295    */
296   struct GNUNET_SCHEDULER_Task *mq_cont;
297
298   /**
299    * Pending envelope in case we don't have an ACK from the service.
300    */
301   struct GNUNET_MQ_Envelope *pending_env;
302
303   /**
304    * Window change handler.
305    */
306   GNUNET_CADET_WindowSizeEventHandler window_changes;
307
308   /**
309    * Disconnect handler.
310    */
311   GNUNET_CADET_DisconnectEventHandler disconnects;
312
313 };
314
315
316 /**
317  * Opaque handle to a port.
318  */
319 struct GNUNET_CADET_Port
320 {
321   /**
322    * Handle to the CADET session this port belongs to.
323    */
324   struct GNUNET_CADET_Handle *cadet;
325
326   /**
327    * Port ID.
328    *
329    * @deprecated
330    */
331   struct GNUNET_HashCode *hash;
332
333   /**
334    * Callback handler for incoming channels on this port.
335    */
336   GNUNET_CADET_InboundChannelNotificationHandler *handler;
337
338   /**
339    * Closure for @a handler.
340    */
341   void *cls;
342
343   /*****************************    MQ     ************************************/
344
345   /**
346    * Port "number"
347    */
348   struct GNUNET_HashCode id;
349
350   /**
351    * Handler for incoming channels on this port
352    */
353   GNUNET_CADET_ConnectEventHandler connects;
354
355   /**
356    * Closure for @ref connects
357    */
358   void * connects_cls;
359
360   /**
361    * Window size change handler.
362    */
363   GNUNET_CADET_WindowSizeEventHandler window_changes;
364
365   /**
366    * Handler called when an incoming channel is destroyed..
367    */
368   GNUNET_CADET_DisconnectEventHandler disconnects;
369
370   /**
371    * Payload handlers for incoming channels.
372    */
373   const struct GNUNET_MQ_MessageHandler *handlers;
374 };
375
376
377 /**
378  * Implementation state for cadet's message queue.
379  */
380 struct CadetMQState
381 {
382   /**
383    * The current transmit handle, or NULL
384    * if no transmit is active.
385    */
386   struct GNUNET_CADET_TransmitHandle *th;
387
388   /**
389    * Channel to send the data over.
390    */
391   struct GNUNET_CADET_Channel *channel;
392 };
393
394
395
396 /******************************************************************************/
397 /*********************      FUNCTION DECLARATIONS     *************************/
398 /******************************************************************************/
399
400 /**
401  * Reconnect to the service, retransmit all infomation to try to restore the
402  * original state.
403  *
404  * @param h Handle to the CADET service.
405  */
406 static void
407 schedule_reconnect (struct GNUNET_CADET_Handle *h);
408
409
410 /**
411  * Reconnect callback: tries to reconnect again after a failer previous
412  * reconnection.
413  *
414  * @param cls Closure (cadet handle).
415  */
416 static void
417 reconnect_cbk (void *cls);
418
419
420 /**
421  * Reconnect to the service, retransmit all infomation to try to restore the
422  * original state.
423  *
424  * @param h handle to the cadet
425  */
426 static void
427 reconnect (struct GNUNET_CADET_Handle *h);
428
429
430 /******************************************************************************/
431 /***********************     AUXILIARY FUNCTIONS      *************************/
432 /******************************************************************************/
433
434 /**
435  * Check if transmission is a payload packet.
436  *
437  * @param th Transmission handle.
438  *
439  * @return #GNUNET_YES if it is a payload packet,
440  *         #GNUNET_NO if it is a cadet management packet.
441  */
442 static int
443 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
444 {
445   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
446 }
447
448
449 /**
450  * Find the Port struct for a hash.
451  *
452  * @param h CADET handle.
453  * @param hash HashCode for the port number.
454  *
455  * @return The port handle if known, NULL otherwise.
456  */
457 static struct GNUNET_CADET_Port *
458 find_port (const struct GNUNET_CADET_Handle *h,
459            const struct GNUNET_HashCode *hash)
460 {
461   struct GNUNET_CADET_Port *p;
462
463   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
464
465   return p;
466 }
467
468
469 /**
470  * Get the channel handler for the channel specified by id from the given handle
471  *
472  * @param h Cadet handle
473  * @param ccn ID of the wanted channel
474  * @return handle to the required channel or NULL if not found
475  */
476 static struct GNUNET_CADET_Channel *
477 retrieve_channel (struct GNUNET_CADET_Handle *h,
478                   struct GNUNET_CADET_ClientChannelNumber ccn)
479 {
480   struct GNUNET_CADET_Channel *ch;
481
482   for (ch = h->channels_head; NULL != ch; ch = ch->next)
483     if (ch->ccn.channel_of_client == ccn.channel_of_client)
484       return ch;
485   return NULL;
486 }
487
488
489 /**
490  * Create a new channel and insert it in the channel list of the cadet handle
491  *
492  * @param h Cadet handle
493  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
494  *
495  * @return Handle to the created channel.
496  */
497 static struct GNUNET_CADET_Channel *
498 create_channel (struct GNUNET_CADET_Handle *h,
499                 struct GNUNET_CADET_ClientChannelNumber ccn)
500 {
501   struct GNUNET_CADET_Channel *ch;
502
503   ch = GNUNET_new (struct GNUNET_CADET_Channel);
504   GNUNET_CONTAINER_DLL_insert (h->channels_head,
505                                h->channels_tail,
506                                ch);
507   ch->cadet = h;
508   if (0 == ccn.channel_of_client)
509   {
510     ch->ccn = h->next_ccn;
511     while (NULL != retrieve_channel (h,
512                                      h->next_ccn))
513     {
514       h->next_ccn.channel_of_client
515         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
516       if (0 == ntohl (h->next_ccn.channel_of_client))
517         h->next_ccn.channel_of_client
518           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
519     }
520   }
521   else
522   {
523     ch->ccn = ccn;
524   }
525   return ch;
526 }
527
528
529 /**
530  * Destroy the specified channel.
531  * - Destroys all peers, calling the disconnect callback on each if needed
532  * - Cancels all outgoing traffic for that channel, calling respective notifys
533  * - Calls cleaner if channel was inbound
534  * - Frees all memory used
535  *
536  * @param ch Pointer to the channel.
537  * @param call_cleaner Whether to call the cleaner handler.
538  *
539  * @return Handle to the required channel or NULL if not found.
540  */
541 static void
542 destroy_channel (struct GNUNET_CADET_Channel *ch)
543 {
544   struct GNUNET_CADET_Handle *h;
545   struct GNUNET_CADET_TransmitHandle *th;
546   struct GNUNET_CADET_TransmitHandle *next;
547
548   if (NULL == ch)
549   {
550     GNUNET_break (0);
551     return;
552   }
553   h = ch->cadet;
554   LOG (GNUNET_ERROR_TYPE_DEBUG,
555        " destroy_channel %X of %p\n",
556        ch->ccn,
557        h);
558
559   GNUNET_CONTAINER_DLL_remove (h->channels_head,
560                                h->channels_tail,
561                                ch);
562   if (NULL != ch->mq_cont)
563   {
564     GNUNET_SCHEDULER_cancel (ch->mq_cont);
565     ch->mq_cont = NULL;
566   }
567   /* signal channel destruction */
568   if (0 != ch->peer)
569   {
570     if (NULL != h->cleaner)
571     {
572       /** @a deprecated  */
573       LOG (GNUNET_ERROR_TYPE_DEBUG,
574           " calling cleaner\n");
575       h->cleaner (h->cls, ch, ch->ctx);
576     }
577     else if (NULL != ch->disconnects)
578     {
579       LOG (GNUNET_ERROR_TYPE_DEBUG,
580           " calling disconnect handler\n");
581       ch->disconnects (ch->ctx, ch);
582     }
583     else
584     {
585       /* Application won't be aware of the channel destruction and use
586        * a pointer to free'd memory.
587        */
588       GNUNET_assert (0);
589     }
590   }
591
592   /* check that clients did not leave messages behind in the queue */
593   for (th = h->th_head; NULL != th; th = next)
594   {
595     next = th->next;
596     if (th->channel != ch)
597       continue;
598     /* Clients should have aborted their requests already.
599      * Management traffic should be ok, as clients can't cancel that.
600      * If the service crashed and we are reconnecting, it's ok.
601      */
602     GNUNET_break (GNUNET_NO == th_is_payload (th));
603     GNUNET_CADET_notify_transmit_ready_cancel (th);
604   }
605
606   if (0 != ch->peer)
607     GNUNET_PEER_change_rc (ch->peer, -1);
608   GNUNET_free (ch);
609 }
610
611
612 /**
613  * Add a transmit handle to the transmission queue and set the
614  * timeout if needed.
615  *
616  * @param h cadet handle with the queue head and tail
617  * @param th handle to the packet to be transmitted
618  */
619 static void
620 add_to_queue (struct GNUNET_CADET_Handle *h,
621               struct GNUNET_CADET_TransmitHandle *th)
622 {
623   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
624                                     h->th_tail,
625                                     th);
626 }
627
628
629 /**
630  * Remove a transmit handle from the transmission queue, if present.
631  *
632  * Safe to call even if not queued.
633  *
634  * @param th handle to the packet to be unqueued.
635  */
636 static void
637 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
638 {
639   struct GNUNET_CADET_Handle *h = th->channel->cadet;
640
641   /* It might or might not have been queued (rarely not), but check anyway. */
642   if (NULL != th->next || h->th_tail == th)
643   {
644     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
645   }
646 }
647
648
649 /**
650  * Notify the application about a change in the window size (if needed).
651  *
652  * @param ch Channel to notify about.
653  */
654 static void
655 notify_window_size (struct GNUNET_CADET_Channel *ch)
656 {
657   if (NULL != ch->window_changes)
658   {
659     ch->window_changes (ch->ctx, ch, ch->allow_send);
660   }
661 }
662
663 /******************************************************************************/
664 /***********************      MQ API CALLBACKS     ****************************/
665 /******************************************************************************/
666
667 /**
668  * Allow the MQ implementation to send the next message.
669  *
670  * @param cls Closure (channel whose mq to activate).
671  */
672 static void
673 cadet_mq_send_continue (void *cls)
674 {
675   struct GNUNET_CADET_Channel *ch = cls;
676
677   ch->mq_cont = NULL;
678   GNUNET_MQ_impl_send_continue (ch->mq);
679 }
680
681 /**
682  * Implement sending functionality of a message queue for
683  * us sending messages to a peer.
684  *
685  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
686  * in order to label the message with the channel ID and send the
687  * encapsulated message to the service.
688  *
689  * @param mq the message queue
690  * @param msg the message to send
691  * @param impl_state state of the implementation
692  */
693 static void
694 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
695                     const struct GNUNET_MessageHeader *msg,
696                     void *impl_state)
697 {
698   struct GNUNET_CADET_Channel *ch = impl_state;
699   struct GNUNET_CADET_Handle *h = ch->cadet;
700   uint16_t msize;
701   struct GNUNET_MQ_Envelope *env;
702   struct GNUNET_CADET_LocalData *cadet_msg;
703
704
705   if (NULL == h->mq)
706   {
707     /* We're currently reconnecting, pretend this worked */
708     GNUNET_MQ_impl_send_continue (mq);
709     return;
710   }
711
712   /* check message size for sanity */
713   msize = ntohs (msg->size);
714   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
715   {
716     GNUNET_break (0);
717     GNUNET_MQ_impl_send_continue (mq);
718     return;
719   }
720
721   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
722                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
723                                  msg);
724   cadet_msg->ccn = ch->ccn;
725
726   if (0 < ch->allow_send)
727   {
728     /* Service has allowed this message, just send it and continue accepting */
729     GNUNET_MQ_send (h->mq, env);
730     ch->allow_send--;
731     ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
732     // notify_window_size (ch); /* FIXME add "verbose" setting? */
733   }
734   else
735   {
736     /* Service has NOT allowed this message, queue it and wait for an ACK */
737     GNUNET_assert (NULL == ch->pending_env);
738     ch->pending_env = env;
739   }
740 }
741
742
743 /**
744  * Handle destruction of a message queue.  Implementations must not
745  * free @a mq, but should take care of @a impl_state.
746  *
747  * @param mq the message queue to destroy
748  * @param impl_state state of the implementation
749  */
750 static void
751 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
752                        void *impl_state)
753 {
754   struct GNUNET_CADET_Channel *ch = impl_state;
755
756   GNUNET_assert (mq == ch->mq);
757   ch->mq = NULL;
758 }
759
760
761 /**
762  * We had an error processing a message we forwarded from a peer to
763  * the CADET service.  We should just complain about it but otherwise
764  * continue processing.
765  *
766  * @param cls closure
767  * @param error error code
768  */
769 static void
770 cadet_mq_error_handler (void *cls,
771                         enum GNUNET_MQ_Error error)
772 {
773   GNUNET_break_op (0);
774 }
775
776
777 /**
778  * Implementation function that cancels the currently sent message.
779  * Should basically undo whatever #mq_send_impl() did.
780  *
781  * @param mq message queue
782  * @param impl_state state specific to the implementation
783  */
784
785 static void
786 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
787                      void *impl_state)
788 {
789   struct GNUNET_CADET_Channel *ch = impl_state;
790
791   LOG (GNUNET_ERROR_TYPE_WARNING,
792        "Cannot cancel mq message on channel %X of %p\n",
793        ch->ccn.channel_of_client, ch->cadet);
794
795   GNUNET_break (0);
796 }
797
798
799 /******************************************************************************/
800 /***********************      RECEIVE HANDLERS     ****************************/
801 /******************************************************************************/
802
803
804 /**
805  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
806  * request the data to send over MQ. Since MQ manages the queue, this function
807  * is scheduled immediatly after a transmit ready notification.
808  *
809  * @param cls Closure (transmit handle).
810  */
811 static void
812 request_data (void *cls)
813 {
814   struct GNUNET_CADET_TransmitHandle *th = cls;
815   struct GNUNET_CADET_LocalData *msg;
816   struct GNUNET_MQ_Envelope *env;
817   size_t osize;
818
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Requesting Data: %u bytes (allow send is %u)\n",
821        th->size,
822        th->channel->allow_send);
823
824   GNUNET_assert (0 < th->channel->allow_send);
825   th->channel->allow_send--;
826   /* NOTE: we may be allowed to send another packet immediately,
827      albeit the current logic waits for the ACK. */
828   th->request_data_task = NULL;
829   remove_from_queue (th);
830
831   env = GNUNET_MQ_msg_extra (msg,
832                              th->size,
833                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
834   msg->ccn = th->channel->ccn;
835   osize = th->notify (th->notify_cls,
836                       th->size,
837                       &msg[1]);
838   GNUNET_assert (osize == th->size);
839
840   GNUNET_MQ_send (th->channel->cadet->mq,
841                   env);
842   GNUNET_free (th);
843 }
844
845
846 /**
847  * Process the new channel notification and add it to the channels in the handle
848  *
849  * @param h     The cadet handle
850  * @param msg   A message with the details of the new incoming channel
851  */
852 static void
853 handle_channel_created (void *cls,
854                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
855 {
856   struct GNUNET_CADET_Handle *h = cls;
857   struct GNUNET_CADET_Channel *ch;
858   struct GNUNET_CADET_Port *port;
859   const struct GNUNET_HashCode *port_number;
860   struct GNUNET_CADET_ClientChannelNumber ccn;
861
862   ccn = msg->ccn;
863   port_number = &msg->port;
864   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
865   {
866     GNUNET_break (0);
867     return;
868   }
869   port = find_port (h, port_number);
870   if (NULL == port)
871   {
872     /* We could have closed the port but the service didn't know about it yet
873      * This is not an error.
874      */
875     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
876     struct GNUNET_MQ_Envelope *env;
877
878     GNUNET_break (0);
879     LOG (GNUNET_ERROR_TYPE_DEBUG,
880          "No handler for incoming channel %X (on port %s, recently closed?)\n",
881          ntohl (ccn.channel_of_client),
882          GNUNET_h2s (port_number));
883     env = GNUNET_MQ_msg (d_msg,
884                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
885     d_msg->ccn = msg->ccn;
886     GNUNET_MQ_send (h->mq,
887                     env);
888     return;
889   }
890
891   ch = create_channel (h,
892                        ccn);
893   ch->peer = GNUNET_PEER_intern (&msg->peer);
894   ch->cadet = h;
895   ch->ccn = ccn;
896   ch->incoming_port = port;
897   ch->options = ntohl (msg->opt);
898   LOG (GNUNET_ERROR_TYPE_DEBUG,
899        "Creating incoming channel %X [%s] %p\n",
900        ntohl (ccn.channel_of_client),
901        GNUNET_h2s (port_number),
902        ch);
903
904   if (NULL != port->handler)
905   {
906     /** @deprecated */
907     /* Old style API */
908     ch->ctx = port->handler (port->cls,
909                              ch,
910                              &msg->peer,
911                              port->hash,
912                              ch->options);
913   }
914   else
915   {
916     /* MQ API */
917     GNUNET_assert (NULL != port->connects);
918     ch->window_changes = port->window_changes;
919     ch->disconnects = port->disconnects;
920     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
921                                             &cadet_mq_destroy_impl,
922                                             &cadet_mq_cancel_impl,
923                                             ch,
924                                             port->handlers,
925                                             &cadet_mq_error_handler,
926                                             ch);
927     ch->ctx = port->connects (port->cls,
928                               ch,
929                               &msg->peer);
930     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
931   }
932 }
933
934
935 /**
936  * Process the channel destroy notification and free associated resources
937  *
938  * @param h     The cadet handle
939  * @param msg   A message with the details of the channel being destroyed
940  */
941 static void
942 handle_channel_destroy (void *cls,
943                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
944 {
945   struct GNUNET_CADET_Handle *h = cls;
946   struct GNUNET_CADET_Channel *ch;
947   struct GNUNET_CADET_ClientChannelNumber ccn;
948
949   ccn = msg->ccn;
950   LOG (GNUNET_ERROR_TYPE_DEBUG,
951        "Channel %X Destroy from service\n",
952        ntohl (ccn.channel_of_client));
953   ch = retrieve_channel (h,
954                          ccn);
955
956   if (NULL == ch)
957   {
958     LOG (GNUNET_ERROR_TYPE_DEBUG,
959          "channel %X unknown\n",
960          ntohl (ccn.channel_of_client));
961     return;
962   }
963   destroy_channel (ch);
964 }
965
966
967 /**
968  * Check that message received from CADET service is well-formed.
969  *
970  * @param cls the `struct GNUNET_CADET_Handle`
971  * @param message the message we got
972  * @return #GNUNET_OK if the message is well-formed,
973  *         #GNUNET_SYSERR otherwise
974  */
975 static int
976 check_local_data (void *cls,
977                   const struct GNUNET_CADET_LocalData *message)
978 {
979   struct GNUNET_CADET_Handle *h = cls;
980   struct GNUNET_CADET_Channel *ch;
981   uint16_t size;
982
983   size = ntohs (message->header.size);
984   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
985   {
986     GNUNET_break_op (0);
987     return GNUNET_SYSERR;
988   }
989
990   ch = retrieve_channel (h,
991                          message->ccn);
992   if (NULL == ch)
993   {
994     GNUNET_break_op (0);
995     return GNUNET_SYSERR;
996   }
997
998   return GNUNET_OK;
999 }
1000
1001
1002 /**
1003  * Process the incoming data packets, call appropriate handlers.
1004  *
1005  * @param h       The cadet handle
1006  * @param message A message encapsulating the data
1007  */
1008 static void
1009 handle_local_data (void *cls,
1010                    const struct GNUNET_CADET_LocalData *message)
1011 {
1012   struct GNUNET_CADET_Handle *h = cls;
1013   const struct GNUNET_MessageHeader *payload;
1014   const struct GNUNET_CADET_MessageHandler *handler;
1015   struct GNUNET_CADET_Channel *ch;
1016   uint16_t type;
1017   int fwd;
1018
1019   ch = retrieve_channel (h,
1020                          message->ccn);
1021   if (NULL == ch)
1022   {
1023     GNUNET_break_op (0);
1024     reconnect (h);
1025     return;
1026   }
1027
1028   payload = (struct GNUNET_MessageHeader *) &message[1];
1029   type = ntohs (payload->type);
1030   fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032        "Got a %s data on channel %s [%X] of type %s (%u)\n",
1033        GC_f2s (fwd),
1034        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
1035        ntohl (message->ccn.channel_of_client),
1036        GC_m2s (type),
1037        type);
1038   if (NULL != ch->mq)
1039   {
1040     LOG (GNUNET_ERROR_TYPE_DEBUG,
1041          "injecting msg %s into mq %p\n",
1042          GC_m2s (ntohs (payload->type)),
1043          ch->mq);
1044     GNUNET_MQ_inject_message (ch->mq, payload);
1045     return;
1046   }
1047   /** @a deprecated */
1048   for (unsigned i=0;i<h->n_handlers;i++)
1049   {
1050     handler = &h->message_handlers[i];
1051     if (handler->type == type)
1052     {
1053       if (GNUNET_OK !=
1054           handler->callback (h->cls,
1055                              ch,
1056                              &ch->ctx,
1057                              payload))
1058       {
1059         LOG (GNUNET_ERROR_TYPE_DEBUG,
1060              "callback caused disconnection\n");
1061         GNUNET_CADET_channel_destroy (ch);
1062         return;
1063       }
1064       return;
1065     }
1066   }
1067   /* Other peer sent message we do not comprehend. */
1068   GNUNET_break_op (0);
1069   GNUNET_CADET_receive_done (ch);
1070 }
1071
1072
1073 /**
1074  * Process a local ACK message, enabling the client to send
1075  * more data to the service.
1076  *
1077  * @param h Cadet handle.
1078  * @param message Message itself.
1079  */
1080 static void
1081 handle_local_ack (void *cls,
1082                   const struct GNUNET_CADET_LocalAck *message)
1083 {
1084   struct GNUNET_CADET_Handle *h = cls;
1085   struct GNUNET_CADET_Channel *ch;
1086   struct GNUNET_CADET_ClientChannelNumber ccn;
1087   struct GNUNET_CADET_TransmitHandle *th;
1088
1089   ccn = message->ccn;
1090   ch = retrieve_channel (h, ccn);
1091   if (NULL == ch)
1092   {
1093     LOG (GNUNET_ERROR_TYPE_DEBUG,
1094          "ACK on unknown channel %X\n",
1095          ntohl (ccn.channel_of_client));
1096     return;
1097   }
1098   ch->allow_send++;
1099   if (NULL != ch->mq)
1100   {
1101     if (NULL == ch->pending_env)
1102     {
1103       LOG (GNUNET_ERROR_TYPE_DEBUG,
1104            "Got an ACK on mq channel %X, allow send now %u!\n",
1105            ntohl (ch->ccn.channel_of_client),
1106            ch->allow_send);
1107       notify_window_size (ch);
1108     }
1109     else
1110     {
1111       LOG (GNUNET_ERROR_TYPE_DEBUG,
1112            "Got an ACK on mq channel %X, sending pending message!\n",
1113            ntohl (ch->ccn.channel_of_client));
1114       GNUNET_MQ_send (h->mq, ch->pending_env);
1115       ch->allow_send--;
1116       ch->pending_env = NULL;
1117       ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
1118     }
1119     return;
1120   }
1121
1122   /** @deprecated */
1123   /* Old style API */
1124   for (th = h->th_head; NULL != th; th = th->next)
1125   {
1126     if ( (th->channel == ch) &&
1127          (NULL == th->request_data_task) )
1128     {
1129       th->request_data_task
1130         = GNUNET_SCHEDULER_add_now (&request_data,
1131                                     th);
1132       break;
1133     }
1134   }
1135 }
1136
1137
1138 /**
1139  * Generic error handler, called with the appropriate error code and
1140  * the same closure specified at the creation of the message queue.
1141  * Not every message queue implementation supports an error handler.
1142  *
1143  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1144  * @param error error code
1145  */
1146 static void
1147 handle_mq_error (void *cls,
1148                  enum GNUNET_MQ_Error error)
1149 {
1150   struct GNUNET_CADET_Handle *h = cls;
1151
1152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1153   GNUNET_MQ_destroy (h->mq);
1154   h->mq = NULL;
1155   reconnect (h);
1156 }
1157
1158
1159 /*
1160  * Process a local reply about info on all channels, pass info to the user.
1161  *
1162  * @param h Cadet handle.
1163  * @param message Message itself.
1164  */
1165 // static void
1166 // process_get_channels (struct GNUNET_CADET_Handle *h,
1167 //                      const struct GNUNET_MessageHeader *message)
1168 // {
1169 //   struct GNUNET_CADET_LocalInfo *msg;
1170 //
1171 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1172 //
1173 //   if (NULL == h->channels_cb)
1174 //   {
1175 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1176 //     return;
1177 //   }
1178 //
1179 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1180 //   if (ntohs (message->size) !=
1181 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1182 //        sizeof (struct GNUNET_PeerIdentity)))
1183 //   {
1184 //     GNUNET_break_op (0);
1185 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1186 //                 "Get channels message: size %hu - expected %u\n",
1187 //                 ntohs (message->size),
1188 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1189 //     return;
1190 //   }
1191 //   h->channels_cb (h->channels_cls,
1192 //                   ntohl (msg->channel_id),
1193 //                   &msg->owner,
1194 //                   &msg->destination);
1195 // }
1196
1197
1198
1199 /*
1200  * Process a local monitor_channel reply, pass info to the user.
1201  *
1202  * @param h Cadet handle.
1203  * @param message Message itself.
1204  */
1205 // static void
1206 // process_show_channel (struct GNUNET_CADET_Handle *h,
1207 //                      const struct GNUNET_MessageHeader *message)
1208 // {
1209 //   struct GNUNET_CADET_LocalInfo *msg;
1210 //   size_t esize;
1211 //
1212 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1213 //
1214 //   if (NULL == h->channel_cb)
1215 //   {
1216 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1217 //     return;
1218 //   }
1219 //
1220 //   /* Verify message sanity */
1221 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1222 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1223 //   if (ntohs (message->size) != esize)
1224 //   {
1225 //     GNUNET_break_op (0);
1226 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1227 //                 "Show channel message: size %hu - expected %u\n",
1228 //                 ntohs (message->size),
1229 //                 esize);
1230 //
1231 //     h->channel_cb (h->channel_cls, NULL, NULL);
1232 //     h->channel_cb = NULL;
1233 //     h->channel_cls = NULL;
1234 //
1235 //     return;
1236 //   }
1237 //
1238 //   h->channel_cb (h->channel_cls,
1239 //                  &msg->destination,
1240 //                  &msg->owner);
1241 // }
1242
1243
1244
1245 /**
1246  * Check that message received from CADET service is well-formed.
1247  *
1248  * @param cls the `struct GNUNET_CADET_Handle`
1249  * @param message the message we got
1250  * @return #GNUNET_OK if the message is well-formed,
1251  *         #GNUNET_SYSERR otherwise
1252  */
1253 static int
1254 check_get_peers (void *cls,
1255                  const struct GNUNET_CADET_LocalInfoPeer *message)
1256 {
1257   struct GNUNET_CADET_Handle *h = cls;
1258   uint16_t size;
1259
1260   if (NULL == h->info_cb.peers_cb)
1261   {
1262     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1263                 "  no handler for peesr monitor message!\n");
1264     return GNUNET_SYSERR;
1265   }
1266
1267   size = ntohs (message->header.size);
1268   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1269   {
1270     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1271     h->info_cb.peers_cb = NULL;
1272     h->info_cls = NULL;
1273     return GNUNET_SYSERR;
1274   }
1275
1276   return GNUNET_OK;
1277 }
1278
1279
1280 /**
1281  * Process a local reply about info on all tunnels, pass info to the user.
1282  *
1283  * @param cls Closure (Cadet handle).
1284  * @param msg Message itself.
1285  */
1286 static void
1287 handle_get_peers (void *cls,
1288                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1289 {
1290   struct GNUNET_CADET_Handle *h = cls;
1291   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1292                        (int) ntohs (msg->tunnel),
1293                        (unsigned int ) ntohs (msg->paths),
1294                        0);
1295 }
1296
1297
1298 /**
1299  * Check that message received from CADET service is well-formed.
1300  *
1301  * @param cls the `struct GNUNET_CADET_Handle`
1302  * @param message the message we got
1303  * @return #GNUNET_OK if the message is well-formed,
1304  *         #GNUNET_SYSERR otherwise
1305  */
1306 static int
1307 check_get_peer (void *cls,
1308                 const struct GNUNET_CADET_LocalInfoPeer *message)
1309 {
1310   struct GNUNET_CADET_Handle *h = cls;
1311   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1312   struct GNUNET_PeerIdentity *paths_array;
1313   size_t esize;
1314   unsigned int epaths;
1315   unsigned int paths;
1316   unsigned int peers;
1317
1318   if (NULL == h->info_cb.peer_cb)
1319   {
1320     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1321                 "  no handler for peer monitor message!\n");
1322     goto clean_cls;
1323   }
1324
1325   /* Verify message sanity */
1326   esize = ntohs (message->header.size);
1327   if (esize < msize)
1328   {
1329     GNUNET_break_op (0);
1330     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1331     goto clean_cls;
1332   }
1333   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1334   {
1335     GNUNET_break_op (0);
1336     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1337     goto clean_cls;
1338
1339   }
1340   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1341   epaths = (unsigned int) ntohs (message->paths);
1342   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1343   paths = 0;
1344   for (int i = 0; i < peers; i++)
1345   {
1346     if (0 == memcmp (&paths_array[i], &message->destination,
1347                      sizeof (struct GNUNET_PeerIdentity)))
1348     {
1349       paths++;
1350     }
1351   }
1352   if (paths != epaths)
1353   {
1354     GNUNET_break_op (0);
1355     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1356     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1357     goto clean_cls;
1358   }
1359
1360   return GNUNET_OK;
1361
1362 clean_cls:
1363   h->info_cb.peer_cb = NULL;
1364   h->info_cls = NULL;
1365   return GNUNET_SYSERR;
1366 }
1367
1368
1369 /**
1370  * Process a local peer info reply, pass info to the user.
1371  *
1372  * @param cls Closure (Cadet handle).
1373  * @param message Message itself.
1374  */
1375 static void
1376 handle_get_peer (void *cls,
1377                  const struct GNUNET_CADET_LocalInfoPeer *message)
1378 {
1379   struct GNUNET_CADET_Handle *h = cls;
1380   struct GNUNET_PeerIdentity *paths_array;
1381   unsigned int paths;
1382   unsigned int path_length;
1383   int neighbor;
1384   unsigned int peers;
1385
1386   paths = (unsigned int) ntohs (message->paths);
1387   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1388   peers = (ntohs (message->header.size) - sizeof (*message))
1389           / sizeof (struct GNUNET_PeerIdentity);
1390   path_length = 0;
1391   neighbor = GNUNET_NO;
1392
1393   for (int i = 0; i < peers; i++)
1394   {
1395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1396     path_length++;
1397     if (0 == memcmp (&paths_array[i], &message->destination,
1398                      sizeof (struct GNUNET_PeerIdentity)))
1399     {
1400       if (1 == path_length)
1401         neighbor = GNUNET_YES;
1402       path_length = 0;
1403     }
1404   }
1405
1406   /* Call Callback with tunnel info. */
1407   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1408   h->info_cb.peer_cb (h->info_cls,
1409                       &message->destination,
1410                       (int) ntohs (message->tunnel),
1411                       neighbor,
1412                       paths,
1413                       paths_array);
1414 }
1415
1416
1417 /**
1418  * Check that message received from CADET service is well-formed.
1419  *
1420  * @param cls the `struct GNUNET_CADET_Handle`
1421  * @param msg the message we got
1422  * @return #GNUNET_OK if the message is well-formed,
1423  *         #GNUNET_SYSERR otherwise
1424  */
1425 static int
1426 check_get_tunnels (void *cls,
1427                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1428 {
1429   struct GNUNET_CADET_Handle *h = cls;
1430   uint16_t size;
1431
1432   if (NULL == h->info_cb.tunnels_cb)
1433   {
1434     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1435                 "  no handler for tunnels monitor message!\n");
1436     return GNUNET_SYSERR;
1437   }
1438
1439   size = ntohs (msg->header.size);
1440   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1441   {
1442     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1443     h->info_cb.tunnels_cb = NULL;
1444     h->info_cls = NULL;
1445     return GNUNET_SYSERR;
1446   }
1447   return GNUNET_OK;
1448 }
1449
1450
1451 /**
1452  * Process a local reply about info on all tunnels, pass info to the user.
1453  *
1454  * @param cls Closure (Cadet handle).
1455  * @param message Message itself.
1456  */
1457 static void
1458 handle_get_tunnels (void *cls,
1459                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1460 {
1461   struct GNUNET_CADET_Handle *h = cls;
1462
1463   h->info_cb.tunnels_cb (h->info_cls,
1464                          &msg->destination,
1465                          ntohl (msg->channels),
1466                          ntohl (msg->connections),
1467                          ntohs (msg->estate),
1468                          ntohs (msg->cstate));
1469
1470 }
1471
1472
1473 /**
1474  * Check that message received from CADET service is well-formed.
1475  *
1476  * @param cls the `struct GNUNET_CADET_Handle`
1477  * @param msg the message we got
1478  * @return #GNUNET_OK if the message is well-formed,
1479  *         #GNUNET_SYSERR otherwise
1480  */
1481 static int
1482 check_get_tunnel (void *cls,
1483                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1484 {
1485   struct GNUNET_CADET_Handle *h = cls;
1486   unsigned int ch_n;
1487   unsigned int c_n;
1488   size_t esize;
1489   size_t msize;
1490
1491   if (NULL == h->info_cb.tunnel_cb)
1492   {
1493     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1494                 "  no handler for tunnel monitor message!\n");
1495     goto clean_cls;
1496   }
1497
1498   /* Verify message sanity */
1499   msize = ntohs (msg->header.size);
1500   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1501   if (esize > msize)
1502   {
1503     GNUNET_break_op (0);
1504     h->info_cb.tunnel_cb (h->info_cls,
1505                           NULL, 0, 0, NULL, NULL, 0, 0);
1506     goto clean_cls;
1507   }
1508   ch_n = ntohl (msg->channels);
1509   c_n = ntohl (msg->connections);
1510   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1511   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1512   if (msize != esize)
1513   {
1514     GNUNET_break_op (0);
1515     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516                 "m:%u, e: %u (%u ch, %u conn)\n",
1517                 (unsigned int) msize,
1518                 (unsigned int) esize,
1519                 ch_n,
1520                 c_n);
1521     h->info_cb.tunnel_cb (h->info_cls,
1522                           NULL, 0, 0, NULL, NULL, 0, 0);
1523     goto clean_cls;
1524   }
1525
1526   return GNUNET_OK;
1527
1528 clean_cls:
1529   h->info_cb.tunnel_cb = NULL;
1530   h->info_cls = NULL;
1531   return GNUNET_SYSERR;
1532 }
1533
1534
1535 /**
1536  * Process a local tunnel info reply, pass info to the user.
1537  *
1538  * @param cls Closure (Cadet handle).
1539  * @param msg Message itself.
1540  */
1541 static void
1542 handle_get_tunnel (void *cls,
1543                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1544 {
1545   struct GNUNET_CADET_Handle *h = cls;
1546   unsigned int ch_n;
1547   unsigned int c_n;
1548   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1549   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1550
1551   ch_n = ntohl (msg->channels);
1552   c_n = ntohl (msg->connections);
1553
1554   /* Call Callback with tunnel info. */
1555   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1556   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1557   h->info_cb.tunnel_cb (h->info_cls,
1558                         &msg->destination,
1559                         ch_n,
1560                         c_n,
1561                         chns,
1562                         conns,
1563                         ntohs (msg->estate),
1564                         ntohs (msg->cstate));
1565 }
1566
1567
1568 /**
1569  * Reconnect to the service, retransmit all infomation to try to restore the
1570  * original state.
1571  *
1572  * @param h handle to the cadet
1573  */
1574 static void
1575 reconnect (struct GNUNET_CADET_Handle *h)
1576 {
1577   struct GNUNET_MQ_MessageHandler handlers[] = {
1578     GNUNET_MQ_hd_fixed_size (channel_created,
1579                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1580                              struct GNUNET_CADET_LocalChannelCreateMessage,
1581                              h),
1582     GNUNET_MQ_hd_fixed_size (channel_destroy,
1583                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1584                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1585                              h),
1586     GNUNET_MQ_hd_var_size (local_data,
1587                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1588                            struct GNUNET_CADET_LocalData,
1589                            h),
1590     GNUNET_MQ_hd_fixed_size (local_ack,
1591                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1592                              struct GNUNET_CADET_LocalAck,
1593                              h),
1594     GNUNET_MQ_hd_var_size (get_peers,
1595                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1596                            struct GNUNET_CADET_LocalInfoPeer,
1597                            h),
1598     GNUNET_MQ_hd_var_size (get_peer,
1599                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1600                            struct GNUNET_CADET_LocalInfoPeer,
1601                            h),
1602     GNUNET_MQ_hd_var_size (get_tunnels,
1603                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1604                            struct GNUNET_CADET_LocalInfoTunnel,
1605                            h),
1606     GNUNET_MQ_hd_var_size (get_tunnel,
1607                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1608                            struct GNUNET_CADET_LocalInfoTunnel,
1609                            h),
1610 // FIXME
1611 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1612 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1613 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1614     GNUNET_MQ_handler_end ()
1615   };
1616   struct GNUNET_CADET_Channel *ch;
1617
1618   while (NULL != (ch = h->channels_head))
1619   {
1620     LOG (GNUNET_ERROR_TYPE_DEBUG,
1621          "Destroying channel due to a reconnect\n");
1622     destroy_channel (ch);
1623   }
1624
1625   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1626
1627   if (NULL != h->mq)
1628   {
1629     GNUNET_MQ_destroy (h->mq);
1630     h->mq = NULL;
1631   }
1632   h->mq = GNUNET_CLIENT_connect (h->cfg,
1633                                  "cadet",
1634                                  handlers,
1635                                  &handle_mq_error,
1636                                  h);
1637   if (NULL == h->mq)
1638   {
1639     schedule_reconnect (h);
1640     return;
1641   }
1642   else
1643   {
1644     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1645   }
1646 }
1647
1648 /**
1649  * Reconnect callback: tries to reconnect again after a failer previous
1650  * reconnecttion
1651  *
1652  * @param cls closure (cadet handle)
1653  */
1654 static void
1655 reconnect_cbk (void *cls)
1656 {
1657   struct GNUNET_CADET_Handle *h = cls;
1658
1659   h->reconnect_task = NULL;
1660   reconnect (h);
1661 }
1662
1663
1664 /**
1665  * Reconnect to the service, retransmit all infomation to try to restore the
1666  * original state.
1667  *
1668  * @param h handle to the cadet
1669  *
1670  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1671  */
1672 static void
1673 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1674 {
1675   if (NULL == h->reconnect_task)
1676   {
1677     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1678                                                       &reconnect_cbk, h);
1679     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1680   }
1681 }
1682
1683
1684 /******************************************************************************/
1685 /**********************      API CALL DEFINITIONS     *************************/
1686 /******************************************************************************/
1687
1688 struct GNUNET_CADET_Handle *
1689 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1690                       void *cls,
1691                       GNUNET_CADET_ChannelEndHandler cleaner,
1692                       const struct GNUNET_CADET_MessageHandler *handlers)
1693 {
1694   struct GNUNET_CADET_Handle *h;
1695
1696   h = GNUNET_new (struct GNUNET_CADET_Handle);
1697   LOG (GNUNET_ERROR_TYPE_DEBUG,
1698        "GNUNET_CADET_connect() %p\n",
1699        h);
1700   h->cfg = cfg;
1701   h->cleaner = cleaner;
1702   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1703   reconnect (h);
1704   if (h->mq == NULL)
1705   {
1706     GNUNET_break (0);
1707     GNUNET_CADET_disconnect (h);
1708     return NULL;
1709   }
1710   h->cls = cls;
1711   h->message_handlers = handlers;
1712   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1713   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1714   h->reconnect_task = NULL;
1715
1716   /* count handlers */
1717   for (h->n_handlers = 0;
1718        handlers && handlers[h->n_handlers].type;
1719        h->n_handlers++) ;
1720   return h;
1721 }
1722
1723
1724 /**
1725  * Disconnect from the cadet service. All channels will be destroyed. All channel
1726  * disconnect callbacks will be called on any still connected peers, notifying
1727  * about their disconnection. The registered inbound channel cleaner will be
1728  * called should any inbound channels still exist.
1729  *
1730  * @param handle connection to cadet to disconnect
1731  */
1732 void
1733 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1734 {
1735   struct GNUNET_CADET_Channel *ch;
1736   struct GNUNET_CADET_Channel *aux;
1737   struct GNUNET_CADET_TransmitHandle *th;
1738
1739   LOG (GNUNET_ERROR_TYPE_DEBUG,
1740        "CADET DISCONNECT\n");
1741   ch = handle->channels_head;
1742   while (NULL != ch)
1743   {
1744     aux = ch->next;
1745     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1746     {
1747       GNUNET_break (0);
1748       LOG (GNUNET_ERROR_TYPE_DEBUG,
1749            "channel %X not destroyed\n",
1750            ntohl (ch->ccn.channel_of_client));
1751     }
1752     destroy_channel (ch);
1753     ch = aux;
1754   }
1755   while (NULL != (th = handle->th_head))
1756   {
1757     struct GNUNET_MessageHeader *msg;
1758
1759     /* Make sure it is an allowed packet (everything else should have been
1760      * already canceled).
1761      */
1762     GNUNET_break (GNUNET_NO == th_is_payload (th));
1763     msg = (struct GNUNET_MessageHeader *) &th[1];
1764     switch (ntohs(msg->type))
1765     {
1766       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1767       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1768       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1769       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1770       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1771       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1772       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1773       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1774       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1775       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1776         break;
1777       default:
1778         GNUNET_break (0);
1779         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1780              GC_m2s (ntohs(msg->type)));
1781     }
1782
1783     GNUNET_CADET_notify_transmit_ready_cancel (th);
1784   }
1785
1786   if (NULL != handle->mq)
1787   {
1788     GNUNET_MQ_destroy (handle->mq);
1789     handle->mq = NULL;
1790   }
1791   if (NULL != handle->reconnect_task)
1792   {
1793     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1794     handle->reconnect_task = NULL;
1795   }
1796
1797   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1798   handle->ports = NULL;
1799   GNUNET_free (handle);
1800 }
1801
1802
1803 /**
1804  * Open a port to receive incomming channels.
1805  *
1806  * @param h CADET handle.
1807  * @param port Hash representing the port number.
1808  * @param new_channel Function called when an channel is received.
1809  * @param new_channel_cls Closure for @a new_channel.
1810  * @return Port handle.
1811  */
1812 struct GNUNET_CADET_Port *
1813 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1814                         const struct GNUNET_HashCode *port,
1815                         GNUNET_CADET_InboundChannelNotificationHandler
1816                             new_channel,
1817                         void *new_channel_cls)
1818 {
1819   struct GNUNET_CADET_PortMessage *msg;
1820   struct GNUNET_MQ_Envelope *env;
1821   struct GNUNET_CADET_Port *p;
1822
1823   GNUNET_assert (NULL != new_channel);
1824   p = GNUNET_new (struct GNUNET_CADET_Port);
1825   p->cadet = h;
1826   p->hash = GNUNET_new (struct GNUNET_HashCode);
1827   *p->hash = *port;
1828   p->handler = new_channel;
1829   p->cls = new_channel_cls;
1830   GNUNET_assert (GNUNET_OK ==
1831                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1832                                                     p->hash,
1833                                                     p,
1834                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1835
1836   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1837   msg->port = *p->hash;
1838   GNUNET_MQ_send (h->mq, env);
1839
1840   return p;
1841 }
1842
1843 /**
1844  * Close a port opened with @a GNUNET_CADET_open_port.
1845  * The @a new_channel callback will no longer be called.
1846  *
1847  * @param p Port handle.
1848  */
1849 void
1850 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1851 {
1852   struct GNUNET_CADET_PortMessage *msg;
1853   struct GNUNET_MQ_Envelope *env;
1854   struct GNUNET_HashCode *id;
1855
1856   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1857
1858   id = NULL != p->hash ? p->hash : &p->id;
1859   msg->port = *id;
1860   GNUNET_MQ_send (p->cadet->mq, env);
1861   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1862   GNUNET_free_non_null (p->hash);
1863   GNUNET_free (p);
1864 }
1865
1866
1867 /**
1868  * Create a new channel towards a remote peer.
1869  *
1870  * If the destination port is not open by any peer or the destination peer
1871  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1872  * for this channel.
1873  *
1874  * @param h cadet handle
1875  * @param channel_ctx client's channel context to associate with the channel
1876  * @param peer peer identity the channel should go to
1877  * @param port Port hash (port number).
1878  * @param options CadetOption flag field, with all desired option bits set to 1.
1879  * @return handle to the channel
1880  */
1881 struct GNUNET_CADET_Channel *
1882 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1883                             void *channel_ctx,
1884                             const struct GNUNET_PeerIdentity *peer,
1885                             const struct GNUNET_HashCode *port,
1886                             enum GNUNET_CADET_ChannelOption options)
1887 {
1888   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1889   struct GNUNET_MQ_Envelope *env;
1890   struct GNUNET_CADET_Channel *ch;
1891   struct GNUNET_CADET_ClientChannelNumber ccn;
1892
1893   ccn.channel_of_client = htonl (0);
1894   ch = create_channel (h, ccn);
1895   ch->ctx = channel_ctx;
1896   ch->peer = GNUNET_PEER_intern (peer);
1897
1898   LOG (GNUNET_ERROR_TYPE_DEBUG,
1899        "Creating new channel to %s:%u at %p number %X\n",
1900        GNUNET_i2s (peer),
1901        port,
1902        ch,
1903        ntohl (ch->ccn.channel_of_client));
1904   env = GNUNET_MQ_msg (msg,
1905                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1906   msg->ccn = ch->ccn;
1907   msg->port = *port;
1908   msg->peer = *peer;
1909   msg->opt = htonl (options);
1910   GNUNET_MQ_send (h->mq,
1911                   env);
1912   return ch;
1913 }
1914
1915
1916 void
1917 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1918 {
1919   struct GNUNET_CADET_Handle *h;
1920   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1921   struct GNUNET_MQ_Envelope *env;
1922   struct GNUNET_CADET_TransmitHandle *th;
1923   struct GNUNET_CADET_TransmitHandle *next;
1924
1925   LOG (GNUNET_ERROR_TYPE_DEBUG,
1926        "Destroying channel\n");
1927   h = channel->cadet;
1928   for  (th = h->th_head; th != NULL; th = next)
1929   {
1930     next = th->next;
1931     if (th->channel == channel)
1932     {
1933       GNUNET_break (0);
1934       if (GNUNET_YES == th_is_payload (th))
1935       {
1936         /* applications should cancel before destroying channel */
1937         LOG (GNUNET_ERROR_TYPE_WARNING,
1938              "Channel destroyed without cancelling transmission requests\n");
1939         th->notify (th->notify_cls, 0, NULL);
1940       }
1941       else
1942       {
1943         LOG (GNUNET_ERROR_TYPE_WARNING,
1944              "no meta-traffic should be queued\n");
1945       }
1946       GNUNET_CONTAINER_DLL_remove (h->th_head,
1947                                    h->th_tail,
1948                                    th);
1949       GNUNET_CADET_notify_transmit_ready_cancel (th);
1950     }
1951   }
1952
1953   env = GNUNET_MQ_msg (msg,
1954                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1955   msg->ccn = channel->ccn;
1956   GNUNET_MQ_send (h->mq,
1957                   env);
1958
1959   destroy_channel (channel);
1960 }
1961
1962
1963 /**
1964  * Get information about a channel.
1965  *
1966  * @param channel Channel handle.
1967  * @param option Query (GNUNET_CADET_OPTION_*).
1968  * @param ... dependant on option, currently not used
1969  *
1970  * @return Union with an answer to the query.
1971  */
1972 const union GNUNET_CADET_ChannelInfo *
1973 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1974                               enum GNUNET_CADET_ChannelOption option, ...)
1975 {
1976   static int bool_flag;
1977   const union GNUNET_CADET_ChannelInfo *ret;
1978
1979   switch (option)
1980   {
1981     case GNUNET_CADET_OPTION_NOBUFFER:
1982     case GNUNET_CADET_OPTION_RELIABLE:
1983     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1984       if (0 != (option & channel->options))
1985         bool_flag = GNUNET_YES;
1986       else
1987         bool_flag = GNUNET_NO;
1988       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1989       break;
1990     case GNUNET_CADET_OPTION_PEER:
1991       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1992       break;
1993     default:
1994       GNUNET_break (0);
1995       return NULL;
1996   }
1997
1998   return ret;
1999 }
2000
2001
2002 struct GNUNET_CADET_TransmitHandle *
2003 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
2004                                     int cork,
2005                                     struct GNUNET_TIME_Relative maxdelay,
2006                                     size_t notify_size,
2007                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
2008                                     void *notify_cls)
2009 {
2010   struct GNUNET_CADET_TransmitHandle *th;
2011
2012   GNUNET_assert (NULL != channel);
2013   GNUNET_assert (NULL != notify);
2014   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
2015   LOG (GNUNET_ERROR_TYPE_DEBUG,
2016        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
2017        ntohl (channel->ccn.channel_of_client),
2018        channel->allow_send,
2019        (ntohl (channel->ccn.channel_of_client) >=
2020         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
2021        ? "origin"
2022        : "destination",
2023        (unsigned int) notify_size);
2024   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
2025   {
2026     LOG (GNUNET_ERROR_TYPE_WARNING,
2027          "CADET transmit ready timeout is deprected (has no effect)\n");
2028   }
2029
2030   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
2031   th->channel = channel;
2032   th->size = notify_size;
2033   th->notify = notify;
2034   th->notify_cls = notify_cls;
2035   if (0 != channel->allow_send)
2036     th->request_data_task
2037       = GNUNET_SCHEDULER_add_now (&request_data,
2038                                   th);
2039   else
2040     add_to_queue (channel->cadet,
2041                   th);
2042   return th;
2043 }
2044
2045
2046 void
2047 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
2048 {
2049   if (NULL != th->request_data_task)
2050   {
2051     GNUNET_SCHEDULER_cancel (th->request_data_task);
2052     th->request_data_task = NULL;
2053   }
2054   remove_from_queue (th);
2055   GNUNET_free (th);
2056 }
2057
2058
2059 /**
2060  * Send an ack on the channel to confirm the processing of a message.
2061  *
2062  * @param ch Channel on which to send the ACK.
2063  */
2064 void
2065 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
2066 {
2067   struct GNUNET_CADET_LocalAck *msg;
2068   struct GNUNET_MQ_Envelope *env;
2069
2070   env = GNUNET_MQ_msg (msg,
2071                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
2072   LOG (GNUNET_ERROR_TYPE_DEBUG,
2073        "Sending ACK on channel %X\n",
2074        ntohl (channel->ccn.channel_of_client));
2075   msg->ccn = channel->ccn;
2076   GNUNET_MQ_send (channel->cadet->mq,
2077                   env);
2078 }
2079
2080
2081 static void
2082 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
2083 {
2084   struct GNUNET_MessageHeader *msg;
2085   struct GNUNET_MQ_Envelope *env;
2086
2087   LOG (GNUNET_ERROR_TYPE_DEBUG,
2088        " Sending %s monitor message to service\n",
2089        GC_m2s(type));
2090
2091   env = GNUNET_MQ_msg (msg, type);
2092   GNUNET_MQ_send (h->mq, env);
2093 }
2094
2095
2096 /**
2097  * Request a debug dump on the service's STDERR.
2098  *
2099  * WARNING: unstable API, likely to change in the future!
2100  *
2101  * @param h cadet handle
2102  */
2103 void
2104 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2105 {
2106   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2107   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2108 }
2109
2110
2111 /**
2112  * Request information about peers known to the running cadet service.
2113  * The callback will be called for every peer known to the service.
2114  * Only one info request (of any kind) can be active at once.
2115  *
2116  *
2117  * WARNING: unstable API, likely to change in the future!
2118  *
2119  * @param h Handle to the cadet peer.
2120  * @param callback Function to call with the requested data.
2121  * @param callback_cls Closure for @c callback.
2122  *
2123  * @return #GNUNET_OK / #GNUNET_SYSERR
2124  */
2125 int
2126 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2127                        GNUNET_CADET_PeersCB callback,
2128                        void *callback_cls)
2129 {
2130   if (NULL != h->info_cb.peers_cb)
2131   {
2132     GNUNET_break (0);
2133     return GNUNET_SYSERR;
2134   }
2135   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2136   h->info_cb.peers_cb = callback;
2137   h->info_cls = callback_cls;
2138   return GNUNET_OK;
2139 }
2140
2141
2142 /**
2143  * Cancel a peer info request. The callback will not be called (anymore).
2144  *
2145  * WARNING: unstable API, likely to change in the future!
2146  *
2147  * @param h Cadet handle.
2148  *
2149  * @return Closure given to GNUNET_CADET_get_peers.
2150  */
2151 void *
2152 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2153 {
2154   void *cls;
2155
2156   cls = h->info_cls;
2157   h->info_cb.peers_cb = NULL;
2158   h->info_cls = NULL;
2159   return cls;
2160 }
2161
2162
2163 /**
2164  * Request information about a peer known to the running cadet peer.
2165  * The callback will be called for the tunnel once.
2166  * Only one info request (of any kind) can be active at once.
2167  *
2168  * WARNING: unstable API, likely to change in the future!
2169  *
2170  * @param h Handle to the cadet peer.
2171  * @param id Peer whose tunnel to examine.
2172  * @param callback Function to call with the requested data.
2173  * @param callback_cls Closure for @c callback.
2174  *
2175  * @return #GNUNET_OK / #GNUNET_SYSERR
2176  */
2177 int
2178 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2179                        const struct GNUNET_PeerIdentity *id,
2180                        GNUNET_CADET_PeerCB callback,
2181                        void *callback_cls)
2182 {
2183   struct GNUNET_CADET_LocalInfo *msg;
2184   struct GNUNET_MQ_Envelope *env;
2185
2186   if (NULL != h->info_cb.peer_cb)
2187   {
2188     GNUNET_break (0);
2189     return GNUNET_SYSERR;
2190   }
2191
2192   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2193   msg->peer = *id;
2194   GNUNET_MQ_send (h->mq, env);
2195
2196   h->info_cb.peer_cb = callback;
2197   h->info_cls = callback_cls;
2198   return GNUNET_OK;
2199 }
2200
2201
2202 /**
2203  * Request information about tunnels of the running cadet peer.
2204  * The callback will be called for every tunnel of the service.
2205  * Only one info request (of any kind) can be active at once.
2206  *
2207  * WARNING: unstable API, likely to change in the future!
2208  *
2209  * @param h Handle to the cadet peer.
2210  * @param callback Function to call with the requested data.
2211  * @param callback_cls Closure for @c callback.
2212  *
2213  * @return #GNUNET_OK / #GNUNET_SYSERR
2214  */
2215 int
2216 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2217                          GNUNET_CADET_TunnelsCB callback,
2218                          void *callback_cls)
2219 {
2220   if (NULL != h->info_cb.tunnels_cb)
2221   {
2222     GNUNET_break (0);
2223     return GNUNET_SYSERR;
2224   }
2225   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2226   h->info_cb.tunnels_cb = callback;
2227   h->info_cls = callback_cls;
2228   return GNUNET_OK;
2229 }
2230
2231
2232 /**
2233  * Cancel a monitor request. The monitor callback will not be called.
2234  *
2235  * @param h Cadet handle.
2236  *
2237  * @return Closure given to GNUNET_CADET_get_tunnels.
2238  */
2239 void *
2240 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2241 {
2242   void *cls;
2243
2244   h->info_cb.tunnels_cb = NULL;
2245   cls = h->info_cls;
2246   h->info_cls = NULL;
2247
2248   return cls;
2249 }
2250
2251
2252
2253 /**
2254  * Request information about a tunnel of the running cadet peer.
2255  * The callback will be called for the tunnel once.
2256  * Only one info request (of any kind) can be active at once.
2257  *
2258  * WARNING: unstable API, likely to change in the future!
2259  *
2260  * @param h Handle to the cadet peer.
2261  * @param id Peer whose tunnel to examine.
2262  * @param callback Function to call with the requested data.
2263  * @param callback_cls Closure for @c callback.
2264  *
2265  * @return #GNUNET_OK / #GNUNET_SYSERR
2266  */
2267 int
2268 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2269                         const struct GNUNET_PeerIdentity *id,
2270                         GNUNET_CADET_TunnelCB callback,
2271                         void *callback_cls)
2272 {
2273   struct GNUNET_CADET_LocalInfo *msg;
2274   struct GNUNET_MQ_Envelope *env;
2275
2276   if (NULL != h->info_cb.tunnel_cb)
2277   {
2278     GNUNET_break (0);
2279     return GNUNET_SYSERR;
2280   }
2281
2282   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2283   msg->peer = *id;
2284   GNUNET_MQ_send (h->mq, env);
2285
2286   h->info_cb.tunnel_cb = callback;
2287   h->info_cls = callback_cls;
2288   return GNUNET_OK;
2289 }
2290
2291
2292 /**
2293  * Request information about a specific channel of the running cadet peer.
2294  *
2295  * WARNING: unstable API, likely to change in the future!
2296  * FIXME Add destination option.
2297  *
2298  * @param h Handle to the cadet peer.
2299  * @param initiator ID of the owner of the channel.
2300  * @param channel_number Channel number.
2301  * @param callback Function to call with the requested data.
2302  * @param callback_cls Closure for @c callback.
2303  *
2304  * @return #GNUNET_OK / #GNUNET_SYSERR
2305  */
2306 int
2307 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2308                            struct GNUNET_PeerIdentity *initiator,
2309                            unsigned int channel_number,
2310                            GNUNET_CADET_ChannelCB callback,
2311                            void *callback_cls)
2312 {
2313   struct GNUNET_CADET_LocalInfo *msg;
2314   struct GNUNET_MQ_Envelope *env;
2315
2316   if (NULL != h->info_cb.channel_cb)
2317   {
2318     GNUNET_break (0);
2319     return GNUNET_SYSERR;
2320   }
2321
2322   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2323   msg->peer = *initiator;
2324   msg->ccn.channel_of_client = htonl (channel_number);
2325   GNUNET_MQ_send (h->mq, env);
2326
2327   h->info_cb.channel_cb = callback;
2328   h->info_cls = callback_cls;
2329   return GNUNET_OK;
2330 }
2331
2332
2333 /**
2334  * Function called to notify a client about the connection
2335  * begin ready to queue more data.  "buf" will be
2336  * NULL and "size" zero if the connection was closed for
2337  * writing in the meantime.
2338  *
2339  * @param cls closure
2340  * @param size number of bytes available in buf
2341  * @param buf where the callee should write the message
2342  * @return number of bytes written to buf
2343  */
2344 static size_t
2345 cadet_mq_ntr (void *cls, size_t size,
2346              void *buf)
2347 {
2348   struct GNUNET_MQ_Handle *mq = cls;
2349   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2350   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2351   uint16_t msize;
2352
2353   state->th = NULL;
2354   if (NULL == buf)
2355   {
2356     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2357     return 0;
2358   }
2359   msize = ntohs (msg->size);
2360   GNUNET_assert (msize <= size);
2361   GNUNET_memcpy (buf, msg, msize);
2362   GNUNET_MQ_impl_send_continue (mq);
2363   return msize;
2364 }
2365
2366
2367 /**
2368  * Signature of functions implementing the
2369  * sending functionality of a message queue.
2370  *
2371  * @param mq the message queue
2372  * @param msg the message to send
2373  * @param impl_state state of the implementation
2374  */
2375 static void
2376 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2377                         const struct GNUNET_MessageHeader *msg,
2378                         void *impl_state)
2379 {
2380   struct CadetMQState *state = impl_state;
2381
2382   GNUNET_assert (NULL == state->th);
2383   state->th =
2384       GNUNET_CADET_notify_transmit_ready (state->channel,
2385                                          /* FIXME: add option for corking */
2386                                          GNUNET_NO,
2387                                          GNUNET_TIME_UNIT_FOREVER_REL,
2388                                          ntohs (msg->size),
2389                                          &cadet_mq_ntr, mq);
2390
2391 }
2392
2393
2394 /**
2395  * Signature of functions implementing the
2396  * destruction of a message queue.
2397  * Implementations must not free 'mq', but should
2398  * take care of 'impl_state'.
2399  *
2400  * @param mq the message queue to destroy
2401  * @param impl_state state of the implementation
2402  */
2403 static void
2404 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2405                            void *impl_state)
2406 {
2407   struct CadetMQState *state = impl_state;
2408
2409   if (NULL != state->th)
2410     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2411
2412   GNUNET_free (state);
2413 }
2414
2415
2416 /**
2417  * Create a message queue for a cadet channel.
2418  * The message queue can only be used to transmit messages,
2419  * not to receive them.
2420  *
2421  * @param channel the channel to create the message qeue for
2422  * @return a message queue to messages over the channel
2423  */
2424 struct GNUNET_MQ_Handle *
2425 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2426 {
2427   struct GNUNET_MQ_Handle *mq;
2428   struct CadetMQState *state;
2429
2430   state = GNUNET_new (struct CadetMQState);
2431   state->channel = channel;
2432
2433   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2434                                       &cadet_mq_destroy_impl_old,
2435                                       NULL, /* FIXME: cancel impl. */
2436                                       state,
2437                                       NULL, /* no msg handlers */
2438                                       NULL, /* no err handlers */
2439                                       NULL); /* no handler cls */
2440   return mq;
2441 }
2442
2443
2444 /**
2445  * Transitional function to convert an unsigned int port to a hash value.
2446  * WARNING: local static value returned, NOT reentrant!
2447  * WARNING: do not use this function for new code!
2448  *
2449  * @param port Numerical port (unsigned int format).
2450  *
2451  * @return A GNUNET_HashCode usable for the new CADET API.
2452  */
2453 const struct GNUNET_HashCode *
2454 GC_u2h (uint32_t port)
2455 {
2456   static struct GNUNET_HashCode hash;
2457
2458   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2459               "This is a transitional function, "
2460               "use proper crypto hashes as CADET ports\n");
2461   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2462
2463   return &hash;
2464 }
2465
2466
2467
2468 /******************************************************************************/
2469 /******************************* MQ-BASED API *********************************/
2470 /******************************************************************************/
2471
2472 /**
2473  * Connect to the MQ-based cadet service.
2474  *
2475  * @param cfg Configuration to use.
2476  *
2477  * @return Handle to the cadet service NULL on error.
2478  */
2479 struct GNUNET_CADET_Handle *
2480 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2481 {
2482   struct GNUNET_CADET_Handle *h;
2483
2484   LOG (GNUNET_ERROR_TYPE_DEBUG,
2485        "GNUNET_CADET_connecT()\n");
2486   h = GNUNET_new (struct GNUNET_CADET_Handle);
2487   h->cfg = cfg;
2488   h->mq_api = GNUNET_YES;
2489   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2490   reconnect (h);
2491   if (NULL == h->mq)
2492   {
2493     GNUNET_break (0);
2494     GNUNET_CADET_disconnect (h);
2495     return NULL;
2496   }
2497   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2498   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2499   h->reconnect_task = NULL;
2500
2501   return h;
2502 }
2503
2504
2505 /**
2506  * Open a port to receive incomming MQ-based channels.
2507  *
2508  * @param h CADET handle.
2509  * @param port Hash identifying the port.
2510  * @param connects Function called when an incoming channel is connected.
2511  * @param connects_cls Closure for the @a connects handler.
2512  * @param window_changes Function called when the transmit window size changes.
2513  * @param disconnects Function called when a channel is disconnected.
2514  * @param handlers Callbacks for messages we care about, NULL-terminated.
2515  *
2516  * @return Port handle.
2517  */
2518 struct GNUNET_CADET_Port *
2519 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2520                         const struct GNUNET_HashCode *port,
2521                         GNUNET_CADET_ConnectEventHandler connects,
2522                         void * connects_cls,
2523                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2524                         GNUNET_CADET_DisconnectEventHandler disconnects,
2525                         const struct GNUNET_MQ_MessageHandler *handlers)
2526 {
2527   struct GNUNET_CADET_PortMessage *msg;
2528   struct GNUNET_MQ_Envelope *env;
2529   struct GNUNET_CADET_Port *p;
2530
2531   GNUNET_assert (NULL != connects);
2532   GNUNET_assert (NULL != disconnects);
2533
2534   p = GNUNET_new (struct GNUNET_CADET_Port);
2535   p->cadet = h;
2536   p->id = *port;
2537   p->connects = connects;
2538   p->cls = connects_cls;
2539   p->window_changes = window_changes;
2540   p->disconnects = disconnects;
2541   p->handlers = handlers;
2542
2543   GNUNET_assert (GNUNET_OK ==
2544                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2545                                                     &p->id,
2546                                                     p,
2547                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2548
2549   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2550   msg->port = p->id;
2551   GNUNET_MQ_send (h->mq, env);
2552
2553   return p;
2554 }
2555
2556
2557 /**
2558  * Create a new channel towards a remote peer.
2559  *
2560  * If the destination port is not open by any peer or the destination peer
2561  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2562  * for this channel.
2563  *
2564  * @param h CADET handle.
2565  * @param channel_cls Closure for the channel. It's given to:
2566  *                    - The disconnect handler @a disconnects
2567  *                    - Each message type callback in @a handlers
2568  * @param destination Peer identity the channel should go to.
2569  * @param port Identification of the destination port.
2570  * @param options CadetOption flag field, with all desired option bits set to 1.
2571  * @param window_changes Function called when the transmit window size changes.
2572  * @param disconnects Function called when the channel is disconnected.
2573  * @param handlers Callbacks for messages we care about, NULL-terminated.
2574  *
2575  * @return Handle to the channel.
2576  */
2577 struct GNUNET_CADET_Channel *
2578 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2579                              void *channel_cls,
2580                              const struct GNUNET_PeerIdentity *destination,
2581                              const struct GNUNET_HashCode *port,
2582                              enum GNUNET_CADET_ChannelOption options,
2583                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2584                              GNUNET_CADET_DisconnectEventHandler disconnects,
2585                              const struct GNUNET_MQ_MessageHandler *handlers)
2586 {
2587   struct GNUNET_CADET_Channel *ch;
2588   struct GNUNET_CADET_ClientChannelNumber ccn;
2589   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2590   struct GNUNET_MQ_Envelope *env;
2591
2592   GNUNET_assert (NULL != disconnects);
2593
2594   /* Save parameters */
2595   ccn.channel_of_client = htonl (0);
2596   ch = create_channel (h, ccn);
2597   ch->ctx = channel_cls;
2598   ch->peer = GNUNET_PEER_intern (destination);
2599   ch->options = options;
2600   ch->window_changes = window_changes;
2601   ch->disconnects = disconnects;
2602
2603   /* Create MQ for channel */
2604   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2605                                           &cadet_mq_destroy_impl,
2606                                           &cadet_mq_cancel_impl,
2607                                           ch,
2608                                           handlers,
2609                                           &cadet_mq_error_handler,
2610                                           ch);
2611   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2612
2613   /* Request channel creation to service */
2614   env = GNUNET_MQ_msg (msg,
2615                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2616   msg->ccn = ch->ccn;
2617   msg->port = *port;
2618   msg->peer = *destination;
2619   msg->opt = htonl (options);
2620   GNUNET_MQ_send (h->mq,
2621                   env);
2622
2623   return ch;
2624 }
2625
2626
2627 /**
2628  * Obtain the message queue for a connected peer.
2629  *
2630  * @param channel The channel handle from which to get the MQ.
2631  *
2632  * @return NULL if @a channel is not yet connected.
2633  */
2634 struct GNUNET_MQ_Handle *
2635 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2636 {
2637   return channel->mq;
2638 }