- unify flow control with service, adapt to client-controlled flow control
[oweals/gnunet.git] / src / mesh / mesh2_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4      GNUnet is free software; you can redistribute it and/or modify
5      it under the terms of the GNU General Public License as published
6      by the Free Software Foundation; either version 3, or (at your
7      option) any later version.
8      GNUnet is distributed in the hope that it will be useful, but
9      WITHOUT ANY WARRANTY; without even the implied warranty of
10      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11      General Public License for more details.
12      You should have received a copy of the GNU General Public License
13      along with GNUnet; see the file COPYING.  If not, write to the
14      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
15      Boston, MA 02111-1307, USA.
16 */
17
18 /**
19  * @file mesh/mesh2_api.c
20  * @brief mesh2 api: client implementation of new mesh service
21  * @author Bartlomiej Polot
22  *
23  * STRUCTURE:
24  * - DATA STRUCTURES
25  * - DECLARATIONS
26  * - AUXILIARY FUNCTIONS
27  * - RECEIVE HANDLERS
28  * - SEND FUNCTIONS
29  * - API CALL DEFINITIONS
30  *
31  * TODO: add regex to reconnect
32  */
33 #include "platform.h"
34 #include "gnunet_common.h"
35 #include "gnunet_client_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_mesh2_service.h"
39 #include "mesh2.h"
40 #include "mesh2_protocol.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "mesh2-api",__VA_ARGS__)
43
44 /* FIXME: use dynamic values, expose in API */
45 #define INITIAL_WINDOW_SIZE 4
46 #define ACK_THRESHOLD 2
47 /* FIXME END */
48
49 #define DEBUG_ACK GNUNET_YES
50
51 /******************************************************************************/
52 /************************      DATA STRUCTURES     ****************************/
53 /******************************************************************************/
54
55 /**
56  * Transmission queue to the service
57  */
58 struct GNUNET_MESH_TransmitHandle
59 {
60
61     /**
62      * Double Linked list
63      */
64   struct GNUNET_MESH_TransmitHandle *next;
65
66     /**
67      * Double Linked list
68      */
69   struct GNUNET_MESH_TransmitHandle *prev;
70
71     /**
72      * Tunnel this message is sent on / for (may be NULL for control messages).
73      */
74   struct GNUNET_MESH_Tunnel *tunnel;
75
76     /**
77      * Callback to obtain the message to transmit, or NULL if we
78      * got the message in 'data'.  Notice that messages built
79      * by 'notify' need to be encapsulated with information about
80      * the 'target'.
81      */
82   GNUNET_CONNECTION_TransmitReadyNotify notify;
83
84     /**
85      * Closure for 'notify'
86      */
87   void *notify_cls;
88
89     /**
90      * How long is this message valid.  Once the timeout has been
91      * reached, the message must no longer be sent.  If this
92      * is a message with a 'notify' callback set, the 'notify'
93      * function should be called with 'buf' NULL and size 0.
94      */
95   struct GNUNET_TIME_Absolute timeout;
96
97     /**
98      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
99      */
100   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
101
102     /**
103      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
104      */
105   size_t size;
106 };
107
108
109 /**
110  * Opaque handle to the service.
111  */
112 struct GNUNET_MESH_Handle
113 {
114
115     /**
116      * Handle to the server connection, to send messages later
117      */
118   struct GNUNET_CLIENT_Connection *client;
119
120     /**
121      * Set of handlers used for processing incoming messages in the tunnels
122      */
123   const struct GNUNET_MESH_MessageHandler *message_handlers;
124
125   /**
126    * Number of handlers in the handlers array.
127    */
128   unsigned int n_handlers;
129
130   /**
131    * Ports open.
132    */
133   const uint32_t *ports;
134
135   /**
136    * Number of ports.
137    */
138   unsigned int n_ports;
139
140     /**
141      * Double linked list of the tunnels this client is connected to, head.
142      */
143   struct GNUNET_MESH_Tunnel *tunnels_head;
144
145     /**
146      * Double linked list of the tunnels this client is connected to, tail.
147      */
148   struct GNUNET_MESH_Tunnel *tunnels_tail;
149
150     /**
151      * Callback for inbound tunnel creation
152      */
153   GNUNET_MESH_InboundTunnelNotificationHandler *new_tunnel;
154
155     /**
156      * Callback for inbound tunnel disconnection
157      */
158   GNUNET_MESH_TunnelEndHandler *cleaner;
159
160     /**
161      * Handle to cancel pending transmissions in case of disconnection
162      */
163   struct GNUNET_CLIENT_TransmitHandle *th;
164
165     /**
166      * Closure for all the handlers given by the client
167      */
168   void *cls;
169
170     /**
171      * Messages to send to the service, head.
172      */
173   struct GNUNET_MESH_TransmitHandle *th_head;
174
175     /**
176      * Messages to send to the service, tail.
177      */
178   struct GNUNET_MESH_TransmitHandle *th_tail;
179
180     /**
181      * tid of the next tunnel to create (to avoid reusing IDs often)
182      */
183   MESH_TunnelNumber next_tid;
184
185     /**
186      * Have we started the task to receive messages from the service
187      * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
188      */
189   int in_receive;
190
191   /**
192    * Configuration given by the client, in case of reconnection
193    */
194   const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196   /**
197    * Time to the next reconnect in case one reconnect fails
198    */
199   struct GNUNET_TIME_Relative reconnect_time;
200   
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * Monitor callback
208    */
209   GNUNET_MESH_TunnelsCB tunnels_cb;
210
211   /**
212    * Monitor callback closure.
213    */
214   void *tunnels_cls;
215
216   /**
217    * Tunnel callback.
218    */
219   GNUNET_MESH_TunnelCB tunnel_cb;
220
221   /**
222    * Tunnel callback closure.
223    */
224   void *tunnel_cls;
225
226 #if DEBUG_ACK
227   unsigned int acks_sent;
228   unsigned int acks_recv;
229 #endif
230 };
231
232
233 /**
234  * Description of a peer
235  */
236 struct GNUNET_MESH_Peer
237 {
238     /**
239      * ID of the peer in short form
240      */
241   GNUNET_PEER_Id id;
242
243   /**
244    * Tunnel this peer belongs to
245    */
246   struct GNUNET_MESH_Tunnel *t;
247
248   /**
249    * Flag indicating whether service has informed about its connection
250    */
251   int connected;
252
253 };
254
255
256 /**
257  * Opaque handle to a tunnel.
258  */
259 struct GNUNET_MESH_Tunnel
260 {
261
262     /**
263      * DLL next
264      */
265   struct GNUNET_MESH_Tunnel *next;
266
267     /**
268      * DLL prev
269      */
270   struct GNUNET_MESH_Tunnel *prev;
271
272     /**
273      * Handle to the mesh this tunnel belongs to
274      */
275   struct GNUNET_MESH_Handle *mesh;
276
277     /**
278      * Local ID of the tunnel
279      */
280   MESH_TunnelNumber tid;
281
282     /**
283      * Port number.
284      */
285   uint32_t port;
286
287     /**
288      * Other end of the tunnel.
289      */
290   GNUNET_PEER_Id peer;
291
292   /**
293    * Any data the caller wants to put in here
294    */
295   void *ctx;
296
297     /**
298      * Size of packet queued in this tunnel
299      */
300   unsigned int packet_size;
301
302     /**
303      * Is the tunnel allowed to buffer?
304      */
305   int buffering;
306
307     /**
308      * Maximum allowed PID to send (last ACK recevied).
309      */
310   uint32_t last_ack_recv;
311
312     /**
313      * Last PID received from the service.
314      */
315   uint32_t last_pid_recv;
316
317   /**
318    * Last packet ID sent to the service.
319    */
320   uint32_t last_pid_sent;
321
322   /**
323    * Last ACK value sent to the service: how much are we willing to accept?
324    */
325   uint32_t last_ack_sent;
326 };
327
328
329 /******************************************************************************/
330 /***********************         DECLARATIONS         *************************/
331 /******************************************************************************/
332
333 /**
334  * Function called to send a message to the service.
335  * "buf" will be NULL and "size" zero if the socket was closed for writing in
336  * the meantime.
337  *
338  * @param cls closure, the mesh handle
339  * @param size number of bytes available in buf
340  * @param buf where the callee should write the connect message
341  * @return number of bytes written to buf
342  */
343 static size_t
344 send_callback (void *cls, size_t size, void *buf);
345
346
347 /******************************************************************************/
348 /***********************     AUXILIARY FUNCTIONS      *************************/
349 /******************************************************************************/
350
351 /**
352  * Check if transmission is a payload packet.
353  *
354  * @param th Transmission handle.
355  *
356  * @return GNUNET_YES if it is a payload packet,
357  *         GNUNET_NO if it is a mesh management packet.
358  */
359 static int
360 th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
361 {
362   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
363 }
364
365
366 /**
367  * Check whether there is any message ready in the queue and find the size.
368  * 
369  * @param h Mesh handle.
370  * 
371  * @return The size of the first ready message in the queue,
372  *         0 if there is none.
373  */
374 static size_t
375 message_ready_size (struct GNUNET_MESH_Handle *h)
376 {
377   struct GNUNET_MESH_TransmitHandle *th;
378   struct GNUNET_MESH_Tunnel *t;
379
380   for (th = h->th_head; NULL != th; th = th->next)
381   {
382     t = th->tunnel;
383     if (GNUNET_NO == th_is_payload (th))
384     {
385       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message internal\n");
386       return th->size;
387     }
388     if (GNUNET_YES == GMC_is_pid_bigger(t->last_ack_recv, t->last_pid_sent))
389     {
390       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message payload ok (%u <= %u)\n",
391            t->last_pid_sent, t->last_ack_recv);
392       return th->size;
393     }
394   }
395   return 0;
396 }
397
398
399 /**
400  * Get the tunnel handler for the tunnel specified by id from the given handle
401  * @param h Mesh handle
402  * @param tid ID of the wanted tunnel
403  * @return handle to the required tunnel or NULL if not found
404  */
405 static struct GNUNET_MESH_Tunnel *
406 retrieve_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
407 {
408   struct GNUNET_MESH_Tunnel *t;
409
410   t = h->tunnels_head;
411   while (t != NULL)
412   {
413     if (t->tid == tid)
414       return t;
415     t = t->next;
416   }
417   return NULL;
418 }
419
420
421 /**
422  * Create a new tunnel and insert it in the tunnel list of the mesh handle
423  * @param h Mesh handle
424  * @param tid desired tid of the tunnel, 0 to assign one automatically
425  * @return handle to the created tunnel
426  */
427 static struct GNUNET_MESH_Tunnel *
428 create_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
429 {
430   struct GNUNET_MESH_Tunnel *t;
431
432   t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel));
433   GNUNET_CONTAINER_DLL_insert (h->tunnels_head, h->tunnels_tail, t);
434   t->mesh = h;
435   if (0 == tid)
436   {
437     t->tid = h->next_tid;
438     while (NULL != retrieve_tunnel (h, h->next_tid))
439     {
440       h->next_tid++;
441       h->next_tid &= ~GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
442       h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
443     }
444   }
445   else
446   {
447     t->tid = tid;
448   }
449   t->last_ack_recv = (uint32_t) -1;
450   t->last_pid_recv = (uint32_t) -1;
451   t->buffering = GNUNET_YES;
452   return t;
453 }
454
455
456 /**
457  * Destroy the specified tunnel.
458  * - Destroys all peers, calling the disconnect callback on each if needed
459  * - Cancels all outgoing traffic for that tunnel, calling respective notifys
460  * - Calls cleaner if tunnel was inbound
461  * - Frees all memory used
462  *
463  * @param t Pointer to the tunnel.
464  * @param call_cleaner Whether to call the cleaner handler.
465  *
466  * @return Handle to the required tunnel or NULL if not found.
467  */
468 static void
469 destroy_tunnel (struct GNUNET_MESH_Tunnel *t, int call_cleaner)
470 {
471   struct GNUNET_MESH_Handle *h;
472   struct GNUNET_MESH_TransmitHandle *th;
473   struct GNUNET_MESH_TransmitHandle *next;
474
475   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroy_tunnel %X\n", t->tid);
476
477   if (NULL == t)
478   {
479     GNUNET_break (0);
480     return;
481   }
482   h = t->mesh;
483
484   /* free all peer's ID */
485   GNUNET_CONTAINER_DLL_remove (h->tunnels_head, h->tunnels_tail, t);
486   GNUNET_PEER_change_rc (t->peer, -1);
487
488   /* signal tunnel destruction */
489   if ( (NULL != h->cleaner) && (0 != t->peer) && (GNUNET_YES == call_cleaner) )
490     h->cleaner (h->cls, t, t->ctx);
491
492   /* check that clients did not leave messages behind in the queue */
493   for (th = h->th_head; NULL != th; th = next)
494   {
495     next = th->next;
496     if (th->tunnel != t)
497       continue;
498     /* Clients should have aborted their requests already.
499      * Management traffic should be ok, as clients can't cancel that */
500     GNUNET_break (GNUNET_NO == th_is_payload(th));
501     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
502
503     /* clean up request */
504     if (GNUNET_SCHEDULER_NO_TASK != th->timeout_task)
505       GNUNET_SCHEDULER_cancel (th->timeout_task);
506     GNUNET_free (th);    
507   }
508
509   /* if there are no more pending requests with mesh service, cancel active request */
510   /* Note: this should be unnecessary... */
511   if ((0 == message_ready_size (h)) && (NULL != h->th))
512   {
513     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
514     h->th = NULL;
515   }
516
517   if (0 != t->peer)
518     GNUNET_PEER_change_rc (t->peer, -1);
519   GNUNET_free (t);
520   return;
521 }
522
523
524 /**
525  * Notify client that the transmission has timed out
526  * 
527  * @param cls closure
528  * @param tc task context
529  */
530 static void
531 timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
532 {
533   struct GNUNET_MESH_TransmitHandle *th = cls;
534   struct GNUNET_MESH_Handle *mesh;
535
536   mesh = th->tunnel->mesh;
537   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
538   th->tunnel->packet_size = 0;
539   if (GNUNET_YES == th_is_payload (th))
540     th->notify (th->notify_cls, 0, NULL);
541   GNUNET_free (th);
542   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
543   {
544     /* nothing ready to transmit, no point in asking for transmission */
545     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
546     mesh->th = NULL;
547   }
548 }
549
550
551 /**
552  * Add a transmit handle to the transmission queue and set the
553  * timeout if needed.
554  *
555  * @param h mesh handle with the queue head and tail
556  * @param th handle to the packet to be transmitted
557  */
558 static void
559 add_to_queue (struct GNUNET_MESH_Handle *h,
560               struct GNUNET_MESH_TransmitHandle *th)
561 {
562   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
563   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value == th->timeout.abs_value)
564     return;
565   th->timeout_task =
566       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
567                                     (th->timeout), &timeout_transmission, th);
568 }
569
570
571 /**
572  * Auxiliary function to send an already constructed packet to the service.
573  * Takes care of creating a new queue element, copying the message and
574  * calling the tmt_rdy function if necessary.
575  *
576  * @param h mesh handle
577  * @param msg message to transmit
578  * @param tunnel tunnel this send is related to (NULL if N/A)
579  */
580 static void
581 send_packet (struct GNUNET_MESH_Handle *h,
582              const struct GNUNET_MessageHeader *msg,
583              struct GNUNET_MESH_Tunnel *tunnel);
584
585
586 /**
587  * Send an ack on the tunnel to confirm the processing of a message.
588  * 
589  * @param h Mesh handle.
590  * @param t Tunnel on which to send the ACK.
591  */
592 static void
593 send_ack (struct GNUNET_MESH_Handle *h, struct GNUNET_MESH_Tunnel *t)
594 {
595   struct GNUNET_MESH_LocalAck msg;
596   uint32_t delta;
597
598   delta = t->last_ack_sent - t->last_pid_recv;
599   if (delta > ACK_THRESHOLD)
600   {
601     LOG (GNUNET_ERROR_TYPE_DEBUG,
602          "Not sending ACK on tunnel %X: ACK: %u, PID: %u, buffer %u\n",
603          t->tid, t->last_ack_sent, t->last_pid_recv, delta);
604     return;
605   }
606   if (GNUNET_YES == t->buffering)
607     t->last_ack_sent = t->last_pid_recv + INITIAL_WINDOW_SIZE;
608   else
609     t->last_ack_sent = t->last_pid_recv + 1;
610   LOG (GNUNET_ERROR_TYPE_DEBUG,
611        "Sending ACK on tunnel %X: %u\n",
612        t->tid, t->last_ack_sent);
613   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
614   msg.header.size = htons (sizeof (msg));
615   msg.tunnel_id = htonl (t->tid);
616   msg.max_pid = htonl (t->last_ack_sent);
617
618 #if DEBUG_ACK
619   t->mesh->acks_sent++;
620 #endif
621
622   send_packet (h, &msg.header, t);
623   return;
624 }
625
626
627
628 /**
629  * Reconnect callback: tries to reconnect again after a failer previous
630  * reconnecttion
631  * @param cls closure (mesh handle)
632  * @param tc task context
633  */
634 static void
635 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
636
637
638 /**
639  * Send a connect packet to the service with the applications and types
640  * requested by the user.
641  *
642  * @param h The mesh handle.
643  *
644  */
645 static void
646 send_connect (struct GNUNET_MESH_Handle *h)
647 {
648   size_t size;
649
650   size = sizeof (struct GNUNET_MESH_ClientConnect);
651   size += h->n_ports * sizeof (uint32_t);
652   {
653     char buf[size] GNUNET_ALIGN;
654     struct GNUNET_MESH_ClientConnect *msg;
655     uint32_t *ports;
656     uint16_t i;
657
658     /* build connection packet */
659     msg = (struct GNUNET_MESH_ClientConnect *) buf;
660     msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
661     msg->header.size = htons (size);
662     ports = (uint32_t *) &msg[1];
663     for (i = 0; i < h->n_ports; i++)
664     {
665       ports[i] = htonl (h->ports[i]);
666       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
667            h->ports[i]);
668     }
669     LOG (GNUNET_ERROR_TYPE_DEBUG,
670          "Sending %lu bytes long message with %u ports\n",
671          ntohs (msg->header.size), h->n_ports);
672     send_packet (h, &msg->header, NULL);
673   }
674 }
675
676
677 /**
678  * Reconnect to the service, retransmit all infomation to try to restore the
679  * original state.
680  *
681  * @param h handle to the mesh
682  *
683  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
684  */
685 static int
686 do_reconnect (struct GNUNET_MESH_Handle *h)
687 {
688   struct GNUNET_MESH_Tunnel *t;
689
690   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
691   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
692   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
693   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
694   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
695
696   /* disconnect */
697   if (NULL != h->th)
698   {
699     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
700     h->th = NULL;
701   }
702   if (NULL != h->client)
703   {
704     GNUNET_CLIENT_disconnect (h->client);
705   }
706
707   /* connect again */
708   h->client = GNUNET_CLIENT_connect ("mesh", h->cfg);
709   if (h->client == NULL)
710   {
711     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
712                                                       &reconnect_cbk, h);
713     h->reconnect_time =
714         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
715                                   GNUNET_TIME_relative_multiply
716                                   (h->reconnect_time, 2));
717     LOG (GNUNET_ERROR_TYPE_DEBUG, 
718          "Next retry in %s\n",
719          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
720                                                  GNUNET_NO));
721     GNUNET_break (0);
722     return GNUNET_NO;
723   }
724   else
725   {
726     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
727   }
728   send_connect (h);
729   /* Rebuild all tunnels */
730   for (t = h->tunnels_head; NULL != t; t = t->next)
731   {
732     struct GNUNET_MESH_TunnelMessage tmsg;
733
734     if (t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
735     {
736       /* Tunnel was created by service (incoming tunnel) */
737       /* TODO: Notify service of missing tunnel, to request
738        * creator to recreate path (find a path to him via DHT?)
739        */
740       continue;
741     }
742     t->last_ack_sent = (uint32_t) -1;
743     t->last_pid_sent = (uint32_t) -1;
744     t->last_ack_recv = (uint32_t) -1;
745     t->last_pid_recv = (uint32_t) -1;
746     tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
747     tmsg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
748     tmsg.tunnel_id = htonl (t->tid);
749     GNUNET_PEER_resolve (t->peer, &tmsg.peer);
750     send_packet (h, &tmsg.header, t);
751
752     if (GNUNET_NO == t->buffering)
753       GNUNET_MESH_tunnel_buffer (t, GNUNET_NO);
754   }
755   return GNUNET_YES;
756 }
757
758 /**
759  * Reconnect callback: tries to reconnect again after a failer previous
760  * reconnecttion
761  * @param cls closure (mesh handle)
762  * @param tc task context
763  */
764 static void
765 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
766 {
767   struct GNUNET_MESH_Handle *h = cls;
768
769   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
770   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
771     return;
772   do_reconnect (h);
773 }
774
775
776 /**
777  * Reconnect to the service, retransmit all infomation to try to restore the
778  * original state.
779  *
780  * @param h handle to the mesh
781  *
782  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
783  */
784 static void
785 reconnect (struct GNUNET_MESH_Handle *h)
786 {
787   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requested RECONNECT\n");
788   h->in_receive = GNUNET_NO;
789   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
790     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
791                                                       &reconnect_cbk, h);
792 }
793
794
795 /******************************************************************************/
796 /***********************      RECEIVE HANDLERS     ****************************/
797 /******************************************************************************/
798
799 /**
800  * Process the new tunnel notification and add it to the tunnels in the handle
801  *
802  * @param h     The mesh handle
803  * @param msg   A message with the details of the new incoming tunnel
804  */
805 static void
806 process_tunnel_created (struct GNUNET_MESH_Handle *h,
807                         const struct GNUNET_MESH_TunnelNotification *msg)
808 {
809   struct GNUNET_MESH_Tunnel *t;
810   MESH_TunnelNumber tid;
811
812   tid = ntohl (msg->tunnel_id);
813   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming tunnel %X\n", tid);
814   if (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
815   {
816     GNUNET_break (0);
817     return;
818   }
819   if (NULL != h->new_tunnel)
820   {
821     t = create_tunnel (h, tid);
822     t->peer = GNUNET_PEER_intern (&msg->peer);
823     GNUNET_PEER_change_rc (t->peer, 1);
824     t->mesh = h;
825     t->tid = tid;
826     t->port = ntohl (msg->port);
827     if (0 != (msg->opt & MESH_TUNNEL_OPT_NOBUFFER))
828       t->buffering = GNUNET_NO;
829     else
830       t->buffering = GNUNET_YES;
831     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created tunnel %p\n", t);
832     t->ctx = h->new_tunnel (h->cls, t, &msg->peer, t->port);
833     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
834   }
835   else
836   {
837     struct GNUNET_MESH_TunnelMessage d_msg;
838
839     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming tunnels\n");
840
841     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
842     d_msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
843     d_msg.tunnel_id = msg->tunnel_id;
844
845     send_packet (h, &d_msg.header, NULL);
846   }
847   return;
848 }
849
850
851 /**
852  * Process the tunnel destroy notification and free associated resources
853  *
854  * @param h     The mesh handle
855  * @param msg   A message with the details of the tunnel being destroyed
856  */
857 static void
858 process_tunnel_destroy (struct GNUNET_MESH_Handle *h,
859                         const struct GNUNET_MESH_TunnelMessage *msg)
860 {
861   struct GNUNET_MESH_Tunnel *t;
862   MESH_TunnelNumber tid;
863
864   tid = ntohl (msg->tunnel_id);
865   t = retrieve_tunnel (h, tid);
866
867   if (NULL == t)
868   {
869     return;
870   }
871   LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X destroyed\n", t->tid);
872   destroy_tunnel (t, GNUNET_YES);
873   return;
874 }
875
876
877 /**
878  * Process the incoming data packets
879  *
880  * @param h         The mesh handle
881  * @param message   A message encapsulating the data
882  * 
883  * @return GNUNET_YES if everything went fine
884  *         GNUNET_NO if client closed connection (h no longer valid)
885  */
886 static int
887 process_incoming_data (struct GNUNET_MESH_Handle *h,
888                        const struct GNUNET_MessageHeader *message)
889 {
890   const struct GNUNET_MessageHeader *payload;
891   const struct GNUNET_MESH_MessageHandler *handler;
892   const struct GNUNET_PeerIdentity *peer;
893   struct GNUNET_PeerIdentity id;
894   struct GNUNET_MESH_Unicast *ucast;
895   struct GNUNET_MESH_ToOrigin *to_orig;
896   struct GNUNET_MESH_Tunnel *t;
897   unsigned int i;
898   uint32_t pid;
899   uint16_t type;
900
901   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
902   type = ntohs (message->type);
903   switch (type)
904   {
905   case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
906     ucast = (struct GNUNET_MESH_Unicast *) message;
907
908     t = retrieve_tunnel (h, ntohl (ucast->tid));
909     payload = (struct GNUNET_MessageHeader *) &ucast[1];
910     peer = &ucast->oid;
911     pid = ntohl (ucast->pid);
912     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ucast on tunnel %s [%X]\n",
913          GNUNET_i2s (peer), ntohl (ucast->tid));
914     break;
915   case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
916     to_orig = (struct GNUNET_MESH_ToOrigin *) message;
917     t = retrieve_tunnel (h, ntohl (to_orig->tid));
918     payload = (struct GNUNET_MessageHeader *) &to_orig[1];
919     GNUNET_PEER_resolve (t->peer, &id);
920     peer = &id;
921     pid = ntohl (to_orig->pid);
922     LOG (GNUNET_ERROR_TYPE_DEBUG, "  torig on tunnel %s [%X]\n",
923          GNUNET_i2s (peer), ntohl (to_orig->tid));
924     break;
925   default:
926     GNUNET_break (0);
927     return GNUNET_YES;
928   }
929   LOG (GNUNET_ERROR_TYPE_DEBUG, "  pid %u\n", pid);
930   if (NULL == t)
931   {
932     /* Tunnel was ignored/destroyed, probably service didn't get it yet */
933     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
934     return GNUNET_YES;
935   }
936   if (GNUNET_YES ==
937       GMC_is_pid_bigger(pid, t->last_ack_sent))
938   {
939     GNUNET_break (0);
940     LOG (GNUNET_ERROR_TYPE_WARNING,
941          "  unauthorized message! (%u, ACK %u)\n",
942          pid, t->last_ack_sent);
943     // FIXME fc what now? accept? reject?
944     return GNUNET_YES;
945   }
946   t->last_pid_recv = pid;
947   type = ntohs (payload->type);
948   send_ack (h, t);
949   for (i = 0; i < h->n_handlers; i++)
950   {
951     handler = &h->message_handlers[i];
952     if (handler->type == type)
953     {
954       if (GNUNET_OK !=
955           handler->callback (h->cls, t, &t->ctx, peer, payload))
956       {
957         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
958         GNUNET_MESH_disconnect (h);
959         return GNUNET_NO;
960       }
961       else
962       {
963         LOG (GNUNET_ERROR_TYPE_DEBUG,
964              "callback completed successfully\n");
965       }
966     }
967   }
968   return GNUNET_YES;
969 }
970
971
972 /**
973  * Process a local ACK message, enabling the client to send
974  * more data to the service.
975  * 
976  * @param h Mesh handle.
977  * @param message Message itself.
978  */
979 static void
980 process_ack (struct GNUNET_MESH_Handle *h,
981              const struct GNUNET_MessageHeader *message)
982 {
983   struct GNUNET_MESH_LocalAck *msg;
984   struct GNUNET_MESH_Tunnel *t;
985   uint32_t ack;
986
987   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
988   h->acks_recv++;
989   msg = (struct GNUNET_MESH_LocalAck *) message;
990
991   t = retrieve_tunnel (h, ntohl (msg->tunnel_id));
992
993   if (NULL == t)
994   {
995     LOG (GNUNET_ERROR_TYPE_WARNING,
996          "ACK on unknown tunnel %X\n",
997          ntohl (msg->tunnel_id));
998     return;
999   }
1000   ack = ntohl (msg->max_pid);
1001   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %X, ack %u!\n", t->tid, ack);
1002   if (GNUNET_YES == GMC_is_pid_bigger(ack, t->last_ack_recv))
1003     t->last_ack_recv = ack;
1004   else
1005     return;
1006   if (NULL == h->th && 0 < t->packet_size)
1007   {
1008     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n", t->tid, ack);
1009     h->th =
1010         GNUNET_CLIENT_notify_transmit_ready (h->client, t->packet_size,
1011                                              GNUNET_TIME_UNIT_FOREVER_REL,
1012                                              GNUNET_YES, &send_callback, h);
1013   }
1014 }
1015
1016
1017 /**
1018  * Process a local reply about info on all tunnels, pass info to the user.
1019  *
1020  * @param h Mesh handle.
1021  * @param message Message itself.
1022  */
1023 static void
1024 process_get_tunnels (struct GNUNET_MESH_Handle *h,
1025                      const struct GNUNET_MessageHeader *message)
1026 {
1027   struct GNUNET_MESH_LocalMonitor *msg;
1028
1029   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Tunnels messasge received\n");
1030
1031   if (NULL == h->tunnels_cb)
1032   {
1033     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1034     return;
1035   }
1036
1037   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1038   if (ntohs (message->size) !=
1039       (sizeof (struct GNUNET_MESH_LocalMonitor) +
1040        sizeof (struct GNUNET_PeerIdentity)))
1041   {
1042     GNUNET_break_op (0);
1043     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1044                 "Get tunnels message: size %hu - expected %u\n",
1045                 ntohs (message->size),
1046                 sizeof (struct GNUNET_MESH_LocalMonitor));
1047     return;
1048   }
1049   h->tunnels_cb (h->tunnels_cls,
1050                  ntohl (msg->tunnel_id),
1051                  &msg->owner,
1052                  &msg->destination);
1053 }
1054
1055
1056
1057 /**
1058  * Process a local monitor_tunnel reply, pass info to the user.
1059  *
1060  * @param h Mesh handle.
1061  * @param message Message itself.
1062  */
1063 static void
1064 process_show_tunnel (struct GNUNET_MESH_Handle *h,
1065                      const struct GNUNET_MessageHeader *message)
1066 {
1067   struct GNUNET_MESH_LocalMonitor *msg;
1068   size_t esize;
1069
1070   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Tunnel messasge received\n");
1071
1072   if (NULL == h->tunnel_cb)
1073   {
1074     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1075     return;
1076   }
1077
1078   /* Verify message sanity */
1079   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1080   esize = sizeof (struct GNUNET_MESH_LocalMonitor);
1081   if (ntohs (message->size) != esize)
1082   {
1083     GNUNET_break_op (0);
1084     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1085                 "Show tunnel message: size %hu - expected %u\n",
1086                 ntohs (message->size),
1087                 esize);
1088
1089     h->tunnel_cb (h->tunnel_cls, NULL, NULL);
1090     h->tunnel_cb = NULL;
1091     h->tunnel_cls = NULL;
1092
1093     return;
1094   }
1095
1096   h->tunnel_cb (h->tunnel_cls,
1097                 &msg->destination,
1098                 &msg->owner);
1099 }
1100
1101
1102 /**
1103  * Function to process all messages received from the service
1104  *
1105  * @param cls closure
1106  * @param msg message received, NULL on timeout or fatal error
1107  */
1108 static void
1109 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1110 {
1111   struct GNUNET_MESH_Handle *h = cls;
1112
1113   if (msg == NULL)
1114   {
1115     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1116          "Mesh service disconnected, reconnecting\n", h);
1117     reconnect (h);
1118     return;
1119   }
1120   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n",
1121        GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1122   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1123        GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1124   switch (ntohs (msg->type))
1125   {
1126     /* Notify of a new incoming tunnel */
1127   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE:
1128     process_tunnel_created (h, (struct GNUNET_MESH_TunnelNotification *) msg);
1129     break;
1130     /* Notify of a tunnel disconnection */
1131   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1132     process_tunnel_destroy (h, (struct GNUNET_MESH_TunnelMessage *) msg);
1133     break;
1134     /* Notify of a new data packet in the tunnel */
1135   case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
1136   case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
1137   case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
1138     if (GNUNET_NO == process_incoming_data (h, msg))
1139       return;
1140     break;
1141   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
1142     process_ack (h, msg);
1143     break;
1144   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1145         process_get_tunnels (h, msg);
1146     break;
1147   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1148         process_show_tunnel (h, msg);
1149     break;
1150   default:
1151     /* We shouldn't get any other packages, log and ignore */
1152     LOG (GNUNET_ERROR_TYPE_WARNING,
1153          "unsolicited message form service (type %s)\n",
1154          GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1155   }
1156   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1157   if (GNUNET_YES == h->in_receive)
1158   {
1159     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1160                            GNUNET_TIME_UNIT_FOREVER_REL);
1161   }
1162   else
1163   {
1164     LOG (GNUNET_ERROR_TYPE_DEBUG,
1165          "in receive off, not calling CLIENT_receive\n");
1166   }
1167 }
1168
1169
1170 /******************************************************************************/
1171 /************************       SEND FUNCTIONS     ****************************/
1172 /******************************************************************************/
1173
1174 /**
1175  * Function called to send a message to the service.
1176  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1177  * the meantime.
1178  *
1179  * @param cls closure, the mesh handle
1180  * @param size number of bytes available in buf
1181  * @param buf where the callee should write the connect message
1182  * @return number of bytes written to buf
1183  */
1184 static size_t
1185 send_callback (void *cls, size_t size, void *buf)
1186 {
1187   struct GNUNET_MESH_Handle *h = cls;
1188   struct GNUNET_MESH_TransmitHandle *th;
1189   struct GNUNET_MESH_TransmitHandle *next;
1190   struct GNUNET_MESH_Tunnel *t;
1191   char *cbuf = buf;
1192   size_t tsize;
1193   size_t psize;
1194   size_t nsize;
1195
1196   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1197   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
1198   if ((0 == size) || (NULL == buf))
1199   {
1200     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1201     reconnect (h);
1202     h->th = NULL;
1203     return 0;
1204   }
1205   tsize = 0;
1206   next = h->th_head;
1207   nsize = message_ready_size (h);
1208   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1209   {
1210     t = th->tunnel;
1211     if (GNUNET_YES == th_is_payload (th))
1212     {
1213       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload\n");
1214       if (GNUNET_NO == GMC_is_pid_bigger(t->last_ack_recv, t->last_pid_sent))
1215       {
1216         /* This tunnel is not ready to transmit yet, try next message */
1217         next = th->next;
1218         continue;
1219       }
1220       t->packet_size = 0;
1221       if (t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1222       {
1223         /* traffic to origin */
1224         struct GNUNET_MESH_ToOrigin to;
1225         struct GNUNET_MessageHeader *mh;
1226
1227         GNUNET_assert (size >= th->size);
1228         mh = (struct GNUNET_MessageHeader *) &cbuf[sizeof (to)];
1229         psize = th->notify (th->notify_cls, size - sizeof (to), mh);
1230         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  to origin, type %s\n",
1231              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1232         if (psize > 0)
1233         {
1234           psize += sizeof (to);
1235           GNUNET_assert (size >= psize);
1236           to.header.size = htons (psize);
1237           to.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
1238           to.tid = htonl (t->tid);
1239           to.pid = htonl (t->last_pid_sent + 1);
1240           to.ttl = 0;
1241           memset (&to.oid, 0, sizeof (struct GNUNET_PeerIdentity));
1242           memcpy (cbuf, &to, sizeof (to));
1243         }
1244       }
1245       else
1246       {
1247         /* unicast */
1248         struct GNUNET_MESH_Unicast uc;
1249         struct GNUNET_MessageHeader *mh;
1250
1251         GNUNET_assert (size >= th->size);
1252         mh = (struct GNUNET_MessageHeader *) &cbuf[sizeof (uc)];
1253         psize = th->notify (th->notify_cls, size - sizeof (uc), mh);
1254         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  unicast, type %s\n",
1255              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1256         if (psize > 0)
1257         {
1258           psize += sizeof (uc);
1259           GNUNET_assert (size >= psize);
1260           uc.header.size = htons (psize);
1261           uc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST);
1262           uc.tid = htonl (t->tid);
1263           uc.pid = htonl (t->last_pid_sent + 1);
1264           uc.ttl = 0;
1265           memset (&uc.oid, 0, sizeof (struct GNUNET_PeerIdentity));
1266           memcpy (cbuf, &uc, sizeof (uc));
1267         }
1268       }
1269       t->last_pid_sent++;
1270     }
1271     else
1272     {
1273       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
1274
1275       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  mesh traffic, type %s\n",
1276            GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1277       memcpy (cbuf, &th[1], th->size);
1278       psize = th->size;
1279     }
1280     if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1281       GNUNET_SCHEDULER_cancel (th->timeout_task);
1282     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1283     GNUNET_free (th);
1284     next = h->th_head;
1285     nsize = message_ready_size (h);
1286     cbuf += psize;
1287     size -= psize;
1288     tsize += psize;
1289   }
1290   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1291   h->th = NULL;
1292   size = message_ready_size (h);
1293   if (0 != size)
1294   {
1295     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1296     h->th =
1297         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1298                                              GNUNET_TIME_UNIT_FOREVER_REL,
1299                                              GNUNET_YES, &send_callback, h);
1300   }
1301   else
1302   {
1303     if (NULL != h->th_head)
1304       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  can't transmit any more\n");
1305     else
1306       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1307   }
1308   if (GNUNET_NO == h->in_receive)
1309   {
1310     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1311     h->in_receive = GNUNET_YES;
1312     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1313                            GNUNET_TIME_UNIT_FOREVER_REL);
1314   }
1315   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() END\n");
1316   return tsize;
1317 }
1318
1319
1320 /**
1321  * Auxiliary function to send an already constructed packet to the service.
1322  * Takes care of creating a new queue element, copying the message and
1323  * calling the tmt_rdy function if necessary.
1324  * 
1325  * @param h mesh handle
1326  * @param msg message to transmit
1327  * @param tunnel tunnel this send is related to (NULL if N/A)
1328  */
1329 static void
1330 send_packet (struct GNUNET_MESH_Handle *h,
1331              const struct GNUNET_MessageHeader *msg,
1332              struct GNUNET_MESH_Tunnel *tunnel)
1333 {
1334   struct GNUNET_MESH_TransmitHandle *th;
1335   size_t msize;
1336
1337   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1338        GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
1339   msize = ntohs (msg->size);
1340   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
1341   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1342   th->size = msize;
1343   th->tunnel = tunnel;
1344   memcpy (&th[1], msg, msize);
1345   add_to_queue (h, th);
1346   LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
1347   if (NULL != h->th)
1348     return;
1349   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1350   h->th =
1351       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1352                                            GNUNET_TIME_UNIT_FOREVER_REL,
1353                                            GNUNET_YES, &send_callback, h);
1354 }
1355
1356
1357 /******************************************************************************/
1358 /**********************      API CALL DEFINITIONS     *************************/
1359 /******************************************************************************/
1360
1361 struct GNUNET_MESH_Handle *
1362 GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1363                      GNUNET_MESH_InboundTunnelNotificationHandler new_tunnel,
1364                      GNUNET_MESH_TunnelEndHandler cleaner,
1365                      const struct GNUNET_MESH_MessageHandler *handlers,
1366                      const uint32_t *ports)
1367 {
1368   struct GNUNET_MESH_Handle *h;
1369
1370   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect()\n");
1371   h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
1372   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1373   h->cfg = cfg;
1374   h->new_tunnel = new_tunnel;
1375   h->cleaner = cleaner;
1376   h->client = GNUNET_CLIENT_connect ("mesh", cfg);
1377   if (h->client == NULL)
1378   {
1379     GNUNET_break (0);
1380     GNUNET_free (h);
1381     return NULL;
1382   }
1383   h->cls = cls;
1384   h->message_handlers = handlers;
1385   h->ports = ports;
1386   h->next_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
1387   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1388   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1389
1390   /* count handlers */
1391   for (h->n_handlers = 0;
1392        handlers && handlers[h->n_handlers].type;
1393        h->n_handlers++) ;
1394   for (h->n_ports = 0;
1395        ports && ports[h->n_ports];
1396        h->n_ports++) ;
1397   send_connect (h);
1398   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect() END\n");
1399   return h;
1400 }
1401
1402
1403 /**
1404  * Disconnect from the mesh service. All tunnels will be destroyed. All tunnel
1405  * disconnect callbacks will be called on any still connected peers, notifying
1406  * about their disconnection. The registered inbound tunnel cleaner will be
1407  * called should any inbound tunnels still exist.
1408  *
1409  * @param handle connection to mesh to disconnect
1410  */
1411 void
1412 GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
1413 {
1414   struct GNUNET_MESH_Tunnel *t;
1415   struct GNUNET_MESH_Tunnel *aux;
1416   struct GNUNET_MESH_TransmitHandle *th;
1417
1418   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH DISCONNECT\n");
1419
1420 #if DEBUG_ACK
1421   LOG (GNUNET_ERROR_TYPE_INFO, "Sent %d ACKs\n", handle->acks_sent);
1422   LOG (GNUNET_ERROR_TYPE_INFO, "Recv %d ACKs\n\n", handle->acks_recv);
1423 #endif
1424
1425   t = handle->tunnels_head;
1426   while (NULL != t)
1427   {
1428     aux = t->next;
1429     if (t->tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1430     {
1431       GNUNET_break (0);
1432       LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X not destroyed\n", t->tid);
1433     }
1434     destroy_tunnel (t, GNUNET_YES);
1435     t = aux;
1436   }
1437   while ( (th = handle->th_head) != NULL)
1438   {
1439     struct GNUNET_MessageHeader *msg;
1440
1441     /* Make sure it is an allowed packet (everything else should have been
1442      * already canceled).
1443      */
1444     GNUNET_break (GNUNET_NO == th_is_payload (th));
1445     msg = (struct GNUNET_MessageHeader *) &th[1];
1446     switch (ntohs(msg->type))
1447     {
1448       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT:
1449       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1450       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1451       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1452         break;
1453       default:
1454         GNUNET_break (0);
1455         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected msg %u\n",
1456              ntohs(msg->type));
1457     }
1458
1459     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1460     GNUNET_free (th);
1461   }
1462
1463   if (NULL != handle->th)
1464   {
1465     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1466     handle->th = NULL;
1467   }
1468   if (NULL != handle->client)
1469   {
1470     GNUNET_CLIENT_disconnect (handle->client);
1471     handle->client = NULL;
1472   }
1473   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1474   {
1475     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1476     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1477   }
1478   GNUNET_free (handle);
1479 }
1480
1481
1482 struct GNUNET_MESH_Tunnel *
1483 GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, 
1484                            void *tunnel_ctx,
1485                            const struct GNUNET_PeerIdentity *peer,
1486                            uint32_t port)
1487 {
1488   struct GNUNET_MESH_Tunnel *t;
1489   struct GNUNET_MESH_TunnelMessage msg;
1490
1491   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating new tunnel\n");
1492   t = create_tunnel (h, 0);
1493   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", t);
1494   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", t->tid);
1495   t->ctx = tunnel_ctx;
1496   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
1497   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1498   msg.tunnel_id = htonl (t->tid);
1499   msg.port = htonl (port);
1500   msg.peer = *peer;
1501   send_packet (h, &msg.header, t);
1502   return t;
1503 }
1504
1505
1506 /**
1507  * Destroy an existing tunnel. The existing callback for the tunnel will NOT
1508  * be called.
1509  *
1510  * @param tunnel tunnel handle
1511  */
1512 void
1513 GNUNET_MESH_tunnel_destroy (struct GNUNET_MESH_Tunnel *tunnel)
1514 {
1515   struct GNUNET_MESH_Handle *h;
1516   struct GNUNET_MESH_TunnelMessage msg;
1517   struct GNUNET_MESH_TransmitHandle *th;
1518
1519   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying tunnel\n");
1520   h = tunnel->mesh;
1521
1522   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
1523   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1524   msg.tunnel_id = htonl (tunnel->tid);
1525   th = h->th_head;
1526   while (th != NULL)
1527   {
1528     struct GNUNET_MESH_TransmitHandle *aux;
1529     if (th->tunnel == tunnel)
1530     {
1531       aux = th->next;
1532       /* FIXME call the handler? */
1533       if (GNUNET_YES == th_is_payload (th))
1534         th->notify (th->notify_cls, 0, NULL);
1535       GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1536       GNUNET_free (th);
1537       th = aux;
1538     }
1539     else
1540       th = th->next;
1541   }
1542
1543   destroy_tunnel (tunnel, GNUNET_NO);
1544   send_packet (h, &msg.header, NULL);
1545 }
1546
1547
1548 /**
1549  * Turn on/off the buffering status of the tunnel.
1550  * 
1551  * @param tunnel Tunnel affected.
1552  * @param buffer GNUNET_YES to turn buffering on (default),
1553  *               GNUNET_NO otherwise.
1554  */
1555 void
1556 GNUNET_MESH_tunnel_buffer (struct GNUNET_MESH_Tunnel *tunnel, int buffer)
1557 {
1558   struct GNUNET_MESH_TunnelMessage msg;
1559   struct GNUNET_MESH_Handle *h;
1560
1561   h = tunnel->mesh;
1562   tunnel->buffering = buffer;
1563
1564   if (GNUNET_YES == buffer)
1565     msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_BUFFER);
1566   else
1567     msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_NOBUFFER);
1568   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1569   msg.tunnel_id = htonl (tunnel->tid);
1570
1571   send_packet (h, &msg.header, NULL);
1572 }
1573
1574
1575 struct GNUNET_MESH_TransmitHandle *
1576 GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
1577                                    struct GNUNET_TIME_Relative maxdelay,
1578                                    size_t notify_size,
1579                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1580                                    void *notify_cls)
1581 {
1582   struct GNUNET_MESH_TransmitHandle *th;
1583   size_t overhead;
1584
1585   GNUNET_assert (NULL != tunnel);
1586   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
1587   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on tunnel %X\n", tunnel->tid);
1588   if (tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1589     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1590   else
1591     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1592   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1593   GNUNET_assert (NULL != notify);
1594   GNUNET_assert (0 == tunnel->packet_size); // Only one data packet allowed
1595   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
1596   th->tunnel = tunnel;
1597   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1598   if (tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1599     overhead = sizeof (struct GNUNET_MESH_ToOrigin);
1600   else
1601     overhead = sizeof (struct GNUNET_MESH_Unicast);
1602   tunnel->packet_size = th->size = notify_size + overhead;
1603   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1604   th->notify = notify;
1605   th->notify_cls = notify_cls;
1606   add_to_queue (tunnel->mesh, th);
1607   if (NULL != tunnel->mesh->th)
1608     return th;
1609   if (!GMC_is_pid_bigger (tunnel->last_ack_recv, tunnel->last_pid_sent))
1610     return th;
1611   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1612   tunnel->mesh->th =
1613       GNUNET_CLIENT_notify_transmit_ready (tunnel->mesh->client, th->size,
1614                                            GNUNET_TIME_UNIT_FOREVER_REL,
1615                                            GNUNET_YES, &send_callback,
1616                                            tunnel->mesh);
1617   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
1618   return th;
1619 }
1620
1621
1622 /**
1623  * Cancel the specified transmission-ready notification.
1624  *
1625  * @param th handle that was returned by "notify_transmit_ready".
1626  */
1627 void
1628 GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
1629 {
1630   struct GNUNET_MESH_Handle *mesh;
1631
1632   th->tunnel->packet_size = 0;
1633   mesh = th->tunnel->mesh;
1634   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1635     GNUNET_SCHEDULER_cancel (th->timeout_task);
1636   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
1637   GNUNET_free (th);
1638   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
1639   {
1640     /* queue empty, no point in asking for transmission */
1641     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
1642     mesh->th = NULL;
1643   }
1644 }
1645
1646
1647 /**
1648  * Request information about the running mesh peer.
1649  * The callback will be called for every tunnel known to the service,
1650  * listing all active peers that blong to the tunnel.
1651  *
1652  * If called again on the same handle, it will overwrite the previous
1653  * callback and cls. To retrieve the cls, monitor_cancel must be
1654  * called first.
1655  *
1656  * WARNING: unstable API, likely to change in the future!
1657  *
1658  * @param h Handle to the mesh peer.
1659  * @param callback Function to call with the requested data.
1660  * @param callback_cls Closure for @c callback.
1661  */
1662 void
1663 GNUNET_MESH_get_tunnels (struct GNUNET_MESH_Handle *h,
1664                          GNUNET_MESH_TunnelsCB callback,
1665                          void *callback_cls)
1666 {
1667   struct GNUNET_MessageHeader msg;
1668
1669   msg.size = htons (sizeof (msg));
1670   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS);
1671   send_packet (h, &msg, NULL);
1672   h->tunnels_cb = callback;
1673   h->tunnels_cls = callback_cls;
1674
1675   return;
1676 }
1677
1678
1679 /**
1680  * Cancel a monitor request. The monitor callback will not be called.
1681  *
1682  * @param h Mesh handle.
1683  *
1684  * @return Closure given to GNUNET_MESH_monitor, if any.
1685  */
1686 void *
1687 GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h)
1688 {
1689   void *cls;
1690
1691   cls = h->tunnels_cls;
1692   h->tunnels_cb = NULL;
1693   h->tunnels_cls = NULL;
1694   return cls;
1695 }
1696
1697
1698 /**
1699  * Request information about a specific tunnel of the running mesh peer.
1700  *
1701  * WARNING: unstable API, likely to change in the future!
1702  * FIXME Add destination option.
1703  *
1704  * @param h Handle to the mesh peer.
1705  * @param initiator ID of the owner of the tunnel.
1706  * @param tunnel_number Tunnel number.
1707  * @param callback Function to call with the requested data.
1708  * @param callback_cls Closure for @c callback.
1709  */
1710 void
1711 GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h,
1712                          struct GNUNET_PeerIdentity *initiator,
1713                          unsigned int tunnel_number,
1714                          GNUNET_MESH_TunnelCB callback,
1715                          void *callback_cls)
1716 {
1717   struct GNUNET_MESH_LocalMonitor msg;
1718
1719   msg.header.size = htons (sizeof (msg));
1720   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL);
1721   msg.owner = *initiator;
1722   msg.tunnel_id = htonl (tunnel_number);
1723   msg.reserved = 0;
1724   send_packet (h, &msg.header, NULL);
1725   h->tunnel_cb = callback;
1726   h->tunnel_cls = callback_cls;
1727
1728   return;
1729 }
1730
1731
1732 /**
1733  * Transition API for tunnel ctx management
1734  */
1735 void
1736 GNUNET_MESH_tunnel_set_data (struct GNUNET_MESH_Tunnel *tunnel, void *data)
1737 {
1738   tunnel->ctx = data;
1739 }
1740
1741 /**
1742  * Transition API for tunnel ctx management
1743  */
1744 void *
1745 GNUNET_MESH_tunnel_get_data (struct GNUNET_MESH_Tunnel *tunnel)
1746 {
1747   return tunnel->ctx;
1748 }
1749
1750