9448cc7a5669b08042d3207d1d9eea7bb2553490
[oweals/gnunet.git] / src / cadet / cadet_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/cadet_api.c
22  * @brief cadet api: client implementation of cadet service
23  * @author Bartlomiej Polot
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_cadet_service.h"
30 #include "cadet.h"
31 #include "cadet_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
34
35 /******************************************************************************/
36 /************************      DATA STRUCTURES     ****************************/
37 /******************************************************************************/
38
39 /**
40  * Transmission queue to the service
41  *
42  * @deprecated
43  */
44 struct GNUNET_CADET_TransmitHandle
45 {
46   /**
47    * Double Linked list
48    */
49   struct GNUNET_CADET_TransmitHandle *next;
50
51   /**
52    * Double Linked list
53    */
54   struct GNUNET_CADET_TransmitHandle *prev;
55
56   /**
57    * Channel this message is sent on / for (may be NULL for control messages).
58    */
59   struct GNUNET_CADET_Channel *channel;
60
61   /**
62    * Request data task.
63    */
64   struct GNUNET_SCHEDULER_Task *request_data_task;
65
66   /**
67    * Callback to obtain the message to transmit, or NULL if we
68    * got the message in 'data'.  Notice that messages built
69    * by 'notify' need to be encapsulated with information about
70    * the 'target'.
71    */
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73
74   /**
75    * Closure for 'notify'
76    */
77   void *notify_cls;
78
79   /**
80    * Size of the payload.
81    */
82   size_t size;
83 };
84
85
86 union CadetInfoCB
87 {
88
89   /**
90    * Channel callback.
91    */
92   GNUNET_CADET_ChannelCB channel_cb;
93
94   /**
95    * Monitor callback
96    */
97   GNUNET_CADET_PeersCB peers_cb;
98
99   /**
100    * Monitor callback
101    */
102   GNUNET_CADET_PeerCB peer_cb;
103
104   /**
105    * Monitor callback
106    */
107   GNUNET_CADET_TunnelsCB tunnels_cb;
108
109   /**
110    * Tunnel callback.
111    */
112   GNUNET_CADET_TunnelCB tunnel_cb;
113 };
114
115
116 /**
117  * Opaque handle to the service.
118  */
119 struct GNUNET_CADET_Handle
120 {
121   /**
122    * Flag to indicate old or MQ API.
123    */
124   int mq_api;
125
126   /**
127    * Message queue (if available).
128    */
129   struct GNUNET_MQ_Handle *mq;
130
131   /**
132    * Set of handlers used for processing incoming messages in the channels
133    *
134    * @deprecated
135    */
136   const struct GNUNET_CADET_MessageHandler *message_handlers;
137
138   /**
139    * Number of handlers in the handlers array.
140    *
141    * @deprecated
142    */
143   unsigned int n_handlers;
144
145   /**
146    * Ports open.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap *ports;
149
150   /**
151    * Double linked list of the channels this client is connected to, head.
152    */
153   struct GNUNET_CADET_Channel *channels_head;
154
155   /**
156    * Double linked list of the channels this client is connected to, tail.
157    */
158   struct GNUNET_CADET_Channel *channels_tail;
159
160   /**
161    * Callback for inbound channel disconnection
162    */
163   GNUNET_CADET_ChannelEndHandler *cleaner;
164
165   /**
166    * Closure for all the handlers given by the client
167    *
168    * @deprecated
169    */
170   void *cls;
171
172   /**
173    * Messages to send to the service, head.
174    *
175    * @deprecated
176    */
177   struct GNUNET_CADET_TransmitHandle *th_head;
178
179   /**
180    * Messages to send to the service, tail.
181    *
182    * @deprecated
183    */
184   struct GNUNET_CADET_TransmitHandle *th_tail;
185
186   /**
187    * child of the next channel to create (to avoid reusing IDs often)
188    */
189   struct GNUNET_CADET_ClientChannelNumber next_ccn;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   struct GNUNET_SCHEDULER_Task * reconnect_task;
205
206   /**
207    * Callback for an info task (only one active at a time).
208    */
209   union CadetInfoCB info_cb;
210
211   /**
212    * Info callback closure for @c info_cb.
213    */
214   void *info_cls;
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_CADET_Peer
222 {
223   /**
224    * ID of the peer in short form
225    */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Channel this peer belongs to
230    */
231   struct GNUNET_CADET_Channel *t;
232 };
233
234
235 /**
236  * Opaque handle to a channel.
237  */
238 struct GNUNET_CADET_Channel
239 {
240   /**
241    * DLL next
242    */
243   struct GNUNET_CADET_Channel *next;
244
245   /**
246    * DLL prev
247    */
248   struct GNUNET_CADET_Channel *prev;
249
250   /**
251    * Handle to the cadet this channel belongs to
252    */
253   struct GNUNET_CADET_Handle *cadet;
254
255   /**
256    * Local ID of the channel
257    */
258   struct GNUNET_CADET_ClientChannelNumber ccn;
259
260   /**
261    * Channel's port, if incoming.
262    */
263   struct GNUNET_CADET_Port *incoming_port;
264
265   /**
266    * Other end of the channel.
267    */
268   GNUNET_PEER_Id peer;
269
270   /**
271    * Any data the caller wants to put in here
272    */
273   void *ctx;
274
275   /**
276    * Channel options: reliability, etc.
277    */
278   enum GNUNET_CADET_ChannelOption options;
279
280   /**
281    * Are we allowed to send to the service?
282    *
283    * @deprecated?
284    */
285   unsigned int allow_send;
286
287   /*****************************    MQ     ************************************/
288   /**
289    * Message Queue for the channel.
290    */
291   struct GNUNET_MQ_Handle *mq;
292
293   /**
294    * Window change handler.
295    */
296   GNUNET_CADET_WindowSizeEventHandler window_changes;
297
298   /**
299    * Disconnect handler.
300    */
301   GNUNET_CADET_DisconnectEventHandler disconnects;
302
303 };
304
305
306 /**
307  * Opaque handle to a port.
308  */
309 struct GNUNET_CADET_Port
310 {
311   /**
312    * Handle to the CADET session this port belongs to.
313    */
314   struct GNUNET_CADET_Handle *cadet;
315
316   /**
317    * Port ID.
318    */
319   struct GNUNET_HashCode *hash;
320
321   /**
322    * Callback handler for incoming channels on this port.
323    */
324   GNUNET_CADET_InboundChannelNotificationHandler *handler;
325
326   /**
327    * Closure for @a handler.
328    */
329   void *cls;
330
331   /*****************************    MQ     ************************************/
332
333   /**
334    * Port "number"
335    */
336   struct GNUNET_HashCode id;
337
338   /**
339    * Handler for incoming channels on this port
340    */
341   GNUNET_CADET_ConnectEventHandler connects;
342
343   /**
344    * Closure for @ref connects
345    */
346   void * connects_cls;
347
348   /**
349    * Window size change handler.
350    */
351   GNUNET_CADET_WindowSizeEventHandler window_changes;
352
353   /**
354    * Handler called when an incoming channel is destroyed..
355    */
356   GNUNET_CADET_DisconnectEventHandler disconnects;
357
358   /**
359    * Payload handlers for incoming channels.
360    */
361   const struct GNUNET_MQ_MessageHandler *handlers;
362 };
363
364
365 /**
366  * Implementation state for cadet's message queue.
367  */
368 struct CadetMQState
369 {
370   /**
371    * The current transmit handle, or NULL
372    * if no transmit is active.
373    */
374   struct GNUNET_CADET_TransmitHandle *th;
375
376   /**
377    * Channel to send the data over.
378    */
379   struct GNUNET_CADET_Channel *channel;
380 };
381
382
383
384 /******************************************************************************/
385 /*********************      FUNCTION DECLARATIONS     *************************/
386 /******************************************************************************/
387
388 /**
389  * Reconnect to the service, retransmit all infomation to try to restore the
390  * original state.
391  *
392  * @param h Handle to the CADET service.
393  */
394 static void
395 schedule_reconnect (struct GNUNET_CADET_Handle *h);
396
397
398 /**
399  * Reconnect callback: tries to reconnect again after a failer previous
400  * reconnection.
401  *
402  * @param cls Closure (cadet handle).
403  */
404 static void
405 reconnect_cbk (void *cls);
406
407
408 /**
409  * Reconnect to the service, retransmit all infomation to try to restore the
410  * original state.
411  *
412  * @param h handle to the cadet
413  */
414 static void
415 reconnect (struct GNUNET_CADET_Handle *h);
416
417
418 /******************************************************************************/
419 /***********************     AUXILIARY FUNCTIONS      *************************/
420 /******************************************************************************/
421
422 /**
423  * Check if transmission is a payload packet.
424  *
425  * @param th Transmission handle.
426  *
427  * @return #GNUNET_YES if it is a payload packet,
428  *         #GNUNET_NO if it is a cadet management packet.
429  */
430 static int
431 th_is_payload (struct GNUNET_CADET_TransmitHandle *th)
432 {
433   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
434 }
435
436
437 /**
438  * Find the Port struct for a hash.
439  *
440  * @param h CADET handle.
441  * @param hash HashCode for the port number.
442  *
443  * @return The port handle if known, NULL otherwise.
444  */
445 static struct GNUNET_CADET_Port *
446 find_port (const struct GNUNET_CADET_Handle *h,
447            const struct GNUNET_HashCode *hash)
448 {
449   struct GNUNET_CADET_Port *p;
450
451   p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
452
453   return p;
454 }
455
456
457 /**
458  * Get the channel handler for the channel specified by id from the given handle
459  *
460  * @param h Cadet handle
461  * @param ccn ID of the wanted channel
462  * @return handle to the required channel or NULL if not found
463  */
464 static struct GNUNET_CADET_Channel *
465 retrieve_channel (struct GNUNET_CADET_Handle *h,
466                   struct GNUNET_CADET_ClientChannelNumber ccn)
467 {
468   struct GNUNET_CADET_Channel *ch;
469
470   for (ch = h->channels_head; NULL != ch; ch = ch->next)
471     if (ch->ccn.channel_of_client == ccn.channel_of_client)
472       return ch;
473   return NULL;
474 }
475
476
477 /**
478  * Create a new channel and insert it in the channel list of the cadet handle
479  *
480  * @param h Cadet handle
481  * @param ccn Desired ccn of the channel, 0 to assign one automatically.
482  *
483  * @return Handle to the created channel.
484  */
485 static struct GNUNET_CADET_Channel *
486 create_channel (struct GNUNET_CADET_Handle *h,
487                 struct GNUNET_CADET_ClientChannelNumber ccn)
488 {
489   struct GNUNET_CADET_Channel *ch;
490
491   ch = GNUNET_new (struct GNUNET_CADET_Channel);
492   GNUNET_CONTAINER_DLL_insert (h->channels_head,
493                                h->channels_tail,
494                                ch);
495   ch->cadet = h;
496   if (0 == ccn.channel_of_client)
497   {
498     ch->ccn = h->next_ccn;
499     while (NULL != retrieve_channel (h,
500                                      h->next_ccn))
501     {
502       h->next_ccn.channel_of_client
503         = htonl (1 + ntohl (h->next_ccn.channel_of_client));
504       if (0 == ntohl (h->next_ccn.channel_of_client))
505         h->next_ccn.channel_of_client
506           = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
507     }
508   }
509   else
510   {
511     ch->ccn = ccn;
512   }
513   return ch;
514 }
515
516
517 /**
518  * Destroy the specified channel.
519  * - Destroys all peers, calling the disconnect callback on each if needed
520  * - Cancels all outgoing traffic for that channel, calling respective notifys
521  * - Calls cleaner if channel was inbound
522  * - Frees all memory used
523  *
524  * @param ch Pointer to the channel.
525  * @param call_cleaner Whether to call the cleaner handler.
526  *
527  * @return Handle to the required channel or NULL if not found.
528  */
529 static void
530 destroy_channel (struct GNUNET_CADET_Channel *ch)
531 {
532   struct GNUNET_CADET_Handle *h;
533   struct GNUNET_CADET_TransmitHandle *th;
534   struct GNUNET_CADET_TransmitHandle *next;
535
536   if (NULL == ch)
537   {
538     GNUNET_break (0);
539     return;
540   }
541   h = ch->cadet;
542   LOG (GNUNET_ERROR_TYPE_DEBUG,
543        " destroy_channel %X of %p\n",
544        ch->ccn,
545        h);
546
547   GNUNET_CONTAINER_DLL_remove (h->channels_head,
548                                h->channels_tail,
549                                ch);
550
551   /* signal channel destruction */
552   if (0 != ch->peer)
553   {
554     if (NULL != h->cleaner)
555     {
556       /** @a deprecated  */
557       LOG (GNUNET_ERROR_TYPE_DEBUG,
558           " calling cleaner\n");
559       h->cleaner (h->cls, ch, ch->ctx);
560     }
561     else if (NULL != ch->disconnects)
562     {
563       LOG (GNUNET_ERROR_TYPE_DEBUG,
564           " calling disconnect handler\n");
565       ch->disconnects (ch->ctx, ch);
566     }
567     else
568     {
569       /* Application won't be aware of the channel destruction and use
570        * a pointer to free'd memory.
571        */
572       GNUNET_assert (0);
573     }
574   }
575
576   /* check that clients did not leave messages behind in the queue */
577   for (th = h->th_head; NULL != th; th = next)
578   {
579     next = th->next;
580     if (th->channel != ch)
581       continue;
582     /* Clients should have aborted their requests already.
583      * Management traffic should be ok, as clients can't cancel that.
584      * If the service crashed and we are reconnecting, it's ok.
585      */
586     GNUNET_break (GNUNET_NO == th_is_payload (th));
587     GNUNET_CADET_notify_transmit_ready_cancel (th);
588   }
589
590   if (0 != ch->peer)
591     GNUNET_PEER_change_rc (ch->peer, -1);
592   GNUNET_free (ch);
593 }
594
595
596 /**
597  * Add a transmit handle to the transmission queue and set the
598  * timeout if needed.
599  *
600  * @param h cadet handle with the queue head and tail
601  * @param th handle to the packet to be transmitted
602  */
603 static void
604 add_to_queue (struct GNUNET_CADET_Handle *h,
605               struct GNUNET_CADET_TransmitHandle *th)
606 {
607   GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
608                                     h->th_tail,
609                                     th);
610 }
611
612
613 /**
614  * Remove a transmit handle from the transmission queue, if present.
615  *
616  * Safe to call even if not queued.
617  *
618  * @param th handle to the packet to be unqueued.
619  */
620 static void
621 remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
622 {
623   struct GNUNET_CADET_Handle *h = th->channel->cadet;
624
625   /* It might or might not have been queued (rarely not), but check anyway. */
626   if (NULL != th->next || h->th_tail == th)
627   {
628     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
629   }
630 }
631
632
633 /******************************************************************************/
634 /***********************      MQ API CALLBACKS     ****************************/
635 /******************************************************************************/
636
637
638 /**
639  * Implement sending functionality of a message queue for
640  * us sending messages to a peer.
641  *
642  * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
643  * in order to label the message with the channel ID and send the
644  * encapsulated message to the service.
645  *
646  * @param mq the message queue
647  * @param msg the message to send
648  * @param impl_state state of the implementation
649  */
650 static void
651 cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
652                     const struct GNUNET_MessageHeader *msg,
653                     void *impl_state)
654 {
655   struct GNUNET_CADET_Channel *ch = impl_state;
656   struct GNUNET_CADET_Handle *h = ch->cadet;
657   uint16_t msize;
658   struct GNUNET_MQ_Envelope *env;
659   struct GNUNET_CADET_LocalData *cadet_msg;
660
661
662   if (NULL == h->mq)
663   {
664     /* We're currently reconnecting, pretend this worked */
665     GNUNET_MQ_impl_send_continue (mq);
666     return;
667   }
668
669   /* check message size for sanity */
670   msize = ntohs (msg->size);
671   if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
672   {
673     GNUNET_break (0);
674     GNUNET_MQ_impl_send_continue (mq);
675     return;
676   }
677
678   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
679                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
680                                  msg);
681   cadet_msg->ccn = ch->ccn;
682   GNUNET_MQ_send (h->mq, env);
683   GNUNET_MQ_impl_send_continue (mq);
684 }
685
686
687 /**
688  * Handle destruction of a message queue.  Implementations must not
689  * free @a mq, but should take care of @a impl_state.
690  *
691  * @param mq the message queue to destroy
692  * @param impl_state state of the implementation
693  */
694 static void
695 cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
696                        void *impl_state)
697 {
698   struct GNUNET_CADET_Channel *ch = impl_state;
699
700   GNUNET_assert (mq == ch->mq);
701   ch->mq = NULL;
702 }
703
704
705 /**
706  * We had an error processing a message we forwarded from a peer to
707  * the CADET service.  We should just complain about it but otherwise
708  * continue processing.
709  *
710  * @param cls closure
711  * @param error error code
712  */
713 static void
714 cadet_mq_error_handler (void *cls,
715                         enum GNUNET_MQ_Error error)
716 {
717   GNUNET_break_op (0);
718 }
719
720
721 /**
722  * Implementation function that cancels the currently sent message.
723  * Should basically undo whatever #mq_send_impl() did.
724  *
725  * @param mq message queue
726  * @param impl_state state specific to the implementation
727  */
728 static void
729 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
730                      void *impl_state)
731 {
732   struct GNUNET_CADET_Channel *ch = impl_state;
733
734   LOG (GNUNET_ERROR_TYPE_WARNING,
735        "Cannot cancel mq message on channel %X of %p\n",
736        ch->ccn.channel_of_client, ch->cadet);
737
738   GNUNET_break (0);
739 }
740
741
742 /******************************************************************************/
743 /***********************      RECEIVE HANDLERS     ****************************/
744 /******************************************************************************/
745
746
747 /**
748  * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
749  * request the data to send over MQ. Since MQ manages the queue, this function
750  * is scheduled immediatly after a transmit ready notification.
751  *
752  * @param cls Closure (transmit handle).
753  */
754 static void
755 request_data (void *cls)
756 {
757   struct GNUNET_CADET_TransmitHandle *th = cls;
758   struct GNUNET_CADET_LocalData *msg;
759   struct GNUNET_MQ_Envelope *env;
760   size_t osize;
761
762   LOG (GNUNET_ERROR_TYPE_DEBUG,
763        "Requesting Data: %u bytes (allow send is %u)\n",
764        th->size,
765        th->channel->allow_send);
766
767   GNUNET_assert (0 < th->channel->allow_send);
768   th->channel->allow_send--;
769   /* NOTE: we may be allowed to send another packet immediately,
770      albeit the current logic waits for the ACK. */
771   th->request_data_task = NULL;
772   remove_from_queue (th);
773
774   env = GNUNET_MQ_msg_extra (msg,
775                              th->size,
776                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
777   msg->ccn = th->channel->ccn;
778   osize = th->notify (th->notify_cls,
779                       th->size,
780                       &msg[1]);
781   GNUNET_assert (osize == th->size);
782
783   GNUNET_MQ_send (th->channel->cadet->mq,
784                   env);
785   GNUNET_free (th);
786 }
787
788
789 /**
790  * Process the new channel notification and add it to the channels in the handle
791  *
792  * @param h     The cadet handle
793  * @param msg   A message with the details of the new incoming channel
794  */
795 static void
796 handle_channel_created (void *cls,
797                         const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
798 {
799   struct GNUNET_CADET_Handle *h = cls;
800   struct GNUNET_CADET_Channel *ch;
801   struct GNUNET_CADET_Port *port;
802   const struct GNUNET_HashCode *port_number;
803   struct GNUNET_CADET_ClientChannelNumber ccn;
804
805   ccn = msg->ccn;
806   port_number = &msg->port;
807   if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
808   {
809     GNUNET_break (0);
810     return;
811   }
812   port = find_port (h, port_number);
813   if (NULL == port)
814   {
815     /* We could have closed the port but the service didn't know about it yet
816      * This is not an error.
817      */
818     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
819     struct GNUNET_MQ_Envelope *env;
820
821     GNUNET_break (0);
822     LOG (GNUNET_ERROR_TYPE_DEBUG,
823          "No handler for incoming channel %X (on port %s, recently closed?)\n",
824          ntohl (ccn.channel_of_client),
825          GNUNET_h2s (port_number));
826     env = GNUNET_MQ_msg (d_msg,
827                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
828     d_msg->ccn = msg->ccn;
829     GNUNET_MQ_send (h->mq,
830                     env);
831     return;
832   }
833
834   ch = create_channel (h,
835                        ccn);
836   ch->peer = GNUNET_PEER_intern (&msg->peer);
837   ch->cadet = h;
838   ch->ccn = ccn;
839   ch->incoming_port = port;
840   ch->options = ntohl (msg->opt);
841   LOG (GNUNET_ERROR_TYPE_DEBUG,
842        "Creating incoming channel %X [%s] %p\n",
843        ntohl (ccn.channel_of_client),
844        GNUNET_h2s (port_number),
845        ch);
846
847   if (NULL != port->handler)
848   {
849     /** @deprecated */
850     /* Old style API */
851     ch->ctx = port->handler (port->cls,
852                             ch,
853                             &msg->peer,
854                             port->hash,
855                             ch->options);
856   } else {
857     /* MQ API */
858     GNUNET_assert (NULL != port->connects);
859     ch->window_changes = port->window_changes;
860     ch->disconnects = port->disconnects;
861     ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
862                                             &cadet_mq_destroy_impl,
863                                             &cadet_mq_cancel_impl,
864                                             ch,
865                                             port->handlers,
866                                             &cadet_mq_error_handler,
867                                             ch);
868     ch->ctx = port->connects (port->cadet->cls,
869                               ch,
870                               &msg->peer);
871     GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
872   }
873 }
874
875
876 /**
877  * Process the channel destroy notification and free associated resources
878  *
879  * @param h     The cadet handle
880  * @param msg   A message with the details of the channel being destroyed
881  */
882 static void
883 handle_channel_destroy (void *cls,
884                         const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
885 {
886   struct GNUNET_CADET_Handle *h = cls;
887   struct GNUNET_CADET_Channel *ch;
888   struct GNUNET_CADET_ClientChannelNumber ccn;
889
890   ccn = msg->ccn;
891   LOG (GNUNET_ERROR_TYPE_DEBUG,
892        "Channel %X Destroy from service\n",
893        ntohl (ccn.channel_of_client));
894   ch = retrieve_channel (h,
895                          ccn);
896
897   if (NULL == ch)
898   {
899     LOG (GNUNET_ERROR_TYPE_DEBUG,
900          "channel %X unknown\n",
901          ntohl (ccn.channel_of_client));
902     return;
903   }
904   destroy_channel (ch);
905 }
906
907
908 /**
909  * Check that message received from CADET service is well-formed.
910  *
911  * @param cls the `struct GNUNET_CADET_Handle`
912  * @param message the message we got
913  * @return #GNUNET_OK if the message is well-formed,
914  *         #GNUNET_SYSERR otherwise
915  */
916 static int
917 check_local_data (void *cls,
918                   const struct GNUNET_CADET_LocalData *message)
919 {
920   struct GNUNET_CADET_Handle *h = cls;
921   struct GNUNET_CADET_Channel *ch;
922   uint16_t size;
923
924   size = ntohs (message->header.size);
925   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
926   {
927     GNUNET_break_op (0);
928     return GNUNET_SYSERR;
929   }
930
931   ch = retrieve_channel (h,
932                          message->ccn);
933   if (NULL == ch)
934   {
935     GNUNET_break_op (0);
936     return GNUNET_SYSERR;
937   }
938
939   return GNUNET_OK;
940 }
941
942
943 /**
944  * Process the incoming data packets, call appropriate handlers.
945  *
946  * @param h       The cadet handle
947  * @param message A message encapsulating the data
948  */
949 static void
950 handle_local_data (void *cls,
951                    const struct GNUNET_CADET_LocalData *message)
952 {
953   struct GNUNET_CADET_Handle *h = cls;
954   const struct GNUNET_MessageHeader *payload;
955   const struct GNUNET_CADET_MessageHandler *handler;
956   struct GNUNET_CADET_Channel *ch;
957   uint16_t type;
958
959   ch = retrieve_channel (h,
960                          message->ccn);
961   GNUNET_assert (NULL != ch);
962
963   payload = (struct GNUNET_MessageHeader *) &message[1];
964   type = ntohs (payload->type);
965   LOG (GNUNET_ERROR_TYPE_DEBUG,
966        "Got a %s data on channel %s [%X] of type %s (%u)\n",
967        GC_f2s (ntohl (ch->ccn.channel_of_client) >=
968                GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
969        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
970        ntohl (message->ccn.channel_of_client),
971        GC_m2s (type),
972        type);
973   for (unsigned i=0;i<h->n_handlers;i++)
974   {
975     handler = &h->message_handlers[i];
976     if (handler->type == type)
977     {
978       if (GNUNET_OK !=
979           handler->callback (h->cls,
980                              ch,
981                              &ch->ctx,
982                              payload))
983       {
984         LOG (GNUNET_ERROR_TYPE_DEBUG,
985              "callback caused disconnection\n");
986         GNUNET_CADET_channel_destroy (ch);
987         return;
988       }
989       return;
990     }
991   }
992   /* Other peer sent message we do not comprehend. */
993   GNUNET_break_op (0);
994   GNUNET_CADET_receive_done (ch);
995 }
996
997
998 /**
999  * Process a local ACK message, enabling the client to send
1000  * more data to the service.
1001  *
1002  * @param h Cadet handle.
1003  * @param message Message itself.
1004  */
1005 static void
1006 handle_local_ack (void *cls,
1007                   const struct GNUNET_CADET_LocalAck *message)
1008 {
1009   struct GNUNET_CADET_Handle *h = cls;
1010   struct GNUNET_CADET_Channel *ch;
1011   struct GNUNET_CADET_ClientChannelNumber ccn;
1012   struct GNUNET_CADET_TransmitHandle *th;
1013
1014   ccn = message->ccn;
1015   ch = retrieve_channel (h, ccn);
1016   if (NULL == ch)
1017   {
1018     LOG (GNUNET_ERROR_TYPE_DEBUG,
1019          "ACK on unknown channel %X\n",
1020          ntohl (ccn.channel_of_client));
1021     return;
1022   }
1023   ch->allow_send++;
1024   LOG (GNUNET_ERROR_TYPE_DEBUG,
1025        "Got an ACK on channel %X, allow send now %u!\n",
1026        ntohl (ch->ccn.channel_of_client),
1027        ch->allow_send);
1028   for (th = h->th_head; NULL != th; th = th->next)
1029   {
1030     if ( (th->channel == ch) &&
1031          (NULL == th->request_data_task) )
1032     {
1033       th->request_data_task
1034         = GNUNET_SCHEDULER_add_now (&request_data,
1035                                     th);
1036       break;
1037     }
1038   }
1039 }
1040
1041
1042 /**
1043  * Generic error handler, called with the appropriate error code and
1044  * the same closure specified at the creation of the message queue.
1045  * Not every message queue implementation supports an error handler.
1046  *
1047  * @param cls closure, a `struct GNUNET_CORE_Handle *`
1048  * @param error error code
1049  */
1050 static void
1051 handle_mq_error (void *cls,
1052                  enum GNUNET_MQ_Error error)
1053 {
1054   struct GNUNET_CADET_Handle *h = cls;
1055
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
1057   GNUNET_MQ_destroy (h->mq);
1058   h->mq = NULL;
1059   reconnect (h);
1060 }
1061
1062
1063 /*
1064  * Process a local reply about info on all channels, pass info to the user.
1065  *
1066  * @param h Cadet handle.
1067  * @param message Message itself.
1068  */
1069 // static void
1070 // process_get_channels (struct GNUNET_CADET_Handle *h,
1071 //                      const struct GNUNET_MessageHeader *message)
1072 // {
1073 //   struct GNUNET_CADET_LocalInfo *msg;
1074 //
1075 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
1076 //
1077 //   if (NULL == h->channels_cb)
1078 //   {
1079 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1080 //     return;
1081 //   }
1082 //
1083 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1084 //   if (ntohs (message->size) !=
1085 //       (sizeof (struct GNUNET_CADET_LocalInfo) +
1086 //        sizeof (struct GNUNET_PeerIdentity)))
1087 //   {
1088 //     GNUNET_break_op (0);
1089 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1090 //                 "Get channels message: size %hu - expected %u\n",
1091 //                 ntohs (message->size),
1092 //                 sizeof (struct GNUNET_CADET_LocalInfo));
1093 //     return;
1094 //   }
1095 //   h->channels_cb (h->channels_cls,
1096 //                   ntohl (msg->channel_id),
1097 //                   &msg->owner,
1098 //                   &msg->destination);
1099 // }
1100
1101
1102
1103 /*
1104  * Process a local monitor_channel reply, pass info to the user.
1105  *
1106  * @param h Cadet handle.
1107  * @param message Message itself.
1108  */
1109 // static void
1110 // process_show_channel (struct GNUNET_CADET_Handle *h,
1111 //                      const struct GNUNET_MessageHeader *message)
1112 // {
1113 //   struct GNUNET_CADET_LocalInfo *msg;
1114 //   size_t esize;
1115 //
1116 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1117 //
1118 //   if (NULL == h->channel_cb)
1119 //   {
1120 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1121 //     return;
1122 //   }
1123 //
1124 //   /* Verify message sanity */
1125 //   msg = (struct GNUNET_CADET_LocalInfo *) message;
1126 //   esize = sizeof (struct GNUNET_CADET_LocalInfo);
1127 //   if (ntohs (message->size) != esize)
1128 //   {
1129 //     GNUNET_break_op (0);
1130 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1131 //                 "Show channel message: size %hu - expected %u\n",
1132 //                 ntohs (message->size),
1133 //                 esize);
1134 //
1135 //     h->channel_cb (h->channel_cls, NULL, NULL);
1136 //     h->channel_cb = NULL;
1137 //     h->channel_cls = NULL;
1138 //
1139 //     return;
1140 //   }
1141 //
1142 //   h->channel_cb (h->channel_cls,
1143 //                  &msg->destination,
1144 //                  &msg->owner);
1145 // }
1146
1147
1148
1149 /**
1150  * Check that message received from CADET service is well-formed.
1151  *
1152  * @param cls the `struct GNUNET_CADET_Handle`
1153  * @param message the message we got
1154  * @return #GNUNET_OK if the message is well-formed,
1155  *         #GNUNET_SYSERR otherwise
1156  */
1157 static int
1158 check_get_peers (void *cls,
1159                  const struct GNUNET_CADET_LocalInfoPeer *message)
1160 {
1161   struct GNUNET_CADET_Handle *h = cls;
1162   uint16_t size;
1163
1164   if (NULL == h->info_cb.peers_cb)
1165   {
1166     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                 "  no handler for peesr monitor message!\n");
1168     return GNUNET_SYSERR;
1169   }
1170
1171   size = ntohs (message->header.size);
1172   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
1173   {
1174     h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
1175     h->info_cb.peers_cb = NULL;
1176     h->info_cls = NULL;
1177     return GNUNET_SYSERR;
1178   }
1179
1180   return GNUNET_OK;
1181 }
1182
1183
1184 /**
1185  * Process a local reply about info on all tunnels, pass info to the user.
1186  *
1187  * @param cls Closure (Cadet handle).
1188  * @param msg Message itself.
1189  */
1190 static void
1191 handle_get_peers (void *cls,
1192                   const struct GNUNET_CADET_LocalInfoPeer *msg)
1193 {
1194   struct GNUNET_CADET_Handle *h = cls;
1195   h->info_cb.peers_cb (h->info_cls, &msg->destination,
1196                        (int) ntohs (msg->tunnel),
1197                        (unsigned int ) ntohs (msg->paths),
1198                        0);
1199 }
1200
1201
1202 /**
1203  * Check that message received from CADET service is well-formed.
1204  *
1205  * @param cls the `struct GNUNET_CADET_Handle`
1206  * @param message the message we got
1207  * @return #GNUNET_OK if the message is well-formed,
1208  *         #GNUNET_SYSERR otherwise
1209  */
1210 static int
1211 check_get_peer (void *cls,
1212                 const struct GNUNET_CADET_LocalInfoPeer *message)
1213 {
1214   struct GNUNET_CADET_Handle *h = cls;
1215   const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
1216   struct GNUNET_PeerIdentity *paths_array;
1217   size_t esize;
1218   unsigned int epaths;
1219   unsigned int paths;
1220   unsigned int peers;
1221
1222   if (NULL == h->info_cb.peer_cb)
1223   {
1224     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1225                 "  no handler for peer monitor message!\n");
1226     goto clean_cls;
1227   }
1228
1229   /* Verify message sanity */
1230   esize = ntohs (message->header.size);
1231   if (esize < msize)
1232   {
1233     GNUNET_break_op (0);
1234     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1235     goto clean_cls;
1236   }
1237   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
1238   {
1239     GNUNET_break_op (0);
1240     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1241     goto clean_cls;
1242
1243   }
1244   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
1245   epaths = (unsigned int) ntohs (message->paths);
1246   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1247   paths = 0;
1248   for (int i = 0; i < peers; i++)
1249   {
1250     if (0 == memcmp (&paths_array[i], &message->destination,
1251                      sizeof (struct GNUNET_PeerIdentity)))
1252     {
1253       paths++;
1254     }
1255   }
1256   if (paths != epaths)
1257   {
1258     GNUNET_break_op (0);
1259     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
1260     h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
1261     goto clean_cls;
1262   }
1263
1264   return GNUNET_OK;
1265
1266 clean_cls:
1267   h->info_cb.peer_cb = NULL;
1268   h->info_cls = NULL;
1269   return GNUNET_SYSERR;
1270 }
1271
1272
1273 /**
1274  * Process a local peer info reply, pass info to the user.
1275  *
1276  * @param cls Closure (Cadet handle).
1277  * @param message Message itself.
1278  */
1279 static void
1280 handle_get_peer (void *cls,
1281                  const struct GNUNET_CADET_LocalInfoPeer *message)
1282 {
1283   struct GNUNET_CADET_Handle *h = cls;
1284   struct GNUNET_PeerIdentity *paths_array;
1285   unsigned int paths;
1286   unsigned int path_length;
1287   int neighbor;
1288   unsigned int peers;
1289
1290   paths = (unsigned int) ntohs (message->paths);
1291   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1292   peers = (ntohs (message->header.size) - sizeof (*message))
1293           / sizeof (struct GNUNET_PeerIdentity);
1294   path_length = 0;
1295   neighbor = GNUNET_NO;
1296
1297   for (int i = 0; i < peers; i++)
1298   {
1299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
1300     path_length++;
1301     if (0 == memcmp (&paths_array[i], &message->destination,
1302                      sizeof (struct GNUNET_PeerIdentity)))
1303     {
1304       if (1 == path_length)
1305         neighbor = GNUNET_YES;
1306       path_length = 0;
1307     }
1308   }
1309
1310   /* Call Callback with tunnel info. */
1311   paths_array = (struct GNUNET_PeerIdentity *) &message[1];
1312   h->info_cb.peer_cb (h->info_cls,
1313                       &message->destination,
1314                       (int) ntohs (message->tunnel),
1315                       neighbor,
1316                       paths,
1317                       paths_array);
1318 }
1319
1320
1321 /**
1322  * Check that message received from CADET service is well-formed.
1323  *
1324  * @param cls the `struct GNUNET_CADET_Handle`
1325  * @param msg the message we got
1326  * @return #GNUNET_OK if the message is well-formed,
1327  *         #GNUNET_SYSERR otherwise
1328  */
1329 static int
1330 check_get_tunnels (void *cls,
1331                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1332 {
1333   struct GNUNET_CADET_Handle *h = cls;
1334   uint16_t size;
1335
1336   if (NULL == h->info_cb.tunnels_cb)
1337   {
1338     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1339                 "  no handler for tunnels monitor message!\n");
1340     return GNUNET_SYSERR;
1341   }
1342
1343   size = ntohs (msg->header.size);
1344   if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
1345   {
1346     h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
1347     h->info_cb.tunnels_cb = NULL;
1348     h->info_cls = NULL;
1349     return GNUNET_SYSERR;
1350   }
1351   return GNUNET_OK;
1352 }
1353
1354
1355 /**
1356  * Process a local reply about info on all tunnels, pass info to the user.
1357  *
1358  * @param cls Closure (Cadet handle).
1359  * @param message Message itself.
1360  */
1361 static void
1362 handle_get_tunnels (void *cls,
1363                     const struct GNUNET_CADET_LocalInfoTunnel *msg)
1364 {
1365   struct GNUNET_CADET_Handle *h = cls;
1366
1367   h->info_cb.tunnels_cb (h->info_cls,
1368                          &msg->destination,
1369                          ntohl (msg->channels),
1370                          ntohl (msg->connections),
1371                          ntohs (msg->estate),
1372                          ntohs (msg->cstate));
1373
1374 }
1375
1376
1377 /**
1378  * Check that message received from CADET service is well-formed.
1379  *
1380  * @param cls the `struct GNUNET_CADET_Handle`
1381  * @param msg the message we got
1382  * @return #GNUNET_OK if the message is well-formed,
1383  *         #GNUNET_SYSERR otherwise
1384  */
1385 static int
1386 check_get_tunnel (void *cls,
1387                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
1388 {
1389   struct GNUNET_CADET_Handle *h = cls;
1390   unsigned int ch_n;
1391   unsigned int c_n;
1392   size_t esize;
1393   size_t msize;
1394
1395   if (NULL == h->info_cb.tunnel_cb)
1396   {
1397     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398                 "  no handler for tunnel monitor message!\n");
1399     goto clean_cls;
1400   }
1401
1402   /* Verify message sanity */
1403   msize = ntohs (msg->header.size);
1404   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
1405   if (esize > msize)
1406   {
1407     GNUNET_break_op (0);
1408     h->info_cb.tunnel_cb (h->info_cls,
1409                           NULL, 0, 0, NULL, NULL, 0, 0);
1410     goto clean_cls;
1411   }
1412   ch_n = ntohl (msg->channels);
1413   c_n = ntohl (msg->connections);
1414   esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber);
1415   esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier);
1416   if (msize != esize)
1417   {
1418     GNUNET_break_op (0);
1419     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1420                 "m:%u, e: %u (%u ch, %u conn)\n",
1421                 (unsigned int) msize,
1422                 (unsigned int) esize,
1423                 ch_n,
1424                 c_n);
1425     h->info_cb.tunnel_cb (h->info_cls,
1426                           NULL, 0, 0, NULL, NULL, 0, 0);
1427     goto clean_cls;
1428   }
1429
1430   return GNUNET_OK;
1431
1432 clean_cls:
1433   h->info_cb.tunnel_cb = NULL;
1434   h->info_cls = NULL;
1435   return GNUNET_SYSERR;
1436 }
1437
1438
1439 /**
1440  * Process a local tunnel info reply, pass info to the user.
1441  *
1442  * @param cls Closure (Cadet handle).
1443  * @param msg Message itself.
1444  */
1445 static void
1446 handle_get_tunnel (void *cls,
1447                    const struct GNUNET_CADET_LocalInfoTunnel *msg)
1448 {
1449   struct GNUNET_CADET_Handle *h = cls;
1450   unsigned int ch_n;
1451   unsigned int c_n;
1452   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
1453   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
1454
1455   ch_n = ntohl (msg->channels);
1456   c_n = ntohl (msg->connections);
1457
1458   /* Call Callback with tunnel info. */
1459   conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
1460   chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n];
1461   h->info_cb.tunnel_cb (h->info_cls,
1462                         &msg->destination,
1463                         ch_n,
1464                         c_n,
1465                         chns,
1466                         conns,
1467                         ntohs (msg->estate),
1468                         ntohs (msg->cstate));
1469 }
1470
1471
1472 /**
1473  * Reconnect to the service, retransmit all infomation to try to restore the
1474  * original state.
1475  *
1476  * @param h handle to the cadet
1477  */
1478 static void
1479 reconnect (struct GNUNET_CADET_Handle *h)
1480 {
1481   struct GNUNET_MQ_MessageHandler handlers[] = {
1482     GNUNET_MQ_hd_fixed_size (channel_created,
1483                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
1484                              struct GNUNET_CADET_LocalChannelCreateMessage,
1485                              h),
1486     GNUNET_MQ_hd_fixed_size (channel_destroy,
1487                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
1488                              struct GNUNET_CADET_LocalChannelDestroyMessage,
1489                              h),
1490     GNUNET_MQ_hd_var_size (local_data,
1491                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
1492                            struct GNUNET_CADET_LocalData,
1493                            h),
1494     GNUNET_MQ_hd_fixed_size (local_ack,
1495                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
1496                              struct GNUNET_CADET_LocalAck,
1497                              h),
1498     GNUNET_MQ_hd_var_size (get_peers,
1499                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
1500                            struct GNUNET_CADET_LocalInfoPeer,
1501                            h),
1502     GNUNET_MQ_hd_var_size (get_peer,
1503                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
1504                            struct GNUNET_CADET_LocalInfoPeer,
1505                            h),
1506     GNUNET_MQ_hd_var_size (get_tunnels,
1507                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
1508                            struct GNUNET_CADET_LocalInfoTunnel,
1509                            h),
1510     GNUNET_MQ_hd_var_size (get_tunnel,
1511                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
1512                            struct GNUNET_CADET_LocalInfoTunnel,
1513                            h),
1514 // FIXME
1515 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
1516 //                        GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
1517 //                        struct GNUNET_CADET_ChannelDestroyMessage);
1518     GNUNET_MQ_handler_end ()
1519   };
1520   struct GNUNET_CADET_Channel *ch;
1521
1522   while (NULL != (ch = h->channels_head))
1523   {
1524     LOG (GNUNET_ERROR_TYPE_DEBUG,
1525          "Destroying channel due to a reconnect\n");
1526     destroy_channel (ch);
1527   }
1528
1529   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
1530
1531   if (NULL != h->mq)
1532   {
1533     GNUNET_MQ_destroy (h->mq);
1534     h->mq = NULL;
1535   }
1536   h->mq = GNUNET_CLIENT_connect (h->cfg,
1537                                  "cadet",
1538                                  handlers,
1539                                  &handle_mq_error,
1540                                  h);
1541   if (NULL == h->mq)
1542   {
1543     schedule_reconnect (h);
1544     return;
1545   }
1546   else
1547   {
1548     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1549   }
1550 }
1551
1552 /**
1553  * Reconnect callback: tries to reconnect again after a failer previous
1554  * reconnecttion
1555  *
1556  * @param cls closure (cadet handle)
1557  */
1558 static void
1559 reconnect_cbk (void *cls)
1560 {
1561   struct GNUNET_CADET_Handle *h = cls;
1562
1563   h->reconnect_task = NULL;
1564   reconnect (h);
1565 }
1566
1567
1568 /**
1569  * Reconnect to the service, retransmit all infomation to try to restore the
1570  * original state.
1571  *
1572  * @param h handle to the cadet
1573  *
1574  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
1575  */
1576 static void
1577 schedule_reconnect (struct GNUNET_CADET_Handle *h)
1578 {
1579   if (NULL == h->reconnect_task)
1580   {
1581     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
1582                                                       &reconnect_cbk, h);
1583     h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
1584   }
1585 }
1586
1587
1588 /******************************************************************************/
1589 /**********************      API CALL DEFINITIONS     *************************/
1590 /******************************************************************************/
1591
1592 struct GNUNET_CADET_Handle *
1593 GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1594                       void *cls,
1595                       GNUNET_CADET_ChannelEndHandler cleaner,
1596                       const struct GNUNET_CADET_MessageHandler *handlers)
1597 {
1598   struct GNUNET_CADET_Handle *h;
1599
1600   h = GNUNET_new (struct GNUNET_CADET_Handle);
1601   LOG (GNUNET_ERROR_TYPE_DEBUG,
1602        "GNUNET_CADET_connect() %p\n",
1603        h);
1604   h->cfg = cfg;
1605   h->cleaner = cleaner;
1606   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1607   reconnect (h);
1608   if (h->mq == NULL)
1609   {
1610     GNUNET_break (0);
1611     GNUNET_CADET_disconnect (h);
1612     return NULL;
1613   }
1614   h->cls = cls;
1615   h->message_handlers = handlers;
1616   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1617   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1618   h->reconnect_task = NULL;
1619
1620   /* count handlers */
1621   for (h->n_handlers = 0;
1622        handlers && handlers[h->n_handlers].type;
1623        h->n_handlers++) ;
1624   return h;
1625 }
1626
1627
1628 /**
1629  * Disconnect from the cadet service. All channels will be destroyed. All channel
1630  * disconnect callbacks will be called on any still connected peers, notifying
1631  * about their disconnection. The registered inbound channel cleaner will be
1632  * called should any inbound channels still exist.
1633  *
1634  * @param handle connection to cadet to disconnect
1635  */
1636 void
1637 GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1638 {
1639   struct GNUNET_CADET_Channel *ch;
1640   struct GNUNET_CADET_Channel *aux;
1641   struct GNUNET_CADET_TransmitHandle *th;
1642
1643   LOG (GNUNET_ERROR_TYPE_DEBUG,
1644        "CADET DISCONNECT\n");
1645   ch = handle->channels_head;
1646   while (NULL != ch)
1647   {
1648     aux = ch->next;
1649     if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1650     {
1651       GNUNET_break (0);
1652       LOG (GNUNET_ERROR_TYPE_DEBUG,
1653            "channel %X not destroyed\n",
1654            ntohl (ch->ccn.channel_of_client));
1655     }
1656     destroy_channel (ch);
1657     ch = aux;
1658   }
1659   while (NULL != (th = handle->th_head))
1660   {
1661     struct GNUNET_MessageHeader *msg;
1662
1663     /* Make sure it is an allowed packet (everything else should have been
1664      * already canceled).
1665      */
1666     GNUNET_break (GNUNET_NO == th_is_payload (th));
1667     msg = (struct GNUNET_MessageHeader *) &th[1];
1668     switch (ntohs(msg->type))
1669     {
1670       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1671       case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1672       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN:
1673       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE:
1674       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1675       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1676       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1677       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1678       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1679       case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1680         break;
1681       default:
1682         GNUNET_break (0);
1683         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n",
1684              GC_m2s (ntohs(msg->type)));
1685     }
1686
1687     GNUNET_CADET_notify_transmit_ready_cancel (th);
1688   }
1689
1690   if (NULL != handle->mq)
1691   {
1692     GNUNET_MQ_destroy (handle->mq);
1693     handle->mq = NULL;
1694   }
1695   if (NULL != handle->reconnect_task)
1696   {
1697     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1698     handle->reconnect_task = NULL;
1699   }
1700
1701   GNUNET_CONTAINER_multihashmap_destroy (handle->ports);
1702   handle->ports = NULL;
1703   GNUNET_free (handle);
1704 }
1705
1706
1707 /**
1708  * Open a port to receive incomming channels.
1709  *
1710  * @param h CADET handle.
1711  * @param port Hash representing the port number.
1712  * @param new_channel Function called when an channel is received.
1713  * @param new_channel_cls Closure for @a new_channel.
1714  * @return Port handle.
1715  */
1716 struct GNUNET_CADET_Port *
1717 GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1718                         const struct GNUNET_HashCode *port,
1719                         GNUNET_CADET_InboundChannelNotificationHandler
1720                             new_channel,
1721                         void *new_channel_cls)
1722 {
1723   struct GNUNET_CADET_PortMessage *msg;
1724   struct GNUNET_MQ_Envelope *env;
1725   struct GNUNET_CADET_Port *p;
1726
1727   GNUNET_assert (NULL != new_channel);
1728   p = GNUNET_new (struct GNUNET_CADET_Port);
1729   p->cadet = h;
1730   p->hash = GNUNET_new (struct GNUNET_HashCode);
1731   *p->hash = *port;
1732   p->handler = new_channel;
1733   p->cls = new_channel_cls;
1734   GNUNET_assert (GNUNET_OK ==
1735                  GNUNET_CONTAINER_multihashmap_put (h->ports,
1736                                                     p->hash,
1737                                                     p,
1738                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1739
1740   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1741   msg->port = *p->hash;
1742   GNUNET_MQ_send (h->mq, env);
1743
1744   return p;
1745 }
1746
1747 /**
1748  * Close a port opened with @a GNUNET_CADET_open_port.
1749  * The @a new_channel callback will no longer be called.
1750  *
1751  * @param p Port handle.
1752  */
1753 void
1754 GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1755 {
1756   struct GNUNET_CADET_PortMessage *msg;
1757   struct GNUNET_MQ_Envelope *env;
1758   struct GNUNET_HashCode *id;
1759
1760   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1761
1762   id = NULL != p->hash ? p->hash : &p->id;
1763   msg->port = *id;
1764   GNUNET_MQ_send (p->cadet->mq, env);
1765   GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
1766   GNUNET_free_non_null (p->hash);
1767   GNUNET_free (p);
1768 }
1769
1770
1771 /**
1772  * Create a new channel towards a remote peer.
1773  *
1774  * If the destination port is not open by any peer or the destination peer
1775  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
1776  * for this channel.
1777  *
1778  * @param h cadet handle
1779  * @param channel_ctx client's channel context to associate with the channel
1780  * @param peer peer identity the channel should go to
1781  * @param port Port hash (port number).
1782  * @param options CadetOption flag field, with all desired option bits set to 1.
1783  * @return handle to the channel
1784  */
1785 struct GNUNET_CADET_Channel *
1786 GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1787                             void *channel_ctx,
1788                             const struct GNUNET_PeerIdentity *peer,
1789                             const struct GNUNET_HashCode *port,
1790                             enum GNUNET_CADET_ChannelOption options)
1791 {
1792   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
1793   struct GNUNET_MQ_Envelope *env;
1794   struct GNUNET_CADET_Channel *ch;
1795   struct GNUNET_CADET_ClientChannelNumber ccn;
1796
1797   ccn.channel_of_client = htonl (0);
1798   ch = create_channel (h, ccn);
1799   ch->ctx = channel_ctx;
1800   ch->peer = GNUNET_PEER_intern (peer);
1801
1802   LOG (GNUNET_ERROR_TYPE_DEBUG,
1803        "Creating new channel to %s:%u at %p number %X\n",
1804        GNUNET_i2s (peer),
1805        port,
1806        ch,
1807        ntohl (ch->ccn.channel_of_client));
1808   env = GNUNET_MQ_msg (msg,
1809                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1810   msg->ccn = ch->ccn;
1811   msg->port = *port;
1812   msg->peer = *peer;
1813   msg->opt = htonl (options);
1814   GNUNET_MQ_send (h->mq,
1815                   env);
1816   return ch;
1817 }
1818
1819
1820 void
1821 GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1822 {
1823   struct GNUNET_CADET_Handle *h;
1824   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
1825   struct GNUNET_MQ_Envelope *env;
1826   struct GNUNET_CADET_TransmitHandle *th;
1827   struct GNUNET_CADET_TransmitHandle *next;
1828
1829   LOG (GNUNET_ERROR_TYPE_DEBUG,
1830        "Destroying channel\n");
1831   h = channel->cadet;
1832   for  (th = h->th_head; th != NULL; th = next)
1833   {
1834     next = th->next;
1835     if (th->channel == channel)
1836     {
1837       GNUNET_break (0);
1838       if (GNUNET_YES == th_is_payload (th))
1839       {
1840         /* applications should cancel before destroying channel */
1841         LOG (GNUNET_ERROR_TYPE_WARNING,
1842              "Channel destroyed without cancelling transmission requests\n");
1843         th->notify (th->notify_cls, 0, NULL);
1844       }
1845       else
1846       {
1847         LOG (GNUNET_ERROR_TYPE_WARNING,
1848              "no meta-traffic should be queued\n");
1849       }
1850       GNUNET_CONTAINER_DLL_remove (h->th_head,
1851                                    h->th_tail,
1852                                    th);
1853       GNUNET_CADET_notify_transmit_ready_cancel (th);
1854     }
1855   }
1856
1857   env = GNUNET_MQ_msg (msg,
1858                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1859   msg->ccn = channel->ccn;
1860   GNUNET_MQ_send (h->mq,
1861                   env);
1862
1863   destroy_channel (channel);
1864 }
1865
1866
1867 /**
1868  * Get information about a channel.
1869  *
1870  * @param channel Channel handle.
1871  * @param option Query (GNUNET_CADET_OPTION_*).
1872  * @param ... dependant on option, currently not used
1873  *
1874  * @return Union with an answer to the query.
1875  */
1876 const union GNUNET_CADET_ChannelInfo *
1877 GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1878                               enum GNUNET_CADET_ChannelOption option, ...)
1879 {
1880   static int bool_flag;
1881   const union GNUNET_CADET_ChannelInfo *ret;
1882
1883   switch (option)
1884   {
1885     case GNUNET_CADET_OPTION_NOBUFFER:
1886     case GNUNET_CADET_OPTION_RELIABLE:
1887     case GNUNET_CADET_OPTION_OUT_OF_ORDER:
1888       if (0 != (option & channel->options))
1889         bool_flag = GNUNET_YES;
1890       else
1891         bool_flag = GNUNET_NO;
1892       ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1893       break;
1894     case GNUNET_CADET_OPTION_PEER:
1895       ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1896       break;
1897     default:
1898       GNUNET_break (0);
1899       return NULL;
1900   }
1901
1902   return ret;
1903 }
1904
1905
1906 struct GNUNET_CADET_TransmitHandle *
1907 GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
1908                                     int cork,
1909                                     struct GNUNET_TIME_Relative maxdelay,
1910                                     size_t notify_size,
1911                                     GNUNET_CONNECTION_TransmitReadyNotify notify,
1912                                     void *notify_cls)
1913 {
1914   struct GNUNET_CADET_TransmitHandle *th;
1915
1916   GNUNET_assert (NULL != channel);
1917   GNUNET_assert (NULL != notify);
1918   GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
1919   LOG (GNUNET_ERROR_TYPE_DEBUG,
1920        "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
1921        ntohl (channel->ccn.channel_of_client),
1922        channel->allow_send,
1923        (ntohl (channel->ccn.channel_of_client) >=
1924         GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
1925        ? "origin"
1926        : "destination",
1927        (unsigned int) notify_size);
1928   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
1929   {
1930     LOG (GNUNET_ERROR_TYPE_WARNING,
1931          "CADET transmit ready timeout is deprected (has no effect)\n");
1932   }
1933
1934   th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
1935   th->channel = channel;
1936   th->size = notify_size;
1937   th->notify = notify;
1938   th->notify_cls = notify_cls;
1939   if (0 != channel->allow_send)
1940     th->request_data_task
1941       = GNUNET_SCHEDULER_add_now (&request_data,
1942                                   th);
1943   else
1944     add_to_queue (channel->cadet,
1945                   th);
1946   return th;
1947 }
1948
1949
1950 void
1951 GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
1952 {
1953   if (NULL != th->request_data_task)
1954   {
1955     GNUNET_SCHEDULER_cancel (th->request_data_task);
1956     th->request_data_task = NULL;
1957   }
1958   remove_from_queue (th);
1959   GNUNET_free (th);
1960 }
1961
1962
1963 /**
1964  * Send an ack on the channel to confirm the processing of a message.
1965  *
1966  * @param ch Channel on which to send the ACK.
1967  */
1968 void
1969 GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
1970 {
1971   struct GNUNET_CADET_LocalAck *msg;
1972   struct GNUNET_MQ_Envelope *env;
1973
1974   env = GNUNET_MQ_msg (msg,
1975                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1976   LOG (GNUNET_ERROR_TYPE_DEBUG,
1977        "Sending ACK on channel %X\n",
1978        ntohl (channel->ccn.channel_of_client));
1979   msg->ccn = channel->ccn;
1980   GNUNET_MQ_send (channel->cadet->mq,
1981                   env);
1982 }
1983
1984
1985 static void
1986 send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
1987 {
1988   struct GNUNET_MessageHeader *msg;
1989   struct GNUNET_MQ_Envelope *env;
1990
1991   LOG (GNUNET_ERROR_TYPE_DEBUG,
1992        " Sending %s monitor message to service\n",
1993        GC_m2s(type));
1994
1995   env = GNUNET_MQ_msg (msg, type);
1996   GNUNET_MQ_send (h->mq, env);
1997 }
1998
1999
2000 /**
2001  * Request a debug dump on the service's STDERR.
2002  *
2003  * WARNING: unstable API, likely to change in the future!
2004  *
2005  * @param h cadet handle
2006  */
2007 void
2008 GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h)
2009 {
2010   LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n");
2011   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP);
2012 }
2013
2014
2015 /**
2016  * Request information about peers known to the running cadet service.
2017  * The callback will be called for every peer known to the service.
2018  * Only one info request (of any kind) can be active at once.
2019  *
2020  *
2021  * WARNING: unstable API, likely to change in the future!
2022  *
2023  * @param h Handle to the cadet peer.
2024  * @param callback Function to call with the requested data.
2025  * @param callback_cls Closure for @c callback.
2026  *
2027  * @return #GNUNET_OK / #GNUNET_SYSERR
2028  */
2029 int
2030 GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h,
2031                        GNUNET_CADET_PeersCB callback,
2032                        void *callback_cls)
2033 {
2034   if (NULL != h->info_cb.peers_cb)
2035   {
2036     GNUNET_break (0);
2037     return GNUNET_SYSERR;
2038   }
2039   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS);
2040   h->info_cb.peers_cb = callback;
2041   h->info_cls = callback_cls;
2042   return GNUNET_OK;
2043 }
2044
2045
2046 /**
2047  * Cancel a peer info request. The callback will not be called (anymore).
2048  *
2049  * WARNING: unstable API, likely to change in the future!
2050  *
2051  * @param h Cadet handle.
2052  *
2053  * @return Closure given to GNUNET_CADET_get_peers.
2054  */
2055 void *
2056 GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
2057 {
2058   void *cls;
2059
2060   cls = h->info_cls;
2061   h->info_cb.peers_cb = NULL;
2062   h->info_cls = NULL;
2063   return cls;
2064 }
2065
2066
2067 /**
2068  * Request information about a peer known to the running cadet peer.
2069  * The callback will be called for the tunnel once.
2070  * Only one info request (of any kind) can be active at once.
2071  *
2072  * WARNING: unstable API, likely to change in the future!
2073  *
2074  * @param h Handle to the cadet peer.
2075  * @param id Peer whose tunnel to examine.
2076  * @param callback Function to call with the requested data.
2077  * @param callback_cls Closure for @c callback.
2078  *
2079  * @return #GNUNET_OK / #GNUNET_SYSERR
2080  */
2081 int
2082 GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
2083                        const struct GNUNET_PeerIdentity *id,
2084                        GNUNET_CADET_PeerCB callback,
2085                        void *callback_cls)
2086 {
2087   struct GNUNET_CADET_LocalInfo *msg;
2088   struct GNUNET_MQ_Envelope *env;
2089
2090   if (NULL != h->info_cb.peer_cb)
2091   {
2092     GNUNET_break (0);
2093     return GNUNET_SYSERR;
2094   }
2095
2096   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
2097   msg->peer = *id;
2098   GNUNET_MQ_send (h->mq, env);
2099
2100   h->info_cb.peer_cb = callback;
2101   h->info_cls = callback_cls;
2102   return GNUNET_OK;
2103 }
2104
2105
2106 /**
2107  * Request information about tunnels of the running cadet peer.
2108  * The callback will be called for every tunnel of the service.
2109  * Only one info request (of any kind) can be active at once.
2110  *
2111  * WARNING: unstable API, likely to change in the future!
2112  *
2113  * @param h Handle to the cadet peer.
2114  * @param callback Function to call with the requested data.
2115  * @param callback_cls Closure for @c callback.
2116  *
2117  * @return #GNUNET_OK / #GNUNET_SYSERR
2118  */
2119 int
2120 GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h,
2121                          GNUNET_CADET_TunnelsCB callback,
2122                          void *callback_cls)
2123 {
2124   if (NULL != h->info_cb.tunnels_cb)
2125   {
2126     GNUNET_break (0);
2127     return GNUNET_SYSERR;
2128   }
2129   send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS);
2130   h->info_cb.tunnels_cb = callback;
2131   h->info_cls = callback_cls;
2132   return GNUNET_OK;
2133 }
2134
2135
2136 /**
2137  * Cancel a monitor request. The monitor callback will not be called.
2138  *
2139  * @param h Cadet handle.
2140  *
2141  * @return Closure given to GNUNET_CADET_get_tunnels.
2142  */
2143 void *
2144 GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h)
2145 {
2146   void *cls;
2147
2148   h->info_cb.tunnels_cb = NULL;
2149   cls = h->info_cls;
2150   h->info_cls = NULL;
2151
2152   return cls;
2153 }
2154
2155
2156
2157 /**
2158  * Request information about a tunnel of the running cadet peer.
2159  * The callback will be called for the tunnel once.
2160  * Only one info request (of any kind) can be active at once.
2161  *
2162  * WARNING: unstable API, likely to change in the future!
2163  *
2164  * @param h Handle to the cadet peer.
2165  * @param id Peer whose tunnel to examine.
2166  * @param callback Function to call with the requested data.
2167  * @param callback_cls Closure for @c callback.
2168  *
2169  * @return #GNUNET_OK / #GNUNET_SYSERR
2170  */
2171 int
2172 GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2173                         const struct GNUNET_PeerIdentity *id,
2174                         GNUNET_CADET_TunnelCB callback,
2175                         void *callback_cls)
2176 {
2177   struct GNUNET_CADET_LocalInfo *msg;
2178   struct GNUNET_MQ_Envelope *env;
2179
2180   if (NULL != h->info_cb.tunnel_cb)
2181   {
2182     GNUNET_break (0);
2183     return GNUNET_SYSERR;
2184   }
2185
2186   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2187   msg->peer = *id;
2188   GNUNET_MQ_send (h->mq, env);
2189
2190   h->info_cb.tunnel_cb = callback;
2191   h->info_cls = callback_cls;
2192   return GNUNET_OK;
2193 }
2194
2195
2196 /**
2197  * Request information about a specific channel of the running cadet peer.
2198  *
2199  * WARNING: unstable API, likely to change in the future!
2200  * FIXME Add destination option.
2201  *
2202  * @param h Handle to the cadet peer.
2203  * @param initiator ID of the owner of the channel.
2204  * @param channel_number Channel number.
2205  * @param callback Function to call with the requested data.
2206  * @param callback_cls Closure for @c callback.
2207  *
2208  * @return #GNUNET_OK / #GNUNET_SYSERR
2209  */
2210 int
2211 GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2212                            struct GNUNET_PeerIdentity *initiator,
2213                            unsigned int channel_number,
2214                            GNUNET_CADET_ChannelCB callback,
2215                            void *callback_cls)
2216 {
2217   struct GNUNET_CADET_LocalInfo *msg;
2218   struct GNUNET_MQ_Envelope *env;
2219
2220   if (NULL != h->info_cb.channel_cb)
2221   {
2222     GNUNET_break (0);
2223     return GNUNET_SYSERR;
2224   }
2225
2226   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2227   msg->peer = *initiator;
2228   msg->ccn.channel_of_client = htonl (channel_number);
2229   GNUNET_MQ_send (h->mq, env);
2230
2231   h->info_cb.channel_cb = callback;
2232   h->info_cls = callback_cls;
2233   return GNUNET_OK;
2234 }
2235
2236
2237 /**
2238  * Function called to notify a client about the connection
2239  * begin ready to queue more data.  "buf" will be
2240  * NULL and "size" zero if the connection was closed for
2241  * writing in the meantime.
2242  *
2243  * @param cls closure
2244  * @param size number of bytes available in buf
2245  * @param buf where the callee should write the message
2246  * @return number of bytes written to buf
2247  */
2248 static size_t
2249 cadet_mq_ntr (void *cls, size_t size,
2250              void *buf)
2251 {
2252   struct GNUNET_MQ_Handle *mq = cls;
2253   struct CadetMQState *state = GNUNET_MQ_impl_state (mq);
2254   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
2255   uint16_t msize;
2256
2257   state->th = NULL;
2258   if (NULL == buf)
2259   {
2260     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
2261     return 0;
2262   }
2263   msize = ntohs (msg->size);
2264   GNUNET_assert (msize <= size);
2265   GNUNET_memcpy (buf, msg, msize);
2266   GNUNET_MQ_impl_send_continue (mq);
2267   return msize;
2268 }
2269
2270
2271 /**
2272  * Signature of functions implementing the
2273  * sending functionality of a message queue.
2274  *
2275  * @param mq the message queue
2276  * @param msg the message to send
2277  * @param impl_state state of the implementation
2278  */
2279 static void
2280 cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2281                         const struct GNUNET_MessageHeader *msg,
2282                         void *impl_state)
2283 {
2284   struct CadetMQState *state = impl_state;
2285
2286   GNUNET_assert (NULL == state->th);
2287   state->th =
2288       GNUNET_CADET_notify_transmit_ready (state->channel,
2289                                          /* FIXME: add option for corking */
2290                                          GNUNET_NO,
2291                                          GNUNET_TIME_UNIT_FOREVER_REL,
2292                                          ntohs (msg->size),
2293                                          &cadet_mq_ntr, mq);
2294
2295 }
2296
2297
2298 /**
2299  * Signature of functions implementing the
2300  * destruction of a message queue.
2301  * Implementations must not free 'mq', but should
2302  * take care of 'impl_state'.
2303  *
2304  * @param mq the message queue to destroy
2305  * @param impl_state state of the implementation
2306  */
2307 static void
2308 cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2309                            void *impl_state)
2310 {
2311   struct CadetMQState *state = impl_state;
2312
2313   if (NULL != state->th)
2314     GNUNET_CADET_notify_transmit_ready_cancel (state->th);
2315
2316   GNUNET_free (state);
2317 }
2318
2319
2320 /**
2321  * Create a message queue for a cadet channel.
2322  * The message queue can only be used to transmit messages,
2323  * not to receive them.
2324  *
2325  * @param channel the channel to create the message qeue for
2326  * @return a message queue to messages over the channel
2327  */
2328 struct GNUNET_MQ_Handle *
2329 GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2330 {
2331   struct GNUNET_MQ_Handle *mq;
2332   struct CadetMQState *state;
2333
2334   state = GNUNET_new (struct CadetMQState);
2335   state->channel = channel;
2336
2337   mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2338                                       &cadet_mq_destroy_impl_old,
2339                                       NULL, /* FIXME: cancel impl. */
2340                                       state,
2341                                       NULL, /* no msg handlers */
2342                                       NULL, /* no err handlers */
2343                                       NULL); /* no handler cls */
2344   return mq;
2345 }
2346
2347
2348 /**
2349  * Transitional function to convert an unsigned int port to a hash value.
2350  * WARNING: local static value returned, NOT reentrant!
2351  * WARNING: do not use this function for new code!
2352  *
2353  * @param port Numerical port (unsigned int format).
2354  *
2355  * @return A GNUNET_HashCode usable for the new CADET API.
2356  */
2357 const struct GNUNET_HashCode *
2358 GC_u2h (uint32_t port)
2359 {
2360   static struct GNUNET_HashCode hash;
2361
2362   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2363               "This is a transitional function, "
2364               "use proper crypto hashes as CADET ports\n");
2365   GNUNET_CRYPTO_hash (&port, sizeof (port), &hash);
2366
2367   return &hash;
2368 }
2369
2370
2371
2372 /******************************************************************************/
2373 /******************************* MQ-BASED API *********************************/
2374 /******************************************************************************/
2375
2376 /**
2377  * Connect to the MQ-based cadet service.
2378  *
2379  * @param cfg Configuration to use.
2380  *
2381  * @return Handle to the cadet service NULL on error.
2382  */
2383 struct GNUNET_CADET_Handle *
2384 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2385 {
2386   struct GNUNET_CADET_Handle *h;
2387
2388   LOG (GNUNET_ERROR_TYPE_DEBUG,
2389        "GNUNET_CADET_connecT()\n");
2390   h = GNUNET_new (struct GNUNET_CADET_Handle);
2391   h->cfg = cfg;
2392   h->mq_api = GNUNET_YES;
2393   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2394   reconnect (h);
2395   if (NULL == h->mq)
2396   {
2397     GNUNET_break (0);
2398     GNUNET_CADET_disconnect (h);
2399     return NULL;
2400   }
2401   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2402   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2403   h->reconnect_task = NULL;
2404
2405   return h;
2406 }
2407
2408
2409 /**
2410  * Open a port to receive incomming MQ-based channels.
2411  *
2412  * @param h CADET handle.
2413  * @param port Hash identifying the port.
2414  * @param connects Function called when an incoming channel is connected.
2415  * @param connects_cls Closure for the @a connects handler.
2416  * @param window_changes Function called when the transmit window size changes.
2417  * @param disconnects Function called when a channel is disconnected.
2418  * @param handlers Callbacks for messages we care about, NULL-terminated.
2419  *
2420  * @return Port handle.
2421  */
2422 struct GNUNET_CADET_Port *
2423 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2424                         const struct GNUNET_HashCode *port,
2425                         GNUNET_CADET_ConnectEventHandler connects,
2426                         void * connects_cls,
2427                         GNUNET_CADET_WindowSizeEventHandler window_changes,
2428                         GNUNET_CADET_DisconnectEventHandler disconnects,
2429                         const struct GNUNET_MQ_MessageHandler *handlers)
2430 {
2431   struct GNUNET_CADET_PortMessage *msg;
2432   struct GNUNET_MQ_Envelope *env;
2433   struct GNUNET_CADET_Port *p;
2434
2435   GNUNET_assert (NULL != connects);
2436   GNUNET_assert (NULL != disconnects);
2437
2438   p = GNUNET_new (struct GNUNET_CADET_Port);
2439   p->cadet = h;
2440   p->id = *port;
2441   p->connects = connects;
2442   p->cls = connects_cls;
2443   p->window_changes = window_changes;
2444   p->disconnects = disconnects;
2445   p->handlers = handlers;
2446
2447   GNUNET_assert (GNUNET_OK ==
2448                  GNUNET_CONTAINER_multihashmap_put (h->ports,
2449                                                     p->hash,
2450                                                     p,
2451                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2452
2453   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2454   msg->port = p->id;
2455   GNUNET_MQ_send (h->mq, env);
2456
2457   return p;
2458 }
2459
2460
2461 /**
2462  * Create a new channel towards a remote peer.
2463  *
2464  * If the destination port is not open by any peer or the destination peer
2465  * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2466  * for this channel.
2467  *
2468  * @param h CADET handle.
2469  * @param channel_cls Closure for the channel. It's given to:
2470  *                    - The disconnect handler @a disconnects
2471  *                    - Each message type callback in @a handlers
2472  * @param destination Peer identity the channel should go to.
2473  * @param port Identification of the destination port.
2474  * @param options CadetOption flag field, with all desired option bits set to 1.
2475  * @param window_changes Function called when the transmit window size changes.
2476  * @param disconnects Function called when the channel is disconnected.
2477  * @param handlers Callbacks for messages we care about, NULL-terminated.
2478  *
2479  * @return Handle to the channel.
2480  */
2481 struct GNUNET_CADET_Channel *
2482 GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2483                              void *channel_cls,
2484                              const struct GNUNET_PeerIdentity *destination,
2485                              const struct GNUNET_HashCode *port,
2486                              enum GNUNET_CADET_ChannelOption options,
2487                              GNUNET_CADET_WindowSizeEventHandler window_changes,
2488                              GNUNET_CADET_DisconnectEventHandler disconnects,
2489                              const struct GNUNET_MQ_MessageHandler *handlers)
2490 {
2491   struct GNUNET_CADET_Channel *ch;
2492   struct GNUNET_CADET_ClientChannelNumber ccn;
2493   struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2494   struct GNUNET_MQ_Envelope *env;
2495
2496   GNUNET_assert (NULL != disconnects);
2497
2498   /* Save parameters */
2499   ccn.channel_of_client = htonl (0);
2500   ch = create_channel (h, ccn);
2501   ch->ctx = channel_cls;
2502   ch->peer = GNUNET_PEER_intern (destination);
2503   ch->options = options;
2504   ch->window_changes = window_changes;
2505   ch->disconnects = disconnects;
2506
2507   /* Create MQ for channel */
2508   ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2509                                           &cadet_mq_destroy_impl,
2510                                           &cadet_mq_cancel_impl,
2511                                           ch,
2512                                           handlers,
2513                                           &cadet_mq_error_handler,
2514                                           ch);
2515   GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2516
2517   /* Request channel creation to service */
2518   env = GNUNET_MQ_msg (msg,
2519                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2520   msg->ccn = ch->ccn;
2521   msg->port = *port;
2522   msg->peer = *destination;
2523   msg->opt = htonl (options);
2524   GNUNET_MQ_send (h->mq,
2525                   env);
2526
2527   return ch;
2528 }
2529
2530
2531 /**
2532  * Obtain the message queue for a connected peer.
2533  *
2534  * @param channel The channel handle from which to get the MQ.
2535  *
2536  * @return NULL if @a channel is not yet connected.
2537  */
2538 struct GNUNET_MQ_Handle *
2539 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2540 {
2541   return channel->mq;
2542 }